微服务架构 | *3.5 Nacos 服务注册与发现的源码分析

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 为方便理解与表达,这里把 Nacos 控制台和 Nacos 注册中心称为 Nacos 服务器(就是 web 界面那个),我们编写的业务服务称为 Nacso 客户端;Nacos 客户端将自己注册进 Nacos 服务器。《1. 服务如何注册进 Nacos 注册中心》主要从 Nacos 客户端角度解释如何发送信息给 Nacos 服务器;《2. Nacos 服务器注册服务》主要从 Nacos 服务器角度解释注册原理;《3. 客户端查询所有服务实例》将从服务消费者和提供者的角度,解释服务消费者如何获取提供者的所有实例。服务消费者和提供者都是 Nacos 的客户端;

前言

参考资料
《Spring Microservices in Action》
《Spring Cloud Alibaba 微服务原理与实战》
《B站 尚硅谷 SpringCloud 框架开发教程 周阳》

为方便理解与表达,这里把 Nacos 控制台和 Nacos 注册中心称为 Nacos 服务器(就是 web 界面那个),我们编写的业务服务称为 Nacso 客户端;

Nacos 客户端将自己注册进 Nacos 服务器。《1. 服务如何注册进 Nacos 注册中心》主要从 Nacos 客户端角度解释如何发送信息给 Nacos 服务器;《2. Nacos 服务器注册服务》主要从 Nacos 服务器角度解释注册原理;

《3. 客户端查询所有服务实例》将从服务消费者和提供者的角度,解释服务消费者如何获取提供者的所有实例。服务消费者和提供者都是 Nacos 的客户端;

《4. 客户端监听 Nacos 服务器以动态获取服务实例》从消费者客户端角度出发监听 Nacos 服务器,以动态获知提供者的变化;


1. 客户端注册进 Nacos 注册中心(客户端视角)

1.1 Spring Cloud 提供的规范标准

  • 服务要注册进 Spring Cloud 集成的 Nacos,首先要满足 Spring Cloud 提供的规范标准;
  • spring-cloud-common 包中有一个类 org.springframework.cloud.client.serviceregistry.ServiceRegistry,它是Spring Cloud 提供的服务注册的标准。集成到 Spring Cloud 中实现服务注册的组件,都会实现该接口;
public interface ServiceRegistry<R extends Registration>{
    void register(R registration); 
    void deregister(R registration); 
    void close(); 
    void setStatus(R registration,String status);
    <T> T getstatus(R registration);
}

1.2 Nacos 的自动配置类

  • spring-cloud-commons 包的 META-INF/spring.factories 中包含自动装配的配置信息。即约定 Spring Cloud 启动时,会将那些类自动注入到容器中:

Spring Cloud 自动装配

  • 其中 AutoServiceRegistrationAutoConfiguration(服务注册自动配置类) 是服务注册相关配置类,源码如下:
@Configuration
@Import({AutoServiceRegistrationConfiguration.class})
@ConditionalOnProperty(
    value = {"spring.cloud.service-registry.auto-registration.enabled"},
    matchIfMissing = true
)
public class AutoServiceRegistrationAutoConfiguration {
    //自动注册类
    @Autowired(required = false)
    private AutoServiceRegistration autoServiceRegistration;
    //自动注册类的配置文件
    @Autowired
    private AutoServiceRegistrationProperties properties;

    public AutoServiceRegistrationAutoConfiguration() {
    }

    //初始化函数
    @PostConstruct
    protected void init() {
        if (this.autoServiceRegistration == null && this.properties.isFailFast()) {
            throw new IllegalStateException("Auto Service Registration has been requested, but there is no AutoServiceRegistration bean");
        }
    }
}
  • 其中关键之处在于注入了一个 AutoServiceRegistration(服务注册器) 自动注册接口,该接口在 Spring Cloud 中有一个抽象实现类 AbstractAutoServiceRegistration(服务注册器抽象类)
  • 我们要用什么注册中心,该注册中心就要继承 AbstractAutoServiceRegistration(服务注册器抽象类) 抽象类;
  • 对于 Nacos 来说,使用 NacosAutoServiceRegistration(Nacos 服务注册器) 类继承 AbstractAutoServiceRegistration(服务注册器抽象类) 抽象类;

NacosAutoServiceRegistration 类依赖结构图

1.3 监听服务初始化事件 AbstractAutoServiceRegistration.bind()

  • 在上述中需要关注 ApplicationListener(事件监听器) 接口,它是一种事件监听机制,接口声明如下:
@FunctionalInterface
public interface ApplicationListener<E extends ApplicationEvent> extends EventListener {
    void onApplicationEvent(E var1);
}
  • AbstractAutoServiceRegistration(服务注册器抽象类) 使用 AbstractAutoServiceRegistration.bind() 方法实现了该接口,用来监听 WebServerInitializedEvent(服务初始化事件)
