Peeking into Apache Flink's Engine Room

Join Processing in Apache Flink

In this blog post, we cut through Apache Flink’s layered architecture and take a look at its internals with a focus on how it handles joins. Specifically, I will

  • show how easy it is to join data sets using Flink’s fluent APIs,
  • discuss basic distributed join strategies, Flink’s join implementations, and its memory management,
  • talk about Flink’s optimizer that automatically chooses join strategies,
  • show some performance numbers for joining data sets of different sizes, and finally
  • briefly discuss joining of co-located and pre-sorted data sets.

 

How do I join with Flink?

Flink provides fluent APIs in Java and Scala to write data flow programs.

```scala
// brings DataSet and the implicit type information into scope
import org.apache.flink.api.scala._

// define your data types
case class PageVisit(url: String, ip: String, userId: Long)
case class User(id: Long, name: String, email: String, country: String)

// get your data from somewhere
val visits: DataSet[PageVisit] = ...
val users: DataSet[User] = ...

// filter the users data set, keeping only users from Germany
val germanUsers = users.filter(u => u.country == "de")
// join data sets on the equi-join condition (PageVisit.userId = User.id)
val germanVisits: DataSet[(PageVisit, User)] =
  visits.join(germanUsers).where("userId").equalTo("id")
```

As you can see, implementing a join with the Flink API is fairly simple.

 

How does Flink join my data?

Flink executes a join in two phases:

  • Ship Strategy
  • Local Strategy

 

Ship Strategy

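In a distributed system the data is scattered across partitions. Before it can be joined, all records with the same join key have to be brought together in one place. How this is done is called the ship strategy.

Flink has two ship strategies: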

Repartition-Repartition strategy (RR)

The Repartition-Repartition strategy partitions both inputs, R and S, on their join key attributes using the same partitioning function.

[Figure: Repartition-Repartition strategy]

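For example, to join R and S, the same partitioning function is applied to the join key of both inputs, and every partition of R and S is shuffled to three local join instances.

This requires a full shuffle of both data sets over the network.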
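The repartitioning step can be sketched with plain Scala collections. This is only an illustration of the idea, not Flink's implementation: `RepartitionSketch`, `partitionOf`, and `repartition` are hypothetical names, and the partitioning function is a simple modulo on a `Long` key.

```scala
object RepartitionSketch {
  // Hypothetical partitioning function: maps a join key to one of n join instances.
  def partitionOf(key: Long, n: Int): Int =
    java.lang.Math.floorMod(key, n.toLong).toInt

  // Redistributes records that are spread over arbitrary source partitions
  // so that records with equal keys end up in the same target partition.
  def repartition[T](sourcePartitions: Seq[Seq[T]], n: Int)(key: T => Long): Vector[Vector[T]] = {
    val all = sourcePartitions.flatten
    Vector.tabulate(n)(p => all.filter(t => partitionOf(key(t), n) == p).toVector)
  }
}
```

Because R and S go through the same `partitionOf`, a record of R and a record of S with the same key always land at the same partition index, so each join instance can work on its pair of partitions independently.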

 

Broadcast-Forward strategy (BF)

The Broadcast-Forward strategy sends one complete data set (R) to each parallel instance that holds a partition of the other data set (S).

[Figure: Broadcast-Forward strategy]

S stays where it is, and R is broadcast to every join instance. Clearly, this is very efficient if R is small enough.
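A minimal sketch of the same idea in plain Scala, again with hypothetical names. The two `shippedRecords` functions are a rough upper-bound assumption that counts records rather than bytes; they are not Flink's cost formulas.

```scala
object BroadcastForwardSketch {
  // S keeps its existing partitioning; every join instance receives a full copy of R.
  def broadcastForward[R, S](r: Seq[R], sPartitions: Seq[Seq[S]]): Seq[(Seq[R], Seq[S])] =
    sPartitions.map(sPart => (r, sPart))

  // Rough upper bounds on the number of records sent over the network.
  def shippedRecordsBF(rSize: Long, parallelism: Int): Long = rSize * parallelism
  def shippedRecordsRR(rSize: Long, sSize: Long): Long = rSize + sSize
}
```

Under this assumption, broadcasting R pays off when the size of R times the number of instances is smaller than the combined size of R and S, which is why the strategy suits a small R.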

 

Flink’s Memory Management

With joins, the ship phase has to shuffle large amounts of data, so the obvious questions are whether this will end in an OutOfMemoryError or trigger full garbage collections.

Flink handles this challenge by actively managing its memory. 
When a worker node (TaskManager) is started, it allocates a fixed portion (70% by default) of the JVM’s heap memory that is available after initialization as 32KB byte arrays.

This design has several nice properties.

First, the number of data objects on the JVM heap is much lower, resulting in less garbage collection pressure.

Second, objects on the heap have a certain space overhead and the binary representation is more compact. Especially data sets of many small elements benefit from that.

Third, an algorithm knows exactly when the input data exceeds its working memory and can react by writing some of its filled byte arrays to the worker’s local filesystem. After the content of a byte array is written to disk, it can be reused to process more data. Reading data back into memory is as simple as reading the binary data from the local filesystem.

The following figure illustrates Flink’s memory management.

[Figure: Flink's memory management]

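Flink actively manages the JVM heap: it allocates a set of 32KB memory segments and hands them out to its algorithms.

The benefits are less GC pressure, little overhead because the data is stored serialized in binary form, and, because Flink manages the memory itself, the ability to detect when memory runs out and to spill to disk.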
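The idea of a fixed pool of byte arrays that are filled, spilled, and reused can be sketched as follows. This is a toy model under stated assumptions, not Flink's memory manager: `SegmentPool` and `SpillingWriter` are hypothetical names, records are plain `Long` values, and the "disk" is an in-memory buffer standing in for a local file.

```scala
import java.nio.ByteBuffer
import scala.collection.mutable

object MemorySegmentSketch {
  val SegmentSize: Int = 32 * 1024 // 32KB, as in the text

  // A fixed pool of pre-allocated byte arrays; nothing is allocated after start-up.
  final class SegmentPool(numSegments: Int) {
    private val free = mutable.Queue.fill(numSegments)(new Array[Byte](SegmentSize))
    def request(): Option[Array[Byte]] = if (free.isEmpty) None else Some(free.dequeue())
    def release(segment: Array[Byte]): Unit = free.enqueue(segment)
    def available: Int = free.size
  }

  // Writes Long records in binary form into segments. When the pool is exhausted,
  // the oldest filled segment is "spilled" (copied to `disk`) and then reused.
  // Assumes the pool holds at least one segment.
  final class SpillingWriter(pool: SegmentPool) {
    private val filled = mutable.Queue.empty[Array[Byte]]
    val disk = mutable.ArrayBuffer.empty[Array[Byte]] // stand-in for a local file
    private var current: ByteBuffer = ByteBuffer.wrap(nextSegment())

    private def nextSegment(): Array[Byte] = pool.request().getOrElse {
      val victim = filled.dequeue()
      disk += victim.clone() // a real system would write these bytes to a file
      victim
    }

    def write(value: Long): Unit = {
      if (current.remaining() < java.lang.Long.BYTES) {
        filled.enqueue(current.array())
        current = ByteBuffer.wrap(nextSegment())
      }
      current.putLong(value)
    }
  }
}
```

The writer knows exactly when it has run out of segments, which is the property the text highlights: instead of failing, it spills a filled segment and keeps going with the same fixed amount of memory.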

 

Local Strategies

After the data has been distributed across all parallel join instances using either a Repartition-Repartition or Broadcast-Forward ship strategy, each instance runs a local join algorithm to join the elements of its local partition.

Flink’s runtime features two common join strategies to perform these local joins:

  • the Sort-Merge-Join strategy (SM) and
  • the Hybrid-Hash-Join strategy (HH).

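The ship strategy has already brought records with the same join key onto the same local partition, so each instance now only has to run a local join algorithm.

Flink has two local join algorithms, described in turn below.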

The Sort-Merge-Join works by first sorting both input data sets on their join key attributes (Sort Phase) and merging the sorted data sets as a second step (Merge Phase).

The sort is done in-memory if the local partition of a data set is small enough. Otherwise, an external merge-sort is done by collecting data until the working memory is filled, sorting it, writing the sorted data to the local filesystem, and starting over by filling the working memory again with more incoming data. After all input data has been received, sorted, and written as sorted runs to the local file system, a fully sorted stream can be obtained. This is done by reading the partially sorted runs from the local filesystem and sort-merging the records on the fly.

Once the sorted streams of both inputs are available, both streams are sequentially read and merge-joined in a zig-zag fashion by comparing the sorted join key attributes, building join element pairs for matching keys, and advancing the sorted stream with the lower join key.

The figure below shows how the Sort-Merge-Join strategy works.

[Figure: Sort-Merge-Join strategy]

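As the name suggests, this algorithm has two steps: sort and merge.

First, the sort. If the data is small enough, it is sorted directly in memory, which is simple. If the data is larger, an external sort is needed. For example, if memory can sort 1 GB of data and the input is 3 GB, the algorithm reads 1 GB at a time, sorts it, and writes it to a file, which eventually produces three locally sorted files, as in the figure. To read the data back, all three files are opened at once and merged on the fly, which yields one globally sorted stream.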
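The external sort described above can be sketched in a few lines. This is an illustration only: `ExternalSortSketch` is a hypothetical name, the sorted runs are kept in memory as vectors where a real implementation would write them to files, and `memoryCapacity` counts records rather than bytes.

```scala
import scala.collection.mutable

object ExternalSortSketch {
  // Sort phase: cut the input into runs of at most `memoryCapacity` records and sort each run.
  def sortedRuns(input: Iterator[Int], memoryCapacity: Int): Vector[Vector[Int]] =
    input.grouped(memoryCapacity).map(_.sorted.toVector).toVector

  // Merge phase: repeatedly emit the smallest head element among all runs.
  def mergeRuns(runs: Seq[Seq[Int]]): Vector[Int] = {
    // priority queue of (head value, run index), smallest value first
    val heads = mutable.PriorityQueue.empty[(Int, Int)](Ordering[(Int, Int)].reverse)
    val iters = runs.map(_.iterator).toVector
    for ((it, i) <- iters.zipWithIndex if it.hasNext) heads.enqueue((it.next(), i))
    val out = Vector.newBuilder[Int]
    while (heads.nonEmpty) {
      val (value, i) = heads.dequeue()
      out += value
      if (iters(i).hasNext) heads.enqueue((iters(i).next(), i))
    }
    out.result()
  }
}
```

Only one element per run has to be held in memory during the merge, which is why the merged stream can be produced on the fly regardless of the total input size.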

Then, the merge. Joining two inputs that are already sorted is straightforward.
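A minimal sketch of the merge phase on two key-sorted inputs, assuming `Int` join keys and inputs that fit in an `IndexedSeq`. `MergeJoinSketch` is a hypothetical name; the zig-zag advance follows the description above, with explicit handling of duplicate keys on both sides.

```scala
object MergeJoinSketch {
  // Joins two inputs that are already sorted by key.
  def mergeJoin[A, B](left: IndexedSeq[(Int, A)], right: IndexedSeq[(Int, B)]): Vector[(A, B)] = {
    val out = Vector.newBuilder[(A, B)]
    var i = 0
    var j = 0
    while (i < left.length && j < right.length) {
      val lk = left(i)._1
      val rk = right(j)._1
      if (lk < rk) i += 1        // advance the stream with the lower key
      else if (lk > rk) j += 1
      else {
        // find the end of the run of equal keys on each side
        var iEnd = i
        while (iEnd < left.length && left(iEnd)._1 == lk) iEnd += 1
        var jEnd = j
        while (jEnd < right.length && right(jEnd)._1 == lk) jEnd += 1
        // emit every pair of the two runs
        for (a <- i until iEnd; b <- j until jEnd) out += ((left(a)._2, right(b)._2))
        i = iEnd
        j = jEnd
      }
    }
    out.result()
  }
}
```

Each input is read once from front to back, so the merge needs no hash table and no random access beyond the current run of equal keys.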

 

The Hybrid-Hash-Join distinguishes its inputs as build-side and probe-side input and works in two phases, a build phase followed by a probe phase.

In the build phase, the algorithm reads the build-side input and inserts all data elements into an in-memory hash table indexed by their join key attributes. If the hash table outgrows the algorithm’s working memory, parts of the hash table (ranges of hash indexes) are written to the local filesystem. The build phase ends after the build-side input has been fully consumed.

In the probe phase, the algorithm reads the probe-side input and probes the hash table for each element using its join key attribute. If the element falls into a hash index range that was spilled to disk, the element is also written to disk. Otherwise, the element is immediately joined with all matching elements from the hash table. If the hash table completely fits into the working memory, the join is finished after the probe-side input has been fully consumed. Otherwise, the current hash table is dropped and a new hash table is built using spilled parts of the build-side input.

This hash table is probed by the corresponding parts of the spilled probe-side input. Eventually, all data is joined. Hybrid-Hash-Joins perform best if the hash table completely fits into the working memory because an arbitrarily large probe-side input can be processed on-the-fly without materializing it. However, even if the build-side input does not fit into memory, the Hybrid-Hash-Join has very nice properties. In this case, in-memory processing is partially preserved and only a fraction of the build-side and probe-side data needs to be written to and read from the local filesystem. The next figure illustrates how the Hybrid-Hash-Join works.

 

[Figure: Hybrid-Hash-Join strategy]

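This algorithm treats its two inputs differently, as a build input and a probe input.

The build input is used to build a hash table keyed on the join key. If this hash table is small enough, the rest is simple: once the hash table is built, the algorithm iterates over the probe input and emits a join result for every record that finds a match in the hash table. When the build input is small enough, this is very efficient, because the probe input never has to be held in memory; each record is read, processed, and dropped.

But what if the build input is large and the whole hash table does not fit in memory? The answer is also simple: when memory runs out, parts of the hash table, organized by hash index range, are written to disk. While iterating over the probe input, any probe record whose part of the hash table is on disk is written to disk for now as well.

Finally, once the probe input has been fully read, the in-memory part of the hash table has finished joining and is dropped, and the spilled parts of the hash table are loaded from disk to complete the join.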
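The build, probe, and spill steps can be sketched with plain Scala collections. This is a simplified model under several assumptions, not Flink's implementation: `HybridHashJoinSketch` is a hypothetical name, "disk" is an in-memory map, `memoryBudget` counts build records rather than bytes, the largest in-memory bucket is chosen as the spill victim, and every spilled bucket is assumed to fit into memory when it is reloaded.

```scala
import scala.collection.mutable

object HybridHashJoinSketch {
  // numBuckets: number of hash index ranges; memoryBudget: max build records held in memory.
  def join[A, B](build: Seq[(Int, A)], probe: Seq[(Int, B)],
                 numBuckets: Int, memoryBudget: Int): Vector[(A, B)] = {
    def bucketOf(key: Int): Int = java.lang.Math.floorMod(key, numBuckets)
    def toTable(recs: Iterable[(Int, A)]): Map[Int, Vector[A]] =
      recs.groupBy(_._1).map { case (k, v) => k -> v.map(_._2).toVector }
    val out = Vector.newBuilder[(A, B)]

    // Build phase: insert build records, spilling whole buckets when over budget.
    val inMemory = mutable.Map.empty[Int, Vector[(Int, A)]]
    val spilledBuild = mutable.Map.empty[Int, Vector[(Int, A)]] // stand-in for disk
    var inMemoryCount = 0
    for (rec <- build) {
      val b = bucketOf(rec._1)
      if (spilledBuild.contains(b)) spilledBuild(b) = spilledBuild(b) :+ rec
      else {
        inMemory(b) = inMemory.getOrElse(b, Vector.empty) :+ rec
        inMemoryCount += 1
        if (inMemoryCount > memoryBudget) {
          val (victim, recs) = inMemory.maxBy(_._2.size)
          inMemory.remove(victim)
          spilledBuild(victim) = recs
          inMemoryCount -= recs.size
        }
      }
    }

    // Probe phase: join immediately if the bucket is in memory, otherwise spill the probe record.
    val table = toTable(inMemory.values.flatten)
    val spilledProbe = mutable.Map.empty[Int, Vector[(Int, B)]]
    for (rec <- probe) {
      val b = bucketOf(rec._1)
      if (spilledBuild.contains(b)) spilledProbe(b) = spilledProbe.getOrElse(b, Vector.empty) :+ rec
      else for (a <- table.getOrElse(rec._1, Vector.empty)) out += ((a, rec._2))
    }

    // Drop the in-memory table, then rebuild and probe one spilled bucket at a time.
    for ((b, buildRecs) <- spilledBuild.toSeq.sortBy(_._1)) {
      val t = toTable(buildRecs)
      for (rec <- spilledProbe.getOrElse(b, Vector.empty); a <- t.getOrElse(rec._1, Vector.empty))
        out += ((a, rec._2))
    }
    out.result()
  }
}
```

With a budget large enough for the whole build side nothing is spilled and the probe input streams straight through; with a smaller budget only the records of spilled buckets take the detour through "disk".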

 

How does Flink choose join strategies?

Ship and local strategies do not depend on each other and can be independently chosen. 
Therefore, Flink can execute a join of two data sets R and S in nine different ways by combining any of the three ship strategies (RR, BF with R being broadcast, BF with S being broadcast) with any of the three local strategies (SM, HH with R being build-side, HH with S being build-side).

Flink features a cost-based optimizer which automatically chooses the execution strategies for all operators including joins. 
Without going into the details of cost-based optimization, this is done by computing cost estimates for execution plans with different strategies and picking the plan with the lowest estimated cost.
To do so, the optimizer estimates the amount of data that is shipped over the network and written to disk.
If no reliable size estimates for the input data can be obtained, the optimizer falls back to robust default choices.
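The enumeration of the nine plans and the pick-the-cheapest step can be sketched as follows. The cost model here is a toy assumption made up for illustration (record counts for network traffic plus a crude all-or-nothing spill estimate); it is not the formula Flink's optimizer uses, and all names in `JoinPlanSketch` are hypothetical.

```scala
object JoinPlanSketch {
  sealed trait Ship
  case object RepartitionBoth extends Ship
  case object BroadcastR extends Ship
  case object BroadcastS extends Ship

  sealed trait Local
  case object SortMerge extends Local
  case object HashBuildR extends Local
  case object HashBuildS extends Local

  final case class Plan(ship: Ship, local: Local)

  // 3 ship strategies x 3 local strategies = 9 candidate plans
  val allPlans: Seq[Plan] = for {
    s <- Seq(RepartitionBoth, BroadcastR, BroadcastS)
    l <- Seq(SortMerge, HashBuildR, HashBuildS)
  } yield Plan(s, l)

  // Toy cost model (an assumption, not Flink's): records shipped plus records spilled.
  def cost(plan: Plan, rSize: Long, sSize: Long, parallelism: Int, memPerInstance: Long): Long = {
    val network = plan.ship match {
      case RepartitionBoth => rSize + sSize
      case BroadcastR      => rSize * parallelism
      case BroadcastS      => sSize * parallelism
    }
    // size of each input as seen by a single join instance
    val (localR, localS) = plan.ship match {
      case RepartitionBoth => (rSize / parallelism, sSize / parallelism)
      case BroadcastR      => (rSize, sSize / parallelism)
      case BroadcastS      => (rSize / parallelism, sSize)
    }
    def spill(size: Long): Long = if (size > memPerInstance) size else 0L
    val diskPerInstance = plan.local match {
      case SortMerge  => spill(localR) + spill(localS)
      case HashBuildR => spill(localR)
      case HashBuildS => spill(localS)
    }
    network + diskPerInstance * parallelism
  }

  def choose(rSize: Long, sSize: Long, parallelism: Int, memPerInstance: Long): Plan =
    allPlans.minBy(cost(_, rSize, sSize, parallelism, memPerInstance))
}
```

For a small R and a large S, for example `JoinPlanSketch.choose(1000L, 1000000L, 4, 10000L)`, this toy model favors broadcasting R and building the hash table on R, which matches the intuition behind the Broadcast-Forward strategy.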

A key feature of the optimizer is to reason about existing data properties.

For example, if the data of one input is already partitioned in a suitable way, the generated candidate plans will not repartition this input. Hence, the choice of an RR ship strategy becomes more likely. The same applies to previously sorted data and the Sort-Merge-Join strategy. Flink programs can help the optimizer to reason about existing data properties by providing semantic information about user-defined functions [4].

While the optimizer is a killer feature of Flink, it can happen that a user knows better than the optimizer how to execute a specific join. Similar to relational database systems, Flink offers optimizer hints to tell the optimizer which join strategies to pick [5].

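In summary, ship and local strategies can be chosen independently of each other. Flink's optimizer picks the strategies automatically: it computes a cost estimate for each candidate and selects the one with the lowest estimated cost.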
