分布式事务Seata源码解析九:分支事务如何注册到全局事务

本文涉及的产品
全局流量管理 GTM,标准版 1个月
云解析 DNS,旗舰版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介: 分布式事务Seata源码解析九:分支事务如何注册到全局事务

@[TOC]

一、前言

更多内容见Seata专栏:https://blog.csdn.net/saintmm/category_11953405.html

至此,seata系列的内容包括:

  1. can not get cluster name in registry config ‘service.vgroupMapping.xx‘, please make sure registry问题解决
  2. Seata Failed to get available servers: endpoint format should like ip:port 报错原因/解决方案汇总版(看完本文必解决问题)
  3. Seata json decode exception, Cannot construct instance of java.time.LocalDateTime报错原因/解决方案最全汇总版
  4. 【微服务 31】超细的Spring Cloud 整合Seata实现分布式事务(排坑版)
  5. 【微服务 32】Spring Cloud整合Seata、Nacos实现分布式事务案例(巨细排坑版)【云原生】
  6. 【微服务33】分布式事务Seata源码解析一:在IDEA中启动Seata Server
  7. 【微服务34】分布式事务Seata源码解析二:Seata Server启动时都做了什么
  8. 【微服务35】分布式事务Seata源码解析三:从Spring Boot特性来看Seata Client 启动时都做了什么
  9. 【微服务36】分布式事务Seata源码解析四:图解Seata Client 如何与Seata Server建立连接、通信
  10. 【微服务37】分布式事务Seata源码解析五:@GlobalTransactional如何开启全局事务
  11. 【微服务38】分布式事务Seata源码解析六:全局/分支事务分布式ID如何生成?序列号超了怎么办?时钟回拨问题如何处理?
  12. 【微服务39】分布式事务Seata源码解析七:图解Seata事务执行流程之开启全局事务
  13. 分布式事务Seata源码解析八:本地事务执行流程(AT模式下)

Seata最核心的全局事务执行流程,上文我们聊了本地事务是如何执行的?在本地事务执行的过程中涉及到分支事务如何注册到全局事务、undo log的构建,本文我们接着聊分支事务如何注册到全局事务。

二、RM中分支事务注册入口

在上一文(分布式事务Seata源码解析八:本地事务执行流程(AT模式下))中,提到ConnectionProxy#processGlobalTransactionCommit()最终处理本地事务的提交。

在这里插入图片描述

其中register()方法向远程的TC中注册分支事务:

private void register() throws TransactionException {
    if (!context.hasUndoLog() || !context.hasLockKey()) {
        return;
    }

    // 分支事务注册:将事务类型AT、资源ID(资源在前面的流程已经注册过了)、事务xid、全局锁keys作为分支事务信息注册到seata server
    Long branchId = DefaultResourceManager.get().branchRegister(BranchType.AT, getDataSourceProxy().getResourceId(),
            null, context.getXid(), context.getApplicationData(), context.buildLockKeys());
    context.setBranchId(branchId);
}

注册分支事务、获取分支事务ID的入口流程如下:

  1. DefaultResourceManager.get() 获取单例形式的资源管理器DefaultResourceManager,通过其注册分支事务;
  2. 再根据分支类型(AT、TCC、XA、SAGA)获取相应类型的ResourceManager;
    在这里插入图片描述

因为存在四种分布式事务的模式(AT、TCC、XA、SAGA),所以此处也正好对应四种ResourceManager:

在这里插入图片描述

这四种ResourceManager都继承了AbstractResourceManager,并且都没有重写AbstractResourceManagerbranchRegister()方法,所以无论是哪种全局事务模式,分支事务注册到全局事务的方式都一样,都体现在AbstractResourceManagerbranchRegister()方法中;而分支事务的提交和回滚方式却各不相同。

@Override
public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid, String applicationData, String lockKeys) throws TransactionException {
    try {
        BranchRegisterRequest request = new BranchRegisterRequest();
        // xid是全局事务ID
        request.setXid(xid);
        // 分布式事务要更新数据的全局锁keys
        request.setLockKey(lockKeys);
        // 分支事务对应的资源ID
        request.setResourceId(resourceId);
        // 分支事务类型
        request.setBranchType(branchType);
        // 引用的数据
        request.setApplicationData(applicationData);

        // 将请求通过RmNettyRemotingClient发送到seata-server
        BranchRegisterResponse response = (BranchRegisterResponse) RmNettyRemotingClient.getInstance().sendSyncRequest(request);
        if (response.getResultCode() == ResultCode.Failed) {
            throw new RmTransactionException(response.getTransactionExceptionCode(), String.format("Response[ %s ]", response.getMsg()));
        }
        return response.getBranchId();
    } catch (TimeoutException toe) {
        throw new RmTransactionException(TransactionExceptionCode.IO, "RPC Timeout", toe);
    } catch (RuntimeException rex) {
        throw new RmTransactionException(TransactionExceptionCode.BranchRegisterFailed, "Runtime", rex);
    }
}

