如何使用Tunnel SDK上传/下载MaxCompute复杂类型数据

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 基于Tunnel SDK如何上传复杂类型数据到MaxCompute?首先介绍一下MaxCompute复杂数据类型:复杂数据类型MaxCompute采用基于ODPS2.0的SQL引擎,丰富了对复杂数据类型类型的支持。

基于Tunnel SDK如何上传复杂类型数据到MaxCompute?首先介绍一下MaxCompute复杂数据类型:

复杂数据类型

MaxCompute采用基于ODPS2.0的SQL引擎,丰富了对复杂数据类型类型的支持。MaxCompute支持ARRAY, MAP, STRUCT类型,并且可以任意嵌套使用并提供了配套的内建函数。

类型 定义方法 构造方法
ARRAY array;array> array(1, 2, 3); array(array(1, 2); array(3, 4))
MAP map;map> map(“k1”, “v1”, “k2”, “v2”);map(1S, array(‘a’, ‘b’), 2S, array(‘x’, ‘y))
STRUCT struct;struct< field1:bigint, field2:array, field3:map> named_struct(‘x’, 1, ‘y’, 2);named_struct(‘field1’, 100L, ‘field2’, array(1, 2), ‘field3’, map(1, 100, 2, 200)

复杂类型构造与操作函数

返回类型 签名 注释
MAP map(K key1, V value1, K key2, V value2, ...) 使用给定key/value对建立map, 所有key类型一致,必须是基本类型,所有value类型一致,可为任意类型
ARRAY map_keys(Map m) 将参数中的map的所有key作为数组返回,输入NULL,返回NULL
ARRAY map_values(MAP m) 将参数中的map的所有value作为数组返回,输入NULL,返回NULL
int size(MAP) 取得给定MAP元素数目
TABLE explode(MAP) 表生成函数,将给定MAP展开,每个key/value一行,每行两列分别对应key和value
ARRAY array(T value1, T value2, ...) 使用给定value构造ARRAY,所有value类型一致
int size(ARRAY) 取得给定ARRAY元素数目
boolean array_contains(ARRAY a, value v) 检测给定ARRAY a中是否包含v
ARRAY sort_array(ARRAY) 对给定数组排序
ARRAY collect_list(T col) 聚合函数,在给定group内,将col指定的表达式聚合为一个数组
ARRAY collect_set(T col) 聚合函数,在给定group内,将col指定的表达式聚合为一个无重复元素的集合数组
TABLE explode(ARRAY) 表生成函数,将给定ARRAY展开,每个value一行,每行一列对应相应数组元素
TABLE (int, T) posexplode(ARRAY) 表生成函数,将给定ARRAY展开,每个value一行,每行两列分别对应数组从0开始的下标和数组元素
STRUCT struct(T1 value1, T2 value2, ...) 使用给定value列表建立struct, 各value可为任意类型,生成struct的field的名称依次为col1, col2, ...
STRUCT named_struct(name1, value1, name2, value2, ...) 使用给定name/value列表建立struct, 各value可为任意类型,生成struct的field的名称依次为name1, name2, ...
TABLE (f1 T1, f2 T2, ...) inline(ARRAY>) 表生成函数,将给定struct数组展开,每个元素对应一行,每行每个struct元素对应一列

Tunnel SDK 介绍

Tunnel 是 ODPS 的数据通道,用户可以通过 Tunnel 向 ODPS 中上传或者下载数据。
TableTunnel 是访问 ODPS Tunnel 服务的入口类,仅支持表数据(非视图)的上传和下载。

对一张表或 partition 上传下载的过程,称为一个session。session 由一或多个到 Tunnel RESTful API 的 HTTP Request 组成。
session 用 session ID 来标识,session 的超时时间是24小时,如果大批量数据传输导致超过24小时,需要自行拆分成多个 session。
数据的上传和下载分别由 TableTunnel.UploadSession 和 TableTunnel.DownloadSession 这两个会话来负责。
TableTunnel 提供创建 UploadSession 对象和 DownloadSession 对象的方法.

  • 典型表数据上传流程: 
    1) 创建 TableTunnel
    2) 创建 UploadSession
    3) 创建 RecordWriter,写入 Record
    4)提交上传操作

  • 典型表数据下载流程:
    1) 创建 TableTunnel
    2) 创建 DownloadSession
    3) 创建 RecordReader,读取 Record

