前言
在开发中我们会遇到需要 watch 某个 kuberentes 资源的变化,来进行一些操作。本文介绍了关于如何利用 client-go 来对资源进行 watch,以及 client-go 自带的工具类 retrywatcher 的使用。
环境准备
这里我们利用 kind 快速拉起一套 kubernetes 的环境,并在项目中引用对应 kubernetes 版本的 client-go 包。
需要注意的是 client-go 包与 kubenretes 的对应版本第一位需要 1 -》0
kind create cluster --name dev --image kindest/node:v1.22.7 go get k8s.io/client-go@v0.22.7
环境准备好后,我们准备一个 configmap 作为需要 watch 的对象
apiVersion v1 data field1 value1 field2 value2 kind ConfigMap metadata name my-configuration namespace default
场景实践
这里我们直接利用 kind 拉起集群的 kubeconfig 来初始化 client,具体可以参考 开发中与 APIServer 常见的几种认证方式。
varkubeconfig*stringifhome :=homedir.HomeDir(); home!="" { kubeconfig=flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "") } else { kubeconfig=flag.String("kubeconfig", "", "") } flag.Parse() rc, err :=clientcmd.BuildConfigFromFlags("", *kubeconfig) iferr!=nil { panic(err) } cs, err :=kubernetes.NewForConfig(rc) iferr!=nil { panic(err) }
clientset 初始化完成后,利用 watch 接口是可以的,但是 APIServer 会定期关闭 watch 链接,这样就会导致中断,根据 github issue 中,官方推荐使用 watch 的工具包。
获取 RetryWatcher
直接上代码,我们来分步介绍下
funcGetRetryWatcher(ctxcontext.Context, cs*kubernetes.Clientset, cmNamestring) (*watchtools.RetryWatcher, error) { namespace :=metav1.NamespaceDefaultcmInit, err :=cs.CoreV1().ConfigMaps(namespace).Get(ctx, cmName, metav1.GetOptions{}) iferr!=nil { returnnil, err } // create retry watcherretryWatcher, err :=watchtools.NewRetryWatcher(cmInit.ResourceVersion, &cache.ListWatch{ WatchFunc: func(optionsmetav1.ListOptions) (watch.Interface, error) { returncs.CoreV1().ConfigMaps(namespace).Watch(ctx, metav1.ListOptions{ FieldSelector: fmt.Sprintf("metadata.name=%s", cmName), }) }, }) iferr!=nil { returnnil, fmt.Errorf("create retry watcher error: %v", err) } returnretryWatcher, nil}
首先我们通过 get 先获取到当前需要 watch 的 configmap 的实例对象。这一步两个目的:
- 第一个是需要确认下我们需要 watch 的资源是否存在
- 第二个也是最重要的是获取当前资源的 ResourceVersion,这也是 watch 的基线
cmInit, err :=cs.CoreV1().ConfigMaps(namespace).Get(ctx, cmName, metav1.GetOptions{}) iferr!=nil { // k8serrors.IsNotFound(err)}returnnil, err}
接下来我们就可以实例化 RetryWatcher 对象了,其中需要传入:
- 当前资源的 ResourceVersion
- watch资源的过滤条件,由于我们只需要 watch 一个对象,直接使用 FieldSelector 根据 name 过滤,批量情况下建议使用 Label 过滤。
// create retry watcherretryWatcher, err :=watchtools.NewRetryWatcher(cmInit.ResourceVersion, &cache.ListWatch{ WatchFunc: func(optionsmetav1.ListOptions) (watch.Interface, error) { returncs.CoreV1().ConfigMaps(namespace).Watch(ctx, metav1.ListOptions{ FieldSelector: fmt.Sprintf("metadata.name=%s", cmName), }) }, })
到这里的话,我们的 retryWatcher 对象创建完成了
使用 RetryWatcher
ctx :=context.Background() cmName :="my-configuration"retryWatcher, err :=GetRetryWatcher(ctx, cs, cmName) iferr!=nil { panic(err) } deferretryWatcher.Stop() for { select { caseevent :=<-retryWatcher.ResultChan(): cm, ok :=event.Object.(*corev1.ConfigMap) if!ok { continue } switchevent.Type { // "k8s.io/apimachinery/pkg/watch"casewatch.Deleted: // do nothingcasewatch.Added: // do nothingcasewatch.Modified: fmt.Println("change to", cm.Data) } case<-ctx.Done(): return } }
可以看到上述代码中,获取到 retrywatcher 后我们可以监听资源变化发送出来的不同事件,从而进行不同的操作。
我们在上面的代码中对于 cm 变更的时候打印一下新的内容。
我们尝试修改 field1 对应的 value 试一下,执行如下 patch
kubectl patch cm my-configuration --type=json -p="[{\"op\":\"replace\", \"path\":\"/data/field1\", \"value\":\"newvalue1\"}]"
可以看到成功 watch 到了变化
挂载文件 + filewatcher
watch 相对来说适用于一些对于变更要求很敏感的场景,如果对于一些不太敏感的场景,我们也可以利用将 secret/configmap 内容当作文件挂载到 Pod 中,进程利用 filewatcher 去感知变更。
这里利用了 kubernetes 的特性,如果 secret/configmap 当作文件挂载到 Pod 中,当源头发生变更后,kubenretes 会将变更同步到挂载的文件中,当然这个不是实时的,会存在一定延迟。
以下案例中,我们通过将 configmap 挂载到容器根目录中的 my-configuration
目录中
apiVersion v1 kind Pod metadata name pod-demo spec containersname test-po image busybox command top volumeMountsmountPath my-configuration name my-configuration readOnlytrue volumesname my-configuration configMap name my-configuration
key 以文件的方式在该目录下存储
我们可以利用 fsnotify 对文件进行 watch 即可。