幂等解决方案集合(二)消息幂等

简介: msgId 一定是全局唯一标识符,但是实际使用中,可能会存在相同的消息有两个不同 msgId 的情况(消费者主动重发、因客户端重投机制导致的重复等),这种情况就需要使业务字段进行重复消费。

问题描述


消息队列的消息幂等性,主要是由 MQ 重试机制引起的。


我们通常会认为,消息中间件是一个可靠的组件——这里所谓的可靠是指,只要我把消息成功投递到了消息中间件,消息就不会丢失,即消息肯定会至少保证消息能被消费者成功消费一次,这是消息中间件最基本的特性之一,也就是我们常说的“AT LEAST ONCE”,即消息至少会被“成功消费一遍”。


举个例子,一个消息 M 发送到了消息中间件,消息投递到了消费程序 A,A 接受到了消息,然后进行消费,但在消费到一半的时候程序重启了,这时候这个消息并没有标记为消费成功,这个消息还会继续投递给这个消费者,直到其消费成功了,消息中间件才会停止投递。然而这种可靠的特性导致,消息可能被多次地投递


再举个例子,程序 A 接受到这个消息 M 并完成消费逻辑之后,正想通知消息中间件“我已经消费成功了”的时候,程序就重启了,那么对于消息中间件来说,这个消息并没有成功消费过,所以他还会继续投递。这时候对于应用程序 A 来说,看起来就是这个消息明明消费成功了,但是消息中间件还在重复投递。


解决方案


基于业务表的去重


假如我们的逻辑是这样的:


select * from t_order where order_no = 'order123'
if(order  != null) {
    return ;//消息重复,直接返回
}


那么在并下情况下可能会有问题(当业务还没处理完,此时并发的请求进来了,则会穿透掉检查的挡板),所以要考虑原子性问题。


