深入K8S Job(三):cronJob controller源码分析-阿里云开发者社区

开发者社区> 店家小二> 正文

深入K8S Job(三):cronJob controller源码分析

简介: 源码流程图 概述 cronJob controller 的实现比较简单,使用 Cron - Wikipedia 的方法,确定调度规则,底层的调度对象就是依赖了 job,它不会去检查任何 Pod。 该 controller 也没有依赖各种 informer,就简单创建了一个循环运行的协程,每次遍历现有的 jobs & cronJobs,整理它们的关系并进行管理。
+关注继续查看

源码流程图

cronJob流程图

概述

cronJob controller 的实现比较简单,使用 Cron - Wikipedia 的方法,确定调度 规则,底层的调度对象就是依赖了 job,它不会去检查任何 Pod。

该 controller 也没有依赖各种 informer,就简单创建了一个循环运行的协程,每次遍历现有的 jobs & cronJobs,整理它们的关系并进行管理。

注意:kubernetes version >= 1.4 (ScheduledJob),>= 1.5(CronJob),需要给 apiserver 传递 --runtime-config=batch/v2alpha1=true 开启 batch/v2alpha1 API 才可用。

spec 关键字段

.spec.schedule 是 cronJob 的必填字段,该值是 Cron - Wikipedia 格式的字符串,例如:0 * * * *,或者 @hourly,来确定调度策略。

.spec.startingDeadlineSeconds 是可选字段,表示启动 Job 的期限(秒级别),如果因为任何原因而错过了被调度的时间,那么错误执行时间的 Job 被认为是失败的。如果没有指定,则没有期限。

.spec.concurrencyPolicy 也是可选字段,指定了 cronJob 创建 Job 的并发执行策略:

  • Allow(默认):允许并发运行 Job。
  • Forbid:禁止并发运行,如果前一个还没有完成,则直接跳过。
  • Replace:取消当前正在运行的 Jobs,然后新建 Job 来替换。

.spec.suspend 也是可选字段,如果设置为 true,则后续所有的执行都会被过滤掉,但是对当前已经在运行的 Job 不影响。默认为false

.spec.successfulJobsHistoryLimit.spec.failedJobsHistoryLimit 这两个字段也是可选的。它们指定了可以保留完成和失败 Job 数量的限制。
默认没有限制,所有成功和失败的 Job 都会被保留。然而,当运行一个 Cron Job 时,很快就会堆积很多 Job,推荐设置这两个字段的值。设置限制值为 0,相关类型的 Job 完成后将不会被保留。

CronJobController 结构

路径:pkg/controller/cronjob/cronjob_controller.go

type CronJobController struct {
 // 访问 kube-apiserver 的 client.
 kubeClient clientset.Interface
 // job 控制器,用于创建和删除 job.
 jobControl jobControlInterface
 // cronJob 控制器,用于更新状态.
 sjControl sjControlInterface
 // pod 控制器,用于list & delete pods // 在删除 job 时,同时也清理 job 创建的 pods.
 podControl podControlInterface
 // cronJob 相关的events, 通过该 recorder 进行广播
 recorder record.EventRecorder
}
注意:代码中有很多sj,因为以前不叫 cronJob,叫 scheduled jobs。

startCronJobController()

路径:cmd/kube-controller-manager/app/batch.go

startCronJobController() 是启动 cronJob controller 的入口函数。它会初始化 CronJobController 对象,并Run().

func startCronJobController(ctx ControllerContext) (bool, error) {
 // 在启动 cronJob controller 之前,判断下 cronJob 是否有配置生效 // 用户可以在创建k8s clusters时,通过修改kube-apiserver --runtime-config配置想要生效的 resource if !ctx.AvailableResources[schema.GroupVersionResource{Group: "batch", Version: "v1beta1", Resource: "cronjobs"}] {
 return false, nil
 }
 // 初始化 CronJobController 对象
 cjc, err := cronjob.NewCronJobController(
 ctx.ClientBuilder.ClientOrDie("cronjob-controller"),
 )
 if err != nil {
 return true, fmt.Errorf("error creating CronJob controller: %v", err)
 }
 // Run go cjc.Run(ctx.Stop)
 return true, nil
}

syncAll()

