开发 k8s 管理平台 - k8sailor 13. 使用 k8s informer 订阅集群事件

简介: 开发 k8s 管理平台 - k8sailor 13. 使用 k8s informer 订阅集群事件

开发 k8s 管理平台 - k8sailor 13. 使用 k8s informer 订阅集群事件

原文地址: https://tangx.in/posts/books/k8sailor/chapter02/13-k8s-informer/
tag: https://github.com/tangx/k8sailor/tree/feat/13-k8s-informer

informer-design.jpeg

从应用层面来说, 创建 informer 并启动之后就与 k8s cluster 创建了一个长链接并订阅了 某个资源 Resource 的变化。

至于订阅后得到的数据要怎么用完全取决于订阅者的业务设计。

Shared Informer Factory 共享机制

Informer 又称为 Shared Informer,表明是可以共享使用的,在使用 client-go 写代码时,若同一资源的 Informer 被实例化太多次,每个 Informer 使用一个 Reflector,会运行过多的相同 ListAndWatch(即图中的第一步),太多重复的序列化和反序列化会导致 k8s API Server 负载过重。

而 Shared Informer 通过对同一类资源 Informer 共享一个 Reflector 可以节约很多资源,这通过 map 数据结构即可实现这样一个共享 Informer 机制。

// Start initializes all requested informers.
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
    f.lock.Lock()
    defer f.lock.Unlock()

    for informerType, informer := range f.informers {
        if !f.startedInformers[informerType] {
            go informer.Run(stopCh)
            f.startedInformers[informerType] = true
        }
    }
}
  1. informer.Run(stopCh) : informer 在启动的时候需要传入一个 通知停止的 channel stopCh <-chan struct{}。 因此用户是可以 主动 关闭 informer 通道的。
  2. 所有 informer 都是通过 go 协程跑在后台的。

Shared Informer Factory 注册 infomers

上面提到, Shared Informer Factory 一个很重要的作用就是

  1. 管理注册 Informer
  2. 防止相同类型的 Infomer 重复注册
// WithEventHandlers 注册 handler
func (inf *Informer) WithEventHandlers(handlers ...InformerEventHandler) *Informer {
    for _, handler := range handlers {
        kind := handler.InformerKind()
        switch kind {
        case "deployment":
            inf.factory.Apps().V1().Deployments().Informer().AddEventHandler(handler)
        case "pod":
            inf.factory.Core().V1().Pods().Informer().AddEventHandler(handler)
        }
    }

    return inf
}

informer event handler

所有的 informer handler 满足接口

type ResourceEventHandler interface {
    OnAdd(obj interface{})
    OnUpdate(oldObj, newObj interface{})
    OnDelete(obj interface{})
}

即处理 OnAdd, OnUpdate, OnDelete 三种事件。

项目案例

例如在本项目中

k8sailor-with-informer.png

在本地创建了一个名为 k8scache 的存储空间, 使用 k8s informer 订阅了 Deployment 的数据并保存到了 本地 并对外提供 deployment 的查询功能。

由于项目本地都存的是数据副本,只提供了 查询 namespace 下的所有数据根据 name 查询某个 deployment 这样简单的功能。 原本 k8s 通过 label 查询 这样 好用 功能, 目前也就无法再提供了。

自此, biz 代码逻辑中与 Deployment 相关的查询使用的数据源都修改为 k8scache

// GetDeploymentByName 通过名称获取 deployment
func GetDeploymentByName(ctx context.Context, input GetDeploymentByNameInput) (*Deployment, error) {

    /* k8s api 返回的数据 */
    // v1dep, err := k8sdao.GetDeploymentByName(ctx, input.Namespace, input.Name)

    /* 使用本地的 k8scache */
    v1dep, err := k8scache.DepTank.GetDeploymentByName(ctx, input.Namespace, input.Name)
    if err != nil {
        return nil, err
    }

    dep := extractDeployment(*v1dep)
    return dep, nil
}

informer 启动

上面已经说了, 使用 informer 创建的是一个独立 应用/模块, 可以作为一个 模块存在于应用内部, 也可以作为一个 独立的应用

无论是怎么定义的, informer 的启动和关闭都 必须要 独立控制。

本项目中是作为一个模块, 在启动 http server 之前进行启动。

var cmdHttpserver = &cobra.Command{
    Use:  "httpserver",
    Long: "启动 web 服务器",
    Run: func(cmd *cobra.Command, args []string) {
        // 启动 informer
        runInformer()

        // 启动服务
        runHttpserver()
    },
}

func runInformer() {

    clientset := global.KubeClient.Client()
    informer := global.KubeInformer.WithClientset(clientset)

    k8scache.RegisterHandlers(informer)

    informer.Start()
}

分层后代码实现的问题

其实上述 切换数据源 的实现是有问题的。

数据源的切换不应该是在 biz 中完成, 而是应该在 k8sdao 中进行。

  1. 从逻辑上来说 k8scachek8s 是一层的, 都是数据的提供者。
  2. DAO 的含义是 数据访问对象 data access object, 其职责就是对 来自不同数据源的相同数据 执行 适合自身业务逻辑的抽象和封装

扩展阅读

