【Kafka从入门到成神系列 七】Kafka 位移主题

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 【Kafka从入门到成神系列 七】Kafka 位移主题

一、前言


二、位移主题

上期我们讲述了我们将我们的 位移偏移量(offset) 放到我们 Kafka 的位移主题:__consumer_offsets

1. 原因

我们老版本的 Kafka 是依靠 Zookeeper 的,他会自动或手动的将我们的位移提交到 Zookeeper 保存。当 Consumer 重启后,从我们的 Zookeeper 读取位移数据。

看似完美无瑕的操作,既能减少 Broker 的负担,又方便伸缩扩展。后来为什么被废弃了呢?

我们来看下官方对于 Zookeeper 的定义:Apache ZooKeeper 它为大型分布式计算提供开源的分布式配置服务、同步服务和命名注册。

Zookeeper 作为分布式系统的协调器,如果让其不断的写入数据,假设我们 Kafka 或者 dubbo 挂了之后,Zookeeper 会重新进行 Leader 的选举,这个时候,你的写入数据的操作会影响 Leader 的选举

新版本社区对其进行了改进,将 Consumer 的位移数据作为一条条普通的 Kafka 消息,提交到 __consumer_offsets 中。主题天然性的支持高持久性和高频的写操作。

2. 简介

位移主题是一个普通的 Kafka 主题,你可以闲着没事去修改、删除、新增。当然,之后的风险需要你自己承担。

既然是主题,那么肯定是有消息发送的。位移主题的消息格式是 Kafka 自己定义的,用户不能修改,也就是你不能随意的向这个主题写入消息,一旦你装逼写入你自己定义的消息格式,那么你的 Broker 可能直接崩溃。

我们猜想一下,这个消息格式(Key 和 Value 分别表示消息的键值和消息体)大概率是个什么样的呢?

我们上期讲到,一个消费者组里面的消费者消费一个主题下的几个分区, 也就相当于我们的 offset 应该是分区这个级别的。

消费者组中含有 Group ID,我们可以通过该 ID 确定那个消费组。再通过 Topic 和 Partition 确定某个具体的分区。所以,我们的 key 应该是:key = (Group ID, Topic, Partition)

我们消息体正常来说,肯定是包含 offset 的,还包含其他一些元数据,比如:时间戳和用户自定义的数据等。

3. 创建

当 Kafka 集群中的第一个 Consumer 启动时,Kafka 会自动创建位移主题。

该位移主题的默认分区数为:offsets.topic.num.partitions = 50,其副本因子为:offsets.topic.replication.factor = 3

如果位移主题是Kafka 自动创建,那么该主题的分区数是50,副本数是 3。

4. 提交

位移的提交分为两种:自动提交和手动提交。主要通过参数 enable.auto.commit 来进行管理。

如果我们将 enable.auto.commit 设置成 false,那么我们就要使用 consumer.commitSync 手动进行提交。

如果我们选择的是 自动提交,这个时候会存在一个问题:只要我们的 Consumer 一直启动着,它就会无限期地向位移主题写入消息。

当我们的 Consumer 消费到了某个主题的最新一条消息,位移是 100,之后没有任何消息产生,故我们的 Consumer 无消息可消费了,所以位移永远保持在 100。由于是自动提交,会不断的写入位移 = 100 的消息。显然,对于这类消息,我们只保存一条即可,其余的都可以进行删除。

Kafak 使用 Compaction 来进行对消息的删除,也可以称之为 整理

**Compact策略:**对于同一个 key 的两条消息 M1 和 M2,如果 M1 的发送时间早于 M2,那么 M1 就是过期消息。该策略会扫描所有的日志,删除其过期消息。



7f6787894b3bcadcb4ce01cc24520c8c.png

图中位移为 0、2 和 3 的消息的 Key 都是 K1。Compact 之后,分区只需要保存位移为 3 的消息,因为它是最新发送的。