@Override
@SuppressWarnings("deprecation")
//【断点步入】
public void onApplicationEvent(WebServerInitializedEvent event) {
    bind(event);
}
  • 我们给 AbstractAutoServiceRegistration.bind() 方法打上断点,启动服务提供者,可以发现:

    • Nacos 服务器上没有服务注册实例;
    • 服务初始化已经完成;

服务初始化完成

  • 【结论】说明服务注册到 Nacos 的流程是先进行服务初始化,然后通过事件监听机制监听初始化事件。当初始化完成时,调用 AbstractAutoServiceRegistration.bind() 方法将服务注册进 Nacos 注册中心;

1.4 注册服务实例的逻辑 NacosServiceRegistry.register()

这里能说明什么时候服务会将自己的信息发给 Nacos 服务器;
  • 我们追进 AbstractAutoServiceRegistration.bind() 方法,发现在 AbstractAutoServiceRegistration(服务注册器抽象类) 里调用 NacosServiceRegistry.register() 方法后, Nacos 服务器上出现服务实例;

register()方法

  • 我们追进 NacosServiceRegistry.register() 方法,方法的逻辑如下:
@Override
public void register(Registration registration) {
    //判断是否有服务 ID
    if (StringUtils.isEmpty(registration.getServiceId())) {
        log.warn("No service to register for nacos client...");
        return;
    }
    String serviceId = registration.getServiceId();  //服务 ID(service-provider)
    Instance instance = getNacosInstanceFromRegistration(registration);  //服务实例(里面有 ip、port 等信息)
    try {
        //【断点步入】注册的方法
        namingService.registerInstance(serviceId, instance);
        log.info("nacos registry, {} {}:{} register finished", serviceId,
                instance.getIp(), instance.getPort());
    }
    catch (Exception e) {
        log.error("nacos registry, {} register failed...{},", serviceId,
                registration.toString(), e);
    }
}

服务 ID 与服务实例信息

  • 我们点进去 namingService.registerInstance() 方法(实现逻辑在 NacosNamingService.registerInstance())得到注册的具体逻辑,如下:
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
    if (instance.isEphemeral()) {
        //用心跳 BeatInfo 封装服务实例信息
        BeatInfo beatInfo = new BeatInfo();
        beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
        beatInfo.setIp(instance.getIp());
        beatInfo.setPort(instance.getPort());
        beatInfo.setCluster(instance.getClusterName());
        beatInfo.setWeight(instance.getWeight());
        beatInfo.setMetadata(instance.getMetadata());
        beatInfo.setScheduled(false);
        long instanceInterval = instance.getInstanceHeartBeatInterval();
        beatInfo.setPeriod(instanceInterval == 0L ? DEFAULT_HEART_BEAT_INTERVAL : instanceInterval);
        //【断点步入 1.4.1】将 beatInfo 心跳信息放进 beatReactor 心跳发送器(发送心跳后,Nacos 服务器仍然没有服务实例)
        this.beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
    }
    //【断点步入 1.4.2】使用 namingProxy 命名代理方式将服务实例信息以 POST 请求方式发送服务实例
    this.serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
}
  • 即注册服务实例的逻辑分两步:

    • 先通过 beatReactor.addBeatInfo 创建心跳信息实现健康检测;
    • 再以 POST 请求方式发送服务实例信息;

1.4.1 心跳机制 BeatReactor.addBeatInfo()

  • 我们进入 BeatReactor.addBeatInfo() 方法一探心跳机制,源码如下:
public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
    LogUtils.NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
    this.dom2Beat.put(this.buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort()), beatInfo);
    //【核心】定时向服务端发送一个心跳包 beatInfo
    this.executorService.schedule(new BeatReactor.BeatTask(beatInfo), 0L, TimeUnit.MILLISECONDS);
    //【核心】
    MetricsMonitor.getDom2BeatSizeMonitor().set((double)this.dom2Beat.size());
}
  • BeatReactor.addBeatInfo() 方法主要做了两件事:

    • 客户端调用 ScheduledExecutorService.schedule() 接口方法(靠 ScheduledThreadPoolExecutor(计划线程执行器) 实现)执行定时任务,在每个任务周期内定时向服务端发送一个心跳包 beatInfo;
    • 然后通过 MetricsMonitor.getDom2BeatSizeMonitor() 方法获取一个 心跳测量监视器(实际为 Gauge),不断检测服务端的回应,如果在设定时间内没有收到服务端的回应,则认为服务器出现了故障;
  • Nacos 服务端会根据客户端的心跳包不断更新服务的状态;

