二十.SpringCloud源码剖析-Zuul使用Ribbon负载均衡-RibbonRoutingFilter

简介: 经过前面几章的学习,我们对Zuul的的详细执行流程,以及内置的Filter都有了一些认识,本篇文章是针对RibbonRoutingFilter做一个详细的分析,跟一下它是如何使用Ribbon对下游的微服务进行负载均衡的。注意:这篇文章是在 《zuul的执行流程》基础上进行延伸的,另外Ribbon的原理见:《Ribbon负载均衡原理》

前言

经过前面几章的学习,我们对Zuul的的详细执行流程,以及内置的Filter都有了一些认识,本篇文章是针对RibbonRoutingFilter做一个详细的分析,跟一下它是如何使用Ribbon对下游的微服务进行负载均衡的。注意:这篇文章是在 《zuul的执行流程》基础上进行延伸的,另外Ribbon的原理见:《Ribbon负载均衡原理

<hr style=" border:solid; width:100px; height:1px;" color=#000000 size=1">

回顾一下zuul的执行流程,Zuul的执行流程是这样的

  • 首先请求进来会先到达ZuulControllerZuulController把请求交给ZuulServlet去处理
  • ZuulServelt会调用 ZuulRunner 依次执行: init初始化,pre前置filter,route路由filter,post后置filter, 出现异常会执行 error 异常filter
  • ZuulRunner通过 FilterProcessor 去执行各种Filter,FilterProcessor通过 FilterLoader 加载 各种filters

今天的主角儿 RibbonRoutingFilter 是“route”类型的Filter ,它的作用是使用Ribbon对后端的微服务做负载均衡,实现服务的调用,

RibbonRoutingFilter#run

我们直接看它的源码,见RibbonRoutingFilter#shouldFilter

    @Override
    public boolean shouldFilter() {
   
   
        RequestContext ctx = RequestContext.getCurrentContext();
        //上下文中必须有 serviceId 
        return (ctx.getRouteHost() == null && ctx.get(SERVICE_ID_KEY) != null
                && ctx.sendZuulResponse());
    }

这里做了简单判断,上下文中必须有 serviceId ,默认的true ,然后会执行run方法 RibbonRoutingFilter#run

public Object run() {
   
   
        RequestContext context = RequestContext.getCurrentContext();
        this.helper.addIgnoredHeaders();
        try {
   
   
            //构建一个Ribbon的命令上下文
            RibbonCommandContext commandContext = buildCommandContext(context);
            //执行Ribbon,发送请求
            ClientHttpResponse response = forward(commandContext);
            //设置结果
            setResponse(response);
            return response;
        }
        catch (ZuulException ex) {
   
   
            throw new ZuulRuntimeException(ex);
        }
        catch (Exception ex) {
   
   
            throw new ZuulRuntimeException(ex);
        }
    }
    //把响应结果设置到RequestContext上下文对象中
    protected void setResponse(ClientHttpResponse resp)
            throws ClientException, IOException {
   
   
            //把响应结果设置到RequestContext上下文对象中
        RequestContext.getCurrentContext().set("zuulResponse", resp);
        this.helper.setResponse(resp.getRawStatusCode(),
                resp.getBody() == null ? null : resp.getBody(), resp.getHeaders());
    }

RibbonCommandContext

在run方法中构建了一个 RibbonCommandContext Ribbon的上下文对象,然后调用 forward 方法转发请求 ,通过 setResponse方法设置结果

先看一下buildCommandContext方法是如何构建RibbonCommandContext

protected RibbonCommandContext buildCommandContext(RequestContext context) {
   
   
        HttpServletRequest request = context.getRequest();
        //获取请求头
        MultiValueMap<String, String> headers = this.helper
                .buildZuulRequestHeaders(request);
        //获取参数
        MultiValueMap<String, String> params = this.helper
                .buildZuulRequestQueryParams(request);
        //获取请求的方式        
        String verb = getVerb(request);
        //获取请求体中的内容
        InputStream requestEntity = getRequestBody(request);
        if (request.getContentLength() < 0 && !verb.equalsIgnoreCase("GET")) {
   
   
            context.setChunkedRequestBody();
        }
        //调用的服务ID
        String serviceId = (String) context.get(SERVICE_ID_KEY);
        //是否重试
        Boolean retryable = (Boolean) context.get(RETRYABLE_KEY);
        Object loadBalancerKey = context.get(LOAD_BALANCER_KEY);
        //请求的资源路径
        String uri = this.helper.buildZuulRequestURI(request);

        // remove double slashes
        uri = uri.replace("//", "/");
        //getContentLength 内容长度
        long contentLength = useServlet31 ? request.getContentLengthLong(): request.getContentLength();
        //创建一个 RibbonCommandContext
        return new RibbonCommandContext(serviceId, verb, uri, retryable, headers, params,
                requestEntity, this.requestCustomizers, contentLength, loadBalancerKey);
    }

