Nacos 客户端/服务端同步集群数据源码分析(五)

简介: Nacos 客户端/服务端同步集群数据源码分析(五)

2023 年农年第一篇文章,学习无止境,分享技术博客无比乐趣,记录自己的学习生涯!!!

Nacos 同步集群数据

当我们有服务进行注册以后,会写入注册信息同时会触发 ClientChangedEvent 事件,通过这个事件,就开始进行了 Nacos 集群数据的同步,当然这其中只有一个 Nacos 节点来处理对应的客户端请求

在整个处理过程中,涉及到一个负责节点非负责节点

负责节点

首先可以看到的是处理这个事件的 DistroClientDataProcessor「客户端数据一致性处理器」类型,这个类型会处理当前节点负责的 client,接下来看看该类下的 syncToAllServer 方法

// DistroClientDataProcessor#onEvent->syncToAllServer
private void syncToAllServer(ClientEvent event) {
  Client client = event.getClient();
  // 只有临时数据通过 Distro 同步,持久化数据应该通过 raft(分布式一致式协议)进行同步
  // Only ephemeral data sync by Distro, persist client should sync by raft.
  // 判断客户端是否为空、是否是临时实例、是否为负责节点
  if (null == client || !client.isEphemeral() || !clientManager.isResponsibleClient(client)) {
    return;
  }
  if (event instanceof ClientEvent.ClientDisconnectEvent) {
    DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
    distroProtocol.sync(distroKey, DataOperation.DELETE);
  } else if (event instanceof ClientEvent.ClientChangedEvent) {
    // 客户端新增/更改
    DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
    distroProtocol.sync(distroKey, DataOperation.CHANGE);
  }
}
public void sync(DistroKey distroKey, DataOperation action, long delay) {
  // 遍历当前集群下所有节点,除了当前所在节点
  for (Member each : memberManager.allMembersWithoutSelf()) {
    syncToTarget(distroKey, action, each.getAddress(), delay);
  }
}

DistroProtocol 类会循环遍历其他的 Nacos 节点,提交一个异步任务,这个异步任务会延迟 1s 后进行执行,在这里我们可以看到「客户端断开」和「客户端新增/修改」;对于 Delete 操作,由 DistroSyncDeleteTask 处理;对于 Change 操作,由 DistroSyncChangeTask 处理,先从 DistroSyncChangeTask 这个异步任务抽象子类介绍:

public class DistroSyncChangeTask extends AbstractDistroExecuteTask {
    private static final DataOperation OPERATION = DataOperation.CHANGE;
    public DistroSyncChangeTask(DistroKey distroKey, DistroComponentHolder distroComponentHolder) {
        super(distroKey, distroComponentHolder);
    }
    @Override
    protected DataOperation getDataOperation() {
        return OPERATION;
    }
    // 无回调
    @Override
    protected boolean doExecute() {
        String type = getDistroKey().getResourceType();
        DistroData distroData = getDistroData(type);
        if (null == distroData) {
            Loggers.DISTRO.warn("[DISTRO] {} with null data to sync, skip", toString());
            return true;
        }
        return getDistroComponentHolder().findTransportAgent(type)
                .syncData(distroData, getDistroKey().getTargetServer());
    }
    // 有回调
    @Override
    protected void doExecuteWithCallback(DistroCallback callback) {
        String type = getDistroKey().getResourceType();
        DistroData distroData = getDistroData(type);
        if (null == distroData) {
            Loggers.DISTRO.warn("[DISTRO] {} with null data to sync, skip", toString());
            return;
        }
        getDistroComponentHolder().findTransportAgent(type)
                .syncData(distroData, getDistroKey().getTargetServer(), callback);
    }
    @Override
    public String toString() {
        return "DistroSyncChangeTask for " + getDistroKey().toString();
    }
    // 从 DistroClientDataProcessor 获取 DistroData
    private DistroData getDistroData(String type) {
        DistroData result = getDistroComponentHolder().findDataStorage(type).getDistroData(getDistroKey());
        if (null != result) {
            result.setType(OPERATION);
        }
        return result;
    }
}

获取到的 DistroData,其实是来自于从 ClientManager 实时获取到的 Client

