专栏
天翼云开发者社区

对象存储融合版java sdk实现断点续传

2023-12-20 10:40:20 7阅读

对象存储融合版java sdk提供了标准的分片上传接口,可以使用这些接口实现分片上传和断点续传。

初始化分片上传任务接口

初始化分片上传任务使用InitiateMultipartUpload接口,调用成功之后服务器会返回一个uploadId,这个uploadId就是这次上传任务的Id,通过这个Id可以管理这个上传任务。

public String InitiateMultipartUpload(String bucket, String key) {
    System.out.println("InitiateMultipartUpload");
    InitiateMultipartUploadRequest initReq = new InitiateMultipartUploadRequest(bucket, key);
    initReq.withCannedACL(CannedAccessControlList.PublicRead);
    InitiateMultipartUploadResult initRes = s3Client.initiateMultipartUpload(initReq);
    System.out.println("InitiateMultipartUpload success, uploadId=" + initRes.getUploadId());
    return initRes.getUploadId();
}

上传分片接口

获取到UploadId之后,可以使用这个Id来进行分片的上传,用户需要定义一个分片大小,自己对需要上传的文件进行切片,分片大小不能小于5MB(最后一个分片例外)。

public String UploadPart(String bucket, String key, String uploadId, int partNumber, byte[] buf) {
    UploadPartRequest partReq = new UploadPartRequest();
    partReq.setBucketName(bucket);
    partReq.setKey(key);
    partReq.setUploadId(uploadId);
    partReq.setInputStream(new ByteArrayInputStream(buf));
    partReq.setPartNumber(partNumber); // 从1开始
    partReq.setObjectMetadata(new ObjectMetadata());
    partReq.setPartSize(buf.length);
    System.out.printf("UploadPart %d, currentPartSize=%d\n", partNumber, buf.length);
    UploadPartResult partRes = s3Client.uploadPart(partReq);
    System.out.printf("UploadPart %d success, etag=%s\n", partNumber, partRes.getETag());
    return partRes.getETag();
}

完成分片上传接口

上传完所有分片之后,需要调用完成分片上传接口,服务器会合并所有分片,并把分片上传任务标记完成。

public String CompleteMultipartUpload(String bucket, String key, String uploadId, List<PartETag> partETags) {
    System.out.println("CompleteMultipartUpload");
    CompleteMultipartUploadRequest completeReq =
            new CompleteMultipartUploadRequest(bucket, key, uploadId, partETags);
    CompleteMultipartUploadResult completeRes = s3Client.completeMultipartUpload(completeReq);
    System.out.println("CompleteMultipartUpload success, etag=" + completeRes.getETag());
    return completeRes.getETag();
}

获取已经上传的分片列表接口

分片任务完成之前,可以调用ListParts来获取已经上传的分片列表,使用这个接口,就可以实现断点续传的功能,在上传分片的时候,对于已经完成的分片,不需要重复上传。

public Map<Integer, PartETag> ListParts(String bucket, String key, String uploadId) {
    System.out.println("ListParts");
    ListPartsRequest listParts = new ListPartsRequest(bucket, key, uploadId);
    PartListing partListing = this.s3Client.listParts(listParts);
    Map<Integer, PartETag> partETagsMap = new HashMap<>();
    for (PartSummary part : partListing.getParts()) {
        partETagsMap.put(part.getPartNumber(), new PartETag(part.getPartNumber(), part.getETag()));
    }
    return partETagsMap;
}

断点续传

上传任务没有完成,可以保存uploadId,下次需要重新启动上传任务的时候,通过UploadId获取到已经上传的分片列表,实现断点续传。

static class UploadParam {
    public String bucket;
    public String key;
    public File file;
    public int partSize;
    public int lastPartSize;
    public int totalPartNum;
}

public void resumeUpload(UploadParam param, String uploadId) {
    System.out.println("resumeUpload start");
    // 根据uploadId获取已经上传成功的分片,断点续传的时候可以不需要上传这些分片
    Map<Integer, PartETag> partETagsMap = this.ListParts(param.bucket, param.key, uploadId);
    try {
        List<PartETag> partETags = new ArrayList<>();
        InputStream stream = new FileInputStream(param.file);
        for (int i = 1; i <= param.totalPartNum; i++){
            // 计算当前分片大小
            int currentPartSize = param.partSize;
            if (param.lastPartSize != 0 && i == param.totalPartNum){
                currentPartSize = param.lastPartSize;
            }
            PartETag partEtag = partETagsMap.get(i);
            if (partEtag != null) {
                // 跳过,不需要上传此分片
                partETags.add(new PartETag(partEtag.getPartNumber(), partEtag.getETag()));
                stream.skip(currentPartSize);
                continue;
            }
            // 读取分片内容
            byte[] buf = new byte[currentPartSize];
            stream.read(buf);
            // 上传分片
            String etag = this.UploadPart(param.bucket, param.key, uploadId, i, buf);
            partETags.add(new PartETag(i, etag));
        }

        // 完成上传
        this.CompleteMultipartUpload(param.bucket, param.key, uploadId, partETags);
    } catch (Exception e) {
        System.out.println("resumeUpload: error=" + e.getMessage());
    }
}

 

  • 0
  • 0
  • 0
0 评论
0/1000
评论(0) 发表评论
陈****斌

陈****斌

1 篇文章 0 粉丝
关注

