安装
$ pip install apscheduler
体系结构
schedulers(调度器) - BlockingScheduler : 主线程中运行,阻塞线程 - BackgroundScheduler : 后台线程中运行,不会阻塞线程 - AsyncIOScheduler - GeventScheduler - TornadoScheduler - TwistedScheduler - QtScheduler triggers(触发器) - date 一次性任务 - run_date (datetime 或 str) 作业的运行日期或时间 - timezone (datetime.tzinfo 或 str) 指定时区 - interval 循环任务 - weeks (int) 间隔几周 - days (int) 间隔几天 - hours (int) 间隔几小时 - minutes (int) 间隔几分钟 - seconds (int) 间隔多少秒 - start_date (datetime 或 str) 开始日期 - end_date (datetime 或 str) 结束日期 - timezone (datetime.tzinfo 或str) 时区 - job stores(作业存储器) - cron 定时任务 - year (int 或 str) 年,4位数字 - month (int 或 str) 月 (范围1-12) - day (int 或 str) 日 (范围1-31 - week (int 或 str) 周 (范围1-53) - day_of_week (int 或 str) 周内第几天或者星期几 (范围0-6 或者 mon,tue,wed,thu,fri,sat,sun) - hour (int 或 str) 时 (范围0-23) - minute (int 或 str) 分 (范围0-59) - second (int 或 str) 秒 (范围0-59) - start_date (datetime 或 str) 最早开始日期(包含) - end_date (datetime 或 str) 最晚结束时间(包含) - timezone (datetime.tzinfo 或str) 指定时区 job stores(作业存储器) - 添加 job - add_job() - scheduled_job() - 移除 job - remove_job() - job.remove() - 修改 job - modify_job() - 获取 job 列表 - get_jobs() - 关闭 job - scheduler.shutdown(wait=false) executors(执行器) - ProcessPoolExecutor - ThreadPoolExecutor
简单示例
from apscheduler.schedulers.blocking import BlockingScheduler scheduler = BlockingScheduler() # 方式一:interval 间隔任务 scheduler.add_job(task_name, 'interval', seconds=5) # 方式二:cron 定时任务 */3 0-10 * * * scheduler.add_job(main, 'cron', minute="*/3", hour="0-10", day="*", month="*",day_of_week="*") # 方式三:date 定时任务(执行一次) scheduler.add_job(my_job, 'date', run_date=datetime(2009, 11, 6, 16, 30, 5)) scheduler.start()
说明:
task_name 需要修改为自定义任务函数名称
其他示例
from datetime import datetime from apscheduler.schedulers.blocking import BlockingScheduler scheduler = BlockingScheduler() def jobA(): print("{}: {}".format("job A", datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) def jobC(): print("{}: {}".format("job C", datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) # 添加作业, 方式一,可指定job_id scheduler.add_job(jobA, 'interval', seconds=5) scheduler.add_job(jobC, 'interval', minutes=2, id="job_c") # 添加作业, 方式二 @scheduler.scheduled_job('interval', seconds=5) def jobB(): print("{}: {}".format("job B", datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) # 移除作业 scheduler.remove_job(job_id="job_c") scheduler.start()
关闭日志
如果日志太多,看不到自己的重要日志,那么就关闭apscheduler的日志
import logging logging.basicConfig() logging.getLogger('apscheduler').setLevel(logging.ERROR)
参考
定时任务框架APScheduler学习详解
apscheduler的使用
Advanced Python Scheduler