带你读《云原生应用开发:Operator原理与实践》——2.2.8 Controller 关于 Client-go 典型场景

本文涉及的产品
容器服务 Serverless 版 ACK Serverless,317元额度 多规格
容器服务 Serverless 版 ACK Serverless,952元额度 多规格
简介: 带你读《云原生应用开发:Operator原理与实践》——2.2.8 Controller 关于 Client-go 典型场景

2.2.8 Controller 关于 Client-go 典型场景


我们了解了 Client-go 的各个组件(Reflector、Informer、Indexer),Client-go 中包含编写自定义 Controller 所使用的各种机制,这些机制在 Client-go 库中的 Tools 包和Util 包中进行了定义。在 k8s 中,可以利用 Client-go 中提供的 Controller 机制对所需资源的变化进行监控,根据资源状态的变化进行一系列操作。为加深对前面知识的理解,下面利用 Client-go 工具实现一个简单的 Controller。

下面编写一个简易的 Controller,用于监听 Pod 创建、删除信息,并将信息打印出来。

Controller 逻辑如下。

(1)首先我们需要定义一个 Controller 结构体,见代码清单 2-54

代码清单 2-54

type Controller struct {
 indexer cache.Indexer // Indexer 的引用
 queue workqueue.RateLimitingInterface //Workqueue 的引用
 informer cache.Controller // Informer 的引用
}

(2)初始化一个 Controller,见代码清单 2-55

代码清单 2-55

// 将 Workqueue、Informer、Indexer 的引用作为参数返回一个新的 Controller
func NewController(queue workqueue.RateLimitingInterface, indexer cache.
Indexer, informer cache.Controller) *Controller {
 return &Controller{
 informer: informer,
 indexer: indexer,
 queue: queue,
 }
}

(3)定义 Controller 的工作流,见代码清单 2-56

代码清单 2-56

func (c *Controller) Run(threadiness int, stopCh chan struct{}) {
 defer runtime.HandleCrash()
 defer c.queue.ShutDown()
 klog.Info("Starting pod controller")
 // 启动 Informer 线程,Run 函数做两件事情 :第一,运行一个 Reflector,并从 ListerWatcher
中获取对象的通知放到队列中(Delta Queue);第二,从队列中取出对象并处理该对象相关业务
 go c.informer.Run(stopCh)
 // 等待缓存同步队列
 if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) {
 runtime.HandleError(fmt.Errorf("Time out waitng for caches to sync"))
 return
 }
 // 启动多个 Worker 线程处理 Workqueue 中的 Object
  for i := 0; i < threadiness; i++ {
 go wait.Until(c.runWorker, time.Second, stopCh)
 }
 <-stopCh
 klog.Info("Stopping Pod controller")
}

(4)具体处理 Worker Queue 中对象的流程,见代码清单 2-57

代码清单 2-57

func (c *Controller) runWorker() {
 // 启动无限循环,接收并处理消息
 for c.processNextItem() {
 }
}
// 从 Workqueue 中获取对象,并打印信息。
func (c *Controller) processNextItem() bool {
 key, shutdown := c.queue.Get()
 // 退出
 if shutdown {
 return false
 }
 // 标记此 Key 已经处理
 defer c.queue.Done(key)
 // 打印 Key 对应的 Object 的信息
 err := c.syncToStdout(key.(string))
 c.handleError(err, key)
 return true
}
// 获取 Key 对应的 Object,并打印相关信息
func (c *Controller) syncToStdout(key string) error {
 obj, exists, err := c.indexer.GetByKey(key)
 if err != nil {
 klog.Errorf("Fetching object with key %s from store failed with %v", key, err)
 return err
 }
 if !exists {
 fmt.Printf("Pod %s does not exist anymore\n", key)
 } else {
 fmt.Printf("Sync/Add/Update for Pod %s\n", obj.(*core_v1.Pod).
GetName())
 }
 return nil
}

(5) Main 函数逻辑,见代码清单 2-58

代码清单 2-58

