kubebuilder operator的运行逻辑

本文涉及的产品
容器服务 Serverless 版 ACK Serverless,952元额度 多规格
容器服务 Serverless 版 ACK Serverless,317元额度 多规格
简介: kubebuilder operator的运行逻辑

kubebuilder 的运行逻辑

目录

更新

  • 2023/12/18:代码描述更新至v0.16.0,新增watch部分内容

概述

下面是kubebuilder 的架构图。可以看到最外层是通过名为Manager的组件驱动的,Manager中包含了多个组件,其中Cache中保存了gvk和informer的映射关系,用于通过informer的方式缓存kubernetes 的对象。Controller使用workqueue的方式缓存informer传递过来的对象,后续提取workqueue中的对象,传递给Reconciler进行处理。

本文不介绍kuberbuilder的用法,如有需要可以参考如下三篇文章:

本次使用的controller-runtime的版本是:v0.16.0

引用自:Controller Runtime 的四种使用姿势

下面展示了用户创建的 Manager 和 Reconciler 以及 Controller Runtime 自己启动的 Cache 和 Controller。先看用户侧的,Manager 是用户初始化的时候需要创建的,用来启动 Controller Runtime 的组件;Reconciler 是用户自己需要提供的组件,用于处理自己的业务逻辑。

而 controller-runtime 侧的组件,Cache 顾名思义就是缓存,用于建立 Informer 对 ApiServer 进行连接 watch 资源,并将 watch 到的 object 推入队列;Controller 一方面会向 Informer 注册 eventHandler,另一方面会从队列中拿数据并执行用户侧 Reconciler 的函数。

controller-runtime 侧整个工作流程如下:

首先 Controller 会先向 Informer 注册特定资源的 eventHandler;然后 Cache 会启动 Informer,Informer 向 ApiServer 发出请求,建立连接;当 Informer 检测到有资源变动后,使用 Controller 注册进来的 eventHandler 判断是否推入队列中;当队列中有元素被推入时,Controller 会将元素取出,并执行用户侧的 Reconciler。

下述例子的代码生成参考:Building your own kubernetes CRDs

Managers

manager负责运行controllers和webhooks,并设置公共依赖,如clients、caches、schemes等。

kubebuilder的处理

kubebuilder会自动在main.go中创建Manager:

mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
    Scheme:                 scheme,
    MetricsBindAddress:     metricsAddr,
    Port:                   9443,
    HealthProbeBindAddress: probeAddr,
    LeaderElection:         enableLeaderElection,
    LeaderElectionID:       "3b9f5c61.com.bolingcavalry",
  })

controllers是通过调用Manager.Start接口启动的。

Controllers

controller使用events来触发reconcile的请求。通过controller.New接口可以初始化一个controller,并通过manager.Start启动该controller。

func New(name string, mgr manager.Manager, options Options) (Controller, error) {
  c, err := NewUnmanaged(name, mgr, options)
  if err != nil {
    return nil, err
  }
  // Add the controller as a Manager components
  return c, mgr.Add(c) // 将controller添加到manager中
}

kubebuilder的处理

kubebuilder会自动在main.go中生成一个SetupWithManager函数,在Complete中创建并将controller添加到manager,具体见下文:

func (r *GuestbookReconciler) SetupWithManager(mgr ctrl.Manager) error {
  return ctrl.NewControllerManagedBy(mgr).
    For(&webappv1.Guestbook{}).
    Complete(r)
}

main.go中调用Manager.Start接口来启动controller:

mgr.Start(ctrl.SetupSignalHandler())

Reconcilers

Controller的核心是实现了Reconciler接口。Reconciler 会接收到一个reconcile请求,该请求中包含对象的name和namespace。reconcile会对比对象和其所拥有(own)的资源的当前状态与期望状态,并据此做出相应的调整。

