Flink Batch SQL 1.10 实践

简介: 1.10可以说是第一个成熟的生产可用的Flink Batch SQL版本,它一扫之前Dataset的羸弱,从功能和性能上都有大幅改进,以下我从架构、外部系统集成、实践三个方面进行阐述。

作者:李劲松(之信)

Flink作为流批统一的计算框架,在1.10中完成了大量batch相关的增强与改进。1.10可以说是第一个成熟的生产可用的Flink Batch SQL版本,它一扫之前Dataset的羸弱,从功能和性能上都有大幅改进,以下我从架构、外部系统集成、实践三个方面进行阐述。

架构

Stack

图片 1.png

首先来看下stack,在新的Blink planner中,batch也是架设在Transformation上的,这就意味着我们和Dataset完全没有关系了:

  1. 我们可以尽可能的和streaming复用组件,复用代码,有同一套行为。
  2. 如果想要Table/SQL的toDataset或者fromDataset,那就完全没戏了。尽可能的在Table的层面来处理吧。
  3. 后续我们正在考虑在DataStream上构建BoundedStream,给DataStream带来批处理的功能。

网络模型

图片 2.png

Batch模式就是在中间结果落盘,这个模式和典型的Batch处理是一致的,比如MapReduce/Spark/Tez。

Flink以前的网络模型也分为Batch和Pipeline两种,但是Batch模式只是支持上下游隔断执行,也就是说资源用量可以不用同时满足上下游共同的并发。但是另外一个关键点是Failover没有对接好,1.9和1.10在这方面进行了改进,支持了单点的Failover。

建议在Batch时打开:

jobmanager.execution.failover-strategy = region

为了避免重启过于频繁导致JobMaster太忙了,可以把重启间隔提高:

restart-strategy.fixed-delay.delay = 30 s

Batch模式的好处有:

  • 容错好,可以单点恢复
  • 调度好,不管多少资源都可以运行
  • 性能差,中间数据需要落盘,强烈建议开启压缩
    taskmanager.network.blocking-shuffle.compression.enabled = true

Batch模式比较稳,适合传统Batch作业,大作业。

图片 3.png

Pipeline模式是Flink的传统模式,它完全和Streaming作业用的是同一套代码,其实社区里Impala和Presto也是类似的模式,纯走网络,需要处理反压,不落盘,它主要的优缺点是:

  • 容错差,只能全局重来
  • 调度差,你得保证有足够的资源
  • 性能好,Pipeline执行,完全复用Stream,复用流控反压等功能。

有条件可以考虑开启Pipeline模式。

调度模型

Flink on Yarn支持两种模式,Session模式和Per job模式,现在已经在调度层次高度统一了。

  1. Session模式没有最大进程限制,当有Job需要资源时,它就会去Yarn申请新资源,当Session有空闲资源时,它就会给Job复用,所以它的模型和PerJob是基本一样的。
  2. 唯一的不同只是:Session模式可以跨作业复用进程。

另外,如果想要更好的复用进程,可以考虑加大TaskManager的超时释放:
resourcemanager.taskmanager-timeout = 900000

资源模型

先说说并发:

  1. 对Source来说:目前Hive的table是根据InputSplit来定需要多少并发的,它之后能Chain起来的Operators自然都是和source相同的并发。
  2. 对下游网络传输过后的Operators(Tasks)来说:除了一定需要单并发的Task来说,其它Task全部统一并发,由table.exec.resource.default-parallelism统一控制。

我们在Blink内部实现了基于统计信息来推断并发的功能,但是其实以上的策略在大部分场景就够用了。

Manage内存

图片 4.png

目前一个TaskManager里面含有多个Slot,在Batch作业中,一个Slot里只能运行一个Task (关闭SlotShare)。

对内存来说,单个TM会把Manage内存切分成Slot粒度,如果1个TM中有n个Slot,也就是Task能拿到1/n的manage内存。

我们在1.10做了重大的一个改进就是:Task中chain起来的各个operators按照比例来瓜分内存,所以现在配置的算子内存都是一个比例值,实际拿到的还要根据Slot的内存来瓜分。

