【微服务39】分布式事务Seata源码解析七:图解Seata事务执行流程之开启全局事务

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
注册配置 MSE Nacos/ZooKeeper,118元/月
云原生网关 MSE Higress,422元/月
简介: 【微服务39】分布式事务Seata源码解析七:图解Seata事务执行流程之开启全局事务

@[TOC]

一、前言

至此,seata系列的内容包括:

  1. can not get cluster name in registry config ‘service.vgroupMapping.xx‘, please make sure registry问题解决
  2. Seata Failed to get available servers: endpoint format should like ip:port 报错原因/解决方案汇总版(看完本文必解决问题)
  3. Seata json decode exception, Cannot construct instance of java.time.LocalDateTime报错原因/解决方案最全汇总版
  4. 【微服务 31】超细的Spring Cloud 整合Seata实现分布式事务(排坑版)
  5. 【微服务 32】Spring Cloud整合Seata、Nacos实现分布式事务案例(巨细排坑版)【云原生】
  6. 【微服务33】分布式事务Seata源码解析一:在IDEA中启动Seata Server
  7. 【微服务34】分布式事务Seata源码解析二:Seata Server启动时都做了什么
  8. 【微服务35】分布式事务Seata源码解析三:从Spring Boot特性来看Seata Client 启动时都做了什么
  9. 【微服务36】分布式事务Seata源码解析四:图解Seata Client 如何与Seata Server建立连接、通信
  10. 【微服务37】分布式事务Seata源码解析五:@GlobalTransactional如何开启全局事务
  11. 【微服务38】分布式事务Seata源码解析六:全局/分支事务分布式ID如何生成?序列号超了怎么办?时钟回拨问题如何处理?

本文正式进入Seata最核心的全局事务执行流程。

二、全局事务执行的入口

【微服务37】分布式事务Seata源码解析五:@GlobalTransactional如何开启全局事务一文,我们知道了所谓的@GlobalTransactional注解开启全局事务,其实就是给类 或 类的方法上标注了@GlobalTransactional注解的类创建动态代理对象。但是动态代理对象是针对类的;

1、拦截器GlobalTransactionalInterceptor

当一个类中有多个方法并且类没有被@GlobalTransactional注解标注,但只有一个方法被@GlobalTransactional注解标注时,这里针对整个类生成了动态代理对象,当调用Bean时,拦截器GlobalTransactionalInterceptor会做进一步处理,保证只有加了@GlobalTransactional注解的方法才会开启全局事务。

GlobalTransactionalInterceptor类的继承图:

在这里插入图片描述

GlobalTransactionalInterceptor实现了MethodInterceptor接口,所以当每次执行添加了 GlobalTransactionalInterceptor拦截器的Bean的方法时,都会进入到GlobalTransactionalInterceptor类覆写MethodInterceptor接口的invoke()方法

@Override
public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
    // method invocation是一次方法调用,一定是针对某个对象的方法调用;
    // methodInvocation.getThis()就是拿到当前方法所属的对象;
    // AopUtils.getTargetClass()获取到当前实例对象所对应的Class
    Class<?> targetClass =
            methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;

    // 通过反射获取到被调用目标Class的method方法
    Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);

    // 如果目标method不为空,并且方法的DeclaringClass不是Object
    if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {
        // 通过BridgeMethodResolver寻找method的桥接方法
        final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
        // 获取目标方法的@GlobalTransactional注解
        final GlobalTransactional globalTransactionalAnnotation =
                getAnnotation(method, targetClass, GlobalTransactional.class);
        // 如果目标方法被@GlobalLock注解标注,获取到@GlobalLock注解内容
        final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);
        // 如果禁用了全局事务 或 开启了事务降级检查并且降级检查次数大于等于降级检查允许的次数
        // 则localDisable等价于全局事务被禁用了
        boolean localDisable = disable || (degradeCheck && degradeNum >= degradeCheckAllowTimes);

        // 如果全局事务没有被禁用
        if (!localDisable) {
            // 全局事务注解不为空 或者 AOP切面全局事务核心配置不为空
            if (globalTransactionalAnnotation != null || this.aspectTransactional != null) {
                AspectTransactional transactional;
                if (globalTransactionalAnnotation != null) {
                    // 构建一个AOP切面全局事务核心配置,配置的数据从全局事务注解中取
                    transactional = new AspectTransactional(globalTransactionalAnnotation.timeoutMills(),
                            globalTransactionalAnnotation.name(), globalTransactionalAnnotation.rollbackFor(),
                            globalTransactionalAnnotation.rollbackForClassName(),
                            globalTransactionalAnnotation.noRollbackFor(),
                            globalTransactionalAnnotation.noRollbackForClassName(),
                            globalTransactionalAnnotation.propagation(),
                            globalTransactionalAnnotation.lockRetryInterval(),
                            globalTransactionalAnnotation.lockRetryTimes());
                } else {
                    transactional = this.aspectTransactional;
                }
                // 真正处理全局事务的入口
                return handleGlobalTransaction(methodInvocation, transactional);
            } else if (globalLockAnnotation != null) {
                // 获取事务锁
                return handleGlobalLock(methodInvocation, globalLockAnnotation);
            }
        }
    }
    // 直接运行目标方法
    return methodInvocation.proceed();
}

