阿里中间件seata源码剖析三:聊聊seata中的ShutdownHook

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 阿里中间件seata源码剖析三:聊聊seata中的ShutdownHook

前面我用2篇文章分别讲了seata中客户端(TM和RM)和服务端seata-server的初始化过程,如果熟悉seata这个分布式中间件的使用的话,这个过程还是非常清晰的。有2个地方我没有讲解,就是这2个流程初始化过程的ShutdownHook。


TM、RM的shutdown


之前讲过,GlobalTransactionScanner这个类就是客户端的初始化类,初始化的方法在afterPropertiesSet这个方法:

public void afterPropertiesSet() {
    if (disableGlobalTransaction) {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Global transaction is disabled.");
        }
        return;
    }
    initClient();
}
private void initClient() {
    //省略其他代码
    //init TM
    TMClient.init(applicationId, txServiceGroup);
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Transaction Manager Client is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
    }
    //init RM
    RMClient.init(applicationId, txServiceGroup);
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Resource Manager is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
    }
    //省略其他代码
    registerSpringShutdownHook();
}

上面的initClient方法中registerSpringShutdownHook方法就是今天要讲注册ShutdownHook,代码如下:

private void registerSpringShutdownHook() {
    if (applicationContext instanceof ConfigurableApplicationContext) {//这个分支在sega模式下才会讲,到后面讲
        ((ConfigurableApplicationContext) applicationContext).registerShutdownHook();
        ShutdownHook.removeRuntimeShutdownHook();
    }
    ShutdownHook.getInstance().addDisposable(TmNettyRemotingClient.getInstance(applicationId, txServiceGroup));
    ShutdownHook.getInstance().addDisposable(RmNettyRemotingClient.getInstance(applicationId, txServiceGroup));
}

下面的这2个ShutdownHook,分别添加了TM和RM的Client,关闭的时候,会做一些操作,这里的Disposable就是spring中的Disposable,里面只有一个destroy方法,之前的那张UML类图再看一下:

微信图片_20221212151547.png

可以看到TmNettyRemotingClient和RmNettyRemotingClient都继承了AbstractNettyRemotingClient,所以他们的destroy方法都是一个,代码如下:

public void destroy() {
    clientBootstrap.shutdown();
    if (mergeSendExecutorService != null) {//关闭批量发送请求的线程池
        mergeSendExecutorService.shutdown();
    }
    super.destroy();
}

上面的clientBootstrap.shutdown()就是关闭netty的eventLoopGroupWorker,代码很简单:

public void shutdown() {
    try {
        this.eventLoopGroupWorker.shutdownGracefully();
        if (this.defaultEventExecutorGroup != null) {
            this.defaultEventExecutorGroup.shutdownGracefully();
        }
    } catch (Exception exx) {
        LOGGER.error("Failed to shutdown: {}", exx.getMessage());
    }
}

上面的super.destroy()方法,在AbstractNettyRemoting类,是关闭了一个定时任务线程池和一个收发消息的线程池,代码如下:

public void destroy() {
    timerExecutor.shutdown();
    messageExecutor.shutdown();
}

seata server的shutdown


上一篇文章讲到,seata server的初始化类在Server.java的main函数,里面的shutdown代码如下:

public static void main(String[] args) throws IOException {
    //省略部分代码
    NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(WORKING_THREADS);
    //server port
    nettyRemotingServer.setListenPort(parameterParser.getPort());
    //省略部分代码
    DefaultCoordinator coordinator = new DefaultCoordinator(nettyRemotingServer);
    coordinator.init();
    nettyRemotingServer.setHandler(coordinator);
    // register ShutdownHook
    ShutdownHook.getInstance().addDisposable(coordinator);
    ShutdownHook.getInstance().addDisposable(nettyRemotingServer);
    //省略部分代码
}

同样,上面的2个Disposable也是spring中的Disposable实现,首先看一下DefaultCoordinator中的destroy方法,代码如下:

public void destroy() {
    // 1. first shutdown timed task
    retryRollbacking.shutdown();
    retryCommitting.shutdown();
    asyncCommitting.shutdown();
    timeoutCheck.shutdown();
    try {
        retryRollbacking.awaitTermination(TIMED_TASK_SHUTDOWN_MAX_WAIT_MILLS, TimeUnit.MILLISECONDS);
        retryCommitting.awaitTermination(TIMED_TASK_SHUTDOWN_MAX_WAIT_MILLS, TimeUnit.MILLISECONDS);
        asyncCommitting.awaitTermination(TIMED_TASK_SHUTDOWN_MAX_WAIT_MILLS, TimeUnit.MILLISECONDS);
        timeoutCheck.awaitTermination(TIMED_TASK_SHUTDOWN_MAX_WAIT_MILLS, TimeUnit.MILLISECONDS);
    } catch (InterruptedException ignore) {
    }
    // 2. second close netty flow
    if (remotingServer instanceof NettyRemotingServer) {
        ((NettyRemotingServer) remotingServer).destroy();
    }
    // 3. last destroy SessionHolder
    SessionHolder.destroy();
}

首先关闭4个定时任务线程池,而且都是等待5s,然后调用NettyRemotingServer的destroy方法,最后关闭session。这个关闭代码在AbstractNettyRemotingServer,代码如下:

public void destroy() {
    serverBootstrap.shutdown();
    super.destroy();
}

下面代码是serverBootstrap的shutdown,也就是关闭netty的EventLoopGroup

