RabbitMq消息防丢失(含springboot代码示例)

简介: 1.概述1.1.数据丢失的原因在消息中有三种可能性造成数据丢失:消费者消费消息失败生产者生产消息失败MQ数据丢失消费者消费消息失败:

1.概述

1.1.数据丢失的原因

在消息中有三种可能性造成数据丢失:

  1. 消费者消费消息失败
  2. 生产者生产消息失败
  3. MQ数据丢失

消费者消费消息失败:

RabbitMq存在应答机制,默认为自动应答,MQ向消费者推送一条消息,消费者收到这条消息后会返回一个ack(应答)给MQ,MQ收到应答后会删除这条消息。


自动应答存在一个问题,就是消费者收到消息后立马就会给MQ返回ack,如果消费者返回完ack但还没来的及真正处理这条消息时,消费者断电宕机了,那么这条消息就丢失了。


这就是由于消费者消费消息失败造成的数据丢失。

生产者生产数据失败:

生产者向MQ推送了一条消息,但是由于由于诸如网络故障等原因mq并没有收到该条消息,这样就造成了这条消息的丢失。

MQ数据丢失:

MQ的数据是存在内存中的,诸如断电等原因可能会造成数据的丢失。

1.2.如何防止数据丢失

解决以上列举的数据丢失问题的办法有三种:

  1. 手动应答
  2. 消息确认机制
  3. 持久化

手动应答:

RabbitMQ默认是自动应答,消费者收到消息后就会自动返回ack给MQ,可以将应答模式改为手动应答,在消费者一侧消息的消费动作完成后手动来返回ack给MQ,用来解决“消费者消费消息失败”问题。

消息确认机制:

消息队列收到消息后,告知生产者,让生产者感知到自己生产的消息,消息队列已经接收到,用来解决“生产者生产消息失败”问题。消息确认机制有两种实现方式:

  • AMQP事务
  • confirm

持久化:

消息队列的消息持久化到磁盘上,用来解决“MQ数据丢失”问题。

2.手动应答

手动应答是通过设置channel来实现的,以下为一个完整代码示例。

配置类:

@Configuration
public class config {
    @Bean
    public Queue queue(){
        return new Queue("queue_01",false);
    }
}

生产者:

@SpringBootTest(classes = Main.class)
public class Producer {
    @Autowired
    RabbitTemplate rabbitTemplate;
    @Test
    public void producerMsg(){
        rabbitTemplate.convertAndSend("queue_01","hello_world");
    }
}

消费者:

@Component
@Slf4j
public class Consumer {
    @RabbitListener(queues = {"queue_01"})
    public void consumerMsg(String msg, Message message,Channel channel){
        try {
            log.info("消费者消费消息: "+msg);
            /**
             * 没有异常就确认消息
             * basicAck(long deliveryTag, boolean multiple)
             * deliveryTag:当前消息在队列中的的索引;
             * multiple:为true的话就是批量确认
             */
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        } catch (Exception e) {
            /**
             * 有异常就拒收消息
             * basicNack(long deliveryTag, boolean multiple, boolean requeue)
             * requeue:true为将消息重返当前消息队列,重新发送给消费者;
             *         false将消息丢弃
             */
            try {
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            } catch (Exception ex) {
                log.error(ex.getMessage());
            }
        }
    }
}

3.消息确认机制

AQMP事务、confirm其实都是基于channel的。

3.1.AMQP事务

AMQP事务和数据库事务类似,定义一组对MQ的操作,统一提交,成功则全部一起执行,失败则全部回滚。AMQP事务在spring boot中的使用很简单,和数据库的事务一样,一个注解就可以搞定。

@GetMapping("/direct/wx/transactional")
@Transactional(rollbackFor = Exception.class)
public String sendDirectMessageTransactional() {
  rabbitTemplate.convertAndSend("direct_exchange", "wx","hello world!");
  log.info("开启事务消息机制");
    try {
           Thread.sleep(5000);
       } catch (Exception e) {
            e.printStackTrace();
       }
      return "ok";
}

3.2.confirm

confirm是基于channel的,一旦channel进入confirm模式,所有在该channel上发布的消息都会被指派一个唯一的ID(从1开始),消息被投递道匹配队列后broker会发送一个确认消息给生产者。如果消息和队列是可持久化的(durable为true),那么确认消息会在消息被写入磁盘后发出。


confirm最大的好处在于异步,生产者在等待上一条消息的确认消息的时候可以继续往下发送。


confirm在spring boot中的使用很简单,在配置文件中开启即可,并且支持自定义回调函数:


配置文件:

spring.rabbitmq.publisher-confirms: true

spring.rabbitmq.publisher-returns: true

生产者:

@Slf4j
@Component
public class RabbitmqService implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    public void sendMessage(String exchange,String routingKey,Object msg) {
        // 设置交换机处理失败消息的模式     true 表示消息由交换机 到达不了队列时,会将消息重新返回给生产者
        // 如果不设置这个指令,则交换机向队列推送消息失败后,不会触发 setReturnCallback
        rabbitTemplate.setMandatory(true);
        //消息消费者确认收到消息后,手动ack回执
        rabbitTemplate.setConfirmCallback(this);
        // 暂时关闭 return 配置
        //rabbitTemplate.setReturnCallback(this);
        //发送消息
        rabbitTemplate.convertAndSend(exchange,routingKey,msg);
    }
    /**
     * 交换机并未将数据丢入指定的队列中时,触发
     *  channel.basicPublish(exchange_name,next.getKey(), true, properties,next.getValue().getBytes());
     *  参数三:true  表示如果消息无法正常投递,则return给生产者 ;false 表示直接丢弃
     * @param message   消息对象
     * @param replyCode 错误码
     * @param replyText 错误信息
     * @param exchange 交换机
     * @param routingKey 路由键
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.info("---- returnedMessage ----replyCode="+replyCode+" replyText="+replyText+" ");
    }
    /**
     * 消息生产者发送消息至交换机时触发,用于判断交换机是否成功收到消息
     * @param correlationData  相关配置信息
     * @param ack exchange 交换机,判断交换机是否成功收到消息    true 表示交换机收到
     * @param cause  失败原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        log.info("---- confirm ----ack="+ack+"  cause="+String.valueOf(cause));
        log.info("correlationData -->"+correlationData.toString());
        if(ack){
            // 交换机接收到
            log.info("---- confirm ----ack==true  cause="+cause);
        }else{
            // 没有接收到
            log.info("---- confirm ----ack==false  cause="+cause);
        }
    }
}


相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
5月前
|
消息中间件 Java 数据库
RocketMQ实战—9.营销系统代码初版
本文主要介绍了实现营销系统四大促销场景的代码初版:全量用户推送促销活动、全量用户发放优惠券、特定用户推送领取优惠券消息、热门商品定时推送。
RocketMQ实战—9.营销系统代码初版
|
3月前
|
Java 数据库连接 数据库
Spring boot 使用mybatis generator 自动生成代码插件
本文介绍了在Spring Boot项目中使用MyBatis Generator插件自动生成代码的详细步骤。首先创建一个新的Spring Boot项目,接着引入MyBatis Generator插件并配置`pom.xml`文件。然后删除默认的`application.properties`文件,创建`application.yml`进行相关配置,如设置Mapper路径和实体类包名。重点在于配置`generatorConfig.xml`文件,包括数据库驱动、连接信息、生成模型、映射文件及DAO的包名和位置。最后通过IDE配置运行插件生成代码,并在主类添加`@MapperScan`注解完成整合
558 1
Spring boot 使用mybatis generator 自动生成代码插件
|
4月前
|
消息中间件 缓存 NoSQL
基于Spring Data Redis与RabbitMQ实现字符串缓存和计数功能(数据同步)
总的来说,借助Spring Data Redis和RabbitMQ,我们可以轻松实现字符串缓存和计数的功能。而关键的部分不过是一些"厨房的套路",一旦你掌握了这些套路,那么你就像厨师一样可以准备出一道道饕餮美食了。通过这种方式促进数据处理效率无疑将大大提高我们的生产力。
175 32
|
3月前
|
Java 调度 流计算
基于Java 17 + Spring Boot 3.2 + Flink 1.18的智慧实验室管理系统核心代码
这是一套基于Java 17、Spring Boot 3.2和Flink 1.18开发的智慧实验室管理系统核心代码。系统涵盖多协议设备接入(支持OPC UA、MQTT等12种工业协议)、实时异常检测(Flink流处理引擎实现设备状态监控)、强化学习调度(Q-Learning算法优化资源分配)、三维可视化(JavaFX与WebGL渲染实验室空间)、微服务架构(Spring Cloud构建分布式体系)及数据湖建设(Spark构建实验室数据仓库)。实际应用中,该系统显著提升了设备调度效率(响应时间从46分钟降至9秒)、设备利用率(从41%提升至89%),并大幅减少实验准备时间和维护成本。
250 0
|
8月前
|
XML 前端开发 Java
SpringBoot整合Flowable【04】- 通过代码控制流程流转
本文介绍了如何使用Flowable的Java API控制流程流转,基于前文构建的绩效流程模型。首先,通过Flowable-UI导出模型文件并部署到Spring Boot项目中。接着,详细讲解了如何通过代码部署、启动和审批流程,涉及`RepositoryService`、`RuntimeService`和`TaskService`等核心服务类的使用。最后,通过实际操作演示了流程从部署到完成的全过程,并简要说明了相关数据库表的变化。本文帮助读者初步掌握Flowable在实际业务中的应用,后续将深入探讨更多高级功能。
1033 0
SpringBoot整合Flowable【04】-  通过代码控制流程流转
|
9月前
|
消息中间件 监控 Java
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
141 6
|
11月前
|
Java 数据库连接 Maven
mybatis使用一:springboot整合mybatis、mybatis generator,使用逆向工程生成java代码。
这篇文章介绍了如何在Spring Boot项目中整合MyBatis和MyBatis Generator,使用逆向工程来自动生成Java代码,包括实体类、Mapper文件和Example文件,以提高开发效率。
423 2
mybatis使用一:springboot整合mybatis、mybatis generator,使用逆向工程生成java代码。
|
11月前
|
Java BI API
spring boot 整合 itextpdf 导出 PDF,写入大文本,写入HTML代码,分析当下导出PDF的几个工具
这篇文章介绍了如何在Spring Boot项目中整合iTextPDF库来导出PDF文件,包括写入大文本和HTML代码,并分析了几种常用的Java PDF导出工具。
2434 0
spring boot 整合 itextpdf 导出 PDF,写入大文本,写入HTML代码,分析当下导出PDF的几个工具
|
11月前
|
前端开发 Java Apache
Springboot整合shiro,带你学会shiro,入门级别教程,由浅入深,完整代码案例,各位项目想加这个模块的人也可以看这个,又或者不会mybatis-plus的也可以看这个
本文详细讲解了如何整合Apache Shiro与Spring Boot项目,包括数据库准备、项目配置、实体类、Mapper、Service、Controller的创建和配置,以及Shiro的配置和使用。
2419 1
Springboot整合shiro,带你学会shiro,入门级别教程,由浅入深,完整代码案例,各位项目想加这个模块的人也可以看这个,又或者不会mybatis-plus的也可以看这个
|
10月前
|
缓存 监控 Java

热门文章

最新文章