Tablestore结合Spark的流批一体SQL实战

本文涉及的产品
EMR Serverless StarRocks,5000CU*H 48000GB*H
简介: 本文将通过结合Tablestore和Spark的流批一体存储和计算,来自建电商大屏完成电商数据的分析和可视化,

作者:王卓然 花名琸然 阿里云存储服务技术专家


背景介绍

电子商务模式是指在网络环境和大数据环境下基于一定技术基础的商务运作方式和盈利模式,对于数据的分析和可视化是电商运营中最重要的部分之一,而电商大屏提供了数据分析和可视化的完美结合。电商大屏包含有全量订单和实时订单的聚合,全量订单的聚合提供的是全景的综合数据视图,而实时订单的聚合展示的是实时的运营指标数据。本文将通过结合Tablestore和Spark的流批一体存储和计算,来自建电商大屏完成电商数据的分析和可视化,其效果图如下。image.png

架构设计

在本次的电商大屏实战中,客户端会实时向Tablestore插入原始订单数据,实时流计算会通过Spark Structured Streaming实时统计一个窗口周期时间内的订单数和订单金额统计,并将聚合结果写回Tablestore,最终在DataV大屏上进行展示,而离线批计算通过Spark SQL进行原始订单数据的总金额和用户维度总金额的离线聚合,聚合结果也会写回Tablestore, 并最终在DataV大屏上进行展示,整个场景的架构图如下图所示。
image.png

准备工作

1.创建阿里云E-MapReduce的Hadoop集群,文档参见创建集群
2.下载E-MapReduce的最新SDK包,包名的格式为`js
emr-datasources_shaded_*.jar
` 里面会包含有Tablestore相关的Spark批流Source和Sink。

数据源说明

数据源是一张简单的原始订单表OrderSource,表有两个主键UserId(用户ID)和OrderId(订单ID)和两个属性列price(价格)和timestamp(订单时间),数据示例如下图所示。
image.png

批流SQL流程详解

创建数据源表

1.登陆EMR Header机器,执行以下命令,启动sql客户端,该客户端用于批流SQL计算,其中emr-datasources_shaded_*.jar为准备工作中下载的EMR最新版的SDK包。

streaming-sql --driver-class-path emr-datasources_shaded_*.jar --jars emr-datasources_shaded_*.jar --master yarn-client --num-executors 8 --executor-memory 2g --executor-cores 2

2.创建原始订单数据表(Source表)的外表order_source,该外表将用于后续的流批SQL执行。

DROP TABLE IF EXISTS order_source;
CREATE TABLE order_source
USING tablestore
OPTIONS(
endpoint="http://vehicle-test.cn-hangzhou.vpc.tablestore.aliyuncs.com",
access.key.id="",
access.key.secret="",
instance.name="vehicle-test",
table.name="OrderSource",
tunnel.id="2b7bbf3d-d6c4-4cea-89fe-71998bccaf19",
catalog='{"columns": {"UserId": {"col": "UserId", "type": "string"}, "OrderId": {"col": "OrderId", "type": "string"},"price": {"cols": "price", "type": "double"}, "timestamp": {"cols": "timestamp", "type": "long"}}}'
);

参数说明:

参数名 解释
endpoint 表格存储实例的访问地址
access.key.id 阿里云账号AK ID
access.key.secret 阿里云账号AK Secret
instance.name 表格存储实例名
table.name 表格存储表名
tunnel.id 表格存储的增量通道ID, 该参数用于实时的增量SQL, 批量SQL时非必须。
catalog 表的字段Schema定义,上述示例中对应的四个列为UserId(主键), OrderId(主键), price, timestamp,数据类型分别为string, string, double, long。

实时流计算

实时流计算将实时统计一个窗口周期时间内的订单数和订单金额统计,并将聚合结果写回Tablestore。首先创建流计算的Sink外表order_stream_sink(对应Tablestore表OrderStreamSink),然后运行流计算SQL进行实时聚合,最后将聚合结果实时写回Tablestore目的表中。
Sink表的各参数含义和Source表一致,其中catalog字段的内容有所不同,对应的Sink表中有四个字段,begin(开始时间,主键列,格式为2019-11-27 14:54:00),end(结束时间,主键列),count(订单数),totalPrice(订单总金额)。

