一文读懂 kafka 的事务机制 1

本文涉及的产品
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
日志服务 SLS,月写入数据量 50GB 1个月
简介: 一文读懂 kafka 的事务机制

1 前言

大家好,我是明哥!

KAFKA 作为开源分布式事件流平台,在大数据和微服务领域都有着广泛的应用场景,是实时流处理场景下消息队列事实上的标准。用一句话概括,KAFKA 是实时数仓的基石,是事件驱动架构的灵魂。

但是一些技术小伙伴,尤其是一些很早就开始使用 KAFKA 的技术小伙伴们,对 KAFKA 的发展趋势和一些新特性,并不太熟悉,在使用过程中也踩了不少坑。

有鉴于此,我们会通过一系列 KAFKA 相关博文,专门讲述 KAFKA 的这些新特性。

本文是该系列文章之一,讲述 KAFAK 的事务机制。

michaelli:一文读懂kafka的幂等生产者7 赞同 · 3 评论文章

以下是正文。

2 技术大背景-大数据发展趋势

在前期的一篇博文中,我们讲述过大数据的发展趋势之一,就是大数据与数据库日益融合的趋势。

michaelli:从技术视角看大数据行业的发展趋势3 赞同 · 0 评论文章

  • 早期大数据粗放式发展时,为了快速推向市场,丢失了很多传统数据库领域里良好的一些特性,(如事务ACID,如记录级别的增删改,如秒级甚至毫秒级的延迟),由此欠下了很多技术债。
  • 近些年随着技术的进一步成熟,大数据在不断朝着更精细化的方向发展,参考了很多传统数据库的理念和技术,补齐了很多早期的技术债,使得大数据组件越来越像存储与计算分离的数据库,也进而推出了数据据湖仓/湖仓一体的理念。
  • 技术债之一,就是大数据参考数据库实现了对事务 acid 特性的支持。具体到框架层面,数据湖三剑客 DeltaLake/Hudi/Iceberg, 还有本文的 KAFKA 事务机制,都是这个范畴。

3 什么是 KAFKA 的事务机制

  • KAFKA 的事务机制,是 KAFKA 实现端到端有且仅有一次语义(end-to-end EOS)的基础;
  • KAFKA 的事务机制,涉及到 transactional producer 和 transactional consumer, 两者配合使用,才能实现端到端有且仅有一次的语义(end-to-end EOS);
  • 当然kakfa 的 producer 和 consumer 是解耦的,你也可以使用非 transactional 的 consumer 来消费 transactional producer 生产的消息,但此时就丢失了事务 ACID 的支持;
  • 通过事务机制,KAFKA 可以实现对多个 topic 的多个 partition 的原子性的写入,即处于同一个事务内的所有消息,不管最终需要落地到哪个 topic 的哪个 partition, 最终结果都是要么全部写成功,要么全部写失败(Atomic multi-partition writes);
  • KAFKA的事务机制,在底层依赖于幂等生产者,幂等生产者是 kafka 事务的必要不充分条件;
  • 事实上,开启 kafka事务时,kafka 会自动开启幂等生产者。

4 KAFKA 内部是如何支持事务的

4.1 为支持事务机制,KAFKA 引入了两个新的组件:Transaction Coordinator 和 Transaction Log

为支持事务机制,KAFKA 引入了两个新的组件:Transaction Coordinator 和 Transaction Log,如下图所示:

image.png

  • transaction coordinator 是运行在每个 kafka broker 上的一个模块,是 kafka broker 进程承载的新功能之一(不是一个独立的新的进程);
  • transaction log 是 kafka 的一个内部 topic(类似大家熟悉的 __consumer_offsets ,是一个内部 topic);
  • transaction log 有多个分区,每个分区都有一个 leader,该 leade对应哪个 kafka broker,哪个 broker 上的 transaction coordinator 就负责对这些分区的写操作;
  • 由于 transaction coordinator 是 kafka broker 内部的一个模块,而 transaction log 是 kakfa 的一个内部 topic, 所以 KAFKA 可以通过内部的复制协议和选举机制(replication protocol and leader election processes),来确保 transaction coordinator 的可用性和 transaction state 的持久性;
  • transaction log topic 内部存储的只是事务的最新状态和其相关元数据信息,kafka producer 生产的原始消息,仍然是只存储在kafka producer指定的 topic 中。事务的状态有:“Ongoing,” “Prepare commit,” 和 “Completed” 。
  • 实际上,每个 transactional.id 通过 hash 都对应到 了 transaction log 的一个分区,所以每个 transactional.id 都有且仅有一个 transaction coordinator 负责。

