Flink SQL 核心概念剖析与编程案例实战

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
实时数仓Hologres,5000CU*H 100GB 3个月
智能开放搜索 OpenSearch行业算法版,1GB 20LCU 1个月
简介: 本文使用了 Docker 镜像快速安装一些基础组件,zk 和 kafka,并通过案例的方式,剖析了 SQL 的概念与详细的使用方式

本次,我们从 0 开始逐步剖析 Flink SQL 的来龙去脉以及核心概念,并附带完整的示例程序,希望对大家有帮助!

本文大纲

一、快速体验 Flink SQL

为了快速搭建环境体验 Flink SQL,我们使用 Docker 来安装一些基础组件,包括 zk 和 kafka,如果你有这个环境,可以略过了。

在 Centos 7 上安装 Docker 环境,具体见这个链接,此处就不细说了:https://blog.csdn.net/qq_24434251/article/details/105712044

1、拉取安装并执行 zookeeper 镜像

docker pull debezium/zookeeper
docker run -d -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper

2、拉取安装并执行 kafka 镜像

docker pull debezium/kafka
docker run -d -it --rm --name kafka -p 9092:9092 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.56.10:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 --link zookeeper:zookeeper debezium/kafka

3、进入 kafka 容器内的命令行

docker exec -it kafka /bin/bash

4、创建一个 topic

/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.56.10:2181 --topic user_log --partitions 1 --replication-factor 1

5、在 IDEA 中启动程序

这里不贴代码太长了,具体程序可以参见我的 github:https://github.com/nicekk/Flink-Practice

6、写入数据

/kafka/bin/kafka-console-producer.sh --broker-list  192.168.56.10:9092 --topic user_log

数据样例:

{"user_id":123,"item_id":345,"ts":"2021-01-05 23:04:00"}
{"user_id":345,"item_id":345,"ts":"2021-01-05 23:04:00"}

7、结果输出:

二、数据类型系统

继续说明 Flink SQL 使用之前,我们还需要谈一谈 Flink 的数据类型系统。

Flink 作为一款高性能的计算框架,必然绕不开分布式计算、数据传输和持久化这些问题。

在数据传输过程中,要对数据进行序列化和反序列化:序列化就是将一个内存对象转换成二进制串,形成网络传输或者持久化的数据流;反序列化将二进制串转换为内存对象,这样就可以直接在编程语言中读写这个对象。

Flink 是运行在 JVM 上的,计算过程中会有大量的数据存储在内存中,这就会面临一些问题,如 Java 对象存储密度较低等。

针对这些问题,最常用的方案就是自己实现一个显示的内存管理,用自定义的内存池来进行内存的分配回收,接着将序列化后的对象存储到内存块中。

所以,Flink 对数据类型推断越准确,越能更早的完成数据类型检查,帮助 Flink 更好的规划内存,节省存储空间。

比如下面这个类,Tuple3 <Integer,Double,Person> ,包含三种数据类型。

其中 Person 包含两个字段,分别是 id 和 name。

如图,int 占四个字节,通过 IntSerializer 序列化操作之后,给它分配 4 个字节就行了。对象之间可以紧凑的在一起存储,不像 Java 的序列化会有更多的存储损耗。

(数据类型系统,是 Flink 一个非常大的领域,我们会单开一篇文章来详细说明,此处只想说明一下数据类型的重要作用)

三、在无界数据流上怎么执行 SQL

在有界的数据集上执行 SQL ,相信大家每天都深有体会,每天都会做。有界的数据集是静止的,离线模式下,SQL 可以访问完整的数据集,查询产生结果后就终止了。

而数据流是无限的,意味着程序需要一直运行,等待数据进入并进行处理,这样的一种模式如何和 SQL 关联起来呢?

这里我们要引入两个概念:动态表(Dynamic Table)和持续查询(Continuous Queries )。

(1)动态表

如果想用 SQL 去分析一个数据流,那第一件事就是要把流转换成表。

如下图,左边是一个点击的事件流,有姓名,事件时间,点击的 url 信息。右边是一张表,也有这三个字段。

从左边的流到右边的表,是一个逻辑上的映射过程,并没有将数据持久化。

随着左边流事件源源不断的到来,右边的表的记录也会一直追加更新。

这样一直变化的表,就称为「动态表」。

(2)连续查询

对于动态表的查询就被称为是连续查询。

如下图,将下面的 SQL 作用在动态表上,就产生了一个持续查询,因为这个查询一直不会终止掉,并且每个事件到来时,都会产生一次查询。

查询的结果,会生成一个新的动态表。

select 
  user,
  count(url) as cnt
  from clicks
 group by user;

Mary,./home)这条数据到来,产生查询的结果:【Mary,1】

(Bob,./cart) 这条数据到来,会在动态表上追加一条 Bob 的记录,最终的结果为:【Mary,1】【Bob,1】

