分布式事务Seata源码解析九:分支事务如何注册到全局事务

本文涉及的产品
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
全局流量管理 GTM,标准版 1个月
云解析 DNS,旗舰版 1个月
简介: 分布式事务Seata源码解析九:分支事务如何注册到全局事务

@[TOC]

一、前言

更多内容见Seata专栏:https://blog.csdn.net/saintmm/category_11953405.html

至此,seata系列的内容包括:

  1. can not get cluster name in registry config ‘service.vgroupMapping.xx‘, please make sure registry问题解决
  2. Seata Failed to get available servers: endpoint format should like ip:port 报错原因/解决方案汇总版(看完本文必解决问题)
  3. Seata json decode exception, Cannot construct instance of java.time.LocalDateTime报错原因/解决方案最全汇总版
  4. 【微服务 31】超细的Spring Cloud 整合Seata实现分布式事务(排坑版)
  5. 【微服务 32】Spring Cloud整合Seata、Nacos实现分布式事务案例(巨细排坑版)【云原生】
  6. 【微服务33】分布式事务Seata源码解析一:在IDEA中启动Seata Server
  7. 【微服务34】分布式事务Seata源码解析二:Seata Server启动时都做了什么
  8. 【微服务35】分布式事务Seata源码解析三:从Spring Boot特性来看Seata Client 启动时都做了什么
  9. 【微服务36】分布式事务Seata源码解析四:图解Seata Client 如何与Seata Server建立连接、通信
  10. 【微服务37】分布式事务Seata源码解析五:@GlobalTransactional如何开启全局事务
  11. 【微服务38】分布式事务Seata源码解析六:全局/分支事务分布式ID如何生成?序列号超了怎么办?时钟回拨问题如何处理?
  12. 【微服务39】分布式事务Seata源码解析七:图解Seata事务执行流程之开启全局事务
  13. 分布式事务Seata源码解析八:本地事务执行流程(AT模式下)

Seata最核心的全局事务执行流程,上文我们聊了本地事务是如何执行的?在本地事务执行的过程中涉及到分支事务如何注册到全局事务、undo log的构建,本文我们接着聊分支事务如何注册到全局事务。

二、RM中分支事务注册入口

在上一文(分布式事务Seata源码解析八:本地事务执行流程(AT模式下))中,提到ConnectionProxy#processGlobalTransactionCommit()最终处理本地事务的提交。

在这里插入图片描述

其中register()方法向远程的TC中注册分支事务:

private void register() throws TransactionException {
    if (!context.hasUndoLog() || !context.hasLockKey()) {
        return;
    }

    // 分支事务注册:将事务类型AT、资源ID(资源在前面的流程已经注册过了)、事务xid、全局锁keys作为分支事务信息注册到seata server
    Long branchId = DefaultResourceManager.get().branchRegister(BranchType.AT, getDataSourceProxy().getResourceId(),
            null, context.getXid(), context.getApplicationData(), context.buildLockKeys());
    context.setBranchId(branchId);
}

注册分支事务、获取分支事务ID的入口流程如下:

  1. DefaultResourceManager.get() 获取单例形式的资源管理器DefaultResourceManager,通过其注册分支事务;
  2. 再根据分支类型(AT、TCC、XA、SAGA)获取相应类型的ResourceManager;
    在这里插入图片描述

因为存在四种分布式事务的模式(AT、TCC、XA、SAGA),所以此处也正好对应四种ResourceManager:

在这里插入图片描述

这四种ResourceManager都继承了AbstractResourceManager,并且都没有重写AbstractResourceManagerbranchRegister()方法,所以无论是哪种全局事务模式,分支事务注册到全局事务的方式都一样,都体现在AbstractResourceManagerbranchRegister()方法中;而分支事务的提交和回滚方式却各不相同。

