Seata TCC 分支事务

简介: 前面,我们已经介绍了 Seata 的整体设计思想,接下来我们深入到其实现细节中,本文介绍 Seata 中 TCC 模式分支事务的实现。

引言

前面,我们已经介绍了 Seata 的整体设计思想,接下来我们深入到其实现细节中,本文介绍 Seata 中 TCC 模式分支事务的实现,其他 Seata 相关文章均收录于 <Seata系列文章>中。

TCC 模式

先简单介绍一个 Seata 中 TCC 的使用方式, 然后我们在顺着它的使用方式, 一点点深入其实现方案。

在 Seata TCC 模式中, 每个 RM 都需要将 TCC 接口以 RPC 的形式暴露出去, 同时向 TC 中注册, 告诉 TC 自己是某一 TCC 接口的提供方, 这样如果发生提交或者回滚时, TC 就知道该去找谁了。然后 TM 在进行 TCC 调用之前先去 TC 注册分支事务,告诉 TC 这个分支事务用的是哪个 TCC 接口,然后才通过 RPC 调用 TCC 接口的 try 方法,当发生全局事务提交或者回滚时, TC 会直接通知该 TCC 接口的提供方进行分支处理。

public interface TccActionTwo {

    /**
     * Prepare boolean.
     *
     * @param actionContext the action context
     * @param b             the b
     * @param list          the list
     * @return the boolean
     */
    @TwoPhaseBusinessAction(name = "DubboTccActionTwo" , commitMethod = "commit", rollbackMethod = "rollback")
    public boolean prepare(BusinessActionContext actionContext, @BusinessActionContextParameter(paramName = "b") String b,
                           @BusinessActionContextParameter(paramName = "c",index = 1) List list);

    /**
     * Commit boolean.
     *
     * @param actionContext the action context
     * @return the boolean
     */
    public boolean commit(BusinessActionContext actionContext);

    /**
     * Rollback boolean.
     *
     * @param actionContext the action context
     * @return the boolean
     */
    public boolean rollback(BusinessActionContext actionContext);

}

上面的代码就是一个 TCC 接口的样例, 通过 TwoPhaseBusinessAction 注解,接口可以告诉 Seata 哪个是 prepare 函数, 该注解中标识了该 TCC 的资源名, 回滚和提交函数名, 在 prepare 函数的参数中, 我们可以看到 BusinessActionContextParameter 注解, 它标识了哪些参数在回滚或提交时也需要用到, 因为 TCC 模式中 RM 是不在本地存储参数信息的, 这些数据都存在 TC 中, 所以 TM 在进行 RPC 调用前, 会根据该注解将回滚或提交时需要用到的参数存储在 TC 中。如果发生回滚或者提交, TC 要把调用 prepare 时使用到的的这些参数存在 Context 里, 发送给 RM, 这样 RM 就可以用到这些参数。

知道了 TCC 的工作流程后, 就要深入 TCC 模式的实现了, 我们就得回到最初 AT 模式 和 TCC 模式分道扬镳的地方 GlobalTransactionScanner, 在这里进行了 TCC 的判断:

if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
    //TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC
    interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
}

看来, 这个 TCCBeanParserUtils.isTccAutoProxy 是判断 TCC 模式的关键, 我们不妨看一看它的内容:

public static boolean isTccAutoProxy(Object bean, String beanName, ApplicationContext applicationContext) {
    RemotingDesc remotingDesc = null;
    // 在这个匹配过程中, 就完成了 RPC 提供方的注册 parserRemotingServiceInfo
    boolean isRemotingBean = parserRemotingServiceInfo(bean, beanName);
    //is remoting bean
    if (isRemotingBean) {
        remotingDesc = DefaultRemotingParser.get().getRemotingBeanDesc(beanName);
        if (remotingDesc != null && remotingDesc.getProtocol() == Protocols.IN_JVM) {
            //LocalTCC
            return isTccProxyTargetBean(remotingDesc);
        } else {
            // sofa:reference / dubbo:reference, factory bean
            return false;
        }
    } else {
        //get RemotingBean description
        remotingDesc = DefaultRemotingParser.get().getRemotingBeanDesc(beanName);
        if (remotingDesc == null) {
            //check FactoryBean
            if (isRemotingFactoryBean(bean, beanName, applicationContext)) {
                remotingDesc = DefaultRemotingParser.get().getRemotingBeanDesc(beanName);
                return isTccProxyTargetBean(remotingDesc);
            } else {
                return false;
            }
        } else {
            return isTccProxyTargetBean(remotingDesc);
        }
    }
}
// 上述的函数只是判断是不是 RPC 接口, 然后下面的才是检查注解的过程
protected static boolean isTccProxyTargetBean(RemotingDesc remotingDesc) {
    if (remotingDesc == null) {
        return false;
    }
    //check if it is TCC bean
    boolean isTccClazz = false;
    Class<?> tccInterfaceClazz = remotingDesc.getInterfaceClass();
    Method[] methods = tccInterfaceClazz.getMethods();
    TwoPhaseBusinessAction twoPhaseBusinessAction = null;
    for (Method method : methods) {
        twoPhaseBusinessAction = method.getAnnotation(TwoPhaseBusinessAction.class);
        if (twoPhaseBusinessAction != null) {
            isTccClazz = true;
            break;
        }
    }
    if (!isTccClazz) {
        return false;
    }
    short protocols = remotingDesc.getProtocol();
    //LocalTCC
    if (Protocols.IN_JVM == protocols) {
        //in jvm TCC bean , AOP
        return true;
    }
    // sofa:reference /  dubbo:reference, AOP
    return remotingDesc.isReference();
}

我们可以看到, isTccAutoProxy 只判断它是不是 RPC 调用, 而 isTccProxyTargetBean 进行注解的检查, 在 isTccProxyTargetBean 的最后几行, 决定了是否要加入 TCC 的拦截器: 只有当接口协议为 JVM 或者是 RPC 的消费者(isReference)。这时候,我们可以大胆的猜测,TccActionInterceptor 这个拦截器肯定就是 TM 注册分支事务的拦截器, 因为只有 TM 才负责 TCC 模式的分支注册:

// TccActionInterceptor#invoke -> ActionInterceptorHandler#proceed
public Map<String, Object> proceed(Method method, Object[] arguments, String xid, TwoPhaseBusinessAction businessAction,
                                       Callback<Object> targetCallback) throws Throwable {
    Map<String, Object> ret = new HashMap<String, Object>(16);

    //TCC name
    String actionName = businessAction.name();
    BusinessActionContext actionContext = new BusinessActionContext();
    actionContext.setXid(xid);
    //set action anme
    actionContext.setActionName(actionName);
    //TODO services

    //Creating Branch Record
    String branchId = doTccActionLogStore(method, arguments, businessAction, actionContext);
    actionContext.setBranchId(branchId);

    //set the parameter whose type is BusinessActionContext
    Class<?>[] types = method.getParameterTypes();
    int argIndex = 0;
    for (Class<?> cls : types) {
        if (cls.getName().equals(BusinessActionContext.class.getName())) {
            arguments[argIndex] = actionContext;
            break;
        }
        argIndex++;
    }
    //the final parameters of the try method
    ret.put(Constants.TCC_METHOD_ARGUMENTS, arguments);
    //the final result
    ret.put(Constants.TCC_METHOD_RESULT, targetCallback.execute());
    return ret;
}
protected String doTccActionLogStore(Method method, Object[] arguments, TwoPhaseBusinessAction businessAction,
                                         BusinessActionContext actionContext) {
    String actionName = actionContext.getActionName();
    String xid = actionContext.getXid();
    // 找到所有需要保存的参数, 存在 context 中一起发给 TC
    Map<String, Object> context = fetchActionRequestContext(method, arguments);
    context.put(Constants.ACTION_START_TIME, System.currentTimeMillis());

    //init business context
    initBusinessContext(context, method, businessAction);
    //Init running environment context
    initFrameworkContext(context);
    actionContext.setActionContext(context);

    //init applicationData
    Map<String, Object> applicationContext = new HashMap<String, Object>(4);
    applicationContext.put(Constants.TCC_ACTION_CONTEXT, context);
    String applicationContextStr = JSON.toJSONString(applicationContext);
    try {
        //registry branch record
        Long branchId = DefaultResourceManager.get().branchRegister(BranchType.TCC, actionName, null, xid,
            applicationContextStr, null);
        return String.valueOf(branchId);
    } catch (Throwable t) {
        String msg = "TCC branch Register error, xid:" + xid;
        LOGGER.error(msg, t);
        throw new FrameworkException(t, msg);
    }
}

