离线批量数据通道Tunnel的最佳实践及常见问题

本文涉及的产品
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
简介: 基本介绍及应用场景 Tunnel是Odps提供的离线批量数据通道服务,主要提供大批量离线数据上传和下载,仅提供每次批量大于等于64MB数据的场景,小批量流式数据场景请使用DataHub实时数据通道以获得更好的性能和体验。

基本介绍及应用场景

Tunnel是MaxCompute提供的离线批量数据通道服务,主要提供大批量离线数据上传和下载,
仅提供每次批量大于等于64MB数据的场景,小批量流式数据场景请使用DataHub实时数据通道以获得更好的性能和体验。

SDK上传最佳实践

import java.io.IOException;
import java.util.Date;

import com.aliyun.odps.Column;
import com.aliyun.odps.Odps;
import com.aliyun.odps.PartitionSpec;
import com.aliyun.odps.TableSchema;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.RecordWriter;
import com.aliyun.odps.tunnel.TableTunnel;
import com.aliyun.odps.tunnel.TunnelException;
import com.aliyun.odps.tunnel.TableTunnel.UploadSession;

public class UploadSample {
 private static String accessId = "<your access id>";
 private static String accessKey = "<your access Key>";
 private static String odpsUrl = "http://service.odps.aliyun.com/api";

 private static String project = "<your project>";
 private static String table = "<your table name>";
 private static String partition = "<your partition spec>";

 public static void main(String args[]) {
   // 准备工作,仅需做一次
   Account account = new AliyunAccount(accessId, accessKey);
   Odps odps = new Odps(account);
   odps.setEndpoint(odpsUrl);
   odps.setDefaultProject(project);
   TableTunnel tunnel = new TableTunnel(odps);

   try {
     // 确定写入分区
     PartitionSpec partitionSpec = new PartitionSpec(partition);
     // 在服务端创建一个在本表本分区上有效期24小时的session,24小时内该session可以共计上传20000个Block数据
     // 创建Session的时耗为秒级,会在服务端使用部分资源、创建临时目录等,操作较重,因此强烈建议同一个分区数据尽可能复用Session上传。
     UploadSession uploadSession = tunnel.createUploadSession(project,
         table, partitionSpec);
     System.out.println("Session Status is : "
         + uploadSession.getStatus().toString());
     TableSchema schema = uploadSession.getSchema();
     // 准备数据后打开Writer开始写入数据,准备数据后写入一个Block,每个Block仅能成功上传一次,不可重复上传,CloseWriter成功代表该Block上传完成,失败可以重新上传该Block,同一个Session下最多允许20000个BlockId,即0-19999,若超出请CommitSession并且再创建一个新Session使用,以此类推。
     // 单个Block内写入数据过少会产生大量小文件 严重影响计算性能, 强烈建议每次写入64MB以上数据(100GB以内数据均可写入同一Block)
     // 可通过数据的平均大小与记录数量大致计算总量即 64MB < 平均记录大小*记录数 < 100GB

     // maxBlockID服务端限制为20000,用户可以根据自己业务需求,每个Session使用一定数量的block例如100个,但是建议每个Session内使用block越多越好,因为创建Session是一个很重的操作
     // 如果创建一个Session后仅仅上传少量数据,不仅会造成小文件、空目录等问题,还会严重影响上传整体性能(创建Session花费秒级,真正上传可能仅仅用了十几毫秒)
     int maxBlockID = 20000;
     for (int blockId = 0; blockId < maxBlockID; blockId++) {
       // 准备好至少64MB以上数据,准备完成后方可写入
       // 例如:读取若干文件或者从数据库中读取数据
       try {
         // 在该Block上创建一个Writer,writer创建后任意一段时间内,若某连续2分钟没有写入4KB以上数据,则会超时断开连接
         // 因此建议在创建writer前在内存中准备可以直接进行写入的数据
         RecordWriter recordWriter = uploadSession.openRecordWriter(blockId);

         // 将读取到的所有数据转换为Tunnel Record格式并切入
         int recordNumber = 1000000;
         for (int index = 0; i < recordNumber; i++) {
           // 将第index条原始数据转化为odps record
           Record record = uploadSession.newRecord();
           for (int i = 0; i < schema.getColumns().size(); i++) {
             Column column = schema.getColumn(i);
             switch (column.getType()) {
               case BIGINT:
                 record.setBigint(i, 1L);
                 break;
               case BOOLEAN:
                 record.setBoolean(i, true);
                 break;
               case DATETIME:
                 record.setDatetime(i, new Date());
                 break;
               case DOUBLE:
                 record.setDouble(i, 0.0);
                 break;
               case STRING:
                 record.setString(i, "sample");
                 break;
               default:
                 throw new RuntimeException("Unknown column type: "
                     + column.getType());
             }
           }
           // Write本条数据至服务端,每写入4KB数据会进行一次网络传输
           // 若120s没有网络传输服务端将会关闭连接,届时该Writer将不可用,需要重新写入
           recordWriter.write(record);
         }
         // close成功即代表该block上传成功,但是在整个Session Commit前,这些数据是在odps 临时目录中不可见的
         recordWriter.close();
       } catch (TunnelException e) {
         // 建议重试一定次数
         e.printStackTrace();
         System.out.println("write failed:" + e.getMessage());
       } catch (IOException e) {
         // 建议重试一定次数
         e.printStackTrace();
         System.out.println("write failed:" + e.getMessage());
       }
     }
     // 提交所有Block,uploadSession.getBlockList()可以自行指定需要提交的Block,Commit成功后数据才会正式写入Odps分区,Commit失败建议重试10次
     for (int retry = 0; retry < 10; ++retry) {
       try {
         // 秒级操作,正式提交数据
         uploadSession.commit(uploadSession.getBlockList());
         break;
       } catch (TunnelException e) {
         System.out.println("uploadSession commit failed:" + e.getMessage());
       } catch (IOException e) {
         System.out.println("uploadSession commit failed:" + e.getMessage());
       }
     }
     System.out.println("upload success!");

   } catch (TunnelException e) {
     e.printStackTrace();
   } catch (IOException e) {
     e.printStackTrace();
   }
 }
}

