阿里中间件seata源码剖析一:聊聊RM和TM客户端初始化

本文涉及的产品
Serverless 应用引擎 SAE,800核*时 1600GiB*时
云原生网关 MSE Higress,422元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 阿里中间件seata源码剖析一:聊聊RM和TM客户端初始化

微信图片_20221212145948.jpg这篇开始,介绍一下seata的源代码。我们再看一下seata官方TCC模式下的这张图片:

微信图片_20221212150008.png

而RM和TC的职责如下:

Transaction Coordinator(TC): Maintain status of global and branch transactions, drive the global commit or rollback.
Resource Manager(RM): Manage resources that branch transactions working on, talk to TC for registering branch transactions and reporting status of branch transactions, and drive the branch transaction commit or rollback.

简单概括就是RM管理分支事务,而TC管理着全局事务和分支事务的状态。


说明:本文的代码基于文件方式注册TC(seata-server)来讲解,基于seata最新代码。


初始化流程


我们启动一个集成seata的服务,这个服务自身就是一个RM,启动后第一步就是注册到TC,我们看一下启动日志:

2020-08-30 16:51:32.946  INFO 1108808 --- [eoutChecker_1_1] i.s.c.r.netty.NettyClientChannelManager  : will connect to 192.168.59.132:8091
2020-08-30 16:51:32.950  INFO 1108808 --- [eoutChecker_1_1] i.s.core.rpc.netty.NettyPoolableFactory  : NettyPool create channel to transactionRole:TMROLE,address:192.168.59.132:8091,msg:< RegisterTMRequest{applicationId='order-server', transactionServiceGroup='my_test_tx_group'} >
2020-08-30 16:51:33.276  INFO 1108808 --- [eoutChecker_1_1] i.s.c.rpc.netty.TmNettyRemotingClient    : register TM success. client version:1.3.0, server version:1.3.0,channel:[id: 0xfabece13, L:/192.168.59.1:63274 - R:/192.168.59.132:8091]
2020-08-30 16:51:33.277  INFO 1108808 --- [eoutChecker_1_1] i.s.core.rpc.netty.NettyPoolableFactory  : register success, cost 261 ms, version:1.3.0,role:TMROLE,channel:[id: 0xfabece13, L:/192.168.59.1:63274 - R:/192.168.59.132:8091]

RM注册为TC的客户端,我们就从这个客户端初始化开始,这个逻辑在类GlobalTransactionScanner中,URL类图如下:

微信图片_20221212150114.png

GlobalTransactionScanner这个类的初始化在GlobalTransactionAutoConfiguration类,这个类是一个spring的Configuration 类,里面定义了Bean(GlobalTransactionScanner),代码如下:

@Configuration
@EnableConfigurationProperties(SeataProperties.class)
public class GlobalTransactionAutoConfiguration {
  private final ApplicationContext applicationContext;
  private final SeataProperties seataProperties;
  public GlobalTransactionAutoConfiguration(ApplicationContext applicationContext,
      SeataProperties seataProperties) {
    this.applicationContext = applicationContext;
    this.seataProperties = seataProperties;
  }
  @Bean
  public GlobalTransactionScanner globalTransactionScanner() {
    String applicationName = applicationContext.getEnvironment()
        .getProperty("spring.application.name");//来自application.yml参数(见下面注意)
    String txServiceGroup = seataProperties.getTxServiceGroup();//seataProperties定义了txServiceGroup变量,这个变量对应application.yml参数(见下面注意)
    if (StringUtils.isEmpty(txServiceGroup)) {
      txServiceGroup = applicationName + "-fescar-service-group";
      seataProperties.setTxServiceGroup(txServiceGroup);
    }
    return new GlobalTransactionScanner(applicationName, txServiceGroup);
  }
}

注意:上面方法中的2个参数正是来自我们服务中的application.yml文件,代码如下:

spring:
    application:
        name: storage-server
    cloud:
        alibaba:
            seata:
                tx-service-group: my_test_tx_group

上面的globalTransactionScanner这个bean实现了spring的InitializingBean接口,初始化结束后会向TC注册客户端,代码如下:

public void afterPropertiesSet() {
    if (disableGlobalTransaction) {//在seata的file.conf文件中配置
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Global transaction is disabled.");
        }
        return;
    }
    initClient();
}
private void initClient() {
    //省略部分源代码
    //init TM
    TMClient.init(applicationId, txServiceGroup);
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Transaction Manager Client is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
    }
    //init RM
    RMClient.init(applicationId, txServiceGroup);
    //省略部分源代码
    registerSpringShutdownHook();
}