(Mary,./prod?id=1) 这条数据到来,会更新动态表的 Mary 的记录,最终结果为:【Mary,2】【Bob,1】

(Liz,./home) 这条数据到来,会在动态表上追加一个记录,最终结果为:【Mary,2】【Bob,1】【Liz,1】

这样的话,我们就可以使用 SQL 在动态表上连续查询,产生新的动态表。(实际上,在上一篇中,我们已经知道,SQL 最终是会变成程序执行的)。

(3)查询限制

由于流是无限的,我们不得不思考一个问题,那就是所有的查询语句都能在流上执行吗?

答案是否定的,主要是两点原因,一是维护的状态比较大,二是计算更新的成本高。

由于连续查询会一直运行,为了更新之前产生的结果,需要维护所有的输出行,这样的话,内存中存储的数据会越来越大。

然后有时候,即使只来了一条记录,也需要重新计算和更新之前大部分的结果行,这样的查询也不适合作为连续查询。

比如下面的 SQL,求排名,每次来数据之后,都需要计算大量数据的排名:

SELECT user, RANK() OVER (ORDER BY lastLogin)
  FROM ( 
          SELECT user, 
                 MAX(cTime) AS lastAction 
            FROM clicks GROUP BY user
  );

(4)结果输出

最后一个问题,Flink 是一个计算引擎,自身不存储数据,那么它是如何表示更新数据并更新到外部存储?这里我们举两个例子来说明

1、目标表是控制台

我们可以回到上面的那个例子,例子中,由于目标是控制台,可以任意打印结果。

-- 源表,连接 kafka,从最新的地方开始消费
CREATE TABLE user_log (
  user_id bigint,
  item_id bigint,
  ts TIMESTAMP
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_log',
  'scan.startup.mode' = 'latest-offset',
  'properties.bootstrap.servers' = '192.168.56.10:9092',
  'format' = 'json',
  'json.fail-on-missing-field' = 'false'
)
-- 目标表是控制台,直接打印
CREATE TABLE user_log_result(
  user_id bigint,
  cnt bigint
) WITH (
  'connector' = 'print'
)
-- 查询的 SQL,一个简单的 group by ,统计源表的 user_id 数量,写到目标表
insert into user_log_result select user_id,count(1) cnt from user_log group by user_id

当我们第一次输入一条数据时: {"user_id":123,"item_id":345,"ts":"2021-01-05 23:04:00"} 控制台上打印:

3> +I(123,1)

当我们再次输入一条数据时:{"user_id":123,"item_id":123,"ts":"2021-01-05 23:04:00"} 控制台上打印了两条数据:

3> -U(123,1)

3> +U(123,2)

+I,-U,+U 表示一行数据的 changelog,+I 表示是新增的数据,-U 表示之前的记录已经被更新,之前的记录要回撤,+U 表示本次更新的数据。

可以看到,输出结果是以对于每行产生 changelog 的形式来表示的。

如果 sink 阶段要使用 DataStream Api,可以把动态表变成流,继续 sink 到下游节点。如果使用 SQL,则直接可以发送到下游。

具体程序见:

2、目标表是 Kafka 的时候

-- 源表,连接 kafka,从最新的地方开始消费
CREATE TABLE user_log (
  user_id bigint,
  item_id bigint,
  ts TIMESTAMP
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_log',
  'scan.startup.mode' = 'latest-offset',
  'properties.bootstrap.servers' = '192.168.56.10:9092',
  'format' = 'json',
  'json.fail-on-missing-field' = 'false'
)
-- 目标表是 Kafka
CREATE TABLE user_log_result (
  user_id bigint,
  cnt bigint
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_log_result',
  'scan.startup.mode' = 'latest-offset',
  'properties.bootstrap.servers' = '192.168.56.10:9092',
  'format' = 'json'
)
-- 查询的 SQL,一个简单的 group by ,统计源表的 user_id 数量,写到目标表
insert into user_log_result select user_id,count(1) cnt from user_log group by user_id

此时再运行,直接就报错了,提示信息如下:

Exception in thread "main" org.apache.flink.table.api.TableException: 
Table sink 'default_catalog.default_database.user_log_result' doesn't support consuming update changes 
which is produced by node GroupAggregate(groupBy=[user_id], select=[user_id, COUNT(*) AS cnt])

大意是:这是一个 Group 的聚合,而目标表 user_log_result (kafka)不支持更新的数据。kafka 只能支持一直新增的数据。

如果我们换成下面的 SQL,数据只有新增不会更新,就可以运行了。当然也可以把目标表换成其他可以更新的介质,如 mysql ,hbase 等等。

insert into user_log_result select user_id,count(1) cnt from user_log group by user_id

具体程序见:

四、时间、INTERVAL 与 窗口计算

窗口计算永远是流计算的核心,窗口将无限流切分为有限大小的数据集,可以对这个有限数据集进行计算。

在谈到窗口的时候,总是会情不自禁冒出 N 多的概念,比如:事件时间,处理时间,窗口开始时间,窗口结束时间,滑动窗口,滚动窗口,窗口大小,水印 .......

在最新的 Flink SQL 中,已经可以在 DDL 中定义所有的这一切了,让我们各个击破他们。

1. INTERVAL

Interval 这个东西,并不是 Flink SQL 中特有的,在 ANSI SQL 中就有,下面我们以 Oracle 举例来说明。

首先得有 Oracle 环境,这里我们使用 Docker 来搭建,具体教程见这个链接:

https://blog.csdn.net/qq_24434251/article/details/112341197

INTERVAL 表示一段时间差,直接建表体验一下

create table INTERVAL_TAB
(
    DURATION INTERVAL DAY (2) TO SECOND (5)
)

表示建一个表,字段 duration 表示 天 到 秒,括号的数字表示精度。

insert into interval_tab (duration) values (interval '3 12:32' day(3) to minute );

插入的这条数据表示一段时间:3天12小时32分钟

可能感觉这个没啥用,比如我问你在公司入职几年了,你可以轻松说出来,但是如果我问你在公司入职多少天了,这就很复杂了,中间的闰年,2 月都要考虑,有了这样的表示方法就很方便了。

比如可以很轻易的算出今天之前100天,是哪一天:

select sysdate,sysdate - interval '100' day(3)  as "当前时间-100天" from dual;

有了 INTERVAL ,我们就可以轻松表示窗口的时间长短了。

2. 窗口计算

滚动窗口 - 使用ProcessingTime

-- 源表,user_name 用户名,data 数据
CREATE TABLE user_actions (
    user_name string,
    data string,
    user_action_time as PROCTIME()
   ) WITH (
    'connector' = 'kafka',
    'topic' = 'user_log',
    'scan.startup.mode' = 'latest-offset',
    'properties.bootstrap.servers' = '192.168.56.10:9092',
    'format' = 'json',
    'json.fail-on-missing-field' = 'false'
)
-- 结果表
CREATE TABLE user_action_result(
  window_start TIMESTAMP(3),
  cnt bigint
) WITH (
  'connector' = 'print'
)
-- 窗口计算
INSERT INTO user_action_result
select * from (
    SELECT TUMBLE_START(user_action_time, INTERVAL '10' SECOND) window_start, COUNT(DISTINCT user_name) cnt
    FROM user_actions
    GROUP BY TUMBLE(user_action_time, INTERVAL '10' SECOND)
)
-- 测试数据
{"user_name":"zhangsan","data":"browse"}
{"user_name":"lisi","data":"browse"}

首先源表上,我们使用了 processing time,加载了字段 user_action_time 上,这并不是我们数据中的字段,而是程序自动给我们加上的,是一个虚拟字段作为时间属性。

然后是查询 SQL, group by 后面的 TUMBLE(user_action_time, INTERVAL '10' SECOND),表示这是一个滚动窗口,使用 user_action_time 作为时间字段,并且窗口大小为 INTERVAL '10' SECOND ,表示 10 s,就是刚刚讲到的 INTERVAL 的语法。

select 中的 TUMBLE_START(user_action_time, INTERVAL '10' SECOND) 是窗口的开始时间,COUNT(DISTINCT user_name) 表示统计每个窗口中的 user_name 去重值。

具体程序见:

滚动窗口 - 使用 EventTime

首先仍然需要在执行环境中声明使用 EventTime:

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

修改一下源表的定义

CREATE TABLE user_actions (
    user_name string,
    data string,
    user_action_time TIMESTAMP(3),
    WATERMARK FOR user_action_time as user_action_time - INTERVAL '5' SECOND
   ) WITH (
    'connector' = 'kafka',
    'topic' = 'user_log',
    'scan.startup.mode' = 'latest-offset',
    'properties.bootstrap.servers' = '192.168.56.10:9092',
    'format' = 'json',
    'json.fail-on-missing-field' = 'false'
)

可以看到,有一个时间字段是 user_action_time,然后 使用 WATERMARK FOR user_action_time as user_action_time - INTERVAL '5' SECOND ,来表示把 user_action_time 作为时间字段,并且声明一个 5s 延迟的 watermark。只用一句 SQL 就定义好了 event_time 和 水位。

具体程序可以去我的 github 上下载:

https://github.com/nicekk/Flink-Practice

五、总结

