Apache Flink 2.1.0: 面向实时 Data + AI 全面升级,开启智能流处理新纪元

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
简介: Apache Flink 2.1.0 正式发布,标志着实时数据处理引擎向统一 Data + AI 平台迈进。新版本强化了实时 AI 能力,支持通过 Flink SQL 和 Table API 创建及调用 AI 模型,新增 Model DDL、ML_PREDICT 表值函数等功能,实现端到端的实时 AI 工作流。同时增强了 Flink SQL 的流处理能力,引入 Process Table Functions(PTFs)、Variant 数据类型,优化流式 Join 及状态管理,显著提升作业稳定性与资源利用率。

Apache Flink PMC (项目管理委员会)很高兴地宣布 Apache Flink 2.1.0 版本正式发布,这标志着实时数据处理引擎向统一 Data + AI 平台的里程碑式演进。本次版本汇聚全球 116 位贡献者,完成 16 项改进提案(FLIPs),解决 220 多个问题,重点强化了实时 AI 与智能流处理的深度融合

  • 实时 AI 能力突破
  • 新增 AI 模型 DDL,支持通过 Flink SQL 与 Table API 创建和修改 AI 模型,实现 AI 模型的灵活管理。
  • 扩展 ML_PREDICT 表值函数,支持通过 Flink SQL 实时调用 AI 模型,为构建端到端实时 AI 工作流奠定基础。
  • 实时数据处理增强
  • 通过开放 Flink 核心能力——托管状态、事件时间流处理及表变更日志,Process Table Functions(PTFs) 使 Flink SQL 解锁了更强大的事件驱动型应用开发能力
  • 引入 VARIANT 数据类型,高效处理 JSON 等半结构化数据,结合PARSE_JSON函数与湖仓格式(如 Paimon),实现动态 Schema 数据分析。
  • 重点优化流式 Join,创新性的引入 DeltaJoin MultiJoin 策略,消除状态瓶颈,提升资源利用率与作业稳定性。

Flink 2.1.0 将实时数据处理与 AI 模型无缝集成,推动企业从实时分析迈向实时智能决策,以满足现代数据应用不断变化的需求。感谢各位贡献者的支持!

Flink SQL 提升

Model DDLs 支持

自 Flink 2.0 引入 AI 模型相关 SQL 语法后,用户可通过类似创建 Catalog 对象的方式定义模型,并在 Flink SQL 中调用 AI 模型。Flink 2.1进一步扩展支持通过 Table API(Java/Python) 定义模型,通过编程实现模型的灵活管理。

示例:

  • 使用 Flink SQL 创建模型
CREATE MODEL my_model
INPUT (f0 STRING)
OUTPUT (label STRING)
WITH (
  'task' = 'classification',
  'type' = 'remote',
  'provider' = 'openai',
  'openai.endpoint' = 'remote',
  'openai.api_key' = 'abcdefg',
);
  • 使用 Java API 创建模型
tEnv.createModel(
    "MyModel", 
    ModelDescriptor.forProvider("OPENAI")
      .inputSchema(Schema.newBuilder()
        .column("f0", DataTypes.STRING())
        .build())
      .outputSchema(Schema.newBuilder()
        .column("label", DataTypes.STRING())
        .build())
      .option("task", "classification")
      .option("type", "remote")
      .option("provider", "openai")
      .option("openai.endpoint", "remote")
      .option("openai.api_key", "abcdefg")
      .build(),
true);

更多信息:

  • FLIP-437: Support ML Models in Flink SQL[1]

  • FLIP-507: Add Model DDL methods in TABLE API[2]

实时 AI 函数

基于模型 DDL,Flink 2.1扩展了 ML_PREDICT 表值函数(TVF),支持在 Flink SQL 中实时调用机器学习模型,提供内置兼容 OpenAI API 的模型调用支持,同时开放自定义模型接口,标志着 Flink 从实时数据处理引擎向统一的实时 Data + AI 平台演进。未来将继续扩展 ML_EVALUATEVECTOR_SEARCH 等函数,用户可以使用 Flink SQL 更方便地构建端到端实时 AI 工作流。

