Tablestore结合Spark的流批一体SQL实战

本文涉及的产品
对象存储 OSS,20GB 3个月
日志服务 SLS,月写入数据量 50GB 1个月
阿里云盘企业版 CDE,企业版用户数5人 500GB空间
简介:

背景介绍

电子商务模式是指在网络环境和大数据环境下基于一定技术基础的商务运作方式和盈利模式,对于数据的分析和可视化是电商运营中最重要的部分之一,而电商大屏提供了数据分析和可视化的完美结合。电商大屏包含有全量订单和实时订单的聚合,全量订单的聚合提供的是全景的综合数据视图,而实时订单的聚合展示的是实时的运营指标数据。本文将通过结合Tablestore和Spark的流批一体存储和计算,来自建电商大屏完成电商数据的分析和可视化,其效果图如下。
1

架构设计

在本次的电商大屏实战中,客户端会实时向Tablestore插入原始订单数据,实时流计算会通过Spark Structured Streaming实时统计一个窗口周期时间内的订单数和订单金额统计,并将聚合结果写回Tablestore,最终在DataV大屏上进行展示,而离线批计算通过Spark SQL进行原始订单数据的总金额和用户维度总金额的离线聚合,聚合结果也会写回Tablestore, 并最终在DataV大屏上进行展示,整个场景的架构图如下图所示。
2

准备工作

  1. 创建阿里云E-MapReduce的Hadoop集群,文档参见创建集群
  2. 下载E-MapReduce的最新SDK包,包名的格式为emr-datasources_shaded_*.jar,里面会包含有Tablestore相关的Spark批流Source和Sink。

数据源说明

数据源是一张简单的原始订单表OrderSource,表有两个主键UserId(用户ID)和OrderId(订单ID)和两个属性列price(价格)和timestamp(订单时间),数据示例如下图所示。
3

批流SQL流程详解

创建数据源表

1.登陆EMR Header机器,执行以下命令,启动sql客户端,该客户端用于批流SQL计算,其中emr-datasources_shaded_*.jar为准备工作中下载的EMR最新版的SDK包。

streaming-sql --driver-class-path emr-datasources_shaded_*.jar --jars emr-datasources_shaded_*.jar --master yarn-client --num-executors 8 --executor-memory 2g --executor-cores 2

2.创建原始订单数据表(Source表)的外表order_source,该外表将用于后续的流批SQL执行。

DROP TABLE IF EXISTS order_source;
CREATE TABLE order_source
USING tablestore
OPTIONS(
endpoint="http://vehicle-test.cn-hangzhou.vpc.tablestore.aliyuncs.com",
access.key.id="",
access.key.secret="",
instance.name="vehicle-test",
table.name="OrderSource",
tunnel.id="2b7bbf3d-d6c4-4cea-89fe-71998bccaf19",
catalog='{"columns": {"UserId": {"col": "UserId", "type": "string"}, "OrderId": {"col": "OrderId", "type": "string"},"price": {"cols": "price", "type": "double"}, "timestamp": {"cols": "timestamp", "type": "long"}}}'
);

参数说明:

参数名 解释
endpoint 表格存储实例的访问地址
access.key.id 阿里云账号AK ID
access.key.secret 阿里云账号AK Secret
instance.name 表格存储实例名
table.name 表格存储表名
tunnel.id 表格存储的增量通道ID, 该参数用于实时的增量SQL, 批量SQL时非必须。
catalog 表的字段Schema定义,上述示例中对应的四个列为UserId(主键), OrderId(主键), price, timestamp,数据类型分别为string, string, double, long。

实时流计算

实时流计算将实时统计一个窗口周期时间内的订单数和订单金额统计,并将聚合结果写回Tablestore。首先创建流计算的Sink外表order_stream_sink(对应Tablestore表OrderStreamSink),然后运行流计算SQL进行实时聚合,最后将聚合结果实时写回Tablestore目的表中。
Sink表的各参数含义和Source表一致,其中catalog字段的内容有所不同,对应的Sink表中有四个字段,begin(开始时间,主键列,格式为2019-11-27 14:54:00),end(结束时间,主键列),count(订单数),totalPrice(订单总金额)。

