Merge remote-tracking branch 'origin/pr/99'

* origin/pr/99:
  devices: add missing docstring for _get_device_classes
  devices: make iteration device classes compatible with Python2
  tools/qvm-device: make PEP8 happy
  tests/devices: add test for handling listing device classes
  tests/devices: make PEP8 happy
  devices: handle listing of available device classes
  devices: make PEP8 happy
This commit is contained in:
Marek Marczykowski-Górecki 2019-08-08 14:13:38 +02:00
commit 489efce9cb
No known key found for this signature in database
GPG Key ID: 063938BA42CFA724
3 changed files with 170 additions and 124 deletions

View File

@ -20,7 +20,7 @@
# You should have received a copy of the GNU Lesser General Public License along # You should have received a copy of the GNU Lesser General Public License along
# with this program; if not, see <http://www.gnu.org/licenses/>. # with this program; if not, see <http://www.gnu.org/licenses/>.
'''API for various types of devices. """API for various types of devices.
Main concept is that some domain main Main concept is that some domain main
expose (potentially multiple) devices, which can be attached to other domains. expose (potentially multiple) devices, which can be attached to other domains.
@ -29,13 +29,14 @@ class is implemented by an extension.
Devices are identified by pair of (backend domain, `ident`), where `ident` is Devices are identified by pair of (backend domain, `ident`), where `ident` is
:py:class:`str`. :py:class:`str`.
''' """
class DeviceAssignment(object): # pylint: disable=too-few-public-methods class DeviceAssignment(object): # pylint: disable=too-few-public-methods
''' Maps a device to a frontend_domain. ''' """ Maps a device to a frontend_domain. """
def __init__(self, backend_domain, ident, options=None, def __init__(self, backend_domain, ident, options=None, persistent=False,
persistent=False, frontend_domain=None, devclass=None): frontend_domain=None, devclass=None):
self.backend_domain = backend_domain self.backend_domain = backend_domain
self.ident = ident self.ident = ident
self.devclass = devclass self.devclass = devclass
@ -57,7 +58,7 @@ class DeviceAssignment(object): # pylint: disable=too-few-public-methods
and self.ident == other.ident and self.ident == other.ident
def clone(self): def clone(self):
'''Clone object instance''' """Clone object instance"""
return self.__class__( return self.__class__(
self.backend_domain, self.backend_domain,
self.ident, self.ident,
@ -69,12 +70,13 @@ class DeviceAssignment(object): # pylint: disable=too-few-public-methods
@property @property
def device(self): def device(self):
'''Get DeviceInfo object corresponding to this DeviceAssignment''' """Get DeviceInfo object corresponding to this DeviceAssignment"""
return self.backend_domain.devices[self.devclass][self.ident] return self.backend_domain.devices[self.devclass][self.ident]
class DeviceInfo(object): class DeviceInfo(object):
''' Holds all information about a device ''' """ Holds all information about a device """
# pylint: disable=too-few-public-methods # pylint: disable=too-few-public-methods
def __init__(self, backend_domain, devclass, ident, description=None, def __init__(self, backend_domain, devclass, ident, description=None,
**kwargs): **kwargs):
@ -107,7 +109,7 @@ class DeviceInfo(object):
class UnknownDevice(DeviceInfo): class UnknownDevice(DeviceInfo):
# pylint: disable=too-few-public-methods # pylint: disable=too-few-public-methods
'''Unknown device - for example exposed by domain not running currently''' """Unknown device - for example exposed by domain not running currently"""
def __init__(self, backend_domain, devclass, ident, description=None, def __init__(self, backend_domain, devclass, ident, description=None,
**kwargs): **kwargs):
@ -118,24 +120,25 @@ class UnknownDevice(DeviceInfo):
class DeviceCollection(object): class DeviceCollection(object):
'''Bag for devices. """Bag for devices.
Used as default value for :py:meth:`DeviceManager.__missing__` factory. Used as default value for :py:meth:`DeviceManager.__missing__` factory.
:param vm: VM for which we manage devices :param vm: VM for which we manage devices
:param class_: device class :param class_: device class
''' """
def __init__(self, vm, class_): def __init__(self, vm, class_):
self._vm = vm self._vm = vm
self._class = class_ self._class = class_
self._dev_cache = {} self._dev_cache = {}
def attach(self, device_assignment): def attach(self, device_assignment):
'''Attach (add) device to domain. """Attach (add) device to domain.
:param DeviceAssignment device_assignment: device object :param DeviceAssignment device_assignment: device object
''' """
if not device_assignment.frontend_domain: if not device_assignment.frontend_domain:
device_assignment.frontend_domain = self._vm device_assignment.frontend_domain = self._vm
@ -150,20 +153,21 @@ class DeviceCollection(object):
options = device_assignment.options.copy() options = device_assignment.options.copy()
if device_assignment.persistent: if device_assignment.persistent:
options['persistent'] = 'True' options['persistent'] = 'True'
options_str = ' '.join('{}={}'.format(opt, options_str = ' '.join('{}={}'.format(opt, val)
val) for opt, val in sorted(options.items())) for opt, val in sorted(options.items()))
self._vm.qubesd_call(None, self._vm.qubesd_call(None,
'admin.vm.device.{}.Attach'.format(self._class), 'admin.vm.device.{}.Attach'.format(self._class),
'{!s}+{!s}'.format(device_assignment.backend_domain, '{!s}+{!s}'.format(
device_assignment.backend_domain,
device_assignment.ident), device_assignment.ident),
options_str.encode('utf-8')) options_str.encode('utf-8'))
def detach(self, device_assignment): def detach(self, device_assignment):
'''Detach (remove) device from domain. """Detach (remove) device from domain.
:param DeviceAssignment device_assignment: device to detach :param DeviceAssignment device_assignment: device to detach
(obtained from :py:meth:`assignments`) (obtained from :py:meth:`assignments`)
''' """
if not device_assignment.frontend_domain: if not device_assignment.frontend_domain:
device_assignment.frontend_domain = self._vm device_assignment.frontend_domain = self._vm
else: else:
@ -176,11 +180,12 @@ class DeviceCollection(object):
self._vm.qubesd_call(None, self._vm.qubesd_call(None,
'admin.vm.device.{}.Detach'.format(self._class), 'admin.vm.device.{}.Detach'.format(self._class),
'{!s}+{!s}'.format(device_assignment.backend_domain, '{!s}+{!s}'.format(
device_assignment.backend_domain,
device_assignment.ident)) device_assignment.ident))
def assignments(self, persistent=None): def assignments(self, persistent=None):
'''List assignments for devices which are (or may be) attached to the """List assignments for devices which are (or may be) attached to the
vm. vm.
Devices may be attached persistently (so they are included in Devices may be attached persistently (so they are included in
@ -189,42 +194,47 @@ class DeviceCollection(object):
:param bool persistent: only include devices which are or are not :param bool persistent: only include devices which are or are not
attached persistently. attached persistently.
''' """
assignments_str = self._vm.qubesd_call(None, assignments_str = self._vm.qubesd_call(None,
'admin.vm.device.{}.List'.format(self._class)).decode() 'admin.vm.device.{}.List'.format(
self._class)).decode()
for assignment_str in assignments_str.splitlines(): for assignment_str in assignments_str.splitlines():
device, _, options_all = assignment_str.partition(' ') device, _, options_all = assignment_str.partition(' ')
backend_domain, ident = device.split('+', 1) backend_domain, ident = device.split('+', 1)
options = dict(opt_single.split('=', 1) options = dict(opt_single.split('=', 1)
for opt_single in options_all.split(' ') if opt_single) for opt_single in options_all.split(' ') if
opt_single)
dev_persistent = (options.pop('persistent', False) in dev_persistent = (options.pop('persistent', False) in
['True', 'yes', True]) ['True', 'yes', True])
if persistent is not None and dev_persistent != persistent: if persistent is not None and dev_persistent != persistent:
continue continue
backend_domain = self._vm.app.domains[backend_domain] backend_domain = self._vm.app.domains[backend_domain]
yield DeviceAssignment(backend_domain, ident, options, yield DeviceAssignment(backend_domain, ident, options,
persistent=dev_persistent, frontend_domain=self._vm, persistent=dev_persistent,
frontend_domain=self._vm,
devclass=self._class) devclass=self._class)
def attached(self): def attached(self):
'''List devices which are (or may be) attached to this vm ''' """List devices which are (or may be) attached to this vm """
for assignment in self.assignments(): for assignment in self.assignments():
yield assignment.device yield assignment.device
def persistent(self): def persistent(self):
''' Devices persistently attached and safe to access before libvirt """ Devices persistently attached and safe to access before libvirt
bootstrap. bootstrap.
''' """
for assignment in self.assignments(True): for assignment in self.assignments(True):
yield assignment.device yield assignment.device
def available(self): def available(self):
'''List devices exposed by this vm''' """List devices exposed by this vm"""
devices_str = self._vm.qubesd_call(None, devices_str = \
'admin.vm.device.{}.Available'.format(self._class)).decode() self._vm.qubesd_call(None,
'admin.vm.device.{}.Available'.format(
self._class)).decode()
for dev_str in devices_str.splitlines(): for dev_str in devices_str.splitlines():
ident, _, info = dev_str.partition(' ') ident, _, info = dev_str.partition(' ')
# description is special that it can contain spaces # description is special that it can contain spaces
@ -236,14 +246,15 @@ class DeviceCollection(object):
**info_dict) **info_dict)
def update_persistent(self, device, persistent): def update_persistent(self, device, persistent):
'''Update `persistent` flag of already attached device. """Update `persistent` flag of already attached device.
:param DeviceInfo device: device for which change persistent flag :param DeviceInfo device: device for which change persistent flag
:param bool persistent: new persistent flag :param bool persistent: new persistent flag
''' """
self._vm.qubesd_call(None, self._vm.qubesd_call(None,
'admin.vm.device.{}.Set.persistent'.format(self._class), 'admin.vm.device.{}.Set.persistent'.format(
self._class),
'{!s}+{!s}'.format(device.backend_domain, '{!s}+{!s}'.format(device.backend_domain,
device.ident), device.ident),
str(persistent).encode('utf-8')) str(persistent).encode('utf-8'))
@ -251,11 +262,11 @@ class DeviceCollection(object):
__iter__ = available __iter__ = available
def clear_cache(self): def clear_cache(self):
'''Clear cache of available devices''' """Clear cache of available devices"""
self._dev_cache.clear() self._dev_cache.clear()
def __getitem__(self, item): def __getitem__(self, item):
'''Get device object with given ident. """Get device object with given ident.
:returns: py:class:`DeviceInfo` :returns: py:class:`DeviceInfo`
@ -263,7 +274,7 @@ class DeviceCollection(object):
so return UnknownDevice object. Also do the same for non-existing so return UnknownDevice object. Also do the same for non-existing
devices - otherwise it will be impossible to detach already devices - otherwise it will be impossible to detach already
disconnected device. disconnected device.
''' """
# fist, check if we have cached device info # fist, check if we have cached device info
if item in self._dev_cache: if item in self._dev_cache:
return self._dev_cache[item] return self._dev_cache[item]
@ -277,12 +288,11 @@ class DeviceCollection(object):
return UnknownDevice(self._vm, self._class, item) return UnknownDevice(self._vm, self._class, item)
class DeviceManager(dict): class DeviceManager(dict):
'''Device manager that hold all devices by their classess. """Device manager that hold all devices by their classes.
:param vm: VM for which we manage devices :param vm: VM for which we manage devices
''' """
def __init__(self, vm): def __init__(self, vm):
super(DeviceManager, self).__init__() super(DeviceManager, self).__init__()
@ -291,3 +301,19 @@ class DeviceManager(dict):
def __missing__(self, key): def __missing__(self, key):
self[key] = DeviceCollection(self._vm, key) self[key] = DeviceCollection(self._vm, key)
return self[key] return self[key]
def __iter__(self):
return iter(self._get_device_classes())
def keys(self):
return self._get_device_classes()
def _get_device_classes(self):
"""Function used to call Qubesd in order to obtain
the device classes list
"""
device_classes = \
self._vm.app.qubesd_call('dom0', 'admin.deviceclass.List').decode()
device_classes = sorted(device_classes.splitlines())
return device_classes

