SpringCloud源码剖析-Zuul使用Ribbon负载均衡-RibbonRoutingFilter

本文涉及的产品
传统型负载均衡 CLB,每月750个小时 15LCU
应用型负载均衡 ALB,每月750个小时 15LCU
网络型负载均衡 NLB,每月750个小时 15LCU
简介: RibbonCommandContext在run方法中构建了一个 RibbonCommandContext Ribbon的上下文对象,然后调用 forward 方法转发请求 ,通过 setResponse方法设置结果

前言

经过前面几章的学习,我们对Zuul的的详细执行流程,以及内置的Filter都有了一些认识,本篇文章是针对RibbonRoutingFilter做一个详细的分析,跟一下它是如何使用Ribbon对下游的微服务进行负载均衡的。注意:这篇文章是在 《zuul的执行流程》基础上进行延伸的,另外Ribbon的原理见:《Ribbon负载均衡原理》


回顾一下zuul的执行流程,Zuul的执行流程是这样的

  • 首先请求进来会先到达ZuulControllerZuulController把请求交给ZuulServlet去处理
  • ZuulServelt会调用 ZuulRunner 依次执行: init初始化,pre前置filter,route路由filter,post后置filter, 出现异常会执行 error 异常filter
  • ZuulRunner通过 FilterProcessor 去执行各种Filter,FilterProcessor通过 FilterLoader 加载 各种filters

今天的主角儿 RibbonRoutingFilter 是“route”类型的Filter ,它的作用是使用Ribbon对后端的微服务做负载均衡,实现服务的调用,

RibbonRoutingFilter#run

我们直接看它的源码,见RibbonRoutingFilter#shouldFilter

@OverridepublicbooleanshouldFilter() {
RequestContextctx=RequestContext.getCurrentContext();
//上下文中必须有 serviceId return (ctx.getRouteHost() ==null&&ctx.get(SERVICE_ID_KEY) !=null&&ctx.sendZuulResponse());
}

这里做了简单判断,上下文中必须有 serviceId ,默认的true ,然后会执行run方法 RibbonRoutingFilter#run

publicObjectrun() {
RequestContextcontext=RequestContext.getCurrentContext();
this.helper.addIgnoredHeaders();
try {
//构建一个Ribbon的命令上下文RibbonCommandContextcommandContext=buildCommandContext(context);
//执行Ribbon,发送请求ClientHttpResponseresponse=forward(commandContext);
//设置结果setResponse(response);
returnresponse;
    }
catch (ZuulExceptionex) {
thrownewZuulRuntimeException(ex);
    }
catch (Exceptionex) {
thrownewZuulRuntimeException(ex);
    }
}
//把响应结果设置到RequestContext上下文对象中protectedvoidsetResponse(ClientHttpResponseresp)
throwsClientException, IOException {
//把响应结果设置到RequestContext上下文对象中RequestContext.getCurrentContext().set("zuulResponse", resp);
this.helper.setResponse(resp.getRawStatusCode(),
resp.getBody() ==null?null : resp.getBody(), resp.getHeaders());
}

RibbonCommandContext

在run方法中构建了一个 RibbonCommandContext Ribbon的上下文对象,然后调用 forward 方法转发请求 ,通过 setResponse方法设置结果

先看一下buildCommandContext方法是如何构建RibbonCommandContext

protectedRibbonCommandContextbuildCommandContext(RequestContextcontext) {
HttpServletRequestrequest=context.getRequest();
//获取请求头MultiValueMap<String, String>headers=this.helper        .buildZuulRequestHeaders(request);
//获取参数MultiValueMap<String, String>params=this.helper        .buildZuulRequestQueryParams(request);
//获取请求的方式       Stringverb=getVerb(request);
//获取请求体中的内容InputStreamrequestEntity=getRequestBody(request);
if (request.getContentLength() <0&&!verb.equalsIgnoreCase("GET")) {
context.setChunkedRequestBody();
    }
//调用的服务IDStringserviceId= (String) context.get(SERVICE_ID_KEY);
//是否重试Booleanretryable= (Boolean) context.get(RETRYABLE_KEY);
ObjectloadBalancerKey=context.get(LOAD_BALANCER_KEY);
//请求的资源路径Stringuri=this.helper.buildZuulRequestURI(request);
// remove double slashesuri=uri.replace("//", "/");
//getContentLength 内容长度longcontentLength=useServlet31?request.getContentLengthLong(): request.getContentLength();
//创建一个 RibbonCommandContextreturnnewRibbonCommandContext(serviceId, verb, uri, retryable, headers, params,
requestEntity, this.requestCustomizers, contentLength, loadBalancerKey);
}

