Browse Source

tools: add qvm-tags tool

QubesOS/qubes-issues#2388
Marek Marczykowski-Górecki 6 years ago
parent
commit
8e5f90c273
3 changed files with 278 additions and 11 deletions
  1. 29 11
      doc/manpages/qvm-tags.rst
  2. 141 0
      qubesadmin/tests/tools/qvm_tags.py
  3. 108 0
      qubesadmin/tools/qvm_tags.py

+ 29 - 11
doc/manpages/qvm-tags.rst

@@ -15,7 +15,10 @@
 Synopsis
 --------
 
-:command:`qvm-tags` [-h] [--verbose] [--quiet] [--query | --set | --unset] *VMNAME* [*TAG*]
+| :command:`qvm-tags` [-h] [--verbose] [--quiet] *VMNAME* {list,ls,l} [*TAG*]
+| :command:`qvm-tags` [-h] [--verbose] [--quiet] *VMNAME* {add,a,set} *TAG* ...
+| :command:`qvm-tags` [-h] [--verbose] [--quiet] *VMNAME* {del,d,unset,u} *TAG* ...
+
 
 Options
 -------
@@ -32,22 +35,37 @@ Options
 
    Decrease verbosity.
 
-.. option:: --query
+Commands
+--------
+
+list
+^^^^
+
+| :command:`qvm-tags` [-h] [--verbose] [--quiet] *VMNAME* list [*TAG*]
+
+List tags. If tag name is given, check if this tag is set for the VM and signal
+this with exit code (0 - tag is set, 1 - it is not).
+
+aliases: ls, l
+
+add
+^^^
+
+| :command:`qvm-tags` [-h] [--verbose] [--quiet] *VMNAME* add *TAG* [*TAG* ...]
+
+Add tag(s) to a VM. If tag is already set for given VM, do nothing.
 
-   Query for the tag. Exit with zero (true) if the qube in question has the tag
-   and with non-zero (false) if it does not. If no tag specified, list all the
-   tags.
+aliases: a, set
 
-   This is the default mode.
+del
+^^^
 
-.. option:: --set, -s
+| :command:`qvm-tags` [-h] [--verbose] [--quiet] *VMNAME* del *TAG* [*TAG* ...]
 
-   Set the tag. The tag argument is mandatory. If tag is already set, do
-   nothing.
+Delete tag(s) from a VM. If tag is not set for given VM, do nothing.
 
-.. option:: --delete, --unset, -D
+aliases: d, unset, u
 
-   Unset the tag. The tag argument is mandatory. If tag is not set, do nothing.
 
 Authors
 -------

+ 141 - 0
qubesadmin/tests/tools/qvm_tags.py

@@ -0,0 +1,141 @@
+# -*- encoding: utf8 -*-
+#
+# The Qubes OS Project, http://www.qubes-os.org
+#
+# Copyright (C) 2017 Marek Marczykowski-Górecki
+#                               <marmarek@invisiblethingslab.com>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, see <http://www.gnu.org/licenses/>.
+
+import qubesadmin.tests
+import qubesadmin.tests.tools
+import qubesadmin.tools.qvm_tags
+
+
+class TC_00_qvm_tags(qubesadmin.tests.QubesTestCase):
+    def test_000_list(self):
+        self.app.expected_calls[
+            ('dom0', 'admin.vm.List', None, None)] = \
+            b'0\x00some-vm class=AppVM state=Running\n'
+        self.app.expected_calls[
+            ('some-vm', 'admin.vm.tag.List', None, None)] = \
+            b'0\x00tag1\ntag2\n'
+        with qubesadmin.tests.tools.StdoutBuffer() as stdout:
+            self.assertEqual(
+                qubesadmin.tools.qvm_tags.main(['some-vm'], app=self.app),
+                0)
+            self.assertEqual(stdout.getvalue(),
+                'tag1\n'
+                'tag2\n')
+        self.assertAllCalled()
+
+    def test_001_list_action(self):
+        self.app.expected_calls[
+            ('dom0', 'admin.vm.List', None, None)] = \
+            b'0\x00some-vm class=AppVM state=Running\n'
+        self.app.expected_calls[
+            ('some-vm', 'admin.vm.tag.List', None, None)] = \
+            b'0\x00tag1\ntag2\n'
+        with qubesadmin.tests.tools.StdoutBuffer() as stdout:
+            self.assertEqual(
+                qubesadmin.tools.qvm_tags.main(['some-vm', 'list'],
+                    app=self.app), 0)
+            self.assertEqual(stdout.getvalue(),
+                'tag1\n'
+                'tag2\n')
+        self.assertAllCalled()
+
+    def test_002_list_alias(self):
+        self.app.expected_calls[
+            ('dom0', 'admin.vm.List', None, None)] = \
+            b'0\x00some-vm class=AppVM state=Running\n'
+        self.app.expected_calls[
+            ('some-vm', 'admin.vm.tag.List', None, None)] = \
+            b'0\x00tag1\ntag2\n'
+        with qubesadmin.tests.tools.StdoutBuffer() as stdout:
+            self.assertEqual(
+                qubesadmin.tools.qvm_tags.main(['some-vm', 'ls'],
+                    app=self.app), 0)
+            self.assertEqual(stdout.getvalue(),
+                'tag1\n'
+                'tag2\n')
+        self.assertAllCalled()
+
+    def test_003_list_check(self):
+        self.app.expected_calls[
+            ('dom0', 'admin.vm.List', None, None)] = \
+            b'0\x00some-vm class=AppVM state=Running\n'
+        self.app.expected_calls[
+            ('some-vm', 'admin.vm.tag.Get', 'tag1', None)] = \
+            b'0\x001'
+        with qubesadmin.tests.tools.StdoutBuffer() as stdout:
+            self.assertEqual(
+                qubesadmin.tools.qvm_tags.main(['some-vm', 'ls', 'tag1'],
+                    app=self.app), 0)
+            self.assertEqual(stdout.getvalue(), 'tag1\n')
+        self.assertAllCalled()
+
+    def test_004_list_check_missing(self):
+        self.app.expected_calls[
+            ('dom0', 'admin.vm.List', None, None)] = \
+            b'0\x00some-vm class=AppVM state=Running\n'
+        self.app.expected_calls[
+            ('some-vm', 'admin.vm.tag.Get', 'tag1', None)] = \
+            b'0\x000'
+        with qubesadmin.tests.tools.StdoutBuffer() as stdout:
+            self.assertEqual(
+                qubesadmin.tools.qvm_tags.main(['some-vm', 'ls', 'tag1'],
+                    app=self.app), 1)
+            self.assertEqual(stdout.getvalue(), '')
+        self.assertAllCalled()
+
+    def test_005_list_empty(self):
+        self.app.expected_calls[
+            ('dom0', 'admin.vm.List', None, None)] = \
+            b'0\x00some-vm class=AppVM state=Running\n'
+        self.app.expected_calls[
+            ('some-vm', 'admin.vm.tag.List', None, None)] = \
+            b'0\x00'
+        with qubesadmin.tests.tools.StdoutBuffer() as stdout:
+            self.assertEqual(
+                qubesadmin.tools.qvm_tags.main(['some-vm', 'list'],
+                    app=self.app), 0)
+            self.assertEqual(stdout.getvalue(), '')
+        self.assertAllCalled()
+
+    def test_010_add(self):
+        self.app.expected_calls[
+            ('dom0', 'admin.vm.List', None, None)] = \
+            b'0\x00some-vm class=AppVM state=Running\n'
+        self.app.expected_calls[
+            ('some-vm', 'admin.vm.tag.Set', 'tag3', None)] = b'0\x00'
+        self.assertEqual(
+            qubesadmin.tools.qvm_tags.main(['some-vm', 'add', 'tag3'],
+                app=self.app),
+            0)
+        self.assertAllCalled()
+
+    def test_020_del(self):
+        self.app.expected_calls[
+            ('dom0', 'admin.vm.List', None, None)] = \
+            b'0\x00some-vm class=AppVM state=Running\n'
+        self.app.expected_calls[
+            ('some-vm', 'admin.vm.tag.Remove', 'tag3', None)] = b'0\x00'
+        with qubesadmin.tests.tools.StdoutBuffer() as stdout:
+            self.assertEqual(
+                qubesadmin.tools.qvm_tags.main(['some-vm', 'del', 'tag3'],
+                    app=self.app),
+                0)
+            self.assertEqual(stdout.getvalue(), '')
+        self.assertAllCalled()

