阿里中间件seata源码剖析二:聊聊TC的初始化

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 阿里中间件seata源码剖析二:聊聊TC的初始化

微信图片_20221212150957.jpg上一节seata源码讲解中,我们分析了RM和TM的初始化,而RM和TM的初始化,其实就是向TC建立连接,之后通过跟TC的连接来管理事务。本文我们来聊一聊TC作为server端的初始化过程。


这一次,我们还是把之前讲的springcloud整合seata-tcc模式的架构图贴出来,不熟悉的可以查看这篇文章《springcloud+eureka整合seata-tcc模式微信图片_20221212151022.png

订单服务(order-server)既是一个TM,也是一个RM,订单服务启动后,我们查看一下seata-server(TC)的日志:

2020-09-02 05:01:25.809  INFO --- [Thread_1_21_500] i.s.c.r.processor.server.RegRmProcessor  : RM register success,message:RegisterRMRequest{resourceIds='storageApi', applicationId='order-server', transactionServiceGroup='my_test_tx_group'},channel:[id: 0xec46515e, L:/192.168.59.132:8091 - R:/192.168.59.1:50240],client version:1.3.0
2020-09-02 05:01:25.844  INFO --- [Thread_1_22_500] i.s.c.r.processor.server.RegRmProcessor  : RM register success,message:RegisterRMRequest{resourceIds='orderApi', applicationId='order-server', transactionServiceGroup='my_test_tx_group'},channel:[id: 0xec46515e, L:/192.168.59.132:8091 - R:/192.168.59.1:50240],client version:1.3.0
2020-09-02 05:01:25.871  INFO --- [Thread_1_23_500] i.s.c.r.processor.server.RegRmProcessor  : RM register success,message:RegisterRMRequest{resourceIds='accountApi', applicationId='order-server', transactionServiceGroup='my_test_tx_group'},channel:[id: 0xec46515e, L:/192.168.59.132:8091 - R:/192.168.59.1:50240],client version:1.3.0
2020-09-02 05:01:58.791  INFO --- [trap-executor-0] c.n.d.s.r.aws.ConfigClusterResolver      : Resolving eureka endpoints via configuration
2020-09-02 05:02:16.400  INFO --- [NIOWorker_1_2_4] i.s.c.r.processor.server.RegTmProcessor  : TM register success,message:RegisterTMRequest{applicationId='order-server', transactionServiceGroup='my_test_tx_group'},channel:[id: 0x6d81f15a, L:/192.168.59.132:8091 - R:/192.168.59.1:50296],client version:1.3.0

从日志中我们可以看到,order-server服务向TC发送了3条RM类型注册消息,发送了1条TM注册消息。接着我们再启动账户服务(account-server),它是一个RM,启动后seata-server日志如下:

2020-09-02 05:17:47.876  INFO --- [NIOWorker_1_3_4] i.s.c.r.processor.server.RegTmProcessor  : TM register success,message:RegisterTMRequest{applicationId='account-server', transactionServiceGroup='my_test_tx_group'},channel:[id: 0x391680db, L:/192.168.59.132:8091 - R:/192.168.59.1:51395],client version:1.3.0
2020-09-02 05:17:47.878  INFO --- [Thread_1_24_500] i.s.c.r.processor.server.RegRmProcessor  : RM register success,message:RegisterRMRequest{resourceIds='null', applicationId='account-server', transactionServiceGroup='my_test_tx_group'},channel:[id: 0xd828cf61, L:/192.168.59.132:8091 - R:/192.168.59.1:51396],client version:1.3.0

从日志中看出,account-server虽然只是一个RM,但是也发送了一条TM注册消息,之后发了一条RM注册消息。这个RM消息中resourceIds为什么是null呢,上一篇已经讲过了,这个resourceIds来自注解@TwoPhaseBusinessAction中的name,account-server中并没有这个注解。


上面的日志给我们提供了一个阅读seata-server源代码的线索,跟着日志一步一步查找,就找到了seata-server的启动类Server.java。这儿我们再来看一下seata-server中关键的UML类图:

微信图片_20221212151127.png

Server类是一个启动类,里面启动了netty server监听,还记得我之前的启动命令吗?这儿采用文件方式,启动命令再贴一次:


./seata-server.sh -p 8091 -h 127.0.0.1 -m file

这儿给出启动命令,主要是Server类的main函数会从启动命令参数里面获取值,我们看一下它的main函数,代码如下:

public static void main(String[] args) throws IOException {
    // get port first, use to logback.xml
    int port = PortHelper.getPort(args);//从上面启动命令中获取端口号
    System.setProperty(ConfigurationKeys.SERVER_PORT, Integer.toString(port));
    // create logger
    final Logger logger = LoggerFactory.getLogger(Server.class);
    if (ContainerHelper.isRunningInContainer()) {//判断是否在容器中运行
        logger.info("The server is running in container.");
    }
    //initialize the parameter parser
    //Note that the parameter parser should always be the first line to execute.
    //Because, here we need to parse the parameters needed for startup.
    ParameterParser parameterParser = new ParameterParser(args);
    //initialize the metrics
    MetricsManager.get().init();//是一个监控,可以使用prometheus,之前的实验中没有配置,暂时不关注
    //在file.conf文件中配置,之前的实验中用的是file模式
    System.setProperty(ConfigurationKeys.STORE_MODE, parameterParser.getStoreMode());
  //下面的代码就是关键了,netty server的初始化
    NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(WORKING_THREADS);
    //server port
    nettyRemotingServer.setListenPort(parameterParser.getPort());
    UUIDGenerator.init(parameterParser.getServerNode());
    //log store mode : file, db, redis
    SessionHolder.init(parameterParser.getStoreMode());
  //为netty server配置事务管理器
    DefaultCoordinator coordinator = new DefaultCoordinator(nettyRemotingServer);
    coordinator.init();
    nettyRemotingServer.setHandler(coordinator);
    // register ShutdownHook
    ShutdownHook.getInstance().addDisposable(coordinator);
    ShutdownHook.getInstance().addDisposable(nettyRemotingServer);
    //127.0.0.1 and 0.0.0.0 are not valid here. //还记得之前实验打印出来的XID的格式吗?以ip:port开头:xid=192.168.59.132:8091:41466478607220736
    if (NetUtil.isValidIp(parameterParser.getHost(), false)) {
        XID.setIpAddress(parameterParser.getHost());
    } else {
        XID.setIpAddress(NetUtil.getLocalIp());//启动命令里面的127.0.0.1不用,这里获取本地ip:192.168.59.132
    }
    XID.setPort(nettyRemotingServer.getListenPort());//8091
    try {
        nettyRemotingServer.init();
    } catch (Throwable e) {
        logger.error("nettyServer init error:{}", e.getMessage(), e);
        System.exit(-1);
    }
    System.exit(0);
}

上面main函数最核心的功能就是为nettyRemotingServer的初始化做准备,包括配置监控、事务协调器、关闭的hook,XID等,接着就初始化server了。基于netty,那肯定是创建监听,还有一些处理器来处理事务,是这样吗?让我们来看下:

public void init() {
    // registry processor
    registerProcessor();
    if (initialized.compareAndSet(false, true)) {
        super.init();
    }
}

下面这段代码是不是似曾相识呢?还记得上篇文章讲的RM和TM客户端初始化吗?这里注册的消息类型更丰富,包括了全局事务的消息类型。

private void registerProcessor() {
    // 1. registry on request message processor
    ServerOnRequestProcessor onRequestProcessor =
        new ServerOnRequestProcessor(this, getHandler());
    super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_SEATA_MERGE, onRequestProcessor, messageExecutor);
    // 2. registry on response message processor
    ServerOnResponseProcessor onResponseProcessor =
        new ServerOnResponseProcessor(getHandler(), getFutures());
    super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT_RESULT, onResponseProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK_RESULT, onResponseProcessor, messageExecutor);
    // 3. registry rm message processor
    RegRmProcessor regRmProcessor = new RegRmProcessor(this);
    super.registerProcessor(MessageType.TYPE_REG_RM, regRmProcessor, messageExecutor);
    // 4. registry tm message processor
    RegTmProcessor regTmProcessor = new RegTmProcessor(this);
    super.registerProcessor(MessageType.TYPE_REG_CLT, regTmProcessor, null);
    // 5. registry heartbeat message processor
    ServerHeartbeatProcessor heartbeatMessageProcessor = new ServerHeartbeatProcessor(this);
    super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, heartbeatMessageProcessor, null);
}

看完了这个方法,我们接着看init方法,根据之前画的UML类图,这个init方法在AbstractNettyRemotingServer类,代码如下:

public void init() {
    super.init();
    serverBootstrap.start();
}

继续往上追溯,super.init方法在AbstractNettyRemoting类,代码如下:

public void init() {
    timerExecutor.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            for (Map.Entry<Integer, MessageFuture> entry : futures.entrySet()) {
                if (entry.getValue().isTimeout()) {
                    futures.remove(entry.getKey());
                    entry.getValue().setResultMessage(null);
                    if (LOGGER.isDebugEnabled()) {
                        LOGGER.debug("timeout clear future: {}", entry.getValue().getRequestMessage().getBody());
                    }
                }
            }
            nowMills = System.currentTimeMillis();
        }
    }, TIMEOUT_CHECK_INTERNAL, TIMEOUT_CHECK_INTERNAL, TimeUnit.MILLISECONDS);
}

这段代码就是定时地移除超时的消息。我们再看serverBootstrap.start()方法:

public void start() {
    this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupWorker)
        .channel(NettyServerConfig.SERVER_CHANNEL_CLAZZ)
        .option(ChannelOption.SO_BACKLOG, nettyServerConfig.getSoBackLogSize())
        .option(ChannelOption.SO_REUSEADDR, true)
        .childOption(ChannelOption.SO_KEEPALIVE, true)
        .childOption(ChannelOption.TCP_NODELAY, true)
        .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSendBufSize())
        .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketResvBufSize())
        .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
            new WriteBufferWaterMark(nettyServerConfig.getWriteBufferLowWaterMark(),
                nettyServerConfig.getWriteBufferHighWaterMark()))
        .localAddress(new InetSocketAddress(listenPort))
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) {
                ch.pipeline().addLast(new IdleStateHandler(nettyServerConfig.getChannelMaxReadIdleSeconds(), 0, 0))
                    .addLast(new ProtocolV1Decoder())
                    .addLast(new ProtocolV1Encoder());
                if (channelHandlers != null) {
                    addChannelPipelineLast(ch, channelHandlers);
                }
            }
        });
    try {
        ChannelFuture future = this.serverBootstrap.bind(listenPort).sync();
        LOGGER.info("Server started, listen port: {}", listenPort);
        RegistryFactory.getInstance().register(new InetSocketAddress(XID.getIpAddress(), XID.getPort()));
        initialized.set(true);
        future.channel().closeFuture().sync();
    } catch (Exception exx) {
        throw new RuntimeException(exx);
    }
}

这段代码就是初始化netty server,其中的一些参数使用到了seata server配置文件file.conf中的属性值。这里我重点介绍一下channelHandlers,这个变量的赋值在AbstractNettyRemotingServer类,代码如下:

public AbstractNettyRemotingServer(ThreadPoolExecutor messageExecutor, NettyServerConfig nettyServerConfig) {
    super(messageExecutor);
    serverBootstrap = new NettyServerBootstrap(nettyServerConfig);
    serverBootstrap.setChannelHandlers(new ServerHandler());
}
ServerHandler类继承了netty库中的ChannelDuplexHandler类,主要功能包括读取channel中消息、释放连接、处理事件等,这里我们看一下channelRead这个方法:
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
    if (!(msg instanceof RpcMessage)) {
        return;
    }
    processMessage(ctx, (RpcMessage) msg);
}
protected final HashMap<Integer/*MessageType*/, Pair<RemotingProcessor, ExecutorService>> processorTable = new HashMap<>(32);
protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
    //省略部分代码
    Object body = rpcMessage.getBody();
    if (body instanceof MessageTypeAware) {
        MessageTypeAware messageTypeAware = (MessageTypeAware) body;
        final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());
        if (pair != null) {
            if (pair.getSecond() != null) {
                try {
                    pair.getSecond().execute(() -> {
                        try {
                            pair.getFirst().process(ctx, rpcMessage);
                        } catch (Throwable th) {
                            LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
                        }
                    });
                } catch (RejectedExecutionException e) {
                    //省略部分代码
                }
            } else {
                try {
                    pair.getFirst().process(ctx, rpcMessage);
                } catch (Throwable th) {
                    LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
                }
            }
        } else {
            LOGGER.error("This message type [{}] has no processor.", messageTypeAware.getTypeCode());
        }
    } else {
        LOGGER.error("This rpcMessage body[{}] is not MessageTypeAware type.", body);
    }
}

上面的代码非常清晰,根据消息类型从processorTable取出对应的消息处理器RemotingProcessor,然后根据Pair上是否为RemotingProcessor绑定了线程池,来决定是否使用线程池来处理相关消息。


