MQ收发消息

简介: 本文介绍如何使用SpringAMQP实现RabbitMQ消息收发。RabbitMQ基于AMQP协议,支持跨语言通信。通过SpringBoot整合SpringAMQP,可快速实现消息发送与接收。文中演示了创建队列、配置生产者与消费者、使用RabbitTemplate发送消息及@RabbitListener监听消息的完整流程,并简要对比了推模式与拉模式的应用场景。

搭建环境

RabbitMQ安装成功后下边我们编写消息发送与消息接收程序实现收发消息,如下图:publisher即消息发送者将消息发送到MQ的队列中,consumer即消息消费者从MQ中接收消息。

RabbitMQ通信采用了AMQP (Advanced Message Queuing Protocol) 协议,因此它具备跨语言的特性,任何语言只要遵循AMQP协议都可以使用RabbitMQ收发消息。

RabbitMQ官方也提供了各种不同语言的客户端API,RabbitMQ官方提供的Java客户端编码相对复杂,一般生产环境下我们更多会结合Spring来使用。而Spring的官方刚好基于RabbitMQ提供了这样一套消息收发的模板工具:SpringAMQP。并且还基于SpringBoot对其实现了自动装配,使用起来非常方便。

SpringAmqp的官方地址:https://spring.io/projects/spring-amqp

SpringAMQP提供了三个功能:

  • 自动声明队列、交换机及其绑定关系
  • 基于注解的监听器模式,异步接收消息
  • 封装了RabbitTemplate工具,用于发送消息

下边使用SpringAMQP实现消息收发,上图是RabbitMQ最简单的工作模型,我们仅作测试使用,这种模式一般很少在生产中使用。

在课前资料给大家提供了一个Demo工程,方便我们学习SpringAMQP的使用:

将其复制到你的工作空间,然后用Idea打开,项目结构如图:

包括三部分:

  • mq-demo:父工程,管理项目依赖
  • publisher:消息的发送者
  • consumer:消息的消费者

在mq-demo这个父工程中,已经配置好了SpringAMQP相关的依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>cn.itcast.demo</groupId>
  <artifactId>mq-demo</artifactId>
  <version>1.0-SNAPSHOT</version>
  <modules>
    <module>publisher</module>
    <module>consumer</module>
  </modules>
  <packaging>pom</packaging>
  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.7.12</version>
    <relativePath/>
  </parent>
  <properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
    </dependency>
    <!--AMQP依赖,包含RabbitMQ-->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <!--单元测试-->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
    </dependency>
  </dependencies>
</project>

设置java版本

因此,子工程中就可以直接使用SpringAMQP了。

2.3.2 创建队列

首先进入RabbitMQ控制台创建队列,新建一个队列:simple.queue

添加成功:

接下来,我们就可以利用Java代码收发消息了。

2.3.3 消息发送

首先配置MQ地址,在publisher服务的application.yml中添加配置:

spring:
  rabbitmq:
    host: 192.168.101.68 # 你的虚拟机IP
    port: 5672 # 端口
    virtual-host: / # 虚拟主机
    username: itheima # 用户名
    password: 123321 # 密码

然后在publisher服务的test下编写测试类SpringAmqpTest,并利用RabbitTemplate实现消息发送:

package com.itheima.publisher.amqp;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
@Slf4j
public class SpringAmqpTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void testSimpleQueue() {
        // 队列名称
        String queueName = "simple.queue";
        // 消息
        String message = "hello, spring amqp!";
        // 发送消息
        rabbitTemplate.convertAndSend(queueName, message);
        log.info("消息发送成功:{}",message);
    }
}

打开控制台,可以看到消息已经发送到队列中:

接下来,我们再来实现消息接收。

我们可以在RabbitMQ的控制台去查看消息

2.3.4 消息接收

首先配置MQ地址,在consumer服务的application.yml中添加配置:

spring:
  rabbitmq:
    host: 192.168.101.68 # 你的虚拟机IP
    port: 5672 # 端口
    virtual-host: / # 虚拟主机
    username: itheima # 用户名
    password: 123321 # 密码

然后在consumer服务的com.itheima.consumer.listener包中新建一个类SpringRabbitListener,代码如下:

package com.itheima.consumer.listener;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class SpringRabbitListener {
    // 利用RabbitListener来声明要监听的队列信息
    // 将来一旦监听的队列中有了消息,就会推送给当前服务,调用当前方法,处理消息。
    // 可以看到方法体中接收的就是消息体的内容
    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueueMessage(String msg) throws InterruptedException {
        System.out.println("spring 消费者接收到消息:【" + msg + "】");
    }
}

2.3.5 测试

测试流程:

  • 启动consumer服务
  • 在publisher服务中运行测试代码,发送MQ消息。
  • 观察consumer控制台日志收到消息:

2.3.6 推模式与拉模式

在RabbitMQ中,消息传递给消费者的方式有两种:推模式(Push)和拉模式(Pull)。

  1. 推模式(Push):这是最常用的模式,在这种模式下,一旦消费者订阅了一个队列,RabbitMQ就会自动将队列中的消息发送给消费者,这种方式不需要消费者持续地询问是否有新的消息,而是由Broker在有消息时主动发送给消费者。
  2. 拉模式(Pull):在这种模式下,消费者需要主动请求Broker来获取消息。

在实践中,推模式更常见,因为它可以减少消费者的网络负载,并且可以让Broker更好地控制消息的传递速率。然而,拉模式也有其应用场景,比如当消费者想要精确控制消息获取的时候。

RabbitMQ默认采用的是推模式,但同时也提供了拉模式的支持,以满足不同的应用场景需求。

相关文章
|
3月前
|
数据采集 编解码 自动驾驶
世界模型 LingBot-World,正式开源!
蚂蚁灵波团队开源世界模型LingBot-World,专为交互式仿真设计。其核心LingBot-World-Base具备高保真、强动态、长时序一致性(支持近10分钟稳定生成)和实时交互能力(≈16FPS,延迟<1秒),依托可扩展数据引擎,从游戏环境学习物理与因果规律,打造具身智能、自动驾驶等领域的“数字演练场”。
997 1
|
4月前
|
JSON NoSQL 关系型数据库
【技术选型】MongoDB vs MySQL:一场没有输家的“双雄对决”
本文深入对比MySQL与MongoDB的核心差异,从理念、性能到实战场景。MySQL严谨规范,适合高一致性业务;MongoDB灵活高效,契合多变需求。通过电商案例解析,揭示两者互补而非替代的关系,帮助开发者按场景选型,实现技术价值最大化。
|
NoSQL 关系型数据库 MySQL
什么时候使用MongoDB而不是MySql
MongoDB与MySQL对比:MongoDB适合非结构化数据、高并发读写、地理空间数据处理、实时分析和嵌入式应用,因其面向文档、高扩展性和地理空间索引功能。而MySQL在结构化数据、事务处理和严格一致性场景下更具优势。选择取决于具体需求。
1046 7
|
4月前
|
应用服务中间件 微服务
微服务雪崩问题
高并发下商品服务占用过多Tomcat连接,可能导致接口延迟或阻塞,进而影响依赖它的购物车服务,引发连锁反应。若不加控制,将导致整个微服务集群雪崩。微服务保护旨在防止此类级联失败,保障系统稳定。
|
4月前
|
消息中间件 存储 Java
消息中间件RabbitMQ(高级)
本文深入探讨RabbitMQ在生产环境中的高级应用,涵盖消息可靠性、延迟消息、消息堆积及集群高可用等核心问题。通过生产者确认、持久化、消费者确认机制确保消息不丢失;利用TTL与死信交换机实现延迟队列;借助惰性队列提升堆积能力;最后通过普通集群、镜像集群及仲裁队列实现高可用架构。
 消息中间件RabbitMQ(高级)
|
3月前
|
前端开发 Java Nacos
application.yml和bootstrap.yml这两个配置文件有什么区别?
`bootstrap.yml` 与 `application.yml` 是 Spring Boot/Cloud 项目中的两类配置文件。前者用于应用启动前加载,主要配置远程配置中心(如 Nacos)、加密等关键信息,优先级高;后者是默认主配置,用于常规配置如端口、数据库等。自 Spring Boot 2.4+ 起,默认不再启用 `bootstrap` 机制,推荐使用 `spring.config.import` 在 `application.yml` 中统一导入配置,以简化流程、降低复杂度。纯 Spring Boot 应用不加载 `bootstrap.yml`。
556 0
|
4月前
|
缓存 安全
String,StringBuilder 和 StringBuffer 的区别
String不可变,StringBuilder与StringBuffer可变;后者线程不安全,StringBuffer线程安全。大量拼接时优先选用后两者,多线程用StringBuffer,单线程用StringBuilder。String因final设计保证不可变,利于安全与缓存。
|
11月前
|
存储 缓存 NoSQL
Redis中的常用命令-get&set&keys&exists&expire&ttl&type的详细解析
总的来说,这些Redis命令提供了处理存储在内存中的键值对的便捷方式。通过理解和运用它们,你可以更有效地在Redis中操作数据,使其更好地服务于你的应用。
566 17
|
Docker 容器
docker --cpus 详解
docker --cpus 详解
|
SQL 分布式计算 安全
jps查看进程出现「xxxx -- process information unavailable」
jps查看进程出现「xxxx -- process information unavailable」
2282 0
jps查看进程出现「xxxx -- process information unavailable」

热门文章

最新文章