方法中构建一个BranchRegisterRequest,通过netty将请求发送到TC进行分支事务的注册;

三、TC中处理分支事务注册

【微服务36】分布式事务Seata源码解析四:图解Seata Client 如何与Seata Server建立连接、通信一文中,我们聊了Seata Client 如何和Seata Server建立连接、通信;

又在【微服务34】分布式事务Seata源码解析二:Seata Server启动时都做了什么一文中,我们知道了TC(Seata Server)启动之后,AbstractNettyRemotingServer的内部类ServerHandler负责接收并处理请求。

在这里插入图片描述
ServerHandler类上有个@ChannelHandler.Sharable注解,其表示所有的连接都会共用这一个ChannelHandler;所以当消息处理很慢时,会降低并发。

processMessage(ctx, (RpcMessage) msg)方法中会根据消息类型获取到 请求处理组件(消息的处理过程是典型的策略模式),如果消息对应的处理器设置了线程池,则放到线程池中执行;如果对应的处理器没有设置线程池,则直接执行;如果某条消息处理特别慢,会严重影响并发;所以在seata-server中大部分处理器都有对应的线程池。

/**
 * Rpc message processing.
 *
 * @param ctx        Channel handler context.
 * @param rpcMessage rpc message.
 * @throws Exception throws exception process message error.
 * @since 1.3.0
 */
protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
    if (LOGGER.isDebugEnabled()) {
        LOGGER.debug(String.format("%s msgId:%s, body:%s", this, rpcMessage.getId(), rpcMessage.getBody()));
    }
    Object body = rpcMessage.getBody();
    if (body instanceof MessageTypeAware) {
        MessageTypeAware messageTypeAware = (MessageTypeAware) body;
        // 根据消息的类型获取到请求处理组件和请求处理线程池组成的Pair
        final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());
        if (pair != null) {
            // 如果消息对应的处理器设置了线程池,则放到线程池中执行
            if (pair.getSecond() != null) {
                try {
                    pair.getSecond().execute(() -> {
                        try {
                            pair.getFirst().process(ctx, rpcMessage);
                        } catch (Throwable th) {
                            LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
                        } finally {
                            MDC.clear();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    // 线程池拒绝策略之一,抛出异常:RejectedExecutionException
                    LOGGER.error(FrameworkErrorCode.ThreadPoolFull.getErrCode(),
                        "thread pool is full, current max pool size is " + messageExecutor.getActiveCount());
                    if (allowDumpStack) {
                        String name = ManagementFactory.getRuntimeMXBean().getName();
                        String pid = name.split("@")[0];
                        long idx = System.currentTimeMillis();
                        try {
                            String jstackFile = idx + ".log";
                            LOGGER.info("jstack command will dump to " + jstackFile);
                            Runtime.getRuntime().exec(String.format("jstack %s > %s", pid, jstackFile));
                        } catch (IOException exx) {
                            LOGGER.error(exx.getMessage());
                        }
                        allowDumpStack = false;
                    }
                }
            } else {
                // 对应的处理器没有设置线程池,则直接执行;如果某条消息处理特别慢,会严重影响并发;
                try {
                    pair.getFirst().process(ctx, rpcMessage);
                } catch (Throwable th) {
                    LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
                }
            }
        } else {
            LOGGER.error("This message type [{}] has no processor.", messageTypeAware.getTypeCode());
        }
    } else {
        LOGGER.error("This rpcMessage body[{}] is not MessageTypeAware type.", body);
    }
}

Seata Serer接收到请求的执行链路为:
在这里插入图片描述

1、BranchRegisterRequest

又由于RM发送开启事务请求时的RPCMessage的body为BranchRegisterRequest,所以进入到:

在这里插入图片描述

又由于在DefaultCoordinator#onRequest()方法中,将DefaultCoordinator自身绑定到了AbstractTransactionRequestToTChandler属性中:

在这里插入图片描述

所以会进入到:

在这里插入图片描述
DefaultCore封装了AT、TCC、Saga、XA分布式事务模式的具体实现类。

2、DefaultCore执行分支事务的注册

DefaultCore#branchRegister()方法中会首先根据分布式事务模式获取到相应的AbstractCore,这里的处理方式和上面获取分布式事务模式对应的ResourceManager的处理方式一样;

在这里插入图片描述

因为存在四种分布式事务的模式(AT、TCC、XA、SAGA),所以此处也正好对应四种AbstractCore:

在这里插入图片描述

这四种Core都继承了AbstractCore,并且都没有重写AbstractResourceManagerbranchRegister()方法,所以无论是哪种全局事务模式,分支事务注册到全局事务的方式都一样,都体现在AbstractCorebranchRegister()方法中;

@Override
public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid,
                           String applicationData, String lockKeys) throws TransactionException {
    // 根据xid从DB中找到全局事务会话(会做一个DB查询操作)
    GlobalSession globalSession = assertGlobalSessionNotNull(xid, false);
    return SessionHolder.lockAndExecute(globalSession, () -> {
        // 检查全局事务会话的状态
        globalSessionStatusCheck(globalSession);
        globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
        // 分支事务会话  根据全局事务开启一个分支事务
        BranchSession branchSession = SessionHelper.newBranchByGlobal(globalSession, branchType, resourceId,
                applicationData, lockKeys, clientId);
        // 将branchID放到ThreadLocal中
        MDC.put(RootContext.MDC_KEY_BRANCH_ID, String.valueOf(branchSession.getBranchId()));
        // 给分支事务加全局锁
        branchSessionLock(globalSession, branchSession);
        try {
            // 将分支事务会话添加到全局事务会话
            globalSession.addBranch(branchSession);
        } catch (RuntimeException ex) {
            branchSessionUnlock(branchSession);
            throw new BranchTransactionException(FailedToAddBranch, String
                    .format("Failed to store branch xid = %s branchId = %s", globalSession.getXid(),
                            branchSession.getBranchId()), ex);
        }
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Register branch successfully, xid = {}, branchId = {}, resourceId = {} ,lockKeys = {}",
                    globalSession.getXid(), branchSession.getBranchId(), resourceId, lockKeys);
        }
        return branchSession.getBranchId();
    });
}

