七彩云对象存储
下载分发方案 / 6 分钟阅读

S3如何用SDK实现大文件批量上传

一、结论

你可以通过S3 SDK内置的分片上传接口拆分单个体积过大的文件,结合本地批量文件遍历逻辑和任务并发控制规则,配置正确的鉴权参数与断点续传策略,即可快速实现稳定的大文件批量上传,兼容S3协议的对象存储服务(如七彩云对象存储)可直接复用这套逻辑无需额外改造。

二、准备工作

1. 存储服务权限:持有可用的S3兼容存储服务账号,例如原生AWS S3或七彩云对象存储,已提前创建好目标存储桶,同时获取具备桶写入、分片上传权限的Access Key(AK)、Secret Key(SK),以及对应服务的Endpoint、Region信息。

2. 开发环境:安装Python 3.7及以上版本(本文以Python为例,Java、Go等其他语言逻辑完全一致),本地已配置pip包管理工具。

3. 待上传文件:将所有需要上传的大文件统一存放在本地同一个文件夹中,建议提前梳理文件列表避免遗漏,暂存文件、临时文件可提前清理减少无效遍历。

4. 网络环境:本地网络可正常访问存储服务的Endpoint,无防火墙、安全组或代理拦截对应域名的请求。

三、操作步骤

步骤1:环境配置与SDK安装

打开终端执行以下命令安装依赖包,其中boto3是AWS官方维护的S3 SDK,tqdm用于可视化展示上传进度:

```bash

pip install boto3 tqdm

```

为避免硬编码泄露密钥,推荐通过环境变量存储鉴权信息:

  • Windows系统在CMD中执行:set AWS_ACCESS_KEY_ID=你的AKset AWS_SECRET_ACCESS_KEY=你的SK
  • Linux/macOS系统在终端执行:export AWS_ACCESS_KEY_ID=你的AKexport AWS_SECRET_ACCESS_KEY=你的SK

步骤2:核心上传逻辑开发

#### (1)初始化S3客户端

按照存储服务提供的参数初始化S3客户端,若使用七彩云对象存储,仅需替换Endpoint为对应服务地址,其余参数逻辑与原生S3完全一致。

#### (2)实现单文件分片上传函数

设置分片大小为8MB(符合S3协议除最后一个分片外最小5MB的要求,兼顾重试成本和请求效率),先调用create_multipart_upload接口初始化分片任务,拆分本地文件为多个分片依次上传,同时加入断点续传逻辑:上传前调用list_parts接口查询已上传的分片,跳过已完成的分片避免重复上传,所有分片上传完成后调用complete_multipart_upload接口合并分片。

#### (3)实现批量上传控制逻辑

通过os.listdir遍历指定本地文件夹的所有文件,过滤掉子文件夹和临时文件,使用线程池控制并发上传数量,建议默认设置为2-3个并发,避免占满本地带宽。

步骤3:测试与全量运行

1. 先选择2-3个测试大文件执行上传脚本,验证三项内容:存储桶中是否生成对应文件、文件大小与本地是否一致、下载后内容可正常打开。

2. 验证无误后,在代码中加入日志输出,记录每个文件的上传成功/失败状态、失败原因,再执行全量批量上传任务,运行过程中可通过进度条查看整体进度。

3. 上传完成后导出失败文件列表,针对网络波动导致的失败任务单独重试即可。

四、常见错误

  • Endpoint填写错误:误将桶名加入Endpoint路径,或者错填http/https协议,例如七彩云对象存储的Endpoint为https://s3.qicaiyun.com,不需要额外加桶名前缀,否则会请求失败。
  • Region错误:原生S3的桶所在Region必须和初始化客户端时的Region一致,否则会报跨区域错误,七彩云对象存储默认使用通用Region即可,无需额外配置。
  • 权限问题:使用的AK/SK没有对应存储桶的s3:PutObjects3:ListMultipartUploadPartss3:PutObjectPart等权限,会导致分片上传被拒绝,需要提前在权限策略中配置对应权限。
  • 分片大小不符合规范:除了最后一个分片外,其余分片最小为5MB,如果设置的分片大小小于5MB会直接报错。
  • 特殊字符问题:文件名包含中文、空格或特殊符号时,手动修改文件名编码会导致上传失败,SDK默认会处理编码逻辑,无需额外调整。

