RabbitMQ的工作模式

简介: 前言RabbitMQ 是一个流行的开源消息代理,它实现了 AMQP(高级消息队列协议)标准,提供了可靠的消息传递机制。RabbitMQ 支持五种不同的工作模式,包括简单模式、工作队列模式、发布订阅模式、路由模式和主题模式,每种模式都适用于不同的应用场景。在本篇博客中,我们将详细介绍这五种工作模式的原理和使用方法,帮助读者更好地理解 RabbitMQ,并且在实践中选择合适的工作模式来处理不同的消息传递需求。

前言

RabbitMQ 是一个流行的开源消息代理,它实现了 AMQP(高级消息队列协议)标准,提供了可靠的消息传递机制。RabbitMQ 支持五种不同的工作模式,包括简单模式、工作队列模式、发布订阅模式、路由模式和主题模式,每种模式都适用于不同的应用场景。在本篇博客中,我们将详细介绍这五种工作模式的原理和使用方法,帮助读者更好地理解 RabbitMQ,并且在实践中选择合适的工作模式来处理不同的消息传递需求。

1. 工作模式概念

一、simple模式(即最简单的收发模式)  

1. 消息产生消息,将消息放入队列

2. 消息的消费者(consumer) 监听 消息队列,如果队列中有消息,就消费掉,消息被拿走后,自动从队列中删除(隐患 消息可能没有被消费者正确处理,已经从队列中消失了,造成消息的丢失,这里可以设置成手动的ack,但如果设置成手动ack,处理完后要及时发送ack消息给队列,否则会造成内存溢

出)。

720e89c2ede546b2bb4e3759976b4c0f.png

二、work工作模式(资源的竞争)

工作队列模式也称为任务队列模式。在这种模式下,有多个消费者从同一个队列中获取消息并进行处理。消息被平均分配给不同的消费者进行处理,确保每个消息只被处理一次。

1. 消息产生者将消息放入队列消费者可以有多个,消费者1,消费者2同时监听同一个队列,消息被消费。

C1 C2共同争抢当前的消息队列内容,谁先拿到谁负责消费消息(隐患:高并发情况下,默认会产生某

一个消息被多个消费者共同使用,可以设置一个开关(syncronize) 保证一条消息只能被一个消费者

使用)。

db70703ea97b4984917316d2f0667b3a.png

三.publish/subscribe发布订阅(共享资源)

1. 每个消费者监听自己的队列;

2. 生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的

队列都将接收到消息。

c92c9d9e041745c9985878eaf2a25056.png

四.routing路由模式

1. 消息生产者将消息发送给交换机按照路由判断,路由是字符串(info) 当前产生的消息携带路由字符

(对象的方法),交换机根据路由的key,只能匹配上路由key对应的消息队列,对应的消费者才能消费消

息;

2. 根据业务功能定义路由字符串

3. 从系统的代码逻辑中获取对应的功能字符串,将消息任务扔到对应的队列中。

4. 业务场景:error 通知;EXCEPTION;错误通知的功能;传统意义的错误通知;客户通知;利用key路由,可

以将程序中的错误封装成消息传入到消息队列中,开发者可以自定义消费者,实时接收错误

99959d2053f94af892470b64825e0092.png

五.topic 主题模式(路由模式的一种)

2. 场景和示例

RabbitMQ 的简单模式

是最基础的模式,它只有一个生产者将消息发送到一个队列中,一个消费者从队列中接收消息并进行处理。以下是简单模式的基本概念和应用场景:


基本概念:


消息队列:用于存储消息的缓存区,生产者将消息发送到队列,消费者从队列接收消息。


生产者:将消息发送到队列中的应用程序。


消费者:从队列中接收消息并进行处理的应用程序。


应用场景:


简单模式适用于只有一个生产者和一个消费者的场景。


适用于需要异步处理任务的场景,如发送邮件、短信等。

RabbitMQ 的工作队列模式

一种经典的消息队列应用,它将消息发送到队列中,多个消费者从队列中接收消息并进行处理,实现了消息的分发和处理。以下是工作队列模式的基本概念和应用场景:

基本概念:


消息队列:用于存储消息的缓存区,生产者将消息发送到队列,多个消费者从队列接收消息。


生产者:将消息发送到队列中的应用程序。


消费者:从队列中接收消息并进行处理的应用程序。


应用场景:


工作队列模式适用于需要异步处理任务的场景,如数据处理、图片处理等。


适用于需要平衡负载的场景,多个消费者可以共同处理队列中的消息。


RabbitMQ 工作队列模式的示例代码,它实现了一个生产者将消息发送到队列中,多个消费者从队列中接收消息并进行处理,实现了消息的分发和处理。

RabbitMQ 的发布订阅模式

