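Overview
Seata Server acts as the TC and plays a central role in implementing distributed transactions. The official glossary describes the TC as follows:
TC (Transaction Coordinator)
Maintains the state of global and branch transactions, and drives the commit or rollback of global transactions.
So how does the TC actually maintain the state of global and branch transactions, and what mechanism does it use to drive a global transaction to commit or roll back?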
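Main responsibilities of the TC
In the TC server code, the class to focus on is io.seata.server.coordinator.DefaultCoordinator. It is the core handler of the TC service, and most of the work the TC does in a distributed transaction can be found in it.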
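- Scheduled tasks
When it is initialized, DefaultCoordinator starts 5 scheduled tasks:
- Every second, query the global transactions that need to roll back and ask their branches to roll back, which drives the global rollback;
- Every second, query the global transactions that need to commit and ask their branches to commit, which drives the global commit;
- Every second, query the global transactions in the asynchronous-commit state and ask their branches to commit asynchronously;
- Every second, check whether any distributed transaction has timed out, and mark timed-out transactions as needing rollback;
- Periodically delete undo logs older than 7 days; by default the first run starts 3 minutes after startup and the cleanup then repeats once a day.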
```java
public void init() {
    // Every second, check for transactions that need to roll back and send rollback requests to the RMs
    retryRollbacking.scheduleAtFixedRate(
        () -> SessionHolder.distributedLockAndExecute(RETRY_ROLLBACKING, this::handleRetryRollbacking),
        0, ROLLBACKING_RETRY_PERIOD, TimeUnit.MILLISECONDS);

    // Every second, check for transactions that need to commit and send commit requests to the RMs
    retryCommitting.scheduleAtFixedRate(
        () -> SessionHolder.distributedLockAndExecute(RETRY_COMMITTING, this::handleRetryCommitting),
        0, COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);

    // Every second, check for transactions that need an asynchronous commit and send async commit requests to the RMs
    asyncCommitting.scheduleAtFixedRate(
        () -> SessionHolder.distributedLockAndExecute(ASYNC_COMMITTING, this::handleAsyncCommitting),
        0, ASYNC_COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);

    // Every second, check whether a distributed transaction has timed out; if so, mark it as needing rollback
    timeoutCheck.scheduleAtFixedRate(
        () -> SessionHolder.distributedLockAndExecute(TX_TIMEOUT_CHECK, this::timeoutCheck),
        0, TIMEOUT_RETRY_PERIOD, TimeUnit.MILLISECONDS);

    // Runs once 3 minutes after the server starts, then every 24 hours; deletes undo logs older than 7 days
    undoLogDelete.scheduleAtFixedRate(
        () -> SessionHolder.distributedLockAndExecute(UNDOLOG_DELETE, this::undoLogDelete),
        UNDO_LOG_DELAY_DELETE_PERIOD, UNDO_LOG_DELETE_PERIOD, TimeUnit.MILLISECONDS);
}
```
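To make the scheduling pattern concrete, here is a minimal, self-contained sketch of a fixed-rate task guarded by a lock. It is not Seata code: the class `FixedRateRetrySketch`, the methods `lockAndExecute` and `runRounds`, and the in-process `AtomicBoolean` that stands in for the distributed lock are all assumptions made for illustration. Only `ScheduledExecutorService.scheduleAtFixedRate` is the same JDK call the TC uses.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; not part of Seata.
final class FixedRateRetrySketch {

    // Assumption: an in-process flag standing in for a cluster-wide lock.
    private static final AtomicBoolean LOCK = new AtomicBoolean(false);

    // Runs the task only if the lock is free; returns false when someone else holds it.
    static boolean lockAndExecute(Runnable task) {
        if (!LOCK.compareAndSet(false, true)) {
            return false;
        }
        try {
            task.run();
            return true;
        } finally {
            LOCK.set(false);
        }
    }

    // Schedules a counting task at a fixed rate, waits for `rounds` runs, then stops the scheduler.
    static int runRounds(int rounds, long periodMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger executed = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(rounds);
        // Initial delay 0, then one run every periodMillis, as in the retry tasks above.
        scheduler.scheduleAtFixedRate(
            () -> lockAndExecute(() -> {
                executed.incrementAndGet();
                done.countDown();
            }),
            0, periodMillis, TimeUnit.MILLISECONDS);
        try {
            done.await(5, TimeUnit.SECONDS);
            scheduler.shutdownNow();
            scheduler.awaitTermination(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            scheduler.shutdownNow();
        }
        return executed.get();
    }

    public static void main(String[] args) {
        System.out.println("task ran " + runRounds(3, 100) + " times");
    }
}
```

The lock matters once several TC nodes share one session store: every node runs the same timers, and the lock is what keeps two nodes from retrying the same transaction in the same round.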
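- Beginning, committing, rolling back, reporting, and querying global transactions
DefaultCoordinator also handles requests from the TM, which cover the following:
- Responding to the TM's request to begin a global transaction, generating an XID and returning it to the TM;
- Responding to the TM's request to commit a global transaction;
- Responding to the TM's request to roll back a global transaction;
- Receiving the global transaction status that the TM reports to the TC (currently only used in Saga mode);
- Querying the status of a global transaction by XID.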
```java
// 1. The TM asks to begin a distributed transaction; the TC receives the request and generates an XID
@Override
public GlobalBeginResponse handle(GlobalBeginRequest request, final RpcContext rpcContext) {
    GlobalBeginResponse response = new GlobalBeginResponse();
    // Template method pattern
    exceptionHandleTemplate(new AbstractCallback<GlobalBeginRequest, GlobalBeginResponse>() {
        @Override
        public void execute(GlobalBeginRequest request, GlobalBeginResponse response)
            throws TransactionException {
            try {
                // Delegate to the real handling logic
                doGlobalBegin(request, response, rpcContext);
            } catch (StoreException e) {
                throw new TransactionException(TransactionExceptionCode.FailedStore,
                    String.format("begin global request failed. xid=%s, msg=%s",
                        response.getXid(), e.getMessage()), e);
            }
        }
    }, request, response);
    return response;
}

@Override
protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response,
                             RpcContext rpcContext) throws TransactionException {
    // Generate the XID from applicationId, transactionServiceGroup and transactionName,
    // and set it on the response. How the XID is generated is out of scope here.
    response.setXid(core.begin(rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(),
        request.getTransactionName(), request.getTimeout()));
    // Everything below is just logging
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Begin new global transaction applicationId: {},transactionServiceGroup: {}, transactionName: {},timeout:{},xid:{}",
            rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(),
            request.getTransactionName(), request.getTimeout(), response.getXid());
    }
}
```
```java
// 2. Respond to the TM's request to commit a global transaction
@Override
public GlobalCommitResponse handle(GlobalCommitRequest request, final RpcContext rpcContext) {
    GlobalCommitResponse response = new GlobalCommitResponse();
    response.setGlobalStatus(GlobalStatus.Committing);
    // Template method pattern again
    exceptionHandleTemplate(new AbstractCallback<GlobalCommitRequest, GlobalCommitResponse>() {
        @Override
        public void execute(GlobalCommitRequest request, GlobalCommitResponse response)
            throws TransactionException {
            try {
                // Main handling logic
                doGlobalCommit(request, response, rpcContext);
            } catch (StoreException e) {
                throw new TransactionException(TransactionExceptionCode.FailedStore,
                    String.format("global commit request failed. xid=%s, msg=%s",
                        request.getXid(), e.getMessage()), e);
            }
        }

        @Override
        public void onTransactionException(GlobalCommitRequest request, GlobalCommitResponse response,
                                           TransactionException tex) {
            super.onTransactionException(request, response, tex);
            checkTransactionStatus(request, response);
        }

        @Override
        public void onException(GlobalCommitRequest request, GlobalCommitResponse response, Exception rex) {
            super.onException(request, response, rex);
            checkTransactionStatus(request, response);
        }
    }, request, response);
    return response;
}

// Commit the global transaction and return the result to the TM
@Override
protected void doGlobalCommit(GlobalCommitRequest request, GlobalCommitResponse response,
                              RpcContext rpcContext) throws TransactionException {
    MDC.put(RootContext.MDC_KEY_XID, request.getXid());
    // core.commit notifies each RM in turn to commit its local transaction;
    // it covers both asynchronous and synchronous commit
    response.setGlobalStatus(core.commit(request.getXid()));
}
```
```java
// 3. Respond to the TM's request to roll back a global transaction
@Override
public GlobalRollbackResponse handle(GlobalRollbackRequest request, final RpcContext rpcContext) {
    GlobalRollbackResponse response = new GlobalRollbackResponse();
    response.setGlobalStatus(GlobalStatus.Rollbacking);
    // Template method pattern
    exceptionHandleTemplate(new AbstractCallback<GlobalRollbackRequest, GlobalRollbackResponse>() {
        @Override
        public void execute(GlobalRollbackRequest request, GlobalRollbackResponse response)
            throws TransactionException {
            try {
                // Core rollback logic
                doGlobalRollback(request, response, rpcContext);
            } catch (StoreException e) {
                throw new TransactionException(TransactionExceptionCode.FailedStore,
                    String.format("global rollback request failed. xid=%s, msg=%s",
                        request.getXid(), e.getMessage()), e);
            }
        }

        @Override
        public void onTransactionException(GlobalRollbackRequest request, GlobalRollbackResponse response,
                                           TransactionException tex) {
            super.onTransactionException(request, response, tex);
            // may be appears StoreException outer layer method catch
            checkTransactionStatus(request, response);
        }

        @Override
        public void onException(GlobalRollbackRequest request, GlobalRollbackResponse response, Exception rex) {
            super.onException(request, response, rex);
            // may be appears StoreException outer layer method catch
            checkTransactionStatus(request, response);
        }
    }, request, response);
    return response;
}

// Roll back the global transaction and return the result to the TM
@Override
protected void doGlobalRollback(GlobalRollbackRequest request, GlobalRollbackResponse response,
                                RpcContext rpcContext) throws TransactionException {
    MDC.put(RootContext.MDC_KEY_XID, request.getXid());
    // Notify every RM in turn to roll back its local transaction
    response.setGlobalStatus(core.rollback(request.getXid()));
}
```
```java
// 4. The TM reports the global transaction status to the TC (currently only used in Saga mode)
@Override
public GlobalReportResponse handle(GlobalReportRequest request, final RpcContext rpcContext) {
    GlobalReportResponse response = new GlobalReportResponse();
    response.setGlobalStatus(request.getGlobalStatus());
    // Template method pattern
    exceptionHandleTemplate(new AbstractCallback<GlobalReportRequest, GlobalReportResponse>() {
        @Override
        public void execute(GlobalReportRequest request, GlobalReportResponse response)
            throws TransactionException {
            // Core handling logic
            doGlobalReport(request, response, rpcContext);
        }
    }, request, response);
    return response;
}

@Override
protected void doGlobalReport(GlobalReportRequest request, GlobalReportResponse response,
                              RpcContext rpcContext) throws TransactionException {
    MDC.put(RootContext.MDC_KEY_XID, request.getXid());
    // The TC decides whether to commit or roll back based on the globalStatus reported by Saga
    response.setGlobalStatus(core.globalReport(request.getXid(), request.getGlobalStatus()));
}
```
```java
// 5. Query the status of a global transaction by XID
@Override
public GlobalStatusResponse handle(GlobalStatusRequest request, final RpcContext rpcContext) {
    GlobalStatusResponse response = new GlobalStatusResponse();
    response.setGlobalStatus(GlobalStatus.UnKnown);
    // Template method pattern
    exceptionHandleTemplate(new AbstractCallback<GlobalStatusRequest, GlobalStatusResponse>() {
        @Override
        public void execute(GlobalStatusRequest request, GlobalStatusResponse response)
            throws TransactionException {
            try {
                // Look up the status of the distributed transaction
                doGlobalStatus(request, response, rpcContext);
            } catch (StoreException e) {
                throw new TransactionException(TransactionExceptionCode.FailedStore,
                    String.format("global status request failed. xid=%s, msg=%s",
                        request.getXid(), e.getMessage()), e);
            }
        }

        @Override
        public void onTransactionException(GlobalStatusRequest request, GlobalStatusResponse response,
                                           TransactionException tex) {
            super.onTransactionException(request, response, tex);
            checkTransactionStatus(request, response);
        }

        @Override
        public void onException(GlobalStatusRequest request, GlobalStatusResponse response, Exception rex) {
            super.onException(request, response, rex);
            checkTransactionStatus(request, response);
        }
    }, request, response);
    return response;
}

@Override
protected void doGlobalStatus(GlobalStatusRequest request, GlobalStatusResponse response,
                              RpcContext rpcContext) throws TransactionException {
    MDC.put(RootContext.MDC_KEY_XID, request.getXid());
    // Return the status of the distributed transaction
    response.setGlobalStatus(core.getStatus(request.getXid()));
}
```
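All five TM handlers share the same shape: a fixed try/catch skeleton wraps a variable business step, with overridable hooks for the two kinds of failure. The sketch below isolates that idea under simplified assumptions. `ExceptionTemplateSketch`, `TxException`, `Response`, and `Callback` are hypothetical names invented for this example; they only mirror the roles of `exceptionHandleTemplate`, `TransactionException`, and `AbstractCallback`, not their real signatures.

```java
// Hypothetical names throughout; a simplified model, not Seata's classes.
final class ExceptionTemplateSketch {

    // Checked exception standing in for a transaction-level failure.
    static class TxException extends Exception {
        TxException(String message) {
            super(message);
        }
    }

    // Minimal response object: a result code plus a message.
    static class Response {
        String resultCode = "Success";
        String msg = "";
    }

    // The business step plus two error hooks that callers may override.
    interface Callback<T> {
        void execute(T request, Response response) throws TxException;

        default void onTransactionException(T request, Response response, TxException e) {
            response.resultCode = "Failed";
            response.msg = "TransactionException[" + e.getMessage() + "]";
        }

        default void onException(T request, Response response, Exception e) {
            response.resultCode = "Failed";
            response.msg = "RuntimeException[" + e.getMessage() + "]";
        }
    }

    // The template: the skeleton is fixed, only the callback varies.
    static <T> Response handle(T request, Callback<T> callback) {
        Response response = new Response();
        try {
            callback.execute(request, response);
        } catch (TxException e) {
            callback.onTransactionException(request, response, e);
        } catch (Exception e) {
            callback.onException(request, response, e);
        }
        return response;
    }

    public static void main(String[] args) {
        Response ok = handle("xid-1", (req, resp) -> resp.msg = "committed " + req);
        Response failed = handle("xid-2", (req, resp) -> {
            throw new TxException("store unavailable");
        });
        System.out.println(ok.resultCode + " / " + failed.resultCode);
    }
}
```

With this structure a failure always becomes a response for the caller instead of an exception escaping the handler, which is why the commit, rollback, and status handlers can add `checkTransactionStatus` in the hooks without touching the skeleton.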
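- Registering and reporting branch transactions
DefaultCoordinator interacts with the RM in two ways:
- Registering a branch transaction and generating a BranchId, that is, the ID of the branch transaction;
- Receiving the transaction status that the RM reports to the TC at each phase.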
```java
// Branch transaction registration
@Override
public BranchRegisterResponse handle(BranchRegisterRequest request, final RpcContext rpcContext) {
    BranchRegisterResponse response = new BranchRegisterResponse();
    // Template method pattern
    exceptionHandleTemplate(new AbstractCallback<BranchRegisterRequest, BranchRegisterResponse>() {
        @Override
        public void execute(BranchRegisterRequest request, BranchRegisterResponse response)
            throws TransactionException {
            try {
                // Register the branch transaction
                doBranchRegister(request, response, rpcContext);
            } catch (StoreException e) {
                throw new TransactionException(TransactionExceptionCode.FailedStore,
                    String.format("branch register request failed. xid=%s, msg=%s",
                        request.getXid(), e.getMessage()), e);
            }
        }
    }, request, response);
    return response;
}

@Override
protected void doBranchRegister(BranchRegisterRequest request, BranchRegisterResponse response,
                                RpcContext rpcContext) throws TransactionException {
    MDC.put(RootContext.MDC_KEY_XID, request.getXid());
    // Generate a branchId to return to the RM, and add a branch transaction record to the database
    response.setBranchId(
        core.branchRegister(request.getBranchType(), request.getResourceId(), rpcContext.getClientId(),
            request.getXid(), request.getApplicationData(), request.getLockKey()));
}
```
```java
// Branch transaction report
@Override
public BranchReportResponse handle(BranchReportRequest request, final RpcContext rpcContext) {
    BranchReportResponse response = new BranchReportResponse();
    exceptionHandleTemplate(new AbstractCallback<BranchReportRequest, BranchReportResponse>() {
        @Override
        public void execute(BranchReportRequest request, BranchReportResponse response)
            throws TransactionException {
            try {
                // Run the report logic
                doBranchReport(request, response, rpcContext);
            } catch (StoreException e) {
                throw new TransactionException(TransactionExceptionCode.FailedStore,
                    String.format("branch report request failed. xid=%s, branchId=%s, msg=%s",
                        request.getXid(), request.getBranchId(), e.getMessage()), e);
            }
        }
    }, request, response);
    return response;
}

@Override
protected void doBranchReport(GlobalReportRequestPlaceholderRemoved request) { }
```
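The two RM-facing operations can be pictured as a small registry: registration hands out a fresh branch ID under an XID, and a report overwrites the stored status of that branch. The sketch below is an in-memory illustration only. `BranchRegistrySketch`, its `BranchStatus` values, and the counter-based ID generator are assumptions made for this example; the real TC persists branches through its session store and, unlike this sketch, validates that the global transaction exists before registering.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical in-memory model; not Seata's implementation.
final class BranchRegistrySketch {

    // Assumption: a reduced set of illustrative status values.
    enum BranchStatus { REGISTERED, PHASE_ONE_DONE, PHASE_ONE_FAILED }

    // Assumption: a simple counter stands in for the real ID generator.
    private final AtomicLong idGenerator = new AtomicLong();

    // xid -> (branchId -> status)
    private final Map<String, Map<Long, BranchStatus>> branches = new ConcurrentHashMap<>();

    // Registers a new branch under the given XID and returns its branch ID.
    long branchRegister(String xid) {
        long branchId = idGenerator.incrementAndGet();
        branches.computeIfAbsent(xid, k -> new ConcurrentHashMap<>())
                .put(branchId, BranchStatus.REGISTERED);
        return branchId;
    }

    // Updates the status of an existing branch; unknown branches are rejected.
    void branchReport(String xid, long branchId, BranchStatus status) {
        Map<Long, BranchStatus> byId = branches.get(xid);
        if (byId == null || !byId.containsKey(branchId)) {
            throw new IllegalArgumentException("unknown branch " + branchId + " for xid " + xid);
        }
        byId.put(branchId, status);
    }

    // Returns the stored status, or null if the branch is unknown.
    BranchStatus statusOf(String xid, long branchId) {
        Map<Long, BranchStatus> byId = branches.get(xid);
        return byId == null ? null : byId.get(branchId);
    }

    public static void main(String[] args) {
        BranchRegistrySketch registry = new BranchRegistrySketch();
        long branchId = registry.branchRegister("xid-1");
        registry.branchReport("xid-1", branchId, BranchStatus.PHASE_ONE_DONE);
        System.out.println(branchId + " -> " + registry.statusOf("xid-1", branchId));
    }
}
```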
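- Querying global locks
Another job of DefaultCoordinator is to keep track of the global locks of distributed transactions. A global lock comes into play in two situations:
- When two distributed transactions operate on the same record at the same time, they are mutually exclusive; the record is identified by its primary key;
- When @GlobalLock is used and an update, delete, or select for update touches data with the same primary key as data being modified by another distributed transaction, the two are mutually exclusive as well.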
```java
// Global lock query
@Override
public GlobalLockQueryResponse handle(GlobalLockQueryRequest request, final RpcContext rpcContext) {
    GlobalLockQueryResponse response = new GlobalLockQueryResponse();
    // Template method pattern
    exceptionHandleTemplate(new AbstractCallback<GlobalLockQueryRequest, GlobalLockQueryResponse>() {
        @Override
        public void execute(GlobalLockQueryRequest request, GlobalLockQueryResponse response)
            throws TransactionException {
            try {
                // Run the query logic
                doLockCheck(request, response, rpcContext);
            } catch (StoreException e) {
                throw new TransactionException(TransactionExceptionCode.FailedStore,
                    String.format("global lock query request failed. xid=%s, msg=%s",
                        request.getXid(), e.getMessage()), e);
            }
        }
    }, request, response);
    return response;
}

@Override
protected void doLockCheck(GlobalLockQueryRequest request, GlobalLockQueryResponse response,
                           RpcContext rpcContext) throws TransactionException {
    MDC.put(RootContext.MDC_KEY_XID, request.getXid());
    // Check whether a global lock already exists
    response.setLockable(
        core.lockQuery(request.getBranchType(), request.getResourceId(),
            request.getXid(), request.getLockKey()));
}
```
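The mutual exclusion described above comes down to one rule: a row is lockable for a transaction if nobody holds it, or if that same transaction already holds it. Here is a minimal in-memory sketch of that rule. `GlobalLockSketch`, the `rowKey` format, and the `acquire`, `isLockable`, and `releaseAll` methods are assumptions made for illustration and do not reflect how Seata stores or encodes its locks.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory lock table; not Seata's lock manager.
final class GlobalLockSketch {

    // row key -> XID of the global transaction that currently owns the row
    private final Map<String, String> lockOwners = new ConcurrentHashMap<>();

    // Assumption: a row is identified by resource, table, and primary key joined with ':'.
    static String rowKey(String resourceId, String table, String primaryKey) {
        return resourceId + ":" + table + ":" + primaryKey;
    }

    // Tries to take the row lock for xid; re-acquiring a lock you already own succeeds.
    boolean acquire(String xid, String rowKey) {
        String owner = lockOwners.putIfAbsent(rowKey, xid);
        return owner == null || owner.equals(xid);
    }

    // Read-only check: would xid be allowed to lock this row right now?
    boolean isLockable(String xid, String rowKey) {
        String owner = lockOwners.get(rowKey);
        return owner == null || owner.equals(xid);
    }

    // Releases every row held by xid, as happens when a global transaction ends.
    void releaseAll(String xid) {
        lockOwners.values().removeIf(xid::equals);
    }

    public static void main(String[] args) {
        GlobalLockSketch locks = new GlobalLockSketch();
        String row = rowKey("jdbc:mysql://demo/orders", "t_order", "1001");
        locks.acquire("xid-1", row);
        System.out.println("xid-2 lockable: " + locks.isLockable("xid-2", row));
        locks.releaseAll("xid-1");
        System.out.println("xid-2 lockable after release: " + locks.isLockable("xid-2", row));
    }
}
```

The read-only `isLockable` check corresponds to the lock query in this section: it reports a conflict without taking the lock, which is what a method running under @GlobalLock needs when it is not itself part of a global transaction.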
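Summary
That completes a brief tour of the main tasks the TC takes on in a distributed transaction:
- Scheduled tasks, which retry commits and rollbacks, carry out asynchronous commits, check whether distributed transactions have timed out, and periodically clean up undo logs;
- Communicating with the TM to begin, commit, or roll back distributed transactions;
- Communicating with the RM to register branch transactions and receive their status reports;
- Implementing global locks.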