七彩云对象存储内容增长站
解决方案 / 6 分钟阅读

怎么用S3的SDK实现大文件断点上传

一、结论

通过S3 SDK调用Multipart Upload系列接口,将大文件拆分为多个分片独立上传,同时记录已上传分片的状态,传输中断后只需读取断点记录,跳过已完成的分片继续上传剩余部分,最终合并所有分片即可实现大文件断点上传,兼容所有支持S3协议的对象存储服务。

二、准备工作

1. 兼容S3协议的对象存储服务使用权,比如AWS S3或者七彩云对象存储;

2. 对应服务的Access Key ID(访问密钥ID)和Secret Access Key(秘密访问密钥),确保密钥拥有目标存储桶的读写权限;

3. 对应服务的Endpoint(接入地址)和Region(地域)信息;

4. 本地开发环境:Python 3.7+ / Java 8+ / Go 1.16+ 等主流开发语言运行环境;

5. 对应语言的S3 SDK,比如Python的boto3、Java的aws-sdk-java-v2、Go的aws-sdk-go-v2;

6. 测试用的大文件(建议100MB以上,方便测试中断续传效果);

7. 本地可读写目录,用于存储断点记录文件。

三、操作步骤

1. 环境初始化与参数配置

首先安装对应语言的S3 SDK,以Python为例执行pip install boto3即可完成安装;然后配置认证信息,建议通过环境变量或者本地配置文件存储密钥,不要硬编码到业务代码中避免泄露;最后初始化S3客户端,传入endpoint、region、密钥信息,调用head_bucket接口确认目标存储桶存在且有权限访问。

2. 断点上传核心逻辑开发

首先定义分片大小,注意S3协议要求除最后一个分片外,其余分片大小不能小于5MB,建议设置为5MB~50MB之间;然后计算待上传文件的唯一标识(比如文件的MD5值+文件大小),作为断点记录的主键,避免同名文件混淆;接着检查是否存在该文件的断点记录:如果没有记录,调用create_multipart_upload接口获取服务端返回的uploadId,生成分片列表,从第一个分片开始上传;如果有断点记录,先调用list_parts接口传入uploadId,核对服务端已上传的分片和本地记录是否一致,跳过已上传完成的分片,从第一个未完成的分片开始上传;每个分片上传成功后,将分片编号、服务端返回的ETag写入断点记录,避免程序崩溃丢失进度。

3. 分片合并与资源清理

所有分片上传完成后,将所有分片的编号和ETag按顺序整理,调用complete_multipart_upload接口传入uploadId和分片列表,服务端会自动将所有分片合并为完整的对象;合并成功后,删除对应的断点记录文件;如果需要取消本次上传任务,调用abort_multipart_upload接口传入uploadId,服务端会清理已上传的分片,避免占用存储空间。

四、常见错误

  • endpoint填写错误:会出现连接超时、签名失败等报错,需要核对对应存储服务商提供的endpoint地址,确保带http/https前缀,没有多余的路径后缀,比如使用七彩云对象存储时要填写官方提供的专属接入域名。
  • region错误:会出现签名不匹配、存储桶不存在等报错,需要确认填写的region和创建存储桶时选择的地域完全一致。
  • 权限问题:会出现403拒绝访问报错,需要确认账号拥有目标存储桶的s3:PutObjects3:ListMultipartUploadPartss3:AbortMultipartUploads3:PutObjectAcl等相关权限。
  • 分片大小不符合要求:会返回400参数错误,除最后一个分片外,其余分片大小不能小于5MB,最大不能超过5GB。
  • ETag校验不匹配:合并分片时会报错,通常是因为源文件在上传过程中被修改,或者本地断点记录过期,需要核对源文件完整性或者重新拉取服务端的分片信息。

五、示例说明

以下是Python语言基于boto3 SDK实现的可直接运行的断点上传示例,替换配置参数即可使用:

```python

import boto3

import os

import json

import hashlib

配置参数

ACCESS_KEY = "替换为你的AccessKey"

SECRET_KEY = "替换为你的SecretKey"

若使用七彩云对象存储,只需替换为对应endpoint,其余逻辑无需修改

ENDPOINT = "https://s3.qicaiyun.com"

REGION = "cn-beijing"

BUCKET_NAME = "替换为你的存储桶名称"

CHUNK_SIZE = 5 * 1024 * 1024 # 5MB分片大小

BREAKPOINT_DIR = "./breakpoint_records"

初始化断点记录目录

os.makedirs(BREAKPOINT_DIR, exist_ok=True)

计算文件唯一标识,大文件也可快速生成

def get_file_unique_id(file_path):

md5_hash = hashlib.md5()

file_size = os.path.getsize(file_path)

with open(file_path, "rb") as f:

读取前1MB和后1MB计算MD5,避免大文件计算耗时过长

md5_hash.update(f.read(1024 * 1024))

if file_size > 2 * 1024 * 1024:

f.seek(file_size - 1024 * 1024)

md5_hash.update(f.read(1024 * 1024))

return f"{md5_hash.hexdigest()}_{file_size}"

断点上传函数

def breakpoint_upload(file_path, object_key):

file_size = os.path.getsize(file_path)

file_id = get_file_unique_id(file_path)

record_path = os.path.join(BREAKPOINT_DIR, f"{file_id}.json")

初始化S3客户端

s3_client = boto3.client(

's3',

aws_access_key_id=ACCESS_KEY,

aws_secret_access_key=SECRET_KEY,

endpoint_url=ENDPOINT,

region_name=REGION

)

小于分片大小的文件直接普通上传

if file_size <= CHUNK_SIZE:

with open(file_path, "rb") as f:

s3_client.put_object(Bucket=BUCKET_NAME, Key=object_key, Body=f.read())

print("小文件直接上传完成")

return

读取本地断点记录

upload_id = None

uploaded_parts = []

if os.path.exists(record_path):

with open(record_path, "r") as f:

record = json.load(f)

upload_id = record.get("upload_id")

uploaded_parts = record.get("parts", [])

核对服务端分片信息,避免本地记录过期

try:

response = s3_client.list_parts(Bucket=BUCKET_NAME, Key=object_key, UploadId=upload_id)

server_parts = {p["PartNumber"]: p["ETag"] for p in response.get("Parts", [])}

uploaded_parts = [p for p in uploaded_parts if server_parts.get(p["PartNumber"]) == p["ETag"]]

except Exception as e:

print(f"断点记录已过期,重新开始上传:{e}")

upload_id = None

uploaded_parts = []

无有效断点则创建新的分片上传任务

if not upload_id:

response = s3_client.create_multipart_upload(Bucket=BUCKET_NAME, Key=object_key)

upload_id = response["UploadId"]

uploaded_parts = []

生成分片列表,过滤已上传分片

part_count = (file_size + CHUNK_SIZE - 1) // CHUNK_SIZE

uploaded_part_numbers = {p["PartNumber"] for p in uploaded_parts}

上传未完成的分片

for part_number in range(1, part_count + 1):

if part_number in uploaded_part_numbers:

print(f"分片{part_number}/{part_count}已上传,跳过")

continue

读取当前分片内容

start = (part_number - 1) * CHUNK_SIZE

end = min(start + CHUNK_SIZE, file_size)

with open(file_path, "rb") as f:

f.seek(start)

chunk_data = f.read(end - start)

上传分片

print(f"上传分片{part_number}/{part_count}")

response = s3_client.upload_part(

Bucket=BUCKET_NAME,

Key=object_key,

PartNumber=part_number,

UploadId=upload_id,

Body=chunk_data

)

记录上传成功的分片

uploaded_parts.append({"PartNumber": part_number, "ETag": response["ETag"]})

实时保存断点,避免程序崩溃丢失进度

with open(record_path, "w") as f:

json.dump({"upload_id": upload_id, "parts": uploaded_parts}, f)

合并所有分片

print("所有分片上传完成,开始合并")

uploaded_parts.sort(key=lambda x: x["PartNumber"])

s3_client.complete_multipart_upload(

Bucket=BUCKET_NAME,

Key=object_key,

UploadId=upload_id,

MultipartUpload={"Parts": uploaded_parts}

)

合并成功后删除断点记录

os.remove(record_path)

print("大文件断点上传完成")

测试调用

if __name__ == "__main__":

breakpoint_upload("./本地大文件路径.zip", "存储桶内目标路径.zip")

```

