Seata Server在分布式事务中的主要任务

简介: Seata Server在分布式事务中的主要任务

概述

Seata Server作为TC,在实现分布式事务中起到了至关重要的作用。以下是官方术语表中这对TC的介绍:

TC (Transaction Coordinator) - 事务协调者

维护全局和分支事务的状态,驱动全局事务提交或回滚。

那么TC是如何运行来维护全局和分支事务的状态的呢,它通过怎样的机制来驱动全局事务提交或回滚的呢?

TC的主要工作

在TC服务的代码中,我们可以重点查看io.seata.server.coordinator.DefaultCoordinator,它是TC服务在分布式事务中的核心处理器,TC服务在分布式事务中的大部分处理工作都可以在里面看到。

  • 定时任务

DefaultCoordinator在初始化是就启动了5个定时任务:

  1. 每秒查询需要回滚的全局事务,调用指定的分支尝试回滚,达到驱动全局事务回滚的目的;
  2. 每秒查询需要提交的全局事务,调用指定的分支尝试提交,达到驱动全局事务提交的目的;
  3. 每秒查询异步提交状态的全局事务,调用对应的分支尝试异步提交;
  4. 每秒检查分布式事务是否超时,超时就设置为需要回滚的状态;
  5. 定时删除超出7天的undolog,默认是启动后3分钟开始,每天清理一次;
public void init() {
        // 每秒钟检查需要回滚的任务,并向RM发起回滚操作
        retryRollbacking.scheduleAtFixedRate(
            () -> SessionHolder.distributedLockAndExecute(RETRY_ROLLBACKING, this::handleRetryRollbacking), 0,
            ROLLBACKING_RETRY_PERIOD, TimeUnit.MILLISECONDS);
        // 每秒检查需要提交的任务,并向RM发起提交操作
        retryCommitting.scheduleAtFixedRate(
            () -> SessionHolder.distributedLockAndExecute(RETRY_COMMITTING, this::handleRetryCommitting), 0,
            COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);
        // 每秒检查需要异步提交的任务,向RM发起异步提交操作
        asyncCommitting.scheduleAtFixedRate(
            () -> SessionHolder.distributedLockAndExecute(ASYNC_COMMITTING, this::handleAsyncCommitting), 0,
            ASYNC_COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);
        // 每秒检查分布式事务是否超时,如果超时,那么设置成需要回滚
        timeoutCheck.scheduleAtFixedRate(
            () -> SessionHolder.distributedLockAndExecute(TX_TIMEOUT_CHECK, this::timeoutCheck), 0,
            TIMEOUT_RETRY_PERIOD, TimeUnit.MILLISECONDS);
        // 服务启动成功3分钟后执行一次,此后间隔24小时执行一次,把超过7天的undolog删除
        undoLogDelete.scheduleAtFixedRate(
            () -> SessionHolder.distributedLockAndExecute(UNDOLOG_DELETE, this::undoLogDelete),
            UNDO_LOG_DELAY_DELETE_PERIOD, UNDO_LOG_DELETE_PERIOD, TimeUnit.MILLISECONDS);
    }
复制代码
  • 全局事务的开启、提交、回滚、上报以及状态查询

DefaultCoordinator另外还需要处理TM的请求,包括以下功能的处理:

  1. 响应TM的全局事务的开启,生成XID返回给TM;
  2. 响应TM全局事务的提交;
  3. 响应TM全局事务的回滚;
  4. TM上报全局事务状态给TC(目前只在Saga模式才有);
  5. 根据XID查询全局事务的状态;
// 1.TM发出请求开启分布式事务,TC接收请求,生成xid
@Override
    public GlobalBeginResponse handle(GlobalBeginRequest request, final RpcContext rpcContext) {
        GlobalBeginResponse response = new GlobalBeginResponse();
        // 模版模式
        exceptionHandleTemplate(new AbstractCallback<GlobalBeginRequest, GlobalBeginResponse>() {
            @Override
            public void execute(GlobalBeginRequest request, GlobalBeginResponse response) throws TransactionException {
                try {
                    // 调用真正的处理逻辑
                    doGlobalBegin(request, response, rpcContext);
                } catch (StoreException e) {
                    throw new TransactionException(TransactionExceptionCode.FailedStore,
                        String.format("begin global request failed. xid=%s, msg=%s", response.getXid(), e.getMessage()),
                        e);
                }
            }
        }, request, response);
        return response;
    }
@Override
    protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext)
            throws TransactionException {
        // 通过applicationId、transactionServiceGroup、transactionName生成xid,并设置进响应数据
        // 怎么生成的先不管
        response.setXid(core.begin(rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(),
                request.getTransactionName(), request.getTimeout()));
        // 剩下的都是打日志
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Begin new global transaction applicationId: {},transactionServiceGroup: {}, transactionName: {},timeout:{},xid:{}",
                    rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(), request.getTransactionName(), request.getTimeout(), response.getXid());
        }
    }
复制代码
// 2. 响应TM全局事务的提交;
@Override
    public GlobalCommitResponse handle(GlobalCommitRequest request, final RpcContext rpcContext) {
        GlobalCommitResponse response = new GlobalCommitResponse();
        response.setGlobalStatus(GlobalStatus.Committing);
        // 同样是模版模式
        exceptionHandleTemplate(new AbstractCallback<GlobalCommitRequest, GlobalCommitResponse>() {
            @Override
            public void execute(GlobalCommitRequest request, GlobalCommitResponse response)
                throws TransactionException {
                try {
                    // 主要执行逻辑
                    doGlobalCommit(request, response, rpcContext);
                } catch (StoreException e) {
                    throw new TransactionException(TransactionExceptionCode.FailedStore,
                        String.format("global commit request failed. xid=%s, msg=%s", request.getXid(), e.getMessage()),
                        e);
                }
            }
            @Override
            public void onTransactionException(GlobalCommitRequest request, GlobalCommitResponse response,
                                               TransactionException tex) {
                super.onTransactionException(request, response, tex);
                checkTransactionStatus(request, response);
            }
            @Override
            public void onException(GlobalCommitRequest request, GlobalCommitResponse response, Exception rex) {
                super.onException(request, response, rex);
                checkTransactionStatus(request, response);
            }
        }, request, response);
        return response;
    }
// 提交全局事务,并把提交结果返回TM
@Override
    protected void doGlobalCommit(GlobalCommitRequest request, GlobalCommitResponse response, RpcContext rpcContext)
            throws TransactionException {
        MDC.put(RootContext.MDC_KEY_XID, request.getXid());
        // core.commit会逐个通知对应的RM把本地事务提交,里面包含异步提交和同步提交
        response.setGlobalStatus(core.commit(request.getXid()));
    }
复制代码
// 3. 响应TM全局事务的回滚
@Override
    public GlobalRollbackResponse handle(GlobalRollbackRequest request, final RpcContext rpcContext) {
        GlobalRollbackResponse response = new GlobalRollbackResponse();
        response.setGlobalStatus(GlobalStatus.Rollbacking);
        // 模版模式
        exceptionHandleTemplate(new AbstractCallback<GlobalRollbackRequest, GlobalRollbackResponse>() {
            @Override
            public void execute(GlobalRollbackRequest request, GlobalRollbackResponse response)
                throws TransactionException {
                try {
                    // 回滚核心逻辑
                    doGlobalRollback(request, response, rpcContext);
                } catch (StoreException e) {
                    throw new TransactionException(TransactionExceptionCode.FailedStore, String
                        .format("global rollback request failed. xid=%s, msg=%s", request.getXid(), e.getMessage()), e);
                }
            }
            @Override
            public void onTransactionException(GlobalRollbackRequest request, GlobalRollbackResponse response,
                                               TransactionException tex) {
                super.onTransactionException(request, response, tex);
                // may be appears StoreException outer layer method catch
                checkTransactionStatus(request, response);
            }
            @Override
            public void onException(GlobalRollbackRequest request, GlobalRollbackResponse response, Exception rex) {
                super.onException(request, response, rex);
                // may be appears StoreException outer layer method catch
                checkTransactionStatus(request, response);
            }
        }, request, response);
        return response;
    }
