Seata Transaction Manager

简介: 前面,我们已经介绍了 Seata 的整体设计思想,接下来我们深入到其实现细节中,本文介绍 Seata 中 Transaction Manager 的实现。

引言

前面,我们已经介绍了 Seata 的整体设计思想,接下来我们深入到其实现细节中,本文介绍 Seata 中 Transaction Manager 的实现,其他 Seata 相关文章均收录于 <Seata系列文章>中。

TM

TM 和 TC 一样是一个共通的模块, 无论是 AT 模式还是 TCC 模式都需要使用 TM 模块。

首先 TM 在启动的时候会去连接 TC Server, 然后然后通过该 TM Client 与 TC 模块进行通讯。在 TM 模块中最核心的接口就是 GlobalTransaction, 里面包含了全局事务的创建, 提交, 回滚过程, 其实质就是向 TC 发送 RPC 请求。

public class DefaultGlobalTransaction implements GlobalTransaction {
    // 只保留核心内容...
    @Override
    public void begin(int timeout, String name) throws TransactionException {
        if (role != GlobalTransactionRole.Launcher) {
            check();
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Ignore Begin(): just involved in global transaction [" + xid + "]");
            }
            return;
        }
        if (xid != null) {
            throw new IllegalStateException();
        }
        if (RootContext.getXID() != null) {
            throw new IllegalStateException();
        }
        xid = transactionManager.begin(null, null, name, timeout);
        status = GlobalStatus.Begin;
        RootContext.bind(xid);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Begin new global transaction [" + xid + "]");
        }

    }

    @Override
    public void commit() throws TransactionException {
        if (role == GlobalTransactionRole.Participant) {
            // Participant has no responsibility of committing
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Ignore Commit(): just involved in global transaction [" + xid + "]");
            }
            return;
        }
        if (xid == null) {
            throw new IllegalStateException();
        }

        status = transactionManager.commit(xid);
        if (RootContext.getXID() != null) {
            if (xid.equals(RootContext.getXID())) {
                RootContext.unbind();
            }
        }
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("[" + xid + "] commit status:" + status);
        }

    }

    @Override
    public void rollback() throws TransactionException {
        if (role == GlobalTransactionRole.Participant) {
            // Participant has no responsibility of committing
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Ignore Rollback(): just involved in global transaction [" + xid + "]");
            }
            return;
        }
        if (xid == null) {
            throw new IllegalStateException();
        }

        status = transactionManager.rollback(xid);
        if (RootContext.getXID() != null) {
            if (xid.equals(RootContext.getXID())) {
                RootContext.unbind();
            }
        }
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("[" + xid + "] rollback status:" + status);
        }
    }
}

我们可以看到, 这个接口中实际上没做什么实际的事, 它调用 transactionManager 发送消息, 然后将涉及到的全局事务 XID 保存起来, 我们看看它把数据存在什么地方了:

public class ThreadLocalContextCore implements ContextCore {

    private ThreadLocal<Map<String, String>> threadLocal = new ThreadLocal<Map<String, String>>() {
        @Override
        protected Map<String, String> initialValue() {
            return new HashMap<String, String>();
        }

    };

    @Override
    public String put(String key, String value) {
        return threadLocal.get().put(key, value);
    }

    @Override
    public String get(String key) {
        return threadLocal.get().get(key);
    }

    @Override
    public String remove(String key) {
        return threadLocal.get().remove(key);
    }
}

看上去, 它是将 XID 存在了 ThreadLocal 中, 这样在整个 RPC 调用的上下文中都能获取到 XID。接下来, 我们看看下层的 transactionManager 都做了什么:

public class DefaultTransactionManager implements TransactionManager {
    // All PRCs here
    @Override
    public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
        throws TransactionException {
        GlobalBeginRequest request = new GlobalBeginRequest();
        request.setTransactionName(name);
        request.setTimeout(timeout);
        GlobalBeginResponse response = (GlobalBeginResponse)syncCall(request);
        if (response.getResultCode() == ResultCode.Failed) {
            throw new TransactionException(TransactionExceptionCode.BeginFailed, response.getMsg());
        }
        return response.getXid();
    }