func main() {
 var kubeconfig string
 var master string
 // 从外部获取集群信息 (kube.config)
 flag.StringVar(&kubeconfig, "kubeconfig", "", "kubeconfig file")
 // 获取集群 master 的 url
 flag.StringVar(&master, "master", "", "master url")
 // 读取构建 config
 config, err := clientcmd.BuildConfigFromFlags(master, kubeconfig)
 if err != nil {
 klog.Fatal(err)
 }
 // 创建 k8s Client
 clientset, err := kubernetes.NewForConfig(config)
 if err != nil {
 klog.Fatal(err)
 }
 // 从指定的客户端、资源、命名空间和字段选择器创建一个新的 List-Watch
 podListWatcher := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), 
"pods", v1.NamespaceDefault, fields.Everything())
 // 构造一个具有速率限制排队功能的新的 Workqueue
 queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
 // 创建 Indexer 和 Informer
 indexer, informer := cache.NewIndexerInformer(podListWatcher, &v1.Pod{}, 
0, cache.ResourceEventHandlerFuncs{
 //当有Pod创建时,根据Delta Queue弹出的Object生成对应的Key,并加入Workqueue中。
此处可以根据 Object 的一些属性进行过滤
 AddFunc: func(obj interface{}) {
 key, err := cache.MetaNamespaceKeyFunc(new)
 if err == nil {
 queue.Add(key)
 }
 },
 //Pod 删除操作
 DeleteFunc: func(obj interface{}) {
 // 在生成 Key 之前检查对象。因为资源删除后有可能会进行重建等操作,如果监听时错过
了删除信息,会导致该条记录是陈旧的
 key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
 if err == nil {
 queue.Add(key)
 }
 },
 }, cache.Indexers{})
 // 创建新的 Controller
 controller := NewController(queue, indexer, informer)
 stop := make(chan struct{})
 defer close(stop)
 // 启动 Controller
 go controller.Run(1, stop)
 select {}
}

至此一个简单的 Controller 就完成了,然后我们从已有的 k8s 环境中复制 Config 文件,将 Config 文件存放在 /root/.kube/ 目录下,配置运行代码,运行结果见代码清单 2-59。

代码清单 2-59

I0312 15:46:38.849495 25524 main.go:125] Starting Pod controller
Sync/Add/Update for Pod curl-666-6f68d49784-r2gln
Sync/Add/Update for Pod busybox
Pod default/mypod does not exist anymore

结果显示:程序启动了一个 Pod Controller,Controller 监听到在 Default 命名空间下有两个 Pod:busybox 和 curl-666-6f68d49784-r2gln,缓存中的 mypod 已经不存在了。

