Tablestore结合Spark的流批一体SQL实战

本文涉及的产品
EMR Serverless StarRocks,5000CU*H 48000GB*H
简介: 本文将通过结合Tablestore和Spark的流批一体存储和计算,来自建电商大屏完成电商数据的分析和可视化,

作者:王卓然 花名琸然 阿里云存储服务技术专家


背景介绍

电子商务模式是指在网络环境和大数据环境下基于一定技术基础的商务运作方式和盈利模式,对于数据的分析和可视化是电商运营中最重要的部分之一,而电商大屏提供了数据分析和可视化的完美结合。电商大屏包含有全量订单和实时订单的聚合,全量订单的聚合提供的是全景的综合数据视图,而实时订单的聚合展示的是实时的运营指标数据。本文将通过结合Tablestore和Spark的流批一体存储和计算,来自建电商大屏完成电商数据的分析和可视化,其效果图如下。image.png

架构设计

在本次的电商大屏实战中,客户端会实时向Tablestore插入原始订单数据,实时流计算会通过Spark Structured Streaming实时统计一个窗口周期时间内的订单数和订单金额统计,并将聚合结果写回Tablestore,最终在DataV大屏上进行展示,而离线批计算通过Spark SQL进行原始订单数据的总金额和用户维度总金额的离线聚合,聚合结果也会写回Tablestore, 并最终在DataV大屏上进行展示,整个场景的架构图如下图所示。
image.png

准备工作

1.创建阿里云E-MapReduce的Hadoop集群,文档参见创建集群
2.下载E-MapReduce的最新SDK包,包名的格式为`js
emr-datasources_shaded_*.jar
` 里面会包含有Tablestore相关的Spark批流Source和Sink。

数据源说明

数据源是一张简单的原始订单表OrderSource,表有两个主键UserId(用户ID)和OrderId(订单ID)和两个属性列price(价格)和timestamp(订单时间),数据示例如下图所示。
image.png

批流SQL流程详解

创建数据源表

1.登陆EMR Header机器,执行以下命令,启动sql客户端,该客户端用于批流SQL计算,其中emr-datasources_shaded_*.jar为准备工作中下载的EMR最新版的SDK包。

streaming-sql --driver-class-path emr-datasources_shaded_*.jar --jars emr-datasources_shaded_*.jar --master yarn-client --num-executors 8 --executor-memory 2g --executor-cores 2

2.创建原始订单数据表(Source表)的外表order_source,该外表将用于后续的流批SQL执行。

DROP TABLE IF EXISTS order_source;
CREATE TABLE order_source
USING tablestore
OPTIONS(
endpoint="http://vehicle-test.cn-hangzhou.vpc.tablestore.aliyuncs.com",
access.key.id="",
access.key.secret="",
instance.name="vehicle-test",
table.name="OrderSource",
tunnel.id="2b7bbf3d-d6c4-4cea-89fe-71998bccaf19",
catalog='{"columns": {"UserId": {"col": "UserId", "type": "string"}, "OrderId": {"col": "OrderId", "type": "string"},"price": {"cols": "price", "type": "double"}, "timestamp": {"cols": "timestamp", "type": "long"}}}'
);

参数说明:

参数名 解释
endpoint 表格存储实例的访问地址
access.key.id 阿里云账号AK ID
access.key.secret 阿里云账号AK Secret
instance.name 表格存储实例名
table.name 表格存储表名
tunnel.id 表格存储的增量通道ID, 该参数用于实时的增量SQL, 批量SQL时非必须。
catalog 表的字段Schema定义,上述示例中对应的四个列为UserId(主键), OrderId(主键), price, timestamp,数据类型分别为string, string, double, long。

实时流计算

实时流计算将实时统计一个窗口周期时间内的订单数和订单金额统计,并将聚合结果写回Tablestore。首先创建流计算的Sink外表order_stream_sink(对应Tablestore表OrderStreamSink),然后运行流计算SQL进行实时聚合,最后将聚合结果实时写回Tablestore目的表中。
Sink表的各参数含义和Source表一致,其中catalog字段的内容有所不同,对应的Sink表中有四个字段,begin(开始时间,主键列,格式为2019-11-27 14:54:00),end(结束时间,主键列),count(订单数),totalPrice(订单总金额)。