// 回滚全局事务,并把回滚结果返回TM
@Override
    protected void doGlobalRollback(GlobalRollbackRequest request, GlobalRollbackResponse response,
                                    RpcContext rpcContext) throws TransactionException {
        MDC.put(RootContext.MDC_KEY_XID, request.getXid());
        // 逐个通知所有RM回滚本地事务
        response.setGlobalStatus(core.rollback(request.getXid()));
    }
复制代码
// TM上报全局事务状态给TC(目前只在Saga模式才有)
@Override
    public GlobalReportResponse handle(GlobalReportRequest request, final RpcContext rpcContext) {
        GlobalReportResponse response = new GlobalReportResponse();
        response.setGlobalStatus(request.getGlobalStatus());
        // 模版模式
        exceptionHandleTemplate(new AbstractCallback<GlobalReportRequest, GlobalReportResponse>() {
            @Override
            public void execute(GlobalReportRequest request, GlobalReportResponse response)
                throws TransactionException {
                // 核心处理逻辑
                doGlobalReport(request, response, rpcContext);
            }
        }, request, response);
        return response;
    }
@Override
    protected void doGlobalReport(GlobalReportRequest request, GlobalReportResponse response, RpcContext rpcContext)
            throws TransactionException {
        MDC.put(RootContext.MDC_KEY_XID, request.getXid());
        // tc根据saga上报的globalStatus来决定事务提交还是回滚
        response.setGlobalStatus(core.globalReport(request.getXid(), request.getGlobalStatus()));
    }
复制代码
// 5. 根据XID查询全局事务的状态
@Override
    public GlobalStatusResponse handle(GlobalStatusRequest request, final RpcContext rpcContext) {
        GlobalStatusResponse response = new GlobalStatusResponse();
        response.setGlobalStatus(GlobalStatus.UnKnown);
        // 模版模式
        exceptionHandleTemplate(new AbstractCallback<GlobalStatusRequest, GlobalStatusResponse>() {
            @Override
            public void execute(GlobalStatusRequest request, GlobalStatusResponse response)
                throws TransactionException {
                try {
                    // 查询分布式事务状态
                    doGlobalStatus(request, response, rpcContext);
                } catch (StoreException e) {
                    throw new TransactionException(TransactionExceptionCode.FailedStore,
                        String.format("global status request failed. xid=%s, msg=%s", request.getXid(), e.getMessage()),
                        e);
                }
            }
            @Override
            public void onTransactionException(GlobalStatusRequest request, GlobalStatusResponse response,
                                               TransactionException tex) {
                super.onTransactionException(request, response, tex);
                checkTransactionStatus(request, response);
            }
            @Override
            public void onException(GlobalStatusRequest request, GlobalStatusResponse response, Exception rex) {
                super.onException(request, response, rex);
                checkTransactionStatus(request, response);
            }
        }, request, response);
        return response;
    }
@Override
    protected void doGlobalStatus(GlobalStatusRequest request, GlobalStatusResponse response, RpcContext rpcContext)
            throws TransactionException {
        MDC.put(RootContext.MDC_KEY_XID, request.getXid());
        // 返回分布式事务状态
        response.setGlobalStatus(core.getStatus(request.getXid()));
    }
复制代码
  • 分支事务的注册、上报

DefaultCoordinator与RM之间的交互:

  1. 分支事务的注册,生成BranchId,也就是分支事务ID;
  2. RM上报各阶段事务状态给TC服务;
