RabbitMQ消息丢失的场景,如何保证消息不丢失?(详细讲解,一文看懂)

简介: RabbitMQ消息丢失的场景,如何保证消息不丢失?(详细讲解,一文看懂)

一、RabbitMQ相关概念


2007 年发布,是一个在 AMQP(高级消息队列协议)基础上完成的,可复用的企业消息系统,是当前最主流的消息中间件之一。

RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue 高级消息队列协议 )的开源实现,由于erlang 语言的高并发特性,性能较好,本质是个队列,FIFO 先入先出,里面存放的内容是message

RabbitMQ 是一个消息中间件:它接收消息并且转发,就类似于一个快递站,卖家把快递通过快递站,送到我们的手上,MQ也是这样,接收并存储消息,再转发。


二、RabbitMQ消息丢失的三种情况

image.png

第一种:生产者弄丢了数据。生产者将数据发送到 RabbitMQ 的时候,可能数据就在半路给搞丢了,因为网络问题啥的,都有可能。

第二种:RabbitMQ 弄丢了数据。MQ还没有持久化自己挂了

第三种:消费端弄丢了数据。刚消费到,还没处理,结果进程挂了,比如重启了。


三、RabbitMQ消息丢失解决方案

image.png

1.针对生产者

方案1 :开启RabbitMQ事务可以选择用 RabbitMQ 提供的事务功能,就是生产者发送数据之前开启 RabbitMQ 事务channel.txSelect,然后发送消息,如果消息没有成功被 RabbitMQ 接收到,那么生产者会收到异常报错,此时就可以回滚事务channel.txRollback,然后重试发送消息;如果收到了消息,那么可以提交事务channel.txCommit。


// 开启事务
channel.txSelect
try {
      // 这里发送消息
} catch (Exception e) {
      channel.txRollback
// 这里再次重发这条消息
}
// 提交事务
channel.txCommit

缺点:RabbitMQ 事务机制是同步的,你提交一个事务之后会阻塞在那儿,采用这种方式基本上吞吐量会下来,因为太耗性能。

方案2: 使用confirm机制

事务机制和 confirm 机制最大的不同在于,事务机制是同步的,你提交一个事务之后会阻塞在那儿,但是 confirm 机制是异步的

在生产者开启了confirm模式之后,每次写的消息都会分配一个唯一的id,然后如果写入了rabbitmq之中,rabbitmq会给你回传一个ack消息,告诉你这个消息发送OK了;如果rabbitmq没能处理这个消息,会回调你一个nack接口,告诉你这个消息失败了,你可以进行重试。而且你可以结合这个机制知道自己在内存里维护每个消息的id,如果超过一定时间还没接收到这个消息的回调,那么你可以进行重发。


//开启confirm
    channel.confirm();
    //发送成功回调
    public void ack(String messageId){
    }
    // 发送失败回调
    public void nack(String messageId){
        //重发该消息
    }

2.针对RabbitMQ

说三点:

(1)要保证rabbitMQ不丢失消息,那么就需要开启rabbitMQ的持久化机制,即把消息持久化到硬盘上,这样即使rabbitMQ挂掉在重启后仍然可以从硬盘读取消息;

(2)如果rabbitMQ单点故障怎么办,这种情况倒不会造成消息丢失,这里就要提到rabbitMQ的3种安装模式,单机模式、普通集群模式、镜像集群模式,这里要保证rabbitMQ的高可用就要配合HAPROXY做镜像集群模式

(3)如果硬盘坏掉怎么保证消息不丢失

(1)消息持久化

RabbitMQ 的消息默认存放在内存上面,如果不特别声明设置,消息不会持久化保存到硬盘上面的,如果节点重启或者意外crash掉,消息就会丢失。

所以就要对消息进行持久化处理。如何持久化,下面具体说明下:

要想做到消息持久化,必须满足以下三个条件,缺一不可。

1) Exchange 设置持久化

