From 7d93377b785f0feed988cc4e4448421c2899abf8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fr=C3=A9d=C3=A9ric=20Pierret=20=28fepitre=29?= Date: Wed, 31 Jul 2019 10:37:15 +0200 Subject: [PATCH] qvm-check: refactor check mechanism and add filter for checking netvm Fix QubesOS/qubes-issues#3496 --- doc/manpages/qvm-check.rst | 7 +- qubesadmin/tests/tools/qvm_check.py | 165 ++++++++++++++++------------ qubesadmin/tools/qvm_check.py | 100 +++++++++++------ 3 files changed, 166 insertions(+), 106 deletions(-) diff --git a/doc/manpages/qvm-check.rst b/doc/manpages/qvm-check.rst index 07a13c9..f694318 100644 --- a/doc/manpages/qvm-check.rst +++ b/doc/manpages/qvm-check.rst @@ -15,7 +15,7 @@ Synopsis -------- -:command:`qvm-check` [-h] [--verbose] [--quiet] [--all] [--exclude *EXCLUDE*] [--running] [--paused] [--template] [*VMNAME* [*VMNAME* ...]] +:command:`qvm-check` [-h] [--verbose] [--quiet] [--all] [--exclude *EXCLUDE*] [--running] [--paused] [--template] [--networked] [*VMNAME* [*VMNAME* ...]] Options ------- @@ -52,6 +52,10 @@ Options Determine if (any of given) VM is a template +.. option:: --networked + + Determine if (any of given) VM can reach network + Authors ------- @@ -59,5 +63,6 @@ Authors | Rafal Wojtczuk | Marek Marczykowski | Wojtek Porczyk +| Frédéric Pierret .. vim: ts=3 sw=3 et tw=80 diff --git a/qubesadmin/tests/tools/qvm_check.py b/qubesadmin/tests/tools/qvm_check.py index 4897925..b223fab 100644 --- a/qubesadmin/tests/tools/qvm_check.py +++ b/qubesadmin/tests/tools/qvm_check.py @@ -21,14 +21,14 @@ import qubesadmin.tests import qubesadmin.tools.qvm_check + class TC_00_qvm_check(qubesadmin.tests.QubesTestCase): def test_000_exists(self): self.app.expected_calls[ ('dom0', 'admin.vm.List', None, None)] = \ b'0\x00some-vm class=AppVM state=Running\n' self.assertEqual( - qubesadmin.tools.qvm_check.main(['some-vm'], app=self.app), - 0) + qubesadmin.tools.qvm_check.main(['some-vm'], app=self.app), 0) self.assertAllCalled() def test_001_exists_multi(self): @@ -38,20 +38,17 @@ class TC_00_qvm_check(qubesadmin.tests.QubesTestCase): b'other-vm class=AppVM state=Running\n' self.assertEqual( qubesadmin.tools.qvm_check.main(['some-vm', 'other-vm'], - app=self.app), - 0) + app=self.app), 0) self.assertAllCalled() def test_002_exists_verbose(self): self.app.expected_calls[ ('dom0', 'admin.vm.List', None, None)] = \ b'0\x00some-vm class=AppVM state=Running\n' - with qubesadmin.tests.tools.StdoutBuffer() as stdout: + with self.assertLogs() as logger: self.assertEqual( - qubesadmin.tools.qvm_check.main(['some-vm'], app=self.app), - 0) - self.assertEqual(stdout.getvalue(), - 'VM some-vm exists\n') + qubesadmin.tools.qvm_check.main(['some-vm'], app=self.app), 0) + self.assertEqual(logger.output, ['INFO:qvm-check:some-vm: exists']) self.assertAllCalled() def test_003_exists_multi_verbose(self): @@ -59,13 +56,12 @@ class TC_00_qvm_check(qubesadmin.tests.QubesTestCase): ('dom0', 'admin.vm.List', None, None)] = \ b'0\x00some-vm class=AppVM state=Running\n' \ b'other-vm class=AppVM state=Running\n' - with qubesadmin.tests.tools.StdoutBuffer() as stdout: + with self.assertLogs() as logger: self.assertEqual( qubesadmin.tools.qvm_check.main(['some-vm', 'other-vm'], - app=self.app), - 0) - self.assertEqual(stdout.getvalue(), - 'VMs other-vm, some-vm exist\n') + app=self.app), 0) + self.assertEqual(logger.output, ['INFO:qvm-check:other-vm: exists', + 'INFO:qvm-check:some-vm: exists']) self.assertAllCalled() def test_004_running_verbose(self): @@ -77,13 +73,11 @@ class TC_00_qvm_check(qubesadmin.tests.QubesTestCase): self.app.expected_calls[ ('some-vm', 'admin.vm.List', None, None)] = \ b'0\x00some-vm class=AppVM state=Running\n' - with qubesadmin.tests.tools.StdoutBuffer() as stdout: + with self.assertLogs() as logger: self.assertEqual( - qubesadmin.tools.qvm_check.main(['--running', - 'some-vm'], app=self.app), - 0) - self.assertEqual(stdout.getvalue(), - 'VM some-vm is running\n') + qubesadmin.tools.qvm_check.main(['--running', 'some-vm'], + app=self.app), 0) + self.assertEqual(logger.output, ['INFO:qvm-check:some-vm: running']) self.assertAllCalled() def test_005_running_multi_verbose(self): @@ -98,14 +92,12 @@ class TC_00_qvm_check(qubesadmin.tests.QubesTestCase): self.app.expected_calls[ ('some-vm2', 'admin.vm.List', None, None)] = \ b'0\x00some-vm2 class=AppVM state=Running\n' - with qubesadmin.tests.tools.StdoutBuffer() as stdout: - self.assertEqual( - qubesadmin.tools.qvm_check.main(['--running', - 'some-vm', 'some-vm2'], - app=self.app), - 0) - self.assertEqual(stdout.getvalue(), - 'VMs some-vm, some-vm2 are running\n') + with self.assertLogs() as logger: + self.assertEqual(qubesadmin.tools.qvm_check.main( + ['--running', 'some-vm', 'some-vm2'], app=self.app), 0) + self.assertEqual(logger.output, ['INFO:qvm-check:some-vm: running', + 'INFO:qvm-check:some-vm2: running'] + ) self.assertAllCalled() def test_006_running_multi_verbose2(self): @@ -123,14 +115,13 @@ class TC_00_qvm_check(qubesadmin.tests.QubesTestCase): self.app.expected_calls[ ('some-vm3', 'admin.vm.List', None, None)] = \ b'0\x00some-vm3 class=AppVM state=Halted\n' - with qubesadmin.tests.tools.StdoutBuffer() as stdout: + with self.assertLogs() as logger: self.assertEqual( - qubesadmin.tools.qvm_check.main(['--running', - '--all'], - app=self.app), - 0) - self.assertEqual(stdout.getvalue(), - 'VMs some-vm, some-vm2 are running\n') + qubesadmin.tools.qvm_check.main(['--running', '--all'], + app=self.app), 3) + self.assertEqual(logger.output, ['INFO:qvm-check:some-vm: running', + 'INFO:qvm-check:some-vm2: running'] + ) self.assertAllCalled() def test_007_not_running_verbose(self): @@ -142,14 +133,12 @@ class TC_00_qvm_check(qubesadmin.tests.QubesTestCase): self.app.expected_calls[ ('some-vm3', 'admin.vm.List', None, None)] = \ b'0\x00some-vm3 class=AppVM state=Halted\n' - with qubesadmin.tests.tools.StdoutBuffer() as stdout: + with self.assertLogs() as logger: self.assertEqual( - qubesadmin.tools.qvm_check.main(['--running', - 'some-vm3'], - app=self.app), - 1) - self.assertEqual(stdout.getvalue(), - 'None of given VM is running\n') + qubesadmin.tools.qvm_check.main(['--running', 'some-vm3'], + app=self.app), 1) + self.assertEqual(logger.output, + ['INFO:qvm-check:None of qubes: running']) self.assertAllCalled() def test_008_paused(self): @@ -161,14 +150,11 @@ class TC_00_qvm_check(qubesadmin.tests.QubesTestCase): self.app.expected_calls[ ('some-vm2', 'admin.vm.List', None, None)] = \ b'0\x00some-vm2 class=AppVM state=Paused\n' - with qubesadmin.tests.tools.StdoutBuffer() as stdout: + with self.assertLogs() as logger: self.assertEqual( - qubesadmin.tools.qvm_check.main(['--paused', - 'some-vm2'], - app=self.app), - 0) - self.assertEqual(stdout.getvalue(), - 'VM some-vm2 is paused\n') + qubesadmin.tools.qvm_check.main(['--paused', 'some-vm2'], + app=self.app), 0) + self.assertEqual(logger.output, ['INFO:qvm-check:some-vm2: paused']) self.assertAllCalled() def test_009_paused_multi(self): @@ -183,31 +169,24 @@ class TC_00_qvm_check(qubesadmin.tests.QubesTestCase): self.app.expected_calls[ ('some-vm', 'admin.vm.List', None, None)] = \ b'0\x00some-vm class=AppVM state=Running\n' - with qubesadmin.tests.tools.StdoutBuffer() as stdout: - self.assertEqual( - qubesadmin.tools.qvm_check.main(['--paused', - 'some-vm2', 'some-vm'], - app=self.app), - 0) - self.assertEqual(stdout.getvalue(), - 'VM some-vm2 is paused\n') + with self.assertLogs() as logger: + self.assertEqual(qubesadmin.tools.qvm_check.main( + ['--paused', 'some-vm2', 'some-vm'], app=self.app), 3) + self.assertEqual(logger.output, ['INFO:qvm-check:some-vm2: paused']) self.assertAllCalled() - def test_010_template(self): self.app.expected_calls[ ('dom0', 'admin.vm.List', None, None)] = \ b'0\x00some-vm class=AppVM state=Running\n' \ b'some-vm2 class=AppVM state=Paused\n' \ b'some-vm3 class=TemplateVM state=Halted\n' - with qubesadmin.tests.tools.StdoutBuffer() as stdout: + with self.assertLogs() as logger: self.assertEqual( - qubesadmin.tools.qvm_check.main(['--template', - 'some-vm3'], - app=self.app), - 0) - self.assertEqual(stdout.getvalue(), - 'VM some-vm3 is a template\n') + qubesadmin.tools.qvm_check.main(['--template', 'some-vm3'], + app=self.app), 0) + self.assertEqual(logger.output, + ['INFO:qvm-check:some-vm3: template']) self.assertAllCalled() def test_011_template_multi(self): @@ -216,13 +195,53 @@ class TC_00_qvm_check(qubesadmin.tests.QubesTestCase): b'0\x00some-vm class=AppVM state=Running\n' \ b'some-vm2 class=AppVM state=Paused\n' \ b'some-vm3 class=TemplateVM state=Halted\n' - with qubesadmin.tests.tools.StdoutBuffer() as stdout: - self.assertEqual( - qubesadmin.tools.qvm_check.main(['--template', - 'some-vm2', 'some-vm3'], - app=self.app), - 0) - self.assertEqual(stdout.getvalue(), - 'VM some-vm3 is a template\n') + with self.assertLogs() as logger: + self.assertEqual(qubesadmin.tools.qvm_check.main( + ['--template', 'some-vm2', 'some-vm3'], app=self.app), 3) + self.assertEqual(logger.output, + ['INFO:qvm-check:some-vm3: template']) self.assertAllCalled() + def test_012_networked(self): + self.app.expected_calls[ + ('dom0', 'admin.vm.List', None, None)] = \ + b'0\x00some-vm class=AppVM state=Running\n' \ + b'some-vm2 class=AppVM state=Running\n' + self.app.expected_calls[ + ('some-vm2', 'admin.vm.property.Get', 'provides_network', None)] = \ + b'0\x00default=false type=bool false' + self.app.expected_calls[ + ('some-vm2', 'admin.vm.property.Get', 'netvm', None)] = \ + b'0\x00default=false type=vm some-vm' + with self.assertLogs() as logger: + self.assertEqual( + qubesadmin.tools.qvm_check.main(['--networked', 'some-vm2'], + app=self.app), 0) + self.assertEqual(logger.output, + ['INFO:qvm-check:some-vm2: networked']) + self.assertAllCalled() + + def test_013_networked_multi(self): + self.app.expected_calls[ + ('dom0', 'admin.vm.List', None, None)] = \ + b'0\x00some-vm class=AppVM state=Running\n' \ + b'some-vm2 class=AppVM state=Running\n' \ + b'some-vm3 class=TemplateVM state=Halted\n' + self.app.expected_calls[ + ('some-vm2', 'admin.vm.property.Get', 'provides_network', None)] = \ + b'0\x00default=false type=bool false' + self.app.expected_calls[ + ('some-vm3', 'admin.vm.property.Get', 'provides_network', None)] = \ + b'0\x00default=false type=bool false' + self.app.expected_calls[ + ('some-vm2', 'admin.vm.property.Get', 'netvm', None)] = \ + b'0\x00default=false type=vm some-vm' + self.app.expected_calls[ + ('some-vm3', 'admin.vm.property.Get', 'netvm', None)] = \ + b"0\x00default=false type=vm " + with self.assertLogs() as logger: + self.assertEqual(qubesadmin.tools.qvm_check.main( + ['--networked', 'some-vm2', 'some-vm3'], app=self.app), 3) + self.assertEqual(logger.output, + ['INFO:qvm-check:some-vm2: networked']) + self.assertAllCalled() diff --git a/qubesadmin/tools/qvm_check.py b/qubesadmin/tools/qvm_check.py index ce0b705..bb0284f 100644 --- a/qubesadmin/tools/qvm_check.py +++ b/qubesadmin/tools/qvm_check.py @@ -4,6 +4,7 @@ # The Qubes OS Project, http://www.qubes-os.org # # Copyright (C) 2016 Bahtiar `kalkin-` Gadimov +# Copyright (C) 2019 Frédéric Pierret # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by @@ -19,9 +20,7 @@ # with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. # -''' Exits sucessfull if the provided domains exists, else returns failure ''' - -from __future__ import print_function +""" Exits sucessfull if the provided domains exists, else returns failure """ import sys @@ -29,48 +28,85 @@ import qubesadmin.tools import qubesadmin.vm parser = qubesadmin.tools.QubesArgumentParser(description=__doc__, - vmname_nargs='+') + vmname_nargs='+') parser.add_argument("--running", action="store_true", dest="running", - default=False, help="Determine if (any of given) VM is running") + default=False, + help="Determine if (any of given) VM is running") parser.add_argument("--paused", action="store_true", dest="paused", - default=False, help="Determine if (any of given) VM is paused") + default=False, + help="Determine if (any of given) VM is paused") parser.add_argument("--template", action="store_true", dest="template", - default=False, help="Determine if (any of given) VM is a template") + default=False, + help="Determine if (any of given) VM is a template") +parser.add_argument("--networked", action="store_true", dest="networked", + default=False, + help="Determine if (any of given) VM can reach network") -def print_msg(domains, what_single, what_plural): - '''Print message in appropriate form about given domain(s)''' +def print_msg(log, domains, status): + """Print message in appropriate form about given domain(s)""" if not domains: - print("None of given VM {!s}".format(what_single)) - elif len(domains) == 1: - print("VM {!s} {!s}".format(domains[0], what_single)) + log.info("None of qubes: {!s}".format(', '.join(status))) else: - txt = ", ".join([vm.name for vm in sorted(domains)]) - print("VMs {!s} {!s}".format(txt, what_plural)) + for vm in sorted(domains): + log.info("{!s}: {!s}".format(vm.name, ', '.join(status))) + + +def get_filters(args): + """Get status and check functions""" + filters = [] + + if args.running: + filters.append({'status': 'running', 'check': lambda x: x.is_running()}) + if args.paused: + filters.append({'status': 'paused', 'check': lambda x: x.is_paused()}) + if args.template: + filters.append( + {'status': 'template', 'check': lambda x: x.klass == 'TemplateVM'}) + if args.networked: + filters.append( + {'status': 'networked', 'check': lambda x: x.is_networked()}) + + return filters def main(args=None, app=None): - '''Main function of qvm-check tool''' + """Main function of qvm-check tool""" args = parser.parse_args(args, app=app) domains = args.domains - if args.running: - running = [vm for vm in domains if vm.is_running()] + return_code = 0 + + log = args.app.log + log.name = "qvm-check" + + status = [] + filters = get_filters(args) + filtered_domains = set(domains) + if filters: + for filt in filters: + status.append(filt['status']) + check = filt['check'] + filtered_domains = filtered_domains.intersection( + [vm for vm in domains if check(vm)]) + + filtered_domains = list(filtered_domains) + + if set(domains) & set(filtered_domains) != set(domains): + if not filtered_domains: + return_code = 1 + else: + return_code = 3 + if args.verbose: - print_msg(running, "is running", "are running") - return 0 if running else 1 - if args.paused: - paused = [vm for vm in domains if vm.is_paused()] - if args.verbose: - print_msg(paused, "is paused", "are paused") - return 0 if paused else 1 - if args.template: - template = [vm for vm in domains if vm.klass == 'TemplateVM'] - if args.verbose: - print_msg(template, "is a template", "are templates") - return 0 if template else 1 - if args.verbose: - print_msg(domains, "exists", "exist") - return 0 if domains else 1 + print_msg(log, filtered_domains, status) + else: + if not domains: + return_code = 1 + elif args.verbose: + print_msg(log, domains, ["exists"]) + + return return_code + if __name__ == '__main__': sys.exit(main())