Merge remote-tracking branch 'kalkin/device-assignments' into core3-devel

* kalkin/device-assignments: (21 commits)
  PCI extension cache PCIDevice objects
  Make pylint ♥
  Fix pylint warning no-else-return
  Fix pylint warning len-as-conditional
  device-list-attached event returns a dev/options tupples list
  DeviceAssignment options are now a dict
  Remove WrongAssignment exception
  Rename qubes.devices.BlockDevice to qubes.storage.BlockDevice
  Update relaxng devices option element
  Fix tests broken by the new assignment api
  Fix qubes.tests.devices
  Fix pci device integration tests
  qvm-device add support for assignments
  Update ext/pci to new api
  BaseVM add DeviceAssignment xml serialization
  Migrate DeviceCollection to new API
  Add PersistentCollection helper to qubes.devices
  Add DeviceAssignment
  qvm-device validates device parameters
  qvm-device fix handling of non block devices
  ...
This commit is contained in:
Marek Marczykowski-Górecki 2017-04-21 17:09:33 +02:00
commit 50b812190b
No known key found for this signature in database
GPG Key ID: 063938BA42CFA724
23 changed files with 509 additions and 269 deletions

View File

@ -24,6 +24,7 @@ import xml.parsers.expat
import lxml.etree import lxml.etree
import qubes import qubes
import qubes.devices
import qubes.vm.appvm import qubes.vm.appvm
import qubes.vm.standalonevm import qubes.vm.standalonevm
import qubes.vm.templatevm import qubes.vm.templatevm
@ -219,8 +220,10 @@ class Core2Qubes(qubes.Qubes):
pcidevs = ast.literal_eval(pcidevs) pcidevs = ast.literal_eval(pcidevs)
for pcidev in pcidevs: for pcidev in pcidevs:
try: try:
vm.devices["pci"].attach( dev = self.domains[0].devices['pci'][pcidev]
self.domains[0].devices['pci'][pcidev]) assignment = qubes.devices.DeviceAssignment(
backend_domain=dev.backend_domain, ident=dev.ident)
vm.devices["pci"].attach(assignment)
except qubes.exc.QubesException as e: except qubes.exc.QubesException as e:
self.log.error("VM {}: {}".format(vm.name, str(e))) self.log.error("VM {}: {}".format(vm.name, str(e)))
except (ValueError, LookupError) as err: except (ValueError, LookupError) as err:

View File

