【RocketMQ系列十三】RocketMQ的集群核心概念之消费重试&死信队列&幂等消息的出现以及处理

简介: 【RocketMQ系列十三】RocketMQ的集群核心概念之消费重试&死信队列&幂等消息的出现以及处理

1. 消费重试

消息重试是指消费者在消费某条消息失败之后,RocketMQ服务端会根据重试策略重新消费该消息,若超过最大重试次数还未消费成功则不在进行重新消费,而是直接将该消息发送到死信队列中。

1.1. 消费重试应用场景

消息重试主要是为了解决偶发性的消息消费失败导致的消费完整性问题,这些消费失败的原因包括业务处理逻辑的问题,网络抖动问题。

消费重试应用场景主要有两个:

  1. 业务处理失败,且失败的原因跟当前的消息内容相关,比如该消息对应的事务状态还未获取到,预期一段时间后可执行成功。
  2. 消息失败的原因是偶发性的,比如由于网络抖动,消费者消费时宕机等偶发性的问题导致的失败,后续的消息大概率会消费成功。

不要把消息失败来作为条件判断的结果分流,也不要通过使用消息失败来对处理速率限流。

1.2. 消费重试的原理

消费重试的状态机如下图所示:会重试的消息可能会经历如下四种状态。

  1. Ready: 已就绪状态,消息在RocketMQ服务端中准备就绪,可以被消费者消费
  2. Inflight:处理中状态,消息正在被消费者获取,处于消费中还没返回消费结果的状态。
  3. Commit: 提交状态:消息被消费者消费成功,消费者返回成功响应,消息会结束重试。
  4. Wait  Retry:  待重试状态:PushComsumer独有的状态,当消费者消费失败或者消费超时,会触发消费重试机制。如果当前重试次数未达到最大重试次数,则该消息会变成待重试状态,经过重试间隔后,消息将重新变为已就绪状态可被重新消费。多次重试之间,可通过重试间隔进行延长,防止无效高频的失败。
  5. DLQ:死信队列:当消息消费失败,并且消费重试的次数超过最大重试次数(默认是16次)之后,RocketMQ服务端会结束该消息的重试,并且将该消息直接发送到死信队列中。
1.2.1. 消息重试触发的条件
  1. 消费失败:

当消息消费失败就会触发消费重试,即消费者没有向RocketMQ服务端返回offset的情况下都被认为是消费失败。都会触发消费重试。

对应的代码没有返回 CONSUME_SUCCESS 的状态是:

// 4.创建一个回调函数
    consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
      // 5.处理消息
      for (MessageExt msg : msgs) {
        System.out.println(msg);
        System.out.println("收到的消息内容:" + new String(msg.getBody()));
      }
      // 1. 消费监听返回null则会消费重试
      return null;
        //2.消费监听返回RECONSUME_LATER也会消费重试
             return ConsumeConcurrentlyStatus.RECONSUME_LATER;
    });
  1. 消息处理超时,包括在PushConsumer中排队超时。

1.3. 消费重试次数

RocketMQ  会为每个消费组都设置一个Topic名称为"%RETRY%+consumerGroup"的重试队列(这里需要注意的是,这个Topic的重试队列是针对消费者组,而不是针对每个Topic设置的),用于暂时保存因为各种异常而消费失败的消息。考虑到异常恢复起来需要一些时间,会为重试队列设置多个重试级别,每个重试级别都有与之对应的重试间隔,随着重试次数的增多,重试间隔也会越来越大。

RocketMQ对于重试消息的处理是先保存至Topic名为 “SCHEDULE_TOPIC_XXXX” 的延迟队列,后台定时任务按照对应的时间进行Delay后重新保存至 “%RETRY%+consumerGroup” 的重试队列中。

与延迟队列的设置相同,消息默认会重试16次,每次重试的时间间隔如下:

10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

2. 死信队列

前面说某个消息被重试超过最大重试次数16次之后,则会被直接发送到死信队列中。也就是说死信队列用来存放的是无法被正常消费的消息。

RocketMQ将这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),  将存放死信消息的队列称为死信队列(Dead-Letter  Queue)。可以使用Console控制台对死信队列里的消息进行重发来使得消费者可以进行重新消费。

死信队列具备如下特点:

  1. RocketMQ会自动为需要死信队列的消费者组创建死信队列。
  2. 死信队列与消费者组对应,死信队列中包含该消费者组所有相关Topic的死信消息。
  3. 死信队列中消息的有效期与正常消息相同,默认48小时。
  4. 若要消费死信队列中的消息,需要在控制台将死信队列的权限设置为6,即可读可写。

3. 消息幂等问题的出现

