【微服务36】分布式事务Seata源码解析四:图解Seata Client 如何与Seata Server建立连接、通信

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
注册配置 MSE Nacos/ZooKeeper,118元/月
云原生网关 MSE Higress,422元/月
简介: 【微服务36】分布式事务Seata源码解析四:图解Seata Client 如何与Seata Server建立连接、通信

@[TOC]

一、前言

至此,seata系列的内容包括:

  1. can not get cluster name in registry config ‘service.vgroupMapping.xx‘, please make sure registry问题解决
  2. Seata Failed to get available servers: endpoint format should like ip:port 报错原因/解决方案汇总版(看完本文必解决问题)
  3. Seata json decode exception, Cannot construct instance of java.time.LocalDateTime报错原因/解决方案最全汇总版
  4. 【微服务 31】超细的Spring Cloud 整合Seata实现分布式事务(排坑版)
  5. 【微服务 32】Spring Cloud整合Seata、Nacos实现分布式事务案例(巨细排坑版)【云原生】
  6. 【微服务33】分布式事务Seata源码解析一:在IDEA中启动Seata Server
  7. 【微服务34】分布式事务Seata源码解析二:Seata Server启动时都做了什么【云原生】
  8. 【微服务35】分布式事务Seata源码解析三:从Spring Boot特性来看Seata Client 启动时都做了什么

本文接着上文中的Seata Client聊的GlobalTransactionScanner来聊一聊Seata Client 如何 与Seata Server建立连接、通信?

PS:前文中搭建的Seata案例,seata的版本为1.3.0,而本文开始的源码分析将基于当前(2022年8月)最新的版本1.5.2进行源码解析。

二、概述

在前文(【微服务35】分布式事务Seata源码解析三:从Spring Boot特性来看Seata Client 启动时都做了什么),我们聊了随着Spring容器初始化完毕,会调用GlobalTransactionScanner的初始化逻辑(即:afterPropertiesSet()方法),进而调用initClient()方法初始化seata client;

  • 初始化Seata Client时,TM和RM的逻辑不同,TM会直接和Seata Server建立长连接;
  • 而RM在AT模式下不会直接和Seata Server建立长连接。真正建立长连接的地方时实例化DataSourceProxy时。

首先我们要知道Seata Client 与Seata Server的通信是借助Netty的Channel(网络通道)来完成的,即所谓的建立长连接就是通过Netty的Channel进行通信;

在这里插入图片描述
本文就看一下TMClient.init()、RMClient.init()方法是如何将TM、RM与TC(Seata Server)建立连接通信的?

对TMClient 和 RMClient而言,除自身之外,还会设计到额外的三个类,一定要先明确这几个类的关系,不然看到后面跳过来跳过去的很乱,其关系图如下:

在这里插入图片描述

三、TM事务管理器初始化

TM全称:Transaction Manager,中文名:事务管理器,其定义全局事务的范围:开始全局事务、提交或回滚全局事务。

GlobalTransactionScanner类的如下代码段会进行TM的初始化:

TMClient.init(applicationId, txServiceGroup, accessKey, secretKey);

1、TM初始化流程图

在这里插入图片描述

2、TM初始化流程

在这里插入图片描述
TMClient.init()方法入参包括:当前应用的ID、事务组名称,其中获取一个TmNettyRemotingClient实例,然后对其进行初始化;

1)获取TmNettyRemotingClient实例

整体代码执行流程如下:

在这里插入图片描述

在获取TmNettyRemotingClient实例时,首先直接从Spring容器中获取到一个的TmNettyRemotingClient实例,然后再将应用的ID、事务分组名称、鉴权信息设置到TmNettyRemotingClient实例中;

这其中有几个细节点,下面展开聊一聊:

1> TmNettyRemotingClient实例化

实例化TmNettyRemotingClient时采用 DCL(Double Check Lock)保证多线程环境下只会实例化一个TmNettyRemotingClient实例:

在这里插入图片描述

