带你读《云原生应用开发 Operator原理与实践》第二章 Operator 原理2.2Client-go 原理(十三)

简介: 带你读《云原生应用开发 Operator原理与实践》第二章 Operator 原理2.2Client-go 原理(十二)

1. Client-goIndexer

 

     资源对象从 DeltaFIFOPop 出去后又经过了哪些处理呢。这要从一开始的 sharedIndexInformer说起。注意,在 sharedIndexInformerRun 方法中,初始化了它的配置,并执行了 s.controller.Run方法。我们可以看到s.controller.Run中初始化了 Reflector,开始了指定资源的List-Watch 操作,并且同步到了DeltaFIFO中,同时执行了processLoop方法。此时我们可以看到 processLoop方法不断从DeltaFIFO中将资源对象 Pop来, 并且交给了之前的 c.config.Process方法进行处理。而c.config.Process方法就是sharedIndexInformerHandleDeltas方法,具体见代码清单 2-45



func(s*sharedIndexInformer)Run(stopCh<-chanstruct{}){

...

cfg:=&Config{

Queue:              fifo,

ListerWatcher:             s.listerWatcher,ObjectType:          s.objectType,FullResyncPeriod:s.resyncCheckPeriod,RetryOnError:                 false,

ShouldResync:       s.processor.shouldResync,

 

Process:         s.HandleDeltas,WatchErrorHandler:s.watchErrorHandler,

}

 

func(){

s.startedLock.Lock()

defers.startedLock.Unlock()

 

...

}()


s.controller=New(cfg)s.controller.(*controller).clock=s.clocks.started=true

s.controller.Run(stopCh)

}

 

func(c*controller)Run(stopCh<-chanstruct{}){deferutilruntime.HandleCrash()

gofunc(){

<-stopCh

c.config.Queue.Close()

}()

r:=NewReflector(


c.config.ListerWatcher,c.config.ObjectType,c.config.Queue,c.config.FullResyncPeriod,

)

r.ShouldResync=c.config.ShouldResync

r.clock=c.clock

...

}

c.reflectorMutex.Lock()c.reflector=rc.reflectorMutex.Unlock()

 

wait.Until(c.processLoop,time.Second,stopCh)

func(c*controller)processLoop(){for{

obj,err:=c.config.Queue.Pop(PopProcessFunc(c.config.Process))

iferr!=nil{

iferr==FIFOClosedError{

return

}

ifc.config.RetryOnError{

//Thisisthesafewaytore-enqueue.

c.config.Queue.AddIfNotPresent(obj)

}

}

}

}

 

 

综上可知,由 DeltaFIFOPop出来的对象最后交给了 HandleDeltas进行处理,而HandleDeltas中,将资源对象同步到了 Indexer中,至此我们引出了 Informer模块中的3个组件 Indexer。IndexerClient-go 中实现的一个本地存储,它可以建立索引并存Resource的对象。Reflector通过 DeltaFIFOQueue将资源对象存储到Indexer中。需要注意的是,Indexer中的数据与 ETCD中的数据是完全一致的,当 Client-go需要数据时,无须每次都从 APIServer中获取,从而减轻了请求过多造成的对 APIServer的压力, 具体见代码清单 2-46。

func(s*sharedIndexInformer)HandleDeltas(objinterface{})error{

s.blockDeltas.Lock()

defers.blockDeltas.Unlock()

 

//fromoldesttonewest

for_,d:=rangeobj.(Deltas){switchd.Type{

caseSync,Replaced,Added,Updated:s.cacheMutationDetector.AddObject(d.Object)

ifold,exists,err:=s.indexer.Get(d.Object);err==nil&&

exists{

if err:=s.indexer.Update(d.Object);err!=nil{returnerr


}

 

requestedresync 

nil{

 

==nil{


isSync:=falseswitch{

cased.Type==Sync:

//Synceventsareonlypropagatedtolistenersthat

 

isSync=true

cased.Type==Replaced:

ifaccessor,err:=meta.Accessor(d.Object);err==ifoldAccessor,err:=meta.Accessor(old);err

//Replacedeventsthatdidn'tchange


resourceVersionaretreatedasresyncevents

//andonlypropagatedtolisteners


thatrequestedresync

 

==oldAccessor.GetResourceVersion()

}


isSync=accessor.GetResourceVersion()

}

}

s.processor.distribute(updateNotification{oldObj:old,

newObj:d.Object},isSync)

}else{

if err:=s.indexer.Add(d.Object);err!=nil{returnerr

 

false)

}

s.processor.distribute(addNotification{newObj:d.Object},

 

}

caseDeleted:

iferr:=s.indexer.Delete(d.Object);err!=nil{


 

returnerr

}

s.processor.distribute(deleteNotification{oldObj:d.Object},false)

}

}

returnnil

}

 

Indexer   是如何实现存储并快速查找资源的呢?我们先看一下 Indexer接口提供的功能。CacheIndexer的一种非常经典的实现,所有的对象缓存在内存中,而且从Cache 这个类型的名称来看它属于包内私有类型,外部无法直接使用,只能通过专用的函数创建。 这里的 Store、Indexer使用了一个 threadSafeMap来保证并发安全的存储。它拥有存储相关的增、删、改、查等方法。threadSafeMap继承了 Store接口,而 Indexer扩展了threadSafeMap, 为 threadSafeMap提供了索引操作。threadSafeMap其实只能够存储和索引。存储即将runtime.object存储到 ItemsMap中;索引即为ItemsMap建立三层索引:IndicesMap类型索引namespace、nodeName);IndexMap 类型索引(namespace1、namespace2……);runtime.object类型索引,实现见代码清 2-47


