diff --git a/qubesmgmt/tests/tools/qvm_device.py b/qubesmgmt/tests/tools/qvm_device.py new file mode 100644 index 0000000..d12139c --- /dev/null +++ b/qubesmgmt/tests/tools/qvm_device.py @@ -0,0 +1,201 @@ +# pylint: disable=protected-access + +# +# The Qubes OS Project, https://www.qubes-os.org/ +# +# Copyright (C) 2017 Marek Marczykowski-Górecki +# +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU Lesser General Public License as published by +# the Free Software Foundation; either version 2.1 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +# + +''' Tests for the `qvm-device` tool. ''' + +import unittest.mock as mock +import qubesmgmt.tests +import qubesmgmt.tests.tools +import qubesmgmt.devices +import qubesmgmt.tools.qvm_device + + +class TC_00_qvm_device(qubesmgmt.tests.QubesTestCase): + ''' Tests the output logic of the qvm-device tool ''' + def setUp(self): + super(TC_00_qvm_device, self).setUp() + self.app.expected_calls[('dom0', 'mgmt.vm.List', None, None)] = \ + b'0\0test-vm1 class=AppVM state=Running\n' \ + b'test-vm2 class=AppVM state=Running\n' \ + b'test-vm3 class=AppVM state=Running\n' + self.app.expected_calls[('test-vm1', 'mgmt.vm.device.test.Available', + None, None)] = \ + b'0\0dev1 description=Description here\n' + self.vm1 = self.app.domains['test-vm1'] + self.vm2 = self.app.domains['test-vm2'] + self.vm1_device = self.app.domains['test-vm1'].devices['test']['dev1'] + + def test_000_list_all(self): + ''' List all exposed vm devices. No devices are attached to other + domains. + ''' + self.app.expected_calls[('test-vm2', 'mgmt.vm.device.test.Available', + None, None)] = \ + b'0\0dev2 description=Description here2\n' + self.app.expected_calls[('test-vm3', 'mgmt.vm.device.test.Available', + None, None)] = \ + b'0\0' + + self.app.expected_calls[('test-vm1', 'mgmt.vm.device.test.List', + None, None)] = b'0\0' + self.app.expected_calls[('test-vm2', 'mgmt.vm.device.test.List', + None, None)] = b'0\0' + self.app.expected_calls[('test-vm3', 'mgmt.vm.device.test.List', + None, None)] = b'0\0' + + with qubesmgmt.tests.tools.StdoutBuffer() as buf: + qubesmgmt.tools.qvm_device.main( + ['test', 'list'], app=self.app) + self.assertEqual( + [x.rstrip() for x in buf.getvalue().splitlines()], + ['test-vm1:dev1 Description here', + 'test-vm2:dev2 Description here2'] + ) + + def test_001_list_persistent_attach(self): + ''' Attach the device exposed by the `vm1` to the `vm3` persistently. + ''' + self.app.expected_calls[('test-vm2', 'mgmt.vm.device.test.Available', + None, None)] = \ + b'0\0dev2 description=Description here2\n' + self.app.expected_calls[('test-vm3', 'mgmt.vm.device.test.Available', + None, None)] = \ + b'0\0' + self.app.expected_calls[('test-vm1', 'mgmt.vm.device.test.List', + None, None)] = b'0\0' + self.app.expected_calls[('test-vm2', 'mgmt.vm.device.test.List', + None, None)] = b'0\0' + self.app.expected_calls[('test-vm3', 'mgmt.vm.device.test.List', + None, None)] = \ + b'0\0test-vm1+dev1 persistent=yes\n' + + with qubesmgmt.tests.tools.StdoutBuffer() as buf: + qubesmgmt.tools.qvm_device.main( + ['test', 'list', 'test-vm3'], app=self.app) + self.assertEqual( + buf.getvalue(), + 'test-vm1:dev1 Description here test-vm3\n' + ) + + def test_002_list_list_temp_attach(self): + ''' Attach the device exposed by the `vm1` to the `vm3` + non-persistently. + ''' + self.app.expected_calls[('test-vm2', 'mgmt.vm.device.test.Available', + None, None)] = \ + b'0\0dev2 description=Description here2\n' + self.app.expected_calls[('test-vm3', 'mgmt.vm.device.test.Available', + None, None)] = \ + b'0\0' + + self.app.expected_calls[('test-vm1', 'mgmt.vm.device.test.List', + None, None)] = b'0\0' + self.app.expected_calls[('test-vm2', 'mgmt.vm.device.test.List', + None, None)] = b'0\0' + self.app.expected_calls[('test-vm3', 'mgmt.vm.device.test.List', + None, None)] = \ + b'0\0test-vm1+dev1\n' + + with qubesmgmt.tests.tools.StdoutBuffer() as buf: + qubesmgmt.tools.qvm_device.main( + ['test', 'list', 'test-vm3'], app=self.app) + self.assertEqual( + buf.getvalue(), + 'test-vm1:dev1 Description here test-vm3\n' + ) + + def test_010_attach(self): + ''' Test attach action ''' + self.app.expected_calls[('test-vm2', 'mgmt.vm.device.test.Attach', + 'test-vm1+dev1', b'')] = b'0\0' + qubesmgmt.tools.qvm_device.main( + ['test', 'attach', 'test-vm2', 'test-vm1:dev1'], app=self.app) + self.assertAllCalled() + + def test_011_attach_options(self): + ''' Test attach action ''' + self.app.expected_calls[('test-vm2', 'mgmt.vm.device.test.Attach', + 'test-vm1+dev1', b'ro=True')] = b'0\0' + qubesmgmt.tools.qvm_device.main( + ['test', 'attach', '-o', 'ro=True', 'test-vm2', 'test-vm1:dev1'], + app=self.app) + self.assertAllCalled() + + def test_011_attach_persistent(self): + ''' Test attach action ''' + self.app.expected_calls[('test-vm2', 'mgmt.vm.device.test.Attach', + 'test-vm1+dev1', b'persistent=yes')] = b'0\0' + qubesmgmt.tools.qvm_device.main( + ['test', 'attach', '-p', 'test-vm2', 'test-vm1:dev1'], + app=self.app) + self.assertAllCalled() + + def test_012_attach_invalid(self): + ''' Test attach action ''' + with qubesmgmt.tests.tools.StderrBuffer() as stderr: + with self.assertRaises(SystemExit): + qubesmgmt.tools.qvm_device.main( + ['test', 'attach', '-p', 'test-vm2', 'dev1'], + app=self.app) + self.assertIn('expected a backend vm & device id', + stderr.getvalue()) + self.assertAllCalled() + + def test_013_attach_invalid2(self): + ''' Test attach action ''' + with qubesmgmt.tests.tools.StderrBuffer() as stderr: + with self.assertRaises(SystemExit): + qubesmgmt.tools.qvm_device.main( + ['test', 'attach', '-p', 'test-vm2', 'test-vm1:invalid'], + app=self.app) + self.assertIn('doesn\'t expose device', + stderr.getvalue()) + self.assertAllCalled() + + def test_014_attach_invalid3(self): + ''' Test attach action ''' + with qubesmgmt.tests.tools.StderrBuffer() as stderr: + with self.assertRaises(SystemExit): + qubesmgmt.tools.qvm_device.main( + ['test', 'attach', '-p', 'test-vm2', 'no-such-vm:dev3'], + app=self.app) + self.assertIn('no backend vm', + stderr.getvalue()) + self.assertAllCalled() + + def test_020_detach(self): + ''' Test detach action ''' + self.app.expected_calls[('test-vm2', 'mgmt.vm.device.test.Detach', + 'test-vm1+dev1', None)] = b'0\0' + qubesmgmt.tools.qvm_device.main( + ['test', 'detach', 'test-vm2', 'test-vm1:dev1'], app=self.app) + self.assertAllCalled() + + def test_021_detach_unknown(self): + ''' Test detach action ''' + self.app.expected_calls[('test-vm2', 'mgmt.vm.device.test.Detach', + 'test-vm1+dev7', None)] = b'0\0' + qubesmgmt.tools.qvm_device.main( + ['test', 'detach', 'test-vm2', 'test-vm1:dev7'], app=self.app) + self.assertAllCalled() + diff --git a/qubesmgmt/tools/qvm_device.py b/qubesmgmt/tools/qvm_device.py new file mode 100644 index 0000000..9ae3e93 --- /dev/null +++ b/qubesmgmt/tools/qvm_device.py @@ -0,0 +1,256 @@ +# encoding=utf-8 + +# +# The Qubes OS Project, http://www.qubes-os.org +# +# Copyright (C) 2016 Bahtiar `kalkin-` Gadimov +# Copyright (C) 2016 Marek Marczykowski-Górecki +# +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU Lesser General Public License as published by +# the Free Software Foundation; either version 2.1 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +'''Qubes volume and block device managment''' + +import argparse +import os +import sys + +import qubesmgmt +import qubesmgmt.exc +import qubesmgmt.tools +import qubesmgmt.devices + + +def prepare_table(dev_list): + ''' Converts a list of :py:class:`qubes.devices.DeviceInfo` objects to a + list of tupples for the :py:func:`qubes.tools.print_table`. + + If :program:`qvm-devices` is running in a TTY, it will ommit duplicate + data. + + :param iterable dev_list: List of :py:class:`qubes.devices.DeviceInfo` + objects. + :returns: list of tupples + ''' + output = [] + header = [] + if sys.stdout.isatty(): + header += [('BACKEND:DEVID', 'DESCRIPTION', 'USED BY')] # NOQA + + for line in dev_list: + output += [( + line.ident, + line.description, + str(line.assignments), + )] + + return header + sorted(output) + + +class Line(object): + '''Helper class to hold single device info for listing''' + # pylint: disable=too-few-public-methods + def __init__(self, device: qubesmgmt.devices.DeviceInfo, attached_to=None): + self.ident = "{!s}:{!s}".format(device.backend_domain, device.ident) + self.description = device.description + self.attached_to = attached_to if attached_to else "" + self.frontends = [] + + @property + def assignments(self): + '''list of frontends the device is assigned to''' + return ', '.join(self.frontends) + + +def list_devices(args): + ''' Called by the parser to execute the qubes-devices list + subcommand. ''' + app = args.app + + devices = set() + if hasattr(args, 'domains') and args.domains: + for domain in args.domains: + for dev in domain.devices[args.devclass].attached(): + devices.add(dev) + for dev in domain.devices[args.devclass].available(): + devices.add(dev) + + else: + for domain in app.domains: + for dev in domain.devices[args.devclass].available(): + devices.add(dev) + + result = {dev: Line(dev) for dev in devices} + + for dev in result: + for domain in app.domains: + if domain == dev.backend_domain: + continue + + if dev in domain.devices[args.devclass].assignments(): + result[dev].frontends.append(str(domain)) + + + qubesmgmt.tools.print_table(prepare_table(result.values())) + + +def attach_device(args): + ''' Called by the parser to execute the :program:`qvm-devices attach` + subcommand. + ''' + device_assignment = args.device_assignment + vm = args.domains[0] + options = dict(opt.split('=', 1) for opt in args.option or []) + device_assignment.persistent = args.persistent + device_assignment.options = options + vm.devices[args.devclass].attach(device_assignment) + + +def detach_device(args): + ''' Called by the parser to execute the :program:`qvm-devices detach` + subcommand. + ''' + device_assignment = args.device_assignment + vm = args.domains[0] + vm.devices[args.devclass].detach(device_assignment) + + +def init_list_parser(sub_parsers): + ''' Configures the parser for the :program:`qvm-devices list` subcommand ''' + # pylint: disable=protected-access + list_parser = sub_parsers.add_parser('list', aliases=('ls', 'l'), + help='list devices') + + vm_name_group = qubesmgmt.tools.VmNameGroup( + list_parser, required=False, vm_action=qubesmgmt.tools.VmNameAction, + help='list devices assigned to specific domain(s)') + list_parser._mutually_exclusive_groups.append(vm_name_group) + list_parser.set_defaults(func=list_devices) + + +class DeviceAction(qubesmgmt.tools.QubesAction): + ''' Action for argument parser that gets the + :py:class:``qubesmgmt.device.DeviceAssignment`` from a + BACKEND:DEVICE_ID string. + ''' # pylint: disable=too-few-public-methods + + def __init__(self, help='A backend & device id combination', + required=True, allow_unknown=False, **kwargs): + # pylint: disable=redefined-builtin + self.allow_unknown = allow_unknown + super(DeviceAction, self).__init__(help=help, required=required, + **kwargs) + + def __call__(self, parser, namespace, values, option_string=None): + ''' Set ``namespace.device_assignment`` to ``values`` ''' + setattr(namespace, self.dest, values) + + def parse_qubes_app(self, parser, namespace): + app = namespace.app + backend_device_id = getattr(namespace, self.dest) + devclass = namespace.devclass + + try: + vmname, device_id = backend_device_id.split(':', 1) + try: + vm = app.domains[vmname] + except KeyError: + parser.error_runtime("no backend vm {!r}".format(vmname)) + + try: + dev = vm.devices[devclass][device_id] + if not self.allow_unknown and isinstance(dev, + qubesmgmt.devices.UnknownDevice): + raise KeyError(device_id) + except KeyError: + parser.error_runtime( + "backend vm {!r} doesn't expose device {!r}" + .format(vmname, device_id)) + device_assignment = qubesmgmt.devices.DeviceAssignment(vm, + device_id) + setattr(namespace, self.dest, device_assignment) + except ValueError: + parser.error('expected a backend vm & device id combination ' \ + 'like foo:bar got %s' % backend_device_id) + + +def get_parser(device_class=None): + '''Create :py:class:`argparse.ArgumentParser` suitable for + :program:`qvm-block`. + ''' + parser = qubesmgmt.tools.QubesArgumentParser(description=__doc__, + want_app=True) + parser.register('action', 'parsers', + qubesmgmt.tools.AliasedSubParsersAction) + if device_class: + parser.add_argument('devclass', const=device_class, + action='store_const', + help=argparse.SUPPRESS) + else: + parser.add_argument('devclass', metavar='DEVICE_CLASS', action='store', + help="Device class to manage ('pci', 'usb', etc)") + sub_parsers = parser.add_subparsers( + title='commands', + description="For more information see qvm-device command -h", + dest='command') + init_list_parser(sub_parsers) + attach_parser = sub_parsers.add_parser( + 'attach', help="Attach device to domain", aliases=('at', 'a')) + detach_parser = sub_parsers.add_parser( + "detach", help="Detach device from domain", aliases=('d', 'dt')) + + attach_parser.add_argument('VMNAME', nargs=1, + action=qubesmgmt.tools.VmNameAction) + detach_parser.add_argument('VMNAME', nargs=1, + action=qubesmgmt.tools.VmNameAction) + + attach_parser.add_argument(metavar='BACKEND:DEVICE_ID', + dest='device_assignment', + action=DeviceAction) + detach_parser.add_argument(metavar='BACKEND:DEVICE_ID', + dest='device_assignment', + action=DeviceAction, allow_unknown=True) + + attach_parser.add_argument('--option', '-o', action='append', + help="Set option for the device in opt=value form (can be specified " + "multiple times)") + attach_parser.add_argument('--persistent', '-p', action='store_true', + default=False, + help="Attach device persistently (so it will be automatically " + "attached at qube startup)") + + attach_parser.set_defaults(func=attach_device) + detach_parser.set_defaults(func=detach_device) + + return parser + + +def main(args=None, app=None): + '''Main routine of :program:`qvm-block`.''' + basename = os.path.basename(sys.argv[0]) + devclass = None + if basename.startswith('qvm-') and basename != 'qvm-device': + devclass = basename[4:] + args = get_parser(devclass).parse_args(args, app=app) + try: + args.func(args) + except qubesmgmt.exc.QubesException as e: + print(str(e), file=sys.stderr) + return 1 + return 0 + + +if __name__ == '__main__': + sys.exit(main())