# Copyright 2014 Google Inc. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """A libusb1-based ADB reimplementation. ADB was giving us trouble with its client/server architecture, which is great for users and developers, but not so great for reliable scripting. This will allow us to more easily catch errors as Python exceptions instead of checking random exit codes, and all the other great benefits from not going through subprocess and a network socket. All timeouts are in milliseconds. """ import io import os import socket import posixpath from adb import adb_protocol from adb import common from adb import filesync_protocol # From adb.h CLASS = 0xFF SUBCLASS = 0x42 PROTOCOL = 0x01 # pylint: disable=invalid-name DeviceIsAvailable = common.InterfaceMatcher(CLASS, SUBCLASS, PROTOCOL) try: # Imported locally to keep compatibility with previous code. from adb.sign_m2crypto import M2CryptoSigner except ImportError: # Ignore this error when M2Crypto is not installed, there are other options. pass class AdbCommands(object): """Exposes adb-like methods for use. Some methods are more-pythonic and/or have more options. """ protocol_handler = adb_protocol.AdbMessage filesync_handler = filesync_protocol.FilesyncProtocol def __init__(self): self.__reset() def __reset(self): self.build_props = None self._handle = None self._device_state = None # Connection table tracks each open AdbConnection objects per service type for program functions # that choose to persist an AdbConnection object for their functionality, using # self._get_service_connection self._service_connections = {} def _get_service_connection(self, service, service_command=None, create=True, timeout_ms=None): """ Based on the service, get the AdbConnection for that service or create one if it doesnt exist :param service: :param service_command: Additional service parameters to append :param create: If False, dont create a connection if it does not exist :return: """ connection = self._service_connections.get(service, None) if connection: return connection if not connection and not create: return None if service_command: destination_str = b'%s:%s' % (service, service_command) else: destination_str = service connection = self.protocol_handler.Open( self._handle, destination=destination_str, timeout_ms=timeout_ms) self._service_connections.update({service: connection}) return connection def ConnectDevice(self, port_path=None, serial=None, default_timeout_ms=None, **kwargs): """Convenience function to setup a transport handle for the adb device from usb path or serial then connect to it. Args: port_path: The filename of usb port to use. serial: The serial number of the device to use. default_timeout_ms: The default timeout in milliseconds to use. kwargs: handle: Device handle to use (instance of common.TcpHandle or common.UsbHandle) banner: Connection banner to pass to the remote device rsa_keys: List of AuthSigner subclass instances to be used for authentication. The device can either accept one of these via the Sign method, or we will send the result of GetPublicKey from the first one if the device doesn't accept any of them. auth_timeout_ms: Timeout to wait for when sending a new public key. This is only relevant when we send a new public key. The device shows a dialog and this timeout is how long to wait for that dialog. If used in automation, this should be low to catch such a case as a failure quickly; while in interactive settings it should be high to allow users to accept the dialog. We default to automation here, so it's low by default. If serial specifies a TCP address:port, then a TCP connection is used instead of a USB connection. """ # If there isnt a handle override (used by tests), build one here if 'handle' in kwargs: self._handle = kwargs.pop('handle') else: # if necessary, convert serial to a unicode string if isinstance(serial, (bytes, bytearray)): serial = serial.decode('utf-8') if serial and ':' in serial: self._handle = common.TcpHandle(serial, timeout_ms=default_timeout_ms) elif 'tty' in serial: self._handle = common.SerialHandle(serial, timeout_ms=default_timeout_ms) else: self._handle = common.UsbHandle.FindAndOpen( DeviceIsAvailable, port_path=port_path, serial=serial, timeout_ms=default_timeout_ms) self._Connect(**kwargs) return self def Close(self): for conn in list(self._service_connections.values()): if conn: try: conn.Close() except: pass if self._handle: self._handle.Close() self.__reset() def _Connect(self, banner=None, **kwargs): """Connect to the device. Args: banner: See protocol_handler.Connect. **kwargs: See protocol_handler.Connect and adb_commands.ConnectDevice for kwargs. Includes handle, rsa_keys, and auth_timeout_ms. Returns: An instance of this class if the device connected successfully. """ if not banner: banner = socket.gethostname().encode() conn_str = self.protocol_handler.Connect(self._handle, banner=banner, **kwargs) # Remove banner and colons after device state (state::banner) parts = conn_str.split(b'::') self._device_state = parts[0] # Break out the build prop info self.build_props = str(parts[1].split(b';')) return True @classmethod def Devices(cls): """Get a generator of UsbHandle for devices available.""" return common.UsbHandle.FindDevices(DeviceIsAvailable) def GetState(self): return self._device_state def Install(self, apk_path, destination_dir='', replace_existing=True, grant_permissions=False, timeout_ms=None, transfer_progress_callback=None): """Install an apk to the device. Doesn't support verifier file, instead allows destination directory to be overridden. Args: apk_path: Local path to apk to install. destination_dir: Optional destination directory. Use /system/app/ for persistent applications. replace_existing: whether to replace existing application grant_permissions: If True, grant all permissions to the app specified in its manifest timeout_ms: Expected timeout for pushing and installing. transfer_progress_callback: callback method that accepts filename, bytes_written and total_bytes of APK transfer Returns: The pm install output. """ if not destination_dir: destination_dir = '/data/local/tmp/' basename = os.path.basename(apk_path) destination_path = posixpath.join(destination_dir, basename) self.Push(apk_path, destination_path, timeout_ms=timeout_ms, progress_callback=transfer_progress_callback) cmd = ['pm install'] if grant_permissions: cmd.append('-g') if replace_existing: cmd.append('-r') cmd.append('"{}"'.format(destination_path)) ret = self.Shell(' '.join(cmd), timeout_ms=timeout_ms) # Remove the apk rm_cmd = ['rm', destination_path] rmret = self.Shell(' '.join(rm_cmd), timeout_ms=timeout_ms) return ret def Uninstall(self, package_name, keep_data=False, timeout_ms=None): """Removes a package from the device. Args: package_name: Package name of target package. keep_data: whether to keep the data and cache directories timeout_ms: Expected timeout for pushing and installing. Returns: The pm uninstall output. """ cmd = ['pm uninstall'] if keep_data: cmd.append('-k') cmd.append('"%s"' % package_name) return self.Shell(' '.join(cmd), timeout_ms=timeout_ms) def Push(self, source_file, device_filename, mtime='0', timeout_ms=None, progress_callback=None, st_mode=None): """Push a file or directory to the device. Args: source_file: Either a filename, a directory or file-like object to push to the device. device_filename: Destination on the device to write to. mtime: Optional, modification time to set on the file. timeout_ms: Expected timeout for any part of the push. st_mode: stat mode for filename progress_callback: callback method that accepts filename, bytes_written and total_bytes, total_bytes will be -1 for file-like objects """ if isinstance(source_file, str): if os.path.isdir(source_file): self.Shell("mkdir " + device_filename) for f in os.listdir(source_file): self.Push(os.path.join(source_file, f), device_filename + '/' + f, progress_callback=progress_callback) return source_file = open(source_file, "rb") with source_file: connection = self.protocol_handler.Open( self._handle, destination=b'installer:', timeout_ms=timeout_ms) kwargs={} if st_mode is not None: kwargs['st_mode'] = st_mode self.filesync_handler.Push(connection, source_file, device_filename, mtime=int(mtime), progress_callback=progress_callback, **kwargs) connection.Close() def Pull(self, device_filename, dest_file=None, timeout_ms=None, progress_callback=None): """Pull a file from the device. Args: device_filename: Filename on the device to pull. dest_file: If set, a filename or writable file-like object. timeout_ms: Expected timeout for any part of the pull. progress_callback: callback method that accepts filename, bytes_written and total_bytes, total_bytes will be -1 for file-like objects Returns: The file data if dest_file is not set. Otherwise, True if the destination file exists """ if not dest_file: dest_file = io.BytesIO() elif isinstance(dest_file, str): dest_file = open(dest_file, 'wb') elif isinstance(dest_file, file): pass else: raise ValueError("destfile is of unknown type") conn = self.protocol_handler.Open( self._handle, destination=b'installer:', timeout_ms=timeout_ms) self.filesync_handler.Pull(conn, device_filename, dest_file, progress_callback) conn.Close() if isinstance(dest_file, io.BytesIO): return dest_file.getvalue() else: dest_file.close() if hasattr(dest_file, 'name'): return os.path.exists(dest_file.name) # We don't know what the path is, so we just assume it exists. return True def Stat(self, device_filename): """Get a file's stat() information.""" connection = self.protocol_handler.Open(self._handle, destination=b'installer:') mode, size, mtime = self.filesync_handler.Stat( connection, device_filename) connection.Close() return mode, size, mtime def List(self, device_path): """Return a directory listing of the given path. Args: device_path: Directory to list. """ connection = self.protocol_handler.Open(self._handle, destination=b'installer:') listing = self.filesync_handler.List(connection, device_path) connection.Close() return listing def Reboot(self, destination=b''): """Reboot the device. Args: destination: Specify 'bootloader' for fastboot. """ self.protocol_handler.Open(self._handle, b'reboot:%s' % destination) def RebootBootloader(self): """Reboot device into fastboot.""" self.Reboot(b'bootloader') def Remount(self): """Remount / as read-write.""" return self.protocol_handler.Command(self._handle, service=b'remount') def Root(self): """Restart adbd as root on the device.""" return self.protocol_handler.Command(self._handle, service=b'root') def EnableVerity(self): """Re-enable dm-verity checking on userdebug builds""" return self.protocol_handler.Command(self._handle, service=b'enable-verity') def DisableVerity(self): """Disable dm-verity checking on userdebug builds""" return self.protocol_handler.Command(self._handle, service=b'disable-verity') def Shell(self, command, timeout_ms=None): """Run command on the device, returning the output. Args: command: Shell command to run timeout_ms: Maximum time to allow the command to run. """ return self.protocol_handler.Command( self._handle, service=b'shell', command=command, timeout_ms=timeout_ms) def StreamingShell(self, command, timeout_ms=None): """Run command on the device, yielding each line of output. Args: command: Command to run on the target. timeout_ms: Maximum time to allow the command to run. Yields: The responses from the shell command. """ return self.protocol_handler.StreamingCommand( self._handle, service=b'shell', command=command, timeout_ms=timeout_ms) def Logcat(self, options, timeout_ms=None): """Run 'shell logcat' and stream the output to stdout. Args: options: Arguments to pass to 'logcat'. timeout_ms: Maximum time to allow the command to run. """ return self.StreamingShell('logcat %s' % options, timeout_ms) def InteractiveShell(self, cmd=None, strip_cmd=True, delim=None, strip_delim=True): """Get stdout from the currently open interactive shell and optionally run a command on the device, returning all output. Args: cmd: Optional. Command to run on the target. strip_cmd: Optional (default True). Strip command name from stdout. delim: Optional. Delimiter to look for in the output to know when to stop expecting more output (usually the shell prompt) strip_delim: Optional (default True): Strip the provided delimiter from the output Returns: The stdout from the shell command. """ conn = self._get_service_connection(b'shell:') return self.protocol_handler.InteractiveShellCommand( conn, cmd=cmd, strip_cmd=strip_cmd, delim=delim, strip_delim=strip_delim)