Spring Cloud Zuul 源码解析

简介: Spring Cloud Zuul 源码解析

Zuul 架构图

Zuul的官方文档中的架构图
image.png
从架构图中可以看到 Zuul 通过Zuul Servlet 和 一系列 Zuul Filter 来完成智能路由和过滤器的功能。

Zuul 工作原理概述(转)

Zuul中,整个请求的过程是这样的,首先将请求给 ZuulServlet 处理,ZuulServlet中有一个ZuulRunner对象,该对象中初始化了RequestContext, RequestContext 作为整个请求的上下文,封装了请求的一些数据,并被所有的ZuulFilter共享。
ZuulRunner 中还有 FilterProcessorFilterProcessor作为执行所有的ZuulFilter的管理器。FilterProcessor FilterLoader 中获取ZuulFilter,而ZuulFilter是被FilterFileManager所加载,并支持groovy热加载,采用了轮询的方式热加载。
有了这些Filter之后,ZuulServlet首先执行的pre类型的过滤器,再执行route类型的过滤器,最后执行的是post 类型的过滤器。
如果在执行这些过滤器有错误的时候则会执行error类型的过滤器。执行完这些过滤器,最终将请求的结果返回给客户端。

Zuul 启动—源码分析

在程序的启动类上加@EnableZuulProxy 注解,我们可以使用Zuul 提供的功能了,该注解的源码为:

@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Import({ZuulProxyMarkerConfiguration.class})
public @interface EnableZuulProxy {
}

源码中,@EnableZuulProxy 引入了ZuulProxyMarkerConfiguration 配置类,跟踪ZuulProxyMarkerConfiguration 类:

public class ZuulProxyMarkerConfiguration {
    public ZuulProxyMarkerConfiguration() {
    }

    @Bean
    public ZuulProxyMarkerConfiguration.Marker zuulProxyMarkerBean() {
        return new ZuulProxyMarkerConfiguration.Marker();
    }

    class Marker {
        Marker() {
        }
    }
}

ZuulProxyMarkerConfiguration 配置类中,发现只是注册了一个ZuulProxyMarkerConfiguration.Markerbean。我们通过分析应该会有依赖这个bean的配置类。然后我们找到了 ZuulProxyAutoConfiguration 依赖了ZuulProxyMarkerConfiguration.Markerbean
跟踪 ZuulProxyAutoConfiguration

@Configuration
@Import({RestClientRibbonConfiguration.class, OkHttpRibbonConfiguration.class, HttpClientRibbonConfiguration.class, HttpClientConfiguration.class})
@ConditionalOnBean({Marker.class})
public class ZuulProxyAutoConfiguration extends ZuulServerAutoConfiguration {
  
    @Bean
    @ConditionalOnMissingBean({DiscoveryClientRouteLocator.class})
    public DiscoveryClientRouteLocator discoveryRouteLocator() {
        return new DiscoveryClientRouteLocator(this.server.getServlet().getContextPath(), this.discovery, this.zuulProperties, this.serviceRouteMapper, this.registration);
    }

    @Bean
    @ConditionalOnMissingBean({PreDecorationFilter.class})
    public PreDecorationFilter preDecorationFilter(RouteLocator routeLocator, ProxyRequestHelper proxyRequestHelper) {
        return new PreDecorationFilter(routeLocator, this.server.getServlet().getContextPath(), this.zuulProperties, proxyRequestHelper);
    }

    @Bean
    @ConditionalOnMissingBean({RibbonRoutingFilter.class})
    public RibbonRoutingFilter ribbonRoutingFilter(ProxyRequestHelper helper, RibbonCommandFactory<?> ribbonCommandFactory) {
        RibbonRoutingFilter filter = new RibbonRoutingFilter(helper, ribbonCommandFactory, this.requestCustomizers);
        return filter;
    }

    @Bean
    @ConditionalOnMissingBean({SimpleHostRoutingFilter.class, CloseableHttpClient.class})
    public SimpleHostRoutingFilter simpleHostRoutingFilter(ProxyRequestHelper helper, ZuulProperties zuulProperties, ApacheHttpClientConnectionManagerFactory connectionManagerFactory, ApacheHttpClientFactory httpClientFactory) {
        return new SimpleHostRoutingFilter(helper, zuulProperties, connectionManagerFactory, httpClientFactory);
    }

    @Bean
    @ConditionalOnMissingBean({SimpleHostRoutingFilter.class})
    public SimpleHostRoutingFilter simpleHostRoutingFilter2(ProxyRequestHelper helper, ZuulProperties zuulProperties, CloseableHttpClient httpClient) {
        return new SimpleHostRoutingFilter(helper, zuulProperties, httpClient);
    }

    @Bean
    @ConditionalOnMissingBean({ServiceRouteMapper.class})
    public ServiceRouteMapper serviceRouteMapper() {
        return new SimpleServiceRouteMapper();
    }
}

