Quellcode durchsuchen

mgmt: vm.features handling (mgmt.vm.feature.*)

Actual implementation and tests.

QubesOS/qubes-issues#2622
Marek Marczykowski-Górecki vor 7 Jahren
Ursprung
Commit
0b737b3a55
3 geänderte Dateien mit 148 neuen und 1 gelöschten Zeilen
  1. 9 0
      qubes/exc.py
  2. 54 0
      qubes/mgmt.py
  3. 85 1
      qubes/tests/mgmt.py

+ 9 - 0
qubes/exc.py

@@ -142,3 +142,12 @@ class QubesMemoryError(QubesException, MemoryError):
         super(QubesMemoryError, self).__init__(
             msg or 'Not enough memory to start domain {!r}'.format(vm.name))
         self.vm = vm
+
+
+class QubesFeatureNotFoundError(QubesException, KeyError):
+    '''Feature not set for a given domain'''
+    def __init__(self, domain, feature):
+        super(QubesFeatureNotFoundError, self).__init__(
+            'Feature not set for domain {}: {}'.format(domain, feature))
+        self.feature = feature
+        self.vm = domain

+ 54 - 0
qubes/mgmt.py

@@ -595,3 +595,57 @@ class QubesMgmt(AbstractQubesMgmt):
         if self.dest.name == 'dom0':
             type(self.app).remove_handler('*', handler)
         qubes.vm.BaseVM.remove_handler('*', handler)
+
+    @api('mgmt.vm.feature.List', no_payload=True)
+    @asyncio.coroutine
+    def vm_feature_list(self):
+        assert not self.arg
+        features = self.fire_event_for_filter(self.dest.features.keys())
+        return ''.join('{}\n'.format(feature) for feature in features)
+
+    @api('mgmt.vm.feature.Get', no_payload=True)
+    @asyncio.coroutine
+    def vm_feature_get(self):
+        # validation of self.arg done by qrexec-policy is enough
+
+        self.fire_event_for_permission()
+        try:
+            value = self.dest.features[self.arg]
+        except KeyError:
+            raise qubes.exc.QubesFeatureNotFoundError(self.dest, self.arg)
+        return value
+
+    @api('mgmt.vm.feature.CheckWithTemplate', no_payload=True)
+    @asyncio.coroutine
+    def vm_feature_checkwithtemplate(self):
+        # validation of self.arg done by qrexec-policy is enough
+
+        self.fire_event_for_permission()
+        try:
+            value = self.dest.features.check_with_template(self.arg)
+        except KeyError:
+            raise qubes.exc.QubesFeatureNotFoundError(self.dest, self.arg)
+        return value
+
+    @api('mgmt.vm.feature.Remove', no_payload=True)
+    @asyncio.coroutine
+    def vm_feature_remove(self):
+        # validation of self.arg done by qrexec-policy is enough
+
+        self.fire_event_for_permission()
+        try:
+            del self.dest.features[self.arg]
+        except KeyError:
+            raise qubes.exc.QubesFeatureNotFoundError(self.dest, self.arg)
+        self.app.save()
+
+    @api('mgmt.vm.feature.Set')
+    @asyncio.coroutine
+    def vm_feature_set(self, untrusted_payload):
+        # validation of self.arg done by qrexec-policy is enough
+        value = untrusted_payload.decode('ascii', errors='strict')
+        del untrusted_payload
+
+        self.fire_event_for_permission(value=value)
+        self.dest.features[self.arg] = value
+        self.app.save()

+ 85 - 1
qubes/tests/mgmt.py

@@ -43,7 +43,8 @@ class MgmtTestCase(qubes.tests.QubesTestCase):
         app.load_initial_values()
         app.default_kernel = '1.0'
         app.default_netvm = None
-        app.add_new_vm('TemplateVM', label='black', name='test-template')
+        self.template = app.add_new_vm('TemplateVM', label='black',
+            name='test-template')
         app.default_template = 'test-template'
         app.save = unittest.mock.Mock()
         self.vm = app.add_new_vm('AppVM', label='red', name='test-vm1',
@@ -861,6 +862,79 @@ class TC_00_VMs(MgmtTestCase):
                 unittest.mock.call(self.vm, 'test-event', arg1='abc')
             ])
 
