Apache Flink 2.2.0: 推动实时数据与人工智能融合,赋能AI时代的流处理

简介: Apache Flink 2.2.0 发布,增强AI能力,支持ML_PREDICT与VECTOR_SEARCH,提升物化表、批处理及PyFlink性能,优化连接器与调度,助力实时智能数据处理。

Apache Flink PMC 很高兴地宣布 Apache Flink 2.2.0 版本发布了。Flink 2.2.0 版本进一步增强了 AI 函数 和 向量检索功能,改进了物化表和连接器框架,并优化了批处理和 PyFlink 支持。Flink 2.2.0 版本总共由来自全球的 73 位贡献者参与,累计推进了 9 个 FLIP(Flink 重要改进提案),完成了 220 多项缺陷修复和改进。


Flink 2.2.0 版本无缝集成实时数据处理与人工智能,开启了人工智能时代。该版本增强了用于大规模语言模型推理的 ML_PREDICT 和用于实时向量搜索的 VECTOR_SEARCH,从而增强了流式人工智能应用的能力。重点功能包括:物化表增强、Delta Join 优化、均衡任务调度和更多连接器优化(包括限流框架和均匀分片),显著提升了处理性能、可扩展性和可靠性,为构建智能、低延迟的数据管道奠定了坚实的基础。我们衷心感谢所有贡献者的宝贵支持!


接下来让我们深入了解 Flink 2.2.0 版本的重点内容。


Flink SQL 增强和改进

实时AI函数

从 Flink 2.1 版本起,Apache Flink 通过 Flink SQL 中的 ML_PREDICT 函数支持使用 LLM 功能,用户能够以简单高效的方式执行语义分析。在 Flink 2.2.0 版本中,Table API 支持了模型推理操作,允许将机器学习模型直接集成到数据处理中,并使用特定提供商(例如 OpenAI)的模型对数据进行预测处理。

使用示例:

  • 创建并使用模型
// 1. Set up the local environment
EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
TableEnvironment tEnv = TableEnvironment.create(settings);

// 2. Create a source table from in-memory data
Table myTable = tEnv.fromValues(
    ROW(FIELD("text", STRING())),
    row("Hello"),
    row("Machine Learning"),
    row("Good morning")
);

// 3. Create model
tEnv.createModel(
    "my_model",
    ModelDescriptor.forProvider("openai")
        .inputSchema(Schema.newBuilder().column("input", STRING()).build())
        .outputSchema(Schema.newBuilder().column("output", STRING()).build())
        .option("endpoint", "https://api.openai.com/v1/chat/completions")
        .option("model", "gpt-4.1")
        .option("system-prompt", "translate to chinese")
        .option("api-key", "<your-openai-api-key-here>")
        .build()
);

Model model = tEnv.fromModel("my_model");

// 4. Use the model to make predictions
Table predictResult = model.predict(myTable, ColumnList.of("text"));

// 5. Async prediction example
Table asyncPredictResult = model.predict(
    myTable, 
    ColumnList.of("text"), 
    Map.of("async", "true")
);

更多信息请参考:FLINK-38104[1]、FLIP-526[2]

向量搜索

Apache Flink 通过 ML_PREDICT 函数和大模型进行了无缝衔接,已在情感分析、实时问答系统等场景中得到技术验证。然而目前的架构仅允许 Flink 使用嵌入模型将非结构化文本数据转换为高维向量特征,然后将这些特征持久化到下游存储系统,缺乏对向量空间进行实时在线查询和相似性分析的能力。


Flink 2.2.0 提供了 VECTOR_SEARCH 函数,使用户能够直接在 Flink 中执行流式向量相似性搜索和实时上下文检索。


以下 SQL 语句为例:

-- Basic usage
SELECT * FROM 
input_table, LATERAL VECTOR_SEARCH(
  TABLE vector_table,
  input_table.vector_column,
  DESCRIPTOR(index_column),
  10
);

-- With configuration options
SELECT * FROM 
input_table, LATERAL VECTOR_SEARCH(
  TABLE vector_table,
  input_table.vector_column,
  DESCRIPTOR(index_column),
  10,
  MAP['async', 'true', 'timeout', '100s']
);

-- Using named parameters
SELECT * FROM 
input_table, LATERAL VECTOR_SEARCH(
  SEARCH_TABLE => TABLE vector_table,
  COLUMN_TO_QUERY => input_table.vector_column,
  COLUMN_TO_SEARCH => DESCRIPTOR(index_column),
  TOP_K => 10,
  CONFIG => MAP['async', 'true', 'timeout', '100s']
);

