一文看懂Spring Boot整合Rabbit MQ实现多种模式的生产和消费

简介: 一文看懂Spring Boot整合Rabbit MQ实现多种模式的生产和消费

1 RabbitMQ介绍

1.1 各个组件的层级关系

1.2 RabbitMQ的六种模式

2 Spring Boot整合RabbitMQ

2.1 RabbitMQ配置
2.1.1 添加用户

2.1.2 添加虚拟机

2.1.3 分配权限

2.1.4 添加队列

2.2 Spring Boot工程搭建
2.2.1 依赖
<!--   spring boot-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
</dependency>
<!--  rabbitmq  amqp -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--   web -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--    test-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>
2.2.2 配置文件
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: test
    password: 123
    virtual-host: /test
2.3 各个模式测试
2.3.1 HelloWorld

生产者:

@Component
public class Test01 {
    /**
     * 注入rabbitTemplate
     */
    @Autowired
    private RabbitTemplate rabbitTemplate;
    /**
     * Hello world
     */
    public void testHello() {
        /**
         * 参数1: 消息队列的名字
         * 参数2: 发送的消息
         */
        rabbitTemplate.convertAndSend("sp_test", "hello world");
    }
}

消费者:

@Component
public class Consumer {
    /**
     * 一个消费者
     *
     * @param message
     */
    @RabbitListener(queuesToDeclare = @Queue("sp_test"))
    public void receive1(String message) {
        System.out.println("message1 = " + message);
    }
}

测试:

@SpringBootTest
class SpRabbitmqApplicationTests {
    @Autowired
    private Test01 test01;
    @Test
    void producerTest01() {
        test01.testHello();
    }
}

2.3.2 Work Queues

生产者:

@Component
public class Test01 {
    /**
     * 注入rabbitTemplate
     */
    @Autowired
    private RabbitTemplate rabbitTemplate;
    /**
     * Work
     */
    public void testWork() {
        for (int i = 0; i < 10; i++) {
            rabbitTemplate.convertAndSend("sp_test", "work模型" + i);
        }
    }
}

消费者:

@Component
public class Consumer {
    /**
     * 一个消费者
     *
     * @param message
     */
    @RabbitListener(queuesToDeclare = @Queue("sp_test"))
    public void receive1(String message) {
        System.out.println("message1 = " + message);
    }
    /**
     * 一个消费者
     *
     * @param message
     */
    @RabbitListener(queuesToDeclare = @Queue("sp_test"))
    public void receive2(String message) {
        System.out.println("message2 = " + message);
    }
}

测试:

@Component
public class Consumer {
    /**
     * 一个消费者
     *
     * @param message
     */
    @RabbitListener(queuesToDeclare = @Queue("sp_test"))
    public void receive1(String message) {
        System.out.println("message1 = " + message);
    }
    /**
     * 一个消费者
     *
     * @param message
     */
    @RabbitListener(queuesToDeclare = @Queue("sp_test"))
    public void receive2(String message) {
        System.out.println("message2 = " + message);
    }
}

2.3.3 Public

新建路由:

生产者:

@Component
public class Test01 {
    /**
     * 注入rabbitTemplate
     */
    @Autowired
    private RabbitTemplate rabbitTemplate;
    /**
     * Fanout 广播
     */
    public void testFanout() {
        /**
         *  参数1: 交换机名称
         *  参数2: routingKey
         *  参数3: message
         * */
        rabbitTemplate.convertAndSend("test_fanout", "", "Fanout的模型发送的消息");
    }
}

消费者:

@Component
public class Consumer2 {
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue,//创建临时队列
                    exchange = @Exchange(value = "test_fanout", type = "fanout")  //绑定的交换机
            )
    })
    public void receive1(String message) {
        System.out.println("message1 = " + message);
    }
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue,//创建临时队列
                    exchange = @Exchange(value = "test_fanout", type = "fanout")  //绑定的交换机
            )
    })
    public void receive2(String message) {
        System.out.println("message2 = " + message);
    }
}

测试:

@SpringBootTest
class SpRabbitmqApplicationTests {
    @Autowired
    private Test01 test01;
    @Test
    void producerTest01() {
        test01.testFanout();
    }
}

2.3.4 Routing

创建交换机:

生产者:

@Component
public class Test01 {
    /**
     * 注入rabbitTemplate
     */
    @Autowired
    private RabbitTemplate rabbitTemplate;
    /**
     * Route 路由模式
     */
    public void testRoute() {
        /**
         *  参数1: 交换机名称
         *  参数2: 路由key
         *  参数3: 发送的信息
         * */
         rabbitTemplate.convertAndSend("test_routing", "info", "发送info的key的路由信息");
    }
}

消费者:

@Component
public class Consumer3 {
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue,//创建临时队列
                    exchange = @Exchange(value = "test_routing", type = "direct"),//自定交换机名称和类型
                    key = {"info", "error", "warn"}
            )
    })
    public void receive1(String message) {
        System.out.println("message1 = " + message);
    }
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue,
                    exchange = @Exchange(value = "test_routing", type = "direct"),
                    key = {"error"}
            )
    })
    public void receive2(String message) {
        System.out.println("message2 = " + message);
    }
}

测试:

@SpringBootTest
class SpRabbitmqApplicationTests {
    @Autowired
    private Test01 test01;
    @Test
    void producerTest01() {
        test01.testRoute();
    }
}

2.3.5 Topics

添加交换机:

生产者:

@Component
public class Test01 {
    /**
     * 注入rabbitTemplate
     */
    @Autowired
    private RabbitTemplate rabbitTemplate;
    /**
     * Topic 动态路由  订阅模式
     */
    public void testTopic() {
        /**
         *  参数1: 交换机名称
         *  参数2: 路由key
         *  参数3: 发送的消息
         * */
        rabbitTemplate.convertAndSend("test_topic", "ymx.name.Mr_YanMingXin", "ymx路由消息");
    }
}

消费者:

@Component
public class Consumer4 {
    /**
     * "*"代表一个单词,"#"代表0到多个单词
     *
     * @param message
     */
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue,
                    exchange = @Exchange(type = "topic", name = "test_topic"),
                    key = {"ymx.name", "ymx.name.*"}
            )
    })
    public void receive1(String message) {
        System.out.println("message1 = " + message);
    }
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue,
                    exchange = @Exchange(type = "topic", name = "test_topic"),
                    key = {"ymx.#", "ymx.name.#"}
            )
    })
    public void receive2(String message) {
        System.out.println("message2 = " + message);
    }
}

测试:

@SpringBootTest
class SpRabbitmqApplicationTests {
    @Autowired
    private Test01 test01;
    @Test
    void producerTest01() {
        test01.testTopic();
    }
}

3 小总结下

  • 消费者是不需要手动去调用的,只需要写上一个带参数的方法和打上@RabbitListener注解,在项目运行时就可以自己进行消费了
  • public中之所以没有指定routingKey是因为这种模式下会向全部的消费者发送消息,写上也没什么意义
  • @RabbitListener可以写在方法上也可以写在类上,写在类上时,方法上就要打上@RabbitHandler注解
  • 还有其他需要注意的欢迎交流~


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
4月前
|
消息中间件
RabbitMQ的 RPC 消息模式你会了吗?
【9月更文挑战第11天】RabbitMQ 的 RPC(远程过程调用)消息模式允许客户端向服务器发送请求并接收响应。其基本原理包括:1) 客户端发送请求,创建回调队列并设置关联标识符;2) 服务器接收请求并发送响应至回调队列;3) 客户端根据关联标识符接收并匹配响应。实现步骤涵盖客户端和服务器的连接、信道创建及请求处理。注意事项包括关联标识符唯一性、回调队列管理、错误处理及性能考虑。RPC 模式适用于构建可靠的分布式应用程序,但需根据需求调整优化。
|
9天前
|
消息中间件 网络协议 RocketMQ
RocketMQ Controller 模式 始终更新成本机ip
ontrollerAddr=192.168.24.241:8878 但是日志输出Update controller leader address to 127.0.0.1:8878。导致访问失败
40 3
|
5月前
|
消息中间件 Java 网络架构
|
1月前
|
消息中间件 监控 Java
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
36 6
|
5月前
|
消息中间件 Java 测试技术
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
这篇文章是关于如何在SpringBoot应用中整合RabbitMQ的消息中间件。内容包括了在SpringBoot项目中添加RabbitMQ的依赖、配置文件设置、启动类注解,以及如何通过单元测试来创建交换器、队列、绑定,并发送和接收消息。文章还介绍了如何配置消息转换器以支持对象的序列化和反序列化,以及如何使用注解`@RabbitListener`来接收消息。
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
|
5月前
|
消息中间件 开发者
【RabbitMQ深度解析】Topic交换器与模式匹配:掌握消息路由的艺术!
【8月更文挑战第24天】在消息队列(MQ)体系中,交换器作为核心组件之一负责消息路由。特别是`topic`类型的交换器,它通过模式匹配实现消息的精准分发,适用于发布-订阅模式。不同于直接交换器和扇形交换器,`topic`交换器支持更复杂的路由策略,通过带有通配符(如 * 和 #)的模式字符串来定义队列与交换器间的绑定关系。
87 2
|
5月前
|
网络协议 Java 物联网
MQTT(EMQX) - SpringBoot 整合MQTT 连接池 Demo - 附源代码 + 在线客服聊天架构图
MQTT(EMQX) - SpringBoot 整合MQTT 连接池 Demo - 附源代码 + 在线客服聊天架构图
1044 3
|
5月前
|
消息中间件 Java Maven
|
5月前
|
消息中间件
RabbitMQ广播模式
RabbitMQ广播模式
88 1
|
5月前
|
消息中间件 应用服务中间件 网络安全
rabbitMQ镜像模式搭建
rabbitMQ镜像模式搭建