JAVA使用RabbitMQ解决生产端消息投递可靠性,消费端幂等性问题

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
简介: 我们发送一个消息没办法知道我们发的消息消费端是否接收到,假如消费端没有接收到那么我们需要触发补偿机制来重新发送一个消息,这个时候我们为了解决这个问题就需要将消息落库,每次将准备发送的消息存入到数据库中,并设置一个状态为待发送。等消费端接收到消息并给我们反馈后,我们将数据库中的消息状态改为已完成。

一、JAVA使用RabbitMQ解决生产端消息投递可靠性,消费端幂等性问题



1、生产端消息投递可靠性


1.1、消息落库


思路:


1.将消息落库:


我们发送一个消息没办法知道我们发的消息消费端是否接收到,假如消费端没有接收到那么我们需要触发补偿机制来重新发送一个消息,这个时候我们为了解决这个问题就需要将消息落库,每次将准备发送的消息存入到数据库中,并设置一个状态为待发送。

等消费端接收到消息并给我们反馈后,我们将数据库中的消息状态改为已完成。


消息库


5.png


发送消息之前先将消息落库


6.png


如果消息发送成功则将数据库状态改为发送完成,如果没有成功则将重试次数+1,我们一般重试3次还是失败就会将状态改为发送失败。


7.png


package com.xiaoqi.server.config;/**
 * @ProjectName: yeb
 * @Package: com.xiaoqi.server.config
 * @ClassName: RabbitMQConfig
 * @Author: LiShiQi
 * @Description: ${description}
 * @Date: 2022/2/24 16:16
 * @Version: 1.0
 */
import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
import com.xiaoqi.server.pojo.MailConstants;
import com.xiaoqi.server.pojo.MailLog;
import com.xiaoqi.server.service.IMailLogService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * @Description
 * @Author LiShiQi
 * @Date 2022/2/24 16:16
 * @Version 1.0
 */
@Configuration
public class RabbitMQConfig {
    private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQConfig.class);
    @Autowired
    private CachingConnectionFactory cachingConnectionFactory;
    @Autowired
    private IMailLogService mailLogService;
    @Bean
    public RabbitTemplate rabbitTemplate(){
        RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory);
        /**
         * 消息确认回调,确认消息是否到达broker
         * data:消息唯一标识
         * ack:确认结果
         * cause:失败原因
         */
        rabbitTemplate.setConfirmCallback((data,ack,cause) ->{
            String msgId = data.getId();
            if(ack){
                LOGGER.info("{}=============>消息发送成功",msgId);
                mailLogService.update(new UpdateWrapper<MailLog>().set("status",1).eq("msgId",msgId));
            }else{
                LOGGER.error("{}=============>消息发送失败",msgId);
            }
        });
        /**
         * 消息失败回调,比如router不到queue时回调
         * msg:消息主题
         * repCode:响应码
         * repText:相应描述
         * exchange;交换机
         * routingkey:路由键
         */
        rabbitTemplate.setReturnCallback((msg,repCode,repText,exchange,routingkey) ->{
            LOGGER.error("{}=============>消息发送queue时失败",msg.getBody());
        });
        return rabbitTemplate;
    }
    @Bean
    public Queue queue(){
        return new Queue(MailConstants.MAIL_QUEUE_NAME);
    }
    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange(MailConstants.MAIL_EXCHANGE_NAME);
    }
    @Bean
    public Binding binding(){
        return BindingBuilder.bind(queue()).to(directExchange()).with(MailConstants.MAIL_ROUTING_KEY_NAME);
    }
}


1.2、定时任务


前面我们消息已经落库了,这个时候我们就弄一个定时任务去扫描我们的消息表中,把状态为待发送的消息任务重新发送一次,如果还失败则重试次数字段+1,等重试次数到达3次,不再重试。


8.png


package com.xiaoqi.server.task;/**
 * @ProjectName: yeb
 * @Package: com.xiaoqi.server.task
 * @ClassName: MailTask
 * @Author: LiShiQi
 * @Description: ${description}
 * @Date: 2022/2/24 18:28
 * @Version: 1.0
 */
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
import com.xiaoqi.server.pojo.Employee;
import com.xiaoqi.server.pojo.MailConstants;
import com.xiaoqi.server.pojo.MailLog;
import com.xiaoqi.server.service.IEmployeeService;
import com.xiaoqi.server.service.IMailLogService;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.util.List;
/**
 * @Description
 * @Author LiShiQi
 * @Date 2022/2/24 18:28
 * @Version 1.0
 */