// 分支事务注册
@Override
    public BranchRegisterResponse handle(BranchRegisterRequest request, final RpcContext rpcContext) {
        BranchRegisterResponse response = new BranchRegisterResponse();
        // 模版模式
        exceptionHandleTemplate(new AbstractCallback<BranchRegisterRequest, BranchRegisterResponse>() {
            @Override
            public void execute(BranchRegisterRequest request, BranchRegisterResponse response)
                throws TransactionException {
                try {
                    // 执行分支事务注册
                    doBranchRegister(request, response, rpcContext);
                } catch (StoreException e) {
                    throw new TransactionException(TransactionExceptionCode.FailedStore, String
                        .format("branch register request failed. xid=%s, msg=%s", request.getXid(), e.getMessage()), e);
                }
            }
        }, request, response);
        return response;
    }
@Override
    protected void doBranchRegister(BranchRegisterRequest request, BranchRegisterResponse response,
                                    RpcContext rpcContext) throws TransactionException {
        MDC.put(RootContext.MDC_KEY_XID, request.getXid());
        // 生成branchId返回给到RM,并在数据库添加一条分支事务
        response.setBranchId(
                core.branchRegister(request.getBranchType(), request.getResourceId(), rpcContext.getClientId(),
                        request.getXid(), request.getApplicationData(), request.getLockKey()));
    }
复制代码
// 分支事务上报 
@Override
    public BranchReportResponse handle(BranchReportRequest request, final RpcContext rpcContext) {
        BranchReportResponse response = new BranchReportResponse();
        exceptionHandleTemplate(new AbstractCallback<BranchReportRequest, BranchReportResponse>() {
            @Override
            public void execute(BranchReportRequest request, BranchReportResponse response)
                throws TransactionException {
                try {
                    // 执行上报逻辑
                    doBranchReport(request, response, rpcContext);
                } catch (StoreException e) {
                    throw new TransactionException(TransactionExceptionCode.FailedStore, String
                        .format("branch report request failed. xid=%s, branchId=%s, msg=%s", request.getXid(),
                            request.getBranchId(), e.getMessage()), e);
                }
            }
        }, request, response);
        return response;
    }
@Override
    protected void doBranchReport(BranchReportRequest request, BranchReportResponse response, RpcContext rpcContext)
            throws TransactionException {
        MDC.put(RootContext.MDC_KEY_XID, request.getXid());
        MDC.put(RootContext.MDC_KEY_BRANCH_ID, String.valueOf(request.getBranchId()));
        // 修改分支事务的状态
        core.branchReport(request.getBranchType(), request.getXid(), request.getBranchId(), request.getStatus(),
                request.getApplicationData());
    }
复制代码
  • 全局锁的查询

DefaultCoordinator另外一个功能就是记录分布式事务的全局锁,这个全局锁在两种情况下会存在:

  1. 两个分布式事务同时操作同一条记录的时候,会存在互斥的情况;依据是同一个主键ID;
  2. 使用了@GlobalLock的时候,update、delete、select for update操作的数据与其他分布式事务操作的数据是同一个主键ID,也会形成互斥;
// 查询全局锁
@Override
    public GlobalLockQueryResponse handle(GlobalLockQueryRequest request, final RpcContext rpcContext) {
        GlobalLockQueryResponse response = new GlobalLockQueryResponse();
        // 模版模式
        exceptionHandleTemplate(new AbstractCallback<GlobalLockQueryRequest, GlobalLockQueryResponse>() {
            @Override
            public void execute(GlobalLockQueryRequest request, GlobalLockQueryResponse response)
                throws TransactionException {
                try {
                    // 执行查询逻辑
                    doLockCheck(request, response, rpcContext);
                } catch (StoreException e) {
                    throw new TransactionException(TransactionExceptionCode.FailedStore, String
                        .format("global lock query request failed. xid=%s, msg=%s", request.getXid(), e.getMessage()),
                        e);
                }
            }
        }, request, response);
        return response;
    }
@Override
    protected void doLockCheck(GlobalLockQueryRequest request, GlobalLockQueryResponse response, RpcContext rpcContext)
            throws TransactionException {
        MDC.put(RootContext.MDC_KEY_XID, request.getXid());
        // 查询是否存在全局锁
        response.setLockable(
                core.lockQuery(request.getBranchType(), request.getResourceId(), request.getXid(), request.getLockKey()));
    }
复制代码

小结

