422 lines
16 KiB
Python
422 lines
16 KiB
Python
# Copyright 2014 Google Inc. All rights reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
"""A libusb1-based ADB reimplementation.
|
|
|
|
ADB was giving us trouble with its client/server architecture, which is great
|
|
for users and developers, but not so great for reliable scripting. This will
|
|
allow us to more easily catch errors as Python exceptions instead of checking
|
|
random exit codes, and all the other great benefits from not going through
|
|
subprocess and a network socket.
|
|
|
|
All timeouts are in milliseconds.
|
|
"""
|
|
|
|
import io
|
|
import os
|
|
import socket
|
|
import posixpath
|
|
|
|
from adb import adb_protocol
|
|
from adb import common
|
|
from adb import filesync_protocol
|
|
|
|
# From adb.h
|
|
CLASS = 0xFF
|
|
SUBCLASS = 0x42
|
|
PROTOCOL = 0x01
|
|
# pylint: disable=invalid-name
|
|
DeviceIsAvailable = common.InterfaceMatcher(CLASS, SUBCLASS, PROTOCOL)
|
|
|
|
try:
|
|
# Imported locally to keep compatibility with previous code.
|
|
from adb.sign_m2crypto import M2CryptoSigner
|
|
except ImportError:
|
|
# Ignore this error when M2Crypto is not installed, there are other options.
|
|
pass
|
|
|
|
|
|
class AdbCommands(object):
|
|
"""Exposes adb-like methods for use.
|
|
|
|
Some methods are more-pythonic and/or have more options.
|
|
"""
|
|
protocol_handler = adb_protocol.AdbMessage
|
|
filesync_handler = filesync_protocol.FilesyncProtocol
|
|
|
|
def __init__(self):
|
|
|
|
self.__reset()
|
|
|
|
def __reset(self):
|
|
self.build_props = None
|
|
self._handle = None
|
|
self._device_state = None
|
|
|
|
# Connection table tracks each open AdbConnection objects per service type for program functions
|
|
# that choose to persist an AdbConnection object for their functionality, using
|
|
# self._get_service_connection
|
|
self._service_connections = {}
|
|
|
|
def _get_service_connection(self, service, service_command=None, create=True, timeout_ms=None):
|
|
"""
|
|
Based on the service, get the AdbConnection for that service or create one if it doesnt exist
|
|
|
|
:param service:
|
|
:param service_command: Additional service parameters to append
|
|
:param create: If False, dont create a connection if it does not exist
|
|
:return:
|
|
"""
|
|
|
|
connection = self._service_connections.get(service, None)
|
|
|
|
if connection:
|
|
return connection
|
|
|
|
if not connection and not create:
|
|
return None
|
|
|
|
if service_command:
|
|
destination_str = b'%s:%s' % (service, service_command)
|
|
else:
|
|
destination_str = service
|
|
|
|
connection = self.protocol_handler.Open(
|
|
self._handle, destination=destination_str, timeout_ms=timeout_ms)
|
|
|
|
self._service_connections.update({service: connection})
|
|
|
|
return connection
|
|
|
|
def ConnectDevice(self, port_path=None, serial=None, default_timeout_ms=None, **kwargs):
|
|
"""Convenience function to setup a transport handle for the adb device from
|
|
usb path or serial then connect to it.
|
|
|
|
Args:
|
|
port_path: The filename of usb port to use.
|
|
serial: The serial number of the device to use.
|
|
default_timeout_ms: The default timeout in milliseconds to use.
|
|
kwargs: handle: Device handle to use (instance of common.TcpHandle or common.UsbHandle)
|
|
banner: Connection banner to pass to the remote device
|
|
rsa_keys: List of AuthSigner subclass instances to be used for
|
|
authentication. The device can either accept one of these via the Sign
|
|
method, or we will send the result of GetPublicKey from the first one
|
|
if the device doesn't accept any of them.
|
|
auth_timeout_ms: Timeout to wait for when sending a new public key. This
|
|
is only relevant when we send a new public key. The device shows a
|
|
dialog and this timeout is how long to wait for that dialog. If used
|
|
in automation, this should be low to catch such a case as a failure
|
|
quickly; while in interactive settings it should be high to allow
|
|
users to accept the dialog. We default to automation here, so it's low
|
|
by default.
|
|
|
|
If serial specifies a TCP address:port, then a TCP connection is
|
|
used instead of a USB connection.
|
|
"""
|
|
|
|
# If there isnt a handle override (used by tests), build one here
|
|
if 'handle' in kwargs:
|
|
self._handle = kwargs.pop('handle')
|
|
else:
|
|
# if necessary, convert serial to a unicode string
|
|
if isinstance(serial, (bytes, bytearray)):
|
|
serial = serial.decode('utf-8')
|
|
|
|
if serial and ':' in serial:
|
|
self._handle = common.TcpHandle(serial, timeout_ms=default_timeout_ms)
|
|
elif 'tty' in serial:
|
|
self._handle = common.SerialHandle(serial, timeout_ms=default_timeout_ms)
|
|
else:
|
|
self._handle = common.UsbHandle.FindAndOpen(
|
|
DeviceIsAvailable, port_path=port_path, serial=serial,
|
|
timeout_ms=default_timeout_ms)
|
|
|
|
self._Connect(**kwargs)
|
|
|
|
return self
|
|
|
|
def Close(self):
|
|
for conn in list(self._service_connections.values()):
|
|
if conn:
|
|
try:
|
|
conn.Close()
|
|
except:
|
|
pass
|
|
|
|
if self._handle:
|
|
self._handle.Close()
|
|
|
|
self.__reset()
|
|
|
|
def _Connect(self, banner=None, **kwargs):
|
|
"""Connect to the device.
|
|
|
|
Args:
|
|
banner: See protocol_handler.Connect.
|
|
**kwargs: See protocol_handler.Connect and adb_commands.ConnectDevice for kwargs.
|
|
Includes handle, rsa_keys, and auth_timeout_ms.
|
|
Returns:
|
|
An instance of this class if the device connected successfully.
|
|
"""
|
|
|
|
if not banner:
|
|
banner = socket.gethostname().encode()
|
|
|
|
conn_str = self.protocol_handler.Connect(self._handle, banner=banner, **kwargs)
|
|
|
|
# Remove banner and colons after device state (state::banner)
|
|
parts = conn_str.split(b'::')
|
|
self._device_state = parts[0]
|
|
|
|
# Break out the build prop info
|
|
self.build_props = str(parts[1].split(b';'))
|
|
|
|
return True
|
|
|
|
@classmethod
|
|
def Devices(cls):
|
|
"""Get a generator of UsbHandle for devices available."""
|
|
return common.UsbHandle.FindDevices(DeviceIsAvailable)
|
|
|
|
def GetState(self):
|
|
return self._device_state
|
|
|
|
def Install(self, apk_path, destination_dir='', replace_existing=True,
|
|
grant_permissions=False, timeout_ms=None, transfer_progress_callback=None):
|
|
"""Install an apk to the device.
|
|
|
|
Doesn't support verifier file, instead allows destination directory to be
|
|
overridden.
|
|
|
|
Args:
|
|
apk_path: Local path to apk to install.
|
|
destination_dir: Optional destination directory. Use /system/app/ for
|
|
persistent applications.
|
|
replace_existing: whether to replace existing application
|
|
grant_permissions: If True, grant all permissions to the app specified in its manifest
|
|
timeout_ms: Expected timeout for pushing and installing.
|
|
transfer_progress_callback: callback method that accepts filename, bytes_written and total_bytes of APK transfer
|
|
|
|
Returns:
|
|
The pm install output.
|
|
"""
|
|
if not destination_dir:
|
|
destination_dir = '/data/local/tmp/'
|
|
basename = os.path.basename(apk_path)
|
|
destination_path = posixpath.join(destination_dir, basename)
|
|
self.Push(apk_path, destination_path, timeout_ms=timeout_ms, progress_callback=transfer_progress_callback)
|
|
|
|
cmd = ['pm install']
|
|
if grant_permissions:
|
|
cmd.append('-g')
|
|
if replace_existing:
|
|
cmd.append('-r')
|
|
cmd.append('"{}"'.format(destination_path))
|
|
|
|
ret = self.Shell(' '.join(cmd), timeout_ms=timeout_ms)
|
|
|
|
# Remove the apk
|
|
rm_cmd = ['rm', destination_path]
|
|
rmret = self.Shell(' '.join(rm_cmd), timeout_ms=timeout_ms)
|
|
|
|
return ret
|
|
|
|
def Uninstall(self, package_name, keep_data=False, timeout_ms=None):
|
|
"""Removes a package from the device.
|
|
|
|
Args:
|
|
package_name: Package name of target package.
|
|
keep_data: whether to keep the data and cache directories
|
|
timeout_ms: Expected timeout for pushing and installing.
|
|
|
|
Returns:
|
|
The pm uninstall output.
|
|
"""
|
|
cmd = ['pm uninstall']
|
|
if keep_data:
|
|
cmd.append('-k')
|
|
cmd.append('"%s"' % package_name)
|
|
|
|
return self.Shell(' '.join(cmd), timeout_ms=timeout_ms)
|
|
|
|
def Push(self, source_file, device_filename, mtime='0', timeout_ms=None, progress_callback=None, st_mode=None):
|
|
"""Push a file or directory to the device.
|
|
|
|
Args:
|
|
source_file: Either a filename, a directory or file-like object to push to
|
|
the device.
|
|
device_filename: Destination on the device to write to.
|
|
mtime: Optional, modification time to set on the file.
|
|
timeout_ms: Expected timeout for any part of the push.
|
|
st_mode: stat mode for filename
|
|
progress_callback: callback method that accepts filename, bytes_written and total_bytes,
|
|
total_bytes will be -1 for file-like objects
|
|
"""
|
|
|
|
if isinstance(source_file, str):
|
|
if os.path.isdir(source_file):
|
|
self.Shell("mkdir " + device_filename)
|
|
for f in os.listdir(source_file):
|
|
self.Push(os.path.join(source_file, f), device_filename + '/' + f,
|
|
progress_callback=progress_callback)
|
|
return
|
|
source_file = open(source_file, "rb")
|
|
|
|
with source_file:
|
|
connection = self.protocol_handler.Open(
|
|
self._handle, destination=b'installer:', timeout_ms=timeout_ms)
|
|
kwargs={}
|
|
if st_mode is not None:
|
|
kwargs['st_mode'] = st_mode
|
|
self.filesync_handler.Push(connection, source_file, device_filename,
|
|
mtime=int(mtime), progress_callback=progress_callback, **kwargs)
|
|
connection.Close()
|
|
|
|
def Pull(self, device_filename, dest_file=None, timeout_ms=None, progress_callback=None):
|
|
"""Pull a file from the device.
|
|
|
|
Args:
|
|
device_filename: Filename on the device to pull.
|
|
dest_file: If set, a filename or writable file-like object.
|
|
timeout_ms: Expected timeout for any part of the pull.
|
|
progress_callback: callback method that accepts filename, bytes_written and total_bytes,
|
|
total_bytes will be -1 for file-like objects
|
|
|
|
Returns:
|
|
The file data if dest_file is not set. Otherwise, True if the destination file exists
|
|
"""
|
|
if not dest_file:
|
|
dest_file = io.BytesIO()
|
|
elif isinstance(dest_file, str):
|
|
dest_file = open(dest_file, 'wb')
|
|
elif isinstance(dest_file, file):
|
|
pass
|
|
else:
|
|
raise ValueError("destfile is of unknown type")
|
|
|
|
conn = self.protocol_handler.Open(
|
|
self._handle, destination=b'installer:', timeout_ms=timeout_ms)
|
|
|
|
self.filesync_handler.Pull(conn, device_filename, dest_file, progress_callback)
|
|
|
|
conn.Close()
|
|
if isinstance(dest_file, io.BytesIO):
|
|
return dest_file.getvalue()
|
|
else:
|
|
dest_file.close()
|
|
if hasattr(dest_file, 'name'):
|
|
return os.path.exists(dest_file.name)
|
|
# We don't know what the path is, so we just assume it exists.
|
|
return True
|
|
|
|
def Stat(self, device_filename):
|
|
"""Get a file's stat() information."""
|
|
connection = self.protocol_handler.Open(self._handle, destination=b'installer:')
|
|
mode, size, mtime = self.filesync_handler.Stat(
|
|
connection, device_filename)
|
|
connection.Close()
|
|
return mode, size, mtime
|
|
|
|
def List(self, device_path):
|
|
"""Return a directory listing of the given path.
|
|
|
|
Args:
|
|
device_path: Directory to list.
|
|
"""
|
|
connection = self.protocol_handler.Open(self._handle, destination=b'installer:')
|
|
listing = self.filesync_handler.List(connection, device_path)
|
|
connection.Close()
|
|
return listing
|
|
|
|
def Reboot(self, destination=b''):
|
|
"""Reboot the device.
|
|
|
|
Args:
|
|
destination: Specify 'bootloader' for fastboot.
|
|
"""
|
|
self.protocol_handler.Open(self._handle, b'reboot:%s' % destination)
|
|
|
|
def RebootBootloader(self):
|
|
"""Reboot device into fastboot."""
|
|
self.Reboot(b'bootloader')
|
|
|
|
def Remount(self):
|
|
"""Remount / as read-write."""
|
|
return self.protocol_handler.Command(self._handle, service=b'remount')
|
|
|
|
def Root(self):
|
|
"""Restart adbd as root on the device."""
|
|
return self.protocol_handler.Command(self._handle, service=b'root')
|
|
|
|
def EnableVerity(self):
|
|
"""Re-enable dm-verity checking on userdebug builds"""
|
|
return self.protocol_handler.Command(self._handle, service=b'enable-verity')
|
|
|
|
def DisableVerity(self):
|
|
"""Disable dm-verity checking on userdebug builds"""
|
|
return self.protocol_handler.Command(self._handle, service=b'disable-verity')
|
|
|
|
def Shell(self, command, timeout_ms=None):
|
|
"""Run command on the device, returning the output.
|
|
|
|
Args:
|
|
command: Shell command to run
|
|
timeout_ms: Maximum time to allow the command to run.
|
|
"""
|
|
return self.protocol_handler.Command(
|
|
self._handle, service=b'shell', command=command,
|
|
timeout_ms=timeout_ms)
|
|
|
|
def StreamingShell(self, command, timeout_ms=None):
|
|
"""Run command on the device, yielding each line of output.
|
|
|
|
Args:
|
|
command: Command to run on the target.
|
|
timeout_ms: Maximum time to allow the command to run.
|
|
|
|
Yields:
|
|
The responses from the shell command.
|
|
"""
|
|
return self.protocol_handler.StreamingCommand(
|
|
self._handle, service=b'shell', command=command,
|
|
timeout_ms=timeout_ms)
|
|
|
|
def Logcat(self, options, timeout_ms=None):
|
|
"""Run 'shell logcat' and stream the output to stdout.
|
|
|
|
Args:
|
|
options: Arguments to pass to 'logcat'.
|
|
timeout_ms: Maximum time to allow the command to run.
|
|
"""
|
|
return self.StreamingShell('logcat %s' % options, timeout_ms)
|
|
|
|
def InteractiveShell(self, cmd=None, strip_cmd=True, delim=None, strip_delim=True):
|
|
"""Get stdout from the currently open interactive shell and optionally run a command
|
|
on the device, returning all output.
|
|
|
|
Args:
|
|
cmd: Optional. Command to run on the target.
|
|
strip_cmd: Optional (default True). Strip command name from stdout.
|
|
delim: Optional. Delimiter to look for in the output to know when to stop expecting more output
|
|
(usually the shell prompt)
|
|
strip_delim: Optional (default True): Strip the provided delimiter from the output
|
|
|
|
Returns:
|
|
The stdout from the shell command.
|
|
"""
|
|
conn = self._get_service_connection(b'shell:')
|
|
|
|
return self.protocol_handler.InteractiveShellCommand(
|
|
conn, cmd=cmd, strip_cmd=strip_cmd,
|
|
delim=delim, strip_delim=strip_delim)
|