From 75ac611c90562052667b8aec4373f8b8d500d597 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marek=20Marczykowski-G=C3=B3recki?= Date: Mon, 15 May 2017 14:42:36 +0200 Subject: [PATCH] admin: admin.vm.device.* methods QubesOS/qubes-issues#2622 --- qubes/api/admin.py | 131 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 131 insertions(+) diff --git a/qubes/api/admin.py b/qubes/api/admin.py index 3d9dc977..33f1a161 100644 --- a/qubes/api/admin.py +++ b/qubes/api/admin.py @@ -25,9 +25,11 @@ Qubes OS Management API import asyncio import string +import itertools import pkg_resources import qubes.api +import qubes.devices import qubes.storage import qubes.utils import qubes.vm @@ -738,3 +740,132 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI): del self.app.domains[dst_vm] raise self.app.save() + + @qubes.api.method('admin.vm.device.{endpoint}.Available', endpoints=(ep.name + for ep in pkg_resources.iter_entry_points('qubes.devices')), + no_payload=True) + @asyncio.coroutine + def vm_device_available(self, endpoint): + devclass = endpoint + devices = self.dest.devices[devclass].available() + if self.arg: + devices = [dev for dev in devices if dev.ident == self.arg] + # no duplicated devices, but device may not exists, in which case + # the list is empty + assert len(devices) <= 1 + devices = self.fire_event_for_filter(devices, devclass=devclass) + + dev_info = {} + for dev in devices: + non_default_attrs = set(attr for attr in dir(dev) if + not attr.startswith('_')).difference(( + 'backend_domain', 'ident', 'frontend_domain', + 'description', 'options', 'data')) + properties_txt = ' '.join( + '{}={!s}'.format(prop, value) for prop, value + in itertools.chain( + ((key, getattr(dev, key)) for key in non_default_attrs), + dev.data.items(), + # keep description as the last one, according to API + # specification + (('description', dev.description),) + )) + assert '\n' not in properties_txt + dev_info[dev.ident] = properties_txt + + return ''.join('{} {}\n'.format(ident, dev_info[ident]) + for ident in sorted(dev_info)) + + @qubes.api.method('admin.vm.device.{endpoint}.List', endpoints=(ep.name + for ep in pkg_resources.iter_entry_points('qubes.devices')), + no_payload=True) + @asyncio.coroutine + def vm_device_list(self, endpoint): + devclass = endpoint + device_assignments = self.dest.devices[devclass].assignments() + if self.arg: + select_backend, select_ident = self.arg.split('+', 1) + device_assignments = [dev for dev in device_assignments + if (str(dev.backend_domain), dev.ident) + == (select_backend, select_ident)] + # no duplicated devices, but device may not exists, in which case + # the list is empty + assert len(device_assignments) <= 1 + device_assignments = self.fire_event_for_filter(device_assignments, + devclass=devclass) + + dev_info = {} + for dev in device_assignments: + properties_txt = ' '.join( + '{}={!s}'.format(opt, value) for opt, value + in itertools.chain( + dev.options.items(), + (('persistent', 'yes' if dev.persistent else 'no'),) + )) + assert '\n' not in properties_txt + ident = '{!s}+{!s}'.format(dev.backend_domain, dev.ident) + dev_info[ident] = properties_txt + + return ''.join('{} {}\n'.format(ident, dev_info[ident]) + for ident in sorted(dev_info)) + + @qubes.api.method('admin.vm.device.{endpoint}.Attach', endpoints=(ep.name + for ep in pkg_resources.iter_entry_points('qubes.devices'))) + @asyncio.coroutine + def vm_device_attach(self, endpoint, untrusted_payload): + devclass = endpoint + options = {} + persistent = False + for untrusted_option in untrusted_payload.decode('ascii').split(): + try: + untrusted_key, untrusted_value = untrusted_option.split('=', 1) + except ValueError: + raise qubes.api.ProtocolError('Invalid options format') + if untrusted_key == 'persistent': + persistent = qubes.property.bool(None, None, untrusted_value) + else: + allowed_chars_key = string.digits + string.ascii_letters + '-_.' + allowed_chars_value = allowed_chars_key + ',+:' + if any(x not in allowed_chars_key for x in untrusted_key): + raise qubes.api.ProtocolError( + 'Invalid chars in option name') + if any(x not in allowed_chars_value for x in untrusted_value): + raise qubes.api.ProtocolError( + 'Invalid chars in option value') + options[untrusted_key] = untrusted_value + + # qrexec already verified that no strange characters are in self.arg + backend_domain, ident = self.arg.split('+', 1) + # may raise KeyError, either on domain or ident + dev = self.app.domains[backend_domain].devices[devclass][ident] + + self.fire_event_for_permission(device=dev, + devclass=devclass, persistent=persistent, + options=options) + + assignment = qubes.devices.DeviceAssignment(dev.backend_domain, + dev.ident, options=options, persistent=persistent) + self.dest.devices[devclass].attach(assignment) + self.app.save() + + @qubes.api.method('admin.vm.device.{endpoint}.Detach', endpoints=(ep.name + for ep in pkg_resources.iter_entry_points('qubes.devices')), + no_payload=True) + @asyncio.coroutine + def vm_device_detach(self, endpoint): + devclass = endpoint + + # qrexec already verified that no strange characters are in self.arg + backend_domain, ident = self.arg.split('+', 1) + # may raise KeyError; if device isn't found, it will be UnknownDevice + # instance - but allow it, otherwise it will be impossible to detach + # already removed device + dev = self.app.domains[backend_domain].devices[devclass][ident] + + self.fire_event_for_permission(device=dev, + devclass=devclass) + + assignment = qubes.devices.DeviceAssignment(dev.backend_domain, + dev.ident) + self.dest.devices[devclass].detach(assignment) + self.app.save()