奇奇怪怪的问题之Kafka 重平衡

简介: kafka Rebalance

kafka 2.11 attempt to heart beat failed since the group is rebalancing

生产环境遇到kafka 2.11 重平衡问题,记录

为了解决问题,先还原此报错

window下搭建kafka单节点

https://kafka.apache.org/downloads 2.1.1版本

下载后解压

修改 config目录下 zookeeper.properties  dataDir 指定zk数据存放目录(默认是linux目录结构)

修改 config目录下 server.properties log log.dirs 指定kafka日志存放目录(默认是linux目录结构)

启动zk

.\bin\windows\zookeeper-server-start.bat config\zookeeper.properties

zk端口默认2181

启动kafka

.\bin\windows\kafka-server-start.bat config\server.properties

kafka默认端口 9092

使用kafka_toole工具连接测试:

测试成功,单节点搭建完成

模拟程序生产者和消费者

开发程序,模拟生产者和消费者

# 引入jar包
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>


配置文件

spring.kafka.producer.bootstrap-servers=localhost:9092 # 配置kafka地址spring.kafka.consumer.max-poll-records=1#每次拉取一条数据 方便测试spring.kafka.consumer.heartbeat-interval=100# 心跳时间100ms


生产者

使用KafkaTemplate

controller和service代码省略

packagecom.example.kafka_test.service.impl;
importcom.example.kafka_test.service.ProducerService;
importorg.springframework.beans.factory.annotation.Autowired;
importorg.springframework.kafka.core.KafkaTemplate;
importorg.springframework.stereotype.Service;
importstaticcom.example.kafka_test.KafkaConstant.TOPIC_NAME;
/*** @Author: Zy* @Date: 2021/11/25 10:00*/@ServicepublicclassProducerServiceImplimplementsProducerService {
@AutowiredprivateKafkaTemplate<String, String>kafkaTemplate;
@OverridepublicvoidsendMsg(Stringmsg) {
kafkaTemplate.send(TOPIC_NAME, msg);
    }
}



消费者

packagecom.example.kafka_test.service.impl;
importlombok.extern.slf4j.Slf4j;
importorg.springframework.kafka.annotation.KafkaListener;
importorg.springframework.stereotype.Component;
importorg.springframework.stereotype.Service;
importstaticcom.example.kafka_test.KafkaConstant.CONSUMER_GROUP;
importstaticcom.example.kafka_test.KafkaConstant.TOPIC_NAME;
/*** @Author: Zy* @Date: 2021/11/25 10:13*/@Slf4j@ComponentpublicclassConsumerServiceImpl {
@KafkaListener(topics=TOPIC_NAME, groupId=CONSUMER_GROUP)
publicvoidconsumerMsg(Stringmsg) {
try {
#休眠防止消费速度快无法观察日志Thread.sleep(10000);
        } catch (InterruptedExceptione) {
e.printStackTrace();
        }
log.info(msg);
    }
}


测试1-重平衡的触发

造数据

使用postman调用生产者造数据

启动第一个消费者

正常启动,且开始消费

启动第二个消费者

同样的分区,同样的消费者组

启动成功后,发现第一个消费者大量输出重平衡日志,且重平衡期间数据并没有消费

总结

消费者的数量变动会触发Rebalance 重平衡期间数据不消费

测试2-还原报错

Attempt to heart beat failed since the group is rebalancing

修改配置参数

spring.kafka.consumer.heartbeat-interval=100# 每100ms发送一次心跳spring.kafka.consumer.properties.max.poll.interval.ms=9000#最大每9000ms拉取一次数据spring.kafka.consumer.properties.session.timeout.ms=30000#超过此时间没有发送心跳,则认为消费者死亡,剔除组,并触发rebalance


修改后像测试1一样重启,发现大量报错


还原报错成功,总结一下

总结

错误原因:

Attempt to heart beat failed since the group is rebalancing

先分析一下这句话,发送心跳请求失败,消费者组正在重平衡

也就是说触发这个问题的条件有两个:

  1. 发送心跳
  2. 消费者重平衡

发送心跳请求:

在kafka 0.11版本之前,心跳请求是跟poll()请求一起发送的,即拉取一次数据发送一次心跳

在kafka 0.11版本之后,心跳请求是单独的线程,由 spring.kafka.consumer.heartbeat-interval 参数控制心跳请求的间隔时间

重平衡:

触发重平衡的情况如下:

  1. 有新的consumer加入
  2. 旧的consumer挂了
  3. coordinator挂了,集群选举出新的coordinator
  4. topic的partition新加
  5. consumer调用unsubscrible(),取消topic的订阅

经过以上分析,可以得到结果;
由于拉取数据消费过慢,两次poll之间的时间超过了session.timeout.ms,被认为此消费者已死亡,触发了rebalance,而在rebalance的过程中,发送心跳请求导致的报错.

解决办法:

  1. 调高心跳请求的间隔时间  heartbeat-interval 此项可不调整,因为心跳时间偏大的情况下,也可能触发rebalance
  2. 调高超时时间 session.timeout.ms
  3. 减少每次poll的拉取数据量  max-poll-records 防止每次poll拉取的数据处理的时间过长,导致超时


其中最好的办法就是2 3

最重要的就是poll到的数据要在session.timeout.ms时间内处理完.


目录
相关文章
|
5月前
|
消息中间件 负载均衡 Kafka
Kafka消费组重新平衡流程
Kafka消费组重新平衡流程
|
7月前
|
消息中间件 网络协议 Kafka
Kafka【付诸实践 02】消费者和消费者群组+创建消费者实例+提交偏移量(自动、手动)+监听分区再平衡+独立的消费者+消费者其他属性说明(实例源码粘贴可用)【一篇学会使用Kafka消费者】
【2月更文挑战第21天】Kafka【付诸实践 02】消费者和消费者群组+创建消费者实例+提交偏移量(自动、手动)+监听分区再平衡+独立的消费者+消费者其他属性说明(实例源码粘贴可用)【一篇学会使用Kafka消费者】
231 3
|
7月前
|
消息中间件 负载均衡 Kafka
Kafka学习---消费者(分区消费、分区平衡策略、offset、漏消费和重复消费)
Kafka学习---消费者(分区消费、分区平衡策略、offset、漏消费和重复消费)
778 2
|
7月前
|
消息中间件 负载均衡 Kafka
Kafka - 3.x 分区分配策略及再平衡不完全指北
Kafka - 3.x 分区分配策略及再平衡不完全指北
222 0
|
消息中间件 Kafka RocketMQ
Kafka重平衡机制
当集群中有新成员加入,或者某些主题增加了分区之后,消费者是怎么进行重新分配分区再进行消费的?这里就涉及到重平衡(Rebalance)的概念,下面我就给大家讲解一下什么是 Kafka 重平衡机制,我尽量做到图文并茂通俗易懂。
1337 0
Kafka重平衡机制
|
消息中间件 算法 关系型数据库
Kafka学习---4、消费者(分区消费、分区平衡策略、offset、漏消费和重复消费)(二)
Kafka学习---4、消费者(分区消费、分区平衡策略、offset、漏消费和重复消费)(二)
|
消息中间件 Kafka API
Kafka学习---4、消费者(分区消费、分区平衡策略、offset、漏消费和重复消费)(一)
Kafka学习---4、消费者(分区消费、分区平衡策略、offset、漏消费和重复消费)(一)
|
消息中间件 存储 负载均衡
「事件驱动架构」Apache Kafka再平衡协议:再平衡协议101
「事件驱动架构」Apache Kafka再平衡协议:再平衡协议101
「事件驱动架构」Apache Kafka再平衡协议:再平衡协议101
|
消息中间件 缓存 Kafka
「事件驱动架构」Kafka再平衡协议:静态成员和增量合作再平衡
「事件驱动架构」Kafka再平衡协议:静态成员和增量合作再平衡
|
消息中间件 网络协议 Java
【Kafka从入门到成神系列 六】Kafka 消费组及重平衡
【Kafka从入门到成神系列 六】Kafka 消费组及重平衡
【Kafka从入门到成神系列 六】Kafka 消费组及重平衡
下一篇
DataWorks