实现淘宝母婴订单实时查询和可视化|Flink-Learning实战营

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 本场景将以 阿里云实时计算Flink版为基础,使用 Flink 自带的 MySQL Connector 连接 RDS 云数据库实例、Elasticsearch Connector 连接 Elasticsearch 检索分析服务实例,并以一个淘宝母婴订单实时查询的例子尝试上手 Connector 的数据捕获、数据写入等功能。

作者|吴西荣@阿里云

为进一步帮助开发者学习使用 Flink,Apache Flink 中文社区近期发起 Flink-Learning 实战营项目。本次实战营通过真实有趣的实战场景帮助开发者实操体验 Flink,课程包括实时数据接入、实时数据分析、实时数据应用的场景实。并结合小松鼠助教模式,全方位帮助入营开发者轻松玩转 Flink,点击下方图片扫码即刻入营

图片

本期将继续详细介绍 Flink- Learning 实战营。


实验简介

随着“全面二孩”政策落地、居民可支配收入稳步增加等因素的刺激,中国的母婴消费市场持续增长。与此同时,随着国民消费升级 90 后宝爸、宝妈人数剧增,消费需求与消费理念都发生了巨大的变化。据罗兰贝格最新公布的报告预计,已经经过了 16 个年头发展的母婴行业,到 2020 年,整体规模将达到 3.6 万亿元,2016-2020 年复合增速高达 17%,行业前景看起来一片光明。如此大好形势下,母婴人群在母婴消费上有什么特点?消费最高的项目是什么?

本场景中订单和婴儿信息存储在 MySQL 中,对于订单表,为了方便进行分析,我们让它关联上其对应的婴儿信息,构成一张宽表,使用 Flink 实时把它写到 Elasticsearch 中;另一方面数据经过分组聚合后,计算出订单数量和婴儿出生的关系,实时把它写到 Elasticsearch 中并展示到 Kibana 大屏中。

实验资源

实验所开通的云产品因数据连通性要求,需使用同一 Region 可用区,建议都选取北京 Region 的同一可用区。涉及的云产品包括阿里云实时计算 Flink 版、检索分析服务 Elasticsearch 版、阿里云数据库 RDS。

体验目标

本场景将以 阿里云实时计算Flink版为基础,使用 Flink 自带的 MySQL Connector 连接 RDS 云数据库实例、Elasticsearch Connector 连接 Elasticsearch 检索分析服务实例,并以一个淘宝母婴订单实时查询的例子尝试上手 Connector 的数据捕获、数据写入等功能。

按步骤完成本次实验后,您将掌握的知识有:

  • 使用 Flink 实时计算平台创建并提交作业的方法;
  • 编写基于 Flink Table API SQL 语句的能力;
  • 使用 MySQL Connector 对数据库进行读取的方法;
  • 使用 Elasticsearch Connector 对数据库进行写入的方法。

步骤一:创建资源

开始实验之前,您需要先创建相关实验资源,确保 RDS 实例、Elasticsearch 实例、Flink 实例在同一个 VPC 网络下,并配置完成 RDS 白名单、Elasticsearch 白名单使网络打通。

步骤二:创建数据库表

在这个例子中,我们将创建三张数据表,其中一张 orders_dataset_tmp 是导入数据的临时表,其他两张作为源表,体验淘宝母婴订单实时查询。

1.点击云数据库 RDS 控制台「实例列表」,切换到上面创建实例所在的 region,点击自己的实例名称进入详情页,首次使用分别点击「账号管理」和「数据库管理」,创建账号和数据库并使账号绑定到指定数据库。

2.点击云数据库 RDS 实例详情页上方「登录数据库」,会自动跳转到 DMS 数据管理平台,输入用户名和密码登录刚刚创建的实例,点击左侧「数据库实例」-「已登录实例」列表,双击要编辑的数据库名,然后在右侧 SQL Console 命令区输入以下建表指令并执行:

create table orders_dataset_tmp(
    user_id bigint comment '用户身份信息',            
    auction_id bigint comment '购买行为编号',        
    cat_id bigint comment '商品种类序列号',            
    cat1 bigint comment '商品序列号(根类别)',                
    property TEXT comment '商品属性',            
    buy_mount int comment '购买数量',            
    day TEXT comment '购买时间'                
);

create table orders_dataset(
    order_id BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY comment '订单id',
    user_id bigint comment '用户身份信息',            
    auction_id bigint comment '购买行为编号',        
    cat_id bigint comment '商品种类序列号',            
    cat1 bigint comment '商品序列号(根类别)',                
    property TEXT comment '商品属性',            
    buy_mount int comment '购买数量',            
    day TEXT comment '购买时间'                
);

