Kubernetes Client-go Informer 源码分析

本文涉及的产品
容器镜像服务 ACR,镜像仓库100个 不限时长
简介: 几乎所有的Controller manager 和CRD Controller 都会使用Client-go 的Informer 函数,这样通过Watch 或者Get List 可以获取对应的Object,下面我们从源码分析角度来看一下Client go Informer 的机制。

几乎所有的Controller manager 和CRD Controller 都会使用Client-go 的Informer 函数,这样通过Watch 或者Get List 可以获取对应的Object,下面我们从源码分析角度来看一下Client go Informer 的机制。

kubeClient, err := kubernetes.NewForConfig(cfg)
if err != nil {
    klog.Fatalf("Error building kubernetes clientset: %s", err.Error())
}

kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30)

controller := NewController(kubeClient, exampleClient,
    kubeInformerFactory.Apps().V1().Deployments(),
    exampleInformerFactory.Samplecontroller().V1alpha1().Foos())

// notice that there is no need to run Start methods in a separate goroutine. (i.e. go kubeInformerFactory.Start(stopCh)
// Start method is non-blocking and runs all registered informers in a dedicated goroutine.
kubeInformerFactory.Start(stopCh)

这里的例子是以https://github.com/kubernetes/sample-controller/blob/master/main.go节选,主要以 k8s 默认的Deployment Informer 为例子。可以看到直接使用Client-go Informer 还是非常简单的,先不管NewCOntroller函数里面执行了什么,顺着代码来看一下kubeInformerFactory.Start 都干了啥。

// Start initializes all requested informers.
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
    f.lock.Lock()
    defer f.lock.Unlock()

    for informerType, informer := range f.informers {
        if !f.startedInformers[informerType] {
            go informer.Run(stopCh)
            f.startedInformers[informerType] = true
        }
    }
}

可以看到这里遍历了f.informers,而informers 的定义我们来看一眼数据结构

type sharedInformerFactory struct {
    client           kubernetes.Interface
    namespace        string
    tweakListOptions internalinterfaces.TweakListOptionsFunc
    lock             sync.Mutex
    defaultResync    time.Duration
    customResync     map[reflect.Type]time.Duration

    informers map[reflect.Type]cache.SharedIndexInformer
    // startedInformers is used for tracking which informers have been started.
    // This allows Start() to be called multiple times safely.
    startedInformers map[reflect.Type]bool
}

我们这里的例子,在运行的时候,f.informers里面含有的内容如下

type *v1.Deployment informer &{0xc000379fa0 <nil> 0xc00038ccb0 {} 0xc000379f80 0xc00033bb00 30000000000 30000000000 0x28e5ec8 false false {0 0} {0 0}}

也就是说,每一种k8s 类型都会有自己的Informer函数。下面我们来看一下这个函数是在哪里注册的,这里以Deployment Informer 为例。

首先回到刚开始初始化kubeClient 的代码,

controller := NewController(kubeClient, exampleClient,
        kubeInformerFactory.Apps().V1().Deployments(),
        exampleInformerFactory.Samplecontroller().V1alpha1().Foos())
        
        
deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: controller.handleObject,
        UpdateFunc: func(old, new interface{}) {
            newDepl := new.(*appsv1.Deployment)
            oldDepl := old.(*appsv1.Deployment)
            if newDepl.ResourceVersion == oldDepl.ResourceVersion {
                // Periodic resync will send update events for all known Deployments.
                // Two different versions of the same Deployment will always have different RVs.
                return
            }
            controller.handleObject(new)
        },
        DeleteFunc: controller.handleObject,
    })

注意这里的传参, kubeInformerFactory.Apps().V1().Deployments(), 这句话的意思就是指创建一个只关注Deployment 的Informer.

controller := &Controller{
        kubeclientset:     kubeclientset,
        sampleclientset:   sampleclientset,
        deploymentsLister: deploymentInformer.Lister(),
        deploymentsSynced: deploymentInformer.Informer().HasSynced,
        foosLister:        fooInformer.Lister(),
        foosSynced:        fooInformer.Informer().HasSynced,
        workqueue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Foos"),
        recorder:          recorder,
    }

deploymentInformer.Lister() 这里就是初始化了一个Deployment Lister,下面来看一下Lister函数里面做了什么。