这样做的一个重要好处是:

  1. 不管当前Slot有多少内存,作业能都run起来,这大大提高了开箱即用。
  2. 不管当前Slot有多少内存,Operators都会把内存瓜分干净,不会存在浪费的可能。

当然,为了运行的效率,我们一般建议单个Slot的manage内存应该大于500MB。

另一个事情,在1.10后,我们去除了OnHeap的manage内存,所以只有off-heap的manage内存。

外部系统集成

Hive

强烈推荐Hive Catalog + Hive,这也是目前批处理最成熟的架构。在1.10中,除了对以前功能的完善以外,其它做了几件事:

  1. 多版本支持,支持Hive 1.X 2.X 3.X
  2. 完善了分区的支持,包括分区读,动态/静态分区写,分区统计信息的支持。
  3. 集成Hive内置函数,可以通过以下方式来load:
    a)TableEnvironment.loadModule("hiveModule",new HiveModule("hiveVersion"))
  4. 优化了ORC的性能读,使用向量化的读取方式,但是目前只支持Hive 2+版本,且要求列没有复杂类型。有没有进行过优化差距在5倍量级。

兼容Streaming Connectors

得益于流批统一的架构,目前的流Connectors也能在batch上使用,比如HBase的Lookup和Sink、JDBC的Lookup和Sink、Elasticsearch的Sink,都可以在Batch无缝对接使用起来。

实践

SQL-CLI

在1.10中,SQL-CLI也做了大量的改动,比如把SQL-CLI做了stateful,里面也支持了DDL,还支持了大量的DDL命令,给SQL-CLI暴露了很多TableEnvironment的能力,这让用户可以方便得多。后续,我们也需要对接JDBC的客户端,让用户可以更好的对接外部工具。但是SQL-CLI仍然待继续改进,比如目前仍然只支持Session模式,不支持Per Job模式。

编程方式

TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings
  .newInstance()
  .useBlinkPlanner()
  .inBatchMode()
  .build());

老的BatchTableEnv因为绑定了Dataset,而且区分Java和Scala,是不干净的设计方式,所以Blink planner只支持新的TableEnv。

TableEnv注册的source, sink, connector, functions,都是temporary的,重启之后即失效了。如果需要持久化的object,考虑使用HiveCatalog。

tEnv.registerCatalog(“hive”, hiveCatalog);
tEnv.useCatalog(“hive”);

可以通过tEnv.sqlQuery来执行DML,这样可以获得一个Table,我们也通过collect来获得小量的数据:

Table table = tEnv.sqlQuery(“SELECT COUNT(*) FROM MyTable”);
List<Row> results = TableUtils.collectToList(table);
System.out.println(results);

可以通过tEnv.sqlUpdate来执行DDL,但是目前并不支持创建hive的table,只能创建Flink类型的table:

tEnv.sqlUpdate(
   "CREATE TABLE myResult (" +
      "  cnt BIGINT"
      ") WITH (" +
      "  'connector.type'='jdbc'," 
         ……
      ")");

可以通过tEnv.sqlUpdate来执行insert语句,Insert到临时表或者Catalog表中,比如insert到上面创建的临时JDBC表中:

tEnv.sqlUpdate(“INSERT INTO myResult SELECT COUNT(*) FROM MyTable”);
tEnv.execute(“MyJob”);

当结果表是Hive表时,可以使用Overwrite语法,也可以使用静态Partition的语法,这需要打开Hive的方言:

tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);

结语

