Rocketmq服务器和消费者之间的消息传输有push(推模式)和pull(拉模式)两种。
Push模式:底层实现也是使用拉取模式,由Rocketmq客户端底层封装实现自动消息拉取,拉取到的消息会回调消费者注册的监听器函数,把消息传给消费者,让消费者进行消费。根据拉取的结果,设置队列下一次拉取的偏移量进行下一次拉取任务。
Pull消费模式:更加灵活,可以让消费者自己决定拉取消息时机和更新队列的偏移量,当然使用上也变得复杂一些。
本文主要讲解push模式的底层实现,pull模式在4.7版本之前,调用pull方法是直接请求服务端拉取消息,4.7版本的pull模式后面文章会单独再讲解。
Push模式的主要流程
主要流程图:(可以点击图片放大)
每个步骤的讲解:
- 1.PullMessageService线程从阻塞队列中获取PullRequest任务(PullRequest对象什么时候放进阻塞队列的呢?后面讲解。)
- 2.将PullRequest对象交给DefaultMQPushConsumerImpl类处
题外话:如果有发生了拉取流控会打印日志,我们排查消费者端是否出现了流控可以查看rocketmqClient.log日志进行排查,关键字”so do flow control”
- b. 如果是顺序消费还需要判断是否获取到该队列的锁,该锁要向broker申请,因为顺序消息要只有在获取到队列的锁的情况下,才能进行消息的拉取,保证了消息的顺序。
- c. 条件不满足就把PullRequest对象,延迟一定时间,重新放到PullMessageService的队列里。
- a. 流控处理(拉取到本地的消息条数和大小最小和最大偏移量间隔),可以防止太多消息拉取到本地,导致内存溢出。
- 3.PullAPIWrapper对PullRequest对象进行包装,主要查找broker地址,封装请求对象。
- 4.MQClientAPIImpl选择异步还是同步方式,push是异步方式。
- 5.NettyRemotingClient调用invokeAsynImpl方法。
- a. 根据borker地址获取netty的channel
- b. 获取信号量,防止太多请求
- c. 生成ResponseFuture放到Map里,Map的key是请求的id,这个id很关键,每次请求都会生成,响应回来会带上这个id,然后通过这个id找到对应的ResponseFuture,然后调用回调函数做对应的业务逻辑处理
- d. 把请求通过netty发送到broker,netty发送是异步的,所以线程立马就可以返回了。
- 6.继续步骤一
- 7.Netty接收响应的线程接收到响应结果,解析响应中包含的请求id,从Map(5-c步骤保存的)中找到对应的ResponseFuture执行回调函数
- a. 根据返回结果,把拉取到消息放到DefaultMQPushConsumerImpl的消费线程池进行消费(消费主要步骤1.把消息传给消费者注册的监听函数 2.根据消费情况进行队列的偏移量更新)
- b. 把设置下一次偏移量,生成PullRequest任务延迟或者立马放到PullMessageService线程的队列里,这是第一种PullRequest对象的来源。
第二种PullRequest对象的来源:
队列重平衡时,将新分配的队列生成PullRequest对象,然后放入队列,所以PullRequest对象和队列是一一对应的。
public void dispatchPullRequest(List<PullRequest> pullRequestList) { for (PullRequest pullRequest : pullRequestList) { this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest); log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest); } }
Push模式下的线程模型
分成3种不同职责的线程:
PullMessageService线程
Push模式流程下的1-6步骤都是由PullMessageService线程,单个线程进行执行,也就是说所有的PullRequest任务都是由此线程来执行;
为什么要这么做呢?
- 1-6步骤都不涉及耗时操作,可以处理很快,单线程足够了,多线程会导致上下文切换,共享资源的锁操作。
- 如果多线程向服务端broker拉取消息,就会存在并发问题,对偏移量的处理就会变得复杂和影响性能。
Netty响应线程
主要对响应结果的解析,执行回调函数
- 把消息放到一个消费线程池单独异步处理
- 生成下一次拉取PullRequest任务放到队列里
拉取线程和消费线程独立,这样可以做到拉取和消费互不影响。
消费消息线程
拉取到的消息进行调用消费者注册的监听函数进行消费,逻辑业务处理。
消费完成的消息的偏移量的更新。
小结
- 1.push模式下的底层模式,主要是执行PullRequest任务进行消息的拉取,要弄明白PullRequest的来源。来源有两个,一个是消费者组队列重平衡时,新分配的队列会分配生成PullRequest任务;二是接收到响应后会生成下一次的PullRequest任务。
- 2.push模式下的线程模型,拉取和消费是线程隔离的,这样拉取和消费互不影响,实现性能最大化。