Spring Cloud Ribbon 全解 (4) - 基本组件实现源码(2)

本文涉及的产品
网络型负载均衡 NLB,每月750个小时 15LCU
应用型负载均衡 ALB,每月750个小时 15LCU
传统型负载均衡 CLB,每月750个小时 15LCU
简介: Spring Cloud Ribbon 全解 (4) - 基本组件实现源码(2)

4. 负责选取Server的接口ILoadBalancer


ILoadBalancer负责存储并更新服务实例列表,并调用IRule(即根据配置的负载均衡规则)来返回Server以供于服务调用


微信图片_20220624113022.jpg


这里,我们只看默认的ZoneAwareLoadBalancer相关的


AbstractLoadBalancer.java


public abstract class AbstractLoadBalancer implements ILoadBalancer {
    public enum ServerGroup{
        ALL,
        STATUS_UP,
        STATUS_NOT_UP        
    }
    public Server chooseServer() {
      return chooseServer(null);
    }
    public abstract List<Server> getServerList(ServerGroup serverGroup);
    public abstract LoadBalancerStats getLoadBalancerStats();    
}

AbstractLoadBalancer在原有ILoadBalancer接口基础上,增加了按照分组获取Server的方法,有ALL,STATUS_UP,STATUS_NOT_UP三种组别。同时还增加了LoadBalancerStats,记录每次请求的Server的负载均衡统计数据ServerStat,以及其他一些的实时记录信息。之后在介绍具体的LoadBalancer实现的时候,会用到


BaseLoadBalancer.java


BaseLoadBalancer是负载均衡的基本实现,包含如下元素:

1)两个列表:所有Server列表,还有所有Up Server的列表:

protected volatile List<Server> allServerList = Collections.synchronizedList(new ArrayList<Server>());
protected volatile List<Server> upServerList = Collections.synchronizedList(new ArrayList<Server>());
//更新两个列表的锁
protected ReadWriteLock allServerLock = new ReentrantReadWriteLock();
protected ReadWriteLock upServerLock = new ReentrantReadWriteLock();


2)定时PING任务相关的元素,为了定时检查Server是否UP的任务:

private final static SerialPingStrategy DEFAULT_PING_STRATEGY = new SerialPingStrategy();
protected IPingStrategy pingStrategy = DEFAULT_PING_STRATEGY;
protected IPing ping = null;
protected Timer lbTimer = null;
protected int pingIntervalSeconds = 10;
protected int maxTotalPingTimeSeconds = 5;
protected Comparator<Server> serverComparator = new ServerComparator();
protected AtomicBoolean pingInProgress = new AtomicBoolean(false);

一般的在构造一个BaseLoadBalancer时候,会调用setupPingTask构造一个定时ping的任务:


void setupPingTask() {
    //判断是否需要ping
    if (canSkipPing()) {
        return;
    }
    //关闭之前已经开启的定时ping的任务
    if (lbTimer != null) {
        lbTimer.cancel();
    }
    //设置PingTask,默认是每10秒一次
    lbTimer = new ShutdownEnabledTimer("NFLoadBalancer-PingTimer-" + name,
            true);
    lbTimer.schedule(new PingTask(), 0, pingIntervalSeconds * 1000);
    //这个可能有些多余,因为定时任务也是立即执行,就会ping
    forceQuickPing();
}
//如果ping为null,或者为DummyPing,就不用定时Ping了
private boolean canSkipPing() {
    if (ping == null
            || ping.getClass().getName().equals(DummyPing.class.getName())) {
        // default ping, no need to set up timer
        return true;
    } else {
        return false;
    }
}


Pinger包含了如何去Ping所有的Server的逻辑:

class Pinger {
    //ping每个server的方式,默认就是普通遍历
    private final IPingStrategy pingerStrategy;
    public Pinger(IPingStrategy pingerStrategy) {
        this.pingerStrategy = pingerStrategy;
    }
    public void runPinger() throws Exception {
        //如果设置失败,则证明,当前Ping的任务正在执行中(执行时间大于定时任务的周期)
        if (!pingInProgress.compareAndSet(false, true)) { 
            return; // Ping in progress - nothing to do
        }
        // we are "in" - we get to Ping
        Server[] allServers = null;
        boolean[] results = null;
        Lock allLock = null;
        Lock upLock = null;
        try {
            //读取所有Server的列表,需要获取读锁
            allLock = allServerLock.readLock();
            allLock.lock();
            //先读取之后再遍历,减少锁时间
            allServers = allServerList.toArray(new Server[allServerList.size()]);
            allLock.unlock();
            //ping每个Server
            int numCandidates = allServers.length;
            results = pingerStrategy.pingServers(ping, allServers);
            final List<Server> newUpList = new ArrayList<Server>();
            final List<Server> changedServers = new ArrayList<Server>();
            //更新所有Server的状态
            for (int i = 0; i < numCandidates; i++) {
                boolean isAlive = results[i];
                Server svr = allServers[i];
                boolean oldIsAlive = svr.isAlive();
                svr.setAlive(isAlive);
                if (oldIsAlive != isAlive) {
                    changedServers.add(svr);
                    logger.debug("LoadBalancer [{}]:  Server [{}] status changed to {}", 
                    name, svr.getId(), (isAlive ? "ALIVE" : "DEAD"));
                }
                if (isAlive) {
                    newUpList.add(svr);
                }
            }
            //更新Up Server列表需要获取写锁
            upLock = upServerLock.writeLock();
            upLock.lock();
            upServerList = newUpList;
            upLock.unlock();
            //通知监听Server状态变化的Listener
            notifyServerStatusChangeListener(changedServers);
        } finally {
            //任务结束,需要设置状态位
            pingInProgress.set(false);
        }
    }
}

IPingStrategy目前只有一种默认实现,就是SerialPingStrategy,依次串行遍历Ping每个Server:


private static class SerialPingStrategy implements IPingStrategy {
    @Override
    public boolean[] pingServers(IPing ping, Server[] servers) {
        int numCandidates = servers.length;
        boolean[] results = new boolean[numCandidates];
        logger.debug("LoadBalancer:  PingTask executing [{}] servers configured", numCandidates);
        for (int i = 0; i < numCandidates; i++) {
            results[i] = false; /* Default answer is DEAD. */
            try {
                //因为在SpringCloud的环境下,默认ping是基于访问本地内存的Eureka缓存的列表,所有串行挨个ping也不会有太大的性能影响
                if (ping != null) {
                    results[i] = ping.isAlive(servers[i]);
                }
            } catch (Exception e) {
                logger.error("Exception while pinging Server: '{}'", servers[i], e);
            }
        }
        return results;
    }
}


3)记录每次负载均衡统计数据的LoadBalancerStats 这个暂且不表,在后面具体介绍负载均衡选取规则的时候,会用到这里面的统计数据

protected LoadBalancerStats lbStats;


4)用于预热连接的逻辑类PrimeConnections

private PrimeConnections primeConnections;
private volatile boolean enablePrimingConnections = false;

但是这个配置默认是不开启的。


5)监听Server列表变化和Server状态变化的Listener

private List<ServerListChangeListener> changeListeners = new CopyOnWriteArrayList<ServerListChangeListener>();
private List<ServerStatusChangeListener> serverStatusListeners = new CopyOnWriteArrayList<ServerStatusChangeListener>();

选择Server的方法非常简单,就是调用IRule来选取:

public Server chooseServer(Object key) {
    if (counter == null) {
        counter = createCounter();
    }
    counter.increment();
    if (rule == null) {
        return null;
    } else {
        try {
            return rule.choose(key);
        } catch (Exception e) {
            logger.warn("LoadBalancer [{}]:  Error choosing server for key {}", name, key, e);
            return null;
        }
    }
}


DynamicServerListLoadBalancer.java


DynamicServerListLoadBalancer在BaseLoadBalancer的基础上,增加了之前提到的:服务实例列表维护机制实现的接口ServerList、服务实例列表更新机制实现的接口ServerListUpdater和服务实例列表过滤机制ServerListFilter。利用这些元素实现服务实例列表的更新


ZoneAwareLoadBalancer.java


ZoneAwareLoadBalancer则是进一步增加了对于Zone的感知。利用一个ConcurrentHashMap来维护不同Zone下的负载均衡数据,并且不同的Zone可以设置不同的Rule:

private ConcurrentHashMap<String, BaseLoadBalancer> balancers = new ConcurrentHashMap<String, BaseLoadBalancer>();

