Spring cloud gateway 如何在路由时进行负载均衡

简介: Spring cloud gateway 如何在路由时进行负载均衡

本文为博主原创,转载请注明出处:

 

1.spring cloud gateway 配置路由

  在网关模块的配置文件中配置路由:

复制代码
spring:
  cloud:
    gateway:
      routes:
        - id: user
          uri: lb://user-server
          predicates:
            - Path=/api-web/**    #前端访问需加入例如 http:ip:port/api-web
          filters:
            - StripPrefix=1   #访问后端服务过滤掉m 必填否则找不到后端服务也可以在服务加上统一路径
复制代码

  其中lb表示采用了负载均衡,user-server表示服务名

  当后端有多个服务节点时,网关会以负载均衡的方式将请求发送到后端的各个服务节点上,当某个服务节点关闭以后,后续的请求不会发送到该节点上。这个过程会存在一定的时间延迟,比如30秒左右。

 

2.查看 GatewayLoadBalancerClientAutoConfiguration 的配置类

  这个配置类会加载一个过滤器,使用这个过滤器可以实现负载均衡

@Configuration(
    proxyBeanMethods = false
)
@ConditionalOnClass({LoadBalancerClient.class, RibbonAutoConfiguration.class, DispatcherHandler.class})
@AutoConfigureAfter({RibbonAutoConfiguration.class})
@EnableConfigurationProperties({LoadBalancerProperties.class})
public class GatewayLoadBalancerClientAutoConfiguration {
    public GatewayLoadBalancerClientAutoConfiguration() {
    }
    @Bean
    @ConditionalOnBean({LoadBalancerClient.class})
    @ConditionalOnMissingBean({LoadBalancerClientFilter.class, ReactiveLoadBalancerClientFilter.class})
    public LoadBalancerClientFilter loadBalancerClientFilter(LoadBalancerClient client, LoadBalancerProperties properties) {
        // 会加载一个负载均衡的 过滤器 :LoadBalancerClientFilter
        return new LoadBalancerClientFilter(client, properties);
    }
}

 

3.查看 LoadBalancerClientFilter 过滤器的实现

  查看该过滤器的实现

public class LoadBalancerClientFilter implements GlobalFilter, Ordered {  
public static final int LOAD_BALANCER_CLIENT_FILTER_ORDER = 10100;
    private static final Log log = LogFactory.getLog(LoadBalancerClientFilter.class);
    protected final LoadBalancerClient loadBalancer;
    private LoadBalancerProperties properties;
    public LoadBalancerClientFilter(LoadBalancerClient loadBalancer, LoadBalancerProperties properties) {
        this.loadBalancer = loadBalancer;
        this.properties = properties;
    }
    public int getOrder() {
        return 10100;
    }
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        URI url = (URI)exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR);
        String schemePrefix = (String)exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_SCHEME_PREFIX_ATTR);
        if (url != null && ("lb".equals(url.getScheme()) || "lb".equals(schemePrefix))) {
            ServerWebExchangeUtils.addOriginalRequestUrl(exchange, url);
            if (log.isTraceEnabled()) {
                log.trace("LoadBalancerClientFilter url before: " + url);
            }
            ServiceInstance instance = this.choose(exchange);
            if (instance == null) {
                throw NotFoundException.create(this.properties.isUse404(), "Unable to find instance for " + url.getHost());
            } else {
                URI uri = exchange.getRequest().getURI();
                String overrideScheme = instance.isSecure() ? "https" : "http";
                if (schemePrefix != null) {
                    overrideScheme = url.getScheme();
                }
                URI requestUrl = this.loadBalancer.reconstructURI(new DelegatingServiceInstance(instance, overrideScheme), uri);
                if (log.isTraceEnabled()) {
                    log.trace("LoadBalancerClientFilter url chosen: " + requestUrl);
                }
                exchange.getAttributes().put(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR, requestUrl);
                return chain.filter(exchange);
            }
        } else {
            return chain.filter(exchange);
        }
    }
    protected ServiceInstance choose(ServerWebExchange exchange) {
    // 该loadBalancer 为ribbon 配置的负载均衡器,会根据指定的规则进行负载均衡,默认是轮询            
    return this.loadBalancer.choose(((URI)exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR)).getHost());  
  } 
}


  NettyRoutingFilter 过滤器:

public class NettyRoutingFilter implements GlobalFilter, Ordered {
    private static final Log log = LogFactory.getLog(NettyRoutingFilter.class);
    private final HttpClient httpClient;
    private final ObjectProvider<List<HttpHeadersFilter>> headersFiltersProvider;
    private final HttpClientProperties properties;
    private volatile List<HttpHeadersFilter> headersFilters;
    public NettyRoutingFilter(HttpClient httpClient, ObjectProvider<List<HttpHeadersFilter>> headersFiltersProvider, HttpClientProperties properties) {
        this.httpClient = httpClient;
        this.headersFiltersProvider = headersFiltersProvider;
        this.properties = properties;
    }
    public List<HttpHeadersFilter> getHeadersFilters() {
        if (this.headersFilters == null) {
            this.headersFilters = (List)this.headersFiltersProvider.getIfAvailable();
        }
        return this.headersFilters;
    }
    public int getOrder() {
        return 2147483647;
    }
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        URI requestUrl = (URI)exchange.getRequiredAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR);
        String scheme = requestUrl.getScheme();
        if (!ServerWebExchangeUtils.isAlreadyRouted(exchange) && ("http".equals(scheme) || "https".equals(scheme))) {
            ServerWebExchangeUtils.setAlreadyRouted(exchange);
            ServerHttpRequest request = exchange.getRequest();
            HttpMethod method = HttpMethod.valueOf(request.getMethodValue());
            String url = requestUrl.toASCIIString();
            HttpHeaders filtered = HttpHeadersFilter.filterRequest(this.getHeadersFilters(), exchange);
            DefaultHttpHeaders httpHeaders = new DefaultHttpHeaders();
            filtered.forEach(httpHeaders::set);
            boolean preserveHost = (Boolean)exchange.getAttributeOrDefault(ServerWebExchangeUtils.PRESERVE_HOST_HEADER_ATTRIBUTE, false);
            Route route = (Route)exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR);
            Flux<HttpClientResponse> responseFlux = ((RequestSender)this.getHttpClient(route, exchange).headers((headers) -> {
                headers.add(httpHeaders);
                headers.remove("Host");
                if (preserveHost) {
                    String host = request.getHeaders().getFirst("Host");
                    headers.add("Host", host);
                }
            }).request(method).uri(url)).send((req, nettyOutbound) -> {
                if (log.isTraceEnabled()) {
                    nettyOutbound.withConnection((connection) -> {
                        log.trace("outbound route: " + connection.channel().id().asShortText() + ", inbound: " + exchange.getLogPrefix());
                    });
                }
                return nettyOutbound.send(request.getBody().map(this::getByteBuf));
            }).responseConnection((res, connection) -> {
                exchange.getAttributes().put(ServerWebExchangeUtils.CLIENT_RESPONSE_ATTR, res);
                exchange.getAttributes().put(ServerWebExchangeUtils.CLIENT_RESPONSE_CONN_ATTR, connection);
                ServerHttpResponse response = exchange.getResponse();
                HttpHeaders headers = new HttpHeaders();
                res.responseHeaders().forEach((entry) -> {
                    headers.add((String)entry.getKey(), (String)entry.getValue());
                });
                String contentTypeValue = headers.getFirst("Content-Type");
                if (StringUtils.hasLength(contentTypeValue)) {
                    exchange.getAttributes().put("original_response_content_type", contentTypeValue);
                }
                this.setResponseStatus(res, response);
                HttpHeaders filteredResponseHeaders = HttpHeadersFilter.filter(this.getHeadersFilters(), headers, exchange, Type.RESPONSE);
                if (!filteredResponseHeaders.containsKey("Transfer-Encoding") && filteredResponseHeaders.containsKey("Content-Length")) {
                    response.getHeaders().remove("Transfer-Encoding");
                }
                exchange.getAttributes().put(ServerWebExchangeUtils.CLIENT_RESPONSE_HEADER_NAMES, filteredResponseHeaders.keySet());
                response.getHeaders().putAll(filteredResponseHeaders);
                return Mono.just(res);
            });
            Duration responseTimeout = this.getResponseTimeout(route);
            if (responseTimeout != null) {
                responseFlux = responseFlux.timeout(responseTimeout, Mono.error(new TimeoutException("Response took longer than timeout: " + responseTimeout))).onErrorMap(TimeoutException.class, (th) -> {
                    return new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT, th.getMessage(), th);
                });
            }
            return responseFlux.then(chain.filter(exchange));
        } else {
            return chain.filter(exchange);
        }
    }
    protected ByteBuf getByteBuf(DataBuffer dataBuffer) {
        if (dataBuffer instanceof NettyDataBuffer) {
            NettyDataBuffer buffer = (NettyDataBuffer)dataBuffer;
            return buffer.getNativeBuffer();
        } else if (dataBuffer instanceof DefaultDataBuffer) {
            DefaultDataBuffer buffer = (DefaultDataBuffer)dataBuffer;
            return Unpooled.wrappedBuffer(buffer.getNativeBuffer());
        } else {
            throw new IllegalArgumentException("Unable to handle DataBuffer of type " + dataBuffer.getClass());
        }
    }
    private void setResponseStatus(HttpClientResponse clientResponse, ServerHttpResponse response) {
        HttpStatus status = HttpStatus.resolve(clientResponse.status().code());
        if (status != null) {
            response.setStatusCode(status);
        } else {
            while(true) {
                if (!(response instanceof ServerHttpResponseDecorator)) {
                    if (!(response instanceof AbstractServerHttpResponse)) {
                        throw new IllegalStateException("Unable to set status code " + clientResponse.status().code() + " on response of type " + response.getClass().getName());
                    }
                    ((AbstractServerHttpResponse)response).setStatusCodeValue(clientResponse.status().code());
                    break;
                }
                response = ((ServerHttpResponseDecorator)response).getDelegate();
            }
        }
    }
    protected HttpClient getHttpClient(Route route, ServerWebExchange exchange) {
        Object connectTimeoutAttr = route.getMetadata().get("connect-timeout");
        if (connectTimeoutAttr != null) {
            Integer connectTimeout = getInteger(connectTimeoutAttr);
            return this.httpClient.tcpConfiguration((tcpClient) -> {
                return tcpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout);
            });
        } else {
            return this.httpClient;
        }
    }
    static Integer getInteger(Object connectTimeoutAttr) {
        Integer connectTimeout;
        if (connectTimeoutAttr instanceof Integer) {
            connectTimeout = (Integer)connectTimeoutAttr;
        } else {
            connectTimeout = Integer.parseInt(connectTimeoutAttr.toString());
        }
        return connectTimeout;
    }
    private Duration getResponseTimeout(Route route) {
        Object responseTimeoutAttr = route.getMetadata().get("response-timeout");
        Long responseTimeout = null;
        if (responseTimeoutAttr != null) {
            if (responseTimeoutAttr instanceof Number) {
                responseTimeout = ((Number)responseTimeoutAttr).longValue();
            } else {
                responseTimeout = Long.valueOf(responseTimeoutAttr.toString());
            }
        }
        return responseTimeout != null ? Duration.ofMillis(responseTimeout) : this.properties.getResponseTimeout();
    }
}


  在NettyRoutingFilter中根据GATEWAY_REQUEST_URL_ATTR属性读取requestUrl,然后进行相应请求。

  LoadBalancerClientFilter会作用在url以lb开头的路由,然后利用loadBalancer来获取服务实例,构造目标requestUrl,设置到GATEWAY_REQUEST_URL_ATTR属性中,供NettyRoutingFilter使用。

 


标签: spring cloud

目录
相关文章
|
6天前
|
安全 Java 开发者
强大!Spring Cloud Gateway新特性及高级开发技巧
在微服务架构日益盛行的今天,网关作为微服务架构中的关键组件,承担着路由、安全、监控、限流等多重职责。Spring Cloud Gateway作为新一代的微服务网关,凭借其基于Spring Framework 5、Project Reactor和Spring Boot 2.0的强大技术栈,正逐步成为业界的主流选择。本文将深入探讨Spring Cloud Gateway的新特性及高级开发技巧,助力开发者更好地掌握这一强大的网关工具。
53 6
|
30天前
|
JavaScript 安全 Java
【绝密攻略】揭秘Spring Boot与Ant Design Pro Vue的终极结合:打造梦幻般的动态路由与菜单管理,颠覆你的前后端分离世界!
【8月更文挑战第9天】随着前后端分离趋势的发展,构建高效且易维护的框架至关重要。本文介绍如何利用Spring Boot与Ant Design Pro Vue打造带有动态路由和菜单的应用。首先需安装Node.js、NPM及Java开发工具;接着通过Spring Initializr初始化含Web和Security依赖的项目,并配置Spring Security。后端API提供菜单数据,而前端则基于这些数据动态生成路由和菜单。通过具体步骤演示整个流程,包括创建Controller、配置动态路由、设置菜单等。此外还分享了实践心得,强调版本兼容性、安全性等方面的重要性。
38 1
|
14天前
|
Java 应用服务中间件 nginx
【Azure Spring Apps】Spring App部署上云遇见 502 Bad Gateway nginx
【Azure Spring Apps】Spring App部署上云遇见 502 Bad Gateway nginx
|
2月前
|
Java Spring
spring cloud gateway在使用 zookeeper 注册中心时,配置https 进行服务转发
spring cloud gateway在使用 zookeeper 注册中心时,配置https 进行服务转发
54 3
|
2月前
|
Java 微服务 Spring
SpringCloud gateway自定义请求的 httpClient
SpringCloud gateway自定义请求的 httpClient
85 3
|
4月前
|
负载均衡 算法 应用服务中间件
面试题:Nginx有哪些负载均衡算法?Nginx位于七层网络结构中的哪一层?
字节跳动面试题:Nginx有哪些负载均衡算法?Nginx位于七层网络结构中的哪一层?
104 0
|
3月前
|
缓存 负载均衡 算法
解读 Nginx:构建高效反向代理和负载均衡的秘密
解读 Nginx:构建高效反向代理和负载均衡的秘密
89 2
|
2月前
|
负载均衡 算法 应用服务中间件
nginx自定义负载均衡及根据cpu运行自定义负载均衡
nginx自定义负载均衡及根据cpu运行自定义负载均衡
29 1
|
2月前
|
运维 负载均衡 算法
SLB与NGINX的异同是什么
SLB与NGINX的异同是什么
90 2
|
4月前
|
负载均衡 应用服务中间件 nginx
解决nginx配置负载均衡时invalid host in upstream报错
在Windows环境下,配置Nginx 1.11.5进行负载均衡时遇到问题,服务无法启动。错误日志显示“invalid host in upstream”。检查发现上游服务器列表中,192.168.29.128的主机地址无效。负载均衡配置中,两个服务器地址前误加了&quot;http://&quot;。修正方法是删除上游服务器列表和proxy_pass中的&quot;http://&quot;。问题解决后,Nginx服务应能正常启动。
261 4
解决nginx配置负载均衡时invalid host in upstream报错
下一篇
DDNS