从0到1 手把手搭建spring cloud alibaba 微服务大型应用框架(十三)rocketmq 篇(2):路由注册,消息发送核心流程原理

简介: 从0到1 手把手搭建spring cloud alibaba 微服务大型应用框架(十三)rocketmq 篇(2):路由注册,消息发送核心流程原理

本文承接上文《从0到1 手把手搭建spring cloud alibaba 微服务大型应用框架(十三)rocketmq 篇(1):整体介绍》6be29068e09c41ec9acfb6ddc5eb4ef1.png

路由注册流程与原理

broker 每隔30s会向所有nameserver 发送自己心跳消息和topic 配置消息,

然后消息会被每个nameserver 将心跳信息保留到brokerLiveTable 中 ,

NameServer每隔10s会扫描一次brokerLiveTable(存放心跳包的时间戳信息)

如果在120s内没有收到心跳包,则认为Broker失效,更新topic的路由信息,将失效的Broker信息移除

nameserver 将broker 发送的消息汇总后会形成几个json 对象

主要是QueueData、BrokerData、BrokerLiveInfo 等对象,数据结构如下图:



cf609b2cd5b14a2d9782e28311d1f7c7.png


832fee3db9494d51af0a51459c839efb.png

消息发送流程与原理

消息发送宏观流程图

4b46344f4a624c7a9b2d52d86b8c6d80.png

消息发送宏观流程描述


1.消息发送者向某一个topic发送消息时,需要查询topic的路由信息。

2.初次发送时会根据topic的名称向NameServer集群查询topic的路由信息,然后将其存储在本地内存缓存中,并且每隔30s依次遍历缓存中的topic,向NameServer查询最新的路由信息。

3 如果成功查询到路由信息,会将这些信息更新至本地缓存,实现topic路由信息的动态感知。

4 RocketMQ提供了自动创建主题(topic)的机制,消息发送者向一个不存在的主题发送消息时,向NameServer查询该主题的路由信息会先返回空,

4.1如果未开启自动创建主题机制,直接抛出异常

4.2如果开启了自动创建主题机制,会使用一个默认的主题名再次从NameServer查询路由信息,

然后消息发送者会使用默认主题的路由信息进行负载均衡,

但不会直接使用默认路由信息为新主题创建对应的路由信息。使用默认主题创建路由信息的流程

注意点:生产环境尽量不要开启自动创建主题机制 也就是autoCreateTopicEnable为true

因为这时候发送不存在的topic仅仅会在某cluster其中一台机器产生topic 会导致topic 不同步

消费者在查询topic 时通过nameserver 永远只能查到一个broker,会一直往这个broker发送

消息发送详细流程

发送流程图bbcd1998e7c54827ac0e90aec14b6ed9.png

发送流程如下:

1.消息长度验证

在消息发送之前,首先确保生产者处于运行状态,然后验证消息是否符合相应的规范。

具体的规范要求是主题名称、消息体不能为空,

消息长度不能等于0且默认不能超过允许发送消息的最大长度4MB(maxMessageSize=1024×1024×4)。

2.查找主题路由信息

在消息发送之前,还需要获取主题的路由信息,

只有获取了这些信息我们才能知道消息具体要发送到哪个Broker节点上

3.选择消息队列

根据路由信息选择消息队列,返回的消息队列按照broker序号进行排序。

举例说明,如果topicA在broker-a、broker-b上分别创建了4个队列,那么返回的消息队列为

[{"brokerName":"broker-a"、"queueId":0}、{"brokerName":"broker-a"、"queueId":1}、{"brokerName":"broker-a"、"queueId":2}、{"brokerName":"broker-a"、"queueId":3}、{"brokerName":"broker-b"、"queueId":0}、{"brokerName":"broker-b"、"queueId":1}、{"brokerName":"broker-b"、"queueId":2}、{"brokerName":"broker-b"、"queueId":3}],

