Spring Cloud Ribbon 全解 (5) - 基本组件实现源码(3)

本文涉及的产品
网络型负载均衡 NLB,每月750个小时 15LCU
应用型负载均衡 ALB,每月750个小时 15LCU
传统型负载均衡 CLB,每月750个小时 15LCU
简介: Spring Cloud Ribbon 全解 (5) - 基本组件实现源码(3)

5. 负载均衡选取规则实现的接口IRule


微信图片_20220624113151.jpg


我们这里只看默认的实现ZoneAvoidanceRule相关的

IRule离不开负载均衡数据,这个数据如之前所说,是ILoadBalancer的实现BaseLoadBalancer一部分。所以对于IRule的抽象类,需要设置ILoadBalancer来获取负载均衡统计数据:


AbstractLoadBalancerRule


public abstract class AbstractLoadBalancerRule implements IRule, IClientConfigAware {
    private ILoadBalancer lb;
    @Override
    public void setLoadBalancer(ILoadBalancer lb){
        this.lb = lb;
    }
    @Override
    public ILoadBalancer getLoadBalancer(){
        return lb;
    }      
}


RoundRobinRule.java


是最常见最基本的负载均衡规则-轮询的实现:

public Server choose(ILoadBalancer lb, Object key) {
    if (lb == null) {
        log.warn("no load balancer");
        return null;
    }
    Server server = null;
    int count = 0;
    while (server == null && count++ < 10) {
        List<Server> reachableServers = lb.getReachableServers();
        List<Server> allServers = lb.getAllServers();
        int upCount = reachableServers.size();
        int serverCount = allServers.size();
        //如果没有Server或者没有UP状态的Server
        if ((upCount == 0) || (serverCount == 0)) {
            log.warn("No up servers available from load balancer: " + lb);
            return null;
        }
        int nextServerIndex = incrementAndGetModulo(serverCount);
        server = allServers.get(nextServerIndex);
        if (server == null) {
            /* Transient. */
            Thread.yield();
            continue;
        }
        //检查当前轮询的Server是否是Alive并且准备好服务的状态
        if (server.isAlive() && (server.isReadyToServe())) {
            return (server);
        }
        // Next.
        server = null;
    }
    if (count >= 10) {
        log.warn("No available alive servers after 10 tries from load balancer: "
                + lb);
    }
    return server;
}

总结起来就是从所有Server中轮询出下一个可用的


ClientConfigEnabledRoundRobinRule.java


这个其实就是包含RoundRobinRule同时添加了一些配置方法的


ZoneAvoidanceRule.java


之前在ZoneAwareLoadBalancer的如何筛选的AvailableZone部分,已经介绍了这个Rule的核心逻辑,其实就是获取可用区域之后,在其中的Server轮询


6. 检查实例是否存活实现的接口IPing


微信图片_20220624113208.jpg


NoOpPing.java


默认IPing的实现就是这个,固定返回true

public class NoOpPing implements IPing {
    @Override
    public boolean isAlive(Server server) {
        return true;
    }
}


PingConstant.java


其实就是固定返回true或者false

public class PingConstant implements IPing {
  boolean constant = true;
  public void setConstant(String constantStr) {
      constant = (constantStr != null) && (constantStr.toLowerCase().equals("true"));
  }
  public void setConstant(boolean constant) {
      this.constant = constant;
  }
  public boolean getConstant() {
      return constant;
  }
  public boolean isAlive(Server server) {
      return constant;
  }
}


PingUrl.java


即向Server的url发送一次get请求,若成功(http相应状态码为200),则返回true

public boolean isAlive(Server server) {
    String urlStr = "";
    if (this.isSecure) {
        urlStr = "https://";
    } else {
        urlStr = "http://";
    }
    urlStr = urlStr + server.getId();
    urlStr = urlStr + this.getPingAppendString();
    boolean isAlive = false;
    HttpClient httpClient = new DefaultHttpClient();
    HttpUriRequest getRequest = new HttpGet(urlStr);
    String content = null;
    try {
        HttpResponse response = httpClient.execute(getRequest);
        content = EntityUtils.toString(response.getEntity());
        isAlive = response.getStatusLine().getStatusCode() == 200;
        if (this.getExpectedContent() != null) {
            LOGGER.debug("content:" + content);
            if (content == null) {
                isAlive = false;
            } else if (content.equals(this.getExpectedContent())) {
                isAlive = true;
            } else {
                isAlive = false;
            }
        }
    } catch (IOException var11) {
        var11.printStackTrace();
    } finally {
        getRequest.abort();
    }
    return isAlive;
}