View File

@ -103,7 +103,8 @@ class TC_00_DeviceCollection(qubesadmin.tests.QubesTestCase):
def test_020_attach(self): def test_020_attach(self):
self.app.expected_calls[ self.app.expected_calls[
('test-vm', 'admin.vm.device.test.Attach', 'test-vm2+dev1', b'')] = \ ('test-vm', 'admin.vm.device.test.Attach', 'test-vm2+dev1',
b'')] = \
b'0\0' b'0\0'
assign = qubesadmin.devices.DeviceAssignment( assign = qubesadmin.devices.DeviceAssignment(
self.app.domains['test-vm2'], 'dev1') self.app.domains['test-vm2'], 'dev1')
@ -305,3 +306,13 @@ class TC_00_DeviceCollection(qubesadmin.tests.QubesTestCase):
self.app.domains['test-vm2'], 'test', 'dev1') self.app.domains['test-vm2'], 'test', 'dev1')
self.vm.devices['test'].update_persistent(dev, False) self.vm.devices['test'].update_persistent(dev, False)
self.assertAllCalled() self.assertAllCalled()
def test_072_list(self):
self.app.expected_calls[
('dom0', 'admin.deviceclass.List', None, None)] = \
b'0\x00block\nmic\nusb\n'
seen = set()
for devclass in self.app.domains['test-vm'].devices:
self.assertNotIn(devclass, seen)
seen.add(devclass)
self.assertEqual(seen, {'block', 'mic', 'usb'})