果不其然, 这个 TccActionInterceptor 下层调用的 ActionInterceptorHandler 进行了分支注册, 而且我们可以看到, 在进行分支注册时, 它找到所有需要保存的参数, 存在 context 中一起发给 TC。

这里还有两个问题: Seata 怎么分辨出 RPC 类型的 ? Seata 在哪向 TC 进行了 TCC 接口的注册?

我们先看第一个问题, Seata 如何识别出 RPC 的呢? 换句话说, 目前 TCC 模式支持的 Dubbo, Sofa, JVM, 这三种模式又是如何识别出来的呢? 看过我 Dubbo系列文章 的同学, 可能就知道 RPC 的实现过程实际上就是代理本地接口, 加入 RPC 框架, 那么我们在进行实际的调用时, 判断一下调用的类是不是 Dubbo 的代理类名, 就知道啦。同样的, Sofa 也是这样实现的, 而 JVM 呢, 则是需要开发人员显式地加注解 LocalTCC, 然后 Seata 扫描该注解。

// DubboRemotingParser: dubbo 和 sofa 都类似如下, 判断代理类的名字
@Override
public boolean isReference(Object bean, String beanName) throws FrameworkException {
    Class<?> c = bean.getClass();
    return "com.alibaba.dubbo.config.spring.ReferenceBean".equals(c.getName())
        || "org.apache.dubbo.config.spring.ReferenceBean".equals(c.getName());
}

@Override
public boolean isService(Object bean, String beanName) throws FrameworkException {
    Class<?> c = bean.getClass();
    return "com.alibaba.dubbo.config.spring.ServiceBean".equals(c.getName())
        || "org.apache.dubbo.config.spring.ServiceBean".equals(c.getName());
}
// LocalTCCRemotingParser: JVM 直接判断注解
@Override
public boolean isReference(Object bean, String beanName) {
    Class<?> classType = bean.getClass();
    Set<Class<?>> interfaceClasses = ReflectionUtil.getInterfaces(classType);
    for (Class<?> interClass : interfaceClasses) {
        if (interClass.isAnnotationPresent(LocalTCC.class)) {
            return true;
        }
    }
    return false;
}
@Override
public boolean isService(Object bean, String beanName) {
    Class<?> classType = bean.getClass();
    Set<Class<?>> interfaceClasses = ReflectionUtil.getInterfaces(classType);
    for (Class<?> interClass : interfaceClasses) {
        if (interClass.isAnnotationPresent(LocalTCC.class)) {
            return true;
        }
    }
    return false;
}

通过这个判断过程, 我们拿到了许多信息, 我们就知道了项目中包含的所有 TCC 接口, 也知道了本节点是该 RPC 的提供方(isService)还是消费方(isReference), 在进行匹配的过程中, 如果自己是该 TCC 接口的提供方的话, 就会立即去 TC 注册, 这样 TC 就知道该 TCC 接口提交和回滚找谁了。注册的过程如下, 重点是第一行的判断逻辑, isService 时才会注册:

// DefaultRemotingParser
// 是 RPC 的提供方才会去 TC 注册
if (isService(bean, beanName)) {
    try {
        //service bean, registry resource
        Object targetBean = remotingBeanDesc.getTargetBean();
        for (Method m : methods) {
            TwoPhaseBusinessAction twoPhaseBusinessAction = m.getAnnotation(TwoPhaseBusinessAction.class);
            if (twoPhaseBusinessAction != null) {
                //
                TCCResource tccResource = new TCCResource();
                tccResource.setActionName(twoPhaseBusinessAction.name());
                tccResource.setTargetBean(targetBean);
                tccResource.setPrepareMethod(m);
                tccResource.setCommitMethodName(twoPhaseBusinessAction.commitMethod());
                tccResource.setCommitMethod(ReflectionUtil
                    .getMethod(interfaceClass, twoPhaseBusinessAction.commitMethod(),
                        new Class[] {BusinessActionContext.class}));
                tccResource.setRollbackMethodName(twoPhaseBusinessAction.rollbackMethod());
                tccResource.setRollbackMethod(ReflectionUtil
                    .getMethod(interfaceClass, twoPhaseBusinessAction.rollbackMethod(),
                        new Class[] {BusinessActionContext.class}));
                //registry tcc resource
                DefaultResourceManager.get().registerResource(tccResource);
            }
        }
    } catch (Throwable t) {
        throw new FrameworkException(t, "parser remting service error");
    }
}

到此为止, Seata 怎么发现 TCC 接口, 什么角色去 TC 注册 TCC 接口, 什么角色进行 TCC 分支的注册, 想必大家全都明白了, 当全局事务发生提交或回滚时, TC 可以根据前面的注册内容, 找到所有提供该 TCC 接口服务的节点, 然后向它们发送提交或者回滚请求。

// TCCResourceManager
@Override
public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
                                 String applicationData) throws TransactionException {
    // RM 从本地找到该 resourceId 对应的 TCC 接口数据
    TCCResource tccResource = (TCCResource)tccResourceCache.get(resourceId);
    if (tccResource == null) {
        throw new ShouldNeverHappenException("TCC resource is not exist, resourceId:" + resourceId);
    }
    // 找到提交函数
    Object targetTCCBean = tccResource.getTargetBean();
    Method commitMethod = tccResource.getCommitMethod();
    if (targetTCCBean == null || commitMethod == null) {
        throw new ShouldNeverHappenException("TCC resource is not available, resourceId:" + resourceId);
    }
    try {
        boolean result = false;
        //BusinessActionContext
        // 根据 TC 发来的数据, 构建事务 Context, 其中用到的 applicationData 就是调用 prepare 时使用的参数
        BusinessActionContext businessActionContext = getBusinessActionContext(xid, branchId, resourceId,
            applicationData);
        // 执行提交函数
        Object ret = commitMethod.invoke(targetTCCBean, businessActionContext);
        LOGGER.info(
            "TCC resource commit result :" + ret + ", xid:" + xid + ", branchId:" + branchId + ", resourceId:"
                + resourceId);
        if (ret != null) {
            if (ret instanceof TwoPhaseResult) {
                result = ((TwoPhaseResult)ret).isSuccess();
            } else {
                result = (boolean)ret;
            }
        }
        // 上报结果
        return result ? BranchStatus.PhaseTwo_Committed : BranchStatus.PhaseTwo_CommitFailed_Retryable;
    } catch (Throwable t) {
        String msg = String.format("commit TCC resource error, resourceId: %s, xid: %s.", resourceId, xid);
        LOGGER.error(msg, t);
        throw new FrameworkException(t, msg);
    }
}

我们就简单给他家看看分支提交时 RM 的处理过程, TCC 的 RM 收到提交命令后, 从内存中找到该资源对应的 commit 函数, 把事务编号, 资源编号, 还有 TCC try 过程中用到的 applicationData 都塞到这个 BusinessActionContext 中, 最后调用 TCC commit 函数,然后它还会将执行结果上报给 TC。

目前 TCC 模式的功能到这就都走完了, 在前面的理论环节我们说过, TCC 模式要做到幂等, 防倒挂, 这些虽然还没有实装在 Seata 中, 但是今后肯定会有的, 它的实现应该会很类似于 AT 模式, 在 DB 中存储分支事务状态, 随 TCC 接口的本地事务一同提交。

文章说明

更多有价值的文章均收录于贝贝猫的文章目录

stun

版权声明: 本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!

创作声明: 本文基于下列所有参考内容进行创作,其中可能涉及复制、修改或者转换,图片均来自网络,如有侵权请联系我,我会第一时间进行删除。

参考内容