其中用于处理消息的线程池的配置如下:

  1. 核心线程数和最大线程数都为16;
    在这里插入图片描述

    1. 线程过期时间为Integer.MAX_VALUE,单位为秒;
    2. 阻塞队列为有界的、最大容量为2000的LinkedBlockingQueue
    3. 所用线程工厂中生产出的线程名称的前缀为:rpcDispatch_1,其中的1表示TransactionRole为TM。

真正实例化TmNettyRemotingClient时首先会进入其父类AbstractNettyRemotingClient的构造器(下面聊),然后基于SPI扩展加载鉴权签名组件AuthSigner,接着获取开启批量发送请求的配置,默认不开启;注意注册一个配置变更的监听器。

在这里插入图片描述

2> AbstractNettyRemotingClient实例化

在这里插入图片描述

(1)构造器中首先调用父类AbstractNettyRemoting的构造器设置用于处理消息的线程池:

在这里插入图片描述

(2)设置当前事务角色为TMROLE;

(3)实例化一个NettyClientBootstrap,其中组成了原生netty的Bootstrap、EventLoopGroup、EventExecutorGroup;用于和seata server通信;

在这里插入图片描述

  • 其中netty工作线程名的前缀默认为:NettyClientSelector;工作线程池的数量为1;

(4)给Bootstrap设置消息处理器Handler(ChannelOutboundHandler),其为:AbstractNettyRemotingClient.ClientHandler

(5) 实例化netty的channel管理器,用于管理channel连接;

方法一路返回,进入到初始化TmNettyRemotingClient。

2)初始化TmNettyRemotingClient

在这里插入图片描述

初始化TmNettyRemotingClient时会做三件事:

  1. 注册一些请求处理组件;
  2. 调用其父类AbstractNettyRemotingClient的初始化方法定时对tx事务组进行重连、请求超时检查,启动netty客户端组件;
  3. 如果事务分组不为空,通过长连接管理组件对事务分组建立一个长连接;

下面细看一下:

1> 注册一些请求处理组件

seata server可以主动给seata client发送一些请求过来,对于netty里收到不同的请求需要有不同的请求处理组件;所以此处需要注册一些请求处理组件;

消息处理器是用来处理消息的,其根据消息的不同类型选择不同的消息处理器来处理消息(属于典型的策略模式);

在这里插入图片描述
请求处理组件分为两大类:TC响应处理组件(处理seata server的请求)、心跳消息处理组件。

所谓的注册消息处理器本质上就是将处理器RemotingProcessor和处理消息的线程池ExecutorService包装成一个Pair,然后将Pair作为Value,messageType作为key放入一个Map(processorTable)中;
在这里插入图片描述

/**
 * This container holds all processors.
 * processor type {@link MessageType}
 */
protected final HashMap<Integer/*MessageType*/, Pair<RemotingProcessor, ExecutorService>> processorTable = new HashMap<>(32);

2> 初始化AbstractNettyRemotingClient

注册完请求处理组件之后,会使用原子类型(AtomicBoolean)变量initialized + CAS确保AbstractNettyRemotingClient仅会初始化一次。

在这里插入图片描述

AbstractNettyRemotingClient的初始化同样做了三件事:

  1. 启动一个延时60s,每隔10s对tx事务分组(seata server 列表)发起一个重新连接请求;
  2. 调用其父类AbstractNettyRemoting的初始化方法:启动一个延时3s,每3s执行一次的定时任务,做请求超时检查;
  3. 启动netty客户端组件,其seata server可以与seata client通信;

下面我们细看一下AbstractNettyRemoting的初始化、netty客户端组件的启动;

(1)AbstractNettyRemoting初始化

在这里插入图片描述

其中仅会启动一个延时3s,每3s执行一次的定时任务,做请求超时检查;请求超时检查的细节如下:

  • 所谓的请求超时检查,实际是指当seata client发送请求到seata server时,会使用MessageFuture(组合了CompletableFuture)来接收返回值,如果seata server及时返回结果会将MessageFuture从futures中移除。
