关于Spring Cloud Gateway与下游服务器的连接分析

简介: 背景最近面试了不少同学,有很大一部分简历上会提到网关,我一般都会顺着往下问他们的网关是怎么做的。基本上都是说直接使用的Spring Cloud Gateway或者基于Spring Cloud Gateway二次开发。这种时候我会继续问一个比较基础的问题:Spring Cloud Gateway作为网关,会把接收到的请求转发给下游服务,那么Spring Cloud Gateway跟下游的服务之间保持的是长连还是短连?还是说每次转发的时候都会新建立一个连接吗?

背景

最近面试了不少同学,有很大一部分简历上会提到网关,我一般都会顺着往下问他们的网关是怎么做的。

基本上都是说直接使用的Spring Cloud Gateway或者基于Spring Cloud Gateway二次开发。

这种时候我会继续问一个比较基础的问题:Spring Cloud Gateway作为网关,会把接收到的请求转发给下游服务,那么Spring Cloud Gateway跟下游的服务之间保持的是长连还是短连?还是说每次转发的时候都会新建立一个连接吗?

很遗憾的是,这么基础的问题,很少有面试者完全搞清楚。

所以才有了这篇文章:通过研究Spring Cloud Gateway的源码,来看看Spring Cloud Gateway跟下游服务之间是怎么通信的。

Spring Cloud Gateway

在源码分析之前,需要先了解一下Spring Cloud Gateway

SpringCloud Gateway 是 Spring Cloud 的一个全新项目,该项目是基于 Spring 5.0,Spring Boot 2.0 和 Project Reactor 等技术开发的网关,它旨在为微服务架构提供一种简单有效的统一的 API 路由管理方式。

Spring Cloud Gateway是基于Spring WebFlux框架实现的,而WebFlux框架底层则使用了高性能的Reactor模式通信框架Netty。

Spring Cloud Gateway架构图如下:

网络异常,图片无法展示
|

源码分析

对于基于webflux的应用,入口点都在DispatchHandler.handle()方法:

网络异常,图片无法展示
|

网络异常,图片无法展示
|

最终执行到
SimpleHandlerAdapter.handle() 方法

网络异常,图片无法展示
|

handler()方法中执行的是
FilteringWebHandler.handle()方法

网络异常,图片无法展示
|


FilteringWebHandler.handler()方法的主要逻辑就是依次执行已经形成的全局过滤器globalFilter的filter()方法。

从截图中可以看到,默认会生成9个全局过滤器GatewayFilter对象。

单步调试下去,发现涉及到网络这一块的操作都在倒数第二个过滤器NettyRoutingFilter类中。

现在着重来看一下NettyRoutingFilter.filter()方法:

public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
    URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);
    // ... 一些省略代码
    // 获取httpclient
    Flux<HttpClientResponse> responseFlux = getHttpClient(route, exchange)
            .headers(headers -> {
                headers.add(httpHeaders);
                // Will either be set below, or later by Netty
                headers.remove(HttpHeaders.HOST);
                if (preserveHost) {
                    String host = request.getHeaders().getFirst(HttpHeaders.HOST);
                    headers.add(HttpHeaders.HOST, host);
                }
            }).request(method).uri(url).send((req, nettyOutbound) -> {
                if (log.isTraceEnabled()) {
                    nettyOutbound
                            .withConnection(connection -> log.trace("outbound route: "
                                    + connection.channel().id().asShortText()
                                    + ", inbound: " + exchange.getLogPrefix()));
                }
                // 发送请求
                return nettyOutbound.send(request.getBody().map(this::getByteBuf));
            }).responseConnection((res, connection) -> {
                // 省略代码,下游请求返回之后做的一些处理
                return Mono.just(res);
            });
    Duration responseTimeout = getResponseTimeout(route);
    // 一些省略代码
    return responseFlux.then(chain.filter(exchange));
}

上面代码的逻辑主要就是

  1. 获取通信用的httpclient
  2. 设置headers,method和url
  3. 执行responseConnection()方法发起连接
  4. 连接成功之后执行send()方法传入的lambda方法。
  5. 执行responseConnection()传入的lambda方法。

首先来看一下getHttpClient()方法

protected HttpClient getHttpClient(Route route, ServerWebExchange exchange) {
    // 省略代码,timeout设置
    return httpClient;
}

实际上就是直接返回httpClient对象,那么httpClient是在哪里设置的呢?

public NettyRoutingFilter(HttpClient httpClient,
        ObjectProvider<List<HttpHeadersFilter>> headersFiltersProvider,
        HttpClientProperties properties) {
    this.httpClient = httpClient;
    this.headersFiltersProvider = headersFiltersProvider;
    this.properties = properties;
}

可以看到是在生成NettyRoutingFilter对象的时候传入的,那么NettyRoutingFilter对象在哪里生成的呢?

