diff --git a/qubesmgmt/devices.py b/qubesmgmt/devices.py new file mode 100644 index 0000000..ed598f3 --- /dev/null +++ b/qubesmgmt/devices.py @@ -0,0 +1,254 @@ +# -*- encoding: utf8 -*- +# +# The Qubes OS Project, http://www.qubes-os.org +# +# Copyright (C) 2015-2016 Wojtek Porczyk +# Copyright (C) 2016 Bahtiar `kalkin-` Gadimov +# Copyright (C) 2017 Marek Marczykowski-Górecki +# +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU Lesser General Public License as published by +# the Free Software Foundation; either version 2.1 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License along +# with this program; if not, see . + +'''API for various types of devices. + +Main concept is that some domain main +expose (potentially multiple) devices, which can be attached to other domains. +Devices can be of different classes (like 'pci', 'usb', etc). Each device +class is implemented by an extension. + +Devices are identified by pair of (backend domain, `ident`), where `ident` is +:py:class:`str`. +''' + +class DeviceAssignment(object): # pylint: disable=too-few-public-methods + ''' Maps a device to a frontend_domain. ''' + + def __init__(self, backend_domain, ident, options=None, persistent=False, + frontend_domain=None): + self.backend_domain = backend_domain + self.ident = ident + self.options = options or {} + self.persistent = persistent + self.frontend_domain = frontend_domain + + def __repr__(self): + return "[%s]:%s" % (self.backend_domain, self.ident) + + def __hash__(self): + return hash((self.backend_domain, self.ident)) + + def __eq__(self, other): + if not isinstance(self, other.__class__): + return NotImplemented + + return self.backend_domain == other.backend_domain \ + and self.ident == other.ident + + +class DeviceInfo(object): + ''' Holds all information about a device ''' + # pylint: disable=too-few-public-methods + def __init__(self, backend_domain, ident, description=None, + options=None, **kwargs): + #: domain providing this device + self.backend_domain = backend_domain + #: device identifier (unique for given domain and device type) + self.ident = ident + #: human readable description/name of the device + self.description = description + self.options = options or dict() + self.data = kwargs + + def __hash__(self): + return hash((str(self.backend_domain), self.ident)) + + def __eq__(self, other): + return ( + self.backend_domain == other.backend_domain and + self.ident == other.ident + ) + + def __str__(self): + return '{!s}:{!s}'.format(self.backend_domain, self.ident) + + +class UnknownDevice(DeviceInfo): + # pylint: disable=too-few-public-methods + '''Unknown device - for example exposed by domain not running currently''' + + def __init__(self, backend_domain, ident, description=None, + **kwargs): + if description is None: + description = "Unknown device" + super(UnknownDevice, self).__init__(backend_domain, ident, description, + **kwargs) + + +class DeviceCollection(object): + '''Bag for devices. + + Used as default value for :py:meth:`DeviceManager.__missing__` factory. + + :param vm: VM for which we manage devices + :param class_: device class + + ''' + def __init__(self, vm, class_): + self._vm = vm + self._class = class_ + self._dev_cache = {} + + def attach(self, device_assignment): + '''Attach (add) device to domain. + + :param DeviceAssignment device_assignment: device object + ''' + + if not device_assignment.frontend_domain: + device_assignment.frontend_domain = self._vm + else: + assert device_assignment.frontend_domain == self._vm, \ + "Trying to attach DeviceAssignment belonging to other domain" + + options = device_assignment.options.copy() + if device_assignment.persistent: + options['persistent'] = 'yes' + options_str = ' '.join('{}={}'.format(opt, + val) for opt, val in sorted(options.items())) + self._vm.qubesd_call(None, + 'mgmt.vm.device.{}.Attach'.format(self._class), + '{!s}+{!s}'.format(device_assignment.backend_domain, + device_assignment.ident), + options_str.encode('utf-8')) + + def detach(self, device_assignment): + '''Detach (remove) device from domain. + + :param DeviceAssignment device_assignment: device to detach + (obtained from :py:meth:`assignments`) + ''' + if not device_assignment.frontend_domain: + device_assignment.frontend_domain = self._vm + else: + assert device_assignment.frontend_domain == self._vm, \ + "Trying to detach DeviceAssignment belonging to other domain" + + self._vm.qubesd_call(None, + 'mgmt.vm.device.{}.Detach'.format(self._class), + '{!s}+{!s}'.format(device_assignment.backend_domain, + device_assignment.ident)) + + def assignments(self, persistent=None): + '''List assignments for devices which are (or may be) attached to the + vm. + + Devices may be attached persistently (so they are included in + :file:`qubes.xml`) or not. Device can also be in :file:`qubes.xml`, + but be temporarily detached. + + :param bool persistent: only include devices which are or are not + attached persistently. + ''' + + assignments_str = self._vm.qubesd_call(None, + 'mgmt.vm.device.{}.List'.format(self._class)).decode() + for assignment_str in assignments_str.splitlines(): + device, _, options_all = assignment_str.partition(' ') + backend_domain, ident = device.split('+', 1) + options = dict(opt_single.split('=', 1) + for opt_single in options_all.split(' ') if opt_single) + dev_persistent = (options.pop('persistent', False) in + ['True', 'yes', True]) + if persistent is not None and dev_persistent != persistent: + continue + backend_domain = self._vm.app.domains[backend_domain] + yield DeviceAssignment(backend_domain, ident, options, + persistent=dev_persistent, frontend_domain=self._vm) + + def attached(self): + '''List devices which are (or may be) attached to this vm ''' + + for assignment in self.assignments(): + yield self._device(assignment) + + def persistent(self): + ''' Devices persistently attached and safe to access before libvirt + bootstrap. + ''' + + for assignment in self.assignments(True): + yield self._device(assignment) + + def _device(self, assignment): + ''' Helper method for geting a `qubes.devices.DeviceInfo` object from + `qubes.devices.DeviceAssignment`. ''' + + return assignment.backend_domain.devices[self._class][assignment.ident] + + def available(self): + '''List devices exposed by this vm''' + devices_str = self._vm.qubesd_call(None, + 'mgmt.vm.device.{}.Available'.format(self._class)).decode() + for dev_str in devices_str.splitlines(): + ident, _, info = dev_str.partition(' ') + # description is special that it can contain spaces + info, _, description = info.partition('description=') + info_dict = dict(info_single.split('=', 1) + for info_single in info.split(' ') if info_single) + yield DeviceInfo(self._vm, ident, description=description, + options=None, **info_dict) + + __iter__ = available + + def clear_cache(self): + '''Clear cache of available devices''' + self._dev_cache.clear() + + def __getitem__(self, item): + '''Get device object with given ident. + + :returns: py:class:`DeviceInfo` + + If domain isn't running, it is impossible to check device validity, + so return UnknownDevice object. Also do the same for non-existing + devices - otherwise it will be impossible to detach already + disconnected device. + ''' + # fist, check if we have cached device info + if item in self._dev_cache: + return self._dev_cache[item] + # then look for available devices + for dev in self.available(): + if dev.ident == item: + self._dev_cache[item] = dev + return dev + # if still nothing, return UnknownDevice instance for the reason + # explained in docstring, but don't cache it + return UnknownDevice(self._vm, item) + + + +class DeviceManager(dict): + '''Device manager that hold all devices by their classess. + + :param vm: VM for which we manage devices + ''' + + def __init__(self, vm): + super(DeviceManager, self).__init__() + self._vm = vm + + def __missing__(self, key): + self[key] = DeviceCollection(self._vm, key) + return self[key] diff --git a/qubesmgmt/tests/devices.py b/qubesmgmt/tests/devices.py new file mode 100644 index 0000000..8c4313f --- /dev/null +++ b/qubesmgmt/tests/devices.py @@ -0,0 +1,280 @@ +# -*- encoding: utf8 -*- +# +# The Qubes OS Project, http://www.qubes-os.org +# +# Copyright (C) 2017 Marek Marczykowski-Górecki +# +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU Lesser General Public License as published by +# the Free Software Foundation; either version 2.1 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License along +# with this program; if not, see . + +import qubesmgmt.tests +import qubesmgmt.devices + + +class TC_00_DeviceCollection(qubesmgmt.tests.QubesTestCase): + def setUp(self): + super(TC_00_DeviceCollection, self).setUp() + self.app.expected_calls[('dom0', 'mgmt.vm.List', None, None)] = \ + b'0\0test-vm class=AppVM state=Running\n' \ + b'test-vm2 class=AppVM state=Running\n' \ + b'test-vm3 class=AppVM state=Running\n' + self.vm = self.app.domains['test-vm'] + + def test_000_available(self): + self.app.expected_calls[ + ('test-vm', 'mgmt.vm.device.test.Available', None, None)] = \ + b'0\0dev1\n' + devices = list(self.vm.devices['test'].available()) + self.assertEqual(len(devices), 1) + dev = devices[0] + self.assertIsInstance(dev, qubesmgmt.devices.DeviceInfo) + self.assertEqual(dev.backend_domain, self.vm) + self.assertEqual(dev.ident, 'dev1') + self.assertEqual(dev.description, '') + self.assertEqual(dev.options, {}) + self.assertEqual(dev.data, {}) + self.assertEqual(str(dev), 'test-vm:dev1') + self.assertAllCalled() + + def test_001_available_desc(self): + self.app.expected_calls[ + ('test-vm', 'mgmt.vm.device.test.Available', None, None)] = \ + b'0\0dev1 description=This is description\n' + devices = list(self.vm.devices['test'].available()) + self.assertEqual(len(devices), 1) + dev = devices[0] + self.assertIsInstance(dev, qubesmgmt.devices.DeviceInfo) + self.assertEqual(dev.backend_domain, self.vm) + self.assertEqual(dev.ident, 'dev1') + self.assertEqual(dev.description, 'This is description') + self.assertEqual(dev.options, {}) + self.assertEqual(dev.data, {}) + self.assertEqual(str(dev), 'test-vm:dev1') + + def test_002_available_options(self): + self.app.expected_calls[ + ('test-vm', 'mgmt.vm.device.test.Available', None, None)] = \ + b'0\0dev1 ro=True other=123 description=This is description\n' + devices = list(self.vm.devices['test'].available()) + self.assertEqual(len(devices), 1) + dev = devices[0] + self.assertIsInstance(dev, qubesmgmt.devices.DeviceInfo) + self.assertEqual(dev.backend_domain, self.vm) + self.assertEqual(dev.ident, 'dev1') + self.assertEqual(dev.description, 'This is description') + self.assertEqual(dev.options, {}) + self.assertEqual(dev.data, {'ro': 'True', 'other': '123'}) + self.assertEqual(str(dev), 'test-vm:dev1') + self.assertAllCalled() + + def test_010_getitem(self): + self.app.expected_calls[ + ('test-vm', 'mgmt.vm.device.test.Available', None, None)] = \ + b'0\0dev1 description=This is description\n' + dev = self.vm.devices['test']['dev1'] + self.assertIsInstance(dev, qubesmgmt.devices.DeviceInfo) + self.assertEqual(dev.backend_domain, self.vm) + self.assertEqual(dev.ident, 'dev1') + self.assertEqual(dev.description, 'This is description') + self.assertEqual(dev.options, {}) + self.assertEqual(dev.data, {}) + self.assertEqual(str(dev), 'test-vm:dev1') + self.assertAllCalled() + + def test_011_getitem_missing(self): + self.app.expected_calls[ + ('test-vm', 'mgmt.vm.device.test.Available', None, None)] = \ + b'0\0dev1 description=This is description\n' + dev = self.vm.devices['test']['dev2'] + self.assertIsInstance(dev, qubesmgmt.devices.UnknownDevice) + self.assertEqual(dev.backend_domain, self.vm) + self.assertEqual(dev.ident, 'dev2') + self.assertEqual(dev.description, 'Unknown device') + self.assertEqual(dev.options, {}) + self.assertEqual(dev.data, {}) + self.assertEqual(str(dev), 'test-vm:dev2') + self.assertAllCalled() + + def test_020_attach(self): + self.app.expected_calls[ + ('test-vm', 'mgmt.vm.device.test.Attach', 'test-vm2+dev1', b'')] = \ + b'0\0' + assign = qubesmgmt.devices.DeviceAssignment( + self.app.domains['test-vm2'], 'dev1') + self.vm.devices['test'].attach(assign) + self.assertAllCalled() + + def test_021_attach_options(self): + self.app.expected_calls[ + ('test-vm', 'mgmt.vm.device.test.Attach', 'test-vm2+dev1', + b'ro=True something=value')] = b'0\0' + assign = qubesmgmt.devices.DeviceAssignment( + self.app.domains['test-vm2'], 'dev1') + assign.options['ro'] = True + assign.options['something'] = 'value' + self.vm.devices['test'].attach(assign) + self.assertAllCalled() + + def test_022_attach_persistent(self): + self.app.expected_calls[ + ('test-vm', 'mgmt.vm.device.test.Attach', 'test-vm2+dev1', + b'persistent=yes')] = b'0\0' + assign = qubesmgmt.devices.DeviceAssignment( + self.app.domains['test-vm2'], 'dev1') + assign.persistent = True + self.vm.devices['test'].attach(assign) + self.assertAllCalled() + + def test_023_attach_persistent_options(self): + self.app.expected_calls[ + ('test-vm', 'mgmt.vm.device.test.Attach', 'test-vm2+dev1', + b'persistent=yes ro=True')] = b'0\0' + assign = qubesmgmt.devices.DeviceAssignment( + self.app.domains['test-vm2'], 'dev1') + assign.persistent = True + assign.options['ro'] = True + self.vm.devices['test'].attach(assign) + self.assertAllCalled() + + def test_030_detach(self): + self.app.expected_calls[ + ('test-vm', 'mgmt.vm.device.test.Detach', 'test-vm2+dev1', + None)] = b'0\0' + assign = qubesmgmt.devices.DeviceAssignment( + self.app.domains['test-vm2'], 'dev1') + self.vm.devices['test'].detach(assign) + self.assertAllCalled() + + def test_040_assignments(self): + self.app.expected_calls[ + ('test-vm', 'mgmt.vm.device.test.List', None, None)] = \ + b'0\0test-vm2+dev1\n' \ + b'test-vm3+dev2\n' + assigns = list(self.vm.devices['test'].assignments()) + self.assertEqual(len(assigns), 2) + self.assertIsInstance(assigns[0], qubesmgmt.devices.DeviceAssignment) + self.assertEqual(assigns[0].backend_domain, + self.app.domains['test-vm2']) + self.assertEqual(assigns[0].ident, 'dev1') + self.assertEqual(assigns[0].frontend_domain, + self.app.domains['test-vm']) + self.assertEqual(assigns[0].options, {}) + + self.assertIsInstance(assigns[1], qubesmgmt.devices.DeviceAssignment) + self.assertEqual(assigns[1].backend_domain, + self.app.domains['test-vm3']) + self.assertEqual(assigns[1].ident, 'dev2') + self.assertEqual(assigns[1].frontend_domain, + self.app.domains['test-vm']) + self.assertEqual(assigns[1].options, {}) + + self.assertAllCalled() + + def test_041_assignments_options(self): + self.app.expected_calls[ + ('test-vm', 'mgmt.vm.device.test.List', None, None)] = \ + b'0\0test-vm2+dev1 ro=True\n' \ + b'test-vm3+dev2 ro=False persistent=True\n' + assigns = list(self.vm.devices['test'].assignments()) + self.assertEqual(len(assigns), 2) + self.assertIsInstance(assigns[0], qubesmgmt.devices.DeviceAssignment) + self.assertEqual(assigns[0].backend_domain, + self.app.domains['test-vm2']) + self.assertEqual(assigns[0].ident, 'dev1') + self.assertEqual(assigns[0].frontend_domain, + self.app.domains['test-vm']) + self.assertEqual(assigns[0].options, {'ro': 'True'}) + self.assertEqual(assigns[0].persistent, False) + + self.assertIsInstance(assigns[1], qubesmgmt.devices.DeviceAssignment) + self.assertEqual(assigns[1].backend_domain, + self.app.domains['test-vm3']) + self.assertEqual(assigns[1].ident, 'dev2') + self.assertEqual(assigns[1].frontend_domain, + self.app.domains['test-vm']) + self.assertEqual(assigns[1].options, {'ro': 'False'}) + self.assertEqual(assigns[1].persistent, True) + + self.assertAllCalled() + + def test_041_assignments_persistent(self): + self.app.expected_calls[ + ('test-vm', 'mgmt.vm.device.test.List', None, None)] = \ + b'0\0test-vm2+dev1\n' \ + b'test-vm3+dev2 persistent=True\n' + assigns = list(self.vm.devices['test'].assignments(True)) + self.assertEqual(len(assigns), 1) + self.assertIsInstance(assigns[0], qubesmgmt.devices.DeviceAssignment) + self.assertEqual(assigns[0].backend_domain, + self.app.domains['test-vm3']) + self.assertEqual(assigns[0].ident, 'dev2') + self.assertEqual(assigns[0].frontend_domain, + self.app.domains['test-vm']) + self.assertEqual(assigns[0].options, {}) + self.assertEqual(assigns[0].persistent, True) + self.assertAllCalled() + + def test_042_assignments_non_persistent(self): + self.app.expected_calls[ + ('test-vm', 'mgmt.vm.device.test.List', None, None)] = \ + b'0\0test-vm2+dev1\n' \ + b'test-vm3+dev2 persistent=True\n' + assigns = list(self.vm.devices['test'].assignments(False)) + self.assertEqual(len(assigns), 1) + self.assertIsInstance(assigns[0], qubesmgmt.devices.DeviceAssignment) + self.assertEqual(assigns[0].backend_domain, + self.app.domains['test-vm2']) + self.assertEqual(assigns[0].ident, 'dev1') + self.assertEqual(assigns[0].frontend_domain, + self.app.domains['test-vm']) + self.assertEqual(assigns[0].options, {}) + self.assertEqual(assigns[0].persistent, False) + self.assertAllCalled() + + def test_050_persistent(self): + self.app.expected_calls[ + ('test-vm', 'mgmt.vm.device.test.List', None, None)] = \ + b'0\0test-vm2+dev1\n' \ + b'test-vm3+dev2 persistent=True\n' + self.app.expected_calls[ + ('test-vm3', 'mgmt.vm.device.test.Available', None, None)] = \ + b'0\0dev2\n' + devs = list(self.vm.devices['test'].persistent()) + self.assertEqual(len(devs), 1) + self.assertIsInstance(devs[0], qubesmgmt.devices.DeviceInfo) + self.assertEqual(devs[0].backend_domain, self.app.domains['test-vm3']) + self.assertEqual(devs[0].ident, 'dev2') + self.assertAllCalled() + + def test_060_attached(self): + self.app.expected_calls[ + ('test-vm', 'mgmt.vm.device.test.List', None, None)] = \ + b'0\0test-vm2+dev1\n' \ + b'test-vm3+dev2 persistent=True\n' + self.app.expected_calls[ + ('test-vm2', 'mgmt.vm.device.test.Available', None, None)] = \ + b'0\0dev1\n' + self.app.expected_calls[ + ('test-vm3', 'mgmt.vm.device.test.Available', None, None)] = \ + b'0\0dev2\n' + devs = list(self.vm.devices['test'].attached()) + self.assertEqual(len(devs), 2) + self.assertIsInstance(devs[0], qubesmgmt.devices.DeviceInfo) + self.assertEqual(devs[0].backend_domain, self.app.domains['test-vm2']) + self.assertEqual(devs[0].ident, 'dev1') + self.assertIsInstance(devs[1], qubesmgmt.devices.DeviceInfo) + self.assertEqual(devs[1].backend_domain, self.app.domains['test-vm3']) + self.assertEqual(devs[1].ident, 'dev2') + self.assertAllCalled() + diff --git a/qubesmgmt/vm/__init__.py b/qubesmgmt/vm/__init__.py index 65def6f..4f4ee63 100644 --- a/qubesmgmt/vm/__init__.py +++ b/qubesmgmt/vm/__init__.py @@ -25,6 +25,7 @@ import qubesmgmt.base import qubesmgmt.exc import qubesmgmt.storage import qubesmgmt.features +import qubesmgmt.devices class QubesVM(qubesmgmt.base.PropertyHolder): @@ -34,11 +35,14 @@ class QubesVM(qubesmgmt.base.PropertyHolder): features = None + devices = None + def __init__(self, app, name): super(QubesVM, self).__init__(app, 'mgmt.vm.property.', name) self._volumes = None self.log = logging.getLogger(name) self.features = qubesmgmt.features.Features(self) + self.devices = qubesmgmt.devices.DeviceManager(self) @property def name(self):