一、前言
二、位移主题
上期我们讲述了我们将我们的 位移偏移量(offset) 放到我们 Kafka 的位移主题:__consumer_offsets
1. 原因
我们老版本的 Kafka 是依靠 Zookeeper 的,他会自动或手动的将我们的位移提交到 Zookeeper 保存。当 Consumer 重启后,从我们的 Zookeeper 读取位移数据。
看似完美无瑕的操作,既能减少 Broker 的负担,又方便伸缩扩展。后来为什么被废弃了呢?
我们来看下官方对于 Zookeeper 的定义:Apache ZooKeeper 它为大型分布式计算提供开源的分布式配置服务、同步服务和命名注册。
Zookeeper 作为分布式系统的协调器,如果让其不断的写入数据,假设我们 Kafka 或者 dubbo 挂了之后,Zookeeper 会重新进行 Leader 的选举,这个时候,你的写入数据的操作会影响 Leader 的选举
新版本社区对其进行了改进,将 Consumer 的位移数据作为一条条普通的 Kafka 消息,提交到 __consumer_offsets 中。主题天然性的支持高持久性和高频的写操作。
2. 简介
位移主题是一个普通的 Kafka 主题,你可以闲着没事去修改、删除、新增。当然,之后的风险需要你自己承担。
既然是主题,那么肯定是有消息发送的。位移主题的消息格式是 Kafka 自己定义的,用户不能修改,也就是你不能随意的向这个主题写入消息,一旦你装逼写入你自己定义的消息格式,那么你的 Broker 可能直接崩溃。
我们猜想一下,这个消息格式(Key 和 Value 分别表示消息的键值和消息体)大概率是个什么样的呢?
我们上期讲到,一个消费者组里面的消费者消费一个主题下的几个分区, 也就相当于我们的 offset 应该是分区这个级别的。
消费者组中含有 Group ID,我们可以通过该 ID 确定那个消费组。再通过 Topic 和 Partition 确定某个具体的分区。所以,我们的 key 应该是:key = (Group ID, Topic, Partition)
我们消息体正常来说,肯定是包含 offset 的,还包含其他一些元数据,比如:时间戳和用户自定义的数据等。
3. 创建
当 Kafka 集群中的第一个 Consumer 启动时,Kafka 会自动创建位移主题。
该位移主题的默认分区数为:offsets.topic.num.partitions = 50
,其副本因子为:offsets.topic.replication.factor = 3
如果位移主题是Kafka 自动创建,那么该主题的分区数是50,副本数是 3。
4. 提交
位移的提交分为两种:自动提交和手动提交。主要通过参数 enable.auto.commit
来进行管理。
如果我们将 enable.auto.commit
设置成 false
,那么我们就要使用 consumer.commitSync
手动进行提交。
如果我们选择的是 自动提交,这个时候会存在一个问题:只要我们的 Consumer 一直启动着,它就会无限期地向位移主题写入消息。
当我们的 Consumer 消费到了某个主题的最新一条消息,位移是 100,之后没有任何消息产生,故我们的 Consumer 无消息可消费了,所以位移永远保持在 100。由于是自动提交,会不断的写入位移 = 100 的消息。显然,对于这类消息,我们只保存一条即可,其余的都可以进行删除。
Kafak 使用 Compaction 来进行对消息的删除,也可以称之为 整理。
**Compact策略:**对于同一个 key 的两条消息 M1 和 M2,如果 M1 的发送时间早于 M2,那么 M1 就是过期消息。该策略会扫描所有的日志,删除其过期消息。
图中位移为 0、2 和 3 的消息的 Key 都是 K1。Compact 之后,分区只需要保存位移为 3 的消息,因为它是最新发送的。
Kafka 提供了专门的后台线程定期的巡检 Compact 的主题,看看是否存在满足条件的可删除数据。这个后台线程叫做 Log Cleaner。
5. 总结
Kafka 神秘的位移主题 __consumer_offsets,包括引入它的契机与原因、它的作用、消息格式、写入的时机以及管理策略等,这对我们了解 Kafka 特别是 Kafka Consumer 的位移管理是大有帮助的。
实际上,将很多元数据以消息的方式存入 Kafka 内部主题的做法越来越流行。除了 Consumer 位移管理,Kafka 事务也是利用了这个方法,当然那是另外的一个内部主题了。
社区的想法很简单:既然 Kafka 天然实现了高持久性和高吞吐量,那么任何有这两个需求的子服务自然也就不必求助于外部系统,用 Kafka 自己实现就好了。
三、位移提交
位移在我们 Kafka 中有两种概念:
- 一种是生产者向 Partition 中发送消息的 offset
- 一种是消费者消费消息提交的 offset,这里的 offset 指的是 Consumer 要消费的下一条消息的位移。
**Consumer 需要向 Kafka 汇报自己的位移数据,这个汇报的过程被称为提交位移。**Consumer 能够同时消费多个分区的消息,所以 Consumer 需要为分配给它的每个分区提交各自的位移数。
提交位移主要为了保障 Consumer 的消费进度,这样我们的 Consumer 即使发生故障重启,也能够从 Kafka 中读取之前提交的位移值,,然后从相应的位移进行消费。为了防止消息重新进行消费一遍,和之前讲过的读书案例类似。
当然,这种位移记录也存在一定的风险。**位移提交的语义保障是由你来负责的,Kafka 只会无脑的接受你提交的位移。**简单来说,当你目前消费了 10 条消息,你却提交了 100 的位移,下一次会在 100 的位移处进行消费,形消息丢失的结果。
Kafka 提供多种位移的方法,从用户的角度来说,位移提交分为自动提交和手动提交;从 Consumer 端的角度来说,位移提交分为同步提交和异步提交。
1. 同步提交
最简单的 API 就是 KafkaConsumer#commitSync()。该方法会提交 KafkaConsumer#poll() 返回的最新位移。这是一个同步操作,一直等待,直到位移被成功提交才进行返回。
while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1)); process(records); // 处理消息 try { consumer.commitSync(); } catch (CommitFailedException e) { handle(e); // 处理提交失败异常 } }
我们可以发现,调用 Consumer.commitSync() 方法的时机,是你在处理完 poll() 方法返回的所有消息之后。如果你提早的提交了位移,就会造成数据丢失(这一批数据还没消费完,你直接给提交了,万一这批数据出现差错,也没办法再次拉取消费了)。
那我们的自动提交可以避免这个问题嘛,可以避免消息丢失的问题,但可能出现重复性消费的问题。
自动提交:Kafka 会保证在开始调用 poll() 方法时,提交上一次 poll() 的所有消息。从顺序上来说,poll() 方法的逻辑是先提交上一批消息的位移,在处理下一批消息。
在默认情况下,Consumer 每 5 秒自动提交一次位移。现在我们假设提交位移之后的 3 秒 发生了重平衡操作。在重平衡之后,我们 Consumer 消费的分区被打乱了,需要重新进行消费,这样会进行重复性消费。我们虽然能减少自动提交的间隔,但不可能完全消除它,也是自动提交机制的一个缺陷。
再回头看看我们的手动提交,好处就在于灵活。
2. 异步提交
Kafka 社区为我们提供了另一个 API 方法:Kafka 社区为我们提供了另一个 API 方法。是一个异步操作。调用 commitAsync 之后,他会立即返回,不会阻塞。Kafka 还为其提供了回调函数(callback)。
while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1)); process(records); // 处理消息 consumer.commitAsync((offsets, exception) -> { // 这里可以实现提交之后的逻辑 if (exception != null) handle(exception); }); }
这种异步提交最大的一个缺点就是,无法进行重试。因为是异步操作,如果等到网络超时再重试的话,当前的位移值可能早已经过期了,无实在意义。
3. 异步同步相结合
所以,我们可以将我们的 commitSync 和 commitAsync 组合起来
- 利用 commitSync 的自动重试来规避那些瞬时错误(网络抖动、Broker GC)
- 我们不希望程序总处于阻塞状态,影响 TPS。
try { while (true) { // poll(long timeout):以 timeout 轮询去拉取数据 ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1)); process(records); // 处理消息 commitAysnc(); // 使用异步提交规避阻塞 } } catch (Exception e) { handle(e); // 处理异常 } finally { try { consumer.commitSync(); // 最后一次提交使用同步阻塞式提交 } finally { consumer.close(); } }
对于常规性、阶段性的手动提交,我们采用异步提交,而在我们的 Consumer 关闭前,我们调用异步提交,以确保我们的 Consumer 位移的正确性。
4. 分批提交
Kafka 还给我们提供了另外一组方法,比如我们的 poll() 方法一次性拉取了 5000 条消息,我们能不能分批进行提交,比如,我们消息消费完 500 条进行一次提交。Kafka 中利用 commitSync(Map<TopicPartition, OffsetAndMetadata>) 和 commitAsync(Map<TopicPartition, OffsetAndMetadata>)。 这两个方法实现分批提交的功能。
private Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(); int count = 0; while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1)); for (ConsumerRecord<String, String> record: records) { process(record); // 处理消息 offsets.put(new TopicPartition(record.topic(), record.partition()),new OffsetAndMetadata(record.offset() + 1); if(count % 100 == 0){ consumer.commitAsync(offsets, null); // 回调处理逻辑是 null } count++; } }