Browse Source

qvm-check: refactor check mechanism and add filter for checking netvm

Fix QubesOS/qubes-issues#3496
Frédéric Pierret (fepitre) 4 years ago
parent
commit
7d93377b78
3 changed files with 165 additions and 105 deletions
  1. 6 1
      doc/manpages/qvm-check.rst
  2. 91 72
      qubesadmin/tests/tools/qvm_check.py
  3. 68 32
      qubesadmin/tools/qvm_check.py

+ 6 - 1
doc/manpages/qvm-check.rst

@@ -15,7 +15,7 @@
 Synopsis
 --------
 
-:command:`qvm-check` [-h] [--verbose] [--quiet] [--all] [--exclude *EXCLUDE*] [--running] [--paused] [--template] [*VMNAME* [*VMNAME* ...]]
+:command:`qvm-check` [-h] [--verbose] [--quiet] [--all] [--exclude *EXCLUDE*] [--running] [--paused] [--template] [--networked] [*VMNAME* [*VMNAME* ...]]
 
 Options
 -------
@@ -52,6 +52,10 @@ Options
 
    Determine if (any of given) VM is a template
 
+.. option:: --networked
+
+   Determine if (any of given) VM can reach network
+
 Authors
 -------
 
@@ -59,5 +63,6 @@ Authors
 | Rafal Wojtczuk <rafal at invisiblethingslab dot com>
 | Marek Marczykowski <marmarek at invisiblethingslab dot com>
 | Wojtek Porczyk <woju at invisiblethingslab dot com>
+| Frédéric Pierret <frederic dot pierret at qubes dash os dot com>
 
 .. vim: ts=3 sw=3 et tw=80

+ 91 - 72
qubesadmin/tests/tools/qvm_check.py

@@ -21,14 +21,14 @@
 import qubesadmin.tests
 import qubesadmin.tools.qvm_check
 
+
 class TC_00_qvm_check(qubesadmin.tests.QubesTestCase):
     def test_000_exists(self):
         self.app.expected_calls[
             ('dom0', 'admin.vm.List', None, None)] = \
             b'0\x00some-vm class=AppVM state=Running\n'
         self.assertEqual(
-            qubesadmin.tools.qvm_check.main(['some-vm'], app=self.app),
-            0)
+            qubesadmin.tools.qvm_check.main(['some-vm'], app=self.app), 0)
         self.assertAllCalled()
 
     def test_001_exists_multi(self):
@@ -38,20 +38,17 @@ class TC_00_qvm_check(qubesadmin.tests.QubesTestCase):
             b'other-vm class=AppVM state=Running\n'
         self.assertEqual(
             qubesadmin.tools.qvm_check.main(['some-vm', 'other-vm'],
-                app=self.app),
-            0)
+                                            app=self.app), 0)
         self.assertAllCalled()
 
     def test_002_exists_verbose(self):
         self.app.expected_calls[
             ('dom0', 'admin.vm.List', None, None)] = \
             b'0\x00some-vm class=AppVM state=Running\n'
-        with qubesadmin.tests.tools.StdoutBuffer() as stdout:
+        with self.assertLogs() as logger:
             self.assertEqual(
-                qubesadmin.tools.qvm_check.main(['some-vm'], app=self.app),
-                0)
-            self.assertEqual(stdout.getvalue(),
-                'VM some-vm exists\n')
+                qubesadmin.tools.qvm_check.main(['some-vm'], app=self.app), 0)
+            self.assertEqual(logger.output, ['INFO:qvm-check:some-vm: exists'])
         self.assertAllCalled()
 
     def test_003_exists_multi_verbose(self):
@@ -59,13 +56,12 @@ class TC_00_qvm_check(qubesadmin.tests.QubesTestCase):
             ('dom0', 'admin.vm.List', None, None)] = \
             b'0\x00some-vm class=AppVM state=Running\n' \
             b'other-vm class=AppVM state=Running\n'
