nacos源码分析-服务注册(服务端)

简介: 一篇文章我们了解了《Nacos服务注册》客户端源码,本篇文章我们来看一下服务注册Nacos服务端的源码执行情况。首先需要下载Nacos源码, https://github.com/alibaba/nacos/releases/tag/1.4.3 ,

安装Nacos源码

上一篇文章我们了解了《Nacos服务注册》客户端源码,本篇文章我们来看一下服务注册Nacos服务端的源码执行情况。首先需要下载Nacos源码, https://github.com/alibaba/nacos/releases/tag/1.4.3在这里插入图片描述
解压之后使用IDEA工具导入即可。

在这里插入图片描述
但是编译过后发现代码会报错,主要是缺少实体类,比如:
在这里插入图片描述

安装protobuf

这主要是应该nacos数据通信底层使用到protobuf进行序列化(与JSON类似),是Google提供的一种数据序列化协议

Protocol Buffers 是一种轻便高效的结构化数据存储格式,可以用于结构化数据序列化,很适合做数据存储或 RPC 数据交换格式。它可用于通讯协议、数据存储等领域的语言无关、平台无关、可扩展的序列化结构数据格式。

所以这里我们需要安装protobuf ,先去下载 https://github.com/protocolbuffers/protobuf/releases,下载window版本如下:
在这里插入图片描述

  • 下载之后解压

在这里插入图片描述

  • 然后需要配置环境变量

在这里插入图片描述

  • 找到consistency模块,进入src/main

在这里插入图片描述

  • 进入main目录,执行cmd命令
    protoc --java_out=./java ./proto/consistency.proto
    protoc --java_out=./java ./proto/Data.proto
    
    效果如下:
    在这里插入图片描述

启动Nacos

找到console控制台,启动Nacos,第一次启动会报错,因为默认是以集群方式启动,会出现jdbc.properties找不到的错误
在这里插入图片描述

  • 然后指定为单机启动,指定VM参数

在这里插入图片描述

  • 启动成功

在这里插入图片描述

在这里插入图片描述

  • 到这里,nacos服务端的源码就启动成功了,那么我们尝试启动nacos-client程序,让他注册到nacos-server

在这里插入图片描述

  • 查看控制台,nacos-client成功注册到服务端

在这里插入图片描述

服务注册

在上一章节《Nacos源码分析-服务注册(客户端)》我们有分析到,nacos-client提交注册的地址是post /nacos/v1/ns/instance,那么我们在nacos-server源码中找到该接口,它位于 naming 模块中的/controllers包下的InstanceController接口中。源码如下

@RestController
@RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance")
public class InstanceController {
   
   

    @Autowired
    private SwitchDomain switchDomain;

    @Autowired
    private PushService pushService;

    @Autowired
    private ServiceManager serviceManager;

      ...省略...

    /**
      注册一个新的实例
     * Register new instance.
     *
     * @param request http request
     * @return 'ok' if success
     * @throws Exception any error during register
     */
    @CanDistro
    @PostMapping
    @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
    //request请求对象中包括了注册的服务的port,namespaceId,groupName,serviceName,ip,集群名等等
    public String register(HttpServletRequest request) throws Exception {
   
   
        //拿到注册的服务的:namespaceId,默认是public
        final String namespaceId = WebUtils
                .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
        //拿到注册的服务的:serviceName服务名会把组名加在前面,比如:DEFAULT_GROUP@@nacos-client
        final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
        //检查服务名的格式:groupName@@serviceName
        NamingUtils.checkServiceNameFormat(serviceName);
        //解析请求参数,封装服务实例对戏,把注册的服务封装为Instance,其中包括IP,端口,服务名等
        final Instance instance = parseInstance(request);
        //使用ServiceManger注册服务实例
        serviceManager.registerInstance(namespaceId, serviceName, instance);
        return "ok";
    }

