【K8s源码品读】013:Phase 1 - kubelet - 节点上控制容器生命周期的管理者

简介: 理解 kubelet 的运行机制

聚焦目标

理解 kubelet 的运行机制

目录

  1. 运行的主函数
  2. 运行kubelet
  3. 核心数据管理Kubelet
  4. 同步循环
  5. 处理pod的同步工作
  6. 总结

Run

从主函数找到run函数,代码较长,我精简了一下

func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate) (err error) {
   
    // 一长串的配置初始化与验证

  // done channel,用来通知运行结束
    done := make(chan struct{
   })

    // 注册到configz模块
    err = initConfigz(&s.KubeletConfiguration)
    if err != nil {
   
        klog.Errorf("unable to register KubeletConfiguration with configz, error: %v", err)
    }

  // 获取节点的相关信息
    hostName, err := nodeutil.GetHostname(s.HostnameOverride)
    if err != nil {
   
        return err
    }
    nodeName, err := getNodeName(kubeDeps.Cloud, hostName)
    if err != nil {
   
        return err
    }

    switch {
   
  // 独立运行模式
    case standaloneMode:
    // 对客户端进行初始化
    case kubeDeps.KubeClient == nil, kubeDeps.EventClient == nil, kubeDeps.HeartbeatClient == nil:
    }

  // cgroup 相关初始化
    var cgroupRoots []string
    nodeAllocatableRoot := cm.NodeAllocatableRoot(s.CgroupRoot, s.CgroupsPerQOS, s.CgroupDriver)
    cgroupRoots = append(cgroupRoots, nodeAllocatableRoot)
    kubeletCgroup, err := cm.GetKubeletContainer(s.KubeletCgroups)
    if err != nil {
   
        klog.Warningf("failed to get the kubelet's cgroup: %v.  Kubelet system container metrics may be missing.", err)
    } else if kubeletCgroup != "" {
   
        cgroupRoots = append(cgroupRoots, kubeletCgroup)
    }

    runtimeCgroup, err := cm.GetRuntimeContainer(s.ContainerRuntime, s.RuntimeCgroups)
    if err != nil {
   
        klog.Warningf("failed to get the container runtime's cgroup: %v. Runtime system container metrics may be missing.", err)
    } else if runtimeCgroup != "" {
   
        cgroupRoots = append(cgroupRoots, runtimeCgroup)
    }

    if s.SystemCgroups != "" {
   
        cgroupRoots = append(cgroupRoots, s.SystemCgroups)
    }

  // 下面一大块都是对 ContainerManager 的初始化
    if kubeDeps.ContainerManager == nil {
   
        if s.CgroupsPerQOS && s.CgroupRoot == "" {
   
            klog.Info("--cgroups-per-qos enabled, but --cgroup-root was not specified.  defaulting to /")
            s.CgroupRoot = "/"
        }

        // cpu相关信息
        var reservedSystemCPUs cpuset.CPUSet

    // ContainerManager的实例化
        kubeDeps.ContainerManager, err = cm.NewContainerManager(
            kubeDeps.Mounter,
            kubeDeps.CAdvisorInterface,
      // Node 相关配置
            cm.NodeConfig{
   },
            s.FailSwapOn,
            devicePluginEnabled,
            kubeDeps.Recorder)

        if err != nil {
   
            return err
        }
    }

    // 内存OOM相关
    oomAdjuster := kubeDeps.OOMAdjuster
    if err := oomAdjuster.ApplyOOMScoreAdj(0, int(s.OOMScoreAdj)); err != nil {
   
        klog.Warning(err)
    }

    // 预初始化Runtime
    err = kubelet.PreInitRuntimeService(&s.KubeletConfiguration,
        kubeDeps, &s.ContainerRuntimeOptions,
        s.ContainerRuntime,
        s.RuntimeCgroups,
        s.RemoteRuntimeEndpoint,
        s.RemoteImageEndpoint,
        s.NonMasqueradeCIDR)
    if err != nil {
   
        return err
    }

  // 运行Kubelet
    if err := RunKubelet(s, kubeDeps, s.RunOnce); err != nil {
   
        return err
    }

    // 通知deamon的systemd
    go daemon.SdNotify(false, "READY=1")

  // 阻塞
    select {
   
    case <-done:
        break
    case <-ctx.Done():
        break
    }

    return nil
}