AbstractLoadBalancerPing.java


这个抽象类的目的在于提供基于不同LoadBalancer的Ping实现

public abstract class AbstractLoadBalancerPing implements IPing, IClientConfigAware{
    AbstractLoadBalancer lb;
    @Override
    public boolean isAlive(Server server) {
        return true;
    }
    public void setLoadBalancer(AbstractLoadBalancer lb){
        this.lb = lb;
    }
    public AbstractLoadBalancer getLoadBalancer(){
        return lb;
    }
}


DummyPing.java


基于LoadBalancer但是直接返回true

public class DummyPing extends AbstractLoadBalancerPing {
    public DummyPing() {
    }
    public boolean isAlive(Server server) {
        return true;
    }
    @Override
    public void initWithNiwsConfig(IClientConfig clientConfig) {
    }
}


NIWSDiscoveryPing.java


这个是针对Eureka作为发现服务的中间件环境下的ping,就是检查对应实例的InstantceStatus是否为UP

public boolean isAlive(Server server) {
    boolean isAlive = true;
    if (server!=null && server instanceof DiscoveryEnabledServer){
        DiscoveryEnabledServer dServer = (DiscoveryEnabledServer)server;              
        InstanceInfo instanceInfo = dServer.getInstanceInfo();
        if (instanceInfo!=null){                  
            InstanceStatus status = instanceInfo.getStatus();
            if (status!=null){
                isAlive = status.equals(InstanceStatus.UP);
            }
        }
    }
    return isAlive;
}


7. 服务实例列表更新机制实现的接口ServerListUpdater


微信图片_20220624113242.jpg


PollingServerListUpdater.java


这个是ServerListUpdater的默认基本实现,如果你不配置,那么默认的ServerListUpdater就是这个,基本组成:

//核心调度线程池LazyHolder._serverListRefreshExecutor(这个LazyHolder只是一个加了些配置包装和进程启动还有进程停止的)
private static ScheduledThreadPoolExecutor getRefreshExecutor() {
    return LazyHolder._serverListRefreshExecutor;
}
//Updater是否是激活的状态位
private final AtomicBoolean isActive = new AtomicBoolean(false);
//上次更新时间
private volatile long lastUpdated = System.currentTimeMillis();
//第一次更新延迟
private final long initialDelayMs;
//定时更新延迟
private final long refreshIntervalMs;
//定时调度的返回,为了能停止所以记录到这个变量
private volatile ScheduledFuture<?> scheduledFuture;


其实他就是一个定时读取ServerList的刷新机制,我们来回忆下之前在Eureka章节中提到过的EurekaServer -> 服务消费者EurekaClient,SpringCloud环境下服务消费者调用一般用Ribbon做负载均衡,从Eureka所有服务所有实例缓存到Ribbon某个服务所有实例缓存,也是有定时任务,每隔Ribbon服务实例列表刷新时间同步,这个服务实例列表刷新时间ribbon.ServerListRefreshInterval就是配置这里的refreshIntervalMs

@Override
public synchronized void start(final UpdateAction updateAction) {
    //保证只能启动一次
    if (isActive.compareAndSet(false, true)) {
        //定时更新任务定义,其实就是执行updateAction以及更新最后更新时间
        final Runnable wrapperRunnable = new Runnable() {
            @Override
            public void run() {
                if (!isActive.get()) {
                    if (scheduledFuture != null) {
                        scheduledFuture.cancel(true);
                    }
                    return;
                }
                try {
                    updateAction.doUpdate();
                    lastUpdated = System.currentTimeMillis();
                } catch (Exception e) {
                    logger.warn("Failed one update cycle", e);
                }
            }
        };
        //调度,记录
        scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
                wrapperRunnable,
                initialDelayMs,
                refreshIntervalMs,
                TimeUnit.MILLISECONDS
        );
    } else {
        logger.info("Already active, no-op");
    }
}
@Override
public synchronized void stop() {
    //stop就是停止调度
    if (isActive.compareAndSet(true, false)) {
        if (scheduledFuture != null) {
            scheduledFuture.cancel(true);
        }
    } else {
        logger.info("Not active, no-op");
    }
}
@Override
public String getLastUpdate() {
    return new Date(lastUpdated).toString();
}
@Override
public long getDurationSinceLastUpdateMs() {
    return System.currentTimeMillis() - lastUpdated;
}
@Override
public int getNumberMissedCycles() {
    if (!isActive.get()) {
        return 0;
    }
    //通过时间差计算
    return (int) ((int) (System.currentTimeMillis() - lastUpdated) / refreshIntervalMs);
}
@Override
public int getCoreThreads() {
    if (isActive.get()) {
        if (getRefreshExecutor() != null) {
            return getRefreshExecutor().getCorePoolSize();
        }
    }
    return 0;
}