@Override
public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid, String applicationData, String lockKeys) throws TransactionException {
    try {
        BranchRegisterRequest request = new BranchRegisterRequest();
        // xid是全局事务ID
        request.setXid(xid);
        // 分布式事务要更新数据的全局锁keys
        request.setLockKey(lockKeys);
        // 分支事务对应的资源ID
        request.setResourceId(resourceId);
        // 分支事务类型
        request.setBranchType(branchType);
        // 引用的数据
        request.setApplicationData(applicationData);

        // 将请求通过RmNettyRemotingClient发送到seata-server
        BranchRegisterResponse response = (BranchRegisterResponse) RmNettyRemotingClient.getInstance().sendSyncRequest(request);
        if (response.getResultCode() == ResultCode.Failed) {
            throw new RmTransactionException(response.getTransactionExceptionCode(), String.format("Response[ %s ]", response.getMsg()));
        }
        return response.getBranchId();
    } catch (TimeoutException toe) {
        throw new RmTransactionException(TransactionExceptionCode.IO, "RPC Timeout", toe);
    } catch (RuntimeException rex) {
        throw new RmTransactionException(TransactionExceptionCode.BranchRegisterFailed, "Runtime", rex);
    }
}

方法中构建一个BranchRegisterRequest,通过netty将请求发送到TC进行分支事务的注册;

三、TC中处理分支事务注册

【微服务36】分布式事务Seata源码解析四:图解Seata Client 如何与Seata Server建立连接、通信一文中,我们聊了Seata Client 如何和Seata Server建立连接、通信;

又在【微服务34】分布式事务Seata源码解析二:Seata Server启动时都做了什么一文中,我们知道了TC(Seata Server)启动之后,AbstractNettyRemotingServer的内部类ServerHandler负责接收并处理请求。

在这里插入图片描述
ServerHandler类上有个@ChannelHandler.Sharable注解,其表示所有的连接都会共用这一个ChannelHandler;所以当消息处理很慢时,会降低并发。

processMessage(ctx, (RpcMessage) msg)方法中会根据消息类型获取到 请求处理组件(消息的处理过程是典型的策略模式),如果消息对应的处理器设置了线程池,则放到线程池中执行;如果对应的处理器没有设置线程池,则直接执行;如果某条消息处理特别慢,会严重影响并发;所以在seata-server中大部分处理器都有对应的线程池。

/**
 * Rpc message processing.
 *
 * @param ctx        Channel handler context.
 * @param rpcMessage rpc message.
 * @throws Exception throws exception process message error.
 * @since 1.3.0
 */
protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
    if (LOGGER.isDebugEnabled()) {
        LOGGER.debug(String.format("%s msgId:%s, body:%s", this, rpcMessage.getId(), rpcMessage.getBody()));
    }
    Object body = rpcMessage.getBody();
    if (body instanceof MessageTypeAware) {
        MessageTypeAware messageTypeAware = (MessageTypeAware) body;
        // 根据消息的类型获取到请求处理组件和请求处理线程池组成的Pair
        final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());
        if (pair != null) {
            // 如果消息对应的处理器设置了线程池,则放到线程池中执行
            if (pair.getSecond() != null) {
                try {
                    pair.getSecond().execute(() -> {
                        try {
                            pair.getFirst().process(ctx, rpcMessage);
                        } catch (Throwable th) {
                            LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
                        } finally {
                            MDC.clear();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    // 线程池拒绝策略之一,抛出异常:RejectedExecutionException
                    LOGGER.error(FrameworkErrorCode.ThreadPoolFull.getErrCode(),
                        "thread pool is full, current max pool size is " + messageExecutor.getActiveCount());
                    if (allowDumpStack) {
                        String name = ManagementFactory.getRuntimeMXBean().getName();
                        String pid = name.split("@")[0];
                        long idx = System.currentTimeMillis();
                        try {
                            String jstackFile = idx + ".log";
                            LOGGER.info("jstack command will dump to " + jstackFile);
                            Runtime.getRuntime().exec(String.format("jstack %s > %s", pid, jstackFile));
                        } catch (IOException exx) {
                            LOGGER.error(exx.getMessage());
                        }
                        allowDumpStack = false;
                    }
                }
            } else {
                // 对应的处理器没有设置线程池,则直接执行;如果某条消息处理特别慢,会严重影响并发;
                try {
                    pair.getFirst().process(ctx, rpcMessage);
                } catch (Throwable th) {
                    LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
                }
            }
        } else {
            LOGGER.error("This message type [{}] has no processor.", messageTypeAware.getTypeCode());
        }
    } else {
        LOGGER.error("This rpcMessage body[{}] is not MessageTypeAware type.", body);
    }
}

Seata Serer接收到请求的执行链路为:
在这里插入图片描述

1、BranchRegisterRequest

又由于RM发送开启事务请求时的RPCMessage的body为BranchRegisterRequest,所以进入到:

在这里插入图片描述

又由于在DefaultCoordinator#onRequest()方法中,将DefaultCoordinator自身绑定到了AbstractTransactionRequestToTChandler属性中:

在这里插入图片描述

所以会进入到:

在这里插入图片描述
DefaultCore封装了AT、TCC、Saga、XA分布式事务模式的具体实现类。

2、DefaultCore执行分支事务的注册

DefaultCore#branchRegister()方法中会首先根据分布式事务模式获取到相应的AbstractCore,这里的处理方式和上面获取分布式事务模式对应的ResourceManager的处理方式一样;

在这里插入图片描述

因为存在四种分布式事务的模式(AT、TCC、XA、SAGA),所以此处也正好对应四种AbstractCore:

在这里插入图片描述

这四种Core都继承了AbstractCore,并且都没有重写AbstractResourceManagerbranchRegister()方法,所以无论是哪种全局事务模式,分支事务注册到全局事务的方式都一样,都体现在AbstractCorebranchRegister()方法中;

@Override
public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid,
                           String applicationData, String lockKeys) throws TransactionException {
    // 根据xid从DB中找到全局事务会话(会做一个DB查询操作)
    GlobalSession globalSession = assertGlobalSessionNotNull(xid, false);
    return SessionHolder.lockAndExecute(globalSession, () -> {
        // 检查全局事务会话的状态
        globalSessionStatusCheck(globalSession);
        globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
        // 分支事务会话  根据全局事务开启一个分支事务
        BranchSession branchSession = SessionHelper.newBranchByGlobal(globalSession, branchType, resourceId,
                applicationData, lockKeys, clientId);
        // 将branchID放到ThreadLocal中
        MDC.put(RootContext.MDC_KEY_BRANCH_ID, String.valueOf(branchSession.getBranchId()));
        // 给分支事务加全局锁
        branchSessionLock(globalSession, branchSession);
        try {
            // 将分支事务会话添加到全局事务会话
            globalSession.addBranch(branchSession);
        } catch (RuntimeException ex) {
            branchSessionUnlock(branchSession);
            throw new BranchTransactionException(FailedToAddBranch, String
                    .format("Failed to store branch xid = %s branchId = %s", globalSession.getXid(),
                            branchSession.getBranchId()), ex);
        }
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Register branch successfully, xid = {}, branchId = {}, resourceId = {} ,lockKeys = {}",
                    globalSession.getXid(), branchSession.getBranchId(), resourceId, lockKeys);
        }
        return branchSession.getBranchId();
    });
}

方法做的事情如下:

  1. 根据xid从DB / file / redis (由TC的store.mode配置决定)中找到全局事务会话(这里会做一个DB查询操作),并且断言globalSession不许为空;
  2. 校验全局事务会话,全局事务会话必须是存活的,并且状态必须为:GlobalStatus.Begin;
  3. 构建一个分支事务会话BranchSession;给分支事务加全局锁,出现锁冲突则直接报错,抛出异常BranchTransactionException
  4. 将分支事务会话添加到全局事务会话,持久化分支事务会话;

1)获取全局事务会话GlobalSession

在这里插入图片描述
SessionHolder是会话管理者,其中包括四个会话管理器:

// 用于管理所有的Setssion,以及Session的创建、更新、删除等
private static SessionManager ROOT_SESSION_MANAGER;
// 用于管理所有的异步commit的Session,包括创建、更新以及删除
private static SessionManager ASYNC_COMMITTING_SESSION_MANAGER;
// 用于管理所有的重试commit的Session,包括创建、更新以及删除
private static SessionManager RETRY_COMMITTING_SESSION_MANAGER;
// 用于管理所有的重试rollback的Session,包括创建、更新以及删除
private static SessionManager RETRY_ROLLBACKING_SESSION_MANAGER;

