开发者学堂课程【开源 Flink 极速上手教程:Flink SQL_Table 介绍与实战】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/331/detail/3711
Flink SQL_Table 介绍与实战(二)
六、Demo 演练
通过一个实战演练的 demo 去了解这些功能,
这个 demo 是电商的用户行为分析,会从 kafka 中去实时消费用户的数据,与Mexico中的数据进行关联,然后将它写到 elastisearch 中,然后进行格式化处理,这是一个端到端的实时应用的构建,在整个过程中,只用到了 flink CRI,因此如果对Java 不了解的话,就很适合这个课程。在这个过程中不会去写代码。
下图为最终成果展示图:
会有一个 dashbute 去监控各方面的指标,第一个指标是当前累积的独立用户数,第二是统计各个类目销量排行榜,第三是每小时全网购买量。第四张图是全网累积的独立用户数,1点的值表示0-1点的独立用户数,7点的值表示0-7的独立用户数,所以是一个单调递增的曲线。
接下来理解一下数据源的数据,来自于某宝的用户行为日志,选取了在11月27号当天的一个行为,这个数据里面有这么几个字段,一个是用户 ID,还有商品 ID,还有商品所属的这个内容,以及行为的类型,行为的类型的话有这么几种有 PV, PV 代表点击事件,buy 就代表购买时间,cart 就代表加入购物车的时间,fav 代表收藏事件,还有一个时间戳的字段,就代表这个行为当时发生的这个时间,其实可以把它理解为这个事件的时间,所以我们也会把这个时间戳声明成一个事件时间,然后在上面定义,然后在这个上面去做一些 window 的处理。
接下来进入到演练的环节:演练的实例 demo 的代码已经上传到文档中了,如果感兴趣可以根据文档去做。这个新的 demo 是根据最新的版本的1.11,所以会运用到1.11的新的特性,如果需要进行演练的话,就只需要在本机安装好一个新的 docker 即可,整个演练过程不需要下载新的架包等等,只需要用到 docker。
首先创建一个新的目录:
flink sql demo,接下来把docker compose的下载文件下载下来就好。下载好之后,打开文件查看一下,先来查看一下数据源的镜像,这里控制数据源产生的速度,前面把2000改成了3000,因为演示环节希望产生速度能快一些,输入命令将 docker compos e中的容器都启动起来。
Jobmanager 和 taskmanager 是 flink 的集群。Kibana 是用来做可视化的处理,也就是刚看到的效果图,然后 keyboard 是 kafka 的一大项目,
这是一个蓝色的一个存放物化视图的一个结果,我们的 query 的结果还有一个Mexico,Mexico 里面我们会放一些维度的数据,比如说我们的那个内部的一些信息,然后 kafka 就是我们的数据源,主要是在 flink sql client。它主要是用来对用擦除文本的一个作业提交的一个客户端的一个界面。然后最后是一个数据生成器,然后它里面其实就包了那个用户的一些行为认知的这个日志文件,然后以每隔多少的一个速率,往这个 kafka 里面吐数据。建立起来之后就会往 kafka 里面源源不断的吐数据。
Creating flink-sql-demo_jobmanager_1 ... done
Creating flink-sql-demo_kibana_1 ... done
Creating flink-sql-demo_zookeeper_1 ... done
Creating flink-sql-demo_elasticsearch_1 ... done
Creating flink-sql-demo_mysql_1 ... done
Creating flink-sql-demo_taskmanager_1 ... done
Creating flink-sql-demo_kafka_1 ... done
Creating flink-sql-demo_sql-client_1 ... done
Creating flink-sql-demo_datagen1 ... done
可以用 docker compose 的命令看一下最新的数据是什么样的,docker compose 第一次启动较慢。
结果展示如下:
user_id":"952483" item_id";"310884" "category_id":"4580532"、"behavior" ; "ts":"2017-11-27 00:00:00"7
user_id":"794777” item_id":"5119439" “category_id":"982926" "behavior": "pv",
"pv”,"ts":"2017-11-27 00:00:00"7
user_id":"875914"
user_id":"980877" item_id";"4484065" "category_id":"1320293” "Behavior": pv" "ts":"2017-11-27 00:00:00"]
item_id":"5097906" "category_id":"149192" , "behavior": "ts":"2017-11-27 00:00:00"
user_id":"944074" item_id":"2348702", category_id":“3002561", “behavior" :“Py”
pv" "ts":“2017-11-2700:00:00"
user_id":"973127” item_id":"1132597" “category_id":“4181361", "behavior" : pv" "ts":“2017-11-27 00:00:00"1
user_id":"84681",
user_id":"732136" “item_id":"3505100" category_id":“2465336", "behavior". 'ts":"2017-11-27 00:00:00"7
"item_id":"3815446' “category_id":“2342116" "behavior":‘pv”、
pv" "ts":"2017-11-2700:00:00"1
user_id":"940143", item_id";"2157435” “category_id":“1013319", "behavior"; "pv" "ts":"2017-11-27 00:00:00"
30N 59
看到了当中的一些数据,它有 user ID 有商品 ID有类目 ID 有这用户的行为,还有代表这个行为当时发生的这个时间,在这个时间呢就是从2017年11月27号的0点开始的一个到24点的一个行为的日志。
接下来启动 sql client 启动起来,当看到有一只大松鼠的时候,即代表启动成功了。
就可以在 sql client 的命令,第一步是用 ddl 创建数据源,会用到 create table 的 ddl 里面的表,里面有五个字段。还有一个计算列,计算列是用了一个time的逆函数。代表的是系统时间的计算列,还有 water mark 定义了一个用户时间。
这个事件时间我们是定义在这个帖子上,也就是这个用户数据里面携带的这个时间,这个策略也就是生成的一个函数,就是根据 s 减5秒作为这个值,它代表的含义就是容忍五秒的数据的乱序。
做一些 window 的处理啊等等,里面跟的都是一些如何连接到外部系统的一些属性,比如说我们这里定义了 connec t 就会用这个连接器去连接外部的这个 kafka,图片名字叫 behavior,地址是9194端口,他是一个 jason 格式。
CREATE TABLE user_behavior(
user_id BIGINT,
item_id BIGINT,
category_id BIGINT,
behavior STRING,
ts TIMESTAMP(3).
proctime AS PROCTIME(), -- generates processing-time attribute using computed columm
WATERMARK FOR ts AS ts - INTERVAL '5'SECOND -- defines watermark on ts column, marks ts as ever
)WITH(
'connector' 'kafka' -using kafka connector
'topic'='user_behavior', ——kafka topic
"scan.startup.mode ‘earliest-offset reading from the beginning
"properties.bootstrap.servers'='kafka:9094' kafka broker address
'format' 'son' -- the data format is json
然后把ddl放到 sql client 去运行,就把目录创建成功了。关于时间属性和 ddl 可以参考文档去了解。有需要的话也可以看一下已经注册好的表,进去以后,有几个字段。
在 CRI 里面快速看一眼 kafka 的数据,提交了一个作业到集群里面去,运行的结果会出现在集群中。可以看到就是刚才看到的一些值。
还可以看到这个里面事件的这个时间是11月27号,也就是系统的时间,你可以发现当前时间与其不一致,因为这个显示的是这个 UC 的时间,所以说跟现在是有这个八个小时差的。
接下里进到8081的端口,就是刚刚 docker client 的集群的页面。
我们刚刚提交的这个命令如果取消掉,那么在此页面可以看到它自动的就会被 cancel掉,然后可以看到这个刚刚执行过的这个效果,呃,我这边我们可以看到我们有十个slots,然后1个 task manager。然后1个 jobmanager,也可以通过 ui 的方式提交架包的作业。
Demo 演示
接下来就通过三个实战去画画一些图标来深入理解功能,首先第一个就是要去统计每小时的一个成交量,首先我们会用 ddl 去创建一个 elastic search 的目标,定义一个每小时的一个成交量的表,然后他的社区中的一个索引名,把它叫做 buy counter per hour。Host 定义地址,connecter 是用 elastic search 的集群,需要注意的是不需要在 elastic search 社区事先创建这个处理的集群,ddl 会自动创建好。
然后会在 querry 去做一个每小时的成交量,其次需要做一个一小时的划窗,每小时需要用 tample 的这个 windows 的语法,然后tample 的第一个字段是定义一个时间的属性,就是刚才介绍的这个试镜时间。一个时间属性的字段可以在 ts 上做一个tample 的 windows 的计算。开创的大小是一小时,每小时会画一个窗口,去统计数据的成交量,所以在做窗口之前会过滤出失败的这个事件,会进入到这个窗口里面。
这个窗口的计算函数不断进行创新,然后用这个函数去提取出这个窗口的这个小时是第几个小时,根据 insert into 把 query 的这个运行结果实时的写入到 buy count 这个表里面去,也就是刚刚定义的 elastic search 社区。关于更多的窗口聚合的这个内容,也可以去访问这个文档,开放的文档利于我们更好的去理解更多的信息。
使用 DDL 创建 Elasticsearch 表
我们先在 SQLCLI 中创建一个ES 结果表,根据场景需求主要需要保存两个数据:小时、成交量。
CREATE TABLE buy_cnt_per_hour (
hour_of_day BIGINT,
buy_cnt BIGINT
)WITH(
'connector' 'elasticsearch-7'.-- using elasticsearch connector
"hosts'=‘http://elasticsearch:9200', — elasticsearch address
'index'm‘buy_cnt_per_hour' - elasticsearch index name, similar to database table name);
我们不需要在 Elasticsearch 中事先创建 buy_cnt_per_hour 索引,FlinkJob 会自动创建该索引。
提交 Query,统计每小时的成交量就是每小时共有多少"buy”的用户行为。因此会需要用到 TUMBLE 窗口函数,按照一小时切窗。然后每个窗口分别统计"buy"的个数,这可以通过先过滤出"buy"的数据,然后 COUNT(*) 实现。
INSERT INTO buy_cnt_per_hour
SELECT HOUR(TUMBLE_START(tS, INTERVAL '1' HOUR)), COUNT(K) ★
FROM user_behavior
WHERE behavior = "buy'
GROUP BY TUMBLE(ts, INTERVAL '1' HOUR);
这里我们使用 HOUR 内置函数,从一个 TIMESTAMP 列中提取出一天中第几个小时的值。使用了 INSERT INTO 将 query 的结果持续不断地插入到上文定义的 es 结果表中(可以将 es 结果表理解成 query 的物化视图)。另外可以阅读该文档了解更多关于窗口聚合的内容:
https://ci.apache.org/projects/flink/flink-docs-release-
1.11/dev/table/sql/queries.htmlt#group-windows
提交到 query 之后,返回用户 id之后,就说明成功提交了。进入到页面,sql会发现有俩个节点,一个是 source 的节点,还有一个是 window 聚合的节点。然后就可以去可视化一下结果,刚刚通过 docker compose 也启动了 keeper 的容器,然后可以在5601的端口,可以访问到 ui 的界面,刚进来的时候里面是空的,什么都没有。所以我们需要 create index pattern,在齿轮处,进行操作,去选择索引名。
当我们的 query 提交以后,刚刚提交以后,无法它会自动的创建索引,这八个索引名把这个create index pattern 创建起来。创建以后,在 keeper 当中做一些 discovery进行可视化。在里面可以看到一些字段,这些字段就是在 ddl 里面的字段的名字然后还有对应的值。在这里要做的把每小时成交量的图给画出来。
首先用 dash bote 里面,创建一个新的 dash bote。命名为实时用户行为分析。然后可以添加一些可视化的图,先来一个文本框,把 dash bote 的功能描述好。
然后开始画一个每小时成交量的图。找一个 item。画一个面积图,选择一个索引,在y轴去购买量的 max,然后表明每小时成交量。在x轴展示的是小时,在字母序取24,然后点一下播放键看一眼这个图,现在因为现在我们只跑到了这个10:00的数据,如果你不断的刷新的话,可能会看到这个数据还在不断的不断的新的数据冒出来,所以我现在看到的就是一个0~4点,然后可以看到凌晨这个阶段的话,它有一个非常大的一个波谷,这个也比较符合我们的这个常识,然后我们把它保存到dash bote 中,命名为全网每小时成交量。
接下来画下一个图,下一个图是统计每十分钟累积的的独立用户数,也是在各种报表啊,或者说一个大屏啊,你经常会遇到一个需求,就是一个累积的一个统计分析,所谓累积就是说每一个时刻的点。它描述是从0点到当前时刻的一个连接的一个统计图,就比如说独立用户数,就是每天的这个0点到当前时刻,它的一个独立用户数的一个累计值,同样的我们就像跟我们先去创建一个目标表,然后用来存这个汇总的数据结果。
那主要会有这么几个资料,一个就是日期加时间,还有一个就是这个 ub 的值,那我们这里特殊的是我们把这个日期加时间证明成了 PK,这个声明的这个 PK 就会把这个PK的这个值当做这个特别的 ID,然后 docker 的 ID 是可以用来做更新的,所以说可以基于这个 ID 来做不断的更新。
统计一天每10分钟累计独立用户数
另一个有意思的可视化是统计一天中每一刻的累计独立用户数(uv),也就是每一刻的uv数都代表从0点到当前时刻为止的总计 uv 数,因此该曲线肯定是单调递增的。
我们仍然先在 SQLCLI 中创建一个 Elasticsearch 表,用于存储结果汇总数据。主要字段有:日期时间和累积 uv 数。我们将日期时间作为Elasticsearch中的document id,便于更新该日期时间的 uv 值。
CREATE TABLE cumulative_uv(
date_str STRING,
time_str STRING,
UV BIGINT,
PRIMARY KEY(date_str,time_str) NOT ENFORCED
)WITH(
'connector' 'elasticsearch-7',
"hosts' 'http://elasticsearch:9200°
'index' 'cumulative_uv'
);
为了实现该曲线,我们可以先通过 OVER WINDOW 计算出每条数据的当前分钟,以及当前累计 uv(从0点开始到当前行为止的独立用户数)。uv的统计我们通过内置的 COUNT(DISTINCT user_id)来完成,Flink SQL 内部对 COUNTDISTINCT 做了非常多的优化,因此可以放心使用。
为了实现该由线,我们先抽取出日期和时间字段,我们使用 DATE_FORMAT 抽取出基本的日期与时间,再用 SUBSTR 和字符串连接函数11将时间修正到10分钟级别,如:12:10,12:20。其次,我们在外层查询上基于日期分组,求当前最大的时间,和UV,写入到Elasticsearch的索引中。UV 的统计我们通过内置的 COUNT(DISTINCT user_id)来完成,Flink SQL 内部对 COUNTDISTINCT 做了非常多的优化,因此可以放心使用。
为了实现曲线,query 由俩层来组成。内存的 query,主要就是把日期和时间给选出来。日期就需要用到 formate,这里比较特殊的是:我们的需求是做一个每十分钟的点,就是不需要每秒钟或者每分钟的一个点,这样对于社群来讲压力比较大。而且也不需要这么细的精度,做到10分钟的级别即可。所以这里用一个字符穿的函数,先取出这一部分的数据,在分钟级的数据进行填零补充。把它变成12点10分的字符串。根据日期进行分组,当前最大的时间,用 flink sql 去算 uv,在 flink 里面,可以放心使用。
这里我们是使用 date str 和 time str 去作为组件插入到社区中做更新。但是在这里是用日期做分组,时间取得是 max。更主要的原因是在算一个累积的uv,在query只会不断刷新最大的 uv 值,如果要把当前最大的uv中更新到社区中的话,就是去更新的最大的时间对应的 uv 值。提交这个查询以后,也可以看一下query。
具体代码如下:
INSERT INTOcumulative_uv
SELECT date_str, MAX(time_str), COUNT(DISTINCT user_id) as u FROM(
SELECT
DATE_FORHAT(ts,‘yyyy-HM-dd') as date str,
SUBSTR(DATE_FORMAT(ts, 'HH:mm'),1,4) '0'as time_str, user_id
FROMuser_behavior)
GROUP BY date_str;
在 sql client 里面提交查询之后,也可以去看一下 query,会发现有俩个节点,一个节点会源源不断的读取kafka的数据,第二个节点是 group aggregate,这个 group aggregate 跟 window 的 group bydate 的区别是只有普通字段,没有 window 的函数字段。运行时不会等到 window 结束,而是在每条数据来了以后刷新结果到数据库,会有一个不断地更新流进行刷新。
把刚刚的 index 给加进来,去做可视化处理
回到 dashbote,选择添加新的视图,现在是要做一个累积的 uv,选择的是连线图,y 轴统计的是累积访问独立用户数。取得是 uv 的值。每个点只有一个独立的 uv 值,所以取 max。在 x 轴是每十分钟的点,
如下图所示:
可以看到独立用户数的曲线是一条单调递增的曲线。在凌晨阶段,斜率比较平缓,到白天的时候斜率会比较陡。把图加到 dash bote 上,命名为累积独立访问用户数。
如下图:
第三张图是类目排行榜,想要了解类目。因为在商品中类目分的太细了,大概有5000多类目。对应的是很细的目 id,所以对于排行榜意义不大。所以更需要归结到顶级的类目中。所以在开始前准备了 Mexico 的容器,在 Mexico 的容器存储的是俩个数据。一个是 Mexico 的 id,以及 category name 的名字。维表在查询的时候,在flink的算字里面,可以起一个预定的缓存。缓存数据存储的压力。可以先进到Mexico的数据,进到 Mexico 的 demo 里面 Mexico 里面有一个 category 的一个表,里面的数据有一个子类目的 id,映射到的是副类目的名字,目的是把类目名字取出来,进行一个排名。
INSERT INTO cumulative_uv
SELECT date_str, MAX(time_str), COUNT(DISTINCT user_id)as uv
FRON(
SELECT
DATE_FORMAT(ts, ‘yyyy-HH-dd') as date_str,
SUBSTR(DATE_FORMAT(ts, ‘HH:mm'),1,4) 0'as time_str,
user id
FROM user_sehavior)
GROUP BY date_str;
首先把刚讨论的 Mexico 的 ddl,在 sql 的 client 里面运行。把类目也进行 pk,进行排名。
我们需要不断的把类目成交量不断更新。然后在 query,会运用到 query 的语法去做一个视图。视图的话其实是不会做一个 query,目的是因为简化写法,如果把俩个query会比较复杂,所以需要进行简化。里面的更多语法可以去参考相关文档进行参考。在第二层的 query 里面,会根据选出来的类目名字,过滤出成交的数据,统计出类目名字的成交量。
在 SQLCLI 中创建 MySQL 表,后续用作维表查询。
CREATE TABLE category_dim
sub category id BIGINT,
parent_category_name STRING
WITH(
'connector" 'jdbe'.
"url'='jdbc:mysql://mysql:3306/flink
"table-name’='category
"username' "root'
"password'=‘123456
"lookup.cache.max-rows '520.
"lookup.cache.ttl' '10min'
同时我们再创建一个 Elasticsearch 表,用于存储类目统计结果。
CREATE TABLE top_category (
category_name STRING PRIMARY KEY NOT ENFORCED,
buy_cnt BIGINT
)WITH(
"connector' 'elasticsearch-7'.
"hosts" 'http://elasticsearch:9200'
"index" 'top_category'
);
把 insert into 直接运用到 count 的统计,提交上去先进到 flink ui 里面,可以看到有俩个节点:
去做类目和成交量的统计分析,也是需要建立起索引。回到 dash bote,就可以去添加类目的可视化的图。在 y 轴就是去统计类目成交量,x 轴用类目的名字,排序是根据成交量来拍,按照倒序。可以看到衣服和鞋子占了所有类目的一半以上,比其他的类目要多一半的成交量。
将其保存到 dash bate上面,然后最后可以加一个目标的可视化的图,去定义一个想达到的一个 uv 数的目标,画一个目标图。
用 max uv 用户数,把间隔可以划大一些,独立用户数的目标是达到100w,今天是来了58w 的独立用户数。加到 dash bote 上,进行拖拽修改画的更好看一些:
演示到此结束。