--
create table baby_dataset(
    user_id bigint NOT NULL PRIMARY KEY,    
    birthday text comment '婴儿生日',
    gender int comment '0 denotes female, 1 denotes male, 2 denotes unknown'
);

3.在 DMS 数据管理平台,点击左侧「常用功能」-「数据导入」,配置如下信息后点击提交申请,将 (sample)sam_tianchi_mum_baby_trade_history.csv 导入 orders_dataset_tmp 表,(sample)sam_tianchi_mum_baby.csv 导入 baby_dataset 表。

配置项 说明
数据库 模糊搜索数据库名后点击对应的 MySQL 实例
文件编码 自动识别
导入模式 极速模式
文件类型 CSV 格式
目标表 模糊搜索要导入的表名后点击选中
数据位置 选择第1行为属性
写入方式 INSERT
附件 点击上传按钮上传要导入到表的对应文件

导入完成之后执行以下 SQL 将订单数据导入到订单源表 orders_dataset 中。

insert into orders_dataset(user_id,auction_id,cat_id,cat1,property,buy_mount,day)
select * from orders_dataset_tmp;

步骤三:配置 Elasticsearch 自动创建索引

进入检索分析服务控制台,Elasticsearch 实例列表找到自己的实例,然后点击实例名进入详情界面,点击「配置与管理」-「ES 集群配置」,点击「修改配置」,选择「允许自动创建索引」,点击「确定」。

1

2

修改配置需要等待十几分钟,请耐心等待配置变更完成后再继续使用 Elasticsearch。

步骤四:创建实时查询 SQL 作业

1.进入实时计算 Flink 平台,点击左侧边栏中的「应用」—「作业开发」菜单,并点击顶部工具栏的「新建」按钮新建一个作业。作业名字任意,类型选择「流作业 / SQL」,其余设置保持默认。如下所示:

3

2.成功创建作业后,右侧编辑窗格应该显示新作业的内容:

4

3.接下来,我们在右侧编辑窗格中输入以下语句来创建二张临时表,并使用 MySQL CDC 连接器实时捕获 orders_datasetbaby_dataset的变化:

CREATE TEMPORARY TABLE orders_dataset (
    order_id BIGINT,
  `user_id` bigint,            
    auction_id bigint,        
    cat_id bigint,            
    cat1 bigint,                
    property varchar,            
    buy_mount int,            
    `day` varchar    ,
   PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
    'connector' = 'mysql',
    'hostname' = '******************.mysql.rds.aliyuncs.com',
    'port' = '3306',
    'username' = '***********',
    'password' = '***********',
    'database-name' = '***********',
    'table-name' = 'orders_dataset'
);
CREATE TEMPORARY TABLE baby_dataset (
    `user_id` bigint,
    birthday varchar,
    gender int,
    PRIMARY KEY(user_id) NOT ENFORCED
) WITH (
    'connector' = 'mysql',
    'hostname' = '******************.mysql.rds.aliyuncs.com',
    'port' = '3306',
    'username' = '***********',
    'password' = '***********',
    'database-name' = '***********',
    'table-name' = 'baby_dataset'
);

需要将 hostname参数替换为早些时候创建资源的域名、将 usernamepassword参数替换为数据库登录用户名及密码、将 database-name参数替换为之前在 RDS 后台中创建的数据库名称。

其中,'connector' = 'mysql'指定了使用 MySQL CDC 连接器来捕获变化数据。您需要使用准备步骤中申请的 RDS MySQL URL、用户名、密码,以及之前创建的数据库名替换对应部分。

任何时候您都可以点击顶部工具栏中的「验证」按钮,来确认作业 Flink SQL 语句中是否存在语法错误。

4.为了测试是否成功地捕获了源表数据,紧接着在下面写一行 SELECT * FROM source_table;语句,选中临时表和 select 语句,并点击工具栏中的「执行」按钮。如果控制台中打印了相应的数据行,则说明捕获成功,如下图所示:

5

5.我们在右侧编辑窗格中输入以下语句来创建一张临时表,并使用 Elasticsearch 连接器连接到 Elasticsearch 实例:

CREATE TEMPORARY TABLE es_sink(
    order_id BIGINT,
    `user_id` bigint,            
    auction_id bigint,        
    cat_id bigint,            
    cat1 bigint,                
    property varchar,            
    buy_mount int,            
    `day` varchar    ,
    birthday varchar,
    gender int,
   PRIMARY KEY(order_id) NOT ENFORCED  -- 主键可选,如果定义了主键,则作为文档ID,否则文档ID将为随机值。
) WITH (
'connector' = 'elasticsearch-7',
  'hosts' = 'http://**********:9200',
  'index' = 'enriched_orders',
  'username' ='elastic',
  'password' ='*******'--创建ES实例时自定义的密码
);

需要将 hosts参数替换为早些时候创建资源的域名、将 password参数替换为登录 Kibana 密码。

其中,'connector' = 'elasticsearch-7'指定了使用 Elasticsearch 连接器来连接 Elasticsearch 实例写入数据。您需要使用准备步骤中申请的 Elasticsearch URL、用户名、密码。

6.接下来,我们希望对原始数据按照 user_id 进行 JOIN,构成一张宽表。并把宽表数据写入到 Elasticsearch 的 enriched_orders 索引中。我们在 Flink 作业编辑窗格中输入如下代码:

INSERT INTO es_sink
SELECT o.*,
    b.birthday,
    b.gender
FROM orders_dataset /*+ OPTIONS('server-id'='123450-123452') */ o
LEFT JOIN baby_dataset /*+ OPTIONS('server-id'='123453-123455') */ as b
    ON o.user_id = b.user_id;

在保证源表中有数据的情况下,再次执行 Flink 作业,观察控制台的输出结果:

6

现在,点击控制台上的「上线」按钮,即可将我们编写的 Flink SQL 作业部署上线执行。您可以登录 Kibana 点击「Stack Management」-「Index Management」搜索 enriched_orders 查看enriched_orders索引是否成功创建。

阿里云实时计算控制台在使用「执行」功能调试时,不会写入任何数据到下游中。因此为了测试使用 SQL Connector 写入汇表,您必须使用「上线」功能。

7

您也可以进入 Flink UI 控制台观察流数据处理图。

7.Elasticsearch 的enriched_orders索引创建成功后,点击「Discover」 -「create index pattern」 ,输入enriched_orders,点击「Next step」 - 「create index pattern」,创建完成后就可以在「Kibana」-「Discover」看到写入的数据了。

8

8.接下来,我们通过对 MySQL 中源表的数据进行增改删操作,每执行一步就刷新一下「Kibana」-「Discover」界面,观察数据的变化。

8.1 order_dataset 表添加一条数据

insert into orders_dataset values ( DEFAULT ,2222222,2222222,50018855,38,'21458:33304;6933666:4421827;21475:137319;12121566:3861755',1,'20130915');

9

8.2 baby_dataset 表中添加一条数据

insert into baby_dataset values(144335047,'20150523',1);

写入前

10

写入后

11

8.3 order_dataset表更新一条数据

select order_id from orders_dataset where user_id = 2757;
--根据查到的order_id更新数据
UPDATE orders_dataset SET auction_id = 2222223 WHERE order_id = ;

更新前

12

更新后

13

8.4 order_dataset 表中删除一条数据

select order_id from orders_dataset where user_id = 2222222;
DELETE FROM orders_dataset WHERE order_id = ;

删除前

14

删除后

15

步骤五:创建实时大屏 SQL 作业

前面四步和步骤四的前面四步相同,区别在于后面步骤作业的处理逻辑 SQL 不同,要统计的指标不同,所以 Elasticsearch 的 Schema 与之前不同。

1.进入实时计算 Flink 平台,点击左侧边栏中的「应用」—「作业开发」菜单,并点击顶部工具栏的「新建」按钮新建一个作业。作业名字任意,类型选择「流作业 / SQL」,其余设置保持默认。如下所示:

16

2.成功创建作业后,右侧编辑窗格应该显示新作业的内容:

17

3.接下来,我们在右侧编辑窗格中输入以下语句来创建二张临时表,并使用 MySQL CDC 连接器实时捕获 orders_datasetbaby_dataset的变化:

CREATE TEMPORARY TABLE orders_dataset (
    order_id BIGINT,
  `user_id` bigint,            
    auction_id bigint,        
    cat_id bigint,            
    cat1 bigint,                
    property varchar,            
    buy_mount int,            
    `day` varchar    ,
   PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
    'connector' = 'mysql',
    'hostname' = '******************.mysql.rds.aliyuncs.com',
    'port' = '3306',
    'username' = '***********',
    'password' = '***********',
    'database-name' = '***********',
    'table-name' = 'orders_dataset'
);
CREATE TEMPORARY TABLE baby_dataset (
    `user_id` bigint,
    birthday varchar,
    gender int,
    PRIMARY KEY(user_id) NOT ENFORCED
) WITH (
    'connector' = 'mysql',
    'hostname' = '******************.mysql.rds.aliyuncs.com',
    'port' = '3306',
    'username' = '***********',
    'password' = '***********',
    'database-name' = '***********',
    'table-name' = 'baby_dataset'
);