这里从请求中获取了请求头,请求参数,请求体内容,以及服务ID,请求的资源URL等信息封装分 RibbonCommandContext ,那RibbonCommandContext 其实就是对下游微服务的请求的上下文数据的封装,下面是 RibbonCommandContext 的源码

public class RibbonCommandContext {
   
   
    private final String serviceId;    //服务id
    private final String method;    //请求方式    
    private final String uri;        //请求资源URI
    private final Boolean retryable;    //重试
    private final MultiValueMap<String, String> headers; //请求头
    private final MultiValueMap<String, String> params;    //请求产妇
    private final List<RibbonRequestCustomizer> requestCustomizers;
    private InputStream requestEntity;    //请求体内容
    private Long contentLength;    //内容长度    
    private Object loadBalancerKey;    //负载均衡器

HttpClientRibbonCommand的创建

接着我们看一下 forward 方法是如何调用下游微服务的 RibbonRoutingFilter#forward

protected ClientHttpResponse forward(RibbonCommandContext context) throws Exception {
   
   
        Map<String, Object> info = this.helper.debug(context.getMethod(),
                context.getUri(), context.getHeaders(), context.getParams(),
                context.getRequestEntity());
        //使用 ribbonCommandFactory 工厂根据context上下文 创建 RibbonCommand 
        RibbonCommand command = this.ribbonCommandFactory.create(context);
        try {
   
   
            //执行请求
            ClientHttpResponse response = command.execute();
            this.helper.appendDebug(info, response.getRawStatusCode(), response.getHeaders());
            return response;
        }
        catch (HystrixRuntimeException ex) {
   
   
            return handleException(info, ex);
        }

    }

这里 使用 ribbonCommandFactory 工厂根据context上下文 创建 RibbonCommand ,然后执行 RibbonCommand.execute ,我们看一下 ribbonCommandFactory.create


    @Override
    public HttpClientRibbonCommand create(final RibbonCommandContext context) {
   
   
        //获取服务对应的降级
        FallbackProvider zuulFallbackProvider = getFallbackProvider(context.getServiceId());
        final String serviceId = context.getServiceId();
        //获取Ribbon的负载均衡Http客户端
        final RibbonLoadBalancingHttpClient client = this.clientFactory.getClient(
                serviceId, RibbonLoadBalancingHttpClient.class);

        //获取 ILoadBalancer 负载均衡器,设置给 RibbonLoadBalancingHttpClient 
        client.setLoadBalancer(this.clientFactory.getLoadBalancer(serviceId));
        //创建一个 HttpClientRibbonCommand 
        return new HttpClientRibbonCommand(serviceId, client, context, zuulProperties, zuulFallbackProvider,
                clientFactory.getClientConfig(serviceId));
    }

HttpClientRibbonCommandFactory#create 方法主要是 把服务的降级FallbackProvider,以及Ribbon的负载均衡Http客户端,以及ILoadBalancer 负载均衡器设置给 HttpClientRibbonCommand 对象并返回

HttpClientRibbonCommand 是Ribbon的Http客户端,是用来执行相关请求的,看一下它的继承体系它是HystrixCommand的子类
在这里插入图片描述

HttpClientRibbonCommand 的执行

代码回到 RibbonRoutingFilter#forward 方法,接下来分析 RibbonCommand.execute 具体执行流程

ClientHttpResponse response = command.execute();

进入这个方法,此时代码来到 com.netflix.hystrix.HystrixCommand#execute

