Nacos 服务端注册、客户端服务发现源码分析(二)

简介: Nacos 服务端注册、客户端服务发现源码分析(二)

Nacos 服务端服务注册源码分析

通过上节分析:Nacos 服务注册概述及客户端注册实例源码分析(一)

清晰地了解:客户端在注册服务时实际上是调用的 NamingService#registerInstance 方法来完成实例的注册,而且在最后知道 Nacos 2.0 以前的版本通过 Http 方式进行服务注册就是调用的接口:/nacos/v1/ns/intance

下图摘自官网的架构图

在这里就能分析出我们要找的接口就在 NamingService 这个服务中,从源码角度来看,其实就是存放在 naming 这个子模块上的,而且该模块实际上就是实现服务注册的

服务端调用接口

从「naming」模块中找到 controller,所有的接口都是在其中的,从这些 Controller 中可以很明显的看到一个 InstanceController,很明显它和注册实例的逻辑相关.

InstanceController 类上的 @RequestMapping 注解值就是我们访问的注册接口,前面提到过的官网 OpenAPI 调用开放接口实现注册

/v1/ns/instance
UtilsAndCommons.NACOS_NAMING_CONTEXT + UtilsAndCommons.NACOS_NAMING_INSTANCE_CONTEXT

下面查看 RESTFUL API 接口 POST 请求类型的方法 register,在这个方法中实际上就是接受用户请求,把收到的信息进行解析,还原成 Instance,然后调用 registerInstance 方法完成注册,这个方法才是服务端注册的核心,源码如下:

@CanDistro
@PostMapping
@Secured(action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {
  final String namespaceId = WebUtils
    .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
  final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
  NamingUtils.checkServiceNameFormat(serviceName);
  final Instance instance = HttpRequestInstanceBuilder.newBuilder()
    .setDefaultInstanceEphemeral(switchDomain.isDefaultInstanceEphemeral()).setRequest(request).build();
  // 区分不同协议获取到操作类
  getInstanceOperator().registerInstance(namespaceId, serviceName, instance);
  NotifyCenter.publishEvent(new RegisterInstanceTraceEvent(System.currentTimeMillis(), "",
                                                           false, namespaceId, NamingUtils.getGroupName(serviceName), NamingUtils.getServiceName(serviceName),
                                                           instance.getIp(), instance.getPort()));
  return "ok";
}

主要注意一下这个方法:getInstanceOperator().registerInstance(namespaceId, serviceName, instance)

其中的 getInstanceOperator():判断是否采用 Grpc 协议,很明显这个位置走的是 instanceServiceV2

private InstanceOperator getInstanceOperator() {
  return upgradeJudgement.isUseGrpcFeatures() ? instanceServiceV2 : instanceServiceV1;
}

服务注册

instanceServiceV2#registerInstance

实际上 instanceServiceV2 就是 InstanceOperatorClientImpl,所以我们来看这里的 registerInstance 方法

public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
  // 判断是否为瞬时对象(临时客户端)
  boolean ephemeral = instance.isEphemeral();
  // 获取客户端 ID
  String clientId = IpPortBasedClient.getClientId(instance.toInetAddr(), ephemeral);
  // 通过客户端 ID 创建客户端连接
  createIpPortClientIfAbsent(clientId);
  // 获取服务
  Service service = getService(namespaceId, serviceName, ephemeral);
  // 具体注册服务
  clientOperationService.registerInstance(service, instance, clientId);
}

在这里我们要分析一些细节,在 Nacos2.0 以后新增 Client 模型,一个客户端 gRPC 长连接对应一个 client,每个 client 有自己唯一的 id(clientId),Client 负责管理一个客户端的服务注册实例 Publish 和服务订阅 Subscribe,我们可以看一下这个模型其实就是一个接口

public interface Client {
    // 客户端ID/gRPC connectionId
    String getClientId();
    // 是否为临时客户端
    boolean isEphemeral();
    // 客户端更新时间
    void setLastUpdatedTime();
    long getLastUpdatedTime();
    // 服务实例注册/注销/查询
    boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo);
    InstancePublishInfo removeServiceInstance(Service service);
    InstancePublishInfo getInstancePublishInfo(Service service);
    Collection<Service> getAllPublishedService();
    // 服务订阅/取消订阅/查询订阅
    boolean addServiceSubscriber(Service service, Subscriber subscriber);
    boolean removeServiceSubscriber(Service service);
    Subscriber getSubscriber(Service service);
    Collection<Service> getAllSubscribeService();
    // 生成同步给其他节点 client 数据
    ClientSyncData generateSyncData();
    // 是否过期
    boolean isExpire(long currentTime);
    // 释放资源
    void release();
}