五、示例说明

以下是可直接运行的Python示例代码,仅需修改配置参数即可使用:

```python

import boto3

import os

from tqdm import tqdm

from concurrent.futures import ThreadPoolExecutor, as_completed

配置参数

ENDPOINT = "https://s3.qicaiyun.com" # 原生S3填对应官方Endpoint,七彩云对象存储填此地址即可

REGION = "auto" # 七彩云默认auto即可,原生S3填桶所在Region

BUCKET_NAME = "你的桶名"

LOCAL_FOLDER = "/path/to/your/local/files" # 本地大文件所在文件夹路径

PART_SIZE = 8 * 1024 * 1024 # 分片大小8MB

MAX_WORKERS = 2 # 最大并发上传数

初始化S3客户端

s3_client = boto3.client(

's3',

endpoint_url=ENDPOINT,

region_name=REGION,

aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),

aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY")

)

def upload_large_file(local_file_path, bucket, s3_key=None):

"""单个大文件分片上传,支持断点续传"""

if s3_key is None:

s3_key = os.path.basename(local_file_path)

file_size = os.path.getsize(local_file_path)

小于分片大小的文件直接普通上传

if file_size < PART_SIZE:

s3_client.upload_file(local_file_path, bucket, s3_key)

return f"上传成功:{s3_key}"

初始化分片上传

mpu = s3_client.create_multipart_upload(Bucket=bucket, Key=s3_key)

upload_id = mpu['UploadId']

parts = []

uploaded_size = 0

try:

计算总分片数

total_parts = (file_size + PART_SIZE - 1) // PART_SIZE

with open(local_file_path, 'rb') as f:

for part_number in range(1, total_parts + 1):

跳过已上传的分片

try:

existing_parts = s3_client.list_parts(

Bucket=bucket, Key=s3_key, UploadId=upload_id, PartNumberMarker=part_number-1

)

if 'Parts' in existing_parts:

for part in existing_parts['Parts']:

if part['PartNumber'] == part_number:

parts.append({'PartNumber': part['PartNumber'], 'ETag': part['ETag']})

uploaded_size += PART_SIZE

f.seek(uploaded_size)

continue

except Exception:

pass

读取分片数据并上传

data = f.read(PART_SIZE)

response = s3_client.upload_part(

Bucket=bucket, Key=s3_key, PartNumber=part_number, UploadId=upload_id, Body=data

)

parts.append({'PartNumber': part_number, 'ETag': response['ETag']})

uploaded_size += len(data)

合并所有分片

s3_client.complete_multipart_upload(

Bucket=bucket, Key=s3_key, UploadId=upload_id, MultipartUpload={'Parts': parts}

)

return f"上传成功:{s3_key}"

except Exception as e:

上传失败则终止分片任务,避免占用服务端存储

s3_client.abort_multipart_upload(Bucket=bucket, Key=s3_key, UploadId=upload_id)

return f"上传失败:{s3_key},错误信息:{str(e)}"

if __name__ == "__main__":

遍历获取所有待上传文件路径

file_list = [os.path.join(LOCAL_FOLDER, filename) for filename in os.listdir(LOCAL_FOLDER) if os.path.isfile(os.path.join(LOCAL_FOLDER, filename))]

print(f"待上传文件总数:{len(file_list)}")

批量并发上传

with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:

futures = [executor.submit(upload_large_file, file_path, BUCKET_NAME) for file_path in file_list]

for future in tqdm(as_completed(futures), total=len(futures), desc="总上传进度"):

print(future.result())

```

