apscheduler
apscheduler 是 Python 的一个库,用于周期性地触发单个任务调度,实际上我们完全可以用 apscheduler 来实现一个自己的 cron。
apscheduler 中的几个概念:
- triggers,触发的计算引擎,apscheduler 除了支持 cron 之外,还支持 date 和 interval 两种;
- job store,用于记录每次的运行结果,上次运行时间等,这样当有错过的任务时才能知道需要补充执行多少次。默认是记在内存里,不过也支持 redis, mongo, mysql;
- executor,执行任务的 worker,常用的有 ThreadPoolExecutor 和 ProcessPoolExecutor, 也就是线程池和进程池;
- scheduler, 把以上几个概念串联起来做调度。
apscheduler 的使用也非常简单,直接看函数名大概就知道了。
from apscheduler.schedulers.background import BackgroundScheduler scheduler = BackgroundScheduler() # scheduler.add_executor('processpool') # 使用进程池,默认是线程池 # scheduler.add_job_store("redis") # 使用 redis 作为 job store, 默认是内存 scheduler.add_job( myfunc, # 要执行的函数 trigger='cron', # 触发机制 id='my_job_id', # job_id args=[], # 执行函数的参数 kwargs={}, # 执行函数的字典参数 ) scheduler.remove_job('my_job_id') scheduler.pause_job('my_job_id') scheduler.resume_job('my_job_id') scheduler.reschedule_job("my_job_id") # 感觉叫 modify_job 更好一点。所有属性都可以改,除了 ID scheduler.start() scheduler.pause() scheduler.resume() scheduler.shutdown()
apscheduler 如何处理上面的三个问题
- 可以通过
max_instances
参数设置最大执行的实例个数; - 可以通过
misfire_grace_time
参数设置错过的任务的捞回时间,也就是在如果错过的时间不超过该值,就补充触发一次; - 可以通过
coalesce
参数设置当需要执行多次的时候是否合并为执行一次。
另外需要注意的一点是,apscheduler 并没有像传统的 vixie cron 一样每分钟都会唤醒一次,而是会休眠到最近的可执行任务需要触发的时候。同时为了能在休眠期间增加任务,每次调用 add_job 的时候会直接唤醒 scheduler。
在计算下次可运行时间的时候,apscheduler 会维护一个按照下次触发时间排序的队列,插入新任务会采用二分查找位置插入(不过我感觉用堆好一点啊……)。当使用其他的外部 job store 的时候则会利用这些数据库的不同机制,比如 redis 中就会使用 zset。
apscheduler 还支持添加 event listener 获取 job 的运行信息:
def my_listener(event): if event.exception: print('The job crashed :(') else: print('The job worked :)') scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
K8S 中的 cron job
在 kubernetes 中,除了 deployment 以外,我们也可以构建一次性或者定时运行的 job。定时任务也是按照 crontab 的格式来定义的。
apiVersion: batch/v1beta1 kind: CronJob metadata: name: hello spec: schedule: "*/1 * * * *" # cron format jobTemplate: spec: template: spec: containers: - name: hello image: busybox args: - /bin/sh - -c - date; echo Hello from the Kubernetes cluster restartPolicy: OnFailure
在 K8S 中,我们可以通过 .spec.concurrencyPolicy
来控制最多有多少个实例运行。K8S 建议每个 cron job 最好是幂等的,以免并发执行造成不可预料的结果。可选参数为:
- Allow(default),允许
- Forbid, 不允许
- Replace,干掉原来的,执行新的
当任务执行失败的时候,K8S 的行为非常令人迷惑,如果 .spec.startingDeadlineSeconds
没有设置的话,那么任务重试 100 次失败之后就彻底放弃了……WTF……关于这个具体实现不再赘述,可以参考后面的链接 9.
在现代的分布式系统中,除了定时任务之外,更重要的是不同的任务之间的执行次序和依赖关系,在后面的文章中,会介绍一下 airflow, luigi, argo 等工具的使用和实现。敬请期待。
PS. K8S 官方文档写得真是太烂了,典型的 over engineering。