【微服务39】分布式事务Seata源码解析七:图解Seata事务执行流程之开启全局事务

本文涉及的产品
云原生网关 MSE Higress,422元/月
注册配置 MSE Nacos/ZooKeeper,118元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 【微服务39】分布式事务Seata源码解析七:图解Seata事务执行流程之开启全局事务

@[TOC]

一、前言

至此,seata系列的内容包括:

  1. can not get cluster name in registry config ‘service.vgroupMapping.xx‘, please make sure registry问题解决
  2. Seata Failed to get available servers: endpoint format should like ip:port 报错原因/解决方案汇总版(看完本文必解决问题)
  3. Seata json decode exception, Cannot construct instance of java.time.LocalDateTime报错原因/解决方案最全汇总版
  4. 【微服务 31】超细的Spring Cloud 整合Seata实现分布式事务(排坑版)
  5. 【微服务 32】Spring Cloud整合Seata、Nacos实现分布式事务案例(巨细排坑版)【云原生】
  6. 【微服务33】分布式事务Seata源码解析一:在IDEA中启动Seata Server
  7. 【微服务34】分布式事务Seata源码解析二:Seata Server启动时都做了什么
  8. 【微服务35】分布式事务Seata源码解析三:从Spring Boot特性来看Seata Client 启动时都做了什么
  9. 【微服务36】分布式事务Seata源码解析四:图解Seata Client 如何与Seata Server建立连接、通信
  10. 【微服务37】分布式事务Seata源码解析五:@GlobalTransactional如何开启全局事务
  11. 【微服务38】分布式事务Seata源码解析六:全局/分支事务分布式ID如何生成?序列号超了怎么办?时钟回拨问题如何处理?

本文正式进入Seata最核心的全局事务执行流程。

二、全局事务执行的入口

【微服务37】分布式事务Seata源码解析五:@GlobalTransactional如何开启全局事务一文,我们知道了所谓的@GlobalTransactional注解开启全局事务,其实就是给类 或 类的方法上标注了@GlobalTransactional注解的类创建动态代理对象。但是动态代理对象是针对类的;

1、拦截器GlobalTransactionalInterceptor

当一个类中有多个方法并且类没有被@GlobalTransactional注解标注,但只有一个方法被@GlobalTransactional注解标注时,这里针对整个类生成了动态代理对象,当调用Bean时,拦截器GlobalTransactionalInterceptor会做进一步处理,保证只有加了@GlobalTransactional注解的方法才会开启全局事务。

GlobalTransactionalInterceptor类的继承图:

在这里插入图片描述

GlobalTransactionalInterceptor实现了MethodInterceptor接口,所以当每次执行添加了 GlobalTransactionalInterceptor拦截器的Bean的方法时,都会进入到GlobalTransactionalInterceptor类覆写MethodInterceptor接口的invoke()方法

@Override
public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
    // method invocation是一次方法调用,一定是针对某个对象的方法调用;
    // methodInvocation.getThis()就是拿到当前方法所属的对象;
    // AopUtils.getTargetClass()获取到当前实例对象所对应的Class
    Class<?> targetClass =
            methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;

    // 通过反射获取到被调用目标Class的method方法
    Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);

    // 如果目标method不为空,并且方法的DeclaringClass不是Object
    if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {
        // 通过BridgeMethodResolver寻找method的桥接方法
        final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
        // 获取目标方法的@GlobalTransactional注解
        final GlobalTransactional globalTransactionalAnnotation =
                getAnnotation(method, targetClass, GlobalTransactional.class);
        // 如果目标方法被@GlobalLock注解标注,获取到@GlobalLock注解内容
        final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);
        // 如果禁用了全局事务 或 开启了事务降级检查并且降级检查次数大于等于降级检查允许的次数
        // 则localDisable等价于全局事务被禁用了
        boolean localDisable = disable || (degradeCheck && degradeNum >= degradeCheckAllowTimes);

        // 如果全局事务没有被禁用
        if (!localDisable) {
            // 全局事务注解不为空 或者 AOP切面全局事务核心配置不为空
            if (globalTransactionalAnnotation != null || this.aspectTransactional != null) {
                AspectTransactional transactional;
                if (globalTransactionalAnnotation != null) {
                    // 构建一个AOP切面全局事务核心配置,配置的数据从全局事务注解中取
                    transactional = new AspectTransactional(globalTransactionalAnnotation.timeoutMills(),
                            globalTransactionalAnnotation.name(), globalTransactionalAnnotation.rollbackFor(),
                            globalTransactionalAnnotation.rollbackForClassName(),
                            globalTransactionalAnnotation.noRollbackFor(),
                            globalTransactionalAnnotation.noRollbackForClassName(),
                            globalTransactionalAnnotation.propagation(),
                            globalTransactionalAnnotation.lockRetryInterval(),
                            globalTransactionalAnnotation.lockRetryTimes());
                } else {
                    transactional = this.aspectTransactional;
                }
                // 真正处理全局事务的入口
                return handleGlobalTransaction(methodInvocation, transactional);
            } else if (globalLockAnnotation != null) {
                // 获取事务锁
                return handleGlobalLock(methodInvocation, globalLockAnnotation);
            }
        }
    }
    // 直接运行目标方法
    return methodInvocation.proceed();
}

