二十.SpringCloud源码剖析-Zuul使用Ribbon负载均衡-RibbonRoutingFilter

本文涉及的产品
传统型负载均衡 CLB,每月750个小时 15LCU
EMR Serverless StarRocks,5000CU*H 48000GB*H
应用型负载均衡 ALB,每月750个小时 15LCU
简介: 经过前面几章的学习,我们对Zuul的的详细执行流程,以及内置的Filter都有了一些认识,本篇文章是针对RibbonRoutingFilter做一个详细的分析,跟一下它是如何使用Ribbon对下游的微服务进行负载均衡的。注意:这篇文章是在 《zuul的执行流程》基础上进行延伸的,另外Ribbon的原理见:《Ribbon负载均衡原理》

前言

经过前面几章的学习,我们对Zuul的的详细执行流程,以及内置的Filter都有了一些认识,本篇文章是针对RibbonRoutingFilter做一个详细的分析,跟一下它是如何使用Ribbon对下游的微服务进行负载均衡的。注意:这篇文章是在 《zuul的执行流程》基础上进行延伸的,另外Ribbon的原理见:《Ribbon负载均衡原理

<hr style=" border:solid; width:100px; height:1px;" color=#000000 size=1">

回顾一下zuul的执行流程,Zuul的执行流程是这样的

  • 首先请求进来会先到达ZuulControllerZuulController把请求交给ZuulServlet去处理
  • ZuulServelt会调用 ZuulRunner 依次执行: init初始化,pre前置filter,route路由filter,post后置filter, 出现异常会执行 error 异常filter
  • ZuulRunner通过 FilterProcessor 去执行各种Filter,FilterProcessor通过 FilterLoader 加载 各种filters

今天的主角儿 RibbonRoutingFilter 是“route”类型的Filter ,它的作用是使用Ribbon对后端的微服务做负载均衡,实现服务的调用,

RibbonRoutingFilter#run

我们直接看它的源码,见RibbonRoutingFilter#shouldFilter

    @Override
    public boolean shouldFilter() {
   
   
        RequestContext ctx = RequestContext.getCurrentContext();
        //上下文中必须有 serviceId 
        return (ctx.getRouteHost() == null && ctx.get(SERVICE_ID_KEY) != null
                && ctx.sendZuulResponse());
    }

这里做了简单判断,上下文中必须有 serviceId ,默认的true ,然后会执行run方法 RibbonRoutingFilter#run

public Object run() {
   
   
        RequestContext context = RequestContext.getCurrentContext();
        this.helper.addIgnoredHeaders();
        try {
   
   
            //构建一个Ribbon的命令上下文
            RibbonCommandContext commandContext = buildCommandContext(context);
            //执行Ribbon,发送请求
            ClientHttpResponse response = forward(commandContext);
            //设置结果
            setResponse(response);
            return response;
        }
        catch (ZuulException ex) {
   
   
            throw new ZuulRuntimeException(ex);
        }
        catch (Exception ex) {
   
   
            throw new ZuulRuntimeException(ex);
        }
    }
    //把响应结果设置到RequestContext上下文对象中
    protected void setResponse(ClientHttpResponse resp)
            throws ClientException, IOException {
   
   
            //把响应结果设置到RequestContext上下文对象中
        RequestContext.getCurrentContext().set("zuulResponse", resp);
        this.helper.setResponse(resp.getRawStatusCode(),
                resp.getBody() == null ? null : resp.getBody(), resp.getHeaders());
    }

RibbonCommandContext

在run方法中构建了一个 RibbonCommandContext Ribbon的上下文对象,然后调用 forward 方法转发请求 ,通过 setResponse方法设置结果

先看一下buildCommandContext方法是如何构建RibbonCommandContext

protected RibbonCommandContext buildCommandContext(RequestContext context) {
   
   
        HttpServletRequest request = context.getRequest();
        //获取请求头
        MultiValueMap<String, String> headers = this.helper
                .buildZuulRequestHeaders(request);
        //获取参数
        MultiValueMap<String, String> params = this.helper
                .buildZuulRequestQueryParams(request);
        //获取请求的方式        
        String verb = getVerb(request);
        //获取请求体中的内容
        InputStream requestEntity = getRequestBody(request);
        if (request.getContentLength() < 0 && !verb.equalsIgnoreCase("GET")) {
   
   
            context.setChunkedRequestBody();
        }
        //调用的服务ID
        String serviceId = (String) context.get(SERVICE_ID_KEY);
        //是否重试
        Boolean retryable = (Boolean) context.get(RETRYABLE_KEY);
        Object loadBalancerKey = context.get(LOAD_BALANCER_KEY);
        //请求的资源路径
        String uri = this.helper.buildZuulRequestURI(request);

        // remove double slashes
        uri = uri.replace("//", "/");
        //getContentLength 内容长度
        long contentLength = useServlet31 ? request.getContentLengthLong(): request.getContentLength();
        //创建一个 RibbonCommandContext
        return new RibbonCommandContext(serviceId, verb, uri, retryable, headers, params,
                requestEntity, this.requestCustomizers, contentLength, loadBalancerKey);
    }

