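Scenarios in which a message can be lost during delivery:
- Producer --msg--> MQ. Enable the publisher-confirm callback, which reports the delivery result, and make sure every message receives its callback.
- Inside MQ. Make queues and messages durable, and set up mirrored queues on a cluster.
- MQ --callback--> producer. The callback itself can fail. If a message has not received its callback within a set period, treat the delivery as failed and have the producer deliver the message to MQ again. (This can deliver the same message more than once, so the consumer must make its processing idempotent.)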
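The last scenario moves duplicate handling to the consumer. A minimal sketch of idempotent consumption, assuming every message carries a unique business ID that the consumer can read (how that ID is transported is outside this sketch). IdempotentConsumer is a hypothetical class that keeps processed IDs in memory; a real service would more likely record them under a database unique key in the same transaction as the business write.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory deduplicator: businessLogic runs at most once per message ID,
// so a message the producer delivered twice is applied only once.
final class IdempotentConsumer {
    private final Set<String> processedIds = ConcurrentHashMap.newKeySet();

    /** Returns true if businessLogic ran, false if messageId was already processed. */
    boolean consume(String messageId, Runnable businessLogic) {
        // add() is atomic and returns false when the ID is already present
        if (!processedIds.add(messageId)) {
            return false;
        }
        try {
            businessLogic.run();
            return true;
        } catch (RuntimeException e) {
            // Forget the ID so that a later redelivery can try again
            processedIds.remove(messageId);
            throw e;
        }
    }
}
```

Removing the ID when the business logic throws lets a later redelivery retry the message instead of silently treating it as done.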
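I. Implementation Approach
Each message is first saved to the database with the status "sending" and then published with its database ID as the correlation ID. The publisher-confirm callback marks the message as delivered on ACK and resends it on NACK. A scheduled job redelivers messages that are still "sending" after their next retry time, and marks them as failed once the retry limit is reached.
Tech stack: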
- SpringBoot
- RabbitMQ
- MySQL
- MyBatis-Plus
- XXL-JOB
II. Preparation: Project Setup
- Database entity:
- Message
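```java
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Builder;
import lombok.Getter;
import lombok.Setter;
import lombok.experimental.Tolerate;

import java.time.LocalDateTime;

/**
 * Message send history
 *
 * @author futao
 * @date 2020/3/31.
 */
@Getter
@Setter
@Builder
@TableName("message")
public class Message extends IdTimeEntity {

    @Tolerate
    public Message() {
    }

    /**
     * Business payload carried by the message
     */
    @TableField("msg_data")
    private String msgData;

    /**
     * Exchange name
     */
    @TableField("exchange_name")
    private String exchangeName;

    /**
     * Routing key
     */
    @TableField("routing_key")
    private String routingKey;

    /**
     * Message status
     *
     * @see com.futao.springboot.learn.rabbitmq.doc.reliabledelivery.model.enums.MessageStatusEnum
     */
    @TableField("status")
    private int status;

    /**
     * Number of retries so far
     */
    @TableField("retry_times")
    private int retryTimes;

    /**
     * Time of the next retry
     */
    @TableField("next_retry_date_time")
    private LocalDateTime nextRetryDateTime;
}
```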
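- Message status enum
```java
import lombok.AllArgsConstructor;
import lombok.Getter;

/**
 * Message status enum
 *
 * @author futao
 * @date 2020/3/31.
 */
@Getter
@AllArgsConstructor
public enum MessageStatusEnum {
    /**
     * 1 = sending
     */
    SENDING(1, "sending"),
    /**
     * 2 = sent successfully
     */
    SUCCESS(2, "sent successfully"),
    /**
     * 3 = send failed
     */
    FAIL(3, "send failed");

    private final int status;
    private final String description;
}
```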
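The status column stores only the int code, so code that reads a Message row back usually has to map that code to a constant. A minimal sketch of such a lookup, written as a plain-Java enum without Lombok so that it compiles on its own; the enum name MessageStatus and the method fromStatus are hypothetical and not part of the original project.

```java
// Hypothetical plain-Java counterpart of MessageStatusEnum with a code-to-constant lookup
enum MessageStatus {
    SENDING(1, "sending"),
    SUCCESS(2, "sent successfully"),
    FAIL(3, "send failed");

    private final int status;
    private final String description;

    MessageStatus(int status, String description) {
        this.status = status;
        this.description = description;
    }

    int getStatus() {
        return status;
    }

    String getDescription() {
        return description;
    }

    // Map the int stored in message.status back to its constant
    static MessageStatus fromStatus(int status) {
        for (MessageStatus s : values()) {
            if (s.status == status) {
                return s;
            }
        }
        throw new IllegalArgumentException("Unknown message status: " + status);
    }
}
```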
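- RabbitMQ configuration
```yaml
spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: futao
    password: 123456789
    virtual-host: reliable-delivery
    connection-timeout: 15000
    # Publisher confirms (Spring Boot 2.2+ deprecates this flag in favor of publisher-confirm-type: correlated)
    publisher-confirms: true
    # Callback for messages that cannot be routed
    publisher-returns: true
    template:
      # Must be true: unroutable messages are returned to the listener instead of being dropped
      mandatory: true
app:
  rabbitmq:
    retry:
      # Maximum number of retries per message
      max-retry-times: 5
      # Interval between retries
      retry-interval: 5s
    # Queue definitions
    queue:
      user: user-queue
    # Exchange definitions
    exchange:
      user: user-exchange
```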
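With max-retry-times: 5 and retry-interval: 5s, the timing of a message that is never confirmed can be worked out in advance. The sketch below computes the earliest possible resend times and the earliest moment the message is marked as failed, assuming the scan job fires exactly when each next_retry_date_time is reached and ignoring the NACK resend path. RetrySchedule is a hypothetical helper.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: earliest retry timeline for a message that is never confirmed
final class RetrySchedule {
    // The producer sets next_retry_date_time = firstSend + interval,
    // and every resend pushes it one more interval forward
    static List<LocalDateTime> earliestResendTimes(LocalDateTime firstSend, int maxRetryTimes, Duration interval) {
        List<LocalDateTime> times = new ArrayList<>();
        LocalDateTime next = firstSend.plus(interval);
        for (int retry = 0; retry < maxRetryTimes; retry++) {
            times.add(next);
            next = next.plus(interval);
        }
        return times;
    }

    // After the last resend retry_times equals the limit, so the next due scan marks the message FAIL
    static LocalDateTime earliestFailTime(LocalDateTime firstSend, int maxRetryTimes, Duration interval) {
        return firstSend.plus(interval.multipliedBy(maxRetryTimes + 1L));
    }
}
```

Under these assumptions a message is sent 6 times in total (1 original send plus 5 retries) and cannot become FAIL earlier than 30 seconds after the first send. In practice both happen later, because the job only runs on its own XXL-JOB schedule.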
III. Code
- Declaring and binding the queue and exchange
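```java
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * RabbitMQ queue declaration and binding
 *
 * @author futao
 * @date 2020/3/31.
 */
@Configuration
public class Declare {

    @Bean
    public Queue userQueue(@Value("${app.rabbitmq.queue.user}") String userQueueName) {
        return QueueBuilder
                .durable(userQueueName)
                //.withArgument("x-max-length", 2)
                .build();
    }

    @Bean
    public Exchange userExchange(@Value("${app.rabbitmq.exchange.user}") String userExchangeName) {
        return ExchangeBuilder
                .topicExchange(userExchangeName)
                .durable(true)
                .build();
    }

    @Bean
    public Binding userBinding(Queue userQueue, Exchange userExchange) {
        return BindingBuilder
                .bind(userQueue)
                .to(userExchange)
                .with("user.*")
                .noargs();
    }
}
```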
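The binding key user.* decides which routing keys reach user-queue, and a routing key that matches no binding is what triggers the return callback when mandatory is true. A simplified model of topic-exchange matching, for illustration only: words are separated by '.', '*' matches exactly one word and '#' matches zero or more words. TopicMatcher is a hypothetical class, not the broker's implementation.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified model of topic-exchange routing-key matching
final class TopicMatcher {
    static boolean matches(String bindingKey, String routingKey) {
        List<String> pattern = Arrays.asList(bindingKey.split("\\.", -1));
        List<String> words = Arrays.asList(routingKey.split("\\.", -1));
        return match(pattern, 0, words, 0);
    }

    private static boolean match(List<String> p, int i, List<String> w, int j) {
        if (i == p.size()) {
            // Pattern exhausted: every word must have been consumed
            return j == w.size();
        }
        String token = p.get(i);
        if (token.equals("#")) {
            // '#' absorbs zero words, or one word while staying on the same pattern token
            return match(p, i + 1, w, j) || (j < w.size() && match(p, i, w, j + 1));
        }
        if (j == w.size()) {
            return false;
        }
        // '*' matches any single word; other tokens must match literally
        return (token.equals("*") || token.equals(w.get(j))) && match(p, i + 1, w, j + 1);
    }
}
```

So user.abc, the key the producer uses, is routed to user-queue, while user.abc.def is not; with mandatory: true such a message would come back through the return callback.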
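- Enhance RabbitTemplate by registering a ConfirmCallback, which reports whether each message reached the broker, and a ReturnCallback, which is called for messages that could not be routed: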
```java
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

/**
 * Bean enhancement.
 * WARNING: do not inject beans into this class. Beans injected into a BeanPostProcessor are
 * created too early to be enhanced by the BeanPostProcessors, which causes collateral damage.
 * Look up the beans you need from the container instead.
 *
 * @author futao
 * @date 2020/3/20.
 */
@Slf4j
@Component
public class BeanEnhance implements BeanPostProcessor {

    /**
     * Maximum number of retries per message
     */
    @Value("${app.rabbitmq.retry.max-retry-times}")
    private int maxRetryTimes;

    /**
     * Interval between retries
     */
    @Value("${app.rabbitmq.retry.retry-interval}")
    private Duration retryInterval;

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        // Enhance RabbitTemplate
        if (bean instanceof RabbitTemplate) {
            RabbitTemplate rabbitTemplate = (RabbitTemplate) bean;
            // Publish-confirm listener, used to make sure every message reaches RabbitMQ.
            // If a message (identified by its id) gets no callback within a set time, it is resent.
            // Requires publisher-confirms: true
            rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
                if (correlationData == null) {
                    // Sent without CorrelationData, so there is no DB record to update
                    log.warn("Confirm without correlation data, ack={}, cause={}", ack, cause);
                    return;
                }
                String correlationDataId = correlationData.getId();
                if (ack) {
                    // ACK
                    log.debug("Message [{}] delivered, marking it as SUCCESS in the DB", correlationDataId);
                    ApplicationContextHolder.getBean(MessageMapper.class).update(null,
                            Wrappers.<Message>lambdaUpdate()
                                    .set(Message::getStatus, MessageStatusEnum.SUCCESS.getStatus())
                                    .eq(Message::getId, correlationDataId));
                } else {
                    log.debug("Message [{}] delivery failed, cause: {}", correlationDataId, cause);
                    // NACK: resend the message
                    ApplicationContextHolder.getBean(BeanEnhance.class).reSend(correlationDataId);
                }
            });
            // Routing-failure callback. Requires publisher-returns: true and template.mandatory: true,
            // otherwise RabbitMQ drops the unroutable message
            rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
                log.warn("Message routing failed, add compensation or logging here");
                log.warn("message: {}", message);
                log.warn("replyCode: {}", replyCode);
                log.warn("replyText: {}", replyText);
                log.warn("exchange: {}", exchange);
                log.warn("routingKey: {}", routingKey);
            });
        }
        return bean;
    }

    /**
     * Resend a message after a NACK
     *
     * @param correlationDataId id of the message record
     */
    @Transactional(rollbackFor = Exception.class)
    public void reSend(String correlationDataId) {
        MessageMapper messageMapper = ApplicationContextHolder.getBean(MessageMapper.class);
        Message message = messageMapper.selectById(correlationDataId);
        if (message == null) {
            log.warn("Message [{}] not found, skipping resend", correlationDataId);
            return;
        }
        if (message.getRetryTimes() < maxRetryTimes) {
            // Retry
            ApplicationContextHolder.getBean(RabbitTemplate.class).convertAndSend(message.getExchangeName(),
                    message.getRoutingKey(), message.getMsgData(), new CorrelationData(correlationDataId));
            // Update the message record in the DB
            messageMapper.update(null, Wrappers.<Message>lambdaUpdate()
                    .set(Message::getStatus, MessageStatusEnum.SENDING.getStatus())
                    .set(Message::getNextRetryDateTime, LocalDateTime.now(ZoneOffset.ofHours(8)).plus(retryInterval))
                    .set(Message::getRetryTimes, message.getRetryTimes() + 1)
                    .eq(Message::getId, correlationDataId));
        }
    }
}
```
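- Producer:
```java
import com.alibaba.fastjson.JSON;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

/**
 * @author futao
 * @date 2020/3/31.
 */
@Component
public class Sender {

    @Value("${app.rabbitmq.retry.retry-interval}")
    private Duration retryInterval;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Resource
    private MessageMapper messageMapper;

    @Value("${app.rabbitmq.exchange.user}")
    private String userExchangeName;

    public void send(User user) {
        Message message = Message.builder()
                .msgData(JSON.toJSONString(user))
                .exchangeName(userExchangeName)
                .routingKey("user.abc")
                .status(MessageStatusEnum.SENDING.getStatus())
                // Time of the next retry
                .nextRetryDateTime(LocalDateTime.now(ZoneOffset.ofHours(8)).plus(retryInterval))
                .retryTimes(0)
                .build();
        // Persist the message first; after insert() the entity carries its generated id
        messageMapper.insert(message);
        // The DB id doubles as the correlation id, so confirms can be matched back to the row
        CorrelationData correlationData = new CorrelationData(message.getId());
        // Deliver the message to MQ
        rabbitTemplate.convertAndSend(message.getExchangeName(), message.getRoutingKey(), message.getMsgData(), correlationData);
    }
}
```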
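- A scheduled job scans the message status in the DB. A message that is still sending, whose next delivery time has arrived (now >= next_retry_date_time) and whose retry count is still below the maximum (retry_times < max-retry-times), is delivered again. Once the retry count reaches the maximum, the message is marked as failed.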
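The scan rule can be sketched as a pure function, which makes the boundary cases (retry time exactly reached, retry count exactly at the limit) easy to check. It follows the check in MessageReSendJob, where a message is resent only while its retry count is below the maximum. The status code 1 mirrors MessageStatusEnum.SENDING; ResendDecision and its Action values are hypothetical names.

```java
import java.time.LocalDateTime;

// Hypothetical pure-function version of the scan job's per-message decision
final class ResendDecision {
    enum Action { SKIP, RESEND, MARK_FAIL }

    // Same code as MessageStatusEnum.SENDING
    static final int SENDING = 1;

    static Action decide(int status, int retryTimes, LocalDateTime nextRetry,
                         LocalDateTime now, int maxRetryTimes) {
        // Only messages still "sending" whose next retry time has arrived are candidates
        if (status != SENDING || nextRetry.isAfter(now)) {
            return Action.SKIP;
        }
        // Retry budget left: send again; otherwise give up and mark the message as failed
        return retryTimes < maxRetryTimes ? Action.RESEND : Action.MARK_FAIL;
    }
}
```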
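- XXL-JOB configuration
```yaml
xxl:
  job:
    switch: ON
    admin:
      ### Admin (scheduling center) root address [optional]: separate multiple addresses with commas when the admin is clustered. The executor uses it for heartbeat registration and task result callbacks; leave empty to disable auto-registration
      addresses: http://127.0.0.1:9090/xxl-job-admin
    executor:
      ### Executor AppName [optional]: grouping key for heartbeat registration; leave empty to disable auto-registration
      appname: xxl-job-executor-rabbitmq
      ### Executor IP [optional]: empty means auto-detect; set it manually on hosts with several NICs. The IP is not bound to the host and is only used for communication, i.e. executor registration and task-trigger requests from the admin
      # ip:
      ### Executor port [optional]: a value <= 0 means auto-detect; the default is 9999. Use a different port for each executor when deploying several on one machine
      port: 9999
      ### Directory for executor run logs [optional]: the process needs read/write access to it; empty means the default path
      logpath: data/applogs/xxl-job/jobhandler
      ### Days to keep executor log files [optional]: expired logs are cleaned up automatically when the value is >= 3; otherwise (e.g. -1) automatic cleanup is disabled
      logretentiondays: 30
    ### Executor access TOKEN [optional]: enabled when non-empty
    accessToken:
```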
- Configuration
```java
import com.xxl.job.core.executor.impl.XxlJobSpringExecutor;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * XXL-JOB configuration
 *
 * @author futao
 * @date 2020/4/1.
 */
@Setter
@Getter
@Slf4j
@Configuration
@ConfigurationProperties(prefix = "xxl.job")
public class XxlJobConfig {

    private final Admin admin = new Admin();

    private final Executor executor = new Executor();

    /**
     * Executor access token (xxl.job.accessToken); empty means disabled
     */
    private String accessToken;

    @Bean
    public XxlJobSpringExecutor xxlJobExecutor() {
        log.info(">>>>>>>>>>> xxl-job config init.");
        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        xxlJobSpringExecutor.setAdminAddresses(admin.getAddresses());
        xxlJobSpringExecutor.setAppName(executor.getAppName());
        xxlJobSpringExecutor.setIp(executor.getIp());
        xxlJobSpringExecutor.setPort(executor.getPort());
        xxlJobSpringExecutor.setLogPath(executor.getLogPath());
        xxlJobSpringExecutor.setLogRetentionDays(executor.getLogRetentionDays());
        xxlJobSpringExecutor.setAccessToken(accessToken);
        return xxlJobSpringExecutor;
    }

    @Getter
    @Setter
    public static class Admin {
        private String addresses;
    }

    @Getter
    @Setter
    public static class Executor {
        private String appName;
        private String ip;
        private int port;
        private String logPath;
        private int logRetentionDays;
    }
}
```
- Writing the scheduled scan job
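```java
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.handler.annotation.XxlJob;
import com.xxl.job.core.log.XxlJobLogger;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.List;

/**
 * Scan the database for messages that need redelivery and deliver them again
 *
 * @author futao
 * @date 2020/4/1.
 */
@Slf4j
@Component
public class MessageReSendJob extends IJobHandler {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Resource
    private MessageMapper messageMapper;

    /**
     * Maximum number of retries
     */
    @Value("${app.rabbitmq.retry.max-retry-times}")
    private int maxRetryTimes;

    /**
     * Interval between retries
     */
    @Value("${app.rabbitmq.retry.retry-interval}")
    private Duration retryInterval;

    /**
     * Number of messages read from the database per batch
     */
    private static final int PAGE_SIZE = 100;

    @XxlJob(value = "MessageReSendJob", init = "jobHandlerInit", destroy = "jobHandlerDestroy")
    @Override
    public ReturnT<String> execute(String s) throws Exception {
        long startTime = System.currentTimeMillis();
        log.info("Start scanning for messages that need redelivery");
        XxlJobLogger.log("Start scanning for messages that need redelivery");
        service();
        log.info("Redelivery scan finished in [{}] ms", System.currentTimeMillis() - startTime);
        XxlJobLogger.log("Redelivery scan finished in [{}] ms", System.currentTimeMillis() - startTime);
        return ReturnT.SUCCESS;
    }

    public void service() {
        // Fix the cut-off once, so messages resent during this run are not picked up again
        LocalDateTime now = LocalDateTime.now(ZoneOffset.ofHours(8));
        List<Message> messages;
        do {
            // Always read page 1: every message handled below stops matching this query
            // (it becomes FAIL or its next retry time moves past now), so advancing the page would skip rows
            IPage<Message> messageIPage = messageMapper.selectPage(new Page<>(1, PAGE_SIZE),
                    Wrappers.<Message>lambdaQuery()
                            // Messages still being sent
                            .eq(Message::getStatus, MessageStatusEnum.SENDING.getStatus())
                            // whose next retry time has arrived
                            .le(Message::getNextRetryDateTime, now));
            messages = messageIPage.getRecords();
            messages.forEach(message -> {
                if (maxRetryTimes <= message.getRetryTimes()) {
                    // Maximum number of attempts reached: mark the message as failed
                    messageMapper.update(null, Wrappers.<Message>lambdaUpdate()
                            .set(Message::getStatus, MessageStatusEnum.FAIL.getStatus())
                            .eq(Message::getId, message.getId()));
                } else {
                    reSend(message);
                }
            });
        } while (messages.size() == PAGE_SIZE);
    }

    /**
     * Deliver a message again
     *
     * @param message the message to redeliver
     */
    public void reSend(Message message) {
        // Count the attempt and push the next retry time before sending, so a failed send still counts
        messageMapper.update(null, Wrappers.<Message>lambdaUpdate()
                .set(Message::getRetryTimes, message.getRetryTimes() + 1)
                .set(Message::getNextRetryDateTime, LocalDateTime.now(ZoneOffset.ofHours(8)).plus(retryInterval))
                .eq(Message::getId, message.getId()));
        try {
            // Deliver again
            rabbitTemplate.convertAndSend(message.getExchangeName(), message.getRoutingKey(), message.getMsgData(),
                    new CorrelationData(message.getId()));
        } catch (Exception e) {
            log.error("Failed to deliver message [{}]", JSON.toJSONString(message), e);
        }
    }

    public void jobHandlerInit() {
        log.info("before job execute...");
        XxlJobLogger.log("before job handler init...");
    }

    public void jobHandlerDestroy() {
        log.info("after job execute...");
        XxlJobLogger.log("after job execute...");
    }
}
```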
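Paging through the scan query needs care: every message the job handles stops matching it, because the message either becomes FAIL or gets a later next_retry_date_time. Advancing the page number over such a shrinking result set skips rows, which is why re-reading the first page until it comes back short is the safer loop. The in-memory simulation below (not MyBatis-Plus) compares the two strategies for 250 due messages and a page size of 100; PagingPitfall is a hypothetical class.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation: processed rows are removed from the set that still matches the query
final class PagingPitfall {
    static final int PAGE_SIZE = 100;

    // Rows of the still-matching set at the given 1-based page (a copy, safe to keep after removal)
    private static List<Integer> page(List<Integer> matching, int pageNum) {
        int from = Math.min((pageNum - 1) * PAGE_SIZE, matching.size());
        int to = Math.min(from + PAGE_SIZE, matching.size());
        return new ArrayList<>(matching.subList(from, to));
    }

    // Advance pageNum after every page, as the original loop did
    static int processWithAdvancingPages(int dueRows) {
        List<Integer> matching = rows(dueRows);
        int processed = 0;
        int pageNum = 1;
        List<Integer> batch;
        do {
            batch = page(matching, pageNum++);
            matching.removeAll(batch); // handled rows no longer match the filter
            processed += batch.size();
        } while (batch.size() == PAGE_SIZE);
        return processed;
    }

    // Always re-read the first page until it comes back short
    static int processFirstPageRepeatedly(int dueRows) {
        List<Integer> matching = rows(dueRows);
        int processed = 0;
        List<Integer> batch;
        do {
            batch = page(matching, 1);
            matching.removeAll(batch);
            processed += batch.size();
        } while (batch.size() == PAGE_SIZE);
        return processed;
    }

    private static List<Integer> rows(int n) {
        List<Integer> rows = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            rows.add(i);
        }
        return rows;
    }
}
```

With advancing pages only 150 of the 250 due messages are handled in that run, because page 2 starts at offset 100 of the 150 rows still left. Re-reading the first page handles all 250. The skipped rows are not lost for good, since a later run picks them up, but their redelivery is delayed.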
- Register MessageReSendJob as a new scheduled task in the XXL-JOB admin console
IV. Testing
- Test endpoint
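```java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.time.LocalDate;

/**
 * @author futao
 * @date 2020/4/1.
 */
@RequestMapping("/user")
@RestController
public class UserController {

    @Autowired
    private Sender sender;

    @RequestMapping("/send")
    public void send() {
        sender.send(User
                .builder()
                .userName("天文")
                .birthday(LocalDate.of(1995, 1, 31))
                .address("Hangzhou, Zhejiang")
                .build());
    }
}
```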
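- Normal scenario:
  - The message is saved to the DB with status 1 = sending.
  - The confirm callback fires.
  - The message is marked as delivered successfully.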
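- Abnormal scenario:
  - Start the producer service, then stop RabbitMQ.
  - Send a message.
  - No ACK ever arrives for this message, so it stays in the sending state. The scheduled job delivers it again, incrementing the retry count and updating the next delivery time each time.
  - Once the retry count reaches the maximum, the next scan marks the message as failed.
  - Scheduling log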
# Next
- Reliable message consumption
- Rate limiting to protect the consumer
- Dead-letter queues
- Delayed queues