4.4 在事务机制下,KAFKA 对事务状态的容错
在事务机制下,KAFKA 在内部通过以下几点,实现了对事务状态的持久化存储,以及对 transaction coordinator 的容错:
- transaction coordinator 是运行在每个 kafka broker 上的一个模块,是 kafka broker 进程承载的新功能之一(不是一个独立的新的进程);
- transaction coordinator 将事务的状态保存在内存中,并持久化到 transaction log 中;
- transaction log 是 kakafa 的一个内部 topic(类似大家熟悉的 consumer_offsets ,是一个内部 topic);
- transaction log 有多个分区,每个分区都有一个 leader,该 leade对应哪个 kafka broker,则那个 broker上的 transaction coordinator 就负责对这些分区的写操作;
- transaction coordinator 是唯一负责读写 transaction log 的组件,如果某个 kafka broker 宕机的话,其负责的 transaction log 的 partitions 就没有了对应的 leader,此时会通过选举机制选举出一个新的 coordinator,该 coordinator 会从这些 transaction log partitions 在其它节点的副本中恢复状态数据;
- 正是由于 transaction log 是 kakfa 的一个内部 topic, 所以 KAFKA 可以通过内部的复制协议和选举机制(replication protocol and leader election processes),来确保对事务状态 transaction state 的持久化存储,以及对 transaction coordinator 的容错。
4.5 开启事务后,对 producer 和 consumer 的性能影响如何
- 开启事务后,producer 有些许写放大,这主要涉及到:
- 事务开始前, producer 需要将生产的消息的 partition 注册到 transaction coordinator, 这涉及到 rpt 调用;
- 事务结束时, transaction coordinator 需要注入 transaction marker 到消息对应的 Partition, 这也涉及到 rpt 调用;
- 事务进行过程中,transaction coordinator 需要将事务状态如 “prepare_commit” “complete_commit” 等持久化到 transaction log,这涉及到磁盘写;
- 但由于以上写操作的时间复杂度,更多是跟消息涉及到的分区个数相关,而不是跟消息的具体条数相关,且 KAFKA 通过 batch 机制尽量减小了 RPC 调用次数,所以对 Producer 的性能影响并不大;
- Confluent 官网有性能测试相关博文,其中讲到 “for a producer producing 1KB records at maximum throughput, committing messages every 100ms results in only a 3% degradation in throughput. Smaller messages or shorter transaction commit intervals would result in more severe degradation.“,即在他们的测试场景下,开启事务后性能只有3% 的下降;
- 开启事务后,对 consumer 的性能影响相对对 producer 的性能影响更小,consumer 仍然是轻量级高吞吐的,几乎没有性能影响:
- 这主要是因为,consumer 在 read_committed 模式下,只需要额外做一些消息的过滤,即过滤掉回滚了的事务的 message, 和 open 状态的事务的 message;
- 但过滤这些消息时,因为 topic 中存储的消息包括正常的数据消息和控制消息,这些消息包含了足够的元数据信息来支持消息过滤,所以KAFKA consumer 不需要跟 transactional coordinator 进行 rpc 交互;
- 同时,consumer 也不需要缓存读取的消息以等待事务的结束,而且由于底层仍然可以使用 zero-copy 机制来读取消息,所以相对于没有开启事务的 consumer,其性能几乎没有任何下降!
5 如何在应用程序中使用 KAFKA 事务
5.1 应用程序中使用 KAFKA 事务,涉及到 producer 和 consumer 的配置项的更改(当然 producer 和 consumer 的配置项的更改,可以更改配置文件也可以直接在代码中指定)。
- producer 配置项更改:
- enable.idempotence = true
- acks = “all”
- retries > 1 (preferably MAX_INT)
- transactional.id = ‘some unique id’
- consumer 配置项更改:
- 根据需要配置 isolation.level 为 “read_committed”, 或 “read_uncommitted”;
5.2 应用程序中使用 KAFKA 事务,涉及到应用程序代码调整,调用 transactional API
应用程序代码调整,调用 transactional API 大体顺序如下:
应用程序代码调整,示例伪代码(融合了Producer 和 Consumer)和关键步骤注释,如下:
KafkaProducer producer = createKafkaProducer( “bootstrap.servers”, “localhost:9092”, “transactional.id”, “my-transactional-id”); // the transactional.id is a must /**After the producer.initTransactions() returns, any transactions started by another instance of a producer with the same transactional.id would have been closed and fenced off. */ producer.initTransactions(); /** This specifies that the KafkaConsumer should only read non-transactional messages, or committed transactional messages from its input topics. */ KafkaConsumer consumer = createKafkaConsumer( “bootstrap.servers”, “localhost:9092”, “group.id”, “my-consumerGroup-id”, "isolation.level", "read_committed"); consumer.subscribe(Collections.singleton(“inputTopic”)); /** Consume some records, start a transaction, process the consumed records, write the processed records to the output topic, send the consumed offsets to the offsets topic, and finally commit the transaction. With the guarantees mentioned above, we know that the offsets and the output records will be committed as an atomic unit. */ while (true) { ConsumerRecords records = consumer.poll(Long.MAX_VALUE); producer.beginTransaction(); for (ConsumerRecord record : records) producer.send(new ProducerRecord(“outputTopic”, record)); producer.sendOffsetsToTransaction(currentOffsets(consumer), my-consumerGroup-id); //This method should be used when you need to batch consumed and produced messages together, typically in a consume-transform-produce pattern. producer.commitTransaction(); }
5.3 应用程序中使用 KAFKA 事务,如何选用一个全局一致的 transactional.id?
如上文所述,transactional.id 在 kafka 的事务机制中扮演了关键的角色,kafka 正是基于该参数来过滤掉僵尸生产者的 (fencing out zombies).
那么如何在跨 session 的众多 producer 中 (向同一个kafka集群中生产消息的 producer 有多个,这些 producer 还有可能会重启),选用一个全局一致的transactional.id,以互不影响呢?
大体的思路有两种:
- 一是通过一个统一的外部存储,来记录生产者使用的 transactional.id 和该生产者涉及到的topic-partition之间的映射关系;
- 二是通过某些静态编码机制来生成一个全局唯一的 transactional.id
在现实应用中,使用后者的更多些。比如 Kafka 生态的 Kafka Streams 使用的就是后者,FlinkKafkaProducer 使用的也是后者。
FlinkKafkaProducer 是根据 taskName+operatorUid+SubtaskIndex+Counter 来自动生成 transactional.id 的,如下源码所示:(FlinkKafkaProducer 是对 kafka transactional api 的更高级的封装和抽象,更易于使用,其底层融合了 Flink checkpoint 的两阶段提交协议和 Kafka transactional api 的两阶段提交协议,后续笔者会通过另一篇博客进行解读)。
注意:使用 transactional API, 用户需要配置 transactional.id,但不需要配置 ProducerId,Kafka 内部会自动生成并维护一个全局唯一的 ProducerIds,如下源码所示:
6 知识总结
- KAFKA 的事务机制,是 KAFKA 实现端到端有且仅有一次语义(end-to-end EOS)的基础;
- KAFKA 的事务机制,涉及到 transactional producer 和 transactional consumer, 两者配合使用,才能实现端到端有且仅有一次的语义(end-to-end EOS);
- 通过事务机制,KAFKA 实现了对多个 topic 的多个 partition 的原子性的写入(Atomic multi-partition writes);
- KAFKA的事务机制,在底层依赖于幂等生产者,幂等生产者是 kafka 事务的必要不充分条件:用户可以根据需要,配置使用幂等生产者但不开启事务;也可以根据需要开启 kafka事务,此时kafka 会使用幂等生产者;
- 为支持事务机制,KAFKA 引入了两个新的组件: Transaction Coordinator 和 Transaction Log,其中 transaction coordinator 是运行在每个 kafka broker 上的一个模块,是 kafka broker 进程承载的新功能之一(不是一个独立的新的进程);而 transaction log 是 kakafa 的一个内部 topic;
- 为支持事务机制,kafka 将日志文件格式进行了扩展:日志中除了普通的消息,还有一种消息专门用来标志一个事务的结束,它就是控制消息 controlBatch,它有两种类型:commit和abort,分别用来表征事务已经成功提交或已经被成功终止。
- 开启了事务的生产者,生产的消息最终还是正常写到目标 topic 中,但同时也会通过 transaction coordinator 使用两阶段提交协议,将事务状态标记 transaction marker,也就是控制消息 controlBatch,写到目标 topic 中,控制消息共有两种类型 commit 和 abort,分别用来表征事务已经成功提交或已经被成功终止;
- 开启了事务的消费者,如果配置读隔离级别为 read-committed, 在内部会使用存储在目标 topic-partition 中的事务控制消息,来过滤掉没有提交的消息,包括回滚的消息和尚未提交的消息,从而确保只读到已提交的事务的 message;// 对于非事务生产者生产的消息,他们不处于事务中,使用 read-committed 隔离级别的开启了事务的消费者,可以正常读取这些消息;
- 开启了事务的消费者,过滤消息时,KAFKA consumer 不需要跟 transactional coordinator 进行 rpc 交互,因为 topic 中存储的消息,包括正常的数据消息和控制消息,包含了足够的元数据信息来支持消息过滤;
- 当然 kakfa 的 producer 和 consumer 是解耦的,你也可以使用非 transactional consumer 来消费 transactional producer 生产的消息,此时目标 topic-partition 中的所有消息都会被返回,不会进行过滤,此时也就丢失了事务 ACID 的支持;
更多细节,可以参考:KIP-98:Exactly Once Delivery and Transactional Messaging