    @Override
    public GlobalStatus commit(String xid) throws TransactionException {
        GlobalCommitRequest globalCommit = new GlobalCommitRequest();
        globalCommit.setXid(xid);
        GlobalCommitResponse response = (GlobalCommitResponse)syncCall(globalCommit);
        return response.getGlobalStatus();
    }

    @Override
    public GlobalStatus rollback(String xid) throws TransactionException {
        GlobalRollbackRequest globalRollback = new GlobalRollbackRequest();
        globalRollback.setXid(xid);
        GlobalRollbackResponse response = (GlobalRollbackResponse)syncCall(globalRollback);
        return response.getGlobalStatus();
    }
}

可以看到, TransactionManager 才是真正做实事的, 消息的发送工作都在这里完成。好了, 至此我们知道了哪个接口管理着全局事务的记录, 哪个接口真正进行 RPC 调用, 那么谁才是这些接口的真正调用者呢? Seata 使用了模板方法模式来进行这部分工作:

public class TransactionalTemplate {

    /**
     * Execute object.
     *
     * @param business the business
     * @return the object
     * @throws TransactionalExecutor.ExecutionException the execution exception
     */
    public Object execute(TransactionalExecutor business) throws Throwable {
        // 1. get or create a transaction
        GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();

        // 1.1 get transactionInfo
        TransactionInfo txInfo = business.getTransactionInfo();
        if (txInfo == null) {
            throw new ShouldNeverHappenException("transactionInfo does not exist");
        }
        try {

            // 2. begin transaction
            beginTransaction(txInfo, tx);

            Object rs = null;
            try {

                // Do Your Business
                rs = business.execute();

            } catch (Throwable ex) {

                // 3.the needed business exception to rollback.
                completeTransactionAfterThrowing(txInfo,tx,ex);
                throw ex;
            }

            // 4. everything is fine, commit.
            commitTransaction(tx);

            return rs;
        } finally {
            //5. clear
            triggerAfterCompletion();
            cleanUp();
        }
    }
}

该模板的工作流程如下:

  1. 看看当前是不是已经在一个分布式事务中了, 如果是, 则复用现存的全局事务, 否则创建新的

    • 什么时候会出现已经存在全局事务的情况呢? 假设 A 调用了 B, A 创建了全局事务 GT1, B 碰巧也执行了上述的模板, 这时候 B 就不会创建新的全局事务, 而是使用 GT1, 这实际上是前面提到的事物的传播
  2. 如果是自己创建的全局事务, 则发 RPC 开始事务, 如果不是自己创建的则什么都不干
  3. 执行真正的业务逻辑
  4. 如果发生了异常, 如果自己创建全局事务, 才负责回滚, 否则就只管异常外抛
  5. 如果没发生异常, 如果自己创建全局事务, 才负责提交, 否则就什么都不做
  6. 清理工作

我们看到, 该模板实际上是业务服务的完整执行流程, 那我们每次都要自己在代码中通过该模板接口执行自己的业务代码吗? 当然不用, 实际上 Seata 基于 Spring 切面, 已经帮我们做了这些事, 我们只需要使用 GlobalTransactional 注解就够了, 接下来我们看看这部分内容:

public class GlobalTransactionalInterceptor implements MethodInterceptor {
    // 只保留核心代码
    @Override
    public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
        Class<?> targetClass = (methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null);
        Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
        final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);

        final GlobalTransactional globalTransactionalAnnotation = getAnnotation(method, GlobalTransactional.class);
        final GlobalLock globalLockAnnotation = getAnnotation(method, GlobalLock.class);
        if (globalTransactionalAnnotation != null) {
            return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
        } else if (globalLockAnnotation != null) {
            return handleGlobalLock(methodInvocation);
        } else {
            return methodInvocation.proceed();
        }
    }

    private Object handleGlobalLock(final MethodInvocation methodInvocation) throws Exception {
        return globalLockTemplate.execute(() -> {
            try {
                return methodInvocation.proceed();
            } catch (Throwable e) {
                if (e instanceof Exception) {
                    throw (Exception)e;
                } else {
                    throw new RuntimeException(e);
                }
            }
        });
    }

    private Object handleGlobalTransaction(final MethodInvocation methodInvocation,
                                           final GlobalTransactional globalTrxAnno) throws Throwable {
        try {
            return transactionalTemplate.execute(new TransactionalExecutor() {
                @Override
                public Object execute() throws Throwable {
                    return methodInvocation.proceed();
                }

                public String name() {
                    String name = globalTrxAnno.name();
                    if (!StringUtils.isNullOrEmpty(name)) {
                        return name;
                    }
                    return formatMethod(methodInvocation.getMethod());
                }

                @Override
                public TransactionInfo getTransactionInfo() {
                    TransactionInfo transactionInfo = new TransactionInfo();
                    transactionInfo.setTimeOut(globalTrxAnno.timeoutMills());
                    transactionInfo.setName(name());
                    Set<RollbackRule> rollbackRules = new LinkedHashSet<>();
                    for (Class<?> rbRule : globalTrxAnno.rollbackFor()) {
                        rollbackRules.add(new RollbackRule(rbRule));
                    }
                    for (String rbRule : globalTrxAnno.rollbackForClassName()) {
                        rollbackRules.add(new RollbackRule(rbRule));
                    }
                    for (Class<?> rbRule : globalTrxAnno.noRollbackFor()) {
                        rollbackRules.add(new NoRollbackRule(rbRule));
                    }
                    for (String rbRule : globalTrxAnno.noRollbackForClassName()) {
                        rollbackRules.add(new NoRollbackRule(rbRule));
                    }
                    transactionInfo.setRollbackRules(rollbackRules);
                    return transactionInfo;
                }
            });
        } catch (TransactionalExecutor.ExecutionException e) {
            TransactionalExecutor.Code code = e.getCode();
            switch (code) {
                case RollbackDone:
                    throw e.getOriginalException();
                case BeginFailure:
                    failureHandler.onBeginFailure(e.getTransaction(), e.getCause());
                    throw e.getCause();
                case CommitFailure:
                    failureHandler.onCommitFailure(e.getTransaction(), e.getCause());
                    throw e.getCause();
                case RollbackFailure:
                    failureHandler.onRollbackFailure(e.getTransaction(), e.getCause());
                    throw e.getCause();
                default:
                    throw new ShouldNeverHappenException("Unknown TransactionalExecutor.Code: " + code);

            }
        }
    }
}

我们可以看到, 上面的就是全局事务的拦截器, 它扫描 GlobalTransactionalGlobalLock, 如果是 GlobalTransactional 则用 transactionalTemplate 来执行真正的业务代码, 此外还从注解中拿出配置的回滚条件, 超时时间等配置, 给 transactionalTemplate 使用。

大家也看到了这个拦截器中, 还有一个 globalLockTemplate, 实际上这个是在 RM 中使用的, 至于为什么, 使用我们在前面的理论环节已经介绍了, 这里就不赘述了, 而且该模板代码也很简单, 就是加锁->执行->放锁, 并且这里的加锁和放锁只是改变 Context 中的标志位, 真正通过 RPC 进行锁确认的过程, 我们后面会介绍。

至此, 我们知道了 TM 是如何觉察到全局事务需求(GlobalTransactional 注解), 如何创建全局事务(RPC 调用 TC 接口), 如何通过模板方法模式完成对业务的封装(GlobalTransactionTemplate)。那么还有一个问题, 事务信息是怎么传递的呢, TM 怎么将全局事务 XID 传递给 RPC 的提供者的呢? 这部分, 根据 RPC 框架的不同, 需要不同的实现, 但是本质上都是一样的, 拦截 RPC 的调用过程, 在 RPC 请求中加一个隐藏属性来存储 XID, RPC 的提供方从请求中获取到该隐藏属性, 然后存储在事务 Context 的 ThreadLocal 中, 我们以 Dubbo 为例, 看一看它是怎么做的:

@Activate(group = {Constants.PROVIDER, Constants.CONSUMER}, order = 100)
public class TransactionPropagationFilter implements Filter {

