kubernetes fifo源码解析

简介: kubernetes fifo是一个先入先出队列,在实现思路上,值得我们学习借鉴

kubernetes fifo源码解析
1.介绍
kubernetes fifo是一个先入先出队列,实现了Add、Update、Delete、Get、Pop等基本API,以及Replace、HasSynced等API,具体如下:

type FIFO struct {

lock sync.RWMutex
cond sync.Cond
// key和obj的映射
items map[string]interface{}
// key的队列,去重
queue []string

// 当Delete/Add/Update被首先调用,或Replace()的items全部被pop时populated为true
populated bool
// Replace()首先被调用时的objs的数量
initialPopulationCount int

// keyFunc是用来将obj生成key的
keyFunc KeyFunc

// 队列是否关闭,用在Pop方法内的循环控制中
closed bool

}
func NewFIFO(keyFunc KeyFunc) *FIFO

创建一个先入先出队列

func (f *FIFO) Add(obj interface{}) error

添加一个obj,当f.queue中已存在对应的key时,f.queue不再添加

func (f *FIFO) AddIfNotPresent(obj interface{}) error

当f.items不存在obj对应的key时才添加,这在单一生产者/消费者有用,消费者可以安全的重试,避免与生产者竞争以及重入队已消费的item

func (f *FIFO) Close()

关闭队列

func (f *FIFO) Delete(obj interface{}) error

删除不存在f.queue中的item,因为这个实现假设使用者只关心对象,而不关心创建/添加对象的顺序

func (f *FIFO) Get(obj interface{}) (item interface{}, exists bool, err error)

返回请求的item,不存在时exists为false

func (f *FIFO) GetByKey(key string) (item interface{}, exists bool, err error)

返回请求的item,不存在时exists为false

func (f *FIFO) HasSynced() bool

当Add/Update/Delete/AddIfNotPresent先被调用,或者先被Replace()插入的items都被Pop时,HasSynced返回true

func (f *FIFO) IsClosed() bool

检车队列是否关闭

func (f *FIFO) List() []interface{}

返回所有items.

func (f *FIFO) ListKeys() []string

返回当前FIFO中所有的key

func (f *FIFO) Pop(process PopProcessFunc) (interface{}, error)

Pop会等到f.queue中有对象,并且会调用PopProcessFunc处理item。如果f.queue中有多个待处理的对象,则将按照Add/Update的顺序返回。在调用PopProcessFunc之前,会从队列(和存储)中删除item。如果PopProcessFunc返回ErrRequeue,会使用AddIfNotPresent()将其添加回来,因此保证可重复消费。PopProcessFunc是在锁定状态下调用的,因此在PopProcessFunc中操作FIFO的数据结构是安全的。

func (f *FIFO) Replace(list []interface{}, resourceVersion string) error

会根据list重新生成一个map,并将f.items指向新的map,依据该map重新入队f.queue,所以f.queue是无序的

func (f *FIFO) Resync() error

Resync会保证f.items中的key全部存在f.queue中,一般不应该调用该方法,因为其他api应当维持关联关系

func (f *FIFO) Update(obj interface{}) error

与Add实现一致

2.使用
参考TestFIFO_requeueOnPop[1]

// 取testFifoObject中name作为key
func testFifoObjectKeyFunc(obj interface{}) (string, error) {

return obj.(testFifoObject).name, nil

}

type testFifoObject struct {

name string
val  interface{}

}

func mkFifoObj(name string, val interface{}) testFifoObject {

return testFifoObject{name: name, val: val}

}

