Seata Transaction Coordinator

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
应用型负载均衡 ALB,每月750个小时 15LCU
简介: 前面,我们已经介绍了 Seata 的整体设计思想,接下来我们深入到其实现细节中,本文介绍 Seata 中最核心的模块 Transaction Coordinator 的实现。

引言

前面,我们已经介绍了 Seata 的整体设计思想,接下来我们深入到其实现细节中,本文介绍 Seata 中最核心的模块 Transaction Coordinator 的实现,其他 Seata 相关文章均收录于 <Seata系列文章>中。

TC

seata-server
Transaction Coordinator 整体的模块图如上所示:

  • Coordinator Core: 在最下面的模块是事务协调器核心代码,主要用来处理事务协调的逻辑,如分支的注册, commit, rollback 等协调活动。
  • Store: 存储模块,用来将我们的数据持久化,防止重启或者宕机数据丢失。
  • Discover: 服务注册/发现模块,用于将 Server 地址暴露给我们 Client。
  • Config: 用来存储和查找我们服务端的配置。
  • Lock: 锁模块,用于给 Seata 提供全局锁的功能。
  • Rpc: 用于和其他端通信。
  • HA-Cluster: 高可用集群,目前还没开源。为 Seata 提供可靠的高可用功能。

Discover

首先来讲讲比较基础的 Discover 模块,又称服务注册/发现模块。我们将 TC 启动之后,需要将自己的地址暴露给其他使用者 TM & RM, 这部分工作就是由 Discover 模块实现的。

public interface RegistryService<T> {

    /**
     * The constant PREFIX_SERVICE_MAPPING.
     */
    String PREFIX_SERVICE_MAPPING = "vgroup_mapping.";
    /**
     * The constant PREFIX_SERVICE_ROOT.
     */
    String PREFIX_SERVICE_ROOT = "service";
    /**
     * The constant CONFIG_SPLIT_CHAR.
     */
    String CONFIG_SPLIT_CHAR = ".";

    /**
     * Register.
     *
     * @param address the address
     * @throws Exception the exception
     */
    void register(InetSocketAddress address) throws Exception;

    /**
     * Unregister.
     *
     * @param address the address
     * @throws Exception the exception
     */
    void unregister(InetSocketAddress address) throws Exception;

    /**
     * Subscribe.
     *
     * @param cluster  the cluster
     * @param listener the listener
     * @throws Exception the exception
     */
    void subscribe(String cluster, T listener) throws Exception;

    /**
     * Unsubscribe.
     *
     * @param cluster  the cluster
     * @param listener the listener
     * @throws Exception the exception
     */
    void unsubscribe(String cluster, T listener) throws Exception;

    /**
     * Lookup list.
     *
     * @param key the key
     * @return the list
     * @throws Exception the exception
     */
    List<InetSocketAddress> lookup(String key) throws Exception;

    /**
     * Close.
     * @throws Exception
     */
    void close() throws Exception;
}

这个模块有个核心接口 RegistryService,如上图所示:

  • register:TC 使用,进行服务注册。
  • unregister:TC 使用,一般在JVM关闭钩子,ShutdownHook中调用。
  • subscribe:TM RM 使用,注册监听事件,用来监听地址的变化。
  • unsubscribe:TM RM 使用,取消注册监听事件, 一般在JVM关闭钩子,ShutdownHook中调用。
  • lookup:TM RM使用,根据key查找服务地址列表。
  • close:都可以使用,用于关闭Register资源。

如果需要添加自己定义的服务注册/发现,那么实现这个接口即可。截止目前在社区的不断开发推动下,已经有七种服务注册/发现,分别是consul,etcd3,sofa,redis, zk, nacos, eruka。下面简单介绍下 redis 的实现:

register

@Override
public void register(InetSocketAddress address) {
    // 校验地址是否合法
    NetUtil.validAddress(address);
    String serverAddr = NetUtil.toStringAddress(address);
    // 获取 Redis 的实例
    Jedis jedis = jedisPool.getResource();
    try {
        // 将地址注册到当前 Redis 上面。
        jedis.hset(getRedisRegistryKey(), serverAddr, ManagementFactory.getRuntimeMXBean().getName());
        // 发送注册成功的通知
        jedis.publish(getRedisRegistryKey(), serverAddr + "-" + RedisListener.REGISTER);
    } finally {
        jedis.close();
    }
}

流程如下:

  1. 校验地址是否合法
  2. 获取 Redis 的实例,然后将地址注册到当前 Redis 上面。
  3. 发送注册成功的通知

unregister接口类似,就是反方向操作, 这里不做详解。

lookup