1.4.2 注册服务 NamingProxy.registerService()

  • 接着进入 NamingProxy.registerService() 方法,源码如下:
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
    LogUtils.NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", new Object[]{this.namespaceId, serviceName, instance});
    Map<String, String> params = new HashMap(9);
    params.put("namespaceId", this.namespaceId);
    params.put("serviceName", serviceName);
    params.put("groupName", groupName);
    params.put("clusterName", instance.getClusterName());
    params.put("ip", instance.getIp());
    params.put("port", String.valueOf(instance.getPort()));
    params.put("weight", String.valueOf(instance.getWeight()));
    params.put("enable", String.valueOf(instance.isEnabled()));
    params.put("healthy", String.valueOf(instance.isHealthy()));
    params.put("ephemeral", String.valueOf(instance.isEphemeral()));
    params.put("metadata", JSON.toJSONString(instance.getMetadata()));
    //【断点步入】这步执行完后,Nacos 服务器才出现服务实例信息
    this.reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, (String)"POST");
}
  • 该方法先对服务实例做了封装,然后通过 NamingProxy.reqAPI() 方法拼凑注册服务的 API。 NamingProxy.reqAPI() 方法源码如下:
    public String reqAPI(String api, Map<String, String> params, List<String> servers, String method) {
        params.put("namespaceId", this.getNamespaceId());
        if (CollectionUtils.isEmpty(servers) && StringUtils.isEmpty(this.nacosDomain)) {
            throw new IllegalArgumentException("no server available");
        } else {
            Exception exception = new Exception();
            if (servers != null && !servers.isEmpty()) {
                Random random = new Random(System.currentTimeMillis());
                int index = random.nextInt(servers.size());

                for(int i = 0; i < servers.size(); ++i) {
                    String server = (String)servers.get(index);

                    try {
                        return this.callServer(api, params, server, method);
                    } catch (NacosException var11) {
                        exception = var11;
                        LogUtils.NAMING_LOGGER.error("request {} failed.", server, var11);
                    } catch (Exception var12) {
                        exception = var12;
                        LogUtils.NAMING_LOGGER.error("request {} failed.", server, var12);
                    }
                    index = (index + 1) % servers.size();
                }

                throw new IllegalStateException("failed to req API:" + api + " after all servers(" + servers + ") tried: " + ((Exception)exception).getMessage());
            } else {
                int i = 0;
                while(i < 3) {
                    try {
                        return this.callServer(api, params, this.nacosDomain);
                    } catch (Exception var13) {
                        exception = var13;
                        LogUtils.NAMING_LOGGER.error("[NA] req api:" + api + " failed, server(" + this.nacosDomain, var13);
                        ++i;
                    }
                }
                throw new IllegalStateException("failed to req API:/api/" + api + " after all servers(" + servers + ") tried: " + ((Exception)exception).getMessage());
            }
        }
    }

1.5 以 Open API 方式发送注册请求

  • 最终将以 Open API 方式发送如下请求给 Nacos 服务器:
  • POST 'http://127.0.0.1:8848/nacos/v1/ns/instance?servicelame=nacos.naming.serviceName&ip=10.200.9.143&port=18082'

reqAPI() debug

1.6 小结

  • Nacos 在先加载配置文件初始化服务,使用 ApplicationListener(事件监听器) 监听机制监听该服务初始化事件,当服务初始化完成后,进入 NacosServiceRegistry.register() 注册逻辑;
  • 注册原理分两步:

    • 先使用 NacosNamingService(Nacos 命名服务) 发送心跳;
    • 再使用 NamingProxy(命名代理),以 POST 请求方式发送服务实例给 Nacos 服务器;
  • 即 Nacos Service 必须要确保注册的服务实例是健康的(心跳检测),才能进行服务注册;


2. Nacos 服务器注册服务(服务器视角)

  • Nacos 服务器的源码下载、启动详情请见《微服务架构 | 3.2 Alibaba Nacos 注册中心》
  • 上述提到客户端注册服务器的最后一步是向服务器发送如下 Open API 请求:
  • POST 'http://127.0.0.1:8848/nacos/v1/ns/instance?servicelame=nacos.naming.serviceName&ip=10.200.9.143&port=18082'
  • 那么服务器的源码分析将从这个请求开始;

2.1 服务器接收请求 InstanceController.register()

  • 服务器在 nacos-naming 模块下 InstanceController(服务实例控制器) 里定义了一个 register 方法用来处理客户端的注册,源码如下:
@RestController
@RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance")
public class InstanceController {

    //省略其他代码