invoke()方法解析

1)方法入参--MethodInvocation

invoke()方法的入参为MethodInvocation,MethodInvocation是一次方法调用,并且是针对某个对象的方法调用;

  • methodInvocation.getThis()会拿到当前方法所属的对象;

在这里插入图片描述

在通过methodInvocation.getThis()会拿到当前方法所属的对象时,如果获取到的是null,则使用AopUtils.getTargetClass()获取到当前实例对象所对应的Class(如果被AOP代理,则是代理类,否则是普通类)。

2)判断目标方法是否需要开启全局事务

直接通过反射拿到目标Class的method方法;如果method不为空,并且method所属的类不是Object类;再判断如果method直接或间接被GlobalTransactional注解标注,并且没有禁用全局事务,则再进一步判断全局事务是否被禁用,如果没有被禁用则执行全局事务。

在这里插入图片描述

3)开始处理全局事务

handleGlobalTransaction()方法中真正开始进行全局事务的处理。方法具体内容见<三、全局事务执行>

2、不用开启全局事务的情况

1)全局事务被禁用

在判断完method直接或间接被@GlobalTransactional标注之后,会判断全局事务是否被禁用,如果被禁用则至今运行目标方法。
在这里插入图片描述
禁用全局事务有两种方式:

1> 显示的设置disable属性

  • 配置service.disableGlobalTransaction,默认为false,表示不禁用全局事务;

在这里插入图片描述

2> 开启了事务降级检查,并且降级检查次数大于等于降级检查允许的次数

  • 配置client.tm.degradeCheck,默认为false,表示不开启事务降级检查;
  • 配置client.tm.degradeCheckAllowTimes,只有当开启事务降级检查,这个配置才有意义;

在这里插入图片描述

2)某一个类被标注的注解,但Object超类下的所有方法仍都不会开启全局事务

在GlobalTransactionalInterceptor#invoke()方法中会判断如果目标类的方法是Object类下的方法,则不会执行全局事务;

在这里插入图片描述

3)某一个方法标注了事务注解,其余方法没标注,并且类没有被标注,其余方法都不会开启全局事务

假如我们调用TradeService类中没有标注@GlobalTransactional注解的test()方法(且 TradeService类也没有标注@GlobalTransaction注解);
在这里插入图片描述

invoke()方法中会再次判断 当前调用的bean的方法 或 方法所处的类上是否标注了@GlobalTransactional注解,如果没有标注,则执行运行目标方法;否则才会以全局事务的方式执行方法。

三、全局事务执行

在上面我们聊了GlobalTransactionalInterceptor#handleGlobalTransaction()方法会进行全局事务的处理;

在这里插入图片描述
全局事务的执行会交给全局事务执行业务逻辑的模板TransactionalTemplate,并将目标方法封装到TransactionalExecutor中作为全局事务中执行业务逻辑的回调。

全局事务执行模板TransactionalTemplate

全局事务的整体执行流程体现在TransactionalTemplate#execute()方法中:

在这里插入图片描述

具体代码 和 注释:

public Object execute(TransactionalExecutor business) throws Throwable {
    // 1. Get transactionInfo
    TransactionInfo txInfo = business.getTransactionInfo();
    if (txInfo == null) {
        throw new ShouldNeverHappenException("transactionInfo does not exist");
    }
    // 1.1 Get current transaction, if not null, the tx role is 'GlobalTransactionRole.Participant'.
    // 获取当前事务,根据ThreadLocal,获取当前线程本地变量副本中的xid,进而判断是否存在一个全局事务
    // 刚开始一个全局事务时,肯定是没有全局事务的
    GlobalTransaction tx = GlobalTransactionContext.getCurrent();

    // 1.2 Handle the transaction propagation.
    // 从全局事务的配置里 获取事务传播级别,默认是REQUIRED(如果存在则加入,否则开启一个新的)
    Propagation propagation = txInfo.getPropagation();
    SuspendedResourcesHolder suspendedResourcesHolder = null;
    try {
        // 根据事务的隔离级别做不同的处理
        switch (propagation) {
            case NOT_SUPPORTED:
                // If transaction is existing, suspend it.
                if (existingTransaction(tx)) {
                    // 事务存在,则挂起事务(默认将xid从RootContext中移除)
                    suspendedResourcesHolder = tx.suspend();
                }
                // Execute without transaction and return.
                return business.execute();
            case REQUIRES_NEW:
                // If transaction is existing, suspend it, and then begin new transaction.
                if (existingTransaction(tx)) {
                    suspendedResourcesHolder = tx.suspend();
                    tx = GlobalTransactionContext.createNew();
                }
                // Continue and execute with new transaction
                break;
            case SUPPORTS:
                // If transaction is not existing, execute without transaction.
                if (notExistingTransaction(tx)) {
                    return business.execute();
                }
                // Continue and execute with new transaction
                break;
            case REQUIRED:
                // If current transaction is existing, execute with current transaction,
                // else continue and execute with new transaction.
                break;
            case NEVER:
                // If transaction is existing, throw exception.
                if (existingTransaction(tx)) {
                    throw new TransactionException(
                        String.format("Existing transaction found for transaction marked with propagation 'never', xid = %s"
                                , tx.getXid()));
                } else {
                    // Execute without transaction and return.
                    return business.execute();
                }
            case MANDATORY:
                // If transaction is not existing, throw exception.
                if (notExistingTransaction(tx)) {
                    throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");
                }
                // Continue and execute with current transaction.
                break;
            default:
                throw new TransactionException("Not Supported Propagation:" + propagation);
        }

        // 1.3 If null, create new transaction with role 'GlobalTransactionRole.Launcher'.
        if (tx == null) {
            // 创建全局事务(角色为事务发起者),并关联全局事务管理器
            tx = GlobalTransactionContext.createNew();
        }

        // set current tx config to holder
        GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);

        try {
            // 2. If the tx role is 'GlobalTransactionRole.Launcher', send the request of beginTransaction to TC,
            //    else do nothing. Of course, the hooks will still be triggered.
            // 开启全局事务,如果事务角色是'GlobalTransactionRole.Launcher',发送开始事务请求到seata-server(TC)
            beginTransaction(txInfo, tx);

            Object rs;
            try {
                // Do Your Business
                // 执行业务方法,把全局事务ID通过 MVC拦截器 / dubbo filter传递到后面的分支事务;
                // 每个分支事务都会去运行
                rs = business.execute();
            } catch (Throwable ex) {
                // 3. The needed business exception to rollback.
                // 如果全局事务执行发生了异常,则回滚;
                completeTransactionAfterThrowing(txInfo, tx, ex);
                throw ex;
            }

            // 4. everything is fine, commit.
            // 全局事务和分支事务运行无误,提交事务;
            commitTransaction(tx);

            return rs;
        } finally {
            //5. clear
            // 全局事务完成之后做一些清理工作
            resumeGlobalLockConfig(previousConfig);
            triggerAfterCompletion();
            cleanUp();
        }
    } finally {
        // If the transaction is suspended, resume it.
        if (suspendedResourcesHolder != null) {
            // 如果有挂起的全局事务,则恢复全局事务
            tx.resume(suspendedResourcesHolder);
        }
    }
}