我们发现在类ZuulProxyAutoConfiguration中,引入了RestClientRibbonConfiguration, OkHttpRibbonConfiguration, HttpClientRibbonConfiguration, HttpClientConfigurationZuul默认是用HttpClientRibbonConfiguration做负载均衡配置, 注入了DiscoveryClientRibbonConfiguration用作负载均衡相关。注入了一些列的Filter,
pre类型: PreDecorationFilter ; // 装饰 Request
route类型: RibbonRoutingFilter , SimpleHostRoutingFilter ; // 路由Filter
ZuulProxyAutoConfiguration 的父类ZuulServerAutoConfiguration中,也引入了一些配置信息:

@EnableConfigurationProperties({ZuulProperties.class})
@ConditionalOnClass({ZuulServlet.class, ZuulServletFilter.class})
@ConditionalOnBean({Marker.class})
public class ZuulServerAutoConfiguration {

    // 在缺失`ZuulServlet`的情况下注入`ZuulServlet`
    @Bean
    @ConditionalOnMissingBean(
        name = {"zuulServlet"}
    )
    @ConditionalOnProperty(
        name = {"zuul.use-filter"},
        havingValue = "false",
        matchIfMissing = true
    )
    public ServletRegistrationBean zuulServlet() {
        ServletRegistrationBean<ZuulServlet> servlet = new ServletRegistrationBean(new ZuulServlet(), new String[]{this.zuulProperties.getServletPattern()});
        servlet.addInitParameter("buffer-requests", "false");
        return servlet;
    }
    
    // 在缺失`ZuulServletFilter`的情况下注入`ZuulServletFilter`
    @Bean
    @ConditionalOnMissingBean(
        name = {"zuulServletFilter"}
    )
    @ConditionalOnProperty(
        name = {"zuul.use-filter"},
        havingValue = "true",
        matchIfMissing = false
    )
    public FilterRegistrationBean zuulServletFilter() {
        FilterRegistrationBean<ZuulServletFilter> filterRegistration = new FilterRegistrationBean();
        filterRegistration.setUrlPatterns(Collections.singleton(this.zuulProperties.getServletPattern()));
        filterRegistration.setFilter(new ZuulServletFilter());
        filterRegistration.setOrder(2147483647);
        filterRegistration.addInitParameter("buffer-requests", "false");
        return filterRegistration;
    }
    // 注入  `ServletDetectionFilter`
    @Bean
    public ServletDetectionFilter servletDetectionFilter() {
        return new ServletDetectionFilter();
    }

    // 注入  `FormBodyWrapperFilter`
    @Bean
    public FormBodyWrapperFilter formBodyWrapperFilter() {
        return new FormBodyWrapperFilter();
    }

    // 注入  `DebugFilter`
    @Bean
    public DebugFilter debugFilter() {
        return new DebugFilter();
    }

   // 注入  `Servlet30WrapperFilter`
    @Bean
    public Servlet30WrapperFilter servlet30WrapperFilter() {
        return new Servlet30WrapperFilter();
    }
    
  // 注入  `SendResponseFilter`
    @Bean
    public SendResponseFilter sendResponseFilter(ZuulProperties properties) {
        return new SendResponseFilter(this.zuulProperties);
    }

   // 注入  `SendErrorFilter`
    @Bean
    public SendErrorFilter sendErrorFilter() {
        return new SendErrorFilter();
    }
 
  // 注入  `SendForwardFilter`
    @Bean
    public SendForwardFilter sendForwardFilter() {
        return new SendForwardFilter();
    }
    
    
     @Configuration
    protected static class ZuulFilterConfiguration {
        @Autowired
        private Map<String, ZuulFilter> filters;

        protected ZuulFilterConfiguration() {
        }
       //  ZuulFilterInitializer,在初始化类中将Filter向FilterRegistry注册
        @Bean
        public ZuulFilterInitializer zuulFilterInitializer(CounterFactory counterFactory, TracerFactory tracerFactory) {
            FilterLoader filterLoader = FilterLoader.getInstance();
            FilterRegistry filterRegistry = FilterRegistry.instance();
            return new ZuulFilterInitializer(this.filters, counterFactory, tracerFactory, filterLoader, filterRegistry);
        }
    }
}

父类ZuulServerAutoConfiguration中,在缺失ZuulServletZuulServletFilterbean的情况下,注入ZuulServletZuulServletFilter。同时也注入了其他的过滤器,
pre类型: ServletDetectionFilterDebugFilterServlet30WrapperFilter ;
post类型: SendResponseFilter ; // 响应处理Filter
route类型: SendForwardFilter ; // 重定向处理Filter
error类型 : SendErrorFilter ; // 错误处理Filter

初始化ZuulFilterInitializer类,通过FilterLoader 将所有的FilterFilterRegistry注册。我们看一下ZuulFilterInitializer类中部分代码

public class ZuulFilterInitializer {
   
