聚焦目标
理解 kubelet 的运行机制
目录
Run
从主函数找到run函数,代码较长,我精简了一下
// run brings the kubelet up from the given server options and dependencies
// and then blocks until either the done channel or ctx reports completion.
// This is a condensed excerpt: several steps are reduced to placeholder
// comments.
func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate) (err error) {
	// A long run of configuration initialization and validation goes here.

	// done is the channel used to announce that the run has finished.
	done := make(chan struct{})

	// Register the configuration with the configz module; a failure is only logged.
	if err = initConfigz(&s.KubeletConfiguration); err != nil {
		klog.Errorf("unable to register KubeletConfiguration with configz, error: %v", err)
	}

	// Resolve the information identifying this node.
	hostName, err := nodeutil.GetHostname(s.HostnameOverride)
	if err != nil {
		return err
	}
	nodeName, err := getNodeName(kubeDeps.Cloud, hostName)
	if err != nil {
		return err
	}

	switch {
	case standaloneMode:
		// Standalone mode.
	case kubeDeps.KubeClient == nil, kubeDeps.EventClient == nil, kubeDeps.HeartbeatClient == nil:
		// The clients are initialized here.
	}

	// cgroup initialization: gather every cgroup root of interest.
	nodeAllocatableRoot := cm.NodeAllocatableRoot(s.CgroupRoot, s.CgroupsPerQOS, s.CgroupDriver)
	cgroupRoots := []string{nodeAllocatableRoot}

	kubeletCgroup, err := cm.GetKubeletContainer(s.KubeletCgroups)
	switch {
	case err != nil:
		klog.Warningf("failed to get the kubelet's cgroup: %v. Kubelet system container metrics may be missing.", err)
	case kubeletCgroup != "":
		cgroupRoots = append(cgroupRoots, kubeletCgroup)
	}

	runtimeCgroup, err := cm.GetRuntimeContainer(s.ContainerRuntime, s.RuntimeCgroups)
	switch {
	case err != nil:
		klog.Warningf("failed to get the container runtime's cgroup: %v. Runtime system container metrics may be missing.", err)
	case runtimeCgroup != "":
		cgroupRoots = append(cgroupRoots, runtimeCgroup)
	}

	if s.SystemCgroups != "" {
		cgroupRoots = append(cgroupRoots, s.SystemCgroups)
	}

	// Everything in this branch initializes the ContainerManager when the
	// caller did not supply one.
	if kubeDeps.ContainerManager == nil {
		if s.CgroupsPerQOS && s.CgroupRoot == "" {
			klog.Info("--cgroups-per-qos enabled, but --cgroup-root was not specified. defaulting to /")
			s.CgroupRoot = "/"
		}

		// CPU-related information.
		var reservedSystemCPUs cpuset.CPUSet

		// Instantiate the ContainerManager.
		kubeDeps.ContainerManager, err = cm.NewContainerManager(
			kubeDeps.Mounter,
			kubeDeps.CAdvisorInterface,
			// Node-related configuration.
			cm.NodeConfig{},
			s.FailSwapOn,
			devicePluginEnabled,
			kubeDeps.Recorder)
		if err != nil {
			return err
		}
	}

	// Out-of-memory handling: apply the OOM score adjustment; a failure is only logged.
	if err := kubeDeps.OOMAdjuster.ApplyOOMScoreAdj(0, int(s.OOMScoreAdj)); err != nil {
		klog.Warning(err)
	}

	// Pre-initialize the runtime service.
	if err = kubelet.PreInitRuntimeService(
		&s.KubeletConfiguration,
		kubeDeps,
		&s.ContainerRuntimeOptions,
		s.ContainerRuntime,
		s.RuntimeCgroups,
		s.RemoteRuntimeEndpoint,
		s.RemoteImageEndpoint,
		s.NonMasqueradeCIDR,
	); err != nil {
		return err
	}

	// Run the kubelet.
	if err := RunKubelet(s, kubeDeps, s.RunOnce); err != nil {
		return err
	}

	// Send the READY=1 notification to systemd without blocking.
	go daemon.SdNotify(false, "READY=1")

	// Block until done or the context fires.
	select {
	case <-done:
	case <-ctx.Done():
	}
	return nil
}
RunKubelet
// RunKubelet creates and initializes a kubelet and then either performs a
// single run-once pass (runOnce == true) or starts it through startKubelet.
//
// It returns an error when the host name or node name cannot be resolved,
// when the kubelet cannot be created, or when the run-once pass fails. The
// underlying cause is wrapped with %w so callers can inspect it with
// errors.Is / errors.As.
func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencies, runOnce bool) error {
	// Resolve the information identifying this node.
	hostname, err := nodeutil.GetHostname(kubeServer.HostnameOverride)
	if err != nil {
		return err
	}
	nodeName, err := getNodeName(kubeDeps.Cloud, hostname)
	if err != nil {
		return err
	}
	// NOTE(review): nodeName and hostnameOverridden are presumably consumed by
	// arguments that this condensed excerpt omits — confirm against the full source.
	hostnameOverridden := len(kubeServer.HostnameOverride) > 0

	// Create and initialize the kubelet.
	k, err := createAndInitKubelet()
	if err != nil {
		return fmt.Errorf("failed to create kubelet: %w", err)
	}

	if runOnce {
		if _, err := k.RunOnce(podCfg.Updates()); err != nil {
			return fmt.Errorf("runonce failed: %w", err)
		}
		klog.Info("Started kubelet as runonce")
		return nil
	}

	// Start the kubelet.
	startKubelet(k, podCfg, &kubeServer.KubeletConfiguration, kubeDeps, kubeServer.EnableCAdvisorJSONEndpoints, kubeServer.EnableServer)
	klog.Info("Started kubelet")
	return nil
}
// startKubelet launches the kubelet and its servers. Every component is
// started in its own goroutine, so the function does not wait for any of them.
func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, enableCAdvisorJSONEndpoints, enableServer bool) {
	// Run the kubelet itself, fed by the pod configuration updates.
	go k.Run(podCfg.Updates())

	// Start the kubelet's HTTP server.
	if enableServer {
		go k.ListenAndServe(
			net.ParseIP(kubeCfg.Address),
			uint(kubeCfg.Port),
			kubeDeps.TLSOptions,
			kubeDeps.Auth,
			enableCAdvisorJSONEndpoints,
			kubeCfg.EnableDebuggingHandlers,
			kubeCfg.EnableContentionProfiling,
			kubeCfg.EnableSystemLogHandler,
		)
	}

	// Start the read-only server when a read-only port is configured.
	if kubeCfg.ReadOnlyPort > 0 {
		go k.ListenAndServeReadOnly(
			net.ParseIP(kubeCfg.Address),
			uint(kubeCfg.ReadOnlyPort),
			enableCAdvisorJSONEndpoints,
		)
	}

	// Start the pod-resources server when its feature gate is enabled.
	if utilfeature.DefaultFeatureGate.Enabled(features.KubeletPodResources) {
		go k.ListenAndServePodResources()
	}
}
Kubelet
// Bootstrap is the interface behind the k value that startKubelet drives;
// it lists the operations needed to bring a kubelet up.
type Bootstrap interface {
	// GetConfiguration returns the kubelet configuration.
	GetConfiguration() kubeletconfiginternal.KubeletConfiguration
	BirthCry()
	StartGarbageCollection()
	// ListenAndServe is launched by startKubelet when the server is enabled.
	ListenAndServe(
		address net.IP,
		port uint,
		tlsOptions *server.TLSOptions,
		auth server.AuthInterface,
		enableCAdvisorJSONEndpoints, enableDebuggingHandlers, enableContentionProfiling, enableSystemLogHandler bool,
	)
	// ListenAndServeReadOnly is launched by startKubelet when a read-only
	// port is configured.
	ListenAndServeReadOnly(
		address net.IP,
		port uint,
		enableCAdvisorJSONEndpoints bool,
	)
	// ListenAndServePodResources is launched by startKubelet when the
	// KubeletPodResources feature gate is enabled.
	ListenAndServePodResources()
	// Run is launched by startKubelet with the channel of pod updates.
	Run(updates <-chan kubetypes.PodUpdate)
	// RunOnce is called by RunKubelet instead of Run in run-once mode.
	RunOnce(updates <-chan kubetypes.PodUpdate) ([]RunPodResult, error)
}
// createAndInitKubelet is the instantiation function: it builds the kubelet
// through kubelet.NewMainKubelet and returns it as a kubelet.Bootstrap.
//
// If construction fails, the error is propagated and the returned Bootstrap
// is nil.
func createAndInitKubelet() (k kubelet.Bootstrap, err error) {
	k, err = kubelet.NewMainKubelet()
	if err != nil {
		// Return an untyped nil rather than k: k may wrap a nil *Kubelet,
		// which would make the interface value compare non-nil.
		return nil, err
	}
	return k, nil
}
// NewMainKubelet instantiates a Kubelet. In this condensed excerpt the
// parameter handling and field population are reduced to placeholder
// comments, so the function always returns an empty Kubelet and a nil error.
func NewMainKubelet() (*Kubelet, error) {
	// Parameter initialization goes here.

	// Instantiate the Kubelet structure.
	kubeletInstance := &Kubelet{}

	// The instance's many fields are populated here.

	return kubeletInstance, nil
}
// Run initializes the kubelet's internal modules, starts its background
// goroutines and managers, and finally enters the main sync loop through
// syncLoop, passing the kubelet itself as the handler.
//
// updates is the channel of pod updates handed on to syncLoop.
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
	// Initialize the internal modules. On failure a warning event is recorded
	// and the error is logged with klog.Fatal.
	if err := kl.initializeModules(); err != nil {
		// Use a constant format string so that a '%' inside the error text is
		// not interpreted as a formatting verb.
		kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.KubeletSetupFailed, "%v", err)
		klog.Fatal(err)
	}

	go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)

	if kl.kubeClient != nil {
		// Synchronize the node status with kube-apiserver.
		go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop)
		go kl.fastStatusUpdateOnce()
		go kl.nodeLeaseController.Run(wait.NeverStop)
	}

	go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)

	if kl.makeIPTablesUtilChains {
		kl.initNetworkUtil()
	}

	// A goroutine that performs pod-killing work.
	go wait.Until(kl.podKiller.PerformPodKillingWork, 1*time.Second, wait.NeverStop)

	kl.statusManager.Start()
	kl.probeManager.Start()

	if kl.runtimeClassManager != nil {
		kl.runtimeClassManager.Start(wait.NeverStop)
	}

	kl.pleg.Start()

	// The main synchronization logic.
	kl.syncLoop(updates, kl)
}
syncLoop
// syncLoop is the kubelet's main synchronization loop. It repeatedly calls
// syncLoopIteration until that call returns false, recording the current
// time in syncLoopMonitor before and after every iteration.
//
// updates is the channel of pod configuration updates; handler receives the
// resulting callbacks.
func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
	klog.Info("Starting kubelet main sync loop.")

	// The sync ticker fires once per second.
	syncTicker := time.NewTicker(time.Second)
	defer syncTicker.Stop()

	// The housekeeping ticker sets the cleanup period.
	housekeepingTicker := time.NewTicker(housekeepingPeriod)
	defer housekeepingTicker.Stop()

	// plegCh was referenced but never declared in this excerpt.
	// NOTE(review): restored from the upstream kubelet, where the channel is
	// obtained from the pod lifecycle event generator — confirm.
	plegCh := kl.pleg.Watch()

	for {
		kl.syncLoopMonitor.Store(kl.clock.Now())
		// Run one synchronization step; false means the loop must stop.
		if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
			break
		}
		kl.syncLoopMonitor.Store(kl.clock.Now())
	}
}
// syncLoopIteration waits for one event from any of its sources and
// dispatches it to handler.
//
// The channels are:
//   - configCh: pod configuration updates, dispatched by operation type;
//   - plegCh: pod lifecycle events;
//   - syncCh: ticks that trigger a sync of the pods that need one;
//   - housekeepingCh: ticks that trigger pod cleanup;
//   - kl.livenessManager.Updates(): liveness updates.
//
// It returns false only when configCh has been closed, which tells the
// caller to leave the sync loop; in every other case it returns true.
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
	syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
	select {
	case u, open := <-configCh:
		// The config channel has been closed.
		if !open {
			klog.Errorf("Update channel is closed. Exiting the sync loop.")
			return false
		}

		// Dispatch on the kind of operation.
		switch u.Op {
		case kubetypes.ADD:
			klog.V(2).Infof("SyncLoop (ADD, %q): %q", u.Source, format.Pods(u.Pods))
			handler.HandlePodAdditions(u.Pods)
		case kubetypes.UPDATE:
			klog.V(2).Infof("SyncLoop (UPDATE, %q): %q", u.Source, format.PodsWithDeletionTimestamps(u.Pods))
			handler.HandlePodUpdates(u.Pods)
		case kubetypes.REMOVE:
			klog.V(2).Infof("SyncLoop (REMOVE, %q): %q", u.Source, format.Pods(u.Pods))
			handler.HandlePodRemoves(u.Pods)
		case kubetypes.RECONCILE:
			klog.V(4).Infof("SyncLoop (RECONCILE, %q): %q", u.Source, format.Pods(u.Pods))
			handler.HandlePodReconcile(u.Pods)
		case kubetypes.DELETE:
			klog.V(2).Infof("SyncLoop (DELETE, %q): %q", u.Source, format.Pods(u.Pods))
			// NOTE(review): DELETE is routed to HandlePodUpdates, not to a
			// removal handler — presumably because deletion is graceful; confirm.
			handler.HandlePodUpdates(u.Pods)
		case kubetypes.SET:
			klog.Errorf("Kubelet does not support snapshot update")
		default:
			klog.Errorf("Invalid event type received: %d.", u.Op)
		}

		kl.sourcesReady.AddSource(u.Source)

	case <-plegCh:
		// This case has no body in the excerpt, so the received event is
		// discarded; binding it to a variable would fail to compile.

	case <-syncCh:
		// Fetch the pods that need syncing (the selection logic is not
		// examined here). This is where the nginx pod from the walkthrough's
		// example arrives.
		podsToSync := kl.getPodsToSync()
		if len(podsToSync) > 0 {
			klog.V(4).Infof("SyncLoop (SYNC): %d pods; %s", len(podsToSync), format.Pods(podsToSync))
			// Start processing them.
			handler.HandlePodSyncs(podsToSync)
		}

	case <-kl.livenessManager.Updates():
		// This case has no body in the excerpt either; the update is discarded.

	case <-housekeepingCh:
		if !kl.sourcesReady.AllReady() {
			// The sources are not ready yet, so cleanup is skipped.
			klog.V(4).Infof("SyncLoop (housekeeping, skipped): sources aren't ready yet.")
		} else {
			// Clean up pods.
			klog.V(4).Infof("SyncLoop (housekeeping)")
			if err := handler.HandlePodCleanups(); err != nil {
				klog.Errorf("Failed cleaning pods: %v", err)
			}
		}
	}
	return true
}
handler
往前查找代码,handler就是Kubelet
// HandlePodSyncs dispatches a SyncPodSync work item for every pod in pods.
// All items share one start time, taken from the kubelet's clock before the
// first dispatch.
func (kl *Kubelet) HandlePodSyncs(pods []*v1.Pod) {
	syncStart := kl.clock.Now()
	for i := range pods {
		current := pods[i]
		// Look up the mirror pod (the second return value is discarded),
		// then hand the pair over for dispatch.
		mirror, _ := kl.podManager.GetMirrorPodByPod(current)
		kl.dispatchWork(current, kubetypes.SyncPodSync, mirror, syncStart)
	}
}
// dispatchWork hands the pod to the pod workers through UpdatePod.
// The completion callback observes the PodWorkerDuration metric, labelled
// with syncType and measured from start, only when it receives a non-nil error.
func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
	onComplete := func(err error) {
		if err == nil {
			return
		}
		metrics.PodWorkerDuration.WithLabelValues(syncType.String()).Observe(metrics.SinceInSeconds(start))
	}

	opts := &UpdatePodOptions{
		Pod:            pod,
		MirrorPod:      mirrorPod,
		UpdateType:     syncType,
		OnCompleteFunc: onComplete,
	}
	kl.podWorkers.UpdatePod(opts)
}
// UpdatePod delivers an update for options.Pod to that pod's worker.
//
// The pod workers are initialized at:
//
//	klet.podWorkers = newPodWorkers(klet.syncPod, kubeDeps.Recorder, klet.workQueue, klet.resyncInterval, backOffPeriod, klet.podCache)
//
// The first update seen for a pod UID creates the pod's update channel and
// starts a managePodLoop goroutine for it. If the pod is not being worked on,
// the update is sent straight away. Otherwise it is stored as the last
// undelivered update, unless the stored one is a SyncPodKill.
func (p *podWorkers) UpdatePod(options *UpdatePodOptions) {
	uid := options.Pod.UID

	p.podLock.Lock()
	defer p.podLock.Unlock()

	updatesCh, known := p.podUpdates[uid]
	if !known {
		// No channel yet: this is a newly created pod, as in the
		// walkthrough's example.
		updatesCh = make(chan UpdatePodOptions, 1)
		p.podUpdates[uid] = updatesCh

		// Process this pod's updates concurrently.
		go func() {
			defer runtime.HandleCrash()
			p.managePodLoop(updatesCh)
		}()
	}

	if !p.isWorking[uid] {
		p.isWorking[uid] = true
		updatesCh <- *options
		return
	}

	// The worker is busy: remember this update unless a kill is already pending.
	pending, found := p.lastUndeliveredWorkUpdate[uid]
	if found && pending.UpdateType == kubetypes.SyncPodKill {
		return
	}
	p.lastUndeliveredWorkUpdate[uid] = *options
}
// managePodLoop is the per-pod worker loop. For every update received on
// podUpdates it looks up the pod's status, calls the sync function
// (syncPodFn) and then reports the outcome through wrapUp. The loop ends
// when podUpdates is closed.
func (p *podWorkers) managePodLoop(podUpdates <-chan UpdatePodOptions) {
	var lastSyncTime time.Time
	for update := range podUpdates {
		err := func() error {
			// The excerpt used status and assigned err without declaring
			// either, and never read lastSyncTime.
			// NOTE(review): the status lookup below is restored from the
			// upstream kubelet; confirm the podCache field and GetNewerThan
			// signature against the full source.
			status, err := p.podCache.GetNewerThan(update.Pod.UID, lastSyncTime)
			if err != nil {
				return err
			}

			// The function that synchronizes the pod.
			err = p.syncPodFn(syncPodOptions{
				mirrorPod:      update.MirrorPod,
				pod:            update.Pod,
				podStatus:      status,
				killPodOptions: update.KillPodOptions,
				updateType:     update.UpdateType,
			})
			lastSyncTime = time.Now()
			return err
		}()
		p.wrapUp(update.Pod.UID, err)
	}
}
// syncPod is the function installed as the pod workers' syncPodFn.
// The original is long; this excerpt keeps only the core: it asks the
// container runtime to sync the pod and records the result in the reason cache.
//
// It returns the aggregate sync error as soon as one individual result failed
// for a reason other than crash-loop back-off or image-pull back-off.
// When every failure is one of those two back-offs, or there is no failure,
// it returns nil.
func (kl *Kubelet) syncPod(o syncPodOptions) error {
	// Call the container runtime to create the pod; everything below this
	// point is container-level work.
	syncResult := kl.containerRuntime.SyncPod(pod, podStatus, pullSecrets, kl.backOff)
	kl.reasonCache.Update(pod.UID, syncResult)

	syncErr := syncResult.Error()
	if syncErr == nil {
		return nil
	}

	for _, r := range syncResult.SyncResults {
		isBackOff := r.Error == kubecontainer.ErrCrashLoopBackOff || r.Error == images.ErrImagePullBackOff
		if !isBackOff {
			return syncErr
		}
	}
	return nil
}
Summary
`kubelet` 是 kubernetes 的 `Node` 节点上的管理者。
`kubelet` 接收来自 `kube-apiserver` 上的 pod 消息,用 `Ticker` 这种周期性的方式触发同步函数。
`kubelet` 会异步地对容器进行管理,调用对应容器的接口(Container Runtime Interface)。