1
0
mirror of https://github.com/0O0o0oOoO00/Alas.git synced 2026-05-14 17:09:26 +08:00
Files
Alas/scheduler_watcher.py
2025-08-07 18:50:14 +08:00

115 lines
3.8 KiB
Python

import datetime
import threading
import time
from module.logger import logger
from module.config.deep import deep_get
from module.notify import handle_notify
class CounterReachMaxCountException(Exception):
    """Raised by Counter.count_once() once the counter reaches its maximum."""
class Counter:
    """
    Simple strike counter.

    Each call to count_once() adds one; the call that brings the count to
    max_count (or beyond) raises CounterReachMaxCountException.
    """

    def __init__(self, max_count):
        self.max_count = max_count
        self.current_count = 0

    def count_once(self):
        """Increment the count; raise CounterReachMaxCountException at the limit."""
        self.current_count = self.current_count + 1
        if self.current_count < self.max_count:
            return
        raise CounterReachMaxCountException()

    def reset(self):
        """Clear the count back to zero."""
        self.current_count = 0
class SchedulerWatcher:
    """
    Watchdog that terminates Alas when the current task overruns its deadline.

    The scheduler reports task changes through switch_task()/no_task() and may
    push the deadline back with request_extend_task_deadline(). A background
    thread wakes up every 60 seconds. Each time it finds the deadline passed, it
    records a strike. On the third strike it assumes the scheduler is stuck. It
    then notifies the user (only when instance restart is disabled in the config)
    and terminates the process.

    request_extend_task_deadline(), no_task() and switch_task() are no-ops until
    start_watch() has been called.
    """
    instance = None

    def __init__(self):
        # Alas object; its `.config.data` and `.config_name` are read when reporting.
        self.alas_obj = None
        self.watcher: threading.Thread = None
        # Three overdue checks without an extension in between => scheduler is stuck.
        self.warning_count = Counter(3)
        # Deadline of the current task (naive local time); None when no task is tracked.
        self.warning_time = None
        self.current_task = None

    def set_alas_obj(self, alas_obj):
        """Attach the Alas object whose config is read when reporting a stuck scheduler."""
        self.alas_obj = alas_obj

    def start_watch(self):
        """Start the background watcher thread unless one is already running."""
        if self.watcher is not None and self.watcher.is_alive():
            return
        logger.info("Scheduler watcher start")
        # Daemon thread: a non-daemon thread running an endless loop would keep the
        # interpreter alive forever after the main thread finishes or calls exit().
        self.watcher = threading.Thread(
            target=self.watcher_thread, name="SchedulerWatcher", daemon=True
        )
        self.watcher.start()

    def request_extend_task_deadline(self):
        """
        Push the current task's deadline to 10 minutes from now.

        If the deadline has already been missed at least once, the strike counter
        is cleared and the deadline is always extended. Otherwise the deadline is
        only extended when fewer than 3 minutes remain. When no deadline is set
        and no strike is pending, nothing happens.
        """
        if self.watcher is None:
            return
        now = datetime.datetime.now()
        # Snapshot: the watcher thread and no_task() may change this concurrently.
        deadline = self.warning_time
        if self.warning_count.current_count > 0:
            self.warning_count.reset()
            self.warning_time = now + datetime.timedelta(minutes=10)
            logger.info("Current task requests to extend the task deadline")
        elif deadline is None:
            # No task is tracked (no_task() was called, or switch_task() never was);
            # there is nothing to extend, and `None - now` would raise TypeError.
            return
        elif deadline - now < datetime.timedelta(minutes=3):
            self.warning_time = now + datetime.timedelta(minutes=10)
            logger.info("Current task requests to extend the task deadline")

    def no_task(self):
        """Stop tracking; the watcher ignores the scheduler until switch_task()."""
        if self.watcher is None:
            return
        self.current_task = None
        self.warning_time = None

    def switch_task(self, task):
        """Start tracking *task* with a fresh 10-minute deadline and no strikes."""
        if self.watcher is None:
            return
        self.current_task = task
        self.warning_count.reset()
        self.warning_time = datetime.datetime.now() + datetime.timedelta(minutes=10)

    def watcher_thread(self):
        """Body of the watcher thread: check the deadline once every 60 seconds."""
        while True:
            time.sleep(60)
            # Read shared state once. The scheduler thread may call no_task() between
            # two reads, and comparing against None would kill this thread.
            deadline = self.warning_time
            task = self.current_task
            if deadline is None or task is None:
                continue
            if datetime.datetime.now() <= deadline:
                continue
            try:
                self.warning_count.count_once()
                logger.warning("Current task reached time limit once")
            except CounterReachMaxCountException:
                self._on_scheduler_stuck(task)

    def _on_scheduler_stuck(self, task):
        """Report the stuck scheduler (notifying the user if needed), then exit the process."""
        try:
            logger.error("Current task reached final time limit, assuming the scheduler is stuck")
            # Without set_alas_obj() there is no config to read; skip straight to exiting.
            if self.alas_obj is not None:
                config_data = self.alas_obj.config.data
                is_instance_restart_enabled = deep_get(config_data, "Restart.InstanceRestart.Enabled", False)
                if not is_instance_restart_enabled:
                    logger.error("Instance restart disabled, notify user")
                    err_push_config = deep_get(config_data, "Alas.Error.OnePushConfig")
                    handle_notify(
                        err_push_config,
                        title=f"Alas <{self.alas_obj.config_name}>: Scheduler Stuck",
                        content=f"Task {task} reached final time limit, assuming the scheduler is stuck",
                    )
        finally:
            # Whatever happens while reporting, the stuck process must go down.
            self._terminate_process(-1)

    @staticmethod
    def _terminate_process(exit_code):
        """Terminate the whole process immediately, even when called off the main thread."""
        import os
        import sys

        for stream in (sys.stdout, sys.stderr):
            try:
                stream.flush()
            except Exception:
                # Best effort only: the stream may be None (pythonw) or already closed.
                pass
        # exit()/sys.exit() only raise SystemExit. Outside the main thread that ends
        # just the calling thread and leaves the stuck scheduler running. os._exit()
        # ends the process directly, skipping atexit handlers and buffer flushing.
        os._exit(exit_code)

    @staticmethod
    def get_instance() -> "SchedulerWatcher":
        """Return the process-wide SchedulerWatcher, creating it on first use."""
        if SchedulerWatcher.instance is None:
            SchedulerWatcher.instance = SchedulerWatcher()
        return SchedulerWatcher.instance
def request_extend_task_deadline():
    """Module-level shortcut: ask the shared SchedulerWatcher to extend the current deadline."""
    SchedulerWatcher.get_instance().request_extend_task_deadline()
def no_task():
    """Module-level shortcut: tell the shared SchedulerWatcher that no task is running."""
    SchedulerWatcher.get_instance().no_task()