invoke()方法解析

1)方法入参--MethodInvocation

invoke()方法的入参为MethodInvocation,MethodInvocation是一次方法调用,并且是针对某个对象的方法调用;

  • methodInvocation.getThis()会拿到当前方法所属的对象;

在这里插入图片描述

在通过methodInvocation.getThis()会拿到当前方法所属的对象时,如果获取到的是null,则使用AopUtils.getTargetClass()获取到当前实例对象所对应的Class(如果被AOP代理,则是代理类,否则是普通类)。

2)判断目标方法是否需要开启全局事务

直接通过反射拿到目标Class的method方法;如果method不为空,并且method所属的类不是Object类;再判断如果method直接或间接被GlobalTransactional注解标注,并且没有禁用全局事务,则再进一步判断全局事务是否被禁用,如果没有被禁用则执行全局事务。

在这里插入图片描述

3)开始处理全局事务

handleGlobalTransaction()方法中真正开始进行全局事务的处理。方法具体内容见<三、全局事务执行>

2、不用开启全局事务的情况

1)全局事务被禁用

在判断完method直接或间接被@GlobalTransactional标注之后,会判断全局事务是否被禁用,如果被禁用则至今运行目标方法。
在这里插入图片描述
禁用全局事务有两种方式:

1> 显示的设置disable属性

  • 配置service.disableGlobalTransaction,默认为false,表示不禁用全局事务;

在这里插入图片描述

2> 开启了事务降级检查,并且降级检查次数大于等于降级检查允许的次数

  • 配置client.tm.degradeCheck,默认为false,表示不开启事务降级检查;
  • 配置client.tm.degradeCheckAllowTimes,只有当开启事务降级检查,这个配置才有意义;

在这里插入图片描述

2)某一个类被标注的注解,但Object超类下的所有方法仍都不会开启全局事务

在GlobalTransactionalInterceptor#invoke()方法中会判断如果目标类的方法是Object类下的方法,则不会执行全局事务;

在这里插入图片描述

3)某一个方法标注了事务注解,其余方法没标注,并且类没有被标注,其余方法都不会开启全局事务

假如我们调用TradeService类中没有标注@GlobalTransactional注解的test()方法(且 TradeService类也没有标注@GlobalTransaction注解);
在这里插入图片描述

invoke()方法中会再次判断 当前调用的bean的方法 或 方法所处的类上是否标注了@GlobalTransactional注解,如果没有标注,则执行运行目标方法;否则才会以全局事务的方式执行方法。

三、全局事务执行

在上面我们聊了GlobalTransactionalInterceptor#handleGlobalTransaction()方法会进行全局事务的处理;

在这里插入图片描述
全局事务的执行会交给全局事务执行业务逻辑的模板TransactionalTemplate,并将目标方法封装到TransactionalExecutor中作为全局事务中执行业务逻辑的回调。

全局事务执行模板TransactionalTemplate