这里从请求中获取了请求头,请求参数,请求体内容,以及服务ID,请求的资源URL等信息封装分 RibbonCommandContext ,那RibbonCommandContext 其实就是对下游微服务的请求的上下文数据的封装,下面是 RibbonCommandContext 的源码

public class RibbonCommandContext {
   
   
    private final String serviceId;    //服务id
    private final String method;    //请求方式    
    private final String uri;        //请求资源URI
    private final Boolean retryable;    //重试
    private final MultiValueMap<String, String> headers; //请求头
    private final MultiValueMap<String, String> params;    //请求产妇
    private final List<RibbonRequestCustomizer> requestCustomizers;
    private InputStream requestEntity;    //请求体内容
    private Long contentLength;    //内容长度    
    private Object loadBalancerKey;    //负载均衡器

HttpClientRibbonCommand的创建

接着我们看一下 forward 方法是如何调用下游微服务的 RibbonRoutingFilter#forward

protected ClientHttpResponse forward(RibbonCommandContext context) throws Exception {
   
   
        Map<String, Object> info = this.helper.debug(context.getMethod(),
                context.getUri(), context.getHeaders(), context.getParams(),
                context.getRequestEntity());
        //使用 ribbonCommandFactory 工厂根据context上下文 创建 RibbonCommand 
        RibbonCommand command = this.ribbonCommandFactory.create(context);
        try {
   
   
            //执行请求
            ClientHttpResponse response = command.execute();
            this.helper.appendDebug(info, response.getRawStatusCode(), response.getHeaders());
            return response;
        }
        catch (HystrixRuntimeException ex) {
   
   
            return handleException(info, ex);
        }

    }

这里 使用 ribbonCommandFactory 工厂根据context上下文 创建 RibbonCommand ,然后执行 RibbonCommand.execute ,我们看一下 ribbonCommandFactory.create


    @Override
    public HttpClientRibbonCommand create(final RibbonCommandContext context) {
   
   
        //获取服务对应的降级
        FallbackProvider zuulFallbackProvider = getFallbackProvider(context.getServiceId());
        final String serviceId = context.getServiceId();
        //获取Ribbon的负载均衡Http客户端
        final RibbonLoadBalancingHttpClient client = this.clientFactory.getClient(
                serviceId, RibbonLoadBalancingHttpClient.class);

        //获取 ILoadBalancer 负载均衡器,设置给 RibbonLoadBalancingHttpClient 
        client.setLoadBalancer(this.clientFactory.getLoadBalancer(serviceId));
        //创建一个 HttpClientRibbonCommand 
        return new HttpClientRibbonCommand(serviceId, client, context, zuulProperties, zuulFallbackProvider,
                clientFactory.getClientConfig(serviceId));
    }

HttpClientRibbonCommandFactory#create 方法主要是 把服务的降级FallbackProvider,以及Ribbon的负载均衡Http客户端,以及ILoadBalancer 负载均衡器设置给 HttpClientRibbonCommand 对象并返回

HttpClientRibbonCommand 是Ribbon的Http客户端,是用来执行相关请求的,看一下它的继承体系它是HystrixCommand的子类
在这里插入图片描述

HttpClientRibbonCommand 的执行

代码回到 RibbonRoutingFilter#forward 方法,接下来分析 RibbonCommand.execute 具体执行流程

ClientHttpResponse response = command.execute();

进入这个方法,此时代码来到 com.netflix.hystrix.HystrixCommand#execute

   public R execute() {
   
   
        try {
   
   
            return queue().get();
        } catch (Exception e) {
   
   
            throw Exceptions.sneakyThrow(decomposeException(e));
        }
    }

进入qeueu方法

public Future<R> queue() {
   
   
        /*
         * The Future returned by Observable.toBlocking().toFuture() does not implement the
         * interruption of the execution thread when the "mayInterrupt" flag of Future.cancel(boolean) is set to true;
         * thus, to comply with the contract of Future, we must wrap around it.
         */
        // 这里采用了观察者设计模式,使用了异步阻塞方式去执行请求
        final Future<R> delegate = toObservable().toBlocking().toFuture();

        final Future<R> f = new Future<R>() {
   
   

            @Override
            public boolean cancel(boolean mayInterruptIfRunning) {
   
   
                  ...省略...
                return res;
            }

            @Override
            public boolean isCancelled() {
   
   
                return delegate.isCancelled();
            }

            @Override
            public boolean isDone() {
   
   
                return delegate.isDone();
            }

            @Override
            public R get() throws InterruptedException, ExecutionException {
   
   
                return delegate.get();
            }

            @Override
            public R get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
   
   
                return delegate.get(timeout, unit);
            }

        };