在这里插入图片描述
用于获取全局事务会话的管理器为:ROOT_SESSION_MANAGER;在初始化SessionHolder时,会根据store.mode对其进行赋值:

在这里插入图片描述

例如博主的TC采用的store.mode是DB,所以找到:DataBaseSessionManager;

在这里插入图片描述

DataBaseSessionManager#findGlobalSession()方法如下:

在这里插入图片描述

==注意:在AbstractCore#branchRegister()方法中查询全局事务会话时,withBranchSessions = false,所以不会把分支事务查出来。==

LogStore

LogStore是对全局事务、分支事务做DB操作的DAO层;LogStore接口只有一个实现LogStoreDataBaseDAO,其queryGlobalTransactionDO()方法内容很简单,就直接使用JDBC查表:

@Override
public GlobalTransactionDO queryGlobalTransactionDO(String xid) {
    String sql = LogStoreSqlsFactory.getLogStoreSqls(dbType).getQueryGlobalTransactionSQL(globalTable);
    Connection conn = null;
    PreparedStatement ps = null;
    ResultSet rs = null;
    try {
        conn = logStoreDataSource.getConnection();
        conn.setAutoCommit(true);
        ps = conn.prepareStatement(sql);
        ps.setString(1, xid);
        rs = ps.executeQuery();
        if (rs.next()) {
            return convertGlobalTransactionDO(rs);
        } else {
            return null;
        }
    } catch (SQLException e) {
        throw new DataAccessException(e);
    } finally {
        IOUtil.close(rs, ps, conn);
    }
}

本文后面相似的DB操作不再赘述。

2)校验全局事务会话GlobalSession

全局事务会话必须是存活的,并且状态必须为:GlobalStatus.Begin;

protected void globalSessionStatusCheck(GlobalSession globalSession) throws GlobalTransactionException {
    if (!globalSession.isActive()) {
        throw new GlobalTransactionException(GlobalTransactionNotActive, String.format(
            "Could not register branch into global session xid = %s status = %s, cause by globalSession not active",
            globalSession.getXid(), globalSession.getStatus()));
    }
    if (globalSession.getStatus() != GlobalStatus.Begin) {
        throw new GlobalTransactionException(GlobalTransactionStatusInvalid, String
                .format("Could not register branch into global session xid = %s status = %s while expecting %s",
                        globalSession.getXid(), globalSession.getStatus(), GlobalStatus.Begin));
    }
}

3)分支事务会话BranchSession加全局锁

在这里插入图片描述

1> SessionHolder首先构建一个BranchSession

BranchSession的内容包括:全局事务xid、全局事务id、根据雪花算法生成的分支事务id、全局事务模式、RM资源Id、分支事务要加的全局锁keys、RM客户端ID、RM应用名;

public static BranchSession newBranchByGlobal(GlobalSession globalSession, BranchType branchType, String resourceId,
        String applicationData, String lockKeys, String clientId) {
    BranchSession branchSession = new BranchSession();

    branchSession.setXid(globalSession.getXid());
    branchSession.setTransactionId(globalSession.getTransactionId());
    branchSession.setBranchId(UUIDGenerator.generateUUID());
    branchSession.setBranchType(branchType);
    branchSession.setResourceId(resourceId);
    branchSession.setLockKey(lockKeys);
    branchSession.setClientId(clientId);
    branchSession.setApplicationData(applicationData);

    return branchSession;
}

2> branchID放入ThreadLocal

MDC.put(RootContext.MDC_KEY_BRANCH_ID, String.valueOf(branchSession.getBranchId()));

3> *给分支事务加全局锁

在这里插入图片描述

AbstractCore中的branchSessionLock()方法没有具体的实现,并且在Seata中只有AT模式有全局锁的概念,因此只需要看ATCore的branchSessionLock()方法即可;

在这里插入图片描述

默认情况下seata在给分支事务加全局锁的同时,会检查全局锁是否冲突;

