阿里中间件seata源码剖析五:聊聊seata中全局事务的开启

本文涉及的产品
Serverless 应用引擎免费试用套餐包,4320000 CU,有效期3个月
注册配置 MSE Nacos/ZooKeeper,118元/月
云原生网关 MSE Higress,422元/月
简介: 阿里中间件seata源码剖析五:聊聊seata中全局事务的开启

在之前的文章《springcloud+eureka整合seata-tcc模式》中,我写了一个使用seata实现TCC模式的demo,这个demo中,我们使用了springcloud+eureka来实现的微服务,其中包括订单服务、账户服务和库存服务,服务聚合在订单这个服务。


我们再来看一下TCC的官方流程图,RM都注册到TC,业务开始后,TM发起全局事务,RM报告分支事务状态,如果都prepare成功,则TC会通知各个服务依次调用分支事务的commit方法,否则TC调用分支事务的rollback方法。

微信图片_20221212154051.png

这里,订单服务作为一个TM,会开启一个全局事务,本文我们就来聊一聊全局事务的开启过程。


下面我们再看一段全局事务提交成功的seata server的日志:

2020-09-23 01:08:31.624  INFO --- [Thread_1_43_500] i.s.s.coordinator.DefaultCoordinator     : Begin new global transaction applicationId: order-server,transactionServiceGroup: my_test_tx_group, transactionName: create(io.seata.sample.entity.Order),timeout:60000,xid:192.168.59.143:8091:52019904118321152
2020-09-23 01:08:31.679  INFO --- [Thread_1_44_500] i.seata.server.coordinator.AbstractCore  : Register branch successfully, xid = 192.168.59.143:8091:52019904118321152, branchId = 52019904344813568, resourceId = orderApi ,lockKeys = null
2020-09-23 01:08:31.680  INFO --- [LoggerPrint_1_1] i.s.c.r.p.server.BatchLogHandler         : SeataMergeMessage xid=192.168.59.143:8091:52019904118321152,branchType=TCC,resourceId=orderApi,lockKey=null
,clientIp:192.168.59.1,vgroup:my_test_tx_group
2020-09-23 01:08:32.483  INFO --- [LoggerPrint_1_1] i.s.c.r.p.server.BatchLogHandler         : SeataMergeMessage xid=192.168.59.143:8091:52019904118321152,branchType=TCC,resourceId=storageApi,lockKey=null
,clientIp:192.168.59.1,vgroup:my_test_tx_group
2020-09-23 01:08:32.483  INFO --- [Thread_1_45_500] i.seata.server.coordinator.AbstractCore  : Register branch successfully, xid = 192.168.59.143:8091:52019904118321152, branchId = 52019907721228288, resourceId = storageApi ,lockKeys = null
2020-09-23 01:08:32.559  INFO --- [LoggerPrint_1_1] i.s.c.r.p.server.BatchLogHandler         : SeataMergeMessage xid=192.168.59.143:8091:52019904118321152,branchType=TCC,resourceId=accountApi,lockKey=null
,clientIp:192.168.59.1,vgroup:my_test_tx_group
2020-09-23 01:08:32.559  INFO --- [Thread_1_46_500] i.seata.server.coordinator.AbstractCore  : Register branch successfully, xid = 192.168.59.143:8091:52019904118321152, branchId = 52019908039995392, resourceId = accountApi ,lockKeys = null
2020-09-23 01:08:32.579  INFO --- [LoggerPrint_1_1] i.s.c.r.p.server.BatchLogHandler         : SeataMergeMessage xid=192.168.59.143:8091:52019904118321152,extraData=null
,clientIp:192.168.59.1,vgroup:my_test_tx_group
2020-09-23 01:08:32.627  INFO --- [Thread_1_47_500] io.seata.server.coordinator.DefaultCore  : Committing global transaction is successfully done, xid = 192.168.59.143:8091:52019904118321152.

下面我们再看一段全局事务失败回滚的seata server的日志:

