Tablestore+Delta Lake(快速开始)

本文涉及的产品
对象存储 OSS,20GB 3个月
对象存储 OSS,恶意文件检测 1000次 1年
日志服务 SLS,月写入数据量 50GB 1个月
简介: 本文介绍如何在E-MapReduce中通过Tablestore Spark Streaming Source将TableStore中的数据实时导入到Delta Lake中。 背景介绍 近些年来HTAP(Hybrid transaction/analytical processing)的热度越来越高,通过将存储和计算组合起来,既能支持传统的海量结构化数据分析,又能支持快速的事务更新写入,是设计数据密集型系统的一个成熟的架构。

本文介绍如何在E-MapReduce中通过Tablestore Spark Streaming Source将TableStore中的数据实时导入到Delta Lake中。

背景介绍

近些年来HTAP(Hybrid transaction/analytical processing)的热度越来越高,通过将存储和计算组合起来,既能支持传统的海量结构化数据分析,又能支持快速的事务更新写入,是设计数据密集型系统的一个成熟的架构。
表格存储(Tablestore)是阿里云自研的 NoSQL 多模型数据库,提供海量结构化数据存储以及快速的查询和分析服务(PB 级存储、千万 TPS 以及毫秒级延迟),借助于表格存储的底层引擎,能够很好的完成OLTP场景下的需求。Delta Lake类似于支持Delta的Data Lake(数据湖),使用列存来存base数据,行的格式存储新增delta数据,进而做到支持数据操作的ACID和CRUD,完全兼容Spark的大数据生态,通过结合Delta Lake和Spark生态,能够很好的完成OLAP场景下的需求。下图展示的是Tablestore和Delta Lake结合的HATP场景的一个简要的逻辑结构图,有关结构化大数据分析平台设计的更多细节和干货,可以参阅文章 结构化大数据分析平台设计

1

准备工作

步骤一 创建Tablestore源表

详细开通步骤请参考官方文档,本文demo中所创建出来的表名为Source,表的Schema如下图所示,该表有PKString和PkInt两个主键,类型分别为String和Interger。
2

为表Source建立一个增量通道,如下图所示,通道列表里面会显示该通道的名字、ID以及类型。
3

技术注解:
通道服务(Tunnel Service)是基于Tablestore数据接口之上的全增量一体化服务,包含三种通道类型:

  • 全量:对数据表中历史存量数据消费处理
  • 增量:对数据表中新增数据消费处理
  • 全量加增量:先对数据表总历史存量数据消费,之后对新增数据消费

通道服务的详细介绍可查询Tablestore官网文档

步骤二 获取相关jar包并上传到hadoop集群

步骤三 运行Spark Streaming作业

  1. 以一个基于emr demo修改的代码为样例,编译生成JAR包,JAR包需要上传到Hadoop集群的emr-header-1主机中(参见步骤二),完整的代码由于改动较大,不在本文中一一说明,后续会合到emr demo官方项目中。
  2. 该样例以Tablestore表作为数据源,通过结合Tablestore CDC技术,Tablestore Streaming Source和Delta Sink,演示的是TableStore到Delta Lake的一个完整链路。
  3. 按以下命令,启动spark streaming作业,开启一个实时同步Tablestore Source表中数据到Delta Lake Table的监听程序。
    spark-submit --class com.aliyun.emr.example.spark.sql.streaming.DeltaTableStoreCDC  --jars emr-tablestore-X.X.X-SNAPSHOT.jar,tablestore-X.X.X-jar-with-dependencies.jar examples-X.X.X-shaded.jar <instance> <tableName> <tunnelId> <accessKeyId> <accessKeySecret> <endPoint> <maxOffsetsPerChannel>
    

各个参数说明如下:

参数 参数说明
com.aliyun.emr.example.spark.sql.streaming.DeltaTableStoreCDC 所要运行的主程序类
emr-tablestore-X.X.X-SNAPSHOT.jar 包含Tablestore source的jar包
tablestore-X.X.X-jar-with-dependencies.jar EMR SDK 相关的Tablestore依赖包
examples-X.X.X-shaded.jar 基于EMR demo修改的包(包含主程序类)
instance Tablestore实例名
tableName Tablestore表名
tunnelId Tablestore表的通道Id
accessKeyId Tablestore的accessKeyId
accessKeySecret Tablestore的秘钥
endPoint Tablestore实例的endPoint
maxOffsetsPerChannel Tablestore通道 Channel在每个Spark Batch周期内同步的最大数据条数,默认10000。
catalog 同步的列名,详见Catalog字段说明

步骤四 数据CRUD示例

  1. 首先在TableStore里插入两行,本次示例中,我们建了8列的同步列,包括两个主键(PkString, PkInt)和六个属性列(col1, col2, col3, timestamp, col5和col6)。由于表格存储是Free-Schema的结构,我们可以任意的插入属性列,TableStore的Spark Source会自动的做属性列的筛选。如下面两张图所示,在插入两行数据后,Delta Table中同步也可以马上读取到两行,且数据一致。
    4
    5

  2. 接着,在Tablestore中进行一些更新行和插入行的操作,如下面的两个图所示,等待一小段micro-batch的数据同步后,表格存储中的数据同步变化能够即时的更新到Delta Table中。
    6
    7

  3. 将Tablestore中的数据全部清空,如下面两图所示,Delta Table也同步的变成了空。
    8
    9

  4. 在集群上,Delta Table默认存放在HDFS中,如下图所示,_delta_log目录中存放的json文件是Transaction log,parquet格式的文件是底层的数据文件。
    10

