一、JAVA使用RabbitMQ解决生产端消息投递可靠性,消费端幂等性问题
1、生产端消息投递可靠性
1.1、消息落库
思路:
1.将消息落库:
我们发送一个消息没办法知道我们发的消息消费端是否接收到,假如消费端没有接收到那么我们需要触发补偿机制来重新发送一个消息,这个时候我们为了解决这个问题就需要将消息落库,每次将准备发送的消息存入到数据库中,并设置一个状态为待发送。
等消费端接收到消息并给我们反馈后,我们将数据库中的消息状态改为已完成。
消息库
发送消息之前先将消息落库
如果消息发送成功则将数据库状态改为发送完成,如果没有成功则将重试次数+1,我们一般重试3次还是失败就会将状态改为发送失败。
package com.xiaoqi.server.config;/** * @ProjectName: yeb * @Package: com.xiaoqi.server.config * @ClassName: RabbitMQConfig * @Author: LiShiQi * @Description: ${description} * @Date: 2022/2/24 16:16 * @Version: 1.0 */ import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper; import com.xiaoqi.server.pojo.MailConstants; import com.xiaoqi.server.pojo.MailLog; import com.xiaoqi.server.service.IMailLogService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @Description * @Author LiShiQi * @Date 2022/2/24 16:16 * @Version 1.0 */ @Configuration public class RabbitMQConfig { private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQConfig.class); @Autowired private CachingConnectionFactory cachingConnectionFactory; @Autowired private IMailLogService mailLogService; @Bean public RabbitTemplate rabbitTemplate(){ RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory); /** * 消息确认回调,确认消息是否到达broker * data:消息唯一标识 * ack:确认结果 * cause:失败原因 */ rabbitTemplate.setConfirmCallback((data,ack,cause) ->{ String msgId = data.getId(); if(ack){ LOGGER.info("{}=============>消息发送成功",msgId); mailLogService.update(new UpdateWrapper<MailLog>().set("status",1).eq("msgId",msgId)); }else{ LOGGER.error("{}=============>消息发送失败",msgId); } }); /** * 消息失败回调,比如router不到queue时回调 * msg:消息主题 * repCode:响应码 * repText:相应描述 * exchange;交换机 * routingkey:路由键 */ rabbitTemplate.setReturnCallback((msg,repCode,repText,exchange,routingkey) ->{ LOGGER.error("{}=============>消息发送queue时失败",msg.getBody()); }); return rabbitTemplate; } @Bean public Queue queue(){ return new Queue(MailConstants.MAIL_QUEUE_NAME); } @Bean public DirectExchange directExchange(){ return new DirectExchange(MailConstants.MAIL_EXCHANGE_NAME); } @Bean public Binding binding(){ return BindingBuilder.bind(queue()).to(directExchange()).with(MailConstants.MAIL_ROUTING_KEY_NAME); } }
1.2、定时任务
前面我们消息已经落库了,这个时候我们就弄一个定时任务去扫描我们的消息表中,把状态为待发送的消息任务重新发送一次,如果还失败则重试次数字段+1,等重试次数到达3次,不再重试。
package com.xiaoqi.server.task;/** * @ProjectName: yeb * @Package: com.xiaoqi.server.task * @ClassName: MailTask * @Author: LiShiQi * @Description: ${description} * @Date: 2022/2/24 18:28 * @Version: 1.0 */ import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper; import com.xiaoqi.server.pojo.Employee; import com.xiaoqi.server.pojo.MailConstants; import com.xiaoqi.server.pojo.MailLog; import com.xiaoqi.server.service.IEmployeeService; import com.xiaoqi.server.service.IMailLogService; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import java.time.LocalDateTime; import java.util.List; /** * @Description * @Author LiShiQi * @Date 2022/2/24 18:28 * @Version 1.0 */ @Component public class MailTask { @Autowired private IMailLogService mailLogService; @Autowired private IEmployeeService employeeService; @Autowired private RabbitTemplate rabbitTemplate; @Scheduled(cron = "0/10 * * * * ?") public void mailTask(){ List<MailLog> list = mailLogService.list(new QueryWrapper<MailLog>() .eq("status", 0) .lt("tryTime", LocalDateTime.now())); list.forEach(mailLog -> { //如果重试次数超过3次,更新状态为投递失败,不再重试 if(3 < mailLog.getCount()) { mailLogService.update(new UpdateWrapper<MailLog>() .set("status", 2) .eq("msgId", mailLog.getMsgId())); } mailLogService.update(new UpdateWrapper<MailLog>() .set("count",mailLog.getCount()+1) .set("updateTime", LocalDateTime.now()) .set("tryTime", LocalDateTime.now().plusMinutes(MailConstants.MSG_TIMEOUT)) .eq("msgId",mailLog.getMsgId())); Employee emp = employeeService.getEmployee(mailLog.getEid()).get(0); //发送消息 rabbitTemplate.convertAndSend(MailConstants.MAIL_EXCHANGE_NAME,MailConstants.MAIL_ROUTING_KEY_NAME, emp,new CorrelationData(mailLog.getMsgId())); }); } }
2、消费端幂等性问题
首先幂等性问题就是一次和多次结果是一样的,也就是说有可能一个消息因为某些原因(例如在定时任务扫描数据库的时候扫描到状态为待发送,但是这个时候其实已经正在发送,这个时候定时任务又发送了一次)生产端可能给消费端发送了两次消息,这个时候我们消费端只需要消费一次就可以了,因为如果是电商业务,不可能下一笔订单扣两笔钱吧,所以这里我们用redis来实现。
2.1、redis解决
大概思路: 我们消费每一个消息的时候将这个消息的消息id放入redis中,如果接收到的消息id在redis中,证明我们已经消费过了就不在进行消费了。
package com.xiaoqi.mail;/** * @ProjectName: yeb * @Package: com.xiaoqi.mail * @ClassName: MailReceiver * @Author: LiShiQi * @Description: ${description} * @Date: 2022/2/24 12:43 * @Version: 1.0 */ import com.rabbitmq.client.Channel; import com.xiaoqi.server.pojo.Employee; import com.xiaoqi.server.pojo.MailConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.mail.MailProperties; import org.springframework.data.redis.core.HashOperations; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.mail.javamail.JavaMailSender; import org.springframework.mail.javamail.MimeMessageHelper; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; import org.springframework.stereotype.Component; import org.thymeleaf.TemplateEngine; import org.thymeleaf.context.Context; import javax.mail.internet.MimeMessage; import java.io.IOException; import java.util.Date; /** * @Description * @Author LiShiQi * @Date 2022/2/24 12:43 * @Version 1.0 */ @Component public class MailReceiver { private static final Logger LOGGER = LoggerFactory.getLogger(MailReceiver.class); //邮件发送 @Autowired private JavaMailSender javaMailSender; //邮件配置 @Autowired private MailProperties mailProperties; //引擎 @Autowired private TemplateEngine templateEngine; @Autowired private RedisTemplate redisTemplate; //监听 @RabbitListener(queues = MailConstants.MAIL_QUEUE_NAME) public void handler(Message message, Channel channel){ Employee employee = (Employee)message.getPayload(); MessageHeaders headers = message.getHeaders(); //消息序号 long tag = (long) headers.get(AmqpHeaders.DELIVERY_TAG); String msgId = (String) headers.get("spring_returned_message_correlation"); HashOperations hashOperations = redisTemplate.opsForHash(); try { if(hashOperations.entries("mail_log").containsKey(msgId)){ LOGGER.error("消息已经被消费===============>{}",msgId); /** * 手动确认消息 * tag:消息序号 * multiple:是否确认多条 */ channel.basicAck(tag,false); return; } //创建消息 MimeMessage msg = javaMailSender.createMimeMessage(); MimeMessageHelper helper = new MimeMessageHelper(msg); //发件人 helper.setFrom(mailProperties.getUsername()); //收件人 helper.setTo(employee.getEmail()); //主题 helper.setSubject("入职欢迎邮件"); //发送日期 helper.setSentDate(new Date()); //邮件内容 Context context = new Context(); context.setVariable("name",employee.getName()); context.setVariable("posName",employee.getPosition().getName()); context.setVariable("joblevelName",employee.getJoblevel().getName()); context.setVariable("departmentName",employee.getDepartment().getName()); String mail = templateEngine.process("mail", context); //参数为true就是html helper.setText(mail,true); //发送邮件 javaMailSender.send(msg); LOGGER.info("邮件发送成功"); //将消息id存入redis hashOperations.put("mail_log",msgId,"OK"); //手动确认消息 channel.basicAck(tag,false); } catch (Exception e) { /** * 手动确认消息 * tag:消息序号 * multiple:是否确认多条 * requeue:是否退回到队列 */ try { channel.basicNack(tag,false,true); } catch (IOException e1) { LOGGER.error("邮件发送失败=========>{}",e.getMessage()); } LOGGER.error("邮件发送失败=========>{}",e.getMessage()); } } }
3、总结
以上就是解决MQ消息队列的可靠性问题,因为在引入消息队列解决某些问题的同时我们随之而来了一些其他问题,这个时候我们就要考虑怎么解决这些其他问题,以上的解决方案只是众多方案中的其中一种,还有其他方案也可以解决这些问题。