【Alibaba中间件技术系列】「RocketMQ技术专题」分析消息队列中的消费失败重试机制的原理和实践

简介: 【Alibaba中间件技术系列】「RocketMQ技术专题」分析消息队列中的消费失败重试机制的原理和实践

RocketMQ消费失败重试机制分析


今天我们分析一下RocketMQ消费重试机制,如果执行以下的RocketMQ的消费服务的代码

try {
            try {
                if (messageExtWrappers.size() > 0) {
                    try {
                        var22 = messageExtWrappers.iterator();
                        while(var22.hasNext()) {
                            messageExt = (MessageExt)var22.next();
                        }
                    } catch (Throwable var16) {
                        ;
                    }
                    this.consume(messageExtWrappers, context);
                }
                LOGGER.info("MQ_CON_SUCCESS {} BROKER {} QUEUE {}", new Object[]{topic, broker, queueId});
                ConsumeConcurrentlyStatus var23 = ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                return var23;
            } catch (MessageListenerConcurrentlyException var17) {
                LOGGER.error("MQ_CON_EX {} BROKER {} QUEUE {}", new Object[]{topic, broker, queueId, var17});
                throw var17;
            } catch (Throwable var18) {
                LOGGER.error("MQ_CON_EX {} BROKER {} QUEUE {}", new Object[]{topic, broker, queueId, var18});
                LOGGER.info("MQ_CON_RECONSUME {} BROKER {} QUEUE {}", new Object[]{topic, broker, queueId});
                if (CollectionUtils.isNotEmpty(msgs) && ((MessageExt)msgs.get(0)).getDelayTimeLevel() >= 2 + this.retryTimes) {
                    context.setDelayLevelWhenNextConsume(-1);
                }
            }
复制代码



当进行消费数据后得出的结果日志如下:


第一次消费

MQ_CON_MSG topic MSG MessageExt [queueId=1, storeSize=453, queueOffset=25, sysFlag=0, bornTimestamp=1566785215908, bornHost=/10.42.0.77:54608, storeTimestamp=1566785215908, storeHost=/10.42.0.244:10911, msgId=0A2A00F400002A9F000000000B77CE84, commitLogOffset=192401028, bodyCRC=53737244, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='gmcf-lsc-zhongbang-repu-calc-from-topic', flag=0, properties={MIN_OFFSET=0, _catChildMessageId1=gmcf-lsc-job-0a2a004d-435218-15071, HASH_CODE=690132963, MAX_OFFSET=26, _catRootMessageId=lts-0a2a006b-434483-0, CONSUME_START_TIME=1566785215911, SERIALIZE_CLASS=java.lang.String, _catParentMessageId=lts-0a2a006b-435218-524, _catChildMessageId=gmcf-lsc-job-0a2a004d-435218-15072, UNIQ_KEY=0A2A004D000938AF386882EAA5A40112, WAIT=true}, body=[71, 77, 66, 69, 67, 49, 57, 48, 56, 50, 52, 49, 48, 52, 48, 52, 54, 56, 57, 52, 48, 52, 48, 56, 48], transactionId='null’}]



第一次retry(reconsumeTimes=1,DELAY=3)


MQ_CON_MSG %RETRY%consumerGroup MSG MessageExt [queueId=0, storeSize=608, queueOffset=1187, sysFlag=0, bornTimestamp=1566785215923, bornHost=/10.42.0.77:54608, storeTimestamp=1566785226241, storeHost=/10.42.0.244:10911, msgId=0A2A00F400002A9F000000000B785900, commitLogOffset=192436480, bodyCRC=893293938, reconsumeTimes=1, preparedTransactionOffset=0, toString()=Message{topic='gmcf-lsc-zhongbang-repu-calc-from-topic', flag=0, properties={_catChildMessageId1=gmcf-lsc-job-0a2a004d-435218-15075, _catRootMessageId=lts-0a2a006b-434483-0, CONSUME_START_TIME=1566785226242, SERIALIZE_CLASS=java.lang.String, _catParentMessageId=lts-0a2a006b-435218-524, _catChildMessageId=gmcf-lsc-job-0a2a004d-435218-15076, MIN_OFFSET=0, REAL_TOPIC=%RETRY%gmcf-lsc-consumerGroup, ORIGIN_MESSAGE_ID=0A2A00F400002A9F000000000B77D049, RETRY_TOPIC=gmcf-lsc-zhongbang-repu-calc-from-topic, HASH_CODE=1506102461, MAX_OFFSET=1188, UNIQ_KEY=0A2A004D000938AF386882EAA5B30113, WAIT=false, DELAY=3, REAL_QID=0}, body=[71, 77, 66, 69, 67, 49, 57, 48, 56, 50, 52, 49, 48, 52, 49, 49, 49, 50, 48, 55, 48, 52, 48, 56, 49], transactionId='null'}]



第二次retry(reconsumeTimes=2, DELAY=4)


MQ_CON_MSG %RETRY%consumerGroup MSG MessageExt [queueId=0, storeSize=608, queueOffset=1209, sysFlag=0, bornTimestamp=1566785215923, bornHost=/10.42.0.77:54608, storeTimestamp=1566785256680, storeHost=/10.42.0.244:10911, msgId=0A2A00F400002A9F000000000B791399, commitLogOffset=192484249, bodyCRC=893293938, reconsumeTimes=2, preparedTransactionOffset=0, toString()=Message{topic='gmcf-lsc-zhongbang-repu-calc-from-topic', flag=0, properties={_catChildMessageId1=gmcf-lsc-job-0a2a004d-435218-15075, _catRootMessageId=lts-0a2a006b-434483-0, CONSUME_START_TIME=1566785256728, SERIALIZE_CLASS=java.lang.String, _catParentMessageId=lts-0a2a006b-435218-524, _catChildMessageId=gmcf-lsc-job-0a2a004d-435218-15076, MIN_OFFSET=0, REAL_TOPIC=%RETRY%gmcf-lsc-consumerGroup, ORIGIN_MESSAGE_ID=0A2A00F400002A9F000000000B77D049, RETRY_TOPIC=gmcf-lsc-zhongbang-repu-calc-from-topic, HASH_CODE=1506102461, MAX_OFFSET=1210, UNIQ_KEY=0A2A004D000938AF386882EAA5B30113, WAIT=false, DELAY=4, REAL_QID=0}, body=[71, 77, 66, 69, 67, 49, 57, 48, 56, 50, 52, 49, 48, 52, 49, 49, 49, 50, 48, 55, 48, 52, 48, 56, 49], transactionId='null'}]




第三次retry(reconsumeTimes=3, DELAY=5)


MQ_CON_MSG %RETRY%consumerGroup MSG MessageExt [queueId=0, storeSize=608, queueOffset=1228, sysFlag=0, bornTimestamp=1566785215923, b


RocketMQ的默认的配置是messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,分别代表延迟level1-level18,为什么不是从1开始呢?

if (CollectionUtils.isNotEmpty(msgs) && ((MessageExt)msgs.get(0)).getDelayTimeLevel() >= 2 + this.retryTimes) {
        context.setDelayLevelWhenNextConsume(-1);
}
复制代码


当重试达到满足条件的时候,不再重试,直接放到dlq队列里面。如果不控制的,会一直重试到最高DelayLevel 18。


从DefaultMQPullConsumerImpl类里面找到一段代码

public void sendMessageBack(MessageExt msg, int delayLevel, String brokerName, String consumerGroup) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
        try {
            String brokerAddr = null != brokerName ? this.mQClientFactory.findBrokerAddressInPublish(brokerName) : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
            if (UtilAll.isBlank(consumerGroup)) {
                consumerGroup = this.defaultMQPullConsumer.getConsumerGroup();
            }
            this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg, consumerGroup, delayLevel, 3000L, this.defaultMQPullConsumer.getMaxReconsumeTimes());
        } catch (Exception var8) {
            this.log.error("sendMessageBack Exception, " + this.defaultMQPullConsumer.getConsumerGroup(), var8);
            Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPullConsumer.getConsumerGroup()), msg.getBody());
            String originMsgId = MessageAccessor.getOriginMessageId(msg);
            MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);
            newMsg.setFlag(msg.getFlag());
            MessageAccessor.setProperties(newMsg, msg.getProperties());
            MessageAccessor.putProperty(newMsg, "RETRY_TOPIC", msg.getTopic());
            MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes() + 1));
            MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(this.defaultMQPullConsumer.getMaxReconsumeTimes()));
            newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());
            this.mQClientFactory.getDefaultMQProducer().send(newMsg);
        }
    }
