1 消息可靠性
1.1. 思路分析
在昨天的练习作业中,我们改造了余额支付功能,在支付成功后利用RabbitMQ通知交易服务,更新业务订单状态为已支付。但是大家思考一下:如果这里MQ通知失败,支付服务中支付流水显示支付成功,而交易服务中的订单状态却显示未支付,数据出现了不一致。
首先,我们一起分析一下消息丢失的可能性有哪些。
消息从发送者发送消息,到消费者处理消息,需要经过的流程是这样的:
消息从生产者到消费者的每一步都可能导致消息丢失:
● 发送消息时丢失:
○ 生产者发送消息时连接MQ失败
○ 生产者发送消息到达MQ后未找到Exchange
○ 生产者发送消息到达MQ的Exchange后,未找到合适的Queue
○ 消息到达MQ后,处理消息的进程发生异常
● MQ导致消息丢失:
○ 消息到达MQ,保存到队列后,尚未消费就突然宕机
● 消费者处理消息时:
○ 消息接收后尚未处理突然宕机
○ 消息接收后处理过程中抛出异常
综上,我们要解决消息丢失问题,保证MQ的可靠性,就必须从3个方面入手:
- 保证生产消息的可靠性
- 确保MQ不会将消息弄丢
- 保证消费消息的可靠性
注意:使用MQ并不是所有场景对消息的可靠性要求都很高,比如上图中,支付成功短信通知的流程对消息可靠性要求就不高,通常都可以保证消息正常到达消费者,即使个别没有成功通知用户也不影响主体业务流程,所以在设计技术方案时一定要根据业务需求具体分析。
1.2 生产消息可靠性
1.2.1. 生产者重试机制
1.2.1.1 配置
首先第一种情况,就是生产者发送消息时,出现了网络故障,导致与MQ的连接中断。
为了解决这个问题,SpringAMQP提供的消息发送时的重试机制。即:当RabbitTemplate与MQ连接超时后,多次重试。
从课程资料中找到mq-demo-v2.zip,并解压,使用IDEA打开mq-demo-v2工程。
修改publisher模块的application.yaml文件,添加下面的内容:
spring:
rabbitmq:
host: 192.168.101.68 # 你的虚拟机IP
port: 5672 # 端口
virtual-host: /hmall # 虚拟主机【注意这里可能不对】
username: hmall # 用户名【注意这里可能不对】
password: 123 # 密码【注意这里可能不对】
connection-timeout: 1s # 设置MQ的连接超时时间
template:
retry:
datasource:enabled: true # 开启超时重试机制 initial-interval: 1000ms # 失败后的初始等待时间 multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = 上次等待时长 * multiplier max-attempts: 3 # 总共尝试次数
url: jdbc:mysql://192.168.101.68:3306/hmall?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai
driver-class-name: com.mysql.cj.jdbc.Driver
username: root
password: mysql
mybatis-plus:
configuration:
default-enum-type-handler: com.baomidou.mybatisplus.core.handlers.MybatisEnumTypeHandler
global-config:
db-config:
update-strategy: not_null
id-type: auto
logging:
pattern:
dateformat: MM-dd HH:mm:ss:SSS
level:
com.itheima: debug
rabbit-mq:
enable: true
persistence:
enable: true
配置参数解释
● initial-interval: 失败后的初始等待时间
● multiplier: 倍增器,每次重试的等待时间是前一次几倍。
失败后下次等待时长 =上次等待时长 multiplier
● max-attempts: 最大重试次数(包括第一次尝试)
举例:
由于multiplier设置为1,这意味着每次重试之间的间隔是固定的,不会增加。
假设在t=0时刻首次尝试发送消息,如果发送失败,则会按照以下时间点进行重试:
● 第一次尝试(也是首次发送):t=0(假设即时失败)
● 第一次重试:等待1秒后重试,t=1秒(首次失败后等待1秒)
● 第二次重试:等待1秒1=1秒 后重试,t=2秒(从第一次重试再等待1秒)
如果设置如下:
initial-interval:1000ms
multiplier:2
max-attempts: 5
由于multiplier设置为2,这意味着每次重试之间的间隔会翻倍。
假设在t=0时刻首次尝试发送消息,如果发送失败,则会按照以下时间点进行重试:
● 第一次尝试(也是首次发送):t=0(假设即时失败)
● 第一次重试:等待1秒后重试,t=1秒(首次失败后等待1秒)
● 第二次重试:等待12=2秒 后重试,t=3秒
● 第三次重试:等待22=4秒 后重试,t=7秒
● 第四次重试:等待4*2=8秒 后重试,t=15秒
1.2.1.2 测试
我们利用命令停掉RabbitMQ服务:
docker stop mq
然后测试发送一条消息,会发现会每隔1秒重试1次,总共重试了3次。消息发送超时重试机制配置成功!
01-15 17:37:58:252 INFO 18792 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [192.168.101.68:5672]
01-15 17:38:00:281 INFO 18792 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [192.168.101.68:5672]
01-15 17:38:02:306 INFO 18792 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [192.168.101.68:5672]
org.springframework.amqp.AmqpIOException: java.net.SocketTimeoutException: connect timed out
注意:当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。不过SpringAMQP提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的。
如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数,当然也可以考虑使用异步线程来执行发送消息的代码。
1.2.2. 生产者确认机制
1.2.2.1 两种机制介绍
一般情况下,只要生产者与MQ之间的网路连接顺畅,基本不会出现发送消息丢失的情况,因此大多数情况下我们无需考虑这种问题。
不过,在少数情况下,也会出现消息发送到MQ之后丢失的现象,比如:
● MQ内部处理消息的进程发生了异常
● 生产者发送消息到达MQ后未找到Exchange
● 生产者发送消息到达MQ的Exchange后,未找到合适的Queue,因此无法路由
针对上述情况,RabbitMQ提供生产者确认机制,包括Publisher Confirm和Publisher Return两种。在开启确认机制的情况下,当生产者发送消息给MQ后,MQ会根据消息处理的情况返回不同的回执。
具体如图所示:
生产者确认机制:
1.Publisher Return
消息投递成功但路由失败会调用Publisher Return回调方法返回异常信息。
2.Publisher Confirm
消息投递成功返回ack,投递失败返回nack。
注意:消息投递成功但可能路由失败了,此时会通过Publisher Confirm返回ack,通过Publisher Return回调方法返回异常信息。
默认两种机制都是关闭状态,需要通过配置文件来开启。
生产者确认机制:确保消息发送到MQ的一种机制,它通过2个函数来确认
(1)【异常】找不到交换机:publisher-confirm:nack
(2)【异常】找到交换机,匹配不到队列:publisher-return:ack
(3)【正常】找到交换机,匹配到队列:publisher-cofirm:ack
1.2.2.2 开启生产者确认
在publisher模块的application.yaml中添加配置:
spring:
rabbitmq:
publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型
publisher-returns: true # 开启publisher return机制
这里publisher-confirm-type有三种模式可选:
● none:关闭confirm机制
● simple:同步阻塞等待MQ的回执(回调方法)
● correlated:MQ异步回调返回回执
一般我们推荐使用correlated,回调机制。
1.2.2.3 实现方法
ReturnCallback
每个RabbitTemplate只能配置一个ReturnCallback,因此我们可以在配置类中统一设置。我们在publisher模块定义一个配置类:
内容如下:
package com.itheima.publisher.config;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
@Slf4j
@AllArgsConstructor
@Configuration
public class MqConfig {
private final RabbitTemplate rabbitTemplate;
@PostConstruct
public void init(){
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returned) {
log.error("触发return callback,");
log.debug("exchange: {}", returned.getExchange());
log.debug("routingKey: {}", returned.getRoutingKey());
log.debug("message: {}", returned.getMessage());
log.debug("replyCode: {}", returned.getReplyCode());
log.debug("replyText: {}", returned.getReplyText());
}
});
}
}
ConfirmCallback
由于每个消息发送时的处理逻辑不一定相同,因此ConfirmCallback需要在每次发消息时定义。具体来说,是在调用RabbitTemplate中的convertAndSend方法时,多传递一个参数:
这里的CorrelationData中包含两个核心的东西:
● id:消息的唯一标示,MQ对不同的消息的回执以此做判断,避免混淆
● SettableListenableFuture:回执结果的Future对象
将来MQ的回执就会通过这个Future来返回,我们可以提前给CorrelationData中的Future添加回调函数来处理消息回执:
我们测试下边的方法,向系统自带的交换机发送消息,并且添加ConfirmCallback:
注意:此代码不用编写直接测试即可,稍后我们会用工具类替代。
@Test
void testPublisherConfirm() {
// 1.创建CorrelationData
CorrelationData cd = new CorrelationData();
// 2.给Future添加ConfirmCallback
cd.getFuture().addCallback(new ListenableFutureCallback() {
@Override
public void onFailure(Throwable ex) {
// 2.1.Future发生异常时的处理逻辑,基本不会触发
log.error("send message fail", ex);
}
@Override
public void onSuccess(CorrelationData.Confirm result) {
// 2.2.Future接收到回执的处理逻辑,参数中的result就是回执内容
if(result.isAck()){ // result.isAck(),boolean类型,true代表ack回执,false 代表 nack回执
log.debug("发送消息成功,收到 ack!");
}else{ // result.getReason(),String类型,返回nack时的异常描述
log.error("发送消息失败,收到 nack, reason : {}", result.getReason());
}
}
});
// 3.发送消息,故意指定一个错误的rontingKey
rabbitTemplate.convertAndSend("hmall.direct", "q", "hello", cd);
}
1.2.2.4 测试
测试报错:
原因:每个RabbitTemplate只支持一个ReturnCallback。
解决:
屏蔽common-rabbitmq依赖,在common-rabbitmq中对RabbitTemplate设置了ReturnCallback
测试步骤:
可以看到,由于传递的RoutingKey是错误的,路由失败后,触发了return callback,同时也收到了ack。
当我们修改为正确的RoutingKey以后,就不会触发return callback了,只收到ack。
当我们把交换机名称修改错误则只会收到nack。
1.2.3. 发送失败处理机制
1.2.3.1 失败处理机制
在ConfirmCallback中收到nack表示消息投递失败,ReturnCallback异常表示路由失败
高频面试题:消息投递失败怎么处理/你们怎么保证消费成功
可以将消息记录到失败消息表,由定时任务进行发布,每隔10秒钟(可设置)执行获取失败消息重新发送,发送一次则在失败次数字段加一,达到3次停止自动发送由人工处理(如钉钉告警)。
在commonn-rabbitmq模块中实现了发送消息的工具方法,此方法实现了发送失败处理机制。
ReturnCallback回调逻辑在com.itheima.common.rabbitmq.config.RabbitMqConfiguration中,核心代码如下:
@Configuration
@ConditionalOnProperty(prefix = "rabbit-mq", name = "enable", havingValue = "true")
@Import({RabbitClient.class, FailMsgDaoImpl.class})
@Slf4j
public class RabbitMqConfiguration implements ApplicationContextAware {
/**
* 并发数量
*/
public static final int DEFAULT_CONCURRENT = 10;
@Autowired(required = false)
private FailMsgDao failMsgDao;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
// 获取RabbitTemplate
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
//定义returnCallback回调方法
rabbitTemplate.setReturnsCallback(
new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
byte[] body = returnedMessage.getMessage().getBody();
//消息id
String messageId = returnedMessage.getMessage().getMessageProperties().getMessageId();
String content = new String(body, Charset.defaultCharset());
log.info("消息发送失败,应答码{},原因{},交换机{},路由键{},消息id{},消息内容{}",
returnedMessage.getReplyCode(),
returnedMessage.getReplyText(),
returnedMessage.getExchange(),
returnedMessage.getRoutingKey(),
messageId,
content);
if (failMsgDao != null) {
// 失败消息落库(后续定时任务重试,达最大次数人工干预)
failMsgDao.save(messageId, returnedMessage.getExchange(), returnedMessage.getRoutingKey(), content, 0, DateUtils.getCurrentTime()+10, "returnCallback");
}
}
}
);
}
}
ApplicationContextAware 的作用:如果 Bean 实现了 ApplicationContextAware 接口,Spring 容器会调用 setApplicationContext 方法,将 ApplicationContext 传递给该 Bean。
Bean 创建:Spring 容器首先创建 Bean 实例。
属性注入:Spring 容器对 Bean 的属性进行依赖注入。
Aware 接口回调:如果 Bean 实现了 ApplicationContextAware 接口,Spring 容器会调用 setApplicationContext 方法,将 ApplicationContext 传递给该 Bean。
初始化方法:如果 Bean 配置了初始化方法(例如通过 @PostConstruct 注解或 init-method 属性),Spring 容器会调用这些初始化方法。
以上可以详见:Spring源码解析
ConfirmCallback的逻辑在RabbitClient 工具类的sendMsg方法中,通过sendMsg方法发送消息,在发送消息时指定correlationData对象,在correlationData对象中指定了ConfirmCallback回调方法的逻辑,当返回nack会将消息写入失败表,如果消息重发成功会将该记录从失败表删除。
核心代码如下:
@Slf4j
@Service
public class RabbitClient {
....
@Resource
private RabbitTemplate rabbitTemplate;
@Autowired(required = false)
private FailMsgDao failMsgDao;
/**
* 发送消息 重试3次
*
* @param exchange 交换机
* @param routingKey 路由key
* @param msg 消息对象,会将对象序列化成json字符串发出
* @param delay 延迟时间 秒
* @param msgId 消息id
* @param isFailMsg 是否是失败消息
* @return 是否发送成功
*/
@Retryable(value = MqException.class, maxAttempts = 3, backoff = @Backoff(value = 3000, multiplier = 1.5), recover = "saveFailMag")
public void sendMsg(String exchange,
String routingKey,
Object msg,
Integer delay,
String msgId,
boolean isFailMsg) {
// 1.发送消息前准备
// 1.1获取消息内容,如果非字符串将其序列化
String jsonMsg = JsonUtils.toJsonStr(msg);
// 1.2.全局唯一消息id,如果调用者设置了消息id,使用调用者消息id,如果为配置,默认雪花算法生成消息id
if(StrUtil.isBlank(msgId)){
msgId = IdUtil.getSnowflakeNextIdStr();
}
// 1.3.设置默认延迟时间,默认立即发送
delay = NumberUtils.null2Default(delay, -1);
log.debug("消息发送!exchange = {}, routingKey = {}, msg = {}, msgId = {}", exchange, routingKey, jsonMsg, msgId);
// 1.4.构建回调
RabbitMqListenableFutureCallback futureCallback = RabbitMqListenableFutureCallback.builder()
.exchange(exchange)
.routingKey(routingKey)
.msg(jsonMsg)
.msgId(msgId)
.delay(delay)
.isFailMsg(isFailMsg)
.failMsgDao(failMsgDao)
.build();
// 1.5.CorrelationData设置
CorrelationData correlationData = new CorrelationData(msgId.toString());
correlationData.getFuture().addCallback(futureCallback);
// 1.6.构造消息对象
Message message = MessageBuilder.withBody(StrUtil.bytes(jsonMsg, CharsetUtil.CHARSET_UTF_8))
//持久化
.setDeliveryMode(MessageDeliveryMode.PERSISTENT)
//消息id
.setMessageId(msgId.toString())
.build();
try {
// 2.发送消息
this.rabbitTemplate.convertAndSend(exchange, routingKey, message, new DelayMessagePostProcessor(delay), correlationData);
} catch (Exception e) {
log.error("send error:" + e);
// 3.构建异常回调,并抛出异常
MqException mqException = new MqException();
mqException.setMsg(ExceptionUtil.getMessage(e));
mqException.setMqId(msgId);
throw mqException;
}
}
public class RabbitMqListenableFutureCallback implements ListenableFutureCallback<CorrelationData.Confirm> {
//记录失败消息service
private FailMsgDao failMsgDao;
private String exchange;
private String routingKey;
private String msg;
private String msgId;
private Integer delay;
//是否是失败消息
private boolean isFailMsg=false;
@Override
public void onFailure(Throwable ex) {
if(failMsgDao == null) {
return;
}
// 执行失败保存失败信息
failMsgDao.save(msgId, exchange, routingKey, msg, delay, DateUtils.getCurrentTime() + 10, ExceptionUtil.getMessage(ex));
}
@Override
public void onSuccess(CorrelationData.Confirm result) {
if(failMsgDao == null){
return;
}
if(!result.isAck()){
// 执行失败保存失败信息,如果已经存在保存信息,如果不在信息信息
failMsgDao.save(msgId, exchange, routingKey, msg, delay,DateUtils.getCurrentTime() + 10, "MQ回复nack");
}else if(isFailMsg && msgId != null){
// 如果发送的是失败消息,当收到ack需要从fail_msg删除该消息
failMsgDao.removeById(msgId);
}
}
}
当发送消息失败会入库到失败消息表。
我们可以启动定时任务去扫描失败消息表的记录,重新发送,当达到最大失败次数后由人工处理。
1.2.3.4 测试
下边测试发送失败入库功能:
- 首先在publisher模块添加common-rabbitmq依赖
com.itheima
common-rabbitmq
0.0.1-SNAPSHOT 屏蔽MqConfig类中设置ReturnCallback的代码
由于RabbitTemplate只能设置一次ReturnCallback,而在common-rabbitmq中设置了ReturnCallback,所以屏蔽MqConfig类中设置ReturnCallback的代码,如下图:在hmall中创建失败消息表
查看虚拟机对应hmall数据库,如果已存在fail_msg表需要删除原表再通过下边的语句新建表。
create table fail_msg
(
id varchar(255) not null comment '消息id'
primary key,
exchange varchar(255) not null comment '交换机',
routing_key varchar(255) not null comment '路由key',
msg text not null comment '消息',
reason varchar(255) not null comment '原因',
delay_msg_execute_time int null comment '延迟消息执行时间',
next_fetch_time int null comment '下次拉取时间',
create_time datetime null comment '创建时间',
update_time datetime null comment '更新时间',
fail_count int default 0 null comment '失败次数'
)
comment '失败消息记录表' charset = utf8mb4;- 在publisher模块进行配置
在启动类中添加@MapperScan("com.itheima.common.rabbitmq.dao.mapper")
在application.yaml添加:直接复制粘贴为一级配置
rabbit-mq:
enable: true
persistence:
enable: true
- 使用RabbitClient工具类发送消息
@Resource
private RabbitClient rabbitClient;
@Test
void testPublisherReturn() {
rabbitClient.sendMsg("hmall.directa", "q", "hello");
}
分别测试ReturnCallback和ConfirmCallback不同情况下失败消息入库。
测试ReturnCallback时注意:单元测试方法运行完就完毕了数据库连接池,而ReturnCallback是回调方法,是在单元测试方法执行完再执行,在ReturnCallback中操作数据库时报没有可用的数据库连接的错误,需要在单元测试方法最后添加休眠代码,保证ReturnCallback执行完成再结束整个单元测试方法。
@Test
void testPublisherReturn() throws InterruptedException {
rabbitClient.sendMsg("hmall.directa", "q", "hello");
// 增加一点时间,确保ReturnCallback执行完成
Thread.sleep(5000);
}
1.2.4 失败消息定时任务重发(作业)
我们完成了失败消息的存储,请使用任意一个你熟悉的定时任务完成消息的重发
思路大致如下:
- 选定一个定时任务框架,如:SpringTask、Quartz、xxljob等
- 编写实现代码
a. 扫描表:fail_msg
b. 扫描条件:where fail_count < 3
c. 调用rabbitClient重发消息
d. 发送完成后,删除表:fail_msg中的数据 参考实现思路:首先问AI,然后就可以得到详尽的步骤
大致代码路径如下:这里我就不再写查找数据库那部分代码了,较为简单
1.2.5 小结
如何保证生产消息可靠性?
首先在发送消息时可以开启重试机制,避免因为短暂的网络问题导致发送消息失败。
RabbitMQ还提供生产者确认机制保证发送消息到MQ的可靠性。
生产者确认机制包括两种:
1.Publisher Return
消息投递成功但路由失败会调用Publisher Return回调方法返回异常信息。
2.Publisher Confirm
消息投递成功返回ack,投递失败返回nack。
注意:消息投递成功但可能路由失败了,此时会通过Publisher Confirm返回ack,通过Publisher Return回调方法返回异常信息。
我们在发送消息时给每个消息指定一个唯一ID,设置回调方法,如果Publisher Return失败或Publisher Confirm返回nack,我们在回调方法中解析失败消息,并记录到失败表由定时任务去异步重新发送。
1.3.消息持久化
为了提升性能,默认情况下MQ的数据都是在内存存储的临时数据,重启后就会消失。为了保证数据的可靠性,必须配置持久化,包括:
● 交换机持久化
● 队列持久化
● 消息持久化
我们以控制台界面为例来说明。
1.3.1 交换机持久化
交换机持久化是指将交换机的定义信息(元数据)持久化到RabbitMQ的数据库(mnesia)中,RabbitMQ重启后交换机定义仍然存在。
若交换机不设置持久化,在rabbitmq服务重启之后,相关的交换机元数据会丢失,但消息不会丢失,只是不能将消息发送到这个交换机中,所以通常要设置交换机持久化。
在控制台中声明交换机是默认设置持久化的。
设置方法:
在控制台的Exchanges页面,添加交换机时可以配置交换机的Durability参数:
设置为Durable就是持久化模式,Transient就是临时模式。
设置持久化后在交换机列表会有一个"D"标识
1.3.2 队列持久化
队列持久化也是将队列的定义信息(元数据)持久化到RabbitMQ的数据库中。
如果队列不设置持久化,在RabbitMQ重启后队列的元数据丢失。
设置队列持久化可以保证队列本身的元数据不会因异常情况而丢失,队列中存储的是消息的在队列中的位置、消息的ID、存储位置等,消息会存储在独立的rdq数据文件中,队列持久化不能保证消息数据不会丢失。
在控制台的Queues页面,添加队列时,同样可以配置队列的Durability参数:
除了持久化以外,你可以看到队列还有很多其它参数,有一些我们会在后期学习。
设置持久化后在队列列表会有一个"D"标识