前言
阿里云实时计算 Flink 版是一套基于 Apache Flink 构建的⼀站式实时大数据分析平台。在大数据场景下,实时计算 Flink 可提供端到端亚秒级实时数据流批处理能力。
表格存储 Tablestore (又名 OTS)是阿里云自研的多模型结构化数据存储,可提供海量结构化数据的存储、查询分析服务。表格存储的双引擎架构支持千万TPS和毫秒级延迟的服务能力,可作为大数据计算的极佳上下游存储。
本文章将以商品订单场景为基础,介绍如何从 0 到 1 通过 Flink+Tablestore 进行大数据分析。
场景介绍
某大型连锁超市会实时产生大量的消费数据,通过分析这些数据可获取到商品售卖热度、门店经营状态极具商业价值的信息,便于辅助经营者的商业决策。现需要设计一套方案,获取每分钟不同商品类别的 GMV (商品交易总额)。
本文采用云数据库 RDS 作为商品消费订单的存储库,通过 mysql-cdc connector 作为源表接入实时计算Flink。以表格存储 Tablestore 作为商品元信息的存储库,通过 OTS connector 作为维表接入实时计算Flink。配置流计算作业任务计算商品 GMV ,并写入表格存储 Tablestore 结果表进行保存。方案架构图如下:
实现步骤准备工作
创建 RDS MySQL 实例。创建步骤请参考创建 RDS MySQL 实例。
开通表格存储服务,并创建按量模式实例。
开通实时计算服务,并购买Flink全托管集群。
数据源准备
1. 登录RDS控制台,登陆数据管理DMS,创建consume_record表。
CREATE TABLE `consume_record` (
`consume_id` varchar(20) NOT NULL,
`product_id` varchar(20) NOT NULL,
`consume_time` bigint(20) NOT NULL,
`consume_name` varchar(20) NOT NULL,
`consume_phone` varchar(20) NOT NULL,
PRIMARY KEY (`consume_id`)
) ENGINE=InnoDB
DEFAULT CHARACTER SET=utf8
COMMENT='消费记录数据源表';
2. 登录 Tablestore 控制台。创建 gmv_result、product 两张表。
说明:Tablestore数据表是schema free的,只需要定义主键,无需定义属性列。关于创建表步骤请参考创建数据表。
product表。作为流计算任务的维表,存储商品元数据信息。
字段名 |
字段类型 |
是否主键 |
描述 |
product_ID |
STRING |
是 |
商品ID |
price |
BIGINT |
否 |
商品单价 |
product_type |
STRING |
否 |
商品类别 |
gmv_result表。作为流计算任务的结果表,存储商品交易总额的计算结果。
字段名 |
字段类型 |
是否主键 |
描述 |
product_type |
STRING |
是 |
商品类型 |
gmv_time |
STRING |
是 |
统计时间 |
total_price |
BIGINT |
否 |
商品交易总额 |
Flink 作业配置
1. 登陆 Flink 全托管控制台,创建项目并创建作业,作业名 gmv_pre_aggregation。
作业脚本
-- mysql-cdc 源表
CREATE TEMPORARY TABLE consume_record (
`consume_id` VARCHAR(20),
`product_id` VARCHAR(20),
`consume_time` BIGINT,
`consume_name` VARCHAR(20),
`consume_phone` VARCHAR(20),
PRIMARY KEY(consume_id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '',
'port' = '3306',
'username' = '',
'password' = '',
'database-name' = '',
'table-name' = ''
);
-- tablestore 维表
CREATE TEMPORARY TABLE product (
product_ID STRING,
price BIGINT,
product_type STRING,
PRIMARY KEY (product_ID) NOT ENFORCED
) WITH (
'connector' = 'ots',
'endPoint' = '',
'instanceName' = '',
'tableName' = '',
'accessId' = '',
'accessKey' = ''
);
-- tablestore 结果表
CREATE TEMPORARY TABLE gmv_result (
product_type STRING,
gmv_time BIGINT,
total_price BIGINT,
PRIMARY KEY (product_type) NOT ENFORCED
) WITH (
'connector' = 'ots',
'endPoint' = '',
'instanceName' = '',
'tableName' = '',
'accessId' = '',
'accessKey' = '',
'valueColumns' = 'gmv_time,total_price'
);
INSERT INTO `gmv_result`
select
d.product_type,
UNIX_TIMESTAMP(s.consume_time,'yy-MM-dd') as gmv_time,
sum (d.price) as total_price
from
`consume_record` as s
JOIN `product` for system_time as of proctime() as d
on s.product_id = d.product_ID
GROUP BY d.product_type,UNIX_TIMESTAMP(s.consume_time,'yy-MM-dd')
2. 上线作业。
结果展示
作业 gmv_pre_aggregation 直接写入 gmv 结果到 Tablestore,登录 Tablestore 控制台查询 gmv_result 表 即可获取商品交易总额结果。
方案改进
采用 Flink + Tablestore 方案很好地实现了商品交易总额的计算。然而 Flink 作业中固定了源表与维表参与计算的字段,如果业务需求发生变化,不得不重新设计作业脚本,再次进行作业下线上线操作,非常繁琐。所以将对上述方案做一下改进:Flink 流计算作业中仅负责关联消费记录和商品信息数据并写入 Tablestore 中。再通过Tablestore 的功能之一多元索引进行数据分析。多元索引基于倒排索引、列式存储、空间索引等,可解决大数据的复杂查询、分析聚合等需求。通过在数据表上建立多元索引,可实现全文检索、前缀查询、模糊查询、组合查询、统计聚合等功能。方案二架构图如下:
实现步骤
数据源准备
登录 Tablestore 控制台。创建 consume_product 表。
说明:Tablestore数据表是schema free的,只需要定义主键,无需定义属性列。关于创建表步骤请参考创建数据表。
consume_product表。作为流计算任务结果表,保存了商品消费信息与商品元数据信息。
字段名 |
字段类型 |
是否主键 |
描述 |
consume_id |
STRING |
是 |
消费ID(主键) |
product_ID |
STRING |
是 |
商品ID |
product_num |
BIGINT(10) |
否 |
商品数量 |
price |
DOUBLE |
否 |
商品单价 |
consume_time |
BIGINT(20) |
否 |
消费时间 |
product_type |
STRING |
否 |
商品类别 |
Flink 作业配置
1. 登陆 Flink 全托管控制台,创建项目并创建作业,作业名 gmv_post_aggregation。
-- mysql-cdc 源表,与之前相同
CREATE TEMPORARY TABLE consume_record (
`consume_id` VARCHAR(20),
`product_id` VARCHAR(20),
`consume_time` BIGINT,
`consume_name` VARCHAR(20),
`consume_phone` VARCHAR(20),
PRIMARY KEY(consume_id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '',
'port' = '3306',
'username' = '',
'password' = '',
'database-name' = '',
'table-name' = ''
);
-- tablestore 维表,与之前相同
CREATE TEMPORARY TABLE product (
product_ID STRING,
price BIGINT,
product_type STRING,
PRIMARY KEY (product_ID) NOT ENFORCED
) WITH (
'connector' = 'ots',
'endPoint' = '',
'instanceName' = '',
'tableName' = '',
'accessId' = '',
'accessKey' = ''
);
-- tablestore 结果表
CREATE TEMPORARY TABLE consume_product (
consume_id STRING,
product_id STRING,
price BIGINT,
consume_time BIGINT,
consume_name STRING,
consume_phone STRING,
PRIMARY KEY (consume_id,product_id) NOT ENFORCED
) WITH (
'connector' = 'ots',
'endPoint' = '',
'instanceName' = '',
'tableName' = '',
'accessId' = '',
'accessKey' = '',
'valueColumns' = 'price,consume_time,consume_name,consume_phone'
);
insert into consume_product
select s.consume_id,d.product_ID as product_id,d.price,
UNIX_TIMESTAMP(s.consume_time,'yy-MM-dd') as consume_time,
s.consume_name,s.consume_phone
from `consume_record` as s
join `product` for system_time as of proctime() as d
on s.product_id = d.product_ID
2. 上线作业
创建多元索引
登录 Tablestore 控制台,在 consume_product 表上建立多元索引。可通过控制台 SQL 查询或 SDK 分析获取商品交易总额信息。
说明:多元索引创建步骤请参考创建及使用多元索引。
创建索引
结果展示
SQL查询
SDK查询
SearchRequest searchRequest = SearchRequest.newBuilder()
.tableName("consume_product")
.indexName("consume_product_index")
.searchQuery(SearchQuery.newBuilder()
.query(QueryBuilders.matchAll())
.addGroupBy(GroupByBuilders.groupByField("groupByProductID","product_id").addSubAggregation(
AggregationBuilders.sum("sumagg","price")
))
.build())
.build();
SearchResponse searchResponse = syncClient.search(searchRequest);
for(GroupByFieldResultItem item : searchResponse.getGroupByResults().getAsGroupByFieldResult("groupByProductID").getGroupByFieldResultItems()){
System.out.println("商品ID:"+item.getKey()+",交易总额:"+item.getSubAggregationResults().getAsSumAggregationResult("sumagg").getValue());
}
SDK查询结果
商品ID:A001,交易总额:20.0
商品ID:A002,交易总额:40.0
商品ID:A004,交易总额:20.0
商品ID:A003,交易总额:5.0
商品ID:A005,交易总额:15.0
商品ID:A006,交易总额:5.0
商品ID:A008,交易总额:5.0
联系我们
本篇文章演示了基于 Flink + Tablestore 方案在大数据计算场景下的应用。后续,我们会推出 Flink on Tablestore 系列文章,并针对维表和结果表场景推出最佳实践文章。
希望本次文章对你有帮助,如果希望继续交流,可以加入我们的开发者技术交流群,可搜索群号『11789671』或『23307953』,亦可直接扫码加入。