深入理解 k8s informer: https://cloudnative.to/blog/client-go-informer-source-code/
相关实践学习
深入解析Docker容器化技术
Docker是一个开源的应用容器引擎,让开发者可以打包他们的应用以及依赖包到一个可移植的容器中,然后发布到任何流行的Linux机器上,也可以实现虚拟化,容器是完全使用沙箱机制,相互之间不会有任何接口。Docker是世界领先的软件容器平台。开发人员利用Docker可以消除协作编码时“在我的机器上可正常工作”的问题。运维人员利用Docker可以在隔离容器中并行运行和管理应用,获得更好的计算密度。企业利用Docker可以构建敏捷的软件交付管道,以更快的速度、更高的安全性和可靠的信誉为Linux和Windows Server应用发布新功能。 在本套课程中,我们将全面的讲解Docker技术栈,从环境安装到容器、镜像操作以及生产环境如何部署开发的微服务应用。本课程由黑马程序员提供。 &nbsp; &nbsp; 相关的阿里云产品:容器服务 ACK 容器服务 Kubernetes 版(简称 ACK)提供高性能可伸缩的容器应用管理能力,支持企业级容器化应用的全生命周期管理。整合阿里云虚拟化、存储、网络和安全能力,打造云端最佳容器化应用运行环境。 了解产品详情: https://www.aliyun.com/product/kubernetes
相关文章
|
1月前
|
人工智能 算法 调度
阿里云ACK托管集群Pro版共享GPU调度操作指南
本文介绍在阿里云ACK托管集群Pro版中,如何通过共享GPU调度实现显存与算力的精细化分配,涵盖前提条件、使用限制、节点池配置及任务部署全流程,提升GPU资源利用率,适用于AI训练与推理场景。
223 1
|
1月前
|
弹性计算 监控 调度
ACK One 注册集群云端节点池升级:IDC 集群一键接入云端 GPU 算力,接入效率提升 80%
ACK One注册集群节点池实现“一键接入”,免去手动编写脚本与GPU驱动安装,支持自动扩缩容与多场景调度,大幅提升K8s集群管理效率。
223 89
|
6月前
|
资源调度 Kubernetes 调度
从单集群到多集群的快速无损转型:ACK One 多集群应用分发
ACK One 的多集群应用分发,可以最小成本地结合您已有的单集群 CD 系统,无需对原先应用资源 YAML 进行修改,即可快速构建成多集群的 CD 系统,并同时获得强大的多集群资源调度和分发的能力。
272 9
|
6月前
|
资源调度 Kubernetes 调度
从单集群到多集群的快速无损转型:ACK One 多集群应用分发
本文介绍如何利用阿里云的分布式云容器平台ACK One的多集群应用分发功能,结合云效CD能力,快速将单集群CD系统升级为多集群CD系统。通过增加分发策略(PropagationPolicy)和差异化策略(OverridePolicy),并修改单集群kubeconfig为舰队kubeconfig,可实现无损改造。该方案具备多地域多集群智能资源调度、重调度及故障迁移等能力,帮助用户提升业务效率与可靠性。
|
8月前
|
存储 Kubernetes 监控
K8s集群实战:使用kubeadm和kuboard部署Kubernetes集群
总之,使用kubeadm和kuboard部署K8s集群就像回归童年一样,简单又有趣。不要忘记,技术是为人服务的,用K8s集群操控云端资源,我们不过是想在复杂的世界找寻简单。尽管部署过程可能遇到困难,但朝着简化复杂的目标,我们就能找到意义和乐趣。希望你也能利用这些工具,找到你的乐趣,满足你的需求。
826 33
|
8月前
|
Kubernetes 开发者 Docker
集群部署:使用Rancher部署Kubernetes集群。
以上就是使用 Rancher 部署 Kubernetes 集群的流程。使用 Rancher 和 Kubernetes,开发者可以受益于灵活性和可扩展性,允许他们在多种环境中运行多种应用,同时利用自动化工具使工作负载更加高效。
483 19
|
8月前
|
人工智能 分布式计算 调度
打破资源边界、告别资源浪费:ACK One 多集群Spark和AI作业调度
ACK One多集群Spark作业调度,可以帮助您在不影响集群中正在运行的在线业务的前提下,打破资源边界,根据各集群实际剩余资源来进行调度,最大化您多集群中闲置资源的利用率。
|
11月前
|
Prometheus Kubernetes 监控
OpenAI故障复盘 - 阿里云容器服务与可观测产品如何保障大规模K8s集群稳定性
聚焦近日OpenAI的大规模K8s集群故障,介绍阿里云容器服务与可观测团队在大规模K8s场景下我们的建设与沉淀。以及分享对类似故障问题的应对方案:包括在K8s和Prometheus的高可用架构设计方面、事前事后的稳定性保障体系方面。
|
8月前
|
Prometheus Kubernetes 监控
OpenAI故障复盘丨如何保障大规模K8s集群稳定性
OpenAI故障复盘丨如何保障大规模K8s集群稳定性
293 0
OpenAI故障复盘丨如何保障大规模K8s集群稳定性
|
10月前
|
缓存 容灾 网络协议
ACK One多集群网关:实现高效容灾方案
ACK One多集群网关可以帮助您快速构建同城跨AZ多活容灾系统、混合云同城跨AZ多活容灾系统,以及异地容灾系统。

推荐镜像

更多
下一篇
oss云网关配置