// NewFilteredDeploymentInformer constructs a new informer for Deployment type.
// Always prefer using an informer factory to get a shared informer instead of getting an independent
// one. This reduces memory footprint and number of connections to the server.
func NewFilteredDeploymentInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
    return cache.NewSharedIndexInformer(
        &cache.ListWatch{
            ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
                if tweakListOptions != nil {
                    tweakListOptions(&options)
                }
                return client.AppsV1().Deployments(namespace).List(options)
            },
            WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
                if tweakListOptions != nil {
                    tweakListOptions(&options)
                }
                return client.AppsV1().Deployments(namespace).Watch(options)
            },
        },
        &appsv1.Deployment{},
        resyncPeriod,
        indexers,
    )
}

func (f *deploymentInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
    return NewFilteredDeploymentInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
}

func (f *deploymentInformer) Informer() cache.SharedIndexInformer {
    return f.factory.InformerFor(&appsv1.Deployment{}, f.defaultInformer)
}

func (f *deploymentInformer) Lister() v1.DeploymentLister {
    return v1.NewDeploymentLister(f.Informer().GetIndexer())
}

注意这里的Lister 函数,它调用了Informer ,然后触发了f.factory.InformerFor
这就最终调用了sharedInformerFactory InformerFor函数,

// InternalInformerFor returns the SharedIndexInformer for obj using an internal
// client.
func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
    f.lock.Lock()
    defer f.lock.Unlock()

    informerType := reflect.TypeOf(obj)
    informer, exists := f.informers[informerType]
    if exists {
        return informer
    }

    resyncPeriod, exists := f.customResync[informerType]
    if !exists {
        resyncPeriod = f.defaultResync
    }

    informer = newFunc(f.client, resyncPeriod)
    f.informers[informerType] = informer

    return informer
}

这里可以看到,informer = newFunc(f.client, resyncPeriod)这句话最终完成了对于informer的创建,并且注册到了Struct object中,完成了前面我们的问题。

下面我们再回到informer start

// Start initializes all requested informers.
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
    f.lock.Lock()
    defer f.lock.Unlock()

    for informerType, informer := range f.informers {
        if !f.startedInformers[informerType] {
            go informer.Run(stopCh)
            f.startedInformers[informerType] = true
        }
    }
}

这里可以看到,它会遍历所有的informer,然后选择异步调用Informer 的RUN方法。我们来全局看一下Run方法

func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()

    fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)

    cfg := &Config{
        Queue:            fifo,
        ListerWatcher:    s.listerWatcher,
        ObjectType:       s.objectType,
        FullResyncPeriod: s.resyncCheckPeriod,
        RetryOnError:     false,
        ShouldResync:     s.processor.shouldResync,

        Process: s.HandleDeltas,
    }

    func() {
        s.startedLock.Lock()
        defer s.startedLock.Unlock()

        s.controller = New(cfg)
        s.controller.(*controller).clock = s.clock
        s.started = true
    }()

    // Separate stop channel because Processor should be stopped strictly after controller
    processorStopCh := make(chan struct{})
    var wg wait.Group
    defer wg.Wait()              // Wait for Processor to stop
    defer close(processorStopCh) // Tell Processor to stop
    wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
    wg.StartWithChannel(processorStopCh, s.processor.run)

    defer func() {
        s.startedLock.Lock()
        defer s.startedLock.Unlock()
        s.stopped = true // Don't want any new listeners
    }()
    s.controller.Run(stopCh)
}

首先它根据得到的 key 拆分函数和Store index 创建一个FIFO队列,这个队列是一个先进先出的队列,主要用来保存对象的各种事件。

func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO {
    f := &DeltaFIFO{
        items:        map[string]Deltas{},
        queue:        []string{},
        keyFunc:      keyFunc,
        knownObjects: knownObjects,
    }
    f.cond.L = &f.lock
    return f
}

可以看到这个队列创建的比较简单,就是使用 Map 来存放数据,String 数组来存放队列的 Key。

后面根据client 创建的List 和Watch 函数,还有队列创建了一个 config,下面将根据这个config 来初始化controller. 这个controller是client-go 的Cache controller ,主要用来控制从 APIServer 获得的对象的 cache 以及更新对象。

下面主要关注这个函数调用

wg.StartWithChannel(processorStopCh, s.processor.run)

这里进行了真正的Listering 调用。

