Dubbo3 基于 Kubernetes Informer 的服务发现原理解析

简介: ## List/Watch 机制介绍List/Watch机制是Kubernetes中实现集群控制模块最核心的设计之一,它采用统一的异步消息处理机制,保证了消息的实时性、可靠性、顺序性和性能等,为声明式风格的API奠定了良好的基础。`list`是调用`list API`获取资源列表,基于`HTTP`短链接实现。`watch`则是调用`watch API`监听资源变更事件,基于`HTTP

List/Watch 机制介绍

List/Watch机制是Kubernetes中实现集群控制模块最核心的设计之一,它采用统一的异步消息处理机制,保证了消息的实时性、可靠性、顺序性和性能等,为声明式风格的API奠定了良好的基础。

list是调用list API获取资源列表,基于HTTP短链接实现。

watch则是调用watch API监听资源变更事件,基于HTTP 长链接,通过Chunked transfer encoding(分块传输编码)来实现消息通知。

当客户端调用watch API时,kube-apiserverresponseHTTP Header中设置Transfer-Encoding的值为chunked,表示采用分块传输编码,客户端收到该信息后,便和服务端连接,并等待下一个数据块,即资源的事件信息。例如:

$ curl -i http://{kube-api-server-ip}:8080/api/v1/watch/endpoints?watch=yes
HTTP/1.1 200 OK
Content-Type: application/json
Transfer-Encoding: chunked
Date: Thu, 14 Seo 2022 20:22:59 GMT
Transfer-Encoding: chunked

{"type":"ADDED", "object":{"kind":"Endpoints","apiVersion":"v1",...}}
{"type":"ADDED", "object":{"kind":"Endpoints","apiVersion":"v1",...}}
{"type":"MODIFIED", "object":{"kind":"Endpoints","apiVersion":"v1",...}}

Dubbo 基于 Watch 的服务发现

在Dubbo3.1版本之前,dubbo-kubernetes服务发现是通过Fabric8 Kubernetes Java Client提供的watch API监听kube-apiserver中资源的create、update和delete事件,如下:

private void watchEndpoints(ServiceInstancesChangedListener listener, String serviceName) {
    Watch watch = kubernetesClient
        .endpoints()
        .inNamespace(namespace)
        .withName(serviceName)
        .watch(new Watcher<Endpoints>() {
            @Override
            public void eventReceived(Action action, Endpoints resource) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Received Endpoint Event. Event type: " + action.name() +
                        ". Current pod name: " + currentHostname);
                }
                notifyServiceChanged(serviceName, listener);
            }
            @Override
            public void onClose(WatcherException cause) {
                // ignore
            }
        });
    ENDPOINTS_WATCHER.put(serviceName, watch);
}

private void notifyServiceChanged(String serviceName, ServiceInstancesChangedListener listener) {
    long receivedTime = System.nanoTime();
    ServiceInstancesChangedEvent event;
    event = new ServiceInstancesChangedEvent(serviceName, getInstances(serviceName));
    AtomicLong updateTime = SERVICE_UPDATE_TIME.get(serviceName);
    long lastUpdateTime = updateTime.get();
    if (lastUpdateTime <= receivedTime) {
        if (updateTime.compareAndSet(lastUpdateTime, receivedTime)) {
            listener.onEvent(event);
            return;
        }
    }
    if (logger.isInfoEnabled()) {
        logger.info("Discard Service Instance Data. " +
            "Possible Cause: Newer message has been processed or Failed to update time record by CAS. " +
            "Current Data received time: " + receivedTime + ". " +
            "Newer Data received time: " + lastUpdateTime + ".");
    }
}

@Override
public List<ServiceInstance> getInstances(String serviceName) throws NullPointerException {
    // 直接调用kube-apiserver
    Endpoints endpoints =
        kubernetesClient
            .endpoints()
            .inNamespace(namespace)
            .withName(serviceName)
            .get();
    return toServiceInstance(endpoints, serviceName);
}

监听到资源变化后,调用notifyServiceChanged方法从kube-apiserver全量拉取资源list数据,保持dubbo本地侧服务列表的实时性,并做相应的事件发布。

这样的操作存在很严重的问题,由于watch对应的回调函数会将更新的资源返回,dubbo社区考虑到维护成本较高,之前并没有在本地维护关于CRD资源的缓存,这样每次监听到变化后调用listkube-apiserver获取对应serviceName的endpoints信息,无疑增加了一次对kube-apiserver的直接访问。

clint-go为解决客户端需要自行维护缓存的问题,推出了informer机制。

Informer 机制介绍

Informer模块是Kubernetes中的基础组件,以List/Watch为基础,负责各组件与kube-apiserver的资源与事件同步。Kubernetes中的组件,如果要访问Kubernetes中的Object,绝大部分情况下会使用Informer中的Lister()方法,而非直接调用kube-apiserver。

以Pod资源为例,介绍下informer的关键逻辑(与下图步骤一一对应):

  1. Informer 在初始化时,Reflector 会先调用 List 获得所有的 Pod,同时调用Watch长连接监听kube-apiserver。
  2. Reflector 拿到全部 Pod 后,将Add Pod这个事件发送到 DeltaFIFO。
  3. DeltaFIFO随后pop这个事件到Informer处理。
  4. Informer向Indexer发布Add Pod事件。
  5. Indexer接到通知后,直接操作Store中的数据(key->value格式)。
  6. Informer触发EventHandler回调。
  7. 将key推到Workqueue队列中。
  8. 从WorkQueue中pop一个key。
  9. 然后根据key去Indexer取到val。根据当前的EventHandler进行Add Pod操作(用户自定义的回调函数)。
  10. 随后当Watch到kube-apiserver资源有改变的时候,再重复2-9步骤。


(来源于kubernetes/sample-controller)

Informer 关键设计

  • 本地缓存:Informer只会调用k8s List和Watch两种类型的API。Informer在初始化的时,先调用List获得某种resource的全部Object,缓存在内存中; 然后,调用Watch API去watch这种resource,去维护这份缓存; 最后,Informer就不再调用kube-apiserver。Informer抽象了cache这个组件,并且实现了store接口,后续获取资源直接通过本地的缓存来进行获取。
  • 无界队列:为了协调数据生产与消费的不一致状态,在客户端中通过实现了一个无界队列DeltaFIFO来进行数据的缓冲,当reflector获取到数据之后,只需要将数据推到到DeltaFIFO中,则就可以继续watch后续事件,从而减少阻塞时间,如上图2-3步骤所示。
  • 事件去重:在DeltaFIFO中,如果针对某个资源的事件重复被触发,则就只会保留相同事件最后一个事件作为后续处理,有resourceVersion唯一键保证,不会重复消费。
  • 复用连接:每一种资源都实现了Informer机制,允许监控不同的资源事件。为了避免同一个资源建立多个Informer,每个Informer使用一个Reflector与apiserver建立链接,导致kube-apiserver负载过高的情况,k8s中抽象了sharedInformer的概念,即共享的informer, 可以使同一类资源Informer共享一个Reflector。内部定义了一个map字段,用于存放所有Infromer的字段。针对同一资源只建立一个连接,减小kube-apiserver的负载。

Dubbo 引入 informer 机制后的服务发现

资源监听更换 Informer API

以Endpoints和Pods为例,将原本的Watch替换为Informer,回调函数分别为onAdd、onUpdate、onDelete,回调参数传的都是informer store中的资源全量值。

/**
 * 监听Endpoints
 */
