阿里中间件seata源码剖析三:聊聊seata中的ShutdownHook

本文涉及的产品
性能测试 PTS,5000VUM额度
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
云原生网关 MSE Higress,422元/月
简介: 阿里中间件seata源码剖析三:聊聊seata中的ShutdownHook

前面我用2篇文章分别讲了seata中客户端(TM和RM)和服务端seata-server的初始化过程,如果熟悉seata这个分布式中间件的使用的话,这个过程还是非常清晰的。有2个地方我没有讲解,就是这2个流程初始化过程的ShutdownHook。


TM、RM的shutdown


之前讲过,GlobalTransactionScanner这个类就是客户端的初始化类,初始化的方法在afterPropertiesSet这个方法:

public void afterPropertiesSet() {
    if (disableGlobalTransaction) {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Global transaction is disabled.");
        }
        return;
    }
    initClient();
}
private void initClient() {
    //省略其他代码
    //init TM
    TMClient.init(applicationId, txServiceGroup);
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Transaction Manager Client is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
    }
    //init RM
    RMClient.init(applicationId, txServiceGroup);
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Resource Manager is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
    }
    //省略其他代码
    registerSpringShutdownHook();
}

上面的initClient方法中registerSpringShutdownHook方法就是今天要讲注册ShutdownHook,代码如下:

private void registerSpringShutdownHook() {
    if (applicationContext instanceof ConfigurableApplicationContext) {//这个分支在sega模式下才会讲,到后面讲
        ((ConfigurableApplicationContext) applicationContext).registerShutdownHook();
        ShutdownHook.removeRuntimeShutdownHook();
    }
    ShutdownHook.getInstance().addDisposable(TmNettyRemotingClient.getInstance(applicationId, txServiceGroup));
    ShutdownHook.getInstance().addDisposable(RmNettyRemotingClient.getInstance(applicationId, txServiceGroup));
}

下面的这2个ShutdownHook,分别添加了TM和RM的Client,关闭的时候,会做一些操作,这里的Disposable就是spring中的Disposable,里面只有一个destroy方法,之前的那张UML类图再看一下:

微信图片_20221212151547.png

可以看到TmNettyRemotingClient和RmNettyRemotingClient都继承了AbstractNettyRemotingClient,所以他们的destroy方法都是一个,代码如下:

public void destroy() {
    clientBootstrap.shutdown();
    if (mergeSendExecutorService != null) {//关闭批量发送请求的线程池
        mergeSendExecutorService.shutdown();
    }
    super.destroy();
}

上面的clientBootstrap.shutdown()就是关闭netty的eventLoopGroupWorker,代码很简单:

public void shutdown() {
    try {
        this.eventLoopGroupWorker.shutdownGracefully();
        if (this.defaultEventExecutorGroup != null) {
            this.defaultEventExecutorGroup.shutdownGracefully();
        }
    } catch (Exception exx) {
        LOGGER.error("Failed to shutdown: {}", exx.getMessage());
    }
}

上面的super.destroy()方法,在AbstractNettyRemoting类,是关闭了一个定时任务线程池和一个收发消息的线程池,代码如下:

public void destroy() {
    timerExecutor.shutdown();
    messageExecutor.shutdown();
}

seata server的shutdown


上一篇文章讲到,seata server的初始化类在Server.java的main函数,里面的shutdown代码如下:

public static void main(String[] args) throws IOException {
    //省略部分代码
    NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(WORKING_THREADS);
    //server port
    nettyRemotingServer.setListenPort(parameterParser.getPort());
    //省略部分代码
    DefaultCoordinator coordinator = new DefaultCoordinator(nettyRemotingServer);
    coordinator.init();
    nettyRemotingServer.setHandler(coordinator);
    // register ShutdownHook
    ShutdownHook.getInstance().addDisposable(coordinator);
    ShutdownHook.getInstance().addDisposable(nettyRemotingServer);
    //省略部分代码
}

同样,上面的2个Disposable也是spring中的Disposable实现,首先看一下DefaultCoordinator中的destroy方法,代码如下:

public void destroy() {
    // 1. first shutdown timed task
    retryRollbacking.shutdown();
    retryCommitting.shutdown();
    asyncCommitting.shutdown();
    timeoutCheck.shutdown();
    try {
        retryRollbacking.awaitTermination(TIMED_TASK_SHUTDOWN_MAX_WAIT_MILLS, TimeUnit.MILLISECONDS);
        retryCommitting.awaitTermination(TIMED_TASK_SHUTDOWN_MAX_WAIT_MILLS, TimeUnit.MILLISECONDS);
        asyncCommitting.awaitTermination(TIMED_TASK_SHUTDOWN_MAX_WAIT_MILLS, TimeUnit.MILLISECONDS);
        timeoutCheck.awaitTermination(TIMED_TASK_SHUTDOWN_MAX_WAIT_MILLS, TimeUnit.MILLISECONDS);
    } catch (InterruptedException ignore) {
    }
    // 2. second close netty flow
    if (remotingServer instanceof NettyRemotingServer) {
        ((NettyRemotingServer) remotingServer).destroy();
    }
    // 3. last destroy SessionHolder
    SessionHolder.destroy();
}

