import socket
import threading
import time
from queue import Empty, Queue
from typing import Union

from module.gamefree.netpkg import *
from module.logger import logger


class VmInterruptEvent:
    """A two-state flag backed by a pair of complementary ``threading.Event``s.

    Exactly one of ``true_event`` / ``false_event`` is meant to be set at any
    time, so a thread can block until the flag becomes true *or* until it
    becomes false. The flag starts out false.

    Used as a context manager, entering blocks until the flag is true and
    leaving resets it to false.
    """

    def __init__(self):
        self.true_event = threading.Event()
        self.false_event = threading.Event()
        # Initial state is "false".
        self.false_event.set()

    def set_true(self):
        """Switch the flag to true and wake threads waiting for true."""
        self.true_event.set()
        self.false_event.clear()

    def set_false(self):
        """Switch the flag to false and wake threads waiting for false."""
        self.true_event.clear()
        self.false_event.set()

    def is_true(self):
        """Return whether the flag is currently true."""
        return self.true_event.is_set()

    def is_false(self):
        """Return whether the flag is currently false."""
        return self.false_event.is_set()

    def wait_until_true(self):
        """Block the calling thread until the flag is true."""
        self.true_event.wait()

    def wait_until_false(self):
        """Block the calling thread until the flag is false."""
        self.false_event.wait()

    def __enter__(self):
        self.wait_until_true()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.set_false()


class AzurLaneNetworkClientVmExitException(Exception):
    """Raised when an operation is attempted after a stop was requested."""


