admin: add methods for global properties

QubesOS/qubes-issues#2622
This commit is contained in:
Marek Marczykowski-Górecki 2017-05-13 02:09:12 +02:00
parent 0160d6e2f0
commit 68a426f0ba
No known key found for this signature in database
GPG Key ID: 063938BA42CFA724
2 changed files with 107 additions and 12 deletions

View File

@ -105,9 +105,19 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI):
@asyncio.coroutine
def vm_property_list(self):
'''List all properties on a qube'''
return self._property_list(self.dest)
@qubes.api.method('admin.property.List', no_payload=True)
@asyncio.coroutine
def property_list(self):
'''List all global properties'''
assert self.dest.name == 'dom0'
return self._property_list(self.app)
def _property_list(self, dest):
assert not self.arg
properties = self.fire_event_for_filter(self.dest.property_list())
properties = self.fire_event_for_filter(dest.property_list())
return ''.join('{}\n'.format(prop.__name__) for prop in properties)
@ -115,11 +125,21 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI):
@asyncio.coroutine
def vm_property_get(self):
'''Get a value of one property'''
assert self.arg in self.dest.property_list()
return self._property_get(self.dest)
@qubes.api.method('admin.property.Get', no_payload=True)
@asyncio.coroutine
def property_get(self):
'''Get a value of one global property'''
assert self.dest.name == 'dom0'
return self._property_get(self.app)
def _property_get(self, dest):
assert self.arg in dest.property_list()
self.fire_event_for_permission()
property_def = self.dest.property_get_def(self.arg)
property_def = dest.property_get_def(self.arg)
# explicit list to be sure that it matches protocol spec
if isinstance(property_def, qubes.vm.VMProperty):
property_type = 'vm'
@ -133,38 +153,61 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI):
property_type = 'str'
try:
value = getattr(self.dest, self.arg)
value = getattr(dest, self.arg)
except AttributeError:
return 'default=True type={} '.format(property_type)
else:
return 'default={} type={} {}'.format(
str(self.dest.property_is_default(self.arg)),
str(dest.property_is_default(self.arg)),
property_type,
str(value) if value is not None else '')
@qubes.api.method('admin.vm.property.Set')
@asyncio.coroutine
def vm_property_set(self, untrusted_payload):
assert self.arg in self.dest.property_list()
'''Set property value'''
return self._property_set(self.dest,
untrusted_payload=untrusted_payload)
property_def = self.dest.property_get_def(self.arg)
@qubes.api.method('admin.property.Set')
@asyncio.coroutine
def property_set(self, untrusted_payload):
'''Set property value'''
assert self.dest.name == 'dom0'
return self._property_set(self.app,
untrusted_payload=untrusted_payload)
def _property_set(self, dest, untrusted_payload):
assert self.arg in dest.property_list()
property_def = dest.property_get_def(self.arg)
newvalue = property_def.sanitize(untrusted_newvalue=untrusted_payload)
self.fire_event_for_permission(newvalue=newvalue)
setattr(self.dest, self.arg, newvalue)
setattr(dest, self.arg, newvalue)
self.app.save()
@qubes.api.method('admin.vm.property.Help', no_payload=True)
@asyncio.coroutine
def vm_property_help(self):
'''Get help for one property'''
assert self.arg in self.dest.property_list()
return self._property_help(self.dest)
@qubes.api.method('admin.property.Help', no_payload=True)
@asyncio.coroutine
def property_help(self):
'''Get help for one property'''
assert self.dest.name == 'dom0'
return self._property_help(self.app)
def _property_help(self, dest):
assert self.arg in dest.property_list()
self.fire_event_for_permission()
try:
doc = self.dest.property_get_def(self.arg).__doc__
doc = dest.property_get_def(self.arg).__doc__
except AttributeError:
return ''
@ -174,11 +217,21 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI):
@asyncio.coroutine
def vm_property_reset(self):
'''Reset a property to a default value'''
assert self.arg in self.dest.property_list()
return self._property_reset(self.dest)
@qubes.api.method('admin.property.Reset', no_payload=True)
@asyncio.coroutine
def property_reset(self):
'''Reset a property to a default value'''
assert self.dest.name == 'dom0'
return self._property_reset(self.app)
def _property_reset(self, dest):
assert self.arg in dest.property_list()
self.fire_event_for_permission()
delattr(self.dest, self.arg)
delattr(dest, self.arg)
self.app.save()
@qubes.api.method('admin.vm.volume.List', no_payload=True)

View File

@ -1284,6 +1284,48 @@ class TC_00_VMs(AdminAPITestCase):
self.assertFalse(self.app.save.called)
def test_400_property_list(self):
# actual function tested for admin.vm.property.* already
# this test is kind of stupid, but at least check if appropriate
# mgmt-permission event is fired
value = self.call_mgmt_func(b'admin.property.List', b'dom0')
properties = self.app.property_list()
self.assertEqual(value,
''.join('{}\n'.format(prop.__name__) for prop in properties))
def test_410_property_get_str(self):
# actual function tested for admin.vm.property.* already
value = self.call_mgmt_func(b'admin.property.Get', b'dom0',
b'default_kernel')
self.assertEqual(value, 'default=False type=str 1.0')
def test_420_propert_set_str(self):
# actual function tested for admin.vm.property.* already
with unittest.mock.patch('qubes.property.__set__') as mock:
value = self.call_mgmt_func(b'admin.property.Set', b'dom0',
b'default_kernel', b'1.0')
self.assertIsNone(value)
mock.assert_called_once_with(self.app, '1.0')
self.app.save.assert_called_once_with()
def test_440_property_help(self):
# actual function tested for admin.vm.property.* already
value = self.call_mgmt_func(b'admin.property.Help', b'dom0',
b'clockvm')
self.assertEqual(value,
'Which VM to use as NTP proxy for updating AdminVM')
self.assertFalse(self.app.save.called)
def test_450_property_reset(self):
# actual function tested for admin.vm.property.* already
with unittest.mock.patch('qubes.property.__delete__') as mock:
value = self.call_mgmt_func(b'admin.property.Reset', b'dom0',
b'clockvm')
mock.assert_called_with(self.app)
self.assertIsNone(value)
self.app.save.assert_called_once_with()
def test_990_vm_unexpected_payload(self):
methods_with_no_payload = [
b'admin.vm.List',