我们可以通过setRule方法给不同的Zone设置不同的IRule

public void setRule(IRule rule) {
    super.setRule(rule);
    if (balancers != null) {
        for (String zone: balancers.keySet()) {
            balancers.get(zone).setRule(cloneRule(rule));
        }
    }
}

查看负载均衡方法:

public Server chooseServer(Object key) {
    //如果未启用(默认是启用的,可以通过ZoneAwareNIWSDiscoveryLoadBalancer.enabled修改),或者可用区域不大于1,则调用BaseLoadBalancer的choose方法选取
    if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) {
        logger.debug("Zone aware logic disabled or there is only one zone");
        return super.chooseServer(key);
    }
    Server server = null;
    try {
        LoadBalancerStats lbStats = getLoadBalancerStats();
        //获取当前负载均衡数据的快照
        Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
        logger.debug("Zone snapshots: {}", zoneSnapshot);
        //获取最大负载阈值配置
        if (triggeringLoad == null) {
            triggeringLoad = DynamicPropertyFactory.getInstance().getDoubleProperty(
                    "ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".triggeringLoadPerServerThreshold", 0.2d);
        }
        //获取熔断实例比例配置
        if (triggeringBlackoutPercentage == null) {
            triggeringBlackoutPercentage = DynamicPropertyFactory.getInstance().getDoubleProperty(
                    "ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".avoidZoneWithBlackoutPercetage", 0.99999d);
        }
        //获取当前有效的可用区域(利用最大负载阈值配置和熔断实例比例配置)
        Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());
        logger.debug("Available zones: {}", availableZones);
        if (availableZones != null &&  availableZones.size() < zoneSnapshot.keySet().size()) {
            //随机选取zone
            String zone = ZoneAvoidanceRule.randomChooseZone(zoneSnapshot, availableZones);
            logger.debug("Zone chosen: {}", zone);
            if (zone != null) {
                BaseLoadBalancer zoneLoadBalancer = getLoadBalancer(zone);
                //在选取的zone下选取server
                server = zoneLoadBalancer.chooseServer(key);
            }
        }
    } catch (Exception e) {
        logger.error("Error choosing server using zone aware logic for load balancer={}", name, e);
    }
    if (server != null) {
        return server;
    } else {
        logger.debug("Zone avoidance logic is not invoked.");
        return super.chooseServer(key);
    }
}

如何筛选的AvailableZone:

public static Set<String> getAvailableZones(
        Map<String, ZoneSnapshot> snapshot, double triggeringLoad,
        double triggeringBlackoutPercentage) {
    if (snapshot.isEmpty()) {
        return null;
    }
    Set<String> availableZones = new HashSet<String>(snapshot.keySet());
    if (availableZones.size() == 1) {
        return availableZones;
    }
    Set<String> worstZones = new HashSet<String>();
    double maxLoadPerServer = 0;
    boolean limitedZoneAvailability = false;
    for (Map.Entry<String, ZoneSnapshot> zoneEntry : snapshot.entrySet()) {
        String zone = zoneEntry.getKey();
        ZoneSnapshot zoneSnapshot = zoneEntry.getValue();
        int instanceCount = zoneSnapshot.getInstanceCount();
        //可用区域数量为0,这个可用区需要被移除
        if (instanceCount == 0) {
            availableZones.remove(zone);
            limitedZoneAvailability = true;
        } else {
            double loadPerServer = zoneSnapshot.getLoadPerServer();
            //熔断的实例个数/实例数量大于triggeringBlackoutPercentage,这个可用区需要被移除
            if (((double) zoneSnapshot.getCircuitTrippedCount())
                    / instanceCount >= triggeringBlackoutPercentage
                    || loadPerServer < 0) {
                availableZones.remove(zone);
                limitedZoneAvailability = true;
            } else {
                //寻找平均负载最高的可用区,将它添加到worstZones集合
                if (Math.abs(loadPerServer - maxLoadPerServer) < 0.000001d) {
                    // they are the same considering double calculation
                    // round error
                    worstZones.add(zone);
                } else if (loadPerServer > maxLoadPerServer) {
                    maxLoadPerServer = loadPerServer;
                    worstZones.clear();
                    worstZones.add(zone);
                }
            }
        }
    }
    //如果最大负载小于负载阈值,并且没有被移除过可用区,就直接返回当前结果
    if (maxLoadPerServer < triggeringLoad && !limitedZoneAvailability) {
        // zone override is not needed here
        return availableZones;
    }
    String zoneToAvoid = randomChooseZone(snapshot, worstZones);
    if (zoneToAvoid != null) {
        availableZones.remove(zoneToAvoid);
    }
    return availableZones;
}