// 创建Sink表order_stream_sink对应Tablestore的表OrderStreamSink(主键为begin和end两列)
DROP TABLE IF EXISTS order_stream_sink;
CREATE TABLE order_stream_sink
USING tablestore
OPTIONS(
endpoint="http://vehicle-test.cn-hangzhou.vpc.tablestore.aliyuncs.com",
access.key.id="",
access.key.secret="",
instance.name="vehicle-test",
table.name="OrderStreamSink",
catalog='{"columns": {"begin": {"col": "begin", "type": "string"},"end": {"col": "end", "type": "string"}, "count": {"col": "count", "type": "long"}, "totalPrice": {"col": "totalPrice", "type": "double"}}}'
);

// 在order_source表上创建视图order_source_stream_view
CREATE SCAN order_source_stream_view ON order_source USING STREAM OPTIONS ("maxoffsetsperchannel"="10000");

// 在视图order_source_stream_view上运行STREAM SQL作业,以下样例会按30s粒度进行订单数和订单金额的聚合,
// 聚合结果将写回Tablestore表OrderStreamSink。
CREATE STREAM job1
options(
checkpointLocation='/tmp/spark/cp/job1',
outputMode='update'
)
INSERT INTO order_stream_sink
SELECT CAST(window.start AS String) AS begin, CAST(window.end AS String) AS end, count(*) AS count, CAST(sum(price) AS Double) AS totalPrice FROM order_source_stream_view GROUP BY window(to_timestamp(timestamp / 1000), "30 seconds");

在运行Stream SQL后,可以实时得到聚合结果,聚合结果样例如下图所示,聚合结果存放在OrderStreamSink表中,通过Tablestore和DataV的直连功能,可以很容易的将结果绘制在DataV的大屏上。
image.png

离线批计算

离线批计算将进行原始订单数据的总金额和用户维度总金额的离线聚合,首先会创建两张Sink表分别存放历史总金额和用户维度总金额的聚合数据,然后直接在源表order_source上运行批计算SQL,最后得到聚合结果。

// 批计算任务
// 用户维度结果表:OrderBatchSink(主键UserId, 属性列count,totalPrice)
// 总数据维度结果表:OrderTotalSink(主键Count, 属性列totalPrice)
DROP TABLE IF EXISTS order_batch_sink;
CREATE TABLE order_batch_sink
USING tablestore
OPTIONS(
endpoint="http://vehicle-test.cn-hangzhou.vpc.tablestore.aliyuncs.com",
access.key.id="",
access.key.secret="",
instance.name="vehicle-test",
table.name="OrderBatchSink",
tunnel.id="",
catalog='{"columns": {"UserId": {"col": "UserId", "type": "string"}, "count": {"col": "count", "type": "long"}, "totalPrice": {"col": "totalPrice", "type": "double"}}}'
);

DROP TABLE IF EXISTS order_totol_sink;
CREATE TABLE order_total_sink
USING tablestore
OPTIONS(
endpoint="http://vehicle-test.cn-hangzhou.vpc.tablestore.aliyuncs.com",
access.key.id="",
access.key.secret="",
instance.name="vehicle-test",
table.name="OrderTotalSink",
tunnel.id="",
catalog='{"columns": {"count": {"col": "count", "type": "long"}, "totalPrice": {"col": "totalPrice", "type": "double"}}}'
);

运行以下批计算SQL进行用户维度聚合结果的更新。

// SQL命令
INSERT INTO order_batch_sink SELECT UserId, count(*) AS count, sum(price) AS totalPrice FROM order_source GROUP BY UserId;
// 实际运行
spark-sql> INSERT INTO order_batch_sink SELECT UserId, count(*) AS count, sum(price) AS totalPrice FROM order_source GROUP BY UserId;
Time taken: 5.107 seconds

image.png
运行以下批计算SQL进行总数据维度结果的更新。