        /* special handling of error states that throw immediately */
        if (f.isDone()) {
   
   
            try {
   
   
                //获取结果
                f.get();
                return f;
            } catch (Exception e) {
   
   
                Throwable t = decomposeException(e);
                if (t instanceof HystrixBadRequestException) {
   
   
                    return f;
                } else if (t instanceof HystrixRuntimeException) {
   
   
                    HystrixRuntimeException hre = (HystrixRuntimeException) t;
                    switch (hre.getFailureType()) {
   
   
                    case COMMAND_EXCEPTION:
                    case TIMEOUT:
                        // we don't throw these types from queue() only from queue().get() as they are execution errors
                        return f;
                    default:
                        // these are errors we throw from queue() as they as rejection type errors
                        throw hre;
                    }
                } else {
   
   
                    throw Exceptions.sneakyThrow(t);
                }
            }
        }

        return f;
    }

这里采用了观察者设计模式,使用了异步阻塞方式去执行请求,final Future<R> delegate = toObservable().toBlocking().toFuture(); 这个可以好好去研究一下,最代码会来到 AbstractRibbonCommand#run

@Override
    protected ClientHttpResponse run() throws Exception {
   
   
        final RequestContext context = RequestContext.getCurrentContext();
        //创建请求,实现类是 RibbonApacheHttpRequest
        RQ request = createRequest();
        RS response;
        //可重试的客户端 
        boolean retryableClient = this.client instanceof AbstractLoadBalancingClient
                && ((AbstractLoadBalancingClient)this.client).isClientRetryable((ContextAwareRequest)request);

        if (retryableClient) {
   
   
            response = this.client.execute(request, config);
        } else {
   
   
            //默认走这里使用负载均衡器执行
            response = this.client.executeWithLoadBalancer(request, config);
        }
        //把结果设置到RequestContext 上下文
        context.set("ribbonResponse", response);

        // Explicitly close the HttpResponse if the Hystrix command timed out to
        // release the underlying HTTP connection held by the response.
        //响应超时,关闭response
        if (this.isResponseTimedOut()) {
   
   
            if (response != null) {
   
   
                response.close();
            }
        }
        //返回结果
        return new RibbonHttpResponse(response);
    }

RibbonLoadBalancingHttpClient Http客户端