这里从请求中获取了请求头,请求参数,请求体内容,以及服务ID,请求的资源URL等信息封装分 RibbonCommandContext ,那RibbonCommandContext 其实就是对下游微服务的请求的上下文数据的封装,下面是 RibbonCommandContext 的源码

publicclassRibbonCommandContext {
privatefinalStringserviceId; //服务idprivatefinalStringmethod;    //请求方式  privatefinalStringuri;       //请求资源URIprivatefinalBooleanretryable;    //重试privatefinalMultiValueMap<String, String>headers; //请求头privatefinalMultiValueMap<String, String>params; //请求产妇privatefinalList<RibbonRequestCustomizer>requestCustomizers;
privateInputStreamrequestEntity;  //请求体内容privateLongcontentLength; //内容长度  privateObjectloadBalancerKey; //负载均衡器}

HttpClientRibbonCommand的创建

接着我们看一下 forward 方法是如何调用下游微服务的 RibbonRoutingFilter#forward

protectedClientHttpResponseforward(RibbonCommandContextcontext) throwsException {
Map<String, Object>info=this.helper.debug(context.getMethod(),
context.getUri(), context.getHeaders(), context.getParams(),
context.getRequestEntity());
//使用 ribbonCommandFactory 工厂根据context上下文 创建 RibbonCommand RibbonCommandcommand=this.ribbonCommandFactory.create(context);
try {
//执行请求ClientHttpResponseresponse=command.execute();
this.helper.appendDebug(info, response.getRawStatusCode(), response.getHeaders());
returnresponse;
    }
catch (HystrixRuntimeExceptionex) {
returnhandleException(info, ex);
    }
}

这里 使用 ribbonCommandFactory 工厂根据context上下文 创建 RibbonCommand ,然后执行 RibbonCommand.execute ,我们看一下 ribbonCommandFactory.create

@OverridepublicHttpClientRibbonCommandcreate(finalRibbonCommandContextcontext) {
//获取服务对应的降级FallbackProviderzuulFallbackProvider=getFallbackProvider(context.getServiceId());
finalStringserviceId=context.getServiceId();
//获取Ribbon的负载均衡Http客户端finalRibbonLoadBalancingHttpClientclient=this.clientFactory.getClient(
serviceId, RibbonLoadBalancingHttpClient.class);
//获取 ILoadBalancer 负载均衡器,设置给 RibbonLoadBalancingHttpClient client.setLoadBalancer(this.clientFactory.getLoadBalancer(serviceId));
//创建一个 HttpClientRibbonCommand returnnewHttpClientRibbonCommand(serviceId, client, context, zuulProperties, zuulFallbackProvider,
clientFactory.getClientConfig(serviceId));
}

HttpClientRibbonCommandFactory#create 方法主要是 把服务的降级FallbackProvider,以及Ribbon的负载均衡Http客户端,以及ILoadBalancer 负载均衡器设置给 HttpClientRibbonCommand 对象并返回

HttpClientRibbonCommand 是Ribbon的Http客户端,是用来执行相关请求的,看一下它的继承体系它是HystrixCommand的子类

HttpClientRibbonCommand 的执行

代码回到 RibbonRoutingFilter#forward 方法,接下来分析 RibbonCommand.execute 具体执行流程

ClientHttpResponseresponse=command.execute();

进入这个方法,此时代码来到 com.netflix.hystrix.HystrixCommand#execute

publicRexecute() {
try {
returnqueue().get();
    } catch (Exceptione) {
throwExceptions.sneakyThrow(decomposeException(e));
    }
}

进入qeueu方法