    //解析要注册的服务实例
    private Instance parseInstance(HttpServletRequest request) throws Exception {
   
   
        //拿到服务名 DEFAULT_GROUP@@nacos-client
        String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
        //拿到app,没配置就是:unknown
        String app = WebUtils.optional(request, "app", "DEFAULT");
        //拿到注册服务的:IP,是否开启服务,权重,健康状况,等封装为Instance 对象
        Instance instance = getIpAddress(request);
        instance.setApp(app);
        instance.setServiceName(serviceName);
        // Generate simple instance id first. This value would be updated according to
        // 生成实例的ID:192.168.174.1#8080#DEFAULT#DEFAULT_GROUP@@nacos-client
        instance.setInstanceId(instance.generateInstanceId());
        //设置最后的心跳时间为当前时间
        instance.setLastBeat(System.currentTimeMillis());
        String metadata = WebUtils.optional(request, "metadata", StringUtils.EMPTY);
        if (StringUtils.isNotEmpty(metadata)) {
   
   
            instance.setMetadata(UtilsAndCommons.parseMetadata(metadata));
        }
        //验证实例
        instance.validate();

        return instance;
    }

register方法中会从请求对象中拿到注册的参数比如IP,是否开启服务,权重,健康状况等,然后封装为 instance对象,交给 serviceManager.registerInstance 去注册,下面是 serviceManager.registerInstance的源码

缓存和初始化serivce

@Component
public class ServiceManager implements RecordListener<Service> {
   
   

    /**
     * Map(namespace, Map(group::serviceName, Service)).
     */
    private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();

   ...省略部分代码...
    //注册服务实例
    public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
   
   
            //1.会尝试从serviceMap(服务注册表)中获取到服务实例,如果没有就会创建一个Service,
            // 并设置好属性:GroupName,namespaceId,serviceName。然后存储到ServiceManager的一个ConcurrentHashMap中
            // 服务注册表的结构是Map<String,Map<String,Service>>
            createEmptyService(namespaceId, serviceName, instance.isEphemeral());
            //从注册表中获取服务,注册表是一个Map<String,Map<String,Service>>结构,
            // 先根据namespaceId取得到Map<String,Service>,然后再根据serviceName取Service
            Service service = getService(namespaceId, serviceName);
            //参数无效,没有找到服务
            if (service == null) {
   
   
                throw new NacosException(NacosException.INVALID_PARAM,
                        "service not found, namespace: " + namespaceId + ", service: " + serviceName);
            }
            //添加 instance 服务实例到注册表
            addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
        }

        ...省略部分代码...

        //二.创建service,并初始化
        public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)
                    throws NacosException {
   
   
            Service service = getService(namespaceId, serviceName);
            //如果服务不存在就创建一个service
            if (service == null) {
   
   

                Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
                service = new Service();
                service.setName(serviceName);
                service.setNamespaceId(namespaceId);
                service.setGroupName(NamingUtils.getGroupName(serviceName));
                // now validate the service. if failed, exception will be thrown
                service.setLastModifiedMillis(System.currentTimeMillis());
                service.recalculateChecksum();
                if (cluster != null) {
   
   
                    cluster.setService(service);
                    service.getClusterMap().put(cluster.getName(), cluster);
                }
                service.validate();
                //保存service和初始化service
                putServiceAndInit(service);
                if (!local) {
   
   
                    addOrReplaceService(service);
                }
            }
        }
        //保存service和初始化service
        private void putServiceAndInit(Service service) throws NacosException {
   
   
                //保存service
                putService(service);
                service = getService(service.getNamespaceId(), service.getName());
                //初始化service
                service.init();
                //consistencyService.listen实现数据一致性监听
                consistencyService
                        .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
                consistencyService
                        .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
                Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
            }

          //保存service到注册表中
          public void putService(Service service) {
   
   
            if (!serviceMap.containsKey(service.getNamespaceId())) {
   
   
                synchronized (putServiceLock) {
   
   
                    if (!serviceMap.containsKey(service.getNamespaceId())) {
   
   
                        serviceMap.put(service.getNamespaceId(), new ConcurrentSkipListMap<>());
                    }
                }
            }
            //把注册的服务存储到Map中
            serviceMap.get(service.getNamespaceId()).putIfAbsent(service.getName(), service);
        }

registerInstance做了三个事情

  • 通过putService()方法将服务缓存到内存

  • service.init()建立心跳机制

  • consistencyService.listen实现数据一致性监听

registerInstance方法会尝试从ServiceManager#serviceMap(服务注册表)中获取到服务实例,如果没有就会创建一个Service,并设置好属性:GroupName,namespaceId,serviceName。然后存储到ServiceManager#serviceMap中。