构造器举例说明:

PartitionSpec(String spec):通过字符串构造此类对象。

参数:

spec: 分区定义字符串,比如: pt='1',ds='2'。
因此程序中应该这样配置:private static String partition = "pt='XXX',ds='XXX'";

常见问题

MaxCompute Tunnel是什么?

Tunnel是MaxCompute的数据通道,用户可以通过Tunnel向MaxCompute中上传或者下载数据。目前Tunnel仅支持表(不包括视图View)数据的上传下载。

BlockId是否可以重复?

同一个UploadSession里的blockId不能重复。也就是说,对于同一个UploadSession,用一个blockId打开RecordWriter,写入一批数据后,调用close,
然后再commit完成后,写入成功后不可以重新再用该blockId打开另一个RecordWriter写入数据。 Block默认最多20000个,即0-19999。

Block大小是否存在限制?

一个block大小上限 100GB,强烈建议大于64M的数据,每一个Block对应一个文件,小于64MB的文件统称为小文件,小文件过多将会影响使用性能。
使用新版BufferedWriter可以更简单的进行上传功能避免小文件等问题 Tunnel-SDK-BufferedWriter

Session是否可以共享使用,存在生命周期吗?

每个Session在服务端的生命周期为24小时,创建后24小时内均可使用,也可以跨进程/线程共享使用,但是必须保证同一个BlockId没有重复使用,分布式上传可以按照如下步骤:
创建Session->数据量估算->分配Block(例如线程1使用0-100,线程2使用100-200)->准备数据->上传数据->Commit所有写入成功的Block。

Session创建后不使用是否对系统有消耗?

每个Session在创建时会生成两个文件目录,如果大量创建而不使用,会导致临时目录增多,大量堆积时可能造成系统负担,请一定避免此类行为,尽量共享利用session。

遇到Write/Read超时或IOException怎么处理?

上传数据时,Writer每写入8KB数据会触发一次网络动作,如果120秒内没有网络动作,服务端将主动关闭连接,届时Writer将不可用,请重新打开一个新的Writer写入。

建议使用 [Tunnel-SDK-BufferedWriter]接口上传数据,该接口对用户屏蔽了blockId的细节,并且内部带有数据缓存区,会自动进行失败重试。

下载数据时,Reader也有类似机制,若长时间没有网络IO会被断开连接,建议Read过程连续进行中间不穿插其他系统的接口。

MaxCompute Tunnel目前有哪些语言的SDK?

MaxCompute Tunnel目前提供Java版的SDK。

MaxCompute Tunnel 是否支持多个客户端同时上传同一张表?

支持。

MaxCompute Tunnel适合批量上传还是流式上传