    // 初始化完成后注册所有的Filter
    @PostConstruct
    public void contextInitialized() {
        log.info("Starting filter initializer");
        TracerFactory.initialize(this.tracerFactory);
        CounterFactory.initialize(this.counterFactory);
        Iterator var1 = this.filters.entrySet().iterator();

        while(var1.hasNext()) {
            Entry<String, ZuulFilter> entry = (Entry)var1.next();
            this.filterRegistry.put((String)entry.getKey(), (ZuulFilter)entry.getValue());
        }

    }
    // 销毁前移除所有注册所有的Filter
    @PreDestroy
    public void contextDestroyed() {
        log.info("Stopping filter initializer");
        Iterator var1 = this.filters.entrySet().iterator();

        while(var1.hasNext()) {
            Entry<String, ZuulFilter> entry = (Entry)var1.next();
            this.filterRegistry.remove((String)entry.getKey());
        }

        this.clearLoaderCache();
        TracerFactory.initialize((TracerFactory)null);
        CounterFactory.initialize((CounterFactory)null);
    }
}

Zuul 路由-源码分析

Filter 的执行

我们站在了源码的角度分析了 Zuul启动过程。接下来我们通过源码来分析,我们注入的FilterZuul 在一次路由的过程是怎样的执行的。在上部分内容,我们介绍了Zuul的核心类 ZuulServlet是所有请求的入口,我们来进入 ZuulServlet的源码:

public class ZuulServlet extends HttpServlet {

      public ZuulServlet() {
      }
      // 在初始化方法里初始化了ZuulRunner
      public void init(ServletConfig config) throws ServletException {
          super.init(config);
          String bufferReqsStr = config.getInitParameter("buffer-requests");
          boolean bufferReqs = bufferReqsStr != null && bufferReqsStr.equals("true");
          this.zuulRunner = new ZuulRunner(bufferReqs);
      }
      // 标准的Servlet的service方法
      public void service(ServletRequest servletRequest, ServletResponse servletResponse) throws ServletException, IOException {
          try {
              // 调用ZuulRunner的init方法
              this.init((HttpServletRequest)servletRequest, (HttpServletResponse)servletResponse);
              RequestContext context = RequestContext.getCurrentContext();
              context.setZuulEngineRan();

              try {
                 // 调用ZuulRunner的preRoute方法
                  this.preRoute();
              } catch (ZuulException var12) {
                  this.error(var12);
                 // 调用ZuulRunner的postRoute方法
                  this.postRoute();
                  return;
              }

              try {
                  this.route();
              } catch (ZuulException var13) {
                  this.error(var13);
                  this.postRoute();
                  return;
              }

              try {
                  this.postRoute();
              } catch (ZuulException var11) {
                  this.error(var11);
              }
          } catch (Throwable var14) {
              this.error(new ZuulException(var14, 500, "UNHANDLED_EXCEPTION_" + var14.getClass().getName()));
          } finally {
              RequestContext.getCurrentContext().unset();
          }
      }
  }

ZuulServlet 源码中, service方法调用 this.init(request,response) , 跟进 init(request,response)方法:

  void init(HttpServletRequest servletRequest, HttpServletResponse servletResponse) {
        this.zuulRunner.init(servletRequest, servletResponse);
    }

发现调用的是 ZuulServlet 持有的 ZuulRunnerinit方法,进入 ZuulRunnerinit方法:

 public void init(HttpServletRequest servletRequest, HttpServletResponse servletResponse) {
        RequestContext ctx = RequestContext.getCurrentContext();
        if (this.bufferRequests) {
            ctx.setRequest(new HttpServletRequestWrapper(servletRequest));
        } else {
            ctx.setRequest(servletRequest);
        }

        ctx.setResponse(new HttpServletResponseWrapper(servletResponse));
    }

ZuulRunnerinit方法中,我们发现只是对Request,Response进行了有条件的包装。
我们回退到 ZuulServlet 的方法中,在执行完 init 方法后,调用 this.preRoute() , 跟进preRoute()方法:

  void preRoute() throws ZuulException {
        this.zuulRunner.preRoute();
    }

调用了 ZuulRunnerpreRoute()方法, 进入ZuulRunner.preRoute() :

   public void preRoute() throws ZuulException {
        FilterProcessor.getInstance().preRoute();
    }

ZuulRunner.preRoute() 中获取了一个FilterProcessor的实例并且执行了其preRoute() 方法,进入FilterProcessor.preRoute()方法:

 public void preRoute() throws ZuulException {
        try {
            this.runFilters("pre");
        } catch (ZuulException var2) {
            throw var2;
        } catch (Throwable var3) {
            throw new ZuulException(var3, 500, "UNCAUGHT_EXCEPTION_IN_PRE_FILTER_" + var3.getClass().getName());
        }
    }

