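In a previous article, 《springcloud+eureka整合seata-tcc模式》 (integrating Seata TCC mode with Spring Cloud and Eureka), I wrote a demo that implements TCC mode with Seata. The demo uses Spring Cloud and Eureka to build three microservices: an order service, an account service, and a storage service. The order service orchestrates the calls to the other two.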
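Let's look again at the official TCC flow diagram. Every RM registers with the TC. When the business call starts, the TM begins a global transaction and each RM reports the status of its branch transaction. If every prepare succeeds, the TC notifies each service in turn to call the commit method of its branch transaction; otherwise the TC calls the rollback method of each branch transaction.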
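The two-phase decision described above can be sketched in a few lines of plain Java. This is only an illustration under stated assumptions: TccBranch, RecordingBranch, and TccFlowSketch are hypothetical names, not Seata classes, and the sketch rolls back only the branches whose prepare succeeded, which is simpler than what a real coordinator does.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class TccFlowSketch {
    // Phase one on every branch; then commit all, or roll back what was prepared.
    static boolean run(List<TccBranch> branches) {
        List<TccBranch> prepared = new ArrayList<>();
        for (TccBranch branch : branches) {
            if (!branch.prepare()) {
                // Undo in reverse order of preparation.
                for (int i = prepared.size() - 1; i >= 0; i--) {
                    prepared.get(i).rollback();
                }
                return false;
            }
            prepared.add(branch);
        }
        for (TccBranch branch : prepared) {
            branch.commit();
        }
        return true;
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        boolean committed = run(Arrays.asList(
            new RecordingBranch("order", true, log),
            new RecordingBranch("storage", true, log),
            new RecordingBranch("account", false, log)));
        System.out.println(committed + " " + log);
    }
}

// Hypothetical participant contract (illustrative, not Seata's API).
interface TccBranch {
    boolean prepare();  // phase one: try to reserve resources
    void commit();      // phase two: confirm the reservation
    void rollback();    // phase two: cancel the reservation
}

// Test double that records every call into a shared list.
class RecordingBranch implements TccBranch {
    private final String name;
    private final boolean prepareOk;
    private final List<String> log;

    RecordingBranch(String name, boolean prepareOk, List<String> log) {
        this.name = name;
        this.prepareOk = prepareOk;
        this.log = log;
    }

    public boolean prepare() { log.add("prepare:" + name); return prepareOk; }
    public void commit() { log.add("commit:" + name); }
    public void rollback() { log.add("rollback:" + name); }
}
```

In the sketch the coordinator logic lives in one method; in Seata the same decision is split between the TM, which asks for commit or rollback, and the TC, which drives the branches.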
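Here the order service acts as the TM and begins a global transaction. This article walks through how that global transaction is started.
Below is a Seata server log for a global transaction that committed successfully: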
2020-09-23 01:08:31.624 INFO --- [Thread_1_43_500] i.s.s.coordinator.DefaultCoordinator : Begin new global transaction applicationId: order-server,transactionServiceGroup: my_test_tx_group, transactionName: create(io.seata.sample.entity.Order),timeout:60000,xid:192.168.59.143:8091:52019904118321152
2020-09-23 01:08:31.679 INFO --- [Thread_1_44_500] i.seata.server.coordinator.AbstractCore : Register branch successfully, xid = 192.168.59.143:8091:52019904118321152, branchId = 52019904344813568, resourceId = orderApi ,lockKeys = null
2020-09-23 01:08:31.680 INFO --- [LoggerPrint_1_1] i.s.c.r.p.server.BatchLogHandler : SeataMergeMessage xid=192.168.59.143:8091:52019904118321152,branchType=TCC,resourceId=orderApi,lockKey=null ,clientIp:192.168.59.1,vgroup:my_test_tx_group
2020-09-23 01:08:32.483 INFO --- [LoggerPrint_1_1] i.s.c.r.p.server.BatchLogHandler : SeataMergeMessage xid=192.168.59.143:8091:52019904118321152,branchType=TCC,resourceId=storageApi,lockKey=null ,clientIp:192.168.59.1,vgroup:my_test_tx_group
2020-09-23 01:08:32.483 INFO --- [Thread_1_45_500] i.seata.server.coordinator.AbstractCore : Register branch successfully, xid = 192.168.59.143:8091:52019904118321152, branchId = 52019907721228288, resourceId = storageApi ,lockKeys = null
2020-09-23 01:08:32.559 INFO --- [LoggerPrint_1_1] i.s.c.r.p.server.BatchLogHandler : SeataMergeMessage xid=192.168.59.143:8091:52019904118321152,branchType=TCC,resourceId=accountApi,lockKey=null ,clientIp:192.168.59.1,vgroup:my_test_tx_group
2020-09-23 01:08:32.559 INFO --- [Thread_1_46_500] i.seata.server.coordinator.AbstractCore : Register branch successfully, xid = 192.168.59.143:8091:52019904118321152, branchId = 52019908039995392, resourceId = accountApi ,lockKeys = null
2020-09-23 01:08:32.579 INFO --- [LoggerPrint_1_1] i.s.c.r.p.server.BatchLogHandler : SeataMergeMessage xid=192.168.59.143:8091:52019904118321152,extraData=null ,clientIp:192.168.59.1,vgroup:my_test_tx_group
2020-09-23 01:08:32.627 INFO --- [Thread_1_47_500] io.seata.server.coordinator.DefaultCore : Committing global transaction is successfully done, xid = 192.168.59.143:8091:52019904118321152.
Below is a Seata server log for a global transaction that failed and was rolled back:
2020-09-23 01:30:55.806 INFO --- [Thread_1_10_500] i.s.s.coordinator.DefaultCoordinator : Begin new global transaction applicationId: order-server,transactionServiceGroup: my_test_tx_group, transactionName: create(io.seata.sample.entity.Order),timeout:60000,xid:192.168.59.143:8091:52025542022066176
2020-09-23 01:30:55.808 INFO --- [LoggerPrint_1_1] i.s.c.r.p.server.BatchLogHandler : SeataMergeMessage timeout=60000,transactionName=create(io.seata.sample.entity.Order) ,clientIp:192.168.59.1,vgroup:my_test_tx_group
2020-09-23 01:30:55.812 INFO --- [Thread_1_11_500] i.seata.server.coordinator.AbstractCore : Register branch successfully, xid = 192.168.59.143:8091:52025542022066176, branchId = 52025542051426304, resourceId = orderApi ,lockKeys = null
2020-09-23 01:30:55.817 INFO --- [LoggerPrint_1_1] i.s.c.r.p.server.BatchLogHandler : SeataMergeMessage xid=192.168.59.143:8091:52025542022066176,branchType=TCC,resourceId=orderApi,lockKey=null ,clientIp:192.168.59.1,vgroup:my_test_tx_group
2020-09-23 01:30:56.059 INFO --- [rThread_1_8_500] i.seata.server.coordinator.AbstractCore : Register branch successfully, xid = 192.168.59.143:8091:52025542022066176, branchId = 52025543087419392, resourceId = storageApi ,lockKeys = null
2020-09-23 01:30:56.059 INFO --- [LoggerPrint_1_1] i.s.c.r.p.server.BatchLogHandler : SeataMergeMessage xid=192.168.59.143:8091:52025542022066176,branchType=TCC,resourceId=storageApi,lockKey=null ,clientIp:192.168.59.1,vgroup:my_test_tx_group
2020-09-23 01:30:56.442 INFO --- [LoggerPrint_1_1] i.s.c.r.p.server.BatchLogHandler : SeataMergeMessage xid=192.168.59.143:8091:52025542022066176,branchType=TCC,resourceId=accountApi,lockKey=null ,clientIp:192.168.59.1,vgroup:my_test_tx_group
2020-09-23 01:30:56.442 INFO --- [Thread_1_12_500] i.seata.server.coordinator.AbstractCore : Register branch successfully, xid = 192.168.59.143:8091:52025542022066176, branchId = 52025544693837824, resourceId = accountApi ,lockKeys = null
2020-09-23 01:30:56.737 INFO --- [LoggerPrint_1_1] i.s.c.r.p.server.BatchLogHandler : SeataMergeMessage xid=192.168.59.143:8091:52025542022066176,extraData=null ,clientIp:192.168.59.1,vgroup:my_test_tx_group
2020-09-23 01:30:56.757 INFO --- [Thread_1_13_500] io.seata.server.coordinator.DefaultCore : Rollback branch transaction successfully, xid = 192.168.59.143:8091:52025542022066176 branchId = 52025544693837824
2020-09-23 01:30:56.771 INFO --- [Thread_1_13_500] io.seata.server.coordinator.DefaultCore : Rollback branch transaction successfully, xid = 192.168.59.143:8091:52025542022066176 branchId = 52025543087419392
2020-09-23 01:30:56.777 INFO --- [Thread_1_13_500] io.seata.server.coordinator.DefaultCore : Rollback branch transaction successfully, xid = 192.168.59.143:8091:52025542022066176 branchId = 52025542051426304
2020-09-23 01:30:56.778 INFO --- [Thread_1_13_500] io.seata.server.coordinator.DefaultCore : Rollback global transaction successfully, xid = 192.168.59.143:8091:52025542022066176.
TM sends the global transaction request
Before we start, let's look at the code in our demo:
@GlobalTransactional
public boolean create(Order order) {
    String xid = RootContext.getXID();
    LOGGER.info("------->transaction start");
    BusinessActionContext actionContext = new BusinessActionContext();
    actionContext.setXid(xid);
    boolean result = orderSaveImpl.saveOrder(actionContext, order);
    if (!result) {
        throw new RuntimeException("failed to save order");
    }
    // remote call: decrease storage
    LOGGER.info("------->decrease storage start (storage)");
    result = storageApi.decrease(actionContext, order.getProductId(), order.getCount());
    if (!result) {
        throw new RuntimeException("failed to decrease storage");
    }
    LOGGER.info("------->decrease storage end (storage)");
    // remote call: deduct account balance
    LOGGER.info("------->deduct account start (account)");
    result = accountApi.prepare(actionContext, order.getUserId(), order.getPayAmount());
    LOGGER.info("------->deduct account end (account)" + result);
    LOGGER.info("------->transaction end");
    // thrown on purpose so that phase two calls the rollback methods
    throw new RuntimeException("invoke the phase-two rollback method");
    //return true;
}
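As mentioned before, the annotation that begins a global transaction is @GlobalTransactional, and it is implemented with Spring AOP. The interceptor behind it is GlobalTransactionalInterceptor, so we use this class as the entry point. Since it intercepts the proxied call, let's look at its invoke method: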
public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
    // target class, here io.seata.sample.service.OrderServiceImpl
    Class<?> targetClass = methodInvocation.getThis() != null
        ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;
    // the create method
    Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
    if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {
        final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod); // the create method
        final GlobalTransactional globalTransactionalAnnotation =
            getAnnotation(method, targetClass, GlobalTransactional.class);
        final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);
        boolean localDisable = disable || (degradeCheck && degradeNum >= degradeCheckAllowTimes);
        if (!localDisable) {
            if (globalTransactionalAnnotation != null) {
                // a global transaction takes this branch
                return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
            } else if (globalLockAnnotation != null) {
                return handleGlobalLock(methodInvocation);
            }
        }
    }
    return methodInvocation.proceed(); // not reached in this demo
}

private Object handleGlobalTransaction(final MethodInvocation methodInvocation,
                                       final GlobalTransactional globalTrxAnno) throws Throwable {
    boolean succeed = true;
    try {
        // execute notifies the TC to begin the global transaction
        return transactionalTemplate.execute(new TransactionalExecutor() {
            @Override
            public Object execute() throws Throwable {
                // invoke the proxied method, i.e. create in the demo above
                return methodInvocation.proceed();
            }

            public String name() {
                String name = globalTrxAnno.name();
                if (!StringUtils.isNullOrEmpty(name)) {
                    return name;
                }
                return formatMethod(methodInvocation.getMethod());
            }

            @Override
            public TransactionInfo getTransactionInfo() {
                TransactionInfo transactionInfo = new TransactionInfo();
                transactionInfo.setTimeOut(globalTrxAnno.timeoutMills());
                transactionInfo.setName(name());
                transactionInfo.setPropagation(globalTrxAnno.propagation());
                Set<RollbackRule> rollbackRules = new LinkedHashSet<>();
                for (Class<?> rbRule : globalTrxAnno.rollbackFor()) {
                    rollbackRules.add(new RollbackRule(rbRule));
                }
                for (String rbRule : globalTrxAnno.rollbackForClassName()) {
                    rollbackRules.add(new RollbackRule(rbRule));
                }
                for (Class<?> rbRule : globalTrxAnno.noRollbackFor()) {
                    rollbackRules.add(new NoRollbackRule(rbRule));
                }
                for (String rbRule : globalTrxAnno.noRollbackForClassName()) {
                    rollbackRules.add(new NoRollbackRule(rbRule));
                }
                transactionInfo.setRollbackRules(rollbackRules); // rollbackRules is an empty set in this demo
                return transactionInfo;
            }
        });
    } catch (TransactionalExecutor.ExecutionException e) {
        // exception handling omitted
    } finally {
        if (degradeCheck) {
            // Observer pattern provided by Google Guava. The listener is this class's onDegradeCheck
            // method, which handles automatic service degradation and recovery (not discussed here).
            EVENT_BUS.post(new DegradeCheckEvent(succeed));
        }
    }
}
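To make the interception idea concrete without Spring, here is a minimal sketch built on the JDK's java.lang.reflect.Proxy. It is an assumption-laden stand-in, not how Seata wires things up: @DemoTransactional, DemoOrderService, DemoOrderServiceImpl, and InterceptorSketch are hypothetical names, and the "transaction" is only a list of recorded events.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

class InterceptorSketch {
    static final List<String> EVENTS = new ArrayList<>();

    // Wraps target so that annotated interface methods run inside begin/commit/rollback.
    @SuppressWarnings("unchecked")
    static <T> T wrap(Class<T> iface, T target) {
        return (T) Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[] {iface},
            (proxy, method, args) -> {
                if (!method.isAnnotationPresent(DemoTransactional.class)) {
                    return call(target, method, args); // no annotation: plain call
                }
                EVENTS.add("begin");
                try {
                    Object result = call(target, method, args);
                    EVENTS.add("commit");
                    return result;
                } catch (Throwable t) {
                    EVENTS.add("rollback");
                    throw t;
                }
            });
    }

    // Invokes the real method and unwraps the reflection wrapper exception.
    private static Object call(Object target, Method method, Object[] args) throws Throwable {
        try {
            return method.invoke(target, args);
        } catch (InvocationTargetException e) {
            throw e.getCause();
        }
    }

    public static void main(String[] args) {
        DemoOrderService service = wrap(DemoOrderService.class, new DemoOrderServiceImpl());
        System.out.println(service.create("42") + " " + EVENTS);
    }
}

// Hypothetical marker annotation standing in for @GlobalTransactional.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface DemoTransactional {}

interface DemoOrderService {
    @DemoTransactional
    String create(String orderId);

    String describe();
}

class DemoOrderServiceImpl implements DemoOrderService {
    public String create(String orderId) {
        if (orderId.isEmpty()) {
            throw new IllegalStateException("empty order id");
        }
        return "created:" + orderId;
    }

    public String describe() {
        return "plain";
    }
}
```

The shape matches the invoke method above: check for the annotation, and either run the call inside a transaction wrapper or fall through to the plain call.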
Next, let's look at the execute method called above, which lives in the TransactionalTemplate class:
public Object execute(TransactionalExecutor business) throws Throwable {
    // 1 get transactionInfo
    TransactionInfo txInfo = business.getTransactionInfo(); // built by getTransactionInfo in the code above
    // some code omitted
    // 1.1 get or create a transaction
    GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();
    // 1.2 Handle the Transaction propagation and the branchType
    Propagation propagation = txInfo.getPropagation();
    SuspendedResourcesHolder suspendedResourcesHolder = null;
    try {
        // transaction propagation handling omitted
        try {
            // 2. begin transaction
            beginTransaction(txInfo, tx);
            Object rs = null;
            try {
                // Do Your Business
                rs = business.execute(); // invokes the proxied method, i.e. create in the demo
            } catch (Throwable ex) {
                // 3.the needed business exception to rollback.
                completeTransactionAfterThrowing(txInfo, tx, ex); // roll back the transaction
                throw ex;
            }
            // 4. everything is fine, commit.
            commitTransaction(tx); // commit the transaction
            return rs;
        } finally {
            //5. clear
            triggerAfterCompletion();
            cleanUp();
        }
    } finally {
        tx.resume(suspendedResourcesHolder);
    }
}
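Stripped of propagation handling and hooks, the control flow of execute is a small template: begin, run the business code, then commit or roll back, and always clean up. The sketch below shows only that skeleton. TemplateSketch is a hypothetical class, the steps are recorded as strings instead of talking to a TC, and it handles only unchecked exceptions to stay short.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

class TemplateSketch {
    final List<String> steps = new ArrayList<>();

    // begin -> business -> commit, or rollback on failure; cleanup always runs.
    <T> T execute(Supplier<T> business) {
        try {
            steps.add("begin");
            T result;
            try {
                result = business.get();
            } catch (RuntimeException ex) {
                steps.add("rollback");
                throw ex; // the caller still sees the business exception
            }
            steps.add("commit");
            return result;
        } finally {
            steps.add("cleanup");
        }
    }

    public static void main(String[] args) {
        TemplateSketch template = new TemplateSketch();
        String result = template.execute(() -> "order saved");
        System.out.println(result + " " + template.steps);
    }
}
```

Note that the business exception is rethrown after the rollback, just as in the real template, so the demo's deliberate RuntimeException still reaches the caller.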
The beginTransaction method above calls the TC to begin the global transaction and obtains the xid:
private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx)
        throws TransactionalExecutor.ExecutionException {
    tx.begin(txInfo.getTimeOut(), txInfo.getName()); // other code omitted
}
The begin method called above is the one in DefaultGlobalTransaction:
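public void begin(int timeout, String name) throws TransactionException {
    // Checks omitted: they validate the transaction role and the xid state before starting
    // (a participant needs an existing xid; the launcher must not have one yet).
    xid = transactionManager.begin(null, null, name, timeout);
    status = GlobalStatus.Begin;
    RootContext.bind(xid);
}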
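The last line, RootContext.bind(xid), is what lets later code on the same thread (such as RootContext.getXID() in the demo's create method) see the xid. A minimal sketch of that idea, assuming a plain ThreadLocal holder, is shown below. XidContextSketch is a hypothetical class, not Seata's RootContext, whose real storage may differ.

```java
class XidContextSketch {
    private static final ThreadLocal<String> XID = new ThreadLocal<>();

    // Makes the xid visible to later calls on the current thread.
    static void bind(String xid) {
        XID.set(xid);
    }

    static String getXid() {
        return XID.get();
    }

    // Clears the binding and returns the xid that was bound, or null.
    static String unbind() {
        String previous = XID.get();
        XID.remove();
        return previous;
    }

    // Reads the xid from a fresh thread to show that the binding is per thread.
    static String readFromOtherThread() {
        String[] seen = {"unset"};
        Thread reader = new Thread(() -> seen[0] = getXid());
        reader.start();
        try {
            reader.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen[0];
    }

    public static void main(String[] args) {
        bind("192.168.59.143:8091:52019904118321152");
        System.out.println("this thread:  " + getXid());
        System.out.println("other thread: " + readFromOtherThread());
        unbind();
    }
}
```

Because the binding is per thread, the xid has to be carried explicitly across remote calls, which is why the demo passes it inside BusinessActionContext.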
This in turn calls the begin method of DefaultTransactionManager:
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
        throws TransactionException {
    GlobalBeginRequest request = new GlobalBeginRequest();
    request.setTransactionName(name);
    request.setTimeout(timeout);
    // send the begin-global-transaction request to the TC over netty
    GlobalBeginResponse response = (GlobalBeginResponse) syncCall(request);
    if (response.getResultCode() == ResultCode.Failed) {
        throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg());
    }
    return response.getXid();
}
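The article does not show what syncCall does internally. One common way to get a blocking call on top of an asynchronous transport such as netty is to tag each request with a message id, park a future under that id, and complete it when the response arrives. The sketch below illustrates that general technique only; SyncCallSketch, its transport callback, and withFakeServer are hypothetical and are not taken from Seata's source.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;

class SyncCallSketch {
    private final AtomicInteger nextId = new AtomicInteger();
    private final Map<Integer, CompletableFuture<String>> pending = new ConcurrentHashMap<>();
    private final BiConsumer<Integer, String> transport; // sends (messageId, payload) asynchronously

    SyncCallSketch(BiConsumer<Integer, String> transport) {
        this.transport = transport;
    }

    // Sends the request and blocks until the matching response arrives or the timeout expires.
    String syncCall(String request, long timeoutMillis) {
        int id = nextId.incrementAndGet();
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(id, future); // register before sending so a fast reply is not lost
        try {
            transport.accept(id, request);
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            throw new IllegalStateException("no response within " + timeoutMillis + " ms", e);
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pending.remove(id);
        }
    }

    // Called by the I/O side when a response with the given message id arrives.
    void onResponse(int id, String response) {
        CompletableFuture<String> future = pending.get(id);
        if (future != null) {
            future.complete(response);
        }
    }

    // Client wired to a fake server that answers every request from another thread.
    static SyncCallSketch withFakeServer() {
        SyncCallSketch[] self = new SyncCallSketch[1];
        self[0] = new SyncCallSketch((id, request) ->
            new Thread(() -> self[0].onResponse(id, "xid-for-" + request)).start());
        return self[0];
    }

    public static void main(String[] args) {
        System.out.println(withFakeServer().syncCall("begin", 1000));
    }
}
```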
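As we can see, to begin a global transaction the TM first sends a request to the TC, and only after it obtains the xid does it run the local transaction and the RPC calls. This matches the official TCC flow diagram.
The UML class diagram below summarizes this process: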
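TC handles the global transaction request
So how does the TC side handle the request? Let's start from the initialization of the TC server:
After the netty server is initialized, requests are handled by ServerHandler. This object is created in the constructor of AbstractNettyRemotingServer: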
public AbstractNettyRemotingServer(ThreadPoolExecutor messageExecutor, NettyServerConfig nettyServerConfig) {
    super(messageExecutor);
    serverBootstrap = new NettyServerBootstrap(nettyServerConfig);
    serverBootstrap.setChannelHandlers(new ServerHandler());
}

The channelRead method in AbstractNettyRemotingServer is as follows:

public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
    if (!(msg instanceof RpcMessage)) {
        return;
    }
    processMessage(ctx, (RpcMessage) msg);
}
Following processMessage, we reach ServerOnRequestProcessor, whose onRequestMessage method calls into DefaultCoordinator. The method it calls is:
public AbstractResultMessage onRequest(AbstractMessage request, RpcContext context) {
    // here the request is a GlobalBeginRequest
    if (!(request instanceof AbstractTransactionRequestToTC)) {
        throw new IllegalArgumentException();
    }
    AbstractTransactionRequestToTC transactionRequest = (AbstractTransactionRequestToTC) request;
    transactionRequest.setTCInboundHandler(this);
    return transactionRequest.handle(context);
}
Tracing the code above, we end up in the handle method of GlobalBeginRequest:
public AbstractTransactionResponse handle(RpcContext rpcContext) {
    return handler.handle(this, rpcContext);
}
The handler above resolves to the handle method in AbstractTCInboundHandler:
public GlobalBeginResponse handle(GlobalBeginRequest request, final RpcContext rpcContext) {
    GlobalBeginResponse response = new GlobalBeginResponse();
    exceptionHandleTemplate(new AbstractCallback<GlobalBeginRequest, GlobalBeginResponse>() {
        @Override
        public void execute(GlobalBeginRequest request, GlobalBeginResponse response)
                throws TransactionException {
            try {
                doGlobalBegin(request, response, rpcContext);
            } catch (StoreException e) {
                throw new TransactionException(TransactionExceptionCode.FailedStore,
                    String.format("begin global request failed. xid=%s, msg=%s",
                        response.getXid(), e.getMessage()), e);
            }
        }
    }, request, response);
    return response;
}
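The hop from onRequest to request.handle(context) and back to handler.handle(this, rpcContext) is a double dispatch: the request object picks the handler overload that matches its own type, so the coordinator needs no instanceof chain per message type. A minimal sketch of the pattern follows. All names in it (DispatchSketch, DemoRequest, DemoBeginRequest, DemoCommitRequest, DemoInboundHandler, DemoCoordinator) are hypothetical stand-ins.

```java
class DispatchSketch {
    public static void main(String[] args) {
        DemoCoordinator coordinator = new DemoCoordinator();
        System.out.println(coordinator.onRequest(new DemoBeginRequest()));
        System.out.println(coordinator.onRequest(new DemoCommitRequest()));
    }
}

// One overload per message type, like the handle overloads on the TC side.
interface DemoInboundHandler {
    String handle(DemoBeginRequest request);

    String handle(DemoCommitRequest request);
}

abstract class DemoRequest {
    // Each subclass calls the overload that matches its own static type.
    abstract String dispatch(DemoInboundHandler handler);
}

class DemoBeginRequest extends DemoRequest {
    String dispatch(DemoInboundHandler handler) {
        return handler.handle(this); // resolves to handle(DemoBeginRequest)
    }
}

class DemoCommitRequest extends DemoRequest {
    String dispatch(DemoInboundHandler handler) {
        return handler.handle(this); // resolves to handle(DemoCommitRequest)
    }
}

class DemoCoordinator implements DemoInboundHandler {
    // Entry point: hands itself to the request, which dispatches back.
    String onRequest(DemoRequest request) {
        return request.dispatch(this);
    }

    public String handle(DemoBeginRequest request) {
        return "begin handled";
    }

    public String handle(DemoCommitRequest request) {
        return "commit handled";
    }
}
```

Adding a new message type then means adding one request class and one overload, while the onRequest entry point stays unchanged.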
This finally calls the doGlobalBegin method in DefaultCoordinator:
protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext)
        throws TransactionException {
    // the xid is produced here
    response.setXid(core.begin(rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(),
        request.getTransactionName(), request.getTimeout()));
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Begin new global transaction applicationId: {},transactionServiceGroup: {}, transactionName: {},timeout:{},xid:{}",
            rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(),
            request.getTransactionName(), request.getTimeout(), response.getXid());
    }
}
The core.begin call above goes to the begin method in DefaultCore:
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
        throws TransactionException {
    // Following createGlobalSession further shows that the id inside the xid
    // is generated with a snowflake algorithm.
    GlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name,
        timeout);
    session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
    session.begin();
    // transaction start event
    eventBus.post(new GlobalTransactionEvent(session.getTransactionId(), GlobalTransactionEvent.ROLE_TC,
        session.getTransactionName(), session.getBeginTime(), null, session.getStatus()));
    return session.getXid();
}
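Judging from the logs earlier in this article, an xid such as 192.168.59.143:8091:52019904118321152 looks like the TC's ip, its port, and a numeric transaction id joined by colons. The sketch below composes and parses that format and pairs it with a simplified snowflake-style generator. The bit layout (22 bits below the timestamp, 10 for a worker id, 12 for a sequence) is the classic snowflake layout and is an assumption here, not Seata's actual generator; XidSketch and its methods are hypothetical names.

```java
class XidSketch {
    private static final long SEQUENCE_MASK = 0xFFF; // 12 bits of per-millisecond sequence
    private final long workerId;                     // 10 bits, 0..1023
    private long lastMillis = -1;
    private long sequence = 0;

    XidSketch(long workerId) {
        this.workerId = workerId & 0x3FF;
    }

    // Returns an id that is strictly larger than every id returned before by this instance.
    // millis is the time in milliseconds since some chosen epoch.
    synchronized long nextId(long millis) {
        long timestamp = Math.max(millis, lastMillis); // never move backwards
        if (timestamp == lastMillis) {
            sequence = (sequence + 1) & SEQUENCE_MASK;
            if (sequence == 0) {
                timestamp++; // sequence exhausted: borrow the next millisecond
            }
        } else {
            sequence = 0;
        }
        lastMillis = timestamp;
        return (timestamp << 22) | (workerId << 12) | sequence;
    }

    // Joins address and transaction id in the ip:port:id shape seen in the logs.
    static String toXid(String ip, int port, long transactionId) {
        return ip + ":" + port + ":" + transactionId;
    }

    // Extracts the numeric transaction id, i.e. the part after the last colon.
    static long transactionIdOf(String xid) {
        return Long.parseLong(xid.substring(xid.lastIndexOf(':') + 1));
    }

    public static void main(String[] args) {
        XidSketch generator = new XidSketch(1);
        long epoch = 1577836800000L; // 2020-01-01T00:00:00Z, an arbitrary choice
        long id = generator.nextId(System.currentTimeMillis() - epoch);
        System.out.println(toXid("192.168.59.143", 8091, id));
    }
}
```

Since the address of the TC is part of the xid, a participant can tell from the xid alone which server owns the transaction.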
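The UML diagram below summarizes the class call relationships described above:
The upper part of the diagram shows the classes involved on the server side, from initialization through request handling. The lower part shows the message types. The message type discussed in this article is GlobalBeginRequest.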
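Summary
In short, Seata begins a global transaction as follows: the TM sends a request to the TC; on receiving it, the TC creates a session, generates an xid using a snowflake algorithm, and returns the xid to the TM.
The UML class diagrams in this article apply not only to beginning a global transaction but also to the later commit and rollback flows. Once you understand these diagrams, the rest of Seata's global transaction source code becomes much easier to follow.
If I have misread any of the source code, corrections are very welcome.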