该Map是一个ConcurrentHashMap,结构是Map>。第一个Key是NamespaceId 如:public ,第二个key是服务名,如 : DEFAULT_GROUP@@nacos-client

在这里插入图片描述

这就是nacos中的的服务注册表,用来存放注册的服务实例的Map.

在这里插入图片描述
注意:service和instance的关系是,一个service中包含一个 Map , 一个Cluster中包含一个 Set。

  • service代表一个服务:比如用户服务
  • Cluster代表服务集群,比如2个用户服务形成一个集群
  • 而一个集群中有多个服务实例,所以Cluster中有了Set 来保存服务实例

除此之外还会调用 com.alibaba.nacos.naming.core.Service#init 方法对service进行初始化,下面是init方法的源码

public void init() {
   
   
        //clientBeatCheckTask 是一个Runnable,它持有service,它的作用是
       //检查并更新临时实例的状态,如果它们已过期,则将其删除
        HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
        for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
   
   
            entry.getValue().setService(this);
            entry.getValue().init();
        }
    }
//定时任务:定时检查服务的健康状况,5S一次
 public static void scheduleCheck(ClientBeatCheckTask task) {
   
   
        futureMap.computeIfAbsent(task.taskKey(),
                k -> GlobalExecutor.scheduleNamingHealth(task, 5000, 5000, TimeUnit.MILLISECONDS));
    }

service.init 初始化方法中主要是把service封装到 ClientBeatCheckTask 对象中,ClientBeatCheckTask 是一个Runnable线程对象,然后使用定时任务5s执行一次健康检查。 ClientBeatCheckTask 的作用是 : 检查并更新临时实例的状态,如果它们已过期,则将其删除

下面是 com.alibaba.nacos.naming.healthcheck.ClientBeatCheckTask#run 线程对象的源码

public void run() {
   
   
        try {
   
   
            if (!getDistroMapper().responsible(service.getName())) {
   
   
                return;
            }

            if (!getSwitchDomain().isHealthCheckEnabled()) {
   
   
                return;
            }
            //拿到服务中的所有实例
            List<Instance> instances = service.allIPs(true);

            // first set health status of instances:
            for (Instance instance : instances) {
   
   
                //当前系统时间 - 实例最后心跳时间 > 默认15s,就意味着超时
                if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
   
   
                    if (!instance.isMarked()) {
   
   
                        if (instance.isHealthy()) {
   
   
                            //健康状态设置为false
                            instance.setHealthy(false);
                            Loggers.EVT_LOG
                                    .info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
                                            instance.getIp(), instance.getPort(), instance.getClusterName(),
                                            service.getName(), UtilsAndCommons.LOCALHOST_SITE,
                                            instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());
                             //发布时间:服务状态改变
                            getPushService().serviceChanged(service);
                            //发布时间:服务实例心跳超时事件
                            ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
                        }
                    }
                }
            }

            if (!getGlobalConfig().isExpireInstance()) {
   
   
                return;
            }

            // then remove obsolete instances:
            for (Instance instance : instances) {
   
   

                if (instance.isMarked()) {
   
   
                    continue;
                }

                if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
   
   
                    // delete instance
                    Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),
                            JacksonUtils.toJson(instance));
                    deleteIp(instance);
                }
            }

        } catch (Exception e) {
   
   
            Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
        }

    }

run方法中会拿到当前service的所有instance,然后循环 , 如果:当前系统时间 - 实例最后心跳时间 > 默认15s,就意味着超时,然后会改变instance的Healthy健康状态Wie false; 并抛出 服务实例心跳超时事件

