使用 Instance Tunnel 获取 Maxcompute Instance 的执行结果

简介: 本篇将介绍如何使用 Instance Tunnel 来获取 Maxcompute Instance 执行结果。

本篇将介绍如何使用 Instance Tunnel 来获取 Maxcompute Instance 执行结果。

源起

每天我们都会在 Maxcompute 平台上提交 select query,用于查询特定的数据。然而,熟悉平台的同学都知道,从平台获取 sql 查询结果是一个 Restful 请求,可能碰到以下两个问题:

  • 1 获取数据超时。 如果数据分布在多个存储小文件上,平台需要花费大量时间来收集和归并这些数据。然而在这个漫长的归并过程中,获取数据的 Restful 请求可能已经超时了。此时 Maxcompute Console 会有如下警告:Warning: ODPS request failed, requestID:xxxx, retryCount:1, will retry in xxx seconds.
  • 2 获取数据量受限。 由于一次 Restful 请求的返回数据有限,且一次性获取全量数据到本地时可能将内存撑爆等问题,Maxcompute SQL 的查询结果条数是受限的,具体的数值为 project 上的配置项 READ_TABLE_MAX_ROW (默认为 10000)。这就会出现明明我们的查询结果 2W 条,最后却在 Maxcompute Console 或者 Logview 上只看到 1W条 的诡异情况了。

针对上述问题,提出下列解法,其中解法 3 可解决上述两种问题,解法 1、2 仅适用于问题 1:

  • 1 合并存储小文件。 如果查询语句的数据源只有一张表,那么可以在 Maxcompute Console 执行alter table <source_table_name> merge smallfiles ; 命令将小文件合并之后,再重新执行 select 查询。
  • 2 创建临时表,合并临时表的存储小文件。 如果是一个多表计算的查询结果,可以通过 create table <tmp_table_name> as select ....命令创建临时表,再参照解法 a 合并临时表的存储小文件。
  • 3 创建临时表,使用 Tunnel 下载。 Tunnel 是 Maxcompute 批量数据通道。因此我们可以通过 create table <tmp_table_name> as select ....命令创建临时表,然后通过 Tunnel 命令或者 Tunnel SDK 来将临时表数据下载下来。

Instance Tunnel 概述

上面的方法虽然能解决问题,但总是有那么点"绕",有那么点"费"。

有没有更加直接的方法呢?
如果是之前,只能怂了,但是现在我们要大声的说有!

在最近的 Maxcompute 版本( >=S27 ) 中,我们开发了Instanc Tunnel功能。
Instance Tunnel 提供使用 Tunnel 来下载 SQL 查询结果的功能,不仅能摆脱上述两类问题,可直接获取查询结果;还丰富了 Maxcompute Tunnel 下载通道,不再局限于表数据。换句话说,以前我们可以用 Tunnel 来下载 Maxcompute 表数据,如今,我们也可以用 Tunnel 来下载 Maxcompute Instance 的数据。

Instance Tunnel 使用

Instance Tunnel 的使用方式与 Table Tunnel 基本一致,下面分别介绍使用客户端和 SDK 来下载 Instance 执行结果的方法。

1. 用 Maxcompute Console 来获取 Instance 数据

1.1 使用 Tunnel download 命令将特定 Instance 的执行结果下载到本地文件

命令:
tunnel download instance://<[project_name/]instance_id> <path>
参数:

project_name:  instance 所在的项目名称;
instance_id:  待下载数据的 instance id

举例:


// 执行一条 select 查询:
odps@ odps_test_project>select * from wc_in;
ID = 20170724071705393ge3csfb8
... ...

// 使用 Instance Tunnel Download 命令下载执行结果到本地文件
odps@ odps_test_project>tunnel download instance://20170724071705393ge3csfb8 result;
2017-07-24 15:18:47  -  new session: 2017072415184785b6516400090ca8    total lines: 8
2017-07-24 15:18:47  -  file [0]: [0, 8), result
downloading 8 records into 1 file
2017-07-24 15:18:47  -  file [0] start
2017-07-24 15:18:48  -  file [0] OK. total: 44 bytes
download OK

// 查看结果
   cat result
slkdfj
hellp
apple
tea
peach
apple
tea
teaa

