上一篇说到生产者消息确认机制,它可以确保消息投递到RabbitMQ的队列中,但是消息发送到RabbitMQ以后,如果MQ宕机,也可能导致消息丢失,所以提出了消息持久化。持久化的主要机制就是将信息写入磁盘,当RabbtiMQ服务宕机重启后,从磁盘中读取存入的持久化信息,恢复数据。而持久化机制又可以分为:
- 交换机持久化
- 队列持久化
- 消息持久化
在上一篇的案例代码中,我们故意把交换机的durable设置为false,队列的durable为true。
这里说一下另外两个参数:
- exclusive:排他队列。如果一个队列被声明为排他队列,那么这个队列只能被第一次声明他的连接所见,并在连接断开的时候自动删除。这里有三点需要说明:
- 排他队列是基于连接可见的,同一连接的不同信道是可以同时访问同一连接创建的排他队列
- 如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同
- 即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除的,这种队列适用于一个客户端发送读取消息的应用场景
- autoDelete:自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列
当重启MQ服务后发现交换机已被销毁,而队列还存在,这是符合我们预期的。
我们在SpringBoot项目中使用spring-boot-starter-amqp 声明的交换机、队列、消息默认都是持久化的。而交换机和队列的持久化从上述代码中可以看到是通过指定durable的值。
在SpringBoot中使用rabbitTemplate.convertAndSend发送消息时
在做消息转换的时候,我们注意到,传入了一个MessageProperties对象。
在MessageProperties中,有个deliveryMode属性,该属性默认值为:MessageDeliveryMode.PERSISTENT(持久化的)
但是要明白即便交换机、队列、消息都设置持久化,也不能保证消息100%不丢失。因为有可能RabbitMQ接收到了消息,但是还没来得及持久化到磁盘,他自己就宕机了,这个时候消息还是会丢失的。
可以考虑生产者在发送消息时在数据库中写入消息日志,但是在后期消息补发的时候就需要人工介入,将失败的消息捞出来然后重新发送。只要消息数据在,即便MQ宕机了也只是补数据的事。
也可以单独起一个定时任务,周期性的去将这些失败存储的消息进行重发。如果你的MQ服务故障后几分钟就恢复了,那么重试的时候消息就能够成功发出去了。
下一篇聊一聊消费者手动ACK。