RocketMQ消费者没有成功消费消息的问题排查

简介: RocketMQ消费者没有成功消费消息的问题排查

背景

今天下游同事反馈,有一些以取消的订单库存还原异常了,导致部分商品库存没有还原。查日志发现没有收到还原消息,但是查看发送方是可以确认消息是已经发了的,那么是什么原因导致消费者没有收到,或者收到后没有处理消息呢。最后发现这些消息的状态都是NOT_ONLINE,原因是服务挂了,重启之后便可以重新消费了。让我们看看这个调查过程。

调查

消息丢失如何排查?当我们在使用mq的时候,经常会遇到消息消费异常的问题,原因有很多种,比如:

  • producer发送失败
  • consumer消费异常
  • consumer根本就没收到消息

「那么我们该如何排查了?」

其实借助RocketMQ-Dashboard就能高效的排查,里面有很多你想象不到的功能。

首先我们先查找期望消费的消息,查找的方式有很多种,根据消息id,时间等。

「消息没找到?」

说明proder发送异常,也有可能是消息过期了,因为rocketmq的消息默认保存72h,此时到producer端的日志进一步确认即可。

「消息找到了!」

接着看消息的消费状态,如下图消息的消费状态为NOT_ONLINE。

「NOT_ONLINE代表什么含义呢?」

别着急,我们一步步来分析,先看看TrackType到底有多少种状态。

1

2

3

4

5

6

7

8

publicenumTrackType {

    CONSUMED,

    CONSUMED_BUT_FILTERED,

    PULL,

    NOT_CONSUME_YET,

    NOT_ONLINE,

    UNKNOWN

}

每种类型的解释如下:

类型

解释

CONSUMED

消息已经被消费

CONSUMED_BUT_FILTERED

消息已经投递但被过滤

PULL

消息消费的方式是拉模式

NOT_CONSUME_YET

目前没有被消费

NOT_ONLINE

CONSUMER不在线

UNKNOWN

未知错误

「怎么判定消息已经被消费?」

上一节我们讲到,broker会用一个map来保存每个queue的消费进度,「如果queue的offset大于被查询消息的offset则消息被消费,否则没有被消费」(NOT_CONSUME_YET)。

我们在RocketMQ-Dashboard上其实就能看到每个队列broker端的offset(代理者位点)以及消息消费的offset(消费者位点),差值就是没有被消费的消息。

当消息都被消费时,差值为0,如下图所示:

「CONSUMED_BUT_FILTERED表示消息已经投递,但是已经被过滤掉了」。例如producer发的是topicA,tagA,但是consumer订阅的却是topicA,tagB。

「CONSUMED_BUT_FILTERED(消息已经被投递但被过滤)是怎么发生的呢?」

这个就不得不提到RocketMQ中的一个概念,「消息消费要满足订阅关系一致性,即一个consumerGroup中的所有消费者订阅的topic和tag必须保持一致,不然就会造成消息丢失」。

如下图场景,发送了4条消息,consumer1订阅了topica-taga,而consumer2订阅了topica-tab。consumer1消费q0中的数据,consumer2消费q1中的数据。

投递到q0的msg-1和msg-3只有msg-1能被正常消费,而msg-3则是CONSUMED_BUT_FILTERED。因为msg-3被投递到q0,但是consumer1不消费tagb的消息导致消息被过滤,造成消息丢失。

同理msg-2这条消息也会丢失。

「注意,还有一个非常重要的点」!

虽然消息消费失败了,但是消息的offset还会正常提交,即 「消息消费失败了,但是状态也会是CONSUMED」。

「RocketMQ认为消息消费失败需要重试的场景有哪些?」

  • 返回ConsumeConcurrentlyStatus.RECONSUME_LATER
  • 返回null
  • 主动或被动抛出异常

「那么消费失败的消息去哪了呢?」

当消息消费失败,会被放到重试队列中,Topic名字为%RETRY% + consumerGroup。

「Consumer没订阅这个topic啊,怎么才能消费到重试消息?」

其实在Consumer启动的时候,框架内部帮你订阅了这个topic,所以重试消息能被消费到。

「另外消息不是一直重试,而是每隔1段时间进行重试」

 

第几次重试

与上次重试的间隔时间

第几次重试

与上次重试的间隔时间

1

10 秒

9

7 分钟

2

30 秒

10

8 分钟

3

1 分钟

11

9 分钟

4

2 分钟

12

10 分钟

5

3 分钟

13

20 分钟

6

4 分钟

14

30 分钟

7

5 分钟

15

1 小时

8

6 分钟

16

2 小时

 

当消息超过最大消费次数16次,会将消息投递到死信队列中,死信队列的topic名为%DLQ% + consumerGroup。

「因此当你发现消息状态为CONSUMED,但是消费失败时,去重试队列和死信队列中找就行了」。

 

本篇文章如有帮助到您,请给「翎野君」点个赞,感谢您的支持。


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
5月前
|
消息中间件 Java 调度
消息队列 MQ使用问题之消费者自动掉线是什么导致的
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
4月前
|
消息中间件 存储 负载均衡
我服了,RocketMQ消费者负载均衡内核是这样设计的
文章为理解RocketMQ的负载均衡机制提供了深入的技术洞察,并对如何在实际应用中扩展和定制负载均衡策略提供了有价值的见解。
我服了,RocketMQ消费者负载均衡内核是这样设计的
|
4月前
|
消息中间件 存储 负载均衡
RocketMQ消费者消费消息核心原理(含长轮询机制)
这篇文章深入探讨了Apache RocketMQ消息队列中消费者消费消息的核心原理,特别是长轮询机制。文章从消费者和Broker的交互流程出发,详细分析了Push和Pull两种消费模式的内部实现,以及它们是如何通过长轮询机制来优化消息消费的效率。文章还对RocketMQ的消费者启动流程、消息拉取请求的发起、Broker端处理消息拉取请求的流程进行了深入的源码分析,并总结了RocketMQ在设计上的优点,如单一职责化和线程池的使用等。
RocketMQ消费者消费消息核心原理(含长轮询机制)
|
4月前
|
消息中间件 缓存 Java
RocketMQ - 消费者消费方式
RocketMQ - 消费者消费方式
115 0
|
4月前
|
消息中间件 RocketMQ
RocketMQ - 消费者进度保存机制
RocketMQ - 消费者进度保存机制
83 0
|
4月前
|
消息中间件 RocketMQ
RocketMQ - 消费者Rebalance机制
RocketMQ - 消费者Rebalance机制
68 0
|
4月前
|
消息中间件 存储 缓存
RocketMQ - 消费者启动机制
RocketMQ - 消费者启动机制
59 0
|
4月前
|
消息中间件 存储 缓存
RocketMQ - 消费者概述
RocketMQ - 消费者概述
75 0
|
5月前
|
消息中间件 负载均衡 算法
【RocketMQ系列十二】RocketMQ集群核心概念之主从复制&生产者负载均衡策略&消费者负载均衡策略
【RocketMQ系列十二】RocketMQ集群核心概念之主从复制&生产者负载均衡策略&消费者负载均衡策略
151 2
|
5月前
|
消息中间件 负载均衡 Apache
【RocketMQ系列七】消费者和生产者的实现细节
【RocketMQ系列七】消费者和生产者的实现细节
149 1
下一篇
DataWorks