【K8s源码品读】010:Phase 1 - kube-scheduler - Informer是如何保存数据的

简介: 了解Informer在发现资源变化后,是怎么处理的

聚焦目标

了解Informer在发现资源变化后,是怎么处理的

目录

  1. 查看消费的过程
  2. 掌握Index数据结构
  3. 信息的分发distribute
  4. Informer的综合思考

Process

func (c *controller) processLoop() {
   
    for {
   
    // Pop出Object元素
        obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
        if err != nil {
   
            if err == ErrFIFOClosed {
   
                return
            }
            if c.config.RetryOnError {
   
                // 重新进队列
                c.config.Queue.AddIfNotPresent(obj)
            }
        }
    }
}

// 去查看Pop的具体实现
func (f *FIFO) Pop(process PopProcessFunc) (interface{
   }, error) {
   
    f.lock.Lock()
    defer f.lock.Unlock()
    for {
   
        // 调用process去处理item,然后返回
        item, ok := f.items[id]
        delete(f.items, id)
        err := process(item)
        return item, err
    }
}

// 然后去查一下 PopProcessFunc 的定义,在创建controller前
cfg := &Config{
   
        Process:           s.HandleDeltas,
    }

func (s *sharedIndexInformer) HandleDeltas(obj interface{
   }) error {
   
    s.blockDeltas.Lock()
    defer s.blockDeltas.Unlock()

    for _, d := range obj.(Deltas) {
   
        switch d.Type {
   
    // 增、改、替换、同步
        case Sync, Replaced, Added, Updated:
            s.cacheMutationDetector.AddObject(d.Object)
      // 先去indexer查询
            if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
   
        // 如果数据已经存在,就执行Update逻辑
                if err := s.indexer.Update(d.Object); err != nil {
   
                    return err
                }

                isSync := false
                switch {
   
                case d.Type == Sync:
                    isSync = true
                case d.Type == Replaced:
                    if accessor, err := meta.Accessor(d.Object); err == nil {
   
                            isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion()
                        }
                    }
                }
          // 分发Update事件
                s.processor.distribute(updateNotification{
   oldObj: old, newObj: d.Object}, isSync)
            } else {
   
          // 没查到数据,就执行Add操作
                if err := s.indexer.Add(d.Object); err != nil {
   
                    return err
                }
          // 分发 Add 事件
                s.processor.distribute(addNotification{
   newObj: d.Object}, false)
            }
       // 删除
        case Deleted:
        // 去indexer删除
            if err := s.indexer.Delete(d.Object); err != nil {
   
                return err
            }
        // 分发 delete 事件
            s.processor.distribute(deleteNotification{
   oldObj: d.Object}, false)
        }
    }
    return nil
}

Index

Index 的定义为资源的本地存储,保持与etcd中的资源信息一致。

// 我们去看看Index是怎么创建的
func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
   
    realClock := &clock.RealClock{
   }
    sharedIndexInformer := &sharedIndexInformer{
   
        processor:                       &sharedProcessor{
   clock: realClock},
    // indexer 的初始化
        indexer:                         NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
        listerWatcher:                   lw,
        objectType:                      exampleObject,
        resyncCheckPeriod:               defaultEventHandlerResyncPeriod,
        defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
        cacheMutationDetector:           NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)),
        clock:                           realClock,
    }
    return sharedIndexInformer
}