getPushService().serviceChanged(service):方法很有意思,他的作用是通知 nacos-client该服务已经下线(UDP协议 push),这样的话nacos-client就会从本地剔除掉下线的服务。这就是它和eureka不一样的地方,eureka使用的是pull.而 nacos采用pull + push模式。 具体源码见: PushService#onApplicationEvent

 public void onApplicationEvent(ServiceChangeEvent event) {
   
   
        Service service = event.getService();
        String serviceName = service.getName();
        String namespaceId = service.getNamespaceId();
        //使用定时任务 1s 一次
        Future future = GlobalExecutor.scheduleUdpSender(() -> {
   
   
            try {
   
   
                //服务改变,添加到 push队列
                Loggers.PUSH.info(serviceName + " is changed, add it to push queue.");
                ConcurrentMap<String, PushClient> clients = clientMap
                        .get(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
                if (MapUtils.isEmpty(clients)) {
   
   
                    return;
                }

                Map<String, Object> cache = new HashMap<>(16);
                long lastRefTime = System.nanoTime();
                for (PushClient client : clients.values()) {
   
   
                    if (client.zombie()) {
   
   
                        Loggers.PUSH.debug("client is zombie: " + client.toString());
                        clients.remove(client.toString());
                        Loggers.PUSH.debug("client is zombie: " + client.toString());
                        continue;
                    }

                    Receiver.AckEntry ackEntry;
                    Loggers.PUSH.debug("push serviceName: {} to client: {}", serviceName, client.toString());
                    String key = getPushCacheKey(serviceName, client.getIp(), client.getAgent());
                    byte[] compressData = null;
                    Map<String, Object> data = null;
                    if (switchDomain.getDefaultPushCacheMillis() >= 20000 && cache.containsKey(key)) {
   
   
                        org.javatuples.Pair pair = (org.javatuples.Pair) cache.get(key);
                        compressData = (byte[]) (pair.getValue0());
                        data = (Map<String, Object>) pair.getValue1();

                        Loggers.PUSH.debug("[PUSH-CACHE] cache hit: {}:{}", serviceName, client.getAddrStr());
                    }

                    if (compressData != null) {
   
   
                        ackEntry = prepareAckEntry(client, compressData, data, lastRefTime);
                    } else {
   
   
                        ackEntry = prepareAckEntry(client, prepareHostsData(client), lastRefTime);
                        if (ackEntry != null) {
   
   
                            cache.put(key, new org.javatuples.Pair<>(ackEntry.origin.getData(), ackEntry.data));
                        }
                    }

                    Loggers.PUSH.info("serviceName: {} changed, schedule push for: {}, agent: {}, key: {}",
                            client.getServiceName(), client.getAddrStr(), client.getAgent(),
                            (ackEntry == null ? null : ackEntry.key));
                    //使用udp协议push
                    udpPush(ackEntry);
                }
            } catch (Exception e) {
   
   
                Loggers.PUSH.error("[NACOS-PUSH] failed to push serviceName: {} to client, error: {}", serviceName, e);

            } finally {
   
   
                futureMap.remove(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
            }

        }, 1000, TimeUnit.MILLISECONDS);

        futureMap.put(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName), future);

    }

添加instance

到这里,service的缓存和初始化就看完了,代码回到 com.alibaba.nacos.naming.core.ServiceManager#registerInstance 。接下来就是分析 addInstance方法

//添加一个instance到Add instance to service.
public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
            throws NacosException {
   
   
        //拿到key: com.alibaba.nacos.naming.iplist.ephemeral.public##DEFAULT_GROUP@@nacos-client
        String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
        //拿到service
        Service service = getService(namespaceId, serviceName);
        //对service加同步锁,避免并发修改
        synchronized (service) {
   
   
            //拿到该service中的所有instance
            List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
            //把实例列表封装到Instances 对象中
            Instances instances = new Instances();
            instances.setInstanceList(instanceList);
            //调用consistencyService.put()方法完成Nacos集群的数据同步,保证集群一致性
            consistencyService.put(key, instances);
        }
    }

addInstance方法中会拿到service中的List<Instance>实例列表,然后设置到 Instances 中,调用 consistencyService去同步到nacos集群。

这里采用了CopyOnWrite方案。对于 addIPAddress方法会拷贝旧的实例列表添加到新实例到列表中。在同步完nacos集群后,完成对实例状态更新后,则会用新列表直接覆盖旧实例列表。而在更新过程中,旧实例列表不受影响,用户依然可以读取。

这样在更新列表状态过程中,无需阻塞用户的读操作,也不会导致用户读取到脏数据,性能比较好。这种方案称为CopyOnWrite方案

consistencyService是用作service同步的。代表集群一致性的接口。

在这里插入图片描述

下面看一下 consistencyService.put 方法,底层会调用 DistroConsistencyServiceImpl#put 方法,源码如下

@Override
public void put(String key, Record value) throws NacosException {
   
   
    //根据key确定是用ephemeralConsistencyService或者persistentConsistencyService
    mapConsistencyService(key).put(key, value);
}

private ConsistencyService mapConsistencyService(String key) {
   
   
        //key以 ephemeral 开头就是临时实例
        // 临时实例选择 ephemeralConsistencyService,也就是 DistroConsistencyServiceImpl类
        //  持久实例选择 persistentConsistencyService,也就是PersistentConsistencyServiceDelegateImpl
        return KeyBuilder.matchEphemeralKey(key) ? ephemeralConsistencyService : persistentConsistencyService;
    }

//初始化方法,
@PostConstruct
public void init() {
   
   
        //把notifier提交给线程池
    GlobalExecutor.submitDistroNotifyTask(notifier);
}

@Override
public void put(String key, Record value) throws NacosException {
   
   
        //把实例保存到本地实例表
        onPut(key, value);
        //使用distro协议同步到集群
        distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,
                globalConfig.getTaskDispatchPeriod() / 2);
}

