带你读《云原生应用开发 Operator原理与实践》第二章 Operator 原理2.2Client-go 原理(十三)

简介: 带你读《云原生应用开发 Operator原理与实践》第二章 Operator 原理2.2Client-go 原理(十二)

1. Client-goIndexer

 

     资源对象从 DeltaFIFOPop 出去后又经过了哪些处理呢。这要从一开始的 sharedIndexInformer说起。注意,在 sharedIndexInformerRun 方法中,初始化了它的配置,并执行了 s.controller.Run方法。我们可以看到s.controller.Run中初始化了 Reflector,开始了指定资源的List-Watch 操作,并且同步到了DeltaFIFO中,同时执行了processLoop方法。此时我们可以看到 processLoop方法不断从DeltaFIFO中将资源对象 Pop来, 并且交给了之前的 c.config.Process方法进行处理。而c.config.Process方法就是sharedIndexInformerHandleDeltas方法,具体见代码清单 2-45



func(s*sharedIndexInformer)Run(stopCh<-chanstruct{}){

...

cfg:=&Config{

Queue:              fifo,

ListerWatcher:             s.listerWatcher,ObjectType:          s.objectType,FullResyncPeriod:s.resyncCheckPeriod,RetryOnError:                 false,

ShouldResync:       s.processor.shouldResync,

 

Process:         s.HandleDeltas,WatchErrorHandler:s.watchErrorHandler,

}

 

func(){

s.startedLock.Lock()

defers.startedLock.Unlock()

 

...

}()


s.controller=New(cfg)s.controller.(*controller).clock=s.clocks.started=true

s.controller.Run(stopCh)

}

 

func(c*controller)Run(stopCh<-chanstruct{}){deferutilruntime.HandleCrash()

gofunc(){

<-stopCh

c.config.Queue.Close()

}()

r:=NewReflector(


c.config.ListerWatcher,c.config.ObjectType,c.config.Queue,c.config.FullResyncPeriod,

)

r.ShouldResync=c.config.ShouldResync

r.clock=c.clock

...

}

c.reflectorMutex.Lock()c.reflector=rc.reflectorMutex.Unlock()

 

wait.Until(c.processLoop,time.Second,stopCh)

func(c*controller)processLoop(){for{

obj,err:=c.config.Queue.Pop(PopProcessFunc(c.config.Process))

iferr!=nil{

iferr==FIFOClosedError{

return

}

ifc.config.RetryOnError{

//Thisisthesafewaytore-enqueue.

c.config.Queue.AddIfNotPresent(obj)

}

}

}

}

 

 

综上可知,由 DeltaFIFOPop出来的对象最后交给了 HandleDeltas进行处理,而HandleDeltas中,将资源对象同步到了 Indexer中,至此我们引出了 Informer模块中的3个组件 Indexer。IndexerClient-go 中实现的一个本地存储,它可以建立索引并存Resource的对象。Reflector通过 DeltaFIFOQueue将资源对象存储到Indexer中。需要注意的是,Indexer中的数据与 ETCD中的数据是完全一致的,当 Client-go需要数据时,无须每次都从 APIServer中获取,从而减轻了请求过多造成的对 APIServer的压力, 具体见代码清单 2-46。

func(s*sharedIndexInformer)HandleDeltas(objinterface{})error{

s.blockDeltas.Lock()

defers.blockDeltas.Unlock()

 

//fromoldesttonewest

for_,d:=rangeobj.(Deltas){switchd.Type{

caseSync,Replaced,Added,Updated:s.cacheMutationDetector.AddObject(d.Object)

ifold,exists,err:=s.indexer.Get(d.Object);err==nil&&

exists{

if err:=s.indexer.Update(d.Object);err!=nil{returnerr


}

 

requestedresync 

nil{

 

==nil{


isSync:=falseswitch{

cased.Type==Sync:

//Synceventsareonlypropagatedtolistenersthat

 

isSync=true

cased.Type==Replaced:

ifaccessor,err:=meta.Accessor(d.Object);err==ifoldAccessor,err:=meta.Accessor(old);err

//Replacedeventsthatdidn'tchange


resourceVersionaretreatedasresyncevents

//andonlypropagatedtolisteners


thatrequestedresync

 

==oldAccessor.GetResourceVersion()

}


isSync=accessor.GetResourceVersion()

}

}

s.processor.distribute(updateNotification{oldObj:old,

newObj:d.Object},isSync)

}else{

if err:=s.indexer.Add(d.Object);err!=nil{returnerr

 

false)

}

s.processor.distribute(addNotification{newObj:d.Object},

 

}

caseDeleted:

iferr:=s.indexer.Delete(d.Object);err!=nil{


 

returnerr

}

s.processor.distribute(deleteNotification{oldObj:d.Object},false)

}

}

returnnil

}

 