@Component
public class MailTask {
    @Autowired
    private IMailLogService mailLogService;
    @Autowired
    private IEmployeeService employeeService;
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Scheduled(cron = "0/10 * * * * ?")
    public void mailTask(){
        List<MailLog> list = mailLogService.list(new QueryWrapper<MailLog>()
                .eq("status", 0)
                .lt("tryTime", LocalDateTime.now()));
        list.forEach(mailLog -> {
            //如果重试次数超过3次,更新状态为投递失败,不再重试
            if(3 < mailLog.getCount()) {
                mailLogService.update(new UpdateWrapper<MailLog>()
                        .set("status", 2)
                        .eq("msgId", mailLog.getMsgId()));
            }
            mailLogService.update(new UpdateWrapper<MailLog>()
            .set("count",mailLog.getCount()+1)
            .set("updateTime", LocalDateTime.now())
            .set("tryTime", LocalDateTime.now().plusMinutes(MailConstants.MSG_TIMEOUT))
            .eq("msgId",mailLog.getMsgId()));
            Employee emp = employeeService.getEmployee(mailLog.getEid()).get(0);
            //发送消息
            rabbitTemplate.convertAndSend(MailConstants.MAIL_EXCHANGE_NAME,MailConstants.MAIL_ROUTING_KEY_NAME,
                    emp,new CorrelationData(mailLog.getMsgId()));
        });
    }
}


2、消费端幂等性问题


首先幂等性问题就是一次和多次结果是一样的,也就是说有可能一个消息因为某些原因(例如在定时任务扫描数据库的时候扫描到状态为待发送,但是这个时候其实已经正在发送,这个时候定时任务又发送了一次)生产端可能给消费端发送了两次消息,这个时候我们消费端只需要消费一次就可以了,因为如果是电商业务,不可能下一笔订单扣两笔钱吧,所以这里我们用redis来实现。


2.1、redis解决


大概思路: 我们消费每一个消息的时候将这个消息的消息id放入redis中,如果接收到的消息id在redis中,证明我们已经消费过了就不在进行消费了。


9.png


package com.xiaoqi.mail;/**
 * @ProjectName: yeb
 * @Package: com.xiaoqi.mail
 * @ClassName: MailReceiver
 * @Author: LiShiQi
 * @Description: ${description}
 * @Date: 2022/2/24 12:43
 * @Version: 1.0
 */
import com.rabbitmq.client.Channel;
import com.xiaoqi.server.pojo.Employee;
import com.xiaoqi.server.pojo.MailConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.mail.MailProperties;
import org.springframework.data.redis.core.HashOperations;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.mail.javamail.JavaMailSender;
import org.springframework.mail.javamail.MimeMessageHelper;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.stereotype.Component;
import org.thymeleaf.TemplateEngine;
import org.thymeleaf.context.Context;
import javax.mail.internet.MimeMessage;
import java.io.IOException;
import java.util.Date;
/**
 * @Description
 * @Author LiShiQi
 * @Date 2022/2/24 12:43
 * @Version 1.0
 */
@Component
public class MailReceiver {
    private static final Logger LOGGER = LoggerFactory.getLogger(MailReceiver.class);
    //邮件发送
    @Autowired
    private JavaMailSender javaMailSender;
    //邮件配置
    @Autowired
    private MailProperties mailProperties;
    //引擎
    @Autowired
    private TemplateEngine templateEngine;
    @Autowired
    private RedisTemplate redisTemplate;
    //监听
    @RabbitListener(queues = MailConstants.MAIL_QUEUE_NAME)
    public void handler(Message message, Channel channel){
        Employee employee = (Employee)message.getPayload();
        MessageHeaders headers = message.getHeaders();
        //消息序号
        long tag = (long) headers.get(AmqpHeaders.DELIVERY_TAG);
        String msgId = (String) headers.get("spring_returned_message_correlation");
        HashOperations hashOperations = redisTemplate.opsForHash();
        try {
            if(hashOperations.entries("mail_log").containsKey(msgId)){
                LOGGER.error("消息已经被消费===============>{}",msgId);
                /**
                 * 手动确认消息
                 * tag:消息序号
                 * multiple:是否确认多条
                 */
                channel.basicAck(tag,false);
                return;
            }
            //创建消息
            MimeMessage msg = javaMailSender.createMimeMessage();
            MimeMessageHelper helper = new MimeMessageHelper(msg);
            //发件人
            helper.setFrom(mailProperties.getUsername());
            //收件人
            helper.setTo(employee.getEmail());
            //主题
            helper.setSubject("入职欢迎邮件");
            //发送日期
            helper.setSentDate(new Date());
            //邮件内容
            Context context = new Context();
            context.setVariable("name",employee.getName());
            context.setVariable("posName",employee.getPosition().getName());
            context.setVariable("joblevelName",employee.getJoblevel().getName());
            context.setVariable("departmentName",employee.getDepartment().getName());
            String mail = templateEngine.process("mail", context);
            //参数为true就是html
            helper.setText(mail,true);
            //发送邮件
            javaMailSender.send(msg);
            LOGGER.info("邮件发送成功");
            //将消息id存入redis
            hashOperations.put("mail_log",msgId,"OK");
            //手动确认消息
            channel.basicAck(tag,false);
        } catch (Exception e) {
            /**
             * 手动确认消息
             * tag:消息序号
             * multiple:是否确认多条
             * requeue:是否退回到队列
             */
            try {
                channel.basicNack(tag,false,true);
            } catch (IOException e1) {
                LOGGER.error("邮件发送失败=========>{}",e.getMessage());
            }
            LOGGER.error("邮件发送失败=========>{}",e.getMessage());
        }
    }
}


