开发者学堂课程【RocketMQ 知识精讲与项目实战(第三阶段):路由删除】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/704/detail/12467
路由删除
路由删除
Broker每隔30s向Nameserver发送一个心跳包,心跳包包含BrokerTd,Broker地址,Broker名称,Broker所属集群名称。
Broker关联的Fi1terserver列表。但是如果Broker宕机,Nameserver无法收到心跳包,此时Nameserver如何来剔除这些失效的Broker? Nameserver会每隔10s扫描brokerLiveTable状态表,如果BrokerLive的lastUpdateTimestamp的时间戳距当前时间超过120s,则认为Broker失效,移除该Broker,关闭与Broker连接,同时更新topicQueueTable、brokerAddrTable、brokerLiveTable、filterserverTable。
RocketMQ有两个触发点来删除路由信息:
• NameServer定期扫描brokerLiveTable检测上次心跳包与当前系统的时间差,如果时间超过120s,则需要移除broker。
• Broker在正常关闭的情况下,会执行unregisterBroker指令
这两种方式路由删除的方法都是一样的,就是从相关路由表中删除与该broker相关的信息。
根据下图再结合代码讲解路由删除的流程:
根据NameServer定期扫描删除路由信息,找到NameServer,里面有NamesrvController,在NamesrvController中有线程池,线程池是每隔10s扫描没有上报信息的broker
相关代码:
this.scheduledExecutorservice.scheduleAtFixedRate(new Runnable() {
@override
public void run() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
},initialDelay:5,period:10,TimeUnit.sECONDS);
每隔10s执行下面所示方法:
public void scanNotActiveBroker() {
Iterator<Entry<String,BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
while (it.hasNext())i
Entry<string,BrokerLiveInfo> next = it.next();
long last = next.getvalue().getLastupdateTimestamp();
if ((last + BROKER_CHANNEL_EXPIRED_TIME)<system.curren
tTimeMillis()){
Remotingutil.closechannel(next.getvalue().getchanne1());
it.remove();
log.warn("The broker channel expired,{}{ms ", next.getKey(),BROKER_CHANNEL_EXPIRED_TINIE);
this.onchannelDestroy(next.getKey(),next.getvalue().getchannel());
}
}
}
这个方法里面,首先是遍历brokerLiveTable,然后是获得BrokerLivelnfo的lastupdateTimestamp,如果这个时间加上当前的120s还小于system.currentTimeMillis(),就认为这个broker出现宕机。之后首先是关闭连接通道,接着移除该broker信息,在onchannelDestroy方法里传递broker的IP地址及当前通道信息,并负责维护其他的路由表,比如维护brokerAddrTable、BrokerNameMclusterAddrTable、根据BrokerName队列集合移除Broker。根据流程图,整个维护路由表的过程是加锁的,也是同步化的。
this.lock. readLock().lockInterruptibly();
terator<Entry<string,BrokerLiveInfo>> itBrokerLiveTable =
this.brokerLiveTable.entryset().iterator();
while (itBrokerLiveTable.hasNext()){
Entry<string,BrokerLiveInfo> entry = itBrokerLiveTable.next();
if (entry.getvalue().getchannel() == channel) {
brokerAddrFound = entry.getKey();
break;
}
}
}finally{
在上面代码组成的方法中去维护brokerLiveTable、filterserverTable还有brokerAddrtable
this. 1ock.writeLock() . lockInterruptibly();
this.brokerLiveTable.remove( brokerAddrFound) ;this.filterServerTable.remove(brokerAddrFound);
string brokerNameFound = null;
boolean removeBrokerName = false;
Iterator<Entry<string,BrokerData>> itBrokerAddrTable =
this.brokerAddrTable.entryset().iterator();
while (itBrokerAddrTable.hasNext() && (null == brokerNameFound)) {
BrokerData brokerData = itBrokerAddrTable.next().getvalue();
当以上table都维护完成之后,会进行判断,如果当前队列里为空,会将TopicQueueTable删除,因为没有一个broker可以处理topic,故主题应该删除。
if (queueDataList.isEmpty()) {
itTopicQueueTable.remove();
Log.info("remove topic[i}] all queue,from topicQueueTable,because channel destroyed",
topic);
}
上面所有都完成之后会释放lock。
总结:在路由删除过程中主要设计三个类,一个是NameServerController,它在初始化之后会开启一个线程池去不断扫描brokerLiveTable,如果发现一个broker超过120s未上报信息,会进行broker删除,维护路由表。