读Flink源码谈设计:Exactly Once

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 将Flink应用至生产已有一段时间,刚上生产的时候有幸排查过因数据倾斜引起的Checkpoint超时问题——当时简单的了解了相关机制,最近正好在读Flink源码,不如趁这个机会搞清楚。在这里,我们首先要搞清楚两种Exactly-Once的区别:- Exactly Once:在计算引擎内部,数据不丢失不重复。本质是通过Flink开启检查点进行Barrier对齐,即可做到。- End to End Exactly Once:这意味着从数据读取、引擎处理到写入外部存储的整个过程中,数据都是不丢失不重复的。这要求数据源可重放,写入端支持事务的恢复和回滚或幂等。

本文首发于泊浮目的语雀:https://www.yuque.com/17sing

版本 日期 备注
1.0 2022.2.2 文章首发
1.1 2022.2.14 更新3.4部分,增强注释部分
1.2 2022.2.27 更新3.6部分,删除部分对于1.14版本不适的描述
1.3 2022.3.8 fix typo

0.前言

将Flink应用至生产已有一段时间,刚上生产的时候有幸排查过因数据倾斜引起的Checkpoint超时问题——当时简单的了解了相关机制,最近正好在读Flink源码,不如趁这个机会搞清楚。

在这里,我们首先要搞清楚两种Exactly-Once的区别:

  • Exactly Once:在计算引擎内部,数据不丢失不重复。本质是通过Flink开启检查点进行Barrier对齐,即可做到。
  • End to End Exactly Once:这意味着从数据读取、引擎处理到写入外部存储的整个过程中,数据都是不丢失不重复的。这要求数据源可重放,写入端支持事务的恢复和回滚或幂等。

本文基于Flink 1.14代码进行分析。

1. 数据倾斜为什么会引起Checkpoint超时

做Checkpoint时算子会有一个barrier的对齐机制(为何一定要对齐后面会讲到)。以下图为例讲解对齐过程:

4207742-d36e179d0c4aeea0.png

当两条边下发barrier时,barrier1比barrier2先到达了算子,那么算子会将一条边输入的元素缓存起来,直到barrier2到了做Checkpoint以后才会下发元素。

每个算子对齐barrier后,会进行异步状态存储,然后下发barrier。每个算子做完Checkpoint时,会通知CheckpointCoordinator。当CheckpointCoordinator得知所有算子的Checkpoint都做完时,认为本次Checkpoint完成。

而在我们的应用程序中,有一个map算子接受了大量数据,导致barrier一直没有下发,最终整个Checkpoint超时。

2. Checkpoint的原理

其具体原理可以参考Flink团队的论文:Lightweight Asynchronous Snapshots for Distributed Dataflow。简单来说,早期流计算的容错方案都是周期性做全局状态的快照,但这有两个缺点:

  • 阻塞计算——做快照时是同步阻塞的。
  • 会将当前算子未处理以及正在处理的record一起做进快照,因此快照会变得特别大。

而Flink是基于Chandy-Lamport 算法来扩展的——该算法异步地执行快照,同时要求数据源可重放,但仍然会存储上游数据。而Flink的方案提出的方案在无环图中并不会存储数据。

在Flink中(无环有向图),会周期性的插入Barrier这个标记,告知下游算子开始做快照。这个算法基于以下前提:

  • 网络传输可靠,可以做到FIFO。这里会对算子进行blockedunblocked操作,如果一个算子是blocked,它会把从上游通道接收到的所有数据缓存起来,直接收到unblocked的信号才发送。
  • Task可以对它们的通道进行以下操作:block, unblock, send messages, broading messages
  • 对于Source节点来说,会被抽象成Nil输入通道。

3. Checkpoint的实现

在Flink中,做Checkpoint大致由以下几步组成:

  1. 可行性检查
  2. JobMaster通知Task触发检查点
  3. TaskExecutor执行检查点
  4. JobMaster确认检查点

接下来,让我们跟着源码来看一下里面的具体实现。

3.1 可行性检查