RunKubelet

func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencies, runOnce bool) error {
   
    // 获取节点信息
  hostname, err := nodeutil.GetHostname(kubeServer.HostnameOverride)
    if err != nil {
   
        return err
    }
    nodeName, err := getNodeName(kubeDeps.Cloud, hostname)
    if err != nil {
   
        return err
    }
    hostnameOverridden := len(kubeServer.HostnameOverride) > 0

  // 创建并初始化 kubelet
    k, err := createAndInitKubelet()
    if err != nil {
   
        return fmt.Errorf("failed to create kubelet: %v", err)
    }

    if runOnce {
   
        if _, err := k.RunOnce(podCfg.Updates()); err != nil {
   
            return fmt.Errorf("runonce failed: %v", err)
        }
        klog.Info("Started kubelet as runonce")
    } else {
   
    // 开始kubelet
        startKubelet(k, podCfg, &kubeServer.KubeletConfiguration, kubeDeps, kubeServer.EnableCAdvisorJSONEndpoints, kubeServer.EnableServer)
        klog.Info("Started kubelet")
    }
    return nil
}

// 开始运行,都是并发的
func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, enableCAdvisorJSONEndpoints, enableServer bool) {
   
    // 运行
    go k.Run(podCfg.Updates())

    // 开启kubelet的http服务端
    if enableServer {
   
        go k.ListenAndServe(net.ParseIP(kubeCfg.Address), uint(kubeCfg.Port), kubeDeps.TLSOptions, kubeDeps.Auth,
            enableCAdvisorJSONEndpoints, kubeCfg.EnableDebuggingHandlers, kubeCfg.EnableContentionProfiling, kubeCfg.EnableSystemLogHandler)

    }
  // 只读端口
    if kubeCfg.ReadOnlyPort > 0 {
   
        go k.ListenAndServeReadOnly(net.ParseIP(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort), enableCAdvisorJSONEndpoints)
    }
    if utilfeature.DefaultFeatureGate.Enabled(features.KubeletPodResources) {
   
        go k.ListenAndServePodResources()
    }
}

Kubelet

// 这里的k是一个interface定义,我们需要回头看看
type Bootstrap interface {
   
    GetConfiguration() kubeletconfiginternal.KubeletConfiguration
    BirthCry()
    StartGarbageCollection()
    ListenAndServe(address net.IP, port uint, tlsOptions *server.TLSOptions, auth server.AuthInterface, enableCAdvisorJSONEndpoints, enableDebuggingHandlers, enableContentionProfiling, enableSystemLogHandler bool)
    ListenAndServeReadOnly(address net.IP, port uint, enableCAdvisorJSONEndpoints bool)
    ListenAndServePodResources()
    Run(<-chan kubetypes.PodUpdate)
    RunOnce(<-chan kubetypes.PodUpdate) ([]RunPodResult, error)
}

// 查看对应的实例化函数
func createAndInitKubelet() (k kubelet.Bootstrap, err error) {
   
    k, err = kubelet.NewMainKubelet()
    return k, nil
}

func NewMainKubelet() (*Kubelet, error) {
   
    // 参数的初始化

  // klet 的实例化结构
    klet := &Kubelet{
   }

  // 下面是klet中各种参数的填充
    return klet, nil
}

func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
   
    // 内部模块的初始化
    if err := kl.initializeModules(); err != nil {
   
        kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.KubeletSetupFailed, err.Error())
        klog.Fatal(err)
    }

    go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)

    if kl.kubeClient != nil {
   
    // 与kube-apiserver同步节点状态
        go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop)
        go kl.fastStatusUpdateOnce()
        go kl.nodeLeaseController.Run(wait.NeverStop)
    }
    go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)

    if kl.makeIPTablesUtilChains {
   
        kl.initNetworkUtil()
    }

  // 一个kill pod的goroutine
    go wait.Until(kl.podKiller.PerformPodKillingWork, 1*time.Second, wait.NeverStop)

    kl.statusManager.Start()
    kl.probeManager.Start()

    if kl.runtimeClassManager != nil {
   
        kl.runtimeClassManager.Start(wait.NeverStop)
    }

    kl.pleg.Start()
  // 同步的主逻辑
    kl.syncLoop(updates, kl)
}