@ -47,7 +47,6 @@ Such extension should provide:
import qubes.utils import qubes.utils
class DeviceNotAttached(qubes.exc.QubesException, KeyError): class DeviceNotAttached(qubes.exc.QubesException, KeyError):
'''Trying to detach not attached device''' '''Trying to detach not attached device'''
pass pass
@ -57,6 +56,30 @@ class DeviceAlreadyAttached(qubes.exc.QubesException, KeyError):
pass pass
class DeviceAssignment(object): # pylint: disable=too-few-public-methods
''' Maps a device to a frontend_domain. '''
def __init__(self, backend_domain, ident, options=None, persistent=False,
frontend_domain=None):
self.backend_domain = backend_domain
self.ident = ident
self.options = options or {}
self.persistent = persistent
self.frontend_domain = frontend_domain
def __repr__(self):
return "[%s]:%s" % (self.backend_domain, self.ident)
def __hash__(self):
return hash((self.backend_domain, self.ident))
def __eq__(self, other):
if not isinstance(self, other.__class__):
raise NotImplementedError
return self.backend_domain == other.backend_domain \
and self.ident == other.ident
class DeviceCollection(object): class DeviceCollection(object):
'''Bag for devices. '''Bag for devices.
@ -115,87 +138,117 @@ class DeviceCollection(object):
def __init__(self, vm, class_): def __init__(self, vm, class_):
self._vm = vm self._vm = vm
self._class = class_ self._class = class_
self._set = set() self._set = PersistentCollection()
self.devclass = qubes.utils.get_entry_point_one( self.devclass = qubes.utils.get_entry_point_one(
'qubes.devices', self._class) 'qubes.devices', self._class)
def attach(self, device, persistent=True): def attach(self, device_assignment: DeviceAssignment):
'''Attach (add) device to domain. '''Attach (add) device to domain.
:param DeviceInfo device: device object :param DeviceInfo device: device object
''' '''
if device in self.attached(): if not device_assignment.frontend_domain:
device_assignment.frontend_domain = self._vm
else:
assert device_assignment.frontend_domain == self._vm, \
"Trying to attach DeviceAssignment belonging to other domain"
if not device_assignment.persistent and self._vm.is_halted():
raise qubes.exc.QubesVMNotRunningError(self._vm,
"Devices can only be attached non-persistent to a running vm")
device = self._device(device_assignment)
if device in self.assignments():
raise DeviceAlreadyAttached( raise DeviceAlreadyAttached(
'device {!r} of class {} already attached to {!r}'.format( 'device {!s} of class {} already attached to {!s}'.format(
device, self._class, self._vm)) device, self._class, self._vm))
self._vm.fire_event_pre('device-pre-attach:'+self._class, device=device) self._vm.fire_event_pre('device-pre-attach:'+self._class, device=device)
if persistent: if device_assignment.persistent:
self._set.add(device) self._set.add(device_assignment)
self._vm.fire_event('device-attach:' + self._class, device=device) self._vm.fire_event('device-attach:' + self._class, device=device)
def detach(self, device_assignment: DeviceAssignment):
def detach(self, device, persistent=True):
'''Detach (remove) device from domain. '''Detach (remove) device from domain.
:param DeviceInfo device: device object :param DeviceInfo device: device object
''' '''
if device not in self.attached(): if not device_assignment.frontend_domain:
device_assignment.frontend_domain = self._vm
if device_assignment in self._set and not self._vm.is_halted():
raise qubes.exc.QubesVMNotHaltedError(self._vm,
"Can not remove a persistent attachment from a non halted vm")
if device_assignment not in self.assignments():
raise DeviceNotAttached( raise DeviceNotAttached(
'device {!s} of class {} not attached to {!s}'.format( 'device {!s} of class {} not attached to {!s}'.format(
device, self._class, self._vm)) device_assignment.ident, self._class, self._vm))
device = self._device(device_assignment)
self._vm.fire_event_pre('device-pre-detach:'+self._class, device=device) self._vm.fire_event_pre('device-pre-detach:'+self._class, device=device)
if persistent: if device in self._set:
self._set.remove(device) device_assignment.persistent = True
self._set.discard(device_assignment)
self._vm.fire_event('device-detach:' + self._class, device=device) self._vm.fire_event('device-detach:' + self._class, device=device)
def attached(self, persistent=None): def attached(self):
'''List devices which are (or may be) attached to this vm '''List devices which are (or may be) attached to this vm '''
attached = self._vm.fire_event('device-list-attached:' + self._class)
if attached:
return [dev for dev, _ in attached]
return []
def persistent(self):
''' Devices persistently attached and safe to access before libvirt
bootstrap.
'''
return [self._device(a) for a in self._set]
def assignments(self, persistent=None):
'''List assignments for devices which are (or may be) attached to the
vm.
Devices may be attached persistently (so they are included in Devices may be attached persistently (so they are included in
:file:`qubes.xml`) or not. Device can also be in :file:`qubes.xml`, :file:`qubes.xml`) or not. Device can also be in :file:`qubes.xml`,
but be temporarily detached. but be temporarily detached.
:param bool persistent: only include devices which are (or are not) \ :param bool persistent: only include devices which are or are not
attached persistently - None means both attached persistently.
''' '''
seen = self._set.copy()
# ask for really attached devices only when requested not only devices = self._vm.fire_event('device-list-attached:' + self._class,
# persistent ones persistent=persistent)
if persistent is not True: result = []
attached = self._vm.fire_event( for dev, options in devices:
'device-list-attached:' + self._class, if dev in self._set and persistent is False:
persistent=persistent) continue
for device in attached: elif dev in self._set:
device_persistent = device in self._set result.append(self._set.get(dev))
if persistent is not None and device_persistent != persistent: elif dev not in self._set and persistent is True:
continue continue
assert device.frontend_domain == self._vm, \ else:
'{!r} != {!r}'.format(device.frontend_domain, self._vm) result.append(
DeviceAssignment(backend_domain=dev.backend_domain,
yield device ident=dev.ident, options=options,
frontend_domain=self._vm))
try: if persistent is not False and not result:
seen.remove(device) result.extend(self._set)
except KeyError: return result
pass
if persistent is False:
return
for device in seen:
# get fresh object - may contain updated information
device = device.backend_domain.devices[self._class][device.ident]
yield device
def available(self): def available(self):
'''List devices exposed by this vm''' '''List devices exposed by this vm'''
devices = self._vm.fire_event('device-list:' + self._class) devices = self._vm.fire_event('device-list:' + self._class)
return devices return devices
def _device(self, assignment: DeviceAssignment):
''' Helper method for geting a `qubes.devices.DeviceInfo` object from
`qubes.devices.DeviceAssignment`. '''
return assignment.backend_domain.devices[self._class][assignment.ident]
def __iter__(self): def __iter__(self):
return iter(self.available()) return iter(self.available())
@ -216,6 +269,7 @@ class DeviceCollection(object):
if dev: if dev:
assert len(dev) == 1 assert len(dev) == 1
return dev[0] return dev[0]
return UnknownDevice(self._vm, ident) return UnknownDevice(self._vm, ident)
@ -235,9 +289,10 @@ class DeviceManager(dict):
class DeviceInfo(object): class DeviceInfo(object):
''' Holds all information about a device '''
# pylint: disable=too-few-public-methods # pylint: disable=too-few-public-methods
def __init__(self, backend_domain, ident, description=None, def __init__(self, backend_domain, ident, description=None,
frontend_domain=None, **kwargs): frontend_domain=None, options=None, **kwargs):
#: domain providing this device #: domain providing this device
self.backend_domain = backend_domain self.backend_domain = backend_domain
#: device identifier (unique for given domain and device type) #: device identifier (unique for given domain and device type)
@ -253,6 +308,7 @@ class DeviceInfo(object):
self.frontend_domain = frontend_domain self.frontend_domain = frontend_domain
except AttributeError: except AttributeError:
pass pass
self.options = options or dict()
self.data = kwargs self.data = kwargs
if hasattr(self, 'regex'): if hasattr(self, 'regex'):
@ -289,15 +345,52 @@ class UnknownDevice(DeviceInfo):
frontend_domain, **kwargs) frontend_domain, **kwargs)
class BlockDevice(object): class PersistentCollection(object):
# pylint: disable=too-few-public-methods
def __init__(self, path, name, script=None, rw=True, domain=None, ''' Helper object managing persistent `DeviceAssignment`s.
devtype='disk'): '''
assert name, 'Missing device name'
assert path, 'Missing device path' def __init__(self):
self.path = path self._dict = {}
self.name = name
self.rw = rw def add(self, assignment: DeviceAssignment):
self.script = script ''' Add assignment to collection '''
self.domain = domain assert assignment.persistent and assignment.frontend_domain
self.devtype = devtype vm = assignment.backend_domain
ident = assignment.ident
key = (vm, ident)
assert key not in self._dict
self._dict[key] = assignment
def discard(self, assignment):
''' Discard assignment from collection '''
assert assignment.persistent and assignment.frontend_domain
vm = assignment.backend_domain
ident = assignment.ident
key = (vm, ident)
if key not in self._dict:
raise KeyError
del self._dict[key]
def __contains__(self, device) -> bool:
vm = device.backend_domain
ident = device.ident
key = (vm, ident)
return key in self._dict
def get(self, device: DeviceInfo) -> DeviceAssignment:
''' Returns the corresponding `qubes.devices.DeviceAssignment` for the
device. '''
vm = device.backend_domain
ident = device.ident
key = (vm, ident)
if key not in self._dict:
raise KeyError
return self._dict[key]
def __iter__(self):
return self._dict.values().__iter__()
def __len__(self) -> int:
return len(self._dict.keys())

View File

@ -19,6 +19,9 @@
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
# #
''' Qubes PCI Extensions '''
import functools
import os import os
import re import re
import subprocess import subprocess
@ -34,7 +37,7 @@ pci_classes = None
def load_pci_classes(): def load_pci_classes():
# List of known device classes, subclasses and programming interfaces ''' List of known device classes, subclasses and programming interfaces. '''
# Syntax: # Syntax:
# C class class_name # C class class_name
# subclass subclass_name <-- single tab # subclass subclass_name <-- single tab
@ -122,7 +125,6 @@ def _device_desc(hostdev_xml):
) )
class PCIDevice(qubes.devices.DeviceInfo): class PCIDevice(qubes.devices.DeviceInfo):
# pylint: disable=too-few-public-methods # pylint: disable=too-few-public-methods
regex = re.compile( regex = re.compile(
@ -191,7 +193,7 @@ class PCIDeviceExtension(qubes.ext.Extension):
def on_device_get_pci(self, vm, event, ident): def on_device_get_pci(self, vm, event, ident):
# pylint: disable=unused-argument,no-self-use # pylint: disable=unused-argument,no-self-use
if not vm.app.vmm.offline_mode: if not vm.app.vmm.offline_mode:
yield PCIDevice(vm, ident) yield _cache_get(vm, ident)
@qubes.ext.handler('device-list-attached:pci') @qubes.ext.handler('device-list-attached:pci')
def on_device_list_attached(self, vm, event, **kwargs): def on_device_list_attached(self, vm, event, **kwargs):
@ -213,7 +215,7 @@ class PCIDeviceExtension(qubes.ext.Extension):
device=device, device=device,
function=function, function=function,
) )
yield PCIDevice(vm.app.domains[0], ident) yield (PCIDevice(vm.app.domains[0], ident), {})
@qubes.ext.handler('device-pre-attach:pci') @qubes.ext.handler('device-pre-attach:pci')
def on_device_pre_attached_pci(self, vm, event, device): def on_device_pre_attached_pci(self, vm, event, device):
@ -227,6 +229,7 @@ class PCIDeviceExtension(qubes.ext.Extension):
return return
try: try:
device = _cache_get(vm, device.ident)
self.bind_pci_to_pciback(vm.app, device) self.bind_pci_to_pciback(vm.app, device)
vm.libvirt_domain.attachDevice( vm.libvirt_domain.attachDevice(
vm.app.env.get_template('libvirt/devices/pci.xml').render( vm.app.env.get_template('libvirt/devices/pci.xml').render(
@ -246,9 +249,10 @@ class PCIDeviceExtension(qubes.ext.Extension):
# provision in libvirt for extracting device-side BDF; we need it for # provision in libvirt for extracting device-side BDF; we need it for
# qubes.DetachPciDevice, which unbinds driver, not to oops the kernel # qubes.DetachPciDevice, which unbinds driver, not to oops the kernel
device = _cache_get(vm, device.ident)
p = subprocess.Popen(['xl', 'pci-list', str(vm.xid)], p = subprocess.Popen(['xl', 'pci-list', str(vm.xid)],
stdout=subprocess.PIPE) stdout=subprocess.PIPE)
result = p.communicate()[0] result = p.communicate()[0].decode()
m = re.search(r'^(\d+.\d+)\s+0000:{}$'.format(device.ident), result, m = re.search(r'^(\d+.\d+)\s+0000:{}$'.format(device.ident), result,
flags=re.MULTILINE) flags=re.MULTILINE)
if not m: if not m:
@ -270,8 +274,9 @@ class PCIDeviceExtension(qubes.ext.Extension):
@qubes.ext.handler('domain-pre-start') @qubes.ext.handler('domain-pre-start')
def on_domain_pre_start(self, vm, _event, **_kwargs): def on_domain_pre_start(self, vm, _event, **_kwargs):
# Bind pci devices to pciback driver # Bind pci devices to pciback driver
for pci in vm.devices['pci'].attached(): for assignment in vm.devices['pci'].persistent():
self.bind_pci_to_pciback(vm.app, pci) device = _cache_get(vm, assignment.ident)
self.bind_pci_to_pciback(vm.app, device)
@staticmethod @staticmethod
def bind_pci_to_pciback(app, device): def bind_pci_to_pciback(app, device):
@ -300,3 +305,8 @@ class PCIDeviceExtension(qubes.ext.Extension):
pass pass
else: else:
raise raise
@functools.lru_cache(maxsize=None)
def _cache_get(vm, ident):
''' Caching wrapper around `PCIDevice(vm, ident)`. '''
return PCIDevice(vm, ident)

View File

@ -55,6 +55,7 @@ class Element(object):
if wrap: if wrap:
return ''.join(self.schema.wrapper.fill(p) + '\n\n' return ''.join(self.schema.wrapper.fill(p) + '\n\n'
for p in textwrap.dedent(xml.text.strip('\n')).split('\n\n')) for p in textwrap.dedent(xml.text.strip('\n')).split('\n\n'))
return ' '.join(xml.text.strip().split()) return ' '.join(xml.text.strip().split())
@ -211,6 +212,7 @@ Quick example, worth thousands lines of specification:
if __name__ == '__main__': if __name__ == '__main__':
main(*sys.argv[1:]) # pylint: disable=no-value-for-parameter # pylint: disable=no-value-for-parameter
main(*sys.argv[1:])
# vim: ts=4 sw=4 et # vim: ts=4 sw=4 et

View File

@ -34,7 +34,6 @@ from datetime import datetime
import lxml.etree import lxml.etree
import pkg_resources import pkg_resources
import qubes import qubes
import qubes.devices
import qubes.exc import qubes.exc
import qubes.utils import qubes.utils
@ -46,6 +45,21 @@ class StoragePoolException(qubes.exc.QubesException):
pass pass
class BlockDevice(object):
''' Represents a storage block device. '''
# pylint: disable=too-few-public-methods
def __init__(self, path, name, script=None, rw=True, domain=None,
devtype='disk'):
assert name, 'Missing device name'
assert path, 'Missing device path'
self.path = path
self.name = name
self.rw = rw
self.script = script
self.domain = domain
self.devtype = devtype
class Volume(object): class Volume(object):
''' Encapsulates all data about a volume for serialization to qubes.xml and ''' Encapsulates all data about a volume for serialization to qubes.xml and
libvirt config. libvirt config.
@ -119,10 +133,10 @@ class Volume(object):
return lxml.etree.Element('volume', **config) return lxml.etree.Element('volume', **config)
def block_device(self): def block_device(self):
''' Return :py:class:`qubes.devices.BlockDevice` for serialization in ''' Return :py:class:`BlockDevice` for serialization in
the libvirt XML template as <disk>. the libvirt XML template as <disk>.
''' '''
return qubes.devices.BlockDevice(self.path, self.name, self.script, return BlockDevice(self.path, self.name, self.script,
self.rw, self.domain, self.devtype) self.rw, self.domain, self.devtype)
@property @property
@ -446,6 +460,7 @@ class Storage(object):
"You need to pass a Volume or pool name as str" "You need to pass a Volume or pool name as str"
if isinstance(volume, Volume): if isinstance(volume, Volume):
return self.pools[volume.name] return self.pools[volume.name]
return self.vm.app.pools[volume] return self.vm.app.pools[volume]
def commit(self): def commit(self):
@ -475,6 +490,7 @@ class Storage(object):
"You need to pass a Volume or pool name as str" "You need to pass a Volume or pool name as str"
if isinstance(volume, Volume): if isinstance(volume, Volume):
return self.pools[volume.name].export(volume) return self.pools[volume.name].export(volume)
return self.pools[volume].export(self.vm.volumes[volume]) return self.pools[volume].export(self.vm.volumes[volume])

View File

@ -30,7 +30,6 @@ import os.path
import re import re
import subprocess import subprocess
import qubes.devices
import qubes.storage import qubes.storage
BLKSIZE = 512 BLKSIZE = 512
@ -358,7 +357,7 @@ class FileVolume(qubes.storage.Volume):
return 'block-snapshot' return 'block-snapshot'
def block_device(self): def block_device(self):
''' Return :py:class:`qubes.devices.BlockDevice` for serialization in ''' Return :py:class:`qubes.storage.BlockDevice` for serialization in
the libvirt XML template as <disk>. the libvirt XML template as <disk>.
''' '''
path = self.path path = self.path
@ -366,7 +365,7 @@ class FileVolume(qubes.storage.Volume):
path += ":" + self.path_source_cow path += ":" + self.path_source_cow
if self._is_origin or self._is_snapshot: if self._is_origin or self._is_snapshot:
path += ":" + self.path_cow path += ":" + self.path_cow
return qubes.devices.BlockDevice(path, self.name, self.script, self.rw, return qubes.storage.BlockDevice(path, self.name, self.script, self.rw,
self.domain, self.devtype) self.domain, self.devtype)
@property @property
@ -378,6 +377,7 @@ class FileVolume(qubes.storage.Volume):
if not os.path.exists(old_revision): if not os.path.exists(old_revision):
return {} return {}
seconds = os.path.getctime(old_revision) seconds = os.path.getctime(old_revision)
iso_date = qubes.storage.isodate(seconds).split('.', 1)[0] iso_date = qubes.storage.isodate(seconds).split('.', 1)[0]
return {iso_date: old_revision} return {iso_date: old_revision}

View File

@ -394,13 +394,14 @@ class ThinVolume(qubes.storage.Volume):
"You shouldn't use lvm size setter") "You shouldn't use lvm size setter")
def block_device(self): def block_device(self):
''' Return :py:class:`qubes.devices.BlockDevice` for serialization in ''' Return :py:class:`qubes.storage.BlockDevice` for serialization in
the libvirt XML template as <disk>. the libvirt XML template as <disk>.
''' '''
if self.snap_on_start: if self.snap_on_start:
return qubes.devices.BlockDevice( return qubes.storage.BlockDevice(
'/dev/' + self._vid_snap, self.name, self.script, '/dev/' + self._vid_snap, self.name, self.script,
self.rw, self.domain, self.devtype) self.rw, self.domain, self.devtype)
return super(ThinVolume, self).block_device() return super(ThinVolume, self).block_device()
@property @property

View File

@ -26,6 +26,7 @@ import qubes.devices
import qubes.tests import qubes.tests
class TestDevice(qubes.devices.DeviceInfo): class TestDevice(qubes.devices.DeviceInfo):
# pylint: disable=too-few-public-methods
pass pass
@ -35,6 +36,7 @@ class TestVMCollection(dict):
class TestApp(object): class TestApp(object):
# pylint: disable=too-few-public-methods
def __init__(self): def __init__(self):
self.domains = TestVMCollection() self.domains = TestVMCollection()
@ -51,20 +53,25 @@ class TestVM(qubes.tests.TestEmitter):
} }
self.app.domains[name] = self self.app.domains[name] = self
self.app.domains[self] = self self.app.domains[self] = self
self.running = False
def __str__(self): def __str__(self):
return self.name return self.name
@qubes.events.handler('device-list-attached:testclass') @qubes.events.handler('device-list-attached:testclass')
def dev_testclass_list_attached(self, event, persistent): def dev_testclass_list_attached(self, event, persistent = False):
for vm in self.app.domains: for vm in self.app.domains:
if vm.device.frontend_domain == self: if vm.device.frontend_domain == self:
yield vm.device yield (vm.device, {})
@qubes.events.handler('device-list:testclass') @qubes.events.handler('device-list:testclass')
def dev_testclass_list(self, event): def dev_testclass_list(self, event):
yield self.device yield self.device
def is_halted(self):
return not self.running
class TC_00_DeviceCollection(qubes.tests.QubesTestCase): class TC_00_DeviceCollection(qubes.tests.QubesTestCase):
def setUp(self): def setUp(self):
@ -73,20 +80,25 @@ class TC_00_DeviceCollection(qubes.tests.QubesTestCase):
self.app.domains['vm'] = self.emitter self.app.domains['vm'] = self.emitter
self.device = self.emitter.device self.device = self.emitter.device
self.collection = self.emitter.devices['testclass'] self.collection = self.emitter.devices['testclass']
self.assignment = qubes.devices.DeviceAssignment(
backend_domain = self.device.backend_domain,
ident = self.device.ident,
persistent=True
)
def test_000_init(self): def test_000_init(self):
self.assertFalse(self.collection._set) self.assertFalse(self.collection._set)
def test_001_attach(self): def test_001_attach(self):
self.collection.attach(self.device) self.collection.attach(self.assignment)
self.assertEventFired(self.emitter, 'device-pre-attach:testclass') self.assertEventFired(self.emitter, 'device-pre-attach:testclass')
self.assertEventFired(self.emitter, 'device-attach:testclass') self.assertEventFired(self.emitter, 'device-attach:testclass')
self.assertEventNotFired(self.emitter, 'device-pre-detach:testclass') self.assertEventNotFired(self.emitter, 'device-pre-detach:testclass')
self.assertEventNotFired(self.emitter, 'device-detach:testclass') self.assertEventNotFired(self.emitter, 'device-detach:testclass')
def test_002_detach(self): def test_002_detach(self):
self.collection.attach(self.device) self.collection.attach(self.assignment)
self.collection.detach(self.device) self.collection.detach(self.assignment)
self.assertEventFired(self.emitter, 'device-pre-attach:testclass') self.assertEventFired(self.emitter, 'device-pre-attach:testclass')
self.assertEventFired(self.emitter, 'device-attach:testclass') self.assertEventFired(self.emitter, 'device-attach:testclass')
self.assertEventFired(self.emitter, 'device-pre-detach:testclass') self.assertEventFired(self.emitter, 'device-pre-detach:testclass')
@ -94,41 +106,43 @@ class TC_00_DeviceCollection(qubes.tests.QubesTestCase):
def test_010_empty_detach(self): def test_010_empty_detach(self):
with self.assertRaises(LookupError): with self.assertRaises(LookupError):
self.collection.detach(self.device) self.collection.detach(self.assignment)
def test_011_double_attach(self): def test_011_double_attach(self):
self.collection.attach(self.device) self.collection.attach(self.assignment)
with self.assertRaises(LookupError): with self.assertRaises(LookupError):
self.collection.attach(self.device) self.collection.attach(self.assignment)
def test_012_double_detach(self): def test_012_double_detach(self):
self.collection.attach(self.device) self.collection.attach(self.assignment)
self.collection.detach(self.device) self.collection.detach(self.assignment)
with self.assertRaises(LookupError): with self.assertRaises(qubes.devices.DeviceNotAttached):
self.collection.detach(self.device) self.collection.detach(self.assignment)
def test_013_list_attached_persistent(self): def test_013_list_attached_persistent(self):
self.assertEqual(set([]), set(self.collection.attached())) self.assertEqual(set([]), set(self.collection.persistent()))
self.collection.attach(self.assignment)
self.assertEventFired(self.emitter, 'device-list-attached:testclass') self.assertEventFired(self.emitter, 'device-list-attached:testclass')
self.collection.attach(self.device) self.assertEqual({self.device}, set(self.collection.persistent()))
self.assertEqual({self.device}, set(self.collection.attached()))
self.assertEqual({self.device}, self.assertEqual({self.device},
set(self.collection.attached(persistent=True))) set(self.collection.persistent()))
self.assertEqual(set([]), self.assertEqual(set([]),
set(self.collection.attached(persistent=False))) set(self.collection.attached()))
def test_014_list_attached_non_persistent(self): def test_014_list_attached_non_persistent(self):
self.collection.attach(self.device, persistent=False) self.assignment.persistent = False
self.emitter.running = True
self.collection.attach(self.assignment)
# device-attach event not implemented, so manipulate object manually # device-attach event not implemented, so manipulate object manually
self.device.frontend_domain = self.emitter self.device.frontend_domain = self.emitter
self.assertEqual({self.device}, self.assertEqual({self.device},
set(self.collection.attached())) set(self.collection.attached()))
self.assertEqual(set([]), self.assertEqual(set([]),
set(self.collection.attached(persistent=True))) set(self.collection.persistent()))
self.assertEqual({self.device}, self.assertEqual({self.device},
set(self.collection.attached(persistent=False))) set(self.collection.attached()))
self.assertEventFired(self.emitter, 'device-list-attached:testclass') self.assertEventFired(self.emitter, 'device-list-attached:testclass')
def test_015_list_available(self): def test_015_list_available(self):
@ -147,6 +161,7 @@ class TC_01_DeviceManager(qubes.tests.QubesTestCase):
def test_001_missing(self): def test_001_missing(self):
device = TestDevice(self.emitter.app.domains['vm'], 'testdev') device = TestDevice(self.emitter.app.domains['vm'], 'testdev')
self.manager['testclass'].attach(device) assignment = qubes.devices.DeviceAssignment(backend_domain=device.backend_domain, ident=device.ident, persistent=True)
self.manager['testclass'].attach(assignment)
self.assertEventFired(self.emitter, 'device-attach:testclass') self.assertEventFired(self.emitter, 'device-attach:testclass')