@Override
public List<InetSocketAddress> lookup(String key) {
    Configuration config = ConfigurationFactory.getInstance();
    // 获取当前 clusterName 名字
    String clusterName = config.getConfig(PREFIX_SERVICE_ROOT + CONFIG_SPLIT_CHAR + PREFIX_SERVICE_MAPPING + key);
    if (null == clusterName) {
        return null;
    }
    // 判断当前 cluster 是否已经获取过了,如果获取过就从map中取
    if (!LISTENER_SERVICE_MAP.containsKey(clusterName)) {
        Jedis jedis = jedisPool.getResource();
        Map<String, String> instances = null;
        // 从 Redis 拿到地址数据,将其转换成我们所需要的
        try {
            instances = jedis.hgetAll(getRedisRegistryKey());
        } finally {
            jedis.close();
        }
        if (null != instances && !instances.isEmpty()) {
            Set<InetSocketAddress> newAddressSet = new HashSet<>();
            for (Map.Entry<String, String> instance : instances.entrySet()) {
                String serverAddr = instance.getKey();
                newAddressSet.add(NetUtil.toInetSocketAddress(serverAddr));
            }
            CLUSTER_ADDRESS_MAP.put(clusterName, newAddressSet);
        }
        // 将数据变更的 Listener 注册到 Redis
        subscribe(clusterName, new RedisListener() {
            @Override
            public void onEvent(String msg) {
                String[] msgr = msg.split("-");
                String serverAddr = msgr[0];
                String eventType = msgr[1];
                switch (eventType) {
                    case RedisListener.REGISTER:
                        CLUSTER_ADDRESS_MAP.get(clusterName).add(NetUtil.toInetSocketAddress(serverAddr));
                        break;
                    case RedisListener.UN_REGISTER:
                        CLUSTER_ADDRESS_MAP.get(clusterName).remove(NetUtil.toInetSocketAddress(serverAddr));
                        break;
                    default:
                        throw new ShouldNeverHappenException("unknown redis msg:" + msg);
                }
            }
        });
    }
    return new ArrayList<>(CLUSTER_ADDRESS_MAP.get(clusterName));
}

订阅的过程如下:

  1. 获取当前 clusterName 名字
  2. 判断当前 cluster 是否已经获取过了,如果获取过就从map中取。
  3. 从 Redis 拿到地址数据,将其转换成我们所需要的数据。
  4. 将数据变动的 Listener 注册到 Redis

其实这里面有个问题, 如果获取了服务器列表, 但是还没来得及注册订阅时, 发生了服务器列表变化, 那么客户端会感知不到, 但是这个问题在 Redis 中确实没有什么好的办法解决, 毕竟 Redis 没有提供机制来解决这个问题。但是 etcd3 中是有机制来解决的, 获取数据时能拿到当时的版本号, 然后订阅时从该版本号开始即可。然后我看了一下基于 etcd3 的 RegistryService 实现, 发现它并没有使用该机制。于是我提了一个 issuePR, 感兴趣的同学可以去看一下。

subscribe

@Override
public void subscribe(String cluster, RedisListener listener) {
    // 存储该 listener
    String redisRegistryKey = REDIS_FILEKEY_PREFIX + cluster;
    LISTENER_SERVICE_MAP.putIfAbsent(cluster, new ArrayList<>());
    LISTENER_SERVICE_MAP.get(cluster).add(listener);
    threadPoolExecutor.submit(new Runnable() {
        @Override
        public void run() {
            try {
                Jedis jedis = jedisPool.getResource();
                try {
                    // 向 Redis 注册
                    jedis.subscribe(new NotifySub(LISTENER_SERVICE_MAP.get(cluster)), redisRegistryKey);
                } finally {
                    jedis.close();
                }
            } catch (Exception e) {
                LOGGER.error(e.getMessage(), e);
            }
        }
    });
}

流程如下:

  1. 存储该 listener
  2. 向 Redis 注册

RegistryService 的主要功能就这些了, TM 和 TC 是通过 lookup 找到服务器列表之后, 会根据设定的负载均衡策略请求 TC, 接下来我们看一看 loadbalance。

loadbalance
public interface LoadBalance {

    /**
     * Select t.
     *
     * @param <T>      the type parameter
     * @param invokers the invokers
     * @return the t
     * @throws Exception the exception
     */
    <T> T select(List<T> invokers) throws Exception;
}

这个接口的实现比较简单, 目前就只有随机和轮训。

Config

配置模块也是一个比较基础,比较简单的模块。我们需要配置一些常用的参数比如:Netty的select线程数量,work线程数量,session允许最大为多少等等,当然这些参数在 Seata 中都有自己的默认设置。

同样的在 Seata 中也提供了一个接口 Configuration,通过它来存取配置内容:

public interface Configuration<T> {
    // 这里只保留了 getShort 其他都类似
    /**
     * Gets short.
     *
     * @param dataId       the data id
     * @param defaultValue the default value
     * @param timeoutMills the timeout mills
     * @return the short
     */
    short getShort(String dataId, int defaultValue, long timeoutMills);

    /**
     * Gets short.
     *
     * @param dataId       the data id
     * @param defaultValue the default value
     * @return the int
     */
    short getShort(String dataId, short defaultValue);

    /**
     * Gets short.
     *
     * @param dataId the data id
     * @return the int
     */
    short getShort(String dataId);

    /**
     * Gets config.
     *
     * @param dataId       the data id
     * @param defaultValue the default value
     * @param timeoutMills the timeout mills
     * @return the config
     */
    String getConfig(String dataId, String defaultValue, long timeoutMills);

    /**
     * Gets config.
     *
     * @param dataId       the data id
     * @param defaultValue the default value
     * @return the config
     */
    String getConfig(String dataId, String defaultValue);

    /**
     * Gets config.
     *
     * @param dataId       the data id
     * @param timeoutMills the timeout mills
     * @return the config
     */
    String getConfig(String dataId, long timeoutMills);

    /**
     * Gets config.
     *
     * @param dataId the data id
     * @return the config
     */
    String getConfig(String dataId);

    /**
     * Put config boolean.
     *
     * @param dataId       the data id
     * @param content      the content
     * @param timeoutMills the timeout mills
     * @return the boolean
     */
    boolean putConfig(String dataId, String content, long timeoutMills);

    /**
     * Put config boolean.
     *
     * @param dataId  the data id
     * @param content the content
     * @return the boolean
     */
    boolean putConfig(String dataId, String content);

