如何实现消费幂等 ?

本文涉及的产品
云数据库 MongoDB,独享型 2核8GB
推荐场景:
构建全方位客户视图
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
简介: 这篇文章,我们聊聊消息队列中非常重要的最佳实践之一:**消费幂等**。

这篇文章,我们聊聊消息队列中非常重要的最佳实践之一:消费幂等

1 基础概念

当出现 RocketMQ 消费者对某条消息重复消费的情况时,重复消费的结果与消费一次的结果是相同的,并且多次消费并未对业务系统产生任何负面影响,那么这个消费者的处理过程就是幂等的。

例如,在支付场景下,消费者消费扣款消息,对一笔订单执行扣款操作,扣款金额为100元。

如果因网络不稳定等原因导致扣款消息重复投递,消费者重复消费了该扣款消息,但最终的业务结果是只扣款一次,扣费100元,且用户的扣款记录中对应的订单只有一条扣款流水,不会多次扣除费用。那么这次扣款操作是符合要求的,整个消费过程实现了消费幂等。

2 适用场景

RocketMQ 消息重复的场景如下:

  • 发送时消息重复

    当一条消息已被成功发送到服务端并完成持久化,此时出现了网络闪断或者客户端宕机,导致服务端对客户端应答失败。

    如果此时生产者意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同但 Message ID 不同的消息。

  • 投递时消息重复

    消息消费的场景下,消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候网络闪断。为了保证消息至少被消费一次,Broker 服务端将在网络恢复后再次尝试投递之前已被处理过的消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。

  • 负载均衡时消息重复(包括但不限于网络抖动、Broker 重启以及消费者应用重启)

    Broker 端或客户端重启、扩容或缩容时,会触发 Rebalance ,此时消费者可能会收到少量重复消息。

3 处理策略

因为不同的 Message ID 对应的消息内容可能相同,有可能出现冲突(重复)的情况,所以真正安全的幂等处理,不建议以 Message ID 作为处理依据。

最好的方式是以业务唯一标识作为幂等处理的关键依据,业务的唯一标识可以通过消息 Key 设置。

Message msg = new Message(TOPIC /* Topic */,
             TAG /* Tag */,
               ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
             );
message.setKey("ORDERID_100"); // 订单编号
SendResult sendResult = producer.send(message);

消费者收到消息时可以根据消息的 Key ,即订单号来实现消息幂等 :

