RabbitMQ的工作模式

简介: 前言RabbitMQ 是一个流行的开源消息代理,它实现了 AMQP(高级消息队列协议)标准,提供了可靠的消息传递机制。RabbitMQ 支持五种不同的工作模式,包括简单模式、工作队列模式、发布订阅模式、路由模式和主题模式,每种模式都适用于不同的应用场景。在本篇博客中,我们将详细介绍这五种工作模式的原理和使用方法,帮助读者更好地理解 RabbitMQ,并且在实践中选择合适的工作模式来处理不同的消息传递需求。

前言

RabbitMQ 是一个流行的开源消息代理,它实现了 AMQP(高级消息队列协议)标准,提供了可靠的消息传递机制。RabbitMQ 支持五种不同的工作模式,包括简单模式、工作队列模式、发布订阅模式、路由模式和主题模式,每种模式都适用于不同的应用场景。在本篇博客中,我们将详细介绍这五种工作模式的原理和使用方法,帮助读者更好地理解 RabbitMQ,并且在实践中选择合适的工作模式来处理不同的消息传递需求。

1. 工作模式概念

一、simple模式(即最简单的收发模式)  

1. 消息产生消息,将消息放入队列

2. 消息的消费者(consumer) 监听 消息队列,如果队列中有消息,就消费掉,消息被拿走后,自动从队列中删除(隐患 消息可能没有被消费者正确处理,已经从队列中消失了,造成消息的丢失,这里可以设置成手动的ack,但如果设置成手动ack,处理完后要及时发送ack消息给队列,否则会造成内存溢

出)。

720e89c2ede546b2bb4e3759976b4c0f.png

二、work工作模式(资源的竞争)

工作队列模式也称为任务队列模式。在这种模式下,有多个消费者从同一个队列中获取消息并进行处理。消息被平均分配给不同的消费者进行处理,确保每个消息只被处理一次。

1. 消息产生者将消息放入队列消费者可以有多个,消费者1,消费者2同时监听同一个队列,消息被消费。

C1 C2共同争抢当前的消息队列内容,谁先拿到谁负责消费消息(隐患:高并发情况下,默认会产生某

一个消息被多个消费者共同使用,可以设置一个开关(syncronize) 保证一条消息只能被一个消费者

使用)。

db70703ea97b4984917316d2f0667b3a.png

三.publish/subscribe发布订阅(共享资源)

1. 每个消费者监听自己的队列;

2. 生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的

队列都将接收到消息。

c92c9d9e041745c9985878eaf2a25056.png

四.routing路由模式

1. 消息生产者将消息发送给交换机按照路由判断,路由是字符串(info) 当前产生的消息携带路由字符

(对象的方法),交换机根据路由的key,只能匹配上路由key对应的消息队列,对应的消费者才能消费消

息;

2. 根据业务功能定义路由字符串

3. 从系统的代码逻辑中获取对应的功能字符串,将消息任务扔到对应的队列中。

4. 业务场景:error 通知;EXCEPTION;错误通知的功能;传统意义的错误通知;客户通知;利用key路由,可

以将程序中的错误封装成消息传入到消息队列中,开发者可以自定义消费者,实时接收错误

99959d2053f94af892470b64825e0092.png

五.topic 主题模式(路由模式的一种)

2. 场景和示例

RabbitMQ 的简单模式

是最基础的模式,它只有一个生产者将消息发送到一个队列中,一个消费者从队列中接收消息并进行处理。以下是简单模式的基本概念和应用场景:


基本概念:


消息队列:用于存储消息的缓存区,生产者将消息发送到队列,消费者从队列接收消息。


生产者:将消息发送到队列中的应用程序。


消费者:从队列中接收消息并进行处理的应用程序。


应用场景:


简单模式适用于只有一个生产者和一个消费者的场景。


适用于需要异步处理任务的场景,如发送邮件、短信等。

RabbitMQ 的工作队列模式

一种经典的消息队列应用,它将消息发送到队列中,多个消费者从队列中接收消息并进行处理,实现了消息的分发和处理。以下是工作队列模式的基本概念和应用场景:

基本概念:


消息队列:用于存储消息的缓存区,生产者将消息发送到队列,多个消费者从队列接收消息。


生产者:将消息发送到队列中的应用程序。


消费者:从队列中接收消息并进行处理的应用程序。


应用场景:


工作队列模式适用于需要异步处理任务的场景,如数据处理、图片处理等。


适用于需要平衡负载的场景,多个消费者可以共同处理队列中的消息。


RabbitMQ 工作队列模式的示例代码,它实现了一个生产者将消息发送到队列中,多个消费者从队列中接收消息并进行处理,实现了消息的分发和处理。

RabbitMQ 的发布订阅模式

一种广播式的消息队列应用,它将消息发送到交换机中,多个队列绑定到交换机上并接收消息,实现了消息的广播和接收。以下是发布订阅模式的基本概念和应用场景:


基本概念:


交换机:用于接收生产者发送的消息并将消息广播到绑定到它上面的所有队列。