全局事务的整体执行流程体现在TransactionalTemplate#execute()方法中:

在这里插入图片描述

具体代码 和 注释:

public Object execute(TransactionalExecutor business) throws Throwable {
    // 1. Get transactionInfo
    TransactionInfo txInfo = business.getTransactionInfo();
    if (txInfo == null) {
        throw new ShouldNeverHappenException("transactionInfo does not exist");
    }
    // 1.1 Get current transaction, if not null, the tx role is 'GlobalTransactionRole.Participant'.
    // 获取当前事务,根据ThreadLocal,获取当前线程本地变量副本中的xid,进而判断是否存在一个全局事务
    // 刚开始一个全局事务时,肯定是没有全局事务的
    GlobalTransaction tx = GlobalTransactionContext.getCurrent();

    // 1.2 Handle the transaction propagation.
    // 从全局事务的配置里 获取事务传播级别,默认是REQUIRED(如果存在则加入,否则开启一个新的)
    Propagation propagation = txInfo.getPropagation();
    SuspendedResourcesHolder suspendedResourcesHolder = null;
    try {
        // 根据事务的隔离级别做不同的处理
        switch (propagation) {
            case NOT_SUPPORTED:
                // If transaction is existing, suspend it.
                if (existingTransaction(tx)) {
                    // 事务存在,则挂起事务(默认将xid从RootContext中移除)
                    suspendedResourcesHolder = tx.suspend();
                }
                // Execute without transaction and return.
                return business.execute();
            case REQUIRES_NEW:
                // If transaction is existing, suspend it, and then begin new transaction.
                if (existingTransaction(tx)) {
                    suspendedResourcesHolder = tx.suspend();
                    tx = GlobalTransactionContext.createNew();
                }
                // Continue and execute with new transaction
                break;
            case SUPPORTS:
                // If transaction is not existing, execute without transaction.
                if (notExistingTransaction(tx)) {
                    return business.execute();
                }
                // Continue and execute with new transaction
                break;
            case REQUIRED:
                // If current transaction is existing, execute with current transaction,
                // else continue and execute with new transaction.
                break;
            case NEVER:
                // If transaction is existing, throw exception.
                if (existingTransaction(tx)) {
                    throw new TransactionException(
                        String.format("Existing transaction found for transaction marked with propagation 'never', xid = %s"
                                , tx.getXid()));
                } else {
                    // Execute without transaction and return.
                    return business.execute();
                }
            case MANDATORY:
                // If transaction is not existing, throw exception.
                if (notExistingTransaction(tx)) {
                    throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");
                }
                // Continue and execute with current transaction.
                break;
            default:
                throw new TransactionException("Not Supported Propagation:" + propagation);
        }

        // 1.3 If null, create new transaction with role 'GlobalTransactionRole.Launcher'.
        if (tx == null) {
            // 创建全局事务(角色为事务发起者),并关联全局事务管理器
            tx = GlobalTransactionContext.createNew();
        }

        // set current tx config to holder
        GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);

        try {
            // 2. If the tx role is 'GlobalTransactionRole.Launcher', send the request of beginTransaction to TC,
            //    else do nothing. Of course, the hooks will still be triggered.
            // 开启全局事务,如果事务角色是'GlobalTransactionRole.Launcher',发送开始事务请求到seata-server(TC)
            beginTransaction(txInfo, tx);

            Object rs;
            try {
                // Do Your Business
                // 执行业务方法,把全局事务ID通过 MVC拦截器 / dubbo filter传递到后面的分支事务;
                // 每个分支事务都会去运行
                rs = business.execute();
            } catch (Throwable ex) {
                // 3. The needed business exception to rollback.
                // 如果全局事务执行发生了异常,则回滚;
                completeTransactionAfterThrowing(txInfo, tx, ex);
                throw ex;
            }

            // 4. everything is fine, commit.
            // 全局事务和分支事务运行无误,提交事务;
            commitTransaction(tx);

            return rs;
        } finally {
            //5. clear
            // 全局事务完成之后做一些清理工作
            resumeGlobalLockConfig(previousConfig);
            triggerAfterCompletion();
            cleanUp();
        }
    } finally {
        // If the transaction is suspended, resume it.
        if (suspendedResourcesHolder != null) {
            // 如果有挂起的全局事务,则恢复全局事务
            tx.resume(suspendedResourcesHolder);
        }
    }
}

