Nacos Server-Side Health Check
Long Connections
Concept: a long (persistent) connection is one over which multiple data packets can be sent consecutively. While the connection is held open, if no data packets are being exchanged, both sides need to send link-detection (keepalive) packets to keep it alive.
Since version 2.0, the registry client uses gRPC instead of HTTP and establishes a long connection with the server, while still retaining support for legacy HTTP clients.
The NamingClientProxy interface is responsible for the underlying communication with the server and has three implementations:
- NamingClientProxyDelegate: a delegate that proxies all methods of NacosNamingService and chooses either HTTP or gRPC to call the server depending on the situation
- NamingGrpcClientProxy: underlying communication based on a gRPC long connection
- NamingHttpClientProxy: underlying communication based on short-lived HTTP connections; this is essentially the old, largely unchanged code, renamed from the 1.x NamingProxy
Taking client-side service registration as an example, NamingClientProxyDelegate proxies the registerService method:
```java
// NacosNamingService.java
private NamingClientProxy clientProxy; // NamingClientProxyDelegate

public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
    NamingUtils.checkInstanceIsLegal(instance);
    clientProxy.registerService(serviceName, groupName, instance);
}
```
NamingClientProxyDelegate checks whether the instance is an ephemeral node and chooses a different protocol accordingly:
- Ephemeral instance: gRPC
- Persistent instance: HTTP
```java
public class NamingClientProxyDelegate implements NamingClientProxy {

    private final NamingHttpClientProxy httpClientProxy;

    private final NamingGrpcClientProxy grpcClientProxy;

    public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
        getExecuteClientProxy(instance).registerService(serviceName, groupName, instance);
    }

    // Ephemeral node: gRPC long connection; persistent node: HTTP short connection
    private NamingClientProxy getExecuteClientProxy(Instance instance) {
        return instance.isEphemeral() ? grpcClientProxy : httpClientProxy;
    }
    ...
}
```
Health Check
Ephemeral instances are stored in memory via the Distro protocol; persistent instances are persisted via the Raft protocol.
In the earlier 1.x versions, ephemeral instances were stored in memory via the Distro protocol, and clients sent heartbeats to the registry to maintain their healthy status; persistent instances were persisted via the Raft protocol, and the server established TCP connections to clients to perform health checks. In 2.0, persistent instances are handled essentially the same as before, but ephemeral instances no longer rely on heartbeats: whether the long connection is alive now determines whether the instance is healthy.
Every 3 seconds, the server checks all clients that have not communicated for more than 20 seconds and sends each of them a ClientDetectionRequest probe. If the client responds successfully within 1 second, the check passes; otherwise unregister is executed to remove the connection. (This is where the server actively probes whether the client is still alive.)
If the client keeps communicating with the server, the server does not need to probe it actively.
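Before diving into the full server-side source below, the following is a minimal sketch that distills only the timing logic described above (a 3-second scheduled scan, a 20-second idle threshold, a 1-second probe timeout); the Connection interface here is a simplified placeholder, not the real Nacos class, and the real implementation probes asynchronously rather than blocking per connection.

```java
import java.util.Map;
import java.util.concurrent.*;

// Simplified sketch: scan connections every 3s, probe those idle for 20s+,
// and drop any that do not answer within 1s. Placeholder types, not Nacos classes.
class ConnectionHealthCheckSketch {

    interface Connection {
        long lastActiveTime();                  // last time any request was seen on this connection
        CompletableFuture<Boolean> probe();     // send a detection request, complete with success flag
        void close();
    }

    static final long KEEP_ALIVE_MILLIS = 20_000L;  // idle threshold before probing
    static final long PROBE_TIMEOUT_MILLIS = 1_000L;

    final Map<String, Connection> connections = new ConcurrentHashMap<>();
    final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    void start() {
        scheduler.scheduleWithFixedDelay(() -> {
            long now = System.currentTimeMillis();
            connections.forEach((id, conn) -> {
                if (now - conn.lastActiveTime() < KEEP_ALIVE_MILLIS) {
                    return; // the client talked recently, no probe needed
                }
                try {
                    // Probe the idle client; unregister it if no success within 1s.
                    if (!conn.probe().get(PROBE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                        unregister(id);
                    }
                } catch (Exception timeoutOrError) {
                    unregister(id);
                }
            });
        }, 1_000L, 3_000L, TimeUnit.MILLISECONDS);
    }

    void unregister(String id) {
        Connection removed = connections.remove(id);
        if (removed != null) {
            removed.close();
        }
    }
}
```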
```java
// ConnectionManager.java
Map<String, Connection> connections = new ConcurrentHashMap<>();

// Annotated with @PostConstruct: this method runs after the bean is instantiated, before initialization completes.
@PostConstruct
public void start() {
    // Start the task that expels unhealthy connections
    RpcScheduledExecutor.COMMON_SERVER_EXECUTOR.scheduleWithFixedDelay(() -> {
        try {
            int totalCount = connections.size();
            Loggers.REMOTE_DIGEST.info("Connection check task start");
            MetricsMonitor.getLongConnectionMonitor().set(totalCount);
            // Collect connections that have been idle too long (20s)
            Set<Map.Entry<String, Connection>> entries = connections.entrySet();
            // Count connections whose source is the SDK client
            int currentSdkClientCount = currentSdkClientCount();
            boolean isLoaderClient = loadClient >= 0;
            int currentMaxClient = isLoaderClient ? loadClient : connectionLimitRule.countLimit;
            int expelCount = currentMaxClient < 0 ? 0 : Math.max(currentSdkClientCount - currentMaxClient, 0);
            Loggers.REMOTE_DIGEST
                    .info("Total count ={}, sdkCount={},clusterCount={}, currentLimit={}, toExpelCount={}",
                            totalCount, currentSdkClientCount, (totalCount - currentSdkClientCount),
                            currentMaxClient + (isLoaderClient ? "(loaderCount)" : ""), expelCount);
            List<String> expelClient = new LinkedList<>();
            Map<String, AtomicInteger> expelForIp = new HashMap<>(16);
            // 1. Calculate how many connections to expel per IP
            for (Map.Entry<String, Connection> entry : entries) {
                Connection client = entry.getValue();
                String appName = client.getMetaInfo().getAppName();
                String clientIp = client.getMetaInfo().getClientIp();
                if (client.getMetaInfo().isSdkSource() && !expelForIp.containsKey(clientIp)) {
                    // Get the connection limit for this IP
                    int countLimitOfIp = connectionLimitRule.getCountLimitOfIp(clientIp);
                    if (countLimitOfIp < 0) {
                        int countLimitOfApp = connectionLimitRule.getCountLimitOfApp(appName);
                        countLimitOfIp = countLimitOfApp < 0 ? countLimitOfIp : countLimitOfApp;
                    }
                    if (countLimitOfIp < 0) {
                        countLimitOfIp = connectionLimitRule.getCountLimitPerClientIpDefault();
                    }
                    if (countLimitOfIp >= 0 && connectionForClientIp.containsKey(clientIp)) {
                        AtomicInteger currentCountIp = connectionForClientIp.get(clientIp);
                        if (currentCountIp != null && currentCountIp.get() > countLimitOfIp) {
                            expelForIp.put(clientIp, new AtomicInteger(currentCountIp.get() - countLimitOfIp));
                        }
                    }
                }
            }
            Loggers.REMOTE_DIGEST
                    .info("Check over limit for ip limit rule, over limit ip count={}", expelForIp.size());
            if (expelForIp.size() > 0) {
                Loggers.REMOTE_DIGEST.info("Over limit ip expel info, {}", expelForIp);
            }
            Set<String> outDatedConnections = new HashSet<>();
            long now = System.currentTimeMillis();
            // 2. Collect connections to expel for the per-IP limit, and connections that are outdated
            for (Map.Entry<String, Connection> entry : entries) {
                Connection client = entry.getValue();
                String clientIp = client.getMetaInfo().getClientIp();
                AtomicInteger integer = expelForIp.get(clientIp);
                if (integer != null && integer.intValue() > 0) {
                    integer.decrementAndGet();
                    expelClient.add(client.getMetaInfo().getConnectionId());
                    expelCount--;
                    // If the client's last active time was more than 20s ago (KEEP_ALIVE_TIME = 20000L),
                    // the connection is considered outdated
                } else if (now - client.getMetaInfo().getLastActiveTime() >= KEEP_ALIVE_TIME) {
                    outDatedConnections.add(client.getMetaInfo().getConnectionId());
                }
            }
            // 3. If the total count still exceeds the limit, check again
            if (expelCount > 0) {
                for (Map.Entry<String, Connection> entry : entries) {
                    Connection client = entry.getValue();
                    if (!expelForIp.containsKey(client.getMetaInfo().clientIp) && client.getMetaInfo()
                            .isSdkSource() && expelCount > 0) {
                        expelClient.add(client.getMetaInfo().getConnectionId());
                        expelCount--;
                        outDatedConnections.remove(client.getMetaInfo().getConnectionId());
                    }
                }
            }
            String serverIp = null;
            String serverPort = null;
            if (StringUtils.isNotBlank(redirectAddress) && redirectAddress.contains(Constants.COLON)) {
                String[] split = redirectAddress.split(Constants.COLON);
                serverIp = split[0];
                serverPort = split[1];
            }
            for (String expelledClientId : expelClient) {
                try {
                    // Get the connection of the client to be expelled
                    Connection connection = getConnection(expelledClientId);
                    if (connection != null) {
                        // Build the reset request and send it asynchronously
                        ConnectResetRequest connectResetRequest = new ConnectResetRequest();
                        connectResetRequest.setServerIp(serverIp);
                        connectResetRequest.setServerPort(serverPort);
                        connection.asyncRequest(connectResetRequest, null);
                        Loggers.REMOTE_DIGEST
                                .info("Send connection reset request , connection id = {},recommendServerIp={}, recommendServerPort={}",
                                        expelledClientId, connectResetRequest.getServerIp(),
                                        connectResetRequest.getServerPort());
                    }
                    // If the connection is already closed, unregister it
                } catch (ConnectionAlreadyClosedException e) {
                    unregister(expelledClientId);
                } catch (Exception e) {
                    Loggers.REMOTE_DIGEST.error("Error occurs when expel connection, expelledClientId:{}",
                            expelledClientId, e);
                }
            }
            // 4. Actively probe the outdated clients
            Loggers.REMOTE_DIGEST.info("Out dated connection ,size={}", outDatedConnections.size());
            if (CollectionUtils.isNotEmpty(outDatedConnections)) {
                Set<String> successConnections = new HashSet<>();
                final CountDownLatch latch = new CountDownLatch(outDatedConnections.size());
                for (String outDateConnectionId : outDatedConnections) {
                    try {
                        Connection connection = getConnection(outDateConnectionId);
                        if (connection != null) {
                            // Asynchronously send a detection request to every connection that needs checking,
                            // handling the result in the callback
                            ClientDetectionRequest clientDetectionRequest = new ClientDetectionRequest();
                            connection.asyncRequest(clientDetectionRequest, new RequestCallBack() {
                                @Override
                                public Executor getExecutor() {
                                    return null;
                                }

                                // Whether a successful response arrives within 1s
                                @Override
                                public long getTimeout() {
                                    return 1000L;
                                }

                                // A response means the connection is still usable; count it as a success
                                @Override
                                public void onResponse(Response response) {
                                    latch.countDown();
                                    if (response != null && response.isSuccess()) {
                                        connection.freshActiveTime();
                                        successConnections.add(outDateConnectionId);
                                    }
                                }

                                @Override
                                public void onException(Throwable e) {
                                    latch.countDown();
                                }
                            });
                            Loggers.REMOTE_DIGEST
                                    .info("[{}]send connection active request ", outDateConnectionId);
                        } else {
                            latch.countDown();
                        }
                    } catch (ConnectionAlreadyClosedException e) {
                        latch.countDown();
                    } catch (Exception e) {
                        Loggers.REMOTE_DIGEST
                                .error("[{}]Error occurs when check client active detection ,error={}",
                                        outDateConnectionId, e);
                        latch.countDown();
                    }
                }
                latch.await(3000L, TimeUnit.MILLISECONDS);
                Loggers.REMOTE_DIGEST
                        .info("Out dated connection check successCount={}", successConnections.size());
                // For clients that did not respond successfully, remove them via unregister
                for (String outDateConnectionId : outDatedConnections) {
                    if (!successConnections.contains(outDateConnectionId)) {
                        Loggers.REMOTE_DIGEST
                                .info("[{}]Unregister Out dated connection....", outDateConnectionId);
                        // Unregister the connection
                        unregister(outDateConnectionId);
                    }
                }
            }
            // Reset the loader-client state
            if (isLoaderClient) {
                loadClient = -1;
                redirectAddress = null;
            }
            Loggers.REMOTE_DIGEST.info("Connection check task end");
        } catch (Throwable e) {
            Loggers.REMOTE.error("Error occurs during connection check... ", e);
        }
    }, 1000L, 3000L, TimeUnit.MILLISECONDS);
}

// Unregister / remove a connection
public synchronized void unregister(String connectionId) {
    Connection remove = this.connections.remove(connectionId);
    if (remove != null) {
        String clientIp = remove.getMetaInfo().clientIp;
        AtomicInteger atomicInteger = connectionForClientIp.get(clientIp);
        if (atomicInteger != null) {
            int count = atomicInteger.decrementAndGet();
            if (count <= 0) {
                connectionForClientIp.remove(clientIp);
            }
        }
        remove.close();
        Loggers.REMOTE_DIGEST.info("[{}]Connection unregistered successfully. ", connectionId);
        clientConnectionEventListenerRegistry.notifyClientDisConnected(remove);
    }
}
```
After the connection is removed, ConnectionBasedClientManager (which extends ClientConnectionEventListener) removes the corresponding client and publishes a ClientDisconnectEvent:
```java
public boolean clientDisconnected(String clientId) {
    Loggers.SRV_LOG.info("Client connection {} disconnect, remove instances and subscribers", clientId);
    ConnectionBasedClient client = clients.remove(clientId);
    if (null == client) {
        return true;
    }
    client.release();
    NotifyCenter.publishEvent(new ClientEvent.ClientDisconnectEvent(client, isResponsibleClient(client)));
    return true;
}
```
ClientDisconnectEvent triggers several follow-up actions:
- Distro protocol: the removal of the client's data is synchronized across the cluster
- Two index caches are cleared: the Service-to-publishing/subscribing-client mappings in ClientServiceIndexesManager, and the Service-to-instance mapping in ServiceStorage
- Service subscription: ClientDisconnectEvent indirectly triggers a ServiceChangedEvent, which notifies clients of the service change (a simplified sketch of this fan-out follows below)
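To make the fan-out concrete, here is a simplified, hypothetical sketch of how a disconnect handler could update the indexes and notify subscribers; the class, fields, and method names are stand-ins for illustration, not the actual ClientServiceIndexesManager implementation.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of a ClientDisconnectEvent consumer: drop the disconnected client
// from the publisher/subscriber indexes and notify subscribers of each service that lost
// an instance. Types here are simplified stand-ins, not Nacos classes.
class DisconnectIndexCleaner {

    // service name -> ids of clients that publish / subscribe to it
    private final Map<String, Set<String>> publisherIndexes = new ConcurrentHashMap<>();
    private final Map<String, Set<String>> subscriberIndexes = new ConcurrentHashMap<>();

    void onClientDisconnect(String clientId, Set<String> publishedServices, Set<String> subscribedServices) {
        for (String service : publishedServices) {
            Set<String> publishers = publisherIndexes.get(service);
            if (publishers != null && publishers.remove(clientId)) {
                // The instance list of this service changed: tell its subscribers.
                publishServiceChanged(service);
            }
        }
        for (String service : subscribedServices) {
            Set<String> subscribers = subscriberIndexes.get(service);
            if (subscribers != null) {
                subscribers.remove(clientId);
            }
        }
    }

    private void publishServiceChanged(String service) {
        // In Nacos this is roughly a NotifyCenter.publishEvent(...) of a ServiceChangedEvent.
        System.out.println("ServiceChangedEvent for " + service);
    }
}
```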
Analysis of the Nacos Client Service Subscription Event Mechanism
The previous section analyzed the core flow of Nacos client subscription: the client runs a scheduled task that fetches the instance list from the registry every 6 seconds; when it detects that the instances have changed, it publishes a change event, subscribers perform their business handling, and the in-memory and local-cache instance lists are updated.
Once the scheduled task has fetched the latest instances, how does the event mechanism process them? Let's first review the overall flow; a typical usage example of the subscription API follows as a reference point.
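For orientation, the following sketch shows how an application typically registers a listener through the client API (the server address and service name are illustrative); the EventListener registered here is exactly what the internals below store and later call back.

```java
import com.alibaba.nacos.api.naming.NamingFactory;
import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.api.naming.listener.NamingEvent;

public class SubscribeDemo {
    public static void main(String[] args) throws Exception {
        // Server address and service name are illustrative only.
        NamingService naming = NamingFactory.createNamingService("127.0.0.1:8848");
        naming.subscribe("demo-service", event -> {
            if (event instanceof NamingEvent) {
                // Called whenever the instance list of demo-service changes.
                System.out.println("instances: " + ((NamingEvent) event).getInstances());
            }
        });
        // Keep the demo process alive so the callback can be observed.
        Thread.sleep(Long.MAX_VALUE);
    }
}
```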
Registering the Listener
In the subscribe method, the listener is registered through the following code:
```java
@Override
public void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener)
        throws NacosException {
    if (null == listener) {
        return;
    }
    String clusterString = StringUtils.join(clusters, ",");
    changeNotifier.registerListener(groupName, serviceName, clusterString, listener);
    clientProxy.subscribe(serviceName, groupName, clusterString);
}
```
The part we care about here is changeNotifier.registerListener, which performs the actual listener registration; see the following code:
```java
/**
 * register listener.
 *
 * @param groupName   group name
 * @param serviceName service name
 * @param clusters    clusters, concatenated by ',', such as 'xxx,yyy'
 * @param listener    custom listener
 */
public void registerListener(String groupName, String serviceName, String clusters, EventListener listener) {
    String key = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);
    ConcurrentHashSet<EventListener> eventListeners = listenerMap.get(key);
    if (eventListeners == null) {
        synchronized (lock) {
            eventListeners = listenerMap.get(key);
            if (eventListeners == null) {
                eventListeners = new ConcurrentHashSet<>();
                // Cache the EventListener set in listenerMap
                listenerMap.put(key, eventListeners);
            }
        }
    }
    eventListeners.add(listener);
}
```
As you can see, registration simply stores the EventListener in the InstancesChangeNotifier#listenerMap field. The data structure is a ConcurrentHashMap whose key is the concatenated service information and whose value is the set of listeners.
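As a concrete illustration of the key format (the service, group, and cluster names below are made up), the key is the grouped service name and the cluster list joined with the '@@' separator:

```java
// Illustrative only: mirrors what NamingUtils.getGroupedName + ServiceInfo.getKey produce.
public class ListenerKeyDemo {
    public static void main(String[] args) {
        // getGroupedName(serviceName, groupName) -> "DEFAULT_GROUP@@demo-service"
        String groupedName = "DEFAULT_GROUP" + "@@" + "demo-service";
        // ServiceInfo.getKey(groupedName, clusters) -> "DEFAULT_GROUP@@demo-service@@c1"
        String key = groupedName + "@@" + "c1";
        System.out.println(key);
    }
}
```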
Processing ServiceInfo
The code above completes the listener registration; now let's trace where the event is triggered. When UpdateTask fetches the latest instances, it processes them locally. Part of the source is as follows:
```java
// ServiceInfoUpdateService -> UpdateTask#run
ServiceInfo serviceObj = serviceInfoHolder.getServiceInfoMap().get(serviceKey);
if (serviceObj == null) {
    serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);
    serviceInfoHolder.processServiceInfo(serviceObj);
    lastRefTime = serviceObj.getLastRefTime();
    return;
}
if (serviceObj.getLastRefTime() <= lastRefTime) {
    serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);
    serviceInfoHolder.processServiceInfo(serviceObj);
}
```
The detailed logic of this run method has already been analyzed; here we focus on the local-cache handling method, serviceInfoHolder#processServiceInfo, and walk through its flow:
```java
public ServiceInfo processServiceInfo(ServiceInfo serviceInfo) {
    String serviceKey = serviceInfo.getKey();
    if (serviceKey == null) {
        return null;
    }
    ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());
    if (isEmptyOrErrorPush(serviceInfo)) {
        // empty or error push, just ignore
        return oldService;
    }
    // Cache the service info
    serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
    // Check whether the registered instance information has changed
    boolean changed = isChangedServiceInfo(oldService, serviceInfo);
    if (StringUtils.isBlank(serviceInfo.getJsonFromServer())) {
        serviceInfo.setJsonFromServer(JacksonUtils.toJson(serviceInfo));
    }
    // Report the size of the service info cache map for monitoring
    MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());
    // The service instances have changed
    if (changed) {
        NAMING_LOGGER.info("current ips:({}) service: {} -> {}", serviceInfo.ipCount(), serviceInfo.getKey(),
                JacksonUtils.toJson(serviceInfo.getHosts()));
        // Publish an instances-change event, which subscribers will handle
        NotifyCenter.publishEvent(new InstancesChangeEvent(notifierEventScope, serviceInfo.getName(),
                serviceInfo.getGroupName(), serviceInfo.getClusters(), serviceInfo.getHosts()));
        // Write the ServiceInfo to the local file cache
        DiskCache.write(serviceInfo, cacheDir);
    }
    return serviceInfo;
}
```
In short, the logic checks whether the new ServiceInfo is valid and whether it has changed; if the data is valid and has changed, an InstancesChangeEvent is published and the ServiceInfo is written to the local cache.
At this point it is clear that the key piece is the InstancesChangeEvent published after the service information changes. This event is published by NotifyCenter, so let's trace the event-publishing source code next.
Tracing the Event
The core flow of the NotifyCenter is as follows:
- Get the CanonicalName of the event type, e.g. the CanonicalName corresponding to InstancesChangeEvent
- Use the CanonicalName as the key to look up the corresponding event publisher (EventPublisher) in NotifyCenter.publisherMap
- The event publisher publishes the event: the EventPublisher publishes the InstancesChangeEvent
The core code is as follows:
```java
// NotifyCenter.java
private static boolean publishEvent(final Class<? extends Event> eventType, final Event event) {
    if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) {
        return INSTANCE.sharePublisher.publish(event);
    }
    // Get the CanonicalName of the event type (e.g. InstancesChangeEvent)
    final String topic = ClassUtils.getCanonicalName(eventType);
    // Use the CanonicalName as the key to look up the event publisher (EventPublisher) in NotifyCenter#publisherMap
    EventPublisher publisher = INSTANCE.publisherMap.get(topic);
    if (publisher != null) {
        // The publisher publishes the event (InstancesChangeEvent)
        return publisher.publish(event);
    }
    LOGGER.warn("There are no [{}] publishers for this event, please register", topic);
    return false;
}
```
In this code, INSTANCE is the singleton instance of NotifyCenter. So when is the mapping in publisherMap between the key (the CanonicalName) and the value (the EventPublisher) established?
It is bound when NacosNamingService is instantiated, inside its init method:
```java
// NacosNamingService.java
private void init(Properties properties) throws NacosException {
    ValidatorUtils.checkInitParam(properties);
    this.namespace = InitUtils.initNamespaceForNaming(properties);
    InitUtils.initSerialization();
    InitUtils.initWebRootContext(properties);
    initLogName(properties);

    this.notifierEventScope = UUID.randomUUID().toString();
    this.changeNotifier = new InstancesChangeNotifier(this.notifierEventScope);
    // This line binds the event type to its publisher
    NotifyCenter.registerToPublisher(InstancesChangeEvent.class, 16384);
    // This line registers the subscriber
    NotifyCenter.registerSubscriber(changeNotifier);
    this.serviceInfoHolder = new ServiceInfoHolder(namespace, this.notifierEventScope, properties);
    this.clientProxy = new NamingClientProxyDelegate(this.namespace, serviceInfoHolder, properties, changeNotifier);
}
```
Following registerToPublisher further, we find that it uses the default publisher factory, EventPublisherFactory DEFAULT_PUBLISHER_FACTORY, to build the publisher. Tracing into the static block of NotifyCenter, we see that the EventPublisher built by DEFAULT_PUBLISHER_FACTORY is, by default, DefaultPublisher.
```java
// NotifyCenter.java
public static EventPublisher registerToPublisher(final Class<? extends Event> eventType, final int queueMaxSize) {
    return registerToPublisher(eventType, DEFAULT_PUBLISHER_FACTORY, queueMaxSize);
}

// In the static block of NotifyCenter
// ('clazz' is the EventPublisher implementation class loaded via SPI, falling back to DefaultPublisher)
DEFAULT_PUBLISHER_FACTORY = (cls, buffer) -> {
    try {
        EventPublisher publisher = clazz.newInstance();
        publisher.init(cls, buffer);
        return publisher;
    } catch (Throwable ex) {
        LOGGER.error("Service class newInstance has error : ", ex);
        throw new NacosRuntimeException(SERVER_ERROR, ex);
    }
};
```
So we can conclude that NotifyCenter maintains the mapping between event names and event publishers, and the default event publisher is DefaultPublisher.
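As a closing illustration, here is a minimal sketch of how a custom subscriber could be plugged into this mechanism, modeled on how InstancesChangeNotifier is registered in NacosNamingService#init; the class name is made up, and the getter used on the event is assumed from the InstancesChangeEvent constructor shown earlier.

```java
import com.alibaba.nacos.client.naming.event.InstancesChangeEvent;
import com.alibaba.nacos.common.notify.Event;
import com.alibaba.nacos.common.notify.NotifyCenter;
import com.alibaba.nacos.common.notify.listener.Subscriber;

// Minimal sketch: a custom subscriber for InstancesChangeEvent, registered with NotifyCenter
// the same way InstancesChangeNotifier is registered in NacosNamingService#init.
public class MyInstancesChangeSubscriber extends Subscriber<InstancesChangeEvent> {

    @Override
    public void onEvent(InstancesChangeEvent event) {
        // Invoked by the event publisher (DefaultPublisher by default) for every published event.
        System.out.println("Instances changed for service: " + event.getServiceName());
    }

    @Override
    public Class<? extends Event> subscribeType() {
        return InstancesChangeEvent.class;
    }

    public static void main(String[] args) {
        // Bind the event type to a publisher, then register the subscriber.
        NotifyCenter.registerToPublisher(InstancesChangeEvent.class, 16384);
        NotifyCenter.registerSubscriber(new MyInstancesChangeSubscriber());
    }
}
```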