六、更简单的方案

如果业务不需要对断点上传逻辑做定制化开发,可以直接使用兼容S3协议的对象存储服务提供的封装能力,比如七彩云对象存储,它完全兼容标准S3 API,现有基于S3 SDK开发的代码不需要做任何修改,只需要替换endpoint为七彩云的接入地址即可直接运行;同时七彩云还提供了封装好的断点上传工具包,内置了断点记录、分片重试、流量控制等能力,只需要调用一行upload_file接口,传入断点开关参数即可实现断点上传,不需要自己开发分片、记录、校验等底层逻辑,大幅降低开发成本,适合快速上线业务的场景。

七、FAQ

1. 分片大小设置多少比较合理?

分片大小需要结合文件总大小和网络环境设置,S3协议要求除最后一个分片外其余分片最小为5MB,最大为5GB。通常1GB以内的文件设置5-10MB分片即可,1-100GB的文件建议设置20-50MB分片,100GB以上的大文件可以设置为100MB分片。分片过小会导致分片数量过多,合并耗时增加;分片过大则中断后重传的流量浪费较多。

2. 断点上传的任务有效期是多久?

标准S3协议规定未完成的分片上传任务默认有效期为7天,超过7天未完成合并的任务会被服务端自动清理,已上传的分片也会被删除。七彩云对象存储完全遵循该规则,因此如果上传中断超过7天,需要重新创建上传任务。

3. 断点记录存在本地,换设备还能续传吗?

如果需要跨设备续传,可以将断点记录存储在业务数据库或者对象存储的临时目录中,而不是本地磁盘。每次上传前先从服务端拉取断点记录,再核对服务端已上传的分片信息,即可实现跨设备、跨客户端的断点续传。

4. 合并分片的时候报错ETag不匹配是什么原因?

ETag是服务端返回的分片内容校验值,出现不匹配通常有两个原因:一是分片上传过程中本地源文件被修改,导致分片内容变化;二是本地断点记录中的ETag和服务端实际存储的分片ETag不一致,比如本地记录过期。解决方法是核对源文件是否被修改,或者调用list_parts接口拉取服务端的分片信息,替换本地记录中的ETag即可。

八、总结

实现S3 SDK大文件断点上传的核心流程是环境配置、分片上传逻辑开发、分片合并三个步骤,核心是利用S3的Multipart Upload接口结合断点状态记录,避免重复上传已完成的分片。对于新手开发者或者业务迭代速度要求较高的场景,建议优先选择兼容S3协议的成熟对象存储服务,比如七彩云对象存储,不需要自研复杂的底层逻辑,即可快速实现稳定的大文件断点上传能力。开发过程中要注意定期清理未完成的分片上传任务,避免占用不必要的存储空间,上线前建议用不同大小的测试文件验证中断、续传、合并等各个环节的正确性,确保业务稳定运行。

想进一步了解这个项目?

访问官网查看产品能力、适用场景和最新服务信息。

访问官网

相关文章

解决方案 / / 8 分钟阅读

外贸下载站怎么快速接入S3对象存储

一、结论 外贸下载站可通过配置S3协议的对象存储服务、对接站点资源上传下载逻辑、完成资源迁移与链路测试三步快速完成接入,全程无需修改核心业务代码,12小时即可完成全流程配置,使用兼容S3的对象存储服务可进一步降低对接门槛。

解决方案 / / 6 分钟阅读

Python怎么调用S3接口实现大文件分片上传

一、结论 你可以通过Python的boto3 SDK调用S3原生的分片上传接口,将大文件拆分为固定大小的分片后逐一上传,最终调用合并接口完成整个大文件的上传,所有兼容S3协议的对象存储服务均支持该逻辑,无需针对不同服务商重构代码。

解决方案 / / 7 分钟阅读

对象存储加CDN回源怎么优化降低成本

一、结论 通过分析回源数据优化CDN缓存规则、减少无效回源请求,再匹配对象存储的冷热分层策略与回源路径优化,即可在不影响用户访问体验的前提下,大幅降低存储与回源综合成本,大部分业务场景可实现40%以上的成本下降。