线上kafka消息堆积,consumer掉线,怎么办?

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: 线上kafka消息堆积,consumer掉线,怎么办?

线上kafka消息堆积,所有consumer全部掉线,到底怎么回事?

最近处理了一次线上故障,具体故障表现就是kafka某个topic消息堆积,这个topic的相关consumer全部掉线。

整体排查过程和事后的复盘都很有意思,并且结合本次故障,对kafka使用的最佳实践有了更深刻的理解。

好了,一起来回顾下这次线上故障吧,最佳实践总结放在最后,千万不要错过。


1、现象


  • 线上kafka消息突然开始堆积
  • 消费者应用反馈没有收到消息(没有处理消息的日志)
  • kafka的consumer group上看没有消费者注册
  • 消费者应用和kafka集群最近一周内没有代码、配置相关变更


2、排查过程


服务端、客户端都没有特别的异常日志,kafka其他topic的生产和消费都是正常,所以基本可以判断是客户端消费存在问题。

所以我们重点放在客户端排查上。

1)arthas在线修改日志等级,输出debug

由于客户端并没有明显异常日志,因此只能通过arthas修改应用日志等级,来寻找线索。

果然有比较重要的发现:

2022-10-25 17:36:17,774 DEBUG [org.apache.kafka.clients.consumer.internals.AbstractCoordinator] - [Consumer clientId=consumer-1, groupId=xxxx] Disabling heartbeat thread
2022-10-25 17:36:17,773 DEBUG [org.apache.kafka.clients.consumer.internals.AbstractCoordinator] - [Consumer clientId=consumer-1, groupId=xxxx] Sending LeaveGroup request to coordinator xxxxxx (id: 2147483644 rack: null)

看起来是kafka-client自己主动发送消息给kafka集群,进行自我驱逐了。因此consumer都掉线了。


2)arthas查看相关线程状态变量
用arthas vmtool命令进一步看下kafka-client相关线程的状态。

640.png

可以看到 HeartbeatThread线程状态是WAITING,Cordinator状态是UNJOINED。

此时,结合源码看,大概推断是由于消费时间过长,导致客户端自我驱逐了。

于是立刻尝试修改max.poll.records,减少一批拉取的消息数量,同时增大max.poll.interval.ms参数,避免由于拉取间隔时间过长导致自我驱逐。

参数修改上线后,发现consumer确实不掉线了,但是消费一段时间后,还是就停止消费了。


3、最终原因


相关同学去查看了消费逻辑,发现了业务代码中的死循环,确认了最终原因。

消息内容中的一个字段有新的值,触发了消费者消费逻辑的死循环,导致后续消息无法消费。同时,消费阻塞导致消费者自我驱逐,partition重新reblance,所有消费者逐个自我驱逐。

这里核心涉及到kafka的消费者和kafka之间的保活机制,可以简单了解一下。

640.png

kafka-client会有一个独立线程HeartbeatThread跟kafka集群进行定时心跳,这个线程跟lisenter无关,完全独立。

根据debug日志显示的“Sending LeaveGroup request”信息,我们可以很容易定位到自我驱逐的逻辑。

640.png

HeartbeatThread线程在发送心跳前,会比较一下当前时间跟上次poll时间,一旦大于max.poll.interval.ms 参数,就会发起自我驱逐了。


4、进一步思考


虽然最后原因找到了,但是回顾下整个排查过程,其实并不顺利,主要有两点:

  • kafka-client对某个消息消费超时能否有明确异常?而不是只看到自我驱逐和rebalance
  • 有没有办法通过什么手段发现 消费死循环?


4.1 kafka-client对某个消息消费超时能否有明确异常?


4.1.1 kafka似乎没有类似机制


我们对消费逻辑进行断点,可以很容易看到整个调用链路。

640.png

对消费者来说,主要采用一个线程池来处理每个kafkaListener,一个listener就是一个独立线程。

