开发者学堂课程【RocketMQ 知识精讲与项目实战(第三阶段):消息发送2-消息路由查找】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/704/detail/12472
消息发送2-消息路由查找
查找路由
查找路由的目的是要知道当前的消息发给哪个broker,找到broker之后选择队列发送消息。
DefaultMQProducerlmpl中消息发送的全部业务逻辑:
private sendResult sendbefaultimpk
Message msg,
final communicationtode communicationMode,
final Sendcal1back sendcal1back,
final long timeout
)throws wQclientException,RemotingException,MQBrokerException,InterruptedException {
this.makesurestateOK(;
validators.checkMessage(msg, this.dofaultNQProducer);
final long invokeID = random.nextLong(;
long beginTimestampFirst - system.currentTimeMillis();
long beginTimestampPrev - beginTimestampFirst;
long endTimestamp - beginTimestampFirst;
//根据主题查找当前要发送的broker的信息,最后返回TopicpublishInfo
TopicpublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != nul1 && topicPublishInfo.ok()) {
boolean callTimeout = false;
MessageQueue mq = null;
Exception exception = null;
sendResult sendResult = null;
int timesTotal = communicationMode =m CommunicationMode.sYNc ? 1 + this.defaultMQProducor
int times = 0;
string[brokerssent = new String[timesTotal];
for (; times < timesTotal; timesi) {
在TopicpublishInfo中,有messageQulueList,通过它进行负载均衡。
public class TopicPublishInfo i
private boolean orderTopic = false;
private boolean haveTopicRouterInfo = false;
private List<MessageQueue> messageQulueList = new ArrayList<~>( );
private volatile ThreadLocalIndex sendwhichQueue = new ThreadLocalIndex();
private TopicRouteData topicRouteData;
在DefaultMQProducerlmpl中如何进行路由查找:
生产者为了提高发送的效率,不会每一次都去NameServer当中根据主题查找路由信息,而是在本地缓存路由表,如果没有查到,才会在NameServer中去查找,如果当前已经查到了TopicPublishInfo在当前的缓存表中查到了,就直接返回。
private TopicPublishInfo tryToFindTopicPublishInfo(final string topic){
//从缓存中获得主题的路由信息
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
//路由信息为空,则从NameServer获取路由;不为空,直接返回topicPublishInfo
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
this.topicPublishInfoTable.putIfAbsent(topic,new TopicPublishInfo());
this.mQclientFactory.updateTopicRouteInfoFromNameserver(topic);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()){
return topicPublishInfo;
}else {
//如果未找到当前主题的路由信息,则用默认主题继续查找this.mQclientFactory.updateTopicRouteInfoFromNameServer(topic,isDefault: true,this.defaultNQProducer);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
return topicPublishInfo;
}
}
缓存表样式如下,key是topic,value是PublihInfo发送路由信息:
private final ConcurrentMap<string/*topic*/, TopicPublihInfo>topicPublishInfoTable= new concurrentHashMap<~>();
MQClientInstance是向外发送请求的类,从NameServer获取路由是调用MQClientInstance完成的,请求的代码过程:
//如果当前未指定主题,使用的就是默认主题
if (isDefault &&defaultMQProducer!= nul1) i
topicRouteData = this.mClientAPIImp1.getDefaultTopicRuteInfoFromNameServer(defaultMQProducer.getCreateTopic.
timeoutMillis: 1000* 3);
if (topicRouteData != null) {
for (QueueData data : topicRouteData.getQueueDatas()) {
int queueNums = Math.min(defaultwQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
data.setReadQueueNums(queueNums ) ;
data.setwriteQueueNums(queueNums );
}
}
} else {
topicRouteData = this.mQclientAPIImpl.getTopicRouteInfoFromNameServer(topic,timeoutilli:1000 * 3);
注:一般发送消息会设置主题,所以运行getTopicRouteInfoFromNameServer,在MQClientAPIImpl中请求NameServer,获取当前主题所对应的路由信息。
完成调用之后,会返回topicRouteData主题路由数据,后判断topicRouteData:
//如果数据不为空,根据主题从本地缓存的表查询当前主题原先的路由信息,比较原来路由信息和当前路由信息是否一致
if (topicRouteData != null){
TopicRouteData old = this.topicRouteTable.get(topic);
boolean changed = topicRouteDataIsChange(old,topicRouteData);
if ( !changed) {
changed = this.isNeedupdateTopicRouteInfo(topic);
} else {
log.info( varl: "the topic[{]] route info changed,old[(}] ,new[()]", topic,old,topicRouteData);
}
//如果不一致,对本地路由表进行更新操作
比较之后会返回布尔值,如果是true代表变化了,变化之后在以下代码位置做更新操作,把本地路由表去做更新,这样下一次再去发送消息时,就可以直接从本地获取。
if ( changed) {
TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
for (BrokerData bd : topicRouteData.getBrokerDatas()){
this.brokerAddrTable.put(bd.getBrokerName()bd.getBrokerAddrs( ));
}
//update pub info 将topicRouteData转换为topicPublishInfo
{
TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
publishInfo.setHaveTopicRouterInfo(true);
Iterator<Entry<String,MQProducerInner>> it = this.producerTable.entrySet().iterator();
while (it.hasNext()){
Entry<string,MQProducerInner> entry = it.next();
MQProducerInner impl = entry.getvalue();
if ( impl != null) {
impl.updateTopicPublishInfo(topic, publishInfo);
}
如上图表,返回路由信息下面是判断是否需要更新本地路由表,其中类名为TopicRouteData,但整个方法叫TopicPublicInfo,最终是用这个类。
从NameServer中查出的类是TopicRouteData,最终在消息发送中用的类TopicPublicInfo,所以重要代码:TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
将topicRouteData中的数据取出放到TopicPublishInfo 里,这就是最终消息生产者在发送消息时从TopicPublishInfo去查找路由信息。
主要流程:在发送消息时,根据主题查找路由信息,producer内部维护了一个自己的路由表,首先会在它本地缓存中查找,如果缓存中没有信息,则请求NameServer,请求NameServer借助的是MQClientInstance。查找结束,返回路由信息,之后检查是否需要更新本地缓存路由,最后将topicRouteData转换为topicPublishInfo。
TopicpublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic())
;这一步执行完成之后,producer查找路由就结束了。