Kubernetes的Device Plugin机制源码解析(1)

本文涉及的产品
容器镜像服务 ACR,镜像仓库100个 不限时长
简介: Kubernetes 1.8 引入的Device Plugin机制,通过扩展的方式实现支持GPU、FPGA、高性能 NIC、InfiniBand等各种设备的集成。而Device Manager正是Kubelet内负责Device Plugin交互和设备生命周期管理的模块,在了解其基本设计后,本文对Device Manager的源码分析,理解其运作方式。

Kubernetes 1.8 引入的Device Plugin机制,通过扩展的方式实现支持GPU、FPGA、高性能 NIC、InfiniBand等各种设备的集成。而Device Manager正是Kubelet内负责Device Plugin交互和设备生命周期管理的模块,在了解其基本设计后,我们需要通过对Device Manager的源码分析,理解其运作方式。

基本原则

首先明确目标:

并不是搞懂Kubelet的所有实现,而是希望理解Device Manager如何在资源发现,Pod创建,设备健康检查过程中所做的工作以及其如何与Kubelet交互,所以我们会忽略掉与Device Manager无关的操作。

这里是我阅读代码的原则和一些体会:

  • 理解接口,搞清楚和外部模块的交互
  • 理解实现接口的结构体
  • 从用户场景的角度将方法调用和数据结构关联起来,好比将剧情和人物串联起来,了解了人物设定后,就可以更快速切入代码的调用过程;而代码调用的阅读也可以加深对数据结构设计的理解
  • Kubernetes的代码比较复杂,很难一下就搞清楚每一个数据结构定义的目的和用途,这时我们可以把问题和假设记下来,不要过分纠结,可以在后面求证。书读百变其意自现,代码也是一样,当你逐渐熟悉了代码的脉络的时候,有些问题也会迎刃而解
  • 由于Device Manager工作在Kubelet中,对于Kubelet的源码通篇的了解是理解具体模块运作机制的基础

P.S. 本文解析的Kubernetes源码版本是1.9.3

DeviceManager的核心代码都在 pkg/kubelet/cm/deviceplugin

devicePlugin.png

DeviceManager接口定义

所在的文件

pkg/kubelet/cm/deviceplugin/types.go

具体定义:

// Manager manages all the Device Plugins running on a node.
type Manager interface {
    // Start starts device plugin registration service.
    Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady) error

    // Devices is the map of devices that have registered themselves
    // against the manager.
    // The map key is the ResourceName of the device plugins.
    Devices() map[string][]pluginapi.Device

    // Allocate configures and assigns devices to pods. The pods are provided
    // through the pod admission attributes in the attrs argument. From the
    // requested device resources, Allocate will communicate with the owning
    // device plugin to allow setup procedures to take place, and for the
    // device plugin to provide runtime settings to use the device (environment
    // variables, mount points and device files). The node object is provided
    // for the device manager to update the node capacity to reflect the
    // currently available devices.
    Allocate(node *schedulercache.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error

    // Stop stops the manager.
    Stop() error

    // GetDeviceRunContainerOptions checks whether we have cached containerDevices
    // for the passed-in <pod, container> and returns its DeviceRunContainerOptions
    // for the found one. An empty struct is returned in case no cached state is found.
    GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Container) *DeviceRunContainerOptions

    // GetCapacity returns the amount of available device plugin resource capacity
    // and inactive device plugin resources previously registered on the node.
    GetCapacity() (v1.ResourceList, []string)
}

从注释中可以看到DeviceManager负责管理节点上运行的所有设备插件,这里分别定义了可以和外界交互的6个方法:

  • Start()Stop()分别是启动设备插件注册和停止服务,这其实K8S中的常见套路
  • Devices()以map的形式列出device列表

以下3个方法是比较核心的工作:

  • Allocate()为Pod分配可用的设备,并且调用设备插件进行所需的设备初始化
  • GetDeviceRunContainerOptions()获得为容器配置设备所需要的参数,比如Environment,Volume和Device,这个方法会用于创建容器的过程中
  • GetCapacity()用于节点向API Server上报Extended Resource的数量

