Merge branch 'core3-devices' into core3-devel

* core3-devices:
  Fix core2migration and tests for new devices API
  tests: more qubes.devices tests
  qubes/ext/pci: implement pci-no-strict-reset/BDF feature
  qubes/tools: allow calling qvm-device as qvm-devclass (like qvm-pci)
  qubes: make pylint happy
  qubes/tools: add qvm-device tool (and tests)
  tests: load qubes.tests.tools.qvm_ls
  tests: PCI devices tests
  tests: add context manager to catch stdout
  qubes/ext/pci: move PCI devices handling to an extension
  qubes/devices: use more detailed exceptions than just KeyError
  qubes/devices: allow non-persistent attach
  qubes/storage: misc fixes for VM-exposed block devices handling
  qubes: new devices API

Fixes QubesOS/qubes-issues#2257
This commit is contained in:
Marek Marczykowski-Górecki 2016-09-04 21:32:01 +02:00
commit 16db68b053
No known key found for this signature in database
GPG Key ID: 063938BA42CFA724
26 changed files with 1251 additions and 175 deletions

View File

@ -57,6 +57,8 @@ ifeq ($(OS),Linux)
$(MAKE) install -C linux/system-config $(MAKE) install -C linux/system-config
endif endif
python setup.py install -O1 --skip-build --root $(DESTDIR) python setup.py install -O1 --skip-build --root $(DESTDIR)
ln -s qvm-device $(DESTDIR)/usr/bin/qvm-pci
ln -s qvm-device $(DESTDIR)/usr/bin/qvm-usb
# $(MAKE) install -C tests # $(MAKE) install -C tests
$(MAKE) install -C relaxng $(MAKE) install -C relaxng
mkdir -p $(DESTDIR)/etc/qubes mkdir -p $(DESTDIR)/etc/qubes

View File

@ -22,7 +22,7 @@
</features> </features>
<devices class="pci"> <devices class="pci">
<device>01:23.45</device> <device backend-domain="dom0" id="01:23.45"/>
</devices> </devices>
</domain> </domain>

View File

@ -187,12 +187,12 @@ class Core2Qubes(qubes.Qubes):
services = ast.literal_eval(services) services = ast.literal_eval(services)
else: else:
services = {} services = {}
for service, value in services.iteritems(): for service, value in services.items():
feature = service feature = service
for repl_feature, repl_service in \ for repl_feature, repl_service in \
qubes.ext.r3compatibility.\ qubes.ext.r3compatibility.\
R3Compatibility.features_to_services.\ R3Compatibility.features_to_services.\
iteritems(): items():
if repl_service == service: if repl_service == service:
feature = repl_feature feature = repl_feature
vm.features[feature] = value vm.features[feature] = value
@ -205,7 +205,8 @@ class Core2Qubes(qubes.Qubes):
pcidevs = ast.literal_eval(pcidevs) pcidevs = ast.literal_eval(pcidevs)
for pcidev in pcidevs: for pcidev in pcidevs:
try: try:
vm.devices["pci"].attach(pcidev) vm.devices["pci"].attach(
self.domains[0].devices['pci'][pcidev])
except qubes.exc.QubesException as e: except qubes.exc.QubesException as e:
self.log.error("VM {}: {}".format(vm.name, str(e))) self.log.error("VM {}: {}".format(vm.name, str(e)))
except (ValueError, LookupError) as err: except (ValueError, LookupError) as err:

View File

@ -23,11 +23,42 @@
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
# #
import re '''API for various types of devices.
Main concept is that some domain main
expose (potentially multiple) devices, which can be attached to other domains.
Devices can be of different classes (like 'pci', 'usb', etc). Each device
class is implemented by an extension.
Devices are identified by pair of (backend domain, `ident`), where `ident` is
:py:class:`str`.
Such extension should provide:
- `qubes.devices` endpoint - a class descendant from
:py:class:`qubes.devices.DeviceInfo`, designed to hold device description (
including class-specific properties)
- handle `device-attach:class` and `device-detach:class` events for
performing the attach/detach action; events are fired even when domain isn't
running and extension should be prepared for this
- handle `device-list:class` event - list devices exposed by particular
domain; it should return list of appropriate DeviceInfo objects
- handle `device-get:class` event - get one device object exposed by this
domain of given identifier
- handle `device-list-attached:class` event - list currently attached
devices to this domain
'''
import qubes.utils import qubes.utils
class DeviceNotAttached(qubes.exc.QubesException, KeyError):
'''Trying to detach not attached device'''
pass
class DeviceAlreadyAttached(qubes.exc.QubesException, KeyError):
'''Trying to attach already attached device'''
pass
class DeviceCollection(object): class DeviceCollection(object):
'''Bag for devices. '''Bag for devices.
@ -35,6 +66,52 @@ class DeviceCollection(object):
:param vm: VM for which we manage devices :param vm: VM for which we manage devices
:param class_: device class :param class_: device class
This class emits following events on VM object:
.. event:: device-attach:<class> (device)
Fired when device is attached to a VM.
:param device: :py:class:`DeviceInfo` object to be attached
.. event:: device-pre-attach:<class> (device)
Fired before device is attached to a VM
:param device: :py:class:`DeviceInfo` object to be attached
.. event:: device-detach:<class> (device)
Fired when device is detached from a VM.
:param device: :py:class:`DeviceInfo` object to be attached
.. event:: device-pre-detach:<class> (device)
Fired before device is detached from a VM
:param device: :py:class:`DeviceInfo` object to be attached
.. event:: device-list:<class>
Fired to get list of devices exposed by a VM. Handlers of this
event should return a list of py:class:`DeviceInfo` objects (or
appropriate class specific descendant)
.. event:: device-get:<class> (ident)
Fired to get a single device, given by the `ident` parameter.
Handlers of this event should either return appropriate object of
:py:class:`DeviceInfo`, or :py:obj:`None`. Especially should not
raise :py:class:`exceptions.KeyError`.
.. event:: device-list-attached:<class> (persistent)
Fired to get list of currently attached devices to a VM. Handlers
of this event should return list of devices actually attached to
a domain, regardless of its settings.
''' '''
def __init__(self, vm, class_): def __init__(self, vm, class_):
@ -42,56 +119,107 @@ class DeviceCollection(object):
self._class = class_ self._class = class_
self._set = set() self._set = set()
self.devclass = qubes.utils.get_entry_point_one(
'qubes.devices', self._class)
def attach(self, device): def attach(self, device, persistent=True):
'''Attach (add) device to domain. '''Attach (add) device to domain.
:param str device: device identifier (format is class-dependent) :param DeviceInfo device: device object
''' '''
try: if device in self.attached():
devclass = qubes.utils.get_entry_point_one( raise DeviceAlreadyAttached(
'qubes.devices', self._class)
except KeyError:
devclass = str
if not isinstance(device, devclass):
device = devclass(device)
if device in self:
raise KeyError(
'device {!r} of class {} already attached to {!r}'.format( 'device {!r} of class {} already attached to {!r}'.format(
device, self._class, self._vm)) device, self._class, self._vm))
self._vm.fire_event_pre('device-pre-attach:' + self._class, device) self._vm.fire_event_pre('device-pre-attach:' + self._class, device)
self._set.add(device) if persistent:
self._set.add(device)
self._vm.fire_event('device-attach:' + self._class, device) self._vm.fire_event('device-attach:' + self._class, device)
def detach(self, device): def detach(self, device, persistent=True):
'''Detach (remove) device from domain. '''Detach (remove) device from domain.
:param str device: device identifier (format is class-dependent) :param DeviceInfo device: device object
''' '''
if device not in self: if device not in self.attached():
raise KeyError( raise DeviceNotAttached(
'device {!r} of class {} not attached to {!r}'.format( 'device {!s} of class {} not attached to {!s}'.format(
device, self._class, self._vm)) device, self._class, self._vm))
self._vm.fire_event_pre('device-pre-detach:' + self._class, device) self._vm.fire_event_pre('device-pre-detach:' + self._class, device)
self._set.remove(device) if persistent:
self._set.remove(device)
self._vm.fire_event('device-detach:' + self._class, device) self._vm.fire_event('device-detach:' + self._class, device)
def attached(self, persistent=None):
'''List devices which are (or may be) attached to this vm
Devices may be attached persistently (so they are included in
:file:`qubes.xml`) or not. Device can also be in :file:`qubes.xml`,
but be temporarily detached.
:param bool persistent: only include devices which are (or are not) \
attached persistently - None means both
'''
seen = self._set.copy()
# ask for really attached devices only when requested not only
# persistent ones
if persistent is not True:
attached = self._vm.fire_event(
'device-list-attached:' + self._class,
persistent=persistent)
for device in attached:
device_persistent = device in self._set
if persistent is not None and device_persistent != persistent:
continue
assert device.frontend_domain == self._vm, \
'{!r} != {!r}'.format(device.frontend_domain, self._vm)
yield device
try:
seen.remove(device)
except KeyError:
pass
if persistent is False:
return
for device in seen:
# get fresh object - may contain updated information
device = device.backend_domain.devices[self._class][device.ident]
yield device
def available(self):
'''List devices exposed by this vm'''
devices = self._vm.fire_event('device-list:' + self._class)
return devices
def __iter__(self): def __iter__(self):
return iter(self._set) return iter(self.available())
def __getitem__(self, ident):
'''Get device object with given ident.
def __contains__(self, item): :returns: py:class:`DeviceInfo`
return item in self._set
If domain isn't running, it is impossible to check device validity,
so return UnknownDevice object. Also do the same for non-existing
devices - otherwise it will be impossible to detach already
disconnected device.
def __len__(self): :raises AssertionError: when multiple devices with the same ident are
return len(self._set) found
'''
dev = self._vm.fire_event('device-get:' + self._class, ident)
if dev:
assert len(dev) == 1
return dev[0]
else:
return UnknownDevice(self._vm, ident)
class DeviceManager(dict): class DeviceManager(dict):
@ -109,30 +237,59 @@ class DeviceManager(dict):
return self[key] return self[key]
class RegexDevice(str): class DeviceInfo(object):
regex = None # pylint: disable=too-few-public-methods
def __init__(self, *args, **kwargs): def __init__(self, backend_domain, ident, description=None,
super(RegexDevice, self).__init__(*args, **kwargs) frontend_domain=None, **kwargs):
#: domain providing this device
self.backend_domain = backend_domain
#: device identifier (unique for given domain and device type)
self.ident = ident
# allow redefining those as dynamic properties in subclasses
try:
#: human readable description/name of the device
self.description = description
except AttributeError:
pass
try:
#: (running) domain to which device is currently attached
self.frontend_domain = frontend_domain
except AttributeError:
pass
self.data = kwargs
if self.regex is None: if hasattr(self, 'regex'):
raise NotImplementedError( # pylint: disable=no-member
'You should overload .regex attribute in subclass') dev_match = self.regex.match(ident)
if not dev_match:
raise ValueError('Invalid device identifier: {!r}'.format(
ident))
dev_match = self.regex.match(self) for group in self.regex.groupindex:
if not dev_match: setattr(self, group, dev_match.group(group))
raise ValueError('Invalid device identifier: {!r}'.format(self))
for group in self.regex.groupindex: def __hash__(self):
setattr(self, group, dev_match.group(group)) return hash(self.ident)
def __eq__(self, other):
return (
self.backend_domain == other.backend_domain and
self.ident == other.ident
)
class PCIDevice(RegexDevice): def __str__(self):
regex = re.compile( return '{!s}:{!s}'.format(self.backend_domain, self.ident)
r'^(?P<bus>[0-9a-f]+):(?P<device>[0-9a-f]+)\.(?P<function>[0-9a-f]+)$')
@property class UnknownDevice(DeviceInfo):
def libvirt_name(self): # pylint: disable=too-few-public-methods
return 'pci_0000_{}_{}_{}'.format(self.bus, self.device, self.function) '''Unknown device - for example exposed by domain not running currently'''
def __init__(self, backend_domain, ident, description=None,
frontend_domain=None, **kwargs):
if description is None:
description = "Unknown device"
super(UnknownDevice, self).__init__(backend_domain, ident, description,
frontend_domain, **kwargs)
class BlockDevice(object): class BlockDevice(object):

