该如何保证消息的可靠性传输

本文涉及的产品
容器服务 Serverless 版 ACK Serverless,952元额度 多规格
容器服务 Serverless 版 ACK Serverless,317元额度 多规格
简介: 该如何保证消息的可靠性传输

前言

如何保证消息的可靠性传输

或者说如何处理消息丢失的问题

问题分析

MQ有个基本原则,就是数据不能多一条,也不能少一条.

不能多,就是上文提及的重复消费和幂等性问题

不能少,就是数据别搞丢了。

关于数据不丢失这个问题,如果说是用MQ来传递非常核心的消息,比如说计费、扣费的一些消息,那必须确保这个MQ传递过程中绝对不会把计费消息给弄丢

问题剖析

关于数据的丢失问题,可能出现在生产者、MQ、消费者中,

首先从RabbitMQKafka分别来分析一下吧。

RabbitMQ

image.png

生产者弄丢了数据

生产者将数据发送到RabbitMQ的时候,可能数据就在半路给搞丢了,可能是因为网络问题或其他问题皆有可能。

此时可以选择用RabbitMQ提供的事务功能,就是生产者发送数据之前开启RabbitMQ事务channel.txSelect()

然后发送消息,如果消息没有成功被RabbitMQ接收到,那么生产者会收到异常报错,此时就可以回滚事务channel.txRollback()

然后重试发送消息;如果收到了消息,那么可以提交事务channel.txCommit()

try{
//通过工厂创建连接
connection=factory.newConnection();
//获取通道
channel=connection.createChannel();
//开启事务
channel.txSelect();
//这里发送消息
channel.basicPublish(exchange,routingKey,MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes());
//模拟出现异常
intresult=1/0;
//提交事务
channel.txCommit();
}catch(IOException|TimeoutExceptione){
//捕捉异常,回滚事务
channel.txRollback();
}

但是问题是,经过RabbitMQ事务机制(同步),基本上因为太耗性能导致吞吐量会下来

所以一般来说,当要确保写入RabbitMQ的消息别丢,可以开启confirm模式,在生产者那里设置开启confirm模式之后,每次写入的消息都会分配一个唯一的id。

然后如果写入了RabbitMQ中,RabbitMQ会回传一个ack消息,告知这个消息ok了。

如果RabbitMQ没能处理这个消息,会回调一个nack接口,告知这个消息接收失败且你可以重试。

而且你可以结合这个机制,在内存里维护每个消息id的状态,如果超过一定时间还没接收到这个消息的回调,那么可以进行重发。

事务机制和confirm机制最大的不同在于:

事务机制是同步的,你提交一个事务之后会阻塞在那儿。

但是confirm机制是异步的,你发送个消息之后就可以发送下一个消息,然后那个消息RabbitMQ接收了之后会异步回调你的一个接口通知你这个消息接收到了。

所以一般在生产者避免数据丢失,都是用confirm机制的。

已经在transaction事务模式的channel是不能再设置成confirm模式的,即这两种模式是不能共存的。

客户端实现生产者confirm3种方式:

1.普通confirm模式:每发送一条消息后,调用waitForConfirms()方法,等待服务器端confirm,如果服务端返回false或者在一段时间内都没返回,客户端可以进行消息重发。

channel.basicPublish(ConfirmConfig.exchangeName,ConfirmConfig.routingKey,MessageProperties.PERSISTENT_TEXT_PLAIN,ConfirmConfig.msg_10B.getBytes());
if(!channel.waitForConfirms()){
//消息发送失败
//...
}

2.批量confirm模式:每发送一批消息后,调用waitForConfirms()方法,等待服务端confirm

channel.confirmSelect();
for(inti=0;i<batchCount;++i){
channel.basicPublish(ConfirmConfig.exchangeName,ConfirmConfig.routingKey,MessageProperties.PERSISTENT_TEXT_PLAIN,ConfirmConfig.msg_10B.getBytes());
}
if(!channel.waitForConfirms()){
//消息发送失败
//...
}

3.异步confirm模式:提供一个回调方法,服务端confirm了一条或者多条消息后客户端会回调这个方法。

