import datetime
import os
import threading
import time
from typing import Optional

from module.counter import MaxCounter, CounterReachMaxCountException
from module.logger import logger
from module.notify import handle_notify
from module.config.config import AzurLaneConfig


class SchedulerWatcher:
    """Watchdog that terminates the process when a scheduler task overruns.

    A daemon thread wakes every 60 seconds and compares the current time with
    ``warning_time``. Each time the deadline has passed, ``warning_count`` is
    incremented once and the deadline is pushed back by ``min_timedelta``.
    When ``MaxCounter.count_once()`` raises ``CounterReachMaxCountException``,
    the scheduler is assumed to be stuck. If instance restart is disabled in
    the config, the user is notified. In all cases the process then exits via
    ``os._exit(-1)``. Presumably an external supervisor restarts the instance
    when restart is enabled (TODO confirm).

    State is written by the scheduler thread (``switch_task``, ``no_task``,
    ``request_extend_task_deadline``) and read by the watcher thread without a
    lock. The watcher therefore snapshots shared fields before using them.
    """

    # Lazily created singleton; see get_instance().
    instance = None
    # In request_extend_task_deadline(), a deadline is only pushed back when
    # it is closer than this, unless the task already triggered a warning.
    extend_threshold = datetime.timedelta(minutes=3)

    def __init__(self):
        self.alas_obj = None
        self.config: Optional[AzurLaneConfig] = None
        self.watcher: Optional[threading.Thread] = None
        # MaxCounter created by start_watch(); counts overruns of the current task.
        self.warning_count: Optional[MaxCounter] = None
        # Deadline of the current task; None when there is no task.
        self.warning_time: Optional[datetime.datetime] = None
        self.current_task = None
        # Time budget per deadline, set by start_watch().
        self.min_timedelta: Optional[datetime.timedelta] = None

    def set_alas_obj(self, alas_obj):
        """Attach the Alas object whose ``config`` drives restart/notify settings."""
        self.alas_obj = alas_obj
        self.config = alas_obj.config

    def start_watch(self, min_timedelta, max_warning_count):
        """Start the watcher thread unless one is already alive.

        Args:
            min_timedelta: Minutes granted to a task per deadline.
            max_warning_count: Limit passed to ``MaxCounter``. Once
                ``count_once()`` raises, the scheduler is treated as stuck.
        """
        if self.watcher is not None and self.watcher.is_alive():
            return
        self.min_timedelta = datetime.timedelta(minutes=min_timedelta)
        self.warning_count = MaxCounter(max_warning_count)
        logger.info("Scheduler watcher start")
        self.watcher = threading.Thread(target=self.watcher_thread, daemon=True)
        self.watcher.start()

    def request_extend_task_deadline(self):
        """Give the current task a fresh ``min_timedelta`` budget when appropriate.

        If the task has already triggered at least one warning, the warning
        counter is reset and the deadline is always extended. Otherwise the
        deadline is only extended when it is closer than ``extend_threshold``.
        Does nothing when the watcher was never started or no deadline is set.
        """
        if self.watcher is None:
            return
        now = datetime.datetime.now()
        if self.warning_count.current_count > 0:
            self.warning_count.reset()
            self.warning_time = now + self.min_timedelta
            logger.info("Current task requests to extend the task deadline")
            return
        warning_time = self.warning_time
        if warning_time is None:
            # no_task() cleared the deadline; `None - now` would raise TypeError.
            return
        if warning_time - now < self.extend_threshold:
            self.warning_time = now + self.min_timedelta
            logger.info("Current task requests to extend the task deadline")

    def no_task(self):
        """Mark the scheduler as idle so the watcher stops checking deadlines."""
        if self.watcher is None:
            return
        self.current_task = None
        self.warning_time = None

    def switch_task(self, task):
        """Start timing ``task``: reset warnings and set a fresh deadline."""
        if self.watcher is None:
            return
        self.current_task = task
        self.warning_count.reset()
        self.warning_time = datetime.datetime.now() + self.min_timedelta

    def watcher_thread(self):
        """Thread body: check the deadline every 60 s and escalate overruns."""
        while True:
            time.sleep(60)
            # Snapshot shared state once. The scheduler thread may call
            # no_task() between the None check and the comparison below, which
            # would otherwise evaluate `now > None` and kill this thread.
            warning_time = self.warning_time
            task = self.current_task
            if warning_time is None or task is None:
                continue
            now = datetime.datetime.now()
            if now <= warning_time:
                continue
            try:
                self.warning_count.count_once()
            except CounterReachMaxCountException:
                self._handle_scheduler_stuck(task)
            else:
                logger.warning("Current task reached time limit once")
                self.warning_time = now + self.min_timedelta

    def _handle_scheduler_stuck(self, task):
        """Notify the user (if restart is disabled) and terminate the process."""
        logger.error("Current task reached final time limit, assuming the scheduler is stuck")
        try:
            if not self.config.full_config.Restart_InstanceRestart_Enable:
                logger.error("Instance restart disabled, notify user")
                handle_notify(
                    self.config.full_config.Alas_Error_OnePushConfig,
                    title=f"Alas <{self.alas_obj.config_name}>: Scheduler Stuck",
                    content=f"Task {task} reached final time limit, assuming the scheduler is stuck",
                )
        except Exception as e:
            # Notification is best-effort; it must never prevent the exit below,
            # otherwise this thread dies and the stuck process lives on.
            logger.error(f"Failed to notify scheduler stuck: {e}")
        os._exit(-1)

    @staticmethod
    def get_instance() -> "SchedulerWatcher":
        """Return the process-wide watcher, creating it on first use."""
        if SchedulerWatcher.instance is None:
            SchedulerWatcher.instance = SchedulerWatcher()
        return SchedulerWatcher.instance


def request_extend_task_deadline():
    """Module-level shortcut for ``SchedulerWatcher.request_extend_task_deadline``."""
    ins = SchedulerWatcher.get_instance()
    ins.request_extend_task_deadline()


def no_task():
    """Module-level shortcut for ``SchedulerWatcher.no_task``."""
    ins = SchedulerWatcher.get_instance()
    ins.no_task()