EurekaNotificationServerListUpdater.java


这个类是针对Eureka的Server列表更新,他并不是自己主动定时更新,而是在EurekaClient读取完服务列表之后,会trigger一个CacheRefreshedEvent:

EurekaClient更新服务列表的代码:

private boolean fetchRegistry(boolean forceFullRegistryFetch) {
    //获取服务列表的代码略
    // Notify about cache refresh before updating the instance remote status
    onCacheRefreshed();
    // registry was fetched successfully, so return true
    return true;
}
protected void onCacheRefreshed() {
    fireEvent(new CacheRefreshedEvent());
}

EurekaNotificationServerListUpdater的ServerList更新机制就是在有这个CacheRefreshedEvent之后,触发从EurekaClient本地缓存中读取,而不是像PollingServerListUpdater定时主动获取。

@Override
public synchronized void start(final UpdateAction updateAction) {
    if (isActive.compareAndSet(false, true)) {
        this.updateListener = new EurekaEventListener() {
            @Override
            public void onEvent(EurekaEvent event) {
                //监听CacheRefreshedEvent
                if (event instanceof CacheRefreshedEvent) {
                    //如果已经在更新,就不再触发了
                    if (!updateQueued.compareAndSet(false, true)) {  // if an update is already queued
                        logger.info("an update action is already queued, returning as no-op");
                        return;
                    }
                    try {
                        //更新,异步执行updateAction,执行完后更新lastUpdated
                        refreshExecutor.submit(new Runnable() {
                            @Override
                            public void run() {
                                try {
                                    updateAction.doUpdate();
                                    lastUpdated.set(System.currentTimeMillis());
                                } catch (Exception e) {
                                    logger.warn("Failed to update serverList", e);
                                } finally {
                                    updateQueued.set(false);
                                }
                            }
                        });  // fire and forget
                    } catch (Exception e) {
                        logger.warn("Error submitting update task to executor, skipping one round of updates", e);
                        updateQueued.set(false);  // if submit fails, need to reset updateQueued to false
                    }
                }
            }
        };
        if (eurekaClient == null) {
            eurekaClient = eurekaClientProvider.get();
        }
        if (eurekaClient != null) {
            eurekaClient.registerEventListener(updateListener);
        } else {
            logger.error("Failed to register an updateListener to eureka client, eureka client is null");
            throw new IllegalStateException("Failed to start the updater, unable to register the update listener due to eureka client being null.");
        }
    } else {
        logger.info("Update listener already registered, no-op");
    }
}
@Override
public synchronized void stop() {
    if (isActive.compareAndSet(true, false)) {
        if (eurekaClient != null) {
            eurekaClient.unregisterEventListener(updateListener);
        }
    } else {
        logger.info("Not currently active, no-op");
    }
}
@Override
public String getLastUpdate() {
    return new Date(lastUpdated.get()).toString();
}
@Override
public long getDurationSinceLastUpdateMs() {
    return System.currentTimeMillis() - lastUpdated.get();
}
@Override
public int getNumberMissedCycles() {
    return 0;
}
@Override
public int getCoreThreads() {
    if (isActive.get()) {
        if (refreshExecutor != null && refreshExecutor instanceof ThreadPoolExecutor) {
            return ((ThreadPoolExecutor) refreshExecutor).getCorePoolSize();
        }
    }
    return 0;
}


8.服务实例列表过滤机制ServerListFilter


微信图片_20220624113331.jpg


AbstractServerListFilter.java


