带你读《云原生应用开发 Operator原理与实践》第二章 Operator 原理2.2Client-go 原理(十三)-阿里云开发者社区

开发者社区> 人民邮电出版社> 正文

带你读《云原生应用开发 Operator原理与实践》第二章 Operator 原理2.2Client-go 原理(十三)

简介: 带你读《云原生应用开发 Operator原理与实践》第二章 Operator 原理2.2Client-go 原理(十二)
+关注继续查看

1. Client-goIndexer

 

资源对象从 DeltaFIFOPop 出去后又经过了哪些处理呢。这要从一开始的 sharedIndexInformer说起。注意,在 sharedIndexInformer的 Run 方法中,初始化了它的配置,并执行了 s.controller.Run方法。我们可以看到s.controller.Run中初始化了 Reflector,开始了指定资源的List-Watch 操作,并且同步到了DeltaFIFO中,同时执行了processLoop方法。此时我们可以看到 processLoop方法不断从DeltaFIFO中将资源对象 Pop出来, 并且交给了之前的 c.config.Process方法进行处理。而c.config.Process方法就是sharedIndexInformer的 HandleDeltas方法,具体见代码清单 2-45。



func(s*sharedIndexInformer)Run(stopCh<-chanstruct{}){

...

cfg:=&Config{

Queue:              fifo,

ListerWatcher:             s.listerWatcher,ObjectType:          s.objectType,FullResyncPeriod:s.resyncCheckPeriod,RetryOnError:                 false,

ShouldResync:       s.processor.shouldResync,

 

Process:         s.HandleDeltas,WatchErrorHandler:s.watchErrorHandler,

}

 

func(){

s.startedLock.Lock()

defers.startedLock.Unlock()

 

...

}()


s.controller=New(cfg)s.controller.(*controller).clock=s.clocks.started=true

s.controller.Run(stopCh)

}

 

func(c*controller)Run(stopCh<-chanstruct{}){deferutilruntime.HandleCrash()

gofunc(){

<-stopCh

c.config.Queue.Close()

}()

r:=NewReflector(


c.config.ListerWatcher,c.config.ObjectType,c.config.Queue,c.config.FullResyncPeriod,

)

r.ShouldResync=c.config.ShouldResync

r.clock=c.clock

...

}

c.reflectorMutex.Lock()c.reflector=rc.reflectorMutex.Unlock()

 

wait.Until(c.processLoop,time.Second,stopCh)

func(c*controller)processLoop(){for{

obj,err:=c.config.Queue.Pop(PopProcessFunc(c.config.Process))

iferr!=nil{

iferr==FIFOClosedError{

return

}

ifc.config.RetryOnError{

//Thisisthesafewaytore-enqueue.

c.config.Queue.AddIfNotPresent(obj)

}

}

}

}

 

 

综上可知,由 DeltaFIFO中Pop出来的对象最后交给了 HandleDeltas进行处理,而在 HandleDeltas中,将资源对象同步到了 Indexer中,至此我们引出了 Informer模块中的第 3个组件 Indexer。Indexer是 Client-go 中实现的一个本地存储,它可以建立索引并存储 Resource的对象。Reflector通过 DeltaFIFOQueue将资源对象存储到Indexer中。需要注意的是,Indexer中的数据与 ETCD中的数据是完全一致的,当 Client-go需要数据时,无须每次都从 APIServer中获取,从而减轻了请求过多造成的对 APIServer的压力, 具体见代码清单 2-46。

func(s*sharedIndexInformer)HandleDeltas(objinterface{})error{

s.blockDeltas.Lock()

defers.blockDeltas.Unlock()

 

//fromoldesttonewest

for_,d:=rangeobj.(Deltas){switchd.Type{

caseSync,Replaced,Added,Updated:s.cacheMutationDetector.AddObject(d.Object)

ifold,exists,err:=s.indexer.Get(d.Object);err==nil&&

exists{

if err:=s.indexer.Update(d.Object);err!=nil{returnerr


}

 

requestedresync 

nil{

 

==nil{


isSync:=falseswitch{

cased.Type==Sync:

//Synceventsareonlypropagatedtolistenersthat

 

isSync=true

cased.Type==Replaced:

ifaccessor,err:=meta.Accessor(d.Object);err==ifoldAccessor,err:=meta.Accessor(old);err

//Replacedeventsthatdidn'tchange


resourceVersionaretreatedasresyncevents

//andonlypropagatedtolisteners


thatrequestedresync

 

==oldAccessor.GetResourceVersion()

}


isSync=accessor.GetResourceVersion()

}

}

s.processor.distribute(updateNotification{oldObj:old,

newObj:d.Object},isSync)

}else{

if err:=s.indexer.Add(d.Object);err!=nil{returnerr

 

false)

}

s.processor.distribute(addNotification{newObj:d.Object},

 

}

caseDeleted:

iferr:=s.indexer.Delete(d.Object);err!=nil{


 

returnerr

}

s.processor.distribute(deleteNotification{oldObj:d.Object},false)

}

}

returnnil

}

 