(2)启动netty客户端组件
@Override
public void start() {
    if (this.defaultEventExecutorGroup == null) {
        this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(nettyClientConfig.getClientWorkerThreads(),
            new NamedThreadFactory(getThreadPrefix(nettyClientConfig.getClientWorkerThreadPrefix()),
                nettyClientConfig.getClientWorkerThreads()));
    }

    // 真正的基于netty API构建一个bootstrap
    this.bootstrap
            // 设置对应的NioEventGroup,工作线程组,默认一个线程就够了
            .group(this.eventLoopGroupWorker)
            .channel(nettyClientConfig.getClientChannelClazz())
            .option(ChannelOption.TCP_NODELAY, true)
            .option(ChannelOption.SO_KEEPALIVE, true)
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
            .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
            .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize());

    if (nettyClientConfig.enableNative()) {
        if (PlatformDependent.isOsx()) {
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("client run on macOS");
            }
        } else {
            bootstrap.option(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED)
                .option(EpollChannelOption.TCP_QUICKACK, true);
        }
    }

    // netty网络通信数据处理组件(PipeLine)进行初始化
    bootstrap.handler(
        new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) {
                ChannelPipeline pipeline = ch.pipeline();
                // IdleStateHandler,空闲状态检查Handler,如果有数据通过 记录一下数据通过的时间
                // 如果超过很长时间都空闲,没有数据过来,则触发一个user triggered event给ClientHandler进行处理
                pipeline.addLast(new IdleStateHandler(
                        nettyClientConfig.getChannelMaxReadIdleSeconds(),
                        nettyClientConfig.getChannelMaxWriteIdleSeconds(),
                        nettyClientConfig.getChannelMaxAllIdleSeconds()))
                    // 基于seata通信协议的编码器和解码器
                    .addLast(new ProtocolV1Decoder())
                    .addLast(new ProtocolV1Encoder());
                if (channelHandlers != null) {
                    addChannelPipelineLast(ch, channelHandlers);
                }
            }
        });

    if (initialized.compareAndSet(false, true) && LOGGER.isInfoEnabled()) {
        LOGGER.info("NettyClientBootstrap has started");
    }
}

NettyClientBootstrap在启动的过程中设置了4个ChannelHandler:

  1. IdleStateHandler:处理心跳
  2. ProtocolV1Decoder:消息解码器
  3. ProtocolV1Encoder:消息编码器
  4. AbstractNettyRemotingClient.ClientHandler:处理各种消息
AbstractNettyRemotingClient.ClientHandler类

在这里插入图片描述

ClientHandler类上有个@ChannelHandler.Sharable注解,其表示所有的连接都会共用这一个ChannelHandler;所以当消息处理很慢时,会降低并发。

processMessage(ctx, (RpcMessage) msg)方法中会根据消息类型获取到 请求处理组件(消息的处理过程是典型的策略模式),如果消息对应的处理器设置了线程池,则放到线程池中执行;如果对应的处理器没有设置线程池,则直接执行;如果某条消息处理特别慢,会严重影响并发;所以在seata-server中大部分处理器都有对应的线程池。

AbstractNettyRemotingClient.ClientHandler处理消息的方式和seata server的AbstractNettyRemotingServer.ServerHandler一致,此处不再赘述。见文章:【微服务34】分布式事务Seata源码解析二:Seata Server启动时都做了什么

3> 和事务分组中的每个seata server建立一个长连接

在这里插入图片描述

这里有一个异常信息相信大家比较眼熟:
在这里插入图片描述

再看acquireChannel()方法和seata server建立长连接的代码执行流程:

在这里插入图片描述

TM/RM 和 TC 通信的关键在于Channel的创建,seata中通过池化的方式(借助了common-pool中的对象池)方式来创建、管理Channel。

(1)涉及到的common-pool中的主要类:

  • GenericKeydObjectPool<K, V>:KV泛型对象池,提供对所有对象的存取管理,而对象的创建由其内部的工厂类来完成
  • KeyedPoolableObjectFactory<K, V>:KV泛型对象工厂,负责池化对象的创建,被对象池持有

(2)涉及到的Seata中对象池实现相关的主要类:

  • 被池化管理的对象就是Channel,对应common-pool中的泛型V
  • NettyPoolKey:Channel对应的Key,对应common-pool中的泛型K,NettyPoolKey主要包含两个信息:

    • address:创建Channel时,对应的TC Server地址
    • message:创建Channel时,向TC Server发送的RPC消息体
  • GenericKeydObjectPool<NettyPoolKey,Channel>:Channel对象池