image.png

4.2 为支持事务机制,KAFKA 将日志文件格式进行了扩展,添加了控制消息 control batch

为支持事务机制,KAFKA 将底层日志文件的格式进行了扩展:

  • 日志中除了普通的消息,还有一种消息专门用来标志事务的状态,它就是控制消息 control batch;
  • 控制消息跟其他正常的消息一样,都被存储在日志中,但控制消息不会被返回给 consumer 客户端;
  • 控制消息共有两种类型:commit 和 abort,分别用来表征事务已经成功提交或已经被成功终止;
  • RecordBatch 中 attributes 字段的第5位用来标志当前消息是否处于事务中,1代表消息处于事务中,0则反之;(A record batch is a container for records. )
  • RecordBatch 中 attributes 字段的第6位用来标识当前消息是否是控制消息,1代表是控制消息,0则反之;
  • 由于控制消息总是处于事务中,所以控制消息对应的RecordBatch 的 attributes 字段的第5位和第6位都被置为1;

参见源码:


image.pngimage.png

4.3 在事务机制下,KAFKA 消息的读写流程

首先看下应用程序代码中 KAFKA transactional api 的调用顺序:

image.png


对应代码中的 KAFKA transactional api,其消息读写流程,示意图如下:

image.png

image.png

  • KAFKA 生产者通过 initTransactions API 将 transactional.id 注册到 transactional coordinator:此时,此时 coordinator 会关闭所有有相同 transactional.id 且处于 pending 状态的事务,同时也会递增 epoch 来屏蔽僵尸生产者 (zombie producers). 该操作对每个 producer session 只执行一次.(producer.initTransaction())

image.png