func TestFIFO_requeueOnPop(t *testing.T) {

// 创建FIFO实例
f := NewFIFO(testFifoObjectKeyFunc)
// 添加obj
f.Add(mkFifoObj("foo", 10))
// Pop操作,但返回ErrRequeue,这时会重入队
_, err := f.Pop(func(obj interface{}) error {
    if obj.(testFifoObject).name != "foo" {
        t.Fatalf("unexpected object: %#v", obj)
    }
    return ErrRequeue{Err: nil}
})
if err != nil {
    t.Fatalf("unexpected error: %v", err)
}
// GetByKey,还在队列中
if _, ok, err := f.GetByKey("foo"); !ok || err != nil {
    t.Fatalf("object should have been requeued: %t %v", ok, err)
}

_, err = f.Pop(func(obj interface{}) error {
    if obj.(testFifoObject).name != "foo" {
        t.Fatalf("unexpected object: %#v", obj)
    }
    return ErrRequeue{Err: fmt.Errorf("test error")}
})
if err == nil || err.Error() != "test error" {
    t.Fatalf("unexpected error: %v", err)
}
if _, ok, err := f.GetByKey("foo"); !ok || err != nil {
    t.Fatalf("object should have been requeued: %t %v", ok, err)
}
// Pop操作,返回nil,不在队列中了
_, err = f.Pop(func(obj interface{}) error {
    if obj.(testFifoObject).name != "foo" {
        t.Fatalf("unexpected object: %#v", obj)
    }
    return nil
})
if err != nil {
    t.Fatalf("unexpected error: %v", err)
}
// GetByKey,不在队列中
if _, ok, err := f.GetByKey("foo"); ok || err != nil {
    t.Fatalf("object should have been removed: %t %v", ok, err)
}

}
3.源码解析
func NewFIFO(keyFunc KeyFunc) *FIFO {

f := &FIFO{
    // key和obj的映射
    items:   map[string]interface{}{},
    // key的队列,先入先出
    queue:   []string{},
    // obj和key的映射函数
    keyFunc: keyFunc,
}
// f.cond.L持有f.lock
f.cond.L = &f.lock

return f
}
func (f *FIFO) Add(obj interface{}) error {

id, err := f.keyFunc(obj)
if err != nil {
    return KeyError{obj, err}
}
f.lock.Lock()
defer f.lock.Unlock()
f.populated = true
// items中不存在时,才入队
if _, exists := f.items[id]; !exists {
    f.queue = append(f.queue, id)
}
f.items[id] = obj
// 唤醒所有等待在f.cond的协程,其实就是Pop在等待f.cond
f.cond.Broadcast()
return nil

}
func (f *FIFO) Pop(process PopProcessFunc) (interface{}, error) {

f.lock.Lock()
defer f.lock.Unlock()
for {
    for len(f.queue) == 0 {
        // 当队列为空时, 避免只有item入队时Pop才可以退出;当f.Close()调用时,Pop也可以退出
        if f.closed {
            return nil, ErrFIFOClosed
        }
        // 等待条件变量唤醒
        f.cond.Wait()
    }
    // 从对头取,先入先出
    id := f.queue[0]
    f.queue = f.queue[1:]
    // 当Replace先被调用时,initialPopulationCount才可能大于0
    if f.initialPopulationCount > 0 {
        f.initialPopulationCount--
    }
    item, ok := f.items[id]
    if !ok {
        // item有可能随后被删除,当被删除时不进行后续操作
        continue
    }
    // 删除item
    delete(f.items, id)
    // 调用item处理函数,如果返回ErrRequeue时,重入队,以便重复消费
    err := process(item)
    if e, ok := err.(ErrRequeue); ok {
        f.addIfNotPresent(id, item)
        err = e.Err
    }
    return item, err
}

}
func (f *FIFO) addIfNotPresent(id string, obj interface{}) {

f.populated = true
if _, exists := f.items[id]; exists {
    return
}

f.queue = append(f.queue, id)
f.items[id] = obj
f.cond.Broadcast()

}
4.总结
kubernetes fifo在实现先入先出队列上,值得我们学习借鉴

引用链接
[1] TestFIFO_requeueOnPop: https://github.com/kubernetes/kubernetes/blob/v1.26.3/staging/src/k8s.io/client-go/tools/cache/fifo_test.go#L75

