一文读懂 kafka 的事务机制 2

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 一文读懂 kafka 的事务机制

4.4 在事务机制下,KAFKA 对事务状态的容错

在事务机制下,KAFKA 在内部通过以下几点,实现了对事务状态的持久化存储,以及对 transaction coordinator 的容错:

  • transaction coordinator 是运行在每个 kafka broker 上的一个模块,是 kafka broker 进程承载的新功能之一(不是一个独立的新的进程);
  • transaction coordinator 将事务的状态保存在内存中,并持久化到 transaction log 中;
  • transaction log 是 kakafa 的一个内部 topic(类似大家熟悉的 consumer_offsets ,是一个内部 topic);
  • transaction log 有多个分区,每个分区都有一个 leader,该 leade对应哪个 kafka broker,则那个 broker上的 transaction coordinator 就负责对这些分区的写操作;
  • transaction coordinator 是唯一负责读写 transaction log 的组件,如果某个 kafka broker 宕机的话,其负责的 transaction log 的 partitions 就没有了对应的 leader,此时会通过选举机制选举出一个新的 coordinator,该 coordinator 会从这些 transaction log partitions 在其它节点的副本中恢复状态数据;
  • 正是由于 transaction log 是 kakfa 的一个内部 topic, 所以 KAFKA 可以通过内部的复制协议和选举机制(replication protocol and leader election processes),来确保对事务状态 transaction state 的持久化存储,以及对 transaction coordinator 的容错。

4.5 开启事务后,对 producer 和 consumer 的性能影响如何

  • 开启事务后,producer 有些许写放大,这主要涉及到:
  • 事务开始前, producer 需要将生产的消息的 partition 注册到 transaction coordinator, 这涉及到 rpt 调用;
  • 事务结束时, transaction coordinator 需要注入 transaction marker 到消息对应的 Partition, 这也涉及到 rpt 调用;
  • 事务进行过程中,transaction coordinator 需要将事务状态如 “prepare_commit” “complete_commit” 等持久化到 transaction log,这涉及到磁盘写;
  • 但由于以上写操作的时间复杂度,更多是跟消息涉及到的分区个数相关,而不是跟消息的具体条数相关,且 KAFKA 通过 batch 机制尽量减小了 RPC 调用次数,所以对 Producer 的性能影响并不大;
  • Confluent 官网有性能测试相关博文,其中讲到 “for a producer producing 1KB records at maximum throughput, committing messages every 100ms results in only a 3% degradation in throughput. Smaller messages or shorter transaction commit intervals would result in more severe degradation.“,即在他们的测试场景下,开启事务后性能只有3% 的下降;


  • 开启事务后,对 consumer 的性能影响相对对 producer 的性能影响更小,consumer 仍然是轻量级高吞吐的,几乎没有性能影响:
  • 这主要是因为,consumer 在 read_committed 模式下,只需要额外做一些消息的过滤,即过滤掉回滚了的事务的 message, 和 open 状态的事务的 message;
  • 但过滤这些消息时,因为 topic 中存储的消息包括正常的数据消息和控制消息,这些消息包含了足够的元数据信息来支持消息过滤,所以KAFKA consumer 不需要跟 transactional coordinator 进行 rpc 交互;
  • 同时,consumer 也不需要缓存读取的消息以等待事务的结束,而且由于底层仍然可以使用 zero-copy 机制来读取消息,所以相对于没有开启事务的 consumer,其性能几乎没有任何下降!

5 如何在应用程序中使用 KAFKA 事务

5.1 应用程序中使用 KAFKA 事务,涉及到 producer 和 consumer 的配置项的更改(当然 producer 和 consumer 的配置项的更改,可以更改配置文件也可以直接在代码中指定)。

  • producer 配置项更改:
  • enable.idempotence = true
  • acks = “all”
  • retries > 1 (preferably MAX_INT)
  • transactional.id = ‘some unique id’


  • consumer 配置项更改:
  • 根据需要配置 isolation.level 为 “read_committed”, 或 “read_uncommitted”;

5.2 应用程序中使用 KAFKA 事务,涉及到应用程序代码调整,调用 transactional API

应用程序代码调整,调用 transactional API 大体顺序如下:

image.png

应用程序代码调整,示例伪代码(融合了Producer 和 Consumer)和关键步骤注释,如下:

KafkaProducer producer = createKafkaProducer(
  “bootstrap.servers”, “localhost:9092”,
  “transactional.id”, “my-transactional-id”); // the transactional.id is a must