至此,我们已经简单介绍了TC在分布式事务中所承担的主要任务了:

  • 定时任务,实现提交、回滚操作的重试以及异步化,检查分布式事务是否超时,以及定时清理undolog;
  • 与TM通信完成分布式事务的开启、提交或回滚;
  • 与RM通信实现分支事务的注册与上报;
  • 实现全局锁;
相关文章
|
2月前
|
Java 数据库
在Java中使用Seata框架实现分布式事务的详细步骤
通过以上步骤,利用 Seata 框架可以实现较为简单的分布式事务处理。在实际应用中,还需要根据具体业务需求进行更详细的配置和处理。同时,要注意处理各种异常情况,以确保分布式事务的正确执行。
|
25天前
|
消息中间件 SQL 中间件
大厂都在用的分布式事务方案,Seata+RocketMQ带你打破10万QPS瓶颈
分布式事务涉及跨多个数据库或服务的操作,确保数据一致性。本地事务通过数据库直接支持ACID特性,而分布式事务则需解决跨服务协调难、高并发压力及性能与一致性权衡等问题。常见的解决方案包括两阶段提交(2PC)、Seata提供的AT和TCC模式、以及基于消息队列的最终一致性方案。这些方法各有优劣,适用于不同业务场景,选择合适的方案需综合考虑业务需求、系统规模和技术团队能力。
173 7
|
2月前
|
存储 Java 关系型数据库
在Spring Boot中整合Seata框架实现分布式事务
可以在 Spring Boot 中成功整合 Seata 框架,实现分布式事务的管理和处理。在实际应用中,还需要根据具体的业务需求和技术架构进行进一步的优化和调整。同时,要注意处理各种可能出现的问题,以保障分布式事务的顺利执行。
74 6
|
2月前
|
数据库
如何在Seata框架中配置分布式事务的隔离级别?
总的来说,配置分布式事务的隔离级别是实现分布式事务管理的重要环节之一,需要认真对待和仔细调整,以满足业务的需求和性能要求。你还可以进一步深入研究和实践 Seata 框架的配置和使用,以更好地应对各种分布式事务场景的挑战。
43 6
|
2月前
|
消息中间件 运维 数据库
Seata框架和其他分布式事务框架有什么区别
Seata框架和其他分布式事务框架有什么区别
32 1
|
2月前
|
存储 NoSQL Java
Java调度任务如何使用分布式锁保证相同任务在一个周期里只执行一次?
【10月更文挑战第29天】Java调度任务如何使用分布式锁保证相同任务在一个周期里只执行一次?
103 1
|
3月前
|
分布式计算 资源调度 Hadoop
Hadoop-05-Hadoop集群 集群WordCount 超详细 真正的分布式计算 上传HDFS MapReduce计算 YRAN查看任务 上传计算下载查看
Hadoop-05-Hadoop集群 集群WordCount 超详细 真正的分布式计算 上传HDFS MapReduce计算 YRAN查看任务 上传计算下载查看
63 1
|
4月前
|
SQL NoSQL 数据库
SpringCloud基础6——分布式事务,Seata
分布式事务、ACID原则、CAP定理、Seata、Seata的四种分布式方案:XA、AT、TCC、SAGA模式
SpringCloud基础6——分布式事务,Seata
|
5月前
|
资源调度 Java 调度
项目环境测试问题之Schedulerx2.0通过分布式分片任务解决单机计算瓶颈如何解决
项目环境测试问题之Schedulerx2.0通过分布式分片任务解决单机计算瓶颈如何解决
项目环境测试问题之Schedulerx2.0通过分布式分片任务解决单机计算瓶颈如何解决
|
5月前
|
关系型数据库 MySQL 数据库
SpringCloud2023中使用Seata解决分布式事务
对于分布式系统而言,需要保证分布式系统中的数据一致性,保证数据在子系统中始终保持一致,避免业务出现问题。分布式系统中对数据的操作要么一起成功,要么一起失败,必须是一个整体性的事务。Seata简化了这个使用过程。
106 2