【RocketMQ系列十三】RocketMQ的集群核心概念之消费重试&死信队列&幂等消息的出现以及处理

简介: 【RocketMQ系列十三】RocketMQ的集群核心概念之消费重试&死信队列&幂等消息的出现以及处理

1. 消费重试

消息重试是指消费者在消费某条消息失败之后,RocketMQ服务端会根据重试策略重新消费该消息,若超过最大重试次数还未消费成功则不在进行重新消费,而是直接将该消息发送到死信队列中。

1.1. 消费重试应用场景

消息重试主要是为了解决偶发性的消息消费失败导致的消费完整性问题,这些消费失败的原因包括业务处理逻辑的问题,网络抖动问题。

消费重试应用场景主要有两个:

  1. 业务处理失败,且失败的原因跟当前的消息内容相关,比如该消息对应的事务状态还未获取到,预期一段时间后可执行成功。
  2. 消息失败的原因是偶发性的,比如由于网络抖动,消费者消费时宕机等偶发性的问题导致的失败,后续的消息大概率会消费成功。

不要把消息失败来作为条件判断的结果分流,也不要通过使用消息失败来对处理速率限流。

1.2. 消费重试的原理

消费重试的状态机如下图所示:会重试的消息可能会经历如下四种状态。

  1. Ready: 已就绪状态,消息在RocketMQ服务端中准备就绪,可以被消费者消费
  2. Inflight:处理中状态,消息正在被消费者获取,处于消费中还没返回消费结果的状态。
  3. Commit: 提交状态:消息被消费者消费成功,消费者返回成功响应,消息会结束重试。
  4. Wait  Retry:  待重试状态:PushComsumer独有的状态,当消费者消费失败或者消费超时,会触发消费重试机制。如果当前重试次数未达到最大重试次数,则该消息会变成待重试状态,经过重试间隔后,消息将重新变为已就绪状态可被重新消费。多次重试之间,可通过重试间隔进行延长,防止无效高频的失败。
  5. DLQ:死信队列:当消息消费失败,并且消费重试的次数超过最大重试次数(默认是16次)之后,RocketMQ服务端会结束该消息的重试,并且将该消息直接发送到死信队列中。
1.2.1. 消息重试触发的条件
  1. 消费失败:

当消息消费失败就会触发消费重试,即消费者没有向RocketMQ服务端返回offset的情况下都被认为是消费失败。都会触发消费重试。

对应的代码没有返回 CONSUME_SUCCESS 的状态是:

// 4.创建一个回调函数
    consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
      // 5.处理消息
      for (MessageExt msg : msgs) {
        System.out.println(msg);
        System.out.println("收到的消息内容:" + new String(msg.getBody()));
      }
      // 1. 消费监听返回null则会消费重试
      return null;
        //2.消费监听返回RECONSUME_LATER也会消费重试
             return ConsumeConcurrentlyStatus.RECONSUME_LATER;
    });
  1. 消息处理超时,包括在PushConsumer中排队超时。

1.3. 消费重试次数

RocketMQ  会为每个消费组都设置一个Topic名称为"%RETRY%+consumerGroup"的重试队列(这里需要注意的是,这个Topic的重试队列是针对消费者组,而不是针对每个Topic设置的),用于暂时保存因为各种异常而消费失败的消息。考虑到异常恢复起来需要一些时间,会为重试队列设置多个重试级别,每个重试级别都有与之对应的重试间隔,随着重试次数的增多,重试间隔也会越来越大。

RocketMQ对于重试消息的处理是先保存至Topic名为 “SCHEDULE_TOPIC_XXXX” 的延迟队列,后台定时任务按照对应的时间进行Delay后重新保存至 “%RETRY%+consumerGroup” 的重试队列中。

与延迟队列的设置相同,消息默认会重试16次,每次重试的时间间隔如下:

10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

2. 死信队列

前面说某个消息被重试超过最大重试次数16次之后,则会被直接发送到死信队列中。也就是说死信队列用来存放的是无法被正常消费的消息。

RocketMQ将这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),  将存放死信消息的队列称为死信队列(Dead-Letter  Queue)。可以使用Console控制台对死信队列里的消息进行重发来使得消费者可以进行重新消费。

死信队列具备如下特点:

  1. RocketMQ会自动为需要死信队列的消费者组创建死信队列。
  2. 死信队列与消费者组对应,死信队列中包含该消费者组所有相关Topic的死信消息。
  3. 死信队列中消息的有效期与正常消息相同,默认48小时。
  4. 若要消费死信队列中的消息,需要在控制台将死信队列的权限设置为6,即可读可写。

3. 消息幂等问题的出现

幂等的定义:幂等性指的是多次操作造成的结果是一致的。在http接口中查询操作是幂等的,

新增操作:非幂等的,每次都会插入新数据

更新操作:幂等的,对同样的数据进行修改

删除操作:根据id删除是幂等的。

那么,非幂等的操作如何保证幂等性呢?

消息队列中,很可能会存在一条消息被重复发送,或者一条消息被多个消费者消费。对于像用户注册等非幂等操作,就需要做幂等性保证。可以将情况概况为如下几种:

  1. 生产者重复发送消息:由于网络抖动,导致生产者没有收到broker的ack消息而重发消息,从而造成消息队列中消息重复。
  2. 消费者重复消费消息:由于网络抖动,导致消费者没有返回ack给broker,导致消费者重复消费。
  3. rebalance时的重复消费:由于网络抖动,在rebalance重分配时也可能会出现消费者重复消费某条消息的情况。

4. 如何保证幂等性消费呢?

  1. mysql 插入业务id作为主键,主键是唯一的,所以一次只能插入一条
  2. 使用Redis或zk的分布式锁(主流的方案)

比如在创建订单场景下,我们在发送消息的时候传入orderId作为业务唯一ID。当消息重复发送或者重复消息的时候可以根据订单ID 来做一个逻辑判断。

为了防止两个消费者同时消费相同重复消息的情况,这时候可以在OderId上加上分布式锁,保证同一时间内相同的消息只能有一个消费者消费。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
ly~
|
3月前
|
消息中间件 存储 监控
如何查看 RocketMQ 消息的重试次数和时间间隔?
RocketMQ消息重试次数和时间间隔可通过查看消费者和Broker日志、使用管理控制台的监控页面和消息查询功能,或通过分析消费者代码和RocketMQ客户端库代码等方式获取。日志中常有消费失败重试的明确记录,控制台可监控消费情况推断重试状态,代码分析则适合技术用户深入了解。
ly~
291 3
|
15天前
|
消息中间件 存储 运维
2024最全RabbitMQ集群方案汇总
本文梳理了RabbitMQ集群的几种方案,主要包括普通集群、镜像集群(高可用)、Quorum队列(仲裁队列)、Streams集群模式(高可用+负载均衡)和插件方式。重点介绍了每种方案的特点、优缺点及适用场景。搭建步骤包括安装Erlang和RabbitMQ、配置集群节点、修改hosts文件、配置Erlang Cookie、启动独立节点并创建集群,以及配置镜像队列以提高可用性和容错性。推荐使用Quorum队列与Streams模式,其中Quorum队列适合高可用集群,Streams模式则同时支持高可用和负载均衡。此外,还有Shovel和Federation插件可用于特定场景下的集群搭建。
115 2
|
15天前
|
消息中间件 RocketMQ
2024最全RocketMQ集群方案汇总
在研究RocketMQ集群方案时,发现网上存在诸多不一致之处,如组件包含NameServer、Broker、Proxy等。通过查阅官方文档,了解到v4.x和v5.x版本的差异。v4.x部署模式包括单主、多主、多主多从(异步复制、同步双写),而v5.x新增Local与Cluster模式,主要区别在于Broker和Proxy是否同进程部署。Local模式适合平滑升级,Cluster模式适合高可用需求。不同模式下,集群部署方案大致相同,涵盖单主、多主、多主多从等模式,以满足不同的高可用性和性能需求。
80 0
|
3月前
|
消息中间件 存储 监控
RabbitMQ 队列之战:Classic 和 Quorum 的性能洞察
RabbitMQ 是一个功能强大的消息代理,用于分布式应用程序间的通信。它通过队列临时存储消息,支持异步通信和解耦。经典队列适合高吞吐量和低延迟场景,而仲裁队列则提供高可用性和容错能力,适用于关键任务系统。选择哪种队列取决于性能、持久性和容错性的需求。
232 6
|
4月前
|
消息中间件 JSON Java
|
5月前
|
消息中间件 存储 负载均衡
|
4月前
|
消息中间件
rabbitmq,&队列
rabbitmq,&队列
|
5月前
|
消息中间件 存储 NoSQL
MQ的顺序性保证:顺序队列、消息编号、分布式锁,一文全掌握!
【8月更文挑战第24天】消息队列(MQ)是分布式系统的关键组件,用于实现系统解耦、提升可扩展性和可用性。保证消息顺序性是其重要挑战之一。本文介绍三种常用策略:顺序队列、消息编号与分布式锁,通过示例展示如何确保消息按需排序。这些方法各有优势,可根据实际场景灵活选用。提供的Java示例有助于加深理解与实践应用。
151 2
|
5月前
|
消息中间件 存储 负载均衡
"RabbitMQ集群大揭秘!让你的消息传递系统秒变超级英雄,轻松应对亿级并发挑战!"
【8月更文挑战第24天】RabbitMQ是一款基于AMQP的开源消息中间件,以其高可靠性、扩展性和易用性闻名。面对高并发和大数据挑战时,可通过构建集群提升性能。本文深入探讨RabbitMQ集群配置、工作原理,并提供示例代码。集群由多个通过网络连接的节点组成,共享消息队列,确保高可用性和负载均衡。搭建集群需准备多台服务器,安装Erlang和RabbitMQ,并确保节点间通信顺畅。核心步骤包括配置.erlang.cookie文件、使用rabbitmqctl命令加入集群。消息发布至任一节点时,通过集群机制同步至其他节点;消费者可从任一节点获取消息。
57 2
|
4月前
|
消息中间件 JSON Java
玩转RabbitMQ声明队列交换机、消息转换器
玩转RabbitMQ声明队列交换机、消息转换器
111 0