EphemeralClientOperationServiceImpl#registerInstance

EphemeralClientOperationServiceImpl:实际负责处理服务注册,看具体的源码

public void registerInstance(Service service, Instance instance, String clientId) throws NacosException {
  NamingUtils.checkInstanceIsLegal(instance);
  // 确保 Service 单例存在
  Service singleton = ServiceManager.getInstance().getSingleton(service);
  if (!singleton.isEphemeral()) {
    throw new NacosRuntimeException(NacosException.INVALID_PARAM,
                                    String.format("Current service %s is persistent service, can't register ephemeral instance.",
                                                  singleton.getGroupedServiceName()));
  }
  // 通过 clientId 找到对应的客户端
  Client client = clientManager.getClient(clientId);
  if (!clientIsLegal(client, clientId)) {
    return;
  }
  // 客户端 Instance 模型,转换为服务端 Instance 模型
  InstancePublishInfo instanceInfo = getPublishInfo(instance);
  // 将 Instance 存储到 Client 里
  client.addServiceInstance(singleton, instanceInfo);
  client.setLastUpdatedTime();
  // 建立 Service 与 ClientId 关系
  NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));
  NotifyCenter
    .publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false));
}

ServiceManager

Service 的容器是 ServiceManager,但是在 com.alibaba.nacos.naming.core.v2 包下,容器中 的 Service 都是单例

public class ServiceManager {
    private static final ServiceManager INSTANCE = new ServiceManager();
    // 单例 Service,可以查看 Service equals、hashCode 方法
    private final ConcurrentHashMap<Service, Service> singletonRepository;
  // namespace 下所有的 service   
    private final ConcurrentHashMap<String, Set<Service>> namespaceSingletonMaps;
}

从这个位置可以看出,当调用这个注册方法时 ServiceManager 负责管理 Service 单例

public Service getSingleton(Service service) {
  // 通过 ConcurrentHashMap 来存储单例的 Service
  singletonRepository.putIfAbsent(service, service);
  Service result = singletonRepository.get(service);
  namespaceSingletonMaps.computeIfAbsent(result.getNamespace(), (namespace) -> new ConcurrentHashSet<>());
  namespaceSingletonMaps.get(result.getNamespace()).add(result);
  return result;
}

ClientManager

它是一个接口,看它对应的实现类 ConnectionBasedClientManager,这个实现类负责管理长连接 clientId 与 Client 模型的映射关系

// 通过 clientId 查询 Client
public Client getClient(String clientId) {
  return clients.get(clientId);
}

Client 具体实例 AbstractClient

负责存储当前客户端的服务注册表,即 Service 与 Instance 关系;注意:对于单个客户端来说,同一个服务只能注册一个实例

public boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo) {
  if (null == publishers.put(service, instancePublishInfo)) {
    if (instancePublishInfo instanceof BatchInstancePublishInfo) {
      MetricsMonitor.incrementIpCountWithBatchRegister(instancePublishInfo);
    } else {
      MetricsMonitor.incrementInstanceCount();
    }
  }
  NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(this));
  Loggers.SRV_LOG.info("Client change for service {}, {}", service, getClientId());
  return true;
}

ClientOperationEvent.ClientRegisterServiceEvent

当前这里的目的是为了过滤目标服务得到最终的 Instance 列表建立 Service 与 Client 关系,建立 Service 与 Client 关系就是为了加速查询

发布 ClientRegisterServiceEvent 事件,ClientServiceIndexesManager 负责监听,ClientServiceIndexesManager 维护了两个索引:

  • Service 与发布 clientId
  • Service 与订阅 clientId
private void handleClientOperation(ClientOperationEvent event) {
  Service service = event.getService();
  String clientId = event.getClientId();
  if (event instanceof ClientOperationEvent.ClientRegisterServiceEvent) {
    addPublisherIndexes(service, clientId);
  } else if (event instanceof ClientOperationEvent.ClientDeregisterServiceEvent) {
    removePublisherIndexes(service, clientId);
  } else if (event instanceof ClientOperationEvent.ClientSubscribeServiceEvent) {
    addSubscriberIndexes(service, clientId);
  } else if (event instanceof ClientOperationEvent.ClientUnsubscribeServiceEvent) {
    removeSubscriberIndexes(service, clientId);
  }
}
// 建立 Service 与发布 clientId 关系
private void addPublisherIndexes(Service service, String clientId) {
  publisherIndexes.computeIfAbsent(service, (key) -> new ConcurrentHashSet<>());
  publisherIndexes.get(service).add(clientId);
  NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, true));
}