private void watchEndpoints(ServiceInstancesChangedListener listener, String serviceName) {
    SharedIndexInformer<Endpoints> endInformer = kubernetesClient
            .endpoints()
            .inNamespace(namespace)
            .withName(serviceName)
            .inform(new ResourceEventHandler<Endpoints>() {
                @Override
                public void onAdd(Endpoints endpoints) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Received Endpoint Event. Event type: added. Current pod name: " + currentHostname + ". Endpoints is: " + endpoints);
                    }
                    notifyServiceChanged(serviceName, listener, toServiceInstance(endpoints, serviceName));
                }

                @Override
                public void onUpdate(Endpoints oldEndpoints, Endpoints newEndpoints) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Received Endpoint Event. Event type: updated. Current pod name: " + currentHostname + ". The new Endpoints is: " + newEndpoints);
                    }
                    notifyServiceChanged(serviceName, listener, toServiceInstance(newEndpoints, serviceName));
                }

                @Override
                public void onDelete(Endpoints endpoints, boolean deletedFinalStateUnknown) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Received Endpoint Event. Event type: deleted. Current pod name: " + currentHostname + ". Endpoints is: " + endpoints);
                    }
                    notifyServiceChanged(serviceName, listener, toServiceInstance(endpoints, serviceName));
                }
            });
    // 将endInformer存入ENDPOINTS_INFORMER,便于优雅下线统一管理
    ENDPOINTS_INFORMER.put(serviceName, endInformer);
}
/**
 * 监听Pods
 */
private void watchPods(ServiceInstancesChangedListener listener, String serviceName) {
    Map<String, String> serviceSelector = getServiceSelector(serviceName);
    if (serviceSelector == null) {
        return;
    }
    SharedIndexInformer<Pod> podInformer = kubernetesClient
            .pods()
            .inNamespace(namespace)
            .withLabels(serviceSelector)
            .inform(new ResourceEventHandler<Pod>() {
                @Override
                public void onAdd(Pod pod) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Received Pods Event. Event type: added. Current pod name: " + currentHostname + ". Pod is: " + pod);
                    }
                    // 不处理
                }
                @Override
                public void onUpdate(Pod oldPod, Pod newPod) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Received Pods Event. Event type: updated. Current pod name: " + currentHostname + ". new Pod is: " + newPod);
                    }
                    // TODO 后续要监听Pod label的变化
                    notifyServiceChanged(serviceName, listener, getInstances(serviceName));
                }

                @Override
                public void onDelete(Pod pod, boolean deletedFinalStateUnknown) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Received Pods Event. Event type: deleted. Current pod name: " + currentHostname + ". Pod is: " + pod);
                    }
                    // 不处理
                }
            });
    // 将podInformer存入PODS_INFORMER,便于优雅下线统一管理
    PODS_INFORMER.put(serviceName, podInformer);
}

/**
 * 通知订阅者Service改变
 */
private void notifyServiceChanged(String serviceName, ServiceInstancesChangedListener listener, List<ServiceInstance> serviceInstanceList) {
    long receivedTime = System.nanoTime();
    ServiceInstancesChangedEvent event;
    event = new ServiceInstancesChangedEvent(serviceName, serviceInstanceList);
    AtomicLong updateTime = SERVICE_UPDATE_TIME.get(serviceName);
    long lastUpdateTime = updateTime.get();
    if (lastUpdateTime <= receivedTime) {
        if (updateTime.compareAndSet(lastUpdateTime, receivedTime)) {
            // 发布事件
            listener.onEvent(event);
            return;
        }
    }
    if (logger.isInfoEnabled()) {
        logger.info("Discard Service Instance Data. " +
                "Possible Cause: Newer message has been processed or Failed to update time record by CAS. " +
                "Current Data received time: " + receivedTime + ". " +
                "Newer Data received time: " + lastUpdateTime + ".");
    }
}

getInstances() 优化

引入informer后,无需直接调用list接口,而是直接从informer的store中获取,减少对kube-apiserver的直接调用。

