Dubbo3 基于 Kubernetes Informer 的服务发现原理解析

本文涉及的产品
容器服务 Serverless 版 ACK Serverless,317元额度 多规格
云解析 DNS,旗舰版 1个月
全局流量管理 GTM,标准版 1个月
简介: ## List/Watch 机制介绍List/Watch机制是Kubernetes中实现集群控制模块最核心的设计之一,它采用统一的异步消息处理机制,保证了消息的实时性、可靠性、顺序性和性能等,为声明式风格的API奠定了良好的基础。`list`是调用`list API`获取资源列表,基于`HTTP`短链接实现。`watch`则是调用`watch API`监听资源变更事件,基于`HTTP

List/Watch 机制介绍

List/Watch机制是Kubernetes中实现集群控制模块最核心的设计之一,它采用统一的异步消息处理机制,保证了消息的实时性、可靠性、顺序性和性能等,为声明式风格的API奠定了良好的基础。

list是调用list API获取资源列表,基于HTTP短链接实现。

watch则是调用watch API监听资源变更事件,基于HTTP 长链接,通过Chunked transfer encoding(分块传输编码)来实现消息通知。

当客户端调用watch API时,kube-apiserverresponseHTTP Header中设置Transfer-Encoding的值为chunked,表示采用分块传输编码,客户端收到该信息后,便和服务端连接,并等待下一个数据块,即资源的事件信息。例如:

$ curl -i http://{kube-api-server-ip}:8080/api/v1/watch/endpoints?watch=yes
HTTP/1.1 200 OK
Content-Type: application/json
Transfer-Encoding: chunked
Date: Thu, 14 Seo 2022 20:22:59 GMT
Transfer-Encoding: chunked

{"type":"ADDED", "object":{"kind":"Endpoints","apiVersion":"v1",...}}
{"type":"ADDED", "object":{"kind":"Endpoints","apiVersion":"v1",...}}
{"type":"MODIFIED", "object":{"kind":"Endpoints","apiVersion":"v1",...}}

Dubbo 基于 Watch 的服务发现

在Dubbo3.1版本之前,dubbo-kubernetes服务发现是通过Fabric8 Kubernetes Java Client提供的watch API监听kube-apiserver中资源的create、update和delete事件,如下:

private void watchEndpoints(ServiceInstancesChangedListener listener, String serviceName) {
    Watch watch = kubernetesClient
        .endpoints()
        .inNamespace(namespace)
        .withName(serviceName)
        .watch(new Watcher<Endpoints>() {
            @Override
            public void eventReceived(Action action, Endpoints resource) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Received Endpoint Event. Event type: " + action.name() +
                        ". Current pod name: " + currentHostname);
                }
                notifyServiceChanged(serviceName, listener);
            }
            @Override
            public void onClose(WatcherException cause) {
                // ignore
            }
        });
    ENDPOINTS_WATCHER.put(serviceName, watch);
}

private void notifyServiceChanged(String serviceName, ServiceInstancesChangedListener listener) {
    long receivedTime = System.nanoTime();
    ServiceInstancesChangedEvent event;
    event = new ServiceInstancesChangedEvent(serviceName, getInstances(serviceName));
    AtomicLong updateTime = SERVICE_UPDATE_TIME.get(serviceName);
    long lastUpdateTime = updateTime.get();
    if (lastUpdateTime <= receivedTime) {
        if (updateTime.compareAndSet(lastUpdateTime, receivedTime)) {
            listener.onEvent(event);
            return;
        }
    }
    if (logger.isInfoEnabled()) {
        logger.info("Discard Service Instance Data. " +
            "Possible Cause: Newer message has been processed or Failed to update time record by CAS. " +
            "Current Data received time: " + receivedTime + ". " +
            "Newer Data received time: " + lastUpdateTime + ".");
    }
}

@Override
public List<ServiceInstance> getInstances(String serviceName) throws NullPointerException {
    // 直接调用kube-apiserver
    Endpoints endpoints =
        kubernetesClient
            .endpoints()
            .inNamespace(namespace)
            .withName(serviceName)
            .get();
    return toServiceInstance(endpoints, serviceName);
}

监听到资源变化后,调用notifyServiceChanged方法从kube-apiserver全量拉取资源list数据,保持dubbo本地侧服务列表的实时性,并做相应的事件发布。