View File

@ -41,6 +41,7 @@ class TC_00_Devices_PCI(qubes.tests.SystemTestsMixin,
self.skipTest('Specify PCI device with QUBES_TEST_PCIDEV ' self.skipTest('Specify PCI device with QUBES_TEST_PCIDEV '
'environment variable') 'environment variable')
self.dev = self.app.domains[0].devices['pci'][pcidev] self.dev = self.app.domains[0].devices['pci'][pcidev]
self.assignment = qubes.devices.DeviceAssignment(backend_domain=self.dev.backend_domain, ident=self.dev.ident, persistent=True)
if isinstance(self.dev, qubes.devices.UnknownDevice): if isinstance(self.dev, qubes.devices.UnknownDevice):
self.skipTest('Specified device {} does not exists'.format(pcidev)) self.skipTest('Specified device {} does not exists'.format(pcidev))
self.init_default_template() self.init_default_template()
@ -64,110 +65,111 @@ class TC_00_Devices_PCI(qubes.tests.SystemTestsMixin,
self.assertEqual(dev.backend_domain, self.app.domains[0]) self.assertEqual(dev.backend_domain, self.app.domains[0])
self.assertIn(dev.ident, actual_devices) self.assertIn(dev.ident, actual_devices)
self.assertEqual(dev.description, actual_devices[dev.ident]) self.assertEqual(dev.description, actual_devices[dev.ident])
self.assertIsInstance(dev.frontend_domain,
(qubes.vm.BaseVM, None.__class__))
actual_devices.pop(dev.ident) actual_devices.pop(dev.ident)
if actual_devices: if actual_devices:
self.fail('Not all devices listed, missing: {}'.format( self.fail('Not all devices listed, missing: {}'.format(
actual_devices)) actual_devices))
def test_010_attach_offline(self): def assertDeviceNotInCollection(self, dev, dev_col):
self.assertIsNone(self.dev.frontend_domain) self.assertNotIn(dev, dev_col.attached())
self.assertNotIn(self.dev, self.vm.devices['pci'].attached()) self.assertNotIn(dev, dev_col.persistent())
self.assertNotIn(self.dev, self.vm.devices['pci'].attached( self.assertNotIn(dev, dev_col.assignments())
persistent=True)) self.assertNotIn(dev, dev_col.assignments(persistent=True))
self.assertNotIn(self.dev, self.vm.devices['pci'].attached(
persistent=False))
self.vm.devices['pci'].attach(self.dev) def test_010_attach_offline_persistent(self):
dev_col = self.vm.devices['pci']
self.assertDeviceNotInCollection(self.dev, dev_col)
dev_col.attach(self.assignment)
self.app.save() self.app.save()
self.assertNotIn(self.dev, dev_col.attached())
self.assertIn(self.dev, dev_col.persistent())
self.assertIn(self.dev, dev_col.assignments())
self.assertIn(self.dev, dev_col.assignments(persistent=True))
self.assertNotIn(self.dev, dev_col.assignments(persistent=False))
# still should be None, as domain is not started yet
self.assertIsNone(self.dev.frontend_domain)
self.assertIn(self.dev, self.vm.devices['pci'].attached())
self.assertIn(self.dev, self.vm.devices['pci'].attached(
persistent=True))
self.assertNotIn(self.dev, self.vm.devices['pci'].attached(
persistent=False))
self.vm.start() self.vm.start()
self.assertEqual(self.dev.frontend_domain, self.vm) self.assertIn(self.dev, dev_col.attached())
self.assertIn(self.dev, self.vm.devices['pci'].attached())
self.assertIn(self.dev, self.vm.devices['pci'].attached(
persistent=True))
self.assertNotIn(self.dev, self.vm.devices['pci'].attached(
persistent=False))
p = self.vm.run('lspci', passio_popen=True) p = self.vm.run('lspci', passio_popen=True)
(stdout, _) = p.communicate() (stdout, _) = p.communicate()
self.assertIn(self.dev.description, stdout) self.assertIn(self.dev.description, stdout.decode())
def test_011_attach_online(self):
def test_011_attach_offline_temp_fail(self):
dev_col = self.vm.devices['pci']
self.assertDeviceNotInCollection(self.dev, dev_col)
self.assignment.persistent = False
with self.assertRaises(qubes.exc.QubesVMNotRunningError):
dev_col.attach(self.assignment)
def test_020_attach_online_persistent(self):
self.vm.start() self.vm.start()
self.vm.devices['pci'].attach(self.dev) dev_col = self.vm.devices['pci']
self.assertDeviceNotInCollection(self.dev, dev_col)
dev_col.attach(self.assignment)
self.assertEqual(self.dev.frontend_domain, self.vm) self.assertIn(self.dev, dev_col.attached())
self.assertIn(self.dev, self.vm.devices['pci'].attached()) self.assertIn(self.dev, dev_col.persistent())
self.assertIn(self.dev, self.vm.devices['pci'].attached( self.assertIn(self.dev, dev_col.assignments())
persistent=True)) self.assertIn(self.dev, dev_col.assignments(persistent=True))
self.assertNotIn(self.dev, self.vm.devices['pci'].attached( self.assertNotIn(self.dev, dev_col.assignments(persistent=False))
persistent=False))
# give VM kernel some time to discover new device # give VM kernel some time to discover new device
time.sleep(1) time.sleep(1)
p = self.vm.run('lspci', passio_popen=True) p = self.vm.run('lspci', passio_popen=True)
(stdout, _) = p.communicate() (stdout, _) = p.communicate()
self.assertIn(self.dev.description, stdout) self.assertIn(self.dev.description, stdout.decode())
def test_012_attach_online_temp(self):
def test_021_persist_detach_online_fail(self):
dev_col = self.vm.devices['pci']
self.assertDeviceNotInCollection(self.dev, dev_col)
dev_col.attach(self.assignment)
self.app.save()
self.vm.start() self.vm.start()
self.vm.devices['pci'].attach(self.dev, persistent=False) with self.assertRaises(qubes.exc.QubesVMNotHaltedError):
self.vm.devices['pci'].detach(self.assignment)
def test_030_persist_attach_detach_offline(self):
dev_col = self.vm.devices['pci']
self.assertDeviceNotInCollection(self.dev, dev_col)
dev_col.attach(self.assignment)
self.app.save()
self.assertNotIn(self.dev, dev_col.attached())
self.assertIn(self.dev, dev_col.persistent())
self.assertIn(self.dev, dev_col.assignments())
self.assertIn(self.dev, dev_col.assignments(persistent=True))
self.assertNotIn(self.dev, dev_col.assignments(persistent=False))
dev_col.detach(self.assignment)
self.assertDeviceNotInCollection(self.dev, dev_col)
def test_031_attach_detach_online_temp(self):
dev_col = self.vm.devices['pci']
self.vm.start()
self.assignment.persistent = False
self.assertDeviceNotInCollection(self.dev, dev_col)
dev_col.attach(self.assignment)
self.assertIn(self.dev, dev_col.attached())
self.assertNotIn(self.dev, dev_col.persistent())
self.assertIn(self.dev, dev_col.assignments())
self.assertIn(self.dev, dev_col.assignments(persistent=False))
self.assertNotIn(self.dev, dev_col.assignments(persistent=True))
self.assertIn(self.dev, dev_col.assignments(persistent=False))
self.assertEqual(self.dev.frontend_domain, self.vm)
self.assertIn(self.dev, self.vm.devices['pci'].attached())
self.assertNotIn(self.dev, self.vm.devices['pci'].attached(
persistent=True))
self.assertIn(self.dev, self.vm.devices['pci'].attached(
persistent=False))
# give VM kernel some time to discover new device # give VM kernel some time to discover new device
time.sleep(1) time.sleep(1)
p = self.vm.run('lspci', passio_popen=True) p = self.vm.run('lspci', passio_popen=True)
(stdout, _) = p.communicate() (stdout, _) = p.communicate()
self.assertIn(self.dev.description, stdout)
def test_020_detach_online(self): self.assertIn(self.dev.description, stdout.decode())
self.vm.devices['pci'].attach(self.dev) dev_col.detach(self.assignment)
self.app.save() self.assertDeviceNotInCollection(self.dev, dev_col)
self.vm.start()
self.assertIn(self.dev, self.vm.devices['pci'].attached())
self.assertIn(self.dev, self.vm.devices['pci'].attached(
persistent=True))
self.assertNotIn(self.dev, self.vm.devices['pci'].attached(
persistent=False))
self.assertEqual(self.dev.frontend_domain, self.vm)
self.vm.devices['pci'].detach(self.dev)
self.assertIsNone(self.dev.frontend_domain)
self.assertNotIn(self.dev, self.vm.devices['pci'].attached())
self.assertNotIn(self.dev, self.vm.devices['pci'].attached(
persistent=True))
self.assertNotIn(self.dev, self.vm.devices['pci'].attached(
persistent=False))
p = self.vm.run('lspci', passio_popen=True) p = self.vm.run('lspci', passio_popen=True)
(stdout, _) = p.communicate() (stdout, _) = p.communicate()
self.assertNotIn(self.dev.description, stdout) self.assertNotIn(self.dev.description, stdout.decode())
# can't do this right now because of kernel bug - it cause the whole
# PCI bus being deregistered, which emit some warning in sysfs
# handling code (removing non-existing "0000:00" group)
#
# p = self.vm.run('dmesg', passio_popen=True)
# (stdout, _) = p.communicate()
# # check for potential oops
# self.assertNotIn('end trace', stdout)

View File

@ -1,4 +1,4 @@
# pylint: disable=protected-access,pointless-statement # pylint: disable=protected-access
# #
# The Qubes OS Project, https://www.qubes-os.org/ # The Qubes OS Project, https://www.qubes-os.org/
@ -21,6 +21,8 @@
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
# #
''' Tests for the `qvm-device` tool. '''
import qubes import qubes
import qubes.devices import qubes.devices
import qubes.tools.qvm_device import qubes.tools.qvm_device
@ -30,82 +32,86 @@ import qubes.tests.devices
import qubes.tests.tools import qubes.tests.tools
class TestNamespace(object): class TestNamespace(object):
''' A mock object for `argparse.Namespace`.
''' # pylint: disable=too-few-public-methods
def __init__(self, app, domains=None, device=None): def __init__(self, app, domains=None, device=None):
super(TestNamespace, self).__init__() super(TestNamespace, self).__init__()
self.app = app self.app = app
self.devclass = 'testclass' self.devclass = 'testclass'
self.persistent = True
if domains: if domains:
self.domains = domains self.domains = domains
if device: if device:
self.device = device self.device = device
self.device_assignment = qubes.devices.DeviceAssignment(
backend_domain=self.device.backend_domain,
ident=self.device.ident, persistent=self.persistent)
class TC_00_Actions(qubes.tests.QubesTestCase): class TC_00_Actions(qubes.tests.QubesTestCase):
''' Tests the output logic of the qvm-device tool '''
def setUp(self): def setUp(self):
super(TC_00_Actions, self).setUp() super(TC_00_Actions, self).setUp()
self.app = qubes.tests.devices.TestApp() self.app = qubes.tests.devices.TestApp()
def save():
''' A mock method for simulating a successful save '''
return True
self.app.save = save
self.vm1 = qubes.tests.devices.TestVM(self.app, 'vm1') self.vm1 = qubes.tests.devices.TestVM(self.app, 'vm1')
self.vm2 = qubes.tests.devices.TestVM(self.app, 'vm2') self.vm2 = qubes.tests.devices.TestVM(self.app, 'vm2')
self.device = self.vm2.device self.device = self.vm2.device
def test_000_list_all(self): def test_000_list_all(self):
''' List all exposed vm devices. No devices are attached to other
domains.
'''
args = TestNamespace(self.app) args = TestNamespace(self.app)
with qubes.tests.tools.StdoutBuffer() as buf: with qubes.tests.tools.StdoutBuffer() as buf:
qubes.tools.qvm_device.list_devices(args) qubes.tools.qvm_device.list_devices(args)
self.assertEventFired(self.vm1,
'device-list:testclass')
self.assertEventFired(self.vm2,
'device-list:testclass')
self.assertEventNotFired(self.vm1,
'device-list-attached:testclass')
self.assertEventNotFired(self.vm2,
'device-list-attached:testclass')
self.assertEqual( self.assertEqual(
[x.rstrip() for x in buf.getvalue().splitlines()], [x.rstrip() for x in buf.getvalue().splitlines()],
['vm1:testdev Description', ['vm1:testdev Description',
'vm2:testdev Description'] 'vm2:testdev Description']
) )
def test_001_list_one(self): def test_001_list_persistent_attach(self):
''' Attach the device exposed by the `vm2` to the `vm1` persistently.
'''
args = TestNamespace(self.app, [self.vm1]) args = TestNamespace(self.app, [self.vm1])
# simulate attach # simulate attach
assignment = qubes.devices.DeviceAssignment(backend_domain=self.vm2,
ident=self.device.ident, persistent=True, frontend_domain=self.vm1)
self.vm2.device.frontend_domain = self.vm1 self.vm2.device.frontend_domain = self.vm1
self.vm1.devices['testclass']._set.add(self.device) self.vm1.devices['testclass']._set.add(assignment)
with qubes.tests.tools.StdoutBuffer() as buf: with qubes.tests.tools.StdoutBuffer() as buf:
qubes.tools.qvm_device.list_devices(args) qubes.tools.qvm_device.list_devices(args)
self.assertEventFired(self.vm1,
'device-list-attached:testclass')
self.assertEventNotFired(self.vm1,
'device-list:testclass')
self.assertEventNotFired(self.vm2,
'device-list:testclass')
self.assertEventNotFired(self.vm2,
'device-list-attached:testclass')
self.assertEqual( self.assertEqual(
buf.getvalue(), buf.getvalue(),
'vm2:testdev Description vm1\n' 'vm1:testdev Description\n'
'vm2:testdev Description vm1 vm1\n'
) )
def test_002_list_one_non_persistent(self): def test_002_list_list_temp_attach(self):
''' Attach the device exposed by the `vm2` to the `vm1`
non-persistently.
'''
args = TestNamespace(self.app, [self.vm1]) args = TestNamespace(self.app, [self.vm1])
# simulate attach # simulate attach
assignment = qubes.devices.DeviceAssignment(backend_domain=self.vm2,
ident=self.device.ident, persistent=True, frontend_domain=self.vm1)
self.vm2.device.frontend_domain = self.vm1 self.vm2.device.frontend_domain = self.vm1
self.vm1.devices['testclass']._set.add(assignment)
with qubes.tests.tools.StdoutBuffer() as buf: with qubes.tests.tools.StdoutBuffer() as buf:
qubes.tools.qvm_device.list_devices(args) qubes.tools.qvm_device.list_devices(args)
self.assertEventFired(self.vm1, self.assertEqual(buf.getvalue(),
'device-list-attached:testclass') 'vm1:testdev Description\n'
self.assertEventNotFired(self.vm1, 'vm2:testdev Description vm1 vm1\n')
'device-list:testclass')
self.assertEventNotFired(self.vm2,
'device-list:testclass')
self.assertEventNotFired(self.vm2,
'device-list-attached:testclass')
self.assertEqual(
buf.getvalue(),
'vm2:testdev Description vm1\n'
)
def test_010_attach(self): def test_010_attach(self):
''' Test attach action '''
args = TestNamespace( args = TestNamespace(
self.app, self.app,
[self.vm1], [self.vm1],
@ -118,6 +124,7 @@ class TC_00_Actions(qubes.tests.QubesTestCase):
'device-attach:testclass', kwargs={'device': self.device}) 'device-attach:testclass', kwargs={'device': self.device})
def test_011_double_attach(self): def test_011_double_attach(self):
''' Double attach should not be possible '''
args = TestNamespace( args = TestNamespace(
self.app, self.app,
[self.vm1], [self.vm1],
@ -128,6 +135,7 @@ class TC_00_Actions(qubes.tests.QubesTestCase):
qubes.tools.qvm_device.attach_device(args) qubes.tools.qvm_device.attach_device(args)
def test_020_detach(self): def test_020_detach(self):
''' Test detach action '''
args = TestNamespace( args = TestNamespace(
self.app, self.app,
[self.vm1], [self.vm1],
@ -135,10 +143,12 @@ class TC_00_Actions(qubes.tests.QubesTestCase):
) )
# simulate attach # simulate attach
self.vm2.device.frontend_domain = self.vm1 self.vm2.device.frontend_domain = self.vm1
self.vm1.devices['testclass']._set.add(self.device) args.device_assignment.frontend_domain = self.vm1
self.vm1.devices['testclass']._set.add(args.device_assignment)
qubes.tools.qvm_device.detach_device(args) qubes.tools.qvm_device.detach_device(args)
def test_021_detach_not_attached(self): def test_021_detach_not_attached(self):
''' Invalid detach action should not be possible '''
args = TestNamespace( args = TestNamespace(
self.app, self.app,
[self.vm1], [self.vm1],

View File

@ -108,7 +108,7 @@ class TC_10_BaseVM(qubes.tests.QubesTestCase):
}) })
self.assertCountEqual(vm.devices.keys(), ('pci',)) self.assertCountEqual(vm.devices.keys(), ('pci',))
self.assertCountEqual(list(vm.devices['pci'].attached(persistent=True)), self.assertCountEqual(list(vm.devices['pci'].persistent()),
[qubes.ext.pci.PCIDevice(vm, '00:11.22')]) [qubes.ext.pci.PCIDevice(vm, '00:11.22')])
self.assertXMLIsValid(vm.__xml__(), 'domain.rng') self.assertXMLIsValid(vm.__xml__(), 'domain.rng')

View File

@ -257,7 +257,7 @@ class VolumeAction(QubesAction):
try: try:
pool = app.pools[pool_name] pool = app.pools[pool_name]
volume = [v for v in pool.volumes if v.vid == vid] volume = [v for v in pool.volumes if v.vid == vid]
assert volume > 1, 'Duplicate vids in pool %s' % pool_name assert len(volume) == 1, 'Duplicate vids in pool %s' % pool_name
if not volume: if not volume:
parser.error_runtime( parser.error_runtime(
'no volume with id {!r} pool: {!r}'.format(vid, 'no volume with id {!r} pool: {!r}'.format(vid,
@ -352,9 +352,8 @@ class QubesArgumentParser(argparse.ArgumentParser):
self.set_defaults(verbose=1, quiet=0) self.set_defaults(verbose=1, quiet=0)
def parse_args(self, *args, **kwargs): def parse_args(self, args=None, namespace=None):
# pylint: disable=arguments-differ namespace = super(QubesArgumentParser, self).parse_args(args, namespace)
namespace = super(QubesArgumentParser, self).parse_args(*args, **kwargs)
if self._want_app and not self._want_app_no_instance: if self._want_app and not self._want_app_no_instance:
self.set_qubes_verbosity(namespace) self.set_qubes_verbosity(namespace)
@ -442,8 +441,8 @@ class AliasedSubParsersAction(argparse._SubParsersAction):
sup = super(AliasedSubParsersAction._AliasedPseudoAction, self) sup = super(AliasedSubParsersAction._AliasedPseudoAction, self)
sup.__init__(option_strings=[], dest=dest, help=help) sup.__init__(option_strings=[], dest=dest, help=help)
def __call__(self, **kwargs): def __call__(self, parser, namespace, values, option_string=None):
pass raise NotImplementedError
def add_parser(self, name, **kwargs): def add_parser(self, name, **kwargs):
if 'aliases' in kwargs: if 'aliases' in kwargs:

View File

@ -41,8 +41,7 @@ class QubesDaemonProtocol(asyncio.Protocol):
print('connection_lost(exc={!r})'.format(exc)) print('connection_lost(exc={!r})'.format(exc))
self.untrusted_buffer.close() self.untrusted_buffer.close()
def data_received(self, untrusted_data): def data_received(self, untrusted_data): # pylint: disable=arguments-differ
# pylint: disable=arguments-differ
print('data_received(untrusted_data={!r})'.format(untrusted_data)) print('data_received(untrusted_data={!r})'.format(untrusted_data))
if self.len_untrusted_buffer + len(untrusted_data) > self.buffer_size: if self.len_untrusted_buffer + len(untrusted_data) > self.buffer_size:
self.app.log.warning('request too long') self.app.log.warning('request too long')

View File

@ -38,8 +38,7 @@ parser.add_argument("--template", action="store_true", dest="template",
def print_msg(domains, what_single, what_plural): def print_msg(domains, what_single, what_plural):
# pylint: disable=len-as-condition if not domains:
if len(domains) == 0:
print("None of given VM {!s}".format(what_single)) print("None of given VM {!s}".format(what_single))
elif len(domains) == 1: elif len(domains) == 1:
print("VM {!s} {!s}".format(domains[0], what_single)) print("VM {!s} {!s}".format(domains[0], what_single))

View File

@ -49,17 +49,30 @@ def prepare_table(dev_list):
output = [] output = []
header = [] header = []
if sys.stdout.isatty(): if sys.stdout.isatty():
header += [('BACKEND:DEVID', 'DESCRIPTION', 'USED BY')] # NOQA header += [('VMNAME:DEVID', 'DESCRIPTION', 'USED BY', 'ASSIGNED')] # NOQA
for dev in dev_list: for dev in dev_list:
output += [( output += [(
"{!s}:{!s}".format(dev.backend_domain, dev.ident), dev.id,
dev.description, dev.description,
str(dev.frontend_domain) if dev.frontend_domain else "", str(dev.attached_to),
dev.assignments
)] )]
return header + sorted(output) return header + sorted(output)
class Line(object):
def __init__(self, device: qubes.devices.DeviceInfo, attached_to = None):
self.id = "{!s}:{!s}".format(device.backend_domain, device.ident)
self.description = device.description
self.attached_to = attached_to if attached_to else ""
self.frontends = []
@property
def assignments(self):
return ', '.join(self.frontends)
def list_devices(args): def list_devices(args):
''' Called by the parser to execute the qubes-devices list ''' Called by the parser to execute the qubes-devices list
@ -67,32 +80,60 @@ def list_devices(args):
app = args.app app = args.app
result = [] result = []
devices = set()
if hasattr(args, 'domains') and args.domains: if hasattr(args, 'domains') and args.domains:
for domain in args.domains: for domain in args.domains:
result.extend(domain.devices[args.devclass].attached()) for dev in domain.devices[args.devclass].attached():
else: devices.add(dev)
for backend in app.domains: for dev in domain.devices[args.devclass].available():
result.extend(backend.devices[args.devclass]) devices.add(dev)
qubes.tools.print_table(prepare_table(result)) else:
for domain in app.domains:
for dev in domain.devices[args.devclass].available():
devices.add(dev)
result = {dev: Line(dev) for dev in devices}
for dev in result:
for domain in app.domains:
if domain == dev.backend_domain:
continue
elif dev in domain.devices[args.devclass].attached():
result[dev].attached_to = str(domain)
if dev in domain.devices[args.devclass].assignments():
if dev in domain.devices[args.devclass].persistent():
result[dev].frontends.append(str(domain))
qubes.tools.print_table(prepare_table(result.values()))
def attach_device(args): def attach_device(args):
''' Called by the parser to execute the :program:`qvm-devices attach` ''' Called by the parser to execute the :program:`qvm-devices attach`
subcommand. subcommand.
''' '''
device = args.device device_assignment = args.device_assignment
vm = args.domains[0] vm = args.domains[0]
vm.devices[args.devclass].attach(device) app = args.app
device_assignment.persistent = args.persistent
vm.devices[args.devclass].attach(device_assignment)
if device_assignment.persistent:
app.save()
def detach_device(args): def detach_device(args):
''' Called by the parser to execute the :program:`qvm-devices detach` ''' Called by the parser to execute the :program:`qvm-devices detach`
subcommand. subcommand.
''' '''
device = args.device device_assignment = args.device_assignment
vm = args.domains[0] vm = args.domains[0]
vm.devices[args.devclass].detach(device) before = len(vm.devices[args.devclass].persistent())
vm.devices[args.devclass].detach(device_assignment)
after = len(vm.devices[args.devclass].persistent())
if after < before:
args.app.save()
def init_list_parser(sub_parsers): def init_list_parser(sub_parsers):
@ -110,44 +151,48 @@ def init_list_parser(sub_parsers):
class DeviceAction(qubes.tools.QubesAction): class DeviceAction(qubes.tools.QubesAction):
''' Action for argument parser that gets the ''' Action for argument parser that gets the
:py:class:``qubes.storage.Volume`` from a POOL_NAME:VOLUME_ID string. :py:class:``qubes.device.DeviceInfo`` from a BACKEND:DEVICE_ID string.
''' ''' # pylint: disable=too-few-public-methods
# pylint: disable=too-few-public-methods
def __init__(self, help='A domain & device id combination', def __init__(self, help='A pool & volume id combination',
required=True, allow_unknown=False, **kwargs): required=True, **kwargs):
# pylint: disable=redefined-builtin # pylint: disable=redefined-builtin
super(DeviceAction, self).__init__(help=help, required=required, super(DeviceAction, self).__init__(help=help, required=required,
**kwargs) **kwargs)
self.allow_unknown = allow_unknown
def __call__(self, parser, namespace, values, option_string=None): def __call__(self, parser, namespace, values, option_string=None):
''' Set ``namespace.device`` to ``values`` ''' ''' Set ``namespace.vmname`` to ``values`` '''
setattr(namespace, self.dest, values) setattr(namespace, self.dest, values)
def parse_qubes_app(self, parser, namespace): def parse_qubes_app(self, parser, namespace):
''' Acquire the :py:class:``qubes.devices.DeviceInfo`` object from
``namespace.app``.
'''
assert hasattr(namespace, 'app') assert hasattr(namespace, 'app')
assert hasattr(namespace, 'devclass')
app = namespace.app app = namespace.app
assert hasattr(namespace, 'device')
backend_device_id = getattr(namespace, self.dest)
assert hasattr(namespace, 'devclass')
devclass = namespace.devclass devclass = namespace.devclass
try: try:
backend_name, devid = getattr(namespace, self.dest).split(':', 1) vmname, device_id = backend_device_id.split(':', 1)
try: try:
backend = app.domains[backend_name] vm = app.domains[vmname]
dev = backend.devices[devclass][devid]
if not self.allow_unknown and isinstance(dev,
qubes.devices.UnknownDevice):
parser.error_runtime('no device {!r} in qube {!r}'.format(
backend_name, devid))
except KeyError: except KeyError:
parser.error_runtime('no domain {!r}'.format(backend_name)) parser.error_runtime("no backend vm {!r}".format(vmname))
try:
vm.devices[devclass][device_id]
except KeyError:
parser.error_runtime(
"backend vm {!r} doesn't expose device {!r}"
.format(vmname, device_id))
device_assignment = qubes.devices.DeviceAssignment(vm, device_id,)
setattr(namespace, 'device_assignment', device_assignment)
except ValueError: except ValueError:
parser.error('expected a domain & device id combination like ' parser.error('expected a backend vm & device id combination ' \
'foo:bar') 'like foo:bar got %s' % backend_device_id)
def get_parser(device_class=None): def get_parser(device_class=None):
'''Create :py:class:`argparse.ArgumentParser` suitable for '''Create :py:class:`argparse.ArgumentParser` suitable for
@ -169,15 +214,29 @@ def get_parser(device_class=None):
init_list_parser(sub_parsers) init_list_parser(sub_parsers)
attach_parser = sub_parsers.add_parser( attach_parser = sub_parsers.add_parser(
'attach', help="Attach device to domain", aliases=('at', 'a')) 'attach', help="Attach device to domain", aliases=('at', 'a'))
attach_parser.add_argument('VMNAME', action=qubes.tools.RunningVmNameAction)
attach_parser.add_argument(metavar='BACKEND:DEVICE_ID', dest='device',
action=qubes.tools.VolumeAction)
attach_parser.set_defaults(func=detach_device)
detach_parser = sub_parsers.add_parser( detach_parser = sub_parsers.add_parser(
"detach", help="Detach device from domain", aliases=('d', 'dt')) "detach", help="Detach device from domain", aliases=('d', 'dt'))
detach_parser.add_argument('VMNAME', action=qubes.tools.RunningVmNameAction)
detach_parser.add_argument(metavar='BACKEND:DEVICE_ID', dest='device', attach_parser.add_argument('VMNAME', action=qubes.tools.VmNameAction)
action=qubes.tools.VolumeAction) detach_parser.add_argument('VMNAME', action=qubes.tools.VmNameAction)
if device_class == 'block':
attach_parser.add_argument(metavar='BACKEND:DEVICE_ID', dest='device',
action=qubes.tools.VolumeAction)
detach_parser.add_argument(metavar='BACKEND:DEVICE_ID', dest='device',
action=qubes.tools.VolumeAction)
else:
attach_parser.add_argument(metavar='BACKEND:DEVICE_ID',
dest='device',
action=DeviceAction)
attach_parser.add_argument('-p', '--persistent', default=False,
help='device will attached on each start of the VMNAME',
action='store_true')
detach_parser.add_argument(metavar='BACKEND:DEVICE_ID',
dest='device',
action=DeviceAction)
attach_parser.set_defaults(func=attach_device)
detach_parser.set_defaults(func=detach_device) detach_parser.set_defaults(func=detach_device)
return parser return parser

View File

@ -309,6 +309,7 @@ class StatusColumn(Column):
if ret is not None: if ret is not None:
if getattr(vm, 'hvm', False): if getattr(vm, 'hvm', False):
return ret.upper() return ret.upper()
return ret return ret
@ -477,6 +478,7 @@ class Table(object):
'''Format single table row (all columns for one domain).''' '''Format single table row (all columns for one domain).'''
if self.raw_data: if self.raw_data:
return '|'.join(col.format(vm) for col in self.columns) return '|'.join(col.format(vm) for col in self.columns)
return ''.join(col.cell(vm) for col in self.columns) return ''.join(col.cell(vm) for col in self.columns)

View File

@ -111,18 +111,21 @@ def parse_size(size):
def mbytes_to_kmg(size): def mbytes_to_kmg(size):
if size > 1024: if size > 1024:
return "%d GiB" % (size / 1024) return "%d GiB" % (size / 1024)
return "%d MiB" % size return "%d MiB" % size
def kbytes_to_kmg(size): def kbytes_to_kmg(size):
if size > 1024: if size > 1024:
return mbytes_to_kmg(size / 1024) return mbytes_to_kmg(size / 1024)
return "%d KiB" % size return "%d KiB" % size
def bytes_to_kmg(size): def bytes_to_kmg(size):
if size > 1024: if size > 1024:
return kbytes_to_kmg(size / 1024) return kbytes_to_kmg(size / 1024)
return "%d B" % size return "%d B" % size
@ -134,6 +137,7 @@ def size_to_human(size):
return str(round(size / 1024.0, 1)) + ' KiB' return str(round(size / 1024.0, 1)) + ' KiB'
elif size < 1024 * 1024 * 1024: elif size < 1024 * 1024 * 1024:
return str(round(size / (1024.0 * 1024), 1)) + ' MiB' return str(round(size / (1024.0 * 1024), 1)) + ' MiB'
return str(round(size / (1024.0 * 1024 * 1024), 1)) + ' GiB' return str(round(size / (1024.0 * 1024 * 1024), 1)) + ' GiB'

View File

@ -217,11 +217,17 @@ class BaseVM(qubes.PropertyHolder, metaclass=BaseVMMeta):
for parent in self.xml.xpath('./devices'): for parent in self.xml.xpath('./devices'):
devclass = parent.get('class') devclass = parent.get('class')
for node in parent.xpath('./device'): for node in parent.xpath('./device'):
device = self.devices[devclass].devclass( options = {}
if node.get('options'):
options = node.get('options').attribs(),
device_assignment = qubes.devices.DeviceAssignment(
self.app.domains[node.get('backend-domain')], self.app.domains[node.get('backend-domain')],
node.get('id') node.get('id'),
options,
persistent=True
) )
self.devices[devclass].attach(device) self.devices[devclass].attach(device_assignment)
# tags # tags
for node in self.xml.xpath('./tags/tag'): for node in self.xml.xpath('./tags/tag'):
@ -250,10 +256,14 @@ class BaseVM(qubes.PropertyHolder, metaclass=BaseVMMeta):
for devclass in self.devices: for devclass in self.devices:
devices = lxml.etree.Element('devices') devices = lxml.etree.Element('devices')
devices.set('class', devclass) devices.set('class', devclass)
for device in self.devices[devclass].attached(persistent=True): for device in self.devices[devclass].assignments(persistent=True):
node = lxml.etree.Element('device') node = lxml.etree.Element('device')
node.set('backend-domain', device.backend_domain.name) node.set('backend-domain', device.backend_domain.name)
node.set('id', device.ident) node.set('id', device.ident)
options_node = lxml.etree.Element('options')
for key, val in device.options:
options_node.set(key, val)
node.append(options_node)
devices.append(node) devices.append(node)
element.append(devices) element.append(devices)

View File

@ -115,8 +115,7 @@ class AdminVM(qubes.vm.qubesvm.QubesVM):
try: try:
return self.app.vmm.libvirt_conn.getInfo()[1] return self.app.vmm.libvirt_conn.getInfo()[1]
except libvirt.libvirtError as e: except libvirt.libvirtError as e:
self.log.warning( self.log.warning('Failed to get memory limit for dom0: %s', e)
'Failed to get memory limit for dom0: {}'.format(e))
return 4096 return 4096
def verify_files(self): def verify_files(self):
@ -127,7 +126,8 @@ class AdminVM(qubes.vm.qubesvm.QubesVM):
''' # pylint: disable=no-self-use ''' # pylint: disable=no-self-use
return True return True
def start(self, **kwargs): def start(self, preparing_dvm=False, start_guid=True, notify_function=None,
mem_required=None):
'''Always raises an exception. '''Always raises an exception.
.. seealso: .. seealso:

View File

@ -49,6 +49,7 @@ def _default_ip(self):
return None return None
if self.netvm is not None: if self.netvm is not None:
return self.netvm.get_ip_for_vm(self) # pylint: disable=no-member return self.netvm.get_ip_for_vm(self) # pylint: disable=no-member
return self.get_ip_for_vm(self) return self.get_ip_for_vm(self)
@ -172,6 +173,7 @@ class NetVMMixin(qubes.events.Emitter):
'10.139.1.1', '10.139.1.1',
'10.139.1.2', '10.139.1.2',
) )
return None return None
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):

View File

@ -430,7 +430,8 @@ class QubesVM(qubes.vm.mix.net.NetVMMixin, qubes.vm.BaseVM):
# pylint: disable=no-member # pylint: disable=no-member
kernelopts = qubes.property('kernelopts', type=str, load_stage=4, kernelopts = qubes.property('kernelopts', type=str, load_stage=4,
default=(lambda self: qubes.config.defaults['kernelopts_pcidevs'] default=(lambda self: qubes.config.defaults['kernelopts_pcidevs']
if list(self.devices['pci'].attached(persistent=True)) # pylint: disable=no-member
if list(self.devices['pci'].persistent())
else self.template.kernelopts if hasattr(self, 'template') else self.template.kernelopts if hasattr(self, 'template')
else qubes.config.defaults['kernelopts']), else qubes.config.defaults['kernelopts']),
ls_width=30, ls_width=30,
@ -444,6 +445,7 @@ class QubesVM(qubes.vm.mix.net.NetVMMixin, qubes.vm.BaseVM):
# XXX shouldn't this go to standalone VM and TemplateVM, and leave here # XXX shouldn't this go to standalone VM and TemplateVM, and leave here
# only plain property? # only plain property?
default_user = qubes.property('default_user', type=str, default_user = qubes.property('default_user', type=str,
# pylint: disable=no-member
default=(lambda self: self.template.default_user default=(lambda self: self.template.default_user
if hasattr(self, 'template') else 'user'), if hasattr(self, 'template') else 'user'),
setter=_setter_default_user, setter=_setter_default_user,
@ -570,7 +572,7 @@ class QubesVM(qubes.vm.mix.net.NetVMMixin, qubes.vm.BaseVM):
@property @property
def block_devices(self): def block_devices(self):
''' Return all :py:class:`qubes.devices.BlockDevice`s for current domain ''' Return all :py:class:`qubes.storage.BlockDevice`s for current domain
for serialization in the libvirt XML template as <disk>. for serialization in the libvirt XML template as <disk>.
''' '''
return [v.block_device() for v in self.volumes.values()] return [v.block_device() for v in self.volumes.values()]
@ -1435,7 +1437,9 @@ class QubesVM(qubes.vm.mix.net.NetVMMixin, qubes.vm.BaseVM):
else: else:
if not self.is_fully_usable(): if not self.is_fully_usable():
return "Transient" return "Transient"
return "Running" return "Running"
return 'Halted' return 'Halted'
except libvirt.libvirtError as e: except libvirt.libvirtError as e:
if e.get_error_code() == libvirt.VIR_ERR_NO_DOMAIN: if e.get_error_code() == libvirt.VIR_ERR_NO_DOMAIN:
@ -1614,6 +1618,7 @@ class QubesVM(qubes.vm.mix.net.NetVMMixin, qubes.vm.BaseVM):
'/vm/{}/start_time'.format(self.uuid)) '/vm/{}/start_time'.format(self.uuid))
if start_time != '': if start_time != '':
return datetime.datetime.fromtimestamp(float(start_time)) return datetime.datetime.fromtimestamp(float(start_time))
return None return None
def is_outdated(self): def is_outdated(self):

View File

@ -212,8 +212,8 @@ the parser will complain about missing combine= attribute on the second <start>.
</doc:description> </doc:description>
<attribute name="backend-domain"> <attribute name="backend-domain">
<doc:description> <doc:description>
Backend domain name. Backend domain name.
</doc:description> </doc:description>
<data type="string"> <data type="string">
<param name="pattern">[a-z0-9_]+</param> <param name="pattern">[a-z0-9_]+</param>
</data> </data>
@ -224,6 +224,15 @@ the parser will complain about missing combine= attribute on the second <start>.
<param name="pattern">[0-9a-f]{2}:[0-9a-f]{2}.[0-9a-f]{2}</param> <param name="pattern">[0-9a-f]{2}:[0-9a-f]{2}.[0-9a-f]{2}</param>
</data> </data>
</attribute> </attribute>
<optional>
<element name="options">
<doc:description>
Options
</doc:description>
<data type="string">
</data>
</element>
</optional>
</element> </element>
</oneOrMore> </oneOrMore>
</element> </element>

View File

@ -32,7 +32,7 @@
<viridian/> <viridian/>
{% endif %} {% endif %}
{% if vm.devices['pci'].attached(persistent=True) | list {% if vm.devices['pci'].persistent() | list
and vm.features.get('pci-e820-host', True) %} and vm.features.get('pci-e820-host', True) %}
<xen> <xen>
<e820_host state="on"/> <e820_host state="on"/>
@ -106,7 +106,7 @@
{% include 'libvirt/devices/net.xml' with context %} {% include 'libvirt/devices/net.xml' with context %}
{% endif %} {% endif %}
{% for device in vm.devices.pci.attached(persistent=True) %} {% for device in vm.devices.pci.persistent() %}
{% include 'libvirt/devices/pci.xml' %} {% include 'libvirt/devices/pci.xml' %}
{% endfor %} {% endfor %}