【微服务34】分布式事务Seata源码解析二:Seata Server启动时都做了什么

本文涉及的产品
注册配置 MSE Nacos/ZooKeeper,118元/月
云原生网关 MSE Higress,422元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 【微服务34】分布式事务Seata源码解析二:Seata Server启动时都做了什么

@[TOC]

一、前言

至此,seata系列的内容包括:

  1. can not get cluster name in registry config ‘service.vgroupMapping.xx‘, please make sure registry问题解决
  2. Seata Failed to get available servers: endpoint format should like ip:port 报错原因/解决方案汇总版(看完本文必解决问题)
  3. Seata json decode exception, Cannot construct instance of java.time.LocalDateTime报错原因/解决方案最全汇总版
  4. 【微服务 31】超细的Spring Cloud 整合Seata实现分布式事务(排坑版)
  5. 【微服务 32】Spring Cloud整合Seata、Nacos实现分布式事务案例(巨细排坑版)【云原生】
  6. 【微服务33】分布式事务Seata源码解析一:在IDEA中启动Seata Server

本文着重聊一聊seata-server启动时都做了什么?

PS:前文中搭建的Seata案例,seata的版本为1.3.0,而本文开始的源码分析将基于当前(2022年8月)最新的版本1.5.2进行源码解析。

二、Seata Server启动

Seata Server包含几个主要模块:Config(配置TC)、Store(TC运行时全局事务以及分支事务的相关信息通过Store持久化)、Coordinator(TC实现事务协调的核心)、Netty-RPC(负责TC与TM/RM交互)、Lock(资源全局锁的实现);

1、找入口

当要启动一个seata-server时,只需要执行压缩包中bin/目录下的seata-server.sh,在这个脚本中会运行seata-server.jar

在这里插入图片描述

即对应于源码工程中的server目录 / seata-server 模块,由于seata-server是一个SpringBoot项目,找到其启动类ServerApplication,里面仅仅指定了一个包扫描路径为io.seata,并无其余特殊配置;
在这里插入图片描述

在启动类的同级目录下,有一个ServerRunner类;
在这里插入图片描述

ServerRunner类实现了CommandLineRunner接口:
在这里插入图片描述

CommandLineRunner接口主要用于实现在Spring容器初始化后执行,并且在整个应用生命周期内只会执行一次;也就是说在Spring容器初始化后会执行ServerRunner#run()方法;
在这里插入图片描述

ServerRunner#run()方法中仅仅调用了Server#start()方法;因此可以确定入口为io.seata.server.Server类的start()方法;

2、整体执行流程

Server#start()方法:

public class Server {
    /**
     * The entry point of application.
     *
     * @param args the input arguments
     */
    public static void start(String[] args) {
        // create logger
        final Logger logger = LoggerFactory.getLogger(Server.class);

        //initialize the parameter parser
        //Note that the parameter parser should always be the first line to execute.
        //Because, here we need to parse the parameters needed for startup.
        // 1. 对配置文件做参数解析:包括registry.conf、file.conf的解析
        ParameterParser parameterParser = new ParameterParser(args);

        // 2、初始化监控,做metric指标采集
        MetricsManager.get().init();

        // 将Store资源持久化方式放到系统的环境变量store.mode中
        System.setProperty(ConfigurationKeys.STORE_MODE, parameterParser.getStoreMode());

        // seata server里netty server 的io线程池(核心线程数50,最大线程数100)
        ThreadPoolExecutor workingThreads = new ThreadPoolExecutor(NettyServerConfig.getMinServerPoolSize(),
                NettyServerConfig.getMaxServerPoolSize(),
                NettyServerConfig.getKeepAliveTime(),
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(NettyServerConfig.getMaxTaskQueueSize()),
                new NamedThreadFactory("ServerHandlerThread", NettyServerConfig.getMaxServerPoolSize()), new ThreadPoolExecutor.CallerRunsPolicy());

        // 3、创建TC与RM/TM通信的RPC服务器--netty
        NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(workingThreads);

        // 4、初始化UUID生成器(雪花算法)
        UUIDGenerator.init(parameterParser.getServerNode());

        //log store mode : file, db, redis
        // 5、设置事务会话的持久化方式,有三种类型可选:file/db/redis
        SessionHolder.init(parameterParser.getSessionStoreMode());
        LockerManagerFactory.init(parameterParser.getLockStoreMode());

        // 6、创建并初始化事务协调器,创建时后台会启动一堆线程
        DefaultCoordinator coordinator = DefaultCoordinator.getInstance(nettyRemotingServer);
        coordinator.init();

        // 将DefaultCoordinator作为Netty Server的transactionMessageHandler;
        // 用于做AT、TCC、SAGA等不同事务类型的逻辑处理
        nettyRemotingServer.setHandler(coordinator);

        // let ServerRunner do destroy instead ShutdownHook, see https://github.com/seata/seata/issues/4028
        // 7、注册ServerRunner销毁(Spring容器销毁)的回调钩子函数
        ServerRunner.addDisposable(coordinator);

        //127.0.0.1 and 0.0.0.0 are not valid here.
        if (NetUtil.isValidIp(parameterParser.getHost(), false)) {
            XID.setIpAddress(parameterParser.getHost());
        } else {
            String preferredNetworks = ConfigurationFactory.getInstance().getConfig(REGISTRY_PREFERED_NETWORKS);
            if (StringUtils.isNotBlank(preferredNetworks)) {
                XID.setIpAddress(NetUtil.getLocalIp(preferredNetworks.split(REGEX_SPLIT_CHAR)));
            } else {
                XID.setIpAddress(NetUtil.getLocalIp());
            }
        }
        // 8、启动netty Server,用于接收TM/RM的请求
        nettyRemotingServer.init();
    }
}

Server端的启动流程大致做了八件事:

  1. 对配置文件(包括registry.conf、file.conf)做参数解析;
  2. 初始化监控,做metric指标采集;
  3. 创建TC与RM/TM通信的RPC服务器(NettyRemotingServer)--netty;
  4. 初始化UUID生成器(雪花算法),用于生成全局事务id和分支事务id;
  5. 设置事务会话(SessionHolder)、全局锁(LockManager)的持久化方式并初始化,有三种类型可选:file/db/redis;
  6. 创建并初始化事务协调器(DefaultCoordinator),后台启动一堆线程做定时任务,并将DefaultCoordinator绑定到RPC服务器上做为transactionMessageHandler
  7. 注册ServerRunner销毁(Spring容器销毁)的回调钩子函数DefaultCoordinator;
  8. 启动netty Server,用于接收TM/RM的请求;

1)对配置文件做参数解析

具体代码执行流程如下:

在这里插入图片描述

ParameterParser的init()方法中:

  1. 首先从启动命令(运行时参数)中解析;
  2. 接着判断server端是否在容器中启动,是则从容器环境中获取seata环境、host、port、serverNode、storeMode存储模式等信息;
  3. 如果storeMode不存在,则从配置中心/文件中获取配置。
// 解析运行期参数,默认什么里面什么都没有
private void getCommandParameters(String[] args) {
    JCommander jCommander = JCommander.newBuilder().addObject(this).build();
    jCommander.parse(args);
    if (help) {
        jCommander.setProgramName(PROGRAM_NAME);
        jCommander.usage();
        System.exit(0);
    }
}

// server端在容器中启动,则从容器环境中读取环境、host、port、server节点以及StoreMode存储模式
private void getEnvParameters() {
    // 设置seata的环境
    if (StringUtils.isBlank(seataEnv)) {
        seataEnv = ContainerHelper.getEnv();
    }
    // 设置Host
    if (StringUtils.isBlank(host)) {
        host = ContainerHelper.getHost();
    }
    // 设置端口号
    if (port == 0) {
        port = ContainerHelper.getPort();
    }
    if (serverNode == null) {
        serverNode = ContainerHelper.getServerNode();
    }
    if (StringUtils.isBlank(storeMode)) {
        storeMode = ContainerHelper.getStoreMode();
    }
    if (StringUtils.isBlank(sessionStoreMode)) {
        sessionStoreMode = ContainerHelper.getSessionStoreMode();
    }
    if (StringUtils.isBlank(lockStoreMode)) {
        lockStoreMode = ContainerHelper.getLockStoreMode();
    }
}

