kubernetes1.9源码阅读 replication controller的Informer机制

本文涉及的产品
容器服务 Serverless 版 ACK Serverless,317元额度 多规格
容器服务 Serverless 版 ACK Serverless,952元额度 多规格
简介: replication controller是kube-controller-manager中一个重要的控制器,主要是rs进行控制,确保pods的数量恰好和rs的规定一致。因此replication controller主要对这两类进行watch,一类是replicationset,另一类是pods。

replication controller是kube-controller-manager中一个重要的控制器,主要是rs进行控制,确保pods的数量恰好和rs的规定一致。因此replication controller主要对这两类进行watch,一类是replicationset,另一类是pods。本文是replication controller的源码阅读笔记,会包括client-go的Informer机制,希望帮助开始阅读kubernetes源码的小伙伴们,更希望与对kubernetes源码阅读感兴趣的小伙伴儿们交流,有错误的地方也希望能指出,共同进步。


入口程序

cmd/kube-controller-manager/controller-manager.go main

1. 调用options.NewCMServer,构建CMServer;

2. 调用app.Run方法,运行CMServer;

启动CMServer

cmd/kube-controller-manager/app/controllermanager.go Run

1. 调用createClients, 创建apiserver客户端,通过REST方式访问APIserver提供的API服务;

2. 启动协程调用go startHTTP,运行http Server;

3. 调用record.NewBroadcaster,创建eventBroadcaster对象,接收EventBroadcaster发送的event,输出到logging中,并输出到EventSink,并使用recorder记录”controller-mananger”的事件;

4. 调用CreateControllerContext,在CreateControllerContext方法中,会调用informers.NewSharedInformerFactory,创建sharedInformerFactory(client-go/informers/factory.go)对象;

5. 调用StartControllers,启动Controllers;

6. 调用ctx.InformerFactory.Start,在这里是调用sharedInformerFactory.start(client-go/informers/factory.go);

7. saTokenControllerInitFunc和NewControllerInitializers定义了controllers的InitFunc;

startReplicationController

cmd/kube-controller-manager/app/core.go startReplicationController

1. 协程启动调用replicationcontroller.NewReplicationManager,构建ReplicationManager;

(1) 调用ctx.InformerFactory.Core().V1().Pods(),这里调用sharedInformerFactory(client-go/informers/factory.go)对象的Core().V1().Pods()方法,将会构建PodInformer对象,

(2) 以此方式,创建ReplicationControllersInformer;

2. 运行ReplicationManager;

NewReplicationManager

pkg/controller/replication/replication_set.go NewReplicationManager

1. 在NewReplicationManager中,

(1) 首先,调用record.NewBroadcaster,创建eventBroadcaster对象,调用eventBroadcaster.StartLogging,接收EventBroadcaster发送的event,输出到logging中;调用 eventBroadcaster.StartRecordingToSink,event输出到EventSink,并调用eventBroadcaster.NewRecorder记录”replication-controller”的事件;

(2) 将调用NewBaseController;

2. 在NewBaseController方法中,

(1) 构建ReplicaSetController对象,包括了podControl,它定义了对Pod的操作,是由RealPodControl去调用apiserver完成创建实现;

(2) 将调用 rsInformer.Informer().AddEventHandler,这将调用rsInformer的构造函数NewReplicaSetInformer,rsInformer将event handler包装成listerner,然后添加到s.processor.listeners中,并定义对象处理的回调函数AddFunc、UpdateFunc、DeleteFunc;

(3) 同时,调用rsInformer的Lister方法;

(4) 最后,调用rsInformer.Informer().HasSynced,判断是否缓存完成;

(5) 以此方式,调用podInformer.Informer().AddEventHandler、podInformer的Lister方法及podInformer.Informer().HasSynced;

(6) 设置rsc.syncHandler;syncHandler负责pod与rc的同步,确保Pod副本数与rc规定的相同;

PodInformer

client-go/informers/core/v1/pod.go NewPodInformer

1. 构建cache.listWatch对象,定义了ListFunc和WatchFunc;

2. 调用cache.NewSharedIndexInformer;

NewSharedIndexInformer

client-go/tools/cache/shared_informer.go NewSharedIndexInformer

运行ReplicationManager

pkg/controller/replicaset/replica_set.go Run

1. 调用controller.WaitForCacheSync方法,在controller.WaitForCacheSync中,将调用ca che.WaitForCacheSync;

2. 调用rsc.worker, 将启动workers调用rsc.syncHandler,syncHandler负责pod与rc的同步,确保Pod副本数与rc规定的相同;

sharedInformerFactory.Start

client-go/informers/factory.go Start

1. 调用Informer.Run,这里调用SharedIndexInformer.Run;

SharedIndexInformer.Run

client-go/tools/cache/shared_informer.go Run

1. 调用NewDeltaFIFO,创建queue;

2. 定义Deltas处理函数s.HandleDeltas;

3. 调用New(cfg),构建sharedIndexInformer的controller;

4. 调用s.cacheMutationDetector.Run,检查缓存对象是否变化;

5. 调用s.processor.run,将调用sharedProcessor.run,会调用Listener.run和Listener.pop,执行处理queue的函数;

6. 调用s.controller.Run,构建Reflector,进行对etcd的缓存;

sharedIndexedInformer.controller.Run

client-go/tools/cache/controller.go Run

1. 调用NewReflector,构建Reflector;

