qubes/tools: add qvm-device tool (and tests)

Add a tool to manipulate various devices.

QubesOS/qubes-issues#2257
This commit is contained in:
Marek Marczykowski-Górecki 2016-09-03 02:24:07 +02:00
parent 6aae6863b0
commit e8d011b83f
No known key found for this signature in database
GPG Key ID: 063938BA42CFA724
4 changed files with 352 additions and 0 deletions

View File

@ -975,6 +975,7 @@ def load_tests(loader, tests, pattern): # pylint: disable=unused-argument
'qubes.tests.vm.mix.net', 'qubes.tests.vm.mix.net',
'qubes.tests.vm.adminvm', 'qubes.tests.vm.adminvm',
'qubes.tests.app', 'qubes.tests.app',
'qubes.tests.tools.qvm_device',
'qubes.tests.tools.qvm_ls', 'qubes.tests.tools.qvm_ls',
): ):
tests.addTests(loader.loadTestsFromName(modname)) tests.addTests(loader.loadTestsFromName(modname))

View File

@ -0,0 +1,150 @@
#!/usr/bin/python2 -O
# vim: fileencoding=utf-8
# pylint: disable=protected-access,pointless-statement
#
# The Qubes OS Project, https://www.qubes-os.org/
#
# Copyright (C) 2015 Marek Marczykowski-Górecki
# <marmarek@invisiblethingslab.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
import qubes
import qubes.devices
import qubes.tools.qvm_device
import qubes.tests
import qubes.tests.devices
import qubes.tests.tools
class TestNamespace(object):
def __init__(self, app, domains=None, device=None):
super(TestNamespace, self).__init__()
self.app = app
self.devclass = 'testclass'
if domains:
self.domains = domains
if device:
self.device = device
class TC_00_Actions(qubes.tests.QubesTestCase):
def setUp(self):
super(TC_00_Actions, self).setUp()
self.app = qubes.tests.devices.TestApp()
self.vm1 = qubes.tests.devices.TestVM(self.app, 'vm1')
self.vm2 = qubes.tests.devices.TestVM(self.app, 'vm2')
self.device = self.vm2.device
def test_000_list_all(self):
args = TestNamespace(self.app)
with qubes.tests.tools.StdoutBuffer() as buf:
qubes.tools.qvm_device.list_devices(args)
self.assertEventFired(self.vm1,
'device-list:testclass')
self.assertEventFired(self.vm2,
'device-list:testclass')
self.assertEventNotFired(self.vm1,
'device-list-attached:testclass')
self.assertEventNotFired(self.vm2,
'device-list-attached:testclass')
self.assertEqual(
buf.getvalue(),
'vm1:testdev Description \n'
'vm2:testdev Description \n'
)
def test_001_list_one(self):
args = TestNamespace(self.app, [self.vm1])
# simulate attach
self.vm2.device.frontend_domain = self.vm1
self.vm1.devices['testclass']._set.add(self.device)
with qubes.tests.tools.StdoutBuffer() as buf:
qubes.tools.qvm_device.list_devices(args)
self.assertEventFired(self.vm1,
'device-list-attached:testclass')
self.assertEventNotFired(self.vm1,
'device-list:testclass')
self.assertEventNotFired(self.vm2,
'device-list:testclass')
self.assertEventNotFired(self.vm2,
'device-list-attached:testclass')
self.assertEqual(
buf.getvalue(),
'vm2:testdev Description vm1\n'
)
def test_002_list_one_non_persistent(self):
args = TestNamespace(self.app, [self.vm1])
# simulate attach
self.vm2.device.frontend_domain = self.vm1
with qubes.tests.tools.StdoutBuffer() as buf:
qubes.tools.qvm_device.list_devices(args)
self.assertEventFired(self.vm1,
'device-list-attached:testclass')
self.assertEventNotFired(self.vm1,
'device-list:testclass')
self.assertEventNotFired(self.vm2,
'device-list:testclass')
self.assertEventNotFired(self.vm2,
'device-list-attached:testclass')
self.assertEqual(
buf.getvalue(),
'vm2:testdev Description vm1\n'
)
def test_010_attach(self):
args = TestNamespace(
self.app,
[self.vm1],
self.device
)
qubes.tools.qvm_device.attach_device(args)
self.assertEventFired(self.vm1,
'device-attach:testclass', [self.device])
self.assertEventNotFired(self.vm2,
'device-attach:testclass', [self.device])
def test_011_double_attach(self):
args = TestNamespace(
self.app,
[self.vm1],
self.device
)
qubes.tools.qvm_device.attach_device(args)
with self.assertRaises(qubes.exc.QubesException):
qubes.tools.qvm_device.attach_device(args)
def test_020_detach(self):
args = TestNamespace(
self.app,
[self.vm1],
self.device
)
# simulate attach
self.vm2.device.frontend_domain = self.vm1
self.vm1.devices['testclass']._set.add(self.device)
qubes.tools.qvm_device.detach_device(args)
def test_021_detach_not_attached(self):
args = TestNamespace(
self.app,
[self.vm1],
self.device
)
with self.assertRaises(qubes.exc.QubesException):
qubes.tools.qvm_device.detach_device(args)