image.png

  • KAFKA 生产者通过 beginTransaction API 开启事务,并通过 send API 发送消息到目标topic:此时消息对应的 partition 会首先被注册到 transactional coordinator,然后 producer 按照正常流程发送消息到目标 topic,且在发送消息时内部会通过校验屏蔽掉僵尸生产者(zombie producers are fenced out.(producer.beginTransaction();producer.send()*N;);

image.png

image.png

image.png

image.png

  • KAFKA 生产者通过 commitTransaction API 提交事务或通过abortTransaction API回滚事务:此时会向 transactional coordinator 提交请求,开始两阶段提交协议 (producer.commitTransaction();producer.abortTransaction(););
  • 在两阶段提交协议的第一阶段,transactional coordinator 更新内存中的事务状态为 “prepare_commit”,并将该状态持久化到 transaction log 中;
  • 在两阶段提交协议的第二阶段, coordinator 首先写 transaction marker 标记到目标 topic 的目标 partition,这里的 transaction marker,就是我们上文说的控制消息,控制消息共有两种类型:commit 和 abort,分别用来表征事务已经成功提交或已经被成功终止;
  • 在两阶段提交协议的第二阶段, coordinator 在向目标 topic 的目标 partition 写完控制消息后,会更新事务状态为 “commited” 或 “abort”, 并将该状态持久化到 transaction log 中;

image.png

  • KAFKA 消费者消费消息时可以指定具体的读隔离级别,当指定使用 read_committed 隔离级别时,在内部会使用存储在目标 topic-partition 中的 事务控制消息,来过滤掉没有提交的消息,包括回滚的消息和尚未提交的消息;
  • 需要注意的是,过滤消息时,KAFKA consumer 不需要跟 transactional coordinator 进行 rpc 交互,因为 topic 中存储的消息,包括正常的数据消息和控制消息,包含了足够的元数据信息来支持消息过滤;
  • KAFKA 消费者消费消息时也可以指定使用 read_uncommitted 隔离级别,此时目标 topic-partition 中的所有消息都会被返回,不会进行过滤。
  • 过滤消息的相关源码,可以参见 org.apache.kafka.clients.consumer.internals.Fetcher:

image.png


相关文章
|
9月前
|
消息中间件 分布式计算 Kafka
亿万级别Kafka演进之路:可靠性+事务+消息中间件+源码+日志
Kafka起初是由LinkedIn公司采用Scala语言开发的-一个多分区、多副本且基于ZooKeeper协调的分布式消息系统,现已被捐献给Apache基金会。目前Kafka已经定位为一个分布式流式处理平台,它以高吞吐、可持久化、可水平扩展、支持流数据处理等多种特性而被广泛使用。
|
5月前
|
消息中间件 Java Kafka
掌握Kafka事务,看这篇就够了
先赞后看,南哥助你Java进阶一大半Kafka事务实际上引入了原子多分区写入的概念,播客画了以下流程图,展示了事务在分区级别如何工作。我是南哥,一个Java学习与进阶的领路人,相信对你通关面试、拿下Offer进入心心念念的公司有所帮助。
138 2
掌握Kafka事务,看这篇就够了
|
4月前
|
消息中间件 存储 分布式计算
大数据-72 Kafka 高级特性 稳定性-事务 (概念多枯燥) 定义、概览、组、协调器、流程、中止、失败
大数据-72 Kafka 高级特性 稳定性-事务 (概念多枯燥) 定义、概览、组、协调器、流程、中止、失败
63 4
|
4月前
|
消息中间件 分布式计算 Java
大数据-73 Kafka 高级特性 稳定性-事务 相关配置 事务操作Java 幂等性 仅一次发送
大数据-73 Kafka 高级特性 稳定性-事务 相关配置 事务操作Java 幂等性 仅一次发送
58 2
|
9月前
|
消息中间件 Kafka API
Kafka Exactly Once 语义实现原理:幂等性与事务消息
Apache Kafka的Exactly-Once语义确保了消息处理的准确性和一致性。通过幂等性和事务消息,Kafka实现了要么全处理要么全不处理的原子性。文章详细解析了Kafka事务的工作流程,包括生产者的幂等性(通过序列号保证),以及事务消息的提交和回滚过程。Kafka事务提供了ACID保证,但存在性能限制,如额外的RPC请求和单生产者只能执行一个事务。此外,事务适用于同集群内的操作,跨集群时原子性无法保证。了解这些原理有助于开发者更好地利用Kafka事务构建可靠的数据处理系统。
226 3
 Kafka Exactly Once 语义实现原理:幂等性与事务消息
|
9月前
|
消息中间件 Kafka
【Kafka系列】Kafka事务一般在什么场景下使用呢
面试官:听说你精通Kafka,那我就考考你吧面试官:不用慌尽管说,错了也没关系😊。。。❤️。
125 2
【Kafka系列】Kafka事务一般在什么场景下使用呢
|
消息中间件 存储 Kafka
一文读懂 kafka 的事务机制 2
一文读懂 kafka 的事务机制
|
消息中间件 关系型数据库 MySQL
SpringBoot-Kafka(生产者事务、手动提交offset、定时消费、消息转发、过滤消息内容、自定义分区器、提高吞吐量)
SpringBoot-Kafka(生产者事务、手动提交offset、定时消费、消息转发、过滤消息内容、自定义分区器、提高吞吐量)
SpringBoot-Kafka(生产者事务、手动提交offset、定时消费、消息转发、过滤消息内容、自定义分区器、提高吞吐量)
|
消息中间件 Kubernetes Java
Apache Kafka-事务消息的支持与实现(本地事务)
Apache Kafka-事务消息的支持与实现(本地事务)
668 0
|
消息中间件 存储 Java
「事件驱动架构」Apache Kafka中的事务
「事件驱动架构」Apache Kafka中的事务