   public R execute() {
   
   
        try {
   
   
            return queue().get();
        } catch (Exception e) {
   
   
            throw Exceptions.sneakyThrow(decomposeException(e));
        }
    }

进入qeueu方法

public Future<R> queue() {
   
   
        /*
         * The Future returned by Observable.toBlocking().toFuture() does not implement the
         * interruption of the execution thread when the "mayInterrupt" flag of Future.cancel(boolean) is set to true;
         * thus, to comply with the contract of Future, we must wrap around it.
         */
        // 这里采用了观察者设计模式,使用了异步阻塞方式去执行请求
        final Future<R> delegate = toObservable().toBlocking().toFuture();

        final Future<R> f = new Future<R>() {
   
   

            @Override
            public boolean cancel(boolean mayInterruptIfRunning) {
   
   
                  ...省略...
                return res;
            }

            @Override
            public boolean isCancelled() {
   
   
                return delegate.isCancelled();
            }

            @Override
            public boolean isDone() {
   
   
                return delegate.isDone();
            }

            @Override
            public R get() throws InterruptedException, ExecutionException {
   
   
                return delegate.get();
            }

            @Override
            public R get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
   
   
                return delegate.get(timeout, unit);
            }

        };

        /* special handling of error states that throw immediately */
        if (f.isDone()) {
   
   
            try {
   
   
                //获取结果
                f.get();
                return f;
            } catch (Exception e) {
   
   
                Throwable t = decomposeException(e);
                if (t instanceof HystrixBadRequestException) {
   
   
                    return f;
                } else if (t instanceof HystrixRuntimeException) {
   
   
                    HystrixRuntimeException hre = (HystrixRuntimeException) t;
                    switch (hre.getFailureType()) {
   
   
                    case COMMAND_EXCEPTION:
                    case TIMEOUT:
                        // we don't throw these types from queue() only from queue().get() as they are execution errors
                        return f;
                    default:
                        // these are errors we throw from queue() as they as rejection type errors
                        throw hre;
                    }
                } else {
   
   
                    throw Exceptions.sneakyThrow(t);
                }
            }
        }

        return f;
    }

这里采用了观察者设计模式,使用了异步阻塞方式去执行请求,final Future<R> delegate = toObservable().toBlocking().toFuture(); 这个可以好好去研究一下,最代码会来到 AbstractRibbonCommand#run

@Override
    protected ClientHttpResponse run() throws Exception {
   
   
        final RequestContext context = RequestContext.getCurrentContext();
        //创建请求,实现类是 RibbonApacheHttpRequest
        RQ request = createRequest();
        RS response;
        //可重试的客户端 
        boolean retryableClient = this.client instanceof AbstractLoadBalancingClient
                && ((AbstractLoadBalancingClient)this.client).isClientRetryable((ContextAwareRequest)request);

        if (retryableClient) {
   
   
            response = this.client.execute(request, config);
        } else {
   
   
            //默认走这里使用负载均衡器执行
            response = this.client.executeWithLoadBalancer(request, config);
        }
        //把结果设置到RequestContext 上下文
        context.set("ribbonResponse", response);

        // Explicitly close the HttpResponse if the Hystrix command timed out to
        // release the underlying HTTP connection held by the response.
        //响应超时,关闭response
        if (this.isResponseTimedOut()) {
   
   
            if (response != null) {
   
   
                response.close();
            }
        }
        //返回结果
        return new RibbonHttpResponse(response);
    }

RibbonLoadBalancingHttpClient Http客户端

