【微服务36】分布式事务Seata源码解析四:图解Seata Client 如何与Seata Server建立连接、通信

本文涉及的产品
注册配置 MSE Nacos/ZooKeeper,118元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
云原生网关 MSE Higress,422元/月
简介: 【微服务36】分布式事务Seata源码解析四:图解Seata Client 如何与Seata Server建立连接、通信

@[TOC]

一、前言

至此,seata系列的内容包括:

  1. can not get cluster name in registry config ‘service.vgroupMapping.xx‘, please make sure registry问题解决
  2. Seata Failed to get available servers: endpoint format should like ip:port 报错原因/解决方案汇总版(看完本文必解决问题)
  3. Seata json decode exception, Cannot construct instance of java.time.LocalDateTime报错原因/解决方案最全汇总版
  4. 【微服务 31】超细的Spring Cloud 整合Seata实现分布式事务(排坑版)
  5. 【微服务 32】Spring Cloud整合Seata、Nacos实现分布式事务案例(巨细排坑版)【云原生】
  6. 【微服务33】分布式事务Seata源码解析一:在IDEA中启动Seata Server
  7. 【微服务34】分布式事务Seata源码解析二:Seata Server启动时都做了什么【云原生】
  8. 【微服务35】分布式事务Seata源码解析三:从Spring Boot特性来看Seata Client 启动时都做了什么

本文接着上文中的Seata Client聊的GlobalTransactionScanner来聊一聊Seata Client 如何 与Seata Server建立连接、通信?

PS:前文中搭建的Seata案例,seata的版本为1.3.0,而本文开始的源码分析将基于当前(2022年8月)最新的版本1.5.2进行源码解析。

二、概述

在前文(【微服务35】分布式事务Seata源码解析三:从Spring Boot特性来看Seata Client 启动时都做了什么),我们聊了随着Spring容器初始化完毕,会调用GlobalTransactionScanner的初始化逻辑(即:afterPropertiesSet()方法),进而调用initClient()方法初始化seata client;

  • 初始化Seata Client时,TM和RM的逻辑不同,TM会直接和Seata Server建立长连接;
  • 而RM在AT模式下不会直接和Seata Server建立长连接。真正建立长连接的地方时实例化DataSourceProxy时。

首先我们要知道Seata Client 与Seata Server的通信是借助Netty的Channel(网络通道)来完成的,即所谓的建立长连接就是通过Netty的Channel进行通信;

在这里插入图片描述
本文就看一下TMClient.init()、RMClient.init()方法是如何将TM、RM与TC(Seata Server)建立连接通信的?

对TMClient 和 RMClient而言,除自身之外,还会设计到额外的三个类,一定要先明确这几个类的关系,不然看到后面跳过来跳过去的很乱,其关系图如下:

在这里插入图片描述

三、TM事务管理器初始化

TM全称:Transaction Manager,中文名:事务管理器,其定义全局事务的范围:开始全局事务、提交或回滚全局事务。

GlobalTransactionScanner类的如下代码段会进行TM的初始化:

TMClient.init(applicationId, txServiceGroup, accessKey, secretKey);

1、TM初始化流程图

在这里插入图片描述

2、TM初始化流程

在这里插入图片描述
TMClient.init()方法入参包括:当前应用的ID、事务组名称,其中获取一个TmNettyRemotingClient实例,然后对其进行初始化;

1)获取TmNettyRemotingClient实例

整体代码执行流程如下:

在这里插入图片描述

在获取TmNettyRemotingClient实例时,首先直接从Spring容器中获取到一个的TmNettyRemotingClient实例,然后再将应用的ID、事务分组名称、鉴权信息设置到TmNettyRemotingClient实例中;

这其中有几个细节点,下面展开聊一聊:

1> TmNettyRemotingClient实例化

实例化TmNettyRemotingClient时采用 DCL(Double Check Lock)保证多线程环境下只会实例化一个TmNettyRemotingClient实例:

在这里插入图片描述

其中用于处理消息的线程池的配置如下:

  1. 核心线程数和最大线程数都为16;
    在这里插入图片描述

    1. 线程过期时间为Integer.MAX_VALUE,单位为秒;
    2. 阻塞队列为有界的、最大容量为2000的LinkedBlockingQueue
    3. 所用线程工厂中生产出的线程名称的前缀为:rpcDispatch_1,其中的1表示TransactionRole为TM。

真正实例化TmNettyRemotingClient时首先会进入其父类AbstractNettyRemotingClient的构造器(下面聊),然后基于SPI扩展加载鉴权签名组件AuthSigner,接着获取开启批量发送请求的配置,默认不开启;注意注册一个配置变更的监听器。

