2.2.6 Client-go Informer 解析
1. Client-go Informer 模块
Informer 可 以 对 Kubernetes APIServer 的 资 源 执 行 Watch 操 作 , 类 型 可 以 是Kubernetes 内置资源,也可以是 CRD 。其中最核心的模块是 Reflector、DeltaFIFO、Indexer。接下来我们逐个进行分析。
首先分析 Reflector,Reflector 用于监控指定资源的 Kubernetes。当资源发生变化时,如发生了资源添加(Added)、资源更新(Updated)等事件,Reflector 会将其资源对象存放在本地缓存 DeltaFIFO 中。它的作用就是获取 APIServer 中对象数据并实时地更新到本地,使得本地数据和 ETCD 数据完全一样。它的数据结构见代码清单2-37。
代码清单 2-37
type Reflector struct { name string // 这个 Reflector 的名称,默认为文件 : 行数 metrics *reflectorMetrics // 用于保存 Reflector 的一些监控指标 expectedType reflect.Type // 期望放到 Store 中的类型名称 store Store // 与 Watch 源同步的目标 Store listerWatcher ListerWatcher //ListerWatcher 接口,用于指定 List-Watch 方法 period time.Duration //Watch 周期 resyncPeriod time.Duration // 重新同步周期 ShouldResync func() bool // clock allows tests to manipulate time clock clock.Clock lastSyncResourceVersion string // 最后同步的资源的版本号 lastSyncResourceVersionMutex sync.RWMutex //lastSyncResourceVersion 的读写锁 }
通过 NewRefector 实例化 Reflector 对象,实例化过程中必须传入 ListerWatcher 数据接口对象,它拥有 List 和 Watch 方法,用于获取及监控资源列表,只要是实现了 List和 Watch 方法的对象都可以成为 ListerWatcher,Reflector 对象通过 run 函数启动监控并处理事件,而在 Reflector 源码实现中最主要的是 List-Watch 函数,它负责 List/Watch指定的 Kubernetes APIServer 资源,见代码清单 2-38。
代码清单 2-38
// NewNamedReflector same as NewReflector, but with a specified name for logging func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector { reflectorSuffix := atomic.AddInt64(&reflectorDisambiguator, 1) r := &Reflector{ name: name, // we need this to be unique per process (some names are still the same) but obvious who it belongs to metrics: newReflectorMetrics(makeValidPrometheusMetricLabel(fmt. Sprintf("reflector_"+name+"_%d", reflectorSuffix))), listerWatcher: lw, store: store, expectedType: reflect.TypeOf(expectedType), period: time.Second, resyncPeriod: resyncPeriod, clock: &clock.RealClock{}, } return r }
List-Watch 是 怎 么 实 现 的? List-Watch 主 要 分 为 List 和 Watch 两 部 分。List 负责获取对应资源的全量列表,Watch 负责获取变化的部分。首先进行 List 操作,这里把Resource Version 设置为 0,因为要获取同步的对象的全部版本,所以从 0 开始 List,主要流程如下(见代码清单 2-39)。
(1) r.listerWatcher.List 用于获取资源下的所有对象的数据。
(2) listMetaInterface.GetResourceVersion 用于获取资源版本号(ResouceVersion),资源版本号非常重要,Kubernetes 中所有的资源都拥有该字段,它标识当前资源对象的版本号。每次修改当前资源对象时,Kubernetes APIServer 都会更改 ResouceVersion ,使得 Client-go 执行 Watch 操作时可以根据 ResourceVersion 来确定当前资源对象是否发生过变化。
(3) meta.ExtractList 用于将资源数据转换成资源对象列表,将 runtime.Object 转换成[]runtime.Object,因为 r.listerWatcher.List 只是获取一个列表。
(4) r.syncWith 用于将资源对象列表中的资源对象和资源版本号存储至 DeltaFIFO 中,并替换已存在的对象。
(5) r.setLastSyncResourceVersion 用于设置最新的资源版本号。
代码清单 2-39
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { glog.V(3).Infof("Listing and watching %v from %s", r.expectedType, r.name) var resourceVersion string options := metav1.ListOptions{ResourceVersion: "0"} r.metrics.numberOfLists.Inc() start := r.clock.Now() list, err := r.listerWatcher.List(options) if err != nil { return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedType, err) } r.metrics.listDuration.Observe(time.Since(start).Seconds()) listMetaInterface, err := meta.ListAccessor(list) if err != nil { return fmt.Errorf("%s: Unable to understand list result %#v: %v", r.name, list, err) } resourceVersion = listMetaInterface.GetResourceVersion() items, err := meta.ExtractList(list) if err != nil { return fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err) } r.metrics.numberOfItemsInList.Observe(float64(len(items))) if err := r.syncWith(items, resourceVersion); err != nil { return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err) } r.setLastSyncResourceVersion(resourceVersion) resyncerrc := make(chan error, 1) cancelCh := make(chan struct{}) defer close(cancelCh) go func() { resyncCh, cleanup := r.resyncChan() defer func() { cleanup() // Call the last one written into cleanup }() for { select { case <-resyncCh: case <-stopCh: return case <-cancelCh: return } if r.ShouldResync == nil || r.ShouldResync() { glog.V(4).Infof("%s: forcing resync", r.name) if err := r.store.Resync(); err != nil { resyncerrc <- err return } } cleanup() resyncCh, cleanup = r.resyncChan() } }() for { // give the stopCh a chance to stop the loop, even in case of continue statements further down on errors select { case <-stopCh: return nil default: } timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0)) options = metav1.ListOptions{ ResourceVersion: resourceVersion, TimeoutSeconds: &timeoutSeconds, } r.metrics.numberOfWatches.Inc() w, err := r.listerWatcher.Watch(options) if err != nil { switch err { case io.EOF: // watch closed normally case io.ErrUnexpectedEOF: glog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedType, err) default: utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedType, err)) } if urlError, ok := err.(*url.Error); ok { if opError, ok := urlError.Err.(*net.OpError); ok { if errno, ok := opError.Err.(syscall.Errno); ok && errno == syscall.ECONNREFUSED { time.Sleep(time.Second) continue } } } return nil } if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil { if err != errorStopRequested { glog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedType, err) } return nil } } }
Watch 操 作 通 过 HTTP 与 Kubernetes APIServer 建 立 长 链 接, 接 收 Kubernetes APIServer 发来的变更时间,Watch 操作的实现机制使用的是 HTTP 的分块传输编码。当 Client-go 调 用 Kubernetes APIServer 时, 在 Response 的 HTTP Header 中 设 置Transfer-Encoding 的值为 Chunked。r.listerWatcher.Watch 实际调用了 Pod Informer的 watchfunc 函数。通过 ClientSet 客户端与 APIServer 建立长链接,监控指定资源的变更事件。r.watchHandler 用于处理资源的变更时间,当初发增删改 Added Updated 等事件时,将对应的资源对象更新到本地缓存 DeltaFIFO 中,并更新 ResouceVersion。至此实现了 Reflctor 组件的功能。
2. Client-go DeltaFIFO
DeltaFIFO 是一个 FIFO 队列,记录了资源对象的变化过程。作为一个 FIFO 队列,它的生产者就是 Reflector 组件,前面讲过 Reflector 将监听对象同步到 DeltaFIFO 中,DeltaFIFO 对这些资源对象做了什么,见代码清单 2-40。
代码清单 2-40
type DeltaFIFO struct { lock sync.RWMutex cond sync.Cond // 条件变量,唤醒等待的协程 items map[string]Deltas //Delta 存储桶 queue []string // 队列存储对象键实际就是和 items 一起形成了一个有序 Map // true 通过 Replace() 第一批元素被插入队列或者 Delete/Add/Update 首次被调用 populated bool // Replace() 被首次调用时插入的元素数目 initialPopulationCount int // 函数计算元素 Key 值 keyFunc KeyFunc // 列出已知的对象 knownObjects KeyListerGetter // 队列是否被关闭,关闭互斥锁 closed bool closedLock sync.Mutex }
FIFO 接收 Reflector 的 Adds/Updates 添加和更新事件,并将它们按照顺序放入队列。元素在队列中被处理之前,如果有多个 Adds/Updates 事件发生,事件只会被处理一次。使用场景:(1)仅处理对象一次;(2)处理完当前事件后才能处理最新版本的对象;(3)删除对象之后不会处理;(4)不能周期性重新处理对象。这里的 Delta 对象就是 Kubernetes 系统中对象的变化。Delta 有 Type 和 Object 两个属性,DeltaType 就是资源变化的类型,比如 Add、Update 等,Delta Object 就是具体的 Kubernetes 资源对象,见代码清单 2-41。例如,此时 Reflector 中监听了一个 PodA 的 Add 事件,那么此时 DeltaType 就 是 Added,Delta Object 就 是 PodA,DeltaFIFO 中 的 数 据 是 什 么 样 的呢?此时 Items 中会有 Add 类型的 Delta,Queue 中也会有这个事件的 Key。这个 Key由 KeyFunc 生成。Client-go 中默认的 KeyFunc 是 MetaNamespaceKeyFunc,可以在tools/cache/store.go:76 中找到。由 MetaNamespaceKeyFunc 生成的 Key 格式为 / ,用来标识不同命名空间下的不同资源。
代码清单 2-41
type Delta struct { Type DeltaType // Delta 类型,比如增、减,后面有详细说明 Object interface{} // 对象,Delta 的粒度是一个对象 } type DeltaType string // Delta 的类型用字符串表达 const ( Added DeltaType = "Added" // 增加 Updated DeltaType = "Updated" // 更新 Deleted DeltaType = "Deleted" // 删除 Sync DeltaType = "Sync" // 同步 ) type Deltas []Delta // Delta 数组
既然 DeltaFIFO 是一个 FIFO,那么它就应该有基本的 FIFO 功能,这里 DeltaFIFO实现了 Queue 接口。下面看一下 Queue 接口功能的定义。我们可以看出 Queue 扩展了 Store 接口的功能,附加了 Pop、AddIfNotPresent、HasSynced、Close 方法。Store 是一个通用的对象存储和处理的接口,本身提供了 Add、Update、List、Get 等方法,Queue接口增加了 Pop 方法,实现了一个基本 FIFO 队列,具体见代码清单 2-42。
代码清单 2-42
type Queue interface { Store Pop(PopProcessFunc) (interface{}, error) AddIfNotPresent(interface{}) error HasSynced() bool Close() }
下面我们来看一下 FIFO 队列的基本功能是怎么实现的。首先是 Add 方法,我们可以看到 Add 方法会先根据 KeyFunc 计算出对象的 Key,如果队列中没有这个对象,我们就在这个队列尾部增补对象的 Key,并且将这个对象存入 Map,具体见代码清单 2-43。
代码清单 2-43
func (f *FIFO) Add(obj interface{}) error { id, err := f.keyFunc(obj) if err != nil { return KeyError{obj, err} } f.lock.Lock() defer f.lock.Unlock() f.populated = true if _, exists := f.items[id]; !exists { f.queue = append(f.queue, id) } f.items[id] = obj f.cond.Broadcast() return nil }
接 下 来 我 们 看 一 下 Pop 方 法, 在 Queue 中 至 少 有 一 个 资 源 时 才 会 进 行 Pop 操作。在处理资源之前,资源会从队列(和存储)中移除,如果未成功处理资源,应该用 AddIfNotPresent() 函数将资源添加回队列。处理逻辑由 PopProcessFunc 进行执行,具体见代码清单 2-44。
代码清单 2-44
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) { f.lock.Lock() defer f.lock.Unlock() for { for len(f.queue) == 0 { // When the queue is empty, invocation of Pop() is blocked until new item is enqueued. // When Close() is called, the f.closed is set and the condition is broadcasted. // Which causes this loop to continue and return from the Pop(). if f.IsClosed() { return nil, FIFOClosedError } f.cond.Wait() } id := f.queue[0] f.queue = f.queue[1:] item, ok := f.items[id] if f.initialPopulationCount > 0 { f.initialPopulationCount-- } if !ok { // Item may have been deleted subsequently. continue } delete(f.items, id) err := process(item) if e, ok := err.(ErrRequeue); ok { f.addIfNotPresent(id, item) err = e.Err } // Don't need to copyDeltas here, because we're transferring // ownership to the caller. return item, err } } func (f *DeltaFIFO) KeyOf(obj interface{}) (string, error) { if d, ok := obj.(Deltas); ok { if len(d) == 0 { return "", KeyError{obj, ErrZeroLengthDeltasObject} } obj = d.Newest().Object } if d, ok := obj.(DeletedFinalStateUnknown); ok { return d.Key, nil } return f.keyFunc(obj) }
值得注意的是,DeltaFIFO 中用于计算对象键的函数 KeyOf 为什么要先进行一次 Deltas 的类型转换呢?是因为 Pop 出去的对象很可能还要再添加进来(比如处理失败需要再放进来),此时添加的对象就是已经封装好的 Delta 对象了。至此,已实现 DeltaFIFO 的基本功能。
3. Client-go Indexer
资源对象从 DeltaFIFO 中 Pop 出去后又经过了哪些处理呢。这要从一开始的 sharedIndex Informer 说起。注意,在 sharedIndexInformer 的 Run 方法中,初始化了它的配置,并执行了 s.controller.Run 方法。我们可以看到 s.controller.Run 中初始化了 Reflector,开始了指定资源的 List-Watch 操作,并且同步到了 DeltaFIFO 中,同时执行了 processLoop方 法。 此 时 我 们 可 以 看 到 processLoop 方 法 不 断 从 DeltaFIFO 中 将 资 源 对 象 Pop 出来,并且交给了之前的 c.config.Process 方法进行处理。而 c.config.Process 方法就是sharedIndexInformer 的 HandleDeltas 方法,具体见代码清单 2-45。
代码清单 2-45
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) { ... cfg := &Config{ Queue: fifo, ListerWatcher: s.listerWatcher, ObjectType: s.objectType, FullResyncPeriod: s.resyncCheckPeriod, RetryOnError: false, ShouldResync: s.processor.shouldResync, Process: s.HandleDeltas, WatchErrorHandler: s.watchErrorHandler, } func() { s.startedLock.Lock() defer s.startedLock.Unlock() s.controller = New(cfg) s.controller.(*controller).clock = s.clock s.started = true }() ... s.controller.Run(stopCh) } func (c *controller) Run(stopCh <-chan struct{}) { defer utilruntime.HandleCrash() go func() { <-stopCh c.config.Queue.Close() }() r := NewReflector( c.config.ListerWatcher, c.config.ObjectType, c.config.Queue, c.config.FullResyncPeriod, ) r.ShouldResync = c.config.ShouldResync r.clock = c.clock c.reflectorMutex.Lock() c.reflector = r c.reflectorMutex.Unlock() ... wait.Until(c.processLoop, time.Second, stopCh) } func (c *controller) processLoop() { for { obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process)) if err != nil { if err == FIFOClosedError { return } if c.config.RetryOnError { // This is the safe way to re-enqueue. c.config.Queue.AddIfNotPresent(obj) } } } }
综上可知,由 DeltaFIFO 中 Pop 出来的对象最后交给了 HandleDeltas 进行处理,而在 HandleDeltas 中,将资源对象同步到了 Indexer 中,至此我们引出了 Informer 模块中的第 3 个组件 Indexer。Indexer 是 Client-go 中实现的一个本地存储,它可以建立索引并存储 Resource 的对象。Reflector 通过 DeltaFIFO Queue 将资源对象存储到 Indexer 中。需要注意的是,Indexer 中的数据与 ETCD 中的数据是完全一致的,当 Client-go 需要数据时,无须每次都从 APIServer 中获取,从而减轻了请求过多造成的对 APIServer 的压力,具体见代码清单 2-46。
代码清单 2-46
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error { s.blockDeltas.Lock() defer s.blockDeltas.Unlock() // from oldest to newest for _, d := range obj.(Deltas) { switch d.Type { case Sync, Replaced, Added, Updated: s.cacheMutationDetector.AddObject(d.Object) if old, exists, err := s.indexer.Get(d.Object); err == nil && exists { if err := s.indexer.Update(d.Object); err != nil { return err } isSync := false switch { case d.Type == Sync: // Sync events are only propagated to listeners that requested resync isSync = true case d.Type == Replaced: if accessor, err := meta.Accessor(d.Object); err == nil { if oldAccessor, err := meta.Accessor(old); err == nil { // Replaced events that didn't change resourceVersion are treated as resync events // and only propagated to listeners that requested resync isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion() } } } s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync) } else { if err := s.indexer.Add(d.Object); err != nil { return err } s.processor.distribute(addNotification{newObj: d.Object}, false) } case Deleted: if err := s.indexer.Delete(d.Object); err != nil { return err } s.processor.distribute(deleteNotification{oldObj: d.Object}, false) } } return nil }
Indexer 是如何实现存储并快速查找资源的呢?我们先看一下 Indexer 接口提供的功能。Cache 是 Indexer 的一种非常经典的实现,所有的对象缓存在内存中,而且从 Cache这个类型的名称来看它属于包内私有类型,外部无法直接使用,只能通过专用的函数创建。这里的 Store、Indexer 使用了一个 threadSafeMap 来保证并发安全的存储。它拥有存储相关的增、删、改、查等方法。threadSafeMap 继承了 Store 接口,而 Indexer 扩展了 threadSafeMap,为 threadSafeMap 提供了索引操作。threadSafeMap 其实只能够存储和索引。存储即将 runtime.object 存储到 Items 的 Map 中; 索引即为 Items 的 Map建立三层索引:Indices Map 类型索引(如 namespace、nodeName 等);Index Map 类型索引(如 namespace1、namespace2……);runtime.object 类型索引,实现见代码清单 2-47。
代码清单 2-47
type Indexer interface { Store // indexName 索引类,obj 是对象,计算 obj 在 indexName 索引类中的索引键,通过索引键 获取所有的对象 // 基本就是获取符合 obj 特征的所有对象,所谓的特征就是对象在索引类中的索引键 Index(indexName string, obj interface{}) ([]interface{}, error) // indexKey 是 indexName 索引类中的一个索引键,函数返回 indexKey 指定的所有对象键 IndexKeys(indexName, indexedValue string) ([]string, error) // 获取 indexName 索引类中的所有索引键 ListIndexFuncValues(indexName string) []string // 这个函数和 Index 类似,只是返回值不是对象键,而是所有对象 ByIndex(indexName, indexedValue string) ([]interface{}, error) // 返回 Indexers GetIndexers() Indexers // 添加 Indexers,就是增加更多的索引分类 AddIndexers(newIndexers Indexers) error }
在 Kubernetes 中使用的比较多的索引函数是 MetaNamespaceIndexFunc()(代码位置 :client-go/tools/cache/index.go),Indexer 索引的实现是通过 index.ByIndex 来完成的,index.ByIndex 的实现见代码清单 2-48。这个函数返回了符合索引函数的值的对象列表。
代码清单 2-48
func (c *threadSafeMap) ByIndex(indexName, indexKey string) ([]interface{}, error) { c.lock.RLock() defer c.lock.RUnlock() indexFunc := c.indexers[indexName] if indexFunc == nil { return nil, fmt.Errorf("Index with name %s does not exist", indexName) } index := c.indices[indexName] set := index[indexKey] list := make([]interface{}, 0, set.Len()) for _, key := range set.List() { list = append(list, c.items[key]) } return list, nil }
上述方法接收两个参数:indexName(索引器的名称)和 indexedValue(需要索引的Key)。首先根据索引器名称查找指定的索引器函数(c.indexers[indexName]) ;然后根据索引器 名 称 查 找 相 应的缓存器函数(c.indices[indexName]) ;最后根据索引 Key(indexedValue)从缓存中进行数据查询,并返回查询结果。