// 创建Sink表order_stream_sink对应Tablestore的表OrderStreamSink(主键为begin和end两列)
DROP TABLE IF EXISTS order_stream_sink;
CREATE TABLE order_stream_sink
USING tablestore
OPTIONS(
endpoint="http://vehicle-test.cn-hangzhou.vpc.tablestore.aliyuncs.com",
access.key.id="",
access.key.secret="",
instance.name="vehicle-test",
table.name="OrderStreamSink",
catalog='{"columns": {"begin": {"col": "begin", "type": "string"},"end": {"col": "end", "type": "string"}, "count": {"col": "count", "type": "long"}, "totalPrice": {"col": "totalPrice", "type": "double"}}}'
);

// 在order_source表上创建视图order_source_stream_view
CREATE SCAN order_source_stream_view ON order_source USING STREAM OPTIONS ("maxoffsetsperchannel"="10000");

// 在视图order_source_stream_view上运行STREAM SQL作业,以下样例会按30s粒度进行订单数和订单金额的聚合,
// 聚合结果将写回Tablestore表OrderStreamSink。
CREATE STREAM job1
options(
checkpointLocation='/tmp/spark/cp/job1',
outputMode='update'
)
INSERT INTO order_stream_sink
SELECT CAST(window.start AS String) AS begin, CAST(window.end AS String) AS end, count(*) AS count, CAST(sum(price) AS Double) AS totalPrice FROM order_source_stream_view GROUP BY window(to_timestamp(timestamp / 1000), "30 seconds");

在运行Stream SQL后,可以实时得到聚合结果,聚合结果样例如下图所示,聚合结果存放在OrderStreamSink表中,通过Tablestore和DataV的直连功能,可以很容易的将结果绘制在DataV的大屏上。
image.png

离线批计算

离线批计算将进行原始订单数据的总金额和用户维度总金额的离线聚合,首先会创建两张Sink表分别存放历史总金额和用户维度总金额的聚合数据,然后直接在源表order_source上运行批计算SQL,最后得到聚合结果。

// 批计算任务
// 用户维度结果表:OrderBatchSink(主键UserId, 属性列count,totalPrice)
// 总数据维度结果表:OrderTotalSink(主键Count, 属性列totalPrice)
DROP TABLE IF EXISTS order_batch_sink;
CREATE TABLE order_batch_sink
USING tablestore
OPTIONS(
endpoint="http://vehicle-test.cn-hangzhou.vpc.tablestore.aliyuncs.com",
access.key.id="",
access.key.secret="",
instance.name="vehicle-test",
table.name="OrderBatchSink",
tunnel.id="",
catalog='{"columns": {"UserId": {"col": "UserId", "type": "string"}, "count": {"col": "count", "type": "long"}, "totalPrice": {"col": "totalPrice", "type": "double"}}}'
);

DROP TABLE IF EXISTS order_totol_sink;
CREATE TABLE order_total_sink
USING tablestore
OPTIONS(
endpoint="http://vehicle-test.cn-hangzhou.vpc.tablestore.aliyuncs.com",
access.key.id="",
access.key.secret="",
instance.name="vehicle-test",
table.name="OrderTotalSink",
tunnel.id="",
catalog='{"columns": {"count": {"col": "count", "type": "long"}, "totalPrice": {"col": "totalPrice", "type": "double"}}}'
);

运行以下批计算SQL进行用户维度聚合结果的更新。

// SQL命令
INSERT INTO order_batch_sink SELECT UserId, count(*) AS count, sum(price) AS totalPrice FROM order_source GROUP BY UserId;
// 实际运行
spark-sql> INSERT INTO order_batch_sink SELECT UserId, count(*) AS count, sum(price) AS totalPrice FROM order_source GROUP BY UserId;
Time taken: 5.107 seconds

image.png
运行以下批计算SQL进行总数据维度结果的更新。

