Spring GateWay : 网关的转发细节

简介: 请求转发是 Gateway 最核心的功能之一 , 它涉及到三个主要的概念 :Route(路由): 路由是网关的基本单元,由ID、URI、一组Predicate、一组Filter组成,如果 Predicate 匹配 True ,则进行转发Predicate(谓语、断言): 路由转发的判断条件,这是一个 Java 8函数断言, 输入类型是 Spring Framework ServerWebExchange , 目前SpringCloud Gateway支持多种方式,常见如:Path、Query、Method、Header等,写法必须遵循 key=vlue的形式

一 . 前言

文档目的

  • 梳理 Gateway 生产中转发请求的细节
  • 梳理 转发的定制点

知识补充

请求转发是 Gateway 最核心的功能之一 , 它涉及到三个主要的概念 :

Route(路由): 路由是网关的基本单元,由ID、URI、一组Predicate、一组Filter组成,如果 Predicate 匹配 True ,则进行转发

Predicate(谓语、断言): 路由转发的判断条件,这是一个 Java 8函数断言, 输入类型是 Spring Framework ServerWebExchange , 目前SpringCloud Gateway支持多种方式,常见如:Path、Query、Method、Header等,写法必须遵循 key=vlue的形式

Filter(过滤器): 过滤器是路由转发请求时所经过的过滤逻辑,使用特定工厂构建的 GatewayFilter 实例 , 可用于修改请求、响应内容

二 . 简单使用

2.1 predicates 汇总

// 
- After=2017-01-20T17:42:47.789-07:00[America/Denver]
// 
- Before=2017-01-20T17:42:47.789-07:00[America/Denver]
//
- Between=2017-01-20T17:42:47.789-07:00[America/Denver], 2017-01-21T17:42:47.789-07:00[America/Denver]
// 
- Cookie=chocolate, ch.p
复制代码

2.2 Mono 和 Flux

Mono 和 Flux 是贯穿了整个流程的核心对象 ,根据 reactive-streams 规范,发布服务器提供了数量可能无限的有序元素,并根据从其订阅服务器接收到的需求发布这些元素。Reactor-core 有一组词 Publisher 接口的实现。我们将要创建序列的两个重要实现是 Mono 和 Flux。

  • Flux 表示的是包含 0 到 N 个元素的异步序列
  • Mono 表示的是包含 0 或 1 个元素的异步序列

> SpringGateway 是使用 webflux 作为底层调用框架的 , 其中涉及到 mono 和 Flux 对象

> 该序列中可以包含 3 种通知 :

  • 正常的包含元素的消息
  • 序列结束的消息
  • 序列出错的消息

Flux

  • Flux是一个标准Publisher,表示0到N个发射项的异步序列,选择性地以完成或错误信号终止。与Reactive Streams规范中一样,这三种类型的信号转换为对下游订阅者的onNext、onComplete或onError方法的调用。

Mono

  • Mono 是 Publisher 的另一个实现。它最多发出一个条目,然后(可选)以 onComplete 信号或 onError 信号终止 , Mono 在本质上也是异步的
  • 它只提供了可用于Flux的操作符的子集,并且一些操作符(特别是那些将Mono与另一个发布者组合的操作符)切换到Flux。
  • 例如,Mono#concatWith(Publisher)返回一个Flux ,而Mono#then(Mono)则返回另一个Mono。

常见的方法如下 :

  • create : 以编程方式创建具有多次发射能力的Flux,
  • empty : 发出0元素或返回空 Flux < t >
  • just : 创建一个基础
  • error : 创建一个Flux,它在订阅之后立即以指定的错误终止

PS : 这一块就不深入看了 , 先看完 Gateway 的主流程

三 . 拦截深入

3.1 原理图

首先来看一下 SpringGateway 的原理图

四 . 调用的入口