NettyPoolableFactory:创建Channel的工厂类;

至此,TM的初始化就此完毕!

四、RM资源管理器初始化

RM全称:Resource Manager,中文名:资源管理器,其管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。

GlobalTransactionScanner类的如下代码段会进行RM的初始化:

RMClient.init(applicationId, txServiceGroup);

1、RM初始化流程图

和TM初始化流程图基本一致,建议大家研究源码的过程自己画一个出来。

2、RM初始化流程

RM的初始化流程和TM类似,下面我们看一下差异。

1、实例化RmNettyRemotingClient

在这里插入图片描述

在实例化RmNettyRemotingClient时,处理消息的线程池ThreadPoolExecutor,阻塞队列的最大容量是20000,而TmNettyRemotingClient的2000;

此外,RmNettyRemotingClient工作线程的前缀名为:rpcDispatch_2,而TmNettyRemotingClient的工作线程的前缀名为:rpcDispatch_1


在这里插入图片描述

RmNettyRemotingClient的构造器中,和TmNettyRemotingClient做了同样的事,区别在于RmNettyRemotingClient中不需要基于SPI扩展加载鉴权签名组件AuthSigner


在这里插入图片描述
在实例化RmNettyRemotingClient之后,初始化RmNettyRemotingClient之前,会给RmNettyRemotingClient设置资源管理器DefaultResourceManager、事务消息处理器DefaultRMHandler

2、配置RmNettyRemotingClient

配置RmNettyRemotingClient是指实例化DefaultResourceManager、DefaultRMHandler将其赋值到RmNettyRemotingClient的字段:resourceManager、transactionMessageHandler上。

1> 实例化DefaultResourceManager

DefaultResourceManager中使用静态内部类单例模式保证DefaultResourceManager的唯一性;实例化流程如下:
在这里插入图片描述

实例化DefaultResourceManager时会基于SPI扩展加载资源管理器ResourceManager,一共加载出四个:

  • DataSourceManger、SagaResourceManager、ResourceManagerXA、TCCResourceManager。
  • 博主就感觉这里的命名啊,真是一个人一个写法!!!做为一个开源组件,实现的命名方式不需要统一一下吗!!!!!

和事务模式的对应关系如下:
在这里插入图片描述

2> 实例化DefaultRMHandler

DefaultRMHandler中使用静态内部类单例模式保证DefaultRMHandler的唯一性;实例化流程如下:
在这里插入图片描述

实例化DefaultRMHandler时会基于SPI扩展加载资源管理器RMHandler,一共加载出四个:

  • RMHandlerXA、RMHandlerTCC、RMHandlerSaga、RMHandlerAT。
  • 这个命名就很顺眼了,ResourceManager那是啥!!!

和事务模式的对应关系如下:
在这里插入图片描述

3、初始化RmNettyRemotingClient

整体初始化流程和TM一致,差异点在于最后在与seata server建立长连接时会额外判断资源管理器ResourceManager中是否已经加载了连接资源;默认没有加载连接资源,所以初始化RMClient时不会立刻和seata server建立长连接
在这里插入图片描述

而RM的初始化之所以要判断资源是否存在也很好理解,RM就是管理资源的,没有资源也就没有必要理解和Seata Server建立长连接。

既然初始化时不会立刻建立长连接,定时任务每30s才会与seata server重新建立长连接,假如在RM初始化后、定时任务执行之前加载了数据库资源,开始要进行一个分布式事务的流程,此时RM还没有seata server(TC)建立channel通信,重大bug啊!!

既然seata都开源运行了那么久,应该不会存在这个bug吧,我们大胆推测:在创建数据库资源时就会立刻让RM和TC建立长连接。

五、DataSourceProxy

DataSourceProxy是使用seata 实现分布式事务(AT模式)必要引入的DataSource代理,其对数据库操作进行代理。引入方式如下:

@Configuration
public class DataSourceConfig {

