RocketMq重复消费问题排查

简介: RocketMq重复消费问题排查

前情

出现了重复消费的问题,同一个消息被重复消费了多次,导致了用户端收到了多条重复的消息,最终排查发现,是因为消费者在处理消息的方法onMessage中有异常没有捕获到,导致异常上抛,被consumeMessage捕获并判定为消费失败,从而放到了重试队列当中进行重试,下面我们就来看看RocketMq中会引起消息重试的两种情况,内部异常和消费超时。

源码

在Consumer中处理消息时,会在消费完消息后判断消费的总时长,如果比超时时间要长则返回TIME_OUT,注意这里的超时是在consumeMessage内部逻辑处理完毕之后在进行判断的,如果内部逻辑处理成功,但耗时较长,那么也会被判断为超时。

在DefaultMQPushConsumer.java中定义了消费的超时时间为15分钟。

consumeMessage方法中会有两种返回状态,正常的状态消费成功CONSUME_SUCCESS和出现异常时的重试状态RECONSUME_LATER。

如果消费时长超过超时时间那么即便consumeMessage方法处理成功,返回状态也是TIME_OUT。

代码

class ConsumeRequest implements Runnable {
        private final List<MessageExt> msgs;
        private final ProcessQueue processQueue;
        private final MessageQueue messageQueue;
        public ConsumeRequest(List<MessageExt> msgs, ProcessQueue processQueue, MessageQueue messageQueue) {
            this.msgs = msgs;
            this.processQueue = processQueue;
            this.messageQueue = messageQueue;
        }
        public List<MessageExt> getMsgs() {
            return msgs;
        }
        public ProcessQueue getProcessQueue() {
            return processQueue;
        }
        @Override
        public void run() {
            if (this.processQueue.isDropped()) {
                log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
                return;
            }
            MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
            ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
            ConsumeConcurrentlyStatus status = null;
            ConsumeMessageContext consumeMessageContext = null;
            if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                consumeMessageContext = new ConsumeMessageContext();
                consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());
                consumeMessageContext.setProps(new HashMap<String, String>());
                consumeMessageContext.setMq(messageQueue);
                consumeMessageContext.setMsgList(msgs);
                consumeMessageContext.setSuccess(false);
                ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
            }
            long beginTimestamp = System.currentTimeMillis();
            boolean hasException = false;
            ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
            try {
                ConsumeMessageConcurrentlyService.this.resetRetryTopic(msgs);
                if (msgs != null && !msgs.isEmpty()) {
                    for (MessageExt msg : msgs) {
                        MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
                    }
                }
                status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
            } catch (Throwable e) {
                log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
                    RemotingHelper.exceptionSimpleDesc(e),
                    ConsumeMessageConcurrentlyService.this.consumerGroup,
                    msgs,
                    messageQueue);
                hasException = true;
            }
            long consumeRT = System.currentTimeMillis() - beginTimestamp;
            if (null == status) {
                if (hasException) {
                    returnType = ConsumeReturnType.EXCEPTION;
                } else {
                    returnType = ConsumeReturnType.RETURNNULL;
                }
            } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
                returnType = ConsumeReturnType.TIME_OUT;
            } else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) {
                returnType = ConsumeReturnType.FAILED;
            } else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) {
                returnType = ConsumeReturnType.SUCCESS;
            }
            if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
            }
            if (null == status) {
                log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",
                    ConsumeMessageConcurrentlyService.this.consumerGroup,
                    msgs,
                    messageQueue);
                status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
            if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                consumeMessageContext.setStatus(status.toString());
                consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);
                ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
            }
            ConsumeMessageConcurrentlyService.this.getConsumerStatsManager()
                .incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
            if (!processQueue.isDropped()) {
                ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
            } else {
                log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
            }
        }
        public MessageQueue getMessageQueue() {
            return messageQueue;
        }
    }
相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
消息中间件 RocketMQ 存储
rocketMq - 并发消费过程
rocketMq消费过程包括两种,分别是并发消费和有序消费,每个消费方式都可以单独拿出来进行分享,这篇文章单独用来分析并发消费问题。 并发消费需要理解的几个核心点:并发消费的消息拉取,并发消费的消息重试,并发消息的ack机制,消费进度的持久化,这篇分享会就这几个问题分解展开。
3428 0
|
9天前
|
消息中间件 人工智能 Java
RocketMQ重复消费的症状以及解决方案
RocketMQ重复消费的症状以及解决方案
|
4月前
|
消息中间件 存储 编解码
RocketMQ系列 | 全网最全的导致RocketMQ消息“丢失”的几个场景都在这了,肯定有你不知道!
发送时会丢失消息、消息存储场景丢失消息、消费时会丢失消息
145 1
|
8月前
|
消息中间件 弹性计算 运维
消息队列RocketMQ版:消费异常运维排查体验
本实验场景介绍消息队列RocketMQ版的可观测工具功能,通过示例程序模拟生产环境消费业务故障,并通过产品提供的开箱即用的可观测工具定位消费异常。
832 0
消息队列RocketMQ版:消费异常运维排查体验
|
9月前
|
消息中间件 存储 负载均衡
RocketMQ的消费逻辑
最基础的实现逻辑。
128 0
|
9月前
|
消息中间件 弹性计算 Java
RocketMQ-没有消费者的消息堆积场景分析
RocketMQ-没有消费者的消息堆积场景分析
244 1
|
10月前
|
消息中间件 Kafka
Kafka消息的重复消费问题如何解决的 ?
Kafka 通过使用消费者组(Consumer Group)来解决消息的重复消费问题。
1190 0
|
11月前
|
消息中间件 负载均衡 算法
深扒RocketMQ源码之后,我找出了RocketMQ消息重复消费的7种原因
在众多关于MQ的面试八股文中有这么一道题,“如何保证MQ消息消费的幂等性”。 为什么需要保证幂等性呢?是因为消息会重复消费。 为什么消息会重复消费? 明明已经消费了,为什么消息会被再次被消费呢? 不同的MQ产生的原因可能不一样 本文就以RocketMQ为例,来扒一扒RocketMQ中会导致消息重复消息的原因,最终你会发现,其实消息重复消费算是RocketMQ无奈的“bug”。
深扒RocketMQ源码之后,我找出了RocketMQ消息重复消费的7种原因
|
12月前
|
消息中间件 负载均衡 JavaScript
RocketMQ源码中,7种导致重复消费的坑!
RocketMQ源码中,7种导致重复消费的坑!
|
12月前
|
消息中间件 存储 负载均衡
RocketMQ 顺序消费机制
顺序消息是指对于一个指定的 Topic ,消息严格按照先进先出(FIFO)的原则进行消息发布和消费,即先发布的消息先消费,后发布的消息后消费。 顺序消息分为**分区顺序消息**和**全局顺序消息**。
3151 1
RocketMQ 顺序消费机制