这样的操作存在很严重的问题,由于watch对应的回调函数会将更新的资源返回,dubbo社区考虑到维护成本较高,之前并没有在本地维护关于CRD资源的缓存,这样每次监听到变化后调用listkube-apiserver获取对应serviceName的endpoints信息,无疑增加了一次对kube-apiserver的直接访问。

clint-go为解决客户端需要自行维护缓存的问题,推出了informer机制。

Informer 机制介绍

Informer模块是Kubernetes中的基础组件,以List/Watch为基础,负责各组件与kube-apiserver的资源与事件同步。Kubernetes中的组件,如果要访问Kubernetes中的Object,绝大部分情况下会使用Informer中的Lister()方法,而非直接调用kube-apiserver。

以Pod资源为例,介绍下informer的关键逻辑(与下图步骤一一对应):

  1. Informer 在初始化时,Reflector 会先调用 List 获得所有的 Pod,同时调用Watch长连接监听kube-apiserver。
  2. Reflector 拿到全部 Pod 后,将Add Pod这个事件发送到 DeltaFIFO。
  3. DeltaFIFO随后pop这个事件到Informer处理。
  4. Informer向Indexer发布Add Pod事件。
  5. Indexer接到通知后,直接操作Store中的数据(key->value格式)。
  6. Informer触发EventHandler回调。
  7. 将key推到Workqueue队列中。
  8. 从WorkQueue中pop一个key。
  9. 然后根据key去Indexer取到val。根据当前的EventHandler进行Add Pod操作(用户自定义的回调函数)。
  10. 随后当Watch到kube-apiserver资源有改变的时候,再重复2-9步骤。


(来源于kubernetes/sample-controller)

Informer 关键设计

  • 本地缓存:Informer只会调用k8s List和Watch两种类型的API。Informer在初始化的时,先调用List获得某种resource的全部Object,缓存在内存中; 然后,调用Watch API去watch这种resource,去维护这份缓存; 最后,Informer就不再调用kube-apiserver。Informer抽象了cache这个组件,并且实现了store接口,后续获取资源直接通过本地的缓存来进行获取。
  • 无界队列:为了协调数据生产与消费的不一致状态,在客户端中通过实现了一个无界队列DeltaFIFO来进行数据的缓冲,当reflector获取到数据之后,只需要将数据推到到DeltaFIFO中,则就可以继续watch后续事件,从而减少阻塞时间,如上图2-3步骤所示。
  • 事件去重:在DeltaFIFO中,如果针对某个资源的事件重复被触发,则就只会保留相同事件最后一个事件作为后续处理,有resourceVersion唯一键保证,不会重复消费。
  • 复用连接:每一种资源都实现了Informer机制,允许监控不同的资源事件。为了避免同一个资源建立多个Informer,每个Informer使用一个Reflector与apiserver建立链接,导致kube-apiserver负载过高的情况,k8s中抽象了sharedInformer的概念,即共享的informer, 可以使同一类资源Informer共享一个Reflector。内部定义了一个map字段,用于存放所有Infromer的字段。针对同一资源只建立一个连接,减小kube-apiserver的负载。

Dubbo 引入 informer 机制后的服务发现

资源监听更换 Informer API

以Endpoints和Pods为例,将原本的Watch替换为Informer,回调函数分别为onAdd、onUpdate、onDelete,回调参数传的都是informer store中的资源全量值。

/**
 * 监听Endpoints
 */
private void watchEndpoints(ServiceInstancesChangedListener listener, String serviceName) {
    SharedIndexInformer<Endpoints> endInformer = kubernetesClient
            .endpoints()
            .inNamespace(namespace)
            .withName(serviceName)
            .inform(new ResourceEventHandler<Endpoints>() {
                @Override
                public void onAdd(Endpoints endpoints) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Received Endpoint Event. Event type: added. Current pod name: " + currentHostname + ". Endpoints is: " + endpoints);
                    }
                    notifyServiceChanged(serviceName, listener, toServiceInstance(endpoints, serviceName));
                }

                @Override
                public void onUpdate(Endpoints oldEndpoints, Endpoints newEndpoints) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Received Endpoint Event. Event type: updated. Current pod name: " + currentHostname + ". The new Endpoints is: " + newEndpoints);
                    }
                    notifyServiceChanged(serviceName, listener, toServiceInstance(newEndpoints, serviceName));
                }

                @Override
                public void onDelete(Endpoints endpoints, boolean deletedFinalStateUnknown) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Received Endpoint Event. Event type: deleted. Current pod name: " + currentHostname + ". Endpoints is: " + endpoints);
                    }
                    notifyServiceChanged(serviceName, listener, toServiceInstance(endpoints, serviceName));
                }
            });
    // 将endInformer存入ENDPOINTS_INFORMER,便于优雅下线统一管理
    ENDPOINTS_INFORMER.put(serviceName, endInformer);
}
/**
 * 监听Pods
 */