FilterProcessor.preRoute()方法中,执行this.runFilters()方法并且传入参数pre,进入this.runFilters()方法中:

    public Object runFilters(String sType) throws Throwable {
        if (RequestContext.getCurrentContext().debugRouting()) {
            Debug.addRoutingDebug("Invoking {" + sType + "} type filters");
        }

        boolean bResult = false;
        //通过FilterLoader的实例获取所有pre的Filter
        List<ZuulFilter> list = FilterLoader.getInstance().getFiltersByType(sType);
        if (list != null) {
            for(int i = 0; i < list.size(); ++i) {
                ZuulFilter zuulFilter = (ZuulFilter)list.get(i);
                // 执行Filter
                Object result = this.processZuulFilter(zuulFilter);
                if (result != null && result instanceof Boolean) {
                    bResult |= (Boolean)result;
                }
            }
        }

        return bResult;
    }

this.runFilters()方法中通过FilterLoader的实例获取所有pre类型的Filter,并调用this.processZuulFilte(Filter) 执行所有pre类型的Filter
上述分析过程中我们了解了pre类型的Filter在一次路由中优先执行,我们通过一个简单的图了加深一下这个过程
image.png
这就是一个完整的Filter的执行过程,routepost 类型的Filter执行过程也是一致的。

Filter 路由

上部分内容我们了解Filter的执行入口,这部分内容我们来了解Zuul是怎么选择路由和负载均衡的。
在第一部分Zuul的启动过程中,Zuul 注入了pre类型的Filter,有PreDecorationFilter通过名字我们可以猜测,这个PreDecorationFilter是起到装饰Filter的作用,我们进入PreDecorationFilter.run()源码:

 public Object run() {
      RequestContext ctx = RequestContext.getCurrentContext();
      String requestURI = this.urlPathHelper.getPathWithinApplication(ctx.getRequest());
      // 获取请求路径匹配的路由信息
      Route route = this.routeLocator.getMatchingRoute(requestURI);
      String location;
      if (route != null) {
          location = route.getLocation();
          if (location != null) {
              // 在RequextContext中放入请求路径,路由的标识
              ctx.put("requestURI", route.getPath());
              ctx.put("proxy", route.getId());
              if (!route.isCustomSensitiveHeaders()) {
                  this.proxyRequestHelper.addIgnoredHeaders((String[])this.properties.getSensitiveHeaders().toArray(new String[0]));
              } else {
                  this.proxyRequestHelper.addIgnoredHeaders((String[])route.getSensitiveHeaders().toArray(new String[0]));
              }
              //在RequextContext中放入该请求是否是可重试的标识
              if (route.getRetryable() != null) {
                  ctx.put("retryable", route.getRetryable());
              }

              if (!location.startsWith("http:") && !location.startsWith("https:")) {
                  if (location.startsWith("forward:")) {
                      ctx.set("forward.to", StringUtils.cleanPath(location.substring("forward:".length()) + route.getPath()));
                      ctx.setRouteHost((URL)null);
                      return null;
                  }
                 //设置路由配置的ServiceId
                  ctx.set("serviceId", location);
                  ctx.setRouteHost((URL)null);
                  ctx.addOriginResponseHeader("X-Zuul-ServiceId", location);
              } else {
                  ctx.setRouteHost(this.getUrl(location));
                  ctx.addOriginResponseHeader("X-Zuul-Service", location);
              }

        
          }
      } else {
          log.warn("No route found for uri: " + requestURI);
          location = this.getForwardUri(requestURI);
          ctx.set("forward.to", location);
      }

      return null;
  }

run方法中,获取和当前路径匹配的路由信息,将路由相关信息放入RequestContenxt中,路由信息是读取配置文件中的配置。并且放入了一个很重要的标识retryable这个就是决定我们这次请求是否可重试的开关,而这个读取配置文件中的zuul.retryable来决定的。
Zuul到底是怎么进行负载均衡的呢?
我们知道Zuul负载均衡底层是通过Ribbon来实现的,并且在启动Zuul的时候,我们注入了一个RibbonRoutingFilter的过滤器。这个类很重要,它主要是完成请求的路由转发。接下来我们看下他的 run方法

    public Object run() {
      RequestContext context = RequestContext.getCurrentContext();
      this.helper.addIgnoredHeaders(new String[0]);

      try {
          RibbonCommandContext commandContext = this.buildCommandContext(context);
          ClientHttpResponse response = this.forward(commandContext);
          this.setResponse(response);
          return response;
      } catch (ZuulException var4) {
          throw new ZuulRuntimeException(var4);
      } catch (Exception var5) {
          throw new ZuulRuntimeException(var5);
      }
  }

run中 可以看到,先构建了一个 RibbonCommandContext 然后通过forward()方法转发的,进入forward方法:


    protected ClientHttpResponse forward(RibbonCommandContext context) throws Exception {
        Map<String, Object> info = this.helper.debug(context.getMethod(), context.getUri(), context.getHeaders(), context.getParams(), context.getRequestEntity());
        RibbonCommand command = this.ribbonCommandFactory.create(context);

        try {
            ClientHttpResponse response = (ClientHttpResponse)command.execute();
            this.helper.appendDebug(info, response.getRawStatusCode(), response.getHeaders());
            return response;
        } catch (HystrixRuntimeException var5) {
            return this.handleException(info, var5);
        }
    }

