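Introduction
In the Kubernetes world, a CRD is like an API definition, and the Operator that accompanies it is the corresponding API implementation. APIs keep evolving as a system iterates, and CRDs evolve in the same way: v1alpha1 -> v1alpha2 -> v1beta1 -> v1 -> v2alpha2... How to let users upgrade easily from an older version to a newer one is therefore a very common problem, and Kubernetes CRDs happen to provide a workflow for introducing a new version and upgrading to it. This article takes a close look at the CRD multi-version mechanism and the upgrade process, and should be a useful reference for anyone facing a CRD upgrade.
CRD multi-versioning
Example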
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
  name: pizzas.restaurant.programming-kubernetes.info
spec:
  group: restaurant.programming-kubernetes.info
  names:
    kind: Pizza
    listKind: PizzaList
    plural: pizzas
    singular: pizza
  scope: Namespaced
  version: v1alpha1
  versions:
  - name: v1alpha1
    served: true
    storage: true
    schema:
      ...
  - name: v1beta1
    served: true
    storage: false
    schema:
      ...
As shown above, the CRD with Group restaurant.programming-kubernetes.info and Kind Pizza has two versions:
- v1alpha1
- v1beta1
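In practice, a CRD starts with a single version, and a new version is added only when an upgrade is needed. To illustrate multiple versions, this example simply assumes that v1beta1 was added after v1alpha1. Each version has the following basic structure: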
// CustomResourceDefinitionVersion describes a version for CRD.
type CustomResourceDefinitionVersion struct {
	// name is the version name, e.g. "v1", "v2beta1", etc.
	// The custom resources are served under this version at `/apis/<group>/<version>/...` if `served` is true.
	Name string `json:"name" protobuf:"bytes,1,opt,name=name"`
	Served bool `json:"served" protobuf:"varint,2,opt,name=served"`
	Storage bool `json:"storage" protobuf:"varint,3,opt,name=storage"`
	...
	Schema *CustomResourceValidation `json:"schema,omitempty" protobuf:"bytes,4,opt,name=schema"`
	...
}
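Served version
When served is true for a version v, the path /apis/<group>/<v> is valid; otherwise it is not. The served field therefore gives simple control over whether a version can be read and written. Several versions can have served set to true at the same time.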
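Storage version
When storage is true for a version v, resources are persisted in etcd as version v. Exactly one version can have storage set to true. One storage version combined with several served versions means that objects have to be converted between the storage version and the served versions; the conversion mechanism is described in the following sections.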
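The two rules above (any number of served versions, exactly one storage version) can be sketched in a few lines of Go. This is only an illustration: the Version type, checkVersions and servedPaths are hypothetical names introduced here and are not part of the Kubernetes API.

```go
package main

import "fmt"

// Version mirrors the three CustomResourceDefinitionVersion fields discussed above.
// It is a hypothetical, simplified type used only for this sketch.
type Version struct {
	Name    string
	Served  bool
	Storage bool
}

// checkVersions enforces the rule that exactly one version is the storage version.
func checkVersions(versions []Version) error {
	n := 0
	for _, v := range versions {
		if v.Storage {
			n++
		}
	}
	if n != 1 {
		return fmt.Errorf("expected exactly 1 storage version, got %d", n)
	}
	return nil
}

// servedPaths lists the API path prefixes that are valid for the group,
// one per version whose served flag is true.
func servedPaths(group string, versions []Version) []string {
	var paths []string
	for _, v := range versions {
		if v.Served {
			paths = append(paths, "/apis/"+group+"/"+v.Name)
		}
	}
	return paths
}

func main() {
	vs := []Version{
		{Name: "v1alpha1", Served: true, Storage: true},
		{Name: "v1beta1", Served: true, Storage: false},
	}
	if err := checkVersions(vs); err != nil {
		panic(err)
	}
	for _, p := range servedPaths("restaurant.programming-kubernetes.info", vs) {
		fmt.Println(p)
	}
}
```

Setting Served to false on one entry removes its path from the list, and marking a second entry as Storage makes checkVersions return an error.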
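How the K8s APIExtensionServer works
Most readers will be familiar with the K8s apiserver: every Kubernetes component talks to it, and it is the only component that talks to etcd. The apiserver is a RESTful web server; every Create, Update, Delete, Get... call from kubectl or client-go is ultimately a GET/POST/PUT... REST request sent to the apiserver. By default the apiserver only knows about built-in resource types such as Deployment, StatefulSet and Service. To support custom resources defined by CRDs, the APIExtensionServer was introduced, as the apiserver startup code shows: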
// If additional API servers are added, they should be gated.
apiExtensionsConfig, err := createAPIExtensionsConfig(*kubeAPIServerConfig.GenericConfig, kubeAPIServerConfig.ExtraConfig.VersionedInformers, pluginInitializer, completedOptions.ServerRunOptions, completedOptions.MasterCount,
	serviceResolver, webhook.NewDefaultAuthenticationInfoResolverWrapper(proxyTransport, kubeAPIServerConfig.GenericConfig.EgressSelector, kubeAPIServerConfig.GenericConfig.LoopbackClientConfig))
if err != nil {
	return nil, err
}
apiExtensionsServer, err := createAPIExtensionsServer(apiExtensionsConfig, genericapiserver.NewEmptyDelegate())
if err != nil {
	return nil, err
}
What the APIExtensionServer does is install a request handler for custom resources:
crdHandler, err := NewCustomResourceDefinitionHandler(
	versionDiscoveryHandler,
	groupDiscoveryHandler,
	s.Informers.Apiextensions().V1().CustomResourceDefinitions(),
	delegateHandler,
	c.ExtraConfig.CRDRESTOptionsGetter,
	c.GenericConfig.AdmissionControl,
	establishingController,
	c.ExtraConfig.ServiceResolver,
	c.ExtraConfig.AuthResolverWrapper,
	c.ExtraConfig.MasterCount,
	s.GenericAPIServer.Authorizer,
	c.GenericConfig.RequestTimeout,
	time.Duration(c.GenericConfig.MinRequestTimeout)*time.Second,
	apiGroupInfo.StaticOpenAPISpec,
	c.GenericConfig.MaxRequestBodyBytes,
)
if err != nil {
	return nil, err
}
s.GenericAPIServer.Handler.NonGoRestfulMux.Handle("/apis", crdHandler)
s.GenericAPIServer.Handler.NonGoRestfulMux.HandlePrefix("/apis/", crdHandler)
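As a result, any request for a resource that is not a built-in type reaches this handler, which is responsible for reading and writing custom resources. Reads and writes for each custom resource are performed by a Storage object, one per version: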
storages[v.Name] = customresource.NewStorage(
	resource.GroupResource(),
	kind,
	schema.GroupVersionKind{Group: crd.Spec.Group, Version: v.Name, Kind: crd.Status.AcceptedNames.ListKind},
	customresource.NewStrategy(
		typer,
		crd.Spec.Scope == apiextensionsv1.NamespaceScoped,
		kind,
		validator,
		statusValidator,
		structuralSchemas,
		statusSpec,
		scaleSpec,
	),
	crdConversionRESTOptionsGetter{
		RESTOptionsGetter:     r.restOptionsGetter,
		converter:             safeConverter,
		decoderVersion:        schema.GroupVersion{Group: crd.Spec.Group, Version: v.Name},
		encoderVersion:        schema.GroupVersion{Group: crd.Spec.Group, Version: storageVersion},
		structuralSchemas:     structuralSchemas,
		structuralSchemaGK:    kind.GroupKind(),
		preserveUnknownFields: crd.Spec.PreserveUnknownFields,
	},
	crd.Status.AcceptedNames.Categories,
	table,
)
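crdConversionRESTOptionsGetter defines the version conversion applied when resources are read and written. Specifically, when the version in the request differs from encoderVersion, a conversion is performed: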
// Perform a conversion if necessary
out, err := c.convertor.ConvertToVersion(obj, c.encodeVersion)
if err != nil {
	return err
}
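Conversion logic is usually customized with a webhook, and Kubernetes provides a sample conversion webhook. On the apiserver side, the webhook is called through webhookConverter: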
func (c *webhookConverter) Convert(in runtime.Object, toGV schema.GroupVersion) (runtime.Object, error) {
	listObj, isList := in.(*unstructured.UnstructuredList)
	requestUID := uuid.NewUUID()
	desiredAPIVersion := toGV.String()
	objectsToConvert := getObjectsToConvert(in, desiredAPIVersion)
	request, response, err := createConversionReviewObjects(c.conversionReviewVersions, objectsToConvert, desiredAPIVersion, requestUID)
	if err != nil {
		return nil, err
	}
	...
	convertedObjects, err := getConvertedObjectsFromResponse(requestUID, response)
	if err != nil {
		return nil, fmt.Errorf("conversion webhook for %v failed: %v", in.GetObjectKind().GroupVersionKind(), err)
	}
	if len(convertedObjects) != len(objectsToConvert) {
		return nil, fmt.Errorf("conversion webhook for %v returned %d objects, expected %d", in.GetObjectKind().GroupVersionKind(), len(convertedObjects), len(objectsToConvert))
	}
	if isList {
		// start a deepcopy of the input and fill in the converted objects from the response at the right spots.
		// The response list might be sparse because objects had the right version already.
		convertedList := listObj.DeepCopy()
		...
		convertedList.SetAPIVersion(toGV.String())
		return convertedList, nil
	}
	if len(convertedObjects) != 1 {
		// This should not happened
		return nil, fmt.Errorf("conversion webhook for %v failed, no objects returned", in.GetObjectKind())
	}
	converted, err := getRawExtensionObject(convertedObjects[0])
	...
	return converted, nil
}
The webhook is configured in the CRD:
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
  name: pizzas.restaurant.programming-kubernetes.info
spec:
  group: restaurant.programming-kubernetes.info
  names:
    kind: Pizza
    listKind: PizzaList
    plural: pizzas
    singular: pizza
  scope: Namespaced
  version: v1alpha1
  versions:
  - name: v1alpha1
    served: true
    storage: true
    ...
  - name: v1beta1
    served: true
    storage: false
    ...
  preserveUnknownFields: false
  conversion:
    strategy: Webhook
    webhookClientConfig:
      caBundle: <CA>
      service:
        namespace: pizza-crd
        name: webhook
        path: /convert/v1beta1/pizza
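To make the webhook side more concrete, here is a minimal sketch of an HTTP handler that could sit behind the /convert/v1beta1/pizza path. It uses only the Go standard library and models just the ConversionReview fields it needs (request.uid, request.desiredAPIVersion, request.objects, response.uid, response.convertedObjects, response.result.status). The type and function names are hypothetical, and the handler only relabels apiVersion; a real webhook must also translate any fields whose shape differs between versions and must be served over HTTPS.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// conversionReview holds only the ConversionReview fields this sketch needs.
type conversionReview struct {
	APIVersion string              `json:"apiVersion"`
	Kind       string              `json:"kind"`
	Request    *conversionRequest  `json:"request,omitempty"`
	Response   *conversionResponse `json:"response,omitempty"`
}

type conversionRequest struct {
	UID               string                   `json:"uid"`
	DesiredAPIVersion string                   `json:"desiredAPIVersion"`
	Objects           []map[string]interface{} `json:"objects"`
}

type conversionResponse struct {
	UID              string                   `json:"uid"`
	ConvertedObjects []map[string]interface{} `json:"convertedObjects"`
	Result           map[string]string        `json:"result"`
}

// handleConvert relabels every object in the request with the desired apiVersion
// and echoes the request UID back in the response.
func handleConvert(w http.ResponseWriter, r *http.Request) {
	var review conversionReview
	if err := json.NewDecoder(r.Body).Decode(&review); err != nil || review.Request == nil {
		http.Error(w, "malformed ConversionReview", http.StatusBadRequest)
		return
	}
	resp := &conversionResponse{
		UID:              review.Request.UID,
		ConvertedObjects: make([]map[string]interface{}, 0, len(review.Request.Objects)),
		Result:           map[string]string{"status": "Success"},
	}
	for _, obj := range review.Request.Objects {
		obj["apiVersion"] = review.Request.DesiredAPIVersion
		resp.ConvertedObjects = append(resp.ConvertedObjects, obj)
	}
	review.Request, review.Response = nil, resp
	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(review)
}

// convertOnce sends one object through the handler in-process and returns
// the apiVersion of the converted object.
func convertOnce(obj map[string]interface{}, desired string) (string, error) {
	body, err := json.Marshal(conversionReview{
		APIVersion: "apiextensions.k8s.io/v1beta1",
		Kind:       "ConversionReview",
		Request: &conversionRequest{
			UID:               "test-uid",
			DesiredAPIVersion: desired,
			Objects:           []map[string]interface{}{obj},
		},
	})
	if err != nil {
		return "", err
	}
	rec := httptest.NewRecorder()
	handleConvert(rec, httptest.NewRequest(http.MethodPost, "/convert/v1beta1/pizza", bytes.NewReader(body)))
	var out conversionReview
	if err := json.Unmarshal(rec.Body.Bytes(), &out); err != nil {
		return "", err
	}
	if out.Response == nil || len(out.Response.ConvertedObjects) != 1 {
		return "", fmt.Errorf("unexpected response: %s", rec.Body.String())
	}
	v, _ := out.Response.ConvertedObjects[0]["apiVersion"].(string)
	return v, nil
}

func main() {
	pizza := map[string]interface{}{
		"apiVersion": "restaurant.programming-kubernetes.info/v1alpha1",
		"kind":       "Pizza",
		"metadata":   map[string]interface{}{"name": "margherita"},
	}
	v, err := convertOnce(pizza, "restaurant.programming-kubernetes.info/v1beta1")
	if err != nil {
		panic(err)
	}
	fmt.Println(v)
}
```

The convertOnce helper exists only to exercise the handler without a network listener; in a cluster the apiserver is the caller.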
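The overall flow can be summarized in one sentence: when a custom resource is written, it is persisted in the storage version, and converted first if the request version differs from the storage version; when a custom resource is read, it is likewise converted if the request version differs from the storage version. Note that changing the storage version does not automatically change the version of the resources already in etcd; an object only changes version when it is written again.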
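The behaviour described above can be modelled with a small in-memory sketch. The store and object types are hypothetical stand-ins for etcd and a custom resource, and convert stands in for the conversion webhook by only relabelling the version.

```go
package main

import "fmt"

// object is a stand-in for a custom resource; only its version matters here.
type object struct {
	Name    string
	Version string
}

// store models etcd together with the CRD's current storage version.
type store struct {
	storageVersion string
	data           map[string]object // persisted objects, keyed by name
}

// convert stands in for the conversion webhook; it only relabels the version.
func convert(o object, to string) object {
	o.Version = to
	return o
}

// write persists the object in the current storage version,
// regardless of the version the client sent.
func (s *store) write(o object) {
	s.data[o.Name] = convert(o, s.storageVersion)
}

// read returns the object in the version the client asked for.
func (s *store) read(name, requested string) (object, bool) {
	o, ok := s.data[name]
	if !ok {
		return object{}, false
	}
	return convert(o, requested), true
}

func main() {
	s := &store{storageVersion: "v1alpha1", data: map[string]object{}}

	// A client writes v1beta1, but the object is persisted as v1alpha1.
	s.write(object{Name: "margherita", Version: "v1beta1"})
	fmt.Println(s.data["margherita"].Version)

	// Flipping the storage version does not touch what is already persisted.
	s.storageVersion = "v1beta1"
	fmt.Println(s.data["margherita"].Version)

	// Only a read followed by a write moves the object to the new storage version.
	o, _ := s.read("margherita", "v1beta1")
	s.write(o)
	fmt.Println(s.data["margherita"].Version)
}
```

The three printed versions are v1alpha1, v1alpha1 and v1beta1, which is why a storage version change has to be followed by an explicit rewrite of existing objects.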
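CRD version upgrade mechanism
Adding a new version
- Add the new version to spec.versions with served set to true and storage set to false;
- Choose a conversion strategy and deploy the conversion webhook;
- Point spec.conversion.webhookClientConfig at the conversion webhook;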
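Once the new version has been added, the old and new versions can be used side by side with no impact on users. Reaching this state means the conversion has to work in both directions, v1alpha1 -> v1beta1 and v1beta1 -> v1alpha1. With N versions there are N * (N-1) possible conversions, so I recommend serving no more than 3 versions at the same time. If the versions have not been exposed externally, it is enough to implement only the v1alpha1 -> v1beta1 direction and migrate everything in one go.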
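As an illustration of what a two-way conversion involves, the sketch below converts between two hypothetical Pizza specs. The field layouts are assumptions made for this example and are not taken from the CRD above: v1alpha1 is assumed to list topping names, with a repeated name meaning a larger quantity, and v1beta1 is assumed to store a name together with a quantity.

```go
package main

import "fmt"

// PizzaV1alpha1 is a hypothetical old spec: a topping repeated n times means quantity n.
type PizzaV1alpha1 struct {
	Toppings []string
}

// Topping and PizzaV1beta1 are a hypothetical new spec with explicit quantities.
type Topping struct {
	Name     string
	Quantity int
}

type PizzaV1beta1 struct {
	Toppings []Topping
}

// toV1beta1 collapses repeated names into (name, quantity) pairs,
// keeping the order in which each name first appears.
func toV1beta1(in PizzaV1alpha1) PizzaV1beta1 {
	index := map[string]int{}
	var out PizzaV1beta1
	for _, name := range in.Toppings {
		if i, ok := index[name]; ok {
			out.Toppings[i].Quantity++
			continue
		}
		index[name] = len(out.Toppings)
		out.Toppings = append(out.Toppings, Topping{Name: name, Quantity: 1})
	}
	return out
}

// toV1alpha1 expands each (name, quantity) pair back into repeated names.
func toV1alpha1(in PizzaV1beta1) PizzaV1alpha1 {
	var out PizzaV1alpha1
	for _, t := range in.Toppings {
		for i := 0; i < t.Quantity; i++ {
			out.Toppings = append(out.Toppings, t.Name)
		}
	}
	return out
}

// conversionPairs returns the number of directed conversions between n versions.
func conversionPairs(n int) int {
	return n * (n - 1)
}

func main() {
	old := PizzaV1alpha1{Toppings: []string{"mozzarella", "tomato", "mozzarella"}}
	newer := toV1beta1(old)
	fmt.Println(newer.Toppings)
	fmt.Println(toV1alpha1(newer).Toppings)
	fmt.Println(conversionPairs(3))
}
```

In this sketch a round trip groups repeated toppings together, so the original ordering is not preserved. Whether that kind of loss is acceptable is exactly the question each direction of a real conversion has to answer.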
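Migrating to the latest storage version
- In spec.versions, set storage to true for the new version and to false for the old version;
- Write a migrator that reads every resource once and writes it back once, so that each object is rewritten in the latest storage version;
- Remove the old version from status.storedVersions;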
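The migrator step can be sketched as a plain read-and-write-back loop. The resourceClient interface and fakeClient below are hypothetical stand-ins for a real Kubernetes client so that the example runs on its own; a real migrator would also need to handle pagination and update conflicts.

```go
package main

import "fmt"

// resourceClient is a hypothetical, minimal stand-in for a Kubernetes client
// scoped to one custom resource type.
type resourceClient interface {
	List() ([]map[string]interface{}, error)
	Update(obj map[string]interface{}) error
}

// migrate reads every object and writes it back unchanged. The write is what
// causes the object to be re-encoded in the current storage version.
func migrate(c resourceClient) (int, error) {
	objs, err := c.List()
	if err != nil {
		return 0, err
	}
	migrated := 0
	for _, obj := range objs {
		if err := c.Update(obj); err != nil {
			return migrated, fmt.Errorf("update %v: %w", obj["name"], err)
		}
		migrated++
	}
	return migrated, nil
}

// pruneStoredVersions returns the stored versions with everything except keep removed,
// which is the value status.storedVersions should have after a full migration.
func pruneStoredVersions(stored []string, keep string) []string {
	var out []string
	for _, v := range stored {
		if v == keep {
			out = append(out, v)
		}
	}
	return out
}

// fakeClient records the version each object is persisted in.
type fakeClient struct {
	storageVersion string
	persisted      map[string]string // object name -> persisted version
}

func (f *fakeClient) List() ([]map[string]interface{}, error) {
	objs := make([]map[string]interface{}, 0, len(f.persisted))
	for name := range f.persisted {
		objs = append(objs, map[string]interface{}{"name": name})
	}
	return objs, nil
}

func (f *fakeClient) Update(obj map[string]interface{}) error {
	name, ok := obj["name"].(string)
	if !ok {
		return fmt.Errorf("object has no name")
	}
	f.persisted[name] = f.storageVersion
	return nil
}

func main() {
	c := &fakeClient{
		storageVersion: "v1beta1",
		persisted:      map[string]string{"margherita": "v1alpha1", "salami": "v1alpha1"},
	}
	n, err := migrate(c)
	if err != nil {
		panic(err)
	}
	fmt.Println(n, c.persisted)
	fmt.Println(pruneStoredVersions([]string{"v1alpha1", "v1beta1"}, "v1beta1"))
}
```

Only after migrate has succeeded for every object is it safe to drop the old version from status.storedVersions.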
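Retiring the old version
- Make sure all clients have been upgraded to the new version; audit logs can confirm this;
- Set served to false for the old version in spec.versions. This step can be left in place for hours or even days; if problems appear, set served back to true to roll back;
- Make sure the stored objects have all been upgraded to the latest storage version;
- Delete the old version from spec.versions;
- Take down the conversion webhook;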
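The preconditions in this checklist can be written down as a small guard that runs before the old version is deleted from spec.versions. This is a sketch only: the crd and crdVersion types and the canRemove function are hypothetical and model just the fields mentioned in this article.

```go
package main

import "fmt"

// crdVersion and crd are hypothetical, simplified views of a CRD.
type crdVersion struct {
	Name    string
	Served  bool
	Storage bool
}

type crd struct {
	Versions       []crdVersion // spec.versions
	StoredVersions []string     // status.storedVersions
}

// canRemove returns nil when the named version satisfies the checklist above:
// it is not the storage version, it is no longer served, and no object is
// recorded as still being stored in it.
func canRemove(c crd, name string) error {
	for _, v := range c.Versions {
		if v.Name != name {
			continue
		}
		if v.Storage {
			return fmt.Errorf("%s is still the storage version", name)
		}
		if v.Served {
			return fmt.Errorf("%s is still served", name)
		}
		for _, s := range c.StoredVersions {
			if s == name {
				return fmt.Errorf("%s is still listed in status.storedVersions", name)
			}
		}
		return nil
	}
	return fmt.Errorf("%s not found in spec.versions", name)
}

func main() {
	c := crd{
		Versions: []crdVersion{
			{Name: "v1alpha1", Served: false, Storage: false},
			{Name: "v1beta1", Served: true, Storage: true},
		},
		StoredVersions: []string{"v1alpha1", "v1beta1"},
	}
	fmt.Println(canRemove(c, "v1alpha1")) // blocked until storedVersions is pruned

	c.StoredVersions = []string{"v1beta1"}
	fmt.Println(canRemove(c, "v1alpha1"))
}
```

Running a check like this in a release pipeline is one way to keep the steps in order; it does not replace confirming through audit logs that clients have stopped using the old version.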
References
Documentation
- https://kubernetes.io/docs/tasks/extend-kubernetes/custom-resources/custom-resource-definition-versioning/
- https://github.com/kubernetes/kubernetes/tree/v1.15.0/test/images/crd-conversion-webhook
- https://www.openshift.com/blog/kubernetes-deep-dive-api-server-part-1
- https://www.openshift.com/blog/kubernetes-deep-dive-api-server-part-2
- https://www.openshift.com/blog/kubernetes-deep-dive-api-server-part-3a
- https://github.com/jamiehannaford/what-happens-when-k8s