123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536 |
- #
- # The Qubes OS Project, https://www.qubes-os.org/
- #
- # Copyright (C) 2017 Wojtek Porczyk <woju@invisiblethingslab.com>
- #
- # This program is free software; you can redistribute it and/or modify
- # it under the terms of the GNU General Public License as published by
- # the Free Software Foundation; either version 2 of the License, or
- # (at your option) any later version.
- #
- # This program is distributed in the hope that it will be useful,
- # but WITHOUT ANY WARRANTY; without even the implied warranty of
- # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- # GNU General Public License for more details.
- #
- # You should have received a copy of the GNU General Public License along
- # with this program; if not, write to the Free Software Foundation, Inc.,
- # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
- #
- '''
- Qubes OS Management API
- '''
- import asyncio
- import functools
- import string
- import pkg_resources
- import qubes.vm
- import qubes.vm.qubesvm
- import qubes.storage
- class ProtocolError(AssertionError):
- '''Raised when something is wrong with data received'''
- pass
- class PermissionDenied(Exception):
- '''Raised deliberately by handlers when we decide not to cooperate'''
- pass
- def api(name, *, no_payload=False):
- '''Decorator factory for methods intended to appear in API.
- The decorated method can be called from public API using a child of
- :py:class:`AbstractQubesMgmt` class. The method becomes "public", and can be
- called using remote management interface.
- :param str name: qrexec rpc method name
- :param bool no_payload: if :py:obj:`True`, will barf on non-empty payload; \
- also will not pass payload at all to the method
- The expected function method should have one argument (other than usual
- *self*), ``untrusted_payload``, which will contain the payload.
- .. warning::
- This argument has to be named such, to remind the programmer that the
- content of this variable is indeed untrusted.
- If *no_payload* is true, then the method is called with no arguments.
- '''
- # TODO regexp for vm/dev classess; supply regexp groups as untrusted_ kwargs
- def decorator(func):
- if no_payload:
- # the following assignment is needed for how closures work in Python
- _func = func
- @functools.wraps(_func)
- def wrapper(self, untrusted_payload):
- if untrusted_payload != b'':
- raise ProtocolError('unexpected payload')
- return _func(self)
- func = wrapper
- func._rpcname = name # pylint: disable=protected-access
- return func
- return decorator
- class AbstractQubesMgmt(object):
- '''Common code for Qubes Management Protocol handling
- Different interfaces can expose different API call sets, however they share
- common protocol and common implementation framework. This class is the
- latter.
- To implement a new interface, inherit from this class and write at least one
- method and decorate it with :py:func:`api` decorator. It will have access to
- pre-defined attributes: :py:attr:`app`, :py:attr:`src`, :py:attr:`dest`,
- :py:attr:`arg` and :py:attr:`method`.
- There are also two helper functions for firing events associated with API
- calls.
- '''
- def __init__(self, app, src, method, dest, arg):
- #: :py:class:`qubes.Qubes` object
- self.app = app
- #: source qube
- self.src = self.app.domains[src.decode('ascii')]
- #: destination qube
- self.dest = self.app.domains[dest.decode('ascii')]
- #: argument
- self.arg = arg.decode('ascii')
- #: name of the method
- self.method = method.decode('ascii')
- untrusted_candidates = []
- for attr in dir(self):
- untrusted_func = getattr(self, attr)
- if not callable(untrusted_func):
- continue
- try:
- # pylint: disable=protected-access
- if untrusted_func._rpcname != self.method:
- continue
- except AttributeError:
- continue
- untrusted_candidates.append(untrusted_func)
- if not untrusted_candidates:
- raise ProtocolError('no such method: {!r}'.format(self.method))
- assert len(untrusted_candidates) == 1, \
- 'multiple candidates for method {!r}'.format(self.method)
- #: the method to execute
- self.execute = untrusted_candidates[0]
- del untrusted_candidates
- def fire_event_for_permission(self, **kwargs):
- '''Fire an event on the source qube to check for permission'''
- return self.src.fire_event_pre('mgmt-permission:{}'.format(self.method),
- dest=self.dest, arg=self.arg, **kwargs)
- def fire_event_for_filter(self, iterable, **kwargs):
- '''Fire an event on the source qube to filter for permission'''
- for selector in self.fire_event_for_permission(**kwargs):
- iterable = filter(selector, iterable)
- return iterable
- class QubesMgmt(AbstractQubesMgmt):
- '''Implementation of Qubes Management API calls
- This class contains all the methods available in the main API.
- .. seealso::
- https://www.qubes-os.org/doc/mgmt1/
- '''
- @api('mgmt.vmclass.List', no_payload=True)
- @asyncio.coroutine
- def vmclass_list(self):
- '''List all VM classes'''
- assert not self.arg
- assert self.dest.name == 'dom0'
- entrypoints = self.fire_event_for_filter(
- pkg_resources.iter_entry_points(qubes.vm.VM_ENTRY_POINT))
- return ''.join('{}\n'.format(ep.name)
- for ep in entrypoints)
- @api('mgmt.vm.List', no_payload=True)
- @asyncio.coroutine
- def vm_list(self):
- '''List all the domains'''
- assert not self.arg
- if self.dest.name == 'dom0':
- domains = self.fire_event_for_filter(self.app.domains)
- else:
- domains = self.fire_event_for_filter([self.dest])
- return ''.join('{} class={} state={}\n'.format(
- vm.name,
- vm.__class__.__name__,
- vm.get_power_state())
- for vm in sorted(domains))
- @api('mgmt.vm.property.List', no_payload=True)
- @asyncio.coroutine
- def vm_property_list(self):
- '''List all properties on a qube'''
- assert not self.arg
- properties = self.fire_event_for_filter(self.dest.property_list())
- return ''.join('{}\n'.format(prop.__name__) for prop in properties)
- @api('mgmt.vm.property.Get', no_payload=True)
- @asyncio.coroutine
- def vm_property_get(self):
- '''Get a value of one property'''
- assert self.arg in self.dest.property_list()
- self.fire_event_for_permission()
- property_def = self.dest.property_get_def(self.arg)
- # explicit list to be sure that it matches protocol spec
- if isinstance(property_def, qubes.vm.VMProperty):
- property_type = 'vm'
- elif property_def.type is int:
- property_type = 'int'
- elif property_def.type is bool:
- property_type = 'bool'
- elif self.arg == 'label':
- property_type = 'label'
- else:
- property_type = 'str'
- try:
- value = getattr(self.dest, self.arg)
- except AttributeError:
- return 'default=True type={} '.format(property_type)
- else:
- return 'default={} type={} {}'.format(
- str(self.dest.property_is_default(self.arg)),
- property_type,
- str(value) if value is not None else '')
- @api('mgmt.vm.property.Set')
- @asyncio.coroutine
- def vm_property_set(self, untrusted_payload):
- assert self.arg in self.dest.property_list()
- property_def = self.dest.property_get_def(self.arg)
- newvalue = property_def.sanitize(untrusted_newvalue=untrusted_payload)
- self.fire_event_for_permission(newvalue=newvalue)
- setattr(self.dest, self.arg, newvalue)
- self.app.save()
- @api('mgmt.vm.property.Help', no_payload=True)
- @asyncio.coroutine
- def vm_property_help(self):
- '''Get help for one property'''
- assert self.arg in self.dest.property_list()
- self.fire_event_for_permission()
- try:
- doc = self.dest.property_get_def(self.arg).__doc__
- except AttributeError:
- return ''
- return qubes.utils.format_doc(doc)
- @api('mgmt.vm.property.Reset', no_payload=True)
- @asyncio.coroutine
- def vm_property_reset(self):
- '''Reset a property to a default value'''
- assert self.arg in self.dest.property_list()
- self.fire_event_for_permission()
- delattr(self.dest, self.arg)
- self.app.save()
- @api('mgmt.vm.volume.List', no_payload=True)
- @asyncio.coroutine
- def vm_volume_list(self):
- assert not self.arg
- volume_names = self.fire_event_for_filter(self.dest.volumes.keys())
- return ''.join('{}\n'.format(name) for name in volume_names)
- @api('mgmt.vm.volume.Info', no_payload=True)
- @asyncio.coroutine
- def vm_volume_info(self):
- assert self.arg in self.dest.volumes.keys()
- self.fire_event_for_permission()
- volume = self.dest.volumes[self.arg]
- # properties defined in API
- volume_properties = [
- 'pool', 'vid', 'size', 'usage', 'rw', 'internal', 'source',
- 'save_on_stop', 'snap_on_start']
- return ''.join('{}={}\n'.format(key, getattr(volume, key)) for key in
- volume_properties)
- @api('mgmt.vm.volume.ListSnapshots', no_payload=True)
- @asyncio.coroutine
- def vm_volume_listsnapshots(self):
- assert self.arg in self.dest.volumes.keys()
- volume = self.dest.volumes[self.arg]
- revisions = [revision for revision in volume.revisions]
- revisions = self.fire_event_for_filter(revisions)
- return ''.join('{}\n'.format(revision) for revision in revisions)
- @api('mgmt.vm.volume.Revert')
- @asyncio.coroutine
- def vm_volume_revert(self, untrusted_payload):
- assert self.arg in self.dest.volumes.keys()
- untrusted_revision = untrusted_payload.decode('ascii').strip()
- del untrusted_payload
- volume = self.dest.volumes[self.arg]
- snapshots = volume.revisions
- assert untrusted_revision in snapshots
- revision = untrusted_revision
- self.fire_event_for_permission(revision=revision)
- self.dest.storage.get_pool(volume).revert(revision)
- self.app.save()
- @api('mgmt.vm.volume.Resize')
- @asyncio.coroutine
- def vm_volume_resize(self, untrusted_payload):
- assert self.arg in self.dest.volumes.keys()
- untrusted_size = untrusted_payload.decode('ascii').strip()
- del untrusted_payload
- assert untrusted_size.isdigit() # only digits, forbid '-' too
- assert len(untrusted_size) <= 20 # limit to about 2^64
- size = int(untrusted_size)
- self.fire_event_for_permission(size=size)
- self.dest.storage.resize(self.arg, size)
- self.app.save()
- @api('mgmt.pool.List', no_payload=True)
- @asyncio.coroutine
- def pool_list(self):
- assert not self.arg
- assert self.dest.name == 'dom0'
- pools = self.fire_event_for_filter(self.app.pools)
- return ''.join('{}\n'.format(pool) for pool in pools)
- @api('mgmt.pool.ListDrivers', no_payload=True)
- @asyncio.coroutine
- def pool_listdrivers(self):
- assert self.dest.name == 'dom0'
- assert not self.arg
- drivers = self.fire_event_for_filter(qubes.storage.pool_drivers())
- return ''.join('{} {}\n'.format(
- driver,
- ' '.join(qubes.storage.driver_parameters(driver)))
- for driver in drivers)
- @api('mgmt.pool.Info', no_payload=True)
- @asyncio.coroutine
- def pool_info(self):
- assert self.dest.name == 'dom0'
- assert self.arg in self.app.pools.keys()
- pool = self.app.pools[self.arg]
- self.fire_event_for_permission(pool=pool)
- return ''.join('{}={}\n'.format(prop, val)
- for prop, val in sorted(pool.config.items()))
- @api('mgmt.pool.Add')
- @asyncio.coroutine
- def pool_add(self, untrusted_payload):
- assert self.dest.name == 'dom0'
- drivers = qubes.storage.pool_drivers()
- assert self.arg in drivers
- untrusted_pool_config = untrusted_payload.decode('ascii').splitlines()
- del untrusted_payload
- assert all(('=' in line) for line in untrusted_pool_config)
- # pairs of (option, value)
- untrusted_pool_config = [line.split('=', 1)
- for line in untrusted_pool_config]
- # reject duplicated options
- assert len(set(x[0] for x in untrusted_pool_config)) == \
- len([x[0] for x in untrusted_pool_config])
- # and convert to dict
- untrusted_pool_config = dict(untrusted_pool_config)
- assert 'name' in untrusted_pool_config
- untrusted_pool_name = untrusted_pool_config.pop('name')
- allowed_chars = string.ascii_letters + string.digits + '-_.'
- assert all(c in allowed_chars for c in untrusted_pool_name)
- pool_name = untrusted_pool_name
- assert pool_name not in self.app.pools
- driver_parameters = qubes.storage.driver_parameters(self.arg)
- assert all(key in driver_parameters for key in untrusted_pool_config)
- pool_config = untrusted_pool_config
- self.fire_event_for_permission(name=pool_name,
- pool_config=pool_config)
- self.app.add_pool(name=pool_name, driver=self.arg, **pool_config)
- self.app.save()
- @api('mgmt.pool.Remove', no_payload=True)
- @asyncio.coroutine
- def pool_remove(self):
- assert self.dest.name == 'dom0'
- assert self.arg in self.app.pools.keys()
- self.fire_event_for_permission()
- self.app.remove_pool(self.arg)
- self.app.save()
- @api('mgmt.label.List', no_payload=True)
- @asyncio.coroutine
- def label_list(self):
- assert self.dest.name == 'dom0'
- assert not self.arg
- labels = self.fire_event_for_filter(self.app.labels.values())
- return ''.join('{}\n'.format(label.name) for label in labels)
- @api('mgmt.label.Get', no_payload=True)
- @asyncio.coroutine
- def label_get(self):
- assert self.dest.name == 'dom0'
- try:
- label = self.app.get_label(self.arg)
- except KeyError:
- raise qubes.exc.QubesValueError
- self.fire_event_for_permission(label=label)
- return label.color
- @api('mgmt.label.Create')
- @asyncio.coroutine
- def label_create(self, untrusted_payload):
- assert self.dest.name == 'dom0'
- # don't confuse label name with label index
- assert not self.arg.isdigit()
- allowed_chars = string.ascii_letters + string.digits + '-_.'
- assert all(c in allowed_chars for c in self.arg)
- try:
- self.app.get_label(self.arg)
- except KeyError:
- # ok, no such label yet
- pass
- else:
- raise qubes.exc.QubesValueError('label already exists')
- untrusted_payload = untrusted_payload.decode('ascii').strip()
- assert len(untrusted_payload) == 8
- assert untrusted_payload.startswith('0x')
- # besides prefix, only hex digits are allowed
- assert all(x in string.hexdigits for x in untrusted_payload[2:])
- # SEE: #2732
- color = untrusted_payload
- self.fire_event_for_permission(color=color)
- # allocate new index, but make sure it's outside of default labels set
- new_index = max(
- qubes.config.max_default_label, *self.app.labels.keys()) + 1
- label = qubes.Label(new_index, color, self.arg)
- self.app.labels[new_index] = label
- self.app.save()
- @api('mgmt.label.Remove', no_payload=True)
- @asyncio.coroutine
- def label_remove(self):
- assert self.dest.name == 'dom0'
- try:
- label = self.app.get_label(self.arg)
- except KeyError:
- raise qubes.exc.QubesValueError
- # don't allow removing default labels
- assert label.index > qubes.config.max_default_label
- # FIXME: this should be in app.add_label()
- for vm in self.app.domains:
- if vm.label == label:
- raise qubes.exc.QubesException('label still in use')
- self.fire_event_for_permission(label=label)
- del self.app.labels[label.index]
- self.app.save()
- @api('mgmt.vm.Start', no_payload=True)
- @asyncio.coroutine
- def vm_start(self):
- assert not self.arg
- self.fire_event_for_permission()
- yield from self.dest.start()
- @api('mgmt.vm.Shutdown', no_payload=True)
- @asyncio.coroutine
- def vm_shutdown(self):
- assert not self.arg
- self.fire_event_for_permission()
- yield from self.dest.shutdown()
- @api('mgmt.vm.Pause', no_payload=True)
- @asyncio.coroutine
- def vm_pause(self):
- assert not self.arg
- self.fire_event_for_permission()
- yield from self.dest.pause()
- @api('mgmt.vm.Unpause', no_payload=True)
- @asyncio.coroutine
- def vm_unpause(self):
- assert not self.arg
- self.fire_event_for_permission()
- yield from self.dest.unpause()
- @api('mgmt.vm.Kill', no_payload=True)
- @asyncio.coroutine
- def vm_kill(self):
- assert not self.arg
- self.fire_event_for_permission()
- yield from self.dest.kill()
|