private void watchPods(ServiceInstancesChangedListener listener, String serviceName) {
    Map<String, String> serviceSelector = getServiceSelector(serviceName);
    if (serviceSelector == null) {
        return;
    }
    SharedIndexInformer<Pod> podInformer = kubernetesClient
            .pods()
            .inNamespace(namespace)
            .withLabels(serviceSelector)
            .inform(new ResourceEventHandler<Pod>() {
                @Override
                public void onAdd(Pod pod) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Received Pods Event. Event type: added. Current pod name: " + currentHostname + ". Pod is: " + pod);
                    }
                    // 不处理
                }
                @Override
                public void onUpdate(Pod oldPod, Pod newPod) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Received Pods Event. Event type: updated. Current pod name: " + currentHostname + ". new Pod is: " + newPod);
                    }
                    // TODO 后续要监听Pod label的变化
                    notifyServiceChanged(serviceName, listener, getInstances(serviceName));
                }

                @Override
                public void onDelete(Pod pod, boolean deletedFinalStateUnknown) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Received Pods Event. Event type: deleted. Current pod name: " + currentHostname + ". Pod is: " + pod);
                    }
                    // 不处理
                }
            });
    // 将podInformer存入PODS_INFORMER,便于优雅下线统一管理
    PODS_INFORMER.put(serviceName, podInformer);
}

/**
 * 通知订阅者Service改变
 */
private void notifyServiceChanged(String serviceName, ServiceInstancesChangedListener listener, List<ServiceInstance> serviceInstanceList) {
    long receivedTime = System.nanoTime();
    ServiceInstancesChangedEvent event;
    event = new ServiceInstancesChangedEvent(serviceName, serviceInstanceList);
    AtomicLong updateTime = SERVICE_UPDATE_TIME.get(serviceName);
    long lastUpdateTime = updateTime.get();
    if (lastUpdateTime <= receivedTime) {
        if (updateTime.compareAndSet(lastUpdateTime, receivedTime)) {
            // 发布事件
            listener.onEvent(event);
            return;
        }
    }
    if (logger.isInfoEnabled()) {
        logger.info("Discard Service Instance Data. " +
                "Possible Cause: Newer message has been processed or Failed to update time record by CAS. " +
                "Current Data received time: " + receivedTime + ". " +
                "Newer Data received time: " + lastUpdateTime + ".");
    }
}

getInstances() 优化

引入informer后,无需直接调用list接口,而是直接从informer的store中获取,减少对kube-apiserver的直接调用。

public List<ServiceInstance> getInstances(String serviceName) throws NullPointerException {
    Endpoints endpoints = null;
    SharedIndexInformer<Endpoints> endInformer = ENDPOINTS_INFORMER.get(serviceName);
    if (endInformer != null) {
        // 直接从informer的store中获取Endpoints信息
        List<Endpoints> endpointsList = endInformer.getStore().list();
        if (endpointsList.size() > 0) {
            endpoints = endpointsList.get(0);
        }
    }
    // 如果endpoints经过上面处理仍为空,属于异常情况,那就从kube-apiserver拉取
    if (endpoints == null) {
        endpoints = kubernetesClient
                .endpoints()
                .inNamespace(namespace)
                .withName(serviceName)
                .get();
    }

    return toServiceInstance(endpoints, serviceName);
}

结论

优化为Informer后,Dubbo的服务发现不用每次直接调用kube-apiserver,减小了kube-apiserver的压力,也大大减少了响应时间,助力Dubbo从传统架构迁移到Kubernetes中。

