Trisk:在 Flink 上实现以 task 为中心的流处理动态 Reconfiguration 的 Control Plane

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 新加坡国立大学计算机系博士毛言粲在 FFA 2021 的分享

摘要:本文整理自新加坡国立大学计算机系博士在读生毛言粲在 Flink Forward Asia 2021 核心技术专场的分享。主要内容包括:

  1. 背景:流作业动态调控
  2. 挑战:兼顾普适、高效和易用
  3. 设计:以 Task 为中心的系统设计
  4. 实现:基于 Flink 的 Barrier 机制
  5. 评估:Trisk 与已有系统的性能对比

FFA 2021 直播回放 & 演讲 PDF 下载

一、背景:流作业动态调控

img

流数据处理是非常重要的一种数据处理方式,它在各个领域都有广泛的应用,比如机器学习、数据分析和实时事件处理以及实时交易等领域。流处理拥有低延迟和高吞吐量的特性,它被大规模部署成以流任务实例 stream task 构成的流作业 stream job 并行处理输入的数据流,流作业会部署成并行的流任务,这些流任务实例被中间流连接,并形成一个有向无环图。

流数据的并行处理是通过将输入数据在并行任务之间进行分区,然后每个任务独立处理分配的分区任务实时实现的,因为流作业是长期执行且会随着时间抖动,而不同的流作业有不同的性能需求,比如实时交易任务对延迟很敏感,而一些数据分析的任务对吞吐量要求很高。为了达到不同流处理作业的性能要求,动态重配置流任务的技术很关键。

img

常见的数据抖动有如下几类:

  • 第一,输入速率的变化。流作业是长期执行,而数据流的输入速率会不可预测地发生动态变化,导致静态分配的资源无法低延迟高吞吐地处理数据流;
  • 第二,数据倾斜。流数据的数据分布会动态变化,比如某个数据的出现频率增大会导致对应 stream task 的工作负载变大,延迟变大;
  • 第三,新兴事件的产生。流数据中可能会出现新兴事件或者数据,这种数据无法被当前执行逻辑正确地执行。比如新型诈骗交易需要通过新的规则才能检测到。

img

针对不同的数据抖动,有不同类型的重配置技术来优化流作业,从而在保证资源利用率的同时,以高吞吐低延时的性能来处理流数据。

  • 针对输入数列的变化,可以通过 scaling 的方式动态伸缩资源,提高吞吐量、降低延迟;
  • 针对数据倾斜,可以通过 load balancing 的方式来重新分布并行执行流任务之间的工作负载,以重新达到负载均衡;
  • 对于新兴事件的处理,可以通过 change of logic 的方式来更新流任务的执行逻辑,从而可以正确地处理新兴事件和数据。

img

有了不同类型的数据抖动和重配置技术后,需要考虑下一个问题就是如何动态地检测数据抖动,并选择合适的方法来调控流任务。为了解决这个的问题,通常是通过设计一个控制器来对任务进行动态重配置,控制器主要通过实时监听流作业分析症状,然后针对不同的症状修改不同的流作业配置,来做性能优化。

这个过程分为三步:监听、诊断、重配置。

  1. 首先控制器可以实时监听流任务,目前流作业的控制器主要通过监听系统层面的metrics比如CPU utilization,或应用层面的metrics比如端到端的数据处理延迟、吞吐量积压等,来进行建模分析和策略判断;
  2. 然后控制器通过控制策略来诊断症状,控制策略可以通过预定义的规则,比如CPU利用率高于一定阈值就执行scaling out,或进行模型分析,比如预测需要达到的资源分配来诊断存在的问题;
  3. 最后控制器选择不同类型的重配置方法去动态优化流作业。

img

为了减少为不同流作业实现控制器的工程开销,需要有一个控制平台来对流作业进行托管。控制平台封装了 metrics 和重配置方法,并且对外提供相应的 API,从而开发者可以在流作业部署好之后通过在控制平台提交控制器,对流作业进行托管。这样的控制器也包含了自定义的控制策略,并且可以直接使用控制平台的 API 实现 metrics 的采集和重配置,隐藏了系统底层的处理逻辑,简化了控制器的设计和开发。

大部分流处理系统都封装了比较成熟的 metrics 系统,因此控制平台可以基于原有系统 API 实现 metrics 的采集,然而动态重配置的支持仍是一个较大的挑战。

二、挑战:兼顾普适、高效和易用

img

动态重配置的控制平台应当具有三种性质:

  • 普适性,不同类型的控制策略需要使用不同类型的重配置方法;
  • 高效性,重配置的执行应在短时间内完成,并且尽量不阻塞原数据处理;
  • 易用性,API 应简单易用,用户调用时无需知道系统底层逻辑。