View File

@ -21,7 +21,7 @@
# with this program; if not, write to the Free Software Foundation, Inc., # with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
'''Qubes volume and block device managment''' """Qubes volume and block device managment"""
import argparse import argparse
import os import os
@ -34,7 +34,7 @@ import qubesadmin.devices
def prepare_table(dev_list): def prepare_table(dev_list):
''' Converts a list of :py:class:`qubes.devices.DeviceInfo` objects to a """ Converts a list of :py:class:`qubes.devices.DeviceInfo` objects to a
list of tupples for the :py:func:`qubes.tools.print_table`. list of tupples for the :py:func:`qubes.tools.print_table`.
If :program:`qvm-devices` is running in a TTY, it will ommit duplicate If :program:`qvm-devices` is running in a TTY, it will ommit duplicate
@ -43,7 +43,7 @@ def prepare_table(dev_list):
:param iterable dev_list: List of :py:class:`qubes.devices.DeviceInfo` :param iterable dev_list: List of :py:class:`qubes.devices.DeviceInfo`
objects. objects.
:returns: list of tupples :returns: list of tupples
''' """
output = [] output = []
header = [] header = []
if sys.stdout.isatty(): if sys.stdout.isatty():
@ -60,7 +60,8 @@ def prepare_table(dev_list):
class Line(object): class Line(object):
'''Helper class to hold single device info for listing''' """Helper class to hold single device info for listing"""
# pylint: disable=too-few-public-methods # pylint: disable=too-few-public-methods
def __init__(self, device: qubesadmin.devices.DeviceInfo, attached_to=None): def __init__(self, device: qubesadmin.devices.DeviceInfo, attached_to=None):
self.ident = "{!s}:{!s}".format(device.backend_domain, device.ident) self.ident = "{!s}:{!s}".format(device.backend_domain, device.ident)
@ -70,13 +71,13 @@ class Line(object):
@property @property
def assignments(self): def assignments(self):
'''list of frontends the device is assigned to''' """list of frontends the device is assigned to"""
return ', '.join(self.frontends) return ', '.join(self.frontends)
def list_devices(args): def list_devices(args):
''' Called by the parser to execute the qubes-devices list """ Called by the parser to execute the qubes-devices list
subcommand. ''' subcommand. """
app = args.app app = args.app
devices = set() devices = set()
@ -105,7 +106,8 @@ def list_devices(args):
if assignment.options: if assignment.options:
result[dev].frontends.append('{!s} ({})'.format( result[dev].frontends.append('{!s} ({})'.format(
domain, ', '.join('{}={}'.format(key, value) domain, ', '.join('{}={}'.format(key, value)
for key, value in assignment.options.items()))) for key, value in
assignment.options.items())))
else: else:
result[dev].frontends.append(str(domain)) result[dev].frontends.append(str(domain))
@ -113,9 +115,9 @@ def list_devices(args):
def attach_device(args): def attach_device(args):
''' Called by the parser to execute the :program:`qvm-devices attach` """ Called by the parser to execute the :program:`qvm-devices attach`
subcommand. subcommand.
''' """
device_assignment = args.device_assignment device_assignment = args.device_assignment
vm = args.domains[0] vm = args.domains[0]
options = dict(opt.split('=', 1) for opt in args.option or []) options = dict(opt.split('=', 1) for opt in args.option or [])
@ -127,9 +129,9 @@ def attach_device(args):
def detach_device(args): def detach_device(args):
''' Called by the parser to execute the :program:`qvm-devices detach` """ Called by the parser to execute the :program:`qvm-devices detach`
subcommand. subcommand.
''' """
vm = args.domains[0] vm = args.domains[0]
if args.device_assignment: if args.device_assignment:
vm.devices[args.devclass].detach(args.device_assignment) vm.devices[args.devclass].detach(args.device_assignment)
@ -139,7 +141,7 @@ def detach_device(args):
def init_list_parser(sub_parsers): def init_list_parser(sub_parsers):
''' Configures the parser for the :program:`qvm-devices list` subcommand ''' """ Configures the parser for the :program:`qvm-devices list` subcommand """
# pylint: disable=protected-access # pylint: disable=protected-access
list_parser = sub_parsers.add_parser('list', aliases=('ls', 'l'), list_parser = sub_parsers.add_parser('list', aliases=('ls', 'l'),
help='list devices') help='list devices')
@ -152,10 +154,10 @@ def init_list_parser(sub_parsers):
class DeviceAction(qubesadmin.tools.QubesAction): class DeviceAction(qubesadmin.tools.QubesAction):
''' Action for argument parser that gets the """ Action for argument parser that gets the
:py:class:``qubesadmin.device.DeviceAssignment`` from a :py:class:``qubesadmin.device.DeviceAssignment`` from a
BACKEND:DEVICE_ID string. BACKEND:DEVICE_ID string.
''' # pylint: disable=too-few-public-methods """ # pylint: disable=too-few-public-methods
def __init__(self, help='A backend & device id combination', def __init__(self, help='A backend & device id combination',
required=True, allow_unknown=False, **kwargs): required=True, allow_unknown=False, **kwargs):
@ -165,7 +167,7 @@ class DeviceAction(qubesadmin.tools.QubesAction):
**kwargs) **kwargs)
def __call__(self, parser, namespace, values, option_string=None): def __call__(self, parser, namespace, values, option_string=None):
''' Set ``namespace.device_assignment`` to ``values`` ''' """ Set ``namespace.device_assignment`` to ``values`` """
setattr(namespace, self.dest, values) setattr(namespace, self.dest, values)
def parse_qubes_app(self, parser, namespace): def parse_qubes_app(self, parser, namespace):
@ -177,6 +179,7 @@ class DeviceAction(qubesadmin.tools.QubesAction):
try: try:
vmname, device_id = backend_device_id.split(':', 1) vmname, device_id = backend_device_id.split(':', 1)
vm = None
try: try:
vm = app.domains[vmname] vm = app.domains[vmname]
except KeyError: except KeyError:
@ -184,25 +187,26 @@ class DeviceAction(qubesadmin.tools.QubesAction):
try: try:
dev = vm.devices[devclass][device_id] dev = vm.devices[devclass][device_id]
if not self.allow_unknown and isinstance(dev, if not self.allow_unknown and \
qubesadmin.devices.UnknownDevice): isinstance(dev, qubesadmin.devices.UnknownDevice):
raise KeyError(device_id) raise KeyError(device_id)
except KeyError: except KeyError:
parser.error_runtime( parser.error_runtime(
"backend vm {!r} doesn't expose device {!r}" "backend vm {!r} doesn't expose device {!r}".format(
.format(vmname, device_id)) vmname, device_id))
device_assignment = qubesadmin.devices.DeviceAssignment(vm, device_assignment = qubesadmin.devices.DeviceAssignment(
device_id) vm, device_id)
setattr(namespace, self.dest, device_assignment) setattr(namespace, self.dest, device_assignment)
except ValueError: except ValueError:
parser.error('expected a backend vm & device id combination ' \ parser.error(
'like foo:bar got %s' % backend_device_id) 'expected a backend vm & device id combination like foo:bar '
'got %s' % backend_device_id)
def get_parser(device_class=None): def get_parser(device_class=None):
'''Create :py:class:`argparse.ArgumentParser` suitable for """Create :py:class:`argparse.ArgumentParser` suitable for
:program:`qvm-block`. :program:`qvm-block`.
''' """
parser = qubesadmin.tools.QubesArgumentParser(description=__doc__, parser = qubesadmin.tools.QubesArgumentParser(description=__doc__,
want_app=True) want_app=True)
parser.register('action', 'parsers', parser.register('action', 'parsers',
@ -237,18 +241,23 @@ def get_parser(device_class=None):
dest='device_assignment', dest='device_assignment',
action=DeviceAction) action=DeviceAction)
detach_parser.add_argument(metavar='BACKEND:DEVICE_ID', detach_parser.add_argument(metavar='BACKEND:DEVICE_ID',
dest='device_assignment', nargs=argparse.OPTIONAL, dest='device_assignment',
nargs=argparse.OPTIONAL,
action=DeviceAction, allow_unknown=True) action=DeviceAction, allow_unknown=True)
attach_parser.add_argument('--option', '-o', action='append', attach_parser.add_argument('--option', '-o', action='append',
help="Set option for the device in opt=value form (can be specified " help="Set option for the device in opt=value "
"multiple times), see man qvm-device for details") "form (can be specified "
"multiple times), see man qvm-device for "
"details")
attach_parser.add_argument('--ro', action='store_true', default=False, attach_parser.add_argument('--ro', action='store_true', default=False,
help="Attach device read-only (alias for read-only=yes option, " help="Attach device read-only (alias for "
"read-only=yes option, "
"takes precedence)") "takes precedence)")
attach_parser.add_argument('--persistent', '-p', action='store_true', attach_parser.add_argument('--persistent', '-p', action='store_true',
default=False, default=False,
help="Attach device persistently (so it will be automatically " help="Attach device persistently (so it will "
"be automatically "
"attached at qube startup)") "attached at qube startup)")
attach_parser.set_defaults(func=attach_device) attach_parser.set_defaults(func=attach_device)
@ -258,7 +267,7 @@ def get_parser(device_class=None):
def main(args=None, app=None): def main(args=None, app=None):
'''Main routine of :program:`qvm-block`.''' """Main routine of :program:`qvm-block`."""
basename = os.path.basename(sys.argv[0]) basename = os.path.basename(sys.argv[0])
devclass = None devclass = None
if basename.startswith('qvm-') and basename != 'qvm-device': if basename.startswith('qvm-') and basename != 'qvm-device':