    /**
     * Put config if absent boolean.
     *
     * @param dataId       the data id
     * @param content      the content
     * @param timeoutMills the timeout mills
     * @return the boolean
     */
    boolean putConfigIfAbsent(String dataId, String content, long timeoutMills);

    /**
     * Put config if absent boolean.
     *
     * @param dataId  the data id
     * @param content the content
     * @return the boolean
     */
    boolean putConfigIfAbsent(String dataId, String content);

    /**
     * Remove config boolean.
     *
     * @param dataId       the data id
     * @param timeoutMills the timeout mills
     * @return the boolean
     */
    boolean removeConfig(String dataId, long timeoutMills);

    /**
     * Remove config boolean.
     *
     * @param dataId the data id
     * @return the boolean
     */
    boolean removeConfig(String dataId);

    /**
     * Add config listener.
     *
     * @param dataId   the data id
     * @param listener the listener
     */
    void addConfigListener(String dataId, T listener);

    /**
     * Remove config listener.
     *
     * @param dataId   the data id
     * @param listener the listener
     */
    void removeConfigListener(String dataId, T listener);

    /**
     * Gets config listeners.
     *
     * @param dataId the data id
     * @return the config listeners
     */
    List<T> getConfigListeners(String dataId);

    /**
     * Gets config from sys pro.
     *
     * @param dataId the data id
     * @return the config from sys pro
     */
    default String getConfigFromSysPro(String dataId) {
        return System.getProperty(dataId);
    }

}
  • getShort/getInt/Long/Boolean/Config():通过dataId来获取对应的值。
  • putConfig:用于添加配置。
  • removeConfig:删除一个配置。
  • add/remove/get ConfigListener:添加/删除/获取 配置监听器,一般用来监听配置的变更。

目前为止有四种方式获取 Config:File(文件获取), Nacos, Apollo, ZK,etcd。在 Seata 中首先现在项目 resources 下保存一个 registry.conf 文件,在该文件中配置具体使用 Config 接口哪个实现类。

// registry.conf 相关内容
config {
  # file、nacos 、apollo、zk、consul
  type = "file"

  file {
    name = "file.conf"
  }
}

config 相关的内容我们就不多描述了, 就是简单地存取数据, 发生变化时通知各个节点进行改变。

Store

存储层的实现对于 Seata 是否高性能,是否可靠非常关键。

如果存储层没有实现好,那么如果发生宕机,在 TC 中正在进行分布式事务处理的数据将会被丢失,既然使用了分布式事务,那么其肯定不能容忍丢失。如果存储层实现好了,但是其性能有很大问题,RM 可能会发生频繁回滚那么其完全无法应对高并发的场景。

在 Seata 中默认提供了文件方式的存储,下面我们定义我们存储的数据为 Session,而我们的TM创造的全局事务数据叫 GlobalSession,RM 创造的分支事务叫 BranchSession,一个 GlobalSession 可以拥有多个 BranchSession。我们的目的就是要将这么多 Session 存储下来。

Seata 中目前有 2 种实现方案, 一种是基于文件的, 一种是基于 DB 的, 我们接下来会分别介绍。

File

基于文件的实现是 FileTransactionStoreManager, 它可以使用同步刷盘或异步刷盘的策略,每当有 Session 的状态的更新时,它都会将变化的内容存储起来。为了防止存储文件的无限增殖,当达到一定条件时,它会另打开一个文件从头开始记录,并将之前的文件保存起来。这里有一个非常巧妙的设计,就是该方案既能保证所有超时事务不丢,只有已完成的事务被清除,同时文件的大小也得到了控制。我们会结合代码来介绍 Seata 是如何做到的。

@Override
public boolean writeSession(LogOperation logOperation, SessionStorable session) {
    // 靠锁保证安全
    writeSessionLock.lock();
    long curFileTrxNum;
    try {
        // 实际的写数据过程,将编码后的比特数组写入 FileChannel
        if (!writeDataFile(new TransactionWriteStore(session, logOperation).encode())) {
            return false;
        }
        lastModifiedTime = System.currentTimeMillis();
        curFileTrxNum = FILE_TRX_NUM.incrementAndGet();
        // 如果当前事务存储文件已经累计记录一定数量的事务,并且该文件使用时间达标,则进行当前文件的保存和新文件的创建
        if (curFileTrxNum % PER_FILE_BLOCK_SIZE == 0 &&
            (System.currentTimeMillis() - trxStartTimeMills) > MAX_TRX_TIMEOUT_MILLS) {
            return saveHistory();
        }
    } catch (Exception exx) {
        LOGGER.error("writeSession error," + exx.getMessage());
        return false;
    } finally {
        writeSessionLock.unlock();
    }
    // 实际刷盘过程,根据配置,可以是同步也可以是异步
    flushDisk(curFileTrxNum, currFileChannel);
    return true;
}

上面的代码,就是存储 Session 的入口,其中 logOperation 可以是增加、删除、更新,session 可以是 GlobalSession,也可以是 BranchSession。其中就 3 个关键函数,writeDataFilesaveHistoryflushDisk,我们分别介绍一下它们。

private boolean writeDataFile(byte[] bs) {
    if (bs == null || bs.length >= Integer.MAX_VALUE - 3) {
        return false;
    }
    ByteBuffer byteBuffer = null;

    // 有一个默认缓存,如果该缓存太小,则临时申请
    if (bs.length + 4 > MAX_WRITE_BUFFER_SIZE) {
        //allocateNew
        byteBuffer = ByteBuffer.allocateDirect(bs.length + 4);
    } else {
        byteBuffer = writeBuffer;
        //recycle
        byteBuffer.clear();
    }

    byteBuffer.putInt(bs.length);
    byteBuffer.put(bs);
    return writeDataFileByBuffer(byteBuffer);
}