img

但是目前已有的解决方案只能满足上述部分性质,比如 Flink 支持动态地对流作业进行重配置,并提供了简单易用的 online interface 为用户实现控制器流作业的动态重配置。通过修改源代码和重新提交流作业的方式,Flink 的原生支持具有很强的普适性和应用性。然而重新部署也会带来很大的开销。比如资源重分配和全局的状态恢复。

Flink 重配置的具体执行流程如下:首先 JobManager 会触发一个 Savepoint 到整个流作业的 pipeline 上,Savepoint 完成之后,当前流作业的 global snapshots 将会返回到 JobManager 中,JobManager 在收到所有的 snapshots 后,终止当前的 pipeline,然后以新的配置重新部署流作业,并从当前的 Savepoint 恢复状态重新开始。

三、设计:以 Task 为中心的系统设计

为了满足重配置的三种性质,我们将介绍 Trisk:以 Task 为中心的流作业控制平台。

img

上图是 Trisk 的系统架构,它支持对流处理的重配置进行定义和实现,提供了以 Task 为中心的配置抽象,这个抽象包含了当前流作业三个维度的执行配置,并且基于抽象封装了原子操作,使得配置方法可以通过在抽象上组合原子操作来定义。为了提高效率,不同于 Flink 本身提供的宗旨和重启机制,Trisk 采用了部分暂停和恢复的技术来执行重配置,并且它的封装可以进一步利用 Flink 系统中的 Checkpoint 机制来实现一致性。同时 Trisk 提供了易于使用的编程 API,有预先定义好的常用重配置 API,还将原子操作封装为 API 来让用户自定义重配置。

Trisk的工作流程如下:

Trisk runtime 维护了 restful API,用户可以通过接口提交控制逻辑代码。接着由 Trisk runtime 编译代码并生成对应的控制策略,它会根据当前流作业的 metrics 做诊断和重配置决策。控制策略诊断到当前运行的流作业的数据抖动后,会通过与 Trisk runtime 交互来对流作业进行重配置。

其过程如下:首先控制策略会从 Trisk runtime 中获取一个 Trisk 配置抽象,用来获取当前流作业每个 task 的配置情况,然后会根据诊断结果使用不同类型的原子操作来对 Trisk 抽象进行更新。比如,如果判断出了输入速率增高的问题,控制策略将会通过分配更多的资源来部署新的 task,并且重新分配 task 之间的工作量,来增加流作业的吞吐量。最后控制策略会通过把更新好的 Trisk 抽象送回到 Trisk runtime 中,Trisk runtime 根据更新好的配置对流作业执行重配置优化 。

Trisk 重配置的执行是通过与底层的流系统进行交互来实现的,采用了部分暂停与恢复的方法来实现工作流程,因此可以避免终止整个流作业的情况下保持一致性,并且只会对部分 task 进行更新来降低时间开销。整个过程可以分为三步:prepare-sync-update。

其流程如下:prepare 阶段,流系统基于更新后的 Trisk 抽象,找出被更新的受影响的 task,并准备这些 task 更新后的实际配置;sync 阶段,为保证数据一致性,执行期间需全局同步流作业并暂停受影响的 task,不受影响的 task 可以继续执行。这里通过 Flink 的 checkpoint barrier 机制实现这个同步过程;update 阶段,受影响的 task 将被独立更新,并在更新完成后继续执行。

Trisk的三维抽象源自于流任务的三个步骤:

  • 第一步:流作业提交到流系统时,会被封装成一个 Logical Graph 里面包含了流任务的执行逻辑,其中顶点 operator 里包含了 User Defined Function,边表示 operator 之间的中间数据流,每个 operator 会使用 UDF 来处理输入的数据流,并生成输出流,流入后面的 operator。
  • 第二步:Logical Graph 的每一个 operator 会并行运行一定数量的 stream task,且输入数据流会被分配到不同的 stream task 并行执行。每个 stream task 分配到的输入数据流被称为该 task 的工作负载配置。
  • 第三步,这些并行的 stream task 会被部署到服务器中物理执行,每个 stream task 都会在一台机器上分配到一定的资源比如 CPU 和内存,这样的资源分配描述了 stream task 的 resource 配置。

img

因此 Trisk 的三维抽象就是包含了以 task 为中心的 execution logic,workload、resources 配置,最终形成了一个有向无环图,存放在了 Trisk runtime 中。

