【微服务34】分布式事务Seata源码解析二:Seata Server启动时都做了什么

本文涉及的产品
注册配置 MSE Nacos/ZooKeeper,118元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
云原生网关 MSE Higress,422元/月
简介: 【微服务34】分布式事务Seata源码解析二:Seata Server启动时都做了什么

@[TOC]

一、前言

至此,seata系列的内容包括:

  1. can not get cluster name in registry config ‘service.vgroupMapping.xx‘, please make sure registry问题解决
  2. Seata Failed to get available servers: endpoint format should like ip:port 报错原因/解决方案汇总版(看完本文必解决问题)
  3. Seata json decode exception, Cannot construct instance of java.time.LocalDateTime报错原因/解决方案最全汇总版
  4. 【微服务 31】超细的Spring Cloud 整合Seata实现分布式事务(排坑版)
  5. 【微服务 32】Spring Cloud整合Seata、Nacos实现分布式事务案例(巨细排坑版)【云原生】
  6. 【微服务33】分布式事务Seata源码解析一:在IDEA中启动Seata Server

本文着重聊一聊seata-server启动时都做了什么?

PS:前文中搭建的Seata案例,seata的版本为1.3.0,而本文开始的源码分析将基于当前(2022年8月)最新的版本1.5.2进行源码解析。

二、Seata Server启动

Seata Server包含几个主要模块:Config(配置TC)、Store(TC运行时全局事务以及分支事务的相关信息通过Store持久化)、Coordinator(TC实现事务协调的核心)、Netty-RPC(负责TC与TM/RM交互)、Lock(资源全局锁的实现);

1、找入口

当要启动一个seata-server时,只需要执行压缩包中bin/目录下的seata-server.sh,在这个脚本中会运行seata-server.jar

在这里插入图片描述

即对应于源码工程中的server目录 / seata-server 模块,由于seata-server是一个SpringBoot项目,找到其启动类ServerApplication,里面仅仅指定了一个包扫描路径为io.seata,并无其余特殊配置;
在这里插入图片描述

在启动类的同级目录下,有一个ServerRunner类;
在这里插入图片描述

ServerRunner类实现了CommandLineRunner接口:
在这里插入图片描述

CommandLineRunner接口主要用于实现在Spring容器初始化后执行,并且在整个应用生命周期内只会执行一次;也就是说在Spring容器初始化后会执行ServerRunner#run()方法;
在这里插入图片描述

ServerRunner#run()方法中仅仅调用了Server#start()方法;因此可以确定入口为io.seata.server.Server类的start()方法;

2、整体执行流程

Server#start()方法:

public class Server {
    /**
     * The entry point of application.
     *
     * @param args the input arguments
     */
    public static void start(String[] args) {
        // create logger
        final Logger logger = LoggerFactory.getLogger(Server.class);

        //initialize the parameter parser
        //Note that the parameter parser should always be the first line to execute.
        //Because, here we need to parse the parameters needed for startup.
        // 1. 对配置文件做参数解析:包括registry.conf、file.conf的解析
        ParameterParser parameterParser = new ParameterParser(args);

        // 2、初始化监控,做metric指标采集
        MetricsManager.get().init();

        // 将Store资源持久化方式放到系统的环境变量store.mode中
        System.setProperty(ConfigurationKeys.STORE_MODE, parameterParser.getStoreMode());

        // seata server里netty server 的io线程池(核心线程数50,最大线程数100)
        ThreadPoolExecutor workingThreads = new ThreadPoolExecutor(NettyServerConfig.getMinServerPoolSize(),
                NettyServerConfig.getMaxServerPoolSize(),
                NettyServerConfig.getKeepAliveTime(),
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(NettyServerConfig.getMaxTaskQueueSize()),
                new NamedThreadFactory("ServerHandlerThread", NettyServerConfig.getMaxServerPoolSize()), new ThreadPoolExecutor.CallerRunsPolicy());

        // 3、创建TC与RM/TM通信的RPC服务器--netty
        NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(workingThreads);

        // 4、初始化UUID生成器(雪花算法)
        UUIDGenerator.init(parameterParser.getServerNode());

        //log store mode : file, db, redis
        // 5、设置事务会话的持久化方式,有三种类型可选:file/db/redis
        SessionHolder.init(parameterParser.getSessionStoreMode());
        LockerManagerFactory.init(parameterParser.getLockStoreMode());

        // 6、创建并初始化事务协调器,创建时后台会启动一堆线程
        DefaultCoordinator coordinator = DefaultCoordinator.getInstance(nettyRemotingServer);
        coordinator.init();

        // 将DefaultCoordinator作为Netty Server的transactionMessageHandler;
        // 用于做AT、TCC、SAGA等不同事务类型的逻辑处理
        nettyRemotingServer.setHandler(coordinator);

        // let ServerRunner do destroy instead ShutdownHook, see https://github.com/seata/seata/issues/4028
        // 7、注册ServerRunner销毁(Spring容器销毁)的回调钩子函数
        ServerRunner.addDisposable(coordinator);

        //127.0.0.1 and 0.0.0.0 are not valid here.
        if (NetUtil.isValidIp(parameterParser.getHost(), false)) {
            XID.setIpAddress(parameterParser.getHost());
        } else {
            String preferredNetworks = ConfigurationFactory.getInstance().getConfig(REGISTRY_PREFERED_NETWORKS);
            if (StringUtils.isNotBlank(preferredNetworks)) {
                XID.setIpAddress(NetUtil.getLocalIp(preferredNetworks.split(REGEX_SPLIT_CHAR)));
            } else {
                XID.setIpAddress(NetUtil.getLocalIp());
            }
        }
        // 8、启动netty Server,用于接收TM/RM的请求
        nettyRemotingServer.init();
    }
}

Server端的启动流程大致做了八件事:

  1. 对配置文件(包括registry.conf、file.conf)做参数解析;
  2. 初始化监控,做metric指标采集;
  3. 创建TC与RM/TM通信的RPC服务器(NettyRemotingServer)--netty;
  4. 初始化UUID生成器(雪花算法),用于生成全局事务id和分支事务id;
  5. 设置事务会话(SessionHolder)、全局锁(LockManager)的持久化方式并初始化,有三种类型可选:file/db/redis;
  6. 创建并初始化事务协调器(DefaultCoordinator),后台启动一堆线程做定时任务,并将DefaultCoordinator绑定到RPC服务器上做为transactionMessageHandler
  7. 注册ServerRunner销毁(Spring容器销毁)的回调钩子函数DefaultCoordinator;
  8. 启动netty Server,用于接收TM/RM的请求;

1)对配置文件做参数解析

具体代码执行流程如下:

在这里插入图片描述

ParameterParser的init()方法中:

  1. 首先从启动命令(运行时参数)中解析;
  2. 接着判断server端是否在容器中启动,是则从容器环境中获取seata环境、host、port、serverNode、storeMode存储模式等信息;
  3. 如果storeMode不存在,则从配置中心/文件中获取配置。
// 解析运行期参数,默认什么里面什么都没有
private void getCommandParameters(String[] args) {
    JCommander jCommander = JCommander.newBuilder().addObject(this).build();
    jCommander.parse(args);
    if (help) {
        jCommander.setProgramName(PROGRAM_NAME);
        jCommander.usage();
        System.exit(0);
    }
}

// server端在容器中启动,则从容器环境中读取环境、host、port、server节点以及StoreMode存储模式
private void getEnvParameters() {
    // 设置seata的环境
    if (StringUtils.isBlank(seataEnv)) {
        seataEnv = ContainerHelper.getEnv();
    }
    // 设置Host
    if (StringUtils.isBlank(host)) {
        host = ContainerHelper.getHost();
    }
    // 设置端口号
    if (port == 0) {
        port = ContainerHelper.getPort();
    }
    if (serverNode == null) {
        serverNode = ContainerHelper.getServerNode();
    }
    if (StringUtils.isBlank(storeMode)) {
        storeMode = ContainerHelper.getStoreMode();
    }
    if (StringUtils.isBlank(sessionStoreMode)) {
        sessionStoreMode = ContainerHelper.getSessionStoreMode();
    }
    if (StringUtils.isBlank(lockStoreMode)) {
        lockStoreMode = ContainerHelper.getLockStoreMode();
    }
}

