Introduction to the List/Watch Mechanism
The List/Watch mechanism is one of the core designs behind Kubernetes' cluster control modules. It provides a unified, asynchronous message-handling scheme that guarantees the timeliness, reliability, ordering, and performance of messages, laying a solid foundation for the declarative-style API.
list calls the list API to fetch the full list of resources and is implemented over short-lived HTTP connections. watch calls the watch API to listen for resource change events; it relies on a long-lived HTTP connection and uses chunked transfer encoding to deliver notifications.
When a client calls the watch API, kube-apiserver sets Transfer-Encoding: chunked in the HTTP response headers to indicate chunked transfer encoding. On receiving this, the client keeps the connection to the server open and waits for the next data chunk, i.e. the next resource event. For example:
$ curl -i http://{kube-api-server-ip}:8080/api/v1/watch/endpoints?watch=yes
HTTP/1.1 200 OK
Content-Type: application/json
Transfer-Encoding: chunked
Date: Thu, 14 Sep 2022 20:22:59 GMT
{"type":"ADDED", "object":{"kind":"Endpoints","apiVersion":"v1",...}}
{"type":"ADDED", "object":{"kind":"Endpoints","apiVersion":"v1",...}}
{"type":"MODIFIED", "object":{"kind":"Endpoints","apiVersion":"v1",...}}
Dubbo Service Discovery Based on Watch
Before Dubbo 3.1, dubbo-kubernetes service discovery used the watch API provided by the Fabric8 Kubernetes Java Client to listen for create, update, and delete events on resources in kube-apiserver, as shown below:
private void watchEndpoints(ServiceInstancesChangedListener listener, String serviceName) {
    Watch watch = kubernetesClient
            .endpoints()
            .inNamespace(namespace)
            .withName(serviceName)
            .watch(new Watcher<Endpoints>() {
                @Override
                public void eventReceived(Action action, Endpoints resource) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Received Endpoint Event. Event type: " + action.name() +
                                ". Current pod name: " + currentHostname);
                    }
                    // Every event triggers a full re-fetch of the service instances
                    notifyServiceChanged(serviceName, listener);
                }

                @Override
                public void onClose(WatcherException cause) {
                    // ignore
                }
            });
    ENDPOINTS_WATCHER.put(serviceName, watch);
}
private void notifyServiceChanged(String serviceName, ServiceInstancesChangedListener listener) {
    long receivedTime = System.nanoTime();
    ServiceInstancesChangedEvent event;
    // getInstances() goes straight to kube-apiserver for the full Endpoints data
    event = new ServiceInstancesChangedEvent(serviceName, getInstances(serviceName));
    AtomicLong updateTime = SERVICE_UPDATE_TIME.get(serviceName);
    long lastUpdateTime = updateTime.get();
    if (lastUpdateTime <= receivedTime) {
        // CAS on the last-update timestamp so that only the newest event gets published
        if (updateTime.compareAndSet(lastUpdateTime, receivedTime)) {
            listener.onEvent(event);
            return;
        }
    }
    if (logger.isInfoEnabled()) {
        logger.info("Discard Service Instance Data. " +
                "Possible Cause: Newer message has been processed or Failed to update time record by CAS. " +
                "Current Data received time: " + receivedTime + ". " +
                "Newer Data received time: " + lastUpdateTime + ".");
    }
}
@Override
public List<ServiceInstance> getInstances(String serviceName) throws NullPointerException {
    // Calls kube-apiserver directly
    Endpoints endpoints =
            kubernetesClient
                    .endpoints()
                    .inNamespace(namespace)
                    .withName(serviceName)
                    .get();
    return toServiceInstance(endpoints, serviceName);
}
Whenever a resource change was observed, the notifyServiceChanged method pulled the full resource list from kube-apiserver to keep Dubbo's local service list up to date, and then published the corresponding event.
This approach has a serious problem. Although the watch callback already returns the updated resource, the Dubbo community, judging the maintenance cost too high, had not kept a local cache of these resources. As a result, every observed change triggered a list call to kube-apiserver to fetch the Endpoints for the given serviceName, adding yet another direct request to kube-apiserver. To relieve clients of maintaining such caches themselves, client-go introduced the informer mechanism.
Introduction to the Informer Mechanism
The Informer is a foundational component in Kubernetes. Built on top of List/Watch, it keeps each component's resources and events in sync with kube-apiserver. When a Kubernetes component needs to access a Kubernetes Object, in the vast majority of cases it goes through the Informer's Lister() method rather than calling kube-apiserver directly.
Taking the Pod resource as an example, the key steps of the informer are as follows (they correspond one-to-one with the numbered steps in the figure below; a Java-side sketch of the same flow follows the figure):
- When the Informer initializes, the Reflector first calls List to fetch all Pods and at the same time opens a Watch long connection to listen to kube-apiserver.
- Once the Reflector has received all the Pods, it sends an "Add Pod" event to the DeltaFIFO.
- The DeltaFIFO then pops the event to the Informer for processing.
- The Informer publishes the "Add Pod" event to the Indexer.
- On receiving the notification, the Indexer writes the data directly into the Store (in key -> value format).
- The Informer triggers the EventHandler callback.
- The key is pushed onto the Workqueue.
- A key is popped from the WorkQueue.
- The key is used to look up the value in the Indexer, and the current EventHandler performs the "Add Pod" operation (a user-defined callback).
- From then on, whenever a resource change is Watched from kube-apiserver, steps 2-9 are repeated.
(Source: kubernetes/sample-controller)
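The steps above describe client-go's Go internals, but the Fabric8 Java client used by Dubbo exposes the same flow behind a much smaller surface: a single inform() call performs the initial List, keeps the Watch open, maintains the local store, and dispatches ResourceEventHandler callbacks. The sketch below is only an assumed illustration (the client is built elsewhere, the class and namespace are placeholders); the Dubbo code later in this article uses exactly this API.
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
import java.util.List;

public class PodInformerDemo {
    // kubernetesClient is assumed to be built elsewhere (e.g. with KubernetesClientBuilder)
    static List<Pod> watchAndListPods(KubernetesClient kubernetesClient, String namespace) {
        // inform() performs the initial List, keeps the Watch open, maintains the local
        // store, and dispatches the ResourceEventHandler callbacks on every change
        SharedIndexInformer<Pod> podInformer = kubernetesClient
                .pods()
                .inNamespace(namespace)
                .inform(new ResourceEventHandler<Pod>() {
                    @Override
                    public void onAdd(Pod pod) { /* "Add Pod": user-defined callback */ }

                    @Override
                    public void onUpdate(Pod oldPod, Pod newPod) { /* "Update Pod" */ }

                    @Override
                    public void onDelete(Pod pod, boolean deletedFinalStateUnknown) { /* "Delete Pod" */ }
                });

        // Reads are served from the informer's local store (the Java analogue of Lister()),
        // not from kube-apiserver; the store reflects the cluster once the informer has synced
        return podInformer.getStore().list();
    }
}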
Key Designs of the Informer
- Local cache: the Informer only ever calls two kinds of Kubernetes APIs, List and Watch. At initialization it first calls List to fetch every Object of a given resource and caches them in memory; it then calls the Watch API to watch that resource and keep the cache up to date; after that the Informer never calls kube-apiserver again. The Informer abstracts this cache into a component that implements the store interface, and all subsequent reads of the resource are served from the local cache.
- Unbounded queue: to reconcile the mismatch between the rates at which data is produced and consumed, the client buffers data in an unbounded queue, DeltaFIFO. Once the reflector receives data it only needs to push it into DeltaFIFO and can immediately go back to watching subsequent events, which keeps blocking to a minimum (steps 2-3 in the figure above).
- Event deduplication: in DeltaFIFO, if events for the same resource are triggered repeatedly, only the last of the identical events is kept for further processing. Uniqueness is guaranteed by the resourceVersion key, so no event is consumed twice.
- Connection reuse: every resource type implements the Informer mechanism so that different resource events can be monitored. To avoid creating multiple Informers for the same resource, each with its own Reflector and its own connection to the apiserver, which would overload kube-apiserver, Kubernetes introduces the sharedInformer abstraction: Informers for the same resource type share a single Reflector. Internally it keeps a map field holding all the Informers, so only one connection is established per resource type, reducing the load on kube-apiserver (see the sketch after this list).
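The connection-reuse point has a direct counterpart in the Fabric8 client as well: a SharedInformerFactory (reachable via informers() on the client in recent Fabric8 versions; treat the exact entry point as an assumption) hands out one shared informer, and therefore one List/Watch connection, per resource type, no matter how many event handlers are registered on it. A minimal sketch under those assumptions:
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
import io.fabric8.kubernetes.client.informers.SharedInformerFactory;

public class SharedInformerDemo {
    static void registerHandlers(KubernetesClient client) {
        SharedInformerFactory factory = client.informers();

        // One informer (and thus one Reflector/connection) for the Pod resource,
        // resynced every 30 seconds
        SharedIndexInformer<Pod> podInformer =
                factory.sharedIndexInformerFor(Pod.class, 30 * 1000L);

        // Multiple handlers share the same underlying List/Watch connection
        podInformer.addEventHandler(new ResourceEventHandler<Pod>() {
            @Override
            public void onAdd(Pod pod) { /* handler A */ }
            @Override
            public void onUpdate(Pod oldPod, Pod newPod) { }
            @Override
            public void onDelete(Pod pod, boolean deletedFinalStateUnknown) { }
        });
        podInformer.addEventHandler(new ResourceEventHandler<Pod>() {
            @Override
            public void onAdd(Pod pod) { /* handler B */ }
            @Override
            public void onUpdate(Pod oldPod, Pod newPod) { }
            @Override
            public void onDelete(Pod pod, boolean deletedFinalStateUnknown) { }
        });

        // Start every informer registered with the factory
        factory.startAllRegisteredInformers();
    }
}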
Dubbo Service Discovery After Introducing the Informer Mechanism
Switching Resource Watching to the Informer API
Taking Endpoints and Pods as examples, the original Watch calls are replaced with Informers. The callbacks are onAdd, onUpdate, and onDelete, and the arguments they receive are the full resource values from the informer store.
/**
 * Watch Endpoints
 */
private void watchEndpoints(ServiceInstancesChangedListener listener, String serviceName) {
    SharedIndexInformer<Endpoints> endInformer = kubernetesClient
            .endpoints()
            .inNamespace(namespace)
            .withName(serviceName)
            .inform(new ResourceEventHandler<Endpoints>() {
                @Override
                public void onAdd(Endpoints endpoints) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Received Endpoint Event. Event type: added. Current pod name: " + currentHostname + ". Endpoints is: " + endpoints);
                    }
                    notifyServiceChanged(serviceName, listener, toServiceInstance(endpoints, serviceName));
                }

                @Override
                public void onUpdate(Endpoints oldEndpoints, Endpoints newEndpoints) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Received Endpoint Event. Event type: updated. Current pod name: " + currentHostname + ". The new Endpoints is: " + newEndpoints);
                    }
                    notifyServiceChanged(serviceName, listener, toServiceInstance(newEndpoints, serviceName));
                }

                @Override
                public void onDelete(Endpoints endpoints, boolean deletedFinalStateUnknown) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Received Endpoint Event. Event type: deleted. Current pod name: " + currentHostname + ". Endpoints is: " + endpoints);
                    }
                    notifyServiceChanged(serviceName, listener, toServiceInstance(endpoints, serviceName));
                }
            });

    // Store endInformer in ENDPOINTS_INFORMER so informers can be closed centrally on graceful shutdown
    ENDPOINTS_INFORMER.put(serviceName, endInformer);
}
/**
 * Watch Pods
 */
private void watchPods(ServiceInstancesChangedListener listener, String serviceName) {
    Map<String, String> serviceSelector = getServiceSelector(serviceName);
    if (serviceSelector == null) {
        return;
    }
    SharedIndexInformer<Pod> podInformer = kubernetesClient
            .pods()
            .inNamespace(namespace)
            .withLabels(serviceSelector)
            .inform(new ResourceEventHandler<Pod>() {
                @Override
                public void onAdd(Pod pod) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Received Pods Event. Event type: added. Current pod name: " + currentHostname + ". Pod is: " + pod);
                    }
                    // Not handled
                }

                @Override
                public void onUpdate(Pod oldPod, Pod newPod) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Received Pods Event. Event type: updated. Current pod name: " + currentHostname + ". new Pod is: " + newPod);
                    }
                    // TODO watch for changes to Pod labels later
                    notifyServiceChanged(serviceName, listener, getInstances(serviceName));
                }

                @Override
                public void onDelete(Pod pod, boolean deletedFinalStateUnknown) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Received Pods Event. Event type: deleted. Current pod name: " + currentHostname + ". Pod is: " + pod);
                    }
                    // Not handled
                }
            });

    // Store podInformer in PODS_INFORMER so informers can be closed centrally on graceful shutdown
    PODS_INFORMER.put(serviceName, podInformer);
}
/**
 * Notify subscribers that the Service has changed
 */
private void notifyServiceChanged(String serviceName, ServiceInstancesChangedListener listener, List<ServiceInstance> serviceInstanceList) {
    long receivedTime = System.nanoTime();
    ServiceInstancesChangedEvent event;
    event = new ServiceInstancesChangedEvent(serviceName, serviceInstanceList);
    AtomicLong updateTime = SERVICE_UPDATE_TIME.get(serviceName);
    long lastUpdateTime = updateTime.get();
    if (lastUpdateTime <= receivedTime) {
        if (updateTime.compareAndSet(lastUpdateTime, receivedTime)) {
            // Publish the event
            listener.onEvent(event);
            return;
        }
    }
    if (logger.isInfoEnabled()) {
        logger.info("Discard Service Instance Data. " +
                "Possible Cause: Newer message has been processed or Failed to update time record by CAS. " +
                "Current Data received time: " + receivedTime + ". " +
                "Newer Data received time: " + lastUpdateTime + ".");
    }
}
getInstances() Optimization
With the informer in place there is no need to call the list API directly; the data is read straight from the informer's store, reducing direct calls to kube-apiserver.
public List<ServiceInstance> getInstances(String serviceName) throws NullPointerException {
    Endpoints endpoints = null;
    SharedIndexInformer<Endpoints> endInformer = ENDPOINTS_INFORMER.get(serviceName);
    if (endInformer != null) {
        // Fetch the Endpoints directly from the informer's store
        List<Endpoints> endpointsList = endInformer.getStore().list();
        if (endpointsList.size() > 0) {
            endpoints = endpointsList.get(0);
        }
    }
    // If endpoints is still null after the above, something is wrong; fall back to pulling from kube-apiserver
    if (endpoints == null) {
        endpoints = kubernetesClient
                .endpoints()
                .inNamespace(namespace)
                .withName(serviceName)
                .get();
    }
    return toServiceInstance(endpoints, serviceName);
}
Conclusion
With the switch to Informers, Dubbo's service discovery no longer calls kube-apiserver directly on every change, which reduces the load on kube-apiserver and greatly shortens response times, helping Dubbo migrate from traditional architectures to Kubernetes.