Apache Flink SQL概览

简介: 本篇核心目标是让大家概要了解一个完整的Apache Flink SQL Job的组成部分,以及Apache Flink SQL所提供的核心算子的语义,最后会应用Tumble Window编写一个End-to-End的页面访问的统计示例。

本篇核心目标是让大家概要了解一个完整的Apache Flink SQL Job的组成部分,以及Apache Flink SQL所提供的核心算子的语义,最后会应用Tumble Window编写一个End-to-End的页面访问的统计示例。

Apache Flink SQL Job的组成

我们做任何数据计算都离不开读取原始数据,计算逻辑和写入计算结果数据三部分,当然基于Apache Flink SQL编写的计算Job也离不开这个三部分,如下所所示:

如上所示,一个完整的Apache Flink SQL Job 由如下三部分:

  • Source Operator - Soruce operator是对外部数据源的抽象, 目前Apache Flink内置了很多常用的数据源实现,比如上图提到的Kafka。

  • Query Operators - 查询算子主要完成如图的Query Logic,目前支持了Union,Join,Projection,Difference, Intersection以及window等大多数传统数据库支持的操作。

  • Sink Operator - Sink operator 是对外结果表的抽象,目前Apache Flink也内置了很多常用的结果表的抽象,比如上图提到的Kafka。

Apache Flink SQL 核心算子

SQL是Structured Quevy Language的缩写,最初是由美国计算机科学家Donald D. Chamberlin和Raymond F. Boyce在20世纪70年代早期从Early History of SQL中了解关系模型后在IBM开发的。该版本最初称为[SEQUEL: A Structured English Query Language](结构化英语查询语言),旨在操纵和检索存储在IBM原始准关系数据库管理系统System R中的数据。直到1986年,ANSI和ISO标准组正式采用了标准的"数据库语言SQL"语言定义。Apache Flink SQL 核心算子的语义设计也参考了19922011等ANSI-SQL标准。接下来我们将简单为大家介绍Apache Flink SQL 每一个算子的语义。

SELECT

SELECT 用于从数据集/流中选择数据,语法遵循ANSI-SQL标准,语义是关系代数中的投影(Projection),对关系进行垂直分割,消去某些列。

一个使用Select的语句如下:

SELECT ColA, ColC FROME tab ;

WHERE

WHERE 用于从数据集/流中过滤数据,与SELECT一起使用,语法遵循ANSI-SQL标准,语义是关系代数的Selection,根据某些条件对关系做水平分割,即选择符合条件的记录,如下所示:

对应的SQL语句如下:

SELECT * FROM tab WHERE ColA <> 'a2' ;

GROUP BY

GROUP BY 是对数据进行分组的操作,比如我需要分别计算一下一个学生表里面女生和男生的人数分别是多少,如下:

对应的SQL语句如下:

SELECT sex, COUNT(name) AS count FROM tab GROUP BY sex ;

UNION ALL

UNION ALL 将两个表合并起来,要求两个表的字段完全一致,包括字段类型、字段顺序,语义对应关系代数的Union,只是关系代数是Set集合操作,会有去重复操作,UNION ALL 不进行去重,如下所示:

对应的SQL语句如下:

SELECT * FROM T1 UNION ALL SELECT * FROM T2

UNION

UNION 将两个流给合并起来,要求两个流的字段完全一致,包括字段类型、字段顺序,并其UNION 不同于UNION ALL,UNION会对结果数据去重,与关系代数的Union语义一致,如下:

对应的SQL语句如下:

SELECT * FROM T1 UNION SELECT * FROM T2

JOIN

JOIN 用于把来自两个表的行联合起来形成一个宽表,Apache Flink支持的JOIN类型:

  • JOIN - INNER JOIN

  • LEFT JOIN - LEFT OUTER JOIN

  • RIGHT JOIN - RIGHT OUTER JOIN

  • FULL JOIN - FULL OUTER JOIN

JOIN与关系代数的Join语义相同,具体如下:

对应的SQL语句如下(INNER JOIN):

SELECT ColA, ColB, T2.ColC, ColE FROM TI JOIN T2 ON T1.ColC = T2.ColC ;

对应的SQL语句如下(LEFT JOIN):