View File

@ -129,7 +129,7 @@ class Emitter(object):
''' '''
if not self.events_enabled: if not self.events_enabled:
return return []
effects = [] effects = []
for cls in order: for cls in order:

303
qubes/ext/pci.py Normal file
View File

@ -0,0 +1,303 @@
#!/usr/bin/python2 -O
# vim: fileencoding=utf-8
#
# The Qubes OS Project, https://www.qubes-os.org/
#
# Copyright (C) 2016 Marek Marczykowski-Górecki
# <marmarek@invisiblethingslab.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
import os
import re
import subprocess
import libvirt
import lxml
import lxml.etree
import qubes.devices
import qubes.ext
#: cache of PCI device classes
pci_classes = None
def load_pci_classes():
# List of known device classes, subclasses and programming interfaces
# Syntax:
# C class class_name
# subclass subclass_name <-- single tab
# prog-if prog-if_name <-- two tabs
result = {}
with open('/usr/share/hwdata/pci.ids') as pciids:
class_id = None
subclass_id = None
for line in pciids.readlines():
line = line.rstrip()
if line.startswith('\t\t') and class_id and subclass_id:
(progif_id, _, class_name) = line[2:].split(' ', 2)
result[class_id + subclass_id + progif_id] = \
class_name
elif line.startswith('\t') and class_id:
(subclass_id, _, class_name) = line[1:].split(' ', 2)
# store both prog-if specific entry and generic one
result[class_id + subclass_id + '00'] = \
class_name
result[class_id + subclass_id] = \
class_name
elif line.startswith('C '):
(_, class_id, _, class_name) = line.split(' ', 3)
result[class_id + '0000'] = class_name
result[class_id + '00'] = class_name
subclass_id = None
return result
def pcidev_class(dev_xmldesc):
sysfs_path = dev_xmldesc.findtext('path')
assert sysfs_path
try:
class_id = open(sysfs_path + '/class').read().strip()
except OSError:
return "Unknown"
if not qubes.ext.pci.pci_classes:
qubes.ext.pci.pci_classes = load_pci_classes()
if class_id.startswith('0x'):
class_id = class_id[2:]
try:
# ignore prog-if
return qubes.ext.pci.pci_classes[class_id[0:4]]
except KeyError:
return "Unknown"
def attached_devices(app):
"""Return map device->domain-name for all currently attached devices"""
# Libvirt do not expose nice API to query where the device is
# attached. The only way would be to query _all_ the domains (
# each with separate libvirt call) and look if the device is
# there. Horrible waste of resources.
# Instead, do this on much lower level - xenstore info for
# xen-pciback driver, where we get all the info at once
xs = app.vmm.xs
devices = {}
for domid in xs.ls('', 'backend/pci'):
for devid in xs.ls('', 'backend/pci/' + domid):
devpath = 'backend/pci/' + domid + '/' + devid
domain_name = xs.read('', devpath + '/domain')
try:
domain = app.domains[domain_name]
except KeyError:
# unknown domain - maybe from another qubes.xml?
continue
devnum = xs.read('', devpath + '/num_devs')
for dev in range(int(devnum)):
dbdf = xs.read('', devpath + '/dev-' + str(dev))
bdf = dbdf[len('0000:'):]
devices[bdf] = domain
return devices
def _device_desc(hostdev_xml):
return '{devclass}: {vendor} {product}'.format(
devclass=pcidev_class(hostdev_xml),
vendor=hostdev_xml.findtext('capability/vendor'),
product=hostdev_xml.findtext('capability/product'),
)
class PCIDevice(qubes.devices.DeviceInfo):
# pylint: disable=too-few-public-methods
regex = re.compile(
r'^(?P<bus>[0-9a-f]+):(?P<device>[0-9a-f]+)\.(?P<function>[0-9a-f]+)$')
libvirt_regex = re.compile(
r'^pci_0000_(?P<bus>[0-9a-f]+)_(?P<device>[0-9a-f]+)_'
r'(?P<function>[0-9a-f]+)$')
def __init__(self, backend_domain, ident, libvirt_name=None):
if libvirt_name:
dev_match = self.libvirt_regex.match(libvirt_name)
assert dev_match
ident = '{bus}:{device}.{function}'.format(**dev_match.groupdict())
super(PCIDevice, self).__init__(backend_domain, ident, None)
# lazy loading
self._description = None
@property
def libvirt_name(self):
# pylint: disable=no-member
# noinspection PyUnresolvedReferences
return 'pci_0000_{}_{}_{}'.format(self.bus, self.device, self.function)
@property
def description(self):
if self._description is None:
hostdev_details = \
self.backend_domain.app.vmm.libvirt_conn.nodeDeviceLookupByName(
self.libvirt_name
)
self._description = _device_desc(lxml.etree.fromstring(
hostdev_details.XMLDesc()))
return self._description
@property
def frontend_domain(self):
# TODO: cache this
all_attached = attached_devices(self.backend_domain.app)
return all_attached.get(self.ident, None)
class PCIDeviceExtension(qubes.ext.Extension):
def __init__(self):
super(PCIDeviceExtension, self).__init__()
# lazy load this
self.pci_classes = {}
@qubes.ext.handler('device-list:pci')
def on_device_list_pci(self, vm, event):
# pylint: disable=unused-argument,no-self-use
# only dom0 expose PCI devices
if vm.qid != 0:
return
for dev in vm.app.vmm.libvirt_conn.listAllDevices():
if 'pci' not in dev.listCaps():
continue
xml_desc = lxml.etree.fromstring(dev.XMLDesc())
libvirt_name = xml_desc.findtext('name')
yield PCIDevice(vm, None, libvirt_name=libvirt_name)
@qubes.ext.handler('device-get:pci')
def on_device_get_pci(self, vm, event, ident):
# pylint: disable=unused-argument,no-self-use
if not vm.app.vmm.offline_mode:
yield PCIDevice(vm, ident)
@qubes.ext.handler('device-list-attached:pci')
def on_device_list_attached(self, vm, event, **kwargs):
# pylint: disable=unused-argument,no-self-use
if not vm.is_running() or isinstance(vm, qubes.vm.adminvm.AdminVM):
return
xml_desc = lxml.etree.fromstring(vm.libvirt_domain.XMLDesc())
for hostdev in xml_desc.findall('devices/hostdev'):
if hostdev.get('type') != 'pci':
continue
address = hostdev.find('source/address')
bus = address.get('bus')[2:]
device = address.get('slot')[2:]
function = address.get('function')[2:]
ident = '{bus}:{device}.{function}'.format(
bus=bus,
device=device,
function=function,
)
yield PCIDevice(vm.app.domains[0], ident)
@qubes.ext.handler('device-pre-attach:pci')
def on_device_pre_attached_pci(self, vm, event, device):
# pylint: disable=unused-argument
if not os.path.exists('/sys/bus/pci/devices/0000:{}'.format(
device.ident)):
raise qubes.exc.QubesException(
'Invalid PCI device: {}'.format(device.ident))
if not vm.is_running():
return
try:
self.bind_pci_to_pciback(vm.app, device)
vm.libvirt_domain.attachDevice(
vm.app.env.get_template('libvirt/devices/pci.xml').render(
device=device, vm=vm))
except subprocess.CalledProcessError as e:
vm.log.exception('Failed to attach PCI device {!r} on the fly,'
' changes will be seen after VM restart.'.format(
device.ident), e)
@qubes.ext.handler('device-pre-detach:pci')
def on_device_pre_detached_pci(self, vm, event, device):
# pylint: disable=unused-argument,no-self-use
if not vm.is_running():
return
# this cannot be converted to general API, because there is no
# provision in libvirt for extracting device-side BDF; we need it for
# qubes.DetachPciDevice, which unbinds driver, not to oops the kernel
p = subprocess.Popen(['xl', 'pci-list', str(vm.xid)],
stdout=subprocess.PIPE)
result = p.communicate()[0]
m = re.search(r'^(\d+.\d+)\s+0000:{}$'.format(device.ident), result,
flags=re.MULTILINE)
if not m:
vm.log.error('Device %s already detached', device.ident)
return
vmdev = m.group(1)
try:
vm.run_service('qubes.DetachPciDevice',
user='root', input='00:{}'.format(vmdev))
vm.libvirt_domain.detachDevice(
vm.app.env.get_template('libvirt/devices/pci.xml').render(
device=device, vm=vm))
except (subprocess.CalledProcessError, libvirt.libvirtError) as e:
vm.log.exception('Failed to detach PCI device {!r} on the fly,'
' changes will be seen after VM restart.'.format(
device.ident), e)
raise
@qubes.ext.handler('domain-pre-start')
def on_domain_pre_start(self, vm, _event, **_kwargs):
# Bind pci devices to pciback driver
for pci in vm.devices['pci'].attached():
self.bind_pci_to_pciback(vm.app, pci)
@staticmethod
def bind_pci_to_pciback(app, device):
'''Bind PCI device to pciback driver.
:param qubes.devices.PCIDevice device: device to attach
Devices should be unbound from their normal kernel drivers and bound to
the dummy driver, which allows for attaching them to a domain.
'''
try:
node = app.vmm.libvirt_conn.nodeDeviceLookupByName(
device.libvirt_name)
except libvirt.libvirtError as e:
if e.get_error_code() == libvirt.VIR_ERR_NO_NODE_DEVICE:
raise qubes.exc.QubesException(
'PCI device {!r} does not exist'.format(
device))
raise
try:
node.dettach()
except libvirt.libvirtError as e:
if e.get_error_code() == libvirt.VIR_ERR_INTERNAL_ERROR:
# allreaddy dettached
pass
else:
raise