参考代码:CheckpointCoordinator#startTriggeringCheckpoint

  1. 确保作业不是处于关闭中或未启动的状态(见CheckpointPlanCalculator#calculateCheckpointPlan)。
  2. 生成新的CheckpointingID,并创建一个PendingCheckpoint——当所有Task都完成了Checkpoint,则会转换成一个CompletedCheckpoint。同时也会注册一个线程去关注是否有超时的情况,如果超时则会Abort当前的Checkpoint(见CheckpointPlanCalculator#createPendingCheckpoint)。
  3. 触发MasterHook。部分外部系统在触发检查点之前,需要做一些扩展逻辑,通过该实现MasterHook可以实现通知机制(见CheckpointPlanCalculator#snapshotMasterState)。
  4. 重复步骤1,没问题的话通知SourceStreamTask开始触发检查点(见CheckpointPlanCalculator#triggerCheckpointRequest)。

3.2 JobMaster通知Task触发检查点

CheckpointPlanCalculator#triggerCheckpointRequest中,会通过triggerTasks方法调用到Execution#triggerCheckpoint方法。Execution对应了一个Task实例,因此JobMaster可以通过里面的Slot引用找到其TaskManagerGateway,发送远程请求触发Checkpoint。

3.3 TaskManager执行检查点

TaskManager在代码中的体现为TaskExecutor。当JobMaster触发远程请求至TaskExecutor时,handle的方法为TaskExecutor#triggerCheckpoint,之后便会调用Task#triggerCheckpointBarrier来做:

  1. 做一些检查,比如Task是否是Running状态
  2. 触发Checkpoint:调用CheckpointableTask#triggerCheckpointAsync
  3. 执行检查点:CheckpointableTask#triggerCheckpointAsync。以StreamTask实现为例,这里会考虑上游已经Finish时如何触发下游Checkpoint的情况——通过塞入CheckpointBarrier来触发;如果任务没有结束,则调用StreamTask#triggerCheckpointAsyncInMailbox。最终都会走入SubtaskCheckpointCoordinator#checkpointState来触发Checkpoint。
  4. 算子保存快照:调用OperatorChain#broadcastEvent:保存OperatorState与KeyedState。
  5. 调用SubtaskCheckpointCoordinatorImpl#finishAndReportAsync,:异步的汇报当前快照已完成。

3.4 JobMaster确认检查点

|-- RpcCheckpointResponder
  \-- acknowledgeCheckpoint
|-- JobMaster
  \-- acknowledgeCheckpoint
|-- SchedulerBase
  \-- acknowledgeCheckpoint
|-- ExecutionGraphHandler
  \-- acknowledgeCheckpoint
|-- CheckpointCoordinator
  \-- receiveAcknowledgeMessage

在3.1中,我们提到过PendingCheckpoint。这里面维护了一些状来确保Task全部Ack、Master全部Ack。当确认完成后,CheckpointCoordinator将会通知所有的Checkpoint已经完成。

|-- CheckpointCoordinator
  \-- receiveAcknowledgeMessage
  \-- sendAcknowledgeMessages  //通知下游Checkpoint已经完成。如果Sink实现了TwoPhaseCommitSinkFunction,将会Commit;如果因为一些原因导致Commit没有成功,则会抛出一个FlinkRuntimeException,而pendingCommitTransactions中的将会继续保存失败的CheckpointId,当检查点恢复时将会重新执行。

3.5 检查点恢复

该部分代码较为简单,有兴趣的同学可以根据相关调用栈自行阅读代码。

|-- Task
  \-- run
  \-- doRun
|-- StreamTask
  \-- invoke
  \-- restoreInternal
  \-- restoreGates
|-- OperatorChain
  \-- initializeStateAndOpenOperators
|-- StreamOperator
  \-- initializeState
|-- StreamOperatorStateHandler
  \-- initializeOperatorState
|-- AbstractStreamOperator
  \-- initializeState
|-- StreamOperatorStateHandler
  \-- initializeOperatorState
|-- CheckpointedStreamOperator
  \-- initializeState #调用用户代码

3.6 End to End Exactly Once

端到端的精准一次实现其实是比较困难的——考虑一个Source对N个Sink的场景。故此Flink设计了相应的接口来保障端到端的精准一次,分别是:

  • TwoPhaseCommitSinkFunction:想做精准一次的Sink必须实现此接口。
  • CheckpointedFunction:Checkpoint被调用时的钩子。
  • CheckpointListener:顾名思义,当Checkpoint完成或失败时会通知此接口的实现者。

目前Source和Sink全部ExactlyOnce实现的只有Kafka——其上游支持断点读取,下游支持回滚or幂等。有兴趣的同学可以阅读该接口的相关实现。

可能有同学会好奇为什么JDBC Sink没有实现ExactlyOnce。本质和这个接口的执行方式无法兼容JDBC的事务使用方式——当一个算子意味退出时,是无法再对之前的事务进行操作的。因此TwoPhaseCommitSinkFunction中的retryCommit以及retryRollback是无法进行的——见https://github.com/apache/flink/pull/10847。JDBC的Sink是基于XA实现的,尽可能保证一致性。这里可能又有同学会问了为什么不用Upset类的语句,因为这个方式并不通用——对于Upset需要一个唯一键,不然性能极差。

4. 小结

本文以问题视角切入Checkpoint的原理与实现,并对相关源码做了简单的跟踪。其实代码的线路是比较清晰的,但涉及大量的类——有心的同学可能已经发现,这是单一职责原则的体现。TwoPhaseCommitSinkFunction中的实现也是典型的模版方法设计模式。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
6月前
|
SQL 算法 API
读Flink源码谈设计:图的抽象与分层
前阵子组里的小伙伴问我“为什么Flink从我们的代码到真正可执行的状态,要经过这么多个graph转换?这样做有什么好处嘛?”我早期看到这里的设计时的确有过相同的疑惑,当时由于手里还在看别的东西,查阅过一些资料后就翻页了。如今又碰到了这样的问题,不妨就在这篇文章中好好搞清楚。
557 0
读Flink源码谈设计:图的抽象与分层
|
6月前
|
存储 消息中间件 缓存
读Flink源码谈设计:有效管理内存之道
在最初接触到Flink时,是来自于业界里一些头部玩家的分享——大家会用其来处理海量数据。在这种场景下,`如何避免JVM GC带来StopTheWorld带来的副作用`这样的问题一直盘绕在我心头。直到用了Flink以后,阅读了相关的源码(以1.14.0为基准),终于有了一些答案。在这篇文章里也是会分享给大家。
582 1
|
6月前
|
流计算
Flink源码解析
Flink源码解析
96 0
|
资源调度 流计算
Flink 指标参数源码解读(读取数量、发送数量、发送字节数、接收字节数等)(下)
Flink 指标参数源码解读(读取数量、发送数量、发送字节数、接收字节数等)(下)
150 1
|
3月前
|
消息中间件 Kubernetes 监控
实时计算 Flink版操作报错合集之在编译源码时遇到报错:无法访问,该如何处理
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
5月前
|
Oracle 关系型数据库 Java
实时计算 Flink版产品使用问题之源码 deploy,生成带有时间戳的jar包,如何修改配置信息
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
6月前
|
监控 Java 流计算
读Flink源码谈设计:Metric
前阵子笔者涉及了些许监控相关的开发工作,在开发过程中也碰到过些许问题,便翻读了Flink相关部分的代码,在读代码的过程中发现了一些好的设计,因此也是写成文章整理上来。
397 0
读Flink源码谈设计:Metric
|
监控 流计算
Flink 指标参数源码解读(读取数量、发送数量、发送字节数、接收字节数等)(上)
Flink 指标参数源码解读(读取数量、发送数量、发送字节数、接收字节数等)(上)
88 1
|
6月前
|
存储 SQL API
读Flink源码谈设计:流批一体的实现与现状
在Dataflow相关的论文发表前,大家都往往认为需要两套API来实现流计算和批计算,典型的实现便是Lambda架构。
619 0
|
11月前
|
消息中间件 分布式计算 资源调度
深度解读flink kerberos认证(含流程图及源码)
深度解读flink kerberos认证(含流程图及源码)
136 0

相关产品

  • 实时计算 Flink版