5315bbf8f9
When qubesd returns an name of VM or other object, as part of another call (reading a property, listing devices etc), it's safe to assume that object exists. Do not try to list it, which could be prevented by qrexec policy. This means a VM object would be returned (for example in vm.netvm property), which potentially could not be listed through app.domains collection. This may lead to some corner cases, but generally should ease handling of restricted policy. This does not affect practical information the management VM have access too, as those names are already returned. It's just client side python wrapper that didn't allowed to access them. QubesOS/qubes-issues#5099
310 lines
11 KiB
Python
310 lines
11 KiB
Python
# -*- encoding: utf8 -*-
|
|
#
|
|
# The Qubes OS Project, http://www.qubes-os.org
|
|
#
|
|
# Copyright (C) 2015-2016 Wojtek Porczyk <woju@invisiblethingslab.com>
|
|
# Copyright (C) 2016 Bahtiar `kalkin-` Gadimov <bahtiar@gadimov.de>
|
|
# Copyright (C) 2017 Marek Marczykowski-Górecki
|
|
# <marmarek@invisiblethingslab.com>
|
|
#
|
|
# This program is free software; you can redistribute it and/or modify
|
|
# it under the terms of the GNU Lesser General Public License as published by
|
|
# the Free Software Foundation; either version 2.1 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Lesser General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Lesser General Public License along
|
|
# with this program; if not, see <http://www.gnu.org/licenses/>.
|
|
|
|
"""API for various types of devices.
|
|
|
|
Main concept is that some domain main
|
|
expose (potentially multiple) devices, which can be attached to other domains.
|
|
Devices can be of different classes (like 'pci', 'usb', etc). Each device
|
|
class is implemented by an extension.
|
|
|
|
Devices are identified by pair of (backend domain, `ident`), where `ident` is
|
|
:py:class:`str`.
|
|
"""
|
|
|
|
|
|
class DeviceAssignment(object): # pylint: disable=too-few-public-methods
|
|
""" Maps a device to a frontend_domain. """
|
|
|
|
def __init__(self, backend_domain, ident, options=None, persistent=False,
|
|
frontend_domain=None, devclass=None):
|
|
self.backend_domain = backend_domain
|
|
self.ident = ident
|
|
self.devclass = devclass
|
|
self.options = options or {}
|
|
self.persistent = persistent
|
|
self.frontend_domain = frontend_domain
|
|
|
|
def __repr__(self):
|
|
return "[%s]:%s" % (self.backend_domain, self.ident)
|
|
|
|
def __hash__(self):
|
|
return hash((self.backend_domain, self.ident))
|
|
|
|
def __eq__(self, other):
|
|
if not isinstance(self, other.__class__):
|
|
return NotImplemented
|
|
|
|
return self.backend_domain == other.backend_domain \
|
|
and self.ident == other.ident
|
|
|
|
def clone(self):
|
|
"""Clone object instance"""
|
|
return self.__class__(
|
|
self.backend_domain,
|
|
self.ident,
|
|
self.options,
|
|
self.persistent,
|
|
self.frontend_domain,
|
|
self.devclass,
|
|
)
|
|
|
|
@property
|
|
def device(self):
|
|
"""Get DeviceInfo object corresponding to this DeviceAssignment"""
|
|
return self.backend_domain.devices[self.devclass][self.ident]
|
|
|
|
|
|
class DeviceInfo(object):
|
|
""" Holds all information about a device """
|
|
|
|
# pylint: disable=too-few-public-methods
|
|
def __init__(self, backend_domain, devclass, ident, description=None,
|
|
**kwargs):
|
|
#: domain providing this device
|
|
self.backend_domain = backend_domain
|
|
#: device class
|
|
self.devclass = devclass
|
|
#: device identifier (unique for given domain and device type)
|
|
self.ident = ident
|
|
#: human readable description/name of the device
|
|
self.description = description
|
|
self.data = kwargs
|
|
|
|
def __hash__(self):
|
|
return hash((str(self.backend_domain), self.ident))
|
|
|
|
def __eq__(self, other):
|
|
try:
|
|
return (
|
|
self.devclass == other.devclass and
|
|
self.backend_domain == other.backend_domain and
|
|
self.ident == other.ident
|
|
)
|
|
except AttributeError:
|
|
return False
|
|
|
|
def __str__(self):
|
|
return '{!s}:{!s}'.format(self.backend_domain, self.ident)
|
|
|
|
|
|
class UnknownDevice(DeviceInfo):
|
|
# pylint: disable=too-few-public-methods
|
|
"""Unknown device - for example exposed by domain not running currently"""
|
|
|
|
def __init__(self, backend_domain, devclass, ident, description=None,
|
|
**kwargs):
|
|
if description is None:
|
|
description = "Unknown device"
|
|
super(UnknownDevice, self).__init__(backend_domain, devclass, ident,
|
|
description, **kwargs)
|
|
|
|
|
|
class DeviceCollection(object):
|
|
"""Bag for devices.
|
|
|
|
Used as default value for :py:meth:`DeviceManager.__missing__` factory.
|
|
|
|
:param vm: VM for which we manage devices
|
|
:param class_: device class
|
|
|
|
"""
|
|
|
|
def __init__(self, vm, class_):
|
|
self._vm = vm
|
|
self._class = class_
|
|
self._dev_cache = {}
|
|
|
|
def attach(self, device_assignment):
|
|
"""Attach (add) device to domain.
|
|
|
|
:param DeviceAssignment device_assignment: device object
|
|
"""
|
|
|
|
if not device_assignment.frontend_domain:
|
|
device_assignment.frontend_domain = self._vm
|
|
else:
|
|
assert device_assignment.frontend_domain == self._vm, \
|
|
"Trying to attach DeviceAssignment belonging to other domain"
|
|
if device_assignment.devclass is None:
|
|
device_assignment.devclass = self._class
|
|
else:
|
|
assert device_assignment.devclass == self._class
|
|
|
|
options = device_assignment.options.copy()
|
|
if device_assignment.persistent:
|
|
options['persistent'] = 'True'
|
|
options_str = ' '.join('{}={}'.format(opt, val)
|
|
for opt, val in sorted(options.items()))
|
|
self._vm.qubesd_call(None,
|
|
'admin.vm.device.{}.Attach'.format(self._class),
|
|
'{!s}+{!s}'.format(
|
|
device_assignment.backend_domain,
|
|
device_assignment.ident),
|
|
options_str.encode('utf-8'))
|
|
|
|
def detach(self, device_assignment):
|
|
"""Detach (remove) device from domain.
|
|
|
|
:param DeviceAssignment device_assignment: device to detach
|
|
(obtained from :py:meth:`assignments`)
|
|
"""
|
|
if not device_assignment.frontend_domain:
|
|
device_assignment.frontend_domain = self._vm
|
|
else:
|
|
assert device_assignment.frontend_domain == self._vm, \
|
|
"Trying to detach DeviceAssignment belonging to other domain"
|
|
if device_assignment.devclass is None:
|
|
device_assignment.devclass = self._class
|
|
else:
|
|
assert device_assignment.devclass == self._class
|
|
|
|
self._vm.qubesd_call(None,
|
|
'admin.vm.device.{}.Detach'.format(self._class),
|
|
'{!s}+{!s}'.format(
|
|
device_assignment.backend_domain,
|
|
device_assignment.ident))
|
|
|
|
def assignments(self, persistent=None):
|
|
"""List assignments for devices which are (or may be) attached to the
|
|
vm.
|
|
|
|
Devices may be attached persistently (so they are included in
|
|
:file:`qubes.xml`) or not. Device can also be in :file:`qubes.xml`,
|
|
but be temporarily detached.
|
|
|
|
:param bool persistent: only include devices which are or are not
|
|
attached persistently.
|
|
"""
|
|
|
|
assignments_str = self._vm.qubesd_call(None,
|
|
'admin.vm.device.{}.List'.format(
|
|
self._class)).decode()
|
|
for assignment_str in assignments_str.splitlines():
|
|
device, _, options_all = assignment_str.partition(' ')
|
|
backend_domain, ident = device.split('+', 1)
|
|
options = dict(opt_single.split('=', 1)
|
|
for opt_single in options_all.split(' ') if
|
|
opt_single)
|
|
dev_persistent = (options.pop('persistent', False) in
|
|
['True', 'yes', True])
|
|
if persistent is not None and dev_persistent != persistent:
|
|
continue
|
|
backend_domain = self._vm.app.domains.get_blind(backend_domain)
|
|
yield DeviceAssignment(backend_domain, ident, options,
|
|
persistent=dev_persistent,
|
|
frontend_domain=self._vm,
|
|
devclass=self._class)
|
|
|
|
def attached(self):
|
|
"""List devices which are (or may be) attached to this vm """
|
|
|
|
for assignment in self.assignments():
|
|
yield assignment.device
|
|
|
|
def persistent(self):
|
|
""" Devices persistently attached and safe to access before libvirt
|
|
bootstrap.
|
|
"""
|
|
|
|
for assignment in self.assignments(True):
|
|
yield assignment.device
|
|
|
|
def available(self):
|
|
"""List devices exposed by this vm"""
|
|
devices_str = \
|
|
self._vm.qubesd_call(None,
|
|
'admin.vm.device.{}.Available'.format(
|
|
self._class)).decode()
|
|
for dev_str in devices_str.splitlines():
|
|
ident, _, info = dev_str.partition(' ')
|
|
# description is special that it can contain spaces
|
|
info, _, description = info.partition('description=')
|
|
info_dict = dict(info_single.split('=', 1)
|
|
for info_single in info.split(' ') if info_single)
|
|
yield DeviceInfo(self._vm, self._class, ident,
|
|
description=description,
|
|
**info_dict)
|
|
|
|
def update_persistent(self, device, persistent):
|
|
"""Update `persistent` flag of already attached device.
|
|
|
|
:param DeviceInfo device: device for which change persistent flag
|
|
:param bool persistent: new persistent flag
|
|
"""
|
|
|
|
self._vm.qubesd_call(None,
|
|
'admin.vm.device.{}.Set.persistent'.format(
|
|
self._class),
|
|
'{!s}+{!s}'.format(device.backend_domain,
|
|
device.ident),
|
|
str(persistent).encode('utf-8'))
|
|
|
|
__iter__ = available
|
|
|
|
def clear_cache(self):
|
|
"""Clear cache of available devices"""
|
|
self._dev_cache.clear()
|
|
|
|
def __getitem__(self, item):
|
|
"""Get device object with given ident.
|
|
|
|
:returns: py:class:`DeviceInfo`
|
|
|
|
If domain isn't running, it is impossible to check device validity,
|
|
so return UnknownDevice object. Also do the same for non-existing
|
|
devices - otherwise it will be impossible to detach already
|
|
disconnected device.
|
|
"""
|
|
# fist, check if we have cached device info
|
|
if item in self._dev_cache:
|
|
return self._dev_cache[item]
|
|
# then look for available devices
|
|
for dev in self.available():
|
|
if dev.ident == item:
|
|
self._dev_cache[item] = dev
|
|
return dev
|
|
# if still nothing, return UnknownDevice instance for the reason
|
|
# explained in docstring, but don't cache it
|
|
return UnknownDevice(self._vm, self._class, item)
|
|
|
|
|
|
class DeviceManager(dict):
|
|
"""Device manager that hold all devices by their classes.
|
|
|
|
:param vm: VM for which we manage devices
|
|
"""
|
|
|
|
def __init__(self, vm):
|
|
super(DeviceManager, self).__init__()
|
|
self._vm = vm
|
|
|
|
def __missing__(self, key):
|
|
self[key] = DeviceCollection(self._vm, key)
|
|
return self[key]
|
|
|
|
def __iter__(self):
|
|
return iter(self._vm.app.list_deviceclass())
|
|
|
|
def keys(self):
|
|
return self._vm.app.list_deviceclass()
|