-- Declare a AI model
CREATE MODEL `my_model`
INPUT (text STRING)
OUTPUT (response STRING)
WITH(
  'provider' = 'openai',
  'endpoint' = 'https://api.openai.com/v1/llm/v1/chat',
  'api-key' = 'abcdefg',
  'system-prompt' = 'translate to Chinese',
  'model' = 'gpt-4o'
);
-- Basic usage
SELECT * FROM ML_PREDICT(
  TABLE input_table,
  MODEL my_model,
  DESCRIPTOR(text)
);
-- With configuration options
SELECT * FROM ML_PREDICT(
  TABLE input_table,
  MODEL my_model,
  DESCRIPTOR(text)
  MAP['async', 'true', 'timeout', '100s']
);
-- Using named parameters
SELECT * FROM ML_PREDICT(
  INPUT => TABLE input_table,
  MODEL => MODEL my_model,
  ARGS => DESCRIPTOR(text),
  CONFIG => MAP['async', 'true']
);

更多信息:

  • 模型推理[3]
  • FLIP-437: Support ML Models in Flink SQL[4]

Process Table Functions(PTFs)

Apache Flink 现已支持 Process Table Functions(PTFs),这是 Flink SQL 与 Table API 中最强大的函数类型。从概念上讲,PTF 是所有 UDF 的“超集”,能够将零个、一个或多张表映射为零个、一个或多行数据。借助 PTFs,你可以实现与内置算子一样功能丰富的自定义算子,并直接访问 Flink 的托管状态、事件时间、定时器以及表级变更日志。

PTFs 支持如下操作:

  • 对表的每一行执行转换。
  • 可以把一张表从逻辑上拆分成不同子集,并对每个子集分别转换。
  • 缓存已见事件,供后续多次访问。
  • 在未来某个时间点继续处理,实现等待、同步或超时机制。
  • 利用复杂状态机或基于规则的条件逻辑对事件进行缓存与聚合。

这显著缩小了 Flink SQL 与 DataStream API 之间的差距,同时保留了 Flink SQL 生态的健壮性与易用性。

关于 PTFs 的语法与语义细节,详见:Process Table Functions[5]

示例如下:

// Declare a ProcessTableFunction for memorizing your customers
public static class GreetingWithMemory extends ProcessTableFunction<String> {
    public static class CountState {
        public long counter = 0L;
    }
    public void eval(@StateHint CountState state, @ArgumentHint(SET_SEMANTIC_TABLE) Row input) {
        state.counter++;
        collect("Hello " + input.getFieldAs("name") + ", your " + state.counter + " time?");
    }
}
TableEnvironment env = TableEnvironment.create(...);
// Call the PTF in Table API
env.fromValues("Bob", "Alice", "Bob")
   .as("name")
   .partitionBy($("name"))
   .process(GreetingWithMemory.class)
   .execute()
   .print();
// Call the PTF in SQL
env.executeSql("SELECT * FROM GreetingWithMemory(TABLE Names PARTITION BY name)")

更多信息

  • FLIP-440: User-defined SQL operators / ProcessTableFunction (PTF)[6]

Variant 类型

新增 VARIANT 半结构化数据类型(如 JSON),支持存储任意嵌套结构(包括基本类型、ARRAYMAP)并保留原始类型信息。相比 ROWSTRUCTURED 类型,VARIANT 在处理动态 Schema 数据时更灵活。配合 PARSE_JSON 函数及 Apache Paimon 等表格式,可实现湖仓中半结构化数据的高效分析。

CREATE TABLE t1 (
  id INTEGER,
  v STRING -- a json string
) WITH (
  'connector' = 'mysql-cdc',
  ...
);
CREATE TABLE t2 (
  id INTEGER,
  v VARIANT
) WITH (
  'connector' = 'paimon'
  ...
);
-- write to t2 with VARIANT type
INSERT INTO t2 SELECT id, PARSE_JSON(v) FROM t1;

更多信息:

  • Variant Type[7]
  • FLIP-521: Integrating Variant Type into Flink: Enabling Efficient Semi-Structured Data Processing[8]

Structured 类型增强

支持在 CREATE TABLE 语句中直接声明用户定义结构体类型(STRUCTURED),解决类型兼容性问题并提升易用性。

示例:

CREATE TABLE UserTable (
    uid BIGINT,
    user STRUCTURED<'User', name STRING, age INT>
);
-- Casts a row type into a structured type
INSERT INTO UserTable 
SELECT 1, CAST(('Alice', 30) AS STRUCTURED<'User', name STRING, age INT>);

更多信息

  • FLIP-520: Simplify StructuredType handling[9]

  • Structured Type[10]

Delta Join

引入一种新的 DeltaJoin 算子,相比传统的双流 Join 方案,配合 Apache Fluss 等流存储,可以实现 Join 算子无状态化,解决了大状态导致的资源瓶颈、检查点缓慢和恢复延迟等问题。该特性已默认启用,同时需要依赖 Apache Fluss 存储相应版本支持,敬请关注 Support DeltaJoin on Flink 2.1[11]。

更多信息

  • FLIP-486: Introduce A New DeltaJoin[12]

级联双流 Join 优化

使用多个级联流式 Join 的 Flink 作业经常会因 State 过大而导致运行不稳定和性能下降。在这个版本,我们引入了一种新的 MultiJoin 算子, 通过在单个算子内直接进行多流 Join,消除中间结果存储,每条流输入的记录最多存储一份,从而实现实现 "零中间状态" ,显著提升了资源利用率与作业稳定性。目前仅支持 INNER、LEFT JOIN ,并且需要通过参数 table.optimizer.multi-join.enabled=true 启用。

基准测试:我们进行了一项基准测试,比较了 MultiJoin 与双流 Join 的优势,更多详情请参见 MultiJoin Benchmark[13]

更多信息

  • FLIP-516: Multi-Way Join Operator[14]

异步 Lookup Join 增强

异步 Lookup Join 在之前的版本中,即使用户将 table.exec.async-lookup.output-mode 设置为 ALLOW_UNORDERED,引擎在处理更新流时仍会强制回退到 ORDERED 以保证正确性。从 2.1 版本开始,引擎允许并行处理无关的更新记录,同时保证正确性,从而在处理更新流时实现更高的吞吐。

更多信息:

  • FLIP-519: Introduce async lookup key ordered mode[15]

Sink 节点复用

当单个作业中多个 INSERT INTO 语句更新目标表相同列时(下版本将支持不同列),优化器将自动合并 Sink 节点实现复用,该特性可以极大提升 Apache Paimon 等湖仓格式的部分更新场景使用体验。

更多信息:

  • FLIP-506: Support Reuse Multiple Table Sinks in Planner[16]

Compiled Plan 支持 Smile 格式

新增 Smile 二进制格式(兼容 JSON 格式)用于执行计划序列化,较 JSON 更节省内存。默认仍使用 JSON 格式,需通过显示调用 CompiledPlan#asSmileBytesPlanReference#fromSmileBytes 方法启用。

更多信息:

  • FLIP-508: Add support for Smile format for Compiled plans[17]

  • Smile 格式规范[18]

运行时提升

异步 Sink 可插拔的批量处理

支持自定义批量写入策略,用户可根据业务需求灵活扩展异步 Sink 的批量写入逻辑。

更多信息:

  • FLIP-509: Add pluggable Batching for Async Sink[19]

分片级水位线指标

在 Flink 2.1 版本,我们新增细粒度分片监控指标,涵盖水位线进度与状态统计:

  • currentWatermark:该分片最新接收到的水位线值
  • activeTimeMsPerSecond:该分片每秒处于数据处理状态的时间(毫秒)
  • pausedTimeMsPerSecond:因水位线对齐该分片每秒的暂停时间(毫秒)
  • idleTimeMsPerSecond:每秒空闲时长(毫秒)
  • accumulatedActiveTimeMs:累计活跃时长(毫秒)
  • accumulatedPausedTimeMs:累计暂停时长(毫秒)
  • accumulatedIdleTimeMs:累计空闲时长(毫秒)

更多信息:

  • FLIP-513: Split-level Watermark Metrics[20]

连接器提升

Keyed State 连接器

在 Flink 2.1 中,我们为 Keyed State 引入了一个新的 SQL 连接器。该连接器允许用户使用 Flink SQL 直接查询Checkpoint 和 Savepoint 中的 Keyed State,从而使得更容易探查、调试和验证 Flink 作业状态,无需定制工具,该功能对于分析长期运行的作业和验证状态迁移尤其有用。