写在最后

本文介绍了如何在E-MapReduce中通过Tablestore Spark Streaming Source将TableStore中的数据实时导入到Delta Lake中,如果对基于Tablestore的大数据存储分析感兴趣的朋友可以加入我们的技术交流群(钉钉:23307953 或者11789671),来与我们一起探讨。
image

相关实践学习
消息队列+Serverless+Tablestore:实现高弹性的电商订单系统
基于消息队列以及函数计算,快速部署一个高弹性的商品订单系统,能够应对抢购场景下的高并发情况。
阿里云表格存储使用教程
表格存储(Table Store)是构建在阿里云飞天分布式系统之上的分布式NoSQL数据存储服务,根据99.99%的高可用以及11个9的数据可靠性的标准设计。表格存储通过数据分片和负载均衡技术,实现数据规模与访问并发上的无缝扩展,提供海量结构化数据的存储和实时访问。 产品详情:https://www.aliyun.com/product/ots
目录
相关文章
|
7月前
|
存储 SQL Apache
Apache Hudi与Delta Lake对比
Apache Hudi与Delta Lake对比
103 0
|
存储 SQL JSON
Delta Lake、Hudi与Iceberg详解
Delta Lake、Hudi与Iceberg详解
1022 0
Delta Lake、Hudi与Iceberg详解
|
流计算
Delta Lake中CDC的实现
Delta Lake中CDC的实现
154 0
|
SQL 存储 分布式计算
数据湖揭秘—Delta Lake
Delta Lake 是 DataBricks 公司开源的、用于构建湖仓架构的存储框架。能够支持 Spark,Flink,Hive,PrestoDB,Trino 等查询/计算引擎。作为一个开放格式的存储层,它在提供了批流一体的同时,为湖仓架构提供可靠的,安全的,高性能的保证。
4090 7
数据湖揭秘—Delta Lake
|
存储 分布式计算 DataWorks
基于Delta lake、Hudi格式的湖仓一体方案
Delta Lake 和 Hudi 是流行的开放格式的存储层,为数据湖同时提供流式和批处理的操作,这允许我们在数据湖上直接运行 BI 等应用,让数据分析师可以即时查询新的实时数据,从而对您的业务产生即时的洞察。MaxCompute 在湖仓一体架构中,通过支持 Delta Lake 和 Hudi 在数据湖中提供数据仓库性能。
1455 2
基于Delta lake、Hudi格式的湖仓一体方案
|
SQL 分布式计算 搜索推荐
《 Delta Lake 数据湖专题系列5讲》文章回顾
《Delta Lake 数据湖专题系列5讲》由阿里云 DDI 团队翻译整理自大数据技术公司 Databricks 针对数据湖 Delta Lake 系列技术文章。阅读完此系列文章可以帮助您达到入门级,对数据湖 Lakehouse 有整体上的认识和应用,掌握理论知识体系。
《 Delta Lake 数据湖专题系列5讲》文章回顾
|
SQL 存储 分布式计算
Data Lake 三剑客——Delta、Hudi、Iceberg 对比分析
定性上讲,三者均为 Data Lake 的数据存储中间层,其数据管理的功能均是基于一系列的 meta 文件。meta 文件的角色类似于数据库的 catalog/wal,起到 schema 管理、事务管理和数据管理的功能。
15940 2
Data Lake 三剑客——Delta、Hudi、Iceberg 对比分析
|
NoSQL 分布式计算 Spark
Tablestore+Delta Lake(快速开始)
本文介绍如何在E-MapReduce中通过Tablestore Spark Streaming Source将TableStore中的数据实时导入到Delta Lake中。
Tablestore+Delta Lake(快速开始)
|
存储 JSON 分布式计算
使用EMR DataFrame 流处理 Tablestore
使用Spark的DataFrame方式访问表格存储,并在本地和集群上分别进行运行调试。 ### 前提条件 - 了解Spark访问表格存储的依赖包,并在使用时通过maven方式引入项目中。 - Spark相关:spark-core、spark-sql、spark-hive - Spark Tablestore connector:emr-tablestore-.jar
461 0
使用EMR DataFrame 流处理 Tablestore
|
存储 SQL 分布式计算
使用 Data Lake Formation(DLF) 进行 Tablestore 数据实时入湖
本文介绍使用 Data Lake Formation (DLF)服务,实时订阅 Tablestore(原 OTS) 的数据,并以 Delta Lake 的格式投递进入 OSS,构建实时数据湖。 ## 架构介绍 表格存储是一种全托管的云原生数据库,使用表格存储您无需担心软硬件预置、配置、故障、集群扩展、安全等问题。提供高服务可用性的同时极大地减少了管理成本。 表格存储支持多种数据库模型
802 0
使用 Data Lake Formation(DLF) 进行 Tablestore 数据实时入湖