六、更简单的方案

如果不想自己搭建和运维原生S3集群,也不需要处理复杂的跨区域配置、性能优化问题,可以直接使用兼容S3协议的对象存储服务,比如七彩云对象存储。它完全兼容S3标准API,上述代码不需要做任何逻辑修改,只需要将Endpoint替换为七彩云提供的S3地址、配置对应AK/SK即可直接使用,接入门槛极低,同时服务端默认优化了分片上传的稳定性,提供国内多区域节点,大文件上传速度更快,还自带断点续传、流量管控等附加能力,不需要额外开发。

七、FAQ

1. 多大的文件需要使用分片上传?

S3普通上传最大支持5GB的单个文件,超过5GB的文件必须使用分片上传,实际使用中建议单个文件超过100MB时就采用分片上传,既可以降低网络波动导致的重传成本,也能提升上传成功率。

2. 批量上传过程中如果程序中断,已经上传的内容会丢失吗?

不会,分片上传的未完成任务会在服务端保留一定时长(原生S3默认保留7天,七彩云对象存储默认保留30天),重新运行上传脚本时,代码会自动识别已上传的分片,不需要重复上传,仅需要上传未完成的部分即可。

3. 如何避免批量上传占满本地带宽?

可以通过两个方式控制:一是调整代码中的MAX_WORKERS参数,降低并发上传的文件数量,比如设置为1即一次只传1个文件;二是在读取文件分片后加入限流逻辑,控制每秒上传的字节数,避免带宽被占满影响其他业务。

4. 上传后的文件如何验证完整性?

可以在上传完成后,调用S3 HeadObject接口获取文件的ETag,和本地计算的文件哈希做对比:普通上传的ETag等于文件MD5,分片上传的ETag是所有分片MD5拼接后再计算的MD5,可根据分片列表计算对应值校验,也可以下载部分分片或者完整文件做内容对比。

八、总结

实现S3大文件批量上传的核心逻辑并不复杂,只要按照准备开发环境、开发分片上传与批量控制逻辑、测试验证三个步骤执行即可快速落地。对于新手而言,建议先使用小批量测试文件验证逻辑正确性,再运行全量上传任务,同时提前配置失败重试和日志记录,避免出现问题无法定位。如果想要降低运维和开发成本,也可以直接选择七彩云对象存储这类兼容S3协议的成熟对象存储服务,不需要修改原有代码即可快速接入,获得更稳定的上传体验和更低的使用成本。

需要稳定、兼容 S3 的对象存储?

七彩云对象存储适合图片、视频、大文件下载、静态资源托管和开发者接入。

访问七彩云官网

相关文章

下载分发方案 / / 7 分钟阅读

下载站怎么搭配对象存储做CDN回源

一、结论 下载站只需将静态下载资源存入对象存储,再将CDN的大文件回源地址指向对象存储的公网访问域名,即可实现用户请求优先走CDN缓存,无缓存时自动从对象存储拉取资源,无需回源到自有源站,大幅降低源站带宽压力和运维成本。

下载分发方案 / / 6 分钟阅读

S3对象存储下载慢是什么原因怎么解决

一、结论 S3对象存储下载慢主要由网络链路匹配度低、存储桶配置不合理、资源访问规则不当三类原因导致,按照本教程逐步排查调整,无需额外付费就能大幅提升下载速率,适配业务需求。如果选用兼容S3协议的托管对象存储服务,还能进一步降低配置复杂度,省去手动优化的流程。

下载分发方案 / / 7 分钟阅读

怎么用S3 SDK实现大文件分片上传功能

一、结论 基于S3协议的大文件分片上传核心分为三个核心环节:首先初始化分片任务获取唯一标识UploadId,然后按协议要求切割文件为分片并行上传并记录每个分片的序号与ETag值,最终调用合并接口完成完整文件的拼接,所有环节都可以通过官方S3 SDK的标准化接口实现,无需自行封装底层签名、请求逻辑。