Flink SQL _ Table 介绍与实战 | 学习笔记(二)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
云数据库 RDS MySQL Serverless,0.5-2RCU 50GB
简介: 快速学习 Flink SQL _ Table 介绍与实战

开发者学堂课程【开源 Flink 极客训练营Flink SQL _ Table 介绍与实战】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址https://developer.aliyun.com/learning/course/760/detail/13341


Flink SQL _ Table 介绍与实战


三、EDMO

1.电商用户行为实时分析

图片24.png

从kafka中实时消费用户的行为的数据,与My SQL数据进行关联,写入elasticsearch中,用kibana工具做elasticsearch的可视化,是一个端到端的实现应用构建。

图片25.png

最终展示成果,实时监控当天各方面各种维度的指标。

第一个为当前累计的独立用户数,达到的独立访问用户数。

第二个为类目排行榜,统计当天为止各个类目销量的排行榜

第三个为每小时全网购买个数,每个小时共发生多少次购买,在凌晨购买量少

第四个为全网累计独立用户数,零点到当年时刻累计的独立用户数,单调递增的曲线,一点的值代表零点到一点的独立用户数,七点的值,代表零点到七点的独立用户数。

2.用户行为日志

列名称

说明

用户ID

整数类型,序列化后的用户ID

商品ID

整数类型,序列化后的商品ID

商品类目ID

整数类型,序列化后的商品所属类目ID

行为类型

字符串,枚举类型,包括('pv', 'buy','cart' , ' fav')

时间戳

行为发生的时间戳

 

2017-11-27当天的行为,数据生成器每秒生成2000条数据

数据来源:阿里云天池公开数据集

数据中有用户ID、商品ID、商品类目ID,行为类型、时间戳几个字段,pv代表点击时间,buy代表购买时间,cart代表加入购物车的时间,fav代表收藏时间

时间戳,可理解为事件的时间


四、演练

Flink SQL Demo:构建一个端到端的流式应用

本文将基于Kafka,MySQL, Elasticsearch,Kibana,使用Flink SQL构建一个电商用户行为的实时分析应用。本文所有的实战演练都将在Flink SQL CLl上执行,全程只涉及SQL纯文本,无需一行Java/Scala代码,无需安装IDE。本实战演练的最终效果图;

图片26.png

1.准备

一台装有 Docker 的Linux或 MacOS计算机。

2.使用Docker Compose启动容器

本实战演示所依赖的组件全都编排到了容器中,因此可以通过docker-compose一键启动。你可以通过wget命令自动下载该docker-compose.yml文件,也可以手动下载。

mkdir flink - sql - demo; cd flink - sql- demo;

wget https: //raw.githubusercontent.com/wuchong/flink-sql- demo/v1.11-GN/docker-compose.yml

该Docker Compose中包含的容器有:

- Flink sQL Client:用于提交Flink SQL

- Flink集群:包含一个JobManager和一个TaskManager 用于运行SQL任务。

- DataGen:数据生成器。容器启动后会自动开始生成用户行为数据,并发送到Kafka集群中。默认每秒生成2000条数据,能持续生成一个多小时。也可以更改docker-compose.yml 中datagen的 speedup参数来调整生成速率(重启docker compose才能生效)。

- MySQL:集成了MySQL 5.7,以及预先创建好了类目表( category ),预先填入了子类目与顶级类目的映射关系,后续作为维表使用。

- Kafka:主要用作数据源。DataGen组件会自动将数据灌入这个容器中。

- zookeeper: Kafka容器依赖。

- Elasticsearch:主要存储Flink SQL产出的数据。

- Kibana:可视化Elasticsearch中的数据。

在启动容器前,建议修改Docker的配置,将资源调整到4GB以及4核。启动所有的容器,只需要在docker-compose-yml所在目录下运行如下命令。

docker-compose down

进入SQL CLI客户端

运行如下命令进入SQL CLI客户端:

docker-compose exec sql- client ./sql- client.sh

该命令会在容器中启动SQL CLI客户端。你应该能在CLI客户端中看到如下的环境界面。

把datagen的数据源的镜像可以控制产生和速度,把2000改成3000,使产出速度快一些。

图片27.png

输入docker-compose up -d

启动docker中的容器

图片28.png

Jobmanager与taskmanager是Flink的集群

Kibana用来做可视化,数据就是效果图

Zookeeper是kafka 的依赖项

Elasticsearch放物化视图的结果

Mysql放维度的数据

sql - client提交作业,纯文本作业提交的客户端界面

Datagen 数据上传器,包含用户行为日志文件

使用

docker-compose exec kafka bash -c 'kafka=console-consumer. sh --topic user_behavior --bootstrap-serves

命令,查看Kafka最新命令

图片29.png

有各种ID、用户的行为与行为日志,ts代表行为当时发生的时间,时间从2017.11.27零点到二十四点的日志

启动sql - client,出现一个大松鼠

图片30.png

运行sql命令

创建数据源

