kubernetes delta_fifo源码解析

本文涉及的产品
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
全局流量管理 GTM,标准版 1个月
容器服务 Serverless 版 ACK Serverless,317元额度 多规格
简介: kubernetes delta_fifo在实现先入先出队列思路上与kubernetes fifo类似,但其支持与key相关联事件入队,保存多个事件,是informer机制的基础

kubernetes delta_fifo源码解析
1.介绍
kubernetes delta_fifo是一个先入先出队列,相较于fifo,有两点不同:

• 与key相关联的不直接是obj,而是Deltas,它是一个切片,Delta不仅包含了obj,还包含了DeltaType
• 当Deltas最后一个元素Delta.DeltaType已经是Deleted类型时,再添加一个Deleted类型的Delta,Deltas不再新增 delta_fifo的API与fifo类型,不再具体分析
2.使用
参考TestDeltaFIFO_ReplaceMakesDeletions[1] `go // 取testFifoObject中name作为key func testFifoObjectKeyFunc(obj interface{}) (string, error) { return obj.(testFifoObject).name, nil }
type testFifoObject struct { name string val interface{} }

func mkFifoObj(name string, val interface{}) testFifoObject { return testFifoObject{name: name, val: val} }

// literalListerGetter实现了KeyListerGetter接口 type literalListerGetter func() []testFifoObject

var _ KeyListerGetter = literalListerGetter(nil)

func (kl literalListerGetter) ListKeys() []string { result := []string{} for _, fifoObj := range kl() { result = append(result, fifoObj.name) } return result }

func (kl literalListerGetter) GetByKey(key string) (interface{}, bool, error) { for _, v := range kl() { if v.name == key { return v, true, nil } } return nil, false, nil }

func TestDeltaFIFO_ReplaceMakesDeletions(t *testing.T) { f := NewDeltaFIFOWithOptions(DeltaFIFOOptions{ KeyFunction: testFifoObjectKeyFunc, KnownObjects: literalListerGetter(func() []testFifoObject { return []testFifoObject{mkFifoObj("foo", 5), mkFifoObj("bar", 6), mkFifoObj("baz", 7)} }), }) // 删除 f.Delete(mkFifoObj("baz", 10)) // 替换,f.emitDeltaTypeReplaced为false时action为Sync,否则action为Replace f.Replace([]interface{}{mkFifoObj("foo", 5)}, "0") // 期望的列表 expectedList := []Deltas{ {{Deleted, mkFifoObj("baz", 10)}}, {{Sync, mkFifoObj("foo", 5)}}, {{Deleted, DeletedFinalStateUnknown{Key: "bar", Obj: mkFifoObj("bar", 6)}}}, }

for _, expected := range expectedList {

cur := Pop(f).(Deltas)
if e, a := expected, cur; !reflect.DeepEqual(e, a) {
    t.Errorf("Expected %#v, got %#v", e, a)
}

}

f = NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KeyFunction: testFifoObjectKeyFunc,
KnownObjects: literalListerGetter(func() []testFifoObject {

    return []testFifoObject{mkFifoObj("foo", 5), mkFifoObj("bar", 6), mkFifoObj("baz", 7)}
}),

})
f.Add(mkFifoObj("baz", 10))
f.Replace([]interface{}{mkFifoObj("foo", 5)}, "0")

expectedList = []Deltas{

{{Added, mkFifoObj("baz", 10)},
{Deleted, DeletedFinalStateUnknown{Key: "baz", Obj: mkFifoObj("baz", 7)}}},
{{Sync, mkFifoObj("foo", 5)}},
{{Deleted, DeletedFinalStateUnknown{Key: "bar", Obj: mkFifoObj("bar", 6)}}},

}

for _, expected := range expectedList {

cur := Pop(f).(Deltas)
if e, a := expected, cur; !reflect.DeepEqual(e, a) {
    t.Errorf("Expected %#v, got %#v", e, a)
}

}

f = NewDeltaFIFOWithOptions(DeltaFIFOOptions{KeyFunction: testFifoObjectKeyFunc})
f.Add(mkFifoObj("baz", 10))
f.Replace([]interface{}{mkFifoObj("foo", 5)}, "0")

expectedList = []Deltas{

{{Added, mkFifoObj("baz", 10)},
{Deleted, DeletedFinalStateUnknown{Key: "baz", Obj: mkFifoObj("baz", 10)}}},
{{Sync, mkFifoObj("foo", 5)}},

}

for _, expected := range expectedList {

cur := Pop(f).(Deltas)
if e, a := expected, cur; !reflect.DeepEqual(e, a) {
    t.Errorf("Expected %#v, got %#v", e, a)
}

}
}

3.源码解析

func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO {
    if opts.KeyFunction == nil {
        opts.KeyFunction = MetaNamespaceKeyFunc
    }
    
    f := &DeltaFIFO{
        items:        map[string]Deltas{},
        queue:        []string{},
        keyFunc:      opts.KeyFunction,
        knownObjects: opts.KnownObjects,
        
        emitDeltaTypeReplaced: opts.EmitDeltaTypeReplaced,
    }
    f.cond.L = &f.lock
    return f
}
// 计算obj对应的key
func (f *DeltaFIFO) KeyOf(obj interface{}) (string, error) {
    // 如果obj为Deltas类型
    if d, ok := obj.(Deltas); ok {
        // 如果没有值,抛err
        if len(d) == 0 {
            return "", KeyError{obj, ErrZeroLengthDeltasObject}
        }
        // 取最新的obj
        obj = d.Newest().Object
    }
    // 如果obj为DeletedFinalStateUnknown类型,则直接返回DeletedFinalStateUnknown.Key
    if d, ok := obj.(DeletedFinalStateUnknown); ok {
        return d.Key, nil
    }
    // 否则,使用keyFunc
    return f.keyFunc(obj)
}

func (d Deltas) Newest() *Delta {
    if n := len(d); n > 0 {
        return &d[n-1]
    }
    return nil
}

// Delete方法添加Deleted类型的Delta,如果f.knownObjects为nil并且obj不存在时,不做处理;如果f.knownObjects不为nil,且f.knownObjects.GetByKey(id)不存在并且f.items[id]不存在,不做处理
func (f *DeltaFIFO) Delete(obj interface{}) error {
    // 计算obj对应的key
    id, err := f.KeyOf(obj)
    if err != nil {
        return KeyError{obj, err}
    }
    f.lock.Lock()
    defer f.lock.Unlock()
    f.populated = true
    if f.knownObjects == nil {
        // 如果f.items不存在则不处理
        if _, exists := f.items[id]; !exists {
            return nil
        }
    } else {
        _, exists, err := f.knownObjects.GetByKey(id)
        _, itemsExist := f.items[id]
        // 如果f.knownObjects.GetByKey(id)和f.items[id]都不存在,则不处理
        if err == nil && !exists && !itemsExist {
            return nil
        }
    }
    // Deleted类型入队
    return f.queueActionLocked(Deleted, obj)
}

func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
    // 计算obj对应的key
    id, err := f.KeyOf(obj)
    if err != nil {
        return KeyError{obj, err}
    }
    oldDeltas := f.items[id]
    newDeltas := append(oldDeltas, Delta{actionType, obj})
    // delete类型是否重复了
    newDeltas = dedupDeltas(newDeltas)
    
    if len(newDeltas) > 0 {
        if _, exists := f.items[id]; !exists {
            f.queue = append(f.queue, id)
        }
        f.items[id] = newDeltas
        f.cond.Broadcast()
    } else {
        // 正常情况,不应该走到这个分支
        if oldDeltas == nil {
            klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; ignoring", id, oldDeltas, obj)
            return nil
        }
        klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; breaking invariant by storing empty Deltas", id, oldDeltas, obj)
        f.items[id] = newDeltas
        return fmt.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; broke DeltaFIFO invariant by storing empty Deltas", id, oldDeltas, obj)
    }
    return nil
}

func dedupDeltas(deltas Deltas) Deltas {
    n := len(deltas)
    if n < 2 {
        return deltas
    }
    a := &deltas[n-1]
    b := &deltas[n-2]
    if out := isDup(a, b); out != nil {
        deltas[n-2] = *out
        return deltas[:n-1]
    }
    return deltas
}

func isDup(a, b *Delta) *Delta {
    // 是否删除类型重复
    if out := isDeletionDup(a, b); out != nil {
        return out
    }
    return nil
}

func isDeletionDup(a, b *Delta) *Delta {
    if b.Type != Deleted || a.Type != Deleted {
        return nil
    }
    // 都为delete类型,并且b.Object是DeletedFinalStateUnknown类型,则保留a,否则保留b
    if _, ok := b.Object.(DeletedFinalStateUnknown); ok {
        return a
    }
    return b
}
// Replace逻辑如下: (1) 添加Sync或Replace Delta类型对象
// (2) 删除操作:对于每个已经存在的keys,但不存在于list中的对象,添加Delete(DeletedFinalStateUnknown{K, O})对象,其中O是K关联的对象;
// 如果f.knownObjects为空, 已经存在的keys是f.items,O是K关联的Deltas.Newest();
// 如果f.knownObjects不为空,已经存在的keys是f.knownObjects,O是f.knownObjects.GetByKey(K)的返回值
func (f *DeltaFIFO) Replace(list []interface{}, _ string) error {
    f.lock.Lock()
    defer f.lock.Unlock()
    keys := make(sets.String, len(list))
    
    // 兼容老版本的客户端
    action := Sync
    if f.emitDeltaTypeReplaced {
        action = Replaced
    }
    
    for _, item := range list {
        key, err := f.KeyOf(item)
        if err != nil {
            return KeyError{item, err}
        }
        keys.Insert(key)
        // 每个list中的item添加Sync/Replaced类型
        if err := f.queueActionLocked(action, item); err != nil {
            return fmt.Errorf("couldn't enqueue object: %v", err)
        }
    }
    
    if f.knownObjects == nil {
        // Do deletion detection against our own list.
        queuedDeletions := 0
        for k, oldItem := range f.items {
            if keys.Has(k) {
                continue
            }

            var deletedObj interface{}
            // 取最新的一个obj
            if n := oldItem.Newest(); n != nil {
                deletedObj = n.Object
            }
            queuedDeletions++
            if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
                return err
            }
        }
    
        if !f.populated {
            f.populated = true
            f.initialPopulationCount = keys.Len() + queuedDeletions
        }
        
        return nil
    }
    
    knownKeys := f.knownObjects.ListKeys()
    queuedDeletions := 0
    for _, k := range knownKeys {
        if keys.Has(k) {
        continue
    }
    // 取f.knownObjects.GetByKey的返回值
    deletedObj, exists, err := f.knownObjects.GetByKey(k)
    if err != nil {
        deletedObj = nil
        klog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k)
    } else if !exists {
        deletedObj = nil
        klog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k)
    }
    queuedDeletions++
    if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
        return err
        }
    }
    
    if !f.populated {
        f.populated = true
        f.initialPopulationCount = keys.Len() + queuedDeletions
    }
    
    return nil
}
// Add 添加一个Added类型的obj
func (f *DeltaFIFO) Add(obj interface{}) error {
    f.lock.Lock()
    defer f.lock.Unlock()
    f.populated = true
    return f.queueActionLocked(Added, obj)
}
// Pop按added/updated顺序返回一个Deltas,如果队列为空则阻塞
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
    f.lock.Lock()
    defer f.lock.Unlock()
    for {
        for len(f.queue) == 0 {
            // 当队列为空时,除了入队以外,也可以调用Close()退出循环
            if f.closed {
                return nil, ErrFIFOClosed
            }

            f.cond.Wait()
        }
        // 取队头元素,先入先出
        id := f.queue[0]
        f.queue = f.queue[1:]
        depth := len(f.queue)
        if f.initialPopulationCount > 0 {
            f.initialPopulationCount--
        }
        item, ok := f.items[id]
        if !ok {
            // 不应该不存在f.items中
            klog.Errorf("Inconceivable! %q was in f.queue but not f.items; ignoring.", id)
            continue
        }
        delete(f.items, id)
        // 当队列深度大于10的时候,打开trace日志
        if depth > 10 {
            trace := utiltrace.New("DeltaFIFO Pop Process",
                utiltrace.Field{Key: "ID", Value: id},
                utiltrace.Field{Key: "Depth", Value: depth},
                utiltrace.Field{Key: "Reason", Value: "slow event handlers blocking the queue"})
            defer trace.LogIfLong(100 * time.Millisecond)
        }
        // 调用PopProcessFunc函数处理item,返回ErrRequeue时,重入队
        err := process(item)
        if e, ok := err.(ErrRequeue); ok {
            f.addIfNotPresent(id, item)
            err = e.Err
        }
        // 直接返回item,不进行深拷贝,将item的所有权转移给调用者
        return item, err
    }
}
4.总结
kubernetes delta_fifo在实现先入先出队列思路上与kubernetes fifo类似,但其支持与key相关联事件入队,保存多个事件,是informer机制的基础

引用链接
相关实践学习
通过Ingress进行灰度发布
本场景您将运行一个简单的应用,部署一个新的应用用于新的发布,并通过Ingress能力实现灰度发布。
容器应用与集群管理
欢迎来到《容器应用与集群管理》课程,本课程是“云原生容器Clouder认证“系列中的第二阶段。课程将向您介绍与容器集群相关的概念和技术,这些概念和技术可以帮助您了解阿里云容器服务ACK/ACK Serverless的使用。同时,本课程也会向您介绍可以采取的工具、方法和可操作步骤,以帮助您了解如何基于容器服务ACK Serverless构建和管理企业级应用。 学习完本课程后,您将能够: 掌握容器集群、容器编排的基本概念 掌握Kubernetes的基础概念及核心思想 掌握阿里云容器服务ACK/ACK Serverless概念及使用方法 基于容器服务ACK Serverless搭建和管理企业级网站应用
相关文章
|
6天前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
21 2
|
7天前
|
存储 安全 Linux
Golang的GMP调度模型与源码解析
【11月更文挑战第11天】GMP 调度模型是 Go 语言运行时系统的核心部分,用于高效管理和调度大量协程(goroutine)。它通过少量的操作系统线程(M)和逻辑处理器(P)来调度大量的轻量级协程(G),从而实现高性能的并发处理。GMP 模型通过本地队列和全局队列来减少锁竞争,提高调度效率。在 Go 源码中,`runtime.h` 文件定义了关键数据结构,`schedule()` 和 `findrunnable()` 函数实现了核心调度逻辑。通过深入研究 GMP 模型,可以更好地理解 Go 语言的并发机制。
|
13天前
|
运维 Kubernetes Cloud Native
Kubernetes云原生架构深度解析与实践指南####
本文深入探讨了Kubernetes作为领先的云原生应用编排平台,其设计理念、核心组件及高级特性。通过剖析Kubernetes的工作原理,结合具体案例分析,为读者呈现如何在实际项目中高效部署、管理和扩展容器化应用的策略与技巧。文章还涵盖了服务发现、负载均衡、配置管理、自动化伸缩等关键议题,旨在帮助开发者和运维人员掌握利用Kubernetes构建健壮、可伸缩的云原生生态系统的能力。 ####
|
19天前
|
消息中间件 缓存 安全
Future与FutureTask源码解析,接口阻塞问题及解决方案
【11月更文挑战第5天】在Java开发中,多线程编程是提高系统并发性能和资源利用率的重要手段。然而,多线程编程也带来了诸如线程安全、死锁、接口阻塞等一系列复杂问题。本文将深度剖析多线程优化技巧、Future与FutureTask的源码、接口阻塞问题及解决方案,并通过具体业务场景和Java代码示例进行实战演示。
39 3
|
11天前
|
存储 Kubernetes 调度
深度解析Kubernetes中的Pod生命周期管理
深度解析Kubernetes中的Pod生命周期管理
|
1月前
|
存储
让星星⭐月亮告诉你,HashMap的put方法源码解析及其中两种会触发扩容的场景(足够详尽,有问题欢迎指正~)
`HashMap`的`put`方法通过调用`putVal`实现,主要涉及两个场景下的扩容操作:1. 初始化时,链表数组的初始容量设为16,阈值设为12;2. 当存储的元素个数超过阈值时,链表数组的容量和阈值均翻倍。`putVal`方法处理键值对的插入,包括链表和红黑树的转换,确保高效的数据存取。
55 5
|
28天前
|
存储 Kubernetes 监控
深度解析Kubernetes在微服务架构中的应用与优化
【10月更文挑战第18天】深度解析Kubernetes在微服务架构中的应用与优化
102 0
|
1月前
|
缓存 Java 程序员
Map - LinkedHashSet&Map源码解析
Map - LinkedHashSet&Map源码解析
67 0
|
1月前
|
算法 Java 容器
Map - HashSet & HashMap 源码解析
Map - HashSet & HashMap 源码解析
54 0
|
1月前
|
存储 Java C++
Collection-PriorityQueue源码解析
Collection-PriorityQueue源码解析
60 0

推荐镜像

更多