需要将 hostname参数替换为早些时候创建资源的域名、将 usernamepassword参数替换为数据库登录用户名及密码、将 database-name参数替换为之前在 RDS 后台中创建的数据库名称。

其中,'connector' = 'mysql'指定了使用 MySQL CDC 连接器来捕获变化数据。您需要使用准备步骤中申请的 RDS MySQL URL、用户名、密码,以及之前创建的数据库名替换对应部分。

任何时候您都可以点击顶部工具栏中的「验证」按钮,来确认作业 Flink SQL 语句中是否存在语法错误。

4.为了测试是否成功地捕获了源表数据,紧接着在下面写一行 SELECT * FROM source_table;语句,选中临时表和select语句,并点击工具栏中的「执行」按钮。如果控制台中打印了相应的数据行,则说明捕获成功,如下图所示:

18

5.我们在右侧编辑窗格中输入以下语句来创建一张临时表,并使用 Elasticsearch 连接器连接到 Elasticsearch 实例:

CREATE TEMPORARY TABLE es_sink(
  day_year varchar,
  `buy_num` bigint,            
    baby_num bigint,
  PRIMARY KEY(day_year) NOT ENFORCED  -- 主键可选,如果定义了主键,则作为文档ID,否则文档ID将为随机值。
) WITH (
'connector' = 'elasticsearch-7',
  'hosts' = 'http://**********:9200',
  'index' = 'enriched_orders_view',
  'username' ='elastic',
  'password' ='*******'--创建ES实例时自定义的密码
);

需要将 hosts参数替换为早些时候创建资源的域名、将 password参数替换为登录Kibana密码。

其中,'connector' = 'elasticsearch-7'指定了使用 Elasticsearch 连接器来连接 Elasticsearch 实例写入数据。您需要使用准备步骤中申请的 Elasticsearch URL、用户名、密码。

6.接下来,我们希望对原始数据按照 user_id 进行 JOIN,构成一张宽表。然后对宽表数据的订单时间取到月份进行分组 GROUP BY,并统计每个分组中订单的购买数量SUM和出生婴儿的数量COUNT,并将结果数据写入到 Elasticsearch 的 enriched_orders_view索引中。我们在 Flink 作业编辑窗格中输入如下代码:

INSERT INTO es_sink
SELECT 
    SUBSTRING(tmp1.`day` FROM 1 FOR 6) as day_year,
    SUM(tmp1.buy_mount) as buy_num,
    COUNT(birthday) as baby_num
FROM(
    SELECT o.*,
        b.birthday,
        b.gender
    FROM orders_dataset /*+ OPTIONS('server-id'='123456-123457') */ o
    LEFT JOIN baby_dataset /*+ OPTIONS('server-id'='123458-123459') */ as b
        ON o.user_id = b.user_id
) tmp1
GROUP BY SUBSTRING(tmp1.`day` FROM 1 FOR 6)

在保证源表中有数据的情况下,再次执行 Flink 作业,观察控制台的输出结果:

19

现在,点击控制台上的「上线」按钮,即可将我们编写的 Flink SQL 作业部署上线执行。您可以登录 Kibana 点击「Stack Management」-「Index Management」搜索 enriched_orders_view 查看enriched_orders_view索引是否成功创建。

阿里云实时计算控制台在使用「执行」功能调试时,不会写入任何数据到下游中。因此为了测试使用 SQL Connector 写入汇表,您必须使用「上线」功能。

20

您也可以进入 Flink UI 控制台观察流数据处理图。

7.Elasticsearch 的enriched_orders_view索引创建成功后,点击「Discover」 -「create index pattern」 ,输入enriched_orders_view,点击「Next step」 - 「create index pattern」,创建完成后就可以在「Kibana」-「Discover」看到写入的数据了。

21

8.在「Discover」界面点击左下角「Available fields」-「baby_num」,点击后会展示「TOP 5 VALUES」小窗口,点击窗口下方的「Visualize」,即可跳转到可视化图表界面。

22

跳转界面后切换图形格式为柱状图 Bar。

23

24

配置右侧X-axisY-axis

