四、RabbitMQ如何保证消息丢失

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: 四、RabbitMQ如何保证消息丢失

RabbitMQ消息丢失的情况

第一种:生产者弄丢了数据。生产者将数据发送到 RabbitMQ 的时候,可能数据就在半路给搞丢了,因为网络问题啥的,都有可能。

第二种:RabbitMQ 弄丢了数据。MQ还没有持久化自己挂了
第三种:消费端弄丢了数据。刚消费到,还没处理,结果进程挂了,比如重启了。

解决方案

一:针对生产者
方案1.开启RabbitMQ事务
可以选择用 RabbitMQ 提供的事务功能,就是生产者发送数据之前开启 RabbitMQ 事务channel.txSelect,然后发送消息,如果消息没有成功被 RabbitMQ 接收到,那么生产者会收到异常报错,此时就可以回滚事务channel.txRollback,然后重试发送消息;如果收到了消息,那么可以提交事务channel.txCommit。

// 开启事务
channel.txSelect
    try {
          // 这里发送消息
    } catch (Exception e) {
          channel.txRollback
    // 这里再次重发这条消息
    }
// 提交事务
channel.txCommit

缺点:
但是问题是,RabbitMQ 事务机制是同步的,你提交一个事务之后会阻塞在那儿,采用这种方式基本上吞吐量会下来,因为太耗性能。

方案2 使用confirm机制
事务机制和 confirm 机制最大的不同在于,事务机制是同步的,你提交一个事务之后会阻塞在那儿,但是 confirm 机制是异步的,你发送个消息之后就可以发送下一个消息,然后那个消息 RabbitMQ 接收了之后会异步回调你的一个接口通知你这个消息接收到了。

二、针对RabbitMQ

1.消息持久化

2.设置集群镜像模式

3.消息补偿机制

第一种:消息持久化

RabbitMQ 的消息默认存放在内存上面,如果不特别声明设置,消息不会持久化保存到硬盘上面的,如果节点重启或者意外crash掉,消息就会丢失。

所以就要对消息进行持久化处理。如何持久化,下面具体说明下:

要想做到消息持久化,必须满足以下三个条件,缺一不可。

1) Exchange 设置持久化

2)Queue 设置持久化

3)Message持久化发送:发送消息设置发送模式deliveryMode=2,代表持久化消息

第二种:设置集群镜像模式

我们先来介绍下RabbitMQ三种部署模式:

1)单节点模式:最简单的情况,非集群模式,节点挂了,消息就不能用了。业务可能瘫痪,只能等待。
2)普通模式:默认的集群模式,某个节点挂了,该节点上的消息不能用,有影响的业务瘫痪,只能等待节点恢复重启可用(必须持久化消息情况下)。
3)镜像模式:把需要的队列做成镜像队列,存在于多个节点,属于RabbitMQ的HA方案

为什么设置镜像模式集群,因为队列的内容仅仅存在某一个节点上面,不会存在所有节点上面,所有节点仅仅存放消息结构和元数据。下面自己画了一张图介绍普通集群丢失消息情况:

如果想解决上面途中问题,保证消息不丢失,需要采用HA 镜像模式队列。

下面介绍下三种HA策略模式:

1)同步至所有的
2)同步最多N个机器
3)只同步至符合指定名称的nodes

命令处理HA策略模版:rabbitmqctl set_policy [-p Vhost] Name Pattern Definition [Priority]

1)为每个以“rock.wechat”开头的队列设置所有节点的镜像,并且设置为自动同步模式

rabbitmqctl set_policy ha-all "^rock.wechat" '{"ha-mode":"all","ha-sync-mode":"automatic"}'
rabbitmqctl set_policy -p rock ha-all "^rock.wechat" '{"ha-mode":"all","ha-sync-mode":"automatic"}'

2)为每个以“rock.wechat.”开头的队列设置两个节点的镜像,并且设置为自动同步模式

rabbitmqctl set_policy -p rock ha-exacly "^rock.wechat" \
'{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'

