搭建环境
RabbitMQ安装成功后下边我们编写消息发送与消息接收程序实现收发消息,如下图:publisher即消息发送者将消息发送到MQ的队列中,consumer即消息消费者从MQ中接收消息。
RabbitMQ通信采用了AMQP (Advanced Message Queuing Protocol) 协议,因此它具备跨语言的特性,任何语言只要遵循AMQP协议都可以使用RabbitMQ收发消息。
RabbitMQ官方也提供了各种不同语言的客户端API,RabbitMQ官方提供的Java客户端编码相对复杂,一般生产环境下我们更多会结合Spring来使用。而Spring的官方刚好基于RabbitMQ提供了这样一套消息收发的模板工具:SpringAMQP。并且还基于SpringBoot对其实现了自动装配,使用起来非常方便。
SpringAmqp的官方地址:https://spring.io/projects/spring-amqp
SpringAMQP提供了三个功能:
- 自动声明队列、交换机及其绑定关系
- 基于注解的监听器模式,异步接收消息
- 封装了RabbitTemplate工具,用于发送消息
下边使用SpringAMQP实现消息收发,上图是RabbitMQ最简单的工作模型,我们仅作测试使用,这种模式一般很少在生产中使用。
在课前资料给大家提供了一个Demo工程,方便我们学习SpringAMQP的使用:
将其复制到你的工作空间,然后用Idea打开,项目结构如图:
包括三部分:
- mq-demo:父工程,管理项目依赖
- publisher:消息的发送者
- consumer:消息的消费者
在mq-demo这个父工程中,已经配置好了SpringAMQP相关的依赖:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>cn.itcast.demo</groupId> <artifactId>mq-demo</artifactId> <version>1.0-SNAPSHOT</version> <modules> <module>publisher</module> <module>consumer</module> </modules> <packaging>pom</packaging> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.7.12</version> <relativePath/> </parent> <properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> <!--AMQP依赖,包含RabbitMQ--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <!--单元测试--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> </dependency> </dependencies> </project>
设置java版本
因此,子工程中就可以直接使用SpringAMQP了。
2.3.2 创建队列
首先进入RabbitMQ控制台创建队列,新建一个队列:simple.queue
添加成功:
接下来,我们就可以利用Java代码收发消息了。
2.3.3 消息发送
首先配置MQ地址,在publisher服务的application.yml中添加配置:
spring: rabbitmq: host: 192.168.101.68 # 你的虚拟机IP port: 5672 # 端口 virtual-host: / # 虚拟主机 username: itheima # 用户名 password: 123321 # 密码
然后在publisher服务的test下编写测试类SpringAmqpTest,并利用RabbitTemplate实现消息发送:
package com.itheima.publisher.amqp; import org.junit.jupiter.api.Test; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; @SpringBootTest @Slf4j public class SpringAmqpTest { @Autowired private RabbitTemplate rabbitTemplate; @Test public void testSimpleQueue() { // 队列名称 String queueName = "simple.queue"; // 消息 String message = "hello, spring amqp!"; // 发送消息 rabbitTemplate.convertAndSend(queueName, message); log.info("消息发送成功:{}",message); } }
打开控制台,可以看到消息已经发送到队列中:
接下来,我们再来实现消息接收。
我们可以在RabbitMQ的控制台去查看消息
2.3.4 消息接收
首先配置MQ地址,在consumer服务的application.yml中添加配置:
spring: rabbitmq: host: 192.168.101.68 # 你的虚拟机IP port: 5672 # 端口 virtual-host: / # 虚拟主机 username: itheima # 用户名 password: 123321 # 密码
然后在consumer服务的com.itheima.consumer.listener包中新建一个类SpringRabbitListener,代码如下:
package com.itheima.consumer.listener; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class SpringRabbitListener { // 利用RabbitListener来声明要监听的队列信息 // 将来一旦监听的队列中有了消息,就会推送给当前服务,调用当前方法,处理消息。 // 可以看到方法体中接收的就是消息体的内容 @RabbitListener(queues = "simple.queue") public void listenSimpleQueueMessage(String msg) throws InterruptedException { System.out.println("spring 消费者接收到消息:【" + msg + "】"); } }
2.3.5 测试
测试流程:
- 启动consumer服务
- 在publisher服务中运行测试代码,发送MQ消息。
- 观察consumer控制台日志收到消息:
2.3.6 推模式与拉模式
在RabbitMQ中,消息传递给消费者的方式有两种:推模式(Push)和拉模式(Pull)。
- 推模式(Push):这是最常用的模式,在这种模式下,一旦消费者订阅了一个队列,RabbitMQ就会自动将队列中的消息发送给消费者,这种方式不需要消费者持续地询问是否有新的消息,而是由Broker在有消息时主动发送给消费者。
- 拉模式(Pull):在这种模式下,消费者需要主动请求Broker来获取消息。
在实践中,推模式更常见,因为它可以减少消费者的网络负载,并且可以让Broker更好地控制消息的传递速率。然而,拉模式也有其应用场景,比如当消费者想要精确控制消息获取的时候。
RabbitMQ默认采用的是推模式,但同时也提供了拉模式的支持,以满足不同的应用场景需求。