2020-09-23 01:30:55.806  INFO --- [Thread_1_10_500] i.s.s.coordinator.DefaultCoordinator     : Begin new global transaction applicationId: order-server,transactionServiceGroup: my_test_tx_group, transactionName: create(io.seata.sample.entity.Order),timeout:60000,xid:192.168.59.143:8091:52025542022066176
2020-09-23 01:30:55.808  INFO --- [LoggerPrint_1_1] i.s.c.r.p.server.BatchLogHandler         : SeataMergeMessage timeout=60000,transactionName=create(io.seata.sample.entity.Order)
,clientIp:192.168.59.1,vgroup:my_test_tx_group
2020-09-23 01:30:55.812  INFO --- [Thread_1_11_500] i.seata.server.coordinator.AbstractCore  : Register branch successfully, xid = 192.168.59.143:8091:52025542022066176, branchId = 52025542051426304, resourceId = orderApi ,lockKeys = null
2020-09-23 01:30:55.817  INFO --- [LoggerPrint_1_1] i.s.c.r.p.server.BatchLogHandler         : SeataMergeMessage xid=192.168.59.143:8091:52025542022066176,branchType=TCC,resourceId=orderApi,lockKey=null
,clientIp:192.168.59.1,vgroup:my_test_tx_group
2020-09-23 01:30:56.059  INFO --- [rThread_1_8_500] i.seata.server.coordinator.AbstractCore  : Register branch successfully, xid = 192.168.59.143:8091:52025542022066176, branchId = 52025543087419392, resourceId = storageApi ,lockKeys = null
2020-09-23 01:30:56.059  INFO --- [LoggerPrint_1_1] i.s.c.r.p.server.BatchLogHandler         : SeataMergeMessage xid=192.168.59.143:8091:52025542022066176,branchType=TCC,resourceId=storageApi,lockKey=null
,clientIp:192.168.59.1,vgroup:my_test_tx_group
2020-09-23 01:30:56.442  INFO --- [LoggerPrint_1_1] i.s.c.r.p.server.BatchLogHandler         : SeataMergeMessage xid=192.168.59.143:8091:52025542022066176,branchType=TCC,resourceId=accountApi,lockKey=null
,clientIp:192.168.59.1,vgroup:my_test_tx_group
2020-09-23 01:30:56.442  INFO --- [Thread_1_12_500] i.seata.server.coordinator.AbstractCore  : Register branch successfully, xid = 192.168.59.143:8091:52025542022066176, branchId = 52025544693837824, resourceId = accountApi ,lockKeys = null
2020-09-23 01:30:56.737  INFO --- [LoggerPrint_1_1] i.s.c.r.p.server.BatchLogHandler         : SeataMergeMessage xid=192.168.59.143:8091:52025542022066176,extraData=null
,clientIp:192.168.59.1,vgroup:my_test_tx_group
2020-09-23 01:30:56.757  INFO --- [Thread_1_13_500] io.seata.server.coordinator.DefaultCore  : Rollback branch transaction successfully, xid = 192.168.59.143:8091:52025542022066176 branchId = 52025544693837824
2020-09-23 01:30:56.771  INFO --- [Thread_1_13_500] io.seata.server.coordinator.DefaultCore  : Rollback branch transaction successfully, xid = 192.168.59.143:8091:52025542022066176 branchId = 52025543087419392
2020-09-23 01:30:56.777  INFO --- [Thread_1_13_500] io.seata.server.coordinator.DefaultCore  : Rollback branch transaction successfully, xid = 192.168.59.143:8091:52025542022066176 branchId = 52025542051426304
2020-09-23 01:30:56.778  INFO --- [Thread_1_13_500] io.seata.server.coordinator.DefaultCore  : Rollback global transaction successfully, xid = 192.168.59.143:8091:52025542022066176.

TM发送全局事务请求


开始之前,我们还是看一下我们的demo中代码:

@GlobalTransactional
public boolean create(Order order) {
    String xid = RootContext.getXID();
    LOGGER.info("------->交易开始");
    BusinessActionContext actionContext = new BusinessActionContext();
    actionContext.setXid(xid);
    boolean result = orderSaveImpl.saveOrder(actionContext, order);
    if(!result){
        throw new RuntimeException("保存订单失败");
    }
    //远程方法 扣减库存
    LOGGER.info("------->扣减库存开始storage中");
    result = storageApi.decrease(actionContext, order.getProductId(), order.getCount());
    if(!result){
        throw new RuntimeException("扣减库存失败");
    }
    LOGGER.info("------->扣减库存结束storage中");
    //远程方法 扣减账户余额
    LOGGER.info("------->扣减账户开始account中");
    result = accountApi.prepare(actionContext, order.getUserId(),order.getPayAmount());
    LOGGER.info("------->扣减账户结束account中" + result);
    LOGGER.info("------->交易结束");
    throw new RuntimeException("调用2阶段提交的rollback方法");
    //return true;
}