publicFuture<R>queue() {
/** The Future returned by Observable.toBlocking().toFuture() does not implement the* interruption of the execution thread when the "mayInterrupt" flag of Future.cancel(boolean) is set to true;* thus, to comply with the contract of Future, we must wrap around it.*/// 这里采用了观察者设计模式,使用了异步阻塞方式去执行请求finalFuture<R>delegate=toObservable().toBlocking().toFuture();
finalFuture<R>f=newFuture<R>() {
@Overridepublicbooleancancel(booleanmayInterruptIfRunning) {
            ...省略...
returnres;
        }
@OverridepublicbooleanisCancelled() {
returndelegate.isCancelled();
        }
@OverridepublicbooleanisDone() {
returndelegate.isDone();
        }
@OverridepublicRget() throwsInterruptedException, ExecutionException {
returndelegate.get();
        }
@OverridepublicRget(longtimeout, TimeUnitunit) throwsInterruptedException, ExecutionException, TimeoutException {
returndelegate.get(timeout, unit);
        }
    };
/* special handling of error states that throw immediately */if (f.isDone()) {
try {
//获取结果f.get();
returnf;
        } catch (Exceptione) {
Throwablet=decomposeException(e);
if (tinstanceofHystrixBadRequestException) {
returnf;
            } elseif (tinstanceofHystrixRuntimeException) {
HystrixRuntimeExceptionhre= (HystrixRuntimeException) t;
switch (hre.getFailureType()) {
caseCOMMAND_EXCEPTION:
caseTIMEOUT:
// we don't throw these types from queue() only from queue().get() as they are execution errorsreturnf;
default:
// these are errors we throw from queue() as they as rejection type errorsthrowhre;
                }
            } else {
throwExceptions.sneakyThrow(t);
            }
        }
    }
returnf;
}

这里采用了观察者设计模式,使用了异步阻塞方式去执行请求,final Future<R> delegate = toObservable().toBlocking().toFuture(); 这个可以好好去研究一下,最代码会来到 AbstractRibbonCommand#run

@OverrideprotectedClientHttpResponserun() throwsException {
finalRequestContextcontext=RequestContext.getCurrentContext();
//创建请求,实现类是 RibbonApacheHttpRequestRQrequest=createRequest();
RSresponse;
//可重试的客户端 booleanretryableClient=this.clientinstanceofAbstractLoadBalancingClient&& ((AbstractLoadBalancingClient)this.client).isClientRetryable((ContextAwareRequest)request);
if (retryableClient) {
response=this.client.execute(request, config);
    } else {
//默认走这里使用负载均衡器执行response=this.client.executeWithLoadBalancer(request, config);
    }
//把结果设置到RequestContext 上下文context.set("ribbonResponse", response);
// Explicitly close the HttpResponse if the Hystrix command timed out to// release the underlying HTTP connection held by the response.//响应超时,关闭responseif (this.isResponseTimedOut()) {
if (response!=null) {
response.close();
        }
    }
//返回结果returnnewRibbonHttpResponse(response);
}

RibbonLoadBalancingHttpClient Http客户端

