请教各位大佬:Nacos 2.2.1 的 healthCheckRequest 健康检测是如何剔除不健康实例的?我没有找到并梳理清楚对应的源码。
先看看这个链接的内容:https://blog.csdn.net/Paranoia_ZK/article/details/133356315
服务注册后,应该是封装成任务、放到延迟任务执行引擎里去执行的,相关源码如下:
public NamingSubscriberServiceV2Impl(ClientManagerDelegate clientManager,
ClientServiceIndexesManager indexesManager, ServiceStorage serviceStorage,
NamingMetadataManager metadataManager, PushExecutorDelegate pushExecutor, SwitchDomain switchDomain) {
this.clientManager = clientManager;
this.indexesManager = indexesManager;
// 创建延迟任务执行引擎
this.delayTaskEngine = new PushDelayTaskExecuteEngine(clientManager, indexesManager, serviceStorage,
metadataManager, pushExecutor, switchDomain);
NotifyCenter.registerSubscriber(this, NamingEventPublisherFactory.getInstance());
}
public PushDelayTaskExecuteEngine(ClientManager clientManager, ClientServiceIndexesManager indexesManager,
ServiceStorage serviceStorage, NamingMetadataManager metadataManager,
PushExecutor pushExecutor, SwitchDomain switchDomain) {
super(PushDelayTaskExecuteEngine.class.getSimpleName(), Loggers.PUSH);
this.clientManager = clientManager;
this.indexesManager = indexesManager;
this.serviceStorage = serviceStorage;
this.metadataManager = metadataManager;
this.pushExecutor = pushExecutor;
this.switchDomain = switchDomain;
// 设置延迟任务的处理器
setDefaultTaskProcessor(new PushDelayTaskProcessor(this));
}
// PushDelayTaskProcessor#process
/**
 * Converts a delayed push task into a {@code PushExecuteTask} and hands it to the
 * {@code NamingExecuteTaskDispatcher}.
 *
 * @param task the task to process; cast to {@code PushDelayTask} without a type check
 * @return always {@code true}
 */
@Override
public boolean process(NacosTask task) {
    final PushDelayTask delayed = (PushDelayTask) task;
    final Service target = delayed.getService();
    final NamingExecuteTaskDispatcher dispatcher = NamingExecuteTaskDispatcher.getInstance();
    // The task that will actually be executed: a PushExecuteTask wrapping the delayed task.
    final PushExecuteTask executable = new PushExecuteTask(target, executeEngine, delayed);
    dispatcher.dispatchAndExecuteTask(target, executable);
    return true;
}
// PushExecuteTask#run
/**
 * Pushes the generated data for {@code service} to each target client.
 *
 * <p>For every id returned by {@code getTargetClientIds()} the client is looked up once.
 * Clients that are gone, or that have no subscriber entry for this service, are skipped.
 * Each remaining subscriber is pushed to through the engine's push executor with a
 * {@code ServicePushCallback}.
 *
 * <p>If anything in the loop throws, the error is logged and a new {@code PushDelayTask}
 * with a 1000 ms delay is added for the same service, so the push is retried.
 */
@Override
public void run() {
    try {
        PushDataWrapper wrapper = generatePushData();
        ClientManager clientManager = delayTaskEngine.getClientManager();
        for (String each : getTargetClientIds()) {
            Client client = clientManager.getClient(each);
            if (null == client) {
                // means this client has disconnect
                continue;
            }
            // Use the client fetched above. Looking it up a second time could return null if
            // the client was removed in between, throwing a NullPointerException that would
            // send the whole service through the retry path.
            Subscriber subscriber = client.getSubscriber(service);
            if (null == subscriber) {
                // No subscriber entry on this client for the service: nothing to push.
                continue;
            }
            // Push the data to this subscriber; the callback is handed to the push executor
            // together with the original data and the push-to-all flag.
            delayTaskEngine.getPushExecutor().doPushWithCallback(each, subscriber, wrapper,
                    new ServicePushCallback(each, subscriber, wrapper.getOriginalData(), delayTask.isPushToAll()));
        }
    } catch (Exception e) {
        Loggers.PUSH.error("Push task for service" + service.getGroupedServiceName() + " execute failed ", e);
        // Re-queue the push for this service with a fixed 1000 ms delay.
        delayTaskEngine.addTask(service, new PushDelayTask(service, 1000L));
    }
}
【可忽略】这里是最开始的服务变更和客户端订阅事件 NamingSubscriberServiceV2Impl#onEvent,把相关任务添加到任务执行引擎
/**
 * Turns service events into delayed push tasks on {@code delayTaskEngine}.
 *
 * <ul>
 *   <li>{@code ServiceChangedEvent}: queues a push task for the service (push to all
 *       subscribers) and increments the service-change metric.</li>
 *   <li>{@code ServiceSubscribedEvent}: queues a push task that also carries the subscribing
 *       client id, so only that client is pushed to.</li>
 * </ul>
 * Any other event type is ignored.
 *
 * @param event the event delivered to this subscriber
 */
@Override
public void onEvent(Event event) {
    if (event instanceof ServiceEvent.ServiceChangedEvent) {
        // The service changed: push to all of its subscribers.
        final Service changed = ((ServiceEvent.ServiceChangedEvent) event).getService();
        final PushDelayTask pushToAll = new PushDelayTask(changed, PushConfig.getInstance().getPushTaskDelay());
        delayTaskEngine.addTask(changed, pushToAll);
        MetricsMonitor.incrementServiceChangeCount(changed.getNamespace(), changed.getGroup(), changed.getName());
        return;
    }
    if (event instanceof ServiceEvent.ServiceSubscribedEvent) {
        // One client subscribed to the service: push to that client only.
        final ServiceEvent.ServiceSubscribedEvent subscription = (ServiceEvent.ServiceSubscribedEvent) event;
        final Service wanted = subscription.getService();
        final PushDelayTask pushToOne = new PushDelayTask(wanted, PushConfig.getInstance().getPushTaskDelay(),
                subscription.getClientId());
        delayTaskEngine.addTask(wanted, pushToOne);
    }
}
推送执行任务的run方法 PushExecuteTask#run -> delayTaskEngine.getPushExecutor().doPushWithCallback
/**
 * Builds the service info for one subscriber and sends it through {@code pushService}.
 *
 * <p>The resolved service info is first recorded on the callback, then wrapped in a
 * {@code NotifySubscriberRequest} and pushed with the callback executor from
 * {@code GlobalExecutor}.
 *
 * @param clientId   id passed to {@code pushService.pushWithCallback} as the push target
 * @param subscriber subscriber used to resolve the service info from {@code data}
 * @param data       wrapper the service info is derived from
 * @param callBack   callback that receives the resolved service info and the push outcome
 */
@Override
public void doPushWithCallback(String clientId, Subscriber subscriber, PushDataWrapper data,
        NamingPushCallback callBack) {
    final ServiceInfo resolvedInfo = getServiceInfo(data, subscriber);
    callBack.setActualServiceInfo(resolvedInfo);
    // Delegates to pushService.pushWithCallback with a subscriber notification request.
    final NotifySubscriberRequest notification = NotifySubscriberRequest.buildNotifySubscriberRequest(resolvedInfo);
    pushService.pushWithCallback(clientId, notification, callBack, GlobalExecutor.getCallbackExecutor());
}
PushExecutorRpcImpl#doPushWithCallback
/**
 * Resolves the service info to send to {@code subscriber}, records it on the callback and
 * pushes it as a {@code NotifySubscriberRequest}.
 *
 * @param clientId   id forwarded to {@code pushService.pushWithCallback}
 * @param subscriber subscriber the service info is resolved for
 * @param data       push data wrapper the service info is taken from
 * @param callBack   receives the resolved service info here and is forwarded with the push
 */
@Override
public void doPushWithCallback(String clientId, Subscriber subscriber, PushDataWrapper data,
        NamingPushCallback callBack) {
    final ServiceInfo toSend = getServiceInfo(data, subscriber);
    callBack.setActualServiceInfo(toSend);
    // Sends the subscriber notification to the client; the result is reported via callBack.
    pushService.pushWithCallback(
            clientId,
            NotifySubscriberRequest.buildNotifySubscriberRequest(toSend),
            callBack,
            GlobalExecutor.getCallbackExecutor());
}
RpcPushService#pushWithCallback
/**
 * Sends a request asynchronously over one connection and reports the result to a callback.
 *
 * <p>Outcomes reported to {@code requestCallBack}:
 * <ul>
 *   <li>no connection registered under {@code connectionId}: {@code onSuccess()};</li>
 *   <li>successful response: {@code onSuccess()};</li>
 *   <li>unsuccessful response: {@code onFail} with a {@code NacosException} carrying the
 *       response's error code and message;</li>
 *   <li>asynchronous exception: {@code onFail} with that throwable;</li>
 *   <li>{@code ConnectionAlreadyClosedException} while sending: the connection is
 *       unregistered from the connection manager, then {@code onSuccess()};</li>
 *   <li>any other exception while sending: logged, then {@code onFail} with it.</li>
 * </ul>
 *
 * @param connectionId    id used to look up the connection.
 * @param request         request to send.
 * @param requestCallBack callback notified of the outcome; also supplies the timeout.
 * @param executor        executor returned by the asynchronous callback's {@code getExecutor()}.
 */
public void pushWithCallback(String connectionId, ServerRequest request, PushCallBack requestCallBack,
        Executor executor) {
    final Connection target = connectionManager.getConnection(connectionId);
    if (target == null) {
        // Nothing to send to; the original contract reports this as success.
        requestCallBack.onSuccess();
        return;
    }
    try {
        target.asyncRequest(request, new AbstractRequestCallBack(requestCallBack.getTimeout()) {
            @Override
            public Executor getExecutor() {
                return executor;
            }

            @Override
            public void onResponse(Response response) {
                if (!response.isSuccess()) {
                    requestCallBack.onFail(new NacosException(response.getErrorCode(), response.getMessage()));
                    return;
                }
                requestCallBack.onSuccess();
            }

            @Override
            public void onException(Throwable e) {
                requestCallBack.onFail(e);
            }
        });
    } catch (ConnectionAlreadyClosedException e) {
        // The connection is already closed: unregister it, then report success.
        connectionManager.unregister(connectionId);
        requestCallBack.onSuccess();
    } catch (Exception e) {
        Loggers.REMOTE_DIGEST.error("error to send push response to connectionId ={},push response={}",
                connectionId, request, e);
        requestCallBack.onFail(e);
    }
}
ConnectionManager#unregister
/**
 * Removes a connection from this manager, closes it and notifies disconnect listeners.
 *
 * <p>Does nothing when no connection is stored under {@code connectionId}. Otherwise the
 * counter kept for the connection's client IP is decremented, and its map entry is removed
 * once the counter reaches zero or below. The connection is then closed, the event is
 * logged, and {@code clientConnectionEventListenerRegistry} is told the client disconnected.
 *
 * @param connectionId id of the connection to remove
 */
public synchronized void unregister(String connectionId) {
    final Connection closing = this.connections.remove(connectionId);
    if (closing == null) {
        return;
    }
    final String ip = closing.getMetaInfo().clientIp;
    final AtomicInteger perIpCount = connectionForClientIp.get(ip);
    // Drop the per-IP entry once its last counted connection is gone.
    if (perIpCount != null && perIpCount.decrementAndGet() <= 0) {
        connectionForClientIp.remove(ip);
    }
    closing.close();
    LOGGER.info("[{}]Connection unregistered successfully. ", connectionId);
    clientConnectionEventListenerRegistry.notifyClientDisConnected(closing);
}
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
阿里云拥有国内全面的云原生产品技术以及大规模的云原生应用实践,通过全面容器化、核心技术互联网化、应用 Serverless 化三大范式,助力制造业企业高效上云,实现系统稳定、应用敏捷智能。拥抱云原生,让创新无处不在。