public List<ServiceInstance> getInstances(String serviceName) throws NullPointerException {
    Endpoints endpoints = null;
    SharedIndexInformer<Endpoints> endInformer = ENDPOINTS_INFORMER.get(serviceName);
    if (endInformer != null) {
        // 直接从informer的store中获取Endpoints信息
        List<Endpoints> endpointsList = endInformer.getStore().list();
        if (endpointsList.size() > 0) {
            endpoints = endpointsList.get(0);
        }
    }
    // 如果endpoints经过上面处理仍为空,属于异常情况,那就从kube-apiserver拉取
    if (endpoints == null) {
        endpoints = kubernetesClient
                .endpoints()
                .inNamespace(namespace)
                .withName(serviceName)
                .get();
    }

    return toServiceInstance(endpoints, serviceName);
}

结论

优化为Informer后,Dubbo的服务发现不用每次直接调用kube-apiserver,减小了kube-apiserver的压力,也大大减少了响应时间,助力Dubbo从传统架构迁移到Kubernetes中。

相关实践学习
深入解析Docker容器化技术
Docker是一个开源的应用容器引擎,让开发者可以打包他们的应用以及依赖包到一个可移植的容器中,然后发布到任何流行的Linux机器上,也可以实现虚拟化,容器是完全使用沙箱机制,相互之间不会有任何接口。Docker是世界领先的软件容器平台。开发人员利用Docker可以消除协作编码时“在我的机器上可正常工作”的问题。运维人员利用Docker可以在隔离容器中并行运行和管理应用,获得更好的计算密度。企业利用Docker可以构建敏捷的软件交付管道,以更快的速度、更高的安全性和可靠的信誉为Linux和Windows Server应用发布新功能。 在本套课程中,我们将全面的讲解Docker技术栈,从环境安装到容器、镜像操作以及生产环境如何部署开发的微服务应用。本课程由黑马程序员提供。 &nbsp; &nbsp; 相关的阿里云产品:容器服务 ACK 容器服务 Kubernetes 版(简称 ACK)提供高性能可伸缩的容器应用管理能力,支持企业级容器化应用的全生命周期管理。整合阿里云虚拟化、存储、网络和安全能力,打造云端最佳容器化应用运行环境。 了解产品详情: https://www.aliyun.com/product/kubernetes
目录
相关文章
|
9月前
|
安全 算法 网络协议
解析:HTTPS通过SSL/TLS证书加密的原理与逻辑
HTTPS通过SSL/TLS证书加密,结合对称与非对称加密及数字证书验证实现安全通信。首先,服务器发送含公钥的数字证书,客户端验证其合法性后生成随机数并用公钥加密发送给服务器,双方据此生成相同的对称密钥。后续通信使用对称加密确保高效性和安全性。同时,数字证书验证服务器身份,防止中间人攻击;哈希算法和数字签名确保数据完整性,防止篡改。整个流程保障了身份认证、数据加密和完整性保护。
|
8月前
|
机器学习/深度学习 数据可视化 PyTorch
深入解析图神经网络注意力机制:数学原理与可视化实现
本文深入解析了图神经网络(GNNs)中自注意力机制的内部运作原理,通过可视化和数学推导揭示其工作机制。文章采用“位置-转移图”概念框架,并使用NumPy实现代码示例,逐步拆解自注意力层的计算过程。文中详细展示了从节点特征矩阵、邻接矩阵到生成注意力权重的具体步骤,并通过四个类(GAL1至GAL4)模拟了整个计算流程。最终,结合实际PyTorch Geometric库中的代码,对比分析了核心逻辑,为理解GNN自注意力机制提供了清晰的学习路径。
610 7
深入解析图神经网络注意力机制:数学原理与可视化实现
|
8月前
|
机器学习/深度学习 缓存 自然语言处理
深入解析Tiktokenizer:大语言模型中核心分词技术的原理与架构
Tiktokenizer 是一款现代分词工具,旨在高效、智能地将文本转换为机器可处理的离散单元(token)。它不仅超越了传统的空格分割和正则表达式匹配方法,还结合了上下文感知能力,适应复杂语言结构。Tiktokenizer 的核心特性包括自适应 token 分割、高效编码能力和出色的可扩展性,使其适用于从聊天机器人到大规模文本分析等多种应用场景。通过模块化设计,Tiktokenizer 确保了代码的可重用性和维护性,并在分词精度、处理效率和灵活性方面表现出色。此外,它支持多语言处理、表情符号识别和领域特定文本处理,能够应对各种复杂的文本输入需求。
1062 6
深入解析Tiktokenizer:大语言模型中核心分词技术的原理与架构
|
8月前
|
传感器 人工智能 监控
反向寻车系统怎么做?基本原理与系统组成解析
本文通过反向寻车系统的核心组成部分与技术分析,阐述反向寻车系统的工作原理,适用于适用于商场停车场、医院停车场及火车站停车场等。如需获取智慧停车场反向寻车技术方案前往文章最下方获取,如有项目合作及技术交流欢迎私信作者。
612 2
|
9月前
|
Java 数据库 开发者
详细介绍SpringBoot启动流程及配置类解析原理
通过对 Spring Boot 启动流程及配置类解析原理的深入分析,我们可以看到 Spring Boot 在启动时的灵活性和可扩展性。理解这些机制不仅有助于开发者更好地使用 Spring Boot 进行应用开发,还能够在面对问题时,迅速定位和解决问题。希望本文能为您在 Spring Boot 开发过程中提供有效的指导和帮助。
1135 12
|
9月前
|
开发框架 监控 JavaScript
解锁鸿蒙装饰器:应用、原理与优势全解析
ArkTS提供了多维度的状态管理机制。在UI开发框架中,与UI相关联的数据可以在组件内使用,也可以在不同组件层级间传递,比如父子组件之间、爷孙组件之间,还可以在应用全局范围内传递或跨设备传递。
265 2
|
8月前
|
负载均衡 JavaScript 前端开发
分片上传技术全解析:原理、优势与应用(含简单实现源码)
分片上传通过将大文件分割成多个小的片段或块,然后并行或顺序地上传这些片段,从而提高上传效率和可靠性,特别适用于大文件的上传场景,尤其是在网络环境不佳时,分片上传能有效提高上传体验。 博客不应该只有代码和解决方案,重点应该在于给出解决方案的同时分享思维模式,只有思维才能可持续地解决问题,只有思维才是真正值得学习和分享的核心要素。如果这篇博客能给您带来一点帮助,麻烦您点个赞支持一下,还可以收藏起来以备不时之需,有疑问和错误欢迎在评论区指出~
|
1月前
|
人工智能 算法 调度
阿里云ACK托管集群Pro版共享GPU调度操作指南
本文介绍在阿里云ACK托管集群Pro版中,如何通过共享GPU调度实现显存与算力的精细化分配,涵盖前提条件、使用限制、节点池配置及任务部署全流程,提升GPU资源利用率,适用于AI训练与推理场景。
213 1
|
1月前
|
弹性计算 监控 调度
ACK One 注册集群云端节点池升级:IDC 集群一键接入云端 GPU 算力,接入效率提升 80%
ACK One注册集群节点池实现“一键接入”,免去手动编写脚本与GPU驱动安装,支持自动扩缩容与多场景调度,大幅提升K8s集群管理效率。
222 89
|
6月前
|
资源调度 Kubernetes 调度
从单集群到多集群的快速无损转型:ACK One 多集群应用分发
ACK One 的多集群应用分发,可以最小成本地结合您已有的单集群 CD 系统,无需对原先应用资源 YAML 进行修改,即可快速构建成多集群的 CD 系统,并同时获得强大的多集群资源调度和分发的能力。
270 9

推荐镜像

更多
  • DNS