1
0
mirror of https://github.com/0O0o0oOoO00/Alas.git synced 2026-05-20 05:49:30 +08:00

Merge branch 'LmeSzinc:master' into 20230309

This commit is contained in:
Zuosizhu
2024-04-15 23:25:52 +08:00
committed by GitHub
28 changed files with 513 additions and 216 deletions

View File

@@ -4,6 +4,7 @@ import platform
import re
import socket
import subprocess
import sys
import time
from functools import wraps
@@ -805,8 +806,11 @@ class Connection(ConnectionAttr):
# brute_force_connect
if self.config.Emulator_Serial == 'auto' and available.count == 0:
logger.warning(f'No available device found')
brute_force_connect()
continue
if sys.platform == 'win32':
brute_force_connect()
continue
else:
break
else:
break

View File

@@ -1,6 +1,12 @@
import collections
from datetime import datetime
# Patch pkg_resources before importing adbutils and uiautomator2
from module.device.pkg_resources import get_distribution
# Just avoid being removed by import optimization
_ = get_distribution
from module.base.timer import Timer
from module.config.utils import get_server_next_update
from module.device.app_control import AppControl
@@ -92,6 +98,13 @@ class Device(Screenshot, Control, AppControl):
if not self.config.is_template_config and self.config.Emulator_ScreenshotMethod == 'auto':
self.run_simple_screenshot_benchmark()
# Early init
if self.config.is_actual_task:
if self.config.Emulator_ControlMethod == 'MaaTouch':
self.early_maatouch_init()
if self.config.Emulator_ControlMethod == 'minitouch':
self.early_minitouch_init()
def run_simple_screenshot_benchmark(self):
"""
Perform a screenshot method benchmark, test 3 times on each method.

View File

@@ -1,14 +1,15 @@
import socket
import threading
from functools import wraps
from adbutils.errors import AdbError
from module.base.decorator import cached_property, del_cached_property
from module.base.decorator import cached_property, del_cached_property, has_cached_property
from module.base.timer import Timer
from module.base.utils import *
from module.device.connection import Connection
from module.device.method.minitouch import CommandBuilder, insert_swipe
from module.device.method.utils import RETRY_TRIES, retry_sleep, handle_adb_error
from module.device.method.utils import RETRY_TRIES, handle_adb_error, retry_sleep
from module.exception import RequestHumanTakeover
from module.logger import logger
@@ -36,20 +37,20 @@ def retry(func):
def init():
self.adb_reconnect()
del_cached_property(self, 'maatouch_builder')
del_cached_property(self, '_maatouch_builder')
# Emulator closed
except ConnectionAbortedError as e:
logger.error(e)
def init():
self.adb_reconnect()
del_cached_property(self, 'maatouch_builder')
del_cached_property(self, '_maatouch_builder')
# AdbError
except AdbError as e:
if handle_adb_error(e):
def init():
self.adb_reconnect()
del_cached_property(self, 'maatouch_builder')
del_cached_property(self, '_maatouch_builder')
else:
break
# MaaTouchNotInstalledError: Received "Aborted" from MaaTouch
@@ -58,12 +59,12 @@ def retry(func):
def init():
self.maatouch_install()
del_cached_property(self, 'maatouch_builder')
del_cached_property(self, '_maatouch_builder')
except BrokenPipeError as e:
logger.error(e)
def init():
del_cached_property(self, 'maatouch_builder')
del_cached_property(self, '_maatouch_builder')
# Unknown, probably a trucked image
except Exception as e:
logger.exception(e)
@@ -77,6 +78,19 @@ def retry(func):
return retry_wrapper
class MaatouchBuilder(CommandBuilder):
def __init__(self, device, contact=0, handle_orientation=False):
"""
Args:
device (MaaTouch):
"""
super().__init__(device, contact, handle_orientation)
def send(self):
return self.device.maatouch_send(builder=self)
class MaaTouchNotInstalledError(Exception):
pass
@@ -90,12 +104,37 @@ class MaaTouch(Connection):
max_y: int
_maatouch_stream = socket.socket
_maatouch_stream_storage = None
_maatouch_init_thread = None
@cached_property
def maatouch_builder(self):
def _maatouch_builder(self):
self.maatouch_init()
# Orientation is handled inside MaaTouch
return CommandBuilder(self, handle_orientation=False)
return MaatouchBuilder(self)
@property
def maatouch_builder(self):
# Wait init thread
if self._maatouch_init_thread is not None:
self._maatouch_init_thread.join()
del self._maatouch_init_thread
self._maatouch_init_thread = None
return self._maatouch_builder
def early_maatouch_init(self):
"""
Start a thread to init maatouch connection while the Alas instance just starting to take screenshots
This would speed up the first click 0.2 ~ 0.4s.
"""
if has_cached_property(self, '_maatouch_builder'):
return
def early_maatouch_init_func():
_ = self._maatouch_builder
thread = threading.Thread(target=early_maatouch_init_func, daemon=True)
self._maatouch_init_thread = thread
thread.start()
def maatouch_init(self):
logger.hr('MaaTouch init')
@@ -166,14 +205,14 @@ class MaaTouch(Connection):
)
)
def maatouch_send(self):
content = self.maatouch_builder.to_minitouch()
def maatouch_send(self, builder: MaatouchBuilder):
content = builder.to_minitouch()
# logger.info("send operation: {}".format(content.replace("\n", "\\n")))
byte_content = content.encode('utf-8')
self._maatouch_stream.sendall(byte_content)
self._maatouch_stream.recv(0)
self.sleep(self.maatouch_builder.delay / 1000 + self.maatouch_builder.DEFAULT_DELAY)
self.maatouch_builder.clear()
self.sleep(self.maatouch_builder.delay / 1000 + builder.DEFAULT_DELAY)
builder.clear()
def maatouch_install(self):
logger.hr('MaaTouch install')
@@ -188,7 +227,7 @@ class MaaTouch(Connection):
builder = self.maatouch_builder
builder.down(x, y).commit()
builder.up().commit()
self.maatouch_send()
builder.send()
@retry
def long_click_maatouch(self, x, y, duration=1.0):
@@ -196,7 +235,7 @@ class MaaTouch(Connection):
builder = self.maatouch_builder
builder.down(x, y).commit().wait(duration)
builder.up().commit()
self.maatouch_send()
builder.send()
@retry
def swipe_maatouch(self, p1, p2):
@@ -204,14 +243,14 @@ class MaaTouch(Connection):
builder = self.maatouch_builder
builder.down(*points[0]).commit()
self.maatouch_send()
builder.send()
for point in points[1:]:
builder.move(*point).commit().wait(10)
self.maatouch_send()
builder.send()
builder.up().commit()
self.maatouch_send()
builder.send()
@retry
def drag_maatouch(self, p1, p2, point_random=(-10, -10, 10, 10)):
@@ -221,15 +260,15 @@ class MaaTouch(Connection):
builder = self.maatouch_builder
builder.down(*points[0]).commit()
self.maatouch_send()
builder.send()
for point in points[1:]:
builder.move(*point).commit().wait(10)
self.maatouch_send()
builder.send()
builder.move(*p2).commit().wait(140)
builder.move(*p2).commit().wait(140)
self.maatouch_send()
builder.send()
builder.up().commit()
self.maatouch_send()
builder.send()

View File

@@ -1,7 +1,7 @@
import asyncio
import json
import re
import socket
import threading
import time
from functools import wraps
from typing import List
@@ -10,11 +10,11 @@ import websockets
from adbutils.errors import AdbError
from uiautomator2 import _Service
from module.base.decorator import Config, cached_property, del_cached_property
from module.base.decorator import Config, cached_property, del_cached_property, has_cached_property
from module.base.timer import Timer
from module.base.utils import *
from module.device.connection import Connection
from module.device.method.utils import RETRY_TRIES, retry_sleep, handle_adb_error
from module.device.method.utils import RETRY_TRIES, handle_adb_error, retry_sleep
from module.exception import RequestHumanTakeover, ScriptError
from module.logger import logger
@@ -184,7 +184,7 @@ class CommandBuilder:
max_x = 1280
max_y = 720
def __init__(self, device, handle_orientation=True):
def __init__(self, device, contact=0, handle_orientation=True):
"""
Args:
device:
@@ -192,6 +192,7 @@ class CommandBuilder:
self.device = device
self.commands = []
self.delay = 0
self.contact = contact
self.handle_orientation = handle_orientation
@property
@@ -243,21 +244,21 @@ class CommandBuilder:
self.delay += ms
return self
def up(self, contact=0):
def up(self):
""" add minitouch command: 'u <contact>\n' """
self.commands.append(Command('u', contact=contact))
self.commands.append(Command('u', contact=self.contact))
return self
def down(self, x, y, contact=0, pressure=100):
def down(self, x, y, pressure=100):
""" add minitouch command: 'd <contact> <x> <y> <pressure>\n' """
x, y = self.convert(x, y)
self.commands.append(Command('d', x=x, y=y, contact=contact, pressure=pressure))
self.commands.append(Command('d', x=x, y=y, contact=self.contact, pressure=pressure))
return self
def move(self, x, y, contact=0, pressure=100):
def move(self, x, y, pressure=100):
""" add minitouch command: 'm <contact> <x> <y> <pressure>\n' """
x, y = self.convert(x, y)
self.commands.append(Command('m', x=x, y=y, contact=contact, pressure=pressure))
self.commands.append(Command('m', x=x, y=y, contact=self.contact, pressure=pressure))
return self
def clear(self):
@@ -271,6 +272,9 @@ class CommandBuilder:
def to_atx_agent(self) -> List[str]:
return [command.to_atx_agent(self.max_x, self.max_y) for command in self.commands]
def send(self):
return self.device.minitouch_send(builder=self)
class MinitouchNotInstalledError(Exception):
pass
@@ -324,7 +328,7 @@ def retry(func):
self.install_uiautomator2()
if self._minitouch_port:
self.adb_forward_remove(f'tcp:{self._minitouch_port}')
del_cached_property(self, 'minitouch_builder')
del_cached_property(self, '_minitouch_builder')
# MinitouchOccupiedError: Timeout when connecting to minitouch
except MinitouchOccupiedError as e:
logger.error(e)
@@ -333,7 +337,7 @@ def retry(func):
self.restart_atx()
if self._minitouch_port:
self.adb_forward_remove(f'tcp:{self._minitouch_port}')
del_cached_property(self, 'minitouch_builder')
del_cached_property(self, '_minitouch_builder')
# AdbError
except AdbError as e:
if handle_adb_error(e):
@@ -345,7 +349,7 @@ def retry(func):
logger.error(e)
def init():
del_cached_property(self, 'minitouch_builder')
del_cached_property(self, '_minitouch_builder')
# Unknown, probably a trucked image
except Exception as e:
logger.exception(e)
@@ -366,12 +370,38 @@ class Minitouch(Connection):
_minitouch_ws: websockets.WebSocketClientProtocol
max_x: int
max_y: int
_minitouch_init_thread = None
@cached_property
def minitouch_builder(self):
def _minitouch_builder(self):
self.minitouch_init()
return CommandBuilder(self)
@property
def minitouch_builder(self):
# Wait init thread
if self._minitouch_init_thread is not None:
self._minitouch_init_thread.join()
del self._minitouch_init_thread
self._minitouch_init_thread = None
return self._minitouch_builder
def early_minitouch_init(self):
"""
Start a thread to init minitouch connection while the Alas instance just starting to take screenshots
This would speed up the first click 0.05s.
"""
if has_cached_property(self, '_minitouch_builder'):
return
def early_minitouch_init_func():
_ = self._minitouch_builder
thread = threading.Thread(target=early_minitouch_init_func, daemon=True)
self._minitouch_init_thread = thread
thread.start()
@Config.when(DEVICE_OVER_HTTP=False)
def minitouch_init(self):
logger.hr('MiniTouch init')
@@ -446,14 +476,14 @@ class Minitouch(Connection):
)
@Config.when(DEVICE_OVER_HTTP=False)
def minitouch_send(self):
content = self.minitouch_builder.to_minitouch()
def minitouch_send(self, builder: CommandBuilder):
content = builder.to_minitouch()
# logger.info("send operation: {}".format(content.replace("\n", "\\n")))
byte_content = content.encode('utf-8')
self._minitouch_client.sendall(byte_content)
self._minitouch_client.recv(0)
time.sleep(self.minitouch_builder.delay / 1000 + self.minitouch_builder.DEFAULT_DELAY)
self.minitouch_builder.clear()
time.sleep(self.minitouch_builder.delay / 1000 + builder.DEFAULT_DELAY)
builder.clear()
@cached_property
def _minitouch_loop(self):
@@ -514,8 +544,8 @@ class Minitouch(Connection):
self._minitouch_ws = self._minitouch_loop_run(connect())
@Config.when(DEVICE_OVER_HTTP=True)
def minitouch_send(self):
content = self.minitouch_builder.to_atx_agent()
def minitouch_send(self, builder: CommandBuilder):
content = builder.to_atx_agent()
async def send():
for row in content:
@@ -523,15 +553,15 @@ class Minitouch(Connection):
await self._minitouch_ws.send(row)
self._minitouch_loop_run(send())
time.sleep(self.minitouch_builder.delay / 1000 + self.minitouch_builder.DEFAULT_DELAY)
self.minitouch_builder.clear()
time.sleep(builder.delay / 1000 + builder.DEFAULT_DELAY)
builder.clear()
@retry
def click_minitouch(self, x, y):
builder = self.minitouch_builder
builder.down(x, y).commit()
builder.up().commit()
self.minitouch_send()
builder.send()
@retry
def long_click_minitouch(self, x, y, duration=1.0):
@@ -539,7 +569,7 @@ class Minitouch(Connection):
builder = self.minitouch_builder
builder.down(x, y).commit().wait(duration)
builder.up().commit()
self.minitouch_send()
builder.send()
@retry
def swipe_minitouch(self, p1, p2):
@@ -547,14 +577,14 @@ class Minitouch(Connection):
builder = self.minitouch_builder
builder.down(*points[0]).commit()
self.minitouch_send()
builder.send()
for point in points[1:]:
builder.move(*point).commit().wait(10)
self.minitouch_send()
builder.send()
builder.up().commit()
self.minitouch_send()
builder.send()
@retry
def drag_minitouch(self, p1, p2, point_random=(-10, -10, 10, 10)):
@@ -564,15 +594,15 @@ class Minitouch(Connection):
builder = self.minitouch_builder
builder.down(*points[0]).commit()
self.minitouch_send()
builder.send()
for point in points[1:]:
builder.move(*point).commit().wait(10)
self.minitouch_send()
builder.send()
builder.move(*p2).commit().wait(140)
builder.move(*p2).commit().wait(140)
self.minitouch_send()
builder.send()
builder.up().commit()
self.minitouch_send()
builder.send()