private boolean writeDataFileByBuffer(ByteBuffer byteBuffer) {
    byteBuffer.flip();
    for (int retry = 0; retry < MAX_WRITE_RETRY; retry++) {
        try {
            // 循环写入
            while (byteBuffer.hasRemaining()) {
                currFileChannel.write(byteBuffer);
            }
            return true;
        } catch (IOException exx) {
            LOGGER.error("write data file error:" + exx.getMessage());
        }
    }
    LOGGER.error("write dataFile failed,retry more than :" + MAX_WRITE_RETRY);
    return false;
}

writeDataFile 的实现很简单,就是同步写入 FileChannel。接下来看一下最妙的 saveHistory

private boolean saveHistory() throws IOException {
    boolean result;
    try {
        // 找到内存中保存的所有超时的事务,这些事务是需要回滚的,不能清除,但是其他完成的事务是可以清除的,这个保存过程实际上就是将超时事务追加到当前文件的结尾
        result = findTimeoutAndSave();
        // 然后异步关闭文件
        writeDataFileRunnable.putRequest(new CloseFileRequest(currFileChannel, currRaf));
        // 同时,给文件改名为historyFullFileName,替换掉旧的historyFullFile,同一时刻,Seata 只会有 2 个事务存储文件,一个是currentDataFile代表正在使用的文件,一个是historyFullFile,存储了过往的所有可能有用的 Session
        Files.move(currDataFile.toPath(), new File(hisFullFileName).toPath(), StandardCopyOption.REPLACE_EXISTING);
    } catch (IOException exx) {
        LOGGER.error("save history data file error," + exx.getMessage());
        result = false;
    } finally {
        initFile(currFullFileName);
    }
    return result;
}
// 找到所有超时的 Session 存储起来
private boolean findTimeoutAndSave() throws IOException {
    List<GlobalSession> globalSessionsOverMaxTimeout =
        sessionManager.findGlobalSessions(new SessionCondition(MAX_TRX_TIMEOUT_MILLS));
    if (CollectionUtils.isEmpty(globalSessionsOverMaxTimeout)) {
        return true;
    }
    List<byte[]> listBytes = new ArrayList<>();
    int totalSize = 0;
    // 1. find all data and merge
    for (GlobalSession globalSession : globalSessionsOverMaxTimeout) {
        TransactionWriteStore globalWriteStore = new TransactionWriteStore(globalSession, LogOperation.GLOBAL_ADD);
        byte[] data = globalWriteStore.encode();
        listBytes.add(data);
        totalSize += data.length + INT_BYTE_SIZE;
        List<BranchSession> branchSessIonsOverMaXTimeout = globalSession.getSortedBranches();
        if (null != branchSessIonsOverMaXTimeout) {
            for (BranchSession branchSession : branchSessIonsOverMaXTimeout) {
                TransactionWriteStore branchWriteStore =
                    new TransactionWriteStore(branchSession, LogOperation.BRANCH_ADD);
                data = branchWriteStore.encode();
                listBytes.add(data);
                totalSize += data.length + INT_BYTE_SIZE;
            }
        }
    }
    // 2. batch write
    ByteBuffer byteBuffer = ByteBuffer.allocateDirect(totalSize);
    for (byte[] bytes : listBytes) {
        byteBuffer.putInt(bytes.length);
        byteBuffer.put(bytes);
    }
    if (writeDataFileByBuffer(byteBuffer)) {
        currFileChannel.force(false);
        return true;
    }
    return false;
}

现在我们知道 Seata 同时最多有2个存储文件,一个是 currentDataFile 一个是 historyFullFile,currentDataFile 存储了最新的数据,而 historyFullFile 相较于 currentDataFile,还存储了之前过期的所有 Session。任何时候,如果 TC 宕机,重启时只要先读取 historyFullFile,再读取 currentDataFile 就能恢复所有数据。

替换 historyFullFile 时,因为会将所有超时的 Session 信息先写入 currentDataFile,然后才会将 currentDataFile 改名为 historyFullFile 并替换掉之前的 oldHistoryFullFile,这样所有过期 Session 就被延续下去了,实际上 Session 过期时间和新建 currentDataFile 的时间是一致的,都是 30 分钟,这样再进行 historyFullFile 替换时,之前的 oldHistoryFullFile 实际上只会存在超时 Session 和完成的 Session,所有超时 Session 已经被记录在新的 historyFullFile 中了,而完成的 Session 会在替换时,随着 oldHistoryFullFile 一起被删除。这就是为什么我觉得这个地方的设计十分巧妙。

最后刷盘的过程也很简单。根据配置,如果是同步刷盘会用 Future#get 阻塞等待,否则异步进行,writeDataFileRunnable 内部有一个阻塞队列,会有一个线程循环从中提取任务并执行,应该不难理解吧。

private void flushDisk(long curFileNum, FileChannel currFileChannel) {
    if (FLUSH_DISK_MODE == FlushDiskMode.SYNC_MODEL) {
        SyncFlushRequest syncFlushRequest = new SyncFlushRequest(curFileNum, currFileChannel);
        writeDataFileRunnable.putRequest(syncFlushRequest);
        syncFlushRequest.waitForFlush(MAX_WAIT_FOR_FLUSH_TIME_MILLS);
    } else {
        writeDataFileRunnable.putRequest(new AsyncFlushRequest(curFileNum, currFileChannel));
    }
}

DB

接下来,我们看一下基于 DB 的实现。

