Apache Flink 零基础入门(八): SQL 编程实践

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 本文是 Apache Flink 零基础入门系列文章第八篇,将通过五个实例讲解 Flink SQL 的编程实践。

作者:伍翀(云邪)

本文是 Apache Flink 零基础入门系列文章第八篇,将通过五个实例讲解 Flink SQL 的编程实践。

注: 本教程实践基于 Ververica 开源的 sql-training 项目。基于 Flink 1.7.2 。

通过本课你能学到什么?

本文将通过五个实例来贯穿 Flink SQL 的编程实践,主要会涵盖以下几个方面的内容。

  1. 如何使用 SQL CLI 客户端
  2. 如何在流上运行 SQL 查询
  3. 运行 window aggregate 与 non-window aggregate,理解其区别
  4. 如何用 SQL 消费 Kafka 数据
  5. 如何用 SQL 将结果写入 Kafka 和 ElasticSearch

本文假定您已具备基础的 SQL 知识。

环境准备

本文教程是基于 Docker 进行的,因此你只需要安装了 Docker 即可。不需要依赖 Java、Scala 环境、或是IDE。

注意:Docker 默认配置的资源可能不太够,会导致运行 Flink Job 时卡死。因此推荐配置 Docker 资源到 3-4 GB,3-4 CPUs。

本次教程的环境使用 Docker Compose 来安装,包含了所需的各种服务的容器,包括:

  • Flink SQL Client:用来提交query,以及可视化结果
  • Flink JobManager 和 TaskManager:用来运行 Flink SQL 任务。
  • Apache Kafka:用来生成输入流和写入结果流。
  • Apache Zookeeper:Kafka 的依赖项
  • ElasticSearch:用来写入结果

我们已经提供好了Docker Compose 配置文件,可以直接下载 docker-compose.yml 文件。

然后打开命令行窗口,进入存放 docker-compose.yml 文件的目录,然后运行以下命令:

  • Linux & MacOS
docker-compose up -d
  • Windows
set COMPOSE_CONVERT_WINDOWS_PATHS=1
docker-compose up -d

docker-compose 命令会启动所有所需的容器。第一次运行的时候,Docker 会自动地从 Docker Hub 下载镜像,这可能会需要一段时间(将近 2.3GB)。之后运行的话,几秒钟就能启动起来了。运行成功的话,会在命令行中看到以下输出,并且也可以在 http://localhost:8081 访问到 Flink Web UI。

运行 Flink SQL CLI 客户端

运行下面命令进入 Flink SQL CLI 。

docker-compose exec sql-client ./sql-client.sh

该命令会在容器中启动 Flink SQL CLI 客户端。然后你会看到如下的欢迎界面。

数据介绍

Docker Compose 中已经预先注册了一些表和数据,可以运行 SHOW TABLES; 来查看。本文会用到的数据是 Rides 表,这是一张出租车的行车记录数据流,包含了时间和位置信息,运行 DESCRIBE Rides; 可以查看表结构。

Flink SQL> DESCRIBE Rides;
root
 |-- rideId: Long           // 行为ID (包含两条记录,一条入一条出)
 |-- taxiId: Long           // 出租车ID 
 |-- isStart: Boolean       // 开始 or 结束
 |-- lon: Float             // 经度
 |-- lat: Float             // 纬度
 |-- rideTime: TimeIndicatorTypeInfo(rowtime)     // 时间
 |-- psgCnt: Integer        // 乘客数

Rides 表的详细定义见 training-config.yaml

实例1:过滤

例如我们现在只想查看发生在纽约的行车记录

注:Docker 环境中已经预定义了一些内置函数,如 isInNYC(lon, lat) 可以确定一个经纬度是否在纽约,toAreaId(lon, lat) 可以将经纬度转换成区块。

因此,此处我们可以使用 isInNYC 来快速过滤出纽约的行车记录。在 SQL CLI 中运行如下 Query:

SELECT * FROM Rides WHERE isInNYC(lon, lat);

SQL CLI 便会提交一个 SQL 任务到 Docker 集群中,从数据源(Rides 流存储在Kafka中)不断拉取数据,并通过 isInNYC 过滤出所需的数据。SQL CLI 也会进入可视化模式,并不断刷新展示过滤后的结果:

也可以到 http://localhost:8081 查看 Flink 作业的运行情况。

实例2:Group Aggregate