2)初始化监控

在这里插入图片描述

默认不开启,此处不做过多介绍

3)创建TC与RM/TM通信的RPC服务器

在这里插入图片描述

单纯的new一个NettyRemotingServer,也没啥可说的;

4)初始化UUID生成器

UUID底层采用雪花算法,其用于生成全局事务id和分支事务id;

代码执行流程如下:

在这里插入图片描述

UUIDGenerator会委托IdWorker来生成雪花id,生成的雪花Id由0、10位的workerId、41位的时间戳、12位的sequence序列号组成。

IdWorker

IdWorker中有8个重要的成员变量/常量:

/**
 * Start time cut (2020-05-03)
 */
private final long twepoch = 1588435200000L;

/**
 * The number of bits occupied by workerId
 */
private final int workerIdBits = 10;

/**
 * The number of bits occupied by timestamp
 */
private final int timestampBits = 41;

/**
 * The number of bits occupied by sequence
 */
private final int sequenceBits = 12;

/**
 * Maximum supported machine id, the result is 1023
 */
private final int maxWorkerId = ~(-1 << workerIdBits);

/**
 * business meaning: machine ID (0 ~ 1023)
 * actual layout in memory:
 * highest 1 bit: 0
 * middle 10 bit: workerId
 * lowest 53 bit: all 0
 */
private long workerId;

/**
 * 又是一个雪花算法(64位,8字节)
 * timestamp and sequence mix in one Long
 * highest 11 bit: not used
 * middle  41 bit: timestamp
 * lowest  12 bit: sequence
 */
private AtomicLong timestampAndSequence;

/**
 * 从一个long数组类型中抽取出一个时间戳伴随序列号,偏向一个辅助性质
 * mask that help to extract timestamp and sequence from a long
 */
private final long timestampAndSequenceMask = ~(-1L << (timestampBits + sequenceBits));

变量/常量解释:

  1. 常量twepoch表示我们的时间戳时间从2020-05-03开始计算,即当前时间的时间戳需要减去twepoch的值1588435200000L
  2. 常量workerIdBits表示机器号workerId占10位;
  3. 常量timestampBits表示时间戳timestamp占41位;
  4. 常量sequenceBits表示序列化占12位;
  5. 常量maxWorkerId表示机器号的最大值为1023;
  6. long类型的变量workerId本身也是一个雪花算法,只是从开头往后数,第2位开始,一共10位用来表示workerId,其余位全是0;
  7. AtomicLong类型的变量timestampAndSequence,其本身也是一个雪花算法,头11位不使用,中间41位表示timestamp,最后12位表示sequence;
  8. long类型的常量timestampAndSequenceMask,用于从一个完整的雪花ID(long类型)中摘出timestamp 和 sequence

IdWorker构造器中会分别初始化TimestampAndSequence、WorkerId。

1> initTimestampAndSequence()

initTimestampAndSequence()方法负责初始化timestampsequence

private void initTimestampAndSequence() {
    // 拿到当前时间戳 - (2020-05-03 时间戳)的数值,即当前时间相对2020-05-03的时间戳
    long timestamp = getNewestTimestamp();
    // 把时间戳左移12位,后12位流程sequence使用
    long timestampWithSequence = timestamp << sequenceBits;
    // 把混合sequence(默认为0)的时间戳赋值给timestampAndSequence
    this.timestampAndSequence = new AtomicLong(timestampWithSequence);
}

// 获取当前时间戳
private long getNewestTimestamp() {
    //当前时间的时间戳减去2020-05-03的时间戳
    return System.currentTimeMillis() - twepoch;
}

2> initWorkerId(Long)

initWorkerId(Long workerId)方法负责初始化workId,默认不会传过来workerId,如果传过来则使用传过来的workerId,并校验其不能大于1023,然后将其左移53位;