@Override
public boolean writeSession(LogOperation logOperation, SessionStorable session) {
    if (LogOperation.GLOBAL_ADD.equals(logOperation)) {
        logStore.insertGlobalTransactionDO(convertGlobalTransactionDO(session));
    } else if (LogOperation.GLOBAL_UPDATE.equals(logOperation)) {
        logStore.updateGlobalTransactionDO(convertGlobalTransactionDO(session));
    } else if (LogOperation.GLOBAL_REMOVE.equals(logOperation)) {
        logStore.deleteGlobalTransactionDO(convertGlobalTransactionDO(session));
    } else if (LogOperation.BRANCH_ADD.equals(logOperation)) {
        logStore.insertBranchTransactionDO(convertBranchTransactionDO(session));
    } else if (LogOperation.BRANCH_UPDATE.equals(logOperation)) {
        logStore.updateBranchTransactionDO(convertBranchTransactionDO(session));
    } else if (LogOperation.BRANCH_REMOVE.equals(logOperation)) {
        logStore.deleteBranchTransactionDO(convertBranchTransactionDO(session));
    } else {
        throw new StoreException("Unknown LogOperation:" + logOperation.name());
    }
    return true;
}

基于 DB 的实现相较于基于文件的实现就显得朴实无华,logStore 实际上就是一个 DAO 层的接口,对应了数据的 CRUD,在重启恢复时只不过是按照条件遍历 DB 中的所有数据,进行 Session 恢复。

Lock

大家知道数据库实现隔离级别主要是通过锁来实现的,同样的在分布式事务框架 Seata 中要实现隔离级别也需要通过锁。一般在数据库中数据库的隔离级别一共有四种:读未提交,读已提交,可重复读,串行化。在 Seata 中可以保证写操作的互斥性,而读的隔离级别一般是读未提交,但是提供了达到读已提交隔离的手段。

Lock 模块也就是 Seata 实现隔离级别的核心模块。在 Lock 模块中提供了一个接口用于管理我们的锁:

public interface LockManager {

    /**
     * Acquire lock boolean.
     *
     * @param branchSession the branch session
     * @return the boolean
     * @throws TransactionException the transaction exception
     */
    boolean acquireLock(BranchSession branchSession) throws TransactionException;

    /**
     * Un lock boolean.
     *
     * @param branchSession the branch session
     * @return the boolean
     * @throws TransactionException the transaction exception
     */
    boolean releaseLock(BranchSession branchSession) throws TransactionException;

    /**
     * Un lock boolean.
     *
     * @param globalSession the global session
     * @return the boolean
     * @throws TransactionException the transaction exception
     */
    boolean releaseGlobalSessionLock(GlobalSession globalSession) throws TransactionException;

    /**
     * Is lockable boolean.
     *
     * @param xid        the xid
     * @param resourceId the resource id
     * @param lockKey    the lock key
     * @return the boolean
     * @throws TransactionException the transaction exception
     */
    boolean isLockable(String xid, String resourceId, String lockKey) throws TransactionException;

    /**
     * Clean all locks.
     *
     * @throws TransactionException the transaction exception
     */
    void cleanAllLocks() throws TransactionException;

}
  • acquireLock:用于对我们的 BranchSession 加锁,这里虽然是传的分支事务 Session,实际上是对分支事务操作的数据行加锁,成功返回 true。
  • isLockable:根据事务 ID,资源 ID,锁住的Key来查询是否已经加锁。
  • releaseLock: 释放分支事务的所有锁。
  • releaseGlobalSessionLock: 释放全局事务的所有分支事务的锁。
  • cleanAllLocks:清除所有的锁。

在 Seata 中, LockManager 下层有使用的锁有两种实现, 一种是基于内存的锁(Session 存储模式为 File 时使用), 一种是基于 DB 的(Session 存储模式为 DB 时使用),它们都实现了 Locker 接口:

public interface Locker {

    /**
     * Acquire lock boolean.
     *
     * @param rowLock the row lock
     * @return the boolean
     */
    boolean acquireLock(List<RowLock> rowLock) ;

    /**
     * Un lock boolean.
     *
     * @param rowLock the row lock
     * @return the boolean
     */
    boolean releaseLock(List<RowLock> rowLock);

    /**
     * Is lockable boolean.
     *
     * @param rowLock the row lock
     * @return the boolean
     */
    boolean isLockable(List<RowLock> rowLock);

    /**
     * Clean all locks boolean.
     *
     * @return the boolean
     */
    void cleanAllLocks();
}

我们可以看到, 在 Locker 中将 branchSession 的概念剥离出去了, 只保留了 RowLock 的概念, 责任更加单一, 接下来我们分别看看它的实现类。

MemoryLocker

内存锁的实现全都存在一个锁 Map 中, 它是整个 Locker 的实现核心, 我们先来看一下它的结构:

private static final ConcurrentHashMap<String /* resourceId */,
        ConcurrentHashMap<String /* tableName */,
            ConcurrentHashMap<Integer /* bucketId */,
                ConcurrentHashMap<String /* pk */, Long/* transactionId */>>>>
        LOCK_MAP
        = new ConcurrentHashMap<>();

我们可以看到, 通过这个 Map 将锁的粒度控制的很小, 最外层 Map 的 key 是 resourceId, 也就是对应了一个 RM, 然后第二层 Map 的 key 是表名, 对应了 RM 上操作的一张表, 下一层 Map 的 key 是 BucketID, Seata 根据表主键哈希值进行了分桶, 让冲突的概率降低, 默认有 128 个桶, 最后一层 Map 的 key 才是主键, Value 是持有该主键锁的事务 ID。

