今年三月中旬containerd 1.7 版本发布,关注到一个新特性NRI(Node Resource Interface),在调研NRI之前,首先我们需要回顾下,一个Pod是如何启动的。
一. Pod 创建与调度
scheduling cycle and binding cycle
二. Kubelet(1.25 版本)
在调度器调度到该节点,ApiServer写入etcd后,kubelet-syncLoop()
通过 List&Watch机制监听Apisercer获取pod的事件和状态,首先检测资源是否充足,若不充足返回outofcpu,比照是否满足节点的亲和性,节点名称是否与本节点相同等。
podWorker 感知到pod到更新事件后,执行kubelet-syncPod()
,通过cri(Container Runtime Interface),获取当前节点pod对应的容器是不是在启动的状态。通过期望的pod状态与目前的比照,如果没有启动,检测网络插件的状态是否就绪,提前创建pod cgroups,创建POD的目录(pod volume plugin等),检测存储是否就绪(attach or mount),判断是否重新创建SandBox等。检查完后调用kubeGenericRuntimeManager-syncpod()
然后kubeGenericRuntimeManager-syncpod()
就会请求CRI去启动这些容器进程,创建pod日志目录,通过remote Container Runtime调用containerd的Runtime service下的RunPodSandbox
接口,接受到返回后请求createcontainer
接口,接受到请求返回后,请求startcontainer
接口。
三. contianerd-CRI(1.7.5版本):
containerd是cs架构,实现了kubernetes 的 cri接口规范,被kubelet调用了Runtime service下的RunPodSandbox接口:
RunPodSandbox
: 拉取pause镜像,若容器网络不是hostnetwork,则通过CNI的SetUp创建容器网络(Net Namespace),为当前pod创建一个主目录,启动shim server,创建sandbox的task,拉起pause容器,在sandboxStore存储sandbox对象,返回sandboxid给kubelet。createcontainer
:接收每个容器的配置,并为其创建容器实例,获取sandbox,判断本地是否有该容器的镜像,生成ContainerConfig(容器id,镜像等),创建container目录,然后合并 Pod Sandbox 和 ContainerConfig 生成容器的完整配置规范OCI Spec,在OCI Spec 中配置 container 的 Linux Namespace 参数,使用OCI Spec,runtime,sandbox等创建对应的容器(调用containerd container service),返回容器对象的容器id给kubelet。containerd container service
,Container 容器对象创建(仅仅创建对象,并非实质的容器创建),是在containerd metadata containerStore中生成container的数据对象,并发布/containers/create事件。startcontainer
: 根据sandboxid和containerid获取这些对象,创建容器的task,并启动task,调用task service。Task service-start
通过请求shim start,来完成容器的创建。Containerd-shim
通过执行runc create 和 runc start 命令来完成容器的创建- runc create :读取符合OCI标准的容器配置,再将其转换成与
libcontainer
兼容的格式,libcontainer创建一个路径为/run/runc/$ID/exec.fifo
的管道文件,然后根据配置创建ParentProcess
对象,Parentproces
创建runc init
子进程,用init管道将容器配置信息传递给run init
进程,这时会被/runc/libcontainer/nsenter
劫持,对容器的namespace进行配置,使runc init
子进程位于容器配置指定的各个namespace内。run init会根据容器初始化流程,创建容器cgroup,修改cgroup为指定值,对容器网络,路由,hostname,环境变量进行配置,创建容器文件系统等。容器初始化完成,往exec.fifo执行写操作,然后处于阻塞状态,直到用户执行runc start后,通过系统调用执行exec,执行用户进程,系统调用后从内核空间切换回用户空间,容器创建成功。
runc start:获取容器,识别容器状态为created后,开始exec操作,在exec.fifo执行读操作。
总之,RunPodSandBox会创建Pod的网络,CreateContainer会生成OCI spec,创建Container对象,StartContainer会根据spec创建指定容器,创建并修改容器cgroup,配置容器网络,将容器划分到指定namespace,切换至指定用户,传递环境变量等。
其他:
所有流程:
源码查看:
kubelet.go /pkg/kubelet/kubelet.go 通过podworker触发 syncpod
资源检查-node resourcece-plugin
原理解析:
NRI 代码
Kubelet 架构解析
CRI 规范
二. CRI 拓展接口-NRI
了解完Pod创建的流程后,我们针对pod的生命周期和容器的生命周期探讨NRI。
源码Docs(人翻非机翻):
什么是NRI
NRI允许开发者将自定义逻辑插入OCI兼容的运行时,也可以在容器生命中期中的特定点,执行oci范围之外的额外操作,例如可以在容器创建前,更改容器资源的分配。
在initial oci spec
阶段,重新访问的API是NRI的一次重大重新,改变了NRI的范围以及如何将其集成到runtimes中,NRI Plugins 描述了开发者的自定义逻辑,它与runtime互相通信,并定义了对容器需要进行怎样的修改。
使用了类似OCI钩子的一次性插件调用机制,为每个NRI事件生成一个单独的插件实例。NRI Plugins类似于守护进程的实体,负责处理NRI事件和NRI请求,NRI Adaptation与NRI Plugin之间使用unix套接字通信,是一个基于protobuf的NRI 插件协议,该协议被编译为ttRPC。提高通信效率,降低每条消息的开销。
NRI plugin 注册
NRI-Runtim负责提供 注册plugin,自发更新容器,这些接口由NRI Plugin请求,NRI Plugin 提供 配置plugin,获取以及存在的容器的init spec,将插件挂接到pod或者容器的生命周期中,关闭插件,这些都是由NRI-Runtime请求的。
在NRI Plugins向NRI-Runtime注册时:
- NRI Plugins向NRI-Runtime标识自己
- NRI-Runtime为NRI Plugins提供特定于插件的配置数据
- NRI Plugins订阅感兴趣的pod和容器生命周期事件
- NRI-Runtime向NRI Plugins发送现有pod和容器的列表
- NRI Plugins请求对现有容器进行任何必要的更新
- 当NRI Plugins向NRI-Runtime注册后,它将根据订阅开始接收pod和容器生命周期事件。
NRI plugin 执行
在注册时NRI-Runtime会给plugin生成插件索引(plugin index),在容器的生命周期处理事件的执行顺序由插件索引来决定,如下图,外圈是pod的生命周期,内圈是容器的生命周期。
订阅pod生命周期
NRI 插件可以订阅如下pod生命周期:
creation stopping removal
NRI插件可以获取如下pod 元数据
ID name UID namespace labels annotations cgroup parent directory runtime handler name
订阅容器生命周期
NRI 插件可以订阅如下容器生命周期:
creation (*) post-creation starting post-start updating (*) post-update stopping (*) removal
(*) 的生命周期表示NRI Plugin 可以请求对容器进行调整(adjustment)或更新(updates)以响应这些事件。
ID pod ID name state labels annotations command line arguments environment variables mounts OCI hooks rlimits linux namespace IDs devices resources memory limit reservation swap limit kernel limit kernel TCP limit swappiness OOM disabled flag hierarchical accounting flag hugepage limits CPU shares quota period realtime runtime realtime period cpuset CPUs cpuset memory Block I/O class RDT class
除了识别容器的数据外,这些信息还代表了容器的OCI规格中的相应数据。
容器调整(adjust)
在容器创建过程中,插件可以请求对以下容器参数进行更改。
- annotations - mounts - environment variables - OCI hooks - rlimits - linux - devices - resources - memory - limit - reservation - swap limit - kernel limit - kernel TCP limit - swappiness - OOM disabled flag - hierarchical accounting flag - hugepage limits - CPU - shares - quota - period - realtime runtime - realtime period - cpuset CPUs - cpuset memory - Block I/O class - RDT class
容器更新(update):
一旦创建了容器,插件就可以请求对其进行更新。这些更新可以响应于另一个容器创建请求、响应于任何容器更新请求、响应任何容器停止请求而被请求,或者它们可以作为自发的容器更新请求的一部分而被请求。以下容器参数可以通过这种方式更新:
- resources - memory - limit - reservation - swap limit - kernel limit - kernel TCP limit - swappiness - OOM disabled flag - hierarchical accounting flag - hugepage limits - CPU - shares - quota - period - realtime runtime - realtime period - cpuset CPUs - cpuset memory - Block I/O class - RDT class
NRI Adaptation
NRI runtime adaptation 实现了基本的插件发现、启动和配置。它还提供了从运行时将NRI插件hook到pod和容器的生命周期事件中所需的功能。是NRI runtime与NRI plugins之间通信的具体实现,还负责按正确的顺序调用插件,并将多个插件的响应组合为一个插件。
个人理解:
Containerd CRI-NRI 源码分析
从源码角度分析,如何修改的,也就是nri是如何融入containerd的
Nri continaerd/pkg/cri/nri/nri_api_linux.go
criService continaerd/pkg/nri
- 有设置超时事件,若插件处理超时,则不再处理。
- nri注入contianerd,在容器的生命周期运行相应的plugin。
NRI与其他hook的对比
webhook:
https://kubernetes.io/zh-cn/docs/reference/access-authn-authz/webhook/
https://www.jianshu.com/p/1db3586950db
ocihook:一般谈及如何在Pod的生命周期执行自定义逻辑,我们很容易想到pod的lifecycle。配置HttpGet请求开发者自己开发的服务。
...... lifecycle: postStart: httpGet: path: #uri地址 port: host: scheme: HTTP #支持的协议,http或者https ......
若是针对容器的修改,这种方式会显得比较鸡肋,首先请求需要发送容器的描述信息,对接口的实现有一定的要求,而且不能深入容器的生命周期,仅仅在Pod的生命周期。
webhook:定义两种类型的准入 Webhook, 即验证性质的准入 Webhook 和变更性质的准入 Webhook。可以监听pod对象,并做出相应的自定义逻辑,但是若service挂掉,pod可能起不来。
NRI-安全隐患
虽然nri有自己的优势,但是目前能获取socket_path(默认 /var/run/nri/nri.sock) 指定sock文件的进程,均能注册成功,在containerd注册plugin,可能带来不可挽回的损失,可以修改文件的读取权限,来限制非root用户,目前已知版本未提供鉴权。
三. 官方案例与环境搭建
3.1 环境搭建
更换containerd版本大于1.7(通过 kubectl get node -o wide 可以查看节点的容器运行时和它的版本),修改Containerd的NRI配置,开启NRI,编写NRI plugin,注册插件。
3.2 官方示例 Example
官方demo:NRI-Plugins
logger differ device injector OCI hook injector ulimit adjuster NRI v0.1.0 plugin adapter
四. 如何编写NRI实例
4.1 通过containerd和nri源码查看plugin如何注册
初始化流程:
0.1 CRI在初始化时,运行adaptation,adaptation一方面读取默认socket_path,配置nri plugin,另一方面暴露RegisterPlugin接口,等待plugin注册。
0.2 用户写plugin,配置pluginName,pluginIdx,socketpath等,实现stub对象,调用stub.run()
注册流程:
1.1 stub.run()->stub.connect() 向adaptation标识自己
2.1 adaptation.connect() 为plugin开放RegisterPlugin 和 UpdateContainers 接口
3.1 stub.run -> stub.start -> stub.register -> 通过ttRPC与adaptation通信,调用RegisterPlugin实现注册,订阅感兴趣的pod和容器生命周期事件
4.1 adaptation.RegisterPlugin 接口被调用后,注册该插件,然后调用syncFn -> syncPlugin -> 发送当前节点所有的pod 和 containers 通过 plugin 的 synchronize接口,发送给plugin。
5.1 plugin 执行synchronize ,根据stub.filter 过滤pod,将指定pod执行更新操作,后续对订阅的pod执行相关操作
4.2 日志读写(官方)
在容器的生命周期和pod的生命周期打印pod 和 container信息
package main import ( "context" "flag" "fmt" "os" "strings" "github.com/sirupsen/logrus" "sigs.k8s.io/yaml" "github.com/containerd/nri/pkg/api" "github.com/containerd/nri/pkg/stub" ) type config struct { LogFile string `json:"logFile"` Events []string `json:"events"` AddAnnotation string `json:"addAnnotation"` SetAnnotation string `json:"setAnnotation"` AddEnv string `json:"addEnv"` SetEnv string `json:"setEnv"` } type plugin struct { stub stub.Stub mask stub.EventMask } var ( cfg config log *logrus.Logger _ = stub.ConfigureInterface(&plugin{}) ) func (p *plugin) Configure(_ context.Context, config, runtime, version string) (stub.EventMask, error) { log.Infof("got configuration data: %q from runtime %s %s", config, runtime, version) if config == "" { return p.mask, nil } oldCfg := cfg err := yaml.Unmarshal([]byte(config), &cfg) err := yaml.Unmarshal([]byte(config), &cfg) if err != nil { return 0, fmt.Errorf("failed to parse provided configuration: %w", err) } p.mask, err = api.ParseEventMask(cfg.Events...) if err != nil { return 0, fmt.Errorf("failed to parse events in configuration: %w", err) } if cfg.LogFile != oldCfg.LogFile { f, err := os.OpenFile(cfg.LogFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) if err != nil { log.Errorf("failed to open log file %q: %v", cfg.LogFile, err) return 0, fmt.Errorf("failed to open log file %q: %w", cfg.LogFile, err) } log.SetOutput(f) } return p.mask, nil } func (p *plugin) Synchronize(_ context.Context, pods []*api.PodSandbox, containers []*api.Container) ([]*api.ContainerUpdate, error) { dump("Synchronize", "pods", pods, "containers", containers) return nil, nil } func (p *plugin) Shutdown() { dump("Shutdown") } func (p *plugin) RunPodSandbox(_ context.Context, pod *api.PodSandbox) error { dump("RunPodSandbox", "pod", pod) return nil } func (p *plugin) StopPodSandbox(_ context.Context, pod *api.PodSandbox) error { dump("StopPodSandbox", "pod", pod) return nil } func (p *plugin) RemovePodSandbox(_ context.Context, pod *api.PodSandbox) error { dump("RemovePodSandbox", "pod", pod) return nil } func (p *plugin) CreateContainer(_ context.Context, pod *api.PodSandbox, container *api.Container) (*api.ContainerAdjustment, []*api.ContainerUpdate, error) { dump("CreateContainer", "pod", pod, "container", container) adjust := &api.ContainerAdjustment{} if cfg.AddAnnotation != "" { adjust.AddAnnotation(cfg.AddAnnotation, fmt.Sprintf("logger-pid-%d", os.Getpid())) } if cfg.SetAnnotation != "" { adjust.RemoveAnnotation(cfg.SetAnnotation) adjust.AddAnnotation(cfg.SetAnnotation, fmt.Sprintf("logger-pid-%d", os.Getpid())) } if cfg.AddEnv != "" { adjust.AddEnv(cfg.AddEnv, fmt.Sprintf("logger-pid-%d", os.Getpid())) } if cfg.SetEnv != "" { adjust.RemoveEnv(cfg.SetEnv) adjust.AddEnv(cfg.SetEnv, fmt.Sprintf("logger-pid-%d", os.Getpid())) } return adjust, nil, nil } func (p *plugin) PostCreateContainer(_ context.Context, pod *api.PodSandbox, container *api.Container) error { dump("PostCreateContainer", "pod", pod, "container", container) return nil } func (p *plugin) StartContainer(_ context.Context, pod *api.PodSandbox, container *api.Container) error { dump("StartContainer", "pod", pod, "container", container) return nil } func (p *plugin) PostStartContainer(_ context.Context, pod *api.PodSandbox, container *api.Container) error { dump("PostStartContainer", "pod", pod, "container", container) return nil } func (p *plugin) UpdateContainer(_ context.Context, pod *api.PodSandbox, container *api.Container, r *api.LinuxResources) ([]*api.ContainerUpdate, error) { dump("UpdateContainer", "pod", pod, "container", container, "resources", r) return nil, nil } func (p *plugin) PostUpdateContainer(_ context.Context, pod *api.PodSandbox, container *api.Container) error { dump("PostUpdateContainer", "pod", pod, "container", container) return nil } func (p *plugin) StopContainer(_ context.Context, pod *api.PodSandbox, container *api.Container) ([]*api.ContainerUpdate, error) { dump("StopContainer", "pod", pod, "container", container) return nil, nil } func (p *plugin) RemoveContainer(_ context.Context, pod *api.PodSandbox, container *api.Container) error { dump("RemoveContainer", "pod", pod, "container", container) return nil } func (p *plugin) onClose() { os.Exit(0) } // Dump one or more objects, with an optional global prefix and per-object tags. func dump(args ...interface{}) { var ( prefix string idx int ) if len(args)&0x1 == 1 { prefix = args[0].(string) idx++ } for ; idx < len(args)-1; idx += 2 { tag, obj := args[idx], args[idx+1] msg, err := yaml.Marshal(obj) if err != nil { log.Infof("%s: %s: failed to dump object: %v", prefix, tag, err) continue } if prefix != "" { log.Infof("%s: %s:", prefix, tag) for _, line := range strings.Split(strings.TrimSpace(string(msg)), "\n") { log.Infof("%s: %s", prefix, line) } } else { log.Infof("%s:", tag) for _, line := range strings.Split(strings.TrimSpace(string(msg)), "\n") { log.Infof(" %s", line) } } } } func main() { var ( pluginName string pluginIdx string events string opts []stub.Option err error ) log = logrus.StandardLogger() log.SetFormatter(&logrus.TextFormatter{ PadLevelText: true, }) flag.StringVar(&pluginName, "name", "", "plugin name to register to NRI") flag.StringVar(&pluginIdx, "idx", "", "plugin index to register to NRI") flag.StringVar(&events, "events", "all", "comma-separated list of events to subscribe for") flag.StringVar(&cfg.LogFile, "log-file", "", "logfile name, if logging to a file") flag.StringVar(&cfg.AddAnnotation, "add-annotation", "", "add this annotation to containers") flag.StringVar(&cfg.SetAnnotation, "set-annotation", "", "set this annotation on containers") flag.StringVar(&cfg.AddEnv, "add-env", "", "add this environment variable for containers") flag.StringVar(&cfg.SetEnv, "set-env", "", "set this environment variable for containers") flag.Parse() if cfg.LogFile != "" { f, err := os.OpenFile(cfg.LogFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) if err != nil { log.Fatalf("failed to open log file %q: %v", cfg.LogFile, err) } log.SetOutput(f) } if pluginName != "" { opts = append(opts, stub.WithPluginName(pluginName)) } if pluginIdx != "" { opts = append(opts, stub.WithPluginIdx(pluginIdx)) } p := &plugin{} if p.mask, err = api.ParseEventMask(events); err != nil { log.Fatalf("failed to parse events: %v", err) } cfg.Events = strings.Split(events, ",") if p.stub, err = stub.New(p, append(opts, stub.WithOnClose(p.onClose))...); err != nil { log.Fatalf("failed to create plugin stub: %v", err) } err = p.stub.Run(context.Background()) if err != nil { log.Errorf("plugin exited with error %v", err) os.Exit(1) } }
4.3 Cgroup修改
package main import ( "context" "flag" "github.com/containerd/nri/pkg/api" "github.com/containerd/nri/pkg/stub" "github.com/sirupsen/logrus" "os" ) var ( log *logrus.Logger verbose bool ) // our injector plugin type plugin struct { stub stub.Stub } // CreateContainer handles container creation requests. func (p *plugin) CreateContainer(_ context.Context, pod *api.PodSandbox, container *api.Container) (*api.ContainerAdjustment, []*api.ContainerUpdate, error) { adjust := &api.ContainerAdjustment{} adjust.SetLinuxCPUSetCPUs("1") // time.Sleep(20 * time.Second) 验证超时 return adjust, nil, nil } func main() { var ( pluginName string pluginIdx string opts []stub.Option err error ) log = logrus.StandardLogger() log.SetFormatter(&logrus.TextFormatter{ PadLevelText: true, }) flag.StringVar(&pluginName, "name", "cgrpupEdit", "plugin name to register to NRI") flag.StringVar(&pluginIdx, "idx", "", "plugin index to register to NRI") flag.BoolVar(&verbose, "verbose", false, "enable (more) verbose logging") flag.Parse() if pluginName != "" { opts = append(opts, stub.WithPluginName(pluginName)) } if pluginIdx != "" { opts = append(opts, stub.WithPluginIdx(pluginIdx)) } p := &plugin{} if p.stub, err = stub.New(p, opts...); err != nil { log.Fatalf("failed to create plugin stub: %v", err) } err = p.stub.Run(context.Background()) if err != nil { log.Errorf("plugin exited with error %v", err) os.Exit(1) } }
// 注册plugin go run cgroupEdit.go -idx 00 // 启动一个容器调度到该节点
4.4 device-injector
如果一个容器需要使用GPU,那么它需要将宿主机上的GPU设备以及与GPU设备相关的库文件、可执行文件等挂载到容器中,该容器才能正常使用GPU。
为了使得容器能使用GPU等资源,kubernetes提出CDI,CDI 是支持挂载第三方设备(比如:GPU、FPGA等)机制,nvidia device plugin是nvidia在CDI的实现,用户可以在resource写nvidia.com/gpu: 1,表示需要的GPU,kubelet请求device plugin,在执行allocate()函数后,device plugin 返回包含设备路径和驱动目录信息的response 给 kubelet,kubelet在容器的startcontainer挂载设备路径和驱动到容器中,但是停留在数量上的选择,并不能选择特定的GPU。
NRI提供了另外一种实现方式,但是绕开了官方提供的device plugin机制,且在运行时,NRI runtime请求plugin超时,不会再次请求,导致分配不到GPU,但是pod可能会是运行的状态,不符合预期。
// CreateContainer handles container creation requests. func (p *plugin) CreateContainer(_ context.Context, pod *api.PodSandbox, container *api.Container) (*api.ContainerAdjustment, []*api.ContainerUpdate, error) { var ( ctrName string devices []device mounts []mount err error ) ctrName = containerName(pod, container) if verbose { dump("CreateContainer", "pod", pod, "container", container) } adjust := &api.ContainerAdjustment{} // inject devices to container devices, err = parseDevices(container.Name, pod.Annotations) if err != nil { return nil, nil, err } if len(devices) == 0 { log.Infof("%s: no devices annotated...", ctrName) } else { if verbose { dump(ctrName, "annotated devices", devices) } for _, d := range devices { adjust.AddDevice(d.toNRI()) if !verbose { log.Infof("%s: injected device %q...", ctrName, d.Path) } } } // inject mounts to container mounts, err = parseMounts(container.Name, pod.Annotations) if err != nil { return nil, nil, err } if len(mounts) == 0 { log.Infof("%s: no mounts annotated...", ctrName) } else { if verbose { dump(ctrName, "annotated mounts", mounts) } for _, m := range mounts { adjust.AddMount(m.toNRI()) if !verbose { log.Infof("%s: injected mount %q -> %q...", ctrName, m.Source, m.Destination) } } } if verbose { dump(ctrName, "ContainerAdjustment", adjust) } return adjust, nil, nil }
总结:
NRI使得我们能在容器的生命周期和Pod的生命周期执行自定义逻辑,除了有安全隐患,还是可以适配很多场景的,体验下来和自定义的调度器还是有很大的相似之处,期待社区更新更多的使用场景。
Realease
- Koordinator
- 官方文档
- 【知乎】Containerd深度剖析-NRI篇
- Kubecon EU Introduction-containerd
- K8S 生态周报| Docker v24.0.0-beta.1 发布
- Containerd 的前世今生和保姆级入门教程
- Containerd shim 原理深入解读
- 技术干货|Docker和 Containerd 的区别,看这一篇就够了
- 初识containerd(一)
- Containerd深度剖析-NRI篇
- Containerd深度剖析-NRI篇-wx
- Containerd深度剖析-runtime篇-wx
- KubeCon-NA-2022-NRI
- NRI Support In Containerd
- Containerd NRI 插件
- containerd系列——什么是containerd?
- Container Runtime CDI与NRI介绍-阿里云开发者社区
- 容器运行时的内部结构和最新趋势(2023)下-阿里云开发者社区
- pod lifecycle