Pod Creation Source Code Analysis

Summary: an analysis of the source code behind pod creation.

Hi everyone, I'm Huazai.


I've been working with Kubernetes for more than four years, but mostly as a user: I never got very familiar with its internals or source code, and I wasn't clear on what actually happens behind the commands I run every day. So I recently decided to read through the source of the various k8s modules. First, to deepen my understanding of each module; second, so that when problems come up I can trace them to the root cause, because an argument backed by the code is convincing; and finally, so that a future interviewer's technical questions won't catch me off guard. Today, let's walk through the source code path that creates a pod. If you find mistakes, corrections are welcome (be gentle). The k8s source is available on GitHub; the version read here is 1.21.3.


Main Content


1. In the k8s source, the add/update/delete/query handling for pods is driven by syncLoop() in /pkg/kubelet/kubelet.go, shown below:


// syncLoop is the main loop for processing changes. It watches for changes from
// three channels (file, apiserver, and http) and creates a union of them. For
// any new change seen, will run a sync against desired state and running state. If
// no changes are seen to the configuration, will synchronize the last known desired
// state every sync-frequency seconds. Never returns.
func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
 klog.InfoS("Starting kubelet main sync loop")


Within syncLoop(), the concrete operations on pods are carried out by kl.syncLoopIteration():


kl.syncLoopMonitor.Store(kl.clock.Now())
  if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
   break
  }


2. syncLoopIteration() takes several important parameters, described in the comments below:


// Arguments:
// 1.  configCh:       a channel to read config events from
// 2.  handler:        the SyncHandler to dispatch pods to
// 3.  syncCh:         a channel to read periodic sync events from
// 4.  housekeepingCh: a channel to read housekeeping events from
// 5.  plegCh:         a channel to read PLEG updates from
// * configCh: dispatch the pods for the config change to the appropriate
//             handler callback for the event type
// * plegCh: update the runtime cache; sync pod
// * syncCh: sync all pods waiting for sync
// * housekeepingCh: trigger cleanup of pods
// * health manager: sync pods that have failed or in which one or more
//                     containers have failed health checks
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
 syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
 select {
 case u, open := <-configCh:
  // Update from a config source; dispatch it to the right handler
  // callback.
  if !open {
   klog.ErrorS(nil, "Update channel is closed, exiting the sync loop")
   return false
  }


SyncHandler is an interface containing the methods for the common pod operations, and it is implemented by the Kubelet itself:


// SyncHandler is an interface implemented by Kubelet, for testability
// pod create, update, delete, ...
type SyncHandler interface {
 HandlePodAdditions(pods []*v1.Pod) 
 HandlePodUpdates(pods []*v1.Pod) 
 HandlePodRemoves(pods []*v1.Pod)
 HandlePodReconcile(pods []*v1.Pod)
 HandlePodSyncs(pods []*v1.Pod)
 HandlePodCleanups() error
}
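A minimal, self-contained sketch of this interface-plus-implementation arrangement follows. The fakeKubelet type and the Pod struct are hypothetical; only the method names mirror the real SyncHandler:

```go
// Sketch of the SyncHandler idea: an interface listing pod lifecycle
// callbacks, satisfied by a kubelet-like struct.
package main

import "fmt"

type Pod struct{ Name string }

type SyncHandler interface {
	HandlePodAdditions(pods []*Pod)
	HandlePodUpdates(pods []*Pod)
	HandlePodRemoves(pods []*Pod)
}

type fakeKubelet struct{ log []string }

func (k *fakeKubelet) HandlePodAdditions(pods []*Pod) {
	for _, p := range pods {
		k.log = append(k.log, "add:"+p.Name)
	}
}
func (k *fakeKubelet) HandlePodUpdates(pods []*Pod) {}
func (k *fakeKubelet) HandlePodRemoves(pods []*Pod) {}

// Compile-time check that fakeKubelet satisfies SyncHandler -- the same
// idiom the k8s codebase uses to assert interface conformance.
var _ SyncHandler = (*fakeKubelet)(nil)

func main() {
	k := &fakeKubelet{}
	var h SyncHandler = k
	h.HandlePodAdditions([]*Pod{{Name: "nginx"}})
	fmt.Println(k.log) // [add:nginx]
}
```

Callers like syncLoopIteration only ever see the interface, which is what makes the kubelet's dispatch logic testable with a fake handler.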


3. The operations that can be performed on a pod are listed below; each operation maps to a handler method. ADD, for example, is dispatched to HandlePodAdditions:


// These constants identify the PodOperations that can be made on a pod configuration.
const (
 // SET is the current pod configuration.
 SET PodOperation = iota
 // ADD signifies pods that are new to this source.
 ADD
 // DELETE signifies pods that are gracefully deleted from this source.
 DELETE
 // REMOVE signifies pods that have been removed from this source.
 REMOVE
 // UPDATE signifies pods have been updated in this source.
 UPDATE
 // RECONCILE signifies pods that have unexpected status in this source,
 // kubelet should reconcile status with this source.
 RECONCILE
)
switch u.Op {
  case kubetypes.ADD:
   klog.V(2).InfoS("SyncLoop ADD", "source", u.Source, "pods", format.Pods(u.Pods))
   // After restarting, kubelet will get all existing pods through
   // ADD as if they are new pods. These pods will then go through the
   // admission process and *may* be rejected. This can be resolved
   // once we have checkpointing.
   handler.HandlePodAdditions(u.Pods)
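The iota-based operation constants and the switch dispatch can be modeled as follows. The dispatch helper is our own illustration of which handler syncLoopIteration invokes per operation (in 1.21, DELETE is dispatched as an update because deletion is graceful), not code from the k8s tree:

```go
// Sketch of the PodOperation constants and their handler dispatch.
package main

import "fmt"

type PodOperation int

const (
	SET PodOperation = iota // current full pod configuration
	ADD
	DELETE
	REMOVE
	UPDATE
	RECONCILE
)

// dispatch names the SyncHandler method a given operation would invoke.
func dispatch(op PodOperation) string {
	switch op {
	case ADD:
		return "HandlePodAdditions"
	case UPDATE:
		return "HandlePodUpdates"
	case DELETE:
		// graceful deletion is treated as an update
		return "HandlePodUpdates"
	case REMOVE:
		return "HandlePodRemoves"
	case RECONCILE:
		return "HandlePodReconcile"
	}
	return "unsupported"
}

func main() {
	fmt.Println(dispatch(ADD)) // HandlePodAdditions
}
```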


4. How does HandlePodAdditions then go about creating the pod? Its main steps are:


1. Sort the pods by creation time:
sort.Sort(sliceutils.PodsByCreationTime(pods))
2. Add the pod to the pod manager. The kubelet relies on the pod manager as the source of truth for the desired state: if a pod cannot be found in the pod manager, it has already been deleted from the apiserver and no action (other than cleanup) is required.
// Always add the pod to the pod manager. Kubelet relies on the pod
// manager as the source of truth for the desired state. If a pod does
// not exist in the pod manager, it means that it has been deleted in
// the apiserver and no action (other than cleanup) is required.
kl.podManager.AddPod(pod)
3. Check whether the pod is a static pod:
mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
4. Dispatch the work via dispatchWork:
kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
5. Register the pod with the probe manager for health checking, covering the startup, liveness, and readiness probes:
kl.probeManager.AddPod(pod)


What does dispatchWork do? The following:


// Run the sync in an async worker.
 kl.podWorkers.UpdatePod(&UpdatePodOptions{
  Pod:        pod,
  MirrorPod:  mirrorPod,
  UpdateType: syncType,
  OnCompleteFunc: func(err error) {
   if err != nil {
    metrics.PodWorkerDuration.WithLabelValues(syncType.String()).Observe(metrics.SinceInSeconds(start))
   }
  },
 })


What, in turn, does UpdatePod() do?


// Creating a new pod worker either means this is a new pod, or that the
  // kubelet just restarted. In either case the kubelet is willing to believe
  // the status of the pod for the first pod worker sync. See corresponding
  // comment in syncPod.
  go func() {
   defer runtime.HandleCrash()
   p.managePodLoop(podUpdates)
  }()
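The one-worker-goroutine-per-pod pattern behind UpdatePod/managePodLoop can be sketched as below. This is our own simplified model, assuming one buffered channel per pod UID; the real podWorkers type tracks considerably more state:

```go
// Sketch of the pod-worker pattern: the first update for a pod UID spawns a
// dedicated goroutine draining that pod's channel; later updates reuse it.
package main

import (
	"fmt"
	"sync"
)

type update struct{ op string }

type podWorkers struct {
	mu      sync.Mutex
	chans   map[string]chan update
	wg      sync.WaitGroup
	results sync.Map // pod UID -> last op handled
}

func newPodWorkers() *podWorkers {
	return &podWorkers{chans: make(map[string]chan update)}
}

func (w *podWorkers) UpdatePod(uid string, u update) {
	w.mu.Lock()
	ch, ok := w.chans[uid]
	if !ok {
		ch = make(chan update, 1)
		w.chans[uid] = ch
		w.wg.Add(1)
		go func() { // analogous to `go p.managePodLoop(podUpdates)`
			defer w.wg.Done()
			for u := range ch {
				w.results.Store(uid, u.op) // stand-in for syncPodFn
			}
		}()
	}
	w.mu.Unlock()
	ch <- u
}

// Stop closes every per-pod channel and waits for the workers to drain.
func (w *podWorkers) Stop() {
	w.mu.Lock()
	for _, ch := range w.chans {
		close(ch)
	}
	w.mu.Unlock()
	w.wg.Wait()
}

func main() {
	w := newPodWorkers()
	w.UpdatePod("uid-1", update{op: "create"})
	w.UpdatePod("uid-1", update{op: "update"})
	w.Stop()
	v, _ := w.results.Load("uid-1")
	fmt.Println(v) // update
}
```

Because each pod has its own serial worker, updates for one pod never block syncs of another, while updates for the same pod are applied in order.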


managePodLoop() then performs the sync:


for update := range podUpdates {
  err := func() error {
   podUID := update.Pod.UID
   // This is a blocking call that would return only if the cache
   // has an entry for the pod that is newer than minRuntimeCache
   // Time. This ensures the worker doesn't start syncing until
   // after the cache is at least newer than the finished time of
   // the previous sync.
   status, err := p.podCache.GetNewerThan(podUID, lastSyncTime)
   if err != nil {
    // This is the legacy event thrown by manage pod loop
    // all other events are now dispatched from syncPodFn
    p.recorder.Eventf(update.Pod, v1.EventTypeWarning, events.FailedSync, "error determining status: %v", err)
    return err
   }
      // the actual sync happens here
   err = p.syncPodFn(syncPodOptions{
    mirrorPod:      update.MirrorPod,
    pod:            update.Pod,
    podStatus:      status,
    killPodOptions: update.KillPodOptions,
    updateType:     update.UpdateType,
   })
   lastSyncTime = time.Now()
   return err
  }()


5. Eventually the call reaches SyncPod() in pkg/kubelet/kuberuntime/kuberuntime_manager.go, which performs the actual pod creation:


// SyncPod syncs the running pod into the desired pod by executing following steps:
//  1. Compute sandbox and container changes.
//  2. Kill pod sandbox if necessary.
//  3. Kill any containers that should not be running.
//  4. Create sandbox if necessary.
//  5. Create ephemeral containers.
//  6. Create init containers.
//  7. Create normal containers.
func (m *kubeGenericRuntimeManager) SyncPod()
// Step 1: Compute sandbox and container changes.
 podContainerChanges := m.computePodActions(pod, podStatus)
 klog.V(3).InfoS("computePodActions got for pod", "podActions", podContainerChanges, "pod", klog.KObj(pod))
 if podContainerChanges.CreateSandbox {
  ref, err := ref.GetReference(legacyscheme.Scheme, pod)
  if err != nil {
   klog.ErrorS(err, "Couldn't make a ref to pod", "pod", klog.KObj(pod))
  }
  if podContainerChanges.SandboxID != "" {
   m.recorder.Eventf(ref, v1.EventTypeNormal, events.SandboxChanged, "Pod sandbox changed, it will be killed and re-created.")
  } else {
   klog.V(4).InfoS("SyncPod received new pod, will create a sandbox for it", "pod", klog.KObj(pod))
  }
 }
  // Step 2: Kill the pod if the sandbox has changed.
 if podContainerChanges.KillPod {
  // Step 3: kill any running containers in this pod which are not to keep.
  for containerID, containerInfo := range podContainerChanges.ContainersToKill {
   klog.V(3).InfoS("Killing unwanted container for pod", "containerName", containerInfo.name, "containerID", containerID, "pod", klog.KObj(pod))
   killContainerResult := kubecontainer.NewSyncResult(kubecontainer.KillContainer, containerInfo.name)
   result.AddSyncResult(killContainerResult)
   if err := m.killContainer(pod, containerID, containerInfo.name, containerInfo.message, containerInfo.reason, nil); err != nil {
    killContainerResult.Fail(kubecontainer.ErrKillContainer, err.Error())
    klog.ErrorS(err, "killContainer for pod failed", "containerName", containerInfo.name, "containerID", containerID, "pod", klog.KObj(pod))
    return
   }
      // Step 4: Create a sandbox for the pod if necessary.
 podSandboxID := podContainerChanges.SandboxID
 if podContainerChanges.CreateSandbox {
  var msg string
  var err error
  klog.V(4).InfoS("Creating PodSandbox for pod", "pod", klog.KObj(pod))
  createSandboxResult := kubecontainer.NewSyncResult(kubecontainer.CreatePodSandbox, format.Pod(pod))
  result.AddSyncResult(createSandboxResult)
  podSandboxID, msg, err = m.createPodSandbox(pod, podContainerChanges.Attempt)
    // Step 5: start ephemeral containers
 // These are started "prior" to init containers to allow running ephemeral containers even when there
 // are errors starting an init container. In practice init containers will start first since ephemeral
 // containers cannot be specified on pod creation.
 if utilfeature.DefaultFeatureGate.Enabled(features.EphemeralContainers) {
  for _, idx := range podContainerChanges.EphemeralContainersToStart {
   start("ephemeral container", ephemeralContainerStartSpec(&pod.Spec.EphemeralContainers[idx]))
  }
 }
  // Step 6: start the init container.
 if container := podContainerChanges.NextInitContainerToStart; container != nil {
  // Start the next init container.
  if err := start("init container", containerStartSpec(container)); err != nil {
   return
  }
  // Successfully started the container; clear the entry in the failure
  klog.V(4).InfoS("Completed init container for pod", "containerName", container.Name, "pod", klog.KObj(pod))
 }
 // Step 7: start containers in podContainerChanges.ContainersToStart.
 for _, idx := range podContainerChanges.ContainersToStart {
  start("container", containerStartSpec(&pod.Spec.Containers[idx]))
 }
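The seven steps can be condensed into a toy decision function that, given the computed pod actions, returns the steps in the order SyncPod performs them. The podActions struct here is a simplification of what computePodActions returns, not the real type:

```go
// Toy sketch of SyncPod's step ordering, driven by computed pod actions.
package main

import "fmt"

type podActions struct {
	KillPod           bool
	CreateSandbox     bool
	ContainersToKill  []string
	EphemeralToStart  []string
	NextInitContainer string
	ContainersToStart []string
}

// syncSteps emits the actions SyncPod would take, in order.
func syncSteps(a podActions) []string {
	var steps []string
	if a.KillPod {
		steps = append(steps, "kill pod sandbox")
	} else {
		for _, c := range a.ContainersToKill {
			steps = append(steps, "kill container "+c)
		}
	}
	if a.CreateSandbox {
		steps = append(steps, "create sandbox")
	}
	for _, c := range a.EphemeralToStart {
		steps = append(steps, "start ephemeral "+c)
	}
	if a.NextInitContainer != "" {
		steps = append(steps, "start init "+a.NextInitContainer)
	}
	for _, c := range a.ContainersToStart {
		steps = append(steps, "start container "+c)
	}
	return steps
}

func main() {
	steps := syncSteps(podActions{
		CreateSandbox:     true,
		NextInitContainer: "init-db",
		ContainersToStart: []string{"app"},
	})
	fmt.Println(steps)
}
```

For a brand-new pod, only the create branches fire: sandbox first, then the next init container, then the regular containers, matching steps 4, 6, and 7 above.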


6. In addition, the pod worker takes care of the following:


// create the pod data directories, wait for volumes, fetch image pull secrets...
newPodWorkers(klet.syncPod --->pkg/kubelet/kubelet.go) // via syncPod
                               kubetypes.SyncPodKill
                                kubetypes.SyncPodCreate
                                podStatus.IPs = append(podStatus.IPs, ipInfo.IP)
                                runnable.Admit
                                kubetypes.IsStaticPod(pod)
                               kl.makePodDataDirs(pod)
                                kl.volumeManager.WaitForAttachAndMount(pod)
                                kl.getPullSecretsForPod(pod)
                                kl.containerRuntime.SyncPod(pkg/kubelet/container