CronJobController Run() 方法比较简单,就是每10s 循环调用 syncAll() 函数。
syncAll() 逻辑也比较清楚,根据初始化的 kubeClient, 获取所有的 jobs 和 cronJobs,并遍历所有 Jobs, 根据ObjectMeta.OwnerReferences 字段匹配是否由 cronJob controller 所创建。最后基于 cronJob 的UUID 进行整理。
最后处理所有的 cronJobs,确认需要调度的时间并根据并行策略创建 jobs,同步完后再清理所有已经 finished jobs。

func (jm *CronJobController) syncAll() {
 // 列出所有的 jobs
 jl, err := jm.kubeClient.BatchV1().Jobs(metav1.NamespaceAll).List(metav1.ListOptions{})
 if err != nil {
 utilruntime.HandleError(fmt.Errorf("can't list Jobs: %v", err))
 return
 }
 js := jl.Items
 glog.V(4).Infof("Found %d jobs", len(js))

 // 列出所有的 cronJobs
 sjl, err := jm.kubeClient.BatchV1beta1().CronJobs(metav1.NamespaceAll).List(metav1.ListOptions{})
 if err != nil {
 utilruntime.HandleError(fmt.Errorf("can't list CronJobs: %v", err))
 return
 }
 sjs := sjl.Items
 glog.V(4).Infof("Found %d cronjobs", len(sjs))

 // 遍历所有的 jobs, 根据 ObjectMeta.OwnerReferences 字段确定该 job 是否由 cronJob 所创建。 // 然后根据 cronJob uuid 进行排列
 jobsBySj := groupJobsByParent(js)
 glog.V(4).Infof("Found %d groups", len(jobsBySj))

 // 遍历所有的 cronJobs for _, sj := range sjs {
 // 进行同步 // 确定需要调度的时间,并根据 Spec.ConcurrencyPolicy 策略,确认如何来创建 jobs // 并更新 cronJob.Status
 syncOne(&sj, jobsBySj[sj.UID], time.Now(), jm.jobControl, jm.sjControl, jm.podControl, jm.recorder)
 // 清理所有已经完成的 jobs
 cleanupFinishedJobs(&sj, jobsBySj[sj.UID], jm.jobControl, jm.sjControl, jm.podControl, jm.recorder)
 }
}

syncOne()

该接口就是 cronJob controller 中实现同步的关键部分。