    @CanDistro
    @PostMapping
    @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
    public String register(HttpServletRequest request) throws Exception {
        //从请求体里解析出 namespaceId 命名空间(本例中是 public)
        final String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
        //解析 serviceName 服务名(本例中是 DEFAULT_GROUP@@service-provider,实际就是service-provider)
        final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
        NamingUtils.checkServiceNameFormat(serviceName);
        
        final Instance instance = parseInstance(request);
        //【断点步入】在服务器控制台注册服务实例的方法
        serviceManager.registerInstance(namespaceId, serviceName, instance);
        return "ok";
    }

        //省略其他代码
}

解析命名空间和服务名

2.2 在服务器控制台注册服务实例 ServiceManager.registerInstance()

  • ServiceManager.registerInstance() 方法的源码如下:
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
    //【断点步入 2.3】创建空服务
    createEmptyService(namespaceId, serviceName, instance.isEphemeral());
    
    Service service = getService(namespaceId, serviceName);
    
    if (service == null) {
        throw new NacosException(NacosException.INVALID_PARAM,
                "service not found, namespace: " + namespaceId + ", service: " + serviceName);
    }
    //添加服务实例
    addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}
  • 这段代码主要做了三个逻辑的内容:

    • createEmptyService():创建空服务(用于在 Nacos 服务器控制台的“服务列表”中展示服务信息),实际上是初始化一个 serviceMap,它是一个 ConcurrentHashMap 集合;
    • getService():从 serviceMap 中根据 namespaceld 和 serviceName 得到一个服务对象;
    • addInstance():把当前注册的服务实例保存到 Service 中;

2.3 创建空服务 ServiceManager.createEmptyService()

  • 我们一直追进去,发现最后调用的是 ServiceManager.createServiceIfAbsent() 方法,源码如下:
public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster) throws NacosException {
    //通过命名空间 ID 和服务名从缓存中获取 service 服务,第一次是没有的,进入 if 语句创建 service
    Service service = getService(namespaceId, serviceName);
    if (service == null) {
        
        Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
        service = new Service();
        service.setName(serviceName);
        service.setNamespaceId(namespaceId);
        service.setGroupName(NamingUtils.getGroupName(serviceName));
        // now validate the service. if failed, exception will be thrown
        service.setLastModifiedMillis(System.currentTimeMillis());
        service.recalculateChecksum();
        if (cluster != null) {
            cluster.setService(service);
            service.getClusterMap().put(cluster.getName(), cluster);
        }
        //判断服务是否有效,不生效将抛出异常
        service.validate();
        //【断点步入】添加与初始化服务
        putServiceAndInit(service);
        if (!local) {
            addOrReplaceService(service);
        }
    }
}
  • 第一次注册服务时缓存中肯定是没有的,我们进入到 if 语句里的 ServiceManager.putServiceAndInit() 方法;
private void putServiceAndInit(Service service) throws NacosException {
    //【断点步入 2.3.1 】将服务添加到缓存
    putService(service);
    service = getService(service.getNamespaceId(), service.getName());
    //【断点步入 2.3.2 】建立心跳机制
    service.init();
    //【断点步入 2.3.3 】实现数据一致性的监听,将服务数据进行同步
    consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
    consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
    Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
}

2.3.1 将服务添加到缓存 Service.putService()

public void putService(Service service) {
    if (!serviceMap.containsKey(service.getNamespaceId())) {
        synchronized (putServiceLock) {
            if (!serviceMap.containsKey(service.getNamespaceId())) {
                serviceMap.put(service.getNamespaceId(), new ConcurrentSkipListMap<>());
            }
        }
    }
    //将 Service 保存到 serviceMap 中
    serviceMap.get(service.getNamespaceId()).putIfAbsent(service.getName(), service);
}

2.3.2 建立心跳机制 Service.init()

public void init() {
    //【断点步入】定时检查心跳
    HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
    for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
        entry.getValue().setService(this);
        entry.getValue().init();
    }
}
  • 进入 HealthCheckReactor.scheduleCheck() 方法;
public static void scheduleCheck(ClientBeatCheckTask task) {
    futureMap.computeIfAbsent(task.taskKey(),
            k -> GlobalExecutor.scheduleNamingHealth(task, 5000, 5000, TimeUnit.MILLISECONDS));
}

创建定时任务检查心跳

  • 其主要是通过创建定时任务不断检测当前服务下所有实例最后发送心跳包的时间。如果超时,则设置healthy为false表示服务不健康,并且发送服务变更事件;
  • 心跳检测机制可以用下图概述:

心跳检测机制

2.3.3 实现数据一致性的监听 DelegateConsistencyServiceImpl.listen()

