|
- # Copyright 2014 Google Inc. All rights reserved.
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- """A libusb1-based ADB reimplementation.
- ADB was giving us trouble with its client/server architecture, which is great
- for users and developers, but not so great for reliable scripting. This will
- allow us to more easily catch errors as Python exceptions instead of checking
- random exit codes, and all the other great benefits from not going through
- subprocess and a network socket.
- All timeouts are in milliseconds.
- """
- import io
- import os
- import socket
- import posixpath
- from adb import adb_protocol
- from adb import common
- from adb import filesync_protocol
# Interface class / subclass / protocol values, taken from adb.h.
CLASS, SUBCLASS, PROTOCOL = 0xFF, 0x42, 0x01

# Matcher built from the three values above; Devices() and ConnectDevice()
# hand it to common.UsbHandle when looking for devices.
# pylint: disable=invalid-name
DeviceIsAvailable = common.InterfaceMatcher(CLASS, SUBCLASS, PROTOCOL)

try:
    # Imported here only so existing code that pulls M2CryptoSigner from this
    # module keeps working.
    from adb.sign_m2crypto import M2CryptoSigner
except ImportError:
    # M2Crypto is optional: when it is not installed the name is simply not
    # exported, and other signers can be used instead.
    pass
class AdbCommands(object):
    """Exposes adb-like methods for use.

    Some methods are more-pythonic and/or have more options.

    Call ConnectDevice() first; every other method talks to the device
    through the transport handle that ConnectDevice() stores on the instance.
    """

    protocol_handler = adb_protocol.AdbMessage
    filesync_handler = filesync_protocol.FilesyncProtocol

    def __init__(self):
        self.__reset()

    def __reset(self):
        """Put the instance back into its unconnected state."""
        self.build_props = None
        self._handle = None
        self._device_state = None
        # Connection table: one open connection per service name, for
        # functions that keep a connection alive between calls (see
        # _get_service_connection).
        self._service_connections = {}

    def _get_service_connection(self, service, service_command=None,
                                create=True, timeout_ms=None):
        """Return the persistent connection for a service, opening it on demand.

        Args:
          service: Service name (bytes); also the key in the connection table.
          service_command: Additional service parameters to append. When set,
            the destination opened is b'<service>:<service_command>'.
          create: If False, don't open a connection when none is cached.
          timeout_ms: Timeout handed to protocol_handler.Open when a new
            connection has to be opened.

        Returns:
          The cached or newly opened connection, or None when nothing is
          cached and create is False.
        """
        connection = self._service_connections.get(service)
        if connection:
            return connection
        if not create:
            return None

        if service_command:
            destination_str = b'%s:%s' % (service, service_command)
        else:
            destination_str = service

        connection = self.protocol_handler.Open(
            self._handle, destination=destination_str, timeout_ms=timeout_ms)
        self._service_connections[service] = connection
        return connection

    def ConnectDevice(self, port_path=None, serial=None,
                      default_timeout_ms=None, **kwargs):
        """Set up a transport handle for the adb device and connect to it.

        The handle is chosen from `serial`:
          * contains ':'   -> TCP (common.TcpHandle), serial is address:port
          * contains 'tty' -> serial port (common.SerialHandle)
          * anything else, including None -> USB (common.UsbHandle)

        Args:
          port_path: The filename of usb port to use.
          serial: The serial number of the device to use. bytes/bytearray
            values are decoded as UTF-8 first.
          default_timeout_ms: The default timeout in milliseconds to use.
          **kwargs:
            handle: Device handle to use instead of building one (used by
              tests); port_path and serial are ignored when it is given.
            banner: Connection banner to pass to the remote device.
            rsa_keys: List of AuthSigner subclass instances to be used for
              authentication. The device can either accept one of these via
              the Sign method, or we will send the result of GetPublicKey
              from the first one if the device doesn't accept any of them.
            auth_timeout_ms: Timeout to wait for when sending a new public
              key. This is only relevant when we send a new public key. The
              device shows a dialog and this timeout is how long to wait for
              that dialog. If used in automation, this should be low to catch
              such a case as a failure quickly; while in interactive settings
              it should be high to allow users to accept the dialog. We
              default to automation here, so it's low by default.

        Returns:
          This instance, so the call can be chained.
        """
        if 'handle' in kwargs:
            self._handle = kwargs.pop('handle')
        else:
            # If necessary, convert serial to a unicode string.
            if isinstance(serial, (bytes, bytearray)):
                serial = serial.decode('utf-8')

            if serial and ':' in serial:
                self._handle = common.TcpHandle(
                    serial, timeout_ms=default_timeout_ms)
            elif serial and 'tty' in serial:
                # `serial` defaults to None, so it must be checked before the
                # substring test or the USB fallback below is never reached.
                self._handle = common.SerialHandle(
                    serial, timeout_ms=default_timeout_ms)
            else:
                self._handle = common.UsbHandle.FindAndOpen(
                    DeviceIsAvailable, port_path=port_path, serial=serial,
                    timeout_ms=default_timeout_ms)

        self._Connect(**kwargs)
        return self

    def Close(self):
        """Close cached service connections and the handle, then reset state.

        Errors from closing an individual service connection are ignored
        (best effort). An error from closing the handle propagates, but the
        instance is reset either way.
        """
        try:
            for conn in list(self._service_connections.values()):
                if not conn:
                    continue
                try:
                    conn.Close()
                except Exception:  # pylint: disable=broad-except
                    # Best effort: one connection that fails to close must
                    # not stop the remaining ones from being closed.
                    pass

            if self._handle:
                self._handle.Close()
        finally:
            self.__reset()

    def _Connect(self, banner=None, **kwargs):
        """Connect to the device and record its state and build properties.

        Args:
          banner: See protocol_handler.Connect. Defaults to the local
            hostname.
          **kwargs: See protocol_handler.Connect and
            adb_commands.ConnectDevice for kwargs. Includes rsa_keys and
            auth_timeout_ms.

        Returns:
          True once protocol_handler.Connect has returned.
        """
        if not banner:
            banner = socket.gethostname().encode()

        conn_str = self.protocol_handler.Connect(
            self._handle, banner=banner, **kwargs)

        # The connect string looks like state::banner.
        parts = conn_str.split(b'::')
        self._device_state = parts[0]

        # Break out the build prop info; it is absent when the device sent
        # no '::' separator.
        if len(parts) > 1:
            self.build_props = str(parts[1].split(b';'))
        else:
            self.build_props = None

        return True

    @classmethod
    def Devices(cls):
        """Get a generator of UsbHandle for devices available."""
        return common.UsbHandle.FindDevices(DeviceIsAvailable)

    def GetState(self):
        """Return the device state recorded by the last connect.

        This is the part of the connect string before '::', or None when
        not connected.
        """
        return self._device_state

    def Install(self, apk_path, destination_dir='', replace_existing=True,
                grant_permissions=False, timeout_ms=None,
                transfer_progress_callback=None):
        """Install an apk to the device.

        Doesn't support verifier file, instead allows destination directory
        to be overridden. The apk is pushed to the device, installed with
        `pm install`, and the pushed copy is removed afterwards.

        Args:
          apk_path: Local path to apk to install.
          destination_dir: Optional destination directory. Use /system/app/
            for persistent applications. Defaults to /data/local/tmp/.
          replace_existing: whether to replace existing application
          grant_permissions: If True, grant all permissions to the app
            specified in its manifest
          timeout_ms: Expected timeout for pushing and installing.
          transfer_progress_callback: callback method that accepts filename,
            bytes_written and total_bytes of APK transfer

        Returns:
          The pm install output.
        """
        if not destination_dir:
            destination_dir = '/data/local/tmp/'
        basename = os.path.basename(apk_path)
        destination_path = posixpath.join(destination_dir, basename)
        self.Push(apk_path, destination_path, timeout_ms=timeout_ms,
                  progress_callback=transfer_progress_callback)

        cmd = ['pm install']
        if grant_permissions:
            cmd.append('-g')
        if replace_existing:
            cmd.append('-r')
        cmd.append('"{}"'.format(destination_path))

        ret = self.Shell(' '.join(cmd), timeout_ms=timeout_ms)

        # Remove the pushed apk. The path is quoted exactly like in the
        # install command so names containing spaces are cleaned up too.
        self.Shell('rm "{}"'.format(destination_path), timeout_ms=timeout_ms)

        return ret

    def Uninstall(self, package_name, keep_data=False, timeout_ms=None):
        """Removes a package from the device.

        Args:
          package_name: Package name of target package.
          keep_data: whether to keep the data and cache directories
          timeout_ms: Expected timeout for the uninstall command.

        Returns:
          The pm uninstall output.
        """
        cmd = ['pm uninstall']
        if keep_data:
            cmd.append('-k')
        cmd.append('"%s"' % package_name)

        return self.Shell(' '.join(cmd), timeout_ms=timeout_ms)

    def Push(self, source_file, device_filename, mtime='0', timeout_ms=None,
             progress_callback=None, st_mode=None):
        """Push a file or directory to the device.

        A directory is pushed recursively: the target directory is created
        with `mkdir` and every entry is pushed below it.

        Args:
          source_file: Either a filename, a directory or file-like object to
            push to the device.
          device_filename: Destination on the device to write to.
          mtime: Optional, modification time to set on the file.
          timeout_ms: Expected timeout for any part of the push.
          progress_callback: callback method that accepts filename,
            bytes_written and total_bytes, total_bytes will be -1 for
            file-like objects
          st_mode: stat mode for filename. Not applied to the entries of a
            pushed directory.
        """
        if isinstance(source_file, str):
            if os.path.isdir(source_file):
                self.Shell("mkdir " + device_filename, timeout_ms=timeout_ms)
                for entry in os.listdir(source_file):
                    # Forward mtime and timeout_ms so nested files are pushed
                    # with the same settings as the top-level call.
                    self.Push(os.path.join(source_file, entry),
                              device_filename + '/' + entry,
                              mtime=mtime, timeout_ms=timeout_ms,
                              progress_callback=progress_callback)
                return

            source_file = open(source_file, "rb")

        with source_file:
            connection = self.protocol_handler.Open(
                self._handle, destination=b'installer:',
                timeout_ms=timeout_ms)
            try:
                kwargs = {}
                if st_mode is not None:
                    kwargs['st_mode'] = st_mode
                self.filesync_handler.Push(
                    connection, source_file, device_filename,
                    mtime=int(mtime), progress_callback=progress_callback,
                    **kwargs)
            finally:
                # Release the connection even when the transfer fails.
                connection.Close()

    def Pull(self, device_filename, dest_file=None, timeout_ms=None,
             progress_callback=None):
        """Pull a file from the device.

        Args:
          device_filename: Filename on the device to pull.
          dest_file: If set, a filename or writable file-like object. A
            file-like object is closed once the pull has finished, unless it
            is an io.BytesIO.
          timeout_ms: Expected timeout for any part of the pull.
          progress_callback: callback method that accepts filename,
            bytes_written and total_bytes, total_bytes will be -1 for
            file-like objects

        Returns:
          The file data if dest_file is not set or is an io.BytesIO.
          Otherwise, True if the destination file exists.

        Raises:
          ValueError: dest_file is neither a filename nor an object with a
            write() method.
        """
        opened_here = False
        if not dest_file:
            dest_file = io.BytesIO()
        elif isinstance(dest_file, str):
            dest_file = open(dest_file, 'wb')
            opened_here = True
        elif not hasattr(dest_file, 'write'):
            # Duck-typed check: works for any writable file-like object and
            # does not depend on the Python 2-only `file` builtin.
            raise ValueError("destfile is of unknown type")

        try:
            conn = self.protocol_handler.Open(
                self._handle, destination=b'installer:',
                timeout_ms=timeout_ms)
            try:
                self.filesync_handler.Pull(
                    conn, device_filename, dest_file, progress_callback)
            finally:
                conn.Close()
        except Exception:
            # Don't leak the file handle this method opened itself.
            if opened_here:
                dest_file.close()
            raise

        if isinstance(dest_file, io.BytesIO):
            return dest_file.getvalue()

        dest_file.close()
        if hasattr(dest_file, 'name'):
            return os.path.exists(dest_file.name)
        # We don't know what the path is, so we just assume it exists.
        return True

    def Stat(self, device_filename):
        """Get a file's stat() information.

        Args:
          device_filename: Path on the device to stat.

        Returns:
          A (mode, size, mtime) tuple as reported by filesync_handler.Stat.
        """
        connection = self.protocol_handler.Open(
            self._handle, destination=b'installer:')
        try:
            mode, size, mtime = self.filesync_handler.Stat(
                connection, device_filename)
        finally:
            connection.Close()
        return mode, size, mtime

    def List(self, device_path):
        """Return a directory listing of the given path.

        Args:
          device_path: Directory to list.

        Returns:
          Whatever filesync_handler.List produces for the directory.
        """
        connection = self.protocol_handler.Open(
            self._handle, destination=b'installer:')
        try:
            listing = self.filesync_handler.List(connection, device_path)
        finally:
            connection.Close()
        return listing

    def Reboot(self, destination=b''):
        """Reboot the device.

        Args:
          destination: Specify 'bootloader' for fastboot. Text strings are
            encoded as UTF-8 before being sent.
        """
        if not isinstance(destination, (bytes, bytearray)):
            # bytes %-formatting rejects text strings on Python 3.
            destination = destination.encode('utf-8')
        self.protocol_handler.Open(self._handle, b'reboot:%s' % destination)

    def RebootBootloader(self):
        """Reboot device into fastboot."""
        self.Reboot(b'bootloader')

    def Remount(self):
        """Remount / as read-write."""
        return self.protocol_handler.Command(self._handle, service=b'remount')

    def Root(self):
        """Restart adbd as root on the device."""
        return self.protocol_handler.Command(self._handle, service=b'root')

    def EnableVerity(self):
        """Re-enable dm-verity checking on userdebug builds"""
        return self.protocol_handler.Command(
            self._handle, service=b'enable-verity')

    def DisableVerity(self):
        """Disable dm-verity checking on userdebug builds"""
        return self.protocol_handler.Command(
            self._handle, service=b'disable-verity')

    def Shell(self, command, timeout_ms=None):
        """Run command on the device, returning the output.

        Args:
          command: Shell command to run
          timeout_ms: Maximum time to allow the command to run.

        Returns:
          The result of protocol_handler.Command for the shell service.
        """
        return self.protocol_handler.Command(
            self._handle, service=b'shell', command=command,
            timeout_ms=timeout_ms)

    def StreamingShell(self, command, timeout_ms=None):
        """Run command on the device, yielding each line of output.

        Args:
          command: Command to run on the target.
          timeout_ms: Maximum time to allow the command to run.

        Yields:
          The responses from the shell command.
        """
        return self.protocol_handler.StreamingCommand(
            self._handle, service=b'shell', command=command,
            timeout_ms=timeout_ms)

    def Logcat(self, options, timeout_ms=None):
        """Run 'shell logcat' and stream the output back to the caller.

        Nothing is printed here; the caller consumes the returned stream.

        Args:
          options: Arguments to pass to 'logcat'.
          timeout_ms: Maximum time to allow the command to run.

        Returns:
          The StreamingShell result for the logcat command.
        """
        return self.StreamingShell('logcat %s' % options, timeout_ms)

    def InteractiveShell(self, cmd=None, strip_cmd=True, delim=None,
                         strip_delim=True):
        """Get stdout from the currently open interactive shell and optionally
        run a command on the device, returning all output.

        The shell connection is opened on first use and then kept in the
        connection table until Close() is called.

        Args:
          cmd: Optional. Command to run on the target.
          strip_cmd: Optional (default True). Strip command name from stdout.
          delim: Optional. Delimiter to look for in the output to know when
            to stop expecting more output (usually the shell prompt)
          strip_delim: Optional (default True): Strip the provided delimiter
            from the output

        Returns:
          The stdout from the shell command.
        """
        conn = self._get_service_connection(b'shell:')
        return self.protocol_handler.InteractiveShellCommand(
            conn, cmd=cmd, strip_cmd=strip_cmd,
            delim=delim, strip_delim=strip_delim)
|