python任务调度利器-APScheduler

本文涉及的产品
云数据库 MongoDB,通用型 2核4GB
简介: APScheduler是Python的任务调度库,提供基于时间、固定时间点和CRONTAB的任务调度,适用于离线作业和缓存更新等场景。它包含触发器、调度器、任务存储器、执行器和任务事件组件。安装使用`pip install apscheduler`,简单示例展示了如何配置调度器、添加任务并监听任务异常。支持的触发器有间隔、日期和CRON类型,执行器包括线程池和进程池等。任务存储器可选择内存或各种数据库存储。调度器模式有BlockingScheduler和BackgroundScheduler等,可进行任务的添加、删除、暂停和修改,并监听任务事件。5月更文挑战第20天

python任务调度利器-APScheduler

任务调度应用场景

所谓的任务调度是指安排任务的执行计划,即何时执行,怎么执行等。在现实项目中经常出现它们的身影;
特别是数据类项目,比如实时统计每5分钟网站的访问量,就需要每5分钟定时从日志数据分析访问量。

总结下任务调度应用场景:

  • 离线作业调度: 按时间粒度执行某项任务
  • 共享缓存更新: 定时刷新缓存,如redis缓存;不同进程间的共享数据

任务调度工具

  • linux的crontab, 支持按照分钟/小时/天/月/周粒度,执行任务
  • java的Quartz
  • windows的任务计划

本文介绍的是python中的任务调度库,APScheduler(advance python scheduler)。如果你了解Quartz的话,可以看出APScheduler是Quartz的python实现;APScheduler提供了基于时间,固定时间点和crontab方式的任务调用方案, 可以当作一个跨平台的调度工具来使用。

APScheduler

组件介绍

APScheduler由5个部分组成:触发器、调度器、任务存储器、执行器和任务事件。

  • 任务job: 任务id和任务执行func
  • 触发器triggers:确定任务何时开始执行
  • 任务存储器job stores: 保存任务的状态
  • 执行器executors:确定任务怎么执行
  • 任务事件event:监控任务执行异常情况
  • 调度器schedulers:串联任务的整个生命周期,添加编辑任务任务存储器,在任务的执行时间到来时,把任务交给执行器执行返回结果;同时发出事件监听,监控任务事件

apscheduler.jpg

安装

pip install apscheduler

简单例子

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR
import logging
import datetime

# 任务执行函数
def job_func(job_id):
    print('job %s is runed at %s' % (job_id, datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))

# 事件监听
def job_exception_listener(event):
    if event.exception:
        # todo: 异常处理, 告警等
        print('The job crashed :(')
    else:
        print('The job worked :)')

# 日志
logging.basicConfig()
logging.getLogger('apscheduler').setLevel(logging.DEBUG)

# 定义一个后台任务非阻塞调度器
scheduler = BackgroundScheduler()
# 添加一个任务到内存中 
# 触发器:trigger='interval' seconds=10 每10s触发执行一次
# 执行器:executor='default' 线程执行
# 任务存储器:jobstore='default' 默认内存存储
# 最大并发数:max_instances
scheduler.add_job(job_func, trigger='interval', args=[1], id='1', name='a test job', max_instances=10, jobstore='default', executor='default', seconds=10)
# 设置任务监听
scheduler.add_listener(job_exception_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)

# 启动调度器
scheduler.start()

运行情况:

job 1 is runed at 2020-03-21 20:00:38
The job worked :)
job 1 is runed at 2020-03-21 20:00:48
The job worked :)
job 1 is runed at 2020-03-21 20:00:58
The job worked :)

触发器