Indexer   是如何实现存储并快速查找资源的呢?我们先看一下 Indexer接口提供的功能。Cache是 Indexer的一种非常经典的实现,所有的对象缓存在内存中,而且从Cache 这个类型的名称来看它属于包内私有类型,外部无法直接使用,只能通过专用的函数创建。 这里的 Store、Indexer使用了一个 threadSafeMap来保证并发安全的存储。它拥有存储相关的增、删、改、查等方法。threadSafeMap继承了 Store接口,而 Indexer扩展了threadSafeMap, 为 threadSafeMap提供了索引操作。threadSafeMap其实只能够存储和索引。存储即将runtime.object存储到 Items的 Map中;索引即为Items的 Map建立三层索引:IndicesMap类型索引(如 namespace、nodeName等);IndexMap 类型索引(如 namespace1、namespace2……);runtime.object类型索引,实现见代码清 单2-47。


typeIndexerinterface{Store

//indexName索引类,obj是对象,计算objindexName索引类中的索引键,通过索引键

获取所有的对象

//基本就是获取符合obj特征的所有对象,所谓的特征就是对象在索引类中的索引键

Index(indexNamestring,objinterface{})([]interface{},error)

//indexKey是 indexName索引类中的⼀个索引键,函数返回indexKey指定的所有对象键

IndexKeys(indexName,indexedValuestring)([]string,error)

//获取indexName索引类中的所有索引键

ListIndexFuncValues(indexNamestring)[]string

//这个函数和 Index类似,只是返回值不是对象键,⽽是所有对象

ByIndex(indexName,indexedValuestring)([]interface{},error)

//返回Indexers

GetIndexers()Indexers

//添加Indexers,就是增加更多的索引分类

AddIndexers(newIndexersIndexers)error

}

 

Kubernetes中使用的比较多的索引函数是MetaNamespaceIndexFunc() 代码位置:

 

client-go/tools/cache/index.go),Indexer索引的实现是通过index.ByIndex来完成的, index.ByIndex的实现见代码清单 2-48。这个函数返回了符合索引函数的值的对象列表。



func(c*threadSafeMap)ByIndex(indexName,indexKeystring)([]interface{},error){c.lock.RLock()

deferc.lock.RUnlock()

 

indexFunc:=c.indexers[indexName]ifindexFunc==nil{

returnnil,fmt.Errorf("Indexwithname%sdoesnotexist",indexName)

}

index:=c.indices[indexName]set:=index[indexKey]

list:=make([]interface{},0,set.Len())for_,key:=rangeset.List(){

list=append(list,c.items[key])

}

 

returnlist,nil


}

 

上述方法接收两个参数:indexName(索引器的名称)indexedValue需要索引的 Key。首先根据索引器名称查找指定的索引器函数c.indexers[indexName]);然后根据索引器名称查找相应的缓存器函数(c.indices[indexName]) ;最后根据索引 Key

(indexedValue)从缓存中进行数据查询,并返回查询结果。

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
使用OWASP Top Ten保证Web应用程序的安全
 http://info.52z.com/html/14014.html
562 0
KubeMeet 杭州站报名:「云原生应用管理」开发者专场来啦!
4月17日杭州,云原生基金会CNCF和阿里巴巴联合主办的「KubeMeet 开发者沙龙·云原生应用管理专场」来啦!这里有Kubernetes 生态开发者都在关注的开源项目,以及阿里巴巴、携程、第四范式的一线云原生落地实践。赶紧报名吧!
777 0
使用OpenApi弹性释放和设置云服务器ECS释放
云服务器ECS的一个重要特性就是按需创建资源。您可以在业务高峰期按需弹性的自定义规则进行资源创建,在完成业务计算的时候释放资源。本篇将提供几个Tips帮助您更加容易和自动化的完成云服务器的释放和弹性设置。
7575 0
Elasticsearch Top5典型应用场景
题记 刚接触Elasticsearch的朋友,或多或少会遇到一个问题,Elasticsearch在实际公司应用中除了搜索到底能做什么? 本文给出了答案。 除了“You Know, for Search”,Elasticsearch的使用会不断增长和变化。ObjectRocket作为一家托管云计算公司,已经在ObjectRocket平台上提供托管Elasticsearch一段时间了,并且能够看到我们客户之间的一些明确趋势以及他们如何使用该产品。以下是我们在平台上看到的Top5场景用例:
7 0
472
文章
0
问答
文章排行榜
最热
最新
相关电子书
更多
文娱运维技术
立即下载
《SaaS模式云原生数据仓库应用场景实践》
立即下载
《看见新力量:二》电子书
立即下载