syncLoop

func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
   
  // 开始运行kubelet的主同步循环
    klog.Info("Starting kubelet main sync loop.")

  // ticker每秒一次
    syncTicker := time.NewTicker(time.Second)
    defer syncTicker.Stop()
  // housekeeping 清理周期
    housekeepingTicker := time.NewTicker(housekeepingPeriod)
    defer housekeepingTicker.Stop()

    for {
   
        kl.syncLoopMonitor.Store(kl.clock.Now())
    // 同步
        if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
   
            break
        }
        kl.syncLoopMonitor.Store(kl.clock.Now())
    }
}

// 这里的3个channel比较重要:configCh用于配置,syncCh用于触发同步,housekeepingCh用于触发清理
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
    syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
   
    select {
   
    case u, open := <-configCh:
    // config channel关闭
        if !open {
   
            klog.Errorf("Update channel is closed. Exiting the sync loop.")
            return false
        }
        // 对应不同的操作
        switch u.Op {
   
        case kubetypes.ADD:
            klog.V(2).Infof("SyncLoop (ADD, %q): %q", u.Source, format.Pods(u.Pods))
            handler.HandlePodAdditions(u.Pods)
        case kubetypes.UPDATE:
            klog.V(2).Infof("SyncLoop (UPDATE, %q): %q", u.Source, format.PodsWithDeletionTimestamps(u.Pods))
            handler.HandlePodUpdates(u.Pods)
        case kubetypes.REMOVE:
            klog.V(2).Infof("SyncLoop (REMOVE, %q): %q", u.Source, format.Pods(u.Pods))
            handler.HandlePodRemoves(u.Pods)
        case kubetypes.RECONCILE:
            klog.V(4).Infof("SyncLoop (RECONCILE, %q): %q", u.Source, format.Pods(u.Pods))
            handler.HandlePodReconcile(u.Pods)
        case kubetypes.DELETE:
            klog.V(2).Infof("SyncLoop (DELETE, %q): %q", u.Source, format.Pods(u.Pods))
            handler.HandlePodUpdates(u.Pods)
        case kubetypes.SET:
            klog.Errorf("Kubelet does not support snapshot update")
        default:
            klog.Errorf("Invalid event type received: %d.", u.Op)
        }

        kl.sourcesReady.AddSource(u.Source)

    case e := <-plegCh:

    case <-syncCh:
        // 获取需要同步的pod,里面的逻辑暂不细看
    // 我们在这里接收到示例中要创建的nginx pod
        podsToSync := kl.getPodsToSync()
        if len(podsToSync) == 0 {
   
            break
        }
        klog.V(4).Infof("SyncLoop (SYNC): %d pods; %s", len(podsToSync), format.Pods(podsToSync))
    // 开始处理
        handler.HandlePodSyncs(podsToSync)
    case update := <-kl.livenessManager.Updates():

    case <-housekeepingCh:
        if !kl.sourcesReady.AllReady() {
   
      // 清理没有ready,直接跳过
            klog.V(4).Infof("SyncLoop (housekeeping, skipped): sources aren't ready yet.")
        } else {
   
      // 开始清理pod
            klog.V(4).Infof("SyncLoop (housekeeping)")
            if err := handler.HandlePodCleanups(); err != nil {
   
                klog.Errorf("Failed cleaning pods: %v", err)
            }
        }
    }
    return true
}

handler

往前查找代码,handler就是Kubelet

func (kl *Kubelet) HandlePodSyncs(pods []*v1.Pod) {
   
    start := kl.clock.Now()
    for _, pod := range pods {
   
    // 获取pod,然后分发
        mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
        kl.dispatchWork(pod, kubetypes.SyncPodSync, mirrorPod, start)
    }
}

func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
   
    // 调用UpdatePod的函数
    kl.podWorkers.UpdatePod(&UpdatePodOptions{
   
        Pod:        pod,
        MirrorPod:  mirrorPod,
        UpdateType: syncType,
        OnCompleteFunc: func(err error) {
   
            if err != nil {
   
                metrics.PodWorkerDuration.WithLabelValues(syncType.String()).Observe(metrics.SinceInSeconds(start))
            }
        },
    })
}

// 查到初始化的地方     klet.podWorkers = newPodWorkers(klet.syncPod, kubeDeps.Recorder, klet.workQueue, klet.resyncInterval, backOffPeriod, klet.podCache)
func (p *podWorkers) UpdatePod(options *UpdatePodOptions) {
   
    pod := options.Pod
    uid := pod.UID
    var podUpdates chan UpdatePodOptions
    var exists bool

    p.podLock.Lock()
    defer p.podLock.Unlock()
  // 当pod不存在时,满足示例,是新建的pod
    if podUpdates, exists = p.podUpdates[uid]; !exists {
   
        podUpdates = make(chan UpdatePodOptions, 1)
        p.podUpdates[uid] = podUpdates

    // 并发处理
        go func() {
   
            defer runtime.HandleCrash()
            p.managePodLoop(podUpdates)
        }()
    }
    if !p.isWorking[pod.UID] {
   
        p.isWorking[pod.UID] = true
        podUpdates <- *options
    } else {
   
        update, found := p.lastUndeliveredWorkUpdate[pod.UID]
        if !found || update.UpdateType != kubetypes.SyncPodKill {
   
            p.lastUndeliveredWorkUpdate[pod.UID] = *options
        }
    }
}

func (p *podWorkers) managePodLoop(podUpdates <-chan UpdatePodOptions) {
   
    var lastSyncTime time.Time
    for update := range podUpdates {
   
        err := func() error {
   
      // 同步pod的函数
            err = p.syncPodFn(syncPodOptions{
   
                mirrorPod:      update.MirrorPod,
                pod:            update.Pod,
                podStatus:      status,
                killPodOptions: update.KillPodOptions,
                updateType:     update.UpdateType,
            })
            lastSyncTime = time.Now()
            return err
        }()

        p.wrapUp(update.Pod.UID, err)
    }
}

// 找到syncPodFn被实例化的函数
func (kl *Kubelet) syncPod(o syncPodOptions) error {
   

  // 这里有一长串逻辑,不方便阅读,我们只关注最核心的部分

  // 调用 container runtime进行创建pod,再往下就是容器相关了
    result := kl.containerRuntime.SyncPod(pod, podStatus, pullSecrets, kl.backOff)
    kl.reasonCache.Update(pod.UID, result)
    if err := result.Error(); err != nil {
   
        for _, r := range result.SyncResults {
   
            if r.Error != kubecontainer.ErrCrashLoopBackOff && r.Error != images.ErrImagePullBackOff {
   
                return err
            }
        }
        return nil
    }
    return nil
}

Summary

  1. kubelet是kubernetes的Node节点上的管理者

  2. kubelet接收来自kube-apiserver上的pod消息,用Ticker这种周期性的方式触发同步函数

  3. kubelet会异步地对容器进行管理,调用对应容器的接口(Container Runtime Interface)