put方法中,会先先根据服务的key判断使用临时同步服务ephemeralConsistencyService ,或者持久同步服务persistentConsistencyService。然后会做2个事情

  • 调用onPut :把实例保存到本地实例列表 。
  • 调用distroProtocol.sync把实例同步到集群

    更新服务列表

    对于onPut 方法中做了2个事情.
  • 一个是把实例封装到Datum对象中,然后交给dataStore存储起来。
  • 另一个是通过notifier.addTask 把key放入阻塞队列,然后会通过线程池异步去执行阻塞队列
    ```java
    public void onPut(String key, Record value) {

      //判断是否是临时实例
      if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
          Datum<Instances> datum = new Datum<>();
          datum.value = (Instances) value;
          datum.key = key;
          datum.timestamp.incrementAndGet();
          //把数据存储到dataStore,内部维护了一个Map
          dataStore.put(key, datum);
      }
    
      if (!listeners.containsKey(key)) {
          return;
      }
      //这里是把key放入一个阻塞队列,然后会用线程池异步去执行队列
      notifier.addTask(key, DataOperation.CHANGE);
    

    }

    public class Notifier implements Runnable {

      private ConcurrentHashMap<String, String> services = new ConcurrentHashMap<>(10 * 1024);
    
      //一个阻塞队列
      private BlockingQueue<Pair<String, DataOperation>> tasks = new ArrayBlockingQueue<>(1024 * 1024);
      public void addTask(String datumKey, DataOperation action) {
    
          if (services.containsKey(datumKey) && action == DataOperation.CHANGE) {
              return;
          }
          if (action == DataOperation.CHANGE) {
              //如果是change,就把key放入一个map中
              services.put(datumKey, StringUtils.EMPTY);
          }
          //加入阻塞队列
          tasks.offer(Pair.with(datumKey, action));
      }
    
       @Override
       public void run() {
          Loggers.DISTRO.info("distro notifier started");
    
          for (; ; ) {
              try {
                  //从阻塞队列中取出任务
                  Pair<String, DataOperation> pair = tasks.take();
                  //处理任务更新服务列表
                  handle(pair);
              } catch (Throwable e) {
                  Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
              }
          }
      }
    
Notifier是一个Runnable,其中维护了一个tasks(ArrayBlockingQueue)用来存储服务列表的变更事件。他的run方法中是一个死循环,不停的从阻塞队列中取出任务交给handle方法去处理。下面是 DistroConsistencyServiceImpl.Notifier#handle方法
```java
private void handle(Pair<String, DataOperation> pair) {
            try {
                String datumKey = pair.getValue0();
                DataOperation action = pair.getValue1();

                services.remove(datumKey);

                int count = 0;

                ConcurrentLinkedQueue<RecordListener> recordListeners = listeners.get(datumKey);
                if (recordListeners == null) {
                    Loggers.DISTRO.info("[DISTRO-WARN] RecordListener not found, key: {}", datumKey);
                    return;
                }
                //拿到有change的service,RecordListener 就是 service的接口
                for (RecordListener listener : recordListeners) {

                    count++;

                    try {
                        //如果是change事件
                        if (action == DataOperation.CHANGE) {
                            //取出服务
                            Datum datum = dataStore.get(datumKey);
                            if (datum != null) {
                                //执行linster的change事件。更新服务列表
                                listener.onChange(datumKey, datum.value);
                            } else {
                                Loggers.DISTRO.info("[DISTRO-WARN] data not found, key: {}", datumKey);
                            }
                            continue;
                        }
                        //处理服务的delete事件
                        if (action == DataOperation.DELETE) {
                            listener.onDelete(datumKey);
                            continue;
                        }
                    } catch (Throwable e) {
                        Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
                    }
                }

                if (Loggers.DISTRO.isDebugEnabled()) {
                    Loggers.DISTRO
                            .debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}",
                                    datumKey, count, action.name());
                }
            } catch (Throwable e) {
                Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
            }
        }