-        with qubesadmin.tests.tools.StdoutBuffer() as stdout:
+        with self.assertLogs() as logger:
             self.assertEqual(
                 qubesadmin.tools.qvm_check.main(['some-vm', 'other-vm'],
-                    app=self.app),
-                0)
-            self.assertEqual(stdout.getvalue(),
-                'VMs other-vm, some-vm exist\n')
+                                                app=self.app), 0)
+            self.assertEqual(logger.output, ['INFO:qvm-check:other-vm: exists',
+                                             'INFO:qvm-check:some-vm: exists'])
         self.assertAllCalled()
 
     def test_004_running_verbose(self):
@@ -77,13 +73,11 @@ class TC_00_qvm_check(qubesadmin.tests.QubesTestCase):
         self.app.expected_calls[
             ('some-vm', 'admin.vm.List', None, None)] = \
             b'0\x00some-vm class=AppVM state=Running\n'
-        with qubesadmin.tests.tools.StdoutBuffer() as stdout:
+        with self.assertLogs() as logger:
             self.assertEqual(
-                qubesadmin.tools.qvm_check.main(['--running',
-                    'some-vm'], app=self.app),
-                0)
-            self.assertEqual(stdout.getvalue(),
-                'VM some-vm is running\n')
+                qubesadmin.tools.qvm_check.main(['--running', 'some-vm'],
+                                                app=self.app), 0)
+            self.assertEqual(logger.output, ['INFO:qvm-check:some-vm: running'])
         self.assertAllCalled()
 
     def test_005_running_multi_verbose(self):
@@ -98,14 +92,12 @@ class TC_00_qvm_check(qubesadmin.tests.QubesTestCase):
         self.app.expected_calls[
             ('some-vm2', 'admin.vm.List', None, None)] = \
             b'0\x00some-vm2 class=AppVM state=Running\n'
-        with qubesadmin.tests.tools.StdoutBuffer() as stdout:
-            self.assertEqual(
-                qubesadmin.tools.qvm_check.main(['--running',
-                    'some-vm', 'some-vm2'],
-                    app=self.app),
-                0)
-            self.assertEqual(stdout.getvalue(),
-                'VMs some-vm, some-vm2 are running\n')
+        with self.assertLogs() as logger:
+            self.assertEqual(qubesadmin.tools.qvm_check.main(
+                ['--running', 'some-vm', 'some-vm2'], app=self.app), 0)
+            self.assertEqual(logger.output, ['INFO:qvm-check:some-vm: running',
+                                             'INFO:qvm-check:some-vm2: running']
+                             )
         self.assertAllCalled()
 
     def test_006_running_multi_verbose2(self):
@@ -123,14 +115,13 @@ class TC_00_qvm_check(qubesadmin.tests.QubesTestCase):
         self.app.expected_calls[
             ('some-vm3', 'admin.vm.List', None, None)] = \
             b'0\x00some-vm3 class=AppVM state=Halted\n'
-        with qubesadmin.tests.tools.StdoutBuffer() as stdout:
+        with self.assertLogs() as logger:
             self.assertEqual(
-                qubesadmin.tools.qvm_check.main(['--running',
-                    '--all'],
-                    app=self.app),
-                0)
-            self.assertEqual(stdout.getvalue(),
-                'VMs some-vm, some-vm2 are running\n')
+                qubesadmin.tools.qvm_check.main(['--running', '--all'],
+                                                app=self.app), 3)
+            self.assertEqual(logger.output, ['INFO:qvm-check:some-vm: running',
+                                             'INFO:qvm-check:some-vm2: running']
+                             )
         self.assertAllCalled()
 
     def test_007_not_running_verbose(self):
@@ -142,14 +133,12 @@ class TC_00_qvm_check(qubesadmin.tests.QubesTestCase):
         self.app.expected_calls[
             ('some-vm3', 'admin.vm.List', None, None)] = \
             b'0\x00some-vm3 class=AppVM state=Halted\n'
