rocketMq消费过程包括两种,分别是并发消费和有序消费,每个消费方式都可以单独拿出来进行分享,这篇文章单独用来分析串行消费问题。
由于串行消费和并行消费的大体逻辑都是相同的,所以建议先看rocketMq - 并发消费过程文章,本章只会针对不同的地方进行说明。
为什么有序消费能够保证消息被顺序消费?
为什么有序消费能够保证消息被顺序消费?
为什么有序消费能够保证消息被顺序消费?
答案是:
1、顺序消费的顺序是有序保存在ProcessQueue的TreeMap对象中,key为消息的偏移量,也就是一个messageQueue拉取的消息有序放置在ProcessQueue当中
2、每次消费的时候都是按序从ProcessQueue中按顺序拷贝待消费任务到临时的TreeMap对象当中
3、消费失败后依旧会重新消费刚刚消费失败的那部分任务
4、每次pullRequest执行完成后都会触发一次ConsumeRequest任务,会在原来的TreeMap对象中加入新的待消费的消息
说明:参见DefaultMQPushConsumerImpl类
说明:参见ConsumeMessageOrderlyService类
1、takeMessages操作会按照顺序情况从processQueue的msgTreeMap中获取(TreeMap是有序对象)
说明:参见ConsumeMessageOrderlyService类
1、如果保证顺序消费呢,获取待消费数据的时候按照顺序进行获取放到临时TreeMap中并删除全局TreeMap里面的message对象
2、消费成功后清除临时TreeMap里面的消息
3、消费失败后将消息对象的重试次数+1然后重新提交ConsumeRequest,如果已经超出重试次数就丢弃。再次投递是一个延迟多少时间后消费的任务
4、和无序消费的任务相比,没有重新发送broker的重试队列这一步
说明:参见ConsumeMessageOrderlyService类
1、消费成功后会持久化成功消费的偏移量,本地保存消费偏移量
2、定时任务定时同步消费偏移量到broker进行保存