SpringCloud源码剖析-Zuul使用Ribbon负载均衡-RibbonRoutingFilter

本文涉及的产品
传统型负载均衡 CLB,每月750个小时 15LCU
EMR Serverless StarRocks,5000CU*H 48000GB*H
应用型负载均衡 ALB,每月750个小时 15LCU
简介: RibbonCommandContext在run方法中构建了一个 RibbonCommandContext Ribbon的上下文对象,然后调用 forward 方法转发请求 ,通过 setResponse方法设置结果

前言

经过前面几章的学习,我们对Zuul的的详细执行流程,以及内置的Filter都有了一些认识,本篇文章是针对RibbonRoutingFilter做一个详细的分析,跟一下它是如何使用Ribbon对下游的微服务进行负载均衡的。注意:这篇文章是在 《zuul的执行流程》基础上进行延伸的,另外Ribbon的原理见:《Ribbon负载均衡原理》


回顾一下zuul的执行流程,Zuul的执行流程是这样的

  • 首先请求进来会先到达ZuulControllerZuulController把请求交给ZuulServlet去处理
  • ZuulServelt会调用 ZuulRunner 依次执行: init初始化,pre前置filter,route路由filter,post后置filter, 出现异常会执行 error 异常filter
  • ZuulRunner通过 FilterProcessor 去执行各种Filter,FilterProcessor通过 FilterLoader 加载 各种filters

今天的主角儿 RibbonRoutingFilter 是“route”类型的Filter ,它的作用是使用Ribbon对后端的微服务做负载均衡,实现服务的调用,

RibbonRoutingFilter#run

我们直接看它的源码,见RibbonRoutingFilter#shouldFilter

@OverridepublicbooleanshouldFilter() {
RequestContextctx=RequestContext.getCurrentContext();
//上下文中必须有 serviceId return (ctx.getRouteHost() ==null&&ctx.get(SERVICE_ID_KEY) !=null&&ctx.sendZuulResponse());
}

这里做了简单判断,上下文中必须有 serviceId ,默认的true ,然后会执行run方法 RibbonRoutingFilter#run

publicObjectrun() {
RequestContextcontext=RequestContext.getCurrentContext();
this.helper.addIgnoredHeaders();
try {
//构建一个Ribbon的命令上下文RibbonCommandContextcommandContext=buildCommandContext(context);
//执行Ribbon,发送请求ClientHttpResponseresponse=forward(commandContext);
//设置结果setResponse(response);
returnresponse;
    }
catch (ZuulExceptionex) {
thrownewZuulRuntimeException(ex);
    }
catch (Exceptionex) {
thrownewZuulRuntimeException(ex);
    }
}
//把响应结果设置到RequestContext上下文对象中protectedvoidsetResponse(ClientHttpResponseresp)
throwsClientException, IOException {
//把响应结果设置到RequestContext上下文对象中RequestContext.getCurrentContext().set("zuulResponse", resp);
this.helper.setResponse(resp.getRawStatusCode(),
resp.getBody() ==null?null : resp.getBody(), resp.getHeaders());
}

RibbonCommandContext

在run方法中构建了一个 RibbonCommandContext Ribbon的上下文对象,然后调用 forward 方法转发请求 ,通过 setResponse方法设置结果

先看一下buildCommandContext方法是如何构建RibbonCommandContext

protectedRibbonCommandContextbuildCommandContext(RequestContextcontext) {
HttpServletRequestrequest=context.getRequest();
//获取请求头MultiValueMap<String, String>headers=this.helper        .buildZuulRequestHeaders(request);
//获取参数MultiValueMap<String, String>params=this.helper        .buildZuulRequestQueryParams(request);
//获取请求的方式       Stringverb=getVerb(request);
//获取请求体中的内容InputStreamrequestEntity=getRequestBody(request);
if (request.getContentLength() <0&&!verb.equalsIgnoreCase("GET")) {
context.setChunkedRequestBody();
    }
//调用的服务IDStringserviceId= (String) context.get(SERVICE_ID_KEY);
//是否重试Booleanretryable= (Boolean) context.get(RETRYABLE_KEY);
ObjectloadBalancerKey=context.get(LOAD_BALANCER_KEY);
//请求的资源路径Stringuri=this.helper.buildZuulRequestURI(request);
// remove double slashesuri=uri.replace("//", "/");
//getContentLength 内容长度longcontentLength=useServlet31?request.getContentLengthLong(): request.getContentLength();
//创建一个 RibbonCommandContextreturnnewRibbonCommandContext(serviceId, verb, uri, retryable, headers, params,
requestEntity, this.requestCustomizers, contentLength, loadBalancerKey);
}