复制代码

从代码中看到DelayTimeLevel =3+reconsumeTime

newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());



所以默认重试时,实际是从3开始的,从时间的角度,也验证为什么会重试4次,而且每次间隔的时间是10s/30s/1m .


  • 此外发送时,延时等级设置的是9,那么5m之后消费者才能从broker取到该消息消费。消费失败或者异常时,消费者返回给broker,ConsumeConcurrentlyStatus.RECONSUME_LATER,所以broker会重发该消息,由于消费者设置了再次消费的延时级别为5,所以1m之后,消费者才能从broker取出该重发消息。由于consumer没有设置重试次数,所以,broker默认重发该消息16次,然后放入删除队列。


  • DefaultRocketMQListenerContainer实现了MessageListenerConcurrently方法,它会循环调用rocketMQListener.onMessage,出现异常会设置delayLevelWhenNextConsume,然后立即返回ConsumeConcurrentlyStatus.RECONSUME_LATER




相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
消息中间件 存储 缓存
RocketMQ原理—4.消息读写的性能优化
本文详细解析了RocketMQ消息队列的核心原理与性能优化机制,涵盖Producer消息分发、Broker高并发写入、Consumer拉取消息流程等内容。重点探讨了基于队列的消息分发、Hash有序分发、CommitLog内存写入优化、ConsumeQueue物理存储设计等关键技术点。同时分析了数据丢失场景及解决方案,如同步刷盘与JVM OffHeap缓存分离策略,并总结了写入与读取流程的性能优化方法,为理解和优化分布式消息系统提供了全面指导。
RocketMQ原理—4.消息读写的性能优化
|
7月前
|
数据采集 传感器 监控
Modbus 与 MQTT 协议兼容:MyEMS 的泛在能源数据采集技术实现
MyEMS深度融合Modbus与MQTT协议,破解能源数据采集中协议碎片化、网络异构、数据孤岛等难题。通过Modbus接入95%以上工业设备,实现现场数据精准“拉取”;依托MQTT构建高效物联网传输通道,支持多源数据主动“推送”与云端集成。边缘侧采集规整,中心侧汇聚分析,形成统一、可靠、低延迟的数据流。该架构兼具高兼容性、强扩展性与低运维成本,广泛应用于工业园区、商业楼宇及集团型企业,支撑实时监控、AI分析与跨系统融合,打造泛在互联的能源数据底座,助力企业实现全面智慧能源管理。
498 6
|
消息中间件 存储 Java
RocketMQ(一):消息中间件缘起,一览整体架构及核心组件
【10月更文挑战第15天】本文介绍了消息中间件的基本概念和特点,重点解析了RocketMQ的整体架构和核心组件。消息中间件如RocketMQ、RabbitMQ、Kafka等,具备异步通信、持久化、削峰填谷、系统解耦等特点,适用于分布式系统。RocketMQ的架构包括NameServer、Broker、Producer、Consumer等组件,通过这些组件实现消息的生产、存储和消费。文章还提供了Spring Boot快速上手RocketMQ的示例代码,帮助读者快速入门。
|
消息中间件 存储 设计模式
RocketMQ原理—5.高可用+高并发+高性能架构
本文主要从高可用架构、高并发架构、高性能架构三个方面来介绍RocketMQ的原理。
3420 21
RocketMQ原理—5.高可用+高并发+高性能架构
|
存储 消息中间件 缓存
RocketMQ原理—3.源码设计简单分析下
本文介绍了Producer作为生产者是如何创建出来的、启动时是如何准备好相关资源的、如何从拉取Topic元数据的、如何选择MessageQueue的、与Broker是如何进行网络通信的,Broker收到一条消息后是如何存储的、如何实时更新索引文件的、如何实现同步刷盘以及异步刷盘的、如何清理存储较久的磁盘数据的,Consumer作为消费者是如何创建和启动的、消费者组的多个Consumer会如何分配消息、Consumer会如何从Broker拉取一批消息。
526 11
RocketMQ原理—3.源码设计简单分析下
|
存储 消息中间件 网络协议
RocketMQ原理—1.RocketMQ整体运行原理
本文详细解析了RocketMQ的整体运行原理,涵盖从生产者到消费者的全流程。首先介绍生产者发送消息的机制,包括Topic与MessageQueue的关系及写入策略;接着分析Broker如何通过CommitLog和ConsumeQueue实现消息持久化,并探讨同步与异步刷盘的优缺点。同时,讲解基于DLedger技术的主从同步原理,确保高可用性。消费者部分则重点讨论消费模式(集群 vs 广播)、拉取消息策略及负载均衡机制。网络通信层面,基于Netty的高性能架构通过多线程池分工协作提升并发能力。最后,揭示mmap与PageCache技术优化文件读写的细节,总结了RocketMQ的核心运行机制。
RocketMQ原理—1.RocketMQ整体运行原理
|
消息中间件 Java 数据管理
RocketMQ原理—2.源码设计简单分析上
本文介绍了NameServer的启动脚本、启动时会解析哪些配置、如何初始化Netty网络服务器、如何启动Netty网络服务器,介绍了Broker启动时是如何初始化配置的、BrokerController的创建以及包含的组件、BrokerController的初始化、启动、Broker如何把自己注册到NameServer上、BrokerOuterAPI是如何发送注册请求的,介绍了NameServer如何处理Broker的注册请求、Broker如何发送定时心跳
|
SQL 大数据 数据库
RocketMQ实战—1.订单系统面临的技术挑战
本文详细分析了一个订单系统的设计与技术挑战。首先,介绍了订单系统的整体架构、业务流程及负载情况,包括电商购物流程、核心和非核心业务流程,以及真实生产中的负载压力。接着,探讨了系统面临的主要技术问题:支付后发券、发红包等操作导致性能下降;退款流程复杂且易失败;与第三方系统耦合带来的不稳定;大数据团队直接查询数据库影响性能;秒杀活动时数据库压力剧增等。最后,通过放大100倍压力的方法,梳理了高并发下的技术挑战,如核心链路优化、后台线程补偿机制、第三方系统解耦、数据获取方式改进等,为订单系统的优化提供了全面的参考。
RocketMQ实战—1.订单系统面临的技术挑战
|
消息中间件 存储 Kafka
RocketMQ 工作原理图解,看这篇就够了!
本文详细解析了 RocketMQ 的核心架构、消息领域模型、关键特性和应用场景,帮助深入理解消息中间件的工作原理。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
RocketMQ 工作原理图解,看这篇就够了!
|
消息中间件 存储 Java
吃透 RocketMQ 消息中间件,看这篇就够了!
本文详细介绍 RocketMQ 的五大要点、核心特性及应用场景,涵盖高并发业务场景下的消息中间件关键知识点。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
吃透 RocketMQ 消息中间件,看这篇就够了!

热门文章

最新文章

相关产品

  • 云消息队列 MQ