方法做的事情如下:

  1. 根据xid从DB / file / redis (由TC的store.mode配置决定)中找到全局事务会话(这里会做一个DB查询操作),并且断言globalSession不许为空;
  2. 校验全局事务会话,全局事务会话必须是存活的,并且状态必须为:GlobalStatus.Begin;
  3. 构建一个分支事务会话BranchSession;给分支事务加全局锁,出现锁冲突则直接报错,抛出异常BranchTransactionException
  4. 将分支事务会话添加到全局事务会话,持久化分支事务会话;

1)获取全局事务会话GlobalSession

在这里插入图片描述
SessionHolder是会话管理者,其中包括四个会话管理器:

// 用于管理所有的Setssion,以及Session的创建、更新、删除等
private static SessionManager ROOT_SESSION_MANAGER;
// 用于管理所有的异步commit的Session,包括创建、更新以及删除
private static SessionManager ASYNC_COMMITTING_SESSION_MANAGER;
// 用于管理所有的重试commit的Session,包括创建、更新以及删除
private static SessionManager RETRY_COMMITTING_SESSION_MANAGER;
// 用于管理所有的重试rollback的Session,包括创建、更新以及删除
private static SessionManager RETRY_ROLLBACKING_SESSION_MANAGER;

在这里插入图片描述
用于获取全局事务会话的管理器为:ROOT_SESSION_MANAGER;在初始化SessionHolder时,会根据store.mode对其进行赋值:

在这里插入图片描述

例如博主的TC采用的store.mode是DB,所以找到:DataBaseSessionManager;

在这里插入图片描述

DataBaseSessionManager#findGlobalSession()方法如下:

在这里插入图片描述

==注意:在AbstractCore#branchRegister()方法中查询全局事务会话时,withBranchSessions = false,所以不会把分支事务查出来。==

LogStore

LogStore是对全局事务、分支事务做DB操作的DAO层;LogStore接口只有一个实现LogStoreDataBaseDAO,其queryGlobalTransactionDO()方法内容很简单,就直接使用JDBC查表:

@Override
public GlobalTransactionDO queryGlobalTransactionDO(String xid) {
    String sql = LogStoreSqlsFactory.getLogStoreSqls(dbType).getQueryGlobalTransactionSQL(globalTable);
    Connection conn = null;
    PreparedStatement ps = null;
    ResultSet rs = null;
    try {
        conn = logStoreDataSource.getConnection();
        conn.setAutoCommit(true);
        ps = conn.prepareStatement(sql);
        ps.setString(1, xid);
        rs = ps.executeQuery();
        if (rs.next()) {
            return convertGlobalTransactionDO(rs);
        } else {
            return null;
        }
    } catch (SQLException e) {
        throw new DataAccessException(e);
    } finally {
        IOUtil.close(rs, ps, conn);
    }
}

本文后面相似的DB操作不再赘述。

2)校验全局事务会话GlobalSession

全局事务会话必须是存活的,并且状态必须为:GlobalStatus.Begin;

protected void globalSessionStatusCheck(GlobalSession globalSession) throws GlobalTransactionException {
    if (!globalSession.isActive()) {
        throw new GlobalTransactionException(GlobalTransactionNotActive, String.format(
            "Could not register branch into global session xid = %s status = %s, cause by globalSession not active",
            globalSession.getXid(), globalSession.getStatus()));
    }
    if (globalSession.getStatus() != GlobalStatus.Begin) {
        throw new GlobalTransactionException(GlobalTransactionStatusInvalid, String
                .format("Could not register branch into global session xid = %s status = %s while expecting %s",
                        globalSession.getXid(), globalSession.getStatus(), GlobalStatus.Begin));
    }
}

3)分支事务会话BranchSession加全局锁

在这里插入图片描述

1> SessionHolder首先构建一个BranchSession

BranchSession的内容包括:全局事务xid、全局事务id、根据雪花算法生成的分支事务id、全局事务模式、RM资源Id、分支事务要加的全局锁keys、RM客户端ID、RM应用名;

public static BranchSession newBranchByGlobal(GlobalSession globalSession, BranchType branchType, String resourceId,
        String applicationData, String lockKeys, String clientId) {
    BranchSession branchSession = new BranchSession();

    branchSession.setXid(globalSession.getXid());
    branchSession.setTransactionId(globalSession.getTransactionId());
    branchSession.setBranchId(UUIDGenerator.generateUUID());
    branchSession.setBranchType(branchType);
    branchSession.setResourceId(resourceId);
    branchSession.setLockKey(lockKeys);
    branchSession.setClientId(clientId);
    branchSession.setApplicationData(applicationData);

    return branchSession;
}

2> branchID放入ThreadLocal

MDC.put(RootContext.MDC_KEY_BRANCH_ID, String.valueOf(branchSession.getBranchId()));

3> *给分支事务加全局锁

在这里插入图片描述

AbstractCore中的branchSessionLock()方法没有具体的实现,并且在Seata中只有AT模式有全局锁的概念,因此只需要看ATCore的branchSessionLock()方法即可;

在这里插入图片描述

默认情况下seata在给分支事务加全局锁的同时,会检查全局锁是否冲突;

在这里插入图片描述

LockStoreDataBaseDAO#acquireLock()获取全局锁

LockStoreDataBaseDAO#acquireLock()方法如下:

