2023-12-20 10:40:20 7阅读
对象存储融合版java sdk提供了标准的分片上传接口,可以使用这些接口实现分片上传和断点续传。
初始化分片上传任务接口
初始化分片上传任务使用InitiateMultipartUpload接口,调用成功之后服务器会返回一个uploadId,这个uploadId就是这次上传任务的Id,通过这个Id可以管理这个上传任务。
public String InitiateMultipartUpload(String bucket, String key) {
System.out.println("InitiateMultipartUpload");
InitiateMultipartUploadRequest initReq = new InitiateMultipartUploadRequest(bucket, key);
initReq.withCannedACL(CannedAccessControlList.PublicRead);
InitiateMultipartUploadResult initRes = s3Client.initiateMultipartUpload(initReq);
System.out.println("InitiateMultipartUpload success, uploadId=" + initRes.getUploadId());
return initRes.getUploadId();
}
上传分片接口
获取到UploadId之后,可以使用这个Id来进行分片的上传,用户需要定义一个分片大小,自己对需要上传的文件进行切片,分片大小不能小于5MB(最后一个分片例外)。
public String UploadPart(String bucket, String key, String uploadId, int partNumber, byte[] buf) {
UploadPartRequest partReq = new UploadPartRequest();
partReq.setBucketName(bucket);
partReq.setKey(key);
partReq.setUploadId(uploadId);
partReq.setInputStream(new ByteArrayInputStream(buf));
partReq.setPartNumber(partNumber); // 从1开始
partReq.setObjectMetadata(new ObjectMetadata());
partReq.setPartSize(buf.length);
System.out.printf("UploadPart %d, currentPartSize=%d\n", partNumber, buf.length);
UploadPartResult partRes = s3Client.uploadPart(partReq);
System.out.printf("UploadPart %d success, etag=%s\n", partNumber, partRes.getETag());
return partRes.getETag();
}
完成分片上传接口
上传完所有分片之后,需要调用完成分片上传接口,服务器会合并所有分片,并把分片上传任务标记完成。
public String CompleteMultipartUpload(String bucket, String key, String uploadId, List<PartETag> partETags) {
System.out.println("CompleteMultipartUpload");
CompleteMultipartUploadRequest completeReq =
new CompleteMultipartUploadRequest(bucket, key, uploadId, partETags);
CompleteMultipartUploadResult completeRes = s3Client.completeMultipartUpload(completeReq);
System.out.println("CompleteMultipartUpload success, etag=" + completeRes.getETag());
return completeRes.getETag();
}
获取已经上传的分片列表接口
分片任务完成之前,可以调用ListParts来获取已经上传的分片列表,使用这个接口,就可以实现断点续传的功能,在上传分片的时候,对于已经完成的分片,不需要重复上传。
public Map<Integer, PartETag> ListParts(String bucket, String key, String uploadId) {
System.out.println("ListParts");
ListPartsRequest listParts = new ListPartsRequest(bucket, key, uploadId);
PartListing partListing = this.s3Client.listParts(listParts);
Map<Integer, PartETag> partETagsMap = new HashMap<>();
for (PartSummary part : partListing.getParts()) {
partETagsMap.put(part.getPartNumber(), new PartETag(part.getPartNumber(), part.getETag()));
}
return partETagsMap;
}
断点续传
上传任务没有完成,可以保存uploadId,下次需要重新启动上传任务的时候,通过UploadId获取到已经上传的分片列表,实现断点续传。
static class UploadParam {
public String bucket;
public String key;
public File file;
public int partSize;
public int lastPartSize;
public int totalPartNum;
}
public void resumeUpload(UploadParam param, String uploadId) {
System.out.println("resumeUpload start");
// 根据uploadId获取已经上传成功的分片,断点续传的时候可以不需要上传这些分片
Map<Integer, PartETag> partETagsMap = this.ListParts(param.bucket, param.key, uploadId);
try {
List<PartETag> partETags = new ArrayList<>();
InputStream stream = new FileInputStream(param.file);
for (int i = 1; i <= param.totalPartNum; i++){
// 计算当前分片大小
int currentPartSize = param.partSize;
if (param.lastPartSize != 0 && i == param.totalPartNum){
currentPartSize = param.lastPartSize;
}
PartETag partEtag = partETagsMap.get(i);
if (partEtag != null) {
// 跳过,不需要上传此分片
partETags.add(new PartETag(partEtag.getPartNumber(), partEtag.getETag()));
stream.skip(currentPartSize);
continue;
}
// 读取分片内容
byte[] buf = new byte[currentPartSize];
stream.read(buf);
// 上传分片
String etag = this.UploadPart(param.bucket, param.key, uploadId, i, buf);
partETags.add(new PartETag(i, etag));
}
// 完成上传
this.CompleteMultipartUpload(param.bucket, param.key, uploadId, partETags);
} catch (Exception e) {
System.out.println("resumeUpload: error=" + e.getMessage());
}
}
2023-12-20 10:40:20 7阅读
对象存储融合版java sdk提供了标准的分片上传接口,可以使用这些接口实现分片上传和断点续传。
初始化分片上传任务接口
初始化分片上传任务使用InitiateMultipartUpload接口,调用成功之后服务器会返回一个uploadId,这个uploadId就是这次上传任务的Id,通过这个Id可以管理这个上传任务。
public String InitiateMultipartUpload(String bucket, String key) {
System.out.println("InitiateMultipartUpload");
InitiateMultipartUploadRequest initReq = new InitiateMultipartUploadRequest(bucket, key);
initReq.withCannedACL(CannedAccessControlList.PublicRead);
InitiateMultipartUploadResult initRes = s3Client.initiateMultipartUpload(initReq);
System.out.println("InitiateMultipartUpload success, uploadId=" + initRes.getUploadId());
return initRes.getUploadId();
}
上传分片接口
获取到UploadId之后,可以使用这个Id来进行分片的上传,用户需要定义一个分片大小,自己对需要上传的文件进行切片,分片大小不能小于5MB(最后一个分片例外)。
public String UploadPart(String bucket, String key, String uploadId, int partNumber, byte[] buf) {
UploadPartRequest partReq = new UploadPartRequest();
partReq.setBucketName(bucket);
partReq.setKey(key);
partReq.setUploadId(uploadId);
partReq.setInputStream(new ByteArrayInputStream(buf));
partReq.setPartNumber(partNumber); // 从1开始
partReq.setObjectMetadata(new ObjectMetadata());
partReq.setPartSize(buf.length);
System.out.printf("UploadPart %d, currentPartSize=%d\n", partNumber, buf.length);
UploadPartResult partRes = s3Client.uploadPart(partReq);
System.out.printf("UploadPart %d success, etag=%s\n", partNumber, partRes.getETag());
return partRes.getETag();
}
完成分片上传接口
上传完所有分片之后,需要调用完成分片上传接口,服务器会合并所有分片,并把分片上传任务标记完成。
public String CompleteMultipartUpload(String bucket, String key, String uploadId, List<PartETag> partETags) {
System.out.println("CompleteMultipartUpload");
CompleteMultipartUploadRequest completeReq =
new CompleteMultipartUploadRequest(bucket, key, uploadId, partETags);
CompleteMultipartUploadResult completeRes = s3Client.completeMultipartUpload(completeReq);
System.out.println("CompleteMultipartUpload success, etag=" + completeRes.getETag());
return completeRes.getETag();
}
获取已经上传的分片列表接口
分片任务完成之前,可以调用ListParts来获取已经上传的分片列表,使用这个接口,就可以实现断点续传的功能,在上传分片的时候,对于已经完成的分片,不需要重复上传。
public Map<Integer, PartETag> ListParts(String bucket, String key, String uploadId) {
System.out.println("ListParts");
ListPartsRequest listParts = new ListPartsRequest(bucket, key, uploadId);
PartListing partListing = this.s3Client.listParts(listParts);
Map<Integer, PartETag> partETagsMap = new HashMap<>();
for (PartSummary part : partListing.getParts()) {
partETagsMap.put(part.getPartNumber(), new PartETag(part.getPartNumber(), part.getETag()));
}
return partETagsMap;
}
断点续传
上传任务没有完成,可以保存uploadId,下次需要重新启动上传任务的时候,通过UploadId获取到已经上传的分片列表,实现断点续传。
static class UploadParam {
public String bucket;
public String key;
public File file;
public int partSize;
public int lastPartSize;
public int totalPartNum;
}
public void resumeUpload(UploadParam param, String uploadId) {
System.out.println("resumeUpload start");
// 根据uploadId获取已经上传成功的分片,断点续传的时候可以不需要上传这些分片
Map<Integer, PartETag> partETagsMap = this.ListParts(param.bucket, param.key, uploadId);
try {
List<PartETag> partETags = new ArrayList<>();
InputStream stream = new FileInputStream(param.file);
for (int i = 1; i <= param.totalPartNum; i++){
// 计算当前分片大小
int currentPartSize = param.partSize;
if (param.lastPartSize != 0 && i == param.totalPartNum){
currentPartSize = param.lastPartSize;
}
PartETag partEtag = partETagsMap.get(i);
if (partEtag != null) {
// 跳过,不需要上传此分片
partETags.add(new PartETag(partEtag.getPartNumber(), partEtag.getETag()));
stream.skip(currentPartSize);
continue;
}
// 读取分片内容
byte[] buf = new byte[currentPartSize];
stream.read(buf);
// 上传分片
String etag = this.UploadPart(param.bucket, param.key, uploadId, i, buf);
partETags.add(new PartETag(i, etag));
}
// 完成上传
this.CompleteMultipartUpload(param.bucket, param.key, uploadId, partETags);
} catch (Exception e) {
System.out.println("resumeUpload: error=" + e.getMessage());
}
}