当然要更清楚的理解,还需要结合具体场景中的调用链路进行理解。这里DeviceManager接口有两个实现分别是:MangerImplManagerStub, ManagerStub实际上是一个空实现,无需细看。下面简单了解一下 MangerImpl的实现

DeviceManager接口实现

所在的文件

pkg/kubelet/cm/deviceplugin/manager.go

具体定义:

// ManagerImpl is the structure in charge of managing Device Plugins.
type ManagerImpl struct {
    socketname string
    socketdir  string

    endpoints map[string]endpoint // Key is ResourceName
    mutex     sync.Mutex

    server *grpc.Server

    // activePods is a method for listing active pods on the node
    // so the amount of pluginResources requested by existing pods
    // could be counted when updating allocated devices
    activePods ActivePodsFunc

    // sourcesReady provides the readiness of kubelet configuration sources such as apiserver update readiness.
    // We use it to determine when we can purge inactive pods from checkpointed state.
    sourcesReady config.SourcesReady

    // callback is used for updating devices' states in one time call.
    // e.g. a new device is advertised, two old devices are deleted and a running device fails.
    callback monitorCallback

    // allDevices contains all of registered resourceNames and their exported device IDs.
    allDevices map[string]sets.String

    // allocatedDevices contains allocated deviceIds, keyed by resourceName.
    allocatedDevices map[string]sets.String

    // podDevices contains pod to allocated device mapping.
    podDevices podDevices
}

在ManagerImpl的定义和注释中,可以大致猜测它在做三件事:

  • 提供grpc的服务,支持多个Device Plugin的注册
  • 为Device Plugin提供回调函数monitorCallback,当设备的状态发生变化时,可以让Device Manager被通知,从而做一些相应的处理。比如当某个设备无法正常工作时,就需要将节点上可用资源总数减去一个
  • 设备的分配和管理,具体讲就是记录某种设备一共有哪几个,已经分配出去的是哪几个。从这里看,Device Plugin需要为每个设备提供一个UUID, 这个UUID需要在本节点唯一并且不可改变,而Device Manager要做的事情就是维护这个UUID的集合,并且负责设备更新和分配

场景分类

这里主要涉及五个场景:

  • Device Manager的初始化和启动
  • 接收Device Plugin的endpoint注册,并且向Endpoint查询Device ID列表
  • 定时上报节点上的设备信息
  • 创建Pod时,将设备信息与Pod结合,生成创建容器所需要的配置(Environment, Device, Volume)
  • 当设备状态不健康的时候,通知Kubelet更新可用设备的状态

本文首先分析场景一:Device Manager的初始化和启动过程

Device Manager的初始化和启动过程

Kubernetes的代码量巨大,但是细看每个模块的启动流程都有比较相似的套路,以Kubelet为例:

  1. 创建一个 KubeletServer 配置对象,这个对象保存着 kubelet 运行需要的所有配置信息
  2. 解析命令行,根据命令行的参数更新 KubeletServer
  3. 根据 KubeletServer 的配置创建真正的 kubelet 运行时对象
  4. 通过Start()方法启动该 kubelet 运行时对象

而DeviceManger的初始化就是发生在步骤3和步骤4

deviceManagerInit.png

  • app.kubelet对应的是cmd/kubelet/kubelet.go
  • server对应的是cmd/kubelet/app/server.go
  • kubelet对应的是pkg/kubelet/kubelet.go
  • container_manager_linux对应的是pkg/kubelet/cm/container_manager_linux.go
  • device.manager对应的是pkg/kubelet/cm/deviceplugin/manager.go

以上时序图就是Kubelet如何初始化和启动DeviceManager的流程(为了方便理解,这里会忽略和DeviceManager无关的方法)

可以看到serverrun()方法做两件事情:NewMainKubeletstartKubelet,而Device Manager的初始化与启动也是在这两个步骤中完成,同时启动grpc注册服务,这时Device Plugin就可以注册进来。

  1. DeviceManger的初始化是在创建ContainerManager对象时完成的,而ContainerManager对象作为NewMainKubelet创建Kubelet运行时对象的参数,

