MaxCompute Tunnel SDK数据上传利器——BufferedWriter使用指南

本文涉及的产品
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
简介: MaxCompute 的数据上传接口(Tunnel)定义了数据 block 的概念:一个 block 对应一个 http request,多个 block 的上传可以并发而且是原子的,一次同步请求要么成功要么失败,不会污染其他的 block。这种设计对于服务端来讲十分简洁,但是也把记录状态做 fa.

本文用到的

阿里云数加-大数据计算服务MaxCompute产品地址:https://www.aliyun.com/product/odps


MaxCompute 的数据上传接口(Tunnel)定义了数据 block 的概念:一个 block 对应一个 http request,多个 block 的上传可以并发而且是原子的,一次同步请求要么成功要么失败,不会污染其他的 block。这种设计对于服务端来讲十分简洁,但是也把记录状态做 failover 的工作交给了客户端。

用户在使用 Tunnel SDK 编程时,需要对 block 这一层的语义进行认知,并且驱动数据上传的整个过程[1],并且自己进行容错,毕竟『网络错误是正常而不是异常』。由于用户文档中并没有强调这一点的重要性,导致很多用户踩了坑,一种常见的出错场景是,当客户端写数据的速度过慢,两次 write 的间隔超时[2],导致整个 block 上传失败。

High Level API

MaxCompute Java SDK 在 0.21.3-public  之后新增了 BufferredWriter 这个更高层的 API,简化了数据上传的过程,并且提供了容错的功能。 BufferedWriter 对用户隐藏了 block 这个概念,从用户角度看,就是在 session 上打开一个 writer 然后往里面写记录即可:

RecordWriter writer = null;

try {
  int i = 0;  
  writer = uploadSession.openBufferedWriter();
  Record product = uploadSession.newRecord();

  for (String item : items) {
    product.setString("name", item);
    product.setBigint("id", i);
    writer.write(product);
    i += 1;
  }
} finally {
  if (writer != null) {
    writer.close();
  }
}
uploadSession.commit();

具体实现时 BufferedWriter 先将记录缓存在客户端的缓冲区中,并在缓冲区填满之后打开一个 http 连接进行上传。BufferedWriter 会尽最大可能容错,保证数据上传上去。

  • 由于屏蔽了底层细节,这个接口可能并不适合数据预划分、断点续传、分批次上传等需要细粒度控制的场景。

多线程上传示例

多线程上传时,每个线程只需要打开一个 writer 往里面写数据就行了。

class UploadThread extends Thread {
  private UploadSession session;
  private static int RECORD_COUNT = 1200;

  public UploadThread(UploadSession session) {
    this.session = session;
  }

  @Override
  public void run() {
    RecordWriter writer = up.openBufferedWriter();
    Record r = up.newRecord();
    for (int i = 0; i < RECORD_COUNT; i++) {
      r.setBigint(0, i);
      writer.write(r);
    }
    writer.close();
  }
};