func (p *sharedProcessor) run(stopCh <-chan struct{}) {
    func() {
        p.listenersLock.RLock()
        defer p.listenersLock.RUnlock()
        for _, listener := range p.listeners {
            p.wg.Start(listener.run)
            p.wg.Start(listener.pop)
        }
        p.listenersStarted = true
    }()
    <-stopCh
    p.listenersLock.RLock()
    defer p.listenersLock.RUnlock()
    for _, listener := range p.listeners {
        close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
    }
    p.wg.Wait() // Wait for all .pop() and .run() to stop
}

主要看 run 方法,还记得前面已经把ADD UPDATE DELETE 注册了自定义的处理函数了吗。这里就实现了前面函数的触发

func (p *processorListener) run() {
    // this call blocks until the channel is closed.  When a panic happens during the notification
    // we will catch it, **the offending item will be skipped!**, and after a short delay (one second)
    // the next notification will be attempted.  This is usually better than the alternative of never
    // delivering again.
    stopCh := make(chan struct{})
    wait.Until(func() {
        // this gives us a few quick retries before a long pause and then a few more quick retries
        err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
            for next := range p.nextCh {
                switch notification := next.(type) {
                case updateNotification:
                    p.handler.OnUpdate(notification.oldObj, notification.newObj)
                case addNotification:
                    p.handler.OnAdd(notification.newObj)
                case deleteNotification:
                    p.handler.OnDelete(notification.oldObj)
                default:
                    utilruntime.HandleError(fmt.Errorf("unrecognized notification: %#v", next))
                }
            }
            // the only way to get here is if the p.nextCh is empty and closed
            return true, nil
        })

        // the only way to get here is if the p.nextCh is empty and closed
        if err == nil {
            close(stopCh)
        }
    }, 1*time.Minute, stopCh)
}

可以看到当p.nexhCh channel 接收到一个对象进入的时候,就会根据通知类型的不同,选择对应的用户注册函数去调用。那么这个channel 谁来向其中传入参数呢

func (p *processorListener) pop() {
    defer utilruntime.HandleCrash()
    defer close(p.nextCh) // Tell .run() to stop

    var nextCh chan<- interface{}
    var notification interface{}
    for {
        select {
        case nextCh <- notification:
            // Notification dispatched
            var ok bool
            notification, ok = p.pendingNotifications.ReadOne()
            if !ok { // Nothing to pop
                nextCh = nil // Disable this select case
            }
        case notificationToAdd, ok := <-p.addCh:
            if !ok {
                return
            }
            if notification == nil { // No notification to pop (and pendingNotifications is empty)
                // Optimize the case - skip adding to pendingNotifications
                notification = notificationToAdd
                nextCh = p.nextCh
            } else { // There is already a notification waiting to be dispatched
                p.pendingNotifications.WriteOne(notificationToAdd)
            }
        }
    }
}

答案就是这个pop 函数,这里会从p.addCh中读取增加的通知,然后转给p.nexhCh 并且保证每个通知只会读取一次。

下面就是最终的Controller run 函数,我们来看看到底干了什么

// Run begins processing items, and will continue until a value is sent down stopCh.
// It's an error to call Run more than once.
// Run blocks; call via go.
func (c *controller) Run(stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()
    go func() {
        <-stopCh
        c.config.Queue.Close()
    }()
    r := NewReflector(
        c.config.ListerWatcher,
        c.config.ObjectType,
        c.config.Queue,
        c.config.FullResyncPeriod,
    )
    r.ShouldResync = c.config.ShouldResync
    r.clock = c.clock

    c.reflectorMutex.Lock()
    c.reflector = r
    c.reflectorMutex.Unlock()

    var wg wait.Group
    defer wg.Wait()

    wg.StartWithChannel(stopCh, r.Run)

    wait.Until(c.processLoop, time.Second, stopCh)
}

这里主要的就是wg.StartWithChannel(stopCh, r.Run)

// Run starts a watch and handles watch events. Will restart the watch if it is closed.
// Run will exit when stopCh is closed.
func (r *Reflector) Run(stopCh <-chan struct{}) {
    klog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedType, r.resyncPeriod, r.name)
    wait.Until(func() {
        if err := r.ListAndWatch(stopCh); err != nil {
            utilruntime.HandleError(err)
        }
    }, r.period, stopCh)
}

这里就调用了r.ListAndWatch 方法,这个方法比较复杂,我们慢慢来看。