CREATETABLE user_behavior (

user_id BIGINT,

item_id BIGINT,

category_id BIGINT,

behavior STRING,

ts TIMESTANP(3),

proctime As PROCTIME(),-- generates processing-time attribute using computed column

WATEFMARK FOR ts AS ts - INTERVAL '5' SECOND-- defines watermark on ts column,marks ts as ever WITH(

" connector' = " kafka ' ,-- using kafka connector

"topic" = 'user_behavior" , -- kafka topic

"scan.startup.mode' = 'earliest-offset',-- reading from the beginning

"properties.bootstrap.servers" = "kafka;9094',-- kafka broker address

"forinat' = " json" -- the data format is json

);

如上我们按照数据的格式声明了5个字段,除此之外,我们还通过计算列语法和PROCTINE()内置函数声明了一个产生处理时间的虚拟列。我们还通过WATERMARK语法,在ts字段上声明了watermark 策略(容忍5秒乱序),ts 字段因此也成了事件时间列。关于时间属性以及DDL语法可以阅读官方文档了解更多:

定义计算列,使用proctime 内置函数,watermark定义事件时间,定义在ts中,as后面是watermark的策略,watermark生成的函数,根据ts-5秒代表数据的值,代表的含义是容忍5秒乱序,ts是事件时间,proctime是系统时间,后续基于proctime等做windows处理,

With接的如何连接到外部系统的属性,定义connector是kafka,用kafka连接器连接外部的kafka topic,topic名字是user_behavior,scan的策略从earliest-offset读起,properties.bootstrap.servers地址为kafka;9094端口,forinat 是 json的格式

图片31.png

图片32.png

运行,表创建成功

在 SQL CLI中成功创建Kafka表后,可以通过show tables;和describe user_behavior;来查看目前已注册的表,以及表的详细信息。我们也可以直接在SQL CLI中运行 SELECT * FROMN user_behavio:预览下数据(按q退出)。

在CLI中快速查看kafka中的数据

图片33.png

提交作业到集群中,将作业运行的结果拉回到本地

图片34.png

事件时间2017.11.27,系统时间是当前时间

进入8081端口

图片35.png

自动取消

在completed中看到执行果的job

图片36.png

在overview中看到10个slot,1个task manager

图片37.png

也可以查看日志,可以通过UI方式提交架包作业

通过三个实战画图表,深入理解Flink SQL

统计每小时的成交量

使用DDL创建Elasticsearch表

首先在SQL CLI中创建一个ES结果表,根据场景需求主要需要保存两个数据:小时、成交量。

CREATE TABLE buy_ cnt _per_hour (

hour_of_day BIGINT,

buy_cnt BIGINT

)WITH(

"connector' = 'elasticsearch-7',-- using elasticsearch connector

"hosts'= 'http://elasticsearch:9200", -- elasticsearch address

"index' = 'buy_cnt_per_hour"-- elasticsearch index name,similar to database table name

);

buy_cnt_per_hour索引名

不需要在Elasticsearch中事先创建buy_cnt_per_hour索引,Flink Job会自动创建该索引。

图片38.png

提交Query


相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
1月前
|
SQL 关系型数据库 MySQL
【MySQL实战笔记】02.一条SQL更新语句是如何执行的-1
【4月更文挑战第4天】SQL更新语句执行涉及查询和日志模块,主要为`redo log`和`binlog`。`redo log`先写日志再写磁盘,保证`crash-safe`;`binlog`记录逻辑日志,支持所有引擎,且追加写入。执行过程分为执行器查找数据、更新内存和`redo log`(prepare状态)、写入`binlog`、提交事务(`redo log`转commit)。两阶段提交确保日志逻辑一致,支持数据库恢复至任意时间点。
20 0
|
1天前
|
SQL 数据库
数据库SQL语言实战(六)
本次实战的重点就在于对表格本身的一些处理,包括复制表格、修改表格结构、修改表格数据
|
1天前
|
SQL Oracle 关系型数据库
数据库SQL语言实战(五)(数据库系统概念第三章练习题)
本文的SQL语言适用的是Oracle数据库与mySQL可能存在略微不同
|
1天前
|
SQL Oracle 关系型数据库
数据库SQL语言实战(四)(数据库系统概念第三章练习题)
本文的SQL语言适用的是Oracle数据库与mySQL可能存在略微不同
数据库SQL语言实战(四)(数据库系统概念第三章练习题)
|
1天前
|
SQL Oracle 关系型数据库
数据库SQL语言实战(三)
本篇文章重点在于SQL中的各种删除操作
|
22天前
|
SQL 自然语言处理 数据库
NL2SQL实践系列(2):2024最新模型实战效果(Chat2DB-GLM、书生·浦语2、InternLM2-SQL等)以及工业级案例教学
NL2SQL实践系列(2):2024最新模型实战效果(Chat2DB-GLM、书生·浦语2、InternLM2-SQL等)以及工业级案例教学
NL2SQL实践系列(2):2024最新模型实战效果(Chat2DB-GLM、书生·浦语2、InternLM2-SQL等)以及工业级案例教学
|
26天前
|
SQL 数据库
数据库SQL语言实战(一)
数据库SQL语言实战(一)
|
26天前
|
SQL 数据库
数据库SQL语言实战(二)
数据库SQL语言实战(二)