开发者学堂课程【开源 Flink 极客训练营:Flink SQL _ Table 介绍与实战】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/760/detail/13341
Flink SQL _ Table 介绍与实战
提交Query
统计每小时的成交量就是每小时共有多少""buy”的用户行为。因此会需要用到TUMBLE窗口函数,按照一小时切窗。然后每个窗口分别统计“buy”的个数,这可以通过先过滤出""buy"的数据,然后COUNT(*)实现。
INSERT INTO buy_cnt_per_hour
SELECT HOUR (TUMBLE_START(ts,INTERVAL '1'HOUR)),COUNT(*)
FROM user_behavior
HERE behavior = " buy"
GROUP BY TUMBLE(ts, INTERVAL"1"HOUR);
ts事件时间,时间属性字段,开窗大小一小时
使用HOUR内置函数,从一个TIMESTAMP列中提取出一天中第几个小时的值。使用了INSERT TNTO将query的结果持续不断地插入到上文定义的es结果表中(可以将es结果表理解成query 的物化视图)。另外可以阅读该文档了解更多关于窗口聚合的内容:https://ci.apache.org/projects/flink/flink-docs-release-
1.11/dev/table/sql/queries.htmIifgroup-windows
在Flink SQL CLI中运行上述查询后,在Flink Web UI中就能看到提交的任务,该任务是一个流式任务,因此会一直运行。
成功提交,查看运行作业,有两个节点
使用Kibana可视化结果
通过Docker Compose启动了Kibana容器,可以通过http://ocalhost:5601访问Kibana.
需要先配置一个index pattern.点击左侧工具栏的"Management",就能找到"Index Patterns".点击"Create Index Pattern",
然后通过输入完整的索引名"buy. cnt per hour"创建index pattern.创建完成后,Kibana 就知道索引后,可以开始探索数据了。
先点击左侧工具栏的"Discovery"按钮,Kibang 就会列出刚刚创建的索引中的内容。
5601本地端口访问界面,刚进入是空的,无数据,通过Index Patterns,点击create index pattern
选择索引名 buy_cnt_per_hour,点击next,
创建index pattern完成后,才能在Kibana中去做Discovery,可视化等操作,
Discovery中查看写入的数据,
创建一个Dashboard用来展示各个可视化的视图。点击页面左侧的"Dashboard", 创建一个名为"用户
行为日志分析"的Dashboard.然后点击"Create New"创建一个新的视图,选择"Area" 面积图,选择
" buy_cnt_per_hour"索引,按照如下截图中的配置(左侧)画出成交量面积图,并保存为“每小时成交量"。
点击左侧Dashboard,创建新的Dashboard,点击save,创建名字,
进入edit界面,添加可视化图
文本框,表述Dashboard
做每小时成交量图,在edit中选择Area面积图,选择索引
Y轴是购买量Max,X轴小时,选择hour_of_day
点击播放键
凌晨有波谷,符合常识,保存到Dashboard中
统计一天每10分钟 累计独立用户数
另一个可视化是统计-天中每一刻的累计独立用户数(uv) ,即每一刻的uv数都代表从0点到当前时刻为止的总计uv数,因此该曲线肯定是单调递增的。
首先在SQL CLI中创建-个Elasticsearch表,用于存储结果汇总数据。主要字段有:日期时间和累积uv数。将日期时间作为Elasticsearch中的document id,便于更新该日期时间的uv值。
CREATE TABLE cumulative. _ uv (
date_ str STRING,
time. str STRING,
uv BIGINT,
PRIMARY KEY (date_ str, tine str) NOT ENFORCED
) WITH (
"connector" = "elasticsearch-7",
"hosts" = "http://elasticsearch:9200",
"index" = " cumulative. uv"
);
为了实现该曲线,可以先通过OVER WINDOW计算出每条数据的当前分钟,以及当前累计uv (从0点开始到当前
行为止的独立用户数)。uv 的统计我们通过内置的COUNT(DISTINCT user_ 1d) 来完成,Flink SQL内部对COUNT
DISTINCT做了非常多的优化,因此可以放心使用。
为了实现该曲线,先抽取出日期和时间字段,使用DATE FORMAT抽取出基本的日期与时间,再用SUBSTR 和
字符串连接函数11将时间修正到10分钟级别,如: 12:10. 12:28. 其次,在外层查询上基于日期分组,求当前
最大的时间,和UV,写入到Elasticsearch的索引中。UV的统计通过内置的COUNT(DISTINCT user. id)来完成,Fink SQL内部对COUNT DISTINCT做了非常多的优化,因此可以放心使用。