4.1 调用流程

  • Step 1 : HttpWebHandlerAdapter # handle : 构建 ServerWebExchange , 发起 Handler 处理
  • Step 2 : DispatcherHandler # handle : 发起请求处理
  • Step 3 : RoutePredicateHandlerMapping # getHandlerInternal : route 判断处理

4.2. getHandlerInternal 逻辑

protected Mono<?> getHandlerInternal(ServerWebExchange exchange) {
   // don't handle requests on management port if set and different than server port
   if (this.managementPortType == DIFFERENT && this.managementPort != null
         && exchange.getRequest().getURI().getPort() == this.managementPort) {
      return Mono.empty();
   }
   exchange.getAttributes().put(GATEWAY_HANDLER_MAPPER_ATTR, getSimpleName());
   return lookupRoute(exchange)
         // .log("route-predicate-handler-mapping", Level.FINER) //name this
         .flatMap((Function<Route, Mono<?>>) r -> {
            exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
            if (logger.isDebugEnabled()) {
               logger.debug("Mapping [" + getExchangeDesc(exchange) + "] to " + r);
            }
            exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r);
            return Mono.just(webHandler);
         }).switchIfEmpty(Mono.empty().then(Mono.fromRunnable(() -> {
            exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
            if (logger.isTraceEnabled()) {
               logger.trace("No RouteDefinition found for [" + getExchangeDesc(exchange) + "]");
            }
         })));
}
复制代码

3.2. lookupRoute

protected Mono<Route> lookupRoute(ServerWebExchange exchange) {
   return this.routeLocator.getRoutes()
         // individually filter routes so that filterWhen error delaying is not a
         // problem
         .concatMap(route -> Mono.just(route).filterWhen(r -> {
            // add the current route we are testing
            exchange.getAttributes().put(GATEWAY_PREDICATE_ROUTE_ATTR, r.getId());
            return r.getPredicate().apply(exchange);
         })
               // instead of immediately stopping main flux due to error, log and
               // swallow it
               .doOnError(e -> logger.error("Error applying predicate for route: " + route.getId(), e))
               .onErrorResume(e -> Mono.empty()))
         // .defaultIfEmpty() put a static Route not found
         // or .switchIfEmpty()
         // .switchIfEmpty(Mono.<Route>empty().log("noroute"))
         .next()
         // TODO: error handling
         .map(route -> {
            validateRoute(route, exchange);
            return route;
         });
}
复制代码

会遍历所有的 route

五. 发送的流程

5.1 FilteringWebHandler 体系

此处的 webHandler 为 FilteringWebHandler 对象 , 来看一下这个对象的作用

这里涉及到以下的 Filter :

  • C- ForwardPathFilter :
  • C- ForwardRoutingFilter : 用来做本地forward的
  • C- GatewayMetricsFilter : 与 Prometheus 整合,从而创建一个 Grafana dashboard
  • C- LoadBalancerClientFilter : 用来整合Ribbon的 , 先获取微服务的名称,然后再通过Ribbon获取实际的调用地址
  • C- NettyRoutingFilter : http 或 https ,使用 Netty 的 HttpClient 向下游的服务发送代理请求
  • C- NettyWriteResponseFilter : 用于将代理响应写回网关的客户端侧,所以该过滤器会在所有其他过滤器执行完成后才执行
  • C- OrderedGatewayFilter :
  • C- RouteToRequestUrlFilter : 将从request里获取的 原始url转换成Gateway进行请求转发时所使用的url
  • C- WebClientHttpRoutingFilter :
  • C- WebClientWriteResponseFilter :
  • C- WebsocketRoutingFilter : ws 或者 wss,那么该Filter会使用 Spring Web Socket 将 Websocket 请求转发到下游
  • C- WeightCalculatorWebFilter :

可以参考 -> Spring Cloud Gateway 内置的全局过滤器

调用逻辑1 : FilteringWebHandler 管理

该对象中存在一个内部类 DefaultGatewayFilterChain , 该类为 Filter 过滤链

