Peeking into Apache Flink's Engine Room

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介:

Join Processing in Apache Flink

In this blog post, we cut through Apache Flink’s layered architecture and take a look at its internals with a focus on how it handles joins. Specifically, I will

  • show how easy it is to join data sets using Flink’s fluent APIs,
  • discuss basic distributed join strategies, Flink’s join implementations, and its memory management,
  • talk about Flink’s optimizer that automatically chooses join strategies,
  • show some performance numbers for joining data sets of different sizes, and finally
  • briefly discuss joining of co-located and pre-sorted data sets

这篇blog会从flink内部详细看看是如何处理join的,尤其,

如何用Flink API,简单的实现join 
讨论基本的join策略,flink join的实现和内存管理 
讨论flink的优化器,如何自动选择join策略 
显示不同数据size上的性能数据, 
最后简单讨论一下co-located和预排序的数据集的join问题

 

How do I join with Flink?

Flink provides fluent APIs in Java and Scala to write data flow programs.

复制代码
// define your data types
case class PageVisit(url: String, ip: String, userId: Long)
case class User(id: Long, name: String, email: String, country: String)

// get your data from somewhere
val visits: DataSet[PageVisit] = ...
val users: DataSet[User] = ...

// filter the users data set
val germanUsers = users.filter((u) => u.country.equals("de"))
// join data sets
val germanVisits: DataSet[(PageVisit, User)] =
      // equi-join condition (PageVisit.userId = User.id)
     visits.join(germanUsers).where("userId").equalTo("id")
复制代码

可以看到用flink api实现join还是比较简单的

 

How does Flink join my data?

Flink如何做join,分两个阶段,

Ship Strategy

Local Strategy

 

Ship Strategy

对于分布式数据,数据是散落在各个partition中的,要做join,首先要把相同join key的数据放到一起,这个过程称为Ship Strategy

Flink的Ship Strategy有两种,

Repartition-Repartition strategy (RR)

The Repartition-Repartition strategy partitions both inputs, R and S, on their join key attributes using the same partitioning function.

image

比如,R,S做join,我们就用一个相同partition函数,按join key,把R,S的所有分区,shuffle到3个local join上

这样做需要把full shuffle of both data sets over the network

 

Broadcast-Forward strategy (BF)

The Broadcast-Forward strategy sends one complete data set (R) to each parallel instance that holds a partition of the other data set (S)

image

S不动,把R广播到所有的join实例上;显然如果R足够小,这样做是很有效的

 

Flink’s Memory Management

对join的场景,很容易想到,在ship阶段,需要shuffle大量的数据,内存是否会OutOfMemoryException,或是否会发生full gc

Flink handles this challenge by actively managing its memory. 
When a worker node (TaskManager) is started, it allocates a fixed portion (70% by default) of the JVM’s heap memory that is available after initialization as 32KB byte arrays.

This design has several nice properties.

First, the number of data objects on the JVM heap is much lower resulting in less garbage collection pressure.

Second, objects on the heap have a certain space overhead and the binary representation is more compact. Especially data sets of many small elements benefit from that.

Third, an algorithm knows exactly when the input data exceeds its working memory and can react by writing some of its filled byte arrays to the worker’s local filesystem. After the content of a byte array is written to disk, it can be reused to process more data. Reading data back into memory is as simple as reading the binary data from the local filesystem.

The following figure illustrates Flink’s memory management.

image

Flink是主动管理JVM的heap内存的,会申请一组32KB的memory segments,给各种算法用

好处就是,这样减少GC的压力;而且数据是序列化成binary后存储,overhead很小;自己管理内存,内存不够的时候,可以知道并写磁盘文件

 

Local Strategies

After the data has been distributed across all parallel join instances using either a Repartition-Repartition or Broadcast-Forward ship strategy, each instance runs a local join algorithm to join the elements of its local partition.

Flink’s runtime features two common join strategies to perform these local joins:

  • the Sort-Merge-Join strategy (SM) and
  • the Hybrid-Hash-Join strategy (HH).

在ship strategy,我们已经把相同join key的数据,放到同一个local partition上,现在要做的只是run一个local join算法

Flink有两种local join算法,

The Sort-Merge-Join works by first sorting both input data sets on their join key attributes (Sort Phase) and merging the sorted data sets as a second step (Merge Phase).