199
qubes/tools/qvm_device.py Normal file
View File

@ -0,0 +1,199 @@
#!/usr/bin/python2
# coding=utf-8
# pylint: disable=C,R
#
# The Qubes OS Project, http://www.qubes-os.org
#
# Copyright (C) 2016 Bahtiar `kalkin-` Gadimov <bahtiar@gadimov.de>
# Copyright (C) 2016 Marek Marczykowski-Górecki
# <marmarek@invisiblethingslab.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
'''Qubes volume and block device managment'''
from __future__ import print_function
import argparse
import os
import sys
import qubes
import qubes.devices
import qubes.exc
import qubes.tools
def prepare_table(dev_list):
''' Converts a list of :py:class:`qubes.devices.DeviceInfo` objects to a
list of tupples for the :py:func:`qubes.tools.print_table`.
If :program:`qvm-devices` is running in a TTY, it will ommit duplicate
data.
:param list dev_list: List of :py:class:`qubes.devices.DeviceInfo`
objects.
:returns: list of tupples
'''
output = []
header = []
if sys.stdout.isatty():
header += [('BACKEND:DEVID', 'DESCRIPTION', 'USED BY')] # NOQA
for dev in dev_list:
output += [(
"{!s}:{!s}".format(dev.backend_domain, dev.ident),
dev.description,
str(dev.frontend_domain) if dev.frontend_domain else "",
)]
return header + sorted(output)
def list_devices(args):
''' Called by the parser to execute the qubes-devices list
subcommand. '''
app = args.app
result = []
if hasattr(args, 'domains') and args.domains:
for domain in args.domains:
result.extend(domain.devices[args.devclass].attached())
else:
for backend in app.domains:
result.extend(backend.devices[args.devclass])
qubes.tools.print_table(prepare_table(result))
def attach_device(args):
''' Called by the parser to execute the :program:`qvm-devices attach`
subcommand.
'''
device = args.device
vm = args.domains[0]
vm.devices[args.devclass].attach(device)
def detach_device(args):
''' Called by the parser to execute the :program:`qvm-devices detach`
subcommand.
'''
device = args.device
vm = args.domains[0]
vm.devices[args.devclass].detach(device)
def init_list_parser(sub_parsers):
''' Configures the parser for the :program:`qvm-devices list` subcommand '''
# pylint: disable=protected-access
list_parser = sub_parsers.add_parser('list', aliases=('ls', 'l'),
help='list devices')
vm_name_group = qubes.tools.VmNameGroup(
list_parser, required=False, vm_action=qubes.tools.VmNameAction,
help='list devices assigned to specific domain(s)')
list_parser._mutually_exclusive_groups.append(vm_name_group)
list_parser.set_defaults(func=list_devices)
class DeviceAction(qubes.tools.QubesAction):
''' Action for argument parser that gets the
:py:class:``qubes.storage.Volume`` from a POOL_NAME:VOLUME_ID string.
'''
# pylint: disable=too-few-public-methods
def __init__(self, help='A domain & device id combination',
required=True, allow_unknown=False, **kwargs):
# pylint: disable=redefined-builtin
super(DeviceAction, self).__init__(help=help, required=required,
**kwargs)
self.allow_unknown = allow_unknown
def __call__(self, parser, namespace, values, option_string=None):
''' Set ``namespace.device`` to ``values`` '''
setattr(namespace, self.dest, values)
def parse_qubes_app(self, parser, namespace):
''' Acquire the :py:class:``qubes.devices.DeviceInfo`` object from
``namespace.app``.
'''
assert hasattr(namespace, 'app')
assert hasattr(namespace, 'devclass')
app = namespace.app
devclass = namespace.devclass
try:
backend_name, devid = getattr(namespace, self.dest).split(':', 1)
try:
backend = app.domains[backend_name]
dev = backend.devices[devclass][devid]
if not self.allow_unknown and isinstance(dev,
qubes.devices.UnknownDevice):
parser.error_runtime('no device {!r} in qube {!r}'.format(
backend_name, devid))
except KeyError:
parser.error_runtime('no domain {!r}'.format(backend_name))
except ValueError:
parser.error('expected a domain & device id combination like '
'foo:bar')
def get_parser(device_class=None):
'''Create :py:class:`argparse.ArgumentParser` suitable for
:program:`qvm-block`.
'''
parser = qubes.tools.QubesArgumentParser(description=__doc__, want_app=True)
parser.register('action', 'parsers', qubes.tools.AliasedSubParsersAction)
if device_class:
parser.add_argument('devclass', const=device_class,
action='store_const',
help=argparse.SUPPRESS)
else:
parser.add_argument('devclass', metavar='DEVICE_CLASS', action='store',
help="Device class to manage ('pci', 'usb', etc)")
sub_parsers = parser.add_subparsers(
title='commands',
description="For more information see qvm-device command -h",
dest='command')
init_list_parser(sub_parsers)
attach_parser = sub_parsers.add_parser(
'attach', help="Attach device to domain", aliases=('at', 'a'))
attach_parser.add_argument('VMNAME', action=qubes.tools.RunningVmNameAction)
attach_parser.add_argument(metavar='BACKEND:DEVICE_ID', dest='device',
action=qubes.tools.VolumeAction)
attach_parser.set_defaults(func=detach_device)
detach_parser = sub_parsers.add_parser(
"detach", help="Detach device from domain", aliases=('d', 'dt'))
detach_parser.add_argument('VMNAME', action=qubes.tools.RunningVmNameAction)
detach_parser.add_argument(metavar='BACKEND:DEVICE_ID', dest='device',
action=qubes.tools.VolumeAction)
detach_parser.set_defaults(func=detach_device)
return parser
def main(args=None):
'''Main routine of :program:`qvm-block`.'''
args = get_parser().parse_args(args)
try:
args.func(args)
except qubes.exc.QubesException as e:
print(e.message, file=sys.stderr)
return 1
return 0
if __name__ == '__main__':
sys.exit(main())

