Merge remote-tracking branch 'origin/pr/136'

* origin/pr/136:
  Added tests for cloning VMs with persistent devices
  Remove Python 2 workarounds
  Cloning a vm now clones persistent PCI device assignments
This commit is contained in:
Marek Marczykowski-Górecki 2020-04-09 05:24:13 +02:00
commit c80314523f
No known key found for this signature in database
GPG Key ID: 063938BA42CFA724
3 changed files with 117 additions and 7 deletions

View File

@ -38,6 +38,7 @@ import qubesadmin.storage
import qubesadmin.utils
import qubesadmin.vm
import qubesadmin.config
import qubesadmin.devices
class VMCollection(object):
@ -316,7 +317,9 @@ class QubesBase(qubesadmin.base.PropertyHolder):
return self.domains[name]
def clone_vm(self, src_vm, new_name, new_cls=None, pool=None, pools=None,
ignore_errors=False, ignore_volumes=None):
ignore_errors=False, ignore_volumes=None,
ignore_devices=False):
# pylint: disable=too-many-statements
"""Clone Virtual Machine
Example usage with custom storage pools:
@ -337,6 +340,7 @@ class QubesBase(qubesadmin.base.PropertyHolder):
logged, or abort the whole operation?
:param list ignore_volumes: do not clone volumes on this list,
like 'private' or 'root'
:param bool ignore_devices: if True, do not copy device assignments
:return new VM object
"""
@ -472,6 +476,22 @@ class QubesBase(qubesadmin.base.PropertyHolder):
del self.domains[dst_vm.name]
raise
if not ignore_devices:
try:
for devclass in src_vm.devices:
for assignment in src_vm.devices[devclass].assignments(
persistent=True):
new_assignment = qubesadmin.devices.DeviceAssignment(
backend_domain=assignment.backend_domain,
ident=assignment.ident,
options=assignment.options,
persistent=assignment.persistent)
dst_vm.devices[devclass].attach(new_assignment)
except qubesadmin.exc.QubesException:
if not ignore_errors:
del self.domains[dst_vm.name]
raise
return dst_vm
def qubesd_call(self, dest, method, arg=None, payload=None,

View File

@ -25,12 +25,8 @@ import unittest
import multiprocessing
try:
import unittest.mock as mock
from unittest.mock import call
except ImportError:
import mock
from mock import call
import tempfile
@ -364,6 +360,8 @@ class TC_10_QubesBase(qubesadmin.tests.QubesTestCase):
b'test-vm class=AppVM state=Halted\n' \
b'test-template class=TemplateVM state=Halted\n' \
b'test-net class=AppVM state=Halted\n'
self.app.expected_calls[
('dom0', 'admin.deviceclass.List', None, None)] = b'0\0'
self.app.expected_calls[('dom0', 'admin.vm.Create.AppVM',
'test-template', b'name=new-name label=red')] = b'0\x00'
new_vm = self.app.clone_vm('test-vm', 'new-name')
@ -384,6 +382,8 @@ class TC_10_QubesBase(qubesadmin.tests.QubesTestCase):
b'test-net class=AppVM state=Halted\n'
self.app.expected_calls[('dom0', 'admin.vm.Create.AppVM',
'test-template', b'name=new-name label=red')] = b'0\x00'
self.app.expected_calls[
('dom0', 'admin.deviceclass.List', None, None)] = b'0\0'
new_vm = self.app.clone_vm(self.app.domains['test-vm'], 'new-name')
self.assertEqual(new_vm.name, 'new-name')
self.assertAllCalled()
@ -403,6 +403,8 @@ class TC_10_QubesBase(qubesadmin.tests.QubesTestCase):
b'test-vm class=AppVM state=Halted\n' \
b'test-template class=TemplateVM state=Halted\n' \
b'test-net class=AppVM state=Halted\n'
self.app.expected_calls[
('dom0', 'admin.deviceclass.List', None, None)] = b'0\0'
new_vm = self.app.clone_vm('test-vm', 'new-name', pool='some-pool')
self.assertEqual(new_vm.name, 'new-name')
self.assertAllCalled()
@ -423,6 +425,8 @@ class TC_10_QubesBase(qubesadmin.tests.QubesTestCase):
b'test-vm class=AppVM state=Halted\n' \
b'test-template class=TemplateVM state=Halted\n' \
b'test-net class=AppVM state=Halted\n'
self.app.expected_calls[
('dom0', 'admin.deviceclass.List', None, None)] = b'0\0'
new_vm = self.app.clone_vm('test-vm', 'new-name',
pools={'private': 'some-pool', 'volatile': 'other-pool'})
self.assertEqual(new_vm.name, 'new-name')
@ -454,6 +458,8 @@ class TC_10_QubesBase(qubesadmin.tests.QubesTestCase):
self.app.expected_calls[
('new-name', 'admin.vm.volume.CloneTo', 'root', b'token-root')] = \
b'0\x00'
self.app.expected_calls[
('dom0', 'admin.deviceclass.List', None, None)] = b'0\0'
new_vm = self.app.clone_vm('test-vm', 'new-name',
new_cls='StandaloneVM')
self.assertEqual(new_vm.name, 'new-name')
@ -505,6 +511,8 @@ class TC_10_QubesBase(qubesadmin.tests.QubesTestCase):
self.app.expected_calls[
('new-name', 'admin.vm.property.Set', 'memory', b'400')] = \
b'2\0QubesException\0\0something happened\0'
self.app.expected_calls[
('dom0', 'admin.deviceclass.List', None, None)] = b'0\0'
new_vm = self.app.clone_vm('test-vm', 'new-name', ignore_errors=True)
self.assertEqual(new_vm.name, 'new-name')
self.assertAllCalled()
@ -521,6 +529,8 @@ class TC_10_QubesBase(qubesadmin.tests.QubesTestCase):
self.app.expected_calls[
('new-name', 'admin.vm.feature.Set', 'feat2', b'1')] = \
b'2\0QubesException\0\0something happened\0'
self.app.expected_calls[
('dom0', 'admin.deviceclass.List', None, None)] = b'0\0'
new_vm = self.app.clone_vm('test-vm', 'new-name', ignore_errors=True)
self.assertEqual(new_vm.name, 'new-name')
self.assertAllCalled()
@ -537,6 +547,8 @@ class TC_10_QubesBase(qubesadmin.tests.QubesTestCase):
self.app.expected_calls[
('new-name', 'admin.vm.tag.Set', 'tag1', None)] = \
b'2\0QubesException\0\0something happened\0'
self.app.expected_calls[
('dom0', 'admin.deviceclass.List', None, None)] = b'0\0'
new_vm = self.app.clone_vm('test-vm', 'new-name', ignore_errors=True)
self.assertEqual(new_vm.name, 'new-name')
self.assertAllCalled()
@ -554,6 +566,8 @@ class TC_10_QubesBase(qubesadmin.tests.QubesTestCase):
('new-name', 'admin.vm.firewall.Set', None,
b'action=drop dst4=192.168.0.0/24\naction=accept\n')] = \
b'2\0QubesException\0\0something happened\0'
self.app.expected_calls[
('dom0', 'admin.deviceclass.List', None, None)] = b'0\0'
new_vm = self.app.clone_vm('test-vm', 'new-name', ignore_errors=True)
self.assertEqual(new_vm.name, 'new-name')
self.assertAllCalled()
@ -624,6 +638,8 @@ class TC_10_QubesBase(qubesadmin.tests.QubesTestCase):
self.app.expected_calls[('dom0', 'admin.vm.CreateInPool.AppVM',
'test-template', b'name=new-name label=red pool:private=another')]\
= b'0\x00'
self.app.expected_calls[
('dom0', 'admin.deviceclass.List', None, None)] = b'0\0'
new_vm = self.app.clone_vm('test-vm', 'new-name')
self.assertEqual(new_vm.name, 'new-name')
self.check_output_mock.assert_called_once_with(
@ -633,6 +649,78 @@ class TC_10_QubesBase(qubesadmin.tests.QubesTestCase):
)
self.assertAllCalled()
def test_043_clone_devices(self):
self.clone_setup_common_calls('test-vm', 'new-name')
self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
b'0\x00new-name class=AppVM state=Halted\n' \
b'test-vm class=AppVM state=Halted\n' \
b'test-vm2 class=AppVM state=Halted\n' \
b'test-vm3 class=AppVM state=Halted\n' \
b'test-template class=TemplateVM state=Halted\n' \
b'test-net class=AppVM state=Halted\n'
self.app.expected_calls[('dom0', 'admin.vm.Create.AppVM',
'test-template', b'name=new-name label=red')] = b'0\x00'
self.app.expected_calls[
('dom0', 'admin.deviceclass.List', None, None)] = \
b'0\0pci\n'
self.app.expected_calls[
('test-vm', 'admin.vm.device.pci.List', None, None)] = \
b'0\0test-vm2+dev1 ro=True\n' \
b'test-vm3+dev2 persistent=True\n'
self.app.expected_calls[
('new-name', 'admin.vm.device.pci.Attach', 'test-vm3+dev2',
b'persistent=True')] = b'0\0'
new_vm = self.app.clone_vm('test-vm', 'new-name')
self.assertEqual(new_vm.name, 'new-name')
self.check_output_mock.assert_called_once_with(
['qvm-appmenus', '--init', '--update',
'--source', 'test-vm', 'new-name'],
stderr=subprocess.STDOUT
)
self.assertAllCalled()
def test_044_clone_devices_fail(self):
self.clone_setup_common_calls('test-vm', 'new-name')
self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
b'0\x00new-name class=AppVM state=Halted\n' \
b'test-vm class=AppVM state=Halted\n' \
b'test-vm2 class=AppVM state=Halted\n' \
b'test-vm3 class=AppVM state=Halted\n' \
b'test-template class=TemplateVM state=Halted\n' \
b'test-net class=AppVM state=Halted\n'
self.app.expected_calls[('dom0', 'admin.vm.Create.AppVM',
'test-template', b'name=new-name label=red')] = b'0\x00'
self.app.expected_calls[
('dom0', 'admin.deviceclass.List', None, None)] = \
b'0\0pci\n'
self.app.expected_calls[
('test-vm', 'admin.vm.device.pci.List', None, None)] = \
b'0\0test-vm2+dev1 ro=True\n' \
b'test-vm3+dev2 persistent=True\n'
self.app.expected_calls[
('new-name', 'admin.vm.device.pci.Attach', 'test-vm3+dev2',
b'persistent=True')] = \
b'2\0QubesException\0\0something happened\0'
self.app.expected_calls[
('new-name', 'admin.vm.Remove', None, None)] = b'0\0'
with self.assertRaises(qubesadmin.exc.QubesException):
new_vm = self.app.clone_vm('test-vm', 'new-name')
self.assertAllCalled()
class TC_20_QubesLocal(unittest.TestCase):
def setUp(self):

View File

@ -310,6 +310,8 @@ class TC_00_qvm_create(qubesadmin.tests.QubesTestCase):
self.app.expected_calls[
('new-vm', 'admin.vm.volume.CloneTo', 'root', b'clone-cookie')] = \
b'0\0'
self.app.expected_calls[
('dom0', 'admin.deviceclass.List', None, None)] = b'0\0'
qubesadmin.tools.qvm_create.main(['-C', 'StandaloneVM',
'-t', 'template', '-l', 'red', 'new-vm'],
app=self.app)