整个全局事务的执行由八步组成:

  1. 从线程本地变量副本中获取到xid,进而判断是否存在一个全局事务;
  2. 根据事务的隔离级别,对已存在的全局事务做不同的处理,包括:挂起事务、新建一个事务.....
    最后如果事务为空,则创建一个新的全局事务(刚开始一个新的全局事务时,会走进这个逻辑)
  3. 开启一个全局事务;
  4. 执行业务方法,把全局事务ID通过 MVC拦截器 / dubbo filter传递到后面的分支事务;
  5. 如果全局事务执行发生了异常,则通知TC回滚全局事务和所有的分支事务;
  6. 如果全局事务和分支事务运行无误,提交事务;
  7. 无论全局事务是否运行成功,都需要清理占用的全局锁资源;
  8. 最后,如果存在被挂起的全局事务,则恢复全局事务。

下面我们针对每一步具体来看;

1、第一步:判断是否存在一个全局事务

因为执行分支事务时,分支事务的业务方法也有可能被@GlobalTransactional注解直接或间接修饰,进而导致分支事务和全局事务的执行入口是一样的;所以需要先判断是否存在一个全局事务(而当存在全局事务时,分支事务应该如何执行,我们下一篇文章讨论)。

在这里插入图片描述

刚开始执行一个全局事务时,当前线程本地变量副本中的xid为null,即不存在一个全局事务。

2、第二步:根据事务的隔离级别做不同的处理

在这里插入图片描述
默认事务的隔离级别为REQUIRED:即:如果当前存在一个事务,则加入事务;否者新建一个事务。

由于刚开始执行一个全局事务时,不存在事务,所以默认会新建一个全局事务。

GlobalTransactionContext.createNew()负责新建一个全局事务:
在这里插入图片描述
在这里插入图片描述

6种事务隔离级别的具体逻辑

1> NOT_SUPPORTED

  • 不支持事务: 如果事务存在,则挂起事务(默认将xid从RootContext中移除,记录下挂起的事务资源)
    在这里插入图片描述
    在这里插入图片描述

2> REQUIRES_NEW

  • 新建一个事务:如果事务存在,则挂起事务,再新建一个事务。
    在这里插入图片描述

3> SUPPORTS

  • 支持事务:如果当前存在事务,则加入事务,不存在事务,则以非事务方式执行。
    在这里插入图片描述

4> REQUIRED(默认事务模式)

  • 必须有事务:如果当前存在一个事务,则加入事务;否者新建一个事务。
    在这里插入图片描述

5> NEVER

  • 不支持事务:如果当前存在事务,则报错;否则以非事务方式执行。
    在这里插入图片描述

6> MANDATORY

  • 强制使用事务:如果当前不存在事务,则报错;否则加入事务执行。
    在这里插入图片描述

3、第三步:开启全局事务

在这里插入图片描述

在这里插入图片描述

在开启全局事务前后会有钩子函数,默认开启全局事务前后的两个钩子中没有任何实现,如果有需要可以自己定制。这个业务执行前后的钩子函数在Spring体系中随处可见。

整个开启全局事务的逻辑如下:

  1. 开启全局事务时,会首先判断事务的角色是否Launcher,即全局事务;刚开始执行一个全局事务时,创建出来的DefaultGlobalTransaction,其role就是Launcher,也就是说事务角色为全局事务。

    • 如果事务的角色不是全局事务,则会断言xid不许为null,否者抛出异常IllegalStateException
  2. 当事务为全局事务时,首选断言xid为null,否者抛出异常IllegalStateException因为超时重试机制的缘故,会再次判断线程本地上下文中的xid是否为null,如果不为null,同样抛出异常IllegalStateException
  3. 请求TC(seata-server)开启全局事务,并获取到全局事务xid。
  4. 请求TC开启全局事务之后,设置事务的状态为开启,并将全局事务xid绑定到线程本地变量副本上。

下面着重看一下TM如何请求TC开启全局事务并获取到xid?

TM如何请求TC开启全局事务

在这里插入图片描述
全局事务发起者TM,会通过netty和TC进行网络通信;其中包括对seata-server集群的负载均衡,在获取到相应seata-server实例对应的channel之后,会进步处理请求的发送和相应结果的接收。

在这里插入图片描述

在写Channel之前,channelWritableCheck()方法会检查channel是否可写。

TM / RM 和TC的RPC通信均是异步进行的:

  • TM / RM 发送请求时,将封装了CompletableFuture的MessageFuture放到futures(ConcurrentHashMap<Integer, MessageFuture>)中;
  • TC处理完请求之后,会通过netty框架发送响应到TM / RM 的AbstractNettyRemoting中,其再将futures中的MessageFuture完成,发送请求的代码段中messageFuture.get()会获取到返回值,停止阻塞。

TM发送请求之后,TC如何接收请求,如何处理请求?

TC接收到TM的请求如何开启全局事务

【微服务36】分布式事务Seata源码解析四:图解Seata Client 如何与Seata Server建立连接、通信一文中,我们聊了Seata Client 如何和Seata Server建立连接、通信;

又在【微服务34】分布式事务Seata源码解析二:Seata Server启动时都做了什么一文中,我们知道了TC(Seata Server)启动之后,AbstractNettyRemotingServer的内部类ServerHandler负责接收并处理请求。

在这里插入图片描述
ServerHandler类上有个@ChannelHandler.Sharable注解,其表示所有的连接都会共用这一个ChannelHandler;所以当消息处理很慢时,会降低并发。

processMessage(ctx, (RpcMessage) msg)方法中会根据消息类型获取到 请求处理组件(消息的处理过程是典型的策略模式),如果消息对应的处理器设置了线程池,则放到线程池中执行;如果对应的处理器没有设置线程池,则直接执行;如果某条消息处理特别慢,会严重影响并发;所以在seata-server中大部分处理器都有对应的线程池。

/**
 * Rpc message processing.
 *
 * @param ctx        Channel handler context.
 * @param rpcMessage rpc message.
 * @throws Exception throws exception process message error.
 * @since 1.3.0
 */
protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
    if (LOGGER.isDebugEnabled()) {
        LOGGER.debug(String.format("%s msgId:%s, body:%s", this, rpcMessage.getId(), rpcMessage.getBody()));
    }
    Object body = rpcMessage.getBody();
    if (body instanceof MessageTypeAware) {
        MessageTypeAware messageTypeAware = (MessageTypeAware) body;
        // 根据消息的类型获取到请求处理组件和请求处理线程池组成的Pair
        final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());
        if (pair != null) {
            // 如果消息对应的处理器设置了线程池,则放到线程池中执行
            if (pair.getSecond() != null) {
                try {
                    pair.getSecond().execute(() -> {
                        try {
                            pair.getFirst().process(ctx, rpcMessage);
                        } catch (Throwable th) {
                            LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
                        } finally {
                            MDC.clear();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    // 线程池拒绝策略之一,抛出异常:RejectedExecutionException
                    LOGGER.error(FrameworkErrorCode.ThreadPoolFull.getErrCode(),
                        "thread pool is full, current max pool size is " + messageExecutor.getActiveCount());
                    if (allowDumpStack) {
                        String name = ManagementFactory.getRuntimeMXBean().getName();
                        String pid = name.split("@")[0];
                        long idx = System.currentTimeMillis();
                        try {
                            String jstackFile = idx + ".log";
                            LOGGER.info("jstack command will dump to " + jstackFile);
                            Runtime.getRuntime().exec(String.format("jstack %s > %s", pid, jstackFile));
                        } catch (IOException exx) {
                            LOGGER.error(exx.getMessage());
                        }
                        allowDumpStack = false;
                    }
                }
            } else {
                // 对应的处理器没有设置线程池,则直接执行;如果某条消息处理特别慢,会严重影响并发;
                try {
                    pair.getFirst().process(ctx, rpcMessage);
                } catch (Throwable th) {
                    LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
                }
            }
        } else {
            LOGGER.error("This message type [{}] has no processor.", messageTypeAware.getTypeCode());
        }
    } else {
        LOGGER.error("This rpcMessage body[{}] is not MessageTypeAware type.", body);
    }
}

Seata Serer接收到请求的执行链路为:
在这里插入图片描述

又由于TM发送开启事务请求时的RPCMessage的body为GlobalBeginRequest:
在这里插入图片描述

所以进入到:
在这里插入图片描述

又由于在DefaultCoordinator#onRequest()方法中,将DefaultCoordinator自身绑定到了AbstractTransactionRequestToTChandler属性中:

在这里插入图片描述

所以进入到:
在这里插入图片描述

AbstractExceptionHandler#exceptionHandleTemplate()方法只是运行方法的入参Callback,即接着会进入到:

在这里插入图片描述

DefaultCore执行开启全局事务的业务逻辑

DefaultCore#begin()方法负责开启全局事务的业务逻辑,方法的入参包括:开启全局事务的应用程序名称、事务服务分组名称、事务名称(开启全局事务的方法名以及方法的入参类型)、事务超时时间。

@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
    throws TransactionException {
    // 创建一个全局事务会话
    GlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name,
        timeout);
    // 通过MDC把XID放入线程本地变量ThreadLocal中(MDC是Slf4j提供的工具)
    MDC.put(RootContext.MDC_KEY_XID, session.getXid());
    // 添加对全局事务会话生命周期的监听
    session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());

    // 开启全局事务会话
    session.begin();

    // transaction start event
    // 发布全局事务开启事件 做指标监控
    MetricsPublisher.postSessionDoingEvent(session, false);

    // 返回全局事务会话的xid
    return session.getXid();
}