上面的代码有2个部分,TM和RM的初始化,这2个初始化就是连接TC的过程。介绍TM和RM初始化之前,我们再看一下上一讲中我们基于eureka注册中心的微服务架构图:

微信图片_20221212150254.png

从图上可以看出,订单中心(order-server)就是一个TM,同时也是一个RM。


TM初始化


我们看一下TM的初始化代码:

public class TMClient {
    public static void init(String applicationId, String transactionServiceGroup) {
        TmNettyRemotingClient tmNettyRemotingClient = TmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup);
        tmNettyRemotingClient.init();
    }
}

这样,TM客户端就去连seata-server了(TC)。

public void init() {
    // registry processor
    registerProcessor();
    if (initialized.compareAndSet(false, true)) {
        super.init();
    }
}

下面的注册方法一个是向注册了接收响应消息类型,另一个是注册了心跳机制:

private void registerProcessor() {
    // 1.registry TC response processor
    ClientOnResponseProcessor onResponseProcessor =
        new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), getTransactionMessageHandler());
    super.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_REG_CLT_RESULT, onResponseProcessor, null);
    // 2.registry heartbeat message processor
    ClientHeartbeatProcessor clientHeartbeatProcessor = new ClientHeartbeatProcessor();
    super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, clientHeartbeatProcessor, null);
}

接着init方法去连接TC,这个逻辑在类AbstractNettyRemotingClient,代码如下:

public void init() {
    timerExecutor.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            clientChannelManager.reconnect(getTransactionServiceGroup());//这里的transactionServiceGroup是从GlobalTransactionScanner传过来的,值是my_test_tx_group
        }
    }, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);
    if (NettyClientConfig.isEnableClientBatchSendRequest()) {//file.conf中定义enableClientBatchSendRequest = true,看第3部分讲解
        mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,
            MAX_MERGE_SEND_THREAD,
            KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(),
            new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
        mergeSendExecutorService.submit(new MergedSendRunnable());
    }
    super.init();
    clientBootstrap.start();
}

上面这段代码是一个核心的逻辑,我们分下面3步来解读:

1.我们首先看一下上面的clientChannelManager.reconnect方法,这个方法在一个定时执行器中,会定时去执行。这段代码在NettyClientChannelManager类,

void reconnect(String transactionServiceGroup) {
    List<String> availList = null;
    try {
        availList = getAvailServerList(transactionServiceGroup);
    } catch (Exception e) {
        LOGGER.error("Failed to get available servers: {}", e.getMessage(), e);
        return;
    }
    if (CollectionUtils.isEmpty(availList)) {
        String serviceGroup = RegistryFactory.getInstance()
                                             .getServiceGroup(transactionServiceGroup);
        LOGGER.error("no available service '{}' found, please make sure registry config correct", serviceGroup);
        return;
    }
    for (String serverAddress : availList) {
        try {
            acquireChannel(serverAddress);
        } catch (Exception e) {
            LOGGER.error("{} can not connect to {} cause:{}",FrameworkErrorCode.NetConnect.getErrCode(), serverAddress, e.getMessage(), e);
        }
    }
}

上面的getAvailServerList是通过transactionServiceGroup这个属性(本文属性值是my_test_tx_group)来查找seata-server集群地址列表,看下面的file.conf这段配置就很容易明白了。逻辑就是通过my_test_tx_group拼接出vgroupMapping.my_test_tx_group,然后找到这个属性值(这里是seata-server),然后属性值拼接出seata-server.grouplist,从而查找seata-server集群列表。详细代码就不贴了。

service {
  #transaction service group mapping
  vgroupMapping.my_test_tx_group = "seata-server"
  #only support when registry.type=file, please don't set multiple addresses
  seata-server.grouplist = "192.168.59.132:8091"
  #degrade, current not support
  enableDegrade = false
  #disable seata
  disableGlobalTransaction = false
}

上面获取的availList(seata-server集群地址列表)如果不空,则调用方法acquireChannel。acquireChannel方法首先判断连接是否存在,不存在,则创建连接:

Channel acquireChannel(String serverAddress) {
    Channel channelToServer = channels.get(serverAddress);
    if (channelToServer != null) {//当前地址已经存在连接,直接返回
        channelToServer = getExistAliveChannel(channelToServer, serverAddress);
        if (channelToServer != null) {
            return channelToServer;
        }
    }
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("will connect to " + serverAddress);
    }
    channelLocks.putIfAbsent(serverAddress, new Object());
    synchronized (channelLocks.get(serverAddress)) {
        return doConnect(serverAddress);
    }
}
private final GenericKeyedObjectPool<NettyPoolKey, Channel> nettyClientKeyPool;
private Channel doConnect(String serverAddress) {
    Channel channelToServer = channels.get(serverAddress);
    if (channelToServer != null && channelToServer.isActive()) {//当前地址已经存在连接,直接返回
        return channelToServer;
    }
    Channel channelFromPool;
    try {
        NettyPoolKey currentPoolKey = poolKeyFunction.apply(serverAddress);
        NettyPoolKey previousPoolKey = poolKeyMap.putIfAbsent(serverAddress, currentPoolKey);
        if (previousPoolKey != null && previousPoolKey.getMessage() instanceof RegisterRMRequest) {//TM和RM的初始化流程都要走这段代码,如果是RM,则要set一下ResourceIds,还记得这个吗?看下面RM部分的讲解
            RegisterRMRequest registerRMRequest = (RegisterRMRequest) currentPoolKey.getMessage();
            ((RegisterRMRequest) previousPoolKey.getMessage()).setResourceIds(registerRMRequest.getResourceIds());
        }
        channelFromPool = nettyClientKeyPool.borrowObject(poolKeyMap.get(serverAddress));
        channels.put(serverAddress, channelFromPool);
    } catch (Exception exx) {
        LOGGER.error("{} register RM failed.",FrameworkErrorCode.RegisterRM.getErrCode(), exx);
        throw new FrameworkException("can not register RM,err:" + exx.getMessage());
    }
    return channelFromPool;
}

上面nettyClientKeyPool.borrowObject方法就是从连接池中获取一个连接,seata在这里使用的连接池是commons-pool,读过源码的人肯定会似曾相识,我这类就不再介绍了,感兴趣的看这篇文章《lettuce连接池很香,撸撸它的源代码》,里面有介绍。

2.接着我们讲一下super.init()方法,这个方法在父类AbstractNettyRemoting,代码如下:

public void init() {
    timerExecutor.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            for (Map.Entry<Integer, MessageFuture> entry : futures.entrySet()) {
                if (entry.getValue().isTimeout()) {
                    futures.remove(entry.getKey());
                    entry.getValue().setResultMessage(null);
                    if (LOGGER.isDebugEnabled()) {
                        LOGGER.debug("timeout clear future: {}", entry.getValue().getRequestMessage().getBody());
                    }
                }
            }
            nowMills = System.currentTimeMillis();
        }
    }, TIMEOUT_CHECK_INTERNAL, TIMEOUT_CHECK_INTERNAL, TimeUnit.MILLISECONDS);
}

这个方法非常简单,定时任务不断检测消息发送结果,如果是超时,则移除消息,然后把消息结果置为空。

3.最后我们看一下clientBootstrap.start()方法:

public void start() {
    if (this.defaultEventExecutorGroup == null) {//defaultEventExecutorGroup初始化
        this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(nettyClientConfig.getClientWorkerThreads(),
            new NamedThreadFactory(getThreadPrefix(nettyClientConfig.getClientWorkerThreadPrefix()),
                nettyClientConfig.getClientWorkerThreads()));
    }
    this.bootstrap.group(this.eventLoopGroupWorker).channel(//对连接的配置
        nettyClientConfig.getClientChannelClazz()).option(
        ChannelOption.TCP_NODELAY, true).option(ChannelOption.SO_KEEPALIVE, true).option(
        ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis()).option(
        ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize()).option(ChannelOption.SO_RCVBUF,
        nettyClientConfig.getClientSocketRcvBufSize());
    if (nettyClientConfig.enableNative()) {
        if (PlatformDependent.isOsx()) {
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("client run on macOS");
            }
        } else {//非mac系统则配置epoll模式和TCP快速确认机制
            bootstrap.option(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED)
                .option(EpollChannelOption.TCP_QUICKACK, true);
        }
    }
    bootstrap.handler(//添加handlers
        new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) {
                ChannelPipeline pipeline = ch.pipeline();
                pipeline.addLast(
                    new IdleStateHandler(nettyClientConfig.getChannelMaxReadIdleSeconds(),
                        nettyClientConfig.getChannelMaxWriteIdleSeconds(),
                        nettyClientConfig.getChannelMaxAllIdleSeconds()))
                    .addLast(new ProtocolV1Decoder())
                    .addLast(new ProtocolV1Encoder());
                if (channelHandlers != null) {
                    addChannelPipelineLast(ch, channelHandlers);
                }
            }
        });
    if (initialized.compareAndSet(false, true) && LOGGER.isInfoEnabled()) {
        LOGGER.info("NettyClientBootstrap has started");
    }
}

