【kafka原理】 消费者偏移量__consumer_offsets_相关解析

本文涉及的产品
全局流量管理 GTM,标准版 1个月
云解析 DNS,旗舰版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介: 【kafka原理】 消费者偏移量__consumer_offsets_相关解析

我们在kafka的log文件中发现了还有很多以 __consumer_offsets_的文件夹;总共50个;


由于Zookeeper并不适合大批量的频繁写入操作,新版Kafka已推荐将consumer的位移信息保存在Kafka内部的topic中,即__consumer_offsets topic,并且默认提供了kafka_consumer_groups.sh脚本供用户查看consumer信息。


__consumer_offsets 是 kafka 自行创建的,和普通的 topic 相同。它存在的目的之一就是保存 consumer 提交的位移。

__consumer_offsets 的每条消息格式大致如图所示


image.png

image.png

可以想象成一个 KV 格式的消息,key 就是一个三元组:group.id+topic+分区号,而 value 就是 offset 的值。


考虑到一个 kafka 生成环境中可能有很多consumer 和 consumer group,如果这些 consumer 同时提交位移,则必将加重 __consumer_offsets 的写入负载,因此 kafka 默认为该 topic 创建了50个分区,并且对每个 group.id做哈希求模运算Math.abs(groupID.hashCode()) % numPartitions,从而将负载分散到不同的 __consumer_offsets 分区上。


一般情况下,当集群中第一次有消费者消费消息时会自动创建__consumer_offsets,它的副本因子受 offsets.topic.replication.factor 参数的约束,默认值为3(注意:该参数的使用限制在0.11.0.0版本发生变化),分区数可以通过 offsets.topic.num.partitions 参数设置,默认值为50。


1. 消费Topic消息

打开一个session a,执行下面的消费者命令 ;指定了消费组:szz1-group; topic:szz1-test-topic

bin/kafka-console-consumer.sh --bootstrap-server  xxx1:9092,xxx2:9092,xxx3:9092 --group szz1-group --topic szz1-test-topic

2.产生消息

打开一个新的session b,执行生产消息命令

bin/kafka-console-producer.sh --broker-list  xxx1:9092,xxx2:9092,xxx3:9092  --topic szz1-test-topic

发送几条消息

image.png

然后可以看到刚刚打开的 session a 消费了消息;

image.png

3. 查看指定消费组的消费位置offset

bin/kafka-consumer-groups.sh --bootstrap-server xxx1:9092,xxx2:9092,xxx3:9092 --describe --group szz1-group

image.png

可以看到图中 展示了每个partition 对应的消费者id; 因为只开了一个消费者; 所以是这个消费者同时消费3个partition;

CURRENT-OFFSET: 当前消费组消费到的偏移量

LOG-END-OFFSET: 日志最后的偏移量

CURRENT-OFFSET = LOG-END-OFFSET 说明当前消费组已经全部消费了;


那么我把 session a 关掉;现在没有消费者之后; 我再发送几条消息看看;

image.png

我发送了2条消息之后, partition-0 partition-1 的LOG-END-OFFSET: 日志最后的偏移量分别增加了1; 但是CURRENT-OFFSET: 当前消费组消费到的偏移量 保持不变;因为没有被消费;


重新打开一个消费组 继续消费*


重新打开session之后, 会发现控制台输出了刚刚发送的2条消息; 并且偏移量也更新了

image.png

4. 从头开始消费 --from-beginning

如果我们用新的消费组去消费一个Topic,那么默认这个消费组的offset会是最新的; 也就是说历史的不会消费

例如下面我们新开一个session c ;消费组设置为szz1-group3

bin/kafka-console-consumer.sh --bootstrap-server   xxx1:9092,xxx2:9092,xxx3:9092 --group szz1-group3    --topic szz1-test-topic

查看消费情况

 bin/kafka-consumer-groups.sh --bootstrap-server  xxx1:9092,xxx2:9092,xxx3:9092  --describe --group szz1-group3

image.png

可以看到CURRENT-OFFSET = LOG-END-OFFSET ;


如何让新的消费组/者 从头开始消费呢? 加上参数 --from-beginning


5.如何确认 consume_group 在哪个__consumer_offsets-? 中

Math.abs(groupID.hashCode()) % numPartitions


6. 查找__consumer_offsets 分区数中的消费组偏移量offset

上面的 3. 查看指定消费组的消费位置offset 中,我们知道如何查看指定的topic消费组的偏移量;

那还有一种方式也可以查询


先通过 consume_group 确定分区数; 例如 "szz1-group".hashCode()%50=32; 那我们就知道 szz-group消费组的偏移量信息存放在 __consumer_offsets_32中;

通过命令

 bin/kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 32 --broker-list xxx1:9092,xxx2:9092,xxx3:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"

image.png

前面的 是key 后面的是value;key由 消费组+Topic+分区数 确定; 后面的value就包含了 消费组的偏移量信息等等

然后接着我们发送几个消息,并且进行消费; 上面的控制台会自动更新为新的offset;

7 查询topic的分布情况

bin/kafka-topics.sh --describe --zookeeper xxx:2181 --topic TOPIC名称


image.png

相关文章
|
1月前
|
存储 缓存 算法
HashMap深度解析:从原理到实战
HashMap,作为Java集合框架中的一个核心组件,以其高效的键值对存储和检索机制,在软件开发中扮演着举足轻重的角色。作为一名资深的AI工程师,深入理解HashMap的原理、历史、业务场景以及实战应用,对于提升数据处理和算法实现的效率至关重要。本文将通过手绘结构图、流程图,结合Java代码示例,全方位解析HashMap,帮助读者从理论到实践全面掌握这一关键技术。
89 13
|
2月前
|
运维 持续交付 云计算
深入解析云计算中的微服务架构:原理、优势与实践
深入解析云计算中的微服务架构:原理、优势与实践
98 1
|
9天前
|
机器学习/深度学习 自然语言处理 搜索推荐
自注意力机制全解析:从原理到计算细节,一文尽览!
自注意力机制(Self-Attention)最早可追溯至20世纪70年代的神经网络研究,但直到2017年Google Brain团队提出Transformer架构后才广泛应用于深度学习。它通过计算序列内部元素间的相关性,捕捉复杂依赖关系,并支持并行化训练,显著提升了处理长文本和序列数据的能力。相比传统的RNN、LSTM和GRU,自注意力机制在自然语言处理(NLP)、计算机视觉、语音识别及推荐系统等领域展现出卓越性能。其核心步骤包括生成查询(Q)、键(K)和值(V)向量,计算缩放点积注意力得分,应用Softmax归一化,以及加权求和生成输出。自注意力机制提高了模型的表达能力,带来了更精准的服务。
|
20天前
|
存储 物联网 大数据
探索阿里云 Flink 物化表:原理、优势与应用场景全解析
阿里云Flink的物化表是流批一体化平台中的关键特性,支持低延迟实时更新、灵活查询性能、无缝流批处理和高容错性。它广泛应用于电商、物联网和金融等领域,助力企业高效处理实时数据,提升业务决策能力。实践案例表明,物化表显著提高了交易欺诈损失率的控制和信贷审批效率,推动企业在数字化转型中取得竞争优势。
80 14
|
28天前
|
网络协议 安全 网络安全
探索网络模型与协议:从OSI到HTTPs的原理解析
OSI七层网络模型和TCP/IP四层模型是理解和设计计算机网络的框架。OSI模型包括物理层、数据链路层、网络层、传输层、会话层、表示层和应用层,而TCP/IP模型则简化为链路层、网络层、传输层和 HTTPS协议基于HTTP并通过TLS/SSL加密数据,确保安全传输。其连接过程涉及TCP三次握手、SSL证书验证、对称密钥交换等步骤,以保障通信的安全性和完整性。数字信封技术使用非对称加密和数字证书确保数据的机密性和身份认证。 浏览器通过Https访问网站的过程包括输入网址、DNS解析、建立TCP连接、发送HTTPS请求、接收响应、验证证书和解析网页内容等步骤,确保用户与服务器之间的安全通信。
103 1
|
3月前
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
147 1
|
3月前
|
消息中间件 Java Kafka
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
68 1
|
5月前
|
消息中间件 Java Kafka
Kafka不重复消费的终极秘籍!解锁幂等性、偏移量、去重神器,让你的数据流稳如老狗,告别数据混乱时代!
【8月更文挑战第24天】Apache Kafka作为一款领先的分布式流处理平台,凭借其卓越的高吞吐量与低延迟特性,在大数据处理领域中占据重要地位。然而,在利用Kafka进行数据处理时,如何有效避免重复消费成为众多开发者关注的焦点。本文深入探讨了Kafka中可能出现重复消费的原因,并提出了四种实用的解决方案:利用消息偏移量手动控制消费进度;启用幂等性生产者确保消息不被重复发送;在消费者端实施去重机制;以及借助Kafka的事务支持实现精确的一次性处理。通过这些方法,开发者可根据不同的应用场景灵活选择最适合的策略,从而保障数据处理的准确性和一致性。
400 9
|
5月前
|
消息中间件 负载均衡 Java
"Kafka核心机制揭秘:深入探索Producer的高效数据发布策略与Java实战应用"
【8月更文挑战第10天】Apache Kafka作为顶级分布式流处理平台,其Producer组件是数据高效发布的引擎。Producer遵循高吞吐、低延迟等设计原则,采用分批发送、异步处理及数据压缩等技术提升性能。它支持按消息键值分区,确保数据有序并实现负载均衡;提供多种确认机制保证可靠性;具备失败重试功能确保消息最终送达。Java示例展示了基本配置与消息发送流程,体现了Producer的强大与灵活性。
90 3
|
5月前
|
vr&ar 图形学 开发者
步入未来科技前沿:全方位解读Unity在VR/AR开发中的应用技巧,带你轻松打造震撼人心的沉浸式虚拟现实与增强现实体验——附详细示例代码与实战指南
【8月更文挑战第31天】虚拟现实(VR)和增强现实(AR)技术正深刻改变生活,从教育、娱乐到医疗、工业,应用广泛。Unity作为强大的游戏开发引擎,适用于构建高质量的VR/AR应用,支持Oculus Rift、HTC Vive、Microsoft HoloLens、ARKit和ARCore等平台。本文将介绍如何使用Unity创建沉浸式虚拟体验,包括设置项目、添加相机、处理用户输入等,并通过具体示例代码展示实现过程。无论是完全沉浸式的VR体验,还是将数字内容叠加到现实世界的AR应用,Unity均提供了所需的一切工具。
213 0

热门文章

最新文章

推荐镜像

更多