关键代码在 this.client.execute(request, config); 这里的 client是一个 RibbonLoadBalancingHttpClient ,继续跟踪 execute的代码来到 `AbstractLoadBalancerAwareClient#executeWithLoadBalancer

 public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
   
   
        LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);

        try {
   
   
            //1.调用 LoadBalancerCommand的submit执行请求
            return command.submit(
                new ServerOperation<T>() {
   
   
                    @Override
                    public Observable<T> call(Server server) {
   
   
                        //使用服务器重构URI
                        URI finalUri = reconstructURIWithServer(server, request.getUri());
                        S requestForServer = (S) request.replaceUri(finalUri);
                        try {
   
   
                        //2.这里调用了 RibbonLoadBalancingHttpClient.execute方法执行请求
                            return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
                        } 
                        catch (Exception e) {
   
   
                            return Observable.error(e);
                        }
                    }
                })
                .toBlocking()
                .single();
        } catch (Exception e) {
   
   
            Throwable t = e.getCause();
            if (t instanceof ClientException) {
   
   
                throw (ClientException) t;
            } else {
   
   
                throw new ClientException(e);
            }
        }

    }

方法中有两个重要的代码

  • command.submit : 调用 LoadBalancerCommand的submit,它创建一个Observable 观察者,一旦订阅,该Observable将与负载均衡器选择的服务器异步执行网络调用。
  • AbstractLoadBalancerAwareClient.this.execute:执行具体的服务

LoadBalancerCommand.submit 提交请求

先看LoadBalancerCommand.submit

 public Observable<T> submit(final ServerOperation<T> operation) {
   
   
        final ExecutionInfoContext context = new ExecutionInfoContext();

        if (listenerInvoker != null) {
   
   
            try {
   
   
                listenerInvoker.onExecutionStart();
            } catch (AbortExecutionException e) {
   
   
                return Observable.error(e);
            }
        }

        final int maxRetrysSame = retryHandler.getMaxRetriesOnSameServer();
        final int maxRetrysNext = retryHandler.getMaxRetriesOnNextServer();

        // Use the load balancer
        //这里在做负载均衡 selectServer
        Observable<T> o = 
                (server == null ? selectServer() : Observable.just(server))
                .concatMap(new Func1<Server, Observable<T>>() {
   
   
                    @Override
                    // Called for each server being selected
                    //为每个选定的服务器调用
                    public Observable<T> call(Server server) {
   
   
   ...省略...

很关键的一行代码 server == null ? selectServer() : Observable.just(server))它在使用负载均衡选择服务,跟一下 selectServer方法

    private Observable<Server> selectServer() {
   
   
        return Observable.create(new OnSubscribe<Server>() {
   
   
            @Override
            public void call(Subscriber<? super Server> next) {
   
   
                try {
   
   
                //负载均衡
                    Server server = loadBalancerContext.getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey);   
                    next.onNext(server);
                    next.onCompleted();
                } catch (Exception e) {
   
   
                    next.onError(e);
                }
            }
        });
    }

继续跟 loadBalancerContext.getServerFromLoadBalancer 方法

 public Server getServerFromLoadBalancer(@Nullable URI original, @Nullable Object loadBalancerKey) throws ClientException {
   
   
        String host = null;
        int port = -1;
        if (original != null) {
   
   
            host = original.getHost();
        }
        if (original != null) {
   
   
            Pair<String, Integer> schemeAndPort = deriveSchemeAndPortFromPartialUri(original);        
            port = schemeAndPort.second();
        }

        // Various Supported Cases
        // The loadbalancer to use and the instances it has is based on how it was registered
        // In each of these cases, the client might come in using Full Url or Partial URL
        ILoadBalancer lb = getLoadBalancer();
        if (host == null) {
   
   
            // Partial URI or no URI Case
            // well we have to just get the right instances from lb - or we fall back
            if (lb != null){
   
   
                //通过 ILoadBalancer的chooseServer方法选择服务
                Server svc = lb.chooseServer(loadBalancerKey);
                if (svc == null){
   
   
                    throw new ClientException(ClientException.ErrorType.GENERAL,
                            "Load balancer does not have available server for client: "
                                    + clientName);
                }
                host = svc.getHost();
                if (host == null){
   
   
                    throw new ClientException(ClientException.ErrorType.GENERAL,
                            "Invalid Server for :" + svc);
                }
                logger.debug("{} using LB returned Server: {} for request {}", new Object[]{
   
   clientName, svc, original});
                return svc;
                ...省略...

这里调用了 ILoadBalancer 的 chooseServer方法,继续跟下去会调用 IRule 负载均衡策略的的choose方法选择服务。

RibbonLoadBalancingHttpClient Http客户端执行请求

选择好服务之后原路返回,请求回到 AbstractLoadBalancerAwareClient#executeWithLoadBalancer(S, com.netflix.client.config.IClientConfig),我们来跟第二个比较重要的代码RibbonLoadBalancingHttpClient#execute , 该方法在执行具体的服务了

    @Override
    public RibbonApacheHttpResponse execute(RibbonApacheHttpRequest request,
                                            final IClientConfig configOverride) throws Exception {
   
   
        IClientConfig config = configOverride != null ? configOverride : this.config;
        RibbonProperties ribbon = RibbonProperties.from(config);
        //封装请求配置,超时时间
        RequestConfig requestConfig = RequestConfig.custom()
                .setConnectTimeout(ribbon.connectTimeout(this.connectTimeout))
                .setSocketTimeout(ribbon.readTimeout(this.readTimeout))
                .setRedirectsEnabled(ribbon.isFollowRedirects(this.followRedirects))
                .build();
        //创建请求对象
        request = getSecureRequest(request, configOverride);
        final HttpUriRequest httpUriRequest = request.toRequest(requestConfig);
        //委派给 InternalHttpClient 客户去执行
        final HttpResponse httpResponse = this.delegate.execute(httpUriRequest);
        //封装结果返回
        return new RibbonApacheHttpResponse(httpResponse, httpUriRequest.getURI());
    }

这里封装了请求配置,创建请求对象,然后交给apacheInternalHttpClient 去执行请求,最后把结果封装成RibbonApacheHttpResponse返回。

最后响应结果会在RibbonRoutingFilter中被设置到RequestContext 上下文对象中 ,会通过SendResponseFilter去处理把上下文对象中的响应结果写给客户端

到这里整理执行流程结束,我们来这里一下流程
在这里插入图片描述

相关实践学习
部署高可用架构
本场景主要介绍如何使用云服务器ECS、负载均衡SLB、云数据库RDS和数据传输服务产品来部署多可用区高可用架构。
负载均衡入门与产品使用指南
负载均衡(Server Load Balancer)是对多台云服务器进行流量分发的负载均衡服务,可以通过流量分发扩展应用系统对外的服务能力,通过消除单点故障提升应用系统的可用性。 本课程主要介绍负载均衡的相关技术以及阿里云负载均衡产品的使用方法。
相关文章
|
19天前
|
Java 应用服务中间件 Nacos
Spring Cloud 常用各个组件详解及实现原理(附加源码+实现逻辑图)
Spring Cloud 常用各个组件详解及实现原理(附加源码+实现逻辑图)
31 0
|
5天前
|
负载均衡 算法 Java
Ribbon自定义负载均衡算法
Ribbon自定义负载均衡算法
14 1
|
12天前
|
负载均衡
Ribbon负载均衡策略
Ribbon负载均衡策略
|
18天前
|
负载均衡 算法
SpringCloud&Ribbon负载均衡原理与实践
SpringCloud&Ribbon负载均衡原理与实践
20 3
|
23天前
|
Java Maven Nacos
Spring Cloud Eureka 服务注册和服务发现超详细(附加--源码实现案例--及实现逻辑图)
Spring Cloud Eureka 服务注册和服务发现超详细(附加--源码实现案例--及实现逻辑图)
31 0
|
24天前
|
人工智能 监控 安全
Java+Spring Cloud +Vue+UniApp微服务智慧工地云平台源码
视频监控系统、人员实名制与分账制管理系统、车辆管理系统、环境监测系统、大型设备监测(龙门吊、塔吊、升降机、卸料平台等)、用电监测系统、基坑监测系统、AI算法分析(安全帽佩戴、火焰识别、周界报警、人员聚众报警、升降机超载报警)、安全培训、设备监测。
28 4
|
26天前
|
人工智能 监控 安全
Spring Cloud+Uniapp 智慧工地云平台源码 智慧工地云平台AI视频分析应用
AI视频分析包括行为分析,即人员安全帽佩戴检测、反光衣穿戴检测、人员出入检测、区域入侵监测,以及烟火监测、人数统计、人脸识别、车辆识别、人体测温等。
17 0
|
28天前
|
人工智能 监控 安全
Springcloud数字化物联网智慧工地综合平台源码 劳务管理、设备管理、绿色施工
Springcloud数字化物联网智慧工地综合平台源码 劳务管理、设备管理、绿色施工
43 3
|
1月前
|
负载均衡 算法 Java
Ribbon的负载均衡策略
Ribbon的负载均衡策略
38 2
|
1月前
|
传感器 数据采集 监控
基于Springcloud可视化项目:智慧工地可视化大数据云平台源码
终端层,充分利用物联网技术和移动应用提高现场管控能力。通过传感器、摄像头等终端设备,实现对项目建设过程的实时监控、智能感知、数据采集和高效协同,提高作业现场的管理能力。
31 5