上面的代码也比较简单,就是使用本地的配置来初始化netty的bootstrap。这些配置在file.conf这个文件中。


RM初始化


RM的初始化跟TM基本一样,我们从RMClient.init(applicationId, txServiceGroup)方法讲起。

public static void init(String applicationId, String transactionServiceGroup) {
    RmNettyRemotingClient rmNettyRemotingClient = RmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup);
    rmNettyRemotingClient.setResourceManager(DefaultResourceManager.get());
    rmNettyRemotingClient.setTransactionMessageHandler(DefaultRMHandler.get());
    rmNettyRemotingClient.init();
}

下面是RmNettyRemotingClient中的init方法,跟TM的代码是一样的:

public void init() {
    // registry processor
    registerProcessor();
    if (initialized.compareAndSet(false, true)) {
        super.init();
    }
}

RM注册的响应消息类型比较多,包括分支事务提交、回滚、写undo_log、注册TC响应和向TC注册心跳,跟TM相比,这里的处理更复杂一些,前面的几篇文章我也讲过了seata的AT模式和TCC模式,无论哪种模式,其实RM的处理逻辑都是最多的。

private void registerProcessor() {
    // 1.registry rm client handle branch commit processor
    RmBranchCommitProcessor rmBranchCommitProcessor = new RmBranchCommitProcessor(getTransactionMessageHandler(), this);
    super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT, rmBranchCommitProcessor, messageExecutor);
    // 2.registry rm client handle branch commit processor
    RmBranchRollbackProcessor rmBranchRollbackProcessor = new RmBranchRollbackProcessor(getTransactionMessageHandler(), this);
    super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK, rmBranchRollbackProcessor, messageExecutor);
    // 3.registry rm handler undo log processor
    RmUndoLogProcessor rmUndoLogProcessor = new RmUndoLogProcessor(getTransactionMessageHandler());
    super.registerProcessor(MessageType.TYPE_RM_DELETE_UNDOLOG, rmUndoLogProcessor, messageExecutor);
    // 4.registry TC response processor
    ClientOnResponseProcessor onResponseProcessor =
        new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), getTransactionMessageHandler());
    super.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_REG_RM_RESULT, onResponseProcessor, null);
    // 5.registry heartbeat message processor
    ClientHeartbeatProcessor clientHeartbeatProcessor = new ClientHeartbeatProcessor();
    super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, clientHeartbeatProcessor, null);
}

上面的super.init()就是调用父类AbstractNettyRemotingClient的方法了,跟TmNettyRemotingClient是一个父类,所以不再重复贴代码了。上面留了一个问题就是resourceId,来源依然在RmNettyRemotingClient这个类中,代码如下:

protected Function<String, NettyPoolKey> getPoolKeyFunction() {
    return (serverAddress) -> {
        String resourceIds = getMergedResourceKeys();
        if (resourceIds != null && LOGGER.isInfoEnabled()) {
            LOGGER.info("RM will register :{}", resourceIds);
        }
        RegisterRMRequest message = new RegisterRMRequest(applicationId, transactionServiceGroup);
        message.setResourceIds(resourceIds);
        return new NettyPoolKey(NettyPoolKey.TransactionRole.RMROLE, serverAddress, message);
    };
}
public String getMergedResourceKeys() {
    Map<String, Resource> managedResources = resourceManager.getManagedResources();
    Set<String> resourceIds = managedResources.keySet();
    if (!resourceIds.isEmpty()) {
        StringBuilder sb = new StringBuilder();
        boolean first = true;
        for (String resourceId : resource  Ids) {
            if (first) {
                first = false;
            } else {
                sb.append(DBKEYS_SPLIT_CHAR);
            }
            sb.append(resourceId);
        }
        return sb.toString();
    }
    return null;
}

在TCC模式下,跟踪代码后,发现有一段代码给ResourceId赋值了,赋值的代码正是2阶段提交的事务注解,比如:


@TwoPhaseBusinessAction(name = "storageApi", commitMethod = "commit", rollbackMethod = "rollback")

赋值的代码如下,可以看到TCC模式下resourceId其实就是2阶段提交的注解中的name值,其他模式暂时不做研究了,这段代码在DefaultRemotingParser类:

public RemotingDesc parserRemotingServiceInfo(Object bean, String beanName, RemotingParser remotingParser) {
    RemotingDesc remotingBeanDesc = remotingParser.getServiceDesc(bean, beanName);//这儿获得的Bean是在接口上有@LocalTCC注解的
    if (remotingBeanDesc == null) {
        return null;
    }
    remotingServiceMap.put(beanName, remotingBeanDesc);
    Class<?> interfaceClass = remotingBeanDesc.getInterfaceClass();
    Method[] methods = interfaceClass.getMethods();
    if (remotingParser.isService(bean, beanName)) {
        try {
            //service bean, registry resource
            Object targetBean = remotingBeanDesc.getTargetBean();
            for (Method m : methods) {
                TwoPhaseBusinessAction twoPhaseBusinessAction = m.getAnnotation(TwoPhaseBusinessAction.class);
                if (twoPhaseBusinessAction != null) {
                    TCCResource tccResource = new TCCResource();
                    tccResource.setActionName(twoPhaseBusinessAction.name());//resourceId是在这儿赋值的
                    //省略部分代码
                    DefaultResourceManager.get().registerResource(tccResource);
                }
            }
        } catch (Throwable t) {
            throw new FrameworkException(t, "parser remoting service error");
        }
    }
    //省略部分代码
    return remotingBeanDesc;
}

剩下的流程,RM和TM就一样了。


批量发送请求


在前面的TM初始化流程中,AbstractNettyRemotingClient类init方法有一段批量提交的代码,如下:

if (NettyClientConfig.isEnableClientBatchSendRequest()) {//file.conf中定义enableClientBatchSendRequest = true,看第3部分讲解
    mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,
        MAX_MERGE_SEND_THREAD,
        KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<>(),
        new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
    mergeSendExecutorService.submit(new MergedSendRunnable());
}

因为配置文件中定义的是允许批量提交的,所以这段代码会走进去,这段代码就是定义了一个只有一个线程的线程池,执行批量提交请求的线程:

private class MergedSendRunnable implements Runnable {
    @Override
    public void run() {
        while (true) {
            synchronized (mergeLock) {
                try {
                    mergeLock.wait(MAX_MERGE_SEND_MILLS);
                } catch (InterruptedException e) {
                }
            }
            isSending = true;
            for (String address : basketMap.keySet()) {
                BlockingQueue<RpcMessage> basket = basketMap.get(address);
                if (basket.isEmpty()) {
                    continue;
                }
                MergedWarpMessage mergeMessage = new MergedWarpMessage();
                while (!basket.isEmpty()) {
                    RpcMessage msg = basket.poll();
                    mergeMessage.msgs.add((AbstractMessage) msg.getBody());
                    mergeMessage.msgIds.add(msg.getId());
                }
                if (mergeMessage.msgIds.size() > 1) {
                    printMergeMessageLog(mergeMessage);
                }
                Channel sendChannel = null;
                try {
                    // send batch message is sync request, but there is no need to get the return value.
                    // Since the messageFuture has been created before the message is placed in basketMap,
                    // the return value will be obtained in ClientOnResponseProcessor.
                    sendChannel = clientChannelManager.acquireChannel(address);
                    AbstractNettyRemotingClient.this.sendAsyncRequest(sendChannel, mergeMessage);
                } catch (FrameworkException e) {
                    if (e.getErrcode() == FrameworkErrorCode.ChannelIsNotWritable && sendChannel != null) {
                        destroyChannel(address, sendChannel);
                    }
                    // fast fail
                    for (Integer msgId : mergeMessage.msgIds) {
                        MessageFuture messageFuture = futures.remove(msgId);
                        if (messageFuture != null) {
                            messageFuture.setResultMessage(null);
                        }
                    }
                    LOGGER.error("client merge call failed: {}", e.getMessage(), e);
                }
            }
            isSending = false;
        }
    }
    //省略printMergeMessageLog方法代码
}

上面的流程非常简单,在一个周期性的无线循环中,把basketMap中的消息队列取出来,把每个队列的消息都放到mergeMessage中,最后把mergeMessage发送出去,可以看到,就是一个批量发送。