基于Tunnel SDK构造复杂类型数据

代码示例:

            RecordWriter recordWriter = uploadSession.openRecordWriter(0);
      ArrayRecord record = (ArrayRecord) uploadSession.newRecord();

      // prepare data
      List arrayData = Arrays.asList(1, 2, 3);
      Map<String, Long> mapData = new HashMap<String, Long>();
      mapData.put("a", 1L);
      mapData.put("c", 2L);

      List<Object> structData = new ArrayList<Object>();
      structData.add("Lily");
      structData.add(18);

      // set data to record
      record.setArray(0, arrayData);
      record.setMap(1, mapData);
      record.setStruct(2, new SimpleStruct((StructTypeInfo) schema.getColumn(2).getTypeInfo(),
                                           structData));

      // write the record
      recordWriter.write(record);

从MaxCompute下载复杂类型数据

代码示例:

            RecordReader recordReader = downloadSession.openRecordReader(0, 1);

      // read the record
      ArrayRecord record1 = (ArrayRecord)recordReader.read();

      // get array field data
      List field0 = record1.getArray(0);
      List<Long> longField0 = record1.getArray(Long.class, 0);

      // get map field data
      Map field1 = record1.getMap(1);
      Map<String, Long> typedField1 = record1.getMap(String.class, Long.class, 1);

      // get struct field data
      Struct field2 = record1.getStruct(2);

运行实例

完整代码如下:

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.aliyun.odps.Odps;
import com.aliyun.odps.PartitionSpec;
import com.aliyun.odps.TableSchema;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.data.ArrayRecord;
import com.aliyun.odps.data.RecordReader;
import com.aliyun.odps.data.RecordWriter;
import com.aliyun.odps.data.SimpleStruct;
import com.aliyun.odps.data.Struct;
import com.aliyun.odps.tunnel.TableTunnel;
import com.aliyun.odps.tunnel.TableTunnel.UploadSession;
import com.aliyun.odps.tunnel.TableTunnel.DownloadSession;
import com.aliyun.odps.tunnel.TunnelException;
import com.aliyun.odps.type.StructTypeInfo;

public class TunnelComplexTypeSample {

  private static String accessId = "<your access id>";
  private static String accessKey = "<your access Key>";
  private static String odpsUrl = "<your odps endpoint>";
  private static String project = "<your project>";

  private static String table = "<your table name>";

  // partitions of a partitioned table, eg: "pt=\'1\',ds=\'2\'"
  // if the table is not a partitioned table, do not need it
  private static String partition = "<your partition spec>";