private void initWorkerId(Long workerId) {
   if (workerId == null) {
       // workid为null时,自动生成一个workerId
       workerId = generateWorkerId();
   }
   // workerId最大只能是1023,因为其只占10bit
   if (workerId > maxWorkerId || workerId < 0) {
       String message = String.format("worker Id can't be greater than %d or less than 0", maxWorkerId);
       throw new IllegalArgumentException(message);
   }
   this.workerId = workerId << (timestampBits + sequenceBits);
}

如果没传则基于MAC地址生成;
在这里插入图片描述

如果基于MAC地址生成workerId出现异常,则也1023为基数生成一个随机的workerId;
在这里插入图片描述

最后同样,校验workerId不能大于1023,然后将其左移53位;

5)设置事务会话(SessionHolder)、全局锁(LockManager)的持久化方式并初始化

在这里插入图片描述

1> SessionHolder

SessionHolder负责事务会话Session的持久化,一个session对应一个事务,事务又分为全局事务和分支事务;

SessionHolder支持db,file和redis的持久化方式,其中redis和db支持集群模式,项目上推荐使用redis或db模式;

SessionHolder有五个重要的属性,如下:

// 用于管理所有的Setssion,以及Session的创建、更新、删除等
private static SessionManager ROOT_SESSION_MANAGER;
// 用于管理所有的异步commit的Session,包括创建、更新以及删除
private static SessionManager ASYNC_COMMITTING_SESSION_MANAGER;
// 用于管理所有的重试commit的Session,包括创建、更新以及删除
private static SessionManager RETRY_COMMITTING_SESSION_MANAGER;
// 用于管理所有的重试rollback的Session,包括创建、更新以及删除
private static SessionManager RETRY_ROLLBACKING_SESSION_MANAGER;
// 用于管理分布式锁
private static DistributedLocker DISTRIBUTED_LOCKER;

这五个属性在SessionHolder#init()方法中初始化,init()方法源码如下:

public static void init(String mode) {
    if (StringUtils.isBlank(mode)) {
        mode = CONFIG.getConfig(ConfigurationKeys.STORE_SESSION_MODE,
                CONFIG.getConfig(ConfigurationKeys.STORE_MODE, SERVER_DEFAULT_STORE_MODE));
    }
    StoreMode storeMode = StoreMode.get(mode);
    // 根据storeMode采用SPI机制初始化SessionManager
    // db模式
    if (StoreMode.DB.equals(storeMode)) {
        ROOT_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName());
        ASYNC_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(),
            new Object[]{ASYNC_COMMITTING_SESSION_MANAGER_NAME});
        RETRY_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(),
            new Object[]{RETRY_COMMITTING_SESSION_MANAGER_NAME});
        RETRY_ROLLBACKING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(),
            new Object[]{RETRY_ROLLBACKING_SESSION_MANAGER_NAME});

        DISTRIBUTED_LOCKER = DistributedLockerFactory.getDistributedLocker(StoreMode.DB.getName());
    } else if (StoreMode.FILE.equals(storeMode)) {
        // 文件模式
        String sessionStorePath = CONFIG.getConfig(ConfigurationKeys.STORE_FILE_DIR,
                DEFAULT_SESSION_STORE_FILE_DIR);
        if (StringUtils.isBlank(sessionStorePath)) {
            throw new StoreException("the {store.file.dir} is empty.");
        }
        ROOT_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.FILE.getName(),
            new Object[]{ROOT_SESSION_MANAGER_NAME, sessionStorePath});
        ASYNC_COMMITTING_SESSION_MANAGER = ROOT_SESSION_MANAGER;
        RETRY_COMMITTING_SESSION_MANAGER = ROOT_SESSION_MANAGER;
        RETRY_ROLLBACKING_SESSION_MANAGER = ROOT_SESSION_MANAGER;

        DISTRIBUTED_LOCKER = DistributedLockerFactory.getDistributedLocker(StoreMode.FILE.getName());
    } else if (StoreMode.REDIS.equals(storeMode)) {
        // redis模式
        ROOT_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.REDIS.getName());
        ASYNC_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class,
            StoreMode.REDIS.getName(), new Object[]{ASYNC_COMMITTING_SESSION_MANAGER_NAME});
        RETRY_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class,
            StoreMode.REDIS.getName(), new Object[]{RETRY_COMMITTING_SESSION_MANAGER_NAME});
        RETRY_ROLLBACKING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class,
            StoreMode.REDIS.getName(), new Object[]{RETRY_ROLLBACKING_SESSION_MANAGER_NAME});

        DISTRIBUTED_LOCKER = DistributedLockerFactory.getDistributedLocker(StoreMode.REDIS.getName());
    } else {
        // unknown store
        throw new IllegalArgumentException("unknown store mode:" + mode);
    }
    // 根据storeMode重新加载
    reload(storeMode);
}