关键代码在 this.client.execute(request, config); 这里的 client是一个 RibbonLoadBalancingHttpClient ,继续跟踪 execute的代码来到 `AbstractLoadBalancerAwareClient#executeWithLoadBalancer

 public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
   
   
        LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);

        try {
   
   
            //1.调用 LoadBalancerCommand的submit执行请求
            return command.submit(
                new ServerOperation<T>() {
   
   
                    @Override
                    public Observable<T> call(Server server) {
   
   
                        //使用服务器重构URI
                        URI finalUri = reconstructURIWithServer(server, request.getUri());
                        S requestForServer = (S) request.replaceUri(finalUri);
                        try {
   
   
                        //2.这里调用了 RibbonLoadBalancingHttpClient.execute方法执行请求
                            return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
                        } 
                        catch (Exception e) {
   
   
                            return Observable.error(e);
                        }
                    }
                })
                .toBlocking()
                .single();
        } catch (Exception e) {
   
   
            Throwable t = e.getCause();
            if (t instanceof ClientException) {
   
   
                throw (ClientException) t;
            } else {
   
   
                throw new ClientException(e);
            }
        }

    }

方法中有两个重要的代码

  • command.submit : 调用 LoadBalancerCommand的submit,它创建一个Observable 观察者,一旦订阅,该Observable将与负载均衡器选择的服务器异步执行网络调用。
  • AbstractLoadBalancerAwareClient.this.execute:执行具体的服务

LoadBalancerCommand.submit 提交请求

先看LoadBalancerCommand.submit

 public Observable<T> submit(final ServerOperation<T> operation) {
   
   
        final ExecutionInfoContext context = new ExecutionInfoContext();

        if (listenerInvoker != null) {
   
   
            try {
   
   
                listenerInvoker.onExecutionStart();
            } catch (AbortExecutionException e) {
   
   
                return Observable.error(e);
            }
        }

        final int maxRetrysSame = retryHandler.getMaxRetriesOnSameServer();
        final int maxRetrysNext = retryHandler.getMaxRetriesOnNextServer();

        // Use the load balancer
        //这里在做负载均衡 selectServer
        Observable<T> o = 
                (server == null ? selectServer() : Observable.just(server))
                .concatMap(new Func1<Server, Observable<T>>() {
   
   
                    @Override
                    // Called for each server being selected
                    //为每个选定的服务器调用
                    public Observable<T> call(Server server) {
   
   
   ...省略...

很关键的一行代码 server == null ? selectServer() : Observable.just(server))它在使用负载均衡选择服务,跟一下 selectServer方法

    private Observable<Server> selectServer() {
   
   
        return Observable.create(new OnSubscribe<Server>() {
   
   
            @Override
            public void call(Subscriber<? super Server> next) {
   
   
                try {
   
   
                //负载均衡
                    Server server = loadBalancerContext.getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey);   
                    next.onNext(server);
                    next.onCompleted();
                } catch (Exception e) {
   
   
                    next.onError(e);
                }
            }
        });
    }

继续跟 loadBalancerContext.getServerFromLoadBalancer 方法

 public Server getServerFromLoadBalancer(@Nullable URI original, @Nullable Object loadBalancerKey) throws ClientException {
   
   
        String host = null;
        int port = -1;
        if (original != null) {
   
   
            host = original.getHost();
        }
        if (original != null) {
   
   
            Pair<String, Integer> schemeAndPort = deriveSchemeAndPortFromPartialUri(original);        
            port = schemeAndPort.second();
        }

        // Various Supported Cases
        // The loadbalancer to use and the instances it has is based on how it was registered
        // In each of these cases, the client might come in using Full Url or Partial URL
        ILoadBalancer lb = getLoadBalancer();
        if (host == null) {
   
   
            // Partial URI or no URI Case
            // well we have to just get the right instances from lb - or we fall back
            if (lb != null){
   
   
                //通过 ILoadBalancer的chooseServer方法选择服务
                Server svc = lb.chooseServer(loadBalancerKey);
                if (svc == null){
   
   
                    throw new ClientException(ClientException.ErrorType.GENERAL,
                            "Load balancer does not have available server for client: "
                                    + clientName);
                }
                host = svc.getHost();
                if (host == null){
   
   
                    throw new ClientException(ClientException.ErrorType.GENERAL,
                            "Invalid Server for :" + svc);
                }
                logger.debug("{} using LB returned Server: {} for request {}", new Object[]{
   
   clientName, svc, original});
                return svc;
                ...省略...

这里调用了 ILoadBalancer 的 chooseServer方法,继续跟下去会调用 IRule 负载均衡策略的的choose方法选择服务。

RibbonLoadBalancingHttpClient Http客户端执行请求

选择好服务之后原路返回,请求回到 AbstractLoadBalancerAwareClient#executeWithLoadBalancer(S, com.netflix.client.config.IClientConfig),我们来跟第二个比较重要的代码RibbonLoadBalancingHttpClient#execute , 该方法在执行具体的服务了

    @Override
    public RibbonApacheHttpResponse execute(RibbonApacheHttpRequest request,
                                            final IClientConfig configOverride) throws Exception {
   
   
        IClientConfig config = configOverride != null ? configOverride : this.config;
        RibbonProperties ribbon = RibbonProperties.from(config);
        //封装请求配置,超时时间
        RequestConfig requestConfig = RequestConfig.custom()
                .setConnectTimeout(ribbon.connectTimeout(this.connectTimeout))
                .setSocketTimeout(ribbon.readTimeout(this.readTimeout))
                .setRedirectsEnabled(ribbon.isFollowRedirects(this.followRedirects))
                .build();
        //创建请求对象
        request = getSecureRequest(request, configOverride);
        final HttpUriRequest httpUriRequest = request.toRequest(requestConfig);
        //委派给 InternalHttpClient 客户去执行
        final HttpResponse httpResponse = this.delegate.execute(httpUriRequest);
        //封装结果返回
        return new RibbonApacheHttpResponse(httpResponse, httpUriRequest.getURI());
    }

这里封装了请求配置,创建请求对象,然后交给apacheInternalHttpClient 去执行请求,最后把结果封装成RibbonApacheHttpResponse返回。

最后响应结果会在RibbonRoutingFilter中被设置到RequestContext 上下文对象中 ,会通过SendResponseFilter去处理把上下文对象中的响应结果写给客户端

到这里整理执行流程结束,我们来这里一下流程
在这里插入图片描述

相关实践学习
SLB负载均衡实践
本场景通过使用阿里云负载均衡 SLB 以及对负载均衡 SLB 后端服务器 ECS 的权重进行修改,快速解决服务器响应速度慢的问题
负载均衡入门与产品使用指南
负载均衡(Server Load Balancer)是对多台云服务器进行流量分发的负载均衡服务,可以通过流量分发扩展应用系统对外的服务能力,通过消除单点故障提升应用系统的可用性。 本课程主要介绍负载均衡的相关技术以及阿里云负载均衡产品的使用方法。
相关文章
|
2月前
|
负载均衡 算法 Java
Spring Cloud全解析:负载均衡算法
本文介绍了负载均衡的两种方式:集中式负载均衡和进程内负载均衡,以及常见的负载均衡算法,包括轮询、随机、源地址哈希、加权轮询、加权随机和最小连接数等方法,帮助读者更好地理解和应用负载均衡技术。
|
12天前
|
负载均衡 Java Nacos
Ribbon负载均衡
Ribbon负载均衡
21 1
Ribbon负载均衡
|
3月前
|
负载均衡 算法 架构师
Ribbon负载均衡
上一节就已经实现的负载均衡笔者并未深入探讨,本节通过分析负载均衡算法、Ribbon实现负载均衡的底层原理和实现过程,让大家对负载均衡有了一个大体认识,同时针对Ribbon自定义负载均衡策略,饥饿加载让大家对于Ribbon的了解又多一些。Ribbon实现的负载均衡只是方案之一,我们可以尽量多了解但不要局限于此。
|
10天前
|
负载均衡 算法 Java
除了 Ribbon,Spring Cloud 中还有哪些负载均衡组件?
这些负载均衡组件各有特点,在不同的场景和需求下,可以根据项目的具体情况选择合适的负载均衡组件来实现高效、稳定的服务调用。
26 5
|
2月前
|
设计模式 Java 关系型数据库
【Java笔记+踩坑汇总】Java基础+JavaWeb+SSM+SpringBoot+SpringCloud+瑞吉外卖/谷粒商城/学成在线+设计模式+面试题汇总+性能调优/架构设计+源码解析
本文是“Java学习路线”专栏的导航文章,目标是为Java初学者和初中高级工程师提供一套完整的Java学习路线。
410 37
|
2月前
|
负载均衡 Java 网络架构
实现微服务网关:Zuul与Spring Cloud Gateway的比较分析
实现微服务网关:Zuul与Spring Cloud Gateway的比较分析
102 5
|
2月前
|
负载均衡 Java 对象存储
负载均衡策略:Spring Cloud与Netflix OSS的最佳实践
负载均衡策略:Spring Cloud与Netflix OSS的最佳实践
49 2
|
3月前
|
人工智能 前端开发 Java
【实操】Spring Cloud Alibaba AI,阿里AI这不得玩一下(含前后端源码)
本文介绍了如何使用 **Spring Cloud Alibaba AI** 构建基于 Spring Boot 和 uni-app 的聊天机器人应用。主要内容包括:Spring Cloud Alibaba AI 的概念与功能,使用前的准备工作(如 JDK 17+、Spring Boot 3.0+ 及通义 API-KEY),详细实操步骤(涵盖前后端开发工具、组件选择、功能分析及关键代码示例)。最终展示了如何成功实现具备基本聊天功能的 AI 应用,帮助读者快速搭建智能聊天系统并探索更多高级功能。
1342 2
【实操】Spring Cloud Alibaba AI,阿里AI这不得玩一下(含前后端源码)
|
2月前
|
负载均衡 Java Nacos
SpringCloud基础1——远程调用、Eureka,Nacos注册中心、Ribbon负载均衡
微服务介绍、SpringCloud、服务拆分和远程调用、Eureka注册中心、Ribbon负载均衡、Nacos注册中心
SpringCloud基础1——远程调用、Eureka,Nacos注册中心、Ribbon负载均衡
|
3月前
|
存储 设计模式 缓存
OpenFeign集成Ribbon负载均衡-过滤和选择服务核心实现
该文章主要介绍了如何在OpenFeign中集成Ribbon以实现负载均衡,并详细分析了Ribbon中服务选择和服务过滤的核心实现过程。文章还涉及了Ribbon中负载均衡器(ILoadBalancer)和负载均衡策略(IRule)的初始化方式。
OpenFeign集成Ribbon负载均衡-过滤和选择服务核心实现