    private static final Logger LOGGER = LoggerFactory.getLogger(TransactionPropagationFilter.class);

    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        String xid = RootContext.getXID();
        String rpcXid = RpcContext.getContext().getAttachment(RootContext.KEY_XID);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("xid in RootContext[" + xid + "] xid in RpcContext[" + rpcXid + "]");
        }
        boolean bind = false;
        if (xid != null) {
            // 如果当前存在 xid 说明是 RPC 发起方, 将 xid 存在 RPC context 中, 它会随着 RPC request 一起发送
            RpcContext.getContext().setAttachment(RootContext.KEY_XID, xid);
        } else {
            // 否则他就是 RPC 提供方, 它从 rpc context 中拿到 xid, 并设置到事务 context 中
            if (rpcXid != null) {
                RootContext.bind(rpcXid);
                bind = true;
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("bind[" + rpcXid + "] to RootContext");
                }
            }
        }
        try {
            return invoker.invoke(invocation);

        } finally {
            // 最后清除事务 context 中绑定的 xid, 防止 ThreadLocal 内容污染下次调用
            if (bind) {
                String unbindXid = RootContext.unbind();
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("unbind[" + unbindXid + "] from RootContext");
                }
                if (!rpcXid.equalsIgnoreCase(unbindXid)) {
                    LOGGER.warn("xid in change during RPC from " + rpcXid + " to " + unbindXid);
                    if (unbindXid != null) {
                        RootContext.bind(unbindXid);
                        LOGGER.warn("bind [" + unbindXid + "] back to RootContext");
                    }
                }
            }
        }
    }
}

看过我之前发的 Dubbo系列文章 的同学, 可能还记得 Dubbo 是通过一个 Filter 的概念来表示 AOP 特性的,Filter 的注入基于 Dubbo 的 SPI, 而 Seata 这里所做的就是实现了一个 Dubbo Filter, 把事务 Context 和 RPCContext 中的数据做一下绑定。其他 RPC 框架的支持方案基本类似, 这里就不再赘述了, 值得一提的是, 如果全局事务中的各个微服务是通过 HTTP 请求来互相调用的话, 可以将 XID 存储在 HTTP Header 中, 在官方的提供的 Sample 中有一份样例代码:

// 实现 servlet filter, 这样服务提供者能从 Http request 中获取 xid 并绑定进事务 Context
@Component
public class SeataFilter implements Filter {
    @Override
    public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException {
        HttpServletRequest req = (HttpServletRequest) servletRequest;
        String xid = req.getHeader(RootContext.KEY_XID.toLowerCase());
        boolean isBind = false;
        if (StringUtils.isNotBlank(xid)) {
            RootContext.bind(xid);
            isBind = true;
        }
        try {
            filterChain.doFilter(servletRequest, servletResponse);
        } finally {
            if (isBind) {
                RootContext.unbind();
            }
        }
    }
}
// 实现 ClientHttpRequestInterceptor, 服务请求方讲 xid 塞入 http request header
public class SeataRestTemplateInterceptor implements ClientHttpRequestInterceptor {

    public ClientHttpResponse intercept(HttpRequest httpRequest, byte[] bytes, ClientHttpRequestExecution clientHttpRequestExecution) throws IOException {
        HttpRequestWrapper requestWrapper = new HttpRequestWrapper(httpRequest);
        String xid = RootContext.getXID();
        if (StringUtils.isNotEmpty(xid)) {
            requestWrapper.getHeaders().add(RootContext.KEY_XID, xid);
        }

        return clientHttpRequestExecution.execute(requestWrapper, bytes);
    }
}
// 找到所有 RestTemplate 将 SeataRestTemplateInterceptor 注入
@Configuration
public class SeataRestTemplateAutoConfiguration {
    @Autowired(required = false)
    private Collection<RestTemplate> restTemplates;
    @Autowired
    private SeataRestTemplateInterceptor seataRestTemplateInterceptor;

    @PostConstruct
    public void init() {
        if (this.restTemplates != null) {
            Iterator var1 = this.restTemplates.iterator();
            while (var1.hasNext()) {
                RestTemplate restTemplate = (RestTemplate) var1.next();
                List<ClientHttpRequestInterceptor> interceptors = new ArrayList(restTemplate.getInterceptors());
                interceptors.add(this.seataRestTemplateInterceptor);
                restTemplate.setInterceptors(interceptors);
            }
        }
    }
}

至此, TM 的重要功能就介绍完了, 值得注意的是事务的传播过程不仅仅是 TM -> RM, RM -> RM 也会进行。接下来, 我们看一看 Seata 两种分支事务类型 AT 和 TCC 的实现方案。

文章说明