在这里插入图片描述

2> AbstractNettyRemotingClient实例化

在这里插入图片描述

(1)构造器中首先调用父类AbstractNettyRemoting的构造器设置用于处理消息的线程池:

在这里插入图片描述

(2)设置当前事务角色为TMROLE;

(3)实例化一个NettyClientBootstrap,其中组成了原生netty的Bootstrap、EventLoopGroup、EventExecutorGroup;用于和seata server通信;

在这里插入图片描述

  • 其中netty工作线程名的前缀默认为:NettyClientSelector;工作线程池的数量为1;

(4)给Bootstrap设置消息处理器Handler(ChannelOutboundHandler),其为:AbstractNettyRemotingClient.ClientHandler

(5) 实例化netty的channel管理器,用于管理channel连接;

方法一路返回,进入到初始化TmNettyRemotingClient。

2)初始化TmNettyRemotingClient

在这里插入图片描述

初始化TmNettyRemotingClient时会做三件事:

  1. 注册一些请求处理组件;
  2. 调用其父类AbstractNettyRemotingClient的初始化方法定时对tx事务组进行重连、请求超时检查,启动netty客户端组件;
  3. 如果事务分组不为空,通过长连接管理组件对事务分组建立一个长连接;

下面细看一下:

1> 注册一些请求处理组件

seata server可以主动给seata client发送一些请求过来,对于netty里收到不同的请求需要有不同的请求处理组件;所以此处需要注册一些请求处理组件;

消息处理器是用来处理消息的,其根据消息的不同类型选择不同的消息处理器来处理消息(属于典型的策略模式);

在这里插入图片描述
请求处理组件分为两大类:TC响应处理组件(处理seata server的请求)、心跳消息处理组件。

所谓的注册消息处理器本质上就是将处理器RemotingProcessor和处理消息的线程池ExecutorService包装成一个Pair,然后将Pair作为Value,messageType作为key放入一个Map(processorTable)中;
在这里插入图片描述

/**
 * This container holds all processors.
 * processor type {@link MessageType}
 */
protected final HashMap<Integer/*MessageType*/, Pair<RemotingProcessor, ExecutorService>> processorTable = new HashMap<>(32);

2> 初始化AbstractNettyRemotingClient

注册完请求处理组件之后,会使用原子类型(AtomicBoolean)变量initialized + CAS确保AbstractNettyRemotingClient仅会初始化一次。

在这里插入图片描述

AbstractNettyRemotingClient的初始化同样做了三件事:

  1. 启动一个延时60s,每隔10s对tx事务分组(seata server 列表)发起一个重新连接请求;
  2. 调用其父类AbstractNettyRemoting的初始化方法:启动一个延时3s,每3s执行一次的定时任务,做请求超时检查;
  3. 启动netty客户端组件,其seata server可以与seata client通信;

下面我们细看一下AbstractNettyRemoting的初始化、netty客户端组件的启动;

(1)AbstractNettyRemoting初始化

在这里插入图片描述

其中仅会启动一个延时3s,每3s执行一次的定时任务,做请求超时检查;请求超时检查的细节如下:

  • 所谓的请求超时检查,实际是指当seata client发送请求到seata server时,会使用MessageFuture(组合了CompletableFuture)来接收返回值,如果seata server及时返回结果会将MessageFuture从futures中移除。
