前言
经过前面几章的学习,我们对Zuul的的详细执行流程,以及内置的Filter都有了一些认识,本篇文章是针对RibbonRoutingFilter做一个详细的分析,跟一下它是如何使用Ribbon对下游的微服务进行负载均衡的。注意:这篇文章是在 《zuul的执行流程》基础上进行延伸的,另外Ribbon的原理见:《Ribbon负载均衡原理》
回顾一下zuul的执行流程,Zuul的执行流程是这样的
- 首先请求进来会先到达
ZuulController
,ZuulController
把请求交给ZuulServlet
去处理 - 在
ZuulServelt
会调用ZuulRunner
依次执行: init初始化,pre
前置filter,route
路由filter,post
后置filter, 出现异常会执行error
异常filter ZuulRunner
通过FilterProcessor
去执行各种Filter,FilterProcessor通过FilterLoader
加载 各种filters
今天的主角儿 RibbonRoutingFilter
是“route”类型的Filter ,它的作用是使用Ribbon对后端的微服务做负载均衡,实现服务的调用,
RibbonRoutingFilter#run
我们直接看它的源码,见RibbonRoutingFilter#shouldFilter
publicbooleanshouldFilter() { RequestContextctx=RequestContext.getCurrentContext(); //上下文中必须有 serviceId return (ctx.getRouteHost() ==null&&ctx.get(SERVICE_ID_KEY) !=null&&ctx.sendZuulResponse()); }
这里做了简单判断,上下文中必须有 serviceId ,默认的true ,然后会执行run方法 RibbonRoutingFilter#run
publicObjectrun() { RequestContextcontext=RequestContext.getCurrentContext(); this.helper.addIgnoredHeaders(); try { //构建一个Ribbon的命令上下文RibbonCommandContextcommandContext=buildCommandContext(context); //执行Ribbon,发送请求ClientHttpResponseresponse=forward(commandContext); //设置结果setResponse(response); returnresponse; } catch (ZuulExceptionex) { thrownewZuulRuntimeException(ex); } catch (Exceptionex) { thrownewZuulRuntimeException(ex); } } //把响应结果设置到RequestContext上下文对象中protectedvoidsetResponse(ClientHttpResponseresp) throwsClientException, IOException { //把响应结果设置到RequestContext上下文对象中RequestContext.getCurrentContext().set("zuulResponse", resp); this.helper.setResponse(resp.getRawStatusCode(), resp.getBody() ==null?null : resp.getBody(), resp.getHeaders()); }
RibbonCommandContext
在run方法中构建了一个 RibbonCommandContext
Ribbon的上下文对象,然后调用 forward
方法转发请求 ,通过 setResponse
方法设置结果
先看一下buildCommandContext
方法是如何构建RibbonCommandContext
的
protectedRibbonCommandContextbuildCommandContext(RequestContextcontext) { HttpServletRequestrequest=context.getRequest(); //获取请求头MultiValueMap<String, String>headers=this.helper .buildZuulRequestHeaders(request); //获取参数MultiValueMap<String, String>params=this.helper .buildZuulRequestQueryParams(request); //获取请求的方式 Stringverb=getVerb(request); //获取请求体中的内容InputStreamrequestEntity=getRequestBody(request); if (request.getContentLength() <0&&!verb.equalsIgnoreCase("GET")) { context.setChunkedRequestBody(); } //调用的服务IDStringserviceId= (String) context.get(SERVICE_ID_KEY); //是否重试Booleanretryable= (Boolean) context.get(RETRYABLE_KEY); ObjectloadBalancerKey=context.get(LOAD_BALANCER_KEY); //请求的资源路径Stringuri=this.helper.buildZuulRequestURI(request); // remove double slashesuri=uri.replace("//", "/"); //getContentLength 内容长度longcontentLength=useServlet31?request.getContentLengthLong(): request.getContentLength(); //创建一个 RibbonCommandContextreturnnewRibbonCommandContext(serviceId, verb, uri, retryable, headers, params, requestEntity, this.requestCustomizers, contentLength, loadBalancerKey); }
这里从请求中获取了请求头,请求参数,请求体内容,以及服务ID,请求的资源URL等信息封装分 RibbonCommandContext
,那RibbonCommandContext 其实就是对下游微服务的请求的上下文数据的封装,下面是 RibbonCommandContext
的源码
publicclassRibbonCommandContext { privatefinalStringserviceId; //服务idprivatefinalStringmethod; //请求方式 privatefinalStringuri; //请求资源URIprivatefinalBooleanretryable; //重试privatefinalMultiValueMap<String, String>headers; //请求头privatefinalMultiValueMap<String, String>params; //请求产妇privatefinalList<RibbonRequestCustomizer>requestCustomizers; privateInputStreamrequestEntity; //请求体内容privateLongcontentLength; //内容长度 privateObjectloadBalancerKey; //负载均衡器}
HttpClientRibbonCommand
的创建
接着我们看一下 forward
方法是如何调用下游微服务的 RibbonRoutingFilter#forward
protectedClientHttpResponseforward(RibbonCommandContextcontext) throwsException { Map<String, Object>info=this.helper.debug(context.getMethod(), context.getUri(), context.getHeaders(), context.getParams(), context.getRequestEntity()); //使用 ribbonCommandFactory 工厂根据context上下文 创建 RibbonCommand RibbonCommandcommand=this.ribbonCommandFactory.create(context); try { //执行请求ClientHttpResponseresponse=command.execute(); this.helper.appendDebug(info, response.getRawStatusCode(), response.getHeaders()); returnresponse; } catch (HystrixRuntimeExceptionex) { returnhandleException(info, ex); } }
这里 使用 ribbonCommandFactory
工厂根据context上下文 创建 RibbonCommand
,然后执行 RibbonCommand.execute
,我们看一下 ribbonCommandFactory.create
publicHttpClientRibbonCommandcreate(finalRibbonCommandContextcontext) { //获取服务对应的降级FallbackProviderzuulFallbackProvider=getFallbackProvider(context.getServiceId()); finalStringserviceId=context.getServiceId(); //获取Ribbon的负载均衡Http客户端finalRibbonLoadBalancingHttpClientclient=this.clientFactory.getClient( serviceId, RibbonLoadBalancingHttpClient.class); //获取 ILoadBalancer 负载均衡器,设置给 RibbonLoadBalancingHttpClient client.setLoadBalancer(this.clientFactory.getLoadBalancer(serviceId)); //创建一个 HttpClientRibbonCommand returnnewHttpClientRibbonCommand(serviceId, client, context, zuulProperties, zuulFallbackProvider, clientFactory.getClientConfig(serviceId)); }
HttpClientRibbonCommandFactory#create
方法主要是 把服务的降级FallbackProvider
,以及Ribbon的负载均衡Http客户端,以及ILoadBalancer
负载均衡器设置给 HttpClientRibbonCommand
对象并返回
HttpClientRibbonCommand
是Ribbon的Http客户端,是用来执行相关请求的,看一下它的继承体系它是HystrixCommand的子类
HttpClientRibbonCommand
的执行
代码回到 RibbonRoutingFilter#forward
方法,接下来分析 RibbonCommand.execute
具体执行流程
ClientHttpResponseresponse=command.execute();
进入这个方法,此时代码来到 com.netflix.hystrix.HystrixCommand#execute
publicRexecute() { try { returnqueue().get(); } catch (Exceptione) { throwExceptions.sneakyThrow(decomposeException(e)); } }
进入qeueu方法
publicFuture<R>queue() { /** The Future returned by Observable.toBlocking().toFuture() does not implement the* interruption of the execution thread when the "mayInterrupt" flag of Future.cancel(boolean) is set to true;* thus, to comply with the contract of Future, we must wrap around it.*/// 这里采用了观察者设计模式,使用了异步阻塞方式去执行请求finalFuture<R>delegate=toObservable().toBlocking().toFuture(); finalFuture<R>f=newFuture<R>() { publicbooleancancel(booleanmayInterruptIfRunning) { ...省略... returnres; } publicbooleanisCancelled() { returndelegate.isCancelled(); } publicbooleanisDone() { returndelegate.isDone(); } publicRget() throwsInterruptedException, ExecutionException { returndelegate.get(); } publicRget(longtimeout, TimeUnitunit) throwsInterruptedException, ExecutionException, TimeoutException { returndelegate.get(timeout, unit); } }; /* special handling of error states that throw immediately */if (f.isDone()) { try { //获取结果f.get(); returnf; } catch (Exceptione) { Throwablet=decomposeException(e); if (tinstanceofHystrixBadRequestException) { returnf; } elseif (tinstanceofHystrixRuntimeException) { HystrixRuntimeExceptionhre= (HystrixRuntimeException) t; switch (hre.getFailureType()) { caseCOMMAND_EXCEPTION: caseTIMEOUT: // we don't throw these types from queue() only from queue().get() as they are execution errorsreturnf; default: // these are errors we throw from queue() as they as rejection type errorsthrowhre; } } else { throwExceptions.sneakyThrow(t); } } } returnf; }
这里采用了观察者设计模式,使用了异步阻塞方式去执行请求,final Future<R> delegate = toObservable().toBlocking().toFuture();
这个可以好好去研究一下,最代码会来到 AbstractRibbonCommand#run
中
protectedClientHttpResponserun() throwsException { finalRequestContextcontext=RequestContext.getCurrentContext(); //创建请求,实现类是 RibbonApacheHttpRequestRQrequest=createRequest(); RSresponse; //可重试的客户端 booleanretryableClient=this.clientinstanceofAbstractLoadBalancingClient&& ((AbstractLoadBalancingClient)this.client).isClientRetryable((ContextAwareRequest)request); if (retryableClient) { response=this.client.execute(request, config); } else { //默认走这里使用负载均衡器执行response=this.client.executeWithLoadBalancer(request, config); } //把结果设置到RequestContext 上下文context.set("ribbonResponse", response); // Explicitly close the HttpResponse if the Hystrix command timed out to// release the underlying HTTP connection held by the response.//响应超时,关闭responseif (this.isResponseTimedOut()) { if (response!=null) { response.close(); } } //返回结果returnnewRibbonHttpResponse(response); }
RibbonLoadBalancingHttpClient
Http客户端
关键代码在 this.client.execute(request, config);
这里的 client是一个 RibbonLoadBalancingHttpClient
,继续跟踪 execute的代码来到 `AbstractLoadBalancerAwareClient#executeWithLoadBalancer
publicTexecuteWithLoadBalancer(finalSrequest, finalIClientConfigrequestConfig) throwsClientException { LoadBalancerCommand<T>command=buildLoadBalancerCommand(request, requestConfig); try { //1.调用 LoadBalancerCommand的submit执行请求returncommand.submit( newServerOperation<T>() { publicObservable<T>call(Serverserver) { //使用服务器重构URIURIfinalUri=reconstructURIWithServer(server, request.getUri()); SrequestForServer= (S) request.replaceUri(finalUri); try { //2.这里调用了 RibbonLoadBalancingHttpClient.execute方法执行请求returnObservable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig)); } catch (Exceptione) { returnObservable.error(e); } } }) .toBlocking() .single(); } catch (Exceptione) { Throwablet=e.getCause(); if (tinstanceofClientException) { throw (ClientException) t; } else { thrownewClientException(e); } } }
方法中有两个重要的代码
- command.submit : 调用
LoadBalancerCommand
的submit,它创建一个Observable 观察者,一旦订阅,该Observable将与负载均衡器选择的服务器异步执行网络调用。
- AbstractLoadBalancerAwareClient.this.execute:执行具体的服务
LoadBalancerCommand.submit
提交请求
先看LoadBalancerCommand.submit
publicObservable<T>submit(finalServerOperation<T>operation) { finalExecutionInfoContextcontext=newExecutionInfoContext(); if (listenerInvoker!=null) { try { listenerInvoker.onExecutionStart(); } catch (AbortExecutionExceptione) { returnObservable.error(e); } } finalintmaxRetrysSame=retryHandler.getMaxRetriesOnSameServer(); finalintmaxRetrysNext=retryHandler.getMaxRetriesOnNextServer(); // Use the load balancer//这里在做负载均衡 selectServerObservable<T>o= (server==null?selectServer() : Observable.just(server)) .concatMap(newFunc1<Server, Observable<T>>() { // Called for each server being selected//为每个选定的服务器调用publicObservable<T>call(Serverserver) { ...省略... }
很关键的一行代码 server == null ? selectServer() : Observable.just(server))
它在使用负载均衡选择服务,跟一下 selectServer方法
privateObservable<Server>selectServer() { returnObservable.create(newOnSubscribe<Server>() { publicvoidcall(Subscriber<?superServer>next) { try { //负载均衡Serverserver=loadBalancerContext.getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey); next.onNext(server); next.onCompleted(); } catch (Exceptione) { next.onError(e); } } }); }
继续跟 loadBalancerContext.getServerFromLoadBalancer
方法
publicServergetServerFromLoadBalancer(URIoriginal, ObjectloadBalancerKey) throwsClientException { Stringhost=null; intport=-1; if (original!=null) { host=original.getHost(); } if (original!=null) { Pair<String, Integer>schemeAndPort=deriveSchemeAndPortFromPartialUri(original); port=schemeAndPort.second(); } // Various Supported Cases// The loadbalancer to use and the instances it has is based on how it was registered// In each of these cases, the client might come in using Full Url or Partial URLILoadBalancerlb=getLoadBalancer(); if (host==null) { // Partial URI or no URI Case// well we have to just get the right instances from lb - or we fall backif (lb!=null){ //通过 ILoadBalancer的chooseServer方法选择服务Serversvc=lb.chooseServer(loadBalancerKey); if (svc==null){ thrownewClientException(ClientException.ErrorType.GENERAL, "Load balancer does not have available server for client: "+clientName); } host=svc.getHost(); if (host==null){ thrownewClientException(ClientException.ErrorType.GENERAL, "Invalid Server for :"+svc); } logger.debug("{} using LB returned Server: {} for request {}", newObject[]{clientName, svc, original}); returnsvc; ...省略... }
这里调用了 ILoadBalancer 的 chooseServer方法,继续跟下去会调用 IRule 负载均衡策略的的choose方法选择服务。
RibbonLoadBalancingHttpClient
Http客户端执行请求
选择好服务之后原路返回,请求回到 AbstractLoadBalancerAwareClient#executeWithLoadBalancer(S, com.netflix.client.config.IClientConfig),
我们来跟第二个比较重要的代码RibbonLoadBalancingHttpClient#execute
, 该方法在执行具体的服务了
publicRibbonApacheHttpResponseexecute(RibbonApacheHttpRequestrequest, finalIClientConfigconfigOverride) throwsException { IClientConfigconfig=configOverride!=null?configOverride : this.config; RibbonPropertiesribbon=RibbonProperties.from(config); //封装请求配置,超时时间RequestConfigrequestConfig=RequestConfig.custom() .setConnectTimeout(ribbon.connectTimeout(this.connectTimeout)) .setSocketTimeout(ribbon.readTimeout(this.readTimeout)) .setRedirectsEnabled(ribbon.isFollowRedirects(this.followRedirects)) .build(); //创建请求对象request=getSecureRequest(request, configOverride); finalHttpUriRequesthttpUriRequest=request.toRequest(requestConfig); //委派给 InternalHttpClient 客户去执行finalHttpResponsehttpResponse=this.delegate.execute(httpUriRequest); //封装结果返回returnnewRibbonApacheHttpResponse(httpResponse, httpUriRequest.getURI()); }
这里封装了请求配置,创建请求对象,然后交给apache
的 InternalHttpClient
去执行请求,最后把结果封装成RibbonApacheHttpResponse
返回。
最后响应结果会在RibbonRoutingFilter
中被设置到RequestContext
上下文对象中 ,会通过SendResponseFilter
去处理把上下文对象中的响应结果写给客户端
到这里整理执行流程结束,我们来这里一下流程