开发者学堂课程【开源 Flink 极客训练营:Flink SQL _ Table 介绍与实战】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/760/detail/13341
Flink SQL _ Table 介绍与实战
三、EDMO
1.电商用户行为实时分析
从kafka中实时消费用户的行为的数据,与My SQL数据进行关联,写入elasticsearch中,用kibana工具做elasticsearch的可视化,是一个端到端的实现应用构建。
最终展示成果,实时监控当天各方面各种维度的指标。
第一个为当前累计的独立用户数,达到的独立访问用户数。
第二个为类目排行榜,统计当天为止各个类目销量的排行榜
第三个为每小时全网购买个数,每个小时共发生多少次购买,在凌晨购买量少
第四个为全网累计独立用户数,零点到当年时刻累计的独立用户数,单调递增的曲线,一点的值代表零点到一点的独立用户数,七点的值,代表零点到七点的独立用户数。
2.用户行为日志
列名称 |
说明 |
用户ID |
整数类型,序列化后的用户ID |
商品ID |
整数类型,序列化后的商品ID |
商品类目ID |
整数类型,序列化后的商品所属类目ID |
行为类型 |
字符串,枚举类型,包括('pv', 'buy','cart' , ' fav') |
时间戳 |
行为发生的时间戳 |
2017-11-27当天的行为,数据生成器每秒生成2000条数据
数据来源:阿里云天池公开数据集
数据中有用户ID、商品ID、商品类目ID,行为类型、时间戳几个字段,pv代表点击时间,buy代表购买时间,cart代表加入购物车的时间,fav代表收藏时间
时间戳,可理解为事件的时间
四、演练
Flink SQL Demo:构建一个端到端的流式应用
本文将基于Kafka,MySQL, Elasticsearch,Kibana,使用Flink SQL构建一个电商用户行为的实时分析应用。本文所有的实战演练都将在Flink SQL CLl上执行,全程只涉及SQL纯文本,无需一行Java/Scala代码,无需安装IDE。本实战演练的最终效果图;
1.准备
一台装有 Docker 的Linux或 MacOS计算机。
2.使用Docker Compose启动容器
本实战演示所依赖的组件全都编排到了容器中,因此可以通过docker-compose一键启动。你可以通过wget命令自动下载该docker-compose.yml文件,也可以手动下载。
mkdir flink - sql - demo; cd flink - sql- demo;
wget https: //raw.githubusercontent.com/wuchong/flink-sql- demo/v1.11-GN/docker-compose.yml
该Docker Compose中包含的容器有:
- Flink sQL Client:用于提交Flink SQL
- Flink集群:包含一个JobManager和一个TaskManager 用于运行SQL任务。
- DataGen:数据生成器。容器启动后会自动开始生成用户行为数据,并发送到Kafka集群中。默认每秒生成2000条数据,能持续生成一个多小时。也可以更改docker-compose.yml 中datagen的 speedup参数来调整生成速率(重启docker compose才能生效)。
- MySQL:集成了MySQL 5.7,以及预先创建好了类目表( category ),预先填入了子类目与顶级类目的映射关系,后续作为维表使用。
- Kafka:主要用作数据源。DataGen组件会自动将数据灌入这个容器中。
- zookeeper: Kafka容器依赖。
- Elasticsearch:主要存储Flink SQL产出的数据。
- Kibana:可视化Elasticsearch中的数据。
在启动容器前,建议修改Docker的配置,将资源调整到4GB以及4核。启动所有的容器,只需要在docker-compose-yml所在目录下运行如下命令。
docker-compose down
进入SQL CLI客户端
运行如下命令进入SQL CLI客户端:
docker-compose exec sql- client ./sql- client.sh
该命令会在容器中启动SQL CLI客户端。你应该能在CLI客户端中看到如下的环境界面。
把datagen的数据源的镜像可以控制产生和速度,把2000改成3000,使产出速度快一些。
输入docker-compose up -d
启动docker中的容器
Jobmanager与taskmanager是Flink的集群
Kibana用来做可视化,数据就是效果图
Zookeeper是kafka 的依赖项
Elasticsearch放物化视图的结果
Mysql放维度的数据
sql - client提交作业,纯文本作业提交的客户端界面
Datagen 数据上传器,包含用户行为日志文件
使用
docker-compose exec kafka bash -c 'kafka=console-consumer. sh --topic user_behavior --bootstrap-serves
命令,查看Kafka最新命令
有各种ID、用户的行为与行为日志,ts代表行为当时发生的时间,时间从2017.11.27零点到二十四点的日志
启动sql - client,出现一个大松鼠
运行sql命令
创建数据源
CREATETABLE user_behavior (
user_id BIGINT,
item_id BIGINT,
category_id BIGINT,
behavior STRING,
ts TIMESTANP(3),
proctime As PROCTIME(),-- generates processing-time attribute using computed column
WATEFMARK FOR ts AS ts - INTERVAL '5' SECOND-- defines watermark on ts column,marks ts as ever WITH(
" connector' = " kafka ' ,-- using kafka connector
"topic" = 'user_behavior" , -- kafka topic
"scan.startup.mode' = 'earliest-offset',-- reading from the beginning
"properties.bootstrap.servers" = "kafka;9094',-- kafka broker address
"forinat' = " json" -- the data format is json
);
如上我们按照数据的格式声明了5个字段,除此之外,我们还通过计算列语法和PROCTINE()内置函数声明了一个产生处理时间的虚拟列。我们还通过WATERMARK语法,在ts字段上声明了watermark 策略(容忍5秒乱序),ts 字段因此也成了事件时间列。关于时间属性以及DDL语法可以阅读官方文档了解更多:
定义计算列,使用proctime 内置函数,watermark定义事件时间,定义在ts中,as后面是watermark的策略,watermark生成的函数,根据ts-5秒代表数据的值,代表的含义是容忍5秒乱序,ts是事件时间,proctime是系统时间,后续基于proctime等做windows处理,
With接的如何连接到外部系统的属性,定义connector是kafka,用kafka连接器连接外部的kafka topic,topic名字是user_behavior,scan的策略从earliest-offset读起,properties.bootstrap.servers地址为kafka;9094端口,forinat 是 json的格式
运行,表创建成功
在 SQL CLI中成功创建Kafka表后,可以通过show tables;和describe user_behavior;来查看目前已注册的表,以及表的详细信息。我们也可以直接在SQL CLI中运行 SELECT * FROMN user_behavio:预览下数据(按q退出)。
在CLI中快速查看kafka中的数据
提交作业到集群中,将作业运行的结果拉回到本地
事件时间2017.11.27,系统时间是当前时间
进入8081端口
自动取消
在completed中看到执行果的job
在overview中看到10个slot,1个task manager
也可以查看日志,可以通过UI方式提交架包作业
通过三个实战画图表,深入理解Flink SQL
统计每小时的成交量
使用DDL创建Elasticsearch表
首先在SQL CLI中创建一个ES结果表,根据场景需求主要需要保存两个数据:小时、成交量。
CREATE TABLE buy_ cnt _per_hour (
hour_of_day BIGINT,
buy_cnt BIGINT
)WITH(
"connector' = 'elasticsearch-7',-- using elasticsearch connector
"hosts'= 'http://elasticsearch:9200", -- elasticsearch address
"index' = 'buy_cnt_per_hour"-- elasticsearch index name,similar to database table name
);
buy_cnt_per_hour索引名
不需要在Elasticsearch中事先创建buy_cnt_per_hour索引,Flink Job会自动创建该索引。
提交Query