RabbitMq实战如何保证消息幂等消费

本文涉及的产品
云数据库 Redis 版,社区版 2GB
推荐场景:
搭建游戏排行榜
简介: RabbitMq实战如何保证消息幂等消费

正文


一、消息幂等性


在编程中一个幂等操作的特点是其任意多次执行所产生的结果与一次执行的产生的结果相同,在mq中由于网络故障或客户端延迟消费mq自动重试过程中可能会导致消息的重复消费,那我们如何保证消息的幂等问题呢?也可以理解为如何保证消息不被重复消费呢,不重复消费也就解决了幂等问题。


二、解决方案


1、生成全局id,存入redis或者数据库,在消费者消费消息之前,查询一下该消息是否有消费过。


2、如果该消息已经消费过,则告诉mq消息已经消费,将该消息丢弃(手动ack)。


3、如果没有消费过,将该消息进行消费并将消费记录写进redis或者数据库中。


注:还有一种方式,数据库操作可以设置唯一键(消息id),防止重复数据的插入,这样插入只会报错而不会插入重复数据,本人没有测试。


三、代码


111.png

简单描述一下需求,如果订单完成之后,需要为用户累加积分,又需要保证积分不会重复累加。那么再mq消费消息之前,先去数据库查询该消息是否已经消费,如果已经消费那么直接丢弃消息。


如果是Redis存放数据key=全局id,value=积分值,在消费消息之前,通过全局id去redis查询是否有该数据,如果有直接丢弃。该方法本人没有测试,只是说说自己的思路。有不对的希望大佬们不吝赐教。


生产者


package com.xiaojie.score.producer;
import com.alibaba.fastjson.JSONObject;
import com.xiaojie.score.entity.Score;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import java.util.UUID;
/**
 * @author xiaojie
 * @version 1.0
 * @description:发送积分消息的生产者
 * @date 2021/10/10 22:18
 */
@Component
@Slf4j
public class ScoreProducer implements RabbitTemplate.ConfirmCallback {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    //定义交换机
    private static final String SCORE_EXCHANGE = "xiaojie_score_exchaneg";
    //定义路由键
    private static final String SCORE_ROUTINNGKEY = "score.add";
    /**
     * @description: 订单完成
     * @param:
     * @return: java.lang.String
     * @author xiaojie
     * @date: 2021/10/10 22:30
     */
    public String completeOrder() {
        String orderId = UUID.randomUUID().toString();
        System.out.println("订单已完成");
        //发送积分通知
        Score score = new Score();
        score.setScore(100);
        score.setOrderId(orderId);
        String jsonMSg = JSONObject.toJSONString(score);
        sendScoreMsg(jsonMSg, orderId);
        return orderId;
    }
    /**
     * @description: 发送积分消息
     * @param:
     * @param: message
     * @param: orderId
     * @return: void
     * @author xiaojie
     * @date: 2021/10/10 22:22
     */
    @Async
    public void sendScoreMsg(String jsonMSg, String orderId) {
        this.rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.convertAndSend(SCORE_EXCHANGE, SCORE_ROUTINNGKEY, jsonMSg, message -> {
            //设置消息的id为唯一
            message.getMessageProperties().setMessageId(orderId);
            return message;
        });
    }
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String s) {
        if (ack) {
            log.info(">>>>>>>>消息发送成功:correlationData:{},ack:{},s:{}", correlationData, ack, s);
        } else {
            log.info(">>>>>>>消息发送失败{}", ack);
        }
    }
}


消费者


package com.xiaojie.score.consumer;
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import com.xiaojie.score.entity.Score;
import com.xiaojie.score.mapper.ScoreMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Map;
/**
 * @author xiaojie
 * @version 1.0
 * @description: 积分的消费者
 * @date 2021/10/10 22:37
 */
