2.2.2 Client-go 主体结构
Client-go 共支持 4 种与 Kubernetes APIServer 交互的客户端逻辑,如图 2-4 所示。
图 2-4 Client 交互图
(1) RESTClient: 最基础的客户端,它主要对 HTTP 请求进行了封装,并且支持 JSON 和 Protobuf 格式数据。
(2) DiscoveryClient:发现客户端,发现 APIServer 支持的资源组、资源版本和资源信息。如 Kubectl Api-Versions 。
(3) ClientSet:Kubernetes 自身内置资源的客户端集合,仅能操作已知类型的内置资源,如 Pods、Service 等。
(4) DynamicClient:动态客户端,可以对任意的 Kubernetes 资源执行通用操作,包括 CRD。
1. RESTClient
RESTClient 是所有客户端的父类,RESTClient 提供的 RESTful 方法(如 Get()、Put()、Post()、Delete() 等 ) 与 Kubernetes APIServer 进 行 交 互,ClientSet、DynamicClient 和 DiscoveryClient 等也都是基于 RESTClient 二次开发实现的。因此,RESTClient 可以操作 Kubernetes 自身内置的原生资源以及 CRD 。
前面 Example,目录中的 out-of-cluster-client-configuration 示例,用 RESTClient 实现的代码见代码清单 2-10。
代码清单 2-10
package main import ( "context" "fmt" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" ) func main() { // 加载配置文件,生成 config 对象 config, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config") if err != nil { panic(err.Error()) } // 配置 API 路径和请求的资源组 / 资源版本信息 config.APIPath = "api" config.GroupVersion = &corev1.SchemeGroupVersion // 配置数据的编解码器 config.NegotiatedSerializer = scheme.Codecs // 实例化 RESTClient 对象 restClient, err := rest.RESTClientFor(config) if err != nil { panic(err.Error()) } // 预设返回值存放对象 result := &corev1.PodList{} // Get 方法设置 HTTP 请求方法 ;Namespace 方法设置操作的命名空间 // Resource 方法设置操作的资源类型 ;VersionedParams 方法设置请求的查询参数 // Do 方法发起请求并用 Into 方法将 APIServer 返回的结果写入 Result 变量中 err = restClient.Get(). Namespace("default"). Resource("pods"). VersionedParams(&metav1.ListOptions{Limit: 100}, scheme.ParameterCodec). Do(context.TODO()). Into(result) if err != nil { panic(err) } // 打印 Pod 信息 for _, d := range result.Items { fmt.Printf( "NAMESPACE:%v \t NAME: %v \t STATUS: %v\n", d.Namespace, d.Name, d.Status.Phase, ) } }
运行以上代码,会获得命名空间 Default 下的所有 Pod 资源的相关信息,部分信息打印输出见代码清单 2-11。
代码清单 2-11
# 运行输出 NAMESPACE:default NAME: nginx-deployment-6b474476c4-lpld7 STATUS: Running NAMESPACE:default NAME: nginx-deployment-6b474476c4-t6xl4 STATUS: Running
RESTClient 实际上是对 Kubernetes APIServer 的 RESTful API 的访问进行了封装抽象,底层调用的是 Go 语言 Net/Http 库。
分析 RESTClient 发起请求的过程如下。
(1) Get 方法返回 Request 类型对象(见代码清单 2-12)。
代码清单 2-12
// Get begins a GET request. Short for c.Verb("GET"). func (c *RESTClient) Get() *Request { return c.Verb("GET") }
(2) Request 结构体对象用于构建访问 APIServer 的请求,示例中依次调用的 Namespace、Resource、VersionedParams、Do 等方法都是 Request 结构体的方法,最终 Do 方法中 r.request 发起请求,r.transformResponse 将 APIServer 的返回值解析成 corev1.PodList 类型对象,即示例中的 Result 变量(见代码清单 2-13)。
代码清单 2-13
func (r *Request) Do(ctx context.Context) Result { var result Result err := r.request(ctx, func(req *http.Request, resp *http.Response) { result = r.transformResponse(resp, req) }) //... }
(3) r.request 方法首先检查是否设置 http client,如果没有,则使用 net/http 默认的 DefaultClient、r.URL.String 方法根据配置的请求参数生成请求的 RESTful URL,示例中生成的请求 URL 应该为 https://xxx/api/v1/namespaces/default/pods?limit=100。之后用 net/http 标准库构建 req 请求并发送该请求,最终 fn 函数变量对 APIServer 的返回值进行解析(见代码清单 2-14)。
代码清单 2-14
func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Response)) error { //... client := r.c.Client if client == nil { client = http.DefaultClient } //... for { url := r.URL().String() req, err := http.NewRequest(r.verb, url, r.body) //... resp, err := client.Do(req) //... done := func() bool { //... fn(req, resp) return true }() //... } }
总结:Kubernetes APIServer 有很多操作资源的接口,而 RESTClient 就是对访问这些 API 的封装。
2. ClientSet
虽然 RESTClient 可以访问 Kubernetes 的任意资源对象,但在使用时需要配置的参数过于烦琐,为了更为优雅地处理,需要进一步封装。ClientSet 继承自 RESTClient,使用预生成的 API 对象与 APIServer 进行交互,方便开发者二次开发。
ClientSet 是一组资源客户端的集合,比如操作 Pods、Services、Secrets 资源的 Core V1Client ,操作 Deployments、ReplicaSets、DaemonSets 资源的 ExtensionsV1beta1Client 等,如图 2-5 所示,直接通过这些客户端提供的操作方法即可对 Kubernetes 内置的原生资源进行 Create、Update、Get、List、Delete 等多种操作。
图 2-5 ClientSet 交互图
ClientSet 的使用方式在前面已有讲解,这里不再赘述。下面分析核心代码 clientset.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{}) 的执行流程。
ClientSet 包 含 众 多 资 源 客 户 端,CoreV1 方 法 负 责 返 回 CoreV1Client( 见 代 码 清单 2-15)。
代码清单 2-15
type Clientset struct { *discovery.DiscoveryClient //... coreV1 *corev1.CoreV1Client //... } //... // CoreV1 retrieves the CoreV1Client func (c *Clientset) CoreV1() corev1.CoreV1Interface { return c.coreV1 }
Pods 方法的参数用于设定 Namespace,内部调用 newPods 函数,该函数返回实现PodInterface 的对象(见代码清单 2-16)。
代码清单 2-16
func (c *CoreV1Client) Pods(namespace string) PodInterface { return newPods(c, namespace) }
可以看到 PodInterface 包含了操作 Pods 资源的全部方法,newPods 函数构造的 Pods对象内部包含 RESTClient,在 Pods 对象的 List 方法中,我们看到了熟悉的 RESTClient 操作资源的调用流程(见代码清单 2-17)。
代码清单 2-17
type PodInterface interface { Create(ctx context.Context, pod *v1.Pod, opts metav1.CreateOptions) (*v1.Pod, error) Update(ctx context.Context, pod *v1.Pod, opts metav1.UpdateOptions) (*v1.Pod, error) UpdateStatus(ctx context.Context, pod *v1.Pod, opts metav1. UpdateOptions) (*v1.Pod, error) Delete(ctx context.Context, name string, opts metav1.DeleteOptions) error DeleteCollection(ctx context.Context, opts metav1.DeleteOptions, listOpts metav1.ListOptions) error Get(ctx context.Context, name string, opts metav1.GetOptions) (*v1.Pod, error) List(ctx context.Context, opts metav1.ListOptions) (*v1.PodList, error) //... } func newPods(c *CoreV1Client, namespace string) *pods { return &pods{ client: c.RESTClient(), ns: namespace, } } //... func (c *pods) List(ctx context.Context, opts metav1.ListOptions) (result *v1.PodList, err error) { //... result = &v1.PodList{} err = c.client.Get(). Namespace(c.ns). Resource("pods"). VersionedParams(&opts, scheme.ParameterCodec). Timeout(timeout). Do(ctx). Into(result) return }
3. DynamicClient
DynamicClient 是一种动态客户端,通过动态指定资源组、资源版本和资源等信息,它可以操作任意的 Kubernetes 资源,即不仅可以操作 Kubernetes 自身内置的资源,还可操作 CRD。这也是 DynamicClient 与之前介绍的 ClientSet 客户端最显著的一个区别。
ClientSet 与 DynamicClient 的区别如下。
ClientSet 默认只能操作 Kubernetes 内置的资源,不能直接操作 CRD,并且使用类型化客户端 ClientSet 时,程序也会与所使用的版本和类型紧密耦合。DynamicClient 使用嵌套的 map[string]-interface{] 结构存储 Kubernetes APIServer 的返回值,利用反射机制在运行时进行数据绑定,松耦合意味着更高的灵活性,但无法获取强数据类型检查和验证的好处。
在介绍 DynamicClient 之前,首先了解一下 Object.runtime 接口和 Unstructured 结构体,这有助于理解 DynamicClient 的实现。
(1) Object.runtime:Kubernetes 中 所 有 的 资 源 对 象( 例 如,Pod、Deployment、CRD 等)都实现了 Object.runtime 接口,其包含 DeepCopyObject 和 GetObjectKind 方法,分别用于对象深拷贝和获取对象的具体资源类型(Kind)。
(2) Unstructured:Unstructured 结构体包含 map[string]interface{} 类型字段,在处理无法预知结构的数据时,将数据值存入 interface{} 中,待运行时利用反射判断。该结构体包含大量工具方法,方便处理非结构化数据。
DynamicClient 代码示例见代码清单 2-18。
代码清单 2-18
package main import ( "context" "fmt" appsv1 "k8s.io/api/apps/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/dynamic" "k8s.io/client-go/tools/clientcmd" ) func main() { // 加载 kubeconfig 文件,生成 config 对象 config, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config") if err != nil { panic(err) } // dynamic.NewForConfig 函数通过 config 实例化 dynamicClient 对象 dynamicClient, err := dynamic.NewForConfig(config) if err != nil { panic(err) } // 通过 schema.GroupVersionResource 设置要请求对象的资源组、资源版本和资源 // 设置命名空间和请求参数 , 得到 unstructured.UnstructuredList 指针类型的 PodList gvr := schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"} unstructObj, err := dynamicClient.Resource(gvr).Namespace("kube-system").List( context.TODO(), metav1.ListOptions{Limit: 10}, ) if err != nil { panic(err) } // 通过 runtime.DefaultUnstructuredConverter 函数将 unstructured.UnstructuredList // 转为 DeploymentList 类型 deploymentList := &appsv1.DeploymentList{} err = runtime.DefaultUnstructuredConverter.FromUnstructured( unstructObj.UnstructuredContent(), deploymentList, ) if err != nil { panic(err) } for _, v := range deploymentList.Items { fmt.Printf( "KIND: %v \t NAMESPACE: %v \t NAME:%v \n", v.Kind, v.Namespace, v.Name, ) } }
运行以上代码,会获得 Kube-System 域中部分 Deployment 的信息,打印输出见代码清单 2-19。
代码清单 2-19
# 运行输出 KIND: Deployment NAMESPACE: kube-system NAME:calico-kube-controllers KIND: Deployment NAMESPACE: kube-system NAME:coredns KIND: Deployment NAMESPACE: kube-system NAME:kube-state-metrics KIND: Deployment NAMESPACE: kube-system NAME:metrics-server KIND: Deployment NAMESPACE: kube-system NAME:nginx ...
DynamicClient 发起请求的过程如下(见代码清单 2-20)。NewForConfig 获取 DynamicClient 对象,其中封装了 RESTClient 类型的客户端。
代码清单 2-20
func NewForConfig(inConfig *rest.Config) (Interface, error) { //... restClient, err := rest.RESTClientFor(config) //... return &dynamicClient{client: restClient}, nil }
构造 GVR 对象并作为参数传递给 Resource 方法,Resource 方法会返回 dynamic ResourceClient ,Namespace 和 List 都是 DynamicResourceClient 的方法(见代码清单 2-21)。
代码清单 2-21
func (c *dynamicClient) Resource(resource schema.GroupVersionResource) NamespaceableResourceInterface { return &dynamicResourceClient{client: c, resource: resource} }
List 方法中首先获得 Kubernetes APIServer 返回的 Deployment 信息,此时数据是二进制格式的 JSON,利用 UnstructuredJSONScheme 解析器将 JSON 格式的数据写入 Unstructured/UnstructuredList 类型的对象中并返回,由于使用 runtime.Object 接口作为返回类型,因此,后续需要进行类型强制转换,即 uncastObj.(*unstructured.UnstructuredList) 或 uncastObj.(*unstruc- tured.Unstructured)(见代码清单 2-22)。
代码清单 2-22
func (c *dynamicResourceClient) List(ctx context.Context, opts metav1. ListOptions) (*unstructured.UnstructuredList, error) { result := c.client.client.Get().AbsPath(c.makeURLSegments("")...).Speci- ficallyVersionedParams(&opts, dynamicParameterCodec, versionV1).Do(ctx) //... uncastObj, err := runtime.Decode(unstructured.UnstructuredJSONScheme, retBytes) //... if list, ok := uncastObj.(*unstructured.UnstructuredList); ok { //... list, err := uncastObj.(*unstructured.Unstructured).ToList() //... }
此时已经获得 Unstructured/UnstructuredList 类型的 Deployment 信息,之后将其转化为标准的 Deployment/DeploymentList 结构即可,通过 DefaultUnstructuredConverter 结构体的 FromUnstructured 方法来实现,其利用反射机制将 unstructObj.Unstructured Content() 返回的 map[string]interface{} 类型对象转化为 DeploymentList 类型对象。
4. DiscoveryClient
RESTClient、DynamicClient、DiscoveryClient 都是面向资源对象的(例如,Deployment、Pod、CRD 等),而 DiscoveryClient 则聚焦资源,用于查看当前 Kubernetes 集群支持哪些资源组(Group)、资源版本(Version)、资源信息(Resource)。DiscoveryClient 代码示例见代码清单 2-23。
代码清单 2-23
package main import ( "fmt" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/discovery" "k8s.io/client-go/tools/clientcmd" ) func main() { // 加载 kubeconfig 文件,生成 config 对象 config, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config") if err != nil { panic(err) } // 通过 config 实例化 DiscoveryClient 对象 discoveryClient, err := discovery.NewDiscoveryClientForConfig(config) if err != nil { panic(err) } // 返回 Kubernetes APIServer 所支持的资源组、资源版本和资源信息 _, APIResourceList, err := discoveryClient.ServerGroupsAndResources() if err != nil { panic(err) } // 输出所有资源信息 for _, list := range APIResourceList { gv, err := schema.ParseGroupVersion(list.GroupVersion) if err != nil { panic(err) } for _, resource := range list.APIResources { fmt.Printf("NAME: %v, GROUP: %v, VERSION: %v \n", resource. Name, gv.Group, gv.Version) } } }
运行以上代码会获得 Kubernetes APIServer 支持的 GVR 等相关信息,部分信息打印输出见代码清单 2-24。
代码清单 2-24
# 运行输出 NAME: bindings, GROUP: , VERSION: v1 NAME: componentstatuses, GROUP: , VERSION: v1 NAME: confifigmaps, GROUP: , VERSION: v1 ...
DiscoveryClient 发起请求的过程见代码清单 2-25。
NewDiscoveryClientForConfig 获取客户端对象,其中 DiscoveryClient 中封装了 RESTClient 类 型 的 客 户 端, 且 赋 值 LegacyPrefix 为 /api, 该 变 量 在 之 后 请 求 Kubernetes APIServer 时会被用到。
代码清单 2-25
func NewDiscoveryClientForConfifig(c *restclient.Confifig) (*DiscoveryClient, error) { //... client, err := restclient.UnversionedRESTClientFor(&confifig) return &DiscoveryClient{restClient: client, LegacyPrefifix: "/api"}, err }
ServerGroupsAndResources 方法中会调用 ServerGroupsAndResources 函数,该函数主要关注 ServerGroups 方法和 fetchGroupVersionResources 函数(见代码清单2-26)。
代码清单 2-26
func ServerGroupsAndResources(d DiscoveryInterface) ([]*metav1.APIGroup, []*metav1. APIResourceList, error) { sgs, err := d.ServerGroups() //... groupVersionResources, failedGroups := fetchGroupVersionResources(d, sgs) //... }
ServerGroups 方 法 通 过 RESTClient 来 访 问 Kubernetes APIServer 的 /api 接 口(d.Legacy Prefix)和 /apis 接口,获得其所支持的 Group 和 Version 信息(见代码清单 2-27)。
代码清单 2-27
func (d *DiscoveryClient) ServerGroups() (apiGroupList *metav1.APIGroupList, err error) { // Get the groupVersions exposed at /api v := &metav1.APIVersions{} err = d.restClient.Get().AbsPath(d.LegacyPrefix).Do(context.TODO()).Into(v) //... // Get the groupVersions exposed at /apis apiGroupList = &metav1.APIGroupList{} err = d.restClient.Get().AbsPath("/apis").Do(context.TODO()).Into(apiGroupList) //... }
fetchGroupVersionResources 函数调用 ServerResourcesForGroupVersion 方法,同样通过 RESTClient 获取特定 Group 和 Version 中所包含的所有 Resource(见代码清单 2-28)。
代码清单 2-28
func (d *DiscoveryClient) ServerResourcesForGroupVersion(groupVersion string) (resources *metav1.APIResourceList, err error) { //... if len(d.LegacyPrefix) > 0 && groupVersion == "v1" { url.Path = d.LegacyPrefix + "/" + groupVersion } else { url.Path = "/apis/" + groupVersion } resources = &metav1.APIResourceList{ GroupVersion: groupVersion, } err = d.restClient.Get().AbsPath(url.String()).Do(context.TODO()).Into(resources) //...}