这个索引关系建立好以后,还会触发 ServiceChangedEvent,代表服务注册表变更;对于注册表变更,后面还要做两件事情「1、通知订阅客户端;2、Nacos 集群数据同步」这部分源码剖析后续文章分享出来

Nacos 客户端服务发现源码分析

Nacos 客户端的服务发现,其实就是封装参数、调用服务接口、获得返回实例列表

Nacos 客户端—>NamingService—>封装参数—>Nacos Server

如果细化这个流程,会发现不仅包括了通过 NamingService 获取服务列表,在获取服务列表的过程中还涉及到通信流程协议(Http、gRPC)、订阅流程、故障转移流程等,下面再详细的捋一捋

总体流程

入口「NamingTest」可以看到

public class NamingTest {
    @Test
    public void testServiceList() throws Exception {
    ......
        NamingService namingService = NacosFactory.createNamingService(properties);
        namingService.registerInstance("nacos.test.1", instance);
        ThreadUtils.sleep(5000L);
        List<Instance> list = namingService.getAllInstances("nacos.test.1");
        System.out.println(list);
    }
}

在这里我们主要关注 getAllIntances 方法,那么就需要看一下这个方法的具体操作,当然这其中需要经过一系列的重载方法调用

其实这里的方法比入口多出了几个参数,这里不仅有服务名称,还有分组名、集群列表、是否订阅,重载方法中的其他参数已经在各种重载方法的调用过程中设置了默认值,比如:

  • 分组名称默认:DEFAULT_GROUP
  • 集群列表:默认为空数组
  • 是否订阅:订阅
@Override
public List<Instance> getAllInstances(String serviceName, String groupName, List<String> clusters,
                                      boolean subscribe) throws NacosException {
    ServiceInfo serviceInfo;
    String clusterString = StringUtils.join(clusters, ",");
    // 是否是订阅模式
    if (subscribe) {
        // 先从客户端缓存获取服务信息
        serviceInfo = serviceInfoHolder.getServiceInfo(serviceName,  , clusterString);
        if (null == serviceInfo) {
            // 如果本地缓存不存在服务信息,则进行订阅
            serviceInfo = clientProxy.subscribe(serviceName, groupName, clusterString);
        }
    } else {
        // 如果未订阅服务信息,则直接从服务器进行查询
        serviceInfo = clientProxy.queryInstancesOfService(serviceName, groupName, clusterString, 0, false);
    }
    // 从服务信息中获取实例列表
    List<Instance> list;
    if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {
        return new ArrayList<Instance>();
    }
    return list;
}

此方法的流程图

流程基本逻辑:

  • 如果是订阅模式,则直接从本地缓存获取服务信息(ServiceInfo),然后从其中获取到了实例列表信息,这是因为订阅机制会自动同步服务器实例的变化到本地,如果本地缓存中没有,那说明是首次调用,则进行订阅,在订阅完成后会获取服务信息
  • 如果是非订阅模式,那就直接请求服务器端,获取服务信息

订阅处理流程

在刚才的流程中,涉及到了订阅逻辑,入口代码为获取实例列表中的方法

serviceInfo = clientProxy.subscribe(serviceName, groupName, clusterString);

以下是具体的分析,首先这里的 clientProxy 是 NamingClientProxy 类对象,对应的实现类 NamingClientProxyDelegate,对应 subscribe 实现如下:

public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {
  NAMING_LOGGER.info("[SUBSCRIBE-SERVICE] service:{}, group:{}, clusters:{} ", serviceName, groupName, clusters);
  String serviceNameWithGroup = NamingUtils.getGroupedName(serviceName, groupName);
  String serviceKey = ServiceInfo.getKey(serviceNameWithGroup, clusters);
  // 定时调度 UpdateTask
  serviceInfoUpdateService.scheduleUpdateIfAbsent(serviceName, groupName, clusters);
  // 获取缓存中的 ServiceInfo
  ServiceInfo result = serviceInfoHolder.getServiceInfoMap().get(serviceKey);
  if (null == result || !isSubscribed(serviceName, groupName, clusters)) {
    // 如果为 null,则进行订阅逻辑处理,基于 gRPC 协议
    result = grpcClientProxy.subscribe(serviceName, groupName, clusters);
  }
  // ServiceInfo 本地缓存处理
  serviceInfoHolder.processServiceInfo(result);
  return result;
}

