游客tj43ub7tbifls_个人页

个人头像照片 游客tj43ub7tbifls
0
1
0

个人介绍

暂无个人介绍

擅长的技术

获得更多能力
通用技术能力:

暂时未有相关通用技术能力~

云产品技术能力:

暂时未有相关云产品技术能力~

阿里云技能认证

详细说明
暂无更多信息

2024年07月

正在加载, 请稍候...
暂无更多信息
  • 回答了问题 2024-07-18

    Nacos 2.2.1 healthCheckRequest 健康监测,是如何剔除不健康实例的?

    先看看这个链接的内容:https://blog.csdn.net/Paranoia_ZK/article/details/133356315 服务注册后,应该是放到任务里面去执行 public NamingSubscriberServiceV2Impl(ClientManagerDelegate clientManager, ClientServiceIndexesManager indexesManager, ServiceStorage serviceStorage, NamingMetadataManager metadataManager, PushExecutorDelegate pushExecutor, SwitchDomain switchDomain) { this.clientManager = clientManager; this.indexesManager = indexesManager; // 创建延迟任务执行引擎 this.delayTaskEngine = new PushDelayTaskExecuteEngine(clientManager, indexesManager, serviceStorage, metadataManager, pushExecutor, switchDomain); NotifyCenter.registerSubscriber(this, NamingEventPublisherFactory.getInstance()); } public PushDelayTaskExecuteEngine(ClientManager clientManager, ClientServiceIndexesManager indexesManager, ServiceStorage serviceStorage, NamingMetadataManager metadataManager, PushExecutor pushExecutor, SwitchDomain switchDomain) { super(PushDelayTaskExecuteEngine.class.getSimpleName(), Loggers.PUSH); this.clientManager = clientManager; this.indexesManager = indexesManager; this.serviceStorage = serviceStorage; this.metadataManager = metadataManager; this.pushExecutor = pushExecutor; this.switchDomain = switchDomain; // 设置延迟任务的处理器 setDefaultTaskProcessor(new PushDelayTaskProcessor(this)); } // PushDelayTaskProcessor的process方法 @Override public boolean process(NacosTask task) { PushDelayTask pushDelayTask = (PushDelayTask) task; Service service = pushDelayTask.getService(); NamingExecuteTaskDispatcher.getInstance() // 待执行的任务[PushExecuteTask] .dispatchAndExecuteTask(service, new PushExecuteTask(service, executeEngine, pushDelayTask)); return true; } // PushExecuteTask 执行方法 @Override public void run() { try { PushDataWrapper wrapper = generatePushData(); ClientManager clientManager = delayTaskEngine.getClientManager(); for (String each : getTargetClientIds()) { Client client = clientManager.getClient(each); if (null == client) { // means this client has disconnect continue; } Subscriber subscriber = 
clientManager.getClient(each).getSubscriber(service); // 开始执行健康检测回调 delayTaskEngine.getPushExecutor().doPushWithCallback(each, subscriber, wrapper, new ServicePushCallback(each, subscriber, wrapper.getOriginalData(), delayTask.isPushToAll())); } } catch (Exception e) { Loggers.PUSH.error("Push task for service" + service.getGroupedServiceName() + " execute failed ", e); delayTaskEngine.addTask(service, new PushDelayTask(service, 1000L)); } } 【可忽略】这里是最开始的服务变更和客户端订阅事件 NamingSubscriberServiceV2Impl#onEvent,把相关任务添加到任务执行引擎 @Override public void onEvent(Event event) { if (event instanceof ServiceEvent.ServiceChangedEvent) { // If service changed, push to all subscribers. 服务变化内容订阅 ServiceEvent.ServiceChangedEvent serviceChangedEvent = (ServiceEvent.ServiceChangedEvent) event; Service service = serviceChangedEvent.getService(); delayTaskEngine.addTask(service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay())); MetricsMonitor.incrementServiceChangeCount(service.getNamespace(), service.getGroup(), service.getName()); } else if (event instanceof ServiceEvent.ServiceSubscribedEvent) { // If service is subscribed by one client, only push this client. 
服务订阅 ServiceEvent.ServiceSubscribedEvent subscribedEvent = (ServiceEvent.ServiceSubscribedEvent) event; Service service = subscribedEvent.getService(); delayTaskEngine.addTask(service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay(), subscribedEvent.getClientId())); } } 推送执行任务的run方法 PushExecuteTask#run -> delayTaskEngine.getPushExecutor().doPushWithCallback @Override public void doPushWithCallback(String clientId, Subscriber subscriber, PushDataWrapper data, NamingPushCallback callBack) { ServiceInfo actualServiceInfo = getServiceInfo(data, subscriber); callBack.setActualServiceInfo(actualServiceInfo); // 这里调用了PushExecutorRpcImpl#pushWithCallback pushService.pushWithCallback(clientId, NotifySubscriberRequest.buildNotifySubscriberRequest(actualServiceInfo), callBack, GlobalExecutor.getCallbackExecutor()); } PushExecutorRpcImpl#doPushWithCallback @Override public void doPushWithCallback(String clientId, Subscriber subscriber, PushDataWrapper data, NamingPushCallback callBack) { ServiceInfo actualServiceInfo = getServiceInfo(data, subscriber); callBack.setActualServiceInfo(actualServiceInfo); // 执行心跳检测的访问 pushService.pushWithCallback(clientId, NotifySubscriberRequest.buildNotifySubscriberRequest(actualServiceInfo), callBack, GlobalExecutor.getCallbackExecutor()); } RpcPushService#pushWithCallback /** * push response with no ack. * * @param connectionId connectionId. * @param request request. * @param requestCallBack requestCallBack. 
*/ public void pushWithCallback(String connectionId, ServerRequest request, PushCallBack requestCallBack, Executor executor) { Connection connection = connectionManager.getConnection(connectionId); if (connection != null) { try { connection.asyncRequest(request, new AbstractRequestCallBack(requestCallBack.getTimeout()) { @Override public Executor getExecutor() { return executor; } @Override public void onResponse(Response response) { if (response.isSuccess()) { requestCallBack.onSuccess(); } else { requestCallBack.onFail(new NacosException(response.getErrorCode(), response.getMessage())); } } @Override public void onException(Throwable e) { requestCallBack.onFail(e); } }); } catch (ConnectionAlreadyClosedException e) { // 调用异常就执行服务剔除 connectionManager.unregister(connectionId); requestCallBack.onSuccess(); } catch (Exception e) { Loggers.REMOTE_DIGEST .error("error to send push response to connectionId ={},push response={}", connectionId, request, e); requestCallBack.onFail(e); } } else { requestCallBack.onSuccess(); } } ConnectionManager#unregister public synchronized void unregister(String connectionId) { Connection remove = this.connections.remove(connectionId); if (remove != null) { String clientIp = remove.getMetaInfo().clientIp; AtomicInteger atomicInteger = connectionForClientIp.get(clientIp); if (atomicInteger != null) { int count = atomicInteger.decrementAndGet(); if (count <= 0) { connectionForClientIp.remove(clientIp); } } remove.close(); LOGGER.info("[{}]Connection unregistered successfully. ", connectionId); clientConnectionEventListenerRegistry.notifyClientDisConnected(remove); } }
    踩0 评论0
正在加载, 请稍候...
滑动查看更多
正在加载, 请稍候...
暂无更多信息