相关实践学习
SLB负载均衡实践
本场景通过使用阿里云负载均衡 SLB 以及对负载均衡 SLB 后端服务器 ECS 的权重进行修改,快速解决服务器响应速度慢的问题
负载均衡入门与产品使用指南
负载均衡(Server Load Balancer)是对多台云服务器进行流量分发的负载均衡服务,可以通过流量分发扩展应用系统对外的服务能力,通过消除单点故障提升应用系统的可用性。 本课程主要介绍负载均衡的相关技术以及阿里云负载均衡产品的使用方法。
相关文章
|
2月前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
87 2
|
2月前
|
数据采集 监控 前端开发
二级公立医院绩效考核系统源码,B/S架构,前后端分别基于Spring Boot和Avue框架
医院绩效管理系统通过与HIS系统的无缝对接,实现数据网络化采集、评价结果透明化管理及奖金分配自动化生成。系统涵盖科室和个人绩效考核、医疗质量考核、数据采集、绩效工资核算、收支核算、工作量统计、单项奖惩等功能,提升绩效评估的全面性、准确性和公正性。技术栈采用B/S架构,前后端分别基于Spring Boot和Avue框架。
|
1月前
|
存储 缓存 Java
Spring面试必问:手写Spring IoC 循环依赖底层源码剖析
在Spring框架中,IoC(Inversion of Control,控制反转)是一个核心概念,它允许容器管理对象的生命周期和依赖关系。然而,在实际应用中,我们可能会遇到对象间的循环依赖问题。本文将深入探讨Spring如何解决IoC中的循环依赖问题,并通过手写源码的方式,让你对其底层原理有一个全新的认识。
54 2
|
2月前
|
前端开发 Java 开发者
Spring生态学习路径与源码深度探讨
【11月更文挑战第13天】Spring框架作为Java企业级开发中的核心框架,其丰富的生态系统和强大的功能吸引了无数开发者的关注。学习Spring生态不仅仅是掌握Spring Framework本身,更需要深入理解其周边组件和工具,以及源码的底层实现逻辑。本文将从Spring生态的学习路径入手,详细探讨如何系统地学习Spring,并深入解析各个重点的底层实现逻辑。
71 9
|
2月前
|
负载均衡 算法 Java
除了 Ribbon,Spring Cloud 中还有哪些负载均衡组件?
这些负载均衡组件各有特点,在不同的场景和需求下,可以根据项目的具体情况选择合适的负载均衡组件来实现高效、稳定的服务调用。
114 5
|
1月前
|
负载均衡 Java Nacos
常见的Ribbon/Spring LoadBalancer的负载均衡策略
自SpringCloud 2020版起,Ribbon被弃用,转而使用Spring Cloud LoadBalancer。Ribbon支持轮询、随机、加权响应时间和重试等负载均衡策略;而Spring Cloud LoadBalancer则提供轮询、随机及Nacos负载均衡策略,基于Reactor实现,更高效灵活。
79 0
|
3月前
|
Java Spring
Spring底层架构源码解析(三)
Spring底层架构源码解析(三)
188 5
|
3月前
|
XML Java 数据格式
Spring底层架构源码解析(二)
Spring底层架构源码解析(二)
|
3月前
|
XML Java 数据格式
手动开发-简单的Spring基于注解配置的程序--源码解析
手动开发-简单的Spring基于注解配置的程序--源码解析
56 0
|
2月前
|
负载均衡 监控 网络协议
SpringCloud之Ribbon使用
通过以上步骤,就可以在Spring Cloud项目中有效地使用Ribbon来实现服务调用的负载均衡,提高系统的可靠性和性能。在实际应用中,根据具体的业务场景和需求选择合适的负载均衡策略,并进行相应的配置和优化,以确保系统的稳定运行。
86 15