异步消息组件MQ高级

简介: 本课程聚焦MQ消息可靠性,涵盖生产者重试与确认机制、消费者确认、消息持久化、幂等性及延迟消息等核心方案,重点讲解如何通过Confirm/Return回调、失败消息表、定时重发等手段保障消息不丢失,实现订单超时自动取消等功能,确保分布式系统数据最终一致。

学习目标
能够测试生产者重试机制
能够测试生产者确认机制
能够说出生产者确认机制的两种方式
能够说出发送失败处理机制
能够说出消息持久化机制
能够测试消费者确认机制
能够说出消费失败重试机制
能够说出MQ消息幂等性方案
能够说出延迟消息方案
能够实现自动取消超时未支付订单功能
能够说出保证消息的可靠性的完整方案
1 消息可靠性
1.1. 思路分析
在昨天的练习作业中,我们改造了余额支付功能,在支付成功后利用RabbitMQ通知交易服务,更新业务订单状态为已支付。但是大家思考一下:如果这里MQ通知失败,支付服务中支付流水显示支付成功,而交易服务中的订单状态却显示未支付,数据出现了不一致。
首先,我们一起分析一下消息丢失的可能性有哪些。
消息从发送者发送消息,到消费者处理消息,需要经过的流程是这样的:
消息从生产者到消费者的每一步都可能导致消息丢失:
发送消息时丢失:
生产者发送消息时连接MQ失败
生产者发送消息到达MQ后未找到Exchange
生产者发送消息到达MQ的Exchange后,未找到合适的Queue
消息到达MQ后,处理消息的进程发生异常
MQ导致消息丢失:
消息到达MQ,保存到队列后,尚未消费就突然宕机
消费者处理消息时:
消息接收后尚未处理突然宕机
消息接收后处理过程中抛出异常
综上,我们要解决消息丢失问题,保证MQ的可靠性,就必须从3个方面入手:
保证生产消息的可靠性
确保MQ不会将消息弄丢
保证消费消息的可靠性
注意:使用MQ并不是所有场景对消息的可靠性要求都很高,比如上图中,支付成功短信通知的流程对消息可靠性要求就不高,通常都可以保证消息正常到达消费者,即使个别没有成功通知用户也不影响主体业务流程,所以在设计技术方案时一定要根据业务需求具体分析。
1.2 生产消息可靠性
1.2.1. 生产者重试机制
1.2.1.1 配置
首先第一种情况,就是生产者发送消息时,出现了网络故障,导致与MQ的连接中断。
为了解决这个问题,SpringAMQP提供的消息发送时的重试机制。即:当RabbitTemplate与MQ连接超时后,多次重试。
从课程资料中找到mq-demo-v2.zip,并解压,使用IDEA打开mq-demo-v2工程。
修改publisher模块的application.yaml文件,添加下面的内容:
配置参数解释
initial-interval: 失败后的初始等待时间
multiplier: 倍增器,每次重试的等待时间是前一次几倍。
失败后下次等待时长 =上次等待时长 multiplier
max-attempts: 最大重试次数(包括第一次尝试)
举例:
由于multiplier设置为1,这意味着每次重试之间的间隔是固定的,不会增加。
假设在t=0时刻首次尝试发送消息,如果发送失败,则会按照以下时间点进行重试:
第一次尝试(也是首次发送):t=0(假设即时失败)
第一次重试:等待1秒后重试,t=1秒(首次失败后等待1秒)
第二次重试:等待1秒
1=1秒 后重试,t=2秒(从第一次重试再等待1秒)
如果设置如下:
由于multiplier设置为2,这意味着每次重试之间的间隔会翻倍。
假设在t=0时刻首次尝试发送消息,如果发送失败,则会按照以下时间点进行重试:
第一次尝试(也是首次发送):t=0(假设即时失败)
第一次重试:等待1秒后重试,t=1秒(首次失败后等待1秒)
第二次重试:等待12=2秒 后重试,t=3秒
第三次重试:等待2
2=4秒 后重试,t=7秒
第四次重试:等待4*2=8秒 后重试,t=15秒
1.2.1.2 测试
我们利用命令停掉RabbitMQ服务:
然后测试发送一条消息,会发现会每隔1秒重试1次,总共重试了3次。消息发送超时重试机制配置成功!
注意:当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。不过SpringAMQP提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的。
如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数,当然也可以考虑使用异步线程来执行发送消息的代码。
1.2.2. 生产者确认机制
1.2.2.1 两种机制介绍
一般情况下,只要生产者与MQ之间的网路连接顺畅,基本不会出现发送消息丢失的情况,因此大多数情况下我们无需考虑这种问题。
不过,在少数情况下,也会出现消息发送到MQ之后丢失的现象,比如:
MQ内部处理消息的进程发生了异常
生产者发送消息到达MQ后未找到Exchange
生产者发送消息到达MQ的Exchange后,未找到合适的Queue,因此无法路由
针对上述情况,RabbitMQ提供生产者确认机制,包括Publisher Confirm和Publisher Return两种。在开启确认机制的情况下,当生产者发送消息给MQ后,MQ会根据消息处理的情况返回不同的回执。
具体如图所示:
生产者确认机制:
1.Publisher Return
消息投递成功但路由失败会调用Publisher Return回调方法返回异常信息。
2.Publisher Confirm
消息投递成功返回ack,投递失败返回nack。
注意:消息投递成功但可能路由失败了,此时会通过Publisher Confirm返回ack,通过Publisher Return回调方法返回异常信息。
默认两种机制都是关闭状态,需要通过配置文件来开启。
生产者确认机制:确保消息发送到MQ的一种机制,它通过2个函数来确认
(1)【异常】找不到交换机:publisher-confirm:nack
(2)【异常】找到交换机,匹配不到队列:publisher-return:ack
(3)【正常】找到交换机,匹配到队列:publisher-cofirm:ack
1.2.2.2 开启生产者确认
在publisher模块的application.yaml中添加配置:
这里publisher-confirm-type有三种模式可选:
none:关闭confirm机制
simple:同步阻塞等待MQ的回执(回调方法)
correlated:MQ异步回调返回回执
一般我们推荐使用correlated,回调机制。
1.2.2.3 实现方法
ReturnCallback
每个RabbitTemplate只能配置一个ReturnCallback,因此我们可以在配置类中统一设置。我们在publisher模块定义一个配置类:
内容如下:
ConfirmCallback
由于每个消息发送时的处理逻辑不一定相同,因此ConfirmCallback需要在每次发消息时定义。具体来说,是在调用RabbitTemplate中的convertAndSend方法时,多传递一个参数:
这里的CorrelationData中包含两个核心的东西:
id:消息的唯一标示,MQ对不同的消息的回执以此做判断,避免混淆
SettableListenableFuture:回执结果的Future对象
将来MQ的回执就会通过这个Future来返回,我们可以提前给CorrelationData中的Future添加回调函数来处理消息回执:
我们测试下边的方法,向系统自带的交换机发送消息,并且添加ConfirmCallback:
注意:此代码不用编写直接测试即可,稍后我们会用工具类替代。
1.2.2.4 测试
测试报错:
原因:每个RabbitTemplate只支持一个ReturnCallback。
解决:
屏蔽common-rabbitmq依赖,在common-rabbitmq中对RabbitTemplate设置了ReturnCallback
测试步骤:
可以看到,由于传递的RoutingKey是错误的,路由失败后,触发了return callback,同时也收到了ack。
当我们修改为正确的RoutingKey以后,就不会触发return callback了,只收到ack。
当我们把交换机名称修改错误则只会收到nack。
1.2.3. 发送失败处理机制
1.2.3.1 失败处理机制
在ConfirmCallback中收到nack表示消息投递失败,ReturnCallback异常表示路由失败
高频面试题:消息投递失败怎么处理/你们怎么保证消费成功
可以将消息记录到失败消息表,由定时任务进行发布,每隔10秒钟(可设置)执行获取失败消息重新发送,发送一次则在失败次数字段加一,达到3次停止自动发送由人工处理(如钉钉告警)。
在commonn-rabbitmq模块中实现了发送消息的工具方法,此方法实现了发送失败处理机制。
ReturnCallback回调逻辑在com.itheima.common.rabbitmq.config.RabbitMqConfiguration中,核心代码如下:
ApplicationContextAware 的作用:如果 Bean 实现了 ApplicationContextAware 接口,Spring 容器会调用 setApplicationContext 方法,将 ApplicationContext 传递给该 Bean。
Bean 创建:Spring 容器首先创建 Bean 实例。
属性注入:Spring 容器对 Bean 的属性进行依赖注入。
Aware 接口回调:如果 Bean 实现了 ApplicationContextAware 接口,Spring 容器会调用 setApplicationContext 方法,将 ApplicationContext 传递给该 Bean。
初始化方法:如果 Bean 配置了初始化方法(例如通过 @PostConstruct 注解或 init-method 属性),Spring 容器会调用这些初始化方法。
以上可以详见:Spring源码解析
ConfirmCallback的逻辑在RabbitClient 工具类的sendMsg方法中,通过sendMsg方法发送消息,在发送消息时指定correlationData对象,在correlationData对象中指定了ConfirmCallback回调方法的逻辑,当返回nack会将消息写入失败表,如果消息重发成功会将该记录从失败表删除。
核心代码如下:
当发送消息失败会入库到失败消息表。
我们可以启动定时任务去扫描失败消息表的记录,重新发送,当达到最大失败次数后由人工处理。
1.2.3.4 测试
下边测试发送失败入库功能:
首先在publisher模块添加common-rabbitmq依赖
屏蔽MqConfig类中设置ReturnCallback的代码
由于RabbitTemplate只能设置一次ReturnCallback,而在common-rabbitmq中设置了ReturnCallback,所以屏蔽MqConfig类中设置ReturnCallback的代码,如下图:
在hmall中创建失败消息表
查看虚拟机对应hmall数据库,如果已存在fail_msg表需要删除原表再通过下边的语句新建表。
SQL
运行代码
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
create table fail_msg
(
id varchar(255) not null comment '消息id'
primary key,
exchange varchar(255) not null comment '交换机',
routing_key varchar(255) not null comment '路由key',
msg text not null comment '消息',
reason varchar(255) not null comment '原因',
delay_msg_execute_time int null comment '延迟消息执行时间',
next_fetch_time int null comment '下次拉取时间',
create_time datetime null comment '创建时间',
update_time datetime null comment '更新时间',
fail_count int default 0 null comment '失败次数'
)
comment '失败消息记录表' charset = utf8mb