rivate static class DefaultGatewayFilterChain implements GatewayFilterChain {
   // 当前 Filter 链索引 
   private final int index;
   // Filter 集合 
   private final List<GatewayFilter> filters;
   DefaultGatewayFilterChain(List<GatewayFilter> filters) {
      this.filters = filters;
      this.index = 0;
   }
   private DefaultGatewayFilterChain(DefaultGatewayFilterChain parent, int index) {
      this.filters = parent.getFilters();
      this.index = index;
   }
   public List<GatewayFilter> getFilters() {
      return filters;
   }
   @Override
   public Mono<Void> filter(ServerWebExchange exchange) {
      return Mono.defer(() -> {
         if (this.index < filters.size()) {
            // 逐个 Filter 过滤调用 
            GatewayFilter filter = filters.get(this.index);
            DefaultGatewayFilterChain chain = new DefaultGatewayFilterChain(this,
                  this.index + 1);
            return filter.filter(exchange, chain);
         }
         else {
            return Mono.empty(); // complete
         }
      });
   }
}
复制代码

调用流程 3 : Filter 过滤

public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
    // 通常判断部分条件  , 如果该 Filter 不符合 , 则跳过该 Filter
   if (isAlreadyRouted(exchange)
         || (!"http".equals(scheme) && !"https".equals(scheme))) {
      return chain.filter(exchange);
   }
复制代码

5.2 发送的主体

核心的发送 Filter 是 NettyRoutingFilter, 下面只关注这个 Filter 的相关逻辑 :

