# -*- encoding: utf8 -*- # # The Qubes OS Project, http://www.qubes-os.org # # Copyright (C) 2017 Marek Marczykowski-Górecki # # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License along # with this program; if not, see . ''' Tests for management calls endpoints ''' import asyncio import operator import os import shutil import unittest.mock import libvirt import qubes import qubes.devices import qubes.firewall import qubes.api.admin import qubes.tests import qubes.storage # properties defined in API volume_properties = [ 'pool', 'vid', 'size', 'usage', 'rw', 'source', 'save_on_stop', 'snap_on_start'] class AdminAPITestCase(qubes.tests.QubesTestCase): def setUp(self): super().setUp() self.test_base_dir = '/tmp/qubes-test-dir' self.base_dir_patch = unittest.mock.patch.dict(qubes.config.system_path, {'qubes_base_dir': self.test_base_dir}) self.base_dir_patch2 = unittest.mock.patch( 'qubes.config.qubes_base_dir', self.test_base_dir) self.base_dir_patch3 = unittest.mock.patch.dict( qubes.config.defaults['pool_configs']['varlibqubes'], {'dir_path': self.test_base_dir}) self.base_dir_patch.start() self.base_dir_patch2.start() self.base_dir_patch3.start() app = qubes.Qubes('/tmp/qubes-test.xml', load=False) app.vmm = unittest.mock.Mock(spec=qubes.app.VMMConnection) app.load_initial_values() app.default_kernel = '1.0' app.default_netvm = None self.template = app.add_new_vm('TemplateVM', label='black', name='test-template') app.default_template = 'test-template' with qubes.tests.substitute_entry_points('qubes.storage', 'qubes.tests.storage'): app.add_pool('test', driver='test') app.default_pool = 'varlibqubes' app.save = unittest.mock.Mock() self.vm = app.add_new_vm('AppVM', label='red', name='test-vm1', template='test-template') self.app = app libvirt_attrs = { 'libvirt_conn.lookupByUUID.return_value.isActive.return_value': False, 'libvirt_conn.lookupByUUID.return_value.state.return_value': [libvirt.VIR_DOMAIN_SHUTOFF], } app.vmm.configure_mock(**libvirt_attrs) self.emitter = qubes.tests.TestEmitter() self.app.domains[0].fire_event = self.emitter.fire_event def tearDown(self): self.base_dir_patch3.stop() self.base_dir_patch2.stop() self.base_dir_patch.stop() if os.path.exists(self.test_base_dir): shutil.rmtree(self.test_base_dir) super(AdminAPITestCase, self).tearDown() def call_mgmt_func(self, method, dest, arg=b'', payload=b''): mgmt_obj = qubes.api.admin.QubesAdminAPI(self.app, b'dom0', method, dest, arg) loop = asyncio.get_event_loop() response = loop.run_until_complete( mgmt_obj.execute(untrusted_payload=payload)) self.assertEventFired(self.emitter, 'admin-permission:' + method.decode('ascii')) return response class TC_00_VMs(AdminAPITestCase): def test_000_vm_list(self): value = self.call_mgmt_func(b'admin.vm.List', b'dom0') self.assertEqual(value, 'dom0 class=AdminVM state=Running\n' 'test-template class=TemplateVM state=Halted\n' 'test-vm1 class=AppVM state=Halted\n') def test_001_vm_list_single(self): value = self.call_mgmt_func(b'admin.vm.List', b'test-vm1') self.assertEqual(value, 'test-vm1 class=AppVM state=Halted\n') def test_010_vm_property_list(self): # this test is kind of stupid, but at least check if appropriate # admin-permission event is fired value = self.call_mgmt_func(b'admin.vm.property.List', b'test-vm1') properties = self.app.domains['test-vm1'].property_list() self.assertEqual(value, ''.join('{}\n'.format(prop.__name__) for prop in properties)) def test_020_vm_property_get_str(self): value = self.call_mgmt_func(b'admin.vm.property.Get', b'test-vm1', b'name') self.assertEqual(value, 'default=False type=str test-vm1') def test_021_vm_property_get_int(self): value = self.call_mgmt_func(b'admin.vm.property.Get', b'test-vm1', b'vcpus') self.assertEqual(value, 'default=True type=int 2') def test_022_vm_property_get_bool(self): value = self.call_mgmt_func(b'admin.vm.property.Get', b'test-vm1', b'provides_network') self.assertEqual(value, 'default=True type=bool False') def test_023_vm_property_get_label(self): value = self.call_mgmt_func(b'admin.vm.property.Get', b'test-vm1', b'label') self.assertEqual(value, 'default=False type=label red') def test_024_vm_property_get_vm(self): value = self.call_mgmt_func(b'admin.vm.property.Get', b'test-vm1', b'template') self.assertEqual(value, 'default=False type=vm test-template') def test_025_vm_property_get_vm_none(self): value = self.call_mgmt_func(b'admin.vm.property.Get', b'test-vm1', b'netvm') self.assertEqual(value, 'default=True type=vm ') def test_030_vm_property_set_vm(self): netvm = self.app.add_new_vm('AppVM', label='red', name='test-net', template='test-template', provides_network=True) with unittest.mock.patch('qubes.vm.VMProperty.__set__') as mock: value = self.call_mgmt_func(b'admin.vm.property.Set', b'test-vm1', b'netvm', b'test-net') self.assertIsNone(value) mock.assert_called_once_with(self.vm, 'test-net') self.app.save.assert_called_once_with() def test_0301_vm_property_set_vm_none(self): netvm = self.app.add_new_vm('AppVM', label='red', name='test-net', template='test-template', provides_network=True) with unittest.mock.patch('qubes.vm.VMProperty.__set__') as mock: value = self.call_mgmt_func(b'admin.vm.property.Set', b'test-vm1', b'netvm', b'') self.assertIsNone(value) mock.assert_called_once_with(self.vm, '') self.app.save.assert_called_once_with() def test_032_vm_property_set_vm_invalid1(self): with unittest.mock.patch('qubes.vm.VMProperty.__set__') as mock: with self.assertRaises(qubes.exc.QubesValueError): self.call_mgmt_func(b'admin.vm.property.Set', b'test-vm1', b'netvm', b'forbidden-chars/../!') self.assertFalse(mock.called) self.assertFalse(self.app.save.called) def test_033_vm_property_set_vm_invalid2(self): with unittest.mock.patch('qubes.vm.VMProperty.__set__') as mock: with self.assertRaises(qubes.exc.QubesValueError): self.call_mgmt_func(b'admin.vm.property.Set', b'test-vm1', b'netvm', b'\x80\x90\xa0') self.assertFalse(mock.called) self.assertFalse(self.app.save.called) def test_034_vm_propert_set_bool_true(self): with unittest.mock.patch('qubes.property.__set__') as mock: value = self.call_mgmt_func(b'admin.vm.property.Set', b'test-vm1', b'autostart', b'True') self.assertIsNone(value) mock.assert_called_once_with(self.vm, True) self.app.save.assert_called_once_with() def test_035_vm_propert_set_bool_false(self): with unittest.mock.patch('qubes.property.__set__') as mock: value = self.call_mgmt_func(b'admin.vm.property.Set', b'test-vm1', b'autostart', b'False') self.assertIsNone(value) mock.assert_called_once_with(self.vm, False) self.app.save.assert_called_once_with() def test_036_vm_propert_set_bool_invalid1(self): with unittest.mock.patch('qubes.property.__set__') as mock: with self.assertRaises(qubes.exc.QubesValueError): self.call_mgmt_func(b'admin.vm.property.Set', b'test-vm1', b'autostart', b'some string') self.assertFalse(mock.called) self.assertFalse(self.app.save.called) def test_037_vm_propert_set_bool_invalid2(self): with unittest.mock.patch('qubes.property.__set__') as mock: with self.assertRaises(qubes.exc.QubesValueError): self.call_mgmt_func(b'admin.vm.property.Set', b'test-vm1', b'autostart', b'\x80\x90@#$%^&*(') self.assertFalse(mock.called) self.assertFalse(self.app.save.called) def test_038_vm_propert_set_str(self): with unittest.mock.patch('qubes.property.__set__') as mock: value = self.call_mgmt_func(b'admin.vm.property.Set', b'test-vm1', b'kernel', b'1.0') self.assertIsNone(value) mock.assert_called_once_with(self.vm, '1.0') self.app.save.assert_called_once_with() def test_039_vm_propert_set_str_invalid1(self): with unittest.mock.patch('qubes.property.__set__') as mock: with self.assertRaises(qubes.exc.QubesValueError): self.call_mgmt_func(b'admin.vm.property.Set', b'test-vm1', b'kernel', b'some, non-ASCII: \x80\xd2') self.assertFalse(mock.called) self.assertFalse(self.app.save.called) def test_040_vm_propert_set_int(self): with unittest.mock.patch('qubes.property.__set__') as mock: value = self.call_mgmt_func(b'admin.vm.property.Set', b'test-vm1', b'maxmem', b'1024000') self.assertIsNone(value) mock.assert_called_once_with(self.vm, 1024000) self.app.save.assert_called_once_with() def test_041_vm_propert_set_int_invalid1(self): with unittest.mock.patch('qubes.property.__set__') as mock: with self.assertRaises(qubes.exc.QubesValueError): self.call_mgmt_func(b'admin.vm.property.Set', b'test-vm1', b'maxmem', b'fourty two') self.assertFalse(mock.called) self.assertFalse(self.app.save.called) def test_042_vm_propert_set_label(self): with unittest.mock.patch('qubes.property.__set__') as mock: value = self.call_mgmt_func(b'admin.vm.property.Set', b'test-vm1', b'label', b'green') self.assertIsNone(value) mock.assert_called_once_with(self.vm, 'green') self.app.save.assert_called_once_with() def test_043_vm_propert_set_label_invalid1(self): with unittest.mock.patch('qubes.property.__set__') as mock: with self.assertRaises(qubes.exc.QubesValueError): self.call_mgmt_func(b'admin.vm.property.Set', b'test-vm1', b'maxmem', b'some, non-ASCII: \x80\xd2') self.assertFalse(mock.called) self.assertFalse(self.app.save.called) @unittest.skip('label existence not checked before actual setter yet') def test_044_vm_propert_set_label_invalid2(self): with unittest.mock.patch('qubes.property.__set__') as mock: with self.assertRaises(qubes.exc.QubesValueError): self.call_mgmt_func(b'admin.vm.property.Set', b'test-vm1', b'maxmem', b'non-existing-color') self.assertFalse(mock.called) self.assertFalse(self.app.save.called) def test_050_vm_property_help(self): value = self.call_mgmt_func(b'admin.vm.property.Help', b'test-vm1', b'label') self.assertEqual(value, 'Colourful label assigned to VM. This is where the colour of the ' 'padlock is set.') self.assertFalse(self.app.save.called) def test_052_vm_property_help_invalid_property(self): with self.assertRaises(qubes.exc.QubesNoSuchPropertyError): self.call_mgmt_func(b'admin.vm.property.Help', b'test-vm1', b'no-such-property') self.assertFalse(self.app.save.called) def test_060_vm_property_reset(self): with unittest.mock.patch('qubes.property.__delete__') as mock: value = self.call_mgmt_func(b'admin.vm.property.Reset', b'test-vm1', b'default_user') mock.assert_called_with(self.vm) self.assertIsNone(value) self.app.save.assert_called_once_with() def test_062_vm_property_reset_invalid_property(self): with unittest.mock.patch('qubes.property.__delete__') as mock: with self.assertRaises(qubes.exc.QubesNoSuchPropertyError): self.call_mgmt_func(b'admin.vm.property.Help', b'test-vm1', b'no-such-property') self.assertFalse(mock.called) self.assertFalse(self.app.save.called) def test_070_vm_volume_list(self): self.vm.volumes = unittest.mock.Mock() volumes_conf = { 'keys.return_value': ['root', 'private', 'volatile', 'kernel'] } self.vm.volumes.configure_mock(**volumes_conf) value = self.call_mgmt_func(b'admin.vm.volume.List', b'test-vm1') self.assertEqual(value, 'root\nprivate\nvolatile\nkernel\n') # check if _only_ keys were accessed self.assertEqual(self.vm.volumes.mock_calls, [unittest.mock.call.keys()]) def test_080_vm_volume_info(self): self.vm.volumes = unittest.mock.MagicMock() volumes_conf = { 'keys.return_value': ['root', 'private', 'volatile', 'kernel'] } for prop in volume_properties: volumes_conf[ '__getitem__.return_value.{}'.format(prop)] = prop +'-value' self.vm.volumes.configure_mock(**volumes_conf) value = self.call_mgmt_func(b'admin.vm.volume.Info', b'test-vm1', b'private') self.assertEqual(value, ''.join('{p}={p}-value\n'.format(p=p) for p in volume_properties)) self.assertEqual(self.vm.volumes.mock_calls, [unittest.mock.call.keys(), unittest.mock.call.__getattr__('__getitem__')('private')]) def test_080_vm_volume_info_invalid_volume(self): self.vm.volumes = unittest.mock.MagicMock() volumes_conf = { 'keys.return_value': ['root', 'private', 'volatile', 'kernel'] } self.vm.volumes.configure_mock(**volumes_conf) with self.assertRaises(AssertionError): self.call_mgmt_func(b'admin.vm.volume.Info', b'test-vm1', b'no-such-volume') self.assertEqual(self.vm.volumes.mock_calls, [unittest.mock.call.keys()]) def test_090_vm_volume_listsnapshots(self): self.vm.volumes = unittest.mock.MagicMock() volumes_conf = { 'keys.return_value': ['root', 'private', 'volatile', 'kernel'], '__getitem__.return_value.revisions': ['rev1', 'rev2'], } self.vm.volumes.configure_mock(**volumes_conf) value = self.call_mgmt_func(b'admin.vm.volume.ListSnapshots', b'test-vm1', b'private') self.assertEqual(value, 'rev1\nrev2\n') self.assertEqual(self.vm.volumes.mock_calls, [unittest.mock.call.keys(), unittest.mock.call.__getattr__('__getitem__')('private')]) def test_090_vm_volume_listsnapshots_invalid_volume(self): self.vm.volumes = unittest.mock.MagicMock() volumes_conf = { 'keys.return_value': ['root', 'private', 'volatile', 'kernel'] } self.vm.volumes.configure_mock(**volumes_conf) with self.assertRaises(AssertionError): self.call_mgmt_func(b'admin.vm.volume.ListSnapshots', b'test-vm1', b'no-such-volume') self.assertEqual(self.vm.volumes.mock_calls, [unittest.mock.call.keys()]) @unittest.skip('method not implemented yet') def test_100_vm_volume_snapshot(self): pass @unittest.skip('method not implemented yet') def test_100_vm_volume_snapshot_invalid_volume(self): self.vm.volumes = unittest.mock.MagicMock() volumes_conf = { 'keys.return_value': ['root', 'private', 'volatile', 'kernel'], '__getitem__.return_value.revisions': ['rev1', 'rev2'], } self.vm.volumes.configure_mock(**volumes_conf) with self.assertRaises(AssertionError): self.call_mgmt_func(b'admin.vm.volume.Snapshots', b'test-vm1', b'no-such-volume') self.assertEqual(self.vm.volumes.mock_calls, [unittest.mock.call.keys()]) @unittest.skip('method not implemented yet') def test_100_vm_volume_snapshot_invalid_revision(self): self.vm.volumes = unittest.mock.MagicMock() volumes_conf = { 'keys.return_value': ['root', 'private', 'volatile', 'kernel'] } self.vm.volumes.configure_mock(**volumes_conf) with self.assertRaises(AssertionError): self.call_mgmt_func(b'admin.vm.volume.Snapshots', b'test-vm1', b'private', b'no-such-rev') self.assertEqual(self.vm.volumes.mock_calls, [unittest.mock.call.keys(), unittest.mock.call.__getattr__('__getitem__')('private')]) def test_110_vm_volume_revert(self): self.vm.volumes = unittest.mock.MagicMock() volumes_conf = { 'keys.return_value': ['root', 'private', 'volatile', 'kernel'], '__getitem__.return_value.revisions': ['rev1', 'rev2'], } self.vm.volumes.configure_mock(**volumes_conf) del self.vm.volumes['private'].revert('rev1')._is_coroutine self.vm.storage = unittest.mock.Mock() value = self.call_mgmt_func(b'admin.vm.volume.Revert', b'test-vm1', b'private', b'rev1') self.assertIsNone(value) print(repr(self.vm.volumes.mock_calls)) self.assertEqual(self.vm.volumes.mock_calls, [ ('__getitem__', ('private', ), {}), ('__getitem__().revert', ('rev1', ), {}), ('keys', (), {}), ('__getitem__', ('private', ), {}), ('__getitem__().__hash__', (), {}), ('__getitem__().revert', ('rev1', ), {}), ]) self.assertEqual(self.vm.storage.mock_calls, []) def test_110_vm_volume_revert_invalid_rev(self): self.vm.volumes = unittest.mock.MagicMock() volumes_conf = { 'keys.return_value': ['root', 'private', 'volatile', 'kernel'], '__getitem__.return_value.revisions': ['rev1', 'rev2'], } self.vm.volumes.configure_mock(**volumes_conf) self.vm.storage = unittest.mock.Mock() with self.assertRaises(AssertionError): self.call_mgmt_func(b'admin.vm.volume.Revert', b'test-vm1', b'private', b'no-such-rev') self.assertEqual(self.vm.volumes.mock_calls, [unittest.mock.call.keys(), unittest.mock.call.__getattr__('__getitem__')('private')]) self.assertFalse(self.vm.storage.called) def test_120_vm_volume_resize(self): self.vm.volumes = unittest.mock.MagicMock() volumes_conf = { 'keys.return_value': ['root', 'private', 'volatile', 'kernel'], } self.vm.volumes.configure_mock(**volumes_conf) self.vm.storage = unittest.mock.Mock() self.vm.storage.resize.side_effect = self.dummy_coro value = self.call_mgmt_func(b'admin.vm.volume.Resize', b'test-vm1', b'private', b'1024000000') self.assertIsNone(value) self.assertEqual(self.vm.volumes.mock_calls, [unittest.mock.call.keys()]) self.assertEqual(self.vm.storage.mock_calls, [unittest.mock.call.resize('private', 1024000000)]) def test_120_vm_volume_resize_invalid_size1(self): self.vm.volumes = unittest.mock.MagicMock() volumes_conf = { 'keys.return_value': ['root', 'private', 'volatile', 'kernel'], } self.vm.volumes.configure_mock(**volumes_conf) self.vm.storage = unittest.mock.Mock() self.vm.storage.resize.side_effect = self.dummy_coro with self.assertRaises(AssertionError): self.call_mgmt_func(b'admin.vm.volume.Resize', b'test-vm1', b'private', b'no-int-size') self.assertEqual(self.vm.volumes.mock_calls, [unittest.mock.call.keys()]) self.assertFalse(self.vm.storage.called) def test_120_vm_volume_resize_invalid_size2(self): self.vm.volumes = unittest.mock.MagicMock() volumes_conf = { 'keys.return_value': ['root', 'private', 'volatile', 'kernel'], } self.vm.volumes.configure_mock(**volumes_conf) self.vm.storage = unittest.mock.Mock() self.vm.storage.resize.side_effect = self.dummy_coro with self.assertRaises(AssertionError): self.call_mgmt_func(b'admin.vm.volume.Resize', b'test-vm1', b'private', b'-1') self.assertEqual(self.vm.volumes.mock_calls, [unittest.mock.call.keys()]) self.assertFalse(self.vm.storage.called) def test_130_pool_list(self): self.app.pools = ['file', 'lvm'] value = self.call_mgmt_func(b'admin.pool.List', b'dom0') self.assertEqual(value, 'file\nlvm\n') self.assertFalse(self.app.save.called) @unittest.mock.patch('qubes.storage.pool_drivers') @unittest.mock.patch('qubes.storage.driver_parameters') def test_140_pool_listdrivers(self, mock_parameters, mock_drivers): self.app.pools = ['file', 'lvm'] mock_drivers.return_value = ['driver1', 'driver2'] mock_parameters.side_effect = \ lambda driver: { 'driver1': ['param1', 'param2'], 'driver2': ['param3', 'param4'] }[driver] value = self.call_mgmt_func(b'admin.pool.ListDrivers', b'dom0') self.assertEqual(value, 'driver1 param1 param2\ndriver2 param3 param4\n') self.assertEqual(mock_drivers.mock_calls, [unittest.mock.call()]) self.assertEqual(mock_parameters.mock_calls, [unittest.mock.call('driver1'), unittest.mock.call('driver2')]) self.assertFalse(self.app.save.called) def test_150_pool_info(self): self.app.pools = { 'pool1': unittest.mock.Mock(config={ 'param1': 'value1', 'param2': 'value2'}) } value = self.call_mgmt_func(b'admin.pool.Info', b'dom0', b'pool1') self.assertEqual(value, 'param1=value1\nparam2=value2\n') self.assertFalse(self.app.save.called) @unittest.mock.patch('qubes.storage.pool_drivers') @unittest.mock.patch('qubes.storage.driver_parameters') def test_160_pool_add(self, mock_parameters, mock_drivers): self.app.pools = { 'file': unittest.mock.Mock(), 'lvm': unittest.mock.Mock() } mock_drivers.return_value = ['driver1', 'driver2'] mock_parameters.side_effect = \ lambda driver: { 'driver1': ['param1', 'param2'], 'driver2': ['param3', 'param4'] }[driver] self.app.add_pool = unittest.mock.Mock() value = self.call_mgmt_func(b'admin.pool.Add', b'dom0', b'driver1', b'name=test-pool\nparam1=some-value\n') self.assertIsNone(value) self.assertEqual(mock_drivers.mock_calls, [unittest.mock.call()]) self.assertEqual(mock_parameters.mock_calls, [unittest.mock.call('driver1')]) self.assertEqual(self.app.add_pool.mock_calls, [unittest.mock.call(name='test-pool', driver='driver1', param1='some-value')]) self.assertTrue(self.app.save.called) @unittest.mock.patch('qubes.storage.pool_drivers') @unittest.mock.patch('qubes.storage.driver_parameters') def test_160_pool_add_invalid_driver(self, mock_parameters, mock_drivers): self.app.pools = { 'file': unittest.mock.Mock(), 'lvm': unittest.mock.Mock() } mock_drivers.return_value = ['driver1', 'driver2'] mock_parameters.side_effect = \ lambda driver: { 'driver1': ['param1', 'param2'], 'driver2': ['param3', 'param4'] }[driver] self.app.add_pool = unittest.mock.Mock() with self.assertRaises(AssertionError): self.call_mgmt_func(b'admin.pool.Add', b'dom0', b'no-such-driver', b'name=test-pool\nparam1=some-value\n') self.assertEqual(mock_drivers.mock_calls, [unittest.mock.call()]) self.assertEqual(mock_parameters.mock_calls, []) self.assertEqual(self.app.add_pool.mock_calls, []) self.assertFalse(self.app.save.called) @unittest.mock.patch('qubes.storage.pool_drivers') @unittest.mock.patch('qubes.storage.driver_parameters') def test_160_pool_add_invalid_param(self, mock_parameters, mock_drivers): self.app.pools = { 'file': unittest.mock.Mock(), 'lvm': unittest.mock.Mock() } mock_drivers.return_value = ['driver1', 'driver2'] mock_parameters.side_effect = \ lambda driver: { 'driver1': ['param1', 'param2'], 'driver2': ['param3', 'param4'] }[driver] self.app.add_pool = unittest.mock.Mock() with self.assertRaises(AssertionError): self.call_mgmt_func(b'admin.pool.Add', b'dom0', b'driver1', b'name=test-pool\nparam3=some-value\n') self.assertEqual(mock_drivers.mock_calls, [unittest.mock.call()]) self.assertEqual(mock_parameters.mock_calls, [unittest.mock.call('driver1')]) self.assertEqual(self.app.add_pool.mock_calls, []) self.assertFalse(self.app.save.called) @unittest.mock.patch('qubes.storage.pool_drivers') @unittest.mock.patch('qubes.storage.driver_parameters') def test_160_pool_add_missing_name(self, mock_parameters, mock_drivers): self.app.pools = { 'file': unittest.mock.Mock(), 'lvm': unittest.mock.Mock() } mock_drivers.return_value = ['driver1', 'driver2'] mock_parameters.side_effect = \ lambda driver: { 'driver1': ['param1', 'param2'], 'driver2': ['param3', 'param4'] }[driver] self.app.add_pool = unittest.mock.Mock() with self.assertRaises(AssertionError): self.call_mgmt_func(b'admin.pool.Add', b'dom0', b'driver1', b'param1=value\nparam2=some-value\n') self.assertEqual(mock_drivers.mock_calls, [unittest.mock.call()]) self.assertEqual(mock_parameters.mock_calls, []) self.assertEqual(self.app.add_pool.mock_calls, []) self.assertFalse(self.app.save.called) @unittest.mock.patch('qubes.storage.pool_drivers') @unittest.mock.patch('qubes.storage.driver_parameters') def test_160_pool_add_existing_pool(self, mock_parameters, mock_drivers): self.app.pools = { 'file': unittest.mock.Mock(), 'lvm': unittest.mock.Mock() } mock_drivers.return_value = ['driver1', 'driver2'] mock_parameters.side_effect = \ lambda driver: { 'driver1': ['param1', 'param2'], 'driver2': ['param3', 'param4'] }[driver] self.app.add_pool = unittest.mock.Mock() with self.assertRaises(AssertionError): self.call_mgmt_func(b'admin.pool.Add', b'dom0', b'driver1', b'name=file\nparam1=value\nparam2=some-value\n') self.assertEqual(mock_drivers.mock_calls, [unittest.mock.call()]) self.assertEqual(mock_parameters.mock_calls, []) self.assertEqual(self.app.add_pool.mock_calls, []) self.assertFalse(self.app.save.called) @unittest.mock.patch('qubes.storage.pool_drivers') @unittest.mock.patch('qubes.storage.driver_parameters') def test_160_pool_add_invalid_config_format(self, mock_parameters, mock_drivers): self.app.pools = { 'file': unittest.mock.Mock(), 'lvm': unittest.mock.Mock() } mock_drivers.return_value = ['driver1', 'driver2'] mock_parameters.side_effect = \ lambda driver: { 'driver1': ['param1', 'param2'], 'driver2': ['param3', 'param4'] }[driver] self.app.add_pool = unittest.mock.Mock() with self.assertRaises(AssertionError): self.call_mgmt_func(b'admin.pool.Add', b'dom0', b'driver1', b'name=test-pool\nparam 1=value\n_param2\n') self.assertEqual(mock_drivers.mock_calls, [unittest.mock.call()]) self.assertEqual(mock_parameters.mock_calls, []) self.assertEqual(self.app.add_pool.mock_calls, []) self.assertFalse(self.app.save.called) def test_170_pool_remove(self): self.app.pools = { 'file': unittest.mock.Mock(), 'lvm': unittest.mock.Mock(), 'test-pool': unittest.mock.Mock(), } self.app.remove_pool = unittest.mock.Mock() value = self.call_mgmt_func(b'admin.pool.Remove', b'dom0', b'test-pool') self.assertIsNone(value) self.assertEqual(self.app.remove_pool.mock_calls, [unittest.mock.call('test-pool')]) self.assertTrue(self.app.save.called) def test_170_pool_remove_invalid_pool(self): self.app.pools = { 'file': unittest.mock.Mock(), 'lvm': unittest.mock.Mock(), 'test-pool': unittest.mock.Mock(), } self.app.remove_pool = unittest.mock.Mock() with self.assertRaises(AssertionError): self.call_mgmt_func(b'admin.pool.Remove', b'dom0', b'no-such-pool') self.assertEqual(self.app.remove_pool.mock_calls, []) self.assertFalse(self.app.save.called) def test_180_label_list(self): value = self.call_mgmt_func(b'admin.label.List', b'dom0') self.assertEqual(value, ''.join('{}\n'.format(l.name) for l in self.app.labels.values())) self.assertFalse(self.app.save.called) def test_190_label_get(self): self.app.get_label = unittest.mock.Mock() self.app.get_label.configure_mock(**{'return_value.color': '0xff0000'}) value = self.call_mgmt_func(b'admin.label.Get', b'dom0', b'red') self.assertEqual(value, '0xff0000') self.assertEqual(self.app.get_label.mock_calls, [unittest.mock.call('red')]) self.assertFalse(self.app.save.called) def test_195_label_index(self): self.app.get_label = unittest.mock.Mock() self.app.get_label.configure_mock(**{'return_value.index': 1}) value = self.call_mgmt_func(b'admin.label.Index', b'dom0', b'red') self.assertEqual(value, '1') self.assertEqual(self.app.get_label.mock_calls, [unittest.mock.call('red')]) self.assertFalse(self.app.save.called) def test_200_label_create(self): self.app.get_label = unittest.mock.Mock() self.app.get_label.side_effect=KeyError self.app.labels = unittest.mock.MagicMock() labels_config = { 'keys.return_value': range(1, 9), } self.app.labels.configure_mock(**labels_config) value = self.call_mgmt_func(b'admin.label.Create', b'dom0', b'cyan', b'0x00ffff') self.assertIsNone(value) self.assertEqual(self.app.get_label.mock_calls, [unittest.mock.call('cyan')]) self.assertEqual(self.app.labels.mock_calls, [unittest.mock.call.keys(), unittest.mock.call.__getattr__('__setitem__')(9, qubes.Label(9, '0x00ffff', 'cyan'))]) self.assertTrue(self.app.save.called) def test_200_label_create_invalid_color(self): self.app.get_label = unittest.mock.Mock() self.app.get_label.side_effect=KeyError self.app.labels = unittest.mock.MagicMock() labels_config = { 'keys.return_value': range(1, 9), } self.app.labels.configure_mock(**labels_config) with self.assertRaises(AssertionError): self.call_mgmt_func(b'admin.label.Create', b'dom0', b'cyan', b'abcd') self.assertEqual(self.app.get_label.mock_calls, [unittest.mock.call('cyan')]) self.assertEqual(self.app.labels.mock_calls, []) self.assertFalse(self.app.save.called) def test_200_label_create_invalid_name(self): self.app.get_label = unittest.mock.Mock() self.app.get_label.side_effect=KeyError self.app.labels = unittest.mock.MagicMock() labels_config = { 'keys.return_value': range(1, 9), } self.app.labels.configure_mock(**labels_config) with self.assertRaises(AssertionError): self.call_mgmt_func(b'admin.label.Create', b'dom0', b'01', b'0xff0000') with self.assertRaises(AssertionError): self.call_mgmt_func(b'admin.label.Create', b'dom0', b'../xxx', b'0xff0000') with self.assertRaises(AssertionError): self.call_mgmt_func(b'admin.label.Create', b'dom0', b'strange-name!@#$', b'0xff0000') self.assertEqual(self.app.get_label.mock_calls, []) self.assertEqual(self.app.labels.mock_calls, []) self.assertFalse(self.app.save.called) def test_200_label_create_already_exists(self): self.app.get_label = unittest.mock.Mock(wraps=self.app.get_label) with self.assertRaises(qubes.exc.QubesValueError): self.call_mgmt_func(b'admin.label.Create', b'dom0', b'red', b'abcd') self.assertEqual(self.app.get_label.mock_calls, [unittest.mock.call('red')]) self.assertFalse(self.app.save.called) def test_210_label_remove(self): label = qubes.Label(9, '0x00ffff', 'cyan') self.app.labels[9] = label self.app.get_label = unittest.mock.Mock(wraps=self.app.get_label, **{'return_value.index': 9}) self.app.labels = unittest.mock.MagicMock(wraps=self.app.labels) value = self.call_mgmt_func(b'admin.label.Remove', b'dom0', b'cyan') self.assertIsNone(value) self.assertEqual(self.app.get_label.mock_calls, [unittest.mock.call('cyan')]) self.assertEqual(self.app.labels.mock_calls, [unittest.mock.call.__delitem__(9)]) self.assertTrue(self.app.save.called) def test_210_label_remove_invalid_label(self): with self.assertRaises(qubes.exc.QubesValueError): self.call_mgmt_func(b'admin.label.Remove', b'dom0', b'no-such-label') self.assertFalse(self.app.save.called) def test_210_label_remove_default_label(self): self.app.labels = unittest.mock.MagicMock(wraps=self.app.labels) self.app.get_label = unittest.mock.Mock(wraps=self.app.get_label, **{'return_value.index': 6}) with self.assertRaises(AssertionError): self.call_mgmt_func(b'admin.label.Remove', b'dom0', b'blue') self.assertEqual(self.app.labels.mock_calls, []) self.assertFalse(self.app.save.called) def test_210_label_remove_in_use(self): self.app.labels = unittest.mock.MagicMock(wraps=self.app.labels) self.app.get_label = unittest.mock.Mock(wraps=self.app.get_label, **{'return_value.index': 1}) with self.assertRaises(AssertionError): self.call_mgmt_func(b'admin.label.Remove', b'dom0', b'red') self.assertEqual(self.app.labels.mock_calls, []) self.assertFalse(self.app.save.called) def test_220_start(self): func_mock = unittest.mock.Mock() @asyncio.coroutine def coroutine_mock(*args, **kwargs): return func_mock(*args, **kwargs) self.vm.start = coroutine_mock value = self.call_mgmt_func(b'admin.vm.Start', b'test-vm1') self.assertIsNone(value) func_mock.assert_called_once_with() def test_230_shutdown(self): func_mock = unittest.mock.Mock() @asyncio.coroutine def coroutine_mock(*args, **kwargs): return func_mock(*args, **kwargs) self.vm.shutdown = coroutine_mock value = self.call_mgmt_func(b'admin.vm.Shutdown', b'test-vm1') self.assertIsNone(value) func_mock.assert_called_once_with() def test_240_pause(self): func_mock = unittest.mock.Mock() @asyncio.coroutine def coroutine_mock(*args, **kwargs): return func_mock(*args, **kwargs) self.vm.pause = coroutine_mock value = self.call_mgmt_func(b'admin.vm.Pause', b'test-vm1') self.assertIsNone(value) func_mock.assert_called_once_with() def test_250_unpause(self): func_mock = unittest.mock.Mock() @asyncio.coroutine def coroutine_mock(*args, **kwargs): return func_mock(*args, **kwargs) self.vm.unpause = coroutine_mock value = self.call_mgmt_func(b'admin.vm.Unpause', b'test-vm1') self.assertIsNone(value) func_mock.assert_called_once_with() def test_260_kill(self): func_mock = unittest.mock.Mock() @asyncio.coroutine def coroutine_mock(*args, **kwargs): return func_mock(*args, **kwargs) self.vm.kill = coroutine_mock value = self.call_mgmt_func(b'admin.vm.Kill', b'test-vm1') self.assertIsNone(value) func_mock.assert_called_once_with() def test_270_events(self): send_event = unittest.mock.Mock(spec=[]) mgmt_obj = qubes.api.admin.QubesAdminAPI(self.app, b'dom0', b'admin.Events', b'dom0', b'', send_event=send_event) @asyncio.coroutine def fire_event(): self.vm.fire_event('test-event', arg1='abc') mgmt_obj.cancel() loop = asyncio.get_event_loop() execute_task = asyncio.ensure_future( mgmt_obj.execute(untrusted_payload=b'')) asyncio.ensure_future(fire_event()) loop.run_until_complete(execute_task) self.assertIsNone(execute_task.result()) self.assertEventFired(self.emitter, 'admin-permission:' + 'admin.Events') self.assertEqual(send_event.mock_calls, [ unittest.mock.call(self.app, 'connection-established'), unittest.mock.call(self.vm, 'test-event', arg1='abc') ]) def test_271_events_add_vm(self): send_event = unittest.mock.Mock(spec=[]) mgmt_obj = qubes.api.admin.QubesAdminAPI(self.app, b'dom0', b'admin.Events', b'dom0', b'', send_event=send_event) @asyncio.coroutine def fire_event(): self.vm.fire_event('test-event', arg1='abc') # add VM _after_ starting admin.Events call vm = self.app.add_new_vm('AppVM', label='red', name='test-vm2', template='test-template') vm.fire_event('test-event2', arg1='abc') mgmt_obj.cancel() return vm loop = asyncio.get_event_loop() execute_task = asyncio.ensure_future( mgmt_obj.execute(untrusted_payload=b'')) event_task = asyncio.ensure_future(fire_event()) loop.run_until_complete(execute_task) vm2 = event_task.result() self.assertIsNone(execute_task.result()) self.assertEventFired(self.emitter, 'admin-permission:' + 'admin.Events') self.assertEqual(send_event.mock_calls, [ unittest.mock.call(self.app, 'connection-established'), unittest.mock.call(self.vm, 'test-event', arg1='abc'), unittest.mock.call(self.app, 'domain-add', vm=vm2), unittest.mock.call(vm2, 'test-event2', arg1='abc'), ]) def test_280_feature_list(self): self.vm.features['test-feature'] = 'some-value' value = self.call_mgmt_func(b'admin.vm.feature.List', b'test-vm1') self.assertEqual(value, 'test-feature\n') self.assertFalse(self.app.save.called) def test_290_feature_get(self): self.vm.features['test-feature'] = 'some-value' value = self.call_mgmt_func(b'admin.vm.feature.Get', b'test-vm1', b'test-feature') self.assertEqual(value, 'some-value') self.assertFalse(self.app.save.called) def test_291_feature_get_none(self): with self.assertRaises(qubes.exc.QubesFeatureNotFoundError): self.call_mgmt_func(b'admin.vm.feature.Get', b'test-vm1', b'test-feature') self.assertFalse(self.app.save.called) def test_300_feature_remove(self): self.vm.features['test-feature'] = 'some-value' value = self.call_mgmt_func(b'admin.vm.feature.Remove', b'test-vm1', b'test-feature') self.assertIsNone(value, None) self.assertNotIn('test-feature', self.vm.features) self.assertTrue(self.app.save.called) def test_301_feature_remove_none(self): with self.assertRaises(qubes.exc.QubesFeatureNotFoundError): self.call_mgmt_func(b'admin.vm.feature.Remove', b'test-vm1', b'test-feature') self.assertFalse(self.app.save.called) def test_310_feature_checkwithtemplate(self): self.vm.features['test-feature'] = 'some-value' value = self.call_mgmt_func(b'admin.vm.feature.CheckWithTemplate', b'test-vm1', b'test-feature') self.assertEqual(value, 'some-value') self.assertFalse(self.app.save.called) def test_311_feature_checkwithtemplate_tpl(self): self.template.features['test-feature'] = 'some-value' value = self.call_mgmt_func(b'admin.vm.feature.CheckWithTemplate', b'test-vm1', b'test-feature') self.assertEqual(value, 'some-value') self.assertFalse(self.app.save.called) def test_312_feature_checkwithtemplate_none(self): with self.assertRaises(qubes.exc.QubesFeatureNotFoundError): self.call_mgmt_func(b'admin.vm.feature.CheckWithTemplate', b'test-vm1', b'test-feature') self.assertFalse(self.app.save.called) def test_320_feature_set(self): value = self.call_mgmt_func(b'admin.vm.feature.Set', b'test-vm1', b'test-feature', b'some-value') self.assertIsNone(value) self.assertEqual(self.vm.features['test-feature'], 'some-value') self.assertTrue(self.app.save.called) def test_321_feature_set_empty(self): value = self.call_mgmt_func(b'admin.vm.feature.Set', b'test-vm1', b'test-feature', b'') self.assertIsNone(value) self.assertEqual(self.vm.features['test-feature'], '') self.assertTrue(self.app.save.called) def test_320_feature_set_invalid(self): with self.assertRaises(UnicodeDecodeError): self.call_mgmt_func(b'admin.vm.feature.Set', b'test-vm1', b'test-feature', b'\x02\x03\xffsome-value') self.assertNotIn('test-feature', self.vm.features) self.assertFalse(self.app.save.called) @asyncio.coroutine def dummy_coro(self, *args, **kwargs): pass @unittest.mock.patch('qubes.storage.Storage.create') def test_330_vm_create_standalone(self, storage_mock): storage_mock.side_effect = self.dummy_coro self.call_mgmt_func(b'admin.vm.Create.StandaloneVM', b'dom0', b'', b'name=test-vm2 label=red') self.assertIn('test-vm2', self.app.domains) vm = self.app.domains['test-vm2'] self.assertIsInstance(vm, qubes.vm.standalonevm.StandaloneVM) self.assertEqual(vm.label, self.app.get_label('red')) self.assertEqual(storage_mock.mock_calls, [unittest.mock.call(self.app.domains['test-vm2']).create()]) self.assertTrue(os.path.exists(os.path.join( self.test_base_dir, 'appvms', 'test-vm2'))) self.assertTrue(self.app.save.called) @unittest.mock.patch('qubes.storage.Storage.create') def test_331_vm_create_standalone_spurious_template(self, storage_mock): storage_mock.side_effect = self.dummy_coro with self.assertRaises(AssertionError): self.call_mgmt_func(b'admin.vm.Create.StandaloneVM', b'dom0', b'test-template', b'name=test-vm2 label=red') self.assertNotIn('test-vm2', self.app.domains) self.assertEqual(storage_mock.mock_calls, []) self.assertFalse(os.path.exists(os.path.join( self.test_base_dir, 'appvms', 'test-vm2'))) self.assertNotIn('test-vm2', self.app.domains) self.assertFalse(self.app.save.called) @unittest.mock.patch('qubes.storage.Storage.create') def test_332_vm_create_app(self, storage_mock): storage_mock.side_effect = self.dummy_coro self.call_mgmt_func(b'admin.vm.Create.AppVM', b'dom0', b'test-template', b'name=test-vm2 label=red') self.assertIn('test-vm2', self.app.domains) vm = self.app.domains['test-vm2'] self.assertEqual(vm.label, self.app.get_label('red')) self.assertEqual(vm.template, self.app.domains['test-template']) self.assertEqual(storage_mock.mock_calls, [unittest.mock.call(self.app.domains['test-vm2']).create()]) self.assertTrue(os.path.exists(os.path.join( self.test_base_dir, 'appvms', 'test-vm2'))) self.assertTrue(self.app.save.called) @unittest.mock.patch('qubes.storage.Storage.create') def test_333_vm_create_app_default_template(self, storage_mock): storage_mock.side_effect = self.dummy_coro self.call_mgmt_func(b'admin.vm.Create.AppVM', b'dom0', b'', b'name=test-vm2 label=red') self.assertEqual(storage_mock.mock_calls, [unittest.mock.call(self.app.domains['test-vm2']).create()]) self.assertIn('test-vm2', self.app.domains) self.assertEqual(self.app.domains['test-vm2'].template, self.app.default_template) self.assertTrue(self.app.save.called) @unittest.mock.patch('qubes.storage.Storage.create') def test_334_vm_create_invalid_name(self, storage_mock): storage_mock.side_effect = self.dummy_coro with self.assertRaises(qubes.exc.QubesValueError): self.call_mgmt_func(b'admin.vm.Create.AppVM', b'dom0', b'test-template', b'name=test-###') self.assertNotIn('test-###', self.app.domains) self.assertFalse(self.app.save.called) @unittest.mock.patch('qubes.storage.Storage.create') def test_335_vm_create_missing_name(self, storage_mock): storage_mock.side_effect = self.dummy_coro with self.assertRaises(AssertionError): self.call_mgmt_func(b'admin.vm.Create.AppVM', b'dom0', b'test-template', b'label=red') self.assertFalse(self.app.save.called) @unittest.mock.patch('qubes.storage.Storage.create') def test_336_vm_create_spurious_pool(self, storage_mock): storage_mock.side_effect = self.dummy_coro with self.assertRaises(AssertionError): self.call_mgmt_func(b'admin.vm.Create.AppVM', b'dom0', b'test-template', b'name=test-vm2 label=red pool=default') self.assertNotIn('test-vm2', self.app.domains) self.assertFalse(self.app.save.called) @unittest.mock.patch('qubes.storage.Storage.create') def test_337_vm_create_duplicate_name(self, storage_mock): storage_mock.side_effect = self.dummy_coro with self.assertRaises(qubes.exc.QubesException): self.call_mgmt_func(b'admin.vm.Create.AppVM', b'dom0', b'test-template', b'name=test-vm1 label=red') self.assertFalse(self.app.save.called) @unittest.mock.patch('qubes.storage.Storage.create') def test_338_vm_create_name_twice(self, storage_mock): storage_mock.side_effect = self.dummy_coro with self.assertRaises(AssertionError): self.call_mgmt_func(b'admin.vm.Create.AppVM', b'dom0', b'test-template', b'name=test-vm2 name=test-vm3 label=red') self.assertNotIn('test-vm2', self.app.domains) self.assertNotIn('test-vm3', self.app.domains) self.assertFalse(self.app.save.called) @unittest.mock.patch('qubes.storage.Storage.create') def test_340_vm_create_in_pool_app(self, storage_mock): storage_mock.side_effect = self.dummy_coro self.call_mgmt_func(b'admin.vm.CreateInPool.AppVM', b'dom0', b'test-template', b'name=test-vm2 label=red ' b'pool=test') self.assertIn('test-vm2', self.app.domains) vm = self.app.domains['test-vm2'] self.assertEqual(vm.label, self.app.get_label('red')) self.assertEqual(vm.template, self.app.domains['test-template']) # setting pool= affect only volumes actually created for this VM, # not used from a template or so self.assertEqual(vm.volume_config['root']['pool'], self.template.volumes['root'].pool) self.assertEqual(vm.volume_config['private']['pool'], 'test') self.assertEqual(vm.volume_config['volatile']['pool'], 'test') self.assertEqual(vm.volume_config['kernel']['pool'], 'linux-kernel') self.assertEqual(storage_mock.mock_calls, [unittest.mock.call(self.app.domains['test-vm2']).create()]) self.assertTrue(os.path.exists(os.path.join( self.test_base_dir, 'appvms', 'test-vm2'))) self.assertTrue(self.app.save.called) @unittest.mock.patch('qubes.storage.Storage.create') def test_341_vm_create_in_pool_private(self, storage_mock): storage_mock.side_effect = self.dummy_coro self.call_mgmt_func(b'admin.vm.CreateInPool.AppVM', b'dom0', b'test-template', b'name=test-vm2 label=red ' b'pool:private=test') self.assertIn('test-vm2', self.app.domains) vm = self.app.domains['test-vm2'] self.assertEqual(vm.label, self.app.get_label('red')) self.assertEqual(vm.template, self.app.domains['test-template']) self.assertEqual(vm.volume_config['root']['pool'], self.template.volumes['root'].pool) self.assertEqual(vm.volume_config['private']['pool'], 'test') self.assertEqual(vm.volume_config['volatile']['pool'], self.app.default_pool_volatile) self.assertEqual(vm.volume_config['kernel']['pool'], 'linux-kernel') self.assertEqual(storage_mock.mock_calls, [unittest.mock.call(self.app.domains['test-vm2']).create()]) self.assertTrue(os.path.exists(os.path.join( self.test_base_dir, 'appvms', 'test-vm2'))) self.assertTrue(self.app.save.called) @unittest.mock.patch('qubes.storage.Storage.create') def test_342_vm_create_in_pool_invalid_pool(self, storage_mock): storage_mock.side_effect = self.dummy_coro with self.assertRaises(qubes.exc.QubesException): self.call_mgmt_func(b'admin.vm.CreateInPool.AppVM', b'dom0', b'test-template', b'name=test-vm2 label=red ' b'pool=no-such-pool') self.assertFalse(self.app.save.called) @unittest.mock.patch('qubes.storage.Storage.create') def test_343_vm_create_in_pool_invalid_pool2(self, storage_mock): storage_mock.side_effect = self.dummy_coro with self.assertRaises(qubes.exc.QubesException): self.call_mgmt_func(b'admin.vm.CreateInPool.AppVM', b'dom0', b'test-template', b'name=test-vm2 label=red ' b'pool:private=no-such-pool') self.assertNotIn('test-vm2', self.app.domains) self.assertFalse(self.app.save.called) @unittest.mock.patch('qubes.storage.Storage.create') def test_344_vm_create_in_pool_invalid_volume(self, storage_mock): storage_mock.side_effect = self.dummy_coro with self.assertRaises(AssertionError): self.call_mgmt_func(b'admin.vm.CreateInPool.AppVM', b'dom0', b'test-template', b'name=test-vm2 label=red ' b'pool:invalid=test') self.assertNotIn('test-vm2', self.app.domains) self.assertFalse(self.app.save.called) @unittest.mock.patch('qubes.storage.Storage.create') def test_345_vm_create_in_pool_app_root(self, storage_mock): # setting custom pool for 'root' volume of AppVM should not be # allowed - this volume belongs to the template storage_mock.side_effect = self.dummy_coro with self.assertRaises(qubes.exc.QubesException): self.call_mgmt_func(b'admin.vm.CreateInPool.AppVM', b'dom0', b'test-template', b'name=test-vm2 label=red ' b'pool:root=test') self.assertNotIn('test-vm2', self.app.domains) self.assertFalse(self.app.save.called) @unittest.mock.patch('qubes.storage.Storage.create') def test_346_vm_create_in_pool_duplicate_pool(self, storage_mock): # setting custom pool for 'root' volume of AppVM should not be # allowed - this volume belongs to the template storage_mock.side_effect = self.dummy_coro with self.assertRaises(AssertionError): self.call_mgmt_func(b'admin.vm.CreateInPool.AppVM', b'dom0', b'test-template', b'name=test-vm2 label=red ' b'pool=test pool:root=test') self.assertNotIn('test-vm2', self.app.domains) self.assertFalse(self.app.save.called) def test_400_property_list(self): # actual function tested for admin.vm.property.* already # this test is kind of stupid, but at least check if appropriate # admin-permission event is fired value = self.call_mgmt_func(b'admin.property.List', b'dom0') properties = self.app.property_list() self.assertEqual(value, ''.join('{}\n'.format(prop.__name__) for prop in properties)) def test_410_property_get_str(self): # actual function tested for admin.vm.property.* already value = self.call_mgmt_func(b'admin.property.Get', b'dom0', b'default_kernel') self.assertEqual(value, 'default=False type=str 1.0') def test_420_propert_set_str(self): # actual function tested for admin.vm.property.* already with unittest.mock.patch('qubes.property.__set__') as mock: value = self.call_mgmt_func(b'admin.property.Set', b'dom0', b'default_kernel', b'1.0') self.assertIsNone(value) mock.assert_called_once_with(self.app, '1.0') self.app.save.assert_called_once_with() def test_440_property_help(self): # actual function tested for admin.vm.property.* already value = self.call_mgmt_func(b'admin.property.Help', b'dom0', b'clockvm') self.assertEqual(value, 'Which VM to use as NTP proxy for updating AdminVM') self.assertFalse(self.app.save.called) def test_450_property_reset(self): # actual function tested for admin.vm.property.* already with unittest.mock.patch('qubes.property.__delete__') as mock: value = self.call_mgmt_func(b'admin.property.Reset', b'dom0', b'clockvm') mock.assert_called_with(self.app) self.assertIsNone(value) self.app.save.assert_called_once_with() def device_list_testclass(self, vm, event): if vm is not self.vm: return dev = qubes.devices.DeviceInfo(self.vm, '1234') dev.description = 'Some device' dev.extra_prop = 'xx' yield dev dev = qubes.devices.DeviceInfo(self.vm, '4321') dev.description = 'Some other device' yield dev def test_460_vm_device_available(self): self.vm.add_handler('device-list:testclass', self.device_list_testclass) value = self.call_mgmt_func(b'admin.vm.device.testclass.Available', b'test-vm1') self.assertEqual(value, '1234 extra_prop=xx description=Some ' 'device\n' '4321 description=Some other device\n') self.assertFalse(self.app.save.called) def test_461_vm_device_available_specific(self): self.vm.add_handler('device-list:testclass', self.device_list_testclass) value = self.call_mgmt_func(b'admin.vm.device.testclass.Available', b'test-vm1', b'4321') self.assertEqual(value, '4321 description=Some other device\n') self.assertFalse(self.app.save.called) def test_462_vm_device_available_invalid(self): self.vm.add_handler('device-list:testclass', self.device_list_testclass) value = self.call_mgmt_func(b'admin.vm.device.testclass.Available', b'test-vm1', b'no-such-device') self.assertEqual(value, '') self.assertFalse(self.app.save.called) def test_470_vm_device_list_persistent(self): assignment = qubes.devices.DeviceAssignment(self.vm, '1234', persistent=True) self.loop.run_until_complete( self.vm.devices['testclass'].attach(assignment)) value = self.call_mgmt_func(b'admin.vm.device.testclass.List', b'test-vm1') self.assertEqual(value, 'test-vm1+1234 persistent=yes\n') self.assertFalse(self.app.save.called) def test_471_vm_device_list_persistent_options(self): assignment = qubes.devices.DeviceAssignment(self.vm, '1234', persistent=True, options={'opt1': 'value'}) self.loop.run_until_complete( self.vm.devices['testclass'].attach(assignment)) assignment = qubes.devices.DeviceAssignment(self.vm, '4321', persistent=True) self.loop.run_until_complete( self.vm.devices['testclass'].attach(assignment)) value = self.call_mgmt_func(b'admin.vm.device.testclass.List', b'test-vm1') self.assertEqual(value, 'test-vm1+1234 opt1=value persistent=yes\n' 'test-vm1+4321 persistent=yes\n') self.assertFalse(self.app.save.called) def device_list_attached_testclass(self, vm, event, **kwargs): if vm is not self.vm: return dev = qubes.devices.DeviceInfo(self.vm, '1234') yield (dev, {'attach_opt': 'value'}) def test_472_vm_device_list_temporary(self): self.vm.add_handler('device-list-attached:testclass', self.device_list_attached_testclass) value = self.call_mgmt_func(b'admin.vm.device.testclass.List', b'test-vm1') self.assertEqual(value, 'test-vm1+1234 attach_opt=value persistent=no\n') self.assertFalse(self.app.save.called) def test_473_vm_device_list_mixed(self): self.vm.add_handler('device-list-attached:testclass', self.device_list_attached_testclass) assignment = qubes.devices.DeviceAssignment(self.vm, '4321', persistent=True) self.loop.run_until_complete( self.vm.devices['testclass'].attach(assignment)) value = self.call_mgmt_func(b'admin.vm.device.testclass.List', b'test-vm1') self.assertEqual(value, 'test-vm1+1234 attach_opt=value persistent=no\n' 'test-vm1+4321 persistent=yes\n') self.assertFalse(self.app.save.called) def test_474_vm_device_list_specific(self): self.vm.add_handler('device-list-attached:testclass', self.device_list_attached_testclass) assignment = qubes.devices.DeviceAssignment(self.vm, '4321', persistent=True) self.loop.run_until_complete( self.vm.devices['testclass'].attach(assignment)) value = self.call_mgmt_func(b'admin.vm.device.testclass.List', b'test-vm1', b'test-vm1+1234') self.assertEqual(value, 'test-vm1+1234 attach_opt=value persistent=no\n') self.assertFalse(self.app.save.called) def test_480_vm_device_attach(self): self.vm.add_handler('device-list:testclass', self.device_list_testclass) mock_attach = unittest.mock.Mock() mock_attach.return_value = None del mock_attach._is_coroutine self.vm.add_handler('device-attach:testclass', mock_attach) with unittest.mock.patch.object(qubes.vm.qubesvm.QubesVM, 'is_halted', lambda _: False): value = self.call_mgmt_func(b'admin.vm.device.testclass.Attach', b'test-vm1', b'test-vm1+1234') self.assertIsNone(value) mock_attach.assert_called_once_with(self.vm, 'device-attach:testclass', device=self.vm.devices['testclass']['1234'], options={}) self.assertEqual(len(self.vm.devices['testclass'].persistent()), 0) self.app.save.assert_called_once_with() def test_481_vm_device_attach(self): self.vm.add_handler('device-list:testclass', self.device_list_testclass) mock_attach = unittest.mock.Mock() mock_attach.return_value = None del mock_attach._is_coroutine self.vm.add_handler('device-attach:testclass', mock_attach) with unittest.mock.patch.object(qubes.vm.qubesvm.QubesVM, 'is_halted', lambda _: False): value = self.call_mgmt_func(b'admin.vm.device.testclass.Attach', b'test-vm1', b'test-vm1+1234', b'persistent=no') self.assertIsNone(value) mock_attach.assert_called_once_with(self.vm, 'device-attach:testclass', device=self.vm.devices['testclass']['1234'], options={}) self.assertEqual(len(self.vm.devices['testclass'].persistent()), 0) self.app.save.assert_called_once_with() def test_482_vm_device_attach_not_running(self): self.vm.add_handler('device-list:testclass', self.device_list_testclass) mock_attach = unittest.mock.Mock() del mock_attach._is_coroutine self.vm.add_handler('device-attach:testclass', mock_attach) with self.assertRaises(qubes.exc.QubesVMNotRunningError): self.call_mgmt_func(b'admin.vm.device.testclass.Attach', b'test-vm1', b'test-vm1+1234') self.assertFalse(mock_attach.called) self.assertEqual(len(self.vm.devices['testclass'].persistent()), 0) self.assertFalse(self.app.save.called) def test_483_vm_device_attach_persistent(self): self.vm.add_handler('device-list:testclass', self.device_list_testclass) mock_attach = unittest.mock.Mock() mock_attach.return_value = None del mock_attach._is_coroutine self.vm.add_handler('device-attach:testclass', mock_attach) with unittest.mock.patch.object(qubes.vm.qubesvm.QubesVM, 'is_halted', lambda _: False): value = self.call_mgmt_func(b'admin.vm.device.testclass.Attach', b'test-vm1', b'test-vm1+1234', b'persistent=yes') self.assertIsNone(value) dev = self.vm.devices['testclass']['1234'] mock_attach.assert_called_once_with(self.vm, 'device-attach:testclass', device=dev, options={}) self.assertIn(dev, self.vm.devices['testclass'].persistent()) self.app.save.assert_called_once_with() def test_484_vm_device_attach_persistent_not_running(self): self.vm.add_handler('device-list:testclass', self.device_list_testclass) mock_attach = unittest.mock.Mock() mock_attach.return_value = None del mock_attach._is_coroutine self.vm.add_handler('device-attach:testclass', mock_attach) value = self.call_mgmt_func(b'admin.vm.device.testclass.Attach', b'test-vm1', b'test-vm1+1234', b'persistent=yes') self.assertIsNone(value) dev = self.vm.devices['testclass']['1234'] mock_attach.assert_called_once_with(self.vm, 'device-attach:testclass', device=dev, options={}) self.assertIn(dev, self.vm.devices['testclass'].persistent()) self.app.save.assert_called_once_with() def test_485_vm_device_attach_options(self): self.vm.add_handler('device-list:testclass', self.device_list_testclass) mock_attach = unittest.mock.Mock() mock_attach.return_value = None del mock_attach._is_coroutine self.vm.add_handler('device-attach:testclass', mock_attach) with unittest.mock.patch.object(qubes.vm.qubesvm.QubesVM, 'is_halted', lambda _: False): value = self.call_mgmt_func(b'admin.vm.device.testclass.Attach', b'test-vm1', b'test-vm1+1234', b'option1=value2') self.assertIsNone(value) dev = self.vm.devices['testclass']['1234'] mock_attach.assert_called_once_with(self.vm, 'device-attach:testclass', device=dev, options={'option1': 'value2'}) self.app.save.assert_called_once_with() def test_490_vm_device_detach(self): self.vm.add_handler('device-list:testclass', self.device_list_testclass) self.vm.add_handler('device-list-attached:testclass', self.device_list_attached_testclass) mock_detach = unittest.mock.Mock() mock_detach.return_value = None del mock_detach._is_coroutine self.vm.add_handler('device-detach:testclass', mock_detach) with unittest.mock.patch.object(qubes.vm.qubesvm.QubesVM, 'is_halted', lambda _: False): value = self.call_mgmt_func(b'admin.vm.device.testclass.Detach', b'test-vm1', b'test-vm1+1234') self.assertIsNone(value) mock_detach.assert_called_once_with(self.vm, 'device-detach:testclass', device=self.vm.devices['testclass']['1234']) self.app.save.assert_called_once_with() def test_491_vm_device_detach_not_attached(self): mock_detach = unittest.mock.Mock() mock_detach.return_value = None del mock_detach._is_coroutine self.vm.add_handler('device-detach:testclass', mock_detach) with unittest.mock.patch.object(qubes.vm.qubesvm.QubesVM, 'is_halted', lambda _: False): with self.assertRaises(qubes.devices.DeviceNotAttached): self.call_mgmt_func(b'admin.vm.device.testclass.Detach', b'test-vm1', b'test-vm1+1234') self.assertFalse(mock_detach.called) self.assertFalse(self.app.save.called) @unittest.mock.patch('qubes.storage.Storage.remove') @unittest.mock.patch('shutil.rmtree') def test_500_vm_remove(self, mock_rmtree, mock_remove): value = self.call_mgmt_func(b'admin.vm.Remove', b'test-vm1') self.assertIsNone(value) mock_rmtree.assert_called_once_with( '/tmp/qubes-test-dir/appvms/test-vm1') mock_remove.assert_called_once_with() self.app.save.assert_called_once_with() @unittest.mock.patch('qubes.storage.Storage.remove') @unittest.mock.patch('shutil.rmtree') def test_501_vm_remove_running(self, mock_rmtree, mock_remove): with unittest.mock.patch.object( self.vm, 'get_power_state', lambda: 'Running'): with self.assertRaises(qubes.exc.QubesVMNotHaltedError): self.call_mgmt_func(b'admin.vm.Remove', b'test-vm1') self.assertFalse(mock_rmtree.called) self.assertFalse(mock_remove.called) self.assertFalse(self.app.save.called) def test_510_vm_volume_import(self): value = self.call_mgmt_func(b'admin.vm.volume.Import', b'test-vm1', b'private') self.assertEqual(value, '{} {}'.format( 2*2**30, '/tmp/qubes-test-dir/appvms/test-vm1/private.img')) self.assertFalse(self.app.save.called) def test_511_vm_volume_import_running(self): with unittest.mock.patch.object( self.vm, 'get_power_state', lambda: 'Running'): with self.assertRaises(qubes.exc.QubesVMNotHaltedError): self.call_mgmt_func(b'admin.vm.volume.Import', b'test-vm1', b'private') def setup_for_clone(self): self.pool = unittest.mock.MagicMock() self.app.pools['test'] = self.pool self.vm2 = self.app.add_new_vm('AppVM', label='red', name='test-vm2', template='test-template', kernel='') self.pool.configure_mock(**{ 'volumes': qubes.storage.VolumesCollection(self.pool), 'init_volume.return_value.pool': self.pool, '__str__.return_value': 'test', 'get_volume.side_effect': (lambda vid: self.vm.volumes['private'] if vid is self.vm.volumes['private'].vid else self.vm2.volumes['private'] ), }) self.loop.run_until_complete( self.vm.create_on_disk(pool='test')) self.loop.run_until_complete( self.vm2.create_on_disk(pool='test')) # the call replaces self.vm.volumes[...] with result of import # operation - make sure it stays as the same object self.vm.volumes['private'].import_volume.return_value = \ self.vm.volumes['private'] self.vm2.volumes['private'].import_volume.return_value = \ self.vm2.volumes['private'] def test_520_vm_volume_clone(self): self.setup_for_clone() token = self.call_mgmt_func(b'admin.vm.volume.CloneFrom', b'test-vm1', b'private', b'') # token self.assertEqual(len(token), 32) self.assertFalse(self.app.save.called) value = self.call_mgmt_func(b'admin.vm.volume.CloneTo', b'test-vm2', b'private', token.encode()) self.assertIsNone(value) self.vm2.volumes['private'].import_volume.assert_called_once_with( self.vm.volumes['private'] ) self.vm2.volumes['private'].import_volume.assert_called_once_with( self.vm2.volumes['private'] ) self.app.save.assert_called_once_with() def test_521_vm_volume_clone_invalid_volume(self): self.setup_for_clone() with self.assertRaises(AssertionError): self.call_mgmt_func(b'admin.vm.volume.CloneFrom', b'test-vm1', b'private123', None) self.assertNotIn('init_volume().import_volume', map(operator.itemgetter(0), self.pool.mock_calls)) self.assertFalse(self.app.save.called) def test_522_vm_volume_clone_invalid_volume2(self): self.setup_for_clone() token = self.call_mgmt_func(b'admin.vm.volume.CloneFrom', b'test-vm1', b'private', b'') with self.assertRaises(AssertionError): self.call_mgmt_func(b'admin.vm.volume.CloneTo', b'test-vm1', b'private123', token.encode()) self.assertNotIn('init_volume().import_volume', map(operator.itemgetter(0), self.pool.mock_calls)) self.assertFalse(self.app.save.called) def test_523_vm_volume_clone_removed_volume(self): self.setup_for_clone() token = self.call_mgmt_func(b'admin.vm.volume.CloneFrom', b'test-vm1', b'private', b'') def get_volume(vid): if vid == self.vm.volumes['private']: raise KeyError(vid) else: return unittest.mock.DEFAULT self.pool.get_volume.side_effect = get_volume with self.assertRaises(AssertionError): self.call_mgmt_func(b'admin.vm.volume.CloneTo', b'test-vm1', b'private', token.encode()) self.assertNotIn('init_volume().import_volume', map(operator.itemgetter(0), self.pool.mock_calls)) self.assertFalse(self.app.save.called) def test_524_vm_volume_clone_invlid_token(self): self.setup_for_clone() with self.assertRaises(AssertionError): self.call_mgmt_func(b'admin.vm.volume.CloneTo', b'test-vm1', b'private', b'no-such-token') self.assertNotIn('init_volume().import_volume', map(operator.itemgetter(0), self.pool.mock_calls)) self.assertFalse(self.app.save.called) def test_530_tag_list(self): self.vm.tags.add('tag1') self.vm.tags.add('tag2') value = self.call_mgmt_func(b'admin.vm.tag.List', b'test-vm1') self.assertEqual(value, 'tag1\ntag2\n') self.assertFalse(self.app.save.called) def test_540_tag_get(self): self.vm.tags.add('tag1') value = self.call_mgmt_func(b'admin.vm.tag.Get', b'test-vm1', b'tag1') self.assertEqual(value, '1') self.assertFalse(self.app.save.called) def test_541_tag_get_absent(self): value = self.call_mgmt_func(b'admin.vm.tag.Get', b'test-vm1', b'tag1') self.assertEqual(value, '0') self.assertFalse(self.app.save.called) def test_550_tag_remove(self): self.vm.tags.add('tag1') value = self.call_mgmt_func(b'admin.vm.tag.Remove', b'test-vm1', b'tag1') self.assertIsNone(value, None) self.assertNotIn('tag1', self.vm.tags) self.assertTrue(self.app.save.called) def test_551_tag_remove_absent(self): with self.assertRaises(qubes.exc.QubesTagNotFoundError): self.call_mgmt_func(b'admin.vm.tag.Remove', b'test-vm1', b'tag1') self.assertFalse(self.app.save.called) def test_560_tag_set(self): value = self.call_mgmt_func(b'admin.vm.tag.Set', b'test-vm1', b'tag1') self.assertIsNone(value) self.assertIn('tag1', self.vm.tags) self.assertTrue(self.app.save.called) def test_561_tag_set_invalid(self): with self.assertRaises(AssertionError): self.call_mgmt_func(b'admin.vm.tag.Set', b'test-vm1', b'+.some-tag') self.assertNotIn('+.some-tag', self.vm.tags) self.assertFalse(self.app.save.called) def test_570_firewall_get(self): self.vm.firewall.save = unittest.mock.Mock() value = self.call_mgmt_func(b'admin.vm.firewall.Get', b'test-vm1', b'') self.assertEqual(value, 'action=accept\n') self.assertFalse(self.vm.firewall.save.called) self.assertFalse(self.app.save.called) def test_571_firewall_get_non_default(self): self.vm.firewall.save = unittest.mock.Mock() self.vm.firewall.rules = [ qubes.firewall.Rule(action='accept', proto='tcp', dstports='1-1024'), qubes.firewall.Rule(action='drop', proto='icmp', comment='No ICMP'), qubes.firewall.Rule(action='drop', proto='udp', expire='1499450306'), qubes.firewall.Rule(action='accept'), ] value = self.call_mgmt_func(b'admin.vm.firewall.Get', b'test-vm1', b'') self.assertEqual(value, 'action=accept proto=tcp dstports=1-1024\n' 'action=drop proto=icmp comment=No ICMP\n' 'action=drop expire=1499450306 proto=udp\n' 'action=accept\n') self.assertFalse(self.vm.firewall.save.called) self.assertFalse(self.app.save.called) def test_580_firewall_set_simple(self): self.vm.firewall.save = unittest.mock.Mock() value = self.call_mgmt_func(b'admin.vm.firewall.Set', b'test-vm1', b'', b'action=accept\n') self.assertEqual(self.vm.firewall.rules, ['action=accept']) self.assertTrue(self.vm.firewall.save.called) self.assertFalse(self.app.save.called) def test_581_firewall_set_multi(self): self.vm.firewall.save = unittest.mock.Mock() rules = [ qubes.firewall.Rule(action='accept', proto='tcp', dstports='1-1024'), qubes.firewall.Rule(action='drop', proto='icmp', comment='No ICMP'), qubes.firewall.Rule(action='drop', proto='udp', expire='1499450306'), qubes.firewall.Rule(action='accept'), ] rules_txt = ( 'action=accept proto=tcp dstports=1-1024\n' 'action=drop proto=icmp comment=No ICMP\n' 'action=drop expire=1499450306 proto=udp\n' 'action=accept\n') value = self.call_mgmt_func(b'admin.vm.firewall.Set', b'test-vm1', b'', rules_txt.encode()) self.assertEqual(self.vm.firewall.rules, rules) self.assertTrue(self.vm.firewall.save.called) self.assertFalse(self.app.save.called) def test_582_firewall_set_invalid(self): self.vm.firewall.save = unittest.mock.Mock() rules_txt = ( 'action=accept protoxyz=tcp dst4=127.0.0.1\n' 'action=drop\n') with self.assertRaises(ValueError): self.call_mgmt_func(b'admin.vm.firewall.Set', b'test-vm1', b'', rules_txt.encode()) self.assertEqual(self.vm.firewall.rules, [qubes.firewall.Rule(action='accept')]) self.assertFalse(self.vm.firewall.save.called) self.assertFalse(self.app.save.called) def test_583_firewall_set_invalid(self): self.vm.firewall.save = unittest.mock.Mock() rules_txt = ( 'proto=tcp dstports=1-1024\n' 'action=drop\n') with self.assertRaises(AssertionError): self.call_mgmt_func(b'admin.vm.firewall.Set', b'test-vm1', b'', rules_txt.encode()) self.assertEqual(self.vm.firewall.rules, [qubes.firewall.Rule(action='accept')]) self.assertFalse(self.vm.firewall.save.called) self.assertFalse(self.app.save.called) def test_584_firewall_set_invalid(self): self.vm.firewall.save = unittest.mock.Mock() rules_txt = ( 'action=accept proto=tcp dstports=1-1024 ' 'action=drop\n') with self.assertRaises(ValueError): self.call_mgmt_func(b'admin.vm.firewall.Set', b'test-vm1', b'', rules_txt.encode()) self.assertEqual(self.vm.firewall.rules, [qubes.firewall.Rule(action='accept')]) self.assertFalse(self.vm.firewall.save.called) self.assertFalse(self.app.save.called) def test_585_firewall_set_invalid(self): self.vm.firewall.save = unittest.mock.Mock() rules_txt = ( 'action=accept dstports=1-1024 comment=ążźł\n' 'action=drop\n') with self.assertRaises(UnicodeDecodeError): self.call_mgmt_func(b'admin.vm.firewall.Set', b'test-vm1', b'', rules_txt.encode()) self.assertEqual(self.vm.firewall.rules, [qubes.firewall.Rule(action='accept')]) self.assertFalse(self.vm.firewall.save.called) self.assertFalse(self.app.save.called) def test_590_firewall_reload(self): self.vm.firewall.save = unittest.mock.Mock() self.app.domains['test-vm1'].fire_event = self.emitter.fire_event value = self.call_mgmt_func(b'admin.vm.firewall.Reload', b'test-vm1', b'') self.assertIsNone(value) self.assertEventFired(self.emitter, 'firewall-changed') self.assertFalse(self.vm.firewall.save.called) self.assertFalse(self.app.save.called) def test_990_vm_unexpected_payload(self): methods_with_no_payload = [ b'admin.vm.List', b'admin.vm.Remove', b'admin.vm.property.List', b'admin.vm.property.Get', b'admin.vm.property.Help', b'admin.vm.property.HelpRst', b'admin.vm.property.Reset', b'admin.vm.feature.List', b'admin.vm.feature.Get', b'admin.vm.feature.CheckWithTemplate', b'admin.vm.feature.Remove', b'admin.vm.tag.List', b'admin.vm.tag.Get', b'admin.vm.tag.Remove', b'admin.vm.tag.Set', b'admin.vm.firewall.Get', b'admin.vm.firewall.Reload', b'admin.vm.device.pci.Attach', b'admin.vm.device.pci.Detach', b'admin.vm.device.pci.List', b'admin.vm.device.pci.Available', b'admin.vm.microphone.Attach', b'admin.vm.microphone.Detach', b'admin.vm.microphone.Status', b'admin.vm.volume.ListSnapshots', b'admin.vm.volume.List', b'admin.vm.volume.Info', b'admin.vm.Start', b'admin.vm.Shutdown', b'admin.vm.Pause', b'admin.vm.Unpause', b'admin.vm.Kill', b'admin.Events', b'admin.vm.feature.List', b'admin.vm.feature.Get', b'admin.vm.feature.Remove', b'admin.vm.feature.CheckWithTemplate', ] # make sure also no methods on actual VM gets called vm_mock = unittest.mock.MagicMock() vm_mock.name = self.vm.name vm_mock.qid = self.vm.qid vm_mock.__lt__ = (lambda x, y: x.qid < y.qid) self.app.domains._dict[self.vm.qid] = vm_mock for method in methods_with_no_payload: # should reject payload regardless of having argument or not with self.subTest(method.decode('ascii')): with self.assertRaises(AssertionError): self.call_mgmt_func(method, b'test-vm1', b'', b'unexpected-payload') self.assertFalse(vm_mock.called) self.assertFalse(self.app.save.called) with self.subTest(method.decode('ascii') + '+arg'): with self.assertRaises(AssertionError): self.call_mgmt_func(method, b'test-vm1', b'some-arg', b'unexpected-payload') self.assertFalse(vm_mock.called) self.assertFalse(self.app.save.called) def test_991_vm_unexpected_argument(self): methods_with_no_argument = [ b'admin.vm.List', b'admin.vm.Clone', b'admin.vm.Remove', b'admin.vm.property.List', b'admin.vm.feature.List', b'admin.vm.tag.List', b'admin.vm.firewall.Get', b'admin.vm.firewall.Set', b'admin.vm.firewall.Reload', b'admin.vm.microphone.Attach', b'admin.vm.microphone.Detach', b'admin.vm.microphone.Status', b'admin.vm.volume.List', b'admin.vm.Start', b'admin.vm.Shutdown', b'admin.vm.Pause', b'admin.vm.Unpause', b'admin.vm.Kill', b'admin.Events', b'admin.vm.feature.List', ] # make sure also no methods on actual VM gets called vm_mock = unittest.mock.MagicMock() vm_mock.name = self.vm.name vm_mock.qid = self.vm.qid vm_mock.__lt__ = (lambda x, y: x.qid < y.qid) self.app.domains._dict[self.vm.qid] = vm_mock for method in methods_with_no_argument: # should reject argument regardless of having payload or not with self.subTest(method.decode('ascii')): with self.assertRaises(AssertionError): self.call_mgmt_func(method, b'test-vm1', b'some-arg', b'') self.assertFalse(vm_mock.called) self.assertFalse(self.app.save.called) with self.subTest(method.decode('ascii') + '+payload'): with self.assertRaises(AssertionError): self.call_mgmt_func(method, b'test-vm1', b'unexpected-arg', b'some-payload') self.assertFalse(vm_mock.called) self.assertFalse(self.app.save.called) def test_992_dom0_unexpected_payload(self): methods_with_no_payload = [ b'admin.vmclass.List', b'admin.vm.List', b'admin.label.List', b'admin.label.Get', b'admin.label.Remove', b'admin.property.List', b'admin.property.Get', b'admin.property.Help', b'admin.property.HelpRst', b'admin.property.Reset', b'admin.pool.List', b'admin.pool.ListDrivers', b'admin.pool.Info', b'admin.pool.Remove', b'admin.backup.Execute', b'admin.Events', ] # make sure also no methods on actual VM gets called vm_mock = unittest.mock.MagicMock() vm_mock.name = self.vm.name vm_mock.qid = self.vm.qid vm_mock.__lt__ = (lambda x, y: x.qid < y.qid) self.app.domains._dict[self.vm.qid] = vm_mock for method in methods_with_no_payload: # should reject payload regardless of having argument or not with self.subTest(method.decode('ascii')): with self.assertRaises(AssertionError): self.call_mgmt_func(method, b'dom0', b'', b'unexpected-payload') self.assertFalse(vm_mock.called) self.assertFalse(self.app.save.called) with self.subTest(method.decode('ascii') + '+arg'): with self.assertRaises(AssertionError): self.call_mgmt_func(method, b'dom0', b'some-arg', b'unexpected-payload') self.assertFalse(vm_mock.called) self.assertFalse(self.app.save.called) def test_993_dom0_unexpected_argument(self): methods_with_no_argument = [ b'admin.vmclass.List', b'admin.vm.List', b'admin.label.List', b'admin.property.List', b'admin.pool.List', b'admin.pool.ListDrivers', b'admin.Events', ] # make sure also no methods on actual VM gets called vm_mock = unittest.mock.MagicMock() vm_mock.name = self.vm.name vm_mock.qid = self.vm.qid vm_mock.__lt__ = (lambda x, y: x.qid < y.qid) self.app.domains._dict[self.vm.qid] = vm_mock for method in methods_with_no_argument: # should reject argument regardless of having payload or not with self.subTest(method.decode('ascii')): with self.assertRaises(AssertionError): self.call_mgmt_func(method, b'dom0', b'some-arg', b'') self.assertFalse(vm_mock.called) self.assertFalse(self.app.save.called) with self.subTest(method.decode('ascii') + '+payload'): with self.assertRaises(AssertionError): self.call_mgmt_func(method, b'dom0', b'unexpected-arg', b'some-payload') self.assertFalse(vm_mock.called) self.assertFalse(self.app.save.called) def test_994_dom0_only_calls(self): # TODO set some better arguments, to make sure the call was rejected # because of invalid destination, not invalid arguments methods_for_dom0_only = [ b'admin.vmclass.List', b'admin.vm.Create.AppVM', b'admin.vm.CreateInPool.AppVM', b'admin.vm.CreateTemplate', b'admin.label.List', b'admin.label.Create', b'admin.label.Get', b'admin.label.Remove', b'admin.property.List', b'admin.property.Get', b'admin.property.Set', b'admin.property.Help', b'admin.property.HelpRst', b'admin.property.Reset', b'admin.pool.List', b'admin.pool.ListDrivers', b'admin.pool.Info', b'admin.pool.Add', b'admin.pool.Remove', b'admin.pool.volume.List', b'admin.pool.volume.Info', b'admin.pool.volume.ListSnapshots', b'admin.pool.volume.Snapshot', b'admin.pool.volume.Revert', b'admin.pool.volume.Resize', b'admin.backup.Execute', b'admin.backup.Info', b'admin.backup.Restore', ] # make sure also no methods on actual VM gets called vm_mock = unittest.mock.MagicMock() vm_mock.name = self.vm.name vm_mock.qid = self.vm.qid vm_mock.__lt__ = (lambda x, y: x.qid < y.qid) self.app.domains._dict[self.vm.qid] = vm_mock for method in methods_for_dom0_only: # should reject call regardless of having payload or not with self.subTest(method.decode('ascii')): with self.assertRaises(AssertionError): self.call_mgmt_func(method, b'test-vm1', b'', b'') self.assertFalse(vm_mock.called) self.assertFalse(self.app.save.called) with self.subTest(method.decode('ascii') + '+arg'): with self.assertRaises(AssertionError): self.call_mgmt_func(method, b'test-vm1', b'some-arg', b'') self.assertFalse(vm_mock.called) self.assertFalse(self.app.save.called) with self.subTest(method.decode('ascii') + '+payload'): with self.assertRaises(AssertionError): self.call_mgmt_func(method, b'test-vm1', b'', b'payload') self.assertFalse(vm_mock.called) self.assertFalse(self.app.save.called) with self.subTest(method.decode('ascii') + '+arg+payload'): with self.assertRaises(AssertionError): self.call_mgmt_func(method, b'test-vm1', b'some-arg', b'some-payload') self.assertFalse(vm_mock.called) self.assertFalse(self.app.save.called) @unittest.skip('undecided') def test_995_vm_only_calls(self): # XXX is it really a good idea to prevent those calls this early? # TODO set some better arguments, to make sure the call was rejected # because of invalid destination, not invalid arguments methods_for_vm_only = [ b'admin.vm.Clone', b'admin.vm.Remove', b'admin.vm.property.List', b'admin.vm.property.Get', b'admin.vm.property.Set', b'admin.vm.property.Help', b'admin.vm.property.HelpRst', b'admin.vm.property.Reset', b'admin.vm.feature.List', b'admin.vm.feature.Get', b'admin.vm.feature.Set', b'admin.vm.feature.CheckWithTemplate', b'admin.vm.feature.Remove', b'admin.vm.tag.List', b'admin.vm.tag.Get', b'admin.vm.tag.Remove', b'admin.vm.tag.Set', b'admin.vm.firewall.Get', b'admin.vm.firewall.Set', b'admin.vm.firewall.Reload', b'admin.vm.device.pci.Attach', b'admin.vm.device.pci.Detach', b'admin.vm.device.pci.List', b'admin.vm.device.pci.Available', b'admin.vm.microphone.Attach', b'admin.vm.microphone.Detach', b'admin.vm.microphone.Status', b'admin.vm.volume.ListSnapshots', b'admin.vm.volume.List', b'admin.vm.volume.Info', b'admin.vm.volume.Revert', b'admin.vm.volume.Resize', b'admin.vm.Start', b'admin.vm.Shutdown', b'admin.vm.Pause', b'admin.vm.Unpause', b'admin.vm.Kill', b'admin.vm.feature.List', b'admin.vm.feature.Get', b'admin.vm.feature.Set', b'admin.vm.feature.Remove', b'admin.vm.feature.CheckWithTemplate', ] # make sure also no methods on actual VM gets called vm_mock = unittest.mock.MagicMock() vm_mock.name = self.vm.name vm_mock.qid = self.vm.qid vm_mock.__lt__ = (lambda x, y: x.qid < y.qid) self.app.domains._dict[self.vm.qid] = vm_mock for method in methods_for_vm_only: # should reject payload regardless of having argument or not # should reject call regardless of having payload or not with self.subTest(method.decode('ascii')): with self.assertRaises(AssertionError): self.call_mgmt_func(method, b'dom0', b'', b'') self.assertFalse(vm_mock.called) self.assertFalse(self.app.save.called) with self.subTest(method.decode('ascii') + '+arg'): with self.assertRaises(AssertionError): self.call_mgmt_func(method, b'dom0', b'some-arg', b'') self.assertFalse(vm_mock.called) self.assertFalse(self.app.save.called) with self.subTest(method.decode('ascii') + '+payload'): with self.assertRaises(AssertionError): self.call_mgmt_func(method, b'dom0', b'', b'payload') self.assertFalse(vm_mock.called) self.assertFalse(self.app.save.called) with self.subTest(method.decode('ascii') + '+arg+payload'): with self.assertRaises(AssertionError): self.call_mgmt_func(method, b'dom0', b'some-arg', b'some-payload') self.assertFalse(vm_mock.called) self.assertFalse(self.app.save.called)