// watchHandler watches w and keeps *resourceVersion up to date.
func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
    start := r.clock.Now()
    eventCount := 0

    // Stopping the watcher should be idempotent and if we return from this function there's no way
    // we're coming back in with the same watch interface.
    defer w.Stop()
    // update metrics
    defer func() {
        r.metrics.numberOfItemsInWatch.Observe(float64(eventCount))
        r.metrics.watchDuration.Observe(time.Since(start).Seconds())
    }()

loop:
    for {
        select {
        case <-stopCh:
            return errorStopRequested
        case err := <-errc:
            return err
        case event, ok := <-w.ResultChan():
            if !ok {
                break loop
            }
            if event.Type == watch.Error {
                return apierrs.FromObject(event.Object)
            }
            if e, a := r.expectedType, reflect.TypeOf(event.Object); e != nil && e != a {
                utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
                continue
            }
            meta, err := meta.Accessor(event.Object)
            if err != nil {
                utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
                continue
            }
            newResourceVersion := meta.GetResourceVersion()
            switch event.Type {
            case watch.Added:
                err := r.store.Add(event.Object)
                if err != nil {
                    utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
                }
            case watch.Modified:
                err := r.store.Update(event.Object)
                if err != nil {
                    utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
                }
            case watch.Deleted:
                // TODO: Will any consumers need access to the "last known
                // state", which is passed in event.Object? If so, may need
                // to change this.
                err := r.store.Delete(event.Object)
                if err != nil {
                    utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))
                }
            default:
                utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
            }
            *resourceVersion = newResourceVersion
            r.setLastSyncResourceVersion(newResourceVersion)
            eventCount++
        }
    }

    watchDuration := r.clock.Now().Sub(start)
    if watchDuration < 1*time.Second && eventCount == 0 {
        r.metrics.numberOfShortWatches.Inc()
        return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", r.name)
    }
    klog.V(4).Infof("%s: Watch close - %v total %v items received", r.name, r.expectedType, eventCount)
    return nil
}

这里就是真正调用watch 方法,根据返回的watch 事件,将其放入到前面创建的 FIFO 队列中。

最终调用了controller 的POP 方法

// processLoop drains the work queue.
// TODO: Consider doing the processing in parallel. This will require a little thought
// to make sure that we don't end up processing the same object multiple times
// concurrently.
//
// TODO: Plumb through the stopCh here (and down to the queue) so that this can
// actually exit when the controller is stopped. Or just give up on this stuff
// ever being stoppable. Converting this whole package to use Context would
// also be helpful.
func (c *controller) processLoop() {
    for {
        obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
        if err != nil {
            if err == FIFOClosedError {
                return
            }
            if c.config.RetryOnError {
                // This is the safe way to re-enqueue.
                c.config.Queue.AddIfNotPresent(obj)
            }
        }
    }
}

前面是将 watch 到的对象加入到队列中,这里的goroutine 就是用来消费的。具体的消费函数就是前面创建的Process 函数

func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
    s.blockDeltas.Lock()
    defer s.blockDeltas.Unlock()

    // from oldest to newest
    for _, d := range obj.(Deltas) {
        switch d.Type {
        case Sync, Added, Updated:
            isSync := d.Type == Sync
            s.cacheMutationDetector.AddObject(d.Object)
            if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
                if err := s.indexer.Update(d.Object); err != nil {
                    return err
                }
                s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
            } else {
                if err := s.indexer.Add(d.Object); err != nil {
                    return err
                }
                s.processor.distribute(addNotification{newObj: d.Object}, isSync)
            }
        case Deleted:
            if err := s.indexer.Delete(d.Object); err != nil {
                return err
            }
            s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
        }
    }
    return nil
}

这个函数就是根据传进来的obj,先从自己的cache 中取一下,看是否存在,如果存在就代表是Update ,那么更新自己的队列后,调用用户注册的Update 函数,如果不存在,就调用用户的 Add 函数。

到此Client-go 的Informer 流程源码分析基本完毕。

