import datetime
import os
import threading
import time

from module.counter import MaxCounter, CounterReachMaxCountException
from module.logger import logger
from module.notify import handle_notify


class SchedulerWatcher:
    """Background watchdog that terminates the process when a task overruns.

    A daemon thread wakes up every 60 seconds and compares the current time
    with ``warning_time``. Each time that deadline has passed,
    ``warning_count.count_once()`` is called and the deadline is pushed back
    by ``min_timedelta``. When ``count_once()`` raises
    ``CounterReachMaxCountException``, the scheduler is assumed to be stuck:
    if instance restart is disabled the user is notified, and then the
    process exits via ``os._exit(-1)``.

    Obtain the shared instance through :meth:`get_instance`.
    """

    instance = None

    def __init__(self):
        self.alas_obj = None
        self.config = None
        self.watcher: threading.Thread = None
        self.warning_count = None  # MaxCounter, created in start_watch()
        self.warning_time = None  # datetime after which a warning is counted
        self.current_task = None
        self.min_timedelta = None  # datetime.timedelta, created in start_watch()

    def set_alas_config(self, config):
        """Store the config object read by the watchdog thread."""
        self.config = config

    def start_watch(self, min_timedelta, max_warning_count):
        """Start the watchdog thread unless one is already running.

        Args:
            min_timedelta: Minutes granted to a task before each warning.
            max_warning_count: Limit handed to ``MaxCounter`` for the number of
                warnings tolerated before the scheduler is considered stuck.
        """
        if self.watcher is not None and self.watcher.is_alive():
            return
        self.min_timedelta = datetime.timedelta(minutes=min_timedelta)
        self.warning_count = MaxCounter(max_warning_count)
        logger.info("Scheduler watcher start")
        self.watcher = threading.Thread(target=self.watcher_thread, daemon=True)
        self.watcher.start()

    def request_extend_task_deadline(self):
        """Push the current task's deadline back by ``min_timedelta``.

        If a warning has already been counted, the counter is reset and the
        deadline is extended unconditionally. Otherwise the deadline is only
        extended when less than 3 minutes remain. Does nothing when the
        watcher was never started, or when no deadline is set (after
        ``no_task()``).
        """
        if self.watcher is None:
            return
        now = datetime.datetime.now()
        if self.warning_count.current_count > 0:
            self.warning_count.reset()
            self.warning_time = now + self.min_timedelta
            logger.info("Current task requests to extend the task deadline")
            return
        warning_time = self.warning_time
        if warning_time is None:
            # no_task() cleared the deadline; subtracting from None would raise.
            return
        if warning_time - now < datetime.timedelta(minutes=3):
            self.warning_time = now + self.min_timedelta
            logger.info("Current task requests to extend the task deadline")

    def no_task(self):
        """Stop timing: clear the current task and its deadline."""
        if self.watcher is None:
            return
        self.current_task = None
        self.warning_time = None

    def switch_task(self, task):
        """Start timing ``task`` with a fresh counter and deadline."""
        if self.watcher is None:
            return
        self.current_task = task
        self.warning_count.reset()
        self.warning_time = datetime.datetime.now() + self.min_timedelta

    def watcher_thread(self):
        """Thread body: check the deadline every 60 seconds, forever."""
        while True:
            time.sleep(60)
            # Snapshot shared state once. The scheduler thread may call
            # no_task() between a check and a later read, and comparing a
            # datetime with None would kill this watchdog thread.
            warning_time = self.warning_time
            task = self.current_task
            if warning_time is None or task is None:
                continue
            now = datetime.datetime.now()
            if now <= warning_time:
                continue
            try:
                self.warning_count.count_once()
                logger.warning("Current task reached time limit once")
                self.warning_time = now + self.min_timedelta
            except CounterReachMaxCountException:
                logger.error("Current task reached final time limit, assuming the scheduler is stuck")
                if not self.config.full_config.Restart_InstanceRestart_Enable:
                    logger.error("Instance restart disabled, notify user")
                    self._notify_stuck(task)
                os._exit(-1)

    def _notify_stuck(self, task):
        """Best-effort push notification that the scheduler appears stuck."""
        # alas_obj is never assigned in this module; reading
        # self.alas_obj.config_name directly raised AttributeError and killed
        # the thread before os._exit() ran. Fall back to the config object.
        config_name = getattr(self.alas_obj, "config_name", None)
        if config_name is None:
            config_name = getattr(self.config, "config_name", None)
        try:
            handle_notify(
                self.config.full_config.Alas_Error_OnePushConfig,
                title=f"Alas <{config_name}>: Scheduler Stuck",
                content=f"Task {task} reached final time limit, assuming the scheduler is stuck",
            )
        except Exception as e:
            # A notification failure must never prevent the process from exiting.
            logger.error(f"Failed to send scheduler stuck notification: {e}")

    def is_alive(self):
        """Return True if the watchdog thread has been started and is running."""
        if self.watcher is None:
            return False
        return self.watcher.is_alive()

    @staticmethod
    def get_instance() -> "SchedulerWatcher":
        """Return the shared SchedulerWatcher, creating it on first use."""
        if SchedulerWatcher.instance is None:
            SchedulerWatcher.instance = SchedulerWatcher()
        return SchedulerWatcher.instance


class AzurLaneSchedulerWatcher:
    """Config-driven facade over the shared SchedulerWatcher.

    When ``Restart_SchedulerWatcher_Enable`` is off, every method is a no-op.
    """

    def __init__(self, config):
        full_config = config.full_config
        if full_config.Restart_SchedulerWatcher_Enable:
            self.watcher: SchedulerWatcher = SchedulerWatcher.get_instance()
            if not self.watcher.is_alive():
                self.watcher.set_alas_config(config)
                self.watcher.start_watch(
                    min_timedelta=full_config.Restart_SchedulerWatcher_TimeDelta,
                    max_warning_count=full_config.Restart_SchedulerWatcher_WarningCount,
                )
        else:
            self.watcher = None

    def request_extend_task_deadline(self):
        """Forward to SchedulerWatcher.request_extend_task_deadline if enabled."""
        if self.watcher is not None:
            self.watcher.request_extend_task_deadline()

    def switch_task(self, task):
        """Forward to SchedulerWatcher.switch_task if enabled."""
        if self.watcher is not None:
            self.watcher.switch_task(task)

    def no_task(self):
        """Forward to SchedulerWatcher.no_task if enabled."""
        if self.watcher is not None:
            self.watcher.no_task()