SortedSet<Long>confirmSet=Collections.synchronizedSortedSet(newTreeSet<Long>());
channel.confirmSelect();
channel.addConfirmListener(newConfirmListener(){
publicvoidhandleAck(longdeliveryTag,booleanmultiple)throwsIOException{
if(multiple){
confirmSet.headSet(deliveryTag+1).clear();
}else{
confirmSet.remove(deliveryTag);
}
}
publicvoidhandleNack(longdeliveryTag,booleanmultiple)throwsIOException{
System.out.println("Nack,SeqNo:"+deliveryTag+",multiple:"+multiple);
if(multiple){
confirmSet.headSet(deliveryTag+1).clear();
}else{
confirmSet.remove(deliveryTag);
}
}
});
while(true){
longnextSeqNo=channel.getNextPublishSeqNo();
channel.basicPublish(ConfirmConfig.exchangeName,ConfirmConfig.routingKey,MessageProperties.PERSISTENT_TEXT_PLAIN,ConfirmConfig.msg_10B.getBytes());
confirmSet.add(nextSeqNo);
}

RabbitMQ弄丢了数据

就是RabbitMQ自己弄丢了数据,这个必须开启RabbitMQ的持久化,就是消息写入之后会持久化到磁盘,哪怕是RabbitMQ自己宕机,恢复之后会自动读取之前存储的数据,一般数据不会丢。

除非极其罕见的是,RabbitMQ还没持久化,自己就宕机,可能导致少量数据丢失,但是这个概率较小。

设置持久化有两个步骤

  • 创建queue的时候将其设置为持久化。这样就可以保证RabbitMQ持久化queue的元数据,但是它是不会持久化queue里的数据。
  • 第二个是发送消息的时候将消息的deliveryMode设置为2。就是将消息设置为持久化的,此时RabbitMQ就会将消息持久化到磁盘上去。

必须要同时设置这两个持久化才行,RabbitMQ哪怕是宕机,再次重启,也会从磁盘上重启恢复queue,恢复这个queue里的数据。

注意:

虽然给RabbitMQ开启了持久化机制,但是也有可能这个消息写到了RabbitMQ中,但还没来得及持久化到磁盘上RabbitMQ就发生宕机,便会导致内存里的一点点数据丢失。

所以,持久化可以跟生产者那边的confirm机制配合起来,只有消息被持久化到磁盘之后,才会通知生产者ack了,所以若真是在持久化到磁盘之前,RabbitMQ发生了宕机,数据丢了,生产者收不到ack,也可以自己重发的。

消费端弄丢了数据

RabbitMQ如果丢失了数据,主要是因为消费的时候,刚消费到,还没处理,结果进程挂了,比如重启了,RabbitMQ认为都消费了,这数据就丢了。

这个时候得用RabbitMQ提供的ack机制。

简单来说,就是必须关闭RabbitMQ的自动ack,通过一个api来调用ack就行,然后每次你自己代码里确保处理完的时候,再在程序里ack一把。

这样的话,如果还没处理完就没有ack了。那么RabbitMQ就认为还没处理完,这个时候RabbitMQ会把这个消费分配给别的consumer去处理,消息是不会丢的。

为了保证消息从队列中可靠地到达消费者,RabbitMQ提供了消息确认机制。

消费者在声明队列时,可以指定noAck参数,当noAck=falseRabbitMQ会等待消费者显式发回ack信号后,才从内存(和磁盘,如果是持久化消息)中移去消息。否则,一旦消息被消费者消费,RabbitMQ会在队列中立即删除它。

image.png

Kafka

消费端弄丢了数据

唯一可能导致消费者弄丢数据的情况就是

当消费到了这个消息,然后消费者那边自动提交了offset,让Kafka以为已经消费好了这个消息,但其实你才刚准备处理这个消息,还没处理自己就宕机,此时这条消息就丢失了。

这其实跟RabbitMQ大同小异。都知道Kafka会自动提交offset,那么只要关闭自动提交offset,在处理完之后自己手动提交offset,便可以保证数据不会丢失了。

但是此时确实还是可能会有重复消费,比如刚处理完,还没提交offset,结果自己宕机,此时肯定会重复消费一次,那再自行保证幂等性就解决。

Kafka弄丢了数据

这是比较常见的一个场景,当Kafka某个broker宕机,然后重新选举partitionleader

要是此时其他的follower刚好还有些数据没有同步,结果此时leader也宕机了,然后选举某个followerleader之后就少了一些数据便会丢了一些数据的情况。

所以此时一般是要求起码设置如下4个参数:

  • 给topic设置replication.factor参数:这个值必须大于1,要求每个partition必须有至少2个副本。
  • 在Kafka服务端设置min.insync.replicas参数:这个值必须大于1,这个是要求一个leader至少感知到有至少一个follower还跟自己保持联系,这样才能确保leader挂了还有一个follower吧。
  • producer端设置acks=all:这个是要求每条数据,必须是写入所有replica之后,才能认为是写成功了
  • producer端设置retries=MAX(很大很大很大的一个值,无限次重试的意思):这个是要求一旦写入失败,就无限重试,卡在这里了。

