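5. IRule: the interface for load-balancing selection rules
Here we only look at what relates to the default implementation, ZoneAvoidanceRule.
IRule depends on load-balancing data. As mentioned earlier, this data is part of BaseLoadBalancer, the ILoadBalancer implementation. The abstract class for IRule therefore needs an ILoadBalancer set on it so that it can read the load-balancing statistics: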
AbstractLoadBalancerRule.java
public abstract class AbstractLoadBalancerRule implements IRule, IClientConfigAware {
    private ILoadBalancer lb;

    @Override
    public void setLoadBalancer(ILoadBalancer lb){
        this.lb = lb;
    }

    @Override
    public ILoadBalancer getLoadBalancer(){
        return lb;
    }
}
RoundRobinRule.java
This implements round robin, the most common and most basic load-balancing rule:
public Server choose(ILoadBalancer lb, Object key) {
    if (lb == null) {
        log.warn("no load balancer");
        return null;
    }

    Server server = null;
    int count = 0;
    while (server == null && count++ < 10) {
        List<Server> reachableServers = lb.getReachableServers();
        List<Server> allServers = lb.getAllServers();
        int upCount = reachableServers.size();
        int serverCount = allServers.size();

        // No servers at all, or no server in UP state
        if ((upCount == 0) || (serverCount == 0)) {
            log.warn("No up servers available from load balancer: " + lb);
            return null;
        }

        int nextServerIndex = incrementAndGetModulo(serverCount);
        server = allServers.get(nextServerIndex);

        if (server == null) {
            /* Transient. */
            Thread.yield();
            continue;
        }

        // Check that the server picked in this round is alive and ready to serve
        if (server.isAlive() && (server.isReadyToServe())) {
            return (server);
        }

        // Next.
        server = null;
    }

    if (count >= 10) {
        log.warn("No available alive servers after 10 tries from load balancer: " + lb);
    }
    return server;
}
In short, it round-robins over all servers to find the next usable one, and gives up after 10 attempts.
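The listing above calls incrementAndGetModulo without showing it. The following is a minimal sketch of how such a helper can be written, assuming an AtomicInteger counter advanced with compare-and-set. The class name RoundRobinCounterSketch is hypothetical, and this is not claimed to be Ribbon's exact code.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the rule's cyclic counter; not the Ribbon class itself.
class RoundRobinCounterSketch {
    private final AtomicInteger nextIndex = new AtomicInteger(0);

    // Advance the shared counter with compare-and-set and return the new index in [0, modulo).
    int incrementAndGetModulo(int modulo) {
        for (;;) {
            int current = nextIndex.get();
            int next = (current + 1) % modulo;
            if (nextIndex.compareAndSet(current, next)) {
                return next;
            }
            // Another thread won the race; re-read the counter and retry.
        }
    }

    public static void main(String[] args) {
        List<String> servers = Arrays.asList("a:8080", "b:8080", "c:8080");
        RoundRobinCounterSketch counter = new RoundRobinCounterSketch();
        for (int i = 0; i < 5; i++) {
            int index = counter.incrementAndGetModulo(servers.size());
            System.out.println(servers.get(index));
        }
    }
}
```

Because the retry loop never blocks, concurrent callers each get a distinct step of the cycle without taking a lock.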
ClientConfigEnabledRoundRobinRule.java
This essentially wraps a RoundRobinRule and adds some configuration methods.
ZoneAvoidanceRule.java
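The earlier part on how ZoneAwareLoadBalancer selects the available zones already covered the core logic of this rule: obtain the available zones, then round-robin over the servers in them.
6. IPing: the interface for checking whether an instance is alive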
NoOpPing.java
This is the default IPing implementation; it always returns true.
public class NoOpPing implements IPing {
    @Override
    public boolean isAlive(Server server) {
        return true;
    }
}
PingConstant.java
This simply returns a fixed true or false.
public class PingConstant implements IPing {
    boolean constant = true;

    public void setConstant(String constantStr) {
        constant = (constantStr != null) && (constantStr.toLowerCase().equals("true"));
    }

    public void setConstant(boolean constant) {
        this.constant = constant;
    }

    public boolean getConstant() {
        return constant;
    }

    public boolean isAlive(Server server) {
        return constant;
    }
}
PingUrl.java
This sends one GET request to the server's URL and returns true if it succeeds (HTTP response status code 200). If an expected content is configured, the response body must equal it instead.
public boolean isAlive(Server server) {
    String urlStr = "";
    if (this.isSecure) {
        urlStr = "https://";
    } else {
        urlStr = "http://";
    }
    urlStr = urlStr + server.getId();
    urlStr = urlStr + this.getPingAppendString();

    boolean isAlive = false;
    HttpClient httpClient = new DefaultHttpClient();
    HttpUriRequest getRequest = new HttpGet(urlStr);
    String content = null;
    try {
        HttpResponse response = httpClient.execute(getRequest);
        content = EntityUtils.toString(response.getEntity());
        isAlive = response.getStatusLine().getStatusCode() == 200;
        if (this.getExpectedContent() != null) {
            LOGGER.debug("content:" + content);
            if (content == null) {
                isAlive = false;
            } else if (content.equals(this.getExpectedContent())) {
                isAlive = true;
            } else {
                isAlive = false;
            }
        }
    } catch (IOException var11) {
        var11.printStackTrace();
    } finally {
        getRequest.abort();
    }
    return isAlive;
}
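The decision at the end of that method is easy to misread, so here is a minimal sketch that isolates it as a pure function. PingDecisionSketch and its parameters are hypothetical names introduced for illustration; the logic is only meant to mirror the branches in the listing above.

```java
// Hypothetical helper that mirrors the branches in the PingUrl listing; not part of Ribbon.
class PingDecisionSketch {

    static boolean isAlive(int statusCode, String content, String expectedContent) {
        // Without an expected body, only the status code matters.
        boolean alive = statusCode == 200;
        if (expectedContent != null) {
            // With an expected body configured, the body comparison replaces the status-code result.
            alive = content != null && content.equals(expectedContent);
        }
        return alive;
    }

    public static void main(String[] args) {
        System.out.println(isAlive(200, "anything", null));
        System.out.println(isAlive(200, "DOWN", "UP"));
        System.out.println(isAlive(500, "UP", "UP"));
    }
}
```

Note the third call: as the listing is written, a matching body marks the server alive even when the status code is not 200.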
AbstractLoadBalancerPing.java
The purpose of this abstract class is to provide Ping implementations that are based on different LoadBalancers.
public abstract class AbstractLoadBalancerPing implements IPing, IClientConfigAware{
    AbstractLoadBalancer lb;

    @Override
    public boolean isAlive(Server server) {
        return true;
    }

    public void setLoadBalancer(AbstractLoadBalancer lb){
        this.lb = lb;
    }

    public AbstractLoadBalancer getLoadBalancer(){
        return lb;
    }
}
DummyPing.java
This is tied to a LoadBalancer but returns true directly.
public class DummyPing extends AbstractLoadBalancerPing {
    public DummyPing() {
    }

    public boolean isAlive(Server server) {
        return true;
    }

    @Override
    public void initWithNiwsConfig(IClientConfig clientConfig) {
    }
}
NIWSDiscoveryPing.java
This is the ping for environments that use Eureka as the service-discovery middleware; it checks whether the instance's InstanceStatus is UP.
public boolean isAlive(Server server) {
    boolean isAlive = true;
    if (server!=null && server instanceof DiscoveryEnabledServer){
        DiscoveryEnabledServer dServer = (DiscoveryEnabledServer)server;
        InstanceInfo instanceInfo = dServer.getInstanceInfo();
        if (instanceInfo!=null){
            InstanceStatus status = instanceInfo.getStatus();
            if (status!=null){
                isAlive = status.equals(InstanceStatus.UP);
            }
        }
    }
    return isAlive;
}
7. ServerListUpdater: the interface for the server-list update mechanism
PollingServerListUpdater.java
This is the default, basic implementation of ServerListUpdater; if you configure nothing, it is the one in use. Its main parts:
// Core scheduling thread pool, LazyHolder._serverListRefreshExecutor
// (LazyHolder merely wraps the pool with some configuration plus startup and shutdown handling)
private static ScheduledThreadPoolExecutor getRefreshExecutor() {
    return LazyHolder._serverListRefreshExecutor;
}

// Flag recording whether the updater is active
private final AtomicBoolean isActive = new AtomicBoolean(false);
// Time of the last update
private volatile long lastUpdated = System.currentTimeMillis();
// Delay before the first update
private final long initialDelayMs;
// Delay between scheduled updates
private final long refreshIntervalMs;
// Handle returned by the scheduler, kept so the task can be stopped
private volatile ScheduledFuture<?> scheduledFuture;
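In essence it is a periodic refresh that re-reads the ServerList. Recall the EurekaServer -> service consumer EurekaClient path described earlier in the Eureka chapter. In a Spring Cloud environment, service consumers usually call through Ribbon for load balancing, and the copy from the Eureka cache (all instances of all services) into the Ribbon cache (all instances of one service) is also driven by a scheduled task that runs once per Ribbon server-list refresh interval. That interval, ribbon.ServerListRefreshInterval, is what sets refreshIntervalMs here.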
@Override
public synchronized void start(final UpdateAction updateAction) {
    // Ensure it can only be started once
    if (isActive.compareAndSet(false, true)) {
        // The scheduled task: run updateAction and record the last update time
        final Runnable wrapperRunnable = new Runnable() {
            @Override
            public void run() {
                if (!isActive.get()) {
                    if (scheduledFuture != null) {
                        scheduledFuture.cancel(true);
                    }
                    return;
                }
                try {
                    updateAction.doUpdate();
                    lastUpdated = System.currentTimeMillis();
                } catch (Exception e) {
                    logger.warn("Failed one update cycle", e);
                }
            }
        };

        // Schedule it and keep the returned future
        scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
                wrapperRunnable,
                initialDelayMs,
                refreshIntervalMs,
                TimeUnit.MILLISECONDS
        );
    } else {
        logger.info("Already active, no-op");
    }
}

@Override
public synchronized void stop() {
    // stop simply cancels the schedule
    if (isActive.compareAndSet(true, false)) {
        if (scheduledFuture != null) {
            scheduledFuture.cancel(true);
        }
    } else {
        logger.info("Not active, no-op");
    }
}

@Override
public String getLastUpdate() {
    return new Date(lastUpdated).toString();
}

@Override
public long getDurationSinceLastUpdateMs() {
    return System.currentTimeMillis() - lastUpdated;
}

@Override
public int getNumberMissedCycles() {
    if (!isActive.get()) {
        return 0;
    }
    // Computed from the elapsed time
    return (int) ((int) (System.currentTimeMillis() - lastUpdated) / refreshIntervalMs);
}

@Override
public int getCoreThreads() {
    if (isActive.get()) {
        if (getRefreshExecutor() != null) {
            return getRefreshExecutor().getCorePoolSize();
        }
    }
    return 0;
}
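To see the start-once, fixed-delay pattern in isolation, here is a minimal self-contained sketch built only on java.util.concurrent. PollingUpdaterSketch, its constructor arguments, and the missedCycles helper are hypothetical names for illustration; the daemon thread factory is an assumption made so the demo exits cleanly, not something taken from Ribbon.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, simplified polling updater; it only illustrates the pattern.
class PollingUpdaterSketch {
    // Daemon thread (an assumption of this sketch) so the JVM can exit without an explicit shutdown.
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "sketch-refresh");
                t.setDaemon(true);
                return t;
            });
    private final AtomicBoolean active = new AtomicBoolean(false);
    private final long initialDelayMs;
    private final long refreshIntervalMs;
    private volatile long lastUpdated = System.currentTimeMillis();
    private volatile ScheduledFuture<?> future;

    PollingUpdaterSketch(long initialDelayMs, long refreshIntervalMs) {
        this.initialDelayMs = initialDelayMs;
        this.refreshIntervalMs = refreshIntervalMs;
    }

    // Returns true if this call started the schedule, false if it was already running.
    synchronized boolean start(Runnable updateAction) {
        if (!active.compareAndSet(false, true)) {
            return false;
        }
        future = executor.scheduleWithFixedDelay(() -> {
            try {
                updateAction.run();
                lastUpdated = System.currentTimeMillis();
            } catch (RuntimeException e) {
                // Swallow the failure: an escaping exception would cancel all later runs.
            }
        }, initialDelayMs, refreshIntervalMs, TimeUnit.MILLISECONDS);
        return true;
    }

    // Returns true if this call stopped the schedule, false if it was not running.
    synchronized boolean stop() {
        if (!active.compareAndSet(true, false)) {
            return false;
        }
        future.cancel(true);
        return true;
    }

    // Whole refresh intervals elapsed since the last successful update.
    static long missedCycles(long nowMs, long lastUpdatedMs, long intervalMs) {
        return (nowMs - lastUpdatedMs) / intervalMs;
    }

    public static void main(String[] args) throws InterruptedException {
        PollingUpdaterSketch updater = new PollingUpdaterSketch(0, 100);
        updater.start(() -> System.out.println("refreshing server list"));
        Thread.sleep(350);
        updater.stop();
    }
}
```

Catching the exception inside the task matters: scheduleWithFixedDelay suppresses every subsequent execution once a run throws, which is why the original wraps doUpdate in a try/catch as well.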
EurekaNotificationServerListUpdater.java
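This class updates the server list for Eureka. It does not refresh on its own timer. Instead, EurekaClient fires a CacheRefreshedEvent once it has finished reading the service registry:
The EurekaClient code that updates the registry: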
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
    // code that fetches the registry omitted

    // Notify about cache refresh before updating the instance remote status
    onCacheRefreshed();

    // registry was fetched successfully, so return true
    return true;
}

protected void onCacheRefreshed() {
    fireEvent(new CacheRefreshedEvent());
}
EurekaNotificationServerListUpdater updates the ServerList when this CacheRefreshedEvent arrives, reading from EurekaClient's local cache at that point, rather than fetching on a timer the way PollingServerListUpdater does.
@Override
public synchronized void start(final UpdateAction updateAction) {
    if (isActive.compareAndSet(false, true)) {
        this.updateListener = new EurekaEventListener() {
            @Override
            public void onEvent(EurekaEvent event) {
                // Listen for CacheRefreshedEvent
                if (event instanceof CacheRefreshedEvent) {
                    // If an update is already queued, do not trigger another
                    if (!updateQueued.compareAndSet(false, true)) {  // if an update is already queued
                        logger.info("an update action is already queued, returning as no-op");
                        return;
                    }

                    try {
                        // Run updateAction asynchronously, then update lastUpdated
                        refreshExecutor.submit(new Runnable() {
                            @Override
                            public void run() {
                                try {
                                    updateAction.doUpdate();
                                    lastUpdated.set(System.currentTimeMillis());
                                } catch (Exception e) {
                                    logger.warn("Failed to update serverList", e);
                                } finally {
                                    updateQueued.set(false);
                                }
                            }
                        });  // fire and forget
                    } catch (Exception e) {
                        logger.warn("Error submitting update task to executor, skipping one round of updates", e);
                        updateQueued.set(false);  // if submit fails, need to reset updateQueued to false
                    }
                }
            }
        };
        if (eurekaClient == null) {
            eurekaClient = eurekaClientProvider.get();
        }
        if (eurekaClient != null) {
            eurekaClient.registerEventListener(updateListener);
        } else {
            logger.error("Failed to register an updateListener to eureka client, eureka client is null");
            throw new IllegalStateException("Failed to start the updater, unable to register the update listener due to eureka client being null.");
        }
    } else {
        logger.info("Update listener already registered, no-op");
    }
}

@Override
public synchronized void stop() {
    if (isActive.compareAndSet(true, false)) {
        if (eurekaClient != null) {
            eurekaClient.unregisterEventListener(updateListener);
        }
    } else {
        logger.info("Not currently active, no-op");
    }
}

@Override
public String getLastUpdate() {
    return new Date(lastUpdated.get()).toString();
}

@Override
public long getDurationSinceLastUpdateMs() {
    return System.currentTimeMillis() - lastUpdated.get();
}

@Override
public int getNumberMissedCycles() {
    return 0;
}

@Override
public int getCoreThreads() {
    if (isActive.get()) {
        if (refreshExecutor != null && refreshExecutor instanceof ThreadPoolExecutor) {
            return ((ThreadPoolExecutor) refreshExecutor).getCorePoolSize();
        }
    }
    return 0;
}
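The updateQueued flag in the listener is what keeps a burst of events from piling up refresh tasks. Below is a minimal sketch of that coalescing idea on its own, assuming a plain java.util.concurrent.Executor. CoalescingUpdaterSketch and onCacheRefreshed are hypothetical names, and the queue-backed executor in main exists only to make the behavior visible step by step.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical illustration of the "at most one queued update" guard; not the Ribbon class.
class CoalescingUpdaterSketch {
    private final AtomicBoolean updateQueued = new AtomicBoolean(false);
    private final Executor executor;
    private final Runnable updateAction;

    CoalescingUpdaterSketch(Executor executor, Runnable updateAction) {
        this.executor = executor;
        this.updateAction = updateAction;
    }

    // Returns true if this event queued an update, false if one was already pending
    // or the executor refused the task.
    boolean onCacheRefreshed() {
        if (!updateQueued.compareAndSet(false, true)) {
            return false;
        }
        try {
            executor.execute(() -> {
                try {
                    updateAction.run();
                } finally {
                    // Allow the next event to queue a new update, even if this one failed.
                    updateQueued.set(false);
                }
            });
            return true;
        } catch (RuntimeException e) {
            // Submission failed, so nothing will reset the flag for us.
            updateQueued.set(false);
            return false;
        }
    }

    public static void main(String[] args) {
        Queue<Runnable> pending = new ArrayDeque<>();
        CoalescingUpdaterSketch updater = new CoalescingUpdaterSketch(
                pending::add, () -> System.out.println("server list updated"));
        System.out.println(updater.onCacheRefreshed());
        System.out.println(updater.onCacheRefreshed());
        pending.poll().run();
        System.out.println(updater.onCacheRefreshed());
    }
}
```

Resetting the flag in both the finally block and the submission failure path mirrors the two reset points in the original listener.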
8. ServerListFilter: the server-list filtering mechanism
AbstractServerListFilter.java
Every filter relies on LoadBalancerStats to do its filtering. LoadBalancerStats holds the statistics that record whether each load-balanced call succeeded or failed and how each Server has been called.
public abstract class AbstractServerListFilter<T extends Server> implements ServerListFilter<T> {
    private volatile LoadBalancerStats stats;

    public void setLoadBalancerStats(LoadBalancerStats stats) {
        this.stats = stats;
    }

    public LoadBalancerStats getLoadBalancerStats() {
        return stats;
    }
}
ZoneAffinityServerListFilter.java
This is the default ServerListFilter; it is the implementation used when nothing is configured.
This filter works by zone, removing servers that are not in the same zone:
@Override
public List<T> getFilteredListOfServers(List<T> servers) {
    if (zone != null && (zoneAffinity || zoneExclusive) && servers !=null && servers.size() > 0){
        // Filter with ZoneAffinityPredicate
        List<T> filteredServers = Lists.newArrayList(Iterables.filter(
                servers, this.zoneAffinityPredicate.getServerOnlyPredicate()));
        // Check whether the filtered result should actually be used
        if (shouldEnableZoneAffinity(filteredServers)) {
            return filteredServers;
        } else if (zoneAffinity) {
            overrideCounter.increment();
        }
    }
    return servers;
}
ZoneAffinityPredicate simply checks whether a server's zone matches the zone in the current configuration:
public class ZoneAffinityPredicate extends AbstractServerPredicate {
    private final String zone = ConfigurationManager.getDeploymentContext().getValue(ContextKey.zone);

    public ZoneAffinityPredicate() {
    }

    @Override
    public boolean apply(PredicateKey input) {
        Server s = input.getServer();
        String az = s.getZone();
        if (az != null && zone != null && az.toLowerCase().equals(zone.toLowerCase())) {
            return true;
        } else {
            return false;
        }
    }
}
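As a rough illustration of what this predicate does to a server list, the sketch below applies the same case-insensitive zone comparison to a small in-memory list. ZoneFilterSketch, Node, and the zone names are hypothetical and stand in for Ribbon's Server type and the deployment context.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-ins for Server and the zone predicate; for illustration only.
class ZoneFilterSketch {

    static final class Node {
        final String id;
        final String zone;

        Node(String id, String zone) {
            this.id = id;
            this.zone = zone;
        }
    }

    // Same comparison as in the listing: both zones non-null and equal ignoring case.
    static boolean sameZone(String serverZone, String clientZone) {
        return serverZone != null && clientZone != null
                && serverZone.toLowerCase().equals(clientZone.toLowerCase());
    }

    // Keep only the nodes whose zone matches the client's zone.
    static List<Node> filterByZone(List<Node> nodes, String clientZone) {
        List<Node> kept = new ArrayList<>();
        for (Node node : nodes) {
            if (sameZone(node.zone, clientZone)) {
                kept.add(node);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<Node> nodes = Arrays.asList(
                new Node("a:8080", "zone-1"),
                new Node("b:8080", "ZONE-1"),
                new Node("c:8080", "zone-2"),
                new Node("d:8080", null));
        for (Node node : filterByZone(nodes, "zone-1")) {
            System.out.println(node.id);
        }
    }
}
```

A server with no zone never matches, so it is dropped whenever filtering by zone is in effect.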
Under what circumstances is the list not filtered?
private boolean shouldEnableZoneAffinity(List<T> filtered) {
    // Without zone awareness enabled there is no filtering; it is disabled by default
    if (!zoneAffinity && !zoneExclusive) {
        return false;
    }
    if (zoneExclusive) {
        return true;
    }
    LoadBalancerStats stats = getLoadBalancerStats();
    if (stats == null) {
        return zoneAffinity;
    } else {
        logger.debug("Determining if zone affinity should be enabled with given server list: {}", filtered);
        ZoneSnapshot snapshot = stats.getZoneSnapshot(filtered);
        double loadPerServer = snapshot.getLoadPerServer();
        int instanceCount = snapshot.getInstanceCount();
        int circuitBreakerTrippedCount = snapshot.getCircuitTrippedCount();
        // Skip filtering when the fraction of circuit-tripped servers reaches a threshold,
        // when the load per server reaches a threshold, or when too few servers remain available
        if (((double) circuitBreakerTrippedCount) / instanceCount >= blackOutServerPercentageThreshold.get()
                || loadPerServer >= activeReqeustsPerServerThreshold.get()
                || (instanceCount - circuitBreakerTrippedCount) < availableServersThreshold.get()) {
            logger.debug("zoneAffinity is overriden. blackOutServerPercentage: {}, activeReqeustsPerServer: {}, availableServers: {}",
                    new Object[] {(double) circuitBreakerTrippedCount / instanceCount, loadPerServer, instanceCount - circuitBreakerTrippedCount});
            return false;
        } else {
            return true;
        }
    }
}
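The three-way threshold check is the heart of that method. Here is a minimal sketch of it as a pure function, so the effect of each condition can be tried with concrete numbers. ZoneAffinityDecisionSketch and its parameter names are hypothetical, and the threshold values used in main (0.8, 0.6, 2) are illustrative assumptions rather than values taken from the listing.

```java
// Hypothetical pure-function version of the threshold check; for illustration only.
class ZoneAffinityDecisionSketch {

    // Returns true when the zone-filtered list should be used, false when the full list should be.
    static boolean keepZoneFiltered(int instanceCount,
                                    int circuitTrippedCount,
                                    double loadPerServer,
                                    double maxBlackoutFraction,
                                    double maxLoadPerServer,
                                    int minAvailableServers) {
        double blackoutFraction = (double) circuitTrippedCount / instanceCount;
        int availableServers = instanceCount - circuitTrippedCount;
        boolean zoneIsUnhealthy = blackoutFraction >= maxBlackoutFraction
                || loadPerServer >= maxLoadPerServer
                || availableServers < minAvailableServers;
        return !zoneIsUnhealthy;
    }

    public static void main(String[] args) {
        // Illustrative thresholds (assumptions of this sketch).
        double maxBlackout = 0.8;
        double maxLoad = 0.6;
        int minAvailable = 2;

        // Healthy zone: few tripped servers, low load, plenty available.
        System.out.println(keepZoneFiltered(10, 1, 0.2, maxBlackout, maxLoad, minAvailable));
        // Most servers tripped: fall back to the unfiltered list.
        System.out.println(keepZoneFiltered(10, 9, 0.2, maxBlackout, maxLoad, minAvailable));
        // Only one server in the zone: too few to rely on.
        System.out.println(keepZoneFiltered(1, 0, 0.0, maxBlackout, maxLoad, minAvailable));
    }
}
```

When the zone has no instances at all, the division yields NaN and the first comparison is false, but the available-server check still rejects the zone.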
ServerListSubsetFilter.java
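When the number of servers is large (for example several hundred), we may not want to load-balance across all of them; instead we take a random subset each time and balance over that. ServerListSubsetFilter is the filter that does this, and it also drops servers that look unhealthy or busy.
Four settings mainly drive the selection: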
// Size of the subset
private DynamicIntProperty sizeProp =
    new DynamicIntProperty(DefaultClientConfigImpl.DEFAULT_PROPERTY_NAME_SPACE + ".ServerListSubsetFilter.size", 20);
// Minimum percentage of servers to eliminate
private DynamicFloatProperty eliminationPercent =
    new DynamicFloatProperty(DefaultClientConfigImpl.DEFAULT_PROPERTY_NAME_SPACE + ".ServerListSubsetFilter.forceEliminatePercent", 0.1f);
// Drop a server once its load-balanced call failure count reaches this threshold
private DynamicIntProperty eliminationFailureCountThreshold =
    new DynamicIntProperty(DefaultClientConfigImpl.DEFAULT_PROPERTY_NAME_SPACE + ".ServerListSubsetFilter.eliminationFailureThresold", 0);
// Drop a server once its connection count exceeds this threshold
private DynamicIntProperty eliminationConnectionCountThreshold =
    new DynamicIntProperty(DefaultClientConfigImpl.DEFAULT_PROPERTY_NAME_SPACE + ".Ser