所有的Filter都需要LoadBalancerStats来过滤。这个LoadBalancerStats就是记录每次负载均衡的结果成功还是失败还有每个Server被调用情况的统计数据。

public abstract class AbstractServerListFilter<T extends Server> implements ServerListFilter<T> {
    private volatile LoadBalancerStats stats;
    public void setLoadBalancerStats(LoadBalancerStats stats) {
        this.stats = stats;
    }
    public LoadBalancerStats getLoadBalancerStats() {
        return stats;
    }
}


ZoneAffinityServerListFilter.java


这个是默认的ServerListFilter,如果不进行配置,那么就是这个实现。

这个Filter根据Zone进行过滤,过滤掉不在同一个Zone的Server:

@Override
public List<T> getFilteredListOfServers(List<T> servers) {
    if (zone != null && (zoneAffinity || zoneExclusive) && servers !=null && servers.size() > 0){
        //利用ZoneAffinityPredicate进行过滤
        List<T> filteredServers = Lists.newArrayList(Iterables.filter(
                servers, this.zoneAffinityPredicate.getServerOnlyPredicate()));
        //检查是否应该过滤        
        if (shouldEnableZoneAffinity(filteredServers)) {
            return filteredServers;
        } else if (zoneAffinity) {
            overrideCounter.increment();
        }
    }
    return servers;
}

ZoneAffinityPredicate就是检查Zone是否与当前配置中的Zone一致:

public class ZoneAffinityPredicate extends AbstractServerPredicate {
    private final String zone = ConfigurationManager.getDeploymentContext().getValue(ContextKey.zone);
    public ZoneAffinityPredicate() {        
    }
    @Override
    public boolean apply(PredicateKey input) {
        Server s = input.getServer();
        String az = s.getZone();
        if (az != null && zone != null && az.toLowerCase().equals(zone.toLowerCase())) {
            return true;
        } else {
            return false;
        }
    }
}

在什么情况下,不过滤呢?

private boolean shouldEnableZoneAffinity(List<T> filtered) {    
    //不启用Zone感知的情况下,就不过滤,默认就是不启用
    if (!zoneAffinity && !zoneExclusive) {
        return false;
    }
    if (zoneExclusive) {
        return true;
    }
    LoadBalancerStats stats = getLoadBalancerStats();
    if (stats == null) {
        return zoneAffinity;
    } else {
        logger.debug("Determining if zone affinity should be enabled with given server list: {}", filtered);
        ZoneSnapshot snapshot = stats.getZoneSnapshot(filtered);
        double loadPerServer = snapshot.getLoadPerServer();
        int instanceCount = snapshot.getInstanceCount();            
        int circuitBreakerTrippedCount = snapshot.getCircuitTrippedCount();
        //在被断路的Server占比超过一定百分比或者可用Server小于一定个数的时候,每个Server负载超过一定值的时候,就不过滤了
        if (((double) circuitBreakerTrippedCount) / instanceCount >= blackOutServerPercentageThreshold.get() 
                || loadPerServer >= activeReqeustsPerServerThreshold.get()
                || (instanceCount - circuitBreakerTrippedCount) < availableServersThreshold.get()) {
            logger.debug("zoneAffinity is overriden. blackOutServerPercentage: {}, activeReqeustsPerServer: {}, availableServers: {}", 
                    new Object[] {(double) circuitBreakerTrippedCount / instanceCount,  loadPerServer, instanceCount - circuitBreakerTrippedCount});
            return false;
        } else {
            return true;
        }
    }
}


ServerListSubsetFilter.java


在Server个数超过一定量(例如好几百个Server)的时候,我们也许并不想在这几百个Server负载均衡,我们每次随机取一个子集做负载均衡,这个Filter就是ServerListSubsetFilter,同时,一些不太健康的或者比较忙的Server也会被剔除。

主要有下面四个配置进行筛选:

//取子集的大小
private DynamicIntProperty sizeProp =
        new DynamicIntProperty(DefaultClientConfigImpl.DEFAULT_PROPERTY_NAME_SPACE + ".ServerListSubsetFilter.size", 20);
//至少要排除的Server所占百分比
private DynamicFloatProperty eliminationPercent =
        new DynamicFloatProperty(DefaultClientConfigImpl.DEFAULT_PROPERTY_NAME_SPACE + ".ServerListSubsetFilter.forceEliminatePercent", 0.1f);
