一 . 前言
文档目的
- 梳理 Gateway 生产中转发请求的细节
- 梳理 转发的定制点
知识补充
请求转发是 Gateway 最核心的功能之一 , 它涉及到三个主要的概念 :
Route(路由): 路由是网关的基本单元,由ID、URI、一组Predicate、一组Filter组成,如果 Predicate 匹配 True ,则进行转发
Predicate(谓语、断言): 路由转发的判断条件,这是一个 Java 8函数断言, 输入类型是 Spring Framework ServerWebExchange , 目前SpringCloud Gateway支持多种方式,常见如:Path、Query、Method、Header等,写法必须遵循 key=vlue的形式
Filter(过滤器): 过滤器是路由转发请求时所经过的过滤逻辑,使用特定工厂构建的 GatewayFilter 实例 , 可用于修改请求、响应内容
二 . 简单使用
2.1 predicates 汇总
// - After=2017-01-20T17:42:47.789-07:00[America/Denver] // - Before=2017-01-20T17:42:47.789-07:00[America/Denver] // - Between=2017-01-20T17:42:47.789-07:00[America/Denver], 2017-01-21T17:42:47.789-07:00[America/Denver] // - Cookie=chocolate, ch.p 复制代码
2.2 Mono 和 Flux
Mono 和 Flux 是贯穿了整个流程的核心对象 ,根据 reactive-streams 规范,发布服务器提供了数量可能无限的有序元素,并根据从其订阅服务器接收到的需求发布这些元素。Reactor-core 有一组词 Publisher 接口的实现。我们将要创建序列的两个重要实现是 Mono 和 Flux。
- Flux 表示的是包含 0 到 N 个元素的异步序列
- Mono 表示的是包含 0 或 1 个元素的异步序列
> SpringGateway 是使用 webflux 作为底层调用框架的 , 其中涉及到 mono 和 Flux 对象
> 该序列中可以包含 3 种通知 :
- 正常的包含元素的消息
- 序列结束的消息
- 序列出错的消息
Flux
- Flux是一个标准Publisher,表示0到N个发射项的异步序列,选择性地以完成或错误信号终止。与Reactive Streams规范中一样,这三种类型的信号转换为对下游订阅者的onNext、onComplete或onError方法的调用。
Mono
- Mono 是 Publisher 的另一个实现。它最多发出一个条目,然后(可选)以 onComplete 信号或 onError 信号终止 , Mono 在本质上也是异步的
- 它只提供了可用于Flux的操作符的子集,并且一些操作符(特别是那些将Mono与另一个发布者组合的操作符)切换到Flux。
- 例如,Mono#concatWith(Publisher)返回一个Flux ,而Mono#then(Mono)则返回另一个Mono。
常见的方法如下 :
- create : 以编程方式创建具有多次发射能力的Flux,
- empty : 发出0元素或返回空 Flux < t >
- just : 创建一个基础
- error : 创建一个Flux,它在订阅之后立即以指定的错误终止
PS : 这一块就不深入看了 , 先看完 Gateway 的主流程
三 . 拦截深入
3.1 原理图
首先来看一下 SpringGateway 的原理图
四 . 调用的入口
4.1 调用流程
- Step 1 : HttpWebHandlerAdapter # handle : 构建 ServerWebExchange , 发起 Handler 处理
- Step 2 : DispatcherHandler # handle : 发起请求处理
- Step 3 : RoutePredicateHandlerMapping # getHandlerInternal : route 判断处理
4.2. getHandlerInternal 逻辑
protected Mono<?> getHandlerInternal(ServerWebExchange exchange) { // don't handle requests on management port if set and different than server port if (this.managementPortType == DIFFERENT && this.managementPort != null && exchange.getRequest().getURI().getPort() == this.managementPort) { return Mono.empty(); } exchange.getAttributes().put(GATEWAY_HANDLER_MAPPER_ATTR, getSimpleName()); return lookupRoute(exchange) // .log("route-predicate-handler-mapping", Level.FINER) //name this .flatMap((Function<Route, Mono<?>>) r -> { exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR); if (logger.isDebugEnabled()) { logger.debug("Mapping [" + getExchangeDesc(exchange) + "] to " + r); } exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r); return Mono.just(webHandler); }).switchIfEmpty(Mono.empty().then(Mono.fromRunnable(() -> { exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR); if (logger.isTraceEnabled()) { logger.trace("No RouteDefinition found for [" + getExchangeDesc(exchange) + "]"); } }))); } 复制代码
3.2. lookupRoute
protected Mono<Route> lookupRoute(ServerWebExchange exchange) { return this.routeLocator.getRoutes() // individually filter routes so that filterWhen error delaying is not a // problem .concatMap(route -> Mono.just(route).filterWhen(r -> { // add the current route we are testing exchange.getAttributes().put(GATEWAY_PREDICATE_ROUTE_ATTR, r.getId()); return r.getPredicate().apply(exchange); }) // instead of immediately stopping main flux due to error, log and // swallow it .doOnError(e -> logger.error("Error applying predicate for route: " + route.getId(), e)) .onErrorResume(e -> Mono.empty())) // .defaultIfEmpty() put a static Route not found // or .switchIfEmpty() // .switchIfEmpty(Mono.<Route>empty().log("noroute")) .next() // TODO: error handling .map(route -> { validateRoute(route, exchange); return route; }); } 复制代码
会遍历所有的 route
五. 发送的流程
5.1 FilteringWebHandler 体系
此处的 webHandler 为 FilteringWebHandler 对象 , 来看一下这个对象的作用
这里涉及到以下的 Filter :
- C- ForwardPathFilter :
- C- ForwardRoutingFilter : 用来做本地forward的
- C- GatewayMetricsFilter : 与 Prometheus 整合,从而创建一个 Grafana dashboard
- C- LoadBalancerClientFilter : 用来整合Ribbon的 , 先获取微服务的名称,然后再通过Ribbon获取实际的调用地址
- C- NettyRoutingFilter : http 或 https ,使用 Netty 的 HttpClient 向下游的服务发送代理请求
- C- NettyWriteResponseFilter : 用于将代理响应写回网关的客户端侧,所以该过滤器会在所有其他过滤器执行完成后才执行
- C- OrderedGatewayFilter :
- C- RouteToRequestUrlFilter : 将从request里获取的 原始url转换成Gateway进行请求转发时所使用的url
- C- WebClientHttpRoutingFilter :
- C- WebClientWriteResponseFilter :
- C- WebsocketRoutingFilter : ws 或者 wss,那么该Filter会使用 Spring Web Socket 将 Websocket 请求转发到下游
- C- WeightCalculatorWebFilter :
可以参考 -> Spring Cloud Gateway 内置的全局过滤器
调用逻辑1 : FilteringWebHandler 管理
该对象中存在一个内部类 DefaultGatewayFilterChain , 该类为 Filter 过滤链
rivate static class DefaultGatewayFilterChain implements GatewayFilterChain { // 当前 Filter 链索引 private final int index; // Filter 集合 private final List<GatewayFilter> filters; DefaultGatewayFilterChain(List<GatewayFilter> filters) { this.filters = filters; this.index = 0; } private DefaultGatewayFilterChain(DefaultGatewayFilterChain parent, int index) { this.filters = parent.getFilters(); this.index = index; } public List<GatewayFilter> getFilters() { return filters; } @Override public Mono<Void> filter(ServerWebExchange exchange) { return Mono.defer(() -> { if (this.index < filters.size()) { // 逐个 Filter 过滤调用 GatewayFilter filter = filters.get(this.index); DefaultGatewayFilterChain chain = new DefaultGatewayFilterChain(this, this.index + 1); return filter.filter(exchange, chain); } else { return Mono.empty(); // complete } }); } } 复制代码
调用流程 3 : Filter 过滤
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { // 通常判断部分条件 , 如果该 Filter 不符合 , 则跳过该 Filter if (isAlreadyRouted(exchange) || (!"http".equals(scheme) && !"https".equals(scheme))) { return chain.filter(exchange); } 复制代码
5.2 发送的主体
核心的发送 Filter 是 NettyRoutingFilter, 下面只关注这个 Filter 的相关逻辑 :
C- NettyRoutingFilter public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { // 请求 URL : http://httpbin.org:80/get URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR); // 协议类型 : http String scheme = requestUrl.getScheme(); // Step 1 : filter 链处理 ,如果不符合 http 协议 , 就通过下一个 Filter 处理 if (isAlreadyRouted(exchange) || (!"http".equals(scheme) && !"https".equals(scheme))) { return chain.filter(exchange); } // Step 2 : 标识 Routed 已处理 setAlreadyRouted(exchange); // Step 3 : 获取 Request 请求对象 , 这个是外部请求的对象 ServerHttpRequest request = exchange.getRequest(); // Step 4 : 获取 Method 类型 (get/post...) final HttpMethod method = HttpMethod.valueOf(request.getMethodValue()); final String url = requestUrl.toString(); // Step 5 : 对 Header 进行处理 , 需要转发过去 HttpHeaders filtered = filterRequest(getHeadersFilters(), exchange); final DefaultHttpHeaders httpHeaders = new DefaultHttpHeaders(); filtered.forEach(httpHeaders::set); // -> Transfer-Encoding String transferEncoding = request.getHeaders().getFirst(HttpHeaders.TRANSFER_ENCODING); boolean chunkedTransfer = "chunked".equalsIgnoreCase(transferEncoding); // -> preserveHostHeader boolean preserveHost = exchange.getAttributeOrDefault(PRESERVE_HOST_HEADER_ATTRIBUTE, false); // 通过 netty httpClient 发起转发请求 , PS !!! 这里是异步的 Flux<HttpClientResponse> responseFlux = this.httpClient .chunkedTransfer(chunkedTransfer).request(method).uri(url) .send((req, nettyOutbound) -> { // Step 6 : 转发 Header req.headers(httpHeaders); // => 是否需要记录之前的 host if (preserveHost) { String host = request.getHeaders().getFirst(HttpHeaders.HOST); req.header(HttpHeaders.HOST, host); } // Step 7 : 真正发起请求 return nettyOutbound.options(NettyPipeline.SendOptions::flushOnEach) .send(request.getBody() .map(dataBuffer -> ((NettyDataBuffer) dataBuffer) .getNativeBuffer())); }).responseConnection((res, connection) -> { // Step 8 : 请求完成 , 获取 response ServerHttpResponse response = exchange.getResponse(); // Step 9 : 转发headers 和 status 等属性 HttpHeaders headers = new HttpHeaders(); res.responseHeaders().forEach( entry -> headers.add(entry.getKey(), entry.getValue())); // => String CONTENT_TYPE = "Content-Type" // => String ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR = "original_response_content_type"; String contentTypeValue = headers.getFirst(HttpHeaders.CONTENT_TYPE); if (StringUtils.hasLength(contentTypeValue)) { exchange.getAttributes().put(ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR, contentTypeValue); } // 转发状态 , 存在往 GatewayResponse 设置状态 HttpStatus status = HttpStatus.resolve(res.status().code()); if (status != null) { response.setStatusCode(status); } else if (response instanceof AbstractServerHttpResponse) { ((AbstractServerHttpResponse) response) .setStatusCodeValue(res.status().code()); } else { throw new IllegalStateException( "Unable to set status code on response: " + res.status().code() + ", " + response.getClass()); } // 确保 Header filter 在设置状态后运行, 校验 header 中 filter 正常 HttpHeaders filteredResponseHeaders = HttpHeadersFilter.filter( getHeadersFilters(), headers, exchange, Type.RESPONSE); // String TRANSFER_ENCODING = "Transfer-Encoding" // String CONTENT_LENGTH = "Content-Length" if (!filteredResponseHeaders .containsKey(HttpHeaders.TRANSFER_ENCODING) && filteredResponseHeaders .containsKey(HttpHeaders.CONTENT_LENGTH)) { // content-length 存在需要去掉 Transfer-Encoding response.getHeaders().remove(HttpHeaders.TRANSFER_ENCODING); } exchange.getAttributes().put(CLIENT_RESPONSE_HEADER_NAMES, filteredResponseHeaders.keySet()); response.getHeaders().putAll(filteredResponseHeaders); // 延迟提交响应,直到所有路由过滤器都运行 // 将客户端响应作为ServerWebExchange属性,稍后写入响应NettyWriteResponseFilter exchange.getAttributes().put(CLIENT_RESPONSE_ATTR, res); exchange.getAttributes().put(CLIENT_RESPONSE_CONN_ATTR, connection); return Mono.just(res); }); if (properties.getResponseTimeout() != null) { // 超时异常处理 responseFlux = responseFlux.timeout(properties.getResponseTimeout(), Mono.error(new TimeoutException("Response took longer than timeout: " + properties.getResponseTimeout()))) .onErrorMap(TimeoutException.class, // GATEWAY_TIMEOUT(504, "Gateway Timeout") th -> new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT, th.getMessage(), th)); } return responseFlux.then(chain.filter(exchange)); } 复制代码
5.3 返回 Response
C- NettyWriteResponseFilter public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { return chain.filter(exchange).then(Mono.defer(() -> { // Step 1 : 获取 GatewayRequest Connection connection = exchange.getAttribute(CLIENT_RESPONSE_CONN_ATTR); // 连接不存在直接返回空 if (connection == null) { return Mono.empty(); } // Step 2 : 获取 GatewayResponse ServerHttpResponse response = exchange.getResponse(); NettyDataBufferFactory factory = (NettyDataBufferFactory) response .bufferFactory(); // 此处主要包含一个 byteBufflux final Flux<NettyDataBuffer> body = connection.inbound().receive().retain() .map(factory::wrap); // 媒体类型 MediaType contentType = null; try { contentType = response.getHeaders().getContentType(); } catch (Exception e) { log.trace("invalid media type", e); } return (isStreamingMediaType(contentType) ? response.writeAndFlushWith(body.map(Flux::just)) : response.writeWith(body)); })); } 复制代码
总结
由于 netty 的底层了解得还不是很清楚 , 对于一些调用过程没办法输出数据看 , 这篇文章心里也不是很有底 , 后续深入后再来补充细节
本文就是愿天堂没有BUG给大家分享的内容,大家有收获的话可以分享下,想学习更多的话可以到微信公众号里找我,我等你哦。