2.2.6 Client-go Informer 解析
1. Client-goInformer模块
Informer可以对 KubernetesAPIServer的资源执行 Watch操作 , 类型可以是Kubernetes内置资源,也可以是 CRD。其中最核心的模块是 Reflector、DeltaFIFO、Indexer。接下来我们逐个进行分析。
首先分析 Reflector,Reflector 用于监控指定资源的 Kubernetes。当资源发生变化时,如发生了资源添加(Added)、资源更新(Updated)等事件,Reflector会将其资源对象存放在本地缓存 DeltaFIFO 中。它的作用就是获取 APIServer中对象数据并实时地更新到本地,使得本地数据和ETCD数据完全一样。它的数据结构见代码清单2-37。
typeReflectorstruct{
namestring//这个 Reflector的名称,默认为⽂件 : ⾏数metrics*reflectorMetrics//⽤于保存 Reflector的⼀些监控指标expectedTypereflect.Type//期望放到 Store中的类型名称storeStore//与 Watch源同步的⽬标Store
listerWatcherListerWatcher//ListerWatcher接⼝,⽤于指定 List-Watch⽅法
period time.Duration//Watch周期resyncPeriodtime.Duration//重新同步周期ShouldResyncfunc() bool
//clockallowsteststomanipulatetimeclockclock.Clock
lastSyncResourceVersionstring//最后同步的资源的版本号
lastSyncResourceVersionMutexsync.RWMutex//lastSyncResourceVersion的读写锁
}
通过 NewRefector实例化 Reflector对象,实例化过程中必须传入 ListerWatcher数据接口对象,它拥有List和 Watch方法,用于获取及监控资源列表,只要是实现了 List和 Watch方法的对象都可以成为 ListerWatcher,Reflector对象通过 run函数启动监控并处理事件,而在 Reflector源码实现中最主要的是List-Watch函数,它负责 List/Watch指定的 KubernetesAPIServer资源,见代码清单 2-38。
//NewNamedReflectorsameasNewReflector,butwithaspecifiednameforloggingfunc NewNamedReflector(name string, lw ListerWatcher, expectedType interface{},storeStore,resyncPeriodtime.Duration)*Reflector{
reflectorSuffix := atomic.AddInt64(&reflectorDisambiguator, 1)r:=&Reflector{
name:name,
//weneedthistobeuniqueperprocess(somenamesarestillthesame)butobviouswhoitbelongsto
metrics: newReflectorMetrics(makeValidPrometheusMetricLabel(fmt.
Sprintf("reflector_"+name+"_%d",reflectorSuffix))),
listerWatcher:lw,store:store,
expectedType:reflect.TypeOf(expectedType),
period: time.Second,resyncPeriod:resyncPeriod,clock:&clock.RealClock{},
}
returnr
}
List-Watch是怎么实现的? List-Watch主要分为 List和 Watch两部分。List负责获取对应资源的全量列表,Watch负责获取变化的部分。首先进行 List操作,这里把ResourceVersion设置为 0,因为要获取同步的对象的全部版本,所以从 0开始 List,主要流程如下(见代码清单2-39)。
(1)r.listerWatcher.List 用于获取资源下的所有对象的数据。
(2) listMetaInterface.GetResourceVersion 用于获取资源版本号(ResouceVersion),资源版本号非常重要,Kubernetes中所有的资源都拥有该字段,它标识当前资源对象的版本号。每次修改当前资源对象时,KubernetesAPIServer都会更改 ResouceVersion,使得 Client-go执行 Watch操作时可以根据 ResourceVersion 来确定当前资源对象是否发生过变化。
(3) meta.ExtractList用于将资源数据转换成资源对象列表,将runtime.Object转换成[]runtime.Object,因为 r.listerWatcher.List只是获取一个列表。
(4)r.syncWith 用于将资源对象列表中的资源对象和资源版本号存储至DeltaFIFO中,并替换已存在的对象。
(5)r.setLastSyncResourceVersion 用于设置最新的资源版本号。
func(r*Reflector)ListAndWatch(stopCh<-chanstruct{})error{glog.V(3).Infof("Listingandwatching%vfrom%s",r.expectedType,r.name)varresourceVersionstring
options:=metav1.ListOptions{ResourceVersion:"0"}r.metrics.numberOfLists.Inc()
start:=r.clock.Now()
list,err:=r.listerWatcher.List(options)iferr!=nil{
returnfmt.Errorf("%s:Failedtolist%v:%v",r.name,r.expectedType,
err)
}
r.metrics.listDuration.Observe(time.Since(start).Seconds())listMetaInterface,err:=meta.ListAccessor(list)
iferr!=nil{
returnfmt.Errorf("%s:Unabletounderstandlistresult%#v:%v",
r.name,list,err)
}
resourceVersion=listMetaInterface.GetResourceVersion()items,err:=meta.ExtractList(list)
iferr!=nil{
returnfmt.Errorf("%s:Unabletounderstandlistresult%#v(%v)",r.name,list,err)
}
r.metrics.numberOfItemsInList.Observe(float64(len(items)))
iferr:=r.syncWith(items,resourceVersion);err!=nil{
returnfmt.Errorf("%s:Unabletosynclistresult:%v",r.name,err)
}
r.setLastSyncResourceVersion(resourceVersion)
resyncerrc:=make(chanerror,1)
cancelCh:=make(chanstruct{})deferclose(cancelCh)
gofunc(){
resyncCh,cleanup:=r.resyncChan()deferfunc(){
cleanup()//Callthelastonewrittenintocleanup
}()
for{
select{
case<-resyncCh:case<-stopCh:
returncase<-cancelCh:
return
}
ifr.ShouldResync==nil||r.ShouldResync(){glog.V(4).Infof("%s:forcingresync",r.name)iferr:=r.store.Resync();err!=nil{
resyncerrc<-errreturn
}
}
}()
for{
cleanup()
resyncCh,cleanup=r.resyncChan()
}
//givethestopChachancetostoptheloop,evenincaseofcontinue
statementsfurtherdownonerrorsselect{
case<-stopCh:
returnnildefault:
}
timeoutSeconds:=int64(minWatchTimeout.Seconds()* (rand.Float64()+
1.0))
options=metav1.ListOptions{ResourceVersion:resourceVersion,TimeoutSeconds:&timeoutSeconds,
}
r.metrics.numberOfWatches.Inc()
w,err:=r.listerWatcher.Watch(options)
iferr!=nil{
switcherr{
caseio.EOF:
//watchclosednormally
caseio.ErrUnexpectedEOF:
glog.V(1).Infof("%s:Watchfor%vclosedwithunexpected
EOF:%v",r.name, r.expectedType, err)
default:
utilruntime.HandleError(fmt.Errorf("%s:Failedtowatch
%v:%v",r.name,r.expectedType,err))
}
ifurlError,ok:=err.(*url.Error);ok{
ifopError,ok:=urlError.Err.(*net.OpError);ok{
iferrno,ok:=opError.Err.(syscall.Errno);ok&&
errno==syscall.ECONNREFUSED{
time.Sleep(time.Second)continue
}
}
}
returnnil
}
err!=nil{
iferr:=r.watchHandler(w,&resourceVersion,resyncerrc,stopCh);
iferr!=errorStopRequested{
glog.Warningf("%s:watchof%vendedwith:%v",r.name,
r.expectedType,err)
}
returnnil
}
}
}