1
0
mirror of https://github.com/0O0o0oOoO00/Alas.git synced 2026-05-22 08:09:29 +08:00
Files
Alas/module/gamefree/netpkg.py

67 lines
1.7 KiB
Python

import threading
from typing import Union
from google.protobuf import message
import module.gamefree.bytearray as ba
class AzurLaneNetworkEndPackage:
...
class AzurLaneNetworkPackageAbort(Exception):
...
class AzurLaneNetworkPackage:
def __init__(self, id, proto_message: message.Message):
self.event = threading.Event()
self.proto_message = proto_message
self.id = id
self.returned_data: ba.ByteArray = None
self.abort = False
def is_aborted(self):
return self.abort
def pack(self) -> bytes:
buffer = ba.ByteArray()
serialized = self.proto_message.SerializeToString()
serialized_len = len(serialized)
if serialized_len == 0:
buffer.writeBigEndianUInt16(6)
else:
buffer.writeBigEndianUInt16(5 + serialized_len)
buffer.writeBigEndianUInt8(0)
buffer.writeBigEndianUInt16(self.id)
buffer.writeBigEndianUInt16(0)
buffer.writeString(serialized)
return buffer.toBytes()
def unpack(self, data: Union[bytes, ba.ByteArray]):
if isinstance(data, bytes):
buffer = ba.ByteArray.fromBytes(data)
elif isinstance(data, ba.ByteArray):
buffer = data
else:
raise TypeError(f"Invalid data type: {type(data).__name__}")
buffer.readBigEndianUInt16()
buffer.readBigEndianUInt8()
data_id = buffer.readBigEndianUInt16()
if data_id != self.id:
raise ValueError(f"Message's id is {self.id}, but got {data_id}")
buffer.readBigEndianUInt16()
message_data = buffer.readRemainingString()
self.proto_message.ParseFromString(message_data)
return self
class HeartBeatPackage:
...