明确了锁存储的数据结构后, 再分析加解锁过程就清晰多了:

// MemoryLocker
@Override
public boolean acquireLock(List<RowLock> rowLocks) {
    if (CollectionUtils.isEmpty(rowLocks)) {
        //no lock
        return true;
    }
    String resourceId = branchSession.getResourceId();
    long transactionId = branchSession.getTransactionId();

    ConcurrentHashMap<ConcurrentHashMap<String, Long>, Set<String>> bucketHolder = branchSession.getLockHolder();
    ConcurrentHashMap<String, ConcurrentHashMap<Integer, ConcurrentHashMap<String, Long>>> dbLockMap = LOCK_MAP.get(resourceId);
    // 确认 RM 对应的 Map 是否已经构建
    if (dbLockMap == null) {
        LOCK_MAP.putIfAbsent(resourceId,
            new ConcurrentHashMap<String, ConcurrentHashMap<Integer, ConcurrentHashMap<String, Long>>>());
        dbLockMap = LOCK_MAP.get(resourceId);
    }

    for (RowLock lock : rowLocks) {
        String tableName = lock.getTableName();
        String pk = lock.getPk();
        ConcurrentHashMap<Integer, ConcurrentHashMap<String, Long>> tableLockMap = dbLockMap.get(tableName);
        // 确认表对应的 Map 是否已经构建好
        if (tableLockMap == null) {
            dbLockMap.putIfAbsent(tableName, new ConcurrentHashMap<Integer, ConcurrentHashMap<String, Long>>());
            tableLockMap = dbLockMap.get(tableName);
        }
        int bucketId = pk.hashCode() % BUCKET_PER_TABLE;
        ConcurrentHashMap<String, Long> bucketLockMap = tableLockMap.get(bucketId);
        // 确认 bucket map 是否已经构建好
        if (bucketLockMap == null) {
            tableLockMap.putIfAbsent(bucketId, new ConcurrentHashMap<String, Long>());
            bucketLockMap = tableLockMap.get(bucketId);
        }
        // 实际加锁过程
        Long previousLockTransactionId = bucketLockMap.putIfAbsent(pk, transactionId);
        if (previousLockTransactionId == null) {
            //No existing lock, and now locked by myself
            Set<String> keysInHolder = bucketHolder.get(bucketLockMap);
            if (keysInHolder == null) {
                bucketHolder.putIfAbsent(bucketLockMap, new ConcurrentSet<String>());
                keysInHolder = bucketHolder.get(bucketLockMap);
            }
            keysInHolder.add(pk);
        } else if (previousLockTransactionId == transactionId) {
            // Locked by me before
            continue;
        } else {
            LOGGER.info("Global lock on [" + tableName + ":" + pk + "] is holding by " + previousLockTransactionId);
            try {
                // Release all acquired locks.
                branchSession.unlock();
            } catch (TransactionException e) {
                throw new FrameworkException(e);
            }
            return false;
        }
    }
    return true;
}

我们可以看到, 加锁的过程无非就是确认各级 Map 中是否有自己要的数据, 如果没有就用 putIfAbsent 添加进去, 最后到主键所在的 bucketMap 时, 才是真正加锁并确认的过程:

  1. 使用 putIfAbsent 将自己的 transactionId 填入 bucketLockMap
  2. 如果 previousLockTransactionId 为空, 说明自己获得了锁, 把自己获得的锁记录在 branchSession 中, 方便释放时查找
  3. 如果 previousLockTransactionId 和自己的 transactionId 相同, 说明这个锁之前就被自己持有了, 直接返回即可
  4. 否则, 发生了锁冲突, 释放自己之前获取到的所有锁

其实, 这个实现中原来有一个死锁的 bug, 之前给 bucket Map 的加锁过程, 使用了 Synchronized block, 如果两个分支 Session 要同时锁一个表的相同数据, 并且加锁的顺序不同(BS1: row1, row2, row3; BS2: row3, row2, row1), 就会发生死锁。

导致这个 bug 的原因是Synchronized block 的作用范围有误, 将解锁过程也包在了该代码块中。所以, 当时我就提了 issuepr, 有兴趣的同学可以去看一看。

释放锁的过程就很简单了, 遍历 branchSession 中持有的所有锁, 并依次释放它们。

// MemoryLocker
@Override
public boolean releaseLock(List<RowLock> rowLock) {
    // 取出所有持有的锁
    ConcurrentHashMap<ConcurrentHashMap<String, Long>, Set<String>> lockHolder = branchSession.getLockHolder();
    if (lockHolder == null || lockHolder.size() == 0) {
        return true;
    }
    Iterator<Map.Entry<ConcurrentHashMap<String, Long>, Set<String>>> it = lockHolder.entrySet().iterator();
    // 挨个释放锁
    while (it.hasNext()) {
        Map.Entry<ConcurrentHashMap<String, Long>, Set<String>> entry = it.next();
        ConcurrentHashMap<String, Long> bucket = entry.getKey();
        Set<String> keys = entry.getValue();
        for (String key : keys) {
            // remove lock only if it locked by myself
            bucket.remove(key, branchSession.getTransactionId());
        }
    }
    lockHolder.clear();
    return true;
}

这里大家可能会有疑问, 存在内存中的锁, 如果发生了崩溃, 重启的时候锁不就没了么, 其实 Seata 在重启并恢复 Session 的同时, 也会按顺序恢复各个 Session 的锁, 下面只会展示核心代码。

/**
 * io.seata.server.session.SessionHolder#reload
 */