3)为每个以“node.”开头的队列分配指定的节点做镜像

rabbitmqctl set_policy ha-nodes "^nodes\." \
'{"ha-mode":"nodes","ha-params":["rabbit@nodeA", "rabbit@nodeB"]}'

但是:HA 镜像队列有一个很大的缺点就是:   系统的吞吐量会有所下降

第三种:消息补偿机制

为什么还要消息补偿机制呢?难道消息还会丢失,没错,系统是在一个复杂的环境,不要想的太简单了,虽然以上的三种方案,基本可以保证消息的高可用不丢失的问题,

但是作为有追求的程序员来讲,要绝对保证我的系统的稳定性,有一种危机意识。

比如:持久化的消息,保存到硬盘过程中,当前队列节点挂了,存储节点硬盘又坏了,消息丢了,怎么办?

产线网络环境太复杂,所以不知数太多,消息补偿机制需要建立在消息要写入DB日志,发送日志,接受日志,两者的状态必须记录。

然后根据DB日志记录check 消息发送消费是否成功,不成功,进行消息补偿措施,重新发送消息处理。

三、针对消费者

ACK确认机制

多个消费者同时收取消息,比如消息接收到一半的时候,一个消费者死掉了(逻辑复杂时间太长,超时了或者消费被停机或者网络断开链接),如何保证消息不丢?

这个使用就要使用Message acknowledgment 机制,就是消费端消费完成要通知服务端,服务端才把消息从内存删除。

这样就解决了,及时一个消费者出了问题,没有同步消息给服务端,还有其他的消费端去消费,保证了消息不丢的case。

相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
机器学习/深度学习 编解码 算法
yolov1到v8的变化
yolov1到v8的变化
295 1
|
9月前
|
人工智能 自然语言处理 算法
主动式智能导购 AI 助手解决方案实践与测评
主动式智能导购 AI 助手解决方案实践与测评
|
11月前
|
存储 关系型数据库 MySQL
介绍一下MySQL的一些应用场景
【10月更文挑战第17天】介绍一下MySQL的一些应用场景
2329 0
|
12月前
|
JavaScript 前端开发 小程序
一小时入门Vue.js前端开发
本文是作者关于Vue.js前端开发的快速入门教程,包括结果展示、参考链接、注意事项以及常见问题的解决方法。文章提供了Vue.js的基础使用介绍,如何安装和使用cnpm,以及如何解决命令行中遇到的一些常见问题。
612 5
一小时入门Vue.js前端开发
|
监控 供应链 BI
ERP系统中的现金流管理与资产负债管理解析
【7月更文挑战第25天】 ERP系统中的现金流管理与资产负债管理解析
340 2
|
缓存 Java
java把InputStream流写入到文件中
java把InputStream流写入到文件中
411 0
|
安全 关系型数据库 分布式数据库
PolarDB 的安全性和合规性措施
【8月更文第27天】随着云计算技术的不断发展,企业对云数据库的安全性和合规性的需求日益增长。阿里云的 PolarDB 作为一款高度兼容 MySQL、PostgreSQL 和 Oracle 的关系型数据库服务,提供了强大的安全保护和合规性支持。本文将详细探讨 PolarDB 如何确保数据安全,并符合各种法规要求。
394 0
|
11月前
|
SQL 监控 数据库
SQL语句性能分析技巧与方法
在数据库管理中,分析SQL语句的性能是优化数据库查询、提升系统响应速度的重要步骤
|
druid Java 数据库连接
SpringBoot + Mybatis + Druid + PageHelper 实现多数据源分页
SpringBoot + Mybatis + Druid + PageHelper 实现多数据源分页
588 0
|
人工智能 NoSQL atlas
Atlas Vector Search:借助语义搜索和 AI 针对任何类型的数据构建智能应用
一切才刚刚开始,MongoDB 致力于提供优秀的开发者数据平台,助力开发者打造新一代 AI 赋能的应用
2926 2