路由注册之发送心跳包|学习笔记

简介: 快速学习路由注册之发送心跳包

开发者学堂课程【RocketMQ 知识精讲与项目实战(第三阶段)路由注册之发送心跳包】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/704/detail/12465


路由注册之发送心跳包

 

路由注册

思考:路由注册由谁注册?

应该由broker完成路由注册。

打开broker启动的代码,在broker启动之后就会开始向NameServer注册路由信息。

image.png

image.png如上图,首先启动入口有个main方法,在main方法里首先是创建BrokerController,创建BrokerController的方式和创建namesrvcontroller相似,先创建brokerconfig,然后再创建nettyserverconfig和nettyclientconfig。对于生产者来说,broker需要处理生产者的请求,nettyserverconfig是作为服务端存在的;而对于NameServer,broker要上报数据给NameServer进行心跳检测,所以就要分别创建brokerconfig、nettyserverconfig及nettyclientconfig

final Brokerconfig brokerconfig = new Brokerconfig();

final NettyServerconfig nettyServerconfig = new Nettyserverconfig();

final NettyclienEconfig nettyclientconfig = new Nettyclientconfig();nettyclientconfig.setuseTLS(Boolean.parseBoolean(System.getproperty(TLs_ENABLE,

string.valueof(Tlssystemconfig.tlsMode == TlsMode.ENFORCING))));

nettyServerconfig.setListenPort(10911);

//nettyServerconfig.setListenPort(10911)是配置nettyServerconfig的端口号,生产者是通过10911这个端口提交信息到broker。

final Messagestoreconfig messagestoreConfig = new MessagestoreConfig();

//填充配置类

if (BrokerRole.SLAVE == messagestoreconfig.getBrokerRole()) {

int ratio = messagestoreconfig.getAccessMessageInMemory

MaxRatio() - 10;

messagestoreconfig.setAccessMessageInMemoryMaxRatio(ratio);

}

if (commndLine.hasoption( 'c')) {

//-c解析配置文件填充配置类

strig file = commandLine.getoptionvalue( 'c' );

if (file != nul1) {

configFile = file;

Inputstream in = new BufferedInputstream(new FileInputstre

am(file));

properties = new Properties();

properties. load(in);

properties2systemEnv( properties);

MixAll.properties20bject(properties, brokerConfig);

MixAll.properties20bject(properties,nettyserverConfig);

MixAll.properties2object(properties,nettyclientconfig);

MixAll.properties20bject(properties,messagestoreconfig);

BrokerPathconfigHelper.setBrokerconfigpath(file);

in.close();

}

}

以下代码是从brokerconfig中拿到NameServerAddr,因为broker要上报心跳信息到NameServer,所以在解析配置文件完成后,brokerConfig就可以得到NameServerAddr。通过这个代码看到,如果NameServer是搭建一个集群,需要以分号拼接。

string namesrvAddr = brokerconfig.getNamesrvldr();

if ( null != namesrvAddr) {

try {

string[] addrArray = namesrvAddr.split( regex: "; ");

for ( string addr : addrArray) {

Remotingutil.string2socketAddress( addr);

}

}catch (Exception e) {

system.out.printf(

"The Name Server Address[%s] illegal,please set it as follows,\"127.0.0.1:9876;192

namesrvAddr);

System.exit( status: -3);

}

}

//BrokerController创建完成

final Brokercontroller controller = new BrokerController(

brokerconfig,

nettyServerconfig,

nettyclientconfig,

messagestoreconfig);

// remember all configs to prevent discard

controller.getconfiguration( ).registerconfig(properties);

//对BrokerController进行初始化

boolean initResult = controller.initialize();

if ( !initResult) {

controller.shutdown( );

system.exit( status: -3);

}

在完成初始化里,完成的是对BrokerController属性的赋值,包括进行messageStore等等。初始化完成之后,controller就可以返回,返回到start方法,最终通过start方法进行启动。

Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {

private volatile boolean hasshutdown = false;

private AtomicInteger shutdownTimes = new AtomicInteger( initialValue: 0);

@override

public void run() {

synchronized (this) {

Log.info("shutdown hook was invoked,{}" , this.shutdownTimes.incrementAndGet());

if ( ! this.hasshutdown) {

this.hasshutdown = true;

long beginTime = system.currentTimeMillis();

controller.shutdown();

long consumingTimeTotal = system.currentTimeMillis() - beginTime;

log.info("shutdown hook over,consuming total time(ms): { }" , consumingTimeTotal)

}

}

}

},name: "shutdownHook" ) );

这段代码是JVM中注册钩子函数,保证JVM在退出时,进行资源释放。

之后是进行启动,启动是调用Controller里面的start方法,启动之后要开始进行注册路由信息,注册完成之后创建线程池

this.registerBrokerAll( checkOrderConfig: true,oneway:. false,forceRegister: true);

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@override

public void run( ) {

try {

BrokerController.this.registerBrokerAll( checkOrderConfig: true,oneway:. false,brokercoConfig.isForceRegister()

}catch (Throwable e) {

log.error( "registerBrokerAll Exception", e);

}

}

},initialDelay: 1000 * 10,Math.max(10000,Math.min(brokerCo

nfig.getRegisterNameServerPeriod(),6000)),TimeUnit.

getRegisterNameServerPeriod()默认是30s,因为以上代码的存在,broker在启动之后每隔30s要上传细条信息给NameServer。