这里从请求中获取了请求头,请求参数,请求体内容,以及服务ID,请求的资源URL等信息封装分 RibbonCommandContext ,那RibbonCommandContext 其实就是对下游微服务的请求的上下文数据的封装,下面是 RibbonCommandContext 的源码

publicclassRibbonCommandContext {
privatefinalStringserviceId; //服务idprivatefinalStringmethod;    //请求方式  privatefinalStringuri;       //请求资源URIprivatefinalBooleanretryable;    //重试privatefinalMultiValueMap<String, String>headers; //请求头privatefinalMultiValueMap<String, String>params; //请求产妇privatefinalList<RibbonRequestCustomizer>requestCustomizers;
privateInputStreamrequestEntity;  //请求体内容privateLongcontentLength; //内容长度  privateObjectloadBalancerKey; //负载均衡器}

HttpClientRibbonCommand的创建

接着我们看一下 forward 方法是如何调用下游微服务的 RibbonRoutingFilter#forward

protectedClientHttpResponseforward(RibbonCommandContextcontext) throwsException {
Map<String, Object>info=this.helper.debug(context.getMethod(),
context.getUri(), context.getHeaders(), context.getParams(),
context.getRequestEntity());
//使用 ribbonCommandFactory 工厂根据context上下文 创建 RibbonCommand RibbonCommandcommand=this.ribbonCommandFactory.create(context);
try {
//执行请求ClientHttpResponseresponse=command.execute();
this.helper.appendDebug(info, response.getRawStatusCode(), response.getHeaders());
returnresponse;
    }
catch (HystrixRuntimeExceptionex) {
returnhandleException(info, ex);
    }
}

这里 使用 ribbonCommandFactory 工厂根据context上下文 创建 RibbonCommand ,然后执行 RibbonCommand.execute ,我们看一下 ribbonCommandFactory.create

@OverridepublicHttpClientRibbonCommandcreate(finalRibbonCommandContextcontext) {
//获取服务对应的降级FallbackProviderzuulFallbackProvider=getFallbackProvider(context.getServiceId());
finalStringserviceId=context.getServiceId();
//获取Ribbon的负载均衡Http客户端finalRibbonLoadBalancingHttpClientclient=this.clientFactory.getClient(
serviceId, RibbonLoadBalancingHttpClient.class);
//获取 ILoadBalancer 负载均衡器,设置给 RibbonLoadBalancingHttpClient client.setLoadBalancer(this.clientFactory.getLoadBalancer(serviceId));
//创建一个 HttpClientRibbonCommand returnnewHttpClientRibbonCommand(serviceId, client, context, zuulProperties, zuulFallbackProvider,
clientFactory.getClientConfig(serviceId));
}

HttpClientRibbonCommandFactory#create 方法主要是 把服务的降级FallbackProvider,以及Ribbon的负载均衡Http客户端,以及ILoadBalancer 负载均衡器设置给 HttpClientRibbonCommand 对象并返回

HttpClientRibbonCommand 是Ribbon的Http客户端,是用来执行相关请求的,看一下它的继承体系它是HystrixCommand的子类

HttpClientRibbonCommand 的执行

代码回到 RibbonRoutingFilter#forward 方法,接下来分析 RibbonCommand.execute 具体执行流程

ClientHttpResponseresponse=command.execute();

进入这个方法,此时代码来到 com.netflix.hystrix.HystrixCommand#execute

publicRexecute() {
try {
returnqueue().get();
    } catch (Exceptione) {
throwExceptions.sneakyThrow(decomposeException(e));
    }
}

进入qeueu方法