C- NettyRoutingFilter
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
   // 请求 URL : http://httpbin.org:80/get
   URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);
    // 协议类型 : http
   String scheme = requestUrl.getScheme();
   // Step 1 : filter 链处理 ,如果不符合 http 协议 , 就通过下一个 Filter 处理
   if (isAlreadyRouted(exchange)
         || (!"http".equals(scheme) && !"https".equals(scheme))) {
      return chain.filter(exchange);
   }
   // Step 2 : 标识 Routed 已处理
   setAlreadyRouted(exchange);
   // Step 3 : 获取 Request 请求对象 , 这个是外部请求的对象
   ServerHttpRequest request = exchange.getRequest();
   // Step 4 : 获取 Method 类型 (get/post...)
   final HttpMethod method = HttpMethod.valueOf(request.getMethodValue());
   final String url = requestUrl.toString();
   // Step 5 : 对 Header 进行处理 , 需要转发过去
   HttpHeaders filtered = filterRequest(getHeadersFilters(), exchange);
   final DefaultHttpHeaders httpHeaders = new DefaultHttpHeaders();
   filtered.forEach(httpHeaders::set);
   // -> Transfer-Encoding
   String transferEncoding = request.getHeaders().getFirst(HttpHeaders.TRANSFER_ENCODING);
   boolean chunkedTransfer = "chunked".equalsIgnoreCase(transferEncoding);
   // -> preserveHostHeader
   boolean preserveHost = exchange.getAttributeOrDefault(PRESERVE_HOST_HEADER_ATTRIBUTE, false);
   // 通过 netty httpClient 发起转发请求 , PS !!! 这里是异步的
   Flux<HttpClientResponse> responseFlux = this.httpClient
         .chunkedTransfer(chunkedTransfer).request(method).uri(url)
         .send((req, nettyOutbound) -> {
            // Step 6 : 转发 Header 
            req.headers(httpHeaders);
            // => 是否需要记录之前的 host
            if (preserveHost) {
               String host = request.getHeaders().getFirst(HttpHeaders.HOST);
               req.header(HttpHeaders.HOST, host);
            }
            // Step 7 : 真正发起请求
            return nettyOutbound.options(NettyPipeline.SendOptions::flushOnEach)
                  .send(request.getBody()
                        .map(dataBuffer -> ((NettyDataBuffer) dataBuffer)
                              .getNativeBuffer()));
         }).responseConnection((res, connection) -> {
            // Step 8 : 请求完成 , 获取 response
            ServerHttpResponse response = exchange.getResponse();
            // Step 9 : 转发headers 和 status 等属性
            HttpHeaders headers = new HttpHeaders();
            res.responseHeaders().forEach(
                  entry -> headers.add(entry.getKey(), entry.getValue()));
            // => String CONTENT_TYPE = "Content-Type" 
            // => String ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR = "original_response_content_type";
            String contentTypeValue = headers.getFirst(HttpHeaders.CONTENT_TYPE);
            if (StringUtils.hasLength(contentTypeValue)) {
               exchange.getAttributes().put(ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR,
                     contentTypeValue);
            }
            // 转发状态 , 存在往 GatewayResponse 设置状态
            HttpStatus status = HttpStatus.resolve(res.status().code());
            if (status != null) {
               response.setStatusCode(status);
            }
            else if (response instanceof AbstractServerHttpResponse) {
               ((AbstractServerHttpResponse) response)
                     .setStatusCodeValue(res.status().code());
            }
            else {
               throw new IllegalStateException(
                     "Unable to set status code on response: "
                           + res.status().code() + ", "
                           + response.getClass());
            }
            // 确保 Header filter 在设置状态后运行, 校验 header 中 filter 正常
            HttpHeaders filteredResponseHeaders = HttpHeadersFilter.filter(
                  getHeadersFilters(), headers, exchange, Type.RESPONSE);
            //  String TRANSFER_ENCODING = "Transfer-Encoding"
            //  String CONTENT_LENGTH = "Content-Length"
            if (!filteredResponseHeaders
                  .containsKey(HttpHeaders.TRANSFER_ENCODING)
                  && filteredResponseHeaders
                        .containsKey(HttpHeaders.CONTENT_LENGTH)) {
               // content-length 存在需要去掉 Transfer-Encoding
               response.getHeaders().remove(HttpHeaders.TRANSFER_ENCODING);
            }
            exchange.getAttributes().put(CLIENT_RESPONSE_HEADER_NAMES,
                  filteredResponseHeaders.keySet());
            response.getHeaders().putAll(filteredResponseHeaders);
            // 延迟提交响应,直到所有路由过滤器都运行
            // 将客户端响应作为ServerWebExchange属性,稍后写入响应NettyWriteResponseFilter
            exchange.getAttributes().put(CLIENT_RESPONSE_ATTR, res);
            exchange.getAttributes().put(CLIENT_RESPONSE_CONN_ATTR, connection);
            return Mono.just(res);
         });
   if (properties.getResponseTimeout() != null) {
      // 超时异常处理
      responseFlux = responseFlux.timeout(properties.getResponseTimeout(),
            Mono.error(new TimeoutException("Response took longer than timeout: "
                  + properties.getResponseTimeout())))
            .onErrorMap(TimeoutException.class,
                  // GATEWAY_TIMEOUT(504, "Gateway Timeout")
                  th -> new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT,
                        th.getMessage(), th));
   }
   return responseFlux.then(chain.filter(exchange));
}
复制代码

5.3 返回 Response

C- NettyWriteResponseFilter
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
   return chain.filter(exchange).then(Mono.defer(() -> {
      // Step 1 : 获取 GatewayRequest
      Connection connection = exchange.getAttribute(CLIENT_RESPONSE_CONN_ATTR);
      // 连接不存在直接返回空 
      if (connection == null) {
         return Mono.empty();
      }
      // Step 2 : 获取 GatewayResponse
      ServerHttpResponse response = exchange.getResponse();
      NettyDataBufferFactory factory = (NettyDataBufferFactory) response
            .bufferFactory();
      // 此处主要包含一个 byteBufflux
      final Flux<NettyDataBuffer> body = connection.inbound().receive().retain()
            .map(factory::wrap);
      // 媒体类型  
      MediaType contentType = null;
      try {
         contentType = response.getHeaders().getContentType();
      }
      catch (Exception e) {
         log.trace("invalid media type", e);
      }
      return (isStreamingMediaType(contentType)
            ? response.writeAndFlushWith(body.map(Flux::just))
            : response.writeWith(body));
   }));
}
复制代码