public void shutdown() {
    try {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Shutting server down. ");
        }
        if (initialized.get()) {
            RegistryFactory.getInstance().unregister(new InetSocketAddress(XID.getIpAddress(), XID.getPort()));//file方式这里不做任务动作,其他方式有注册和注销的逻辑
            RegistryFactory.getInstance().close();
            //wait a few seconds for server transport
            TimeUnit.SECONDS.sleep(nettyServerConfig.getServerShutdownWaitTime());//线程先睡眠设置的关闭等待时间,这个时间是file.conf的shutdown.wait
        }
        this.eventLoopGroupBoss.shutdownGracefully();
        this.eventLoopGroupWorker.shutdownGracefully();
    } catch (Exception exx) {
        LOGGER.error(exx.getMessage());
    }
}

而AbstractNettyRemotingServer中destroy方法的super.destroy,跟RM和TM中讲的一样,也在AbstractNettyRemoting类中,代码如下:

public void destroy() {
    timerExecutor.shutdown();
    messageExecutor.shutdown();
}

可以看见,其实DefaultCoordinator的destroy逻辑已经执行了NettyRemotingServer的逻辑了,这样设计或许是为了以后server的扩展吧?


总结


seata中无论RM、TM客户端的关闭和server的关闭,主要就是3件事情,关闭线程池、关闭netty连接和关闭session。如果不是文件方式,也有一些注销的动作。

相关文章
|
14小时前
|
算法 NoSQL Java
2023年阿里高频Java面试题:分布式+中间件+高并发+算法+数据库
又到了一年一度的金九银十,互联网行业竞争是一年比一年严峻,作为工程师的我们唯有不停地学习,不断的提升自己才能保证自己的核心竞争力从而拿到更好的薪水,进入心仪的企业(阿里、字节、美团、腾讯.....)
|
9月前
|
NoSQL Java Redis
阿里Java高级岗中间件二面:GC+IO+JVM+多线程+Redis+数据库+源码
虽然“钱多、事少、离家近”的工作可能离技术人比较远,但是找到一份合适的工作,其实并不像想象中那么难。但是,有些技术人确实是认真努力工作,但在面试时表现出的能力水平却不足以通过面试,或拿到高薪,其实不外乎以下 2 个原因:
|
9月前
|
算法 NoSQL Java
2023年阿里高频Java面试题:分布式+中间件+高并发+算法+数据库
又到了一年一度的金九银十,互联网行业竞争是一年比一年严峻,作为工程师的我们唯有不停地学习,不断的提升自己才能保证自己的核心竞争力从而拿到更好的薪水,进入心仪的企业(阿里、字节、美团、腾讯.....)
|
9月前
|
算法 NoSQL Java
2021年阿里高频Java面试题:分布式+中间件+高并发+算法+数据库
又到了一年一度的金九银十,互联网行业竞争是一年比一年严峻,作为工程师的我们唯有不停地学习,不断的提升自己才能保证自己的核心竞争力从而拿到更好的薪水,进入心仪的企业(阿里、字节、美团、腾讯.....)
|
10月前
|
消息中间件 安全 Java
全网首发!消息中间件神仙笔记,涵盖阿里十年技术精髓
消息中间件是分布式系统中的重要组件,在实际工作中常用消息中间件进行系统间数据交换,从而解决应用解耦、异步消息、流量削峰等问题,实现高性能、高可用、可伸缩和最终一致性架构。
|
10月前
|
消息中间件 数据采集 Java
开发神技!阿里消息中间件进阶手册限时开源,请接住我的下巴
相信大家在实际工作中都用过消息中间件进行系统间数据交换,解决应用解耦、异步消息、流量削峰等问题,由此消息中间件的强大功能想必也不用我多说了!目前业界上关于消息中间件的实现多达好几十种,可谓百花齐放,所用的实现语言同样也五花八门。不管使用哪一个消息中间件,我们的目的都是实现高性能、高可用、可伸缩和最终一致性架构。
|
11月前
|
中间件 数据库 微服务
阿里开源分布式事务seata带你入门
阿里开源分布式事务seata带你入门
948 0
|
12月前
|
缓存 NoSQL 容灾
《Java应用提速(速度与激情)》——六、阿里中间件提速
《Java应用提速(速度与激情)》——六、阿里中间件提速
|
12月前
|
消息中间件 NoSQL Dubbo
阿里Java高级岗中间件二面:GC+IO+JVM+多线程+Redis+数据库+源码
一转眼,都2023年了,你是否在满意的公司?拿着理想的薪水? 虽然“钱多、事少、离家近”的工作可能离技术人比较远,但是找到一份合适的工作,其实并不像想象中那么难。但是,有些技术人确实是认真努力工作,但在面试时表现出的能力水平却不足以通过面试,或拿到高薪,其实不外乎以下 2 个原因: 第一,“知其然不知其所以然”。做了多年技术,开发了很多业务应用,但似乎并未思考过种种技术选择背后的逻辑。所以,他无法向面试官展现出自己未来技术能力的成长潜力。面试官也不会放心把具有一定深度的任务交给他。 第二,知识碎片化,不成系统。在面试中,面试者似乎无法完整、清晰地描述自己所开发的系统,或者使用的相关技术。
2023年阿里高频Java面试题:分布式+中间件+高并发+算法+数据库
又到了一年一度的金九银十,互联网行业竞争是一年比一年严峻,作为工程师的我们唯有不停地学习,不断的提升自己才能保证自己的核心竞争力从而拿到更好的薪水,进入心仪的企业(阿里、字节、美团、腾讯.....)