Nacos 服务列表管理
Nacos 提供了开放 API 可通过 /nacos/v1/ns/instance/list 获取服务列表。如果我们采用 spring-cloud 方式去获取服务,最终会通过 Nacos Client + loadbalancer 的方式进行客户端负载均衡。
Ribbon 源码解析
Ribbon 简介
Spring Cloud Ribbon 是 Netflix Ribbon 实现的一套客户端负载均衡工具 简单的说,Ribbon 是 Netflix 发布的开源项目,主要功能是提供客户端的复杂算法和服务调用。 Ribbon 客户端组件提供一系列完善的配置项如超时、重试等。简单的说,就是配置文件中列出 load Balancer (简称 LB)后面所有的机器,Ribbon 会自动的帮助你基于某种规则(如简单轮询,随机链接等)去链接这些机器。我们很容易使用 Ribbon 自定义的负载均衡算法。
Ribbon 使用
首先需要定义 RestTemplate 使用 Ribbon 策略;
@Configuration public class RestTemplateConfig { @LoadBalanced @Bean public RestTemplate restTemplate() { return new RestTemplate(); } }
本地使用 RestTemplate 调用远程接口;
@Autowired private RestTemplate restTemplate; @RequestMapping(value = "/echo/{id}", method = RequestMethod.GET) public String echo(@PathVariable Long id) { return restTemplate.getForObject("http://member-service/member/get/" + id, String.class); }
Ribbon 源码分析
RestTemplate 继承 InterceptingHttpAccessor 通过 interceptors 字段接受
HttpRequestInterceptor 请求拦截器。对于 Ribbion 初始化类是 RibbonAutoConfiguration 中的, 它在 spring-cloud-netflix-ribbon 中定义。但是它在初始化之前,又需要加载 RibbonAutoConfiguration 配置,它是在 spring-cloud-common 中。具体的代码如下:
@Configuration(proxyBeanMethods = false) // 工程中一定存在 RestTemplate 类 @ConditionalOnClass(RestTemplate.class) // 容器中一定存在 LoadBalancerClient 类 Bean 实例 @ConditionalOnBean(LoadBalancerClient.class) @EnableConfigurationProperties(LoadBalancerRetryProperties.class) public class LoadBalancerAutoConfiguration { // 获取 Spring 容器中所有的 RestTemplate 实例 @LoadBalanced @Autowired(required = false) private List<RestTemplate> restTemplates = Collections.emptyList(); // 获取 Spring 容器中 LoadBalancerRequestTransformer 实例 @Autowired(required = false) private List<LoadBalancerRequestTransformer> transformers = Collections.emptyList(); // 在 Bean 初始化完成后会调用 afterSingletonsInstantiated 方法 // 这里是一个 lambda 表达式方式的实现, 主要是为 restTemplate 实例设置 RestTemplateCustomizer @Bean public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated( final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers) { return () -> restTemplateCustomizers.ifAvailable(customizers -> { for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) { for (RestTemplateCustomizer customizer : customizers) { customizer.customize(restTemplate); } } }); } // LoadBalancerRequestFactory 工厂类 // 主要是用来提供 LoadBalancerClient 实例和 LoadBalancerRequestTransformer @Bean @ConditionalOnMissingBean public LoadBalancerRequestFactory loadBalancerRequestFactory( LoadBalancerClient loadBalancerClient) { return new LoadBalancerRequestFactory(loadBalancerClient, this.transformers); } // LoadBalancerInterceptor 拦截器 @Configuration(proxyBeanMethods = false) @ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate") static class LoadBalancerInterceptorConfig { // 创建默认的拦截器 LoadBalancerInterceptor 的实例 @Bean public LoadBalancerInterceptor ribbonInterceptor( LoadBalancerClient loadBalancerClient, LoadBalancerRequestFactory requestFactory) { return new LoadBalancerInterceptor(loadBalancerClient, requestFactory); } // 如果没有 RestTemplateCustomizer 实例才会创建 // 这里就就会为咱们所有的 restTemplate 实例添加 loadBalancerInterceptor 拦截器 @Bean @ConditionalOnMissingBean public RestTemplateCustomizer restTemplateCustomizer( final LoadBalancerInterceptor loadBalancerInterceptor) { return restTemplate -> { List<ClientHttpRequestInterceptor> list = new ArrayList<>( restTemplate.getInterceptors()); list.add(loadBalancerInterceptor); restTemplate.setInterceptors(list); }; } } // ... }
针对下面的代码我们可以总结一下:
如果需要使用负载均衡,工程下面必须要有 RestTemplate 类, 然后Spring 容器中要有 LoadBalancerClient 的实例。
LoadBalancerClient 在 spring-cloud-netflix-ribbon 中只有一个实现类:
RibbonLoadBalancerClient
利用 Spring 的 SmartInitializingSingleton 拓展点,在 restTemplateCustomizer() 中为所有的 RestTemplate 添加 LoadBalancerInterceptor 拦截器
其实 LoadBalancer 的本质就是通过拦截器。利用 RestTemplate 的拓展点来实现请求服务的负载均衡。
LoadBalancerInterceptor
LoadBalancerInterceptor 拦截器会将请求交给 LoadBalancerClient 去处理,首先会选择一个 ILoadBalancer 的实现来处理获取和选择服务,然后通过 serviceName 和负载均衡算法去选择 Server 对象。最后执行请求。
public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor { // 负载均衡 private LoadBalancerClient loadBalancer; // 构建请求 private LoadBalancerRequestFactory requestFactory; // ... @Override public ClientHttpResponse intercept(final HttpRequest request, final byte[] body, final ClientHttpRequestExecution execution) throws IOException { final URI originalUri = request.getURI(); String serviceName = originalUri.getHost(); return this.loadBalancer.execute(serviceName, this.requestFactory.createRequest(request, body, execution)); } }
RibbonLoadBalancerClient
我们通过跟踪 this.loadBalancer.execute 代码发现。最终所有的请求都交由
RibbonLoadBalancerClient 去处理。它实现了。LoadBalancerClient 接口, 代码如下:
public interface ServiceInstanceChooser { // 通过 serviceId 选择具体的服务实例 ServiceInstance choose(String serviceId); } public interface LoadBalancerClient extends ServiceInstanceChooser { <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException; <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException; // 将服务实例信息替换还具体的 IP 信息 URI reconstructURI(ServiceInstance instance, URI original); }
我们先来分析 RibbonLoadBalancerClient 的 choose 方法
@Override public ServiceInstance choose(String serviceId) { return choose(serviceId, null); } // 通过服务名选择具体的服务实例 public ServiceInstance choose(String serviceId, Object hint) { Server server = getServer(getLoadBalancer(serviceId), hint); if (server == null) { return null; } return new RibbonServer(serviceId, server, isSecure(server, serviceId), serverIntrospector(serviceId).getMetadata(server)); } // 通过服务名选择一个负载均衡器, 默认是 `ZoneAwareLoadBalancer` protected ILoadBalancer getLoadBalancer(String serviceId) { return this.clientFactory.getLoadBalancer(serviceId); } // 获取服务 protected Server getServer(ILoadBalancer loadBalancer) { return getServer(loadBalancer, null); } protected Server getServer(ILoadBalancer loadBalancer, Object hint) { if (loadBalancer == null) { return null; } // Use 'default' on a null hint, or just pass it on? return loadBalancer.chooseServer(hint != null ? hint : "default"); }
LoadBalancerInterceptor 执行的时候是直接委托执行的 loadBalancer.execute() 这个方法:
// LoadBalancerRequest 是通过 LoadBalancerRequestFactory.createRequest(request, body, execution) 创建 // 它实现 LoadBalancerRequest 接口是用的一个匿名内部类,泛型类型是ClientHttpResponse // 因为最终执行的显然还是执行器:ClientHttpRequestExecution.execute() @Override public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException { return execute(serviceId, request, null); } public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint) throws IOException { // 拿到负载均衡器,然后拿到一个serverInstance实例 ILoadBalancer loadBalancer = getLoadBalancer(serviceId); Server server = getServer(loadBalancer, hint); if (server == null) { // 若没找到就直接抛出异常。这里使用的是IllegalStateException这个异常 throw new IllegalStateException("No instances available for " + serviceId); } // 把Server适配为RibbonServer isSecure:客户端是否安全 // serverIntrospector内省 参考配置文件:ServerIntrospectorProperties RibbonServer ribbonServer = new RibbonServer(serviceId, server, isSecure(server, serviceId), serverIntrospector(serviceId).getMetadata(server)); //调用本类的重载接口方法 return execute(serviceId, ribbonServer, request); } // 它的参数是 ServiceInstance --> 已经确定了唯一的Server实例 @Override public <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException { // 拿到 Server,RibbonServer 是 execute 时的唯一实现 Server server = null; if (serviceInstance instanceof RibbonServer) { server = ((RibbonServer) serviceInstance).getServer(); } if (server == null) { throw new IllegalStateException("No instances available for " + serviceId); } // 执行的上下文是和serviceId绑定的 RibbonLoadBalancerContext context = this.clientFactory.getLoadBalancerContext(serviceId); ... // 真正的向server发送请求,得到返回值 // 因为有拦截器,所以这里肯定说执行的是InterceptingRequestExecution.execute()方法 // so会调用ServiceRequestWrapper.getURI(),从而就会调用reconstructURI()方法 T returnVal = request.apply(serviceInstance); return returnVal; ... // 异常处理 }
returnVal 是一个 ClientHttpResponse,最后交给 handleResponse()方法来处理异常情况(若存在的话),若无异常就交给提取器提值:
responseExtractor.extractData(response),这样整个请求就算全部完成了。
ZoneAwareLoadBalancer
负载均衡器 ZoneAwareLoadBalancer 的类图结构如下图所示。它 DynamicServerListLoadBalancer 它的父类, 核心方法 重置和初始化:
restOfInit(clientConfig) 更新服务列表:updateListOfServers(); 这个方需要调用到 ServerList.getUpdatedListOfServers() 这里就会调用到具体的注册中心实现,以 Nacos 为例他的实现就是 NacosServerList#getUpdatedListOfServers();
- 更新所有服务列表:updateAllServerList();
- 设置所有服务列表 setServersList() ZoneAwareLoadBalancer 它的核心方法:
- 选择服务实例 chooseServer()
- 选择负载均衡器 getLoadBalancer
- 选择区域内的服务实例:zoneLoadBalancer.chooseServer
Ribbon 总结
针对 @LoadBalanced 下的 RestTemplate 的使用,我总结如下:
- 传入的String类型的url必须是绝对路径(http://...),否则抛出异常:
- java.lang.IllegalArgumentException: URI is not absolute
- serviceId 不区分大小写(http://order-service/...效果同http://OERDER-SERVICE/...)
- serviceId 后请不要跟 port 端口号
最后,需要特别指出的是:标注有@LoadBalanced 的 RestTemplate 只能填写 serviceId 而不能再写 IP地址/域名去发送请求了, 若你的项目中两种 case 都有需要,需要定义多个 RestTemplate 分别应对不同的使用场景
Nacos 服务查询
客户端查询
如果我们使用默认的 Nacos 客户端,那么走的就是
NacosServerList#getUpdatedListOfServers();接口来查询服务列表。
public class NacosServerList extends AbstractServerList<NacosServer> { private NacosDiscoveryProperties discoveryProperties; @Override public List<NacosServer> getUpdatedListOfServers() { return getServers(); } private List<NacosServer> getServers() { try { String group = discoveryProperties.getGroup(); // discoveryProperties.namingServiceInstance() // 最终通过反射获取 com.alibaba.nacos.client.naming.NacosNamingService 实例 List<Instance> instances = discoveryProperties.namingServiceInstance() .selectInstances(serviceId, group, true); return instancesToServerList(instances); } catch (Exception e) { throw new IllegalStateException( "Can not get service instances from nacos, serviceId=" + serviceId, e); } } }
然后调用 selectInstances 方法
@Override public List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters, boolean healthy, boolean subscribe) throws NacosException { ServiceInfo serviceInfo; // subscribe 默认传的是 true if (subscribe) { serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ",")); } else { serviceInfo = hostReactor .getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ",")); } return selectInstances(serviceInfo, healthy); }
其实核心的逻辑在 hostReactor.getServiceInfo 在查询服务信息里面会把当前的 serviceName、 clusters 转换为 key, 然后通过 getServiceInfo0 方法查询服务信息这里主要是查询的是本地的数据。
如果 null == serviceObj 会在 updateServiceNow 里面去调用 /instance/list接口查询服务信息
public ServiceInfo getServiceInfo(final String serviceName, final String clusters) { NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch()); String key = ServiceInfo.getKey(serviceName, clusters); if (failoverReactor.isFailoverSwitch()) { return failoverReactor.getService(key); } ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters); if (null == serviceObj) { serviceObj = new ServiceInfo(serviceName, clusters); serviceInfoMap.put(serviceObj.getKey(), serviceObj); updatingMap.put(serviceName, new Object()); updateServiceNow(serviceName, clusters); updatingMap.remove(serviceName); } else if (updatingMap.containsKey(serviceName)) { // UPDATE_HOLD_INTERVAL 为常量默认金辉进去 if (UPDATE_HOLD_INTERVAL > 0) { // hold a moment waiting for update finish synchronized (serviceObj) { try { // 最大等待时间 5s, 在更新 serviceObj 之后, 就会执行 notifyAll() // 方法入口 updateService(String serviceName, String clusters) // 最大延迟 2s DEFAULT_DELAY = 1 serviceObj.wait(UPDATE_HOLD_INTERVAL); } catch (InterruptedException e) { NAMING_LOGGER .error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e); } } } } // 通过 Schedule 更新 服务信息 scheduleUpdateIfAbsent(serviceName, clusters); // 获取最新的值 return serviceInfoMap.get(serviceObj.getKey()); }
代码看到这里我们不难理解,为什么第一次 Ribbon 调用的时候都会比较慢,因为它回去初始化服务列表,然后通过 Nacos Client 去 Nacos 查询服务实例信息。
服务端处理
服务端通过 /instance/list 接口来处理服务实例信息查询请求。首先服务实例信息都是被存储在 ConcurrentHashMap 中
/** * Map(namespace, Map(group::serviceName, Service)). */ private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();
在我们查询的过程中主要是通过 ServiceManager 来进行管理, 核心的入口方法在 InstanceController#doSrvIpxt 中
public ObjectNode doSrvIpxt(String namespaceId, String serviceName, String agent, String clusters, String clientIP, int udpPort, String env, boolean isCheck, String app, String tid, boolean healthyOnly) throws Exception { ClientInfo clientInfo = new ClientInfo(agent); ObjectNode result = JacksonUtils.createEmptyJsonNode(); Service service = serviceManager.getService(namespaceId, serviceName); long cacheMillis = switchDomain.getDefaultCacheMillis(); // now try to enable the push try { if (udpPort > 0 && pushService.canEnablePush(agent)) { pushService .addClient(namespaceId, serviceName, clusters, agent, new InetSocketAddress(clientIP, udpPort), pushDataSource, tid, app); cacheMillis = switchDomain.getPushCacheMillis(serviceName); } } catch (Exception e) { Loggers.SRV_LOG .error("[NACOS-API] failed to added push client {}, {}:{}", clientInfo, clientIP, udpPort, e); cacheMillis = switchDomain.getDefaultCacheMillis(); } if (service == null) { if (Loggers.SRV_LOG.isDebugEnabled()) { Loggers.SRV_LOG.debug("no instance to serve for service: {}", serviceName); } result.put("name", serviceName); result.put("clusters", clusters); result.put("cacheMillis", cacheMillis); result.replace("hosts", JacksonUtils.createEmptyArrayNode()); return result; } checkIfDisabled(service); List<Instance> srvedIPs; // 查询所有的服务 // 内部会更新服务列表 // allInstances.addAll(persistentInstances); // allInstances.addAll(ephemeralInstances); srvedIPs = service.srvIPs(Arrays.asList(StringUtils.split(clusters, ","))); // filter ips using selector: if (service.getSelector() != null && StringUtils.isNotBlank(clientIP)) { srvedIPs = service.getSelector().select(clientIP, srvedIPs); } if (CollectionUtils.isEmpty(srvedIPs)) { if (Loggers.SRV_LOG.isDebugEnabled()) { Loggers.SRV_LOG.debug("no instance to serve for service: {}", serviceName); } if (clientInfo.type == ClientInfo.ClientType.JAVA && clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) { result.put("dom", serviceName); } else { result.put("dom", NamingUtils.getServiceName(serviceName)); } result.put("name", serviceName); result.put("cacheMillis", cacheMillis); result.put("lastRefTime", System.currentTimeMillis()); result.put("checksum", service.getChecksum()); result.put("useSpecifiedURL", false); result.put("clusters", clusters); result.put("env", env); result.set("hosts", JacksonUtils.createEmptyArrayNode()); result.set("metadata", JacksonUtils.transferToJsonNode(service.getMetadata())); return result; } Map<Boolean, List<Instance>> ipMap = new HashMap<>(2); ipMap.put(Boolean.TRUE, new ArrayList<>()); ipMap.put(Boolean.FALSE, new ArrayList<>()); for (Instance ip : srvedIPs) { ipMap.get(ip.isHealthy()).add(ip); } if (isCheck) { result.put("reachProtectThreshold", false); } double threshold = service.getProtectThreshold(); if ((float) ipMap.get(Boolean.TRUE).size() / srvedIPs.size() <= threshold) { Loggers.SRV_LOG.warn("protect threshold reached, return all ips, service: {}", serviceName); if (isCheck) { result.put("reachProtectThreshold", true); } ipMap.get(Boolean.TRUE).addAll(ipMap.get(Boolean.FALSE)); ipMap.get(Boolean.FALSE).clear(); } if (isCheck) { result.put("protectThreshold", service.getProtectThreshold()); result.put("reachLocalSiteCallThreshold", false); return JacksonUtils.createEmptyJsonNode(); } ArrayNode hosts = JacksonUtils.createEmptyArrayNode(); for (Map.Entry<Boolean, List<Instance>> entry : ipMap.entrySet()) { List<Instance> ips = entry.getValue(); if (healthyOnly && !entry.getKey()) { continue; } for (Instance instance : ips) { // remove disabled instance: if (!instance.isEnabled()) { continue; } ObjectNode ipObj = JacksonUtils.createEmptyJsonNode(); ipObj.put("ip", instance.getIp()); ipObj.put("port", instance.getPort()); // deprecated since nacos 1.0.0: ipObj.put("valid", entry.getKey()); ipObj.put("healthy", entry.getKey()); ipObj.put("marked", instance.isMarked()); ipObj.put("instanceId", instance.getInstanceId()); ipObj.set("metadata", JacksonUtils.transferToJsonNode(instance.getMetadata())); ipObj.put("enabled", instance.isEnabled()); ipObj.put("weight", instance.getWeight()); ipObj.put("clusterName", instance.getClusterName()); if (clientInfo.type == ClientInfo.ClientType.JAVA && clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) { ipObj.put("serviceName", instance.getServiceName()); } else { ipObj.put("serviceName", NamingUtils.getServiceName(instance.getServiceName())); } ipObj.put("ephemeral", instance.isEphemeral()); hosts.add(ipObj); } } result.replace("hosts", hosts); if (clientInfo.type == ClientInfo.ClientType.JAVA && clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) { result.put("dom", serviceName); } else { result.put("dom", NamingUtils.getServiceName(serviceName)); } result.put("name", serviceName); result.put("cacheMillis", cacheMillis); result.put("lastRefTime", System.currentTimeMillis()); result.put("checksum", service.getChecksum()); result.put("useSpecifiedURL", false); result.put("clusters", clusters); result.put("env", env); result.replace("metadata", JacksonUtils.transferToJsonNode(service.getMetadata())); return result; }
在上面的核心逻辑主要是:
- 调用 service.srvIPs 方法查询所有的服务实例信息
- Cluster#allIPs会将所有的服务注册信息写到服务注册列表。