RocketMq-Request-Reply消息

简介: RocketMq-Request-Reply消息

什么是Request-Reply?


RocketMQ 中"Request-Reply"模式允许Producer发出消息后,以同步或异步的形式等Consumer消费并返回一个响应消息,达到类似RPC的调用过程。


RocketMQ从4.6.0版本开始支持这种模式。这种模式的流程如下图:

image.png



与RPC的不同


RocketMQ的这种调用方式跟dubbo之类的RPC调用非常类似,那为什么不使用dubbo?而要使用RocketMQ的这种RPC调用呢?原因如下:


  1. 基于RocketMQ来实现RPC可以快速搭建服务的消息总线,实现自己的RPC框架。


  1. 基于RocketMQ来实现RPC可以方便的收集调用的相关信息,能够实现调用链路追踪和分析。


  1. 基于RocketMQ来实现RPC既可以解耦两个系统之间的依赖,也可以实现跨网络区域实现系统间的同步调用,这里RocketMQ扮演的是一个类似于网关的角色。



Request-Reply的实现逻辑


image.png


在以上图中,可以看到,使用RPC模式还是三方:Producer、Broker、Consumer。在Producer中进行消息的发送时,可以随便指定Topic,但是需要送入reply_to_client、correlation_id两个关键信息,reply_to_client记录着请求方的clientlD(用于Broker响应时确定client端)。而correlation_id是标识每次请求的,用于响应消息与请求的配对。


而在进行发送消息时,也有两种模式,一种同步阻塞,另外一种异步非阻塞,这些跟之前普通消息的三种发送方式类似。Broker端除了Producer发送时指定的Topic之外,还有一个Reply_Topic,这个以集群名_REPLY_TOPIC命名(不管RPC生产者主题有多少,这个在一个集群中只有一个),主要用于Consumer响应RPC消息的路由发现。Consumer端除了消费监听之外,还需要加入一个消息的生产(用于RPC的响应消息),必须使用客户端提供的MessageUtil进行消息的包装,防止关键信息丢失从而导致Producer不能收到RPC消息响应。




代码案例


生产者向RequestTopic主题发送RPC消息,使用同步阻塞方式。发送方法也不是send方法,而是request方法(该方法会封装reply_to_client、correlation_id等关键信息),同时方法也提供了Message的返回值。


image.png


消费者接受主题消息发送RPC响应。收到响应后需要再做一次生产,使用工具类MessageUtil封装消息后进行响应消息发送。


image.png

RPC案例消息: 生产者打印如下,对比普通消息多了reply_to_client、correlation_id两个关键信息,reply_to_client记录着请求方的clientlD(用于Broker响应时确定client端)。而correlation_id是标识每次请求的,用于响应消息与请求的配对。


image.png


消费打如下,消费者同时需要响应RPC,对应的主题是DefaultCluster_REPLY_TOPIC。


image.png


在PRC方式中,生产者也可以使用异步方式发起,代码如下:

image.png



相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
6月前
|
消息中间件 RocketMQ
RocketMQ报错:MQClientException:no route info of this topic的解决
RocketMQ报错:MQClientException:no route info of this topic的解决
138 0
|
消息中间件 Apache RocketMQ
rocketmq客户端发送消息报错和超时问题
org.apache.rocketmq.remoting.exception.RemotingTimeoutException: wait response on the channel <10.0.21.69:10911> timeout, 1000(ms)、 closeChannel: close the connection to remote address
2065 1
rocketmq客户端发送消息报错和超时问题
|
3天前
|
消息中间件 Kubernetes Java
MQ产品使用合集之RocketMQ发消息失败了,proxy报connect to null failed如何解决
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
21 2
MQ产品使用合集之RocketMQ发消息失败了,proxy报connect to null failed如何解决
|
4天前
|
JSON 物联网 数据格式
什么是MQTT 遗嘱消息(Will Message)
【2月更文挑战第17天】
140 7
什么是MQTT 遗嘱消息(Will Message)
|
12月前
|
消息中间件 Java 网络安全
rocketmq发送消息报错
rocketmq发送消息报错
|
域名解析 消息中间件 Kubernetes
【消息队列】解决ERR 1 [topic/channel] (: no such host
【消息队列】解决ERR 1 [topic/channel] (: no such host
243 0
|
消息中间件 RocketMQ
|
消息中间件 编解码 算法
RocketMQ msgId生成算法
RocketMQ msgId生成算法
RocketMQ msgId生成算法
|
消息中间件
消息队列 MQ——名称 含义 Message 消息
消息队列 MQ——名称 含义 Message 消息自制脑图
92 0
消息队列 MQ——名称 含义 Message 消息
|
消息中间件 Kafka
Error when sending message to topic wyh-elk-kafka-topic with key: null
Error when sending message to topic wyh-elk-kafka-topic with key: null