sendKernelImpl()
构建发送消息的
请求头部 RequestHeader
上下文SendMessageContext
然后调用方法MQClientAPIImpl#sendMessage(),将消息发送给队列所在Broker。
至此,消息被发送给远程调用的封装类MQClientAPIImpl,完成后续序列化和网络传输等步骤。
RocketMQ的Producer无论同步还是异步发送消息,都统一到了同一流程。
异步发送消息的实现,也是通过一个线程池,在异步线程执行的调用和同步发送相同的底层方法来实现的。
方法的一个参数区分同步or异步发送
这使得整个流程统一,很多同步异步代码可复用,代码结构清晰简单,易维护。
使用同步发送,当前线程会阻塞等待服务端的响应,直到收到响应或者超时方法才会返回,所以在业务代码调用同步发送的时候,只要返回成功,消息就一定发送成功了。
而异步发送,发送的逻辑都是在Executor的异步线程中执行的,所以不会阻塞当前线程,当服务端返回响应或者超时之后,Producer会调用Callback方法来给业务代码返回结果。业务代码需要在Callback中来判断发送结果。
总结
本文分析了RocketMQ客户端消息生产的实现过程,包括Producer初始化和发送消息的主流程。Producer中包含的几个核心的服务都是有状态的,在Producer启动时,由MQClientInstance类中来统一启动。
在发送消息的流程中,RocketMQ分了三种发送方式:
- 单向
- 同步
- 异步
这三种发送方式对应的发送流程基本相同,同步和异步发送由已封装好的MQClientAPIImpl类分别实现。
面试场景快问快答
DefaultMQProducer有个属性defaultTopicQueueNums,它是用来设置topic的ConsumeQueue的数量的吗?有同学可能认为consumeQueue的数量是创建topic的时候指定的,跟producer没有关系,那这参数有什么用呢?
这参数是控制客户端在生产消费的时候会访问同一个主题的队列数量,假设一个主题有100个队列,对每个客户端,它没必要100个队列都访问,只需使用其中几个队列。
在RocketMq的控制台上可以创建topic,需要指定writeQueueNums,readQueueNums,perm,这三个参数是有什么用呢?这里为什么要区分写跟读队列呢?不应该只有一个consumeQueue?
writeQueueNums和readQueueNums是在服务端来控制每个客户端在生产和消费的时候,分别访问多少个队列。这两参数是服务端参数,优先级高于客户端控制的参数defaultTopicQueueNums的。perm是设置Topic读写等权限的参数。
用户请求–>异步处理—>用户收到响应结果。异步处理的作用是:用更少的线程来接收更多的用户请求,然后异步处理业务逻辑。异步处理完后,如何将结果通知给原先的用户呢?即使有回调接口,我理解也是给用户发个短信之类的处理,那结果怎么返回到定位到用户,并返回之前请求的页面上呢?需要让之前的请求线程阻塞吗?那也无法达到【用更少的线程来接收更多的用户请求】的目的丫。
如果局限于:“APP/浏览器 --[http协议]–>web 服务”这样的场景,受限于http协议,前端和web服务的交互一定是单向和同步的。一定要等待结果然后返回响应,但是,这种情况仍然可以使用异步方法。像spring web这种框架,它把处理web请求都给你封装好了,你只要写个handler很方便。但这handler只能是一个同步方法,它必须在返回值中给出响应结果,所以导致很多同学思维转不过来。


