【K8s源码品读】010:Phase 1 - kube-scheduler - Informer是如何保存数据的

本文涉及的产品
容器服务 Serverless 版 ACK Serverless,952元额度 多规格
容器服务 Serverless 版 ACK Serverless,317元额度 多规格
简介: 了解Informer在发现资源变化后,是怎么处理的

聚焦目标

了解Informer在发现资源变化后,是怎么处理的

目录

  1. 查看消费的过程
  2. 掌握Index数据结构
  3. 信息的分发distribute
  4. Informer的综合思考

Process

func (c *controller) processLoop() {
   
    for {
   
    // Pop出Object元素
        obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
        if err != nil {
   
            if err == ErrFIFOClosed {
   
                return
            }
            if c.config.RetryOnError {
   
                // 重新进队列
                c.config.Queue.AddIfNotPresent(obj)
            }
        }
    }
}

// 去查看Pop的具体实现
func (f *FIFO) Pop(process PopProcessFunc) (interface{
   }, error) {
   
    f.lock.Lock()
    defer f.lock.Unlock()
    for {
   
        // 调用process去处理item,然后返回
        item, ok := f.items[id]
        delete(f.items, id)
        err := process(item)
        return item, err
    }
}

// 然后去查一下 PopProcessFunc 的定义,在创建controller前
cfg := &Config{
   
        Process:           s.HandleDeltas,
    }

func (s *sharedIndexInformer) HandleDeltas(obj interface{
   }) error {
   
    s.blockDeltas.Lock()
    defer s.blockDeltas.Unlock()

    for _, d := range obj.(Deltas) {
   
        switch d.Type {
   
    // 增、改、替换、同步
        case Sync, Replaced, Added, Updated:
            s.cacheMutationDetector.AddObject(d.Object)
      // 先去indexer查询
            if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
   
        // 如果数据已经存在,就执行Update逻辑
                if err := s.indexer.Update(d.Object); err != nil {
   
                    return err
                }

                isSync := false
                switch {
   
                case d.Type == Sync:
                    isSync = true
                case d.Type == Replaced:
                    if accessor, err := meta.Accessor(d.Object); err == nil {
   
                            isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion()
                        }
                    }
                }
          // 分发Update事件
                s.processor.distribute(updateNotification{
   oldObj: old, newObj: d.Object}, isSync)
            } else {
   
          // 没查到数据,就执行Add操作
                if err := s.indexer.Add(d.Object); err != nil {
   
                    return err
                }
          // 分发 Add 事件
                s.processor.distribute(addNotification{
   newObj: d.Object}, false)
            }
       // 删除
        case Deleted:
        // 去indexer删除
            if err := s.indexer.Delete(d.Object); err != nil {
   
                return err
            }
        // 分发 delete 事件
            s.processor.distribute(deleteNotification{
   oldObj: d.Object}, false)
        }
    }
    return nil
}

Index

Index 的定义为资源的本地存储,保持与etcd中的资源信息一致。

// 我们去看看Index是怎么创建的
func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
   
    realClock := &clock.RealClock{
   }
    sharedIndexInformer := &sharedIndexInformer{
   
        processor:                       &sharedProcessor{
   clock: realClock},
    // indexer 的初始化
        indexer:                         NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
        listerWatcher:                   lw,
        objectType:                      exampleObject,
        resyncCheckPeriod:               defaultEventHandlerResyncPeriod,
        defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
        cacheMutationDetector:           NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)),
        clock:                           realClock,
    }
    return sharedIndexInformer
}