/**After the producer.initTransactions() returns, any transactions started by another instance of a producer with the same transactional.id would have been closed and fenced off.
*/
producer.initTransactions();
/**
This specifies that the KafkaConsumer should only read non-transactional messages, or committed transactional messages from its input topics. 
*/
KafkaConsumer consumer = createKafkaConsumer(
  “bootstrap.servers”, “localhost:9092”,
  “group.id”, “my-consumerGroup-id”,
  "isolation.level", "read_committed");
consumer.subscribe(Collections.singleton(“inputTopic”));
/**
Consume some records, start a transaction, process the consumed records, write the processed records to the output topic, send the consumed offsets to the offsets topic, and finally commit the transaction. With the guarantees mentioned above, we know that the offsets and the output records will be committed as an atomic unit.
*/
while (true) {
  ConsumerRecords records = consumer.poll(Long.MAX_VALUE);
  producer.beginTransaction();
  for (ConsumerRecord record : records)
    producer.send(new ProducerRecord(“outputTopic”, record));
    producer.sendOffsetsToTransaction(currentOffsets(consumer), my-consumerGroup-id);  //This method should be used when you need to batch consumed and produced messages together, typically in a consume-transform-produce pattern.
  producer.commitTransaction();
}

image.png

5.3 应用程序中使用 KAFKA 事务,如何选用一个全局一致的 transactional.id?

如上文所述,transactional.id 在 kafka 的事务机制中扮演了关键的角色,kafka 正是基于该参数来过滤掉僵尸生产者的 (fencing out zombies).

那么如何在跨 session 的众多 producer 中 (向同一个kafka集群中生产消息的 producer 有多个,这些 producer 还有可能会重启),选用一个全局一致的transactional.id,以互不影响呢?

大体的思路有两种:

  • 一是通过一个统一的外部存储,来记录生产者使用的 transactional.id 和该生产者涉及到的topic-partition之间的映射关系;
  • 二是通过某些静态编码机制来生成一个全局唯一的 transactional.id

在现实应用中,使用后者的更多些。比如 Kafka 生态的 Kafka Streams 使用的就是后者,FlinkKafkaProducer 使用的也是后者。

FlinkKafkaProducer 是根据 taskName+operatorUid+SubtaskIndex+Counter 来自动生成 transactional.id 的,如下源码所示:(FlinkKafkaProducer 是对 kafka transactional api 的更高级的封装和抽象,更易于使用,其底层融合了 Flink checkpoint 的两阶段提交协议和 Kafka transactional api 的两阶段提交协议,后续笔者会通过另一篇博客进行解读)。


image.pngimage.png

注意:使用 transactional API, 用户需要配置 transactional.id,但不需要配置 ProducerId,Kafka 内部会自动生成并维护一个全局唯一的 ProducerIds,如下源码所示:

image.png

6 知识总结

  • KAFKA 的事务机制,是 KAFKA 实现端到端有且仅有一次语义(end-to-end EOS)的基础;
  • KAFKA 的事务机制,涉及到 transactional producer 和 transactional consumer, 两者配合使用,才能实现端到端有且仅有一次的语义(end-to-end EOS);
  • 通过事务机制,KAFKA 实现了对多个 topic 的多个 partition 的原子性的写入(Atomic multi-partition writes);
  • KAFKA的事务机制,在底层依赖于幂等生产者,幂等生产者是 kafka 事务的必要不充分条件:用户可以根据需要,配置使用幂等生产者但不开启事务;也可以根据需要开启 kafka事务,此时kafka 会使用幂等生产者;
  • 为支持事务机制,KAFKA 引入了两个新的组件: Transaction Coordinator 和 Transaction Log,其中 transaction coordinator 是运行在每个 kafka broker 上的一个模块,是 kafka broker 进程承载的新功能之一(不是一个独立的新的进程);而 transaction log 是 kakafa 的一个内部 topic;
  • 为支持事务机制,kafka 将日志文件格式进行了扩展:日志中除了普通的消息,还有一种消息专门用来标志一个事务的结束,它就是控制消息 controlBatch,它有两种类型:commit和abort,分别用来表征事务已经成功提交或已经被成功终止。
  • 开启了事务的生产者,生产的消息最终还是正常写到目标 topic 中,但同时也会通过 transaction coordinator 使用两阶段提交协议,将事务状态标记 transaction marker,也就是控制消息 controlBatch,写到目标 topic 中,控制消息共有两种类型 commit 和 abort,分别用来表征事务已经成功提交或已经被成功终止;
  • 开启了事务的消费者,如果配置读隔离级别为 read-committed, 在内部会使用存储在目标 topic-partition 中的事务控制消息,来过滤掉没有提交的消息,包括回滚的消息和尚未提交的消息,从而确保只读到已提交的事务的 message;// 对于非事务生产者生产的消息,他们不处于事务中,使用 read-committed 隔离级别的开启了事务的消费者,可以正常读取这些消息;
  • 开启了事务的消费者,过滤消息时,KAFKA consumer 不需要跟 transactional coordinator 进行 rpc 交互,因为 topic 中存储的消息,包括正常的数据消息和控制消息,包含了足够的元数据信息来支持消息过滤;
  • 当然 kakfa 的 producer 和 consumer 是解耦的,你也可以使用非 transactional consumer 来消费 transactional producer 生产的消息,此时目标 topic-partition 中的所有消息都会被返回,不会进行过滤,此时也就丢失了事务 ACID 的支持;