2)初始化监控

在这里插入图片描述

默认不开启,此处不做过多介绍

3)创建TC与RM/TM通信的RPC服务器

在这里插入图片描述

单纯的new一个NettyRemotingServer,也没啥可说的;

4)初始化UUID生成器

UUID底层采用雪花算法,其用于生成全局事务id和分支事务id;

代码执行流程如下:

在这里插入图片描述

UUIDGenerator会委托IdWorker来生成雪花id,生成的雪花Id由0、10位的workerId、41位的时间戳、12位的sequence序列号组成。

IdWorker

IdWorker中有8个重要的成员变量/常量:

/**
 * Start time cut (2020-05-03)
 */
private final long twepoch = 1588435200000L;

/**
 * The number of bits occupied by workerId
 */
private final int workerIdBits = 10;

/**
 * The number of bits occupied by timestamp
 */
private final int timestampBits = 41;

/**
 * The number of bits occupied by sequence
 */
private final int sequenceBits = 12;

/**
 * Maximum supported machine id, the result is 1023
 */
private final int maxWorkerId = ~(-1 << workerIdBits);

/**
 * business meaning: machine ID (0 ~ 1023)
 * actual layout in memory:
 * highest 1 bit: 0
 * middle 10 bit: workerId
 * lowest 53 bit: all 0
 */
private long workerId;

/**
 * 又是一个雪花算法(64位,8字节)
 * timestamp and sequence mix in one Long
 * highest 11 bit: not used
 * middle  41 bit: timestamp
 * lowest  12 bit: sequence
 */
private AtomicLong timestampAndSequence;

/**
 * 从一个long数组类型中抽取出一个时间戳伴随序列号,偏向一个辅助性质
 * mask that help to extract timestamp and sequence from a long
 */
private final long timestampAndSequenceMask = ~(-1L << (timestampBits + sequenceBits));

变量/常量解释:

  1. 常量twepoch表示我们的时间戳时间从2020-05-03开始计算,即当前时间的时间戳需要减去twepoch的值1588435200000L
  2. 常量workerIdBits表示机器号workerId占10位;
  3. 常量timestampBits表示时间戳timestamp占41位;
  4. 常量sequenceBits表示序列化占12位;
  5. 常量maxWorkerId表示机器号的最大值为1023;
  6. long类型的变量workerId本身也是一个雪花算法,只是从开头往后数,第2位开始,一共10位用来表示workerId,其余位全是0;
  7. AtomicLong类型的变量timestampAndSequence,其本身也是一个雪花算法,头11位不使用,中间41位表示timestamp,最后12位表示sequence;
  8. long类型的常量timestampAndSequenceMask,用于从一个完整的雪花ID(long类型)中摘出timestamp 和 sequence

IdWorker构造器中会分别初始化TimestampAndSequence、WorkerId。

1> initTimestampAndSequence()

initTimestampAndSequence()方法负责初始化timestampsequence

private void initTimestampAndSequence() {
    // 拿到当前时间戳 - (2020-05-03 时间戳)的数值,即当前时间相对2020-05-03的时间戳
    long timestamp = getNewestTimestamp();
    // 把时间戳左移12位,后12位流程sequence使用
    long timestampWithSequence = timestamp << sequenceBits;
    // 把混合sequence(默认为0)的时间戳赋值给timestampAndSequence
    this.timestampAndSequence = new AtomicLong(timestampWithSequence);
}

// 获取当前时间戳
private long getNewestTimestamp() {
    //当前时间的时间戳减去2020-05-03的时间戳
    return System.currentTimeMillis() - twepoch;
}

2> initWorkerId(Long)

initWorkerId(Long workerId)方法负责初始化workId,默认不会传过来workerId,如果传过来则使用传过来的workerId,并校验其不能大于1023,然后将其左移53位;