我们的另一个需求是计算搭载每种乘客数量的行车事件数。也就是搭载1个乘客的行车数、搭载2个乘客的行车... 当然,我们仍然只关心纽约的行车事件。

因此,我们可以按照乘客数psgCnt做分组,使用 COUNT(*) 计算出每个分组的事件数,注意在分组前需要先过滤出isInNYC的数据。在 SQL CLI 中运行如下 Query:

SELECT psgCnt, COUNT(*) AS cnt 
FROM Rides 
WHERE isInNYC(lon, lat)
GROUP BY psgCnt;

SQL CLI 的可视化结果如下所示,结果每秒都在发生变化。不过最大的乘客数不会超过 6 人。

实例3:Window Aggregate

为了持续地监测纽约的交通流量,需要计算出每个区块每5分钟的进入的车辆数。我们只关心至少有5辆车子进入的区块。

此处需要涉及到窗口计算(每5分钟),所以需要用到 Tumbling Window 的语法。“每个区块” 所以还要按照 toAreaId 进行分组计算。“进入的车辆数” 所以在分组前需要根据 isStart 字段过滤出进入的行车记录,并使用 COUNT(*) 统计车辆数。最后还有一个 “至少有5辆车子的区块” 的条件,这是一个基于统计值的过滤条件,所以可以用 SQL HAVING 子句来完成。

最后的 Query 如下所示:

SELECT 
  toAreaId(lon, lat) AS area, 
  TUMBLE_END(rideTime, INTERVAL '5' MINUTE) AS window_end, 
  COUNT(*) AS cnt 
FROM Rides 
WHERE isInNYC(lon, lat) and isStart
GROUP BY 
  toAreaId(lon, lat), 
  TUMBLE(rideTime, INTERVAL '5' MINUTE) 
HAVING COUNT(*) >= 5;

在 SQL CLI 中运行后,其可视化结果如下所示,每个 area + window_end 的结果输出后就不会再发生变化,但是会每隔 5 分钟会输出一批新窗口的结果。因为 Docker 环境中的source我们做了10倍的加速读取(相对于原始速度),所以演示的时候,大概每隔30秒就会输出一批新窗口。

Window Aggregate 与 Group Aggregate 的区别

从实例2和实例3的结果显示上,可以体验出来 Window Aggregate 与 Group Aggregate 是有一些明显的区别的。其主要的区别是,Window Aggregate 是当window结束时才输出,其输出的结果是最终值,不会再进行修改,其输出流是一个 Append 流。而 Group Aggregate 是每处理一条数据,就输出最新的结果,其结果是在不断更新的,就好像数据库中的数据一样,其输出流是一个 Update 流

另外一个区别是,window 由于有 watermark ,可以精确知道哪些窗口已经过期了,所以可以及时清理过期状态,保证状态维持在稳定的大小。而 Group Aggregate 因为不知道哪些数据是过期的,所以状态会无限增长,这对于生产作业来说不是很稳定,所以建议对 Group Aggregate 的作业配上 State TTL 的配置。

例如统计每个店铺每天的实时PV,那么就可以将 TTL 配置成 24+ 小时,因为一天前的状态一般来说就用不到了。

SELECT  DATE_FORMAT(ts, 'yyyy-MM-dd'), shop_id, COUNT(*) as pv
FROM T
GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd'), shop_id

当然,如果 TTL 配置地太小,可能会清除掉一些有用的状态和数据,从而导致数据精确性地问题。这也是用户需要权衡地一个参数。

实例4:将 Append 流写入 Kafka

上一小节介绍了 Window Aggregate 和 Group Aggregate 的区别,以及 Append 流和 Update 流的区别。在 Flink 中,目前 Update 流只能写入支持更新的外部存储,如 MySQL, HBase, ElasticSearch。Append 流可以写入任意地存储,不过一般写入日志类型的系统,如 Kafka。

这里我们希望将“每10分钟的搭乘的乘客数”写入Kafka。

我们已经预定义了一张 Kafka 的结果表 Sink_TenMinPsgCntstraining-config.yaml 中有完整的表定义)。

在执行 Query 前,我们先运行如下命令,来监控写入到 TenMinPsgCnts topic 中的数据:

docker-compose exec sql-client /opt/kafka-client/bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic TenMinPsgCnts --from-beginning

每10分钟的搭乘的乘客数可以使用 Tumbling Window 来描述,我们使用 INSERT INTO Sink_TenMinPsgCnts 来直接将 Query 结果写入到结果表。