触发器决定何时执行任务,APScheduler支持的触发器有3种

  • trigger='interval': 按固定时间周期执行,支持weeks,days,hours,minutes, seconds, 还可指定时间范围
      sched.add_job(job_function, 'interval', hours=2, start_date='2010-10-10 09:30:00', end_date='2014-06-15 11:00:00')
    
  • trigger='date': 固定时间,执行一次
      sched.add_job(my_job, 'date', run_date=datetime(2009, 11, 6, 16, 30, 5), args=['text'])
    
  • trigger='cron': 支持crontab方式,执行任务

    • 参数:分钟/小时/天/月/周粒度,也可指定时间范围
         year (int|str)4-digit year
         month (int|str) – month (1-12)
         day (int|str) – day of the (1-31)
         week (int|str) – ISO week (1-53)
         day_of_week (int|str) – number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun)
         hour (int|str) – hour (0-23)
         minute (int|str) – minute (0-59)
         second (int|str) – second (0-59)
         start_date (datetime|str) – earliest possible date/time to trigger on (inclusive)
         end_date (datetime|str) – latest possible date/time to trigger on (inclusive)
      
    • 例子

         # 星期一到星期五,5点30执行任务job_function,直到2014-05-30 00:00:00
         sched.add_job(job_function, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2014-05-30')
      
         # 按照crontab格式执行, 格式为:分钟 小时 天 月 周,*表示所有
         # 5月到8月的1号到15号,0点0分执行任务job_function
         sched.add_job(job_function, CronTrigger.from_crontab('0 0 1-15 may-aug *'))
      

执行器

执行器决定如何执行任务;APScheduler支持4种不同执行器,常用的有pool(线程/进程)和gevent(io多路复用,支持高并发),默认为pool中线程池, 不同的执行器可以在调度器的配置中进行配置(见调度器)

  • apscheduler.executors.asyncio: 同步io,阻塞
  • apscheduler.executors.gevent:io多路复用,非阻塞
  • apscheduler.executors.pool: 线程ThreadPoolExecutor和进程ProcessPoolExecutor
  • apscheduler.executors.twisted: 基于事件驱动

任务存储器

任务存储器决定任务的保存方式, 默认存储在内存中(MemoryJobStore),重启后就没有了。APScheduler支持的任务存储器有:

  • apscheduler.jobstores.memory: 内存
  • apscheduler.jobstores.mongodb: 存储在mongodb
  • apscheduler.jobstores.redis: 存储在redis
  • apscheduler.jobstores.rethinkdb:存储在rethinkdb
  • apscheduler.jobstores.sqlalchemy:支持sqlalchemy的数据库如mysql,sqlite等
  • apscheduler.jobstores.zookeeper:zookeeper

不同的任务存储器可以在调度器的配置中进行配置(见调度器)

调度器

APScheduler支持的调度器方式如下,比较常用的为BlockingScheduler和BackgroundScheduler

  • BlockingScheduler:适用于调度程序是进程中唯一运行的进程,调用start函数会阻塞当前线程,不能立即返回。
  • BackgroundScheduler:适用于调度程序在应用程序的后台运行,调用start后主线程不会阻塞。
  • AsyncIOScheduler:适用于使用了asyncio模块的应用程序。
  • GeventScheduler:适用于使用gevent模块的应用程序。
  • TwistedScheduler:适用于构建Twisted的应用程序。
  • QtScheduler:适用于构建Qt的应用程序。

从前面的例子,我们可以看到,调度器可以操作任务(并为任务指定触发器、任务存储器和执行器)和监控任务。

scheduler.add_job(job_func, trigger='interval', args=[1], id='1', name='a test job', max_instances=10, jobstore='default', executor='default', seconds=10)

我们来详细看下各个部分

  • 调度器配置: 在add_job我们看到jobstore和executor都是default,APScheduler在定义调度器时可以指定不同的任务存储和执行器,以及初始的参数

      from pytz import utc
    
      from apscheduler.schedulers.background import BackgroundScheduler
      from apscheduler.jobstores.mongodb import MongoDBJobStore
      from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
      from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
    
      # 通过dict方式执行不同的jobstores、executors和默认的参数
      jobstores = {
         
         
          'mongo': MongoDBJobStore(),
          'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
      }
      executors = {
         
         
          'default': ThreadPoolExecutor(20),
          'processpool': ProcessPoolExecutor(5)
      }
      job_defaults = {
         
         
          'coalesce': False,
          'max_instances': 3
      }
      # 定义调度器
      scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)
    
      def job_func(job_id):
          print('job %s is runed at %s' % (job_id, datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
      # 添加任务
      scheduler.add_job(job_func, trigger='interval', args=[1], id='1', name='a test job', jobstore='default', executor='processpool', seconds=10)
      # 启动调度器
      scheduler.start()
    
  • 操作任务: 调度器可以增加,删除,暂停,恢复和修改任务。需要注意的是这里的操作只是对未执行的任务起作用,已经执行和正在执行的任务不受这些操作的影响。

    • add_job
         scheduler.add_job(job_func, trigger='interval', args=[1], id='1', name='a test job', max_instances=10, jobstore='default', executor='default', seconds=10)
      
    • remove_job: 通过任务唯一的id,删除的时候对应的任务存储器里记录也会删除
        scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
        scheduler.remove_job('my_job_id')
      
    • Pausing and resuming jobs: 暂停和重启任务
        scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
        scheduler.pause_job('my_job_id')
        scheduler.resume_job('my_job_id')
      
    • Modifying jobs: 修改任务的配置
        job = scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id', max_instances=10)
        # 修改任务的属性
        job.modify(max_instances=6, name='Alternate name')
        # 修改任务的触发器
        scheduler.reschedule_job('my_job_id', trigger='cron', minute='*/5')
      
  • 监控任务事件类型,比较常用的类型有:

    • EVENT_JOB_ERROR: 表示任务在执行过程的出现异常触发
    • EVENT_JOB_EXECUTED: 任务执行成功时
    • EVENT_JOB_MAX_INSTANCES: 调度器上执行的任务超过配置的参数时

         scheduler.add_listener(job_exception_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
      

参考文档:https://apscheduler.readthedocs.io/en/stable/userguide.html

相关实践学习
MongoDB数据库入门
MongoDB数据库入门实验。
快速掌握 MongoDB 数据库
本课程主要讲解MongoDB数据库的基本知识,包括MongoDB数据库的安装、配置、服务的启动、数据的CRUD操作函数使用、MongoDB索引的使用(唯一索引、地理索引、过期索引、全文索引等)、MapReduce操作实现、用户管理、Java对MongoDB的操作支持(基于2.x驱动与3.x驱动的完全讲解)。 通过学习此课程,读者将具备MongoDB数据库的开发能力,并且能够使用MongoDB进行项目开发。   相关的阿里云产品:云数据库 MongoDB版 云数据库MongoDB版支持ReplicaSet和Sharding两种部署架构,具备安全审计,时间点备份等多项企业能力。在互联网、物联网、游戏、金融等领域被广泛采用。 云数据库MongoDB版(ApsaraDB for MongoDB)完全兼容MongoDB协议,基于飞天分布式系统和高可靠存储引擎,提供多节点高可用架构、弹性扩容、容灾、备份回滚、性能优化等解决方案。 产品详情: https://www.aliyun.com/product/mongodb
目录
相关文章
|
1月前
|
JSON 监控 调度
局域网管理软件的自动化任务调度:Python 中的 APScheduler 库的应用
使用 Python 的 APScheduler 库可简化局域网管理中的自动化任务调度。APScheduler 是一个轻量级定时任务调度库,支持多种触发方式如间隔、时间、日期和 Cron 表达式。示例代码展示了如何创建每 10 秒执行一次的定时任务。在局域网管理场景中,可以利用 APScheduler 定期监控设备状态,当设备离线时自动提交数据到网站,提升管理效率。
100 0
|
1月前
|
调度 Python
Python任务调度神器:APScheduler使用详解
Python任务调度神器:APScheduler使用详解
119 3
|
1月前
|
存储 Linux 调度
太好用了!Python 定时任务调度框架 APScheduler 详解!
太好用了!Python 定时任务调度框架 APScheduler 详解!
197 0
|
1月前
|
Serverless 调度 Python
TemporalKit是一个用于Python的异步任务调度库
TemporalKit是一个用于Python的异步任务调度库
123 2
|
10月前
|
存储 Linux 调度
高效定时任务处理:深入学习Python中APScheduler库的奥秘
高效定时任务处理:深入学习Python中APScheduler库的奥秘
16790 7
高效定时任务处理:深入学习Python中APScheduler库的奥秘
|
JSON 网络协议 Shell
Python 基于Python及zookeeper实现简单分布式任务调度系统设计思路及核心代码实现 2
Python 基于Python及zookeeper实现简单分布式任务调度系统设计思路及核心代码实现
211 0
|
网络协议 Linux 测试技术
Python 基于Python及zookeeper实现简单分布式任务调度系统设计思路及核心代码实现 1
Python 基于Python及zookeeper实现简单分布式任务调度系统设计思路及核心代码实现
153 0
|
存储 监控 调度
运筹帷幄决胜千里,Python3.10原生协程asyncio工业级真实协程异步消费任务调度实践
我们一直都相信这样一种说法:协程是比多线程更高效的一种并发工作方式,它完全由程序本身所控制,也就是在用户态执行,协程避免了像线程切换那样产生的上下文切换,在性能方面得到了很大的提升。毫无疑问,这是颠扑不破的业界共识,是放之四海而皆准的真理。
运筹帷幄决胜千里,Python3.10原生协程asyncio工业级真实协程异步消费任务调度实践
|
存储 监控 NoSQL
日夕如是寒暑不间,基于Python3+Tornado6+APScheduler/Celery打造并发异步动态定时任务轮询服务
定时任务的典型落地场景在各行业中都很普遍,比如支付系统中,支付过程中因为网络或者其他因素导致出现掉单、卡单的情况,账单变成了“单边账”,这种情况对于支付用户来说,毫无疑问是灾难级别的体验,明明自己付了钱,扣了款,但是订单状态却未发生变化。所以,每一笔订单的支付任务流程中都需要一个定时轮询的备选方案,一旦支付中发生问题,定时轮询服务就可以及时发现和更正订单状态。
日夕如是寒暑不间,基于Python3+Tornado6+APScheduler/Celery打造并发异步动态定时任务轮询服务
|
自然语言处理 运维 调度
自动化运维?看看Python怎样完成自动任务调度⛵
本文讲解如何使用Python的schedule库完成自动化调度程序,通过代『发送邮件』这一代码实例展示了构建任务与任务调度的全过程。
2703 4
自动化运维?看看Python怎样完成自动任务调度⛵