最后总结一下,其实seata server的初始化过程比较简单,其实就是一个netty server的初始化过程,它注册了很多消息处理类型,然后给netty绑定了一个自定义的Handler,来处理各种消息。最核心的就是这些处理器如何来处理各种消息,我们放到后面讲解。

相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
相关文章
|
8天前
|
消息中间件 存储 NoSQL
阿里开源中间件一览
阿里开源中间件一览
14 2
|
29天前
|
算法 NoSQL Java
2023年阿里高频Java面试题:分布式+中间件+高并发+算法+数据库
又到了一年一度的金九银十,互联网行业竞争是一年比一年严峻,作为工程师的我们唯有不停地学习,不断的提升自己才能保证自己的核心竞争力从而拿到更好的薪水,进入心仪的企业(阿里、字节、美团、腾讯.....)
|
10月前
|
NoSQL Java Redis
阿里Java高级岗中间件二面:GC+IO+JVM+多线程+Redis+数据库+源码
虽然“钱多、事少、离家近”的工作可能离技术人比较远,但是找到一份合适的工作,其实并不像想象中那么难。但是,有些技术人确实是认真努力工作,但在面试时表现出的能力水平却不足以通过面试,或拿到高薪,其实不外乎以下 2 个原因:
|
10月前
|
算法 NoSQL Java
2023年阿里高频Java面试题:分布式+中间件+高并发+算法+数据库
又到了一年一度的金九银十,互联网行业竞争是一年比一年严峻,作为工程师的我们唯有不停地学习,不断的提升自己才能保证自己的核心竞争力从而拿到更好的薪水,进入心仪的企业(阿里、字节、美团、腾讯.....)
|
10月前
|
算法 NoSQL Java
2021年阿里高频Java面试题:分布式+中间件+高并发+算法+数据库
又到了一年一度的金九银十,互联网行业竞争是一年比一年严峻,作为工程师的我们唯有不停地学习,不断的提升自己才能保证自己的核心竞争力从而拿到更好的薪水,进入心仪的企业(阿里、字节、美团、腾讯.....)
|
11月前
|
消息中间件 安全 Java
全网首发!消息中间件神仙笔记,涵盖阿里十年技术精髓
消息中间件是分布式系统中的重要组件,在实际工作中常用消息中间件进行系统间数据交换,从而解决应用解耦、异步消息、流量削峰等问题,实现高性能、高可用、可伸缩和最终一致性架构。
|
11月前
|
消息中间件 数据采集 Java
开发神技!阿里消息中间件进阶手册限时开源,请接住我的下巴
相信大家在实际工作中都用过消息中间件进行系统间数据交换,解决应用解耦、异步消息、流量削峰等问题,由此消息中间件的强大功能想必也不用我多说了!目前业界上关于消息中间件的实现多达好几十种,可谓百花齐放,所用的实现语言同样也五花八门。不管使用哪一个消息中间件,我们的目的都是实现高性能、高可用、可伸缩和最终一致性架构。
|
缓存 NoSQL 容灾
《Java应用提速(速度与激情)》——六、阿里中间件提速
《Java应用提速(速度与激情)》——六、阿里中间件提速
|
消息中间件 NoSQL Dubbo
阿里Java高级岗中间件二面:GC+IO+JVM+多线程+Redis+数据库+源码
一转眼,都2023年了,你是否在满意的公司?拿着理想的薪水? 虽然“钱多、事少、离家近”的工作可能离技术人比较远,但是找到一份合适的工作,其实并不像想象中那么难。但是,有些技术人确实是认真努力工作,但在面试时表现出的能力水平却不足以通过面试,或拿到高薪,其实不外乎以下 2 个原因: 第一,“知其然不知其所以然”。做了多年技术,开发了很多业务应用,但似乎并未思考过种种技术选择背后的逻辑。所以,他无法向面试官展现出自己未来技术能力的成长潜力。面试官也不会放心把具有一定深度的任务交给他。 第二,知识碎片化,不成系统。在面试中,面试者似乎无法完整、清晰地描述自己所开发的系统,或者使用的相关技术。
2023年阿里高频Java面试题:分布式+中间件+高并发+算法+数据库
又到了一年一度的金九银十,互联网行业竞争是一年比一年严峻,作为工程师的我们唯有不停地学习,不断的提升自己才能保证自己的核心竞争力从而拿到更好的薪水,进入心仪的企业(阿里、字节、美团、腾讯.....)