相关实践学习
通过Ingress进行灰度发布
本场景您将运行一个简单的应用,部署一个新的应用用于新的发布,并通过Ingress能力实现灰度发布。
容器应用与集群管理
欢迎来到《容器应用与集群管理》课程,本课程是“云原生容器Clouder认证“系列中的第二阶段。课程将向您介绍与容器集群相关的概念和技术,这些概念和技术可以帮助您了解阿里云容器服务ACK/ACK Serverless的使用。同时,本课程也会向您介绍可以采取的工具、方法和可操作步骤,以帮助您了解如何基于容器服务ACK Serverless构建和管理企业级应用。 学习完本课程后,您将能够: 掌握容器集群、容器编排的基本概念 掌握Kubernetes的基础概念及核心思想 掌握阿里云容器服务ACK/ACK Serverless概念及使用方法 基于容器服务ACK Serverless搭建和管理企业级网站应用
相关文章
|
1月前
|
Kubernetes 监控 Cloud Native
云原生时代下的应用开发与部署实践
【10月更文挑战第4天】在云原生的浪潮中,开发者和运维人员面临着新的挑战和机遇。本文将通过实际案例,展示如何在云平台上高效地开发、部署和管理应用,同时确保系统的可扩展性和高可用性。我们将深入探讨容器化技术、微服务架构以及持续集成/持续部署(CI/CD)流程的实施策略,旨在为读者提供一套完整的云原生解决方案框架。
|
2月前
|
Kubernetes Cloud Native 持续交付
云原生技术在现代应用开发中的角色与实践
【9月更文挑战第9天】 随着云计算技术的飞速发展,云原生(Cloud Native)已经成为推动企业数字化转型的核心力量。本文将深入探讨云原生的基本概念、关键技术及其在实际开发中的应用案例,旨在为读者提供一条清晰的云原生技术学习路径和应用指南。通过实例分析,我们将揭示云原生如何优化资源管理、提升应用性能及加快部署速度,进而帮助企业构建更加灵活、可靠和高效的软件系统。
|
3月前
|
并行计算 数据挖掘 大数据
[go 面试] 并行与并发的区别及应用场景解析
[go 面试] 并行与并发的区别及应用场景解析
|
7天前
|
Kubernetes Cloud Native 持续交付
云原生技术在现代应用开发中的实践与思考
【10月更文挑战第35天】云原生技术,作为云计算的进阶形态,正引领着软件开发和运维的新潮流。本文将深入探讨云原生技术的核心理念、关键技术组件以及在实际项目中的应用案例,帮助读者理解如何利用云原生技术优化应用架构,提高开发效率和系统稳定性。我们将从容器化、微服务、持续集成/持续部署(CI/CD)等角度出发,结合实际代码示例,展现云原生技术的强大能力。
|
16天前
|
监控 Cloud Native 持续交付
云原生技术深度解析:重塑现代应用开发与部署范式####
本文深入探讨了云原生技术的核心概念、关键技术组件及其在现代软件开发中的重要性。通过剖析容器化、微服务架构、持续集成/持续部署(CI/CD)等关键技术,本文旨在揭示云原生技术如何促进应用的敏捷性、可扩展性和高可用性,进而推动企业数字化转型进程。不同于传统摘要仅概述内容要点,本部分将融入具体案例分析,直观展示云原生技术在实际应用中的显著成效与挑战应对策略,为读者提供更加丰富、立体的理解视角。 ####
|
27天前
|
Kubernetes Cloud Native 持续交付
云原生技术:重塑现代应用开发与部署模式####
本文深入探讨了云原生技术的核心概念、发展历程及其在现代软件开发和部署中的关键作用。通过分析云原生架构的特点,如容器化、微服务、持续集成与持续部署(CI/CD),以及它如何促进应用的可伸缩性、灵活性和效率,本文旨在为读者提供一个关于云原生技术全面而深入的理解。此外,还将探讨实施云原生策略时面临的挑战及应对策略,帮助组织更好地把握数字化转型的机遇。 ####
|
27天前
|
人工智能 Serverless API
云原生应用开发平台CAP:一站式应用开发及生命周期管理解决方案
阿里云的云应用开发平台CAP(Cloud Application Platform)是一款一站式应用开发及应用生命周期管理平台。它提供丰富的Serverless与AI应用模板、高效的开发者工具链及企业级应用管理功能,帮助开发者快速构建、部署和管理云上应用,大幅提升研发、部署和运维效能。
83 1
|
1月前
|
Kubernetes Cloud Native 持续交付
云原生技术在现代应用开发中的实践与展望
【10月更文挑战第7天】随着技术的不断演进,云计算已从简单的资源租用模式转变为支持复杂、高效、灵活的云原生应用架构。本文将深入探讨云原生技术的核心概念及其在现代应用开发中的应用,通过分析Kubernetes容器编排和微服务架构的实践案例,揭示云原生技术如何推动软件开发的现代化进程。文章旨在为开发者和架构师提供一套实用的云原生应用开发指南,同时展望未来云原生技术的发展方向。
29 8
|
1月前
|
Cloud Native 测试技术 云计算
云原生技术在现代应用开发中的角色与实践
【9月更文挑战第31天】本文深入探讨了云原生技术如何革新现代应用开发流程,通过实际案例分析,揭示了其对提高开发效率、确保系统可扩展性和可靠性的显著影响。文章不仅介绍了云原生的核心概念,还提供了实施策略和最佳实践,旨在为开发者提供一条清晰的云原生转型之路。
|
2月前
|
Kubernetes Cloud Native 持续交付
云原生技术在现代应用开发中的实践与思考
【9月更文挑战第23天】本文将深入探讨云原生技术如何革新现代应用的开发流程。通过分析云原生的核心概念、优势以及实际应用案例,我们旨在揭示这一新兴技术范式如何助力开发者和企业更高效、灵活地构建和部署应用程序。文章还将提供具体代码示例,展示云原生技术在实际项目中的应用,帮助读者更好地理解和掌握该技术。