引言
前面,我们已经介绍了 Seata 的整体设计思想,接下来我们深入到其实现细节中,本文介绍 Seata 中 TCC 模式分支事务的实现,其他 Seata 相关文章均收录于 <Seata系列文章>中。
TCC 模式
先简单介绍一个 Seata 中 TCC 的使用方式, 然后我们在顺着它的使用方式, 一点点深入其实现方案。
在 Seata TCC 模式中, 每个 RM 都需要将 TCC 接口以 RPC 的形式暴露出去, 同时向 TC 中注册, 告诉 TC 自己是某一 TCC 接口的提供方, 这样如果发生提交或者回滚时, TC 就知道该去找谁了。然后 TM 在进行 TCC 调用之前先去 TC 注册分支事务,告诉 TC 这个分支事务用的是哪个 TCC 接口,然后才通过 RPC 调用 TCC 接口的 try 方法,当发生全局事务提交或者回滚时, TC 会直接通知该 TCC 接口的提供方进行分支处理。
public interface TccActionTwo {
/**
* Prepare boolean.
*
* @param actionContext the action context
* @param b the b
* @param list the list
* @return the boolean
*/
@TwoPhaseBusinessAction(name = "DubboTccActionTwo" , commitMethod = "commit", rollbackMethod = "rollback")
public boolean prepare(BusinessActionContext actionContext, @BusinessActionContextParameter(paramName = "b") String b,
@BusinessActionContextParameter(paramName = "c",index = 1) List list);
/**
* Commit boolean.
*
* @param actionContext the action context
* @return the boolean
*/
public boolean commit(BusinessActionContext actionContext);
/**
* Rollback boolean.
*
* @param actionContext the action context
* @return the boolean
*/
public boolean rollback(BusinessActionContext actionContext);
}
上面的代码就是一个 TCC 接口的样例, 通过 TwoPhaseBusinessAction 注解,接口可以告诉 Seata 哪个是 prepare 函数, 该注解中标识了该 TCC 的资源名, 回滚和提交函数名, 在 prepare 函数的参数中, 我们可以看到 BusinessActionContextParameter 注解, 它标识了哪些参数在回滚或提交时也需要用到, 因为 TCC 模式中 RM 是不在本地存储参数信息的, 这些数据都存在 TC 中, 所以 TM 在进行 RPC 调用前, 会根据该注解将回滚或提交时需要用到的参数存储在 TC 中。如果发生回滚或者提交, TC 要把调用 prepare 时使用到的的这些参数存在 Context 里, 发送给 RM, 这样 RM 就可以用到这些参数。
知道了 TCC 的工作流程后, 就要深入 TCC 模式的实现了, 我们就得回到最初 AT 模式 和 TCC 模式分道扬镳的地方 GlobalTransactionScanner, 在这里进行了 TCC 的判断:
if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
//TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC
interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
}
看来, 这个 TCCBeanParserUtils.isTccAutoProxy 是判断 TCC 模式的关键, 我们不妨看一看它的内容:
public static boolean isTccAutoProxy(Object bean, String beanName, ApplicationContext applicationContext) {
RemotingDesc remotingDesc = null;
// 在这个匹配过程中, 就完成了 RPC 提供方的注册 parserRemotingServiceInfo
boolean isRemotingBean = parserRemotingServiceInfo(bean, beanName);
//is remoting bean
if (isRemotingBean) {
remotingDesc = DefaultRemotingParser.get().getRemotingBeanDesc(beanName);
if (remotingDesc != null && remotingDesc.getProtocol() == Protocols.IN_JVM) {
//LocalTCC
return isTccProxyTargetBean(remotingDesc);
} else {
// sofa:reference / dubbo:reference, factory bean
return false;
}
} else {
//get RemotingBean description
remotingDesc = DefaultRemotingParser.get().getRemotingBeanDesc(beanName);
if (remotingDesc == null) {
//check FactoryBean
if (isRemotingFactoryBean(bean, beanName, applicationContext)) {
remotingDesc = DefaultRemotingParser.get().getRemotingBeanDesc(beanName);
return isTccProxyTargetBean(remotingDesc);
} else {
return false;
}
} else {
return isTccProxyTargetBean(remotingDesc);
}
}
}
// 上述的函数只是判断是不是 RPC 接口, 然后下面的才是检查注解的过程
protected static boolean isTccProxyTargetBean(RemotingDesc remotingDesc) {
if (remotingDesc == null) {
return false;
}
//check if it is TCC bean
boolean isTccClazz = false;
Class<?> tccInterfaceClazz = remotingDesc.getInterfaceClass();
Method[] methods = tccInterfaceClazz.getMethods();
TwoPhaseBusinessAction twoPhaseBusinessAction = null;
for (Method method : methods) {
twoPhaseBusinessAction = method.getAnnotation(TwoPhaseBusinessAction.class);
if (twoPhaseBusinessAction != null) {
isTccClazz = true;
break;
}
}
if (!isTccClazz) {
return false;
}
short protocols = remotingDesc.getProtocol();
//LocalTCC
if (Protocols.IN_JVM == protocols) {
//in jvm TCC bean , AOP
return true;
}
// sofa:reference / dubbo:reference, AOP
return remotingDesc.isReference();
}
我们可以看到, isTccAutoProxy 只判断它是不是 RPC 调用, 而 isTccProxyTargetBean 进行注解的检查, 在 isTccProxyTargetBean 的最后几行, 决定了是否要加入 TCC 的拦截器: 只有当接口协议为 JVM 或者是 RPC 的消费者(isReference)。这时候,我们可以大胆的猜测,TccActionInterceptor 这个拦截器肯定就是 TM 注册分支事务的拦截器, 因为只有 TM 才负责 TCC 模式的分支注册:
// TccActionInterceptor#invoke -> ActionInterceptorHandler#proceed
public Map<String, Object> proceed(Method method, Object[] arguments, String xid, TwoPhaseBusinessAction businessAction,
Callback<Object> targetCallback) throws Throwable {
Map<String, Object> ret = new HashMap<String, Object>(16);
//TCC name
String actionName = businessAction.name();
BusinessActionContext actionContext = new BusinessActionContext();
actionContext.setXid(xid);
//set action anme
actionContext.setActionName(actionName);
//TODO services
//Creating Branch Record
String branchId = doTccActionLogStore(method, arguments, businessAction, actionContext);
actionContext.setBranchId(branchId);
//set the parameter whose type is BusinessActionContext
Class<?>[] types = method.getParameterTypes();
int argIndex = 0;
for (Class<?> cls : types) {
if (cls.getName().equals(BusinessActionContext.class.getName())) {
arguments[argIndex] = actionContext;
break;
}
argIndex++;
}
//the final parameters of the try method
ret.put(Constants.TCC_METHOD_ARGUMENTS, arguments);
//the final result
ret.put(Constants.TCC_METHOD_RESULT, targetCallback.execute());
return ret;
}
protected String doTccActionLogStore(Method method, Object[] arguments, TwoPhaseBusinessAction businessAction,
BusinessActionContext actionContext) {
String actionName = actionContext.getActionName();
String xid = actionContext.getXid();
// 找到所有需要保存的参数, 存在 context 中一起发给 TC
Map<String, Object> context = fetchActionRequestContext(method, arguments);
context.put(Constants.ACTION_START_TIME, System.currentTimeMillis());
//init business context
initBusinessContext(context, method, businessAction);
//Init running environment context
initFrameworkContext(context);
actionContext.setActionContext(context);
//init applicationData
Map<String, Object> applicationContext = new HashMap<String, Object>(4);
applicationContext.put(Constants.TCC_ACTION_CONTEXT, context);
String applicationContextStr = JSON.toJSONString(applicationContext);
try {
//registry branch record
Long branchId = DefaultResourceManager.get().branchRegister(BranchType.TCC, actionName, null, xid,
applicationContextStr, null);
return String.valueOf(branchId);
} catch (Throwable t) {
String msg = "TCC branch Register error, xid:" + xid;
LOGGER.error(msg, t);
throw new FrameworkException(t, msg);
}
}
果不其然, 这个 TccActionInterceptor 下层调用的 ActionInterceptorHandler 进行了分支注册, 而且我们可以看到, 在进行分支注册时, 它找到所有需要保存的参数, 存在 context 中一起发给 TC。
这里还有两个问题: Seata 怎么分辨出 RPC 类型的 ? Seata 在哪向 TC 进行了 TCC 接口的注册?
我们先看第一个问题, Seata 如何识别出 RPC 的呢? 换句话说, 目前 TCC 模式支持的 Dubbo, Sofa, JVM, 这三种模式又是如何识别出来的呢? 看过我 Dubbo系列文章 的同学, 可能就知道 RPC 的实现过程实际上就是代理本地接口, 加入 RPC 框架, 那么我们在进行实际的调用时, 判断一下调用的类是不是 Dubbo 的代理类名, 就知道啦。同样的, Sofa 也是这样实现的, 而 JVM 呢, 则是需要开发人员显式地加注解 LocalTCC
, 然后 Seata 扫描该注解。
// DubboRemotingParser: dubbo 和 sofa 都类似如下, 判断代理类的名字
@Override
public boolean isReference(Object bean, String beanName) throws FrameworkException {
Class<?> c = bean.getClass();
return "com.alibaba.dubbo.config.spring.ReferenceBean".equals(c.getName())
|| "org.apache.dubbo.config.spring.ReferenceBean".equals(c.getName());
}
@Override
public boolean isService(Object bean, String beanName) throws FrameworkException {
Class<?> c = bean.getClass();
return "com.alibaba.dubbo.config.spring.ServiceBean".equals(c.getName())
|| "org.apache.dubbo.config.spring.ServiceBean".equals(c.getName());
}
// LocalTCCRemotingParser: JVM 直接判断注解
@Override
public boolean isReference(Object bean, String beanName) {
Class<?> classType = bean.getClass();
Set<Class<?>> interfaceClasses = ReflectionUtil.getInterfaces(classType);
for (Class<?> interClass : interfaceClasses) {
if (interClass.isAnnotationPresent(LocalTCC.class)) {
return true;
}
}
return false;
}
@Override
public boolean isService(Object bean, String beanName) {
Class<?> classType = bean.getClass();
Set<Class<?>> interfaceClasses = ReflectionUtil.getInterfaces(classType);
for (Class<?> interClass : interfaceClasses) {
if (interClass.isAnnotationPresent(LocalTCC.class)) {
return true;
}
}
return false;
}
通过这个判断过程, 我们拿到了许多信息, 我们就知道了项目中包含的所有 TCC 接口, 也知道了本节点是该 RPC 的提供方(isService)还是消费方(isReference), 在进行匹配的过程中, 如果自己是该 TCC 接口的提供方的话, 就会立即去 TC 注册, 这样 TC 就知道该 TCC 接口提交和回滚找谁了。注册的过程如下, 重点是第一行的判断逻辑, isService 时才会注册:
// DefaultRemotingParser
// 是 RPC 的提供方才会去 TC 注册
if (isService(bean, beanName)) {
try {
//service bean, registry resource
Object targetBean = remotingBeanDesc.getTargetBean();
for (Method m : methods) {
TwoPhaseBusinessAction twoPhaseBusinessAction = m.getAnnotation(TwoPhaseBusinessAction.class);
if (twoPhaseBusinessAction != null) {
//
TCCResource tccResource = new TCCResource();
tccResource.setActionName(twoPhaseBusinessAction.name());
tccResource.setTargetBean(targetBean);
tccResource.setPrepareMethod(m);
tccResource.setCommitMethodName(twoPhaseBusinessAction.commitMethod());
tccResource.setCommitMethod(ReflectionUtil
.getMethod(interfaceClass, twoPhaseBusinessAction.commitMethod(),
new Class[] {BusinessActionContext.class}));
tccResource.setRollbackMethodName(twoPhaseBusinessAction.rollbackMethod());
tccResource.setRollbackMethod(ReflectionUtil
.getMethod(interfaceClass, twoPhaseBusinessAction.rollbackMethod(),
new Class[] {BusinessActionContext.class}));
//registry tcc resource
DefaultResourceManager.get().registerResource(tccResource);
}
}
} catch (Throwable t) {
throw new FrameworkException(t, "parser remting service error");
}
}
到此为止, Seata 怎么发现 TCC 接口, 什么角色去 TC 注册 TCC 接口, 什么角色进行 TCC 分支的注册, 想必大家全都明白了, 当全局事务发生提交或回滚时, TC 可以根据前面的注册内容, 找到所有提供该 TCC 接口服务的节点, 然后向它们发送提交或者回滚请求。
// TCCResourceManager
@Override
public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
String applicationData) throws TransactionException {
// RM 从本地找到该 resourceId 对应的 TCC 接口数据
TCCResource tccResource = (TCCResource)tccResourceCache.get(resourceId);
if (tccResource == null) {
throw new ShouldNeverHappenException("TCC resource is not exist, resourceId:" + resourceId);
}
// 找到提交函数
Object targetTCCBean = tccResource.getTargetBean();
Method commitMethod = tccResource.getCommitMethod();
if (targetTCCBean == null || commitMethod == null) {
throw new ShouldNeverHappenException("TCC resource is not available, resourceId:" + resourceId);
}
try {
boolean result = false;
//BusinessActionContext
// 根据 TC 发来的数据, 构建事务 Context, 其中用到的 applicationData 就是调用 prepare 时使用的参数
BusinessActionContext businessActionContext = getBusinessActionContext(xid, branchId, resourceId,
applicationData);
// 执行提交函数
Object ret = commitMethod.invoke(targetTCCBean, businessActionContext);
LOGGER.info(
"TCC resource commit result :" + ret + ", xid:" + xid + ", branchId:" + branchId + ", resourceId:"
+ resourceId);
if (ret != null) {
if (ret instanceof TwoPhaseResult) {
result = ((TwoPhaseResult)ret).isSuccess();
} else {
result = (boolean)ret;
}
}
// 上报结果
return result ? BranchStatus.PhaseTwo_Committed : BranchStatus.PhaseTwo_CommitFailed_Retryable;
} catch (Throwable t) {
String msg = String.format("commit TCC resource error, resourceId: %s, xid: %s.", resourceId, xid);
LOGGER.error(msg, t);
throw new FrameworkException(t, msg);
}
}
我们就简单给他家看看分支提交时 RM 的处理过程, TCC 的 RM 收到提交命令后, 从内存中找到该资源对应的 commit 函数, 把事务编号, 资源编号, 还有 TCC try 过程中用到的 applicationData 都塞到这个 BusinessActionContext 中, 最后调用 TCC commit 函数,然后它还会将执行结果上报给 TC。
目前 TCC 模式的功能到这就都走完了, 在前面的理论环节我们说过, TCC 模式要做到幂等, 防倒挂, 这些虽然还没有实装在 Seata 中, 但是今后肯定会有的, 它的实现应该会很类似于 AT 模式, 在 DB 中存储分支事务状态, 随 TCC 接口的本地事务一同提交。
文章说明
更多有价值的文章均收录于贝贝猫的文章目录
版权声明: 本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!
创作声明: 本文基于下列所有参考内容进行创作,其中可能涉及复制、修改或者转换,图片均来自网络,如有侵权请联系我,我会第一时间进行删除。
参考内容
[1] fescar锁设计和隔离级别的理解
[2] 分布式事务中间件 Fescar - RM 模块源码解读
[3] Fescar分布式事务实现原理解析探秘
[4] Seata TCC 分布式事务源码分析
[5] 深度剖析一站式分布式事务方案 Seata-Server
[6] 分布式事务 Seata Saga 模式首秀以及三种模式详解
[7] 蚂蚁金服大规模分布式事务实践和开源详解
[8] 分布式事务 Seata TCC 模式深度解析
[9] Fescar (Seata)0.4.0 中文文档教程
[10] Seata Github Wiki
[11] 深度剖析一站式分布式事务方案Seata(Fescar)-Server