前言
RabbitMQ 是一个流行的开源消息代理,它实现了 AMQP(高级消息队列协议)标准,提供了可靠的消息传递机制。RabbitMQ 支持五种不同的工作模式,包括简单模式、工作队列模式、发布订阅模式、路由模式和主题模式,每种模式都适用于不同的应用场景。在本篇博客中,我们将详细介绍这五种工作模式的原理和使用方法,帮助读者更好地理解 RabbitMQ,并且在实践中选择合适的工作模式来处理不同的消息传递需求。
1. 工作模式概念
一、simple模式(即最简单的收发模式)
1. 消息产生消息,将消息放入队列
2. 消息的消费者(consumer) 监听 消息队列,如果队列中有消息,就消费掉,消息被拿走后,自动从队列中删除(隐患 消息可能没有被消费者正确处理,已经从队列中消失了,造成消息的丢失,这里可以设置成手动的ack,但如果设置成手动ack,处理完后要及时发送ack消息给队列,否则会造成内存溢
出)。
二、work工作模式(资源的竞争)
工作队列模式也称为任务队列模式。在这种模式下,有多个消费者从同一个队列中获取消息并进行处理。消息被平均分配给不同的消费者进行处理,确保每个消息只被处理一次。
1. 消息产生者将消息放入队列消费者可以有多个,消费者1,消费者2同时监听同一个队列,消息被消费。
C1 C2共同争抢当前的消息队列内容,谁先拿到谁负责消费消息(隐患:高并发情况下,默认会产生某
一个消息被多个消费者共同使用,可以设置一个开关(syncronize) 保证一条消息只能被一个消费者
使用)。
三.publish/subscribe发布订阅(共享资源)
1. 每个消费者监听自己的队列;
2. 生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的
队列都将接收到消息。
四.routing路由模式
1. 消息生产者将消息发送给交换机按照路由判断,路由是字符串(info) 当前产生的消息携带路由字符
(对象的方法),交换机根据路由的key,只能匹配上路由key对应的消息队列,对应的消费者才能消费消
息;
2. 根据业务功能定义路由字符串
3. 从系统的代码逻辑中获取对应的功能字符串,将消息任务扔到对应的队列中。
4. 业务场景:error 通知;EXCEPTION;错误通知的功能;传统意义的错误通知;客户通知;利用key路由,可
以将程序中的错误封装成消息传入到消息队列中,开发者可以自定义消费者,实时接收错误
五.topic 主题模式(路由模式的一种)
2. 场景和示例
RabbitMQ 的简单模式
是最基础的模式,它只有一个生产者将消息发送到一个队列中,一个消费者从队列中接收消息并进行处理。以下是简单模式的基本概念和应用场景:
基本概念:
消息队列:用于存储消息的缓存区,生产者将消息发送到队列,消费者从队列接收消息。
生产者:将消息发送到队列中的应用程序。
消费者:从队列中接收消息并进行处理的应用程序。
应用场景:
简单模式适用于只有一个生产者和一个消费者的场景。
适用于需要异步处理任务的场景,如发送邮件、短信等。
RabbitMQ 的工作队列模式
一种经典的消息队列应用,它将消息发送到队列中,多个消费者从队列中接收消息并进行处理,实现了消息的分发和处理。以下是工作队列模式的基本概念和应用场景:
基本概念:
消息队列:用于存储消息的缓存区,生产者将消息发送到队列,多个消费者从队列接收消息。
生产者:将消息发送到队列中的应用程序。
消费者:从队列中接收消息并进行处理的应用程序。
应用场景:
工作队列模式适用于需要异步处理任务的场景,如数据处理、图片处理等。
适用于需要平衡负载的场景,多个消费者可以共同处理队列中的消息。
RabbitMQ 工作队列模式的示例代码,它实现了一个生产者将消息发送到队列中,多个消费者从队列中接收消息并进行处理,实现了消息的分发和处理。
RabbitMQ 的发布订阅模式
一种广播式的消息队列应用,它将消息发送到交换机中,多个队列绑定到交换机上并接收消息,实现了消息的广播和接收。以下是发布订阅模式的基本概念和应用场景:
基本概念:
交换机:用于接收生产者发送的消息并将消息广播到绑定到它上面的所有队列。
队列:用于存储消息的缓存区,多个消费者从队列接收消息。
生产者:将消息发送到交换机中的应用程序。
消费者:从队列中接收消息并进行处理的应用程序。
应用场景:
发布订阅模式适用于需要广播消息的场景,如新闻推送、实时数据更新等。
适用于需要多个消费者同时接收消息的场景。
RabbitMQ 的路由模式
路由模式是 RabbitMQ 中的一种消息路由模式,它基于消息的 routing key(路由键)来将消息路由到指定的队列中。在路由模式中,生产者将消息发送到交换机中,,交换机根据消息的路由键将消息路由到指定的队列中,消费者从指定的队列中获取消息并进行处理。
在路由模式中,交换机有一个特殊的类型,即 direct(直连)类型。它会将消息发送到指定的队列中,而不会广播到所有队列中。
路由模式适用于需要将消息路由到指定的队列中的场景,比如根据消息的类型、来源、优先级等将消息分发到不同的队列中,让不同的消费者处理不同的消息。
RabbitMQ 的主题模式
根据主题关键词路由消息的队列应用,它将消息发送到交换机中,根据主题关键词路由到绑定到它上面的队列中。以下是主题模式的基本概念和应用场景:
基本概念:
交换机:用于接收生产者发送的消息并将消息路由到绑定到它上面的队列中。
队列:用于存储消息的缓存区,消费者从队列中接收消息并进行处理。
生产者:将消息发送到交换机中的应用程序。
消费者:从队列中接收消息并进行处理的应用程序。
主题:由一个或多个单词组成,用“.”分隔,如“order.create”。
应用场景:
主题模式适用于需要根据主题关键词路由消息的场景,如日志收集、实时监控等。
适用于需要根据不同的主题关键词将消息发送到不同的队列中进行处理的场景。
SpringBoot代码示例
- 好的,下面是在上面的代码示例中添加其他两种路由模式的代码。
RabbitMQ 配置类代码
@Configuration public class RabbitMQConfig { // 读取配置文件中的 RabbitMQ 连接信息、交换机和队列的名称等信息 @Value("${spring.rabbitmq.host}") private String host; @Value("${spring.rabbitmq.port}") private int port; @Value("${spring.rabbitmq.username}") private String username; @Value("${spring.rabbitmq.password}") private String password; @Value("${spring.rabbitmq.direct.exchange}") private String directExchange; @Value("${spring.rabbitmq.topic.exchange}") private String topicExchange; @Value("${spring.rabbitmq.fanout.exchange}") private String fanoutExchange; @Value("${spring.rabbitmq.headers.exchange}") private String headersExchange; // 创建 RabbitMQ 连接工厂 @Bean public ConnectionFactory connectionFactory(){ CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setHost(host); connectionFactory.setPort(port); connectionFactory.setUsername(username); connectionFactory.setPassword(password); return connectionFactory; } // 创建直连交换机并添加到容器中 @Bean public DirectExchange directExchange() { return new DirectExchange(directExchange); } // 创建主题交换机并添加到容器中 @Bean public TopicExchange topicExchange() { return new TopicExchange(topicExchange); } // 创建扇形交换机并添加到容器中 @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange(fanoutExchange); } // 创建头交换机并添加到容器中 @Bean public HeadersExchange headersExchange() { return new HeadersExchange(headersExchange); } }
这部分代码主要是添加了头交换机的配置。
生产者代码
@Component public class RabbitMQProducer { @Autowired private AmqpTemplate rabbitTemplate; @Value("${spring.rabbitmq.direct.exchange}") private String directExchange; @Value("${spring.rabbitmq.topic.exchange}") private String topicExchange; @Value("${spring.rabbitmq.fanout.exchange}") private String fanoutExchange; @Value("${spring.rabbitmq.headers.exchange}") private String headersExchange; // 发送直连模式的消息 public void sendDirectMessage(String routingKey, String message) { rabbitTemplate.convertAndSend(directExchange, routingKey, message); } // 发送主题模式的消息 public void sendTopicMessage(String routingKey, String message) { rabbitTemplate.convertAndSend(topicExchange, routingKey, message); } // 发送广播模式的消息 public void sendFanoutMessage(String message) { rabbitTemplate.convertAndSend(fanoutExchange, "", message); } // 发送头模式的消息 public void sendHeadersMessage(String message) { MessageProperties messageProperties = new MessageProperties(); messageProperties.setHeader("type", "headers"); messageProperties.setHeader("foo", "bar"); Message messageObj = new Message(message.getBytes(), messageProperties); rabbitTemplate.send(headersExchange, "", messageObj); } }
该生产者使用了 RabbitMQ 的 AmqpTemplate 来发送消息,其中 directExchange、topicExchange、fanoutExchange 和 headersExchange 是指定的交换机名称。生产者的 sendDirectMessage、sendTopicMessage、sendFanoutMessage 和 sendHeadersMessage 方法分别用于发送直连、主题、广播和头模式的消息。
消费者代码
@Component public class RabbitMQConsumer { // 监听直连模式的队列 @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "${spring.rabbitmq.direct.queue1}"), exchange = @Exchange(value = "${spring.rabbitmq.direct.exchange}", type = "direct"), key = "direct.queue1" )) public void receiveDirectMessage1(String message) { System.out.println("Direct Consumer1 Received Message: " + message); } // 监听主题模式的队列 @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "${spring.rabbitmq.topic.queue1}"), exchange = @Exchange(value = "${spring.rabbitmq.topic.exchange}", type = "topic"), key = "topic.queue1" )) public void receiveTopicMessage1(String message) { System.out.println("Topic Consumer1 Received Message: " + message); } // 监听广播模式的队列 @RabbitListener(queues = "${spring.rabbitmq.fanout.queue1}") public void receiveFanoutMessage1(String message) { System.out.println("Fanout Consumer1 Received Message: " + message); } // 监听头模式的队列 @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "${spring.rabbitmq.headers.queue1}", autoDelete = "true"), exchange = @Exchange(value = "${spring.rabbitmq.headers.exchange}", type = "headers"), arguments = { @Argument(name = "x-match", value = "all"), @Argument(name = "type", value = "headers"), @Argument(name = "foo", value = "bar") } )) public void receiveHeadersMessage1(Message message) { System.out.println("Headers Consumer1 Received Message: " + new String(message.getBody())); } }
该消费者使用了 RabbitMQ 的 @RabbitListener 注解来监听队列,并使用 @QueueBinding 和 @Exchange 注解来绑定队列和交换机。消费者的 receiveDirectMessage1、receiveTopicMessage1、receiveFanoutMessage1 和 receiveHeadersMessage1 方法分别监听四种模式的指定队列,接受消息并进行处理。
生产者将消息发送到指定的交换机中,交换机根据消息的路由键将消息路由到指定的队列中,消费者从指定的队列中获取消息并进行处理。由于不同的交换机和队列使用了不同的路由模式,因此每个消费者只会接收到它所监听的队列中的消息。