RocketMq-Request-Reply消息

简介: RocketMq-Request-Reply消息

什么是Request-Reply?


RocketMQ 中"Request-Reply"模式允许Producer发出消息后,以同步或异步的形式等Consumer消费并返回一个响应消息,达到类似RPC的调用过程。


RocketMQ从4.6.0版本开始支持这种模式。这种模式的流程如下图:

image.png



与RPC的不同


RocketMQ的这种调用方式跟dubbo之类的RPC调用非常类似,那为什么不使用dubbo?而要使用RocketMQ的这种RPC调用呢?原因如下:


  1. 基于RocketMQ来实现RPC可以快速搭建服务的消息总线,实现自己的RPC框架。


  1. 基于RocketMQ来实现RPC可以方便的收集调用的相关信息,能够实现调用链路追踪和分析。


  1. 基于RocketMQ来实现RPC既可以解耦两个系统之间的依赖,也可以实现跨网络区域实现系统间的同步调用,这里RocketMQ扮演的是一个类似于网关的角色。



Request-Reply的实现逻辑


image.png


在以上图中,可以看到,使用RPC模式还是三方:Producer、Broker、Consumer。在Producer中进行消息的发送时,可以随便指定Topic,但是需要送入reply_to_client、correlation_id两个关键信息,reply_to_client记录着请求方的clientlD(用于Broker响应时确定client端)。而correlation_id是标识每次请求的,用于响应消息与请求的配对。


而在进行发送消息时,也有两种模式,一种同步阻塞,另外一种异步非阻塞,这些跟之前普通消息的三种发送方式类似。Broker端除了Producer发送时指定的Topic之外,还有一个Reply_Topic,这个以集群名_REPLY_TOPIC命名(不管RPC生产者主题有多少,这个在一个集群中只有一个),主要用于Consumer响应RPC消息的路由发现。Consumer端除了消费监听之外,还需要加入一个消息的生产(用于RPC的响应消息),必须使用客户端提供的MessageUtil进行消息的包装,防止关键信息丢失从而导致Producer不能收到RPC消息响应。




代码案例


生产者向RequestTopic主题发送RPC消息,使用同步阻塞方式。发送方法也不是send方法,而是request方法(该方法会封装reply_to_client、correlation_id等关键信息),同时方法也提供了Message的返回值。


image.png


消费者接受主题消息发送RPC响应。收到响应后需要再做一次生产,使用工具类MessageUtil封装消息后进行响应消息发送。


image.png

RPC案例消息: 生产者打印如下,对比普通消息多了reply_to_client、correlation_id两个关键信息,reply_to_client记录着请求方的clientlD(用于Broker响应时确定client端)。而correlation_id是标识每次请求的,用于响应消息与请求的配对。


image.png


消费打如下,消费者同时需要响应RPC,对应的主题是DefaultCluster_REPLY_TOPIC。


image.png


在PRC方式中,生产者也可以使用异步方式发起,代码如下:

image.png



相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
消息中间件 RocketMQ
RocketMQ报错:MQClientException:no route info of this topic的解决
RocketMQ报错:MQClientException:no route info of this topic的解决
422 0
|
消息中间件 Apache RocketMQ
rocketmq客户端发送消息报错和超时问题
org.apache.rocketmq.remoting.exception.RemotingTimeoutException: wait response on the channel <10.0.21.69:10911> timeout, 1000(ms)、 closeChannel: close the connection to remote address
3641 1
rocketmq客户端发送消息报错和超时问题
|
5月前
|
消息中间件 存储 数据中心
RocketMQ的长轮询(Long Polling)实现分析
文章深入分析了RocketMQ的长轮询实现机制,长轮询结合了推送(push)和拉取(pull)两种消息消费模式的优点,通过客户端和服务端的配合,确保了消息的实时性同时将主动权保留在客户端。文中首先解释了长轮询的基本概念和实现步骤,然后通过一个简单的实例模拟了长轮询的过程,最后详细介绍了RocketMQ中DefaultMQPushConsumer的长轮询实现方式,包括PullMessage服务、PullMessageProcessor服务和PullCallback回调的工作原理。
133 1
|
8月前
|
消息中间件 Kubernetes Java
MQ产品使用合集之RocketMQ发消息失败了,proxy报connect to null failed如何解决
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
928 2
MQ产品使用合集之RocketMQ发消息失败了,proxy报connect to null failed如何解决
|
6月前
|
消息中间件 RocketMQ
【RocketMQ系列四】消息示例-简单消息的实现
【RocketMQ系列四】消息示例-简单消息的实现
47 0
|
7月前
|
消息中间件 Java API
消息队列 MQ产品使用合集之遇到"No topic route info in name server for the topic"错误,该如何处理
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
8月前
|
JSON 物联网 数据格式
什么是MQTT 遗嘱消息(Will Message)
【2月更文挑战第17天】
538 7
什么是MQTT 遗嘱消息(Will Message)
|
消息中间件 存储 Java
RabbitMQ之headers(头部)Exchange解读
RabbitMQ之headers(头部)Exchange解读
|
消息中间件 RocketMQ
|
域名解析 消息中间件 Kubernetes
【消息队列】解决ERR 1 [topic/channel] (: no such host
【消息队列】解决ERR 1 [topic/channel] (: no such host
337 0