为什么kafka 需要 subscribe 的 group.id?我们是否需要使用 commitSync 手动提交偏移量?

简介: Kafka 使用消费者组的概念来实现主题的并行消费 - 每条消息都将在每个消费者组中传递一次,无论该组中实际有多少个消费者。所以 group 参数是强制性的,如果没有组,Kafka 将不知道如何对待订阅同一主题的其他消费者。

一、为什么需要带有 subscribe 的 group.id

  • 消费概念:

Kafka 使用消费者组的概念来实现主题的并行消费 - 每条消息都将在每个消费者组中传递一次,无论该组中实际有多少个消费者。所以 group 参数是强制性的,如果没有组,Kafka 将不知道如何对待订阅同一主题的其他消费者。

  • 偏移量

每当我们启动一个消费者时,它都会加入一个消费者组,然后根据该消费者组中的其他消费者数量,为其分配要读取的分区。对于这些分区,它会检查列表读取偏移量是否已知,如果找到,它将从这一点开始读取消息。如果没有找到偏移量,则参数 auto.offset.reset 控制是从分区中最早的消息还是从最新的消息开始读取。

二、我们需要使用commitSync手动提交偏移量吗?

  • 是否需要手动提交偏移?

是否需要提交偏移量取决于作为参数 enable.auto.commit 选择的值。默认情况下,此设置为 true,这意味着消费者将定期自动提交其偏移量(由auto.commit.interval.ms 决定提交的频率)。如果将其设置为 false,那么将需要自己提交偏移量。这种默认行为可能也是导致很多发现 kafka 总是从最新的开始消费的原因,由于偏移量是自动提交的,因此它将使用该偏移量。

  • 有没有办法从头开始重播消息?

如果想每次都从头开始读取,可以调用seekToBeginning,如果不带参数调用,它将重置为所有订阅分区中的第一条消息,或者仅重置您传入的那些分区。

  • seekToBeginning

查找每个给定分区的第一个偏移量。poll(long) 该函数延迟计算,仅在调用或时才查找所有分区中的第一个偏移量position(TopicPartition)。如果未提供分区,则查找所有当前分配的分区的第一个偏移量。

public class MyListener implements ConsumerSeekAware {

...

  @Override
  public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
      callback.seekToBeginning(assignments.keySet());
  }

}
  • 有没有办法从最后开始重播消息?

有的,可以使用 seekToEnd() 查找所有分配的分区到最后。或者使用 seekToTimestamp(long time)- 查找所有分配的分区到该时间戳表示的偏移量。

public class MyListener extends AbstractConsumerSeekAware {

  @KafkaListener(...)
  void listn(...) {
      ...
  }
}

public class SomeOtherBean {

  MyListener listener;

  ...

  void someMethod() {
      this.listener.seekToTimestamp(System.currentTimeMillis - 60_000);
  }

}

三、如果我想手动提交偏移量,该怎么做?

  • 1、禁用自动提交

    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
  • 提交方法

对于手动提交,KafkaConsumers提供了两种方法,即 commitSync() 和 commitAsync()。commitSync()是一个阻塞调用,在偏移量成功提交后返回,commitAsync()则立即返回。如果想知道提交是否成功,可以为回调处理程序 ( OffsetCommitCallback) 提供一个方法参数。请注意,在两次提交调用中,消费者都会提交最新poll()调用的偏移量。
举个例子:假设一个分区主题有一个消费者并且最后一次调用poll()返回偏移量为 4、5、6 的消息。提交时,偏移量 6 将被提交,因为这是消费者客户端跟踪的最新偏移量。
同时,commitSync() 和 commitAsync() 都允许更多地控制我们想要提交的偏移量:如果你使用允许你指定的相应重载,那么Map<TopicPartition, OffsetAndMetadata>消费者将仅提交指定的偏移量(即,映射可以包含分配的分区的任何子集) ,并且指定的偏移量可以为任意值)。

  • 同步提交:

阻塞线程,直到提交成功或遇到不可恢复的错误(在这种情况下,它被抛出给调用者)

while (true) {
  ConsumerRecords<String, String> records = consumer.poll(100);
  for (ConsumerRecord<String, String> record : records) {
      System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
      consumer.commitSync();
  }
}

对于 for 循环中的每次迭代,只有在consumer.commitSync()成功返回或因抛出异常而中断后,代码才会移至下一次迭代。

  • 异步提交:

是一种非阻塞方法。调用它不会阻塞线程。相反,它将继续处理以下指令,无论最终是成功还是失败。

while (true) {
  ConsumerRecords<String, String> records = consumer.poll(100);
  for (ConsumerRecord<String, String> record : records) {
      System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
      consumer.commitAsync(callback);
  }
}

对于 for 循环中的每次迭代,无论consumer.commitAsync()最终会发生什么,代码都会移至下一次迭代。并且,提交的结果将由定义的回调函数处理。

  • 权衡:延迟与数据一致性

1、如果必须确保数据一致性,请选择commitSync(),因为它将确保在执行任何进一步操作之前,你将知道偏移量提交是成功还是失败。但由于它是同步和阻塞的,你将花费更多的时间来等待提交完成,这会导致高延迟。
2、如果可以接受某些数据不一致并希望具有低延迟,请选择commitAsync(),因为它不会等待完成。相反,它只会发出提交请求并稍后处理来自 Kafka 的响应(成功或失败),同时代码将继续执行。

目录
相关文章
|
6月前
|
消息中间件 存储 Kafka
Kafka日志处理:深入了解偏移量查找与切分文件
**摘要:** 本文介绍了如何在Kafka中查找偏移量为23的消息,涉及ConcurrentSkipListMap的查询、索引文件的二分查找及日志分段的物理位置搜索。还探讨了Kafka日志分段的切分策略,包括大小、时间、索引大小和偏移量达到特定阈值时的切分条件。理解这些对于优化Kafka的性能和管理日志至关重要。
249 2
|
3月前
|
消息中间件 SQL 分布式计算
大数据-62 Kafka 高级特性 主题 kafka-topics相关操作参数 KafkaAdminClient 偏移量管理
大数据-62 Kafka 高级特性 主题 kafka-topics相关操作参数 KafkaAdminClient 偏移量管理
48 6
|
5月前
|
消息中间件 Java Kafka
Kafka不重复消费的终极秘籍!解锁幂等性、偏移量、去重神器,让你的数据流稳如老狗,告别数据混乱时代!
【8月更文挑战第24天】Apache Kafka作为一款领先的分布式流处理平台,凭借其卓越的高吞吐量与低延迟特性,在大数据处理领域中占据重要地位。然而,在利用Kafka进行数据处理时,如何有效避免重复消费成为众多开发者关注的焦点。本文深入探讨了Kafka中可能出现重复消费的原因,并提出了四种实用的解决方案:利用消息偏移量手动控制消费进度;启用幂等性生产者确保消息不被重复发送;在消费者端实施去重机制;以及借助Kafka的事务支持实现精确的一次性处理。通过这些方法,开发者可根据不同的应用场景灵活选择最适合的策略,从而保障数据处理的准确性和一致性。
400 9
|
5月前
|
vr&ar 图形学 开发者
步入未来科技前沿:全方位解读Unity在VR/AR开发中的应用技巧,带你轻松打造震撼人心的沉浸式虚拟现实与增强现实体验——附详细示例代码与实战指南
【8月更文挑战第31天】虚拟现实(VR)和增强现实(AR)技术正深刻改变生活,从教育、娱乐到医疗、工业,应用广泛。Unity作为强大的游戏开发引擎,适用于构建高质量的VR/AR应用,支持Oculus Rift、HTC Vive、Microsoft HoloLens、ARKit和ARCore等平台。本文将介绍如何使用Unity创建沉浸式虚拟体验,包括设置项目、添加相机、处理用户输入等,并通过具体示例代码展示实现过程。无论是完全沉浸式的VR体验,还是将数字内容叠加到现实世界的AR应用,Unity均提供了所需的一切工具。
205 0
|
6月前
|
消息中间件 监控 Java
Kafka 新的消费组默认的偏移量设置和消费行为
Kafka 新的消费组默认的偏移量设置和消费行为
527 1
|
6月前
|
消息中间件 Kafka 数据库
面试题Kafka问题之查看偏移量为23的消息如何解决
面试题Kafka问题之查看偏移量为23的消息如何解决
47 0
|
消息中间件 负载均衡 大数据
Kafka - 分区中各种偏移量的说明
Kafka - 分区中各种偏移量的说明
228 0
Kafka - 分区中各种偏移量的说明
|
8月前
|
消息中间件 Kafka 数据库连接
实时计算 Flink版操作报错合集之无法将消费到的偏移量提交到Kafka如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
153 3
|
7月前
|
消息中间件 JSON Kafka
实时计算 Flink版操作报错合集之kafka源表没有指定group.id,遇到报错,该怎么解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
155 0
|
8月前
|
消息中间件 网络协议 Kafka
Kafka【付诸实践 02】消费者和消费者群组+创建消费者实例+提交偏移量(自动、手动)+监听分区再平衡+独立的消费者+消费者其他属性说明(实例源码粘贴可用)【一篇学会使用Kafka消费者】
【2月更文挑战第21天】Kafka【付诸实践 02】消费者和消费者群组+创建消费者实例+提交偏移量(自动、手动)+监听分区再平衡+独立的消费者+消费者其他属性说明(实例源码粘贴可用)【一篇学会使用Kafka消费者】
258 3

热门文章

最新文章