@Override
public void listen(String key, RecordListener listener) throws NacosException {
    
    // 这个键 key 会被两个服务监听
    if (KeyBuilder.SERVICE_META_KEY_PREFIX.equals(key)) {
        //持久一致性服务
        persistentConsistencyService.listen(key, listener);
        //短暂一致性服务
        ephemeralConsistencyService.listen(key, listener);
        return;
    }
    
    mapConsistencyService(key).listen(key, listener);
}

2.4 小结

  • Nacos 客户端通过 Open API 的形式发送服务注册请求,服务端收到请求后,做以下三件事:

    • 构建一个 Service 对象保存到 ConcurrentHashMap 集合中;
    • 使用定时任务对当前服务下的所有实例建立心跳检测机制;
    • 基于数据一致性协议将服务数据进行同步;
  • 至此,基于 Nacos 客户端和服务端的服务注册原理解析可以告一段落了,下面是一些补充内容;


3. 客户端查询所有服务实例

  • 在进行 RPC 调用时,根据对服务的需求不同可以将 Nacos 客户端分为两种角色,分别是服务消费者和服务提供者;
  • 服务消费者需要使用提供者的服务,就先要向 Nacos 服务器获取所有已注册的服务提供者实例,第 3 点将讨论客户端(消费者)如何获取这些实例(提供者);

3.1 消费者客户端向 Nacos 发出请求

  • 与客户端向 Nacos 服务器注册服务一样,消费者客户端想要查询提供者的所有实例,也要向 Nacos 服务器发送请求;
  • 向 Nacos 服务器发送请求也是由两种形式,Open API 和 SDK。其中 SDK 最终也是以 Open API 方式发送的:

    • Open APIGET 127.0.0.1:8848/nacos/v1/ns/instance/list?serviceName=service-provider
    • SDKList<Instance> selectInstances(String serviceName, boolean healthy)throws NacosException;
  • 我们使用 postman 发送 GET 127.0.0.1:8848/nacos/v1/ns/instance/list?serviceName=service-provider 请求,来模拟消费者客户端获取提供者实例;

发送 postman 请求所有提供者服务实例

  • 这里主要多发送几次请求,因为 Nacos 服务器可能缓存了所有提供者实例信息,可能直接从缓存中拿而不是处理请求;

3.2 Nacos 服务器处理请求 InstanceController.list()

  • Nacos 服务器处理获取所有服务实例的控制器还是 nacos-naming 模块下的 InstanceController(服务实例控制器)
  • InstanceController(服务实例控制器) 提供了一个 InstanceController.list() 接口处理该请求:

Nacos 服务器处理获取服务实例的请求

  • 可以看到我们的断点也被捕获;
  • 源码如下:
@GetMapping("/list")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.READ)
public ObjectNode list(HttpServletRequest request) throws Exception {
    
    //解析命名空间 ID
    String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    //解析服务名
    String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    NamingUtils.checkServiceNameFormat(serviceName);
    
    //解析获取一堆信息
    String agent = WebUtils.getUserAgent(request);
    String clusters = WebUtils.optional(request, "clusters", StringUtils.EMPTY);
    String clientIP = WebUtils.optional(request, "clientIP", StringUtils.EMPTY);
    int udpPort = Integer.parseInt(WebUtils.optional(request, "udpPort", "0"));
    String env = WebUtils.optional(request, "env", StringUtils.EMPTY);
    boolean isCheck = Boolean.parseBoolean(WebUtils.optional(request, "isCheck", "false"));
    
    String app = WebUtils.optional(request, "app", StringUtils.EMPTY);
    String tenant = WebUtils.optional(request, "tid", StringUtils.EMPTY);
    boolean healthyOnly = Boolean.parseBoolean(WebUtils.optional(request, "healthyOnly", "false"));
    
    //【断点步入 3.2】获取所有服务的所有信息
    return doSrvIpxt(namespaceId, serviceName, agent, clusters, clientIP, udpPort, env, isCheck, app, tenant,
            healthyOnly);
}

3.2 获取所有服务的所有信息 InstanceController.doSrvIpxt()

  • InstanceController.doSrvIpxt() 方法很长,笔者删去一些非重点源码方便理解:
public ObjectNode doSrvIpxt(String namespaceId, String serviceName, String agent, String clusters, String clientIP,int udpPort, String env, boolean isCheck, String app, String tid, boolean healthyOnly) throws Exception {

    //省略一些非核心代码
    
    ClientInfo clientInfo = new ClientInfo(agent);
    ObjectNode result = JacksonUtils.createEmptyJsonNode();
    //根据 namespaceld、serviceName 获得 Service 实例
    Service service = serviceManager.getService(namespaceId, serviceName);
    List<Instance> srvedIPs;
    
    //获取指定服务下的所有实例,从Service实例中基于srvIPs得到所有服务提供者的实例信息
    srvedIPs = service.srvIPs(Arrays.asList(StringUtils.split(clusters, ",")));
    
    //使用选择器过滤一些 ip
    if (service.getSelector() != null && StringUtils.isNotBlank(clientIP)) {
        srvedIPs = service.getSelector().select(clientIP, srvedIPs);
    }
    
    Map<Boolean, List<Instance>> ipMap = new HashMap<>(2);
    ipMap.put(Boolean.TRUE, new ArrayList<>());
    ipMap.put(Boolean.FALSE, new ArrayList<>());
    
    //过滤不健康实例
    for (Instance ip : srvedIPs) {
        ipMap.get(ip.isHealthy()).add(ip);
    }
    
    //遍历完成 JSON 字符串的封装
    ArrayNode hosts = JacksonUtils.createEmptyArrayNode();
    for (Map.Entry<Boolean, List<Instance>> entry : ipMap.entrySet()) {
        List<Instance> ips = entry.getValue();
        if (healthyOnly && !entry.getKey()) {
            continue;
        }
        for (Instance instance : ips) {
            //删除不可用的实例
            if (!instance.isEnabled()) {
                continue;
            }
            ObjectNode ipObj = JacksonUtils.createEmptyJsonNode();
            ipObj.put("ip", instance.getIp());
            ipObj.put("port", instance.getPort());
            ipObj.put("valid", entry.getKey());
            ipObj.put("healthy", entry.getKey());
            ipObj.put("marked", instance.isMarked());
            ipObj.put("instanceId", instance.getInstanceId());
            ipObj.set("metadata", JacksonUtils.transferToJsonNode(instance.getMetadata()));
            ipObj.put("enabled", instance.isEnabled());
            ipObj.put("weight", instance.getWeight());
            ipObj.put("clusterName", instance.getClusterName());
            if (clientInfo.type == ClientInfo.ClientType.JAVA && clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
                ipObj.put("serviceName", instance.getServiceName());
            } else {
                ipObj.put("serviceName", NamingUtils.getServiceName(instance.getServiceName()));
            }
            ipObj.put("ephemeral", instance.isEphemeral());
            hosts.add(ipObj);
        }
    }
    result.put("name", serviceName);
    result.put("cacheMillis", cacheMillis);
    result.put("lastRefTime", System.currentTimeMillis());
    result.put("checksum", service.getChecksum());
    result.put("useSpecifiedURL", false);
    result.put("clusters", clusters);
    result.put("env", env);
    result.replace("metadata", JacksonUtils.transferToJsonNode(service.getMetadata()));
    return result;
}
  • 该方法的主要逻辑主要有三步:

    • 首先获取所有服务实例;
    • 过滤一些服务实例;
    • 遍历完成 JSON 字符串的封装并返回;


4. 客户端监听 Nacos 服务器以动态获取服务实例

  • 消费者客户端角度出发监听 Nacos 服务器,以动态获知提供者的变化;

4.1 客户端发送请求

  • 客户端服务有两种调用方式:

    • void subscribe(String serviceName, EventListener listener) throws NacosException
    • public List<Instance> selectInstances(String serviceName, List<String> clusters, boolean healthy, boolean subscribe),其中 subscribe 置为 true;

4.2 服务动态感知的原理

  • Nacos 客户端有一个 HostReactor 类,它的功能是实现服务的动态更新;
  • 客户端发起事件订阅后,在 HostReactor 中有一个 UpdateTask 线程,每 10s 发送一次 Pull 请求,获得服务端最新的地址列表;
  • 对于服务端,它和服务提供者的实例之间维持了心跳检测,一旦服务提供者出现异常,则会发送一个 Push 消息给 Nacos 客户端,也就是服务消费者;
  • 服务消费者收到请求之后,使用 HostReactor 中提供的 processServiceJSON 解析消息,并更新本地服务地址列表;
  • 原理可以用下图总结:

服务动态感知的原理

4.3 Nacos 服务器处理请求

  • 服务器与本篇第 3 点相同,其中服务器的逻辑在《3.2 Nacos 服务器处理请求》;


5. 补充内容

5.1 Dubbo 的自动装配

  • Spring Cloud Alibaba Dubbo 集成 Nacos 时,服务的注册是依托 Dubbo 中的自动装配机制完成的;
  • spring-cloud-alibaba-dubbo 下的 META-INF/spring.factories 文件中自动装配了一个和服务注册相关的配置类 DubboServiceRegistrationNonWebApplicationAutoConfiguration(Dubbo 注册自动配置类)

Dubbo 的自动配置文件