3.1 选择消息队列机制

3.1.1 默认机制

在消息发送过程中,可能会多次执行选择消息队列这个方法,lastBrokerName就是上一次选择的执行发送消息失败的Broker。第一次执行消息队列选择时

lastBrokerName为null,此时直接用sendWhichQueue自增再获取值,与当前路由表中消息队列的个数取模,返回该位置

的MessageQueue(selectOneMessageQueue()方法,如果消息发送失败,下次进行消息队列选择时规避上次MesageQueue所在的Broker,否则有可能再次失败。

或许有读者会问,Broker不可用后,路由信息中为什么还会包含该Broker的路由信息呢?其实这不难解释:首先,NameServer检测Broker是否可用是有延迟的,最短为一次心跳检测间隔(10s);其次,NameServer不是检测到Broker宕机后马上推送消息给消息生产者,而是消息生产者每隔30s更新一次路由信息,因此消息生产者最快感知Broker最新的路由信息也需要30s。这就需要引入一种机制,在Broker宕机期间,一次消息发送失败后,将该Broker暂时排除在消息队列的选择范围中。

3.1.2 故障延迟机制

首先对上述代码进行解读。

1)轮询获取一个消息队列。

2)验证该消息队列是否可用

3)如果返回的MessageQueue可用,则移除关于该topic的条目,表明该Broker故障已经修复。

4.发送消息

4.1 同步发送

第一步:检查消息发送是否合理,这里完成了以下几件事。

1)检查Broker是否有写权限。

2)检查topic是否可以进行消息发送。主要针对默认主题,默认主题不能发送消息,仅供路由查找。

3)在NameServer端存储主题的配置信息,默认路径为${ROCKET_HOME}/store/ config/ topic.json。下面是主题存储信息。

4)检查队列,如果队列不合法,则返回错误码。

如果消息重试次数超过允许的最大重试次数,消息将进入DLQ死信队列。死信队列主题为%DLQ%+消费组名。

调用DefaultMessageStore

4.2 异步发送

异步发送是指消息生产者调用发送的API后,无须等待消息服务器返回本次消息发送的结果,

只需要提供一个回调函数,供消息发送客户端在收到响应结果后回调。异步发送方式相比于同步发送方式,

虽然消息发送端的发送性能会显著提高,但是为了降低消息服务器的负载压力,

RocketMQ对消息发送的异步消息进行了并发控制,通过参数clientAsyncSemaphoreValue实现,默认为65535。

异步消息发送虽然也可以通过DefaultMQProducer#retryTimesWhenSendAsyncFailed属性来控制消息的发送重试次数,

但是重试的调用入口是在收到服务端响应包时进行的,如果出现网络异常、网络超时等情况将不会重试

4.3单向发送

单向发送是指消息生产者调用消息发送的API后,无须等待消息服务器返回本次消息发送的结果,

并且无须提供回调函数,这表示压根就不关心本次消息发送是否成功,

其实现原理与异步消息发送相同,只是消息发送客户端在收到响应结果后什么都不做了,并且没有重试机制。





相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
7月前
|
安全 Java Ruby
我尝试了所有后端框架 — — 这就是为什么只有 Spring Boot 幸存下来
作者回顾后端开发历程,指出多数框架在生产环境中难堪重负。相比之下,Spring Boot凭借内置安全、稳定扩展、完善生态和企业级支持,成为构建高可用系统的首选,真正经受住了时间与规模的考验。
542 2
|
8月前
|
XML JSON Java
Spring框架中常见注解的使用规则与最佳实践
本文介绍了Spring框架中常见注解的使用规则与最佳实践,重点对比了URL参数与表单参数的区别,并详细说明了@RequestParam、@PathVariable、@RequestBody等注解的应用场景。同时通过表格和案例分析,帮助开发者正确选择参数绑定方式,避免常见误区,提升代码的可读性与安全性。
|
6月前
|
安全 前端开发 Java
《深入理解Spring》:现代Java开发的核心框架
Spring自2003年诞生以来,已成为Java企业级开发的基石,凭借IoC、AOP、声明式编程等核心特性,极大简化了开发复杂度。本系列将深入解析Spring框架核心原理及Spring Boot、Cloud、Security等生态组件,助力开发者构建高效、可扩展的应用体系。(238字)
|
6月前
|
消息中间件 缓存 Java
Spring框架优化:提高Java应用的性能与适应性
以上方法均旨在综合考虑Java Spring 应该程序设计原则, 数据库交互, 编码实践和系统架构布局等多角度因素, 旨在达到高效稳定运转目标同时也易于未来扩展.
458 8
|
7月前
|
监控 Kubernetes Cloud Native
Spring Batch 批处理框架技术详解与实践指南
本文档全面介绍 Spring Batch 批处理框架的核心架构、关键组件和实际应用场景。作为 Spring 生态系统中专门处理大规模数据批处理的框架,Spring Batch 为企业级批处理作业提供了可靠的解决方案。本文将深入探讨其作业流程、组件模型、错误处理机制、性能优化策略以及与现代云原生环境的集成方式,帮助开发者构建高效、稳定的批处理系统。
729 1
|
9月前
|
安全 Java 微服务
Java 最新技术和框架实操:涵盖 JDK 21 新特性与 Spring Security 6.x 安全框架搭建
本文系统整理了Java最新技术与主流框架实操内容,涵盖Java 17+新特性(如模式匹配、文本块、记录类)、Spring Boot 3微服务开发、响应式编程(WebFlux)、容器化部署(Docker+K8s)、测试与CI/CD实践,附完整代码示例和学习资源推荐,助你构建现代Java全栈开发能力。
901 1
|
8月前
|
Cloud Native Java API
Java Spring框架技术栈选和最新版本及发展史详解(截至2025年8月)-优雅草卓伊凡
Java Spring框架技术栈选和最新版本及发展史详解(截至2025年8月)-优雅草卓伊凡
1475 0
|
9月前
|
缓存 安全 Java
第五章 Spring框架
Spring IOC(控制反转)通过工厂模式管理对象的创建与生命周期,DI(依赖注入)则让容器自动注入所需对象,降低耦合。常见注解如@Component、@Service用于声明Bean,@Autowired用于注入。Bean默认单例,作用域可通过@Scope配置,如prototype、request等。Spring通过三级缓存解决循环依赖问题,但构造函数循环依赖需用@Lazy延迟加载。AOP通过动态代理实现,用于日志、事务等公共逻辑。事务通过@Transactional实现,需注意异常处理及传播行为。
140 0
|
9月前
|
缓存 安全 Java
Spring 框架核心原理与实践解析
本文详解 Spring 框架核心知识,包括 IOC(容器管理对象)与 DI(容器注入依赖),以及通过注解(如 @Service、@Autowired)声明 Bean 和注入依赖的方式。阐述了 Bean 的线程安全(默认单例可能有安全问题,需业务避免共享状态或设为 prototype)、作用域(@Scope 注解,常用 singleton、prototype 等)及完整生命周期(实例化、依赖注入、初始化、销毁等步骤)。 解析了循环依赖的解决机制(三级缓存)、AOP 的概念(公共逻辑抽为切面)、底层动态代理(JDK 与 Cglib 的区别)及项目应用(如日志记录)。介绍了事务的实现(基于 AOP
335 0
|
9月前
|
存储 缓存 NoSQL
Spring Cache缓存框架
Spring Cache是Spring体系下的标准化缓存框架,支持多种缓存(如Redis、EhCache、Caffeine),可独立或组合使用。其优势包括平滑迁移、注解与编程两种使用方式,以及高度解耦和灵活管理。通过动态代理实现缓存操作,适用于不同业务场景。
684 0