From bcd026d141efa8cdc1a4dc4fb1c0395273faa3f0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marek=20Marczykowski-G=C3=B3recki?= Date: Tue, 20 Jun 2017 00:02:20 +0200 Subject: [PATCH] Implement VM clone as create + copy data+metadata This way we don't need separate admin.vm.Clone call, which is tricky to handler properly with policy. A VM may not have access to all the properties and other metadata, so add ignore_errors argument, for best-effort approach (copy what is possible). In any case, failure of cloning VM data fails the whole operation. When operation fails, VM is removed. While at it, allow to specify alternative VM class - this allows morphing one VM into another (for example AppVM -> StandaloneVM). Adjust qvm-clone tool and tests accordingly. QubesOS/qubes-issues#2622 --- doc/manpages/qvm-clone.rst | 8 +- qubesadmin/app.py | 104 +++++++-- qubesadmin/tests/app.py | 345 +++++++++++++++++++++++++++- qubesadmin/tests/tools/qvm_clone.py | 32 ++- qubesadmin/tools/qvm_clone.py | 6 +- 5 files changed, 460 insertions(+), 35 deletions(-) diff --git a/doc/manpages/qvm-clone.rst b/doc/manpages/qvm-clone.rst index 1fed0b9..f77e4ce 100644 --- a/doc/manpages/qvm-clone.rst +++ b/doc/manpages/qvm-clone.rst @@ -14,7 +14,7 @@ Synopsis -------- -:command:`qvm-clone` [-h] [--verbose] [--quiet] [-p *POOL:VOLUME* | -P POOL] *VMNAME* *NEWVM* +:command:`qvm-clone` [*options*] *VMNAME* *NEWVM* Options ------- @@ -23,6 +23,12 @@ Options Show this help message and exit +.. option:: --class=CLASS, -C CLASS + + Create VM of different class than source VM. The tool will try to copy as + much as possible data/metadata from source VM, but some information may be + impossible to preserve (for example target VM have no matching properties). + .. option:: -P POOL Pool to use for the new domain. All volumes besides snapshots volumes are diff --git a/qubesadmin/app.py b/qubesadmin/app.py index 860df56..9eec86b 100644 --- a/qubesadmin/app.py +++ b/qubesadmin/app.py @@ -270,7 +270,8 @@ class QubesBase(qubesadmin.base.PropertyHolder): self.domains.clear_cache() return self.domains[name] - def clone_vm(self, src_vm, new_name, pool=None, pools=None): + def clone_vm(self, src_vm, new_name, new_cls=None, + pool=None, pools=None, ignore_errors=False): '''Clone Virtual Machine Example usage with custom storage pools: @@ -281,35 +282,110 @@ class QubesBase(qubesadmin.base.PropertyHolder): >>> vm = app.clone_vm(src_vm, 'my-new-vm', pools=pools) >>> vm.label = app.labels['green'] - :param str cls: name of VM class (`AppVM`, `TemplateVM` etc) - :param str name: name of VM - :param str label: label color for new VM - :param str template: template to use (if apply for given VM class), - can be also VM object; use None for default value + :param QubesVM or str src_vm: source VM + :param str new_name: name of new VM + :param str new_cls: name of VM class (`AppVM`, `TemplateVM` etc) - use + None to copy it from *src_vm* :param str pool: storage pool to use instead of default one :param dict pools: storage pool for specific volumes + :param bool ignore_errors: should errors on meta-data setting be only + logged, or abort the whole operation? :return new VM object ''' if pool and pools: raise ValueError('only one of pool= and pools= can be used') - if not isinstance(src_vm, str): - src_vm = str(src_vm) + if isinstance(src_vm, str): + src_vm = self.domains[src_vm] - method = 'admin.vm.Clone' - payload = 'name={}'.format(new_name) + if new_cls is None: + new_cls = src_vm.__class__.__name__ + + template = getattr(src_vm, 'template', None) + if template is not None: + template = str(template) + + label = src_vm.label + + method_prefix = 'admin.vm.Create.' + payload = 'name={} label={}'.format(new_name, label) if pool: payload += ' pool={}'.format(str(pool)) - method = 'admin.vm.CloneInPool' + method_prefix = 'admin.vm.CreateInPool.' if pools: payload += ''.join(' pool:{}={}'.format(vol, str(pool)) for vol, pool in sorted(pools.items())) - method = 'admin.vm.CloneInPool' + method_prefix = 'admin.vm.CreateInPool.' - self.qubesd_call(src_vm, method, None, payload.encode('utf-8')) + self.qubesd_call('dom0', method_prefix + new_cls, template, + payload.encode('utf-8')) - return self.domains[new_name] + self.domains.clear_cache() + dst_vm = self.domains[new_name] + try: + assert isinstance(dst_vm, qubesadmin.vm.QubesVM) + for prop in src_vm.property_list(): + # handled by admin.vm.Create call + if prop in ('name', 'qid', 'template', 'label'): + continue + if src_vm.property_is_default(prop): + continue + try: + setattr(dst_vm, prop, getattr(src_vm, prop)) + except AttributeError: + pass + except qubesadmin.exc.QubesException as e: + dst_vm.log.error( + 'Failed to set {!s} property: {!s}'.format(prop, e)) + if not ignore_errors: + raise + + for tag in src_vm.tags: + try: + dst_vm.tags.add(tag) + except qubesadmin.exc.QubesException as e: + dst_vm.log.error( + 'Failed to add {!s} tag: {!s}'.format(tag, e)) + if not ignore_errors: + raise + + for feature, value in src_vm.features.items(): + try: + dst_vm.features[feature] = value + except qubesadmin.exc.QubesException as e: + dst_vm.log.error( + 'Failed to set {!s} feature: {!s}'.format(feature, e)) + if not ignore_errors: + raise + + try: + dst_vm.firewall.policy = src_vm.firewall.policy + dst_vm.firewall.save_rules(src_vm.firewall.rules) + except qubesadmin.exc.QubesException as e: + self.log.error('Failed to set firewall: %s', e) + if not ignore_errors: + raise + + except qubesadmin.exc.QubesException: + if not ignore_errors: + del self.domains[dst_vm.name] + raise + + try: + for dst_volume in sorted(dst_vm.volumes.values()): + if not dst_volume.save_on_stop: + # clone only persistent volumes + continue + src_volume = src_vm.volumes[dst_volume.name] + dst_vm.log.info('Cloning {} volume'.format(dst_volume.name)) + dst_volume.clone(src_volume) + + except qubesadmin.exc.QubesException: + del self.domains[dst_vm.name] + raise + + return dst_vm def run_service(self, dest, service, filter_esc=False, user=None, localcmd=None, wait=True, **kwargs): diff --git a/qubesadmin/tests/app.py b/qubesadmin/tests/app.py index 56ff6f4..f82b39f 100644 --- a/qubesadmin/tests/app.py +++ b/qubesadmin/tests/app.py @@ -32,6 +32,7 @@ except ImportError: import tempfile +import qubesadmin.exc import qubesadmin.tests @@ -154,48 +155,368 @@ class TC_10_QubesBase(qubesadmin.tests.QubesTestCase): self.assertEqual(label.name, 'red') self.assertAllCalled() + def clone_setup_common_calls(self, src, dst): + # labels + self.app.expected_calls[('dom0', 'admin.label.List', None, None)] = \ + b'0\x00red\ngreen\nblue\n' + + # have each property type with default=no, each special-cased, + # and some with default=yes + properties = { + 'label': 'default=False type=label red', + 'template': 'default=False type=vm test-template', + 'memory': 'default=False type=int 400', + 'kernel': 'default=False type=str 4.9.31', + 'netvm': 'default=False type=vm test-net', + 'hvm': 'default=False type=bool True', + 'default_user': 'default=True type=str user', + } + self.app.expected_calls[ + (src, 'admin.vm.property.List', None, None)] = \ + b'0\0qid\nname\n' + \ + b'\n'.join(prop.encode() for prop in properties.keys()) + \ + b'\n' + for prop, value in properties.items(): + self.app.expected_calls[ + (src, 'admin.vm.property.Get', prop, None)] = \ + b'0\0' + value.encode() + # special cases handled by admin.vm.Create call + if prop in ('label', 'template'): + continue + # default properties should not be set + if 'default=True' in value: + continue + self.app.expected_calls[ + (dst, 'admin.vm.property.Set', prop, + value.split()[-1].encode())] = b'0\0' + + # tags + self.app.expected_calls[ + (src, 'admin.vm.tag.List', None, None)] = \ + b'0\0tag1\ntag2\n' + self.app.expected_calls[ + (dst, 'admin.vm.tag.Set', 'tag1', None)] = b'0\0' + self.app.expected_calls[ + (dst, 'admin.vm.tag.Set', 'tag2', None)] = b'0\0' + + # features + self.app.expected_calls[ + (src, 'admin.vm.feature.List', None, None)] = \ + b'0\0feat1\nfeat2\n' + self.app.expected_calls[ + (src, 'admin.vm.feature.Get', 'feat1', None)] = \ + b'0\0feat1-value with spaces' + self.app.expected_calls[ + (src, 'admin.vm.feature.Get', 'feat2', None)] = \ + b'0\x001' + self.app.expected_calls[ + (dst, 'admin.vm.feature.Set', 'feat1', + b'feat1-value with spaces')] = b'0\0' + self.app.expected_calls[ + (dst, 'admin.vm.feature.Set', 'feat2', b'1')] = b'0\0' + + # firewall + rules = ( + b'action=drop dst4=192.168.0.0/24\n' + b'action=accept\n' + ) + self.app.expected_calls[ + (src, 'admin.vm.firewall.GetPolicy', None, None)] = \ + b'0\x00accept' + self.app.expected_calls[ + (src, 'admin.vm.firewall.Get', None, None)] = \ + b'0\x00' + rules + self.app.expected_calls[ + (dst, 'admin.vm.firewall.SetPolicy', None, b'accept')] = \ + b'0\x00' + self.app.expected_calls[ + (dst, 'admin.vm.firewall.Set', None, rules)] = \ + b'0\x00' + + # storage + self.app.expected_calls[ + (dst, 'admin.vm.volume.List', None, None)] = \ + b'0\x00root\nprivate\nvolatile\nkernel\n' + self.app.expected_calls[ + (dst, 'admin.vm.volume.Info', 'root', None)] = \ + b'0\x00pool=lvm\n' \ + b'vid=vm-test-vm/root\n' \ + b'size=10737418240\n' \ + b'usage=2147483648\n' \ + b'rw=False\n' \ + b'internal=True\n' \ + b'source=vm-test-template/root\n' \ + b'save_on_stop=False\n' \ + b'snap_on_start=True\n' + self.app.expected_calls[ + (dst, 'admin.vm.volume.Info', 'private', None)] = \ + b'0\x00pool=lvm\n' \ + b'vid=vm-test-vm/private\n' \ + b'size=2147483648\n' \ + b'usage=214748364\n' \ + b'rw=True\n' \ + b'internal=True\n' \ + b'save_on_stop=True\n' \ + b'snap_on_start=False\n' + self.app.expected_calls[ + (dst, 'admin.vm.volume.Info', 'volatile', None)] = \ + b'0\x00pool=lvm\n' \ + b'vid=vm-test-vm/volatile\n' \ + b'size=10737418240\n' \ + b'usage=0\n' \ + b'rw=True\n' \ + b'internal=True\n' \ + b'source=None\n' \ + b'save_on_stop=False\n' \ + b'snap_on_start=False\n' + self.app.expected_calls[ + (dst, 'admin.vm.volume.Info', 'kernel', None)] = \ + b'0\x00pool=linux-kernel\n' \ + b'vid=\n' \ + b'size=0\n' \ + b'usage=0\n' \ + b'rw=False\n' \ + b'internal=True\n' \ + b'source=None\n' \ + b'save_on_stop=False\n' \ + b'snap_on_start=False\n' + self.app.expected_calls[ + (src, 'admin.vm.volume.List', None, None)] = \ + b'0\x00root\nprivate\nvolatile\nkernel\n' + self.app.expected_calls[ + (src, 'admin.vm.volume.Clone', 'private', dst.encode())] = \ + b'0\x00' + def test_030_clone(self): - self.app.expected_calls[('test-vm', 'admin.vm.Clone', None, - b'name=new-name')] = b'0\x00' + self.clone_setup_common_calls('test-vm', 'new-name') self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \ b'0\x00new-name class=AppVM state=Halted\n' \ - b'test-vm class=AppVM state=Halted\n' + b'test-vm class=AppVM state=Halted\n' \ + b'test-template class=TemplateVM state=Halted\n' \ + b'test-net class=AppVM state=Halted\n' + self.app.expected_calls[('dom0', 'admin.vm.Create.AppVM', + 'test-template', b'name=new-name label=red')] = b'0\x00' new_vm = self.app.clone_vm('test-vm', 'new-name') self.assertEqual(new_vm.name, 'new-name') self.assertAllCalled() def test_031_clone_object(self): - self.app.expected_calls[('test-vm', 'admin.vm.Clone', None, - b'name=new-name')] = b'0\x00' + self.clone_setup_common_calls('test-vm', 'new-name') self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \ b'0\x00new-name class=AppVM state=Halted\n' \ - b'test-vm class=AppVM state=Halted\n' + b'test-vm class=AppVM state=Halted\n' \ + b'test-template class=TemplateVM state=Halted\n' \ + b'test-net class=AppVM state=Halted\n' + self.app.expected_calls[('dom0', 'admin.vm.Create.AppVM', + 'test-template', b'name=new-name label=red')] = b'0\x00' new_vm = self.app.clone_vm(self.app.domains['test-vm'], 'new-name') self.assertEqual(new_vm.name, 'new-name') self.assertAllCalled() def test_032_clone_pool(self): - self.app.expected_calls[('test-vm', 'admin.vm.CloneInPool', None, - b'name=new-name pool=some-pool')] = b'0\x00' + self.clone_setup_common_calls('test-vm', 'new-name') + self.app.expected_calls[('dom0', 'admin.vm.CreateInPool.AppVM', + 'test-template', + b'name=new-name label=red pool=some-pool')] = b'0\x00' self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \ b'0\x00new-name class=AppVM state=Halted\n' \ - b'test-vm class=AppVM state=Halted\n' + b'test-vm class=AppVM state=Halted\n' \ + b'test-template class=TemplateVM state=Halted\n' \ + b'test-net class=AppVM state=Halted\n' new_vm = self.app.clone_vm('test-vm', 'new-name', pool='some-pool') self.assertEqual(new_vm.name, 'new-name') self.assertAllCalled() def test_033_clone_pools(self): - self.app.expected_calls[('test-vm', 'admin.vm.CloneInPool', None, - b'name=new-name pool:private=some-pool ' + self.clone_setup_common_calls('test-vm', 'new-name') + self.app.expected_calls[('dom0', 'admin.vm.CreateInPool.AppVM', + 'test-template', + b'name=new-name label=red pool:private=some-pool ' b'pool:volatile=other-pool')] = b'0\x00' self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \ b'0\x00new-name class=AppVM state=Halted\n' \ - b'test-vm class=AppVM state=Halted\n' + b'test-vm class=AppVM state=Halted\n' \ + b'test-template class=TemplateVM state=Halted\n' \ + b'test-net class=AppVM state=Halted\n' new_vm = self.app.clone_vm('test-vm', 'new-name', pools={'private': 'some-pool', 'volatile': 'other-pool'}) self.assertEqual(new_vm.name, 'new-name') self.assertAllCalled() + def test_034_clone_class_change(self): + self.clone_setup_common_calls('test-vm', 'new-name') + self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \ + b'0\x00new-name class=StandaloneVM state=Halted\n' \ + b'test-vm class=AppVM state=Halted\n' \ + b'test-template class=TemplateVM state=Halted\n' \ + b'test-net class=AppVM state=Halted\n' + self.app.expected_calls[('dom0', 'admin.vm.Create.StandaloneVM', + 'test-template', b'name=new-name label=red')] = b'0\x00' + self.app.expected_calls[ + ('new-name', 'admin.vm.volume.Info', 'root', None)] = \ + b'0\x00pool=lvm\n' \ + b'vid=vm-new-name/root\n' \ + b'size=10737418240\n' \ + b'usage=2147483648\n' \ + b'rw=True\n' \ + b'internal=True\n' \ + b'source=None\n' \ + b'save_on_stop=True\n' \ + b'snap_on_start=False\n' + self.app.expected_calls[ + ('test-vm', 'admin.vm.volume.Clone', 'root', b'new-name')] = \ + b'0\x00' + new_vm = self.app.clone_vm('test-vm', 'new-name', + new_cls='StandaloneVM') + self.assertEqual(new_vm.name, 'new-name') + self.assertEqual(new_vm.__class__.__name__, 'StandaloneVM') + self.assertAllCalled() + + def test_035_clone_fail(self): + self.app.expected_calls[('dom0', 'admin.label.List', None, None)] = \ + b'0\x00red\ngreen\nblue\n' + self.app.expected_calls[ + ('test-vm', 'admin.vm.property.List', None, None)] = \ + b'0\0qid\nname\ntemplate\nlabel\nmemory\n' + self.app.expected_calls[ + ('test-vm', 'admin.vm.property.Get', 'label', None)] = \ + b'0\0default=False type=label red' + self.app.expected_calls[ + ('test-vm', 'admin.vm.property.Get', 'template', None)] = \ + b'0\0default=False type=vm test-template' + self.app.expected_calls[ + ('test-vm', 'admin.vm.property.Get', 'memory', None)] = \ + b'0\0default=False type=int 400' + self.app.expected_calls[ + ('new-name', 'admin.vm.property.Set', 'memory', b'400')] = \ + b'2\0QubesException\0\0something happened\0' + self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \ + b'0\x00new-name class=AppVM state=Halted\n' \ + b'test-vm class=AppVM state=Halted\n' \ + b'test-template class=TemplateVM state=Halted\n' \ + b'test-net class=AppVM state=Halted\n' + self.app.expected_calls[('dom0', 'admin.vm.Create.AppVM', + 'test-template', b'name=new-name label=red')] = b'0\x00' + self.app.expected_calls[('new-name', 'admin.vm.Remove', None, None)] = \ + b'0\x00' + with self.assertRaises(qubesadmin.exc.QubesException): + self.app.clone_vm('test-vm', 'new-name') + self.assertAllCalled() + + def test_036_clone_ignore_errors_prop(self): + self.clone_setup_common_calls('test-vm', 'new-name') + self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \ + b'0\x00new-name class=AppVM state=Halted\n' \ + b'test-vm class=AppVM state=Halted\n' \ + b'test-template class=TemplateVM state=Halted\n' \ + b'test-net class=AppVM state=Halted\n' + self.app.expected_calls[('dom0', 'admin.vm.Create.AppVM', + 'test-template', b'name=new-name label=red')] = b'0\x00' + self.app.expected_calls[ + ('new-name', 'admin.vm.property.Set', 'memory', b'400')] = \ + b'2\0QubesException\0\0something happened\0' + new_vm = self.app.clone_vm('test-vm', 'new-name', ignore_errors=True) + self.assertEqual(new_vm.name, 'new-name') + self.assertAllCalled() + + def test_037_clone_ignore_errors_feature(self): + self.clone_setup_common_calls('test-vm', 'new-name') + self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \ + b'0\x00new-name class=AppVM state=Halted\n' \ + b'test-vm class=AppVM state=Halted\n' \ + b'test-template class=TemplateVM state=Halted\n' \ + b'test-net class=AppVM state=Halted\n' + self.app.expected_calls[('dom0', 'admin.vm.Create.AppVM', + 'test-template', b'name=new-name label=red')] = b'0\x00' + self.app.expected_calls[ + ('new-name', 'admin.vm.feature.Set', 'feat2', b'1')] = \ + b'2\0QubesException\0\0something happened\0' + new_vm = self.app.clone_vm('test-vm', 'new-name', ignore_errors=True) + self.assertEqual(new_vm.name, 'new-name') + self.assertAllCalled() + + def test_038_clone_ignore_errors_tag(self): + self.clone_setup_common_calls('test-vm', 'new-name') + self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \ + b'0\x00new-name class=AppVM state=Halted\n' \ + b'test-vm class=AppVM state=Halted\n' \ + b'test-template class=TemplateVM state=Halted\n' \ + b'test-net class=AppVM state=Halted\n' + self.app.expected_calls[('dom0', 'admin.vm.Create.AppVM', + 'test-template', b'name=new-name label=red')] = b'0\x00' + self.app.expected_calls[ + ('new-name', 'admin.vm.tag.Set', 'tag1', None)] = \ + b'2\0QubesException\0\0something happened\0' + new_vm = self.app.clone_vm('test-vm', 'new-name', ignore_errors=True) + self.assertEqual(new_vm.name, 'new-name') + self.assertAllCalled() + + def test_039_clone_ignore_errors_firewall(self): + self.clone_setup_common_calls('test-vm', 'new-name') + self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \ + b'0\x00new-name class=AppVM state=Halted\n' \ + b'test-vm class=AppVM state=Halted\n' \ + b'test-template class=TemplateVM state=Halted\n' \ + b'test-net class=AppVM state=Halted\n' + self.app.expected_calls[('dom0', 'admin.vm.Create.AppVM', + 'test-template', b'name=new-name label=red')] = b'0\x00' + self.app.expected_calls[ + ('new-name', 'admin.vm.firewall.SetPolicy', None, b'accept')] = \ + b'2\0QubesException\0\0something happened\0' + del self.app.expected_calls[ + ('test-vm', 'admin.vm.firewall.Get', None, None)] + del self.app.expected_calls[ + ('new-name', 'admin.vm.firewall.Set', None, + b'action=drop dst4=192.168.0.0/24\naction=accept\n')] + new_vm = self.app.clone_vm('test-vm', 'new-name', ignore_errors=True) + self.assertEqual(new_vm.name, 'new-name') + self.assertAllCalled() + + def test_040_clone_ignore_errors_storage(self): + self.clone_setup_common_calls('test-vm', 'new-name') + self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \ + b'0\x00new-name class=AppVM state=Halted\n' \ + b'test-vm class=AppVM state=Halted\n' \ + b'test-template class=TemplateVM state=Halted\n' \ + b'test-net class=AppVM state=Halted\n' + self.app.expected_calls[('dom0', 'admin.vm.Create.AppVM', + 'test-template', b'name=new-name label=red')] = b'0\x00' + self.app.expected_calls[ + ('test-vm', 'admin.vm.volume.Clone', 'private', b'new-name')] = \ + b'2\0QubesException\0\0something happened\0' + self.app.expected_calls[('new-name', 'admin.vm.Remove', None, None)] = \ + b'0\x00' + del self.app.expected_calls[ + ('new-name', 'admin.vm.volume.Info', 'root', None)] + del self.app.expected_calls[ + ('new-name', 'admin.vm.volume.Info', 'volatile', None)] + with self.assertRaises(qubesadmin.exc.QubesException): + self.app.clone_vm('test-vm', 'new-name', ignore_errors=True) + self.assertAllCalled() + + def test_041_clone_fail_storage(self): + self.clone_setup_common_calls('test-vm', 'new-name') + self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \ + b'0\x00new-name class=AppVM state=Halted\n' \ + b'test-vm class=AppVM state=Halted\n' \ + b'test-template class=TemplateVM state=Halted\n' \ + b'test-net class=AppVM state=Halted\n' + self.app.expected_calls[('dom0', 'admin.vm.Create.AppVM', + 'test-template', b'name=new-name label=red')] = b'0\x00' + self.app.expected_calls[ + ('test-vm', 'admin.vm.volume.Clone', 'private', b'new-name')] = \ + b'2\0QubesException\0\0something happened\0' + self.app.expected_calls[('new-name', 'admin.vm.Remove', None, None)] = \ + b'0\x00' + del self.app.expected_calls[ + ('new-name', 'admin.vm.volume.Info', 'root', None)] + del self.app.expected_calls[ + ('new-name', 'admin.vm.volume.Info', 'volatile', None)] + with self.assertRaises(qubesadmin.exc.QubesException): + self.app.clone_vm('test-vm', 'new-name') + self.assertAllCalled() + + class TC_20_QubesLocal(unittest.TestCase): def setUp(self): diff --git a/qubesadmin/tests/tools/qvm_clone.py b/qubesadmin/tests/tools/qvm_clone.py index b4c33a1..f8ca372 100644 --- a/qubesadmin/tests/tools/qvm_clone.py +++ b/qubesadmin/tests/tools/qvm_clone.py @@ -20,43 +20,61 @@ import qubesadmin.tests import qubesadmin.tests.tools import qubesadmin.tools.qvm_clone +from unittest import mock class TC_00_qvm_clone(qubesadmin.tests.QubesTestCase): def test_000_simple(self): - self.app.expected_calls[('test-vm', 'admin.vm.Clone', None, - b'name=new-vm')] = b'0\x00' + self.app.clone_vm = mock.Mock() self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \ b'0\x00new-vm class=AppVM state=Halted\n' \ b'test-vm class=AppVM state=Halted\n' qubesadmin.tools.qvm_clone.main(['test-vm', 'new-vm'], app=self.app) + self.app.clone_vm.assert_called_with(self.app.domains['test-vm'], + 'new-vm', new_cls=None, pool=None, pools={}) self.assertAllCalled() def test_001_missing_vm(self): + self.app.clone_vm = mock.Mock() with self.assertRaises(SystemExit): with qubesadmin.tests.tools.StderrBuffer() as stderr: qubesadmin.tools.qvm_clone.main(['test-vm'], app=self.app) self.assertIn('NAME', stderr.getvalue()) + self.assertFalse(self.app.clone_vm.called) self.assertAllCalled() def test_004_pool(self): - self.app.expected_calls[('test-vm', 'admin.vm.CloneInPool', - None, b'name=new-vm pool=some-pool')] = b'0\x00' + self.app.clone_vm = mock.Mock() self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \ b'0\x00new-vm class=AppVM state=Halted\n' \ b'test-vm class=AppVM state=Halted\n' qubesadmin.tools.qvm_clone.main(['-P', 'some-pool', 'test-vm', 'new-vm'], app=self.app) + self.app.clone_vm.assert_called_with(self.app.domains['test-vm'], + 'new-vm', new_cls=None, pool='some-pool', pools={}) self.assertAllCalled() def test_005_pools(self): - self.app.expected_calls[('test-vm', 'admin.vm.CloneInPool', - None, b'name=new-vm pool:private=some-pool ' - b'pool:volatile=other-pool')] = b'0\x00' + self.app.clone_vm = mock.Mock() self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \ b'0\x00new-vm class=AppVM state=Halted\n' \ b'test-vm class=AppVM state=Halted\n' qubesadmin.tools.qvm_clone.main(['--pool', 'private=some-pool', '--pool', 'volatile=other-pool', 'test-vm', 'new-vm'], app=self.app) + self.app.clone_vm.assert_called_with(self.app.domains['test-vm'], + 'new-vm', new_cls=None, pool=None, pools={'private': 'some-pool', + 'volatile': 'other-pool'}) + self.assertAllCalled() + + def test_006_new_cls(self): + self.app.clone_vm = mock.Mock() + self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \ + b'0\x00new-vm class=AppVM state=Halted\n' \ + b'test-vm class=AppVM state=Halted\n' + qubesadmin.tools.qvm_clone.main(['--class', 'StandaloneVM', + 'test-vm', 'new-vm'], + app=self.app) + self.app.clone_vm.assert_called_with(self.app.domains['test-vm'], + 'new-vm', new_cls='StandaloneVM', pool=None, pools={}) self.assertAllCalled() diff --git a/qubesadmin/tools/qvm_clone.py b/qubesadmin/tools/qvm_clone.py index fb4cb4e..7623ec0 100644 --- a/qubesadmin/tools/qvm_clone.py +++ b/qubesadmin/tools/qvm_clone.py @@ -32,6 +32,10 @@ parser.add_argument('new_name', action='store', help='name of the domain to create') +parser.add_argument('--class', '-C', dest='cls', + default=None, + help='specify the class of the new domain (default: same as source)') + group = parser.add_mutually_exclusive_group() group.add_argument('-P', metavar='POOL', @@ -66,7 +70,7 @@ def main(args=None, app=None): parser.error( 'Pool argument must be of form: -P volume_name=pool_name') - app.clone_vm(src_vm, new_name, pool=pool, pools=pools) + app.clone_vm(src_vm, new_name, new_cls=args.cls, pool=pool, pools=pools) if __name__ == '__main__': sys.exit(main())