@Configuration
@ConditionalOnNotWebApplication
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
@AutoConfigureAfter(DubboServiceRegistrationAutoConfiguration.class)
@Aspect
public class DubboServiceRegistrationNonWebApplicationAutoConfiguration {

    @Autowired
    private ServiceRegistry serviceRegistry; //实现类为 NacosServiceRegistry
    ...

    //监听 ApplicationStartedEvent 事件,该事件在刷新上下文之后,调用 application 命令前触发;
    @EventListener(ApplicationStartedEvent.class)
    public void onApplicationStarted() {
        setServerPort();
        //最终调用 NacosServiceRegistry.registry 方法实现服务注册
        register();
    }
    private void register() {
        if (registered) {
            return;
        }
        //这里即 NacosServiceRegistry.registry()
        serviceRegistry.register(registration);
        registered = true;
    }
    ...
}
  • NacosServiceRegistry.registry() 的原理详情请见本篇《1.4 注册服务实例的逻辑 NacosServiceRegistry.register()》;
  • 由此可以得出结论

    • Dubbo 集成 Nacos 时,服务的注册是依赖 Dubbo 的自动装配机制;
    • 而 Dubbo 的自动装配机制是依靠 NacosServiceRegistry.register() 实现的;


6. 源码结构图总结

源码结构图大致与本篇目录一致;

6.1 客户端视角下的服务注册结构图

  • AbstractAutoServiceRegistration.bind():监听服务初始化事件;

    • NacosServiceRegistry.register():注册服务实例;

      • NacosNamingService.registerInstance():通过Nacos 命名服务注册服务实例;

        • BeatReactor.addBeatInfo():心跳机制;

          • ScheduledThreadPoolExecutor.schedule():执行定时任务,发送心跳包 beatInfo;
          • MetricsMonitor.getDom2BeatSizeMonitor():启动心跳测量监视器;
        • NamingProxy.registerService():以 Open API 方式发送注册请求;

          • NamingProxy.reqAPI():拼凑注册服务的 API;

            • NamingProxy.callServer():呼叫服务,发送请求;

6.2 服务器视角下的服务注册结构图

  • InstanceController.register():服务器接收请求;

    • ServiceManager.registerInstance():在服务器控制台注册服务实例;

      • ServiceManager.createEmptyService():创建空服务;

        • ServiceManager.createServiceIfAbsent():如果空缺就创建服务;

          • ServiceManager.putServiceAndInit():初始化服务;

            • Service.putService():将服务添加到缓存;
            • Service.init():建立心跳机制;
            • DelegateConsistencyServiceImpl.listen():实现数据一致性的监听;
      • ServiceManager.addInstance():添加服务实例;

        • KeyBuilder.buildInstanceListKey():创建服务是一致性键;
        • DelegateConsistencyServiceImpl.put():将键和服务实例放 ConsistencyService 进行一致性维护;

6.3 客户端查询所有服务实例结构图

  • InstanceController.list():服务器接收请求;

    • InstanceController.doSrvIpxt():获取所有服务的所有信息;

      • ServiceManager.getService():根据 namespaceld、serviceName 获得 service 实例;
      • Service.srvIPs():获取指定 service 服务下的所有实例;
      • JacksonUtils.createEmptyArrayNode():遍历完成 JSON 字符串的封装;


