引入 Gateway 网关,这些坑一定要学会避开!!!

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
简介: 引入 Gateway 网关,这些坑一定要学会避开!!!

Spring cloud gateway是替代zuul的网关产品,基于Spring 5、Spring boot 2.0以上、Reactor, 提供任意的路由匹配和断言、过滤功能。上一篇文章谈了一下Gateway网关使用不规范,同事加班泪两行~,这篇文章将会侧重于其他的几个需要注意的地方。


网关实现


这里介绍编码方式实现


HystrixObservableCommand.Setter getSetter() {
        HystrixCommandGroupKey groupKey = HystrixCommandGroupKey.Factory.asKey("group-accept");
        HystrixObservableCommand.Setter setter = HystrixObservableCommand.Setter.withGroupKey(groupKey);
        HystrixCommandKey commandKey = HystrixCommandKey.Factory.asKey("command-accept");
        setter.andCommandKey(commandKey);
        HystrixCommandProperties.Setter proertiesSetter = HystrixCommandProperties.Setter();
        proertiesSetter
                /* *
                 * 线程策略配置
                 */
                //设置线程模式 缺省 1000ms
                .withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.THREAD)
                //执行是否启用超时时间 缺省 true
                .withExecutionTimeoutEnabled(true)
                //使用线程隔离时,是否对命令执行超时的线程调用中断 缺省false
                .withExecutionIsolationThreadInterruptOnFutureCancel(false)
                //执行超时的时候是否要它中断 缺省 true
                .withExecutionIsolationThreadInterruptOnTimeout(true)
                //执行的超时时间 缺省 1000ms
                .withExecutionTimeoutInMilliseconds(2000)
                /* *
                 * 熔断策略
                 */
                //是否开启溶断 缺省 true
                .withCircuitBreakerEnabled(true)
                // 是否允许熔断器忽略错误,默认false, 不开启 ;
                // true,断路器强制进入“关闭”状态,它会接收所有请求。
                // 如果forceOpen属性为true,该属性不生效
                .withCircuitBreakerForceClosed(false)
                // 是否强制开启熔断器阻断所有请求, 默认为false
                // 为true时,所有请求都将被拒绝,直接到fallback.
                // 如果该属性设置为true,断路器将强制进入“打开”状态,
                // 它会拒绝所有请求。该属性优于forceClosed属性
                .withCircuitBreakerForceOpen(false)
                // 用来设置当断路器打开之后的休眠时间窗。
                // 休眠时间窗结束之后,会将断路器设置为“半开”状态,尝试熔断的请求命令,
                // 如果依然请求错误就将断路器继续设置为“打开”状态,如果成功,就设置为“关闭”状态
                // 熔断器默认工作时间,默认:5000豪秒.
                // 熔断器中断请求10秒后会进入半打开状态,放部分流量过去重试.
                .withCircuitBreakerSleepWindowInMilliseconds(5000)
                // 熔断器在整个统计时间内是否开启的阀值.
                // 在metricsRollingStatisticalWindowInMilliseconds(默认10s)内默认至少请求10次,
                // 熔断器才发挥起作用,9次熔断器都不起作用。
                .withCircuitBreakerRequestVolumeThreshold(100)
                // 该属性用来设置断路器打开的错误百分比条件。默认值为50.
                // 表示在滚动时间窗中,在请求值超过requestVolumeThreshold阈值的前提下,
                // 如果错误请求数百分比超过50,就把断路器设置为“打开”状态,否则就设置为“关闭”状态
                .withCircuitBreakerErrorThresholdPercentage(50);
        setter.andCommandPropertiesDefaults(proertiesSetter);
        return setter;
    }


