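In the previous article we used the TCC-mode demo to explain how Seata begins a global transaction. In that demo the TM, which manages the global transaction, calls the order service, the account service, and the storage service in turn. If any of them throws an exception, the TM's call fails with that exception and the TM notifies the TC, which then tells each RM to roll back its branch. If the TM sees no exception, it asks the TC to commit, and the TC tells each RM to commit its branch.
The sequence diagram below shows this process:
How does an RM implement this two-phase commit? Let's start with the two-phase interface of the account service, which is the interface the account service's Controller calls: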
@LocalTCC
public interface AccountService {

    /**
     * Deduct from the account balance (phase one, prepare).
     *
     * @param xid    the global transaction id
     * @param userId the user id
     * @param money  the amount
     * @return whether prepare succeeded
     */
    @TwoPhaseBusinessAction(name = "accountApi", commitMethod = "commit", rollbackMethod = "rollback")
    boolean decrease(String xid, Long userId, BigDecimal money);

    /**
     * Phase two, commit.
     *
     * @param actionContext the business action context
     * @return whether commit succeeded
     */
    boolean commit(BusinessActionContext actionContext);

    /**
     * Phase two, rollback.
     *
     * @param actionContext the business action context
     * @return whether rollback succeeded
     */
    boolean rollback(BusinessActionContext actionContext);
}
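Three annotations are involved here. @FeignClient, which does not appear in the listing above, is the familiar one: with Eureka as the registry it is used for calls between services, and it exposes the remote interface through which the TM calls this service. @LocalTCC is placed on the type, and @TwoPhaseBusinessAction is placed on the method.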
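The earlier article "Alibaba middleware Seata source code analysis (1): RM and TM client initialization" showed that the RM registers with the TC during initialization. That registration is triggered by the @LocalTCC annotation, and the resourceId carried in the registration request is the value of the name attribute of @TwoPhaseBusinessAction.
Next, here is the Seata server log of a global transaction that committed successfully: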
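To make the roles of the three interface methods concrete, here is a minimal plain-Java sketch of what a TCC participant typically does. It uses no Seata classes. The class name AccountTccSketch and the "freeze the money in phase one" strategy are assumptions for illustration, not the demo's actual implementation.

```java
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of a TCC participant (hypothetical, no Seata involved).
class AccountTccSketch {
    private BigDecimal balance;
    // Money reserved in phase one, keyed by global transaction id (xid).
    private final Map<String, BigDecimal> frozen = new HashMap<>();

    AccountTccSketch(BigDecimal initialBalance) {
        this.balance = initialBalance;
    }

    // Phase one (try): reserve the money instead of spending it for good.
    boolean decrease(String xid, BigDecimal money) {
        if (balance.compareTo(money) < 0) {
            return false; // not enough funds, prepare fails
        }
        balance = balance.subtract(money);
        frozen.put(xid, money);
        return true;
    }

    // Phase two (confirm): the reservation becomes final.
    boolean commit(String xid) {
        frozen.remove(xid); // a repeated commit is a harmless no-op
        return true;
    }

    // Phase two (cancel): give the reserved money back.
    boolean rollback(String xid) {
        BigDecimal money = frozen.remove(xid);
        if (money != null) {
            balance = balance.add(money);
        }
        return true; // a repeated rollback finds nothing and changes nothing
    }

    BigDecimal balance() {
        return balance;
    }

    public static void main(String[] args) {
        AccountTccSketch account = new AccountTccSketch(new BigDecimal("100"));
        account.decrease("xid-1", new BigDecimal("30"));
        account.rollback("xid-1");
        System.out.println(account.balance()); // 100
        account.decrease("xid-2", new BigDecimal("30"));
        account.commit("xid-2");
        System.out.println(account.balance()); // 70
    }
}
```

Both phase-two methods are written to be idempotent because the coordinator may retry them.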
2020-09-23 01:08:31.624 INFO --- [Thread_1_43_500] i.s.s.coordinator.DefaultCoordinator : Begin new global transaction applicationId: order-server,transactionServiceGroup: my_test_tx_group, transactionName: create(io.seata.sample.entity.Order),timeout:60000,xid:192.168.59.143:8091:52019904118321152
2020-09-23 01:08:31.679 INFO --- [Thread_1_44_500] i.seata.server.coordinator.AbstractCore : Register branch successfully, xid = 192.168.59.143:8091:52019904118321152, branchId = 52019904344813568, resourceId = orderApi ,lockKeys = null
2020-09-23 01:08:31.680 INFO --- [LoggerPrint_1_1] i.s.c.r.p.server.BatchLogHandler : SeataMergeMessage xid=192.168.59.143:8091:52019904118321152,branchType=TCC,resourceId=orderApi,lockKey=null ,clientIp:192.168.59.1,vgroup:my_test_tx_group
2020-09-23 01:08:32.483 INFO --- [LoggerPrint_1_1] i.s.c.r.p.server.BatchLogHandler : SeataMergeMessage xid=192.168.59.143:8091:52019904118321152,branchType=TCC,resourceId=storageApi,lockKey=null ,clientIp:192.168.59.1,vgroup:my_test_tx_group
2020-09-23 01:08:32.483 INFO --- [Thread_1_45_500] i.seata.server.coordinator.AbstractCore : Register branch successfully, xid = 192.168.59.143:8091:52019904118321152, branchId = 52019907721228288, resourceId = storageApi ,lockKeys = null
2020-09-23 01:08:32.559 INFO --- [LoggerPrint_1_1] i.s.c.r.p.server.BatchLogHandler : SeataMergeMessage xid=192.168.59.143:8091:52019904118321152,branchType=TCC,resourceId=accountApi,lockKey=null ,clientIp:192.168.59.1,vgroup:my_test_tx_group
2020-09-23 01:08:32.559 INFO --- [Thread_1_46_500] i.seata.server.coordinator.AbstractCore : Register branch successfully, xid = 192.168.59.143:8091:52019904118321152, branchId = 52019908039995392, resourceId = accountApi ,lockKeys = null
2020-09-23 01:08:32.579 INFO --- [LoggerPrint_1_1] i.s.c.r.p.server.BatchLogHandler : SeataMergeMessage xid=192.168.59.143:8091:52019904118321152,extraData=null ,clientIp:192.168.59.1,vgroup:my_test_tx_group
2020-09-23 01:08:32.627 INFO --- [Thread_1_47_500] io.seata.server.coordinator.DefaultCore : Committing global transaction is successfully done, xid = 192.168.59.143:8091:52019904118321152.
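The TM notifies the TC of the transaction status
The previous article, on beginning a global transaction, showed that the call that starts it lives in the execute method of TransactionalTemplate: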
public Object execute(TransactionalExecutor business) throws Throwable {
    // 1. get transactionInfo
    TransactionInfo txInfo = business.getTransactionInfo();
    if (txInfo == null) {
        throw new ShouldNeverHappenException("transactionInfo does not exist");
    }
    // 1.1 get or create a transaction
    GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();
    // 1.2 handle the transaction propagation and the branchType
    try { // outer try; the propagation handling inside it is omitted here
        try {
            // 2. begin transaction
            beginTransaction(txInfo, tx);
            Object rs = null;
            try {
                // do your business
                rs = business.execute();
            } catch (Throwable ex) {
                // 3. the business exception that requires rollback:
                //    roll back the global transaction
                completeTransactionAfterThrowing(txInfo, tx, ex);
                throw ex;
            }
            // 4. everything is fine, commit the global transaction
            commitTransaction(tx);
            return rs;
        } finally {
            // 5. clear
            triggerAfterCompletion(); // reserved hook, does nothing here
            cleanUp();
        }
    } finally {
        tx.resume(suspendedResourcesHolder);
    }
}
When execute catches an exception, it rolls the global transaction back by calling completeTransactionAfterThrowing:
private void completeTransactionAfterThrowing(TransactionInfo txInfo, GlobalTransaction tx,
                                              Throwable originalException)
        throws TransactionalExecutor.ExecutionException {
    // roll back
    if (txInfo != null && txInfo.rollbackOn(originalException)) {
        try {
            rollbackTransaction(tx, originalException);
        } catch (TransactionException txe) {
            // failed to roll back
            throw new TransactionalExecutor.ExecutionException(tx, txe,
                    TransactionalExecutor.Code.RollbackFailure, originalException);
        }
    } else {
        // this exception does not trigger a rollback, so commit
        commitTransaction(tx);
    }
}

private void rollbackTransaction(GlobalTransaction tx, Throwable originalException)
        throws TransactionException, TransactionalExecutor.ExecutionException {
    triggerBeforeRollback();
    tx.rollback(); // roll back the global transaction
    triggerAfterRollback();
    // 3.1 successfully rolled back
    throw new TransactionalExecutor.ExecutionException(tx,
            GlobalStatus.RollbackRetrying.equals(tx.getLocalStatus())
                    ? TransactionalExecutor.Code.RollbackRetrying
                    : TransactionalExecutor.Code.RollbackDone,
            originalException);
}
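The control flow of execute and completeTransactionAfterThrowing can be condensed into a toy model. The sketch below is not Seata code: TmTemplateSketch, the reported list, and the rollbackOn predicate are hypothetical stand-ins that only record what the TM would report to the TC.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Toy model of the TM-side template; all names are illustrative.
class TmTemplateSketch {
    // Records the notifications the TM would send to the TC.
    static final List<String> reported = new ArrayList<>();

    static <T> T execute(String xid, Supplier<T> business,
                         Predicate<RuntimeException> rollbackOn) {
        reported.add("begin " + xid);
        T result;
        try {
            result = business.get();
        } catch (RuntimeException ex) {
            if (rollbackOn.test(ex)) {
                reported.add("rollback " + xid); // exception is a rollback trigger
            } else {
                reported.add("commit " + xid);   // exception is tolerated, so commit
            }
            throw ex; // the caller still sees the original failure
        }
        reported.add("commit " + xid); // business succeeded
        return result;
    }

    public static void main(String[] args) {
        try {
            execute("xid-1",
                    () -> { throw new IllegalStateException("stock is empty"); },
                    ex -> true);
        } catch (IllegalStateException expected) {
            // the original exception propagates after the rollback is reported
        }
        System.out.println(reported); // [begin xid-1, rollback xid-1]
    }
}
```

Note that the business exception is rethrown in both branches, which matches the `throw ex` in execute.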
The tx.rollback() call resolves to the rollback method of DefaultGlobalTransaction:
public void rollback() throws TransactionException {
    // part of the code omitted
    // the retry count is read from the configuration file; the default is 5
    int retry = ROLLBACK_RETRY_COUNT <= 0 ? DEFAULT_TM_ROLLBACK_RETRY_COUNT : ROLLBACK_RETRY_COUNT;
    try {
        while (retry > 0) {
            try {
                status = transactionManager.rollback(xid); // the rollback itself
                break;
            } catch (Throwable ex) {
                LOGGER.error("Failed to report global rollback [{}],Retry Countdown: {}, reason: {}",
                        this.getXid(), retry, ex.getMessage());
                retry--;
                if (retry == 0) {
                    throw new TransactionException("Failed to report global rollback", ex);
                }
            }
        }
    } finally {
        // part of the source omitted
    }
}
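The retry countdown used when reporting to the TC can be isolated into a small helper. This is a sketch under assumptions: ReportRetrySketch and reportWithRetry are hypothetical names, and the default of 5 simply mirrors the default mentioned in the comment above.

```java
import java.util.function.Supplier;

// Isolated version of the "retry until the budget is used up" loop.
class ReportRetrySketch {
    static final int DEFAULT_RETRY_COUNT = 5; // assumed default, as noted above

    // Calls report until it succeeds or the retry budget is exhausted.
    static <T> T reportWithRetry(int configuredRetries, Supplier<T> report) {
        // A non-positive configured value falls back to the default.
        int retry = configuredRetries <= 0 ? DEFAULT_RETRY_COUNT : configuredRetries;
        while (true) {
            try {
                return report.get();
            } catch (RuntimeException ex) {
                retry--;
                if (retry == 0) {
                    throw new IllegalStateException("Failed to report after retries", ex);
                }
            }
        }
    }

    public static void main(String[] args) {
        int[] attempts = {0};
        String status = reportWithRetry(3, () -> {
            if (++attempts[0] < 3) {
                throw new IllegalStateException("TC unreachable");
            }
            return "Rollbacked";
        });
        System.out.println(status + " after " + attempts[0] + " attempts");
    }
}
```

With a budget of 3 the third attempt is the last one allowed, so the example succeeds just in time.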
That rollback method calls the rollback method of DefaultTransactionManager, which ultimately uses Netty to tell the TC to roll back:
public GlobalStatus rollback(String xid) throws TransactionException {
    GlobalRollbackRequest globalRollback = new GlobalRollbackRequest();
    globalRollback.setXid(xid);
    // as with beginning a transaction, this goes through Netty to tell the TC to roll back globally
    GlobalRollbackResponse response = (GlobalRollbackResponse) syncCall(globalRollback);
    return response.getGlobalStatus();
}
Reporting a global commit to the TC follows exactly the same path, starting from the commitTransaction call in the execute method:
private void commitTransaction(GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
    try {
        triggerBeforeCommit();
        tx.commit();
        triggerAfterCommit();
    } catch (TransactionException txe) {
        // 4.1 Failed to commit
        throw new TransactionalExecutor.ExecutionException(tx, txe,
                TransactionalExecutor.Code.CommitFailure);
    }
}
The tx.commit() call resolves to the commit method of DefaultGlobalTransaction:
public void commit() throws TransactionException {
    // part of the code omitted
    int retry = COMMIT_RETRY_COUNT <= 0 ? DEFAULT_TM_COMMIT_RETRY_COUNT : COMMIT_RETRY_COUNT;
    try {
        while (retry > 0) {
            try {
                status = transactionManager.commit(xid);
                break;
            } catch (Throwable ex) {
                LOGGER.error("Failed to report global commit [{}],Retry Countdown: {}, reason: {}",
                        this.getXid(), retry, ex.getMessage());
                retry--;
                if (retry == 0) {
                    throw new TransactionException("Failed to report global commit", ex);
                }
            }
        }
    } finally {
        // part of the code omitted
    }
}
The commit method of DefaultTransactionManager ultimately uses Netty to tell the TC to commit:
public GlobalStatus commit(String xid) throws TransactionException {
    GlobalCommitRequest globalCommit = new GlobalCommitRequest();
    globalCommit.setXid(xid);
    GlobalCommitResponse response = (GlobalCommitResponse) syncCall(globalCommit);
    return response.getGlobalStatus();
}
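The class relationships in this call chain can be shown with the same UML class diagram used earlier for beginning a global transaction:
The TC notifies the RM to commit the branch transaction
In the commit and rollback code above, the TM sends two kinds of message to the TC: GlobalCommitRequest and GlobalRollbackRequest. Let's see how the TC handles them.
Here is the second UML class diagram from the previous article again. Alongside GlobalBeginRequest, two more classes share the same inheritance hierarchy: GlobalCommitRequest and GlobalRollbackRequest. These are the messages through which the TC handles global commit and global rollback.
We start with global commit, in the handle method of GlobalCommitRequest: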
public AbstractTransactionResponse handle(RpcContext rpcContext) {
    // this calls the handle method of AbstractTCInboundHandler, shown next
    return handler.handle(this, rpcContext);
}

public GlobalCommitResponse handle(GlobalCommitRequest request, final RpcContext rpcContext) {
    GlobalCommitResponse response = new GlobalCommitResponse();
    response.setGlobalStatus(GlobalStatus.Committing);
    exceptionHandleTemplate(new AbstractCallback<GlobalCommitRequest, GlobalCommitResponse>() {
        @Override
        public void execute(GlobalCommitRequest request, GlobalCommitResponse response)
                throws TransactionException {
            try {
                // this calls the doGlobalCommit method of DefaultCoordinator
                doGlobalCommit(request, response, rpcContext);
            } catch (StoreException e) {
                throw new TransactionException(TransactionExceptionCode.FailedStore,
                        String.format("global commit request failed. xid=%s, msg=%s",
                                request.getXid(), e.getMessage()), e);
            }
        }
        // part of the code omitted
    }, request, response);
    return response;
}
The doGlobalCommit method of DefaultCoordinator is:
protected void doGlobalCommit(GlobalCommitRequest request, GlobalCommitResponse response,
                              RpcContext rpcContext) throws TransactionException {
    // core.commit is the commit method of DefaultCore
    response.setGlobalStatus(core.commit(request.getXid()));
}
The commit method of DefaultCore looks like this:
public GlobalStatus commit(String xid) throws TransactionException {
    GlobalSession globalSession = SessionHolder.findGlobalSession(xid);
    if (globalSession == null) {
        return GlobalStatus.Finished;
    }
    globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
    // just lock changeStatus
    boolean shouldCommit = SessionHolder.lockAndExecute(globalSession, () -> {
        // the lock should release after branch commit
        // Highlight: Firstly, close the session, then no more branch can be registered.
        globalSession.closeAndClean();
        if (globalSession.getStatus() == GlobalStatus.Begin) {
            globalSession.changeStatus(GlobalStatus.Committing);
            return true;
        }
        return false;
    });
    if (!shouldCommit) {
        return globalSession.getStatus();
    }
    if (globalSession.canBeCommittedAsync()) {
        globalSession.asyncCommit();
        return GlobalStatus.Committed;
    } else {
        doGlobalCommit(globalSession, false);
    }
    return globalSession.getStatus();
}
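The lockAndExecute block is a guarded state transition: only a session that is still in Begin may move to Committing, and the session is closed first so that no further branch can register. A minimal sketch of that idea follows. CommitGuardSketch and its members are hypothetical names, and a plain ReentrantLock stands in for whatever locking the session actually uses.

```java
import java.util.concurrent.locks.ReentrantLock;

// Guarded Begin -> Committing transition (illustrative, not Seata's GlobalSession).
class CommitGuardSketch {
    enum Status { BEGIN, COMMITTING, COMMITTED, ROLLBACKING }

    private final ReentrantLock lock = new ReentrantLock();
    private Status status = Status.BEGIN;
    private boolean open = true; // while open, new branches may register

    // Returns true only for the caller that moves BEGIN to COMMITTING.
    boolean tryStartCommit() {
        lock.lock();
        try {
            open = false; // close first, so no further branch can join
            if (status == Status.BEGIN) {
                status = Status.COMMITTING;
                return true;
            }
            return false; // someone else already started commit or rollback
        } finally {
            lock.unlock();
        }
    }

    Status status() {
        return status;
    }

    boolean isOpen() {
        return open;
    }

    public static void main(String[] args) {
        CommitGuardSketch session = new CommitGuardSketch();
        System.out.println(session.tryStartCommit()); // true
        System.out.println(session.tryStartCommit()); // false
        System.out.println(session.status());         // COMMITTING
    }
}
```

A second commit request for the same session therefore returns the current status instead of driving the branches twice.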
The doGlobalCommit method that commit calls is:
public boolean doGlobalCommit(GlobalSession globalSession, boolean retrying) throws TransactionException {
    boolean success = true;
    // start committing event
    eventBus.post(new GlobalTransactionEvent(globalSession.getTransactionId(),
            GlobalTransactionEvent.ROLE_TC, globalSession.getTransactionName(),
            globalSession.getBeginTime(), null, globalSession.getStatus()));
    if (globalSession.isSaga()) {
        success = getCore(BranchType.SAGA).doGlobalCommit(globalSession, retrying);
    } else {
        // take every branch transaction and commit it
        for (BranchSession branchSession : globalSession.getSortedBranches()) {
            BranchStatus currentStatus = branchSession.getStatus();
            if (currentStatus == BranchStatus.PhaseOne_Failed) {
                globalSession.removeBranch(branchSession);
                continue;
            }
            try {
                // this is where the branch transaction is committed
                BranchStatus branchStatus = getCore(branchSession.getBranchType())
                        .branchCommit(globalSession, branchSession);
                // part of the code omitted
            } catch (Exception ex) {
                StackTraceLogger.error(LOGGER, ex, "Committing branch transaction exception: {}",
                        new String[] {branchSession.toString()});
                if (!retrying) {
                    globalSession.queueToRetryCommit();
                    throw new TransactionException(ex);
                }
            }
        }
        if (globalSession.hasBranch()) {
            LOGGER.info("Committing global transaction is NOT done, xid = {}.", globalSession.getXid());
            return false;
        }
    }
    if (success) {
        SessionHelper.endCommitted(globalSession);
        // committed event
        eventBus.post(new GlobalTransactionEvent(globalSession.getTransactionId(),
                GlobalTransactionEvent.ROLE_TC, globalSession.getTransactionName(),
                globalSession.getBeginTime(), System.currentTimeMillis(), globalSession.getStatus()));
        LOGGER.info("Committing global transaction is successfully done, xid = {}.", globalSession.getXid());
    }
    return success;
}
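The branch loop in doGlobalCommit boils down to: skip branches that failed phase one, commit the rest, and treat the global commit as done only when no branch is left. The sketch below models just that. It assumes that a successfully committed branch is removed from the session, which is the part the listing omits; BranchCommitLoopSketch and its types are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Simplified model of the TC's branch commit loop (not Seata code).
class BranchCommitLoopSketch {
    enum BranchStatus { REGISTERED, PHASE_ONE_FAILED }

    static final class Branch {
        final long id;
        final BranchStatus status;

        Branch(long id, BranchStatus status) {
            this.id = id;
            this.status = status;
        }
    }

    // Returns true when no branch is left, i.e. the global commit is done.
    static boolean commitAll(List<Branch> branches, Predicate<Branch> commitBranch) {
        // Iterate over a copy so that removing from the live list is safe.
        for (Branch branch : new ArrayList<>(branches)) {
            if (branch.status == BranchStatus.PHASE_ONE_FAILED) {
                branches.remove(branch); // nothing was prepared, nothing to commit
                continue;
            }
            if (commitBranch.test(branch)) {
                branches.remove(branch); // assumption: committed branches are removed
            }
        }
        return branches.isEmpty();
    }

    public static void main(String[] args) {
        List<Branch> branches = new ArrayList<>();
        branches.add(new Branch(1, BranchStatus.REGISTERED));
        branches.add(new Branch(2, BranchStatus.PHASE_ONE_FAILED));
        branches.add(new Branch(3, BranchStatus.REGISTERED));
        // Pretend the RM behind branch 3 reports a retryable failure.
        boolean done = commitAll(branches, b -> b.id != 3);
        System.out.println(done + ", remaining branches: " + branches.size());
    }
}
```

A leftover branch corresponds to the "Committing global transaction is NOT done" log line, after which the commit is retried later.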
The branchCommit method that commits a branch transaction is defined in AbstractCore:
public BranchStatus branchCommit(GlobalSession globalSession, BranchSession branchSession)
        throws TransactionException {
    try {
        BranchCommitRequest request = new BranchCommitRequest();
        request.setXid(branchSession.getXid());
        request.setBranchId(branchSession.getBranchId());
        request.setResourceId(branchSession.getResourceId());
        request.setApplicationData(branchSession.getApplicationData());
        request.setBranchType(branchSession.getBranchType());
        return branchCommitSend(request, globalSession, branchSession);
    } catch (IOException | TimeoutException e) {
        throw new BranchTransactionException(FailedToSendBranchCommitRequest,
                String.format("Send branch commit failed, xid = %s branchId = %s",
                        branchSession.getXid(), branchSession.getBranchId()), e);
    }
}
The branchCommitSend method is what actually sends the branch commit request to the RM:
protected BranchStatus branchCommitSend(BranchCommitRequest request, GlobalSession globalSession,
                                        BranchSession branchSession) throws IOException, TimeoutException {
    BranchCommitResponse response = (BranchCommitResponse) remotingServer.sendSyncRequest(
            branchSession.getResourceId(), branchSession.getClientId(), request);
    return response.getBranchStatus();
}
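The RM handles the TC's commit request
How does the RM handle this request once it arrives? Recall the RM initialization described in "Alibaba middleware Seata source code analysis (1): RM and TM client initialization": during initialization the RM calls the constructor of AbstractNettyRemotingClient: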
public AbstractNettyRemotingClient(NettyClientConfig nettyClientConfig,
                                   EventExecutorGroup eventExecutorGroup,
                                   ThreadPoolExecutor messageExecutor,
                                   NettyPoolKey.TransactionRole transactionRole) {
    super(messageExecutor);
    this.transactionRole = transactionRole;
    clientBootstrap = new NettyClientBootstrap(nettyClientConfig, eventExecutorGroup, transactionRole);
    clientBootstrap.setChannelHandlers(new ClientHandler()); // the handler for requests arriving at the client
    clientChannelManager = new NettyClientChannelManager(
            new NettyPoolableFactory(this, clientBootstrap), getPoolKeyFunction(), nettyClientConfig);
}
ClientHandler is what handles incoming requests. Its channelRead method is:
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
    if (!(msg instanceof RpcMessage)) {
        return;
    }
    processMessage(ctx, (RpcMessage) msg);
}
For a branch commit request, processMessage hands the message to the process method of RmBranchCommitProcessor:
public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
    String remoteAddress = NetUtil.toStringAddress(ctx.channel().remoteAddress());
    Object msg = rpcMessage.getBody();
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("rm client handle branch commit process:" + msg);
    }
    handleBranchCommit(rpcMessage, remoteAddress, (BranchCommitRequest) msg);
}

private void handleBranchCommit(RpcMessage request, String serverAddress,
                                BranchCommitRequest branchCommitRequest) {
    BranchCommitResponse resultMessage;
    // handle the commit request
    resultMessage = (BranchCommitResponse) handler.onRequest(branchCommitRequest, null);
    if (LOGGER.isDebugEnabled()) {
        LOGGER.debug("branch commit result:" + resultMessage);
    }
    try {
        this.remotingClient.sendAsyncResponse(serverAddress, request, resultMessage);
    } catch (Throwable throwable) {
        LOGGER.error("branch commit error: {}", throwable.getMessage(), throwable);
    }
}
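How processMessage arrives at the right processor is not shown in the listings here. A common way to do it, and the assumption behind this sketch, is a lookup table from message type code to processor. ProcessorTableSketch, its Processor interface, and the numeric type codes are all hypothetical and are not Seata's actual classes or constants.

```java
import java.util.HashMap;
import java.util.Map;

// Dispatching incoming messages by type code (illustrative only).
class ProcessorTableSketch {
    interface Processor {
        String process(Object body);
    }

    // Hypothetical message type codes.
    static final int TYPE_BRANCH_COMMIT = 1;
    static final int TYPE_BRANCH_ROLLBACK = 2;

    private final Map<Integer, Processor> table = new HashMap<>();

    void register(int messageType, Processor processor) {
        table.put(messageType, processor);
    }

    // Looks up the processor registered for the type and delegates to it.
    String processMessage(int messageType, Object body) {
        Processor processor = table.get(messageType);
        if (processor == null) {
            return "no processor for type " + messageType;
        }
        return processor.process(body);
    }

    public static void main(String[] args) {
        ProcessorTableSketch client = new ProcessorTableSketch();
        client.register(TYPE_BRANCH_COMMIT, body -> "branch commit handled: " + body);
        client.register(TYPE_BRANCH_ROLLBACK, body -> "branch rollback handled: " + body);
        System.out.println(client.processMessage(TYPE_BRANCH_COMMIT, "xid-1"));
        System.out.println(client.processMessage(99, "xid-1"));
    }
}
```

With a table like this, supporting a new message type means registering one more processor and leaving the channel handler untouched.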
The handler.onRequest call that processes the commit request is implemented in AbstractRMHandler:
public AbstractResultMessage onRequest(AbstractMessage request, RpcContext context) {
    if (!(request instanceof AbstractTransactionRequestToRM)) {
        throw new IllegalArgumentException();
    }
    AbstractTransactionRequestToRM transactionRequest = (AbstractTransactionRequestToRM) request;
    transactionRequest.setRMInboundMessageHandler(this);
    return transactionRequest.handle(context);
}
That in turn calls the handle method of BranchCommitRequest:
public AbstractTransactionResponse handle(RpcContext rpcContext) {
    return handler.handle(this);
}
This calls the handle method of DefaultRMHandler:
public BranchCommitResponse handle(BranchCommitRequest request) {
    return getRMHandler(request.getBranchType()).handle(request);
}
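The last three listings form a double dispatch followed by routing on branch type: the handler installs itself on the request, the request calls back handler.handle(this), and the default handler forwards to the handler registered for that branch type. A compact sketch of the pattern follows. RmDispatchSketch and its nested types are hypothetical, and the EnumMap is an assumption about how such a lookup could be stored.

```java
import java.util.EnumMap;
import java.util.Map;

// Double dispatch plus routing by branch type (illustrative, not Seata code).
class RmDispatchSketch {
    enum BranchType { AT, TCC, SAGA }

    interface RmHandler {
        String handle(BranchCommit request);
    }

    static final class BranchCommit {
        final BranchType branchType;
        RmHandler handler; // set by the receiver before handle() is called

        BranchCommit(BranchType branchType) {
            this.branchType = branchType;
        }

        // The request hands itself back to the handler it was given.
        String handle() {
            return handler.handle(this);
        }
    }

    static final class RoutingHandler implements RmHandler {
        private final Map<BranchType, RmHandler> byType = new EnumMap<>(BranchType.class);

        void register(BranchType type, RmHandler handler) {
            byType.put(type, handler);
        }

        // Plays the role of onRequest: install this handler, let the request dispatch.
        String onRequest(BranchCommit request) {
            request.handler = this;
            return request.handle();
        }

        @Override
        public String handle(BranchCommit request) {
            RmHandler target = byType.get(request.branchType);
            if (target == null) {
                throw new IllegalStateException("no handler for " + request.branchType);
            }
            return target.handle(request);
        }
    }

    public static void main(String[] args) {
        RoutingHandler router = new RoutingHandler();
        router.register(BranchType.TCC, request -> "TCC branch committed");
        System.out.println(router.onRequest(new BranchCommit(BranchType.TCC)));
    }
}
```

The request type decides which overload runs and the branch type decides which concrete handler runs, so neither decision needs an instanceof chain.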
Finally the handle method of AbstractRMHandler is called:
public BranchCommitResponse handle(BranchCommitRequest request) {
    BranchCommitResponse response = new BranchCommitResponse();
    exceptionHandleTemplate(new AbstractCallback<BranchCommitRequest, BranchCommitResponse>() {
        @Override
        public void execute(BranchCommitRequest request, BranchCommitResponse response)
                throws TransactionException {
            doBranchCommit(request, response);
        }
    }, request, response);
    return response;
}

protected void doBranchCommit(BranchCommitRequest request, BranchCommitResponse response)
        throws TransactionException {
    String xid = request.getXid();
    long branchId = request.getBranchId();
    String resourceId = request.getResourceId();
    String applicationData = request.getApplicationData();
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Branch committing: " + xid + " " + branchId + " " + resourceId + " " + applicationData);
    }
    // this calls the branchCommit method of TCCResourceManager
    BranchStatus status = getResourceManager().branchCommit(request.getBranchType(), xid, branchId,
            resourceId, applicationData);
    response.setXid(xid);
    response.setBranchId(branchId);
    response.setBranchStatus(status);
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Branch commit result: " + status);
    }
}
The branchCommit method of TCCResourceManager is:
public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
                                 String applicationData) throws TransactionException {
    TCCResource tccResource = (TCCResource) tccResourceCache.get(resourceId);
    if (tccResource == null) {
        throw new ShouldNeverHappenException(
                String.format("TCC resource is not exist, resourceId: %s", resourceId));
    }
    Object targetTCCBean = tccResource.getTargetBean();
    Method commitMethod = tccResource.getCommitMethod();
    if (targetTCCBean == null || commitMethod == null) {
        throw new ShouldNeverHappenException(
                String.format("TCC resource is not available, resourceId: %s", resourceId));
    }
    try {
        // BusinessActionContext
        BusinessActionContext businessActionContext =
                getBusinessActionContext(xid, branchId, resourceId, applicationData);
        // this finally triggers the phase-two commit method
        Object ret = commitMethod.invoke(targetTCCBean, businessActionContext);
        LOGGER.info("TCC resource commit result : {}, xid: {}, branchId: {}, resourceId: {}",
                ret, xid, branchId, resourceId);
        boolean result;
        if (ret != null) {
            if (ret instanceof TwoPhaseResult) {
                result = ((TwoPhaseResult) ret).isSuccess();
            } else {
                result = (boolean) ret;
            }
        } else {
            result = true;
        }
        return result ? BranchStatus.PhaseTwo_Committed : BranchStatus.PhaseTwo_CommitFailed_Retryable;
    } catch (Throwable t) {
        String msg = String.format("commit TCC resource error, resourceId: %s, xid: %s.", resourceId, xid);
        LOGGER.error(msg, t);
        return BranchStatus.PhaseTwo_CommitFailed_Retryable;
    }
}
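The heart of branchCommit is a cache lookup by resourceId followed by a reflective call to the commit method. The standalone sketch below reproduces that mechanism with java.lang.reflect only. ReflectiveCommitSketch, AccountAction, and the String-based status values are hypothetical simplifications; in particular the commit method here takes a plain xid rather than a BusinessActionContext.

```java
import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Resource cache plus reflective phase-two invocation (illustrative only).
class ReflectiveCommitSketch {
    // Stand-in for a business bean exposing a phase-two commit method.
    public static class AccountAction {
        public boolean commit(String xid) {
            return xid != null;
        }
    }

    static final class Resource {
        final Object bean;
        final Method commitMethod;

        Resource(Object bean, Method commitMethod) {
            this.bean = bean;
            this.commitMethod = commitMethod;
        }
    }

    static final Map<String, Resource> cache = new ConcurrentHashMap<>();

    // Resolves the commit method by name once, at registration time.
    static void register(String resourceId, Object bean, String commitMethodName) {
        try {
            Method method = bean.getClass().getMethod(commitMethodName, String.class);
            cache.put(resourceId, new Resource(bean, method));
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("no such commit method: " + commitMethodName, e);
        }
    }

    static String branchCommit(String resourceId, String xid) {
        Resource resource = cache.get(resourceId);
        if (resource == null) {
            throw new IllegalStateException("resource does not exist: " + resourceId);
        }
        try {
            Object ret = resource.commitMethod.invoke(resource.bean, xid);
            boolean ok = ret == null || (Boolean) ret; // a null return counts as success
            return ok ? "PhaseTwo_Committed" : "PhaseTwo_CommitFailed_Retryable";
        } catch (Exception e) {
            return "PhaseTwo_CommitFailed_Retryable"; // any failure is retried later
        }
    }

    public static void main(String[] args) {
        register("accountApi", new AccountAction(), "commit");
        System.out.println(branchCommit("accountApi", "xid-1")); // PhaseTwo_Committed
    }
}
```

Mapping both a false return and an exception to a retryable status is the reason the business commit method should be idempotent.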
At this point we have found where the commitMethod declared in the TwoPhaseBusinessAction annotation gets invoked. The UML class diagram below shows this call chain:
Summary
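This article walked through two-phase commit in TCC mode and traced it down to the code that triggers the commitMethod declared in the TwoPhaseBusinessAction annotation; the rollbackMethod is reached through the parallel rollback path.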
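Tracing the code also gives a clearer picture of how Seata's components talk to each other. On the TC side, requests are handled in ServerHandler, an inner class of AbstractNettyRemotingServer. On the RM side, notifications are handled in ClientHandler, an inner class of AbstractNettyRemotingClient.
Both ServerHandler and ClientHandler delegate to processors, whose abstraction is the RemotingProcessor interface; the TC, the TM, and the RM each have concrete implementations. All of the implementation classes are listed here, grouped into client-related, server-related, RM-related, and TM-related ones, in the UML class diagram below: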