答:在GatewayAutoConfiguration类中生成的,这个类是在引入网关的依赖之后自动引入的。

同样的,HttpClient对象也是在这个类里面生成的。

@Bean
@ConditionalOnMissingBean
public HttpClient gatewayHttpClient(HttpClientProperties properties,
        List<HttpClientCustomizer> customizers) {
    // 配置连接池
    HttpClientProperties.Pool pool = properties.getPool();
    ConnectionProvider connectionProvider;
    if (pool.getType() == DISABLED) {
        connectionProvider = ConnectionProvider.newConnection();
    }
    else if (pool.getType() == FIXED) {
        connectionProvider = ConnectionProvider.fixed(pool.getName(),
                pool.getMaxConnections(), pool.getAcquireTimeout(),
                pool.getMaxIdleTime(), pool.getMaxLifeTime());
    }
    else {
        connectionProvider = ConnectionProvider.elastic(pool.getName(),
                pool.getMaxIdleTime(), pool.getMaxLifeTime());
    }
    HttpClient httpClient = HttpClient.create(connectionProvider)
            // TODO: move customizations to HttpClientCustomizers
            .httpResponseDecoder(spec -> {
                // 省略代码
                return spec;
            }).tcpConfiguration(tcpClient -> {
                // 省略代码
                return tcpClient;
            });
    // 省略代码  ssl设置
    return httpClient;
}

从上面代码可以看出,HttpClient对象自带一个连接池,生成Httpclient的时候首先会配置这个连接池。

可以看到HttpClient提供的连接池的类型:

public enum PoolType {
    /**
     * 弹性的连接池
     */
    ELASTIC,
    /**
     * 固定长度的连接池
     */
    FIXED,
    /**
     * 不使用连接池
     */
    DISABLED
}

默认使用的是第一种 弹性的连接池

private PoolType type = PoolType.ELASTIC;
connectionProvider = ConnectionProvider.elastic(pool.getName(),
            pool.getMaxIdleTime(), pool.getMaxLifeTime());
static ConnectionProvider elastic(String name, @Nullable Duration maxIdleTime, @Nullable Duration maxLifeTime) {
    return builder(name).maxConnections(Integer.MAX_VALUE) //设置最大连接数无限制
                        .pendingAcquireTimeout(Duration.ofMillis(0))
                        .pendingAcquireMaxCount(-1)
                        .maxIdleTime(maxIdleTime)
                        .maxLifeTime(maxLifeTime)
                        .build();
}
static Builder builder(String name) {
    return new Builder(name);
}

在Builder()构造函数中会调用ConnectionPoolSpec()方法:

private ConnectionPoolSpec() {
    if (DEFAULT_POOL_MAX_IDLE_TIME > -1) {
        maxIdleTime(Duration.ofMillis(DEFAULT_POOL_MAX_IDLE_TIME));
    }
    // 支持不同类型的链接保存方式
    // lifo和fifo
    if(LEASING_STRATEGY_LIFO.equals(DEFAULT_POOL_LEASING_STRATEGY)) {
        lifo();
    }
    else {
        fifo();
    }
}

从代码里面可以看到,httpclient自带的连接池还支持两种连接获取方式: lifo(后进先出)和fifo(先进先出) 默认使用的是fifo。

先总结一下,在引入网关的依赖之后,会自动创建一个HttpClient对象,而这个HttpClient对象自带一个连接池,且默认是Elastic连接池,即连接池内的数量会弹性发生变化。 连接池内部默认采用fifo的方式来保存以及使用连接

现在重新回到NettyRoutingFilter.filter()方法中来看下:

public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
    URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);
    // ... 一些省略代码
    // 获取httpclient
    Flux<HttpClientResponse> responseFlux = getHttpClient(route, exchange)
            .headers(headers -> {
                headers.add(httpHeaders);
                // Will either be set below, or later by Netty
                headers.remove(HttpHeaders.HOST);
                if (preserveHost) {
                    String host = request.getHeaders().getFirst(HttpHeaders.HOST);
                    headers.add(HttpHeaders.HOST, host);
                }
            }).request(method).uri(url).send((req, nettyOutbound) -> {
                if (log.isTraceEnabled()) {
                    nettyOutbound
                            .withConnection(connection -> log.trace("outbound route: "
                                    + connection.channel().id().asShortText()
                                    + ", inbound: " + exchange.getLogPrefix()));
                }
                // 发送请求
                return nettyOutbound.send(request.getBody().map(this::getByteBuf));
            }).responseConnection((res, connection) -> {
                // 省略代码,下游请求返回之后做的一些处理
                return Mono.just(res);
            });
    Duration responseTimeout = getResponseTimeout(route);
    // 一些省略代码
    return responseFlux.then(chain.filter(exchange));
}

responseConnection()方法中会发起连接操作:

final TcpClient cachedConfiguration;
@SuppressWarnings("unchecked")
Mono<HttpClientOperations> connect() {
    return (Mono<HttpClientOperations>)cachedConfiguration.connect();
}
@Override
public <V> Flux<V> responseConnection(BiFunction<? super HttpClientResponse, ? super Connection, ? extends Publisher<V>> receiver) {
    return connect().flatMapMany(resp -> Flux.from(receiver.apply(resp, resp)));
}

调用的是TcpClient对象的connect()方法,一步步断点下去发现最终调用的是TcpClientConnect.connect()方法.

final ConnectionProvider provider;
@Override
public Mono<? extends Connection> connect(Bootstrap b) {
    if (b.config()
         .group() == null) {
        TcpClientRunOn.configure(b,
                LoopResources.DEFAULT_NATIVE,
                TcpResources.get());
    }
    // 这里的provider实际上就是前面分析的创建HttpClient的时候生成的ConnectProvider对象
    return provider.acquire(b);
}

从代码实现中可以看到,实际上TcpClienConnect是直接从ConnectionProvider获取连接。

看到这里,本文一开始的问题其实已经有解答了:

默认情况下(除非显示设置不使用连接池),网关在把请求转发给下游服务器的时候,是会使用连接池的,而不是每次都重新发起连接。

继续往下分析。

对于Elastic类型的连接池来说,其默认实现为PooledConnectionProvider

// key为远程地址(一般指代一个远程服务),value则对应的ConnectioAllocator
final ConcurrentMap<PoolKey, InstrumentedPool<PooledConnection>> channelPools =
      PlatformDependent.newConcurrentHashMap();
@Override
public Mono<Connection> acquire(Bootstrap b) {
    return Mono.create(sink -> {
        // ...其他省略代码
        SocketAddress remoteAddress = bootstrap.config().remoteAddress();
        PoolKey holder = new PoolKey(remoteAddress, handler != null ? handler.hashCode() : -1);
        // 每个远程地址都可以配置一个PoolFactory,如果没配置则使用默认的PoolFactory
        PoolFactory poolFactory = poolFactoryPerRemoteHost.getOrDefault(remoteAddress, defaultPoolFactory);
        InstrumentedPool<PooledConnection> pool = channelPools.computeIfAbsent(holder, poolKey -> {
            if (log.isDebugEnabled()) {
                log.debug("Creating a new client pool [{}] for [{}]", poolFactory, remoteAddress);
            }
            // newPool是一个连接分配器,实际上就是一个连接池
            InstrumentedPool<PooledConnection> newPool =
                    new PooledConnectionAllocator(bootstrap, poolFactory, opsFactory).pool;
            if (poolFactory.metricsEnabled || BootstrapHandlers.findMetricsSupport(bootstrap) != null) {
                PooledConnectionProviderMetrics.registerMetrics(name,
                        poolKey.hashCode() + "",
                        Metrics.formatSocketAddress(remoteAddress),
                        newPool.metrics());
            }
            return newPool;
        });
        //
        disposableAcquire(new DisposableAcquire(sink, pool, obs, opsFactory, poolFactory.pendingAcquireTimeout, false));
    });
}
static void disposableAcquire(DisposableAcquire disposableAcquire) {
    // accquire一个连接,如果无可用了解则创建,则调用
    Mono<PooledRef<PooledConnection>> mono =
            disposableAcquire.pool.acquire(Duration.ofMillis(disposableAcquire.pendingAcquireTimeout));
    mono.subscribe(disposableAcquire);
}
Publisher<PooledConnection> connectChannel() {
    return Mono.create(sink -> {
        Bootstrap b = bootstrap.clone();
        PooledConnectionInitializer initializer = new PooledConnectionInitializer(sink);
        b.handler(initializer);
        // 创建连接
        ChannelFuture f = b.connect();
        if (f.isDone()) {
            initializer.operationComplete(f);
        } else {
            f.addListener(initializer);
        }
    });
}

从代码里面可以看出,ConnectionProvider对每一个远程地址(即下游服的某一个服务器)都缓存了一个连接分配器(ConnectionAllocator),而这个ConnectionAllocator才是真正的连接池,是Project Reactor项目内部实现的一个连接池,就不从源码角度分析,简单来说,就是请求方获取连接的时候,如果池子里面有空闲连接,则直接用现成连接,如果没有的话,则调用PoolFactory创建新的链接。

网络异常,图片无法展示
|

总结一下:

网关内部维持了一个缓存映射,缓存着下游每一个服务地址(ip:port)对应的连接分配器(ConnectionAllocator),而ConnectionAllocator是一个连接池,内部会保存复用已经生成的连接。

当网关转发请求时确认下游目标服务的地址,即可直接从对应的连接池中取出连接复用。