[1] fescar锁设计和隔离级别的理解
[2] 分布式事务中间件 Fescar - RM 模块源码解读
[3] Fescar分布式事务实现原理解析探秘
[4] Seata TCC 分布式事务源码分析
[5] 深度剖析一站式分布式事务方案 Seata-Server
[6] 分布式事务 Seata Saga 模式首秀以及三种模式详解
[7] 蚂蚁金服大规模分布式事务实践和开源详解
[8] 分布式事务 Seata TCC 模式深度解析
[9] Fescar (Seata)0.4.0 中文文档教程
[10] Seata Github Wiki
[11] 深度剖析一站式分布式事务方案Seata(Fescar)-Server

相关文章
|
6月前
|
自然语言处理 监控 Dubbo
Seata常见问题之使用tcc模式配置yml如何解决
Seata 是一个开源的分布式事务解决方案,旨在提供高效且简单的事务协调机制,以解决微服务架构下跨服务调用(分布式场景)的一致性问题。以下是Seata常见问题的一个合集
204 4
|
6月前
|
开发者
seata事务问题之不回滚客户端如何解决
Seata是一款开源的分布式事务解决方案,旨在提供高效且无缝的分布式事务服务;在集成和使用Seata过程中,开发者可能会遇到不同的异常问题,本合集针对Seata常见异常进行系统整理,为开发者提供详细的问题分析和解决方案,助力高效解决分布式事务中的难题。
423 11
|
1月前
|
消息中间件 Java 数据库
新版 Seata 集成 RocketMQ事务消息,越来越 牛X 了!阿里的 Seata , yyds !
这里 借助 Seata 集成 RocketMQ 事务消息的 新功能,介绍一下一个新遇到的面试题:如果如何实现 **强弱一致性 结合**的分布式事务?
新版 Seata 集成 RocketMQ事务消息,越来越 牛X 了!阿里的 Seata , yyds !
|
6月前
|
监控 数据库
在Seata中一张表使用了联合主键,在事务回滚时报异常,改为单个主键,就没有这个异常,如何解决?
在Seata中一张表使用了联合主键,在事务回滚时报异常,改为单个主键,就没有这个异常,如何解决?
|
5月前
|
Apache 开发者
Apache Seata 如何解决 TCC 模式的幂等、悬挂和空回滚问题
【6月更文挑战第8天】Apache Seata 是一款分布式事务框架,解决TCC模式下的幂等、悬挂和空回滚问题。通过记录事务状态处理幂等,设置超时机制避免悬挂,明确标记Try操作成功来处理空回滚。Seata 提供丰富配置和管理功能,确保分布式事务的可靠性和效率,支持复杂事务处理场景,为企业业务发展提供支持。
219 7
|
6月前
|
Dubbo 关系型数据库 MySQL
Seata常见问题之serviceA方法无法注册分支事务到Seata如何解决
Seata 是一个开源的分布式事务解决方案,旨在提供高效且简单的事务协调机制,以解决微服务架构下跨服务调用(分布式场景)的一致性问题。以下是Seata常见问题的一个合集
|
6月前
|
Java 数据库连接 API
Seata异常捕获问题之回滚事务如何解决
Seata是一款开源的分布式事务解决方案,旨在提供高效且无缝的分布式事务服务;在集成和使用Seata过程中,开发者可能会遇到不同的异常问题,本合集针对Seata常见异常进行系统整理,为开发者提供详细的问题分析和解决方案,助力高效解决分布式事务中的难题。
572 11
|
6月前
|
数据库 开发者
Seata的 TCC 模式
Seata的 TCC 模式
|
6月前
|
Dubbo Oracle 关系型数据库
Seata常见问题之TC使用mysql8如何解决
Seata 是一个开源的分布式事务解决方案,旨在提供高效且简单的事务协调机制,以解决微服务架构下跨服务调用(分布式场景)的一致性问题。以下是Seata常见问题的一个合集
165 0
|
6月前
|
SQL 监控 Java
Seata常见问题之报找不到全局事务可能已经完成如何解决
Seata 是一个开源的分布式事务解决方案,旨在提供高效且简单的事务协调机制,以解决微服务架构下跨服务调用(分布式场景)的一致性问题。以下是Seata常见问题的一个合集
1102 0
下一篇
无影云桌面