+    def test_280_feature_list(self):
+        self.vm.features['test-feature'] = 'some-value'
+        value = self.call_mgmt_func(b'mgmt.vm.feature.List', b'test-vm1')
+        self.assertEqual(value, 'test-feature\n')
+        self.assertFalse(self.app.save.called)
+
+    def test_290_feature_get(self):
+        self.vm.features['test-feature'] = 'some-value'
+        value = self.call_mgmt_func(b'mgmt.vm.feature.Get', b'test-vm1',
+            b'test-feature')
+        self.assertEqual(value, 'some-value')
+        self.assertFalse(self.app.save.called)
+
+    def test_291_feature_get_none(self):
+        with self.assertRaises(qubes.exc.QubesFeatureNotFoundError):
+            self.call_mgmt_func(b'mgmt.vm.feature.Get',
+                b'test-vm1', b'test-feature')
+        self.assertFalse(self.app.save.called)
+
+    def test_300_feature_remove(self):
+        self.vm.features['test-feature'] = 'some-value'
+        value = self.call_mgmt_func(b'mgmt.vm.feature.Remove', b'test-vm1',
+            b'test-feature')
+        self.assertIsNone(value, None)
+        self.assertNotIn('test-feature', self.vm.features)
+        self.assertTrue(self.app.save.called)
+
+    def test_301_feature_remove_none(self):
+        with self.assertRaises(qubes.exc.QubesFeatureNotFoundError):
+            self.call_mgmt_func(b'mgmt.vm.feature.Remove',
+                b'test-vm1', b'test-feature')
+        self.assertFalse(self.app.save.called)
+
+    def test_310_feature_checkwithtemplate(self):
+        self.vm.features['test-feature'] = 'some-value'
+        value = self.call_mgmt_func(b'mgmt.vm.feature.CheckWithTemplate',
+            b'test-vm1', b'test-feature')
+        self.assertEqual(value, 'some-value')
+        self.assertFalse(self.app.save.called)
+
+    def test_311_feature_checkwithtemplate_tpl(self):
+        self.template.features['test-feature'] = 'some-value'
+        value = self.call_mgmt_func(b'mgmt.vm.feature.CheckWithTemplate',
+            b'test-vm1', b'test-feature')
+        self.assertEqual(value, 'some-value')
+        self.assertFalse(self.app.save.called)
+
+    def test_312_feature_checkwithtemplate_none(self):
+        with self.assertRaises(qubes.exc.QubesFeatureNotFoundError):
+            self.call_mgmt_func(b'mgmt.vm.feature.CheckWithTemplate',
+                b'test-vm1', b'test-feature')
+        self.assertFalse(self.app.save.called)
+
+    def test_320_feature_set(self):
+        value = self.call_mgmt_func(b'mgmt.vm.feature.Set',
+            b'test-vm1', b'test-feature', b'some-value')
+        self.assertIsNone(value)
+        self.assertEqual(self.vm.features['test-feature'], 'some-value')
+        self.assertTrue(self.app.save.called)
+
+    def test_321_feature_set_empty(self):
+        value = self.call_mgmt_func(b'mgmt.vm.feature.Set',
+            b'test-vm1', b'test-feature', b'')
+        self.assertIsNone(value)
+        self.assertEqual(self.vm.features['test-feature'], '')
+        self.assertTrue(self.app.save.called)
+
+    def test_320_feature_set_invalid(self):
+        with self.assertRaises(UnicodeDecodeError):
+            self.call_mgmt_func(b'mgmt.vm.feature.Set',
+                b'test-vm1', b'test-feature', b'\x02\x03\xffsome-value')
+        self.assertNotIn('test-feature', self.vm.features)
+        self.assertFalse(self.app.save.called)
 
     def test_990_vm_unexpected_payload(self):
         methods_with_no_payload = [
@@ -898,6 +972,10 @@ class TC_00_VMs(MgmtTestCase):
             b'mgmt.vm.Unpause',
             b'mgmt.vm.Kill',
             b'mgmt.Events',
+            b'mgmt.vm.feature.List',
+            b'mgmt.vm.feature.Get',
+            b'mgmt.vm.feature.Remove',
+            b'mgmt.vm.feature.CheckWithTemplate',
         ]
         # make sure also no methods on actual VM gets called
         vm_mock = unittest.mock.MagicMock()
@@ -943,6 +1021,7 @@ class TC_00_VMs(MgmtTestCase):
             b'mgmt.vm.Unpause',
             b'mgmt.vm.Kill',
             b'mgmt.Events',
+            b'mgmt.vm.feature.List',
         ]
         # make sure also no methods on actual VM gets called
         vm_mock = unittest.mock.MagicMock()
@@ -1152,6 +1231,11 @@ class TC_00_VMs(MgmtTestCase):
             b'mgmt.vm.Pause',
             b'mgmt.vm.Unpause',
             b'mgmt.vm.Kill',
+            b'mgmt.vm.feature.List',
+            b'mgmt.vm.feature.Get',
+            b'mgmt.vm.feature.Set',
+            b'mgmt.vm.feature.Remove',
+            b'mgmt.vm.feature.CheckWithTemplate',
         ]
         # make sure also no methods on actual VM gets called
         vm_mock = unittest.mock.MagicMock()