阿里中间件seata源码剖析六:TCC模式中2阶段提交实现

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
注册配置 MSE Nacos/ZooKeeper,118元/月
性能测试 PTS,5000VUM额度
简介: 阿里中间件seata源码剖析六:TCC模式中2阶段提交实现

上篇文章中,我们以TCC模式的demo为例,讲解了seata中全局事务的开启。在这个demo中,TM作为一个全局事务的管理者,会依次调用订单服务、账户服务和库存服务,如果其中一个服务抛出异常,TM就会调用失败抛出异常,这时就会通知TC,进而TC会通知RM进行事务的回滚。如果TM没有异常,就会通知TC进行事务的commit,进而TC通知RM进行事务提交。


这个过程我用下面的时序图来表示:

微信图片_20221212155005.png具体在RM中,上面的2阶段提交是怎么实现的呢?我们先来看一下账户服务中2阶段提交的接口,这个接口是供账户服务的Controller调用,代码如下:

@LocalTCC
public interface AccountService {
    /**
     * 扣减账户余额
     * @param xid the global transactionId
     * @param userId 用户id
     * @param money 金额
     * @return prepare是否成功
     */
    @TwoPhaseBusinessAction(name = "accountApi", commitMethod = "commit", rollbackMethod = "rollback")
    boolean decrease(String xid, Long userId, BigDecimal money);
    /**
     * Commit boolean.
     *
     * @param actionContext the global transactionId
     * @return the boolean
     */
    boolean commit(BusinessActionContext actionContext);
    /**
     * Rollback boolean.
     *
     * @param actionContext the global transactionId
     * @return the boolean
     */
    boolean rollback(BusinessActionContext actionContext);
}

上面这个接口中有3个注解,@FeignClient我们都比较熟悉了,用eureka做注册中心服务之间的调用,这个就是给TM和TC提供rpc的接口。@LocalTCC是加在类上面的注解,@TwoPhaseBusinessAction是加在方法上面的注解。


之前我们在文章《阿里中间件seata源码剖析一:聊聊RM和TM客户端初始化》,RM初始化的时候,会向TC注册一个分支事务,这就是通过@LocalTCC这个注解,来发起注册的。而注册请求中等的resourceId,其实就是@TwoPhaseBusinessAction注解中的name属性值。


下面我们再看一段全局事务提交成功的seata server的日志:

2020-09-23 01:08:31.624  INFO --- [Thread_1_43_500] i.s.s.coordinator.DefaultCoordinator     : Begin new global transaction applicationId: order-server,transactionServiceGroup: my_test_tx_group, transactionName: create(io.seata.sample.entity.Order),timeout:60000,xid:192.168.59.143:8091:52019904118321152
2020-09-23 01:08:31.679  INFO --- [Thread_1_44_500] i.seata.server.coordinator.AbstractCore  : Register branch successfully, xid = 192.168.59.143:8091:52019904118321152, branchId = 52019904344813568, resourceId = orderApi ,lockKeys = null
2020-09-23 01:08:31.680  INFO --- [LoggerPrint_1_1] i.s.c.r.p.server.BatchLogHandler         : SeataMergeMessage xid=192.168.59.143:8091:52019904118321152,branchType=TCC,resourceId=orderApi,lockKey=null
,clientIp:192.168.59.1,vgroup:my_test_tx_group
2020-09-23 01:08:32.483  INFO --- [LoggerPrint_1_1] i.s.c.r.p.server.BatchLogHandler         : SeataMergeMessage xid=192.168.59.143:8091:52019904118321152,branchType=TCC,resourceId=storageApi,lockKey=null
,clientIp:192.168.59.1,vgroup:my_test_tx_group
2020-09-23 01:08:32.483  INFO --- [Thread_1_45_500] i.seata.server.coordinator.AbstractCore  : Register branch successfully, xid = 192.168.59.143:8091:52019904118321152, branchId = 52019907721228288, resourceId = storageApi ,lockKeys = null
2020-09-23 01:08:32.559  INFO --- [LoggerPrint_1_1] i.s.c.r.p.server.BatchLogHandler         : SeataMergeMessage xid=192.168.59.143:8091:52019904118321152,branchType=TCC,resourceId=accountApi,lockKey=null
,clientIp:192.168.59.1,vgroup:my_test_tx_group
2020-09-23 01:08:32.559  INFO --- [Thread_1_46_500] i.seata.server.coordinator.AbstractCore  : Register branch successfully, xid = 192.168.59.143:8091:52019904118321152, branchId = 52019908039995392, resourceId = accountApi ,lockKeys = null
2020-09-23 01:08:32.579  INFO --- [LoggerPrint_1_1] i.s.c.r.p.server.BatchLogHandler         : SeataMergeMessage xid=192.168.59.143:8091:52019904118321152,extraData=null
,clientIp:192.168.59.1,vgroup:my_test_tx_group
2020-09-23 01:08:32.627  INFO --- [Thread_1_47_500] io.seata.server.coordinator.DefaultCore  : Committing global transaction is successfully done, xid = 192.168.59.143:8091:52019904118321152.

