Tablestore+Delta Lake(快速开始)

本文涉及的产品
EMR Serverless StarRocks,5000CU*H 48000GB*H
简介: 本文介绍如何在E-MapReduce中通过Tablestore Spark Streaming Source将TableStore中的数据实时导入到Delta Lake中。

作者:王卓然 花名琸然 阿里云存储服务技术专家


背景介绍

近些年来HTAP(Hybrid transaction/analytical processing)的热度越来越高,通过将存储和计算组合起来,既能支持传统的海量结构化数据分析,又能支持快速的事务更新写入,是设计数据密集型系统的一个成熟的架构。
表格存储(Tablestore)是阿里云自研的 NoSQL 多模型数据库,提供海量结构化数据存储以及快速的查询和分析服务(PB 级存储、千万 TPS 以及毫秒级延迟),借助于表格存储的底层引擎,能够很好的完成OLTP场景下的需求。Delta Lake类似于支持Delta的Data Lake(数据湖),使用列存来存base数据,行的格式存储新增delta数据,进而做到支持数据操作的ACID和CRUD,完全兼容Spark的大数据生态,通过结合Delta Lake和Spark生态,能够很好的完成OLAP场景下的需求。下图展示的是Tablestore和Delta Lake结合的HATP场景的一个简要的逻辑结构图,有关结构化大数据分析平台设计的更多细节和干货,可以参阅文章 结构化大数据分析平台设计

image.png

准备工作

  • 登录阿里云E-MapReduce控制台
  • 创建Hadoop集群(若已创建,请跳过)
  • 确保将Tablestore实例部署在E-MapReduce集群
    相同的VPC环境下

步骤一 创建Tablestore源表


详细开通步骤请参考官方文档,本文demo中所创建出来的表名为Source,表的Schema如下图所示,该表有PKString和PkInt两个主键,类型分别为String和Interger。
image.png

为表Source建立一个增量通道,如下图所示,通道列表里面会显示该通道的名字、ID以及类型。
image.png

技术注解:

通道服务(Tunnel Service)是基于Tablestore数据接口之上的全增量一体化服务,包含三种通道类型:

全量:对数据表中历史存量数据消费处理

增量:对数据表中新增数据消费处理
全量加增量:先对数据表总历史存量数据消费,之后对新增数据消费
通道服务的详细介绍可查询Tablestore官网文档

步骤二 获取相关jar包并上传到hadoop集群


  • 获取环境依赖的JAR包。

Jar包 | 获取方法 |
------- | ------- |
emr-tablestore-X.X.X.jarX.X.X: Since 1.9.0+ |Maven 库中下载:https://mvnrepository.com/artifact/com.aliyun.emr/emr-tablestore
tablestore-X.X.X-jar-with-dependencies.jar |下载 EMR SDK 相关的Tablestore依赖包。https://repo1.maven.org/maven2/com/aliyun/openservices/tablestore/5.3.0/tablestore-5.3.0-jar-with-dependencies.jar


  • 集群管理页面,单击已创建的Hadoop集群的集群ID ,进入集群与服务管理页面
  • 在左侧导航树中选择主机列表,然后在右侧查看Hadoop集群中emr-header-1主机的IP信息。
  • 在SSH客户端中新建一个命令窗口,登录Hadoop集群的emr-header-1主机。
  • 上传所有JAR包到emr-header-1节点的某个目录下。

步骤三 运行Spark Streaming作业


  1. 以一个基于emr demo修改的代码为样例,编译生成JAR包,JAR包需要上传到Hadoop集群的emr-header-1主机中(参见步骤二),完整的代码由于改动较大,不在本文中一一说明,后续会合到emr demo官方项目中。
  2. 该样例以Tablestore表作为数据源,通过结合Tablestore CDC技术,Tablestore Streaming Source和Delta Sink,演示的是TableStore到Delta Lake的一个完整链路。
  3. 按以下命令,启动spark streaming作业,开启一个实时同步Tablestore Source表中数据到Delta Lake Table的监听程序。

各个参数说明如下:

参数 参数说明
com.aliyun.emr.example.spark.sql.streaming.DeltaTableStoreCDC 所要运行的主程序类
emr-tablestore-X.X.X-SNAPSHOT.jar 包含Tablestore source的jar包
tablestore-X.X.X-jar-with-dependencies.jar EMR SDK 相关的Tablestore依赖包
examples-X.X.X-shaded.jar 基于EMR demo修改的包(包含主程序类)
instance Tablestore实例名
tableName Tablestore表名
tunnelId Tablestore表的通道Id
accessKeyId Tablestore的accessKeyId
accessKeySecret Tablestore的秘钥
endPoint Tablestore实例的endPoint
maxOffsetsPerChannel Tablestore通道 Channel在每个Spark Batch周期内同步的最大数据条数,默认10000。
catalog 同步的列名,详见Catalog字段说明

步骤四 数据CRUD示例


  1. 首先在TableStore里插入两行,本次示例中,我们建了8列的同步列,包括两个主键(PkString, PkInt)和六个属性列(col1, col2, col3, timestamp, col5和col6)。由于表格存储是Free-Schema的结构,我们可以任意的插入属性列,TableStore的Spark Source会自动的做属性列的筛选。如下面两张图所示,在插入两行数据后,Delta Table中同步也可以马上读取到两行,且数据一致。
    image.png

