diff --git a/doc/manpages/qvm-clone.rst b/doc/manpages/qvm-clone.rst index 1fed0b9..f77e4ce 100644 --- a/doc/manpages/qvm-clone.rst +++ b/doc/manpages/qvm-clone.rst @@ -14,7 +14,7 @@ Synopsis -------- -:command:`qvm-clone` [-h] [--verbose] [--quiet] [-p *POOL:VOLUME* | -P POOL] *VMNAME* *NEWVM* +:command:`qvm-clone` [*options*] *VMNAME* *NEWVM* Options ------- @@ -23,6 +23,12 @@ Options Show this help message and exit +.. option:: --class=CLASS, -C CLASS + + Create VM of different class than source VM. The tool will try to copy as + much as possible data/metadata from source VM, but some information may be + impossible to preserve (for example target VM have no matching properties). + .. option:: -P POOL Pool to use for the new domain. All volumes besides snapshots volumes are diff --git a/doc/manpages/qvm-device.rst b/doc/manpages/qvm-device.rst index 8104fc4..88869f8 100644 --- a/doc/manpages/qvm-device.rst +++ b/doc/manpages/qvm-device.rst @@ -49,7 +49,7 @@ aliases: ls, l attach ^^^^^^ -| :command:`qvm-volume attach` [-h] [--verbose] [--quiet] [--ro] *VMNAME* *BACKEND_DOMAIN:DEVICE_ID* +| :command:`qvm-device` *DEVICE_CLASS* attach [-h] [--verbose] [--quiet] [--ro] *VMNAME* *BACKEND_DOMAIN:DEVICE_ID* Attach the device with *DEVICE_ID* from *BACKEND_DOMAIN* to the domain *VMNAME* @@ -67,7 +67,7 @@ aliases: a, at detach ^^^^^^ -| :command:`qvm-volume detach` [-h] [--verbose] [--quiet] *VMNAME* *BACKEND_DOMAIN:DEVICE_ID* +| :command:`qvm-device` *DEVICE_CLASS* detach [-h] [--verbose] [--quiet] *VMNAME* *BACKEND_DOMAIN:DEVICE_ID* Detach the device with *BACKEND_DOMAIN:DEVICE_ID* from domain *VMNAME* diff --git a/doc/manpages/qvm-tags.rst b/doc/manpages/qvm-tags.rst index 3f23e2e..22dd6ba 100644 --- a/doc/manpages/qvm-tags.rst +++ b/doc/manpages/qvm-tags.rst @@ -15,7 +15,10 @@ Synopsis -------- -:command:`qvm-tags` [-h] [--verbose] [--quiet] [--query | --set | --unset] *VMNAME* [*TAG*] +| :command:`qvm-tags` [-h] [--verbose] [--quiet] *VMNAME* {list,ls,l} [*TAG*] +| :command:`qvm-tags` [-h] [--verbose] [--quiet] *VMNAME* {add,a,set} *TAG* ... +| :command:`qvm-tags` [-h] [--verbose] [--quiet] *VMNAME* {del,d,unset,u} *TAG* ... + Options ------- @@ -32,22 +35,37 @@ Options Decrease verbosity. -.. option:: --query +Commands +-------- - Query for the tag. Exit with zero (true) if the qube in question has the tag - and with non-zero (false) if it does not. If no tag specified, list all the - tags. +list +^^^^ - This is the default mode. +| :command:`qvm-tags` [-h] [--verbose] [--quiet] *VMNAME* list [*TAG*] -.. option:: --set, -s +List tags. If tag name is given, check if this tag is set for the VM and signal +this with exit code (0 - tag is set, 1 - it is not). - Set the tag. The tag argument is mandatory. If tag is already set, do - nothing. +aliases: ls, l -.. option:: --delete, --unset, -D +add +^^^ + +| :command:`qvm-tags` [-h] [--verbose] [--quiet] *VMNAME* add *TAG* [*TAG* ...] + +Add tag(s) to a VM. If tag is already set for given VM, do nothing. + +aliases: a, set + +del +^^^ + +| :command:`qvm-tags` [-h] [--verbose] [--quiet] *VMNAME* del *TAG* [*TAG* ...] + +Delete tag(s) from a VM. If tag is not set for given VM, do nothing. + +aliases: d, unset, u - Unset the tag. The tag argument is mandatory. If tag is not set, do nothing. Authors ------- diff --git a/doc/manpages/qvm-volume.rst b/doc/manpages/qvm-volume.rst index 04eda1b..7ec474a 100644 --- a/doc/manpages/qvm-volume.rst +++ b/doc/manpages/qvm-volume.rst @@ -1,7 +1,7 @@ .. program:: qvm-volume :program:`qvm-volume` -- Qubes volume and block device managment -=============================================================== +================================================================ Synopsis -------- diff --git a/doc/skel-manpage.py b/doc/skel-manpage.py index 8bb75ad..14ddfa7 100755 --- a/doc/skel-manpage.py +++ b/doc/skel-manpage.py @@ -5,7 +5,7 @@ import sys sys.path.insert(0, os.path.abspath('../')) import argparse -import qubesadmin.dochelpers +import qubesadmin.tools.dochelpers parser = argparse.ArgumentParser(description='prepare new manpage for command') parser.add_argument('command', metavar='COMMAND', @@ -14,7 +14,7 @@ parser.add_argument('command', metavar='COMMAND', def main(): args = parser.parse_args() - sys.stdout.write(qubesadmin.dochelpers.prepare_manpage(args.command)) + sys.stdout.write(qubesadmin.tools.dochelpers.prepare_manpage(args.command)) if __name__ == '__main__': main() diff --git a/qubesadmin/app.py b/qubesadmin/app.py index 860df56..9eec86b 100644 --- a/qubesadmin/app.py +++ b/qubesadmin/app.py @@ -270,7 +270,8 @@ class QubesBase(qubesadmin.base.PropertyHolder): self.domains.clear_cache() return self.domains[name] - def clone_vm(self, src_vm, new_name, pool=None, pools=None): + def clone_vm(self, src_vm, new_name, new_cls=None, + pool=None, pools=None, ignore_errors=False): '''Clone Virtual Machine Example usage with custom storage pools: @@ -281,35 +282,110 @@ class QubesBase(qubesadmin.base.PropertyHolder): >>> vm = app.clone_vm(src_vm, 'my-new-vm', pools=pools) >>> vm.label = app.labels['green'] - :param str cls: name of VM class (`AppVM`, `TemplateVM` etc) - :param str name: name of VM - :param str label: label color for new VM - :param str template: template to use (if apply for given VM class), - can be also VM object; use None for default value + :param QubesVM or str src_vm: source VM + :param str new_name: name of new VM + :param str new_cls: name of VM class (`AppVM`, `TemplateVM` etc) - use + None to copy it from *src_vm* :param str pool: storage pool to use instead of default one :param dict pools: storage pool for specific volumes + :param bool ignore_errors: should errors on meta-data setting be only + logged, or abort the whole operation? :return new VM object ''' if pool and pools: raise ValueError('only one of pool= and pools= can be used') - if not isinstance(src_vm, str): - src_vm = str(src_vm) + if isinstance(src_vm, str): + src_vm = self.domains[src_vm] - method = 'admin.vm.Clone' - payload = 'name={}'.format(new_name) + if new_cls is None: + new_cls = src_vm.__class__.__name__ + + template = getattr(src_vm, 'template', None) + if template is not None: + template = str(template) + + label = src_vm.label + + method_prefix = 'admin.vm.Create.' + payload = 'name={} label={}'.format(new_name, label) if pool: payload += ' pool={}'.format(str(pool)) - method = 'admin.vm.CloneInPool' + method_prefix = 'admin.vm.CreateInPool.' if pools: payload += ''.join(' pool:{}={}'.format(vol, str(pool)) for vol, pool in sorted(pools.items())) - method = 'admin.vm.CloneInPool' + method_prefix = 'admin.vm.CreateInPool.' - self.qubesd_call(src_vm, method, None, payload.encode('utf-8')) + self.qubesd_call('dom0', method_prefix + new_cls, template, + payload.encode('utf-8')) - return self.domains[new_name] + self.domains.clear_cache() + dst_vm = self.domains[new_name] + try: + assert isinstance(dst_vm, qubesadmin.vm.QubesVM) + for prop in src_vm.property_list(): + # handled by admin.vm.Create call + if prop in ('name', 'qid', 'template', 'label'): + continue + if src_vm.property_is_default(prop): + continue + try: + setattr(dst_vm, prop, getattr(src_vm, prop)) + except AttributeError: + pass + except qubesadmin.exc.QubesException as e: + dst_vm.log.error( + 'Failed to set {!s} property: {!s}'.format(prop, e)) + if not ignore_errors: + raise + + for tag in src_vm.tags: + try: + dst_vm.tags.add(tag) + except qubesadmin.exc.QubesException as e: + dst_vm.log.error( + 'Failed to add {!s} tag: {!s}'.format(tag, e)) + if not ignore_errors: + raise + + for feature, value in src_vm.features.items(): + try: + dst_vm.features[feature] = value + except qubesadmin.exc.QubesException as e: + dst_vm.log.error( + 'Failed to set {!s} feature: {!s}'.format(feature, e)) + if not ignore_errors: + raise + + try: + dst_vm.firewall.policy = src_vm.firewall.policy + dst_vm.firewall.save_rules(src_vm.firewall.rules) + except qubesadmin.exc.QubesException as e: + self.log.error('Failed to set firewall: %s', e) + if not ignore_errors: + raise + + except qubesadmin.exc.QubesException: + if not ignore_errors: + del self.domains[dst_vm.name] + raise + + try: + for dst_volume in sorted(dst_vm.volumes.values()): + if not dst_volume.save_on_stop: + # clone only persistent volumes + continue + src_volume = src_vm.volumes[dst_volume.name] + dst_vm.log.info('Cloning {} volume'.format(dst_volume.name)) + dst_volume.clone(src_volume) + + except qubesadmin.exc.QubesException: + del self.domains[dst_vm.name] + raise + + return dst_vm def run_service(self, dest, service, filter_esc=False, user=None, localcmd=None, wait=True, **kwargs): diff --git a/qubesadmin/base.py b/qubesadmin/base.py index 2a009b6..cb9d64a 100644 --- a/qubesadmin/base.py +++ b/qubesadmin/base.py @@ -154,6 +154,23 @@ class PropertyHolder(object): assert isinstance(is_default, bool) return is_default + def clone_properties(self, src, proplist=None): + '''Clone properties from other object. + + :param PropertyHolder src: source object + :param list proplist: list of properties \ + (:py:obj:`None` or omit for all properties) + ''' + + if proplist is None: + proplist = self.property_list() + + for prop in proplist: + try: + setattr(self, prop, getattr(src, prop)) + except AttributeError: + continue + def __getattr__(self, item): # pylint: disable=too-many-return-statements if item.startswith('_'): diff --git a/qubesadmin/exc.py b/qubesadmin/exc.py index 9e60cc6..54fb224 100644 --- a/qubesadmin/exc.py +++ b/qubesadmin/exc.py @@ -116,6 +116,10 @@ class QubesFeatureNotFoundError(QubesException, KeyError): '''Feature not set for a given domain''' +class QubesTagNotFoundError(QubesException, KeyError): + '''Tag not set for a given domain''' + + class StoragePoolException(QubesException): ''' A general storage exception ''' diff --git a/qubesadmin/storage.py b/qubesadmin/storage.py index 21614a9..5a72719 100644 --- a/qubesadmin/storage.py +++ b/qubesadmin/storage.py @@ -93,6 +93,21 @@ class Volume(object): return self.pool == other.pool and self.vid == other.vid return NotImplemented + def __lt__(self, other): + # pylint: disable=protected-access + if isinstance(other, Volume): + if self._vm and other._vm: + return (self._vm, self._vm_name) < (other._vm, other._vm_name) + elif self._vid and other._vid: + return (self._pool, self._vid) < (other._pool, other._vid) + return NotImplemented + + + @property + def name(self): + '''per-VM volume name, if available''' + return self._vm_name + @property def pool(self): '''Storage volume pool name.''' @@ -195,6 +210,26 @@ class Volume(object): ''' self._qubesd_call('Import', payload_stream=stream) + def clone(self, source): + ''' Clone data from sane volume of another VM. + + This function override existing volume content. + This operation is implemented for VM volumes - those in vm.volumes + collection (not pool.volumes). + + :param source: source volume object + ''' + + # pylint: disable=protected-access + if source._vm_name is None or self._vm_name is None: + raise NotImplementedError( + 'Operation implemented only for VM volumes') + if source._vm_name != self._vm_name: + raise ValueError('Source and target volume must have the same type') + + # this call is to *source* volume, because we extract data from there + source._qubesd_call('Clone', payload=str(self._vm).encode()) + class Pool(object): ''' A Pool is used to manage different kind of volumes (File diff --git a/qubesadmin/tags.py b/qubesadmin/tags.py new file mode 100644 index 0000000..37c09e9 --- /dev/null +++ b/qubesadmin/tags.py @@ -0,0 +1,66 @@ +# -*- encoding: utf8 -*- +# +# The Qubes OS Project, http://www.qubes-os.org +# +# Copyright (C) 2017 Marek Marczykowski-Górecki +# +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, see . + +'''VM tags interface''' + + +class Tags(object): + '''Manager of the tags. + + Tags are simple: tag either can be present on qube or not. Tag is a + simple string consisting of ASCII alphanumeric characters, plus `_` and + `-`. + ''' + # pylint: disable=too-few-public-methods + + def __init__(self, vm): + super(Tags, self).__init__() + self.vm = vm + + def remove(self, elem): + '''Remove a tag''' + self.vm.qubesd_call(self.vm.name, 'admin.vm.tag.Remove', elem) + + def add(self, elem): + '''Add a tag''' + self.vm.qubesd_call(self.vm.name, 'admin.vm.tag.Set', elem) + + def update(self, *others): + '''Add tags from iterable(s)''' + for other in others: + for elem in other: + self.add(elem) + + def discard(self, elem): + '''Remove a tag if present''' + try: + self.remove(elem) + except KeyError: + pass + + def __iter__(self): + qubesd_response = self.vm.qubesd_call(self.vm.name, + 'admin.vm.tag.List') + return iter(qubesd_response.decode('utf-8').splitlines()) + + def __contains__(self, elem): + '''Does the VM have a tag''' + response = self.vm.qubesd_call(self.vm.name, 'admin.vm.tag.Get', elem) + return response == b'1' diff --git a/qubesadmin/tests/app.py b/qubesadmin/tests/app.py index 56ff6f4..f82b39f 100644 --- a/qubesadmin/tests/app.py +++ b/qubesadmin/tests/app.py @@ -32,6 +32,7 @@ except ImportError: import tempfile +import qubesadmin.exc import qubesadmin.tests @@ -154,48 +155,368 @@ class TC_10_QubesBase(qubesadmin.tests.QubesTestCase): self.assertEqual(label.name, 'red') self.assertAllCalled() + def clone_setup_common_calls(self, src, dst): + # labels + self.app.expected_calls[('dom0', 'admin.label.List', None, None)] = \ + b'0\x00red\ngreen\nblue\n' + + # have each property type with default=no, each special-cased, + # and some with default=yes + properties = { + 'label': 'default=False type=label red', + 'template': 'default=False type=vm test-template', + 'memory': 'default=False type=int 400', + 'kernel': 'default=False type=str 4.9.31', + 'netvm': 'default=False type=vm test-net', + 'hvm': 'default=False type=bool True', + 'default_user': 'default=True type=str user', + } + self.app.expected_calls[ + (src, 'admin.vm.property.List', None, None)] = \ + b'0\0qid\nname\n' + \ + b'\n'.join(prop.encode() for prop in properties.keys()) + \ + b'\n' + for prop, value in properties.items(): + self.app.expected_calls[ + (src, 'admin.vm.property.Get', prop, None)] = \ + b'0\0' + value.encode() + # special cases handled by admin.vm.Create call + if prop in ('label', 'template'): + continue + # default properties should not be set + if 'default=True' in value: + continue + self.app.expected_calls[ + (dst, 'admin.vm.property.Set', prop, + value.split()[-1].encode())] = b'0\0' + + # tags + self.app.expected_calls[ + (src, 'admin.vm.tag.List', None, None)] = \ + b'0\0tag1\ntag2\n' + self.app.expected_calls[ + (dst, 'admin.vm.tag.Set', 'tag1', None)] = b'0\0' + self.app.expected_calls[ + (dst, 'admin.vm.tag.Set', 'tag2', None)] = b'0\0' + + # features + self.app.expected_calls[ + (src, 'admin.vm.feature.List', None, None)] = \ + b'0\0feat1\nfeat2\n' + self.app.expected_calls[ + (src, 'admin.vm.feature.Get', 'feat1', None)] = \ + b'0\0feat1-value with spaces' + self.app.expected_calls[ + (src, 'admin.vm.feature.Get', 'feat2', None)] = \ + b'0\x001' + self.app.expected_calls[ + (dst, 'admin.vm.feature.Set', 'feat1', + b'feat1-value with spaces')] = b'0\0' + self.app.expected_calls[ + (dst, 'admin.vm.feature.Set', 'feat2', b'1')] = b'0\0' + + # firewall + rules = ( + b'action=drop dst4=192.168.0.0/24\n' + b'action=accept\n' + ) + self.app.expected_calls[ + (src, 'admin.vm.firewall.GetPolicy', None, None)] = \ + b'0\x00accept' + self.app.expected_calls[ + (src, 'admin.vm.firewall.Get', None, None)] = \ + b'0\x00' + rules + self.app.expected_calls[ + (dst, 'admin.vm.firewall.SetPolicy', None, b'accept')] = \ + b'0\x00' + self.app.expected_calls[ + (dst, 'admin.vm.firewall.Set', None, rules)] = \ + b'0\x00' + + # storage + self.app.expected_calls[ + (dst, 'admin.vm.volume.List', None, None)] = \ + b'0\x00root\nprivate\nvolatile\nkernel\n' + self.app.expected_calls[ + (dst, 'admin.vm.volume.Info', 'root', None)] = \ + b'0\x00pool=lvm\n' \ + b'vid=vm-test-vm/root\n' \ + b'size=10737418240\n' \ + b'usage=2147483648\n' \ + b'rw=False\n' \ + b'internal=True\n' \ + b'source=vm-test-template/root\n' \ + b'save_on_stop=False\n' \ + b'snap_on_start=True\n' + self.app.expected_calls[ + (dst, 'admin.vm.volume.Info', 'private', None)] = \ + b'0\x00pool=lvm\n' \ + b'vid=vm-test-vm/private\n' \ + b'size=2147483648\n' \ + b'usage=214748364\n' \ + b'rw=True\n' \ + b'internal=True\n' \ + b'save_on_stop=True\n' \ + b'snap_on_start=False\n' + self.app.expected_calls[ + (dst, 'admin.vm.volume.Info', 'volatile', None)] = \ + b'0\x00pool=lvm\n' \ + b'vid=vm-test-vm/volatile\n' \ + b'size=10737418240\n' \ + b'usage=0\n' \ + b'rw=True\n' \ + b'internal=True\n' \ + b'source=None\n' \ + b'save_on_stop=False\n' \ + b'snap_on_start=False\n' + self.app.expected_calls[ + (dst, 'admin.vm.volume.Info', 'kernel', None)] = \ + b'0\x00pool=linux-kernel\n' \ + b'vid=\n' \ + b'size=0\n' \ + b'usage=0\n' \ + b'rw=False\n' \ + b'internal=True\n' \ + b'source=None\n' \ + b'save_on_stop=False\n' \ + b'snap_on_start=False\n' + self.app.expected_calls[ + (src, 'admin.vm.volume.List', None, None)] = \ + b'0\x00root\nprivate\nvolatile\nkernel\n' + self.app.expected_calls[ + (src, 'admin.vm.volume.Clone', 'private', dst.encode())] = \ + b'0\x00' + def test_030_clone(self): - self.app.expected_calls[('test-vm', 'admin.vm.Clone', None, - b'name=new-name')] = b'0\x00' + self.clone_setup_common_calls('test-vm', 'new-name') self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \ b'0\x00new-name class=AppVM state=Halted\n' \ - b'test-vm class=AppVM state=Halted\n' + b'test-vm class=AppVM state=Halted\n' \ + b'test-template class=TemplateVM state=Halted\n' \ + b'test-net class=AppVM state=Halted\n' + self.app.expected_calls[('dom0', 'admin.vm.Create.AppVM', + 'test-template', b'name=new-name label=red')] = b'0\x00' new_vm = self.app.clone_vm('test-vm', 'new-name') self.assertEqual(new_vm.name, 'new-name') self.assertAllCalled() def test_031_clone_object(self): - self.app.expected_calls[('test-vm', 'admin.vm.Clone', None, - b'name=new-name')] = b'0\x00' + self.clone_setup_common_calls('test-vm', 'new-name') self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \ b'0\x00new-name class=AppVM state=Halted\n' \ - b'test-vm class=AppVM state=Halted\n' + b'test-vm class=AppVM state=Halted\n' \ + b'test-template class=TemplateVM state=Halted\n' \ + b'test-net class=AppVM state=Halted\n' + self.app.expected_calls[('dom0', 'admin.vm.Create.AppVM', + 'test-template', b'name=new-name label=red')] = b'0\x00' new_vm = self.app.clone_vm(self.app.domains['test-vm'], 'new-name') self.assertEqual(new_vm.name, 'new-name') self.assertAllCalled() def test_032_clone_pool(self): - self.app.expected_calls[('test-vm', 'admin.vm.CloneInPool', None, - b'name=new-name pool=some-pool')] = b'0\x00' + self.clone_setup_common_calls('test-vm', 'new-name') + self.app.expected_calls[('dom0', 'admin.vm.CreateInPool.AppVM', + 'test-template', + b'name=new-name label=red pool=some-pool')] = b'0\x00' self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \ b'0\x00new-name class=AppVM state=Halted\n' \ - b'test-vm class=AppVM state=Halted\n' + b'test-vm class=AppVM state=Halted\n' \ + b'test-template class=TemplateVM state=Halted\n' \ + b'test-net class=AppVM state=Halted\n' new_vm = self.app.clone_vm('test-vm', 'new-name', pool='some-pool') self.assertEqual(new_vm.name, 'new-name') self.assertAllCalled() def test_033_clone_pools(self): - self.app.expected_calls[('test-vm', 'admin.vm.CloneInPool', None, - b'name=new-name pool:private=some-pool ' + self.clone_setup_common_calls('test-vm', 'new-name') + self.app.expected_calls[('dom0', 'admin.vm.CreateInPool.AppVM', + 'test-template', + b'name=new-name label=red pool:private=some-pool ' b'pool:volatile=other-pool')] = b'0\x00' self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \ b'0\x00new-name class=AppVM state=Halted\n' \ - b'test-vm class=AppVM state=Halted\n' + b'test-vm class=AppVM state=Halted\n' \ + b'test-template class=TemplateVM state=Halted\n' \ + b'test-net class=AppVM state=Halted\n' new_vm = self.app.clone_vm('test-vm', 'new-name', pools={'private': 'some-pool', 'volatile': 'other-pool'}) self.assertEqual(new_vm.name, 'new-name') self.assertAllCalled() + def test_034_clone_class_change(self): + self.clone_setup_common_calls('test-vm', 'new-name') + self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \ + b'0\x00new-name class=StandaloneVM state=Halted\n' \ + b'test-vm class=AppVM state=Halted\n' \ + b'test-template class=TemplateVM state=Halted\n' \ + b'test-net class=AppVM state=Halted\n' + self.app.expected_calls[('dom0', 'admin.vm.Create.StandaloneVM', + 'test-template', b'name=new-name label=red')] = b'0\x00' + self.app.expected_calls[ + ('new-name', 'admin.vm.volume.Info', 'root', None)] = \ + b'0\x00pool=lvm\n' \ + b'vid=vm-new-name/root\n' \ + b'size=10737418240\n' \ + b'usage=2147483648\n' \ + b'rw=True\n' \ + b'internal=True\n' \ + b'source=None\n' \ + b'save_on_stop=True\n' \ + b'snap_on_start=False\n' + self.app.expected_calls[ + ('test-vm', 'admin.vm.volume.Clone', 'root', b'new-name')] = \ + b'0\x00' + new_vm = self.app.clone_vm('test-vm', 'new-name', + new_cls='StandaloneVM') + self.assertEqual(new_vm.name, 'new-name') + self.assertEqual(new_vm.__class__.__name__, 'StandaloneVM') + self.assertAllCalled() + + def test_035_clone_fail(self): + self.app.expected_calls[('dom0', 'admin.label.List', None, None)] = \ + b'0\x00red\ngreen\nblue\n' + self.app.expected_calls[ + ('test-vm', 'admin.vm.property.List', None, None)] = \ + b'0\0qid\nname\ntemplate\nlabel\nmemory\n' + self.app.expected_calls[ + ('test-vm', 'admin.vm.property.Get', 'label', None)] = \ + b'0\0default=False type=label red' + self.app.expected_calls[ + ('test-vm', 'admin.vm.property.Get', 'template', None)] = \ + b'0\0default=False type=vm test-template' + self.app.expected_calls[ + ('test-vm', 'admin.vm.property.Get', 'memory', None)] = \ + b'0\0default=False type=int 400' + self.app.expected_calls[ + ('new-name', 'admin.vm.property.Set', 'memory', b'400')] = \ + b'2\0QubesException\0\0something happened\0' + self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \ + b'0\x00new-name class=AppVM state=Halted\n' \ + b'test-vm class=AppVM state=Halted\n' \ + b'test-template class=TemplateVM state=Halted\n' \ + b'test-net class=AppVM state=Halted\n' + self.app.expected_calls[('dom0', 'admin.vm.Create.AppVM', + 'test-template', b'name=new-name label=red')] = b'0\x00' + self.app.expected_calls[('new-name', 'admin.vm.Remove', None, None)] = \ + b'0\x00' + with self.assertRaises(qubesadmin.exc.QubesException): + self.app.clone_vm('test-vm', 'new-name') + self.assertAllCalled() + + def test_036_clone_ignore_errors_prop(self): + self.clone_setup_common_calls('test-vm', 'new-name') + self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \ + b'0\x00new-name class=AppVM state=Halted\n' \ + b'test-vm class=AppVM state=Halted\n' \ + b'test-template class=TemplateVM state=Halted\n' \ + b'test-net class=AppVM state=Halted\n' + self.app.expected_calls[('dom0', 'admin.vm.Create.AppVM', + 'test-template', b'name=new-name label=red')] = b'0\x00' + self.app.expected_calls[ + ('new-name', 'admin.vm.property.Set', 'memory', b'400')] = \ + b'2\0QubesException\0\0something happened\0' + new_vm = self.app.clone_vm('test-vm', 'new-name', ignore_errors=True) + self.assertEqual(new_vm.name, 'new-name') + self.assertAllCalled() + + def test_037_clone_ignore_errors_feature(self): + self.clone_setup_common_calls('test-vm', 'new-name') + self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \ + b'0\x00new-name class=AppVM state=Halted\n' \ + b'test-vm class=AppVM state=Halted\n' \ + b'test-template class=TemplateVM state=Halted\n' \ + b'test-net class=AppVM state=Halted\n' + self.app.expected_calls[('dom0', 'admin.vm.Create.AppVM', + 'test-template', b'name=new-name label=red')] = b'0\x00' + self.app.expected_calls[ + ('new-name', 'admin.vm.feature.Set', 'feat2', b'1')] = \ + b'2\0QubesException\0\0something happened\0' + new_vm = self.app.clone_vm('test-vm', 'new-name', ignore_errors=True) + self.assertEqual(new_vm.name, 'new-name') + self.assertAllCalled() + + def test_038_clone_ignore_errors_tag(self): + self.clone_setup_common_calls('test-vm', 'new-name') + self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \ + b'0\x00new-name class=AppVM state=Halted\n' \ + b'test-vm class=AppVM state=Halted\n' \ + b'test-template class=TemplateVM state=Halted\n' \ + b'test-net class=AppVM state=Halted\n' + self.app.expected_calls[('dom0', 'admin.vm.Create.AppVM', + 'test-template', b'name=new-name label=red')] = b'0\x00' + self.app.expected_calls[ + ('new-name', 'admin.vm.tag.Set', 'tag1', None)] = \ + b'2\0QubesException\0\0something happened\0' + new_vm = self.app.clone_vm('test-vm', 'new-name', ignore_errors=True) + self.assertEqual(new_vm.name, 'new-name') + self.assertAllCalled() + + def test_039_clone_ignore_errors_firewall(self): + self.clone_setup_common_calls('test-vm', 'new-name') + self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \ + b'0\x00new-name class=AppVM state=Halted\n' \ + b'test-vm class=AppVM state=Halted\n' \ + b'test-template class=TemplateVM state=Halted\n' \ + b'test-net class=AppVM state=Halted\n' + self.app.expected_calls[('dom0', 'admin.vm.Create.AppVM', + 'test-template', b'name=new-name label=red')] = b'0\x00' + self.app.expected_calls[ + ('new-name', 'admin.vm.firewall.SetPolicy', None, b'accept')] = \ + b'2\0QubesException\0\0something happened\0' + del self.app.expected_calls[ + ('test-vm', 'admin.vm.firewall.Get', None, None)] + del self.app.expected_calls[ + ('new-name', 'admin.vm.firewall.Set', None, + b'action=drop dst4=192.168.0.0/24\naction=accept\n')] + new_vm = self.app.clone_vm('test-vm', 'new-name', ignore_errors=True) + self.assertEqual(new_vm.name, 'new-name') + self.assertAllCalled() + + def test_040_clone_ignore_errors_storage(self): + self.clone_setup_common_calls('test-vm', 'new-name') + self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \ + b'0\x00new-name class=AppVM state=Halted\n' \ + b'test-vm class=AppVM state=Halted\n' \ + b'test-template class=TemplateVM state=Halted\n' \ + b'test-net class=AppVM state=Halted\n' + self.app.expected_calls[('dom0', 'admin.vm.Create.AppVM', + 'test-template', b'name=new-name label=red')] = b'0\x00' + self.app.expected_calls[ + ('test-vm', 'admin.vm.volume.Clone', 'private', b'new-name')] = \ + b'2\0QubesException\0\0something happened\0' + self.app.expected_calls[('new-name', 'admin.vm.Remove', None, None)] = \ + b'0\x00' + del self.app.expected_calls[ + ('new-name', 'admin.vm.volume.Info', 'root', None)] + del self.app.expected_calls[ + ('new-name', 'admin.vm.volume.Info', 'volatile', None)] + with self.assertRaises(qubesadmin.exc.QubesException): + self.app.clone_vm('test-vm', 'new-name', ignore_errors=True) + self.assertAllCalled() + + def test_041_clone_fail_storage(self): + self.clone_setup_common_calls('test-vm', 'new-name') + self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \ + b'0\x00new-name class=AppVM state=Halted\n' \ + b'test-vm class=AppVM state=Halted\n' \ + b'test-template class=TemplateVM state=Halted\n' \ + b'test-net class=AppVM state=Halted\n' + self.app.expected_calls[('dom0', 'admin.vm.Create.AppVM', + 'test-template', b'name=new-name label=red')] = b'0\x00' + self.app.expected_calls[ + ('test-vm', 'admin.vm.volume.Clone', 'private', b'new-name')] = \ + b'2\0QubesException\0\0something happened\0' + self.app.expected_calls[('new-name', 'admin.vm.Remove', None, None)] = \ + b'0\x00' + del self.app.expected_calls[ + ('new-name', 'admin.vm.volume.Info', 'root', None)] + del self.app.expected_calls[ + ('new-name', 'admin.vm.volume.Info', 'volatile', None)] + with self.assertRaises(qubesadmin.exc.QubesException): + self.app.clone_vm('test-vm', 'new-name') + self.assertAllCalled() + + class TC_20_QubesLocal(unittest.TestCase): def setUp(self): diff --git a/qubesadmin/tests/storage.py b/qubesadmin/tests/storage.py index b0cee76..76d3134 100644 --- a/qubesadmin/tests/storage.py +++ b/qubesadmin/tests/storage.py @@ -161,6 +161,30 @@ class TestVMVolume(qubesadmin.tests.QubesTestCase): input_proc.stdout.close() self.assertAllCalled() + def test_050_clone(self): + self.app.expected_calls[ + ('source-vm', 'admin.vm.volume.Clone', 'volname', b'test-vm')] = \ + b'0\x00' + self.app.expected_calls[ + ('dom0', 'admin.vm.List', None, None)] = \ + b'0\x00source-vm class=AppVM state=Halted\n' + self.app.expected_calls[ + ('source-vm', 'admin.vm.volume.List', None, None)] = \ + b'0\x00volname\nother\n' + self.vol.clone(self.app.domains['source-vm'].volumes['volname']) + self.assertAllCalled() + + def test_050_clone_wrong_volume(self): + self.app.expected_calls[ + ('dom0', 'admin.vm.List', None, None)] = \ + b'0\x00source-vm class=AppVM state=Halted\n' + self.app.expected_calls[ + ('source-vm', 'admin.vm.volume.List', None, None)] = \ + b'0\x00volname\nother\n' + with self.assertRaises(ValueError): + self.vol.clone(self.app.domains['source-vm'].volumes['other']) + self.assertAllCalled() + class TestPoolVolume(TestVMVolume): def setUp(self): @@ -245,7 +269,21 @@ class TestPoolVolume(TestVMVolume): self.assertAllCalled() def test_040_import_data(self): - self.skipTest('admin.pool.vm.Import not supported') + self.skipTest('admin.pool.volume.Import not supported') + + def test_050_clone(self): + self.app.expected_calls[ + ('dom0', 'admin.vm.List', None, None)] = \ + b'0\x00source-vm class=AppVM state=Halted\n' + self.app.expected_calls[ + ('source-vm', 'admin.vm.volume.List', None, None)] = \ + b'0\x00volname\nother\n' + with self.assertRaises(NotImplementedError): + self.vol.clone(self.app.domains['source-vm'].volumes['volname']) + self.assertAllCalled() + + def test_050_clone_wrong_volume(self): + self.skipTest('admin.pool.volume.Clone not supported') class TestPool(qubesadmin.tests.QubesTestCase): diff --git a/qubesadmin/tests/tags.py b/qubesadmin/tests/tags.py new file mode 100644 index 0000000..00893f9 --- /dev/null +++ b/qubesadmin/tests/tags.py @@ -0,0 +1,110 @@ +# -*- encoding: utf8 -*- +# +# The Qubes OS Project, http://www.qubes-os.org +# +# Copyright (C) 2017 Marek Marczykowski-Górecki +# +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, see . + +import qubesadmin.tests +import qubesadmin.tags + +class TC_00_Tags(qubesadmin.tests.QubesTestCase): + def setUp(self): + super(TC_00_Tags, self).setUp() + self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \ + b'0\0test-vm class=AppVM state=Running\n' \ + b'test-vm2 class=AppVM state=Running\n' \ + b'test-vm3 class=AppVM state=Running\n' + self.vm = self.app.domains['test-vm'] + self.tags = qubesadmin.tags.Tags(self.vm) + + def test_000_list(self): + self.app.expected_calls[ + ('test-vm', 'admin.vm.tag.List', None, None)] = \ + b'0\0tag1\ntag2\n' + self.assertEqual(sorted(self.tags), + ['tag1', 'tag2']) + self.assertAllCalled() + + def test_010_get(self): + self.app.expected_calls[ + ('test-vm', 'admin.vm.tag.Get', 'tag1', None)] = \ + b'0\x001' + self.assertIn('tag1', self.tags) + self.assertAllCalled() + + def test_011_get_missing(self): + self.app.expected_calls[ + ('test-vm', 'admin.vm.tag.Get', 'tag1', None)] = \ + b'0\x000' + self.assertNotIn('tag1', self.tags) + self.assertAllCalled() + + def test_020_set(self): + self.app.expected_calls[ + ('test-vm', 'admin.vm.tag.Set', 'tag1', None)] = b'0\0' + self.tags.add('tag1') + self.assertAllCalled() + + def test_030_update(self): + self.app.expected_calls[ + ('test-vm', 'admin.vm.tag.Set', 'tag1', None)] = b'0\0' + self.app.expected_calls[ + ('test-vm', 'admin.vm.tag.Set', 'tag2', None)] = b'0\0' + self.tags.update(['tag1', 'tag2']) + self.assertAllCalled() + + def test_031_update_from_other(self): + self.app.expected_calls[ + ('test-vm2', 'admin.vm.tag.List', None, None)] = \ + b'0\0tag3\ntag4\n' + self.app.expected_calls[ + ('test-vm', 'admin.vm.tag.Set', 'tag3', None)] = b'0\0' + self.app.expected_calls[ + ('test-vm', 'admin.vm.tag.Set', 'tag4', None)] = b'0\0' + self.tags.update(self.app.domains['test-vm2'].tags) + self.assertAllCalled() + + def test_040_remove(self): + self.app.expected_calls[ + ('test-vm', 'admin.vm.tag.Remove', 'tag1', None)] = \ + b'0\0' + self.tags.remove('tag1') + self.assertAllCalled() + + def test_040_remove_missing(self): + self.app.expected_calls[ + ('test-vm', 'admin.vm.tag.Remove', 'tag1', None)] = \ + b'2\0QubesTagNotFoundError\0\0Tag not set for domain test-vm: ' \ + b'tag1\0' + with self.assertRaises(KeyError): + self.tags.remove('tag1') + self.assertAllCalled() + + def test_050_discard(self): + self.app.expected_calls[ + ('test-vm', 'admin.vm.tag.Remove', 'tag1', None)] = \ + b'0\0' + self.tags.discard('tag1') + self.assertAllCalled() + + def test_051_discard_missing(self): + self.app.expected_calls[ + ('test-vm', 'admin.vm.tag.Remove', 'tag1', None)] = \ + b'2\0QubesTagNotFoundError\0\0Tag not set for domain test-vm: ' \ + b'tag1\0' + self.tags.discard('tag1') + self.assertAllCalled() diff --git a/qubesadmin/tests/tools/qvm_clone.py b/qubesadmin/tests/tools/qvm_clone.py index b4c33a1..f8ca372 100644 --- a/qubesadmin/tests/tools/qvm_clone.py +++ b/qubesadmin/tests/tools/qvm_clone.py @@ -20,43 +20,61 @@ import qubesadmin.tests import qubesadmin.tests.tools import qubesadmin.tools.qvm_clone +from unittest import mock class TC_00_qvm_clone(qubesadmin.tests.QubesTestCase): def test_000_simple(self): - self.app.expected_calls[('test-vm', 'admin.vm.Clone', None, - b'name=new-vm')] = b'0\x00' + self.app.clone_vm = mock.Mock() self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \ b'0\x00new-vm class=AppVM state=Halted\n' \ b'test-vm class=AppVM state=Halted\n' qubesadmin.tools.qvm_clone.main(['test-vm', 'new-vm'], app=self.app) + self.app.clone_vm.assert_called_with(self.app.domains['test-vm'], + 'new-vm', new_cls=None, pool=None, pools={}) self.assertAllCalled() def test_001_missing_vm(self): + self.app.clone_vm = mock.Mock() with self.assertRaises(SystemExit): with qubesadmin.tests.tools.StderrBuffer() as stderr: qubesadmin.tools.qvm_clone.main(['test-vm'], app=self.app) self.assertIn('NAME', stderr.getvalue()) + self.assertFalse(self.app.clone_vm.called) self.assertAllCalled() def test_004_pool(self): - self.app.expected_calls[('test-vm', 'admin.vm.CloneInPool', - None, b'name=new-vm pool=some-pool')] = b'0\x00' + self.app.clone_vm = mock.Mock() self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \ b'0\x00new-vm class=AppVM state=Halted\n' \ b'test-vm class=AppVM state=Halted\n' qubesadmin.tools.qvm_clone.main(['-P', 'some-pool', 'test-vm', 'new-vm'], app=self.app) + self.app.clone_vm.assert_called_with(self.app.domains['test-vm'], + 'new-vm', new_cls=None, pool='some-pool', pools={}) self.assertAllCalled() def test_005_pools(self): - self.app.expected_calls[('test-vm', 'admin.vm.CloneInPool', - None, b'name=new-vm pool:private=some-pool ' - b'pool:volatile=other-pool')] = b'0\x00' + self.app.clone_vm = mock.Mock() self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \ b'0\x00new-vm class=AppVM state=Halted\n' \ b'test-vm class=AppVM state=Halted\n' qubesadmin.tools.qvm_clone.main(['--pool', 'private=some-pool', '--pool', 'volatile=other-pool', 'test-vm', 'new-vm'], app=self.app) + self.app.clone_vm.assert_called_with(self.app.domains['test-vm'], + 'new-vm', new_cls=None, pool=None, pools={'private': 'some-pool', + 'volatile': 'other-pool'}) + self.assertAllCalled() + + def test_006_new_cls(self): + self.app.clone_vm = mock.Mock() + self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \ + b'0\x00new-vm class=AppVM state=Halted\n' \ + b'test-vm class=AppVM state=Halted\n' + qubesadmin.tools.qvm_clone.main(['--class', 'StandaloneVM', + 'test-vm', 'new-vm'], + app=self.app) + self.app.clone_vm.assert_called_with(self.app.domains['test-vm'], + 'new-vm', new_cls='StandaloneVM', pool=None, pools={}) self.assertAllCalled() diff --git a/qubesadmin/tests/tools/qvm_tags.py b/qubesadmin/tests/tools/qvm_tags.py new file mode 100644 index 0000000..79ed1e3 --- /dev/null +++ b/qubesadmin/tests/tools/qvm_tags.py @@ -0,0 +1,141 @@ +# -*- encoding: utf8 -*- +# +# The Qubes OS Project, http://www.qubes-os.org +# +# Copyright (C) 2017 Marek Marczykowski-Górecki +# +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, see . + +import qubesadmin.tests +import qubesadmin.tests.tools +import qubesadmin.tools.qvm_tags + + +class TC_00_qvm_tags(qubesadmin.tests.QubesTestCase): + def test_000_list(self): + self.app.expected_calls[ + ('dom0', 'admin.vm.List', None, None)] = \ + b'0\x00some-vm class=AppVM state=Running\n' + self.app.expected_calls[ + ('some-vm', 'admin.vm.tag.List', None, None)] = \ + b'0\x00tag1\ntag2\n' + with qubesadmin.tests.tools.StdoutBuffer() as stdout: + self.assertEqual( + qubesadmin.tools.qvm_tags.main(['some-vm'], app=self.app), + 0) + self.assertEqual(stdout.getvalue(), + 'tag1\n' + 'tag2\n') + self.assertAllCalled() + + def test_001_list_action(self): + self.app.expected_calls[ + ('dom0', 'admin.vm.List', None, None)] = \ + b'0\x00some-vm class=AppVM state=Running\n' + self.app.expected_calls[ + ('some-vm', 'admin.vm.tag.List', None, None)] = \ + b'0\x00tag1\ntag2\n' + with qubesadmin.tests.tools.StdoutBuffer() as stdout: + self.assertEqual( + qubesadmin.tools.qvm_tags.main(['some-vm', 'list'], + app=self.app), 0) + self.assertEqual(stdout.getvalue(), + 'tag1\n' + 'tag2\n') + self.assertAllCalled() + + def test_002_list_alias(self): + self.app.expected_calls[ + ('dom0', 'admin.vm.List', None, None)] = \ + b'0\x00some-vm class=AppVM state=Running\n' + self.app.expected_calls[ + ('some-vm', 'admin.vm.tag.List', None, None)] = \ + b'0\x00tag1\ntag2\n' + with qubesadmin.tests.tools.StdoutBuffer() as stdout: + self.assertEqual( + qubesadmin.tools.qvm_tags.main(['some-vm', 'ls'], + app=self.app), 0) + self.assertEqual(stdout.getvalue(), + 'tag1\n' + 'tag2\n') + self.assertAllCalled() + + def test_003_list_check(self): + self.app.expected_calls[ + ('dom0', 'admin.vm.List', None, None)] = \ + b'0\x00some-vm class=AppVM state=Running\n' + self.app.expected_calls[ + ('some-vm', 'admin.vm.tag.Get', 'tag1', None)] = \ + b'0\x001' + with qubesadmin.tests.tools.StdoutBuffer() as stdout: + self.assertEqual( + qubesadmin.tools.qvm_tags.main(['some-vm', 'ls', 'tag1'], + app=self.app), 0) + self.assertEqual(stdout.getvalue(), 'tag1\n') + self.assertAllCalled() + + def test_004_list_check_missing(self): + self.app.expected_calls[ + ('dom0', 'admin.vm.List', None, None)] = \ + b'0\x00some-vm class=AppVM state=Running\n' + self.app.expected_calls[ + ('some-vm', 'admin.vm.tag.Get', 'tag1', None)] = \ + b'0\x000' + with qubesadmin.tests.tools.StdoutBuffer() as stdout: + self.assertEqual( + qubesadmin.tools.qvm_tags.main(['some-vm', 'ls', 'tag1'], + app=self.app), 1) + self.assertEqual(stdout.getvalue(), '') + self.assertAllCalled() + + def test_005_list_empty(self): + self.app.expected_calls[ + ('dom0', 'admin.vm.List', None, None)] = \ + b'0\x00some-vm class=AppVM state=Running\n' + self.app.expected_calls[ + ('some-vm', 'admin.vm.tag.List', None, None)] = \ + b'0\x00' + with qubesadmin.tests.tools.StdoutBuffer() as stdout: + self.assertEqual( + qubesadmin.tools.qvm_tags.main(['some-vm', 'list'], + app=self.app), 0) + self.assertEqual(stdout.getvalue(), '') + self.assertAllCalled() + + def test_010_add(self): + self.app.expected_calls[ + ('dom0', 'admin.vm.List', None, None)] = \ + b'0\x00some-vm class=AppVM state=Running\n' + self.app.expected_calls[ + ('some-vm', 'admin.vm.tag.Set', 'tag3', None)] = b'0\x00' + self.assertEqual( + qubesadmin.tools.qvm_tags.main(['some-vm', 'add', 'tag3'], + app=self.app), + 0) + self.assertAllCalled() + + def test_020_del(self): + self.app.expected_calls[ + ('dom0', 'admin.vm.List', None, None)] = \ + b'0\x00some-vm class=AppVM state=Running\n' + self.app.expected_calls[ + ('some-vm', 'admin.vm.tag.Remove', 'tag3', None)] = b'0\x00' + with qubesadmin.tests.tools.StdoutBuffer() as stdout: + self.assertEqual( + qubesadmin.tools.qvm_tags.main(['some-vm', 'del', 'tag3'], + app=self.app), + 0) + self.assertEqual(stdout.getvalue(), '') + self.assertAllCalled() diff --git a/qubesadmin/tools/qvm_clone.py b/qubesadmin/tools/qvm_clone.py index fb4cb4e..7623ec0 100644 --- a/qubesadmin/tools/qvm_clone.py +++ b/qubesadmin/tools/qvm_clone.py @@ -32,6 +32,10 @@ parser.add_argument('new_name', action='store', help='name of the domain to create') +parser.add_argument('--class', '-C', dest='cls', + default=None, + help='specify the class of the new domain (default: same as source)') + group = parser.add_mutually_exclusive_group() group.add_argument('-P', metavar='POOL', @@ -66,7 +70,7 @@ def main(args=None, app=None): parser.error( 'Pool argument must be of form: -P volume_name=pool_name') - app.clone_vm(src_vm, new_name, pool=pool, pools=pools) + app.clone_vm(src_vm, new_name, new_cls=args.cls, pool=pool, pools=pools) if __name__ == '__main__': sys.exit(main()) diff --git a/qubesadmin/tools/qvm_tags.py b/qubesadmin/tools/qvm_tags.py new file mode 100644 index 0000000..4e9578f --- /dev/null +++ b/qubesadmin/tools/qvm_tags.py @@ -0,0 +1,108 @@ +# +# The Qubes OS Project, https://www.qubes-os.org/ +# +# Copyright (C) 2010-2016 Joanna Rutkowska +# Copyright (C) 2016 Wojtek Porczyk +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +# + +'''qvm-features - Manage domain's tags''' + +from __future__ import print_function + +import sys + +import qubesadmin +import qubesadmin.tools + +def mode_query(args): + '''Query/list tags''' + if not hasattr(args, 'tag') or args.tag is None: + # list + tags = list(sorted(args.domains[0].tags)) + if tags: + print('\n'.join(tags)) + else: + # real query + if args.tag not in args.domains[0].tags: + return 1 + print(args.tag) + return 0 + + +def mode_add(args): + '''Add tag''' + for tag in args.tag: + args.domains[0].tags.add(tag) + return 0 + + +def mode_del(args): + '''Delete tag''' + for tag in args.tag: + args.domains[0].tags.discard(tag) + return 0 + + +def get_parser(): + ''' Return qvm-tags tool command line parser ''' + parser = qubesadmin.tools.QubesArgumentParser( + vmname_nargs=1, + description='manage domain\'s tags') + parser.register('action', 'parsers', + qubesadmin.tools.AliasedSubParsersAction) + + sub_parsers = parser.add_subparsers( + title='commands', + description="For more information see qvm-tags command -h", + dest='command') + + list_parser = sub_parsers.add_parser('list', aliases=('ls', 'l'), + help='list tags') + list_parser.add_argument('tag', nargs='?', + action='store', default=None) + list_parser.set_defaults(func=mode_query) + + add_parser = sub_parsers.add_parser('add', aliases=('a', 'set'), + help='add tag') + add_parser.add_argument('tag', nargs='+', + action='store') + add_parser.set_defaults(func=mode_add) + + del_parser = sub_parsers.add_parser('del', aliases=('d', 'unset', 'u'), + help='add tag') + del_parser.add_argument('tag', nargs=1, + action='store') + del_parser.set_defaults(func=mode_del) + + parser.set_defaults(func=mode_query) + return parser + + +def main(args=None, app=None): + '''Main routine of :program:`qvm-tags`. + + :param list args: Optional arguments to override those delivered from \ + command line. + ''' + + parser = get_parser() + args = parser.parse_args(args, app=app) + return args.func(args) + + +if __name__ == '__main__': + sys.exit(main()) diff --git a/qubesadmin/vm/__init__.py b/qubesadmin/vm/__init__.py index 26f9554..605bdae 100644 --- a/qubesadmin/vm/__init__.py +++ b/qubesadmin/vm/__init__.py @@ -27,6 +27,7 @@ import qubesadmin.storage import qubesadmin.features import qubesadmin.devices import qubesadmin.firewall +import qubesadmin.tags class QubesVM(qubesadmin.base.PropertyHolder): @@ -34,6 +35,8 @@ class QubesVM(qubesadmin.base.PropertyHolder): log = None + tags = None + features = None devices = None @@ -44,6 +47,7 @@ class QubesVM(qubesadmin.base.PropertyHolder): super(QubesVM, self).__init__(app, 'admin.vm.property.', name) self._volumes = None self.log = logging.getLogger(name) + self.tags = qubesadmin.tags.Tags(self) self.features = qubesadmin.features.Features(self) self.devices = qubesadmin.devices.DeviceManager(self) self.firewall = qubesadmin.firewall.Firewall(self)