我们对抽象中的每一个维度的更新都封装了原子操作,通过对三维抽象中每一个维度执行原子操作,可以细粒度地重配置流作业,从而满足重配置的普适性。比如 scaling 可以通过分配 resources 来重配置新的 task,并重分配并行任务之间的 workload 来实现。

img

上图展示了一个 scaling out 的例子,由于输入速率的不均匀上升导致 task2 的负载增大、延迟上升,且 task3 的利用率也很高,因此我们需要通过执行 scaling out 来分配一个新的执行任务 task5 并转移一部分 task2 的 workload 到 task5 上,来让当前流作业能继续低延迟高吞吐地处理输入流数据。

Trisk 提供了常用的重配置 API,对应着我们之前提到的三种重配置方法:scaling、load balancing、change of logic,用户可以使用提供的 API 在 Trisk 上实现控制策略。这些控制策略可以编译为运行在 Trisk runtime 上的线程来动态管理流作业。

img

上图例子显示了一个可以实现在流作业动态负载均衡的控制策略 load balance 的实现。它通过每秒检测 task 的工作负载,比如监听每个 task 的处理数据量的分布,并在 task 间的分布发生变化时重新分配 task 的 workload 来实现负载均衡。

同时用户也可以通过基于三维抽象的原子操作来定义新的重配置方法。我们将三种原子操作封装成了 assignLogic、assignWorkload、assignedResource 三个 API。

img

上图展示了 scaling 重配置方法基于对抽象执行原子操作的代码实现。通过 assignResource 来为新创建的任务分配资源,然后通过 assignWorkload 重新分配并行任务之间的工作负载来实现。

四、实现:基于 Flink 的 Barrier 机制

img

Trisk 控制平台是单独运行的一个后台服务,它封装提供了重配置 API。在 Flink 系统层中,也加入了一些新的组件来和 Trisk runtime 交互,并且高效执行对流作业的重配置。在 runtime 层中,controller 保存了用户自定义的控制策略和重配置方法。StreamManager 是 Trisk 的核心,它为用户提供了 API 并且维护了 web service 来接收新的 controller。在系统层中,JobReconfigCoordinator 维护 Trisk 抽象到 Flink 物理配置的映射,并协调执行重配置来保证流作业在重配置前后的数据一致性。

每个 StreamTask 会维护一个 TaskConfigManager,它会管理并更新对应 StreamTask 中的配置,来实现重配置。

img

Flink 内部的组件架构如上图。JobReconfigCoordinator 存在于 Flink 的 JobManager 中,并且在每个 StreamTask 上都维护了一个 TaskConfigManager。JobReconfigCoordinator 和 TaskConfigManager 可以通过 Flink 网络层进行远程交互,实现控制逻辑。

img

上图展示了重配置在 Flink 上的执行总览。

在 prepare 阶段,Coordinator 会收到 Trisk runtime 层分析好的抽象并准备好 StreamTask 的新配置。比如对 scaling 分配资源是通过获取一个新的 resource slot 实现,重分配 workload 是通过更新上游 task 的 result partition 和下游 task input gate 来实现的。对于 stateful 的 task 来,重分配 workload 还需要更新 task state backend。

在 synchronize 阶段,Coordinator 会利用 Flink 原有的 checkpoint barrier 机制,对受影响的 task 进行同步和暂停从而保证数据的一致性,其过程主要是通过从 source task 开始向整个 pipeline 发送 barrier,受影响的 StreamTask 会在接收到 barrier 之后暂停并等待来自 Coordinator 的更新指令。

同步完成后进入 update 阶段,Coordinator 会通知所有受影响的 task 去并行执行 update 来更新自己的配置。StreamTask 在更新完自己的配置后会自动恢复执行,并与上下游重新连接。

img

具体的实现细节有如下几项:

首先,对 Trisk abstraction 内部的配置和 Flink 的 JobGraph、ExecutionGraph 做了映射,因此 prepare 阶段中 Coordinator 会去更新对应的 JobGraph 和 ExecutionGraph,然后通过 Flink 的 barrier 机制实现了重配置执行中的同步来保证数据一致性。

其次,每个 task 的原子操作都尽量利用 Flink 原有的机制对 StreamTask 进行动态修改。比如 assignWorkload 是通过重新初始化一个 state backend 再重新更新上游 task 的 result partition 和当前 task 的 input gate 实现的。

重配置的具体执行流程分为以下几步:

首先在 prepare 阶段,JobReconfigCoordinator 会更新 JobGraph 和 ExecutionGraph。然后根据更新情况标记受影响的 StreamTask。prepare 完成后,Coordinator 利用 barrier 机制实现整个 pipeline 的同步,从 source task 通过 inject barrier 发送到整个 pipeline。受影响的 task 收到所有上游 task 的 barrier 后会暂停并 ack 到 Coordinator 中,再向下游 task 发送 barrier。下游 task 收到 barrier 之后也执行类似的操作,受影响的 task 暂停并 ack,而不受影响的继续保持执行。所有 task 都 ack 到了 Coordinator 之后,同步结束。

接下来进入 update 阶段,在 update 阶段,Coordinator 会通知 TaskConfigManager 去更新 StreamTask 的配置,更新完成后与上下游重新连接,并继续执行。

至此,重配置流程结束。

五、评估:Trisk 与已有系统的性能对比

我们进行了小规模实验,主要围绕以下两点目标:

  • 第一, 在 Trisk 上实现的控制器总体效果如何,是否能满足控制器的优化目标比如延迟控制?
  • 第二, 对比已有的重配置执行技术,如 Flink 原生支持和前沿的 Megaphone 机制,Trisk 的执行效率如何?

img

实验环境如下:我们将 Trisk 实现在 Flink-1.10.0 上,并配置了 4 个节点的 Flink standalone cluster,每个节点配置了 8 个 slots。我们使用了一个真实应用 stock-exchange 和一个合成应用 word-count 来实现。stock-exchange 是一个实时的股票交易任务,需要实时处理股票交易订单,来避免对用户的交易决策造成影响。word-count 是一个常用于数据分析中的操作,我们主要对输入流的每一个 key 进行 count。

img

我们在 stock-exchange 上实现了一个简单但具有代表性的 latency-aware 控制器。最初 stock-exchange 作业部署了 10 个任务,输入流是股票申报订单,输入曲线如左图所示。控制器可以通过使用 scaling 和 load balancing 来控制作业的延迟,主要根据输入速率和工作负载来作出决策。比如在第一百秒的时候,因为输入速率增大,所以做 scaling out;而在第四百秒的时候,因为输入速率降低,所以会做 scaling in。

在 Trisk 和 Flink 上实现的控制器都需要大概 100 行代码,主要包含了控制策略的逻辑。

实验结果如右图所示。为了展示控制器的优化效果,我们主要对比了 Trisk/Flink 的原生支持/静态配置下的 stock-exchange 作业的延时变化情况。红线是静态配置的 stock-exchange 作业,绿线是 Flink 上的控制器对流作业的优化效果,蓝线则是 Trisk 对 stock-exchange 的优化效果。

红线结果表明,虽然静态配置在开始时运行良好,但因为输入速率的增加,它无法实时处理 100 秒过后的数据,导致延迟增加了两个数量级。相比之下,使用 Flink 原生配置实现的控制器,能够适应工作负载的变化,但是在执行重配置期间会导致高延迟峰值,大概比平时的延迟高出 1~2 个数量级。而 Trisk 上做出决策的控制器展示了毫秒级的重配置完成时间,且只有可以忽略不计的延迟增量。这主要归功于 Trisk 的部分暂停与恢复技术。

img

再将 Trisk 重配置执行期间的运行效果与两个现有的方法进行比较,一个是 Flink 的终止和重启机制,以及 megaphone 提出了 fluid state migration 机制,可以在 key 层面对重配置进行同步和更新。实验中我们对 word-count 使用了 load balancing,初始配置有 20 个任务,并在第 50 秒时触发 load balancing。整个过程会重分配所有并行任务之间的 workload。

为了了解他们的行为,我们比较了执行重配置时的延迟和吞吐量。

从延迟图可以看出,Trisk 比 Flink 重配置带来的延迟低,而与 megaphone 相比,Trisk 具有最短的完成时间,但峰值延迟相对较高。从吞吐量图中可以看出,在重配置过程中 Trisk 的吞吐量下降了,但恢复得比 Flink 快。对于 megaphone 来说,fluid state migration 需要更长的时间来完成重配置,但在重配置阶段会有更低的峰值延迟和更高的吞吐量。

总的来说,我们提出了 Trisk:以 Task 为中心的控制平台,可以普适、高效、和易用地支持重配置方法。在未来的工作中,我们也将继续探索在 Trisk 上实现更多样的控制策略,来更好地利用 Trisk 上的重配置方法。


FFA 2021 直播回放 & 演讲 PDF 下载

更多 Flink 相关技术问题,可扫码加入社区钉钉交流群
第一时间获取最新技术文章和社区动态,请关注公众号~

image.png

活动推荐