// SQL命令
INSERT INTO order_total_sink SELECT count(*) AS count, sum(price) AS totalPrice FROM order_source;
// 实际运行
spark-sql> INSERT INTO order_total_sink SELECT count(*) AS count, sum(price) AS totalPrice FROM order_source;
Time taken: 4.272 seconds

image.png

写在最后

本文通过使用一套存储(Tablestore)和一套计算(Spark)完成了批流计算的有效结合,更多有关批流一体的细节和干货可以参见Tablestore结合Spark的云上流批一体大数据架构
对Tablestore有任何问题,随时欢迎同我们进行交流,钉钉群号:11789671(1群)、23307953(2群)。


阿里巴巴开源大数据技术团队成立Apache Spark中国技术社区,定期推送精彩案例,技术专家直播,问答区数个Spark技术同学每日在线答疑,只为营造纯粹的Spark氛围,欢迎钉钉扫码加入!
image.png

相关实践学习
消息队列+Serverless+Tablestore:实现高弹性的电商订单系统
基于消息队列以及函数计算,快速部署一个高弹性的商品订单系统,能够应对抢购场景下的高并发情况。
阿里云表格存储使用教程
表格存储(Table Store)是构建在阿里云飞天分布式系统之上的分布式NoSQL数据存储服务,根据99.99%的高可用以及11个9的数据可靠性的标准设计。表格存储通过数据分片和负载均衡技术,实现数据规模与访问并发上的无缝扩展,提供海量结构化数据的存储和实时访问。 产品详情:https://www.aliyun.com/product/ots
相关文章
|
1月前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
130 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
1月前
|
SQL 数据库 UED
SQL性能提升秘籍:5步优化法与10个实战案例
在数据库管理和应用开发中,SQL查询的性能优化至关重要。高效的SQL查询不仅可以提高应用的响应速度,还能降低服务器负载,提升用户体验。本文将分享SQL优化的五大步骤和十个实战案例,帮助构建高效、稳定的数据库应用。
47 3
|
1月前
|
SQL 缓存 监控
SQL性能提升指南:五大优化策略与十个实战案例
在数据库性能优化的世界里,SQL优化是提升查询效率的关键。一个高效的SQL查询可以显著减少数据库的负载,提高应用响应速度,甚至影响整个系统的稳定性和扩展性。本文将介绍SQL优化的五大步骤,并结合十个实战案例,为你提供一份详尽的性能提升指南。
49 0
|
1月前
|
SQL JSON 分布式计算
【赵渝强老师】Spark SQL的数据模型:DataFrame
本文介绍了在Spark SQL中创建DataFrame的三种方法。首先,通过定义case class来创建表结构,然后将CSV文件读入RDD并关联Schema生成DataFrame。其次,使用StructType定义表结构,同样将CSV文件读入RDD并转换为Row对象后创建DataFrame。最后,直接加载带有格式的数据文件(如JSON),通过读取文件内容直接创建DataFrame。每种方法都包含详细的代码示例和解释。
|
2月前
|
SQL 分布式计算 大数据
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
70 0
|
2月前
|
SQL 分布式计算 算法
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(二)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(二)
88 0
|
2月前
|
SQL 分布式计算 Java
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
59 0
|
2月前
|
SQL 分布式计算 大数据
大数据-94 Spark 集群 SQL DataFrame & DataSet & RDD 创建与相互转换 SparkSQL
大数据-94 Spark 集群 SQL DataFrame & DataSet & RDD 创建与相互转换 SparkSQL
75 0
|
3月前
|
关系型数据库 MySQL 网络安全
5-10Can't connect to MySQL server on 'sh-cynosl-grp-fcs50xoa.sql.tencentcdb.com' (110)")
5-10Can't connect to MySQL server on 'sh-cynosl-grp-fcs50xoa.sql.tencentcdb.com' (110)")
|
5月前
|
SQL 存储 监控
SQL Server的并行实施如何优化?
【7月更文挑战第23天】SQL Server的并行实施如何优化?
134 13
下一篇
DataWorks