Flink SQL _ Table 介绍与实战 | 学习笔记(三)

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
实时计算 Flink 版,5000CU*H 3个月
简介: 快速学习 Flink SQL _ Table 介绍与实战

开发者学堂课程【开源 Flink 极客训练营Flink SQL _ Table 介绍与实战】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址https://developer.aliyun.com/learning/course/760/detail/13341


Flink SQL _ Table 介绍与实战


提交Query

统计每小时的成交量就是每小时共有多少""buy”的用户行为。因此会需要用到TUMBLE窗口函数,按照一小时切窗。然后每个窗口分别统计“buy”的个数,这可以通过先过滤出""buy"的数据,然后COUNT(*)实现。

INSERT INTO buy_cnt_per_hour

SELECT HOUR (TUMBLE_START(ts,INTERVAL '1'HOUR)),COUNT(*)

FROM user_behavior

HERE behavior = " buy"

GROUP BY TUMBLE(ts, INTERVAL"1"HOUR);

ts事件时间,时间属性字段,开窗大小一小时

使用HOUR内置函数,从一个TIMESTAMP列中提取出一天中第几个小时的值。使用了INSERT TNTO将query的结果持续不断地插入到上文定义的es结果表中(可以将es结果表理解成query 的物化视图)。另外可以阅读该文档了解更多关于窗口聚合的内容:https://ci.apache.org/projects/flink/flink-docs-release-

1.11/dev/table/sql/queries.htmIifgroup-windows

在Flink SQL CLI中运行上述查询后,在Flink Web UI中就能看到提交的任务,该任务是一个流式任务,因此会一直运行。

图片39.png

成功提交,查看运行作业,有两个节点

图片40.png

使用Kibana可视化结果

通过Docker Compose启动了Kibana容器,可以通过http://ocalhost:5601访问Kibana.

需要先配置一个index pattern.点击左侧工具栏的"Management",就能找到"Index Patterns".点击"Create Index Pattern",

图片41.png

然后通过输入完整的索引名"buy. cnt per hour"创建index pattern.创建完成后,Kibana 就知道索引后,可以开始探索数据了。

先点击左侧工具栏的"Discovery"按钮,Kibang 就会列出刚刚创建的索引中的内容。

5601本地端口访问界面,刚进入是空的,无数据,通过Index Patterns,点击create index pattern

选择索引名 buy_cnt_per_hour,点击next,图片42.png

创建index pattern完成后,才能在Kibana中去做Discovery,可视化等操作,

图片43.png

Discovery中查看写入的数据,

图片44.png

创建一个Dashboard用来展示各个可视化的视图。点击页面左侧的"Dashboard", 创建一个名为"用户

行为日志分析"的Dashboard.然后点击"Create New"创建一个新的视图,选择"Area" 面积图,选择

" buy_cnt_per_hour"索引,按照如下截图中的配置(左侧)画出成交量面积图,并保存为“每小时成交量"。

点击左侧Dashboard,创建新的Dashboard,点击save,创建名字,

图片45.png

进入edit界面,添加可视化图

图片46.png

文本框,表述Dashboard

图片47.png

做每小时成交量图,在edit中选择Area面积图,选择索引

Y轴是购买量Max,X轴小时,选择hour_of_day

点击播放键

图片48.png

凌晨有波谷,符合常识,保存到Dashboard中

统计一天每10分钟 累计独立用户数

另一个可视化是统计-天中每一刻的累计独立用户数(uv) ,即每一刻的uv数都代表从0点到当前时刻为止的总计uv数,因此该曲线肯定是单调递增的。

首先在SQL CLI中创建-个Elasticsearch表,用于存储结果汇总数据。主要字段有:日期时间和累积uv数。将日期时间作为Elasticsearch中的document id,便于更新该日期时间的uv值。

CREATE TABLE cumulative. _ uv (

date_ str STRING,

time. str STRING,

uv BIGINT,

PRIMARY KEY (date_ str, tine str) NOT ENFORCED

) WITH (

"connector" = "elasticsearch-7",

"hosts" = "http://elasticsearch:9200",

"index" = " cumulative. uv"

);

为了实现该曲线,可以先通过OVER WINDOW计算出每条数据的当前分钟,以及当前累计uv (从0点开始到当前

行为止的独立用户数)。uv 的统计我们通过内置的COUNT(DISTINCT user_ 1d) 来完成,Flink SQL内部对COUNT

DISTINCT做了非常多的优化,因此可以放心使用。

为了实现该曲线,先抽取出日期和时间字段,使用DATE FORMAT抽取出基本的日期与时间,再用SUBSTR 和

字符串连接函数11将时间修正到10分钟级别,如: 12:10. 12:28. 其次,在外层查询上基于日期分组,求当前

最大的时间,和UV,写入到Elasticsearch的索引中。UV的统计通过内置的COUNT(DISTINCT user. id)来完成,Fink SQL内部对COUNT DISTINCT做了非常多的优化,因此可以放心使用。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
3月前
|
SQL 存储 API
Flink实践:通过Flink SQL进行SFTP文件的读写操作
虽然 Apache Flink 与 SFTP 之间的直接交互存在一定的限制,但通过一些创造性的方法和技术,我们仍然可以有效地实现对 SFTP 文件的读写操作。这既展现了 Flink 在处理复杂数据场景中的强大能力,也体现了软件工程中常见的问题解决思路——即通过现有工具和一定的间接方法来克服技术障碍。通过这种方式,Flink SQL 成为了处理各种数据源,包括 SFTP 文件,在内的强大工具。
190 15
|
24天前
|
SQL 数据库 UED
SQL性能提升秘籍:5步优化法与10个实战案例
在数据库管理和应用开发中,SQL查询的性能优化至关重要。高效的SQL查询不仅可以提高应用的响应速度,还能降低服务器负载,提升用户体验。本文将分享SQL优化的五大步骤和十个实战案例,帮助构建高效、稳定的数据库应用。
40 3
|
24天前
|
SQL 缓存 监控
SQL性能提升指南:五大优化策略与十个实战案例
在数据库性能优化的世界里,SQL优化是提升查询效率的关键。一个高效的SQL查询可以显著减少数据库的负载,提高应用响应速度,甚至影响整个系统的稳定性和扩展性。本文将介绍SQL优化的五大步骤,并结合十个实战案例,为你提供一份详尽的性能提升指南。
44 0
|
2月前
|
SQL 关系型数据库 MySQL
sql注入原理与实战(三)数据库操作
sql注入原理与实战(三)数据库操作
sql注入原理与实战(三)数据库操作
|
2月前
|
SQL 大数据 API
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
53 0
|
2月前
|
SQL 数据处理 数据库
SQL语句优化与查询结果优化:提升数据库性能的实战技巧
在数据库管理和应用中,SQL语句的编写和查询结果的优化是提升数据库性能的关键环节
|
2月前
|
SQL 监控 关系型数据库
SQL语句性能分析:实战技巧与详细方法
在数据库管理中,分析SQL语句的性能是优化数据库查询、提升系统响应速度的重要步骤
|
2月前
|
SQL 关系型数据库 Serverless
sql注入原理与实战(四)数据表操作
sql注入原理与实战(四)数据表操作
|
2月前
|
SQL 存储 Java
sql注入原理与实战(二)数据库原理
sql注入原理与实战(二)数据库原理
|
2月前
|
SQL 前端开发 安全
sql注入原理与实战(一)
sql注入原理与实战(一)