Springcloud Gateway:动态配置,过滤器源码思路(2)

简介: Springcloud Gateway:动态配置,过滤器源码思路(2)

2.png

我们打开nacos的web 页面


[
  {
    "id": "e-commerce-nacos-client",
    "predicates": [
      {
        "args": {
          "pattern": "/imooc/ecommerce-nacos-client/**"
        },
        "name": "Path"
      }
    ],
    "uri": "lb://e-commerce-nacos-client",
    "filters": [
      {
        "name": "HeaderToken"
      },
      {
        "name": "StripPrefix",
        "args": {
          "parts": "1"
        }
      }
    ]
  },
  {
  "id": "e-commerce-account-service",
  "predicates": [
    {
      "args": {
        "pattern": "/imooc/ecommerce-account-service/**"
      },
      "name": "Path"
    }
  ],
  "uri": "lb://e-commerce-account-service",
  "filters": [
    {
      "name": "StripPrefix",
      "args": {
        "parts": "1"
      }
    }
  ]
},
  {
    "id": "e-commerce-goods-service",
    "predicates": [
      {
        "args": {
          "pattern": "/imooc/ecommerce-goods-service/**"
        },
        "name": "Path"
      }
    ],
    "uri": "lb://e-commerce-goods-service",
    "filters": [
      {
        "name": "StripPrefix",
        "args": {
          "parts": "1"
        }
      }
    ]
  }
]

动态路由网关的配置

GatewayConfig 创建 config 设置一些 需要的参数 ,比如


超时时间

nacos 服务器的地址

命名空间

data-id

Group id

/**
 * <h1>配置类, 读取 Nacos 相关的配置项, 用于配置监听器</h1>
 * */
@Configuration
public class GatewayConfig {
    /** 读取配置的超时时间 */
    public static final long DEFAULT_TIMEOUT = 30000;
    /** Nacos 服务器地址 */
    public static String NACOS_SERVER_ADDR;
    /** 命名空间 */
    public static String NACOS_NAMESPACE;
    /** data-id */
    public static String NACOS_ROUTE_DATA_ID;
    /** 分组 id */
    public static String NACOS_ROUTE_GROUP;
    @Value("${spring.cloud.nacos.discovery.server-addr}")
    public void setNacosServerAddr(String nacosServerAddr) {
        NACOS_SERVER_ADDR = nacosServerAddr;
    }
    @Value("${spring.cloud.nacos.discovery.namespace}")
    public void setNacosNamespace(String nacosNamespace) {
        NACOS_NAMESPACE = nacosNamespace;
    }
    @Value("${nacos.gateway.route.config.data-id}")
    public void setNacosRouteDataId(String nacosRouteDataId) {
        NACOS_ROUTE_DATA_ID = nacosRouteDataId;
    }
    @Value("${nacos.gateway.route.config.group}")
    public void setNacosRouteGroup(String nacosRouteGroup) {
        NACOS_ROUTE_GROUP = nacosRouteGroup;
    }
}

这里我们再次梳理一下思路 这里我们保存的这些配置信息,这里我们做的是保存当前配置的,之后发生改变了,我们先监听,再去获取配置信息之后刷新配置。 接下来我们编写注册网关事件更新操作


编写注册网关事件更新

/**
 * 事件推送 Aware: 动态更新路由网关 Service
 * */