在registerBrokerAll注册代码中,有一个重要方法doRegisterBrokerAll:

doRegisterBrokerAll(checkorderconfig,oneway,topicconfigwrapper);

通过这个方法,调用了brokerOuterAPI,通过registerBrokerAll方法向外发送请求:

private void doRegisterBrokerAl1(boolean checkorderconfig, boolean oneway,

Topicconfigserializewrapper topicconfigwrapper) {

List<RegisterBrokerResult> registerBrokerResultList = this.bpokerouterAPI.registerBrokerAll(

this. brokerconfig.getBrokerclusterName(),

this.getBrokerAddr(),

this.brokerconfig.getBrokerName(),

this. brokerconfig-getBrokerId(),

this.getHAServerAddr(),

topicconfigwrapper,

this.filterServerManager.buildNewFilterServerList(),

oneway ,

this.brokerconfig.getRegisterBrokerTimeoutMills(),

this.brokerconfig.iscompressedRegister());

//封装请求头

final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();

requestHeader.setBrokerAddr(brokerAddr);

requestHeader.setBrokerid(brokerId);

requestHeader. setBrokerName (brokerName);

requestHeader.setc1usterName(c1usterName);

requestHeader.setHaserverAddr(haserverAddr);

requestHeader. setcompressed(compressed);

//封装请求体

RegisterBrokerBody requestBody = new RegisterBrokerBody();requestBody.setTopicconfigserializewrapper(topicconfigwrapper);

requestBody . setFi1terserverList(fi1teserverList);

final byte[] body = requestBody.encode(compressed);

final int bodycrc32 = uti1A17.crc32(body);

requestHeader.setBodycrc32(bodycrc32);

final countDownLatch countDownLatch = new countDownLatch(nameServerAddressList.size());

for (fina1 string namesrvAddr : nameserverAddressList) {

brokerouterExecutor.execute(new Runnab1e( ){

在registerBrokerAll中创建了请求header、封装请求体,

请求体创建之后,在 NameServerAddressList处遍历NameServer的地址,分别给每一个NameServer注册当前broker信息。

for (final string namesrvAddr : nameServerAddressList) {

brokerouterExecutor.execute( new Runnable( ) i

@override

public void run() {

try {

RegisterBrokerResult result = registprBroker(namesr

vAddr , oneway, timeoutMills,requestHeader, ody) ;

if ( result != null) {

registerBrokerResultList.add( result);

}

registerBroker通过remotingClient向NameServer发送请求,然后上报心跳数据,如果是oneway方式表示不接受返回值;如果是其他方式,以同步方式上报数据,会获得响应,之后会解析注册结果。

RemotingCommand response = this.remotingclient.invokeSync(namesrvAddr, request,timeoutNills);

assert response != null;

switch (response.getcode()) {

case Responsecode.success: {

RegisterBrokerResponseHeader responseHeader =(RegisterBrokerResponseHeader)response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class);

RegisterBrokerResult result = new RegisterBrokerResult();

result.setMasterAddr(responseHeader.getMasterAddr());

result.setHaServerAddr(responseHeader.getHaServerAddr());if (response.getBody() != nul1) {

result.setKvTable(KVTable.decode(response.getBody(),KVTable.class));

}

return result;

}

default:

break;

}

小结:路由注册的入口在BrokerStartup里,在BrokerStartup里首先是创建BrokerController,BrokerController的创建需要brokerconfig、nettyserverconfig和nettyclientconfig配置类,并且设置端口号10911,然后获取namesrvAddr。Controller创建之后就要进行对其初始化,完成controller内部的赋值。然后再注册钩子函数,释放资源。整个controller创建完成之后进行启动,在启动时会调用controller的startup,在controller启动的位置会完成路由信息注册,broker会每隔30s上报心跳信息到NameServer。

相关文章
|
4月前
MQTT的心跳保活机制是通过`setKeepAliveInterval()`方法设置的
MQTT的心跳保活机制是通过`setKeepAliveInterval()`方法设置的
237 2
|
4月前
MQTT的心跳保活机制是通过`setKeepAliveInterval()`方法设置的,
MQTT的心跳保活机制是通过`setKeepAliveInterval()`方法设置的,
145 1
|
11月前
|
JSON 开发工具 Android开发
通知消息和透传消息
通知消息和透传消息
626 0
通知消息和透传消息
|
存储 IDE 开发工具
CAN通信配置过滤器和使用三个邮箱发送
CAN通信配置过滤器和使用三个邮箱发送
702 0
|
消息中间件 Java 数据库
消息的和发送和接收|学习笔记
快速学习消息的和发送和接收
107 0
|
消息中间件 RocketMQ 开发者
发送同步消息|学习笔记
快速学习发送同步消息
75 0
发送同步消息|学习笔记
|
消息中间件 RocketMQ 开发者
发送异步消息|学习笔记
快速学习发送异步消息
57 0
|
移动开发 网络协议 测试技术
服务器接收客户端消息|学习笔记
快速学习服务器接收客户端消息
133 0
|
移动开发 网络协议 测试技术
服务器循环接收客户端消息|学习笔记
快速学习服务器循环接收客户端消息
106 0
服务器循环接收客户端消息|学习笔记
|
JSON 网络协议 测试技术
服务端转发消息代码实现|学习笔记
快速学习服务端转发消息代码实现
90 0

热门文章

最新文章