Kafka 提供了专门的后台线程定期的巡检 Compact 的主题,看看是否存在满足条件的可删除数据。这个后台线程叫做 Log Cleaner。

5. 总结

Kafka 神秘的位移主题 __consumer_offsets,包括引入它的契机与原因、它的作用、消息格式、写入的时机以及管理策略等,这对我们了解 Kafka 特别是 Kafka Consumer 的位移管理是大有帮助的。

实际上,将很多元数据以消息的方式存入 Kafka 内部主题的做法越来越流行。除了 Consumer 位移管理,Kafka 事务也是利用了这个方法,当然那是另外的一个内部主题了。

社区的想法很简单:既然 Kafka 天然实现了高持久性和高吞吐量,那么任何有这两个需求的子服务自然也就不必求助于外部系统,用 Kafka 自己实现就好了。

三、位移提交

位移在我们 Kafka 中有两种概念:

  • 一种是生产者向 Partition 中发送消息的 offset
  • 一种是消费者消费消息提交的 offset,这里的 offset 指的是 Consumer 要消费的下一条消息的位移。

**Consumer 需要向 Kafka 汇报自己的位移数据,这个汇报的过程被称为提交位移。**Consumer 能够同时消费多个分区的消息,所以 Consumer 需要为分配给它的每个分区提交各自的位移数。

提交位移主要为了保障 Consumer 的消费进度,这样我们的 Consumer 即使发生故障重启,也能够从 Kafka 中读取之前提交的位移值,,然后从相应的位移进行消费。为了防止消息重新进行消费一遍,和之前讲过的读书案例类似。

当然,这种位移记录也存在一定的风险。**位移提交的语义保障是由你来负责的,Kafka 只会无脑的接受你提交的位移。**简单来说,当你目前消费了 10 条消息,你却提交了 100 的位移,下一次会在 100 的位移处进行消费,形消息丢失的结果。

Kafka 提供多种位移的方法,从用户的角度来说,位移提交分为自动提交和手动提交;从 Consumer 端的角度来说,位移提交分为同步提交和异步提交。

1. 同步提交

最简单的 API 就是 KafkaConsumer#commitSync()。该方法会提交 KafkaConsumer#poll() 返回的最新位移。这是一个同步操作,一直等待,直到位移被成功提交才进行返回。

while (true) {
  ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
  process(records); // 处理消息
  try {
    consumer.commitSync();
        } catch (CommitFailedException e) {
          handle(e); // 处理提交失败异常
        }
}

我们可以发现,调用 Consumer.commitSync() 方法的时机,是你在处理完 poll() 方法返回的所有消息之后。如果你提早的提交了位移,就会造成数据丢失(这一批数据还没消费完,你直接给提交了,万一这批数据出现差错,也没办法再次拉取消费了)。

那我们的自动提交可以避免这个问题嘛,可以避免消息丢失的问题,但可能出现重复性消费的问题

自动提交:Kafka 会保证在开始调用 poll() 方法时,提交上一次 poll() 的所有消息。从顺序上来说,poll() 方法的逻辑是先提交上一批消息的位移,在处理下一批消息。

在默认情况下,Consumer 每 5 秒自动提交一次位移。现在我们假设提交位移之后的 3 秒 发生了重平衡操作。在重平衡之后,我们 Consumer 消费的分区被打乱了,需要重新进行消费,这样会进行重复性消费。我们虽然能减少自动提交的间隔,但不可能完全消除它,也是自动提交机制的一个缺陷。

再回头看看我们的手动提交,好处就在于灵活。

2. 异步提交

Kafka 社区为我们提供了另一个 API 方法Kafka 社区为我们提供了另一个 API 方法。是一个异步操作。调用 commitAsync 之后,他会立即返回,不会阻塞。Kafka 还为其提供了回调函数(callback)。

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
    process(records); // 处理消息
    consumer.commitAsync((offsets, exception) -> {
        // 这里可以实现提交之后的逻辑
        if (exception != null)
            handle(exception);
    });
}