-- Searching with contant value
SELECT * 
FROM VECTOR_SEARCH(
  TABLE vector_table,
  ARRAY[10, 20],
  DESCRIPTOR(index_column),
  10,
);

更多信息请参考:FLINK-38422[3]、FLIP-540[4]、Vector Search[5]


物化表

物化表[6]是 Flink SQL 中引入的一种新的表类型,用于简化批处理和流式数据管道,提供一致的开发体验。创建物化表时,只需指定数据新鲜度和查询条件,引擎即可自动生成物化表的模式,并创建相应的数据管道以保证指定的数据新鲜度。


从 Flink 2.2.0 开始,FRESHNESS 不再是 CREATE MATERIALIZED TABLE 和 CREATE OR ALTER MATERIALIZED TABLE DDL 语句的必要组成部分。Flink 2.2.0 引入了一个新的 MaterializedTableEnricher 接口,该接口为自定义的默认逻辑提供了一个正式的扩展方式,允许高级用户实现“智能”的默认行为(例如,从上游表推断数据新鲜度)。


此外,用户可以使用 DISTRIBUTED BYDISTRIBUTED INTO 来支持物化表的分桶。用户还可以使用 SHOW MATERIALIZED TABLES 来展示所有物化表。

使用方式如下:

CREATE MATERIALIZED TABLE my_materialized_table
    PARTITIONED BY (ds)
    DISTRIBUTED INTO 5 BUCKETS
    FRESHNESS = INTERVAL '1' HOUR
    AS SELECT
        ds
    FROM
     ...

更多信息请参考:FLINK-38532[7]、FLINK-38311[8]、FLIP-542[9]、FLIP-551[10]


SinkUpsertMaterializer V2

SinkUpsertMaterializer 是 Flink 中的一个算子,在乱序的变更事件发送到 upsert 接收器之前对其进行协调。在某些情况下,这个算子的性能会呈指数级下降。Flink 2.2.0 引入了一种针对此类情况优化的新版本实现。

更多信息请参考:FLINK-38459[11]、FLIP-544[12]


Delta Join

Apache Flink 2.1 版本引入了新的 Delta Join 算子,以缓解 regular join 中由于庞大状态带来的问题。通过双向查找 join 取代了 regular join 维护的大量状态,直接重用源表中的数据。


Flink 2.2.0 增加了对更多 SQL 模式转换为 Delta Join 的支持。Delta Join 现在支持在不使用 DELETE 操作的情况下应用 CDC 数据源,并允许在数据源之后进行投影和过滤操作。此外,Delta Join 还支持缓存,这有助于减少对外部存储的请求。


目前,Apache Fluss (Incubating) [13]源表可以用作 Delta Join 的源表,可以在 Fluss[14]相关文档查看对应表的定义方式和使用案例。


更多信息请参考:Delta Joins[15]、Delta Join in Fluss[16]

SQL类型

在 Flink 2.2 版本前,SQL 中定义的ROW类型(例如 SELECT CAST(f AS ROW<i NOT NULL>))会忽略 NOT NULL 约束。这虽然更符合 SQL 标准,但在处理嵌套数据时会导致许多类型不一致和晦涩难懂的错误信息。例如,这阻止了在计算列或join key中使用ROW类型。Flink 2.2.0 版本修改了该行为,考虑ROW的可空性。配置项 table.legacy-nested-row-nullability 允许在需要开启来恢复旧行为,建议更新之前忽略约束的已有查询。


Flink 2.2.0 转换字符串为 TIME 类型时会考虑正确的精度(0-3),将不正确的字符串转换为时间类型(例如,小时部分大于 24)现在会导致运行时异常。BINARY 和 VARBINARY 之间的类型转换现在会正确考虑长度。

更多信息请参考:FLINK-20539[17]、FLINK-38181[18]


使用 UniqueKeys 进行状态管理

Flink 2.2 版本对 StreamingMultiJoinOperator 进行了优化和变更,使用 UniqueKeys 而不是 UpsertKeys 来进行状态管理。该算子在 Flink 2.1 中以实验状态发布,后续会持续进行优化,这些优化可能会导致不兼容的变化。

更多信息请参考:FLINK-38209[19]


Runtime 新特性和改进

均衡任务调度

Flink 2.2.0 引入了一种均衡的任务调度策略,以实现任务管理器的任务负载均衡并减少作业瓶颈。

更多信息请参考:FLINK-31757[20]、FLIP-370[21]


增强HistoryServer工作历史保留策略

在 Flink 2.2.0 版本前,HistoryServer 仅支持基于数量的作业归档保留策略,这不足以满足需要基于时间保留或组合规则的场景。用户在 Flink 2.2.0 可以使用新的配置项 historyserver.archive.retained-ttl 并结合 historyserver.archive.retained-jobs 来满足更多场景需求。