// DistroClientDataProcessor.java
@Override
public DistroData getDistroData(DistroKey distroKey) {
  Client client = clientManager.getClient(distroKey.getResourceKey());
  if (null == client) {
    return null;
  }
  // 把生成的同步数据放入到数组中进行返回
  byte[] data = ApplicationUtils.getBean(Serializer.class).serialize(client.generateSyncData());
  return new DistroData(distroKey, data);
}
// AbstractClient.java
@Override
public ClientSyncData generateSyncData() {
  List<String> namespaces = new LinkedList<>();
  List<String> groupNames = new LinkedList<>();
  List<String> serviceNames = new LinkedList<>();
  List<String> batchNamespaces = new LinkedList<>();
  List<String> batchGroupNames = new LinkedList<>();
  List<String> batchServiceNames = new LinkedList<>();
  List<InstancePublishInfo> instances = new LinkedList<>();
  List<BatchInstancePublishInfo> batchInstancePublishInfos = new LinkedList<>();
  BatchInstanceData  batchInstanceData = new BatchInstanceData();
  for (Map.Entry<Service, InstancePublishInfo> entry : publishers.entrySet()) {
    InstancePublishInfo instancePublishInfo = entry.getValue();
    if (instancePublishInfo instanceof BatchInstancePublishInfo) {
      BatchInstancePublishInfo batchInstance = (BatchInstancePublishInfo) instancePublishInfo;
      batchInstancePublishInfos.add(batchInstance);
      buildBatchInstanceData(batchInstanceData, batchNamespaces, batchGroupNames, batchServiceNames, entry);
      batchInstanceData.setBatchInstancePublishInfos(batchInstancePublishInfos);
    } else {
      namespaces.add(entry.getKey().getNamespace());
      groupNames.add(entry.getKey().getGroup());
      serviceNames.add(entry.getKey().getName());
      instances.add(entry.getValue());
    }
  }
  return new ClientSyncData(getClientId(), namespaces, groupNames, serviceNames, instances, batchInstanceData);
}

AbstractClient 继承了 Client,同时给 DistroClientDataProcessor 提供了 Client 的注册信息,包括客户端注册了哪些 namespace、group、service、instance

回过头来看 DistroSyncChangeTask#doExecute 下调用的 syncData 方法

// DistroClientTransportAgent.java
@Override
public boolean syncData(DistroData data, String targetServer) {
  if (isNoExistTarget(targetServer)) {
    return true;
  }
  DistroDataRequest request = new DistroDataRequest(data, data.getType());
  Member member = memberManager.find(targetServer);
  if (checkTargetServerStatusUnhealthy(member)) {
    Loggers.DISTRO
      .warn("[DISTRO] Cancel distro sync caused by target server {} unhealthy, key: {}", targetServer,
            data.getDistroKey());
    return false;
  }
  try {
    Response response = clusterRpcClientProxy.sendRequest(member, request);
    return checkResponse(response);
  } catch (NacosException e) {
    Loggers.DISTRO.error("[DISTRO-FAILED] Sync distro data failed! key: {}", data.getDistroKey(), e);
  }
  return false;
}

这个方法实际上是由 DistroClientTransportAgent 封装为 DistroDataRequest 调用其他的 Nacos 节点

非负责节点

// DistroClientDataProcessor.java
@Override
public boolean processData(DistroData distroData) {
  switch (distroData.getType()) {
    case ADD:
    case CHANGE:
      ClientSyncData clientSyncData = ApplicationUtils.getBean(Serializer.class)
        .deserialize(distroData.getContent(), ClientSyncData.class);
      handlerClientSyncData(clientSyncData);
      return true;
    case DELETE:
      String deleteClientId = distroData.getDistroKey().getResourceKey();
      Loggers.DISTRO.info("[Client-Delete] Received distro client sync data {}", deleteClientId);
      clientManager.clientDisconnected(deleteClientId);
      return true;
    default:
      return false;
  }
}

当负责节点将数据发送给非负责节点以后,将要处理发送过来的 Client 数据,这里我们要看 DistroClientDataProcessor#handlerClientSyncData 方法

// DistroClientDataProcessor.java
private void handlerClientSyncData(ClientSyncData clientSyncData) {
  Loggers.DISTRO.info("[Client-Add] Received distro client sync data {}", clientSyncData.getClientId());
  clientManager.syncClientConnected(clientSyncData.getClientId(), clientSyncData.getAttributes());
  Client client = clientManager.getClient(clientSyncData.getClientId());
  upgradeClient(client, clientSyncData);
}

handlerClientSyncData->upgradeClient:查看具体处理方法

