import datetime
import os
import threading
import time

from module.logger import logger
from module.config.deep import deep_get
from module.notify import handle_notify


class CounterReachMaxCountException(Exception):
    """Raised by ``Counter.count_once()`` when the count reaches ``max_count``."""


class Counter:
    """Count events and raise once a configured maximum is reached."""

    def __init__(self, max_count):
        """
        Args:
            max_count: Count at which ``count_once()`` starts raising.
        """
        self.max_count = max_count
        self.current_count = 0

    def count_once(self):
        """
        Increase the count by one.

        Raises:
            CounterReachMaxCountException: If the count after the increase is
                greater than or equal to ``max_count``.
        """
        self.current_count += 1
        if self.current_count >= self.max_count:
            raise CounterReachMaxCountException()

    def reset(self):
        """Set the count back to zero."""
        self.current_count = 0


class SchedulerWatcher:
    """
    Watchdog for the scheduler.

    A daemon thread wakes up every 60 seconds and compares the current time
    with the deadline of the current task. Each overrun is counted and the
    deadline is pushed back by ``min_timedelta``. When the overrun counter
    reaches its maximum, the scheduler is assumed to be stuck: the user is
    notified (unless instance restart is enabled in the config) and the
    process is terminated with ``os._exit(-1)``.
    """

    # Shared instance, created lazily by get_instance().
    instance = None

    def __init__(self):
        # Object exposing `.config.data` and `.config_name`, see set_alas_obj().
        self.alas_obj = None
        # Watchdog thread, None until start_watch() is called.
        self.watcher: threading.Thread = None
        # Counter of deadline overruns of the current task.
        self.warning_count = None
        # Deadline of the current task, None when no task is tracked.
        self.warning_time = None
        self.current_task = None
        # Time granted to a task per deadline, a datetime.timedelta.
        self.min_timedelta = None

    def set_alas_obj(self, alas_obj):
        """
        Args:
            alas_obj: Object whose `.config.data` and `.config_name` are read
                when the scheduler is considered stuck.
        """
        self.alas_obj = alas_obj

    def start_watch(self, min_timedelta, max_warning_count):
        """
        Start the watchdog thread, do nothing if it is already running.

        Args:
            min_timedelta: Minutes granted to a task before an overrun is counted.
            max_warning_count: Overrun count at which the scheduler is
                considered stuck.
        """
        if self.watcher is not None and self.watcher.is_alive():
            return

        # Settings are assigned before `self.watcher`, so any method that sees
        # a watcher can rely on them being present.
        self.min_timedelta = datetime.timedelta(minutes=min_timedelta)
        self.warning_count = Counter(max_warning_count)
        logger.info("Scheduler watcher start")
        self.watcher = threading.Thread(target=self.watcher_thread, daemon=True)
        self.watcher.start()

    def request_extend_task_deadline(self):
        """
        Push the deadline of the current task back to now + ``min_timedelta``.

        The deadline is extended if the task has already overrun at least once
        (the overrun counter is reset as well), or if less than 3 minutes are
        left. Does nothing if the watcher is not started or no deadline is
        being tracked.
        """
        if self.watcher is None:
            return
        deadline = self.warning_time
        if deadline is None:
            # No deadline yet (before the first switch_task() or after
            # no_task()). Subtracting from None would raise TypeError.
            return

        now = datetime.datetime.now()
        if self.warning_count.current_count > 0:
            self.warning_count.reset()
        elif deadline - now >= datetime.timedelta(minutes=3):
            # Plenty of time left, keep the current deadline.
            return

        self.warning_time = now + self.min_timedelta
        logger.info("Current task requests to extend the task deadline")

    def no_task(self):
        """Tell the watcher that no task is running, which pauses the checks."""
        if self.watcher is None:
            return
        self.current_task = None
        self.warning_time = None

    def switch_task(self, task):
        """
        Start tracking a new task with a fresh deadline and overrun counter.

        Args:
            task: The task now running, only used in the notification text.
        """
        if self.watcher is None:
            return
        self.current_task = task
        self.warning_count.reset()
        self.warning_time = datetime.datetime.now() + self.min_timedelta

    def watcher_thread(self):
        """
        Body of the watchdog thread, never returns.

        Every 60 seconds, if a task is tracked and its deadline has passed,
        count one overrun and grant another ``min_timedelta``. Once the
        counter reaches its maximum the process is terminated.
        """
        while True:
            time.sleep(60)

            # Read the shared state once: no_task() may clear these attributes
            # from another thread between the None check and the comparison.
            deadline = self.warning_time
            task = self.current_task
            if deadline is None or task is None:
                continue

            now = datetime.datetime.now()
            if now <= deadline:
                continue

            try:
                self.warning_count.count_once()
            except CounterReachMaxCountException:
                self._terminate_stuck_scheduler(task)
            else:
                logger.warning("Current task reached time limit once")
                self.warning_time = now + self.min_timedelta

    def _terminate_stuck_scheduler(self, task):
        """
        Notify the user if instance restart is disabled, then kill the process.

        Args:
            task: The task that reached its final time limit.
        """
        logger.error("Current task reached final time limit, assuming the scheduler is stuck")
        try:
            config_data = self.alas_obj.config.data
            is_instance_restart_enabled = deep_get(config_data, "Restart.InstanceRestart.Enabled", False)
            if not is_instance_restart_enabled:
                logger.error("Instance restart disabled, notify user")
                err_push_config = deep_get(config_data, "Alas.Error.OnePushConfig")
                handle_notify(
                    err_push_config,
                    title=f"Alas <{self.alas_obj.config_name}>: Scheduler Stuck",
                    content=f"Task {task} reached final time limit, assuming the scheduler is stuck",
                )
        except Exception as e:
            # Notification is best effort. A failure here must not end this
            # thread before the exit below, or the stuck process would be
            # left running without a watchdog.
            logger.error(f"Failed to notify user about the stuck scheduler: {e}")
        # Exit immediately from this non-main thread, without cleanup handlers.
        os._exit(-1)

    @staticmethod
    def get_instance() -> "SchedulerWatcher":
        """
        Returns:
            SchedulerWatcher: The shared instance, created on first call.
        """
        if SchedulerWatcher.instance is None:
            SchedulerWatcher.instance = SchedulerWatcher()
        return SchedulerWatcher.instance


def request_extend_task_deadline():
    """Call `request_extend_task_deadline()` on the shared SchedulerWatcher."""
    SchedulerWatcher.get_instance().request_extend_task_deadline()


def no_task():
    """Call `no_task()` on the shared SchedulerWatcher."""
    SchedulerWatcher.get_instance().no_task()