Indexer   是如何实现存储并快速查找资源的呢?我们先看一下 Indexer接口提供的功能。CacheIndexer的一种非常经典的实现,所有的对象缓存在内存中,而且从Cache 这个类型的名称来看它属于包内私有类型,外部无法直接使用,只能通过专用的函数创建。 这里的 Store、Indexer使用了一个 threadSafeMap来保证并发安全的存储。它拥有存储相关的增、删、改、查等方法。threadSafeMap继承了 Store接口,而 Indexer扩展了threadSafeMap, 为 threadSafeMap提供了索引操作。threadSafeMap其实只能够存储和索引。存储即将runtime.object存储到 ItemsMap中;索引即为ItemsMap建立三层索引:IndicesMap类型索引namespace、nodeName);IndexMap 类型索引(namespace1、namespace2……);runtime.object类型索引,实现见代码清 2-47


typeIndexerinterface{Store

//indexName索引类,obj是对象,计算objindexName索引类中的索引键,通过索引键

获取所有的对象

//基本就是获取符合obj特征的所有对象,所谓的特征就是对象在索引类中的索引键

Index(indexNamestring,objinterface{})([]interface{},error)

//indexKeyindexName索引类中的⼀个索引键,函数返回indexKey指定的所有对象键

IndexKeys(indexName,indexedValuestring)([]string,error)

//获取indexName索引类中的所有索引键

ListIndexFuncValues(indexNamestring)[]string

//这个函数和 Index类似,只是返回值不是对象键,⽽是所有对象

ByIndex(indexName,indexedValuestring)([]interface{},error)

//返回Indexers

GetIndexers()Indexers

//添加Indexers,就是增加更多的索引分类

AddIndexers(newIndexersIndexers)error

}

 

Kubernetes中使用的比较多的索引函数是MetaNamespaceIndexFunc() 代码位置:

 

client-go/tools/cache/index.go,Indexer索引的实现是通过index.ByIndex来完成的, index.ByIndex的实现见代码清单 2-48。这个函数返回了符合索引函数的值的对象列表。



func(c*threadSafeMap)ByIndex(indexName,indexKeystring)([]interface{},error){c.lock.RLock()

deferc.lock.RUnlock()

 

indexFunc:=c.indexers[indexName]ifindexFunc==nil{

returnnil,fmt.Errorf("Indexwithname%sdoesnotexist",indexName)

}

index:=c.indices[indexName]set:=index[indexKey]

list:=make([]interface{},0,set.Len())for_,key:=rangeset.List(){

list=append(list,c.items[key])

}

 

returnlist,nil


}

 

上述方法接收两个参数:indexName(索引器的名称)indexedValue需要索引的 Key。首先根据索引器名称查找指定的索引器函数c.indexers[indexName]);然后根据索引器名称查找相应的缓存器函数(c.indices[indexName]) ;最后根据索引 Key

indexedValue)从缓存中进行数据查询,并返回查询结果。