// 创建Sink表order_stream_sink对应Tablestore的表OrderStreamSink(主键为begin和end两列)
DROP TABLE IF EXISTS order_stream_sink;
CREATE TABLE order_stream_sink
USING tablestore
OPTIONS(
endpoint="http://vehicle-test.cn-hangzhou.vpc.tablestore.aliyuncs.com",
access.key.id="",
access.key.secret="",
instance.name="vehicle-test",
table.name="OrderStreamSink",
catalog='{"columns": {"begin": {"col": "begin", "type": "string"},"end": {"col": "end", "type": "string"}, "count": {"col": "count", "type": "long"}, "totalPrice": {"col": "totalPrice", "type": "double"}}}'
);

// 在order_source表上创建视图order_source_stream_view
CREATE SCAN order_source_stream_view ON order_source USING STREAM OPTIONS ("maxoffsetsperchannel"="10000");

// 在视图order_source_stream_view上运行STREAM SQL作业,以下样例会按30s粒度进行订单数和订单金额的聚合,
// 聚合结果将写回Tablestore表OrderStreamSink。
CREATE STREAM job1
options(
checkpointLocation='/tmp/spark/cp/job1',
outputMode='update'
)
INSERT INTO order_stream_sink
SELECT CAST(window.start AS String) AS begin, CAST(window.end AS String) AS end, count(*) AS count, CAST(sum(price) AS Double) AS totalPrice FROM order_source_stream_view GROUP BY window(to_timestamp(timestamp / 1000), "30 seconds");

在运行Stream SQL后,可以实时得到聚合结果,聚合结果样例如下图所示,聚合结果存放在OrderStreamSink表中,通过Tablestore和DataV的直连功能,可以很容易的将结果绘制在DataV的大屏上。
4

离线批计算

离线批计算将进行原始订单数据的总金额和用户维度总金额的离线聚合,首先会创建两张Sink表分别存放历史总金额和用户维度总金额的聚合数据,然后直接在源表order_source上运行批计算SQL,最后得到聚合结果。

// 批计算任务
// 用户维度结果表:OrderBatchSink(主键UserId, 属性列count,totalPrice)
// 总数据维度结果表:OrderTotalSink(主键Count, 属性列totalPrice)
DROP TABLE IF EXISTS order_batch_sink;
CREATE TABLE order_batch_sink
USING tablestore
OPTIONS(
endpoint="http://vehicle-test.cn-hangzhou.vpc.tablestore.aliyuncs.com",
access.key.id="",
access.key.secret="",
instance.name="vehicle-test",
table.name="OrderBatchSink",
tunnel.id="",
catalog='{"columns": {"UserId": {"col": "UserId", "type": "string"}, "count": {"col": "count", "type": "long"}, "totalPrice": {"col": "totalPrice", "type": "double"}}}'
);

DROP TABLE IF EXISTS order_totol_sink;
CREATE TABLE order_total_sink
USING tablestore
OPTIONS(
endpoint="http://vehicle-test.cn-hangzhou.vpc.tablestore.aliyuncs.com",
access.key.id="",
access.key.secret="",
instance.name="vehicle-test",
table.name="OrderTotalSink",
tunnel.id="",
catalog='{"columns": {"count": {"col": "count", "type": "long"}, "totalPrice": {"col": "totalPrice", "type": "double"}}}'
);

运行以下批计算SQL进行用户维度聚合结果的更新。

// SQL命令
INSERT INTO order_batch_sink SELECT UserId, count(*) AS count, sum(price) AS totalPrice FROM order_source GROUP BY UserId;
// 实际运行
spark-sql> INSERT INTO order_batch_sink SELECT UserId, count(*) AS count, sum(price) AS totalPrice FROM order_source GROUP BY UserId;
Time taken: 5.107 seconds

5

