我们先来分析 RibbonLoadBalancerClient
的 choose
方法
@Override public ServiceInstance choose(String serviceId) { return choose(serviceId, null); } // 通过服务名选择具体的服务实例 public ServiceInstance choose(String serviceId, Object hint) { Server server = getServer(getLoadBalancer(serviceId), hint); if (server == null) { return null; } return new RibbonServer(serviceId, server, isSecure(server, serviceId), serverIntrospector(serviceId).getMetadata(server)); } // 通过服务名选择一个负载均衡器, 默认是 `ZoneAwareLoadBalancer` protected ILoadBalancer getLoadBalancer(String serviceId) { return this.clientFactory.getLoadBalancer(serviceId); } // 获取服务 protected Server getServer(ILoadBalancer loadBalancer) { return getServer(loadBalancer, null); } protected Server getServer(ILoadBalancer loadBalancer, Object hint) { if (loadBalancer == null) { return null; } // Use 'default' on a null hint, or just pass it on? return loadBalancer.chooseServer(hint != null ? hint : "default"); }
LoadBalancerInterceptor
执行的时候是直接委托执行的
loadBalancer.execute()
这个方法:
// LoadBalancerRequest 是通过 LoadBalancerRequestFactory.createRequest(request, body, execution) 创建 // 它实现 LoadBalancerRequest 接口是用的一个匿名内部类,泛型类型是ClientHttpResponse // 因为最终执行的显然还是执行器:ClientHttpRequestExecution.execute() @Override public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException { return execute(serviceId, request, null); } public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint) throws IOException { // 拿到负载均衡器,然后拿到一个serverInstance实例 ILoadBalancer loadBalancer = getLoadBalancer(serviceId); Server server = getServer(loadBalancer, hint); if (server == null) { // 若没找到就直接抛出异常。这里使用的是IllegalStateException这个异常 throw new IllegalStateException("No instances available for " + serviceId); } // 把Server适配为RibbonServer isSecure:客户端是否安全 // serverIntrospector内省 参考配置文件:ServerIntrospectorProperties RibbonServer ribbonServer = new RibbonServer(serviceId, server, isSecure(server, serviceId), serverIntrospector(serviceId).getMetadata(server)); //调用本类的重载接口方法 return execute(serviceId, ribbonServer, request); } // 它的参数是 ServiceInstance --> 已经确定了唯一的Server实例 @Override public <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException { // 拿到 Server,RibbonServer 是 execute 时的唯一实现 Server server = null; if (serviceInstance instanceof RibbonServer) { server = ((RibbonServer) serviceInstance).getServer(); } if (server == null) { throw new IllegalStateException("No instances available for " + serviceId); } // 执行的上下文是和serviceId绑定的 RibbonLoadBalancerContext context = this.clientFactory.getLoadBalancerContext(serviceId); ... // 真正的向server发送请求,得到返回值 // 因为有拦截器,所以这里肯定说执行的是InterceptingRequestExecution.execute()方法 // so会调用ServiceRequestWrapper.getURI(),从而就会调用reconstructURI()方法 T returnVal = request.apply(serviceInstance); return returnVal; ... // 异常处理 }
returnVal
是一个 ClientHttpResponse
,最后交给 handleResponse()
方法来处理异常情况(若存在的话),若无异常就交给提取器提值:responseExtractor.extractData(response)
,这样整个请求就算全部完成了。
ZoneAwareLoadBalancer
负载均衡器 ZoneAwareLoadBalancer
的类图结构如下图所示。它
DynamicServerListLoadBalancer
它的父类, 核心方法
- 重置和初始化:
restOfInit(clientConfig)
- 更新服务列表:
updateListOfServers();
这个方需要调用到 ServerList.getUpdatedListOfServers() 这里就会调用到具体的注册中心实现,以 Nacos 为例他的实现就是 NacosServerList#getUpdatedListOfServers();
- 更新所有服务列表:
updateAllServerList();
- 设置所有服务列表
setServersList()
ZoneAwareLoadBalancer
它的核心方法:
- 选择服务实例
chooseServer()
- 选择负载均衡器
getLoadBalancer
- 选择区域内的服务实例:
zoneLoadBalancer.chooseServer
Ribbon 总结
针对 @LoadBalanced
下的 RestTemplate
的使用,我总结如下:
- 传入的String类型的url必须是绝对路径(http://...),否则抛出异常:
java.lang.IllegalArgumentException: URI is not absolute
- serviceId 不区分大小写(http://order-service/...效果同http://OERDER-SERVICE/...)
- serviceId 后请不要跟 port 端口号
最后,需要特别指出的是:标注有@LoadBalanced
的 RestTemplate
只能填写 serviceId
而不能再写 IP地址/域名去发送请求了, 若你的项目中两种 case 都有需要,需要定义多个 RestTemplate
分别应对不同的使用场景
Nacos 服务查询
客户端查询
如果我们使用默认的 Nacos 客户端,那么走的就是 NacosServerList#getUpdatedListOfServers();
接口来查询服务列表。
public class NacosServerList extends AbstractServerList<NacosServer> { private NacosDiscoveryProperties discoveryProperties; @Override public List<NacosServer> getUpdatedListOfServers() { return getServers(); } private List<NacosServer> getServers() { try { String group = discoveryProperties.getGroup(); // discoveryProperties.namingServiceInstance() // 最终通过反射获取 com.alibaba.nacos.client.naming.NacosNamingService 实例 List<Instance> instances = discoveryProperties.namingServiceInstance() .selectInstances(serviceId, group, true); return instancesToServerList(instances); } catch (Exception e) { throw new IllegalStateException( "Can not get service instances from nacos, serviceId=" + serviceId, e); } } }
然后调用 selectInstances
方法
@Override public List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters, boolean healthy, boolean subscribe) throws NacosException { ServiceInfo serviceInfo; // subscribe 默认传的是 true if (subscribe) { serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ",")); } else { serviceInfo = hostReactor .getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ",")); } return selectInstances(serviceInfo, healthy); }
其实核心的逻辑在 hostReactor.getServiceInfo
在查询服务信息里面会把当前的 serviceName
、 clusters
转换为 key, 然后通过 getServiceInfo0
方法查询服务信息这里主要是查询的是本地的数据。
如果 null == serviceObj 会在 updateServiceNow
里面去调用 /instance/list
接口查询服务信息
public ServiceInfo getServiceInfo(final String serviceName, final String clusters) { NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch()); String key = ServiceInfo.getKey(serviceName, clusters); if (failoverReactor.isFailoverSwitch()) { return failoverReactor.getService(key); } ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters); if (null == serviceObj) { serviceObj = new ServiceInfo(serviceName, clusters); serviceInfoMap.put(serviceObj.getKey(), serviceObj); updatingMap.put(serviceName, new Object()); updateServiceNow(serviceName, clusters); updatingMap.remove(serviceName); } else if (updatingMap.containsKey(serviceName)) { // UPDATE_HOLD_INTERVAL 为常量默认金辉进去 if (UPDATE_HOLD_INTERVAL > 0) { // hold a moment waiting for update finish synchronized (serviceObj) { try { // 最大等待时间 5s, 在更新 serviceObj 之后, 就会执行 notifyAll() // 方法入口 updateService(String serviceName, String clusters) // 最大延迟 2s DEFAULT_DELAY = 1 serviceObj.wait(UPDATE_HOLD_INTERVAL); } catch (InterruptedException e) { NAMING_LOGGER .error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e); } } } } // 通过 Schedule 更新 服务信息 scheduleUpdateIfAbsent(serviceName, clusters); // 获取最新的值 return serviceInfoMap.get(serviceObj.getKey()); }
代码看到这里我们不难理解,为什么第一次 Ribbon 调用的时候都会比较慢,因为它回去初始化服务列表,然后通过 Nacos Client 去 Nacos 查询服务实例信息。