开发者学堂课程【RocketMQ 知识精讲与项目实战(第三阶段):路由发现和小结】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/704/detail/12468
路由发现和小结
内容介绍:
一、路由发现
二、小结
一、路由发现
RocketMQ路由发现是非实时的,当Topic路由出现变化后,NameServer不会主动推送给客户端,而是由客户端定时拉取主题最新的路由。
在NameServer中有一个DefaultRequestProcessor,这个类的作用是处理客户端请求,里面有一个请求类型是GET_RUTEINT_BY_TOPIC,是客户端根据这个主题主动的获得路由信息,目的是降低NameServer实现的难度,保证简单、高效。
代码:
case Requestcode.GET_ROUTEINTO_By_TOPIC:
return this.getRouteInfoByTopic(ctx,request);
在getRouteInfoByTopic里会找到namesrvcontroller拿到RouteInfoManger,根据主题去摘取相关路由信息去填充topicRouteData
public RemotingCommand getRouteInfoByTopic(ChannelHan
dlercontext ctx,
Remotingcommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);final GetRouteInfoRequestHeader requestHeader =
l(GetRouteInfoRequestHeader) request.decodeCommandcustomHeader(GetRouteInfoRequestHeader .class) ;
TopicRouteData topicRouteData =
this.namesrvController.getRouteInfoNanager().pickupTopicRouteData(requestHeader.getTopic());
if (topicRouteData != null) {
If (this.namesrvController.getNamesrvConfig().isorderMessag
eEnable()) {
string orderTopicconf =
this.namesrvController.getKvConfigManager() .getKVConfig(NamesrvUtil.NANESPACE_ORDER_TOPIC_CONFT6,
requestHeader.getTopic());
topicRouteData.setorderTopicconf(orderTopicconf);
}
byte[]content = topicRouteData.encode();
response.setBody ( content);
response.setcode(Responsecode.SUCCESS);response.setRemark( null);
return response;
}
以上代码是在进行路由的发现,this.lock.readLock(). lockInterruptibly();
这个流程是同步的,readLock是共享锁。
二、小结
NameServer充当broker管理者,NameServer收到 Broker 心跳包后更新brokerLiveTable 中的信息,特别记录心跳时间 lastUpdateTime;broker会每隔30s向NameServer报告自己的信息;NameServer每隔10s扫描brokerLiveTable,检测表中上次收到心跳包的时间,比较当前时间与上一次时间,如果超过120s,则认为broker不可用,移除路由表中与该broker相关的所有信息。
当Topic路由出现变化后,NameServer不会主动推送给客户端,而是由客户端主动的通过topic查询路由信息。