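Scenarios in which a message can be lost during delivery:
- Producer ------msg------> MQ. Enable publisher confirm callbacks and make sure every message receives one.
- MQ. Declare the queue and the messages as persistent, and set up a mirrored-queue cluster.
- MQ -------callback----> Producer. The callback itself can fail. If a message receives no callback within a given period, treat the delivery as failed and have the producer send the message to MQ again. (This can deliver the same message more than once, so the consumer must make its processing idempotent.)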
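Because the third scenario can deliver the same message twice, the consumer has to deduplicate. The following is a minimal sketch of one way to do that, keyed on the message ID. The class name `IdempotentConsumer` and the in-memory ID set are assumptions made for illustration; a real consumer would more likely record processed IDs in a durable store such as a table with a unique key.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/** Hypothetical helper: runs a handler at most once per message ID. */
class IdempotentConsumer {

    // IDs that were already handled (in memory only, for illustration)
    private final Set<String> processedIds = ConcurrentHashMap.newKeySet();

    /**
     * @return true if the handler ran, false if the message was a duplicate
     */
    boolean consume(String messageId, String payload, Consumer<String> handler) {
        // add() returns false when the ID is already present, so duplicates are skipped atomically
        if (!processedIds.add(messageId)) {
            return false;
        }
        try {
            handler.accept(payload);
            return true;
        } catch (RuntimeException e) {
            // Forget the ID so that a later redelivery can retry the failed handler
            processedIds.remove(messageId);
            throw e;
        }
    }
}
```

Calling `consume` twice with the same ID runs the handler only on the first call; the second call returns `false` without touching the business logic.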
I. Implementation Approach
Technologies used:
- SpringBoot
- RabbitMQ
- MySQL
- MyBatis-Plus
- XXL-JOB
II. Preparation: Project Setup
Database entity:
- Message
/**
 * Message send history.
 *
 * @author futao
 * @date 2020/3/31.
 */
@Getter
@Setter
@Builder
@TableName("message")
public class Message extends IdTimeEntity {

    /**
     * No-args constructor; @Tolerate lets it coexist with the constructor generated for @Builder.
     */
    @Tolerate
    public Message() {
    }

    /**
     * Business data carried by the message
     */
    @TableField("msg_data")
    private String msgData;

    /**
     * Exchange name
     */
    @TableField("exchange_name")
    private String exchangeName;

    /**
     * Routing key
     */
    @TableField("routing_key")
    private String routingKey;

    /**
     * Message status
     *
     * @see com.futao.springboot.learn.rabbitmq.doc.reliabledelivery.model.enums.MessageStatusEnum
     */
    @TableField("status")
    private int status;

    /**
     * Number of retries so far
     */
    @TableField("retry_times")
    private int retryTimes;

    /**
     * Time of the next retry
     */
    @TableField("next_retry_date_time")
    private LocalDateTime nextRetryDateTime;
}
- Message status enum
/**
 * Message status enum.
 *
 * @author futao
 * @date 2020/3/31.
 */
@Getter
@AllArgsConstructor
public enum MessageStatusEnum {
    /**
     * 1 = sending
     */
    SENDING(1, "Sending"),
    /**
     * 2 = sent successfully
     */
    SUCCESS(2, "Sent successfully"),
    /**
     * 3 = send failed
     */
    FAIL(3, "Send failed");

    private final int status;
    private final String description;
}
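- RabbitMQ configuration
spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: futao
    password: 123456789
    virtual-host: reliable-delivery
    connection-timeout: 15000
    # Publisher confirms
    # (Spring Boot 2.2+ deprecates this key in favor of publisher-confirm-type: correlated)
    publisher-confirms: true
    # Callback when routing fails
    publisher-returns: true
    template:
      # Must be true: an unroutable message is returned to the listener instead of being dropped
      mandatory: true
app:
  rabbitmq:
    retry:
      # Maximum number of retries per message
      max-retry-times: 5
      # Interval between retries
      retry-interval: 5s
    # Queue definitions
    queue:
      user: user-queue
    # Exchange definitions
    exchange:
      user: user-exchange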
III. Coding
- Queue and exchange declaration and binding
/**
 * RabbitMQ queue and exchange declaration and binding.
 *
 * @author futao
 * @date 2020/3/31.
 */
@Configuration
public class Declare {

    /** Durable queue, so it survives a broker restart. */
    @Bean
    public Queue userQueue(@Value("${app.rabbitmq.queue.user}") String userQueueName) {
        return QueueBuilder
                .durable(userQueueName)
                //.withArgument("x-max-length", 2)
                .build();
    }

    /** Durable topic exchange. */
    @Bean
    public Exchange userExchange(@Value("${app.rabbitmq.exchange.user}") String userExchangeName) {
        return ExchangeBuilder
                .topicExchange(userExchangeName)
                .durable(true)
                .build();
    }

    /** Routes every key that matches "user.*" from the exchange to the queue. */
    @Bean
    public Binding userBinding(Queue userQueue, Exchange userExchange) {
        return BindingBuilder
                .bind(userQueue)
                .to(userExchange)
                .with("user.*")
                .noargs();
    }
}
- Enhance RabbitTemplate: register the delivery-result callback with setConfirmCallback() and the routing-failure callback with setReturnCallback()
/**
 * Bean enhancement.
 * [WARNING] Do not inject beans into this class. A bean injected into a BeanPostProcessor is
 * instantiated early and is itself skipped by the BeanPostProcessors, which causes collateral damage.
 * Look up the required beans from the container instead.
 *
 * @author futao
 * @date 2020/3/20.
 */
@Slf4j
@Component
public class BeanEnhance implements BeanPostProcessor {

    // Left commented out on purpose, see the warning above:
    // @Resource
    // private MessageMapper messageMapper;

    /**
     * Maximum number of retries per message
     */
    @Value("${app.rabbitmq.retry.max-retry-times}")
    private int maxRetryTimes;

    /**
     * Interval between retries
     */
    @Value("${app.rabbitmq.retry.retry-interval}")
    private Duration retryInterval;

    // @Autowired
    // private RabbitTemplate rabbitTemplate;
    //
    // @Autowired
    // private BeanEnhance enhance;

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        // Enhance RabbitTemplate only
        if (RabbitTemplate.class.equals(bean.getClass())) {
            RabbitTemplate rabbitTemplate = (RabbitTemplate) bean;
            // Delivery-result listener, used to make sure messages reach RabbitMQ.
            // (A message, identified by its ID, that gets no callback within a certain time is resent.)
            // Requires publisher-confirms: true
            rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
                if (correlationData == null) {
                    // The message was sent without CorrelationData, so there is no DB record to update
                    log.warn("Confirm received without correlation data, ack: {}, cause: {}", ack, cause);
                    return;
                }
                String correlationDataId = correlationData.getId();
                if (ack) {
                    // ACK
                    log.debug("Message [{}] delivered, marking it as delivered in the DB", correlationDataId);
                    ApplicationContextHolder.getBean(MessageMapper.class).update(null,
                            Wrappers.<Message>lambdaUpdate()
                                    .set(Message::getStatus, MessageStatusEnum.SUCCESS.getStatus())
                                    .eq(Message::getId, correlationDataId)
                    );
                } else {
                    log.debug("Message [{}] delivery failed, cause: {}", correlationDataId, cause);
                    // NACK: resend the message
                    ApplicationContextHolder.getBean(BeanEnhance.class).reSend(correlationDataId);
                }
            });
            // Routing-failure callback. Requires publisher-returns: true and template.mandatory: true,
            // otherwise RabbitMQ drops the unroutable message
            rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
                log.warn("Message routing failed; compensate or record it here");
                log.warn("message: {}", message);
                log.warn("replyCode: {}", replyCode);
                log.warn("replyText: {}", replyText);
                log.warn("exchange: {}", exchange);
                log.warn("routingKey: {}", routingKey);
            });
        }
        return bean;
    }

    /**
     * Resend a message after a NACK.
     *
     * @param correlationDataId ID of the message record in the DB
     */
    @Transactional(rollbackFor = Exception.class)
    public void reSend(String correlationDataId) {
        Message message = ApplicationContextHolder.getBean(MessageMapper.class).selectById(correlationDataId);
        if (message == null) {
            log.warn("Message [{}] not found in the DB, skipping resend", correlationDataId);
            return;
        }
        if (message.getRetryTimes() < maxRetryTimes) {
            // Retry the delivery
            ApplicationContextHolder.getBean(RabbitTemplate.class).convertAndSend(message.getExchangeName(), message.getRoutingKey(), message.getMsgData(), new CorrelationData(correlationDataId));
            // Update the message record in the DB
            ApplicationContextHolder.getBean(MessageMapper.class).update(null,
                    Wrappers.<Message>lambdaUpdate()
                            .set(Message::getStatus, MessageStatusEnum.SENDING.getStatus())
                            .set(Message::getNextRetryDateTime, LocalDateTime.now(ZoneOffset.ofHours(8)).plus(retryInterval))
                            .set(Message::getRetryTimes, message.getRetryTimes() + 1)
                            .eq(Message::getId, correlationDataId)
            );
        }
    }
}
- Producer:
/**
 * Stores each message in the DB, then publishes it to RabbitMQ.
 *
 * @author futao
 * @date 2020/3/31.
 */
@Component
public class Sender {

    @Value("${app.rabbitmq.retry.retry-interval}")
    private Duration retryInterval;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Resource
    private MessageMapper messageMapper;

    @Value("${app.rabbitmq.exchange.user}")
    private String userExchangeName;

    public void send(User user) {
        Message message = Message.builder()
                .msgData(JSON.toJSONString(user))
                .exchangeName(userExchangeName)
                .routingKey("user.abc")
                .status(MessageStatusEnum.SENDING.getStatus())
                // Time of the next retry
                .nextRetryDateTime(LocalDateTime.now(ZoneOffset.ofHours(8)).plus(retryInterval))
                .retryTimes(0)
                .build();
        // Store the message in the DB first
        messageMapper.insert(message);
        // The DB record ID doubles as the correlation ID of the publisher confirm
        CorrelationData correlationData = new CorrelationData(message.getId());
        // Publish the message to MQ
        rabbitTemplate.convertAndSend(message.getExchangeName(), message.getRoutingKey(), message.getMsgData(), correlationData);
    }
}
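A scheduled job scans the message status in the DB. A message that is still in the sending state, whose next retry time has been reached (current time >= next retry time) and whose retry count is below the maximum, is delivered again. A message that has already reached the maximum retry count is marked as failed instead.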
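The rule for each scanned message can be sketched as a pure function. This is an illustrative sketch only: the class `RetryDecision`, its `Action` enum, and the parameter names are hypothetical and do not appear in the project; the status value 1 for "sending" mirrors `MessageStatusEnum.SENDING`.

```java
import java.time.LocalDateTime;

/** Hypothetical helper that decides what the scan job should do with one message. */
final class RetryDecision {

    enum Action { SKIP, RESEND, MARK_FAILED }

    // Mirrors MessageStatusEnum.SENDING (1 = sending)
    static final int SENDING = 1;

    private RetryDecision() {
    }

    static Action decide(int status, int retryTimes, LocalDateTime nextRetryAt,
                         LocalDateTime now, int maxRetryTimes) {
        // Only messages that are still "sending" and already due are candidates
        if (status != SENDING || nextRetryAt.isAfter(now)) {
            return Action.SKIP;
        }
        // Retry budget used up: give up and mark the message as failed
        if (retryTimes >= maxRetryTimes) {
            return Action.MARK_FAILED;
        }
        return Action.RESEND;
    }
}
```

Keeping the decision free of DB and MQ calls makes the boundary case (retry count equal to the maximum) easy to check in a unit test.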
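- XXL-JOB configuration
xxl:
  job:
    switch: ON
    admin:
      ### Root address of the scheduling center [optional]: separate multiple addresses with commas when the scheduling center is deployed as a cluster. The executor uses this address for "executor heartbeat registration" and "task result callbacks"; leave empty to disable auto-registration.
      addresses: http://127.0.0.1:9090/xxl-job-admin
    executor:
      ### Executor AppName [optional]: grouping key for executor heartbeat registration; leave empty to disable auto-registration
      appname: xxl-job-executor-rabbitmq
      ### Executor IP [optional]: empty by default, meaning the IP is detected automatically; set it manually on hosts with several network cards. The IP is not bound to the host and is used for communication only; the address is used for "executor registration" and for "the scheduling center requesting and triggering tasks".
      # ip:
      ### Executor port [optional]: detected automatically when <= 0; the default is 9999. When several executors run on one machine, give each a different port.
      port: 9999
      ### Disk path for executor run logs [optional]: read and write permission on the path is required; leave empty to use the default path.
      logpath: data/applogs/xxl-job/jobhandler
      ### Retention days for executor log files [optional]: expired logs are cleaned up automatically; takes effect only when the value is >= 3; otherwise (for example -1) automatic cleanup is disabled.
      logretentiondays: 30
    ### Executor access token [optional]: enabled when non-empty
    accessToken: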
- Configuration
/**
 * XXL-JOB configuration.
 *
 * @author futao
 * @date 2020/4/1.
 */
@Setter
@Getter
@Slf4j
@Configuration
@ConfigurationProperties(prefix = "xxl.job")
public class XxlJobConfig {

    // Bound from xxl.job.admin.* and xxl.job.executor.*
    private final Admin admin = new Admin();
    private final Executor executor = new Executor();

    @Bean
    public XxlJobSpringExecutor xxlJobExecutor(XxlJobConfig xxlJobConfig) {
        log.info(">>>>>>>>>>> xxl-job config init.");
        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        xxlJobSpringExecutor.setAdminAddresses(xxlJobConfig.getAdmin().getAddresses());
        xxlJobSpringExecutor.setAppName(xxlJobConfig.getExecutor().getAppName());
        xxlJobSpringExecutor.setIp(xxlJobConfig.getExecutor().getIp());
        xxlJobSpringExecutor.setPort(xxlJobConfig.getExecutor().getPort());
        xxlJobSpringExecutor.setLogPath(xxlJobConfig.getExecutor().getLogPath());
        xxlJobSpringExecutor.setLogRetentionDays(xxlJobConfig.getExecutor().getLogRetentionDays());
        return xxlJobSpringExecutor;
    }

    @Getter
    @Setter
    public static class Admin {
        private String addresses;
    }

    @Getter
    @Setter
    public static class Executor {
        private String appName;
        private String ip;
        private int port;
        private String logPath;
        private int logRetentionDays;
    }
}
- Scheduled scan job
/**
 * Scans the DB for messages that need redelivery and delivers them again.
 *
 * @author futao
 * @date 2020/4/1.
 */
@Slf4j
@Component
public class MessageReSendJob extends IJobHandler {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Resource
    private MessageMapper messageMapper;

    @Autowired
    private MessageReSendJob messageReSendJob;

    /**
     * Maximum number of retries
     */
    @Value("${app.rabbitmq.retry.max-retry-times}")
    private int retryTimes;

    /**
     * Interval between retries
     */
    @Value("${app.rabbitmq.retry.retry-interval}")
    private Duration retryInterval;

    /**
     * Number of messages read from the DB per batch
     */
    private static final int PAGE_SIZE = 100;

    @XxlJob(value = "MessageReSendJob", init = "jobHandlerInit", destroy = "jobHandlerDestroy")
    @Override
    public ReturnT<String> execute(String s) throws Exception {
        long startTime = System.currentTimeMillis();
        log.info("Start scanning for messages that need redelivery");
        XxlJobLogger.log("Start scanning for messages that need redelivery");
        service(1);
        log.info("Finished scanning for messages that need redelivery, took [{}]ms", System.currentTimeMillis() - startTime);
        XxlJobLogger.log("Finished scanning for messages that need redelivery, took [{}]ms", System.currentTimeMillis() - startTime);
        return ReturnT.SUCCESS;
    }

    public void service(int pageNum) {
        IPage<Message> messageIPage = messageMapper.selectPage(new Page<>(pageNum, PAGE_SIZE),
                Wrappers.<Message>lambdaQuery()
                        // Messages still in the sending state
                        .eq(Message::getStatus, MessageStatusEnum.SENDING.getStatus())
                        // whose next retry time has been reached
                        .le(Message::getNextRetryDateTime, LocalDateTime.now(ZoneOffset.ofHours(8)))
        );
        List<Message> messages = messageIPage.getRecords();
        messages.forEach(message -> {
            if (retryTimes <= message.getRetryTimes()) {
                // Maximum number of deliveries reached: mark the message as failed
                messageMapper.update(null, Wrappers.<Message>lambdaUpdate().set(Message::getStatus, MessageStatusEnum.FAIL.getStatus()).eq(Message::getId, message.getId()));
            } else {
                messageReSendJob.reSend(message);
            }
        });
        if (PAGE_SIZE == messages.size()) {
            // Every processed row was updated and no longer matches the query, so the
            // remaining rows have moved into the first page. Read page 1 again rather than
            // the next page, which would skip them.
            service(1);
        }
    }

    /**
     * Redeliver a message.
     *
     * @param message the message record to deliver again
     */
    public void reSend(Message message) {
        // Bump the retry count and push the next retry time forward before sending
        messageMapper.update(null,
                Wrappers.<Message>lambdaUpdate()
                        .set(Message::getRetryTimes, message.getRetryTimes() + 1)
                        .set(Message::getNextRetryDateTime, LocalDateTime.now(ZoneOffset.ofHours(8)).plus(retryInterval))
                        .eq(Message::getId, message.getId())
        );
        try {
            // Deliver again
            rabbitTemplate.convertAndSend(message.getExchangeName(), message.getRoutingKey(), message.getMsgData(), new CorrelationData(message.getId()));
        } catch (Exception e) {
            // Keep the exception in the log; the next scan retries the message
            log.error("Message [{}] delivery failed", JSON.toJSONString(message), e);
        }
    }

    public void jobHandlerInit() {
        log.info("before job handler init...");
        XxlJobLogger.log("before job handler init...");
    }

    public void jobHandlerDestroy() {
        log.info("after job execute...");
        XxlJobLogger.log("after job execute...");
    }
}
- Add the scheduled task in XXL-JOB
IV. Testing
- Test endpoint
/**
 * @author futao
 * @date 2020/4/1.
 */
@RequestMapping("/user")
@RestController
public class UserController {

    @Autowired
    private Sender sender;

    @RequestMapping("/send")
    public void send() {
        sender.send(User
                .builder()
                .userName("天文")
                .birthday(LocalDate.of(1995, 1, 31))
                .address("浙江杭州")
                .build());
    }
}
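- Normal scenario:
The message is stored in the DB with status 1 = sending
The confirm callback arrives
The message is marked as delivered successfully
- Failure scenario:
Start the producer service, then stop MQ
Send a message
No ACK arrives for the message, so it stays in the sending state. Once the scheduled job is enabled, the message is delivered again (the retry count is incremented and the next retry time is updated)
When the retry count reaches the maximum, the next scan marks the message as failed
Scheduling log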
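The failure scenario can be traced with a small simulation of what successive scans do to a message that never receives an ACK. This is a sketch under stated assumptions: `RetryTimeline` and `simulate` are hypothetical names, the DB and MQ are left out, and every scan is assumed to find the message due for retry.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical simulation of the scan job acting on one message that is never ACKed. */
final class RetryTimeline {

    private RetryTimeline() {
    }

    /** Returns one entry per scan: "RESEND#n" for a redelivery, "FAIL" when the job gives up. */
    static List<String> simulate(int maxRetryTimes) {
        List<String> timeline = new ArrayList<>();
        int retryTimes = 0;      // a newly stored message starts with 0 retries
        boolean sending = true;  // status 1 = sending
        while (sending) {
            if (retryTimes >= maxRetryTimes) {
                // Retry budget used up: status becomes 3 = send failed
                sending = false;
                timeline.add("FAIL");
            } else {
                // Redeliver and bump the retry count, as the scan job does
                retryTimes++;
                timeline.add("RESEND#" + retryTimes);
            }
        }
        return timeline;
    }
}
```

With `max-retry-times: 5`, `simulate(5)` yields five redeliveries followed by a single `FAIL` entry on the sixth scan.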
Next
- Reliable message consumption
- Consumer-side rate limiting
- Dead-letter queues
- Delayed queues