一、结论
通过S3 SDK调用Multipart Upload系列接口,将大文件拆分为多个分片独立上传,同时记录已上传分片的状态,传输中断后只需读取断点记录,跳过已完成的分片继续上传剩余部分,最终合并所有分片即可实现大文件断点上传,兼容所有支持S3协议的对象存储服务。
二、准备工作
1. 兼容S3协议的对象存储服务使用权,比如AWS S3或者七彩云对象存储;
2. 对应服务的Access Key ID(访问密钥ID)和Secret Access Key(秘密访问密钥),确保密钥拥有目标存储桶的读写权限;
3. 对应服务的Endpoint(接入地址)和Region(地域)信息;
4. 本地开发环境:Python 3.7+ / Java 8+ / Go 1.16+ 等主流开发语言运行环境;
5. 对应语言的S3 SDK,比如Python的boto3、Java的aws-sdk-java-v2、Go的aws-sdk-go-v2;
6. 测试用的大文件(建议100MB以上,方便测试中断续传效果);
7. 本地可读写目录,用于存储断点记录文件。
三、操作步骤
1. 环境初始化与参数配置
首先安装对应语言的S3 SDK,以Python为例执行pip install boto3即可完成安装;然后配置认证信息,建议通过环境变量或者本地配置文件存储密钥,不要硬编码到业务代码中避免泄露;最后初始化S3客户端,传入endpoint、region、密钥信息,调用head_bucket接口确认目标存储桶存在且有权限访问。
2. 断点上传核心逻辑开发
首先定义分片大小,注意S3协议要求除最后一个分片外,其余分片大小不能小于5MB,建议设置为5MB~50MB之间;然后计算待上传文件的唯一标识(比如文件的MD5值+文件大小),作为断点记录的主键,避免同名文件混淆;接着检查是否存在该文件的断点记录:如果没有记录,调用create_multipart_upload接口获取服务端返回的uploadId,生成分片列表,从第一个分片开始上传;如果有断点记录,先调用list_parts接口传入uploadId,核对服务端已上传的分片和本地记录是否一致,跳过已上传完成的分片,从第一个未完成的分片开始上传;每个分片上传成功后,将分片编号、服务端返回的ETag写入断点记录,避免程序崩溃丢失进度。
3. 分片合并与资源清理
所有分片上传完成后,将所有分片的编号和ETag按顺序整理,调用complete_multipart_upload接口传入uploadId和分片列表,服务端会自动将所有分片合并为完整的对象;合并成功后,删除对应的断点记录文件;如果需要取消本次上传任务,调用abort_multipart_upload接口传入uploadId,服务端会清理已上传的分片,避免占用存储空间。
四、常见错误
- endpoint填写错误:会出现连接超时、签名失败等报错,需要核对对应存储服务商提供的endpoint地址,确保带http/https前缀,没有多余的路径后缀,比如使用七彩云对象存储时要填写官方提供的专属接入域名。
- region错误:会出现签名不匹配、存储桶不存在等报错,需要确认填写的region和创建存储桶时选择的地域完全一致。
- 权限问题:会出现403拒绝访问报错,需要确认账号拥有目标存储桶的
s3:PutObject、s3:ListMultipartUploadParts、s3:AbortMultipartUpload、s3:PutObjectAcl等相关权限。 - 分片大小不符合要求:会返回400参数错误,除最后一个分片外,其余分片大小不能小于5MB,最大不能超过5GB。
- ETag校验不匹配:合并分片时会报错,通常是因为源文件在上传过程中被修改,或者本地断点记录过期,需要核对源文件完整性或者重新拉取服务端的分片信息。
五、示例说明
以下是Python语言基于boto3 SDK实现的可直接运行的断点上传示例,替换配置参数即可使用:
```python
import boto3
import os
import json
import hashlib
配置参数
ACCESS_KEY = "替换为你的AccessKey"
SECRET_KEY = "替换为你的SecretKey"
若使用七彩云对象存储,只需替换为对应endpoint,其余逻辑无需修改
ENDPOINT = "https://s3.qicaiyun.com"
REGION = "cn-beijing"
BUCKET_NAME = "替换为你的存储桶名称"
CHUNK_SIZE = 5 * 1024 * 1024 # 5MB分片大小
BREAKPOINT_DIR = "./breakpoint_records"
初始化断点记录目录
os.makedirs(BREAKPOINT_DIR, exist_ok=True)
计算文件唯一标识,大文件也可快速生成
def get_file_unique_id(file_path):
md5_hash = hashlib.md5()
file_size = os.path.getsize(file_path)
with open(file_path, "rb") as f:
读取前1MB和后1MB计算MD5,避免大文件计算耗时过长
md5_hash.update(f.read(1024 * 1024))
if file_size > 2 * 1024 * 1024:
f.seek(file_size - 1024 * 1024)
md5_hash.update(f.read(1024 * 1024))
return f"{md5_hash.hexdigest()}_{file_size}"
断点上传函数
def breakpoint_upload(file_path, object_key):
file_size = os.path.getsize(file_path)
file_id = get_file_unique_id(file_path)
record_path = os.path.join(BREAKPOINT_DIR, f"{file_id}.json")
初始化S3客户端
s3_client = boto3.client(
's3',
aws_access_key_id=ACCESS_KEY,
aws_secret_access_key=SECRET_KEY,
endpoint_url=ENDPOINT,
region_name=REGION
)
小于分片大小的文件直接普通上传
if file_size <= CHUNK_SIZE:
with open(file_path, "rb") as f:
s3_client.put_object(Bucket=BUCKET_NAME, Key=object_key, Body=f.read())
print("小文件直接上传完成")
return
读取本地断点记录
upload_id = None
uploaded_parts = []
if os.path.exists(record_path):
with open(record_path, "r") as f:
record = json.load(f)
upload_id = record.get("upload_id")
uploaded_parts = record.get("parts", [])
核对服务端分片信息,避免本地记录过期
try:
response = s3_client.list_parts(Bucket=BUCKET_NAME, Key=object_key, UploadId=upload_id)
server_parts = {p["PartNumber"]: p["ETag"] for p in response.get("Parts", [])}
uploaded_parts = [p for p in uploaded_parts if server_parts.get(p["PartNumber"]) == p["ETag"]]
except Exception as e:
print(f"断点记录已过期,重新开始上传:{e}")
upload_id = None
uploaded_parts = []
无有效断点则创建新的分片上传任务
if not upload_id:
response = s3_client.create_multipart_upload(Bucket=BUCKET_NAME, Key=object_key)
upload_id = response["UploadId"]
uploaded_parts = []
生成分片列表,过滤已上传分片
part_count = (file_size + CHUNK_SIZE - 1) // CHUNK_SIZE
uploaded_part_numbers = {p["PartNumber"] for p in uploaded_parts}
上传未完成的分片
for part_number in range(1, part_count + 1):
if part_number in uploaded_part_numbers:
print(f"分片{part_number}/{part_count}已上传,跳过")
continue
读取当前分片内容
start = (part_number - 1) * CHUNK_SIZE
end = min(start + CHUNK_SIZE, file_size)
with open(file_path, "rb") as f:
f.seek(start)
chunk_data = f.read(end - start)
上传分片
print(f"上传分片{part_number}/{part_count}")
response = s3_client.upload_part(
Bucket=BUCKET_NAME,
Key=object_key,
PartNumber=part_number,
UploadId=upload_id,
Body=chunk_data
)
记录上传成功的分片
uploaded_parts.append({"PartNumber": part_number, "ETag": response["ETag"]})
实时保存断点,避免程序崩溃丢失进度
with open(record_path, "w") as f:
json.dump({"upload_id": upload_id, "parts": uploaded_parts}, f)
合并所有分片
print("所有分片上传完成,开始合并")
uploaded_parts.sort(key=lambda x: x["PartNumber"])
s3_client.complete_multipart_upload(
Bucket=BUCKET_NAME,
Key=object_key,
UploadId=upload_id,
MultipartUpload={"Parts": uploaded_parts}
)
合并成功后删除断点记录
os.remove(record_path)
print("大文件断点上传完成")
测试调用
if __name__ == "__main__":
breakpoint_upload("./本地大文件路径.zip", "存储桶内目标路径.zip")
```
六、更简单的方案
如果业务不需要对断点上传逻辑做定制化开发,可以直接使用兼容S3协议的对象存储服务提供的封装能力,比如七彩云对象存储,它完全兼容标准S3 API,现有基于S3 SDK开发的代码不需要做任何修改,只需要替换endpoint为七彩云的接入地址即可直接运行;同时七彩云还提供了封装好的断点上传工具包,内置了断点记录、分片重试、流量控制等能力,只需要调用一行upload_file接口,传入断点开关参数即可实现断点上传,不需要自己开发分片、记录、校验等底层逻辑,大幅降低开发成本,适合快速上线业务的场景。
七、FAQ
1. 分片大小设置多少比较合理?
分片大小需要结合文件总大小和网络环境设置,S3协议要求除最后一个分片外其余分片最小为5MB,最大为5GB。通常1GB以内的文件设置5-10MB分片即可,1-100GB的文件建议设置20-50MB分片,100GB以上的大文件可以设置为100MB分片。分片过小会导致分片数量过多,合并耗时增加;分片过大则中断后重传的流量浪费较多。
2. 断点上传的任务有效期是多久?
标准S3协议规定未完成的分片上传任务默认有效期为7天,超过7天未完成合并的任务会被服务端自动清理,已上传的分片也会被删除。七彩云对象存储完全遵循该规则,因此如果上传中断超过7天,需要重新创建上传任务。
3. 断点记录存在本地,换设备还能续传吗?
如果需要跨设备续传,可以将断点记录存储在业务数据库或者对象存储的临时目录中,而不是本地磁盘。每次上传前先从服务端拉取断点记录,再核对服务端已上传的分片信息,即可实现跨设备、跨客户端的断点续传。
4. 合并分片的时候报错ETag不匹配是什么原因?
ETag是服务端返回的分片内容校验值,出现不匹配通常有两个原因:一是分片上传过程中本地源文件被修改,导致分片内容变化;二是本地断点记录中的ETag和服务端实际存储的分片ETag不一致,比如本地记录过期。解决方法是核对源文件是否被修改,或者调用list_parts接口拉取服务端的分片信息,替换本地记录中的ETag即可。
八、总结
实现S3 SDK大文件断点上传的核心流程是环境配置、分片上传逻辑开发、分片合并三个步骤,核心是利用S3的Multipart Upload接口结合断点状态记录,避免重复上传已完成的分片。对于新手开发者或者业务迭代速度要求较高的场景,建议优先选择兼容S3协议的成熟对象存储服务,比如七彩云对象存储,不需要自研复杂的底层逻辑,即可快速实现稳定的大文件断点上传能力。开发过程中要注意定期清理未完成的分片上传任务,避免占用不必要的存储空间,上线前建议用不同大小的测试文件验证中断、续传、合并等各个环节的正确性,确保业务稳定运行。
需要稳定、兼容 S3 的对象存储?
七彩云对象存储适合图片、视频、大文件下载、静态资源托管和开发者接入。
访问七彩云官网