class AzurLaneNetworkClient:
    """Socket client that sends queued packages from a worker thread (the "vm").

    Packages are placed on ``task_queue``; the worker thread pops them one at
    a time, performs the transfer and signals ``pkg.event``. Other threads can
    pause the worker between packages with ``vm_interrupt()``.
    """

    instance = None

    # Seconds the worker waits for a package before re-checking for pending
    # interrupts, so an idle worker still acknowledges them.
    VM_POLL_INTERVAL = 0.05

    def __init__(self):
        self.task_queue = Queue()
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.vm: Union[threading.Thread, None] = None
        self.vm_interrupt_event_queue = Queue()
        self.vm_exception: Union[Exception, None] = None
        self.vm_exception_event = VmInterruptEvent()
        self.vm_exit_request = threading.Event()

    def __del__(self):
        try:
            self.vm_stop()
        except AzurLaneNetworkClientVmExitException:
            # A stop was already requested; nothing left to do.
            pass

    def connect(self, addr, port):
        """Connect the underlying TCP socket to ``(addr, port)``."""
        self.server_socket.connect((addr, port))

    def transfer(self, pkg: AzurLaneNetworkPackage) -> ba.ByteArray:
        """Send ``pkg.pack()`` and collect the reply into a ``ba.ByteArray``.

        Reads in 1024-byte chunks until ``recv`` returns an empty result,
        i.e. until the peer closes its side of the connection.
        """
        # sendall() keeps writing until the whole payload is out; send() may
        # transmit only part of it.
        self.server_socket.sendall(pkg.pack())
        buffer = ba.ByteArray()
        while True:
            returned_data = self.server_socket.recv(1024)
            # recv() returns bytes, never the integer 0; an empty bytes object
            # is the end-of-stream marker.
            # NOTE(review): this relies on the peer closing the connection to
            # delimit a reply - confirm against the actual wire protocol.
            if not returned_data:
                break
            buffer.writeBytes(returned_data)
        return buffer

    def clear_task_queue(self):
        """Drain ``task_queue``, flagging every package as aborted.

        Each drained package has its ``event`` set so that a thread blocked
        on it is released.
        """
        while not self.task_queue.empty():
            pkg = self.task_queue.get()
            pkg.is_aborted = True
            pkg.event.set()

    def vm_clear_task_queue(self):
        """Pause the worker, then drain the task queue."""
        with self.vm_interrupt():
            self.clear_task_queue()

    def vm_start(self):
        """Start the worker thread unless one is already alive."""
        if self.vm is not None:
            if self.vm.is_alive():
                return
        self.vm = threading.Thread(target=self.vm_thread)
        self.vm.start()

    def vm_interrupt(self) -> VmInterruptEvent:
        """Request a pause of the worker thread.

        The returned event becomes true once the worker has acknowledged the
        request; the worker then stays paused until the event is set back to
        false (``vm_resume()`` or leaving a ``with`` block on the event).

        Raises:
            AzurLaneNetworkClientVmExitException: a stop was already requested.
        """
        if self.vm_exit_request.is_set():
            raise AzurLaneNetworkClientVmExitException("Client vm is trying to exit")
        interrupt_event = VmInterruptEvent()
        self.vm_interrupt_event_queue.put(interrupt_event)
        return interrupt_event

    def vm_resume(self, interrupt_event):
        """Release a worker paused on ``interrupt_event``."""
        interrupt_event.set_false()

    def vm_stop(self):
        """Request worker shutdown and abort all pending packages.

        Raises:
            AzurLaneNetworkClientVmExitException: a stop was already requested
                while the worker is still alive.
        """
        if self.vm is None or not self.vm.is_alive():
            # No worker exists to acknowledge an interrupt; waiting for one
            # would block forever (notably when called from __del__).
            self.vm_exit_request.set()
            self.clear_task_queue()
            return
        # NOTE(review): if the worker is parked waiting for an exception to be
        # handled, it cannot acknowledge the interrupt until
        # vm_exception_handled_and_resume() is called.
        with self.vm_interrupt():
            self.vm_exit_request.set()
            self.clear_task_queue()
            # Put the sentinel on the queue directly: queue_package() refuses
            # new work once vm_exit_request is set, so going through it would
            # raise and the worker would never be told to exit.
            self.task_queue.put(AzurLaneNetworkEndPackage())

    def vm_wait_until_exception_handled(self):
        """Block until the exception flag has been reset to false."""
        self.vm_exception_event.wait_until_false()

    def vm_get_exception(self) -> Exception:
        """Return the exception recorded by the worker, or ``None``."""
        return self.vm_exception

    def vm_has_exception(self):
        """Return whether the worker has recorded an exception."""
        return self.vm_exception is not None

    def vm_set_exception(self, e):
        """Record ``e`` as the worker's current exception."""
        self.vm_exception = e

    def vm_exception_handled_and_resume(self):
        """Clear the recorded exception and let the worker continue."""
        self.vm_exception = None
        self.vm_exception_event.set_false()

    def vm_ensure_continue(self):
        """Serve every pending interrupt request before the worker proceeds.

        For each request: acknowledge it (set true), then stay paused until
        the requester sets it back to false.
        """
        while not self.vm_interrupt_event_queue.empty():
            interrupt_event = self.vm_interrupt_event_queue.get()
            interrupt_event.set_true()
            interrupt_event.wait_until_false()

    def vm_thread(self):
        """Worker loop: serve interrupts, then process one package per turn."""
        while True:
            self.vm_ensure_continue()
            try:
                # Wake up periodically so interrupts raised while the queue
                # is empty are still acknowledged instead of deadlocking the
                # requester.
                pkg = self.task_queue.get(timeout=self.VM_POLL_INTERVAL)
            except Empty:
                continue
            if isinstance(pkg, AzurLaneNetworkEndPackage):
                break
            elif isinstance(pkg, AzurLaneNetworkPackage):
                try:
                    data = self.transfer(pkg)
                    pkg.returned_data = data
                    pkg.event.set()
                except Exception as e:
                    # Publish the failure, release the waiting sender, then
                    # park until the exception has been marked as handled.
                    self.vm_set_exception(e)
                    self.vm_exception_event.set_true()
                    pkg.event.set()
                    self.vm_wait_until_exception_handled()
            else:
                logger.warning(f"Unknown net package class: {type(pkg).__name__}")

    def queue_package(self, pkg):
        """Enqueue one package for the worker.

        Raises:
            AzurLaneNetworkClientVmExitException: a stop was already requested.
        """
        if self.vm_exit_request.is_set():
            raise AzurLaneNetworkClientVmExitException("Client vm is trying to exit")
        self.task_queue.put(pkg)

    def queue_packages(self, *pkgs):
        """Enqueue several packages while the worker is paused.

        Raises:
            AzurLaneNetworkClientVmExitException: a stop was already requested.
        """
        if self.vm_exit_request.is_set():
            raise AzurLaneNetworkClientVmExitException("Client vm is trying to exit")
        with self.vm_interrupt():
            for i in pkgs:
                self.task_queue.put(i)

    @staticmethod
    def get_instance() -> "AzurLaneNetworkClient":
        """Return the shared client, creating it on first use."""
        if AzurLaneNetworkClient.instance is None:
            AzurLaneNetworkClient.instance = AzurLaneNetworkClient()
        return AzurLaneNetworkClient.instance


class AzurLaneNetwork:
    """Blocking facade over the shared ``AzurLaneNetworkClient``."""

    def __init__(self):
        self.client = AzurLaneNetworkClient.get_instance()

    def send(self, pkg: AzurLaneNetworkPackage) -> ba.ByteArray:
        """Queue ``pkg`` and block until the worker has dealt with it.

        Returns:
            The data stored on ``pkg.returned_data`` by the worker.

        Raises:
            Exception: whatever exception the worker has recorded.
            AzurLaneNetworkPackageAbort: the package was aborted.
        """
        self.client.queue_package(pkg)
        pkg.event.wait()
        if self.client.vm_has_exception():
            raise self.client.vm_get_exception()
        # clear_task_queue() overwrites ``is_aborted`` with a plain bool, so
        # calling it unconditionally fails with "'bool' object is not
        # callable" on an aborted package. Accept both forms.
        # NOTE(review): the package class is defined outside this file -
        # confirm whether ``is_aborted`` is meant to be a method or a flag.
        aborted = pkg.is_aborted
        if callable(aborted):
            aborted = aborted()
        if aborted:
            raise AzurLaneNetworkPackageAbort()
        return pkg.returned_data