相关实践学习
通过Ingress进行灰度发布
本场景您将运行一个简单的应用,部署一个新的应用用于新的发布,并通过Ingress能力实现灰度发布。
容器应用与集群管理
欢迎来到《容器应用与集群管理》课程,本课程是“云原生容器Clouder认证“系列中的第二阶段。课程将向您介绍与容器集群相关的概念和技术,这些概念和技术可以帮助您了解阿里云容器服务ACK/ACK Serverless的使用。同时,本课程也会向您介绍可以采取的工具、方法和可操作步骤,以帮助您了解如何基于容器服务ACK Serverless构建和管理企业级应用。 学习完本课程后,您将能够: 掌握容器集群、容器编排的基本概念 掌握Kubernetes的基础概念及核心思想 掌握阿里云容器服务ACK/ACK Serverless概念及使用方法 基于容器服务ACK Serverless搭建和管理企业级网站应用
目录
相关文章
|
6天前
|
Perl
|
2天前
|
Kubernetes Linux 虚拟化
入门级容器技术解析:Docker和K8s的区别与关系
本文介绍了容器技术的发展历程及其重要组成部分Docker和Kubernetes。从传统物理机到虚拟机,再到容器化,每一步都旨在更高效地利用服务器资源并简化应用部署。容器技术通过隔离环境、减少依赖冲突和提高可移植性,解决了传统部署方式中的诸多问题。Docker作为容器化平台,专注于创建和管理容器;而Kubernetes则是一个强大的容器编排系统,用于自动化部署、扩展和管理容器化应用。两者相辅相成,共同推动了现代云原生应用的快速发展。
29 10
|
16天前
|
Prometheus Kubernetes 监控
OpenAI故障复盘 - 阿里云容器服务与可观测产品如何保障大规模K8s集群稳定性
聚焦近日OpenAI的大规模K8s集群故障,介绍阿里云容器服务与可观测团队在大规模K8s场景下我们的建设与沉淀。以及分享对类似故障问题的应对方案:包括在K8s和Prometheus的高可用架构设计方面、事前事后的稳定性保障体系方面。
|
2月前
|
运维 Kubernetes Shell
【赵渝强老师】K8s中Pod的临时容器
Pod 是 Kubernetes 中的基本调度单位,由一个或多个容器组成,包括业务容器、基础容器、初始化容器和临时容器。临时容器用于故障排查和性能诊断,不适用于构建应用程序。当 Pod 中的容器异常退出或容器镜像不包含调试工具时,临时容器非常有用。文中通过示例展示了如何使用 `kubectl debug` 命令创建临时容器进行调试。
|
2月前
|
Kubernetes 调度 容器
【赵渝强老师】K8s中Pod中的业务容器
Pod 是 Kubernetes 中的基本调度单元,由一个或多个容器组成。除了业务容器,Pod 还包括基础容器、初始化容器和临时容器。本文通过示例介绍如何创建包含业务容器的 Pod,并提供了一个视频讲解。示例中创建了一个名为 &quot;busybox-container&quot; 的业务容器,并使用 `kubectl create -f firstpod.yaml` 命令部署 Pod。
|
2月前
|
Kubernetes 容器 Perl
【赵渝强老师】K8s中Pod中的初始化容器
Kubernetes的Pod包含业务容器、基础容器、初始化容器和临时容器。初始化容器在业务容器前运行,用于执行必要的初始化任务。本文介绍了初始化容器的作用、配置方法及优势,并提供了一个示例。
|
3天前
|
缓存 容灾 网络协议
ACK One多集群网关:实现高效容灾方案
ACK One多集群网关可以帮助您快速构建同城跨AZ多活容灾系统、混合云同城跨AZ多活容灾系统,以及异地容灾系统。
|
14天前
|
Kubernetes Ubuntu 网络安全
ubuntu使用kubeadm搭建k8s集群
通过以上步骤,您可以在 Ubuntu 系统上使用 kubeadm 成功搭建一个 Kubernetes 集群。本文详细介绍了从环境准备、安装 Kubernetes 组件、初始化集群到管理和使用集群的完整过程,希望对您有所帮助。在实际应用中,您可以根据具体需求调整配置,进一步优化集群性能和安全性。
61 12
|
18天前
|
Kubernetes 网络协议 应用服务中间件
Kubernetes Ingress:灵活的集群外部网络访问的利器
《Kubernetes Ingress:集群外部访问的利器-打造灵活的集群网络》介绍了如何通过Ingress实现Kubernetes集群的外部访问。前提条件是已拥有Kubernetes集群并安装了kubectl工具。文章详细讲解了Ingress的基本组成(Ingress Controller和资源对象),选择合适的版本,以及具体的安装步骤,如下载配置文件、部署Nginx Ingress Controller等。此外,还提供了常见问题的解决方案,例如镜像下载失败的应对措施。最后,通过部署示例应用展示了Ingress的实际使用方法。
36 2
|
30天前
|
存储 Kubernetes 关系型数据库
阿里云ACK备份中心,K8s集群业务应用数据的一站式灾备方案
本文源自2024云栖大会苏雅诗的演讲,探讨了K8s集群业务为何需要灾备及其重要性。文中强调了集群与业务高可用配置对稳定性的重要性,并指出人为误操作等风险,建议实施周期性和特定情况下的灾备措施。针对容器化业务,提出了灾备的新特性与需求,包括工作负载为核心、云资源信息的备份,以及有状态应用的数据保护。介绍了ACK推出的备份中心解决方案,支持命名空间、标签、资源类型等维度的备份,并具备存储卷数据保护功能,能够满足GitOps流程企业的特定需求。此外,还详细描述了备份中心的使用流程、控制台展示、灾备难点及解决方案等内容,展示了备份中心如何有效应对K8s集群资源和存储卷数据的灾备挑战。