2)Queue 设置持久化

3)Message持久化发送:发送消息设置发送模式deliveryMode=2,代表持久化消息

(2)设置集群镜像模式

我们先来介绍下RabbitMQ三种部署模式:

1)单节点模式:最简单的情况,非集群模式,节点挂了,消息就不能用了。业务可能瘫痪,只能等待。

2)普通模式:消息只会存在与当前节点中,并不会同步到其他节点,当前节点宕机,有影响的业务会瘫痪,只能等待节点恢复重启可用(必须持久化消息情况下)。

3)镜像模式:消息会同步到其他节点上,可以设置同步的节点个数,但吞吐量会下降。属于RabbitMQ的HA方案

为什么设置镜像模式集群,因为队列的内容仅仅存在某一个节点上面,不会存在所有节点上面,所有节点仅仅存放消息结构和元数据。下面自己画了一张图介绍普通集群丢失消息情况:

image.png

如果想解决上面途中问题,保证消息不丢失,需要采用HA 镜像模式队列。

下面介绍下三种HA策略模式

1)同步至所有的

2)同步最多N个机器

3)只同步至符合指定名称的nodes

命令处理HA策略模版:rabbitmqctl set_policy [-p Vhost] Name Pattern Definition [Priority]

1)为每个以“rock.wechat”开头的队列设置所有节点的镜像,并且设置为自动同步模式 rabbitmqctl set_policy ha-all "^rock.wechat" '{"ha-mode":"all","ha-sync-mode":"automatic"}' rabbitmqctl set_policy -p rock ha-all "^rock.wechat" '{"ha-mode":"all","ha-sync-mode":"automatic"}'

2)为每个以“rock.wechat.”开头的队列设置两个节点的镜像,并且设置为自动同步模式 rabbitmqctl set_policy -p rock ha-exacly "^rock.wechat"

'{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'

3)为每个以“node.”开头的队列分配指定的节点做镜像 rabbitmqctl set_policy ha-nodes "^nodes."

'{"ha-mode":"nodes","ha-params":["rabbit@nodeA", "rabbit@nodeB"]}'

但是:HA 镜像队列有一个很大的缺点就是:系统的吞吐量会有所下降

(3)消息补偿机制

为什么还要消息补偿机制呢?难道消息还会丢失,没错,系统是在一个复杂的环境,不要想的太简单了,虽然以上的三种方案,基本可以保证消息的高可用不丢失的问题,

但是作为有追求的程序员来讲,要绝对保证我的系统的稳定性,有一种危机意识。

比如:持久化的消息,保存到硬盘过程中,当前队列节点挂了,存储节点硬盘又坏了,消息丢了,怎么办?

1)生产端首先将业务数据以及消息数据入库,需要在同一个事务中,消息数据入库失败,则整体回滚。

image.png

2)根据消息表中消息状态,失败则进行消息补偿措施,重新发送消息处理。

image.png

3.针对消费者

方案一:ACK确认机制

多个消费者同时收取消息,比如消息接收到一半的时候,一个消费者死掉了(逻辑复杂时间太长,超时了或者消费被停机或者网络断开链接),如何保证消息不丢?

使用rabbitmq提供的ack机制,服务端首先关闭rabbitmq的自动ack,然后每次在确保处理完这个消息之后,在代码里手动调用ack。这样就可以避免消息还没有处理完就ack。才把消息从内存删除。

这样就解决了,即使一个消费者出了问题,但不会同步消息给服务端,会有其他的消费端去消费,保证了消息不丢的case。


四、总结

image.png

如果需要保证消息在整条链路中不丢失,那就需要生产端、mq自身与消费端共同去保障。

生产端:对生产的消息进行状态标记,开启confirm机制,依据mq的响应来更新消息状态,使用定时任务重新投递超时的消息,多次投递失败进行报警。

mq自身:开启持久化,并在落盘后再进行ack。如果是镜像部署模式,需要在同步到多个副本之后再进行ack。

