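A note before we start: throughout this article Ribbon uses OkHttpClient as its underlying HTTP client, configured as follows:
    # Ribbon configuration
    ribbon.okhttp.enabled=true
    # Connect timeout; the default is 1 second, defined in RibbonClientConfiguration
    springboot-mybatis.ribbon.ConnectTimeout=2000
    # Read (request processing) timeout
    springboot-mybatis.ribbon.ReadTimeout=15000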
Here is how the Ribbon project site describes Ribbon's components:
    # Decides how to pick one Server from the ServerList
    Rule - a logic component to determine which server to return from a list
    # Periodically checks whether servers are alive
    Ping - a component running in background to ensure liveness of servers
    # The list of servers
    ServerList - this can be static or dynamic. If it is dynamic (as used by DynamicServerListLoadBalancer), a background thread will refresh and filter the list at certain interval
When OpenFeign is used as a Eureka client, we do not need to specify a url in @FeignClient; Ribbon does the load balancing for us.
Recall the previous post, "Revisiting OpenFeign: a look at its source code" (《再谈openfeign,聊聊它的源代码》). As mentioned there, when no url is specified and OkHttpClient is chosen as the underlying HTTP client, the program does not call OkHttpClient directly. Instead it goes through LoadBalancerFeignClient, which acts as a proxy:
    public Response execute(Request request, Request.Options options) throws IOException {
        try {
            URI asUri = URI.create(request.url());
            String clientName = asUri.getHost();
            URI uriWithoutHost = cleanUrl(request.url(), clientName);
            FeignLoadBalancer.RibbonRequest ribbonRequest = new FeignLoadBalancer.RibbonRequest(
                    this.delegate, request, uriWithoutHost);
            IClientConfig requestConfig = getClientConfig(options, clientName);
            return lbClient(clientName)
                    .executeWithLoadBalancer(ribbonRequest, requestConfig).toResponse();
        } catch (ClientException e) {
            // exception handling omitted
        }
    }
The executeWithLoadBalancer call above lands in the executeWithLoadBalancer method of AbstractLoadBalancerAwareClient:
    public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
        LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);
        try {
            return command.submit(
                new ServerOperation<T>() {
                    @Override
                    public Observable<T> call(Server server) {
                        // finalUri here is http://192.168.0.118:8083/feign/feignReadTimeout
                        // reconstructURIWithServer just splices the URL together, so we skip its details
                        URI finalUri = reconstructURIWithServer(server, request.getUri());
                        // AbstractLoadBalancerAwareClient.this below is a FeignLoadBalancer,
                        // a subclass of AbstractLoadBalancerAwareClient
                        S requestForServer = (S) request.replaceUri(finalUri);
                        try {
                            return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
                        } catch (Exception e) {
                            return Observable.error(e);
                        }
                    }
                })
                .toBlocking()
                .single();
        }
        // exception handling omitted
    }
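This method first creates a LoadBalancerCommand and then executes the request through that object's submit method. As the comments show, the server parameter of call carries the IP and port the request will be sent to; that is what the Ribbon load balancer hands us. See the UML class diagram below:
The diagram shows that RibbonClientConfiguration defines the initialization of both the RibbonLoadBalancerContext bean and the ZoneAwareLoadBalancer bean, and that RibbonLoadBalancerContext wraps ZoneAwareLoadBalancer.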
Where the Server comes from
When LoadBalancerCommand runs submit, it selects a Server (the selectServer method). That Server holds the address and port Feign will send the HTTP request to.
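To make the role of the selected Server concrete, here is a minimal sketch in plain JDK code of what "splicing the URL" amounts to. It is not Ribbon's reconstructURIWithServer; the class name UriRewriteSketch, the method withServer, and the logical URL http://springboot-mybatis/feign/feignReadTimeout are assumptions made for illustration (the service name is taken from the configuration prefix used in this article).

```java
import java.net.URI;
import java.net.URISyntaxException;

/** Hypothetical helper, not Ribbon code: replaces the service name with a concrete host and port. */
final class UriRewriteSketch {

    static URI withServer(URI original, String host, int port) {
        try {
            // Keep scheme, path, query and fragment; replace only host and port.
            return new URI(original.getScheme(), null, host, port,
                    original.getPath(), original.getQuery(), original.getFragment());
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Cannot rebuild URI for " + original, e);
        }
    }

    public static void main(String[] args) {
        // Assumed logical URL: the host part is the client (service) name.
        URI logical = URI.create("http://springboot-mybatis/feign/feignReadTimeout");
        System.out.println(logical.getHost());
        System.out.println(withServer(logical, "192.168.0.118", 8083));
    }
}
```

The host of the logical URL plays the role of clientName in LoadBalancerFeignClient.execute, and the rewritten URL matches the finalUri value noted in the code comment earlier.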
selectServer() is the method that obtains the Server object. Its source:
    private Observable<Server> selectServer() {
        return Observable.create(new OnSubscribe<Server>() {
            @Override
            public void call(Subscriber<? super Server> next) {
                try {
                    Server server = loadBalancerContext.getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey);
                    next.onNext(server);
                    next.onCompleted();
                } catch (Exception e) {
                    next.onError(e);
                }
            }
        });
    }
The getServerFromLoadBalancer method used above is defined in LoadBalancerContext:
    public Server getServerFromLoadBalancer(@Nullable URI original, @Nullable Object loadBalancerKey) throws ClientException {
        String host = null;
        int port = -1;
        if (original != null) {
            // host is null here, because the URI no longer carries a host
            host = original.getHost();
        }
        if (original != null) {
            Pair<String, Integer> schemeAndPort = deriveSchemeAndPortFromPartialUri(original);
            // this returns -1
            port = schemeAndPort.second();
        }
        // Various Supported Cases
        // The loadbalancer to use and the instances it has is based on how it was registered
        // In each of these cases, the client might come in using Full Url or Partial URL
        // declared type here is DynamicServerListLoadBalancer
        ILoadBalancer lb = getLoadBalancer();
        if (host == null) {
            // Partial URI or no URI Case
            // well we have to just get the right instances from lb - or we fall back
            // the actual lb instance is a ZoneAwareLoadBalancer
            if (lb != null) {
                Server svc = lb.chooseServer(loadBalancerKey);
                if (svc == null) {
                    throw new ClientException(ClientException.ErrorType.GENERAL,
                            "Load balancer does not have available server for client: " + clientName);
                }
                host = svc.getHost();
                if (host == null) {
                    throw new ClientException(ClientException.ErrorType.GENERAL,
                            "Invalid Server for :" + svc);
                }
                logger.debug("{} using LB returned Server: {} for request {}", new Object[]{clientName, svc, original});
                return svc;
            } else {
                // vipAddresses logic omitted
            }
        } else {
            // this branch omitted
        }
        // end of creating final URL
        if (host == null) {
            throw new ClientException(ClientException.ErrorType.GENERAL, "Request contains no HOST to talk to");
        }
        // just verify that at this point we have a full URL
        return new Server(host, port);
    }
The LoadBalancer in use here is ZoneAwareLoadBalancer. Its inheritance hierarchy is shown in the UML class diagram below:
The chooseServer logic we follow lives in BaseLoadBalancer, a parent class of ZoneAwareLoadBalancer (ZoneAwareLoadBalancer overrides chooseServer, but with a single zone it delegates to the parent implementation):
    public Server chooseServer(Object key) {
        if (counter == null) {
            counter = createCounter();
        }
        // counter is BasicCounter{config=MonitorConfig{name=LoadBalancer_ChooseServer, tags=COUNTER, policy=DefaultPublishingPolicy}, count=0}
        counter.increment();
        if (rule == null) {
            return null;
        } else {
            try {
                // rule is a ZoneAvoidanceRule, but choose is implemented in its parent PredicateBasedRule
                return rule.choose(key);
            } catch (Exception e) {
                logger.warn("LoadBalancer [{}]:  Error choosing server for key {}", name, key, e);
                return null;
            }
        }
    }
The inheritance hierarchy of ZoneAvoidanceRule is also fairly involved; we come back to it later.
For now, here is the choose method of PredicateBasedRule:
    public Server choose(Object key) {
        ILoadBalancer lb = getLoadBalancer();
        // the LoadBalancer here is a ZoneAwareLoadBalancer
        Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
        if (server.isPresent()) {
            return server.get();
        } else {
            return null;
        }
    }
At this point we have met one of Ribbon's key components: the ServerList.
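The idea behind chooseRoundRobinAfterFiltering (filter the list first, then rotate through what is left) can be sketched with plain JDK types. This is an illustration under assumptions, not the Ribbon implementation: servers are plain strings, java.util.Optional stands in for the Guava Optional used by Ribbon, and the class name RoundRobinFilterSketch is hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

/** Hypothetical sketch: filter the candidates, then pick one in round-robin order. */
final class RoundRobinFilterSketch {
    private final AtomicInteger nextIndex = new AtomicInteger();

    Optional<String> choose(List<String> servers, Predicate<String> eligible) {
        List<String> candidates = new ArrayList<>();
        for (String server : servers) {
            if (eligible.test(server)) {
                candidates.add(server);
            }
        }
        if (candidates.isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(candidates.get(incrementAndGetModulo(candidates.size())));
    }

    /** Lock-free counter: returns the current index and advances it modulo the list size. */
    private int incrementAndGetModulo(int modulo) {
        for (;;) {
            int current = nextIndex.get();
            int next = (current + 1) % modulo;
            // Retry if another thread moved the counter, or if the list shrank below the index.
            if (nextIndex.compareAndSet(current, next) && current < modulo) {
                return current;
            }
        }
    }

    public static void main(String[] args) {
        RoundRobinFilterSketch rule = new RoundRobinFilterSketch();
        List<String> servers = Arrays.asList("a:8081", "b:8082", "c:8083");
        for (int i = 0; i < 3; i++) {
            System.out.println(rule.choose(servers, s -> !s.startsWith("b")).orElse("none"));
        }
    }
}
```

Because the index is taken modulo the size of the filtered list, a server that is filtered out is simply skipped in the rotation.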
Pulling the ServerList from Eureka
The previous section showed that the servers are obtained through getAllServers on ZoneAwareLoadBalancer. That method is defined in its parent class BaseLoadBalancer:
    public List<Server> getAllServers() {
        return Collections.unmodifiableList(allServerList);
    }
allServerList is also a field of BaseLoadBalancer. So where does its content come from? Look at the UML class diagram below:
DiscoveryClient is the core. CacheRefreshThread runs in a scheduled thread pool and uses EurekaHttpClient to pull the service list from Eureka periodically and update it. The central method is fetchRegistry, which is also called when DiscoveryClient is created; DiscoveryClient itself is initialized in the EurekaClientAutoConfiguration configuration class.
    private boolean fetchRegistry(boolean forceFullRegistryFetch) {
        Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
        try {
            // If the delta is disabled or if it is the first time, get all
            // applications
            Applications applications = getApplications();
            if (clientConfig.shouldDisableDelta()
                    || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                    || forceFullRegistryFetch
                    || (applications == null)
                    || (applications.getRegisteredApplications().size() == 0)
                    || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
            {
                // log statements removed
                getAndStoreFullRegistry();
            } else {
                getAndUpdateDelta(applications);
            }
            applications.setAppsHashCode(applications.getReconcileHashCode());
            logTotalInstances();
        } catch (Throwable e) {
            logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e);
            return false;
        } finally {
            if (tracer != null) {
                tracer.stop();
            }
        }
        // Notify about cache refresh before updating the instance remote status
        onCacheRefreshed();
        // Update remote status based on refreshed data held in the cache
        updateInstanceRemoteStatus();
        // registry was fetched successfully, so return true
        return true;
    }
Next, the getAndStoreFullRegistry method called from fetchRegistry:
    private void getAndStoreFullRegistry() throws Throwable {
        long currentUpdateGeneration = fetchRegistryGeneration.get();
        logger.info("Getting all instance registry info from the eureka server");
        Applications apps = null;
        // this goes through EurekaHttpClientDecorator to send the request to Eureka
        EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
                ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
                : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
        if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
            apps = httpResponse.getEntity();
        }
        logger.info("The response status is {}", httpResponse.getStatusCode());
        if (apps == null) {
            logger.error("The application is null for some reason. Not storing this information");
        } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
            /**
             * The returned Applications is stored in localRegionApps,
             * whose type is the atomic reference AtomicReference<Applications>.
             *
             * Pay attention to filterAndShuffle: it calls Applications.shuffleInstances,
             * which puts the fetched service list into virtualHostNameAppMap
             * and thereby serves the load balancer.
             */
            localRegionApps.set(this.filterAndShuffle(apps));
            logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
        } else {
            logger.warn("Not updating applications as another thread is updating it already");
        }
    }
The getApplications method of EurekaHttpClientDecorator eventually calls getApplicationsInternal in AbstractJerseyEurekaHttpClient:
    private EurekaHttpResponse<Applications> getApplicationsInternal(String urlPath, String[] regions) {
        ClientResponse response = null;
        String regionsParamValue = null;
        try {
            WebResource webResource = jerseyClient.resource(serviceUrl).path(urlPath);
            if (regions != null && regions.length > 0) {
                regionsParamValue = StringUtil.join(regions);
                webResource = webResource.queryParam("regions", regionsParamValue);
            }
            Builder requestBuilder = webResource.getRequestBuilder();
            addExtraHeaders(requestBuilder);
            // a GET request is sent here
            response = requestBuilder.accept(MediaType.APPLICATION_JSON_TYPE).get(ClientResponse.class);
            Applications applications = null;
            if (response.getStatus() == Status.OK.getStatusCode() && response.hasEntity()) {
                applications = response.getEntity(Applications.class);
            }
            // the response wraps the applications
            return anEurekaHttpResponse(response.getStatus(), Applications.class)
                    .headers(headersOf(response))
                    .entity(applications)
                    .build();
        } finally {
            // cleanup code omitted
        }
    }
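When ZoneAwareLoadBalancer fetches its list, it reads exactly from virtualHostNameAppMap in Applications, as the UML class diagram below shows:
With that the whole flow is connected. DiscoveryClient acts like a producer: it pulls the service list from Eureka and assigns it to Applications. ZoneAwareLoadBalancer acts like a consumer: it reads the service list from Applications.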
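The producer/consumer hand-off, together with the fetchRegistryGeneration.compareAndSet guard seen in getAndStoreFullRegistry, can be sketched with two atomics. This is a simplified stand-in rather than Eureka code: the class RegistryCacheSketch and its methods are hypothetical, and a list of "host:port" strings replaces the Applications object.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical sketch of a registry cache: one writer wins per generation, readers never block. */
final class RegistryCacheSketch {
    private final AtomicReference<List<String>> instances =
            new AtomicReference<>(Collections.<String>emptyList());
    private final AtomicLong generation = new AtomicLong();

    long currentGeneration() {
        return generation.get();
    }

    /** Producer side: store the fetched list only if no other update happened since the generation was read. */
    boolean store(long observedGeneration, List<String> fetched) {
        if (generation.compareAndSet(observedGeneration, observedGeneration + 1)) {
            instances.set(Collections.unmodifiableList(new ArrayList<>(fetched)));
            return true;
        }
        return false; // another thread updated the cache first; drop this stale result
    }

    /** Consumer side: the load balancer reads whatever snapshot is current. */
    List<String> snapshot() {
        return instances.get();
    }

    public static void main(String[] args) {
        RegistryCacheSketch cache = new RegistryCacheSketch();
        long seen = cache.currentGeneration();
        System.out.println(cache.store(seen, Arrays.asList("192.168.0.118:8083")));
        System.out.println(cache.store(seen, Arrays.asList("stale:1")));
        System.out.println(cache.snapshot());
    }
}
```

The second store call reuses an outdated generation, so it is rejected, which mirrors the "Not updating applications as another thread is updating it already" branch.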
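Cache refresh
Ribbon's cached server list is refreshed in two places. The first is driven by a setting from our configuration. The value is in milliseconds, as the TimeUnit.MILLISECONDS in the code further down shows, so a 3 second interval is written as 3000:
    springboot-mybatis.ribbon.ServerListRefreshInterval=3000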
This key is defined in CommonClientConfigKey and is used in PollingServerListUpdater:
    public synchronized void start(final UpdateAction updateAction) {
        if (isActive.compareAndSet(false, true)) {
            final Runnable wrapperRunnable = new Runnable() {
                @Override
                public void run() {
                    if (!isActive.get()) {
                        if (scheduledFuture != null) {
                            scheduledFuture.cancel(true);
                        }
                        return;
                    }
                    try {
                        updateAction.doUpdate();
                        lastUpdated = System.currentTimeMillis();
                    } catch (Exception e) {
                        logger.warn("Failed one update cycle", e);
                    }
                }
            };
            scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
                    wrapperRunnable,
                    initialDelayMs,
                    /** this is the ServerListRefreshInterval configured above **/
                    refreshIntervalMs,
                    TimeUnit.MILLISECONDS
            );
        } else {
            logger.info("Already active, no-op");
        }
    }
The updateAction passed in is implemented in DynamicServerListLoadBalancer:
    protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
        @Override
        public void doUpdate() {
            /** this enters the code that updates the ServerList **/
            updateListOfServers();
        }
    };
The second place where the cache is refreshed is DiscoveryClient:
    private void initScheduledTasks() {
        if (clientConfig.shouldFetchRegistry()) {
            // this interval defaults to 30s
            int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
            int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
            // scheduled thread pool
            scheduler.schedule(
                    new TimedSupervisorTask(
                            "cacheRefresh",
                            scheduler,
                            cacheRefreshExecutor,
                            registryFetchIntervalSeconds,
                            TimeUnit.SECONDS,
                            expBackOffBound,
                            /** the task the scheduled thread pool will run **/
                            new CacheRefreshThread()
                    ),
                    registryFetchIntervalSeconds, TimeUnit.SECONDS);
        }
    }
CacheRefreshThread is defined as follows:
    class CacheRefreshThread implements Runnable {
        public void run() {
            // this enters the code that refreshes the registry and, with it, the ServerList
            refreshRegistry();
        }
    }
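Both refresh paths follow the same pattern: a guarded start plus a fixed-delay schedule. The following is a minimal sketch of that pattern using only java.util.concurrent; the class PollingRefreshSketch and the helper refreshesAtLeast are hypothetical names, and the intervals in the demo are arbitrary.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical sketch of a polling updater: start once, run the update action at a fixed delay. */
final class PollingRefreshSketch {
    private final AtomicBoolean active = new AtomicBoolean(false);
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    private volatile ScheduledFuture<?> future;

    /** Returns false if the updater was already started (a no-op, as in the original). */
    boolean start(Runnable updateAction, long initialDelayMs, long refreshIntervalMs) {
        if (!active.compareAndSet(false, true)) {
            return false;
        }
        future = executor.scheduleWithFixedDelay(() -> {
            try {
                updateAction.run();
            } catch (RuntimeException e) {
                // Swallow the failure so that one bad cycle does not cancel the schedule.
            }
        }, initialDelayMs, refreshIntervalMs, TimeUnit.MILLISECONDS);
        return true;
    }

    void stop() {
        if (active.compareAndSet(true, false) && future != null) {
            future.cancel(true);
        }
        executor.shutdownNow();
    }

    /** Demo helper: true if the action ran at least the given number of times before the timeout. */
    static boolean refreshesAtLeast(int times, long intervalMs, long timeoutMs) {
        CountDownLatch latch = new CountDownLatch(times);
        PollingRefreshSketch updater = new PollingRefreshSketch();
        updater.start(latch::countDown, 0, intervalMs);
        try {
            return latch.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            updater.stop();
        }
    }

    public static void main(String[] args) {
        System.out.println(refreshesAtLeast(3, 10, 5000));
    }
}
```

scheduleWithFixedDelay measures the delay from the end of one run to the start of the next, so a slow update cycle pushes the following cycles back instead of letting them pile up.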
About ping
The Ribbon documentation quoted at the beginning mentions Ping, which probes whether the servers in the list are alive. As the code in this section shows, a server that fails the probe is marked as not alive and left out of the list of up servers.
The ping task is set up in the BaseLoadBalancer constructor:
    public BaseLoadBalancer() {
        this.name = DEFAULT_NAME;
        this.ping = null;
        setRule(DEFAULT_RULE);
        // the ping task is started here
        setupPingTask();
        lbStats = new LoadBalancerStats(DEFAULT_NAME);
    }
    void setupPingTask() {
        if (canSkipPing()) {
            return;
        }
        if (lbTimer != null) {
            lbTimer.cancel();
        }
        lbTimer = new ShutdownEnabledTimer("NFLoadBalancer-PingTimer-" + name, true);
        // pingIntervalSeconds defaults to 10 seconds
        lbTimer.schedule(new PingTask(), 0, pingIntervalSeconds * 1000);
        forceQuickPing();
    }
    class PingTask extends TimerTask {
        public void run() {
            try {
                new Pinger(pingStrategy).runPinger();
            } catch (Exception e) {
                logger.error("LoadBalancer [{}]: Error pinging", name, e);
            }
        }
    }
The core logic of the ping is in runPinger:
    public void runPinger() throws Exception {
        if (!pingInProgress.compareAndSet(false, true)) {
            return; // Ping in progress - nothing to do
        }
        // we are "in" - we get to Ping
        Server[] allServers = null;
        boolean[] results = null;
        Lock allLock = null;
        Lock upLock = null;
        try {
            /*
             * The readLock should be free unless an addServer operation is
             * going on...
             */
            allLock = allServerLock.readLock();
            allLock.lock();
            allServers = allServerList.toArray(new Server[allServerList.size()]);
            allLock.unlock();
            int numCandidates = allServers.length;
            // ping every server; the result is a boolean array
            results = pingerStrategy.pingServers(ping, allServers);
            final List<Server> newUpList = new ArrayList<Server>();
            final List<Server> changedServers = new ArrayList<Server>();
            for (int i = 0; i < numCandidates; i++) {
                boolean isAlive = results[i];
                Server svr = allServers[i];
                boolean oldIsAlive = svr.isAlive();
                svr.setAlive(isAlive);
                if (oldIsAlive != isAlive) {
                    changedServers.add(svr);
                    logger.debug("LoadBalancer [{}]:  Server [{}] status changed to {}",
                            name, svr.getId(), (isAlive ? "ALIVE" : "DEAD"));
                }
                if (isAlive) {
                    // add the servers that are alive to the new up list
                    newUpList.add(svr);
                }
            }
            upLock = upServerLock.writeLock();
            upLock.lock();
            // overwrite the old up list
            upServerList = newUpList;
            upLock.unlock();
            // I could not find any implementation of ServerStatusChangeListener
            notifyServerStatusChangeListener(changedServers);
        } finally {
            pingInProgress.set(false);
        }
    }
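So a Server that fails the ping is dropped from upServerList, the list of live servers; allServerList itself is not modified by this method.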
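The bookkeeping done by runPinger can be reduced to a few lines. The sketch below is an illustration under assumptions, not Ribbon code: the class PingSketch is hypothetical, servers are identified by plain strings, locking is left out, and the ping itself is passed in as a Predicate.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/** Hypothetical sketch: ping every known server, rebuild the up list, report status changes. */
final class PingSketch {
    // server id -> last known liveness; insertion order is kept for predictable output
    private final Map<String, Boolean> allServers = new LinkedHashMap<>();
    private volatile List<String> upServers = new ArrayList<>();

    void addServer(String id) {
        allServers.put(id, Boolean.FALSE); // assumed: unknown servers start as not alive
    }

    /** Returns the servers whose alive/dead status changed in this round. */
    List<String> runPinger(Predicate<String> ping) {
        List<String> newUpList = new ArrayList<>();
        List<String> changed = new ArrayList<>();
        for (Map.Entry<String, Boolean> entry : allServers.entrySet()) {
            boolean alive = ping.test(entry.getKey());
            if (alive != entry.getValue()) {
                changed.add(entry.getKey());
            }
            entry.setValue(alive);
            if (alive) {
                newUpList.add(entry.getKey());
            }
        }
        upServers = newUpList; // swap in the new list instead of mutating the old one
        return changed;
    }

    List<String> upServers() {
        return upServers;
    }

    List<String> allServers() {
        return new ArrayList<>(allServers.keySet());
    }

    public static void main(String[] args) {
        PingSketch lb = new PingSketch();
        lb.addServer("a:8081");
        lb.addServer("b:8082");
        lb.runPinger(server -> true);
        System.out.println(lb.runPinger(server -> server.startsWith("a")));
        System.out.println(lb.upServers() + " of " + lb.allServers());
    }
}
```

After the second round the failed server is gone from the up list but still present in the full list, which is the distinction between upServerList and allServerList in BaseLoadBalancer.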
A word on rules
When we discussed chooseServer, we saw that the code lives in BaseLoadBalancer, a parent class of ZoneAwareLoadBalancer. Here it is again:
    public Server chooseServer(Object key) {
        if (counter == null) {
            counter = createCounter();
        }
        // counter is BasicCounter{config=MonitorConfig{name=LoadBalancer_ChooseServer, tags=COUNTER, policy=DefaultPublishingPolicy}, count=0}
        counter.increment();
        if (rule == null) {
            return null;
        } else {
            try {
                // rule is a ZoneAvoidanceRule, but choose is implemented in its parent PredicateBasedRule
                return rule.choose(key);
            } catch (Exception e) {
                logger.warn("LoadBalancer [{}]:  Error choosing server for key {}", name, key, e);
                return null;
            }
        }
    }
The Rule decides which Server from the service list is returned. The inheritance hierarchy of Rule is shown in the diagram below:
Here is the choose method of PredicateBasedRule:
    public Server choose(Object key) {
        ILoadBalancer lb = getLoadBalancer();
        // getPredicate() returns an AbstractServerPredicate
        Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
        if (server.isPresent()) {
            return server.get();
        } else {
            return null;
        }
    }
The chooseRoundRobinAfterFiltering method of AbstractServerPredicate, together with the helpers it relies on:
    public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) {
        // loadBalancerKey is null here
        List<Server> eligible = getEligibleServers(servers, loadBalancerKey);
        if (eligible.size() == 0) {
            return Optional.absent();
        }
        return Optional.of(eligible.get(incrementAndGetModulo(eligible.size())));
    }
    public List<Server> getEligibleServers(List<Server> servers, Object loadBalancerKey) {
        if (loadBalancerKey == null) {
            return ImmutableList.copyOf(Iterables.filter(servers, this.getServerOnlyPredicate()));
        } else {
            List<Server> results = Lists.newArrayList();
            for (Server server: servers) {
                if (this.apply(new PredicateKey(loadBalancerKey, server))) {
                    results.add(server);
                }
            }
            return results;
        }
    }
    public static AbstractServerPredicate ofKeyPredicate(final Predicate<PredicateKey> p) {
        return new AbstractServerPredicate() {
            @Override
            @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP")
            public boolean apply(PredicateKey input) {
                return p.apply(input);
            }
        };
    }
The p passed to ofKeyPredicate combines two predicates. In the debugger it looks like this:
    p = {Predicates$AndPredicate@14490} "Predicates.and(com.netflix.loadbalancer.ZoneAvoidancePredicate@70d2b344,com.netflix.loadbalancer.AvailabilityPredicate@7380394)"
      components = {ArrayList@14673} size = 2
        0 = {ZoneAvoidancePredicate@14675}
        1 = {AvailabilityPredicate@14676}
Let's look at the apply method of each, starting with AvailabilityPredicate:
    public boolean apply(@Nullable PredicateKey input) {
        LoadBalancerStats stats = getLBStats();
        if (stats == null) {
            return true;
        }
        return !shouldSkipServer(stats.getSingleServerStat(input.getServer()));
    }
    private boolean shouldSkipServer(ServerStats stats) {
        // decide based on the ServerStats
        if ((CIRCUIT_BREAKER_FILTERING.get() && stats.isCircuitBreakerTripped())
                || stats.getActiveRequestsCount() >= activeConnectionsLimit.get()) {
            return true;
        }
        return false;
    }
Now the apply method of ZoneAvoidancePredicate. Its comments are clear: it returns true when the server has no zone, when no stats are available, or when at most one zone is available, and it returns false when the server's zone is not among the available zones:
    public boolean apply(@Nullable PredicateKey input) {
        if (!ENABLED.get()) {
            return true;
        }
        String serverZone = input.getServer().getZone();
        if (serverZone == null) {
            // there is no zone information from the server, we do not want to filter
            // out this server
            return true;
        }
        LoadBalancerStats lbStats = getLBStats();
        if (lbStats == null) {
            // no stats available, do not filter
            return true;
        }
        if (lbStats.getAvailableZones().size() <= 1) {
            // only one zone is available, do not filter
            return true;
        }
        Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
        if (!zoneSnapshot.keySet().contains(serverZone)) {
            // The server zone is unknown to the load balancer, do not filter it out
            return true;
        }
        logger.debug("Zone snapshots: {}", zoneSnapshot);
        Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());
        logger.debug("Available zones: {}", availableZones);
        if (availableZones != null) {
            return availableZones.contains(input.getServer().getZone());
        } else {
            return false;
        }
    }
These two predicates are passed in through the builder of CompositePredicate:
    Builder(AbstractServerPredicate ...primaryPredicates) {
        toBuild = new CompositePredicate();
        Predicate<PredicateKey> chain = Predicates.<PredicateKey>and(primaryPredicates);
        toBuild.delegate = AbstractServerPredicate.ofKeyPredicate(chain);
    }
The builder in turn is invoked when ZoneAvoidanceRule is constructed:
    public ZoneAvoidanceRule() {
        super();
        ZoneAvoidancePredicate zonePredicate = new ZoneAvoidancePredicate(this);
        AvailabilityPredicate availabilityPredicate = new AvailabilityPredicate(this);
        compositePredicate = createCompositePredicate(zonePredicate, availabilityPredicate);
    }
    private CompositePredicate createCompositePredicate(ZoneAvoidancePredicate p1, AvailabilityPredicate p2) {
        return CompositePredicate.withPredicates(p1, p2)
                .addFallbackPredicate(p2)
                .addFallbackPredicate(AbstractServerPredicate.alwaysTrue())
                .build();
    }
The predicates can also be customized through configuration, which interested readers may want to explore. The code that picks up the configuration is:
    public void initWithNiwsConfig(IClientConfig clientConfig) {
        ZoneAvoidancePredicate zonePredicate = new ZoneAvoidancePredicate(this, clientConfig);
        AvailabilityPredicate availabilityPredicate = new AvailabilityPredicate(this, clientConfig);
        compositePredicate = createCompositePredicate(zonePredicate, availabilityPredicate);
    }
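The structure built by createCompositePredicate, a primary AND-chain followed by progressively weaker fallbacks, can be sketched as follows. This is a simplification under assumptions: the class CompositeFilterSketch is hypothetical, servers are strings of the form zone/name, and a fallback is tried only when the previous filter left nothing, whereas the real CompositePredicate uses configurable minimum thresholds.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical sketch: apply the primary filter, then each fallback in turn while the result is empty. */
final class CompositeFilterSketch {
    private final Predicate<String> primary;
    private final List<Predicate<String>> fallbacks;

    @SafeVarargs
    CompositeFilterSketch(Predicate<String> primary, Predicate<String>... fallbacks) {
        this.primary = primary;
        this.fallbacks = Arrays.asList(fallbacks);
    }

    List<String> eligible(List<String> servers) {
        List<String> result = filter(servers, primary);
        for (Predicate<String> fallback : fallbacks) {
            if (!result.isEmpty()) {
                break; // the stricter filter already produced candidates
            }
            result = filter(servers, fallback);
        }
        return result;
    }

    private static List<String> filter(List<String> servers, Predicate<String> predicate) {
        List<String> kept = new ArrayList<>();
        for (String server : servers) {
            if (predicate.test(server)) {
                kept.add(server);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<String> servers = Arrays.asList("zoneA/a", "zoneB/b");
        Predicate<String> zoneOk = s -> s.startsWith("zoneC/"); // assumed: no server is in a healthy zone
        Predicate<String> available = s -> !s.endsWith("/b");   // assumed: b is overloaded
        CompositeFilterSketch filter =
                new CompositeFilterSketch(zoneOk.and(available), available, s -> true);
        System.out.println(filter.eligible(servers));
    }
}
```

The final always-true fallback guarantees that some server is returned as long as the list is not empty, which is the purpose of addFallbackPredicate(AbstractServerPredicate.alwaysTrue()) in ZoneAvoidanceRule.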
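Ribbon retry
Ribbon's retry feature helps with graceful releases of a service. One thing to note first: retry in the Ribbon load balancer depends on Spring Retry, so the following dependency has to be added to the pom:
    <dependency>
        <groupId>org.springframework.retry</groupId>
        <artifactId>spring-retry</artifactId>
        <version>1.2.5.RELEASE</version>
    </dependency>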
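The retry configuration is as follows:
    # Ribbon server-list refresh interval: default 30s, changed to 3s (the value is in milliseconds)
    springboot-mybatis.ribbon.ServerListRefreshInterval=3000
    # Maximum retries on the same instance, not counting the first call
    springboot-mybatis.ribbon.MaxAutoRetries=1
    # Maximum number of other instances to retry on, not counting the first call
    springboot-mybatis.ribbon.MaxAutoRetriesNextServer=1
    # Whether to retry all operations; if set to true, POST requests are retried as well
    springboot-mybatis.ribbon.OkToRetryOnAllOperations=false
    # Response status codes that trigger a retry
    springboot-mybatis.ribbon.retryableStatusCodes=404,408,502,500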
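The classes involved are shown in the UML class diagram below:
The core retry code is the submit method of LoadBalancerCommand. The original Javadoc comment is well worth reading: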
    /**
     * Create an {@link Observable} that once subscribed execute network call asynchronously with a server chosen by load balancer.
     * If there are any errors that are indicated as retriable by the {@link RetryHandler}, they will be consumed internally by the
     * function and will not be observed by the {@link Observer} subscribed to the returned {@link Observable}. If number of retries has
     * exceeds the maximal allowed, a final error will be emitted by the returned {@link Observable}. Otherwise, the first successful
     * result during execution and retries will be emitted.
     */
    public Observable<T> submit(final ServerOperation<T> operation) {
        final ExecutionInfoContext context = new ExecutionInfoContext();
        if (listenerInvoker != null) {
            try {
                listenerInvoker.onExecutionStart();
            } catch (AbortExecutionException e) {
                return Observable.error(e);
            }
        }
        // MaxAutoRetries and MaxAutoRetriesNextServer from the configuration above; both are 1 here
        final int maxRetrysSame = retryHandler.getMaxRetriesOnSameServer();
        final int maxRetrysNext = retryHandler.getMaxRetriesOnNextServer();
        // Use the load balancer
        Observable<T> o =
                (server == null ? selectServer() : Observable.just(server))
                .concatMap(new Func1<Server, Observable<T>>() {
                    @Override
                    // Called for each server being selected
                    public Observable<T> call(Server server) {
                        context.setServer(server);
                        final ServerStats stats = loadBalancerContext.getServerStats(server);
                        // Called for each attempt and retry
                        Observable<T> o = Observable
                                .just(server)
                                .concatMap(new Func1<Server, Observable<T>>() {
                                    @Override
                                    public Observable<T> call(final Server server) {
                                        context.incAttemptCount();
                                        loadBalancerContext.noteOpenConnection(stats);
                                        // some code omitted
                                        final Stopwatch tracer = loadBalancerContext.getExecuteTracer().start();
                                        return operation.call(server).doOnEach(new Observer<T>() {
                                            private T entity;
                                            @Override
                                            public void onCompleted() {
                                                recordStats(tracer, stats, entity, null);
                                                // TODO: What to do if onNext or onError are never called?
                                            }
                                            @Override
                                            public void onError(Throwable e) {
                                                recordStats(tracer, stats, null, e);
                                                logger.debug("Got error {} when executed on server {}", e, server);
                                                if (listenerInvoker != null) {
                                                    listenerInvoker.onExceptionWithServer(e, context.toExecutionInfo());
                                                }
                                            }
                                            @Override
                                            public void onNext(T entity) {
                                                this.entity = entity;
                                                if (listenerInvoker != null) {
                                                    listenerInvoker.onExecutionSuccess(entity, context.toExecutionInfo());
                                                }
                                            }
                                            private void recordStats(Stopwatch tracer, ServerStats stats, Object entity, Throwable exception) {
                                                tracer.stop();
                                                loadBalancerContext.noteRequestCompletion(stats, entity, exception, tracer.getDuration(TimeUnit.MILLISECONDS), retryHandler);
                                            }
                                        });
                                    }
                                });
                        if (maxRetrysSame > 0)
                            o = o.retry(retryPolicy(maxRetrysSame, true));
                        return o;
                    }
                });
        // execution reaches this branch on initialization
        if (maxRetrysNext > 0 && server == null)
            // retryPolicy checks the type of the error
            o = o.retry(retryPolicy(maxRetrysNext, false));
        return o.onErrorResumeNext(new Func1<Throwable, Observable<T>>() {
            @Override
            public Observable<T> call(Throwable e) {
                if (context.getAttemptCount() > 0) {
                    // once the retries are used up, an exception has to be thrown
                    if (maxRetrysNext > 0 && context.getServerAttemptCount() == (maxRetrysNext + 1)) {
                        e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_NEXTSERVER_EXCEEDED,
                                "Number of retries on next server exceeded max " + maxRetrysNext
                                + " retries, while making a call for: " + context.getServer(), e);
                    } else if (maxRetrysSame > 0 && context.getAttemptCount() == (maxRetrysSame + 1)) {
                        e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_EXEEDED,
                                "Number of retries exceeded max " + maxRetrysSame
                                + " retries, while making a call for: " + context.getServer(), e);
                    }
                }
                if (listenerInvoker != null) {
                    listenerInvoker.onExecutionFailed(e, context.toFinalExecutionInfo());
                }
                return Observable.error(e);
            }
        });
    }
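The retryPolicy used in this method catches the exception and decides whether it is a retriable one. In our setup the retriable errors correspond to the four status codes we configured: 404, 408, 502 and 500.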
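Stripped of RxJava, the two retry levels amount to a nested loop: retry on the same server up to MaxAutoRetries times, then move to the next server up to MaxAutoRetriesNextServer times. The sketch below illustrates that structure only; the class RetrySketch is hypothetical, the "next server" is picked by simple indexing rather than by a load balancer, and every RuntimeException is treated as retriable.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

/** Hypothetical sketch of two-level retry: same server first, then the next server. */
final class RetrySketch {

    static <T> T execute(List<String> servers, int maxAutoRetries, int maxAutoRetriesNextServer,
                         Function<String, T> operation) {
        RuntimeException last = null;
        for (int serverAttempt = 0; serverAttempt <= maxAutoRetriesNextServer; serverAttempt++) {
            // Stand-in for the load balancer's choice of the next server.
            String server = servers.get(serverAttempt % servers.size());
            for (int attempt = 0; attempt <= maxAutoRetries; attempt++) {
                try {
                    return operation.apply(server);
                } catch (RuntimeException e) {
                    last = e; // assumed retriable; remember it and try again
                }
            }
        }
        throw new IllegalStateException("Number of retries exceeded", last);
    }

    public static void main(String[] args) {
        List<String> servers = Arrays.asList("a:8081", "b:8082");
        List<String> tried = new ArrayList<>();
        try {
            execute(servers, 1, 1, server -> {
                tried.add(server);
                throw new RuntimeException("down");
            });
        } catch (IllegalStateException e) {
            System.out.println(tried.size() + " attempts: " + tried);
        }
    }
}
```

In this sketch, MaxAutoRetries=1 and MaxAutoRetriesNextServer=1 give (1 + 1) × (1 + 1) = 4 attempts in the worst case, while MaxAutoRetries=0 moves on to the next server after the first failure.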
Next, the execute method of RetryableFeignLoadBalancer:
    public RibbonResponse execute(final RibbonRequest request, IClientConfig configOverride) throws IOException {
        final Request.Options options;
        if (configOverride != null) {
            RibbonProperties ribbon = RibbonProperties.from(configOverride);
            options = new Request.Options(
                    ribbon.connectTimeout(this.connectTimeout),
                    ribbon.readTimeout(this.readTimeout));
        } else {
            options = new Request.Options(this.connectTimeout, this.readTimeout);
        }
        final LoadBalancedRetryPolicy retryPolicy = loadBalancedRetryFactory.createRetryPolicy(this.getClientName(), this);
        RetryTemplate retryTemplate = new RetryTemplate();
        BackOffPolicy backOffPolicy = loadBalancedRetryFactory.createBackOffPolicy(this.getClientName());
        retryTemplate.setBackOffPolicy(backOffPolicy == null ? new NoBackOffPolicy() : backOffPolicy);
        RetryListener[] retryListeners = this.loadBalancedRetryFactory.createRetryListeners(this.getClientName());
        if (retryListeners != null && retryListeners.length != 0) {
            retryTemplate.setListeners(retryListeners);
        }
        // retryPolicy is a RibbonLoadBalancedRetryPolicy; its lbContext holds our configured parameters
        retryTemplate.setRetryPolicy(retryPolicy == null ? new NeverRetryPolicy()
                : new FeignRetryPolicy(request.toHttpRequest(), retryPolicy, this, this.getClientName()));
        // on failure retryTemplate.execute retries right away, inside a while loop
        return retryTemplate.execute(new RetryCallback<RibbonResponse, IOException>() {
            @Override
            public RibbonResponse doWithRetry(RetryContext retryContext) throws IOException {
                Request feignRequest = null;
                //on retries the policy will choose the server and set it in the context
                //extract the server and update the request being made
                if (retryContext instanceof LoadBalancedRetryContext) {
                    ServiceInstance service = ((LoadBalancedRetryContext) retryContext).getServiceInstance();
                    if (service != null) {
                        feignRequest = ((RibbonRequest) request.replaceUri(reconstructURIWithServer(
                                new Server(service.getHost(), service.getPort()), request.getUri()))).toRequest();
                    }
                }
                if (feignRequest == null) {
                    feignRequest = request.toRequest();
                }
                Response response = request.client().execute(feignRequest, options);
                /**
                 * Check whether the status code is one of those we configured. If so, throw
                 * RibbonResponseStatusCodeException, a subclass of RetryableStatusCodeException.
                 */
                if (retryPolicy != null && retryPolicy.retryableStatusCode(response.status())) {
                    byte[] byteArray = response.body() == null ? new byte[]{} : StreamUtils.copyToByteArray(response.body().asInputStream());
                    response.close();
                    throw new RibbonResponseStatusCodeException(RetryableFeignLoadBalancer.this.clientName, response,
                            byteArray, request.getUri());
                }
                return new RibbonResponse(request.getUri(), response);
            }
        }, new LoadBalancedRecoveryCallback<RibbonResponse, Response>() {
            @Override
            protected RibbonResponse createResponse(Response response, URI uri) {
                return new RibbonResponse(uri, response);
            }
        });
    }
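Summary
As the load balancer behind OpenFeign, Ribbon is lazily loaded: the LoadBalancer, and with it the ping task and the cache refresh task, is initialized only when OpenFeign fetches the service list for the first time.
Ribbon retry helps with graceful releases, so it is advisable to set MaxAutoRetries to 0, which means that when the current instance fails the request goes straight to the next instance.
The Ribbon source code is fairly complex; some background in RxJava makes it easier to follow.
It takes real effort to work through, so questions and discussion are welcome.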