init()方法中根据storeMode采用SPI机制初始化SessionManager,SessionManager有三个实现类:
在这里插入图片描述

2> LockerManager

在这里插入图片描述

SessionHolder一样,LockManagerFactory#init()方法同样根据storeMode采用SPI机制初始化LockManager,LockManager有三个实现类:

在这里插入图片描述

6)创建并初始化事务协调器(DefaultCoordinator

DefaultCoordinator是事务协调的核心,比如:开启、提交、回滚全局事务,注册、提交、回滚分支事务都是通过DefaultCoordinator进行协调处理的。

在这里插入图片描述
(1)先来看DefaultCoordinator的创建;
在这里插入图片描述

使用Double Check Lock(DCL-双重检查锁)机制获取到单例的DefaultCoordinator;如果DefaultCoordinator为实例化过,则new一个:
在这里插入图片描述

DefaultCoordinator的类构造器中,首先绑定远程通信的Server的具体实现到内部成员中,然后实例化一个DefaultCore,DefaultCore是AT、TCC、XA、Saga四种分布式事务模式的具体实现类;
在这里插入图片描述

DefaultCore的类构造器中首先通过SPI机制加载出所有的AbstractCore的子类,一共有四个:ATCore、TccCore、SagaCore、XACore;然后将AbstractCore子类可以处理的事务模式作为Key、AbstractCore子类作为Value存储到一个缓存Map(Map<BranchType, AbstractCore> coreMap)中;

private static Map<BranchType, AbstractCore> coreMap = new ConcurrentHashMap<>();

后续通过BranchType(分支类型)就可以从coreMap中获取到相应事务模式的具体AbstractCore实现类。

(2)初始化DefaultCoordinator;

在这里插入图片描述

所谓的初始化,其实就是后台启动一堆线程做定时任务;去定时处理重试回滚、重试提交、异步提交、超时的检测,以及定时清理undo_log。

除定时清理undo_log外,其余定时任务的处理逻辑基本都是:

  1. 首先获取所有可回滚的全局事务会话Session,如果可回滚的分支事务为空,则直接返回;
  2. 否者,遍历所有的可回滚Session;为了防止重复回滚,如果session的状态是正在回滚中并且session不是死亡的,则直接返回;
  3. 如果Session重试回滚超时,从缓存中删除已经超时的回滚Session;
  4. 发布session回滚完成事件给到Metric,对回滚中的Session添加Session生命周期的监听;
  5. 使用DefaultCoordinator组合的DefaultCore执行全局回滚。

以处理重试回滚的方法handleRetryRollbacking()为例:

protected void handleRetryRollbacking() {
    SessionCondition sessionCondition = new SessionCondition(rollbackingStatuses);
    sessionCondition.setLazyLoadBranch(true);
    // 获取所有的可回滚的全局事务session
    Collection<GlobalSession> rollbackingSessions =
        SessionHolder.getRetryRollbackingSessionManager().findGlobalSessions(sessionCondition);
    // 如果可回滚的分支事务为空,则直接返回
    if (CollectionUtils.isEmpty(rollbackingSessions)) {
        return;
    }
    long now = System.currentTimeMillis();
    // 遍历所有的可回滚Session,
    SessionHelper.forEach(rollbackingSessions, rollbackingSession -> {
        try {
            // prevent repeated rollback
            // 防止重复回滚:如果session的状态是正在回滚中并且session不是死亡的,则直接返回。
            if (rollbackingSession.getStatus().equals(GlobalStatus.Rollbacking)
                && !rollbackingSession.isDeadSession()) {
                // The function of this 'return' is 'continue'.
                return;
            }
            // 判断回滚是否重试超时
            if (isRetryTimeout(now, MAX_ROLLBACK_RETRY_TIMEOUT.toMillis(), rollbackingSession.getBeginTime())) {
                if (ROLLBACK_RETRY_TIMEOUT_UNLOCK_ENABLE) {
                    rollbackingSession.clean();
                }
                // Prevent thread safety issues
                // 删除已经超时的回滚Session
                SessionHolder.getRetryRollbackingSessionManager().removeGlobalSession(rollbackingSession);
                LOGGER.error("Global transaction rollback retry timeout and has removed [{}]", rollbackingSession.getXid());

                SessionHelper.endRollbackFailed(rollbackingSession, true);

                // rollback retry timeout event
                // 发布session回滚完成事件给到Metric
                MetricsPublisher.postSessionDoneEvent(rollbackingSession, GlobalStatus.RollbackRetryTimeout, true, false);

                //The function of this 'return' is 'continue'.
                return;
            }
            // 对回滚中的Session添加Session生命周期的监听
            rollbackingSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
            // 使用DefaultCoordinator组合的DefaultCore执行全局回滚
            core.doGlobalRollback(rollbackingSession, true);
        } catch (TransactionException ex) {
            LOGGER.info("Failed to retry rollbacking [{}] {} {}", rollbackingSession.getXid(), ex.getCode(), ex.getMessage());
        }
    });
}

7)注册ServerRunner销毁(Spring容器销毁)的回调钩子函数DefaultCoordinator