// 生成一个map和func组合而成的Indexer
func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer {
   
    return &cache{
   
        cacheStorage: NewThreadSafeStore(indexers, Indices{
   }),
        keyFunc:      keyFunc,
}

// ThreadSafeStore的底层是一个并发安全的map,具体实现我们暂不考虑
func NewThreadSafeStore(indexers Indexers, indices Indices) ThreadSafeStore {
   
    return &threadSafeMap{
   
        items:    map[string]interface{
   }{
   },
        indexers: indexers,
        indices:  indices,
    }
}

distribute

// 在上面的Process代码中,我们看到了将数据存储到Indexer后,调用了一个分发的函数
s.processor.distribute()

// 分发process的创建
func NewSharedIndexInformer() SharedIndexInformer {
   
    sharedIndexInformer := &sharedIndexInformer{
   
        processor:                       &sharedProcessor{
   clock: realClock},
    }
    return sharedIndexInformer
}

// sharedProcessor的结构
type sharedProcessor struct {
   
    listenersStarted bool
     // 读写锁
    listenersLock    sync.RWMutex
  // 普通监听列表
    listeners        []*processorListener
  // 同步监听列表
    syncingListeners []*processorListener
    clock            clock.Clock
    wg               wait.Group
}

// 查看distribute函数
func (p *sharedProcessor) distribute(obj interface{
   }, sync bool) {
   
    p.listenersLock.RLock()
    defer p.listenersLock.RUnlock()
    // 将object分发到 同步监听 或者 普通监听 的列表
    if sync {
   
        for _, listener := range p.syncingListeners {
   
            listener.add(obj)
        }
    } else {
   
        for _, listener := range p.listeners {
   
            listener.add(obj)
        }
    }
}

// 这个add的操作是利用了channel
func (p *processorListener) add(notification interface{
   }) {
   
    p.addCh <- notification
}

Summary

  1. Informer 依赖于 Reflector 模块,它有个组件为 xxxInformer,如 podInformer
  2. 具体资源的 Informer 包含了一个连接到kube-apiserverclient,通过ListWatch接口查询资源变更情况
  3. 检测到资源发生变化后,通过Controller 将数据放入队列DeltaFIFOQueue里,生产阶段完成
  4. DeltaFIFOQueue的另一端,有消费者在不停地处理资源变化的事件,处理逻辑主要分2步
    1. 将数据保存到本地存储Indexer,它的底层实现是一个并发安全的threadSafeMap
    2. 有些组件需要实时关注资源变化,会实时监听listen,就将事件分发到对应注册上来的listener上,自行处理
相关实践学习
容器服务Serverless版ACK Serverless 快速入门:在线魔方应用部署和监控
通过本实验,您将了解到容器服务Serverless版ACK Serverless 的基本产品能力,即可以实现快速部署一个在线魔方应用,并借助阿里云容器服务成熟的产品生态,实现在线应用的企业级监控,提升应用稳定性。
云原生实践公开课
课程大纲 开篇:如何学习并实践云原生技术 基础篇: 5 步上手 Kubernetes 进阶篇:生产环境下的 K8s 实践 相关的阿里云产品:容器服务&nbsp;ACK 容器服务&nbsp;Kubernetes&nbsp;版(简称&nbsp;ACK)提供高性能可伸缩的容器应用管理能力,支持企业级容器化应用的全生命周期管理。整合阿里云虚拟化、存储、网络和安全能力,打造云端最佳容器化应用运行环境。 了解产品详情:&nbsp;https://www.aliyun.com/product/kubernetes
目录
相关文章
|
6月前
|
存储 Kubernetes Linux
Kubernetes 集群使用 GlusterFS 作为数据持久化存储
Kubernetes 集群使用 GlusterFS 作为数据持久化存储
61 0
|
7月前
|
Kubernetes 数据管理 容器
|
7月前
|
Kubernetes 应用服务中间件 nginx
|
7月前
|
Kubernetes 算法 调度
|
7天前
|
运维 Kubernetes 监控
Kubernetes 集群的持续性能优化实践
【4月更文挑战第26天】 在动态且不断增长的云计算环境中,维护高性能的 Kubernetes 集群是一个挑战。本文将探讨一系列实用的策略和工具,旨在帮助运维专家监控、分析和优化 Kubernetes 集群的性能。我们将讨论资源分配的最佳实践,包括 CPU 和内存管理,以及集群规模调整的策略。此外,文中还将介绍延迟和吞吐量的重要性,并提供日志和监控工具的使用技巧,以实现持续改进的目标。
|
9天前
|
存储 运维 Kubernetes
Kubernetes 集群的监控与维护策略
【4月更文挑战第23天】 在微服务架构日益盛行的当下,容器编排工具如 Kubernetes 成为了运维工作的重要环节。然而,随着集群规模的增长和复杂性的提升,如何确保 Kubernetes 集群的高效稳定运行成为了一大挑战。本文将深入探讨 Kubernetes 集群的监控要点、常见问题及解决方案,并提出一系列切实可行的维护策略,旨在帮助运维人员有效管理和维护 Kubernetes 环境,保障服务的持续可用性和性能优化。
|
11天前
|
存储 运维 Kubernetes
Kubernetes 集群的持续性能优化实践
【4月更文挑战第22天】在动态且复杂的微服务架构中,确保 Kubernetes 集群的高性能运行是至关重要的。本文将深入探讨针对 Kubernetes 集群性能优化的策略与实践,从节点资源配置、网络优化到应用部署模式等多个维度展开,旨在为运维工程师提供一套系统的性能调优方法论。通过实际案例分析与经验总结,读者可以掌握持续优化 Kubernetes 集群性能的有效手段,以适应不断变化的业务需求和技术挑战。
|
21天前
|
运维 Kubernetes 监控
Kubernetes 集群的监控与维护策略
【4月更文挑战第12天】在微服务架构日益普及的当下,Kubernetes 作为容器编排的事实标准,承载着运行和管理大量服务的重要职责。本文将深入探讨 Kubernetes 集群的监控要点,并提出一系列切实可行的维护策略,旨在帮助运维人员确保集群的稳定性和性能优化。
|
23天前
|
Kubernetes 搜索推荐 网络协议
使用 kubeadm 部署 Kubernetes 集群(三)kubeadm 初始化 k8s 证书过期解决方案
使用 kubeadm 部署 Kubernetes 集群(三)kubeadm 初始化 k8s 证书过期解决方案
37 8
|
2天前
|
运维 Kubernetes 监控
Kubernetes 集群的监控与维护策略
【4月更文挑战第30天】 在现代云计算环境中,容器化技术已成为应用程序部署和管理的重要手段。其中,Kubernetes 作为一个开源的容器编排平台,以其强大的功能和灵活性受到广泛欢迎。然而,随之而来的是对 Kubernetes 集群监控和维护的复杂性增加。本文将探讨针对 Kubernetes 集群的监控策略和维护技巧,旨在帮助运维人员确保集群的稳定性和高效性。通过分析常见的性能瓶颈、故障诊断方法以及自动化维护工具的应用,我们将提供一套实用的解决方案,以优化 Kubernetes 环境的性能和可靠性。