+ 108 - 0
qubesadmin/tools/qvm_tags.py

@@ -0,0 +1,108 @@
+#
+# The Qubes OS Project, https://www.qubes-os.org/
+#
+# Copyright (C) 2010-2016  Joanna Rutkowska <joanna@invisiblethingslab.com>
+# Copyright (C) 2016       Wojtek Porczyk <woju@invisiblethingslab.com>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+#
+
+'''qvm-features - Manage domain's tags'''
+
+from __future__ import print_function
+
+import sys
+
+import qubesadmin
+import qubesadmin.tools
+
+def mode_query(args):
+    '''Query/list tags'''
+    if not hasattr(args, 'tag') or args.tag is None:
+        # list
+        tags = list(sorted(args.domains[0].tags))
+        if tags:
+            print('\n'.join(tags))
+    else:
+        # real query
+        if args.tag not in args.domains[0].tags:
+            return 1
+        print(args.tag)
+    return 0
+
+
+def mode_add(args):
+    '''Add tag'''
+    for tag in args.tag:
+        args.domains[0].tags.add(tag)
+    return 0
+
+
+def mode_del(args):
+    '''Delete tag'''
+    for tag in args.tag:
+        args.domains[0].tags.discard(tag)
+    return 0
+
+
+def get_parser():
+    ''' Return qvm-tags tool command line parser '''
+    parser = qubesadmin.tools.QubesArgumentParser(
+        vmname_nargs=1,
+        description='manage domain\'s tags')
+    parser.register('action', 'parsers',
+        qubesadmin.tools.AliasedSubParsersAction)
+
+    sub_parsers = parser.add_subparsers(
+        title='commands',
+        description="For more information see qvm-tags command -h",
+        dest='command')
+
+    list_parser = sub_parsers.add_parser('list', aliases=('ls', 'l'),
+        help='list tags')
+    list_parser.add_argument('tag', nargs='?',
+        action='store', default=None)
+    list_parser.set_defaults(func=mode_query)
+
+    add_parser = sub_parsers.add_parser('add', aliases=('a', 'set'),
+        help='add tag')
+    add_parser.add_argument('tag', nargs='+',
+        action='store')
+    add_parser.set_defaults(func=mode_add)
+
+    del_parser = sub_parsers.add_parser('del', aliases=('d', 'unset', 'u'),
+        help='add tag')
+    del_parser.add_argument('tag', nargs=1,
+        action='store')
+    del_parser.set_defaults(func=mode_del)
+
+    parser.set_defaults(func=mode_query)
+    return parser
+
+
+def main(args=None, app=None):
+    '''Main routine of :program:`qvm-tags`.
+
+    :param list args: Optional arguments to override those delivered from \
+        command line.
+    '''
+
+    parser = get_parser()
+    args = parser.parse_args(args, app=app)
+    return args.func(args)
+
+
+if __name__ == '__main__':
+    sys.exit(main())