这个线程会同步处理 poll消息,然后动态代理回调用户自定义的消息消费逻辑,也就是我们在@KafkaListener中写的业务。

640.png

所以,从这里可以知道两件事情。

第一点,如果业务消费逻辑很慢或者卡住了,会影响poll。

第二点,这里没有看到直接设置消费超时的参数,其实也不太好做。

因为这里做了超时中断,那么poll也会被中断,是在同一个线程中。所以要么poll和消费逻辑在两个工作线程,要么中断掉当前线程后,重新起一个线程poll。

所以从业务使用角度来说,可能的实现,还是自己设置业务超时。比较通用的实现,可以是在消费逻辑中,用线程池处理消费逻辑,同时用Future get阻塞超时中断。

google了一下,发现kafka 0.8 曾经有consumer.timeout.ms这个参数,但是现在的版本没有这个参数了,不知道是不是类似的作用。


4.1.2 RocketMQ有点相关机制


然后去看了下RocketMQ是否有相关实现,果然有发现。

在RocketMQ中,可以对consumer设置consumeTimeout,这个超时就跟我们的设想有一点像了。

consumer会启动一个异步线程池对正在消费的消息做定时做 cleanExpiredMsg() 处理。

640.png

注意,如果消息类型是顺序消费(orderly),这个机制就不生效。

如果是并发消费,那么就会进行超时判断,如果超时了,就会将这条消息的信息通过sendMessageBack() 方法发回给broker进行重试。

640.png

如果消息重试超过一定次数,就会进入RocketMQ的死信队列。

spring-kafka其实也有做类似的封装,可以自定义一个死信topic,做异常处理


4.2 有办法快速发现死循环吗?


一般来说,死循环的线程会导致CPU飙高、OOM等现象,在本次故障中,并没有相关异常表现,所以并没有联系到死循环的问题。

那通过这次故障后,对kafka相关机制有了更深刻了解,poll间隔超时很有可能就是消费阻塞甚至死循环导致。

所以,如果下次出现类似问题,消费者停止消费,但是kafkaListener线程还在,可以直接通过arthas的 thread id 命令查看对应线程的调用栈,看看是否有异常方法死循环调用。


5、最佳实践


通过此次故障,我们也可以总结几点kafka使用的最佳实践:

  • 使用消息队列进行消费时,一定需要多考虑异常情况,包括幂等、耗时处理(甚至死循环)的情况。
  • 尽量提高客户端的消费速度,消费逻辑另起线程进行处理,并最好做超时控制。
  • 减少Group订阅Topic的数量,一个Group订阅的Topic最好不要超过5个,建议一个Group只订阅一个Topic。
  • 参考以下说明调整参数值:max.poll.records:降低该参数值,建议远远小于<单个线程每秒消费的条数> * <消费线程的个数> * <max.poll.interval.ms>的积。max.poll.interval.ms: 该值要大于<max.poll.records> / (<单个线程每秒消费的条数> * <消费线程的个数>)的值。


往期热门笔记合集推荐:

  • HBase原理与实战笔记合集
  • MySQL实战笔记合集
  • Canal/Otter源码与实战笔记合集
  • Java实战技巧笔记合集