更多有价值的文章均收录于贝贝猫的文章目录

stun

版权声明: 本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!

创作声明: 本文基于下列所有参考内容进行创作,其中可能涉及复制、修改或者转换,图片均来自网络,如有侵权请联系我,我会第一时间进行删除。

参考内容

[1] fescar锁设计和隔离级别的理解
[2] 分布式事务中间件 Fescar - RM 模块源码解读
[3] Fescar分布式事务实现原理解析探秘
[4] Seata TCC 分布式事务源码分析
[5] 深度剖析一站式分布式事务方案 Seata-Server
[6] 分布式事务 Seata Saga 模式首秀以及三种模式详解
[7] 蚂蚁金服大规模分布式事务实践和开源详解
[8] 分布式事务 Seata TCC 模式深度解析
[9] Fescar (Seata)0.4.0 中文文档教程
[10] Seata Github Wiki
[11] 深度剖析一站式分布式事务方案Seata(Fescar)-Server

相关文章
|
存储 编解码 负载均衡
Seata Transaction Coordinator
前面,我们已经介绍了 Seata 的整体设计思想,接下来我们深入到其实现细节中,本文介绍 Seata 中最核心的模块 Transaction Coordinator 的实现。
|
6月前
|
Nacos 数据库
分布式事务解决方案Seata
分布式事务解决方案Seata
93 1
|
6月前
|
SQL 关系型数据库 数据库
学习分布式事务Seata看这一篇就够了,建议收藏
学习分布式事务Seata看这一篇就够了,建议收藏
|
2月前
|
SQL NoSQL 数据库
SpringCloud基础6——分布式事务,Seata
分布式事务、ACID原则、CAP定理、Seata、Seata的四种分布式方案:XA、AT、TCC、SAGA模式
SpringCloud基础6——分布式事务,Seata
|
6月前
|
关系型数据库 MySQL 数据库
分布式事务Seata
分布式事务Seata
|
6月前
|
存储 关系型数据库 MySQL
基于Seata实现分布式事务
通过以上步骤,你可以使用 Seata 实现分布式事务,确保在微服务架构中的事务一致性。Seata 支持多种语言和框架,能够满足不同业务场景的需求。欢迎关注威哥爱编程,一起学习成长。
148 1
|
3月前
|
关系型数据库 MySQL 数据库
SpringCloud2023中使用Seata解决分布式事务
对于分布式系统而言,需要保证分布式系统中的数据一致性,保证数据在子系统中始终保持一致,避免业务出现问题。分布式系统中对数据的操作要么一起成功,要么一起失败,必须是一个整体性的事务。Seata简化了这个使用过程。
81 2
|
3月前
|
Java 关系型数据库 MySQL
(二十七)舞动手指速写一个Seata-XA框架解决棘手的分布式事务问题
相信大家对于事务问题都不陌生,在之前《MySQL事务篇》中曾详解过MySQL的事务机制,在传统的单库环境下开发,咱们可依赖于MySQL所提供的事务机制,来确保单个事务内的一组操作,要么全部执行成功,要么全部执行失败。
|
3月前
|
Java Nacos Docker
"揭秘!Docker部署Seata遇上Nacos,注册成功却报错?这些坑你不得不防!一网打尽解决秘籍,让你的分布式事务稳如老狗!"
【8月更文挑战第15天】在微服务架构中,Nacos搭配Seata确保数据一致性时,Docker部署Seata后可能出现客户端连接错误,如“can not connect to services-server”。此问题多由网络配置不当、配置文件错误或版本不兼容引起。解决策略包括:调整Docker网络设置确保可达性;检查并修正`file.conf`和`registry.conf`中的Nacos地址和端口;验证Seata与Nacos版本兼容性;修改配置后重启服务;参考官方文档和最佳实践进行配置。通过这些步骤,能有效排除故障,保障服务稳定运行。
192 0
|
6月前
|
Java 数据库连接 API
分布式事物【XA强一致性分布式事务实战、Seata提供XA模式实现分布式事务】(五)-全面详解(学习总结---从入门到深化)
分布式事物【XA强一致性分布式事务实战、Seata提供XA模式实现分布式事务】(五)-全面详解(学习总结---从入门到深化)
121 0

热门文章

最新文章