如何保证消息的可靠性传输(如何处理消息丢失的问题)
消息丢失的原因
- 生产者在写入消息到MQ的过程中,由于网络或者一些问题导致消息丢失
- 生产者将消息传输到MQ,但是MQ在保存消息的时候,内部问题导致消息丢失
- MQ收到消息之后,还未持久化到磁盘中,然后自己挂掉了,导致消息丢失
- 消费者获取到了这个消息,但是自己还没消费挂掉了,导致消息丢失
解决办法
生产者丢失消息
RabbitMQ
生产者将消息发送到RabbitMQ的时候,可能数据就在半路搞丢了,此时可以使用RabbitMQ的事务功能,就是生产者发送数据之前开启RabbitMQ事务(Channel.txSelect),然后发送消息,如果消息没有成功被RabbitMQ接收到,那么生产者会收到异常报错,此时就可以回滚事务(Channel.txRollback),然后重试发送消息,如果收到了消息,那么可以提交事务(channel.txCommit),但是问题是,RabbitMQ事务机制一搞,基本上吞吐量会下来,因为太耗费性能
所以一般来说,如果说确保将消息写入到RagbbitMQ的时候,可以开启Confirm模式,在生产者哪里设置开启COnfirm模式之后,每次写的消息都会分配一个唯一的id,如果RabbitMQ没能处理到这个消息,会回调一个nack接口,告知这个消息接收失败,可以重试发送消息
简单来说,启动rabbitMQ的confirm模式后,在消息的提供者方面可以写两个接口,用来做RabbitMQ的异步回调通知,一个是ack()方法,代表的是消息成功接收,另一个是nack()方法,代表的是消息未成功接收,再未成功接收时,可以调用这个方法进行重新发送消息
MQ丢失数据
RabbitMQ
解决这个问题,需要开启RabbitMQ的持久化,就是消息写入之后,会持久化到磁盘,哪怕是RabbitmQ自己挂了,但是恢复之后也是会自动读取之前存储的数据,一般数据不会丢失,但是极其罕见的是,RabbitMQ还没持久化,自己就挂了,可能会导致少量的数据丢失,这个概率比较小
设置持久化有两个步骤,第一个是queue的时候将其设置为持久化的,这样就可以保证RabbitMQ持久化queue的元数据,但是不会持久化queue里面的数据,第二个是发送消息的时候将消息的deliveryMode设置为2,就是将消息设置为持久化的,此时rabbitMQ就会将消息持久化到磁盘上去,必须要同时设置这两个持久化才行
而且持久化可以跟生产者的confirm机制配合起来,只有消息被持久化到磁盘之后,才会通知生产者ack了,所以哪怕是持久化到磁盘之前,rabbitMQ挂了,数据丢了,生产者收不到ack(),同样也是可以重发的
Kafka
这个其实是比较常见的一个场景,kafka的某个Broker宕机,然后重新选举partition的leader时,如果这个时候其他的follower刚好有些数据没有同步,结果此时leader挂了,然后选举某个follower成为leader之后,这样就少了一些数据
解决上述问题需要设置下面的四个参数
- 给topic设置replication.factor参数:这个参数值必须大于1,要求每个partition必须有至少2个副本
- 再kafka服务端设置min.insync.replicas参数,这个值必须大于1,这个要求是一个leader至少感知到有个follower,还跟自己保持联系,没掉队,这样才能确保leader挂了还有个follower
- 再producer端设置acks=all,这个要求是每条数据,必须是写入所有的replica之后,才能认为是写入成功
- 在product端设置retries=MAX;这个是要求一旦写入失败,就会无限重试,卡在这里
按照上述配置之后,至少再kafka broker端就可以保证再leader所在的broker发生故障,进行leader切换的时候,数据不会丢失,同时 asks=all,retries这两个参数也一样保证了生产者的消息不会丢失
消费端弄丢了数据
RabbitMQ
RabbitMQ如果丢失了数据,主要是因为消费的时候,刚消费到,还没处理,结果进程挂了,比如说重启,这样就出问题了,RabbitMQ以为消费了,但是数据没消费,这样消息就丢了
这个时候可以使用RabbitMQ提供的ack机制,简单来说,就是关闭RabbitMQ的自动Ack,通过一个Api来手动调用ack回调,每次代码处理完成之后,直接在程序里进行ack调用,这样可以极大的保证消息的消费端不会丢失
kafka
kafka导致消费者弄丢数据的情况就是当你消费到这个消息的时候,然后消费者那边自动提交了offset,让kafka认为已经消费好了这个消息,其实刚准备处理这个消息,还没来得及处理,自己就挂了,这个会导致这条消息丢失
这个保证的方式还是一样,关闭kafka自动提交offset,改为手动提交,这样就可以保证消息数据不会丢失,当然有可能出现幂等的问题,但是自己保证幂等就可以