// 生成一个map和func组合而成的Indexer
func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer {
   
    return &cache{
   
        cacheStorage: NewThreadSafeStore(indexers, Indices{
   }),
        keyFunc:      keyFunc,
}

// ThreadSafeStore的底层是一个并发安全的map,具体实现我们暂不考虑
func NewThreadSafeStore(indexers Indexers, indices Indices) ThreadSafeStore {
   
    return &threadSafeMap{
   
        items:    map[string]interface{
   }{
   },
        indexers: indexers,
        indices:  indices,
    }
}

distribute

// 在上面的Process代码中,我们看到了将数据存储到Indexer后,调用了一个分发的函数
s.processor.distribute()

// 分发process的创建
func NewSharedIndexInformer() SharedIndexInformer {
   
    sharedIndexInformer := &sharedIndexInformer{
   
        processor:                       &sharedProcessor{
   clock: realClock},
    }
    return sharedIndexInformer
}

// sharedProcessor的结构
type sharedProcessor struct {
   
    listenersStarted bool
     // 读写锁
    listenersLock    sync.RWMutex
  // 普通监听列表
    listeners        []*processorListener
  // 同步监听列表
    syncingListeners []*processorListener
    clock            clock.Clock
    wg               wait.Group
}

// 查看distribute函数
func (p *sharedProcessor) distribute(obj interface{
   }, sync bool) {
   
    p.listenersLock.RLock()
    defer p.listenersLock.RUnlock()
    // 将object分发到 同步监听 或者 普通监听 的列表
    if sync {
   
        for _, listener := range p.syncingListeners {
   
            listener.add(obj)
        }
    } else {
   
        for _, listener := range p.listeners {
   
            listener.add(obj)
        }
    }
}

// 这个add的操作是利用了channel
func (p *processorListener) add(notification interface{
   }) {
   
    p.addCh <- notification
}

Summary

  1. Informer 依赖于 Reflector 模块,它有个组件为 xxxInformer,如 podInformer
  2. 具体资源的 Informer 包含了一个连接到kube-apiserverclient,通过ListWatch接口查询资源变更情况
  3. 检测到资源发生变化后,通过Controller 将数据放入队列DeltaFIFOQueue里,生产阶段完成
  4. DeltaFIFOQueue的另一端,有消费者在不停地处理资源变化的事件,处理逻辑主要分2步
    1. 将数据保存到本地存储Indexer,它的底层实现是一个并发安全的threadSafeMap
    2. 有些组件需要实时关注资源变化,会实时监听listen,就将事件分发到对应注册上来的listener上,自行处理
相关实践学习
通过Ingress进行灰度发布
本场景您将运行一个简单的应用,部署一个新的应用用于新的发布,并通过Ingress能力实现灰度发布。
容器应用与集群管理
欢迎来到《容器应用与集群管理》课程,本课程是“云原生容器Clouder认证“系列中的第二阶段。课程将向您介绍与容器集群相关的概念和技术,这些概念和技术可以帮助您了解阿里云容器服务ACK/ACK Serverless的使用。同时,本课程也会向您介绍可以采取的工具、方法和可操作步骤,以帮助您了解如何基于容器服务ACK Serverless构建和管理企业级应用。 学习完本课程后,您将能够: 掌握容器集群、容器编排的基本概念 掌握Kubernetes的基础概念及核心思想 掌握阿里云容器服务ACK/ACK Serverless概念及使用方法 基于容器服务ACK Serverless搭建和管理企业级网站应用
目录
相关文章
|
2月前
|
存储 Kubernetes 关系型数据库
阿里云ACK备份中心,K8s集群业务应用数据的一站式灾备方案
阿里云ACK备份中心,K8s集群业务应用数据的一站式灾备方案
|
2月前
|
Kubernetes 算法 调度
Kubernetes的灵魂核心:kube-scheduler
本文介绍了Kubernetes中关键组件kube-scheduler的工作原理,详细解释了其通过预选和优选过程为Pod选择合适节点的机制,并提供了一个简化的Python示例来模拟这一过程,帮助读者更好地理解和管理Kubernetes集群。
|
3月前
|
存储 Kubernetes Cloud Native
告别数据丢失的噩梦!PersistentVolume全攻略,让你轻松玩转Kubernetes数据持久化秘籍!
【8月更文挑战第25天】随着容器技术的发展,Kubernetes已成为云原生应用的主流部署平台。然而,数据持久化成为一个亟待解决的问题。Kubernetes通过PersistentVolume(PV)提供了解决方案。PV是一种存储资源对象,它抽象出底层存储技术(例如Ceph、GlusterFS或NFS),让用户仅需关注存储容量和访问模式等属性。PV由管理员创建与维护,Pod通过PersistentVolumeClaim(PVC)请求存储资源。本文详细介绍了PV的工作原理、配置方法及示例,帮助读者更好地理解和应用此功能。
128 2
|
3月前
|
存储 Kubernetes 安全
在K8S中,你用的flannel是哪个工作模式及fannel的底层原理如何实现数据报文转发的?
在K8S中,你用的flannel是哪个工作模式及fannel的底层原理如何实现数据报文转发的?
|
3月前
|
C# Windows 开发者
超越选择焦虑:深入解析WinForms、WPF与UWP——谁才是打造顶级.NET桌面应用的终极利器?从开发效率到视觉享受,全面解读三大框架优劣,助你精准匹配项目需求,构建完美桌面应用生态系统
【8月更文挑战第31天】.NET框架为开发者提供了多种桌面应用开发选项,包括WinForms、WPF和UWP。WinForms简单易用,适合快速开发基本应用;WPF提供强大的UI设计工具和丰富的视觉体验,支持XAML,易于实现复杂布局;UWP专为Windows 10设计,支持多设备,充分利用现代硬件特性。本文通过示例代码详细介绍这三种框架的特点,帮助读者根据项目需求做出明智选择。以下是各框架的简单示例代码,便于理解其基本用法。
157 0
|
3月前
|
存储 Kubernetes 调度
在K8S中,是怎么实现数据持久化的?
在K8S中,是怎么实现数据持久化的?
|
3月前
|
Kubernetes 安全 Linux
在K8S中,calico和cilium这两种cni有什么区别?cailico的ipip模型和ciliume的vxlan模型,两种不通模型性能也不同,它们怎么处理数据的?
在K8S中,calico和cilium这两种cni有什么区别?cailico的ipip模型和ciliume的vxlan模型,两种不通模型性能也不同,它们怎么处理数据的?
|
3月前
|
域名解析 Kubernetes 负载均衡
在K8S中,外部访问容器服务,比如说提供了一个域名,链路怎么走?数据经过哪些组件?
在K8S中,外部访问容器服务,比如说提供了一个域名,链路怎么走?数据经过哪些组件?
|
3月前
|
Kubernetes 关系型数据库 MySQL
k8s练习--通过NFS+PV+PVC+POD,部署一个MySQL服务,并将MySQL的数据进行持久化存储
本文档介绍了如何使用Kubernetes (K8s)、NFS、PersistentVolume (PV)、PersistentVolumeClaim (PVC)和Pod来部署并实现MySQL服务的数据持久化存储。Kubernetes是一个用于自动化部署、扩展和管理容器化应用的强大平台。NFS作为一种网络文件系统协议,能够使Kubernetes集群中的Pod跨节点访问共享文件。PV和PVC机制则提供了持久化的存储解决方案,确保数据即使在Pod生命周期结束后仍得以保留。
147 0
|
3月前
|
存储 缓存 Kubernetes
在K8S中,业务Pod数据如何存储?
在K8S中,业务Pod数据如何存储?