handle方法中会找到有变化的RecordListener,其实就是service(change 或者 delete事件)然后,触发onChange方法,其实就是调用 com.alibaba.nacos.naming.core.Service#onChange方法。

 public void onChange(String key, Instances value) throws Exception {
   
   

        Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value);
        //遍历service中的所有实例instance
        for (Instance instance : value.getInstanceList()) {
   
   

            if (instance == null) {
   
   
                // Reject this abnormal instance list:
                throw new RuntimeException("got null instance " + key);
            }

            if (instance.getWeight() > 10000.0D) {
   
   
                //设置权重
                instance.setWeight(10000.0D);
            }

            if (instance.getWeight() < 0.01D && instance.getWeight() > 0.0D) {
   
   
                instance.setWeight(0.01D);
            }
        }
        //修改IP
        updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));

        recalculateChecksum();
    }

该方法中会调用updateIPS去更新服务实例,源码如下

public void updateIPs(Collection<Instance> instances, boolean ephemeral) {
   
   
    // 准备一个HashMap,key是cluster,值是集群下的Instance集合
    Map<String, List<Instance>> ipMap = new HashMap<>(clusterMap.size());
    // 获取集群名称存储到map中,key是集群名
    for (String clusterName : clusterMap.keySet()) {
   
   
        ipMap.put(clusterName, new ArrayList<>());
    }
    // 遍历要更新的实例
    for (Instance instance : instances) {
   
   
        try {
   
   
            if (instance == null) {
   
   
                Loggers.SRV_LOG.error("[NACOS-DOM] received malformed ip: null");
                continue;
            }
            // 判断实例是否包含clusterName,没有的话用默认cluster
            if (StringUtils.isEmpty(instance.getClusterName())) {
   
   
                instance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME);
            }
            // 判断cluster是否存在,不存在则创建新的cluster
            if (!clusterMap.containsKey(instance.getClusterName())) {
   
   
                Loggers.SRV_LOG
                    .warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
                          instance.getClusterName(), instance.toJson());
                Cluster cluster = new Cluster(instance.getClusterName(), this);
                cluster.init();
                getClusterMap().put(instance.getClusterName(), cluster);
            }
            // 获取当前cluster实例的集合,不存在则创建新的
            List<Instance> clusterIPs = ipMap.get(instance.getClusterName());
            if (clusterIPs == null) {
   
   
                clusterIPs = new LinkedList<>();
                ipMap.put(instance.getClusterName(), clusterIPs);
            }
            // 添加新的实例到 Instance 集合
            clusterIPs.add(instance);
        } catch (Exception e) {
   
   
            Loggers.SRV_LOG.error("[NACOS-DOM] failed to process ip: " + instance, e);
        }
    }

    for (Map.Entry<String, List<Instance>> entry : ipMap.entrySet()) {
   
   
        //make every ip mine
        List<Instance> entryIPs = entry.getValue();
        // 这里就是在更新注册表
        clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral);
    }
    //设置最后修改时间
    setLastModifiedMillis(System.currentTimeMillis());
    // 发布服务变更的通知消息
    getPushService().serviceChanged(this);
    StringBuilder stringBuilder = new StringBuilder();

    for (Instance instance : allIPs()) {
   
   
        stringBuilder.append(instance.toIpAddr()).append("_").append(instance.isHealthy()).append(",");
    }

    Loggers.EVT_LOG.info("[IP-UPDATED] namespace: {}, service: {}, ips: {}", getNamespaceId(), getName(),
                         stringBuilder.toString());

}

上面代码中 ,clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral); 就是 在更新服务注册表,因为service#clusterMap 是一个Map 结构,cluster中就是服务实例。然后会调用 .Cluster#updateIps去更新实例。源码如下