相关文章
|
7月前
|
存储 弹性计算 缓存
阿里云服务器ECS经济型、通用算力、计算型、通用和内存型选购指南及使用场景分析
本文详细解析阿里云ECS服务器的经济型、通用算力型、计算型、通用型和内存型实例的区别及适用场景,涵盖性能特点、配置比例与实际应用,助你根据业务需求精准选型,提升资源利用率并降低成本。
516 3
|
5月前
|
存储 弹性计算 运维
阿里云服务器全解析:ECS是什么、应用场景、租用流程及优缺点分析
阿里云ECS(Elastic Compute Service)是阿里云提供的高性能、高可用的云计算服务,支持弹性扩展、多样化实例类型和多种计费模式。适用于网站搭建、数据处理、运维测试等多种场景,具备分钟级交付、安全可靠、成本低、易运维等优势,是企业及开发者上云的理想选择。
884 5
|
6月前
|
Java Linux 网络安全
Linux云端服务器上部署Spring Boot应用的教程。
此流程涉及Linux命令行操作、系统服务管理及网络安全知识,需要管理员权限以进行配置和服务管理。务必在一个测试环境中验证所有步骤,确保一切配置正确无误后,再将应用部署到生产环境中。也可以使用如Ansible、Chef等配置管理工具来自动化部署过程,提升效率和可靠性。
693 13
|
9月前
|
存储 人工智能 项目管理
2025年GitHub平台上的十大开源MCP服务器汇总分析
本文深入解析了GitHub上十个代表性MCP(Model Context Protocol)服务器项目,探讨其在连接AI与现实世界中的关键作用。这些服务器实现了AI模型与应用程序、数据库、云存储、项目管理等工具的无缝交互,扩展了AI的应用边界。文中涵盖Airbnb、Supabase、AWS-S3、Kubernetes等领域的MCP实现方案,展示了AI在旅行规划、数据处理、云存储、容器编排等场景中的深度应用。未来,MCP技术将向标准化、安全性及行业定制化方向发展,为AI系统集成提供更强大的支持。
2213 2
2025年GitHub平台上的十大开源MCP服务器汇总分析
|
7月前
|
负载均衡 Java API
基于 Spring Cloud 的微服务架构分析
Spring Cloud 是一个基于 Spring Boot 的微服务框架,提供全套分布式系统解决方案。它整合了 Netflix、Zookeeper 等成熟技术,通过简化配置和开发流程,支持服务发现(Eureka)、负载均衡(Ribbon)、断路器(Hystrix)、API网关(Zuul)、配置管理(Config)等功能。此外,Spring Cloud 还兼容 Nacos、Consul、Etcd 等注册中心,满足不同场景需求。其核心组件如 Feign 和 Stream,进一步增强了服务调用与消息处理能力,为开发者提供了一站式微服务开发工具包。
716 0
|
XML Java 应用服务中间件
Spring Boot 两种部署到服务器的方式
本文介绍了Spring Boot项目的两种部署方式:jar包和war包。Jar包方式使用内置Tomcat,只需配置JDK 1.8及以上环境,通过`nohup java -jar`命令后台运行,并开放服务器端口即可访问。War包则需将项目打包后放入外部Tomcat的webapps目录,修改启动类继承`SpringBootServletInitializer`并调整pom.xml中的打包类型为war,最后启动Tomcat访问应用。两者各有优劣,jar包更简单便捷,而war包适合传统部署场景。需要注意的是,war包部署时,内置Tomcat的端口配置不会生效。
2884 17
Spring Boot 两种部署到服务器的方式
|
11月前
|
前端开发 Java Nacos
🛡️Spring Boot 3 整合 Spring Cloud Gateway 工程实践
本文介绍了如何使用Spring Cloud Alibaba 2023.0.0.0技术栈构建微服务网关,以应对微服务架构中流量治理与安全管控的复杂性。通过一个包含鉴权服务、文件服务和主服务的项目,详细讲解了网关的整合与功能开发。首先,通过统一路由配置,将所有请求集中到网关进行管理;其次,实现了限流防刷功能,防止恶意刷接口;最后,添加了登录鉴权机制,确保用户身份验证。整个过程结合Nacos注册中心,确保服务注册与配置管理的高效性。通过这些实践,帮助开发者更好地理解和应用微服务网关。
2024 0
🛡️Spring Boot 3 整合 Spring Cloud Gateway 工程实践
|
消息中间件 监控 Java
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
336 6
|
监控 IDE Java
如何在无需重新启动服务器的情况下在 Spring Boot 上重新加载我的更改?
如何在无需重新启动服务器的情况下在 Spring Boot 上重新加载我的更改?
1339 8
|
Java 关系型数据库 MySQL
如何将Spring Boot + MySQL应用程序部署到Pivotal Cloud Foundry (PCF)
如何将Spring Boot + MySQL应用程序部署到Pivotal Cloud Foundry (PCF)
235 5