1. 消息确认机制
为了保证消息从队列可靠的达到消费者,RabbitMQ 提供了消息确认机制(Message Acknowledgement)。
消费者在订阅队列时,可以指定 autoAck 参数,
(1) 当 autoAck = false 时,RabbitMQ 会等待消费者显式地回复确认信号后才从内存(或者磁盘)中移除消息(实际上是先打上删除标记,之后在删除)。
(2)当 autoAck = true 时,RabbitMQ 会自动把发送出去的消息置为确认,然后从内存(或者磁盘)中删除,而不管消费者是否真正地消费到了这些消息。
采用消息确认机制后,只要设置 autoAck 参数为 false,消费者就有足够的时间处理消息(任务),不用担心处理消息过程中消费者进程挂掉后消息丢失的问题,因为 RabbitMQ 会一直等待持有消息直到消费者显式调用 Basic.Ack 命令为止。
当autoAck 参数为 false 时,对于 RabbitMQ 服务器端而言,队列中的消息分成了两部分:一部分是等待投递给消费者的消息;一部分是已经投递给消费者,但是还没有收到消费者确认信号的消息。
如果 RabbitMQ 服务器端一直没有收到消费者的确认信号,并且消费此消息的消费者已经断开连接,则服务器端会安排该消息重新进入队列,等待投递给下一个消费者(也可能还是原来的那个消费者)。
RabbitMQ 不会为未确认的消息设置过期时间,它判断此消息是否需要重新投递给消费者的唯一依据是消费该消息连接是否已经断开,这个设置的原因是 RabbitMQ 允许消费者消费一条消息的时间可以很久很久。
RabbitMQ 的 Web 管理平台上可以看到当前队列中的 “Ready” 状态和 “Unacknowledged” 状态的消息数,分别对应等待投递给消费者的消息数和已经投递给消费者但是未收到确认信号的消息数。如下图:
RabbitMQ 消息确认机制分为两大类:发送方确认、接收方确认。
其中发送方确认又分为:生产者到交换器到确认、交换器到队列的确认。如下图:
2. 消息发送确认
2.1 ConfirmCallback方法
ConfirmCallback 是一个回调接口,消息发送到 Broker 后触发回调,确认消息是否到达 Broker 服务器,也就是只确认是否正确到达 Exchange 中。
我们需要在生产者的配置中添加下面配置,表示开启发布者确认。
spring.rabbitmq.publisher-confirm-type=correlated # 新版本 spring.rabbitmq.publisher-confirms=true # 老版本
2.2 ReturnCallback方法
通过实现 ReturnCallback 接口,启动消息失败返回,此接口是在交换器路由不到队列时触发回调,该方法可以不使用,因为交换器和队列是在代码里绑定的,如果消息成功投递到 Broker 后几乎不存在绑定队列失败,除非你代码写错了。
使用此接口需要在生产者配置中加入一下配置,表示发布者返回。
spring.rabbitmq.publisher-returns=true
【示例】发送端实现消息发送确认功能(交换器确认、队列确认)。
(1)创建第一个 SpringBoot 项目( rabbitmq-provider 消息发送项目)。
在pom.xml配置信息文件中,添加相关依赖文件:
<!-- AMQP客户端 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> <version>2.4.1</version> </dependency>
在 application.yml 配置文件中配置 RabbitMQ 服务:
spring:
项目名称
application: name: rabbitmq-provider
RabbitMQ服务配置
rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest # 消息确认(ACK) publisher-confirm-type: correlated #确认消息已发送到交换机(Exchange) publisher-returns: true #确认消息已发送到队列(Queue)
(2)配置队列
在 rabbitmq-provider(消息发送项目)中,配置消息确认,队列名称等,并将队列交由 IoC 管理,代码如下:
import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.amqp.rabbit.connection.ConnectionFactory; /** RabbitMQ配置类 **/ @Configuration public class RabbitMqConfig { public static final String QUEUE_NAME = "queue_name"; //队列名称 public static final String EXCHANGE_NAME = "exchange_name"; //交换器名称 public static final String ROUTING_KEY = "routing_key"; //路由键 @Bean public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(); rabbitTemplate.setConnectionFactory(connectionFactory); //设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数 rabbitTemplate.setMandatory(true); //确认消息送到交换机(Exchange)回调 rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.out.println("\n确认消息送到交换机(Exchange)结果:"); System.out.println("相关数据:" + correlationData); System.out.println("是否成功:" + ack); System.out.println("错误原因:" + cause); } }); //确认消息送到队列(Queue)回调 rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() { @Override public void returnedMessage(ReturnedMessage returnedMessage) { System.out.println("\n确认消息送到队列(Queue)结果:"); System.out.println("发生消息:" + returnedMessage.getMessage()); System.out.println("回应码:" + returnedMessage.getReplyCode()); System.out.println("回应信息:" + returnedMessage.getReplyText()); System.out.println("交换机:" + returnedMessage.getExchange()); System.out.println("路由键:" + returnedMessage.getRoutingKey()); } }); return rabbitTemplate; } /** - 队列 */ @Bean public Queue queue() { /** - 创建队列,参数说明: - String name:队列名称。 - boolean durable:设置是否持久化,默认是 false。durable 设置为 true 表示持久化,反之是非持久化。 - 持久化的队列会存盘,在服务器重启的时候不会丢失相关信息。 - boolean exclusive:设置是否排他,默认也是 false。为 true 则设置队列为排他。 - boolean autoDelete:设置是否自动删除,为 true 则设置队列为自动删除, - 当没有生产者或者消费者使用此队列,该队列会自动删除。 - Map<String, Object> arguments:设置队列的其他一些参数。 */ return new Queue(QUEUE_NAME, true, false, false, null); } /** - Direct交换器 */ @Bean public DirectExchange exchange() { /** - 创建交换器,参数说明: - String name:交换器名称 - boolean durable:设置是否持久化,默认是 false。durable 设置为 true 表示持久化,反之是非持久化。 - 持久化可以将交换器存盘,在服务器重启的时候不会丢失相关信息。 - boolean autoDelete:设置是否自动删除,为 true 则设置队列为自动删除, */ return new DirectExchange(EXCHANGE_NAME, true, false); } /** - 绑定 */ @Bean Binding binding(DirectExchange exchange, Queue queue) { //将队列和交换机绑定, 并设置用于匹配键:routingKey return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY); } }
(3)创建发送者 (生产者)
在 rabbitmq-provider(消息发送项目)中,创建发送者,利用 rabbitTemplate.convertAndSend() 方法发送消息。
同时在代码中故意将 routingKey 参数写入错误,让其应发确认消息送到队列失败回调,代码如下:
import com.pjb.config.RabbitMqConfig; import org.junit.jupiter.api.Test; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest;
/**
RabbitMq测试类
**/
@SpringBootTest
public class RabbitMqTest
{
@Autowired
RabbitTemplate rabbitTemplate;
@Test public void sendMessage() throws Exception { String message = "您好,欢迎访问";
//这里故意将routingKey参数写入错误,让其应发确认消息送到队列失败回调
rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_NAME, "no_queue_name", message);
//由于这里使用的是测试方法,当测试方法结束,RabbitMQ相关的资源也就关闭了,
//会导致消息确认的回调出现问题,所有加段延时
Thread.sleep(2000);
}
}
3、消息接收确认
消费者确认发生在监听队列的消费者处理业务失败,如:发生了异常,不符合要求的数据等,这些场景我们就需要手动处理,比如重新发送或者丢弃。 RabbitMQ 消息确认机制(ACK)默认是自动确认的,自动确认会在消息发送给消费者后立即确认,但存在丢失消息的可能,如果消费端消费逻辑抛出异常,假如你用回滚了也只是保证了数据的一致性,但是消息还是丢了,也就是消费端没有处理成功这条消息,那么就相当于丢失了消息。 **消息确认模式有**: - AcknowledgeMode.NONE:自动确认。 - AcknowledgeMode.AUTO:根据情况确认。 - AcknowledgeMode.MANUAL:手动确认。 消费者收到消息后,手动调用 Basic.Ack 或 Basic.Nack 或 Basic.Reject 后,RabbitMQ 收到这些消息后,才认为本次投递完成。 Basic.Ack 命令:用于确认当前消息。 Basic.Nack 命令:用于否定当前消息(注意:这是AMQP 0-9-1的RabbitMQ扩展) 。 Basic.Reject 命令:用于拒绝当前消息。 **3.1 basicAck 方法** basicAck 方法用于确认当前消息,Channel 类中的 basicAck 方法定义如下: void basicAck(long deliveryTag, boolean multiple) throws IOException; 参数说明: long deliveryTag:唯一标识 ID,当一个消费者向 RabbitMQ 注册后,会建立起一个 Channel ,RabbitMQ 会用 basic.deliver 方法向消费者推送消息,这个方法携带了一个 delivery tag, 它代表了 RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID,是一个单调递增的正整数,delivery tag 的范围仅限于 Channel。 boolean multiple:是否批处理,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息。 **3.2 basicNack 方法** basicNack 方法用于否定当前消息。 由于 basicReject 方法一次只能拒绝一条消息,如果想批量拒绝消息,则可以使用 basicNack 方法。消费者客户端可以使用 channel.basicNack 方法来实现,方法定义如下: ```java void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
参数说明:
long deliveryTag:唯一标识 ID。
boolean multiple:上面已经解释。
boolean requeue:如果 requeue 参数设置为 true,则 RabbitMQ 会重新将这条消息存入队列,以便发送给下一个订阅的消费者; 如果 requeue 参数设置为 false,则 RabbitMQ 立即会还把消息从队列中移除,而不会把它发送给新的消费者。
3.3 basicReject 方法
basicReject 方法用于明确拒绝当前的消息而不是确认。 RabbitMQ 在 2.0.0 版本开始引入 Basic.Reject 命令,消费者客户端可以调用与其对应的 channel.basicReject 方法来告诉 RabbitMQ 拒绝这个消息。
Channel 类中的basicReject 方法定义如下:
void basicReject(long deliveryTag, boolean requeue) throws IOException;
参数说明:
long deliveryTag:唯一标识 ID。
boolean requeue:上面已经解释。
【示例】消费者客户端实现消息接收确认。
(1)创建第二个 SpringBoot 项目( rabbitmq-consumer 消息接收项目)。
在pom.xml配置信息文件中,添加相关依赖文件:
<!-- AMQP客户端 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> <version>2.4.1</version> </dependency>
在 application.yml 配置文件中配置 RabbitMQ 服务。
RabbitMQ服务配置
rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest
(2)配置信息类
在 rabbitmq-consumer(消息接收项目)中,配置手动确认消息、消息接收确认。
import com.pjb.receiver.Receiver; import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** - RabbitMQ配置类 **/ @Configuration public class RabbitMqConfig { @Autowired private CachingConnectionFactory connectionFactory; @Autowired private Receiver receiver; //消息接收处理类 @Bean public SimpleMessageListenerContainer simpleMessageListenerContainer() { //消费者数量,默认10 int DEFAULT_CONCURRENT = 10; //每个消费者获取最大投递数量 默认50 int DEFAULT_PREFETCH_COUNT = 50; SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); container.setConcurrentConsumers(DEFAULT_CONCURRENT); container.setMaxConcurrentConsumers(DEFAULT_PREFETCH_COUNT); // RabbitMQ默认是自动确认,这里改为手动确认消息 container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置一个队列 container.setQueueNames("queue_name"); //如果同时设置多个如下: 前提是队列都是必须已经创建存在的 //container.setQueueNames("TestDirectQueue","TestDirectQueue2","TestDirectQueue3"); //另一种设置队列的方法,如果使用这种情况,那么要设置多个,就使用addQueues //container.setQueues(new Queue("TestDirectQueue",true)); //container.addQueues(new Queue("TestDirectQueue2",true)); //container.addQueues(new Queue("TestDirectQueue3",true)); container.setMessageListener(receiver); return container; } }
(3)创建接收者
在 rabbitmq-consumer(消息接收项目)中,创建创建接收者。
import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener; import org.springframework.stereotype.Component; /** - 接收者 **/ @Component public class Receiver implements ChannelAwareMessageListener { @Override public void onMessage(Message message, Channel channel) throws Exception { long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { System.out.println("接收消息: " + new String(message.getBody(), "UTF-8")); /** * 确认消息,参数说明: * long deliveryTag:唯一标识 ID。 * boolean multiple:是否批处理,当该参数为 true 时, * 则可以一次性确认 deliveryTag 小于等于传入值的所有消息。 */ channel.basicAck(deliveryTag, true); /** * 否定消息,参数说明: * long deliveryTag:唯一标识 ID。 * boolean multiple:是否批处理,当该参数为 true 时, * 则可以一次性确认 deliveryTag 小于等于传入值的所有消息。 * boolean requeue:如果 requeue 参数设置为 true, * 则 RabbitMQ 会重新将这条消息存入队列,以便发送给下一个订阅的消费者; * 如果 requeue 参数设置为 false,则 RabbitMQ 立即会还把消息从队列中移除, * 而不会把它发送给新的消费者。 */ //channel.basicNack(deliveryTag, true, false); } catch (Exception e) { e.printStackTrace(); /** * 拒绝消息,参数说明: * long deliveryTag:唯一标识 ID。 * boolean requeue:如果 requeue 参数设置为 true, * 则 RabbitMQ 会重新将这条消息存入队列,以便发送给下一个订阅的消费者; * 如果 requeue 参数设置为 false,则 RabbitMQ 立即会还把消息从队列中移除, * 而不会把它发送给新的消费者。 */ channel.basicReject(deliveryTag, true); } } }
【示例】设置监听多个队列,执行不同的消息接收确认。
(1)修改配置类
在上述的 RabbitMqConfig.java 配置类中,添加多个队列。
(2)修改接收者
import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener; import org.springframework.stereotype.Component; /** - 接收者 - **/ @Component public class Receiver implements ChannelAwareMessageListener { @Override public void onMessage(Message message, Channel channel) throws Exception { long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { if ("queue_name".equals(message.getMessageProperties().getConsumerQueue())) { System.out.println("消费的消息来自的队列名为:"+message.getMessageProperties().getConsumerQueue()); System.out.println("接收消息: " + new String(message.getBody(), "UTF-8")); System.out.println("执行queue_name中的消息的业务处理流程......"); } if ("fanout.A".equals(message.getMessageProperties().getConsumerQueue())) { System.out.println("消费的消息来自的队列名为:" + message.getMessageProperties().getConsumerQueue()); System.out.println("接收消息: " + new String(message.getBody(), "UTF-8")); System.out.println("执行fanout.A中的消息的业务处理流程......"); } /** * 确认消息,参数说明: * long deliveryTag:唯一标识 ID。 * boolean multiple:是否批处理,当该参数为 true 时, * 则可以一次性确认 deliveryTag 小于等于传入值的所有消息。 */ channel.basicAck(deliveryTag, true); /** * 否定消息,参数说明: * long deliveryTag:唯一标识 ID。 * boolean multiple:是否批处理,当该参数为 true 时, * 则可以一次性确认 deliveryTag 小于等于传入值的所有消息。 * boolean requeue:如果 requeue 参数设置为 true, * 则 RabbitMQ 会重新将这条消息存入队列,以便发送给下一个订阅的消费者; * 如果 requeue 参数设置为 false,则 RabbitMQ 立即会还把消息从队列中移除, * 而不会把它发送给新的消费者。 */ //channel.basicNack(deliveryTag, true, false); } catch (Exception e) { e.printStackTrace(); /** * 拒绝消息,参数说明: * long deliveryTag:唯一标识 ID。 * boolean requeue:如果 requeue 参数设置为 true, * 则 RabbitMQ 会重新将这条消息存入队列,以便发送给下一个订阅的消费者; * 如果 requeue 参数设置为 false,则 RabbitMQ 立即会还把消息从队列中移除, * 而不会把它发送给新的消费者。 */ channel.basicReject(deliveryTag, true); } } }