TM通知TC事务状态


上一篇文章我们讲全局事务的开启,讲到了开启全局事务的方法调用是在类TransactionalTemplate的execute方法,代码如下:

public Object execute(TransactionalExecutor business) throws Throwable {
    // 1 get transactionInfo
    TransactionInfo txInfo = business.getTransactionInfo();
    if (txInfo == null) {
        throw new ShouldNeverHappenException("transactionInfo does not exist");
    }
    // 1.1 get or create a transaction
    GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();
    // 1.2 Handle the Transaction propatation and the branchType
    //省略事务传播机制的代码
        try {
            // 2. begin transaction
            beginTransaction(txInfo, tx);//开启事务
            Object rs = null;
            try {
                // Do Your Business
                rs = business.execute();
            } catch (Throwable ex) {
                // 3.the needed business exception to rollback.
                completeTransactionAfterThrowing(txInfo, tx, ex);//全局事务异常回滚
                throw ex;
            }
            // 4. everything is fine, commit.
            commitTransaction(tx);//提交全局事务
            return rs;
        } finally {
            //5. clear
            triggerAfterCompletion();//预留钩子,没有做什么操作
            cleanUp();
        }
    } finally {
        tx.resume(suspendedResourcesHolder);
    }
}

上面的方法中,捕获到异常后,会进行全局事务的回滚,执行方法completeTransactionAfterThrowing,代码如下:

private void completeTransactionAfterThrowing(TransactionInfo txInfo, GlobalTransaction tx, Throwable originalException) throws TransactionalExecutor.ExecutionException {
    //roll back
    if (txInfo != null && txInfo.rollbackOn(originalException)) {
        try {
            rollbackTransaction(tx, originalException);
        } catch (TransactionException txe) {
            // Failed to rollback
            throw new TransactionalExecutor.ExecutionException(tx, txe,
                    TransactionalExecutor.Code.RollbackFailure, originalException);
        }
    } else {
        // not roll back on this exception, so commit
        commitTransaction(tx);
    }
}
private void rollbackTransaction(GlobalTransaction tx, Throwable originalException) throws TransactionException, TransactionalExecutor.ExecutionException {
    triggerBeforeRollback();
    tx.rollback();//回滚全局事务
    triggerAfterRollback();
    // 3.1 Successfully rolled back
    throw new TransactionalExecutor.ExecutionException(tx, GlobalStatus.RollbackRetrying.equals(tx.getLocalStatus())
        ? TransactionalExecutor.Code.RollbackRetrying : TransactionalExecutor.Code.RollbackDone, originalException);
}

上面tx.rollback走的是DefaultGlobalTransaction的rollback方法,代码如下:

public void rollback() throws TransactionException {
    //省略部分代码
    int retry = ROLLBACK_RETRY_COUNT <= 0 ? DEFAULT_TM_ROLLBACK_RETRY_COUNT : ROLLBACK_RETRY_COUNT;//重试次数是在文件里面配置的,默认5次
    try {
        while (retry > 0) 
            try {
                status = transactionManager.rollback(xid);//回滚操作
                break;
            } catch (Throwable ex) {
                LOGGER.error("Failed to report global rollback [{}],Retry Countdown: {}, reason: {}", this.getXid(), retry, ex.getMessage());
                retry--;
                if (retry == 0) {
                    throw new TransactionException("Failed to report global rollback", ex);
                }
            }
        }
    }//省略部分源代码
}

上面代码中rollback调用了DefaultTransactionManager的rollback方法,最终调用netty通知TC做rollback操作,代码如下:

public GlobalStatus rollback(String xid) throws TransactionException {
    GlobalRollbackRequest globalRollback = new GlobalRollbackRequest();
    globalRollback.setXid(xid);
    GlobalRollbackResponse response = (GlobalRollbackResponse) syncCall(globalRollback);//跟开启事务一样,这里调用netty通知TC全局事务回滚
    return response.getGlobalStatus();
}

而通知TC全局事务提交的方法,跟上面的流程完全一样,从invoke方法中commitTransaction方法说起,代码如下:

private void commitTransaction(GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
    try {
        triggerBeforeCommit();
        tx.commit();
        triggerAfterCommit();
    } catch (TransactionException txe) {
        // 4.1 Failed to commit
        throw new TransactionalExecutor.ExecutionException(tx, txe,
            TransactionalExecutor.Code.CommitFailure);
    }
}

上面的tx.commit()调用DefaultGlobalTransaction的commit方法,代码如下:

public void commit() throws TransactionException {
    //省略部分代码
    int retry = COMMIT_RETRY_COUNT <= 0 ? DEFAULT_TM_COMMIT_RETRY_COUNT : COMMIT_RETRY_COUNT;
    try {
        while (retry > 0) {
            try {
                status = transactionManager.commit(xid);
                break;
            } catch (Throwable ex) {
                LOGGER.error("Failed to report global commit [{}],Retry Countdown: {}, reason: {}", this.getXid(), retry, ex.getMessage());
                retry--;
                if (retry == 0) {
                    throw new TransactionException("Failed to report global commit", ex);
                }
            }
        }
    }//省略部分代码
}

DefaultTransactionManager的commit方法,最终调用netty通知TC做commit操作,代码如下:

public GlobalStatus commit(String xid) throws TransactionException {
    GlobalCommitRequest globalCommit = new GlobalCommitRequest();
    globalCommit.setXid(xid);
    GlobalCommitResponse response = (GlobalCommitResponse) syncCall(globalCommit);
    return response.getGlobalStatus();
}

而上面的类关系调用我们也可以用之前开启全局事务的UML类图来表示:

微信图片_20221212155921.png

TC通知RM分支事务提交


上面讲的全局事务的提交和回滚代码中,TM向RM发送了2个消息,GlobalCommitRequest和GlobalRollbackRequest,我们来看TC端是怎么处理这2个请求的。


这次我们把上篇文张的第二张UML类图贴出来,可以看到跟GlobalBeginRequest同一个继承关系的还有2个,GlobalCommitRequest和GlobalRollbackRequest,这2个就是TC对事务提交和回滚消息的处理。

微信图片_20221212155948.png

我们先来看全局事务提交,在GlobalCommitRequest的handle方法,代码如下:

public AbstractTransactionResponse handle(RpcContext rpcContext) {
    return handler.handle(this, rpcContext);//这个方法调用了AbstractTCInboundHandler类的handle方法
}
public GlobalCommitResponse handle(GlobalCommitRequest request, final RpcContext rpcContext) {
    GlobalCommitResponse response = new GlobalCommitResponse();
    response.setGlobalStatus(GlobalStatus.Committing);
    exceptionHandleTemplate(new AbstractCallback<GlobalCommitRequest, GlobalCommitResponse>() {
        @Override
        public void execute(GlobalCommitRequest request, GlobalCommitResponse response)
            throws TransactionException {
            try {
                doGlobalCommit(request, response, rpcContext);//这里调用了DefaultCoordinator类的doGlobalCommit方法
            } catch (StoreException e) {
                throw new TransactionException(TransactionExceptionCode.FailedStore,
                    String.format("global commit request failed. xid=%s, msg=%s", request.getXid(), e.getMessage()),
                    e);
            }
        }
        //省略部分代码
    }, request, response);
    return response;
}

DefaultCoordinator类的doGlobalCommit方法如下:

protected void doGlobalCommit(GlobalCommitRequest request, GlobalCommitResponse response, RpcContext rpcContext)
    throws TransactionException {
    response.setGlobalStatus(core.commit(request.getXid())/**DefaultCore的commit方法**/);
}

上面DefaultCore的commit方法看一下

public GlobalStatus commit(String xid) throws TransactionException {
    GlobalSession globalSession = SessionHolder.findGlobalSession(xid);
    if (globalSession == null) {
        return GlobalStatus.Finished;
    }
    globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
    // just lock changeStatus
    boolean shouldCommit = SessionHolder.lockAndExecute(globalSession, () -> {
        // the lock should release after branch commit
        // Highlight: Firstly, close the session, then no more branch can be registered.
        globalSession.closeAndClean();
        if (globalSession.getStatus() == GlobalStatus.Begin) {
            globalSession.changeStatus(GlobalStatus.Committing);
            return true;
        }
        return false;
    });
    if (!shouldCommit) {
        return globalSession.getStatus();
    }
    if (globalSession.canBeCommittedAsync()) {
        globalSession.asyncCommit();
        return GlobalStatus.Committed;
    } else {
        doGlobalCommit(globalSession, false);
    }
    return globalSession.getStatus();
}

看看上面的doGlobalCommit方法

public boolean doGlobalCommit(GlobalSession globalSession, boolean retrying) throws TransactionException {
    boolean success = true;
    // start committing event
    eventBus.post(new GlobalTransactionEvent(globalSession.getTransactionId(), GlobalTransactionEvent.ROLE_TC,
        globalSession.getTransactionName(), globalSession.getBeginTime(), null, globalSession.getStatus()));
    if (globalSession.isSaga()) {
        success = getCore(BranchType.SAGA).doGlobalCommit(globalSession, retrying);
    } else {
        for (BranchSession branchSession : globalSession.getSortedBranches()) {//取出所有分支事务,然后提交
            BranchStatus currentStatus = branchSession.getStatus();
            if (currentStatus == BranchStatus.PhaseOne_Failed) {
                globalSession.removeBranch(branchSession);
                continue;
            }
            try {
                BranchStatus branchStatus = getCore(branchSession.getBranchType()).branchCommit(globalSession, branchSession);//这儿是提交分支事务的代码
            //省略部分代码
            } catch (Exception ex) {
                StackTraceLogger.error(LOGGER, ex, "Committing branch transaction exception: {}",
                    new String[] {branchSession.toString()});
                if (!retrying) {
                    globalSession.queueToRetryCommit();
                    throw new TransactionException(ex);
                }
            }
        }
        if (globalSession.hasBranch()) {
            LOGGER.info("Committing global transaction is NOT done, xid = {}.", globalSession.getXid());
            return false;
        }
    }
    if (success) {
        SessionHelper.endCommitted(globalSession);
        // committed event
        eventBus.post(new GlobalTransactionEvent(globalSession.getTransactionId(), GlobalTransactionEvent.ROLE_TC,
            globalSession.getTransactionName(), globalSession.getBeginTime(), System.currentTimeMillis(),
            globalSession.getStatus()));
        LOGGER.info("Committing global transaction is successfully done, xid = {}.", globalSession.getXid());
    }
    return success;
}