在这里插入图片描述

LockStoreDataBaseDAO#acquireLock()获取全局锁

LockStoreDataBaseDAO#acquireLock()方法如下:

@Override
public boolean acquireLock(List<LockDO> lockDOs, boolean autoCommit, boolean skipCheckLock) {
    Connection conn = null;
    PreparedStatement ps = null;
    ResultSet rs = null;
    // 已经存在的行锁key集合
    Set<String> dbExistedRowKeys = new HashSet<>();
    boolean originalAutoCommit = true;
    if (lockDOs.size() > 1) {
        lockDOs = lockDOs.stream().filter(LambdaUtils.distinctByKey(LockDO::getRowKey)).collect(Collectors.toList());
    }
    try {
        // 从全局锁数据源里获取到一个连接
        conn = lockStoreDataSource.getConnection();
        // 把自动提交事务关闭
        if (originalAutoCommit = conn.getAutoCommit()) {
            conn.setAutoCommit(false);
        }
        List<LockDO> unrepeatedLockDOs = lockDOs;

        //check lock
        // 是否跳过锁检查
        if (!skipCheckLock) {

            boolean canLock = true;
            //query,针对全局锁表查询某个数据
            String checkLockSQL = LockStoreSqlFactory.getLogStoreSql(dbType).getCheckLockableSql(lockTable, lockDOs.size());
            ps = conn.prepareStatement(checkLockSQL);
            for (int i = 0; i < lockDOs.size(); i++) {
                ps.setString(i + 1, lockDOs.get(i).getRowKey());
            }
            rs = ps.executeQuery();
            // 获取到当前要加全局锁的事务xid
            String currentXID = lockDOs.get(0).getXid();
            boolean failFast = false;
            // 查询结果为空时,说明没有事务加全局锁
            while (rs.next()) {
                String dbXID = rs.getString(ServerTableColumnsName.LOCK_TABLE_XID);
                // 如果加全局锁的是其他的全局事务xid
                if (!StringUtils.equals(dbXID, currentXID)) {
                    if (LOGGER.isInfoEnabled()) {
                        String dbPk = rs.getString(ServerTableColumnsName.LOCK_TABLE_PK);
                        String dbTableName = rs.getString(ServerTableColumnsName.LOCK_TABLE_TABLE_NAME);
                        long dbBranchId = rs.getLong(ServerTableColumnsName.LOCK_TABLE_BRANCH_ID);
                        LOGGER.info("Global lock on [{}:{}] is holding by xid {} branchId {}", dbTableName, dbPk, dbXID, dbBranchId);
                    }
                    if (!autoCommit) {
                        int status = rs.getInt(ServerTableColumnsName.LOCK_TABLE_STATUS);
                        if (status == LockStatus.Rollbacking.getCode()) {
                            failFast = true;
                        }
                    }
                    canLock = false;
                    break;
                }

                dbExistedRowKeys.add(rs.getString(ServerTableColumnsName.LOCK_TABLE_ROW_KEY));
            }

            // 不可以加全局锁,全局锁已经被其他事务占用
            if (!canLock) {
                conn.rollback();
                if (failFast) {
                    throw new StoreException(new BranchTransactionException(LockKeyConflictFailFast));
                }
                return false;
            }
            // If the lock has been exists in db, remove it from the lockDOs
            if (CollectionUtils.isNotEmpty(dbExistedRowKeys)) {
                unrepeatedLockDOs = lockDOs.stream().filter(lockDO -> !dbExistedRowKeys.contains(lockDO.getRowKey()))
                        .collect(Collectors.toList());
            }
            if (CollectionUtils.isEmpty(unrepeatedLockDOs)) {
                conn.rollback();
                return true;
            }
        }

        // lock
        if (unrepeatedLockDOs.size() == 1) {
            LockDO lockDO = unrepeatedLockDOs.get(0);
            // 加全局锁
            if (!doAcquireLock(conn, lockDO)) {
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("Global lock acquire failed, xid {} branchId {} pk {}", lockDO.getXid(), lockDO.getBranchId(), lockDO.getPk());
                }
                conn.rollback();
                return false;
            }
        } else {
            // 批量加全局锁
            if (!doAcquireLocks(conn, unrepeatedLockDOs)) {
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("Global lock batch acquire failed, xid {} branchId {} pks {}", unrepeatedLockDOs.get(0).getXid(),
                        unrepeatedLockDOs.get(0).getBranchId(), unrepeatedLockDOs.stream().map(lockDO -> lockDO.getPk()).collect(Collectors.toList()));
                }
                conn.rollback();
                return false;
            }
        }
        conn.commit();
        return true;
    } catch (SQLException e) {
        throw new StoreException(e);
    } finally {
        IOUtil.close(rs, ps);
        if (conn != null) {
            try {
                if (originalAutoCommit) {
                    conn.setAutoCommit(true);
                }
                conn.close();
            } catch (SQLException e) {
            }
        }
    }
}

增加全局行锁、检查全局锁冲突的逻辑如下:

  1. 先对要加的全局行锁去重,然后关闭数据库连接的自动提交;
  2. 如果跳过了全局锁冲突检查,则直接持久化全局行锁,然后提交全局锁数据持久化事务;
  3. 如果需要进行全局锁冲突检查:

    1. 首先根据分支事务传入的全局行锁构建查询全局锁的SQL;
      在这里插入图片描述

      • SQL模板(要上几个行锁,in后面就有几个?)

        select xid, transaction_id, branch_id, resource_id, table_name, pk, row_key, gmt_create, gmt_modified,status from lock_table where row_key in ( ? ) order by status desc 
      • 行锁的key值(数据库连接URL + 表明 + 主键id)

        jdbc:mysql://127.0.0.1:3306/seata_stock^^^stock_tbl^^^1
    2. 如果根据查询全局行锁SQL没有从DB中查出记录,说明没有其他事务加当前分支事务所需要的全局行锁;

      则直接持久化全局行锁,然后提交全局锁数据持久化事务;

    3. 如果根据查询全局行锁SQL从DB中查出了记录,并且加全局锁的全局事务xid不是当前全局事务的,则说明全局锁已经被其他全局事务占用;

      进而回滚当前提交全局锁数据持久化事务,返回false,表示加全局锁失败;

      方法返回到ATCore#branchSessionLock()方法中,如果加全局锁失败,则直接抛出异常BranchTransactionException

所谓的加全局锁操作,其实就是针对每一行记录 持久化一条行锁记录到lock_table表中:

protected boolean doAcquireLock(Connection conn, LockDO lockDO) {
    PreparedStatement ps = null;
    try {
        //insert
        String insertLockSQL = LockStoreSqlFactory.getLogStoreSql(dbType).getInsertLockSQL(lockTable);
        ps = conn.prepareStatement(insertLockSQL);
        // 全局事务xid
        ps.setString(1, lockDO.getXid());
        ps.setLong(2, lockDO.getTransactionId());
        // 分支事务ID
        ps.setLong(3, lockDO.getBranchId());
        ps.setString(4, lockDO.getResourceId());
        ps.setString(5, lockDO.getTableName());
        // 主键
        ps.setString(6, lockDO.getPk());
        // rowKey
        ps.setString(7, lockDO.getRowKey());
        // 锁状态:Locked(已加锁)
        ps.setInt(8, LockStatus.Locked.getCode());
        return ps.executeUpdate() > 0;
    } catch (SQLException e) {
        throw new StoreException(e);
    } finally {
        IOUtil.close(ps);
    }
}

4)分支事务添加到全局事务

在这里插入图片描述

1> 持久化分支事务

将分支事务注册到全局事务之后,会触发Session生命周期监听器SessionLifecycleListeneronAddBranch()事件;

// AbstractSessionManager 将 分支事务会话持久化到DB中
for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
    lifecycleListener.onAddBranch(this, branchSession);
}

在此处,SessionLifecycleListener只有一个实现AbstractSessionManager:

在这里插入图片描述
DataBaseTransactionStoreManager是store.mode为db时的事务存储管理器,其writeSession()方法负责持久化全局事务、分支事务;

@Override
public boolean writeSession(LogOperation logOperation, SessionStorable session) {
    if (LogOperation.GLOBAL_ADD.equals(logOperation)) {
        return logStore.insertGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
    } else if (LogOperation.GLOBAL_UPDATE.equals(logOperation)) {
        return logStore.updateGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
    } else if (LogOperation.GLOBAL_REMOVE.equals(logOperation)) {
        return logStore.deleteGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
    } else if (LogOperation.BRANCH_ADD.equals(logOperation)) {
        return logStore.insertBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
    } else if (LogOperation.BRANCH_UPDATE.equals(logOperation)) {
        return logStore.updateBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
    } else if (LogOperation.BRANCH_REMOVE.equals(logOperation)) {
        return logStore.deleteBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
    } else {
        throw new StoreException("Unknown LogOperation:" + logOperation.name());
    }
}

前面也提到过LogStore可以看做是封装了JDBC、操作DB的工具类 / DAO层,其只有一个实现LogStoreDataBaseDAO;

@Override
public boolean insertGlobalTransactionDO(GlobalTransactionDO globalTransactionDO) {
    String sql = LogStoreSqlsFactory.getLogStoreSqls(dbType).getInsertGlobalTransactionSQL(globalTable);
    Connection conn = null;
    PreparedStatement ps = null;
    try {
        int index = 1;
        conn = logStoreDataSource.getConnection();
        conn.setAutoCommit(true);
        ps = conn.prepareStatement(sql);
        ps.setString(index++, globalTransactionDO.getXid());
        ps.setLong(index++, globalTransactionDO.getTransactionId());
        ps.setInt(index++, globalTransactionDO.getStatus());
        ps.setString(index++, globalTransactionDO.getApplicationId());
        ps.setString(index++, globalTransactionDO.getTransactionServiceGroup());
        String transactionName = globalTransactionDO.getTransactionName();
        transactionName = transactionName.length() > transactionNameColumnSize ?
            transactionName.substring(0, transactionNameColumnSize) :
        transactionName;
        ps.setString(index++, transactionName);
        ps.setInt(index++, globalTransactionDO.getTimeout());
        ps.setLong(index++, globalTransactionDO.getBeginTime());
        ps.setString(index++, globalTransactionDO.getApplicationData());
        return ps.executeUpdate() > 0;
    } catch (SQLException e) {
        throw new StoreException(e);
    } finally {
        IOUtil.close(ps, conn);
    }
}

这里只是单纯的利用JDBC对数据做持久化,将数据持久化到branch_table

2> JVM层面分支事务添加到全局事务

在这里插入图片描述

四、总结

无论是AT、TCC、XA、SAGA哪种分布式事务模式,分支事务注册到全局事务的方式都一样;

TC中对RM注册分支事务到全局事务的处理逻辑为:

  1. 首先根据xid从DB中找到全局事务会话(这里会做一个DB查询操作);
  2. 校验全局事务会话,全局事务会话必须是存活的,并且状态必须为:GlobalStatus.Begin;
  3. 构建一个分支事务会话BranchSession;给分支事务加全局锁,出现锁冲突则直接报错,抛出异常BranchTransactionException
  4. 将分支事务会话添加到全局事务会话,持久化分支事务会话;

TC在分支事务注册的同时,会同时增加全局行锁、检查全局锁冲突。