总结


seata中TM和RM的初始化流程基本相同,不一样的有2点:注册响应消息类型、RM中需要增加ResourceId和分支事务handler。

TM和RM客户端使用netty跟TC(seata-server)建立连接,而连接池使用的是commons-pool的连接池。


相关文章
|
3月前
|
消息中间件 存储 NoSQL
阿里开源中间件一览
阿里开源中间件一览
157 2
|
4月前
|
算法 NoSQL Java
2023年阿里高频Java面试题:分布式+中间件+高并发+算法+数据库
又到了一年一度的金九银十,互联网行业竞争是一年比一年严峻,作为工程师的我们唯有不停地学习,不断的提升自己才能保证自己的核心竞争力从而拿到更好的薪水,进入心仪的企业(阿里、字节、美团、腾讯.....)
|
消息中间件 中间件 Kafka
限时开源!阿里内部消息中间件合集:MQ+Kafka+体系图+笔记
近好多小伙伴说在准备金三银四的面试突击了,但是遇到消息中间件不知道该怎么学了,问我有没有成体系的消息中间件的学习方式。 额,有点不知所措,于是乎小编就想着做一次消息中间件的专题,归类整理了一些纯手绘知识体系图、面试以及相关的学习笔记。
226 1
阿里Java高级岗中间件二面:GC+IO+JVM+多线程+Redis+数据库+源码
虽然“钱多、事少、离家近”的工作可能离技术人比较远,但是找到一份合适的工作,其实并不像想象中那么难。但是,有些技术人确实是认真努力工作,但在面试时表现出的能力水平却不足以通过面试,或拿到高薪,其实不外乎以下 2 个原因:
2023年阿里高频Java面试题:分布式+中间件+高并发+算法+数据库
又到了一年一度的金九银十,互联网行业竞争是一年比一年严峻,作为工程师的我们唯有不停地学习,不断的提升自己才能保证自己的核心竞争力从而拿到更好的薪水,进入心仪的企业(阿里、字节、美团、腾讯.....)
|
算法 NoSQL Java
2021年阿里高频Java面试题:分布式+中间件+高并发+算法+数据库
又到了一年一度的金九银十,互联网行业竞争是一年比一年严峻,作为工程师的我们唯有不停地学习,不断的提升自己才能保证自己的核心竞争力从而拿到更好的薪水,进入心仪的企业(阿里、字节、美团、腾讯.....)
|
消息中间件 安全 Java
全网首发!消息中间件神仙笔记,涵盖阿里十年技术精髓
消息中间件是分布式系统中的重要组件,在实际工作中常用消息中间件进行系统间数据交换,从而解决应用解耦、异步消息、流量削峰等问题,实现高性能、高可用、可伸缩和最终一致性架构。
|
缓存 NoSQL 容灾
《Java应用提速(速度与激情)》——六、阿里中间件提速
《Java应用提速(速度与激情)》——六、阿里中间件提速
|
消息中间件 NoSQL Dubbo
阿里Java高级岗中间件二面:GC+IO+JVM+多线程+Redis+数据库+源码
一转眼,都2023年了,你是否在满意的公司?拿着理想的薪水? 虽然“钱多、事少、离家近”的工作可能离技术人比较远,但是找到一份合适的工作,其实并不像想象中那么难。但是,有些技术人确实是认真努力工作,但在面试时表现出的能力水平却不足以通过面试,或拿到高薪,其实不外乎以下 2 个原因: 第一,“知其然不知其所以然”。做了多年技术,开发了很多业务应用,但似乎并未思考过种种技术选择背后的逻辑。所以,他无法向面试官展现出自己未来技术能力的成长潜力。面试官也不会放心把具有一定深度的任务交给他。 第二,知识碎片化,不成系统。在面试中,面试者似乎无法完整、清晰地描述自己所开发的系统,或者使用的相关技术。
|
消息中间件 数据采集 Java
开发神技!阿里消息中间件进阶手册限时开源,请接住我的下巴
相信大家在实际工作中都用过消息中间件进行系统间数据交换,解决应用解耦、异步消息、流量削峰等问题,由此消息中间件的强大功能想必也不用我多说了!目前业界上关于消息中间件的实现多达好几十种,可谓百花齐放,所用的实现语言同样也五花八门。不管使用哪一个消息中间件,我们的目的都是实现高性能、高可用、可伸缩和最终一致性架构。
下一篇
云函数