上面的提交分支事务的方法在AbstractCore类,代码如下:

public BranchStatus branchCommit(GlobalSession globalSession, BranchSession branchSession) throws TransactionException {
    try {
        BranchCommitRequest request = new BranchCommitRequest();
        request.setXid(branchSession.getXid());
        request.setBranchId(branchSession.getBranchId());
        request.setResourceId(branchSession.getResourceId());
        request.setApplicationData(branchSession.getApplicationData());
        request.setBranchType(branchSession.getBranchType());
        return branchCommitSend(request, globalSession, branchSession);
    } catch (IOException | TimeoutException e) {
        throw new BranchTransactionException(FailedToSendBranchCommitRequest,
                String.format("Send branch commit failed, xid = %s branchId = %s", branchSession.getXid(),
                        branchSession.getBranchId()), e);
    }
}

下面这个方法就是向RM发送分支提交事务的请求

protected BranchStatus branchCommitSend(BranchCommitRequest request, GlobalSession globalSession,
                                        BranchSession branchSession) throws IOException, TimeoutException {
    BranchCommitResponse response = (BranchCommitResponse) remotingServer.sendSyncRequest(
            branchSession.getResourceId(), branchSession.getClientId(), request);
    return response.getBranchStatus();
}

RM处理TC提交事务请求


RM收到这个请求后,是怎么处理的呢?还记得《阿里中间件seata源码剖析一:聊聊RM和TM客户端初始化》中RM初始化吗,RM初始化的适合会调用AbstractNettyRemotingClient的构造函数,代码如下:

public AbstractNettyRemotingClient(NettyClientConfig nettyClientConfig, EventExecutorGroup eventExecutorGroup,
                                   ThreadPoolExecutor messageExecutor, NettyPoolKey.TransactionRole transactionRole) {
    super(messageExecutor);
    this.transactionRole = transactionRole;
    clientBootstrap = new NettyClientBootstrap(nettyClientConfig, eventExecutorGroup, transactionRole);
    clientBootstrap.setChannelHandlers(new ClientHandler());//客户端处理请求的handler
    clientChannelManager = new NettyClientChannelManager(
        new NettyPoolableFactory(this, clientBootstrap), getPoolKeyFunction(), nettyClientConfig);
}

上面的ClientHandler就是处理收到的请求的,看一下channelRead方法,代码如下:

public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
    if (!(msg instanceof RpcMessage)) {
        return;
    }
    processMessage(ctx, (RpcMessage) msg);
}

上面的方法使用RmBranchCommitProcessor方法处理读入的请求,代码如下:

public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
    String remoteAddress = NetUtil.toStringAddress(ctx.channel().remoteAddress());
    Object msg = rpcMessage.getBody();
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("rm client handle branch commit process:" + msg);
    }
    handleBranchCommit(rpcMessage, remoteAddress, (BranchCommitRequest) msg);
}
private void handleBranchCommit(RpcMessage request, String serverAddress, BranchCommitRequest branchCommitRequest) {
    BranchCommitResponse resultMessage;
    resultMessage = (BranchCommitResponse) handler.onRequest(branchCommitRequest, null);//处理提交请求
    if (LOGGER.isDebugEnabled()) {
        LOGGER.debug("branch commit result:" + resultMessage);
    }
    try {
        this.remotingClient.sendAsyncResponse(serverAddress, request, resultMessage);
    } catch (Throwable throwable) {
        LOGGER.error("branch commit error: {}", throwable.getMessage(), throwable);
    }
}

上面提交事务提交请求的方法用的是AbstractRMHandler类,代码如下:

public AbstractResultMessage onRequest(AbstractMessage request, RpcContext context) {
    if (!(request instanceof AbstractTransactionRequestToRM)) {
        throw new IllegalArgumentException();
    }
    AbstractTransactionRequestToRM transactionRequest = (AbstractTransactionRequestToRM)request;
    transactionRequest.setRMInboundMessageHandler(this);
    return transactionRequest.handle(context);
}

上面调用BranchCommitRequest的handle方法,代码如下:

public AbstractTransactionResponse handle(RpcContext rpcContext) {
    return handler.handle(this);
}

这里调用了DefaultRMHandler的handle方法,代码如下:

public BranchCommitResponse handle(BranchCommitRequest request) {
    return getRMHandler(request.getBranchType()).handle(request);
}

最后调用了AbstractRMHandler类的handle方法:

public BranchCommitResponse handle(BranchCommitRequest request) {
    BranchCommitResponse response = new BranchCommitResponse();
    exceptionHandleTemplate(new AbstractCallback<BranchCommitRequest, BranchCommitResponse>() {
        @Override
        public void execute(BranchCommitRequest request, BranchCommitResponse response)
            throws TransactionException {
            doBranchCommit(request, response);
        }
    }, request, response);
    return response;
}
protected void doBranchCommit(BranchCommitRequest request, BranchCommitResponse response)
    throws TransactionException {
    String xid = request.getXid();
    long branchId = request.getBranchId();
    String resourceId = request.getResourceId();
    String applicationData = request.getApplicationData();
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Branch committing: " + xid + " " + branchId + " " + resourceId + " " + applicationData);
    }
    BranchStatus status = getResourceManager().branchCommit(request.getBranchType(), xid, branchId, resourceId,
        applicationData);//调用TCCResourceManager的branchCommit方法
    response.setXid(xid);
    response.setBranchId(branchId);
    response.setBranchStatus(status);
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Branch commit result: " + status);
    }
}

TCCResourceManager的branchCommit方法代码如下:

public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
                                 String applicationData) throws TransactionException {
    TCCResource tccResource = (TCCResource)tccResourceCache.get(resourceId);
    if (tccResource == null) {
        throw new ShouldNeverHappenException(String.format("TCC resource is not exist, resourceId: %s", resourceId));
    }
    Object targetTCCBean = tccResource.getTargetBean();
    Method commitMethod = tccResource.getCommitMethod();
    if (targetTCCBean == null || commitMethod == null) {
        throw new ShouldNeverHappenException(String.format("TCC resource is not available, resourceId: %s", resourceId));
    }
    try {
        //BusinessActionContext
        BusinessActionContext businessActionContext = getBusinessActionContext(xid, branchId, resourceId,
            applicationData);
        Object ret = commitMethod.invoke(targetTCCBean, businessActionContext);//最终触发了两阶段的commit方法
        LOGGER.info("TCC resource commit result : {}, xid: {}, branchId: {}, resourceId: {}", ret, xid, branchId, resourceId);
        boolean result;
        if (ret != null) {
            if (ret instanceof TwoPhaseResult) {
                result = ((TwoPhaseResult)ret).isSuccess();
            } else {
                result = (boolean)ret;
            }
        } else {
            result = true;
        }
        return result ? BranchStatus.PhaseTwo_Committed : BranchStatus.PhaseTwo_CommitFailed_Retryable;
    } catch (Throwable t) {
        String msg = String.format("commit TCC resource error, resourceId: %s, xid: %s.", resourceId, xid);
        LOGGER.error(msg, t);
        return BranchStatus.PhaseTwo_CommitFailed_Retryable;
    }
}

跟踪到这里,我们就找到了TwoPhaseBusinessAction注解中定义的commitMethod。这一段代码的调用我用如下的UML类图来表示一下:

微信图片_20221212160429.png

总结


本文我介绍了TCC模式中的2阶段提交,最终找到了触发TwoPhaseBusinessAction注解中commitMethod和rollbackMethod方法的代码逻辑。