整个全局事务的执行由八步组成:

  1. 从线程本地变量副本中获取到xid,进而判断是否存在一个全局事务;
  2. 根据事务的隔离级别,对已存在的全局事务做不同的处理,包括:挂起事务、新建一个事务.....
    最后如果事务为空,则创建一个新的全局事务(刚开始一个新的全局事务时,会走进这个逻辑)
  3. 开启一个全局事务;
  4. 执行业务方法,把全局事务ID通过 MVC拦截器 / dubbo filter传递到后面的分支事务;
  5. 如果全局事务执行发生了异常,则通知TC回滚全局事务和所有的分支事务;
  6. 如果全局事务和分支事务运行无误,提交事务;
  7. 无论全局事务是否运行成功,都需要清理占用的全局锁资源;
  8. 最后,如果存在被挂起的全局事务,则恢复全局事务。

下面我们针对每一步具体来看;

1、第一步:判断是否存在一个全局事务

因为执行分支事务时,分支事务的业务方法也有可能被@GlobalTransactional注解直接或间接修饰,进而导致分支事务和全局事务的执行入口是一样的;所以需要先判断是否存在一个全局事务(而当存在全局事务时,分支事务应该如何执行,我们下一篇文章讨论)。

在这里插入图片描述

刚开始执行一个全局事务时,当前线程本地变量副本中的xid为null,即不存在一个全局事务。

2、第二步:根据事务的隔离级别做不同的处理

在这里插入图片描述
默认事务的隔离级别为REQUIRED:即:如果当前存在一个事务,则加入事务;否者新建一个事务。

由于刚开始执行一个全局事务时,不存在事务,所以默认会新建一个全局事务。

GlobalTransactionContext.createNew()负责新建一个全局事务:
在这里插入图片描述
在这里插入图片描述

6种事务隔离级别的具体逻辑

1> NOT_SUPPORTED

  • 不支持事务: 如果事务存在,则挂起事务(默认将xid从RootContext中移除,记录下挂起的事务资源)
    在这里插入图片描述
    在这里插入图片描述

2> REQUIRES_NEW

  • 新建一个事务:如果事务存在,则挂起事务,再新建一个事务。
    在这里插入图片描述

3> SUPPORTS

  • 支持事务:如果当前存在事务,则加入事务,不存在事务,则以非事务方式执行。
    在这里插入图片描述

4> REQUIRED(默认事务模式)

  • 必须有事务:如果当前存在一个事务,则加入事务;否者新建一个事务。
    在这里插入图片描述

5> NEVER

  • 不支持事务:如果当前存在事务,则报错;否则以非事务方式执行。
    在这里插入图片描述

6> MANDATORY

  • 强制使用事务:如果当前不存在事务,则报错;否则加入事务执行。
    在这里插入图片描述

3、第三步:开启全局事务

在这里插入图片描述

在这里插入图片描述

在开启全局事务前后会有钩子函数,默认开启全局事务前后的两个钩子中没有任何实现,如果有需要可以自己定制。这个业务执行前后的钩子函数在Spring体系中随处可见。

整个开启全局事务的逻辑如下:

  1. 开启全局事务时,会首先判断事务的角色是否Launcher,即全局事务;刚开始执行一个全局事务时,创建出来的DefaultGlobalTransaction,其role就是Launcher,也就是说事务角色为全局事务。

    • 如果事务的角色不是全局事务,则会断言xid不许为null,否者抛出异常IllegalStateException
  2. 当事务为全局事务时,首选断言xid为null,否者抛出异常IllegalStateException因为超时重试机制的缘故,会再次判断线程本地上下文中的xid是否为null,如果不为null,同样抛出异常IllegalStateException
  3. 请求TC(seata-server)开启全局事务,并获取到全局事务xid。
  4. 请求TC开启全局事务之后,设置事务的状态为开启,并将全局事务xid绑定到线程本地变量副本上。