The sort is done in-memory if the local partition of a data set is small enough. 
Otherwise, an external merge-sort is done by collecting data until the working memory is filled, sorting it, writing the sorted data to the local filesystem, and starting over by filling the working memory again with more incoming data. After all input data has been received, sorted, and written as sorted runs to the local file system, a fully sorted stream can be obtained. This is done by reading the partially sorted runs from the local filesystem and sort-merging the records on the fly. Once the sorted streams of both inputs are available, both streams are sequentially read and merge-joined in a zig-zag fashion by comparing the sorted join key attributes, building join element pairs for matching keys, and advancing the sorted stream with the lower join key. The figure below shows how the Sort-Merge-Join strategy works.

image

这个算法,如其名,两个过程,sort,merge

首先是,sort,如果数据足够小,在内存中直接sort,这个比较简单 
如果数据比较大,需要用到外排算法,比如内存可以排序1G数据,而我有3G数据,那就每次只读1G,排序,存入一个文件,这样最终会产生3个已局部排序的文件,如图 
读的时候,同时打开3个文件,边读边merge就可以产生一个全局有序的stream

然后是,merge,对于两个已排序的inputs,做join很简单

 

The Hybrid-Hash-Join distinguishes its inputs as build-side and probe-side input and works in two phases, a build phase followed by a probe phase.

In the build phase, the algorithm reads the build-side input and inserts all data elements into an in-memory hash table indexed by their join key attributes. If the hash table outgrows the algorithm’s working memory, parts of the hash table (ranges of hash indexes) are written to the local filesystem. The build phase ends after the build-side input has been fully consumed.

In the probe phase, the algorithm reads the probe-side input and probes the hash table for each element using its join key attribute. If the element falls into a hash index range that was spilled to disk, the element is also written to disk. Otherwise, the element is immediately joined with all matching elements from the hash table. If the hash table completely fits into the working memory, the join is finished after the probe-side input has been fully consumed. Otherwise, the current hash table is dropped and a new hash table is built using spilled parts of the build-side input.

This hash table is probed by the corresponding parts of the spilled probe-side input. Eventually, all data is joined. Hybrid-Hash-Joins perform best if the hash table completely fits into the working memory because an arbitrarily large the probe-side input can be processed on-the-fly without materializing it. However even if build-side input does not fit into memory, the the Hybrid-Hash-Join has very nice properties. In this case, in-memory processing is partially preserved and only a fraction of the build-side and probe-side data needs to be written to and read from the local filesystem. The next figure illustrates how the Hybrid-Hash-Join works.

 

image

这个算法会把两个input,分成bulid input和probe input

其中build input会用于以join key来build一个hash table,如果这个hash table足够小,那么很简单 
当build完hash table后,我们只需要遍历probe input,如果落在hash table中,就做join 
这个方法,如果build input足够小,会非常高效,因为我们不需要在内存中存probe input,只需要读一条处理一条即可

但如果build input比较大,内存放不下整个hash table怎么办? 
也很简单,内存不够的时候,把部分hash table,以hash index range,存入磁盘 
这样当遍历probe input的时候,如果对应的hash table在磁盘,那么暂时把这部分probe input也存入磁盘

最后,当遍历完probe input后,内存中的hash table已经完成join,删掉,载入磁盘中的hash table完成最后的join

 

How does Flink choose join strategies?

Ship and local strategies do not depend on each other and can be independently chosen. 
Therefore, Flink can execute a join of two data sets R and S in nine different ways by combining any of the three ship strategies (RR, BF with R being broadcasted, BF with S being broadcasted) with any of the three local strategies (SM, HH with R being build-side, HH with S being build-side).

Flink features a cost-based optimizer which automatically chooses the execution strategies for all operators including joins. 
Without going into the details of cost-based optimization, this is done by computing cost estimates for execution plans with different strategies and picking the plan with the least estimated costs. 
Thereby, the optimizer estimates the amount of data which is shipped over the the network and written to disk. 
If no reliable size estimates for the input data can be obtained, the optimizer falls back to robust default choices.

A key feature of the optimizer is to reason about existing data properties.

For example, if the data of one input is already partitioned in a suitable way, the generated candidate plans will not repartition this input. Hence, the choice of a RR ship strategy becomes more likely. The same applies for previously sorted data and the Sort-Merge-Join strategy. Flink programs can help the optimizer to reason about existing data properties by providing semantic information about user-defined functions [4].