看完本文相信你已经对 Flink SQL 有了初步的认识,再打开 IDEA,亲自动手操作一遍就会有更加深刻的认识,这也就达到了本文的目的了。

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
5月前
|
消息中间件 运维 Kafka
直播预告|Kafka+Flink双引擎实战:手把手带你搭建分布式实时分析平台!
在数字化转型中,企业亟需从海量数据中快速提取价值并转化为业务增长动力。5月15日19:00-21:00,阿里云三位技术专家将讲解Kafka与Flink的强强联合方案,帮助企业零门槛构建分布式实时分析平台。此组合广泛应用于实时风控、用户行为追踪等场景,具备高吞吐、弹性扩缩容及亚秒级响应优势。直播适合初学者、开发者和数据工程师,参与还有机会领取定制好礼!扫描海报二维码或点击链接预约直播:[https://developer.aliyun.com/live/255088](https://developer.aliyun.com/live/255088)
351 35
直播预告|Kafka+Flink双引擎实战:手把手带你搭建分布式实时分析平台!
|
5月前
|
消息中间件 运维 Kafka
直播预告|Kafka+Flink 双引擎实战:手把手带你搭建分布式实时分析平台!
直播预告|Kafka+Flink 双引擎实战:手把手带你搭建分布式实时分析平台!
175 11
|
7月前
|
消息中间件 JSON 数据库
探索Flink动态CEP:杭州银行的实战案例
探索Flink动态CEP:杭州银行的实战案例
219 5
|
9月前
|
SQL 大数据 数据处理
Flink SQL 详解:流批一体处理的强大工具
Flink SQL 是为应对传统数据处理框架中流批分离的问题而诞生的,它融合了SQL的简洁性和Flink的强大流批处理能力,降低了大数据处理门槛。其核心工作原理包括生成逻辑执行计划、查询优化和构建算子树,确保高效执行。Flink SQL 支持过滤、投影、聚合、连接和窗口等常用算子,实现了流批一体处理,极大提高了开发效率和代码复用性。通过统一的API和语法,Flink SQL 能够灵活应对实时和离线数据分析场景,为企业提供强大的数据处理能力。
1655 27
|
10月前
|
消息中间件 JSON 数据库
探索Flink动态CEP:杭州银行的实战案例
本文由杭州银行大数据工程师唐占峰、欧阳武林撰写,介绍Flink动态CEP的定义、应用场景、技术实现及使用方式。Flink动态CEP是基于Flink的复杂事件处理库,支持在不重启服务的情况下动态更新规则,适应快速变化的业务需求。文章详细阐述了其在反洗钱、反欺诈和实时营销等金融领域的应用,并展示了某金融机构的实际应用案例。通过动态CEP,用户可以实时调整规则,提高系统的灵活性和响应速度,降低维护成本。文中还提供了具体的代码示例和技术细节,帮助读者理解和使用Flink动态CEP。
1124 2
探索Flink动态CEP:杭州银行的实战案例
|
10月前
|
数据处理 数据安全/隐私保护 流计算
Flink 三种时间窗口、窗口处理函数使用及案例
Flink 是处理无界数据流的强大工具,提供了丰富的窗口机制。本文介绍了三种时间窗口(滚动窗口、滑动窗口和会话窗口)及其使用方法,包括时间窗口的概念、窗口处理函数的使用和实际案例。通过这些机制,可以灵活地对数据流进行分析和计算,满足不同的业务需求。
1029 27
|
10月前
|
SQL 存储 缓存
Flink SQL Deduplication 去重以及如何获取最新状态操作
Flink SQL Deduplication 是一种高效的数据去重功能,支持多种数据类型和灵活的配置选项。它通过哈希表、时间窗口和状态管理等技术实现去重,适用于流处理和批处理场景。本文介绍了其特性、原理、实际案例及源码分析,帮助读者更好地理解和应用这一功能。
708 14
|
2月前
|
存储 分布式计算 数据处理
「48小时极速反馈」阿里云实时计算Flink广招天下英雄
阿里云实时计算Flink团队,全球领先的流计算引擎缔造者,支撑双11万亿级数据处理,推动Apache Flink技术发展。现招募Flink执行引擎、存储引擎、数据通道、平台管控及产品经理人才,地点覆盖北京、杭州、上海。技术深度参与开源核心,打造企业级实时计算解决方案,助力全球企业实现毫秒洞察。
373 0
「48小时极速反馈」阿里云实时计算Flink广招天下英雄
|
运维 数据处理 数据安全/隐私保护
阿里云实时计算Flink版测评报告
该测评报告详细介绍了阿里云实时计算Flink版在用户行为分析与标签画像中的应用实践,展示了其毫秒级的数据处理能力和高效的开发流程。报告还全面评测了该服务在稳定性、性能、开发运维及安全性方面的卓越表现,并对比自建Flink集群的优势。最后,报告评估了其成本效益,强调了其灵活扩展性和高投资回报率,适合各类实时数据处理需求。

相关产品

  • 实时计算 Flink版