X-axis 配置Select a field为day_year.keyword,Number of values选择到最大100,order by选择 alphabetical ,order direction 选择 ascending,Display name自定义横轴名称,此处定义为 day_year_month ,然后点击Close

25

Y-axis 配置Select a field为buy_num,Display name自定义纵轴名称,此处定义为 buy_num ,Axis side 选择 Left ,然后点击Close。界面中间即生成了对应的折线图。

26

9.点击右下角的+,新建一个 layer,切换新建的 layer 的图格式为折线图 Line

27

28

配置右侧X-axisY-axis

X-axis配置Select a field为day_year.keyword,Number of values选择到最大100,order by选择 alphabetical ,order direction 选择 ascending,Display name自定义横轴名称,此处定义为 day_year_month ,然后点击Close

与上一个 X-axis 配置完全相同。

Y-axis 配置Select a field为baby_num,Display name自定义纵轴名称,此处定义为 baby_num ,Value format选择Pecent,Axis side 选择 Right ,然后点击Close。界面中间即生成了对应的折线图与柱状图的复合图。

29

10.最后点击右上角的Save,定义此图表的名称即可保存。

想要了解更多商品销售额实时统计的实验信息吗?快来尝试一下吧!

img

点击即刻入营


更多内容

img


活动推荐

阿里云基于 Apache Flink 构建的企业级产品-实时计算 Flink 版现开启活动:
0 元试用 实时计算 Flink 版(5000CU*小时,3 个月内)
了解活动详情:https://free.aliyun.com/?pipCode=sc

image.png

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
8月前
|
大数据 流计算
大数据Flink实现订单自动好评
大数据Flink实现订单自动好评
46 0
|
11月前
|
SQL 运维 关系型数据库
实现淘宝母婴订单实时查询和实时大屏实验手册|Flink-Learning 实战营
加入 Flink-Learning 实战营,动手体验真实有趣的实战场景。只需 2 小时,让您变身 Flink 实战派。实战营采取了 Flink 专家在线授课,专属社群答疑,小松鼠助教全程陪伴的学习模式。
11809 2
实现淘宝母婴订单实时查询和实时大屏实验手册|Flink-Learning 实战营
|
Java 流计算
Flink状态管理与Checkpoint实战——模拟电商订单计算过程中宕机的场景,探索宕机恢复时如何精准继续计算订单(下)
Flink状态管理与Checkpoint实战——模拟电商订单计算过程中宕机的场景,探索宕机恢复时如何精准继续计算订单
176 0
Flink状态管理与Checkpoint实战——模拟电商订单计算过程中宕机的场景,探索宕机恢复时如何精准继续计算订单(下)
|
存储 算法 Java
Flink状态管理与Checkpoint实战——模拟电商订单计算过程中宕机的场景,探索宕机恢复时如何精准继续计算订单(上)
Flink状态管理与Checkpoint实战——模拟电商订单计算过程中宕机的场景,探索宕机恢复时如何精准继续计算订单
464 0
Flink状态管理与Checkpoint实战——模拟电商订单计算过程中宕机的场景,探索宕机恢复时如何精准继续计算订单(上)
|
大数据 流计算
flink实战 —— 定时器实现已完成订单自动五星好评
云栖号资讯:【点击查看更多行业资讯】在这里您可以找到不同行业的第一手的上云资讯,还在等什么,快来! 背景需求 在电商领域会有这么一个场景,如果用户买了商品,在订单完成之后,24小时之内没有做出评价,系统自动给与五星好评,我们今天主要使用flink的定时器来简单实现这一功能。
|
3月前
|
消息中间件 Kafka Apache
Apache Flink 是一个开源的分布式流处理框架
Apache Flink 是一个开源的分布式流处理框架
487 5
|
2月前
|
SQL Java API
官宣|Apache Flink 1.19 发布公告
Apache Flink PMC(项目管理委员)很高兴地宣布发布 Apache Flink 1.19.0。
1370 1
官宣|Apache Flink 1.19 发布公告
|
2月前
|
SQL Apache 流计算
Apache Flink官方网站提供了关于如何使用Docker进行Flink CDC测试的文档
【2月更文挑战第25天】Apache Flink官方网站提供了关于如何使用Docker进行Flink CDC测试的文档
147 3
|
2月前
|
XML Java Apache
Apache Flink自定义 logback xml配置
Apache Flink自定义 logback xml配置
152 0
|
2月前
|
消息中间件 Java Kafka
Apache Hudi + Flink作业运行指南
Apache Hudi + Flink作业运行指南
86 1

相关产品

  • 实时计算 Flink版