private void upgradeClient(Client client, ClientSyncData clientSyncData) {
  Set<Service> syncedService = new HashSet<>();
  // 处理批次实例同步的逻辑
  processBatchInstanceDistroData(syncedService, client, clientSyncData);
  List<String> namespaces = clientSyncData.getNamespaces();
  List<String> groupNames = clientSyncData.getGroupNames();
  List<String> serviceNames = clientSyncData.getServiceNames();
  List<InstancePublishInfo> instances = clientSyncData.getInstancePublishInfos();
  for (int i = 0; i < namespaces.size(); i++) {
    Service service = Service.newService(namespaces.get(i), groupNames.get(i), serviceNames.get(i));
    Service singleton = ServiceManager.getInstance().getSingleton(service);
    syncedService.add(singleton);
    InstancePublishInfo instancePublishInfo = instances.get(i);
    if (!instancePublishInfo.equals(client.getInstancePublishInfo(singleton))) {
      client.addServiceInstance(singleton, instancePublishInfo);
      NotifyCenter.publishEvent(
        new ClientOperationEvent.ClientRegisterServiceEvent(singleton, client.getClientId()));
    }
  }
  for (Service each : client.getAllPublishedService()) {
    if (!syncedService.contains(each)) {
      client.removeServiceInstance(each);
      NotifyCenter.publishEvent(
        new ClientOperationEvent.ClientDeregisterServiceEvent(each, client.getClientId()));
    }
  }
}

DistroClientDataProcessor#upgradeClient 方法:先更新 Client 里的注册表信息,再发布对应的事件 ClientRegisterServiceEvent

注意: 这里要注意下此时 Client 实现类 ConnectionBasedClient,只不过它的 isNative 属性为 false,这是非负责节点和负责节点的主要区别

其实判断当前 Nacos 节点是否为负责节点的依据主要是靠这个 isNative 属性;如果是客户端直接注册在这个 Nacos 节点上的 ConnectionBasedClient,它的 isNative 属性为 true;如果是 Distro 协议,同步到这个 Nacos 节点上 ConnectionBasedClient,它的 isNative 属性为 false

2.x 版本以后使用了长连接,所以通过长连接建立在哪个节点上,哪个节点就是责任节点,客户端也只会向这个责任节点发送请求

Distro 协议负责集群数据统一

Distro 协议为了确保集群之间数据一致,不仅仅依赖于数据发送改变时的实时同步,后台有定时任务作数据同步

在 1.x 版本中,责任节点每 5s 同步所有 Service 下 Instance 列表的摘要(md5)给非责任节点,非责任节点用对端传过来的服务 md5 对比本地服务的 md5,如果发送了改变,需要反查责任节点

在 2.x 版本中,对这个流程进行改造,责任节点会发送 Client 全量数据,非责任节点会定时监测同步过来的 Client 是否过期,减少 1.x 版本中非责任节点的反查

// DistroProtocol#startDistroTask->startVerifyTask
private void startVerifyTask() {
  GlobalExecutor.schedulePartitionDataTimedSync(new DistroVerifyTimedTask(memberManager, distroComponentHolder,
                                                                          distroTaskEngineHolder.getExecuteWorkersManager()),DistroConfig.getInstance().getVerifyIntervalMillis());
  // DistroConfig.getInstance().getVerifyIntervalMillis():间隔 5s
}
// 每隔 5s 执行该任务
// DistroVerifyTimedTask.java
@Override
public void run() {
  try {
    // 所有其他节点,除了当前自身
    List<Member> targetServer = serverMemberManager.allMembersWithoutSelf();
    if (Loggers.DISTRO.isDebugEnabled()) {
      Loggers.DISTRO.debug("server list is: {}", targetServer);
    }
    for (String each : distroComponentHolder.getDataStorageTypes()) {
      // 遍历这些节点发送 Client#isNative=true 的 DistroData,type=VERIFY
      verifyForDataStorage(each, targetServer);
    }
  } catch (Exception e) {
    Loggers.DISTRO.error("[DISTRO-FAILED] verify task failed.", e);
  }
}

责任节点每 5s 会向其他节点发送 DataOperation=VERIFY 类型的 DistroData,来维持非责任节点的 Client 数据不过期

public ConnectionBasedClientManager() {
  // 每隔 5s 扫描 isNative=false 状态的 Client,
  GlobalExecutor.scheduleExpiredClientCleaner(new ExpiredClientCleaner(this), 0, Constants.DEFAULT_HEART_BEAT_INTERVAL,TimeUnit.MILLISECONDS);
}
// 每隔 5s 会执行当前这个任务
private static class ExpiredClientCleaner implements Runnable {
  private final ConnectionBasedClientManager clientManager;
  public ExpiredClientCleaner(ConnectionBasedClientManager clientManager) {
    this.clientManager = clientManager;
  }
  @Override
  public void run() {
    long currentTime = System.currentTimeMillis();
    for (String each : clientManager.allClientId()) {
      ConnectionBasedClient client = (ConnectionBasedClient) clientManager.getClient(each);
      if (null != client && client.isExpire(currentTime)) {
        clientManager.clientDisconnected(each);
      }
    }
  }
}
// ConnectionBaseClient.java
@Override
public boolean isExpire(long currentTime) {
  // ClientConfig.getInstance().getClientExpiredTime():5min
  // 该时间内没有更新,代表已过期
  return !isNative() && currentTime - getLastRenewTime() > ClientConfig.getInstance().getClientExpiredTime();
}

