【Alibaba中间件技术系列】「RocketMQ技术专题」帮你梳理RocketMQ相关的消费问题以及原理分析总结

本文涉及的产品
Serverless 应用引擎免费试用套餐包,4320000 CU,有效期3个月
注册配置 MSE Nacos/ZooKeeper,118元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 【Alibaba中间件技术系列】「RocketMQ技术专题」帮你梳理RocketMQ相关的消费问题以及原理分析总结

消息重复消费的问题


消息重复消费是各个MQ都会发生的常见问题之一,在一些比较敏感的场景下,重复消费会造成比较严重的后果,比如重复扣款等。




消息重复消费场景及解决办法


在什么情况下会发生RocketMQ的消息重复消费呢?



生产者重复发送场景


当系统的调用链路比较长的时候,比如,系统A调用系统B,系统B再把消息发送到RocketMQ中,在系统A调用系统B的时候。


如果系统B处理成功,但是迟迟没有将调用成功的结果返回给系统A的时候,系统A就会尝试重新发起请求给系统B,造成系统B重复处理,发起多条消息给RocketMQ造成重复消费。



消费者重复发送场景


在系统B发送消息给RocketMQ的时候,也有可能会发生和上面一样的问题,消息发送超时,结果系统B重试,导致RocketMQ接收到了重复的消息。



消费者重复发送场景


当RocketMQ成功接收到消息,并将消息交给消费者处理,如果消费者消费完成后还没来得及提交offset给RocketMQ,自己宕机或者重启了,那么RocketMQ没有接收到offset,就会认为消费失败了,会重发消息给消费者再次消费。



消费者没有立刻返回成功


重复消费的问题的一个可能的问题:消费者消费消息时产生了异常,并没有返回CONSUME_SUCCESS标志。


因为消息处理异常导致的消息重新消费,RocketMQ可以很好的保持消息,一定要消费成功才可以!

官方对comsumerMessage方法


It is not recommend to throw exception,rather than returning ConsumeConcurrentlyStatus.RECONSUME_LATER if consumption failure
复制代码



无论如何,都不要抛出异常,如果需要重新消费,可以返回RECONSUME_LATER主动要求重新消费。

catch Exception根异常来捕获业务处理的异常:

consumer.registerMessageListener(new MessageListenerConcurrently() {
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                    ConsumeConcurrentlyContext context) {
                    logger.debug(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
                    MessagePack msgpack = new MessagePack();
                    for (MessageExt msg : msgs){
                        byte[] data = msg.getBody();
                        try {
                            RTMsgPack rtmsg = msgpack.read(data, RTMsgPack.class);
                            logger.debug("Receive a message:" + rtmsg);
                            anlysisRTMsgPack(rtmsg, engine);
                        } catch (IOException e) {
                            logger.error("Unpack RTMsg:", e);
                        } catch (Exception e1){
                            logger.warn("Unexcepted exception.", e1);
                        }
                    }
                    logger.debug("RETURN CONSUME SUCCESS.");
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
 });
复制代码




设置CONSUME_FROM_LAST_OFFSET的问题


Consumer在消费时,会设置从哪里开始消费。默认是CONSUME_FROM_LAST_OFFSET,设置的值如代码所示。

public enum ConsumeFromWhere {
    /**
     * 一个新的订阅组第一次启动从队列的最后位置开始消费<br>
     * 后续再启动接着上次消费的进度开始消费
     */
    CONSUME_FROM_LAST_OFFSET,
    @Deprecated
   CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST,
    @Deprecated
    CONSUME_FROM_MIN_OFFSET,
    @Deprecated
    CONSUME_FROM_MAX_OFFSET,
    /**
     * 一个新的订阅组第一次启动从队列的最前位置开始消费<br>
     * 后续再启动接着上次消费的进度开始消费
     */
    CONSUME_FROM_FIRST_OFFSET,
    /**
     * 一个新的订阅组第一次启动从指定时间点开始消费<br>
     * 后续再启动接着上次消费的进度开始消费<br>
     * 时间点设置参见DefaultMQPushConsumer.consumeTimestamp参数
     */
    CONSUME_FROM_TIMESTAMP,
}
复制代码


  • CONSUME_FROM_LAST_OFFSET:从最后的偏移量开始消费,是从该消费者上次消费到的位置开始消费。


  • 如果是一个新的消费者,就要根据这个client所属的消费组的情况来判断。
  • 如果所属的消费者组是新上线的,订阅的消息,最早的消息都没有过,RocketMQ的设计者认为,你这是一个新上线的业务,会强制从第一条消息开始消费。
  • 如果订阅的消息,已经产生了过期消息,那么才会从我们这个client启动的时间点开始消费。



ConsumeFromWhere这个参数只对一个新的消费者第一次启动时有效


  • CONSUME_FROM_FIRST_OFFSET:从最小偏移量开始消费,
  • CONSUME_FROM_TIMESTAMP:从某个时间开始消费。
  • 而判断是不是一个新的ConsumerGroup是在broker端判断。
  • 消费到哪个offset最先是存在Consumer本地的,定时和broker同步自己的消费offset。
  • broker在判断是不是一个新的consumergroup,就是查broker端有没有这个consumergroup的offset记录。



偏移量无效化


对于一个新的queue,这个参数也是没用的,都是从0开始消费。


所以,这就有了一个问题我已经设置了CONSUME_FROM_LAST_OFFSET,为什么还是重复消费了,可能你这不是新的consumergroup,也可能是个新的Queue。



重试队列和死信队列


  • 消费端,一直不回传消费的结果。RocketMQ认为消息没收到,consumer下一次拉取,broker依然会发送该消息。
  • 任何异常都要捕获返回:ConsumeConcurrentlyStatus.RECONSUME_LATER

RocketMQ会放到重试队列,TOPIC是:%RETRY%+COnsumerGroup的名字

  • 重试的消息在延迟的某个时间点(默认是10秒,业务可设置)后,再次投递到这个ConsumerGroup。
  • 而如果一直这样重复消费都持续失败到一定次数(默认16次),就会投递到DLQ死信队列,此时需要人工干预了。
/**
Batch consumption size
*/
private int consumeMessageBatchMaxSize = 1;
/**
Batch pull size
*/
private int pullBatchSize = 32;
复制代码



  • consumeMessageBatchMaxSize 是批量消费的最大条数
  • pullBatchSize 是每次拉取的最大条数



broker端的

private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
复制代码


参数是设置重试的时间,即第一次1s之后,第二次5s之后

生产环境不要改

messageDelayLevel = 5s 5s 5s 5s 5s 5s 5s 5s 5s 5s 5s 5s 5s 5s 5s 5s 5s 5s
复制代码


16次之后,多了一个topic名为:%DLQ%+consumergroup

image.png

这个默认的16次,可以改,但是使用DefaultMQPullConsumer才可以修改。


DefaultMQPushConsumer不能修改此值。


consumeMessageBatchMaxSize 这个size是消费者注册的回调listener一次处理的消息数,默认是1,不是每次拉取的消息数(默认是32),这个不要搞混。




消息消费进度的更新


未来的文章会进行介绍相关进度更新的功能和分析




相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
2月前
|
消息中间件 存储 RocketMQ
消息中间件-RocketMQ技术(二)
消息中间件-RocketMQ技术(二)
|
2月前
|
消息中间件 存储 中间件
消息中间件-RocketMQ技术(一)
消息中间件-RocketMQ技术(一)
|
3月前
|
消息中间件 监控 数据挖掘
基于RabbitMQ与Apache Flink构建实时分析系统
【8月更文第28天】本文将介绍如何利用RabbitMQ作为数据源,结合Apache Flink进行实时数据分析。我们将构建一个简单的实时分析系统,该系统能够接收来自不同来源的数据,对数据进行实时处理,并将结果输出到另一个队列或存储系统中。
182 2
|
2月前
|
中间件 API 开发者
深入理解Python Web框架:中间件的工作原理与应用策略
在Python Web开发中,中间件位于请求处理的关键位置,提供强大的扩展能力。本文通过问答形式,探讨中间件的工作原理、应用场景及实践策略,并以Flask和Django为例展示具体实现。中间件可以在请求到达视图前或响应返回后执行代码,实现日志记录、权限验证等功能。Flask通过装饰器模拟中间件行为,而Django则提供官方中间件系统,允许在不同阶段扩展功能。合理制定中间件策略能显著提升应用的灵活性和可扩展性。
36 4
|
3月前
|
消息中间件 存储 数据中心
RocketMQ的长轮询(Long Polling)实现分析
文章深入分析了RocketMQ的长轮询实现机制,长轮询结合了推送(push)和拉取(pull)两种消息消费模式的优点,通过客户端和服务端的配合,确保了消息的实时性同时将主动权保留在客户端。文中首先解释了长轮询的基本概念和实现步骤,然后通过一个简单的实例模拟了长轮询的过程,最后详细介绍了RocketMQ中DefaultMQPushConsumer的长轮询实现方式,包括PullMessage服务、PullMessageProcessor服务和PullCallback回调的工作原理。
92 1
|
3月前
|
消息中间件 Arthas Java
RocketMQ—一次连接namesvr失败的案例分析
项目组在使用RocketMQ时遇到Consumer连接Name Server失败的问题,异常显示连接特定地址失败。通过Arthas工具逐步分析代码执行路径,定位到创建Channel返回空值导致异常。进一步跟踪发现,问题源于Netty组件在初始化`ByteBufAllocator`时出现错误。分析依赖后确认存在Netty版本冲突。解决方法为排除冲突的Netty包,仅保留兼容版本。
203 0
RocketMQ—一次连接namesvr失败的案例分析
|
4月前
|
消息中间件 缓存 IDE
MetaQ/RocketMQ 原理问题之消息队列中间件的问题如何解决
MetaQ/RocketMQ 原理问题之消息队列中间件的问题如何解决
|
4月前
|
中间件 API 开发者
深入理解Python Web框架:中间件的工作原理与应用策略
【7月更文挑战第19天】Python Web中间件摘要:**中间件是扩展框架功能的关键组件,它拦截并处理请求与响应。在Flask中,通过`before_request`和`after_request`装饰器模拟中间件行为;Django则有官方中间件系统,需实现如`process_request`和`process_response`等方法。中间件用于日志、验证等场景,但应考虑性能、执行顺序、错误处理和代码可维护性。
77 0
|
5月前
|
消息中间件 存储 中间件
【消息中间件】详解三大MQ:RabbitMQ、RocketMQ、Kafka
【消息中间件】详解三大MQ:RabbitMQ、RocketMQ、Kafka
1161 0
|
4月前
|
消息中间件 编解码 Docker
Docker部署RabbitMQ消息中间件
【7月更文挑战第4天】Docker部署RabbitMQ消息中间件
268 3

相关产品

  • 云消息队列 MQ