实际定义在:pkg/kubelet/cm/container_manager_linux.go

func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.Interface, nodeConfig NodeConfig, failSwapOn bool, devicePluginEnabled bool, recorder record.EventRecorder) (ContainerManager, error) {
...

glog.Infof("Creating device plugin manager: %t", devicePluginEnabled)
    if devicePluginEnabled {
        cm.devicePluginManager, err = deviceplugin.NewManagerImpl()
    } else {
        cm.devicePluginManager, err = deviceplugin.NewManagerStub()
    }

...
}

由于这个功能目前还比较新,需要通过feature gate打开, 即配置 --feature-gates=DevicePlugins=true,默认该功能是关闭的。当该功能打开时会调用deviceplugin.NewManagerImpl(),否则会有stub实现,不作任何事情。

deviceplugin.NewManagerImpl()定义在pkg/kubelet/cm/deviceplugin/manager.go内,

// NewManagerImpl creates a new manager.
func NewManagerImpl() (*ManagerImpl, error) {
    return newManagerImpl(pluginapi.KubeletSocket)
}

实际上真正做初始的工作都是在下列方法完成的

func newManagerImpl(socketPath string) (*ManagerImpl, error) {
    glog.V(2).Infof("Creating Device Plugin manager at %s", socketPath)

    if socketPath == "" || !filepath.IsAbs(socketPath) {
        return nil, fmt.Errorf(errBadSocket+" %v", socketPath)
    }

    dir, file := filepath.Split(socketPath)
    manager := &ManagerImpl{
        endpoints:        make(map[string]endpoint),
        socketname:       file,
        socketdir:        dir,
        allDevices:       make(map[string]sets.String),
        allocatedDevices: make(map[string]sets.String),
        podDevices:       make(podDevices),
    }
    manager.callback = manager.genericDeviceUpdateCallback

    // The following structs are populated with real implementations in manager.Start()
    // Before that, initializes them to perform no-op operations.
    manager.activePods = func() []*v1.Pod { return []*v1.Pod{} }
    manager.sourcesReady = &sourcesReadyStub{}

    return manager, nil
}

这里只是做ManagerImpl的初始化,有意义的工作只有两个

  • 设置DeviceManager内置grpc服务的监听文件 socketPath, 由于DeviceManager和Device Plugin部署在同一个节点,所以只需要利用Unix Socket的模式通信
  • 设置设备状态的回调函数 genericDeviceUpdateCallback

