背景
在订单系统中,利用 Spark streamimg 或者 Flink 对接数据流并进行数据分析是一种常见请求。常见的场景如下:
大促开始后,实时显示当前总成交金额。
大促开始后,实时画出成交量走势图。
……
MySQL 若需要支持此类场景,需要开发应用,解析 binlog 中的数据,对接中间件,开发成本运维成本都会更高,且系统复杂度也会提升。而表格存储提供了通道服务,可以直接对接 Spark streaming 或 Flink,不仅省去了开发者解析数据、对接中间件的工作,且可以将开发者从复杂的系统架构中解放出来,更加专注于业务逻辑的处理。
本文将一步一步展示,如何利用表格存储通道服务,实现对订单系统实时成交额和订单数的统计工作。
基于通道服务的 Tablestore 解决方案
通道服务说明
通道服务(Tunnel Service)是基于表格存储数据接口上的全增量一体化服务。通道服务提供了增量、全量、增量加全量三种类型的分布式数据实时消费通道。通过为数据表建立数据通道,可以简单地实现对表中历史存量和新增数据的消费处理。具体可参考通道服务概述。
我们可以将 Spark streamimg 或者 Flink 和 Tablestore 的通道服务进行对接,对于表格存储中的数据变动,进行实时计算,完成上述场景中的需求。
本文架构
本文分别使用 Spark streaming 和 Flink 对接 Tablestore 的通道服务,完成对订单数量和成交额的实时统计。原始订单数据由 Java 服务写入到 MySQL,再由 DTS 服务将数据同步到 Tablestore,这一部分内容已经在前面的文章中详细说明。Spark streaming / Flink 通过通道服务拿到实时数据变化,聚合,将统计结果写回到 Tablestore 中的 sink 表中。完整架构如下:
准备工作
Tablestore 申请
创建源表并开通 Tunnel 服务
在 Tablestore 中建表的过程不再进行描述。本文,使用 DTS 同步工具将 MySQL 中的数据同步到 Tablestore 中,Tablestore 中的订单表 order_contract 为 DTS 服务自动创建,其表结构如下:
列名称 |
类型 |
列说明 |
字段内容说明 |
oId |
STRING |
主键 |
订单id |
c_id |
STRING |
客户id |
|
c_name |
STRING |
客户名称 |
|
create_time |
STRING |
订单创建时间 |
|
has_paid |
INTEGER |
是否已经支付 |
|
p_brand |
STRING |
商品品牌 |
|
p_count |
INTEGER |
商品数量 |
|
p_id |
STRING |
商品id |
|
p_name |
STRING |
商品名称 |
|
p_price |
DOUBLE |
商品单价 |
|
pay_time |
STRING |
支付时间 |
|
s_id |
STRING |
商家id |
|
s_name |
STRING |
商家名称 |
|
total_price |
DOUBLE |
订单金额 |
建表后在实例管理页,找到“数据表列表”,点击刚刚创建的表order_contract进入表管理页面。
在表管理页面,选择实时消费通道,创建通道,
通道名称自定,通道类型选择增量。点击确定完成通道创建。
此时可以看到创建的通道的通道 id。
创建 sink 表
sink 表用于存储 Spark streaming 计算后的结果数据。在本文中 sink 表命名为 order_sink,用于存储单位时间段内订单成交数量和成交额。其表结构如下:
列名称 |
类型 |
列说明 |
字段内容说明 |
order_start |
STRING |
主键 |
记录时间段的开始时间 |
order_end |
STRING |
记录时间段的结束时间 |
|
order_count |
INTEGER |
记录时间范围内的总订单数 |
|
total_price |
DOUBLE |
记录时间范围内的总成交额 |
Spark streaming 对接 Tunnel
创建集群
创建阿里云E-MapReduce的Hadoop集群,文档参见创建集群。
登录 Spark-sql 客户端
在集群管理页面,点击创建的集群。
点击主机列表,点击emr-header-1机器。
点击远程连接,
选择立即登录。
输入创建集群时设定的密码。登录机器。
在指令行输入以下指令,登录 Spark-sql 客户端,
streaming-sql --driver-class-path emr-datasources_shaded_*.jar --jars emr-datasources_shaded_*.jar --master yarn-client --num-executors 8 --executor-memory 2g --executor-cores 2
进入如下界面。
流计算
在 Spark-sql 中执行 SQL,创建源表,
DROP TABLE IF EXISTS order_contract;
CREATE TABLE order_contract
USING tablestore
OPTIONS(
endpoint="https://test-20210609.cn-hangzhou.vpc.tablestore.aliyuncs.com",
access.key.id="",
access.key.secret="",
instance.name="test-20210609",
table.name="order_contract",
tunnel.id="3d71bb67-58da-4c72-b36f-08b79df7c85d",
catalog='{"columns": {"oId": {"col": "oId", "type": "string"}, "total_price": {"col": "total_price", "type": "double"}, "pay_time": {"cols": "pay_time", "type": "long"}}}'
);
执行以下 SQL 创建目标表,
DROP TABLE IF EXISTS order_sink;
CREATE TABLE order_sink
USING tablestore
OPTIONS(
endpoint="https://test-20210609.cn-hangzhou.vpc.tablestore.aliyuncs.com",
access.key.id="",
access.key.secret="",
instance.name="test-20210609",
table.name="order_sink",
catalog='{"columns":{"order_start":{"col":"order_start","type":"string"},"order_end":{"col":"order_end","type":"string"},"order_count":{"col":"order_count","type":"long"},"total_price":{"total_price":"end","type":"double"}}}'
);
在目标表建立视图,
CREATE SCAN order_contract_view ON order_contract USING STREAM OPTIONS ("maxoffsetsperchannel"="10000");
创建 Stream 任务,
CREATE STREAM job1
options(
checkpointLocation='/tmp/spark/cp/job1',
outputMode='update'
)
INSERT INTO order_sink
SELECT CAST(window.start AS String) AS order_start, CAST(window.end AS String) AS order_end,
count(*) AS order_count, sum(total_price) AS total_price
FROM order_contract_view
GROUP BY window(to_timestamp(pay_time / 1000000), "30 seconds");
提交任务后,可以看到机器执行流式任务输出的日志。
计算结果
可以在 order_sink 表中看到实时统计结果如图。
Flink 对接 Tunnel
创建集群
下单后,在控制台,点击创建集群。
选择订单号,点击下一步。
填入集群信息。
选择 OSS Bucket、VPC、Zone。若没有可以点击右侧问号新建。点击下一步,点击创建。
等待集群启动,启动成功后,可以点击创建项目。
创建项目
点击创建项目,填入参数。点击确定。
可以在项目列表看到新建的项目。
流计算
在项目列表点击新建的项目,点击开发。
新建作业。输入作业名。
创建作业成功后,在页面中填入如下 SQL。
CREATE TABLE order_contract (
oId VARCHAR,
c_id VARCHAR,
c_name VARCHAR,
create_time VARCHAR,
has_paid BIGINT,
p_brand VARCHAR,
p_count BIGINT,
p_id VARCHAR,
p_name VARCHAR,
p_price DOUBLE,
pay_time BIGINT,
s_id VARCHAR,
s_name VARCHAR,
total_price DOUBLE,
ts AS PROCTIME(),
primary key(oId)
) WITH (
type = 'ots',
instanceName = 'test-20210609',
tableName = 'order_contract',
accessId = '',
accessKey = '',
endPoint = 'https://test-20210609.cn-hangzhou.vpc.tablestore.aliyuncs.com',
tunnelName = 'test20210610'
);
CREATE TABLE order_sink (
order_start VARCHAR,
order_end VARCHAR,
order_count BIGINT,
total_price DOUBLE,
primary key(order_start)
) WITH (
type = 'ots',
instanceName = 'test-20210609',
tableName = 'order_sink',
accessId = '',
accessKey = '',
endPoint = 'https://test-20210609.cn-hangzhou.vpc.tablestore.aliyuncs.com',
valueColumns = 'order_end,order_count,total_price'
);
INSERT INTO order_sink
SELECT
DATE_FORMAT(TUMBLE_START(order_contract.ts, INTERVAL '30' SECOND), 'yyyy-MM-dd hh:mm:ss') AS order_start,
DATE_FORMAT(TUMBLE_END(order_contract.ts, INTERVAL '30' SECOND), 'yyyy-MM-dd hh:mm:ss') AS order_end,
COUNT(oId) as order_count,
SUM(total_price) as total_price
FROM order_contract
GROUP BY TUMBLE(ts, INTERVAL '30' SECOND);
点击语法检查,通过后。点击上线。
选择系统分配,点击下一步继续。
SQL 检查通过后,继续点击下一步。填入期望事件后,点击下一步,点击上线。
上线成功后,可以在运维页看到作业如图。
计算结果
可以在 order_sink 表中看到实时统计结果如图。
总结
表格存储通过通道服务,对流式数据计算进行了支持。相比于 MySQL 中解析 binlog 方案,使用 Tablestore 通道服务直接对接流式处理工具,运维开发成本更低,系统架构更加简单。本文分别使用了 Spark streaming 和 Flink 对订单系统中典型场景进行了模拟操作,完成了实时统计交易额、订单数的流式作业,整个过程简单易懂。