通过本文的代码跟踪我们更加了解seata的交互处理方法,TC端处理请求的方法在ServerHandler类,它是AbstractNettyRemotingServer的内部类,而RM处理通知的方法在ClientHandler类,它是AbstractNettyRemotingClient的内部类。


ServerHandler类和ClientHandler类都是在处理Processor,它的抽象是RemotingProcessor接口,TC和TM、RM的处理都有具体的实现。这里给出所有的实现类,我把它分成了客户端相关的、server端相关、RM相关和TM相关的,UML类图如下:

微信图片_20221212160451.png

相关文章
|
19天前
|
存储 NoSQL 架构师
阿里面试:聊聊 CAP 定理?哪些中间件是AP?为什么?
本文深入探讨了分布式系统中的“不可能三角”——CAP定理,即一致性(C)、可用性(A)和分区容错性(P)三者无法兼得。通过实例分析了不同场景下如何权衡CAP,并介绍了几种典型分布式中间件的CAP策略,强调了理解CAP定理对于架构设计的重要性。
50 4
|
23天前
|
数据库 微服务
SEATA模式
Seata 是一款开源的分布式事务解决方案,支持多种事务模式以适应不同的应用场景。其主要模式包括:AT(TCC)模式,事务分三阶段执行;TCC 模式,提供更灵活的事务控制;SAGA 模式,基于状态机实现跨服务的事务一致性;XA 模式,采用传统两阶段提交协议确保数据一致性。
41 5
|
29天前
Seata框架在AT模式下是如何保证数据一致性的?
通过以上这些机制的协同作用,Seata 在 AT 模式下能够有效地保证数据的一致性,确保分布式事务的可靠执行。你还可以进一步深入研究 Seata 的具体实现细节,以更好地理解其数据一致性保障的原理。
40 3
|
6月前
|
Apache 开发者
Apache Seata 如何解决 TCC 模式的幂等、悬挂和空回滚问题
【6月更文挑战第8天】Apache Seata 是一款分布式事务框架,解决TCC模式下的幂等、悬挂和空回滚问题。通过记录事务状态处理幂等,设置超时机制避免悬挂,明确标记Try操作成功来处理空回滚。Seata 提供丰富配置和管理功能,确保分布式事务的可靠性和效率,支持复杂事务处理场景,为企业业务发展提供支持。
242 7
|
6月前
|
消息中间件 存储 NoSQL
阿里开源中间件一览
阿里开源中间件一览
425 2
|
7月前
|
消息中间件 存储 负载均衡
消息中间件的选择:RabbitMQ是一个明智的选择
消息中间件的选择:RabbitMQ是一个明智的选择
120 0
|
6月前
|
消息中间件 存储 中间件
【消息中间件】详解三大MQ:RabbitMQ、RocketMQ、Kafka
【消息中间件】详解三大MQ:RabbitMQ、RocketMQ、Kafka
1746 0
|
5月前
|
消息中间件 编解码 Docker
Docker部署RabbitMQ消息中间件
【7月更文挑战第4天】Docker部署RabbitMQ消息中间件
285 3
|
2月前
|
消息中间件 编解码 Docker
【Docker项目实战】Docker部署RabbitMQ消息中间件
【10月更文挑战第8天】Docker部署RabbitMQ消息中间件
128 1
【Docker项目实战】Docker部署RabbitMQ消息中间件
|
4月前
|
消息中间件 Java 测试技术
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
这篇文章是关于如何在SpringBoot应用中整合RabbitMQ的消息中间件。内容包括了在SpringBoot项目中添加RabbitMQ的依赖、配置文件设置、启动类注解,以及如何通过单元测试来创建交换器、队列、绑定,并发送和接收消息。文章还介绍了如何配置消息转换器以支持对象的序列化和反序列化,以及如何使用注解`@RabbitListener`来接收消息。
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】