就像注释中提到 The following structs are populated with real implementations in manager.Start()的一样,实际上在初始化阶段,并没有

  1. DeviceMangerStart()是在启动Kubelet运行时initializeModules调用的,具体还是ContainerManager`启动的一部分。
func (cm *containerManagerImpl) Start(node *v1.Node,
    activePods ActivePodsFunc,
    sourcesReady config.SourcesReady,
    podStatusProvider status.PodStatusProvider,
    runtimeService internalapi.RuntimeService) error {

...

// Starts device plugin manager.
    if err := cm.devicePluginManager.Start(deviceplugin.ActivePodsFunc(activePods), sourcesReady); err != nil {
        return err
    }
    return nil

}

这里会把活跃的pod列表以及pod元数据的来源(FILE, URL, api-server)作为输入用来启动DeviceManager, 这两个参数在启动的时候并没有用到

func (m *ManagerImpl) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady) error {
    glog.V(2).Infof("Starting Device Plugin manager")

    m.activePods = activePods
    m.sourcesReady = sourcesReady

    // Loads in allocatedDevices information from disk.
    err := m.readCheckpoint()
    if err != nil {
        glog.Warningf("Continue after failing to read checkpoint file. Device allocation info may NOT be up-to-date. Err: %v", err)
    }

    socketPath := filepath.Join(m.socketdir, m.socketname)
    os.MkdirAll(m.socketdir, 0755)

    // Removes all stale sockets in m.socketdir. Device plugins can monitor
    // this and use it as a signal to re-register with the new Kubelet.
    if err := m.removeContents(m.socketdir); err != nil {
        glog.Errorf("Fail to clean up stale contents under %s: %+v", m.socketdir, err)
    }

    s, err := net.Listen("unix", socketPath)
    if err != nil {
        glog.Errorf(errListenSocket+" %+v", err)
        return err
    }

    m.server = grpc.NewServer([]grpc.ServerOption{}...)

    pluginapi.RegisterRegistrationServer(m.server, m)
    go m.server.Serve(s)

    glog.V(2).Infof("Serving device plugin registration server on %q", socketPath)

    return nil
}

Start主要核心做两件事情:

  • m.readCheckpoint() 负责从本地checkpoint(/var/lib/kubelet/device-plugins/kubelet_internal_checkpoint)中获取已经注册和分配了的设备信息,为什么要这样做呢?这主要是因为Kubelet负责设备的分配和管理工作, 这些信息只存在于Kubelet的内存中。一旦Kubelet重启了之后,哪些设备已经分配了出去,以及这些分配出去的设备具体和哪个Pod关联

DeviceManager在每次分配设备给Pod后会将Pod和设备的映射关系以json格式记录到本地的一个文件

  • go m.server.Serve(s) 以后台grouting的方式启动grpc服务,这样就可以完成Device Plugin的注册,我们会在后面介绍grpc开放的服务如何与Device Plugin进行交互。

小结:

阅读开源源代码可以帮助我们提升技术水平, 不但能深入技术底层原理,快速理解技术架构;同样也可以帮助我们学习优秀的代码风格和设计模式。本文这里只是抛砖引玉,对Device Manager初始化场景进行了分析,后续我们也会对其他场景继续研究,加深对Kubernetes的Device Plugin机制的理解。

相关实践学习
通过Ingress进行灰度发布
本场景您将运行一个简单的应用,部署一个新的应用用于新的发布,并通过Ingress能力实现灰度发布。
容器应用与集群管理
欢迎来到《容器应用与集群管理》课程,本课程是“云原生容器Clouder认证“系列中的第二阶段。课程将向您介绍与容器集群相关的概念和技术,这些概念和技术可以帮助您了解阿里云容器服务ACK/ACK Serverless的使用。同时,本课程也会向您介绍可以采取的工具、方法和可操作步骤,以帮助您了解如何基于容器服务ACK Serverless构建和管理企业级应用。 学习完本课程后,您将能够: 掌握容器集群、容器编排的基本概念 掌握Kubernetes的基础概念及核心思想 掌握阿里云容器服务ACK/ACK Serverless概念及使用方法 基于容器服务ACK Serverless搭建和管理企业级网站应用
目录
相关文章
|
2天前
|
存储 设计模式 算法
【23种设计模式·全精解析 | 行为型模式篇】11种行为型模式的结构概述、案例实现、优缺点、扩展对比、使用场景、源码解析
行为型模式用于描述程序在运行时复杂的流程控制,即描述多个类或对象之间怎样相互协作共同完成单个对象都无法单独完成的任务,它涉及算法与对象间职责的分配。行为型模式分为类行为模式和对象行为模式,前者采用继承机制来在类间分派行为,后者采用组合或聚合在对象间分配行为。由于组合关系或聚合关系比继承关系耦合度低,满足“合成复用原则”,所以对象行为模式比类行为模式具有更大的灵活性。 行为型模式分为: • 模板方法模式 • 策略模式 • 命令模式 • 职责链模式 • 状态模式 • 观察者模式 • 中介者模式 • 迭代器模式 • 访问者模式 • 备忘录模式 • 解释器模式
【23种设计模式·全精解析 | 行为型模式篇】11种行为型模式的结构概述、案例实现、优缺点、扩展对比、使用场景、源码解析
|
2天前
|
设计模式 存储 安全
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
结构型模式描述如何将类或对象按某种布局组成更大的结构。它分为类结构型模式和对象结构型模式,前者采用继承机制来组织接口和类,后者釆用组合或聚合来组合对象。由于组合关系或聚合关系比继承关系耦合度低,满足“合成复用原则”,所以对象结构型模式比类结构型模式具有更大的灵活性。 结构型模式分为以下 7 种: • 代理模式 • 适配器模式 • 装饰者模式 • 桥接模式 • 外观模式 • 组合模式 • 享元模式
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
|
2天前
|
设计模式 存储 安全
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
创建型模式的主要关注点是“怎样创建对象?”,它的主要特点是"将对象的创建与使用分离”。这样可以降低系统的耦合度,使用者不需要关注对象的创建细节。创建型模式分为5种:单例模式、工厂方法模式抽象工厂式、原型模式、建造者模式。
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
|
21天前
|
PyTorch Shell API
Ascend Extension for PyTorch的源码解析
本文介绍了Ascend对PyTorch代码的适配过程,包括源码下载、编译步骤及常见问题,详细解析了torch-npu编译后的文件结构和三种实现昇腾NPU算子调用的方式:通过torch的register方式、定义算子方式和API重定向映射方式。这对于开发者理解和使用Ascend平台上的PyTorch具有重要指导意义。
|
3天前
|
安全 搜索推荐 数据挖掘
陪玩系统源码开发流程解析,成品陪玩系统源码的优点
我们自主开发的多客陪玩系统源码,整合了市面上主流陪玩APP功能,支持二次开发。该系统适用于线上游戏陪玩、语音视频聊天、心理咨询等场景,提供用户注册管理、陪玩者资料库、预约匹配、实时通讯、支付结算、安全隐私保护、客户服务及数据分析等功能,打造综合性社交平台。随着互联网技术发展,陪玩系统正成为游戏爱好者的新宠,改变游戏体验并带来新的商业模式。
|
1天前
|
Prometheus Kubernetes 监控
OpenAI故障复盘 - 阿里云容器服务与可观测产品如何保障大规模K8s集群稳定性
聚焦近日OpenAI的大规模K8s集群故障,介绍阿里云容器服务与可观测团队在大规模K8s场景下我们的建设与沉淀。以及分享对类似故障问题的应对方案:包括在K8s和Prometheus的高可用架构设计方面、事前事后的稳定性保障体系方面。
|
3天前
|
Kubernetes 网络协议 应用服务中间件
Kubernetes Ingress:灵活的集群外部网络访问的利器
《Kubernetes Ingress:集群外部访问的利器-打造灵活的集群网络》介绍了如何通过Ingress实现Kubernetes集群的外部访问。前提条件是已拥有Kubernetes集群并安装了kubectl工具。文章详细讲解了Ingress的基本组成(Ingress Controller和资源对象),选择合适的版本,以及具体的安装步骤,如下载配置文件、部署Nginx Ingress Controller等。此外,还提供了常见问题的解决方案,例如镜像下载失败的应对措施。最后,通过部署示例应用展示了Ingress的实际使用方法。
18 2
|
15天前
|
存储 Kubernetes 关系型数据库
阿里云ACK备份中心,K8s集群业务应用数据的一站式灾备方案
本文源自2024云栖大会苏雅诗的演讲,探讨了K8s集群业务为何需要灾备及其重要性。文中强调了集群与业务高可用配置对稳定性的重要性,并指出人为误操作等风险,建议实施周期性和特定情况下的灾备措施。针对容器化业务,提出了灾备的新特性与需求,包括工作负载为核心、云资源信息的备份,以及有状态应用的数据保护。介绍了ACK推出的备份中心解决方案,支持命名空间、标签、资源类型等维度的备份,并具备存储卷数据保护功能,能够满足GitOps流程企业的特定需求。此外,还详细描述了备份中心的使用流程、控制台展示、灾备难点及解决方案等内容,展示了备份中心如何有效应对K8s集群资源和存储卷数据的灾备挑战。
|
1月前
|
Kubernetes 监控 Cloud Native
Kubernetes集群的高可用性与伸缩性实践
Kubernetes集群的高可用性与伸缩性实践
73 1
|
2月前
|
JSON Kubernetes 容灾
ACK One应用分发上线:高效管理多集群应用
ACK One应用分发上线,主要介绍了新能力的使用场景

相关产品

  • 容器服务Kubernetes版
  • 推荐镜像

    更多