一、消费者相关
1.1 消费组
1.1.1 查看消费组命令找不到消费组
详情: 运行以下命令后,控制台界面无任何输出
kafka-consumer-groups.sh --bootstrap-server 192.168.2.83:9092 --list
如下图:
解决方法:
指定消费者群组进行消费,成功消费后将自动创建所指定的消费者群组。
springboot指定消费者群组:消费者配置类添加下面代码(消费者配置类可查看springboot连接kafka)
props.put(ConsumerConfig.GROUP_ID_CONFIG, "kakfa-test");
结果如图:
1.2 主题
1.2.1 kafka默认主题_consumer_offsets 不小心删除
详情: 删除了_consumer_offsets主题
如下图:
解决方法:
- 重新创建主题 __consumer_offsets
bin/kafka-topics.sh --bootstrap-server 192.168.2.86:9092 --topic __consumer_offsets --create
- 重启kafka服务
- 进行查看
结果如图,可以看到主题__consumer_offsets:
不成功的检查一下 kafka 服务配置 ,添加:
auto.create.topic.enable=true
二、发布者相关
三、Spring Boot连接相关
3.1 消费者相关
3.1.1 连接报错
详情: springboot启动后连接kafka集群报错:协调器不正确,标记协调器未知。
Request joining group due to: rebalance failed due to 'This is not the correct coordinator.' (NotCoordinatorException)
如下图:
解决方法
该问题出现原因一般是因为消费者类配置不正确,或提供group id 有问题。若不清楚group id请显示的指定分区,避免将错误的group id写入。
@KafkaListener(topicPartitions = { @TopicPartition(topic = "kafka-topic-test", partitions = { "0", "8" })
})
3.1.1.2 消费报错
详情: 消费成功,但报错 This error handler cannot process 'org.apache.kafka.common.errors.InvalidGroupIdException's; no record information is available
如下图:
解决方法
这个提示很明显,没有提供可用的 group id。我们只需要在消费者配置中设置一个即可,这个错误的原因是未设置group id,但又使用了subscribe(topic) 或基于 Kafka 的偏移量管理策略使用组管理功能引起。
3.1.3 数据过期
报错信息:org.apache.kafka.common.errors.TimeoutException: Expiring 10 record(s) for 主题名-0:120013 ms has passed since batch creation
报错原因:某些记录放入队列的速度比从客户端发送的速度快。
原因分析:生产者发送消息时,它们会存储在缓冲区中(在将它们发送到目标代理之前),并且记录会分组在一起以提高吞吐量。当新记录添加到批次中时,必须在可配置的时间窗口内发送,该时间窗口由 控制request.timeout.ms(默认设置为 30 秒)。如果批次在队列中的时间较长,则会抛出TimeoutException ,然后批次将记录从队列中删除,并且不会传递给代理。
解决方法: 增加 request.timeout.ms
的值,或者 减少 batch.size
的值,然后使 linger.ms
为 0 ,重启 kafka
备注:如果这些都无效!兄弟,你得看看网络了。