@Slf4j
@Service
@SuppressWarnings("all")
public class DynamicRouteServiceImpl implements ApplicationEventPublisherAware {
    /** 写路由定义 */
    private final RouteDefinitionWriter routeDefinitionWriter;
    /** 获取路由定义 */
    private final RouteDefinitionLocator routeDefinitionLocator;
    /** 事件发布 */
    private ApplicationEventPublisher publisher;
    public DynamicRouteServiceImpl(RouteDefinitionWriter routeDefinitionWriter,
                                   RouteDefinitionLocator routeDefinitionLocator) {
        this.routeDefinitionWriter = routeDefinitionWriter;
        this.routeDefinitionLocator = routeDefinitionLocator;
    }
    @Override
    public void setApplicationEventPublisher(
            ApplicationEventPublisher applicationEventPublisher) {
        // 完成事件推送句柄的初始化
        this.  = applicationEventPublisher;
    }
    /**
     * <h2>增加路由定义</h2>
     * */
    public String addRouteDefinition(RouteDefinition definition) {
        log.info("gateway add route: [{}]", definition);
        // 保存路由配置并发布
        routeDefinitionWriter.save(Mono.just(definition)).subscribe();
        // 发布事件通知给 Gateway, 同步新增的路由定义
        this.publisher.publishEvent(new RefreshRoutesEvent(this));
        return "success";
    }
    /**
     * <h2>更新路由</h2>
     * */
    public String updateList(List<RouteDefinition> definitions) {
        log.info("gateway update route: [{}]", definitions);
        // 先拿到当前 Gateway 中存储的路由定义
        List<RouteDefinition> routeDefinitionsExits =
                routeDefinitionLocator.getRouteDefinitions().buffer().blockFirst();
        if (!CollectionUtils.isEmpty(routeDefinitionsExits)) {
            // 清除掉之前所有的 "旧的" 路由定义
            routeDefinitionsExits.forEach(rd -> {
                log.info("delete route definition: [{}]", rd);
                deleteById(rd.getId());
            });
        }
        // 把更新的路由定义同步到 gateway 中
        definitions.forEach(definition -> updateByRouteDefinition(definition));
        return "success";
    }
    /**
     * <h2>根据路由 id 删除路由配置</h2>
     * */
    private String deleteById(String id) {
        try {
            log.info("gateway delete route id: [{}]", id);
            this.routeDefinitionWriter.delete(Mono.just(id)).subscribe();
            // 发布事件通知给 gateway 更新路由定义
            this.publisher.publishEvent(new RefreshRoutesEvent(this));
            return "delete success";
        } catch (Exception ex) {
            log.error("gateway delete route fail: [{}]", ex.getMessage(), ex);
            return "delete fail";
        }
    }
    /**
     * <h2>更新路由</h2>
     * 更新的实现策略比较简单: 删除 + 新增 = 更新
     * */
    private String updateByRouteDefinition(RouteDefinition definition) {
        try {
            log.info("gateway update route: [{}]", definition);
            this.routeDefinitionWriter.delete(Mono.just(definition.getId()));
        } catch (Exception ex) {
            return "update fail, not find route routeId: " + definition.getId();
        }
        try {
            this.routeDefinitionWriter.save(Mono.just(definition)).subscribe();
            this.publisher.publishEvent(new RefreshRoutesEvent(this));
            return "success";
        } catch (Exception ex) {
            return "update route fail";
        }
    }
}

这里我们进行了 事件推送器的操作,更新配置和删除,新增操作,可能没接触过的小伙伴会比较的蒙圈,这里可以去 补充学习一下 Spring5 reactor编程


编写完对应的操作,我们就需要去连接 nacos 之后通过 nacos 的api 获取配置和初始化进行一些操作了


编写 连接 nacos 获取配置


/**
 * <h1>通过 nacos 下发动态路由配置, 监听 Nacos 中路由配置变更</h1>
 * */
