一、基础配置初始化
NacosDiscoveryClientConfiguration
NacosDiscoveryProperties
初始化Nacos基础配置信息的bean,主要指yaml中配置Nacos服务相关的信息。
NacosServiceDiscovery
初始化获取Nacos服务和实例的bean,通过该bean可以获取服务的信息和实例的信息。
NacosDiscoveryClientConfiguration
NacosDiscoveryClient
初始化NacosDiscoveryClient的bean,本质上就是实现NacosServiceDiscovery的实现。该类的作用就是获取到实例信息和服务信息。
NacosWatch
从继承结构上看,NacosWatch主要实现了SmartLifecycle和ApplicationEventPublisherAware,ApplicationEventPublisherAware就是发布事件,这里主要指的就是发布HeartbeatEvent
事件上报心跳。SmartLifecycle该接口主要是作用是所有的bean都创建完成之后,可以执行自己的初始化工作,或者在退出时执行资源销毁工作。NacosWatch的start方法,主要是完成以下四件事情:
- 加入NamingEvent监听;
- 获取NamingService;
- 订阅NamingService监听事件;
- 发布HeartbeatEvent事件;
public void start() { //加入NamingEvent监听 if (this.running.compareAndSet(false, true)) { //更新本地的Instance EventListener eventListener = listenerMap.computeIfAbsent(buildKey(), event -> new EventListener() { @Override public void onEvent(Event event) { if (event instanceof NamingEvent) { List<Instance> instances = ((NamingEvent) event) .getInstances(); Optional<Instance> instanceOptional = selectCurrentInstance( instances); instanceOptional.ifPresent(currentInstance -> { resetIfNeeded(currentInstance); }); } } }); //获取NamingService NamingService namingService = nacosServiceManager .getNamingService(properties.getNacosProperties()); try { //订阅相关NamingService的事件 namingService.subscribe(properties.getService(), properties.getGroup(), Arrays.asList(properties.getClusterName()), eventListener); } catch (Exception e) { log.error("namingService subscribe failed, properties:{}", properties, e); } //发布HeartbeatEvent事件 this.watchFuture = this.taskScheduler.scheduleWithFixedDelay( this::nacosServicesWatch, this.properties.getWatchDelay()); } }
NacosWatch的stop方法主要是完成两件事情:
- 释放掉监听的线程池资源;
- 取消NamingService相关的监听事件
@Override public void stop() { if (this.running.compareAndSet(true, false)) { //关闭和释放watch的线程池 if (this.watchFuture != null) { // shutdown current user-thread, // then the other daemon-threads will terminate automatic. ((ThreadPoolTaskScheduler) this.taskScheduler).shutdown(); this.watchFuture.cancel(true); } EventListener eventListener = listenerMap.get(buildKey()); try { //取消NamingService相关的订阅信息 NamingService namingService = nacosServiceManager .getNamingService(properties.getNacosProperties()); namingService.unsubscribe(properties.getService(), properties.getGroup(), Arrays.asList(properties.getClusterName()), eventListener); } catch (NacosException e) { log.error("namingService unsubscribe failed, properties:{}", properties, e); } } }
二、NacosNamingServicei
客户端信息的初始化发生在发起调用的时候,是一种懒加载的方式,并没有在初始化完成的时候就进行,这部分我们分析Ribbon源码的时候我们具体在讲解一下。我们的重点看的是NacosNamingService,从基础配置类中的NacosServiceDiscovery的getInstances的方法调用追踪到更下层我们会发现,与服务端交互的重点的类就是NacosNamingService,NacosNamingService在初始化的时候,主要做了以下10事件
- initNamespaceForNaming:用于初始命名空间,在Nacos中命名空间用于租户粗粒度隔离,同时还可以进行环境的区别,如开发环境和测试环境等等;
- initSerialization:序列化初始化;
- initServerAddr:初始化服务器地址,其中涉及到的endpoint 等;
- initWebRootContext:初始化web上下文,其支持通过阿里云EDAS进行部署;
- initCacheDir:初始化缓存目录;
- initLogName:从配置中获取日志文件;
- EventDispatcher:监听事件分发,当客户端订阅了某个服务信息后,会以Listener的方式注册到EventDispatcher的队列中,当有服务变化的时候,会通知订阅者;
- NamingProxy:服务端的代理,用于客户端与服务端的通信;
- BeatReactor:用于维持与服务器之间的心跳通信,上报客户端注册到服务端的服务信息;
- HostReactor:用于客户端服务的订阅,以及从服务端更新服务信息;
initNamespaceForNaming
//初始化获取Namespace public static String initNamespaceForNaming(Properties properties) { String tmpNamespace = null; //是否使用阿里云上环境进行解析,默认为true,如果没有进行配置, //默认使用DEFAULT_USE_CLOUD_NAMESPACE_PARSING String isUseCloudNamespaceParsing = properties.getProperty(PropertyKeyConst.IS_USE_CLOUD_NAMESPACE_PARSING, System.getProperty(SystemPropertyKeyConst.IS_USE_CLOUD_NAMESPACE_PARSING, String.valueOf(Constants.DEFAULT_USE_CLOUD_NAMESPACE_PARSING))); if (Boolean.parseBoolean(isUseCloudNamespaceParsing)) { tmpNamespace = TenantUtil.getUserTenantForAns(); //从系统变量获取namespace tmpNamespace = TemplateUtils.stringEmptyAndThenExecute(tmpNamespace, new Callable<String>() { @Override public String call() { String namespace = System.getProperty(SystemPropertyKeyConst.ANS_NAMESPACE); LogUtils.NAMING_LOGGER.info("initializer namespace from System Property :" + namespace); return namespace; } }); tmpNamespace = TemplateUtils.stringEmptyAndThenExecute(tmpNamespace, new Callable<String>() { @Override public String call() { String namespace = System.getenv(PropertyKeyConst.SystemEnv.ALIBABA_ALIWARE_NAMESPACE); LogUtils.NAMING_LOGGER.info("initializer namespace from System Environment :" + namespace); return namespace; } }); } //如果不是上云环境,那么从系统变量获取namespace tmpNamespace = TemplateUtils.stringEmptyAndThenExecute(tmpNamespace, new Callable<String>() { @Override public String call() { String namespace = System.getProperty(PropertyKeyConst.NAMESPACE); LogUtils.NAMING_LOGGER.info("initializer namespace from System Property :" + namespace); return namespace; } }); //从properties中获取namespace if (StringUtils.isEmpty(tmpNamespace) && properties != null) { tmpNamespace = properties.getProperty(PropertyKeyConst.NAMESPACE); } //获取系统默认的namespace tmpNamespace = TemplateUtils.stringEmptyAndThenExecute(tmpNamespace, new Callable<String>() { @Override public String call() { return UtilAndComs.DEFAULT_NAMESPACE_ID; } }); return tmpNamespace; }
initServerAddr
//初始化服务器地址 private void initServerAddr(Properties properties) { //从properties中获取服务器地址 serverList = properties.getProperty(PropertyKeyConst.SERVER_ADDR); //初始化endpoint,如果有endpoint,则废弃serverList endpoint = InitUtils.initEndpoint(properties); if (StringUtils.isNotEmpty(endpoint)) { serverList = ""; } } public static String initEndpoint(final Properties properties) { if (properties == null) { return ""; } // Whether to enable domain name resolution rules //是否使用endpoint解析,默认为true,也就是:USE_ENDPOINT_PARSING_RULE_DEFAULT_VALUE String isUseEndpointRuleParsing = properties.getProperty(PropertyKeyConst.IS_USE_ENDPOINT_PARSING_RULE, System.getProperty(SystemPropertyKeyConst.IS_USE_ENDPOINT_PARSING_RULE, String.valueOf(ParamUtil.USE_ENDPOINT_PARSING_RULE_DEFAULT_VALUE))); boolean isUseEndpointParsingRule = Boolean.valueOf(isUseEndpointRuleParsing); String endpointUrl; //使用endpoint解析功能 if (isUseEndpointParsingRule) { // Get the set domain name information endpointUrl = ParamUtil.parsingEndpointRule(properties.getProperty(PropertyKeyConst.ENDPOINT)); if (StringUtils.isBlank(endpointUrl)) { return ""; } } else { //不使用的化,直接通过properties文件来获取 endpointUrl = properties.getProperty(PropertyKeyConst.ENDPOINT); } if (StringUtils.isBlank(endpointUrl)) { return ""; } //获取endpoint的端口 String endpointPort = TemplateUtils.stringEmptyAndThenExecute(System.getenv(PropertyKeyConst.SystemEnv.ALIBABA_ALIWARE_ENDPOINT_PORT), new Callable<String>() { @Override public String call() { return properties.getProperty(PropertyKeyConst.ENDPOINT_PORT); } }); endpointPort = TemplateUtils.stringEmptyAndThenExecute(endpointPort, new Callable<String>() { @Override public String call() { return "8080"; } }); return endpointUrl + ":" + endpointPort; }
initWebRootContext
//阿里云EDAS相关的 public static void initWebRootContext() { // support the web context with ali-yun if the app deploy by EDAS final String webContext = System.getProperty(SystemPropertyKeyConst.NAMING_WEB_CONTEXT); TemplateUtils.stringNotEmptyAndThenExecute(webContext, new Runnable() { @Override public void run() { UtilAndComs.webContext = webContext.indexOf("/") > -1 ? webContext : "/" + webContext; UtilAndComs.nacosUrlBase = UtilAndComs.webContext + "/v1/ns"; UtilAndComs.nacosUrlInstance = UtilAndComs.nacosUrlBase + "/instance"; } }); }
initCacheDir
//初始化缓存目录,用于存放从服务端获取的服务信息,如果客户端与服务端断开了连接,将会使用缓存的信息 private void initCacheDir() { cacheDir = System.getProperty("com.alibaba.nacos.naming.cache.dir"); if (StringUtils.isEmpty(cacheDir)) { cacheDir = System.getProperty("user.home") + "/nacos/naming/" + namespace; } }
initLogName
//初始化日志存放路径 private void initLogName(Properties properties) { logName = System.getProperty(UtilAndComs.NACOS_NAMING_LOG_NAME); if (StringUtils.isEmpty(logName)) { if (properties != null && StringUtils .isNotEmpty(properties.getProperty(UtilAndComs.NACOS_NAMING_LOG_NAME))) { logName = properties.getProperty(UtilAndComs.NACOS_NAMING_LOG_NAME); } else { logName = "naming.log"; } } }
EventDispatcher
public class EventDispatcher implements Closeable { private ExecutorService executor = null; //发生了变化的服务队列 private final BlockingQueue<ServiceInfo> changedServices = new LinkedBlockingQueue<ServiceInfo>(); //监听者维护映射 private final ConcurrentMap<String, List<EventListener>> observerMap = new ConcurrentHashMap<String, List<EventListener>>(); private volatile boolean closed = false; public EventDispatcher() { this.executor = Executors.newSingleThreadExecutor(new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r, "com.alibaba.nacos.naming.client.listener"); thread.setDaemon(true); return thread; } }); this.executor.execute(new Notifier()); } /** * Add listener. * * @param serviceInfo service info * @param clusters clusters * @param listener listener */ public void addListener(ServiceInfo serviceInfo, String clusters, EventListener listener) { NAMING_LOGGER.info("[LISTENER] adding " + serviceInfo.getName() + " with " + clusters + " to listener map"); List<EventListener> observers = Collections.synchronizedList(new ArrayList<EventListener>()); observers.add(listener); observers = observerMap.putIfAbsent(ServiceInfo.getKey(serviceInfo.getName(), clusters), observers); if (observers != null) { observers.add(listener); } serviceChanged(serviceInfo); } /** * Remove listener. * * @param serviceName service name * @param clusters clusters * @param listener listener */ public void removeListener(String serviceName, String clusters, EventListener listener) { NAMING_LOGGER.info("[LISTENER] removing " + serviceName + " with " + clusters + " from listener map"); List<EventListener> observers = observerMap.get(ServiceInfo.getKey(serviceName, clusters)); if (observers != null) { Iterator<EventListener> iter = observers.iterator(); while (iter.hasNext()) { EventListener oldListener = iter.next(); if (oldListener.equals(listener)) { iter.remove(); } } if (observers.isEmpty()) { observerMap.remove(ServiceInfo.getKey(serviceName, clusters)); } } } public boolean isSubscribed(String serviceName, String clusters) { return observerMap.containsKey(ServiceInfo.getKey(serviceName, clusters)); } public List<ServiceInfo> getSubscribeServices() { List<ServiceInfo> serviceInfos = new ArrayList<ServiceInfo>(); for (String key : observerMap.keySet()) { serviceInfos.add(ServiceInfo.fromKey(key)); } return serviceInfos; } /** * Service changed. * * @param serviceInfo service info */ public void serviceChanged(ServiceInfo serviceInfo) { if (serviceInfo == null) { return; } changedServices.add(serviceInfo); } @Override public void shutdown() throws NacosException { String className = this.getClass().getName(); NAMING_LOGGER.info("{} do shutdown begin", className); ThreadUtils.shutdownThreadPool(executor, NAMING_LOGGER); closed = true; NAMING_LOGGER.info("{} do shutdown stop", className); } //服务变化通知线程 private class Notifier implements Runnable { @Override public void run() { while (!closed) { ServiceInfo serviceInfo = null; try { //从队列取出变化消息 serviceInfo = changedServices.poll(5, TimeUnit.MINUTES); } catch (Exception ignore) { } if (serviceInfo == null) { continue; } try { //获取监听者队列 List<EventListener> listeners = observerMap.get(serviceInfo.getKey()); //遍历监听者队列,调用其onEvent方法 if (!CollectionUtils.isEmpty(listeners)) { for (EventListener listener : listeners) { List<Instance> hosts = Collections.unmodifiableList(serviceInfo.getHosts()); listener.onEvent(new NamingEvent(serviceInfo.getName(), serviceInfo.getGroupName(), serviceInfo.getClusters(), hosts)); } } } catch (Exception e) { NAMING_LOGGER.error("[NA] notify error for service: " + serviceInfo.getName() + ", clusters: " + serviceInfo.getClusters(), e); } } } } }
EventDispatcher维护一个阻塞队列,主要存储发生改变的服务的信息,维护了一个对于服务的监听队列的映射的Map,实时的将服务变化信息同步给监听者,这样客户端就可以通过注册监听者实现在服务变化后动态进行操作。
NamingProxy
NamingProxy封装了与服务端的操作,代码相对比较简单,值得注意的是如果配置安全相关的内容,在初始化的时候该处会进行一个定时任务的查询,如果对安全要求比较高,可重SecurityProxy部分内容,当然服务端部分也需要重写,大部分的情况这注册中心一般暴露在内网环境下,基本上不需要重写的。
BeatReactor
BeatReactor负责将客户端的信息上报和下线,对于非持久化的内容采用周期上报内容,这部分在服务心跳的时候我们讲解过,这里不进行源码分析,大家重点关注addBeatInfo、removeBeatInfo和BeatTask的内容,相对比较简单。
HostReactor
HostReactor主要负责客户端获取服务端注册的信息的部分,主要分为三个部分:
- 客户端需要调用NacosNamingService获取服务信息方法的时候,HostReactor负责把服务信息维护本地缓存的serviceInfoMap中,并且通过UpdateTask定时更新已存在的服务;
- HostReactor内部维护PushReceiver对象,负责接收服务端通过UDP协议推送过来的服务变更的信息,并更新到本地缓存serviceInfoMap当中;
- HostReactor内部维护FailoverReactor对象,负责当服务端不可用的时候,切换到本地文件缓存模式,从本地文件的缓存中获取服务信息;
public class HostReactor implements Closeable { private static final long DEFAULT_DELAY = 1000L; private static final long UPDATE_HOLD_INTERVAL = 5000L; private final Map<String, ScheduledFuture<?>> futureMap = new HashMap<String, ScheduledFuture<?>>(); private final Map<String, ServiceInfo> serviceInfoMap; private final Map<String, Object> updatingMap; //接收UDP服务端UDP协议 private final PushReceiver pushReceiver; //阻塞队列的变更消息处理 private final EventDispatcher eventDispatcher; //心跳上报 private final BeatReactor beatReactor; //HTTP请求消息的处理 private final NamingProxy serverProxy; //服务不可用时本地降级文件的处理模式 private final FailoverReactor failoverReactor; private final String cacheDir; //定时更新服务消息的定时任务 private final ScheduledExecutorService executor; public HostReactor(EventDispatcher eventDispatcher, NamingProxy serverProxy, BeatReactor beatReactor, String cacheDir) { this(eventDispatcher, serverProxy, beatReactor, cacheDir, false, UtilAndComs.DEFAULT_POLLING_THREAD_COUNT); } public HostReactor(EventDispatcher eventDispatcher, NamingProxy serverProxy, BeatReactor beatReactor, String cacheDir, boolean loadCacheAtStart, int pollingThreadCount) { // init executorService this.executor = new ScheduledThreadPoolExecutor(pollingThreadCount, new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setDaemon(true); thread.setName("com.alibaba.nacos.client.naming.updater"); return thread; } }); this.eventDispatcher = eventDispatcher; this.beatReactor = beatReactor; this.serverProxy = serverProxy; this.cacheDir = cacheDir; if (loadCacheAtStart) { this.serviceInfoMap = new ConcurrentHashMap<String, ServiceInfo>(DiskCache.read(this.cacheDir)); } else { this.serviceInfoMap = new ConcurrentHashMap<String, ServiceInfo>(16); } this.updatingMap = new ConcurrentHashMap<String, Object>(); this.failoverReactor = new FailoverReactor(this, cacheDir); this.pushReceiver = new PushReceiver(this); } public Map<String, ServiceInfo> getServiceInfoMap() { return serviceInfoMap; } public synchronized ScheduledFuture<?> addTask(UpdateTask task) { return executor.schedule(task, DEFAULT_DELAY, TimeUnit.MILLISECONDS); } /** * Process service json. * * @param json service json * @return service info */ //处理从服务端接收到的数据 public ServiceInfo processServiceJson(String json) { ServiceInfo serviceInfo = JacksonUtils.toObj(json, ServiceInfo.class); ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey()); if (serviceInfo.getHosts() == null || !serviceInfo.validate()) { //empty or error push, just ignore return oldService; } boolean changed = false; //新老信息对比处理 if (oldService != null) { //如果本地旧服务的获取时间比服务器端获取的时间新,则保留本地旧服务的时间 if (oldService.getLastRefTime() > serviceInfo.getLastRefTime()) { NAMING_LOGGER.warn("out of date data received, old-t: " + oldService.getLastRefTime() + ", new-t: " + serviceInfo.getLastRefTime()); } //用新服务信息替换serviceInfoMap serviceInfoMap.put(serviceInfo.getKey(), serviceInfo); Map<String, Instance> oldHostMap = new HashMap<String, Instance>(oldService.getHosts().size()); for (Instance host : oldService.getHosts()) { oldHostMap.put(host.toInetAddr(), host); } Map<String, Instance> newHostMap = new HashMap<String, Instance>(serviceInfo.getHosts().size()); for (Instance host : serviceInfo.getHosts()) { newHostMap.put(host.toInetAddr(), host); } Set<Instance> modHosts = new HashSet<Instance>(); Set<Instance> newHosts = new HashSet<Instance>(); Set<Instance> remvHosts = new HashSet<Instance>(); List<Map.Entry<String, Instance>> newServiceHosts = new ArrayList<Map.Entry<String, Instance>>( newHostMap.entrySet()); for (Map.Entry<String, Instance> entry : newServiceHosts) { Instance host = entry.getValue(); String key = entry.getKey(); if (oldHostMap.containsKey(key) && !StringUtils .equals(host.toString(), oldHostMap.get(key).toString())) { modHosts.add(host); continue; } if (!oldHostMap.containsKey(key)) { newHosts.add(host); } } for (Map.Entry<String, Instance> entry : oldHostMap.entrySet()) { Instance host = entry.getValue(); String key = entry.getKey(); if (newHostMap.containsKey(key)) { continue; } if (!newHostMap.containsKey(key)) { remvHosts.add(host); } } if (newHosts.size() > 0) { changed = true; NAMING_LOGGER.info("new ips(" + newHosts.size() + ") service: " + serviceInfo.getKey() + " -> " + JacksonUtils.toJson(newHosts)); } if (remvHosts.size() > 0) { changed = true; NAMING_LOGGER.info("removed ips(" + remvHosts.size() + ") service: " + serviceInfo.getKey() + " -> " + JacksonUtils.toJson(remvHosts)); } if (modHosts.size() > 0) { changed = true; updateBeatInfo(modHosts); NAMING_LOGGER.info("modified ips(" + modHosts.size() + ") service: " + serviceInfo.getKey() + " -> " + JacksonUtils.toJson(modHosts)); } serviceInfo.setJsonFromServer(json); if (newHosts.size() > 0 || remvHosts.size() > 0 || modHosts.size() > 0) { //服务信息变更写入阻塞队列 eventDispatcher.serviceChanged(serviceInfo); //磁盘缓存希尔 DiskCache.write(serviceInfo, cacheDir); } } else { //新服务的信息直接加入本地缓存 changed = true; NAMING_LOGGER.info("init new ips(" + serviceInfo.ipCount() + ") service: " + serviceInfo.getKey() + " -> " + JacksonUtils.toJson(serviceInfo.getHosts())); serviceInfoMap.put(serviceInfo.getKey(), serviceInfo); eventDispatcher.serviceChanged(serviceInfo); serviceInfo.setJsonFromServer(json); DiskCache.write(serviceInfo, cacheDir); } //上报数量 MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size()); if (changed) { NAMING_LOGGER.info("current ips:(" + serviceInfo.ipCount() + ") service: " + serviceInfo.getKey() + " -> " + JacksonUtils.toJson(serviceInfo.getHosts())); } return serviceInfo; } private void updateBeatInfo(Set<Instance> modHosts) { for (Instance instance : modHosts) { String key = beatReactor.buildKey(instance.getServiceName(), instance.getIp(), instance.getPort()); if (beatReactor.dom2Beat.containsKey(key) && instance.isEphemeral()) { BeatInfo beatInfo = beatReactor.buildBeatInfo(instance); beatReactor.addBeatInfo(instance.getServiceName(), beatInfo); } } } //通过key获取服务对象 private ServiceInfo getServiceInfo0(String serviceName, String clusters) { //得到ServiceInfo的key String key = ServiceInfo.getKey(serviceName, clusters); //从本地缓存中获取服务信息 return serviceInfoMap.get(key); } //从服务器端获取Service信息,并解析为ServiceInfo对象 public ServiceInfo getServiceInfoDirectlyFromServer(final String serviceName, final String clusters) throws NacosException { String result = serverProxy.queryList(serviceName, clusters, 0, false); if (StringUtils.isNotEmpty(result)) { return JacksonUtils.toObj(result, ServiceInfo.class); } return null; } public ServiceInfo getServiceInfo(final String serviceName, final String clusters) { NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch()); String key = ServiceInfo.getKey(serviceName, clusters); //是否开启本地文件缓存模式 if (failoverReactor.isFailoverSwitch()) { return failoverReactor.getService(key); } ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters); //从serviceInfoMap获取serviceObj,如果没有serviceObj,则新生成一个 if (null == serviceObj) { serviceObj = new ServiceInfo(serviceName, clusters); serviceInfoMap.put(serviceObj.getKey(), serviceObj); updatingMap.put(serviceName, new Object()); updateServiceNow(serviceName, clusters); updatingMap.remove(serviceName); } else if (updatingMap.containsKey(serviceName)) { //如果更新列表中包含服务,则等待更新结束 if (UPDATE_HOLD_INTERVAL > 0) { // hold a moment waiting for update finish synchronized (serviceObj) { try { serviceObj.wait(UPDATE_HOLD_INTERVAL); } catch (InterruptedException e) { NAMING_LOGGER .error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e); } } } } //添加更新调度任务 scheduleUpdateIfAbsent(serviceName, clusters); return serviceInfoMap.get(serviceObj.getKey()); } //从服务端更新服务 private void updateServiceNow(String serviceName, String clusters) { try { updateService(serviceName, clusters); } catch (NacosException e) { NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e); } } /** * Schedule update if absent. * * @param serviceName service name * @param clusters clusters */ //添加更新调度任务 public void scheduleUpdateIfAbsent(String serviceName, String clusters) { if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) { return; } synchronized (futureMap) { if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) { return; } ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, clusters)); futureMap.put(ServiceInfo.getKey(serviceName, clusters), future); } } /** * Update service now. * * @param serviceName service name * @param clusters clusters */ public void updateService(String serviceName, String clusters) throws NacosException { ServiceInfo oldService = getServiceInfo0(serviceName, clusters); try { String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false); if (StringUtils.isNotEmpty(result)) { processServiceJson(result); } } finally { if (oldService != null) { synchronized (oldService) { oldService.notifyAll(); } } } } /** * Refresh only. * * @param serviceName service name * @param clusters cluster */ public void refreshOnly(String serviceName, String clusters) { try { serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false); } catch (Exception e) { NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e); } } @Override public void shutdown() throws NacosException { String className = this.getClass().getName(); NAMING_LOGGER.info("{} do shutdown begin", className); ThreadUtils.shutdownThreadPool(executor, NAMING_LOGGER); pushReceiver.shutdown(); failoverReactor.shutdown(); NAMING_LOGGER.info("{} do shutdown stop", className); } //定时更新已存在的服务 public class UpdateTask implements Runnable { long lastRefTime = Long.MAX_VALUE; private final String clusters; private final String serviceName; /** * the fail situation. 1:can't connect to server 2:serviceInfo's hosts is empty */ private int failCount = 0; public UpdateTask(String serviceName, String clusters) { this.serviceName = serviceName; this.clusters = clusters; } private void incFailCount() { int limit = 6; if (failCount == limit) { return; } failCount++; } private void resetFailCount() { failCount = 0; } @Override public void run() { long delayTime = DEFAULT_DELAY; try { ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters)); if (serviceObj == null) { updateService(serviceName, clusters); return; } if (serviceObj.getLastRefTime() <= lastRefTime) { updateService(serviceName, clusters); serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters)); } else { // if serviceName already updated by push, we should not override it // since the push data may be different from pull through force push refreshOnly(serviceName, clusters); } lastRefTime = serviceObj.getLastRefTime(); if (!eventDispatcher.isSubscribed(serviceName, clusters) && !futureMap .containsKey(ServiceInfo.getKey(serviceName, clusters))) { // abort the update task NAMING_LOGGER.info("update task is stopped, service:" + serviceName + ", clusters:" + clusters); return; } if (CollectionUtils.isEmpty(serviceObj.getHosts())) { incFailCount(); return; } delayTime = serviceObj.getCacheMillis(); resetFailCount(); } catch (Throwable e) { incFailCount(); NAMING_LOGGER.warn("[NA] failed to update serviceName: " + serviceName, e); } finally { executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS); } } } }
总结
从NacosNamingService初始化的整个过程中,很重要的一点值得我们学习,就是单一职责这个理念在每个类的设计上表现的淋淋尽致。