【RocketMQ系列十三】RocketMQ的集群核心概念之消费重试&死信队列&幂等消息的出现以及处理

简介: 【RocketMQ系列十三】RocketMQ的集群核心概念之消费重试&死信队列&幂等消息的出现以及处理

1. 消费重试

消息重试是指消费者在消费某条消息失败之后,RocketMQ服务端会根据重试策略重新消费该消息,若超过最大重试次数还未消费成功则不在进行重新消费,而是直接将该消息发送到死信队列中。

1.1. 消费重试应用场景

消息重试主要是为了解决偶发性的消息消费失败导致的消费完整性问题,这些消费失败的原因包括业务处理逻辑的问题,网络抖动问题。

消费重试应用场景主要有两个:

  1. 业务处理失败,且失败的原因跟当前的消息内容相关,比如该消息对应的事务状态还未获取到,预期一段时间后可执行成功。
  2. 消息失败的原因是偶发性的,比如由于网络抖动,消费者消费时宕机等偶发性的问题导致的失败,后续的消息大概率会消费成功。

不要把消息失败来作为条件判断的结果分流,也不要通过使用消息失败来对处理速率限流。

1.2. 消费重试的原理

消费重试的状态机如下图所示:会重试的消息可能会经历如下四种状态。

  1. Ready: 已就绪状态,消息在RocketMQ服务端中准备就绪,可以被消费者消费
  2. Inflight:处理中状态,消息正在被消费者获取,处于消费中还没返回消费结果的状态。
  3. Commit: 提交状态:消息被消费者消费成功,消费者返回成功响应,消息会结束重试。
  4. Wait  Retry:  待重试状态:PushComsumer独有的状态,当消费者消费失败或者消费超时,会触发消费重试机制。如果当前重试次数未达到最大重试次数,则该消息会变成待重试状态,经过重试间隔后,消息将重新变为已就绪状态可被重新消费。多次重试之间,可通过重试间隔进行延长,防止无效高频的失败。
  5. DLQ:死信队列:当消息消费失败,并且消费重试的次数超过最大重试次数(默认是16次)之后,RocketMQ服务端会结束该消息的重试,并且将该消息直接发送到死信队列中。
1.2.1. 消息重试触发的条件
  1. 消费失败:

当消息消费失败就会触发消费重试,即消费者没有向RocketMQ服务端返回offset的情况下都被认为是消费失败。都会触发消费重试。

对应的代码没有返回 CONSUME_SUCCESS 的状态是:

// 4.创建一个回调函数
    consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
      // 5.处理消息
      for (MessageExt msg : msgs) {
        System.out.println(msg);
        System.out.println("收到的消息内容:" + new String(msg.getBody()));
      }
      // 1. 消费监听返回null则会消费重试
      return null;
        //2.消费监听返回RECONSUME_LATER也会消费重试
             return ConsumeConcurrentlyStatus.RECONSUME_LATER;
    });
  1. 消息处理超时,包括在PushConsumer中排队超时。

1.3. 消费重试次数

RocketMQ  会为每个消费组都设置一个Topic名称为"%RETRY%+consumerGroup"的重试队列(这里需要注意的是,这个Topic的重试队列是针对消费者组,而不是针对每个Topic设置的),用于暂时保存因为各种异常而消费失败的消息。考虑到异常恢复起来需要一些时间,会为重试队列设置多个重试级别,每个重试级别都有与之对应的重试间隔,随着重试次数的增多,重试间隔也会越来越大。

RocketMQ对于重试消息的处理是先保存至Topic名为 “SCHEDULE_TOPIC_XXXX” 的延迟队列,后台定时任务按照对应的时间进行Delay后重新保存至 “%RETRY%+consumerGroup” 的重试队列中。

与延迟队列的设置相同,消息默认会重试16次,每次重试的时间间隔如下:

10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

2. 死信队列

前面说某个消息被重试超过最大重试次数16次之后,则会被直接发送到死信队列中。也就是说死信队列用来存放的是无法被正常消费的消息。

RocketMQ将这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),  将存放死信消息的队列称为死信队列(Dead-Letter  Queue)。可以使用Console控制台对死信队列里的消息进行重发来使得消费者可以进行重新消费。