  public static void main(String args[]) {
    Account account = new AliyunAccount(accessId, accessKey);
    Odps odps = new Odps(account);
    odps.setEndpoint(odpsUrl);
    odps.setDefaultProject(project);

    try {
      TableTunnel tunnel = new TableTunnel(odps);
      PartitionSpec partitionSpec = new PartitionSpec(partition);

      // ---------- Upload Data ---------------
      // create upload session for table
      // the table schema is {"col0": ARRAY<BIGINT>, "col1": MAP<STRING, BIGINT>, "col2": STRUCT<name:STRING,age:BIGINT>}
      UploadSession uploadSession = tunnel.createUploadSession(project, table, partitionSpec);
      // get table schema
      TableSchema schema = uploadSession.getSchema();

      // open record writer
      RecordWriter recordWriter = uploadSession.openRecordWriter(0);
      ArrayRecord record = (ArrayRecord) uploadSession.newRecord();

      // prepare data
      List arrayData = Arrays.asList(1, 2, 3);
      Map<String, Long> mapData = new HashMap<String, Long>();
      mapData.put("a", 1L);
      mapData.put("c", 2L);

      List<Object> structData = new ArrayList<Object>();
      structData.add("Lily");
      structData.add(18);

      // set data to record
      record.setArray(0, arrayData);
      record.setMap(1, mapData);
      record.setStruct(2, new SimpleStruct((StructTypeInfo) schema.getColumn(2).getTypeInfo(),
                                           structData));

      // write the record
      recordWriter.write(record);

      // close writer
      recordWriter.close();

      // commit uploadSession, the upload finish
      uploadSession.commit(new Long[]{0L});
      System.out.println("upload success!");

      // ---------- Download Data ---------------
      // create download session for table
      // the table schema is {"col0": ARRAY<BIGINT>, "col1": MAP<STRING, BIGINT>, "col2": STRUCT<name:STRING,age:BIGINT>}
      DownloadSession downloadSession = tunnel.createDownloadSession(project, table, partitionSpec);
      schema = downloadSession.getSchema();

      // open record reader, read one record here for example
      RecordReader recordReader = downloadSession.openRecordReader(0, 1);

      // read the record
      ArrayRecord record1 = (ArrayRecord)recordReader.read();

      // get array field data
      List field0 = record1.getArray(0);
      List<Long> longField0 = record1.getArray(Long.class, 0);

      // get map field data
      Map field1 = record1.getMap(1);
      Map<String, Long> typedField1 = record1.getMap(String.class, Long.class, 1);

      // get struct field data
      Struct field2 = record1.getStruct(2);

      System.out.println("download success!");
    } catch (TunnelException e) {
      e.printStackTrace();
    } catch (IOException e) {
      e.printStackTrace();
    }

  }
}

image

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
目录
相关文章
|
1月前
|
分布式计算 资源调度 大数据
大数据-110 Flink 安装部署 下载解压配置 Standalone模式启动 打包依赖(一)
大数据-110 Flink 安装部署 下载解压配置 Standalone模式启动 打包依赖(一)
51 0
|
1月前
|
分布式计算 资源调度 大数据
大数据-110 Flink 安装部署 下载解压配置 Standalone模式启动 打包依赖(二)
大数据-110 Flink 安装部署 下载解压配置 Standalone模式启动 打包依赖(二)
67 0
|
3月前
|
SQL 存储 分布式计算
"SQLTask携手Tunnel:打造高效海量数据导出解决方案,轻松应对大数据挑战
【8月更文挑战第22天】SQLTask搭配Tunnel实现高效海量数据导出。SQLTask擅长执行复杂查询,但直接导出受限(约1万条)。Tunnel专注数据传输,无大小限制。二者结合,先用SQLTask获取数据,再通过Tunnel高效导出至目标位置(如CSV、OSS等),适用于大数据场景,需配置节点及连接,示例代码展示全过程,满足企业级数据处理需求。
78 2
|
4月前
|
分布式计算 运维 DataWorks
MaxCompute操作报错合集之用户已在DataWorks项目中,并有项目的开发和运维权限,下载数据时遇到报错,该如何解决
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
4月前
|
SQL 分布式计算 大数据
MaxCompute产品使用合集之如何使用SDK获取ODPS上的资源文件
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
4月前
|
分布式计算 大数据 Go
MaxCompute操作报错合集之使用go sdk调用GetTunnelEndpoint出现报错:InvalidAction.NotFoundSpecified api is not found,该如何解决
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
4月前
|
分布式计算 Java 调度
MaxCompute产品使用合集之使用Tunnel Java SDK上传BINARY数据类型时,应该使用什么作为数据类字节
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
1月前
|
存储 机器学习/深度学习 分布式计算
大数据技术——解锁数据的力量,引领未来趋势
【10月更文挑战第5天】大数据技术——解锁数据的力量,引领未来趋势
|
3天前
|
存储 分布式计算 数据挖掘
数据架构 ODPS 是什么?
数据架构 ODPS 是什么?
41 7
|
3天前
|
存储 分布式计算 大数据
大数据 优化数据读取
【11月更文挑战第4天】
13 2

相关产品

  • 云原生大数据计算服务 MaxCompute