1.2 通过配置参数使 SQL 查询默认采用 Instance Tunnel 方式输出执行结果

在 Maxcompute Console 中打开 use_instance_tunnel 选项之后,执行的 select query 就会默认使用 Instance tunnel 来下载结果了,再也不会出现文章开头所描述的两种问题了。打开该配置有两种方法:

  • 1) 如果已经下载最新的 Console,odps_config.ini 里面已经默认打开此选项,并默认将 instance_tunnel_max_record 设置成了10000 。 如下所示:
# download sql results by instance tunnel
use_instance_tunnel=true
# the max records when download sql results by instance tunnel
instance_tunnel_max_record=10000

其中 instance_tunnel_max_record 表示使用 Instance tunnel 下载 sql 查询结果的条数。若不设置,下载条数不受限。

  • 2) 使用 set console.sql.result.instancetunnel=true; 开启此功能。
// 打开 Instance tunnel 选项
odps@ odps_test_tunnel_project>set console.sql.result.instancetunnel=true;
OK

// 运行 select query
odps@ odps_test_tunnel_project>select * from wc_in;
ID = 20170724081946458g14csfb8
Log view:
http://logview/xxxxx.....
+------------+
| key        |
+------------+
| slkdfj     |
| hellp      |
| apple      |
| tea        |
| peach      |
| apple      |
| tea        |
| teaa       |
+------------+
A total of 8 records fetched by instance tunnel.

可以看到,如果使用 Instance tunnel 的方式来输出 select 查询结果,会在最后打印一条提示。比如上面例子中的提示告诉我们这个 instance 的执行结果一共有 8 条数据。同样也可以 set console.sql.result.instancetunnel=false;来关闭此功能。

2. 用 MaxCompute Instance Tunnel SDK 来获取 Instance 执行结果

MaxCompute Java SDK 和 Python SDK 都对 Instance tunnel 进行了支持,下面介绍用法。

2.1 使用 Java SDK 获取 Instance 执行结果

对于 Java SDK,从 0.27.2-public (JavaDoc )版本开始,我们提供两种方式来获取数据。

  • 使用 SQLTask.getResultSet() 静态方法获取:
   Odps odps = OdpsUtils.newDefaultOdps(); // 初始化 Odps 对象
    Instance i = SQLTask.run(odps, "select * from wc_in;");
    i.waitForSuccess();
    
    // 根据 instance 对象,获取结果迭代器
    ResultSet rs = SQLTask.getResultSet(i);
    for (Record r : rs) {
      // 输出结果条数
      System.out.println(rs.getRecordCount());

      for (int col = 0; col < rs.getTableSchema().getColumns().size(); ++col) {
        // wc_in 表字段均为 STRING, 这里就直接打印输出
        System.out.println(r.get(col));
      }
    }
  • 创建 InstanceTunnel.DownloadSession 来获取:
   Odps odps = OdpsUtils.newDefaultOdps(); // 初始化 Odps 对象
    Instance i = SQLTask.run(odps, "select * from wc_in;");
    i.waitForSuccess();
    
    // 创建 InstanceTunnel
    InstanceTunnel tunnel = new InstanceTunnel(odps);
    // 根据 instance id,创建 DownloadSession
    InstanceTunnel.DownloadSession session = tunnel.createDownloadSession(odps.getDefaultProject(), i.getId());

    long count = session.getRecordCount();
     // 输出结果条数
    System.out.println(count);

    // 获取数据的写法与 TableTunnel 一样
    TunnelRecordReader reader = session.openRecordReader(0, count);
    Record record;
    while ((record = reader.read()) != null) {
      for (int col = 0; col < session.getSchema().getColumns().size(); ++col) {
        // wc_in 表字段均为 STRING, 这里就直接打印输出
        System.out.println(record.get(col));
      }
    }
    reader.close();  

2.2 使用 PyODPS 获取 Instance 执行结果

对于 PyODPS 来说,我们可以在 instance 上通过 open_reader 来获取数据,而从 0.7.7.1 的版本开始,我们可以通过 open_reader 使用 instance tunnel 来获取全量数据。

instance = o.execute_sql('select * from movielens_ratings limit 20000')
with instance.open_reader() as reader:
    print(reader.count)
    # for record in reader 就是遍历这2万条数据,这里通过切片只取10条
    for record in reader[:10]:  
        print(record)

