什么是Request-Reply?
RocketMQ 中"Request-Reply"模式允许Producer发出消息后,以同步或异步的形式等Consumer消费并返回一个响应消息,达到类似RPC的调用过程。
RocketMQ从4.6.0版本开始支持这种模式。这种模式的流程如下图:
与RPC的不同
RocketMQ的这种调用方式跟dubbo之类的RPC调用非常类似,那为什么不使用dubbo?而要使用RocketMQ的这种RPC调用呢?原因如下:
- 基于RocketMQ来实现RPC可以快速搭建服务的消息总线,实现自己的RPC框架。
- 基于RocketMQ来实现RPC可以方便的收集调用的相关信息,能够实现调用链路追踪和分析。
- 基于RocketMQ来实现RPC既可以解耦两个系统之间的依赖,也可以实现跨网络区域实现系统间的同步调用,这里RocketMQ扮演的是一个类似于网关的角色。
Request-Reply的实现逻辑
在以上图中,可以看到,使用RPC模式还是三方:Producer、Broker、Consumer。在Producer中进行消息的发送时,可以随便指定Topic,但是需要送入reply_to_client、correlation_id两个关键信息,reply_to_client记录着请求方的clientlD(用于Broker响应时确定client端)。而correlation_id是标识每次请求的,用于响应消息与请求的配对。
而在进行发送消息时,也有两种模式,一种同步阻塞,另外一种异步非阻塞,这些跟之前普通消息的三种发送方式类似。Broker端除了Producer发送时指定的Topic之外,还有一个Reply_Topic,这个以集群名_REPLY_TOPIC命名(不管RPC生产者主题有多少,这个在一个集群中只有一个),主要用于Consumer响应RPC消息的路由发现。Consumer端除了消费监听之外,还需要加入一个消息的生产(用于RPC的响应消息),必须使用客户端提供的MessageUtil进行消息的包装,防止关键信息丢失从而导致Producer不能收到RPC消息响应。
代码案例
生产者向RequestTopic主题发送RPC消息,使用同步阻塞方式。发送方法也不是send方法,而是request方法(该方法会封装reply_to_client、correlation_id等关键信息),同时方法也提供了Message的返回值。
消费者接受主题消息发送RPC响应。收到响应后需要再做一次生产,使用工具类MessageUtil封装消息后进行响应消息发送。
RPC案例消息: 生产者打印如下,对比普通消息多了reply_to_client、correlation_id两个关键信息,reply_to_client记录着请求方的clientlD(用于Broker响应时确定client端)。而correlation_id是标识每次请求的,用于响应消息与请求的配对。
消费打如下,消费者同时需要响应RPC,对应的主题是DefaultCluster_REPLY_TOPIC。
在PRC方式中,生产者也可以使用异步方式发起,代码如下: