图解Seata AT模式TM处理流程

简介: 图解Seata AT模式TM处理流程

TM的作用

我们根据源码解读画出了下图,该图示展现了TM在整个Seata AT模式的分布式事务中所起的作用:


88a635dccb134434ab073f66c18a98ed_tplv-k3u1fbpfcp-zoom-in-crop-mark_4536_0_0_0.png从上图中可以看出,TM主要有两个作用:

  1. 开启分布式事务,以拿到XID作为分布式事务开启的标识;一定是从TC拿到XID,不是从调用方传递过来的XID;
  2. 根据所有RM的处理结果来决定是提交分布式事务还是回滚分布式事务;

转换成伪代码如下:

try{
  // 开启分布式事务
  String xid = TM.beginGlobalTransaction();
  // 执行业务逻辑,包含远程rpc调用
  RM1.execute(xid); -------RPC调用--------> RM2.execute(xid);
  // 提交分布式事务
  TM.commitGlobalTransaction(xid);
}catch(Exception e){
  // 回滚分布式事务
  TM.rollbackGlobalTransaction(xid);
}finally{
  // 恢复现场
}
复制代码

源码分解

在之前讲述图解Seata AT模式启动流程中,我们已经知道了TM的处理流程是通过扫描注解@GlobalTransactional来完成对业务逻辑的拦截的。

主要完成这个拦截功能的类是io.seata.spring.annotation.GlobalTransactionalInterceptor,在这个类中,我们主要看invoke方法:

@Override
    public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
        // 拿到被拦截的目标类
        Class<?> targetClass =
            methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;
        // 获取目标方法
        Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
        // 判断这个方法是不是Object类中的toString()、equals()等方法
        if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {
            // 通过被拦截的方法找出对应的注解GlobalTransactional和GlobalLock
            final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
            final GlobalTransactional globalTransactionalAnnotation =
                getAnnotation(method, targetClass, GlobalTransactional.class);
            final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);
            // 判断是否开启分布式事务,或者TM是否被降级处理,默认是没有被降级的
            boolean localDisable = disable || (degradeCheck && degradeNum >= degradeCheckAllowTimes);
            // 分布式事务可以正常使用
            if (!localDisable) {
                // 如果注解GlobalTransactional存在,那么直接把里面的配置解析成AspectTransactional
                if (globalTransactionalAnnotation != null || this.aspectTransactional != null) {
                    AspectTransactional transactional;
                    if (globalTransactionalAnnotation != null) {
                        transactional = new AspectTransactional(globalTransactionalAnnotation.timeoutMills(),
                            globalTransactionalAnnotation.name(), globalTransactionalAnnotation.rollbackFor(),
                            globalTransactionalAnnotation.rollbackForClassName(),
                            globalTransactionalAnnotation.noRollbackFor(),
                            globalTransactionalAnnotation.noRollbackForClassName(),
                            globalTransactionalAnnotation.propagation(),
                            globalTransactionalAnnotation.lockRetryInterval(),
                            globalTransactionalAnnotation.lockRetryTimes());
                    } else {
                        transactional = this.aspectTransactional;
                    }
                    // 调用handleGlobalTransaction处理
                    return handleGlobalTransaction(methodInvocation, transactional);
                } else if (globalLockAnnotation != null) {
                    // 调用handleGlobalLock处理
                    return handleGlobalLock(methodInvocation, globalLockAnnotation);
                }
            }
        }
        // 如果是Object类中的方法的话,直接调用,不作拦截
        return methodInvocation.proceed();
    }
复制代码

以上代码就做了下面几件事情:

  1. 判断拦截的方法是否是一个合理的方法,像Object类中的toString()、equals()等方法是不应该被拦截的;
  2. 拦截的方法合理的话,那么要确认是否允许开启分布式事务;
  • 如果配置了service.disableGlobalTransaction=true,那么说明不能开启分布式事务;
  • 另一个就是配置了允许TM降级client.tm.degradeCheck=true(默认是false),那么就会开启定时任务不断地与TC通信,如果建立通信失败的次数超过了阈值client.tm.degradeCheckAllowTimes,那么就会触发TM降级,此时无法开启新的分布式事务,降级前开启的分布式事务没有影响;
  1. 可以正常地准备分布式事务了,那么开始收集注解的相关信息;
  • 如果是GlobalTransactional注解,交给handleGlobalTransaction()处理;
  • 如果是GlobalLock注解,交给handleGlobalLock()处理;

需要注意的是,我们从源码当中了解到,原来TM还可以做一个降级的配置。降级后的TM是不会开启新的分布式事务的,这个时候只能保证本地事务的正常进行,只有当TM与TC通信恢复后,降级后的TM会立马恢复,可以重新开启新的分布式事务。

在TM降级期间的需要业务侧自行处理因降级导致的数据脏写和脏读问题。

  • handleGlobalTransaction
    处理被@GlobalTransactional标注的业务逻辑
Object handleGlobalTransaction(final MethodInvocation methodInvocation,
        final AspectTransactional aspectTransactional) throws Throwable {
        // 默认succeed=true
        boolean succeed = true;
        try {
            // 执行分布式事务处理逻辑
            // 详细内容后面介绍
            return transactionalTemplate.execute(new TransactionalExecutor() {
                // 执行业务逻辑
                @Override
                public Object execute() throws Throwable {
                    return methodInvocation.proceed();
                }
                // 分布式事务名称,没有指定的话,就用【方法名+参数类型】命名
                public String name() {
                    String name = aspectTransactional.getName();
                    if (!StringUtils.isNullOrEmpty(name)) {
                        return name;
                    }
                    return formatMethod(methodInvocation.getMethod());
                }
                // 分布式事务信息,其实就是@GlobalTransactional注解里面拿到的配置
                @Override
                public TransactionInfo getTransactionInfo() {
                    // reset the value of timeout
                    int timeout = aspectTransactional.getTimeoutMills();
                    if (timeout <= 0 || timeout == DEFAULT_GLOBAL_TRANSACTION_TIMEOUT) {
                        timeout = defaultGlobalTransactionTimeout;
                    }
                    TransactionInfo transactionInfo = new TransactionInfo();
                    transactionInfo.setTimeOut(timeout);
                    transactionInfo.setName(name());
                    transactionInfo.setPropagation(aspectTransactional.getPropagation());
                    transactionInfo.setLockRetryInterval(aspectTransactional.getLockRetryInterval());
                    transactionInfo.setLockRetryTimes(aspectTransactional.getLockRetryTimes());
                    Set<RollbackRule> rollbackRules = new LinkedHashSet<>();
                    for (Class<?> rbRule : aspectTransactional.getRollbackFor()) {
                        rollbackRules.add(new RollbackRule(rbRule));
                    }
                    for (String rbRule : aspectTransactional.getRollbackForClassName()) {
                        rollbackRules.add(new RollbackRule(rbRule));
                    }
                    for (Class<?> rbRule : aspectTransactional.getNoRollbackFor()) {
                        rollbackRules.add(new NoRollbackRule(rbRule));
                    }
                    for (String rbRule : aspectTransactional.getNoRollbackForClassName()) {
                        rollbackRules.add(new NoRollbackRule(rbRule));
                    }
                    transactionInfo.setRollbackRules(rollbackRules);
                    return transactionInfo;
                }
            });
        } catch (TransactionalExecutor.ExecutionException e) {
            // 发生异常
            TransactionalExecutor.Code code = e.getCode();
            switch (code) {
                // 已经回滚过了
                case RollbackDone:
                    throw e.getOriginalException();
                // 开启分布式事务失败
                case BeginFailure:
                    // 分布式事务失败
                    succeed = false;
                    // 调用失败处理逻辑
                    failureHandler.onBeginFailure(e.getTransaction(), e.getCause());
                    throw e.getCause();
                // 分布式事务提交失败
                case CommitFailure:
                    // 分布式事务失败
                    succeed = false;
                    // 调用失败处理逻辑
                    failureHandler.onCommitFailure(e.getTransaction(), e.getCause());
                    throw e.getCause();
                // 回滚失败
                case RollbackFailure:
                    // 调用失败处理逻辑
                    failureHandler.onRollbackFailure(e.getTransaction(), e.getOriginalException());
                    throw e.getOriginalException();
                // 回滚重试
                case RollbackRetrying:
                    // 调用失败处理器中的回滚重试回调逻辑
                    failureHandler.onRollbackRetrying(e.getTransaction(), e.getOriginalException());
                    throw e.getOriginalException();
                // 啥也不是,直接抛异常
                default:
                    throw new ShouldNeverHappenException(String.format("Unknown TransactionalExecutor.Code: %s", code));
            }
        } finally {
            // 如果允许TM降级,那么这次处理完毕后,说明与TC恢复通信,可以解除降级
            if (degradeCheck) {
                EVENT_BUS.post(new DegradeCheckEvent(succeed));
            }
        }
    }
复制代码
  • 其实上面就一行代码,使用的是模版模式,所以其实真正的重点还是应该进到模版里面去看看具体是怎么处理的。
public Object execute(TransactionalExecutor business) throws Throwable {
        // 1. 拿到整理好的@GlobalTransactional注解里面的配置信息
        TransactionInfo txInfo = business.getTransactionInfo();
        if (txInfo == null) {
            throw new ShouldNeverHappenException("transactionInfo does not exist");
        }
        // 1.1 获取当前的分布式事务,如果为null的话,说明这是分布式事务的发起者;如果不为null,说明这是分布式事务的参与者
        GlobalTransaction tx = GlobalTransactionContext.getCurrent();
        // 1.2 获取分布式事务的传播级别,其实就是按照spring的传播级别来一套,区别就是spring事务是本地事务,这是分布式事务,原理都一样
        Propagation propagation = txInfo.getPropagation();
        SuspendedResourcesHolder suspendedResourcesHolder = null;
        try {
            // 这个switch里面全都是处理分布式事务传播级别的
            switch (propagation) {
                // 如果不支持分布式事务,如果当前存在事务,那么先挂起当前的分布式事务,再执行业务逻辑
                case NOT_SUPPORTED:
                    // 分布式事务存在,先挂起
                    if (existingTransaction(tx)) {
                        suspendedResourcesHolder = tx.suspend();
                    }
                    // 执行业务逻辑
                    return business.execute();
                // 如果是每次都要创建一个新的分布式事务,先把当前存在的分布式事务挂起,然后创建一个新分布式事务
                case REQUIRES_NEW:
                    // 如果分布式事务存在,先挂起当前分布式事务,再创建一个新的分布式事务
                    if (existingTransaction(tx)) {
                        suspendedResourcesHolder = tx.suspend();
                        tx = GlobalTransactionContext.createNew();
                    }
                    // 之所以用break,是为了后面的代码和其他的传播级别一起共用,业务逻辑肯定还是要执行的
                    break;
                // 如果支持分布式事务,如果当前不存在分布式事务,那么直接执行业务逻辑,否则以分布式事务的方式执行业务逻辑
                case SUPPORTS:
                    // 如果不存在分布式事务,直接执行业务逻辑
                    if (notExistingTransaction(tx)) {
                        return business.execute();
                    }
                    // 否则,以分布式事务的方式执行业务逻辑
                    break;
                // 如果有分布式事务,就在当前分布式事务下执行业务逻辑,否则创建一个新的分布式事务执行业务逻辑
                case REQUIRED:
                    // If current transaction is existing, execute with current transaction,
                    // else continue and execute with new transaction.
                    break;
                // 如果不允许有分布式事务,那么一旦发现存在分布式事务,直接抛异常;只有不存在分布式事务的时候才正常执行
                case NEVER:
                    // 存在分布式事务,抛异常
                    if (existingTransaction(tx)) {
                        throw new TransactionException(
                            String.format("Existing transaction found for transaction marked with propagation 'never', xid = %s"
                                    , tx.getXid()));
                    } else {
                        // 不存在分布式事务,执行业务逻辑
                        return business.execute();
                    }
                // 一定要有分布式事务,分布式事务不存在的话,抛异常;
                case MANDATORY:
                    // 不存在分布式事务,抛异常
                    if (notExistingTransaction(tx)) {
                        throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");
                    }
                    // Continue and execute with current transaction.
                    break;
                default:
                    throw new TransactionException("Not Supported Propagation:" + propagation);
            }
            // 上面的传播级别的逻辑处理完毕,下面就是公共的处理逻辑
            // 1.3 如果当前分布式事务没有的话,那么我们就要创建新的分布式事务,此时我们就是分布式事务的发起者,也就是TM本身,否则不能称之为`TM`
            if (tx == null) {
                tx = GlobalTransactionContext.createNew();
            }
            // 开始准备干活的条件
            // 把我们这个方法的全局锁配置放进当前线程中,并且把线程中已有的全局锁的配置取出来
            // 我们在干完自己的活后,会把这个取出来的配置放回去的
            GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);
            try {
                // 2. 如果我们是分布式事务的发起者的话,那么我们会和TC通信,并且拿到一个XID;如果我们不是分布式事务的发起者的话,那么这一步啥也不干
                // 这个XID可以从RootContext中获取
                beginTransaction(txInfo, tx);
                Object rs;
                try {
                    // 执行业务逻辑
                    rs = business.execute();
                } catch (Throwable ex) {
                    // 3. 发生任何异常,我们准备启动回滚机制
                    completeTransactionAfterThrowing(txInfo, tx, ex);
                    throw ex;
                }
                // 4. 一切顺利,通知提交分布式事务
                commitTransaction(tx);
                return rs;
            } finally {
                //5. 恢复现场,把之前的配置放回去
                resumeGlobalLockConfig(previousConfig);
                // 触发回调
                triggerAfterCompletion();
                // 清理工作
                cleanUp();
            }
        } finally {
            // 恢复之前挂起的事务
            if (suspendedResourcesHolder != null) {
                tx.resume(suspendedResourcesHolder);
            }
        }
    }
复制代码

根据上面的源码分析,execute方法做了以下几件事情:

  1. 处理分布式事务的传播级别,参照spring的事务传播级别;
  2. 如果是分布式事务的发起者,那么需要与TC通信,并获取XID开启分布式事务;
  3. 如果业务逻辑处理出现异常,说明分布式事务需要准备回滚;如果没有任何异常,那么准备发起分布式事务提交
  4. 分布式事务处理完毕后,准备恢复现场
  • 分布式事务开启:
private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
        try {
            // 回调,默认是空回调
            triggerBeforeBegin();
            // 发起分布式事务
            tx.begin(txInfo.getTimeOut(), txInfo.getName());
            // 回调,默认是空回调
            triggerAfterBegin();
        } catch (TransactionException txe) {
            throw new TransactionalExecutor.ExecutionException(tx, txe,
                TransactionalExecutor.Code.BeginFailure);
        }
    }
@Override
    public void begin(int timeout, String name) throws TransactionException {
        // 如果不是分布式事务发起者,那么啥也不做
        if (role != GlobalTransactionRole.Launcher) {
            assertXIDNotNull();
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Ignore Begin(): just involved in global transaction [{}]", xid);
            }
            return;
        }
        assertXIDNull();
        // 如果当前已经处于分布式事务当中,那么抛异常,因为事务发起者不可能事先处于别的分布式事务当中
        String currentXid = RootContext.getXID();
        if (currentXid != null) {
            throw new IllegalStateException("Global transaction already exists," +
                " can't begin a new global transaction, currentXid = " + currentXid);
        }
        // 发起分布式事务
        xid = transactionManager.begin(null, null, name, timeout);
        status = GlobalStatus.Begin;
        // 把xid绑定到当前线程中
        RootContext.bind(xid);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Begin new global transaction [{}]", xid);
        }
    }
@Override
    public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
        throws TransactionException {
        // 发起分布式事务开启的请求
        GlobalBeginRequest request = new GlobalBeginRequest();
        request.setTransactionName(name);
        request.setTimeout(timeout);
        GlobalBeginResponse response = (GlobalBeginResponse) syncCall(request);
        if (response.getResultCode() == ResultCode.Failed) {
            throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg());
        }
        // 获取拿到的xid,表示分布式事务开启成功
        return response.getXid();
    }
复制代码

1.分布式事务的发起其实就是TM向TC请求,获取XID,并把XID绑定到当前线程中

  • 异常回滚:
private void completeTransactionAfterThrowing(TransactionInfo txInfo, GlobalTransaction tx, Throwable originalException) throws TransactionalExecutor.ExecutionException {
        // 如果异常类型和指定的类型一致,那么发起回滚;不一致还是要提交分布式事务
        if (txInfo != null && txInfo.rollbackOn(originalException)) {
            try {
                // 回滚分布式事务
                rollbackTransaction(tx, originalException);
            } catch (TransactionException txe) {
                // 回滚失败抛异常
                throw new TransactionalExecutor.ExecutionException(tx, txe,
                        TransactionalExecutor.Code.RollbackFailure, originalException);
            }
        } else {
            // 不是指定的异常类型,还是继续提交分布式事务
            commitTransaction(tx);
        }
    }
private void rollbackTransaction(GlobalTransaction tx, Throwable originalException) throws TransactionException, TransactionalExecutor.ExecutionException {
        // 执行回调,默认空回调
        triggerBeforeRollback();
        // 回滚
        tx.rollback();
        // 执行回调,默认空回调
        triggerAfterRollback();
        // 就算回滚没问题,照样抛异常,目的应该是告知开发人员此处产生了回滚
        throw new TransactionalExecutor.ExecutionException(tx, GlobalStatus.RollbackRetrying.equals(tx.getLocalStatus())
            ? TransactionalExecutor.Code.RollbackRetrying : TransactionalExecutor.Code.RollbackDone, originalException);
    }
@Override
    public void rollback() throws TransactionException {
        // 如果是分布式事务参与者,那么啥也不做,RM的回滚不在这里,这是TM的回滚
        if (role == GlobalTransactionRole.Participant) {
            // Participant has no responsibility of rollback
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Ignore Rollback(): just involved in global transaction [{}]", xid);
            }
            return;
        }
        assertXIDNotNull();
        // 下面就是一个循环重试发起分布式事务回滚
        int retry = ROLLBACK_RETRY_COUNT <= 0 ? DEFAULT_TM_ROLLBACK_RETRY_COUNT : ROLLBACK_RETRY_COUNT;
        try {
            while (retry > 0) {
                try {
                    retry--;
                    // 发起回滚的核心代码
                    status = transactionManager.rollback(xid);
                    // 回滚成功跳出循环
                    break;
                } catch (Throwable ex) {
                    LOGGER.error("Failed to report global rollback [{}],Retry Countdown: {}, reason: {}", this.getXid(), retry, ex.getMessage());
                    // 重试失败次数完成才会跳出循环
                    if (retry == 0) {
                        throw new TransactionException("Failed to report global rollback", ex);
                    }
                }
            }
        } finally {
            // 如果回滚的分布式事务就是当前的分布式事务,那么从当前线程中解绑XID
            if (xid.equals(RootContext.getXID())) {
                suspend();
            }
        }
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("[{}] rollback status: {}", xid, status);
        }
    }
@Override
    public GlobalStatus rollback(String xid) throws TransactionException {
        // 准备发起请求给TC,回滚指定的分布式事务
        GlobalRollbackRequest globalRollback = new GlobalRollbackRequest();
        globalRollback.setXid(xid);
        GlobalRollbackResponse response = (GlobalRollbackResponse) syncCall(globalRollback);
        return response.getGlobalStatus();
    }
复制代码

分布式事务回滚逻辑中有以下几个点:

  1. 触发回滚需要产生的异常和注解中指定的异常一致才会发起回滚,否则还是继续提交;
  2. 回滚是可以设置重试次数的,只有重试都失败了,才会导致回滚失败,否则只要有一次成功,那么回滚就成功;
  3. TM发起的回滚其实只是和TC发起一次分布式事务回滚的通信,并没有数据库的操作;
  • 分布式事务提交:
private void commitTransaction(GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
        try {
            // 回调,默认空回调
            triggerBeforeCommit();
            // 分布式事务提交
            tx.commit();
            // 回调,默认空回调
            triggerAfterCommit();
        } catch (TransactionException txe) {
            // 4.1 提交出异常,提交失败
            throw new TransactionalExecutor.ExecutionException(tx, txe,
                TransactionalExecutor.Code.CommitFailure);
        }
    }
@Override
    public void commit() throws TransactionException {
        // 如果只是分布式事务参与者,那么啥也不干,TM只能有一个,哈哈
        if (role == GlobalTransactionRole.Participant) {
            // Participant has no responsibility of committing
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Ignore Commit(): just involved in global transaction [{}]", xid);
            }
            return;
        }
        assertXIDNotNull();
        // 分布式事务提交也是有重试的
        int retry = COMMIT_RETRY_COUNT <= 0 ? DEFAULT_TM_COMMIT_RETRY_COUNT : COMMIT_RETRY_COUNT;
        try {
            while (retry > 0) {
                try {
                    retry--;
                    // 发起分布式事务提交
                    status = transactionManager.commit(xid);
                    // 提交成功跳出循环
                    break;
                } catch (Throwable ex) {
                    LOGGER.error("Failed to report global commit [{}],Retry Countdown: {}, reason: {}", this.getXid(), retry, ex.getMessage());
                    // 重试结束,依然失败就抛异常
                    if (retry == 0) {
                        throw new TransactionException("Failed to report global commit", ex);
                    }
                }
            }
        } finally {
            // 如果提交的分布式事务就是当前事务,那么需要清理当前线程中的XID
            if (xid.equals(RootContext.getXID())) {
                suspend();
            }
        }
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("[{}] commit status: {}", xid, status);
        }
    }
@Override
    public GlobalStatus commit(String xid) throws TransactionException {
        // 发起分布式事务提交请求,这是与TC通信
        GlobalCommitRequest globalCommit = new GlobalCommitRequest();
        globalCommit.setXid(xid);
        GlobalCommitResponse response = (GlobalCommitResponse) syncCall(globalCommit);
        return response.getGlobalStatus();
    }
复制代码
  1. 分布式事务回滚也是可以设置重试次数的;
  2. 分布式事务提交其实也是TM与TC进行通信,告知TC这个XID对应的分布式事务可以提交了;
  • handleGlobalLock
private Object handleGlobalLock(final MethodInvocation methodInvocation, final GlobalLock globalLockAnno) throws Throwable {
        // 模版模式实现全局锁
        return globalLockTemplate.execute(new GlobalLockExecutor() {
            // 执行业务逻辑
            @Override
            public Object execute() throws Throwable {
                return methodInvocation.proceed();
            }
            // 获取全局锁配置
            // 一个是全局锁重试间隔时间
            // 一个是全局锁重试次数
            @Override
            public GlobalLockConfig getGlobalLockConfig() {
                GlobalLockConfig config = new GlobalLockConfig();
                config.setLockRetryInterval(globalLockAnno.lockRetryInterval());
                config.setLockRetryTimes(globalLockAnno.lockRetryTimes());
                return config;
            }
        });
    }
public Object execute(GlobalLockExecutor executor) throws Throwable {
        // 判断当前是否有全局锁
        boolean alreadyInGlobalLock = RootContext.requireGlobalLock();
        // 如果没有全局锁,那么在当前线程中设置需要全局锁标识
        if (!alreadyInGlobalLock) {
            RootContext.bindGlobalLockFlag();
        }
        // 把全局锁的配置设置进当前线程,并把线程中已有的全局锁配置拿出来,后面恢复现场需要用
        GlobalLockConfig myConfig = executor.getGlobalLockConfig();
        GlobalLockConfig previousConfig = GlobalLockConfigHolder.setAndReturnPrevious(myConfig);
        try {
            // 执行业务逻辑
            return executor.execute();
        } finally {
            // 清除线程中的全局锁标记
            if (!alreadyInGlobalLock) {
                RootContext.unbindGlobalLockFlag();
            }
            // 恢复现场
            if (previousConfig != null) {
                GlobalLockConfigHolder.setAndReturnPrevious(previousConfig);
            } else {
                GlobalLockConfigHolder.remove();
            }
        }
    }
复制代码

其实真正的全局锁逻辑并不在TM当中,TM只是负责根据@GlobalLock注解把相应的全局锁标记绑定到线程中,真正负责处理全局锁的还是底层的RM;

小结

至此我们已经把TM的所有工作都解读完毕了,下面来做一个小结:

1.TM主要针对两个注解GlobalTransactional和GlobalLock来实现处理逻辑,原理都是基于Aop和反射;处理逻辑里面涉及到TM降级的一个情况,这是一个值得注意的点

2.处理GlobalTransactional主要分两步:

  • 开启分布式事务,需要与TC交互,存在rpc开销;
  • 根据RM的处理情况决定是提交分布式事务还是回滚分布式事务,也是需要与TC交互,存在rpc开销;在提交或回滚分布式事务中,还可以设置重试次数;

3.处理GlobalLock,主要就是在当前线程中设置一个需要检查全局锁的标记,让底层的RM去做全局锁的检测动作;


相关文章
|
20天前
|
数据库 微服务
SEATA模式
Seata 是一款开源的分布式事务解决方案,支持多种事务模式以适应不同的应用场景。其主要模式包括:AT(TCC)模式,事务分三阶段执行;TCC 模式,提供更灵活的事务控制;SAGA 模式,基于状态机实现跨服务的事务一致性;XA 模式,采用传统两阶段提交协议确保数据一致性。
36 5
|
26天前
Seata框架在AT模式下是如何保证数据一致性的?
通过以上这些机制的协同作用,Seata 在 AT 模式下能够有效地保证数据的一致性,确保分布式事务的可靠执行。你还可以进一步深入研究 Seata 的具体实现细节,以更好地理解其数据一致性保障的原理。
37 3
|
6月前
|
Java 微服务 Spring
Seata 客户端需要同时启动 TM 和 RM 吗?
Seata 客户端需要同时启动 TM 和 RM 吗?
|
6月前
|
Apache 开发者
Apache Seata 如何解决 TCC 模式的幂等、悬挂和空回滚问题
【6月更文挑战第8天】Apache Seata 是一款分布式事务框架,解决TCC模式下的幂等、悬挂和空回滚问题。通过记录事务状态处理幂等,设置超时机制避免悬挂,明确标记Try操作成功来处理空回滚。Seata 提供丰富配置和管理功能,确保分布式事务的可靠性和效率,支持复杂事务处理场景,为企业业务发展提供支持。
238 7
|
7月前
|
NoSQL Java 数据库
Seata常见问题之xa模式下插入一条数据再更新这条数据会报错如何解决
Seata 是一个开源的分布式事务解决方案,旨在提供高效且简单的事务协调机制,以解决微服务架构下跨服务调用(分布式场景)的一致性问题。以下是Seata常见问题的一个合集
206 2
|
7月前
|
监控 API 数据库
Seata常见问题之Seata AT的设计不支持使用临时表如何解决
Seata 是一个开源的分布式事务解决方案,旨在提供高效且简单的事务协调机制,以解决微服务架构下跨服务调用(分布式场景)的一致性问题。以下是Seata常见问题的一个合集
|
7月前
|
Java 关系型数据库 微服务
Seata常见问题之项目一直启动不成功如何解决
Seata 是一个开源的分布式事务解决方案,旨在提供高效且简单的事务协调机制,以解决微服务架构下跨服务调用(分布式场景)的一致性问题。以下是Seata常见问题的一个合集
591 0
|
29天前
|
Java 数据库
在Java中使用Seata框架实现分布式事务的详细步骤
通过以上步骤,利用 Seata 框架可以实现较为简单的分布式事务处理。在实际应用中,还需要根据具体业务需求进行更详细的配置和处理。同时,要注意处理各种异常情况,以确保分布式事务的正确执行。
|
16天前
|
消息中间件 SQL 中间件
大厂都在用的分布式事务方案,Seata+RocketMQ带你打破10万QPS瓶颈
分布式事务涉及跨多个数据库或服务的操作,确保数据一致性。本地事务通过数据库直接支持ACID特性,而分布式事务则需解决跨服务协调难、高并发压力及性能与一致性权衡等问题。常见的解决方案包括两阶段提交(2PC)、Seata提供的AT和TCC模式、以及基于消息队列的最终一致性方案。这些方法各有优劣,适用于不同业务场景,选择合适的方案需综合考虑业务需求、系统规模和技术团队能力。
115 7
|
28天前
|
存储 Java 关系型数据库
在Spring Boot中整合Seata框架实现分布式事务
可以在 Spring Boot 中成功整合 Seata 框架,实现分布式事务的管理和处理。在实际应用中,还需要根据具体的业务需求和技术架构进行进一步的优化和调整。同时,要注意处理各种可能出现的问题,以保障分布式事务的顺利执行。
49 6