对于一个负载均衡器,就是以用户请求为输入,请求响应为输出的代理模块。
这个模块基本上就是包括一个服务实例列表,根据请求还有负载均衡规则选择一个合适的实例来执行请求并返回响应。
这个服务实例列表,一般包含每个实例基本信息,然后还有,这个实例相关的负载均衡统计信息(例如请求失败多少次,有多少正在处理的请求等等,用于实例过滤和负载均衡规则选择Server)
再深入些,我们需要获取和更新这个服务列表(一般是从注册中心或者配置中获取),有时候我们还要根据一些规则过滤掉某些服务实例不参与负载均衡,同时,我们可能还需要与每个服务实例维持心跳来保证服务实例是可用的(虽然注册中心中也可以体现,但是心跳能更准确迅速地排除故障实例,减少响应时间)。
接下来,我们来看一下Ribbon的主要元素(先不仔细看实现),和上面的负载均衡器组成对应起来
- 所有Ribbon负载均衡器需要实现的接口IClient
- 服务实例列表维护机制实现的接口ServerList
- 负载均衡数据记录LoadBalancerStats
- 负责选取Server的接口ILoadBalancer
- 负载均衡选取规则实现的接口IRule
- 检查实例是否存活实现的接口IPing
- 服务实例列表更新机制实现的接口ServerListUpdater
- 服务实例列表过滤机制ServerListFilter
1. 所有Ribbon负载均衡器需要实现的接口IClient
IClient.java
public interface IClient<S extends ClientRequest, T extends IResponse> { /** * Execute the request and return the response. It is expected that there is no retry and all exceptions are thrown directly. */ public T execute(S request, IClientConfig requestConfig) throws Exception; }
负载均衡器执行一个ClientRequest,并返回一个IResponse。注意,execute方法直接抛出所有异常,并且重试逻辑并不在实现execute的方法里面。而且,可以看出execute方法并没有指定通信协议,在SpringCloud环境下,通信协议是HTTP或者HTTPS
基本上Ribbon中可以被配置的元素,都是用IClientConfig
这个类作为配置类。这里不详细分析源码,如果用到了其中的配置,我们会仔细分析其中的配置以及原理。默认实现是DefaultClientConfigImpl
,其中每个配置都是基于Spring家族的Archaius
实现的动态配置
ClientRequest.java
public class ClientRequest implements Cloneable { protected URI uri; protected Object loadBalancerKey = null; protected Boolean isRetriable = null; protected IClientConfig overrideConfig; //构造器和getter、setter省略 }
其中uri是请求需要发送到的uri地址;loadBalancerKey用于选择服务实例,即用作后面我们会提到的这个方法ILoadBalancer.chooseServer(Object key)
的参数;isRetriable代表这个请求是否可以被重试;overrideConfig是请求相关的一些配置。
IResponse.java
public interface IResponse extends Closeable { /** * Returns the raw entity if available from the response */ public Object getPayload() throws ClientException; /** * A "peek" kinda API. Use to check if your service returned a response with an Entity */ public boolean hasPayload(); /** * @return true if the response is deemed success, for example, 200 response code for http protocol. */ public boolean isSuccess(); /** * Return the Request URI that generated this response */ public URI getRequestedURI(); /** * * @return Headers if any in the response. */ public Map<String, ?> getHeaders(); }
其实这个的实现类也会包含执行器逻辑,例如重试和异常处理等等
2. 服务实例列表维护机制实现的接口ServerList
public interface ServerList<T extends Server> { public List<T> getInitialListOfServers(); public List<T> getUpdatedListOfServers(); }
这个接口定义了如何更新并维护实例列表,getInitialListOfServers定义如何初次获取服务实例列表,getUpdatedListOfServers如何获取服务实例列表的更新
3. 负载均衡数据记录LoadBalancerStats
public class LoadBalancerStats { String name; //zone相关信息,默认用不到 volatile Map<String, ZoneStats> zoneStatsMap = new ConcurrentHashMap<String, ZoneStats>(); volatile Map<String, List<? extends Server>> upServerListZoneMap = new ConcurrentHashMap<String, List<? extends Server>>(); //动态配置,连接失败上限 private volatile DynamicIntProperty connectionFailureThreshold; //动态配置,熔断时间因子 private volatile DynamicIntProperty circuitTrippedTimeoutFactor; //动态配置,最大熔断时间 private volatile DynamicIntProperty maxCircuitTrippedTimeout; //动态配置,每个Server负载均衡统计数据过期时间配置 private static final DynamicIntProperty SERVERSTATS_EXPIRE_MINUTES = DynamicPropertyFactory.getInstance().getIntProperty("niws.loadbalancer.serverStats.expire.minutes", 30); //每个Server的负载均衡统计数据 private final LoadingCache<Server, ServerStats> serverStatsCache = CacheBuilder.newBuilder() .expireAfterAccess(SERVERSTATS_EXPIRE_MINUTES.get(), TimeUnit.MINUTES) .removalListener(new RemovalListener<Server, ServerStats>() { @Override public void onRemoval(RemovalNotification<Server, ServerStats> notification) { notification.getValue().close(); } }) .build( new CacheLoader<Server, ServerStats>() { public ServerStats load(Server server) { return createServerStats(server); } }); }
4. 负责选取Server的接口ILoadBalancer
ILoadBalancer.java
public interface ILoadBalancer { public void addServers(List<Server> newServers); public Server chooseServer(Object key); public void markServerDown(Server server); public List<Server> getServerList(boolean availableOnly); }
addServers用来添加可以用来执行请求的Server;chooseServer根据传入的参数,结合某些负载均衡规则,返回某个Server;markServerDown用来标记某个Server为DOWN状态,这样一般下次调用chooseServer就不会选取这个Server;getServerList用来获取所有Server列表
Server.java
public class Server { public static final String UNKNOWN_ZONE = "UNKNOWN"; String host; int port = 80; String id; boolean isAliveFlag; private String zone = UNKNOWN_ZONE; private volatile boolean readyToServe = true; //getter和setter省略 }
host和port代表实例提供服务的的ip和端口。id唯一标识这个Server,isAliveFlag标识这个Server是否还活着;zone代表所在区域
5. 负载均衡选取规则实现的接口IRule
public interface IRule{ public Server choose(Object key); public void setLoadBalancer(ILoadBalancer lb); public ILoadBalancer getLoadBalancer(); }
choose方法是根据key选择Server;setLoadBalancer和getLoadBalancer是设置相应的ILoadBalancer
6. 检查实例是否存活实现的接口IPing
public interface IPing { public boolean isAlive(Server server); }
isAlive方法判断Server是否存活
7. 服务实例列表更新机制实现的接口ServerListUpdater
public interface ServerListUpdater { //定义了更新服务实例列表的操作doUpdate public interface UpdateAction { void doUpdate(); } //开始调度更新服务实例列表,可以看到这里以UpdateAction为传参 void start(UpdateAction updateAction); //停止更新服务实例列表 void stop(); //上次服务实例列表更新时间 String getLastUpdate(); //上次服务实例列表更新时间间隔 long getDurationSinceLastUpdateMs(); //已经错过几轮更新 int getNumberMissedCycles(); //使用线程池大小 int getCoreThreads(); }
这个接口定义了如何更新服务实例列表
8. 服务实例列表过滤机制ServerListFilter
public interface ServerListFilter<T extends Server> { public List<T> getFilteredListOfServers(List<T> servers); }
这个接口定义了如何过滤出需要的服务实例列表