image.png

  1. 接着,在Tablestore中进行一些更新行和插入行的操作,如下面的两个图所示,等待一小段micro-batch的数据同步后,表格存储中的数据同步变化能够即时的更新到Delta Table中。
    image.png

image.png

  1. 将Tablestore中的数据全部清空,如下面两图所示,Delta Table也同步的变成了空。
    image.png

image.png

  1. 在集群上,Delta Table默认存放在HDFS中,如下图所示,_delta_log目录中存放的json文件是Transaction log,parquet格式的文件是底层的数据文件。
    image.png

相关文章推荐:5分钟迅速搭建云上Lambda大数据分析架构


阿里巴巴开源大数据技术团队成立Apache Spark中国技术社区,定期推送精彩案例,技术专家直播,问答区数个Spark技术同学每日在线答疑,只为营造纯粹的Spark氛围,欢迎钉钉扫码加入!
image.png

相关实践学习
消息队列+Serverless+Tablestore:实现高弹性的电商订单系统
基于消息队列以及函数计算,快速部署一个高弹性的商品订单系统,能够应对抢购场景下的高并发情况。
阿里云表格存储使用教程
表格存储(Table Store)是构建在阿里云飞天分布式系统之上的分布式NoSQL数据存储服务,根据99.99%的高可用以及11个9的数据可靠性的标准设计。表格存储通过数据分片和负载均衡技术,实现数据规模与访问并发上的无缝扩展,提供海量结构化数据的存储和实时访问。 产品详情:https://www.aliyun.com/product/ots
相关文章
|
6月前
|
设计模式 测试技术 持续交付
软件质量与维护
【8月更文第22天】在软件开发周期中,软件的质量和维护是非常重要的两个方面。软件质量直接影响着用户体验和系统的可靠性,而软件维护则是确保软件长期稳定运行的关键。本文将详细介绍软件质量的概念、软件质量模型、质量保证与质量控制的过程,以及软件维护的类型和过程,并探讨如何提高软件的可维护性。
548 0
|
存储 NoSQL 网络协议
【最佳实践】Elastic stack 实时分析下你的Redis slowlog
Redis 是目前最流行的 NoSQL 内存数据库,然而如果在使用过程中出现滥用、乱用的情况,很容易发生性能问题,此时我们就要去关注慢查询日志,本文尝试给大家介绍一种通过 elastic stack 来快速分析 Redis 慢查询日志的方法,希望能给大家提供帮助。
1044 0
【最佳实践】Elastic stack 实时分析下你的Redis slowlog
|
9月前
|
前端开发 NoSQL Java
毕业设计|springboot+h5的购物商城系统(一)
毕业设计|springboot+h5的购物商城系统
190 2
|
9月前
|
Python
BackTrader 中文文档(六)(3)
BackTrader 中文文档(六)
108 0
|
3月前
|
分布式计算 DataWorks 数据处理
"DataWorks高级技巧揭秘:手把手教你如何在PyODPS节点中将模型一键写入OSS,实现数据处理的完美闭环!"
【10月更文挑战第23天】DataWorks是企业级的云数据开发管理平台,支持强大的数据处理和分析功能。通过PyODPS节点,用户可以编写Python代码执行ODPS任务。本文介绍了如何在DataWorks中训练模型并将其保存到OSS的详细步骤和示例代码,包括初始化ODPS和OSS服务、读取数据、训练模型、保存模型到OSS等关键步骤。
232 3
|
6月前
|
SQL 分布式计算 DataWorks
DataWorks产品使用合集之如何将STRING类型转换为DATETIME类型
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
123 1
|
9月前
|
存储 Java Linux
Maven 仓库
Maven仓库是管理项目依赖的存储位置,分为本地、中央和远程三种类型。本地仓库在首次执行Maven命令时创建,默认位于用户目录下的`.m2/repository/`。如果本地缺少依赖,Maven会从远程仓库下载至本地。中央仓库由Maven社区维护,包含大量开源Java构件,是默认的网络资源,可通过http://search.maven.org/#browse进行浏览搜索。远程仓库则用于存放非标准或特定组织的构件。可以通过settings.xml配置本地仓库路径。
|
9月前
|
分布式计算 DataWorks API
DataWorks产品使用合集之在DataWorks中,通过spark访问外网的步骤如何解决
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
240 0
|
9月前
|
机器学习/深度学习 PyTorch 算法框架/工具
PyTorch 2.2 中文官方教程(十五)(3)
PyTorch 2.2 中文官方教程(十五)
226 2
PyTorch 2.2 中文官方教程(十五)(3)
|
9月前
|
C#
C#调试与测试 | Assert(断言)
什么是Assert呢? 断言是一种用于在程序运行时检查条件是否满足的工具。如果条件不满足,断言就会抛出一个异常,从而帮助我们快速定位问题并进行调试。 在C#中,可以使用Debug.Assert方法来实现断言,该方法接受一个布尔表达式作为参数,如果该表达式的值为false,就会抛出一个AssertionFailedException异常。
354 0