如何实时统计最近 15 秒的商品销售额|Flink-Learning 实战营

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 想要了解如何使用 Flink 实时统计最近 15 秒的商品销售额吗?本实验将以阿里云实时计算 Flink 版为基础,使用 Flink 自带的 MySQL Connector 连接 RDS 云数据库实例,并以实时商品销售数据统计的例子,引导开发者上手 Connector 的数据捕获、数据写入等功能。

为进一步帮助开发者学习使用 Flink,Apache Flink 中文社区近期发起 Flink-Learning 实战营项目。本次实战营通过真实有趣的实战场景帮助开发者实操体验 Flink,课程包括实时数据接入、实时数据分析、实时数据应用的场景实。并结合小松鼠助教模式,全方位帮助入营开发者轻松玩转 Flink,点击下方图片扫码即刻入营

本期内容详细介绍 Flink 实战营第一期实战。


想要了解如何使用 Flink 实时统计最近 15 秒的商品销售额吗?本实验将以阿里云实时计算 Flink 版为基础,使用 Flink 自带的 MySQL Connector 连接 RDS 云数据库实例,并以实时商品销售数据统计的例子,引导开发者上手 Connector 的数据捕获、数据写入等功能。

完成本次实验后,您将掌握的知识有:

  • 使用 Flink 实时计算平台创建并提交作业的方法;
  • 编写基于 Flink Table API SQL 语句的能力;
  • 使用 MySQL Connector 对数据库进行读写的方法。

Flink MySQL Connector 简介

MySQL Connector 可以将本地或远程的 MySQL 数据库连接到 Flink 中,并方便地使用 Flink Table API 与之交互、捕获数据变更、并将处理结果写回数据库。本实验主要介绍如何在阿里云实时计算平台上使用 Flink MySQL 连接器的相关功能,并使用 Table API 编写一个简单的例子,尝试 MySQL 作为源表、维表、汇表的不同功能。

步骤一:创建资源

开始实验之前,您需要开通阿里云实时计算 Flink 版,创建实验资源。

步骤二:创建数据库表

在这个例子中,我们将创建三张数据表,分别作为源表、维表、汇表,演示 MySQL Connector 的不同功能。

  1. 进入云数据库 RDS 后台,并登录刚刚创建资源的后台页面。
  2. 点击左侧边栏的 + 加号按钮,创建一个测试用数据库,然后在右侧命令区输入以下建表指令并执行:
-- Source Table;
CREATE TABLE `source_table` (
  `id` int unsigned NOT NULL AUTO_INCREMENT,
  `good_id` int DEFAULT NULL,
  `amount` int DEFAULT NULL,
  `record_time` timestamp NULL DEFAULT NULL,
  PRIMARY KEY (`id`)
);

-- Dimension Table;
CREATE TABLE `dimension_table` (
  `good_id` int unsigned NOT NULL,
  `good_name` varchar(256) DEFAULT NULL,
  `good_price` int DEFAULT NULL,
  PRIMARY KEY (`good_id`)
);

-- Sink Table;
CREATE TABLE `sink_table` (
  `record_timestamp` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  `good_name` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
  `sell_amount` int DEFAULT NULL,
  PRIMARY KEY (`record_timestamp`)
);

步骤三:创建 Flink 作业

  1. 进入实时计算 Flink 平台,点击左侧边栏中的「应用」—「作业开发」菜单,并点击顶部工具栏的「新建」按钮新建一个作业。作业名字任意,类型选择「流作业 / SQL」,其余设置保持默认。如下所示:

1

  1. 成功创建作业后,右侧编辑窗格应该显示新作业的内容:

2

  1. 接下来,我们在右侧编辑窗格中输入以下语句来创建一张临时表,并使用 MySQL CDC 连接器实时捕获 source_table的变化:
CREATE TEMPORARY TABLE source_table (
    id INT NOT NULL PRIMARY KEY NOT ENFORCED,
    record_time TIMESTAMP_LTZ(3),
    good_id INT,
    amount INT,
    WATERMARK FOR record_time AS record_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '******************.mysql.rds.aliyuncs.com',
    'port' = '3306',
    'username' = '***********',
    'password' = '***********',
    'database-name' = '***********',
    'table-name' = 'source_table'
);

需要将 hostname参数替换为早些时候创建资源的域名、将 usernamepassword参数替换为数据库登录用户名及密码、将 database-name参数替换为之前在 RDS 后台中创建的数据库名称。