相关文章
|
JavaScript Java Linux
Go语言 thrift 入门指南--thrift IDL介绍
Thrift 是一个轻量级、跨语言的 RPC 框架,由 facebook 开发,2007年正式开源,2008 纳入 Apache 软件基金会开源项目。
2579 0
Go语言 thrift 入门指南--thrift IDL介绍
|
5月前
|
存储 人工智能 算法
告别模糊检索:深度拆解向量数据库,手把手教你选对AI底座
本文深入解析向量数据库在大模型时代的关键作用,揭示其作为AI“外挂大脑”的原理与价值。从技术原理、选型维度到RAG全链路实践,结合Elasticsearch与LLaMA-Factory等工具,手把手教你构建专属AI系统,助力迈向场景化智能。
359 1
|
2月前
|
弹性计算 安全 Linux
阿里云ECS云服务器秒级部署OpenClaw教程:千问Qwen3.6-Plus接入+本地多系统适配+避坑指南
2026年,OpenClaw(原Clawdbot)已成为AI自动化代理领域的标杆级开源框架,凭借数据可控、跨平台兼容、大模型生态完善的核心优势,成为个人与团队搭建专属智能助手的首选方案。阿里云ECS云服务器提供OpenClaw官方预装镜像,实现秒级创建、一键启动、7×24小时稳定运行,完美适配长期在线场景;同时支持本地MacOS、Linux、Windows11全平台部署,满足隐私与离线需求。本文全程提供可直接复制的代码命令,从阿里云ECS秒级部署、本地多系统安装、千问Qwen3.6-Plus API配置,到新手高频问题避坑指南,形成完整闭环,零基础用户也能快速完成全流程搭建,无需任何Linux
1656 7
|
2月前
|
中间件 测试技术 API
值得收藏,一些好用的Claude Code提示词!
值得收藏,一些好用的Claude Code提示词!
|
存储 人工智能 Rust
1句话给你的 OpenClaw 装上"长期记忆" — 基于 PolarDB-X 的 mem0 记忆方案
PolarDB Mem0是一款为AI Agent构建专属长效记忆的托管服务,100%兼容开源的Mem0系统。通过在多次交互中高效地抽取、存储与调用记忆,赋予Agent持续学习与积累认知的能力,帮助您打造真正智能和个性化的AI应用。
|
4月前
|
人工智能 监控 机器人
股票终极赚钱尽头是AI Agent?阿里云1分钟集成OpenClaw Skills自动选股系统,让AI成为你的私人股票分析师
对于忙碌的上班族而言,研究个股、跟踪行情、筛选潜力股是件耗时耗力的事。过去,量化分析、实时行情监测是机构专属的“特权”,而现在借助OpenClaw的自动化能力+实时金融数据API,普通个人也能在10分钟内搭建专属AI选股系统——每日自动筛选5支潜在涨幅超5%的潜力股,生成专业盘前报告,还能通过QQ随时随地查询行情,无需复杂代码与量化背景,让AI成为你的私人股票分析师。
2237 3
|
6月前
|
JSON Java fastjson
Spring Boot返回Json数据及数据封装
Spring Boot默认使用Jackson处理JSON,通过@RestController可直接返回JSON数据。本文详解Jackson与FastJson的配置与对比,并封装统一的JSON返回结构,提升前后端交互规范性与开发效率。
|
6月前
|
Java Spring
Spring Boot开发环境搭建和项目启动
图片展示了一幅色彩斑斓的抽象艺术画作,流动的线条与绚丽的色块交织,营造出梦幻般的视觉效果,仿佛宇宙星河或意识流的具象化表达,引人遐想。
|
机器学习/深度学习 人工智能
Diffusion-DPO:一种基于直接偏好优化的扩散模型对齐新方法
本文介绍了一种名为 Diffusion-DPO 的创新方法,该方法基于直接偏好优化(DPO)原理,简化了扩散模型与人类偏好的对齐过程。相比传统的基于人类反馈的强化学习(RLHF)方法,Diffusion-DPO 避免了显式奖励模型的训练,通过数学近似简化实现流程,并在处理开放词汇表场景时展现出更强的能力。实验结果表明,该方法在 Stable Diffusion 1.5 和 SDXL-1.0 等主流模型上显著提升了生成图像的质量和可控性,为未来扩散模型的发展提供了新的思路。
1433 14
Diffusion-DPO:一种基于直接偏好优化的扩散模型对齐新方法
|
Go 开发者
GVM:Go语言版本和包管理的神器!
GVM,Go版本管理器,简化了在单机上切换不同Go版本的任务。
933 0

热门文章

最新文章