消息发送2-消息路由查找|学习笔记

简介: 快速学习消息发送2-消息路由查找

开发者学堂课程【RocketMQ 知识精讲与项目实战(第三阶段)消息发送2-消息路由查找】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/704/detail/12472


消息发送2-消息路由查找


查找路由

查找路由的目的是要知道当前的消息发给哪个broker,找到broker之后选择队列发送消息。

DefaultMQProducerlmpl中消息发送的全部业务逻辑:

private sendResult sendbefaultimpk

Message msg,

final communicationtode communicationMode,

final Sendcal1back sendcal1back,

final long timeout

)throws wQclientException,RemotingException,MQBrokerException,InterruptedException {

this.makesurestateOK(;

validators.checkMessage(msg, this.dofaultNQProducer);

final long invokeID = random.nextLong(;

long beginTimestampFirst - system.currentTimeMillis();

long beginTimestampPrev - beginTimestampFirst;

long endTimestamp - beginTimestampFirst;

//根据主题查找当前要发送的broker的信息,最后返回TopicpublishInfo

TopicpublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());

if (topicPublishInfo != nul1 && topicPublishInfo.ok()) {

boolean callTimeout = false;

MessageQueue mq = null;

Exception exception = null;

sendResult sendResult = null;

int timesTotal = communicationMode =m CommunicationMode.sYNc ? 1 + this.defaultMQProducor

int times = 0;

string[brokerssent = new String[timesTotal];

for (; times < timesTotal; timesi) {

在TopicpublishInfo中,有messageQulueList,通过它进行负载均衡。

public class TopicPublishInfo i

private boolean orderTopic = false;

private boolean haveTopicRouterInfo = false;

private List<MessageQueue> messageQulueList = new ArrayList<~>( );

private volatile ThreadLocalIndex sendwhichQueue = new ThreadLocalIndex();

private TopicRouteData topicRouteData;

在DefaultMQProducerlmpl中如何进行路由查找:

image.png生产者为了提高发送的效率,不会每一次都去NameServer当中根据主题查找路由信息,而是在本地缓存路由表,如果没有查到,才会在NameServer中去查找,如果当前已经查到了TopicPublishInfo在当前的缓存表中查到了,就直接返回。

private TopicPublishInfo tryToFindTopicPublishInfo(final string topic){

//从缓存中获得主题的路由信息

TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);

//路由信息为空,则从NameServer获取路由;不为空,直接返回topicPublishInfo

if (null == topicPublishInfo || !topicPublishInfo.ok()) {

this.topicPublishInfoTable.putIfAbsent(topic,new TopicPublishInfo());this.mQclientFactory.updateTopicRouteInfoFromNameserver(topic);

topicPublishInfo = this.topicPublishInfoTable.get(topic);

if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()){

return topicPublishInfo;

}else {

//如果未找到当前主题的路由信息,则用默认主题继续查找this.mQclientFactory.updateTopicRouteInfoFromNameServer(topic,isDefault: true,this.defaultNQProducer);

topicPublishInfo = this.topicPublishInfoTable.get(topic);

return topicPublishInfo;

}

}

缓存表样式如下,key是topic,value是PublihInfo发送路由信息:

private final ConcurrentMap<string/*topic*/, TopicPublihInfo>topicPublishInfoTable= new concurrentHashMap<~>();

MQClientInstance是向外发送请求的类,从NameServer获取路由是调用MQClientInstance完成的,请求的代码过程:

//如果当前未指定主题,使用的就是默认主题

if (isDefault &&defaultMQProducer!= nul1) i

topicRouteData = this.mClientAPIImp1.getDefaultTopicRuteInfoFromNameServer(defaultMQProducer.getCreateTopic.

timeoutMillis: 1000* 3);

if (topicRouteData != null) {

for (QueueData data : topicRouteData.getQueueDatas()) {

int queueNums = Math.min(defaultwQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());

data.setReadQueueNums(queueNums ) ;

data.setwriteQueueNums(queueNums );

}

}

} else {

topicRouteData = this.mQclientAPIImpl.getTopicRouteInfoFromNameServer(topic,timeoutilli:1000 * 3);

注:一般发送消息会设置主题,所以运行getTopicRouteInfoFromNameServer,在MQClientAPIImpl中请求NameServer,获取当前主题所对应的路由信息。

完成调用之后,会返回topicRouteData主题路由数据,后判断topicRouteData:

//如果数据不为空,根据主题从本地缓存的表查询当前主题原先的路由信息,比较原来路由信息和当前路由信息是否一致

if (topicRouteData != null){

TopicRouteData old = this.topicRouteTable.get(topic);

boolean changed = topicRouteDataIsChange(old,topicRouteData);

if ( !changed) {

changed = this.isNeedupdateTopicRouteInfo(topic);

} else {

log.info( varl: "the topic[{]] route info changed,old[(}] ,new[()]", topic,old,topicRouteData);

}

//如果不一致,对本地路由表进行更新操作

比较之后会返回布尔值,如果是true代表变化了,变化之后在以下代码位置做更新操作,把本地路由表去做更新,这样下一次再去发送消息时,就可以直接从本地获取。

if ( changed) {

TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();

for (BrokerData bd : topicRouteData.getBrokerDatas()){this.brokerAddrTable.put(bd.getBrokerName()bd.getBrokerAddrs( ));

}

//update pub info 将topicRouteData转换为topicPublishInfo

{

TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);

publishInfo.setHaveTopicRouterInfo(true);

Iterator<Entry<String,MQProducerInner>> it = this.producerTable.entrySet().iterator();

while (it.hasNext()){

Entry<string,MQProducerInner> entry = it.next();

MQProducerInner impl = entry.getvalue();

if ( impl != null) {

impl.updateTopicPublishInfo(topic, publishInfo);

}

如上图表,返回路由信息下面是判断是否需要更新本地路由表,其中类名为TopicRouteData,但整个方法叫TopicPublicInfo,最终是用这个类。

从NameServer中查出的类是TopicRouteData,最终在消息发送中用的类TopicPublicInfo,所以重要代码:TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);将topicRouteData中的数据取出放到TopicPublishInfo 里,这就是最终消息生产者在发送消息时从TopicPublishInfo去查找路由信息。

主要流程:在发送消息时,根据主题查找路由信息,producer内部维护了一个自己的路由表,首先会在它本地缓存中查找,如果缓存中没有信息,则请求NameServer,请求NameServer借助的是MQClientInstance。查找结束,返回路由信息,之后检查是否需要更新本地缓存路由,最后将topicRouteData转换为topicPublishInfo。

TopicpublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());这一步执行完成之后,producer查找路由就结束了。

相关文章
|
9月前
|
消息中间件 RocketMQ
这个RocketMQ节点似乎是在正常工作,但是它不能接收或者处理消息
这个RocketMQ节点似乎是在正常工作,但是它不能接收或者处理消息
305 0
|
2月前
|
消息中间件 前端开发 算法
【十七】RabbitMQ基础篇(延迟队列和死信队列实战)
【十七】RabbitMQ基础篇(延迟队列和死信队列实战)
69 1
|
2月前
|
消息中间件 存储 Kafka
几种 MQ 顺序消息的实现方式
几种 MQ 顺序消息的实现方式
|
2月前
|
消息中间件 网络架构
【面试问题】什么是 MQ topic 交换器(模式匹配) ?
【1月更文挑战第27天】【面试问题】什么是 MQ topic 交换器(模式匹配) ?
|
12月前
RabbmitMQ学习笔记-死信队列
RabbmitMQ学习笔记-死信队列
46 0
|
消息中间件 Java 数据库
RabbitMQ:第二章:Spring整合RabbitMQ(简单模式,广播模式,路由模式,通配符模式,消息可靠性投递,防止消息丢失,TTL,死信队列,延迟队列,消息积压,消息幂等性)
RabbitMQ:第二章:Spring整合RabbitMQ(简单模式,广播模式,路由模式,通配符模式,消息可靠性投递,防止消息丢失,TTL,死信队列,延迟队列,消息积压,消息幂等性)
324 0
RabbitMQ:第二章:Spring整合RabbitMQ(简单模式,广播模式,路由模式,通配符模式,消息可靠性投递,防止消息丢失,TTL,死信队列,延迟队列,消息积压,消息幂等性)
|
消息中间件 存储 Apache
解析 RocketMQ 业务消息--“顺序消息”
本篇将继续业务消息集成的场景,从功能原理、应用案例、最佳实践以及实战等角度介绍 RocketMQ 的顺序消息功能。
253 0
解析 RocketMQ  业务消息--“顺序消息”
|
消息中间件 存储 运维
【视频】普通消息 | 学习笔记
快速学习【视频】普通消息
459 0
【视频】普通消息 | 学习笔记
|
消息中间件 缓存 负载均衡
消息发送3-选择队列|学习笔记
快速学习消息发送3-选择队列
92 0
消息发送3-选择队列|学习笔记
|
消息中间件 缓存 RocketMQ
消息发送5-总结|学习笔记
快速学习消息发送5-总结
48 0
消息发送5-总结|学习笔记