Hi everyone, I'm 华仔.
I've been working with Kubernetes for more than four years, but mostly at the level of being able to use it: I'm not very familiar with its internals or source code, and I couldn't tell you exactly what happens behind the commands I run every day. So I recently decided to read through the source of the various k8s modules. First, to deepen my understanding of each module; second, so that when problems come up later I can trace them to the root cause — an argument backed by evidence is convincing; and third, so that interviewers' technical questions won't catch me out if I ever change jobs. Today, let's walk through the pod-creation code path. If you spot mistakes, please point them out (gently). The k8s source is available on GitHub; this walkthrough is based on v1.21.3.
Main text
1. In the k8s source, the add/update/delete/query handling for pods is driven by syncLoop() in pkg/kubelet/kubelet.go, shown below:
// syncLoop is the main loop for processing changes. It watches for changes from
// three channels (file, apiserver, and http) and creates a union of them. For
// any new change seen, will run a sync against desired state and running state. If
// no changes are seen to the configuration, will synchronize the last known desired
// state every sync-frequency seconds. Never returns.
func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
    klog.InfoS("Starting kubelet main sync loop")
Within syncLoop(), the concrete per-pod operations are carried out by kl.syncLoopIteration():
kl.syncLoopMonitor.Store(kl.clock.Now())
if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
    break
}
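As an aside, the "union of three channels" the comment mentions is a classic Go fan-in. Here is a minimal, self-contained sketch of that idea — the PodUpdate and merge below are toy stand-ins of mine, not kubelet code:

package main

import (
    "fmt"
    "sync"
)

// PodUpdate is a stand-in for kubetypes.PodUpdate: an event plus its source.
type PodUpdate struct {
    Source string
    Op     string
}

// merge fans several config sources (file, apiserver, http) into one
// updates channel, closing it once every source is exhausted.
func merge(sources ...<-chan PodUpdate) <-chan PodUpdate {
    out := make(chan PodUpdate)
    var wg sync.WaitGroup
    wg.Add(len(sources))
    for _, src := range sources {
        go func(src <-chan PodUpdate) {
            defer wg.Done()
            for u := range src {
                out <- u
            }
        }(src)
    }
    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}

func main() {
    file := make(chan PodUpdate, 1)
    api := make(chan PodUpdate, 1)
    file <- PodUpdate{Source: "file", Op: "ADD"}
    api <- PodUpdate{Source: "apiserver", Op: "UPDATE"}
    close(file)
    close(api)
    for u := range merge(file, api) {
        fmt.Printf("got %s from %s\n", u.Op, u.Source)
    }
}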
2. syncLoopIteration takes several important parameters, documented in the source:
// Arguments:
// 1. configCh: a channel to read config events from
// 2. handler: the SyncHandler to dispatch pods to
// 3. syncCh: a channel to read periodic sync events from
// 4. housekeepingCh: a channel to read housekeeping events from
// 5. plegCh: a channel to read PLEG updates from
//
// * configCh: dispatch the pods for the config change to the appropriate
//   handler callback for the event type
// * plegCh: update the runtime cache; sync pod
// * syncCh: sync all pods waiting for sync
// * housekeepingCh: trigger cleanup of pods
// * health manager: sync pods that have failed or in which one or more
//   containers have failed health checks
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
    syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
    select {
    case u, open := <-configCh:
        // Update from a config source; dispatch it to the right handler
        // callback.
        if !open {
            klog.ErrorS(nil, "Update channel is closed, exiting the sync loop")
            return false
        }
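Stripped of kubelet specifics, each call to syncLoopIteration is one select that handles whichever event arrives first and returns false only when a source channel closes. A toy version of that shape (all names here are made up):

package main

import (
    "fmt"
    "time"
)

// iterate performs one pass of a sync loop: it handles whichever event
// arrives first and reports whether the loop should keep running.
func iterate(configCh <-chan string, syncCh <-chan time.Time) bool {
    select {
    case u, open := <-configCh:
        if !open {
            fmt.Println("config channel closed, exiting the loop")
            return false
        }
        fmt.Println("config event:", u)
    case t := <-syncCh:
        fmt.Println("periodic sync at", t.Format(time.RFC3339))
    }
    return true
}

func main() {
    configCh := make(chan string, 1)
    syncTicker := time.NewTicker(100 * time.Millisecond)
    defer syncTicker.Stop()

    configCh <- "ADD pod default/nginx"
    close(configCh)
    for iterate(configCh, syncTicker.C) {
    }
}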
SyncHandler is an interface containing a method for each of the common pod operations; the Kubelet itself implements it:
// SyncHandler is an interface implemented by Kubelet, for testability
// (pod creation, update, removal, ...)
type SyncHandler interface {
    HandlePodAdditions(pods []*v1.Pod)
    HandlePodUpdates(pods []*v1.Pod)
    HandlePodRemoves(pods []*v1.Pod)
    HandlePodReconcile(pods []*v1.Pod)
    HandlePodSyncs(pods []*v1.Pod)
    HandlePodCleanups() error
}
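The "for testability" comment is the point of the interface: a test can substitute a fake. A minimal sketch with toy types of my own (the real interface takes []*v1.Pod and has more methods):

package main

import "fmt"

// Pod is a stand-in for *v1.Pod, just enough for the sketch.
type Pod struct{ Name string }

// SyncHandler mirrors the kubelet interface shape with toy types.
type SyncHandler interface {
    HandlePodAdditions(pods []*Pod)
    HandlePodUpdates(pods []*Pod)
}

// fakeHandler records what it saw, which is all a unit test needs
// to assert that dispatch happened correctly.
type fakeHandler struct{ added, updated []string }

func (f *fakeHandler) HandlePodAdditions(pods []*Pod) {
    for _, p := range pods {
        f.added = append(f.added, p.Name)
    }
}

func (f *fakeHandler) HandlePodUpdates(pods []*Pod) {
    for _, p := range pods {
        f.updated = append(f.updated, p.Name)
    }
}

func main() {
    var h SyncHandler = &fakeHandler{}
    h.HandlePodAdditions([]*Pod{{Name: "nginx"}})
    fmt.Println(h.(*fakeHandler).added) // [nginx]
}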
3. The operations that can be performed on a pod are listed below; each has a corresponding handler method. An ADD, for example, is dispatched to HandlePodAdditions:
// These constants identify the PodOperations that can be made on a pod configuration.
const (
    // SET is the current pod configuration.
    SET PodOperation = iota
    // ADD signifies pods that are new to this source.
    ADD
    // DELETE signifies pods that are gracefully deleted from this source.
    DELETE
    // REMOVE signifies pods that have been removed from this source.
    REMOVE
    // UPDATE signifies pods have been updated in this source.
    UPDATE
    // RECONCILE signifies pods that have unexpected status in this source,
    // kubelet should reconcile status with this source.
    RECONCILE
)

switch u.Op {
case kubetypes.ADD:
    klog.V(2).InfoS("SyncLoop ADD", "source", u.Source, "pods", format.Pods(u.Pods))
    // After restarting, kubelet will get all existing pods through
    // ADD as if they are new pods. These pods will then go through the
    // admission process and *may* be rejected. This can be resolved
    // once we have checkpointing.
    handler.HandlePodAdditions(u.Pods)
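The mechanism is nothing more than an iota-based enum plus a switch. A runnable toy of the same dispatch shape (handler bodies are placeholder prints; if I read the real switch correctly, DELETE is also routed to HandlePodUpdates because deletion is graceful):

package main

import "fmt"

// PodOperation mimics the kubetypes enum: iota gives SET=0, ADD=1, ...
type PodOperation int

const (
    SET PodOperation = iota
    ADD
    DELETE
    REMOVE
    UPDATE
    RECONCILE
)

// PodUpdate pairs an operation with the pods it applies to.
type PodUpdate struct {
    Op   PodOperation
    Pods []string
}

// dispatch routes an update to the matching handler, like the
// switch on u.Op in syncLoopIteration.
func dispatch(u PodUpdate) {
    switch u.Op {
    case ADD:
        fmt.Println("HandlePodAdditions:", u.Pods)
    case UPDATE, DELETE:
        fmt.Println("HandlePodUpdates:", u.Pods)
    case REMOVE:
        fmt.Println("HandlePodRemoves:", u.Pods)
    default:
        fmt.Println("unhandled op:", u.Op)
    }
}

func main() {
    dispatch(PodUpdate{Op: ADD, Pods: []string{"default/nginx"}})
}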
4. How does HandlePodAdditions actually go about creating the pod? The main steps are as follows:
1. Sort the pods by creation time (see the sketch after this list):
   sort.Sort(sliceutils.PodsByCreationTime(pods))
2. Add the pod to the pod manager. The kubelet relies on the pod manager as the source of truth for the desired state: if a pod cannot be found there, it has been deleted in the apiserver and nothing (other than cleanup) needs to be done.
   // Always add the pod to the pod manager. Kubelet relies on the pod
   // manager as the source of truth for the desired state. If a pod does
   // not exist in the pod manager, it means that it has been deleted in
   // the apiserver and no action (other than cleanup) is required.
   kl.podManager.AddPod(pod)
3. Check whether the pod is a static pod, by looking up its mirror pod:
   mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
4. Dispatch the work through dispatchWork:
   kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
5. Register the pod with the probe manager for health checking, i.e. its startup, liveness, and readiness probes:
   kl.probeManager.AddPod(pod)
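The sort in step 1 is an ordinary sort.Sort over a slice type implementing sort.Interface, oldest pod first. A dependency-free sketch mirroring sliceutils.PodsByCreationTime (the Pod type here is a toy of mine):

package main

import (
    "fmt"
    "sort"
    "time"
)

// Pod is a toy stand-in carrying only the creation timestamp.
type Pod struct {
    Name    string
    Created time.Time
}

// PodsByCreationTime implements sort.Interface, oldest pod first,
// mirroring sliceutils.PodsByCreationTime.
type PodsByCreationTime []*Pod

func (s PodsByCreationTime) Len() int           { return len(s) }
func (s PodsByCreationTime) Swap(i, j int)      { s[i], s[j] = s[j], s[i] }
func (s PodsByCreationTime) Less(i, j int) bool { return s[i].Created.Before(s[j].Created) }

func main() {
    now := time.Now()
    pods := []*Pod{
        {Name: "b", Created: now},
        {Name: "a", Created: now.Add(-time.Hour)},
    }
    sort.Sort(PodsByCreationTime(pods))
    fmt.Println(pods[0].Name, pods[1].Name) // a b
}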
What does dispatchWork do? This:
// Run the sync in an async worker.
kl.podWorkers.UpdatePod(&UpdatePodOptions{
    Pod:        pod,
    MirrorPod:  mirrorPod,
    UpdateType: syncType,
    OnCompleteFunc: func(err error) {
        if err != nil {
            metrics.PodWorkerDuration.WithLabelValues(syncType.String()).Observe(metrics.SinceInSeconds(start))
        }
    },
})
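The pattern here — an options struct carrying the work item plus an OnCompleteFunc that the async worker invokes when it finishes — looks like this in isolation (all types below are mine, not kubelet's):

package main

import (
    "fmt"
    "time"
)

// UpdateOptions mimics the UpdatePodOptions shape: the work item plus a
// completion callback invoked by the async worker.
type UpdateOptions struct {
    Pod        string
    OnComplete func(err error)
}

// updatePod hands the work to a goroutine and reports back through the
// callback: the same fire-and-forget pattern dispatchWork uses.
func updatePod(opts UpdateOptions, done chan<- struct{}) {
    go func() {
        defer close(done)
        err := fmt.Errorf("sync of %s failed", opts.Pod) // pretend the sync failed
        if opts.OnComplete != nil {
            opts.OnComplete(err)
        }
    }()
}

func main() {
    start := time.Now()
    done := make(chan struct{})
    updatePod(UpdateOptions{
        Pod: "default/nginx",
        OnComplete: func(err error) {
            if err != nil {
                fmt.Printf("observed duration %v after error: %v\n", time.Since(start), err)
            }
        },
    }, done)
    <-done
}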
So what does UpdatePod() do?
// Creating a new pod worker either means this is a new pod, or that the
// kubelet just restarted. In either case the kubelet is willing to believe
// the status of the pod for the first pod worker sync. See corresponding
// comment in syncPod.
go func() {
    defer runtime.HandleCrash()
    p.managePodLoop(podUpdates)
}()
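In other words, each pod gets its own goroutine draining its own update channel. A minimal sketch of that one-worker-per-key pattern (toy types; the real code also tracks per-pod state and handles crash recovery):

package main

import (
    "fmt"
    "sync"
)

type update struct{ podUID, op string }

// workers lazily starts one goroutine per pod, each draining its own
// channel — the pattern UpdatePod uses before calling managePodLoop.
type workers struct {
    mu      sync.Mutex
    updates map[string]chan update
    wg      sync.WaitGroup
}

func (w *workers) UpdatePod(u update) {
    w.mu.Lock()
    ch, ok := w.updates[u.podUID]
    if !ok {
        ch = make(chan update, 1)
        w.updates[u.podUID] = ch
        w.wg.Add(1)
        go func() { // one worker per pod UID
            defer w.wg.Done()
            for u := range ch {
                fmt.Println("sync", u.podUID, u.op)
            }
        }()
    }
    w.mu.Unlock()
    ch <- u
}

func main() {
    w := &workers{updates: map[string]chan update{}}
    w.UpdatePod(update{"uid-1", "create"})
    w.UpdatePod(update{"uid-1", "update"})
    w.mu.Lock()
    for _, ch := range w.updates {
        close(ch)
    }
    w.mu.Unlock()
    w.wg.Wait()
}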
managePodLoop() then runs the sync:
for update := range podUpdates {
    err := func() error {
        podUID := update.Pod.UID
        // This is a blocking call that would return only if the cache
        // has an entry for the pod that is newer than minRuntimeCache
        // Time. This ensures the worker doesn't start syncing until
        // after the cache is at least newer than the finished time of
        // the previous sync.
        status, err := p.podCache.GetNewerThan(podUID, lastSyncTime)
        if err != nil {
            // This is the legacy event thrown by manage pod loop
            // all other events are now dispatched from syncPodFn
            p.recorder.Eventf(update.Pod, v1.EventTypeWarning, events.FailedSync, "error determining status: %v", err)
            return err
        }
        // The actual sync happens here.
        err = p.syncPodFn(syncPodOptions{
            mirrorPod:      update.MirrorPod,
            pod:            update.Pod,
            podStatus:      status,
            killPodOptions: update.KillPodOptions,
            updateType:     update.UpdateType,
        })
        lastSyncTime = time.Now()
        return err
    }()
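The interesting piece is GetNewerThan: it blocks until the runtime cache holds an entry for the pod newer than the previous sync's finish time. A toy cache with the same blocking contract, built on sync.Cond (the real pod cache is considerably more involved):

package main

import (
    "fmt"
    "sync"
    "time"
)

// cache stores one status per pod plus the time it was recorded, and lets
// readers block until the entry is newer than a given instant — the
// contract podCache.GetNewerThan provides to managePodLoop.
type cache struct {
    mu     sync.Mutex
    cond   *sync.Cond
    status map[string]string
    stamp  map[string]time.Time
}

func newCache() *cache {
    c := &cache{status: map[string]string{}, stamp: map[string]time.Time{}}
    c.cond = sync.NewCond(&c.mu)
    return c
}

func (c *cache) Set(uid, s string) {
    c.mu.Lock()
    c.status[uid], c.stamp[uid] = s, time.Now()
    c.mu.Unlock()
    c.cond.Broadcast()
}

// GetNewerThan blocks until the cached entry for uid is newer than t.
func (c *cache) GetNewerThan(uid string, t time.Time) string {
    c.mu.Lock()
    defer c.mu.Unlock()
    for !c.stamp[uid].After(t) {
        c.cond.Wait()
    }
    return c.status[uid]
}

func main() {
    c := newCache()
    lastSync := time.Now()
    go func() {
        time.Sleep(50 * time.Millisecond)
        c.Set("uid-1", "Running")
    }()
    fmt.Println(c.GetNewerThan("uid-1", lastSync)) // blocks, then prints Running
}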
5. Eventually the call chain reaches SyncPod() in pkg/kubelet/kuberuntime/kuberuntime_manager.go, which actually creates the pod:
// SyncPod syncs the running pod into the desired pod by executing following steps:
// 1. Compute sandbox and container changes.
// 2. Kill pod sandbox if necessary.
// 3. Kill any containers that should not be running.
// 4. Create sandbox if necessary.
// 5. Create ephemeral containers.
// 6. Create init containers.
// 7. Create normal containers.
func (m *kubeGenericRuntimeManager) SyncPod()

// Step 1: Compute sandbox and container changes.
podContainerChanges := m.computePodActions(pod, podStatus)
klog.V(3).InfoS("computePodActions got for pod", "podActions", podContainerChanges, "pod", klog.KObj(pod))
if podContainerChanges.CreateSandbox {
    ref, err := ref.GetReference(legacyscheme.Scheme, pod)
    if err != nil {
        klog.ErrorS(err, "Couldn't make a ref to pod", "pod", klog.KObj(pod))
    }
    if podContainerChanges.SandboxID != "" {
        m.recorder.Eventf(ref, v1.EventTypeNormal, events.SandboxChanged, "Pod sandbox changed, it will be killed and re-created.")
    } else {
        klog.V(4).InfoS("SyncPod received new pod, will create a sandbox for it", "pod", klog.KObj(pod))
    }
}

// Step 2: Kill the pod if the sandbox has changed.
if podContainerChanges.KillPod {
    // ... (sandbox teardown elided) ...
} else {
    // Step 3: kill any running containers in this pod which are not to keep.
    for containerID, containerInfo := range podContainerChanges.ContainersToKill {
        klog.V(3).InfoS("Killing unwanted container for pod", "containerName", containerInfo.name, "containerID", containerID, "pod", klog.KObj(pod))
        killContainerResult := kubecontainer.NewSyncResult(kubecontainer.KillContainer, containerInfo.name)
        result.AddSyncResult(killContainerResult)
        if err := m.killContainer(pod, containerID, containerInfo.name, containerInfo.message, containerInfo.reason, nil); err != nil {
            killContainerResult.Fail(kubecontainer.ErrKillContainer, err.Error())
            klog.ErrorS(err, "killContainer for pod failed", "containerName", containerInfo.name, "containerID", containerID, "pod", klog.KObj(pod))
            return
        }
    }
}

// Step 4: Create a sandbox for the pod if necessary.
podSandboxID := podContainerChanges.SandboxID
if podContainerChanges.CreateSandbox {
    var msg string
    var err error
    klog.V(4).InfoS("Creating PodSandbox for pod", "pod", klog.KObj(pod))
    createSandboxResult := kubecontainer.NewSyncResult(kubecontainer.CreatePodSandbox, format.Pod(pod))
    result.AddSyncResult(createSandboxResult)
    podSandboxID, msg, err = m.createPodSandbox(pod, podContainerChanges.Attempt)
}

// Step 5: start ephemeral containers
// These are started "prior" to init containers to allow running ephemeral containers even when there
// are errors starting an init container. In practice init containers will start first since ephemeral
// containers cannot be specified on pod creation.
if utilfeature.DefaultFeatureGate.Enabled(features.EphemeralContainers) {
    for _, idx := range podContainerChanges.EphemeralContainersToStart {
        start("ephemeral container", ephemeralContainerStartSpec(&pod.Spec.EphemeralContainers[idx]))
    }
}

// Step 6: start the init container.
if container := podContainerChanges.NextInitContainerToStart; container != nil {
    // Start the next init container.
    if err := start("init container", containerStartSpec(container)); err != nil {
        return
    }
    // Successfully started the container; clear the entry in the failure
    klog.V(4).InfoS("Completed init container for pod", "containerName", container.Name, "pod", klog.KObj(pod))
}

// Step 7: start containers in podContainerChanges.ContainersToStart.
for _, idx := range podContainerChanges.ContainersToStart {
    start("container", containerStartSpec(&pod.Spec.Containers[idx]))
}
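To see why Step 1 drives everything else, here is a toy diff in the spirit of computePodActions — given the desired containers and the currently running set, decide what to start and what to kill. The real podActions has many more fields; this is just the essence:

package main

import "fmt"

// actions is a trimmed-down analogue of podActions: which containers to
// start and which running ones to kill.
type actions struct {
    ContainersToStart []string
    ContainersToKill  []string
}

// computeActions diffs the desired container list against the running
// set, the essence of what computePodActions feeds into Steps 3 and 7.
func computeActions(desired []string, running map[string]bool) actions {
    var a actions
    want := map[string]bool{}
    for _, name := range desired {
        want[name] = true
        if !running[name] {
            a.ContainersToStart = append(a.ContainersToStart, name)
        }
    }
    for name := range running {
        if !want[name] {
            a.ContainersToKill = append(a.ContainersToKill, name)
        }
    }
    return a
}

func main() {
    a := computeActions(
        []string{"app", "sidecar"},
        map[string]bool{"app": true, "old-sidecar": true},
    )
    fmt.Printf("start=%v kill=%v\n", a.ContainersToStart, a.ContainersToKill)
    // start=[sidecar] kill=[old-sidecar]
}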
6. In addition, the pod worker takes care of the following:
// Create the pod data directories, set up volumes, fetch the image pull secrets, ...
// The chain is wired up by newPodWorkers(klet.syncPod ---> pkg/kubelet/kubelet.go), i.e. through syncPod:
kubetypes.SyncPodKill                             // handle kill-type updates
kubetypes.SyncPodCreate                           // handle create-type updates
podStatus.IPs = append(podStatus.IPs, ipInfo.IP)  // record the pod's IPs
runnable.Admit                                    // admission check: can this pod run on this node?
kubetypes.IsStaticPod(pod)                        // static-pod special casing
kl.makePodDataDirs(pod)                           // create the pod data directories
kl.volumeManager.WaitForAttachAndMount(pod)       // wait for volumes to attach and mount
kl.getPullSecretsForPod(pod)                      // gather the image pull secrets
kl.containerRuntime.SyncPod(pkg/kubelet/container
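As one concrete example from that list, kl.makePodDataDirs boils down to a few os.MkdirAll calls. A hedged sketch — the directory layout below is illustrative, not the kubelet's exact paths:

package main

import (
    "fmt"
    "os"
    "path/filepath"
)

// makePodDataDirs sketches what the kubelet helper of the same name does:
// create the per-pod directory tree under the kubelet root.
func makePodDataDirs(root, podUID string) error {
    for _, dir := range []string{
        filepath.Join(root, "pods", podUID),
        filepath.Join(root, "pods", podUID, "volumes"),
        filepath.Join(root, "pods", podUID, "plugins"),
    } {
        if err := os.MkdirAll(dir, 0750); err != nil {
            return fmt.Errorf("creating %s: %w", dir, err)
        }
    }
    return nil
}

func main() {
    root, err := os.MkdirTemp("", "kubelet-root-")
    if err != nil {
        panic(err)
    }
    defer os.RemoveAll(root)
    if err := makePodDataDirs(root, "0c5e7a3b"); err != nil {
        panic(err)
    }
    fmt.Println("created pod dirs under", root)
}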