SpringBoot RabbitMq 六大模式

简介: 依赖、配置依赖:<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId></dependency>

依赖、配置

依赖:

<dependency>

  <groupId>org.springframework.boot</groupId>

  <artifactId>spring-boot-starter-amqp</artifactId>

</dependency>


配置:


spring:

 rabbitmq:

   host: 192.168.31.10

   port: 5672 #通过控制台可以查看

   username: guest

   password: guest

   virtual-host: /vhost_sys_logs #可以不配置,会使用的是默认virtual-host

简单队列

模型

8a6426db2b60452fbf78504d69dccf5e.png

简单队列,consumer和producer通过队列直连。

代码示例

配置类:

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
    @Bean
    public Queue queue() {
        //name,名字;durable,是否开启持久化
        return new Queue("logs",false);
    }
}

生产者:

import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest(classes=PrivilegeSystemMain.class)
public class RabbitMQTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void simpleTest() {
        rabbitTemplate.convertAndSend("logs","helo world!");
    }
}

消费者:

@Component
@Slf4j
public class ConsumeBean {
    @RabbitListener(queues={"logs"})
    public void getMsg(String message){
        log.info(message);
    }
}

工作队列

模型a7caab6b0ec84d77a6fc89868204584c.png

工作队列(work queue),让多个消费者去消费同一个消息队列中的消息,支持轮询分发(默认)、公平分发两种分发模式。

代码示例

配置类:

@Configuration
public class RabbitMQConfig {
    @Bean
    public Queue queue() {
        //name,名字;durable,是否开启持久化
        return new Queue("logs",false);
    }
}


生产者:

@SpringBootTest(classes=PrivilegeSystemMain.class)
public class RabbitMQTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void simpleTest() {
        rabbitTemplate.convertAndSend("logs","helo world 01!");
        rabbitTemplate.convertAndSend("logs","helo world 02!");
    }
}

消费者:

@Component
@Slf4j
public class ConsumeBean {
    @RabbitListener(queues={"logs"})
    public void consumer_01(String message){
        log.info("consumer_01 get message "+message);
    }
    @RabbitListener(queues={"logs"})
    public void consumer_02(String message){
        log.info("consumer_02 get message "+message);
    }
}

订阅模式

模型

9786507cef9c4beeaf8ed021da2558c9.png

订阅模式(fanout),也叫广播模式,见名知意,其特点是将消息广播出去。通过交换机将生产者生产的消息分发到多个队列中去,从而支持生产者生产的一个消息被多个消费者消费。

代码示例

配置类:

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
    @Bean
    public Queue queue_01() {
        //name,名字;durable,是否开启持久化
        return new Queue("queue_01",false);
    }
    @Bean
    public Queue queue_02(){
        return new Queue("queue_02",false);
    }
    //订阅模式的交换机
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("fanout_exchange",false,false);
    }
    //将队列绑定到交换机上
    @Bean
    public Binding bindingSmsQueue_01(@Qualifier("queue_01") Queue logsAccess, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(logsAccess).to(fanoutExchange);
    }
    @Bean
    public Binding bindingSmsQueue_02(@Qualifier("queue_02") Queue logsError, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(logsError).to(fanoutExchange);
    }
}

生产者:

@SpringBootTest(classes=PrivilegeSystemMain.class)
public class RabbitMQTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void simpleTest() {
        rabbitTemplate.convertAndSend("fanout_exchange","","helo world 01!");
    }
}

消费者:

@Component
@Slf4j
public class ConsumeBean {
    @RabbitListener(queues={"queue_01"})
    public void consumer_01(String message){
        log.info("consumer_01 get message "+message);
    }
    @RabbitListener(queues={"queue_02"})
    public void consumer_02(String message){
        log.info("consumer_02 get message "+message);
    }
}

路由模式

模型

9b9825acb8b04fe98b03096752b91e93.png

路由模式(direct),在订阅模式支持一条消息被多个消费者消费的特性上增加了分类投递的特性,通过交换机,支持消息以类别(routing key)的方式投送到不同的消息队列中去。

代码示例

配置类:

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
    @Bean
    public Queue queue_01() {
        //durable,是否开启持久化
        return new Queue("queue_01",false);
    }
    @Bean
    public Queue queue_02(){
        return new Queue("queue_02",false);
    }
    //路由模式的交换机
    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange("direct_exchange",false,false);
    }
    //将队列绑定到交换机上
    @Bean
    public Binding bindingSmsQueue_01(@Qualifier("queue_01") Queue logsAccess, DirectExchange directExchange) {
        return BindingBuilder.bind(logsAccess).to(directExchange).with("routing_key_01");
    }
    @Bean
    public Binding bindingSmsQueue_02(@Qualifier("queue_02") Queue logsError, DirectExchange directExchange) {
        return BindingBuilder.bind(logsError).to(directExchange).with("routing_key_02");
    }
}

生产者:

@SpringBootTest(classes=PrivilegeSystemMain.class)
public class RabbitMQTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void simpleTest() {
        rabbitTemplate.convertAndSend("direct_exchange","routing_key_01","helo world!");
    }
}

消费者:

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class ConsumeBean {
    @RabbitListener(queues={"queue_01"})
    public void consumer_01(String message){
        log.info("consumer_01 get message "+message);
    }
    @RabbitListener(queues={"queue_02"})
    public void consumer_02(String message){
        log.info("consumer_02 get message "+message);
    }
}

主题模式

模型

7064eba2b47e40b09b7a29cec6f6e09d.png

主题模式,也叫通配符模式,在路由模式以类别进行消息投送的基础上增加了对通配符的支持,这样就可以使用通配符将多个类别聚合成一个主题。

代码示例

配置类:

@Configuration
public class RabbitMQConfig {
    @Bean
    public Queue queue_01() {
        //durable,是否开启持久化
        return new Queue("queue_01",false);
    }
    @Bean
    public Queue queue_02(){
        return new Queue("queue_02",false);
    }
    //主题模式的交换机
    @Bean
    public TopicExchange topicExchange(){
        return new TopicExchange("topic_exchange",false,false);
    }
    //将队列绑定到交换机上
    @Bean
    public Binding bindingSmsQueue_01(@Qualifier("queue_01") Queue logsAccess, TopicExchange topicExchange) {
        return BindingBuilder.bind(logsAccess).to(topicExchange).with("#.error.#");
    }
    @Bean
    public Binding bindingSmsQueue_02(@Qualifier("queue_02") Queue logsError, TopicExchange topicExchange) {
        return BindingBuilder.bind(logsError).to(topicExchange).with("#.info.#");
    }
}

生产者:

@SpringBootTest(classes=PrivilegeSystemMain.class)
public class RabbitMQTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void simpleTest() {
        rabbitTemplate.convertAndSend("topic_exchange","test.error.test","helo world!");
    }
}

消费者:

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class ConsumeBean {
    @RabbitListener(queues={"queue_01"})
    public void consumer_01(String message){
        log.info("consumer_01 get message "+message);
    }
    @RabbitListener(queues={"queue_02"})
    public void consumer_02(String message){
        log.info("consumer_02 get message "+message);
    }
}

RPC

几乎不会使用,略......

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
14天前
|
消息中间件
RabbitMQ的 RPC 消息模式你会了吗?
【9月更文挑战第11天】RabbitMQ 的 RPC(远程过程调用)消息模式允许客户端向服务器发送请求并接收响应。其基本原理包括:1) 客户端发送请求,创建回调队列并设置关联标识符;2) 服务器接收请求并发送响应至回调队列;3) 客户端根据关联标识符接收并匹配响应。实现步骤涵盖客户端和服务器的连接、信道创建及请求处理。注意事项包括关联标识符唯一性、回调队列管理、错误处理及性能考虑。RPC 模式适用于构建可靠的分布式应用程序,但需根据需求调整优化。
|
1月前
|
消息中间件 Java 网络架构
|
1月前
|
消息中间件 Java 测试技术
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
这篇文章是关于如何在SpringBoot应用中整合RabbitMQ的消息中间件。内容包括了在SpringBoot项目中添加RabbitMQ的依赖、配置文件设置、启动类注解,以及如何通过单元测试来创建交换器、队列、绑定,并发送和接收消息。文章还介绍了如何配置消息转换器以支持对象的序列化和反序列化,以及如何使用注解`@RabbitListener`来接收消息。
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
|
1月前
|
消息中间件 开发者
【RabbitMQ深度解析】Topic交换器与模式匹配:掌握消息路由的艺术!
【8月更文挑战第24天】在消息队列(MQ)体系中,交换器作为核心组件之一负责消息路由。特别是`topic`类型的交换器,它通过模式匹配实现消息的精准分发,适用于发布-订阅模式。不同于直接交换器和扇形交换器,`topic`交换器支持更复杂的路由策略,通过带有通配符(如 * 和 #)的模式字符串来定义队列与交换器间的绑定关系。
43 2
|
1月前
|
消息中间件
RabbitMQ广播模式
RabbitMQ广播模式
40 1
|
1月前
|
网络协议 Java 物联网
MQTT(EMQX) - SpringBoot 整合MQTT 连接池 Demo - 附源代码 + 在线客服聊天架构图
MQTT(EMQX) - SpringBoot 整合MQTT 连接池 Demo - 附源代码 + 在线客服聊天架构图
291 2
|
1月前
|
消息中间件 Java Maven
|
29天前
|
消息中间件 应用服务中间件 网络安全
rabbitMQ镜像模式搭建
rabbitMQ镜像模式搭建
|
2月前
|
消息中间件 传感器 负载均衡
消息队列 MQ使用问题之如何配置一主一从的同步复制模式
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
消息队列 MQ使用问题之如何配置一主一从的同步复制模式
|
1月前
|
消息中间件 Java Maven
RabbitMQ通配符模式
RabbitMQ通配符模式
39 0