4. 负责选取Server的接口ILoadBalancer
ILoadBalancer 负责存储并更新服务实例列表,并调用 IRule(即根据配置的负载均衡规则)选出 Server,以供服务调用。
这里我们只看与默认实现 ZoneAwareLoadBalancer 相关的部分。
AbstractLoadBalancer.java
/**
 * Base class for load balancers that, on top of {@link ILoadBalancer}, can list their
 * servers by {@link ServerGroup} and expose their {@link LoadBalancerStats}.
 */
public abstract class AbstractLoadBalancer implements ILoadBalancer {

    /** Server groups accepted by {@link #getServerList(ServerGroup)}. */
    public enum ServerGroup {
        ALL,
        STATUS_UP,
        STATUS_NOT_UP
    }

    /**
     * Returns the servers that belong to the given group.
     *
     * @param serverGroup which group of servers to return
     * @return the servers in {@code serverGroup}
     */
    public abstract List<Server> getServerList(ServerGroup serverGroup);

    /** Returns the load-balancing statistics kept by this load balancer. */
    public abstract LoadBalancerStats getLoadBalancerStats();

    /**
     * Chooses a server without a routing key.
     *
     * @return the result of {@code chooseServer(Object)} for a {@code null} key
     */
    public Server chooseServer() {
        return this.chooseServer(null);
    }
}
AbstractLoadBalancer 在原有 ILoadBalancer 接口的基础上,增加了按分组获取 Server 的方法 getServerList(ServerGroup),分组有 ALL、STATUS_UP、STATUS_NOT_UP 三种。同时还增加了获取 LoadBalancerStats 的方法:LoadBalancerStats 记录每次请求所选 Server 的负载均衡统计数据 ServerStat,以及其他一些实时统计信息。之后在介绍具体的 LoadBalancer 实现时会用到这些数据。
BaseLoadBalancer.java
BaseLoadBalancer 是负载均衡的基本实现,包含如下元素:
1)两个列表:所有 Server 的列表,以及所有状态为 UP 的 Server 的列表:
// Every server known to this load balancer.
protected volatile List<Server> allServerList = Collections.synchronizedList(new ArrayList<Server>());
// Servers currently considered UP.
// NOTE(review): Pinger.runPinger replaces this with a plain ArrayList after each ping round,
// so after the first round it is no longer a synchronized list; readers rely on upServerLock.
protected volatile List<Server> upServerList = Collections.synchronizedList(new ArrayList<Server>());
// Locks guarding updates of the two lists above.
protected ReadWriteLock allServerLock = new ReentrantReadWriteLock();
protected ReadWriteLock upServerLock = new ReentrantReadWriteLock();
2)定时 Ping 任务相关的元素,用于定时检查 Server 是否处于 UP 状态:
// Default strategy: ping the servers one after another.
private final static SerialPingStrategy DEFAULT_PING_STRATEGY = new SerialPingStrategy();
// How the server snapshot is pinged.
protected IPingStrategy pingStrategy = DEFAULT_PING_STRATEGY;
// The ping implementation; while null (or a DummyPing) no periodic ping task is scheduled.
protected IPing ping = null;
// Timer that runs the periodic PingTask.
protected Timer lbTimer = null;
// Period of the ping task, in seconds.
protected int pingIntervalSeconds = 10;
// NOTE(review): presumably an upper bound for one ping round, in seconds; not used in the code shown — confirm.
protected int maxTotalPingTimeSeconds = 5;
protected Comparator<Server> serverComparator = new ServerComparator();
// True while a ping round is running; used to skip overlapping rounds.
protected AtomicBoolean pingInProgress = new AtomicBoolean(false);
一般在构造 BaseLoadBalancer 时,会调用 setupPingTask 创建一个定时 Ping 任务:
void setupPingTask() { //判断是否需要ping if (canSkipPing()) { return; } //关闭之前已经开启的定时ping的任务 if (lbTimer != null) { lbTimer.cancel(); } //设置PingTask,默认是每10秒一次 lbTimer = new ShutdownEnabledTimer("NFLoadBalancer-PingTimer-" + name, true); lbTimer.schedule(new PingTask(), 0, pingIntervalSeconds * 1000); //这个可能有些多余,因为定时任务也是立即执行,就会ping forceQuickPing(); } //如果ping为null,或者为DummyPing,就不用定时Ping了 private boolean canSkipPing() { if (ping == null || ping.getClass().getName().equals(DummyPing.class.getName())) { // default ping, no need to set up timer return true; } else { return false; } }
Pinger 包含了 Ping 所有 Server 的逻辑:
class Pinger { //ping每个server的方式,默认就是普通遍历 private final IPingStrategy pingerStrategy; public Pinger(IPingStrategy pingerStrategy) { this.pingerStrategy = pingerStrategy; } public void runPinger() throws Exception { //如果设置失败,则证明,当前Ping的任务正在执行中(执行时间大于定时任务的周期) if (!pingInProgress.compareAndSet(false, true)) { return; // Ping in progress - nothing to do } // we are "in" - we get to Ping Server[] allServers = null; boolean[] results = null; Lock allLock = null; Lock upLock = null; try { //读取所有Server的列表,需要获取读锁 allLock = allServerLock.readLock(); allLock.lock(); //先读取之后再遍历,减少锁时间 allServers = allServerList.toArray(new Server[allServerList.size()]); allLock.unlock(); //ping每个Server int numCandidates = allServers.length; results = pingerStrategy.pingServers(ping, allServers); final List<Server> newUpList = new ArrayList<Server>(); final List<Server> changedServers = new ArrayList<Server>(); //更新所有Server的状态 for (int i = 0; i < numCandidates; i++) { boolean isAlive = results[i]; Server svr = allServers[i]; boolean oldIsAlive = svr.isAlive(); svr.setAlive(isAlive); if (oldIsAlive != isAlive) { changedServers.add(svr); logger.debug("LoadBalancer [{}]: Server [{}] status changed to {}", name, svr.getId(), (isAlive ? "ALIVE" : "DEAD")); } if (isAlive) { newUpList.add(svr); } } //更新Up Server列表需要获取写锁 upLock = upServerLock.writeLock(); upLock.lock(); upServerList = newUpList; upLock.unlock(); //通知监听Server状态变化的Listener notifyServerStatusChangeListener(changedServers); } finally { //任务结束,需要设置状态位 pingInProgress.set(false); } } }
IPingStrategy 目前只有一种默认实现,即 SerialPingStrategy,它按顺序串行地逐个 Ping 每个 Server:
private static class SerialPingStrategy implements IPingStrategy { @Override public boolean[] pingServers(IPing ping, Server[] servers) { int numCandidates = servers.length; boolean[] results = new boolean[numCandidates]; logger.debug("LoadBalancer: PingTask executing [{}] servers configured", numCandidates); for (int i = 0; i < numCandidates; i++) { results[i] = false; /* Default answer is DEAD. */ try { //因为在SpringCloud的环境下,默认ping是基于访问本地内存的Eureka缓存的列表,所有串行挨个ping也不会有太大的性能影响 if (ping != null) { results[i] = ping.isAlive(servers[i]); } } catch (Exception e) { logger.error("Exception while pinging Server: '{}'", servers[i], e); } } return results; } }
3)记录负载均衡统计数据的 LoadBalancerStats。这部分暂且不表,后面具体介绍负载均衡选取规则时会用到其中的统计数据:
// Load-balancing statistics for this balancer.
// NOTE(review): presumably what getLoadBalancerStats() returns and what the IRule
// implementations read — confirm.
protected LoadBalancerStats lbStats;
4)用于预热连接的逻辑类 PrimeConnections:
// Helper that warms up ("primes") connections to the servers.
private PrimeConnections primeConnections;
// Connection priming is disabled unless explicitly turned on.
private volatile boolean enablePrimingConnections = false;
但是这个配置默认是不开启的。
5)监听Server列表变化和Server状态变化的Listener
// Listeners for changes of the server list.
private List<ServerListChangeListener> changeListeners = new CopyOnWriteArrayList<ServerListChangeListener>();
// Listeners for server status (alive/dead) changes.
// NOTE(review): presumably these are the listeners notified by notifyServerStatusChangeListener — confirm.
private List<ServerStatusChangeListener> serverStatusListeners = new CopyOnWriteArrayList<ServerStatusChangeListener>();
选择Server的方法非常简单,就是调用IRule来选取:
/**
 * Picks a server by delegating to the configured rule.
 *
 * <p>Every call increments the selection counter, which is created on first use. The
 * result is {@code null} when no rule is configured, or when the rule throws (the failure
 * is logged at WARN level).
 *
 * @param key routing key passed through to {@code rule.choose}; may be {@code null}
 * @return the chosen server, or {@code null}
 */
public Server chooseServer(Object key) {
    if (counter == null) {
        counter = createCounter();
    }
    counter.increment();

    Server chosen = null;
    if (rule != null) {
        try {
            chosen = rule.choose(key);
        } catch (Exception e) {
            logger.warn("LoadBalancer [{}]: Error choosing server for key {}", name, key, e);
        }
    }
    return chosen;
}
DynamicServerListLoadBalancer.java
DynamicServerListLoadBalancer 在 BaseLoadBalancer 的基础上,增加了之前提到的三种机制:服务实例列表的维护机制(接口 ServerList)、服务实例列表的更新机制(接口 ServerListUpdater)以及服务实例列表的过滤机制(ServerListFilter),并利用它们实现服务实例列表的动态更新。
ZoneAwareLoadBalancer.java
ZoneAwareLoadBalancer 则进一步增加了对 Zone 的感知。它利用一个 ConcurrentHashMap 为每个 Zone 维护一个独立的 BaseLoadBalancer,因此每个 Zone 都持有各自的 Rule 实例:
// One BaseLoadBalancer per zone, keyed by zone name; setRule pushes a cloned rule to each.
// NOTE(review): presumably populated lazily via getLoadBalancer(zone) — confirm.
private ConcurrentHashMap<String, BaseLoadBalancer> balancers = new ConcurrentHashMap<String, BaseLoadBalancer>();
通过 setRule 方法设置 IRule 时,除了设置自身的规则外,还会把该规则克隆一份,分别设置给每个 Zone 对应的 BaseLoadBalancer(各 Zone 使用相同的规则,但持有各自独立的实例):
/**
 * Installs the load-balancing rule on this balancer and pushes a clone of it to every
 * per-zone balancer, so that each zone works with its own rule instance.
 *
 * @param rule the rule to install
 */
public void setRule(IRule rule) {
    super.setRule(rule);
    // NOTE(review): balancers is presumably still null when this override runs during
    // superclass construction (subclass field initializers have not run yet), hence the guard.
    if (balancers != null) {
        // Iterate the values directly: no second lookup per zone, and no null from get()
        // if a zone is removed concurrently while iterating.
        for (BaseLoadBalancer zoneBalancer : balancers.values()) {
            zoneBalancer.setRule(cloneRule(rule));
        }
    }
}
再来看它选择 Server 的负载均衡方法 chooseServer:
public Server chooseServer(Object key) { //如果未启用(默认是启用的,可以通过ZoneAwareNIWSDiscoveryLoadBalancer.enabled修改),或者可用区域不大于1,则调用BaseLoadBalancer的choose方法选取 if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) { logger.debug("Zone aware logic disabled or there is only one zone"); return super.chooseServer(key); } Server server = null; try { LoadBalancerStats lbStats = getLoadBalancerStats(); //获取当前负载均衡数据的快照 Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats); logger.debug("Zone snapshots: {}", zoneSnapshot); //获取最大负载阈值配置 if (triggeringLoad == null) { triggeringLoad = DynamicPropertyFactory.getInstance().getDoubleProperty( "ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".triggeringLoadPerServerThreshold", 0.2d); } //获取熔断实例比例配置 if (triggeringBlackoutPercentage == null) { triggeringBlackoutPercentage = DynamicPropertyFactory.getInstance().getDoubleProperty( "ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".avoidZoneWithBlackoutPercetage", 0.99999d); } //获取当前有效的可用区域(利用最大负载阈值配置和熔断实例比例配置) Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get()); logger.debug("Available zones: {}", availableZones); if (availableZones != null && availableZones.size() < zoneSnapshot.keySet().size()) { //随机选取zone String zone = ZoneAvoidanceRule.randomChooseZone(zoneSnapshot, availableZones); logger.debug("Zone chosen: {}", zone); if (zone != null) { BaseLoadBalancer zoneLoadBalancer = getLoadBalancer(zone); //在选取的zone下选取server server = zoneLoadBalancer.chooseServer(key); } } } catch (Exception e) { logger.error("Error choosing server using zone aware logic for load balancer={}", name, e); } if (server != null) { return server; } else { logger.debug("Zone avoidance logic is not invoked."); return super.chooseServer(key); } }
再看 ZoneAvoidanceRule.getAvailableZones 是如何筛选可用区域(AvailableZone)的:
/**
 * Computes the set of zones eligible for server selection.
 *
 * <p>Starting from every zone in {@code snapshot}, it removes zones that have no instances,
 * zones whose circuit-tripped ratio is at or above {@code triggeringBlackoutPercentage}, and
 * zones reporting a negative load per server. If no zone was removed and the highest load
 * per server is below {@code triggeringLoad}, the remaining zones are returned as-is.
 * Otherwise one zone, picked by {@code randomChooseZone} among the zones tied for the
 * highest load per server, is removed as well.
 *
 * <p>NOTE(review): when a zone was removed and the only remaining zone is also the busiest,
 * this returns an empty set (assuming randomChooseZone returns a member of worstZones);
 * the caller then falls back to non-zone-aware selection — confirm this is intended.
 *
 * @param snapshot per-zone snapshots keyed by zone name
 * @param triggeringLoad load per server at or above which the busiest zone is avoided
 * @param triggeringBlackoutPercentage circuit-tripped ratio at or above which a zone is dropped
 * @return the available zones, or {@code null} when {@code snapshot} is empty
 */
public static Set<String> getAvailableZones(
        Map<String, ZoneSnapshot> snapshot, double triggeringLoad,
        double triggeringBlackoutPercentage) {
    if (snapshot.isEmpty()) {
        return null;
    }
    Set<String> availableZones = new HashSet<String>(snapshot.keySet());
    // A single zone can never be avoided.
    if (availableZones.size() == 1) {
        return availableZones;
    }
    // Zones tied for the highest load per server seen so far.
    Set<String> worstZones = new HashSet<String>();
    double maxLoadPerServer = 0;
    // Set once any zone has been dropped from availableZones.
    boolean limitedZoneAvailability = false;

    for (Map.Entry<String, ZoneSnapshot> zoneEntry : snapshot.entrySet()) {
        String zone = zoneEntry.getKey();
        ZoneSnapshot zoneSnapshot = zoneEntry.getValue();
        int instanceCount = zoneSnapshot.getInstanceCount();
        // The zone has no instances: drop it.
        if (instanceCount == 0) {
            availableZones.remove(zone);
            limitedZoneAvailability = true;
        } else {
            double loadPerServer = zoneSnapshot.getLoadPerServer();
            // Drop the zone if the share of circuit-tripped instances reaches
            // triggeringBlackoutPercentage, or if it reports a negative load.
            if (((double) zoneSnapshot.getCircuitTrippedCount())
                    / instanceCount >= triggeringBlackoutPercentage
                    || loadPerServer < 0) {
                availableZones.remove(zone);
                limitedZoneAvailability = true;
            } else {
                // Track the zone(s) with the highest load per server in worstZones.
                if (Math.abs(loadPerServer - maxLoadPerServer) < 0.000001d) {
                    // they are the same considering double calculation
                    // round error
                    worstZones.add(zone);
                } else if (loadPerServer > maxLoadPerServer) {
                    maxLoadPerServer = loadPerServer;
                    worstZones.clear();
                    worstZones.add(zone);
                }
            }
        }
    }

    // Nothing was dropped and even the busiest zone is below the threshold:
    // keep every zone.
    if (maxLoadPerServer < triggeringLoad && !limitedZoneAvailability) {
        // zone override is not needed here
        return availableZones;
    }
    // Otherwise additionally avoid one of the busiest zones.
    String zoneToAvoid = randomChooseZone(snapshot, worstZones);
    if (zoneToAvoid != null) {
        availableZones.remove(zoneToAvoid);
    }
    return availableZones;
}