生产者不会弄丢数据

如果按照上述的思路设置了acks=all,一定不会丢的前提是:

leader接收到消息,所有的follower都同步到了消息之后,才认为本次写成功了。如果没满足这个条件,生产者会自动不断的重试,重试无限次。

RocketMQ

消息丢失的场景

  • 生产者发送消息到MQ有可能丢失消息
  • MQ收到消息后写入硬盘可能丢失消息
  • 消息写入硬盘后,硬盘坏了丢失消息
  • 消费者消费MQ也可能丢失消息
  • 整个MQ节点挂了丢失消息

保证生产者发送消息时不丢失

解决发送时消息丢失的问题可以采用RocketMQ自带的事物消息机制

事物消息原理:

首先生产者会发送一个half消息(对原始消息的封装),该消息对消费者不可见的。

MQ通过ack机制返回消息接受状态,生产者执行本地事务并且返回给MQ一个状态(Commit、RollBack等),

如果是Commit的话MQ就会把消息给到下游。

如果是RollBack的话就会丢弃该消息。

如果为UnKnow的话会过一段时间回查本地事务状态,默认回查15次,一直是UnKnow状态的话就会丢弃此消息。

先发一个half消息的作用就是先判断下MQ有没有问题,服务正不正常。

保证MQ收到消息后写入硬盘不丢失

数据存盘绕过缓存,改为同步刷盘,这一步需要修改Broker的配置文件,将flushDiskType改为SYNC_FLUSH同步刷盘策略,默认的是ASYNC_FLUSH异步刷盘,一旦同步刷盘返回成功,那么就一定保证消息已经持久化到磁盘中了。

保证消息写入硬盘后,硬盘坏了不丢失

为了保证磁盘损坏导致丢失数据,RocketMQ采用主从机构,集群部署,Leader中的数据在多个Follower中都存有备份,防止单点故障导致数据丢失。

Master节点挂了之后DLedger登场

  • 接管MQcommitLog
  • 选举从节点
  • 文件复制uncommited状态多半从节点收到之后改为commited

保证消费者消费MQ不丢失

  • 如果是网络问题导致的消费失败可以进行重试机制,默认每条消息重试16次
  • 多线程异步消费失败,MQ认为已经消费成功但是实际上对于业务逻辑来说消息是没有落地的,解决方案就是按照mq官方推荐的先执行本地事务再返回成功状态。

保证整个MQ节点宕机数据不丢失

这种极端情况可以消息发送失败之后先存入本地,例如放到缓存中,另外启动一个线程扫描缓存的消息去重试发送。



目录
相关文章
|
1月前
|
监控 安全 中间件
中间件在数据传输的可靠性
中间件保障数据传输可靠性,关注数据完整性(通过校验和验证)、一致性(一致性协议,处理并发控制)、错误处理(重试、故障转移)、安全性(加密、认证、访问控制)、性能优化(减少延迟、提高吞吐量)及监控日志,确保分布式系统中数据的稳定、安全传输。
32 4
|
1月前
|
消息中间件 存储 监控
|
1月前
|
网络协议 算法 网络性能优化
|
1月前
|
消息中间件 存储 运维
|
13天前
|
消息中间件 Kafka API
深入解析Kafka消息传递的可靠性保证机制
深入解析Kafka消息传递的可靠性保证机制
16 0
|
1月前
|
中间件 网络性能优化
中间件数据传输重传机制
中间件数据传输重传机制保障分布式系统中数据的可靠传输,关键点包括确认应答(发送方等待接收方ACK)、超时重传(设定数据包超时时间)、序列号与窗口控制(有序重组及提高效率)、流量与拥塞控制(避免接收方缓冲区溢出和网络拥塞)、错误检测(使用校验和等检测并重传错误数据包)、日志重试策略(记录失败信息并动态调整策略)以及备份容错(使用备份服务器保证数据可用性)。这些机制确保数据在复杂网络环境下的完整性和一致性。
26 3
|
9月前
|
消息中间件 存储 Kafka
如何保证MQ中消息的可靠性传输?
如何保证MQ中消息的可靠性传输?
83 1
|
1月前
|
消息中间件 存储 负载均衡
【mq】如何保证消息可靠性
【mq】如何保证消息可靠性
87 0
|
11月前
|
消息中间件
如何保证消息的可靠性,避免消息丢失
如何保证消息的可靠性,避免消息丢失
62 0
|
1月前
|
缓存
流量控制&可靠传输机制&停止-等待协议
流量控制&可靠传输机制&停止-等待协议
33 0