(2)启动netty客户端组件
@Override
public void start() {
    if (this.defaultEventExecutorGroup == null) {
        this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(nettyClientConfig.getClientWorkerThreads(),
            new NamedThreadFactory(getThreadPrefix(nettyClientConfig.getClientWorkerThreadPrefix()),
                nettyClientConfig.getClientWorkerThreads()));
    }

    // 真正的基于netty API构建一个bootstrap
    this.bootstrap
            // 设置对应的NioEventGroup,工作线程组,默认一个线程就够了
            .group(this.eventLoopGroupWorker)
            .channel(nettyClientConfig.getClientChannelClazz())
            .option(ChannelOption.TCP_NODELAY, true)
            .option(ChannelOption.SO_KEEPALIVE, true)
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
            .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
            .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize());

    if (nettyClientConfig.enableNative()) {
        if (PlatformDependent.isOsx()) {
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("client run on macOS");
            }
        } else {
            bootstrap.option(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED)
                .option(EpollChannelOption.TCP_QUICKACK, true);
        }
    }

    // netty网络通信数据处理组件(PipeLine)进行初始化
    bootstrap.handler(
        new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) {
                ChannelPipeline pipeline = ch.pipeline();
                // IdleStateHandler,空闲状态检查Handler,如果有数据通过 记录一下数据通过的时间
                // 如果超过很长时间都空闲,没有数据过来,则触发一个user triggered event给ClientHandler进行处理
                pipeline.addLast(new IdleStateHandler(
                        nettyClientConfig.getChannelMaxReadIdleSeconds(),
                        nettyClientConfig.getChannelMaxWriteIdleSeconds(),
                        nettyClientConfig.getChannelMaxAllIdleSeconds()))
                    // 基于seata通信协议的编码器和解码器
                    .addLast(new ProtocolV1Decoder())
                    .addLast(new ProtocolV1Encoder());
                if (channelHandlers != null) {
                    addChannelPipelineLast(ch, channelHandlers);
                }
            }
        });

    if (initialized.compareAndSet(false, true) && LOGGER.isInfoEnabled()) {
        LOGGER.info("NettyClientBootstrap has started");
    }
}

NettyClientBootstrap在启动的过程中设置了4个ChannelHandler:

  1. IdleStateHandler:处理心跳
  2. ProtocolV1Decoder:消息解码器
  3. ProtocolV1Encoder:消息编码器
  4. AbstractNettyRemotingClient.ClientHandler:处理各种消息
AbstractNettyRemotingClient.ClientHandler类

在这里插入图片描述

ClientHandler类上有个@ChannelHandler.Sharable注解,其表示所有的连接都会共用这一个ChannelHandler;所以当消息处理很慢时,会降低并发。

processMessage(ctx, (RpcMessage) msg)方法中会根据消息类型获取到 请求处理组件(消息的处理过程是典型的策略模式),如果消息对应的处理器设置了线程池,则放到线程池中执行;如果对应的处理器没有设置线程池,则直接执行;如果某条消息处理特别慢,会严重影响并发;所以在seata-server中大部分处理器都有对应的线程池。

AbstractNettyRemotingClient.ClientHandler处理消息的方式和seata server的AbstractNettyRemotingServer.ServerHandler一致,此处不再赘述。见文章:【微服务34】分布式事务Seata源码解析二:Seata Server启动时都做了什么

3> 和事务分组中的每个seata server建立一个长连接

在这里插入图片描述

这里有一个异常信息相信大家比较眼熟:
在这里插入图片描述

再看acquireChannel()方法和seata server建立长连接的代码执行流程:

在这里插入图片描述

TM/RM 和 TC 通信的关键在于Channel的创建,seata中通过池化的方式(借助了common-pool中的对象池)方式来创建、管理Channel。

(1)涉及到的common-pool中的主要类:

  • GenericKeydObjectPool<K, V>:KV泛型对象池,提供对所有对象的存取管理,而对象的创建由其内部的工厂类来完成
  • KeyedPoolableObjectFactory<K, V>:KV泛型对象工厂,负责池化对象的创建,被对象池持有

(2)涉及到的Seata中对象池实现相关的主要类:

  • 被池化管理的对象就是Channel,对应common-pool中的泛型V
  • NettyPoolKey:Channel对应的Key,对应common-pool中的泛型K,NettyPoolKey主要包含两个信息:

    • address:创建Channel时,对应的TC Server地址
    • message:创建Channel时,向TC Server发送的RPC消息体
  • GenericKeydObjectPool<NettyPoolKey,Channel>:Channel对象池

NettyPoolableFactory:创建Channel的工厂类;

至此,TM的初始化就此完毕!

四、RM资源管理器初始化

RM全称:Resource Manager,中文名:资源管理器,其管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。

GlobalTransactionScanner类的如下代码段会进行RM的初始化:

RMClient.init(applicationId, txServiceGroup);

1、RM初始化流程图

和TM初始化流程图基本一致,建议大家研究源码的过程自己画一个出来。

2、RM初始化流程

RM的初始化流程和TM类似,下面我们看一下差异。

1、实例化RmNettyRemotingClient

在这里插入图片描述

在实例化RmNettyRemotingClient时,处理消息的线程池ThreadPoolExecutor,阻塞队列的最大容量是20000,而TmNettyRemotingClient的2000;

此外,RmNettyRemotingClient工作线程的前缀名为:rpcDispatch_2,而TmNettyRemotingClient的工作线程的前缀名为:rpcDispatch_1


在这里插入图片描述

RmNettyRemotingClient的构造器中,和TmNettyRemotingClient做了同样的事,区别在于RmNettyRemotingClient中不需要基于SPI扩展加载鉴权签名组件AuthSigner


在这里插入图片描述
在实例化RmNettyRemotingClient之后,初始化RmNettyRemotingClient之前,会给RmNettyRemotingClient设置资源管理器DefaultResourceManager、事务消息处理器DefaultRMHandler

2、配置RmNettyRemotingClient

配置RmNettyRemotingClient是指实例化DefaultResourceManager、DefaultRMHandler将其赋值到RmNettyRemotingClient的字段:resourceManager、transactionMessageHandler上。

1> 实例化DefaultResourceManager

DefaultResourceManager中使用静态内部类单例模式保证DefaultResourceManager的唯一性;实例化流程如下:
在这里插入图片描述

实例化DefaultResourceManager时会基于SPI扩展加载资源管理器ResourceManager,一共加载出四个:

  • DataSourceManger、SagaResourceManager、ResourceManagerXA、TCCResourceManager。
  • 博主就感觉这里的命名啊,真是一个人一个写法!!!做为一个开源组件,实现的命名方式不需要统一一下吗!!!!!

和事务模式的对应关系如下:
在这里插入图片描述

2> 实例化DefaultRMHandler

DefaultRMHandler中使用静态内部类单例模式保证DefaultRMHandler的唯一性;实例化流程如下:
在这里插入图片描述

实例化DefaultRMHandler时会基于SPI扩展加载资源管理器RMHandler,一共加载出四个:

  • RMHandlerXA、RMHandlerTCC、RMHandlerSaga、RMHandlerAT。
  • 这个命名就很顺眼了,ResourceManager那是啥!!!

和事务模式的对应关系如下:
在这里插入图片描述

3、初始化RmNettyRemotingClient

整体初始化流程和TM一致,差异点在于最后在与seata server建立长连接时会额外判断资源管理器ResourceManager中是否已经加载了连接资源;默认没有加载连接资源,所以初始化RMClient时不会立刻和seata server建立长连接
在这里插入图片描述

而RM的初始化之所以要判断资源是否存在也很好理解,RM就是管理资源的,没有资源也就没有必要理解和Seata Server建立长连接。

既然初始化时不会立刻建立长连接,定时任务每30s才会与seata server重新建立长连接,假如在RM初始化后、定时任务执行之前加载了数据库资源,开始要进行一个分布式事务的流程,此时RM还没有seata server(TC)建立channel通信,重大bug啊!!

既然seata都开源运行了那么久,应该不会存在这个bug吧,我们大胆推测:在创建数据库资源时就会立刻让RM和TC建立长连接。

五、DataSourceProxy

DataSourceProxy是使用seata 实现分布式事务(AT模式)必要引入的DataSource代理,其对数据库操作进行代理。引入方式如下:

@Configuration
public class DataSourceConfig {

    @Bean
    @ConfigurationProperties(prefix = "spring.datasource")
    public DruidDataSource druidDataSource() {
        return new DruidDataSource();
    }

    /**
     * 需要将 DataSourceProxy 设置为主数据源,否则事务无法回滚
     *
     * @param druidDataSource The DruidDataSource
     * @return The default datasource
     */
    @Primary
    @Bean("dataSource")
    public DataSource dataSource(DruidDataSource druidDataSource) {
        return new DataSourceProxy(druidDataSource);
    }
}

1、实例化DataSourceProxy

实例化DataSourceProxy的代码执行流程如下:

在这里插入图片描述
实例化很简单,单纯的将要代理的DataSource组合到DataSourceProxy中,然后进行DataSourceProxy的初始化;

2、初始化DataSourceProxy

初始化DataSourceProxy时会做三件事:

  1. 从数据库连接中获取出JDBC连接信息保存下来;
  2. 初始化资源ID,针对单机MySQL默认为数据库连接地址(不包含?后面的字符);其用于注册到TC中做标识;
  3. 将当前Resource资源注册到TC;

下面细看一下:初始化资源ID、将当前Resource资源注册到TC。

1)初始化资源ID

代码执行流程如下:

在这里插入图片描述

针对单机MySQL,ResourceId默认为数据库连接地址(不包含?后面的字符)。

2)注册Resource到TC

代码执行流程如下:

在这里插入图片描述

DataSourceManager#registerResource()方法中会将 数据库资源的resourceID作为key、数据库资源作为value保存到本地缓存dataSourceCache中;

private final Map<String, Resource> dataSourceCache = new ConcurrentHashMap<>();