3、总结


以上就是解决MQ消息队列的可靠性问题,因为在引入消息队列解决某些问题的同时我们随之而来了一些其他问题,这个时候我们就要考虑怎么解决这些其他问题,以上的解决方案只是众多方案中的其中一种,还有其他方案也可以解决这些问题。


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
13天前
|
消息中间件 Java 中间件
MQ四兄弟:如何保证消息可靠性
本文介绍了RabbitMQ、RocketMQ、Kafka和Pulsar四种消息中间件的可靠性机制。这些中间件通过以下几种方式确保消息的可靠传输:1. 消息持久化,确保消息在重启后不会丢失;2. 确认机制,保证消息从生产者到消费者都被成功处理;3. 重试机制,处理失败后的重试;4. 死信队列,处理无法消费的消息。每种中间件的具体实现略有不同,但核心思想相似,都是从生产者、中间件本身和消费者三个角度来保障消息的可靠性。
17 0
|
2月前
|
消息中间件 存储 Java
RocketMQ文件刷盘机制深度解析与Java模拟实现
【11月更文挑战第22天】在现代分布式系统中,消息队列(Message Queue, MQ)作为一种重要的中间件,扮演着连接不同服务、实现异步通信和消息解耦的关键角色。Apache RocketMQ作为一款高性能的分布式消息中间件,广泛应用于实时数据流处理、日志流处理等场景。为了保证消息的可靠性,RocketMQ引入了一种称为“刷盘”的机制,将消息从内存写入到磁盘中,确保消息持久化。本文将从底层原理、业务场景、概念、功能点等方面深入解析RocketMQ的文件刷盘机制,并使用Java模拟实现类似的功能。
44 3
|
3月前
|
存储 消息中间件 安全
JUC组件实战:实现RRPC(Java与硬件通过MQTT的同步通信)
【10月更文挑战第9天】本文介绍了如何利用JUC组件实现Java服务与硬件通过MQTT的同步通信(RRPC)。通过模拟MQTT通信流程,使用`LinkedBlockingQueue`作为消息队列,详细讲解了消息发送、接收及响应的同步处理机制,包括任务超时处理和内存泄漏的预防措施。文中还提供了具体的类设计和方法实现,帮助理解同步通信的内部工作原理。
JUC组件实战:实现RRPC(Java与硬件通过MQTT的同步通信)
|
3月前
|
消息中间件 存储 JSON
rabbitmq基础教程(ui,java,springamqp)
本文提供了RabbitMQ的基础教程,包括如何使用UI创建队列和交换机、Java代码操作RabbitMQ、Spring AMQP进行消息发送和接收,以及如何使用不同的交换机类型(fanout、direct、topic)进行消息路由。
37 0
rabbitmq基础教程(ui,java,springamqp)
|
3月前
|
消息中间件 前端开发 Java
java高并发场景RabbitMQ的使用
java高并发场景RabbitMQ的使用
125 0
|
4月前
|
消息中间件 缓存 Java
RocketMQ的JAVA落地实战
RocketMQ作为一款高性能、高可靠、高实时、分布式特点的消息中间件,其核心作用主要体现在异步处理、削峰填谷以及系统解耦三个方面。
200 0
|
5月前
|
Java
MQTT(EMQX) - Java 调用 MQTT Demo 代码
MQTT(EMQX) - Java 调用 MQTT Demo 代码
203 0
MQTT(EMQX) - Java 调用 MQTT Demo 代码
|
5月前
|
消息中间件 存储 运维
RabbitMQ-消息消费时的可靠性保障
将这些实践融入到消息消费的处理逻辑中,可以很大程度上保障RabbitMQ中消息消费的可靠性,确保消息系统的稳定性和数据的一致性。这些措施的实施,需要在系统的设计和开发阶段充分考虑,以及在后续的维护过程中不断的调整和完善。
66 0
|
6月前
|
消息中间件 Java Maven
如何在Java中使用RabbitMQ
如何在Java中使用RabbitMQ
|
6月前
|
消息中间件 存储 RocketMQ
消息队列 MQ使用问题之进行超过3天的延迟消息投递,采用多次投递的策略是否有风险
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。