View File

@ -255,9 +255,9 @@ class Storage(object):
if not rw: if not rw:
lxml.etree.SubElement(disk, 'readonly') lxml.etree.SubElement(disk, 'readonly')
if self.vm.qid != 0: if volume.domain is not None:
lxml.etree.SubElement(disk, 'backenddomain').set( lxml.etree.SubElement(disk, 'backenddomain').set(
'name', volume.pool.split('p_')[1]) 'name', volume.domain.name)
xml_string = lxml.etree.tostring(disk, encoding='utf-8') xml_string = lxml.etree.tostring(disk, encoding='utf-8')
self.vm.libvirt_domain.attachDevice(xml_string) self.vm.libvirt_domain.attachDevice(xml_string)

View File

@ -86,7 +86,7 @@ class DomainPool(Pool):
devices[name][atr] = value devices[name][atr] = value
return [DomainVolume(n, self.name, **atrs) return [DomainVolume(n, self.vm, self.name, **atrs)
for n, atrs in devices.items()] for n, atrs in devices.items()]
def clone(self, source, target): def clone(self, source, target):
@ -99,11 +99,12 @@ class DomainPool(Pool):
class DomainVolume(Volume): class DomainVolume(Volume):
''' A volume provided by a block device in an domain ''' ''' A volume provided by a block device in an domain '''
def __init__(self, name, pool, desc, mode, **kwargs): def __init__(self, vm, name, pool, desc, mode, **kwargs):
rw = (mode == 'w') rw = (mode == 'w')
super(DomainVolume, self).__init__(desc, pool, vid=name, removable=True, super(DomainVolume, self).__init__(desc, pool, vid=name, removable=True,
rw=rw, **kwargs) rw=rw, **kwargs)
self.domain = vm
@property @property
def revisions(self): def revisions(self):

View File

@ -31,6 +31,7 @@ import os.path
import re import re
import subprocess import subprocess
import qubes.devices
import qubes.storage import qubes.storage
BLKSIZE = 512 BLKSIZE = 512

View File