CREATE TABLE keyed_state (
    k INT,
    user_id STRING,
    balance DOUBLE
) WITH (
    'connector' = 'savepoint',
    'path' = 'file:///savepoint/path'
);
-- 直接查询状态快照
SELECT * FROM keyed_state;

更多信息:

  • FLIP-496: SQL connector for keyed savepoint data[21]

其他重要更新

  • PyFlink:新增支持 Python 3.12,移除 Python 3.8 支持。
  • 依赖升级:flink-shaded 升级至 20.0 以支持 Smile 格式,Parquet 升级至 1.15.3 修复安全漏洞(CVE-2025-30065)。

升级说明

Apache Flink 社区努力确保升级过程尽可能平稳, 但是升级到 2.1 版本可能需要用户对现有应用程序做出一些调整。请参考 Release Notes[22]获取更多的升级时需要的改动与可能的问题列表细节。

贡献者列表

Apache Flink 社区感谢对此版本做出贡献的每一位贡献者:


主要链接:

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-437%3A+Support+ML+Models+in+Flink+SQL

[2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-507%253A+Add+Model+DDL+methods+in+TABLE+API

[3] https://nightlies.apache.org/flink/flink-docs-release-2.1/docs/dev/table/sql/queries/model-inference/

[4] https://cwiki.apache.org/confluence/display/FLINK/FLIP-437%3A+Support+ML+Models+in+Flink+SQL

[5] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/ptfs/

[6] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=298781093

[7] https://nightlies.apache.org/flink/flink-docs-release-2.1/docs/dev/table/types/#other-data-types

[8] https://cwiki.apache.org/confluence/display/FLINK/FLIP-521%253A+Integrating+Variant+Type+into+Flink

[9] https://cwiki.apache.org/confluence/display/FLINK/FLIP-520%3A+Simplify+StructuredType+handling

[10] https://nightlies.apache.org/flink/flink-docs-release-2.1/docs/dev/table/types/#user-defined-data-types

[11] https://github.com/apache/fluss/issues/1143

[12] https://cwiki.apache.org/confluence/display/FLINK/FLIP-486%253A+Introduce+A+New+DeltaJoin

[13] https://nightlies.apache.org/flink/flink-docs-release-2.1/docs/dev/table/tuning/#multiple-regular-joins

[14] https://cwiki.apache.org/confluence/display/FLINK/FLIP-516%3A+Multi-Way+Join+Operator

[15] https://cwiki.apache.org/confluence/display/FLINK/FLIP-519%3A++Introduce+async+lookup+key+ordered+mode

[16] https://cwiki.apache.org/confluence/display/FLINK/FLIP-506%3A+Support+Reuse+Multiple+Table+Sinks+in+Planner

[17] https://cwiki.apache.org/confluence/display/FLINK/FLIP-508%3A+Add+support+for+Smile+format+for+Compiled+plans

[18] https://github.com/FasterXML/smile-format-specification

[19] https://cwiki.apache.org/confluence/display/FLINK/FLIP-509+Add+pluggable+Batching+for+Async+Sink

[20] https://cwiki.apache.org/confluence/display/FLINK/FLIP-513%3A+Split-level+Watermark+Metrics

[21] https://cwiki.apache.org/confluence/display/FLINK/FLIP-496%253A+SQL+connector+for+keyed+savepoint+data

[22] https://nightlies.apache.org/flink/flink-docs-master/release-notes/flink-2.1/



来源  | Apache Flink公众号

作者  |  达龙@阿里云

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
20天前
|
人工智能 供应链 数据可视化
一文读懂AI引擎与Together规则引擎重塑智能决策
从1950年图灵提出人工智能设想到如今AI引擎实现自主决策,Together规则引擎正成为智能决策核心。它通过动态规划、多工具调用与持续学习机制,赋能供应链、财务、定价等场景,提升决策透明度与效率。Together助力AI引擎突破落地瓶颈,推动企业管理迈向“决策即服务”新时代。
|
8天前
|
传感器 人工智能 边缘计算
智能就在身边:AI如何优化边缘计算
智能就在身边:AI如何优化边缘计算
67 2
|
26天前
|
JSON 人工智能 Java
基于Spring AI构建智能Text-to-SQL转换器:一个完整的MCP
Spring AI 更新结构化输出转换器,弃用旧版 Parser 类,引入与 Spring 框架对齐的 Converter 体系,提升命名规范与功能兼容性。新版本支持 JSON、XML 及 Java 对象转换,确保 LLM 输出结构化,便于下游应用处理。
|
8天前
|
人工智能 机器人 新能源
深化新工科建设 共探智能新未来 | 阿里云支持南京大学苏州校区“AI DAY”盛大启幕丨云工开物
9月12日,南京大学苏州校区举办“AI新视界:深化新工科建设进行式”活动,采用教师与学生双专场模式,通过主题分享、实践演练、产业课题发布等形式,搭建产教融合AI交流平台,助力未来产业科技人才培养。
|
8天前
|
机器学习/深度学习 人工智能 运维
运维告警别乱飞了!AI智能报警案例解析
运维告警别乱飞了!AI智能报警案例解析
64 0
|
22天前
|
人工智能 测试技术 调度
写用例写到怀疑人生?AI 智能测试平台帮你一键生成!
霍格沃兹测试开发学社推出AI智能测试用例生成功能,结合需求文档一键生成高质量测试用例,大幅提升效率,减少重复劳动。支持自定义提示词、多文档分析与批量管理,助力测试人员高效完成测试设计,释放更多时间投入核心分析工作。平台已开放内测,欢迎体验!
|
15天前
|
人工智能 运维 Java
Flink Agents:基于Apache Flink的事件驱动AI智能体框架
本文基于Apache Flink PMC成员宋辛童在Community Over Code Asia 2025的演讲,深入解析Flink Agents项目的技术背景、架构设计与应用场景。该项目聚焦事件驱动型AI智能体,结合Flink的实时处理能力,推动AI在工业场景中的工程化落地,涵盖智能运维、直播分析等典型应用,展现其在AI发展第四层次——智能体AI中的重要意义。
194 14
Flink Agents:基于Apache Flink的事件驱动AI智能体框架
|
19天前
|
存储 人工智能 监控
LangGraph实战:从零构建智能交易机器人,让多个AI智能体像投资团队一样协作
如今的量化交易已远超传统技术指标,迈向多智能体协作的新时代。本文介绍了一个基于 **LangGraph** 构建的多智能体交易系统,模拟真实投资机构的运作流程:数据分析师收集市场情报,研究员展开多空辩论,交易员制定策略,风险团队多角度评估,最终由投资组合经理做出决策。系统具备记忆学习能力,通过每次交易积累经验,持续优化决策质量。
122 8
LangGraph实战:从零构建智能交易机器人,让多个AI智能体像投资团队一样协作
|
6天前
|
人工智能 编解码 自然语言处理
重磅更新!ModelScope FlowBench 支持视频生成 + 图像编辑,AI创作全面升级!
很高兴地向大家宣布,ModelScope FlowBench 客户端迎来重大功能升级! 本次更新不仅正式支持了视频节点功能,还新增了图像编辑与IC-Light智能打光等实用功能,同时对多个图像处理节点进行了深度优化和扩展。现在,您只需在 FlowBench 中轻松串联节点,即可使用 Wan2.1/Wan2.2、Qwen-Image-Edit、FLUX Kontext、IC-Light等强大模型,轻松实现创意内容的生成与编辑。 无论你是内容创作者、视觉设计师,还是AI技术爱好者,这次更新都将为你打开全新的创作边界。
133 14
|
6天前
|
消息中间件 监控 Java
Apache Kafka 分布式流处理平台技术详解与实践指南
本文档全面介绍 Apache Kafka 分布式流处理平台的核心概念、架构设计和实践应用。作为高吞吐量、低延迟的分布式消息系统,Kafka 已成为现代数据管道和流处理应用的事实标准。本文将深入探讨其生产者-消费者模型、主题分区机制、副本复制、流处理API等核心机制,帮助开发者构建可靠、可扩展的实时数据流处理系统。
80 4

推荐镜像

更多