解决的办法在我们上一篇文章中也有介绍到,即使用:


  • 悲观锁(SELECT ... FOR UPDATE
  • 乐观锁


但无论是 select for update, 还是乐观锁这种解决方案,实际上都是基于业务表本身做去重,这无疑增加了业务开发的复杂度, 如果每个消费逻辑本身都需要基于业务本身而做去重/幂等的开发的话,这是繁琐的工作量。


基于消息表+本地事务


我们以 RocketMQ 作为消息中间件为例, RocketMQ 的 文档 中描述道:


RocketMQ 无法避免消息重复(Exactly-Once),所以如果业务对消费重复非常敏感,务必要在业务层面进行去重处理。可以借助关系数据库进行去重。首先需要确定消息的唯一键,可以是 msgId,也可以是消息内容中的唯一标识字段,例如订单 Id 等。在消费之前判断唯一键是否在关系数据库中存在。如果不存在则插入,并消费,否则跳过。(实际过程要考虑原子性问题,判断是否存在可以尝试插入,如果报主键冲突,则插入失败,直接跳过)”


“msgId 一定是全局唯一标识符,但是实际使用中,可能会存在相同的消息有两个不同 msgId 的情况(消费者主动重发、因客户端重投机制导致的重复等),这种情况就需要使业务字段进行重复消费。”


根据文档的描述我们得到了第一种解决方案,即通过去重表的方法,这点在上一篇文章 《幂等解决方案集合(一)》 中已有所描述,需要注意的是:


“在消费之前判断唯一键是否在关系数据库中存在。如果不存在则插入,并消费,否则跳过。”


具体来说,我们可以这样做:在数据库中增加一个消息消费记录表,把业务操作和这个消息插入的动作放到同一个事务中一起提交,就能保证消息只会被消费一遍了。


  1. 开启事务
  2. 插入消息表(处理好主键冲突的问题)
  3. 更新订单表(原消费逻辑)
  4. 提交事务


但是这里有它的局限性


  1. 消息的消费逻辑必须是依赖于关系型数据库事务。如果消费的消费过程中还涉及其他数据的修改,例如 Redis 这种不支持事务特性的数据源,则这些数据是不可回滚的。


  1. 数据库的数据必须是在一个库,跨库无法解决


使用 Exactly-Once 投递语义收发消息


Exactly-Once 投递语义


以下引用自阿里云的文档:


Exactly-Once是指发送到消息系统的消息只能被消费端处理且仅处理一次,即使生产端重试消息发送导致某消息重复投递,该消息在消费端也只被消费一次。”


Exactly-Once语义是消息系统和流式计算系统中消息流转的最理想状态,但是在业界并没有太多理想的实现。因为真正意义上的 Exactly-Once 依赖消息系统的服务端、消息系统的客户端和用户消费逻辑这三者状态的协调。例如,当您的消费端完成一条消息的消费处理后出现异常宕机,而消费端重启后由于消费的位点没有同步到消息系统的服务端,该消息有可能被重复消费。”


“业界对于Exactly-Once投递语义存在很大的争议,很多人会拿出“FLP 不可能理论”或者其他一致性定律对此议题进行否定,但事实上,特定场景的 Exactly-Once 语义实现并不是非常复杂,只是因为通常大家没有精确的描述问题的本质。”


“如果您要实现一条消息的消费结果只能在业务系统中生效一次,您需要解决的只是如何保证同一条消息的消费幂等问题。消息队列RocketMQ版的Exactly-Once语义就是解决业务中最常见的一条消息的消费结果(消息在消费端计算处理的结果)在数据库系统中有且仅生效一次的问题。”


典型使用场景


“在电商系统中,上游实时计算模块发布商品价格变更的信息,异步通知到下游商品管理模块进行价格变更。此时,需要保证每一条信息的消费幂等,即重复的价格变更信息只会生效一次,这样便不会发生价格多次重复修改的情况,确保实现了消息消费的幂等。”


50.jpg


操作步骤


1 添加依赖


消息队列 RocketMQ 版的 ExactlyOnceConsumer 在客户端 SDK ons-client-ext-1.8.4.Final 中发布,若要使用 Exactly-Once 投递语义,需在应用中依赖该 SDK。另外,ExactlyOnceConsumer 基于 Spring 实现了通过注解@MQTransaction 开启 Exactly-Once 消费的方式,因此还需要在应用中增加 Spring 3.0 以上版本的依赖。


完整的依赖内容如下所示。


<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>ons-client-ext</artifactId>
    <version>1.8.4.Final</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-context</artifactId>
    <version>${spring-version}</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-jdbc</artifactId>
    <version>${spring-version}</version>
</dependency>


2 在用于存储消息消费结果的数据库中创建 transaction_record 表 (注意:存储消息消费结果的数据库系统必须支持本地事务)


若要使用消息队列 RocketMQ 版的 Exactly-Once 投递语义,您需要在业务处理结果持久化的数据库中创建一张 transaction_record 表,保证此表与存储业务处理结果的表在同一个数据库内,且该数据库支持本地事务。目前,消息队列 RocketMQ 版的 Exactly-Once 投递语义支持您的业务访问 MySQL 和 SQLServer 两种类型的数据源。这两种类型的数据源下 transaction_record 表的建表语句如以下所示 (mysql)。


CREATE TABLE `transaction_record` (
  `consumer_group` varchar(128) NOT NULL DEFAULT '',
  `message_id` varchar(255) NOT NULL DEFAULT '',
  `topic_name` varchar(255) NOT NULL DEFAULT '',
  `ctime` bigint(20) NOT NULL,
  `queue_id` int(11) NOT NULL,
  `offset` bigint(20) NOT NULL,
  `broker_name` varchar(255) NOT NULL DEFAULT '',
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  PRIMARY KEY (`id`),
  UNIQUE KEY `message_id_unique_key` (`message_id`),
  KEY `ctime_key` (`ctime`),
  KEY `load_key` (`queue_id`,`broker_name`,`topic_name`,`ctime`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;


3 在消息生产端使用 PropertyKeyConst.EXACTLYONCE_DELIVERY 属性设置打开 Exactly-Once 投递语义


/**
 * TestExactlyOnceProducer 启动
 * 通过 PropertyKeyConst.EXACTLYONCE_DELIVERY 开启 exactly-once 投递语义。
 */
public class TestExactlyOnceProducer {
    public static void main(String[] args) {
        Properties producerProperties = new Properties();
        producerProperties.setProperty(PropertyKeyConst.GROUP_ID,"{GROUP_ID}");
        producerProperties.setProperty(PropertyKeyConst.AccessKey,"{accessKey}");
        producerProperties.setProperty(PropertyKeyConst.SecretKey,"{secretKey}");
        producerProperties.setProperty(PropertyKeyConst.NAMESRV_ADDR,"{NAMESRV_ADDR}");
        producerProperties.setProperty(PropertyKeyConst.EXACTLYONCE_DELIVERY,"true");
        Producer producer = ExactlyOnceONSFactory.createProducer(producerProperties);
        producer.start();
        System.out.println("Producer Started");
        for (int i = 0; i < 10; i++) {
            Message message = new Message("{topic}", "{tag}", "mq send transaction message test".getBytes());
            try {
                SendResult sendResult = producer.send(message);
                assert sendResult != null;
                System.out.println(new Date() + " Send mq message success! msgId is: " + sendResult.getMessageId());
            } catch (ONSClientException e) {
                System.out.println("发送失败");
                //出现异常意味着发送失败,为避免消息丢失,建议缓存该消息然后进行重试。
            }
        }
        producer.shutdown();
    }
}


4 在消息消费端创建 ExactlyOnceConsumer,并开启 Exactly-Once 的消费模式。

使用消息队列 RocketMQ 版的 Exactly-Once 投递语义进行消费时,消费端需要使用 ExactlyOnceONSFactory 调用 createExactlyOnceConsumer 接口创建 ExactlyOnceConsumer,然后通过使用 ExactlyOnceConsumer 进行 Exactly-Once 模式的消费。


在使用 ExactlyOnceConsumer 时,需要注意以下两点:


  • 创建 ExactlyOnceConsumer 时,可以通过设置 PropertyKeyConst.EXACTLYONCE_DELIVERY 属性打开或者关闭 Exactly-Once 投递语义。ExactlyOnceConsumer 默认打开 Exactly-Once 投递语义。
  • 使用 ExactlyOnceConsumer 消费时,在消息监听器 MessageListener 的 consume 方法中,您的业务处理逻辑需要使用 MQDataSource 对数据库的进行读写操作。

您可以选择以下任一方式在消费端开启 Exactly-Once 投递语义:

  • 以非 Spring 方式开启 Exactly-Once 投递语义
  • MessageListener 中以事务方式实现多项数据库操作和消息消费的事务性
  • MessageListener 中通过 Springboot 注解方式实现开启 Exactly-Once 投递语义
  • MessageListener 中通过 MyBatis 方式实现 Exactly-Once 投递语义

以下示例为:MessageListener 中以事务方式实现多项数据库操作和消息消费的事务性,其他示例代码,请参考阿里云文档:

https://help.aliyun.com/document_detail/102777.html


/**
 * TestExactlyOnceListener 实现。
 * 实现了一个事务中对多个业务表进行更新的场景,保证事务内的操作有且仅有一次生效。
 */
public class SimpleTxListener implements MessageListener {
    private MQDataSource dataSource;
    public SimpleTxListener() {
        this.dataSource = new MQDataSource("{url}", "{user}", "{passwd}", "{driver}");
    }
        @Override
    public Action consume(Message message, ConsumeContext context) {
        Connection connection = null;
        Statement statement = null;
        try {
            /**
             *  在此处对消费到的消息 message 做业务计算处理,使用 MQDataSource 将处理结果持久化到数据库系统。
             *  此范例演示了在一个事务内对多个表进行更新的业务场景,Exactly-Once 投递语义保证事务内的操作有且仅有一次。
             *  实际的业务处理按照:接收消息->业务处理->结果持久化的流程来设计。
             */
            connection = dataSource.getConnection();
            connection.setAutoCommit(false);
            String insertSql = String.format("INSERT INTO app(msg, message_id, ctime) VALUES(\"%s\", \"%s\", %d)",
                new String(message.getBody()), message.getMsgID(), System.currentTimeMillis());
            String updateSql = String.format("UPDATE consume_count SET cnt = count + 1 WHERE consumer_group = \"%s\"", "GID_TEST");
            statement = connection.createStatement();
            statement.execute(insertSql);
            statement.execute(updateSql);
            connection.commit();
            System.out.println("consume message :" + message.getMsgID());
            return Action.CommitMessage;
        } catch (Throwable e) {
            try {
                connection.rollback();
            } catch (Exception e1) {
            }
            System.out.println("consume message fail");
            return Action.ReconsumeLater;
        } finally {
            if (statement != null) {
                try {
                    statement.close();
                } catch (Exception e) { }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (Exception e) { }
            }
        }
    }                                                      
}


总结


你可以看到,这种方式是阿里云做了封装,但理论上跟上一个方案 基于消息表+本地事务 是一样的。


利用 Redis


最后这种方案其实思路跟 基于消息表+本地事务 类似,只不过判断是否重复消息用 Redis 分布式锁实现,关于具体实现步骤也跟我们上一篇文章类似,请参考上一篇文章。


最后


当消费者出现异常,可以让其重试几次,如果重试几次后,仍然有异常,则需要进行数据补偿。数据补偿方案:当重试多次后仍然出现异常,则让此条消息进入 死信队列,最终进入到数据库中,接着设置定时 job 查询这些数据,进行手动补偿。


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
SQL 缓存 NoSQL
接口的幂等性设计和防重保证,详细分析幂等性的几种实现方法
本篇文章详细说明了幂等性,解释了什么是幂等性,幂等性的使用场景,讨论了幂等和防重的概念。分析了幂等性的情况以及如何设计幂等性服务。阐述了幂等性实现防重的几种策略,包括乐关锁,防重表,分布式锁,token令牌以及支付缓冲区。
6988 0
接口的幂等性设计和防重保证,详细分析幂等性的几种实现方法
|
2月前
|
消息中间件 存储 NoSQL
解决MQ下单消息重复消费幂等机制详解
【11月更文挑战第20天】在分布式系统中,消息队列(Message Queue, MQ)作为一种常用的中间件,用于在不同系统或服务之间异步传输消息。MQ的应用场景广泛,如订单处理、日志收集、系统解耦等。然而,MQ的使用也伴随着一些挑战,其中消息重复消费是一个常见问题。特别是在下单场景中,如果消息被重复消费,可能会导致订单被重复创建或处理,从而引发一系列业务问题。
92 6
|
14天前
|
缓存 数据库 索引
所有的接口都需要幂等吗?
幂等性(Idempotency)源自数学,指多次执行某操作结果不变。在计算机科学中,它确保在网络通信、重试机制和并发操作下系统状态一致。常见应用如HTTP方法中的GET、PUT、DELETE及数据库操作中的SELECT、UPDATE、DELETE等。实现幂等性可通过唯一请求ID、数据库约束、状态检查等方法。并非所有业务都需要幂等处理,需根据业务逻辑、系统容错策略及性能复杂度权衡。
30 0
|
5月前
|
SQL 前端开发 NoSQL
关于幂等:大厂如何解决幂等问题?
为确保分布式系统中接口的幂等性,防止重复下单及更新数据时出现ABA问题,可采取以下措施:首先,每个请求需具备唯一标识符,如订单ID,确保同一订单ID仅能成功处理一次。其次,处理请求后需记录状态标识,如在数据库中记录支付流水。接收请求时检查是否已处理,利用数据库的唯一性约束防止重复操作。例如,支付前插入支付流水记录,若订单ID已存在,则阻止重复支付。此外,为解决ABA问题,可在订单表中增加版本号字段,更新数据时需验证版本号一致性并同步递增版本号,确保数据正确性及更新操作的幂等性。
|
消息中间件 缓存 NoSQL
如何实现消费幂等 ?
这篇文章,我们聊聊消息队列中非常重要的最佳实践之一:**消费幂等**。
如何实现消费幂等 ?
|
消息中间件 存储 Kafka
MQ保证消息幂等机制
MQ保证消息幂等机制
274 0
|
8月前
|
安全
[AIGC] 用幂等性解决重复消息问题
[AIGC] 用幂等性解决重复消息问题
|
8月前
|
算法 安全 数据库
幂等(使用场景,详细介绍)
幂等(使用场景,详细介绍)
|
8月前
|
存储 缓存 数据库
接口幂等有哪些实现方式
接口幂等有哪些实现方式
56 0
|
8月前
|
API
什么是接口幂等
什么是接口幂等
224 0

热门文章

最新文章