-        with qubesadmin.tests.tools.StdoutBuffer() as stdout:
+        with self.assertLogs() as logger:
             self.assertEqual(
-                qubesadmin.tools.qvm_check.main(['--running',
-                    'some-vm3'],
-                    app=self.app),
-                1)
-            self.assertEqual(stdout.getvalue(),
-                'None of given VM is running\n')
+                qubesadmin.tools.qvm_check.main(['--running', 'some-vm3'],
+                                                app=self.app), 1)
+            self.assertEqual(logger.output,
+                             ['INFO:qvm-check:None of qubes: running'])
         self.assertAllCalled()
 
     def test_008_paused(self):
@@ -161,14 +150,11 @@ class TC_00_qvm_check(qubesadmin.tests.QubesTestCase):
         self.app.expected_calls[
             ('some-vm2', 'admin.vm.List', None, None)] = \
             b'0\x00some-vm2 class=AppVM state=Paused\n'
-        with qubesadmin.tests.tools.StdoutBuffer() as stdout:
+        with self.assertLogs() as logger:
             self.assertEqual(
-                qubesadmin.tools.qvm_check.main(['--paused',
-                    'some-vm2'],
-                    app=self.app),
-                0)
-            self.assertEqual(stdout.getvalue(),
-                'VM some-vm2 is paused\n')
+                qubesadmin.tools.qvm_check.main(['--paused', 'some-vm2'],
+                                                app=self.app), 0)
+            self.assertEqual(logger.output, ['INFO:qvm-check:some-vm2: paused'])
         self.assertAllCalled()
 
     def test_009_paused_multi(self):
@@ -183,31 +169,24 @@ class TC_00_qvm_check(qubesadmin.tests.QubesTestCase):
         self.app.expected_calls[
             ('some-vm', 'admin.vm.List', None, None)] = \
             b'0\x00some-vm class=AppVM state=Running\n'
-        with qubesadmin.tests.tools.StdoutBuffer() as stdout:
-            self.assertEqual(
-                qubesadmin.tools.qvm_check.main(['--paused',
-                    'some-vm2', 'some-vm'],
-                    app=self.app),
-                0)
-            self.assertEqual(stdout.getvalue(),
-                'VM some-vm2 is paused\n')
+        with self.assertLogs() as logger:
+            self.assertEqual(qubesadmin.tools.qvm_check.main(
+                ['--paused', 'some-vm2', 'some-vm'], app=self.app), 3)
+            self.assertEqual(logger.output, ['INFO:qvm-check:some-vm2: paused'])
         self.assertAllCalled()
 
-
     def test_010_template(self):
         self.app.expected_calls[
             ('dom0', 'admin.vm.List', None, None)] = \
             b'0\x00some-vm class=AppVM state=Running\n' \
             b'some-vm2 class=AppVM state=Paused\n' \
             b'some-vm3 class=TemplateVM state=Halted\n'
-        with qubesadmin.tests.tools.StdoutBuffer() as stdout:
+        with self.assertLogs() as logger:
             self.assertEqual(
-                qubesadmin.tools.qvm_check.main(['--template',
-                    'some-vm3'],
-                    app=self.app),
-                0)
-            self.assertEqual(stdout.getvalue(),
-                'VM some-vm3 is a template\n')
+                qubesadmin.tools.qvm_check.main(['--template', 'some-vm3'],
+                                                app=self.app), 0)
+            self.assertEqual(logger.output,
+                             ['INFO:qvm-check:some-vm3: template'])
         self.assertAllCalled()
 
     def test_011_template_multi(self):
@@ -216,13 +195,53 @@ class TC_00_qvm_check(qubesadmin.tests.QubesTestCase):
             b'0\x00some-vm class=AppVM state=Running\n' \
             b'some-vm2 class=AppVM state=Paused\n' \
             b'some-vm3 class=TemplateVM state=Halted\n'