@Override
public boolean acquireLock(List<LockDO> lockDOs, boolean autoCommit, boolean skipCheckLock) {
    Connection conn = null;
    PreparedStatement ps = null;
    ResultSet rs = null;
    // 已经存在的行锁key集合
    Set<String> dbExistedRowKeys = new HashSet<>();
    boolean originalAutoCommit = true;
    if (lockDOs.size() > 1) {
        lockDOs = lockDOs.stream().filter(LambdaUtils.distinctByKey(LockDO::getRowKey)).collect(Collectors.toList());
    }
    try {
        // 从全局锁数据源里获取到一个连接
        conn = lockStoreDataSource.getConnection();
        // 把自动提交事务关闭
        if (originalAutoCommit = conn.getAutoCommit()) {
            conn.setAutoCommit(false);
        }
        List<LockDO> unrepeatedLockDOs = lockDOs;

        //check lock
        // 是否跳过锁检查
        if (!skipCheckLock) {

            boolean canLock = true;
            //query,针对全局锁表查询某个数据
            String checkLockSQL = LockStoreSqlFactory.getLogStoreSql(dbType).getCheckLockableSql(lockTable, lockDOs.size());
            ps = conn.prepareStatement(checkLockSQL);
            for (int i = 0; i < lockDOs.size(); i++) {
                ps.setString(i + 1, lockDOs.get(i).getRowKey());
            }
            rs = ps.executeQuery();
            // 获取到当前要加全局锁的事务xid
            String currentXID = lockDOs.get(0).getXid();
            boolean failFast = false;
            // 查询结果为空时,说明没有事务加全局锁
            while (rs.next()) {
                String dbXID = rs.getString(ServerTableColumnsName.LOCK_TABLE_XID);
                // 如果加全局锁的是其他的全局事务xid
                if (!StringUtils.equals(dbXID, currentXID)) {
                    if (LOGGER.isInfoEnabled()) {
                        String dbPk = rs.getString(ServerTableColumnsName.LOCK_TABLE_PK);
                        String dbTableName = rs.getString(ServerTableColumnsName.LOCK_TABLE_TABLE_NAME);
                        long dbBranchId = rs.getLong(ServerTableColumnsName.LOCK_TABLE_BRANCH_ID);
                        LOGGER.info("Global lock on [{}:{}] is holding by xid {} branchId {}", dbTableName, dbPk, dbXID, dbBranchId);
                    }
                    if (!autoCommit) {
                        int status = rs.getInt(ServerTableColumnsName.LOCK_TABLE_STATUS);
                        if (status == LockStatus.Rollbacking.getCode()) {
                            failFast = true;
                        }
                    }
                    canLock = false;
                    break;
                }

                dbExistedRowKeys.add(rs.getString(ServerTableColumnsName.LOCK_TABLE_ROW_KEY));
            }

            // 不可以加全局锁,全局锁已经被其他事务占用
            if (!canLock) {
                conn.rollback();
                if (failFast) {
                    throw new StoreException(new BranchTransactionException(LockKeyConflictFailFast));
                }
                return false;
            }
            // If the lock has been exists in db, remove it from the lockDOs
            if (CollectionUtils.isNotEmpty(dbExistedRowKeys)) {
                unrepeatedLockDOs = lockDOs.stream().filter(lockDO -> !dbExistedRowKeys.contains(lockDO.getRowKey()))
                        .collect(Collectors.toList());
            }
            if (CollectionUtils.isEmpty(unrepeatedLockDOs)) {
                conn.rollback();
                return true;
            }
        }

        // lock
        if (unrepeatedLockDOs.size() == 1) {
            LockDO lockDO = unrepeatedLockDOs.get(0);
            // 加全局锁
            if (!doAcquireLock(conn, lockDO)) {
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("Global lock acquire failed, xid {} branchId {} pk {}", lockDO.getXid(), lockDO.getBranchId(), lockDO.getPk());
                }
                conn.rollback();
                return false;
            }
        } else {
            // 批量加全局锁
            if (!doAcquireLocks(conn, unrepeatedLockDOs)) {
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("Global lock batch acquire failed, xid {} branchId {} pks {}", unrepeatedLockDOs.get(0).getXid(),
                        unrepeatedLockDOs.get(0).getBranchId(), unrepeatedLockDOs.stream().map(lockDO -> lockDO.getPk()).collect(Collectors.toList()));
                }
                conn.rollback();
                return false;
            }
        }
        conn.commit();
        return true;
    } catch (SQLException e) {
        throw new StoreException(e);
    } finally {
        IOUtil.close(rs, ps);
        if (conn != null) {
            try {
                if (originalAutoCommit) {
                    conn.setAutoCommit(true);
                }
                conn.close();
            } catch (SQLException e) {
            }
        }
    }
}

增加全局行锁、检查全局锁冲突的逻辑如下:

  1. 先对要加的全局行锁去重,然后关闭数据库连接的自动提交;
  2. 如果跳过了全局锁冲突检查,则直接持久化全局行锁,然后提交全局锁数据持久化事务;
  3. 如果需要进行全局锁冲突检查:

    1. 首先根据分支事务传入的全局行锁构建查询全局锁的SQL;
      在这里插入图片描述

      • SQL模板(要上几个行锁,in后面就有几个?)

        select xid, transaction_id, branch_id, resource_id, table_name, pk, row_key, gmt_create, gmt_modified,status from lock_table where row_key in ( ? ) order by status desc 
      • 行锁的key值(数据库连接URL + 表明 + 主键id)

        jdbc:mysql://127.0.0.1:3306/seata_stock^^^stock_tbl^^^1
    2. 如果根据查询全局行锁SQL没有从DB中查出记录,说明没有其他事务加当前分支事务所需要的全局行锁;

      则直接持久化全局行锁,然后提交全局锁数据持久化事务;

    3. 如果根据查询全局行锁SQL从DB中查出了记录,并且加全局锁的全局事务xid不是当前全局事务的,则说明全局锁已经被其他全局事务占用;

      进而回滚当前提交全局锁数据持久化事务,返回false,表示加全局锁失败;

      方法返回到ATCore#branchSessionLock()方法中,如果加全局锁失败,则直接抛出异常BranchTransactionException

