RabbitMQ保证消息的一致性解决方案RabbitMQ保证消息的一致性解决方案

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: RabbitMQ保证消息的一致性解决方案RabbitMQ保证消息的一致性解决方案

RabbitMQ保证消息的一致性

一、采用confirm消息确认机制及return返回机制 确保消息发送成功

二、将队列以及消息设置持久化 保证rabbitmq突然宕机消息仍然存在

三、手动确认接收消息方式 消息处理失败拒收重回队列


1. yml配置

spring:
  rabbitmq:
    host: ip
    port: 5672
    username: guest
    password: guest
    ##消息发送确认回调
    publisher-confirms: true
    #采用confirm以及return机制 发送返回监听回调
    publisher-confirm-type: correlated
    publisher-returns: true
listener:
      type: simple
      simple:
        #手动接收消息方式
        acknowledge-mode: manual

2. RabbitMQ配置类

@Configuration
@Slf4j
@AllArgsConstructor
public class RabbitmqConfig {
    private final ConnectionFactory connectionFactory;
    private final RabbitLogsMapper rabbitLogsMapper;
    @Bean
    public RabbitTemplate rabbitTemplate(){
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        //confirm确认
        rabbitTemplate.setConfirmCallback((correlationData,ack,cause) -> {
            String msgId = correlationData.getId();
            if (ack) {
                //发送成功
                log.info("消息成功发送 , msgId: {}," ,msgId);
                //状态更新  消息发送成功
                BiddingRabbitLogs biddingRabbitLogs = new BiddingRabbitLogs();
                biddingRabbitLogs.setStatus(SendStatus.SEND_SUCCESS.getValue());
                rabbitLogsMapper.update(biddingRabbitLogs, Wrappers.lambdaUpdate(BiddingRabbitLogs.class).eq(BiddingRabbitLogs::getId,msgId).notIn(BiddingRabbitLogs::getStatus,"4"));
            } else {
                //发送失败
                log.error("消息发送失败, {}, cause: {}, msgId: {}", correlationData, cause, msgId);
                //状态更新  消息发送失败
                BiddingRabbitLogs biddingRabbitLogs = new BiddingRabbitLogs();
                biddingRabbitLogs.setStatus(SendStatus.SEND_FAILD.getValue());
                rabbitLogsMapper.update(biddingRabbitLogs, Wrappers.lambdaUpdate(BiddingRabbitLogs.class).eq(BiddingRabbitLogs::getId,msgId).notIn(BiddingRabbitLogs::getStatus,"4"));
            }
        });
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnCallback((message,replyCode,replyText,exchange,routingKey) -> {
            //触发回调  只有交换机找不到队列时才会触发
            log.error("消息从Exchange路由到Queue失败: exchange: {}, route: {}, replyCode: {}, replyText: {}, message: {}", exchange, routingKey, replyCode, replyText, message);
            //状态更新 消息发送失败
            String msgId = (String) message.getMessageProperties().getHeaders().get("spring_returned_message_correlation");
            BiddingRabbitLogs biddingRabbitLogs = new BiddingRabbitLogs();
            biddingRabbitLogs.setStatus(SendStatus.SEND_FAILD.getValue());
            rabbitLogsMapper.update(biddingRabbitLogs, Wrappers.lambdaUpdate(BiddingRabbitLogs.class).eq(BiddingRabbitLogs::getId,msgId).notIn(BiddingRabbitLogs::getStatus,"4"));
        });
        return rabbitTemplate;
    }
    @Bean
    public RabbitAdmin rabbitAdmin(RabbitTemplate rabbitTemplate){
        RabbitAdmin rabbitAdmin = new RabbitAdmin(rabbitTemplate);
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }
}

说明:


confirm机制只是确保了消息是否成功发送到交换机

Return机制确保了消息是否从交换机发送到指定的队列


- - ConfirmCallback则根据状态判断发送成功还是失败 进行更新日志表记录状态

ReturnCallback则根据收到消息就是未找到队列发送失败,未收到消息就是发送成功 进行更新日志表记录状态

3. 声明的队列一定要将队列持久化

public String createQueue(String queueName) {
        BiddingQueueConfig biddingQueueConfig = queueMapper.selectOne(Wrappers.lambdaQuery(BiddingQueueConfig.class).eq(BiddingQueueConfig::getQueue, queueName));
        if (biddingQueueConfig == null) {
            biddingQueueConfig = new BiddingQueueConfig();
            biddingQueueConfig.setCreatetime(new Date());
            biddingQueueConfig.setQueue(queueName);
            biddingQueueConfig.setStatus("1");
            int insert = queueMapper.insert(biddingQueueConfig);
            //将队列持久化
            rabbitAdmin.declareQueue(new Queue(queueName,true));
            return queueName + "队列创建成功";
        }
        return queueName + "队列创建失败";
}

4. 发送消息 将发送的消息设置为持久化

发送消息前首先将发送的数据插入数据库,状态变为发送中

20210507093722572.png

5. 消费者监听队列

如果根据消息id查询日志表为空的话那么是没有发送消息,消息自动接收,发送成功消息后日志表会有数据

判断是否重复消费 根据状态是否成功消费以及失败重试次数判断

处理业务逻辑,如果成功消息接收 状态更新

如果处理业务逻辑失败报错则会拒收,消息重回队列重新处理此条消息,当处理次数超过3次处理失败则消息改为接收