public void updateIps(List<Instance> ips, boolean ephemeral) {
   
   

    Set<Instance> toUpdateInstances = ephemeral ? ephemeralInstances : persistentInstances;
    //拿到旧的服务列表,
    HashMap<String, Instance> oldIpMap = new HashMap<>(toUpdateInstances.size());

    for (Instance ip : toUpdateInstances) {
   
   
        oldIpMap.put(ip.getDatumKey(), ip);
    }
    ...省略部分代码...

    // 检查新加入实例的状态
    List<Instance> newIPs = subtract(ips, oldIpMap.values());

    if (newIPs.size() > 0) {
   
   
        Loggers.EVT_LOG
            .info("{} {SYNC} {IP-NEW} cluster: {}, new ips size: {}, content: {}", getService().getName(),
                  getName(), newIPs.size(), newIPs.toString());

        for (Instance ip : newIPs) {
   
   
                //重置服务的健康状态
            HealthCheckStatus.reset(ip);
        }
    }
    // 移除要删除的实例
    List<Instance> deadIPs = subtract(oldIpMap.values(), ips);

    if (deadIPs.size() > 0) {
   
   
        Loggers.EVT_LOG
            .info("{} {SYNC} {IP-DEAD} cluster: {}, dead ips size: {}, content: {}", getService().getName(),
                  getName(), deadIPs.size(), deadIPs.toString());

        for (Instance ip : deadIPs) {
   
   
            //移除
            HealthCheckStatus.remv(ip);
        }
    }

    toUpdateInstances = new HashSet<>(ips);

    if (ephemeral) {
   
   
    // 直接覆盖旧实例列表
        ephemeralInstances = toUpdateInstances;
    } else {
   
   
        persistentInstances = toUpdateInstances;
    }
}

同步服务到集群

接下来回到 DistroConsistencyServiceImpl#put方法中。刚才说到该方法做了2个事情

  • onPut(key, value) : 更新服务列表
  • distroProtocol.sync :同步服务到集群

我们现在来看一下sync方法是怎么做的,下面是方法的源码

/**
     * 开始同步数据到所有的远程服务
     * Start to sync data to all remote server.
     *
     * @param distroKey distro key of sync data
     * @param action    the action of data operation
     */
    public void sync(DistroKey distroKey, DataOperation action, long delay) {
   
   
        //拿到除开自己以外的所有nacos集群中的成员
        for (Member each : memberManager.allMembersWithoutSelf()) {
   
   
            //构建一个key
            DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),
                    each.getAddress());
            //构建一个延迟任务对象
            DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
            //交给线程池去执行,维护了一个DistroDelayTaskExecuteEngine
            //任务交给 NacosDelayTaskExecuteEngine 引擎 其中维护了一个ScheduledExecutorService线程池
            distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
            if (Loggers.DISTRO.isDebugEnabled()) {
   
   
                Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, each.getAddress());
            }
        }
    }

该方法中会找到所有的nacos集群的成员(除开自己),然后会拿到服务的key(DistroKey )构建一个DistroDelayTask任务对象,交给线程池去执行同步。

这里维护了一个 DelayTaskExecuteEngine 延迟任务执行引擎NacosDelayTaskExecuteEngine,任务的执行通过引擎的 processTasks方法完成com.alibaba.nacos.common.task.engine.NacosDelayTaskExecuteEngine#processTasks

protected void processTasks() {
   
   
        //拿到所有任务
        Collection<Object> keys = getAllTaskKeys();
        for (Object taskKey : keys) {
   
   
            AbstractDelayTask task = removeTask(taskKey);
            if (null == task) {
   
   
                continue;
            }
            //任务执行器
            NacosTaskProcessor processor = getProcessor(taskKey);
            if (null == processor) {
   
   
                getEngineLog().error("processor not found for task, so discarded. " + task);
                continue;
            }
            try {
   
   
                // ReAdd task if process failed
                //执行任务,任务失败会重试
                if (!processor.process(task)) {
   
   
                    retryFailedTask(taskKey, task);
                }
            } catch (Throwable e) {
   
   
                getEngineLog().error("Nacos task execute error : " + e.toString(), e);
                //重试失败的任务
                retryFailedTask(taskKey, task);
            }
        }
    }

总结

