出现消息丢失的情况有三种:
(1)生产者把消息发送到 RabbitMQ Server 的过程中丢失
(2)RabbitMQ Server 收到消息后再持久化之前宕机导致消息 丢失
(3)消费端收到消息还没来得及处理,此时宕机,导致 RabbitMQ Server 认为这个消息已签收,无法重复投递导致消息丢失
所以我认为保证消息可靠性从 生产者、MQ、消费者中这三个方面着手就可以了
解决方案:
从生产者发送消息到 Server 端的角度:RabbitMQ 提供了事务(transaction)和确认(confirm)两种模式来确保生产者不丢消息。
(1)开启事务 channel.txSelect (),然后发送消息,如果发送过程中出现什么异常,事务就会回滚 channel.txRollback (),如果发送成功则提交事务 channel.txCommit (),但是这种方式是同步的,会导致吞吐量下降;
(2)confirm 确认模式:RabbitMQ 提供了一个 Confirm 消息确认机制,消息从生产者发送到 Server 端以后,如果消息处理成功,Server 端会返回一个 ack 的消息,那么客户端可以根据消息的处理结果,来决定是否要对消息进行重新发送,从而确保消息发送到 Server 端上。
从 RabbitMQ Server 端的角度:可以开启消息的持久化机制,也就是收到消息之后持久化到磁盘里面。设置持久化的方式有两个步骤:
(1)创建 Queue 的时候设置为持久化
(2)发送消息的时候,把消息投递模式设置为持久化投递。
不过虽然设置了持久化消息,但是有可能也会出现问题,比如消息刷新到磁盘之前,RabbitMQ 的 Server 端宕机,导致消息丢失的一个问题。所以为了确保万无一失,我们需要结合 Confirm 消息确认机制来一起使用。
从消费端的角度:我们可以把消息的自动确认机制修改为手动确认,也就是消费端只有手动调用消息确认方法,才表示这个消息已经被签收。
这种方式可能会造成消息的重复消费的问题,这里需要考虑到幂等性的一个设计。
以上就是我对保证消息的可靠性的全部理解。
- 生产者提交给消息服务器时,使用确认机制
- 消息服务器对应的队列、交换机等都持久化,保证数据的不丢失
- 消费者关闭 RabbitMQ 自动 ACK,采用消息确认机制,保证数据的不丢失