# 只有满足条件才执行 if self.call_condition is None or self.call_condition != self.cur_condition: self.call_condition = self.cur_condition return True return False def call(self): """执行定时回调""" try: self.call_able(*self.call_args) except Exception as e: print("call", self.call_able, "error:", e) @staticmethod def check_target_value(src_value, des_value): """检查指定值""" return src_value == des_value @staticmethod def check_delay_value(src_value, des_value): """检查每隔多久""" return src_value > des_value: @staticmethod def check_range_value(src_value, min_value, max_value): """检查区间""" return src_value >= min_value and src_value <= max_value @staticmethod def check_multi_value(src_value, values): """检查多个值""" return src_value in values @staticmethod def _sub_months(st, ed): return (ed.year - st.year)*12 + (ed.month - st.month) @staticmethod def _parse_part(value): """解析单个数据""" value = value.strip() if value == "*": return -1, -1 # 指定时间一次 if value.isdigit(): return 0, int(value, 10) # */v 每隔多久一次 fraction_idx = value.find("/") if fraction_idx >= 0 : denominator = value[fraction_idx + 1:] if not denominator.isdigit(): return -1, -1 if value.startswith("*"): return 1, int(denominator, 10) else: # 特殊区间A-B/N 每个多久,且在区间A,B内 range_idx = value.find("-") if range_idx < 0: return -1, -1 # 区间 + 间隔 values = [] range_values = value[:fraction_idx].split("-") if len(range_values) != 2: return -1, -1 # 必须是数字,且只存在2个值 if range_values[0].isdigit() and range_values[1].isdigit(): values.append(int(range_values[0], 10)) values.append(int(range_values[1], 10)) values.append(int(denominator, 10)) return 1, values # 多个时间点 A,B,...,Z dom_idx = value.find(",") if dom_idx >= 0: values = [] for v in value.split(","): if v.isdigit(): values.append(int(v, 10)) return 2, values # 区间A-B range_idx = value.find("-") if range_idx >= 0: values = [] range_values = value.split("-") if len(range_values) != 2: return -1, -1 # 必须是数字,且只存在2个值 if range_values[0].isdigit() and range_values[1].isdigit(): values.append(int(range_values[0],10)) values.append(int(range_values[1],10)) return 3, values return -1, -1 return -1, -1 class Timer: def __init__(self, not_thread=False): """创建定时器对象""" self._thread = None self._timer_data = {} if not not_thread: self._thread = threading.Thread(target=self._update, args=()) self._thread.start() def register(self, time_fmt, call_able, *args): """注册定时器""" timer_data = TimerInfo(time_fmt, call_able, args) if not timer_data.enable: return timer_id = uuid.uuid4() self._timer_data[timer_id] = timer_data return timer_id def _update(self): """内部更新""" while self._thread: self.update(datetime.datetime.now()) time.sleep(5) def update(self, date_time): """外部更新使用""" if not isinstance(date_time, datetime.datetime): raise TypeError("date_time must be datetime") remove_list = [] for tid, node in self._timer_data.items(): if not node.enable: remove_list.append(tid) continue if node.on_timer(date_time): node.call() # 删除已标记删除的定时器列表 for tid in remove_list: del self._timer_data[tid] def remove(self, timer_id): """移除定时器""" if timer_id in self._timer_data: # 只标记 self._timer_data[timer_id].enable = False