@Bean
public RouteLocator customRouteLocator(RouteLocatorBuilder builder) {
 RouteLocatorBuilder.Builder routes = builder.routes();
 RouteLocatorBuilder.Builder serviceProvider = routes
  .route("accept",
   r -> r.method(HttpMethod.GET)
    .and()
    .path("/gateway-accept/**")
    .and()
    .header(HttpHeaders.CONTENT_TYPE, "application/json;charset=UTF-8")
    .filters(f -> {
     f.rewritePath("/gateway-accept/(?<path>.*)", "/${path}");
     f.requestRateLimiter(
      config -> config.setKeyResolver(new GenericAccessResolver())
       .setRateLimiter(redisRateLimiter()));
     f.hystrix(config -> config.setName("accept")
      .setFallbackUri("forward:/gateway-fallback")
      .setSetter(getSetter()));
     return f;
    })
    .uri("http://localhost:8888")
     );
 return serviceProvider.build();
}


在上面的代码中,主要做了3件事情:限流、熔断策略及降级方法配置


限流


  • 配置redis


spring:
  redis:
    database: 0
    host: 127.0.0.1
    port: 6379
    password:
    timeout: 1500
    lettuce:
      pool:
        max-active: 300 #连接池最大连接数(使用负值表示没有限制)
        max-idle: 10    #连接池中的最大空闲连接
        min-idle: 5     #连接池中的最小空闲连接
        max-wait: -1    #连接池最大阻塞等待时间(使用负值表示没有限制)


  • 自定义解析


/**
 * @description: 按照访问地址进行限流(也可以安装其他条件进行限流),具体可以看exchange.getRequest()的方法和属性
 **/
public class GenericAccessResolver implements KeyResolver {
    @Override
    public Mono<String> resolve(ServerWebExchange exchange) {
        return Mono.just(exchange.getRequest().getPath().value());
    }
}


  • 自定义限流配置


RedisRateLimiter redisRateLimiter() {
 //1000,1500对应replenishRate、burstCapacity
 return new RedisRateLimiter(1000, 1500);
}


  • 网关使用自定义限流器(网关使用代码实现)


@Bean
public RouteLocator customRouteLocator(RouteLocatorBuilder builder) {
    RouteLocatorBuilder.Builder routes = builder.routes();
    RouteLocatorBuilder.Builder serviceProvider = routes
        .route("accept",
               r -> r.method(HttpMethod.GET)
               .and()
               .path("/gateway-accept/**")
               .and()
               .header(HttpHeaders.CONTENT_TYPE, "application/json;charset=UTF-8")
               //.and()
               //.readBody(String.class, readBody -> true)
               .filters(f -> {
                   f.rewritePath("/gateway-accept/(?<path>.*)", "/${path}");
                   f.requestRateLimiter(config -> config.setKeyResolver(new GenericAccessResolver()).setRateLimiter(redisRateLimiter()));                                   
                   return f;
               })
               .uri("http://localhost:8888")
              );
    return serviceProvider.build();
}


  • 测试


  • jmeter配置


image.png


结果


image.png


  • 其他


如果有多个路由,使用不同的限流策略,可以自定义KeyResolver和RedisRateLimiter, 在路由定义时加入


//基于ip限流
public class OtherAccessResolver implements KeyResolver {
    @Override
    public Mono<String> resolve(ServerWebExchange exchange) {
        return Mono.just(exchange.getRequest().getRemoteAddress().getHostName());
    }
}


RedisRateLimiter otherRedisRateLimiter() {
 //1000,1500对应replenishRate、burstCapacity
 return new RedisRateLimiter(100, 500);
}


