diff --git a/qubesadmin/app.py b/qubesadmin/app.py index 010603d..71d0902 100644 --- a/qubesadmin/app.py +++ b/qubesadmin/app.py @@ -38,6 +38,7 @@ import qubesadmin.storage import qubesadmin.utils import qubesadmin.vm import qubesadmin.config +import qubesadmin.devices class VMCollection(object): @@ -316,7 +317,9 @@ class QubesBase(qubesadmin.base.PropertyHolder): return self.domains[name] def clone_vm(self, src_vm, new_name, new_cls=None, pool=None, pools=None, - ignore_errors=False, ignore_volumes=None): + ignore_errors=False, ignore_volumes=None, + ignore_devices=False): + # pylint: disable=too-many-statements """Clone Virtual Machine Example usage with custom storage pools: @@ -337,6 +340,7 @@ class QubesBase(qubesadmin.base.PropertyHolder): logged, or abort the whole operation? :param list ignore_volumes: do not clone volumes on this list, like 'private' or 'root' + :param bool ignore_devices: if True, do not copy device assignments :return new VM object """ @@ -472,6 +476,22 @@ class QubesBase(qubesadmin.base.PropertyHolder): del self.domains[dst_vm.name] raise + if not ignore_devices: + try: + for devclass in src_vm.devices: + for assignment in src_vm.devices[devclass].assignments( + persistent=True): + new_assignment = qubesadmin.devices.DeviceAssignment( + backend_domain=assignment.backend_domain, + ident=assignment.ident, + options=assignment.options, + persistent=assignment.persistent) + dst_vm.devices[devclass].attach(new_assignment) + except qubesadmin.exc.QubesException: + if not ignore_errors: + del self.domains[dst_vm.name] + raise + return dst_vm def qubesd_call(self, dest, method, arg=None, payload=None, diff --git a/qubesadmin/tests/app.py b/qubesadmin/tests/app.py index ed9942a..c141dce 100644 --- a/qubesadmin/tests/app.py +++ b/qubesadmin/tests/app.py @@ -25,12 +25,8 @@ import unittest import multiprocessing -try: - import unittest.mock as mock - from unittest.mock import call -except ImportError: - import mock - from mock import call +import unittest.mock as mock +from unittest.mock import call import tempfile @@ -364,6 +360,8 @@ class TC_10_QubesBase(qubesadmin.tests.QubesTestCase): b'test-vm class=AppVM state=Halted\n' \ b'test-template class=TemplateVM state=Halted\n' \ b'test-net class=AppVM state=Halted\n' + self.app.expected_calls[ + ('dom0', 'admin.deviceclass.List', None, None)] = b'0\0' self.app.expected_calls[('dom0', 'admin.vm.Create.AppVM', 'test-template', b'name=new-name label=red')] = b'0\x00' new_vm = self.app.clone_vm('test-vm', 'new-name') @@ -384,6 +382,8 @@ class TC_10_QubesBase(qubesadmin.tests.QubesTestCase): b'test-net class=AppVM state=Halted\n' self.app.expected_calls[('dom0', 'admin.vm.Create.AppVM', 'test-template', b'name=new-name label=red')] = b'0\x00' + self.app.expected_calls[ + ('dom0', 'admin.deviceclass.List', None, None)] = b'0\0' new_vm = self.app.clone_vm(self.app.domains['test-vm'], 'new-name') self.assertEqual(new_vm.name, 'new-name') self.assertAllCalled() @@ -403,6 +403,8 @@ class TC_10_QubesBase(qubesadmin.tests.QubesTestCase): b'test-vm class=AppVM state=Halted\n' \ b'test-template class=TemplateVM state=Halted\n' \ b'test-net class=AppVM state=Halted\n' + self.app.expected_calls[ + ('dom0', 'admin.deviceclass.List', None, None)] = b'0\0' new_vm = self.app.clone_vm('test-vm', 'new-name', pool='some-pool') self.assertEqual(new_vm.name, 'new-name') self.assertAllCalled() @@ -423,6 +425,8 @@ class TC_10_QubesBase(qubesadmin.tests.QubesTestCase): b'test-vm class=AppVM state=Halted\n' \ b'test-template class=TemplateVM state=Halted\n' \ b'test-net class=AppVM state=Halted\n' + self.app.expected_calls[ + ('dom0', 'admin.deviceclass.List', None, None)] = b'0\0' new_vm = self.app.clone_vm('test-vm', 'new-name', pools={'private': 'some-pool', 'volatile': 'other-pool'}) self.assertEqual(new_vm.name, 'new-name') @@ -454,6 +458,8 @@ class TC_10_QubesBase(qubesadmin.tests.QubesTestCase): self.app.expected_calls[ ('new-name', 'admin.vm.volume.CloneTo', 'root', b'token-root')] = \ b'0\x00' + self.app.expected_calls[ + ('dom0', 'admin.deviceclass.List', None, None)] = b'0\0' new_vm = self.app.clone_vm('test-vm', 'new-name', new_cls='StandaloneVM') self.assertEqual(new_vm.name, 'new-name') @@ -505,6 +511,8 @@ class TC_10_QubesBase(qubesadmin.tests.QubesTestCase): self.app.expected_calls[ ('new-name', 'admin.vm.property.Set', 'memory', b'400')] = \ b'2\0QubesException\0\0something happened\0' + self.app.expected_calls[ + ('dom0', 'admin.deviceclass.List', None, None)] = b'0\0' new_vm = self.app.clone_vm('test-vm', 'new-name', ignore_errors=True) self.assertEqual(new_vm.name, 'new-name') self.assertAllCalled() @@ -521,6 +529,8 @@ class TC_10_QubesBase(qubesadmin.tests.QubesTestCase): self.app.expected_calls[ ('new-name', 'admin.vm.feature.Set', 'feat2', b'1')] = \ b'2\0QubesException\0\0something happened\0' + self.app.expected_calls[ + ('dom0', 'admin.deviceclass.List', None, None)] = b'0\0' new_vm = self.app.clone_vm('test-vm', 'new-name', ignore_errors=True) self.assertEqual(new_vm.name, 'new-name') self.assertAllCalled() @@ -537,6 +547,8 @@ class TC_10_QubesBase(qubesadmin.tests.QubesTestCase): self.app.expected_calls[ ('new-name', 'admin.vm.tag.Set', 'tag1', None)] = \ b'2\0QubesException\0\0something happened\0' + self.app.expected_calls[ + ('dom0', 'admin.deviceclass.List', None, None)] = b'0\0' new_vm = self.app.clone_vm('test-vm', 'new-name', ignore_errors=True) self.assertEqual(new_vm.name, 'new-name') self.assertAllCalled() @@ -554,6 +566,8 @@ class TC_10_QubesBase(qubesadmin.tests.QubesTestCase): ('new-name', 'admin.vm.firewall.Set', None, b'action=drop dst4=192.168.0.0/24\naction=accept\n')] = \ b'2\0QubesException\0\0something happened\0' + self.app.expected_calls[ + ('dom0', 'admin.deviceclass.List', None, None)] = b'0\0' new_vm = self.app.clone_vm('test-vm', 'new-name', ignore_errors=True) self.assertEqual(new_vm.name, 'new-name') self.assertAllCalled() @@ -624,6 +638,8 @@ class TC_10_QubesBase(qubesadmin.tests.QubesTestCase): self.app.expected_calls[('dom0', 'admin.vm.CreateInPool.AppVM', 'test-template', b'name=new-name label=red pool:private=another')]\ = b'0\x00' + self.app.expected_calls[ + ('dom0', 'admin.deviceclass.List', None, None)] = b'0\0' new_vm = self.app.clone_vm('test-vm', 'new-name') self.assertEqual(new_vm.name, 'new-name') self.check_output_mock.assert_called_once_with( @@ -633,6 +649,78 @@ class TC_10_QubesBase(qubesadmin.tests.QubesTestCase): ) self.assertAllCalled() + def test_043_clone_devices(self): + self.clone_setup_common_calls('test-vm', 'new-name') + + self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \ + b'0\x00new-name class=AppVM state=Halted\n' \ + b'test-vm class=AppVM state=Halted\n' \ + b'test-vm2 class=AppVM state=Halted\n' \ + b'test-vm3 class=AppVM state=Halted\n' \ + b'test-template class=TemplateVM state=Halted\n' \ + b'test-net class=AppVM state=Halted\n' + + self.app.expected_calls[('dom0', 'admin.vm.Create.AppVM', + 'test-template', b'name=new-name label=red')] = b'0\x00' + + self.app.expected_calls[ + ('dom0', 'admin.deviceclass.List', None, None)] = \ + b'0\0pci\n' + + self.app.expected_calls[ + ('test-vm', 'admin.vm.device.pci.List', None, None)] = \ + b'0\0test-vm2+dev1 ro=True\n' \ + b'test-vm3+dev2 persistent=True\n' + + self.app.expected_calls[ + ('new-name', 'admin.vm.device.pci.Attach', 'test-vm3+dev2', + b'persistent=True')] = b'0\0' + + new_vm = self.app.clone_vm('test-vm', 'new-name') + self.assertEqual(new_vm.name, 'new-name') + self.check_output_mock.assert_called_once_with( + ['qvm-appmenus', '--init', '--update', + '--source', 'test-vm', 'new-name'], + stderr=subprocess.STDOUT + ) + self.assertAllCalled() + + def test_044_clone_devices_fail(self): + self.clone_setup_common_calls('test-vm', 'new-name') + + self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \ + b'0\x00new-name class=AppVM state=Halted\n' \ + b'test-vm class=AppVM state=Halted\n' \ + b'test-vm2 class=AppVM state=Halted\n' \ + b'test-vm3 class=AppVM state=Halted\n' \ + b'test-template class=TemplateVM state=Halted\n' \ + b'test-net class=AppVM state=Halted\n' + + self.app.expected_calls[('dom0', 'admin.vm.Create.AppVM', + 'test-template', b'name=new-name label=red')] = b'0\x00' + + self.app.expected_calls[ + ('dom0', 'admin.deviceclass.List', None, None)] = \ + b'0\0pci\n' + + self.app.expected_calls[ + ('test-vm', 'admin.vm.device.pci.List', None, None)] = \ + b'0\0test-vm2+dev1 ro=True\n' \ + b'test-vm3+dev2 persistent=True\n' + + self.app.expected_calls[ + ('new-name', 'admin.vm.device.pci.Attach', 'test-vm3+dev2', + b'persistent=True')] = \ + b'2\0QubesException\0\0something happened\0' + + self.app.expected_calls[ + ('new-name', 'admin.vm.Remove', None, None)] = b'0\0' + + with self.assertRaises(qubesadmin.exc.QubesException): + new_vm = self.app.clone_vm('test-vm', 'new-name') + + self.assertAllCalled() + class TC_20_QubesLocal(unittest.TestCase): def setUp(self): diff --git a/qubesadmin/tests/tools/qvm_create.py b/qubesadmin/tests/tools/qvm_create.py index 930a9ae..4c5e665 100644 --- a/qubesadmin/tests/tools/qvm_create.py +++ b/qubesadmin/tests/tools/qvm_create.py @@ -310,6 +310,8 @@ class TC_00_qvm_create(qubesadmin.tests.QubesTestCase): self.app.expected_calls[ ('new-vm', 'admin.vm.volume.CloneTo', 'root', b'clone-cookie')] = \ b'0\0' + self.app.expected_calls[ + ('dom0', 'admin.deviceclass.List', None, None)] = b'0\0' qubesadmin.tools.qvm_create.main(['-C', 'StandaloneVM', '-t', 'template', '-l', 'red', 'new-vm'], app=self.app)