2025-08-05 23:29:22 +08:00
|
|
|
import datetime
|
2025-08-07 19:46:05 +08:00
|
|
|
import os
|
2025-08-05 23:29:22 +08:00
|
|
|
import threading
|
|
|
|
|
import time
|
|
|
|
|
from module.logger import logger
|
|
|
|
|
from module.config.deep import deep_get
|
|
|
|
|
from module.notify import handle_notify
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class CounterReachMaxCountException(Exception):
    """Raised by ``Counter.count_once`` once the count reaches ``max_count``."""
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class Counter:
    """Count events and raise once a configured maximum is reached."""

    def __init__(self, max_count):
        self.max_count = max_count
        # Number of count_once() calls since construction or the last reset().
        self.current_count = 0

    def count_once(self):
        """Increment the count by one.

        Raises:
            CounterReachMaxCountException: When the incremented count is greater
                than or equal to ``max_count``. The increment is kept.
        """
        self.current_count = self.current_count + 1
        limit_hit = self.current_count >= self.max_count
        if limit_hit:
            raise CounterReachMaxCountException()

    def reset(self):
        """Set the count back to zero."""
        self.current_count = 0
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class SchedulerWatcher:
    """Watchdog that detects a scheduler stuck on a single task.

    A daemon thread wakes every 60 seconds. While a task is tracked, each time
    the current deadline (``warning_time``) has passed, a warning is counted
    and the deadline is pushed forward by ``min_timedelta``. When the warning
    counter reaches its maximum, the user is notified (if instance restart is
    disabled) and the process is terminated with ``os._exit(-1)``.

    Use :meth:`get_instance` to obtain the shared instance.
    """

    # Lazily created shared instance, see get_instance().
    instance = None

    def __init__(self):
        # Object providing ``config.data`` and ``config_name``; set via set_alas_obj().
        self.alas_obj = None
        self.watcher: threading.Thread = None
        # Counter of missed deadlines; created by start_watch().
        self.warning_count = None
        # Deadline of the current task; None while no task is tracked.
        self.warning_time = None
        self.current_task = None
        # Length of one deadline; set by start_watch().
        self.min_timedelta = None

    def set_alas_obj(self, alas_obj):
        """Attach the object whose ``config`` and ``config_name`` the watcher reads."""
        self.alas_obj = alas_obj

    def start_watch(self, min_timedelta, max_warning_count):
        """Start the watcher thread unless one is already alive.

        Args:
            min_timedelta: Length of one deadline, in minutes.
            max_warning_count: Warning count at which the scheduler is
                considered stuck (see ``Counter.count_once``).
        """
        if self.watcher is not None and self.watcher.is_alive():
            return
        self.min_timedelta = datetime.timedelta(minutes=min_timedelta)
        self.warning_count = Counter(max_warning_count)
        logger.info("Scheduler watcher start")
        self.watcher = threading.Thread(target=self.watcher_thread, daemon=True)
        self.watcher.start()

    def request_extend_task_deadline(self):
        """Ask for more time for the current task.

        If warnings were already issued, the counter is reset and the deadline
        becomes ``now + min_timedelta``. Otherwise the deadline is extended only
        when fewer than 3 minutes remain. This is a no-op when the watcher was
        never started or when no task is being tracked.
        """
        if self.watcher is None:
            return
        warning_time = self.warning_time
        # No tracked task (before switch_task() or after no_task()): there is no
        # deadline to extend, and ``None - now`` below would raise TypeError.
        if warning_time is None:
            return
        now = datetime.datetime.now()
        if self.warning_count.current_count > 0:
            self.warning_count.reset()
            self.warning_time = now + self.min_timedelta
            logger.info("Current task requests to extend the task deadline")
        elif warning_time - now < datetime.timedelta(minutes=3):
            self.warning_time = now + self.min_timedelta
            logger.info("Current task requests to extend the task deadline")

    def no_task(self):
        """Stop tracking any task (no-op if the watcher was never started)."""
        if self.watcher is None:
            return
        self.current_task = None
        self.warning_time = None

    def switch_task(self, task):
        """Start tracking ``task`` with a fresh counter and a new deadline.

        This is a no-op if the watcher was never started.
        """
        if self.watcher is None:
            return
        self.current_task = task
        self.warning_count.reset()
        self.warning_time = datetime.datetime.now() + self.min_timedelta

    def watcher_thread(self):
        """Thread body: check the tracked task's deadline once per minute."""
        while True:
            time.sleep(60)
            # Snapshot the shared state once. no_task()/switch_task() run on
            # another thread and may change these fields between the checks.
            warning_time = self.warning_time
            current_task = self.current_task
            if warning_time is None or current_task is None:
                continue

            now = datetime.datetime.now()
            if now <= warning_time:
                continue
            try:
                self.warning_count.count_once()
            except CounterReachMaxCountException:
                logger.error("Current task reached final time limit, assuming the scheduler is stuck")
                self._handle_stuck(current_task)
            else:
                logger.warning("Current task reached time limit once")
                self.warning_time = now + self.min_timedelta

    def _handle_stuck(self, task):
        """Notify the user if instance restart is disabled, then kill the process.

        The notification is sent through ``Alas.Error.OnePushConfig`` only when
        ``Restart.InstanceRestart.Enabled`` is false. In every case the process
        is then terminated with ``os._exit(-1)``.

        Args:
            task: The task that was tracked when the final limit was hit.
        """
        try:
            if self.alas_obj is None:
                logger.error("Scheduler watcher has no alas object, skip notify")
                return
            config_data = self.alas_obj.config.data
            is_instance_restart_enabled = deep_get(config_data, "Restart.InstanceRestart.Enabled", False)
            if not is_instance_restart_enabled:
                logger.error("Instance restart disabled, notify user")
                err_push_config = deep_get(config_data, "Alas.Error.OnePushConfig")
                handle_notify(
                    err_push_config,
                    title=f"Alas <{self.alas_obj.config_name}>: Scheduler Stuck",
                    content=f"Task {task} reached final time limit, assuming the scheduler is stuck",
                )
        except Exception as e:
            # Reading the config or sending the notification failed. Log it and
            # still fall through to the exit below.
            logger.exception(e)
        finally:
            # Always terminate. An exception escaping here would only end the
            # daemon watcher thread and leave the stuck scheduler unwatched.
            os._exit(-1)

    @staticmethod
    def get_instance() -> "SchedulerWatcher":
        """Return the shared instance, creating it on first use."""
        if SchedulerWatcher.instance is None:
            SchedulerWatcher.instance = SchedulerWatcher()
        return SchedulerWatcher.instance
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def request_extend_task_deadline():
    """Shortcut for ``SchedulerWatcher.get_instance().request_extend_task_deadline()``."""
    SchedulerWatcher.get_instance().request_extend_task_deadline()
|
2025-08-07 18:50:14 +08:00
|
|
|
|
|
|
|
|
|
|
|
|
|
def no_task():
    """Shortcut for ``SchedulerWatcher.get_instance().no_task()``."""
    SchedulerWatcher.get_instance().no_task()
|