@ -135,12 +135,15 @@ class TestEmitter(qubes.events.Emitter):
self.fired_events = collections.Counter() self.fired_events = collections.Counter()
def fire_event(self, event, *args, **kwargs): def fire_event(self, event, *args, **kwargs):
super(TestEmitter, self).fire_event(event, *args, **kwargs) effects = super(TestEmitter, self).fire_event(event, *args, **kwargs)
self.fired_events[(event, args, tuple(sorted(kwargs.items())))] += 1 self.fired_events[(event, args, tuple(sorted(kwargs.items())))] += 1
return effects
def fire_event_pre(self, event, *args, **kwargs): def fire_event_pre(self, event, *args, **kwargs):
super(TestEmitter, self).fire_event_pre(event, *args, **kwargs) effects = super(TestEmitter, self).fire_event_pre(event, *args,
**kwargs)
self.fired_events[(event, args, tuple(sorted(kwargs.items())))] += 1 self.fired_events[(event, args, tuple(sorted(kwargs.items())))] += 1
return effects
def expectedFailureIfTemplate(templates): def expectedFailureIfTemplate(templates):
""" """
@ -972,6 +975,8 @@ def load_tests(loader, tests, pattern): # pylint: disable=unused-argument
'qubes.tests.vm.mix.net', 'qubes.tests.vm.mix.net',
'qubes.tests.vm.adminvm', 'qubes.tests.vm.adminvm',
'qubes.tests.app', 'qubes.tests.app',
'qubes.tests.tools.qvm_device',
'qubes.tests.tools.qvm_ls',
): ):
tests.addTests(loader.loadTestsFromName(modname)) tests.addTests(loader.loadTestsFromName(modname))
@ -984,6 +989,7 @@ def load_tests(loader, tests, pattern): # pylint: disable=unused-argument
for modname in ( for modname in (
# integration tests # integration tests
'qubes.tests.int.basic', 'qubes.tests.int.basic',
'qubes.tests.int.devices_pci',
'qubes.tests.int.dom0_update', 'qubes.tests.int.dom0_update',
'qubes.tests.int.network', 'qubes.tests.int.network',
'qubes.tests.int.dispvm', 'qubes.tests.int.dispvm',

View File

@ -27,24 +27,68 @@ import qubes.devices
import qubes.tests import qubes.tests
class TestDevice(qubes.devices.DeviceInfo):
pass
class TestVMCollection(dict):
def __iter__(self):
return iter(set(self.values()))
class TestApp(object):
def __init__(self):
self.domains = TestVMCollection()
class TestVM(qubes.tests.TestEmitter):
def __init__(self, app, name, *args, **kwargs):
super(TestVM, self).__init__(*args, **kwargs)
self.app = app
self.name = name
self.device = TestDevice(self, 'testdev', 'Description')
self.events_enabled = True
self.devices = {
'testclass': qubes.devices.DeviceCollection(self, 'testclass')
}
self.app.domains[name] = self
self.app.domains[self] = self
def __str__(self):
return self.name
@qubes.events.handler('device-list-attached:testclass')
def dev_testclass_list_attached(self, event, persistent):
for vm in self.app.domains:
if vm.device.frontend_domain == self:
yield vm.device
@qubes.events.handler('device-list:testclass')
def dev_testclass_list(self, event):
yield self.device
class TC_00_DeviceCollection(qubes.tests.QubesTestCase): class TC_00_DeviceCollection(qubes.tests.QubesTestCase):
def setUp(self): def setUp(self):
self.emitter = qubes.tests.TestEmitter() self.app = TestApp()
self.collection = qubes.devices.DeviceCollection(self.emitter, 'testclass') self.emitter = TestVM(self.app, 'vm')
self.app.domains['vm'] = self.emitter
self.device = self.emitter.device
self.collection = self.emitter.devices['testclass']
def test_000_init(self): def test_000_init(self):
self.assertFalse(self.collection._set) self.assertFalse(self.collection._set)
def test_001_attach(self): def test_001_attach(self):
self.collection.attach('testdev') self.collection.attach(self.device)
self.assertEventFired(self.emitter, 'device-pre-attach:testclass') self.assertEventFired(self.emitter, 'device-pre-attach:testclass')
self.assertEventFired(self.emitter, 'device-attach:testclass') self.assertEventFired(self.emitter, 'device-attach:testclass')
self.assertEventNotFired(self.emitter, 'device-pre-detach:testclass') self.assertEventNotFired(self.emitter, 'device-pre-detach:testclass')
self.assertEventNotFired(self.emitter, 'device-detach:testclass') self.assertEventNotFired(self.emitter, 'device-detach:testclass')
def test_002_detach(self): def test_002_detach(self):
self.collection.attach('testdev') self.collection.attach(self.device)
self.collection.detach('testdev') self.collection.detach(self.device)
self.assertEventFired(self.emitter, 'device-pre-attach:testclass') self.assertEventFired(self.emitter, 'device-pre-attach:testclass')
self.assertEventFired(self.emitter, 'device-attach:testclass') self.assertEventFired(self.emitter, 'device-attach:testclass')
self.assertEventFired(self.emitter, 'device-pre-detach:testclass') self.assertEventFired(self.emitter, 'device-pre-detach:testclass')
@ -52,31 +96,56 @@ class TC_00_DeviceCollection(qubes.tests.QubesTestCase):
def test_010_empty_detach(self): def test_010_empty_detach(self):
with self.assertRaises(LookupError): with self.assertRaises(LookupError):
self.collection.detach('testdev') self.collection.detach(self.device)
def test_011_double_attach(self): def test_011_double_attach(self):
self.collection.attach('testdev') self.collection.attach(self.device)
with self.assertRaises(LookupError): with self.assertRaises(LookupError):
self.collection.attach('testdev') self.collection.attach(self.device)
def test_012_double_detach(self): def test_012_double_detach(self):
self.collection.attach('testdev') self.collection.attach(self.device)
self.collection.detach('testdev') self.collection.detach(self.device)
with self.assertRaises(LookupError): with self.assertRaises(LookupError):
self.collection.detach('testdev') self.collection.detach(self.device)
def test_013_list_attached_persistent(self):
self.assertEqual(set([]), set(self.collection.attached()))
self.collection.attach(self.device)
self.assertEqual({self.device}, set(self.collection.attached()))
self.assertEqual({self.device},
set(self.collection.attached(persistent=True)))
self.assertEqual(set([]),
set(self.collection.attached(persistent=False)))
def test_014_list_attached_non_persistent(self):
self.collection.attach(self.device, persistent=False)
# device-attach event not implemented, so manipulate object manually
self.device.frontend_domain = self.emitter
self.assertEqual({self.device},
set(self.collection.attached()))
self.assertEqual(set([]),
set(self.collection.attached(persistent=True)))
self.assertEqual({self.device},
set(self.collection.attached(persistent=False)))
def test_015_list_available(self):
self.assertEqual({self.device}, set(self.collection))
class TC_01_DeviceManager(qubes.tests.QubesTestCase): class TC_01_DeviceManager(qubes.tests.QubesTestCase):
def setUp(self): def setUp(self):
self.emitter = qubes.tests.TestEmitter() self.app = TestApp()
self.emitter = TestVM(self.app, 'vm')
self.manager = qubes.devices.DeviceManager(self.emitter) self.manager = qubes.devices.DeviceManager(self.emitter)
def test_000_init(self): def test_000_init(self):
self.assertEqual(self.manager, {}) self.assertEqual(self.manager, {})
def test_001_missing(self): def test_001_missing(self):
self.manager['testclass'].attach('testdev') device = TestDevice(self.emitter.app.domains['vm'], 'testdev')
self.manager['testclass'].attach(device)
self.assertEventFired(self.emitter, 'device-attach:testclass') self.assertEventFired(self.emitter, 'device-attach:testclass')

View File

@ -67,7 +67,7 @@ class TC_00_Backup(qubes.tests.BackupTestsMixin, qubes.tests.QubesTestCase):
self.assertEquals(orig_value, restored_value, self.assertEquals(orig_value, restored_value,
"VM {} - property {} not properly restored".format( "VM {} - property {} not properly restored".format(
vm.name, prop)) vm.name, prop))
for dev_class in ["pci", "usb"]: for dev_class in vm.devices.keys():
for dev in vm.devices[dev_class]: for dev in vm.devices[dev_class]:
self.assertIn(dev, restored_vm.devices[dev_class]) self.assertIn(dev, restored_vm.devices[dev_class])

View File

@ -0,0 +1,173 @@
#!/usr/bin/python2 -O
# vim: fileencoding=utf-8
# pylint: disable=protected-access,pointless-statement
#
# The Qubes OS Project, https://www.qubes-os.org/
#
# Copyright (C) 2015-2016 Joanna Rutkowska <joanna@invisiblethingslab.com>
# Copyright (C) 2015-2016 Wojtek Porczyk <woju@invisiblethingslab.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
import os
import subprocess
import time
import qubes.devices
import qubes.ext.pci
import qubes.tests
class TC_00_Devices_PCI(qubes.tests.SystemTestsMixin,
qubes.tests.QubesTestCase):
def setUp(self):
super(TC_00_Devices_PCI, self).setUp()
if self._testMethodName not in ['test_000_list']:
pcidev = os.environ.get('QUBES_TEST_PCIDEV', None)
if pcidev is None:
self.skipTest('Specify PCI device with QUBES_TEST_PCIDEV '
'environment variable')
self.dev = self.app.domains[0].devices['pci'][pcidev]
if isinstance(self.dev, qubes.devices.UnknownDevice):
self.skipTest('Specified device {} does not exists'.format(pcidev))
self.init_default_template()
self.vm = self.app.add_new_vm(qubes.vm.appvm.AppVM,
name=self.make_vm_name('vm'),
label='red',
)
self.vm.create_on_disk()
self.vm.features['pci-no-strict-reset/' + pcidev] = True
self.app.save()
def test_000_list(self):
p = subprocess.Popen(['lspci'], stdout=subprocess.PIPE)
# get a dict: BDF -> description
actual_devices = dict(
l.split(' (')[0].split(' ', 1)
for l in p.communicate()[0].splitlines())
for dev in self.app.domains[0].devices['pci']:
self.assertIsInstance(dev, qubes.ext.pci.PCIDevice)
self.assertEqual(dev.backend_domain, self.app.domains[0])
self.assertIn(dev.ident, actual_devices)
self.assertEqual(dev.description, actual_devices[dev.ident])
self.assertIsInstance(dev.frontend_domain,
(qubes.vm.BaseVM, None.__class__))
actual_devices.pop(dev.ident)
if actual_devices:
self.fail('Not all devices listed, missing: {}'.format(
actual_devices))
def test_010_attach_offline(self):
self.assertIsNone(self.dev.frontend_domain)
self.assertNotIn(self.dev, self.vm.devices['pci'].attached())
self.assertNotIn(self.dev, self.vm.devices['pci'].attached(
persistent=True))
self.assertNotIn(self.dev, self.vm.devices['pci'].attached(
persistent=False))
self.vm.devices['pci'].attach(self.dev)
self.app.save()
# still should be None, as domain is not started yet
self.assertIsNone(self.dev.frontend_domain)
self.assertIn(self.dev, self.vm.devices['pci'].attached())
self.assertIn(self.dev, self.vm.devices['pci'].attached(
persistent=True))
self.assertNotIn(self.dev, self.vm.devices['pci'].attached(
persistent=False))
self.vm.start()
self.assertEqual(self.dev.frontend_domain, self.vm)
self.assertIn(self.dev, self.vm.devices['pci'].attached())
self.assertIn(self.dev, self.vm.devices['pci'].attached(
persistent=True))
self.assertNotIn(self.dev, self.vm.devices['pci'].attached(
persistent=False))
p = self.vm.run('lspci', passio_popen=True)
(stdout, _) = p.communicate()
self.assertIn(self.dev.description, stdout)
def test_011_attach_online(self):
self.vm.start()
self.vm.devices['pci'].attach(self.dev)
self.assertEqual(self.dev.frontend_domain, self.vm)
self.assertIn(self.dev, self.vm.devices['pci'].attached())
self.assertIn(self.dev, self.vm.devices['pci'].attached(
persistent=True))
self.assertNotIn(self.dev, self.vm.devices['pci'].attached(
persistent=False))
# give VM kernel some time to discover new device
time.sleep(1)
p = self.vm.run('lspci', passio_popen=True)
(stdout, _) = p.communicate()
self.assertIn(self.dev.description, stdout)
def test_012_attach_online_temp(self):
self.vm.start()
self.vm.devices['pci'].attach(self.dev, persistent=False)
self.assertEqual(self.dev.frontend_domain, self.vm)
self.assertIn(self.dev, self.vm.devices['pci'].attached())
self.assertNotIn(self.dev, self.vm.devices['pci'].attached(
persistent=True))
self.assertIn(self.dev, self.vm.devices['pci'].attached(
persistent=False))
# give VM kernel some time to discover new device
time.sleep(1)
p = self.vm.run('lspci', passio_popen=True)
(stdout, _) = p.communicate()
self.assertIn(self.dev.description, stdout)
def test_020_detach_online(self):
self.vm.devices['pci'].attach(self.dev)
self.app.save()
self.vm.start()
self.assertIn(self.dev, self.vm.devices['pci'].attached())
self.assertIn(self.dev, self.vm.devices['pci'].attached(
persistent=True))
self.assertNotIn(self.dev, self.vm.devices['pci'].attached(
persistent=False))
self.assertEqual(self.dev.frontend_domain, self.vm)
self.vm.devices['pci'].detach(self.dev)
self.assertIsNone(self.dev.frontend_domain)
self.assertNotIn(self.dev, self.vm.devices['pci'].attached())
self.assertNotIn(self.dev, self.vm.devices['pci'].attached(
persistent=True))
self.assertNotIn(self.dev, self.vm.devices['pci'].attached(
persistent=False))
p = self.vm.run('lspci', passio_popen=True)
(stdout, _) = p.communicate()
self.assertNotIn(self.dev.description, stdout)
# can't do this right now because of kernel bug - it cause the whole
# PCI bus being deregistered, which emit some warning in sysfs
# handling code (removing non-existing "0000:00" group)
#
# p = self.vm.run('dmesg', passio_popen=True)
# (stdout, _) = p.communicate()
# # check for potential oops
# self.assertNotIn('end trace', stdout)

View File

@ -1,6 +1,5 @@
#!/usr/bin/python2 -O #!/usr/bin/python2 -O
# vim: fileencoding=utf-8 # vim: fileencoding=utf-8
# pylint: disable=protected-access,pointless-statement
# #
# The Qubes OS Project, https://www.qubes-os.org/ # The Qubes OS Project, https://www.qubes-os.org/
# #

View File

@ -1 +1,42 @@
# pylint: skip-file #!/usr/bin/python2 -O
# vim: fileencoding=utf-8
#
# The Qubes OS Project, https://www.qubes-os.org/
#
# Copyright (C) 2015 Marek Marczykowski-Górecki
# <marmarek@invisiblethingslab.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
import sys
try:
import StringIO
except ImportError:
from io import StringIO
class StdoutBuffer(object):
def __init__(self):
self.stdout = StringIO.StringIO()
def __enter__(self):
sys.stdout = self.stdout
return self.stdout
def __exit__(self, exc_type, exc_val, exc_tb):
sys.stdout = sys.__stdout__
return False

View File

@ -0,0 +1,150 @@
#!/usr/bin/python2 -O
# vim: fileencoding=utf-8
# pylint: disable=protected-access,pointless-statement
#
# The Qubes OS Project, https://www.qubes-os.org/
#
# Copyright (C) 2015 Marek Marczykowski-Górecki
# <marmarek@invisiblethingslab.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
import qubes
import qubes.devices
import qubes.tools.qvm_device
import qubes.tests
import qubes.tests.devices
import qubes.tests.tools
class TestNamespace(object):
def __init__(self, app, domains=None, device=None):
super(TestNamespace, self).__init__()
self.app = app
self.devclass = 'testclass'
if domains:
self.domains = domains
if device:
self.device = device
class TC_00_Actions(qubes.tests.QubesTestCase):
def setUp(self):
super(TC_00_Actions, self).setUp()
self.app = qubes.tests.devices.TestApp()
self.vm1 = qubes.tests.devices.TestVM(self.app, 'vm1')
self.vm2 = qubes.tests.devices.TestVM(self.app, 'vm2')
self.device = self.vm2.device
def test_000_list_all(self):
args = TestNamespace(self.app)
with qubes.tests.tools.StdoutBuffer() as buf:
qubes.tools.qvm_device.list_devices(args)
self.assertEventFired(self.vm1,
'device-list:testclass')
self.assertEventFired(self.vm2,
'device-list:testclass')
self.assertEventNotFired(self.vm1,
'device-list-attached:testclass')
self.assertEventNotFired(self.vm2,
'device-list-attached:testclass')
self.assertEqual(
buf.getvalue(),
'vm1:testdev Description \n'
'vm2:testdev Description \n'
)
def test_001_list_one(self):
args = TestNamespace(self.app, [self.vm1])
# simulate attach
self.vm2.device.frontend_domain = self.vm1
self.vm1.devices['testclass']._set.add(self.device)
with qubes.tests.tools.StdoutBuffer() as buf:
qubes.tools.qvm_device.list_devices(args)
self.assertEventFired(self.vm1,
'device-list-attached:testclass')
self.assertEventNotFired(self.vm1,
'device-list:testclass')
self.assertEventNotFired(self.vm2,
'device-list:testclass')
self.assertEventNotFired(self.vm2,
'device-list-attached:testclass')
self.assertEqual(
buf.getvalue(),
'vm2:testdev Description vm1\n'
)
def test_002_list_one_non_persistent(self):
args = TestNamespace(self.app, [self.vm1])
# simulate attach
self.vm2.device.frontend_domain = self.vm1
with qubes.tests.tools.StdoutBuffer() as buf:
qubes.tools.qvm_device.list_devices(args)
self.assertEventFired(self.vm1,
'device-list-attached:testclass')
self.assertEventNotFired(self.vm1,
'device-list:testclass')
self.assertEventNotFired(self.vm2,
'device-list:testclass')
self.assertEventNotFired(self.vm2,
'device-list-attached:testclass')
self.assertEqual(
buf.getvalue(),
'vm2:testdev Description vm1\n'
)
def test_010_attach(self):
args = TestNamespace(
self.app,
[self.vm1],
self.device
)
qubes.tools.qvm_device.attach_device(args)
self.assertEventFired(self.vm1,
'device-attach:testclass', [self.device])
self.assertEventNotFired(self.vm2,
'device-attach:testclass', [self.device])
def test_011_double_attach(self):
args = TestNamespace(
self.app,
[self.vm1],
self.device
)
qubes.tools.qvm_device.attach_device(args)
with self.assertRaises(qubes.exc.QubesException):
qubes.tools.qvm_device.attach_device(args)
def test_020_detach(self):
args = TestNamespace(
self.app,
[self.vm1],
self.device
)
# simulate attach
self.vm2.device.frontend_domain = self.vm1
self.vm1.devices['testclass']._set.add(self.device)
qubes.tools.qvm_device.detach_device(args)
def test_021_detach_not_attached(self):
args = TestNamespace(
self.app,
[self.vm1],
self.device
)
with self.assertRaises(qubes.exc.QubesException):
qubes.tools.qvm_device.detach_device(args)

View File

@ -31,6 +31,18 @@ import qubes.vm
import qubes.tests import qubes.tests
class TestVMM(object):
def __init__(self):
super(TestVMM, self).__init__()
self.offline_mode = True
class TestApp(object):
def __init__(self):
super(TestApp, self).__init__()
self.domains = {}
self.vmm = TestVMM()
class TestVM(qubes.vm.BaseVM): class TestVM(qubes.vm.BaseVM):
qid = qubes.property('qid', type=int) qid = qubes.property('qid', type=int)
@ -66,7 +78,7 @@ class TC_10_BaseVM(qubes.tests.QubesTestCase):
</features> </features>
<devices class="pci"> <devices class="pci">
<device>00:11.22</device> <device backend-domain="domain1" id="00:11.22"/>
</devices> </devices>
<devices class="usb" /> <devices class="usb" />
@ -81,7 +93,8 @@ class TC_10_BaseVM(qubes.tests.QubesTestCase):
def test_000_load(self): def test_000_load(self):
node = self.xml.xpath('//domain')[0] node = self.xml.xpath('//domain')[0]
vm = TestVM(None, node) vm = TestVM(TestApp(), node)
vm.app.domains['domain1'] = vm
vm.load_properties(load_stage=None) vm.load_properties(load_stage=None)
vm.load_extras() vm.load_extras()
@ -97,7 +110,8 @@ class TC_10_BaseVM(qubes.tests.QubesTestCase):
}) })
self.assertItemsEqual(vm.devices.keys(), ('pci',)) self.assertItemsEqual(vm.devices.keys(), ('pci',))
self.assertItemsEqual(vm.devices['pci'], ('00:11.22',)) self.assertItemsEqual(list(vm.devices['pci'].attached(persistent=True)),
[qubes.ext.pci.PCIDevice(vm, '00:11.22')])
self.assertXMLIsValid(vm.__xml__(), 'domain.rng') self.assertXMLIsValid(vm.__xml__(), 'domain.rng')

