RocketMq-Request-Reply消息

简介: RocketMq-Request-Reply消息

什么是Request-Reply?


RocketMQ 中"Request-Reply"模式允许Producer发出消息后,以同步或异步的形式等Consumer消费并返回一个响应消息,达到类似RPC的调用过程。


RocketMQ从4.6.0版本开始支持这种模式。这种模式的流程如下图:

image.png



与RPC的不同


RocketMQ的这种调用方式跟dubbo之类的RPC调用非常类似,那为什么不使用dubbo?而要使用RocketMQ的这种RPC调用呢?原因如下:


  1. 基于RocketMQ来实现RPC可以快速搭建服务的消息总线,实现自己的RPC框架。


  1. 基于RocketMQ来实现RPC可以方便的收集调用的相关信息,能够实现调用链路追踪和分析。


  1. 基于RocketMQ来实现RPC既可以解耦两个系统之间的依赖,也可以实现跨网络区域实现系统间的同步调用,这里RocketMQ扮演的是一个类似于网关的角色。



Request-Reply的实现逻辑


image.png


在以上图中,可以看到,使用RPC模式还是三方:Producer、Broker、Consumer。在Producer中进行消息的发送时,可以随便指定Topic,但是需要送入reply_to_client、correlation_id两个关键信息,reply_to_client记录着请求方的clientlD(用于Broker响应时确定client端)。而correlation_id是标识每次请求的,用于响应消息与请求的配对。


而在进行发送消息时,也有两种模式,一种同步阻塞,另外一种异步非阻塞,这些跟之前普通消息的三种发送方式类似。Broker端除了Producer发送时指定的Topic之外,还有一个Reply_Topic,这个以集群名_REPLY_TOPIC命名(不管RPC生产者主题有多少,这个在一个集群中只有一个),主要用于Consumer响应RPC消息的路由发现。Consumer端除了消费监听之外,还需要加入一个消息的生产(用于RPC的响应消息),必须使用客户端提供的MessageUtil进行消息的包装,防止关键信息丢失从而导致Producer不能收到RPC消息响应。




代码案例


生产者向RequestTopic主题发送RPC消息,使用同步阻塞方式。发送方法也不是send方法,而是request方法(该方法会封装reply_to_client、correlation_id等关键信息),同时方法也提供了Message的返回值。


image.png


消费者接受主题消息发送RPC响应。收到响应后需要再做一次生产,使用工具类MessageUtil封装消息后进行响应消息发送。


image.png

RPC案例消息: 生产者打印如下,对比普通消息多了reply_to_client、correlation_id两个关键信息,reply_to_client记录着请求方的clientlD(用于Broker响应时确定client端)。而correlation_id是标识每次请求的,用于响应消息与请求的配对。


image.png


消费打如下,消费者同时需要响应RPC,对应的主题是DefaultCluster_REPLY_TOPIC。


image.png


在PRC方式中,生产者也可以使用异步方式发起,代码如下:

image.png



相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
消息中间件 Java Spring
RocketMQ-JAVA客户端不同版本接入方式
RocketMQ4.0 RocketMQ5.0 JAVA接入 spring springboot
RocketMQ-JAVA客户端不同版本接入方式
|
消息中间件 Apache RocketMQ
rocketmq客户端发送消息报错和超时问题
org.apache.rocketmq.remoting.exception.RemotingTimeoutException: wait response on the channel <10.0.21.69:10911> timeout, 1000(ms)、 closeChannel: close the connection to remote address
5024 1
rocketmq客户端发送消息报错和超时问题
|
7月前
|
消息中间件 存储 人工智能
Apache RocketMQ for AI 战略升级,开启 AI MQ 新时代
Apache RocketMQ 顺应AIGC浪潮,针对长时会话、稀缺算力调度及AI Agent协作等挑战,推出专为AI时代打造的消息引擎。通过“会话即主题”的Lite-Topic机制,实现百万级队列动态管理,保障会话连续性与断点续传;结合智能资源调度能力,如定速消费与优先级队列,提升算力利用率与服务公平性;同时构建高效异步通信枢纽,支撑Agent-to-Agent及AI工作流的非阻塞协同。已在阿里集团与阿里云多个AI产品中大规模验证,助力开发者构建稳定、高效、可扩展的AI应用基础设施。
|
缓存 Java Android开发
【OOM异常排查经验】
【OOM异常排查经验】
928 0
|
消息中间件 存储 Apache
消息中间件使用规范(RocketMQ)
消息中间件使用规范(RocketMQ)
6100 94
|
消息中间件 负载均衡 算法
【消息中间件】RocketMQ消息发送-请求与响应
前面的文章介绍了,RocketMQ的搭建,以及RocketMQ的NameServer,接下来我们配合着官方提供的demo,进行实际的消息发送学习,主要学习发送方式、发送参数的含义,以及发送中的一些问题
|
消息中间件 存储 算法
RocketMQ核心知识点整理,收藏再看!
RocketMQ核心知识点整理,收藏再看!
1799 0
RocketMQ核心知识点整理,收藏再看!
|
消息中间件 存储 监控
RocketMQ Tag 详解!
本文详细介绍了 RocketMQ 中 Tag 的原理及其应用场景。Tag 是一种消息过滤机制,允许生产者在发送消息时指定标签,消费者据此选择性消费。文章通过源码分析展示了 Tag 在消息发送、存储及消费阶段的作用,并提供了完整的示例代码。尽管 Tag 功能简单高效,但也存在单一维度过滤等局限性。适合需要高效、低延迟消息传递的场景,如日志监控、电商系统等。
1807 2
|
SQL Java 关系型数据库
mybatis-plus启动时自动执行sql脚本
mybatis-plus启动时自动执行sql脚本
461 1
|
消息中间件 Java RocketMQ
【已解决】RocketMq使用报错
【已解决】RocketMq使用报错
964 0