阿里云基于 Apache Flink 构建的企业级产品-实时计算Flink版现开启活动:
99 元试用 实时计算Flink版(包年包月、10CU)即有机会获得 Flink 独家定制卫衣;另包 3 个月及以上还有 85 折优惠!
了解活动详情:https://www.aliyun.com/product/bigdata/sc

image.png

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
8月前
|
关系型数据库 MySQL API
实时计算 Flink版产品使用合集之可以通过mysql-cdc动态监听MySQL数据库的数据变动吗
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
206 0
|
7月前
|
关系型数据库 MySQL 数据处理
实时计算 Flink版产品使用问题之oracle-cdc如何进行动态加表
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
7月前
|
监控 大数据 Java
使用Apache Flink进行大数据实时流处理
Apache Flink是开源流处理框架,擅长低延迟、高吞吐量实时数据流处理。本文深入解析Flink的核心概念、架构(包括客户端、作业管理器、任务管理器和数据源/接收器)和事件时间、窗口、状态管理等特性。通过实战代码展示Flink在词频统计中的应用,讨论其实战挑战与优化。Flink作为大数据处理的关键组件,将持续影响实时处理领域。
1097 5
|
8月前
|
消息中间件 资源调度 Java
实时计算 Flink版产品使用合集之部署yarn模式,怎么实现峰谷动态并行度扩容缩容
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
8月前
|
资源调度 关系型数据库 数据库
实时计算 Flink版产品使用合集之flink-cdc.sh xx.yaml提交到yarn 发现没有启动task manager的,怎么处理
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
8月前
|
SQL 资源调度 Java
Flink问题之动态配置如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
196 1
|
5月前
|
Java Spring 安全
Spring 框架邂逅 OAuth2:解锁现代应用安全认证的秘密武器,你准备好迎接变革了吗?
【8月更文挑战第31天】现代化应用的安全性至关重要,OAuth2 作为实现认证和授权的标准协议之一,被广泛采用。Spring 框架通过 Spring Security 提供了强大的 OAuth2 支持,简化了集成过程。本文将通过问答形式详细介绍如何在 Spring 应用中集成 OAuth2,包括 OAuth2 的基本概念、集成步骤及资源服务器保护方法。首先,需要在项目中添加 `spring-security-oauth2-client` 和 `spring-security-oauth2-resource-server` 依赖。
69 0
|
5月前
|
消息中间件 数据挖掘 Kafka
揭秘大数据时代的极速王者!Flink:颠覆性流处理引擎,让实时数据分析燃爆你的想象力!
【8月更文挑战第29天】Apache Flink 是一个高性能的分布式流处理框架,适用于高吞吐量和低延迟的实时数据处理。它采用统一执行引擎处理有界和无界数据流,具备精确状态管理和灵活窗口操作等特性。Flink 支持毫秒级处理和广泛生态集成,但学习曲线较陡峭,社区相对较小。通过实时日志分析示例,我们展示了如何利用 Flink 从 Kafka 中读取数据并进行词频统计,体现了其强大功能和灵活性。
112 0
|
5月前
|
监控 搜索推荐 数据挖掘
Flink流处理与批处理大揭秘:实时与离线,一文让你彻底解锁!
【8月更文挑战第24天】Apache Flink 是一款开源框架,擅长流处理与批处理。流处理专攻实时数据流,支持无限数据流及事件驱动应用,实现数据的连续输入与实时处理。批处理则聚焦于静态数据集,进行一次性处理。两者差异体现在处理方式与应用场景:流处理适合实时性要求高的场景(例如实时监控),而批处理更适用于离线数据分析任务(如数据挖掘)。通过提供的示例代码,读者可以直观理解两种模式的不同之处及其实际应用。
386 0
|
5月前
|
消息中间件 大数据 Kafka
Apache Flink 大揭秘:征服大数据实时流处理的神奇魔法,等你来解锁!
【8月更文挑战第5天】Apache Flink 是一款强大的开源大数据处理框架,专长于实时流处理。本教程通过两个示例引导你入门:一是计算数据流中元素的平均值;二是从 Kafka 中读取数据并实时处理。首先确保已安装配置好 Flink 和 Kafka 环境。第一个 Java 示例展示了如何创建流执行环境,生成数据流,利用 `flatMap` 转换数据,并使用 `keyBy` 和 `sum` 计算平均值。第二个示例则演示了如何设置 Kafka 消费者属性,并从 Kafka 主题读取数据。这两个示例为你提供了使用 Flink 进行实时流处理的基础。随着进一步学习,你将能应对更复杂的实时数据挑战。
98 0

相关产品

  • 实时计算 Flink版