publicFuture<R>queue() {
/** The Future returned by Observable.toBlocking().toFuture() does not implement the* interruption of the execution thread when the "mayInterrupt" flag of Future.cancel(boolean) is set to true;* thus, to comply with the contract of Future, we must wrap around it.*/// 这里采用了观察者设计模式,使用了异步阻塞方式去执行请求finalFuture<R>delegate=toObservable().toBlocking().toFuture();
finalFuture<R>f=newFuture<R>() {
@Overridepublicbooleancancel(booleanmayInterruptIfRunning) {
            ...省略...
returnres;
        }
@OverridepublicbooleanisCancelled() {
returndelegate.isCancelled();
        }
@OverridepublicbooleanisDone() {
returndelegate.isDone();
        }
@OverridepublicRget() throwsInterruptedException, ExecutionException {
returndelegate.get();
        }
@OverridepublicRget(longtimeout, TimeUnitunit) throwsInterruptedException, ExecutionException, TimeoutException {
returndelegate.get(timeout, unit);
        }
    };
/* special handling of error states that throw immediately */if (f.isDone()) {
try {
//获取结果f.get();
returnf;
        } catch (Exceptione) {
Throwablet=decomposeException(e);
if (tinstanceofHystrixBadRequestException) {
returnf;
            } elseif (tinstanceofHystrixRuntimeException) {
HystrixRuntimeExceptionhre= (HystrixRuntimeException) t;
switch (hre.getFailureType()) {
caseCOMMAND_EXCEPTION:
caseTIMEOUT:
// we don't throw these types from queue() only from queue().get() as they are execution errorsreturnf;
default:
// these are errors we throw from queue() as they as rejection type errorsthrowhre;
                }
            } else {
throwExceptions.sneakyThrow(t);
            }
        }
    }
returnf;
}

这里采用了观察者设计模式,使用了异步阻塞方式去执行请求,final Future<R> delegate = toObservable().toBlocking().toFuture(); 这个可以好好去研究一下,最代码会来到 AbstractRibbonCommand#run

@OverrideprotectedClientHttpResponserun() throwsException {
finalRequestContextcontext=RequestContext.getCurrentContext();
//创建请求,实现类是 RibbonApacheHttpRequestRQrequest=createRequest();
RSresponse;
//可重试的客户端 booleanretryableClient=this.clientinstanceofAbstractLoadBalancingClient&& ((AbstractLoadBalancingClient)this.client).isClientRetryable((ContextAwareRequest)request);
if (retryableClient) {
response=this.client.execute(request, config);
    } else {
//默认走这里使用负载均衡器执行response=this.client.executeWithLoadBalancer(request, config);
    }
//把结果设置到RequestContext 上下文context.set("ribbonResponse", response);
// Explicitly close the HttpResponse if the Hystrix command timed out to// release the underlying HTTP connection held by the response.//响应超时,关闭responseif (this.isResponseTimedOut()) {
if (response!=null) {
response.close();
        }
    }
//返回结果returnnewRibbonHttpResponse(response);
}

RibbonLoadBalancingHttpClient Http客户端