@Component
@Slf4j
public class ScoreConsumer {
    @Autowired
    private ScoreMapper scoreMapper;
    @RabbitListener(queues = {"xiaojie_score_queue"})
    public void onMessage(Message message, @Headers Map<String, Object> headers, Channel channel) throws IOException {
        String orderId = message.getMessageProperties().getMessageId();
        if (StringUtils.isBlank(orderId)) {
            return;
        }
        log.info(">>>>>>>>消息id是:{}", orderId);
        String msg = new String(message.getBody());
        Score score = JSONObject.parseObject(msg, Score.class);
        if (score == null) {
            return;
        }
        //执行前去数据库查询,是否存在该数据,存在说明已经消费成功,不存在就去添加数据,添加成功丢弃消息
        Score dbScore = scoreMapper.selectByOrderId(orderId);
        if (dbScore != null) {
            //证明已经消费消息,告诉mq已经消费,丢弃消息
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            return;
        }
        Integer result = scoreMapper.save(score);
        if (result > 0) {
            //积分已经累加,删除消息
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            return;
        } else {
            log.info("消费失败,采取相应的人工补偿");
        }
    }
}


完整代码:spring-boot: Springboot整合redis、消息中间件等相关代码

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
3月前
|
消息中间件 RocketMQ 微服务
RocketMQ 分布式事务消息实战指南
RocketMQ 分布式事务消息实战指南
288 1
|
3月前
|
消息中间件 存储 监控
搭建消息时光机:深入探究RabbitMQ_recent_history_exchange在Spring Boot中的应用【RabbitMQ实战 二】
搭建消息时光机:深入探究RabbitMQ_recent_history_exchange在Spring Boot中的应用【RabbitMQ实战 二】
34 1
|
7月前
|
消息中间件 存储 网络协议
企业实战(11)消息队列之Docker安装部署RabbitMQ实战
企业实战(11)消息队列之Docker安装部署RabbitMQ实战
131 0
|
4月前
|
消息中间件 存储 NoSQL
RabbitMQ的幂等性、优先级队列和惰性队列
【1月更文挑战第12天】用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会因为多次点击而产生了副作用。举个最简单的例子,那就是支付,用户购买商品后支付,支付扣款成功,但是返回结果的时候网络异常,此时钱已经扣了,用户再次点击按钮,此时会进行第二次扣款,返回结果成功,用户查询余额发现多扣钱了,流水记录也变成了两条。在以前的单应用系统中,我们只需要把数据操作放入事务中即可,发生错误立即回滚,但是再响应客户端的时候也有可能出现网络中断或者异常等等
230 1
|
3月前
|
消息中间件 监控 Java
Spring Boot中的RabbitMQ死信队列魔法:从异常到延迟,一网打尽【RabbitMQ实战 一】
Spring Boot中的RabbitMQ死信队列魔法:从异常到延迟,一网打尽【RabbitMQ实战 一】
76 0
|
2月前
|
消息中间件 存储 NoSQL
RabbitMQ的幂等性、优先级队列和惰性队列
**摘要:** 本文讨论了RabbitMQ中的幂等性、优先级队列和惰性队列。幂等性确保了重复请求不会导致副作用,关键在于消费端的幂等性保障,如使用唯一ID和Redis的原子性操作。优先级队列适用于处理不同重要性消息,如大客户订单优先处理,通过设置`x-max-priority`属性实现。惰性队列自3.6.0版起提供,用于延迟将消息加载到内存,适合大量消息存储和消费者延迟消费的场景。
25 4
|
2月前
|
消息中间件 存储 安全
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息顺序消费和并发消费机制体系的原理分析
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息顺序消费和并发消费机制体系的原理分析
29 0
|
2月前
|
消息中间件 存储 Kafka
【深入浅出 RocketMQ原理及实战】「底层源码挖掘系列」透彻剖析贯穿一下RocketMQ和Kafka索引设计原理和方案
【深入浅出 RocketMQ原理及实战】「底层源码挖掘系列」透彻剖析贯穿一下RocketMQ和Kafka索引设计原理和方案
49 1
|
2月前
|
消息中间件 前端开发 算法
【十七】RabbitMQ基础篇(延迟队列和死信队列实战)
【十七】RabbitMQ基础篇(延迟队列和死信队列实战)
44 1
|
6月前
|
消息中间件
消息中间件系列教程(15) -RabbitMQ-基于全局消息ID解决幂等性问题
消息中间件系列教程(15) -RabbitMQ-基于全局消息ID解决幂等性问题
50 0