forward中 可以看到,通过 RibbonCommandFactory 创建一个RibbonCommand,然后执行RibbonCommandexecute方法,
这个 RibbonCommandFactory 是什么时候注入的呢?
Zuul启动的时候,在 ZuulProxyAutoConfiguration 配置类引入了HttpClientRibbonConfiguration 配置类

@Configuration
@Import({RestClientRibbonConfiguration.class, OkHttpRibbonConfiguration.class, HttpClientRibbonConfiguration.class, HttpClientConfiguration.class})
@ConditionalOnBean({Marker.class})
public class ZuulProxyAutoConfiguration extends ZuulServerAutoConfiguration {
 }

HttpClientRibbonConfiguration 中注入了 RibbonCommandFactory ,源码如下:

    @Configuration
    @RibbonCommandFactoryConfiguration.ConditionalOnRibbonHttpClient
    protected static class HttpClientRibbonConfiguration {
        @Autowired(
            required = false
        )
        private Set<FallbackProvider> zuulFallbackProviders = Collections.emptySet();

        protected HttpClientRibbonConfiguration() {
        }

        @Bean
        @ConditionalOnMissingBean
        public RibbonCommandFactory<?> ribbonCommandFactory(SpringClientFactory clientFactory, ZuulProperties zuulProperties) {
            return new HttpClientRibbonCommandFactory(clientFactory, zuulProperties, this.zuulFallbackProviders);
        }
    }

这个时候我们就明白了 RibbonCommandFactory 是何时注入的了,
然后我们在看一下 RibbonCommandFactory.create() 创建RibbonCommand的方法:
image.png
Zuul 默认使用HttpClientRibbonCommandFactory,进入到create()方法:

   public HttpClientRibbonCommand create(final RibbonCommandContext context) {
       /**
          *服务降级
          */ 
       FallbackProvider zuulFallbackProvider = this.getFallbackProvider(context.getServiceId());
        String serviceId = context.getServiceId();
         /**
            *负载均衡类,处理请求转发类
        */
        RibbonLoadBalancingHttpClient client = (RibbonLoadBalancingHttpClient)this.clientFactory.getClient(serviceId, RibbonLoadBalancingHttpClient.class);
        client.setLoadBalancer(this.clientFactory.getLoadBalancer(serviceId));
        /**
          *将降级、负载,请求转发类、以及其他一些内容
          *包装成HttpClientRibbonCommand(这个类继承了HystrixCommand)
          */ 
       return new HttpClientRibbonCommand(serviceId, client, context, this.zuulProperties, zuulFallbackProvider, this.clientFactory.getClientConfig(serviceId));
    }

create 方法中,分别创建了FallbackProviderRibbonLoadBalancingHttpClient ,从命名上我们就可以知道, FallbackProvider和熔断相关,RibbonLoadBalancingHttpClient和负载均衡相关。然后将这些作为参数创建了HttpClientRibbonCommand 。我们看一下HttpClientRibbonCommand的继承关系。
企业微信20200409031803.png
从类继承关系可以看出HttpClientRibbonCommand继承了AbstractRibbonCommand,并且AbstractRibbonCommand继承了HystrixCommand
这样我们就了解到HystrixCommand是如何集成进来的了。
HystrixCommand中定义了run抽象接口,并且在AbstractRibbonCommand中实现了该接口。
我们回退到RibbonRoutingFilter

   protected ClientHttpResponse forward(RibbonCommandContext context) throws Exception {
        Map<String, Object> info = this.helper.debug(context.getMethod(), context.getUri(), context.getHeaders(), context.getParams(), context.getRequestEntity());
        RibbonCommand command = this.ribbonCommandFactory.create(context);

        try {
            ClientHttpResponse response = (ClientHttpResponse)command.execute();
            this.helper.appendDebug(info, response.getRawStatusCode(), response.getHeaders());
            return response;
        } catch (HystrixRuntimeException var5) {
            return this.handleException(info, var5);
        }
    }

当完成了RibbonCommand 创建工作后,执行的command.execute() 方法,通过刚刚的分析我们知道了command其实指的是HttpClientRibbonCommand,同时我们也知道HttpClientRibbonCommand继承了HystrixCommand,所以当执行command.execute()时,其实执行的是HttpClientRibbonCommandrun方法。查看源码我们并没有发现run方法,但是其父类AbstractRibbonCommand实现了run方法。所以其实执行的是AbstractRibbonCommandrun方法,进入AbstractRibbonCommandrun方法:

   protected ClientHttpResponse run() throws Exception {
        RequestContext context = RequestContext.getCurrentContext();
        RQ request = this.createRequest();
        boolean retryableClient = this.client instanceof AbstractLoadBalancingClient && ((AbstractLoadBalancingClient)this.client).isClientRetryable((ContextAwareRequest)request);
        HttpResponse response;
        if (retryableClient) {
            response = (HttpResponse)this.client.execute(request, this.config);
        } else {
            response = (HttpResponse)this.client.executeWithLoadBalancer(request, this.config);
        }

        context.set("ribbonResponse", response);
        if (this.isResponseTimedOut() && response != null) {
            response.close();
        }

        return new RibbonHttpResponse(response);
    }