@Slf4j
@Component
//这个注解 是一个依赖注解,这里是只一个类加载之后,这个类再加载
@DependsOn({"gatewayConfig"})
public class DynamicRouteServiceImplByNacos {
    /** Nacos 配置服务 */
    private ConfigService configService;
    private final DynamicRouteServiceImpl dynamicRouteService;
    public DynamicRouteServiceImplByNacos(DynamicRouteServiceImpl dynamicRouteService) {
        this.dynamicRouteService = dynamicRouteService;
    }
    /**
     * <h2>Bean 在容器中构造完成之后会执行 init 方法</h2>
     * */
    @PostConstruct
    public void init() {
        log.info("gateway route init....");
        try {
            // 初始化 Nacos 配置客户端
            configService = initConfigService();
            if (null == configService) {
                log.error("init config service fail");
                return;
            }
            // 通过 Nacos Config 并指定路由配置路径去获取路由配置
            String configInfo = configService.getConfig(
                    GatewayConfig.NACOS_ROUTE_DATA_ID,
                    GatewayConfig.NACOS_ROUTE_GROUP,
                    GatewayConfig.DEFAULT_TIMEOUT
            );
            log.info("get current gateway config: [{}]", configInfo);
            List<RouteDefinition> definitionList =
                    JSON.parseArray(configInfo, RouteDefinition.class);
            if (CollectionUtils.isNotEmpty(definitionList)) {
                for (RouteDefinition definition : definitionList) {
                    log.info("init gateway config: [{}]", definition.toString());
                    dynamicRouteService.addRouteDefinition(definition);
                }
            }
        } catch (Exception ex) {
            log.error("gateway route init has some error: [{}]", ex.getMessage(), ex);
        }
        // 设置监听器
        dynamicRouteByNacosListener(GatewayConfig.NACOS_ROUTE_DATA_ID,
                GatewayConfig.NACOS_ROUTE_GROUP);
    }
    /**
     * <h2>初始化 Nacos Config</h2>
     * */
    private ConfigService initConfigService() {
        try {
            Properties properties = new Properties();
            properties.setProperty("serverAddr", GatewayConfig.NACOS_SERVER_ADDR);
            properties.setProperty("namespace", GatewayConfig.NACOS_NAMESPACE);
            return configService = NacosFactory.createConfigService(properties);
        } catch (Exception ex) {
            log.error("init gateway nacos config error: [{}]", ex.getMessage(), ex);
            return null;
        }
    }
    /**
     * <h2>监听 Nacos 下发的动态路由配置</h2>
     * */
    private void dynamicRouteByNacosListener(String dataId, String group) {
        try {
            // 给 Nacos Config 客户端增加一个监听器
            configService.addListener(dataId, group, new Listener() {
                /**
                 * <h2>自己提供线程池执行操作</h2>
                 * */
                @Override
                public Executor getExecutor() {
                    return null;
                }
                /**
                 * <h2>监听器收到配置更新</h2>
                 * @param configInfo Nacos 中最新的配置定义
                 * */
                @Override
                public void receiveConfigInfo(String configInfo) {
                    log.info("start to update config: [{}]", configInfo);
                    List<RouteDefinition> definitionList =
                            JSON.parseArray(configInfo, RouteDefinition.class);
                    log.info("update route: [{}]", definitionList.toString());
                    dynamicRouteService.updateList(definitionList);
                }
            });
        } catch (NacosException ex) {
            log.error("dynamic update gateway config error: [{}]", ex.getMessage(), ex);
        }
    }
}

验证动态配置的可用性


2021-12-08 14:15:58.335  INFO [e-commerce-gateway,,,] 32948 --- [           main] c.i.e.c.DynamicRouteServiceImplByNacos   : gateway route init....
2021-12-08 14:15:58.687  INFO [e-commerce-gateway,,,] 32948 --- [           main] c.i.e.c.DynamicRouteServiceImplByNacos   : get current gateway config: [[
  {
    "id": "e-commerce-nacos-client",
    "predicates": [
      {
        "args": {
          "pattern": "/imooc/ecommerce-nacos-client/**"
        },
        "name": "Path"
      }
    ],
    "uri": "lb://e-commerce-nacos-client"
  }]]
