【微服务】RabbitMQ七种消息收发方式🌱
😄 不断学习才是王道
🔥 继续踏上学习之路,学之分享笔记
👊 总有一天我也能像各位大佬一样
🏆 一个有梦有戏的人 @怒放吧德德
💬什么是消息队列
MQ全称为Message Queue,即消息队列。“消息队列”是在消息的传输过程中保存消息的容器。它是典型的:生产者、消费者模型。生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,这样就实现了生产者和消费者的解耦。
☀️AMQP和JMS
MQ是消息通信的模型,并发具体实现。现在实现MQ的有两种主流方式:AMQP、JMS。
🔶我们可以学习一下两者间的区别和联系:
- JMS是定义了统一的接口,来对消息操作进行统一;AMQP是通过规定协议来统一数据交互的格式
- JMS限定了必须使用Java语言;AMQP只是协议,不规定实现方式,因此是跨语言的。
- JMS规定了两种消息模型;而AMQP的消息模型更加丰富
🔨1、安装RabbitMQ
建议将其安装到docker中,本次是使用虚拟机centos7来安装
首先拉取镜像
docker pull rabbitmq:3-management
运行镜像
docker run \
-e RABBITMQ_DEFAULT_USER=root \
-e RABBITMQ_DEFAULT_PASS=root \
--name mq \
--hostname my-rabbitmq \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3-management
-e RABBITMQ_DEFAULT_USER:指定用户名
-e RABBITMQ_DEFAULT_PASS:指定用户密码
👉输入http://ip:15672 就可以直接到达登录界面
登陆之后如:
介绍一下这几个大部分:💬
Overview:这里可以概览 RabbitMQ 的整体情况,如果是集群,也可以查看集群中各个节点的情况。包括 RabbitMQ 的端口映射信息等,都可以在这个选项卡中查看。
Connections:这个选项卡中是连接上 RabbitMQ 的生产者和消费者的情况。
Channels:这里展示的是“通道”信息,关于“通道”和“连接”的关系,松哥在后文再和大家详细介绍。
Exchange:这里展示所有的交换机信息。
Queue:这里展示所有的队列信息。
Admin:这里展示所有的用户信息。
✋2、RabbitMQ的七种消息收发方式
rabbit的架构:
图片来自公众号:江南一点雨
内容说明:💬
生产者(Publisher):发布消息到 RabbitMQ 中的交换机(Exchange)上。
交换机(Exchange):和生产者建立连接并接收生产者的消息。
消费者(Consumer):监听 RabbitMQ 中的 Queue 中的消息。
队列(Queue):Exchange 将消息分发到指定的 Queue,Queue 和消费者进行交互。
路由(Routes):交换机转发消息到队列的规则。
官网提供了七种的消息收发方式:💬
- Hello World
- Work queues
- Publish/Subscribe
Direct
Fanout
Topic
Header
- Routing
- Topics
- RPC
- Publisher Confirms
👉需要导入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
👉在 application.yml 中配置 RabbitMQ 的基本连接信息
spring:
rabbitmq:
host: *.*.*.*
username: root
password: root
port: 5672
接下来进行 RabbitMQ 配置,在 RabbitMQ 中,所有的消息生产者提交的消息都会交由 Exchange 进行再分配,Exchange 会根据不同的策略将消息分发到不同的 Queue 中。
🚩1、 Hello World
这个没有交换机,只需要定义消息的队列,其实这个是采用了默认交换机
接下来看看代码
队列的定义:
package com.lyd.rabbitmq.config;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @Author: lyd
* @Description:
* @Date: 2022-07-24 11:46
*/
@Configuration
public class RabbitConfig {
public static final String QUEUE_NAME = "hello_world_queue_name";
@Bean
Queue helloWorldQueue() {
//1. 第一个参数是队列的名字
//2。第二个参数是持久化
//3. 该队列是否具有排他性,有排他性的队列只能被创建其的 Connection 处理
//4。如果该队列没有消费者,那么是否自动删除该队列
return new Queue(QUEUE_NAME, true, false, false);
}
}
消费者监听队列:
package com.lyd.rabbitmq.receiver;
import com.lyd.rabbitmq.config.RabbitConfig;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
* @Author: lyd
* @Description:
* @Date: 2022-07-24 11:48
*/
@Component
public class MsgReceiver {
/**
* 通过 @RabbitListener 注解指定该方法监听的消息队列,该注解的参数就是消息队列的名字
* @param msg
*/
@RabbitListener(queues = RabbitConfig.QUEUE_NAME)
public void handleMsg(Map<String, String> msg) {
System.out.println("msg = " + msg.get("str"));
}
}
向队列发送消息
队列消息发送可是任意类型
@Autowired
RabbitTemplate rabbitTemplate;
@Test
void Hello() {
Map<String, String> map = new HashMap<>();
map.put("str", "map 类型可以用");
rabbitTemplate.convertAndSend(RabbitConfig.QUEUE_NAME, map);
}
这个时候使用的其实是默认的直连交换机(DirectExchange),DirectExchange 的路由策略是将消息队列绑定到一个 DirectExchange 上,当一条消息到达 DirectExchange 时会被转发到与该条消息 routing key 相同的 Queue 上,例如消息队列名为 “hello-queue”,则 routingkey 为 “hello-queue” 的消息会被该消息队列接收。
🚩2、Work queues
一个生产者,一个默认的交换机(DirectExchange),一个队列,两个消费者.
一个队列对应了多个消费者,默认情况下,由队列对消息进行平均分配,消息会被分到不同的消费者手中。消费者可以配置各自的并发能力,进而提高消息的消费能力,也可以配置手动 ack,来决定是否要消费某一条消息。
先看一下消费者监听
package com.lyd.rabbitmq.receiver;
import com.lyd.rabbitmq.config.WorkQueueConfig;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @Author: lyd
* @Description:
* @Date: 2022-07-30
*/
@Component
public class WorkQueueReceive {
@RabbitListener(queues = WorkQueueConfig.HELLO_WORLD_QUEUE_NAME)
public void receive(String msg) {
System.out.println("receive = " + msg);
}
@RabbitListener(queues = WorkQueueConfig.HELLO_WORLD_QUEUE_NAME,concurrency = "10")
public void receive2(String msg) {
System.out.println("receive2 = " + msg+"------->"+Thread.currentThread().getName());
}
}
然后通过循环发送多个消息
@Test
void WorkQueue() {
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend(WorkQueueConfig.HELLO_WORLD_QUEUE_NAME, "hello");
}
}
就可以看看这十个消息分别的消费情况
可以看到,消息都被第一个消费者消费了。但是小伙伴们需要注意,事情并不总是这样(多试几次就可以看到差异),消息也有可能被第一个消费者消费(只是由于第二个消费者有十个线程一起开动,所以第二个消费者消费的消息占比更大)。
当然消息消费者也可以开启手动 ack,这样可以自行决定是否消费 RabbitMQ 发来的消息,配置手动 ack 的方式如下:
spring:
rabbitmq:
host: *.*.*.*
username: root
password: root
port: 5672
listener:
simple:
acknowledge-mode: manual # 有三种参数: auto(自动)、manual(手动)、none
🚩3、Publish/Subscribe
一个生产者,多个消费者,每一个消费者都有自己的一个队列,生产者没有将消息直接发送到队列,而是发送到了交换机,每个队列绑定交换机,生产者发送的消息经过交换机,到达队列,实现一个消息被多个消费者获取的目的。需要注意的是,如果将消息发送到一个没有队列绑定的 Exchange上面,那么该消息将会丢失,这是因为在 RabbitMQ 中 Exchange 不具备存储消息的能力,只有队列具备存储消息的能力,如下图:
有四种交换机
- Direct
- Fanout
- Topic
- Header
🏁(1)、Direct
DirectExchange 的路由策略是将消息队列绑定到一个 DirectExchange 上,当一条消息到达 DirectExchange 时会被转发到与该条消息 routing key 相同的 Queue 上,例如消息队列名为 “hello-queue”,则 routingkey 为 “hello-queue” 的消息会被该消息队列接收。
首先是交换机的配置
本次实验有两个队列一个交换机
package com.lyd.rabbitmq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @Author: lyd
* @Description: Direct:这种路由策略,将消息队列绑定到 DirectExchange 上,当消息到达交换机的时候,消息会携带一个 routing_key,然后交换机会找到名为 routing_key 的队列,将消息路由过去
* @Date: 2022-07-24 12:08
*/
@Configuration
public class DirectConfig {
public static final String DIRECT_QUEUE_NAME = "direct_queue_name";
public static final String DIRECT_QUEUE_NAME2 = "direct_queue_name2";
public static final String DIRECT_EXCHANGE_NAME = "direct_exchange_name";
@Bean
Queue directQueue() {
return new Queue(DIRECT_QUEUE_NAME, true, false, false);
}
@Bean
Queue directQueue2() {
return new Queue(DIRECT_QUEUE_NAME2, true, false, false);
}
@Bean
DirectExchange directExchange() {
//1. 交换机的名称
//2。交换机是否持久化
//3. 如果没有与之绑定的队列,是否删除交换机
return new DirectExchange(DIRECT_EXCHANGE_NAME, true, false);
}
/**
* 这个定义是将交换机和队列绑定起来
* @return
*/
@Bean
Binding directBinding1() {
return BindingBuilder
//设置绑定的队列
.bind(directQueue())
//设置绑定的交换机
.to(directExchange())
//设置 routing_key
.with(DIRECT_QUEUE_NAME);
}
@Bean
Binding directBinding2() {
return BindingBuilder
//设置绑定的队列
.bind(directQueue2())
//设置绑定的交换机
.to(directExchange())
//设置 routing_key
.with(DIRECT_QUEUE_NAME2);
}
}
消费者定义两个监听队列
package com.lyd.rabbitmq.receiver;
import com.lyd.rabbitmq.config.DirectConfig;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @Author: lyd
* @Description:
* @Date: 2022-07-24 14:43
*/
@Component
public class DirectReceiver {
@RabbitListener(queues = DirectConfig.DIRECT_QUEUE_NAME)
public void msgHandler(String msg) {
System.out.println("msg2 = " + msg);
}
@RabbitListener(queues = DirectConfig.DIRECT_QUEUE_NAME2)
public void msgHandler2(String msg) {
System.out.println("msg2 = " + msg);
}
}
发送消息:
将消息发到指定的交换机中,根据routing key来指定向那个队列发送信息
@Test
void dirct() {
rabbitTemplate.convertAndSend(DirectConfig.DIRECT_EXCHANGE_NAME, DirectConfig.DIRECT_QUEUE_NAME, "这条消息发给队列1");
rabbitTemplate.convertAndSend(DirectConfig.DIRECT_EXCHANGE_NAME, DirectConfig.DIRECT_QUEUE_NAME2, "这条消息发给队列2");
}
可以看出消息已经正确发送并让消费者消费
🏁(2)、Fanout
FanoutExchange 的数据交换策略是把所有到达 FanoutExchange 的消息转发给所有与它绑定的 Queue 上,在这种策略中,routingkey 将不起任何作用.
交换机配置
本次实验有四个队列两个交换机
package com.lyd.rabbitmq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @Author: lyd
* @Description: fanout 交换机会将到达交换机的所有消息路由到与他绑定的所有队列上面来, 不看routingkey,直接广播
* @Date: 2022-07-24 15:02
*/
@Configuration
public class FanoutConfig {
public static final String FANOUT_QUEUE_NAME = "fanout_queue_name";
public static final String FANOUT_QUEUE_NAME2 = "fanout_queue_name2";
public static final String FANOUT_QUEUE_NAME3 = "fanout_queue_name3";
public static final String FANOUT_QUEUE_NAME4 = "fanout_queue_name4";
public static final String FANOUT_EXCHANGE_NAME = "fanout_exchange_name";
public static final String FANOUT_EXCHANGE_NAME2 = "fanout_exchange_name2";
@Bean
Queue fanoutQueue() {
return new Queue(FANOUT_QUEUE_NAME, true, false, false);
}
@Bean
Queue fanoutQueue2() {
return new Queue(FANOUT_QUEUE_NAME2, true, false, false);
}
@Bean
Queue fanoutQueue3() {
return new Queue(FANOUT_QUEUE_NAME3, true, false, false);
}
@Bean
Queue fanoutQueue4() {
return new Queue(FANOUT_QUEUE_NAME4, true, false, false);
}
@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange(FANOUT_EXCHANGE_NAME, true, false);
}
@Bean
FanoutExchange fanoutExchange2() {
return new FanoutExchange(FANOUT_EXCHANGE_NAME2, true, false);
}
@Bean
Binding fanoutBinding1() {
return BindingBuilder.bind(fanoutQueue())
.to(fanoutExchange());
}
@Bean
Binding fanoutBinding2() {
return BindingBuilder.bind(fanoutQueue2())
.to(fanoutExchange());
}
@Bean
Binding fanoutBinding3() {
return BindingBuilder.bind(fanoutQueue3())
.to(fanoutExchange2());
}
@Bean
Binding fanoutBinding4() {
return BindingBuilder.bind(fanoutQueue4())
.to(fanoutExchange2());
}
}
在单元测试向交换机发送信息
此时不需要设置 routingkey ,因为广播对此是无效的,只要队列与广播交换机绑定了,都会向队列发送消息
@Test
void fanout() {
rabbitTemplate.convertAndSend(FanoutConfig.FANOUT_EXCHANGE_NAME, null, "hello fanout1!");
rabbitTemplate.convertAndSend(FanoutConfig.FANOUT_EXCHANGE_NAME2, null, "hello fanout2!");
}
可见信息向交换机发送消息,交换机将收到的信息广播给与之绑定的队列中。
🏁(3)、Topic
TopicExchange 是比较复杂但是也比较灵活的一种路由策略,在 TopicExchange 中,Queue 通过 routingkey 绑定到 TopicExchange 上,当消息到达 TopicExchange 后,TopicExchange 根据消息的 routingkey 将消息路由到一个或者多个 Queue 上。
交换机配置
package com.lyd.rabbitmq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @Author: lyd
* @Description: TopicExchange 是比较复杂但是也比较灵活的一种路由策略,在 TopicExchange 中,Queue 通过 routingkey 绑定到 TopicExchange 上,
* 当消息到达 TopicExchange 后,TopicExchange 根据消息的 routingkey 将消息路由到一个或者多个 Queue 上
* @Date: 2022-07-24 15:29
*/
@Configuration
public class TopicConfig {
public static final String XIAOMI_QUEUE_NAME = "xiaomi_queue_name";
public static final String HUAWEI_QUEUE_NAME = "huawei_queue_name";
public static final String PHONE_QUEUE_NAME = "phone_queue_name";
public static final String TOPIC_EXCHANGE_NAME = "topic_queue_name";
@Bean
Queue xiaomiQueue() {
return new Queue(XIAOMI_QUEUE_NAME, true, false, false);
}
@Bean
Queue huaweiQueue() {
return new Queue(HUAWEI_QUEUE_NAME, true, false, false);
}
@Bean
Queue phoneQueue() {
return new Queue(PHONE_QUEUE_NAME, true, false, false);
}
@Bean
TopicExchange topicExchange() {
return new TopicExchange(TOPIC_EXCHANGE_NAME, true, false);
}
@Bean
Binding xiaomiBinding() {
return BindingBuilder.bind(xiaomiQueue())
.to(topicExchange())
//这里的 # 是一个通配符,表示将来消息的 routing_key 只要是以 xiaomi 开头,都将被路由到 xiaomiQueue
.with("xiaomi.#");
}
@Bean
Binding huaweiBinding() {
return BindingBuilder.bind(huaweiQueue())
.to(topicExchange())
//这里的 # 是一个通配符,表示将来消息的 routing_key 只要是以 xiaomi 开头,都将被路由到 xiaomiQueue
.with("huawei.#");
}
@Bean
Binding phoneBinding() {
return BindingBuilder.bind(phoneQueue())
.to(topicExchange())
//这里的 # 是一个通配符,表示将来消息的 routing_key 只要是以 xiaomi 开头,都将被路由到 xiaomiQueue
.with("#.phone.#");
}
}
在单元测试进行测试
@Test
void topic() {
rabbitTemplate.convertAndSend(TopicConfig.TOPIC_EXCHANGE_NAME, "huawei.phone.news", "华为手机新闻");
rabbitTemplate.convertAndSend(TopicConfig.TOPIC_EXCHANGE_NAME,"phone.news","手机新闻..");
}
运行结果
phone = 华为手机新闻
phone = 手机新闻..
huawei = 华为手机新闻
根据绑定的规则指向一定的routingkey
🏁(4)、Header
HeadersExchange 是一种使用较少的路由策略,HeadersExchange 会根据消息的 Header 将消息路由到不同的 Queue 上,这种策略也和 routingkey无关.
配置:
package com.lyd.rabbitmq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.HeadersExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @Author: lyd
* @Description:
* @Date: 2022-07-24 16:00
*/
@Configuration
public class HeaderConfig {
public static final String HEADER_QUEUE_NAME_NAME = "header_queue_name_name";
public static final String HEADER_QUEUE_AGE_NAME = "header_queue_age_name";
public static final String HEADER_EXCHANGE_NAME = "header_exchange_name";
@Bean
Queue headerNameQueue() {
return new Queue(HEADER_QUEUE_NAME_NAME, true, false,false);
}
@Bean
Queue headerAgeQueue() {
return new Queue(HEADER_QUEUE_AGE_NAME, true, false,false);
}
@Bean
HeadersExchange headersExchange() {
return new HeadersExchange(HEADER_EXCHANGE_NAME, true, false);
}
@Bean
Binding nameBinding() {
return BindingBuilder.bind(headerNameQueue())
.to(headersExchange())
//如果将来消息头部中包含 name 属性,就算匹配成功
.where("name").exists();
}
@Bean
Binding ageBinding() {
return BindingBuilder.bind(headerAgeQueue())
.to(headersExchange())
//将来头信息中必须要有 age 属性,并且 age 属性值为 99
.where("age")
.matches(99);
}
}
测试
发送两条消息,通过setHeader来设置头信息,以至于让交换机通过头信息来判断发往那个队列
@Test
void header() {
Message nameMsg = MessageBuilder.withBody("hello zhangsan".getBytes()).setHeader("name", "aaa").build();
rabbitTemplate.send(HeaderConfig.HEADER_EXCHANGE_NAME, null, nameMsg);
Message ageMsg = MessageBuilder.withBody("hello lisi 99".getBytes()).setHeader("age", 99).build();
rabbitTemplate.send(HeaderConfig.HEADER_EXCHANGE_NAME, null, ageMsg);
}
运行结果
nameMsgHandler >>> hello zhangsan
ageMsgHandler >>> hello lisi 99
🚩4、Routing
一个生产者,一个交换机,两个队列,两个消费者,生产者在创建 Exchange 后,根据 RoutingKey 去绑定相应的队列,并且在发送消息时,指定消息的具体 RoutingKey 即可。
🚩5、Topics
一个生产者,一个交换机,两个队列,两个消费者,生产者创建 Topic 的 Exchange 并且绑定到队列中,这次绑定可以通过
* 和关键字,对指定RoutingKey 内容,编写时注意格式
xxx.xxx.xxx 去编写。
如图:
🚩6、RPC
- 首先 Client 发送一条消息,和普通的消息相比,这条消息多了两个关键内容:一个是 correlation_id,这个表示这条消息的唯一 id,还有一个内容是 reply_to,这个表示消息回复队列的名字。
- Server 从消息发送队列获取消息并处理相应的业务逻辑,处理完成后,将处理结果发送到 reply_to 指定的回调队列中。
- Client 从回调队列中读取消息,就可以知道消息的执行情况是什么样子了。
需要配置两个交换机,一个是客户端发送消息交换机。采用手动ack方式,需要另一个交换机来进行消息确认先看一下yml配置:
spring:
rabbitmq:
host: *.*.*.*
username: root
password: root
port: 5672
listener:
simple:
acknowledge-mode: manual
publisher-confirm-type: correlated # 消息确认方式-通过 correlated 来确认
publisher-returns: true
解释: 前面都是日常,后面两行:首先是配置消息确认方式,我们通过 correlated 来确认,只有开启了这个配置,将来的消息中才会带 correlation_id,只有通过 correlation_id 我们才能将发送的消息和返回值之间关联起来。最后一行配置则是开启发送失败退回。
提供一个配置:
package com.lyd.rabbitmq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @Author: lyd
* @Description:
* @Date: 2022-07-30
*/
@Configuration
public class RPCRabbitConfig {
public static final String RPC_QUEUE1 = "queue_1";
public static final String RPC_QUEUE2 = "queue_2";
public static final String RPC_EXCHANGE = "rpc_exchange";
/**
* 设置消息发送RPC队列
*/
@Bean
Queue msgQueue() {
return new Queue(RPC_QUEUE1);
}
/**
* 设置返回队列
*/
@Bean
Queue replyQueue() {
return new Queue(RPC_QUEUE2);
}
/**
* 设置交换机
*/
@Bean
TopicExchange exchange() {
return new TopicExchange(RPC_EXCHANGE);
}
/**
* 请求队列和交换器绑定
*/
@Bean
Binding msgBinding() {
return BindingBuilder.bind(msgQueue()).to(exchange()).with(RPC_QUEUE1);
}
/**
* 返回队列和交换器绑定
*/
@Bean
Binding replyBinding() {
return BindingBuilder.bind(replyQueue()).to(exchange()).with(RPC_QUEUE2);
}
/**
* 使用 RabbitTemplate发送和接收消息
* 并设置回调队列地址
*/
@Bean
RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setReplyAddress(RPC_QUEUE2);
template.setReplyTimeout(6000);
return template;
}
/**
* 给返回队列设置监听器
*/
@Bean
SimpleMessageListenerContainer replyContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(RPC_QUEUE2);
container.setMessageListener(rabbitTemplate(connectionFactory));
return container;
}
}
编写客户端发送给消息
在 Spring Boot 中我们负责消息发送的工具是 RabbitTemplate,默认情况下,系统自动提供了该工具,但是这里我们需要对该工具重新进行定制,主要是添加消息发送的返回队列,最后我们还需要给返回队列设置一个监听器。
package com.lyd.rabbitmq.controller;
import com.lyd.rabbitmq.config.RPCRabbitConfig;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @Author: lyd
* @Description:
* @Date: 2022-07-30
*/
@RestController
public class RPCController {
@Autowired
RabbitTemplate rabbitTemplate;
@GetMapping("/send")
public void hello(String message) {
//将来要发送的消息对象
Message msg = MessageBuilder.withBody(message.getBytes()).build();
//发送消息,方法的返回值就是 server 给出的响应
Message result = rabbitTemplate.sendAndReceive(RPCRabbitConfig.RPC_EXCHANGE, RPCRabbitConfig.RPC_QUEUE1, msg);
if (result != null) {
//发送的消息的 correlationId
String correlationId = msg.getMessageProperties().getCorrelationId();
//返回的消息的 correlationId
String spring_returned_message_correlation = (String) result.getMessageProperties().getHeaders().get("spring_returned_message_correlation");
if (correlationId.equals(spring_returned_message_correlation)) {
System.out.println("收到服务端的响应:" + new String(result.getBody()));
}
}
}
}
消息发送调用 sendAndReceive 方法,该方法自带返回值,返回值就是服务端返回的消息。
服务端返回的消息中,头信息中包含了 spring_returned_message_correlation 字段,这个就是消息发送时候的 correlation_id,通过消息发送时候的 correlation_id 以及返回消息头中的 spring_returned_message_correlation 字段值,我们就可以将返回的消息内容和发送的消息绑定到一起,确认出这个返回的内容就是针对这个发送的消息的。
这就是整个客户端的开发,其实最最核心的就是 sendAndReceive 方法的调用。调用虽然简单,但是准备工作还是要做足够。例如如果我们没有在 application.properties 中配置 correlated,发送的消息中就没有 correlation_id,这样就无法将返回的消息内容和发送的消息内容关联起来。
服务端进行消息接收后的返回
package com.lyd.rabbitmq.service;
import com.lyd.rabbitmq.config.RPCRabbitConfig;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @Author: lyd
* @Description:
* @Date: 2022-07-30
*/
@Component
public class RpcServer {
@Autowired
RabbitTemplate rabbitTemplate;
@RabbitListener(queues = RPCRabbitConfig.RPC_QUEUE1)
public void process(Message message) {
byte[] body = message.getBody();
//这是服务端要返回的消息
Message build = MessageBuilder.withBody(("我是服务端,我收到了客户端发来的:" + new String(body)).getBytes()).build();
CorrelationData correlationData = new CorrelationData(message.getMessageProperties().getCorrelationId());
rabbitTemplate.sendAndReceive(RPCRabbitConfig.RPC_EXCHANGE, RPCRabbitConfig.RPC_QUEUE2, build, correlationData);
}
}
- 服务端首先收到消息并打印出来。
- 服务端提取出原消息中的 correlation_id。
- 服务端调用 sendAndReceive 方法,将消息发送给 RPC_QUEUE2 队列,同时带上 correlation_id 参数。
启动两个服务,接下来通过postman测试
运行结果:(在客户端中打印出信息)
🚩7、Publisher Confirms
发布者确认(Publisher Confirms)特性用以实现消息的可靠投递,在解决消息可靠性的问题时,有两种方式:事务和消息确认。
对于消息是否被成功消费,可以使用这种方式——消息确认机制。消息确认分为:自动确认和手动确认。
:warning:这里推荐一篇博客:官方 RabbitMQ 教程 - 7 Publisher Confirms
学习路径: B站 -> 江南一点雨
😄 不断学习才是王道
🔥 继续踏上学习之路,学之分享笔记
👊 总有一天我也能像各位大佬一样
🏆 一个有梦有戏的人 @怒放吧德德
👍如有错欢迎指出