这种异步提交最大的一个缺点就是,无法进行重试。因为是异步操作,如果等到网络超时再重试的话,当前的位移值可能早已经过期了,无实在意义。

3. 异步同步相结合

所以,我们可以将我们的 commitSync 和 commitAsync 组合起来

  1. 利用 commitSync 的自动重试来规避那些瞬时错误(网络抖动、Broker GC)
  2. 我们不希望程序总处于阻塞状态,影响 TPS。
try {
    while (true) {
        // poll(long timeout):以 timeout 轮询去拉取数据
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
        process(records); // 处理消息
        commitAysnc(); // 使用异步提交规避阻塞
    }
} catch (Exception e) {
    handle(e); // 处理异常
} finally {
    try {
        consumer.commitSync(); // 最后一次提交使用同步阻塞式提交
  } finally {
        consumer.close();
    }
}

对于常规性、阶段性的手动提交,我们采用异步提交,而在我们的 Consumer 关闭前,我们调用异步提交,以确保我们的 Consumer 位移的正确性。

4. 分批提交

Kafka 还给我们提供了另外一组方法,比如我们的 poll() 方法一次性拉取了 5000 条消息,我们能不能分批进行提交,比如,我们消息消费完 500 条进行一次提交。Kafka 中利用 commitSync(Map<TopicPartition, OffsetAndMetadata>) 和 commitAsync(Map<TopicPartition, OffsetAndMetadata>)。 这两个方法实现分批提交的功能。

private Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
int count = 0;
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
    for (ConsumerRecord<String, String> record: records) {
        process(record);  // 处理消息
        offsets.put(new TopicPartition(record.topic(), record.partition()),new OffsetAndMetadata(record.offset() + 1);
                    if(count % 100 == 0){
                        consumer.commitAsync(offsets, null); // 回调处理逻辑是 null
                    }
                    count++;
  }
}





相关文章
|
1月前
|
消息中间件 监控 安全
探究Kafka主题删除失败的根本原因
探究Kafka主题删除失败的根本原因
41 0
|
1月前
|
消息中间件 Java Kafka
kafka入门demo
kafka入门demo
40 0
|
1月前
|
消息中间件 分布式计算 Kafka
SparkStreaming(SparkStreaming概述、入门、Kafka数据源、DStream转换、输出、关闭)
SparkStreaming(SparkStreaming概述、入门、Kafka数据源、DStream转换、输出、关闭)(一)
|
1月前
|
消息中间件 Java Kafka
Kafka【环境搭建 01】kafka_2.12-2.6.0 单机版安装+参数配置及说明+添加到service服务+开机启动配置+验证+chkconfig配置说明(一篇入门kafka)
【2月更文挑战第19天】Kafka【环境搭建 01】kafka_2.12-2.6.0 单机版安装+参数配置及说明+添加到service服务+开机启动配置+验证+chkconfig配置说明(一篇入门kafka)
83 1
|
1月前
|
消息中间件 存储 Kafka
Kafka【基础入门】
Kafka【基础入门】
36 1
|
18天前
|
消息中间件 监控 安全
探究Kafka主题删除失败的根本原因
探究Kafka主题删除失败的根本原因
10 0
|
1月前
|
消息中间件 Kafka 数据处理
了解Kafka位移自动提交的秘密:避免常见陷阱的方法
了解Kafka位移自动提交的秘密:避免常见陷阱的方法
60 1
|
1月前
|
消息中间件 存储 Kafka
Kafka - 3.x offset位移不完全指北
Kafka - 3.x offset位移不完全指北
78 0
|
1月前
|
消息中间件 存储 分布式计算
Apache Kafka-初体验Kafka(01)-入门整体认识kafka
Apache Kafka-初体验Kafka(01)-入门整体认识kafka
48 0
|
1月前
|
消息中间件 算法 Kafka
Kafka入门,这一篇就够了(安装,topic,生产者,消费者)
Kafka入门,这一篇就够了(安装,topic,生产者,消费者)
155 0

热门文章

最新文章