8)启动NettyServer(NettyRemotingServer)

启动NettyRemotingServer时会做两件事:注册消息处理器、初始化并启动NettyServerBootstrap
在这里插入图片描述

1> 首先注册消息处理器

消息处理器是用来处理消息的,其根据消息的不同类型选择不同的消息处理器来处理消息(属于典型的策略模式);

在这里插入图片描述

每个消息类型和对应的处理器关系如下:

所谓的注册消息处理器本质上就是将处理器RemotingProcessor和处理消息的线程池ExecutorService包装成一个Pair,然后将Pair作为Value,messageType作为key放入一个Map(processorTable)中;
在这里插入图片描述

/**
 * This container holds all processors.
 * processor type {@link MessageType}
 */
protected final HashMap<Integer/*MessageType*/, Pair<RemotingProcessor, ExecutorService>> processorTable = new HashMap<>(32);

2> 初始化NettyRemotingServer

在初始化NettyRemotingServer之前会通过AtomicBoolean类型的原子变量initialized + CAS操作确保仅会有一个线程进行NettyRemotingServer的初始化;

在这里插入图片描述

再看NettyRemotingServer的类继承图:
在这里插入图片描述

CAS成功后进入到NettyRemotingServer的父类AbstractNettyRemotingServer#init()方法;

在这里插入图片描述

方法中:

(1)首先调用父类AbstractNettyRemoting的init()方法:
在这里插入图片描述
启动一个延时3s,每3s执行一次的定时任务,做请求超时检查;

(2)紧接着启动ServerBootstrap(就正常的nettyServer启动):

在这里插入图片描述
NettyRemotingServer在启动的过程中设置了4个ChannelHandler:

  1. IdleStateHandler:处理心跳
  2. ProtocolV1Decoder:消息解码器
  3. ProtocolV1Encoder:消息编码器
  4. AbstractNettyRemotingServer.ServerHandler:处理各种消息
AbstractNettyRemotingServer.ServerHandler类

在这里插入图片描述
ServerHandler类上有个@ChannelHandler.Sharable注解,其表示所有的连接都会共用这一个ChannelHandler;所以当消息处理很慢时,会降低并发。

processMessage(ctx, (RpcMessage) msg)方法中会根据消息类型获取到 请求处理组件(消息的处理过程是典型的策略模式),如果消息对应的处理器设置了线程池,则放到线程池中执行;如果对应的处理器没有设置线程池,则直接执行;如果某条消息处理特别慢,会严重影响并发;所以在seata-server中大部分处理器都有对应的线程池。

