diff --git a/qubes/tests/mgmt.py b/qubes/tests/mgmt.py index 4168e54c..8c44b9e3 100644 --- a/qubes/tests/mgmt.py +++ b/qubes/tests/mgmt.py @@ -79,14 +79,6 @@ class TC_00_VMs(MgmtTestCase): self.assertEqual(value, 'test-vm1 class=AppVM state=Halted\n') - def test_002_vm_list_unexpected_arg(self): - with self.assertRaises(AssertionError): - self.call_mgmt_func(b'mgmt.vm.List', b'dom0', b'test-vm1', b'') - - def test_003_vm_list_unexpected_payload(self): - with self.assertRaises(AssertionError): - self.call_mgmt_func(b'mgmt.vm.List', b'dom0', b'', b'test-vm1') - def test_010_vm_property_list(self): # this test is kind of stupid, but at least check if appropriate # mgmt-permission event is fired @@ -257,13 +249,6 @@ class TC_00_VMs(MgmtTestCase): 'padlock is set.') self.assertFalse(self.app.save.called) - def test_051_vm_property_help_unexpected_payload(self): - with self.assertRaises(AssertionError): - self.call_mgmt_func(b'mgmt.vm.property.Help', b'test-vm1', - b'label', b'asdasd') - - self.assertFalse(self.app.save.called) - def test_052_vm_property_help_invalid_property(self): with self.assertRaises(AssertionError): self.call_mgmt_func(b'mgmt.vm.property.Help', b'test-vm1', @@ -279,14 +264,6 @@ class TC_00_VMs(MgmtTestCase): self.assertIsNone(value) self.app.save.assert_called_once_with() - def test_061_vm_property_reset_unexpected_payload(self): - with unittest.mock.patch('qubes.property.__delete__') as mock: - with self.assertRaises(AssertionError): - self.call_mgmt_func(b'mgmt.vm.property.Help', b'test-vm1', - b'label', b'asdasd') - self.assertFalse(mock.called) - self.assertFalse(self.app.save.called) - def test_062_vm_property_reset_invalid_property(self): with unittest.mock.patch('qubes.property.__delete__') as mock: with self.assertRaises(AssertionError): @@ -294,3 +271,327 @@ class TC_00_VMs(MgmtTestCase): b'no-such-property') self.assertFalse(mock.called) self.assertFalse(self.app.save.called) + + def test_990_vm_unexpected_payload(self): + methods_with_no_payload = [ + b'mgmt.vm.List', + b'mgmt.vm.Remove', + b'mgmt.vm.property.List', + b'mgmt.vm.property.Get', + b'mgmt.vm.property.Help', + b'mgmt.vm.property.HelpRst', + b'mgmt.vm.property.Reset', + b'mgmt.vm.feature.List', + b'mgmt.vm.feature.Get', + b'mgmt.vm.feature.CheckWithTemplate', + b'mgmt.vm.feature.Remove', + b'mgmt.vm.tag.List', + b'mgmt.vm.tag.Get', + b'mgmt.vm.tag.Remove', + b'mgmt.vm.tag.Set', + b'mgmt.vm.firewall.Get', + b'mgmt.vm.firewall.RemoveRule', + b'mgmt.vm.firewall.Flush', + b'mgmt.vm.device.pci.Attach', + b'mgmt.vm.device.pci.Detach', + b'mgmt.vm.device.pci.List', + b'mgmt.vm.device.pci.Available', + b'mgmt.vm.microphone.Attach', + b'mgmt.vm.microphone.Detach', + b'mgmt.vm.microphone.Status', + b'mgmt.vm.volume.ListSnapshots', + b'mgmt.vm.volume.List', + b'mgmt.vm.volume.Info', + b'mgmt.vm.Start', + b'mgmt.vm.Shutdown', + b'mgmt.vm.Pause', + b'mgmt.vm.Unpause', + b'mgmt.vm.Kill', + ] + # make sure also no methods on actual VM gets called + vm_mock = unittest.mock.MagicMock() + vm_mock.name = self.vm.name + vm_mock.qid = self.vm.qid + vm_mock.__lt__ = (lambda x, y: x.qid < y.qid) + self.app.domains._dict[self.vm.qid] = vm_mock + for method in methods_with_no_payload: + # should reject payload regardless of having argument or not + with self.subTest(method.decode('ascii')): + with self.assertRaises(AssertionError): + self.call_mgmt_func(method, b'test-vm1', b'', + b'unexpected-payload') + self.assertFalse(vm_mock.called) + self.assertFalse(self.app.save.called) + + with self.subTest(method.decode('ascii') + '+arg'): + with self.assertRaises(AssertionError): + self.call_mgmt_func(method, b'test-vm1', b'some-arg', + b'unexpected-payload') + self.assertFalse(vm_mock.called) + self.assertFalse(self.app.save.called) + + def test_991_vm_unexpected_argument(self): + methods_with_no_argument = [ + b'mgmt.vm.List', + b'mgmt.vm.Clone', + b'mgmt.vm.Remove', + b'mgmt.vm.property.List', + b'mgmt.vm.feature.List', + b'mgmt.vm.tag.List', + b'mgmt.vm.firewall.List', + b'mgmt.vm.firewall.Flush', + b'mgmt.vm.device.pci.List', + b'mgmt.vm.device.pci.Available', + b'mgmt.vm.microphone.Attach', + b'mgmt.vm.microphone.Detach', + b'mgmt.vm.microphone.Status', + b'mgmt.vm.volume.List', + b'mgmt.vm.Start', + b'mgmt.vm.Shutdown', + b'mgmt.vm.Pause', + b'mgmt.vm.Unpause', + b'mgmt.vm.Kill', + ] + # make sure also no methods on actual VM gets called + vm_mock = unittest.mock.MagicMock() + vm_mock.name = self.vm.name + vm_mock.qid = self.vm.qid + vm_mock.__lt__ = (lambda x, y: x.qid < y.qid) + self.app.domains._dict[self.vm.qid] = vm_mock + for method in methods_with_no_argument: + # should reject argument regardless of having payload or not + with self.subTest(method.decode('ascii')): + with self.assertRaises(AssertionError): + self.call_mgmt_func(method, b'test-vm1', b'some-arg', + b'') + self.assertFalse(vm_mock.called) + self.assertFalse(self.app.save.called) + + with self.subTest(method.decode('ascii') + '+payload'): + with self.assertRaises(AssertionError): + self.call_mgmt_func(method, b'test-vm1', b'unexpected-arg', + b'some-payload') + self.assertFalse(vm_mock.called) + self.assertFalse(self.app.save.called) + + def test_992_dom0_unexpected_payload(self): + methods_with_no_payload = [ + b'mgmt.vmclass.List', + b'mgmt.vm.List', + b'mgmt.label.List', + b'mgmt.label.Get', + b'mgmt.label.Remove', + b'mgmt.property.List', + b'mgmt.property.Get', + b'mgmt.property.Help', + b'mgmt.property.HelpRst', + b'mgmt.property.Reset', + b'mgmt.pool.List', + b'mgmt.pool.ListDrivers', + b'mgmt.pool.Info', + b'mgmt.pool.Remove', + b'mgmt.backup.Execute', + ] + # make sure also no methods on actual VM gets called + vm_mock = unittest.mock.MagicMock() + vm_mock.name = self.vm.name + vm_mock.qid = self.vm.qid + vm_mock.__lt__ = (lambda x, y: x.qid < y.qid) + self.app.domains._dict[self.vm.qid] = vm_mock + for method in methods_with_no_payload: + # should reject payload regardless of having argument or not + with self.subTest(method.decode('ascii')): + with self.assertRaises(AssertionError): + self.call_mgmt_func(method, b'dom0', b'', + b'unexpected-payload') + self.assertFalse(vm_mock.called) + self.assertFalse(self.app.save.called) + + with self.subTest(method.decode('ascii') + '+arg'): + with self.assertRaises(AssertionError): + self.call_mgmt_func(method, b'dom0', b'some-arg', + b'unexpected-payload') + self.assertFalse(vm_mock.called) + self.assertFalse(self.app.save.called) + + def test_993_dom0_unexpected_argument(self): + methods_with_no_argument = [ + b'mgmt.vmclass.List', + b'mgmt.vm.List', + b'mgmt.label.List', + b'mgmt.property.List', + b'mgmt.pool.List', + b'mgmt.pool.ListDrivers', + ] + # make sure also no methods on actual VM gets called + vm_mock = unittest.mock.MagicMock() + vm_mock.name = self.vm.name + vm_mock.qid = self.vm.qid + vm_mock.__lt__ = (lambda x, y: x.qid < y.qid) + self.app.domains._dict[self.vm.qid] = vm_mock + for method in methods_with_no_argument: + # should reject argument regardless of having payload or not + with self.subTest(method.decode('ascii')): + with self.assertRaises(AssertionError): + self.call_mgmt_func(method, b'dom0', b'some-arg', + b'') + self.assertFalse(vm_mock.called) + self.assertFalse(self.app.save.called) + + with self.subTest(method.decode('ascii') + '+payload'): + with self.assertRaises(AssertionError): + self.call_mgmt_func(method, b'dom0', b'unexpected-arg', + b'some-payload') + self.assertFalse(vm_mock.called) + self.assertFalse(self.app.save.called) + + def test_994_dom0_only_calls(self): + # TODO set some better arguments, to make sure the call was rejected + # because of invalid destination, not invalid arguments + methods_for_dom0_only = [ + b'mgmt.vmclass.List', + b'mgmt.vm.Create.AppVM', + b'mgmt.vm.CreateInPool.AppVM', + b'mgmt.vm.CreateTemplate', + b'mgmt.label.List', + b'mgmt.label.Create', + b'mgmt.label.Get', + b'mgmt.label.Remove', + b'mgmt.property.List', + b'mgmt.property.Get', + b'mgmt.property.Set', + b'mgmt.property.Help', + b'mgmt.property.HelpRst', + b'mgmt.property.Reset', + b'mgmt.pool.List', + b'mgmt.pool.ListDrivers', + b'mgmt.pool.Info', + b'mgmt.pool.Add', + b'mgmt.pool.Remove', + b'mgmt.pool.volume.List', + b'mgmt.pool.volume.Info', + b'mgmt.pool.volume.ListSnapshots', + b'mgmt.pool.volume.Snapshot', + b'mgmt.pool.volume.Revert', + b'mgmt.pool.volume.Resize', + b'mgmt.backup.Execute', + b'mgmt.backup.Info', + b'mgmt.backup.Restore', + ] + # make sure also no methods on actual VM gets called + vm_mock = unittest.mock.MagicMock() + vm_mock.name = self.vm.name + vm_mock.qid = self.vm.qid + vm_mock.__lt__ = (lambda x, y: x.qid < y.qid) + self.app.domains._dict[self.vm.qid] = vm_mock + for method in methods_for_dom0_only: + # should reject call regardless of having payload or not + with self.subTest(method.decode('ascii')): + with self.assertRaises(AssertionError): + self.call_mgmt_func(method, b'test-vm1', b'', + b'') + self.assertFalse(vm_mock.called) + self.assertFalse(self.app.save.called) + + with self.subTest(method.decode('ascii') + '+arg'): + with self.assertRaises(AssertionError): + self.call_mgmt_func(method, b'test-vm1', b'some-arg', + b'') + self.assertFalse(vm_mock.called) + self.assertFalse(self.app.save.called) + + with self.subTest(method.decode('ascii') + '+payload'): + with self.assertRaises(AssertionError): + self.call_mgmt_func(method, b'test-vm1', b'', + b'payload') + self.assertFalse(vm_mock.called) + self.assertFalse(self.app.save.called) + + with self.subTest(method.decode('ascii') + '+arg+payload'): + with self.assertRaises(AssertionError): + self.call_mgmt_func(method, b'test-vm1', b'some-arg', + b'some-payload') + self.assertFalse(vm_mock.called) + self.assertFalse(self.app.save.called) + + @unittest.skip('undecided') + def test_995_vm_only_calls(self): + # XXX is it really a good idea to prevent those calls this early? + # TODO set some better arguments, to make sure the call was rejected + # because of invalid destination, not invalid arguments + methods_for_vm_only = [ + b'mgmt.vm.Clone', + b'mgmt.vm.Remove', + b'mgmt.vm.property.List', + b'mgmt.vm.property.Get', + b'mgmt.vm.property.Set', + b'mgmt.vm.property.Help', + b'mgmt.vm.property.HelpRst', + b'mgmt.vm.property.Reset', + b'mgmt.vm.feature.List', + b'mgmt.vm.feature.Get', + b'mgmt.vm.feature.Set', + b'mgmt.vm.feature.CheckWithTemplate', + b'mgmt.vm.feature.Remove', + b'mgmt.vm.tag.List', + b'mgmt.vm.tag.Get', + b'mgmt.vm.tag.Remove', + b'mgmt.vm.tag.Set', + b'mgmt.vm.firewall.Get', + b'mgmt.vm.firewall.RemoveRule', + b'mgmt.vm.firewall.InsertRule', + b'mgmt.vm.firewall.Flush', + b'mgmt.vm.device.pci.Attach', + b'mgmt.vm.device.pci.Detach', + b'mgmt.vm.device.pci.List', + b'mgmt.vm.device.pci.Available', + b'mgmt.vm.microphone.Attach', + b'mgmt.vm.microphone.Detach', + b'mgmt.vm.microphone.Status', + b'mgmt.vm.volume.ListSnapshots', + b'mgmt.vm.volume.List', + b'mgmt.vm.volume.Info', + b'mgmt.vm.volume.Revert', + b'mgmt.vm.volume.Resize', + b'mgmt.vm.Start', + b'mgmt.vm.Shutdown', + b'mgmt.vm.Pause', + b'mgmt.vm.Unpause', + b'mgmt.vm.Kill', + ] + # make sure also no methods on actual VM gets called + vm_mock = unittest.mock.MagicMock() + vm_mock.name = self.vm.name + vm_mock.qid = self.vm.qid + vm_mock.__lt__ = (lambda x, y: x.qid < y.qid) + self.app.domains._dict[self.vm.qid] = vm_mock + for method in methods_for_vm_only: + # should reject payload regardless of having argument or not + # should reject call regardless of having payload or not + with self.subTest(method.decode('ascii')): + with self.assertRaises(AssertionError): + self.call_mgmt_func(method, b'dom0', b'', + b'') + self.assertFalse(vm_mock.called) + self.assertFalse(self.app.save.called) + + with self.subTest(method.decode('ascii') + '+arg'): + with self.assertRaises(AssertionError): + self.call_mgmt_func(method, b'dom0', b'some-arg', + b'') + self.assertFalse(vm_mock.called) + self.assertFalse(self.app.save.called) + + with self.subTest(method.decode('ascii') + '+payload'): + with self.assertRaises(AssertionError): + self.call_mgmt_func(method, b'dom0', b'', + b'payload') + self.assertFalse(vm_mock.called) + self.assertFalse(self.app.save.called) + + with self.subTest(method.decode('ascii') + '+arg+payload'): + with self.assertRaises(AssertionError): + self.call_mgmt_func(method, b'dom0', b'some-arg', + b'some-payload') + self.assertFalse(vm_mock.called) + self.assertFalse(self.app.save.called)