run 方法中,先判断是否是重试的client,通过分析,第一次执行的时候,clientRibbonLoadBalancingHttpClient 从而会调用executeWithLoadBalancer() 方法,但是RibbonLoadBalancingHttpClient并没有executeWithLoadBalancer() 方法,查看而类继承关系图,
企业微信20200409034247.png

其父类AbstractLoadBalancerAwareClient实现了executeWithLoadBalancer() ,进入AbstractLoadBalancerAwareClient.executeWithLoadBalancer()方法:

    public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {

        LoadBalancerCommand command = this.buildLoadBalancerCommand(request, requestConfig);

        try {
            return (IResponse)command.submit(new ServerOperation<T>() {
                public Observable<T> call(Server server) {
                    URI finalUri = AbstractLoadBalancerAwareClient.this.reconstructURIWithServer(server, request.getUri());
                    ClientRequest requestForServer = request.replaceUri(finalUri);

                    try {
                        return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
                    } catch (Exception var5) {
                        return Observable.error(var5);
                    }
                }
            }).toBlocking().single();
        } catch (Exception var6) {
            Throwable t = var6.getCause();
            if (t instanceof ClientException) {
                throw (ClientException)t;
            } else {
                throw new ClientException(var6);
            }
        }
    }

executeWithLoadBalancer 方法中,首先进入buildLoadBalancerCommand() 方法:

   protected LoadBalancerCommand<T> buildLoadBalancerCommand(S request, IClientConfig config) {
    /**
     * 创建一个RetryHandler,这个很重要它是用来
     * 决定利用RxJava的Observable是否进行重试的标准。
     */
        RequestSpecificRetryHandler handler = this.getRequestSpecificRetryHandler(request, config);
        Builder<T> builder = LoadBalancerCommand.builder().withLoadBalancerContext(this).withRetryHandler(handler).withLoadBalancerURI(request.getUri());
        this.customizeLoadBalancerCommandBuilder(request, config, builder);
        return builder.build();
    }

buildLoadBalancerCommand() 中通过了 getRequestSpecificRetryHandler()获取 RequestSpecificRetryHandler 的处理类,这个是个抽象方法,在子类AbstractLoadBalancingClient中实现

  public RequestSpecificRetryHandler getRequestSpecificRetryHandler(final S request, final IClientConfig requestConfig) {
        if (this.okToRetryOnAllOperations) {
            return new RequestSpecificRetryHandler(true, true, this.getRetryHandler(), requestConfig);
        } else {
            return !request.getContext().getMethod().equals("GET") ? new RequestSpecificRetryHandler(true, false, this.getRetryHandler(), requestConfig) : new RequestSpecificRetryHandler(true, true, this.getRetryHandler(), requestConfig);
        }
    }

在方法中我们可以看到重试的默认机制。若不配置ribbon.OkToRetryOnAllOperations=true, 默认只是连接失败GET请求失败才会发生重试。
回到executeWithLoadBalancer()方法中:

public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
        LoadBalancerCommand command = this.buildLoadBalancerCommand(request, requestConfig);

        try {
            return (IResponse)command.submit(new ServerOperation<T>() {
                public Observable<T> call(Server server) {
                    URI finalUri = AbstractLoadBalancerAwareClient.this.reconstructURIWithServer(server, request.getUri());
                    ClientRequest requestForServer = request.replaceUri(finalUri);

                    try {
                        return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
                    } catch (Exception var5) {
                        return Observable.error(var5);
                    }
                }
            }).toBlocking().single();
        } catch (Exception var6) {
            Throwable t = var6.getCause();
            if (t instanceof ClientException) {
                throw (ClientException)t;
            } else {
                throw new ClientException(var6);
            }
        }
    }

调用command.submit() ,创建了一个Observable (RxJava)并且最终会调用AbstractLoadBalancerAwareClient.execute ()方法

   public RibbonApacheHttpResponse execute(RibbonApacheHttpRequest request, final IClientConfig configOverride) throws Exception {
        IClientConfig config = configOverride != null ? configOverride : this.config;
        RibbonProperties ribbon = RibbonProperties.from(config);
        RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(ribbon.connectTimeout(this.connectTimeout)).setSocketTimeout(ribbon.readTimeout(this.readTimeout)).setRedirectsEnabled(ribbon.isFollowRedirects(this.followRedirects)).setContentCompressionEnabled(ribbon.isGZipPayload(this.gzipPayload)).build();
        request = this.getSecureRequest(request, configOverride);
        HttpUriRequest httpUriRequest = request.toRequest(requestConfig);
        HttpResponse httpResponse = ((CloseableHttpClient)this.delegate).execute(httpUriRequest);
        return new RibbonApacheHttpResponse(httpResponse, httpUriRequest.getURI());
    }