运行以下批计算SQL进行总数据维度结果的更新。

// SQL命令
INSERT INTO order_total_sink SELECT count(*) AS count, sum(price) AS totalPrice FROM order_source;
// 实际运行
spark-sql> INSERT INTO order_total_sink SELECT count(*) AS count, sum(price) AS totalPrice FROM order_source;
Time taken: 4.272 seconds

6

写在最后

本文通过使用一套存储(Tablestore)和一套计算(Spark)完成了批流计算的有效结合,更多有关批流一体的细节和干货可以参见Tablestore结合Spark的云上流批一体大数据架构
对Tablestore有任何问题,随时欢迎同我们进行交流,钉钉群号:11789671(1群)、23307953(2群)。

相关实践学习
消息队列+Serverless+Tablestore:实现高弹性的电商订单系统
基于消息队列以及函数计算,快速部署一个高弹性的商品订单系统,能够应对抢购场景下的高并发情况。
阿里云表格存储使用教程
表格存储(Table Store)是构建在阿里云飞天分布式系统之上的分布式NoSQL数据存储服务,根据99.99%的高可用以及11个9的数据可靠性的标准设计。表格存储通过数据分片和负载均衡技术,实现数据规模与访问并发上的无缝扩展,提供海量结构化数据的存储和实时访问。 产品详情:https://www.aliyun.com/product/ots
目录
相关文章
|
26天前
|
SQL 数据库 UED
SQL性能提升秘籍:5步优化法与10个实战案例
在数据库管理和应用开发中,SQL查询的性能优化至关重要。高效的SQL查询不仅可以提高应用的响应速度,还能降低服务器负载,提升用户体验。本文将分享SQL优化的五大步骤和十个实战案例,帮助构建高效、稳定的数据库应用。
40 3
|
26天前
|
SQL 缓存 监控
SQL性能提升指南:五大优化策略与十个实战案例
在数据库性能优化的世界里,SQL优化是提升查询效率的关键。一个高效的SQL查询可以显著减少数据库的负载,提高应用响应速度,甚至影响整个系统的稳定性和扩展性。本文将介绍SQL优化的五大步骤,并结合十个实战案例,为你提供一份详尽的性能提升指南。
45 0
|
1月前
|
SQL JSON 分布式计算
【赵渝强老师】Spark SQL的数据模型:DataFrame
本文介绍了在Spark SQL中创建DataFrame的三种方法。首先,通过定义case class来创建表结构,然后将CSV文件读入RDD并关联Schema生成DataFrame。其次,使用StructType定义表结构,同样将CSV文件读入RDD并转换为Row对象后创建DataFrame。最后,直接加载带有格式的数据文件(如JSON),通过读取文件内容直接创建DataFrame。每种方法都包含详细的代码示例和解释。
|
2月前
|
SQL 关系型数据库 MySQL
sql注入原理与实战(三)数据库操作
sql注入原理与实战(三)数据库操作
sql注入原理与实战(三)数据库操作
|
2月前
|
SQL 分布式计算 大数据
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
61 0
|
2月前
|
SQL 分布式计算 算法
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(二)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(二)
87 0
|
2月前
|
SQL 分布式计算 Java
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
53 0
|
2月前
|
SQL 分布式计算 大数据
大数据-94 Spark 集群 SQL DataFrame & DataSet & RDD 创建与相互转换 SparkSQL
大数据-94 Spark 集群 SQL DataFrame & DataSet & RDD 创建与相互转换 SparkSQL
72 0
|
2月前
|
SQL 存储 分布式计算
大数据-93 Spark 集群 Spark SQL 概述 基本概念 SparkSQL对比 架构 抽象
大数据-93 Spark 集群 Spark SQL 概述 基本概念 SparkSQL对比 架构 抽象
45 0
|
2月前
|
SQL 数据处理 数据库
SQL语句优化与查询结果优化:提升数据库性能的实战技巧
在数据库管理和应用中,SQL语句的编写和查询结果的优化是提升数据库性能的关键环节