View File

@@ -2,7 +2,7 @@ import asyncio
import ctypes
import os
import sys
from functools import wraps, partial
from functools import partial, wraps
import cv2
import numpy as np
@@ -147,6 +147,8 @@ class CaptureNemuIpc(CaptureStd):
# MuMuVMMSVC.exe died
# b'nemu_capture_display rpc error: 1726\r\n'
# No idea how to handle yet
if b'error: 1722' in self.stderr or b'error: 1726' in self.stderr:
raise NemuIpcError('Emulator instance is probably dead')
def retry(func):
@@ -172,7 +174,7 @@ def retry(func):
break
# Function call timeout
except asyncio.TimeoutError:
logger.warning(f'Func {func.__name__}() call timeout, retrying')
logger.warning(f'Func {func.__name__}() call timeout, retrying: {_}')
def init():
self.reconnect()
@@ -236,11 +238,10 @@ class NemuIpcImpl:
if self.connect_id > 0:
return
with CaptureNemuIpc():
connect_id = self.ev_run_sync(
self.lib.nemu_connect,
self.nemu_folder, self.instance_id
)
connect_id = self.ev_run_sync(
self.lib.nemu_connect,
self.nemu_folder, self.instance_id
)
if connect_id == 0:
raise NemuIpcError(
'Connection failed, please check if nemu_folder is correct and emulator is running'
@@ -253,11 +254,10 @@ class NemuIpcImpl:
if self.connect_id == 0:
return
with CaptureNemuIpc():
self.ev_run_sync(
self.lib.nemu_disconnect,
self.connect_id
)
self.ev_run_sync(
self.lib.nemu_disconnect,
self.connect_id
)
# logger.info(f'NemuIpc disconnected: {self.connect_id}')
self.connect_id = 0
@@ -288,7 +288,9 @@ class NemuIpcImpl:
asyncio.TimeoutError: If function call timeout
"""
func_wrapped = partial(func, *args, **kwargs)
result = await asyncio.wait_for(self._ev.run_in_executor(None, func_wrapped), timeout=0.05)
# Increased timeout for slow PCs
# Default screenshot interval is 0.2s, so a 0.15s timeout would have a fast retry without extra time costs
result = await asyncio.wait_for(self._ev.run_in_executor(None, func_wrapped), timeout=0.15)
return result
def ev_run_sync(self, func, *args, **kwargs):
@@ -300,8 +302,24 @@ class NemuIpcImpl:
Raises:
asyncio.TimeoutError: If function call timeout
NemuIpcIncompatible:
NemuIpcError
"""
result = self._ev.run_until_complete(self.ev_run_async(func, *args, **kwargs))
err = False
if func.__name__ == 'nemu_connect':
if result == 0:
err = True
else:
if result > 0:
err = True
# Get to actual error message printed in std
if err:
logger.warning(f'Failed to call {func.__name__}, result={result}')
with CaptureNemuIpc():
result = self._ev.run_until_complete(self.ev_run_async(func, *args, **kwargs))
return result
def get_resolution(self):
@@ -315,11 +333,10 @@ class NemuIpcImpl:
height_ptr = ctypes.pointer(ctypes.c_int(0))
nullptr = ctypes.POINTER(ctypes.c_int)()
with CaptureNemuIpc():
ret = self.ev_run_sync(
self.lib.nemu_capture_display,
self.connect_id, self.display_id, 0, width_ptr, height_ptr, nullptr
)
ret = self.ev_run_sync(
self.lib.nemu_capture_display,
self.connect_id, self.display_id, 0, width_ptr, height_ptr, nullptr
)
if ret > 0:
raise NemuIpcError('nemu_capture_display failed during get_resolution()')
self.width = width_ptr.contents.value
@@ -335,18 +352,17 @@ class NemuIpcImpl:
if self.connect_id == 0:
self.connect()
with CaptureNemuIpc():
self.get_resolution()
self.get_resolution()
width_ptr = ctypes.pointer(ctypes.c_int(self.width))
height_ptr = ctypes.pointer(ctypes.c_int(self.height))
length = self.width * self.height * 4
pixels_pointer = ctypes.pointer((ctypes.c_ubyte * length)())
width_ptr = ctypes.pointer(ctypes.c_int(self.width))
height_ptr = ctypes.pointer(ctypes.c_int(self.height))
length = self.width * self.height * 4
pixels_pointer = ctypes.pointer((ctypes.c_ubyte * length)())
ret = self.ev_run_sync(
self.lib.nemu_capture_display,
self.connect_id, self.display_id, length, width_ptr, height_ptr, pixels_pointer
)
ret = self.ev_run_sync(
self.lib.nemu_capture_display,
self.connect_id, self.display_id, length, width_ptr, height_ptr, pixels_pointer
)
if ret > 0:
raise NemuIpcError('nemu_capture_display failed during screenshot()')
@@ -378,11 +394,10 @@ class NemuIpcImpl:
x, y = self.convert_xy(x, y)
with CaptureNemuIpc():
ret = self.ev_run_sync(
self.lib.nemu_input_event_touch_down,
self.connect_id, self.display_id, x, y
)
ret = self.ev_run_sync(
self.lib.nemu_input_event_touch_down,
self.connect_id, self.display_id, x, y
)
if ret > 0:
raise NemuIpcError('nemu_input_event_touch_down failed')
@@ -394,11 +409,10 @@ class NemuIpcImpl:
if self.connect_id == 0:
self.connect()
with CaptureNemuIpc():
ret = self.ev_run_sync(
self.lib.nemu_input_event_touch_up,
self.connect_id, self.display_id
)
ret = self.ev_run_sync(
self.lib.nemu_input_event_touch_up,
self.connect_id, self.display_id
)
if ret > 0:
raise NemuIpcError('nemu_input_event_touch_up failed')
@@ -448,7 +462,9 @@ class NemuIpc(Platform):
# Search emulator instance
# with E:\ProgramFiles\MuMuPlayer-12.0\shell\MuMuPlayer.exe
# installation path is E:\ProgramFiles\MuMuPlayer-12.0
_ = self.emulator_instance
if self.emulator_instance is None:
logger.error('Unable to use NemuIpc because emulator instance not found')
raise RequestHumanTakeover
try:
return NemuIpcImpl(
nemu_folder=self.emulator_instance.emulator.abspath('../'),
@@ -486,44 +502,40 @@ class NemuIpc(Platform):
def click_nemu_ipc(self, x, y):
down = ensure_time((0.010, 0.020))
with CaptureNemuIpc():
self.nemu_ipc.down(x, y)
self.sleep(down)
self.nemu_ipc.up()
self.sleep(0.050 - down)
self.nemu_ipc.down(x, y)
self.sleep(down)
self.nemu_ipc.up()
self.sleep(0.050 - down)
def long_click_nemu_ipc(self, x, y, duration=1.0):
with CaptureNemuIpc():
self.nemu_ipc.down(x, y)
self.sleep(duration)
self.nemu_ipc.up()
self.sleep(0.050)
self.nemu_ipc.down(x, y)
self.sleep(duration)
self.nemu_ipc.up()
self.sleep(0.050)
def swipe_nemu_ipc(self, p1, p2):
points = insert_swipe(p0=p1, p3=p2)
with CaptureNemuIpc():
for point in points:
self.nemu_ipc.down(*point)
self.sleep(0.010)
for point in points:
self.nemu_ipc.down(*point)
self.sleep(0.010)
self.nemu_ipc.up()
self.sleep(0.050)
self.nemu_ipc.up()
self.sleep(0.050)
def drag_nemu_ipc(self, p1, p2, point_random=(-10, -10, 10, 10)):
p1 = np.array(p1) - random_rectangle_point(point_random)
p2 = np.array(p2) - random_rectangle_point(point_random)
points = insert_swipe(p0=p1, p3=p2, speed=20)
with CaptureNemuIpc():
for point in points:
self.nemu_ipc.down(*point)
self.sleep(0.010)
for point in points:
self.nemu_ipc.down(*point)
self.sleep(0.010)
self.nemu_ipc.down(*p2)
self.sleep(0.140)
self.nemu_ipc.down(*p2)
self.sleep(0.140)
self.nemu_ipc.down(*p2)
self.sleep(0.140)
self.nemu_ipc.down(*p2)
self.sleep(0.140)
self.nemu_ipc.up()
self.sleep(0.050)
self.nemu_ipc.up()
self.sleep(0.050)

View File

@@ -0,0 +1,82 @@
import os
import re
import sys
from module.base.decorator import cached_property
"""
Importing pkg_resources is so slow, like 0.4 ~ 1.0s, just google it you will find it indeed really slow.
Since it was some kind of standard library there is no way to modify it or speed it up.
So here's a poor but fast implementation of pkg_resources returning the things in need.
To patch:
```
# Patch pkg_resources before importing adbutils and uiautomator2
from module.device.pkg_resources import get_distribution
# Just avoid being removed by import optimization
_ = get_distribution
```
"""
# Inject sys.modules, pretend we have pkg_resources imported
sys.modules['pkg_resources'] = sys.modules['module.device.pkg_resources']
class FakeDistributionObject:
def __init__(self, dist, version):
self.dist = dist
self.version = version
def __str__(self):
return f'{self.__class__.__name__}({self.dist}={self.version})'
__repr__ = __str__
class PackageCache:
@cached_property
def site_packages(self):
# Just whatever library to locate the `site-packages` directory
import requests
path = os.path.abspath(os.path.join(requests.__file__, '../../'))
return path
@cached_property
def dict_installed_packages(self):
"""
Returns:
dict: Key: str, package name
Value: FakeDistributionObject
"""
dic = {}
for file in os.listdir(self.site_packages):
# mxnet_cu101-1.6.0.dist-info
res = re.match(r'^(.+)-(.+)\.dist-info$', file)
if res:
obj = FakeDistributionObject(
dist=res.group(1),
version=res.group(2),
)
dic[obj.dist] = obj
return dic
PACKAGE_CACHE = PackageCache()
def resource_filename(*args):
if args == ("adbutils", "binaries"):
path = os.path.abspath(os.path.join(PACKAGE_CACHE.site_packages, *args))
return path
def get_distribution(dist):
"""Return a current distribution object for a Requirement or string"""
if dist == 'adbutils':
return PACKAGE_CACHE.dict_installed_packages.get('adbutils', '0.11.0')
if dist == 'uiautomator2':
return PACKAGE_CACHE.dict_installed_packages.get('uiautomator2', '2.16.17')
class DistributionNotFound(Exception):
pass

View File

@@ -36,6 +36,21 @@ def get_serial_pair(serial):
return None, None
def remove_duplicated_path(paths):
"""
Args:
paths (list[str]):
Returns:
list[str]:
"""
paths = sorted(set(paths))
dic = {}
for path in paths:
dic.setdefault(path.lower(), path)
return list(dic.values())
@dataclass
class EmulatorInstanceBase:
# Serial for adb connection
@@ -205,6 +220,14 @@ class EmulatorBase:
class EmulatorManagerBase:
@staticmethod
def iter_running_emulator():
"""
Yields:
str: Path to emulator executables, may contains duplicate values
"""
return
@cached_property
def all_emulators(self) -> t.List[EmulatorBase]:
"""

View File

@@ -8,7 +8,8 @@ from dataclasses import dataclass
# module/device/platform/emulator_base.py
# module/device/platform/emulator_windows.py
# Will be used in Alas Easy Install, they shouldn't import any Alas modules.
from module.device.platform.emulator_base import EmulatorBase, EmulatorInstanceBase, EmulatorManagerBase
from module.device.platform.emulator_base import EmulatorBase, EmulatorInstanceBase, EmulatorManagerBase, \
remove_duplicated_path
from module.device.platform.utils import cached_property, iter_folder
@@ -321,7 +322,7 @@ class EmulatorManager(EmulatorManagerBase):
Get recently executed programs in UserAssist
https://github.com/forensicmatt/MonitorUserAssist
Returns:
Yields:
str: Path to emulator executables, may contains duplicate values
"""
path = r'Software\Microsoft\Windows\CurrentVersion\Explorer\UserAssist'
@@ -452,6 +453,31 @@ class EmulatorManager(EmulatorManagerBase):
uninstall = res.group(1) if res else uninstall
yield uninstall
@staticmethod
def iter_running_emulator():
"""
Yields:
str: Path to emulator executables, may contains duplicate values
"""
try:
import psutil
except ModuleNotFoundError:
return
# Since this is a one-time-usage, we access psutil._psplatform.Process directly
# to bypass the call of psutil.Process.is_running().
# This only costs about 0.017s.
for pid in psutil.pids():
proc = psutil._psplatform.Process(pid)
try:
exe = proc.cmdline()
exe = exe[0].replace(r'\\', '/').replace('\\', '/')
except (psutil.AccessDenied, IndexError):
# psutil.AccessDenied
continue
if Emulator.is_emulator(exe):
yield exe
@cached_property
def all_emulators(self) -> t.List[Emulator]:
"""
@@ -479,7 +505,7 @@ class EmulatorManager(EmulatorManagerBase):
exe.add(ld)
# Uninstall registry
for uninstall in self.iter_uninstall_registry():
for uninstall in EmulatorManager.iter_uninstall_registry():
# Find emulator executable from uninstaller
for file in iter_folder(abspath(os.path.dirname(uninstall)), ext='.exe'):
if Emulator.is_emulator(file) and os.path.exists(file):
@@ -493,12 +519,14 @@ class EmulatorManager(EmulatorManagerBase):
if Emulator.is_emulator(file) and os.path.exists(file):
exe.add(file)
# Running
for file in EmulatorManager.iter_running_emulator():
if os.path.exists(file):
exe.add(file)
# De-redundancy
exe = [Emulator(path).path for path in exe if Emulator.is_emulator(path)]
exe = sorted(set(exe))
dic = {}
for path in exe:
dic.setdefault(path.lower(), path)
exe = [Emulator(path) for path in dic.values()]
exe = [Emulator(path) for path in remove_duplicated_path(exe)]
return exe
@cached_property

View File

@@ -6,7 +6,7 @@ from pydantic import BaseModel
from module.base.decorator import cached_property, del_cached_property
from module.device.connection import Connection
from module.device.method.utils import get_serial_pair
from module.device.platform.emulator_base import EmulatorInstanceBase, EmulatorManagerBase
from module.device.platform.emulator_base import EmulatorInstanceBase, EmulatorManagerBase, remove_duplicated_path
from module.logger import logger
from module.map.map_grids import SelectedGrids
@@ -136,7 +136,7 @@ class PlatformBase(Connection, EmulatorManagerBase):
# Search by serial
select = instances.select(**search_args)
if select.count == 0:
logger.warning(f'No emulator instance with {search_args}')
logger.warning(f'No emulator instance with {search_args}, serial invalid')
return None
if select.count == 1:
instance = select[0]
@@ -149,9 +149,9 @@ class PlatformBase(Connection, EmulatorManagerBase):
search_args['name'] = name
select = instances.select(**search_args)
if select.count == 0:
logger.warning(f'No emulator instances with {search_args}')
return None
if select.count == 1:
logger.warning(f'No emulator instances with {search_args}, name invalid')
search_args.pop('name')
elif select.count == 1:
instance = select[0]
logger.hr('Emulator instance', level=2)
logger.info(f'Found emulator instance: {instance}')
@@ -162,9 +162,9 @@ class PlatformBase(Connection, EmulatorManagerBase):
search_args['path'] = path
select = instances.select(**search_args)
if select.count == 0:
logger.warning(f'No emulator instances with {search_args}')
return None
if select.count == 1:
logger.warning(f'No emulator instances with {search_args}, path invalid')
search_args.pop('path')
elif select.count == 1:
instance = select[0]
logger.hr('Emulator instance', level=2)
logger.info(f'Found emulator instance: {instance}')
@@ -175,9 +175,28 @@ class PlatformBase(Connection, EmulatorManagerBase):
search_args['type'] = emulator
select = instances.select(**search_args)
if select.count == 0:
logger.warning(f'No emulator instances with {search_args}')
return None
if select.count == 1:
logger.warning(f'No emulator instances with {search_args}, type invalid')
search_args.pop('type')
elif select.count == 1:
instance = select[0]
logger.hr('Emulator instance', level=2)
logger.info(f'Found emulator instance: {instance}')
return instance
# Still too many instances, search from running emulators
running = remove_duplicated_path(list(self.iter_running_emulator()))
logger.info('Running emulators')
for exe in running:
logger.info(exe)
if len(running) == 1:
logger.info('Only one running emulator')
# Same as searching path
search_args['path'] = running[0]
select = instances.select(**search_args)
if select.count == 0:
logger.warning(f'No emulator instances with {search_args}, path invalid')
search_args.pop('path')
elif select.count == 1:
instance = select[0]
logger.hr('Emulator instance', level=2)
logger.info(f'Found emulator instance: {instance}')

View File

@@ -72,6 +72,10 @@ class Screenshot(Adb, WSA, DroidCast, AScreenCap, Scrcpy, NemuIpc):
return self.image
@property
def has_cached_image(self):
return hasattr(self, 'image') and self.image is not None
def _handle_orientated_image(self, image):
"""
Args: