RocketMq重复消费问题排查

简介: RocketMq重复消费问题排查

前情

出现了重复消费的问题,同一个消息被重复消费了多次,导致了用户端收到了多条重复的消息,最终排查发现,是因为消费者在处理消息的方法onMessage中有异常没有捕获到,导致异常上抛,被consumeMessage捕获并判定为消费失败,从而放到了重试队列当中进行重试,下面我们就来看看RocketMq中会引起消息重试的两种情况,内部异常和消费超时。

源码

在Consumer中处理消息时,会在消费完消息后判断消费的总时长,如果比超时时间要长则返回TIME_OUT,注意这里的超时是在consumeMessage内部逻辑处理完毕之后在进行判断的,如果内部逻辑处理成功,但耗时较长,那么也会被判断为超时。

在DefaultMQPushConsumer.java中定义了消费的超时时间为15分钟。

consumeMessage方法中会有两种返回状态,正常的状态消费成功CONSUME_SUCCESS和出现异常时的重试状态RECONSUME_LATER。

如果消费时长超过超时时间那么即便consumeMessage方法处理成功,返回状态也是TIME_OUT。

代码

class ConsumeRequest implements Runnable {
        private final List<MessageExt> msgs;
        private final ProcessQueue processQueue;
        private final MessageQueue messageQueue;
        public ConsumeRequest(List<MessageExt> msgs, ProcessQueue processQueue, MessageQueue messageQueue) {
            this.msgs = msgs;
            this.processQueue = processQueue;
            this.messageQueue = messageQueue;
        }
        public List<MessageExt> getMsgs() {
            return msgs;
        }
        public ProcessQueue getProcessQueue() {
            return processQueue;
        }
        @Override
        public void run() {
            if (this.processQueue.isDropped()) {
                log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
                return;
            }
            MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
            ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
            ConsumeConcurrentlyStatus status = null;
            ConsumeMessageContext consumeMessageContext = null;
            if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                consumeMessageContext = new ConsumeMessageContext();
                consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());
                consumeMessageContext.setProps(new HashMap<String, String>());
                consumeMessageContext.setMq(messageQueue);
                consumeMessageContext.setMsgList(msgs);
                consumeMessageContext.setSuccess(false);
                ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
            }
            long beginTimestamp = System.currentTimeMillis();
            boolean hasException = false;
            ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
            try {
                ConsumeMessageConcurrentlyService.this.resetRetryTopic(msgs);
                if (msgs != null && !msgs.isEmpty()) {
                    for (MessageExt msg : msgs) {
                        MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
                    }
                }
                status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
            } catch (Throwable e) {
                log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
                    RemotingHelper.exceptionSimpleDesc(e),
                    ConsumeMessageConcurrentlyService.this.consumerGroup,
                    msgs,
                    messageQueue);
                hasException = true;
            }
            long consumeRT = System.currentTimeMillis() - beginTimestamp;
            if (null == status) {
                if (hasException) {
                    returnType = ConsumeReturnType.EXCEPTION;
                } else {
                    returnType = ConsumeReturnType.RETURNNULL;
                }
            } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
                returnType = ConsumeReturnType.TIME_OUT;
            } else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) {
                returnType = ConsumeReturnType.FAILED;
            } else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) {
                returnType = ConsumeReturnType.SUCCESS;
            }
            if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
            }
            if (null == status) {
                log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",
                    ConsumeMessageConcurrentlyService.this.consumerGroup,
                    msgs,
                    messageQueue);
                status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
            if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                consumeMessageContext.setStatus(status.toString());
                consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);
                ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
            }
            ConsumeMessageConcurrentlyService.this.getConsumerStatsManager()
                .incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
            if (!processQueue.isDropped()) {
                ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
            } else {
                log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
            }
        }
        public MessageQueue getMessageQueue() {
            return messageQueue;
        }
    }
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
6月前
|
消息中间件 存储 负载均衡
消息队列 MQ产品使用合集之如何排查是哪个队列导致的异常TPS增加
阿里云消息队列MQ(Message Queue)是一种高可用、高性能的消息中间件服务,它允许您在分布式应用的不同组件之间异步传递消息,从而实现系统解耦、流量削峰填谷以及提高系统的可扩展性和灵活性。以下是使用阿里云消息队列MQ产品的关键点和最佳实践合集。
|
消息中间件 弹性计算 运维
消息队列RocketMQ版:消费异常运维排查体验
本实验场景介绍消息队列RocketMQ版的可观测工具功能,通过示例程序模拟生产环境消费业务故障,并通过产品提供的开箱即用的可观测工具定位消费异常。
消息队列RocketMQ版:消费异常运维排查体验
|
消息中间件 RocketMQ
RocketMQ消费者没有成功消费消息的问题排查
RocketMQ消费者没有成功消费消息的问题排查
1637 1
|
消息中间件 Java RocketMQ
《Rocket MQ 使用排查指南8-13》电子版地址
发送消息耗时太久?客户端发送常见异常报错?启动发送端连接异常?Java 进程消息堆积严重...这些问题都有答案啦!详细的排查步骤和问题回答帮你快速搞定Rocket MQ使用过程中的各类问题。还有细致的问题原因分析和最佳的问题解决方案。100+常见问题,《Rocket MQ 使用排查指南》一本搞定!还等什么?即刻下载阅读吧!
71 0
《Rocket MQ 使用排查指南8-13》电子版地址
|
消息中间件 Java RocketMQ
《Rocket MQ 使用排查指南8-13》电子版
发送消息耗时太久?客户端发送常见异常报错?启动发送端连接异常?Java 进程消息堆积严重...这些问题都有答案啦!详细的排查步骤和问题回答帮你快速搞定Rocket MQ使用过程中的各类问题。还有细致的问题原因分析和最佳的问题解决方案。100+常见问题,《Rocket MQ 使用排查指南》一本搞定!还等什么?即刻下载阅读吧!
122 0
《Rocket MQ 使用排查指南8-13》电子版
|
消息中间件 Java RocketMQ
《Rocket MQ 使用排查指南8-13》电子版下载
发送消息耗时太久?客户端发送常见异常报错?启动发送端连接异常?Java 进程消息堆积严重...这些问题都有答案啦!详细的排查步骤和问题回答帮你快速搞定Rocket MQ使用过程中的各类问题。还有细致的问题原因分析和最佳的问题解决方案。100+常见问题,《Rocket MQ 使用排查指南》一本搞定!还等什么?即刻下载阅读吧!
87 0
《Rocket MQ 使用排查指南8-13》电子版下载
|
消息中间件 Java RocketMQ
《Rocket MQ 使用排查指南8-13》电子版下载地址
发送消息耗时太久?客户端发送常见异常报错?启动发送端连接异常?Java 进程消息堆积严重...这些问题都有答案啦!详细的排查步骤和问题回答帮你快速搞定Rocket MQ使用过程中的各类问题。还有细致的问题原因分析和最佳的问题解决方案。100+常见问题,《Rocket MQ 使用排查指南》一本搞定!还等什么?即刻下载阅读吧!
77 0
《Rocket MQ 使用排查指南8-13》电子版下载地址
EMQ
|
消息中间件 存储 网络性能优化
MQTT 客户端出现连接订阅等问题时如何排查?
这是一期EMQX社区专题FAQ,我们整理了近期社区中关注度较高的问题,在这里进行统一汇总解答。
EMQ
436 0
|
消息中间件 Java RocketMQ
Rocket MQ 使用排查指南
Rocket MQ 使用排查指南
187 0
|
消息中间件 运维 监控
一次 RocketMQ 进程自动退出排查经验分享(实战篇)
一次 RocketMQ 进程自动退出排查经验分享(实战篇)
一次 RocketMQ 进程自动退出排查经验分享(实战篇)
下一篇
DataWorks