更多细节,可以参考:KIP-98:Exactly Once Delivery and Transactional Messaging

相关文章
|
7月前
|
消息中间件 分布式计算 Kafka
亿万级别Kafka演进之路:可靠性+事务+消息中间件+源码+日志
Kafka起初是由LinkedIn公司采用Scala语言开发的-一个多分区、多副本且基于ZooKeeper协调的分布式消息系统,现已被捐献给Apache基金会。目前Kafka已经定位为一个分布式流式处理平台,它以高吞吐、可持久化、可水平扩展、支持流数据处理等多种特性而被广泛使用。
|
3月前
|
消息中间件 Java Kafka
掌握Kafka事务,看这篇就够了
先赞后看,南哥助你Java进阶一大半Kafka事务实际上引入了原子多分区写入的概念,播客画了以下流程图,展示了事务在分区级别如何工作。我是南哥,一个Java学习与进阶的领路人,相信对你通关面试、拿下Offer进入心心念念的公司有所帮助。
106 2
掌握Kafka事务,看这篇就够了
|
2月前
|
消息中间件 存储 分布式计算
大数据-72 Kafka 高级特性 稳定性-事务 (概念多枯燥) 定义、概览、组、协调器、流程、中止、失败
大数据-72 Kafka 高级特性 稳定性-事务 (概念多枯燥) 定义、概览、组、协调器、流程、中止、失败
43 4
|
2月前
|
消息中间件 分布式计算 Java
大数据-73 Kafka 高级特性 稳定性-事务 相关配置 事务操作Java 幂等性 仅一次发送
大数据-73 Kafka 高级特性 稳定性-事务 相关配置 事务操作Java 幂等性 仅一次发送
39 2
|
7月前
|
消息中间件 Kafka API
Kafka Exactly Once 语义实现原理:幂等性与事务消息
Apache Kafka的Exactly-Once语义确保了消息处理的准确性和一致性。通过幂等性和事务消息,Kafka实现了要么全处理要么全不处理的原子性。文章详细解析了Kafka事务的工作流程,包括生产者的幂等性(通过序列号保证),以及事务消息的提交和回滚过程。Kafka事务提供了ACID保证,但存在性能限制,如额外的RPC请求和单生产者只能执行一个事务。此外,事务适用于同集群内的操作,跨集群时原子性无法保证。了解这些原理有助于开发者更好地利用Kafka事务构建可靠的数据处理系统。
191 3
 Kafka Exactly Once 语义实现原理:幂等性与事务消息
|
7月前
|
消息中间件 Kafka
【Kafka系列】Kafka事务一般在什么场景下使用呢
面试官:听说你精通Kafka,那我就考考你吧面试官:不用慌尽管说,错了也没关系😊。。。❤️。
108 2
【Kafka系列】Kafka事务一般在什么场景下使用呢
|
消息中间件 存储 大数据
一文读懂 kafka 的事务机制 1
一文读懂 kafka 的事务机制
|
消息中间件 关系型数据库 MySQL
SpringBoot-Kafka(生产者事务、手动提交offset、定时消费、消息转发、过滤消息内容、自定义分区器、提高吞吐量)
SpringBoot-Kafka(生产者事务、手动提交offset、定时消费、消息转发、过滤消息内容、自定义分区器、提高吞吐量)
SpringBoot-Kafka(生产者事务、手动提交offset、定时消费、消息转发、过滤消息内容、自定义分区器、提高吞吐量)
|
消息中间件 Kubernetes Java
Apache Kafka-事务消息的支持与实现(本地事务)
Apache Kafka-事务消息的支持与实现(本地事务)
651 0
|
消息中间件 存储 Java
「事件驱动架构」Apache Kafka中的事务
「事件驱动架构」Apache Kafka中的事务