View File

@ -525,6 +525,13 @@ def print_table(table):
cmd = ['column', '-t', '-s', unit_separator] cmd = ['column', '-t', '-s', unit_separator]
text_table = '\n'.join([unit_separator.join(row) for row in table]) text_table = '\n'.join([unit_separator.join(row) for row in table])
p = subprocess.Popen(cmd, stdin=subprocess.PIPE) # for tests...
p.stdin.write(text_table) if sys.stdout != sys.__stdout__:
p.communicate() p = subprocess.Popen(cmd + ['-c', '80'], stdin=subprocess.PIPE,
stdout=subprocess.PIPE)
p.stdin.write(text_table)
(out, _) = p.communicate()
sys.stdout.write(out)
else:
p = subprocess.Popen(cmd, stdin=subprocess.PIPE)
p.communicate(text_table)

203
qubes/tools/qvm_device.py Normal file
View File

@ -0,0 +1,203 @@
#!/usr/bin/python2
# coding=utf-8
# pylint: disable=C,R
#
# The Qubes OS Project, http://www.qubes-os.org
#
# Copyright (C) 2016 Bahtiar `kalkin-` Gadimov <bahtiar@gadimov.de>
# Copyright (C) 2016 Marek Marczykowski-Górecki
# <marmarek@invisiblethingslab.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
'''Qubes volume and block device managment'''
from __future__ import print_function
import argparse
import os
import sys
import qubes
import qubes.devices
import qubes.exc
import qubes.tools
def prepare_table(dev_list):
''' Converts a list of :py:class:`qubes.devices.DeviceInfo` objects to a
list of tupples for the :py:func:`qubes.tools.print_table`.
If :program:`qvm-devices` is running in a TTY, it will ommit duplicate
data.
:param list dev_list: List of :py:class:`qubes.devices.DeviceInfo`
objects.
:returns: list of tupples
'''
output = []
header = []
if sys.stdout.isatty():
header += [('BACKEND:DEVID', 'DESCRIPTION', 'USED BY')] # NOQA
for dev in dev_list:
output += [(
"{!s}:{!s}".format(dev.backend_domain, dev.ident),
dev.description,
str(dev.frontend_domain) if dev.frontend_domain else "",
)]
return header + sorted(output)
def list_devices(args):
''' Called by the parser to execute the qubes-devices list
subcommand. '''
app = args.app
result = []
if hasattr(args, 'domains') and args.domains:
for domain in args.domains:
result.extend(domain.devices[args.devclass].attached())
else:
for backend in app.domains:
result.extend(backend.devices[args.devclass])
qubes.tools.print_table(prepare_table(result))
def attach_device(args):
''' Called by the parser to execute the :program:`qvm-devices attach`
subcommand.
'''
device = args.device
vm = args.domains[0]
vm.devices[args.devclass].attach(device)
def detach_device(args):
''' Called by the parser to execute the :program:`qvm-devices detach`
subcommand.
'''
device = args.device
vm = args.domains[0]
vm.devices[args.devclass].detach(device)
def init_list_parser(sub_parsers):
''' Configures the parser for the :program:`qvm-devices list` subcommand '''
# pylint: disable=protected-access
list_parser = sub_parsers.add_parser('list', aliases=('ls', 'l'),
help='list devices')
vm_name_group = qubes.tools.VmNameGroup(
list_parser, required=False, vm_action=qubes.tools.VmNameAction,
help='list devices assigned to specific domain(s)')
list_parser._mutually_exclusive_groups.append(vm_name_group)
list_parser.set_defaults(func=list_devices)
class DeviceAction(qubes.tools.QubesAction):
''' Action for argument parser that gets the
:py:class:``qubes.storage.Volume`` from a POOL_NAME:VOLUME_ID string.
'''
# pylint: disable=too-few-public-methods
def __init__(self, help='A domain & device id combination',
required=True, allow_unknown=False, **kwargs):
# pylint: disable=redefined-builtin
super(DeviceAction, self).__init__(help=help, required=required,
**kwargs)
self.allow_unknown = allow_unknown
def __call__(self, parser, namespace, values, option_string=None):
''' Set ``namespace.device`` to ``values`` '''
setattr(namespace, self.dest, values)
def parse_qubes_app(self, parser, namespace):
''' Acquire the :py:class:``qubes.devices.DeviceInfo`` object from
``namespace.app``.
'''
assert hasattr(namespace, 'app')
assert hasattr(namespace, 'devclass')
app = namespace.app
devclass = namespace.devclass
try:
backend_name, devid = getattr(namespace, self.dest).split(':', 1)
try:
backend = app.domains[backend_name]
dev = backend.devices[devclass][devid]
if not self.allow_unknown and isinstance(dev,
qubes.devices.UnknownDevice):
parser.error_runtime('no device {!r} in qube {!r}'.format(
backend_name, devid))
except KeyError:
parser.error_runtime('no domain {!r}'.format(backend_name))
except ValueError:
parser.error('expected a domain & device id combination like '
'foo:bar')
def get_parser(device_class=None):
'''Create :py:class:`argparse.ArgumentParser` suitable for
:program:`qvm-block`.
'''
parser = qubes.tools.QubesArgumentParser(description=__doc__, want_app=True)
parser.register('action', 'parsers', qubes.tools.AliasedSubParsersAction)
if device_class:
parser.add_argument('devclass', const=device_class,
action='store_const',
help=argparse.SUPPRESS)
else:
parser.add_argument('devclass', metavar='DEVICE_CLASS', action='store',
help="Device class to manage ('pci', 'usb', etc)")
sub_parsers = parser.add_subparsers(
title='commands',
description="For more information see qvm-device command -h",
dest='command')
init_list_parser(sub_parsers)
attach_parser = sub_parsers.add_parser(
'attach', help="Attach device to domain", aliases=('at', 'a'))
attach_parser.add_argument('VMNAME', action=qubes.tools.RunningVmNameAction)
attach_parser.add_argument(metavar='BACKEND:DEVICE_ID', dest='device',
action=qubes.tools.VolumeAction)
attach_parser.set_defaults(func=detach_device)
detach_parser = sub_parsers.add_parser(
"detach", help="Detach device from domain", aliases=('d', 'dt'))
detach_parser.add_argument('VMNAME', action=qubes.tools.RunningVmNameAction)
detach_parser.add_argument(metavar='BACKEND:DEVICE_ID', dest='device',
action=qubes.tools.VolumeAction)
detach_parser.set_defaults(func=detach_device)
return parser
def main(args=None):
'''Main routine of :program:`qvm-block`.'''
basename = os.path.basename(sys.argv[0])
devclass = None
if basename.startswith('qvm-') and basename != 'qvm-device':
devclass = basename[4:]
args = get_parser(devclass).parse_args(args)
try:
args.func(args)
except qubes.exc.QubesException as e:
print(e.message, file=sys.stderr)
return 1
return 0
if __name__ == '__main__':
sys.exit(main())