//当在这个Server上负载均衡调用失败次数达到一定次数的时候就去掉这个Server
private DynamicIntProperty eliminationFailureCountThreshold =
        new DynamicIntProperty(DefaultClientConfigImpl.DEFAULT_PROPERTY_NAME_SPACE + ".ServerListSubsetFilter.eliminationFailureThresold", 0);
//当在这个Server上面的连接数超过一定个数的时候就去掉这个Server
private DynamicIntProperty eliminationConnectionCountThreshold =
        new DynamicIntProperty(DefaultClientConfigImpl.DEFAULT_PROPERTY_NAME_SPACE + ".Ser



相关实践学习
SLB负载均衡实践
本场景通过使用阿里云负载均衡 SLB 以及对负载均衡 SLB 后端服务器 ECS 的权重进行修改,快速解决服务器响应速度慢的问题
负载均衡入门与产品使用指南
负载均衡(Server Load Balancer)是对多台云服务器进行流量分发的负载均衡服务,可以通过流量分发扩展应用系统对外的服务能力,通过消除单点故障提升应用系统的可用性。 本课程主要介绍负载均衡的相关技术以及阿里云负载均衡产品的使用方法。
相关文章
|
2月前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
87 2
|
2月前
|
数据采集 监控 前端开发
二级公立医院绩效考核系统源码,B/S架构,前后端分别基于Spring Boot和Avue框架
医院绩效管理系统通过与HIS系统的无缝对接,实现数据网络化采集、评价结果透明化管理及奖金分配自动化生成。系统涵盖科室和个人绩效考核、医疗质量考核、数据采集、绩效工资核算、收支核算、工作量统计、单项奖惩等功能,提升绩效评估的全面性、准确性和公正性。技术栈采用B/S架构,前后端分别基于Spring Boot和Avue框架。
|
1月前
|
存储 缓存 Java
Spring面试必问:手写Spring IoC 循环依赖底层源码剖析
在Spring框架中,IoC(Inversion of Control,控制反转)是一个核心概念,它允许容器管理对象的生命周期和依赖关系。然而,在实际应用中,我们可能会遇到对象间的循环依赖问题。本文将深入探讨Spring如何解决IoC中的循环依赖问题,并通过手写源码的方式,让你对其底层原理有一个全新的认识。
54 2
|
1月前
|
消息中间件 监控 Java
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
36 6
|
1月前
|
Java 关系型数据库 MySQL
如何将Spring Boot + MySQL应用程序部署到Pivotal Cloud Foundry (PCF)
如何将Spring Boot + MySQL应用程序部署到Pivotal Cloud Foundry (PCF)
58 5
|
1月前
|
缓存 监控 Java
如何将Spring Boot应用程序部署到Pivotal Cloud Foundry (PCF)
如何将Spring Boot应用程序部署到Pivotal Cloud Foundry (PCF)
41 5
|
2月前
|
前端开发 Java 开发者
Spring生态学习路径与源码深度探讨
【11月更文挑战第13天】Spring框架作为Java企业级开发中的核心框架,其丰富的生态系统和强大的功能吸引了无数开发者的关注。学习Spring生态不仅仅是掌握Spring Framework本身,更需要深入理解其周边组件和工具,以及源码的底层实现逻辑。本文将从Spring生态的学习路径入手,详细探讨如何系统地学习Spring,并深入解析各个重点的底层实现逻辑。
71 9
|
2月前
|
负载均衡 算法 Java
除了 Ribbon,Spring Cloud 中还有哪些负载均衡组件?
这些负载均衡组件各有特点,在不同的场景和需求下,可以根据项目的具体情况选择合适的负载均衡组件来实现高效、稳定的服务调用。
114 5
|
1月前
|
负载均衡 Java Nacos
常见的Ribbon/Spring LoadBalancer的负载均衡策略
自SpringCloud 2020版起,Ribbon被弃用,转而使用Spring Cloud LoadBalancer。Ribbon支持轮询、随机、加权响应时间和重试等负载均衡策略;而Spring Cloud LoadBalancer则提供轮询、随机及Nacos负载均衡策略,基于Reactor实现,更高效灵活。
79 0
|
3月前
|
Java Spring
Spring底层架构源码解析(三)
Spring底层架构源码解析(三)
188 5