非责任节点每 5s 扫描 isNative=false 的 Client,如果 client 3 分钟内没有被 VERIFY 状态的 DistroData 数据更新过续期时间,会删除这个同步过来的 Client 数据

由客户端的更新时间来保证该实例下的数据是否已经过期,若过期,则移除掉该客户端

结尾

至此「Nacos 客户端/服务端同步集群数据源码分析」分析到这里,基本只需要掌握大致的脉路即可

欢迎大家在评论框分享您的看法,喜欢该文章帮忙给个赞👍和收藏,喜欢博客分享的文章内容帮忙给个粉丝位,感谢,感谢!!!

分享个人学习源码的几部曲

  • 设计模式掌握为前提,程序员的内功修炼法,🙅不分语言
  • 不要太追究于细节,捋清大致脉路即可;太过于追究于细节,你会越捋越乱
  • 关注重要的类和方法、核心逻辑
  • 掌握 Debug 技巧,在关键的类和方法多停留,多作分析和记录

更多技术文章可以查看:vnjohn 个人博客

祝贺大家兔年快乐,新的一年顶呱呱👍


目录
相关文章
|
7月前
|
缓存 前端开发 Java
nacos常见问题之开启鉴权后客户端报403升级版本如何解决
Nacos是阿里云开源的服务发现和配置管理平台,用于构建动态微服务应用架构;本汇总针对Nacos在实际应用中用户常遇到的问题进行了归纳和解答,旨在帮助开发者和运维人员高效解决使用Nacos时的各类疑难杂症。
1646 0
|
4月前
|
Java Nacos 开发工具
【Nacos】心跳断了怎么办?!8步排查法+实战代码,手把手教你解决Nacos客户端不发送心跳检测问题,让服务瞬间恢复活力!
【8月更文挑战第15天】Nacos是一款广受好评的微服务注册与配置中心。然而,“客户端不发送心跳检测”的问题时有发生,可能导致服务实例被视为离线。本文介绍如何排查此类问题:确认Nacos服务器地址配置正确;检查网络连通性;查看客户端日志;确保Nacos SDK版本兼容;调整心跳检测策略;验证服务实例注册状态;必要时重启应用;检查影响行为的环境变量。通过这些步骤,通常可定位并解决问题,保障服务稳定运行。
297 0
|
5月前
|
网络安全 Nacos
Nacos客户端配置错误检查
Nacos客户端配置错误检查
207 3
|
5月前
|
缓存 网络安全 Nacos
登录nacos客户端提示no message available
登录nacos客户端提示no message available
|
7月前
|
Java 测试技术 Nacos
|
7月前
|
缓存 关系型数据库 Nacos
nacos常见问题之服务端不开启鉴权日志一直报403如何解决
Nacos是阿里云开源的服务发现和配置管理平台,用于构建动态微服务应用架构;本汇总针对Nacos在实际应用中用户常遇到的问题进行了归纳和解答,旨在帮助开发者和运维人员高效解决使用Nacos时的各类疑难杂症。
697 2
|
15天前
|
JSON Java Nacos
SpringCloud 应用 Nacos 配置中心注解
在 Spring Cloud 应用中可以非常低成本地集成 Nacos 实现配置动态刷新,在应用程序代码中通过 Spring 官方的注解 @Value 和 @ConfigurationProperties,引用 Spring enviroment 上下文中的属性值,这种用法的最大优点是无代码层面侵入性,但也存在诸多限制,为了解决问题,提升应用接入 Nacos 配置中心的易用性,Spring Cloud Alibaba 发布一套全新的 Nacos 配置中心的注解。
|
1月前
|
负载均衡 应用服务中间件 Nacos
Nacos配置中心
Nacos配置中心
90 1
Nacos配置中心
|
1月前
|
Java 网络安全 Nacos
Nacos作为流行的微服务注册与配置中心,其稳定性与易用性广受好评
Nacos作为流行的微服务注册与配置中心,其稳定性与易用性广受好评。然而,“客户端不发送心跳检测”是使用中常见的问题之一。本文详细探讨了该问题的原因及解决方法,包括检查客户端配置、网络连接、日志、版本兼容性、心跳检测策略、服务实例注册状态、重启应用及环境变量等步骤,旨在帮助开发者快速定位并解决问题,确保服务正常运行。
45 5
|
1月前
|
监控 Java 测试技术
Nacos 配置中心变更利器:自定义标签灰度
本文是对 MSE Nacos 应用自定义标签灰度的功能介绍,欢迎大家升级版本进行试用。
150 10