相关文章
|
23小时前
|
Kubernetes API 开发者
构建高效微服务架构:后端开发的新范式
【5月更文挑战第2天】 随着现代软件开发的演进,传统的单体应用已难以满足快速变化的业务需求和敏捷开发的挑战。本文探讨了如何通过构建高效的微服务架构来提升后端开发的灵活性、可维护性和扩展性。我们将深入分析微服务的核心组件,包括服务拆分、容器化、API网关和持续集成/持续部署(CI/CD)等关键技术,并讨论它们如何共同作用以支持复杂的业务场景和云原生应用的需求。
6 1
|
23小时前
|
负载均衡 Java API
构建高效微服务架构:API网关与服务熔断策略
【5月更文挑战第2天】 在微服务架构中,确保系统的高可用性与灵活性是至关重要的。本文将深入探讨如何通过实施有效的API网关和设计合理的服务熔断机制来提升分布式系统的鲁棒性。我们将分析API网关的核心职责,包括请求路由、负载均衡、认证授权以及限流控制,并讨论如何利用熔断器模式防止故障传播,维护系统的整体稳定性。文章还将介绍一些实用的技术和工具,如Netflix Zuul、Spring Cloud Gateway以及Hystrix,以帮助开发者构建一个可靠且高效的微服务环境。
|
2天前
|
监控 安全 开发者
构建高效可靠的微服务架构:后端开发的新范式
【4月更文挑战第30天】随着现代软件开发的复杂性日益增加,传统的单体应用架构已难以满足快速迭代与灵活部署的需求。微服务架构作为一种新兴的设计理念,它通过将一个大型应用程序拆分成一系列小而专注的服务来提供解决方案。本文旨在探讨如何构建一个高效且可靠的微服务架构系统,涵盖从设计原则、技术选型到部署实践的全方位知识,为后端开发者提供一种全新的开发思路和实践指导。
|
2天前
|
Java 调度 开发者
构建高效微服务架构:后端开发的新趋势深入理解操作系统之进程调度策略
【4月更文挑战第30天】 随着企业数字化转型的不断深入,传统的单体应用逐渐不能满足快速迭代和灵活部署的需求。微服务架构以其高度模块化、独立部署和易于扩展的特性,成为现代后端开发的重要趋势。本文将探讨如何构建一个高效的微服务架构,包括关键的设计原则、技术选型以及可能面临的挑战。
|
2天前
|
Cloud Native Devops 持续交付
构建未来:云原生架构在企业数字化转型中的关键作用构建高效微服务架构:后端开发的新范式
【4月更文挑战第30天】 随着企业加速其数字化进程,云原生架构已成为支撑复杂、可伸缩和灵活应用的骨干。本文探讨了云原生技术的崛起,重点分析了其在促进业务敏捷性、提高运营效率及推动创新方面的核心价值。通过深入剖析云原生生态系统的关键技术组件,如容器化、微服务、持续集成/持续部署(CI/CD)和DevOps实践,揭示了企业如何利用这些技术来构建和维护高度可用且动态的IT环境。文章还提出了一个多维度的采纳框架,帮助企业评估和实施云原生解决方案,以实现真正的业务价值。 【4月更文挑战第30天】在现代软件开发的快速演变中,微服务架构已经成为一种领先的设计模式,用于构建可扩展、灵活且容错的应用程序。与传
|
2天前
|
消息中间件 监控 负载均衡
构建高效微服务架构:后端开发的新范式
【4月更文挑战第30天】 在现代软件开发的浪潮中,微服务架构已成为一种广泛采用的设计模式。它通过将大型应用程序拆分成一组小型、松散耦合的服务来增强系统的可维护性、可扩展性和敏捷性。本文将探讨如何构建一个高效的微服务架构,包括关键的设计原则、技术选型、以及实现过程中的最佳实践。我们将深入讨论微服务间的通信机制、数据一致性问题、服务发现与负载均衡策略,以及如何确保系统的安全性和监控。
|
2天前
|
运维 监控 数据可视化
探索微服务架构下的系统监控策略
【4月更文挑战第30天】 在当今快速迭代和持续部署盛行的软件发展环境中,微服务架构以其灵活性、可扩展性成为众多企业的首选。然而,随着服务的细分与增多,传统的监控手段已不足以应对复杂多变的系统状态。本文将深入探讨在微服务架构中实施有效系统监控的策略,包括指标的选择、数据的收集与处理,以及监控信息的可视化等方面。通过分析现有问题,并提出切实可行的解决方案,旨在帮助开发者构建更健壮、更易于管理的微服务系统。
|
2天前
|
机器学习/深度学习 安全 网络安全
数字堡垒的构筑者:网络安全与信息安全的深层剖析构建高效微服务架构:后端开发的新趋势
【4月更文挑战第30天】在信息技术高速发展的今天,构建坚不可摧的数字堡垒已成为个人、企业乃至国家安全的重要组成部分。本文深入探讨网络安全漏洞的本质、加密技术的进展以及提升安全意识的必要性,旨在为读者提供全面的网络安全与信息安全知识框架。通过对网络攻防技术的解析和案例研究,我们揭示了防御策略的关键点,并强调了持续教育在塑造安全文化中的作用。
|
2天前
|
缓存 监控 API
构建高效微服务架构:后端开发的新范式
【4月更文挑战第30天】 随着现代软件开发的演进,传统的单体应用逐渐向微服务架构转变。本文将深入探讨微服务的核心概念、优势以及在设计高效后端系统时所面临的挑战。通过实例分析与最佳实践的结合,我们将揭示如何优化微服务的性能,保证系统的可扩展性、可维护性和安全性。
|
2天前
|
存储 运维 负载均衡
探索微服务架构下的服务治理
【4月更文挑战第30天】 在当今软件开发领域,微服务架构已经成为了解决复杂系统问题的重要技术手段。随着微服务的广泛应用,如何有效管理与治理这些分散的服务成为了开发和维护的关键。本文将探讨在微服务架构下,实现高效服务治理的策略与实践,重点分析服务发现、配置管理、负载均衡和故障处理等核心要素,旨在为读者提供一套系统的服务治理思路。