文章有点长,下面做个总结,从大的流程上来说分为如下几个步骤

  1. instanceController接口: nacos服务点接受到注册请求后会把请求解析为Instance,紧接着会执行serviceManager#registerInstance方法注册实例
  2. serviceManager#registerInstance方法中会先尝试创建Service对象,并缓存到一个Map> 结构的服务注册表中,然后对每个service做初始化,主要是使用线程池10s一次检查服务是否健康状态,过期的服务会删除掉。
  3. serviceManager#registerInstance第二个事情就是执行addInstances方法添加实例,该方法会触发服务列表的更新以及把服务同步到其他nacos集群中。

在这里插入图片描述
文章到这里就结束了,如果文章对你有所帮助,请给个好评,你的鼓励是我最大的动力

相关文章
|
2月前
|
缓存 安全 Nacos
nacos常见问题之服务一直在报token expired!如何解决
Nacos是阿里云开源的服务发现和配置管理平台,用于构建动态微服务应用架构;本汇总针对Nacos在实际应用中用户常遇到的问题进行了归纳和解答,旨在帮助开发者和运维人员高效解决使用Nacos时的各类疑难杂症。
97 0
|
2月前
|
网络协议 Java Nacos
nacos常见问题之在web界面 上下线服务时报错 400如何解决
Nacos是阿里云开源的服务发现和配置管理平台,用于构建动态微服务应用架构;本汇总针对Nacos在实际应用中用户常遇到的问题进行了归纳和解答,旨在帮助开发者和运维人员高效解决使用Nacos时的各类疑难杂症。
39 0
|
2月前
|
缓存 PHP Nacos
nacos常见问题之服务升级后nacos控制台看到都是不可用重启nacos后恢复如何解决
Nacos是阿里云开源的服务发现和配置管理平台,用于构建动态微服务应用架构;本汇总针对Nacos在实际应用中用户常遇到的问题进行了归纳和解答,旨在帮助开发者和运维人员高效解决使用Nacos时的各类疑难杂症。
45 4
|
5天前
|
安全 Linux Nacos
如何使用公网地址远程访问内网Nacos UI界面查看注册服务
如何使用公网地址远程访问内网Nacos UI界面查看注册服务
16 0
|
18天前
|
负载均衡 Cloud Native Java
Nacos 注册中心(2023旧笔记)
Nacos 注册中心(2023旧笔记)
18 0
|
1月前
|
Dubbo Java 应用服务中间件
深度剖析:Dubbo使用Nacos注册中心的坑
2020年笔者在做微服务部件升级时,Dubbo的注册中心从Zookeeper切换到Nacos碰到个问题,最近刷Github又有网友提到类似的问题,就在这篇文章里做个梳理和总结。
深度剖析:Dubbo使用Nacos注册中心的坑
|
1月前
|
SpringCloudAlibaba Java Nacos
SpringCloud Alibaba微服务 -- Nacos使用以及注册中心和配置中心的应用(保姆级)
SpringCloud Alibaba微服务 -- Nacos使用以及注册中心和配置中心的应用(保姆级)
|
1月前
|
Dubbo Java 应用服务中间件
双活工作下的数据迁移:Nacos注册中心实战解析
这篇内容介绍了如何使用NacosSync组件进行双活项目中的注册中心数据迁移。首先,准备包括64位OS、JDK 1.8+、Maven 3.2+和MySQL 5.6+的环境。接着,获取并解压NacosSync安装包,配置数据库连接,启动服务,并通过访问特定URL检查系统状态。然后,通过NacosSync控制台进行集群配置,添加Zookeeper和Nacos集群,并设置同步任务。当数据同步完成后,Dubbo客户端(Consumer和Provider)更新配置以连接Nacos注册中心。最后,迁移完成后,原有的Zookeeper集群可下线,整个过程确保了服务的平滑迁移。
40 1
|
2月前
|
XML Dubbo Java
【Dubbo3高级特性】「框架与服务」 Nacos作为注册中心-服务分组及服务分组聚合实现
【Dubbo3高级特性】「框架与服务」 Nacos作为注册中心-服务分组及服务分组聚合实现
64 0
|
2月前
|
关系型数据库 MySQL Nacos
【深入浅出Nacos原理及调优】「实战开发专题」采用Docker容器进行部署和搭建Nacos服务以及“坑点”
【深入浅出Nacos原理及调优】「实战开发专题」采用Docker容器进行部署和搭建Nacos服务以及“坑点”
57 1