import datetime
import logging
import pathlib
import threading
import time
from dataclasses import dataclass
from typing import Dict, List, Optional

from module.config.config import AzurLaneConfig
from module.counter import MaxCounter
from module.notify import handle_notify
from module.submodule.utils import has_config_mod
from module.webui.process_manager import ProcessManager

_logger = logging.getLogger(__name__)


@dataclass
class InstanceSetting:
    """Per-instance auto-restart settings tracked by ``InstanceWatcher``."""

    # Restart budget; built from Restart_InstanceRestart_MaxRetryTimes.
    counter: MaxCounter
    # Whether a notification is pushed after a successful auto restart.
    enable_notify: bool
    # Forwarded as the first argument of handle_notify().
    notify_config: str


class InstanceWatcher:
    """Background watcher that restarts stopped Alas instances.

    A daemon thread wakes up once a minute and:

    * resets every restart counter once ``rest_delta`` (24 hours) has passed,
    * restarts registered instances that are not alive and report state ``3``,
      as long as their restart counter still allows it,
    * restarts instances that left a ``*.status`` marker in ``STATUS_DIR`` and
      report ``DetailInstanceStatus.NoRenderables``.

    Use ``InstanceWatcher.get_instance()`` to obtain the shared object.
    """

    STATUS_DIR = "./bin/status"
    instance = None

    def __init__(self):
        # Thread running watcher_thread(); None until start() is called.
        self.watcher: Optional[threading.Thread] = None
        # Config name -> restart settings of every watched instance.
        self.instances: Dict[str, InstanceSetting] = dict()
        self.rest_delta = datetime.timedelta(hours=24)
        self.reset_time = datetime.datetime.now() + self.rest_delta

    @staticmethod
    def _status_file(name) -> pathlib.Path:
        """Return the path of the status marker file for instance ``name``."""
        return pathlib.Path(InstanceWatcher.STATUS_DIR) / f"{name}.status"

    def start(self):
        """Start the watcher thread unless one is already running."""
        if self.watcher is not None and self.watcher.is_alive():
            return
        self.watcher = threading.Thread(target=self.watcher_thread, daemon=True)
        self.watcher.start()

    def check_counter(self):
        """Reset all restart counters once the reset deadline has passed."""
        now = datetime.datetime.now()
        if now <= self.reset_time:
            return
        # Snapshot: other threads may add/remove instances while we iterate.
        for setting in list(self.instances.values()):
            setting.counter.reset()
        self.reset_time = now + self.rest_delta

    def check_instances(self):
        """Restart registered instances that stopped in state ``3``.

        When ``counter.count_once(throw=False)`` is truthy the instance is
        started again, and a notification is sent if ``enable_notify`` is set.
        Otherwise a "restarted too many times" notification is sent and the
        instance is dropped from the watcher.
        """
        exhausted = []
        # Snapshot: try_add_instance()/remove_instance() may run concurrently
        # and would otherwise break the iteration.
        for name, setting in list(self.instances.items()):
            ins = ProcessManager.get_manager(name)
            if ins.alive:
                continue
            # NOTE(review): state 3 presumably marks an exit caused by an
            # error - confirm against ProcessManager.state.
            if ins.state != 3:
                continue
            if setting.counter.count_once(throw=False):
                ins.start("alas")
                if setting.enable_notify:
                    handle_notify(
                        setting.notify_config,
                        title=f"Alas <{ins.config_name}> instance auto restarted",
                        content="Critical error occurred, instance restarted",
                    )
            else:
                exhausted.append(name)
                # This notification is sent regardless of enable_notify.
                handle_notify(
                    setting.notify_config,
                    title=f"Alas <{ins.config_name}> instance restarted too many times",
                    content="Too many critical error occurred, instance restarted too many times",
                )
        for name in exhausted:
            self.remove_instance(name)

    def add_status_file(self, name):
        """Create the status marker file for ``name`` if it does not exist."""
        marker = self._status_file(name)
        if not marker.exists():
            marker.parent.mkdir(parents=True, exist_ok=True)
            marker.touch()

    def remove_status_file(self, name):
        """Delete the status marker file for ``name``; a missing file is fine."""
        try:
            self._status_file(name).unlink()
        except FileNotFoundError:
            # Already gone (never created, or removed concurrently).
            pass

    def get_all_prev_instance(self) -> List[str]:
        """Return the instance names that have a marker file in ``STATUS_DIR``.

        The directory is created when it does not exist yet.
        """
        status_dir = pathlib.Path(InstanceWatcher.STATUS_DIR)
        status_dir.mkdir(parents=True, exist_ok=True)
        return [marker.stem for marker in status_dir.glob("*.status")]

    def has_status_file(self, name) -> bool:
        """Return whether a status marker file exists for ``name``."""
        return self._status_file(name).exists()

    def try_add_instance(self, name):
        """Register ``name`` for auto restart if its config enables it.

        Does nothing when the instance is already registered or when
        ``Restart_InstanceRestart_Enable`` is falsy in its config.
        """
        if name in self.instances:
            return
        full_config = AzurLaneConfig(name).full_config
        if not full_config.Restart_InstanceRestart_Enable:
            return
        self.instances[name] = InstanceSetting(
            counter=MaxCounter(full_config.Restart_InstanceRestart_MaxRetryTimes),
            enable_notify=full_config.Restart_InstanceRestart_Notify,
            notify_config=full_config.Alas_Error_OnePushConfig,
        )
        self.add_status_file(name)

    def check_instance_status(self):
        """Restart marked instances that report ``NoRenderables``.

        Only names that have a status marker file and for which
        ``has_config_mod`` is truthy are considered; alive instances are
        skipped.
        """
        for config_name in self.get_all_prev_instance():
            if not has_config_mod(config_name):
                continue
            ins = ProcessManager.get_manager(config_name)
            if ins.alive:
                continue
            if ins.detailed_instance_status == ProcessManager.DetailInstanceStatus.NoRenderables:
                ins.start("alas")

    def remove_instance(self, name):
        """Stop watching ``name`` and delete its status marker file.

        Never raises: an unknown name is ignored, and filesystem errors while
        deleting the marker are logged. The marker is removed even when the
        name was not registered, so stale markers do not linger.
        """
        self.instances.pop(name, None)
        try:
            self.remove_status_file(name)
        except OSError:
            _logger.exception("Failed to remove status file of instance %r", name)

    def watcher_thread(self):
        """Thread body: run all checks once per minute, forever.

        Each check is isolated so that a failure in one of them is logged and
        does not prevent the remaining checks from running.
        """
        checks = (
            self.check_counter,
            self.check_instances,
            self.check_instance_status,
        )
        while True:
            time.sleep(60)
            for check in checks:
                try:
                    check()
                except Exception:
                    # Keep the watcher alive, but leave a trace of the failure.
                    _logger.exception("InstanceWatcher.%s failed", check.__name__)

    @staticmethod
    def get_instance() -> "InstanceWatcher":
        """Return the shared ``InstanceWatcher``, creating it on first use."""
        if InstanceWatcher.instance is None:
            InstanceWatcher.instance = InstanceWatcher()
        return InstanceWatcher.instance