下面着重看一下TM如何请求TC开启全局事务并获取到xid?

TM如何请求TC开启全局事务

在这里插入图片描述
全局事务发起者TM,会通过netty和TC进行网络通信;其中包括对seata-server集群的负载均衡,在获取到相应seata-server实例对应的channel之后,会进步处理请求的发送和相应结果的接收。

在这里插入图片描述

在写Channel之前,channelWritableCheck()方法会检查channel是否可写。

TM / RM 和TC的RPC通信均是异步进行的:

  • TM / RM 发送请求时,将封装了CompletableFuture的MessageFuture放到futures(ConcurrentHashMap<Integer, MessageFuture>)中;
  • TC处理完请求之后,会通过netty框架发送响应到TM / RM 的AbstractNettyRemoting中,其再将futures中的MessageFuture完成,发送请求的代码段中messageFuture.get()会获取到返回值,停止阻塞。

TM发送请求之后,TC如何接收请求,如何处理请求?

TC接收到TM的请求如何开启全局事务

【微服务36】分布式事务Seata源码解析四:图解Seata Client 如何与Seata Server建立连接、通信一文中,我们聊了Seata Client 如何和Seata Server建立连接、通信;

又在【微服务34】分布式事务Seata源码解析二:Seata Server启动时都做了什么一文中,我们知道了TC(Seata Server)启动之后,AbstractNettyRemotingServer的内部类ServerHandler负责接收并处理请求。

在这里插入图片描述
ServerHandler类上有个@ChannelHandler.Sharable注解,其表示所有的连接都会共用这一个ChannelHandler;所以当消息处理很慢时,会降低并发。

processMessage(ctx, (RpcMessage) msg)方法中会根据消息类型获取到 请求处理组件(消息的处理过程是典型的策略模式),如果消息对应的处理器设置了线程池,则放到线程池中执行;如果对应的处理器没有设置线程池,则直接执行;如果某条消息处理特别慢,会严重影响并发;所以在seata-server中大部分处理器都有对应的线程池。

/**
 * Rpc message processing.
 *
 * @param ctx        Channel handler context.
 * @param rpcMessage rpc message.
 * @throws Exception throws exception process message error.
 * @since 1.3.0
 */
protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
    if (LOGGER.isDebugEnabled()) {
        LOGGER.debug(String.format("%s msgId:%s, body:%s", this, rpcMessage.getId(), rpcMessage.getBody()));
    }
    Object body = rpcMessage.getBody();
    if (body instanceof MessageTypeAware) {
        MessageTypeAware messageTypeAware = (MessageTypeAware) body;
        // 根据消息的类型获取到请求处理组件和请求处理线程池组成的Pair
        final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());
        if (pair != null) {
            // 如果消息对应的处理器设置了线程池,则放到线程池中执行
            if (pair.getSecond() != null) {
                try {
                    pair.getSecond().execute(() -> {
                        try {
                            pair.getFirst().process(ctx, rpcMessage);
                        } catch (Throwable th) {
                            LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
                        } finally {
                            MDC.clear();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    // 线程池拒绝策略之一,抛出异常:RejectedExecutionException
                    LOGGER.error(FrameworkErrorCode.ThreadPoolFull.getErrCode(),
                        "thread pool is full, current max pool size is " + messageExecutor.getActiveCount());
                    if (allowDumpStack) {
                        String name = ManagementFactory.getRuntimeMXBean().getName();
                        String pid = name.split("@")[0];
                        long idx = System.currentTimeMillis();
                        try {
                            String jstackFile = idx + ".log";
                            LOGGER.info("jstack command will dump to " + jstackFile);
                            Runtime.getRuntime().exec(String.format("jstack %s > %s", pid, jstackFile));
                        } catch (IOException exx) {
                            LOGGER.error(exx.getMessage());
                        }
                        allowDumpStack = false;
                    }
                }
            } else {
                // 对应的处理器没有设置线程池,则直接执行;如果某条消息处理特别慢,会严重影响并发;
                try {
                    pair.getFirst().process(ctx, rpcMessage);
                } catch (Throwable th) {
                    LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
                }
            }
        } else {
            LOGGER.error("This message type [{}] has no processor.", messageTypeAware.getTypeCode());
        }
    } else {
        LOGGER.error("This rpcMessage body[{}] is not MessageTypeAware type.", body);
    }
}

Seata Serer接收到请求的执行链路为:
在这里插入图片描述

又由于TM发送开启事务请求时的RPCMessage的body为GlobalBeginRequest:
在这里插入图片描述

所以进入到:
在这里插入图片描述

又由于在DefaultCoordinator#onRequest()方法中,将DefaultCoordinator自身绑定到了AbstractTransactionRequestToTChandler属性中:

在这里插入图片描述

所以进入到:
在这里插入图片描述

AbstractExceptionHandler#exceptionHandleTemplate()方法只是运行方法的入参Callback,即接着会进入到:

在这里插入图片描述

DefaultCore执行开启全局事务的业务逻辑

DefaultCore#begin()方法负责开启全局事务的业务逻辑,方法的入参包括:开启全局事务的应用程序名称、事务服务分组名称、事务名称(开启全局事务的方法名以及方法的入参类型)、事务超时时间。

@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
    throws TransactionException {
    // 创建一个全局事务会话
    GlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name,
        timeout);
    // 通过MDC把XID放入线程本地变量ThreadLocal中(MDC是Slf4j提供的工具)
    MDC.put(RootContext.MDC_KEY_XID, session.getXid());
    // 添加对全局事务会话生命周期的监听
    session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());

    // 开启全局事务会话
    session.begin();

    // transaction start event
    // 发布全局事务开启事件 做指标监控
    MetricsPublisher.postSessionDoingEvent(session, false);

    // 返回全局事务会话的xid
    return session.getXid();
}

