如何保证 RocketMQ 不丢失消息

简介: 如何保证 RocketMQ 不丢失消息

消息的发送流程


一条消息从生产到被消费,将会经历三个阶段:


20210327104432414.jpg

  • 生产阶段,Producer 新建消息,然后通过网络将消息投递给 MQ Broker
  • 存储阶段,消息将会存储在 Broker 端磁盘中
  • 消息阶段, Consumer 将会从 Broker 拉取消息


以上任一阶段都可能会丢失消息,我们只要找到这三个阶段丢失消息原因,采用合理的办法避免丢失,就可以彻底解决消息丢失的问题。


生产阶段

生产者(Producer) 通过网络发送消息给 Broker,当 Broker 收到之后,将会返回确认响应信息给 Producer。所以生产者只要接收到返回的确认响应,就代表消息在生产阶段未丢失。


RocketMQ 发送消息示例代码如下:

20210327104649197.jpg

send 方法是一个同步操作,只要这个方法不抛出任何异常,就代表消息已经发送成功。

消息发送成功仅代表消息已经到了 Broker 端,Broker 在不同配置下,可能会返回不同响应状态:


  • SendStatus.SEND_OK
  • SendStatus.FLUSH_DISK_TIMEOUT
  • SendStatus.FLUSH_SLAVE_TIMEOUT
  • SendStatus.SLAVE_NOT_AVAILABLE


引用官方状态说明:

20210327104916463.jpg

上图中不同 broker 端配置将会在下文详细解释


另外 RocketMQ 还提供异步的发送的方式,适合于链路耗时较长,对响应时间较为敏感的业务场景。

20210327105045436.jpg


异步发送消息一定要注意重写回调方法,在回调方法中检查发送结果。


不管是同步还是异步的方式,都会碰到网络问题导致发送失败的情况。针对这种情况,我们可以设置合理的重试次数,当出现网络问题,可以自动重试。设置方式如下:


20210327105140598.jpg

Broker 存储阶段

默认情况下,消息只要到了 Broker 端,将会优先保存到内存中,然后立刻返回确认响应给生产者。随后 Broker 定期批量的将一组消息从内存异步刷入磁盘。


这种方式减少 I/O 次数,可以取得更好的性能,但是如果发生机器掉电,异常宕机等情况,消息还未及时刷入磁盘,就会出现丢失消息的情况。


若想保证 Broker 端不丢消息,保证消息的可靠性,我们需要将消息保存机制修改为同步刷盘方式,即消息存储磁盘成功,才会返回响应。


修改 Broker 端配置如下:


默认情况为 ASYNC_FLUSH flushDiskType = SYNC_FLUSH


若 Broker 未在同步刷盘时间内(默认为 5s)完成刷盘,将会返回 SendStatus.FLUSH_DISK_TIMEOUT 状态给生产者。


集群部署

为了保证可用性,Broker 通常采用一主(master)多从(slave)部署方式。为了保证消息不丢失,消息还需要复制到 slave 节点。


默认方式下,消息写入 master成功,就可以返回确认响应给生产者,接着消息将会异步复制到 slave 节点。


注:master 配置:flushDiskType = SYNC_FLUSH


此时若 master 突然宕机且不可恢复,那么还未复制到 slave的消息将会丢失。


为了进一步提高消息的可靠性,我们可以采用同步的复制方式,master节点将会同步等待 slave 节点复制完成,才会返回确认响应。


异步复制与同步复制区别如下图:

20210327173623625.jpg


注: 大家不要被上图误导,broker master 只能配置一种复制方式,上图只为解释同步复制的与异步复制的概念。


Broker master 节点 同步复制配置如下:


默认为 ASYNC_MASTER brokerRole=SYNC_MASTER


如果 slave节点未在指定时间内同步返回响应,生产者将会收到SendStatus.FLUSH_SLAVE_TIMEOUT 返回状态。


小结


结合生产阶段与存储阶段,若需要严格保证消息不丢失,broker 需要采用如下配置:


20210327173822478.jpg

同时这个过程我们还需要生产者配合,判断返回状态是否是 SendStatus.SEND_OK。若是其他状态,就需要考虑补偿重试。


虽然上述配置提高消息的高可靠性,但是会降低性能,生产实践中需要综合选择。


消费阶段

消费者从 broker 拉取消息,然后执行相应的业务逻辑。一旦执行成功,将会返回 ConsumeConcurrentlyStatus.CONSUME_SUCCESS 状态给 Broker。


如果 Broker 未收到消费确认响应或收到其他状态,消费者下次还会再次拉取到该条消息,进行重试。这样的方式有效避免了消费者消费过程发生异常,或者消息在网络传输中丢失的情况。


消息消费的代码如下:


20210327174000479.jpg

以上消费消息过程的,我们需要注意返回消息状态。只有当业务逻辑真正执行成功,我们才能返回 ConsumeConcurrentlyStatus.CONSUME_SUCCESS。否则我们需要返回 ConsumeConcurrentlyStatus.RECONSUME_LATER,稍后再重试。


总结


RocketMQ 不丢消息处理办法,跟kafka,两者解决思路是一样的,区别就是参数配置不一样而已。


所以下一次,面试官再问你 XX 消息队列如何保证不丢消息?如果你没用过这个消息队列,也不要哭,微笑面对他,从容给他分析那几步会丢失,然后大致解决思路。


虽然提高消息可靠性,但是可能导致消息重发,重复消费。所以对于消费客户端,需要注意保证幂等性。

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
消息中间件 算法 Java
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
627 1
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
|
消息中间件 Apache RocketMQ
《万亿级数据洪峰下的消息引擎——Apache RocketMQ》电子版地址
万亿级数据洪峰下的消息引擎——Apache RocketMQ
316 0
《万亿级数据洪峰下的消息引擎——Apache RocketMQ》电子版地址
|
消息中间件 存储 缓存
RocketMQ Schema——让消息成为流动的结构化数据
RocketMQ Schema 提供了对消息的数据结构托管服务,同时为原生客户端提供了较为丰富的序列化/反序列化 SDK ,补齐了 RocketMQ 在数据治理和业务上下游解耦方面的短板,让数据成为流动的结构化数据,那么快来了解下实现原理吧~
473 0
RocketMQ Schema——让消息成为流动的结构化数据
|
消息中间件 存储 缓存
简述RocketMQ消息拉取过程【二】
简述RocketMQ消息拉取过程【二】
940 1
|
消息中间件 RocketMQ
简述RocketMQ消息拉取过程【一】
简述RocketMQ消息拉取过程【一】
651 0
|
消息中间件 缓存 负载均衡
RocketMQ消息生产者是如何选择Broker的
RocketMQ消息生产者是如何选择Broker的
477 1
|
消息中间件 存储 前端开发
同步异步调用,并谈谈消息队列mq;RocketMQ发送消息和消费消息测试类
同步调用优点: 时效性强,打电话、直播,很快可以得到结果 同步调用的问题:
498 1
|
消息中间件 存储 Java
10 张图告诉你 RocketMQ 是怎样保存消息的
10 张图告诉你 RocketMQ 是怎样保存消息的
198 0
10 张图告诉你 RocketMQ 是怎样保存消息的
|
消息中间件 存储 uml
5 张图带你彻底理解 RocketMQ 轨迹消息
5 张图带你彻底理解 RocketMQ 轨迹消息
406 0
5 张图带你彻底理解 RocketMQ 轨迹消息
|
消息中间件 uml RocketMQ
3 张图带你彻底理解 RocketMQ 事务消息
3 张图带你彻底理解 RocketMQ 事务消息
67605 2
3 张图带你彻底理解 RocketMQ 事务消息