// SQL命令
INSERT INTO order_total_sink SELECT count(*) AS count, sum(price) AS totalPrice FROM order_source;
// 实际运行
spark-sql> INSERT INTO order_total_sink SELECT count(*) AS count, sum(price) AS totalPrice FROM order_source;
Time taken: 4.272 seconds

image.png

写在最后

本文通过使用一套存储(Tablestore)和一套计算(Spark)完成了批流计算的有效结合,更多有关批流一体的细节和干货可以参见Tablestore结合Spark的云上流批一体大数据架构
对Tablestore有任何问题,随时欢迎同我们进行交流,钉钉群号:11789671(1群)、23307953(2群)。


阿里巴巴开源大数据技术团队成立Apache Spark中国技术社区,定期推送精彩案例,技术专家直播,问答区数个Spark技术同学每日在线答疑,只为营造纯粹的Spark氛围,欢迎钉钉扫码加入!
image.png

相关实践学习
消息队列+Serverless+Tablestore:实现高弹性的电商订单系统
基于消息队列以及函数计算,快速部署一个高弹性的商品订单系统,能够应对抢购场景下的高并发情况。
阿里云表格存储使用教程
表格存储(Table Store)是构建在阿里云飞天分布式系统之上的分布式NoSQL数据存储服务,根据99.99%的高可用以及11个9的数据可靠性的标准设计。表格存储通过数据分片和负载均衡技术,实现数据规模与访问并发上的无缝扩展,提供海量结构化数据的存储和实时访问。 产品详情:https://www.aliyun.com/product/ots
相关文章
|
1月前
|
SQL 数据库 开发者
MSSQL性能调优实战技巧:索引优化、SQL语句微调与并发控制策略
在Microsoft SQL Server(MSSQL)的管理与优化中,性能调优是一项复杂但至关重要的任务
|
1月前
|
SQL 监控 数据库
MSSQL性能调优实战策略:索引优化、SQL语句重构与并发控制
在Microsoft SQL Server(MSSQL)的管理和优化过程中,性能调优是确保数据库高效运行、满足业务需求的重要环节
|
1月前
|
SQL 运维 监控
MSSQL性能调优实战:索引优化、SQL查询效率提升与并发控制策略
在Microsoft SQL Server(MSSQL)的日常运维与性能优化中,精准的策略与技巧是实现高效数据库管理的关键
|
1月前
|
SQL 监控 数据库
MSSQL性能调优实战技巧:索引优化策略、SQL查询重构与并发控制详解
在Microsoft SQL Server(MSSQL)的管理与优化过程中,性能调优是确保数据库高效运行的关键环节
|
1月前
|
SQL 监控 数据库
MSSQL性能调优实战指南:精准索引策略、SQL查询优化与高效并发控制
在Microsoft SQL Server(MSSQL)的性能调优过程中,精准索引策略、SQL查询优化以及高效并发控制是三大核心要素
|
1月前
|
SQL 安全 数据库
Python Web开发者必学:SQL注入、XSS、CSRF攻击与防御实战演练!
【7月更文挑战第26天】在 Python Web 开发中, 安全性至关重要。本文聚焦 SQL 注入、XSS 和 CSRF 这三大安全威胁,提供实战防御策略。SQL 注入可通过参数化查询和 ORM 框架来防范;XSS 则需 HTML 转义用户输入与实施 CSP;CSRF 防御依赖 CSRF 令牌和双重提交 Cookie。掌握这些技巧,能有效加固 Web 应用的安全防线。安全是持续的过程,需贯穿开发始终。
54 1
Python Web开发者必学:SQL注入、XSS、CSRF攻击与防御实战演练!
|
15天前
|
SQL 存储 分布式计算
|
1月前
|
SQL 存储 API
ES 实战复杂sql查询、修改字段类型
ES 实战复杂sql查询、修改字段类型
36 1
|
1月前
|
SQL 分布式计算 DataWorks
DataWorks产品使用合集之怎么编写和执行Spark SQL
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
1月前
|
SQL 运维 监控
MSSQL性能调优实战技巧:索引优化、SQL查询优化与并发控制策略
在Microsoft SQL Server(MSSQL)的运维过程中,性能调优是确保数据库高效运行、满足业务需求的关键环节