2.2.5 List-Watch原理
List-Watch机制是 Kubernetes 的系统消息通知机制,该机制确保了消息的实时性、顺序性和可靠性。List-Watch由两部分组成:List和 Watch。List负责调用资源的 ListRESTfulAPI,基于 HTTP 短链接实现;Watch 则调用资源的 WatchRESTfulAPI,负责监听资源变更事件,基于HTTP长链接实现,也是本节重点分析的对象。
以 Deployment资源为例,调用其List和 Watch接口,结果见代码清单 2-33。List接口返回 Deployment资源列表,比较简单。
GET/apis/apps/v1/deployments
...
{
"kind":"DeploymentList","apiVersion":"apps/v1","metadata":{...}
"items":[
{
"metadata":{...},
"spec":{...},
"status":{...}
},
...
]
}
Watch接口返回事件(Event),这里采用HTTP长链接持续监听Deployment资源相关事件,每当有事件产生就返回一个Event。返回值的类型有 ADDED、MODIFIED等,表示增加、修改等操作,Object包含变更后最新的资源信息。
这里 Watch 接口的实现利用了 HTTP/1.1 协议的分块传输编码(ChunkedTransferEncoding),当客户端调用Watch接口时,KubernetesAPIServer在ResponseHeader中设置Transfer-Encoding的值为chunked(见代码清单2-34),表示采用分块传输编码,客户端收到该信息后,便和服务端保持该链接,并等待下一个数据块,即资源的事件信息。
GET/apis/apps/v1/watch/deployments?watch=yes
---
HTTP/1.1200OK
Content-Type:application/jsonTransfer-Encoding:chunked
...
{
"type":"MODIFIED",
"object":{
"kind":"Deployment","apiVersion":"apps/v1","metadata":{...},
"spec":{...},
"status":{...}
}
}
List-Watch功能对应到Client-go中,就由 Reflector 组件负责实现,其本质是将Kubernetes 中的对象资源数据存储到本地并实时更新,拥有很高的可靠性、实时性和顺序性。Reflector首先通过 List获取 Kubernetes 中指定类型的资源对象,基于资源对象的 ResourceVersion信息,使用 Watch 监听该类型资源事件,从而确保事件消息的实时性,并且资源对象 ResourceVersion的递增特性确保了消息事件的顺序性。当Watch监听意外断开时,Reflector会重新 List-Watch 资源,以确保可靠性,由于使用 Watch长链接监听替换轮询 List来获得最新资源状态,极大减轻了 KubernetesAPIServer的访问压力,在确保消息事件实时性的同时也保证了性能。
下面分析 Reflector的关键实现,首先通过NewReflector函数创建 Reflector对象,通过 Run方法启动监听并处理事件,而Run方法中最核心的就是 List-Watch方法,其核心逻辑分为 List、定时同步、Watch这 3个部分。
(1) List:调用 List方法获取资源数据,将其转化为资源对象列表,并最终同步到DeltaFIFO队列中。
(2) 定时同步:利用定时器定时触发 Resync机制,将 Indexer 中的资源对象同步到DeltaFIFO 队列中。
(3) Watch:监听环境中资源的变化,并调用相应事件处理函数进行处理。核心代码分析见代码清单 2-35。
func(r*Reflector)ListAndWatch(stopCh<-chanstruct{})error{
//...
iferr:=func()error{
//...
gofunc(){
//...
//如果 listerWatcher⽀持,则尝试以块的形式收集列表
//如果listerWatcher不⽀持,则尝试第⼀个列表请求返回完整的响应
pager:=pager.New(pager.SimplePageFunc(func(optsmetav1.ListOptions)(runtime.
Object,error){
returnr.listerWatcher.List(opts)
}))
//...
//返回完整列表
list,err=pager.List(context.Background(),options)
}()
//...
//获取资源版本号
resourceVersion=listMetaInterface.GetResourceVersion()
//将资源数据转换为资源对象列表
items,err:=meta.ExtractList(list)
//将资源信息存储到DeltaFIFO中
iferr:=r.syncWith(items,resourceVersion);err!=nil{returnfmt.Errorf("unabletosynclistresult:%v",err)
}
//...
}();err!=nil{returnerr
}
gofunc(){
//返回重新同步的定时通道,⾥⾯有计时器
resyncCh,cleanup:=r.resyncChan()
//...for{
//...
ifr.ShouldResync==nil||r.ShouldResync(){
//Resync机制会将Indexer本地存储的资源对象同步到DeltaFIFO中
iferr:=r.store.Resync();err!=nil{
//...
}
}
}
}()
//重新启⽤定时器定时触发
resyncCh,cleanup=r.resyncChan()
for{
...
//监听资源变化
w,err:=r.listerWatcher.Watch(options)
//处理监听到的各类事件,并调⽤预先注册的 Add、Delete、Update函数进⾏处理
iferr:=r.watchHandler(start,w,&resourceVersion,resyncerrc,stopCh);
...
}
}
其中,watchHandler 中设置了事件处理函数,从 ResultChan()方法返回的 Channel中获取事件,并根据事件类型(event.Type)将事件分发给对应的处理函数,这里处理函数的逻辑都是将事件同步到 DeltaFIFO队列中,具体见代码清单 2-36。
func(r*Reflector)watchHandler(starttime.Time,wwatch.Interface,resourceVersion
*string,errcchanerror,stopCh<-chanstruct{})error{
//...for{
select{
//...
caseevent,ok:=<-w.ResultChan():
//...
switchevent.Type{
casewatch.Added:
err:=r.store.Add(event.Object)
casewatch.Modified:
err:=r.store.Update(event.Object)casewatch.Deleted:
err:=r.store.Delete(event.Object)
}
//...
}
}
//...
}