func syncOne(sj *batchv1beta1.CronJob, js []batchv1.Job, now time.Time, jc jobControlInterface, sjc sjControlInterface, pc podControlInterface, recorder record.EventRecorder) {
 nameForLog := fmt.Sprintf("%s/%s", sj.Namespace, sj.Name)

 // 遍历所有获取到的 jobs // 1.记录到 childrenJobs 中,表示当前属于该 cronJob 的所有 Jobs,便于后面清理 cronJob 中记录的 active Jobs // 2.查看该 job 是否在 cronJob.Status.Active 的列表中 // - 如果在的话,且该 Job 已经 finished,则将该 job 从 active list 中删除 // - 如果不在,且该 Job 还没有 finished,则发送异常事件 
 childrenJobs := make(map[types.UID]bool)
 for _, j := range js {
 childrenJobs[j.ObjectMeta.UID] = true
 found := inActiveList(*sj, j.ObjectMeta.UID)
 if !found && !IsJobFinished(&j) {
 recorder.Eventf(sj, v1.EventTypeWarning, "UnexpectedJob", "Saw a job that the controller did not create or forgot: %v", j.Name)
 } else if found && IsJobFinished(&j) {
 deleteFromActiveList(sj, j.ObjectMeta.UID)
 // TODO: event to call out failure vs success.
 recorder.Eventf(sj, v1.EventTypeNormal, "SawCompletedJob", "Saw completed job: %v", j.Name)
 }
 }

 // 遍历 cronJob 所有的 active jobs, 根据前面的 childrenJobs 来判断该继续的 active job 是否还存在,如果不存在的话,也从 active list 中删除。 for _, j := range sj.Status.Active {
 if found := childrenJobs[j.UID]; !found {
 recorder.Eventf(sj, v1.EventTypeNormal, "MissingJob", "Active job went missing: %v", j.Name)
 deleteFromActiveList(sj, j.UID)
 }
 }

 // 上面更新了 cronJob.Status.Active 字段,所以需要更新一把 cronJob
 updatedSJ, err := sjc.UpdateStatus(sj)
 if err != nil {
 glog.Errorf("Unable to update status for %s (rv = %s): %v", nameForLog, sj.ResourceVersion, err)
 return
 }
 *sj = *updatedSJ

 // 如果 cronJob 已经被用户删除,则直接 return if sj.DeletionTimestamp != nil {
 return
 }

 // 如果 cronJob 已经被 suspend,也直接 return if sj.Spec.Suspend != nil && *sj.Spec.Suspend {
 glog.V(4).Infof("Not starting job for %s because it is suspended", nameForLog)
 return
 }

 // 根据 cronJob 的创建时间或最近一次的调度时间,和 cronJob.Spec.Schedule 配置,计算出到现在为止所有应该调度的时间点。
 times, err := getRecentUnmetScheduleTimes(*sj, now)
 if err != nil {
 recorder.Eventf(sj, v1.EventTypeWarning, "FailedNeedsStart", "Cannot determine if job needs to be started: %v", err)
 glog.Errorf("Cannot determine if %s needs to be started: %v", nameForLog, err)
 return
 }
 // 如果返回的时间点列表为空,则表示该 cronJob 暂时还不需要调度,直接 return if len(times) == 0 {
 glog.V(4).Infof("No unmet start times for %s", nameForLog)
 return
 }
 // 有多次未满足的调度时间 if len(times) > 1 {
 glog.V(4).Infof("Multiple unmet start times for %s so only starting last one", nameForLog)
 }

 // scheduledTime 取列表中的最后一次时间
 scheduledTime := times[len(times)-1]
 tooLate := false // 如果用户配置了 Spec.StartingDeadlineSeconds,则需要判断 scheduledTime 是否满足条件 // 如果 now - scheduledTime > Spec.StartingDeadlineSeconds,则直接 return if sj.Spec.StartingDeadlineSeconds != nil {
 tooLate = scheduledTime.Add(time.Second * time.Duration(*sj.Spec.StartingDeadlineSeconds)).Before(now)
 }
 if tooLate {
 glog.V(4).Infof("Missed starting window for %s", nameForLog)
 return
 }
 // scheduledTime 满足各种条件的情况下,就需要查看 cronJob 配置的并发策略 // 如果 ForbidConcurrent,且 active jobs > 0, 则直接 return; // 否则继续往下创建; if sj.Spec.ConcurrencyPolicy == batchv1beta1.ForbidConcurrent && len(sj.Status.Active) > 0 {
 glog.V(4).Infof("Not starting job for %s because of prior execution still running and concurrency policy is Forbid", nameForLog)
 return
 }
 // 如果 ReplaceConcurrent,则删除所有的 active jobs, 等后面重新创建 if sj.Spec.ConcurrencyPolicy == batchv1beta1.ReplaceConcurrent {
 for _, j := range sj.Status.Active {
 glog.V(4).Infof("Deleting job %s of %s that was still running at next scheduled start time", j.Name, nameForLog)

 job, err := jc.GetJob(j.Namespace, j.Name)
 if err != nil {
 recorder.Eventf(sj, v1.EventTypeWarning, "FailedGet", "Get job: %v", err)
 return
 }
 if !deleteJob(sj, job, jc, pc, recorder, "") {
 return
 }
 }
 }

 // 根据 cronJob.spec.JobTemplate,填充 job 的完整结构 // 比如 name, labels, OwnerReferences 等等。
 jobReq, err := getJobFromTemplate(sj, scheduledTime)
 if err != nil {
 glog.Errorf("Unable to make Job from template in %s: %v", nameForLog, err)
 return
 }
 // 创建 job
 jobResp, err := jc.CreateJob(sj.Namespace, jobReq)
 if err != nil {
 recorder.Eventf(sj, v1.EventTypeWarning, "FailedCreate", "Error creating job: %v", err)
 return
 }
 glog.V(4).Infof("Created Job %s for %s", jobResp.Name, nameForLog)
 recorder.Eventf(sj, v1.EventTypeNormal, "SuccessfulCreate", "Created job %v", jobResp.Name)

 // 根据创建 job 返回的 response,获取 ObjectReference 结构 // 用于记录到 cronJob.Status.Active 中
 ref, err := getRef(jobResp)
 if err != nil {
 glog.V(2).Infof("Unable to make object reference for job for %s", nameForLog)
 } else {
 sj.Status.Active = append(sj.Status.Active, *ref)
 }
 // 设置最近一次的调度时间
 sj.Status.LastScheduleTime = &metav1.Time{Time: scheduledTime}
 // 更新 cronJob if _, err := sjc.UpdateStatus(sj); err != nil {
 glog.Infof("Unable to update status for %s (rv = %s): %v", nameForLog, sj.ResourceVersion, err)
 }

 return
}
本文转自SegmentFault-深入K8S Job(三):cronJob controller源码分析

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
k8s与网络--Flannel源码分析
之前在k8s与网络--Flannel解读一文中,我们主要讲了Flannel整体的工作原理。今天主要针对Flannel v0.10.0版本进行源码分析。首先需要理解三个比较重要的概念: 网络(Network):整个集群中分配给 flannel 要管理的网络地址范围 子网(Subnet):flanne.
1638 0
javascript:使用document.getElementById读取数据为空分析
  今天写个网页,想在页面加载onLoad时,动态显示由后台其他程序传来的数据时,用document.getElementById获取控件对象总是为空。但是检查了这个id确实是存在的。在网上查阅一番后才知道了其中的原因。