在RMClient初始化时,会不会立即和TC建立长连接,相对TMClient而言,额外的一个判断条件正是dataSourceCache是否为空。

后续的注册资源流程其实就只是从NettyClientChannelManager拿到和TC建立长连接的channel,然后向其发送注册RM请求RegisterRMRequest

注册成功之后,TC(Seata Server)端会打印日志:
在这里插入图片描述

[rverHandlerThread_1_9_500] i.s.c.r.processor.server.RegRmProcessor  : RM register success,message:RegisterRMRequest{resourceIds='jdbc:mysql://127.0.0.1:3306/seata_stock', applicationId='stock-service', transactionServiceGroup='saint-trade-tx-group'},channel:[id: 0x56942c85, L:/127.0.0.1:8091 - R:/127.0.0.1:59778],client version:1.5.2

加载完DataSourceProxy之前,如果定时任务向TC发起注册RM请求RegisterRMRequest(每30s执行一次和seata server重新连接),则在TC端没有resourceId标识:
在这里插入图片描述

[ttyServerNIOWorker_1_5_16] i.s.c.r.processor.server.RegTmProcessor  : TM register success,message:RegisterTMRequest{applicationId='stock-service', transactionServiceGroup='saint-trade-tx-group'},channel:[id: 0xf216813f, L:/127.0.0.1:8091 - R:/127.0.0.1:59369],client version:1.5.2

六、总结和后续

本文我们聊了TM / RM在实例化GlobalTransactionScanner之后 开始初始化 向TC发起注册请求、建立长连接,但是针对RMClient并不会在初始化时立即和TC建立长连接;

而是等到DataSourceProxy加载之后,才会立即和TC建立长连接;或者等每30秒执行一次的定时任务和TC建立长连接,但是如果DataSourceProxy还没有加载,则建立长连接时,资源的标识resourceID为null。

下一篇文章开聊seata如何开启全局事务?

相关文章
|
3天前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
14 2
|
3天前
|
存储 安全 Linux
Golang的GMP调度模型与源码解析
【11月更文挑战第11天】GMP 调度模型是 Go 语言运行时系统的核心部分,用于高效管理和调度大量协程(goroutine)。它通过少量的操作系统线程(M)和逻辑处理器(P)来调度大量的轻量级协程(G),从而实现高性能的并发处理。GMP 模型通过本地队列和全局队列来减少锁竞争,提高调度效率。在 Go 源码中,`runtime.h` 文件定义了关键数据结构,`schedule()` 和 `findrunnable()` 函数实现了核心调度逻辑。通过深入研究 GMP 模型,可以更好地理解 Go 语言的并发机制。
|
16天前
|
消息中间件 缓存 安全
Future与FutureTask源码解析,接口阻塞问题及解决方案
【11月更文挑战第5天】在Java开发中,多线程编程是提高系统并发性能和资源利用率的重要手段。然而,多线程编程也带来了诸如线程安全、死锁、接口阻塞等一系列复杂问题。本文将深度剖析多线程优化技巧、Future与FutureTask的源码、接口阻塞问题及解决方案,并通过具体业务场景和Java代码示例进行实战演示。
36 3
|
1月前
|
存储
让星星⭐月亮告诉你,HashMap的put方法源码解析及其中两种会触发扩容的场景(足够详尽,有问题欢迎指正~)
`HashMap`的`put`方法通过调用`putVal`实现,主要涉及两个场景下的扩容操作:1. 初始化时,链表数组的初始容量设为16,阈值设为12;2. 当存储的元素个数超过阈值时,链表数组的容量和阈值均翻倍。`putVal`方法处理键值对的插入,包括链表和红黑树的转换,确保高效的数据存取。
53 5
|
1月前
|
缓存 Java 程序员
Map - LinkedHashSet&Map源码解析
Map - LinkedHashSet&Map源码解析
66 0
|
1月前
|
算法 Java 容器
Map - HashSet & HashMap 源码解析
Map - HashSet & HashMap 源码解析
52 0
|
1月前
|
存储 Java C++
Collection-PriorityQueue源码解析
Collection-PriorityQueue源码解析
60 0
|
1月前
|
安全 Java 程序员
Collection-Stack&Queue源码解析
Collection-Stack&Queue源码解析
80 0
|
1月前
|
Java Spring
Spring底层架构源码解析(三)
Spring底层架构源码解析(三)
107 5
|
1月前
|
XML Java 数据格式
Spring底层架构源码解析(二)
Spring底层架构源码解析(二)

推荐镜像

更多