View File

@ -198,7 +198,11 @@ class BaseVM(qubes.PropertyHolder):
for parent in self.xml.xpath('./devices'): for parent in self.xml.xpath('./devices'):
devclass = parent.get('class') devclass = parent.get('class')
for node in parent.xpath('./device'): for node in parent.xpath('./device'):
self.devices[devclass].attach(node.text) device = self.devices[devclass].devclass(
self.app.domains[node.get('backend-domain')],
node.get('id')
)
self.devices[devclass].attach(device)
# tags # tags
for node in self.xml.xpath('./tags/tag'): for node in self.xml.xpath('./tags/tag'):
@ -227,9 +231,10 @@ class BaseVM(qubes.PropertyHolder):
for devclass in self.devices: for devclass in self.devices:
devices = lxml.etree.Element('devices') devices = lxml.etree.Element('devices')
devices.set('class', devclass) devices.set('class', devclass)
for device in self.devices[devclass]: for device in self.devices[devclass].attached(persistent=True):
node = lxml.etree.Element('device') node = lxml.etree.Element('device')
node.text = device node.set('backend-domain', device.backend_domain.name)
node.set('id', device.ident)
devices.append(node) devices.append(node)
element.append(devices) element.append(devices)

View File

@ -243,7 +243,7 @@ class QubesVM(qubes.vm.mix.net.NetVMMixin, qubes.vm.BaseVM):
# CORE2: swallowed uses_default_kernelopts # CORE2: swallowed uses_default_kernelopts
kernelopts = qubes.property('kernelopts', type=str, load_stage=4, kernelopts = qubes.property('kernelopts', type=str, load_stage=4,
default=(lambda self: qubes.config.defaults['kernelopts_pcidevs'] default=(lambda self: qubes.config.defaults['kernelopts_pcidevs']
if len(self.devices['pci']) > 0 if list(self.devices['pci'].attached(persistent=True))
else self.template.kernelopts if hasattr(self, 'template') else self.template.kernelopts if hasattr(self, 'template')
else qubes.config.defaults['kernelopts']), else qubes.config.defaults['kernelopts']),
ls_width=30, ls_width=30,
@ -476,6 +476,9 @@ class QubesVM(qubes.vm.mix.net.NetVMMixin, qubes.vm.BaseVM):
self.events_enabled = True self.events_enabled = True
self.fire_event('domain-init') self.fire_event('domain-init')
def __hash__(self):
return self.qid
def __xml__(self): def __xml__(self):
element = super(QubesVM, self).__xml__() element = super(QubesVM, self).__xml__()
@ -585,82 +588,6 @@ class QubesVM(qubes.vm.mix.net.NetVMMixin, qubes.vm.BaseVM):
raise qubes.exc.QubesException( raise qubes.exc.QubesException(
'Failed to reset autostart for VM in systemd') 'Failed to reset autostart for VM in systemd')
@qubes.events.handler('device-pre-attach:pci')
def on_device_pre_attached_pci(self, event, device):
# pylint: disable=unused-argument
if not os.path.exists('/sys/bus/pci/devices/0000:{}'.format(device)):
raise qubes.exc.QubesException(
'Invalid PCI device: {}'.format(device))
if not self.is_running():
return
try:
self.bind_pci_to_pciback(device)
self.libvirt_domain.attachDevice(
self.app.env.get_template('libvirt/devices/pci.xml').render(
device=device))
except subprocess.CalledProcessError as e:
self.log.exception('Failed to attach PCI device {!r} on the fly,'
' changes will be seen after VM restart.'.format(device), e)
@qubes.events.handler('device-pre-detach:pci')
def on_device_pre_detached_pci(self, event, device):
# pylint: disable=unused-argument
if not self.is_running():
return
# this cannot be converted to general API, because there is no
# provision in libvirt for extracting device-side BDF; we need it for
# qubes.DetachPciDevice, which unbinds driver, not to oops the kernel
p = subprocess.Popen(['xl', 'pci-list', str(self.xid)],
stdout=subprocess.PIPE)
result = p.communicate()[0]
m = re.search(r'^(\d+.\d+)\s+0000:{}$'.format(device), result,
flags=re.MULTILINE)
if not m:
self.log.error('Device %s already detached', device)
return
vmdev = m.group(1)
try:
self.run_service('qubes.DetachPciDevice',
user='root', input='00:{}'.format(vmdev))
self.libvirt_domain.detachDevice(
self.app.env.get_template('libvirt/devices/pci.xml').render(
device=device))
except (subprocess.CalledProcessError, libvirt.libvirtError) as e:
self.log.exception('Failed to detach PCI device {!r} on the fly,'
' changes will be seen after VM restart.'.format(device), e)
raise
def bind_pci_to_pciback(self, device):
'''Bind PCI device to pciback driver.
:param qubes.devices.PCIDevice device: device to attach
Devices should be unbound from their normal kernel drivers and bound to
the dummy driver, which allows for attaching them to a domain.
'''
try:
node = self.app.vmm.libvirt_conn.nodeDeviceLookupByName(
device.libvirt_name)
except libvirt.libvirtError as e:
if e.get_error_code() == libvirt.VIR_ERR_NO_NODE_DEVICE:
raise qubes.exc.QubesException(
'PCI device {!r} does not exist (domain {!r})'.format(
device, self.name))
raise
try:
node.dettach()
except libvirt.libvirtError as e:
if e.get_error_code() == libvirt.VIR_ERR_INTERNAL_ERROR:
# allreaddy dettached
pass
else:
raise
# #
# methods for changing domain state # methods for changing domain state
# #
@ -699,10 +626,6 @@ class QubesVM(qubes.vm.mix.net.NetVMMixin, qubes.vm.BaseVM):
qmemman_client = self.request_memory(mem_required) qmemman_client = self.request_memory(mem_required)
# Bind pci devices to pciback driver
for pci in self.devices['pci']:
self.bind_pci_to_pciback(pci)
self.libvirt_domain.createWithFlags(libvirt.VIR_DOMAIN_START_PAUSED) self.libvirt_domain.createWithFlags(libvirt.VIR_DOMAIN_START_PAUSED)
try: try:
@ -802,7 +725,7 @@ class QubesVM(qubes.vm.mix.net.NetVMMixin, qubes.vm.BaseVM):
if not self.is_running() and not self.is_paused(): if not self.is_running() and not self.is_paused():
raise qubes.exc.QubesVMNotRunningError(self) raise qubes.exc.QubesVMNotRunningError(self)
if len(self.devices['pci']) > 0: if list(self.devices['pci'].attached()):
raise qubes.exc.QubesNotImplementedError( raise qubes.exc.QubesNotImplementedError(
'Cannot suspend domain {!r} which has PCI devices attached' 'Cannot suspend domain {!r} which has PCI devices attached'
.format(self.name)) .format(self.name))

View File

@ -206,14 +206,24 @@ the parser will complain about missing combine= attribute on the second <start>.
<oneOrMore> <oneOrMore>
<element name="device"> <element name="device">
<doc:description> <doc:description>
One device. This tag should contain some One device. It's identified by by a pair of
identifier, format of which depends on backend domain and some identifier (device class
particular device class. dependant).
</doc:description> </doc:description>
<!-- TODO: pattern dependent on class! --> <attribute name="backend-domain">
<data type="string"> <doc:description>
<param name="pattern">[0-9a-f]{2}:[0-9a-f]{2}.[0-9a-f]{2}</param> Backend domain name.
</data> </doc:description>
<data type="string">
<param name="pattern">[a-z0-9_]+</param>
</data>
</attribute>
<attribute name="id">
<!-- TODO: pattern dependent on class! -->
<data type="string">
<param name="pattern">[0-9a-f]{2}:[0-9a-f]{2}.[0-9a-f]{2}</param>
</data>
</attribute>
</element> </element>
</oneOrMore> </oneOrMore>
</element> </element>

View File

@ -250,6 +250,7 @@ fi
%{python_sitelib}/qubes/tools/qvm_block.py* %{python_sitelib}/qubes/tools/qvm_block.py*
%{python_sitelib}/qubes/tools/qubes_lvm.py* %{python_sitelib}/qubes/tools/qubes_lvm.py*
%{python_sitelib}/qubes/tools/qvm_create.py* %{python_sitelib}/qubes/tools/qvm_create.py*
%{python_sitelib}/qubes/tools/qvm_device.py*
%{python_sitelib}/qubes/tools/qvm_features.py* %{python_sitelib}/qubes/tools/qvm_features.py*
%{python_sitelib}/qubes/tools/qvm_check.py* %{python_sitelib}/qubes/tools/qvm_check.py*
%{python_sitelib}/qubes/tools/qvm_clone.py* %{python_sitelib}/qubes/tools/qvm_clone.py*
@ -268,6 +269,7 @@ fi
%dir %{python_sitelib}/qubes/ext %dir %{python_sitelib}/qubes/ext
%{python_sitelib}/qubes/ext/__init__.py* %{python_sitelib}/qubes/ext/__init__.py*
%{python_sitelib}/qubes/ext/gui.py* %{python_sitelib}/qubes/ext/gui.py*
%{python_sitelib}/qubes/ext/pci.py*
%{python_sitelib}/qubes/ext/qubesmanager.py* %{python_sitelib}/qubes/ext/qubesmanager.py*
%{python_sitelib}/qubes/ext/r3compatibility.py* %{python_sitelib}/qubes/ext/r3compatibility.py*
@ -297,6 +299,7 @@ fi
%dir %{python_sitelib}/qubes/tests/tools %dir %{python_sitelib}/qubes/tests/tools
%{python_sitelib}/qubes/tests/tools/__init__.py* %{python_sitelib}/qubes/tests/tools/__init__.py*
%{python_sitelib}/qubes/tests/tools/init.py* %{python_sitelib}/qubes/tests/tools/init.py*
%{python_sitelib}/qubes/tests/tools/qvm_device.py*
%{python_sitelib}/qubes/tests/tools/qvm_ls.py* %{python_sitelib}/qubes/tests/tools/qvm_ls.py*
%dir %{python_sitelib}/qubes/tests/int %dir %{python_sitelib}/qubes/tests/int
@ -304,6 +307,7 @@ fi
%{python_sitelib}/qubes/tests/int/backup.py* %{python_sitelib}/qubes/tests/int/backup.py*
%{python_sitelib}/qubes/tests/int/backupcompatibility.py* %{python_sitelib}/qubes/tests/int/backupcompatibility.py*
%{python_sitelib}/qubes/tests/int/basic.py* %{python_sitelib}/qubes/tests/int/basic.py*
%{python_sitelib}/qubes/tests/int/devices_pci.py*
%{python_sitelib}/qubes/tests/int/dispvm.py* %{python_sitelib}/qubes/tests/int/dispvm.py*
%{python_sitelib}/qubes/tests/int/dom0_update.py* %{python_sitelib}/qubes/tests/int/dom0_update.py*
%{python_sitelib}/qubes/tests/int/network.py* %{python_sitelib}/qubes/tests/int/network.py*

View File

@ -40,9 +40,11 @@ if __name__ == '__main__':
'qubes.ext.qubesmanager = qubes.ext.qubesmanager:QubesManager', 'qubes.ext.qubesmanager = qubes.ext.qubesmanager:QubesManager',
'qubes.ext.gui = qubes.ext.gui:GUI', 'qubes.ext.gui = qubes.ext.gui:GUI',
'qubes.ext.r3compatibility = qubes.ext.r3compatibility:R3Compatibility', 'qubes.ext.r3compatibility = qubes.ext.r3compatibility:R3Compatibility',
'qubes.ext.pci = qubes.ext.pci:PCIDeviceExtension',
], ],
'qubes.devices': [ 'qubes.devices': [
'pci = qubes.devices:PCIDevice', 'pci = qubes.ext.pci:PCIDevice',
'testclass = qubes.tests.devices:TestDevice',
], ],
'qubes.storage': [ 'qubes.storage': [
'file = qubes.storage.file:FilePool', 'file = qubes.storage.file:FilePool',

View File

@ -1,4 +1,8 @@
<hostdev type="pci" managed="yes"> <hostdev type="pci" managed="yes"
{% if vm.features.get('pci-no-strict-reset/' + device.ident, False) %}
nostrictreset="yes"
{% endif %}
>
<source> <source>
<address <address
bus="0x{{ device.bus }}" bus="0x{{ device.bus }}"

View File

@ -27,7 +27,8 @@
<viridian/> <viridian/>
{% endif %} {% endif %}
{% if vm.devices['pci'] and vm.features.get('pci-e820-host', True) %} {% if vm.devices['pci'].attached(persistent=True) | list
and vm.features.get('pci-e820-host', True) %}
<xen> <xen>
<e820_host state="on"/> <e820_host state="on"/>
</xen> </xen>
@ -80,7 +81,7 @@
{% endif %} {% endif %}
{% if device.domain %} {% if device.domain %}
<domain name="{{ domain }}" /> <backenddomain name="{{ domain }}" />
{% endif %} {% endif %}
{% if device.script %} {% if device.script %}
@ -90,10 +91,10 @@
{% endfor %} {% endfor %}
{% if vm.netvm %} {% if vm.netvm %}
{% include 'libvirt/devices/net.xml' %} {% include 'libvirt/devices/net.xml' with context %}
{% endif %} {% endif %}
{% for device in vm.devices.pci %} {% for device in vm.devices.pci.attached(persistent=True) %}
{% include 'libvirt/devices/pci.xml' %} {% include 'libvirt/devices/pci.xml' %}
{% endfor %} {% endfor %}