/**
 * Rpc message processing.
 *
 * @param ctx        Channel handler context.
 * @param rpcMessage rpc message.
 * @throws Exception throws exception process message error.
 * @since 1.3.0
 */
protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
    if (LOGGER.isDebugEnabled()) {
        LOGGER.debug(String.format("%s msgId:%s, body:%s", this, rpcMessage.getId(), rpcMessage.getBody()));
    }
    Object body = rpcMessage.getBody();
    if (body instanceof MessageTypeAware) {
        MessageTypeAware messageTypeAware = (MessageTypeAware) body;
        // 根据消息的类型获取到请求处理组件和请求处理线程池组成的Pair
        final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());
        if (pair != null) {
            // 如果消息对应的处理器设置了线程池,则放到线程池中执行
            if (pair.getSecond() != null) {
                try {
                    pair.getSecond().execute(() -> {
                        try {
                            pair.getFirst().process(ctx, rpcMessage);
                        } catch (Throwable th) {
                            LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
                        } finally {
                            MDC.clear();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    // 线程池拒绝策略之一,抛出异常:RejectedExecutionException
                    LOGGER.error(FrameworkErrorCode.ThreadPoolFull.getErrCode(),
                        "thread pool is full, current max pool size is " + messageExecutor.getActiveCount());
                    if (allowDumpStack) {
                        String name = ManagementFactory.getRuntimeMXBean().getName();
                        String pid = name.split("@")[0];
                        long idx = System.currentTimeMillis();
                        try {
                            String jstackFile = idx + ".log";
                            LOGGER.info("jstack command will dump to " + jstackFile);
                            Runtime.getRuntime().exec(String.format("jstack %s > %s", pid, jstackFile));
                        } catch (IOException exx) {
                            LOGGER.error(exx.getMessage());
                        }
                        allowDumpStack = false;
                    }
                }
            } else {
                // 对应的处理器没有设置线程池,则直接执行;如果某条消息处理特别慢,会严重影响并发;
                try {
                    pair.getFirst().process(ctx, rpcMessage);
                } catch (Throwable th) {
                    LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
                }
            }
        } else {
            LOGGER.error("This message type [{}] has no processor.", messageTypeAware.getTypeCode());
        }
    } else {
        LOGGER.error("This rpcMessage body[{}] is not MessageTypeAware type.", body);
    }
}

三、总结和后续

本文我们聊了Seata Server启动时都做了哪些事?博主总结一共八件事:

  1. 对配置文件(包括registry.conf、file.conf)做参数解析;
  2. 初始化监控,做metric指标采集;
  3. 创建TC与RM/TM通信的RPC服务器(NettyRemotingServer)--netty;
  4. 初始化UUID生成器(雪花算法),用于生成全局事务id和分支事务id;
  5. 设置事务会话(SessionHolder)、全局锁(LockManager)的持久化方式并初始化,有三种类型可选:file/db/redis;
  6. 创建并初始化事务协调器(DefaultCoordinator),后台启动一堆线程做定时任务,并将DefaultCoordinator绑定到RPC服务器上做为transactionMessageHandler
  7. 注册ServerRunner销毁(Spring容器销毁)的回调钩子函数DefaultCoordinator;
  8. 启动netty Server,用于接收TM/RM的请求;

下一篇文章我们聊一下Seata Client(AT模式下仅作为RM时)启动时都做了什么?

相关文章
|
16天前
|
设计模式 Java API
微服务架构演变与架构设计深度解析
【11月更文挑战第14天】在当今的IT行业中,微服务架构已经成为构建大型、复杂系统的重要范式。本文将从微服务架构的背景、业务场景、功能点、底层原理、实战、设计模式等多个方面进行深度解析,并结合京东电商的案例,探讨微服务架构在实际应用中的实施与效果。
65 6
|
16天前
|
设计模式 Java API
微服务架构演变与架构设计深度解析
【11月更文挑战第14天】在当今的IT行业中,微服务架构已经成为构建大型、复杂系统的重要范式。本文将从微服务架构的背景、业务场景、功能点、底层原理、实战、设计模式等多个方面进行深度解析,并结合京东电商的案例,探讨微服务架构在实际应用中的实施与效果。
30 1
|
1月前
|
存储 缓存 算法
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
73 3
|
1月前
|
XML JSON API
ServiceStack:不仅仅是一个高性能Web API和微服务框架,更是一站式解决方案——深入解析其多协议支持及简便开发流程,带您体验前所未有的.NET开发效率革命
【10月更文挑战第9天】ServiceStack 是一个高性能的 Web API 和微服务框架,支持 JSON、XML、CSV 等多种数据格式。它简化了 .NET 应用的开发流程,提供了直观的 RESTful 服务构建方式。ServiceStack 支持高并发请求和复杂业务逻辑,安装简单,通过 NuGet 包管理器即可快速集成。示例代码展示了如何创建一个返回当前日期的简单服务,包括定义请求和响应 DTO、实现服务逻辑、配置路由和宿主。ServiceStack 还支持 WebSocket、SignalR 等实时通信协议,具备自动验证、自动过滤器等丰富功能,适合快速搭建高性能、可扩展的服务端应用。
113 3
|
2月前
|
Java 对象存储 开发者
解析Spring Cloud与Netflix OSS:微服务架构中的左右手如何协同作战
Spring Cloud与Netflix OSS不仅是现代微服务架构中不可或缺的一部分,它们还通过不断的技术创新和社区贡献推动了整个行业的发展。无论是对于初创企业还是大型组织来说,掌握并合理运用这两套工具,都能极大地提升软件系统的灵活性、可扩展性以及整体性能。随着云计算和容器化技术的进一步普及,Spring Cloud与Netflix OSS将继续引领微服务技术的发展潮流。
61 0
|
1月前
|
监控 安全 Java
构建高效后端服务:微服务架构深度解析与最佳实践###
【10月更文挑战第19天】 在数字化转型加速的今天,企业对后端服务的响应速度、可扩展性和灵活性提出了更高要求。本文探讨了微服务架构作为解决方案,通过分析传统单体架构面临的挑战,深入剖析微服务的核心优势、关键组件及设计原则。我们将从实际案例入手,揭示成功实施微服务的策略与常见陷阱,为开发者和企业提供可操作的指导建议。本文目的是帮助读者理解如何利用微服务架构提升后端服务的整体效能,实现业务快速迭代与创新。 ###
63 2
|
1月前
|
消息中间件 中间件 数据库
NServiceBus:打造企业级服务总线的利器——深度解析这一面向消息中间件如何革新分布式应用开发与提升系统可靠性
【10月更文挑战第9天】NServiceBus 是一个面向消息的中间件,专为构建分布式应用程序设计,特别适用于企业级服务总线(ESB)。它通过消息队列实现服务间的解耦,提高系统的可扩展性和容错性。在 .NET 生态中,NServiceBus 提供了强大的功能,支持多种传输方式如 RabbitMQ 和 Azure Service Bus。通过异步消息传递模式,各组件可以独立运作,即使某部分出现故障也不会影响整体系统。 示例代码展示了如何使用 NServiceBus 发送和接收消息,简化了系统的设计和维护。
49 3
|
1月前
|
缓存 NoSQL Ubuntu
大数据-39 Redis 高并发分布式缓存 Ubuntu源码编译安装 云服务器 启动并测试 redis-server redis-cli
大数据-39 Redis 高并发分布式缓存 Ubuntu源码编译安装 云服务器 启动并测试 redis-server redis-cli
56 3
|
2月前
|
SQL NoSQL 数据库
SpringCloud基础6——分布式事务,Seata
分布式事务、ACID原则、CAP定理、Seata、Seata的四种分布式方案:XA、AT、TCC、SAGA模式
SpringCloud基础6——分布式事务,Seata
|
1月前
|
存储 Kubernetes 监控
深度解析Kubernetes在微服务架构中的应用与优化
【10月更文挑战第18天】深度解析Kubernetes在微服务架构中的应用与优化
111 0

推荐镜像

更多
下一篇
无影云桌面