【Python】300行代码实现crontab定时器功能
熟悉Linux的都知道在Linux下有一个crontab的定时任务,可以很方便的进行各种定时、计划任务的执行。有时候写代码也需要用到定时器业务,因此我使用Python实现了一个类似的定时器模块,可以很方便的做定时业务,使用例子如下:
具体Timer模块的代码也贴在下面,提供了与crontab定时任务类似的功能实现,第一次贴全部代码,格式弄的有点乱。
# -*- coding:utf-8 -*- import re import uuid import time import datetime import threading class TimerInfo: """定时器配置""" def __init__(self, timer_info, call_able, call_args): """创建定时器配置""" self.minutes = (-1, -1) self.hours = (-1, -1) self.day_of_month = (-1, -1) self.month = (-1, -1) self.day_of_week = (-1, -1) self.enable = False # 默认不可用 self.last_check = None self._parse(timer_info) self.call_able = call_able self.call_args = call_args self.call_condition = None self.cur_condition = None def _parse(self, timer_info): """解析数据""" try: m, h, dfm, month, dfw = re.findall(r"\S+", timer_info) self.minutes = self._parse_part(m) self.hours = self._parse_part(h) self.day_of_month = self._parse_part(dfm) self.month = self._parse_part(month) self.day_of_week = self._parse_part(dfw) self.enable = True self.last_check = datetime.datetime.now() except Exception as e: print("invalid timer config : %s, error:%s" % (timer_info, e)) return def _check_condition(self, condition, cur_value, delay_param, delay_transform=1): """检查条件""" if condition[0] < 0: return True if condition[0] == 0: if not self.check_target_value(condition[1], cur_value): return False # 每隔多久执行一次 */N 每隔N分钟执行一次 elif condition[0] == 1: # 存在区间 if isinstance(condition[1], list): if not self.check_range_value(cur_value, condition[1][0], condition[1][1]): return False if not self.check_delay_value(delay_param, condition[1][2] * delay_transform): return False # 间隔 elif not self.check_delay_value(delay_param, condition[1] * delay_transform): return False # 多个M,N,X 在M,N,X时间执行 elif condition[0] == 2: if not self.check_multi_value(cur_value, condition[1]): return False # 区间A-B elif condition[0] == 3: if not self.check_range_value(cur_value, condition[1][0], condition[1][1]): return False # 异常 else: return False # 添加满足的条件 self.cur_condition += "%s" % cur_value return True def on_timer(self, date_time): """执行检查""" if not self.enable: return False # 计算日期差值 sub_time = date_time - self.last_check # 创建一个添加key,执行检查可能会成功,但运行过就不运行 self.cur_condition = "" # 分 if not self._check_condition(self.minutes, date_time.minute, sub_time.seconds, 60): self.call_condition = None return False # 时 if not self._check_condition(self.hours, date_time.hour, sub_time.seconds, 3600): self.call_condition = None return False # 日 if not self._check_condition(self.day_of_month, date_time.day, sub_time.days, 1): self.call_condition = None return False # 月 if not self._check_condition(self.month, date_time.month, self._sub_months(self.last_check, date_time), 1): self.call_condition = None return False # 周 if not self._check_condition(self.day_of_week, date_time.weekday(), sub_time.days/ 7, 1): self.call_condition = None return False # 满足条件则更新检查时间 self.last_check = date_time