seata-server开启全局事务的流程:

  • 创建一个全局会话GlobalSession;
  • 通过MDC把XID放入线程本地变量ThreadLocal中,并添加对全局事务会话生命周期的监听;
  • 开启全局事务会话;
  • 发布全局事务开启事件 做指标监控;
  • 返回全局事务会话的xid。

1> 第一步:创建全局会话GlobalSession

在这里插入图片描述

创建全局会话的最主要的点是根据雪花算法生成全局事务ID(transactionId)、XID(seata server的IP、Port和transactionId使用:拼接到一起)。

Seata如何使用雪花算法生成全局事务ID的见文章:【微服务38】分布式事务Seata源码解析六:全局/分支事务分布式ID如何生成?

2> 第二步:把XID放入线程本地变量副本,并添加对全局事务会话生命周期的监听

在这里插入图片描述

3> 第三步:开启全局事务会话

在这里插入图片描述
开启全局事务会话的逻辑主要在遍历所有的生成周期监听函数,执行begin事件;

根据我们启动Seata Server时选择的store.mode,会执行不同的SessionLifecycleListener
在这里插入图片描述

博主启动Seata Server时store.mode = db,所以我这里的SessionLifecycleListenerDataBaseSessionManager

在这里插入图片描述

DataBaseSessionManager执行begin事件的链路如下:

在这里插入图片描述
在这里插入图片描述

这里其实就是将全局事务会话信息持久化到DB中:

  • 首先将全局事务会话信息封装到GlobalTransactionDO模型中;
    在这里插入图片描述
  • 然后使用JDBC将全局事务会话信息持久化到表global_table中;
    在这里插入图片描述

所谓的开启全局事务会话,其实就是将全局事务会话信息持久化到Store.mode中。

4> 第四步:发布全局事务开启事件 做指标监控

在这里插入图片描述

这一块对了解seata事务的执行主流程没影响,不需要耗费特别大的精力关注,如果有指标监控的需求再重点看。

5> 返回全局事务会话的xid

在这里插入图片描述

4、第四步 --- 第八步:见下一篇博文

点个关注、订阅订阅专栏,下一篇系列文章更精彩。

执行业务方法(AT模式下)、全局事务回滚、全局事务提交、全局锁资源释放见下一篇博文。

四、总结

