1. Client-goIndexer
资源对象从 DeltaFIFO中Pop 出去后又经过了哪些处理呢。这要从一开始的 sharedIndexInformer说起。注意,在 sharedIndexInformer的 Run 方法中,初始化了它的配置,并执行了 s.controller.Run方法。我们可以看到s.controller.Run中初始化了 Reflector,开始了指定资源的List-Watch 操作,并且同步到了DeltaFIFO中,同时执行了processLoop方法。此时我们可以看到 processLoop方法不断从DeltaFIFO中将资源对象 Pop出来, 并且交给了之前的 c.config.Process方法进行处理。而c.config.Process方法就是sharedIndexInformer的 HandleDeltas方法,具体见代码清单 2-45。
func(s*sharedIndexInformer)Run(stopCh<-chanstruct{}){
...
cfg:=&Config{
Queue: fifo,
ListerWatcher: s.listerWatcher,ObjectType: s.objectType,FullResyncPeriod:s.resyncCheckPeriod,RetryOnError: false,
ShouldResync: s.processor.shouldResync,
Process: s.HandleDeltas,WatchErrorHandler:s.watchErrorHandler,
}
func(){
s.startedLock.Lock()
defers.startedLock.Unlock()
...
}()
s.controller=New(cfg)s.controller.(*controller).clock=s.clocks.started=true
s.controller.Run(stopCh)
}
func(c*controller)Run(stopCh<-chanstruct{}){deferutilruntime.HandleCrash()
gofunc(){
<-stopCh
c.config.Queue.Close()
}()
r:=NewReflector(
c.config.ListerWatcher,c.config.ObjectType,c.config.Queue,c.config.FullResyncPeriod,
)
r.ShouldResync=c.config.ShouldResync
r.clock=c.clock
...
}
c.reflectorMutex.Lock()c.reflector=rc.reflectorMutex.Unlock()
wait.Until(c.processLoop,time.Second,stopCh)
func(c*controller)processLoop(){for{
obj,err:=c.config.Queue.Pop(PopProcessFunc(c.config.Process))
iferr!=nil{
iferr==FIFOClosedError{
return
}
ifc.config.RetryOnError{
//Thisisthesafewaytore-enqueue.
c.config.Queue.AddIfNotPresent(obj)
}
}
}
}
综上可知,由 DeltaFIFO中Pop出来的对象最后交给了 HandleDeltas进行处理,而在 HandleDeltas中,将资源对象同步到了 Indexer中,至此我们引出了 Informer模块中的第 3个组件 Indexer。Indexer是 Client-go 中实现的一个本地存储,它可以建立索引并存储 Resource的对象。Reflector通过 DeltaFIFOQueue将资源对象存储到Indexer中。需要注意的是,Indexer中的数据与 ETCD中的数据是完全一致的,当 Client-go需要数据时,无须每次都从 APIServer中获取,从而减轻了请求过多造成的对 APIServer的压力, 具体见代码清单 2-46。
func(s*sharedIndexInformer)HandleDeltas(objinterface{})error{
s.blockDeltas.Lock()
defers.blockDeltas.Unlock()
//fromoldesttonewest
for_,d:=rangeobj.(Deltas){switchd.Type{
caseSync,Replaced,Added,Updated:s.cacheMutationDetector.AddObject(d.Object)
ifold,exists,err:=s.indexer.Get(d.Object);err==nil&&
exists{
if err:=s.indexer.Update(d.Object);err!=nil{returnerr
}
requestedresync
nil{
==nil{
isSync:=falseswitch{
cased.Type==Sync:
//Synceventsareonlypropagatedtolistenersthat
isSync=true
cased.Type==Replaced:
ifaccessor,err:=meta.Accessor(d.Object);err==ifoldAccessor,err:=meta.Accessor(old);err
//Replacedeventsthatdidn'tchange
resourceVersionaretreatedasresyncevents
//andonlypropagatedtolisteners
thatrequestedresync
==oldAccessor.GetResourceVersion()
}
isSync=accessor.GetResourceVersion()
}
}
s.processor.distribute(updateNotification{oldObj:old,
newObj:d.Object},isSync)
}else{
if err:=s.indexer.Add(d.Object);err!=nil{returnerr
false)
}
s.processor.distribute(addNotification{newObj:d.Object},
}
caseDeleted:
iferr:=s.indexer.Delete(d.Object);err!=nil{
returnerr
}
s.processor.distribute(deleteNotification{oldObj:d.Object},false)
}
}
returnnil
}
Indexer 是如何实现存储并快速查找资源的呢?我们先看一下 Indexer接口提供的功能。Cache是 Indexer的一种非常经典的实现,所有的对象缓存在内存中,而且从Cache 这个类型的名称来看它属于包内私有类型,外部无法直接使用,只能通过专用的函数创建。 这里的 Store、Indexer使用了一个 threadSafeMap来保证并发安全的存储。它拥有存储相关的增、删、改、查等方法。threadSafeMap继承了 Store接口,而 Indexer扩展了threadSafeMap, 为 threadSafeMap提供了索引操作。threadSafeMap其实只能够存储和索引。存储即将runtime.object存储到 Items的 Map中;索引即为Items的 Map建立三层索引:IndicesMap类型索引(如 namespace、nodeName等);IndexMap 类型索引(如 namespace1、namespace2……);runtime.object类型索引,实现见代码清 单2-47。
typeIndexerinterface{Store
//indexName索引类,obj是对象,计算obj在indexName索引类中的索引键,通过索引键
获取所有的对象
//基本就是获取符合obj特征的所有对象,所谓的特征就是对象在索引类中的索引键
Index(indexNamestring,objinterface{})([]interface{},error)
//indexKey是 indexName索引类中的⼀个索引键,函数返回indexKey指定的所有对象键
IndexKeys(indexName,indexedValuestring)([]string,error)
//获取indexName索引类中的所有索引键
ListIndexFuncValues(indexNamestring)[]string
//这个函数和 Index类似,只是返回值不是对象键,⽽是所有对象
ByIndex(indexName,indexedValuestring)([]interface{},error)
//返回Indexers
GetIndexers()Indexers
//添加Indexers,就是增加更多的索引分类
AddIndexers(newIndexersIndexers)error
}
在Kubernetes中使用的比较多的索引函数是MetaNamespaceIndexFunc(() 代码位置:
client-go/tools/cache/index.go),Indexer索引的实现是通过index.ByIndex来完成的, index.ByIndex的实现见代码清单 2-48。这个函数返回了符合索引函数的值的对象列表。
func(c*threadSafeMap)ByIndex(indexName,indexKeystring)([]interface{},error){c.lock.RLock()
deferc.lock.RUnlock()
indexFunc:=c.indexers[indexName]ifindexFunc==nil{
returnnil,fmt.Errorf("Indexwithname%sdoesnotexist",indexName)
}
index:=c.indices[indexName]set:=index[indexKey]
list:=make([]interface{},0,set.Len())for_,key:=rangeset.List(){
list=append(list,c.items[key])
}
returnlist,nil
}
上述方法接收两个参数:indexName(索引器的名称)和indexedValue(需要索引的 Key)。首先根据索引器名称查找指定的索引器函数(c.indexers[indexName]);然后根据索引器名称查找相应的缓存器函数(c.indices[indexName]) ;最后根据索引 Key
(indexedValue)从缓存中进行数据查询,并返回查询结果。