I want to read a video file stream from stdin and upload it to S3 using multipart upload. However, for some unknown reason, the file is corrupted after it reaches S3. I am not sure whether I need to pass some kind of metadata separately. My code snippet is below.
OutputStream s3OutputStream = new S3UploadStream(s3Config.getS3Client(), bucketName,
        mediaDownloadRequest.getVideoExtension(), s3MediaLink, threadCount);
int line;
while ((line = reader.read()) >= 0) {
    s3OutputStream.write(line);
}
s3OutputStream.close();
The S3UploadStream class:
public class S3UploadStream extends OutputStream {
    private final static Integer PART_SIZE = 5 * 1024 * 1024;
    private final AmazonS3 s3client;
    private final String bucketName;
    private final String objectUrl;
    private final String uploadId;
    private final List<PartETag> partETags = new LinkedList<>();
    private final ThreadPoolExecutor executor;
    private byte[] partData = new byte[PART_SIZE];
    private int partDataIndex = 0;
    private int totalPartCountIndex = 0;
    private volatile Boolean closed = false;

    public S3UploadStream(AmazonS3 s3client, String bucketName, VideoExtension ext, String objectUrl,
            int uploadThreadsCount) {
        this.s3client = s3client;
        this.bucketName = bucketName;
        this.objectUrl = objectUrl;
        ObjectMetadata metadata = new ObjectMetadata();
        metadata.setContentType("video/" + ext);
        InitiateMultipartUploadRequest initRequest = new InitiateMultipartUploadRequest(bucketName,
                objectUrl, metadata);
        InitiateMultipartUploadResult initResponse = s3client.initiateMultipartUpload(initRequest);
        this.uploadId = initResponse.getUploadId();
        this.executor = new ThreadPoolExecutor(uploadThreadsCount, uploadThreadsCount, 60,
                TimeUnit.SECONDS, new LinkedBlockingQueue<>(100));
    }

    @Override
    public synchronized void write(int b) throws IOException {
        if (closed) {
            throw new IOException("Trying to write to a closed S3UploadStream");
        }
        partData[partDataIndex++] = (byte) b;
        uploadPart(false);
    }

    @Override
    public synchronized void close() {
        if (closed) {
            return;
        }
        closed = true;
        uploadPart(true);
        executor.shutdown();
        try {
            executor.awaitTermination(2, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
        }
        CompleteMultipartUploadRequest compRequest =
                new CompleteMultipartUploadRequest(bucketName, objectUrl, uploadId, partETags);
        s3client.completeMultipartUpload(compRequest);
    }

    private synchronized void uploadPart(Boolean force) {
        if (!force && partDataIndex < PART_SIZE) {
            return;
        }
        createUploadPartTask(partData, partDataIndex);
        partData = new byte[PART_SIZE];
        partDataIndex = 0;
    }

    private synchronized void createUploadPartTask(byte[] partData, int partDataIndex) {
        InputStream stream = new ByteArrayInputStream(partData, 0, partDataIndex);
        UploadPartRequest uploadRequest = new UploadPartRequest()
                .withBucketName(bucketName)
                .withKey(objectUrl)
                .withUploadId(uploadId)
                .withPartNumber(++totalPartCountIndex)
                .withInputStream(stream)
                .withPartSize(partDataIndex);
        executor.execute(() -> {
            PartETag partETag = s3client.uploadPart(uploadRequest).getPartETag();
            synchronized (partETags) {
                partETags.add(partETag);
            }
        });
    }
}
Judging from the code snippet you provided, the problem most likely lies in how the video stream is being read. In Java, a Reader decodes bytes into characters, so reader.read() returns a decoded character (as an int), not a raw byte. A video file is binary data and has to be read and written as raw bytes for its contents to survive unchanged; pushing it through a character decoder, and then through your S3UploadStream's single-byte write(int b) method, mangles any byte sequence that is not valid text, which is why the uploaded file is corrupted.

To fix this, adjust both the reading and the writing so the stream is handled as byte arrays:

1. Use an InputStream, not a Reader (which reads character by character), to consume the binary data.
2. In S3UploadStream, override write(byte[] b, int off, int len) so that byte arrays can be written in bulk rather than one byte at a time.

Here is the modified code example:
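To see the corruption concretely, here is a small, self-contained sketch (class and method names are illustrative) that round-trips binary bytes through a Reader the way the original loop does. Bytes that are not valid UTF-8 are replaced by the decoder, so the data that comes out no longer matches what went in:

```java
import java.io.*;
import java.nio.charset.StandardCharsets;

public class ReaderCorruptionDemo {
    // Round-trips bytes through a character Reader, mimicking the
    // reader.read() / write(int) loop from the question.
    static byte[] roundTripThroughReader(byte[] original) throws IOException {
        Reader reader = new InputStreamReader(
                new ByteArrayInputStream(original), StandardCharsets.UTF_8);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        int c;
        while ((c = reader.read()) >= 0) {
            out.write(c); // OutputStream.write(int) keeps only the low 8 bits
        }
        return out.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        // 0x89 and 0xFF are not valid standalone UTF-8 bytes; the Reader
        // silently replaces them with U+FFFD, so the bytes no longer match.
        byte[] original = {0x00, (byte) 0x89, (byte) 0xFF, 0x50};
        byte[] result = roundTripThroughReader(original);
        System.out.println(java.util.Arrays.equals(original, result)); // prints false
    }
}
```

The same mismatch happens, byte by byte, across an entire video file, which typically breaks the container header within the first few bytes.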
// Change the reading side to use an InputStream instead of a Reader
InputStream inputStream = ...; // obtain an InputStream from stdin or another source
byte[] buffer = new byte[4096]; // adjust the buffer size as needed
int bytesRead;
while ((bytesRead = inputStream.read(buffer)) != -1) {
    s3OutputStream.write(buffer, 0, bytesRead);
}
s3OutputStream.close();
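This copy loop can be verified end to end without S3 by using in-memory streams as stand-ins for stdin and the upload stream (ByteArrayInputStream and ByteArrayOutputStream here are purely test substitutes):

```java
import java.io.*;
import java.util.Arrays;
import java.util.Random;

public class CopyLoopDemo {
    // Same buffered copy loop as above: reads raw bytes and writes
    // exactly the bytes that were read, so binary data stays intact.
    static void copy(InputStream in, OutputStream out) throws IOException {
        byte[] buffer = new byte[4096];
        int bytesRead;
        while ((bytesRead = in.read(buffer)) != -1) {
            out.write(buffer, 0, bytesRead);
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] data = new byte[10_000]; // arbitrary binary payload
        new Random(42).nextBytes(data);
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        copy(new ByteArrayInputStream(data), sink);
        System.out.println(Arrays.equals(data, sink.toByteArray())); // prints true
    }
}
```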
For the S3UploadStream class, make sure the correct write method is overridden to handle byte arrays:
@Override
public synchronized void write(byte[] b, int off, int len) throws IOException {
    if (closed) {
        throw new IOException("Trying to write to a closed S3UploadStream");
    }
    // Copy the incoming bytes into partData in chunks, so that a write
    // larger than the space left in the current part is split across
    // parts instead of overflowing the buffer.
    while (len > 0) {
        int toCopy = Math.min(len, PART_SIZE - partDataIndex);
        System.arraycopy(b, off, partData, partDataIndex, toCopy);
        partDataIndex += toCopy;
        off += toCopy;
        len -= toCopy;
        uploadPart(false); // only uploads once the part is full
    }
}
Note that the code above only sketches the direction of the fix. A real implementation still has to get the chunking details right: only call uploadPart once the accumulated data reaches PART_SIZE, split writes that are larger than the remaining space in the current part, and manage partDataIndex and totalPartCountIndex carefully so that every byte ends up in exactly one part. Also make sure that all remaining data has been uploaded before the output stream is closed.
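The chunking logic itself can be tested with no S3 involvement at all. In the sketch below (PartBuffer is a hypothetical stand-in for the stream: "uploading" a part just records its bytes, and PART_SIZE is shrunk to 8 so part boundaries are easy to see), arbitrary-sized writes are split across fixed-size parts:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// S3-free sketch of the part-buffering logic, for local verification only.
public class PartBuffer {
    static final int PART_SIZE = 8; // tiny on purpose; real code uses >= 5 MiB
    private final byte[] partData = new byte[PART_SIZE];
    private int partDataIndex = 0;
    final List<byte[]> uploadedParts = new ArrayList<>();

    public void write(byte[] b, int off, int len) {
        while (len > 0) {
            // Copy only as much as the current part still has room for.
            int toCopy = Math.min(len, PART_SIZE - partDataIndex);
            System.arraycopy(b, off, partData, partDataIndex, toCopy);
            partDataIndex += toCopy;
            off += toCopy;
            len -= toCopy;
            if (partDataIndex == PART_SIZE) {
                flushPart(); // part is full: hand it off for "upload"
            }
        }
    }

    public void close() {
        if (partDataIndex > 0) {
            flushPart(); // final, possibly short, part
        }
    }

    private void flushPart() {
        uploadedParts.add(Arrays.copyOf(partData, partDataIndex));
        partDataIndex = 0;
    }
}
```

Writing 20 bytes through this buffer in two uneven calls, then closing, yields parts of 8, 8, and 4 bytes whose concatenation equals the input, which is exactly the invariant the real upload stream has to maintain.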