Rabbmit 重复消费的问题

本文涉及的产品
容器服务 Serverless 版 ACK Serverless,952元额度 多规格
容器服务 Serverless 版 ACK Serverless,317元额度 多规格
简介: 最近遇到一个奇怪的问题,消费者在批量消费消息时,遇到该批次中出现部分重复消费导致业务异常。这些异常集中在某一时刻附近。

一、问题概述

最近遇到一个奇怪的问题,消费者在批量消费消息时,遇到该批次中出现部分重复消费导致业务异常。这些异常集中在某一时刻附近。

类似于下面这种就是消费了两次,log显示两次消费时间相差不到1mins,显然这是在同一批次消费时发生的问题

疑问:

1、消费者确认消息后,MQ 会回复ack的情况吗?

二、消息确认机制概述

确保可靠消费需求可以根据场景选择,大多数都是使用At most once


  • At most once: 至多一次。消息在传递时,最多会被送达一次。换一个说法就是,没什么消息可靠性保证,允许丢消息。一般都是一些对消息可靠性要求不太高的监控场景使用,比如每分钟上报一次机房温度数据,可以接受数据少量丢失。


  • At least once: 至少一次。消息在传递时,至少会被送达一次。也就是说,不允许丢消息,但是允许有少量重复消息出现。


  • Exactly once:恰好一次。消息在传递时,只会被送达一次,不允许丢失也不允许重复,这个是最高的等级。

三、幂等性解决重复消息问题?

什么是幂等性呢?

数学上的定义是:

如果一个函数 f(x) 满足:f(f(x)) = f(x),则函数 f(x) 满足幂等性。


这个概念被拓展到计算机领域,被用来描述一个操作、方法或者服务。一个幂等操作的特点是,其任意多次执行所产生的影响均与一次执行的影响相同。


这也就是说如果业务上的代码不会因为重复消费导致业务代码产生不同的结果。这可以称为业务上的幂等性。

3.1.1 利用数据库的唯一约束实现幂等

将账户 X 的余额加 100 元。这个操作不是幂等的,我们可以通过改造业务逻辑,让它具备幂等性。


首先,我们可以限定,对于每个转账单每个账户只可以执行一次变更操作,在分布式系统中,这个限制实现的方法非常多,最简单的是我们在数据库中建一张转账流水表,这个表有三个字段:转账单 ID、账户 ID 和变更金额,然后给转账单 ID 和账户 ID 这两个字段联合起来创建一个唯一约束,这样对于相同的转账单 ID 和账户 ID,表里至多只能存在一条记录。


这样的操作业务上常会使用主键列生成uuid来做唯一标识以解决分布式带来的问题。

3.1.2 为更新的数据设置前置条件

另外一种实现幂等的思路是,给数据变更设置一个前置条件,如果满足条件就更新数据,否则拒绝更新数据,在更新数据的时候,同时变更前置条件中需要判断的数据。这样,重复执行这个操作时,由于第一次更新数据的时候已经变更了前置条件中需要判断的数据,不满足前置条件,则不会重复执行更新数据操作。


比如,刚刚我们说过,“将账户 X 的余额增加 100 元”这个操作并不满足幂等性,我们可以把这个操作加上一个前置条件,变为:“如果账户 X 当前的余额为 500 元,将余额加 100 元”,这个操作就具备了幂等性。对应到消息队列中的使用时,可以在发消息时在消息体中带上当前的余额,在消费的时候进行判断数据库中,当前余额是否与消息中的余额相等,只有相等才执行变更操作。


但是,如果我们要更新的数据不是数值,或者我们要做一个比较复杂的更新操作怎么办?用什么作为前置判断条件呢?更加通用的方法是,给你的数据增加一个版本号属性,每次更数据前,比较当前数据的版本号是否和消息中的版本号一致,如果不一致就拒绝更新数据,更新数据的同时将版本号 +1,一样可以实现幂等更新。类似于CAS。

3.1.3 记录并检查

如果上面提到的两种实现幂等方法都不能适用于你的场景,我们还有一种通用性最强,适用范围最广的实现幂等性方法:记录并检查操作,也称为“Token 机制或者 GUID(全局唯一 ID)机制”,实现的思路特别简单:在执行数据更新操作之前,先检查一下是否执行过这个更新操作。


