CRD(Custom Resource Definition)+ Operator 模式是一种用于扩展 Kubernetes 功能的方法。
CRD 允许用户自定义 Kubernetes 资源类型。它就像是在 Kubernetes 中定义了一种新的“物种”,使得用户可以根据自己的需求创建和管理特定类型的资源。
Operator 则是描述、部署和管理 Kubernetes 应用的一套机制。从实现上来说,它可以理解为 CRD 配合可选的 Webhook 与 Controller 来实现用户业务逻辑,即 Operator = CRD + Webhook + Controller。
常见的工作流程如下:
- 用户创建一个自定义资源(CRD)。
- API Server 根据自己注册的规则列表,把该 CRD 的请求转发给 Webhook。Webhook 一般会完成该 CRD 的缺省值设定和参数检验等工作。经过 Webhook 处理后,相应的 CR(Custom Resource,即 CRD 的一个具体实例)会被写入数据库,并返回给用户。
- 与此同时,Controller 会在后台监测该自定义资源,按照业务逻辑,处理与该自定义资源相关联的特殊操作。
- 上述处理一般会引起集群内的状态变化,Controller 会监测这些关联的变化,并把这些变化记录到 CRD 的状态中。
这种模式的优点包括:
- 灵活性高:可以根据具体业务需求定制资源和操作逻辑。
- 简化管理:将特定业务的管理逻辑封装在 Operator 中,使得用户只需关注自定义资源的定义和期望状态,而无需了解复杂的底层实现细节。
- 可扩展性强:方便扩展 Kubernetes 以支持更复杂的应用场景。
```js
package main
import (
"flag"
"time"
"context"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/klog/v2"
// 这里使用的是 app-controller 包,你需要根据实际情况修改为你自己的包路径
clientset "app-controller/pkg/generated/clientset/versioned"
informers "app-controller/pkg/generated/informers/externalversions"
)
var (
masterUrl string
kubeConfig string
)
func main() {
klog.InitFlags(nil)
flag.Parse()
// 设置信号处理,以便优雅地处理关闭信号
ctx := signals.SetupSignalHandler()
logger := klog.FromContext(ctx)
// 创建 Kubernetes 客户端配置
config, err := clientcmd.BuildConfigFromFlags(masterUrl, kubeConfig)
if err!= nil {
klog.Fatalf("Error building kubeconfig: %v", err)
}
// 创建 Kubernetes 客户端
kubeClient, err := kubernetes.NewForConfig(config)
if err!= nil {
klog.Fatalf("Error creating kubernetes clientset: %v", err)
}
// 创建自定义资源的客户端
customClient, err := clientset.NewForConfig(config)
if err!= nil {
klog.Fatalf("Error creating custom clientset: %v", err)
}
// 创建 informer factory,用于监听 Kubernetes 资源的事件
informerFactory := informers.NewSharedInformerFactory(kubeClient, time.Minute)
// 注册自定义资源的 informer,并添加事件处理函数
appInformer := informerFactory.Appcontroller().V1alpha1().Apps().Informer()
appInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
// 处理添加自定义资源的事件,创建相关的 Deployment、Service 和 Ingress
c := &controller{
kubeClient: kubeClient,
customClient: customClient,
appLister: appInformer.Lister(),
}
c.onCreate(obj.(*v1alpha1.App))
},
UpdateFunc: func(oldObj, newObj interface{}) {
// 处理更新自定义资源的事件
c := &controller{
kubeClient: kubeClient,
customClient: customClient,
appLister: appInformer.Lister(),
}
c.onUpdate(oldObj.(*v1alpha1.App), newObj.(*v1alpha1.App))
},
DeleteFunc: func(obj interface{}) {
// 处理删除自定义资源的事件,清理相关资源
// 这里根据实际需求添加清理逻辑
},
})
// 启动 informer
informerFactory.Start(ctx.Done())
// 等待 informer 的缓存同步完成
if!cache.WaitForCacheSync(ctx.Done(), appInformer.HasSynced) {
klog.Fatal("Failed to sync caches")
}
// 运行自定义资源的控制器
controller := &controller{
kubeClient: kubeClient,
customClient: customClient,
appLister: appInformer.Lister(),
}
go wait.Forever(func() {
controller.run(ctx.Done())
}, time.Second)
// 阻塞等待信号
<-ctx.Done()
}
```