更多信息请参考:FLINK-38229[22]、FLIP-490[23]


Metrics

自 Flink 2.2.0 版本起,用户可以为作业中使用的每个算子/转换分配自定义指标变量。这些变量随后会被指标报告器转换为标签,允许用户为特定运算符的指标添加标签。例如,您可以使用此功能来命名和区分数据源。

用户现在可以通过 traces.checkpoint.span-detail-level[24] 控制检查点 span 的详细级别。最高级别会报告每个任务和子任务的 span 树。报告的自定义 span 现在可以包含子 span。更多详情请参阅 Traces[25]

更多信息请参考:FLINK-38158[26]、FLINK-38353[27]

Connector 框架优化

Scan数据源限流功能

Flink 作业频繁地与外部系统交换数据,这会消耗网络带宽和 CPU 资源。当这些资源稀缺时,过于频繁地拉取数据可能会干扰其他工作负载,例如 Kafka/MySQL CDC 连接器。在 Flink 2.2 中,我们引入了 RateLimiter 接口,为Scan数据源提供请求速率限制,连接器开发人员可以将其与限流框架集成,以实现自己的限流策略。此功能仅在 DataStream API 中可用。

更多信息请参考:FLINK-38497[28]、FLIP-535[29]


支持均匀分片

SplitEnumerator 负责分配分片工作,但它无法了解这些分片的实际运行时状态或分布情况。这使得 SplitEnumerator 无法保证分片均匀分布,并且极易出现数据倾斜。从 Flink 2.2 开始,SplitEnumerator 获得了分片分布信息,并提供了在运行时均匀分配分片的能力。例如,此功能可用于解决 Kafka 连接器中的数据倾斜问题。

更多信息请参考:FLINK-38564[30]、FLIP-537[31]

其他改进

PyFlink

在 Flink 2.2 中,我们为 Python DataStream API 添加了异步函数支持。这使得 Python 用户能够在 Flink 作业中高效地查询外部服务,例如通常部署在独立 GPU 集群中的 LLM 服务等。


此外,我们还提供了全面的支持,以确保外部服务访问的稳定性。一方面,我们支持限制发送到外部服务的并发请求数量,以避免服务过载。另一方面,我们也添加了重试机制,以应对可能由网络抖动或其他瞬态问题导致的临时服务不可用情况。

以下是一个简单的使用示例:

from typing import List
from ollama import AsyncClient

from pyflink.common import Types, Time, Row
from pyflink.datastream import (
    StreamExecutionEnvironment,
    AsyncDataStream,
    AsyncFunction,
    RuntimeContext,
    CheckpointingMode,
)


class AsyncLLMRequest(AsyncFunction[Row, str]):

    def __init__(self, host, port):
        self._host = host
        self._port = port

    def open(self, runtime_context: RuntimeContext):
        self._client = AsyncClient(host='{}:{}'.format(self._host, self._port))

    async def async_invoke(self, value: Row) -> List[str]:
        message = {"role": "user", "content": value.question}
        question_id = value.id
        ollam_response = await self._client.chat(model="qwen3:4b", messages=[message])
        return [
            f"Question ID {question_id}: response: {ollam_response['message']['content']}"
        ]

    def timeout(self, value: Row) -> List[str]:
        # return a default value in case timeout
        return [f"Timeout for this question: {value.a}"]


def main(output_path):
    env = StreamExecutionEnvironment.get_execution_environment()
    env.enable_checkpointing(30000, CheckpointingMode.EXACTLY_ONCE)
    ds = env.from_collection(
        [
            ("Who are you?", 0),
            ("Tell me a joke", 1),
            ("Tell me the result of comparing 0.8 and 0.11", 2),
        ],
        type_info=Types.ROW_NAMED(["question", "id"], [Types.STRING(), Types.INT()]),
    )

    result_stream = AsyncDataStream.unordered_wait(
        data_stream=ds,
        async_function=AsyncLLMRequest(),
        timeout=Time.seconds(100),
        capacity=1000,
        output_type=Types.STRING(),
    )

    # define the sink
    result_stream.print()

    # submit for execution
    env.execute()


if __name__ == "__main__":
    main(known_args.output)

更多信息请参考:FLINK-38190[32]

升级 commons-lang3 依赖到 3.18.0

将 commons-lang3 从 3.12.0 升级到 3.18.0 以解决 CVE-2025-48924。

更多信息请参考:FLINK-38193[33]

protobuf-java 从 3.x 升级到 4.32.1