private void initWorkerId(Long workerId) {
   if (workerId == null) {
       // workid为null时,自动生成一个workerId
       workerId = generateWorkerId();
   }
   // workerId最大只能是1023,因为其只占10bit
   if (workerId > maxWorkerId || workerId < 0) {
       String message = String.format("worker Id can't be greater than %d or less than 0", maxWorkerId);
       throw new IllegalArgumentException(message);
   }
   this.workerId = workerId << (timestampBits + sequenceBits);
}

如果没传则基于MAC地址生成;
在这里插入图片描述

如果基于MAC地址生成workerId出现异常,则也1023为基数生成一个随机的workerId;
在这里插入图片描述

最后同样,校验workerId不能大于1023,然后将其左移53位;

5)设置事务会话(SessionHolder)、全局锁(LockManager)的持久化方式并初始化

在这里插入图片描述

1> SessionHolder

SessionHolder负责事务会话Session的持久化,一个session对应一个事务,事务又分为全局事务和分支事务;

SessionHolder支持db,file和redis的持久化方式,其中redis和db支持集群模式,项目上推荐使用redis或db模式;

SessionHolder有五个重要的属性,如下:

// 用于管理所有的Setssion,以及Session的创建、更新、删除等
private static SessionManager ROOT_SESSION_MANAGER;
// 用于管理所有的异步commit的Session,包括创建、更新以及删除
private static SessionManager ASYNC_COMMITTING_SESSION_MANAGER;
// 用于管理所有的重试commit的Session,包括创建、更新以及删除
private static SessionManager RETRY_COMMITTING_SESSION_MANAGER;
// 用于管理所有的重试rollback的Session,包括创建、更新以及删除
private static SessionManager RETRY_ROLLBACKING_SESSION_MANAGER;
// 用于管理分布式锁
private static DistributedLocker DISTRIBUTED_LOCKER;

这五个属性在SessionHolder#init()方法中初始化,init()方法源码如下:

public static void init(String mode) {
    if (StringUtils.isBlank(mode)) {
        mode = CONFIG.getConfig(ConfigurationKeys.STORE_SESSION_MODE,
                CONFIG.getConfig(ConfigurationKeys.STORE_MODE, SERVER_DEFAULT_STORE_MODE));
    }
    StoreMode storeMode = StoreMode.get(mode);
    // 根据storeMode采用SPI机制初始化SessionManager
    // db模式
    if (StoreMode.DB.equals(storeMode)) {
        ROOT_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName());
        ASYNC_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(),
            new Object[]{ASYNC_COMMITTING_SESSION_MANAGER_NAME});
        RETRY_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(),
            new Object[]{RETRY_COMMITTING_SESSION_MANAGER_NAME});
        RETRY_ROLLBACKING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(),
            new Object[]{RETRY_ROLLBACKING_SESSION_MANAGER_NAME});

        DISTRIBUTED_LOCKER = DistributedLockerFactory.getDistributedLocker(StoreMode.DB.getName());
    } else if (StoreMode.FILE.equals(storeMode)) {
        // 文件模式
        String sessionStorePath = CONFIG.getConfig(ConfigurationKeys.STORE_FILE_DIR,
                DEFAULT_SESSION_STORE_FILE_DIR);
        if (StringUtils.isBlank(sessionStorePath)) {
            throw new StoreException("the {store.file.dir} is empty.");
        }
        ROOT_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.FILE.getName(),
            new Object[]{ROOT_SESSION_MANAGER_NAME, sessionStorePath});
        ASYNC_COMMITTING_SESSION_MANAGER = ROOT_SESSION_MANAGER;
        RETRY_COMMITTING_SESSION_MANAGER = ROOT_SESSION_MANAGER;
        RETRY_ROLLBACKING_SESSION_MANAGER = ROOT_SESSION_MANAGER;

        DISTRIBUTED_LOCKER = DistributedLockerFactory.getDistributedLocker(StoreMode.FILE.getName());
    } else if (StoreMode.REDIS.equals(storeMode)) {
        // redis模式
        ROOT_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.REDIS.getName());
        ASYNC_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class,
            StoreMode.REDIS.getName(), new Object[]{ASYNC_COMMITTING_SESSION_MANAGER_NAME});
        RETRY_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class,
            StoreMode.REDIS.getName(), new Object[]{RETRY_COMMITTING_SESSION_MANAGER_NAME});
        RETRY_ROLLBACKING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class,
            StoreMode.REDIS.getName(), new Object[]{RETRY_ROLLBACKING_SESSION_MANAGER_NAME});

        DISTRIBUTED_LOCKER = DistributedLockerFactory.getDistributedLocker(StoreMode.REDIS.getName());
    } else {
        // unknown store
        throw new IllegalArgumentException("unknown store mode:" + mode);
    }
    // 根据storeMode重新加载
    reload(storeMode);
}

