2.2.8 Controller 关于 Client-go 典型场景
我们了解了 Client-go 的各个组件(Reflector、Informer、Indexer),Client-go 中包含编写自定义 Controller 所使用的各种机制,这些机制在 Client-go 库中的 Tools 包和Util 包中进行了定义。在 k8s 中,可以利用 Client-go 中提供的 Controller 机制对所需资源的变化进行监控,根据资源状态的变化进行一系列操作。为加深对前面知识的理解,下面利用 Client-go 工具实现一个简单的 Controller。
下面编写一个简易的 Controller,用于监听 Pod 创建、删除信息,并将信息打印出来。
Controller 逻辑如下。
(1)首先我们需要定义一个 Controller 结构体,见代码清单 2-54
代码清单 2-54
type Controller struct { indexer cache.Indexer // Indexer 的引用 queue workqueue.RateLimitingInterface //Workqueue 的引用 informer cache.Controller // Informer 的引用 }
(2)初始化一个 Controller,见代码清单 2-55
代码清单 2-55
// 将 Workqueue、Informer、Indexer 的引用作为参数返回一个新的 Controller func NewController(queue workqueue.RateLimitingInterface, indexer cache. Indexer, informer cache.Controller) *Controller { return &Controller{ informer: informer, indexer: indexer, queue: queue, } }
(3)定义 Controller 的工作流,见代码清单 2-56
代码清单 2-56
func (c *Controller) Run(threadiness int, stopCh chan struct{}) { defer runtime.HandleCrash() defer c.queue.ShutDown() klog.Info("Starting pod controller") // 启动 Informer 线程,Run 函数做两件事情 :第一,运行一个 Reflector,并从 ListerWatcher 中获取对象的通知放到队列中(Delta Queue);第二,从队列中取出对象并处理该对象相关业务 go c.informer.Run(stopCh) // 等待缓存同步队列 if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) { runtime.HandleError(fmt.Errorf("Time out waitng for caches to sync")) return } // 启动多个 Worker 线程处理 Workqueue 中的 Object for i := 0; i < threadiness; i++ { go wait.Until(c.runWorker, time.Second, stopCh) } <-stopCh klog.Info("Stopping Pod controller") }
(4)具体处理 Worker Queue 中对象的流程,见代码清单 2-57
代码清单 2-57
func (c *Controller) runWorker() { // 启动无限循环,接收并处理消息 for c.processNextItem() { } } // 从 Workqueue 中获取对象,并打印信息。 func (c *Controller) processNextItem() bool { key, shutdown := c.queue.Get() // 退出 if shutdown { return false } // 标记此 Key 已经处理 defer c.queue.Done(key) // 打印 Key 对应的 Object 的信息 err := c.syncToStdout(key.(string)) c.handleError(err, key) return true } // 获取 Key 对应的 Object,并打印相关信息 func (c *Controller) syncToStdout(key string) error { obj, exists, err := c.indexer.GetByKey(key) if err != nil { klog.Errorf("Fetching object with key %s from store failed with %v", key, err) return err } if !exists { fmt.Printf("Pod %s does not exist anymore\n", key) } else { fmt.Printf("Sync/Add/Update for Pod %s\n", obj.(*core_v1.Pod). GetName()) } return nil }
(5) Main 函数逻辑,见代码清单 2-58
代码清单 2-58
func main() { var kubeconfig string var master string // 从外部获取集群信息 (kube.config) flag.StringVar(&kubeconfig, "kubeconfig", "", "kubeconfig file") // 获取集群 master 的 url flag.StringVar(&master, "master", "", "master url") // 读取构建 config config, err := clientcmd.BuildConfigFromFlags(master, kubeconfig) if err != nil { klog.Fatal(err) } // 创建 k8s Client clientset, err := kubernetes.NewForConfig(config) if err != nil { klog.Fatal(err) } // 从指定的客户端、资源、命名空间和字段选择器创建一个新的 List-Watch podListWatcher := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), "pods", v1.NamespaceDefault, fields.Everything()) // 构造一个具有速率限制排队功能的新的 Workqueue queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()) // 创建 Indexer 和 Informer indexer, informer := cache.NewIndexerInformer(podListWatcher, &v1.Pod{}, 0, cache.ResourceEventHandlerFuncs{ //当有Pod创建时,根据Delta Queue弹出的Object生成对应的Key,并加入Workqueue中。 此处可以根据 Object 的一些属性进行过滤 AddFunc: func(obj interface{}) { key, err := cache.MetaNamespaceKeyFunc(new) if err == nil { queue.Add(key) } }, //Pod 删除操作 DeleteFunc: func(obj interface{}) { // 在生成 Key 之前检查对象。因为资源删除后有可能会进行重建等操作,如果监听时错过 了删除信息,会导致该条记录是陈旧的 key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) if err == nil { queue.Add(key) } }, }, cache.Indexers{}) // 创建新的 Controller controller := NewController(queue, indexer, informer) stop := make(chan struct{}) defer close(stop) // 启动 Controller go controller.Run(1, stop) select {} }
至此一个简单的 Controller 就完成了,然后我们从已有的 k8s 环境中复制 Config 文件,将 Config 文件存放在 /root/.kube/ 目录下,配置运行代码,运行结果见代码清单 2-59。
代码清单 2-59
I0312 15:46:38.849495 25524 main.go:125] Starting Pod controller Sync/Add/Update for Pod curl-666-6f68d49784-r2gln Sync/Add/Update for Pod busybox Pod default/mypod does not exist anymore
结果显示:程序启动了一个 Pod Controller,Controller 监听到在 Default 命名空间下有两个 Pod:busybox 和 curl-666-6f68d49784-r2gln,缓存中的 mypod 已经不存在了。