Nacos 2.2.1 healthCheckRequest 健康监测,是如何剔除不健康实例的?
                                                    先看看这个链接的内容:https://blog.csdn.net/Paranoia_ZK/article/details/133356315服务注册后,应该是放到任务里面去执行
public NamingSubscriberServiceV2Impl(ClientManagerDelegate clientManager,
            ClientServiceIndexesManager indexesManager, ServiceStorage serviceStorage,
            NamingMetadataManager metadataManager, PushExecutorDelegate pushExecutor, SwitchDomain switchDomain) {
        this.clientManager = clientManager;
        this.indexesManager = indexesManager;
        // 创建延迟任务执行引擎
        this.delayTaskEngine = new PushDelayTaskExecuteEngine(clientManager, indexesManager, serviceStorage,
                metadataManager, pushExecutor, switchDomain);
        NotifyCenter.registerSubscriber(this, NamingEventPublisherFactory.getInstance());
    }
public PushDelayTaskExecuteEngine(ClientManager clientManager, ClientServiceIndexesManager indexesManager,
                                      ServiceStorage serviceStorage, NamingMetadataManager metadataManager,
                                      PushExecutor pushExecutor, SwitchDomain switchDomain) {
        super(PushDelayTaskExecuteEngine.class.getSimpleName(), Loggers.PUSH);
        this.clientManager = clientManager;
        this.indexesManager = indexesManager;
        this.serviceStorage = serviceStorage;
        this.metadataManager = metadataManager;
        this.pushExecutor = pushExecutor;
        this.switchDomain = switchDomain;
        // 设置延迟任务的处理器
        setDefaultTaskProcessor(new PushDelayTaskProcessor(this));
    }
   // PushDelayTaskProcessor的process方法 
  @Override
  public boolean process(NacosTask task) {
            PushDelayTask pushDelayTask = (PushDelayTask) task;
            Service service = pushDelayTask.getService();
            NamingExecuteTaskDispatcher.getInstance()
                      // 待执行的任务[PushExecuteTask]
                    .dispatchAndExecuteTask(service, new PushExecuteTask(service, executeEngine, pushDelayTask));
            return true;
        }
// PushExecuteTask 执行方法
@Override
public void run() {
        try {
            PushDataWrapper wrapper = generatePushData();
            ClientManager clientManager = delayTaskEngine.getClientManager();
            for (String each : getTargetClientIds()) {
                Client client = clientManager.getClient(each);
                if (null == client) {
                    // means this client has disconnect
                    continue;
                }
                Subscriber subscriber = clientManager.getClient(each).getSubscriber(service);
                // 开始执行健康检测回调
                delayTaskEngine.getPushExecutor().doPushWithCallback(each, subscriber, wrapper,
                        new ServicePushCallback(each, subscriber, wrapper.getOriginalData(), delayTask.isPushToAll()));
            }
        } catch (Exception e) {
            Loggers.PUSH.error('Push task for service' + service.getGroupedServiceName() + ' execute failed ', e);
            delayTaskEngine.addTask(service, new PushDelayTask(service, 1000L));
        }
    }
【可忽略】这里是最开始的服务变更和客户端订阅事件 NamingSubscriberServiceV2Impl#onEvent,把相关任务添加到任务执行引擎
    @Override
    public void onEvent(Event event) {
        if (event instanceof ServiceEvent.ServiceChangedEvent) {
            // If service changed, push to all subscribers. 服务变化内容订阅
            ServiceEvent.ServiceChangedEvent serviceChangedEvent = (ServiceEvent.ServiceChangedEvent) event;
            Service service = serviceChangedEvent.getService();
            delayTaskEngine.addTask(service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay()));
            MetricsMonitor.incrementServiceChangeCount(service.getNamespace(), service.getGroup(), service.getName());
        } else if (event instanceof ServiceEvent.ServiceSubscribedEvent) {
            // If service is subscribed by one client, only push this client. 服务订阅
            ServiceEvent.ServiceSubscribedEvent subscribedEvent = (ServiceEvent.ServiceSubscribedEvent) event;
            Service service = subscribedEvent.getService();
            delayTaskEngine.addTask(service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay(),
                    subscribedEvent.getClientId()));
        }
    }
推送执行任务的run方法 PushExecuteTask#run -> delayTaskEngine.getPushExecutor().doPushWithCallback
    @Override
    public void doPushWithCallback(String clientId, Subscriber subscriber, PushDataWrapper data,
            NamingPushCallback callBack) {
        ServiceInfo actualServiceInfo = getServiceInfo(data, subscriber);
        callBack.setActualServiceInfo(actualServiceInfo);
        // 这里调用了PushExecutorRpcImpl#pushWithCallback
        pushService.pushWithCallback(clientId, NotifySubscriberRequest.buildNotifySubscriberRequest(actualServiceInfo),
                callBack, GlobalExecutor.getCallbackExecutor());
    }
PushExecutorRpcImpl#doPushWithCallback
    @Override
    public void doPushWithCallback(String clientId, Subscriber subscriber, PushDataWrapper data,
            NamingPushCallback callBack) {
        ServiceInfo actualServiceInfo = getServiceInfo(data, subscriber);
        callBack.setActualServiceInfo(actualServiceInfo);
        // 执行心跳检测的访问
        pushService.pushWithCallback(clientId, NotifySubscriberRequest.buildNotifySubscriberRequest(actualServiceInfo),
                callBack, GlobalExecutor.getCallbackExecutor());
    }
RpcPushService#pushWithCallback
/**
     * push response with no ack.
     *
     * @param connectionId    connectionId.
     * @param request         request.
     * @param requestCallBack requestCallBack.
     */
    public void pushWithCallback(String connectionId, ServerRequest request, PushCallBack requestCallBack,
            Executor executor) {
        Connection connection = connectionManager.getConnection(connectionId);
        if (connection != null) {
            try {
                connection.asyncRequest(request, new AbstractRequestCallBack(requestCallBack.getTimeout()) {
                    @Override
                    public Executor getExecutor() {
                        return executor;
                    }
                    @Override
                    public void onResponse(Response response) {
                        if (response.isSuccess()) {
                            requestCallBack.onSuccess();
                        } else {
                            requestCallBack.onFail(new NacosException(response.getErrorCode(), response.getMessage()));
                        }
                    }
                    @Override
                    public void onException(Throwable e) {
                        requestCallBack.onFail(e);
                    }
                });
            } catch (ConnectionAlreadyClosedException e) {
                // 调用异常就执行服务剔除
                connectionManager.unregister(connectionId);
                requestCallBack.onSuccess();
            } catch (Exception e) {
                Loggers.REMOTE_DIGEST
                        .error('error to send push response to connectionId ={},push response={}', connectionId,
                                request, e);
                requestCallBack.onFail(e);
            }
        } else {
            requestCallBack.onSuccess();
        }
    }
ConnectionManager#unregister
 public synchronized void unregister(String connectionId) {
        Connection remove = this.connections.remove(connectionId);
        if (remove != null) {
            String clientIp = remove.getMetaInfo().clientIp;
            AtomicInteger atomicInteger = connectionForClientIp.get(clientIp);
            if (atomicInteger != null) {
                int count = atomicInteger.decrementAndGet();
                if (count  0) {
                    connectionForClientIp.remove(clientIp);
                }
            }
            remove.close();
            LOGGER.info('[{}]Connection unregistered successfully. ', connectionId);
            clientConnectionEventListenerRegistry.notifyClientDisConnected(remove);
        }
    }
                                                    
                                                        赞0
                                                        踩0