MaxCompute Tunnel用于批量上传,不适合流式上传,流式上传可以使用[DataHub高速流式数据通道],毫秒级延时写入。

MaxCompute Tunnel上传数据时一定要先存在分区吗?

是的,Tunnel不会自动创建分区。

Dship 与 MaxCompute Tunnel的关系?

dship是一个工具,通过MaxCompute Tunnel来上传下载。

Tunnel upload数据的行为是追加还是覆盖?

追加的模式。

Tunnel路由功能是怎么回事?

路由功能指的是Tunnel SDK通过设置MaxCompute获取Tunnel Endpoint的功能。因此,SDK可以只设置MaxCompute的endpoint来正常工作。

用MaxCompute Tunnel上传数据时,一个block的数据量大小多大比较合适

没有一个绝对最优的答案,要综合考虑网络情况,实时性要求,数据如何使用以及集群小文件等因素。一般,如果数量较大是持续上传的模式,可以在64M - 256M,
如果是每天传一次的批量模式,可以设大一些到1G左右

使用MaxCompute Tunnel 下载, 总是提示timeout

一般是endpoint错误,请检查Endpoint配置,简单的判断方法是通过telnet等方法检测网络连通性。

通过MaxCompute Tunnel下载,抛出You have NO privilege ‘odps:Select‘ on {acs:odps:*:projects/XXX/tables/XXX}. project ‘XXX‘ is protected的异常

该project开启了数据保护功能,用户操作这是从一个项目的数据导向另一个项目,这需要该project的owner操作。

Tunnel上传抛出异常ErrorCode=FlowExceeded, ErrorMessage=Your flow quota is exceeded.**

Tunnel对请求的并发进行了控制,默认上传和下载的并发Quota为2000,任何相关的请求发出到结束过程中均会占用一个Quota单位。若出现类似错误,有如下几种建议的解决方案:
1 sleep一下再重试;
2 将project的tunnel并发quota调大,需要联系管理员评估流量压力;
3 报告project owner调查谁占用了大量并发quota,控制一下。
image

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
目录
相关文章
|
6月前
|
Java 数据处理 调度
Dataphin常见问题之离线管道同步数据datax就报连接超时如何解决
Dataphin是阿里云提供的一站式数据处理服务,旨在帮助企业构建一体化的智能数据处理平台。Dataphin整合了数据建模、数据处理、数据开发、数据服务等多个功能,支持企业更高效地进行数据治理和分析。
|
3月前
|
DataWorks 关系型数据库 MySQL
DataWorks实时数据导入:如何实现源源不断的数据流?
【8月更文挑战第22天】在数据处理领域,高效实时传输至关重要。阿里云DataWorks提供全面的数据集成服务,支持多种数据导入方式,尤其实时导入功能因高效处理能力备受欢迎。通过创建数据源与数据集,并配置实时同步任务,可实现数据从MySQL等源到DataWorks数据仓库的快速准确流入。此流程不仅提升了数据处理效率,也确保了数据实时性和准确性,为企业决策提供强有力的支持。
47 1
|
3月前
|
DataWorks 监控 安全
DataWorks产品使用合集之怎么设置实时同步任务的速率和并发
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
3月前
|
SQL 分布式计算 DataWorks
DataWorks产品使用合集之怎么进行批量离线同步
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
5月前
|
DataWorks Java 调度
DataWorks产品使用合集之进行离线同步时,如何使用DataX的Reader插件来实现源端过滤
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
66 0
DataWorks产品使用合集之进行离线同步时,如何使用DataX的Reader插件来实现源端过滤
|
4月前
|
canal 监控 关系型数据库
实时计算 Flink版产品使用问题之如何在实例里配置监控哪些库,哪些表,包括黑名单,白名单
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
分布式计算 DataWorks Oracle
DataWorks产品使用合集之从集成那批量建作业同步数据的,如何批量修改
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
46 0
|
6月前
|
SQL Oracle 关系型数据库
实时计算 Flink版产品使用合集之在进行数据同步作业时,有什么方法可以用于检查源端和目标端的数据一致性
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
6月前
|
数据采集 DataWorks 监控
DataWorks产品使用合集之在DataWorks中,通过脚本模式来配置同步任务的读取端的步骤如何解决
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
57 0
|
6月前
|
DataWorks Java 数据库连接
DataWorks操作报错合集之dataworks 离线同步任务 出现 “实时生成的过期实例” 如何解决
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。