mgmt: implement storage-related methods

QubesOS/qubes-issues#2622
This commit is contained in:
Marek Marczykowski-Górecki 2017-03-09 02:42:05 +01:00
parent 417cb6e912
commit fb7bd6823a
No known key found for this signature in database
GPG Key ID: 063938BA42CFA724
2 changed files with 161 additions and 0 deletions

View File

@ -24,8 +24,10 @@ Qubes OS Management API
import asyncio import asyncio
import reprlib import reprlib
import string
import qubes.vm.qubesvm import qubes.vm.qubesvm
import qubes.storage
class ProtocolRepr(reprlib.Repr): class ProtocolRepr(reprlib.Repr):
@ -194,3 +196,153 @@ class QubesMgmt(object):
self.fire_event_for_permission() self.fire_event_for_permission()
delattr(self.dest, self.arg) delattr(self.dest, self.arg)
@asyncio.coroutine
def vm_volume_list(self, untrusted_payload):
assert not self.arg
assert not untrusted_payload
del untrusted_payload
volume_names = self.fire_event_for_filter(self.dest.volumes.keys())
return ''.join('{}\n'.format(name) for name in volume_names)
@asyncio.coroutine
def vm_volume_info(self, untrusted_payload):
assert self.arg in self.dest.volumes.keys()
assert not untrusted_payload
del untrusted_payload
self.fire_event_for_permission()
volume = self.dest.volumes[self.arg]
# properties defined in API
volume_properties = [
'pool', 'vid', 'size', 'usage', 'rw', 'internal', 'source',
'save_on_stop', 'snap_on_start']
return ''.join('{}={}\n'.format(key, getattr(volume, key)) for key in
volume_properties)
@asyncio.coroutine
def vm_volume_listsnapshots(self, untrusted_payload):
assert self.arg in self.dest.volumes.keys()
assert not untrusted_payload
del untrusted_payload
self.fire_event_for_permission()
volume = self.dest.volumes[self.arg]
return ''.join('{}\n'.format(revision) for revision in volume.revisions)
@asyncio.coroutine
def vm_volume_revert(self, untrusted_payload):
assert self.arg in self.dest.volumes.keys()
untrusted_revision = untrusted_payload.decode('ascii').strip()
del untrusted_payload
volume = self.dest.volumes[self.arg]
snapshots = volume.revisions
assert untrusted_revision in snapshots
revision = untrusted_revision
self.fire_event_for_permission(revision=revision)
self.dest.storage.get_pool(volume).revert(revision)
@asyncio.coroutine
def vm_volume_resize(self, untrusted_payload):
assert self.arg in self.dest.volumes.keys()
untrusted_size = untrusted_payload.decode('ascii').strip()
del untrusted_payload
assert untrusted_size.isdigit() # only digits, forbid '-' too
assert len(untrusted_size) <= 20 # limit to about 2^64
size = int(untrusted_size)
self.fire_event_for_permission(size=size)
self.dest.storage.resize(self.arg, size)
@asyncio.coroutine
def pool_list(self, untrusted_payload):
assert not self.arg
assert self.dest.name == 'dom0'
assert not untrusted_payload
del untrusted_payload
pools = self.fire_event_for_filter(self.app.pools)
return ''.join('{}\n'.format(pool) for pool in pools)
@asyncio.coroutine
def pool_listdrivers(self, untrusted_payload):
assert self.dest.name == 'dom0'
assert not self.arg
assert not untrusted_payload
del untrusted_payload
drivers = self.fire_event_for_filter(qubes.storage.pool_drivers())
return ''.join('{} {}\n'.format(
driver,
' '.join(qubes.storage.driver_parameters(driver)))
for driver in drivers)
@asyncio.coroutine
def pool_info(self, untrusted_payload):
assert self.dest.name == 'dom0'
assert self.arg in self.app.pools.keys()
assert not untrusted_payload
del untrusted_payload
pool = self.app.pools[self.arg]
self.fire_event_for_permission(pool=pool)
return ''.join('{}={}\n'.format(prop, val)
for prop, val in sorted(pool.config.items()))
@asyncio.coroutine
def pool_add(self, untrusted_payload):
assert self.dest.name == 'dom0'
drivers = qubes.storage.pool_drivers()
assert self.arg in drivers
untrusted_pool_config = untrusted_payload.decode('ascii').splitlines()
del untrusted_payload
assert all(('=' in line) for line in untrusted_pool_config)
# pairs of (option, value)
untrusted_pool_config = [line.split('=', 1)
for line in untrusted_pool_config]
# reject duplicated options
assert len(set(x[0] for x in untrusted_pool_config)) == \
len([x[0] for x in untrusted_pool_config])
# and convert to dict
untrusted_pool_config = dict(untrusted_pool_config)
assert 'name' in untrusted_pool_config
untrusted_pool_name = untrusted_pool_config.pop('name')
allowed_chars = string.ascii_letters + string.digits + '-_.'
assert all(c in allowed_chars for c in untrusted_pool_name)
pool_name = untrusted_pool_name
assert pool_name not in self.app.pools
driver_parameters = qubes.storage.driver_parameters(self.arg)
assert all(key in driver_parameters for key in untrusted_pool_config)
# option names validated, validation of option values is delegated to
# extension (through events mechanism)
self.fire_event_for_permission(name=pool_name,
untrusted_pool_config=untrusted_pool_config)
pool_config = untrusted_pool_config
self.app.add_pool(name=pool_name, driver=self.arg, **pool_config)
@asyncio.coroutine
def pool_remove(self, untrusted_payload):
assert self.dest.name == 'dom0'
assert self.arg in self.app.pools.keys()
assert not untrusted_payload
del untrusted_payload
self.fire_event_for_permission()
self.app.remove_pool(self.arg)

View File

@ -635,6 +635,15 @@ def pool_drivers():
for ep in pkg_resources.iter_entry_points(STORAGE_ENTRY_POINT)] for ep in pkg_resources.iter_entry_points(STORAGE_ENTRY_POINT)]
def driver_parameters(name):
''' Get __init__ parameters from a driver with out `self` & `name`. '''
init_function = qubes.utils.get_entry_point_one(
qubes.storage.STORAGE_ENTRY_POINT, name).__init__
params = init_function.func_code.co_varnames
ignored_params = ['self', 'name']
return [p for p in params if p not in ignored_params]
def isodate(seconds=time.time()): def isodate(seconds=time.time()):
''' Helper method which returns an iso date ''' ''' Helper method which returns an iso date '''
return datetime.utcfromtimestamp(seconds).isoformat("T") return datetime.utcfromtimestamp(seconds).isoformat("T")