一种广播式的消息队列应用,它将消息发送到交换机中,多个队列绑定到交换机上并接收消息,实现了消息的广播和接收。以下是发布订阅模式的基本概念和应用场景:


基本概念:


交换机:用于接收生产者发送的消息并将消息广播到绑定到它上面的所有队列。


队列:用于存储消息的缓存区,多个消费者从队列接收消息。


生产者:将消息发送到交换机中的应用程序。


消费者:从队列中接收消息并进行处理的应用程序。


应用场景:


发布订阅模式适用于需要广播消息的场景,如新闻推送、实时数据更新等。


适用于需要多个消费者同时接收消息的场景。

RabbitMQ 的路由模式

路由模式是 RabbitMQ 中的一种消息路由模式,它基于消息的 routing key(路由键)来将消息路由到指定的队列中。在路由模式中,生产者将消息发送到交换机中,,交换机根据消息的路由键将消息路由到指定的队列中,消费者从指定的队列中获取消息并进行处理。


在路由模式中,交换机有一个特殊的类型,即 direct(直连)类型。它会将消息发送到指定的队列中,而不会广播到所有队列中。


路由模式适用于需要将消息路由到指定的队列中的场景,比如根据消息的类型、来源、优先级等将消息分发到不同的队列中,让不同的消费者处理不同的消息。

RabbitMQ 的主题模式

根据主题关键词路由消息的队列应用,它将消息发送到交换机中,根据主题关键词路由到绑定到它上面的队列中。以下是主题模式的基本概念和应用场景:

基本概念:

交换机:用于接收生产者发送的消息并将消息路由到绑定到它上面的队列中。

队列:用于存储消息的缓存区,消费者从队列中接收消息并进行处理。

生产者:将消息发送到交换机中的应用程序。

消费者:从队列中接收消息并进行处理的应用程序。

主题:由一个或多个单词组成,用“.”分隔,如“order.create”。

应用场景:


主题模式适用于需要根据主题关键词路由消息的场景,如日志收集、实时监控等。

适用于需要根据不同的主题关键词将消息发送到不同的队列中进行处理的场景。

SpringBoot代码示例

  1. 好的,下面是在上面的代码示例中添加其他两种路由模式的代码。

RabbitMQ 配置类代码

@Configuration
public class RabbitMQConfig {
    // 读取配置文件中的 RabbitMQ 连接信息、交换机和队列的名称等信息
    @Value("${spring.rabbitmq.host}")
    private String host;
    @Value("${spring.rabbitmq.port}")
    private int port;
    @Value("${spring.rabbitmq.username}")
    private String username;
    @Value("${spring.rabbitmq.password}")
    private String password;
    @Value("${spring.rabbitmq.direct.exchange}")
    private String directExchange;
    @Value("${spring.rabbitmq.topic.exchange}")
    private String topicExchange;
    @Value("${spring.rabbitmq.fanout.exchange}")
    private String fanoutExchange;
    @Value("${spring.rabbitmq.headers.exchange}")
    private String headersExchange;
    // 创建 RabbitMQ 连接工厂
    @Bean
    public ConnectionFactory connectionFactory(){
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        return connectionFactory;
    }
    // 创建直连交换机并添加到容器中
    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange(directExchange);
    }
    // 创建主题交换机并添加到容器中
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(topicExchange);
    }
    // 创建扇形交换机并添加到容器中
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange(fanoutExchange);
    }
    // 创建头交换机并添加到容器中
    @Bean
    public HeadersExchange headersExchange() {
        return new HeadersExchange(headersExchange);
    }
}

这部分代码主要是添加了头交换机的配置。

生产者代码

@Component
public class RabbitMQProducer {
    @Autowired
    private AmqpTemplate rabbitTemplate;
    @Value("${spring.rabbitmq.direct.exchange}")
    private String directExchange;
    @Value("${spring.rabbitmq.topic.exchange}")
    private String topicExchange;
    @Value("${spring.rabbitmq.fanout.exchange}")
    private String fanoutExchange;
    @Value("${spring.rabbitmq.headers.exchange}")
    private String headersExchange;
    // 发送直连模式的消息
    public void sendDirectMessage(String routingKey, String message) {
        rabbitTemplate.convertAndSend(directExchange, routingKey, message);
    }
    // 发送主题模式的消息
    public void sendTopicMessage(String routingKey, String message) {
        rabbitTemplate.convertAndSend(topicExchange, routingKey, message);
    }
    // 发送广播模式的消息
    public void sendFanoutMessage(String message) {
        rabbitTemplate.convertAndSend(fanoutExchange, "", message);
    }
    // 发送头模式的消息
    public void sendHeadersMessage(String message) {
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setHeader("type", "headers");
        messageProperties.setHeader("foo", "bar");
        Message messageObj = new Message(message.getBytes(), messageProperties);
        rabbitTemplate.send(headersExchange, "", messageObj);
    }
}