execute()方法中,我们看到创建连接并且发送了http请求并将结果返回。
回退到submit()方法中,创建了Observable 会监听execute()的执行状态从而决定是否重试请求:

public Observable<T> submit(final ServerOperation<T> operation) {
        final LoadBalancerCommand<T>.ExecutionInfoContext context = new LoadBalancerCommand.ExecutionInfoContext();
        if (this.listenerInvoker != null) {
            try {
                this.listenerInvoker.onExecutionStart();
            } catch (AbortExecutionException var6) {
                return Observable.error(var6);
            }
        }
     /**
      * 相同server重试次数,去除首次
    */
        final int maxRetrysSame = this.retryHandler.getMaxRetriesOnSameServer();
        /**
          * 集群内其他server重试次数
        **/
        final int maxRetrysNext = this.retryHandler.getMaxRetriesOnNextServer();
        /**
        * 创建一个Observable(RxJava)
        **/
        Observable<T> o = (this.server == null ? this.selectServer() : Observable.just(this.server)).concatMap(new Func1<Server, Observable<T>>() {
            public Observable<T> call(Server server) {
                context.setServer(server);
                final ServerStats stats = LoadBalancerCommand.this.loadBalancerContext.getServerStats(server);
                Observable<T> o = Observable.just(server).concatMap(new Func1<Server, Observable<T>>() {
                    public Observable<T> call(final Server server) {
                        context.incAttemptCount();
                        LoadBalancerCommand.this.loadBalancerContext.noteOpenConnection(stats);
                        if (LoadBalancerCommand.this.listenerInvoker != null) {
                            try {
                                LoadBalancerCommand.this.listenerInvoker.onStartWithServer(context.toExecutionInfo());
                            } catch (AbortExecutionException var3) {
                                return Observable.error(var3);
                            }
                        }

                        final Stopwatch tracer = LoadBalancerCommand.this.loadBalancerContext.getExecuteTracer().start();
                        return operation.call(server).doOnEach(new Observer<T>() {
                            private T entity;

                            public void onCompleted() {
                                this.recordStats(tracer, stats, this.entity, (Throwable)null);
                            }

                            public void onError(Throwable e) {
                                this.recordStats(tracer, stats, (Object)null, e);
                                LoadBalancerCommand.logger.debug("Got error {} when executed on server {}", e, server);
                                if (LoadBalancerCommand.this.listenerInvoker != null) {
                                    LoadBalancerCommand.this.listenerInvoker.onExceptionWithServer(e, context.toExecutionInfo());
                                }

                            }

                            public void onNext(T entity) {
                                this.entity = entity;
                                if (LoadBalancerCommand.this.listenerInvoker != null) {
                                    LoadBalancerCommand.this.listenerInvoker.onExecutionSuccess(entity, context.toExecutionInfo());
                                }

                            }

                            private void recordStats(Stopwatch tracerx, ServerStats statsx, Object entity, Throwable exception) {
                                tracerx.stop();
                                LoadBalancerCommand.this.loadBalancerContext.noteRequestCompletion(statsx, entity, exception, tracerx.getDuration(TimeUnit.MILLISECONDS), LoadBalancerCommand.this.retryHandler);
                            }
                        });
                    }
                });
                if (maxRetrysSame > 0) {
                    o = o.retry(LoadBalancerCommand.this.retryPolicy(maxRetrysSame, true));
                }

                return o;
            }
        });
        if (maxRetrysNext > 0 && this.server == null) {
            o = o.retry(this.retryPolicy(maxRetrysNext, false));
        }

        return o.onErrorResumeNext(new Func1<Throwable, Observable<T>>() {
            public Observable<T> call(Throwable e) {
                if (context.getAttemptCount() > 0) {
                    if (maxRetrysNext > 0 && context.getServerAttemptCount() == maxRetrysNext + 1) {
                        e = new ClientException(ErrorType.NUMBEROF_RETRIES_NEXTSERVER_EXCEEDED, "Number of retries on next server exceeded max " + maxRetrysNext + " retries, while making a call for: " + context.getServer(), (Throwable)e);
                    } else if (maxRetrysSame > 0 && context.getAttemptCount() == maxRetrysSame + 1) {
                        e = new ClientException(ErrorType.NUMBEROF_RETRIES_EXEEDED, "Number of retries exceeded max " + maxRetrysSame + " retries, while making a call for: " + context.getServer(), (Throwable)e);
                    }
                }

                if (LoadBalancerCommand.this.listenerInvoker != null) {
                    LoadBalancerCommand.this.listenerInvoker.onExecutionFailed((Throwable)e, context.toFinalExecutionInfo());
                }

                return Observable.error((Throwable)e);
            }
        });
    }