typeIndexerinterface{Store

//indexName索引类,obj是对象,计算objindexName索引类中的索引键,通过索引键

获取所有的对象

//基本就是获取符合obj特征的所有对象,所谓的特征就是对象在索引类中的索引键

Index(indexNamestring,objinterface{})([]interface{},error)

//indexKeyindexName索引类中的⼀个索引键,函数返回indexKey指定的所有对象键

IndexKeys(indexName,indexedValuestring)([]string,error)

//获取indexName索引类中的所有索引键

ListIndexFuncValues(indexNamestring)[]string

//这个函数和 Index类似,只是返回值不是对象键,⽽是所有对象

ByIndex(indexName,indexedValuestring)([]interface{},error)

//返回Indexers

GetIndexers()Indexers

//添加Indexers,就是增加更多的索引分类

AddIndexers(newIndexersIndexers)error

}

 

Kubernetes中使用的比较多的索引函数是MetaNamespaceIndexFunc() 代码位置:

 

client-go/tools/cache/index.go,Indexer索引的实现是通过index.ByIndex来完成的, index.ByIndex的实现见代码清单 2-48。这个函数返回了符合索引函数的值的对象列表。



func(c*threadSafeMap)ByIndex(indexName,indexKeystring)([]interface{},error){c.lock.RLock()

deferc.lock.RUnlock()

 

indexFunc:=c.indexers[indexName]ifindexFunc==nil{

returnnil,fmt.Errorf("Indexwithname%sdoesnotexist",indexName)

}

index:=c.indices[indexName]set:=index[indexKey]

list:=make([]interface{},0,set.Len())for_,key:=rangeset.List(){

list=append(list,c.items[key])

}

 

returnlist,nil


}

 

上述方法接收两个参数:indexName(索引器的名称)indexedValue需要索引的 Key。首先根据索引器名称查找指定的索引器函数c.indexers[indexName]);然后根据索引器名称查找相应的缓存器函数(c.indices[indexName]) ;最后根据索引 Key

indexedValue)从缓存中进行数据查询,并返回查询结果。

相关文章
|
1月前
|
缓存 Java API
【云原生】Spring Cloud Gateway的底层原理与实践方法探究
【云原生】Spring Cloud Gateway的底层原理与实践方法探究
|
1月前
|
Kubernetes Cloud Native 开发工具
带你读《云原生应用开发:Operator原理与实践》精品文章合集
带你读《云原生应用开发:Operator原理与实践》精品文章合集
|
2月前
|
人工智能 缓存 Kubernetes
.NET 9 首个预览版发布:瞄准云原生和智能应用开发
.NET 9 首个预览版发布:瞄准云原生和智能应用开发
|
3月前
|
存储 SQL Cloud Native
深入了解云原生数据库CockroachDB的概念与实践
作为一种全球领先的分布式SQL数据库,CockroachDB以其高可用性、强一致性和灵活性等特点备受关注。本文将深入探讨CockroachDB的概念、设计思想以及实践应用,并结合实例演示其在云原生环境下的优越表现。
|
3月前
|
Cloud Native 关系型数据库 大数据
CockroachDB:云原生数据库的新概念与实践
本文将介绍CockroachDB,一种先进的云原生数据库,它具备分布式、强一致性和高可用性等特点。我们将探讨CockroachDB的基本原理、架构设计以及在实际应用中的种种优势和挑战。
|
1月前
|
Cloud Native 安全 持续交付
构建未来:云原生架构的演进与实践
【2月更文挑战第30天】 随着数字化转型的深入,企业对于信息技术的需求日益复杂化和动态化。传统的IT架构已难以满足快速迭代、灵活扩展及成本效率的双重要求。云原生技术作为解决这一矛盾的关键途径,通过容器化、微服务、持续集成/持续部署(CI/CD)等手段,实现了应用的快速开发、部署及运维。本文将探讨云原生架构的最新发展,分析其如何助力企业构建更加灵活、高效的业务系统,并结合实际案例,展示云原生转型过程中的最佳实践和面临的挑战。
|
11天前
|
Kubernetes 监控 Cloud Native
构建高效云原生应用:基于Kubernetes的微服务治理实践
【4月更文挑战第13天】 在当今数字化转型的浪潮中,企业纷纷将目光投向了云原生技术以支持其业务敏捷性和可扩展性。本文深入探讨了利用Kubernetes作为容器编排平台,实现微服务架构的有效治理,旨在为开发者和运维团队提供一套优化策略,以确保云原生应用的高性能和稳定性。通过分析微服务设计原则、Kubernetes的核心组件以及实际案例,本文揭示了在多变的业务需求下,如何确保系统的高可用性、弹性和安全性。
16 4
|
26天前
|
Java fastjson 数据安全/隐私保护
【Dubbo3技术专题】「云原生微服务开发实战」 一同探索和分析研究RPC服务的底层原理和实现
【Dubbo3技术专题】「云原生微服务开发实战」 一同探索和分析研究RPC服务的底层原理和实现
39 0
|
1月前
|
运维 Cloud Native 持续交付
云原生技术的未来展望:如何塑造下一代应用开发
【2月更文挑战第30天】 随着云计算的不断发展,云原生技术已经成为推动现代应用开发的重要力量。本文将深入探讨云原生技术的核心概念,分析其在提高开发效率、降低运维成本以及支持复杂业务场景中的作用。同时,文章还将预测云原生技术的发展趋势,并讨论如何在不断变化的技术环境中保持应用的敏捷性和可靠性。
|
1月前
|
消息中间件 存储 Cloud Native
【Spring云原生系列】Spring RabbitMQ:异步处理机制的基础--消息队列 原理讲解+使用教程
【Spring云原生系列】Spring RabbitMQ:异步处理机制的基础--消息队列 原理讲解+使用教程

热门文章

最新文章