所谓的加全局锁操作,其实就是针对每一行记录 持久化一条行锁记录到lock_table表中:

protected boolean doAcquireLock(Connection conn, LockDO lockDO) {
    PreparedStatement ps = null;
    try {
        //insert
        String insertLockSQL = LockStoreSqlFactory.getLogStoreSql(dbType).getInsertLockSQL(lockTable);
        ps = conn.prepareStatement(insertLockSQL);
        // 全局事务xid
        ps.setString(1, lockDO.getXid());
        ps.setLong(2, lockDO.getTransactionId());
        // 分支事务ID
        ps.setLong(3, lockDO.getBranchId());
        ps.setString(4, lockDO.getResourceId());
        ps.setString(5, lockDO.getTableName());
        // 主键
        ps.setString(6, lockDO.getPk());
        // rowKey
        ps.setString(7, lockDO.getRowKey());
        // 锁状态:Locked(已加锁)
        ps.setInt(8, LockStatus.Locked.getCode());
        return ps.executeUpdate() > 0;
    } catch (SQLException e) {
        throw new StoreException(e);
    } finally {
        IOUtil.close(ps);
    }
}

4)分支事务添加到全局事务

在这里插入图片描述

1> 持久化分支事务

将分支事务注册到全局事务之后,会触发Session生命周期监听器SessionLifecycleListeneronAddBranch()事件;

// AbstractSessionManager 将 分支事务会话持久化到DB中
for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
    lifecycleListener.onAddBranch(this, branchSession);
}

在此处,SessionLifecycleListener只有一个实现AbstractSessionManager:

在这里插入图片描述
DataBaseTransactionStoreManager是store.mode为db时的事务存储管理器,其writeSession()方法负责持久化全局事务、分支事务;

@Override
public boolean writeSession(LogOperation logOperation, SessionStorable session) {
    if (LogOperation.GLOBAL_ADD.equals(logOperation)) {
        return logStore.insertGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
    } else if (LogOperation.GLOBAL_UPDATE.equals(logOperation)) {
        return logStore.updateGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
    } else if (LogOperation.GLOBAL_REMOVE.equals(logOperation)) {
        return logStore.deleteGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
    } else if (LogOperation.BRANCH_ADD.equals(logOperation)) {
        return logStore.insertBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
    } else if (LogOperation.BRANCH_UPDATE.equals(logOperation)) {
        return logStore.updateBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
    } else if (LogOperation.BRANCH_REMOVE.equals(logOperation)) {
        return logStore.deleteBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
    } else {
        throw new StoreException("Unknown LogOperation:" + logOperation.name());
    }
}

前面也提到过LogStore可以看做是封装了JDBC、操作DB的工具类 / DAO层,其只有一个实现LogStoreDataBaseDAO;

@Override
public boolean insertGlobalTransactionDO(GlobalTransactionDO globalTransactionDO) {
    String sql = LogStoreSqlsFactory.getLogStoreSqls(dbType).getInsertGlobalTransactionSQL(globalTable);
    Connection conn = null;
    PreparedStatement ps = null;
    try {
        int index = 1;
        conn = logStoreDataSource.getConnection();
        conn.setAutoCommit(true);
        ps = conn.prepareStatement(sql);
        ps.setString(index++, globalTransactionDO.getXid());
        ps.setLong(index++, globalTransactionDO.getTransactionId());
        ps.setInt(index++, globalTransactionDO.getStatus());
        ps.setString(index++, globalTransactionDO.getApplicationId());
        ps.setString(index++, globalTransactionDO.getTransactionServiceGroup());
        String transactionName = globalTransactionDO.getTransactionName();
        transactionName = transactionName.length() > transactionNameColumnSize ?
            transactionName.substring(0, transactionNameColumnSize) :
        transactionName;
        ps.setString(index++, transactionName);
        ps.setInt(index++, globalTransactionDO.getTimeout());
        ps.setLong(index++, globalTransactionDO.getBeginTime());
        ps.setString(index++, globalTransactionDO.getApplicationData());
        return ps.executeUpdate() > 0;
    } catch (SQLException e) {
        throw new StoreException(e);
    } finally {
        IOUtil.close(ps, conn);
    }
}

