提纲要点
基本概念
特点
- 可靠性:支持持久化,传输确认,发布确认等保证了MQ的可靠性。
- 灵活的分发消息策略(路由):这应该是RabbitMQ的一大特点。在消息进入MQ前由Exchange(交换机)进行路由消息。分发消息策略有:简单模式、工作队列模式、发布订阅模式、路由模式、通配符模式。
- 支持集群:多台RabbitMQ服务器可以组成一个集群,形成一个逻辑Broker。
- 多种协议:RabbitMQ支持多种消息队列协议,比如 STOMP、MQTT 等等。
- 支持多种语言客户端:RabbitMQ几乎支持所有常用编程语言,包括 Java、.NET、Ruby 等等。
- 可视化管理界面:RabbitMQ提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker。
- 插件机制:RabbitMQ提供了许多插件,可以通过插件进行扩展,也可以编写自己的插件
AMQP协议
RabbitMQ 基于AMQP 是一个高级消息队列传输协议( Advanced Message Queuing Protocol )
- 一套被称作”高级消息队列协议模型(AMQ Model)“的消息能力定义。该模型涵盖了Broker服务中用于路由和存储消息的组件,以及把这些组件连在一起的规则。
- 一个网络层协议AMQP。能够让客户端程序与实现了AMQ Model的服务端进行通信。
AMQ Model
- exchange(交换器):从Publisher程序中收取消息,并把这些消息根据一些规则路由到消息队列(Message Queue)中
- message queue(消息队列):存储消息。直到消息被安全的投递给了消费者。
- binding :定义了 message queue 和 exchange 之间的关系,提供了消息路由的规则。
RabbitMQ中主要组成部分
- Broker:消息队列服务进程。此进程包括两个部分:Exchange和Queue。
- Exchange:消息队列交换机。按一定的规则将消息路由转发到某个队列。
- Queue:消息队列,存储消息的队列。
- Producer:消息生产者。生产方客户端将消息同交换机路由发送到队列中。
- Consumer:消息消费者。消费队列中存储的消息。
RabbitMQ交换机模型
Direct Exchange
直连交换机:消息和特定的路由键完全匹配
创建消息队列
创建队列绑定交换机
指定路由发送消息
结论:匹配了路由key值发送到队列中
Fanout Exchange
广播交换机:一 个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息 (发布/订阅模式)
创建广播类型交换机
创建两个队列
绑定路由和key
发送消息
结论:只要绑定到交换机中,就会往指定的交换机中发送消息
Topic Exchange
主题交换机: 交换机是使用通配符去匹配,路由到对应的队列。通配符有两种:"*" 、 "#"。需要注意的是通配符前面必须要加上"."符号
发送消息代码
@ApiOperation(value = "发送广播消息", notes = "发送广播消息") @GetMapping(value = "private/test3") public String publishMessage3( @RequestParam("name")String name ) { rabbitTemplate.convertAndSend("MyTest-topic", "my.topic.A",name); return "success"; }
路由KEY匹配my.topic.A和my.topic.* 所以两个队列都能接收到到消息
Headers Exchange
请求头部交换机:它的路由不是用routingKey进行路由匹配,而是在匹配请求头中所带的键值进行路由
RabbitMq 消息过期时间TTL
RabbitMq 队列中的消息都是由TTL时间,当消息达到一定时间后将会失效,具体时间发送方案:
rabbitTemplate.convertAndSend(RabbitStatistics.GQS_MILEAGE, RabbitStatistics.GQS_MILEAGE_KEY, JSON.toJSONString(dto), message -> { MessageProperties messageProperties = message.getMessageProperties(); // 设置这条消息的过期时间 messageProperties.setExpiration(String.valueOf(1000 * 24 * 60 * 60)); return message; });
多租户模式:虚拟主机和隔离
每一个 RabbitMQ 服务器都能创建虚拟消息服务器,我们称之为虚拟主机(vhost)。每一个 vhost本质上是一个mini版的 RabbitMq 服务器,拥有自己的队列交换器和绑定·.....更重要的是,它拥有自己的权限机制。这使得你能够安全地使用一个 RabbitMq 服务器来服务众多应用程序
队列
- 死信队列DLX
- 延迟队列: 消费者订阅的是 死信队列,没有消费者订阅普通队列的话,当消息过期时间到了,就会被路由到死信队列,这就达成了,消息被延迟消费的目的。
- 优先级队列
- 新建队列设置最大优先级(最大优先级不要太高,会耗费CPU资源)
- 发送消息的时候,消息设置优先级,如果没有设置的情况下默认优先级为1
rabbitTemplate.convertAndSend(RabbitStatistics.GQS_MILEAGE, RabbitStatistics.GQS_MILEAGE_KEY, JSON.toJSONString(dto), message -> { MessageProperties messageProperties = message.getMessageProperties(); // 设置这条消息的过期时间 messageProperties.setExpiration(String.valueOf(1000 * 24 * 60 * 60)); pro.getMessageProperties().setPriority(5); return message; });
持久化
- 交换器的持久化在声明交换器是将 durable 参数设置为 true 实现,如果不持久化,RabbitMQ 服务重启之后,相关的交换器元数据会丢失(没有这个交换器了),但是 队列和消息不会丢失(分情况是否设置持久化),只是 不能将消息发送到这个交换器了。
- 队列的持久化在声明队列时将 durable 参数设置为 true 实现,如果不持久化,RabbitMQ 服务重启之后,相关的 元数据会丢失,消息也会丢失;
- 消息的持久化但是队列的持久化,并 不能保证消息数据不丢失,要保证消息不丢失,需要将消息的投递模式设置为 2 (BasicProperties 中的 deliveryMode 属性)
消息删除
过期删除策略:通过设置消息TTL时间,消息到时间自动删除
消息确认删除:通过ack消息消费确认的机制,进行消息删除
事务
springboot 中启用事务管理
@Bean("rabbitTransactionManager") RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory){ return new RabbitTransactionManager(connectionFactory); }
springboot 中使用事务
@RestController @RequestMapping("/v1") @Slf4j @Api(value = "测试类", description = "测试类") public class TestController { @Autowired private RabbitTemplate rabbitTemplate; @ApiOperation(value = "发送广播消息", notes = "发送广播消息") @GetMapping(value = "private/test") @Transactional(transactionManager = "rabbitTransactionManager",rollbackFor = Exception.class) public String run( @RequestParam("name")String name ) { rabbitTemplate.convertAndSend("MyTest-fanout", null,name); return "success"; } }
消息发送和确认机制
生产者消息发送确认机制
消息发送后需要确认消息是否已经正确的入队列,通过消息确认回调(setConfirmCallback)和路由key回调(setReturnsCallback)
package com.example.demo.config; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration @Slf4j public class RabbitMqConfig { @Bean("rabbitTemplate") public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(); rabbitTemplate.setConnectionFactory(connectionFactory); //设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数 rabbitTemplate.setMandatory(true); rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { if (ack) { return; } }); /** * 在找不过交换机和routingkey还是会触发 */ rabbitTemplate.setReturnsCallback(returned -> { log.error("返回消息{}",returned); log.error("ReturnCallback: " + "消息:" + returned.getMessage()); log.error("ReturnCallback: " + "回应码:" + returned.getReplyCode()); }); return rabbitTemplate; } }
CorrelationData 可以设置ID进行消息的传递
消费者消息确认机制
自动确认模式:
在消费者处理完消息后,自动确认消息已被消费。这种模式下,如果消费者在处理消息时发生异常,那么消息也会被认为已被确认,会从队列中删除,而无法重新消费。因此,不建议使用此模式。
手动确认模式(无需返回确认结果):
消费者在接收到消息后,不会自动确认消息已被消费,而是需要手动调用 channel.basicAck 方法来确认消息已被消费。此模式下,如果消费者在处理消息时发生异常,那么消息不会被确认,也不会从队列中删除,可以重新消费。
手动确认模式(需要返回确认结果):
消费者在接收到消息后,需要手动调用 channel.basicAck 方法来确认消息已被消费,并需要在回调方法中返回确认结果。如果消费者在处理消息时发生异常,那么消息不会被确认,也不会从队列中删除,可以重新消费。
一般建议使用手动确认模式,因为这种模式下可以保证消息的可靠性,并且可以防止消息的丢失。
在 Spring Boot 中,acknowledge-mode 有以下几个配置选项:
AUTO: 自动确认模式。
消息消费者收到消息后,自动发送确认消息,无需手动调用确认方法。此模式是默认的确认模式。
MANUAL: 手动确认模式。
消息消费者接收到消息后,不会自动发送确认消息,需要手动调用确认方法来确认消息已经被消费。如果没有手动确认消息,则消息会一直存在于队列中,不会被删除。
NONE: 不确认模式。
消息消费者接收到消息后,不会自动发送确认消息,并且也不会手动确认消息。此模式会造成消息被重复消费,因此一般不建议使用。
配置文件
spring: rabbitmq: listener: simple: acknowledge-mode: MANUAL(auto,none)
在监听器方法中,需要手动调用 Channel 对象的 basicAck 方法来确认消息。例如:
@RabbitListener(queues = "myQueue") public void handleMessage(Message message, Channel channel) throws IOException { // 处理消息 // 手动确认消息 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); }
通过绑定队列,交换机,key的方式监听消息,下面这种方式会自动创建路由值进行绑定
@RabbitListener(bindings = @QueueBinding( value = @Queue(value = "topic-2", durable = "true"), exchange = @Exchange(value = "MyTest-topic",type = "topic"), key = "my.topic.B")) public void receiveMessage2(Message message, Channel channel) throws IOException { // 处理消息 try { String messageBody = new String(message.getBody(), StandardCharsets.UTF_8); // 消息处理逻辑 System.out.println("Received message: " + messageBody); // 手动确认消息 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { // 消息处理异常,手动拒绝消息并重新入队列 channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); } }
消费者消费模式
公平模式
yml配置
spring: rabbitmq: host: 10.123.35.161 port: 5672 username: admin password: admin123 virtual-host: /vmsdms #消息确认配置项 #确认消息已发送到交换机(Exchange) publisher-confirms: true #确认消息已发送到队列(Queue) publisher-returns: true listener: simple: acknowledge-mode: manual #消费手动确认 prefetch: 1 #每次从一次性从broker里面取的待消费的消息的个数 retry: enabled: true # 允许消息消费失败的重试 max-attempts: 3 # 消息最多消费次数3次 initial-interval: 2000 # 消息多次消费的间隔2秒
关键信息: prefetch 获取消费的个数设为1,2,3,4;表示每次获取的数量,只要设置了这个数据,会变成公平模式
消费者代码
package com.example.demo.handler; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.concurrent.TimeUnit; /** * @author jiangmb * @version 1.0.0 * @date 2023-03-23 16:53 */ @Component public class CustomerModeHandler { @RabbitListener(queues = {"topic-4"}) public void receiveMessage1(Message message, Channel channel) throws IOException { // 处理消息 try { String messageBody = new String(message.getBody(), StandardCharsets.UTF_8); // 消息处理逻辑 System.out.println(LocalDateTime.now().format( DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))+"消费者1 Received message: " + messageBody); // 手动确认消息 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { // 消息处理异常,手动拒绝消息并重新入队列 channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); } } @RabbitListener(queues = {"topic-4"}) public void receiveMessage2(Message message, Channel channel) throws IOException { // 处理消息 try { String messageBody = new String(message.getBody(), StandardCharsets.UTF_8); // 消息处理逻辑 System.out.println(LocalDateTime.now().format( DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))+"消费者2 Received message: " + messageBody); TimeUnit.SECONDS.sleep(10); // 手动确认消息 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { // 消息处理异常,手动拒绝消息并重新入队列 channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); } } }
消费结果
结论:消费公平模式下,消费性能比较高的机器能够负载更高的并发量
轮询模式
yml配置
spring: rabbitmq: host: 10.123.35.161 port: 5672 username: admin password: admin123 virtual-host: /vmsdms #消息确认配置项 #确认消息已发送到交换机(Exchange) publisher-confirms: true #确认消息已发送到队列(Queue) publisher-returns: true listener: simple: acknowledge-mode: manual #消费手动确认 # prefetch: 1 #每次从一次性从broker里面取的待消费的消息的个数 retry: enabled: true # 允许消息消费失败的重试 max-attempts: 3 # 消息最多消费次数3次 initial-interval: 2000 # 消息多次消费的间隔2秒
关键信息: prefetch 不设置的数量那么就是轮询模式
消费者代码
package com.example.demo.handler; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.concurrent.TimeUnit; /** * @author jiangmb * @version 1.0.0 * @date 2023-03-23 16:53 */ @Component public class CustomerModeHandler { @RabbitListener(queues = {"topic-4"}) public void receiveMessage1(Message message, Channel channel) throws IOException { // 处理消息 try { String messageBody = new String(message.getBody(), StandardCharsets.UTF_8); // 消息处理逻辑 System.out.println(LocalDateTime.now().format( DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))+"消费者1 Received message: " + messageBody); // 手动确认消息 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { // 消息处理异常,手动拒绝消息并重新入队列 channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); } } @RabbitListener(queues = {"topic-4"}) public void receiveMessage2(Message message, Channel channel) throws IOException { // 处理消息 try { String messageBody = new String(message.getBody(), StandardCharsets.UTF_8); // 消息处理逻辑 System.out.println(LocalDateTime.now().format( DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))+"消费者2 Received message: " + messageBody); TimeUnit.SECONDS.sleep(10); // 手动确认消息 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { // 消息处理异常,手动拒绝消息并重新入队列 channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); } } }
消费结果
附加
springboot rabbitmq 配置和解析
spring: rabbitmq: host: localhost # RabbitMQ服务的主机名或IP地址 port: 5672 # RabbitMQ服务的端口号,默认为5672 username: guest # 连接RabbitMQ服务的用户名,默认为guest password: guest # 连接RabbitMQ服务的密码,默认为guest virtual-host: / # 虚拟主机名称,默认为/ connection-timeout: 60000 # 连接超时时间(毫秒),默认为60000 requested-heartbeat: 60 # 请求心跳时间(秒),默认为60 publisher-confirms: false # 是否启用生产者确认,默认为false publisher-returns: false # 是否启用生产者返回,默认为false template: receive-timeout: 60000 # 接收消息的超时时间(毫秒),默认为60000 reply-timeout: 5000 # 请求回复的超时时间(毫秒),默认为5000 retry: enabled: false # 是否启用重试,默认为false initial-interval: 1000 # 初始重试间隔时间(毫秒),默认为1000 max-interval: 10000 # 最大重试间隔时间(毫秒),默认为10000 multiplier: 1.0 # 重试时间乘数,默认为1.0 max-attempts: 3 # 最大重试次数,默认为3 stateless: false # 是否启用无状态重试,默认为false backoff: type: exponential # 退避类型,默认为exponential multiplier: 2.0 # 退避时间乘数,默认为2.0 max-interval: 30000 # 最大退避时间(毫秒),默认为30000 initial-interval: 1000 # 初始退避时间(毫秒),默认为1000 max-attempts: 3 # 最大重试次数,默认为3 mandatory: false # 是否启用强制标志,默认为false receive-timeout: 60000 # 接收消息的超时时间(毫秒),默认为60000 reply-timeout: 5000 # 请求回复的超时时间(毫秒),默认为5000 exchange: # 指定Exchange的名称和类型 name: myExchange type: topic routing-key: myRoutingKey # 指定Routing Key default-receive-queue: myQueue # 默认接收队列的名称 message-converter: json # 指定消息转换器 listener: direct: auto-startup: true # 是否自动启动,默认为true acknowledge-mode: auto # 确认模式,默认为auto concurrency: 1 # 消费者线程数,默认为1 max-concurrency: 10 # 最大消费者线程数,默认为10 prefetch: 1 # 每个消费者一次