Kafka 以消息存储系统在业界闻名,近几年来 Confluent 公司对 on Kafka 流式计算场景又先后推出了 Kafka Streams(流计算)、ksqlDB(基于 Kafka Streams 的类分析型 DB 系统)。笔者对发表在 SIGMOD 2021 上的论文《Consistency and Completeness: Rethinking Distributed Stream Processing in Apache Kafka》做一些总结,梳理 Kafka Streams 在流处理场景上的设计思路。
背景
streaming processing 场景之所以复杂,是因为要在性能、正确性、代价上取平衡。单独解决其中一个问题可以做到简化:
- 正确性:将 streaming processing 退化为 batch processing。
- 性能:全部基于内存实现流式框架,在 failover 场景下会丢失一定的数据正确性。
开篇点题,论文要解决的两个问就体现在题目上:
- 一致性(consistency):保证流处理应用可以从异常恢复到一致的状态,最终计算结果不丢失数据也不产生重复数据。图 1 为例,流应用在步骤 1.b 做异常恢复后,由于重新处理事件 11(时间戳)导致 state 两次更新(步骤 1.c 中出现结果不一致)。
- 完整性(completeness):保证流处理应用不会产生不完整(partial)结果,尤其是在数据迟到、时间乱序的情况。图 1 为例,流应用在步骤 1.d 展示了事件 12(时间戳)迟到的情况,意味着之前的处理结果不完整。
借助 Kafka Log 存储模型在 append/replay 上按序、事务能力,Kafka Streams 将 state 更新、数据 shuffle 中间存储都维护在 Kafka。使得流处理在一致性、完整性上的设计得到很大简化,思路就是分而治之:
- 一致性:幂等、事务写。
- 完整性:计算结果修正。
挑战
错误容忍:一致性
Exactly-Once 处理语义要覆盖两部分:结果输出、带状态算子的内部状态更新。
大规模部署下有多种场景的挑战:
- 存储引擎故障恢复:内存中数据会丢失。
- 流处理 processor 失败:计算进程崩溃导致检查点未持久化;失联的僵尸实例(instance)还在继续处理 partition 数据导致结果重复。
- RPC 失败:例如网络超时的重试,可能引发数据重复。
为了对齐流处理 pipeline 的多个 state 检查点(checkpoint),会在数据流中注入标记(marker)作为同步的屏障(barrier),checkpoint 的完成取决于:要持久化的数据量;应用中标记流动的速率(例如会收到反压影响)。单单 checkpoint 机制完成 Exactly-Once 是吃力的,例如 output commit 问题:计算结果可能在异常前已经发出(emit)了,等异常恢复后,state 回滚到前一个检查点,对输入数据做重新处理会导致输出的数据重复。
乱序处理:完整性
数据完整性,一般定义为:当输入数据的 t+1 时间戳到来是,系统推断输出结果中与 t 时间戳以前数据相关的结果已经完整。
基于检查点的方法一般会阻塞结果发出直到指定时间点以前的数据都已经完成。因此需要等待一个外部信号来指示什么时候数据完整。
另一种是基于 mirco-batching 方法,将无界的流拆分为多个批次的有界数据,每个批次等待数据完整(牺牲延迟)的时候处理,同步触发计算与结果发出。
系统设计
DSL 与拓扑
Kafka Streams 应用通过若干拓扑(topology)定义计算逻辑。拓扑包括:
- 点:流处理 processor,例如map、filter、join、aggregate等算子;source/sink processor是特例,表示拓扑的起止。
- 边:连接两个流处理器的数据流,stream。
例如图 2,Kafka Streams 将应用逻辑翻译成拓扑。拓扑可以再分为多个子拓扑(sub-topology)。划分依据是:组成子拓扑的多个连续算子之间不需要数据 shuffle(不涉及修改 partitioning key),因此不涉及网络传输(子拓扑内的多个算子运行更高效)。
水平扩展的存储能力,使得 Kafka streams 应用大大缓解了反压影响。当下游子拓扑处理比上游慢时,堆积的数据可以通过 Kafka topic 做缓冲。
子拓扑内的带状态算子是需要和 state store 打交道的。默认情况下,写出的 state 的信息也会写一份到 Kafka changelog topic。这种设计是 Kafka 的特色,大量引入 inner topic,包括:changelog topic(state 变更,开启 compaction 压缩),transaction-log topic(事务日志),seq-id topic(broker 用于 Exactly-Once 写入)等。
数据并行并行机制
Kafka Stream 的并行模型中,最小执行单元定义为 task。每个 task 包含一个完整子拓扑的所有处理器,因此每个 task 所执行的代码完全一样,只是各自处理的数据集不重叠且互补关系。一个子拓扑会在 1 或多个 task 中运行,一个 task 对应一个源 topic 的 partition。一个实例可以运行多个 task。
带状态算子的 task 迁移实例时,新实例上会重放一遍 changelog topic 里的状态变化。Kafka Streams 会尽量达到多实例之间的负载均衡,同时又满足调度亲和性。以达到跨实例建做 re-balance 时尽量少做状态迁移。
同样的,Kafka Streams 优化器也会对 DSL 生成拓扑做优化。例如图 3 的场景,changelog topic 和 sink topic 可以在一个操作内完成,因为从聚合操作结束到发出数据,中间没有更多的转换操作。
Exactly-Once 设计
论文中将失败时 state 管理、output commit 与下面问题等价来看:
将哪些输入数据视为已处理完成的,那些输出数据视为对下游 processor 可见的?怎么提供幂等、事务的 append 写入能力?
partition 级别的幂等写
producer 的唯一 id 定义为 transaction-id,配置一个递增的 epoch 来识别 producer 的多段生命周期(进程重启时发生变化)。
producer 发送的每条数据记录,都关联一个递增的序列号(seq-id),seq-id 会持久化到 Kafka 内部 topic 中,在 broker 做 failover 时被使用到。
seq-id 与 transaction-id 联合起来作为 prodcuer 生命周期内 Kafka broker 数据去重的依据。
在 Kafka broker 内实现一个 transaction coordinator 管理 trasnaction-id、epoch、seq-id 等元数据信息。这些最新信息(事务状态、关联的 topic partition)被写入 trasnaction-log topic。一个 producer 以确定性的 transaction-id 被哈希映射到 trasnaction-log topic 的唯一 patition。
跨 partition 的事务写
Exactly-Once 语义具体来看,要做到以下几项操作原子完成:
- 输出结果记录到 sink topic,并且对于下游 conumser 可见。
- 计算依赖的 state store 更新。
- 输入数据 topic 的消费位置(offset)移动并完成 commit。
怎么将上述写多个 Kafka topic 变成一次事务?
传统的二阶段提交协议,需要写两次,一次 log,另一次数据。Kafka 实现的事务协议只写一次log,利用追加的 offset 递增特性,可以将需要终止处理的数据对下游消费者不可见(未 commit 就是隐藏)。
如图 4 完整演示了一次事务操作:
- 4.b 步骤是注册 producer 到 transaction Coordinator。transaction coordinator 会对指定 transaction-id 注册上来的 producer 做 epoch 递增。考虑到使用相同 transaction-id 的僵尸实例情况,较老 epoch 的 producer 写入数据会忽略。注册在得到 transaction coordinator 回复后,producer 完成了事务的初始化操作。
- 紧接着,两阶段事务提交,一个 producer 同时只能有一个进行中的事务。producer 在发送数据之前,先在 transaction Coordinator 上注册 pattition(步骤 4.c),完成注册后,producer 可以准备发送数据到 patition。
- 提交事务
- 首先prodcuer flush 所有写请求,等待 kafka broker 的回复收到。
- producer 发送另一个请求到 transaction coordinator 来初始化提交操作,通过二阶段提交:
- 第一阶段:coordinator 更新事务状态为 PrepareCommit,记录状态到 transaction-log topic。这个更新可以认为是事务的同步屏障(barrier),状态更新写入 transaction log后,事务就不会停止了(failover 时 PreapreCommit 状态会做状态恢复并继续事务)。
- 第二阶段:异步地将提交事务的标记写入事务所注册的多个 partition。步骤 4.f 在所有事务标记都被 partition leader ack 以后,coordinator 更新事务状态为 CompleteCommit,之后允许 producer 开启下一个新的事务。
如何终止一个进行中的事务?
- 首先将事务状态更新为 PrepareAbort,事务终止的标记会被写入事务注册的多个 partition,表明通过 transaction-id 写入的在该标记以前的数据都被终止且对下游消费者不可见。
- 当所有事务标记 partition leader ack 后,事务终止并更新事务状态为 CompleteAbort。
与写相配合的是读的部分:
- 下游消费者被配置只读取上游 commited 数据。
- task 的 commited offset 只在事务提交以后才会整整更新到到源 topic 上。
性能影响
事务的代价与 partition 数相关度高,因为事务标记被写到各个 partition,随着 partition 数目增加,事务的成本也变高。
实现上影响事务提交吞吐、延时的主要因素是提交间隔(commit interval)。增加提交间隔,可以提升吞吐,代价是增加一些延时。论文提到一些优化,例如当多个 task 被分配到一个实例上,多次 patitinon 操作可以在一次注册操作重完成,相关写处理也可以被合并为一个进行中的事务。
图 5 是论文给出的性能对比:Exactly-Once 语义比 At-Least-Once 语义吞吐能力上下降 10%-20%。
结果修正设计
首先,Kafka 在存储上有 stream/table 的二象性。stream 和 table 二象性通过互转得到体现:
- table as streamtable 可以看作是某个时刻的 snapshot,记录 stream 中每个 key 在当前的最新值。遍历 table 中的每个 key-value 就可以得到一个 stream。
- stream as tablestream 是追加写入的 changelog,对 stream 从头到尾重放可以构建出一个 table。另外,对 stream 做聚合操作也会得到一个 table。
在乱序数据场景下,对 operator 类型有一个区分:
- 无状态 operator:例如 filter/mapValue,不需要引入重排(造成业务延迟的原因),在接收、处理新数据记录时可以直接发出(emit)处理结果。
- 有状态 operator:例如 join/aggergation,依赖前一批接收到的输入数据来确定当前的处理结果,是顺序敏感的。
在 Kafka Streams 中,可以对时序敏感的运算符设置参数(算子级 grace period,不是系统级的全局 watermark 参数),迟到的数据如果延迟在参数阈值以内,还是会被接收、处理。同时 Kafka Streams 检查每个运算符的输出类型,决定是否可以发出(emit)推测性(speculatively)的结果:
- 下游是 append-only 类型的 stream,系统就需要累积数据并直到数据完整后再发出。
- 下游是 table 类型,系统可以即时发出推测性结果,即使在后续收到迟到数据(但满足 grace period 要求),下游算子可以把上游算子输出的 changelog stream 作为输入,基于该输入实现修正处理状态(撤回旧的处理结果,并重新累积新的计算结果)。
在 DSL 机制中,用户需要为累积(accumulations)和撤回(retractions)提供对应的实现,知道怎样修正结果,例如 join 派生成新的 table,那么修正会从当前子拓扑往后传播。
图 6 展示了 Kafka Streams 怎样处理迟到数据。窗口聚合周期 5 秒,grace peroid 10 秒:
- 乱序事件 14(时间戳)在 6.b 到达,因此 6.c 中旧窗口 [10,15) 结果被更新,修改的结果 w10:2 以 changelog 形式被发出。
- 6.d 在 grace period 间隔达到后,[10,15) 窗口结果被回收(旧 state GC),对应该时间戳窗口内的事件也不再被处理。
性能上,如果每次修正都直接穿透到下游,会产生高网络、cpu消耗(对应大量的累积、撤回)。Kafka Streams 的应对是,DSL 上设计了压制(suppress),可以缓冲一批修正操作再发出到下游,尤其是对于相同 key 的缓冲,多条记录会被压缩到 1 个记录。
总结与展望
在生产应用时,可以配置事务提交间隔、算子级别的延时容忍参数,适配不同业务对于性能、正确性的取舍要求。论文举了两个生产部署的例子:
- Bloomberg Real-time Pricing Platform:不同业务复杂度有差别,性能分布在 10K-25K EPS左右,Exactly-Once 性能损失在 6%-10% 左右。
- Expedia Real-time Conversation Platform:简单场景下 commit interval 100ms。对于复杂的对话-视图聚合服务,commit interval 配置为 1500ms,并且开启压制以降低磁盘、网络 IO 开销。
论文展望部分主要特性提到:
- 降端到端的延时,应对故障场景,使用 cascading rollback 算法处理未提交的输入数据。
- 跨多个 Kafka Streams 应用支持一致的状态查询服务。
- 基于对齐的事务边界支持实时控制策略,包括:auto scaling、应用程序更新、配置更新。
最后是一点个人感受:
- 整篇文章可读性很好,对于背景、问题、工程方法都讲得非常清晰,毕竟系统也已经生产应用好几年了。
- 一致性、完整性设计大量工作由 Kafka 存储层支持,流计算相当于存储之上的一层堆叠,分层架构简化了 Kafka 用户的流处理技术栈。
- 性能(开销、延时)上应该不是它的强项,对比 Flink 在 statestore、优化器等一些优化工作,Kafka streams 的真正大规模应用验证还是太少。
- 从消息队列一体化来看,好几家都在跟进(存储层 + 处理层 + 分析层),Confluent 开始得比较早。