@Bean
public RouteLocator customRouteLocator(RouteLocatorBuilder builder) {
 RouteLocatorBuilder.Builder routes = builder.routes();
 RouteLocatorBuilder.Builder serviceProvider = routes
  .route("accept",
   r -> r.method(HttpMethod.GET)
    .and()
    .path("/gateway-accept/**")
    .and()
    .header(HttpHeaders.CONTENT_TYPE, "application/json;charset=UTF-8")
    .filters(f -> {
     f.rewritePath("/gateway-accept/(?<path>.*)", "/${path}");
     f.requestRateLimiter(
      config -> config.setKeyResolver(new GenericAccessResolver())
       .setRateLimiter(redisRateLimiter()));
     f.hystrix(config -> config.setName("accept")
      .setFallbackUri("forward:/gateway-fallback")
      .setSetter(getSetter()));
     return f;
    })
    .uri("http://localhost:8888"))
        .route("sign",
   r -> r.method(HttpMethod.POST)
    .and()
    .path("/gateway-sign/**")
    .and()
    .header(HttpHeaders.CONTENT_TYPE, "application/json;charset=UTF-8")
    .filters(f -> {
     f.rewritePath("/gateway-sign/(?<path>.*)", "/${path}");
     f.requestRateLimiter(
      config -> config.setKeyResolver(new OtherAccessResolver())
       .setRateLimiter(otherRedisRateLimiter()));
     f.hystrix(config -> config.setName("sign")
      .setFallbackUri("forward:/gateway-fallback")
      .setSetter(getSetter()));
     return f;
    })
    .uri("http://localhost:7777")
     );
 return serviceProvider.build();
}


熔断策略


熔断策略主要是线程配置和熔断配置,上面已经说明很清楚了。在上篇文章中,为了解决网关调用后台服务Connection prematurely closed BEFORE response的问题,要设置后台服务线程的空闲时间和网关线程池线程的空闲时间,并让网关线程池线程的空闲时间小于后台服务的空闲时间


配置方法


spring:
  cloud:
    gateway:
      httpclient:
        pool:
            max-connections: 500
            max-idle-time: 10000


编码实现


