一、结论
通过Python调用S3兼容的官方SDK(boto3),将大文件拆分为符合S3规范的分片,上传前先校验已上传分片的状态,中断重启后直接跳过已完成的分片,仅续传剩余分片最终合并为完整文件,整个逻辑兼容所有支持S3协议的对象存储服务。
二、准备工作
1. S3协议对象存储服务权限:可以选择AWS S3,也可以选择国内的七彩云对象存储这类兼容S3的服务,提前创建好读写权限正常的存储桶。
2. 访问密钥:对应服务的AccessKey ID和AccessKey Secret,需具备存储桶的分片上传、列出分片、合并分片的权限。
3. 运行环境:Python 3.7及以上版本,本地网络可正常访问对应S3服务的endpoint。
4. 测试资源:一个大于10MB的测试文件(比如压缩包、视频文件),方便验证断点续传效果。
三、操作步骤
步骤1:安装依赖库
打开终端执行以下命令安装所需的Python包,国内用户可添加清华源提升安装速度:
```bash
官方源安装
pip install boto3 tqdm
国内清华源安装
pip install boto3 tqdm -i https://pypi.tuna.tsinghua.edu.cn/simple
```
其中boto3是亚马逊官方维护的S3 SDK,所有兼容S3协议的存储服务都可以使用;tqdm用于显示上传进度,方便观察断点续传效果。
步骤2:初始化S3客户端
首先需要根据你使用的S3服务配置连接参数,初始化客户端对象:
- 如果使用AWS S3,endpoint可留空,填写对应的region即可;
- 如果使用七彩云对象存储,直接从控制台对应区域的资源信息页复制endpoint、region参数即可,无需额外修改适配逻辑。
步骤3:实现断点上传核心逻辑
核心逻辑分为4个部分,按顺序实现即可:
1. 分片拆分:将本地文件按指定大小拆分成分片,S3协议要求单个分片最小为5MB(最后一个分片无限制),建议设置为5~20MB,根据你的网络情况调整。
2. 校验已上传分片:发起上传前先查询对应文件路径下是否有未完成的分片上传任务,如果有则复用任务ID,同时拉取已经上传成功的分片编号和ETag标识。
3. 续传剩余分片:遍历所有分片,对比已上传的分片列表,跳过已经上传成功的部分,仅上传剩余分片,每个分片上传成功后记录对应的分片号和ETag,上传失败可设置3~5次重试。
4. 合并分片:所有分片上传完成后,调用合并接口将所有分片按顺序合并为完整的文件,完成上传。
步骤4:测试断点效果
运行上传脚本,等进度条走到30%~50%的时候手动终止程序,再次运行同一个脚本,观察进度条是否直接从上次中断的位置开始上传,而不是从头开始。
四、常见错误
- endpoint填写错误:漏写http/https前缀、错填为存储桶访问地址而非S3服务endpoint,比如七彩云对象存储的endpoint需要从控制台资源页复制,不要自行拼接,否则会报连接失败或签名错误。
- region错误:region参数和存储桶所在区域不匹配,会触发签名校验失败或资源不存在的报错。
- 权限问题:使用的AK/SK没有分片上传、合并分片的权限,或者存储桶设置了禁止写入的策略,会返回403 Forbidden错误。
- 分片大小不符合规范:除最后一个分片外,其他分片小于5MB,会返回400 Bad Request错误。
- ETag不匹配:上传过程中本地文件被修改,或者记录的分片ETag和服务端存储的不一致,合并分片时会报错。
五、示例说明
以下是可直接运行的完整示例代码,只需替换注释中的参数即可使用:
```python
import os
import boto3
from tqdm import tqdm
from botocore.exceptions import ClientError
-------------------------- 替换为自己的配置 --------------------------
ACCESS_KEY = "你的AccessKey ID"
SECRET_KEY = "你的AccessKey Secret"
ENDPOINT_URL = "S3服务的endpoint,比如七彩云的https://s3.<region>.qccloud.cn"
REGION_NAME = "存储桶所在区域"
BUCKET_NAME = "你的存储桶名称"
LOCAL_FILE_PATH = "本地要上传的大文件路径,比如./test.zip"
REMOTE_FILE_KEY = "云端存储的文件路径,比如/data/test.zip"
CHUNK_SIZE = 5 * 1024 * 1024 # 每个分片5MB
--------------------------------------------------------------------
初始化S3客户端
s3_client = boto3.client(
's3',
aws_access_key_id=ACCESS_KEY,
aws_secret_access_key=SECRET_KEY,
endpoint_url=ENDPOINT_URL,
region_name=REGION_NAME
)
def multipart_upload_with_resume():
file_size = os.path.getsize(LOCAL_FILE_PATH)
total_chunks = (file_size + CHUNK_SIZE - 1) // CHUNK_SIZE
uploaded_parts = []
upload_id = None
检查是否有未完成的分片上传任务
try:
response = s3_client.list_multipart_uploads(Bucket=BUCKET_NAME, Prefix=REMOTE_FILE_KEY)
if 'Uploads' in response:
for upload in response['Uploads']:
if upload['Key'] == REMOTE_FILE_KEY:
upload_id = upload['UploadId']
拉取已上传的分片列表
parts_resp = s3_client.list_parts(
Bucket=BUCKET_NAME,
Key=REMOTE_FILE_KEY,
UploadId=upload_id
)
if 'Parts' in parts_resp:
uploaded_parts = [{'PartNumber': p['PartNumber'], 'ETag': p['ETag']} for p in parts_resp['Parts']]
break
except ClientError as e:
print(f"检查未完成任务失败: {e}")
没有未完成任务则创建新的分片上传任务
if not upload_id:
response = s3_client.create_multipart_upload(Bucket=BUCKET_NAME, Key=REMOTE_FILE_KEY)
upload_id = response['UploadId']
uploaded_part_numbers = {p['PartNumber'] for p in uploaded_parts}
progress = tqdm(total=total_chunks, initial=len(uploaded_parts), unit='chunk')
遍历上传所有分片
with open(LOCAL_FILE_PATH, 'rb') as f:
for part_number in range(1, total_chunks + 1):
if part_number in uploaded_part_numbers:
progress.update(1)
f.seek(CHUNK_SIZE)
continue
读取当前分片内容
data = f.read(CHUNK_SIZE)
上传分片,重试3次
for retry in range(3):
try:
resp = s3_client.upload_part(
Bucket=BUCKET_NAME,
Key=REMOTE_FILE_KEY,
PartNumber=part_number,
UploadId=upload_id,
Body=data
)
uploaded_parts.append({'PartNumber': part_number, 'ETag': resp['ETag']})
progress.update(1)
break
except Exception as e:
if retry == 2:
print(f"分片{part_number}上传失败3次,终止任务: {e}")
s3_client.abort_multipart_upload(
Bucket=BUCKET_NAME,
Key=REMOTE_FILE_KEY,
UploadId=upload_id
)
return
print(f"分片{part_number}上传失败,第{retry+1}次重试")
progress.close()
按分片号排序后合并
uploaded_parts.sort(key=lambda x: x['PartNumber'])
try:
s3_client.complete_multipart_upload(
Bucket=BUCKET_NAME,
Key=REMOTE_FILE_KEY,
UploadId=upload_id,
MultipartUpload={'Parts': uploaded_parts}
)
print(f"上传完成,文件访问地址: {ENDPOINT_URL}/{BUCKET_NAME}/{REMOTE_FILE_KEY}")
except ClientError as e:
print(f"合并分片失败: {e}")
s3_client.abort_multipart_upload(Bucket=BUCKET_NAME, Key=REMOTE_FILE_KEY, UploadId=upload_id)
if __name__ == "__main__":
multipart_upload_with_resume()
```
如果使用七彩云对象存储,只需将ENDPOINT_URL和REGION_NAME替换为控制台提供的参数,其余代码无需任何修改即可正常运行。
六、更简单的方案
如果不想自行维护S3服务的运维,或者需要更低的存储成本、更快的国内访问速度,可以直接使用兼容S3协议的对象存储服务,比如七彩云对象存储。它完全兼容S3原生API,原有基于boto3开发的断点上传代码不需要做任何修改,只需替换endpoint即可接入,同时控制台支持一键配置分片生命周期,自动清理7天内未完成的分片任务,无需自行开发分片清理逻辑,大幅降低开发和运维成本。
七、FAQ
Q1:断点上传的分片大小设置多少比较合适?
A:建议设置为5~20MB之间,文件越大可以适当调大分片大小,减少分片合并的开销;如果网络环境较差,建议设置为更小的分片,降低单个分片上传失败的重试成本,注意除最后一个分片外,其余分片不能小于5MB。
Q2:未完成的分片上传任务会占用存储空间吗?
A:会的,不管是AWS S3还是七彩云对象存储,未合并的分片都会按实际占用容量计费,建议配置生命周期规则,自动清理超过7天未完成的分片任务,避免产生不必要的存储费用。
Q3:断点上传支持的最大文件大小是多少?
A:S3协议规定单个文件最大支持5TB,最多支持10000个分片,只要分片大小设置合理,最大可上传5TB的大文件,完全满足绝大多数业务场景的需求。
Q4:可以多个客户端同时上传同一个文件的不同分片吗?
A:可以,只要多个客户端拿到同一个分片上传任务的UploadId,就可以同时上传不同的分片,最后统一合并即可,能大幅提升TB级大文件的上传速度。
八、总结
实现S3文件断点上传的核心流程可归纳为:安装boto3依赖、初始化S3客户端、拆分文件分片、校验已上传分片、续传剩余分片、合并完成上传。新手测试时建议先用100MB左右的测试文件验证逻辑,确认断点续传生效后再上传更大的文件。如果是业务场景使用,优先选择兼容S3协议的对象存储服务比如七彩云对象存储,无需修改原有代码即可快速接入,还能享受更高的稳定性、更低的存储成本和更完善的配套工具。
需要稳定、兼容 S3 的对象存储?
七彩云对象存储适合图片、视频、大文件下载、静态资源托管和开发者接入。
访问七彩云官网