讲到这里,就是一次完整的路由过程了。我们大致回顾一下这个路由过程。
A.Zuul的转发是通过RibbonRoutingFilter这个Filter进行操作的。
B. 在转发之前,Zuul包装请求为RibbonCommand,并且RibbonCommand继承了HystrixCommand,并且持有RibbonLoadBalancingHttpClientFallbackProvider,正应为这样才使得Zuul具有了服务降级(Fallback),和负载均衡的功能,同时HystrixCommand是具备超时时间的(默认是1s)。而且Zuul默认采用的隔离级别是信号量模式。
C.在RibbonCommand的内部Zuul再次将请求包装成一个Observable,(有关RxJava的知识请参照其官方文档)。并且为Observable设置了重试次数,默认只对GET请求失败连接失败重试。

目录
相关文章
|
4月前
|
设计模式 Java 开发者
如何快速上手【Spring AOP】?从动态代理到源码剖析(下篇)
Spring AOP的实现本质上依赖于代理模式这一经典设计模式。代理模式通过引入代理对象作为目标对象的中间层,实现了对目标对象访问的控制与增强,其核心价值在于解耦核心业务逻辑与横切关注点。在框架设计中,这种模式广泛用于实现功能扩展(如远程调用、延迟加载)、行为拦截(如权限校验、异常处理)等场景,为系统提供了更高的灵活性和可维护性。
|
8月前
|
前端开发 Java 物联网
智慧班牌源码,采用Java + Spring Boot后端框架,搭配Vue2前端技术,支持SaaS云部署
智慧班牌系统是一款基于信息化与物联网技术的校园管理工具,集成电子屏显示、人脸识别及数据交互功能,实现班级信息展示、智能考勤与家校互通。系统采用Java + Spring Boot后端框架,搭配Vue2前端技术,支持SaaS云部署与私有化定制。核心功能涵盖信息发布、考勤管理、教务处理及数据分析,助力校园文化建设与教学优化。其综合性和可扩展性有效打破数据孤岛,提升交互体验并降低管理成本,适用于日常教学、考试管理和应急场景,为智慧校园建设提供全面解决方案。
532 70
|
9月前
|
算法 测试技术 C语言
深入理解HTTP/2:nghttp2库源码解析及客户端实现示例
通过解析nghttp2库的源码和实现一个简单的HTTP/2客户端示例,本文详细介绍了HTTP/2的关键特性和nghttp2的核心实现。了解这些内容可以帮助开发者更好地理解HTTP/2协议,提高Web应用的性能和用户体验。对于实际开发中的应用,可以根据需要进一步优化和扩展代码,以满足具体需求。
909 29
|
9月前
|
前端开发 数据安全/隐私保护 CDN
二次元聚合短视频解析去水印系统源码
二次元聚合短视频解析去水印系统源码
386 4
|
9月前
|
JavaScript 算法 前端开发
JS数组操作方法全景图,全网最全构建完整知识网络!js数组操作方法全集(实现筛选转换、随机排序洗牌算法、复杂数据处理统计等情景详解,附大量源码和易错点解析)
这些方法提供了对数组的全面操作,包括搜索、遍历、转换和聚合等。通过分为原地操作方法、非原地操作方法和其他方法便于您理解和记忆,并熟悉他们各自的使用方法与使用范围。详细的案例与进阶使用,方便您理解数组操作的底层原理。链式调用的几个案例,让您玩转数组操作。 只有锻炼思维才能可持续地解决问题,只有思维才是真正值得学习和分享的核心要素。如果这篇博客能给您带来一点帮助,麻烦您点个赞支持一下,还可以收藏起来以备不时之需,有疑问和错误欢迎在评论区指出~
|
9月前
|
负载均衡 JavaScript 前端开发
分片上传技术全解析:原理、优势与应用(含简单实现源码)
分片上传通过将大文件分割成多个小的片段或块,然后并行或顺序地上传这些片段,从而提高上传效率和可靠性,特别适用于大文件的上传场景,尤其是在网络环境不佳时,分片上传能有效提高上传体验。 博客不应该只有代码和解决方案,重点应该在于给出解决方案的同时分享思维模式,只有思维才能可持续地解决问题,只有思维才是真正值得学习和分享的核心要素。如果这篇博客能给您带来一点帮助,麻烦您点个赞支持一下,还可以收藏起来以备不时之需,有疑问和错误欢迎在评论区指出~
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
367 2
|
9月前
|
移动开发 前端开发 JavaScript
从入门到精通:H5游戏源码开发技术全解析与未来趋势洞察
H5游戏凭借其跨平台、易传播和开发成本低的优势,近年来发展迅猛。接下来,让我们深入了解 H5 游戏源码开发的技术教程以及未来的发展趋势。
|
9月前
|
存储 前端开发 JavaScript
在线教育网课系统源码开发指南:功能设计与技术实现深度解析
在线教育网课系统是近年来发展迅猛的教育形式的核心载体,具备用户管理、课程管理、教学互动、学习评估等功能。本文从功能和技术两方面解析其源码开发,涵盖前端(HTML5、CSS3、JavaScript等)、后端(Java、Python等)、流媒体及云计算技术,并强调安全性、稳定性和用户体验的重要性。

推荐镜像

更多
  • DNS