前面我用2篇文章分别讲了seata中客户端(TM和RM)和服务端seata-server的初始化过程,如果熟悉seata这个分布式中间件的使用的话,这个过程还是非常清晰的。有2个地方我没有讲解,就是这2个流程初始化过程的ShutdownHook。
TM、RM的shutdown
之前讲过,GlobalTransactionScanner这个类就是客户端的初始化类,初始化的方法在afterPropertiesSet这个方法:
public void afterPropertiesSet() { if (disableGlobalTransaction) { if (LOGGER.isInfoEnabled()) { LOGGER.info("Global transaction is disabled."); } return; } initClient(); } private void initClient() { //省略其他代码 //init TM TMClient.init(applicationId, txServiceGroup); if (LOGGER.isInfoEnabled()) { LOGGER.info("Transaction Manager Client is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup); } //init RM RMClient.init(applicationId, txServiceGroup); if (LOGGER.isInfoEnabled()) { LOGGER.info("Resource Manager is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup); } //省略其他代码 registerSpringShutdownHook(); }
上面的initClient方法中registerSpringShutdownHook方法就是今天要讲注册ShutdownHook,代码如下:
private void registerSpringShutdownHook() { if (applicationContext instanceof ConfigurableApplicationContext) {//这个分支在sega模式下才会讲,到后面讲 ((ConfigurableApplicationContext) applicationContext).registerShutdownHook(); ShutdownHook.removeRuntimeShutdownHook(); } ShutdownHook.getInstance().addDisposable(TmNettyRemotingClient.getInstance(applicationId, txServiceGroup)); ShutdownHook.getInstance().addDisposable(RmNettyRemotingClient.getInstance(applicationId, txServiceGroup)); }
下面的这2个ShutdownHook,分别添加了TM和RM的Client,关闭的时候,会做一些操作,这里的Disposable就是spring中的Disposable,里面只有一个destroy方法,之前的那张UML类图再看一下:
可以看到TmNettyRemotingClient和RmNettyRemotingClient都继承了AbstractNettyRemotingClient,所以他们的destroy方法都是一个,代码如下:
public void destroy() { clientBootstrap.shutdown(); if (mergeSendExecutorService != null) {//关闭批量发送请求的线程池 mergeSendExecutorService.shutdown(); } super.destroy(); }
上面的clientBootstrap.shutdown()就是关闭netty的eventLoopGroupWorker,代码很简单:
public void shutdown() { try { this.eventLoopGroupWorker.shutdownGracefully(); if (this.defaultEventExecutorGroup != null) { this.defaultEventExecutorGroup.shutdownGracefully(); } } catch (Exception exx) { LOGGER.error("Failed to shutdown: {}", exx.getMessage()); } }
上面的super.destroy()方法,在AbstractNettyRemoting类,是关闭了一个定时任务线程池和一个收发消息的线程池,代码如下:
public void destroy() { timerExecutor.shutdown(); messageExecutor.shutdown(); }
seata server的shutdown
上一篇文章讲到,seata server的初始化类在Server.java的main函数,里面的shutdown代码如下:
public static void main(String[] args) throws IOException { //省略部分代码 NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(WORKING_THREADS); //server port nettyRemotingServer.setListenPort(parameterParser.getPort()); //省略部分代码 DefaultCoordinator coordinator = new DefaultCoordinator(nettyRemotingServer); coordinator.init(); nettyRemotingServer.setHandler(coordinator); // register ShutdownHook ShutdownHook.getInstance().addDisposable(coordinator); ShutdownHook.getInstance().addDisposable(nettyRemotingServer); //省略部分代码 }
同样,上面的2个Disposable也是spring中的Disposable实现,首先看一下DefaultCoordinator中的destroy方法,代码如下:
public void destroy() { // 1. first shutdown timed task retryRollbacking.shutdown(); retryCommitting.shutdown(); asyncCommitting.shutdown(); timeoutCheck.shutdown(); try { retryRollbacking.awaitTermination(TIMED_TASK_SHUTDOWN_MAX_WAIT_MILLS, TimeUnit.MILLISECONDS); retryCommitting.awaitTermination(TIMED_TASK_SHUTDOWN_MAX_WAIT_MILLS, TimeUnit.MILLISECONDS); asyncCommitting.awaitTermination(TIMED_TASK_SHUTDOWN_MAX_WAIT_MILLS, TimeUnit.MILLISECONDS); timeoutCheck.awaitTermination(TIMED_TASK_SHUTDOWN_MAX_WAIT_MILLS, TimeUnit.MILLISECONDS); } catch (InterruptedException ignore) { } // 2. second close netty flow if (remotingServer instanceof NettyRemotingServer) { ((NettyRemotingServer) remotingServer).destroy(); } // 3. last destroy SessionHolder SessionHolder.destroy(); }
首先关闭4个定时任务线程池,而且都是等待5s,然后调用NettyRemotingServer的destroy方法,最后关闭session。这个关闭代码在AbstractNettyRemotingServer,代码如下:
public void destroy() { serverBootstrap.shutdown(); super.destroy(); }
下面代码是serverBootstrap的shutdown,也就是关闭netty的EventLoopGroup
public void shutdown() { try { if (LOGGER.isDebugEnabled()) { LOGGER.debug("Shutting server down. "); } if (initialized.get()) { RegistryFactory.getInstance().unregister(new InetSocketAddress(XID.getIpAddress(), XID.getPort()));//file方式这里不做任务动作,其他方式有注册和注销的逻辑 RegistryFactory.getInstance().close(); //wait a few seconds for server transport TimeUnit.SECONDS.sleep(nettyServerConfig.getServerShutdownWaitTime());//线程先睡眠设置的关闭等待时间,这个时间是file.conf的shutdown.wait } this.eventLoopGroupBoss.shutdownGracefully(); this.eventLoopGroupWorker.shutdownGracefully(); } catch (Exception exx) { LOGGER.error(exx.getMessage()); } }
而AbstractNettyRemotingServer中destroy方法的super.destroy,跟RM和TM中讲的一样,也在AbstractNettyRemoting类中,代码如下:
public void destroy() { timerExecutor.shutdown(); messageExecutor.shutdown(); }
可以看见,其实DefaultCoordinator的destroy逻辑已经执行了NettyRemotingServer的逻辑了,这样设计或许是为了以后server的扩展吧?
总结
seata中无论RM、TM客户端的关闭和server的关闭,主要就是3件事情,关闭线程池、关闭netty连接和关闭session。如果不是文件方式,也有一些注销的动作。