这里只是单纯的利用JDBC对数据做持久化,将数据持久化到branch_table

2> JVM层面分支事务添加到全局事务

在这里插入图片描述

四、总结

无论是AT、TCC、XA、SAGA哪种分布式事务模式,分支事务注册到全局事务的方式都一样;

TC中对RM注册分支事务到全局事务的处理逻辑为:

  1. 首先根据xid从DB中找到全局事务会话(这里会做一个DB查询操作);
  2. 校验全局事务会话,全局事务会话必须是存活的,并且状态必须为:GlobalStatus.Begin;
  3. 构建一个分支事务会话BranchSession;给分支事务加全局锁,出现锁冲突则直接报错,抛出异常BranchTransactionException
  4. 将分支事务会话添加到全局事务会话,持久化分支事务会话;

TC在分支事务注册的同时,会同时增加全局行锁、检查全局锁冲突。

相关文章
|
2月前
|
SQL 关系型数据库 MySQL
乐观锁在分布式数据库中如何与事务隔离级别结合使用
乐观锁在分布式数据库中如何与事务隔离级别结合使用
|
3天前
|
SQL 关系型数据库 MySQL
乐观锁在分布式数据库中如何与事务隔离级别结合使用
乐观锁在分布式数据库中如何与事务隔离级别结合使用
|
3月前
|
NoSQL Java Nacos
SpringCloud集成Seata并使用Nacos做注册中心与配置中心
SpringCloud集成Seata并使用Nacos做注册中心与配置中心
97 3
|
1月前
|
SQL NoSQL 数据库
SpringCloud基础6——分布式事务,Seata
分布式事务、ACID原则、CAP定理、Seata、Seata的四种分布式方案:XA、AT、TCC、SAGA模式
SpringCloud基础6——分布式事务,Seata
|
1月前
Saga模式在分布式系统中如何保证事务的隔离性
Saga模式在分布式系统中如何保证事务的隔离性
|
2月前
|
存储 SQL 关系型数据库
深入解析MySQL事务机制和锁机制
深入解析MySQL事务机制和锁机制
|
2月前
|
关系型数据库 MySQL 数据库
SpringCloud2023中使用Seata解决分布式事务
对于分布式系统而言,需要保证分布式系统中的数据一致性,保证数据在子系统中始终保持一致,避免业务出现问题。分布式系统中对数据的操作要么一起成功,要么一起失败,必须是一个整体性的事务。Seata简化了这个使用过程。
78 2
|
2月前
|
Java 关系型数据库 MySQL
(二十七)舞动手指速写一个Seata-XA框架解决棘手的分布式事务问题
相信大家对于事务问题都不陌生,在之前《MySQL事务篇》中曾详解过MySQL的事务机制,在传统的单库环境下开发,咱们可依赖于MySQL所提供的事务机制,来确保单个事务内的一组操作,要么全部执行成功,要么全部执行失败。
|
2月前
|
消息中间件 监控 RocketMQ
分布式事务实现方案:一文详解RocketMQ事务消息
分布式事务实现方案:一文详解RocketMQ事务消息
|
2月前
|
Java Nacos Docker
"揭秘!Docker部署Seata遇上Nacos,注册成功却报错?这些坑你不得不防!一网打尽解决秘籍,让你的分布式事务稳如老狗!"
【8月更文挑战第15天】在微服务架构中,Nacos搭配Seata确保数据一致性时,Docker部署Seata后可能出现客户端连接错误,如“can not connect to services-server”。此问题多由网络配置不当、配置文件错误或版本不兼容引起。解决策略包括:调整Docker网络设置确保可达性;检查并修正`file.conf`和`registry.conf`中的Nacos地址和端口;验证Seata与Nacos版本兼容性;修改配置后重启服务;参考官方文档和最佳实践进行配置。通过这些步骤,能有效排除故障,保障服务稳定运行。
158 0

热门文章

最新文章

推荐镜像

更多