关键代码在 this.client.execute(request, config); 这里的 client是一个 RibbonLoadBalancingHttpClient ,继续跟踪 execute的代码来到 `AbstractLoadBalancerAwareClient#executeWithLoadBalancer

publicTexecuteWithLoadBalancer(finalSrequest, finalIClientConfigrequestConfig) throwsClientException {
LoadBalancerCommand<T>command=buildLoadBalancerCommand(request, requestConfig);
try {
//1.调用 LoadBalancerCommand的submit执行请求returncommand.submit(
newServerOperation<T>() {
@OverridepublicObservable<T>call(Serverserver) {
//使用服务器重构URIURIfinalUri=reconstructURIWithServer(server, request.getUri());
SrequestForServer= (S) request.replaceUri(finalUri);
try {
//2.这里调用了 RibbonLoadBalancingHttpClient.execute方法执行请求returnObservable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
                    } 
catch (Exceptione) {
returnObservable.error(e);
                    }
                }
            })
            .toBlocking()
            .single();
    } catch (Exceptione) {
Throwablet=e.getCause();
if (tinstanceofClientException) {
throw (ClientException) t;
        } else {
thrownewClientException(e);
        }
    }
}

方法中有两个重要的代码

  • command.submit : 调用 LoadBalancerCommand的submit,它创建一个Observable 观察者,一旦订阅,该Observable将与负载均衡器选择的服务器异步执行网络调用。
  • AbstractLoadBalancerAwareClient.this.execute:执行具体的服务

LoadBalancerCommand.submit 提交请求

先看LoadBalancerCommand.submit

publicObservable<T>submit(finalServerOperation<T>operation) {
finalExecutionInfoContextcontext=newExecutionInfoContext();
if (listenerInvoker!=null) {
try {
listenerInvoker.onExecutionStart();
        } catch (AbortExecutionExceptione) {
returnObservable.error(e);
        }
    }
finalintmaxRetrysSame=retryHandler.getMaxRetriesOnSameServer();
finalintmaxRetrysNext=retryHandler.getMaxRetriesOnNextServer();
// Use the load balancer//这里在做负载均衡 selectServerObservable<T>o=        (server==null?selectServer() : Observable.just(server))
        .concatMap(newFunc1<Server, Observable<T>>() {
@Override// Called for each server being selected//为每个选定的服务器调用publicObservable<T>call(Serverserver) {
                ...省略...
                }

很关键的一行代码 server == null ? selectServer() : Observable.just(server))它在使用负载均衡选择服务,跟一下 selectServer方法

privateObservable<Server>selectServer() {
returnObservable.create(newOnSubscribe<Server>() {
@Overridepublicvoidcall(Subscriber<?superServer>next) {
try {
//负载均衡Serverserver=loadBalancerContext.getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey);   
next.onNext(server);
next.onCompleted();
            } catch (Exceptione) {
next.onError(e);
            }
        }
    });
}

继续跟 loadBalancerContext.getServerFromLoadBalancer 方法

publicServergetServerFromLoadBalancer(@NullableURIoriginal, @NullableObjectloadBalancerKey) throwsClientException {
Stringhost=null;
intport=-1;
if (original!=null) {
host=original.getHost();
    }
if (original!=null) {
Pair<String, Integer>schemeAndPort=deriveSchemeAndPortFromPartialUri(original);        
port=schemeAndPort.second();
    }
// Various Supported Cases// The loadbalancer to use and the instances it has is based on how it was registered// In each of these cases, the client might come in using Full Url or Partial URLILoadBalancerlb=getLoadBalancer();
if (host==null) {
// Partial URI or no URI Case// well we have to just get the right instances from lb - or we fall backif (lb!=null){
//通过 ILoadBalancer的chooseServer方法选择服务Serversvc=lb.chooseServer(loadBalancerKey);
if (svc==null){
thrownewClientException(ClientException.ErrorType.GENERAL,
"Load balancer does not have available server for client: "+clientName);
            }
host=svc.getHost();
if (host==null){
thrownewClientException(ClientException.ErrorType.GENERAL,
"Invalid Server for :"+svc);
            }
logger.debug("{} using LB returned Server: {} for request {}", newObject[]{clientName, svc, original});
returnsvc;
            ...省略...
            }

这里调用了 ILoadBalancer 的 chooseServer方法,继续跟下去会调用 IRule 负载均衡策略的的choose方法选择服务。

RibbonLoadBalancingHttpClient Http客户端执行请求

选择好服务之后原路返回,请求回到 AbstractLoadBalancerAwareClient#executeWithLoadBalancer(S, com.netflix.client.config.IClientConfig),我们来跟第二个比较重要的代码RibbonLoadBalancingHttpClient#execute , 该方法在执行具体的服务了

@OverridepublicRibbonApacheHttpResponseexecute(RibbonApacheHttpRequestrequest,
finalIClientConfigconfigOverride) throwsException {
IClientConfigconfig=configOverride!=null?configOverride : this.config;
RibbonPropertiesribbon=RibbonProperties.from(config);
//封装请求配置,超时时间RequestConfigrequestConfig=RequestConfig.custom()
        .setConnectTimeout(ribbon.connectTimeout(this.connectTimeout))
        .setSocketTimeout(ribbon.readTimeout(this.readTimeout))
        .setRedirectsEnabled(ribbon.isFollowRedirects(this.followRedirects))
        .build();
//创建请求对象request=getSecureRequest(request, configOverride);
finalHttpUriRequesthttpUriRequest=request.toRequest(requestConfig);
//委派给 InternalHttpClient 客户去执行finalHttpResponsehttpResponse=this.delegate.execute(httpUriRequest);
//封装结果返回returnnewRibbonApacheHttpResponse(httpResponse, httpUriRequest.getURI());
}

这里封装了请求配置,创建请求对象,然后交给apacheInternalHttpClient 去执行请求,最后把结果封装成RibbonApacheHttpResponse返回。

最后响应结果会在RibbonRoutingFilter中被设置到RequestContext 上下文对象中 ,会通过SendResponseFilter去处理把上下文对象中的响应结果写给客户端

到这里整理执行流程结束,我们来这里一下流程

相关实践学习
SLB负载均衡实践
本场景通过使用阿里云负载均衡 SLB 以及对负载均衡 SLB 后端服务器 ECS 的权重进行修改,快速解决服务器响应速度慢的问题
负载均衡入门与产品使用指南
负载均衡(Server Load Balancer)是对多台云服务器进行流量分发的负载均衡服务,可以通过流量分发扩展应用系统对外的服务能力,通过消除单点故障提升应用系统的可用性。 本课程主要介绍负载均衡的相关技术以及阿里云负载均衡产品的使用方法。
目录
相关文章
|
2月前
|
负载均衡 算法 架构师
Ribbon负载均衡
上一节就已经实现的负载均衡笔者并未深入探讨,本节通过分析负载均衡算法、Ribbon实现负载均衡的底层原理和实现过程,让大家对负载均衡有了一个大体认识,同时针对Ribbon自定义负载均衡策略,饥饿加载让大家对于Ribbon的了解又多一些。Ribbon实现的负载均衡只是方案之一,我们可以尽量多了解但不要局限于此。
|
9天前
|
负载均衡 Java 对象存储
负载均衡策略:Spring Cloud与Netflix OSS的最佳实践
负载均衡策略:Spring Cloud与Netflix OSS的最佳实践
24 2
|
22天前
|
负载均衡 Java Nacos
SpringCloud基础1——远程调用、Eureka,Nacos注册中心、Ribbon负载均衡
微服务介绍、SpringCloud、服务拆分和远程调用、Eureka注册中心、Ribbon负载均衡、Nacos注册中心
SpringCloud基础1——远程调用、Eureka,Nacos注册中心、Ribbon负载均衡
|
6天前
|
负载均衡 Java 开发者
Ribbon框架实现客户端负载均衡的方法与技巧
Ribbon框架为微服务架构中的客户端负载均衡提供了强大的支持。通过简单的配置和集成,开发者可以轻松地在应用中实现服务的发现、选择和负载均衡。适当地使用Ribbon,配合其他Spring Cloud组件,可以有效提升微服务架构的可用性和性能。
8 0
|
2月前
|
存储 设计模式 缓存
OpenFeign集成Ribbon负载均衡-过滤和选择服务核心实现
该文章主要介绍了如何在OpenFeign中集成Ribbon以实现负载均衡,并详细分析了Ribbon中服务选择和服务过滤的核心实现过程。文章还涉及了Ribbon中负载均衡器(ILoadBalancer)和负载均衡策略(IRule)的初始化方式。
OpenFeign集成Ribbon负载均衡-过滤和选择服务核心实现
|
2月前
|
缓存 负载均衡 Java
OpenFeign最核心组件LoadBalancerFeignClient详解(集成Ribbon负载均衡能力)
文章标题为“OpenFeign的Ribbon负载均衡详解”,是继OpenFeign十大可扩展组件讨论之后,深入探讨了Ribbon如何为OpenFeign提供负载均衡能力的详解。
OpenFeign最核心组件LoadBalancerFeignClient详解(集成Ribbon负载均衡能力)
|
2月前
|
负载均衡 算法 Java
SpringCloud之Ribbon使用
通过 Ribbon,可以非常便捷的在微服务架构中实现请求负载均衡,提升系统的高可用性和伸缩性。在实际使用中,需要根据实际场景选择合适的负载均衡策略,并对其进行适当配置,以达到更佳的负载均衡效果。
35 13
|
3月前
|
负载均衡 算法 网络协议
Ribbon 负载均衡源码解读
Ribbon 负载均衡源码解读
52 15
Ribbon 负载均衡源码解读
|
3月前
|
负载均衡 Java API
Feign 进行rpc 调用时使用ribbon负载均衡源码解析
Feign 进行rpc 调用时使用ribbon负载均衡源码解析
64 11
|
4月前
|
负载均衡 算法 Java
Spring Cloud Netflix 之 Ribbon
Spring Cloud Netflix Ribbon是客户端负载均衡器,用于在微服务架构中分发请求。它与RestTemplate结合,自动在服务发现(如Eureka)注册的服务之间进行调用。配置包括在pom.xml中添加依赖,设置application.yml以连接Eureka服务器,并在配置类中创建@LoadBalanced的RestTemplate。通过这种方式,当调用如`/user/userInfoList`的接口时,Ribbon会自动处理到多个可用服务实例的负载均衡。
下一篇
无影云桌面