8、创建生成代码的脚本
以下代码主要参考
sample-controller
【3】
(1)在项目根目录下,创建hack目录,代码生成的脚本配置在该目录下
# mkdir hack && cd hack
(2)创建tools.go文件,添加 code-generator 依赖
//go:build tools // +build tools // This package imports things required by build scripts, to force `go mod` to see them as dependencies package tools import _ "k8s.io/code-generator"
(3)创建update-codegen.sh文件,用来生成代码
#!/usr/bin/env bash set -o errexit set -o nounset set -o pipefail SCRIPT_ROOT=$(dirname "${BASH_SOURCE[0]}")/.. CODEGEN_PKG=${CODEGEN_PKG:-$(cd "${SCRIPT_ROOT}"; ls -d -1 ./vendor/k8s.io/code-generator 2>/dev/null || echo ../code-generator)} # generate the code with: # --output-base because this script should also be able to run inside the vendor dir of # k8s.io/kubernetes. The output-base is needed for the generators to output into the vendor dir # instead of the $GOPATH directly. For normal projects this can be dropped. bash "${CODEGEN_PKG}"/generate-groups.sh "deepcopy,client,informer,lister" \ database-manager-controller/pkg/client database-manager-controller/pkg/apis \ databasemanager:v1alpha1 \ --output-base "$(dirname "${BASH_SOURCE[0]}")/../.." \ --go-header-file "${SCRIPT_ROOT}"/hack/boilerplate.go.txt # To use your own boilerplate text append: # --go-header-file "${SCRIPT_ROOT}"/hack/custom-boilerplate.go.txt
其中以下代码段根据实际情况进行修改。
bash "${CODEGEN_PKG}"/generate-groups.sh "deepcopy,client,informer,lister" \ database-manager-controller/pkg/client database-manager-controller/pkg/apis \ databasemanager:v1alpha1 \ --output-base "$(dirname "${BASH_SOURCE[0]}")/../.." \ --go-header-file "${SCRIPT_ROOT}"/hack/boilerplate.go.txt
(4)创建verify-codegen.sh文件,主要用于校验生成的代码是否为最新的
#!/usr/bin/env bash set -o errexit set -o nounset set -o pipefail SCRIPT_ROOT=$(dirname "${BASH_SOURCE[0]}")/.. DIFFROOT="${SCRIPT_ROOT}/pkg" TMP_DIFFROOT="${SCRIPT_ROOT}/_tmp/pkg" _tmp="${SCRIPT_ROOT}/_tmp" cleanup() { rm -rf "${_tmp}" } trap "cleanup" EXIT SIGINT cleanup mkdir -p "${TMP_DIFFROOT}" cp -a "${DIFFROOT}"/* "${TMP_DIFFROOT}" "${SCRIPT_ROOT}/hack/update-codegen.sh" echo "diffing ${DIFFROOT} against freshly generated codegen" ret=0 diff -Naupr "${DIFFROOT}" "${TMP_DIFFROOT}" || ret=$? cp -a "${TMP_DIFFROOT}"/* "${DIFFROOT}" if [[ $ret -eq 0 ]] then echo "${DIFFROOT} up to date." else echo "${DIFFROOT} is out of date. Please run hack/update-codegen.sh" exit 1 fi
(5)创建boilerplate.go.txt,主要用于为代码添加开源协议
/* Copyright The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */
(6)配置go vendor依赖目录
从update-codegen.sh
脚本可以看到该代码生成脚本是利用vendor
目录下的依赖进行的,我们项目本身没有配置,执行以下命令进行创建。
# go mod vendor
(7)在项目根目录下执行脚本生成代码
# chmod +x hack/update-codegen.sh # ./hack/update-codegen.sh Generating deepcopy funcs Generating clientset for databasemanager:v1alpha1 at database-manager-controller/pkg/client/clientset Generating listers for databasemanager:v1alpha1 at database-manager-controller/pkg/client/listers Generating informers for databasemanager:v1alpha1 at database-manager-controller/pkg/client/informers
然后新的目录结构如下:
# tree pkg/ pkg/ ├── apis │ └── databasemanager │ ├── register.go │ └── v1alpha1 │ ├── doc.go │ ├── register.go │ ├── type.go │ └── zz_generated.deepcopy.go └── client ├── clientset │ └── versioned │ ├── clientset.go │ ├── doc.go │ ├── fake │ │ ├── clientset_generated.go │ │ ├── doc.go │ │ └── register.go │ ├── scheme │ │ ├── doc.go │ │ └── register.go │ └── typed │ └── databasemanager │ └── v1alpha1 │ ├── databasemanager_client.go │ ├── databasemanager.go │ ├── doc.go │ ├── fake │ │ ├── doc.go │ │ ├── fake_databasemanager_client.go │ │ └── fake_databasemanager.go │ └── generated_expansion.go ├── informers │ └── externalversions │ ├── databasemanager │ │ ├── interface.go │ │ └── v1alpha1 │ │ ├── databasemanager.go │ │ └── interface.go │ ├── factory.go │ ├── generic.go │ └── internalinterfaces │ └── factory_interfaces.go └── listers └── databasemanager └── v1alpha1 ├── databasemanager.go └── expansion_generated.go
Controller开发
上面已经完成了自动代码的生成,生成了informer
、lister
、clientset
的代码,下面就开始编写真正的Controller
功能了。
我们需要实现的功能是:
- 创建数据库实例
- 更新数据库实例
- 删除数据库实例
(1)在代码根目录创建controller.go文件,编写如下内容
package main import ( "context" dbmanagerv1 "database-manager-controller/pkg/apis/databasemanager/v1alpha1" clientset "database-manager-controller/pkg/client/clientset/versioned" dbmanagerscheme "database-manager-controller/pkg/client/clientset/versioned/scheme" informers "database-manager-controller/pkg/client/informers/externalversions/databasemanager/v1alpha1" listers "database-manager-controller/pkg/client/listers/databasemanager/v1alpha1" "fmt" "github.com/golang/glog" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" appsinformers "k8s.io/client-go/informers/apps/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" appslisters "k8s.io/client-go/listers/apps/v1" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" "time" ) const controllerAgentName = "database-manager-controller" const ( // SuccessSynced 用来表示事件被成功同步 SuccessSynced = "Synced" // MessageResourceSynced 表示事件被触发时的消息信息 MessageResourceSynced = "database manager synced successfully" MessageResourceExists = "Resource %q already exists and is not managed by DatabaseManager" ErrResourceExists = "ErrResourceExists" ) type Controller struct { // kubeclientset 是kubernetes的clientset kubeclientset kubernetes.Interface // dbmanagerclientset 是自己定义的API Group的clientset dbmanagerclientset clientset.Interface // deploymentsLister list deployment 对象 deploymentsLister appslisters.DeploymentLister // deploymentsSynced 同步deployment对象 deploymentsSynced cache.InformerSynced // dbmanagerLister list databasemanager 对象 dbmanagerLister listers.DatabaseManagerLister // dbmanagerSynced 同步DatabaseManager对象 dbmanagerSynced cache.InformerSynced // workqueue 限速的队列 workqueue workqueue.RateLimitingInterface // recorder 事件记录器 recorder record.EventRecorder } // NewController 初始化Controller func NewController(kubeclientset kubernetes.Interface, dbmanagerclientset clientset.Interface, dbmanagerinformer informers.DatabaseManagerInformer, deploymentInformer appsinformers.DeploymentInformer) *Controller { utilruntime.Must(dbmanagerscheme.AddToScheme(scheme.Scheme)) glog.V(4).Info("Create event broadcaster") // 创建eventBroadcaster eventBroadcaster := record.NewBroadcaster() // 保存events到日志 eventBroadcaster.StartLogging(glog.Infof) // 上报events到APIServer eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")}) recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName}) // 初始化Controller controller := &Controller{ kubeclientset: kubeclientset, dbmanagerclientset: dbmanagerclientset, deploymentsLister: deploymentInformer.Lister(), deploymentsSynced: deploymentInformer.Informer().HasSynced, dbmanagerLister: dbmanagerinformer.Lister(), dbmanagerSynced: dbmanagerinformer.Informer().HasSynced, workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DatabaseManagers"), recorder: recorder, } glog.Info("Start up event handlers") // 注册Event Handler,分别对于添加、更新、删除事件,具体的操作由事件对应的API将其加入队列中 dbmanagerinformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: controller.enqueueDatabaseManager, UpdateFunc: func(oldObj, newObj interface{}) { oldDBManager := oldObj.(*dbmanagerv1.DatabaseManager) newDBManager := newObj.(*dbmanagerv1.DatabaseManager) if oldDBManager.ResourceVersion == newDBManager.ResourceVersion { return } controller.enqueueDatabaseManager(newObj) }, DeleteFunc: controller.enqueueDatabaseManagerForDelete, }) // 注册Deployment Event Handler deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: controller.handleObject, UpdateFunc: func(old, new interface{}) { newDepl := new.(*appsv1.Deployment) oldDepl := old.(*appsv1.Deployment) if newDepl.ResourceVersion == oldDepl.ResourceVersion { // 如果没有改变,就返回 return } controller.handleObject(new) }, DeleteFunc: controller.handleObject, }) return controller } // Run 启动入口 func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error { defer utilruntime.HandleCrash() defer c.workqueue.ShuttingDown() glog.Info("start controller, cache sync") // 同步缓存数据 if ok := cache.WaitForCacheSync(stopCh, c.dbmanagerSynced); !ok { return fmt.Errorf("failed to wait for caches to sync") } glog.Info("begin start worker thread") // 开启work线程 for i := 0; i < threadiness; i++ { go wait.Until(c.runWorker, time.Second, stopCh) } glog.Info("worker thread started!!!!!!") <-stopCh glog.Info("worker thread stopped!!!!!!") return nil } // runWorker 是一个死循环,会一直调用processNextWorkItem从workqueue中取出数据 func (c *Controller) runWorker() { for c.processNextWorkItem() { } } // processNextWorkItem 从workqueue中取出数据进行处理 func (c *Controller) processNextWorkItem() bool { obj, shutdown := c.workqueue.Get() if shutdown { return false } // We wrap this block in a func so we can defer c.workqueue.Done. err := func(obj interface{}) error { defer c.workqueue.Done(obj) var key string var ok bool if key, ok = obj.(string); !ok { c.workqueue.Forget(obj) runtime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj)) return nil } // 在syncHandler中处理业务 if err := c.syncHandler(key); err != nil { return fmt.Errorf("error syncing '%s': %s", key, err.Error()) } c.workqueue.Forget(obj) glog.Infof("Successfully synced '%s'", key) return nil }(obj) if err != nil { runtime.HandleError(err) return true } return true } // syncHandler 处理业务Handler func (c *Controller) syncHandler(key string) error { // 通过split得到namespace和name namespace, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { runtime.HandleError(fmt.Errorf("invalid resource key: %s", key)) return nil } // 从缓存中取对象 dbManager, err := c.dbmanagerLister.DatabaseManagers(namespace).Get(name) if err != nil { // 如果DatabaseManager对象被删除了,就会走到这里 if errors.IsNotFound(err) { glog.Infof("DatabaseManager对象被删除,请在这里执行实际的删除业务: %s/%s ...", namespace, name) return nil } runtime.HandleError(fmt.Errorf("failed to list DatabaseManager by: %s/%s", namespace, name)) return err } glog.Infof("这里是databasemanager对象的期望状态: %#v ...", dbManager) // 获取是否有deploymentName deploymentName := dbManager.Spec.DeploymentName if deploymentName == "" { utilruntime.HandleError(fmt.Errorf("%s: deploymentName 不能为空", key)) return nil } // 判断deployment是否在集群中存在 deployment, err := c.deploymentsLister.Deployments(dbManager.Namespace).Get(deploymentName) if errors.IsNotFound(err) { // 如果没有找到,就创建 deployment, err = c.kubeclientset.AppsV1().Deployments(dbManager.Namespace).Create( context.TODO(), newDeployment(dbManager), metav1.CreateOptions{}) } // 如果Create 或者 Get 都出错,则返回 if err != nil { return err } // 如果这个deployment不是由DatabaseManager控制,应该报告这个事件 if !metav1.IsControlledBy(deployment, dbManager) { msg := fmt.Sprintf(MessageResourceExists, deployment.Name) c.recorder.Event(dbManager, corev1.EventTypeWarning, ErrResourceExists, msg) return fmt.Errorf("%s", msg) } // 如果replicas和期望的不等,则更新deployment if dbManager.Spec.Replicas != nil && *dbManager.Spec.Replicas != *deployment.Spec.Replicas { klog.V(4).Infof("DatabaseManager %s replicas: %d, deployment replicas: %d", name, *dbManager.Spec.Replicas, *deployment.Spec.Replicas) deployment, err = c.kubeclientset.AppsV1().Deployments(dbManager.Namespace).Update(context.TODO(), newDeployment(dbManager), metav1.UpdateOptions{}) } if err != nil { return err } // 更新状态 err = c.updateDatabaseManagerStatus(dbManager, deployment) if err != nil { return err } glog.Infof("实际状态是从业务层面得到的,此处应该去的实际状态,与期望状态做对比,并根据差异做出响应(新增或者删除)") c.recorder.Event(dbManager, corev1.EventTypeNormal, SuccessSynced, MessageResourceSynced) return nil } // updateDatabaseManagerStatus 更新DatabaseManager状态 func (c *Controller) updateDatabaseManagerStatus(dbmanager *dbmanagerv1.DatabaseManager, deployment *appsv1.Deployment) error { dbmanagerCopy := dbmanager.DeepCopy() dbmanagerCopy.Status.AvailableReplicas = deployment.Status.AvailableReplicas _, err := c.dbmanagerclientset.CoolopsV1alpha1().DatabaseManagers(dbmanager.Namespace).Update(context.TODO(), dbmanagerCopy, metav1.UpdateOptions{}) return err } func (c *Controller) handleObject(obj interface{}) { var object metav1.Object var ok bool if object, ok = obj.(metav1.Object); !ok { tombstone, ok := obj.(cache.DeletedFinalStateUnknown) if !ok { utilruntime.HandleError(fmt.Errorf("error decoding object, invalid type")) return } object, ok = tombstone.Obj.(metav1.Object) if !ok { utilruntime.HandleError(fmt.Errorf("error decoding object tombstone, invalid type")) return } klog.V(4).Infof("Recovered deleted object '%s' from tombstone", object.GetName()) } klog.V(4).Infof("Processing object: %s", object.GetName()) if ownerRef := metav1.GetControllerOf(object); ownerRef != nil { // 检查对象是否和DatabaseManager对象关联,如果不是就退出 if ownerRef.Kind != "DatabaseManager" { return } dbmanage, err := c.dbmanagerLister.DatabaseManagers(object.GetNamespace()).Get(ownerRef.Name) if err != nil { klog.V(4).Infof("ignoring orphaned object '%s' of databaseManager '%s'", object.GetSelfLink(), ownerRef.Name) return } c.enqueueDatabaseManager(dbmanage) return } } func newDeployment(dbmanager *dbmanagerv1.DatabaseManager) *appsv1.Deployment { var image string var name string switch dbmanager.Spec.Dbtype { case "mysql": image = "mysql:5.7" name = "mysql" case "mariadb": image = "mariadb:10.7.1" name = "mariadb" default: image = "mysql:5.7" name = "mysql" } labels := map[string]string{ "app": dbmanager.Spec.Dbtype, } return &appsv1.Deployment{ ObjectMeta: metav1.ObjectMeta{ Namespace: dbmanager.Namespace, Name: dbmanager.Name, OwnerReferences: []metav1.OwnerReference{ *metav1.NewControllerRef(dbmanager, dbmanagerv1.SchemeGroupVersion.WithKind("DatabaseManager")), }, }, Spec: appsv1.DeploymentSpec{ Replicas: dbmanager.Spec.Replicas, Selector: &metav1.LabelSelector{MatchLabels: labels}, Template: corev1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{Labels: labels}, Spec: corev1.PodSpec{ Containers: []corev1.Container{ { Name: name, Image: image, }, }, }, }, }, } } // 数据先放入缓存,再入队列 func (c *Controller) enqueueDatabaseManager(obj interface{}) { var key string var err error // 将对象放入缓存 if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil { runtime.HandleError(err) return } // 将key放入队列 c.workqueue.AddRateLimited(key) } // 删除操作 func (c *Controller) enqueueDatabaseManagerForDelete(obj interface{}) { var key string var err error // 从缓存中删除指定对象 key, err = cache.DeletionHandlingMetaNamespaceKeyFunc(obj) if err != nil { runtime.HandleError(err) return } //再将key放入队列 c.workqueue.AddRateLimited(key) }
其主要逻辑和文章开头介绍的Controller实现逻辑
一样,其中关键点在于:
- 在
NewController
方法中,定义了DatabaseManager
和Deployment
对象的Event Handler,除了同步缓存外,还将对应的Key放入queue中。 - 实际处理业务的方法是
syncHandler
,可以根据实际请求来编写代码以达到业务需求。
2、在项目根目录下创建main.go,编写入口函数
(1)编写处理系统信号量的Handler
这部分直接使用的demo中的代码【3】
(2)编写入口main函数
package main import ( "flag" "time" kubeinformers "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/clientcmd" "k8s.io/klog/v2" clientset "database-manager-controller/pkg/client/clientset/versioned" informers "database-manager-controller/pkg/client/informers/externalversions" "database-manager-controller/pkg/signals" ) var ( masterURL string kubeconfig string ) func main() { // klog.InitFlags(nil) flag.Parse() // 设置处理系统信号的Channel stopCh := signals.SetupSignalHandler() // 处理入参 cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfig) if err != nil { klog.Fatalf("Error building kubeconfig: %s", err.Error()) } // 初始化kubeClient kubeClient, err := kubernetes.NewForConfig(cfg) if err != nil { klog.Fatalf("Error building kubernetes clientset: %s", err.Error()) } // 初始化dbmanagerClient dbmanagerClient, err := clientset.NewForConfig(cfg) if err != nil { klog.Fatalf("Error building example clientset: %s", err.Error()) } kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30) dbmanagerInformerFactory := informers.NewSharedInformerFactory(dbmanagerClient, time.Second*30) // 初始化controller controller := NewController(kubeClient, dbmanagerClient, dbmanagerInformerFactory.Coolops().V1alpha1().DatabaseManagers(), kubeInformerFactory.Apps().V1().Deployments()) // notice that there is no need to run Start methods in a separate goroutine. (i.e. go kubeInformerFactory.Start(stopCh) // Start method is non-blocking and runs all registered informers in a dedicated goroutine. kubeInformerFactory.Start(stopCh) dbmanagerInformerFactory.Start(stopCh) if err = controller.Run(2, stopCh); err != nil { klog.Fatalf("Error running controller: %s", err.Error()) } } func init() { flag.StringVar(&kubeconfig, "kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.") flag.StringVar(&masterURL, "master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.") }
测试Controller
1、在项目目录下添加一个Makefile
build: echo "build database manager controller" CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build .
2、执行make build进行编译
# make build echo "build database manager controller" build database manager controller CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build .
然后会输出database-manager-controller
一个二进制文件。
3、运行controller
# chmod +x database-manager-controller # ./database-manager-controller -kubeconfig=$HOME/.kube/config -alsologtostderr=true I1123 09:52:41.595726 29173 controller.go:81] Start up event handlers I1123 09:52:41.597448 29173 controller.go:120] start controller, cache sync I1123 09:52:41.699716 29173 controller.go:125] begin start worker thread I1123 09:52:41.699737 29173 controller.go:130] worker thread started!!!!!!
4、创建一个CRD测试用例,观察日志以及是否创建deployment
(1)测试样例如下
# cat example-mysql.yaml apiVersion: coolops.cn/v1alpha1 kind: DatabaseManager metadata: name: example-mysql spec: dbtype: "mysql" deploymentName: "mysql" replicas: 1
(2)执行以下命令进行创建,观察日志
# kubectl apply -f example-mysql.yaml databasemanager.coolops.cn/example-mysql created
可以看到对于的deployment和pod已经创建,不过由于Deployment的配置没有配置完全,mysql没有正常启动。
我们其实是可以看到Controller获取到了事件。
如果我们删除对象,也可以从日志里正常看到响应。
总结
上面就是自定义Controller的整个开发过程,相对来说还是比较简单,大部分东西社区都做好了,我们只需要套模子,然后实现自己的逻辑就行。
整个过程主要是参考sample-controller
【3】 ,现在简单整理如下:
- 确定好目的,然后创建CRD,定义需要的对象
- 按规定编写代码,定义好CRD所需要的type,然后使用code-generator进行代码自动生成,生成需要的informer、lister、clientset。
- 编写Controller,实现具体的业务逻辑
- 编写完成后就是验证,看看是否符合预期,根据具体情况再做进一步的调整