2021-12-08 14:15:58.776  INFO [e-commerce-gateway,,,] 32948 --- [           main] c.i.e.c.DynamicRouteServiceImplByNacos   : init gateway config: [RouteDefinition{id='e-commerce-nacos-client', predicates=[PredicateDefinition{name='Path', args={pattern=/imooc/ecommerce-nacos-client/**}}], filters=[], uri=lb://e-commerce-nacos-client, order=0, metadata={}}]
2021-12-08 14:15:58.776  INFO [e-commerce-gateway,,,] 32948 --- [           main] c.i.e.config.DynamicRouteServiceImpl     : gateway add route: [RouteDefinition{id='e-commerce-nacos-client', predicates=[PredicateDefinition{name='Path', args={pattern=/imooc/ecommerce-nacos-client/**}}], filters=[], uri=lb://e-commerce-nacos-client, order=0, metadata={}}]

可以看到 我们成功的连接到了 nacos 并且拿到了配置


这个时候我们 修改配置 id 变动 这个时候查看我们的控制台

2.png



2021-12-08 14:53:35.641  WARN [e-commerce-gateway,,,] 32948 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient   : [AdminClient clientId=adminclient-1] Connection to node -1 (/127.0.0.1:9092) could not be established. Broker may not be available.
2021-12-08 14:53:37.788  INFO [e-commerce-gateway,,,] 32948 --- [4f-6d1b8c7fd7ea] c.i.e.c.DynamicRouteServiceImplByNacos   : start to update config: [[
  {
    "id": "e-commerce-nacos-client1",
    "predicates": [
      {
        "args": {
          "pattern": "/imooc/ecommerce-nacos-client/**"
        },
        "name": "Path"
      }
    ],
    "uri": "lb://e-commerce-nacos-client"
  }]]
2021-12-08 14:53:37.788  INFO [e-commerce-gateway,,,] 32948 --- [4f-6d1b8c7fd7ea] c.i.e.c.DynamicRouteServiceImplByNacos   : update route: [[RouteDefinition{id='e-commerce-nacos-client1', predicates=[PredicateDefinition{name='Path', args={pattern=/imooc/ecommerce-nacos-client/**}}], filters=[], uri=lb://e-commerce-nacos-client, order=0, metadata={}}]]
2021-12-08 14:53:37.788  INFO [e-commerce-gateway,,,] 32948 --- [4f-6d1b8c7fd7ea] c.i.e.config.DynamicRouteServiceImpl     : gateway update route: [[RouteDefinition{id='e-commerce-nacos-client1', predicates=[PredicateDefinition{name='Path', args={pattern=/imooc/ecommerce-nacos-client/**}}], filters=[], uri=lb://e-commerce-nacos-client, order=0, metadata={}}]]

就可以看到我们配置更新 日志打出 证明 我们可以在项目启动的时候动态的修改路由配置,网关随着负责增加,需要频繁的变更,所以我们这里才会使用动态配置。


SpringCloud Gateway Filter

认识过滤器 , SpringCloud Gateway Filter


基于过滤器的思想实现,与 zuul 类似 。有 pre 和 post 两种方式都filter,分别处理前置逻辑和后置逻辑


前置 : 客户端请求会经过pre类型的filter 然后将请求转发到具体的业务服务,


**后置:**收到服务端响应后 经过 post 类型的filter 处理 最后返回给客户端


**Filter有两大类别:**全局过滤器和局部过滤器


这里我们查看一下Gateway给我们提供的 局部和全局过滤器的各别思路


全局的过滤器

2.png



这里我们可以看到,每一个全局过滤器都需要实现 全局过滤器接口和对应的 filter方法,下面我们来看一下其中一个实现类


RouteToRequestUrlFilter


这个类的核心方法,我们来解读一下这个方法的作用 (以对应代码部分的注释的方式解读)


@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
    //传入路由对象,从前一个过滤器
   Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR);
   //判断是否为null 就继续通过
    if (route == null) {
      return chain.filter(exchange);
   }
    //判断是否有 uri 获取到之后 创建一个新的uri
   log.trace("RouteToRequestUrlFilter start");
   URI uri = exchange.getRequest().getURI();
   boolean encoded = containsEncodedParts(uri);
   URI routeUri = route.getUri();
   if (hasAnotherScheme(routeUri)) {
      // this is a special url, save scheme to special attribute
      // replace routeUri with schemeSpecificPart
      exchange.getAttributes().put(GATEWAY_SCHEME_PREFIX_ATTR,
            routeUri.getScheme());
      routeUri = URI.create(routeUri.getSchemeSpecificPart());
   }
    // 如果uri 前面有 lb 就是告诉gateway 不能从uri拿到服务了,要去对应的注册中心获取,此时再用服务地址会抛出异常
   if ("lb".equalsIgnoreCase(routeUri.getScheme()) && routeUri.getHost() == null) {
      // Load balanced URIs should always have a host. If the host is null it is
      // most
      // likely because the host name was invalid (for example included an
      // underscore)
      throw new IllegalStateException("Invalid host: " + routeUri.toString());
   }
    //这部分就是 将微服务的地址,转换成 uri的服务地址,方便调用服务,新生成的uri 会继续往下传递
   URI mergedUrl = UriComponentsBuilder.fromUri(uri)
         // .uri(routeUri)
         .scheme(routeUri.getScheme()).host(routeUri.getHost())
         .port(routeUri.getPort()).build(encoded).toUri();
   exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, mergedUrl);
   return chain.filter(exchange);
}

局部过滤器


局部过滤器主要方法 都是返回一个 GatewayFilter对象


PrefixPathGatewayFilterFactory 局部 前置过滤器


/**
 * @author Spencer Gibb
 */
public class PrefixPathGatewayFilterFactory
    extends AbstractGatewayFilterFactory<PrefixPathGatewayFilterFactory.Config> {
  /**
   * Prefix key.
   */
    //代表他是一个 Pre类型的
  public static final String PREFIX_KEY = "prefix";
  private static final Log log = LogFactory
      .getLog(PrefixPathGatewayFilterFactory.class);
  public PrefixPathGatewayFilterFactory() {
    super(Config.class);
  }
  @Override
  public List<String> shortcutFieldOrder() {
    return Arrays.asList(PREFIX_KEY);
  }
  @Override
  public GatewayFilter apply(Config config) {
    return new GatewayFilter() {
      @Override
      public Mono<Void> filter(ServerWebExchange exchange,
          GatewayFilterChain chain) {
                //校验 是否添加了前缀
        boolean alreadyPrefixed = exchange
            .getAttributeOrDefault(GATEWAY_ALREADY_PREFIXED_ATTR, false);
        if (alreadyPrefixed) {
          return chain.filter(exchange);
        }
        exchange.getAttributes().put(GATEWAY_ALREADY_PREFIXED_ATTR, true);
        ServerHttpRequest req = exchange.getRequest();
        addOriginalRequestUrl(exchange, req.getURI());
        String newPath = config.prefix + req.getURI().getRawPath();
//构造新的 路径 获取到请求 获取到 url 添加一个前缀 ,之后重新构造一个url request
        ServerHttpRequest request = req.mutate().path(newPath).build();
        exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, request.getURI());
        if (log.isTraceEnabled()) {
          log.trace("Prefixed URI with: " + config.prefix + " -> "
              + request.getURI());
        }
        return chain.filter(exchange.mutate().request(request).build());
      }
      @Override
      public String toString() {
        return filterToStringCreator(PrefixPathGatewayFilterFactory.this)
            .append("prefix", config.getPrefix()).toString();
      }
    };
  }
  public static class Config {
    private String prefix;
    public String getPrefix() {
      return prefix;
    }
    public void setPrefix(String prefix) {
      this.prefix = prefix;
    }
  }
}

StripPrefixGatewayFilterFactory 后置过滤器


这里 StripPrefix的意思就是 去掉前缀,


/**
 * This filter removes the first part of the path, known as the prefix, from the request
 * before sending it downstream.
 *
 * @author Ryan Baxter
 */
public class StripPrefixGatewayFilterFactory
    extends AbstractGatewayFilterFactory<StripPrefixGatewayFilterFactory.Config> {
  /**
   * Parts key.
   */
  public static final String PARTS_KEY = "parts";
  public StripPrefixGatewayFilterFactory() {
    super(Config.class);
  }
  @Override
  public List<String> shortcutFieldOrder() {
    return Arrays.asList(PARTS_KEY);
  }
  @Override
  public GatewayFilter apply(Config config) {
    return new GatewayFilter() {
      @Override
      public Mono<Void> filter(ServerWebExchange exchange,
          GatewayFilterChain chain) {
        ServerHttpRequest request = exchange.getRequest();
        addOriginalRequestUrl(exchange, request.getURI());
                //获取到原始的 uri
        String path = request.getURI().getRawPath();
        String newPath = "/"
            + Arrays.stream(StringUtils.tokenizeToStringArray(path, "/"))
                      //去掉相应的前缀   
          .skip(config.parts).collect(Collectors.joining("/"));
                //之后去构建一个 新的 path
        newPath += (newPath.length() > 1 && path.endsWith("/") ? "/" : "");
        ServerHttpRequest newRequest =        request.mutate().path(newPath).build(); 
//之后转发
        exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR,
            newRequest.getURI());
        return chain.filter(exchange.mutate().request(newRequest).build());
      }
      @Override
      public String toString() {
        return filterToStringCreator(StripPrefixGatewayFilterFactory.this)
            .append("parts", config.getParts()).toString();
      }
    };
  }
  public static class Config {
    private int parts;
    public int getParts() {
      return parts;
    }
    public void setParts(int parts) {
      this.parts = parts;
    }
  }
}

过滤器的执行流程


[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-J2DcdAbW-1639027996653)(springcloudalibaba项目.assets/image-20211209003355832.png)]


过滤器有优先级之分,Order越大 优先级越来越低,越晚被执行

全局过滤器 所有的请求都会执行

局部过滤器只有配置了对应请求才会执行


相关实践学习
部署高可用架构
本场景主要介绍如何使用云服务器ECS、负载均衡SLB、云数据库RDS和数据传输服务产品来部署多可用区高可用架构。
负载均衡入门与产品使用指南
负载均衡(Server Load Balancer)是对多台云服务器进行流量分发的负载均衡服务,可以通过流量分发扩展应用系统对外的服务能力,通过消除单点故障提升应用系统的可用性。 本课程主要介绍负载均衡的相关技术以及阿里云负载均衡产品的使用方法。
相关文章
|
2天前
|
算法 NoSQL API
SpringCloud&Gateway网关限流
SpringCloud&Gateway网关限流
42 7
|
2天前
|
Java 应用服务中间件 Nacos
Spring Cloud 常用各个组件详解及实现原理(附加源码+实现逻辑图)
Spring Cloud 常用各个组件详解及实现原理(附加源码+实现逻辑图)
37 0
|
2天前
|
负载均衡 Java 网络安全
gateway基本配置
gateway基本配置
27 4
|
2天前
|
架构师 Java API
Gateway基本配置
Gateway基本配置
17 0
|
2天前
|
安全 Java API
gateway基本配置
【5月更文挑战第7天】API Gateway在微服务架构中起着关键作用,作为客户端与后端服务的统一入口,负责路由转发、安全控制和负载均衡。本文深入介绍了API Gateway的基本配置、常见问题、跨平台配置差异及避免错误的方法。内容包括路由和过滤器配置、动态路由、安全性配置、限流和熔断机制,以及自定义过滤器和服务降级策略。通过示例代码和实践指南,帮助读者理解和部署API Gateway。
31 3
|
2天前
|
负载均衡 监控 Java
新手入门gateway基本配置详解与深入分析
欢迎关注 `威哥爱编程` 一起交流学习,人生海海,相遇就是缘分,让我们以技术为信物,成为相互惦记的人。
|
2天前
|
Java 微服务 Spring
SpringCloud&Gateway全局过滤器
SpringCloud&Gateway全局过滤器
11 1
|
2天前
|
监控 Java API
第七章 Spring Cloud 之 GateWay
第七章 Spring Cloud 之 GateWay
23 0
|
2天前
|
Java Maven Nacos
Spring Cloud Eureka 服务注册和服务发现超详细(附加--源码实现案例--及实现逻辑图)
Spring Cloud Eureka 服务注册和服务发现超详细(附加--源码实现案例--及实现逻辑图)
31 0
|
2天前
|
人工智能 监控 安全
Java+Spring Cloud +Vue+UniApp微服务智慧工地云平台源码
视频监控系统、人员实名制与分账制管理系统、车辆管理系统、环境监测系统、大型设备监测(龙门吊、塔吊、升降机、卸料平台等)、用电监测系统、基坑监测系统、AI算法分析(安全帽佩戴、火焰识别、周界报警、人员聚众报警、升降机超载报警)、安全培训、设备监测。
31 4