View File

@ -250,6 +250,7 @@ fi
%{python_sitelib}/qubes/tools/qvm_block.py* %{python_sitelib}/qubes/tools/qvm_block.py*
%{python_sitelib}/qubes/tools/qubes_lvm.py* %{python_sitelib}/qubes/tools/qubes_lvm.py*
%{python_sitelib}/qubes/tools/qvm_create.py* %{python_sitelib}/qubes/tools/qvm_create.py*
%{python_sitelib}/qubes/tools/qvm_device.py*
%{python_sitelib}/qubes/tools/qvm_features.py* %{python_sitelib}/qubes/tools/qvm_features.py*
%{python_sitelib}/qubes/tools/qvm_check.py* %{python_sitelib}/qubes/tools/qvm_check.py*
%{python_sitelib}/qubes/tools/qvm_clone.py* %{python_sitelib}/qubes/tools/qvm_clone.py*
@ -298,6 +299,7 @@ fi
%dir %{python_sitelib}/qubes/tests/tools %dir %{python_sitelib}/qubes/tests/tools
%{python_sitelib}/qubes/tests/tools/__init__.py* %{python_sitelib}/qubes/tests/tools/__init__.py*
%{python_sitelib}/qubes/tests/tools/init.py* %{python_sitelib}/qubes/tests/tools/init.py*
%{python_sitelib}/qubes/tests/tools/qvm_device.py*
%{python_sitelib}/qubes/tests/tools/qvm_ls.py* %{python_sitelib}/qubes/tests/tools/qvm_ls.py*
%dir %{python_sitelib}/qubes/tests/int %dir %{python_sitelib}/qubes/tests/int