通常Controller会根据集群事件(如Creating、Updating、Deleting Kubernetes对象)或外部事件(如GitHub Webhooks、轮询外部资源等)触发reconcile。

注意:Reconciler中传入的reqeust中仅包含对象的名称和命名空间,并没有对象的其他信息,因此需要通过kubernetes client来获取对象的相关信息。

// Request contains the information necessary to reconcile a Kubernetes object.  This includes the
// information to uniquely identify the object - its Name and Namespace.  It does NOT contain information about
// any specific Event or the object contents itself.
type Request struct {
  // NamespacedName is the name and namespace of the object to reconcile.
  types.NamespacedName
}
// NamespacedName comprises a resource name, with a mandatory namespace,
// rendered as "<namespace>/<name>".  Being a type captures intent and
// helps make sure that UIDs, namespaced names and non-namespaced names
// do not get conflated in code.  For most use cases, namespace and name
// will already have been format validated at the API entry point, so we
// don't do that here.  Where that's not the case (e.g. in testing),
// consider using NamespacedNameOrDie() in testing.go in this package.
type NamespacedName struct {
  Namespace string
  Name      string
}

Reconciler接口的描述如下,其中给出了其处理逻辑的例子:

  • 读取一个对象以及其所拥有的所有pod
  • 观察到对象期望的副本数为5,但实际只有一个pod副本
  • 创建4个pods,并设置OwnerReferences
/*
Reconciler implements a Kubernetes API for a specific Resource by Creating, Updating or Deleting Kubernetes
objects, or by making changes to systems external to the cluster (e.g. cloudproviders, github, etc).
 
reconcile implementations compare the state specified in an object by a user against the actual cluster state,
and then perform operations to make the actual cluster state reflect the state specified by the user.
 
Typically, reconcile is triggered by a Controller in response to cluster Events (e.g. Creating, Updating,
Deleting Kubernetes objects) or external Events (GitHub Webhooks, polling external sources, etc).
 
Example reconcile Logic:
 
  * Read an object and all the Pods it owns.
  * Observe that the object spec specifies 5 replicas but actual cluster contains only 1 Pod replica.
  * Create 4 Pods and set their OwnerReferences to the object.
 
reconcile may be implemented as either a type:
 
  type reconcile struct {}
 
  func (reconcile) reconcile(controller.Request) (controller.Result, error) {
    // Implement business logic of reading and writing objects here
    return controller.Result{}, nil
  }
 
Or as a function:
 
  controller.Func(func(o controller.Request) (controller.Result, error) {
    // Implement business logic of reading and writing objects here
    return controller.Result{}, nil
  })
 
Reconciliation is level-based, meaning action isn't driven off changes in individual Events, but instead is
driven by actual cluster state read from the apiserver or a local cache.
For example if responding to a Pod Delete Event, the Request won't contain that a Pod was deleted,
instead the reconcile function observes this when reading the cluster state and seeing the Pod as missing.
*/
type Reconciler interface {
  // Reconcile performs a full reconciliation for the object referred to by the Request.
  // The Controller will requeue the Request to be processed again if an error is non-nil or
  // Result.Requeue is true, otherwise upon completion it will remove the work from the queue.
  Reconcile(context.Context, Request) (Result, error)
}

重新执行Reconciler

Reconcile除了根据事件执行之外还可以重复调用,方法比较简单,即在实现Reconcile(context.Context, Request) (Result, error)的方法中,将返回值Result.Requeue设置为true,此时会非周期性地重复调用Reconcile;另一种是给Result.RequeueAfter设置一个时间范围,当超时之后会重新调用Reconcile。其重复执行的处理逻辑位于reconcileHandler方法中,其实就是将老的obj从workqueue中删除,然后重新入队列,两种情况的处理逻辑如下:

case err != nil:
    if errors.Is(err, reconcile.TerminalError(nil)) {
      ctrlmetrics.TerminalReconcileErrors.WithLabelValues(c.Name).Inc()
    } else {
      c.Queue.AddRateLimited(req)
    }
    ...
  case result.RequeueAfter > 0: //超时方式重新调用
    log.V(5).Info(fmt.Sprintf("Reconcile done, requeueing after %s", result.RequeueAfter))
    // The result.RequeueAfter request will be lost, if it is returned
    // along with a non-nil error. But this is intended as
    // We need to drive to stable reconcile loops before queuing due
    // to result.RequestAfter
    c.Queue.Forget(obj)
    c.Queue.AddAfter(req, result.RequeueAfter)
    ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeueAfter).Inc()
  case result.Requeue: //非周期性方式调用
    log.V(5).Info("Reconcile done, requeueing")
    c.Queue.AddRateLimited(req)
    ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeue).Inc()

Requeue和RequeueAfter对结果的影响如下:

Requeue RequeueAfter Error Result
any any !nil Requeue with rate limiting.
true 0 nil Requeue with rate limiting.
any >0 nil Requeue after specified RequeueAfter.
false 0 nil Do not requeue.

kubebuilder的处理

kubebuilder会在guestbook_controller.go 中生成一个实现了Reconciler接口的模板:

func (r *GuestbookReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
  _ = log.FromContext(ctx)
  // TODO(user): your logic here
  return ctrl.Result{}, nil
}

那么Reconciler又是怎么和controller关联起来的呢?在上文提到 kubebuilder 会通过Complete(SetupWithManager中调用)创建并添加controller到manager,同时可以看到Complete中传入的就是reconcile.Reconciler接口,这就是controller和Reconciler关联的入口:

// Complete builds the Application Controller.
func (blder *Builder) Complete(r reconcile.Reconciler) error {
  _, err := blder.Build(r)
  return err
}

后续会通过: Builder.Build -->Builder.doController-->newController 最终传递给controller的初始化接口controller.New,并赋值给Controller.Do变量。controller.New中创建的controller结构如下,可以看到还为MakeQueue赋予了一个创建workqueue的函数,新事件会缓存到该workqueue中,后续传递给Reconcile进行处理:

// Create controller with dependencies set
  return &controller.Controller{
    Do: options.Reconciler,
    MakeQueue: func() workqueue.RateLimitingInterface {
      return workqueue.NewRateLimitingQueueWithConfig(options.RateLimiter, workqueue.RateLimitingQueueConfig{
        Name: name,
      })
    },
    MaxConcurrentReconciles: options.MaxConcurrentReconciles,
    CacheSyncTimeout:        options.CacheSyncTimeout,
    Name:                    name,
    LogConstructor:          options.LogConstructor,
    RecoverPanic:            options.RecoverPanic,
    LeaderElected:           options.NeedLeaderElection,
  }, nil

上面有讲controller会根据事件来调用Reconciler,那它是如何传递事件的呢?

可以看下Controller的启动接口(Manager.Start中会调用Controller.Start接口),可以看到其调用了processNextWorkItem来处理workqueue中的事件:

func (c *Controller) Start(ctx context.Context) error {
  ...
  c.Queue = c.MakeQueue() //通过MakeQueue初始化一个workqueue
  ...
  wg := &sync.WaitGroup{}
  err := func() error {
    ...
    for _, watch := range c.startWatches { //启动watch object
      ...
      if err := watch.src.Start(ctx, watch.handler, c.Queue, watch.predicates...); err != nil {
        return err
      }
    }
    ...
    wg.Add(c.MaxConcurrentReconciles)
    for i := 0; i < c.MaxConcurrentReconciles; i++ { //并发处理时间
      go func() {
        defer wg.Done()
        for c.processNextWorkItem(ctx) {
        }
      }()
    }
    ...
  }()
  ...
}

继续查看processNextWorkItem,可以看到该处理逻辑与client-go中的workqueue的处理方式一样,从workqueue中拿出事件对象,然后传递给reconcileHandler