init()方法中根据storeMode采用SPI机制初始化SessionManager,SessionManager有三个实现类:
在这里插入图片描述

2> LockerManager

在这里插入图片描述

SessionHolder一样,LockManagerFactory#init()方法同样根据storeMode采用SPI机制初始化LockManager,LockManager有三个实现类:

在这里插入图片描述

6)创建并初始化事务协调器(DefaultCoordinator

DefaultCoordinator是事务协调的核心,比如:开启、提交、回滚全局事务,注册、提交、回滚分支事务都是通过DefaultCoordinator进行协调处理的。

在这里插入图片描述
(1)先来看DefaultCoordinator的创建;
在这里插入图片描述

使用Double Check Lock(DCL-双重检查锁)机制获取到单例的DefaultCoordinator;如果DefaultCoordinator为实例化过,则new一个:
在这里插入图片描述

DefaultCoordinator的类构造器中,首先绑定远程通信的Server的具体实现到内部成员中,然后实例化一个DefaultCore,DefaultCore是AT、TCC、XA、Saga四种分布式事务模式的具体实现类;
在这里插入图片描述

DefaultCore的类构造器中首先通过SPI机制加载出所有的AbstractCore的子类,一共有四个:ATCore、TccCore、SagaCore、XACore;然后将AbstractCore子类可以处理的事务模式作为Key、AbstractCore子类作为Value存储到一个缓存Map(Map<BranchType, AbstractCore> coreMap)中;

private static Map<BranchType, AbstractCore> coreMap = new ConcurrentHashMap<>();

后续通过BranchType(分支类型)就可以从coreMap中获取到相应事务模式的具体AbstractCore实现类。

(2)初始化DefaultCoordinator;

在这里插入图片描述

所谓的初始化,其实就是后台启动一堆线程做定时任务;去定时处理重试回滚、重试提交、异步提交、超时的检测,以及定时清理undo_log。

除定时清理undo_log外,其余定时任务的处理逻辑基本都是:

  1. 首先获取所有可回滚的全局事务会话Session,如果可回滚的分支事务为空,则直接返回;
  2. 否者,遍历所有的可回滚Session;为了防止重复回滚,如果session的状态是正在回滚中并且session不是死亡的,则直接返回;
  3. 如果Session重试回滚超时,从缓存中删除已经超时的回滚Session;
  4. 发布session回滚完成事件给到Metric,对回滚中的Session添加Session生命周期的监听;
  5. 使用DefaultCoordinator组合的DefaultCore执行全局回滚。

以处理重试回滚的方法handleRetryRollbacking()为例:

protected void handleRetryRollbacking() {
    SessionCondition sessionCondition = new SessionCondition(rollbackingStatuses);
    sessionCondition.setLazyLoadBranch(true);
    // 获取所有的可回滚的全局事务session
    Collection<GlobalSession> rollbackingSessions =
        SessionHolder.getRetryRollbackingSessionManager().findGlobalSessions(sessionCondition);
    // 如果可回滚的分支事务为空,则直接返回
    if (CollectionUtils.isEmpty(rollbackingSessions)) {
        return;
    }
    long now = System.currentTimeMillis();
    // 遍历所有的可回滚Session,
    SessionHelper.forEach(rollbackingSessions, rollbackingSession -> {
        try {
            // prevent repeated rollback
            // 防止重复回滚:如果session的状态是正在回滚中并且session不是死亡的,则直接返回。
            if (rollbackingSession.getStatus().equals(GlobalStatus.Rollbacking)
                && !rollbackingSession.isDeadSession()) {
                // The function of this 'return' is 'continue'.
                return;
            }
            // 判断回滚是否重试超时
            if (isRetryTimeout(now, MAX_ROLLBACK_RETRY_TIMEOUT.toMillis(), rollbackingSession.getBeginTime())) {
                if (ROLLBACK_RETRY_TIMEOUT_UNLOCK_ENABLE) {
                    rollbackingSession.clean();
                }
                // Prevent thread safety issues
                // 删除已经超时的回滚Session
                SessionHolder.getRetryRollbackingSessionManager().removeGlobalSession(rollbackingSession);
                LOGGER.error("Global transaction rollback retry timeout and has removed [{}]", rollbackingSession.getXid());

                SessionHelper.endRollbackFailed(rollbackingSession, true);

                // rollback retry timeout event
                // 发布session回滚完成事件给到Metric
                MetricsPublisher.postSessionDoneEvent(rollbackingSession, GlobalStatus.RollbackRetryTimeout, true, false);

                //The function of this 'return' is 'continue'.
                return;
            }
            // 对回滚中的Session添加Session生命周期的监听
            rollbackingSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
            // 使用DefaultCoordinator组合的DefaultCore执行全局回滚
            core.doGlobalRollback(rollbackingSession, true);
        } catch (TransactionException ex) {
            LOGGER.info("Failed to retry rollbacking [{}] {} {}", rollbackingSession.getXid(), ex.getCode(), ex.getMessage());
        }
    });
}

7)注册ServerRunner销毁(Spring容器销毁)的回调钩子函数DefaultCoordinator