消费端:开启手动ack模式,在业务处理完成后再进行ack,并且需要保证幂等。

通过以上的处理,理论上不存在消息丢失的情况,但是系统的吞吐量以及性能有所下降。

在实际开发中,需要考虑消息丢失的影响程度,来做出对可靠性以及性能之间的权衡。


相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
1月前
|
消息中间件 存储 数据库
RocketMQ 流存储解析:面向流场景的关键特性与典型案例
RocketMQ 流存储解析:面向流场景的关键特性与典型案例
88363 0
|
6月前
|
消息中间件 存储 canal
3分钟白话RocketMQ系列—— 如何保证消息不丢失
3分钟白话RocketMQ系列—— 如何保证消息不丢失
1963 0
|
7月前
|
消息中间件 存储 Kafka
如何保证MQ消息队列的高可用?
如何保证MQ消息队列的高可用?
213 0
|
5月前
|
消息中间件 弹性计算 Java
使用阿里云性能测试工具 JMeter 场景压测 RocketMQ 最佳实践
使用阿里云性能测试工具 JMeter 场景压测 RocketMQ 最佳实践
|
1月前
|
消息中间件 存储 数据库
深度剖析 RocketMQ 5.0,流存储:流场景的诉求是什么?
本文将从使用的角度出发,来更详细的展示一下流存储的场景,看看它和业务消息的场景有哪些区别。 RocketMQ 5.0 面向流存储的场景,提供了哪些特性。再结合两个数据集成的案例,来帮助大家了解流存储的用法。
3358 2
|
1月前
|
消息中间件 SQL 容灾
深度剖析 RocketMQ 5.0,消息进阶:如何支撑复杂业务消息场景?
本文主要学习 RocketMQ 的一致性特性,一致性对于交易、金融都是刚需。从大规模复杂业务出发,学习 RocketMQ 的 SQL 订阅、定时消息等特性。再从高可用的角度来看,这里更多的是大型公司对于高阶可用性的要求,如同城容灾、异地多活等。
108173 287
|
1月前
|
消息中间件 Cloud Native 物联网
深度剖析 RocketMQ 5.0,消息基础:RocketMQ 在业务消息场景的基础优势是什么?
本文主要介绍业务消息的应用解耦场景,具体解耦什么? RocketMQ 在业务消息场景的基础特性。业界那么多消息队列能实现应用解耦,RocketMQ 在基础特性上有哪些增强?
125285 2
深度剖析 RocketMQ 5.0,消息基础:RocketMQ 在业务消息场景的基础优势是什么?
|
1月前
|
消息中间件 存储 Cloud Native
深度剖析 RocketMQ 5.0,架构解析:云原生架构如何支撑多元化场景?
了解 RocketMQ 5.0 的核心概念和架构概览;然后我们会从集群角度出发,从宏观视角学习 RocketMQ 的管控链路、数据链路、客户端和服务端如何交互;学习 RocketMQ 如何实现数据的存储,数据的高可用,如何利用云原生存储进一步提升竞争力。
140059 2
|
2月前
|
消息中间件 存储 Apache
精华推荐 | 【深入浅出RocketMQ原理及实战】「性能原理挖掘系列」透彻剖析贯穿RocketMQ的事务性消息的底层原理并在分析其实际开发场景
事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败。RocketMQ的事务消息提供类似 X/Open XA 的分布事务功能,通过事务消息能达到分布式事务的最终一致。
370 2
精华推荐 | 【深入浅出RocketMQ原理及实战】「性能原理挖掘系列」透彻剖析贯穿RocketMQ的事务性消息的底层原理并在分析其实际开发场景
|
3月前
|
消息中间件 存储 缓存
【面试问题】MQ 如何保证消息的顺序性?
【1月更文挑战第27天】【面试问题】MQ 如何保证消息的顺序性?

热门文章

最新文章