开发者社区 > 云原生 > 正文

Nacos 2.2.1 healthCheckRequest 健康监测,是如何剔除不健康实例的?

请教各位大佬Nacos 2.2.1 healthCheckRequest 健康监测,是如何剔除不健康实例的,没有找到梳理清对应源码

展开
收起
奔放或澜 2023-08-21 17:56:27 107 0
2 条回答
写回答
取消 提交回答
  • 先看看这个链接的内容:https://blog.csdn.net/Paranoia_ZK/article/details/133356315
    服务注册后,应该是放到任务里面去执行

    image.png

    public NamingSubscriberServiceV2Impl(ClientManagerDelegate clientManager,
                ClientServiceIndexesManager indexesManager, ServiceStorage serviceStorage,
                NamingMetadataManager metadataManager, PushExecutorDelegate pushExecutor, SwitchDomain switchDomain) {
            this.clientManager = clientManager;
            this.indexesManager = indexesManager;
            // 创建延迟任务执行引擎
            this.delayTaskEngine = new PushDelayTaskExecuteEngine(clientManager, indexesManager, serviceStorage,
                    metadataManager, pushExecutor, switchDomain);
            NotifyCenter.registerSubscriber(this, NamingEventPublisherFactory.getInstance());
    
        }
    
    public PushDelayTaskExecuteEngine(ClientManager clientManager, ClientServiceIndexesManager indexesManager,
                                          ServiceStorage serviceStorage, NamingMetadataManager metadataManager,
                                          PushExecutor pushExecutor, SwitchDomain switchDomain) {
            super(PushDelayTaskExecuteEngine.class.getSimpleName(), Loggers.PUSH);
            this.clientManager = clientManager;
            this.indexesManager = indexesManager;
            this.serviceStorage = serviceStorage;
            this.metadataManager = metadataManager;
            this.pushExecutor = pushExecutor;
            this.switchDomain = switchDomain;
            // 设置延迟任务的处理器
            setDefaultTaskProcessor(new PushDelayTaskProcessor(this));
        }
    
       // PushDelayTaskProcessor的process方法 
      @Override
      public boolean process(NacosTask task) {
                PushDelayTask pushDelayTask = (PushDelayTask) task;
                Service service = pushDelayTask.getService();
                NamingExecuteTaskDispatcher.getInstance()
                          // 待执行的任务[PushExecuteTask]
                        .dispatchAndExecuteTask(service, new PushExecuteTask(service, executeEngine, pushDelayTask));
                return true;
            }
    
    // PushExecuteTask 执行方法
    @Override
    public void run() {
            try {
                PushDataWrapper wrapper = generatePushData();
                ClientManager clientManager = delayTaskEngine.getClientManager();
                for (String each : getTargetClientIds()) {
                    Client client = clientManager.getClient(each);
                    if (null == client) {
                        // means this client has disconnect
                        continue;
                    }
                    Subscriber subscriber = clientManager.getClient(each).getSubscriber(service);
                    // 开始执行健康检测回调
                    delayTaskEngine.getPushExecutor().doPushWithCallback(each, subscriber, wrapper,
                            new ServicePushCallback(each, subscriber, wrapper.getOriginalData(), delayTask.isPushToAll()));
                }
            } catch (Exception e) {
                Loggers.PUSH.error("Push task for service" + service.getGroupedServiceName() + " execute failed ", e);
                delayTaskEngine.addTask(service, new PushDelayTask(service, 1000L));
            }
        }
    

    【可忽略】这里是最开始的服务变更和客户端订阅事件 NamingSubscriberServiceV2Impl#onEvent,把相关任务添加到任务执行引擎

        @Override
        public void onEvent(Event event) {
            if (event instanceof ServiceEvent.ServiceChangedEvent) {
                // If service changed, push to all subscribers. 服务变化内容订阅
                ServiceEvent.ServiceChangedEvent serviceChangedEvent = (ServiceEvent.ServiceChangedEvent) event;
                Service service = serviceChangedEvent.getService();
                delayTaskEngine.addTask(service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay()));
                MetricsMonitor.incrementServiceChangeCount(service.getNamespace(), service.getGroup(), service.getName());
            } else if (event instanceof ServiceEvent.ServiceSubscribedEvent) {
                // If service is subscribed by one client, only push this client. 服务订阅
                ServiceEvent.ServiceSubscribedEvent subscribedEvent = (ServiceEvent.ServiceSubscribedEvent) event;
                Service service = subscribedEvent.getService();
                delayTaskEngine.addTask(service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay(),
                        subscribedEvent.getClientId()));
            }
        }
    

    推送执行任务的run方法 PushExecuteTask#run -> delayTaskEngine.getPushExecutor().doPushWithCallback

        @Override
        public void doPushWithCallback(String clientId, Subscriber subscriber, PushDataWrapper data,
                NamingPushCallback callBack) {
            ServiceInfo actualServiceInfo = getServiceInfo(data, subscriber);
            callBack.setActualServiceInfo(actualServiceInfo);
            // 这里调用了PushExecutorRpcImpl#pushWithCallback
            pushService.pushWithCallback(clientId, NotifySubscriberRequest.buildNotifySubscriberRequest(actualServiceInfo),
                    callBack, GlobalExecutor.getCallbackExecutor());
        }
    

    image.png

    PushExecutorRpcImpl#doPushWithCallback

        @Override
        public void doPushWithCallback(String clientId, Subscriber subscriber, PushDataWrapper data,
                NamingPushCallback callBack) {
            ServiceInfo actualServiceInfo = getServiceInfo(data, subscriber);
            callBack.setActualServiceInfo(actualServiceInfo);
            // 执行心跳检测的访问
            pushService.pushWithCallback(clientId, NotifySubscriberRequest.buildNotifySubscriberRequest(actualServiceInfo),
                    callBack, GlobalExecutor.getCallbackExecutor());
        }
    

    RpcPushService#pushWithCallback

    /**
         * push response with no ack.
         *
         * @param connectionId    connectionId.
         * @param request         request.
         * @param requestCallBack requestCallBack.
         */
        public void pushWithCallback(String connectionId, ServerRequest request, PushCallBack requestCallBack,
                Executor executor) {
            Connection connection = connectionManager.getConnection(connectionId);
            if (connection != null) {
                try {
                    connection.asyncRequest(request, new AbstractRequestCallBack(requestCallBack.getTimeout()) {
    
                        @Override
                        public Executor getExecutor() {
                            return executor;
                        }
    
                        @Override
                        public void onResponse(Response response) {
                            if (response.isSuccess()) {
                                requestCallBack.onSuccess();
                            } else {
                                requestCallBack.onFail(new NacosException(response.getErrorCode(), response.getMessage()));
                            }
                        }
    
                        @Override
                        public void onException(Throwable e) {
                            requestCallBack.onFail(e);
                        }
                    });
                } catch (ConnectionAlreadyClosedException e) {
                    // 调用异常就执行服务剔除
                    connectionManager.unregister(connectionId);
                    requestCallBack.onSuccess();
                } catch (Exception e) {
                    Loggers.REMOTE_DIGEST
                            .error("error to send push response to connectionId ={},push response={}", connectionId,
                                    request, e);
                    requestCallBack.onFail(e);
                }
            } else {
                requestCallBack.onSuccess();
            }
        }
    

    ConnectionManager#unregister

     public synchronized void unregister(String connectionId) {
            Connection remove = this.connections.remove(connectionId);
            if (remove != null) {
                String clientIp = remove.getMetaInfo().clientIp;
                AtomicInteger atomicInteger = connectionForClientIp.get(clientIp);
                if (atomicInteger != null) {
                    int count = atomicInteger.decrementAndGet();
                    if (count <= 0) {
                        connectionForClientIp.remove(clientIp);
                    }
                }
                remove.close();
                LOGGER.info("[{}]Connection unregistered successfully. ", connectionId);
                clientConnectionEventListenerRegistry.notifyClientDisConnected(remove);
            }
        }
    
    2024-07-18 15:27:34
    赞同 展开评论 打赏
  • 我搜集了这几个b6aa8f818da294ec77c927429a86e9d7.png—该回答整理自钉群“Nacos社区群4”

    2023-08-21 20:12:31
    赞同 展开评论 打赏
问答分类:
问答地址:

阿里云拥有国内全面的云原生产品技术以及大规模的云原生应用实践,通过全面容器化、核心技术互联网化、应用 Serverless 化三大范式,助力制造业企业高效上云,实现系统稳定、应用敏捷智能。拥抱云原生,让创新无处不在。

相关电子书

更多
Nacos架构&原理 立即下载
workshop专场-微服务专场-开发者动手实践营-微服务-使用Nacos进行服务的动态发现和流量调度 立即下载
Nacos 启航,发布第一个版本, 云原生时代助力用户微服务平台建设 立即下载