-        with qubesadmin.tests.tools.StdoutBuffer() as stdout:
+        with self.assertLogs() as logger:
+            self.assertEqual(qubesadmin.tools.qvm_check.main(
+                ['--template', 'some-vm2', 'some-vm3'], app=self.app), 3)
+            self.assertEqual(logger.output,
+                             ['INFO:qvm-check:some-vm3: template'])
+        self.assertAllCalled()
+
+    def test_012_networked(self):
+        self.app.expected_calls[
+            ('dom0', 'admin.vm.List', None, None)] = \
+            b'0\x00some-vm class=AppVM state=Running\n' \
+            b'some-vm2 class=AppVM state=Running\n'
+        self.app.expected_calls[
+            ('some-vm2', 'admin.vm.property.Get', 'provides_network', None)] = \
+            b'0\x00default=false type=bool false'
+        self.app.expected_calls[
+            ('some-vm2', 'admin.vm.property.Get', 'netvm', None)] = \
+            b'0\x00default=false type=vm some-vm'
+        with self.assertLogs() as logger:
             self.assertEqual(
-                qubesadmin.tools.qvm_check.main(['--template',
-                    'some-vm2', 'some-vm3'],
-                    app=self.app),
-                0)
-            self.assertEqual(stdout.getvalue(),
-                'VM some-vm3 is a template\n')
+                qubesadmin.tools.qvm_check.main(['--networked', 'some-vm2'],
+                                                app=self.app), 0)
+            self.assertEqual(logger.output,
+                             ['INFO:qvm-check:some-vm2: networked'])
         self.assertAllCalled()
 
+    def test_013_networked_multi(self):
+        self.app.expected_calls[
+            ('dom0', 'admin.vm.List', None, None)] = \
+            b'0\x00some-vm class=AppVM state=Running\n' \
+            b'some-vm2 class=AppVM state=Running\n' \
+            b'some-vm3 class=TemplateVM state=Halted\n'
+        self.app.expected_calls[
+            ('some-vm2', 'admin.vm.property.Get', 'provides_network', None)] = \
+            b'0\x00default=false type=bool false'
+        self.app.expected_calls[
+            ('some-vm3', 'admin.vm.property.Get', 'provides_network', None)] = \
+            b'0\x00default=false type=bool false'
+        self.app.expected_calls[
+            ('some-vm2', 'admin.vm.property.Get', 'netvm', None)] = \
+            b'0\x00default=false type=vm some-vm'
+        self.app.expected_calls[
+            ('some-vm3', 'admin.vm.property.Get', 'netvm', None)] = \
+            b"0\x00default=false type=vm "
+        with self.assertLogs() as logger:
+            self.assertEqual(qubesadmin.tools.qvm_check.main(
+                ['--networked', 'some-vm2', 'some-vm3'], app=self.app), 3)
+            self.assertEqual(logger.output,
+                             ['INFO:qvm-check:some-vm2: networked'])
+        self.assertAllCalled()

+ 68 - 32
qubesadmin/tools/qvm_check.py

@@ -4,6 +4,7 @@
 # The Qubes OS Project, http://www.qubes-os.org
 #
 # Copyright (C) 2016 Bahtiar `kalkin-` Gadimov <bahtiar@gadimov.de>
+# Copyright (C) 2019 Frédéric Pierret <frederic.pierret@qubes-os.org>
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU Lesser General Public License as published by
@@ -19,9 +20,7 @@
 # with this program; if not, write to the Free Software Foundation, Inc.,
 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
 #
-''' Exits sucessfull if the provided domains exists, else returns failure '''
-
-from __future__ import print_function
+""" Exits sucessfull if the provided domains exists, else returns failure """
 
 import sys
 