Instance Tunnel 约束

虽然 Instance Tunnel 为我们提供了非常方便的获取 Instance 执行结果的方式,但是为了保护用户数据安全,此功能也受到了诸多的权限约束:

  • 若使用 Instance Tunnel 下载数据时,数据条数不超过 1W,则只要对该 Instance 有 Read 权限即可使用。 此行为与使用 Restful API 获取查询数据的行为一致;
  • 若使用 Instance Tunnel 下载数据时,数据条数超过 1W,则需要对 instance sql 查询语句中涉及到的所有源表进行权限检查,用户需要具有这些表的 Read 权限才可。

相关资料

Instance Tunnel 刚刚上线,欢迎讨论和吐槽 ^_^

  • Java SDK 和客户端使用,可联系作者
  • PyOdps 钉钉群:11701793
相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
目录
相关文章
|
1月前
|
SQL 分布式计算 DataWorks
MaxCompute产品使用合集之大数据计算MaxCompute即使用相同的SQL语句在DataWorks和Tunnel上执行,结果却不同,如何解决
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
7天前
|
分布式计算 大数据 数据处理
MaxCompute操作报错合集之在本地用tunnel命令上传excel表格到mc遇到报错: tunnel upload C:\Users***\Desktop\a.xlsx mc里的非分区表名 -s false;该怎么办
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
9天前
|
分布式计算 DataWorks DataX
DataWorks产品使用合集之DataX的ODPSReader和Tunnel是两种不同的读取MC(原名ODPS)数据的方式吗
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
12 3
|
SQL 存储 分布式计算
MaxCompute Tunnel 技术原理及开发实战|学习笔记
快速学习 MaxCompute Tunnel 技术原理及开发实战
356 0
MaxCompute Tunnel 技术原理及开发实战|学习笔记
|
分布式计算 Java 开发工具
【MaxCompute 常见问题】Tunnel SDK
使用 Tunnel SDK 上传数据时,上传的数据可以自动分配到各个分区吗? Tunnel SDK 上传数据时,是无法自动分配到各个分区的。每一次上传只支持将数据上传到一张表或表的一个分区,有分区的表一定要指定上传的分区,多级分区一定要指定到末级分区。
【MaxCompute 常见问题】Tunnel SDK
|
编解码 分布式计算 Java
Maxcompute tunnel 上传典型问题 | 学习笔记
快速学习 Maxcompute tunnel 上传典型问题
789 0
|
存储 SQL 分布式计算
MaxCompute Tunnel 日常工作使用讲解|阿里云产品内容精选(四十六)
MaxCompute是面向分析的企业级SaaS模式云数据仓库,它以Serverless架构提供快速、全托管的在线数据仓库服务,消除传统数据平台在资源扩展性和弹性方面的限制,最小化用户运维投入,使您经济并高效的分析处理海量数据。
|
SQL 存储 分布式计算
MaxCompute Tunnel 技术原理及开发实战
MaxCompute(原名ODPS)是一种快速、完全托管的EB级数据仓库解决方案, 致力于批量结构化数据的存储和计算,为用户提供数据仓库的解决方案及分析建模服务。Tunnel是MaxCompute提供的数据传输服务,提供高并发的离线数据上传下载服务,适合于全量数据或历史数据的批量导入, 并且在MaxCompute的客户端工具中,提供对应的命令实现本地文件与服务数据的互通。
7020 0
|
分布式计算 DataWorks MaxCompute
使用日志审计查看MaxCompute执行过哪些操作
MaxCompute完整地记录用户的各项操作行为,会自动将操作日志实时投递到ActionTrail中,ActionTrail针对作业(Instance)、表(Table)、函数(Function)、资源(Resource)、用户(User)、角色(Role)和授权(Privilege)等事件的多种操作行为进行记录。
2917 0
使用日志审计查看MaxCompute执行过哪些操作
|
分布式计算 大数据 Java
如何使用Tunnel SDK上传/下载MaxCompute复杂类型数据
基于Tunnel SDK如何上传复杂类型数据到MaxCompute?首先介绍一下MaxCompute复杂数据类型: 复杂数据类型 MaxCompute采用基于ODPS2.0的SQL引擎,丰富了对复杂数据类型类型的支持。
9317 0