对象存储融合版java sdk实现断点续传

2023-12-20 10:40:20 7阅读

对象存储融合版java sdk提供了标准的分片上传接口,可以使用这些接口实现分片上传和断点续传。

初始化分片上传任务接口

初始化分片上传任务使用InitiateMultipartUpload接口,调用成功之后服务器会返回一个uploadId,这个uploadId就是这次上传任务的Id,通过这个Id可以管理这个上传任务。

public String InitiateMultipartUpload(String bucket, String key) {
    System.out.println("InitiateMultipartUpload");
    InitiateMultipartUploadRequest initReq = new InitiateMultipartUploadRequest(bucket, key);
    initReq.withCannedACL(CannedAccessControlList.PublicRead);
    InitiateMultipartUploadResult initRes = s3Client.initiateMultipartUpload(initReq);
    System.out.println("InitiateMultipartUpload success, uploadId=" + initRes.getUploadId());
    return initRes.getUploadId();
}

上传分片接口

获取到UploadId之后,可以使用这个Id来进行分片的上传,用户需要定义一个分片大小,自己对需要上传的文件进行切片,分片大小不能小于5MB(最后一个分片例外)。

public String UploadPart(String bucket, String key, String uploadId, int partNumber, byte[] buf) {
    UploadPartRequest partReq = new UploadPartRequest();
    partReq.setBucketName(bucket);
    partReq.setKey(key);
    partReq.setUploadId(uploadId);
    partReq.setInputStream(new ByteArrayInputStream(buf));
    partReq.setPartNumber(partNumber); // 从1开始
    partReq.setObjectMetadata(new ObjectMetadata());
    partReq.setPartSize(buf.length);
    System.out.printf("UploadPart %d, currentPartSize=%d\n", partNumber, buf.length);
    UploadPartResult partRes = s3Client.uploadPart(partReq);
    System.out.printf("UploadPart %d success, etag=%s\n", partNumber, partRes.getETag());
    return partRes.getETag();
}

完成分片上传接口

上传完所有分片之后,需要调用完成分片上传接口,服务器会合并所有分片,并把分片上传任务标记完成。

public String CompleteMultipartUpload(String bucket, String key, String uploadId, List<PartETag> partETags) {
    System.out.println("CompleteMultipartUpload");
    CompleteMultipartUploadRequest completeReq =
            new CompleteMultipartUploadRequest(bucket, key, uploadId, partETags);
    CompleteMultipartUploadResult completeRes = s3Client.completeMultipartUpload(completeReq);
    System.out.println("CompleteMultipartUpload success, etag=" + completeRes.getETag());
    return completeRes.getETag();
}

获取已经上传的分片列表接口

分片任务完成之前,可以调用ListParts来获取已经上传的分片列表,使用这个接口,就可以实现断点续传的功能,在上传分片的时候,对于已经完成的分片,不需要重复上传。

public Map<Integer, PartETag> ListParts(String bucket, String key, String uploadId) {
    System.out.println("ListParts");
    ListPartsRequest listParts = new ListPartsRequest(bucket, key, uploadId);
    PartListing partListing = this.s3Client.listParts(listParts);
    Map<Integer, PartETag> partETagsMap = new HashMap<>();
    for (PartSummary part : partListing.getParts()) {
        partETagsMap.put(part.getPartNumber(), new PartETag(part.getPartNumber(), part.getETag()));
    }
    return partETagsMap;
}

断点续传

上传任务没有完成,可以保存uploadId,下次需要重新启动上传任务的时候,通过UploadId获取到已经上传的分片列表,实现断点续传。

static class UploadParam {
    public String bucket;
    public String key;
    public File file;
    public int partSize;
    public int lastPartSize;
    public int totalPartNum;
}

public void resumeUpload(UploadParam param, String uploadId) {
    System.out.println("resumeUpload start");
    // 根据uploadId获取已经上传成功的分片,断点续传的时候可以不需要上传这些分片
    Map<Integer, PartETag> partETagsMap = this.ListParts(param.bucket, param.key, uploadId);
    try {
        List<PartETag> partETags = new ArrayList<>();
        InputStream stream = new FileInputStream(param.file);
        for (int i = 1; i <= param.totalPartNum; i++){
            // 计算当前分片大小
            int currentPartSize = param.partSize;
            if (param.lastPartSize != 0 && i == param.totalPartNum){
                currentPartSize = param.lastPartSize;
            }
            PartETag partEtag = partETagsMap.get(i);
            if (partEtag != null) {
                // 跳过,不需要上传此分片
                partETags.add(new PartETag(partEtag.getPartNumber(), partEtag.getETag()));
                stream.skip(currentPartSize);
                continue;
            }
            // 读取分片内容
            byte[] buf = new byte[currentPartSize];
            stream.read(buf);
            // 上传分片
            String etag = this.UploadPart(param.bucket, param.key, uploadId, i, buf);
            partETags.add(new PartETag(i, etag));
        }

        // 完成上传
        this.CompleteMultipartUpload(param.bucket, param.key, uploadId, partETags);
    } catch (Exception e) {
        System.out.println("resumeUpload: error=" + e.getMessage());
    }
}

 

文章来自专栏

篇文章 订阅
0 评论
0/1000
评论(0) 发表评论
  • 0
    点赞
  • 0
    收藏
  • 0
    评论