队列:用于存储消息的缓存区,多个消费者从队列接收消息。


生产者:将消息发送到交换机中的应用程序。


消费者:从队列中接收消息并进行处理的应用程序。


应用场景:


发布订阅模式适用于需要广播消息的场景,如新闻推送、实时数据更新等。


适用于需要多个消费者同时接收消息的场景。

RabbitMQ 的路由模式

路由模式是 RabbitMQ 中的一种消息路由模式,它基于消息的 routing key(路由键)来将消息路由到指定的队列中。在路由模式中,生产者将消息发送到交换机中,,交换机根据消息的路由键将消息路由到指定的队列中,消费者从指定的队列中获取消息并进行处理。


在路由模式中,交换机有一个特殊的类型,即 direct(直连)类型。它会将消息发送到指定的队列中,而不会广播到所有队列中。


路由模式适用于需要将消息路由到指定的队列中的场景,比如根据消息的类型、来源、优先级等将消息分发到不同的队列中,让不同的消费者处理不同的消息。

RabbitMQ 的主题模式

根据主题关键词路由消息的队列应用,它将消息发送到交换机中,根据主题关键词路由到绑定到它上面的队列中。以下是主题模式的基本概念和应用场景:

基本概念:

交换机:用于接收生产者发送的消息并将消息路由到绑定到它上面的队列中。

队列:用于存储消息的缓存区,消费者从队列中接收消息并进行处理。

生产者:将消息发送到交换机中的应用程序。

消费者:从队列中接收消息并进行处理的应用程序。

主题:由一个或多个单词组成,用“.”分隔,如“order.create”。

应用场景:


主题模式适用于需要根据主题关键词路由消息的场景,如日志收集、实时监控等。

适用于需要根据不同的主题关键词将消息发送到不同的队列中进行处理的场景。

SpringBoot代码示例

  1. 好的,下面是在上面的代码示例中添加其他两种路由模式的代码。

RabbitMQ 配置类代码

@Configuration
public class RabbitMQConfig {
    // 读取配置文件中的 RabbitMQ 连接信息、交换机和队列的名称等信息
    @Value("${spring.rabbitmq.host}")
    private String host;
    @Value("${spring.rabbitmq.port}")
    private int port;
    @Value("${spring.rabbitmq.username}")
    private String username;
    @Value("${spring.rabbitmq.password}")
    private String password;
    @Value("${spring.rabbitmq.direct.exchange}")
    private String directExchange;
    @Value("${spring.rabbitmq.topic.exchange}")
    private String topicExchange;
    @Value("${spring.rabbitmq.fanout.exchange}")
    private String fanoutExchange;
    @Value("${spring.rabbitmq.headers.exchange}")
    private String headersExchange;
    // 创建 RabbitMQ 连接工厂
    @Bean
    public ConnectionFactory connectionFactory(){
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        return connectionFactory;
    }
    // 创建直连交换机并添加到容器中
    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange(directExchange);
    }
    // 创建主题交换机并添加到容器中
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(topicExchange);
    }
    // 创建扇形交换机并添加到容器中
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange(fanoutExchange);
    }
    // 创建头交换机并添加到容器中
    @Bean
    public HeadersExchange headersExchange() {
        return new HeadersExchange(headersExchange);
    }
}

这部分代码主要是添加了头交换机的配置。

生产者代码

@Component
public class RabbitMQProducer {
    @Autowired
    private AmqpTemplate rabbitTemplate;
    @Value("${spring.rabbitmq.direct.exchange}")
    private String directExchange;
    @Value("${spring.rabbitmq.topic.exchange}")
    private String topicExchange;
    @Value("${spring.rabbitmq.fanout.exchange}")
    private String fanoutExchange;
    @Value("${spring.rabbitmq.headers.exchange}")
    private String headersExchange;
    // 发送直连模式的消息
    public void sendDirectMessage(String routingKey, String message) {
        rabbitTemplate.convertAndSend(directExchange, routingKey, message);
    }
    // 发送主题模式的消息
    public void sendTopicMessage(String routingKey, String message) {
        rabbitTemplate.convertAndSend(topicExchange, routingKey, message);
    }
    // 发送广播模式的消息
    public void sendFanoutMessage(String message) {
        rabbitTemplate.convertAndSend(fanoutExchange, "", message);
    }
    // 发送头模式的消息
    public void sendHeadersMessage(String message) {
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setHeader("type", "headers");
        messageProperties.setHeader("foo", "bar");
        Message messageObj = new Message(message.getBytes(), messageProperties);
        rabbitTemplate.send(headersExchange, "", messageObj);
    }
}

该生产者使用了 RabbitMQ 的 AmqpTemplate 来发送消息,其中 directExchange、topicExchange、fanoutExchange 和 headersExchange 是指定的交换机名称。生产者的 sendDirectMessage、sendTopicMessage、sendFanoutMessage 和 sendHeadersMessage 方法分别用于发送直连、主题、广播和头模式的消息。