INSERT INTO Sink_TenMinPsgCnts 
SELECT 
  TUMBLE_START(rideTime, INTERVAL '10' MINUTE) AS cntStart,  
  TUMBLE_END(rideTime, INTERVAL '10' MINUTE) AS cntEnd,
  CAST(SUM(psgCnt) AS BIGINT) AS cnt 
FROM Rides 
GROUP BY TUMBLE(rideTime, INTERVAL '10' MINUTE);

我们可以监控到 TenMinPsgCnts topic 的数据以 JSON 的形式写入到了 Kafka 中:

实例5:将 Update 流写入 ElasticSearch

最后我们实践一下将一个持续更新的 Update 流写入 ElasticSearch 中。我们希望将“每个区域出发的行车数”,写入到 ES 中。

我们也已经预定义好了一张 Sink_AreaCnts 的 ElasticSearch 结果表(training-config.yaml 中有完整的表定义)。该表中只有两个字段 areaIdcnt

同样的,我们也使用 INSERT INTO 将 Query 结果直接写入到 Sink_AreaCnts 表中。

INSERT INTO Sink_AreaCnts 
SELECT toAreaId(lon, lat) AS areaId, COUNT(*) AS cnt 
FROM Rides 
WHERE isStart
GROUP BY toAreaId(lon, lat);

在 SQL CLI 中执行上述 Query 后,Elasticsearch 会自动地创建 area-cnts 索引。Elasticsearch 提供了一个 REST API 。我们可以访问

随着 Query 的一直运行,你也可以观察到一些统计值(_all.primaries.docs.count, _all.primaries.docs.deleted)在不断的增长:http://localhost:9200/area-cnts/_stats

总结

本文带大家使用 Docker Compose 快速上手 Flink SQL 的编程,并对比 Window Aggregate 和 Group Aggregate 的区别,以及这两种类型的作业如何写入到 外部系统中。感兴趣的同学,可以基于这个 Docker 环境更加深入地去实践,例如运行自己写的 UDF , UDTF, UDAF。查询内置地其他源表等等。


▼ Apache Flink 社区推荐 ▼

Apache Flink 及大数据领域顶级盛会 Flink Forward Asia 2019 重磅开启,目前正在征集议题,限量早鸟票优惠ing。了解 Flink Forward Asia 2019 的更多信息,请查看:

https://developer.aliyun.com/special/ffa2019

首届 Apache Flink 极客挑战赛重磅开启,聚焦机器学习与性能优化两大热门领域,40万奖金等你拿,加入挑战请点击:

https://tianchi.aliyun.com/markets/tianchi/flink2019

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
4月前
|
SQL 存储 人工智能
Apache Flink 2.0.0: 实时数据处理的新纪元
Apache Flink 2.0.0 正式发布!这是自 Flink 1.0 发布九年以来的首次重大更新,凝聚了社区两年的努力。此版本引入分离式状态管理、物化表、流批统一等创新功能,优化云原生环境下的资源利用与性能表现,并强化了对人工智能工作流的支持。同时,Flink 2.0 对 API 和配置进行了全面清理,移除了过时组件,为未来的发展奠定了坚实基础。感谢 165 位贡献者的辛勤付出,共同推动实时计算进入新纪元!
492 1
Apache Flink 2.0.0: 实时数据处理的新纪元
|
6月前
|
SQL 大数据 数据处理
Flink SQL 详解:流批一体处理的强大工具
Flink SQL 是为应对传统数据处理框架中流批分离的问题而诞生的,它融合了SQL的简洁性和Flink的强大流批处理能力,降低了大数据处理门槛。其核心工作原理包括生成逻辑执行计划、查询优化和构建算子树,确保高效执行。Flink SQL 支持过滤、投影、聚合、连接和窗口等常用算子,实现了流批一体处理,极大提高了开发效率和代码复用性。通过统一的API和语法,Flink SQL 能够灵活应对实时和离线数据分析场景,为企业提供强大的数据处理能力。
838 26
|
4月前
|
存储 大数据 数据处理
您有一份 Apache Flink 社区年度报告请查收~
您有一份 Apache Flink 社区年度报告请查收~
|
7月前
|
存储 SQL 人工智能
Apache Flink 2.0:Streaming into the Future
本文整理自阿里云智能高级技术专家宋辛童、资深技术专家梅源和高级技术专家李麟在 Flink Forward Asia 2024 主会场的分享。三位专家详细介绍了 Flink 2.0 的四大技术方向:Streaming、Stream-Batch Unification、Streaming Lakehouse 和 AI。主要内容包括 Flink 2.0 的存算分离云原生化、流批一体的 Materialized Table、Flink 与 Paimon 的深度集成,以及 Flink 在 AI 领域的应用。
1117 13
Apache Flink 2.0:Streaming into the Future
|
SQL Kubernetes Cloud Native
开发者社区精选直播合集(三十六)| Flink实践合集
Flink 作为业界公认为最好的流计算引擎,不仅仅局限于做流处理,而是一套兼具流、批、机器学习等多种计算功能的大数据引擎,以其高吞吐低延时的优异实时计算能力、支持海量数据的亚秒级快速响应帮助企业和开发者实现数据算力升级,并成为阿里、腾讯、滴滴、美团、字节跳动、Netflix、Lyft 等国内外知名公司建设实时计算平台的首选。
开发者社区精选直播合集(三十六)|  Flink实践合集
|
10月前
|
运维 数据处理 数据安全/隐私保护
阿里云实时计算Flink版测评报告
该测评报告详细介绍了阿里云实时计算Flink版在用户行为分析与标签画像中的应用实践,展示了其毫秒级的数据处理能力和高效的开发流程。报告还全面评测了该服务在稳定性、性能、开发运维及安全性方面的卓越表现,并对比自建Flink集群的优势。最后,报告评估了其成本效益,强调了其灵活扩展性和高投资回报率,适合各类实时数据处理需求。
|
8月前
|
存储 分布式计算 流计算
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
本文介绍了阿里云开源大数据团队在实时计算领域的最新成果——向量化流计算引擎Flash。文章主要内容包括:Apache Flink 成为业界流计算标准、Flash 核心技术解读、性能测试数据以及在阿里巴巴集团的落地效果。Flash 是一款完全兼容 Apache Flink 的新一代流计算引擎,通过向量化技术和 C++ 实现,大幅提升了性能和成本效益。
2837 73
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
zdl
|
8月前
|
消息中间件 运维 大数据
大数据实时计算产品的对比测评:实时计算Flink版 VS 自建Flink集群
本文介绍了实时计算Flink版与自建Flink集群的对比,涵盖部署成本、性能表现、易用性和企业级能力等方面。实时计算Flink版作为全托管服务,显著降低了运维成本,提供了强大的集成能力和弹性扩展,特别适合中小型团队和业务波动大的场景。文中还提出了改进建议,并探讨了与其他产品的联动可能性。总结指出,实时计算Flink版在简化运维、降低成本和提升易用性方面表现出色,是大数据实时计算的优选方案。
zdl
340 56
|
6月前
|
消息中间件 关系型数据库 MySQL
Flink CDC 在阿里云实时计算Flink版的云上实践
本文整理自阿里云高级开发工程师阮航在Flink Forward Asia 2024的分享,重点介绍了Flink CDC与实时计算Flink的集成、CDC YAML的核心功能及应用场景。主要内容包括:Flink CDC的发展及其在流批数据处理中的作用;CDC YAML支持的同步链路、Transform和Route功能、丰富的监控指标;典型应用场景如整库同步、Binlog原始数据同步、分库分表同步等;并通过两个Demo展示了MySQL整库同步到Paimon和Binlog同步到Kafka的过程。最后,介绍了未来规划,如脏数据处理、数据限流及扩展数据源支持。
421 0
Flink CDC 在阿里云实时计算Flink版的云上实践
|
7月前
|
存储 关系型数据库 BI
实时计算UniFlow:Flink+Paimon构建流批一体实时湖仓
实时计算架构中,传统湖仓架构在数据流量管控和应用场景支持上表现良好,但在实际运营中常忽略细节,导致新问题。为解决这些问题,提出了流批一体的实时计算湖仓架构——UniFlow。该架构通过统一的流批计算引擎、存储格式(如Paimon)和Flink CDC工具,简化开发流程,降低成本,并确保数据一致性和实时性。UniFlow还引入了Flink Materialized Table,实现了声明式ETL,优化了调度和执行模式,使用户能灵活调整新鲜度与成本。最终,UniFlow不仅提高了开发和运维效率,还提供了更实时的数据支持,满足业务决策需求。

相关产品

  • 实时计算 Flink版
  • 推荐镜像

    更多