protected static void reload() {
    // ...
    Collection<GlobalSession> reloadedSessions = ROOT_SESSION_MANAGER.allSessions();
    if (reloadedSessions != null && !reloadedSessions.isEmpty()) {
        reloadedSessions.forEach(globalSession -> {
            GlobalStatus globalStatus = globalSession.getStatus();
            switch (globalStatus) {
                case UnKnown:
                case Committed:
                case CommitFailed:
                case Rollbacked:
                case RollbackFailed:
                case TimeoutRollbacked:
                case TimeoutRollbackFailed:
                case Finished:
                    throw new ShouldNeverHappenException("Reloaded Session should NOT be " + globalStatus);
                case AsyncCommitting:
                    try {
                        // 恢复未完成的异步提交过程
                        globalSession.addSessionLifecycleListener(getAsyncCommittingSessionManager());
                        getAsyncCommittingSessionManager().addGlobalSession(globalSession);
                    } catch (TransactionException e) {
                        throw new ShouldNeverHappenException(e);
                    }
                    break;
                default: {
                    ArrayList<BranchSession> branchSessions = globalSession.getSortedBranches();
                    // Lock, 重新加锁
                    branchSessions.forEach(branchSession -> {
                        try {
                            branchSession.lock();
                        } catch (TransactionException e) {
                            throw new ShouldNeverHappenException(e);
                        }
                    });
                    // ...
                }
            }

        });
    }
}

DataBaseLocker

和 SessionManager 的实现相同, DataBaseLocker 的加锁过程实际上就是一个对 DB 增删数据。因为这部分比较简单, 所以我们只展示加锁的最核心内容:

// LockStoreDataBaseDAO
protected boolean doAcquireLock(Connection conn, LockDO lockDO) {
    PreparedStatement ps = null;
    try {
        //insert
        String insertLockSQL = LockStoreSqls.getInsertLockSQL(lockTable, dbType);
        ps = conn.prepareStatement(insertLockSQL);
        ps.setString(1, lockDO.getXid());
        ps.setLong(2, lockDO.getTransactionId());
        ps.setLong(3, lockDO.getBranchId());
        ps.setString(4, lockDO.getResourceId());
        ps.setString(5, lockDO.getTableName());
        ps.setString(6, lockDO.getPk());
        ps.setString(7, lockDO.getRowKey());
        return ps.executeUpdate() > 0;
    } catch (SQLException e) {
        throw new StoreException(e);
    } finally {
        if (ps != null) {
            try {
                ps.close();
            } catch (SQLException e) {
            }
        }
    }
}

Rpc

保证 Seata 高性能的关键之一也是使用了 Netty 作为 RPC 框架,采用默认配置的线程模型如下图所示:
rpc
如果采用默认的基本配置, 那么会有一个 Acceptor 线程用于处理客户端的链接,会有 cpu*2 数量的 NIO-Thread,在这些 NIO-Thread 线程中不会做业务太重的事情,只会做一些速度比较快的事情,比如编解码,心跳事件,和 TM 注册。一些比较费时间的业务操作将会交给业务线程池,默认情况下业务线程池配置为最小线程为 100,最大为 500。

关于 Netty 的使用基础, 我们这里就不详细介绍了, 简单说就是对于每个连接都会绑定上数据的 handler, 它会按照责任链的原则, 顺着 handler 的绑定顺序, 处理数据, 这里简单看下它都绑定了什么 handler:

// Rpc Server 和 Rpc Client
ch.pipeline()
    .addLast(new IdleStateHandler(nettyServerConfig.getChannelMaxReadIdleSeconds(), 0, 0))
    .addLast(new ProtocolV1Decoder())
    .addLast(new ProtocolV1Encoder())
    .addList(this);

我们可以看到, 它们都绑定了心跳组件 IdleStateHandler, 然后是编解码器, 最后是 Server(TC) 和 Client(TM RM), 它们会拿到原始的请求和回应数据, 据此来进行业务交互。

前面介绍 Discover 模块时, 我们知道 Server 是将自己注册到注册中心, 然后 Client 订阅更新, 并得到 Server 的列表, 最后通过负载均衡选择一个 Server 进行连接。当连接建立成功后, Server 会保存所有的连接, 在需要进行分支回滚和提交时, 从所有 RM 的连接记录中, 找到对应 RM 的所有连接, 它会首先寻找最原始的 RM 节点, 如果该节点宕机了, 它会找到该 RM 的其他节点, 然后发送分支提交请求。

HA-Cluster

尚未实现

Metrics

统计接口目前的实现也很简单, 就是在内存中计数, 然后支持通过 HTTP 获取统计数据,这部分很简单我就不展示了。

Coordinator Core

在 TC 端, 大部分工作都是响应 TM 的请求, 然后发送提交回滚请求给 RM,下达提交或回滚命令, 这些部分我们会在后面的 AT 模式串讲 和 TCC 模式串讲中介绍, 本节主要看一下在 TC 模块中, 自主进行的一些工作。

当 TC 启动时, 先恢复本机的 Session, 然后启动 RPC Server, 最后注册自己的地址到注册中心, 这些我们前面已经介绍过了, 除此之外, TC 还会启动几个后台线程, 这些线程保证了 TC 的协调工作能够在发生错误时, 最终能顺利完成, 我们来看一下这部分的代码:

/**
 * Init.
 */
