First article of the 2023 Lunar New Year. Learning never ends; sharing technical blog posts is a true joy; this is a record of my learning journey!!!
Nacos Cluster Data Synchronization
When a service registers, its registration info is written and a ClientChangedEvent is triggered. This event kicks off data synchronization across the Nacos cluster. Note that only one Nacos node handles a given client's requests.
The whole process involves a responsible node and non-responsible nodes.
Responsible node
The first thing to look at is DistroClientDataProcessor (the client data consistency processor), which handles the event for the clients this node is responsible for. Let's look at its syncToAllServer method:
```java
// DistroClientDataProcessor#onEvent -> syncToAllServer
private void syncToAllServer(ClientEvent event) {
    Client client = event.getClient();
    // Only ephemeral data is synced via Distro; persistent data should be synced via Raft
    // (the distributed consensus protocol).
    // Check that the client is non-null and ephemeral, and that this node is responsible for it
    if (null == client || !client.isEphemeral() || !clientManager.isResponsibleClient(client)) {
        return;
    }
    if (event instanceof ClientEvent.ClientDisconnectEvent) {
        DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
        distroProtocol.sync(distroKey, DataOperation.DELETE);
    } else if (event instanceof ClientEvent.ClientChangedEvent) {
        // Client added/changed
        DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
        distroProtocol.sync(distroKey, DataOperation.CHANGE);
    }
}

// DistroProtocol.java
public void sync(DistroKey distroKey, DataOperation action, long delay) {
    // Iterate over every node in the cluster except the current one
    for (Member each : memberManager.allMembersWithoutSelf()) {
        syncToTarget(distroKey, action, each.getAddress(), delay);
    }
}
```
DistroProtocol iterates over the other Nacos nodes and submits an asynchronous task for each, which executes after a 1-second delay. Two cases are covered here: client disconnect and client add/change. DELETE operations are handled by DistroSyncDeleteTask, and CHANGE operations by DistroSyncChangeTask. Let's start with DistroSyncChangeTask, a concrete subclass of the abstract task class:
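The fan-out above (one delayed task per peer, skipping the local node) can be condensed into a minimal sketch. This is an illustration only, not Nacos's actual delay-task engine or RPC layer; the class name, method names, and the printed message are invented for the sketch:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the Distro fan-out: one delayed sync task per cluster peer.
public class DelayedSyncSketch {

    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    // Returns the peers a task was scheduled for, mirroring allMembersWithoutSelf()
    public List<String> syncToAll(List<String> peers, String self, String key, long delayMillis) {
        List<String> targets = new ArrayList<>();
        for (String peer : peers) {
            if (peer.equals(self)) {
                continue; // never sync to ourselves
            }
            targets.add(peer);
            scheduler.schedule(
                    // In Nacos this body would build a DistroDataRequest and send it over cluster RPC
                    () -> System.out.println("sync " + key + " -> " + peer),
                    delayMillis, TimeUnit.MILLISECONDS);
        }
        return targets;
    }

    public void shutdown() {
        scheduler.shutdown();
    }
}
```

The 1-second delay matters: rapid successive changes to the same client can be merged before the task runs, reducing sync traffic.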
```java
public class DistroSyncChangeTask extends AbstractDistroExecuteTask {

    private static final DataOperation OPERATION = DataOperation.CHANGE;

    public DistroSyncChangeTask(DistroKey distroKey, DistroComponentHolder distroComponentHolder) {
        super(distroKey, distroComponentHolder);
    }

    @Override
    protected DataOperation getDataOperation() {
        return OPERATION;
    }

    // Without callback
    @Override
    protected boolean doExecute() {
        String type = getDistroKey().getResourceType();
        DistroData distroData = getDistroData(type);
        if (null == distroData) {
            Loggers.DISTRO.warn("[DISTRO] {} with null data to sync, skip", toString());
            return true;
        }
        return getDistroComponentHolder().findTransportAgent(type)
                .syncData(distroData, getDistroKey().getTargetServer());
    }

    // With callback
    @Override
    protected void doExecuteWithCallback(DistroCallback callback) {
        String type = getDistroKey().getResourceType();
        DistroData distroData = getDistroData(type);
        if (null == distroData) {
            Loggers.DISTRO.warn("[DISTRO] {} with null data to sync, skip", toString());
            return;
        }
        getDistroComponentHolder().findTransportAgent(type)
                .syncData(distroData, getDistroKey().getTargetServer(), callback);
    }

    @Override
    public String toString() {
        return "DistroSyncChangeTask for " + getDistroKey().toString();
    }

    // Fetch DistroData from DistroClientDataProcessor
    private DistroData getDistroData(String type) {
        DistroData result = getDistroComponentHolder().findDataStorage(type).getDistroData(getDistroKey());
        if (null != result) {
            result.setType(OPERATION);
        }
        return result;
    }
}
```
The DistroData obtained here actually comes from the Client fetched in real time from the ClientManager:
```java
// DistroClientDataProcessor.java
@Override
public DistroData getDistroData(DistroKey distroKey) {
    Client client = clientManager.getClient(distroKey.getResourceKey());
    if (null == client) {
        return null;
    }
    // Serialize the generated sync data into a byte array and return it
    byte[] data = ApplicationUtils.getBean(Serializer.class).serialize(client.generateSyncData());
    return new DistroData(distroKey, data);
}
```
```java
// AbstractClient.java
@Override
public ClientSyncData generateSyncData() {
    List<String> namespaces = new LinkedList<>();
    List<String> groupNames = new LinkedList<>();
    List<String> serviceNames = new LinkedList<>();
    List<String> batchNamespaces = new LinkedList<>();
    List<String> batchGroupNames = new LinkedList<>();
    List<String> batchServiceNames = new LinkedList<>();
    List<InstancePublishInfo> instances = new LinkedList<>();
    List<BatchInstancePublishInfo> batchInstancePublishInfos = new LinkedList<>();
    BatchInstanceData batchInstanceData = new BatchInstanceData();
    for (Map.Entry<Service, InstancePublishInfo> entry : publishers.entrySet()) {
        InstancePublishInfo instancePublishInfo = entry.getValue();
        if (instancePublishInfo instanceof BatchInstancePublishInfo) {
            BatchInstancePublishInfo batchInstance = (BatchInstancePublishInfo) instancePublishInfo;
            batchInstancePublishInfos.add(batchInstance);
            buildBatchInstanceData(batchInstanceData, batchNamespaces, batchGroupNames, batchServiceNames, entry);
            batchInstanceData.setBatchInstancePublishInfos(batchInstancePublishInfos);
        } else {
            namespaces.add(entry.getKey().getNamespace());
            groupNames.add(entry.getKey().getGroup());
            serviceNames.add(entry.getKey().getName());
            instances.add(entry.getValue());
        }
    }
    return new ClientSyncData(getClientId(), namespaces, groupNames, serviceNames, instances, batchInstanceData);
}
```
AbstractClient implements the Client interface and supplies DistroClientDataProcessor with the client's registration info: which namespaces, groups, services, and instances the client has registered.
Now back to the syncData method invoked in DistroSyncChangeTask#doExecute:
```java
// DistroClientTransportAgent.java
@Override
public boolean syncData(DistroData data, String targetServer) {
    if (isNoExistTarget(targetServer)) {
        return true;
    }
    DistroDataRequest request = new DistroDataRequest(data, data.getType());
    Member member = memberManager.find(targetServer);
    if (checkTargetServerStatusUnhealthy(member)) {
        Loggers.DISTRO
                .warn("[DISTRO] Cancel distro sync caused by target server {} unhealthy, key: {}", targetServer,
                        data.getDistroKey());
        return false;
    }
    try {
        Response response = clusterRpcClientProxy.sendRequest(member, request);
        return checkResponse(response);
    } catch (NacosException e) {
        Loggers.DISTRO.error("[DISTRO-FAILED] Sync distro data failed! key: {}", data.getDistroKey(), e);
    }
    return false;
}
```
In this method, DistroClientTransportAgent wraps the data into a DistroDataRequest and calls the other Nacos nodes.
Non-responsible node
```java
// DistroClientDataProcessor.java
@Override
public boolean processData(DistroData distroData) {
    switch (distroData.getType()) {
        case ADD:
        case CHANGE:
            ClientSyncData clientSyncData = ApplicationUtils.getBean(Serializer.class)
                    .deserialize(distroData.getContent(), ClientSyncData.class);
            handlerClientSyncData(clientSyncData);
            return true;
        case DELETE:
            String deleteClientId = distroData.getDistroKey().getResourceKey();
            Loggers.DISTRO.info("[Client-Delete] Received distro client sync data {}", deleteClientId);
            clientManager.clientDisconnected(deleteClientId);
            return true;
        default:
            return false;
    }
}
```
After the responsible node sends its data, the non-responsible node has to process the incoming Client data. That happens in DistroClientDataProcessor#handlerClientSyncData:
```java
// DistroClientDataProcessor.java
private void handlerClientSyncData(ClientSyncData clientSyncData) {
    Loggers.DISTRO.info("[Client-Add] Received distro client sync data {}", clientSyncData.getClientId());
    clientManager.syncClientConnected(clientSyncData.getClientId(), clientSyncData.getAttributes());
    Client client = clientManager.getClient(clientSyncData.getClientId());
    upgradeClient(client, clientSyncData);
}
```
handlerClientSyncData -> upgradeClient contains the actual processing logic:
```java
// DistroClientDataProcessor.java
private void upgradeClient(Client client, ClientSyncData clientSyncData) {
    Set<Service> syncedService = new HashSet<>();
    // Handle the batch-instance sync logic
    processBatchInstanceDistroData(syncedService, client, clientSyncData);
    List<String> namespaces = clientSyncData.getNamespaces();
    List<String> groupNames = clientSyncData.getGroupNames();
    List<String> serviceNames = clientSyncData.getServiceNames();
    List<InstancePublishInfo> instances = clientSyncData.getInstancePublishInfos();
    for (int i = 0; i < namespaces.size(); i++) {
        Service service = Service.newService(namespaces.get(i), groupNames.get(i), serviceNames.get(i));
        Service singleton = ServiceManager.getInstance().getSingleton(service);
        syncedService.add(singleton);
        InstancePublishInfo instancePublishInfo = instances.get(i);
        if (!instancePublishInfo.equals(client.getInstancePublishInfo(singleton))) {
            client.addServiceInstance(singleton, instancePublishInfo);
            NotifyCenter.publishEvent(
                    new ClientOperationEvent.ClientRegisterServiceEvent(singleton, client.getClientId()));
        }
    }
    for (Service each : client.getAllPublishedService()) {
        if (!syncedService.contains(each)) {
            client.removeServiceInstance(each);
            NotifyCenter.publishEvent(
                    new ClientOperationEvent.ClientDeregisterServiceEvent(each, client.getClientId()));
        }
    }
}
```
DistroClientDataProcessor#upgradeClient first updates the registration info in the Client, then publishes the corresponding events: ClientRegisterServiceEvent for added or changed services, and ClientDeregisterServiceEvent for services no longer present.
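The reconciliation upgradeClient performs (add or update everything in the synced snapshot, then deregister anything local that the snapshot no longer mentions) can be sketched with plain maps. This is a hypothetical simplification for illustration, not the Nacos implementation; the types and names are invented:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the upgradeClient reconciliation: local state converges
// to the synced snapshot, and removed services are reported for deregistration.
public class ReconcileSketch {

    // Returns the services that should be deregistered locally, given the synced snapshot
    public static Set<String> reconcile(Map<String, String> local, Map<String, String> synced) {
        Set<String> toRemove = new HashSet<>();
        // Add or update everything present in the snapshot
        local.putAll(synced);
        // Anything local that the snapshot no longer mentions gets deregistered
        for (String service : new HashSet<>(local.keySet())) {
            if (!synced.containsKey(service)) {
                toRemove.add(service);
                local.remove(service);
            }
        }
        return toRemove;
    }
}
```

This two-phase shape (apply additions/updates, then sweep for removals) is what makes each sync idempotent: replaying the same snapshot leaves the local state unchanged.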
Note: the Client implementation here is still ConnectionBasedClient, but its isNative attribute is false. This is the key difference between non-responsible and responsible nodes.
In fact, whether the current Nacos node is the responsible node for a client is determined mainly by this isNative attribute: a ConnectionBasedClient registered directly on this node by the client has isNative = true, while one synced to this node via the Distro protocol has isNative = false.
Since version 2.x uses long connections, whichever node a client's long connection is established on becomes that client's responsible node, and the client only sends requests to that node.
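A minimal sketch of that isNative distinction follows. The class and method names are illustrative stand-ins, not the actual Nacos types:

```java
// Hypothetical sketch: a client created from a direct long connection is "native"
// (this node is responsible for it); a client created by Distro sync from a peer is not.
public class NativeClientSketch {

    public static class SketchClient {
        final String clientId;
        final boolean isNative; // true: registered here directly; false: synced via Distro

        public SketchClient(String clientId, boolean isNative) {
            this.clientId = clientId;
            this.isNative = isNative;
        }
    }

    // A node is responsible only for clients whose long connection landed on it
    public static boolean isResponsible(SketchClient client) {
        return client.isNative;
    }
}
```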
How the Distro protocol keeps cluster data consistent
To ensure data consistency across the cluster, the Distro protocol does not rely solely on real-time sync when data changes; background scheduled tasks also synchronize data.
In version 1.x, every 5 s the responsible node sends a digest (MD5) of the instance list under each Service to the non-responsible nodes. A non-responsible node compares the received MD5 against its local one for each service and, if something has changed, queries the responsible node back for the full data.
In version 2.x this flow was reworked: the responsible node sends full Client data, and non-responsible nodes periodically check whether the synced Clients have expired, eliminating most of the back-queries that 1.x required.
```java
// DistroProtocol#startDistroTask -> startVerifyTask
private void startVerifyTask() {
    // DistroConfig.getInstance().getVerifyIntervalMillis(): 5 s interval
    GlobalExecutor.schedulePartitionDataTimedSync(
            new DistroVerifyTimedTask(memberManager, distroComponentHolder,
                    distroTaskEngineHolder.getExecuteWorkersManager()),
            DistroConfig.getInstance().getVerifyIntervalMillis());
}

// DistroVerifyTimedTask.java, executed every 5 s
@Override
public void run() {
    try {
        // All other nodes, excluding the current one
        List<Member> targetServer = serverMemberManager.allMembersWithoutSelf();
        if (Loggers.DISTRO.isDebugEnabled()) {
            Loggers.DISTRO.debug("server list is: {}", targetServer);
        }
        for (String each : distroComponentHolder.getDataStorageTypes()) {
            // For each node, send DistroData of Clients with isNative=true, type=VERIFY
            verifyForDataStorage(each, targetServer);
        }
    } catch (Exception e) {
        Loggers.DISTRO.error("[DISTRO-FAILED] verify task failed.", e);
    }
}
```
Every 5 s, the responsible node sends DistroData with DataOperation = VERIFY to the other nodes, keeping the Client data on non-responsible nodes from expiring.
```java
// ConnectionBasedClientManager.java
public ConnectionBasedClientManager() {
    // Every 5 s, scan Clients with isNative=false
    GlobalExecutor.scheduleExpiredClientCleaner(new ExpiredClientCleaner(this), 0,
            Constants.DEFAULT_HEART_BEAT_INTERVAL, TimeUnit.MILLISECONDS);
}

// This task runs every 5 s
private static class ExpiredClientCleaner implements Runnable {

    private final ConnectionBasedClientManager clientManager;

    public ExpiredClientCleaner(ConnectionBasedClientManager clientManager) {
        this.clientManager = clientManager;
    }

    @Override
    public void run() {
        long currentTime = System.currentTimeMillis();
        for (String each : clientManager.allClientId()) {
            ConnectionBasedClient client = (ConnectionBasedClient) clientManager.getClient(each);
            if (null != client && client.isExpire(currentTime)) {
                clientManager.clientDisconnected(each);
            }
        }
    }
}

// ConnectionBasedClient.java
@Override
public boolean isExpire(long currentTime) {
    // ClientConfig.getInstance().getClientExpiredTime(): expiry window (3 min by default);
    // no renewal within this window means the client has expired
    return !isNative() && currentTime - getLastRenewTime() > ClientConfig.getInstance().getClientExpiredTime();
}
```
Every 5 s, non-responsible nodes scan Clients with isNative = false. If a client's renew time has not been refreshed by VERIFY DistroData within the expiry window (3 minutes by default), the synced Client data is removed.
In other words, the client's last-renew time decides whether its data has expired; once it has, the client is removed.
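The expiry rule above can be condensed into a small runnable sketch. It is a simplified stand-in for ConnectionBasedClient#isExpire, not the real class; the 3-minute window mirrors the text and the names are assumptions:

```java
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: a synced (non-native) client expires when no VERIFY data has
// renewed it within the expiry window; a native client never expires this way.
public class ClientExpirySketch {

    static final long EXPIRE_MILLIS = TimeUnit.MINUTES.toMillis(3);

    // Mirrors the shape of ConnectionBasedClient#isExpire
    public static boolean isExpired(boolean isNative, long lastRenewTime, long currentTime) {
        return !isNative && currentTime - lastRenewTime > EXPIRE_MILLIS;
    }

    // A received VERIFY simply refreshes the renew time
    public static long renew(long currentTime) {
        return currentTime;
    }
}
```

Together with the 5 s VERIFY schedule, this gives the synced data dozens of renewal chances before the window elapses, so a few lost VERIFY messages do not cause spurious removals.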
Closing
That wraps up this source-code analysis of "Nacos client/server cluster data synchronization"; grasping the overall flow is all you really need.
Feel free to share your thoughts in the comments. If you enjoyed this article, a like 👍, a bookmark, and a follow would be much appreciated. Thank you!!!
A few steps I follow when reading source code:
- Master design patterns first; they are a programmer's internal training, regardless of language
- Don't get bogged down in details; trace the overall flow. Chasing every detail will only tangle you up
- Focus on the important classes and methods and the core logic
- Master debugging; linger on the key classes and methods, and keep analyzing and taking notes
More technical articles are available on vnjohn's personal blog.
Happy Year of the Rabbit, and may the new year treat you well 👍