SELECT ColA, ColB, T2.ColC, ColE FROM TI LEFT JOIN T2 ON T1.ColC = T2.ColC ;

说明:

  • 细心的读者可能发现上面T2.ColC是添加了前缀T2了,这里需要说明一下,当两张表有字段名字一样的时候,我需要指定是从那个表里面投影的。

  • RIGHT JOIN相当于LEFT JOIN左右两个表交互一下位置。FULL JOIN相当于RIGHT JOIN和LEFT JOIN之后进行UNION ALL操作。

Window

在Apache Flink中有2种类型的Window,一种是OverWindow,即传统数据库的标准开窗,每一个元素都对应一个窗口。一种是GroupWindow,目前在SQL中GroupWindow都是基于时间进行窗口划分的。

OverWindow

OVER Window 目前支持由如下三个元素组合的8中类型:

  • 时间 - Processing Time 和 EventTime

  • 数据集 - Bounded 和 UnBounded

  • 划分方式 - ROWS 和 RANGE 我们以的Bounded ROWS 和 Bounded RANGE 两种常用类型,想大家介绍Over Window的语义

Bounded ROWS Over Window

Bounded ROWS OVER Window 每一行元素都视为新的计算行,即,每一行都是一个新的窗口。

语法

SELECT

FROM Tab1

-   value_expression - 进行分区的字表达式;

-   timeCol - 用于元素排序的时间字段;

-   rowCount - 是定义根据当前行开始向前追溯几行元素;

语义

我们以3个元素(2 PRECEDING)的窗口为例,如下图:

上图所示窗口 user 1 的 w5和w6, user 2的 窗口 w2 和 w3,虽然有元素都是同一时刻到达,但是他们仍然是在不同的窗口,这一点有别于RANGE OVER Window.

Bounded RANGE Over Window

Bounded RANGE OVER Window 具有相同时间值的所有元素行视为同一计算行,即,具有相同时间值的所有行都是同一个窗口;

语法

Bounded RANGE OVER Window的语法如下:

SELECT

agg1(col1) OVER(

[PARTITION BY (value_expression1,..., value_expressionN)]

ORDER BY timeCol

RANGE

BETWEEN (UNBOUNDED | timeInterval) PRECEDING AND CURRENT ROW) AS colName,

...
FROM Tab1
  • value_expression - 进行分区的字表达式;

  • timeCol - 用于元素排序的时间字段;

  • timeInterval - 是定义根据当前行开始向前追溯指定时间的元素行;

语义

我们以3秒中数据(INTERVAL '2' SECOND)的窗口为例,如下图:

注意: 上图所示窗口 user 1 的 w6, user 2的 窗口 w3,元素都是同一时刻到达,他们是在同一个窗口,这一点有别于ROWS OVER Window.

GroupWindow

根据窗口数据划分的不同,目前Apache Flink有如下3种Bounded Winodw:

  • Tumble - 滚动窗口,窗口数据有固定的大小,窗口数据无叠加;

  • Hop - 滑动窗口,窗口数据有固定大小,并且有固定的窗口重建频率,窗口数据有叠加;

  • Session - 会话窗口,窗口数据没有固定的大小,根据窗口数据活跃程度划分窗口,窗口数据无叠加;

说明: Aapche Flink 还支持UnBounded的 Group Window,也就是全局Window,流上所有数据都在一个窗口里面,语义非常简单,这里不做详细介绍了。

GroupWindow的语法如下:

SELECT

[gk],

agg1(col1),

...

aggN(colN)

FROM Tab1

GROUP BY [WINDOW(definition)], [gk]
  • [WINDOW(definition)] - 在具体窗口语义介绍中介绍。

Tumble Window

Tumble 滚动窗口有固定size,窗口数据不重叠,具体语义如下:

假设我们要写一个2分钟大小的Tumble,示例SQL如下:

SELECT gk, COUNT(*) AS pv

FROM tab

GROUP BY TUMBLE(rowtime, INTERVAL '2' MINUTE), gk

Hop Window

Hop 滑动窗口和滚动窗口类似,窗口有固定的size,与滚动窗口不同的是滑动窗口可以通过slide参数控制滑动窗口的新建频率。因此当slide值小于窗口size的值的时候多个滑动窗口会重叠,具体语义如下:

假设我们要写一个每5分钟统计近10分钟的页面访问量(PV).

SELECT gk, COUNT(*) AS pv

FROM tab

GROUP BY HOP(rowtime, INTERVAL '5' MINUTE, INTERVAL '10' MINUTE), gk

Session Window

Session 会话窗口 是没有固定大小的窗口,通过session的活跃度分组元素。不同于滚动窗口和滑动窗口,会话窗口不重叠,也没有固定的起止时间。一个会话窗口在一段时间内没有接收到元素时,即当出现非活跃间隙时关闭。一个会话窗口 分配器通过配置session gap来指定非活跃周期的时长,具体语义如下:

假设我们要写一个统计连续的两个访问用户之间的访问时间间隔不超过3分钟的的页面访问量(PV).

SELECT gk, COUNT(*) AS pv

FROM pageAccessSession_tab

GROUP BY SESSION(rowtime, INTERVAL '3' MINUTE), gk

说明:
很多场景用户需要获得Window的开始和结束时间,上面的GroupWindow的SQL示例中没有体现,那么窗口的开始和结束时间应该怎样获取呢? Apache Flink 我们提供了如下辅助函数:

  • TUMBLE_START/TUMBLE_END

  • HOP_START/HOP_END

  • SESSION_START/SESSION_END

这些辅助函数如何使用,请参考如下完整示例的使用方式。

完整的 SQL Job 案例

上面我们介绍了Apache Flink SQL核心算子的语法及语义,这部分将选取Bounded EventTime Tumble Window为例为大家编写一个完整的包括Source和Sink定义的Apache Flink SQL Job。假设有一张淘宝页面访问表(PageAccess_tab),有地域,用户ID和访问时间。我们需要按不同地域统计每2分钟的淘宝首页的访问量(PV). 具体数据如下:

region userId accessTime
ShangHai U0010 2017-11-11 10:01:00
BeiJing U1001 2017-11-11 10:01:00
BeiJing U2032 2017-11-11 10:10:00
BeiJing U1100 2017-11-11 10:11:00
ShangHai U0011 2017-11-11 12:10:00

Source 定义

自定义Apache Flink Stream Source需要实现StreamTableSource, StreamTableSource中通过StreamExecutionEnvironment的addSource方法获取DataStream, 所以我们需要自定义一个SourceFunction, 并且要支持产生WaterMark,也就是要实现DefinedRowtimeAttributes接口。出于代码篇幅问题,我们如下只介绍核心部分,完整代码 请查看: EventTimeTumbleWindowDemo.scala

Source Function定义

支持接收携带EventTime的数据集合,Either的数据结构Right是WaterMark,Left是元数据:

class MySourceFunction[T](dataList: Seq[Either[(Long, T), Long]]) extends SourceFunction[T] {

override def run(ctx: SourceContext[T]): Unit = {

dataList.foreach {

case Left(t) => ctx.collectWithTimestamp(t._2, t._1)

case Right(w) => ctx.emitWatermark(new Watermark(w)) // emit watermark

}

}

}

定义 StreamTableSource

我们自定义的Source要携带我们测试的数据,以及对应的WaterMark数据,具体如下:

class MyTableSource extends StreamTableSource[Row] with DefinedRowtimeAttributes {

// 页面访问表数据 rows with timestamps and watermarks

val data = Seq(

// Data

Left(1510365660000L, Row.of(new JLong(1510365660000L), "ShangHai", "U0010")),

// Watermark

Right(1510365660000L),

..

)

val fieldNames = Array("accessTime", "region", "userId")

val schema = new TableSchema(fieldNames, Array(Types.SQL_TIMESTAMP, Types.STRING, Types.STRING))

val rowType = new RowTypeInfo(

Array(Types.LONG, Types.STRING, Types.STRING).asInstanceOf[Array[TypeInformation[_]]],

fieldNames)

override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {

// 添加数据源实现

execEnv.addSource(new MySourceFunction[Row](data)).setParallelism(1).returns(rowType)

}

...

}

Sink 定义

我们简单的将计算结果写入到Apache Flink内置支持的CSVSink中,定义Sink如下:

def getCsvTableSink: TableSink[Row] = {

val tempFile = ...

new CsvTableSink(tempFile.getAbsolutePath).configure(

Array[String]("region", "winStart", "winEnd", "pv"),

Array[TypeInformation[_]](Types.STRING, Types.SQL_TIMESTAMP, Types.SQL_TIMESTAMP, Types.LONG))

}

构建主程序

主程序包括执行环境的定义,Source/Sink的注册以及统计查SQL的执行,具体如下:

def main(args: Array[String]): Unit = {

// Streaming 环境

val env = StreamExecutionEnvironment.getExecutionEnvironment

val tEnv = TableEnvironment.getTableEnvironment(env)

// 设置EventTime

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

//方便我们查出输出数据

env.setParallelism(1)

val sourceTableName = "mySource"

// 创建自定义source数据结构

val tableSource = new MyTableSource

val sinkTableName = "csvSink"

// 创建CSV sink 数据结构

val tableSink = getCsvTableSink

// 注册source

tEnv.registerTableSource(sourceTableName, tableSource)

// 注册sink

tEnv.registerTableSink(sinkTableName, tableSink)

val sql =

"SELECT " +

" region, " +

" TUMBLE_START(accessTime, INTERVAL '2' MINUTE) AS winStart," +

" TUMBLE_END(accessTime, INTERVAL '2' MINUTE) AS winEnd, COUNT(region) AS pv " +

" FROM mySource " +

" GROUP BY TUMBLE(accessTime, INTERVAL '2' MINUTE), region"

tEnv.sqlQuery(sql).insertInto(sinkTableName);

env.execute()

}

执行并查看运行结果

执行主程序后我们会在控制台得到Sink的文件路径,如下:

Sink path : /var/folders/88/8n406qmx2z73qvrzc_rbtv_r0000gn/T/csv_sink_8025014910735142911tem

Cat 方式查看计算结果,如下:

jinchengsunjcdeMacBook-Pro:FlinkTableApiDemo jincheng.sunjc$ cat /var/folders/88/8n406qmx2z73qvrzc_rbtv_r0000gn/T/csv_sink_8025014910735142911tem

ShangHai,2017-11-11 02:00:00.0,2017-11-11 02:02:00.0,1

BeiJing,2017-11-11 02:00:00.0,2017-11-11 02:02:00.0,1

BeiJing,2017-11-11 02:10:00.0,2017-11-11 02:12:00.0,2

ShangHai,2017-11-11 04:10:00.0,2017-11-11 04:12:00.0,1

小结

本篇概要的介绍了Apache Flink SQL 的所有核心算子,并以一个End-to-End的示例展示了如何编写Apache Flink SQL的Job. 希望对大家有所帮助,更多Apache Flink SQL的介绍欢迎大家访问作者的如下专栏或专辑:

更多资讯请访问 Apache Flink 中文社区网站

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
目录
相关文章
|
8月前
|
人工智能 数据处理 API
阿里云、Ververica、Confluent 与 LinkedIn 携手推进流式创新,共筑基于 Apache Flink Agents 的智能体 AI 未来
Apache Flink Agents 是由阿里云、Ververica、Confluent 与 LinkedIn 联合推出的开源子项目,旨在基于 Flink 构建可扩展、事件驱动的生产级 AI 智能体框架,实现数据与智能的实时融合。
1455 6
阿里云、Ververica、Confluent 与 LinkedIn 携手推进流式创新,共筑基于 Apache Flink Agents 的智能体 AI 未来
|
存储 Cloud Native 数据处理
从嵌入式状态管理到云原生架构:Apache Flink 的演进与下一代增量计算范式
本文整理自阿里云资深技术专家、Apache Flink PMC 成员梅源在 Flink Forward Asia 新加坡 2025上的分享,深入解析 Flink 状态管理系统的发展历程,从核心设计到 Flink 2.0 存算分离架构,并展望未来基于流批一体的通用增量计算方向。
660 0
从嵌入式状态管理到云原生架构:Apache Flink 的演进与下一代增量计算范式
|
10月前
|
SQL 人工智能 JSON
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
简介:本文整理自阿里云高级技术专家李麟在Flink Forward Asia 2025新加坡站的分享,介绍了Flink 2.1 SQL在实时数据处理与AI融合方面的关键进展,包括AI函数集成、Join优化及未来发展方向,助力构建高效实时AI管道。
1248 43
|
10月前
|
SQL 人工智能 数据挖掘
Apache Flink:从实时数据分析到实时AI
Apache Flink 是实时数据处理领域的核心技术,历经十年发展,已从学术项目成长为实时计算的事实标准。它在现代数据架构中发挥着关键作用,支持实时数据分析、湖仓集成及实时 AI 应用。随着 Flink 2.0 的发布,其在流式湖仓、AI 驱动决策等方面展现出强大潜力,正推动企业迈向智能化、实时化的新阶段。
1187 9
Apache Flink:从实时数据分析到实时AI
|
10月前
|
SQL 人工智能 JSON
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
本文整理自阿里云的高级技术专家、Apache Flink PMC 成员李麟老师在 Flink Forward Asia 2025 新加坡[1]站 —— 实时 AI 专场中的分享。将带来关于 Flink 2.1 版本中 SQL 在实时数据处理和 AI 方面进展的话题。
609 0
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
|
10月前
|
SQL 人工智能 API
Apache Flink 2.1.0: 面向实时 Data + AI 全面升级,开启智能流处理新纪元
Apache Flink 2.1.0 正式发布,标志着实时数据处理引擎向统一 Data + AI 平台迈进。新版本强化了实时 AI 能力,支持通过 Flink SQL 和 Table API 创建及调用 AI 模型,新增 Model DDL、ML_PREDICT 表值函数等功能,实现端到端的实时 AI 工作流。同时增强了 Flink SQL 的流处理能力,引入 Process Table Functions(PTFs)、Variant 数据类型,优化流式 Join 及状态管理,显著提升作业稳定性与资源利用率。
930 0
|
9月前
|
人工智能 运维 Java
Flink Agents:基于Apache Flink的事件驱动AI智能体框架
本文基于Apache Flink PMC成员宋辛童在Community Over Code Asia 2025的演讲,深入解析Flink Agents项目的技术背景、架构设计与应用场景。该项目聚焦事件驱动型AI智能体,结合Flink的实时处理能力,推动AI在工业场景中的工程化落地,涵盖智能运维、直播分析等典型应用,展现其在AI发展第四层次——智能体AI中的重要意义。
3180 27
Flink Agents:基于Apache Flink的事件驱动AI智能体框架
|
10月前
|
存储 人工智能 数据处理
对话王峰:Apache Flink 在 AI 时代的“剑锋”所向
Flink 2.0 架构升级实现存算分离,迈向彻底云原生化,支持更大规模状态管理、提升资源效率、增强容灾能力。通过流批一体与 AI 场景融合,推动实时计算向智能化演进。生态项目如 Paimon、Fluss 和 Flink CDC 构建湖流一体架构,实现分钟级时效性与低成本平衡。未来,Flink 将深化 AI Agents 框架,引领事件驱动的智能数据处理新方向。
1048 6
|
10月前
|
消息中间件 存储 Kafka
Apache Flink错误处理实战手册:2年生产环境调试经验总结
本文由 Ververica 客户成功经理 Naci Simsek 撰写,基于其在多个行业 Flink 项目中的实战经验,总结了 Apache Flink 生产环境中常见的三大典型问题及其解决方案。内容涵盖 Kafka 连接器迁移导致的状态管理问题、任务槽负载不均问题以及 Kryo 序列化引发的性能陷阱,旨在帮助企业开发者避免常见误区,提升实时流处理系统的稳定性与性能。
820 0
Apache Flink错误处理实战手册:2年生产环境调试经验总结
|
11月前
|
SQL 消息中间件 Kafka
Flink SQL 详解:流批一体处理的强大工具
Flink SQL 是 Apache Flink 提供的 SQL 引擎,支持流批一体处理,统一操作流数据与批数据,具备高性能、低延迟、丰富数据源支持及标准 SQL 兼容性,适用于实时与离线数据分析。
1308 1

推荐镜像

更多