总结

由于 netty 的底层了解得还不是很清楚 , 对于一些调用过程没办法输出数据看 , 这篇文章心里也不是很有底 , 后续深入后再来补充细节

本文就是愿天堂没有BUG给大家分享的内容,大家有收获的话可以分享下,想学习更多的话可以到微信公众号里找我,我等你哦。
相关文章
|
14天前
|
JavaScript Java Kotlin
深入 Spring Cloud Gateway 过滤器
Spring Cloud Gateway 是新一代微服务网关框架,支持多种过滤器实现。本文详解了 `GlobalFilter`、`GatewayFilter` 和 `AbstractGatewayFilterFactory` 三种过滤器的实现方式及其应用场景,帮助开发者高效利用这些工具进行网关开发。
|
1月前
|
负载均衡 Java 应用服务中间件
Gateway服务网关
Gateway服务网关
52 1
Gateway服务网关
|
22天前
|
负载均衡 Java API
项目中用的网关Gateway及SpringCloud
Spring Cloud Gateway 是一个功能强大、灵活易用的API网关解决方案。通过配置路由、过滤器、熔断器和限流等功能,可以有效地管理和保护微服务。本文详细介绍了Spring Cloud Gateway的基本概念、配置方法和实际应用,希望能帮助开发者更好地理解和使用这一工具。通过合理使用Spring Cloud Gateway,可以显著提升微服务架构的健壮性和可维护性。
28 0
|
2月前
|
XML Java 数据格式
如何使用 Spring Cloud 实现网关
如何使用 Spring Cloud 实现网关
49 3
|
3月前
|
Java 开发者 Spring
Spring Cloud Gateway 中,过滤器的分类有哪些?
Spring Cloud Gateway 中,过滤器的分类有哪些?
77 3
|
3月前
|
负载均衡 Java 网络架构
实现微服务网关:Zuul与Spring Cloud Gateway的比较分析
实现微服务网关:Zuul与Spring Cloud Gateway的比较分析
160 5
|
2月前
|
负载均衡 Java API
【Spring Cloud生态】Spring Cloud Gateway基本配置
【Spring Cloud生态】Spring Cloud Gateway基本配置
53 0
|
3月前
|
安全 Java 开发者
强大!Spring Cloud Gateway新特性及高级开发技巧
在微服务架构日益盛行的今天,网关作为微服务架构中的关键组件,承担着路由、安全、监控、限流等多重职责。Spring Cloud Gateway作为新一代的微服务网关,凭借其基于Spring Framework 5、Project Reactor和Spring Boot 2.0的强大技术栈,正逐步成为业界的主流选择。本文将深入探讨Spring Cloud Gateway的新特性及高级开发技巧,助力开发者更好地掌握这一强大的网关工具。
269 6
|
4月前
|
Java API 微服务
服务网关Gateway
该博客文章详细介绍了Spring Cloud Gateway的使用方法和概念。文章首先阐述了API网关在微服务架构中的重要性,解释了客户端直接与微服务通信可能带来的问题。接着,文章通过具体的示例代码,展示了如何在Spring Cloud Gateway中添加依赖、编写路由规则,并对路由规则中的基本概念如Route、Predicate和Filter进行了详细解释。最后,文章还提供了路由规则的测试方法。
服务网关Gateway
|
4月前
|
Java 应用服务中间件 nginx
【Azure Spring Apps】Spring App部署上云遇见 502 Bad Gateway nginx
【Azure Spring Apps】Spring App部署上云遇见 502 Bad Gateway nginx
下一篇
DataWorks