关键代码在 this.client.execute(request, config); 这里的 client是一个 RibbonLoadBalancingHttpClient ,继续跟踪 execute的代码来到 `AbstractLoadBalancerAwareClient#executeWithLoadBalancer

publicTexecuteWithLoadBalancer(finalSrequest, finalIClientConfigrequestConfig) throwsClientException {
LoadBalancerCommand<T>command=buildLoadBalancerCommand(request, requestConfig);
try {
//1.调用 LoadBalancerCommand的submit执行请求returncommand.submit(
newServerOperation<T>() {
@OverridepublicObservable<T>call(Serverserver) {
//使用服务器重构URIURIfinalUri=reconstructURIWithServer(server, request.getUri());
SrequestForServer= (S) request.replaceUri(finalUri);
try {
//2.这里调用了 RibbonLoadBalancingHttpClient.execute方法执行请求returnObservable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
                    } 
catch (Exceptione) {
returnObservable.error(e);
                    }
                }
            })
            .toBlocking()
            .single();
    } catch (Exceptione) {
Throwablet=e.getCause();
if (tinstanceofClientException) {
throw (ClientException) t;
        } else {
thrownewClientException(e);
        }
    }
}

方法中有两个重要的代码

  • command.submit : 调用 LoadBalancerCommand的submit,它创建一个Observable 观察者,一旦订阅,该Observable将与负载均衡器选择的服务器异步执行网络调用。
  • AbstractLoadBalancerAwareClient.this.execute:执行具体的服务

LoadBalancerCommand.submit 提交请求

先看LoadBalancerCommand.submit

publicObservable<T>submit(finalServerOperation<T>operation) {
finalExecutionInfoContextcontext=newExecutionInfoContext();
if (listenerInvoker!=null) {
try {
listenerInvoker.onExecutionStart();
        } catch (AbortExecutionExceptione) {
returnObservable.error(e);
        }
    }
finalintmaxRetrysSame=retryHandler.getMaxRetriesOnSameServer();
finalintmaxRetrysNext=retryHandler.getMaxRetriesOnNextServer();
// Use the load balancer//这里在做负载均衡 selectServerObservable<T>o=        (server==null?selectServer() : Observable.just(server))
        .concatMap(newFunc1<Server, Observable<T>>() {
@Override// Called for each server being selected//为每个选定的服务器调用publicObservable<T>call(Serverserver) {
                ...省略...
                }

很关键的一行代码 server == null ? selectServer() : Observable.just(server))它在使用负载均衡选择服务,跟一下 selectServer方法

privateObservable<Server>selectServer() {
returnObservable.create(newOnSubscribe<Server>() {
@Overridepublicvoidcall(Subscriber<?superServer>next) {
try {
//负载均衡Serverserver=loadBalancerContext.getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey);   
next.onNext(server);
next.onCompleted();
            } catch (Exceptione) {
next.onError(e);
            }
        }
    });
}

继续跟 loadBalancerContext.getServerFromLoadBalancer 方法

publicServergetServerFromLoadBalancer(@NullableURIoriginal, @NullableObjectloadBalancerKey) throwsClientException {
Stringhost=null;
intport=-1;
if (original!=null) {
host=original.getHost();
    }
if (original!=null) {
Pair<String, Integer>schemeAndPort=deriveSchemeAndPortFromPartialUri(original);        
port=schemeAndPort.second();
    }
// Various Supported Cases// The loadbalancer to use and the instances it has is based on how it was registered// In each of these cases, the client might come in using Full Url or Partial URLILoadBalancerlb=getLoadBalancer();
if (host==null) {
// Partial URI or no URI Case// well we have to just get the right instances from lb - or we fall backif (lb!=null){
//通过 ILoadBalancer的chooseServer方法选择服务Serversvc=lb.chooseServer(loadBalancerKey);
if (svc==null){
thrownewClientException(ClientException.ErrorType.GENERAL,
"Load balancer does not have available server for client: "+clientName);
            }
host=svc.getHost();
if (host==null){
thrownewClientException(ClientException.ErrorType.GENERAL,
"Invalid Server for :"+svc);
            }
logger.debug("{} using LB returned Server: {} for request {}", newObject[]{clientName, svc, original});
returnsvc;
            ...省略...
            }

这里调用了 ILoadBalancer 的 chooseServer方法,继续跟下去会调用 IRule 负载均衡策略的的choose方法选择服务。

RibbonLoadBalancingHttpClient Http客户端执行请求

选择好服务之后原路返回,请求回到 AbstractLoadBalancerAwareClient#executeWithLoadBalancer(S, com.netflix.client.config.IClientConfig),我们来跟第二个比较重要的代码RibbonLoadBalancingHttpClient#execute , 该方法在执行具体的服务了

@OverridepublicRibbonApacheHttpResponseexecute(RibbonApacheHttpRequestrequest,
finalIClientConfigconfigOverride) throwsException {
IClientConfigconfig=configOverride!=null?configOverride : this.config;
RibbonPropertiesribbon=RibbonProperties.from(config);
//封装请求配置,超时时间RequestConfigrequestConfig=RequestConfig.custom()
        .setConnectTimeout(ribbon.connectTimeout(this.connectTimeout))
        .setSocketTimeout(ribbon.readTimeout(this.readTimeout))
        .setRedirectsEnabled(ribbon.isFollowRedirects(this.followRedirects))
        .build();
//创建请求对象request=getSecureRequest(request, configOverride);
finalHttpUriRequesthttpUriRequest=request.toRequest(requestConfig);
//委派给 InternalHttpClient 客户去执行finalHttpResponsehttpResponse=this.delegate.execute(httpUriRequest);
//封装结果返回returnnewRibbonApacheHttpResponse(httpResponse, httpUriRequest.getURI());
}

这里封装了请求配置,创建请求对象,然后交给apacheInternalHttpClient 去执行请求,最后把结果封装成RibbonApacheHttpResponse返回。

最后响应结果会在RibbonRoutingFilter中被设置到RequestContext 上下文对象中 ,会通过SendResponseFilter去处理把上下文对象中的响应结果写给客户端

到这里整理执行流程结束,我们来这里一下流程

相关实践学习
SLB负载均衡实践
本场景通过使用阿里云负载均衡 SLB 以及对负载均衡 SLB 后端服务器 ECS 的权重进行修改,快速解决服务器响应速度慢的问题
负载均衡入门与产品使用指南
负载均衡(Server Load Balancer)是对多台云服务器进行流量分发的负载均衡服务,可以通过流量分发扩展应用系统对外的服务能力,通过消除单点故障提升应用系统的可用性。 本课程主要介绍负载均衡的相关技术以及阿里云负载均衡产品的使用方法。
目录
相关文章
|
10天前
|
负载均衡 监控 网络协议
SpringCloud之Ribbon使用
通过以上步骤,就可以在Spring Cloud项目中有效地使用Ribbon来实现服务调用的负载均衡,提高系统的可靠性和性能。在实际应用中,根据具体的业务场景和需求选择合适的负载均衡策略,并进行相应的配置和优化,以确保系统的稳定运行。
36 15
|
10天前
|
负载均衡 算法 Java
除了 Ribbon,Spring Cloud 中还有哪些负载均衡组件?
这些负载均衡组件各有特点,在不同的场景和需求下,可以根据项目的具体情况选择合适的负载均衡组件来实现高效、稳定的服务调用。
26 5
|
2月前
|
设计模式 Java 关系型数据库
【Java笔记+踩坑汇总】Java基础+JavaWeb+SSM+SpringBoot+SpringCloud+瑞吉外卖/谷粒商城/学成在线+设计模式+面试题汇总+性能调优/架构设计+源码解析
本文是“Java学习路线”专栏的导航文章,目标是为Java初学者和初中高级工程师提供一套完整的Java学习路线。
410 37
|
2月前
|
负载均衡 Java 网络架构
实现微服务网关:Zuul与Spring Cloud Gateway的比较分析
实现微服务网关:Zuul与Spring Cloud Gateway的比较分析
102 5
|
3月前
|
人工智能 前端开发 Java
【实操】Spring Cloud Alibaba AI,阿里AI这不得玩一下(含前后端源码)
本文介绍了如何使用 **Spring Cloud Alibaba AI** 构建基于 Spring Boot 和 uni-app 的聊天机器人应用。主要内容包括:Spring Cloud Alibaba AI 的概念与功能,使用前的准备工作(如 JDK 17+、Spring Boot 3.0+ 及通义 API-KEY),详细实操步骤(涵盖前后端开发工具、组件选择、功能分析及关键代码示例)。最终展示了如何成功实现具备基本聊天功能的 AI 应用,帮助读者快速搭建智能聊天系统并探索更多高级功能。
1342 2
【实操】Spring Cloud Alibaba AI,阿里AI这不得玩一下(含前后端源码)
|
2月前
|
负载均衡 Java Nacos
SpringCloud基础1——远程调用、Eureka,Nacos注册中心、Ribbon负载均衡
微服务介绍、SpringCloud、服务拆分和远程调用、Eureka注册中心、Ribbon负载均衡、Nacos注册中心
SpringCloud基础1——远程调用、Eureka,Nacos注册中心、Ribbon负载均衡
|
3月前
|
缓存 负载均衡 Java
OpenFeign最核心组件LoadBalancerFeignClient详解(集成Ribbon负载均衡能力)
文章标题为“OpenFeign的Ribbon负载均衡详解”,是继OpenFeign十大可扩展组件讨论之后,深入探讨了Ribbon如何为OpenFeign提供负载均衡能力的详解。
OpenFeign最核心组件LoadBalancerFeignClient详解(集成Ribbon负载均衡能力)
|
3月前
|
负载均衡 算法 Java
SpringCloud之Ribbon使用
通过 Ribbon,可以非常便捷的在微服务架构中实现请求负载均衡,提升系统的高可用性和伸缩性。在实际使用中,需要根据实际场景选择合适的负载均衡策略,并对其进行适当配置,以达到更佳的负载均衡效果。
59 13
|
2月前
|
负载均衡 Java 开发者
Ribbon框架实现客户端负载均衡的方法与技巧
Ribbon框架为微服务架构中的客户端负载均衡提供了强大的支持。通过简单的配置和集成,开发者可以轻松地在应用中实现服务的发现、选择和负载均衡。适当地使用Ribbon,配合其他Spring Cloud组件,可以有效提升微服务架构的可用性和性能。
32 0
|
4月前
|
负载均衡 算法 网络协议
Ribbon 负载均衡源码解读
Ribbon 负载均衡源码解读
61 15
Ribbon 负载均衡源码解读