开发者学堂课程【RocketMQ 知识精讲与项目实战(第三阶段):路由注册之处理请求包】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/704/detail/12466
路由注册之处理请求包
处理心跳包
在broker发送请求之后,NameServer如何处理:
在NameServer项目中有一个processor包,processor里有个DefaultRequestProcessor类,在这类当中,负责处理它所接受到的请求,结合下图来看:
processRequest负责接受心跳注册请求,之后是对请求类型进行判断,因为NameServer不仅仅接受broker的心跳请求,还会接受producer和consumer的请求。
switch (request.getcode()) {
case Requestcode.PuT_kv_cONFIG:
return this.putkvconfig(ctx,request);
case Requestcode.GET_KV_CONFIG:
return this.getKvconfig(ctx,request);
case Requestcode.DELETE_Kv_CONFIG:
return this.deletekvconfig(ctx,request);
case Requestcode.QUERY_DATA_VERSION:
return queryBrokerTopicconfig(ctx,request);
case Requestcode.REGISTER_BROKER:
Version brokerVersion = MQversion.value2version(reque
st.getVersion());
if (brokerVersion.ordinal() >= MQversion.version.v3_0_11.
ordinal()) {
return this.registerBrokerwithFilterserver(ctx,request);
}else {
return this.registerBroker( ctx,request);
case Requestcode. UNREGISTER_BROKER:
return this.unregisterBroker( ctx, request);
在registerBroker中最核心的路由注册由RouteInfoManger完成
//获得响应
final Remotingcommand response = Remotingcommand.createResponsecommand(RegisterBrokerResponseHeader.class);
//创建响应头
final RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader)response .readcustomHeader();
final RegisterBrokerRequestHeader=
(RegisterBrokerRequestHeader)request.decodecommandCustomHeader(RegisterBrokerRequestHeader.class) ;
以上准备工作完成后,通过RouteinfoManager进行注册:
RegisterBrokerResult result = this.namesrvcontroller.getRou
teinfoManager( ).registerBroker(
requestHeader.getclusterName(),
requestHeader.getBrokerAddr(),
requestHeader.getBrokerName( ),
requestHeader-getBrokerId(),
requestHeader.getHaServerAddr( ),
topicconfigwrapper,
filterServerList: null,
ctx.channel()
);
在注册时,注意this. lock.writeLock( ). lockInterruptibly()
代码,这行代码说明路由注册是同步,串行化的过程,故在上图中有“加锁”标识。下面对clusterAddrTable和brokerAddrTable进行维护,然后在往下运行,对brokerLiveTable进行了维护。
在DefaultRequestProcesso代码中,如果当前brokerVersion版本大于3.0.11,运行的是registerBrokerwithFilterServer方法。
if (brokerVersion.ordinal() >= MQversion.version.v3_e_11.ord
inal()) {
return this.registerBrokerwithFilterServer(ctx,request);
如果,filterserverList不为空,会进行维护处理。
if (filterserverList !=-nul1) {
if (filterserverList.isEmpty() {
this.fi1terserverTable.remove(brokerAddr);
}else {
this.filterserverTable.put(brokerAddr,filterserverList);
}
}
总结:在NameServer处理心跳包时主要用到DefaultRequestProcesso和RouteInfoManger两个类,真正工作的是RouteInfoManger,在维护路由表时是同步状态。