之前讲过,开启全局事务的注解是@GlobalTransactional,这个是使用spring的aop来实现的。代理类是GlobalTransactionalInterceptor,我们就从这个类作为入口来看。作为代理类,我们看一下invoke方法:

public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
    Class<?> targetClass =
        methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;//io.seata.sample.service.OrderServiceImpl
    Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);//create方法
    if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {
        final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);//create方法
        final GlobalTransactional globalTransactionalAnnotation =
            getAnnotation(method, targetClass, GlobalTransactional.class);
        final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);
        boolean localDisable = disable || (degradeCheck && degradeNum >= degradeCheckAllowTimes);
        if (!localDisable) {
            if (globalTransactionalAnnotation != null) {
                return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);//全局事务走这个分支
            } else if (globalLockAnnotation != null) {
                return handleGlobalLock(methodInvocation);
            }
        }
    }
    return methodInvocation.proceed();//走不到这儿
}
private Object handleGlobalTransaction(final MethodInvocation methodInvocation,
    final GlobalTransactional globalTrxAnno) throws Throwable {
    boolean succeed = true;
    try {
        return transactionalTemplate.execute(new TransactionalExecutor() {//execute方法会通知TC开始全局事务
            @Override
            public Object execute() throws Throwable {
                return methodInvocation.proceed();//调用被代理方法,即上面demo的create方法
            }
            public String name() {
                String name = globalTrxAnno.name();
                if (!StringUtils.isNullOrEmpty(name)) {
                    return name;
                }
                return formatMethod(methodInvocation.getMethod());
            }
            @Override
            public TransactionInfo getTransactionInfo() {
                TransactionInfo transactionInfo = new TransactionInfo();
                transactionInfo.setTimeOut(globalTrxAnno.timeoutMills());
                transactionInfo.setName(name());
                transactionInfo.setPropagation(globalTrxAnno.propagation());
                Set<RollbackRule> rollbackRules = new LinkedHashSet<>();
                for (Class<?> rbRule : globalTrxAnno.rollbackFor()) {
                    rollbackRules.add(new RollbackRule(rbRule));
                }
                for (String rbRule : globalTrxAnno.rollbackForClassName()) {
                    rollbackRules.add(new RollbackRule(rbRule));
                }
                for (Class<?> rbRule : globalTrxAnno.noRollbackFor()) {
                    rollbackRules.add(new NoRollbackRule(rbRule));
                }
                for (String rbRule : globalTrxAnno.noRollbackForClassName()) {
                    rollbackRules.add(new NoRollbackRule(rbRule));
                }
                transactionInfo.setRollbackRules(rollbackRules);//rollbackRules是空集合
                return transactionInfo;
            }
        });
    } catch (TransactionalExecutor.ExecutionException e) {
        //省略异常处理
        }
    } finally {
        if (degradeCheck) {
            EVENT_BUS.post(new DegradeCheckEvent(succeed));//Google guava提供的观察者模式,监听者在当前类的onDegradeCheck方法,自动服务降级和恢复,这里暂时不详细讨论
        }
    }
}

下面我们讲一下上面方法调用的execute方法,在TransactionalTemplate类:

public Object execute(TransactionalExecutor business) throws Throwable {
     // 1 get transactionInfo
     TransactionInfo txInfo = business.getTransactionInfo();//生成逻辑在上面方法
     //省略部分代码
     // 1.1 get or create a transaction
     GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();
     // 1.2 Handle the Transaction propatation and the branchType
     Propagation propagation = txInfo.getPropagation();
     SuspendedResourcesHolder suspendedResourcesHolder = null;
     try {
         //事务传播机制的代码省略
         try {
             // 2. begin transaction
             beginTransaction(txInfo, tx);
             Object rs = null;
             try {
                 // Do Your Business
                 rs = business.execute();//触发被代理方法,即demo中的create
             } catch (Throwable ex) {
                 // 3.the needed business exception to rollback.
                 completeTransactionAfterThrowing(txInfo, tx, ex);//回滚事务
                 throw ex;
             }
             // 4. everything is fine, commit.
             commitTransaction(tx);//提交事务
             return rs;
         } finally {
             //5. clear
             triggerAfterCompletion();
             cleanUp();
         }
     } finally {
         tx.resume(suspendedResourcesHolder);
     }
 }

上面的beginTransaction方法会调用TC开启全局事务然后获得xid:

private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
    tx.begin(txInfo.getTimeOut(), txInfo.getName());//省略其他代码
}

上面的begin方法调用的是DefaultGlobalTransaction的begin,代码如下:

public void begin(int timeout, String name) throws TransactionException {
    //这里代码省略,都是判断xid是否为空,为空则抛出异常
    xid = transactionManager.begin(null, null, name, timeout);
    status = GlobalStatus.Begin;
    RootContext.bind(xid);
}

接着调用DefaultTransactionManager的begin方法,代码如下:

public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
    throws TransactionException {
    GlobalBeginRequest request = new GlobalBeginRequest();
    request.setTransactionName(name);
    request.setTimeout(timeout);
    GlobalBeginResponse response = (GlobalBeginResponse) syncCall(request);//使用netty向TC发送开启全局事务的请求
    if (response.getResultCode() == ResultCode.Failed) {
        throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg());
    }
    return response.getXid();
}

可见,开启全局事务TM首先会向TC发送通知,获得xid后再进行本地事务rpc调用,这就跟TCC的官方流程图是一致的。


总结一下这个过程的UML类图如下:

微信图片_20221212154648.png

TC处理全局事务请求


那么在TC端是怎么处理的呢?我们还是从TC server初始化讲起:


netty server初始化之后,处理请求的类是ServerHandler,这个对象的初始化在AbstractNettyRemotingServer的构造函数,代码如下:

public AbstractNettyRemotingServer(ThreadPoolExecutor messageExecutor, NettyServerConfig nettyServerConfig) {
    super(messageExecutor);
    serverBootstrap = new NettyServerBootstrap(nettyServerConfig);
    serverBootstrap.setChannelHandlers(new ServerHandler());
}
AbstractNettyRemotingServer中的channelRead方法如下:
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
    if (!(msg instanceof RpcMessage)) {
        return;
    }
    processMessage(ctx, (RpcMessage) msg);
}

我们跟踪它处理的ServerOnRequestProcessor,在onRequestMessage方法中对DefaultCoordinator的调用,调用方法如下:

public AbstractResultMessage onRequest(AbstractMessage request, RpcContext context) {
    if (!(request instanceof AbstractTransactionRequestToTC)) {//这里的request就是GlobalBeginRequest
        throw new IllegalArgumentException();
    }
    AbstractTransactionRequestToTC transactionRequest = (AbstractTransactionRequestToTC) request;
    transactionRequest.setTCInboundHandler(this);
    return transactionRequest.handle(context);
}

我们跟踪上面的代码,最后调用了GlobalBeginRequest的handle方法,代码如下:

public AbstractTransactionResponse handle(RpcContext rpcContext) {
    return handler.handle(this, rpcContext);
}

上面的handler调用的是AbstractTCInboundHandler中的handle,代码如下:

public GlobalBeginResponse handle(GlobalBeginRequest request, final RpcContext rpcContext) {
    GlobalBeginResponse response = new GlobalBeginResponse();
    exceptionHandleTemplate(new AbstractCallback<GlobalBeginRequest, GlobalBeginResponse>() {
        @Override
        public void execute(GlobalBeginRequest request, GlobalBeginResponse response) throws TransactionException {
            try {
                doGlobalBegin(request, response, rpcContext);
            } catch (StoreException e) {
                throw new TransactionException(TransactionExceptionCode.FailedStore,
                    String.format("begin global request failed. xid=%s, msg=%s", response.getXid(), e.getMessage()),
                    e);
            }
        }
    }, request, response);
    return response;
}

最终调用了DefaultCoordinator中的doGlobalBegin方法,代码如下:

protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext)
    throws TransactionException {
    response.setXid(core.begin(rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(),
        request.getTransactionName(), request.getTimeout()));//这里产生了xid
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Begin new global transaction applicationId: {},transactionServiceGroup: {}, transactionName: {},timeout:{},xid:{}",
            rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(), request.getTransactionName(), request.getTimeout(), response.getXid());
    }
}

上面的core.begin调用了DefaultCore中的begin方法,代码如下:

public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
    throws TransactionException {
    GlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name,
        timeout);//继续跟踪createGlobalSession方法,会发现xid是通过雪花算法生成的自增id
    session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
    session.begin();
    // transaction start event
    eventBus.post(new GlobalTransactionEvent(session.getTransactionId(), GlobalTransactionEvent.ROLE_TC,
        session.getTransactionName(), session.getBeginTime(), null, session.getStatus()));
    return session.getXid();
}

