2.2.5 List-Watch 原理
List-Watch 机制是 Kubernetes 的系统消息通知机制,该机制确保了消息的实时性、顺序性和可靠性。List-Watch 由两部分组成:List 和 Watch。List 负责调用资源的 List RESTful API ,基于 HTTP 短链接实现;Watch 则调用资源的 Watch RESTful API,负责监听资源变更事件,基于 HTTP 长链接实现,也是本节重点分析的对象。
以 Deployment 资源为例,调用其 List 和 Watch 接口,结果见代码清单 2-33。
List 接口返回 Deployment 资源列表,比较简单。
代码清单 2-33
GET /apis/apps/v1/deployments ... { "kind": "DeploymentList", "apiVersion": "apps/v1", "metadata": {...} "items": [ { "metadata": {...}, "spec": {...}, "status": {...} }, ... ] }
Watch 接口返回事件(Event),这里采用 HTTP 长链接持续监听 Deployment 资源相关事件,每当有事件产生就返回一个 Event 。返回值的类型有 ADDED、MODIFIED 等,表示增加、修改等操作,Object 包含变更后最新的资源信息。
这里 Watch 接口的实现利用了 HTTP/1.1 协议的分块传输编码(Chunked Transfer Encoding),当客户端调用 Watch 接口时,Kubernetes APIServer 在 Response Header 中设置 Transfer-Encoding 的值为 chunked(见代码清单 2-34),表示采用分块传输编码,客户端收到该信息后,便和服务端保持该链接,并等待下一个数据块,即资源的事件信息。
代码清单 2-34
GET /apis/apps/v1/watch/deployments?watch=yes --- HTTP/1.1 200 OK Content-Type: application/json Transfer-Encoding: chunked ... { "type": "MODIFIED", "object": { "kind": "Deployment", "apiVersion": "apps/v1", "metadata": {...}, "spec": {...}, "status": {...} } }
维 基 百 科
HTTP 分块传输编码允许服务器为动态生成的内容维持 HTTP 持久链接。通常,持久链接需要服务器在开始发送消息体前发送 Content-Length 消息头字段,但是对于动态生成的内容来说,在内容创建完之前是不可知的。使用分块传输编码将数据分解成一系列数据块,并以一个或多个块发送,这样服务器发送数据时不需要预先知道发送内容的总的大小。
List-Watch 功能对应到 Client-go 中,就由 Reflector 组件负责实现,其本质是将 Kubernetes 中的对象资源数据存储到本地并实时更新,拥有很高的可靠性、实时性和顺序性。Reflector 首先通过 List 获取 Kubernetes 中指定类型的资源对象,基于资源对象的 ResourceVersion 信息,使用 Watch 监听该类型资源事件,从而确保事件消息的实时性,并且资源对象 ResourceVersion 的递增特性确保了消息事件的顺序性。当 Watch 监听意外断开时,Reflector 会重新 List-Watch 资源,以确保可靠性,由于使用 Watch 长链接监听替换轮询 List 来获得最新资源状态,极大减轻了 Kubernetes APIServer 的访问压力,在确保消息事件实时性的同时也保证了性能。
下面分析 Reflector 的关键实现,首先通过 NewReflector 函数创建 Reflector 对象,通过 Run 方法启动监听并处理事件,而 Run 方法中最核心的就是 List-Watch 方法,其核心逻辑分为 List、定时同步、Watch 这 3 个部分。
(1) List:调用 List 方法获取资源数据,将其转化为资源对象列表,并最终同步到 DeltaFIFO 队列中。
(2)定时同步:利用定时器定时触发 Resync 机制,将 Indexer 中的资源对象同步到 DeltaFIFO 队列中。
(3) Watch:监听环境中资源的变化,并调用相应事件处理函数进行处理。
核心代码分析见代码清单 2-35。
代码清单 2-35
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { //... if err := func() error { //... go func() { //... // 如果 listerWatcher 支持,则尝试以块的形式收集列表 // 如果 listerWatcher 不支持,则尝试第一个列表请求返回完整的响应 pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime. Object, error) { return r.listerWatcher.List(opts) })) //... // 返回完整列表 list, err = pager.List(context.Background(), options) }() //... // 获取资源版本号 resourceVersion = listMetaInterface.GetResourceVersion() // 将资源数据转换为资源对象列表 items, err := meta.ExtractList(list) // 将资源信息存储到 DeltaFIFO 中 if err := r.syncWith(items, resourceVersion); err != nil { return fmt.Errorf("unable to sync list result: %v", err) } //... }(); err != nil { return err } go func() { // 返回重新同步的定时通道,里面有计时器 resyncCh, cleanup := r.resyncChan() //... for { //... if r.ShouldResync == nil || r.ShouldResync() { // Resync 机制会将 Indexer 本地存储的资源对象同步到 DeltaFIFO 中 if err := r.store.Resync(); err != nil { //... } } // 重新启用定时器定时触发 resyncCh, cleanup = r.resyncChan() } }() for { ... // 监听资源变化 w, err := r.listerWatcher.Watch(options) // 处理监听到的各类事件,并调用预先注册的 Add、Delete、Update 函数进行处理 if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh); ... } }
其中, watchHandler 中设置了事件处理函数,从 ResultChan() 方法返回的 Channel 中获取事件,并根据事件类型(event.Type)将事件分发给对应的处理函数,这里处理函数的逻辑都是将事件同步到 DeltaFIFO 队列中,具体见代码清单 2-36。
代码清单 2-36
func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error { //... for { select { //... case event, ok := <-w.ResultChan(): //... switch event.Type { case watch.Added: err := r.store.Add(event.Object) case watch.Modified: err := r.store.Update(event.Object) case watch.Deleted: err := r.store.Delete(event.Object) } //... } } //... }