(1) Reflector对象,包括ListerWatcher、ObjectType、Queue、FullResyncPeriod;

2. 调用r.run,将调用reflector.ListAndWatch,执行r.List、r.watch、r.watchHandler,进行对etcd的缓存;

3. 调用c.processLoop,reflector向queue里面添加数据,processLoop会不停去消费这里这些数据;

controller.processLoop

client-go/tools/cache/controller.go processLoop

1. cache.PopProcessFunc(c.config.Process)将前面Process函数传递进去;

DeltaFIFO.Pop

client-go/tools/cache/delta_fifo.go Pop

1. 主要从f.items取出object,然后调用process函数进行处理;

处理DeltaFIFO

client-go/tools/cache/shared_informer.go HandleDeltas

1. 调用s.process.distribute,将调用Listener.add,负责将watch的资源传到listener;

Listener.add/pop/run

client-go/tools/cache/shared_informer.go sharedProcessor.run/add/pop;

1. listenser的add函数负责将notify装进pendingNotifications;

2. pop函数取出pendingNotifications的第一个nofify,输出到nextCh channel;

3. run函数则负责取出notify,然后根据notify的类型(增加、删除、更新)触发相应的处理函数,这些函数在ReplicaSetController注册,分别是:rsc.addPod、rsc.updatePod、rsc.deletePod、rsc.enqueueReplicaSet、rsc.updateRS、rsc.enqueueReplicaSet;

rsc.addPod

pkg/controller/replicaset/replica_set.go addPod

1. 首先会根据pod返回rc,当pod不属于任何rc时,则返回。找到rc以后,更新rm.expectations.CreationObserved这个rc的期望值,也就是假如一个rc有4个pod,现在检测到创建了一个pod,则会将这个rc的期望值减少,变为3。然后将这个rc放入队列;

2. 调用rsc.enqueueReplicaSet,将调用rsc.queue.Add;

rsc.worker

pkg/controller/replicaset/replica_set.go worker()

1. 调用rsc.syncHandler,这里会调用rsc.syncReplicaSet,syncReplicaSet负责pod与rc的同步,确保Pod副本数与rc规定的相同;

本文转自kubernetes中文社区- kubernetes1.9源码阅读 replication controller的Informer机制
相关实践学习
通过Ingress进行灰度发布
本场景您将运行一个简单的应用,部署一个新的应用用于新的发布,并通过Ingress能力实现灰度发布。
容器应用与集群管理
欢迎来到《容器应用与集群管理》课程,本课程是“云原生容器Clouder认证“系列中的第二阶段。课程将向您介绍与容器集群相关的概念和技术,这些概念和技术可以帮助您了解阿里云容器服务ACK/ACK Serverless的使用。同时,本课程也会向您介绍可以采取的工具、方法和可操作步骤,以帮助您了解如何基于容器服务ACK Serverless构建和管理企业级应用。 学习完本课程后,您将能够: 掌握容器集群、容器编排的基本概念 掌握Kubernetes的基础概念及核心思想 掌握阿里云容器服务ACK/ACK Serverless概念及使用方法 基于容器服务ACK Serverless搭建和管理企业级网站应用
相关文章
|
1月前
|
Kubernetes 负载均衡 应用服务中间件
k8s学习--ingress详细解释与应用(nginx ingress controller))
k8s学习--ingress详细解释与应用(nginx ingress controller))
169 0
|
1月前
|
消息中间件 Java Kafka
Kafka ACK机制详解!
本文深入剖析了Kafka的ACK机制,涵盖其原理、源码分析及应用场景,并探讨了acks=0、acks=1和acks=all三种级别的优缺点。文中还介绍了ISR(同步副本)的工作原理及其维护机制,帮助读者理解如何在性能与可靠性之间找到最佳平衡。适合希望深入了解Kafka消息传递机制的开发者阅读。
172 0
|
3月前
|
Kubernetes 监控 Perl
在k8S中,自动扩容机制是什么?
在k8S中,自动扩容机制是什么?
|
3月前
|
弹性计算 运维 Kubernetes
Kubernetes(K8S) Controller - Deployment 介绍
Kubernetes(K8S) Controller - Deployment 介绍
38 1
|
3月前
|
存储 网络安全 API
【Azure Service Bus】 Service Bus如何确保消息发送成功,发送端是否有Ack机制 
【Azure Service Bus】 Service Bus如何确保消息发送成功,发送端是否有Ack机制 
|
3月前
|
Kubernetes Java 调度
在K8S中,Pod突然挂掉,K8S有什么机制或功能自动清除Pod?
在K8S中,Pod突然挂掉,K8S有什么机制或功能自动清除Pod?
|
3月前
|
Kubernetes 安全 Linux
在k8S中,PodSecurityPolicy 机制能实现哪些安全策略?
在k8S中,PodSecurityPolicy 机制能实现哪些安全策略?
|
3月前
|
Kubernetes 安全 调度
在k8S中, PodSecurityPolicy机制是什么?
在k8S中, PodSecurityPolicy机制是什么?
|
3月前
|
Kubernetes 容器 Perl
在K8S中,Replica Set和Replication Controller之间有什么区别?
在K8S中,Replica Set和Replication Controller之间有什么区别?
|
3月前
|
Kubernetes 监控 Perl
在K8S中,RC的机制是什么?
在K8S中,RC的机制是什么?