@@ -29,48 +28,85 @@ import qubesadmin.tools
 import qubesadmin.vm
 
 parser = qubesadmin.tools.QubesArgumentParser(description=__doc__,
-    vmname_nargs='+')
+                                              vmname_nargs='+')
 parser.add_argument("--running", action="store_true", dest="running",
-    default=False, help="Determine if (any of given) VM is running")
+                    default=False,
+                    help="Determine if (any of given) VM is running")
 parser.add_argument("--paused", action="store_true", dest="paused",
-    default=False, help="Determine if (any of given) VM is paused")
+                    default=False,
+                    help="Determine if (any of given) VM is paused")
 parser.add_argument("--template", action="store_true", dest="template",
-    default=False, help="Determine if (any of given) VM is a template")
+                    default=False,
+                    help="Determine if (any of given) VM is a template")
+parser.add_argument("--networked", action="store_true", dest="networked",
+                    default=False,
+                    help="Determine if (any of given) VM can reach network")
 
 
-def print_msg(domains, what_single, what_plural):
-    '''Print message in appropriate form about given domain(s)'''
+def print_msg(log, domains, status):
+    """Print message in appropriate form about given domain(s)"""
     if not domains:
-        print("None of given VM {!s}".format(what_single))
-    elif len(domains) == 1:
-        print("VM {!s} {!s}".format(domains[0], what_single))
+        log.info("None of qubes: {!s}".format(', '.join(status)))
     else:
-        txt = ", ".join([vm.name for vm in sorted(domains)])
-        print("VMs {!s} {!s}".format(txt, what_plural))
+        for vm in sorted(domains):
+            log.info("{!s}: {!s}".format(vm.name, ', '.join(status)))
 
 
-def main(args=None, app=None):
-    '''Main function of qvm-check tool'''
-    args = parser.parse_args(args, app=app)
-    domains = args.domains
+def get_filters(args):
+    """Get status and check functions"""
+    filters = []
+
     if args.running:
-        running = [vm for vm in domains if vm.is_running()]
-        if args.verbose:
-            print_msg(running, "is running", "are running")
-        return 0 if running else 1
+        filters.append({'status': 'running', 'check': lambda x: x.is_running()})
     if args.paused:
-        paused = [vm for vm in domains if vm.is_paused()]
-        if args.verbose:
-            print_msg(paused, "is paused", "are paused")
-        return 0 if paused else 1
+        filters.append({'status': 'paused', 'check': lambda x: x.is_paused()})
     if args.template:
-        template = [vm for vm in domains if vm.klass == 'TemplateVM']
+        filters.append(
+            {'status': 'template', 'check': lambda x: x.klass == 'TemplateVM'})
+    if args.networked:
+        filters.append(
+            {'status': 'networked', 'check': lambda x: x.is_networked()})
+
+    return filters
+
+
+def main(args=None, app=None):
+    """Main function of qvm-check tool"""
+    args = parser.parse_args(args, app=app)
+    domains = args.domains
+    return_code = 0
+
+    log = args.app.log
+    log.name = "qvm-check"
+
+    status = []
+    filters = get_filters(args)
+    filtered_domains = set(domains)
+    if filters:
+        for filt in filters:
+            status.append(filt['status'])
+            check = filt['check']
+            filtered_domains = filtered_domains.intersection(
+                [vm for vm in domains if check(vm)])
+
+        filtered_domains = list(filtered_domains)
+
+        if set(domains) & set(filtered_domains) != set(domains):
+            if not filtered_domains:
+                return_code = 1
+            else:
+                return_code = 3
+
         if args.verbose:
-            print_msg(template, "is a template", "are templates")
-        return 0 if template else 1
-    if args.verbose:
-        print_msg(domains, "exists", "exist")
-    return 0 if domains else 1
+            print_msg(log, filtered_domains, status)
+    else:
+        if not domains:
+            return_code = 1
+        elif args.verbose:
+            print_msg(log, domains, ["exists"])
+
+    return return_code
+
 
 if __name__ == '__main__':
     sys.exit(main())