rocketMq消费过程包括两种,分别是并发消费和有序消费,每个消费方式都可以单独拿出来进行分享,这篇文章单独用来分析并发消费问题。
并发消费需要理解的几个核心点:并发消费的消息拉取,并发消费的消息重试,并发消息的ack机制,消费进度的持久化,这篇分享会就这几个问题分解展开。
其他逻辑
1、consumer会定期向broker同步ack消息偏移量,也就是已经消费的位置。
2、极端情况下consumer会因为一个消息一直失败导致ack消息偏移量无法前进,但是因为会有定时任务去清楚过期消息,所以ack进度正常便宜。
并发消费整体流程
说明:
1、Rebalance负责生成pullRequest放置到pullRequestQueue当中。
2、PullMessageService负责消费pullRequest来完成数据的拉取。
3、数据拉取后生成ConsumeRequest对象投递到consumeExecutor的线程池当中
4、ConsumeRequest是一个线程实例,负责消费拉取的消息。
5、消费消息成功就从ConsumeRequest的ProcessQueue中删除,消费失败就投递到broker的重试队列中,重试次数和延迟粒度在broker端处理。
6、consumeRequest内部维持的processQueue作为一个TreeMap对象可以维持消息的有序性,用于判断消费进度。
7、pullRequest在消费完以后还是再次投递到pullRequestQueue当中。
pullRequest执行过程
说明:参见PullMessageService类
1、单线程循环消费pullRequest。
说明:参见PullMessageService类
1、消费过程中进行一些状态判断以及流速控制
说明:参见DefaultMQPushConsumerImpl类
1、区分有序消费和无须消费
2、无序消费会判断消费偏移量是否差别过大
说明:参见DefaultMQPushConsumerImpl类
1、处理拉取消息的后续操作
2、处理完以后再次投递pullRequest请求
说明:参见PullAPIWrapper类
说明:参见PullAPIWrapper类
说明:参见ConsumeMessageConcurrentlyService类。
1、拉取消息成功后设置下一次拉取的偏移量。
2、更新拉取的消息到processQueue当中。
3、再次投递pullRequest发起下一次拉取。
说明:参见ConsumeMessageConcurrentlyService类
1、分一次能够处理完成和分多次能够处理完成。
说明:
1、processQueue是待处理消息保存位置,里面核心数据结构之一为TreeMap
2、messageQueue就是这个ConsumeRequest负责处理的messageQueue
说明:参见ConsumeMessageConcurrentlyService类
1、consumer消费拉取消息的逻辑及后续处理
说明:参见ConsumeMessageConcurrentlyService类
1、消费成功就删除所有拉取的消息
说明:参见SendMessageProcessor类
1、处理逻辑在consumerSendMsgBack方法中
2、里面涉及到延迟粒度和重试次数的设置
3、消息是被投递到延迟队列当中的
说明:参见MQClientInstance类
1、在persistAllConsumerOffset定期持久化消费偏移量
2、消费偏移量由ConsumerRequest请求在处理的过程中变更的
说明:参见DefaultMQPushConsumerImpl类
1、处理没有从broker拉取消息的过程
2、再次投递pullRequest请求