融合接口
功能说明
分片上传步骤较多,包括初始化、文件切段、各个分片上传、完成上传。为了简化分片上传,SDK提供了方便的融合接口用于上传文件,用户不需要关心文件的大小,SDK会自动对大文件使用分片上传。
代码示例
import botocore.config
import botocore.session
import s3transfer.manager
import datetime
class TransferDemo(object):
def __init__(self):
config = botocore.config.Config(
signature_version='s3v4', # 签名版本,s3 or s3v4
)
self.bucket = '<your-bucket-name>'
session = botocore.session.get_session()
self.s3_client = session.create_client(
's3',
aws_access_key_id='<your-access-key>',
aws_secret_access_key='<your-secret-key>',
endpoint_url='<your-endpoint>',
config=config)
MB = 1024 * 1024
transConfig = s3transfer.manager.TransferConfig(
multipart_threshold=5 * MB, # 大于该值使用分片上传
multipart_chunksize=5 * MB, # 分片大小
max_request_concurrency=2,
)
# 设置带宽,不填表示不限制
# transConfig.max_bandwidth = 1 * MB
self.transfer = s3transfer.manager.TransferManager(self.s3_client, transConfig)
def upload(self):
print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f'), "upload start")
key = '<your-object-key>'
# 扩展配置,可以设置ContentType和ACL
extraArgs = {'ContentType': 'text/plain', 'ACL': 'public-read'}
with open('<file-path>', 'rb') as f:
future = self.transfer.upload(f, self.bucket, key, extra_args=extraArgs)
future.result()
请求参数
参数 | 类型 | 说明 |
---|
bucket | string | 桶名 |
key | string | 对象名 |
fileobj | fileobj | 打开的文件对象 |
extra_args | dict | 扩展配置,可以设置ContentType和ACL;ACL取值private | public-read | public-read-write |
TransferConfig参数,
参数 | 类型 | 说明 |
---|
multipart_threshold | int | 大于该值使用分片上传 |
multipart_chunksize | int | 分片大小,默认5MB |
max_request_concurrency | int | 上传分片并发数 |
关于Content-Type的配置
Content-Type用于标识文件的资源类型,比如image/png
, image/jpg
是图片类型,video/mpeg
, video/mp4
是视频类型,text/plain
, text/html
是文本类型, 浏览器针对不同的Content-Type会有不同的操作,比如图片类型可以预览,视频类型可以播放,文本类型可以直接打开。application/octet-stream
类型会直接打开下载窗口。
有些用户反馈图片和视频无法预览的问题,主要就是Content-Type没有正确设置导致的;Content-Type参数需要用户主动设置,默认是application/octet-stream
。在python sdk中,可以根据对象key值后缀扩展名来决定文件的Content-Type,参考代码如下:
import mimetypes
def mime_type(key):
mt = mimetypes.guess_type(key)[0]
if mt == None :
return ""
return mt
初始化分片上传任务
功能说明
您可以使用create_multipart_upload接口创建上传任务。
代码示例
# create_multipart_upload
resp = self.s3_client.create_multipart_upload(
Bucket='<your-bucket-name>',
Key='<your-object-key>'
)
upload_id = resp['UploadId']
print('create_multipart_upload success upload_id: %s' %upload_id)
请求参数
参数 | 类型 | 说明 | 是否必要 |
---|
Bucket | string | 桶名称 | 是 |
Key | string | 对象key | 是 |
返回结果
参数 | 类型 | 说明 |
---|
UploadId | string | 分片上传任务的id |
上传分片
功能说明
初始化分片上传任务后,指定分片上传任务的id可以上传分片数据,可以将大文件分割成分片后上传,除了最后一个分片,每个分片的数据大小为5MB~5GB,每个分片上传任务最多上传10000个分片。您可以使用upload_part上传分片。
代码示例
def multipart_upload(self):
bucket = '<your-bucket-name>'
key = '<your-object-key>'
local_path = '<file-path>'
parts = [] # part list uploaded by client
part_size = 5 * 1024 * 1024
# create_multipart_upload
resp = self.s3_client.create_multipart_upload(
Bucket=bucket,
Key=key
)
upload_id = resp['UploadId']
print('create_multipart_upload success upload_id: %s' %upload_id)
# upload_part
with open(local_path, 'rb') as f:
s = f.read(part_size)
part_num = 1
while s:
file_chunk = io.BytesIO(s)
resp = self.s3_client.upload_part(
Bucket=bucket,
Key=key,
Body=file_chunk,
UploadId=upload_id,
PartNumber=part_num,
)
print('upload part %d success' %part_num)
part = {
'ETag': resp['ETag'],
'PartNumber': part_num
}
parts.append(part)
s = f.read(part_size)
part_num += 1
# complete_multipart_upload
resp = self.s3_client.complete_multipart_upload(
Bucket=bucket,
Key=key,
UploadId=upload_id,
MultipartUpload={
'Parts': parts
},
)
print('complete_multipart_upload success upload_id: %s' %upload_id)
请求参数
参数 | 类型 | 说明 | 是否必要 |
---|
Bucket | string | 桶名称 | 是 |
Key | string | 对象key | 是 |
Body | bytes|file | 对象的数据 | 是 |
PartNumber | int | 当前分片号码 | 是 |
UploadId | string | 通过创建上传任务接口获取到的任务Id | 是 |
返回结果
参数 | 类型 | 说明 |
---|
ETag | string | 本次上传分片对应的Entity Tag |
合并分片
功能说明
合并指定分片上传任务id对应任务中已上传的对象分片,使之成为一个完整的文件。您可以使用complete_multipart_upload接口合并分片。
代码示例
# complete_multipart_upload
resp = self.s3_client.complete_multipart_upload(
Bucket='<your-bucket-name>',
Key='<your-object-key>',
UploadId=upload_id,
MultipartUpload={
'Parts': parts
},
)
print('complete_multipart_upload success upload_id: %s' %upload_id)
请求参数
参数 | 类型 | 说明 | 是否必要 |
---|
Bucket | string | 桶名称 | 是 |
Key | string | 对象key | 是 |
MultipartUpload | MultipartUpload | 包含了每个已上传的分片的ETag和PartNUmber等信息 | 是 |
UploadId | string | 通过创建上传任务接口获取到的任务Id | 是 |
返回结果
参数 | 类型 | 说明 |
---|
ETag | string | 本次上传对象后对应的Entity Tag |
列举分片上传任务
功能说明
您可以使用list_multipart_uploads获取未完成的上传任务。
代码示例
def list_multipart_uploads(self):
print('list_multipart_uploads')
prefix = '<your-object-key>'
resp = self.s3_client.list_multipart_uploads(
Bucket='<your-bucket-name>',
Prefix=prefix,
MaxUploads=50,
)
for task in resp['Uploads']:
print('key: %s, id: %s' %(task['Key'], task['UploadId']))
请求参数
参数 | 类型 | 说明 | 是否必要 |
---|
Bucket | string | 桶名称 | 是 |
MaxUploads | int | 用于指定获取任务的最大数量(1-1000) | 否 |
Prefix | string | 指定上传对象key的前缀 | 否 |
返回结果
参数 | 类型 | 说明 |
---|
Uploads | Uploads | 包含了零个或多个已初始化的上传分片信息的数组。数组中的每一项包含了分片初始化时间、分片上传操作发起者、对象key、对象拥有者、存储类型和UploadId等信息 |
列举已上传的分片
功能说明
您可以使用list_parts获取一个未完成的上传任务中已完成上传的分片信息。
代码示例
def list_parts(self):
print('list_parts')
key = '<your-object-key>'
resp = self.s3_client.list_parts(
Bucket='<your-bucket-name>',
Key=key,
UploadId='<upload id>',
)
print(resp)
请求参数
参数 | 类型 | 说明 | 是否必要 |
---|
Bucket | string | 桶名称 | 是 |
Key | string | 对象key | 是 |
UploadId | string | 指定返回该任务id所属的分片上传的分片信息 | 是 |
返回结果
参数 | 类型 | 说明 |
---|
Parts | Parts | 包含了已上传分片信息的数组,数组中的每一项包含了该分片的Entity tag、最后修改时间、PartNumber和大小等信息 |
复制分片
功能说明
复制分片操作可以从一个已存在的对象中拷贝指定分片的数据,当拷贝的对象大小超过5GB,必须使用复制分片操作完成对象的复制。除了最后一个分片外,每个拷贝分片的大小范围是[5MB,5GB]。在一个在拷贝大对象之前,需要使用初始化分片上传操作获取一个upload id,在完成拷贝操作之后,需要使用合并分片操作组装已拷贝的分片成为一个对象。您可以使用upload_part_copy复制一个分片。
代码示例
def multipart_copy(self):
bucket = '<dst-bucket-name>'
key = '<dst-object-key>'
source_bucket = '<source-bucket-name>'
source_key = '<source-object-key>'
parts = [] # part list uploaded by client
part_size = 5 * 1024 * 1024
# head_object
resp = self.s3_client.head_object(
Bucket=bucket,
Key=source_key
)
print('head_object success length: %s' %resp['ContentLength'])
size = resp['ContentLength']
# create_multipart_upload
resp = self.s3_client.create_multipart_upload(
Bucket=bucket,
Key=key
)
upload_id = resp['UploadId']
print('create_multipart_upload success upload_id: %s' %upload_id)
# upload_part
start = 0
part_num = 1
while start < size:
if start + part_size < size:
end = start + part_size - 1
else:
end = size - 1
resp = self.s3_client.upload_part_copy(
Bucket=bucket,
Key=key,
CopySource={'Bucket': source_bucket, 'Key': source_key},
UploadId=upload_id,
PartNumber=part_num,
CopySourceRange='bytes=%d-%d' %(start, end)
)
print('copy part %d success' %part_num)
part = {
'ETag': resp['CopyPartResult']['ETag'],
'PartNumber': part_num
}
parts.append(part)
start = end + 1
part_num += 1
# complete_multipart_upload
resp = self.s3_client.complete_multipart_upload(
Bucket=bucket,
Key=key,
UploadId=upload_id,
MultipartUpload={
'Parts': parts
},
)
print('complete_multipart_upload success upload_id: %s' %upload_id)
请求参数
参数 | 类型 | 说明 | 是否必要 |
---|
Bucket | string | 桶名称 | 是 |
Key | string | 目的对象key | 是 |
PartNumber | int | 当前分片号码 | 是 |
UploadId | string | 通过创建上传任务接口获取到的任务Id | 是 |
CopySource | string | 源对象 | 是 |
CopySourceRange | string | 指定本次分片拷贝的数据范围,必须是"bytes=first-last"的格式,例如"bytes=0-9"表示拷贝原对象中前10字节的数据,只有当拷贝的分片大小大于5MB的时候有效 | 否 |
关于CopySource:
参数 | 类型 | 说明 | 是否必要 |
---|
Bucket | string | 桶名称 | 是 |
Key | string | 源对象key | 是 |
返回结果
参数 | 类型 | 说明 |
---|
CopyPartResult | CopyPartResult | 包含拷贝分片的Entity Tag和最后修改时间等信息 |
取消分片上传任务
功能说明
您可以使用abort_multipart_upload终止一个分片上传任务。
代码示例
def abort_multipart_upload(self):
print('abort_multipart_upload')
key = '<your-object-key>'
upload_id = '<upload-id>'
resp = self.s3_client.abort_multipart_upload(
Bucket='<your-bucket-name>',
Key=key,
UploadId=upload_id,
)
print(resp)
请求参数
参数 | 类型 | 说明 | 是否必要 |
---|
Bucket | string | 桶名称 | 是 |
Key | string | 对象key | 是 |
UploadId | string | 需要终止的上传任务id | 是 |