总结一下上面的类调用关系,UML图如下:

微信图片_20221212154850.png

这张图上面的部分给出的是Server端从初始化到处理请求消息的相关类,而下面的部分则是消息的类型,本文我们讨论的消息类型是GlobalBeginRequest。


总结


seata开启全局事务的流程简单讲就是TM向TC发送一个申请,TC收到请求后,创建一个session,并用雪花算法生成一个xid返回给RM。


这篇文章的UML类图不仅仅是开启全局事务这个流程可以用,之后全局事务提交、回滚等流程也是可以用的。看懂了这几个图,对理解seata全局事务源码有很大的帮助。


源代码理解不正确的地方欢迎大佬们批评指正。

相关文章
|
7月前
|
开发者
seata事务问题之不回滚客户端如何解决
Seata是一款开源的分布式事务解决方案,旨在提供高效且无缝的分布式事务服务;在集成和使用Seata过程中,开发者可能会遇到不同的异常问题,本合集针对Seata常见异常进行系统整理,为开发者提供详细的问题分析和解决方案,助力高效解决分布式事务中的难题。
446 14
|
19天前
|
存储 NoSQL 架构师
阿里面试:聊聊 CAP 定理?哪些中间件是AP?为什么?
本文深入探讨了分布式系统中的“不可能三角”——CAP定理,即一致性(C)、可用性(A)和分区容错性(P)三者无法兼得。通过实例分析了不同场景下如何权衡CAP,并介绍了几种典型分布式中间件的CAP策略,强调了理解CAP定理对于架构设计的重要性。
50 4
|
2月前
|
消息中间件 Java 数据库
新版 Seata 集成 RocketMQ事务消息,越来越 牛X 了!阿里的 Seata , yyds !
这里 借助 Seata 集成 RocketMQ 事务消息的 新功能,介绍一下一个新遇到的面试题:如果如何实现 **强弱一致性 结合**的分布式事务?
新版 Seata 集成 RocketMQ事务消息,越来越 牛X 了!阿里的 Seata , yyds !
|
7月前
|
监控 数据库
在Seata中一张表使用了联合主键,在事务回滚时报异常,改为单个主键,就没有这个异常,如何解决?
在Seata中一张表使用了联合主键,在事务回滚时报异常,改为单个主键,就没有这个异常,如何解决?
|
6月前
|
消息中间件 存储 NoSQL
阿里开源中间件一览
阿里开源中间件一览
425 2
|
7月前
|
Java 数据库连接 API
Seata异常捕获问题之回滚事务如何解决
Seata是一款开源的分布式事务解决方案,旨在提供高效且无缝的分布式事务服务;在集成和使用Seata过程中,开发者可能会遇到不同的异常问题,本合集针对Seata常见异常进行系统整理,为开发者提供详细的问题分析和解决方案,助力高效解决分布式事务中的难题。
602 17
|
7月前
|
Dubbo 关系型数据库 MySQL
Seata常见问题之serviceA方法无法注册分支事务到Seata如何解决
Seata 是一个开源的分布式事务解决方案,旨在提供高效且简单的事务协调机制,以解决微服务架构下跨服务调用(分布式场景)的一致性问题。以下是Seata常见问题的一个合集
|
7月前
|
Kubernetes 网络协议 Java
Seata常见问题之全局事务处理中的本地会话过多 seata1.7报错如何解决
Seata 是一个开源的分布式事务解决方案,旨在提供高效且简单的事务协调机制,以解决微服务架构下跨服务调用(分布式场景)的一致性问题。以下是Seata常见问题的一个合集
231 0
|
7月前
|
SQL 监控 Java
Seata常见问题之报找不到全局事务可能已经完成如何解决
Seata 是一个开源的分布式事务解决方案,旨在提供高效且简单的事务协调机制,以解决微服务架构下跨服务调用(分布式场景)的一致性问题。以下是Seata常见问题的一个合集
1206 0
|
7月前
|
算法 NoSQL Java
2023年阿里高频Java面试题:分布式+中间件+高并发+算法+数据库
又到了一年一度的金九银十,互联网行业竞争是一年比一年严峻,作为工程师的我们唯有不停地学习,不断的提升自己才能保证自己的核心竞争力从而拿到更好的薪水,进入心仪的企业(阿里、字节、美团、腾讯.....)