本文重点聊了Seata事务执行流程中TM、TC中如何开启全局事务;其中设计几个比较关键的类:

  • TransactionalExecutor --> 全局事务执行组件
  • TransactionalTemplate --> 全局事务生命周期模板管理组件,负责管理事务的生命周期;
  • TransactionManager --> 全局事务管理组件,负责执行事务的业务逻辑;
  • DefaultCore --> Seata Server端事务业务的执行逻辑,封装了AT、TCC、Saga、XA分布式事务模式的具体实现。
相关文章
|
15天前
|
安全 应用服务中间件 API
微服务分布式系统架构之zookeeper与dubbo-2
微服务分布式系统架构之zookeeper与dubbo-2
|
15天前
|
负载均衡 Java 应用服务中间件
微服务分布式系统架构之zookeeper与dubbor-1
微服务分布式系统架构之zookeeper与dubbor-1
|
15天前
|
Java 对象存储 开发者
解析Spring Cloud与Netflix OSS:微服务架构中的左右手如何协同作战
Spring Cloud与Netflix OSS不仅是现代微服务架构中不可或缺的一部分,它们还通过不断的技术创新和社区贡献推动了整个行业的发展。无论是对于初创企业还是大型组织来说,掌握并合理运用这两套工具,都能极大地提升软件系统的灵活性、可扩展性以及整体性能。随着云计算和容器化技术的进一步普及,Spring Cloud与Netflix OSS将继续引领微服务技术的发展潮流。
30 0
|
26天前
|
SQL NoSQL 数据库
SpringCloud基础6——分布式事务,Seata
分布式事务、ACID原则、CAP定理、Seata、Seata的四种分布式方案:XA、AT、TCC、SAGA模式
SpringCloud基础6——分布式事务,Seata
|
2月前
|
消息中间件 测试技术 API
深入解析微服务架构的设计与实践
在软件工程领域,"分而治之"的策略一直是解决复杂问题的有效方法。微服务架构作为这一策略的现代体现,它通过将大型应用程序分解为一组小的、独立的服务来简化开发与部署。本文将带你了解微服务的核心概念,探讨设计时的关键考虑因素,并分享实践中的一些经验教训,旨在帮助开发者更好地构建和维护可扩展的系统。
|
2月前
|
数据库 C++ Ruby
为什么你应该选择分布式平台与微服务?
为什么你应该选择分布式平台与微服务?
|
2月前
|
监控 Go API
带你十天轻松搞定 Go 微服务之大结局(分布式事务)
带你十天轻松搞定 Go 微服务之大结局(分布式事务)
|
2月前
|
Java 数据库连接 微服务
揭秘微服务架构下的数据魔方:Hibernate如何玩转分布式持久化,实现秒级响应的秘密武器?
【8月更文挑战第31天】微服务架构通过将系统拆分成独立服务,提升了可维护性和扩展性,但也带来了数据一致性和事务管理等挑战。Hibernate 作为强大的 ORM 工具,在微服务中发挥关键作用,通过二级缓存和分布式事务支持,简化了对象关系映射,并提供了有效的持久化策略。其二级缓存机制减少数据库访问,提升性能;支持 JTA 保证跨服务事务一致性;乐观锁机制解决并发数据冲突。合理配置 Hibernate 可助力构建高效稳定的分布式系统。
51 0
|
2月前
|
消息中间件 Java RocketMQ
微服务架构师的福音:深度解析Spring Cloud RocketMQ,打造高可靠消息驱动系统的不二之选!
【8月更文挑战第29天】Spring Cloud RocketMQ结合了Spring Cloud生态与RocketMQ消息中间件的优势,简化了RocketMQ在微服务中的集成,使开发者能更专注业务逻辑。通过配置依赖和连接信息,可轻松搭建消息生产和消费流程,支持消息过滤、转换及分布式事务等功能,确保微服务间解耦的同时,提升了系统的稳定性和效率。掌握其应用,有助于构建复杂分布式系统。
40 0
|
2月前
|
消息中间件 SQL 关系型数据库
go-zero微服务实战系列(十、分布式事务如何实现)
go-zero微服务实战系列(十、分布式事务如何实现)