Nacos 2.2.1 healthCheckRequest 健康监测,是如何剔除不健康实例的?
先看看这个链接的内容:https://blog.csdn.net/Paranoia_ZK/article/details/133356315服务注册后,应该是放到任务里面去执行
public NamingSubscriberServiceV2Impl(ClientManagerDelegate clientManager,
ClientServiceIndexesManager indexesManager, ServiceStorage serviceStorage,
NamingMetadataManager metadataManager, PushExecutorDelegate pushExecutor, SwitchDomain switchDomain) {
this.clientManager = clientManager;
this.indexesManager = indexesManager;
// 创建延迟任务执行引擎
this.delayTaskEngine = new PushDelayTaskExecuteEngine(clientManager, indexesManager, serviceStorage,
metadataManager, pushExecutor, switchDomain);
NotifyCenter.registerSubscriber(this, NamingEventPublisherFactory.getInstance());
}
public PushDelayTaskExecuteEngine(ClientManager clientManager, ClientServiceIndexesManager indexesManager,
ServiceStorage serviceStorage, NamingMetadataManager metadataManager,
PushExecutor pushExecutor, SwitchDomain switchDomain) {
super(PushDelayTaskExecuteEngine.class.getSimpleName(), Loggers.PUSH);
this.clientManager = clientManager;
this.indexesManager = indexesManager;
this.serviceStorage = serviceStorage;
this.metadataManager = metadataManager;
this.pushExecutor = pushExecutor;
this.switchDomain = switchDomain;
// 设置延迟任务的处理器
setDefaultTaskProcessor(new PushDelayTaskProcessor(this));
}
// PushDelayTaskProcessor的process方法
@Override
public boolean process(NacosTask task) {
PushDelayTask pushDelayTask = (PushDelayTask) task;
Service service = pushDelayTask.getService();
NamingExecuteTaskDispatcher.getInstance()
// 待执行的任务[PushExecuteTask]
.dispatchAndExecuteTask(service, new PushExecuteTask(service, executeEngine, pushDelayTask));
return true;
}
// PushExecuteTask 执行方法
@Override
public void run() {
try {
PushDataWrapper wrapper = generatePushData();
ClientManager clientManager = delayTaskEngine.getClientManager();
for (String each : getTargetClientIds()) {
Client client = clientManager.getClient(each);
if (null == client) {
// means this client has disconnect
continue;
}
Subscriber subscriber = clientManager.getClient(each).getSubscriber(service);
// 开始执行健康检测回调
delayTaskEngine.getPushExecutor().doPushWithCallback(each, subscriber, wrapper,
new ServicePushCallback(each, subscriber, wrapper.getOriginalData(), delayTask.isPushToAll()));
}
} catch (Exception e) {
Loggers.PUSH.error('Push task for service' + service.getGroupedServiceName() + ' execute failed ', e);
delayTaskEngine.addTask(service, new PushDelayTask(service, 1000L));
}
}
【可忽略】这里是最开始的服务变更和客户端订阅事件 NamingSubscriberServiceV2Impl#onEvent,把相关任务添加到任务执行引擎
@Override
public void onEvent(Event event) {
if (event instanceof ServiceEvent.ServiceChangedEvent) {
// If service changed, push to all subscribers. 服务变化内容订阅
ServiceEvent.ServiceChangedEvent serviceChangedEvent = (ServiceEvent.ServiceChangedEvent) event;
Service service = serviceChangedEvent.getService();
delayTaskEngine.addTask(service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay()));
MetricsMonitor.incrementServiceChangeCount(service.getNamespace(), service.getGroup(), service.getName());
} else if (event instanceof ServiceEvent.ServiceSubscribedEvent) {
// If service is subscribed by one client, only push this client. 服务订阅
ServiceEvent.ServiceSubscribedEvent subscribedEvent = (ServiceEvent.ServiceSubscribedEvent) event;
Service service = subscribedEvent.getService();
delayTaskEngine.addTask(service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay(),
subscribedEvent.getClientId()));
}
}
推送执行任务的run方法 PushExecuteTask#run -> delayTaskEngine.getPushExecutor().doPushWithCallback
@Override
public void doPushWithCallback(String clientId, Subscriber subscriber, PushDataWrapper data,
NamingPushCallback callBack) {
ServiceInfo actualServiceInfo = getServiceInfo(data, subscriber);
callBack.setActualServiceInfo(actualServiceInfo);
// 这里调用了PushExecutorRpcImpl#pushWithCallback
pushService.pushWithCallback(clientId, NotifySubscriberRequest.buildNotifySubscriberRequest(actualServiceInfo),
callBack, GlobalExecutor.getCallbackExecutor());
}
PushExecutorRpcImpl#doPushWithCallback
@Override
public void doPushWithCallback(String clientId, Subscriber subscriber, PushDataWrapper data,
NamingPushCallback callBack) {
ServiceInfo actualServiceInfo = getServiceInfo(data, subscriber);
callBack.setActualServiceInfo(actualServiceInfo);
// 执行心跳检测的访问
pushService.pushWithCallback(clientId, NotifySubscriberRequest.buildNotifySubscriberRequest(actualServiceInfo),
callBack, GlobalExecutor.getCallbackExecutor());
}
RpcPushService#pushWithCallback
/**
* push response with no ack.
*
* @param connectionId connectionId.
* @param request request.
* @param requestCallBack requestCallBack.
*/
public void pushWithCallback(String connectionId, ServerRequest request, PushCallBack requestCallBack,
Executor executor) {
Connection connection = connectionManager.getConnection(connectionId);
if (connection != null) {
try {
connection.asyncRequest(request, new AbstractRequestCallBack(requestCallBack.getTimeout()) {
@Override
public Executor getExecutor() {
return executor;
}
@Override
public void onResponse(Response response) {
if (response.isSuccess()) {
requestCallBack.onSuccess();
} else {
requestCallBack.onFail(new NacosException(response.getErrorCode(), response.getMessage()));
}
}
@Override
public void onException(Throwable e) {
requestCallBack.onFail(e);
}
});
} catch (ConnectionAlreadyClosedException e) {
// 调用异常就执行服务剔除
connectionManager.unregister(connectionId);
requestCallBack.onSuccess();
} catch (Exception e) {
Loggers.REMOTE_DIGEST
.error('error to send push response to connectionId ={},push response={}', connectionId,
request, e);
requestCallBack.onFail(e);
}
} else {
requestCallBack.onSuccess();
}
}
ConnectionManager#unregister
public synchronized void unregister(String connectionId) {
Connection remove = this.connections.remove(connectionId);
if (remove != null) {
String clientIp = remove.getMetaInfo().clientIp;
AtomicInteger atomicInteger = connectionForClientIp.get(clientIp);
if (atomicInteger != null) {
int count = atomicInteger.decrementAndGet();
if (count 0) {
connectionForClientIp.remove(clientIp);
}
}
remove.close();
LOGGER.info('[{}]Connection unregistered successfully. ', connectionId);
clientConnectionEventListenerRegistry.notifyClientDisConnected(remove);
}
}
赞0
踩0