From 31988a9bd8386c277871d134f6c693d0b8786868 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marek=20Marczykowski-G=C3=B3recki?= Date: Mon, 19 Jun 2017 01:55:49 +0200 Subject: [PATCH 1/8] tags support QubesOS/qubes-issues#2622 --- qubesadmin/exc.py | 4 ++ qubesadmin/tags.py | 66 +++++++++++++++++++++++ qubesadmin/tests/tags.py | 110 ++++++++++++++++++++++++++++++++++++++ qubesadmin/vm/__init__.py | 4 ++ 4 files changed, 184 insertions(+) create mode 100644 qubesadmin/tags.py create mode 100644 qubesadmin/tests/tags.py diff --git a/qubesadmin/exc.py b/qubesadmin/exc.py index 9e60cc6..54fb224 100644 --- a/qubesadmin/exc.py +++ b/qubesadmin/exc.py @@ -116,6 +116,10 @@ class QubesFeatureNotFoundError(QubesException, KeyError): '''Feature not set for a given domain''' +class QubesTagNotFoundError(QubesException, KeyError): + '''Tag not set for a given domain''' + + class StoragePoolException(QubesException): ''' A general storage exception ''' diff --git a/qubesadmin/tags.py b/qubesadmin/tags.py new file mode 100644 index 0000000..37c09e9 --- /dev/null +++ b/qubesadmin/tags.py @@ -0,0 +1,66 @@ +# -*- encoding: utf8 -*- +# +# The Qubes OS Project, http://www.qubes-os.org +# +# Copyright (C) 2017 Marek Marczykowski-Górecki +# +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, see . + +'''VM tags interface''' + + +class Tags(object): + '''Manager of the tags. + + Tags are simple: tag either can be present on qube or not. Tag is a + simple string consisting of ASCII alphanumeric characters, plus `_` and + `-`. + ''' + # pylint: disable=too-few-public-methods + + def __init__(self, vm): + super(Tags, self).__init__() + self.vm = vm + + def remove(self, elem): + '''Remove a tag''' + self.vm.qubesd_call(self.vm.name, 'admin.vm.tag.Remove', elem) + + def add(self, elem): + '''Add a tag''' + self.vm.qubesd_call(self.vm.name, 'admin.vm.tag.Set', elem) + + def update(self, *others): + '''Add tags from iterable(s)''' + for other in others: + for elem in other: + self.add(elem) + + def discard(self, elem): + '''Remove a tag if present''' + try: + self.remove(elem) + except KeyError: + pass + + def __iter__(self): + qubesd_response = self.vm.qubesd_call(self.vm.name, + 'admin.vm.tag.List') + return iter(qubesd_response.decode('utf-8').splitlines()) + + def __contains__(self, elem): + '''Does the VM have a tag''' + response = self.vm.qubesd_call(self.vm.name, 'admin.vm.tag.Get', elem) + return response == b'1' diff --git a/qubesadmin/tests/tags.py b/qubesadmin/tests/tags.py new file mode 100644 index 0000000..00893f9 --- /dev/null +++ b/qubesadmin/tests/tags.py @@ -0,0 +1,110 @@ +# -*- encoding: utf8 -*- +# +# The Qubes OS Project, http://www.qubes-os.org +# +# Copyright (C) 2017 Marek Marczykowski-Górecki +# +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, see . + +import qubesadmin.tests +import qubesadmin.tags + +class TC_00_Tags(qubesadmin.tests.QubesTestCase): + def setUp(self): + super(TC_00_Tags, self).setUp() + self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \ + b'0\0test-vm class=AppVM state=Running\n' \ + b'test-vm2 class=AppVM state=Running\n' \ + b'test-vm3 class=AppVM state=Running\n' + self.vm = self.app.domains['test-vm'] + self.tags = qubesadmin.tags.Tags(self.vm) + + def test_000_list(self): + self.app.expected_calls[ + ('test-vm', 'admin.vm.tag.List', None, None)] = \ + b'0\0tag1\ntag2\n' + self.assertEqual(sorted(self.tags), + ['tag1', 'tag2']) + self.assertAllCalled() + + def test_010_get(self): + self.app.expected_calls[ + ('test-vm', 'admin.vm.tag.Get', 'tag1', None)] = \ + b'0\x001' + self.assertIn('tag1', self.tags) + self.assertAllCalled() + + def test_011_get_missing(self): + self.app.expected_calls[ + ('test-vm', 'admin.vm.tag.Get', 'tag1', None)] = \ + b'0\x000' + self.assertNotIn('tag1', self.tags) + self.assertAllCalled() + + def test_020_set(self): + self.app.expected_calls[ + ('test-vm', 'admin.vm.tag.Set', 'tag1', None)] = b'0\0' + self.tags.add('tag1') + self.assertAllCalled() + + def test_030_update(self): + self.app.expected_calls[ + ('test-vm', 'admin.vm.tag.Set', 'tag1', None)] = b'0\0' + self.app.expected_calls[ + ('test-vm', 'admin.vm.tag.Set', 'tag2', None)] = b'0\0' + self.tags.update(['tag1', 'tag2']) + self.assertAllCalled() + + def test_031_update_from_other(self): + self.app.expected_calls[ + ('test-vm2', 'admin.vm.tag.List', None, None)] = \ + b'0\0tag3\ntag4\n' + self.app.expected_calls[ + ('test-vm', 'admin.vm.tag.Set', 'tag3', None)] = b'0\0' + self.app.expected_calls[ + ('test-vm', 'admin.vm.tag.Set', 'tag4', None)] = b'0\0' + self.tags.update(self.app.domains['test-vm2'].tags) + self.assertAllCalled() + + def test_040_remove(self): + self.app.expected_calls[ + ('test-vm', 'admin.vm.tag.Remove', 'tag1', None)] = \ + b'0\0' + self.tags.remove('tag1') + self.assertAllCalled() + + def test_040_remove_missing(self): + self.app.expected_calls[ + ('test-vm', 'admin.vm.tag.Remove', 'tag1', None)] = \ + b'2\0QubesTagNotFoundError\0\0Tag not set for domain test-vm: ' \ + b'tag1\0' + with self.assertRaises(KeyError): + self.tags.remove('tag1') + self.assertAllCalled() + + def test_050_discard(self): + self.app.expected_calls[ + ('test-vm', 'admin.vm.tag.Remove', 'tag1', None)] = \ + b'0\0' + self.tags.discard('tag1') + self.assertAllCalled() + + def test_051_discard_missing(self): + self.app.expected_calls[ + ('test-vm', 'admin.vm.tag.Remove', 'tag1', None)] = \ + b'2\0QubesTagNotFoundError\0\0Tag not set for domain test-vm: ' \ + b'tag1\0' + self.tags.discard('tag1') + self.assertAllCalled() diff --git a/qubesadmin/vm/__init__.py b/qubesadmin/vm/__init__.py index 26f9554..605bdae 100644 --- a/qubesadmin/vm/__init__.py +++ b/qubesadmin/vm/__init__.py @@ -27,6 +27,7 @@ import qubesadmin.storage import qubesadmin.features import qubesadmin.devices import qubesadmin.firewall +import qubesadmin.tags class QubesVM(qubesadmin.base.PropertyHolder): @@ -34,6 +35,8 @@ class QubesVM(qubesadmin.base.PropertyHolder): log = None + tags = None + features = None devices = None @@ -44,6 +47,7 @@ class QubesVM(qubesadmin.base.PropertyHolder): super(QubesVM, self).__init__(app, 'admin.vm.property.', name) self._volumes = None self.log = logging.getLogger(name) + self.tags = qubesadmin.tags.Tags(self) self.features = qubesadmin.features.Features(self) self.devices = qubesadmin.devices.DeviceManager(self) self.firewall = qubesadmin.firewall.Firewall(self) From 8e5f90c27316178c93f03a389cce22d3d369d6b5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marek=20Marczykowski-G=C3=B3recki?= Date: Mon, 19 Jun 2017 02:47:07 +0200 Subject: [PATCH 2/8] tools: add qvm-tags tool QubesOS/qubes-issues#2388 --- doc/manpages/qvm-tags.rst | 40 +++++--- qubesadmin/tests/tools/qvm_tags.py | 141 +++++++++++++++++++++++++++++ qubesadmin/tools/qvm_tags.py | 108 ++++++++++++++++++++++ 3 files changed, 278 insertions(+), 11 deletions(-) create mode 100644 qubesadmin/tests/tools/qvm_tags.py create mode 100644 qubesadmin/tools/qvm_tags.py diff --git a/doc/manpages/qvm-tags.rst b/doc/manpages/qvm-tags.rst index 3f23e2e..22dd6ba 100644 --- a/doc/manpages/qvm-tags.rst +++ b/doc/manpages/qvm-tags.rst @@ -15,7 +15,10 @@ Synopsis -------- -:command:`qvm-tags` [-h] [--verbose] [--quiet] [--query | --set | --unset] *VMNAME* [*TAG*] +| :command:`qvm-tags` [-h] [--verbose] [--quiet] *VMNAME* {list,ls,l} [*TAG*] +| :command:`qvm-tags` [-h] [--verbose] [--quiet] *VMNAME* {add,a,set} *TAG* ... +| :command:`qvm-tags` [-h] [--verbose] [--quiet] *VMNAME* {del,d,unset,u} *TAG* ... + Options ------- @@ -32,22 +35,37 @@ Options Decrease verbosity. -.. option:: --query +Commands +-------- - Query for the tag. Exit with zero (true) if the qube in question has the tag - and with non-zero (false) if it does not. If no tag specified, list all the - tags. +list +^^^^ - This is the default mode. +| :command:`qvm-tags` [-h] [--verbose] [--quiet] *VMNAME* list [*TAG*] -.. option:: --set, -s +List tags. If tag name is given, check if this tag is set for the VM and signal +this with exit code (0 - tag is set, 1 - it is not). - Set the tag. The tag argument is mandatory. If tag is already set, do - nothing. +aliases: ls, l -.. option:: --delete, --unset, -D +add +^^^ + +| :command:`qvm-tags` [-h] [--verbose] [--quiet] *VMNAME* add *TAG* [*TAG* ...] + +Add tag(s) to a VM. If tag is already set for given VM, do nothing. + +aliases: a, set + +del +^^^ + +| :command:`qvm-tags` [-h] [--verbose] [--quiet] *VMNAME* del *TAG* [*TAG* ...] + +Delete tag(s) from a VM. If tag is not set for given VM, do nothing. + +aliases: d, unset, u - Unset the tag. The tag argument is mandatory. If tag is not set, do nothing. Authors ------- diff --git a/qubesadmin/tests/tools/qvm_tags.py b/qubesadmin/tests/tools/qvm_tags.py new file mode 100644 index 0000000..79ed1e3 --- /dev/null +++ b/qubesadmin/tests/tools/qvm_tags.py @@ -0,0 +1,141 @@ +# -*- encoding: utf8 -*- +# +# The Qubes OS Project, http://www.qubes-os.org +# +# Copyright (C) 2017 Marek Marczykowski-Górecki +# +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, see . + +import qubesadmin.tests +import qubesadmin.tests.tools +import qubesadmin.tools.qvm_tags + + +class TC_00_qvm_tags(qubesadmin.tests.QubesTestCase): + def test_000_list(self): + self.app.expected_calls[ + ('dom0', 'admin.vm.List', None, None)] = \ + b'0\x00some-vm class=AppVM state=Running\n' + self.app.expected_calls[ + ('some-vm', 'admin.vm.tag.List', None, None)] = \ + b'0\x00tag1\ntag2\n' + with qubesadmin.tests.tools.StdoutBuffer() as stdout: + self.assertEqual( + qubesadmin.tools.qvm_tags.main(['some-vm'], app=self.app), + 0) + self.assertEqual(stdout.getvalue(), + 'tag1\n' + 'tag2\n') + self.assertAllCalled() + + def test_001_list_action(self): + self.app.expected_calls[ + ('dom0', 'admin.vm.List', None, None)] = \ + b'0\x00some-vm class=AppVM state=Running\n' + self.app.expected_calls[ + ('some-vm', 'admin.vm.tag.List', None, None)] = \ + b'0\x00tag1\ntag2\n' + with qubesadmin.tests.tools.StdoutBuffer() as stdout: + self.assertEqual( + qubesadmin.tools.qvm_tags.main(['some-vm', 'list'], + app=self.app), 0) + self.assertEqual(stdout.getvalue(), + 'tag1\n' + 'tag2\n') + self.assertAllCalled() + + def test_002_list_alias(self): + self.app.expected_calls[ + ('dom0', 'admin.vm.List', None, None)] = \ + b'0\x00some-vm class=AppVM state=Running\n' + self.app.expected_calls[ + ('some-vm', 'admin.vm.tag.List', None, None)] = \ + b'0\x00tag1\ntag2\n' + with qubesadmin.tests.tools.StdoutBuffer() as stdout: + self.assertEqual( + qubesadmin.tools.qvm_tags.main(['some-vm', 'ls'], + app=self.app), 0) + self.assertEqual(stdout.getvalue(), + 'tag1\n' + 'tag2\n') + self.assertAllCalled() + + def test_003_list_check(self): + self.app.expected_calls[ + ('dom0', 'admin.vm.List', None, None)] = \ + b'0\x00some-vm class=AppVM state=Running\n' + self.app.expected_calls[ + ('some-vm', 'admin.vm.tag.Get', 'tag1', None)] = \ + b'0\x001' + with qubesadmin.tests.tools.StdoutBuffer() as stdout: + self.assertEqual( + qubesadmin.tools.qvm_tags.main(['some-vm', 'ls', 'tag1'], + app=self.app), 0) + self.assertEqual(stdout.getvalue(), 'tag1\n') + self.assertAllCalled() + + def test_004_list_check_missing(self): + self.app.expected_calls[ + ('dom0', 'admin.vm.List', None, None)] = \ + b'0\x00some-vm class=AppVM state=Running\n' + self.app.expected_calls[ + ('some-vm', 'admin.vm.tag.Get', 'tag1', None)] = \ + b'0\x000' + with qubesadmin.tests.tools.StdoutBuffer() as stdout: + self.assertEqual( + qubesadmin.tools.qvm_tags.main(['some-vm', 'ls', 'tag1'], + app=self.app), 1) + self.assertEqual(stdout.getvalue(), '') + self.assertAllCalled() + + def test_005_list_empty(self): + self.app.expected_calls[ + ('dom0', 'admin.vm.List', None, None)] = \ + b'0\x00some-vm class=AppVM state=Running\n' + self.app.expected_calls[ + ('some-vm', 'admin.vm.tag.List', None, None)] = \ + b'0\x00' + with qubesadmin.tests.tools.StdoutBuffer() as stdout: + self.assertEqual( + qubesadmin.tools.qvm_tags.main(['some-vm', 'list'], + app=self.app), 0) + self.assertEqual(stdout.getvalue(), '') + self.assertAllCalled() + + def test_010_add(self): + self.app.expected_calls[ + ('dom0', 'admin.vm.List', None, None)] = \ + b'0\x00some-vm class=AppVM state=Running\n' + self.app.expected_calls[ + ('some-vm', 'admin.vm.tag.Set', 'tag3', None)] = b'0\x00' + self.assertEqual( + qubesadmin.tools.qvm_tags.main(['some-vm', 'add', 'tag3'], + app=self.app), + 0) + self.assertAllCalled() + + def test_020_del(self): + self.app.expected_calls[ + ('dom0', 'admin.vm.List', None, None)] = \ + b'0\x00some-vm class=AppVM state=Running\n' + self.app.expected_calls[ + ('some-vm', 'admin.vm.tag.Remove', 'tag3', None)] = b'0\x00' + with qubesadmin.tests.tools.StdoutBuffer() as stdout: + self.assertEqual( + qubesadmin.tools.qvm_tags.main(['some-vm', 'del', 'tag3'], + app=self.app), + 0) + self.assertEqual(stdout.getvalue(), '') + self.assertAllCalled() diff --git a/qubesadmin/tools/qvm_tags.py b/qubesadmin/tools/qvm_tags.py new file mode 100644 index 0000000..4e9578f --- /dev/null +++ b/qubesadmin/tools/qvm_tags.py @@ -0,0 +1,108 @@ +# +# The Qubes OS Project, https://www.qubes-os.org/ +# +# Copyright (C) 2010-2016 Joanna Rutkowska +# Copyright (C) 2016 Wojtek Porczyk +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +# + +'''qvm-features - Manage domain's tags''' + +from __future__ import print_function + +import sys + +import qubesadmin +import qubesadmin.tools + +def mode_query(args): + '''Query/list tags''' + if not hasattr(args, 'tag') or args.tag is None: + # list + tags = list(sorted(args.domains[0].tags)) + if tags: + print('\n'.join(tags)) + else: + # real query + if args.tag not in args.domains[0].tags: + return 1 + print(args.tag) + return 0 + + +def mode_add(args): + '''Add tag''' + for tag in args.tag: + args.domains[0].tags.add(tag) + return 0 + + +def mode_del(args): + '''Delete tag''' + for tag in args.tag: + args.domains[0].tags.discard(tag) + return 0 + + +def get_parser(): + ''' Return qvm-tags tool command line parser ''' + parser = qubesadmin.tools.QubesArgumentParser( + vmname_nargs=1, + description='manage domain\'s tags') + parser.register('action', 'parsers', + qubesadmin.tools.AliasedSubParsersAction) + + sub_parsers = parser.add_subparsers( + title='commands', + description="For more information see qvm-tags command -h", + dest='command') + + list_parser = sub_parsers.add_parser('list', aliases=('ls', 'l'), + help='list tags') + list_parser.add_argument('tag', nargs='?', + action='store', default=None) + list_parser.set_defaults(func=mode_query) + + add_parser = sub_parsers.add_parser('add', aliases=('a', 'set'), + help='add tag') + add_parser.add_argument('tag', nargs='+', + action='store') + add_parser.set_defaults(func=mode_add) + + del_parser = sub_parsers.add_parser('del', aliases=('d', 'unset', 'u'), + help='add tag') + del_parser.add_argument('tag', nargs=1, + action='store') + del_parser.set_defaults(func=mode_del) + + parser.set_defaults(func=mode_query) + return parser + + +def main(args=None, app=None): + '''Main routine of :program:`qvm-tags`. + + :param list args: Optional arguments to override those delivered from \ + command line. + ''' + + parser = get_parser() + args = parser.parse_args(args, app=app) + return args.func(args) + + +if __name__ == '__main__': + sys.exit(main()) From 90141a1befc80e02bc8df301ac2145976dffa1fb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marek=20Marczykowski-G=C3=B3recki?= Date: Mon, 19 Jun 2017 02:48:31 +0200 Subject: [PATCH 3/8] doc: fix skel-manpage tool --- doc/skel-manpage.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/doc/skel-manpage.py b/doc/skel-manpage.py index 8bb75ad..14ddfa7 100755 --- a/doc/skel-manpage.py +++ b/doc/skel-manpage.py @@ -5,7 +5,7 @@ import sys sys.path.insert(0, os.path.abspath('../')) import argparse -import qubesadmin.dochelpers +import qubesadmin.tools.dochelpers parser = argparse.ArgumentParser(description='prepare new manpage for command') parser.add_argument('command', metavar='COMMAND', @@ -14,7 +14,7 @@ parser.add_argument('command', metavar='COMMAND', def main(): args = parser.parse_args() - sys.stdout.write(qubesadmin.dochelpers.prepare_manpage(args.command)) + sys.stdout.write(qubesadmin.tools.dochelpers.prepare_manpage(args.command)) if __name__ == '__main__': main() From 998a42703f379da5df9fb6588330a4c2e61e6ca3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marek=20Marczykowski-G=C3=B3recki?= Date: Mon, 19 Jun 2017 03:02:43 +0200 Subject: [PATCH 4/8] storage: add volume clone method Clone volume without retrieving all the data. QubesOS/qubes-issues#2622 --- qubesadmin/storage.py | 25 +++++++++++++++++++++++ qubesadmin/tests/storage.py | 40 ++++++++++++++++++++++++++++++++++++- 2 files changed, 64 insertions(+), 1 deletion(-) diff --git a/qubesadmin/storage.py b/qubesadmin/storage.py index 21614a9..fb32fd2 100644 --- a/qubesadmin/storage.py +++ b/qubesadmin/storage.py @@ -93,6 +93,11 @@ class Volume(object): return self.pool == other.pool and self.vid == other.vid return NotImplemented + @property + def name(self): + '''per-VM volume name, if available''' + return self._vm_name + @property def pool(self): '''Storage volume pool name.''' @@ -195,6 +200,26 @@ class Volume(object): ''' self._qubesd_call('Import', payload_stream=stream) + def clone(self, source): + ''' Clone data from sane volume of another VM. + + This function override existing volume content. + This operation is implemented for VM volumes - those in vm.volumes + collection (not pool.volumes). + + :param source: source volume object + ''' + + # pylint: disable=protected-access + if source._vm_name is None or self._vm_name is None: + raise NotImplementedError( + 'Operation implemented only for VM volumes') + if source._vm_name != self._vm_name: + raise ValueError('Source and target volume must have the same type') + + # this call is to *source* volume, because we extract data from there + source._qubesd_call('Clone', payload=str(self._vm).encode()) + class Pool(object): ''' A Pool is used to manage different kind of volumes (File diff --git a/qubesadmin/tests/storage.py b/qubesadmin/tests/storage.py index b0cee76..76d3134 100644 --- a/qubesadmin/tests/storage.py +++ b/qubesadmin/tests/storage.py @@ -161,6 +161,30 @@ class TestVMVolume(qubesadmin.tests.QubesTestCase): input_proc.stdout.close() self.assertAllCalled() + def test_050_clone(self): + self.app.expected_calls[ + ('source-vm', 'admin.vm.volume.Clone', 'volname', b'test-vm')] = \ + b'0\x00' + self.app.expected_calls[ + ('dom0', 'admin.vm.List', None, None)] = \ + b'0\x00source-vm class=AppVM state=Halted\n' + self.app.expected_calls[ + ('source-vm', 'admin.vm.volume.List', None, None)] = \ + b'0\x00volname\nother\n' + self.vol.clone(self.app.domains['source-vm'].volumes['volname']) + self.assertAllCalled() + + def test_050_clone_wrong_volume(self): + self.app.expected_calls[ + ('dom0', 'admin.vm.List', None, None)] = \ + b'0\x00source-vm class=AppVM state=Halted\n' + self.app.expected_calls[ + ('source-vm', 'admin.vm.volume.List', None, None)] = \ + b'0\x00volname\nother\n' + with self.assertRaises(ValueError): + self.vol.clone(self.app.domains['source-vm'].volumes['other']) + self.assertAllCalled() + class TestPoolVolume(TestVMVolume): def setUp(self): @@ -245,7 +269,21 @@ class TestPoolVolume(TestVMVolume): self.assertAllCalled() def test_040_import_data(self): - self.skipTest('admin.pool.vm.Import not supported') + self.skipTest('admin.pool.volume.Import not supported') + + def test_050_clone(self): + self.app.expected_calls[ + ('dom0', 'admin.vm.List', None, None)] = \ + b'0\x00source-vm class=AppVM state=Halted\n' + self.app.expected_calls[ + ('source-vm', 'admin.vm.volume.List', None, None)] = \ + b'0\x00volname\nother\n' + with self.assertRaises(NotImplementedError): + self.vol.clone(self.app.domains['source-vm'].volumes['volname']) + self.assertAllCalled() + + def test_050_clone_wrong_volume(self): + self.skipTest('admin.pool.volume.Clone not supported') class TestPool(qubesadmin.tests.QubesTestCase): From 12fd4eb035c6d8730d3cf4a5ba70fff765381eea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marek=20Marczykowski-G=C3=B3recki?= Date: Mon, 19 Jun 2017 23:57:36 +0200 Subject: [PATCH 5/8] doc: minor fixes to man pages --- doc/manpages/qvm-device.rst | 4 ++-- doc/manpages/qvm-volume.rst | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/doc/manpages/qvm-device.rst b/doc/manpages/qvm-device.rst index 8104fc4..88869f8 100644 --- a/doc/manpages/qvm-device.rst +++ b/doc/manpages/qvm-device.rst @@ -49,7 +49,7 @@ aliases: ls, l attach ^^^^^^ -| :command:`qvm-volume attach` [-h] [--verbose] [--quiet] [--ro] *VMNAME* *BACKEND_DOMAIN:DEVICE_ID* +| :command:`qvm-device` *DEVICE_CLASS* attach [-h] [--verbose] [--quiet] [--ro] *VMNAME* *BACKEND_DOMAIN:DEVICE_ID* Attach the device with *DEVICE_ID* from *BACKEND_DOMAIN* to the domain *VMNAME* @@ -67,7 +67,7 @@ aliases: a, at detach ^^^^^^ -| :command:`qvm-volume detach` [-h] [--verbose] [--quiet] *VMNAME* *BACKEND_DOMAIN:DEVICE_ID* +| :command:`qvm-device` *DEVICE_CLASS* detach [-h] [--verbose] [--quiet] *VMNAME* *BACKEND_DOMAIN:DEVICE_ID* Detach the device with *BACKEND_DOMAIN:DEVICE_ID* from domain *VMNAME* diff --git a/doc/manpages/qvm-volume.rst b/doc/manpages/qvm-volume.rst index 04eda1b..7ec474a 100644 --- a/doc/manpages/qvm-volume.rst +++ b/doc/manpages/qvm-volume.rst @@ -1,7 +1,7 @@ .. program:: qvm-volume :program:`qvm-volume` -- Qubes volume and block device managment -=============================================================== +================================================================ Synopsis -------- From e94bdca2065ad7957574dfbb6f54972986ecb475 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marek=20Marczykowski-G=C3=B3recki?= Date: Mon, 19 Jun 2017 23:59:49 +0200 Subject: [PATCH 6/8] base: add PropertyHolder.clone_properties Be compatible with core-admin --- qubesadmin/base.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/qubesadmin/base.py b/qubesadmin/base.py index 2a009b6..cb9d64a 100644 --- a/qubesadmin/base.py +++ b/qubesadmin/base.py @@ -154,6 +154,23 @@ class PropertyHolder(object): assert isinstance(is_default, bool) return is_default + def clone_properties(self, src, proplist=None): + '''Clone properties from other object. + + :param PropertyHolder src: source object + :param list proplist: list of properties \ + (:py:obj:`None` or omit for all properties) + ''' + + if proplist is None: + proplist = self.property_list() + + for prop in proplist: + try: + setattr(self, prop, getattr(src, prop)) + except AttributeError: + continue + def __getattr__(self, item): # pylint: disable=too-many-return-statements if item.startswith('_'): From a5a459840a91689c5b388105efb66be94c106f95 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marek=20Marczykowski-G=C3=B3recki?= Date: Tue, 20 Jun 2017 00:00:35 +0200 Subject: [PATCH 7/8] storage: make Volumes sortable But do not fetch any additional info just for this purpose. --- qubesadmin/storage.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/qubesadmin/storage.py b/qubesadmin/storage.py index fb32fd2..5a72719 100644 --- a/qubesadmin/storage.py +++ b/qubesadmin/storage.py @@ -93,6 +93,16 @@ class Volume(object): return self.pool == other.pool and self.vid == other.vid return NotImplemented + def __lt__(self, other): + # pylint: disable=protected-access + if isinstance(other, Volume): + if self._vm and other._vm: + return (self._vm, self._vm_name) < (other._vm, other._vm_name) + elif self._vid and other._vid: + return (self._pool, self._vid) < (other._pool, other._vid) + return NotImplemented + + @property def name(self): '''per-VM volume name, if available''' From bcd026d141efa8cdc1a4dc4fb1c0395273faa3f0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marek=20Marczykowski-G=C3=B3recki?= Date: Tue, 20 Jun 2017 00:02:20 +0200 Subject: [PATCH 8/8] Implement VM clone as create + copy data+metadata This way we don't need separate admin.vm.Clone call, which is tricky to handler properly with policy. A VM may not have access to all the properties and other metadata, so add ignore_errors argument, for best-effort approach (copy what is possible). In any case, failure of cloning VM data fails the whole operation. When operation fails, VM is removed. While at it, allow to specify alternative VM class - this allows morphing one VM into another (for example AppVM -> StandaloneVM). Adjust qvm-clone tool and tests accordingly. QubesOS/qubes-issues#2622 --- doc/manpages/qvm-clone.rst | 8 +- qubesadmin/app.py | 104 +++++++-- qubesadmin/tests/app.py | 345 +++++++++++++++++++++++++++- qubesadmin/tests/tools/qvm_clone.py | 32 ++- qubesadmin/tools/qvm_clone.py | 6 +- 5 files changed, 460 insertions(+), 35 deletions(-) diff --git a/doc/manpages/qvm-clone.rst b/doc/manpages/qvm-clone.rst index 1fed0b9..f77e4ce 100644 --- a/doc/manpages/qvm-clone.rst +++ b/doc/manpages/qvm-clone.rst @@ -14,7 +14,7 @@ Synopsis -------- -:command:`qvm-clone` [-h] [--verbose] [--quiet] [-p *POOL:VOLUME* | -P POOL] *VMNAME* *NEWVM* +:command:`qvm-clone` [*options*] *VMNAME* *NEWVM* Options ------- @@ -23,6 +23,12 @@ Options Show this help message and exit +.. option:: --class=CLASS, -C CLASS + + Create VM of different class than source VM. The tool will try to copy as + much as possible data/metadata from source VM, but some information may be + impossible to preserve (for example target VM have no matching properties). + .. option:: -P POOL Pool to use for the new domain. All volumes besides snapshots volumes are diff --git a/qubesadmin/app.py b/qubesadmin/app.py index 860df56..9eec86b 100644 --- a/qubesadmin/app.py +++ b/qubesadmin/app.py @@ -270,7 +270,8 @@ class QubesBase(qubesadmin.base.PropertyHolder): self.domains.clear_cache() return self.domains[name] - def clone_vm(self, src_vm, new_name, pool=None, pools=None): + def clone_vm(self, src_vm, new_name, new_cls=None, + pool=None, pools=None, ignore_errors=False): '''Clone Virtual Machine Example usage with custom storage pools: @@ -281,35 +282,110 @@ class QubesBase(qubesadmin.base.PropertyHolder): >>> vm = app.clone_vm(src_vm, 'my-new-vm', pools=pools) >>> vm.label = app.labels['green'] - :param str cls: name of VM class (`AppVM`, `TemplateVM` etc) - :param str name: name of VM - :param str label: label color for new VM - :param str template: template to use (if apply for given VM class), - can be also VM object; use None for default value + :param QubesVM or str src_vm: source VM + :param str new_name: name of new VM + :param str new_cls: name of VM class (`AppVM`, `TemplateVM` etc) - use + None to copy it from *src_vm* :param str pool: storage pool to use instead of default one :param dict pools: storage pool for specific volumes + :param bool ignore_errors: should errors on meta-data setting be only + logged, or abort the whole operation? :return new VM object ''' if pool and pools: raise ValueError('only one of pool= and pools= can be used') - if not isinstance(src_vm, str): - src_vm = str(src_vm) + if isinstance(src_vm, str): + src_vm = self.domains[src_vm] - method = 'admin.vm.Clone' - payload = 'name={}'.format(new_name) + if new_cls is None: + new_cls = src_vm.__class__.__name__ + + template = getattr(src_vm, 'template', None) + if template is not None: + template = str(template) + + label = src_vm.label + + method_prefix = 'admin.vm.Create.' + payload = 'name={} label={}'.format(new_name, label) if pool: payload += ' pool={}'.format(str(pool)) - method = 'admin.vm.CloneInPool' + method_prefix = 'admin.vm.CreateInPool.' if pools: payload += ''.join(' pool:{}={}'.format(vol, str(pool)) for vol, pool in sorted(pools.items())) - method = 'admin.vm.CloneInPool' + method_prefix = 'admin.vm.CreateInPool.' - self.qubesd_call(src_vm, method, None, payload.encode('utf-8')) + self.qubesd_call('dom0', method_prefix + new_cls, template, + payload.encode('utf-8')) - return self.domains[new_name] + self.domains.clear_cache() + dst_vm = self.domains[new_name] + try: + assert isinstance(dst_vm, qubesadmin.vm.QubesVM) + for prop in src_vm.property_list(): + # handled by admin.vm.Create call + if prop in ('name', 'qid', 'template', 'label'): + continue + if src_vm.property_is_default(prop): + continue + try: + setattr(dst_vm, prop, getattr(src_vm, prop)) + except AttributeError: + pass + except qubesadmin.exc.QubesException as e: + dst_vm.log.error( + 'Failed to set {!s} property: {!s}'.format(prop, e)) + if not ignore_errors: + raise + + for tag in src_vm.tags: + try: + dst_vm.tags.add(tag) + except qubesadmin.exc.QubesException as e: + dst_vm.log.error( + 'Failed to add {!s} tag: {!s}'.format(tag, e)) + if not ignore_errors: + raise + + for feature, value in src_vm.features.items(): + try: + dst_vm.features[feature] = value + except qubesadmin.exc.QubesException as e: + dst_vm.log.error( + 'Failed to set {!s} feature: {!s}'.format(feature, e)) + if not ignore_errors: + raise + + try: + dst_vm.firewall.policy = src_vm.firewall.policy + dst_vm.firewall.save_rules(src_vm.firewall.rules) + except qubesadmin.exc.QubesException as e: + self.log.error('Failed to set firewall: %s', e) + if not ignore_errors: + raise + + except qubesadmin.exc.QubesException: + if not ignore_errors: + del self.domains[dst_vm.name] + raise + + try: + for dst_volume in sorted(dst_vm.volumes.values()): + if not dst_volume.save_on_stop: + # clone only persistent volumes + continue + src_volume = src_vm.volumes[dst_volume.name] + dst_vm.log.info('Cloning {} volume'.format(dst_volume.name)) + dst_volume.clone(src_volume) + + except qubesadmin.exc.QubesException: + del self.domains[dst_vm.name] + raise + + return dst_vm def run_service(self, dest, service, filter_esc=False, user=None, localcmd=None, wait=True, **kwargs): diff --git a/qubesadmin/tests/app.py b/qubesadmin/tests/app.py index 56ff6f4..f82b39f 100644 --- a/qubesadmin/tests/app.py +++ b/qubesadmin/tests/app.py @@ -32,6 +32,7 @@ except ImportError: import tempfile +import qubesadmin.exc import qubesadmin.tests @@ -154,48 +155,368 @@ class TC_10_QubesBase(qubesadmin.tests.QubesTestCase): self.assertEqual(label.name, 'red') self.assertAllCalled() + def clone_setup_common_calls(self, src, dst): + # labels + self.app.expected_calls[('dom0', 'admin.label.List', None, None)] = \ + b'0\x00red\ngreen\nblue\n' + + # have each property type with default=no, each special-cased, + # and some with default=yes + properties = { + 'label': 'default=False type=label red', + 'template': 'default=False type=vm test-template', + 'memory': 'default=False type=int 400', + 'kernel': 'default=False type=str 4.9.31', + 'netvm': 'default=False type=vm test-net', + 'hvm': 'default=False type=bool True', + 'default_user': 'default=True type=str user', + } + self.app.expected_calls[ + (src, 'admin.vm.property.List', None, None)] = \ + b'0\0qid\nname\n' + \ + b'\n'.join(prop.encode() for prop in properties.keys()) + \ + b'\n' + for prop, value in properties.items(): + self.app.expected_calls[ + (src, 'admin.vm.property.Get', prop, None)] = \ + b'0\0' + value.encode() + # special cases handled by admin.vm.Create call + if prop in ('label', 'template'): + continue + # default properties should not be set + if 'default=True' in value: + continue + self.app.expected_calls[ + (dst, 'admin.vm.property.Set', prop, + value.split()[-1].encode())] = b'0\0' + + # tags + self.app.expected_calls[ + (src, 'admin.vm.tag.List', None, None)] = \ + b'0\0tag1\ntag2\n' + self.app.expected_calls[ + (dst, 'admin.vm.tag.Set', 'tag1', None)] = b'0\0' + self.app.expected_calls[ + (dst, 'admin.vm.tag.Set', 'tag2', None)] = b'0\0' + + # features + self.app.expected_calls[ + (src, 'admin.vm.feature.List', None, None)] = \ + b'0\0feat1\nfeat2\n' + self.app.expected_calls[ + (src, 'admin.vm.feature.Get', 'feat1', None)] = \ + b'0\0feat1-value with spaces' + self.app.expected_calls[ + (src, 'admin.vm.feature.Get', 'feat2', None)] = \ + b'0\x001' + self.app.expected_calls[ + (dst, 'admin.vm.feature.Set', 'feat1', + b'feat1-value with spaces')] = b'0\0' + self.app.expected_calls[ + (dst, 'admin.vm.feature.Set', 'feat2', b'1')] = b'0\0' + + # firewall + rules = ( + b'action=drop dst4=192.168.0.0/24\n' + b'action=accept\n' + ) + self.app.expected_calls[ + (src, 'admin.vm.firewall.GetPolicy', None, None)] = \ + b'0\x00accept' + self.app.expected_calls[ + (src, 'admin.vm.firewall.Get', None, None)] = \ + b'0\x00' + rules + self.app.expected_calls[ + (dst, 'admin.vm.firewall.SetPolicy', None, b'accept')] = \ + b'0\x00' + self.app.expected_calls[ + (dst, 'admin.vm.firewall.Set', None, rules)] = \ + b'0\x00' + + # storage + self.app.expected_calls[ + (dst, 'admin.vm.volume.List', None, None)] = \ + b'0\x00root\nprivate\nvolatile\nkernel\n' + self.app.expected_calls[ + (dst, 'admin.vm.volume.Info', 'root', None)] = \ + b'0\x00pool=lvm\n' \ + b'vid=vm-test-vm/root\n' \ + b'size=10737418240\n' \ + b'usage=2147483648\n' \ + b'rw=False\n' \ + b'internal=True\n' \ + b'source=vm-test-template/root\n' \ + b'save_on_stop=False\n' \ + b'snap_on_start=True\n' + self.app.expected_calls[ + (dst, 'admin.vm.volume.Info', 'private', None)] = \ + b'0\x00pool=lvm\n' \ + b'vid=vm-test-vm/private\n' \ + b'size=2147483648\n' \ + b'usage=214748364\n' \ + b'rw=True\n' \ + b'internal=True\n' \ + b'save_on_stop=True\n' \ + b'snap_on_start=False\n' + self.app.expected_calls[ + (dst, 'admin.vm.volume.Info', 'volatile', None)] = \ + b'0\x00pool=lvm\n' \ + b'vid=vm-test-vm/volatile\n' \ + b'size=10737418240\n' \ + b'usage=0\n' \ + b'rw=True\n' \ + b'internal=True\n' \ + b'source=None\n' \ + b'save_on_stop=False\n' \ + b'snap_on_start=False\n' + self.app.expected_calls[ + (dst, 'admin.vm.volume.Info', 'kernel', None)] = \ + b'0\x00pool=linux-kernel\n' \ + b'vid=\n' \ + b'size=0\n' \ + b'usage=0\n' \ + b'rw=False\n' \ + b'internal=True\n' \ + b'source=None\n' \ + b'save_on_stop=False\n' \ + b'snap_on_start=False\n' + self.app.expected_calls[ + (src, 'admin.vm.volume.List', None, None)] = \ + b'0\x00root\nprivate\nvolatile\nkernel\n' + self.app.expected_calls[ + (src, 'admin.vm.volume.Clone', 'private', dst.encode())] = \ + b'0\x00' + def test_030_clone(self): - self.app.expected_calls[('test-vm', 'admin.vm.Clone', None, - b'name=new-name')] = b'0\x00' + self.clone_setup_common_calls('test-vm', 'new-name') self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \ b'0\x00new-name class=AppVM state=Halted\n' \ - b'test-vm class=AppVM state=Halted\n' + b'test-vm class=AppVM state=Halted\n' \ + b'test-template class=TemplateVM state=Halted\n' \ + b'test-net class=AppVM state=Halted\n' + self.app.expected_calls[('dom0', 'admin.vm.Create.AppVM', + 'test-template', b'name=new-name label=red')] = b'0\x00' new_vm = self.app.clone_vm('test-vm', 'new-name') self.assertEqual(new_vm.name, 'new-name') self.assertAllCalled() def test_031_clone_object(self): - self.app.expected_calls[('test-vm', 'admin.vm.Clone', None, - b'name=new-name')] = b'0\x00' + self.clone_setup_common_calls('test-vm', 'new-name') self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \ b'0\x00new-name class=AppVM state=Halted\n' \ - b'test-vm class=AppVM state=Halted\n' + b'test-vm class=AppVM state=Halted\n' \ + b'test-template class=TemplateVM state=Halted\n' \ + b'test-net class=AppVM state=Halted\n' + self.app.expected_calls[('dom0', 'admin.vm.Create.AppVM', + 'test-template', b'name=new-name label=red')] = b'0\x00' new_vm = self.app.clone_vm(self.app.domains['test-vm'], 'new-name') self.assertEqual(new_vm.name, 'new-name') self.assertAllCalled() def test_032_clone_pool(self): - self.app.expected_calls[('test-vm', 'admin.vm.CloneInPool', None, - b'name=new-name pool=some-pool')] = b'0\x00' + self.clone_setup_common_calls('test-vm', 'new-name') + self.app.expected_calls[('dom0', 'admin.vm.CreateInPool.AppVM', + 'test-template', + b'name=new-name label=red pool=some-pool')] = b'0\x00' self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \ b'0\x00new-name class=AppVM state=Halted\n' \ - b'test-vm class=AppVM state=Halted\n' + b'test-vm class=AppVM state=Halted\n' \ + b'test-template class=TemplateVM state=Halted\n' \ + b'test-net class=AppVM state=Halted\n' new_vm = self.app.clone_vm('test-vm', 'new-name', pool='some-pool') self.assertEqual(new_vm.name, 'new-name') self.assertAllCalled() def test_033_clone_pools(self): - self.app.expected_calls[('test-vm', 'admin.vm.CloneInPool', None, - b'name=new-name pool:private=some-pool ' + self.clone_setup_common_calls('test-vm', 'new-name') + self.app.expected_calls[('dom0', 'admin.vm.CreateInPool.AppVM', + 'test-template', + b'name=new-name label=red pool:private=some-pool ' b'pool:volatile=other-pool')] = b'0\x00' self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \ b'0\x00new-name class=AppVM state=Halted\n' \ - b'test-vm class=AppVM state=Halted\n' + b'test-vm class=AppVM state=Halted\n' \ + b'test-template class=TemplateVM state=Halted\n' \ + b'test-net class=AppVM state=Halted\n' new_vm = self.app.clone_vm('test-vm', 'new-name', pools={'private': 'some-pool', 'volatile': 'other-pool'}) self.assertEqual(new_vm.name, 'new-name') self.assertAllCalled() + def test_034_clone_class_change(self): + self.clone_setup_common_calls('test-vm', 'new-name') + self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \ + b'0\x00new-name class=StandaloneVM state=Halted\n' \ + b'test-vm class=AppVM state=Halted\n' \ + b'test-template class=TemplateVM state=Halted\n' \ + b'test-net class=AppVM state=Halted\n' + self.app.expected_calls[('dom0', 'admin.vm.Create.StandaloneVM', + 'test-template', b'name=new-name label=red')] = b'0\x00' + self.app.expected_calls[ + ('new-name', 'admin.vm.volume.Info', 'root', None)] = \ + b'0\x00pool=lvm\n' \ + b'vid=vm-new-name/root\n' \ + b'size=10737418240\n' \ + b'usage=2147483648\n' \ + b'rw=True\n' \ + b'internal=True\n' \ + b'source=None\n' \ + b'save_on_stop=True\n' \ + b'snap_on_start=False\n' + self.app.expected_calls[ + ('test-vm', 'admin.vm.volume.Clone', 'root', b'new-name')] = \ + b'0\x00' + new_vm = self.app.clone_vm('test-vm', 'new-name', + new_cls='StandaloneVM') + self.assertEqual(new_vm.name, 'new-name') + self.assertEqual(new_vm.__class__.__name__, 'StandaloneVM') + self.assertAllCalled() + + def test_035_clone_fail(self): + self.app.expected_calls[('dom0', 'admin.label.List', None, None)] = \ + b'0\x00red\ngreen\nblue\n' + self.app.expected_calls[ + ('test-vm', 'admin.vm.property.List', None, None)] = \ + b'0\0qid\nname\ntemplate\nlabel\nmemory\n' + self.app.expected_calls[ + ('test-vm', 'admin.vm.property.Get', 'label', None)] = \ + b'0\0default=False type=label red' + self.app.expected_calls[ + ('test-vm', 'admin.vm.property.Get', 'template', None)] = \ + b'0\0default=False type=vm test-template' + self.app.expected_calls[ + ('test-vm', 'admin.vm.property.Get', 'memory', None)] = \ + b'0\0default=False type=int 400' + self.app.expected_calls[ + ('new-name', 'admin.vm.property.Set', 'memory', b'400')] = \ + b'2\0QubesException\0\0something happened\0' + self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \ + b'0\x00new-name class=AppVM state=Halted\n' \ + b'test-vm class=AppVM state=Halted\n' \ + b'test-template class=TemplateVM state=Halted\n' \ + b'test-net class=AppVM state=Halted\n' + self.app.expected_calls[('dom0', 'admin.vm.Create.AppVM', + 'test-template', b'name=new-name label=red')] = b'0\x00' + self.app.expected_calls[('new-name', 'admin.vm.Remove', None, None)] = \ + b'0\x00' + with self.assertRaises(qubesadmin.exc.QubesException): + self.app.clone_vm('test-vm', 'new-name') + self.assertAllCalled() + + def test_036_clone_ignore_errors_prop(self): + self.clone_setup_common_calls('test-vm', 'new-name') + self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \ + b'0\x00new-name class=AppVM state=Halted\n' \ + b'test-vm class=AppVM state=Halted\n' \ + b'test-template class=TemplateVM state=Halted\n' \ + b'test-net class=AppVM state=Halted\n' + self.app.expected_calls[('dom0', 'admin.vm.Create.AppVM', + 'test-template', b'name=new-name label=red')] = b'0\x00' + self.app.expected_calls[ + ('new-name', 'admin.vm.property.Set', 'memory', b'400')] = \ + b'2\0QubesException\0\0something happened\0' + new_vm = self.app.clone_vm('test-vm', 'new-name', ignore_errors=True) + self.assertEqual(new_vm.name, 'new-name') + self.assertAllCalled() + + def test_037_clone_ignore_errors_feature(self): + self.clone_setup_common_calls('test-vm', 'new-name') + self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \ + b'0\x00new-name class=AppVM state=Halted\n' \ + b'test-vm class=AppVM state=Halted\n' \ + b'test-template class=TemplateVM state=Halted\n' \ + b'test-net class=AppVM state=Halted\n' + self.app.expected_calls[('dom0', 'admin.vm.Create.AppVM', + 'test-template', b'name=new-name label=red')] = b'0\x00' + self.app.expected_calls[ + ('new-name', 'admin.vm.feature.Set', 'feat2', b'1')] = \ + b'2\0QubesException\0\0something happened\0' + new_vm = self.app.clone_vm('test-vm', 'new-name', ignore_errors=True) + self.assertEqual(new_vm.name, 'new-name') + self.assertAllCalled() + + def test_038_clone_ignore_errors_tag(self): + self.clone_setup_common_calls('test-vm', 'new-name') + self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \ + b'0\x00new-name class=AppVM state=Halted\n' \ + b'test-vm class=AppVM state=Halted\n' \ + b'test-template class=TemplateVM state=Halted\n' \ + b'test-net class=AppVM state=Halted\n' + self.app.expected_calls[('dom0', 'admin.vm.Create.AppVM', + 'test-template', b'name=new-name label=red')] = b'0\x00' + self.app.expected_calls[ + ('new-name', 'admin.vm.tag.Set', 'tag1', None)] = \ + b'2\0QubesException\0\0something happened\0' + new_vm = self.app.clone_vm('test-vm', 'new-name', ignore_errors=True) + self.assertEqual(new_vm.name, 'new-name') + self.assertAllCalled() + + def test_039_clone_ignore_errors_firewall(self): + self.clone_setup_common_calls('test-vm', 'new-name') + self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \ + b'0\x00new-name class=AppVM state=Halted\n' \ + b'test-vm class=AppVM state=Halted\n' \ + b'test-template class=TemplateVM state=Halted\n' \ + b'test-net class=AppVM state=Halted\n' + self.app.expected_calls[('dom0', 'admin.vm.Create.AppVM', + 'test-template', b'name=new-name label=red')] = b'0\x00' + self.app.expected_calls[ + ('new-name', 'admin.vm.firewall.SetPolicy', None, b'accept')] = \ + b'2\0QubesException\0\0something happened\0' + del self.app.expected_calls[ + ('test-vm', 'admin.vm.firewall.Get', None, None)] + del self.app.expected_calls[ + ('new-name', 'admin.vm.firewall.Set', None, + b'action=drop dst4=192.168.0.0/24\naction=accept\n')] + new_vm = self.app.clone_vm('test-vm', 'new-name', ignore_errors=True) + self.assertEqual(new_vm.name, 'new-name') + self.assertAllCalled() + + def test_040_clone_ignore_errors_storage(self): + self.clone_setup_common_calls('test-vm', 'new-name') + self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \ + b'0\x00new-name class=AppVM state=Halted\n' \ + b'test-vm class=AppVM state=Halted\n' \ + b'test-template class=TemplateVM state=Halted\n' \ + b'test-net class=AppVM state=Halted\n' + self.app.expected_calls[('dom0', 'admin.vm.Create.AppVM', + 'test-template', b'name=new-name label=red')] = b'0\x00' + self.app.expected_calls[ + ('test-vm', 'admin.vm.volume.Clone', 'private', b'new-name')] = \ + b'2\0QubesException\0\0something happened\0' + self.app.expected_calls[('new-name', 'admin.vm.Remove', None, None)] = \ + b'0\x00' + del self.app.expected_calls[ + ('new-name', 'admin.vm.volume.Info', 'root', None)] + del self.app.expected_calls[ + ('new-name', 'admin.vm.volume.Info', 'volatile', None)] + with self.assertRaises(qubesadmin.exc.QubesException): + self.app.clone_vm('test-vm', 'new-name', ignore_errors=True) + self.assertAllCalled() + + def test_041_clone_fail_storage(self): + self.clone_setup_common_calls('test-vm', 'new-name') + self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \ + b'0\x00new-name class=AppVM state=Halted\n' \ + b'test-vm class=AppVM state=Halted\n' \ + b'test-template class=TemplateVM state=Halted\n' \ + b'test-net class=AppVM state=Halted\n' + self.app.expected_calls[('dom0', 'admin.vm.Create.AppVM', + 'test-template', b'name=new-name label=red')] = b'0\x00' + self.app.expected_calls[ + ('test-vm', 'admin.vm.volume.Clone', 'private', b'new-name')] = \ + b'2\0QubesException\0\0something happened\0' + self.app.expected_calls[('new-name', 'admin.vm.Remove', None, None)] = \ + b'0\x00' + del self.app.expected_calls[ + ('new-name', 'admin.vm.volume.Info', 'root', None)] + del self.app.expected_calls[ + ('new-name', 'admin.vm.volume.Info', 'volatile', None)] + with self.assertRaises(qubesadmin.exc.QubesException): + self.app.clone_vm('test-vm', 'new-name') + self.assertAllCalled() + + class TC_20_QubesLocal(unittest.TestCase): def setUp(self): diff --git a/qubesadmin/tests/tools/qvm_clone.py b/qubesadmin/tests/tools/qvm_clone.py index b4c33a1..f8ca372 100644 --- a/qubesadmin/tests/tools/qvm_clone.py +++ b/qubesadmin/tests/tools/qvm_clone.py @@ -20,43 +20,61 @@ import qubesadmin.tests import qubesadmin.tests.tools import qubesadmin.tools.qvm_clone +from unittest import mock class TC_00_qvm_clone(qubesadmin.tests.QubesTestCase): def test_000_simple(self): - self.app.expected_calls[('test-vm', 'admin.vm.Clone', None, - b'name=new-vm')] = b'0\x00' + self.app.clone_vm = mock.Mock() self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \ b'0\x00new-vm class=AppVM state=Halted\n' \ b'test-vm class=AppVM state=Halted\n' qubesadmin.tools.qvm_clone.main(['test-vm', 'new-vm'], app=self.app) + self.app.clone_vm.assert_called_with(self.app.domains['test-vm'], + 'new-vm', new_cls=None, pool=None, pools={}) self.assertAllCalled() def test_001_missing_vm(self): + self.app.clone_vm = mock.Mock() with self.assertRaises(SystemExit): with qubesadmin.tests.tools.StderrBuffer() as stderr: qubesadmin.tools.qvm_clone.main(['test-vm'], app=self.app) self.assertIn('NAME', stderr.getvalue()) + self.assertFalse(self.app.clone_vm.called) self.assertAllCalled() def test_004_pool(self): - self.app.expected_calls[('test-vm', 'admin.vm.CloneInPool', - None, b'name=new-vm pool=some-pool')] = b'0\x00' + self.app.clone_vm = mock.Mock() self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \ b'0\x00new-vm class=AppVM state=Halted\n' \ b'test-vm class=AppVM state=Halted\n' qubesadmin.tools.qvm_clone.main(['-P', 'some-pool', 'test-vm', 'new-vm'], app=self.app) + self.app.clone_vm.assert_called_with(self.app.domains['test-vm'], + 'new-vm', new_cls=None, pool='some-pool', pools={}) self.assertAllCalled() def test_005_pools(self): - self.app.expected_calls[('test-vm', 'admin.vm.CloneInPool', - None, b'name=new-vm pool:private=some-pool ' - b'pool:volatile=other-pool')] = b'0\x00' + self.app.clone_vm = mock.Mock() self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \ b'0\x00new-vm class=AppVM state=Halted\n' \ b'test-vm class=AppVM state=Halted\n' qubesadmin.tools.qvm_clone.main(['--pool', 'private=some-pool', '--pool', 'volatile=other-pool', 'test-vm', 'new-vm'], app=self.app) + self.app.clone_vm.assert_called_with(self.app.domains['test-vm'], + 'new-vm', new_cls=None, pool=None, pools={'private': 'some-pool', + 'volatile': 'other-pool'}) + self.assertAllCalled() + + def test_006_new_cls(self): + self.app.clone_vm = mock.Mock() + self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \ + b'0\x00new-vm class=AppVM state=Halted\n' \ + b'test-vm class=AppVM state=Halted\n' + qubesadmin.tools.qvm_clone.main(['--class', 'StandaloneVM', + 'test-vm', 'new-vm'], + app=self.app) + self.app.clone_vm.assert_called_with(self.app.domains['test-vm'], + 'new-vm', new_cls='StandaloneVM', pool=None, pools={}) self.assertAllCalled() diff --git a/qubesadmin/tools/qvm_clone.py b/qubesadmin/tools/qvm_clone.py index fb4cb4e..7623ec0 100644 --- a/qubesadmin/tools/qvm_clone.py +++ b/qubesadmin/tools/qvm_clone.py @@ -32,6 +32,10 @@ parser.add_argument('new_name', action='store', help='name of the domain to create') +parser.add_argument('--class', '-C', dest='cls', + default=None, + help='specify the class of the new domain (default: same as source)') + group = parser.add_mutually_exclusive_group() group.add_argument('-P', metavar='POOL', @@ -66,7 +70,7 @@ def main(args=None, app=None): parser.error( 'Pool argument must be of form: -P volume_name=pool_name') - app.clone_vm(src_vm, new_name, pool=pool, pools=pools) + app.clone_vm(src_vm, new_name, new_cls=args.cls, pool=pool, pools=pools) if __name__ == '__main__': sys.exit(main())