其中,'connector' = 'mysql-cdc'指定了使用 MySQL CDC 连接器来捕获变化数据。您需要使用准备步骤中申请的 RDS MySQL URL、用户名、密码,以及之前创建的数据库名替换对应部分。

任何时候您都可以点击顶部工具栏中的「验证」按钮,来确认作业 Flink SQL 语句中是否存在语法错误。

  1. 为了测试是否成功地捕获了源表数据,紧接着在下面写一行 SELECT * FROM source_table;语句,并点击工具栏中的「执行」按钮。接着,向 source_table表中插入一些数据。如果控制台中打印了相应的数据行,则说明捕获成功,如下图所示:
    3

  2. 接下来,我们希望对原始数据按照时间窗口进行分组计算。我们使用 TUMBLE相关窗口函数结合 GROUP BY,将长度 15 秒内的订单数据按照商品 ID 进行归类,并使用 SUM计算其销售总额。我们在 Flink 作业编辑窗格中输入如下代码:

SELECT 
  good_id, 
  tumble_start(
    record_time, interval '15' seconds
  ) AS record_timestamp, 
  sum(amount) AS total_amount 
FROM 
  source_table 
GROUP BY 
  tumble (
    record_time, interval '15' seconds
  ), 
  good_id;

为了测试这一效果,需要向数据库中插入多条数据。你可以在 RDS 中执行附件中的「示例数据.sql」来插入数据,或者使用「示例数据生成.py」脚本实时地插入数据。

在保证源表中有数据的情况下,再次执行 Flink 作业,观察控制台的输出结果:

4

  1. 在这个业务场景中,购买商品信息使用 good_id记录,而商品 ID 到可读商品名字的映射表、每件商品的价格等信息则存储在另一张维度表(Dimension Table)中。我们同样可以使用 Flink SQL 连接维度表,只需在 Flink 作业中编写下面的语句:
CREATE TEMPORARY TABLE dimension_table (
    good_id INT NOT NULL PRIMARY KEY NOT ENFORCED,
    good_name VARCHAR(256),
    good_price INT
) WITH (
    'connector' = 'mysql',
    'hostname' = '******************.mysql.rds.aliyuncs.com',
    'port' = '3306',
    'username' = '***********',
    'password' = '***********',
    'database-name' = '***********',
    'table-name' = 'dimension_table'
);

这里,我们希望根据上一步中统计出的「每 15 秒商品销售量」信息,计算出每件商品的销售额。由于商品名称及商品价格数据存储在另一张维度表 dimension_table中,我们需要将结果视图和 dimension_table进行 JOIN 操作,并将「商品销售量」、「商品价格」相乘计算出「商品销售额」,并提取结果中的商品可读名称信息作为结果表。

需要确保 dimension_table中存在对应商品 ID 的条目。你可以在 RDS 中执行附件中的「示例数据.sql」来插入数据。

作业代码如下:

SELECT 
  record_timestamp, 
  good_name, 
  total_amount * good_price AS revenue 
FROM 
  (
    SELECT 
      good_id, 
      tumble_start(
        record_time, interval '15' seconds
      ) AS record_timestamp, 
      sum(amount) AS total_amount 
    FROM 
      source_table 
    GROUP BY 
      tumble (
        record_time, interval '15' seconds
      ), 
      good_id
  ) AS tumbled_table 
  LEFT JOIN dimension_table ON tumbled_table.good_id = dimension_table.good_id;

其中第 7 到 20 行和上一步骤的 SQL 语句一致。

执行上面的语句,并观察控制台中的统计数据:

5

  1. 最后,我们将这些实时的统计数据写回数据库,Flink SQL 也可以简单地实现这一点。首先我们需要创建一张用于连接汇表的 Flink 临时表,如下所示:
CREATE TEMPORARY TABLE sink_table (
    record_timestamp TIMESTAMP(3) NOT NULL PRIMARY KEY NOT ENFORCED,
    good_name VARCHAR(128),
    sell_amount INT
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://******************.mysql.rds.aliyuncs.com:3306/***********',
  'table-name' = 'sink_table',
  'username' = '***********',
  'password' = '***********',
  'scan.auto-commit' = 'true'
);

然后,只需要将上面的 SELECT 语句的输出结果 INSERT 到该表就可以了:

INSERT INTO sink_table 
SELECT 
  record_timestamp, 
  -- ... 和上面的语句一样

现在,点击控制台上的「上线」按钮,即可将我们编写的 Flink SQL 作业部署上线执行。您可以使用数据库客户端等软件观察汇表中是否写入了正确的数据。

阿里云实时计算控制台在使用「执行」功能调试时,不会写入任何数据到下游中。因此为了测试使用 SQL Connector 写入汇表,您必须使用「上线」功能。

6

您也可以进入 Flink UI 控制台观察流数据处理图。在这个简单的示例中,首先进行的是源表数据的捕获与窗口聚合;接着和维度表进行 JOIN 操作得到运算结果;最后将处理数据存入汇表。

想要了解更多商品销售额实时统计的实验信息吗?快来尝试一下吧!

7

入营立享权益


更多内容

img


活动推荐

阿里云基于 Apache Flink 构建的企业级产品-实时计算 Flink 版现开启活动:
0 元试用 实时计算 Flink 版(5000CU*小时,3 个月内)
了解活动详情:https://free.aliyun.com/?pipCode=sc

image.png

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
存储 消息中间件 缓存
腾讯看点基于 Flink 的实时数仓及多维实时数据分析实践
当业务发展到一定规模,实时数据仓库是一个必要的基础服务。从数据驱动方面考虑,多维实时数据分析系统的重要性也不言而喻。但是当数据量巨大的情况下,拿腾讯看点来说,一天上报的数据量达到万亿级的规模,要实现极低延迟的实时计算和亚秒级的多维实时查询是有技术挑战的。
腾讯看点基于 Flink 的实时数仓及多维实时数据分析实践
|
5月前
|
数据可视化
实时榜单排行计算
实时榜单排行计算
125 0
实时榜单排行计算
|
11月前
|
SQL 关系型数据库 MySQL
实现淘宝母婴订单实时查询和可视化|Flink-Learning实战营
本场景将以 阿里云实时计算Flink版为基础,使用 Flink 自带的 MySQL Connector 连接 RDS 云数据库实例、Elasticsearch Connector 连接 Elasticsearch 检索分析服务实例,并以一个淘宝母婴订单实时查询的例子尝试上手 Connector 的数据捕获、数据写入等功能。
521 3
实现淘宝母婴订单实时查询和可视化|Flink-Learning实战营
|
11月前
|
SQL 运维 关系型数据库
实现淘宝母婴订单实时查询和实时大屏实验手册|Flink-Learning 实战营
加入 Flink-Learning 实战营,动手体验真实有趣的实战场景。只需 2 小时,让您变身 Flink 实战派。实战营采取了 Flink 专家在线授课,专属社群答疑,小松鼠助教全程陪伴的学习模式。
11809 2
实现淘宝母婴订单实时查询和实时大屏实验手册|Flink-Learning 实战营
|
12月前
|
数据挖掘
白话Elasticsearch38-深入聚合数据分析之案例实战 下钻分析之统计每季度每个品牌的销售额
白话Elasticsearch38-深入聚合数据分析之案例实战 下钻分析之统计每季度每个品牌的销售额
88 0
|
存储 自然语言处理 机器人
Hologres+大模型初探,让ChatGPT回答商家问题
本文介绍基于Hologres+ChatGPT提供智能客服服务的实践。
2321 1
Hologres+大模型初探,让ChatGPT回答商家问题
|
机器学习/深度学习 存储 数据采集
使用Databricks进行营销效果归因分析的应用实践| 学习笔记
快速学习使用Databricks进行营销效果归因分析的应用实践
152 0
使用Databricks进行营销效果归因分析的应用实践| 学习笔记
|
数据采集 大数据 开发者
离线数据计算-国际查询转换率及其他|学习笔记
快速学习离线数据计算-国际查询转换率及其他
135 0
|
数据采集 分布式计算 关系型数据库
离线计算-国内查询转换率|学习笔记
快速学习离线计算-国内查询转换率
153 0
|
存储 消息中间件 缓存
腾讯看点基于 Flink 构建万亿数据量下的实时数仓及实时查询系统
腾讯看点基于 Flink 构建实时数仓以及实时数据查询系统,亚秒级的响应多维条件查询请求。
腾讯看点基于 Flink 构建万亿数据量下的实时数仓及实时查询系统