带你读《云原生应用开发 Operator原理与实践》第二章 Operator 原理2.2Client-go 原理(十一)-阿里云开发者社区

开发者社区> 人民邮电出版社> 正文

带你读《云原生应用开发 Operator原理与实践》第二章 Operator 原理2.2Client-go 原理(十一)

简介: 带你读《云原生应用开发 Operator原理与实践》第二章 Operator 原理2.2Client-go 原理(十一)
+关注继续查看

Watch操作通过 HTTP与 KubernetesAPIServer建立长链接, 接收 KubernetesAPIServer发来的变更时间,Watch操作的实现机制使用的是 HTTP的分块传输编码。当 Client-go调 用 KubernetesAPIServer 时, 在 Response的 HTTPHeader 中 设 置Transfer-Encoding的值为 Chunked。r.listerWatcher.Watch实际调用了 PodInformer的 watchfunc函数。通过 ClientSet客户端与 APIServer 建立长链接,监控指定资源的变更事件。r.watchHandler用于处理资源的变更时间,当初发增删改AddedUpdated等事件时,将对应的资源对象更新到本地缓存 DeltaFIFO中,并更新 ResouceVersion。至此实现了 Reflctor组件的功能。

 

1. Client-goDeltaFIFO


DeltaFIFO是一个 FIFO队列,记录了资源对象的变化过程。作为一个 FIFO队列,它的生产者就是 Reflector组件,前面讲过 Reflector将监听对象同步到 DeltaFIFO中,DeltaFIFO对这些资源对象做了什么,见代码清单2-40。

typeDeltaFIFOstruct{

locksync.RWMutex

condsync.Cond//条件变量,唤醒等待的协程

itemsmap[string]Deltas//Delta存储桶

queue[]string//队列存储对象键实际就是和items⼀起形成了⼀个有序Map

//true通过Replace() 第⼀批元素被插⼊队列或者Delete/Add/Update⾸次被调⽤

populatedbool

//Replace() 被⾸次调⽤时插⼊的元素数⽬

initialPopulationCountint

 

//函数计算元素Key

keyFuncKeyFunc

 

//列出已知的对象

knownObjectsKeyListerGetter

//队列是否被关闭,关闭互斥锁

closedboolclosedLocksync.Mutex


}

FIFO接收 Reflector的 Adds/Updates 添加和更新事件,并将它们按照顺序放入队列。元素在队列中被处理之前,如果有多个Adds/Updates 事件发生,事件只会被处理一次。

