Added serial interface support

This commit is contained in:
Giulio 2020-05-27 01:12:28 +02:00
parent 94d105e9bc
commit 8d00509ab2
4 changed files with 63 additions and 9 deletions

View File

@ -134,6 +134,8 @@ class AdbCommands(object):
if serial and ':' in serial: if serial and ':' in serial:
self._handle = common.TcpHandle(serial, timeout_ms=default_timeout_ms) self._handle = common.TcpHandle(serial, timeout_ms=default_timeout_ms)
elif 'tty' in serial:
self._handle = common.SerialHandle(serial, timeout_ms=default_timeout_ms)
else: else:
self._handle = common.UsbHandle.FindAndOpen( self._handle = common.UsbHandle.FindAndOpen(
DeviceIsAvailable, port_path=port_path, serial=serial, DeviceIsAvailable, port_path=port_path, serial=serial,

View File

@ -1,8 +0,0 @@
from adb import adb_commands
from adb import sign_m2crypto
# Connect to the device
device = adb_commands.AdbCommands()
device.ConnectDevice(port_path=Non, serial="192.168.43.100")
device.Stat('/tmp')

View File

@ -22,6 +22,8 @@ import threading
import weakref import weakref
import select import select
import serial
import libusb1 import libusb1
import usb1 import usb1
@ -285,6 +287,60 @@ class UsbHandle(object):
if device_matcher is None or device_matcher(handle): if device_matcher is None or device_matcher(handle):
yield handle yield handle
class SerialHandle(object):
def __init__(self, serial, timeout_ms=None):
if isinstance(serial, (bytes, bytearray)):
serial = serial.decode('utf-8')
if ',' in serial:
self.port, self.speed = serial.split(',')
else:
self.port = serial
self.speed = 115200
self._connection = None
self._serial_number = '%s,%s' % (self.port, self.speed)
self._timeout_ms = float(timeout_ms) if timeout_ms else None
self._connect()
def _connect(self):
timeout = self.TimeoutSeconds(self._timeout_ms)
self._connection = serial.Serial(self.port, self.speed, timeout=timeout)
@property
def serial_number(self):
return self._serial_number
def BulkWrite(self, data, timeout=None):
t = self.TimeoutSeconds(timeout)
_, writeable, _ = select.select([], [self._connection], [], t)
if writeable:
return self._connection.write(data)
msg = 'Sending data to {} timed out after {}s. No data was sent.'.format(
self.serial_number, t)
raise serial.SerialTimeoutException(msg)
def BulkRead(self, numbytes, timeout=None):
t = self.TimeoutSeconds(timeout)
readable, _, _ = select.select([self._connection], [], [], t)
if readable:
return self._connection.read(numbytes)
msg = 'Reading from {} timed out (Timeout {}s)'.format(
self._serial_number, t)
#raise serial.TcpTimeoutException(msg)
def Timeout(self, timeout_ms):
return float(timeout_ms) if timeout_ms is not None else self._timeout_ms
def TimeoutSeconds(self, timeout_ms):
timeout = self.Timeout(timeout_ms)
return timeout / 1000.0 if timeout is not None else timeout
def Close(self):
return self._connection.close()
class TcpHandle(object): class TcpHandle(object):
"""TCP connection object. """TCP connection object.

View File

@ -6,7 +6,11 @@ queue = list()
# Connect to the device # Connect to the device
device = adb_commands.AdbCommands() device = adb_commands.AdbCommands()
device.ConnectDevice(port_path=None, serial="192.168.43.168:5555") # XCB over wifi
#device.ConnectDevice(port_path=None, serial="192.168.43.168:5555")
# XCB over serial port
device.ConnectDevice(port_path=None, serial="/dev/ttyS11,115200")
if sys.argv[1] == 'ls': if sys.argv[1] == 'ls':
root = device.List(sys.argv[2]) root = device.List(sys.argv[2])