首先关闭4个定时任务线程池,而且都是等待5s,然后调用NettyRemotingServer的destroy方法,最后关闭session。这个关闭代码在AbstractNettyRemotingServer,代码如下:

public void destroy() {
    serverBootstrap.shutdown();
    super.destroy();
}

下面代码是serverBootstrap的shutdown,也就是关闭netty的EventLoopGroup

public void shutdown() {
    try {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Shutting server down. ");
        }
        if (initialized.get()) {
            RegistryFactory.getInstance().unregister(new InetSocketAddress(XID.getIpAddress(), XID.getPort()));//file方式这里不做任务动作,其他方式有注册和注销的逻辑
            RegistryFactory.getInstance().close();
            //wait a few seconds for server transport
            TimeUnit.SECONDS.sleep(nettyServerConfig.getServerShutdownWaitTime());//线程先睡眠设置的关闭等待时间,这个时间是file.conf的shutdown.wait
        }
        this.eventLoopGroupBoss.shutdownGracefully();
        this.eventLoopGroupWorker.shutdownGracefully();
    } catch (Exception exx) {
        LOGGER.error(exx.getMessage());
    }
}

而AbstractNettyRemotingServer中destroy方法的super.destroy,跟RM和TM中讲的一样,也在AbstractNettyRemoting类中,代码如下:

public void destroy() {
    timerExecutor.shutdown();
    messageExecutor.shutdown();
}

可以看见,其实DefaultCoordinator的destroy逻辑已经执行了NettyRemotingServer的逻辑了,这样设计或许是为了以后server的扩展吧?


总结


seata中无论RM、TM客户端的关闭和server的关闭,主要就是3件事情,关闭线程池、关闭netty连接和关闭session。如果不是文件方式,也有一些注销的动作。

相关文章
|
5月前
|
消息中间件 存储 NoSQL
阿里开源中间件一览
阿里开源中间件一览
344 2
|
6月前
|
算法 NoSQL Java
2023年阿里高频Java面试题:分布式+中间件+高并发+算法+数据库
又到了一年一度的金九银十,互联网行业竞争是一年比一年严峻,作为工程师的我们唯有不停地学习,不断的提升自己才能保证自己的核心竞争力从而拿到更好的薪水,进入心仪的企业(阿里、字节、美团、腾讯.....)
|
消息中间件 中间件 Kafka
限时开源!阿里内部消息中间件合集:MQ+Kafka+体系图+笔记
近好多小伙伴说在准备金三银四的面试突击了,但是遇到消息中间件不知道该怎么学了,问我有没有成体系的消息中间件的学习方式。 额,有点不知所措,于是乎小编就想着做一次消息中间件的专题,归类整理了一些纯手绘知识体系图、面试以及相关的学习笔记。
235 1
阿里Java高级岗中间件二面:GC+IO+JVM+多线程+Redis+数据库+源码
虽然“钱多、事少、离家近”的工作可能离技术人比较远,但是找到一份合适的工作,其实并不像想象中那么难。但是,有些技术人确实是认真努力工作,但在面试时表现出的能力水平却不足以通过面试,或拿到高薪,其实不外乎以下 2 个原因:
2023年阿里高频Java面试题:分布式+中间件+高并发+算法+数据库
又到了一年一度的金九银十,互联网行业竞争是一年比一年严峻,作为工程师的我们唯有不停地学习,不断的提升自己才能保证自己的核心竞争力从而拿到更好的薪水,进入心仪的企业(阿里、字节、美团、腾讯.....)
|
算法 NoSQL Java
2021年阿里高频Java面试题:分布式+中间件+高并发+算法+数据库
又到了一年一度的金九银十,互联网行业竞争是一年比一年严峻,作为工程师的我们唯有不停地学习,不断的提升自己才能保证自己的核心竞争力从而拿到更好的薪水,进入心仪的企业(阿里、字节、美团、腾讯.....)
|
消息中间件 安全 Java
全网首发!消息中间件神仙笔记,涵盖阿里十年技术精髓
消息中间件是分布式系统中的重要组件,在实际工作中常用消息中间件进行系统间数据交换,从而解决应用解耦、异步消息、流量削峰等问题,实现高性能、高可用、可伸缩和最终一致性架构。
|
6月前
|
消息中间件 存储 负载均衡
消息中间件的选择:RabbitMQ是一个明智的选择
消息中间件的选择:RabbitMQ是一个明智的选择
110 0
|
5月前
|
消息中间件 存储 中间件
【消息中间件】详解三大MQ:RabbitMQ、RocketMQ、Kafka
【消息中间件】详解三大MQ:RabbitMQ、RocketMQ、Kafka
1397 0
|
4月前
|
消息中间件 编解码 Docker
Docker部署RabbitMQ消息中间件
【7月更文挑战第4天】Docker部署RabbitMQ消息中间件
275 3