目前Flink batch SQL仍然在高速发展中,但是1.10已经是一个可用的版本了,它在功能上、性能上都有很大的提升,后续还有很多有意思的features,等待着大家一起去挖掘。

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
8月前
|
SQL 人工智能 JSON
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
简介:本文整理自阿里云高级技术专家李麟在Flink Forward Asia 2025新加坡站的分享,介绍了Flink 2.1 SQL在实时数据处理与AI融合方面的关键进展,包括AI函数集成、Join优化及未来发展方向,助力构建高效实时AI管道。
1077 43
|
8月前
|
SQL 人工智能 JSON
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
本文整理自阿里云的高级技术专家、Apache Flink PMC 成员李麟老师在 Flink Forward Asia 2025 新加坡[1]站 —— 实时 AI 专场中的分享。将带来关于 Flink 2.1 版本中 SQL 在实时数据处理和 AI 方面进展的话题。
491 0
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
|
9月前
|
SQL Java 关系型数据库
在 RDB 上跑 SQL------SPL 轻量级多源混算实践 1
SPL 支持通过 JDBC 连接 RDB,可动态生成 SQL 并传参,适用于 Java 与 SQL 结合的各类场景。本文以 MySQL 为例,演示如何配置数据库连接、编写 SPL 脚本查询 2024 年订单数据,并支持参数过滤和 SQL 混合计算。脚本可在 IDE 直接执行或集成至 Java 应用调用。
|
8月前
|
SQL 关系型数据库 Java
SQL 移植--SPL 轻量级多源混算实践 7
不同数据库的 SQL 语法存在差异,尤其是函数写法不同,导致 SQL 移植困难。SPL 提供 sqltranslate 函数,可将标准 SQL 转换为特定数据库语法,实现 SQL 语句在不同数据库间的无缝迁移,支持多种数据库函数映射与自定义扩展。
|
9月前
|
SQL 消息中间件 Kafka
Flink SQL 详解:流批一体处理的强大工具
Flink SQL 是 Apache Flink 提供的 SQL 引擎,支持流批一体处理,统一操作流数据与批数据,具备高性能、低延迟、丰富数据源支持及标准 SQL 兼容性,适用于实时与离线数据分析。
1185 1
|
10月前
|
资源调度 Kubernetes 流计算
Flink在B站的大规模云原生实践
本文基于哔哩哔哩资深开发工程师丁国涛在Flink Forward Asia 2024云原生专场的分享,围绕Flink On K8S的实践展开。内容涵盖五个部分:背景介绍、功能及稳定性优化、性能优化、运维优化和未来展望。文章详细分析了从YARN迁移到K8S的优势与挑战,包括资源池统一、环境一致性改进及隔离性提升,并针对镜像优化、Pod异常处理、启动速度优化等问题提出解决方案。此外,还探讨了多机房容灾、负载均衡及潮汐混部等未来发展方向,为Flink云原生化提供了全面的技术参考。
570 9
Flink在B站的大规模云原生实践
|
11月前
|
SQL 关系型数据库 MySQL
Flink CDC 3.4 发布, 优化高频 DDL 处理,支持 Batch 模式,新增 Iceberg 支持
Apache Flink CDC 3.4.0 版本正式发布!经过4个月的开发,此版本强化了对高频表结构变更的支持,新增 batch 执行模式和 Apache Iceberg Sink 连接器,可将数据库数据全增量实时写入 Iceberg 数据湖。51位贡献者完成了259次代码提交,优化了 MySQL、MongoDB 等连接器,并修复多个缺陷。未来 3.5 版本将聚焦脏数据处理、数据限流等能力及 AI 生态对接。欢迎下载体验并提出反馈!
1742 1
Flink CDC 3.4 发布, 优化高频 DDL 处理,支持 Batch 模式,新增 Iceberg 支持
|
关系型数据库 MySQL 网络安全
5-10Can't connect to MySQL server on 'sh-cynosl-grp-fcs50xoa.sql.tencentcdb.com' (110)")
5-10Can't connect to MySQL server on 'sh-cynosl-grp-fcs50xoa.sql.tencentcdb.com' (110)")
|
SQL 存储 监控
SQL Server的并行实施如何优化?
【7月更文挑战第23天】SQL Server的并行实施如何优化?
741 13
解锁 SQL Server 2022的时间序列数据功能
【7月更文挑战第14天】要解锁SQL Server 2022的时间序列数据功能,可使用`generate_series`函数生成整数序列,例如:`SELECT value FROM generate_series(1, 10)。此外,`date_bucket`函数能按指定间隔(如周)对日期时间值分组,这些工具结合窗口函数和其他时间日期函数,能高效处理和分析时间序列数据。更多信息请参考官方文档和技术资料。
527 9

相关产品

  • 实时计算 Flink版