Kubernetes引入的Devic Plugin,通过扩展机制实现支持GPU、FPGA、高性能 NIC、InfiniBand等各种设备的集成,而Device Manager正是Kubelet内负责Device Plugin交互和设备生命周期管理的模块,在了解其基本设计后,我们需要通过对Device Manager的源码分析深入理解其运作方式。
首先明确目标:
并不是搞懂Kubelet的所有实现,而是希望理解Device Manager如何在资源发现,Pod创建,设备健康检查过程中所做的工作以及其如何与Kubelet交互,所以我们会忽略掉与Device Manager无关的操作。
这里是我阅读代码的原则和一些体会:
- 理解接口,搞清楚和外部模块的交互
- 理解实现接口的结构体
- 从用户场景的角度将方法调用和数据结构关联起来,好比将剧情和人物串联起来,了解了人物设定后,就可以更快速切入代码的调用过程;而代码调用也可以加深对数据结构设计的理解
- Kubernetes的代码比较复杂,很难一下就搞清楚看到的每一个数据结构和实现,这时我们可以把问题和假设记下来,不要过分纠结,可以在后面求证。书读百变其意自现,代码也是一样,当你逐渐熟悉了代码的脉络的时候,有些问题也会迎刃而解
- 由于Device Manager工作在Kubelet中,对于Kubelet的源码通篇的了解还是必要的
(Kubelet版本: 1.9.3)
DeviceManager的核心代码都在 pkg/kubelet/cm/deviceplugin
下
DeviceManager接口定义
所在的文件
pkg/kubelet/cm/deviceplugin/types.go
定义:
// Manager manages all the Device Plugins running on a node.
type Manager interface {
// Start starts device plugin registration service.
Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady) error
// Devices is the map of devices that have registered themselves
// against the manager.
// The map key is the ResourceName of the device plugins.
Devices() map[string][]pluginapi.Device
// Allocate configures and assigns devices to pods. The pods are provided
// through the pod admission attributes in the attrs argument. From the
// requested device resources, Allocate will communicate with the owning
// device plugin to allow setup procedures to take place, and for the
// device plugin to provide runtime settings to use the device (environment
// variables, mount points and device files). The node object is provided
// for the device manager to update the node capacity to reflect the
// currently available devices.
Allocate(node *schedulercache.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error
// Stop stops the manager.
Stop() error
// GetDeviceRunContainerOptions checks whether we have cached containerDevices
// for the passed-in <pod, container> and returns its DeviceRunContainerOptions
// for the found one. An empty struct is returned in case no cached state is found.
GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Container) *DeviceRunContainerOptions
// GetCapacity returns the amount of available device plugin resource capacity
// and inactive device plugin resources previously registered on the node.
GetCapacity() (v1.ResourceList, []string)
}
从注释中可以看到DeviceManager负责管理节点上运行的所有设备插件,这里分别定义了可以和外界交互的6个方法:
Start()
和Stop()
分别是启动设备插件注册和停止服务,这其实K8S中的常见套路Devices()
以map的形式列出device列表
以下3个方法是比较核心的工作:
Allocate()
为Pod分配可用的设备,并且调用设备插件进行所需的设备初始化GetDeviceRunContainerOptions()
获得为容器配置设备所需要的参数,比如Environment,Volume和Device,这个方法会用于创建容器的过程中GetCapacity()
用于节点向API Server上报Extended Resource的数量
当然要更清楚的理解,还需要结合具体场景中的调用链路进行理解。这里DeviceManager接口有两个实现分别是:MangerImpl
和 ManagerStub
, ManagerStub实际上是一个空实现,无需细看。下面简单了解一下 MangerImpl
的实现
DeviceManager接口实现
所在的文件
pkg/kubelet/cm/deviceplugin/manager.go
具体定义:
// ManagerImpl is the structure in charge of managing Device Plugins.
type ManagerImpl struct {
socketname string
socketdir string
endpoints map[string]endpoint // Key is ResourceName
mutex sync.Mutex
server *grpc.Server
// activePods is a method for listing active pods on the node
// so the amount of pluginResources requested by existing pods
// could be counted when updating allocated devices
activePods ActivePodsFunc
// sourcesReady provides the readiness of kubelet configuration sources such as apiserver update readiness.
// We use it to determine when we can purge inactive pods from checkpointed state.
sourcesReady config.SourcesReady
// callback is used for updating devices' states in one time call.
// e.g. a new device is advertised, two old devices are deleted and a running device fails.
callback monitorCallback
// allDevices contains all of registered resourceNames and their exported device IDs.
allDevices map[string]sets.String
// allocatedDevices contains allocated deviceIds, keyed by resourceName.
allocatedDevices map[string]sets.String
// podDevices contains pod to allocated device mapping.
podDevices podDevices
}
在ManagerImpl的定义和注释中,可以大致猜测它在做三件事:
- 提供grpc的服务,支持多个Device Plugin的注册
- 为Device Plugin提供回调函数
monitorCallback
,当设备的状态发生变化时,可以让Device Manager被通知,从而做一些相应的处理。比如当某个设备无法正常工作时,就需要将节点上可用资源总数减去一个 - 设备的分配和管理,具体讲就是记录某种设备一共有哪几个,已经分配出去的是哪几个。从这里看,Device Plugin需要为每个设备提供一个UUID, 这个UUID需要在本节点唯一并且不可改变,而Device Manager要做的事情就是维护这个UUID的集合,并且负责设备更新和分配
场景分类
这里主要涉及五个场景:
- Device Manager的初始化和启动
- 接收Device Plugin的endpoint注册,并且向Endpoint查询Device ID列表
- 定时上报节点上的设备信息
- 创建Pod时,将设备信息与Pod结合,生成创建容器所需要的配置(Environment, Device, Volume)
- 当设备状态不健康的时候,通知Kubelet更新可用设备的状态
Device Manager的初始化和启动过程
本文首先分析Device Manager的初始化和启动过程
Kubernetes的代码量巨大,但是细看每个模块的启动流程都有比较相似的套路,以Kubelet为例:
- 创建一个
KubeletServer
配置对象,这个对象保存着 kubelet 运行需要的所有配置信息 - 解析命令行,根据命令行的参数更新
KubeletServer
- 根据
KubeletServer
的配置创建真正的kubelet
运行时对象 - 通过
Start()
方法启动该kubelet
运行时对象
而DeviceManger的初始化就是发生在步骤3和步骤4
app.kubelet
对应的是cmd/kubelet/kubelet.go
server
对应的是cmd/kubelet/app/server.go
kubelet
对应的是pkg/kubelet/kubelet.go
container_manager_linux
对应的是pkg/kubelet/cm/container_manager_linux.go
device.manager
对应的是pkg/kubelet/cm/deviceplugin/manager.go
以上时序图就是Kubelet如何初始化和启动DeviceManager的流程(为了方便理解,这里会忽略和DeviceManager无关的方法)
可以看到server
中run()
方法做两件事情:NewMainKubelet
和startKubelet
,而Device Manager的初始化与启动也是在这两个步骤中完成,同时启动grpc注册服务,这时Device Plugin就可以注册进来。
DeviceManger
的初始化是在创建ContainerManager
对象时完成的,而ContainerManager
对象作为NewMainKubelet
创建Kubelet
运行时对象的参数,
实际定义在:pkg/kubelet/cm/container_manager_linux.go
func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.Interface, nodeConfig NodeConfig, failSwapOn bool, devicePluginEnabled bool, recorder record.EventRecorder) (ContainerManager, error) {
...
glog.Infof("Creating device plugin manager: %t", devicePluginEnabled)
if devicePluginEnabled {
cm.devicePluginManager, err = deviceplugin.NewManagerImpl()
} else {
cm.devicePluginManager, err = deviceplugin.NewManagerStub()
}
...
}
由于这个功能目前还比较新,需要通过feature gate打开, 即配置 --feature-gates=DevicePlugins=true,默认该功能是关闭的。当该功能打开时会调用deviceplugin.NewManagerImpl()
,否则会有stub实现,不作任何事情。
deviceplugin.NewManagerImpl()
定义在pkg/kubelet/cm/deviceplugin/manager.go
内,
// NewManagerImpl creates a new manager.
func NewManagerImpl() (*ManagerImpl, error) {
return newManagerImpl(pluginapi.KubeletSocket)
}
实际上真正做初始的工作都是在下列方法完成的
func newManagerImpl(socketPath string) (*ManagerImpl, error) {
glog.V(2).Infof("Creating Device Plugin manager at %s", socketPath)
if socketPath == "" || !filepath.IsAbs(socketPath) {
return nil, fmt.Errorf(errBadSocket+" %v", socketPath)
}
dir, file := filepath.Split(socketPath)
manager := &ManagerImpl{
endpoints: make(map[string]endpoint),
socketname: file,
socketdir: dir,
allDevices: make(map[string]sets.String),
allocatedDevices: make(map[string]sets.String),
podDevices: make(podDevices),
}
manager.callback = manager.genericDeviceUpdateCallback
// The following structs are populated with real implementations in manager.Start()
// Before that, initializes them to perform no-op operations.
manager.activePods = func() []*v1.Pod { return []*v1.Pod{} }
manager.sourcesReady = &sourcesReadyStub{}
return manager, nil
}
这里只是做ManagerImpl的初始化,有意义的工作只有两个
- 设置DeviceManager内置grpc服务的监听文件
socketPath
, 由于DeviceManager和Device Plugin部署在同一个节点,所以只需要利用Unix Socket的模式通信 - 设置设备状态的回调函数
genericDeviceUpdateCallback
DeviceManger
的Start()
是在启动Kubelet运行时
initializeModules调用的,具体还是
ContainerManager`启动的一部分。
func (cm *containerManagerImpl) Start(node *v1.Node,
activePods ActivePodsFunc,
sourcesReady config.SourcesReady,
podStatusProvider status.PodStatusProvider,
runtimeService internalapi.RuntimeService) error {
...
// Starts device plugin manager.
if err := cm.devicePluginManager.Start(deviceplugin.ActivePodsFunc(activePods), sourcesReady); err != nil {
return err
}
return nil
}
这里会把活跃的pod列表以及pod元数据的来源(FILE, URL, api-server)作为输入用来启动DeviceManager, 这两个参数在启动的时候并没有用到
func (m *ManagerImpl) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady) error {
glog.V(2).Infof("Starting Device Plugin manager")
m.activePods = activePods
m.sourcesReady = sourcesReady
// Loads in allocatedDevices information from disk.
err := m.readCheckpoint()
if err != nil {
glog.Warningf("Continue after failing to read checkpoint file. Device allocation info may NOT be up-to-date. Err: %v", err)
}
socketPath := filepath.Join(m.socketdir, m.socketname)
os.MkdirAll(m.socketdir, 0755)
// Removes all stale sockets in m.socketdir. Device plugins can monitor
// this and use it as a signal to re-register with the new Kubelet.
if err := m.removeContents(m.socketdir); err != nil {
glog.Errorf("Fail to clean up stale contents under %s: %+v", m.socketdir, err)
}
s, err := net.Listen("unix", socketPath)
if err != nil {
glog.Errorf(errListenSocket+" %+v", err)
return err
}
m.server = grpc.NewServer([]grpc.ServerOption{}...)
pluginapi.RegisterRegistrationServer(m.server, m)
go m.server.Serve(s)
glog.V(2).Infof("Serving device plugin registration server on %q", socketPath)
return nil
}
Start
主要核心做两件事情:
m.readCheckpoint()
负责从本地checkpoint(/var/lib/kubelet/device-plugins/kubelet_internal_checkpoint)中获取已经注册和分配了的设备信息,为什么要这样做呢?这主要是因为Kubelet负责设备的分配和管理工作, 这些信息只存在于Kubelet的内存中。一旦Kubelet重启了之后,哪些设备已经分配了出去,以及这些分配出去的设备具体和哪个Pod关联
DeviceManager在每次分配设备给Pod后会将Pod和设备的映射关系以json格式记录到本地的一个文件
go m.server.Serve(s)
以后台grouting的方式启动grpc服务,这样就可以完成Device Plugin的注册
小结:
阅读开源源代码可以帮助我们提升技术水平, 不但能深入技术底层原理,快速理解技术架构;同样也可以帮助我们学习优秀的代码风格和设计模式。本文这里只是抛砖引玉,对Device Manager初始化场景进行了分析,后续我们也会对其他场景继续研究,接续加深对Kubernetes的Device Plugin机制的理解。