Flink 2.2 从 protobuf-java 3.21.7(Protocol Buffers 版本 21)升级到 protobuf-java 4.32.1(对应 Protocol Buffers 版本 32)。此次升级实现了以下功能:

  • Protobuf 版本支持:完全支持 Protocol Buffers v27 及更高版本中引入的 edition = "2023"edition = "2024" 语法。版本提供了一种统一的方法,将 proto2 和 proto3 的功能与细粒度的特性控制相结合。
  • 改进 Proto3 字段存在性检查:更好地处理 proto3 中的可选字段,不再受限于旧版 protobuf 的限制,无需将 protobuf.read-default-values 设置为 true 来进行字段存在性检查。
  • 性能提升:利用了 11 个 Protocol Buffers 版本(版本 22-32)中的性能改进和错误修复。
  • 现代 Protobuf 特性:可访问更新的 protobuf 功能,包括 Edition 2024 特性和改进的运行时行为。

使用 proto2 和 proto3 .proto 文件的用户可以兼容使用,无需更改。

更多信息请参考:FLINK-38547[34]

升级注意事项

Flink 社区致力于确保版本升级过程尽可能顺畅。但某些变更可能需要用户在升级到 2.2 版本时,对程序的某些部分进行调整。请参阅发版说明[35]以获取升级过程中需要进行的调整和需要检查的全部注意事项。

相关链接:

[1] https://issues.apache.org/jira/browse/FLINK-38104