该生产者使用了 RabbitMQ 的 AmqpTemplate 来发送消息,其中 directExchange、topicExchange、fanoutExchange 和 headersExchange 是指定的交换机名称。生产者的 sendDirectMessage、sendTopicMessage、sendFanoutMessage 和 sendHeadersMessage 方法分别用于发送直连、主题、广播和头模式的消息。

消费者代码

@Component
public class RabbitMQConsumer {
    // 监听直连模式的队列
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "${spring.rabbitmq.direct.queue1}"),
            exchange = @Exchange(value = "${spring.rabbitmq.direct.exchange}", type = "direct"),
            key = "direct.queue1"
    ))
    public void receiveDirectMessage1(String message) {
        System.out.println("Direct Consumer1 Received Message: " + message);
    }
    // 监听主题模式的队列
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "${spring.rabbitmq.topic.queue1}"),
            exchange = @Exchange(value = "${spring.rabbitmq.topic.exchange}", type = "topic"),
            key = "topic.queue1"
    ))
    public void receiveTopicMessage1(String message) {
        System.out.println("Topic Consumer1 Received Message: " + message);
    }
    // 监听广播模式的队列
    @RabbitListener(queues = "${spring.rabbitmq.fanout.queue1}")
    public void receiveFanoutMessage1(String message) {
        System.out.println("Fanout Consumer1 Received Message: " + message);
    }
    // 监听头模式的队列
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "${spring.rabbitmq.headers.queue1}", autoDelete = "true"),
            exchange = @Exchange(value = "${spring.rabbitmq.headers.exchange}", type = "headers"),
            arguments = {
                    @Argument(name = "x-match", value = "all"),
                    @Argument(name = "type", value = "headers"),
                    @Argument(name = "foo", value = "bar")
            }
    ))
    public void receiveHeadersMessage1(Message message) {
        System.out.println("Headers Consumer1 Received Message: " + new String(message.getBody()));
    }
}

该消费者使用了 RabbitMQ 的 @RabbitListener 注解来监听队列,并使用 @QueueBinding 和 @Exchange 注解来绑定队列和交换机。消费者的 receiveDirectMessage1、receiveTopicMessage1、receiveFanoutMessage1 和 receiveHeadersMessage1 方法分别监听四种模式的指定队列,接受消息并进行处理。


生产者将消息发送到指定的交换机中,交换机根据消息的路由键将消息路由到指定的队列中,消费者从指定的队列中获取消息并进行处理。由于不同的交换机和队列使用了不同的路由模式,因此每个消费者只会接收到它所监听的队列中的消息。

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
9月前
|
消息中间件 存储 网络协议
RabbitMq概述与工作模式(1)(上)
RabbitMq概述与工作模式(1)
72 0
|
9月前
|
消息中间件 数据库
RabbitMq概述与工作模式(1)(下)
RabbitMq概述与工作模式(1)
67 0
|
10月前
|
消息中间件 Dubbo 应用服务中间件
RabbitMQ 六种工作模式与应用场景
RabbitMQ 六种工作模式与应用场景
292 0
|
12月前
|
消息中间件 数据库
【消息中间件】RabbitMQ的工作模式
【消息中间件】RabbitMQ的工作模式
|
12月前
|
消息中间件 存储 缓存
RabbitMQ:第一章:6 种工作模式以及消息确认机制(理论与代码相结合)
RabbitMQ:第一章:6 种工作模式以及消息确认机制(理论与代码相结合)
|
消息中间件 网络架构
SpringCloudStream学习(二)RabbitMQ中的交换机跟工作模式
SpringCloudStream学习(二)RabbitMQ中的交换机跟工作模式
164 0
SpringCloudStream学习(二)RabbitMQ中的交换机跟工作模式
|
消息中间件
RabbitMQ工作模式5 Topics通配符模式
RabbitMQ工作模式5 Topics通配符模式
183 0
RabbitMQ工作模式5 Topics通配符模式
|
消息中间件 数据库
RabbitMQ工作模式4 Routing路由模式
RabbitMQ工作模式4 Routing路由模式
137 0
RabbitMQ工作模式4 Routing路由模式
|
消息中间件 存储 数据库
RabbitMQ工作模式3 Pub/Sub订阅模式
RabbitMQ工作模式3 Pub/Sub订阅模式
116 0
RabbitMQ工作模式3 Pub/Sub订阅模式
|
消息中间件
RabbitMQ工作模式2 Work queues工作队列模式
RabbitMQ工作模式2 Work queues工作队列模式
98 0
RabbitMQ工作模式2  Work queues工作队列模式