具体的实现方法是,在发送消息时,给每条消息指定一个全局唯一的 ID,消费时,先根据这个 ID 检查这条消息是否有被消费过,如果没有消费过,才更新数据,然后将消费状态置为已消费。


原理和实现是不是很简单?其实一点儿都不简单,在分布式系统中,这个方法其实是非常难实现的。首先,给每个消息指定一个全局唯一的 ID 就是一件不那么简单的事儿,方法有很多,但都不太好同时满足简单、高可用和高性能,或多或少都要有些牺牲。更加麻烦的是,在“检查消费状态,然后更新数据并且设置消费状态”中,三个操作必须作为一组操作保证原子性,才能真正实现幂等,否则就会出现 Bug。


对这一点深有体会,特别是和redis相结合的时候。


说明: 只有一个消费者


比如说msg A,


  • 第一次msg A被consumer拿到了,检查幂等性之后,发现没有消费过就记录了

       不巧的是中途出问题了,数据并没有被真正的消费,中途的原因多种多样,其中比较常见的是DB问题,insert 失败了。消息就redelivery。


  • 第二次msg A 被consumer拿到,发现该笔消息已经记录过了,就直接把消息ack了,然而事实上消息并没有被真正的消费掉。这笔消息就丢失了.


比如说,对于同一条消息:“全局 ID 为 8,操作为:给 ID 为 666 账户增加 100 元”,有可能出现这样的情况:


  • t0 时刻:Consumer A 收到条消息,检查消息执行状态,发现消息未处理过,开始执行“账户增加 100 元”;


  • t1 时刻:Consumer B 收到条消息,检查消息执行状态,发现消息未处理过,因为这个时刻,Consumer A 还未来得及更新消息执行状态。

这样就会导致账户被错误地增加了两次 100 元,这是一个在分布式系统中非常容易犯的错误,一定要引以为戒。


对于这个问题,当然我们可以用事务来实现,也可以用锁来实现,但是在分布式系统中,无论是分布式事务还是分布式锁都是比较难解决问题。

四、如何解决?

4.1 channel is already closed due to ...

该种报错会导致消息重复消费,这种情况属于AP 端发给MQ broker发送ack成功,但是channel由于某种原因关闭了导致的重复消费。比较容易发现察觉到。

2020-09-07 11:10:25.855  INFO 15832 --- [ool-12-thread-1] com.navi.job.service.MainService         : 当前message:A385M|A1087031BA|2020-08-14 10:44:02|0|defect在MQ 中的index为:28
2020-09-07 11:10:25.865 ERROR 15832 --- [ool-12-thread-1] com.navi.job.service.MainService         : 
channel is already closed due to channel error; protocol method: #method<channel.close>
(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 20, class-id=60, method-id=80)
com.rabbitmq.client.AlreadyClosedException: channel is already closed due to channel error; protocol method: #method<channel.close>
(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 20, class-id=60, method-id=80)
  at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:198)
  at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:312)
  at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:306)
  at com.rabbitmq.client.impl.ChannelN.basicAck(ChannelN.java:1165)
  at com.rabbitmq.client.impl.recovery.RecoveryAwareChannelN.basicAck(RecoveryAwareChannelN.java:89)
  at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicAck(AutorecoveringChannel.java:436)
  at com.navi.job.service.MainService.lambda$sureAck$6(MainService.java:775)
  at java.util.ArrayList.forEach(ArrayList.java:1249)
  at com.navi.job.service.MainService.sureAck(MainService.java:767)
  at com.navi.job.service.MainService.subMainProc(MainService.java:363)
  at com.navi.job.service.MainCallable$1.call(MainCallable.java:70)
  at com.navi.job.service.MainCallable$1.call(MainCallable.java:67)
  at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
  at java.util.concurrent.FutureTask.run(FutureTask.java)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)

五、问题

为什么大部分消息队列都选择只提供 At least once 的服务质量,而不是级别更高的 Exactly once 呢?


1、若消息队列实现了exactly once,会引发的问题有:①消费端在pull消息时,需要检测此消息是否被消费,这个检测机制无疑会拉低消息消费的速度。可以预想到,随着消息的剧增,消费性能势必会急剧下降,导致消息积压;②检查机制还需要业务端去配合实现,若一条消息长时间未返回ack,消息队列需要去回调看下消费结果(这个类似于事物消息的回查机制)。这样就会增加业务端的压力,与很多的未知因素。


