前言
经过前面几章的学习,我们对Zuul的的详细执行流程,以及内置的Filter都有了一些认识,本篇文章是针对RibbonRoutingFilter做一个详细的分析,跟一下它是如何使用Ribbon对下游的微服务进行负载均衡的。注意:这篇文章是在 《zuul的执行流程》基础上进行延伸的,另外Ribbon的原理见:《Ribbon负载均衡原理》
<hr style=" border:solid; width:100px; height:1px;" color=#000000 size=1">
回顾一下zuul的执行流程,Zuul的执行流程是这样的
- 首先请求进来会先到达
ZuulController
,ZuulController
把请求交给ZuulServlet
去处理 - 在
ZuulServelt
会调用ZuulRunner
依次执行: init初始化,pre
前置filter,route
路由filter,post
后置filter, 出现异常会执行error
异常filter ZuulRunner
通过FilterProcessor
去执行各种Filter,FilterProcessor通过FilterLoader
加载 各种filters
今天的主角儿 RibbonRoutingFilter
是“route”类型的Filter ,它的作用是使用Ribbon对后端的微服务做负载均衡,实现服务的调用,
RibbonRoutingFilter#run
我们直接看它的源码,见RibbonRoutingFilter#shouldFilter
@Override
public boolean shouldFilter() {
RequestContext ctx = RequestContext.getCurrentContext();
//上下文中必须有 serviceId
return (ctx.getRouteHost() == null && ctx.get(SERVICE_ID_KEY) != null
&& ctx.sendZuulResponse());
}
这里做了简单判断,上下文中必须有 serviceId ,默认的true ,然后会执行run方法 RibbonRoutingFilter#run
public Object run() {
RequestContext context = RequestContext.getCurrentContext();
this.helper.addIgnoredHeaders();
try {
//构建一个Ribbon的命令上下文
RibbonCommandContext commandContext = buildCommandContext(context);
//执行Ribbon,发送请求
ClientHttpResponse response = forward(commandContext);
//设置结果
setResponse(response);
return response;
}
catch (ZuulException ex) {
throw new ZuulRuntimeException(ex);
}
catch (Exception ex) {
throw new ZuulRuntimeException(ex);
}
}
//把响应结果设置到RequestContext上下文对象中
protected void setResponse(ClientHttpResponse resp)
throws ClientException, IOException {
//把响应结果设置到RequestContext上下文对象中
RequestContext.getCurrentContext().set("zuulResponse", resp);
this.helper.setResponse(resp.getRawStatusCode(),
resp.getBody() == null ? null : resp.getBody(), resp.getHeaders());
}
RibbonCommandContext
在run方法中构建了一个 RibbonCommandContext
Ribbon的上下文对象,然后调用 forward
方法转发请求 ,通过 setResponse
方法设置结果
先看一下buildCommandContext
方法是如何构建RibbonCommandContext
的
protected RibbonCommandContext buildCommandContext(RequestContext context) {
HttpServletRequest request = context.getRequest();
//获取请求头
MultiValueMap<String, String> headers = this.helper
.buildZuulRequestHeaders(request);
//获取参数
MultiValueMap<String, String> params = this.helper
.buildZuulRequestQueryParams(request);
//获取请求的方式
String verb = getVerb(request);
//获取请求体中的内容
InputStream requestEntity = getRequestBody(request);
if (request.getContentLength() < 0 && !verb.equalsIgnoreCase("GET")) {
context.setChunkedRequestBody();
}
//调用的服务ID
String serviceId = (String) context.get(SERVICE_ID_KEY);
//是否重试
Boolean retryable = (Boolean) context.get(RETRYABLE_KEY);
Object loadBalancerKey = context.get(LOAD_BALANCER_KEY);
//请求的资源路径
String uri = this.helper.buildZuulRequestURI(request);
// remove double slashes
uri = uri.replace("//", "/");
//getContentLength 内容长度
long contentLength = useServlet31 ? request.getContentLengthLong(): request.getContentLength();
//创建一个 RibbonCommandContext
return new RibbonCommandContext(serviceId, verb, uri, retryable, headers, params,
requestEntity, this.requestCustomizers, contentLength, loadBalancerKey);
}
这里从请求中获取了请求头,请求参数,请求体内容,以及服务ID,请求的资源URL等信息封装分 RibbonCommandContext
,那RibbonCommandContext 其实就是对下游微服务的请求的上下文数据的封装,下面是 RibbonCommandContext
的源码
public class RibbonCommandContext {
private final String serviceId; //服务id
private final String method; //请求方式
private final String uri; //请求资源URI
private final Boolean retryable; //重试
private final MultiValueMap<String, String> headers; //请求头
private final MultiValueMap<String, String> params; //请求产妇
private final List<RibbonRequestCustomizer> requestCustomizers;
private InputStream requestEntity; //请求体内容
private Long contentLength; //内容长度
private Object loadBalancerKey; //负载均衡器
HttpClientRibbonCommand
的创建
接着我们看一下 forward
方法是如何调用下游微服务的 RibbonRoutingFilter#forward
protected ClientHttpResponse forward(RibbonCommandContext context) throws Exception {
Map<String, Object> info = this.helper.debug(context.getMethod(),
context.getUri(), context.getHeaders(), context.getParams(),
context.getRequestEntity());
//使用 ribbonCommandFactory 工厂根据context上下文 创建 RibbonCommand
RibbonCommand command = this.ribbonCommandFactory.create(context);
try {
//执行请求
ClientHttpResponse response = command.execute();
this.helper.appendDebug(info, response.getRawStatusCode(), response.getHeaders());
return response;
}
catch (HystrixRuntimeException ex) {
return handleException(info, ex);
}
}
这里 使用 ribbonCommandFactory
工厂根据context上下文 创建 RibbonCommand
,然后执行 RibbonCommand.execute
,我们看一下 ribbonCommandFactory.create
@Override
public HttpClientRibbonCommand create(final RibbonCommandContext context) {
//获取服务对应的降级
FallbackProvider zuulFallbackProvider = getFallbackProvider(context.getServiceId());
final String serviceId = context.getServiceId();
//获取Ribbon的负载均衡Http客户端
final RibbonLoadBalancingHttpClient client = this.clientFactory.getClient(
serviceId, RibbonLoadBalancingHttpClient.class);
//获取 ILoadBalancer 负载均衡器,设置给 RibbonLoadBalancingHttpClient
client.setLoadBalancer(this.clientFactory.getLoadBalancer(serviceId));
//创建一个 HttpClientRibbonCommand
return new HttpClientRibbonCommand(serviceId, client, context, zuulProperties, zuulFallbackProvider,
clientFactory.getClientConfig(serviceId));
}
HttpClientRibbonCommandFactory#create
方法主要是 把服务的降级FallbackProvider
,以及Ribbon的负载均衡Http客户端,以及ILoadBalancer
负载均衡器设置给 HttpClientRibbonCommand
对象并返回
HttpClientRibbonCommand
是Ribbon的Http客户端,是用来执行相关请求的,看一下它的继承体系它是HystrixCommand的子类
HttpClientRibbonCommand
的执行
代码回到 RibbonRoutingFilter#forward
方法,接下来分析 RibbonCommand.execute
具体执行流程
ClientHttpResponse response = command.execute();
进入这个方法,此时代码来到 com.netflix.hystrix.HystrixCommand#execute
public R execute() {
try {
return queue().get();
} catch (Exception e) {
throw Exceptions.sneakyThrow(decomposeException(e));
}
}
进入qeueu方法
public Future<R> queue() {
/*
* The Future returned by Observable.toBlocking().toFuture() does not implement the
* interruption of the execution thread when the "mayInterrupt" flag of Future.cancel(boolean) is set to true;
* thus, to comply with the contract of Future, we must wrap around it.
*/
// 这里采用了观察者设计模式,使用了异步阻塞方式去执行请求
final Future<R> delegate = toObservable().toBlocking().toFuture();
final Future<R> f = new Future<R>() {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
...省略...
return res;
}
@Override
public boolean isCancelled() {
return delegate.isCancelled();
}
@Override
public boolean isDone() {
return delegate.isDone();
}
@Override
public R get() throws InterruptedException, ExecutionException {
return delegate.get();
}
@Override
public R get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return delegate.get(timeout, unit);
}
};
/* special handling of error states that throw immediately */
if (f.isDone()) {
try {
//获取结果
f.get();
return f;
} catch (Exception e) {
Throwable t = decomposeException(e);
if (t instanceof HystrixBadRequestException) {
return f;
} else if (t instanceof HystrixRuntimeException) {
HystrixRuntimeException hre = (HystrixRuntimeException) t;
switch (hre.getFailureType()) {
case COMMAND_EXCEPTION:
case TIMEOUT:
// we don't throw these types from queue() only from queue().get() as they are execution errors
return f;
default:
// these are errors we throw from queue() as they as rejection type errors
throw hre;
}
} else {
throw Exceptions.sneakyThrow(t);
}
}
}
return f;
}
这里采用了观察者设计模式,使用了异步阻塞方式去执行请求,final Future<R> delegate = toObservable().toBlocking().toFuture();
这个可以好好去研究一下,最代码会来到 AbstractRibbonCommand#run
中
@Override
protected ClientHttpResponse run() throws Exception {
final RequestContext context = RequestContext.getCurrentContext();
//创建请求,实现类是 RibbonApacheHttpRequest
RQ request = createRequest();
RS response;
//可重试的客户端
boolean retryableClient = this.client instanceof AbstractLoadBalancingClient
&& ((AbstractLoadBalancingClient)this.client).isClientRetryable((ContextAwareRequest)request);
if (retryableClient) {
response = this.client.execute(request, config);
} else {
//默认走这里使用负载均衡器执行
response = this.client.executeWithLoadBalancer(request, config);
}
//把结果设置到RequestContext 上下文
context.set("ribbonResponse", response);
// Explicitly close the HttpResponse if the Hystrix command timed out to
// release the underlying HTTP connection held by the response.
//响应超时,关闭response
if (this.isResponseTimedOut()) {
if (response != null) {
response.close();
}
}
//返回结果
return new RibbonHttpResponse(response);
}
RibbonLoadBalancingHttpClient
Http客户端
关键代码在 this.client.execute(request, config);
这里的 client是一个 RibbonLoadBalancingHttpClient
,继续跟踪 execute的代码来到 `AbstractLoadBalancerAwareClient#executeWithLoadBalancer
public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);
try {
//1.调用 LoadBalancerCommand的submit执行请求
return command.submit(
new ServerOperation<T>() {
@Override
public Observable<T> call(Server server) {
//使用服务器重构URI
URI finalUri = reconstructURIWithServer(server, request.getUri());
S requestForServer = (S) request.replaceUri(finalUri);
try {
//2.这里调用了 RibbonLoadBalancingHttpClient.execute方法执行请求
return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
}
catch (Exception e) {
return Observable.error(e);
}
}
})
.toBlocking()
.single();
} catch (Exception e) {
Throwable t = e.getCause();
if (t instanceof ClientException) {
throw (ClientException) t;
} else {
throw new ClientException(e);
}
}
}
方法中有两个重要的代码
- command.submit : 调用
LoadBalancerCommand
的submit,它创建一个Observable 观察者,一旦订阅,该Observable将与负载均衡器选择的服务器异步执行网络调用。
- AbstractLoadBalancerAwareClient.this.execute:执行具体的服务
LoadBalancerCommand.submit
提交请求
先看LoadBalancerCommand.submit
public Observable<T> submit(final ServerOperation<T> operation) {
final ExecutionInfoContext context = new ExecutionInfoContext();
if (listenerInvoker != null) {
try {
listenerInvoker.onExecutionStart();
} catch (AbortExecutionException e) {
return Observable.error(e);
}
}
final int maxRetrysSame = retryHandler.getMaxRetriesOnSameServer();
final int maxRetrysNext = retryHandler.getMaxRetriesOnNextServer();
// Use the load balancer
//这里在做负载均衡 selectServer
Observable<T> o =
(server == null ? selectServer() : Observable.just(server))
.concatMap(new Func1<Server, Observable<T>>() {
@Override
// Called for each server being selected
//为每个选定的服务器调用
public Observable<T> call(Server server) {
...省略...
很关键的一行代码 server == null ? selectServer() : Observable.just(server))
它在使用负载均衡选择服务,跟一下 selectServer方法
private Observable<Server> selectServer() {
return Observable.create(new OnSubscribe<Server>() {
@Override
public void call(Subscriber<? super Server> next) {
try {
//负载均衡
Server server = loadBalancerContext.getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey);
next.onNext(server);
next.onCompleted();
} catch (Exception e) {
next.onError(e);
}
}
});
}
继续跟 loadBalancerContext.getServerFromLoadBalancer
方法
public Server getServerFromLoadBalancer(@Nullable URI original, @Nullable Object loadBalancerKey) throws ClientException {
String host = null;
int port = -1;
if (original != null) {
host = original.getHost();
}
if (original != null) {
Pair<String, Integer> schemeAndPort = deriveSchemeAndPortFromPartialUri(original);
port = schemeAndPort.second();
}
// Various Supported Cases
// The loadbalancer to use and the instances it has is based on how it was registered
// In each of these cases, the client might come in using Full Url or Partial URL
ILoadBalancer lb = getLoadBalancer();
if (host == null) {
// Partial URI or no URI Case
// well we have to just get the right instances from lb - or we fall back
if (lb != null){
//通过 ILoadBalancer的chooseServer方法选择服务
Server svc = lb.chooseServer(loadBalancerKey);
if (svc == null){
throw new ClientException(ClientException.ErrorType.GENERAL,
"Load balancer does not have available server for client: "
+ clientName);
}
host = svc.getHost();
if (host == null){
throw new ClientException(ClientException.ErrorType.GENERAL,
"Invalid Server for :" + svc);
}
logger.debug("{} using LB returned Server: {} for request {}", new Object[]{
clientName, svc, original});
return svc;
...省略...
这里调用了 ILoadBalancer 的 chooseServer方法,继续跟下去会调用 IRule 负载均衡策略的的choose方法选择服务。
RibbonLoadBalancingHttpClient
Http客户端执行请求
选择好服务之后原路返回,请求回到 AbstractLoadBalancerAwareClient#executeWithLoadBalancer(S, com.netflix.client.config.IClientConfig),
我们来跟第二个比较重要的代码RibbonLoadBalancingHttpClient#execute
, 该方法在执行具体的服务了
@Override
public RibbonApacheHttpResponse execute(RibbonApacheHttpRequest request,
final IClientConfig configOverride) throws Exception {
IClientConfig config = configOverride != null ? configOverride : this.config;
RibbonProperties ribbon = RibbonProperties.from(config);
//封装请求配置,超时时间
RequestConfig requestConfig = RequestConfig.custom()
.setConnectTimeout(ribbon.connectTimeout(this.connectTimeout))
.setSocketTimeout(ribbon.readTimeout(this.readTimeout))
.setRedirectsEnabled(ribbon.isFollowRedirects(this.followRedirects))
.build();
//创建请求对象
request = getSecureRequest(request, configOverride);
final HttpUriRequest httpUriRequest = request.toRequest(requestConfig);
//委派给 InternalHttpClient 客户去执行
final HttpResponse httpResponse = this.delegate.execute(httpUriRequest);
//封装结果返回
return new RibbonApacheHttpResponse(httpResponse, httpUriRequest.getURI());
}
这里封装了请求配置,创建请求对象,然后交给apache
的 InternalHttpClient
去执行请求,最后把结果封装成RibbonApacheHttpResponse
返回。
最后响应结果会在RibbonRoutingFilter
中被设置到RequestContext
上下文对象中 ,会通过SendResponseFilter
去处理把上下文对象中的响应结果写给客户端
到这里整理执行流程结束,我们来这里一下流程