幂等的定义:幂等性指的是多次操作造成的结果是一致的。在http接口中查询操作是幂等的,

新增操作:非幂等的,每次都会插入新数据

更新操作:幂等的,对同样的数据进行修改

删除操作:根据id删除是幂等的。

那么,非幂等的操作如何保证幂等性呢?

消息队列中,很可能会存在一条消息被重复发送,或者一条消息被多个消费者消费。对于像用户注册等非幂等操作,就需要做幂等性保证。可以将情况概况为如下几种:

  1. 生产者重复发送消息:由于网络抖动,导致生产者没有收到broker的ack消息而重发消息,从而造成消息队列中消息重复。
  2. 消费者重复消费消息:由于网络抖动,导致消费者没有返回ack给broker,导致消费者重复消费。
  3. rebalance时的重复消费:由于网络抖动,在rebalance重分配时也可能会出现消费者重复消费某条消息的情况。

4. 如何保证幂等性消费呢?

  1. mysql 插入业务id作为主键,主键是唯一的,所以一次只能插入一条
  2. 使用Redis或zk的分布式锁(主流的方案)

比如在创建订单场景下,我们在发送消息的时候传入orderId作为业务唯一ID。当消息重复发送或者重复消息的时候可以根据订单ID 来做一个逻辑判断。

为了防止两个消费者同时消费相同重复消息的情况,这时候可以在OderId上加上分布式锁,保证同一时间内相同的消息只能有一个消费者消费。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
21天前
|
消息中间件 JSON Java
|
2月前
|
消息中间件 存储 网络协议
消息中间件RabbitMQ---概述和概念 【一】
该文章提供了对消息中间件RabbitMQ的全面概述,包括其核心概念、工作原理以及与AMQP和JMS的关系。
消息中间件RabbitMQ---概述和概念 【一】
|
9天前
|
消息中间件
rabbitmq,&队列
rabbitmq,&队列
|
2月前
|
消息中间件 存储 负载均衡
|
15天前
|
消息中间件 JSON Java
玩转RabbitMQ声明队列交换机、消息转换器
玩转RabbitMQ声明队列交换机、消息转换器
40 0
|
2月前
|
消息中间件 存储 NoSQL
MQ的顺序性保证:顺序队列、消息编号、分布式锁,一文全掌握!
【8月更文挑战第24天】消息队列(MQ)是分布式系统的关键组件,用于实现系统解耦、提升可扩展性和可用性。保证消息顺序性是其重要挑战之一。本文介绍三种常用策略:顺序队列、消息编号与分布式锁,通过示例展示如何确保消息按需排序。这些方法各有优势,可根据实际场景灵活选用。提供的Java示例有助于加深理解与实践应用。
53 2
|
2月前
|
消息中间件 存储 负载均衡
"RabbitMQ集群大揭秘!让你的消息传递系统秒变超级英雄,轻松应对亿级并发挑战!"
【8月更文挑战第24天】RabbitMQ是一款基于AMQP的开源消息中间件,以其高可靠性、扩展性和易用性闻名。面对高并发和大数据挑战时,可通过构建集群提升性能。本文深入探讨RabbitMQ集群配置、工作原理,并提供示例代码。集群由多个通过网络连接的节点组成,共享消息队列,确保高可用性和负载均衡。搭建集群需准备多台服务器,安装Erlang和RabbitMQ,并确保节点间通信顺畅。核心步骤包括配置.erlang.cookie文件、使用rabbitmqctl命令加入集群。消息发布至任一节点时,通过集群机制同步至其他节点;消费者可从任一节点获取消息。
31 2
|
2月前
|
存储 C# 关系型数据库
“云端融合:WPF应用无缝对接Azure与AWS——从Blob存储到RDS数据库,全面解析跨平台云服务集成的最佳实践”
【8月更文挑战第31天】本文探讨了如何将Windows Presentation Foundation(WPF)应用与Microsoft Azure和Amazon Web Services(AWS)两大主流云平台无缝集成。通过具体示例代码展示了如何利用Azure Blob Storage存储非结构化数据、Azure Cosmos DB进行分布式数据库操作;同时介绍了如何借助Amazon S3实现大规模数据存储及通过Amazon RDS简化数据库管理。这不仅提升了WPF应用的可扩展性和可用性,还降低了基础设施成本。
56 0
|
3月前
|
消息中间件 Prometheus 监控
消息队列 MQ使用问题之如何将旧集群的store目录迁移到新集群
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
2月前
|
消息中间件 API 数据安全/隐私保护
就软件研发问题之RocketMQ ACL 2.0加强集群组件间访问控制的问题如何解决
就软件研发问题之RocketMQ ACL 2.0加强集群组件间访问控制的问题如何解决
下一篇
无影云桌面