本文承接上文《从0到1 手把手搭建spring cloud alibaba 微服务大型应用框架(十三)rocketmq 篇(1):整体介绍》
路由注册流程与原理
broker 每隔30s会向所有nameserver 发送自己心跳消息和topic 配置消息,
然后消息会被每个nameserver 将心跳信息保留到brokerLiveTable 中 ,
NameServer每隔10s会扫描一次brokerLiveTable(存放心跳包的时间戳信息)
如果在120s内没有收到心跳包,则认为Broker失效,更新topic的路由信息,将失效的Broker信息移除
nameserver 将broker 发送的消息汇总后会形成几个json 对象
主要是QueueData、BrokerData、BrokerLiveInfo 等对象,数据结构如下图:
消息发送流程与原理
消息发送宏观流程图
消息发送宏观流程描述
1.消息发送者向某一个topic发送消息时,需要查询topic的路由信息。
2.初次发送时会根据topic的名称向NameServer集群查询topic的路由信息,然后将其存储在本地内存缓存中,并且每隔30s依次遍历缓存中的topic,向NameServer查询最新的路由信息。
3 如果成功查询到路由信息,会将这些信息更新至本地缓存,实现topic路由信息的动态感知。
4 RocketMQ提供了自动创建主题(topic)的机制,消息发送者向一个不存在的主题发送消息时,向NameServer查询该主题的路由信息会先返回空,
4.1如果未开启自动创建主题机制,直接抛出异常
4.2如果开启了自动创建主题机制,会使用一个默认的主题名再次从NameServer查询路由信息,
然后消息发送者会使用默认主题的路由信息进行负载均衡,
但不会直接使用默认路由信息为新主题创建对应的路由信息。使用默认主题创建路由信息的流程
注意点:生产环境尽量不要开启自动创建主题机制 也就是autoCreateTopicEnable为true
因为这时候发送不存在的topic仅仅会在某cluster其中一台机器产生topic 会导致topic 不同步
消费者在查询topic 时通过nameserver 永远只能查到一个broker,会一直往这个broker发送
消息发送详细流程
发送流程图
发送流程如下:
1.消息长度验证
在消息发送之前,首先确保生产者处于运行状态,然后验证消息是否符合相应的规范。
具体的规范要求是主题名称、消息体不能为空,
消息长度不能等于0且默认不能超过允许发送消息的最大长度4MB(maxMessageSize=1024×1024×4)。
2.查找主题路由信息
在消息发送之前,还需要获取主题的路由信息,
只有获取了这些信息我们才能知道消息具体要发送到哪个Broker节点上
3.选择消息队列
根据路由信息选择消息队列,返回的消息队列按照broker序号进行排序。
举例说明,如果topicA在broker-a、broker-b上分别创建了4个队列,那么返回的消息队列为
[{"brokerName":"broker-a"、"queueId":0}、{"brokerName":"broker-a"、"queueId":1}、{"brokerName":"broker-a"、"queueId":2}、{"brokerName":"broker-a"、"queueId":3}、{"brokerName":"broker-b"、"queueId":0}、{"brokerName":"broker-b"、"queueId":1}、{"brokerName":"broker-b"、"queueId":2}、{"brokerName":"broker-b"、"queueId":3}],
3.1 选择消息队列机制
3.1.1 默认机制
在消息发送过程中,可能会多次执行选择消息队列这个方法,lastBrokerName就是上一次选择的执行发送消息失败的Broker。第一次执行消息队列选择时
lastBrokerName为null,此时直接用sendWhichQueue自增再获取值,与当前路由表中消息队列的个数取模,返回该位置
的MessageQueue(selectOneMessageQueue()方法,如果消息发送失败,下次进行消息队列选择时规避上次MesageQueue所在的Broker,否则有可能再次失败。
或许有读者会问,Broker不可用后,路由信息中为什么还会包含该Broker的路由信息呢?其实这不难解释:首先,NameServer检测Broker是否可用是有延迟的,最短为一次心跳检测间隔(10s);其次,NameServer不是检测到Broker宕机后马上推送消息给消息生产者,而是消息生产者每隔30s更新一次路由信息,因此消息生产者最快感知Broker最新的路由信息也需要30s。这就需要引入一种机制,在Broker宕机期间,一次消息发送失败后,将该Broker暂时排除在消息队列的选择范围中。
3.1.2 故障延迟机制
首先对上述代码进行解读。
1)轮询获取一个消息队列。
2)验证该消息队列是否可用
3)如果返回的MessageQueue可用,则移除关于该topic的条目,表明该Broker故障已经修复。
4.发送消息
4.1 同步发送
第一步:检查消息发送是否合理,这里完成了以下几件事。
1)检查Broker是否有写权限。
2)检查topic是否可以进行消息发送。主要针对默认主题,默认主题不能发送消息,仅供路由查找。
3)在NameServer端存储主题的配置信息,默认路径为${ROCKET_HOME}/store/ config/ topic.json。下面是主题存储信息。
4)检查队列,如果队列不合法,则返回错误码。
如果消息重试次数超过允许的最大重试次数,消息将进入DLQ死信队列。死信队列主题为%DLQ%+消费组名。
调用DefaultMessageStore
4.2 异步发送
异步发送是指消息生产者调用发送的API后,无须等待消息服务器返回本次消息发送的结果,
只需要提供一个回调函数,供消息发送客户端在收到响应结果后回调。异步发送方式相比于同步发送方式,
虽然消息发送端的发送性能会显著提高,但是为了降低消息服务器的负载压力,
RocketMQ对消息发送的异步消息进行了并发控制,通过参数clientAsyncSemaphoreValue实现,默认为65535。
异步消息发送虽然也可以通过DefaultMQProducer#retryTimesWhenSendAsyncFailed属性来控制消息的发送重试次数,
但是重试的调用入口是在收到服务端响应包时进行的,如果出现网络异常、网络超时等情况将不会重试
4.3单向发送
单向发送是指消息生产者调用消息发送的API后,无须等待消息服务器返回本次消息发送的结果,
并且无须提供回调函数,这表示压根就不关心本次消息发送是否成功,
其实现原理与异步消息发送相同,只是消息发送客户端在收到响应结果后什么都不做了,并且没有重试机制。