[2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-526%3A+Model+ML_PREDICT%2C+ML_EVALUATE+Table+API

[3] https://issues.apache.org/jira/browse/FLINK-38422

[4] https://cwiki.apache.org/confluence/display/FLINK/FLIP-540%3A+Support+VECTOR_SEARCH+in+Flink+SQL

[5] https://nightlies.apache.org/flink/flink-docs-release-2.2/docs/dev/table/sql/queries/vector-search/

[6] https://nightlies.apache.org/flink/flink-docs-release-2.2/docs/dev/table/materialized-table/overview/

[7] https://issues.apache.org/jira/browse/FLINK-38532

[8] https://issues.apache.org/jira/browse/FLINK-38311

[9] https://cwiki.apache.org/confluence/display/FLINK/FLIP-542%3A+Make+materialized+table+DDL+consistent+with+regular+tables

[10] https://cwiki.apache.org/confluence/display/FLINK/FLIP-551%3A+Make+FRESHNESS+Optional+for+Materialized+Tables

[11] https://issues.apache.org/jira/browse/FLINK-38459

[12] https://cwiki.apache.org/confluence/display/FLINK/FLIP-544%3A+SinkUpsertMaterializer+V2

[13] https://fluss.apache.org/blog/fluss-open-source/

[14] https://fluss.apache.org/docs/engine-flink/delta-joins/

[15] https://nightlies.apache.org/flink/flink-docs-release-2.2/docs/dev/table/tuning/#delta-joins

[16] https://fluss.apache.org/docs/engine-flink/delta-joins/

[17] https://issues.apache.org/jira/browse/FLINK-20539

[18] https://issues.apache.org/jira/browse/FLINK-38181

[19] https://issues.apache.org/jira/browse/FLINK-38209

[20] https://issues.apache.org/jira/browse/FLINK-38229

[21] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=332499857

[22] https://issues.apache.org/jira/browse/FLINK-38229

[23] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=332499857

[24] https://nightlies.apache.org/flink/flink-docs-release-2.2/docs/deployment/config/#traces-checkpoint-span-detail-level

[25] https://nightlies.apache.org/flink/flink-docs-release-2.2/docs/ops/traces/

[26] https://issues.apache.org/jira/browse/FLINK-38158

[27] https://issues.apache.org/jira/browse/FLINK-38353

[28] https://issues.apache.org/jira/browse/FLINK-38497

[29] https://cwiki.apache.org/confluence/display/FLINK/FLIP-535%3A+Introduce+RateLimiter+to+Source

[30] https://issues.apache.org/jira/browse/FLINK-38564

[31] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=373886480

[32] https://issues.apache.org/jira/browse/FLINK-38190

[33] https://issues.apache.org/jira/browse/FLINK-38193

[34] https://issues.apache.org/jira/browse/FLINK-38547

[35] https://nightlies.apache.org/flink/flink-docs-release-2.2/release-notes/flink-2.2/

相关文章
|
6月前
|
SQL 人工智能 JSON
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
本文整理自阿里云的高级技术专家、Apache Flink PMC 成员李麟老师在 Flink Forward Asia 2025 新加坡[1]站 —— 实时 AI 专场中的分享。将带来关于 Flink 2.1 版本中 SQL 在实时数据处理和 AI 方面进展的话题。
413 0
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
|
3月前
|
存储 SQL 缓存
Delta Join:为超大规模流处理实现计算与历史数据解耦
Delta Join(FLIP-486)是Flink流式Join的范式革新,通过将历史数据存储与计算解耦,实现按需查询外部存储(如Fluss、Paimon),避免状态无限增长。它解决了传统Join在高基数场景下的状态爆炸问题,显著降低资源消耗:状态减少50TB,成本降10倍,Checkpoint从小时级缩短至秒级,恢复速度提升87%。兼容标准SQL,自动优化转换,适用于海量数据实时关联场景,推动流处理迈向高效、稳定、可扩展的新阶段。
477 1
Delta Join:为超大规模流处理实现计算与历史数据解耦
|
5月前
|
人工智能 运维 Java
Flink Agents:基于Apache Flink的事件驱动AI智能体框架
本文基于Apache Flink PMC成员宋辛童在Community Over Code Asia 2025的演讲,深入解析Flink Agents项目的技术背景、架构设计与应用场景。该项目聚焦事件驱动型AI智能体,结合Flink的实时处理能力,推动AI在工业场景中的工程化落地,涵盖智能运维、直播分析等典型应用,展现其在AI发展第四层次——智能体AI中的重要意义。
1904 27
Flink Agents:基于Apache Flink的事件驱动AI智能体框架
|
3月前
|
存储 SQL 缓存
Delta Join:为超大规模流处理实现计算与历史数据解耦
Delta Join(FLIP-486)是Flink流式Join的范式革新,通过将计算与历史数据解耦,避免传统Join因存储全量状态导致的资源爆炸。它采用无状态查询机制,按需从Fluss或Paimon等外部存储获取数据,显著降低状态大小、Checkpoint时间及恢复成本。实测中消除50TB状态,资源消耗降10倍,CPU内存节省超80%,作业恢复提速87%。兼容标准SQL,自动优化转换,适用于高基数流式关联、实时审计等场景,标志着大规模流处理进入高效稳定新阶段。
174 0
|
6月前
|
存储 人工智能 数据处理
对话王峰:Apache Flink 在 AI 时代的“剑锋”所向
Flink 2.0 架构升级实现存算分离,迈向彻底云原生化,支持更大规模状态管理、提升资源效率、增强容灾能力。通过流批一体与 AI 场景融合,推动实时计算向智能化演进。生态项目如 Paimon、Fluss 和 Flink CDC 构建湖流一体架构,实现分钟级时效性与低成本平衡。未来,Flink 将深化 AI Agents 框架,引领事件驱动的智能数据处理新方向。
695 6
|
11月前
|
SQL 存储 人工智能
Apache Flink 2.0.0: 实时数据处理的新纪元
Apache Flink 2.0.0 正式发布!这是自 Flink 1.0 发布九年以来的首次重大更新,凝聚了社区两年的努力。此版本引入分离式状态管理、物化表、流批统一等创新功能,优化云原生环境下的资源利用与性能表现,并强化了对人工智能工作流的支持。同时,Flink 2.0 对 API 和配置进行了全面清理,移除了过时组件,为未来的发展奠定了坚实基础。感谢 165 位贡献者的辛勤付出,共同推动实时计算进入新纪元!
1299 1
Apache Flink 2.0.0: 实时数据处理的新纪元
|
6月前
|
存储 人工智能 分布式计算
数据不用搬,AI直接炼!阿里云AnalyticDB AI数据湖仓一站式融合AI+BI
阿里云瑶池旗下的云原生数据仓库AnalyticDB MySQL版(以下简称ADB)诞生于高性能实时数仓时代,实现了PB级结构化数据的高效处理和分析。在前几年,为拥抱大数据的浪潮,ADB从传统数仓拓展到数据湖仓,支持Paimon/Iceberg/Delta Lake/Hudi湖格式,为开放的数据湖提供数据库级别的性能、可靠性和管理能力,从而更好地服务以SQL为核心的大规模数据处理和BI分析,奠定了坚实的湖仓一体基础。
|
SQL Java API
Apache Flink 2.0-preview released
Apache Flink 社区正积极筹备 Flink 2.0 的发布,这是自 Flink 1.0 发布以来的首个重大更新。Flink 2.0 将引入多项激动人心的功能和改进,包括存算分离状态管理、物化表、批作业自适应执行等,同时也包含了一些不兼容的变更。目前提供的预览版旨在让用户提前尝试新功能并收集反馈,但不建议在生产环境中使用。
1672 13
Apache Flink 2.0-preview released