相关文章
|
2月前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
107 2
|
3月前
|
存储 缓存 算法
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
135 3
|
28天前
|
物联网 调度 vr&ar
鸿蒙HarmonyOS应用开发 |鸿蒙技术分享HarmonyOS Next 深度解析:分布式能力与跨设备协作实战
鸿蒙技术分享:HarmonyOS Next 深度解析 随着万物互联时代的到来,华为发布的 HarmonyOS Next 在技术架构和生态体验上实现了重大升级。本文从技术架构、生态优势和开发实践三方面深入探讨其特点,并通过跨设备笔记应用实战案例,展示其强大的分布式能力和多设备协作功能。核心亮点包括新一代微内核架构、统一开发语言 ArkTS 和多模态交互支持。开发者可借助 DevEco Studio 4.0 快速上手,体验高效、灵活的开发过程。 239个字符
213 13
鸿蒙HarmonyOS应用开发 |鸿蒙技术分享HarmonyOS Next 深度解析:分布式能力与跨设备协作实战
|
4天前
|
存储 分布式计算 Hadoop
基于Java的Hadoop文件处理系统:高效分布式数据解析与存储
本文介绍了如何借鉴Hadoop的设计思想,使用Java实现其核心功能MapReduce,解决海量数据处理问题。通过类比图书馆管理系统,详细解释了Hadoop的两大组件:HDFS(分布式文件系统)和MapReduce(分布式计算模型)。具体实现了单词统计任务,并扩展支持CSV和JSON格式的数据解析。为了提升性能,引入了Combiner减少中间数据传输,以及自定义Partitioner解决数据倾斜问题。最后总结了Hadoop在大数据处理中的重要性,鼓励Java开发者学习Hadoop以拓展技术边界。
28 7
|
25天前
|
存储 SpringCloudAlibaba Java
【SpringCloud Alibaba系列】一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论
一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论。
【SpringCloud Alibaba系列】一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论
|
25天前
|
存储 设计模式 算法
【23种设计模式·全精解析 | 行为型模式篇】11种行为型模式的结构概述、案例实现、优缺点、扩展对比、使用场景、源码解析
行为型模式用于描述程序在运行时复杂的流程控制,即描述多个类或对象之间怎样相互协作共同完成单个对象都无法单独完成的任务,它涉及算法与对象间职责的分配。行为型模式分为类行为模式和对象行为模式,前者采用继承机制来在类间分派行为,后者采用组合或聚合在对象间分配行为。由于组合关系或聚合关系比继承关系耦合度低,满足“合成复用原则”,所以对象行为模式比类行为模式具有更大的灵活性。 行为型模式分为: • 模板方法模式 • 策略模式 • 命令模式 • 职责链模式 • 状态模式 • 观察者模式 • 中介者模式 • 迭代器模式 • 访问者模式 • 备忘录模式 • 解释器模式
【23种设计模式·全精解析 | 行为型模式篇】11种行为型模式的结构概述、案例实现、优缺点、扩展对比、使用场景、源码解析
|
25天前
|
设计模式 存储 安全
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
结构型模式描述如何将类或对象按某种布局组成更大的结构。它分为类结构型模式和对象结构型模式,前者采用继承机制来组织接口和类,后者釆用组合或聚合来组合对象。由于组合关系或聚合关系比继承关系耦合度低,满足“合成复用原则”,所以对象结构型模式比类结构型模式具有更大的灵活性。 结构型模式分为以下 7 种: • 代理模式 • 适配器模式 • 装饰者模式 • 桥接模式 • 外观模式 • 组合模式 • 享元模式
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
|
25天前
|
设计模式 存储 安全
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
创建型模式的主要关注点是“怎样创建对象?”,它的主要特点是"将对象的创建与使用分离”。这样可以降低系统的耦合度,使用者不需要关注对象的创建细节。创建型模式分为5种:单例模式、工厂方法模式抽象工厂式、原型模式、建造者模式。
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
|
1天前
|
自然语言处理 数据处理 索引
mindspeed-llm源码解析(一)preprocess_data
mindspeed-llm是昇腾模型套件代码仓,原来叫"modelLink"。这篇文章带大家阅读一下数据处理脚本preprocess_data.py(基于1.0.0分支),数据处理是模型训练的第一步,经常会用到。
7 0
|
2月前
|
缓存 监控 Java
Java线程池提交任务流程底层源码与源码解析
【11月更文挑战第30天】嘿,各位技术爱好者们,今天咱们来聊聊Java线程池提交任务的底层源码与源码解析。作为一个资深的Java开发者,我相信你一定对线程池并不陌生。线程池作为并发编程中的一大利器,其重要性不言而喻。今天,我将以对话的方式,带你一步步深入线程池的奥秘,从概述到功能点,再到背景和业务点,最后到底层原理和示例,让你对线程池有一个全新的认识。
65 12

推荐镜像

更多