kafka 2.11 attempt to heart beat failed since the group is rebalancing
生产环境遇到kafka 2.11 重平衡问题,记录
为了解决问题,先还原此报错
window下搭建kafka单节点
https://kafka.apache.org/downloads 2.1.1版本
下载后解压
修改 config目录下 zookeeper.properties dataDir 指定zk数据存放目录(默认是linux目录结构)
修改 config目录下 server.properties log log.dirs 指定kafka日志存放目录(默认是linux目录结构)
启动zk
.\bin\windows\zookeeper-server-start.bat config\zookeeper.properties
zk端口默认2181
启动kafka
.\bin\windows\kafka-server-start.bat config\server.properties
kafka默认端口 9092
使用kafka_toole工具连接测试:
测试成功,单节点搭建完成
模拟程序生产者和消费者
开发程序,模拟生产者和消费者
# 引入jar包 <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>
配置文件
spring.kafka.producer.bootstrap-servers=localhost:9092 # 配置kafka地址spring.kafka.consumer.max-poll-records=1#每次拉取一条数据 方便测试spring.kafka.consumer.heartbeat-interval=100# 心跳时间100ms
生产者
使用KafkaTemplate
controller和service代码省略
packagecom.example.kafka_test.service.impl; importcom.example.kafka_test.service.ProducerService; importorg.springframework.beans.factory.annotation.Autowired; importorg.springframework.kafka.core.KafkaTemplate; importorg.springframework.stereotype.Service; importstaticcom.example.kafka_test.KafkaConstant.TOPIC_NAME; /*** @Author: Zy* @Date: 2021/11/25 10:00*/publicclassProducerServiceImplimplementsProducerService { privateKafkaTemplate<String, String>kafkaTemplate; publicvoidsendMsg(Stringmsg) { kafkaTemplate.send(TOPIC_NAME, msg); } }
消费者
packagecom.example.kafka_test.service.impl; importlombok.extern.slf4j.Slf4j; importorg.springframework.kafka.annotation.KafkaListener; importorg.springframework.stereotype.Component; importorg.springframework.stereotype.Service; importstaticcom.example.kafka_test.KafkaConstant.CONSUMER_GROUP; importstaticcom.example.kafka_test.KafkaConstant.TOPIC_NAME; /*** @Author: Zy* @Date: 2021/11/25 10:13*/publicclassConsumerServiceImpl { topics=TOPIC_NAME, groupId=CONSUMER_GROUP) (publicvoidconsumerMsg(Stringmsg) { try { #休眠防止消费速度快无法观察日志Thread.sleep(10000); } catch (InterruptedExceptione) { e.printStackTrace(); } log.info(msg); } }
测试1-重平衡的触发
造数据
使用postman调用生产者造数据
启动第一个消费者
正常启动,且开始消费
启动第二个消费者
同样的分区,同样的消费者组
启动成功后,发现第一个消费者大量输出重平衡日志,且重平衡期间数据并没有消费
总结
消费者的数量变动会触发Rebalance 重平衡期间数据不消费
测试2-还原报错
Attempt to heart beat failed since the group is rebalancing
修改配置参数
spring.kafka.consumer.heartbeat-interval=100# 每100ms发送一次心跳spring.kafka.consumer.properties.max.poll.interval.ms=9000#最大每9000ms拉取一次数据spring.kafka.consumer.properties.session.timeout.ms=30000#超过此时间没有发送心跳,则认为消费者死亡,剔除组,并触发rebalance
修改后像测试1一样重启,发现大量报错
还原报错成功,总结一下
总结
错误原因:
Attempt to heart beat failed since the group is rebalancing
先分析一下这句话,发送心跳请求失败,消费者组正在重平衡
也就是说触发这个问题的条件有两个:
- 发送心跳
- 消费者重平衡
发送心跳请求:
在kafka 0.11版本之前,心跳请求是跟poll()请求一起发送的,即拉取一次数据发送一次心跳
在kafka 0.11版本之后,心跳请求是单独的线程,由 spring.kafka.consumer.heartbeat-interval 参数控制心跳请求的间隔时间
重平衡:
触发重平衡的情况如下:
- 有新的consumer加入
- 旧的consumer挂了
- coordinator挂了,集群选举出新的coordinator
- topic的partition新加
- consumer调用unsubscrible(),取消topic的订阅
经过以上分析,可以得到结果;
由于拉取数据消费过慢,两次poll之间的时间超过了session.timeout.ms,被认为此消费者已死亡,触发了rebalance,而在rebalance的过程中,发送心跳请求导致的报错.
解决办法:
- 调高心跳请求的间隔时间 heartbeat-interval 此项可不调整,因为心跳时间偏大的情况下,也可能触发rebalance
- 调高超时时间 session.timeout.ms
- 减少每次poll的拉取数据量 max-poll-records 防止每次poll拉取的数据处理的时间过长,导致超时
其中最好的办法就是2 3
最重要的就是poll到的数据要在session.timeout.ms时间内处理完.