使用场景(1仅处理对象一次(2处理完当前事件后才能处理最新版本的对象;(3删除对象之后不会处理(4)不能周期性重新处理对象。这里的Delta对象就是 Kubernetes系统中对象的变化。Delta有 Type和 Object两个属性,DeltaType就是资源变化的类型, 比如 Add、Update等,DeltaObject就是具体的 Kubernetes资源对象,见代码清单 2-41。例如,此时 Reflector中监听了一个PodA的 Add事件,那么此时 DeltaType就是Added,DeltaObject就是 PodA,DeltaFIFO中的数据是什么样的呢?此时 Items中会有 Add类型的 Delta,Queue中也会有这个事件的 Key。这个 Key由 KeyFunc生成。Client-go中默认的 KeyFunc是 MetaNamespaceKeyFunc,可以在tools/cache/store.go:76中找到。由 MetaNamespaceKeyFunc生成的 Key格式为/,用来标识不同命名空间下的不同资源。

typeDeltastruct{

Type    DeltaType                   //Delta类型,⽐如增、减,后⾯有详细说明

Objectinterface{}                  //对象,Delta的粒度是⼀个对象

}

typeDeltaTypestring                  //Delta的类型⽤字符串表达

const(

AddedDeltaType= "Added" //增加UpdatedDeltaType= "Updated" //更新DeletedDeltaType= "Deleted" //删除SyncDeltaType= "Sync" //同步

)

typeDeltas[]Delta                    //Delta数组

 


既然 DeltaFIFO是一个 FIFO,那么它就应该有基本的 FIFO功能,这里 DeltaFIFO实现了 Queue接口。下面看一下 Queue接口功能的定义。我们可以看出 Queue扩展了Store接口的功能,附加了 Pop、AddIfNotPresent、HasSynced、Close方法。Store是一个通用的对象存储和处理的接口,本身提供了 Add、Update、List、Get等方法,Queue接口增加了 Pop方法,实现了一个基本 FIFO队列,具体见代码清单 2-42。

typeQueueinterface{Store

Pop(PopProcessFunc)(interface{},error)AddIfNotPresent(interface{})errorHasSynced()bool

Close()

}

 

下面我们来看一下FIFO队列的基本功能是怎么实现的。首先是 Add方法,我们可以看到 Add方法会先根据 KeyFunc计算出对象的 Key,如果队列中没有这个对象,我们就在这个队列尾部增补对象的Key,并且将这个对象存入 Map,具体见代码清2-43

func(f*FIFO)Add(objinterface{})error{id,err:=f.keyFunc(obj)

iferr!=nil{

returnKeyError{obj,err}

}

f.lock.Lock()

deferf.lock.Unlock()f.populated=true

if _,exists:=f.items[id];!exists{f.queue=append(f.queue,id)

}

f.items[id]=objf.cond.Broadcast()returnnil


}

 

接下来我们看一下 Pop方法, 在 Queue中至少有一个资源时才会进行 Pop作。在处理资源之前,资源会从队列(和存储)中移除,如果未成功处理资源,应该用AddIfNotPresent()函数将资源添加回队列。处理逻辑由 PopProcessFunc进行执行,具体见代码清单2-44。

func(f*DeltaFIFO)Pop(processPopProcessFunc)(interface{},error){

f.lock.Lock()

deferf.lock.Unlock()for{

forlen(f.queue)==0{

//Whenthequeueisempty,invocationofPop()isblockeduntilnewitemisenqueued.

//WhenClose()iscalled,thef.closedissetandtheconditionisbroadcasted.

//Whichcausesthislooptocontinueandreturnfromthe

Pop().


iff.IsClosed(){

returnnil,FIFOClosedError


}

 

f.cond.Wait()

}

id:=f.queue[0]

 

f.queue=f.queue[1:]item,ok:=f.items[id]

iff.initialPopulationCount>0{f.initialPopulationCount--

}

if!ok{

//Itemmayhavebeendeletedsubsequently.continue

}

delete(f.items,id)err:=process(item)

if e,ok:=err.(ErrRequeue);ok{f.addIfNotPresent(id,item)err=e.Err

}

//Don'tneedtocopyDeltashere,becausewe'retransferring

//ownershiptothecaller.returnitem,err

}

}

 

func(f*DeltaFIFO)KeyOf(objinterface{})(string,error)

if d,ok:=obj.(Deltas);ok{iflen(d)==0{

return"",KeyError{obj,ErrZeroLengthDeltasObject}

}

obj=d.Newest().Object

}

ifd,ok:=obj.(DeletedFinalStateUnknown);ok{

returnd.Key,nil

}

returnf.keyFunc(obj)

}

 

值得注意的是,DeltaFIFO中用于计算对象键的函数KeyOf为什么要先进行一次Deltas的类型转换呢?是因为Pop 出去的对象很可能还要再添加进来(比如处理失败需要再放进来),此时添加的对象就是已经封装好的Delta对象了。至此,已实现DeltaFIFO基本功能。

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
怎么设置阿里云服务器安全组?阿里云安全组规则详细解说
阿里云服务器安全组设置规则分享,阿里云服务器安全组如何放行端口设置教程
6915 0
使用OWASP Top Ten保证Web应用程序的安全
 http://info.52z.com/html/14014.html
562 0
使用OpenApi弹性释放和设置云服务器ECS释放
云服务器ECS的一个重要特性就是按需创建资源。您可以在业务高峰期按需弹性的自定义规则进行资源创建,在完成业务计算的时候释放资源。本篇将提供几个Tips帮助您更加容易和自动化的完成云服务器的释放和弹性设置。
7757 0
extjs desktop 应用项目:教学资源库云平台
采用extjs desktop  界面截图网址: http://www.linbsoft.com/LinBSoft/zykpreview/ demo网址: http://demo.
950 0
Elasticsearch Top5典型应用场景
题记 刚接触Elasticsearch的朋友,或多或少会遇到一个问题,Elasticsearch在实际公司应用中除了搜索到底能做什么? 本文给出了答案。 除了“You Know, for Search”,Elasticsearch的使用会不断增长和变化。ObjectRocket作为一家托管云计算公司,已经在ObjectRocket平台上提供托管Elasticsearch一段时间了,并且能够看到我们客户之间的一些明确趋势以及他们如何使用该产品。以下是我们在平台上看到的Top5场景用例:
8 0
Apache HttpClient 4.3开发指南
《Apache HttpClient 4.3开发指南》 一、概述 Apache HttpClient 4系列已经发布很久了,但由于它与HttpClient 3.x版本完全不兼容,以至于业内采用此库的公司较少,在互联网上也少有相关的文档资料分享。
862 0
472
文章
0
问答
文章排行榜
最热
最新
相关电子书
更多
文娱运维技术
立即下载
《SaaS模式云原生数据仓库应用场景实践》
立即下载
《看见新力量:二》电子书
立即下载