A Deep Dive into the Ribbon Source Code


A note up front: throughout this article, Ribbon runs on top of OkHttpClient, configured as follows:

# Ribbon configuration
ribbon.okhttp.enabled=true
# Connect timeout for requests; the default is 1 second, set in the RibbonClientConfiguration class
springboot-mybatis.ribbon.ConnectTimeout=2000
# Read (request processing) timeout
springboot-mybatis.ribbon.ReadTimeout=15000

Here is how the official Ribbon documentation describes its main components:

# Decides which Server to pick from the ServerList
Rule - a logic component to determine which server to return from a list
# Periodically checks whether servers are still alive
Ping - a component running in background to ensure liveness of servers
# The list of servers
ServerList - this can be static or dynamic. If it is dynamic (as used by DynamicServerListLoadBalancer), a background thread will refresh and filter the list at certain interval

When we use OpenFeign together with a eureka client, we do not need to specify a url in @FeignClient; Ribbon does the load balancing for us.
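
For example, a client interface in this setup would look roughly like the sketch below (the interface and method names are hypothetical; the service id matches the springboot-mybatis prefix used in the properties above):

import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;

// Hypothetical client: no url attribute is set, only the service id registered in
// eureka, so the call is routed through LoadBalancerFeignClient and Ribbon picks
// the actual host:port at runtime.
@FeignClient(name = "springboot-mybatis")
public interface DemoFeignClient {

    @GetMapping("/feign/feignReadTimeout")
    String feignReadTimeout();
}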


To recap the previous article, "再谈openfeign,聊聊它的源代码" (another look at OpenFeign and its source code): when no url is specified, even if okhttpclient is chosen as the underlying HTTP client, the program does not call it directly. Instead it goes through LoadBalancerFeignClient as a proxy, with code like this:

public Response execute(Request request, Request.Options options) throws IOException {
  try {
    URI asUri = URI.create(request.url());
    String clientName = asUri.getHost();
    URI uriWithoutHost = cleanUrl(request.url(), clientName);
    FeignLoadBalancer.RibbonRequest ribbonRequest = new FeignLoadBalancer.RibbonRequest(
        this.delegate, request, uriWithoutHost);
    IClientConfig requestConfig = getClientConfig(options, clientName);
    return lbClient(clientName).executeWithLoadBalancer(ribbonRequest,
        requestConfig).toResponse();
  }
  catch (ClientException e) {
    //exception handling omitted
  }
}

The executeWithLoadBalancer above calls the executeWithLoadBalancer method of AbstractLoadBalancerAwareClient:

public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
    LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);
    try {
        return command.submit(
            new ServerOperation<T>() {
                @Override
                public Observable<T> call(Server server) {
                    //finalUri here is http://192.168.0.118:8083/feign/feignReadTimeout
                    URI finalUri = reconstructURIWithServer(server, request.getUri());//this just stitches the server into the URL, nothing worth dwelling on
                    //the execute(...) call below lands in FeignLoadBalancer, a subclass of AbstractLoadBalancerAwareClient
                    S requestForServer = (S) request.replaceUri(finalUri);
                    try {
                        return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
                    } 
                    catch (Exception e) {
                        return Observable.error(e);
                    }
                }
            })
            .toBlocking()
            .single();
    } //exception handling omitted
}

In this method we first create a LoadBalancerCommand and then execute the request through its submit method. From the comments above you can see that the server parameter of call() holds the ip and port the request will be sent to; that is exactly what the Ribbon load balancer gives us. Take a look at the UML class diagram below:

[UML class diagram]

As the diagram shows, RibbonClientConfiguration defines the RibbonLoadBalancerContext bean and the ZoneAwareLoadBalancer bean, and RibbonLoadBalancerContext wraps the ZoneAwareLoadBalancer.
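
For reference, the relevant wiring in RibbonClientConfiguration looks roughly like the simplified sketch below (reconstructed for illustration, not copied verbatim from the source; the real class also consults a PropertiesFactory and defines several more beans such as IRule, IPing and ServerList):

import com.netflix.client.RetryHandler;
import com.netflix.client.config.IClientConfig;
import com.netflix.loadbalancer.ILoadBalancer;
import com.netflix.loadbalancer.IPing;
import com.netflix.loadbalancer.IRule;
import com.netflix.loadbalancer.Server;
import com.netflix.loadbalancer.ServerList;
import com.netflix.loadbalancer.ServerListFilter;
import com.netflix.loadbalancer.ServerListUpdater;
import com.netflix.loadbalancer.ZoneAwareLoadBalancer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.cloud.netflix.ribbon.RibbonLoadBalancerContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Simplified sketch of the two bean definitions discussed above: the
// ZoneAwareLoadBalancer is assembled from the other Ribbon components, and
// RibbonLoadBalancerContext wraps it.
@Configuration
public class RibbonWiringSketch {

    @Bean
    @ConditionalOnMissingBean
    public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
            ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
            IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
        return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
                serverListFilter, serverListUpdater);
    }

    @Bean
    @ConditionalOnMissingBean
    public RibbonLoadBalancerContext ribbonLoadBalancerContext(ILoadBalancer loadBalancer,
            IClientConfig config, RetryHandler retryHandler) {
        // the context holds a reference to the (ZoneAware) load balancer
        return new RibbonLoadBalancerContext(loadBalancer, config, retryHandler);
    }
}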


Where the Server comes from


When LoadBalancerCommand executes submit, it picks a Server (via the selectServer method); that Server carries the address and port Feign will send the HTTP request to.


The selectServer() method is what obtains the Server object; here is its source:

private Observable<Server> selectServer() {
    return Observable.create(new OnSubscribe<Server>() {
        @Override
        public void call(Subscriber<? super Server> next) {
            try {
                Server server = loadBalancerContext.getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey);   
                next.onNext(server);
                next.onCompleted();
            } catch (Exception e) {
                next.onError(e);
            }
        }
    });
}

The getServerFromLoadBalancer method above lives in LoadBalancerContext:

public Server getServerFromLoadBalancer(@Nullable URI original, @Nullable Object loadBalancerKey) throws ClientException {
    String host = null;
    int port = -1;
    if (original != null) {
      //host is null here (the partial URI carries no host)
        host = original.getHost();
    }
    if (original != null) {
        Pair<String, Integer> schemeAndPort = deriveSchemeAndPortFromPartialUri(original);      
        //this returns -1 here (no port in the partial URI)
        port = schemeAndPort.second();
    }
    // Various Supported Cases
    // The loadbalancer to use and the instances it has is based on how it was registered
    // In each of these cases, the client might come in using Full Url or Partial URL
  //this is the DynamicServerListLoadBalancer
    ILoadBalancer lb = getLoadBalancer();
    if (host == null) {
        // Partial URI or no URI Case
        // well we have to just get the right instances from lb - or we fall back
    //the lb here is the ZoneAwareLoadBalancer
        if (lb != null){
            Server svc = lb.chooseServer(loadBalancerKey);
            if (svc == null){
                throw new ClientException(ClientException.ErrorType.GENERAL,
                        "Load balancer does not have available server for client: "
                                + clientName);
            }
            host = svc.getHost();
            if (host == null){
                throw new ClientException(ClientException.ErrorType.GENERAL,
                        "Invalid Server for :" + svc);
            }
            logger.debug("{} using LB returned Server: {} for request {}", new Object[]{clientName, svc, original});
            return svc;
        } else {
        //vipAddresses handling omitted
        }
    } else {
      //this branch omitted
    }
    // end of creating final URL
    if (host == null){
        throw new ClientException(ClientException.ErrorType.GENERAL,"Request contains no HOST to talk to");
    }
    // just verify that at this point we have a full URL
    return new Server(host, port);
}

The LoadBalancer used here is ZoneAwareLoadBalancer. Its inheritance hierarchy is shown in the UML class diagram below:

[UML class diagram]

The chooseServer method is in BaseLoadBalancer, the parent class of ZoneAwareLoadBalancer:

public Server chooseServer(Object key) {
    if (counter == null) {
        counter = createCounter();
    }
  //the counter is BasicCounter{config=MonitorConfig{name=LoadBalancer_ChooseServer, tags=COUNTER, policy=DefaultPublishingPolicy}, count=0}
    counter.increment();
    if (rule == null) {
        return null;
    } else {
        try {
        //the rule here is ZoneAvoidanceRule, but the choose method is in its parent class PredicateBasedRule
            return rule.choose(key);
        } catch (Exception e) {
            logger.warn("LoadBalancer [{}]:  Error choosing server for key {}", name, key, e);
            return null;
        }
    }
}

The class hierarchy of ZoneAvoidanceRule is also fairly involved; we will come back to it later.


Now let's look at the choose method in PredicateBasedRule:

public Server choose(Object key) {
    ILoadBalancer lb = getLoadBalancer();
  //the LoadBalancer here is the ZoneAwareLoadBalancer
    Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
    if (server.isPresent()) {
        return server.get();
    } else {
        return null;
    }       
}

At this point we have met one of Ribbon's key components: the ServerList.
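
Just to make the concept concrete before digging into the eureka-backed implementation, here is a minimal, hand-written "static" ServerList (a sketch of my own, not Ribbon code; the hosts and ports are made up):

import java.util.Arrays;
import java.util.List;

import com.netflix.loadbalancer.Server;
import com.netflix.loadbalancer.ServerList;

// A trivially static ServerList, the opposite of the dynamic, eureka-backed one
// used by DynamicServerListLoadBalancer. The two interface methods below are
// exactly what the load balancer polls.
public class StaticServerList implements ServerList<Server> {

    private final List<Server> servers = Arrays.asList(
            new Server("192.168.0.118", 8083),
            new Server("192.168.0.119", 8083));

    @Override
    public List<Server> getInitialListOfServers() {
        return servers;
    }

    @Override
    public List<Server> getUpdatedListOfServers() {
        return servers; // static: nothing to refresh
    }
}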


Fetching the ServerList from eureka


As the previous section showed, the servers actually come from ZoneAwareLoadBalancer's getAllServers, a method defined in its parent class BaseLoadBalancer:

public List<Server> getAllServers() {
    return Collections.unmodifiableList(allServerList);
}

The allServerList field also lives in BaseLoadBalancer, the parent of ZoneAwareLoadBalancer. So how does it get populated? Look at the UML class diagram below:

[UML class diagram]

DiscoveryClient is the core here. CacheRefreshThread runs on a scheduled thread pool and uses EurekaHttpClient to periodically pull the service list from eureka and update the local cache. The key method is fetchRegistry, which is also called when the DiscoveryClient is created; DiscoveryClient itself is initialized in the EurekaClientAutoConfiguration configuration class.

private boolean fetchRegistry(boolean forceFullRegistryFetch) {
    Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
    try {
        // If the delta is disabled or if it is the first time, get all
        // applications
        Applications applications = getApplications();
        if (clientConfig.shouldDisableDelta()
                || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                || forceFullRegistryFetch
                || (applications == null)
                || (applications.getRegisteredApplications().size() == 0)
                || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
        {
            //logging omitted
            getAndStoreFullRegistry();
        } else {
            getAndUpdateDelta(applications);
        }
        applications.setAppsHashCode(applications.getReconcileHashCode());
        logTotalInstances();
    } catch (Throwable e) {
        logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e);
        return false;
    } finally {
        if (tracer != null) {
            tracer.stop();
        }
    }
    // Notify about cache refresh before updating the instance remote status
    onCacheRefreshed();
    // Update remote status based on refreshed data held in the cache
    updateInstanceRemoteStatus();
    // registry was fetched successfully, so return true
    return true;
}

Let's look at the getAndStoreFullRegistry method called above:

private void getAndStoreFullRegistry() throws Throwable {
    long currentUpdateGeneration = fetchRegistryGeneration.get();
    logger.info("Getting all instance registry info from the eureka server");
    Applications apps = null;
  //this goes through EurekaHttpClientDecorator to send the request to eureka
    EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
            ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
            : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
    if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
        apps = httpResponse.getEntity();
    }
    logger.info("The response status is {}", httpResponse.getStatusCode());
    if (apps == null) {
        logger.error("The application is null for some reason. Not storing this information");
    } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
      /**
      * The returned Applications is stored in localRegionApps,
      * whose type is an AtomicReference<Applications>.
      *
      * Pay close attention to filterAndShuffle: it calls Applications#shuffleInstances,
      * which puts the fetched instances into virtualHostNameAppMap, and that is what
      * the load balancer later reads from.
      */
        localRegionApps.set(this.filterAndShuffle(apps));
        logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
    } else {
        logger.warn("Not updating applications as another thread is updating it already");
    }
}

EurekaHttpClientDecorator's getApplications eventually calls getApplicationsInternal in the AbstractJerseyEurekaHttpClient class:

private EurekaHttpResponse<Applications> getApplicationsInternal(String urlPath, String[] regions) {
    ClientResponse response = null;
    String regionsParamValue = null;
    try {
        WebResource webResource = jerseyClient.resource(serviceUrl).path(urlPath);
        if (regions != null && regions.length > 0) {
            regionsParamValue = StringUtil.join(regions);
            webResource = webResource.queryParam("regions", regionsParamValue);
        }
        Builder requestBuilder = webResource.getRequestBuilder();
        addExtraHeaders(requestBuilder);
    //a GET request is sent here
        response = requestBuilder.accept(MediaType.APPLICATION_JSON_TYPE).get(ClientResponse.class);
        Applications applications = null;
        if (response.getStatus() == Status.OK.getStatusCode() && response.hasEntity()) {
            applications = response.getEntity(Applications.class);
        }
    //the response wraps the applications
        return anEurekaHttpResponse(response.getStatus(), Applications.class)
                .headers(headersOf(response))
                .entity(applications)
                .build();
    } finally {
        //cleanup code omitted
    }
}

When ZoneAwareLoadBalancer fetches its server list, it reads exactly from that virtualHostNameAppMap inside Applications. The UML class diagram:

[UML class diagram]

At this point the whole flow is connected: DiscoveryClient acts like a producer, pulling the service list from eureka and storing it in Applications, while ZoneAwareLoadBalancer acts like a consumer, reading the service list from Applications.
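
To make the consumer side concrete, the sketch below shows roughly how a server list can be read back out of that eureka cache (my own simplified illustration of the idea behind DiscoveryEnabledNIWSServerList, not the actual Ribbon code):

import java.util.ArrayList;
import java.util.List;

import com.netflix.appinfo.InstanceInfo;
import com.netflix.discovery.EurekaClient;
import com.netflix.niws.loadbalancer.DiscoveryEnabledServer;

// Rough sketch of the consumer side, assuming an injected EurekaClient.
public class ServerListSketch {

    private final EurekaClient eurekaClient;

    public ServerListSketch(EurekaClient eurekaClient) {
        this.eurekaClient = eurekaClient;
    }

    public List<DiscoveryEnabledServer> serversFor(String vipAddress) {
        // getInstancesByVipAddress reads from the Applications snapshot
        // (virtualHostNameAppMap) that fetchRegistry() keeps refreshing.
        List<InstanceInfo> instances =
                eurekaClient.getInstancesByVipAddress(vipAddress, false);
        List<DiscoveryEnabledServer> servers = new ArrayList<>();
        for (InstanceInfo info : instances) {
            if (info.getStatus() == InstanceInfo.InstanceStatus.UP) {
                servers.add(new DiscoveryEnabledServer(info, false, true));
            }
        }
        return servers;
    }
}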


Cache refresh


Ribbon's cache is refreshed in two places. The first is driven by the configuration we used earlier:


springboot-mybatis.ribbon.ServerListRefreshInterval=3

This key is defined in the CommonClientConfigKey class and is used in PollingServerListUpdater:

public synchronized void start(final UpdateAction updateAction) {
    if (isActive.compareAndSet(false, true)) {
        final Runnable wrapperRunnable = new Runnable() {
            @Override
            public void run() {
                if (!isActive.get()) {
                    if (scheduledFuture != null) {
                        scheduledFuture.cancel(true);
                    }
                    return;
                }
                try {
                    updateAction.doUpdate();
                    lastUpdated = System.currentTimeMillis();
                } catch (Exception e) {
                    logger.warn("Failed one update cycle", e);
                }
            }
        };
        scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
                wrapperRunnable,
                initialDelayMs,
        /**this is the ServerListRefreshInterval configured above**/
                refreshIntervalMs,
                TimeUnit.MILLISECONDS
        );
    } else {
        logger.info("Already active, no-op");
    }
}

The updateAction above is implemented in the DynamicServerListLoadBalancer class:

protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
    @Override
    public void doUpdate() {
      /**this is where the ServerList gets refreshed**/
        updateListOfServers();
    }
};

The other place the cache gets refreshed is in DiscoveryClient:

private void initScheduledTasks() {
    if (clientConfig.shouldFetchRegistry()) {
        //this interval defaults to 30s
        int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
        int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
    //scheduled thread pool
        scheduler.schedule(
                new TimedSupervisorTask(
                        "cacheRefresh",
                        scheduler,
                        cacheRefreshExecutor,
                        registryFetchIntervalSeconds,
                        TimeUnit.SECONDS,
                        expBackOffBound,
            /**the task the scheduled pool will run**/
                        new CacheRefreshThread()
                ),
                registryFetchIntervalSeconds, TimeUnit.SECONDS);
    }
}

CacheRefreshThread is defined as follows:

class CacheRefreshThread implements Runnable {
    public void run() {
      //this enters the code that refreshes the registry cache
        refreshRegistry();
    }
}
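
One practical consequence of these two layers of polling: with the eureka client fetching the registry every registryFetchIntervalSeconds (30s by default) and PollingServerListUpdater re-reading that cache every ServerListRefreshInterval (3s with the configuration above), a newly registered instance can in the worst case take roughly 30s + 3s to appear in Ribbon's allServerList, plus whatever response caching the eureka server itself applies.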

About ping


The ping mentioned in the Ribbon documentation above probes whether the servers in the server list are still alive; servers that fail the check are dropped from the list of servers considered up.


The ping task is set up in the BaseLoadBalancer constructor:

public BaseLoadBalancer() {
    this.name = DEFAULT_NAME;
    this.ping = null;
    setRule(DEFAULT_RULE);
  //the ping task is started here
    setupPingTask();
    lbStats = new LoadBalancerStats(DEFAULT_NAME);
}
void setupPingTask() {
    if (canSkipPing()) {
        return;
    }
    if (lbTimer != null) {
        lbTimer.cancel();
    }
    lbTimer = new ShutdownEnabledTimer("NFLoadBalancer-PingTimer-" + name,
            true);
  //pingIntervalSeconds defaults to 10 (seconds); the * 1000 below converts it to milliseconds
    lbTimer.schedule(new PingTask(), 0, pingIntervalSeconds * 1000);
    forceQuickPing();
}
class PingTask extends TimerTask {
    public void run() {
        try {
          new Pinger(pingStrategy).runPinger();
        } catch (Exception e) {
            logger.error("LoadBalancer [{}]: Error pinging", name, e);
        }
    }
}

Let's look at the core logic of the pinger:

public void runPinger() throws Exception {
    if (!pingInProgress.compareAndSet(false, true)) { 
        return; // Ping in progress - nothing to do
    }
    // we are "in" - we get to Ping
    Server[] allServers = null;
    boolean[] results = null;
    Lock allLock = null;
    Lock upLock = null;
    try {
        /*
         * The readLock should be free unless an addServer operation is
         * going on...
         */
        allLock = allServerLock.readLock();
        allLock.lock();
        allServers = allServerList.toArray(new Server[allServerList.size()]);
        allLock.unlock();
        int numCandidates = allServers.length;
    //ping every server and get back a boolean array of results
        results = pingerStrategy.pingServers(ping, allServers);
        final List<Server> newUpList = new ArrayList<Server>();
        final List<Server> changedServers = new ArrayList<Server>();
        for (int i = 0; i < numCandidates; i++) {
            boolean isAlive = results[i];
            Server svr = allServers[i];
            boolean oldIsAlive = svr.isAlive();
            svr.setAlive(isAlive);
            if (oldIsAlive != isAlive) {
                changedServers.add(svr);
                logger.debug("LoadBalancer [{}]:  Server [{}] status changed to {}", 
                name, svr.getId(), (isAlive ? "ALIVE" : "DEAD"));
            }
            if (isAlive) {
          //add servers that are alive to the new up list
                newUpList.add(svr);
            }
        }
        upLock = upServerLock.writeLock();
        upLock.lock();
    //replace the old up list
        upServerList = newUpList;
        upLock.unlock();
        //no implementation of ServerStatusChangeListener was found here
        notifyServerStatusChangeListener(changedServers);
    } finally {
        pingInProgress.set(false);
    }
}

So a Server that fails the ping is removed from the list of up servers.
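
For completeness, the IPing contract itself is tiny; the hypothetical implementation below probes an HTTP health endpoint (the /actuator/health path is my assumption, not something Ribbon mandates). In a eureka setup the default is typically NIWSDiscoveryPing, which only checks the instance status already cached from eureka:

import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;

import com.netflix.loadbalancer.IPing;
import com.netflix.loadbalancer.Server;

// Hypothetical IPing: a server counts as alive if its health endpoint answers 200.
public class HttpHealthPing implements IPing {

    @Override
    public boolean isAlive(Server server) {
        try {
            // Server#getId() returns "host:port"
            URL url = new URL("http://" + server.getId() + "/actuator/health");
            HttpURLConnection conn = (HttpURLConnection) url.openConnection();
            conn.setConnectTimeout(1000);
            conn.setReadTimeout(1000);
            return conn.getResponseCode() == 200;
        } catch (IOException e) {
            return false;
        }
    }
}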


A few words about the Rule


As mentioned when we discussed chooseServer, the code is in BaseLoadBalancer, the parent class of ZoneAwareLoadBalancer:

public Server chooseServer(Object key) {
    if (counter == null) {
        counter = createCounter();
    }
  //the counter is BasicCounter{config=MonitorConfig{name=LoadBalancer_ChooseServer, tags=COUNTER, policy=DefaultPublishingPolicy}, count=0}
    counter.increment();
    if (rule == null) {
        return null;
    } else {
        try {
        //the rule here is ZoneAvoidanceRule, but the choose method is in its parent class PredicateBasedRule
            return rule.choose(key);
        } catch (Exception e) {
            logger.warn("LoadBalancer [{}]:  Error choosing server for key {}", name, key, e);
            return null;
        }
    }
}

The Rule decides which Server from the server list gets returned. Its inheritance hierarchy is shown in the diagram below:

[UML class diagram]
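
Before going further, here is how small a rule can be. The sketch below is a hand-rolled random rule, shown purely to illustrate the extension point (it is not what this article's setup uses; that remains ZoneAvoidanceRule):

import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

import com.netflix.client.config.IClientConfig;
import com.netflix.loadbalancer.AbstractLoadBalancerRule;
import com.netflix.loadbalancer.ILoadBalancer;
import com.netflix.loadbalancer.Server;

// Minimal custom rule: pick a random reachable server.
public class SimpleRandomRule extends AbstractLoadBalancerRule {

    @Override
    public void initWithNiwsConfig(IClientConfig clientConfig) {
        // no per-client configuration needed for this sketch
    }

    @Override
    public Server choose(Object key) {
        ILoadBalancer lb = getLoadBalancer();
        if (lb == null) {
            return null;
        }
        List<Server> reachable = lb.getReachableServers();
        if (reachable.isEmpty()) {
            return null;
        }
        return reachable.get(ThreadLocalRandom.current().nextInt(reachable.size()));
    }
}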

Now let's look at PredicateBasedRule's choose method:

public Server choose(Object key) {
    ILoadBalancer lb = getLoadBalancer();
  //getPredicate() here returns an AbstractServerPredicate
    Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
    if (server.isPresent()) {
        return server.get();
    } else {
        return null;
    }       
}

AbstractServerPredicate's chooseRoundRobinAfterFiltering method looks like this:

public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) {
    //loadBalancerKey is null here
    List<Server> eligible = getEligibleServers(servers, loadBalancerKey);
    if (eligible.size() == 0) {
        return Optional.absent();
    }
    return Optional.of(eligible.get(incrementAndGetModulo(eligible.size())));
}
public List<Server> getEligibleServers(List<Server> servers, Object loadBalancerKey) {
    if (loadBalancerKey == null) {
        return ImmutableList.copyOf(Iterables.filter(servers, this.getServerOnlyPredicate()));            
    } else {
        List<Server> results = Lists.newArrayList();
        for (Server server: servers) {
            if (this.apply(new PredicateKey(loadBalancerKey, server))) {
                results.add(server);
            }
        }
        return results;            
    }
}
public static AbstractServerPredicate ofKeyPredicate(final Predicate<PredicateKey> p) {
    return new AbstractServerPredicate() {
        @Override
        @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP")
        public boolean apply(PredicateKey input) {
            return p.apply(input);
        }            
    };        
}

The p in the method above is composed of 2 predicates; a debug session shows:

p = {Predicates$AndPredicate@14490} "Predicates.and(com.netflix.loadbalancer.ZoneAvoidancePredicate@70d2b344,com.netflix.loadbalancer.AvailabilityPredicate@7380394)"
 components = {ArrayList@14673}  size = 2
  0 = {ZoneAvoidancePredicate@14675} 
  1 = {AvailabilityPredicate@14676}

Let's look at their apply methods, starting with AvailabilityPredicate:

public boolean apply(@Nullable PredicateKey input) {
    LoadBalancerStats stats = getLBStats();
    if (stats == null) {
        return true;
    }
    return !shouldSkipServer(stats.getSingleServerStat(input.getServer()));
}
private boolean shouldSkipServer(ServerStats stats) {  
    //decide based on the ServerStats
    if ((CIRCUIT_BREAKER_FILTERING.get() && stats.isCircuitBreakerTripped()) 
            || stats.getActiveRequestsCount() >= activeConnectionsLimit.get()) {
        return true;
    }
    return false;
}

Now for ZoneAvoidancePredicate's apply method. The comments in this code are quite clear: if there is at most one zone, or no stats are available, it returns true; if the server's zone is not among the available zones, it returns false:

public boolean apply(@Nullable PredicateKey input) {
    if (!ENABLED.get()) {
        return true;
    }
    String serverZone = input.getServer().getZone();
    if (serverZone == null) {
        // there is no zone information from the server, we do not want to filter
        // out this server
        return true;
    }
    LoadBalancerStats lbStats = getLBStats();
    if (lbStats == null) {
        // no stats available, do not filter
        return true;
    }
    if (lbStats.getAvailableZones().size() <= 1) {
        // only one zone is available, do not filter
        return true;
    }
    Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
    if (!zoneSnapshot.keySet().contains(serverZone)) {
        // The server zone is unknown to the load balancer, do not filter it out 
        return true;
    }
    logger.debug("Zone snapshots: {}", zoneSnapshot);
    Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());
    logger.debug("Available zones: {}", availableZones);
    if (availableZones != null) {
        return availableZones.contains(input.getServer().getZone());
    } else {
        return false;
    }
}

These 2 predicates are passed in through CompositePredicate's builder:

Builder(AbstractServerPredicate ...primaryPredicates) {
    toBuild = new CompositePredicate();
    Predicate<PredicateKey> chain = Predicates.<PredicateKey>and(primaryPredicates);
    toBuild.delegate =  AbstractServerPredicate.ofKeyPredicate(chain);                
}

And the builder is invoked when ZoneAvoidanceRule is constructed:

public ZoneAvoidanceRule() {
    super();
    ZoneAvoidancePredicate zonePredicate = new ZoneAvoidancePredicate(this);
    AvailabilityPredicate availabilityPredicate = new AvailabilityPredicate(this);
    compositePredicate = createCompositePredicate(zonePredicate, availabilityPredicate);
}
private CompositePredicate createCompositePredicate(ZoneAvoidancePredicate p1, AvailabilityPredicate p2) {
    return CompositePredicate.withPredicates(p1, p2)
                         .addFallbackPredicate(p2)
                         .addFallbackPredicate(AbstractServerPredicate.alwaysTrue())
                         .build();
}

Of course, the predicates also support custom configuration, which is worth exploring if you are interested; the configuration is picked up here:

public void initWithNiwsConfig(IClientConfig clientConfig) {
    ZoneAvoidancePredicate zonePredicate = new ZoneAvoidancePredicate(this, clientConfig);
    AvailabilityPredicate availabilityPredicate = new AvailabilityPredicate(this, clientConfig);
    compositePredicate = createCompositePredicate(zonePredicate, availabilityPredicate);
}
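
In practice the easier customization hook is one level up: replacing the whole rule (and with it the predicates) for a single client. A hedged sketch of the standard Spring Cloud override, assuming this configuration class is referenced from @RibbonClient(name = "springboot-mybatis", configuration = CustomRibbonConfig.class) and kept out of the main component scan:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.netflix.loadbalancer.IRule;
import com.netflix.loadbalancer.RandomRule;

// Per-client override: because RibbonClientConfiguration declares its IRule bean
// with @ConditionalOnMissingBean, this bean replaces ZoneAvoidanceRule for the
// client that imports this configuration.
@Configuration
public class CustomRibbonConfig {

    @Bean
    public IRule ribbonRule() {
        return new RandomRule();
    }
}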

Ribbon retries


Ribbon's retry feature helps with graceful rolling releases of a service. One thing to note first: Ribbon's load-balanced retries depend on Spring Retry, so the following dependency must be added to the pom:

<dependency>
    <groupId>org.springframework.retry</groupId>
    <artifactId>spring-retry</artifactId>
    <version>1.2.5.RELEASE</version>
</dependency>

The retry configuration looks like this:

# Ribbon cache refresh interval, 30s by default, lowered to 3s here
springboot-mybatis.ribbon.ServerListRefreshInterval=3
# Max retries on the same instance, not counting the first call
springboot-mybatis.ribbon.MaxAutoRetries=1
# Max retries on other instances, not counting the first call
springboot-mybatis.ribbon.MaxAutoRetriesNextServer=1
# Whether to retry all operations; if set to true, POST requests are retried too
springboot-mybatis.ribbon.OkToRetryOnAllOperations=false
# HTTP status codes that trigger a retry
springboot-mybatis.ribbon.retryableStatusCodes=404,408,502,500
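
With the values above, one logical call can make at most (1 + MaxAutoRetries) × (1 + MaxAutoRetriesNextServer) = 2 × 2 = 4 attempts, spread over up to 2 instances, so the ReadTimeout configured earlier should be sized with that multiplier in mind.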

Here is a UML class diagram:

[UML class diagram]

The core retry code is in the submit method of the LoadBalancerCommand class; the original Javadoc below is well worth reading:

/**
 * Create an {@link Observable} that once subscribed execute network call asynchronously with a server chosen by load balancer.
 * If there are any errors that are indicated as retriable by the {@link RetryHandler}, they will be consumed internally by the
 * function and will not be observed by the {@link Observer} subscribed to the returned {@link Observable}. If number of retries has
 * exceeds the maximal allowed, a final error will be emitted by the returned {@link Observable}. Otherwise, the first successful
 * result during execution and retries will be emitted.
 */
public Observable<T> submit(final ServerOperation<T> operation) {
    final ExecutionInfoContext context = new ExecutionInfoContext();
    if (listenerInvoker != null) {
        try {
            listenerInvoker.onExecutionStart();
        } catch (AbortExecutionException e) {
            return Observable.error(e);
        }
    }
    //the MaxAutoRetries and MaxAutoRetriesNextServer configured above, both 1 here
    final int maxRetrysSame = retryHandler.getMaxRetriesOnSameServer();
    final int maxRetrysNext = retryHandler.getMaxRetriesOnNextServer();
    // Use the load balancer
    Observable<T> o = 
            (server == null ? selectServer() : Observable.just(server))
            .concatMap(new Func1<Server, Observable<T>>() {
                @Override
                // Called for each server being selected
                public Observable<T> call(Server server) {
                    context.setServer(server);
                    final ServerStats stats = loadBalancerContext.getServerStats(server);
                    // Called for each attempt and retry
                    Observable<T> o = Observable
                            .just(server)
                            .concatMap(new Func1<Server, Observable<T>>() {
                                @Override
                                public Observable<T> call(final Server server) {
                                    context.incAttemptCount();
                                    loadBalancerContext.noteOpenConnection(stats);
                                    //some code omitted
                                    final Stopwatch tracer = loadBalancerContext.getExecuteTracer().start();
                                    return operation.call(server).doOnEach(new Observer<T>() {
                                        private T entity;
                                        @Override
                                        public void onCompleted() {
                                            recordStats(tracer, stats, entity, null);
                                            // TODO: What to do if onNext or onError are never called?
                                        }
                                        @Override
                                        public void onError(Throwable e) {
                                            recordStats(tracer, stats, null, e);
                                            logger.debug("Got error {} when executed on server {}", e, server);
                                            if (listenerInvoker != null) {
                                                listenerInvoker.onExceptionWithServer(e, context.toExecutionInfo());
                                            }
                                        }
                                        @Override
                                        public void onNext(T entity) {
                                            this.entity = entity;
                                            if (listenerInvoker != null) {
                                                listenerInvoker.onExecutionSuccess(entity, context.toExecutionInfo());
                                            }
                                        }                            
                                        private void recordStats(Stopwatch tracer, ServerStats stats, Object entity, Throwable exception) {
                                            tracer.stop();
                                            loadBalancerContext.noteRequestCompletion(stats, entity, exception, tracer.getDuration(TimeUnit.MILLISECONDS), retryHandler);
                                        }
                                    });
                                }
                            });
                    if (maxRetrysSame > 0) 
                        o = o.retry(retryPolicy(maxRetrysSame, true));
                    return o;
                }
            });
    //the initial call comes through here
    if (maxRetrysNext > 0 && server == null) 
      //retryPolicy checks whether the error is retriable
        o = o.retry(retryPolicy(maxRetrysNext, false));
    return o.onErrorResumeNext(new Func1<Throwable, Observable<T>>() {
        @Override
        public Observable<T> call(Throwable e) {
            if (context.getAttemptCount() > 0) {
          //once the retry budget is exceeded, an exception is thrown
                if (maxRetrysNext > 0 && context.getServerAttemptCount() == (maxRetrysNext + 1)) {
                    e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_NEXTSERVER_EXCEEDED,
                            "Number of retries on next server exceeded max " + maxRetrysNext
                            + " retries, while making a call for: " + context.getServer(), e);
                }
                else if (maxRetrysSame > 0 && context.getAttemptCount() == (maxRetrysSame + 1)) {
                    e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_EXEEDED,
                            "Number of retries exceeded max " + maxRetrysSame
                            + " retries, while making a call for: " + context.getServer(), e);
                }
            }
            if (listenerInvoker != null) {
                listenerInvoker.onExecutionFailed(e, context.toFinalExecutionInfo());
            }
            return Observable.error(e);
        }
    });
}

The retryPolicy in this method catches the error and checks whether it counts as a retriable exception; in our setup that corresponds to the 4 status codes we configured: 404, 408, 502 and 500.
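
To make that concrete, here is a hedged reconstruction of the predicate that retryPolicy(...) hands to Observable.retry: a (tryCount, error) function that stops once the budget is used up and otherwise asks the RetryHandler whether the error is retriable. Treat it as an illustration of the idea, not a verbatim copy of the Ribbon source:

import com.netflix.client.RetryHandler;

import rx.functions.Func2;

// Sketch of the kind of retry predicate LoadBalancerCommand uses.
public final class RetryPredicateSketch {

    public static Func2<Integer, Throwable, Boolean> retryPolicy(
            final RetryHandler retryHandler, final int maxRetries, final boolean sameServer) {
        return new Func2<Integer, Throwable, Boolean>() {
            @Override
            public Boolean call(Integer tryCount, Throwable e) {
                if (tryCount > maxRetries) {
                    return false; // budget exhausted, let the error propagate
                }
                // e.g. an exception wrapping one of the configured status codes
                return retryHandler.isRetriableException(e, sameServer);
            }
        };
    }
}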


Now let's look at RetryableFeignLoadBalancer's execute method:

public RibbonResponse execute(final RibbonRequest request, IClientConfig configOverride)
    throws IOException {
  final Request.Options options;
  if (configOverride != null) {
    RibbonProperties ribbon = RibbonProperties.from(configOverride);
    options = new Request.Options(
        ribbon.connectTimeout(this.connectTimeout),
        ribbon.readTimeout(this.readTimeout));
  }
  else {
    options = new Request.Options(this.connectTimeout, this.readTimeout);
  }
  final LoadBalancedRetryPolicy retryPolicy = loadBalancedRetryFactory.createRetryPolicy(this.getClientName(), this);
  RetryTemplate retryTemplate = new RetryTemplate();
  BackOffPolicy backOffPolicy = loadBalancedRetryFactory.createBackOffPolicy(this.getClientName());
  retryTemplate.setBackOffPolicy(backOffPolicy == null ? new NoBackOffPolicy() : backOffPolicy);
  RetryListener[] retryListeners = this.loadBalancedRetryFactory.createRetryListeners(this.getClientName());
  if (retryListeners != null && retryListeners.length != 0) {
    retryTemplate.setListeners(retryListeners);
  }
  //retryPolicy is a RibbonLoadBalancedRetryPolicy; its lbContext holds the parameters we configured
  retryTemplate.setRetryPolicy(retryPolicy == null ? new NeverRetryPolicy()
      : new FeignRetryPolicy(request.toHttpRequest(), retryPolicy, this, this.getClientName()));
  //on failure, retryTemplate.execute retries right away, using a while loop internally
  return retryTemplate.execute(new RetryCallback<RibbonResponse, IOException>() {
    @Override
    public RibbonResponse doWithRetry(RetryContext retryContext) throws IOException {
      Request feignRequest = null;
      //on retries the policy will choose the server and set it in the context
      //extract the server and update the request being made
      if (retryContext instanceof LoadBalancedRetryContext) {
        ServiceInstance service = ((LoadBalancedRetryContext) retryContext).getServiceInstance();
        if (service != null) {
          feignRequest = ((RibbonRequest) request.replaceUri(reconstructURIWithServer(new Server(service.getHost(), service.getPort()), request.getUri()))).toRequest();
        }
      }
      if (feignRequest == null) {
        feignRequest = request.toRequest();
      }
      Response response = request.client().execute(feignRequest, options);
      /**
       * Check whether the status code is one of those we configured; if so, throw a
       * RibbonResponseStatusCodeException, which is a subclass of RetryableStatusCodeException.
       */
      if (retryPolicy != null && retryPolicy.retryableStatusCode(response.status())) {
        byte[] byteArray = response.body() == null ? new byte[]{} : StreamUtils.copyToByteArray(response.body().asInputStream());
        response.close();
        throw new RibbonResponseStatusCodeException(RetryableFeignLoadBalancer.this.clientName, response,
            byteArray, request.getUri());
      }
      return new RibbonResponse(request.getUri(), response);
    }
  }, new LoadBalancedRecoveryCallback<RibbonResponse, Response>() {
    @Override
    protected RibbonResponse createResponse(Response response, URI uri) {
      return new RibbonResponse(uri, response);
    }
  });
}
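
If Spring Retry is new to you, the execute(...) call above is easier to follow with a stand-alone example: the RetryCallback is re-invoked on every exception until the policy gives up, and only then does the RecoveryCallback run. This demo is independent of Ribbon:

import org.springframework.retry.RecoveryCallback;
import org.springframework.retry.RetryCallback;
import org.springframework.retry.RetryContext;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;

public class RetryTemplateDemo {

    public static void main(String[] args) throws Exception {
        RetryTemplate template = new RetryTemplate();
        template.setRetryPolicy(new SimpleRetryPolicy(3)); // at most 3 attempts

        String result = template.execute(new RetryCallback<String, Exception>() {
            @Override
            public String doWithRetry(RetryContext context) throws Exception {
                System.out.println("attempt " + (context.getRetryCount() + 1));
                if (context.getRetryCount() < 2) {
                    throw new IllegalStateException("simulated failure");
                }
                return "ok";
            }
        }, new RecoveryCallback<String>() {
            @Override
            public String recover(RetryContext context) {
                return "fallback"; // runs only if all attempts fail
            }
        });

        System.out.println(result); // prints "ok" on the 3rd attempt
    }
}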

Summary


As OpenFeign's load balancer, Ribbon is lazily initialized: only when OpenFeign needs the server list for the first time does it create the LoadBalancer, and only then are the ping and cache-refresh tasks started.


Ribbon retries are useful for graceful releases, so the recommendation is to set MaxAutoRetries to 0, that is, when the current instance fails, retry directly against the next instance.


Ribbon's source code is fairly complex; a bit of RxJava background makes it easier to follow.


Ribbon's source code can be quite a brain teaser; questions and discussion are welcome.
