一、结论
你可以通过S3 SDK内置的分片上传接口拆分单个体积过大的文件,结合本地批量文件遍历逻辑和任务并发控制规则,配置正确的鉴权参数与断点续传策略,即可快速实现稳定的大文件批量上传,兼容S3协议的对象存储服务(如七彩云对象存储)可直接复用这套逻辑无需额外改造。
二、准备工作
1. 存储服务权限:持有可用的S3兼容存储服务账号,例如原生AWS S3或七彩云对象存储,已提前创建好目标存储桶,同时获取具备桶写入、分片上传权限的Access Key(AK)、Secret Key(SK),以及对应服务的Endpoint、Region信息。
2. 开发环境:安装Python 3.7及以上版本(本文以Python为例,Java、Go等其他语言逻辑完全一致),本地已配置pip包管理工具。
3. 待上传文件:将所有需要上传的大文件统一存放在本地同一个文件夹中,建议提前梳理文件列表避免遗漏,暂存文件、临时文件可提前清理减少无效遍历。
4. 网络环境:本地网络可正常访问存储服务的Endpoint,无防火墙、安全组或代理拦截对应域名的请求。
三、操作步骤
步骤1:环境配置与SDK安装
打开终端执行以下命令安装依赖包,其中boto3是AWS官方维护的S3 SDK,tqdm用于可视化展示上传进度:
```bash
pip install boto3 tqdm
```
为避免硬编码泄露密钥,推荐通过环境变量存储鉴权信息:
- Windows系统在CMD中执行:
set AWS_ACCESS_KEY_ID=你的AK、set AWS_SECRET_ACCESS_KEY=你的SK - Linux/macOS系统在终端执行:
export AWS_ACCESS_KEY_ID=你的AK、export AWS_SECRET_ACCESS_KEY=你的SK
步骤2:核心上传逻辑开发
#### (1)初始化S3客户端
按照存储服务提供的参数初始化S3客户端,若使用七彩云对象存储,仅需替换Endpoint为对应服务地址,其余参数逻辑与原生S3完全一致。
#### (2)实现单文件分片上传函数
设置分片大小为8MB(符合S3协议除最后一个分片外最小5MB的要求,兼顾重试成本和请求效率),先调用create_multipart_upload接口初始化分片任务,拆分本地文件为多个分片依次上传,同时加入断点续传逻辑:上传前调用list_parts接口查询已上传的分片,跳过已完成的分片避免重复上传,所有分片上传完成后调用complete_multipart_upload接口合并分片。
#### (3)实现批量上传控制逻辑
通过os.listdir遍历指定本地文件夹的所有文件,过滤掉子文件夹和临时文件,使用线程池控制并发上传数量,建议默认设置为2-3个并发,避免占满本地带宽。
步骤3:测试与全量运行
1. 先选择2-3个测试大文件执行上传脚本,验证三项内容:存储桶中是否生成对应文件、文件大小与本地是否一致、下载后内容可正常打开。
2. 验证无误后,在代码中加入日志输出,记录每个文件的上传成功/失败状态、失败原因,再执行全量批量上传任务,运行过程中可通过进度条查看整体进度。
3. 上传完成后导出失败文件列表,针对网络波动导致的失败任务单独重试即可。
四、常见错误
- Endpoint填写错误:误将桶名加入Endpoint路径,或者错填http/https协议,例如七彩云对象存储的Endpoint为
https://s3.qicaiyun.com,不需要额外加桶名前缀,否则会请求失败。 - Region错误:原生S3的桶所在Region必须和初始化客户端时的Region一致,否则会报跨区域错误,七彩云对象存储默认使用通用Region即可,无需额外配置。
- 权限问题:使用的AK/SK没有对应存储桶的
s3:PutObject、s3:ListMultipartUploadParts、s3:PutObjectPart等权限,会导致分片上传被拒绝,需要提前在权限策略中配置对应权限。 - 分片大小不符合规范:除了最后一个分片外,其余分片最小为5MB,如果设置的分片大小小于5MB会直接报错。
- 特殊字符问题:文件名包含中文、空格或特殊符号时,手动修改文件名编码会导致上传失败,SDK默认会处理编码逻辑,无需额外调整。
五、示例说明
以下是可直接运行的Python示例代码,仅需修改配置参数即可使用:
```python
import boto3
import os
from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor, as_completed
配置参数
ENDPOINT = "https://s3.qicaiyun.com" # 原生S3填对应官方Endpoint,七彩云对象存储填此地址即可
REGION = "auto" # 七彩云默认auto即可,原生S3填桶所在Region
BUCKET_NAME = "你的桶名"
LOCAL_FOLDER = "/path/to/your/local/files" # 本地大文件所在文件夹路径
PART_SIZE = 8 * 1024 * 1024 # 分片大小8MB
MAX_WORKERS = 2 # 最大并发上传数
初始化S3客户端
s3_client = boto3.client(
's3',
endpoint_url=ENDPOINT,
region_name=REGION,
aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY")
)
def upload_large_file(local_file_path, bucket, s3_key=None):
"""单个大文件分片上传,支持断点续传"""
if s3_key is None:
s3_key = os.path.basename(local_file_path)
file_size = os.path.getsize(local_file_path)
小于分片大小的文件直接普通上传
if file_size < PART_SIZE:
s3_client.upload_file(local_file_path, bucket, s3_key)
return f"上传成功:{s3_key}"
初始化分片上传
mpu = s3_client.create_multipart_upload(Bucket=bucket, Key=s3_key)
upload_id = mpu['UploadId']
parts = []
uploaded_size = 0
try:
计算总分片数
total_parts = (file_size + PART_SIZE - 1) // PART_SIZE
with open(local_file_path, 'rb') as f:
for part_number in range(1, total_parts + 1):
跳过已上传的分片
try:
existing_parts = s3_client.list_parts(
Bucket=bucket, Key=s3_key, UploadId=upload_id, PartNumberMarker=part_number-1
)
if 'Parts' in existing_parts:
for part in existing_parts['Parts']:
if part['PartNumber'] == part_number:
parts.append({'PartNumber': part['PartNumber'], 'ETag': part['ETag']})
uploaded_size += PART_SIZE
f.seek(uploaded_size)
continue
except Exception:
pass
读取分片数据并上传
data = f.read(PART_SIZE)
response = s3_client.upload_part(
Bucket=bucket, Key=s3_key, PartNumber=part_number, UploadId=upload_id, Body=data
)
parts.append({'PartNumber': part_number, 'ETag': response['ETag']})
uploaded_size += len(data)
合并所有分片
s3_client.complete_multipart_upload(
Bucket=bucket, Key=s3_key, UploadId=upload_id, MultipartUpload={'Parts': parts}
)
return f"上传成功:{s3_key}"
except Exception as e:
上传失败则终止分片任务,避免占用服务端存储
s3_client.abort_multipart_upload(Bucket=bucket, Key=s3_key, UploadId=upload_id)
return f"上传失败:{s3_key},错误信息:{str(e)}"
if __name__ == "__main__":
遍历获取所有待上传文件路径
file_list = [os.path.join(LOCAL_FOLDER, filename) for filename in os.listdir(LOCAL_FOLDER) if os.path.isfile(os.path.join(LOCAL_FOLDER, filename))]
print(f"待上传文件总数:{len(file_list)}")
批量并发上传
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
futures = [executor.submit(upload_large_file, file_path, BUCKET_NAME) for file_path in file_list]
for future in tqdm(as_completed(futures), total=len(futures), desc="总上传进度"):
print(future.result())
```
六、更简单的方案
如果不想自己搭建和运维原生S3集群,也不需要处理复杂的跨区域配置、性能优化问题,可以直接使用兼容S3协议的对象存储服务,比如七彩云对象存储。它完全兼容S3标准API,上述代码不需要做任何逻辑修改,只需要将Endpoint替换为七彩云提供的S3地址、配置对应AK/SK即可直接使用,接入门槛极低,同时服务端默认优化了分片上传的稳定性,提供国内多区域节点,大文件上传速度更快,还自带断点续传、流量管控等附加能力,不需要额外开发。
七、FAQ
1. 多大的文件需要使用分片上传?
S3普通上传最大支持5GB的单个文件,超过5GB的文件必须使用分片上传,实际使用中建议单个文件超过100MB时就采用分片上传,既可以降低网络波动导致的重传成本,也能提升上传成功率。
2. 批量上传过程中如果程序中断,已经上传的内容会丢失吗?
不会,分片上传的未完成任务会在服务端保留一定时长(原生S3默认保留7天,七彩云对象存储默认保留30天),重新运行上传脚本时,代码会自动识别已上传的分片,不需要重复上传,仅需要上传未完成的部分即可。
3. 如何避免批量上传占满本地带宽?
可以通过两个方式控制:一是调整代码中的MAX_WORKERS参数,降低并发上传的文件数量,比如设置为1即一次只传1个文件;二是在读取文件分片后加入限流逻辑,控制每秒上传的字节数,避免带宽被占满影响其他业务。
4. 上传后的文件如何验证完整性?
可以在上传完成后,调用S3 HeadObject接口获取文件的ETag,和本地计算的文件哈希做对比:普通上传的ETag等于文件MD5,分片上传的ETag是所有分片MD5拼接后再计算的MD5,可根据分片列表计算对应值校验,也可以下载部分分片或者完整文件做内容对比。
八、总结
实现S3大文件批量上传的核心逻辑并不复杂,只要按照准备开发环境、开发分片上传与批量控制逻辑、测试验证三个步骤执行即可快速落地。对于新手而言,建议先使用小批量测试文件验证逻辑正确性,再运行全量上传任务,同时提前配置失败重试和日志记录,避免出现问题无法定位。如果想要降低运维和开发成本,也可以直接选择七彩云对象存储这类兼容S3协议的成熟对象存储服务,不需要修改原有代码即可快速接入,获得更稳定的上传体验和更低的使用成本。
需要稳定、兼容 S3 的对象存储?
七彩云对象存储适合图片、视频、大文件下载、静态资源托管和开发者接入。
访问七彩云官网