
Video file is corrupted after streaming a multipart upload to S3

I want to read a video file stream from stdin and upload it to S3 using multipart upload. However, for some unknown reason, the file is corrupted after it has been uploaded to S3. I am not sure whether I have to pass some kind of metadata separately. My code snippet is below.

OutputStream s3OutputStream = new S3UploadStream(s3Config.getS3Client(), bucketName,
          mediaDownloadRequest.getVideoExtension(), s3MediaLink, threadCount);

int line;
while ((line = reader.read()) >= 0) {
   s3OutputStream.write(line);
}
s3OutputStream.close();

The S3UploadStream class:

public class S3UploadStream extends OutputStream {

  private final static Integer PART_SIZE = 5 * 1024 * 1024;
  private final AmazonS3 s3client;
  private final String bucketName;
  private final String objectUrl;
  private final String uploadId;
  private final List<PartETag> partETags = new LinkedList<>();
  private final ThreadPoolExecutor executor;
  private byte[] partData = new byte[PART_SIZE];
  private int partDataIndex = 0;
  private int totalPartCountIndex = 0;
  private volatile Boolean closed = false;

  public S3UploadStream(AmazonS3 s3client, String bucketName, VideoExtension ext, String objectUrl,
      int uploadThreadsCount) {
    this.s3client = s3client;
    this.bucketName = bucketName;
    this.objectUrl = objectUrl;
    ObjectMetadata metadata = new ObjectMetadata();
    metadata.setContentType("video/" + ext);
    InitiateMultipartUploadRequest initRequest = new InitiateMultipartUploadRequest(bucketName,
        objectUrl, metadata);
    InitiateMultipartUploadResult initResponse = s3client.initiateMultipartUpload(initRequest);
    this.uploadId = initResponse.getUploadId();
    this.executor = new ThreadPoolExecutor(uploadThreadsCount, uploadThreadsCount, 60,
        TimeUnit.SECONDS,
        new LinkedBlockingQueue<>(100));
  }


  @Override
  public synchronized void write(int b) throws IOException {
    if (closed) {
      throw new IOException("Trying to write to a closed S3UploadStream");
    }
    partData[partDataIndex++] = (byte) b;
    uploadPart(false);
  }

  @Override
  public synchronized void close() {
    if (closed) {
      return;
    }
    closed = true;
    uploadPart(true);
    executor.shutdown();
    try {
      executor.awaitTermination(2, TimeUnit.MINUTES);
    } catch (InterruptedException e) {
    }
    CompleteMultipartUploadRequest compRequest =
        new CompleteMultipartUploadRequest(bucketName, objectUrl, uploadId, partETags);
    s3client.completeMultipartUpload(compRequest);
  }

  private synchronized void uploadPart(Boolean force) {
    if (!force && partDataIndex < PART_SIZE) {
      return;
    }
    createUploadPartTask(partData, partDataIndex);
    partData = new byte[PART_SIZE];
    partDataIndex = 0;
  }

  private synchronized void createUploadPartTask(byte[] partData, int partDataIndex) {
    InputStream stream = new ByteArrayInputStream(partData, 0, partDataIndex);
    UploadPartRequest uploadRequest = new UploadPartRequest()
        .withBucketName(bucketName)
        .withKey(objectUrl)
        .withUploadId(uploadId)
        .withPartNumber(++totalPartCountIndex)
        .withInputStream(stream)
        .withPartSize(partDataIndex);
    executor.execute(() -> {
      PartETag partETag = s3client.uploadPart(uploadRequest).getPartETag();
      synchronized (partETags) {
        partETags.add(partETag);
      }
    });
  }
}

垚tutu 2019-11-29 20:00:29
Alibaba Cloud AI Assistant answer

Judging from the code snippet you provided, the problem most likely lies in how the video stream is read. reader.read() returns one value per call as an int (so that -1 can signal end of stream), and if reader is a character Reader, the bytes also pass through charset decoding on the way in. A video file is binary data and must be read and written as raw bytes, in byte-array chunks, so that its original content stays unchanged.
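To see why decoding corrupts the data, here is a small illustrative example (an assumption for illustration only: that the reader in your snippet is a character Reader such as an InputStreamReader; the byte values are made up):

// needs java.io.* and java.nio.charset.StandardCharsets
byte[] raw = new byte[] {(byte) 0x00, (byte) 0xE2, (byte) 0x89, (byte) 0xFF};
Reader reader = new InputStreamReader(new ByteArrayInputStream(raw), StandardCharsets.UTF_8);
int c;
while ((c = reader.read()) >= 0) {
    // malformed UTF-8 sequences are replaced with U+FFFD (65533),
    // so casting c back to a byte does not reproduce the original bytes
    System.out.println(c);
}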

The write(int b) method in your S3UploadStream class then writes these values back one byte at a time. Combined with the character-oriented read above, the video's binary data gets mangled, which is why the file is corrupted after the upload.

To fix this, adjust both the reading and the writing so that the video stream is handled as byte arrays. Here is a suggested modification:

  1. Change how you read: use an InputStream, rather than a Reader that reads characters, to handle the binary data.
  2. Write in bulk: in S3UploadStream, override write(byte[] b, int off, int len) so that byte arrays are written in batches instead of one byte at a time.

Here is the modified code example:

// Change the initialization to use an InputStream instead of a Reader
InputStream inputStream = ...; // obtain an InputStream from stdin or another source

byte[] buffer = new byte[4096]; // the buffer size can be adjusted as needed
int bytesRead;

while ((bytesRead = inputStream.read(buffer)) != -1) {
    s3OutputStream.write(buffer, 0, bytesRead);
}

s3OutputStream.close();
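Since the question reads the video from stdin, System.in can serve as that InputStream directly, because it already delivers raw bytes. A minimal usage sketch, reusing the constructor arguments from your original snippet and assuming it runs in a method that declares throws IOException:

try (S3UploadStream s3OutputStream = new S3UploadStream(s3Config.getS3Client(), bucketName,
        mediaDownloadRequest.getVideoExtension(), s3MediaLink, threadCount)) {
    byte[] buffer = new byte[4096];
    int bytesRead;
    // System.in is a raw byte stream, so no character decoding can alter the data
    while ((bytesRead = System.in.read(buffer)) != -1) {
        s3OutputStream.write(buffer, 0, bytesRead);
    }
} // try-with-resources closes the stream even if reading fails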

For the S3UploadStream class, make sure the array variant of write is overridden to handle byte arrays:

@Override
public synchronized void write(byte[] b, int off, int len) throws IOException {
    if (closed) {
        throw new IOException("Trying to write to a closed S3UploadStream");
    }
    // Copy the incoming data into partData in chunks, so len may be larger than
    // the space left in the current part; whenever the part buffer fills up,
    // uploadPart(false) submits it and resets partDataIndex.
    while (len > 0) {
        int toCopy = Math.min(len, PART_SIZE - partDataIndex);
        System.arraycopy(b, off, partData, partDataIndex, toCopy);
        partDataIndex += toCopy;
        off += toCopy;
        len -= toCopy;
        uploadPart(false); // only uploads once partDataIndex has reached PART_SIZE
    }
}

Please note that the code above is still a conceptual sketch. In particular, uploadPart should only submit a part once the accumulated data reaches PART_SIZE (or when force is true), and partDataIndex and totalPartCountIndex must be managed correctly so that every byte is uploaded exactly once. Also make sure that all remaining data has been uploaded before the output stream is closed; your existing close() does this by calling uploadPart(true) before completing the multipart upload.
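One additional precaution, since the parts are uploaded from a thread pool: partETags may be collected in a different order than the part numbers, and S3 expects the completed part list in ascending part-number order. Sorting before completing the upload is a cheap safeguard (a minimal sketch, assuming the AWS SDK for Java v1 used in your snippet and an import of java.util.Comparator):

// In close(), after the executor has terminated, sort the collected part ETags
// by part number before completing the multipart upload.
partETags.sort(Comparator.comparingInt(PartETag::getPartNumber));
CompleteMultipartUploadRequest compRequest =
    new CompleteMultipartUploadRequest(bucketName, objectUrl, uploadId, partETags);
s3client.completeMultipartUpload(compRequest);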
