上一篇我们了解到Ribbon主要由如下几个组件组成:
- 所有Ribbon负载均衡器需要实现的接口IClient
- 服务实例列表维护机制实现的接口ServerList
- 负载均衡数据记录LoadBalancerStats
- 负责选取Server的接口ILoadBalancer
- 负载均衡选取规则实现的接口IRule
- 检查实例是否存活实现的接口IPing
- 服务实例列表更新机制实现的接口ServerListUpdater
- 服务实例列表过滤机制ServerListFilter
我们会逐个分析
1. 所有Ribbon负载均衡器需要实现的接口IClient
对于这个IClient,之前我们说到执行器逻辑,例如重试还有异常处理,都在这里处理。我们看他的默认抽象类实现AbstractLoadBalancerAwareClient:
AbstractLoadBalancerAwareClient.java
public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException { //获取重试处理器,这个由其他实现类动态实现 RequestSpecificRetryHandler handler = getRequestSpecificRetryHandler(request, requestConfig); //构造LoadBalancerCommand,RxJava风格 LoadBalancerCommand<T> command = LoadBalancerCommand.<T>builder() .withLoadBalancerContext(this) .withRetryHandler(handler) .withLoadBalancerURI(request.getUri()) .build(); try { return command.submit( new ServerOperation<T>() { @Override public Observable<T> call(Server server) { //修改原始url为实际的url URI finalUri = reconstructURIWithServer(server, request.getUri()); S requestForServer = (S) request.replaceUri(finalUri); try { //执行请求 return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig)); } catch (Exception e) { return Observable.error(e); } } }) .toBlocking() .single(); } catch (Exception e) { Throwable t = e.getCause(); if (t instanceof ClientException) { throw (ClientException) t; } else { throw new ClientException(e); } } } public abstract RequestSpecificRetryHandler getRequestSpecificRetryHandler(S request, IClientConfig requestConfig);
这个构造的LoadBalancerCommand是一个RxJava风格的,它包含了重试和异常处理机制:
LoadBalancerCommand.java
//返回一个只包含一个Server的Observable,但是每次从负载均衡器中获取一个 private Observable<Server> selectServer() { return Observable.create(new OnSubscribe<Server>() { @Override public void call(Subscriber<? super Server> next) { try { Server server = loadBalancerContext.getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey); next.onNext(server); next.onCompleted(); } catch (Exception e) { next.onError(e); } } }); } public Observable<T> submit(final ServerOperation<T> operation) { final ExecutionInfoContext context = new ExecutionInfoContext(); if (listenerInvoker != null) { try { listenerInvoker.onExecutionStart(); } catch (AbortExecutionException e) { return Observable.error(e); } } //获取在每个服务实例重试的的次数 final int maxRetrysSame = retryHandler.getMaxRetriesOnSameServer(); //最多尝试几个服务实例 final int maxRetrysNext = retryHandler.getMaxRetriesOnNextServer(); //对于每个服务实例的调用逻辑 //默认field server是null,通过selectServer()方法获取一个Server Observable<T> o = (server == null ? selectServer() : Observable.just(server)) .concatMap(new Func1<Server, Observable<T>>() { @Override //对于每个Server,按顺序映射为对于每个Server包含重试逻辑的请求调用 public Observable<T> call(Server server) { //设置上下文 context.setServer(server); final ServerStats stats = loadBalancerContext.getServerStats(server); //每个Server包含重试逻辑的请求调用 Observable<T> o = Observable .just(server) .concatMap(new Func1<Server, Observable<T>>() { @Override public Observable<T> call(final Server server) { context.incAttemptCount(); //增加Server正在处理的请求计数 loadBalancerContext.noteOpenConnection(stats); //监听器 if (listenerInvoker != null) { try { listenerInvoker.onStartWithServer(context.toExecutionInfo()); } catch (AbortExecutionException e) { return Observable.error(e); } } //计时器 final Stopwatch tracer = loadBalancerContext.getExecuteTracer().start(); //operation.call(server)就是刚刚分析的AbstractLoadBalancerAwareClient传过来的ServerOperation,就是直接对这个Server调用请求 //doOnEach的操作就是记录请求前后的一些数据用于负载均衡数据统计 return operation.call(server).doOnEach(new Observer<T>() { private T entity; @Override public void onCompleted() { //记录请求完成 recordStats(tracer, stats, entity, null); } @Override public void onError(Throwable e) { //记录请求结束 recordStats(tracer, stats, null, e); logger.debug("Got error {} when executed on server {}", e, server); //发生了错误,通知listener if (listenerInvoker != null) { listenerInvoker.onExceptionWithServer(e, context.toExecutionInfo()); } } @Override public void onNext(T entity) { //因为只有调用请求成功只有一个结果(只有一个请求), 这里的entity就是结果,只要收到结果就代表请求成功 this.entity = entity; if (listenerInvoker != null) { listenerInvoker.onExecutionSuccess(entity, context.toExecutionInfo()); } } private void recordStats(Stopwatch tracer, ServerStats stats, Object entity, Throwable exception) { tracer.stop(); loadBalancerContext.noteRequestCompletion(stats, entity, exception, tracer.getDuration(TimeUnit.MILLISECONDS), retryHandler); } }); } }); if (maxRetrysSame > 0) //是否retry o = o.retry(retryPolicy(maxRetrysSame, true)); return o; } }); if (maxRetrysNext > 0 && server == null) //是否retry,如果retry回调用selectServer()返回下一个Server o = o.retry(retryPolicy(maxRetrysNext, false)); //异常处理 return o.onErrorResumeNext(new Func1<Throwable, Observable<T>>() { @Override public Observable<T> call(Throwable e) { if (context.getAttemptCount() > 0) { //如果超过重试次数,则抛异常 if (maxRetrysNext > 0 && context.getServerAttemptCount() == (maxRetrysNext + 1)) { e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_NEXTSERVER_EXCEEDED, "Number of retries on next server exceeded max " + maxRetrysNext + " retries, while making a call for: " + context.getServer(), e); } else if (maxRetrysSame > 0 && context.getAttemptCount() == (maxRetrysSame + 1)) { e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_EXEEDED, "Number of retries exceeded max " + maxRetrysSame + " retries, while making a call for: " + context.getServer(), e); } } if (listenerInvoker != null) { listenerInvoker.onExecutionFailed(e, context.toFinalExecutionInfo()); } return Observable.error(e); } }); }
2. 服务实例列表维护机制实现的接口ServerList
AbstractServerList.java
其实这个抽象类一是在实现ServerList接口的同时,实现了IClientConfigAware这个接口,代表是可配置的。
同时,提供了一个生成默认ServerListFilter(这个Filter的实现类是由NIWSServerListFilterClassName这个配置决定,默认是ZoneAffinityServerListFilter)的方法
public abstract class AbstractServerList<T extends Server> implements ServerList<T>, IClientConfigAware { public AbstractServerListFilter<T> getFilterImpl(IClientConfig niwsClientConfig) throws ClientException{ try { String niwsServerListFilterClassName = niwsClientConfig .getProperty( CommonClientConfigKey.NIWSServerListFilterClassName, ZoneAffinityServerListFilter.class.getName()) .toString(); AbstractServerListFilter<T> abstractNIWSServerListFilter = (AbstractServerListFilter<T>) ClientFactory.instantiateInstanceWithClientConfig(niwsServerListFilterClassName, niwsClientConfig); return abstractNIWSServerListFilter; } catch (Throwable e) { throw new ClientException( ClientException.ErrorType.CONFIGURATION, "Unable to get an instance of CommonClientConfigKey.NIWSServerListFilterClassName. Configured class:" + niwsClientConfig .getProperty(CommonClientConfigKey.NIWSServerListFilterClassName), e); } } }
ConfigurationBasedServerList.java
这个是默认的实现,如果没有特殊配置,ServerList的实现类就是ConfigurationBasedServerList;这个实际上就是从配置中读取ServerList,这个配置可以是动态配置,例如是Archaius
public class ConfigurationBasedServerList extends AbstractServerList<Server> { private IClientConfig clientConfig; @Override public List<Server> getInitialListOfServers() { return getUpdatedListOfServers(); } @Override public List<Server> getUpdatedListOfServers() { String listOfServers = clientConfig.get(CommonClientConfigKey.ListOfServers); return derive(listOfServers); } @Override public void initWithNiwsConfig(IClientConfig clientConfig) { this.clientConfig = clientConfig; } //可以看出这个配置就是以逗号分隔的字符串 private List<Server> derive(String value) { List<Server> list = Lists.newArrayList(); if (!Strings.isNullOrEmpty(value)) { for (String s: value.split(",")) { list.add(new Server(s.trim())); } } return list; } }
DiscoveryEnabledNIWSServerList.java
这个就是从Eureka上面获取Server列表的类,构造的时候需要传入相关配置以及最重要的EurekaClient的Provider来获取合适的EurekaClient以便于获取Server列表。
实现ServerList接口的方法都是基于obtainServersViaDiscovery这个方法:
@Override public List<DiscoveryEnabledServer> getInitialListOfServers(){ return obtainServersViaDiscovery(); } @Override public List<DiscoveryEnabledServer> getUpdatedListOfServers(){ return obtainServersViaDiscovery(); } private List<DiscoveryEnabledServer> obtainServersViaDiscovery() { List<DiscoveryEnabledServer> serverList = new ArrayList<DiscoveryEnabledServer>(); //如果EurekaClient没有被初始化,则日志报警并返回空的列表 if (eurekaClientProvider == null || eurekaClientProvider.get() == null) { logger.warn("EurekaClient has not been initialized yet, returning an empty list"); return new ArrayList<DiscoveryEnabledServer>(); } EurekaClient eurekaClient = eurekaClientProvider.get(); //这里的vipAddresses其实就是微服务名称的各种形式,但是注意,它们代表的是同一个微服务 if (vipAddresses!=null){ for (String vipAddress : vipAddresses.split(",")) { // if targetRegion is null, it will be interpreted as the same region of client List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion); for (InstanceInfo ii : listOfInstanceInfo) { if (ii.getStatus().equals(InstanceStatus.UP)) { //是否覆盖port if(shouldUseOverridePort){ if(logger.isDebugEnabled()){ logger.debug("Overriding port on client name: " + clientName + " to " + overridePort); } //这里复制一份是因为不希望其他的地方修改原有的实例信息 InstanceInfo copy = new InstanceInfo(ii); if(isSecure){ ii = new InstanceInfo.Builder(copy).setSecurePort(overridePort).build(); }else{ ii = new InstanceInfo.Builder(copy).setPort(overridePort).build(); } } DiscoveryEnabledServer des = new DiscoveryEnabledServer(ii, isSecure, shouldUseIpAddr); des.setZone(DiscoveryClient.getZone(ii)); serverList.add(des); } } //如果有一个vipAddress有服务列表,我们就不用获取剩余的了 if (serverList.size()>0 && prioritizeVipAddressBasedServers){ break; } } } return serverList; }
到这里我们可以看出,Ribbon和Eureka的配合其实就是Ribbon从Eureka中利用微服务名称获取Server列表;那么这个列表是如何更新的呢,在Eureka的章节我们提到过,Ribbon定时从EurekaClient获取服务实例列表更新,这就涉及到了下一个我们要讲到的Ribbon元素 - 服务实例列表更新机制实现的接口ServerListUpdater