一. 介绍及基本案例
APScheduler用起来十分方便,提供了基于日期,时间间隔及crontab类型的任务。为我们提供了构建专用调度器或者调度服务的基础模块。
APScheduler不是一个守护进程或服务,它自身不带有任何命令行工具。它主要在现有的程序中运行,
安装:pip3 install apscheduler
一个简单的任务间隔实例
# -*- coding: utf-8 -*-
import os
from datetime import datetime
from apscheduler.schedulers.blocking import BlockingScheduler
def tick():
print('Tick! The time is: %s' % datetime.now())
if __name__ == '__main__':
scheduler = BlockingScheduler()
scheduler.add_job(tick, 'interval', seconds=3)
print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C '))
try:
scheduler.start()
except (KeyboardInterrupt, SystemExit):
pass
二. cron类的定时任务
文档地址:https://apscheduler.readthedocs.io/en/latest/modules/triggers/cron.html
Parameters:
year (int|str) – 4-digit year
month (int|str) – month (1-12)
day (int|str) – day of month (1-31)
week (int|str) – ISO week (1-53)
day_of_week (int|str) – number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun)
hour (int|str) – hour (0-23)
minute (int|str) – minute (0-59)
second (int|str) – second (0-59)
start_date (datetime|str) – earliest possible date/time to trigger on (inclusive)
end_date (datetime|str) – latest possible date/time to trigger on (inclusive)
timezone (datetime.tzinfo|str) – time zone to use for the date/time calculations (defaults to scheduler timezone)
jitter (int|None) – delay the job execution by jitter seconds at most
# -*- coding: utf-8 -*-
from datetime import datetime
import os
from apscheduler.schedulers.blocking import BlockingScheduler
def tick():
print('Tick! The time is: %s' % datetime.now())
if __name__ == '__main__':
scheduler = BlockingScheduler()
# 每天8点40执行任务
scheduler.add_job(tick, 'cron', hour=18, minute=40)
print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C '))
try:
scheduler.start()
except (KeyboardInterrupt, SystemExit):
pass
任务定时参考:
minute = '*/3' # 表示每5分钟执行一次
hour = '19-21', minute = '23' # 表示19:23, 20:23, 21:23 各执行一次任务
三. 调度器事件监听
APScheduler监听代码异常并记录到日志文件:
# -*- coding: utf-8 -*-
# File Name: listen_event.py
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR
import datetime
import logging
logging.basicConfig(level=logging.INFO,
format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
filename='log1.txt',
filemode='a')
def aps_test(x):
print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x)
def date_test(x):
print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x)
print(1/0)
def my_listener(event):
if event.exception:
print('任务出错了!!!!!!')
else:
print('任务照常运行...')
scheduler = BlockingScheduler()
scheduler.add_job(func=date_test, args=('一次性任务,会出错',), next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=15), id='date_task')
scheduler.add_job(func=aps_test, args=('循环任务',), trigger='interval', seconds=3, id='interval_task')
scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
scheduler._logger = logging
scheduler.start()
四. 配置调度器并使用
4.1 使用默认的作业存储器
# -*- coding: utf-8 -*-
from apscheduler.schedulers.blocking import BlockingScheduler
import datetime
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
def my_job(id='my_job'):
print(id,'-->',datetime.datetime.now())
jobstores = {
'default': MemoryJobStore()
}
executors = {
'default': ThreadPoolExecutor(20),
'processpool': ProcessPoolExecutor(10)
}
job_defaults = {
'coalesce': False,
'max_instances': 3
}
scheduler = BlockingScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults)
scheduler.add_job(my_job, args=['job_interval',],id='job_interval',trigger='interval', seconds=5,replace_existing=True)
scheduler.add_job(my_job, args=['job_cron',],id='job_cron',trigger='cron',month='4-8,11-12',hour='7-11', second='*/10',\
end_date='2020-05-30')
scheduler.add_job(my_job, args=['job_once_now',],id='job_once_now')
scheduler.add_job(my_job, args=['job_date_once',],id='job_date_once',trigger='date',run_date='2018-04-05 07:48:05')
try:
scheduler.start()
except SystemExit:
print('exit')
exit()
4.2 使用数据库作为作业存储器
# -*- coding: utf-8 -*-
from apscheduler.schedulers.blocking import BlockingScheduler
import datetime
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
def my_job(id='my_job'):
print (id,'-->',datetime.datetime.now())
jobstores = {
'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {
'default': ThreadPoolExecutor(20),
'processpool': ProcessPoolExecutor(10)
}
job_defaults = {
'coalesce': False,
'max_instances': 3
}
scheduler = BlockingScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults)
scheduler.add_job(my_job, args=['job_interval',],id='job_interval',trigger='interval', seconds=5,replace_existing=True)
scheduler.add_job(my_job, args=['job_cron',],id='job_cron',trigger='cron',month='4-8,11-12',hour='7-11', second='*/10',\
end_date='2018-05-30')
scheduler.add_job(my_job, args=['job_once_now',],id='job_once_now')
scheduler.add_job(my_job, args=['job_date_once',],id='job_date_once',trigger='date',run_date='2018-04-05 07:48:05')
try:
scheduler.start()
except SystemExit:
print('exit')
exit()