seata-server开启全局事务的流程:

  • 创建一个全局会话GlobalSession;
  • 通过MDC把XID放入线程本地变量ThreadLocal中,并添加对全局事务会话生命周期的监听;
  • 开启全局事务会话;
  • 发布全局事务开启事件 做指标监控;
  • 返回全局事务会话的xid。

1> 第一步:创建全局会话GlobalSession

在这里插入图片描述

创建全局会话的最主要的点是根据雪花算法生成全局事务ID(transactionId)、XID(seata server的IP、Port和transactionId使用:拼接到一起)。

Seata如何使用雪花算法生成全局事务ID的见文章:【微服务38】分布式事务Seata源码解析六:全局/分支事务分布式ID如何生成?

2> 第二步:把XID放入线程本地变量副本,并添加对全局事务会话生命周期的监听

在这里插入图片描述

3> 第三步:开启全局事务会话

在这里插入图片描述
开启全局事务会话的逻辑主要在遍历所有的生成周期监听函数,执行begin事件;

根据我们启动Seata Server时选择的store.mode,会执行不同的SessionLifecycleListener
在这里插入图片描述

博主启动Seata Server时store.mode = db,所以我这里的SessionLifecycleListenerDataBaseSessionManager

在这里插入图片描述

DataBaseSessionManager执行begin事件的链路如下:

在这里插入图片描述
在这里插入图片描述

这里其实就是将全局事务会话信息持久化到DB中:

  • 首先将全局事务会话信息封装到GlobalTransactionDO模型中;
    在这里插入图片描述
  • 然后使用JDBC将全局事务会话信息持久化到表global_table中;
    在这里插入图片描述

所谓的开启全局事务会话,其实就是将全局事务会话信息持久化到Store.mode中。

4> 第四步:发布全局事务开启事件 做指标监控

在这里插入图片描述

这一块对了解seata事务的执行主流程没影响,不需要耗费特别大的精力关注,如果有指标监控的需求再重点看。

5> 返回全局事务会话的xid

在这里插入图片描述

4、第四步 --- 第八步:见下一篇博文

点个关注、订阅订阅专栏,下一篇系列文章更精彩。

执行业务方法(AT模式下)、全局事务回滚、全局事务提交、全局锁资源释放见下一篇博文。

四、总结

本文重点聊了Seata事务执行流程中TM、TC中如何开启全局事务;其中设计几个比较关键的类:

  • TransactionalExecutor --> 全局事务执行组件
  • TransactionalTemplate --> 全局事务生命周期模板管理组件,负责管理事务的生命周期;
  • TransactionManager --> 全局事务管理组件,负责执行事务的业务逻辑;
  • DefaultCore --> Seata Server端事务业务的执行逻辑,封装了AT、TCC、Saga、XA分布式事务模式的具体实现。
