前言
如何保证消息的可靠性传输
或者说如何处理消息丢失的问题
问题分析
用MQ
有个基本原则,就是数据不能多一条,也不能少一条.
不能多,就是上文提及的重复消费和幂等性问题。
不能少,就是数据别搞丢了。
关于数据不丢失这个问题,如果说是用MQ
来传递非常核心的消息,比如说计费、扣费的一些消息,那必须确保这个MQ
传递过程中绝对不会把计费消息给弄丢。
问题剖析
关于数据的丢失问题,可能出现在生产者、MQ
、消费者中,
首先从RabbitMQ
和Kafka
分别来分析一下吧。
RabbitMQ
生产者弄丢了数据
生产者将数据发送到RabbitMQ
的时候,可能数据就在半路给搞丢了,可能是因为网络问题或其他问题皆有可能。
此时可以选择用RabbitMQ
提供的事务功能,就是生产者发送数据之前开启RabbitMQ
事务channel.txSelect()
。
然后发送消息,如果消息没有成功被RabbitMQ
接收到,那么生产者会收到异常报错,此时就可以回滚事务channel.txRollback()
然后重试发送消息;如果收到了消息,那么可以提交事务channel.txCommit()
。
try{ //通过工厂创建连接 connection=factory.newConnection(); //获取通道 channel=connection.createChannel(); //开启事务 channel.txSelect(); //这里发送消息 channel.basicPublish(exchange,routingKey,MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes()); //模拟出现异常 intresult=1/0; //提交事务 channel.txCommit(); }catch(IOException|TimeoutExceptione){ //捕捉异常,回滚事务 channel.txRollback(); }
但是问题是,经过RabbitMQ
事务机制(同步),基本上因为太耗性能导致吞吐量会下来。
所以一般来说,当要确保写入RabbitMQ
的消息别丢,可以开启confirm
模式,在生产者那里设置开启confirm
模式之后,每次写入的消息都会分配一个唯一的id。
然后如果写入了RabbitMQ
中,RabbitMQ
会回传一个ack
消息,告知这个消息ok了。
如果RabbitMQ
没能处理这个消息,会回调一个nack
接口,告知这个消息接收失败且你可以重试。
而且你可以结合这个机制,在内存里维护每个消息id
的状态,如果超过一定时间还没接收到这个消息的回调,那么可以进行重发。
事务机制和confirm
机制最大的不同在于:
事务机制是同步的,你提交一个事务之后会阻塞在那儿。
但是confirm
机制是异步的,你发送个消息之后就可以发送下一个消息,然后那个消息RabbitMQ
接收了之后会异步回调你的一个接口通知你这个消息接收到了。
所以一般在生产者避免数据丢失,都是用confirm
机制的。
已经在transaction事务模式的channel是不能再设置成confirm模式的,即这两种模式是不能共存的。
客户端实现生产者confirm
有3种方式:
1.普通confirm模式:每发送一条消息后,调用waitForConfirms()
方法,等待服务器端confirm
,如果服务端返回false
或者在一段时间内都没返回,客户端可以进行消息重发。
channel.basicPublish(ConfirmConfig.exchangeName,ConfirmConfig.routingKey,MessageProperties.PERSISTENT_TEXT_PLAIN,ConfirmConfig.msg_10B.getBytes()); if(!channel.waitForConfirms()){ //消息发送失败 //... }
2.批量confirm模式:每发送一批消息后,调用waitForConfirms()
方法,等待服务端confirm
。
channel.confirmSelect(); for(inti=0;i<batchCount;++i){ channel.basicPublish(ConfirmConfig.exchangeName,ConfirmConfig.routingKey,MessageProperties.PERSISTENT_TEXT_PLAIN,ConfirmConfig.msg_10B.getBytes()); } if(!channel.waitForConfirms()){ //消息发送失败 //... }
3.异步confirm模式:提供一个回调方法,服务端confirm
了一条或者多条消息后客户端会回调这个方法。
SortedSet<Long>confirmSet=Collections.synchronizedSortedSet(newTreeSet<Long>()); channel.confirmSelect(); channel.addConfirmListener(newConfirmListener(){ publicvoidhandleAck(longdeliveryTag,booleanmultiple)throwsIOException{ if(multiple){ confirmSet.headSet(deliveryTag+1).clear(); }else{ confirmSet.remove(deliveryTag); } } publicvoidhandleNack(longdeliveryTag,booleanmultiple)throwsIOException{ System.out.println("Nack,SeqNo:"+deliveryTag+",multiple:"+multiple); if(multiple){ confirmSet.headSet(deliveryTag+1).clear(); }else{ confirmSet.remove(deliveryTag); } } }); while(true){ longnextSeqNo=channel.getNextPublishSeqNo(); channel.basicPublish(ConfirmConfig.exchangeName,ConfirmConfig.routingKey,MessageProperties.PERSISTENT_TEXT_PLAIN,ConfirmConfig.msg_10B.getBytes()); confirmSet.add(nextSeqNo); }
RabbitMQ
弄丢了数据
就是
RabbitMQ
自己弄丢了数据,这个必须开启RabbitMQ
的持久化,就是消息写入之后会持久化到磁盘,哪怕是RabbitMQ
自己宕机,恢复之后会自动读取之前存储的数据,一般数据不会丢。除非极其罕见的是,
RabbitMQ
还没持久化,自己就宕机,可能导致少量数据丢失,但是这个概率较小。
设置持久化有两个步骤:
- 创建
queue
的时候将其设置为持久化。这样就可以保证RabbitMQ
持久化queue
的元数据,但是它是不会持久化queue
里的数据。 - 第二个是发送消息的时候将消息的
deliveryMode
设置为2。就是将消息设置为持久化的,此时RabbitMQ
就会将消息持久化到磁盘上去。
必须要同时设置这两个持久化才行,RabbitMQ
哪怕是宕机,再次重启,也会从磁盘上重启恢复queue
,恢复这个queue
里的数据。
注意:
虽然给RabbitMQ
开启了持久化机制,但是也有可能这个消息写到了RabbitMQ
中,但还没来得及持久化到磁盘上RabbitMQ
就发生宕机,便会导致内存里的一点点数据丢失。
所以,持久化可以跟生产者那边的confirm
机制配合起来,只有消息被持久化到磁盘之后,才会通知生产者ack
了,所以若真是在持久化到磁盘之前,RabbitMQ
发生了宕机,数据丢了,生产者收不到ack
,也可以自己重发的。
消费端弄丢了数据
RabbitMQ
如果丢失了数据,主要是因为消费的时候,刚消费到,还没处理,结果进程挂了,比如重启了,RabbitMQ
认为都消费了,这数据就丢了。
这个时候得用RabbitMQ
提供的ack
机制。
简单来说,就是必须关闭RabbitMQ
的自动ack
,通过一个api
来调用ack
就行,然后每次你自己代码里确保处理完的时候,再在程序里ack
一把。
这样的话,如果还没处理完就没有ack
了。那么RabbitMQ
就认为还没处理完,这个时候RabbitMQ
会把这个消费分配给别的consumer
去处理,消息是不会丢的。
为了保证消息从队列中可靠地到达消费者,
RabbitMQ
提供了消息确认机制。消费者在声明队列时,可以指定
noAck
参数,当noAck=false
,RabbitMQ
会等待消费者显式发回ack
信号后,才从内存(和磁盘,如果是持久化消息)中移去消息。否则,一旦消息被消费者消费,RabbitMQ
会在队列中立即删除它。
Kafka
消费端弄丢了数据
唯一可能导致消费者弄丢数据的情况就是
当消费到了这个消息,然后消费者那边自动提交了
offset
,让Kafka
以为已经消费好了这个消息,但其实你才刚准备处理这个消息,还没处理自己就宕机,此时这条消息就丢失了。
这其实跟RabbitMQ
大同小异。都知道Kafka会自动提交offset
,那么只要关闭自动提交offset
,在处理完之后自己手动提交offset
,便可以保证数据不会丢失了。
但是此时确实还是可能会有重复消费,比如刚处理完,还没提交offset
,结果自己宕机,此时肯定会重复消费一次,那再自行保证幂等性就解决。
Kafka
弄丢了数据
这是比较常见的一个场景,当Kafka
某个broker
宕机,然后重新选举partition
的leader
。
要是此时其他的follower
刚好还有些数据没有同步,结果此时leader
也宕机了,然后选举某个follower
成leader
之后就少了一些数据便会丢了一些数据的情况。
所以此时一般是要求起码设置如下4个参数:
- 给topic设置
replication.factor
参数:这个值必须大于1,要求每个partition必须有至少2个副本。 - 在Kafka服务端设置
min.insync.replicas
参数:这个值必须大于1,这个是要求一个leader
至少感知到有至少一个follower
还跟自己保持联系,这样才能确保leader挂了还有一个follower
吧。 - 在
producer
端设置acks=all
:这个是要求每条数据,必须是写入所有replica之后,才能认为是写成功了。 - 在
producer
端设置retries=MAX
(很大很大很大的一个值,无限次重试的意思):这个是要求一旦写入失败,就无限重试,卡在这里了。
生产者不会弄丢数据
如果按照上述的思路设置了acks=all
,一定不会丢的前提是:
leader
接收到消息,所有的follower
都同步到了消息之后,才认为本次写成功了。如果没满足这个条件,生产者会自动不断的重试,重试无限次。
RocketMQ
消息丢失的场景
- 生产者发送消息到
MQ
有可能丢失消息 MQ
收到消息后写入硬盘可能丢失消息- 消息写入硬盘后,硬盘坏了丢失消息
- 消费者消费
MQ
也可能丢失消息 - 整个
MQ
节点挂了丢失消息
保证生产者发送消息时不丢失
解决发送时消息丢失的问题可以采用RocketMQ
自带的事物消息机制
事物消息原理:
首先生产者会发送一个half消息(对原始消息的封装),该消息对消费者不可见的。
MQ
通过ack
机制返回消息接受状态,生产者执行本地事务并且返回给MQ
一个状态(Commit、RollBack
等),
如果是Commit
的话MQ
就会把消息给到下游。
如果是RollBack
的话就会丢弃该消息。
如果为UnKnow
的话会过一段时间回查本地事务状态,默认回查15次,一直是UnKnow
状态的话就会丢弃此消息。
先发一个half消息的作用就是先判断下MQ
有没有问题,服务正不正常。
保证MQ
收到消息后写入硬盘不丢失
数据存盘绕过缓存,改为同步刷盘,这一步需要修改Broker
的配置文件,将flushDiskType
改为SYNC_FLUSH
同步刷盘策略,默认的是ASYNC_FLUSH
异步刷盘,一旦同步刷盘返回成功,那么就一定保证消息已经持久化到磁盘中了。
保证消息写入硬盘后,硬盘坏了不丢失
为了保证磁盘损坏导致丢失数据,RocketMQ
采用主从机构,集群部署,Leader
中的数据在多个Follower
中都存有备份,防止单点故障导致数据丢失。
Master节点挂了之后DLedger
登场
- 接管
MQ
的commitLog
- 选举从节点
- 文件复制
uncommited
状态多半从节点收到之后改为commited
保证消费者消费MQ
不丢失
- 如果是网络问题导致的消费失败可以进行重试机制,默认每条消息重试16次
- 多线程异步消费失败,
MQ
认为已经消费成功但是实际上对于业务逻辑来说消息是没有落地的,解决方案就是按照mq
官方推荐的先执行本地事务再返回成功状态。
保证整个MQ
节点宕机数据不丢失
这种极端情况可以消息发送失败之后先存入本地,例如放到缓存中,另外启动一个线程扫描缓存的消息去重试发送。