1
0
mirror of https://github.com/0O0o0oOoO00/Alas.git synced 2026-05-14 16:49:26 +08:00
Files
Alas/module/instance_watcher.py

148 lines
5.0 KiB
Python

import datetime
import logging
import pathlib
import threading
import time
from dataclasses import dataclass
from typing import Dict

from module.config.config import AzurLaneConfig
from module.counter import MaxCounter
from module.notify import handle_notify
from module.submodule.utils import has_config_mod
from module.webui.process_manager import ProcessManager


@dataclass
class InstanceSetting:
    """Auto-restart settings the watcher keeps for one instance."""

    # Restart budget: the watcher calls count_once() before every automatic
    # restart and reset() when the periodic reset time has passed.
    counter: MaxCounter
    # Whether a notification is pushed after a successful automatic restart.
    enable_notify: bool
    # Configuration value handed to handle_notify() as its first argument.
    notify_config: str
class InstanceWatcher:
    """Background watchdog that automatically restarts stopped instances.

    A daemon thread wakes up once per minute and
      * resets every restart counter once the reset period has elapsed,
      * restarts watched instances that are not alive and report state 3,
        as long as their restart counter still allows it,
      * starts instances that left a ``*.status`` file behind, are not alive
        and report ``DetailInstanceStatus.NoRenderables``.

    Use :meth:`get_instance` to obtain the shared, lazily created watcher.
    """

    # Directory holding one empty "<config name>.status" marker per instance.
    STATUS_DIR = "./bin/status"
    # Lazily created shared watcher, see get_instance().
    instance = None

    def __init__(self):
        # Watcher thread; None until start() is called for the first time.
        self.watcher: threading.Thread = None
        # Watched instances, keyed by config name.
        self.instances: Dict[str, InstanceSetting] = dict()
        # Period after which all restart counters are reset.
        self.rest_delta = datetime.timedelta(hours=24)
        self.reset_time = datetime.datetime.now() + self.rest_delta

    @staticmethod
    def _status_file(name):
        """Return the path of the status marker file for config `name`."""
        return pathlib.Path(InstanceWatcher.STATUS_DIR) / f"{name}.status"

    def start(self):
        """Start the watcher thread unless one is already running."""
        if self.watcher is not None and self.watcher.is_alive():
            return
        self.watcher = threading.Thread(target=self.watcher_thread, daemon=True)
        self.watcher.start()

    def check_counter(self):
        """Reset all restart counters once `reset_time` has passed."""
        now = datetime.datetime.now()
        if now <= self.reset_time:
            return
        # Iterate over a snapshot: other threads may add or remove instances
        # while the watcher thread is looping.
        for setting in list(self.instances.values()):
            setting.counter.reset()
        self.reset_time = now + self.rest_delta

    def check_instances(self):
        """Restart watched instances that stopped, within their retry budget.

        Instances whose config no longer exists are dropped from the watch
        list. An instance that used up its counter is dropped as well, after
        a final notification has been sent.
        """
        # Snapshot the mapping so entries can be removed inside the loop and
        # concurrent add/remove calls cannot break the iteration.
        for name, setting in list(self.instances.items()):
            if not has_config_mod(name):
                self.remove_instance(name)
                continue

            ins = ProcessManager.get_manager(name)
            # Only instances that are not alive AND report state 3 are
            # restarted. NOTE(review): state 3 presumably means "stopped by
            # an error" - confirm against ProcessManager.
            if ins.alive or ins.state != 3:
                continue

            if setting.counter.count_once(throw=False):
                ins.start("alas")
                if setting.enable_notify:
                    handle_notify(
                        setting.notify_config,
                        title=f"Alas <{ins.config_name}> instance auto restarted",
                        content="Critical error occurred, instance restarted",
                    )
            else:
                # NOTE(review): this notification is sent regardless of
                # `enable_notify`, unlike the one above - confirm intended.
                handle_notify(
                    setting.notify_config,
                    title=f"Alas <{ins.config_name}> instance restarted too many times",
                    content="Too many critical error occurred, instance restarted too many times",
                )
                self.remove_instance(name)

    def add_status_file(self, name):
        """Create the status marker file for `name` if it does not exist."""
        f = self._status_file(name)
        if not f.exists():
            f.parent.mkdir(parents=True, exist_ok=True)
            f.touch()

    def remove_status_file(self, name):
        """Delete the status marker file for `name`; a missing file is fine."""
        try:
            self._status_file(name).unlink()
        except FileNotFoundError:
            # Already gone (never created, or removed by another thread).
            pass

    def get_all_prev_instance(self):
        """Return the config names that have a status marker file.

        The status directory is created when it does not exist yet.
        """
        f = pathlib.Path(InstanceWatcher.STATUS_DIR)
        if not f.exists():
            f.mkdir(parents=True, exist_ok=True)
        return [i.stem for i in f.glob("*.status")]

    def has_status_file(self, name):
        """Return True if a status marker file exists for `name`."""
        return self._status_file(name).exists()

    def try_add_instance(self, name, add_status_cache_file=True):
        """Watch instance `name` if its config enables automatic restart.

        Args:
            name: Config name of the instance.
            add_status_cache_file: Also create the status marker file.
        """
        if name in self.instances:
            return
        full_config = AzurLaneConfig(name).full_config
        if not full_config.Restart_InstanceRestart_Enable:
            return
        self.instances[name] = InstanceSetting(
            counter=MaxCounter(full_config.Restart_InstanceRestart_MaxRetryTimes),
            enable_notify=full_config.Restart_InstanceRestart_Notify,
            notify_config=full_config.Alas_Error_OnePushConfig,
        )
        # NOTE(review): the marker file is only written for instances that
        # enabled automatic restart - confirm this nesting is intended.
        if add_status_cache_file:
            self.add_status_file(name)

    def check_instance_status(self):
        """Start instances that have a status file but are not alive.

        Marker files whose config no longer exists are deleted. An instance
        is only started when its detailed status is `NoRenderables`.
        """
        for config_name in self.get_all_prev_instance():
            if not has_config_mod(config_name):
                self.remove_status_file(config_name)
                continue
            ins = ProcessManager.get_manager(config_name)
            if ins.alive:
                continue
            if ins.detailed_instance_status == ProcessManager.DetailInstanceStatus.NoRenderables:
                ins.start("alas")

    def remove_instance(self, name, remove_status_cache_file=True):
        """Stop watching instance `name`; unknown names are ignored.

        Args:
            name: Config name of the instance.
            remove_status_cache_file: Also delete the status marker file.
        """
        self.instances.pop(name, None)
        if remove_status_cache_file:
            self.remove_status_file(name)

    def watcher_thread(self):
        """Thread body: run all checks once per minute, forever."""
        checks = (
            self.check_counter,
            self.check_instances,
            self.check_instance_status,
        )
        while True:
            time.sleep(60)
            for check in checks:
                # Each check is isolated so that one failing check neither
                # kills the thread nor prevents the remaining checks.
                try:
                    check()
                except Exception:
                    logging.getLogger(__name__).exception(
                        "InstanceWatcher.%s failed", check.__name__
                    )

    @staticmethod
    def get_instance() -> "InstanceWatcher":
        """Return the shared watcher, creating it on first use."""
        if InstanceWatcher.instance is None:
            InstanceWatcher.instance = InstanceWatcher()
        return InstanceWatcher.instance