Source code for androidtv.adb_manager.adb_manager_async

"""Classes to manage ADB connections.

* :py:class:`ADBPythonAsync` utilizes a Python implementation of the ADB protocol.
* :py:class:`ADBServerAsync` utilizes an ADB server to communicate with the device.

"""


import asyncio
from contextlib import asynccontextmanager
import logging

from adb_shell.adb_device_async import AdbDeviceTcpAsync
from adb_shell.auth.sign_pythonrsa import PythonRSASigner
import aiofiles
from ppadb.client import Client

from ..constants import DEFAULT_ADB_TIMEOUT_S, DEFAULT_AUTH_TIMEOUT_S, DEFAULT_LOCK_TIMEOUT_S
from ..exceptions import LockNotAcquiredException

_LOGGER = logging.getLogger(__name__)


[docs]class DeviceAsync: """An async wrapper for the pure-python-adb ``Device`` class.""" def __init__(self, device): self._device = device
[docs] async def pull(self, device_path, local_path): """Download a file.""" return await asyncio.get_running_loop().run_in_executor(None, self._device.pull, device_path, local_path)
[docs] async def push(self, local_path, device_path): """Upload a file.""" return await asyncio.get_running_loop().run_in_executor(None, self._device.push, local_path, device_path)
[docs] async def screencap(self): """Take a screencap.""" return await asyncio.get_running_loop().run_in_executor(None, self._device.screencap)
[docs] async def shell(self, cmd): """Send a shell command.""" return await asyncio.get_running_loop().run_in_executor(None, self._device.shell, cmd)
# pylint: disable=too-few-public-methods
[docs]class ClientAsync: """An async wrapper for the pure-python-adb ``Client`` class.""" def __init__(self, host, port): self._client = Client(host, port)
[docs] async def device(self, serial): """Get a ``DeviceAsync`` instance.""" dev = await asyncio.get_running_loop().run_in_executor(None, self._client.device, serial) if dev: return DeviceAsync(dev) return None
[docs]@asynccontextmanager async def _acquire(lock, timeout=DEFAULT_LOCK_TIMEOUT_S): """Handle acquisition and release of an ``asyncio.Lock`` object with a timeout. Parameters ---------- lock : asyncio.Lock The lock that we will try to acquire timeout : float The timeout in seconds Yields ------ acquired : bool Whether or not the lock was acquired Raises ------ LockNotAcquiredException Raised if the lock was not acquired """ try: acquired = False try: acquired = await asyncio.wait_for(lock.acquire(), timeout) if not acquired: raise LockNotAcquiredException yield acquired except asyncio.TimeoutError as exc: raise LockNotAcquiredException from exc finally: if acquired: lock.release()
[docs]class ADBPythonAsync(object): """A manager for ADB connections that uses a Python implementation of the ADB protocol. Parameters ---------- host : str The address of the device; may be an IP address or a host name port : int The device port to which we are connecting (default is 5555) adbkey : str The path to the ``adbkey`` file for ADB authentication signer : PythonRSASigner, None The signer for the ADB keys, as loaded by :meth:`ADBPythonAsync.load_adbkey` """ def __init__(self, host, port, adbkey='', signer=None): self.host = host self.port = int(port) self.adbkey = adbkey self._adb = AdbDeviceTcpAsync(host=self.host, port=self.port, default_transport_timeout_s=DEFAULT_ADB_TIMEOUT_S) self._signer = signer # keep track of whether the ADB connection is intact self._available = False # use a lock to make sure that ADB commands don't overlap self._adb_lock = asyncio.Lock() @property def available(self): """Check whether the ADB connection is intact. Returns ------- bool Whether or not the ADB connection is intact """ return self._adb.available
[docs] async def close(self): """Close the ADB socket connection. """ await self._adb.close()
[docs] async def connect(self, always_log_errors=True, auth_timeout_s=DEFAULT_AUTH_TIMEOUT_S): """Connect to an Android TV / Fire TV device. Parameters ---------- always_log_errors : bool If True, errors will always be logged; otherwise, errors will only be logged on the first failed reconnect attempt auth_timeout_s : float Authentication timeout (in seconds) Returns ------- bool Whether or not the connection was successfully established and the device is available """ try: async with _acquire(self._adb_lock): # Catch exceptions try: # Connect with authentication if self.adbkey: if not self._signer: self._signer = await self.load_adbkey(self.adbkey) await self._adb.connect(rsa_keys=[self._signer], auth_timeout_s=auth_timeout_s) # Connect without authentication else: await self._adb.connect(auth_timeout_s=auth_timeout_s) # ADB connection successfully established _LOGGER.debug("ADB connection to %s:%d successfully established", self.host, self.port) self._available = True return True except OSError as exc: if self._available or always_log_errors: if exc.strerror is None: exc.strerror = "Timed out trying to connect to ADB device." _LOGGER.warning("Couldn't connect to %s:%d. %s: %s", self.host, self.port, exc.__class__.__name__, exc.strerror) # ADB connection attempt failed await self.close() self._available = False return False except Exception as exc: # pylint: disable=broad-except if self._available or always_log_errors: _LOGGER.warning("Couldn't connect to %s:%d. %s: %s", self.host, self.port, exc.__class__.__name__, exc) # ADB connection attempt failed await self.close() self._available = False return False except LockNotAcquiredException: _LOGGER.warning("Couldn't connect to %s:%d because adb-shell lock not acquired.", self.host, self.port) await self.close() self._available = False return False
[docs] @staticmethod async def load_adbkey(adbkey): """Load the ADB keys. Parameters ---------- adbkey : str The path to the ``adbkey`` file for ADB authentication Returns ------- PythonRSASigner The ``PythonRSASigner`` with the key files loaded """ # private key async with aiofiles.open(adbkey) as f: priv = await f.read() # public key try: async with aiofiles.open(adbkey + '.pub') as f: pub = await f.read() except FileNotFoundError: pub = '' return PythonRSASigner(pub, priv)
[docs] async def pull(self, local_path, device_path): """Pull a file from the device using the Python ADB implementation. Parameters ---------- local_path : str The path where the file will be saved device_path : str The file on the device that will be pulled """ if not self.available: _LOGGER.debug("ADB command not sent to %s:%d because adb-shell connection is not established: pull(%s, %s)", self.host, self.port, local_path, device_path) return async with _acquire(self._adb_lock): _LOGGER.debug("Sending command to %s:%d via adb-shell: pull(%s, %s)", self.host, self.port, local_path, device_path) await self._adb.pull(device_path, local_path) return
[docs] async def push(self, local_path, device_path): """Push a file to the device using the Python ADB implementation. Parameters ---------- local_path : str The file that will be pushed to the device device_path : str The path where the file will be saved on the device """ if not self.available: _LOGGER.debug("ADB command not sent to %s:%d because adb-shell connection is not established: push(%s, %s)", self.host, self.port, local_path, device_path) return async with _acquire(self._adb_lock): _LOGGER.debug("Sending command to %s:%d via adb-shell: push(%s, %s)", self.host, self.port, local_path, device_path) await self._adb.push(local_path, device_path) return
[docs] async def screencap(self): """Take a screenshot using the Python ADB implementation. Returns ------- bytes The screencap as a binary .png image """ if not self.available: _LOGGER.debug("ADB screencap not taken from %s:%d because adb-shell connection is not established", self.host, self.port) return None async with _acquire(self._adb_lock): _LOGGER.debug("Taking screencap from %s:%d via adb-shell", self.host, self.port) result = await self._adb.shell("screencap -p", decode=False) if result and result[5:6] == b"\r": return result.replace(b"\r\n", b"\n") return result
[docs] async def shell(self, cmd): """Send an ADB command using the Python ADB implementation. Parameters ---------- cmd : str The ADB command to be sent Returns ------- str, None The response from the device, if there is a response """ if not self.available: _LOGGER.debug("ADB command not sent to %s:%d because adb-shell connection is not established: %s", self.host, self.port, cmd) return None async with _acquire(self._adb_lock): _LOGGER.debug("Sending command to %s:%d via adb-shell: %s", self.host, self.port, cmd) return await self._adb.shell(cmd)
[docs]class ADBServerAsync(object): """A manager for ADB connections that uses an ADB server. Parameters ---------- host : str The address of the device; may be an IP address or a host name port : int The device port to which we are connecting (default is 5555) adb_server_ip : str The IP address of the ADB server adb_server_port : int The port for the ADB server """ def __init__(self, host, port=5555, adb_server_ip='', adb_server_port=5037): self.host = host self.port = int(port) self.adb_server_ip = adb_server_ip self.adb_server_port = adb_server_port self._adb_client = None self._adb_device = None # keep track of whether the ADB connection is/was intact self._available = False self._was_available = False # use a lock to make sure that ADB commands don't overlap self._adb_lock = asyncio.Lock() @property def available(self): """Check whether the ADB connection is intact. Returns ------- bool Whether or not the ADB connection is intact """ if not self._adb_client or not self._adb_device: return False return self._available
[docs] async def close(self): """Close the ADB server socket connection. Currently, this doesn't do anything except set ``self._available = False``. """ self._available = False
[docs] async def connect(self, always_log_errors=True): """Connect to an Android TV / Fire TV device. Parameters ---------- always_log_errors : bool If True, errors will always be logged; otherwise, errors will only be logged on the first failed reconnect attempt Returns ------- bool Whether or not the connection was successfully established and the device is available """ try: async with _acquire(self._adb_lock): # Catch exceptions try: self._adb_client = ClientAsync(host=self.adb_server_ip, port=self.adb_server_port) self._adb_device = await self._adb_client.device('{}:{}'.format(self.host, self.port)) # ADB connection successfully established if self._adb_device: _LOGGER.debug("ADB connection to %s:%d via ADB server %s:%d successfully established", self.host, self.port, self.adb_server_ip, self.adb_server_port) self._available = True self._was_available = True return True # ADB connection attempt failed (without an exception) if self._was_available or always_log_errors: _LOGGER.warning("Couldn't connect to %s:%d via ADB server %s:%d because the server is not connected to the device", self.host, self.port, self.adb_server_ip, self.adb_server_port) await self.close() self._available = False self._was_available = False return False # ADB connection attempt failed except Exception as exc: # noqa pylint: disable=broad-except if self._was_available or always_log_errors: _LOGGER.warning("Couldn't connect to %s:%d via ADB server %s:%d, error: %s", self.host, self.port, self.adb_server_ip, self.adb_server_port, exc) await self.close() self._available = False self._was_available = False return False except LockNotAcquiredException: _LOGGER.warning("Couldn't connect to %s:%d via ADB server %s:%d because pure-python-adb lock not acquired.", self.host, self.port, self.adb_server_ip, self.adb_server_port) await self.close() self._available = False self._was_available = False return False
[docs] async def pull(self, local_path, device_path): """Pull a file from the device using an ADB server. Parameters ---------- local_path : str The path where the file will be saved device_path : str The file on the device that will be pulled """ if not self.available: _LOGGER.debug("ADB command not sent to %s:%d via ADB server %s:%d because pure-python-adb connection is not established: pull(%s, %s)", self.host, self.port, self.adb_server_ip, self.adb_server_port, local_path, device_path) return async with _acquire(self._adb_lock): _LOGGER.debug("Sending command to %s:%d via ADB server %s:%d: pull(%s, %s)", self.host, self.port, self.adb_server_ip, self.adb_server_port, local_path, device_path) await self._adb_device.pull(device_path, local_path) return
[docs] async def push(self, local_path, device_path): """Push a file to the device using an ADB server. Parameters ---------- local_path : str The file that will be pushed to the device device_path : str The path where the file will be saved on the device """ if not self.available: _LOGGER.debug("ADB command not sent to %s:%d via ADB server %s:%d because pure-python-adb connection is not established: push(%s, %s)", self.host, self.port, self.adb_server_ip, self.adb_server_port, local_path, device_path) return async with _acquire(self._adb_lock): _LOGGER.debug("Sending command to %s:%d via ADB server %s:%d: push(%s, %s)", self.host, self.port, self.adb_server_ip, self.adb_server_port, local_path, device_path) await self._adb_device.push(local_path, device_path) return
[docs] async def screencap(self): """Take a screenshot using an ADB server. Returns ------- bytes, None The screencap as a binary .png image, or ``None`` if there was an ``IndexError`` exception """ if not self.available: _LOGGER.debug("ADB screencap not taken from %s:%d via ADB server %s:%d because pure-python-adb connection is not established", self.host, self.port, self.adb_server_ip, self.adb_server_port) return None async with _acquire(self._adb_lock): _LOGGER.debug("Taking screencap from %s:%d via ADB server %s:%d", self.host, self.port, self.adb_server_ip, self.adb_server_port) return await self._adb_device.screencap()
[docs] async def shell(self, cmd): """Send an ADB command using an ADB server. Parameters ---------- cmd : str The ADB command to be sent Returns ------- str, None The response from the device, if there is a response """ if not self.available: _LOGGER.debug("ADB command not sent to %s:%d via ADB server %s:%d because pure-python-adb connection is not established: %s", self.host, self.port, self.adb_server_ip, self.adb_server_port, cmd) return None async with _acquire(self._adb_lock): _LOGGER.debug("Sending command to %s:%d via ADB server %s:%d: %s", self.host, self.port, self.adb_server_ip, self.adb_server_port, cmd) return await self._adb_device.shell(cmd)