func (c *Controller) processNextWorkItem(ctx context.Context) bool {
  obj, shutdown := c.Queue.Get() //获取workqueue中的对象
  if shutdown {
    // Stop working
    return false
  }
  defer c.Queue.Done(obj)
  ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Add(1)
  defer ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Add(-1)
  c.reconcileHandler(ctx, obj)
  return true
}

后续会通过Controller.reconcileHandler --> Controller.Reconcile -->Controller.Do.Reconcile 最终将事件传递给Reconcile(自己实现的Reconcile赋值给了controller的Do变量)。

总结一下:kubebuilder首先通过SetupWithManagerReconcile赋值给controller,在Manager启动时会调用Controller.Start启动controller,controller会不断获取其workqueue中的对象,并传递给Reconcile进行处理。

Controller事件来源

上面讲了controller是如何处理事件的,那么workqueue中的事件是怎么来的呢?

回到Builder.Complete-->Builder.build,从上面内容可以知道在doController函数中进行了controller的初始化,并将Reconciler和controller关联起来。在下面有个doWatch函数,该函数中注册了需要watch的对象类型,以及eventHandler(类型为handler.EnqueueRequestForObject),并通过controller的Watch接口启动对资源的监控。

func (blder *Builder) Build(r reconcile.Reconciler) (controller.Controller, error) {
  ...
  // Set the ControllerManagedBy
  if err := blder.doController(r); err != nil {//初始化controller
    return nil, err
  }
  // Set the Watch
  if err := blder.doWatch(); err != nil {
    return nil, err
  }
  return blder.ctrl, nil
}
func (blder *Builder) doWatch() error {
  // Reconcile type
if blder.forInput.object != nil { // watch For()方式引入的对象
    obj, err := blder.project(blder.forInput.object, blder.forInput.objectProjection)//格式化资源类型
    if err != nil {
      return err
    }
    src := source.Kind(blder.mgr.GetCache(), obj) //初始化资源类型
    hdler := &handler.EnqueueRequestForObject{}   //初始化eventHandler
    allPredicates := append(blder.globalPredicates, blder.forInput.predicates...)
    if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil { //启动对资源的监控
      return err
    }
  }
  ...
  for _, own := range blder.ownsInput { // watch Owns()方式引入的对象
    ...
    if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil { //启动对资源的监控
      return err
    }
  ...
  }
  ...
  for _, w := range blder.watchesInput { // watch Watches()方式引入的对象
  ...
    if err := blder.ctrl.Watch(w.src, w.eventHandler, allPredicates...); err != nil { //启动对资源的监控
      return err
    }
  ...
  }
    ...
}

watch的对象有三个来源:

  • For()、Own():分别对应下面的ForOwns函数:
_ = ctrl.NewControllerManagedBy(mgr).
    For(&appsv1.ReplicaSet{}).       // ReplicaSet is the Application API
    Owns(&corev1.Pod{}).             // ReplicaSet owns Pods created by it.
    Complete(&ReplicaSetReconciler{Client: manager.GetClient()})
  • Watches():来自使用Watches注册的对象:
_ = ctrl.NewControllerManagedBy(mgr).
    Watches(
        &source.Kind{Type: &v1core.Node{}},
        handler.EnqueueRequestsFromMapFunc(myNodeFilterFunc),
        builder.WithPredicates(predicate.ResourceVersionChangedPredicate{}),
    ).
    Complete(r)

继续看controller.Watch接口,可以看到其调用了src.Start(src的类型为 source.Kind),将evthdler(&handler.EnqueueRequestForObject{})、c.Qeueue关联起来(c.Qeueue为Reconciler提供参数)

func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prct ...predicate.Predicate) error {
   ...
return src.Start(c.ctx, evthdler, c.Queue, prct...)
}

在Kind.Start 中会根据ks.Type选择合适的informer,并添加事件管理器internal.EventHandler

在Manager初始化时(如未指定)默认会创建一个Cache,该Cache中保存了gvkcache.SharedIndexInformer 的映射关系,ks.cache.GetInformer 中会提取对象的gvk信息,并根据gvk获取informer。

Manager.Start的时候会启动Cache中的informer。

func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface,
  prct ...predicate.Predicate) error {
  ...
  go func() {
    ...
    if err := wait.PollImmediateUntilWithContext(ctx, 10*time.Second, func(ctx context.Context) (bool, error) {
      // Lookup the Informer from the Cache and add an EventHandler which populates the Queue
      i, lastErr = ks.cache.GetInformer(ctx, ks.Type)
      ...
      return true, nil
    }); 
        ...
    _, err := i.AddEventHandler(NewEventHandler(ctx, queue, handler, prct).HandlerFuncs())  
    ...
  }()
  return nil
}

NewEventHandler返回internal.EventHandler,其实现了SharedIndexInformer所需的ResourceEventHandler接口:

type ResourceEventHandler interface {
  OnAdd(obj interface{})
  OnUpdate(oldObj, newObj interface{})
  OnDelete(obj interface{})
}

看下EventHandler 是如何将OnAdd监听到的对象添加到队列中的:

func (e EventHandler) OnAdd(obj interface{}) {
  ...
  e.handler.Create(ctx, c, e.queue)
}

上面的e.handler就是doWatch中注册的eventHandler EnqueueRequestForObject,可以看到在EnqueueRequestForObject.Create中提取了对象的名称和命名空间,并添加到了队列中:

func (e *EnqueueRequestForObject) Create(ctx context.Context, evt event.CreateEvent, q workqueue.RateLimitingInterface) {
  ...
  q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
    Name:      evt.Object.GetName(),
    Namespace: evt.Object.GetNamespace(),
  }})
}

至此将整个Kubebuilder串起来了。

与使用client-go的区别

client-go

在需要操作kubernetes资源时,通常会使用client-go来编写资源的CRUD逻辑,或使用informer机制来监听资源的变更,并在OnAdd、OnUpdate、OnDelete中进行相应的处理。

kubebuilder Operator

从上述讲解可以了解到,Operator一般会涉及两方面:object以及其所有(own)的资源。Reconcilers是核心处理逻辑,但其只能获取到资源的名称和命名空间,并不知道资源的操作(增删改)是什么,也不知道资源的其他信息,目的就是在收到资源变更时,根据object的期望状态来调整资源的状态。

kubebuilder也提供了client库,可以对kubernetes资源进行CRUD操作,但建议这种情况下直接使用client-go进行操作:

package main
import (
  "context"
  corev1 "k8s.io/api/core/v1"
  "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
  "k8s.io/apimachinery/pkg/runtime/schema"
  "sigs.k8s.io/controller-runtime/pkg/client"
)
var c client.Client
func main() {
  // Using a typed object.
  pod := &corev1.Pod{}
  // c is a created client.
  _ = c.Get(context.Background(), client.ObjectKey{
    Namespace: "namespace",
    Name:      "name",
  }, pod)
  // Using a unstructured object.
  u := &unstructured.Unstructured{}
  u.SetGroupVersionKind(schema.GroupVersionKind{
    Group:   "apps",
    Kind:    "Deployment",
    Version: "v1",
  })
  _ = c.Get(context.Background(), client.ObjectKey{
    Namespace: "namespace",
    Name:      "name",
  }, u)
}
相关实践学习
通过Ingress进行灰度发布
本场景您将运行一个简单的应用,部署一个新的应用用于新的发布,并通过Ingress能力实现灰度发布。
容器应用与集群管理
欢迎来到《容器应用与集群管理》课程,本课程是“云原生容器Clouder认证“系列中的第二阶段。课程将向您介绍与容器集群相关的概念和技术,这些概念和技术可以帮助您了解阿里云容器服务ACK/ACK Serverless的使用。同时,本课程也会向您介绍可以采取的工具、方法和可操作步骤,以帮助您了解如何基于容器服务ACK Serverless构建和管理企业级应用。 学习完本课程后,您将能够: 掌握容器集群、容器编排的基本概念 掌握Kubernetes的基础概念及核心思想 掌握阿里云容器服务ACK/ACK Serverless概念及使用方法 基于容器服务ACK Serverless搭建和管理企业级网站应用
目录
相关文章
|
容器 Kubernetes API
深入解析 Kubebuilder:让编写 CRD 变得更简单
作者 | 刘洋(炎寻) 阿里云高级开发工程师 导读:自定义资源 CRD(Custom Resource Definition)可以扩展 Kubernetes API,掌握 CRD 是成为 Kubernetes 高级玩家的必备技能,本文将介绍 CRD 和 Controller 的概念,并对 CRD 编写框架 Kubebuilder 进行深入分析,让您真正理解并能快速开发 CRD。
11965 3
|
2月前
|
Kubernetes API 数据库
CRD + Operator模式
【7月更文挑战第25天】
52 2
|
3月前
|
运维 负载均衡 Serverless
函数计算产品使用问题之yaml如果写多个function,可不可以yaml在构建的时候能构建多个函数
函数计算产品作为一种事件驱动的全托管计算服务,让用户能够专注于业务逻辑的编写,而无需关心底层服务器的管理与运维。你可以有效地利用函数计算产品来支撑各类应用场景,从简单的数据处理到复杂的业务逻辑,实现快速、高效、低成本的云上部署与运维。以下是一些关于使用函数计算产品的合集和要点,帮助你更好地理解和应用这一服务。
|
4月前
|
Kubernetes Go API
|
11月前
|
Kubernetes 容器
【K8s源码品读】002:Phase 1 - kubectl - create的调用逻辑
我们的目标是查看`kubectl create -f nginx_pod.yaml` 这个命令是怎么运行的。
51 0
|
存储 Kubernetes Linux
【探索 Kubernetes|作业管理篇 系列 8】探究 Pod 的 API 对象属性级别与重要字段用法
【探索 Kubernetes|作业管理篇 系列 8】探究 Pod 的 API 对象属性级别与重要字段用法
94 1
|
消息中间件 域名解析 Kubernetes
Kubernetes ExternalName的使用场景例子及配置步骤
Kubernetes 的 ExternalName 是一种用于将 Kubernetes 集群中的服务映射到集群外部服务的机制。它通常用于将集群内的服务与集群外部的服务进行互联,比如连接到外部数据库、消息队列或者其他无法直接暴露在集群中的服务。 以下是一个使用场景的例子:假设你的应用程序需要连接到一个名为 "external-service.example.com" 的外部服务,该服务可能是在 Kubernetes 集群之外运行的数据库或其他类型的服务。你可以使用 ExternalName 来将该外部服务暴露给 Kubernetes 集群内的其他服务。 下面是配置步骤: 1. 创建一个 `
1083 0
|
Perl
k8s-helm yaml文件高效复用---参数方式
k8s-helm yaml文件高效复用---参数方式
293 0
k8s-helm yaml文件高效复用---参数方式
|
存储 缓存 Kubernetes
带你读《云原生应用开发 Operator原理与实践》第三章 Kubebuilder 原理3.3 Controller-runtime 模块分析(四)
带你读《云原生应用开发 Operator原理与实践》第三章 Kubebuilder 原理3.3 Controller-runtime 模块分析
|
监控 Cloud Native API
带你读《云原生应用开发 Operator原理与实践》第三章 Kubebuilder 原理3.3 Controller-runtime 模块分析(六)
带你读《云原生应用开发 Operator原理与实践》第三章 Kubebuilder 原理3.3 Controller-runtime 模块分析