From 68a426f0ba631c6347444fef42cd71ce104c093f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marek=20Marczykowski-G=C3=B3recki?= Date: Sat, 13 May 2017 02:09:12 +0200 Subject: [PATCH] admin: add methods for global properties QubesOS/qubes-issues#2622 --- qubes/api/admin.py | 77 +++++++++++++++++++++++++++++++++------- qubes/tests/api_admin.py | 42 ++++++++++++++++++++++ 2 files changed, 107 insertions(+), 12 deletions(-) diff --git a/qubes/api/admin.py b/qubes/api/admin.py index 6f20fb90..2716fbd0 100644 --- a/qubes/api/admin.py +++ b/qubes/api/admin.py @@ -105,9 +105,19 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI): @asyncio.coroutine def vm_property_list(self): '''List all properties on a qube''' + return self._property_list(self.dest) + + @qubes.api.method('admin.property.List', no_payload=True) + @asyncio.coroutine + def property_list(self): + '''List all global properties''' + assert self.dest.name == 'dom0' + return self._property_list(self.app) + + def _property_list(self, dest): assert not self.arg - properties = self.fire_event_for_filter(self.dest.property_list()) + properties = self.fire_event_for_filter(dest.property_list()) return ''.join('{}\n'.format(prop.__name__) for prop in properties) @@ -115,11 +125,21 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI): @asyncio.coroutine def vm_property_get(self): '''Get a value of one property''' - assert self.arg in self.dest.property_list() + return self._property_get(self.dest) + + @qubes.api.method('admin.property.Get', no_payload=True) + @asyncio.coroutine + def property_get(self): + '''Get a value of one global property''' + assert self.dest.name == 'dom0' + return self._property_get(self.app) + + def _property_get(self, dest): + assert self.arg in dest.property_list() self.fire_event_for_permission() - property_def = self.dest.property_get_def(self.arg) + property_def = dest.property_get_def(self.arg) # explicit list to be sure that it matches protocol spec if isinstance(property_def, qubes.vm.VMProperty): property_type = 'vm' @@ -133,38 +153,61 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI): property_type = 'str' try: - value = getattr(self.dest, self.arg) + value = getattr(dest, self.arg) except AttributeError: return 'default=True type={} '.format(property_type) else: return 'default={} type={} {}'.format( - str(self.dest.property_is_default(self.arg)), + str(dest.property_is_default(self.arg)), property_type, str(value) if value is not None else '') @qubes.api.method('admin.vm.property.Set') @asyncio.coroutine def vm_property_set(self, untrusted_payload): - assert self.arg in self.dest.property_list() + '''Set property value''' + return self._property_set(self.dest, + untrusted_payload=untrusted_payload) - property_def = self.dest.property_get_def(self.arg) + @qubes.api.method('admin.property.Set') + @asyncio.coroutine + def property_set(self, untrusted_payload): + '''Set property value''' + assert self.dest.name == 'dom0' + return self._property_set(self.app, + untrusted_payload=untrusted_payload) + + def _property_set(self, dest, untrusted_payload): + assert self.arg in dest.property_list() + + property_def = dest.property_get_def(self.arg) newvalue = property_def.sanitize(untrusted_newvalue=untrusted_payload) self.fire_event_for_permission(newvalue=newvalue) - setattr(self.dest, self.arg, newvalue) + setattr(dest, self.arg, newvalue) self.app.save() @qubes.api.method('admin.vm.property.Help', no_payload=True) @asyncio.coroutine def vm_property_help(self): '''Get help for one property''' - assert self.arg in self.dest.property_list() + return self._property_help(self.dest) + + @qubes.api.method('admin.property.Help', no_payload=True) + @asyncio.coroutine + def property_help(self): + '''Get help for one property''' + assert self.dest.name == 'dom0' + return self._property_help(self.app) + + def _property_help(self, dest): + assert self.arg in dest.property_list() self.fire_event_for_permission() try: - doc = self.dest.property_get_def(self.arg).__doc__ + doc = dest.property_get_def(self.arg).__doc__ except AttributeError: return '' @@ -174,11 +217,21 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI): @asyncio.coroutine def vm_property_reset(self): '''Reset a property to a default value''' - assert self.arg in self.dest.property_list() + return self._property_reset(self.dest) + + @qubes.api.method('admin.property.Reset', no_payload=True) + @asyncio.coroutine + def property_reset(self): + '''Reset a property to a default value''' + assert self.dest.name == 'dom0' + return self._property_reset(self.app) + + def _property_reset(self, dest): + assert self.arg in dest.property_list() self.fire_event_for_permission() - delattr(self.dest, self.arg) + delattr(dest, self.arg) self.app.save() @qubes.api.method('admin.vm.volume.List', no_payload=True) diff --git a/qubes/tests/api_admin.py b/qubes/tests/api_admin.py index 3c60d8a8..afb8e9d6 100644 --- a/qubes/tests/api_admin.py +++ b/qubes/tests/api_admin.py @@ -1284,6 +1284,48 @@ class TC_00_VMs(AdminAPITestCase): self.assertFalse(self.app.save.called) + + def test_400_property_list(self): + # actual function tested for admin.vm.property.* already + # this test is kind of stupid, but at least check if appropriate + # mgmt-permission event is fired + value = self.call_mgmt_func(b'admin.property.List', b'dom0') + properties = self.app.property_list() + self.assertEqual(value, + ''.join('{}\n'.format(prop.__name__) for prop in properties)) + + def test_410_property_get_str(self): + # actual function tested for admin.vm.property.* already + value = self.call_mgmt_func(b'admin.property.Get', b'dom0', + b'default_kernel') + self.assertEqual(value, 'default=False type=str 1.0') + + def test_420_propert_set_str(self): + # actual function tested for admin.vm.property.* already + with unittest.mock.patch('qubes.property.__set__') as mock: + value = self.call_mgmt_func(b'admin.property.Set', b'dom0', + b'default_kernel', b'1.0') + self.assertIsNone(value) + mock.assert_called_once_with(self.app, '1.0') + self.app.save.assert_called_once_with() + + def test_440_property_help(self): + # actual function tested for admin.vm.property.* already + value = self.call_mgmt_func(b'admin.property.Help', b'dom0', + b'clockvm') + self.assertEqual(value, + 'Which VM to use as NTP proxy for updating AdminVM') + self.assertFalse(self.app.save.called) + + def test_450_property_reset(self): + # actual function tested for admin.vm.property.* already + with unittest.mock.patch('qubes.property.__delete__') as mock: + value = self.call_mgmt_func(b'admin.property.Reset', b'dom0', + b'clockvm') + mock.assert_called_with(self.app) + self.assertIsNone(value) + self.app.save.assert_called_once_with() + def test_990_vm_unexpected_payload(self): methods_with_no_payload = [ b'admin.vm.List',