While the optimizer is a killer feature of Flink, it can happen that a user knows better than the optimizer how to execute a specific join. Similar to relational database systems, Flink offers optimizer hints to tell the optimizer which join strategies to pick [5].

总的来说,Ship和local的策略可以分开选择; 
Flink的optimizer会自动选择策略,根据就是,optimizer会对每个策略进行cost estimates,选择cost相对较小的策略

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
打赏
0
0
0
0
30498
分享
相关文章
The Past, Present and Future of Apache Flink
本文整理自阿里云开源大数据负责人王峰(莫问)在 Flink Forward Asia 2024 上海站主论坛开场的分享,今年正值 Flink 开源项目诞生的第 10 周年,借此时机,王峰回顾了 Flink 在过去 10 年的发展历程以及 Flink社区当前最新的技术成果,最后展望下一个十年 Flink 路向何方。
363 33
The Past, Present and Future of Apache Flink
|
3月前
|
Apache Flink 2.0-preview released
Apache Flink 社区正积极筹备 Flink 2.0 的发布,这是自 Flink 1.0 发布以来的首个重大更新。Flink 2.0 将引入多项激动人心的功能和改进,包括存算分离状态管理、物化表、批作业自适应执行等,同时也包含了一些不兼容的变更。目前提供的预览版旨在让用户提前尝试新功能并收集反馈,但不建议在生产环境中使用。
979 13
Apache Flink 2.0-preview released
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
155 3
Apache Doris Flink Connector 24.0.0 版本正式发布
该版本新增了对 Flink 1.20 的支持,并支持通过 Arrow Flight SQL 高速读取 Doris 中数据。
基于RabbitMQ与Apache Flink构建实时分析系统
【8月更文第28天】本文将介绍如何利用RabbitMQ作为数据源,结合Apache Flink进行实时数据分析。我们将构建一个简单的实时分析系统,该系统能够接收来自不同来源的数据,对数据进行实时处理,并将结果输出到另一个队列或存储系统中。
350 2
Apache Flink 流批融合技术介绍
本文源自阿里云高级研发工程师周云峰在Apache Asia Community OverCode 2024的分享,内容涵盖从“流批一体”到“流批融合”的演进、技术解决方案及社区进展。流批一体已在API、算子和引擎层面实现统一,但用户仍需手动配置作业模式。流批融合旨在通过动态调整优化策略,自动适应不同场景需求。文章详细介绍了如何通过量化指标(如isProcessingBacklog和isInsertOnly)实现这一目标,并展示了针对不同场景的具体优化措施。此外,还概述了社区当前进展及未来规划,包括将优化方案推向Flink社区、动态调整算子流程结构等。
464 31
Apache Flink 流批融合技术介绍
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
109 1
深入探讨Apache Flink:实时数据流处理的强大框架
在数据驱动时代,企业需高效处理实时数据流。Apache Flink作为开源流处理框架,以其高性能和灵活性成为首选平台。本文详细介绍Flink的核心特性和应用场景,包括实时流处理、强大的状态管理、灵活的窗口机制及批处理兼容性。无论在实时数据分析、金融服务、物联网还是广告技术领域,Flink均展现出巨大潜力,是企业实时数据处理的理想选择。随着大数据需求增长,Flink将继续在数据处理领域发挥重要作用。
303 0
从Apache Flink到Kafka再到Druid的实时数据传输,用于分析/决策
从Apache Flink到Kafka再到Druid的实时数据传输,用于分析/决策
110 0
驾驭复杂性:Spring Cloud在微服务构建中的决胜法则
【8月更文挑战第31天】Spring Cloud是在Spring Framework基础上打造的微服务解决方案,提供服务发现、配置管理、消息路由等功能,适用于构建复杂的微服务架构。本文介绍如何利用Spring Cloud搭建微服务,包括Eureka服务发现、Config Server配置管理和Zuul API网关等组件的配置与使用。通过Spring Cloud,可实现快速开发、自动化配置,并提升系统的伸缩性和容错性,尽管仍需面对分布式事务等挑战,但其强大的社区支持有助于解决问题。
99 0

推荐镜像

更多
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等