消息重复消费的问题
消息重复消费是各个MQ都会发生的常见问题之一,在一些比较敏感的场景下,重复消费会造成比较严重的后果,比如重复扣款等。
消息重复消费场景及解决办法
在什么情况下会发生RocketMQ的消息重复消费呢?
生产者重复发送场景
当系统的调用链路比较长的时候,比如,系统A调用系统B,系统B再把消息发送到RocketMQ中,在系统A调用系统B的时候。
如果系统B处理成功,但是迟迟没有将调用成功的结果返回给系统A的时候,系统A就会尝试重新发起请求给系统B,造成系统B重复处理,发起多条消息给RocketMQ造成重复消费。
消费者重复发送场景
在系统B发送消息给RocketMQ的时候,也有可能会发生和上面一样的问题,消息发送超时,结果系统B重试,导致RocketMQ接收到了重复的消息。
消费者重复发送场景
当RocketMQ成功接收到消息,并将消息交给消费者处理,如果消费者消费完成后还没来得及提交offset给RocketMQ,自己宕机或者重启了,那么RocketMQ没有接收到offset,就会认为消费失败了,会重发消息给消费者再次消费。
消费者没有立刻返回成功
重复消费的问题的一个可能的问题:消费者消费消息时产生了异常,并没有返回CONSUME_SUCCESS标志。
因为消息处理异常导致的消息重新消费,RocketMQ可以很好的保持消息,一定要消费成功才可以!
官方对comsumerMessage方法
It is not recommend to throw exception,rather than returning ConsumeConcurrentlyStatus.RECONSUME_LATER if consumption failure 复制代码
无论如何,都不要抛出异常,如果需要重新消费,可以返回RECONSUME_LATER主动要求重新消费。
catch Exception根异常来捕获业务处理的异常:
consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { logger.debug(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n"); MessagePack msgpack = new MessagePack(); for (MessageExt msg : msgs){ byte[] data = msg.getBody(); try { RTMsgPack rtmsg = msgpack.read(data, RTMsgPack.class); logger.debug("Receive a message:" + rtmsg); anlysisRTMsgPack(rtmsg, engine); } catch (IOException e) { logger.error("Unpack RTMsg:", e); } catch (Exception e1){ logger.warn("Unexcepted exception.", e1); } } logger.debug("RETURN CONSUME SUCCESS."); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); 复制代码
设置CONSUME_FROM_LAST_OFFSET的问题
Consumer在消费时,会设置从哪里开始消费。默认是CONSUME_FROM_LAST_OFFSET,设置的值如代码所示。
public enum ConsumeFromWhere { /** * 一个新的订阅组第一次启动从队列的最后位置开始消费<br> * 后续再启动接着上次消费的进度开始消费 */ CONSUME_FROM_LAST_OFFSET, @Deprecated CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST, @Deprecated CONSUME_FROM_MIN_OFFSET, @Deprecated CONSUME_FROM_MAX_OFFSET, /** * 一个新的订阅组第一次启动从队列的最前位置开始消费<br> * 后续再启动接着上次消费的进度开始消费 */ CONSUME_FROM_FIRST_OFFSET, /** * 一个新的订阅组第一次启动从指定时间点开始消费<br> * 后续再启动接着上次消费的进度开始消费<br> * 时间点设置参见DefaultMQPushConsumer.consumeTimestamp参数 */ CONSUME_FROM_TIMESTAMP, } 复制代码
- CONSUME_FROM_LAST_OFFSET:从最后的偏移量开始消费,是从该消费者上次消费到的位置开始消费。
- 如果是一个新的消费者,就要根据这个client所属的消费组的情况来判断。
- 如果所属的消费者组是新上线的,订阅的消息,最早的消息都没有过,RocketMQ的设计者认为,你这是一个新上线的业务,会强制从第一条消息开始消费。
- 如果订阅的消息,已经产生了过期消息,那么才会从我们这个client启动的时间点开始消费。
ConsumeFromWhere这个参数只对一个新的消费者第一次启动时有效
- CONSUME_FROM_FIRST_OFFSET:从最小偏移量开始消费,
- CONSUME_FROM_TIMESTAMP:从某个时间开始消费。
- 而判断是不是一个新的ConsumerGroup是在broker端判断。
- 消费到哪个offset最先是存在Consumer本地的,定时和broker同步自己的消费offset。
- broker在判断是不是一个新的consumergroup,就是查broker端有没有这个consumergroup的offset记录。
偏移量无效化
对于一个新的queue,这个参数也是没用的,都是从0开始消费。
所以,这就有了一个问题我已经设置了CONSUME_FROM_LAST_OFFSET,为什么还是重复消费了,可能你这不是新的consumergroup,也可能是个新的Queue。
重试队列和死信队列
- 消费端,一直不回传消费的结果。RocketMQ认为消息没收到,consumer下一次拉取,broker依然会发送该消息。
- 任何异常都要捕获返回:ConsumeConcurrentlyStatus.RECONSUME_LATER
RocketMQ会放到重试队列,TOPIC是:%RETRY%+COnsumerGroup的名字
- 重试的消息在延迟的某个时间点(默认是10秒,业务可设置)后,再次投递到这个ConsumerGroup。
- 而如果一直这样重复消费都持续失败到一定次数(默认16次),就会投递到DLQ死信队列,此时需要人工干预了。
/** Batch consumption size */ private int consumeMessageBatchMaxSize = 1; /** Batch pull size */ private int pullBatchSize = 32; 复制代码
- consumeMessageBatchMaxSize 是批量消费的最大条数
- pullBatchSize 是每次拉取的最大条数
broker端的
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"; 复制代码
参数是设置重试的时间,即第一次1s之后,第二次5s之后
生产环境不要改
messageDelayLevel = 5s 5s 5s 5s 5s 5s 5s 5s 5s 5s 5s 5s 5s 5s 5s 5s 5s 5s 复制代码
16次之后,多了一个topic名为:%DLQ%+consumergroup
这个默认的16次,可以改,但是使用DefaultMQPullConsumer才可以修改。
DefaultMQPushConsumer不能修改此值。
consumeMessageBatchMaxSize 这个size是消费者注册的回调listener一次处理的消息数,默认是1,不是每次拉取的消息数(默认是32),这个不要搞混。
消息消费进度的更新
未来的文章会进行介绍相关进度更新的功能和分析