本文为博主原创,转载请注明出处:
本文引用的 Spring Cloud Gateway 版本为 2.2.5。
Spring Cloud Gateway 实现服务路由并发起请求的具体过程位于 org.springframework.cloud.gateway.filter.NettyRoutingFilter 过滤器中。该过滤器封装了具体的请求参数,并根据路由规则确定要请求的目标服务,然后通过 HttpClient 向下游微服务发起请求。该 HttpClient 是基于 Netty 封装的客户端,其全限定类名为 reactor.netty.http.client.HttpClient。
查看 NettyRoutingFilter 中的 filter 实现过程:
/**
 * Forwards the exchange to the URL stored under {@code GATEWAY_REQUEST_URL_ATTR} using the
 * HTTP client returned by {@code getHttpClient(route, exchange)}.
 *
 * <p>The exchange is passed straight to the rest of the chain when it has already been routed
 * or when the target scheme is neither {@code http} nor {@code https}. Otherwise the exchange
 * is marked as routed, the filtered request headers and the request body are sent to the
 * target, and the client response (status and filtered headers) is copied onto the server
 * response before the chain continues.
 *
 * @param exchange the current server exchange
 * @param chain the remaining gateway filter chain
 * @return a {@code Mono} that completes after the proxied call and the remaining chain finish
 */
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
    URI requestUrl = exchange.getRequiredAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR);
    String scheme = requestUrl.getScheme();

    // Guard clause: nothing to do for already-routed exchanges or non-HTTP(S) targets.
    if (ServerWebExchangeUtils.isAlreadyRouted(exchange)
            || !("http".equals(scheme) || "https".equals(scheme))) {
        return chain.filter(exchange);
    }
    ServerWebExchangeUtils.setAlreadyRouted(exchange);

    ServerHttpRequest request = exchange.getRequest();
    HttpMethod method = HttpMethod.valueOf(request.getMethodValue());
    String url = requestUrl.toASCIIString();

    // Copy the filtered Spring headers into a Netty header container.
    HttpHeaders filtered = HttpHeadersFilter.filterRequest(this.getHeadersFilters(), exchange);
    DefaultHttpHeaders httpHeaders = new DefaultHttpHeaders();
    filtered.forEach(httpHeaders::set);

    boolean preserveHost =
            exchange.getAttributeOrDefault(ServerWebExchangeUtils.PRESERVE_HOST_HEADER_ATTRIBUTE, false);
    Route route = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR);

    RequestSender sender = this.getHttpClient(route, exchange)
            .headers(outgoing -> {
                outgoing.add(httpHeaders);
                // The Host header is dropped and only restored from the inbound request
                // when the exchange asks for it to be preserved.
                outgoing.remove("Host");
                if (preserveHost) {
                    String host = request.getHeaders().getFirst("Host");
                    outgoing.add("Host", host);
                }
            })
            .request(method)
            .uri(url);

    Flux<HttpClientResponse> responseFlux = sender
            .send((req, nettyOutbound) -> {
                if (log.isTraceEnabled()) {
                    nettyOutbound.withConnection(connection ->
                            log.trace("outbound route: " + connection.channel().id().asShortText()
                                    + ", inbound: " + exchange.getLogPrefix()));
                }
                return nettyOutbound.send(request.getBody().map(this::getByteBuf));
            })
            .responseConnection((res, connection) -> {
                // Expose the raw client response and its connection as exchange attributes.
                exchange.getAttributes().put(ServerWebExchangeUtils.CLIENT_RESPONSE_ATTR, res);
                exchange.getAttributes().put(ServerWebExchangeUtils.CLIENT_RESPONSE_CONN_ATTR, connection);

                ServerHttpResponse response = exchange.getResponse();
                HttpHeaders responseHeaders = new HttpHeaders();
                res.responseHeaders().forEach(entry ->
                        responseHeaders.add(entry.getKey(), entry.getValue()));

                String contentTypeValue = responseHeaders.getFirst("Content-Type");
                if (StringUtils.hasLength(contentTypeValue)) {
                    exchange.getAttributes().put("original_response_content_type", contentTypeValue);
                }

                this.setResponseStatus(res, response);

                HttpHeaders filteredResponseHeaders = HttpHeadersFilter.filter(
                        this.getHeadersFilters(), responseHeaders, exchange, Type.RESPONSE);

                // When the proxied response carries Content-Length but no Transfer-Encoding,
                // remove any Transfer-Encoding header already present on the server response.
                if (!filteredResponseHeaders.containsKey("Transfer-Encoding")
                        && filteredResponseHeaders.containsKey("Content-Length")) {
                    response.getHeaders().remove("Transfer-Encoding");
                }

                exchange.getAttributes().put(
                        ServerWebExchangeUtils.CLIENT_RESPONSE_HEADER_NAMES, filteredResponseHeaders.keySet());
                response.getHeaders().putAll(filteredResponseHeaders);
                return Mono.just(res);
            });

    Duration responseTimeout = this.getResponseTimeout(route);
    if (responseTimeout != null) {
        // A timeout is surfaced to the caller as a 504 GATEWAY_TIMEOUT status exception.
        responseFlux = responseFlux
                .timeout(responseTimeout,
                        Mono.error(new TimeoutException("Response took longer than timeout: " + responseTimeout)))
                .onErrorMap(TimeoutException.class,
                        th -> new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT, th.getMessage(), th));
    }

    return responseFlux.then(chain.filter(exchange));
}
该方法通过 getHttpClient 方法获取 HttpClient 客户端实例。Spring Cloud Gateway 使用的 HttpClient 实例是在 GatewayAutoConfiguration 中定义的,其声明并自动装配的代码如下:
@Configuration( proxyBeanMethods = false ) @ConditionalOnClass({HttpClient.class}) protected static class NettyConfiguration { protected final Log logger = LogFactory.getLog(this.getClass()); protected NettyConfiguration() { } @Bean @ConditionalOnProperty( name = {"spring.cloud.gateway.httpserver.wiretap"} ) public NettyWebServerFactoryCustomizer nettyServerWiretapCustomizer(Environment environment, ServerProperties serverProperties) { return new NettyWebServerFactoryCustomizer(environment, serverProperties) { public void customize(NettyReactiveWebServerFactory factory) { factory.addServerCustomizers(new NettyServerCustomizer[]{(httpServer) -> { return httpServer.wiretap(true); }}); super.customize(factory); } }; } @Bean @ConditionalOnMissingBean public HttpClient gatewayHttpClient(HttpClientProperties properties, List<HttpClientCustomizer> customizers) { Pool pool = properties.getPool(); ConnectionProvider connectionProvider; if (pool.getType() == PoolType.DISABLED) { connectionProvider = ConnectionProvider.newConnection(); } else if (pool.getType() == PoolType.FIXED) { connectionProvider = ConnectionProvider.fixed(pool.getName(), pool.getMaxConnections(), pool.getAcquireTimeout(), pool.getMaxIdleTime(), pool.getMaxLifeTime()); } else { connectionProvider = ConnectionProvider.elastic(pool.getName(), pool.getMaxIdleTime(), pool.getMaxLifeTime()); } HttpClient httpClient = HttpClient.create(connectionProvider).httpResponseDecoder((spec) -> { if (properties.getMaxHeaderSize() != null) { spec.maxHeaderSize((int)properties.getMaxHeaderSize().toBytes()); } if (properties.getMaxInitialLineLength() != null) { spec.maxInitialLineLength((int)properties.getMaxInitialLineLength().toBytes()); } return spec; }).tcpConfiguration((tcpClient) -> { if (properties.getConnectTimeout() != null) { tcpClient = tcpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, properties.getConnectTimeout()); } Proxy proxy = properties.getProxy(); if (StringUtils.hasText(proxy.getHost())) { 
tcpClient = tcpClient.proxy((proxySpec) -> { Builder builder = proxySpec.type(reactor.netty.tcp.ProxyProvider.Proxy.HTTP).host(proxy.getHost()); PropertyMapper map = PropertyMapper.get(); proxy.getClass(); map.from(proxy::getPort).whenNonNull().to(builder::port); proxy.getClass(); map.from(proxy::getUsername).whenHasText().to(builder::username); proxy.getClass(); map.from(proxy::getPassword).whenHasText().to((password) -> { builder.password((s) -> { return password; }); }); proxy.getClass(); map.from(proxy::getNonProxyHostsPattern).whenHasText().to(builder::nonProxyHosts); }); } return tcpClient; }); Ssl ssl = properties.getSsl(); if (ssl.getKeyStore() != null && ssl.getKeyStore().length() > 0 || ssl.getTrustedX509CertificatesForTrustManager().length > 0 || ssl.isUseInsecureTrustManager()) { httpClient = httpClient.secure((sslContextSpec) -> { SslContextBuilder sslContextBuilder = SslContextBuilder.forClient(); X509Certificate[] trustedX509Certificates = ssl.getTrustedX509CertificatesForTrustManager(); if (trustedX509Certificates.length > 0) { sslContextBuilder = sslContextBuilder.trustManager(trustedX509Certificates); } else if (ssl.isUseInsecureTrustManager()) { sslContextBuilder = sslContextBuilder.trustManager(InsecureTrustManagerFactory.INSTANCE); } try { sslContextBuilder = sslContextBuilder.keyManager(ssl.getKeyManagerFactory()); } catch (Exception var6) { this.logger.error(var6); } sslContextSpec.sslContext(sslContextBuilder).defaultConfiguration(ssl.getDefaultConfigurationType()).handshakeTimeout(ssl.getHandshakeTimeout()).closeNotifyFlushTimeout(ssl.getCloseNotifyFlushTimeout()).closeNotifyReadTimeout(ssl.getCloseNotifyReadTimeout()); }); } if (properties.isWiretap()) { httpClient = httpClient.wiretap(true); } if (!CollectionUtils.isEmpty(customizers)) { customizers.sort(AnnotationAwareOrderComparator.INSTANCE); HttpClientCustomizer customizer; for(Iterator var7 = customizers.iterator(); var7.hasNext(); httpClient = customizer.customize(httpClient)) { 
customizer = (HttpClientCustomizer)var7.next(); } } return httpClient; } @Bean public HttpClientProperties httpClientProperties() { return new HttpClientProperties(); } @Bean public NettyRoutingFilter routingFilter(HttpClient httpClient, ObjectProvider<List<HttpHeadersFilter>> headersFilters, HttpClientProperties properties) { return new NettyRoutingFilter(httpClient, headersFilters, properties); } @Bean public NettyWriteResponseFilter nettyWriteResponseFilter(GatewayProperties properties) { return new NettyWriteResponseFilter(properties.getStreamingMediaTypes()); } @Bean public ReactorNettyWebSocketClient reactorNettyWebSocketClient(HttpClientProperties properties, HttpClient httpClient) { ReactorNettyWebSocketClient webSocketClient = new ReactorNettyWebSocketClient(httpClient); if (properties.getWebsocket().getMaxFramePayloadLength() != null) { webSocketClient.setMaxFramePayloadLength(properties.getWebsocket().getMaxFramePayloadLength()); } webSocketClient.setHandlePing(properties.getWebsocket().isProxyPing()); return webSocketClient; } @Bean public ReactorNettyRequestUpgradeStrategy reactorNettyRequestUpgradeStrategy(HttpClientProperties httpClientProperties) { ReactorNettyRequestUpgradeStrategy requestUpgradeStrategy = new ReactorNettyRequestUpgradeStrategy(); Websocket websocket = httpClientProperties.getWebsocket(); PropertyMapper map = PropertyMapper.get(); websocket.getClass(); map.from(websocket::getMaxFramePayloadLength).whenNonNull().to(requestUpgradeStrategy::setMaxFramePayloadLength); websocket.getClass(); map.from(websocket::isProxyPing).to(requestUpgradeStrategy::setHandlePing); return requestUpgradeStrategy; } }
上面代码中的 gatewayHttpClient 即为 Spring Cloud Gateway 使用的 HttpClient 实例;Spring Cloud Gateway 在发起服务请求时,会自动装配并使用该实例。
如果需要自定义 HttpClient 实例,例如在 HttpClient 中加入自定义的 IP 白名单校验、预置 HTTPS 请求证书,或是添加特殊的认证请求头等,就需要在代码中显式地定义 gatewayHttpClient 实例,代码如下:
@Configuration public class GatewayAutoConfiguration { @Bean @ConditionalOnMissingBean public HttpClient gatewayHttpClient(HttpClientProperties properties, List<HttpClientCustomizer> customizers) { Pool pool = properties.getPool(); ConnectionProvider connectionProvider; if (pool.getType() == PoolType.DISABLED) { connectionProvider = ConnectionProvider.newConnection(); } else if (pool.getType() == PoolType.FIXED) { connectionProvider = ConnectionProvider.fixed(pool.getName(), pool.getMaxConnections(), pool.getAcquireTimeout(), pool.getMaxIdleTime(), pool.getMaxLifeTime()); } else { connectionProvider = ConnectionProvider.elastic(pool.getName(), pool.getMaxIdleTime(), pool.getMaxLifeTime()); } HttpClient httpClient = HttpClient.create(connectionProvider).httpResponseDecoder((spec) -> { if (properties.getMaxHeaderSize() != null) { spec.maxHeaderSize((int)properties.getMaxHeaderSize().toBytes()); } if (properties.getMaxInitialLineLength() != null) { spec.maxInitialLineLength((int)properties.getMaxInitialLineLength().toBytes()); } return spec; }).tcpConfiguration((tcpClient) -> { if (properties.getConnectTimeout() != null) { tcpClient = tcpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, properties.getConnectTimeout()); } Proxy proxy = properties.getProxy(); if (StringUtils.hasText(proxy.getHost())) { tcpClient = tcpClient.proxy((proxySpec) -> { Builder builder = proxySpec.type(reactor.netty.tcp.ProxyProvider.Proxy.HTTP).host(proxy.getHost()); PropertyMapper map = PropertyMapper.get(); proxy.getClass(); map.from(proxy::getPort).whenNonNull().to(builder::port); proxy.getClass(); map.from(proxy::getUsername).whenHasText().to(builder::username); proxy.getClass(); map.from(proxy::getPassword).whenHasText().to((password) -> { builder.password((s) -> { return password; }); }); proxy.getClass(); map.from(proxy::getNonProxyHostsPattern).whenHasText().to(builder::nonProxyHosts); }); } return tcpClient; }); Ssl ssl = properties.getSsl(); if (ssl.getKeyStore() != null && 
ssl.getKeyStore().length() > 0 || ssl.getTrustedX509CertificatesForTrustManager().length > 0 || ssl.isUseInsecureTrustManager()) { httpClient = httpClient.secure((sslContextSpec) -> { SslContextBuilder sslContextBuilder = SslContextBuilder.forClient(); X509Certificate[] trustedX509Certificates = ssl.getTrustedX509CertificatesForTrustManager(); if (trustedX509Certificates.length > 0) { sslContextBuilder = sslContextBuilder.trustManager(trustedX509Certificates); } else if (ssl.isUseInsecureTrustManager()) { sslContextBuilder = sslContextBuilder.trustManager(InsecureTrustManagerFactory.INSTANCE); } try { sslContextBuilder = sslContextBuilder.keyManager(ssl.getKeyManagerFactory()); } catch (Exception var6) { this.logger.error(var6); } sslContextSpec.sslContext(sslContextBuilder).defaultConfiguration(ssl.getDefaultConfigurationType()).handshakeTimeout(ssl.getHandshakeTimeout()).closeNotifyFlushTimeout(ssl.getCloseNotifyFlushTimeout()).closeNotifyReadTimeout(ssl.getCloseNotifyReadTimeout()); }); } if (properties.isWiretap()) { httpClient = httpClient.wiretap(true); } if (!CollectionUtils.isEmpty(customizers)) { customizers.sort(AnnotationAwareOrderComparator.INSTANCE); HttpClientCustomizer customizer; for(Iterator var7 = customizers.iterator(); var7.hasNext(); httpClient = customizer.customize(httpClient)) { customizer = (HttpClientCustomizer)var7.next(); } } return httpClient; } }
这样服务在启动时就会优先加载自定义的 HttpClient 实例;自动配置中的 gatewayHttpClient 标注了 @ConditionalOnMissingBean,因此不会再重复创建。
标签: spring cloud