replication controller是kube-controller-manager中一个重要的控制器,主要是rs进行控制,确保pods的数量恰好和rs的规定一致。因此replication controller主要对这两类进行watch,一类是replicationset,另一类是pods。本文是replication controller的源码阅读笔记,会包括client-go的Informer机制,希望帮助开始阅读kubernetes源码的小伙伴们,更希望与对kubernetes源码阅读感兴趣的小伙伴儿们交流,有错误的地方也希望能指出,共同进步。
入口程序
cmd/kube-controller-manager/controller-manager.go main
1. 调用options.NewCMServer,构建CMServer;
2. 调用app.Run方法,运行CMServer;
启动CMServer
cmd/kube-controller-manager/app/controllermanager.go Run
1. 调用createClients, 创建apiserver客户端,通过REST方式访问APIserver提供的API服务;
2. 启动协程调用go startHTTP,运行http Server;
3. 调用record.NewBroadcaster,创建eventBroadcaster对象,接收EventBroadcaster发送的event,输出到logging中,并输出到EventSink,并使用recorder记录”controller-mananger”的事件;
4. 调用CreateControllerContext,在CreateControllerContext方法中,会调用informers.NewSharedInformerFactory,创建sharedInformerFactory(client-go/informers/factory.go)对象;
5. 调用StartControllers,启动Controllers;
6. 调用ctx.InformerFactory.Start,在这里是调用sharedInformerFactory.start(client-go/informers/factory.go);
7. saTokenControllerInitFunc和NewControllerInitializers定义了controllers的InitFunc;
startReplicationController
cmd/kube-controller-manager/app/core.go startReplicationController
1. 协程启动调用replicationcontroller.NewReplicationManager,构建ReplicationManager;
(1) 调用ctx.InformerFactory.Core().V1().Pods(),这里调用sharedInformerFactory(client-go/informers/factory.go)对象的Core().V1().Pods()方法,将会构建PodInformer对象,
(2) 以此方式,创建ReplicationControllersInformer;
2. 运行ReplicationManager;
NewReplicationManager
pkg/controller/replication/replication_set.go NewReplicationManager
1. 在NewReplicationManager中,
(1) 首先,调用record.NewBroadcaster,创建eventBroadcaster对象,调用eventBroadcaster.StartLogging,接收EventBroadcaster发送的event,输出到logging中;调用 eventBroadcaster.StartRecordingToSink,event输出到EventSink,并调用eventBroadcaster.NewRecorder记录”replication-controller”的事件;
(2) 将调用NewBaseController;
2. 在NewBaseController方法中,
(1) 构建ReplicaSetController对象,包括了podControl,它定义了对Pod的操作,是由RealPodControl去调用apiserver完成创建实现;
(2) 将调用 rsInformer.Informer().AddEventHandler,这将调用rsInformer的构造函数NewReplicaSetInformer,rsInformer将event handler包装成listerner,然后添加到s.processor.listeners中,并定义对象处理的回调函数AddFunc、UpdateFunc、DeleteFunc;
(3) 同时,调用rsInformer的Lister方法;
(4) 最后,调用rsInformer.Informer().HasSynced,判断是否缓存完成;
(5) 以此方式,调用podInformer.Informer().AddEventHandler、podInformer的Lister方法及podInformer.Informer().HasSynced;
(6) 设置rsc.syncHandler;syncHandler负责pod与rc的同步,确保Pod副本数与rc规定的相同;
PodInformer
client-go/informers/core/v1/pod.go NewPodInformer
1. 构建cache.listWatch对象,定义了ListFunc和WatchFunc;
2. 调用cache.NewSharedIndexInformer;
NewSharedIndexInformer
client-go/tools/cache/shared_informer.go NewSharedIndexInformer
运行ReplicationManager
pkg/controller/replicaset/replica_set.go Run
1. 调用controller.WaitForCacheSync方法,在controller.WaitForCacheSync中,将调用ca che.WaitForCacheSync;
2. 调用rsc.worker, 将启动workers调用rsc.syncHandler,syncHandler负责pod与rc的同步,确保Pod副本数与rc规定的相同;
sharedInformerFactory.Start
client-go/informers/factory.go Start
1. 调用Informer.Run,这里调用SharedIndexInformer.Run;
SharedIndexInformer.Run
client-go/tools/cache/shared_informer.go Run
1. 调用NewDeltaFIFO,创建queue;
2. 定义Deltas处理函数s.HandleDeltas;
3. 调用New(cfg),构建sharedIndexInformer的controller;
4. 调用s.cacheMutationDetector.Run,检查缓存对象是否变化;
5. 调用s.processor.run,将调用sharedProcessor.run,会调用Listener.run和Listener.pop,执行处理queue的函数;
6. 调用s.controller.Run,构建Reflector,进行对etcd的缓存;
sharedIndexedInformer.controller.Run
client-go/tools/cache/controller.go Run
1. 调用NewReflector,构建Reflector;
(1) Reflector对象,包括ListerWatcher、ObjectType、Queue、FullResyncPeriod;
2. 调用r.run,将调用reflector.ListAndWatch,执行r.List、r.watch、r.watchHandler,进行对etcd的缓存;
3. 调用c.processLoop,reflector向queue里面添加数据,processLoop会不停去消费这里这些数据;
controller.processLoop
client-go/tools/cache/controller.go processLoop
1. cache.PopProcessFunc(c.config.Process)将前面Process函数传递进去;
DeltaFIFO.Pop
client-go/tools/cache/delta_fifo.go Pop
1. 主要从f.items取出object,然后调用process函数进行处理;
处理DeltaFIFO
client-go/tools/cache/shared_informer.go HandleDeltas
1. 调用s.process.distribute,将调用Listener.add,负责将watch的资源传到listener;
Listener.add/pop/run
client-go/tools/cache/shared_informer.go sharedProcessor.run/add/pop;
1. listenser的add函数负责将notify装进pendingNotifications;
2. pop函数取出pendingNotifications的第一个nofify,输出到nextCh channel;
3. run函数则负责取出notify,然后根据notify的类型(增加、删除、更新)触发相应的处理函数,这些函数在ReplicaSetController注册,分别是:rsc.addPod、rsc.updatePod、rsc.deletePod、rsc.enqueueReplicaSet、rsc.updateRS、rsc.enqueueReplicaSet;
rsc.addPod
pkg/controller/replicaset/replica_set.go addPod
1. 首先会根据pod返回rc,当pod不属于任何rc时,则返回。找到rc以后,更新rm.expectations.CreationObserved这个rc的期望值,也就是假如一个rc有4个pod,现在检测到创建了一个pod,则会将这个rc的期望值减少,变为3。然后将这个rc放入队列;
2. 调用rsc.enqueueReplicaSet,将调用rsc.queue.Add;
rsc.worker
pkg/controller/replicaset/replica_set.go worker()
1. 调用rsc.syncHandler,这里会调用rsc.syncReplicaSet,syncReplicaSet负责pod与rc的同步,确保Pod副本数与rc规定的相同;