public void init() {
    retryRollbacking.scheduleAtFixedRate(() -> {
        try {
            handleRetryRollbacking();
        } catch (Exception e) {
            LOGGER.info("Exception retry rollbacking ... ", e);
        }
    }, 0, ROLLBACKING_RETRY_PERIOD, TimeUnit.MILLISECONDS);

    retryCommitting.scheduleAtFixedRate(() -> {
        try {
            handleRetryCommitting();
        } catch (Exception e) {
            LOGGER.info("Exception retry committing ... ", e);
        }
    }, 0, COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);

    asyncCommitting.scheduleAtFixedRate(() -> {
        try {
            handleAsyncCommitting();
        } catch (Exception e) {
            LOGGER.info("Exception async committing ... ", e);
        }
    }, 0, ASYN_COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);

    timeoutCheck.scheduleAtFixedRate(() -> {
        try {
            timeoutCheck();
        } catch (Exception e) {
            LOGGER.info("Exception timeout checking ... ", e);
        }
    }, 0, TIMEOUT_RETRY_PERIOD, TimeUnit.MILLISECONDS);

    undoLogDelete.scheduleAtFixedRate(() -> {
        try {
            undoLogDelete();
        } catch (Exception e) {
            LOGGER.info("Exception undoLog deleting ... ", e);
        }
    }, UNDOLOG_DELAY_DELETE_PERIOD, UNDOLOG_DELETE_PERIOD, TimeUnit.MILLISECONDS);
}

我们可以看到, 这些后台任务分别是回滚重试, 提交重试, 异步提交, 超时检测, 删除没用的 AT 模式 undo log。

文章说明

更多有价值的文章均收录于贝贝猫的文章目录

stun

版权声明: 本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!

创作声明: 本文基于下列所有参考内容进行创作,其中可能涉及复制、修改或者转换,图片均来自网络,如有侵权请联系我,我会第一时间进行删除。

参考内容

[1] fescar锁设计和隔离级别的理解
[2] 分布式事务中间件 Fescar - RM 模块源码解读
[3] Fescar分布式事务实现原理解析探秘
[4] Seata TCC 分布式事务源码分析
[5] 深度剖析一站式分布式事务方案 Seata-Server
[6] 分布式事务 Seata Saga 模式首秀以及三种模式详解
[7] 蚂蚁金服大规模分布式事务实践和开源详解
[8] 分布式事务 Seata TCC 模式深度解析
[9] Fescar (Seata)0.4.0 中文文档教程
[10] Seata Github Wiki
[11] 深度剖析一站式分布式事务方案Seata(Fescar)-Server

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
存储 Dubbo Java
Seata Transaction Manager
前面,我们已经介绍了 Seata 的整体设计思想,接下来我们深入到其实现细节中,本文介绍 Seata 中 Transaction Manager 的实现。
|
8月前
|
Nacos 数据库
分布式事务解决方案Seata
分布式事务解决方案Seata
105 1
|
2月前
|
Java 数据库
在Java中使用Seata框架实现分布式事务的详细步骤
通过以上步骤,利用 Seata 框架可以实现较为简单的分布式事务处理。在实际应用中,还需要根据具体业务需求进行更详细的配置和处理。同时,要注意处理各种异常情况,以确保分布式事务的正确执行。
|
23天前
|
消息中间件 SQL 中间件
大厂都在用的分布式事务方案,Seata+RocketMQ带你打破10万QPS瓶颈
分布式事务涉及跨多个数据库或服务的操作,确保数据一致性。本地事务通过数据库直接支持ACID特性,而分布式事务则需解决跨服务协调难、高并发压力及性能与一致性权衡等问题。常见的解决方案包括两阶段提交(2PC)、Seata提供的AT和TCC模式、以及基于消息队列的最终一致性方案。这些方法各有优劣,适用于不同业务场景,选择合适的方案需综合考虑业务需求、系统规模和技术团队能力。
158 7
|
2月前
|
存储 Java 关系型数据库
在Spring Boot中整合Seata框架实现分布式事务
可以在 Spring Boot 中成功整合 Seata 框架,实现分布式事务的管理和处理。在实际应用中,还需要根据具体的业务需求和技术架构进行进一步的优化和调整。同时,要注意处理各种可能出现的问题,以保障分布式事务的顺利执行。
73 6
|
2月前
|
数据库
如何在Seata框架中配置分布式事务的隔离级别?
总的来说,配置分布式事务的隔离级别是实现分布式事务管理的重要环节之一,需要认真对待和仔细调整,以满足业务的需求和性能要求。你还可以进一步深入研究和实践 Seata 框架的配置和使用,以更好地应对各种分布式事务场景的挑战。
40 6
|
2月前
|
消息中间件 运维 数据库
Seata框架和其他分布式事务框架有什么区别
Seata框架和其他分布式事务框架有什么区别
32 1
|
4月前
|
SQL NoSQL 数据库
SpringCloud基础6——分布式事务,Seata
分布式事务、ACID原则、CAP定理、Seata、Seata的四种分布式方案:XA、AT、TCC、SAGA模式
SpringCloud基础6——分布式事务,Seata
|
8月前
|
存储 关系型数据库 MySQL
基于Seata实现分布式事务
通过以上步骤,你可以使用 Seata 实现分布式事务,确保在微服务架构中的事务一致性。Seata 支持多种语言和框架,能够满足不同业务场景的需求。欢迎关注威哥爱编程,一起学习成长。
189 1
|
5月前
|
关系型数据库 MySQL 数据库
SpringCloud2023中使用Seata解决分布式事务
对于分布式系统而言,需要保证分布式系统中的数据一致性,保证数据在子系统中始终保持一致,避免业务出现问题。分布式系统中对数据的操作要么一起成功,要么一起失败,必须是一个整体性的事务。Seata简化了这个使用过程。
106 2