翻阅Spring Cloud Gateway英文资料,知道路由提供一个metadata方法,可以设置路由的元数据(https://docs.spring.io/spring-cloud-


gateway/docs/2.2.6.RELEASE/reference/html/#route-metadata-configuration),这些元数据在RouteMetadataUtils中定义:


package org.springframework.cloud.gateway.support;
public final class RouteMetadataUtils {
    public static final String RESPONSE_TIMEOUT_ATTR = "response-timeout";
    public static final String CONNECT_TIMEOUT_ATTR = "connect-timeout";
    private RouteMetadataUtils() {
        throw new AssertionError("Must not instantiate utility class.");
    }
}


其中没有我要的线程数量(max-connection)和空闲时间(max-idle-time)的设置,没有关系,自己加上去:


@Bean
public RouteLocator customRouteLocator(RouteLocatorBuilder builder) {
        RouteLocatorBuilder.Builder routes = builder.routes();
        RouteLocatorBuilder.Builder serviceProvider = routes
           .route("accept",
               r -> r.method(HttpMethod.GET)
                     .and()
                     .path("/gateway-accept/**")
                     .and()
                     .header(HttpHeaders.CONTENT_TYPE, "application/json;charset=UTF-8")
                     .filters(f -> {
                          f.rewritePath("/gateway-accept/(?<path>.*)", "/${path}");
                          f.requestRateLimiter(
                             config -> config.setKeyResolver(new GenericAccessResolver())
                                           .setRateLimiter(redisRateLimiter()));
                          f.hystrix(config -> config.setName("accept")
                                      .setFallbackUri("forward:/gateway-fallback")
                                      .setSetter(getSetter()));
                                return f;
                          })
                     .uri("http://localhost:8888")
                     .metadata("max-idle-time", 10000)  //网关调用后台线程空闲时间设置
                     .metadata("max-connections", 200)  //网关调用后台服务线程数量设置
          );
     return serviceProvider.build();
}


测试果然和yml配置一样有效果。


降级方法


降级方法本身没有什么特别,有一个问题需要注意,调用降级方法也是使用线程池的,缺省在HystrixThreadPoolProperties中定义:


public abstract class HystrixThreadPoolProperties {
    /* defaults */
    static int default_coreSize = 10;            // core size of thread pool
    static int default_maximumSize = 10;         // maximum size of thread pool
    static int default_keepAliveTimeMinutes = 1; // minutes to keep a thread alive
    static int default_maxQueueSize = -1;        // size of queue (this can't be dynamically changed so we use 'queueSizeRejectionThreshold' to artificially limit and reject)
                                                 // -1 turns it off and makes us use SynchronousQueue


错误


如果上面的限流设置比较大,比如1000,最大突发2000,网关调用后台服务发生熔断降级, 熔断后降级的方法调用太频繁,10个线程不够用,会导致以下500错误:


2021-02-01 14:29:45.076 ERROR 64868 --- [ioEventLoop-5-1] a.w.r.e.AbstractErrorWebExceptionHandler : [a0ed6911-18982]  500 Server Error for HTTP GET "/gateway-accept/test"
com.netflix.hystrix.exception.HystrixRuntimeException: command-accept fallback execution rejected.
 at com.netflix.hystrix.AbstractCommand.handleFallbackRejectionByEmittingError(AbstractCommand.java:1043) ~[hystrix-core-1.5.18.jar:1.5.18]
 Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Error has been observed at the following site(s):
 |_ checkpoint ⇢ org.springframework.cloud.gateway.filter.WeightCalculatorWebFilter [DefaultWebFilterChain]
 |_ checkpoint ⇢ HTTP GET "/gateway-accept/test" [ExceptionHandlingWebHandler]
com.netflix.hystrix.exception.HystrixRuntimeException: command-accept fallback execution rejected.
 at com.netflix.hystrix.AbstractCommand.handleFallbackRejectionByEmittingError(AbstractCommand.java:1043) ~[hystrix-core-1.5.18.jar:1.5.18]
 Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Error has been observed at the following site(s):
 |_ checkpoint ⇢ org.springframework.cloud.gateway.filter.WeightCalculatorWebFilter [DefaultWebFilterChain]
 |_ checkpoint ⇢ HTTP GET "/gateway-accept/test" [ExceptionHandlingWebHandler]


配置方法


所以要在yml中设置合适的调用降级方法的线程池, 合理的配置能够杜绝网关500错误的发生。


hystrix:
  threadpool:
    group-accept:  #代码里面设置的HystrixCommandGroupKey.Factory.asKey("group-accept")
      coreSize: 50 #并发执行的最大线程数,默认10
      maxQueueSize: 1500 #BlockingQueue的最大队列数
      #即使maxQueueSize没有达到,达到queueSizeRejectionThreshold该值后,请求也会被拒绝
      queueSizeRejectionThreshold: 1400


网关异常截获


上面的异常后,没有捕获异常直接返回前端500错误,一般情况下需要返回一个统一接口,比如:


@Data
@ToString
@EqualsAndHashCode
@Accessors(chain = true)
public class Result<T> implements Serializable {
    private Integer code;
    private String message;
    private T data;
    private String sign;
    public static final String SUCCESS = "成功";
    public static final String FAILURE = "失败";
    public Result(int code, String message) {
        this.code = code;
        this.message = message;
    }
    public Result(int code, String message, T data) {
        this.code = code;
        this.message = message;
        this.data = data;
    }
    public Result(int code, String message, T data, String sign) {
        this.code = code;
        this.message = message;
        this.data = data;
        this.sign = sign;
    }
    public static Result<Object> success() {
        return new Result<Object>(200, SUCCESS);
    }
    public static Result<Object> success(Object data) {
        return new Result<Object>(200, SUCCESS, data);
    }
    public static Result<Object> success(Object data, String sign) {
        return new Result<Object>(200, SUCCESS, data, sign);
    }
    public static Result<Object> failure() {
        return new Result<Object>(400, FAILURE);
    }
    public static Result<Object> failure(Object data) {
        return new Result<Object>(400, FAILURE, data);
    }
    public static Result<Object> failure(Object data, String sign) {
        return new Result<Object>(400, FAILURE, data, sign);
    }
}


创建GlobalExceptionConfiguration 实现ErrorWebExceptionHandler(这一段是来者网友提供的)


@Slf4j
@Order(-1)
@Component
@RequiredArgsConstructor
public class GlobalExceptionConfiguration implements ErrorWebExceptionHandler {
    private final ObjectMapper objectMapper;
    @Override
    public Mono<Void> handle(ServerWebExchange exchange, Throwable ex) {
        ServerHttpResponse response = exchange.getResponse();
        if (response.isCommitted()) {
            return Mono.error(ex);
        }
        response.getHeaders().setContentType(MediaType.APPLICATION_JSON_UTF8);
        if (ex instanceof ResponseStatusException) {
            response.setStatusCode(((ResponseStatusException) ex).getStatus());
        }
        return response
                .writeWith(Mono.fromSupplier(() -> {
                    DataBufferFactory bufferFactory = response.bufferFactory();
                    try {
                        return bufferFactory.wrap(objectMapper.writeValueAsBytes(Result.failure(ex.getMessage())));
                    } catch (JsonProcessingException e) {
                        log.warn("Error writing response", ex);
                        return bufferFactory.wrap(new byte[0]);
                    }
                }));
    }
}


这样,就会把网关异常统一包装在接口中返回:如:


image.png


后台日志已经没有之前的错误日志了。


编码实现,没找到


由于Spring Cloud Gateway 中的 Hystrix采用的是HystrixObservableCommand.Setter, 没有采用 HystrixCommand.Setter, 在 HystrixCommand.Setter中是可以编码实现线程池配置的, 但是在HystrixObservableCommand.Setter没有提供:


final public static class Setter {
        protected final HystrixCommandGroupKey groupKey;
        protected HystrixCommandKey commandKey;
        protected HystrixThreadPoolKey threadPoolKey;  //有属性但是没有set方法
        protected HystrixCommandProperties.Setter commandPropertiesDefaults;
        protected HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults; //有属性没有set方法
        protected Setter(HystrixCommandGroupKey groupKey) {
            this.groupKey = groupKey;
            // default to using SEMAPHORE for ObservableCommand
            commandPropertiesDefaults = setDefaults(HystrixCommandProperties.Setter());
        }
        public static Setter withGroupKey(HystrixCommandGroupKey groupKey) {
            return new Setter(groupKey);
        }
        public Setter andCommandKey(HystrixCommandKey commandKey) {
            this.commandKey = commandKey;
            return this;
        }
        public Setter andCommandPropertiesDefaults(HystrixCommandProperties.Setter commandPropertiesDefaults) {
            this.commandPropertiesDefaults = setDefaults(commandPropertiesDefaults);
            return this;
        }
        private HystrixCommandProperties.Setter setDefaults(HystrixCommandProperties.Setter commandPropertiesDefaults) {
            if (commandPropertiesDefaults.getExecutionIsolationStrategy() == null) {
                // default to using SEMAPHORE for ObservableCommand if the user didn't set it
                commandPropertiesDefaults.withExecutionIsolationStrategy(ExecutionIsolationStrategy.SEMAPHORE);
            }
            return commandPropertiesDefaults;
        }
    }


由于本人水平有限,没有找到Setter中设置HystrixThreadPoolKeyHystrixThreadPoolProperties.Setter的方法,所以只能在yml中配置。有知道的同学告诉我一声,不胜感激。


总结


所以在Spring Cloud Gateway网关的配置中,需要综合考虑限流大小、网关调用后台连接池设置大小、后台服务的连接池以及空闲时间,包括网关调用降级方法的线程池配置,都需要在压测中调整到一个合理的配置,才能发挥最大的功效。


本人水平有限,跟深入的研究还在继续,如果文章有表达错误或者不周,请大家指正,谢谢!

END

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
6月前
|
算法 NoSQL API
SpringCloud&Gateway网关限流
SpringCloud&Gateway网关限流
314 7
|
6月前
|
缓存
SpringCloud Gateway 网关的请求体body的读取和修改
SpringCloud Gateway 框架中,为了处理请求体body,实现多次读取与修改,创建了一个名为`RequestParamGlobalFilter`的全局过滤器。这个过滤器使用`@Component`和`@Slf4j`注解,实现了`GlobalFilter`和`Ordered`接口,设置最高优先级以首先读取body。它通过缓存请求体并创建装饰过的`ServerHttpRequest`来实现body的动态获取。
976 4
|
15天前
|
负载均衡 Java 应用服务中间件
Gateway服务网关
Gateway服务网关
29 1
Gateway服务网关
|
2月前
|
负载均衡 Java 网络架构
实现微服务网关:Zuul与Spring Cloud Gateway的比较分析
实现微服务网关:Zuul与Spring Cloud Gateway的比较分析
106 5
|
3月前
|
Java API 微服务
服务网关Gateway
该博客文章详细介绍了Spring Cloud Gateway的使用方法和概念。文章首先阐述了API网关在微服务架构中的重要性,解释了客户端直接与微服务通信可能带来的问题。接着,文章通过具体的示例代码,展示了如何在Spring Cloud Gateway中添加依赖、编写路由规则,并对路由规则中的基本概念如Route、Predicate和Filter进行了详细解释。最后,文章还提供了路由规则的测试方法。
服务网关Gateway
|
3月前
|
安全 API
【Azure API 管理】APIM Self-Host Gateway 自建本地环境中的网关数量超过10个且它们的出口IP为同一个时出现的429错误
【Azure API 管理】APIM Self-Host Gateway 自建本地环境中的网关数量超过10个且它们的出口IP为同一个时出现的429错误
|
3月前
|
存储 容器
【Azure 事件中心】为应用程序网关(Application Gateway with WAF) 配置诊断日志,发送到事件中心
【Azure 事件中心】为应用程序网关(Application Gateway with WAF) 配置诊断日志,发送到事件中心
|
4月前
|
JSON 前端开发 Java
SpringCloud怎么搭建GateWay网关&统一登录模块
本文来分享一下,最近我在自己的项目中实现的认证服务,目前比较简单,就是可以提供一个公共的服务,专门来处理登录请求,然后我还在API网关处实现了登录拦截的效果,因为在一个博客系统中,有一些地址是可以不登录的,比方说首页;也有一些是必须登录的,比如发布文章、评论等。所以,在网关处可以支持自定义一些不需要登录的地址,一些需要登录的地址,也可以在网关处进行校验,如果未登录,可以返回JSON格式的出参,前端可以进行相关处理,比如跳转到登录页面等。
123 4
|
3月前
|
负载均衡 Java 应用服务中间件
Gateway服务网关
本节针对微服务中另一重要组件:网关 进行了实战性演练,网关作为分布式架构中的重要中间件,不仅承担着路由分发(重点关注Path规则配置),同时可根据自身负载均衡策略,对多个注册服务实例进行均衡调用。本节我们借助GateWay实现的网关只是技术实现的方案之一,后续大家可能会接触像:Zuul、Kong等,其实现细节或有差异,但整体目标是一致的。
|
4月前
|
Kubernetes 监控 Java
有了k8s还需要gateway网关,nacos配置中心吗
在Kubernetes环境中,服务网关(如Spring Cloud Gateway)和Nacos配置中心补充了k8s的不足。Nacos提供灵活服务路由和动态配置更新,超越k8s基础服务发现。它还支持更复杂的配置管理和实时推送,以及环境隔离和版本控制。作为服务注册中心,Nacos增强k8s服务治理能力,保持技术一致性,并提供额外的安全层及监控功能。
237 0