死信队列具备如下特点:

  1. RocketMQ会自动为需要死信队列的消费者组创建死信队列。
  2. 死信队列与消费者组对应,死信队列中包含该消费者组所有相关Topic的死信消息。
  3. 死信队列中消息的有效期与正常消息相同,默认48小时。
  4. 若要消费死信队列中的消息,需要在控制台将死信队列的权限设置为6,即可读可写。

3. 消息幂等问题的出现

幂等的定义:幂等性指的是多次操作造成的结果是一致的。在http接口中查询操作是幂等的,

新增操作:非幂等的,每次都会插入新数据

更新操作:幂等的,对同样的数据进行修改

删除操作:根据id删除是幂等的。

那么,非幂等的操作如何保证幂等性呢?

消息队列中,很可能会存在一条消息被重复发送,或者一条消息被多个消费者消费。对于像用户注册等非幂等操作,就需要做幂等性保证。可以将情况概况为如下几种:

  1. 生产者重复发送消息:由于网络抖动,导致生产者没有收到broker的ack消息而重发消息,从而造成消息队列中消息重复。
  2. 消费者重复消费消息:由于网络抖动,导致消费者没有返回ack给broker,导致消费者重复消费。
  3. rebalance时的重复消费:由于网络抖动,在rebalance重分配时也可能会出现消费者重复消费某条消息的情况。

4. 如何保证幂等性消费呢?

  1. mysql 插入业务id作为主键,主键是唯一的,所以一次只能插入一条
  2. 使用Redis或zk的分布式锁(主流的方案)

比如在创建订单场景下,我们在发送消息的时候传入orderId作为业务唯一ID。当消息重复发送或者重复消息的时候可以根据订单ID 来做一个逻辑判断。

为了防止两个消费者同时消费相同重复消息的情况,这时候可以在OderId上加上分布式锁,保证同一时间内相同的消息只能有一个消费者消费。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
2天前
|
消息中间件 负载均衡 算法
【RocketMQ系列十二】RocketMQ集群核心概念之主从复制&生产者负载均衡策略&消费者负载均衡策略
【RocketMQ系列十二】RocketMQ集群核心概念之主从复制&生产者负载均衡策略&消费者负载均衡策略
10 2
|
2天前
|
消息中间件 存储 RocketMQ
【RocketMQ系列十】RocketMQ的核心概念说明
【RocketMQ系列十】RocketMQ的核心概念说明
9 1
|
2天前
|
消息中间件 存储 Java
【RocketMQ 系列三】RocketMQ集群搭建(2m-2s-sync)
【RocketMQ 系列三】RocketMQ集群搭建(2m-2s-sync)
9 1
|
5天前
|
消息中间件 Arthas 监控
消息队列 MQ产品使用合集之每次重置reconsumeTimes就无法达到死信阈值,重试次数是否就要应用方控制
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
消息队列 MQ产品使用合集之每次重置reconsumeTimes就无法达到死信阈值,重试次数是否就要应用方控制
|
8天前
|
消息中间件 存储 缓存
RocketMQ4.2 最佳实践之集群搭建
RocketMQ4.2 最佳实践之集群搭建
|
9天前
|
消息中间件 存储 中间件
【主流技术】聊一聊消息队列 RocketMQ 的基本结构与概念
2.6Broker 代理服务器(Broker)是消息中转角色,负责存储消息、转发消息。代理服务器在 RocketMQ 系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。代理服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。 2.7Pull Consumer 拉取式消费(Pull Consumer)是 Consumer 消费的一种类型,也是默认的类型。下游应用系统通常主动调用 Consumer 的拉消息方法从 Broke r服务器拉消息,即主动权由下游应用控制。一旦获取了批量消息,应用就会启动消费过程。
|
10天前
|
消息中间件
RabbitMQ配置单活模式队列
RabbitMQ配置单活模式队列
20 0
|
5天前
|
消息中间件 测试技术 RocketMQ
消息队列 MQ产品使用合集之在异步发送消息函数sendMessage()中出现了错误,错误代码为-3,该如何解决
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
5天前
|
消息中间件 监控 Oracle
消息队列 MQ产品使用合集之启动Namesrv节点时,遇到报错,该如何解决
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
5天前
|
消息中间件 Java RocketMQ
消息队列 MQ产品使用合集之当SpringBoot应用因网络不通而启动失败时,该如何解决
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。