8)启动NettyServer(NettyRemotingServer)

启动NettyRemotingServer时会做两件事:注册消息处理器、初始化并启动NettyServerBootstrap
在这里插入图片描述

1> 首先注册消息处理器

消息处理器是用来处理消息的,其根据消息的不同类型选择不同的消息处理器来处理消息(属于典型的策略模式);

在这里插入图片描述

每个消息类型和对应的处理器关系如下:

所谓的注册消息处理器本质上就是将处理器RemotingProcessor和处理消息的线程池ExecutorService包装成一个Pair,然后将Pair作为Value,messageType作为key放入一个Map(processorTable)中;
在这里插入图片描述

/**
 * This container holds all processors.
 * processor type {@link MessageType}
 */
protected final HashMap<Integer/*MessageType*/, Pair<RemotingProcessor, ExecutorService>> processorTable = new HashMap<>(32);

2> 初始化NettyRemotingServer

在初始化NettyRemotingServer之前会通过AtomicBoolean类型的原子变量initialized + CAS操作确保仅会有一个线程进行NettyRemotingServer的初始化;

在这里插入图片描述

再看NettyRemotingServer的类继承图:
在这里插入图片描述

CAS成功后进入到NettyRemotingServer的父类AbstractNettyRemotingServer#init()方法;

在这里插入图片描述

方法中:

(1)首先调用父类AbstractNettyRemoting的init()方法:
在这里插入图片描述
启动一个延时3s,每3s执行一次的定时任务,做请求超时检查;

(2)紧接着启动ServerBootstrap(就正常的nettyServer启动):

在这里插入图片描述
NettyRemotingServer在启动的过程中设置了4个ChannelHandler:

  1. IdleStateHandler:处理心跳
  2. ProtocolV1Decoder:消息解码器
  3. ProtocolV1Encoder:消息编码器
  4. AbstractNettyRemotingServer.ServerHandler:处理各种消息
AbstractNettyRemotingServer.ServerHandler类