    @Bean
    @ConfigurationProperties(prefix = "spring.datasource")
    public DruidDataSource druidDataSource() {
        return new DruidDataSource();
    }

    /**
     * 需要将 DataSourceProxy 设置为主数据源,否则事务无法回滚
     *
     * @param druidDataSource The DruidDataSource
     * @return The default datasource
     */
    @Primary
    @Bean("dataSource")
    public DataSource dataSource(DruidDataSource druidDataSource) {
        return new DataSourceProxy(druidDataSource);
    }
}

1、实例化DataSourceProxy

实例化DataSourceProxy的代码执行流程如下:

在这里插入图片描述
实例化很简单,单纯的将要代理的DataSource组合到DataSourceProxy中,然后进行DataSourceProxy的初始化;

2、初始化DataSourceProxy

初始化DataSourceProxy时会做三件事:

  1. 从数据库连接中获取出JDBC连接信息保存下来;
  2. 初始化资源ID,针对单机MySQL默认为数据库连接地址(不包含?后面的字符);其用于注册到TC中做标识;
  3. 将当前Resource资源注册到TC;

下面细看一下:初始化资源ID、将当前Resource资源注册到TC。

1)初始化资源ID

代码执行流程如下:

在这里插入图片描述

针对单机MySQL,ResourceId默认为数据库连接地址(不包含?后面的字符)。

2)注册Resource到TC

代码执行流程如下:

在这里插入图片描述

DataSourceManager#registerResource()方法中会将 数据库资源的resourceID作为key、数据库资源作为value保存到本地缓存dataSourceCache中;

private final Map<String, Resource> dataSourceCache = new ConcurrentHashMap<>();

在RMClient初始化时,会不会立即和TC建立长连接,相对TMClient而言,额外的一个判断条件正是dataSourceCache是否为空。

后续的注册资源流程其实就只是从NettyClientChannelManager拿到和TC建立长连接的channel,然后向其发送注册RM请求RegisterRMRequest

注册成功之后,TC(Seata Server)端会打印日志:
在这里插入图片描述

[rverHandlerThread_1_9_500] i.s.c.r.processor.server.RegRmProcessor  : RM register success,message:RegisterRMRequest{resourceIds='jdbc:mysql://127.0.0.1:3306/seata_stock', applicationId='stock-service', transactionServiceGroup='saint-trade-tx-group'},channel:[id: 0x56942c85, L:/127.0.0.1:8091 - R:/127.0.0.1:59778],client version:1.5.2

加载完DataSourceProxy之前,如果定时任务向TC发起注册RM请求RegisterRMRequest(每30s执行一次和seata server重新连接),则在TC端没有resourceId标识:
在这里插入图片描述

[ttyServerNIOWorker_1_5_16] i.s.c.r.processor.server.RegTmProcessor  : TM register success,message:RegisterTMRequest{applicationId='stock-service', transactionServiceGroup='saint-trade-tx-group'},channel:[id: 0xf216813f, L:/127.0.0.1:8091 - R:/127.0.0.1:59369],client version:1.5.2

六、总结和后续

本文我们聊了TM / RM在实例化GlobalTransactionScanner之后 开始初始化 向TC发起注册请求、建立长连接,但是针对RMClient并不会在初始化时立即和TC建立长连接;

而是等到DataSourceProxy加载之后,才会立即和TC建立长连接;或者等每30秒执行一次的定时任务和TC建立长连接,但是如果DataSourceProxy还没有加载,则建立长连接时,资源的标识resourceID为null。

下一篇文章开聊seata如何开启全局事务?