892 0
Linux USB Host-Controller的初始化代码框架分析【转】
转自:http://blog.csdn.net/zkami/article/details/2496770 usb_hcd_omap_probe (const struct hc_driver *driver) (dev/ohci/ohci-omap.
816 0
LinqToExcel.Extend 源码分析
废话不多说,我们直接来分析源码,首先我们查看目录结构 目录结构.png 目录结构功能 Extend 通用扩展方法 Parameter 公共实体类 Parser 解析器 Validate 验证工具集 目录结构展开.png 展开目录结构,我们能够更加请详细的分析出每个目录所完成的功能模块。
752 0
纯C实现的词法分析和lex实现的词法分析的对比
版权声明:您好,转载请留下本人博客的地址,谢谢 https://blog.csdn.net/hongbochen1223/article/details/50523336 (一):写在前面 在上面的学习当中,我们通过简单的lex例子,进一步扩展lex例子,通过和yacc的融合来进行简单英语语法分析。
815 0
React源码分析5 -- 组件通信,refs,key,ReactDOM
React源码系列文章,请多支持: [React源码分析1 — 组件和对象的创建(createClass,createElement)](https://www.atatech.org/articles/72905) [React源码分析2 — React组件插入DOM流程](http://www.atatech.org/articles/72908) [React源码分析3 — React
3642 0
k8s与日志--journalbeat源码解读
前言 对于日志系统的重要性不言而喻,参照沪江的一篇关于日志系统的介绍,基本上日志数据在以下几方面具有非常重要的作用: 数据查找:通过检索日志信息,定位相应的 bug ,找出解决方案 服务诊断:通过对日志信息进行统计、分析,了解服务器的负荷和服务运行状态 数据分析:可以做进一步的数据分析,比如根据请求中的课程 id ,找出 TOP10 用户感兴趣课程 日志+大数据+AI的确有很多想象空间。
1227 0
Java并发:深入浅出AQS之独占锁模式源码分析
作者:凌风郎少原文链接:https://mp.weixin.qq.com/s/0WxKOqfvq1kVJDgk6NwWlgAbstractQueuedSynchronizer(以下简称AQS)作为java.util.concurrent包的基础,它提供了一套完整的同步编程框架,开发人员只需要实现其中几个简单的方法就能自由的使用诸如独占,共享,条件队列等多种同步模式。
626 0
深入K8S Job(三):cronJob controller源码分析
源码流程图 概述 cronJob controller 的实现比较简单,使用 Cron - Wikipedia 的方法,确定调度规则,底层的调度对象就是依赖了 job,它不会去检查任何 Pod。 该 controller 也没有依赖各种 informer,就简单创建了一个循环运行的协程,每次遍历现有的 jobs & cronJobs,整理它们的关系并进行管理。
1235 0
+关注
651
文章
0
问答
文章排行榜
最热
最新
相关电子书
更多
《2021云上架构与运维峰会演讲合集》
立即下载
《零基础CSS入门教程》
立即下载
《零基础HTML入门教程》
立即下载