在这段代码中,可以看到在获取服务实例列表时(特别是首次)也进行了订阅逻辑的扩展,基本流程图如下:

  • 订阅方法时,首先开启定时任务,这个定时任务的主要作用:用来定时同步服务端的实例信息,并进行本地缓存更新等操作,但是如果是首次的话将会直接返回来走下一步
  • 判断本地缓存是否存在,如果本地缓存存在 ServiceInfo 信息,则直接返回;如果不存在,则默认采用 gRPC 协议进行订阅,并返回 ServiceInfo
  • grpcClientProxy#subscribe 订阅方法就是直接向服务器发送了一个订阅请求,并返回结果
  • 最后,ServiceInfo 本地缓存处理;这里会将获得的最新 ServiceInfo 与本地内存中的 ServiceInfo 进行比较、更新、发布变更时间,磁盘文件存储等操作;其实,这一步的操作,在订阅定时任务中也进行了处理

结尾

至此「Nacos 服务端注册、客户端服务发现源码分析」分析到这里,基本只需要掌握大致的脉路即可

欢迎大家在评论框分享您的看法,喜欢该文章帮忙给个赞👍和收藏,感谢!!

分享个人学习源码的几部曲

  • 设计模式掌握为前提,程序员的内功修炼法,🙅不分语言
  • 不要太追究于细节,捋清大致脉路即可;太过于追究于细节,你会越捋越乱
  • 关注重要的类和方法、核心逻辑
  • 掌握 Debug 技巧,在关键的类和方法多停留,多作分析和记录

更多技术文章可以查看:vnjohn 个人博客


目录
相关文章
|
14天前
|
SpringCloudAlibaba 负载均衡 Java
【微服务 SpringCloudAlibaba】实用篇 · Nacos注册中心
【微服务 SpringCloudAlibaba】实用篇 · Nacos注册中心
35 3
|
14天前
|
安全 Linux Nacos
如何使用公网地址远程访问内网Nacos UI界面查看注册服务
如何使用公网地址远程访问内网Nacos UI界面查看注册服务
38 0
|
2天前
|
存储 Nacos 数据安全/隐私保护
【SpringCloud】Nacos的安装、Nacos注册、Nacos服务多级存储模型
【SpringCloud】Nacos的安装、Nacos注册、Nacos服务多级存储模型
16 1
|
14天前
|
负载均衡 Cloud Native Java
Nacos 注册中心(2023旧笔记)
Nacos 注册中心(2023旧笔记)
21 0
|
14天前
|
Dubbo Java 应用服务中间件
深度剖析:Dubbo使用Nacos注册中心的坑
2020年笔者在做微服务部件升级时,Dubbo的注册中心从Zookeeper切换到Nacos碰到个问题,最近刷Github又有网友提到类似的问题,就在这篇文章里做个梳理和总结。
深度剖析:Dubbo使用Nacos注册中心的坑
|
14天前
|
SpringCloudAlibaba Java Nacos
SpringCloud Alibaba微服务 -- Nacos使用以及注册中心和配置中心的应用(保姆级)
SpringCloud Alibaba微服务 -- Nacos使用以及注册中心和配置中心的应用(保姆级)
|
14天前
|
Dubbo Java 应用服务中间件
双活工作下的数据迁移:Nacos注册中心实战解析
这篇内容介绍了如何使用NacosSync组件进行双活项目中的注册中心数据迁移。首先,准备包括64位OS、JDK 1.8+、Maven 3.2+和MySQL 5.6+的环境。接着,获取并解压NacosSync安装包,配置数据库连接,启动服务,并通过访问特定URL检查系统状态。然后,通过NacosSync控制台进行集群配置,添加Zookeeper和Nacos集群,并设置同步任务。当数据同步完成后,Dubbo客户端(Consumer和Provider)更新配置以连接Nacos注册中心。最后,迁移完成后,原有的Zookeeper集群可下线,整个过程确保了服务的平滑迁移。
51 1
|
14天前
|
XML Dubbo Java
【Dubbo3高级特性】「框架与服务」 Nacos作为注册中心-服务分组及服务分组聚合实现
【Dubbo3高级特性】「框架与服务」 Nacos作为注册中心-服务分组及服务分组聚合实现
67 0
|
14天前
|
运维 Kubernetes Nacos
nacos常见问题之服务注册IP白名单如何解决
Nacos是阿里云开源的服务发现和配置管理平台,用于构建动态微服务应用架构;本汇总针对Nacos在实际应用中用户常遇到的问题进行了归纳和解答,旨在帮助开发者和运维人员高效解决使用Nacos时的各类疑难杂症。
|
14天前
|
安全 前端开发 Nacos
nacos常见问题之配置注册的白名单如何解决
Nacos是阿里云开源的服务发现和配置管理平台,用于构建动态微服务应用架构;本汇总针对Nacos在实际应用中用户常遇到的问题进行了归纳和解答,旨在帮助开发者和运维人员高效解决使用Nacos时的各类疑难杂症。
164 0

热门文章

最新文章