开发者社区 问答 正文

MaxCompute用户指南:数据上传下载:批量数据通道SDK介绍:TunnelBufferedWriter



一次完整的上传流程通常包括以下步骤:


  1. 先对数据进行划分。

  2. 为每个数据块指定 block Id,即调用 openRecordWriter(id)。

  3. 用一个或多个线程分别将这些 block 上传上去,并在某个 block 上传失败以后,需要对整个 block 进行重传。

  4. 在所有 block 都上传以后,向服务端提供上传成功的 blockid list 进行校验,即调用 session.commit([1,2,3,…])。
    由于服务端对 block 管理,连接超时等的一些限制,上传过程逻辑变得比较复杂,为了简化上传过程,SDK 提供了更高级的一种 RecordWriter——TunnelBufferWriter。

接口定义如下:
  1.     public class TunnelBufferedWriter implements RecordWriter {
  2.         public TunnelBufferedWriter(TableTunnel.UploadSession session, CompressOption option) throws IOException;
  3.         public long getTotalBytes();
  4.         public void setBufferSize(long bufferSize);
  5.         public void setRetryStrategy(RetryStrategy strategy);
  6.         public void write(Record r) throws IOException;
  7.         public void close() throws IOException;
  8.     }

TunnelBufferedWriter 对象:

  • 生命周期:从创建 RecordWriter 到数据上传结束。

  • 创建 TunnelBufferedWriter 实例:通过调用 UploadSession 的 openBufferedWriter 接口创建。

  • 数据上传:调用 Write 接口,数据会先写入本地缓存区,缓存区满后会批量提交到服务端,避免了连接超时,同时,如果上传失败会自动进行重试。

  • 结束上传: 调用 close 接口,最后再调用 UploadSession 的 commit 接口,即可完成上传。

  • 缓冲区控制:可以通过 setBufferSize 这个接口修改缓冲区占内存的字节数(bytes),建议设置 64M 以上的大小,避免服务端产生过多小文件,影响性能,一般无须设置,维持默认值即可。

  • 重试策略设置:您可以选择三种重试回避策略:指数回避(EXPONENTIAL_BACKOFF)、线性时间回避(LINEAR_BACKOFF)、常数时间回避(CONSTANT_BACKOFF)。例如:下面这段代码可以将Write 的重试次数调整为 6,每一次重试之前先分别回避 4s、8s、16s、32s、64s 和 128s(从 4开始的指数递增的序列),这个也是默认的行为,一般情况不建议调整。
  1. RetryStrategy retry
  2.   = new RetryStrategy(6, 4, RetryStrategy.BackoffStrategy.EXPONENTIAL_BACKOFF)
  3. writer = (TunnelBufferedWriter) uploadSession.openBufferedWriter();
  4. writer.setRetryStrategy(retry);

展开
收起
行者武松 2017-10-23 15:32:03 2238 分享 版权
阿里云 AI 助理回答

您描述的是使用阿里云MaxCompute(原名ODPS)的Tunnel SDK进行数据上传的过程,这个过程通过一系列步骤确保了数据上传的高效性和可靠性。下面我将根据您的描述,进一步解释每个关键步骤和如何利用TunnelBufferedWriter来简化这一流程。

数据划分与Block ID分配

在上传大量数据到MaxCompute表之前,数据会被逻辑上划分为多个数据块(block),每个数据块被赋予一个唯一的block ID。这一步骤通过调用openRecordWriter(id)实现,但实际操作中,当使用TunnelBufferedWriter时,这些细节对用户透明化,你只需关注写入数据本身。

使用TunnelBufferedWriter的优势

  • 简化上传逻辑TunnelBufferedWriter自动管理数据缓存、批量提交以及失败重试,大大简化了上传过程中的复杂性。
  • 本地缓存与批量提交:数据先写入本地内存缓冲区,满了之后再批量上传至服务端,减少了网络交互次数,提高了效率,并且能有效应对连接超时问题。
  • 自动重传机制:如果数据上传过程中出现错误,TunnelBufferedWriter会自动尝试重新上传,无需用户干预。
  • 缓冲区大小调整:虽然默认配置通常能满足大多数需求,但通过setBufferSize可以根据实际情况调整缓冲区大小,以优化性能或减少资源消耗。
  • 自定义重试策略:允许用户根据需要设置重试次数和重试间隔,提供了三种回避策略,增强了对异常情况的处理灵活性。

示例代码解析

RetryStrategy retry = new RetryStrategy(6, 4, RetryStrategy.BackoffStrategy.EXPONENTIAL_BACKOFF);
TunnelBufferedWriter writer = (TunnelBufferedWriter) uploadSession.openBufferedWriter();
writer.setRetryStrategy(retry);

这段代码展示了如何自定义重试策略并应用到TunnelBufferedWriter上。它设置了最大重试次数为6次,首次重试等待时间为4秒,后续每次重试等待时间翻倍,这是一种指数回退策略,有助于在遇到暂时性故障时避免频繁重试导致的服务压力。

结束上传

完成所有数据写入后,调用close()方法关闭TunnelBufferedWriter,然后通过uploadSession.commit()提交上传成功的block ID列表,确认数据上传完成。

总之,TunnelBufferedWriter是MaxCompute Tunnel SDK提供的高级接口,旨在提供一种更高效、更健壮的数据上传方式,特别适合大规模数据导入场景,通过自动化处理许多底层细节,使得开发者能够更加专注于业务逻辑。

有帮助
无帮助
AI 助理回答生成答案可能存在不准确,仅供参考
0 条回答
写回答
取消 提交回答