相关文章
|
8天前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
28 2
|
1月前
|
存储 缓存 算法
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
71 3
|
8天前
|
存储 安全 Linux
Golang的GMP调度模型与源码解析
【11月更文挑战第11天】GMP 调度模型是 Go 语言运行时系统的核心部分,用于高效管理和调度大量协程(goroutine)。它通过少量的操作系统线程(M)和逻辑处理器(P)来调度大量的轻量级协程(G),从而实现高性能的并发处理。GMP 模型通过本地队列和全局队列来减少锁竞争,提高调度效率。在 Go 源码中,`runtime.h` 文件定义了关键数据结构,`schedule()` 和 `findrunnable()` 函数实现了核心调度逻辑。通过深入研究 GMP 模型,可以更好地理解 Go 语言的并发机制。
|
1月前
|
消息中间件 Java 数据库
新版 Seata 集成 RocketMQ事务消息,越来越 牛X 了!阿里的 Seata , yyds !
这里 借助 Seata 集成 RocketMQ 事务消息的 新功能,介绍一下一个新遇到的面试题:如果如何实现 **强弱一致性 结合**的分布式事务?
新版 Seata 集成 RocketMQ事务消息,越来越 牛X 了!阿里的 Seata , yyds !
|
21天前
|
消息中间件 缓存 安全
Future与FutureTask源码解析,接口阻塞问题及解决方案
【11月更文挑战第5天】在Java开发中,多线程编程是提高系统并发性能和资源利用率的重要手段。然而,多线程编程也带来了诸如线程安全、死锁、接口阻塞等一系列复杂问题。本文将深度剖析多线程优化技巧、Future与FutureTask的源码、接口阻塞问题及解决方案,并通过具体业务场景和Java代码示例进行实战演示。
39 3
|
1月前
|
存储
让星星⭐月亮告诉你,HashMap的put方法源码解析及其中两种会触发扩容的场景(足够详尽,有问题欢迎指正~)
`HashMap`的`put`方法通过调用`putVal`实现,主要涉及两个场景下的扩容操作:1. 初始化时,链表数组的初始容量设为16,阈值设为12;2. 当存储的元素个数超过阈值时,链表数组的容量和阈值均翻倍。`putVal`方法处理键值对的插入,包括链表和红黑树的转换,确保高效的数据存取。
56 5
|
1月前
|
消息中间件 中间件 数据库
NServiceBus:打造企业级服务总线的利器——深度解析这一面向消息中间件如何革新分布式应用开发与提升系统可靠性
【10月更文挑战第9天】NServiceBus 是一个面向消息的中间件,专为构建分布式应用程序设计,特别适用于企业级服务总线(ESB)。它通过消息队列实现服务间的解耦,提高系统的可扩展性和容错性。在 .NET 生态中,NServiceBus 提供了强大的功能,支持多种传输方式如 RabbitMQ 和 Azure Service Bus。通过异步消息传递模式,各组件可以独立运作,即使某部分出现故障也不会影响整体系统。 示例代码展示了如何使用 NServiceBus 发送和接收消息,简化了系统的设计和维护。
48 3
|
1月前
|
Java Spring
Spring底层架构源码解析(三)
Spring底层架构源码解析(三)
113 5
|
1月前
|
XML Java 数据格式
Spring底层架构源码解析(二)
Spring底层架构源码解析(二)
|
1月前
|
存储 缓存 数据处理
深度解析:Hologres分布式存储引擎设计原理及其优化策略
【10月更文挑战第9天】在大数据时代,数据的规模和复杂性不断增加,这对数据库系统提出了更高的要求。传统的单机数据库难以应对海量数据处理的需求,而分布式数据库通过水平扩展提供了更好的解决方案。阿里云推出的Hologres是一个实时交互式分析服务,它结合了OLAP(在线分析处理)与OLTP(在线事务处理)的优势,能够在大规模数据集上提供低延迟的数据查询能力。本文将深入探讨Hologres分布式存储引擎的设计原理,并介绍一些关键的优化策略。
102 0

推荐镜像

更多
下一篇
无影云桌面