目录
相关文章
|
8月前
|
消息中间件 Kafka API
kafka Consumer high-level api 之白名单
kafka Consumer high-level api 之白名单
|
2月前
|
消息中间件 Kafka
使用kafka consumer加载数据加载异常并且报source table and destination table are not same错误解决办法
使用kafka consumer加载数据加载异常并且报source table and destination table are not same错误解决办法
|
3月前
|
消息中间件 存储 分布式计算
大数据-53 Kafka 基本架构核心概念 Producer Consumer Broker Topic Partition Offset 基础概念了解
大数据-53 Kafka 基本架构核心概念 Producer Consumer Broker Topic Partition Offset 基础概念了解
96 4
|
4月前
|
消息中间件 安全 大数据
Kafka多线程Consumer是实现高并发数据处理的有效手段之一
【9月更文挑战第2天】Kafka多线程Consumer是实现高并发数据处理的有效手段之一
394 4
|
5月前
|
消息中间件 大数据 Kafka
Kafka消息封装揭秘:从Producer到Consumer,一文掌握高效传输的秘诀!
【8月更文挑战第24天】在分布式消息队列领域,Apache Kafka因其实现的高吞吐量、良好的可扩展性和数据持久性备受开发者青睐。Kafka中的消息以Record形式存在,包括固定的头部与可变长度的消息体。生产者(Producer)将消息封装为`ProducerRecord`对象后发送;消费者(Consumer)则从Broker拉取并解析为`ConsumerRecord`。消息格式简化示意如下:消息头 + 键长度 + 键 + 值长度 + 值。键和值均为字节数组,需使用特定的序列化/反序列化器。理解Kafka的消息封装机制对于实现高效、可靠的数据传输至关重要。
164 4
|
5月前
|
消息中间件 监控 Java
【Kafka节点存活大揭秘】如何让Kafka集群时刻保持“心跳”?探索Broker、Producer和Consumer的生死关头!
【8月更文挑战第24天】在分布式系统如Apache Kafka中,确保节点的健康运行至关重要。Kafka通过Broker、Producer及Consumer间的交互实现这一目标。文章介绍Kafka如何监测节点活性,包括心跳机制、会话超时与故障转移策略。示例Java代码展示了Producer如何通过定期发送心跳维持与Broker的连接。合理配置这些机制能有效保障Kafka集群的稳定与高效运行。
123 2
|
5月前
|
消息中间件 Java Kafka
"Kafka快速上手:从环境搭建到Java Producer与Consumer实战,轻松掌握分布式流处理平台"
【8月更文挑战第10天】Apache Kafka作为分布式流处理平台的领头羊,凭借其高吞吐量、可扩展性和容错性,在大数据处理、实时日志收集及消息队列领域表现卓越。初学者需掌握Kafka基本概念与操作。Kafka的核心组件包括Producer(生产者)、Broker(服务器)和Consumer(消费者)。Producer发送消息到Topic,Broker负责存储与转发,Consumer则读取这些消息。首先确保已安装Java和Kafka,并启动服务。接着可通过命令行创建Topic,并使用提供的Java API实现Producer发送消息和Consumer读取消息的功能。
100 8
|
5月前
|
消息中间件 Java 大数据
"深入理解Kafka单线程Consumer:核心参数配置、Java实现与实战指南"
【8月更文挑战第10天】在大数据领域,Apache Kafka以高吞吐和可扩展性成为主流数据流处理平台。Kafka的单线程Consumer因其实现简单且易于管理而在多种场景中受到欢迎。本文解析单线程Consumer的工作机制,强调其在错误处理和状态管理方面的优势,并通过详细参数说明及示例代码展示如何有效地使用KafkaConsumer类。了解这些内容将帮助开发者优化实时数据处理系统的性能与可靠性。
119 7
|
5月前
|
消息中间件 安全 Kafka
"深入实践Kafka多线程Consumer:案例分析、实现方式、优缺点及高效数据处理策略"
【8月更文挑战第10天】Apache Kafka是一款高性能的分布式流处理平台,以高吞吐量和可扩展性著称。为提升数据处理效率,常采用多线程消费Kafka数据。本文通过电商订单系统的案例,探讨了多线程Consumer的实现方法及其利弊,并提供示例代码。案例展示了如何通过并行处理加快订单数据的处理速度,确保数据正确性和顺序性的同时最大化资源利用。多线程Consumer有两种主要模式:每线程一个实例和单实例多worker线程。前者简单易行但资源消耗较大;后者虽能解耦消息获取与处理,却增加了系统复杂度。通过合理设计,多线程Consumer能够有效支持高并发数据处理需求。
214 4