// 启动自动创建队列
@RabbitListener(queuesToDeclare = { @Queue("queue_work_dontask") })
@RabbitHandler
@SneakyThrows
public void receiveDonTask(String data, Message message, Channel channel){
    //消息id
    String msgId = (String) message.getMessageProperties().getHeaders().get("spring_returned_message_correlation");
    //根据消息id查询BiddingRabbitLogs日志表
    BiddingRabbitLogs biddingRabbitLogs = remoteLogsService.get(msgId, SecurityConstants.FROM_IN).getData();
    if (biddingRabbitLogs == null) {
        log.error("消息ID查询 biddingRabbitLogs:null");
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        return;
    }
    //状态:1.消息发送中 2.消息发送成功 3.消息发送失败 4.消费成功 5.消费失败
    if (SendStatus.CONSUME_SUCCESS.getValue().equals(biddingRabbitLogs.getStatus()) || SendStatus.SEND_FAILD.getValue() == String.valueOf(biddingRabbitLogs.getTryTimes())) {
        //重复消费
        log.info("消息ID:{},重复消费",msgId);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        return;
    }
    try {
        //处理业务逻辑
        Map map = JSON.parseObject(data, Map.class);
        String dataString = (String) map.get("data");
        String username = (String) map.get("username");
        Integer tenantId = (Integer) map.get("tenantId");
        ApproveParam approveParam = JSON.parseObject(dataString, ApproveParam.class);
        R<String> stringR = doneTask(approveParam,username,tenantId);
        //处理成功  更新状态
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        biddingRabbitLogs.setStatus(SendStatus.CONSUME_SUCCESS.getValue());
        biddingRabbitLogs.setSuccesstime(new Date());
        remoteLogsService.updateById(biddingRabbitLogs,SecurityConstants.FROM_IN);
        log.info("消费成功,消息ID:{}",msgId);
    } catch (Exception e) {
        e.printStackTrace();
        if (biddingRabbitLogs.getTryTimes() >= Integer.parseInt(SendStatus.TRY_TIMES.getValue())) {
            //多次消费不成功 自动接收
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
            log.error("多次消费失败,消息ID:{}",msgId);
        } else {
            //消费失败 拒收 重回队列
            channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
            log.error("消费失败,消息ID:{}",msgId);
        }
        biddingRabbitLogs.setStatus(SendStatus.CONSUME_FAILD.getValue());
        biddingRabbitLogs.setTryTimes(biddingRabbitLogs.getTryTimes()+1);
        remoteLogsService.updateById(biddingRabbitLogs,SecurityConstants.FROM_IN);
    }
}
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
1月前
|
消息中间件 安全 Java
云消息队列RabbitMQ实践解决方案评测
一文带你详细了解云消息队列RabbitMQ实践的解决方案优与劣
72 8
|
28天前
|
消息中间件
解决方案 | 云消息队列RabbitMQ实践获奖名单公布!
云消息队列RabbitMQ实践获奖名单公布!
|
1月前
|
消息中间件 存储 弹性计算
云消息队列 RabbitMQ 版实践解决方案评测
随着企业业务的增长,对消息队列的需求日益提升。阿里云的云消息队列 RabbitMQ 版通过架构优化,解决了消息积压、内存泄漏等问题,并支持弹性伸缩和按量计费,大幅降低资源和运维成本。本文从使用者角度详细评测这一解决方案,涵盖实践原理、部署体验、实际优势及应用场景。
|
1月前
|
消息中间件 存储 监控
解决方案 | 云消息队列RabbitMQ实践
在实际业务中,网站因消息堆积和高流量脉冲导致系统故障。为解决这些问题,云消息队列 RabbitMQ 版提供高性能的消息处理和海量消息堆积能力,确保系统在流量高峰时仍能稳定运行。迁移前需进行技术能力和成本效益评估,包括功能、性能、限制值及费用等方面。迁移步骤包括元数据迁移、创建用户、网络打通和数据迁移。
64 4
|
2月前
|
消息中间件 运维 监控
云消息队列RabbitMQ实践解决方案评测报告
本报告旨在对《云消息队列RabbitMQ实践》解决方案进行综合评测。通过对该方案的原理理解、部署体验、设计验证以及实际应用价值等方面进行全面分析,为用户提供详尽的反馈与建议。
81 16
|
2月前
|
消息中间件 弹性计算 运维
阿里云云消息队列RabbitMQ实践解决方案评测报告
阿里云云消息队列RabbitMQ实践解决方案评测报告
76 9
|
2月前
|
消息中间件 监控 数据处理
解决方案 | 云消息队列RabbitMQ实践
解决方案 | 云消息队列RabbitMQ实践
52 1
|
2月前
|
消息中间件 监控 持续交付
《云消息队列RabbitMQ实践》解决方案测评报告
《云消息队列RabbitMQ实践》解决方案通过RabbitMQ实现业务解耦、异步处理和高可用性。其核心优势包括消息持久化、灵活路由及高可靠性。文档详细介绍了部署步骤、配置方法及监控手段,帮助用户快速搭建消息队列系统。方案适用于电商、金融和实时数据处理等高并发场景,通过异步处理提升系统性能。建议增加自动化部署、复杂场景示例及更详尽的日志解析,进一步提升用户体验。
|
1月前
|
消息中间件 运维 监控
《云消息队列RabbitMQ实践》解决方案
《云消息队列RabbitMQ实践》解决方案
|
5月前
|
消息中间件 中间件 程序员
分布式事务大揭秘:使用MQ实现最终一致性
本文由小米分享,介绍分布式事务中的MQ最终一致性实现,以RocketMQ为例。RocketMQ的事务消息机制包括准备消息、本地事务执行、确认/回滚消息及事务状态检查四个步骤。这种机制通过消息队列协调多系统操作,确保数据最终一致。MQ最终一致性具有系统解耦、提高可用性和灵活事务管理等优点,广泛应用于分布式系统中。文章还讨论了RocketMQ的事务消息处理流程和失败情况下的处理策略,帮助读者理解如何在实际应用中解决分布式事务问题。
408 6
下一篇
无影云桌面