@[TOC]
一、前言
至此,seata系列的内容包括:
- can not get cluster name in registry config ‘service.vgroupMapping.xx‘, please make sure registry问题解决;
- Seata Failed to get available servers: endpoint format should like ip:port 报错原因/解决方案汇总版(看完本文必解决问题)
- Seata json decode exception, Cannot construct instance of java.time.LocalDateTime报错原因/解决方案最全汇总版
- 【微服务 31】超细的Spring Cloud 整合Seata实现分布式事务(排坑版)
- 【微服务 32】Spring Cloud整合Seata、Nacos实现分布式事务案例(巨细排坑版)【云原生】
- 【微服务33】分布式事务Seata源码解析一:在IDEA中启动Seata Server
- 【微服务34】分布式事务Seata源码解析二:Seata Server启动时都做了什么【云原生】
- 【微服务35】分布式事务Seata源码解析三:从Spring Boot特性来看Seata Client 启动时都做了什么
本文接着上文中的Seata Client聊的GlobalTransactionScanner
来聊一聊Seata Client 如何 与Seata Server建立连接、通信?
PS:前文中搭建的Seata案例,seata的版本为1.3.0,而本文开始的源码分析将基于当前(2022年8月)最新的版本1.5.2进行源码解析。
二、概述
在前文(【微服务35】分布式事务Seata源码解析三:从Spring Boot特性来看Seata Client 启动时都做了什么),我们聊了随着Spring容器初始化完毕,会调用GlobalTransactionScanner的初始化逻辑(即:afterPropertiesSet()
方法),进而调用initClient()
方法初始化seata client;
- 初始化Seata Client时,TM和RM的逻辑不同,TM会直接和Seata Server建立长连接;
- 而RM在AT模式下不会直接和Seata Server建立长连接。真正建立长连接的地方时实例化DataSourceProxy时。
首先我们要知道Seata Client 与Seata Server的通信是借助Netty的Channel(网络通道)来完成的,即所谓的建立长连接就是通过Netty的Channel进行通信;
本文就看一下TMClient.init()、RMClient.init()方法是如何将TM、RM与TC(Seata Server)建立连接通信的?
对TMClient 和 RMClient而言,除自身之外,还会设计到额外的三个类,一定要先明确这几个类的关系,不然看到后面跳过来跳过去的很乱,其关系图如下:
三、TM事务管理器初始化
TM全称:Transaction Manager,中文名:事务管理器,其定义全局事务的范围:开始全局事务、提交或回滚全局事务。
在GlobalTransactionScanner
类的如下代码段会进行TM的初始化:
TMClient.init(applicationId, txServiceGroup, accessKey, secretKey);
1、TM初始化流程图
2、TM初始化流程
TMClient.init()方法入参包括:当前应用的ID、事务组名称,其中获取一个TmNettyRemotingClient
实例,然后对其进行初始化;
1)获取TmNettyRemotingClient实例
整体代码执行流程如下:
在获取TmNettyRemotingClient实例时,首先直接从Spring容器中获取到一个的TmNettyRemotingClient实例,然后再将应用的ID、事务分组名称、鉴权信息设置到TmNettyRemotingClient实例中;
这其中有几个细节点,下面展开聊一聊:
1> TmNettyRemotingClient实例化
实例化TmNettyRemotingClient时采用 DCL(Double Check Lock)保证多线程环境下只会实例化一个TmNettyRemotingClient
实例:
其中用于处理消息的线程池的配置如下:
核心线程数和最大线程数都为16;
- 线程过期时间为Integer.MAX_VALUE,单位为秒;
- 阻塞队列为有界的、最大容量为2000的
LinkedBlockingQueue
;- 所用线程工厂中生产出的线程名称的前缀为:
rpcDispatch_1
,其中的1表示TransactionRole
为TM。
真正实例化TmNettyRemotingClient时首先会进入其父类AbstractNettyRemotingClient
的构造器(下面聊),然后基于SPI扩展加载鉴权签名组件AuthSigner
,接着获取开启批量发送请求
的配置,默认不开启;注意注册一个配置变更的监听器。
2> AbstractNettyRemotingClient实例化
(1)构造器中首先调用父类AbstractNettyRemoting
的构造器设置用于处理消息的线程池:
(2)设置当前事务角色为TMROLE;
(3)实例化一个NettyClientBootstrap,其中组成了原生netty的Bootstrap、EventLoopGroup、EventExecutorGroup;用于和seata server通信;
- 其中netty工作线程名的前缀默认为:
NettyClientSelector
;工作线程池的数量为1;
(4)给Bootstrap设置消息处理器Handler(ChannelOutboundHandler),其为:AbstractNettyRemotingClient.ClientHandler
。
(5) 实例化netty的channel管理器,用于管理channel连接;
方法一路返回,进入到初始化TmNettyRemotingClient。
2)初始化TmNettyRemotingClient
初始化TmNettyRemotingClient时会做三件事:
- 注册一些请求处理组件;
- 调用其父类
AbstractNettyRemotingClient
的初始化方法定时对tx事务组进行重连、请求超时检查,启动netty客户端组件; - 如果事务分组不为空,通过长连接管理组件对事务分组建立一个长连接;
下面细看一下:
1> 注册一些请求处理组件
seata server可以主动给seata client发送一些请求过来,对于netty里收到不同的请求需要有不同的请求处理组件;所以此处需要注册一些请求处理组件;
消息处理器是用来处理消息的,其根据消息的不同类型选择不同的消息处理器来处理消息(属于典型的策略模式);
请求处理组件分为两大类:TC响应处理组件(处理seata server的请求)、心跳消息处理组件。
所谓的注册消息处理器本质上就是将处理器RemotingProcessor
和处理消息的线程池ExecutorService
包装成一个Pair
,然后将Pair作为Value,messageType作为key放入一个Map(processorTable
)中;
/**
* This container holds all processors.
* processor type {@link MessageType}
*/
protected final HashMap<Integer/*MessageType*/, Pair<RemotingProcessor, ExecutorService>> processorTable = new HashMap<>(32);
2> 初始化AbstractNettyRemotingClient
注册完请求处理组件之后,会使用原子类型(AtomicBoolean
)变量initialized
+ CAS确保AbstractNettyRemotingClient仅会初始化一次。
AbstractNettyRemotingClient的初始化同样做了三件事:
- 启动一个延时60s,每隔10s对tx事务分组(seata server 列表)发起一个重新连接请求;
- 调用其父类
AbstractNettyRemoting
的初始化方法:启动一个延时3s,每3s执行一次的定时任务,做请求超时检查; - 启动netty客户端组件,其seata server可以与seata client通信;
下面我们细看一下AbstractNettyRemoting的初始化、netty客户端组件的启动;
(1)AbstractNettyRemoting初始化
其中仅会启动一个延时3s,每3s执行一次的定时任务,做请求超时检查;请求超时检查的细节如下:
- 所谓的请求超时检查,实际是指当seata client发送请求到seata server时,会使用
MessageFuture
(组合了CompletableFuture
)来接收返回值,如果seata server及时返回结果会将MessageFuture从futures中移除。
(2)启动netty客户端组件
@Override
public void start() {
if (this.defaultEventExecutorGroup == null) {
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(nettyClientConfig.getClientWorkerThreads(),
new NamedThreadFactory(getThreadPrefix(nettyClientConfig.getClientWorkerThreadPrefix()),
nettyClientConfig.getClientWorkerThreads()));
}
// 真正的基于netty API构建一个bootstrap
this.bootstrap
// 设置对应的NioEventGroup,工作线程组,默认一个线程就够了
.group(this.eventLoopGroupWorker)
.channel(nettyClientConfig.getClientChannelClazz())
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
.option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
.option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize());
if (nettyClientConfig.enableNative()) {
if (PlatformDependent.isOsx()) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("client run on macOS");
}
} else {
bootstrap.option(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED)
.option(EpollChannelOption.TCP_QUICKACK, true);
}
}
// netty网络通信数据处理组件(PipeLine)进行初始化
bootstrap.handler(
new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
// IdleStateHandler,空闲状态检查Handler,如果有数据通过 记录一下数据通过的时间
// 如果超过很长时间都空闲,没有数据过来,则触发一个user triggered event给ClientHandler进行处理
pipeline.addLast(new IdleStateHandler(
nettyClientConfig.getChannelMaxReadIdleSeconds(),
nettyClientConfig.getChannelMaxWriteIdleSeconds(),
nettyClientConfig.getChannelMaxAllIdleSeconds()))
// 基于seata通信协议的编码器和解码器
.addLast(new ProtocolV1Decoder())
.addLast(new ProtocolV1Encoder());
if (channelHandlers != null) {
addChannelPipelineLast(ch, channelHandlers);
}
}
});
if (initialized.compareAndSet(false, true) && LOGGER.isInfoEnabled()) {
LOGGER.info("NettyClientBootstrap has started");
}
}
NettyClientBootstrap在启动的过程中设置了4个ChannelHandler:
- IdleStateHandler:处理心跳
- ProtocolV1Decoder:消息解码器
- ProtocolV1Encoder:消息编码器
- AbstractNettyRemotingClient.ClientHandler:处理各种消息
AbstractNettyRemotingClient.ClientHandler类
ClientHandler类上有个@ChannelHandler.Sharable
注解,其表示所有的连接都会共用这一个ChannelHandler;所以当消息处理很慢时,会降低并发。
processMessage(ctx, (RpcMessage) msg)
方法中会根据消息类型获取到 请求处理组件(消息的处理过程是典型的策略模式),如果消息对应的处理器设置了线程池,则放到线程池中执行;如果对应的处理器没有设置线程池,则直接执行;如果某条消息处理特别慢,会严重影响并发;所以在seata-server中大部分处理器都有对应的线程池。
AbstractNettyRemotingClient.ClientHandler
处理消息的方式和seata server的AbstractNettyRemotingServer.ServerHandler
一致,此处不再赘述。见文章:【微服务34】分布式事务Seata源码解析二:Seata Server启动时都做了什么。
3> 和事务分组中的每个seata server建立一个长连接
这里有一个异常信息相信大家比较眼熟:
再看acquireChannel()
方法和seata server建立长连接的代码执行流程:
TM/RM 和 TC 通信的关键在于Channel的创建,seata中通过池化的方式(借助了common-pool中的对象池)方式来创建、管理Channel。
(1)涉及到的common-pool中的主要类:
- GenericKeydObjectPool<K, V>:KV泛型对象池,提供对所有对象的存取管理,而对象的创建由其内部的工厂类来完成
- KeyedPoolableObjectFactory<K, V>:KV泛型对象工厂,负责池化对象的创建,被对象池持有
(2)涉及到的Seata中对象池实现相关的主要类:
- 被池化管理的对象就是Channel,对应common-pool中的泛型V
NettyPoolKey:Channel对应的Key,对应common-pool中的泛型K,NettyPoolKey主要包含两个信息:
- address:创建Channel时,对应的TC Server地址
- message:创建Channel时,向TC Server发送的RPC消息体
- GenericKeydObjectPool<NettyPoolKey,Channel>:Channel对象池
NettyPoolableFactory:创建Channel的工厂类;
至此,TM的初始化就此完毕!
四、RM资源管理器初始化
RM全称:Resource Manager,中文名:资源管理器,其管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。
在GlobalTransactionScanner
类的如下代码段会进行RM的初始化:
RMClient.init(applicationId, txServiceGroup);
1、RM初始化流程图
和TM初始化流程图基本一致,建议大家研究源码的过程自己画一个出来。
2、RM初始化流程
RM的初始化流程和TM类似,下面我们看一下差异。
1、实例化RmNettyRemotingClient
在实例化RmNettyRemotingClient时,处理消息的线程池ThreadPoolExecutor
,阻塞队列的最大容量是20000,而TmNettyRemotingClient的2000;
此外,RmNettyRemotingClient工作线程的前缀名为:rpcDispatch_2
,而TmNettyRemotingClient的工作线程的前缀名为:rpcDispatch_1
;
RmNettyRemotingClient的构造器中,和TmNettyRemotingClient做了同样的事,区别在于RmNettyRemotingClient中不需要基于SPI扩展加载鉴权签名组件AuthSigner
:
在实例化RmNettyRemotingClient之后,初始化RmNettyRemotingClient之前,会给RmNettyRemotingClient
设置资源管理器DefaultResourceManager
、事务消息处理器DefaultRMHandler
。
2、配置RmNettyRemotingClient
配置RmNettyRemotingClient是指实例化DefaultResourceManager、DefaultRMHandler将其赋值到RmNettyRemotingClient的字段:resourceManager、transactionMessageHandler上。
1> 实例化DefaultResourceManager
DefaultResourceManager中使用静态内部类单例模式保证DefaultResourceManager的唯一性;实例化流程如下:
实例化DefaultResourceManager时会基于SPI扩展加载资源管理器ResourceManager,一共加载出四个:
- DataSourceManger、SagaResourceManager、ResourceManagerXA、TCCResourceManager。
- 博主就感觉这里的命名啊,真是一个人一个写法!!!做为一个开源组件,实现的命名方式不需要统一一下吗!!!!!
和事务模式的对应关系如下:
2> 实例化DefaultRMHandler
DefaultRMHandler中使用静态内部类单例模式保证DefaultRMHandler的唯一性;实例化流程如下:
实例化DefaultRMHandler时会基于SPI扩展加载资源管理器RMHandler,一共加载出四个:
- RMHandlerXA、RMHandlerTCC、RMHandlerSaga、RMHandlerAT。
- 这个命名就很顺眼了,ResourceManager那是啥!!!
和事务模式的对应关系如下:
3、初始化RmNettyRemotingClient
整体初始化流程和TM一致,差异点在于最后在与seata server建立长连接时会额外判断资源管理器ResourceManager
中是否已经加载了连接资源;默认没有加载连接资源,所以初始化RMClient时不会立刻和seata server建立长连接。
而RM的初始化之所以要判断资源是否存在也很好理解,RM就是管理资源的,没有资源也就没有必要理解和Seata Server建立长连接。
既然初始化时不会立刻建立长连接,定时任务每30s才会与seata server重新建立长连接,假如在RM初始化后、定时任务执行之前加载了数据库资源,开始要进行一个分布式事务的流程,此时RM还没有seata server(TC)建立channel通信,重大bug啊!!
既然seata都开源运行了那么久,应该不会存在这个bug吧,我们大胆推测:在创建数据库资源时就会立刻让RM和TC建立长连接。
五、DataSourceProxy
DataSourceProxy是使用seata 实现分布式事务(AT模式)必要引入的DataSource代理,其对数据库操作进行代理。引入方式如下:
@Configuration
public class DataSourceConfig {
@Bean
@ConfigurationProperties(prefix = "spring.datasource")
public DruidDataSource druidDataSource() {
return new DruidDataSource();
}
/**
* 需要将 DataSourceProxy 设置为主数据源,否则事务无法回滚
*
* @param druidDataSource The DruidDataSource
* @return The default datasource
*/
@Primary
@Bean("dataSource")
public DataSource dataSource(DruidDataSource druidDataSource) {
return new DataSourceProxy(druidDataSource);
}
}
1、实例化DataSourceProxy
实例化DataSourceProxy的代码执行流程如下:
实例化很简单,单纯的将要代理的DataSource组合到DataSourceProxy中,然后进行DataSourceProxy的初始化;
2、初始化DataSourceProxy
初始化DataSourceProxy时会做三件事:
- 从数据库连接中获取出JDBC连接信息保存下来;
- 初始化资源ID,针对单机MySQL默认为数据库连接地址(不包含?后面的字符);其用于注册到TC中做标识;
- 将当前Resource资源注册到TC;
下面细看一下:初始化资源ID、将当前Resource资源注册到TC。
1)初始化资源ID
代码执行流程如下:
针对单机MySQL,ResourceId默认为数据库连接地址(不包含?后面的字符)。
2)注册Resource到TC
代码执行流程如下:
在DataSourceManager#registerResource()
方法中会将 数据库资源的resourceID作为key、数据库资源作为value保存到本地缓存dataSourceCache
中;
private final Map<String, Resource> dataSourceCache = new ConcurrentHashMap<>();
在RMClient初始化时,会不会立即和TC建立长连接,相对TMClient而言,额外的一个判断条件正是dataSourceCache是否为空。
后续的注册资源流程其实就只是从NettyClientChannelManager
拿到和TC建立长连接的channel,然后向其发送注册RM请求RegisterRMRequest
。
注册成功之后,TC(Seata Server)端会打印日志:
[rverHandlerThread_1_9_500] i.s.c.r.processor.server.RegRmProcessor : RM register success,message:RegisterRMRequest{resourceIds='jdbc:mysql://127.0.0.1:3306/seata_stock', applicationId='stock-service', transactionServiceGroup='saint-trade-tx-group'},channel:[id: 0x56942c85, L:/127.0.0.1:8091 - R:/127.0.0.1:59778],client version:1.5.2
加载完DataSourceProxy之前,如果定时任务向TC发起注册RM请求RegisterRMRequest
(每30s执行一次和seata server重新连接),则在TC端没有resourceId标识:
[ttyServerNIOWorker_1_5_16] i.s.c.r.processor.server.RegTmProcessor : TM register success,message:RegisterTMRequest{applicationId='stock-service', transactionServiceGroup='saint-trade-tx-group'},channel:[id: 0xf216813f, L:/127.0.0.1:8091 - R:/127.0.0.1:59369],client version:1.5.2
六、总结和后续
本文我们聊了TM / RM在实例化GlobalTransactionScanner之后 开始初始化 向TC发起注册请求、建立长连接,但是针对RMClient并不会在初始化时立即和TC建立长连接;
而是等到DataSourceProxy加载之后,才会立即和TC建立长连接;或者等每30秒执行一次的定时任务和TC建立长连接,但是如果DataSourceProxy还没有加载,则建立长连接时,资源的标识resourceID为null。
下一篇文章开聊seata如何开启全局事务?