相关实践学习
通过Ingress进行灰度发布
本场景您将运行一个简单的应用,部署一个新的应用用于新的发布,并通过Ingress能力实现灰度发布。
容器应用与集群管理
欢迎来到《容器应用与集群管理》课程,本课程是“云原生容器Clouder认证“系列中的第二阶段。课程将向您介绍与容器集群相关的概念和技术,这些概念和技术可以帮助您了解阿里云容器服务ACK/ACK Serverless的使用。同时,本课程也会向您介绍可以采取的工具、方法和可操作步骤,以帮助您了解如何基于容器服务ACK Serverless构建和管理企业级应用。 学习完本课程后,您将能够: 掌握容器集群、容器编排的基本概念 掌握Kubernetes的基础概念及核心思想 掌握阿里云容器服务ACK/ACK Serverless概念及使用方法 基于容器服务ACK Serverless搭建和管理企业级网站应用
目录
相关文章
|
1月前
|
存储 算法 Java
解析HashSet的工作原理,揭示Set如何利用哈希算法和equals()方法确保元素唯一性,并通过示例代码展示了其“无重复”特性的具体应用
在Java中,Set接口以其独特的“无重复”特性脱颖而出。本文通过解析HashSet的工作原理,揭示Set如何利用哈希算法和equals()方法确保元素唯一性,并通过示例代码展示了其“无重复”特性的具体应用。
41 3
|
13天前
|
运维 Kubernetes Cloud Native
Kubernetes云原生架构深度解析与实践指南####
本文深入探讨了Kubernetes作为领先的云原生应用编排平台,其设计理念、核心组件及高级特性。通过剖析Kubernetes的工作原理,结合具体案例分析,为读者呈现如何在实际项目中高效部署、管理和扩展容器化应用的策略与技巧。文章还涵盖了服务发现、负载均衡、配置管理、自动化伸缩等关键议题,旨在帮助开发者和运维人员掌握利用Kubernetes构建健壮、可伸缩的云原生生态系统的能力。 ####
|
19天前
|
算法 Java 数据库连接
Java连接池技术,从基础概念出发,解析了连接池的工作原理及其重要性
本文详细介绍了Java连接池技术,从基础概念出发,解析了连接池的工作原理及其重要性。连接池通过复用数据库连接,显著提升了应用的性能和稳定性。文章还展示了使用HikariCP连接池的示例代码,帮助读者更好地理解和应用这一技术。
32 1
|
25天前
|
数据采集 存储 编解码
一份简明的 Base64 原理解析
Base64 编码器的原理,其实很简单,花一点点时间学会它,你就又消除了一个知识盲点。
66 3
|
6天前
|
存储 供应链 物联网
深入解析区块链技术的核心原理与应用前景
深入解析区块链技术的核心原理与应用前景
|
6天前
|
存储 供应链 安全
深度解析区块链技术的核心原理与应用前景
深度解析区块链技术的核心原理与应用前景
15 0
|
10天前
|
存储 Kubernetes 调度
深度解析Kubernetes中的Pod生命周期管理
深度解析Kubernetes中的Pod生命周期管理
|
1月前
|
前端开发 Java 应用服务中间件
21张图解析Tomcat运行原理与架构全貌
【10月更文挑战第2天】本文通过21张图详细解析了Tomcat的运行原理与架构。Tomcat作为Java Web开发中最流行的Web服务器之一,其架构设计精妙。文章首先介绍了Tomcat的基本组件:Connector(连接器)负责网络通信,Container(容器)处理业务逻辑。连接器内部包括EndPoint、Processor和Adapter等组件,分别处理通信、协议解析和请求封装。容器采用多级结构(Engine、Host、Context、Wrapper),并通过Mapper组件进行请求路由。文章还探讨了Tomcat的生命周期管理、启动与停止机制,并通过源码分析展示了请求处理流程。
|
1月前
|
搜索推荐 Shell
解析排序算法:十大排序方法的工作原理与性能比较
解析排序算法:十大排序方法的工作原理与性能比较
51 9
|
1月前
|
开发框架 缓存 前端开发
electron-builder 解析:你了解其背后的构建原理吗?
本文首发于微信公众号“前端徐徐”,详细解析了 electron-builder 的工作原理。electron-builder 是一个专为整合前端项目与 Electron 应用的打包工具,负责管理依赖、生成配置文件及多平台构建。文章介绍了前端项目的构建流程、配置信息收集、依赖处理、asar 打包、附加资源准备、Electron 打包、代码签名、资源压缩、卸载程序生成、安装程序生成及最终安装包输出等环节。通过剖析 electron-builder 的原理,帮助开发者更好地理解和掌握跨端桌面应用的构建流程。
89 2

推荐镜像

更多