2、消息队列即使做到了Exactly once级别,consumer也还是要做幂等。因为在consumer从消息队列取消息这里,如果consumer消费成功,但是ack失败,consumer还是会取到重复的消息,所以消息队列花大力气做成Exactly once并不能解决业务侧消息重复的问题。

六、 消息幂等的必要性


相关实践学习
通过Ingress进行灰度发布
本场景您将运行一个简单的应用,部署一个新的应用用于新的发布,并通过Ingress能力实现灰度发布。
容器应用与集群管理
欢迎来到《容器应用与集群管理》课程,本课程是“云原生容器Clouder认证“系列中的第二阶段。课程将向您介绍与容器集群相关的概念和技术,这些概念和技术可以帮助您了解阿里云容器服务ACK/ACK Serverless的使用。同时,本课程也会向您介绍可以采取的工具、方法和可操作步骤,以帮助您了解如何基于容器服务ACK Serverless构建和管理企业级应用。 学习完本课程后,您将能够: 掌握容器集群、容器编排的基本概念 掌握Kubernetes的基础概念及核心思想 掌握阿里云容器服务ACK/ACK Serverless概念及使用方法 基于容器服务ACK Serverless搭建和管理企业级网站应用
目录
相关文章
|
4天前
|
消息中间件 存储 NoSQL
解决MQ下单消息重复消费幂等机制详解
【11月更文挑战第20天】在分布式系统中,消息队列(Message Queue, MQ)作为一种常用的中间件,用于在不同系统或服务之间异步传输消息。MQ的应用场景广泛,如订单处理、日志收集、系统解耦等。然而,MQ的使用也伴随着一些挑战,其中消息重复消费是一个常见问题。特别是在下单场景中,如果消息被重复消费,可能会导致订单被重复创建或处理,从而引发一系列业务问题。
28 6
|
1月前
|
消息中间件 存储 Java
Kafka 如何避免重复消费?
在Apache Kafka中,避免消息的重复消费是确保数据准确处理的关键。本文详细介绍了七种避免重复消费的方法:使用消费者组、幂等生产者、事务性生产者与消费者、手动提交偏移量、外部存储管理偏移量、去重逻辑及幂等消息处理逻辑。每种方法均有其优缺点,可根据实际需求选择合适方案。结合消费者组、手动提交偏移量和幂等处理逻辑通常是有效策略,而对于高一致性要求,则可考虑使用事务性消息。
111 0
|
负载均衡 网络性能优化
EMQ如何保证消息不重复消费?
EMQ(Erlang MQTT Broker)通过以下机制来保证消息不重复消费
780 2
|
消息中间件 NoSQL Redis
消息重复消费的问题
消息重复消费的问题
|
6月前
|
消息中间件 负载均衡 Kafka
Kafka学习---消费者(分区消费、分区平衡策略、offset、漏消费和重复消费)
Kafka学习---消费者(分区消费、分区平衡策略、offset、漏消费和重复消费)
703 2
|
消息中间件 NoSQL Kafka
如何保证消息不被重复消费?或者说,如何保证消息消费的幂等性?
为了提高应用程序的性能和可扩展性,很多应用程序开始采用消息队列(MQ)来处理消息。 MQ 可以将消息异步地发送到目的地,从而实现解耦、异步处理和流量控制等功能。 但是,MQ 也带来了一些问题,如消息重复消费和消息消费的幂等性问题。 本文将介绍 MQ 如何保证消息不被重复消费,并讨论如何保证消息消费的幂等性。
|
消息中间件 算法 关系型数据库
Kafka学习---4、消费者(分区消费、分区平衡策略、offset、漏消费和重复消费)(二)
Kafka学习---4、消费者(分区消费、分区平衡策略、offset、漏消费和重复消费)(二)
|
消息中间件 Kafka API
Kafka学习---4、消费者(分区消费、分区平衡策略、offset、漏消费和重复消费)(一)
Kafka学习---4、消费者(分区消费、分区平衡策略、offset、漏消费和重复消费)(一)
|
消息中间件 Kafka
Kafka消息的重复消费问题如何解决的 ?
Kafka 通过使用消费者组(Consumer Group)来解决消息的重复消费问题。
1622 0
|
消息中间件 负载均衡 Java
RocketMQ结合实际场景顺序消费,它是如何保证顺序消费的?
RocketMQ结合实际场景顺序消费,它是如何保证顺序消费的?
RocketMQ结合实际场景顺序消费,它是如何保证顺序消费的?