消费者代码

@Component
public class RabbitMQConsumer {
    // 监听直连模式的队列
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "${spring.rabbitmq.direct.queue1}"),
            exchange = @Exchange(value = "${spring.rabbitmq.direct.exchange}", type = "direct"),
            key = "direct.queue1"
    ))
    public void receiveDirectMessage1(String message) {
        System.out.println("Direct Consumer1 Received Message: " + message);
    }
    // 监听主题模式的队列
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "${spring.rabbitmq.topic.queue1}"),
            exchange = @Exchange(value = "${spring.rabbitmq.topic.exchange}", type = "topic"),
            key = "topic.queue1"
    ))
    public void receiveTopicMessage1(String message) {
        System.out.println("Topic Consumer1 Received Message: " + message);
    }
    // 监听广播模式的队列
    @RabbitListener(queues = "${spring.rabbitmq.fanout.queue1}")
    public void receiveFanoutMessage1(String message) {
        System.out.println("Fanout Consumer1 Received Message: " + message);
    }
    // 监听头模式的队列
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "${spring.rabbitmq.headers.queue1}", autoDelete = "true"),
            exchange = @Exchange(value = "${spring.rabbitmq.headers.exchange}", type = "headers"),
            arguments = {
                    @Argument(name = "x-match", value = "all"),
                    @Argument(name = "type", value = "headers"),
                    @Argument(name = "foo", value = "bar")
            }
    ))
    public void receiveHeadersMessage1(Message message) {
        System.out.println("Headers Consumer1 Received Message: " + new String(message.getBody()));
    }
}

该消费者使用了 RabbitMQ 的 @RabbitListener 注解来监听队列,并使用 @QueueBinding 和 @Exchange 注解来绑定队列和交换机。消费者的 receiveDirectMessage1、receiveTopicMessage1、receiveFanoutMessage1 和 receiveHeadersMessage1 方法分别监听四种模式的指定队列,接受消息并进行处理。


生产者将消息发送到指定的交换机中,交换机根据消息的路由键将消息路由到指定的队列中,消费者从指定的队列中获取消息并进行处理。由于不同的交换机和队列使用了不同的路由模式,因此每个消费者只会接收到它所监听的队列中的消息。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
4月前
|
消息中间件
RabbitMQ的 RPC 消息模式你会了吗?
【9月更文挑战第11天】RabbitMQ 的 RPC(远程过程调用)消息模式允许客户端向服务器发送请求并接收响应。其基本原理包括:1) 客户端发送请求,创建回调队列并设置关联标识符;2) 服务器接收请求并发送响应至回调队列;3) 客户端根据关联标识符接收并匹配响应。实现步骤涵盖客户端和服务器的连接、信道创建及请求处理。注意事项包括关联标识符唯一性、回调队列管理、错误处理及性能考虑。RPC 模式适用于构建可靠的分布式应用程序,但需根据需求调整优化。
|
9天前
|
消息中间件 网络协议 RocketMQ
RocketMQ Controller 模式 始终更新成本机ip
ontrollerAddr=192.168.24.241:8878 但是日志输出Update controller leader address to 127.0.0.1:8878。导致访问失败
40 3
|
5月前
|
消息中间件 开发者
【RabbitMQ深度解析】Topic交换器与模式匹配:掌握消息路由的艺术!
【8月更文挑战第24天】在消息队列(MQ)体系中,交换器作为核心组件之一负责消息路由。特别是`topic`类型的交换器,它通过模式匹配实现消息的精准分发,适用于发布-订阅模式。不同于直接交换器和扇形交换器,`topic`交换器支持更复杂的路由策略,通过带有通配符(如 * 和 #)的模式字符串来定义队列与交换器间的绑定关系。
87 2
|
5月前
|
消息中间件
RabbitMQ广播模式
RabbitMQ广播模式
88 1
|
5月前
|
消息中间件 应用服务中间件 网络安全
rabbitMQ镜像模式搭建
rabbitMQ镜像模式搭建
|
6月前
|
消息中间件 传感器 负载均衡
消息队列 MQ使用问题之如何配置一主一从的同步复制模式
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
消息队列 MQ使用问题之如何配置一主一从的同步复制模式
|
6月前
|
消息中间件 存储 Kafka
MetaQ/RocketMQ 原理问题之RocketMQ DLedger融合模式的问题如何解决
MetaQ/RocketMQ 原理问题之RocketMQ DLedger融合模式的问题如何解决
|
5月前
|
消息中间件 Java Maven
RabbitMQ通配符模式
RabbitMQ通配符模式
77 0
|
6月前
|
消息中间件 Java Apache
消息队列 MQ使用问题之如何在内外网环境下使用单组节点单副本模式
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
6月前
|
消息中间件 负载均衡 RocketMQ
MetaQ/RocketMQ 原理问题之在广播模式下,RebalanceService工作的问题如何解决
MetaQ/RocketMQ 原理问题之在广播模式下,RebalanceService工作的问题如何解决