相关实践学习
深入解析Docker容器化技术
Docker是一个开源的应用容器引擎,让开发者可以打包他们的应用以及依赖包到一个可移植的容器中,然后发布到任何流行的Linux机器上,也可以实现虚拟化,容器是完全使用沙箱机制,相互之间不会有任何接口。Docker是世界领先的软件容器平台。开发人员利用Docker可以消除协作编码时“在我的机器上可正常工作”的问题。运维人员利用Docker可以在隔离容器中并行运行和管理应用,获得更好的计算密度。企业利用Docker可以构建敏捷的软件交付管道,以更快的速度、更高的安全性和可靠的信誉为Linux和Windows Server应用发布新功能。 在本套课程中,我们将全面的讲解Docker技术栈,从环境安装到容器、镜像操作以及生产环境如何部署开发的微服务应用。本课程由黑马程序员提供。     相关的阿里云产品:容器服务 ACK 容器服务 Kubernetes 版(简称 ACK)提供高性能可伸缩的容器应用管理能力,支持企业级容器化应用的全生命周期管理。整合阿里云虚拟化、存储、网络和安全能力,打造云端最佳容器化应用运行环境。 了解产品详情: https://www.aliyun.com/product/kubernetes
相关文章
|
8月前
|
算法 测试技术 C语言
深入理解HTTP/2:nghttp2库源码解析及客户端实现示例
通过解析nghttp2库的源码和实现一个简单的HTTP/2客户端示例,本文详细介绍了HTTP/2的关键特性和nghttp2的核心实现。了解这些内容可以帮助开发者更好地理解HTTP/2协议,提高Web应用的性能和用户体验。对于实际开发中的应用,可以根据需要进一步优化和扩展代码,以满足具体需求。
828 29
|
8月前
|
前端开发 数据安全/隐私保护 CDN
二次元聚合短视频解析去水印系统源码
二次元聚合短视频解析去水印系统源码
331 4
|
8月前
|
JavaScript 算法 前端开发
JS数组操作方法全景图,全网最全构建完整知识网络!js数组操作方法全集(实现筛选转换、随机排序洗牌算法、复杂数据处理统计等情景详解,附大量源码和易错点解析)
这些方法提供了对数组的全面操作,包括搜索、遍历、转换和聚合等。通过分为原地操作方法、非原地操作方法和其他方法便于您理解和记忆,并熟悉他们各自的使用方法与使用范围。详细的案例与进阶使用,方便您理解数组操作的底层原理。链式调用的几个案例,让您玩转数组操作。 只有锻炼思维才能可持续地解决问题,只有思维才是真正值得学习和分享的核心要素。如果这篇博客能给您带来一点帮助,麻烦您点个赞支持一下,还可以收藏起来以备不时之需,有疑问和错误欢迎在评论区指出~
|
8月前
|
移动开发 前端开发 JavaScript
从入门到精通:H5游戏源码开发技术全解析与未来趋势洞察
H5游戏凭借其跨平台、易传播和开发成本低的优势,近年来发展迅猛。接下来,让我们深入了解 H5 游戏源码开发的技术教程以及未来的发展趋势。
|
8月前
|
存储 前端开发 JavaScript
在线教育网课系统源码开发指南:功能设计与技术实现深度解析
在线教育网课系统是近年来发展迅猛的教育形式的核心载体,具备用户管理、课程管理、教学互动、学习评估等功能。本文从功能和技术两方面解析其源码开发,涵盖前端(HTML5、CSS3、JavaScript等)、后端(Java、Python等)、流媒体及云计算技术,并强调安全性、稳定性和用户体验的重要性。
|
9月前
|
机器学习/深度学习 自然语言处理 算法
生成式 AI 大语言模型(LLMs)核心算法及源码解析:预训练篇
生成式 AI 大语言模型(LLMs)核心算法及源码解析:预训练篇
2348 1
|
8月前
|
负载均衡 JavaScript 前端开发
分片上传技术全解析:原理、优势与应用(含简单实现源码)
分片上传通过将大文件分割成多个小的片段或块,然后并行或顺序地上传这些片段,从而提高上传效率和可靠性,特别适用于大文件的上传场景,尤其是在网络环境不佳时,分片上传能有效提高上传体验。 博客不应该只有代码和解决方案,重点应该在于给出解决方案的同时分享思维模式,只有思维才能可持续地解决问题,只有思维才是真正值得学习和分享的核心要素。如果这篇博客能给您带来一点帮助,麻烦您点个赞支持一下,还可以收藏起来以备不时之需,有疑问和错误欢迎在评论区指出~
|
1月前
|
人工智能 算法 调度
阿里云ACK托管集群Pro版共享GPU调度操作指南
本文介绍在阿里云ACK托管集群Pro版中,如何通过共享GPU调度实现显存与算力的精细化分配,涵盖前提条件、使用限制、节点池配置及任务部署全流程,提升GPU资源利用率,适用于AI训练与推理场景。
230 1
|
1月前
|
弹性计算 监控 调度
ACK One 注册集群云端节点池升级:IDC 集群一键接入云端 GPU 算力,接入效率提升 80%
ACK One注册集群节点池实现“一键接入”,免去手动编写脚本与GPU驱动安装,支持自动扩缩容与多场景调度,大幅提升K8s集群管理效率。
224 89
|
6月前
|
资源调度 Kubernetes 调度
从单集群到多集群的快速无损转型:ACK One 多集群应用分发
ACK One 的多集群应用分发,可以最小成本地结合您已有的单集群 CD 系统,无需对原先应用资源 YAML 进行修改,即可快速构建成多集群的 CD 系统,并同时获得强大的多集群资源调度和分发的能力。
273 9

热门文章

最新文章

推荐镜像

更多