相关实践学习
巧用云服务器ECS制作节日贺卡
本场景带您体验如何在一台CentOS 7操作系统的ECS实例上,通过搭建web服务器,上传源码到web容器,制作节日贺卡网页。
容器应用与集群管理
欢迎来到《容器应用与集群管理》课程,本课程是“云原生容器Clouder认证“系列中的第二阶段。课程将向您介绍与容器集群相关的概念和技术,这些概念和技术可以帮助您了解阿里云容器服务ACK/ACK Serverless的使用。同时,本课程也会向您介绍可以采取的工具、方法和可操作步骤,以帮助您了解如何基于容器服务ACK Serverless构建和管理企业级应用。 学习完本课程后,您将能够: 掌握容器集群、容器编排的基本概念 掌握Kubernetes的基础概念及核心思想 掌握阿里云容器服务ACK/ACK Serverless概念及使用方法 基于容器服务ACK Serverless搭建和管理企业级网站应用
目录
相关文章
|
Kubernetes Cloud Native jenkins
下篇:使用jenkins发布go项目到k8s,接上篇的手工体验改造为自动化发布
下篇:使用jenkins发布go项目到k8s,接上篇的手工体验改造为自动化发布
607 1
|
3月前
|
Kubernetes Go Docker
在K8s编程中如何使用Go
一文带你了解在K8s编程中如何使用Go
90 3
|
4月前
|
Kubernetes 监控 Cloud Native
"解锁K8s新姿势!Cobra+Client-go强强联手,打造你的专属K8s监控神器,让资源优化与性能监控尽在掌握!"
【8月更文挑战第14天】在云原生领域,Kubernetes以出色的扩展性和定制化能力引领潮流。面对独特需求,自定义插件成为必要。本文通过Cobra与Client-go两大利器,打造一款监测特定标签Pods资源使用的K8s插件。Cobra简化CLI开发,Client-go则负责与K8s API交互。从初始化项目到实现查询逻辑,一步步引导你构建个性化工具,开启K8s集群智能化管理之旅。
55 2
|
4月前
|
运维 Kubernetes Go
"解锁K8s二开新姿势!client-go:你不可不知的Go语言神器,让Kubernetes集群管理如虎添翼,秒变运维大神!"
【8月更文挑战第14天】随着云原生技术的发展,Kubernetes (K8s) 成为容器编排的首选。client-go作为K8s的官方Go语言客户端库,通过封装RESTful API,使开发者能便捷地管理集群资源,如Pods和服务。本文介绍client-go基本概念、使用方法及自定义操作。涵盖ClientSet、DynamicClient等客户端实现,以及lister、informer等组件,通过示例展示如何列出集群中的所有Pods。client-go的强大功能助力高效开发和运维。
374 1
|
7月前
|
Kubernetes Cloud Native Go
Golang深入浅出之-Go语言中的云原生开发:Kubernetes与Docker
【5月更文挑战第5天】本文探讨了Go语言在云原生开发中的应用,特别是在Kubernetes和Docker中的使用。Docker利用Go语言的性能和跨平台能力编写Dockerfile和构建镜像。Kubernetes,主要由Go语言编写,提供了方便的客户端库与集群交互。文章列举了Dockerfile编写、Kubernetes资源定义和服务发现的常见问题及解决方案,并给出了Go语言构建Docker镜像和与Kubernetes交互的代码示例。通过掌握这些技巧,开发者能更高效地进行云原生应用开发。
191 1
|
存储 Kubernetes 安全
【K8s源码品读】010:Phase 1 - kube-scheduler - Informer是如何保存数据的
了解Informer在发现资源变化后,是怎么处理的
62 0
|
Kubernetes 容器
【K8s源码品读】009:Phase 1 - kube-scheduler - Informer监听资源变化
了解Informer是如何从kube-apiserver监听资源变化的情况
113 0
|
Kubernetes 监控 Cloud Native
Go微服务架构实战 中篇:2. 基于k8s部署服务和注册中心,验证服务注册和发现
Go微服务架构实战 中篇:2. 基于k8s部署服务和注册中心,验证服务注册和发现
Go微服务架构实战 中篇:2. 基于k8s部署服务和注册中心,验证服务注册和发现
|
JSON Kubernetes 测试技术
在 Go 中使用 Kubernetes 对象
通常,在某些情况下,我们需要通用的方法去使用 Kubernetes 资源对象,而不是编写代码来处理特定类型。
204 0
|
运维 Kubernetes 负载均衡
Go微服务架构实战 中篇:5. k8s基于ingress和service实现金丝雀发布和蓝绿发布
Go微服务架构实战 中篇:5. k8s基于ingress和service实现金丝雀发布和蓝绿发布

热门文章

最新文章

相关产品

  • 容器服务Kubernetes版