public class Example {
  public static void main(String args[]) {

   // 初始化 MaxCompute 和 tunnel 的代码

   TableTunnel.UploadSession uploadSession = tunnel.createUploadSession(projectName, tableName);
   UploadThread t1 = new UploadThread(up);
   UploadThread t2 = new UploadThread(up);

   t1.start();
   t2.start();
   t1.join();
   t2.join();

   uploadSession.commit();
 }

更多控制

重试策略

由于底层在上传出错时会回避一段固定的时间并进行重试,但如果你的程序不想花太多时间在重试上,或者你的程序位于一个极其恶劣的网络环境中,为此 TunnelBufferedWriter 允许用户配置重试策略。

用户可以选择三种重试回避策略:指数回避(EXPONENTIAL_BACKOFF)、线性时间回避(LINEAR_BACKOFF)、常数时间回避(CONSTANT_BACKOFF)。

例如下面这段代码可以将,write 的重试次数调整为 6,每一次重试之前先分别回避 4s、8s、16s、32s、64s 和 128s(从 4 开始的指数递增的序列)。

RetryStrategy retry 
  = new RetryStrategy(6, 4, RetryStrategy.BackoffStrategy.EXPONENTIAL_BACKOFF)

writer = (TunnelBufferedWriter) uploadSession.openBufferedWriter();
writer.setRetryStrategy(retry);

缓冲区控制

如果你的程序对 JVM 的内存有严格的要求,可以通过下面这个接口修改缓冲区占内存的字节数(bytes):

writer.setBufferSize(1024*1024);

默认配置每一个 Writer 的 BufferSize 是 10 MiB。TunnelBufferedWriter 一次 flush buffer 的操作上传一个 block 的数据[3]。

多个进程共享 Session

由于一个 Session 的上传状态是通过维护一个 block list 实现的,对于多线程程序来讲,通过锁很容易实现资源的分配。但对于两个进程空间里的程序想要复用一个 Session 时,必须通过一种机制对资源进行隔离。

具体地,在 getUploadSession 的时候,必须指定这个共享这个 Session 的进程数目,以及一个用来区分进程的 global id:

//程序1:这个 session 将被两个 writer 共享,我是其中第 0 个
TableTunnel.UploadSession up 
  = tunnel.getUploadSession(projectName, tableName, sid, 2, 0); 
writer = session.openBufferedWriter();

//程序1:这个 session 将被两个 writer 共享,我是其中第 1 个
TableTunnel.UploadSession up 
  = tunnel.getUploadSession(projectName, tableName, sid, 2, 1); 
writer = session.openBufferedWriter();

Notes

[1] 一次完整的上传流程通常包括以下步骤:

先对数据进行划分
为每个数据块指定 block id,即调用 openRecordWriter(id)
然后用一个或多个线程分别将这些 block 上传上去
并在某个 block 上传失败以后,需要对整个 block 进行重传
在所有 block 都上传以后,向服务端提供上传成功的 blockid list 进行校验,即调用 session.commit([1,2,3,...])
[2] 因为使用长连接,服务端有计时器判断是否客户端是否 alive

[3] block 在服务端有 20000 个的数量上限,如果 BufferSize 设得太小会导致 20000 个 block 很快被用光

[4] Session的有效期为24小时,超过24小时会导致数据上传失败

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
目录
相关文章
|
8天前
|
SQL 分布式计算 大数据
MaxCompute产品使用合集之如何使用SDK获取ODPS上的资源文件
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
10天前
|
分布式计算 DataWorks Java
DataWorks操作报错合集之使用ODPS Tunnel Upload功能时,遇到报错:Java 堆内存不足,该如何解决
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
8天前
|
分布式计算 大数据 Go
MaxCompute操作报错合集之使用go sdk调用GetTunnelEndpoint出现报错:InvalidAction.NotFoundSpecified api is not found,该如何解决
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
8天前
|
分布式计算 Java 调度
MaxCompute产品使用合集之使用Tunnel Java SDK上传BINARY数据类型时,应该使用什么作为数据类字节
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
1月前
|
分布式计算 大数据 Java
MaxCompute产品使用合集之如何通过Java SDK下载
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
1月前
|
机器学习/深度学习 分布式计算 大数据
MaxCompute产品使用问题之当使用Tunnel API进行数据操作时,MaxCompute会根据什么进行相应的处理
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
1月前
|
分布式计算 大数据 数据处理
MaxCompute操作报错合集之在本地用tunnel命令上传excel表格到mc遇到报错: tunnel upload C:\Users***\Desktop\a.xlsx mc里的非分区表名 -s false;该怎么办
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
9天前
|
分布式计算 DataWorks NoSQL
DataWorks产品使用合集之同步Holo数据到ODPS的过程中,出现部分数据的值变为星号(),是什么原因
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
DataWorks产品使用合集之同步Holo数据到ODPS的过程中,出现部分数据的值变为星号(),是什么原因
|
1天前
|
存储 分布式计算 监控
日志数据投递到MaxCompute最佳实践
日志服务采集到日志后,有时需要将日志投递至MaxCompute的表中进行存储与分析。本文主要向用户介绍将数据投递到MaxCompute完整流程,方便用户快速实现数据投递至MaxCompute。
45 2
|
8天前
|
SQL 机器学习/深度学习 分布式计算
MaxCompute产品使用合集之数据删除之后,是否支持回滚
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。

热门文章

最新文章

相关产品

  • 云原生大数据计算服务 MaxCompute