admin: admin.vm.device.* methods

QubesOS/qubes-issues#2622
This commit is contained in:
Marek Marczykowski-Górecki 2017-05-15 14:42:36 +02:00
parent f3c7fb8129
commit 75ac611c90
No known key found for this signature in database
GPG Key ID: 063938BA42CFA724

View File

@ -25,9 +25,11 @@ Qubes OS Management API
import asyncio import asyncio
import string import string
import itertools
import pkg_resources import pkg_resources
import qubes.api import qubes.api
import qubes.devices
import qubes.storage import qubes.storage
import qubes.utils import qubes.utils
import qubes.vm import qubes.vm
@ -738,3 +740,132 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI):
del self.app.domains[dst_vm] del self.app.domains[dst_vm]
raise raise
self.app.save() self.app.save()
@qubes.api.method('admin.vm.device.{endpoint}.Available', endpoints=(ep.name
for ep in pkg_resources.iter_entry_points('qubes.devices')),
no_payload=True)
@asyncio.coroutine
def vm_device_available(self, endpoint):
devclass = endpoint
devices = self.dest.devices[devclass].available()
if self.arg:
devices = [dev for dev in devices if dev.ident == self.arg]
# no duplicated devices, but device may not exists, in which case
# the list is empty
assert len(devices) <= 1
devices = self.fire_event_for_filter(devices, devclass=devclass)
dev_info = {}
for dev in devices:
non_default_attrs = set(attr for attr in dir(dev) if
not attr.startswith('_')).difference((
'backend_domain', 'ident', 'frontend_domain',
'description', 'options', 'data'))
properties_txt = ' '.join(
'{}={!s}'.format(prop, value) for prop, value
in itertools.chain(
((key, getattr(dev, key)) for key in non_default_attrs),
dev.data.items(),
# keep description as the last one, according to API
# specification
(('description', dev.description),)
))
assert '\n' not in properties_txt
dev_info[dev.ident] = properties_txt
return ''.join('{} {}\n'.format(ident, dev_info[ident])
for ident in sorted(dev_info))
@qubes.api.method('admin.vm.device.{endpoint}.List', endpoints=(ep.name
for ep in pkg_resources.iter_entry_points('qubes.devices')),
no_payload=True)
@asyncio.coroutine
def vm_device_list(self, endpoint):
devclass = endpoint
device_assignments = self.dest.devices[devclass].assignments()
if self.arg:
select_backend, select_ident = self.arg.split('+', 1)
device_assignments = [dev for dev in device_assignments
if (str(dev.backend_domain), dev.ident)
== (select_backend, select_ident)]
# no duplicated devices, but device may not exists, in which case
# the list is empty
assert len(device_assignments) <= 1
device_assignments = self.fire_event_for_filter(device_assignments,
devclass=devclass)
dev_info = {}
for dev in device_assignments:
properties_txt = ' '.join(
'{}={!s}'.format(opt, value) for opt, value
in itertools.chain(
dev.options.items(),
(('persistent', 'yes' if dev.persistent else 'no'),)
))
assert '\n' not in properties_txt
ident = '{!s}+{!s}'.format(dev.backend_domain, dev.ident)
dev_info[ident] = properties_txt
return ''.join('{} {}\n'.format(ident, dev_info[ident])
for ident in sorted(dev_info))
@qubes.api.method('admin.vm.device.{endpoint}.Attach', endpoints=(ep.name
for ep in pkg_resources.iter_entry_points('qubes.devices')))
@asyncio.coroutine
def vm_device_attach(self, endpoint, untrusted_payload):
devclass = endpoint
options = {}
persistent = False
for untrusted_option in untrusted_payload.decode('ascii').split():
try:
untrusted_key, untrusted_value = untrusted_option.split('=', 1)
except ValueError:
raise qubes.api.ProtocolError('Invalid options format')
if untrusted_key == 'persistent':
persistent = qubes.property.bool(None, None, untrusted_value)
else:
allowed_chars_key = string.digits + string.ascii_letters + '-_.'
allowed_chars_value = allowed_chars_key + ',+:'
if any(x not in allowed_chars_key for x in untrusted_key):
raise qubes.api.ProtocolError(
'Invalid chars in option name')
if any(x not in allowed_chars_value for x in untrusted_value):
raise qubes.api.ProtocolError(
'Invalid chars in option value')
options[untrusted_key] = untrusted_value
# qrexec already verified that no strange characters are in self.arg
backend_domain, ident = self.arg.split('+', 1)
# may raise KeyError, either on domain or ident
dev = self.app.domains[backend_domain].devices[devclass][ident]
self.fire_event_for_permission(device=dev,
devclass=devclass, persistent=persistent,
options=options)
assignment = qubes.devices.DeviceAssignment(dev.backend_domain,
dev.ident, options=options, persistent=persistent)
self.dest.devices[devclass].attach(assignment)
self.app.save()
@qubes.api.method('admin.vm.device.{endpoint}.Detach', endpoints=(ep.name
for ep in pkg_resources.iter_entry_points('qubes.devices')),
no_payload=True)
@asyncio.coroutine
def vm_device_detach(self, endpoint):
devclass = endpoint
# qrexec already verified that no strange characters are in self.arg
backend_domain, ident = self.arg.split('+', 1)
# may raise KeyError; if device isn't found, it will be UnknownDevice
# instance - but allow it, otherwise it will be impossible to detach
# already removed device
dev = self.app.domains[backend_domain].devices[devclass][ident]
self.fire_event_for_permission(device=dev,
devclass=devclass)
assignment = qubes.devices.DeviceAssignment(dev.backend_domain,
dev.ident)
self.dest.devices[devclass].detach(assignment)
self.app.save()