在这里插入图片描述
ServerHandler类上有个@ChannelHandler.Sharable注解,其表示所有的连接都会共用这一个ChannelHandler;所以当消息处理很慢时,会降低并发。

processMessage(ctx, (RpcMessage) msg)方法中会根据消息类型获取到 请求处理组件(消息的处理过程是典型的策略模式),如果消息对应的处理器设置了线程池,则放到线程池中执行;如果对应的处理器没有设置线程池,则直接执行;如果某条消息处理特别慢,会严重影响并发;所以在seata-server中大部分处理器都有对应的线程池。

/**
 * Rpc message processing.
 *
 * @param ctx        Channel handler context.
 * @param rpcMessage rpc message.
 * @throws Exception throws exception process message error.
 * @since 1.3.0
 */
protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
    if (LOGGER.isDebugEnabled()) {
        LOGGER.debug(String.format("%s msgId:%s, body:%s", this, rpcMessage.getId(), rpcMessage.getBody()));
    }
    Object body = rpcMessage.getBody();
    if (body instanceof MessageTypeAware) {
        MessageTypeAware messageTypeAware = (MessageTypeAware) body;
        // 根据消息的类型获取到请求处理组件和请求处理线程池组成的Pair
        final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());
        if (pair != null) {
            // 如果消息对应的处理器设置了线程池,则放到线程池中执行
            if (pair.getSecond() != null) {
                try {
                    pair.getSecond().execute(() -> {
                        try {
                            pair.getFirst().process(ctx, rpcMessage);
                        } catch (Throwable th) {
                            LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
                        } finally {
                            MDC.clear();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    // 线程池拒绝策略之一,抛出异常:RejectedExecutionException
                    LOGGER.error(FrameworkErrorCode.ThreadPoolFull.getErrCode(),
                        "thread pool is full, current max pool size is " + messageExecutor.getActiveCount());
                    if (allowDumpStack) {
                        String name = ManagementFactory.getRuntimeMXBean().getName();
                        String pid = name.split("@")[0];
                        long idx = System.currentTimeMillis();
                        try {
                            String jstackFile = idx + ".log";
                            LOGGER.info("jstack command will dump to " + jstackFile);
                            Runtime.getRuntime().exec(String.format("jstack %s > %s", pid, jstackFile));
                        } catch (IOException exx) {
                            LOGGER.error(exx.getMessage());
                        }
                        allowDumpStack = false;
                    }
                }
            } else {
                // 对应的处理器没有设置线程池,则直接执行;如果某条消息处理特别慢,会严重影响并发;
                try {
                    pair.getFirst().process(ctx, rpcMessage);
                } catch (Throwable th) {
                    LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
                }
            }
        } else {
            LOGGER.error("This message type [{}] has no processor.", messageTypeAware.getTypeCode());
        }
    } else {
        LOGGER.error("This rpcMessage body[{}] is not MessageTypeAware type.", body);
    }
}

三、总结和后续

本文我们聊了Seata Server启动时都做了哪些事?博主总结一共八件事:

  1. 对配置文件(包括registry.conf、file.conf)做参数解析;
  2. 初始化监控,做metric指标采集;
  3. 创建TC与RM/TM通信的RPC服务器(NettyRemotingServer)--netty;
  4. 初始化UUID生成器(雪花算法),用于生成全局事务id和分支事务id;
  5. 设置事务会话(SessionHolder)、全局锁(LockManager)的持久化方式并初始化,有三种类型可选:file/db/redis;
  6. 创建并初始化事务协调器(DefaultCoordinator),后台启动一堆线程做定时任务,并将DefaultCoordinator绑定到RPC服务器上做为transactionMessageHandler
  7. 注册ServerRunner销毁(Spring容器销毁)的回调钩子函数DefaultCoordinator;
  8. 启动netty Server,用于接收TM/RM的请求;

下一篇文章我们聊一下Seata Client(AT模式下仅作为RM时)启动时都做了什么?

相关文章
|
2月前
|
Java 数据库
在Java中使用Seata框架实现分布式事务的详细步骤
通过以上步骤,利用 Seata 框架可以实现较为简单的分布式事务处理。在实际应用中,还需要根据具体业务需求进行更详细的配置和处理。同时,要注意处理各种异常情况,以确保分布式事务的正确执行。
|
2月前
|
运维 持续交付 云计算
深入解析云计算中的微服务架构:原理、优势与实践
深入解析云计算中的微服务架构:原理、优势与实践
70 1
|
27天前
|
运维 监控 持续交付
微服务架构解析:跨越传统架构的技术革命
微服务架构(Microservices Architecture)是一种软件架构风格,它将一个大型的单体应用拆分为多个小而独立的服务,每个服务都可以独立开发、部署和扩展。
163 36
微服务架构解析:跨越传统架构的技术革命
|
12天前
|
物联网 调度 vr&ar
鸿蒙HarmonyOS应用开发 |鸿蒙技术分享HarmonyOS Next 深度解析:分布式能力与跨设备协作实战
鸿蒙技术分享:HarmonyOS Next 深度解析 随着万物互联时代的到来,华为发布的 HarmonyOS Next 在技术架构和生态体验上实现了重大升级。本文从技术架构、生态优势和开发实践三方面深入探讨其特点,并通过跨设备笔记应用实战案例,展示其强大的分布式能力和多设备协作功能。核心亮点包括新一代微内核架构、统一开发语言 ArkTS 和多模态交互支持。开发者可借助 DevEco Studio 4.0 快速上手,体验高效、灵活的开发过程。 239个字符
173 13
鸿蒙HarmonyOS应用开发 |鸿蒙技术分享HarmonyOS Next 深度解析:分布式能力与跨设备协作实战
|
9天前
|
存储 SpringCloudAlibaba Java
【SpringCloud Alibaba系列】一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论
一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论。
【SpringCloud Alibaba系列】一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论
|
25天前
|
消息中间件 SQL 中间件
大厂都在用的分布式事务方案,Seata+RocketMQ带你打破10万QPS瓶颈
分布式事务涉及跨多个数据库或服务的操作,确保数据一致性。本地事务通过数据库直接支持ACID特性,而分布式事务则需解决跨服务协调难、高并发压力及性能与一致性权衡等问题。常见的解决方案包括两阶段提交(2PC)、Seata提供的AT和TCC模式、以及基于消息队列的最终一致性方案。这些方法各有优劣,适用于不同业务场景,选择合适的方案需综合考虑业务需求、系统规模和技术团队能力。
177 7
|
2月前
|
弹性计算 持续交付 API
构建高效后端服务:微服务架构的深度解析与实践
在当今快速发展的软件行业中,构建高效、可扩展且易于维护的后端服务是每个技术团队的追求。本文将深入探讨微服务架构的核心概念、设计原则及其在实际项目中的应用,通过具体案例分析,展示如何利用微服务架构解决传统单体应用面临的挑战,提升系统的灵活性和响应速度。我们将从微服务的拆分策略、通信机制、服务发现、配置管理、以及持续集成/持续部署(CI/CD)等方面进行全面剖析,旨在为读者提供一套实用的微服务实施指南。
|
2月前
|
负载均衡 Java 持续交付
深入解析微服务架构中的服务发现与负载均衡
深入解析微服务架构中的服务发现与负载均衡
79 7
|
2月前
|
存储 Java 关系型数据库
在Spring Boot中整合Seata框架实现分布式事务
可以在 Spring Boot 中成功整合 Seata 框架,实现分布式事务的管理和处理。在实际应用中,还需要根据具体的业务需求和技术架构进行进一步的优化和调整。同时,要注意处理各种可能出现的问题,以保障分布式事务的顺利执行。
80 6
|
2月前
|
数据库
如何在Seata框架中配置分布式事务的隔离级别?
总的来说,配置分布式事务的隔离级别是实现分布式事务管理的重要环节之一,需要认真对待和仔细调整,以满足业务的需求和性能要求。你还可以进一步深入研究和实践 Seata 框架的配置和使用,以更好地应对各种分布式事务场景的挑战。
43 6

推荐镜像

更多