相关文章
|
19天前
|
运维 Cloud Native 持续交付
云原生架构的演进与实践####
【10月更文挑战第16天】 云原生,这一概念自提出以来,便以其独特的魅力和无限的可能性,引领着现代软件开发与部署的新浪潮。本文旨在探讨云原生架构的核心理念、关键技术及其在实际项目中的应用实践,揭示其如何帮助企业实现更高效、更灵活、更可靠的IT系统构建与管理。通过深入剖析容器化、微服务、持续集成/持续部署(CI/CD)等核心技术,结合具体案例,本文将展现云原生架构如何赋能企业数字化转型,推动业务创新与发展。 ####
119 47
|
7天前
|
弹性计算 Kubernetes Cloud Native
云原生架构下的微服务设计原则与实践####
本文深入探讨了在云原生环境中,微服务架构的设计原则、关键技术及实践案例。通过剖析传统单体架构面临的挑战,引出微服务作为解决方案的优势,并详细阐述了微服务设计的几大核心原则:单一职责、独立部署、弹性伸缩和服务自治。文章还介绍了容器化技术、Kubernetes等云原生工具如何助力微服务的高效实施,并通过一个实际项目案例,展示了从服务拆分到持续集成/持续部署(CI/CD)流程的完整实现路径,为读者提供了宝贵的实践经验和启发。 ####
|
8天前
|
Kubernetes 负载均衡 Cloud Native
云原生应用:Kubernetes在容器编排中的实践与挑战
【10月更文挑战第27天】Kubernetes(简称K8s)是云原生应用的核心容器编排平台,提供自动化、扩展和管理容器化应用的能力。本文介绍Kubernetes的基本概念、安装配置、核心组件(如Pod和Deployment)、服务发现与负载均衡、网络配置及安全性挑战,帮助读者理解和实践Kubernetes在容器编排中的应用。
32 4
|
12天前
|
NoSQL Cloud Native atlas
探索云原生数据库:MongoDB Atlas 的实践与思考
【10月更文挑战第21天】本文探讨了MongoDB Atlas的核心特性、实践应用及对云原生数据库未来的思考。MongoDB Atlas作为MongoDB的云原生版本,提供全球分布式、完全托管、弹性伸缩和安全合规等优势,支持快速部署、数据全球化、自动化运维和灵活定价。文章还讨论了云原生数据库的未来趋势,如架构灵活性、智能化运维和混合云支持,并分享了实施MongoDB Atlas的最佳实践。
|
13天前
|
监控 Cloud Native Java
云原生架构下微服务治理策略与实践####
【10月更文挑战第20天】 本文深入探讨了云原生环境下微服务架构的治理策略,通过分析当前技术趋势与挑战,提出了一系列高效、可扩展的微服务治理最佳实践方案。不同于传统摘要概述内容要点,本部分直接聚焦于治理核心——如何在动态多变的分布式系统中实现服务的自动发现、配置管理、流量控制及故障恢复,旨在为开发者提供一套系统性的方法论,助力企业在云端构建更加健壮、灵活的应用程序。 ####
59 10
|
8天前
|
Kubernetes Cloud Native API
云原生架构下微服务治理的深度探索与实践####
本文旨在深入剖析云原生环境下微服务治理的核心要素与最佳实践,通过实际案例分析,揭示高效、稳定的微服务架构设计原则及实施策略。在快速迭代的云计算领域,微服务架构以其高度解耦、灵活扩展的特性成为众多企业的首选。然而,伴随而来的服务间通信、故障隔离、配置管理等挑战亦不容忽视。本研究聚焦于云原生技术栈如何赋能微服务治理,涵盖容器编排(如Kubernetes)、服务网格(如Istio/Envoy)、API网关、分布式追踪系统等关键技术组件的应用与优化,为读者提供一套系统性的解决方案框架,助力企业在云端构建更加健壮、可维护的服务生态。 ####
|
8天前
|
Kubernetes 监控 Cloud Native
云原生应用:Kubernetes在容器编排中的实践与挑战
【10月更文挑战第26天】随着云计算技术的发展,容器化成为现代应用部署的核心趋势。Kubernetes(K8s)作为容器编排领域的佼佼者,以其强大的可扩展性和自动化能力,为开发者提供了高效管理和部署容器化应用的平台。本文将详细介绍Kubernetes的基本概念、核心组件、实践过程及面临的挑战,帮助读者更好地理解和应用这一技术。
36 3
|
13天前
|
NoSQL Cloud Native atlas
探索云原生数据库:MongoDB Atlas 的实践与思考
【10月更文挑战第20天】本文探讨了MongoDB Atlas的核心特性、实践应用及对未来云原生数据库的思考。MongoDB Atlas作为云原生数据库服务,具备全球分布、完全托管、弹性伸缩和安全合规等优势,支持快速部署、数据全球化、自动化运维和灵活定价。文章还讨论了实施MongoDB Atlas的最佳实践和职业心得,展望了云原生数据库的发展趋势。
|
9天前
|
监控 安全 Cloud Native
云原生安全:Istio在微服务架构中的安全策略与实践
【10月更文挑战第26天】随着云计算的发展,云原生架构成为企业数字化转型的关键。微服务作为其核心组件,虽具备灵活性和可扩展性,但也带来安全挑战。Istio作为开源服务网格,通过双向TLS加密、细粒度访问控制和强大的审计监控功能,有效保障微服务间的通信安全,成为云原生安全的重要工具。
27 2
|
8天前
|
监控 Cloud Native 持续交付
云原生技术深度解析:重塑现代应用开发与部署范式####
本文深入探讨了云原生技术的核心概念、关键技术组件及其在现代软件开发中的重要性。通过剖析容器化、微服务架构、持续集成/持续部署(CI/CD)等关键技术,本文旨在揭示云原生技术如何促进应用的敏捷性、可扩展性和高可用性,进而推动企业数字化转型进程。不同于传统摘要仅概述内容要点,本部分将融入具体案例分析,直观展示云原生技术在实际应用中的显著成效与挑战应对策略,为读者提供更加丰富、立体的理解视角。 ####