1.APScheduler简介
APScheduler是一个Python库,可让您安排稍后执行的Python代码,是一套任务调度框架,可以用来做定时任务控制器,可以添加删除任务。如果将作业存储在数据库中,它们也将在调度程序重新启动后继续运行并保持其状态。重新启动调度程序后,它将运行它应该在脱机时运行的所有作业。
😁我这里先给出一个写调度api的模版吧
步骤一:写好自己运行的函数 def fuction()
步骤二:创建flask路由,在路由下定义一个需要调度的函数def job_function():,把之前的def fuction()函数放到里面
步骤三:创建调度任务函数,def task(),这里需要配置任务调度的方式等参数
步骤四:运行任务 task(),主要写在主函数外面
步骤五:主函数运行api接口,app.run()
from flask import Flask from flask_apscheduler import APScheduler import datetime def fuction() print("showmaker") app = Flask(__name__) @app.route("/test", methods=['GET','POST']) def job_function(): fuction() def task(): scheduler = APScheduler() scheduler.init_app(app) # 定时任务的格式 scheduler.add_job(func=job_function, trigger='interval',seconds=3600, id='my_job_id') scheduler.start() # 写在main里面,IIS不会运行 task() if __name__ == "__main__": app.run(debug=False, port="5005")
2.APScheduler调度方式
APScheduler有三种可以调度任务的方式:
我们的目的就是通过api接口,在终端一直运行这个接口进程,让它自己去调度我们的程序,这里的程序也就是任务。
- 可以选择调度的任务,开始和结束的时间
- 基于一个时间区间,间隔的执行任务
- 一次性任务执行
3.APScheduler组件
知道了我们的任务有哪些调度方式,现在了解一下APScheduler的组件:
- triggers: 任务触发器组件,提供任务触发方式
- job stores: 任务商店组件,提供任务保存方式
- executors:任务调度组件,提供任务调度方式
- schedulers: 任务调度组件,提供任务工作方式
3.1 triggers
triggers: 支持三种任务触发方式
date:
固定日期触发器,任务只运行一次,运行完毕自动清除;若错过指定运行时间,任务不会被创建
参数 | 说明 |
run_date | 任务运行日期 |
timezone | 时区 |
scheduler.add_job(func=job_function, trigger='date',run_date='2021-10-27 13:00:00', id='my_job_id')
interval:
时间间隔触发器,每个一定时间间隔执行一次。
参数 | 说明 |
weeks (int) | 间隔几周 |
days (int) | 间隔几天 |
hours (int) | 间隔几小时 |
minutes (int) | 间隔几分钟 |
seconds (int) | 间隔多少秒 |
start_date (datetime 或 str) | 开始日期 |
end_date (datetime 或 str) | 结束日期 |
scheduler.add_job(func=job_function, trigger='interval',seconds=3600, id='my_job_id')
cron
cron风格的任务触发,这个不常用就不介绍了,我懒
3.2 job stores:支持四种任务存储方式
memory:默认配置任务存在内存中
mongdb:支持文档数据库存储
sqlalchemy:支持关系数据库存储
redis:支持键值对数据库存储
这里如果你的数据库是mysql,我推荐使用sqlalchemy作为数据库,用过create_engine来连接数据库实现数据库操作,比如说删除数据记录,写入数据记录。这里是我自己的例子
from flask import Flask from flask_apscheduler import APScheduler import datetime import pandas as pd from sqlalchemy import create_engine from sqlalchemy import MetaData,Table def delete(): engine = create_engine('mysql+pymysql://用户名:密码@ip:端口号/数据库?charset=gbk') meta = MetaData(bind=engine) tb_1 = Table('数据表名', meta, autoload=True, autoload_with=engine) tb_2 = Table('数据表名', meta, autoload=True, autoload_with=engine) dlt_1=tb_1.delete() dlt_2=tb_2.delete() conn = engine.connect() # 执行delete操作 conn.execute(dlt_1) conn.execute(dlt_2) conn.close() def write(t): r1=pd.DataFrame() r3=pd.DataFrame() engine = create_engine('mysql+pymysql://用户名:密码@ip:端口号/数据库?charset=gbk') r1.to_sql('数据表名', con=engine, if_exists='append', index=False) r3.to_sql('数据表名', con=engine, if_exists='append', index=False) def write_log(buf): print(buf) with open('test.txt', 'a') as f: f.write(buf + "\n") app = Flask(__name__) @app.route("/test", methods=['GET','POST']) def job_function(): now_time=datetime.datetime.now() t=now_time.strftime('%Y-%m-%d') write_log("update time:"+t) delete() write_log("delete data") write(t) write_log("write data") def task(): scheduler = APScheduler() scheduler.init_app(app) # 定时任务,每隔10s执行1次 scheduler.add_job(func=job_function, trigger='interval',seconds=60, id='my_job_id') scheduler.start() # 写在main里面,IIS不会运行 task() if __name__ == "__main__": app.run(debug=False, port="8888")
最后可以在test.txt查看自己的调用的时间
3.3 schedulers:
调度器主要分三种,一种独立运行的,一种是后台运行的,最后一种是配合其它程序使用
BlockingScheduler: 当这个调度器是你应用中 唯一要运行 的东西时使用
BackgroundScheduler: 当不运行其它框架 的时候使用,并使你的任务在 后台运行
AsyncIOScheduler: 当你的程序是 异步IO模型 的时候使用
GeventScheduler: 和 gevent 框架配套使用
TornadoScheduler: 和 tornado 框架配套使用
TwistedScheduler: 和 Twisted 框架配套使用
QtScheduler: 开发 qt 应用的时候使用