相关文章
|
9天前
|
设计模式 Java API
微服务架构演变与架构设计深度解析
【11月更文挑战第14天】在当今的IT行业中,微服务架构已经成为构建大型、复杂系统的重要范式。本文将从微服务架构的背景、业务场景、功能点、底层原理、实战、设计模式等多个方面进行深度解析,并结合京东电商的案例,探讨微服务架构在实际应用中的实施与效果。
45 6
|
9天前
|
设计模式 Java API
微服务架构演变与架构设计深度解析
【11月更文挑战第14天】在当今的IT行业中,微服务架构已经成为构建大型、复杂系统的重要范式。本文将从微服务架构的背景、业务场景、功能点、底层原理、实战、设计模式等多个方面进行深度解析,并结合京东电商的案例,探讨微服务架构在实际应用中的实施与效果。
25 1
|
1月前
|
XML JSON API
ServiceStack:不仅仅是一个高性能Web API和微服务框架,更是一站式解决方案——深入解析其多协议支持及简便开发流程,带您体验前所未有的.NET开发效率革命
【10月更文挑战第9天】ServiceStack 是一个高性能的 Web API 和微服务框架,支持 JSON、XML、CSV 等多种数据格式。它简化了 .NET 应用的开发流程,提供了直观的 RESTful 服务构建方式。ServiceStack 支持高并发请求和复杂业务逻辑,安装简单,通过 NuGet 包管理器即可快速集成。示例代码展示了如何创建一个返回当前日期的简单服务,包括定义请求和响应 DTO、实现服务逻辑、配置路由和宿主。ServiceStack 还支持 WebSocket、SignalR 等实时通信协议,具备自动验证、自动过滤器等丰富功能,适合快速搭建高性能、可扩展的服务端应用。
100 3
|
2月前
|
Java 对象存储 开发者
解析Spring Cloud与Netflix OSS:微服务架构中的左右手如何协同作战
Spring Cloud与Netflix OSS不仅是现代微服务架构中不可或缺的一部分,它们还通过不断的技术创新和社区贡献推动了整个行业的发展。无论是对于初创企业还是大型组织来说,掌握并合理运用这两套工具,都能极大地提升软件系统的灵活性、可扩展性以及整体性能。随着云计算和容器化技术的进一步普及,Spring Cloud与Netflix OSS将继续引领微服务技术的发展潮流。
54 0
|
23天前
|
监控 安全 Java
构建高效后端服务:微服务架构深度解析与最佳实践###
【10月更文挑战第19天】 在数字化转型加速的今天,企业对后端服务的响应速度、可扩展性和灵活性提出了更高要求。本文探讨了微服务架构作为解决方案,通过分析传统单体架构面临的挑战,深入剖析微服务的核心优势、关键组件及设计原则。我们将从实际案例入手,揭示成功实施微服务的策略与常见陷阱,为开发者和企业提供可操作的指导建议。本文目的是帮助读者理解如何利用微服务架构提升后端服务的整体效能,实现业务快速迭代与创新。 ###
57 2
|
1月前
|
缓存 NoSQL Ubuntu
大数据-39 Redis 高并发分布式缓存 Ubuntu源码编译安装 云服务器 启动并测试 redis-server redis-cli
大数据-39 Redis 高并发分布式缓存 Ubuntu源码编译安装 云服务器 启动并测试 redis-server redis-cli
55 3
|
2月前
|
SQL NoSQL 数据库
SpringCloud基础6——分布式事务,Seata
分布式事务、ACID原则、CAP定理、Seata、Seata的四种分布式方案:XA、AT、TCC、SAGA模式
SpringCloud基础6——分布式事务,Seata
|
25天前
|
存储 Kubernetes 监控
深度解析Kubernetes在微服务架构中的应用与优化
【10月更文挑战第18天】深度解析Kubernetes在微服务架构中的应用与优化
95 0
|
3月前
|
消息中间件 测试技术 API
深入解析微服务架构的设计与实践
在软件工程领域,"分而治之"的策略一直是解决复杂问题的有效方法。微服务架构作为这一策略的现代体现,它通过将大型应用程序分解为一组小的、独立的服务来简化开发与部署。本文将带你了解微服务的核心概念,探讨设计时的关键考虑因素,并分享实践中的一些经验教训,旨在帮助开发者更好地构建和维护可扩展的系统。
|
3月前
|
关系型数据库 MySQL 数据库
SpringCloud2023中使用Seata解决分布式事务
对于分布式系统而言,需要保证分布式系统中的数据一致性,保证数据在子系统中始终保持一致,避免业务出现问题。分布式系统中对数据的操作要么一起成功,要么一起失败,必须是一个整体性的事务。Seata简化了这个使用过程。
87 2

推荐镜像

更多