consumer.registerMessageListener(new MessageListenerConcurrently() {
   
   
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
   
   
        for (MessageExt message : msgs) {
   
   
            String key = message.getKeys();
           // 根据业务唯一标识的Key做幂等处理
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});

注意:我们也可以在消息 body 体设置业务唯一标识,使用 key 是为了编程简易。

常见的幂等策略包含:

  • 数据库去重表
  • Redis 添加标志位
  • 业务状态机判断

4 数据库去重表

举一个电商场景的例子:用户购物车结算时,系统会创建支付订单。用户支付成功后支付订单的状态会由未支付修改为支付成功,然后系统给用户增加积分。

我们可以使用 RocketMQ 事务消息的方案,该方案能够发挥 MQ 的优势:异步解耦,以及事务的最终一致性的特性。

在消费监听器逻辑里,幂等非常重要 。积分表 SQL 如下:

CREATE TABLE `t_points` (
  `id` bigint(20) NOT NULL COMMENT '主键',
  `user_id` bigint(20) NOT NULL COMMENT '用户id',
  `order_id` bigint(20) NOT NULL COMMENT '订单编号',
  `points` int(4) NOT NULL COMMENT '积分',
  `remarks` varchar(128) COLLATE utf8mb4_bin NOT NULL COMMENT '备注',
  `create_time` datetime NOT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `unique_order_Id` (`order_id`) USING BTREE COMMENT '订单唯一'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;

当收到订单信息后,首先判断该订单是否有积分记录,若没有记录,才插入积分记录。

就算出现极端并发场景下,订单编号也是唯一键,数据库中也必然不会存在相同订单的多条积分记录。

public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
   
   
    try {
   
   
        for (MessageExt messageExt : msgs) {
   
   
            String orderJSON = new String(messageExt.getBody(), "UTF-8");
            logger.info("orderJSON:" + orderJSON);
            OrderPO orderPO = JSON.parseObject(orderJSON, OrderPO.class);
            // 首先查询是否处理完成
            PointsPO pointsPO = pointsMapper.getByOrderId(orderPO.getId());
            if (pointsPO == null) {
   
   
                Long id = SnowFlakeIdGenerator.getUniqueId(1023, 0);
                pointsPO = new PointsPO();
                pointsPO.setId(id);
                pointsPO.setOrderId(orderPO.getId());
                pointsPO.setUserId(orderPO.getUserId());
                // 添加积分数 30
                pointsPO.setPoints(30);
                pointsPO.setCreateTime(new Date());
                pointsPO.setRemarks("添加积分数 30");
                pointsMapper.insert(pointsPO);
            }
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    } catch (Exception e) {
   
   
        logger.error("consumeMessage error: ", e);
        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
    }
}

5 Redis 添加标志位

在消费者接收到消息后,首先判断 Redis 中是否存在该业务主键的标志位,若存在标志位,则认为消费成功,否则,则执行业务逻辑,执行完成后,在缓存中添加标志位。

public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
   
   
    try {
   
   
        for (MessageExt messageExt : msgs) {
   
   
           String bizKey = messageExt.getKeys(); // 唯一业务主键
           //1. 判断是否存在标志
           if(redisTemplate.hasKey(RedisKeyConstants.WAITING_SEND_LOCK + bizKey)) {
   
   
                     continue;
                }
              //2. 执行业务逻辑
           //TODO do business
           //3. 设置标志位
           redisTemplate.opsForValue().set(RedisKeyConstants.WAITING_SEND_LOCK + bizKey, "1", 72, TimeUnit.HOURS);
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    } catch (Exception e) {
   
   
        logger.error("consumeMessage error: ", e);
        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
    }
}

6 业务状态机判断

笔者曾经服务于神州专车,乘客在用户端点击立即叫车,订单服务创建订单,首先保存到数据库后,然后将订单信息同步保存到缓存中。

在订单的载客生命周期里,订单的修改操作先修改缓存,然后发送消息到 MetaQ ,订单落盘服务消费消息,并判断订单信息是否正常(比如有无乱序),若订单数据无误,则存储到数据库中。

这种设计会有小的概率,因为网络问题,消费者会收到乱序的消息。

订单状态机按顺序分别是:创建已分配司机司机已出发司机已到达司机已接到乘客已到达

当订单状态是司机已到达时,消费者收到司机已出发的消息,此时只要业务数据正确,就可以忽略旧的消息,打印相关日志即可。

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
2月前
|
消息中间件 存储 NoSQL
解决MQ下单消息重复消费幂等机制详解
【11月更文挑战第20天】在分布式系统中,消息队列(Message Queue, MQ)作为一种常用的中间件,用于在不同系统或服务之间异步传输消息。MQ的应用场景广泛,如订单处理、日志收集、系统解耦等。然而,MQ的使用也伴随着一些挑战,其中消息重复消费是一个常见问题。特别是在下单场景中,如果消息被重复消费,可能会导致订单被重复创建或处理,从而引发一系列业务问题。
101 6
|
8月前
|
消息中间件 NoSQL Kafka
如何保证消息不被重复消费~~~~~(如何保证消息队列的幂等性)
如何保证消息不被重复消费~~~~~(如何保证消息队列的幂等性)
100 0
|
5月前
|
消息中间件 存储 负载均衡
RocketMQ消费者消费消息核心原理(含长轮询机制)
这篇文章深入探讨了Apache RocketMQ消息队列中消费者消费消息的核心原理,特别是长轮询机制。文章从消费者和Broker的交互流程出发,详细分析了Push和Pull两种消费模式的内部实现,以及它们是如何通过长轮询机制来优化消息消费的效率。文章还对RocketMQ的消费者启动流程、消息拉取请求的发起、Broker端处理消息拉取请求的流程进行了深入的源码分析,并总结了RocketMQ在设计上的优点,如单一职责化和线程池的使用等。
RocketMQ消费者消费消息核心原理(含长轮询机制)
|
消息中间件 存储 Kafka
MQ保证消息幂等机制
MQ保证消息幂等机制
277 0
|
8月前
|
算法 安全 数据库
幂等(使用场景,详细介绍)
幂等(使用场景,详细介绍)
|
8月前
|
消息中间件 关系型数据库 MySQL
如何保证消息幂等
如何保证消息幂等
88 0
|
消息中间件 缓存 监控
Rocketmq并发和顺序消费的失败重试机制
Rocketmq并发和顺序消费的失败重试机制
|
消息中间件 NoSQL Kafka
如何保证消息不被重复消费?或者说,如何保证消息消费的幂等性?
为了提高应用程序的性能和可扩展性,很多应用程序开始采用消息队列(MQ)来处理消息。 MQ 可以将消息异步地发送到目的地,从而实现解耦、异步处理和流量控制等功能。 但是,MQ 也带来了一些问题,如消息重复消费和消息消费的幂等性问题。 本文将介绍 MQ 如何保证消息不被重复消费,并讨论如何保证消息消费的幂等性。
|
消息中间件 Kafka 测试技术
MQ 学习日志(七) 保证消息消费的顺序性
保证消息消费的顺序性
195 0
|
消息中间件 负载均衡 Java
RocketMQ结合实际场景顺序消费,它是如何保证顺序消费的?
RocketMQ结合实际场景顺序消费,它是如何保证顺序消费的?
RocketMQ结合实际场景顺序消费,它是如何保证顺序消费的?