From dc3fcc3141bf40b89f9f25aa2226bde9889f8135 Mon Sep 17 00:00:00 2001 From: Wojtek Porczyk Date: Sat, 17 Oct 2015 00:10:15 +0200 Subject: [PATCH] qubes/tests: port again the backported test runner part of QubesOS/qubes-issues#1248 --- .pylintrc | 2 +- qubes/tests/__init__.py | 263 ++++++++++++++++++++++++++++- qubes/tests/run.py | 201 ++++++++++++++++++---- tests/__init__.py | 364 ---------------------------------------- tests/run.py | 361 --------------------------------------- 5 files changed, 429 insertions(+), 762 deletions(-) delete mode 100755 tests/run.py diff --git a/.pylintrc b/.pylintrc index 07ad5a44..c48c0ef9 100644 --- a/.pylintrc +++ b/.pylintrc @@ -129,7 +129,7 @@ variable-rgx=[a-z_][a-z0-9_]{2,30}$ inlinevar-rgx=[A-Za-z_][A-Za-z0-9_]*$ # Good variable names which should always be accepted, separated by a comma -good-names=e,i,j,k,m,p,ex,Run,_,log,vm,xc,xs,ip,fd,fh,rw,st +good-names=e,i,j,k,m,p,ex,Run,_,log,vm,xc,xs,ip,fd,fh,rw,st,tb # Bad variable names which should always be refused, separated by a comma bad-names=foo,bar,baz,toto,tutu,tata diff --git a/qubes/tests/__init__.py b/qubes/tests/__init__.py index a1a93852..031c8493 100644 --- a/qubes/tests/__init__.py +++ b/qubes/tests/__init__.py @@ -1,10 +1,13 @@ #!/usr/bin/python2 -O # vim: fileencoding=utf-8 +# pylint: disable=invalid-name # # The Qubes OS Project, https://www.qubes-os.org/ # # Copyright (C) 2014-2015 Joanna Rutkowska +# Copyright (C) 2014-2015 +# Marek Marczykowski-Górecki # Copyright (C) 2014-2015 Wojtek Porczyk # # This program is free software; you can redistribute it and/or modify @@ -23,8 +26,12 @@ # import collections +import multiprocessing +import logging import os +import shutil import subprocess +import sys import unittest import lxml.etree @@ -32,6 +39,8 @@ import lxml.etree import qubes.config import qubes.events +VMPREFIX = 'test-' + #: :py:obj:`True` if running in dom0, :py:obj:`False` otherwise in_dom0 = False @@ -44,7 +53,6 @@ try: import libvirt libvirt.openReadOnly(qubes.config.defaults['libvirt_uri']).close() in_dom0 = True - del libvirt except libvirt.libvirtError: pass @@ -66,7 +74,7 @@ def skipUnlessDom0(test_item): Some tests (especially integration tests) have to be run in more or less working dom0. This is checked by connecting to libvirt. - ''' # pylint: disable=invalid-name + ''' return unittest.skipUnless(in_dom0, 'outside dom0')(test_item) @@ -76,7 +84,7 @@ def skipUnlessGit(test_item): There are very few tests that an be run only in git. One example is correctness of example code that won't get included in RPM. - ''' # pylint: disable=invalid-name + ''' return unittest.skipUnless(in_git, 'outside git tree')(test_item) @@ -113,10 +121,64 @@ class TestEmitter(qubes.events.Emitter): self.fired_events[(event, args, tuple(sorted(kwargs.items())))] += 1 +class _AssertNotRaisesContext(object): + """A context manager used to implement TestCase.assertNotRaises methods. + + Stolen from unittest and hacked. Regexp support stripped. + """ # pylint: disable=too-few-public-methods + + def __init__(self, expected, test_case, expected_regexp=None): + if expected_regexp is not None: + raise NotImplementedError('expected_regexp is unsupported') + + self.expected = expected + self.exception = None + + self.failureException = test_case.failureException + + + def __enter__(self): + return self + + + def __exit__(self, exc_type, exc_value, tb): + if exc_type is None: + return True + + try: + exc_name = self.expected.__name__ + except AttributeError: + exc_name = str(self.expected) + + if issubclass(exc_type, self.expected): + raise self.failureException( + "{0} raised".format(exc_name)) + else: + # pass through + return False + + self.exception = exc_value # store for later retrieval + + +class BeforeCleanExit(BaseException): + '''Raised from :py:meth:`QubesTestCase.tearDown` when + :py:attr:`qubes.tests.run.QubesDNCTestResult.do_not_clean` is set.''' + pass + + class QubesTestCase(unittest.TestCase): '''Base class for Qubes unit tests. ''' + def __init__(self, *args, **kwargs): + super(QubesTestCase, self).__init__(*args, **kwargs) + self.longMessage = True + self.log = logging.getLogger('{}.{}.{}'.format( + self.__class__.__module__, + self.__class__.__name__, + self._testMethodName)) + + def __str__(self): return '{}/{}/{}'.format( '.'.join(self.__class__.__module__.split('.')[2:]), @@ -124,6 +186,49 @@ class QubesTestCase(unittest.TestCase): self._testMethodName) + def tearDown(self): + super(QubesTestCase, self).tearDown() + + result = self._resultForDoCleanups + failed_test_cases = result.failures \ + + result.errors \ + + [(tc, None) for tc in result.unexpectedSuccesses] + + if getattr(result, 'do_not_clean', False) \ + and any(tc is self for tc, exc in failed_test_cases): + raise BeforeCleanExit() + + + def assertNotRaises(self, excClass, callableObj=None, *args, **kwargs): + """Fail if an exception of class excClass is raised + by callableObj when invoked with arguments args and keyword + arguments kwargs. If a different type of exception is + raised, it will not be caught, and the test case will be + deemed to have suffered an error, exactly as for an + unexpected exception. + + If called with callableObj omitted or None, will return a + context object used like this:: + + with self.assertRaises(SomeException): + do_something() + + The context manager keeps a reference to the exception as + the 'exception' attribute. This allows you to inspect the + exception after the assertion:: + + with self.assertRaises(SomeException) as cm: + do_something() + the_exception = cm.exception + self.assertEqual(the_exception.error_code, 3) + """ + context = _AssertNotRaisesContext(excClass, self) + if callableObj is None: + return context + with context: + callableObj(*args, **kwargs) + + def assertXMLEqual(self, xml1, xml2): '''Check for equality of two XML objects. @@ -131,7 +236,7 @@ class QubesTestCase(unittest.TestCase): :param xml2: second element :type xml1: :py:class:`lxml.etree._Element` :type xml2: :py:class:`lxml.etree._Element` - ''' # pylint: disable=invalid-name + ''' self.assertEqual(xml1.tag, xml2.tag) self.assertEqual(xml1.text, xml2.text) @@ -151,7 +256,7 @@ class QubesTestCase(unittest.TestCase): an event :param list kwargs: when given, all items must appear in kwargs passed \ to an event - ''' # pylint: disable=invalid-name + ''' for ev, ev_args, ev_kwargs in emitter.fired_events: if ev != event: @@ -176,7 +281,7 @@ class QubesTestCase(unittest.TestCase): an event :param list kwargs: when given, all items must appear in kwargs passed \ to an event - ''' # pylint: disable=invalid-name + ''' for ev, ev_args, ev_kwargs in emitter.fired_events: if ev != event: @@ -205,7 +310,7 @@ class QubesTestCase(unittest.TestCase): :param lxml.etree._Element xml: XML element instance to check :param str file: filename of Relax NG schema :param str schema: optional explicit schema string - ''' # pylint: disable=invalid-name,redefined-builtin + ''' # pylint: disable=redefined-builtin if schema is not None and file is None: relaxng = schema @@ -240,3 +345,147 @@ class QubesTestCase(unittest.TestCase): raise except AssertionError as e: self.fail(str(e)) + + +class SystemTestsMixin(object): + def setUp(self): + super(SystemTestsMixin, self).setUp() + self.remove_test_vms() + + tearDown = setUp + + + @staticmethod + def make_vm_name(name): + return VMPREFIX + name + + + def _remove_vm_qubes(self, vm): + vmname = vm.name + + try: + # XXX .is_running() may throw libvirtError if undefined + if vm.is_running(): + vm.force_shutdown() + except: # pylint: disable=bare-except + pass + + try: + vm.remove_from_disk() + except: # pylint: disable=bare-except + pass + + try: + vm.libvirt_domain.undefine() + except libvirt.libvirtError: + pass + + del self.app.domains[vm] + del vm + + # Now ensure it really went away. This may not have happened, + # for example if vm.libvirt_domain malfunctioned. + try: + dom = self.conn.lookupByName(vmname) + except: # pylint: disable=bare-except + pass + else: + self._remove_vm_libvirt(dom) + + self._remove_vm_disk(vmname) + + + @staticmethod + def _remove_vm_libvirt(dom): + try: + dom.destroy() + except libvirt.libvirtError: # not running + pass + dom.undefine() + + + @staticmethod + def _remove_vm_disk(vmname): + for dirspec in ( + 'qubes_appvms_dir', + 'qubes_servicevms_dir', + 'qubes_templates_dir'): + dirpath = os.path.join(qubes.config.system_path['qubes_base_dir'], + qubes.config.system_path[dirspec], vmname) + if os.path.exists(dirpath): + if os.path.isdir(dirpath): + shutil.rmtree(dirpath) + else: + os.unlink(dirpath) + + + def remove_vms(self, vms): + for vm in vms: + self._remove_vm_qubes(vm) + self.save_and_reload_db() + + + def remove_test_vms(self): + '''Aggresively remove any domain that has name in testing namespace. + + .. warning:: + The test suite hereby claims any domain whose name starts with + :py:data:`VMPREFIX` as fair game. This is needed to enforce sane + test executing environment. If you have domains named ``test-*``, + don't run the tests. + ''' + + # first, remove them Qubes-way + something_removed = False + for vm in self.app.domains: + if vm.name.startswith(VMPREFIX): + self._remove_vm_qubes(vm) + something_removed = True + if something_removed: + self.save_and_reload_db() + + # now remove what was only in libvirt + for dom in self.conn.listAllDomains(): + if dom.name().startswith(VMPREFIX): + self._remove_vm_libvirt(dom) + + # finally remove anything that is left on disk + vmnames = set() + for dirspec in ( + 'qubes_appvms_dir', + 'qubes_servicevms_dir', + 'qubes_templates_dir'): + dirpath = os.path.join(qubes.config.system_path['qubes_base_dir'], + qubes.config.system_path[dirspec]) + for name in os.listdir(dirpath): + if name.startswith(VMPREFIX): + vmnames.add(name) + for vmname in vmnames: + self._remove_vm_disk(vmname) + + +def load_tests(loader, tests, pattern): # pylint: disable=unused-argument + # discard any tests from this module, because it hosts base classes + tests = unittest.TestSuite() + + for modname in ( + # unit tests + 'qubes.tests.events', + 'qubes.tests.vm.init', + 'qubes.tests.vm.qubesvm', + 'qubes.tests.vm.adminvm', + 'qubes.tests.init', + 'qubes.tests.tools', + + # integration tests +# 'qubes.tests.init.basic', +# 'qubes.tests.dom0_update', +# 'qubes.tests.network', +# 'qubes.tests.vm_qrexec_gui', +# 'qubes.tests.backup', +# 'qubes.tests.backupcompatibility', +# 'qubes.tests.regressions', + ): + tests.addTests(loader.loadTestsFromName(modname)) + + return tests diff --git a/qubes/tests/run.py b/qubes/tests/run.py index 6e4efe04..1a207da7 100755 --- a/qubes/tests/run.py +++ b/qubes/tests/run.py @@ -22,28 +22,22 @@ # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. # +import argparse import curses +import logging +import logging.handlers +import os import socket +import subprocess import sys import unittest - -test_order = [ - 'qubes.tests.events', - 'qubes.tests.vm.init', - 'qubes.tests.vm.qubesvm', - 'qubes.tests.vm.adminvm', - 'qubes.tests.init', - - 'qubes.tests.tools', -] - -sys.path.insert(1, '../../') +import unittest.signals import qubes.tests -class ANSIColor(dict): +class CursesColor(dict): def __init__(self): - super(ANSIColor, self).__init__() + super(CursesColor, self).__init__() try: curses.setupterm() except curses.error: @@ -67,7 +61,7 @@ class ANSIColor(dict): return '' -class ANSITestResult(unittest.TestResult): +class QubesTestResult(unittest.TestResult): '''A test result class that can print colourful text results to a stream. Used by TextTestRunner. This is a lightly rewritten unittest.TextTestResult. @@ -77,15 +71,18 @@ class ANSITestResult(unittest.TestResult): separator2 = unittest.TextTestResult.separator2 def __init__(self, stream, descriptions, verbosity): - super(ANSITestResult, self).__init__(stream, descriptions, verbosity) + super(QubesTestResult, self).__init__(stream, descriptions, verbosity) self.stream = stream self.showAll = verbosity > 1 # pylint: disable=invalid-name self.dots = verbosity == 1 self.descriptions = descriptions - self.color = ANSIColor() + self.color = CursesColor() self.hostname = socket.gethostname() + self.log = logging.getLogger('qubes.tests') + + def _fmtexc(self, err): if str(err[1]): return '{color[bold]}{}:{color[normal]} {!s}'.format( @@ -114,7 +111,8 @@ class ANSITestResult(unittest.TestResult): return teststr def startTest(self, test): # pylint: disable=invalid-name - super(ANSITestResult, self).startTest(test) + super(QubesTestResult, self).startTest(test) + test.log.critical('started') if self.showAll: if not qubes.tests.in_git: self.stream.write('{}: '.format(self.hostname)) @@ -123,7 +121,8 @@ class ANSITestResult(unittest.TestResult): self.stream.flush() def addSuccess(self, test): # pylint: disable=invalid-name - super(ANSITestResult, self).addSuccess(test) + super(QubesTestResult, self).addSuccess(test) + test.log.warning('ok') if self.showAll: self.stream.writeln('{color[green]}ok{color[normal]}'.format( color=self.color)) @@ -132,7 +131,9 @@ class ANSITestResult(unittest.TestResult): self.stream.flush() def addError(self, test, err): # pylint: disable=invalid-name - super(ANSITestResult, self).addError(test, err) + super(QubesTestResult, self).addError(test, err) + test.log.critical('ERROR ({err[0].__name__}: {err[1]!r})'.format( + err=err)) if self.showAll: self.stream.writeln( '{color[red]}{color[bold]}ERROR{color[normal]} ({})'.format( @@ -144,7 +145,8 @@ class ANSITestResult(unittest.TestResult): self.stream.flush() def addFailure(self, test, err): # pylint: disable=invalid-name - super(ANSITestResult, self).addFailure(test, err) + super(QubesTestResult, self).addFailure(test, err) + test.log.error('FAIL ({err[0].__name__}: {err[1]!r})'.format(err=err)) if self.showAll: self.stream.writeln('{color[red]}FAIL{color[normal]}'.format( color=self.color)) @@ -154,7 +156,8 @@ class ANSITestResult(unittest.TestResult): self.stream.flush() def addSkip(self, test, reason): # pylint: disable=invalid-name - super(ANSITestResult, self).addSkip(test, reason) + super(QubesTestResult, self).addSkip(test, reason) + test.log.warning('skipped ({})'.format(reason)) if self.showAll: self.stream.writeln( '{color[cyan]}skipped{color[normal]} ({})'.format( @@ -165,7 +168,8 @@ class ANSITestResult(unittest.TestResult): self.stream.flush() def addExpectedFailure(self, test, err): # pylint: disable=invalid-name - super(ANSITestResult, self).addExpectedFailure(test, err) + super(QubesTestResult, self).addExpectedFailure(test, err) + test.log.warning('expected failure') if self.showAll: self.stream.writeln( '{color[yellow]}expected failure{color[normal]}'.format( @@ -176,7 +180,8 @@ class ANSITestResult(unittest.TestResult): self.stream.flush() def addUnexpectedSuccess(self, test): # pylint: disable=invalid-name - super(ANSITestResult, self).addUnexpectedSuccess(test) + super(QubesTestResult, self).addUnexpectedSuccess(test) + test.log.error('unexpected success') if self.showAll: self.stream.writeln( '{color[yellow]}{color[bold]}unexpected success' @@ -206,6 +211,10 @@ class ANSITestResult(unittest.TestResult): self.stream.writeln('%s' % err) +class QubesDNCTestResult(QubesTestResult): + do_not_clean = True + + def demo(verbosity=2): class TC_00_Demo(qubes.tests.QubesTestCase): '''Demo class''' @@ -237,19 +246,153 @@ def demo(verbosity=2): suite = unittest.TestLoader().loadTestsFromTestCase(TC_00_Demo) runner = unittest.TextTestRunner(stream=sys.stdout, verbosity=verbosity) - runner.resultclass = ANSITestResult + runner.resultclass = QubesTestResult return runner.run(suite).wasSuccessful() +parser = argparse.ArgumentParser( + epilog='''When running only specific tests, write their names like in log, + in format: MODULE+"/"+CLASS+"/"+FUNCTION. MODULE should omit initial + "qubes.tests.". Example: basic/TC_00_Basic/test_000_create''') + +parser.add_argument('--verbose', '-v', + action='count', + help='increase console verbosity level') +parser.add_argument('--quiet', '-q', + action='count', + help='decrease console verbosity level') + +parser.add_argument('--list', '-l', + action='store_true', dest='list', + help='list all available tests and exit') + +parser.add_argument('--failfast', '-f', + action='store_true', dest='failfast', + help='stop on the first fail, error or unexpected success') +parser.add_argument('--no-failfast', + action='store_false', dest='failfast', + help='disable --failfast') + +parser.add_argument('--do-not-clean', '--dnc', '-D', + action='store_true', dest='do_not_clean', + help='do not execute tearDown on failed tests. Implies --failfast.') +parser.add_argument('--do-clean', '-C', + action='store_false', dest='do_not_clean', + help='do execute tearDown even on failed tests.') + +# pylint: disable=protected-access +parser.add_argument('--loglevel', '-L', metavar='LEVEL', + action='store', choices=tuple(k + for k in sorted(logging._levelNames.keys(), + key=lambda x: logging._levelNames[x]) + if isinstance(k, basestring)), + help='logging level for file and syslog forwarding ' + '(one of: %(choices)s; default: %(default)s)') +# pylint: enable=protected-access + +parser.add_argument('--logfile', '-o', metavar='FILE', + action='store', + help='if set, test run will be also logged to file') + +parser.add_argument('--syslog', + action='store_true', dest='syslog', + help='reenable logging to syslog') +parser.add_argument('--no-syslog', + action='store_false', dest='syslog', + help='disable logging to syslog') + +parser.add_argument('--kmsg', '--very-brave-or-very-stupid', + action='store_true', dest='kmsg', + help='log most important things to kernel ring-buffer') +parser.add_argument('--no-kmsg', '--i-am-smarter-than-kay-sievers', + action='store_false', dest='kmsg', + help='do not abuse kernel ring-buffer') + +parser.add_argument('names', metavar='TESTNAME', + action='store', nargs='*', + help='list of tests to run named like in description ' + '(default: run all tests)') + +parser.set_defaults( + failfast=False, + loglevel='DEBUG', + logfile=None, + syslog=True, + kmsg=False, + verbose=2, + quiet=0) + + +def list_test_cases(suite): + for test in suite: + if isinstance(test, unittest.TestSuite): + #yield from + for i in list_test_cases(test): + yield i + else: + yield test + + def main(): + args = parser.parse_args() + suite = unittest.TestSuite() loader = unittest.TestLoader() - for modname in test_order: - suite.addTests(loader.loadTestsFromName(modname)) - runner = unittest.TextTestRunner(stream=sys.stdout, verbosity=2) - runner.resultclass = ANSITestResult + if args.names: + alltests = loader.loadTestsFromName('qubes.tests') + for name in args.names: + suite.addTests( + [test for test in list_test_cases(alltests) + if (str(test)+'/').startswith(name.replace('.', '/')+'/')]) + else: + suite.addTests(loader.loadTestsFromName('qubes.tests')) + + if args.list: + for test in list_test_cases(suite): + print(str(test)) # pylint: disable=superfluous-parens + return True + + if args.do_not_clean: + args.failfast = True + + logging.root.setLevel(args.loglevel) + + if args.logfile is not None: + ha_file = logging.FileHandler( + os.path.join(os.environ['HOME'], args.logfile)) + ha_file.setFormatter( + logging.Formatter('%(asctime)s %(name)s[%(process)d]: %(message)s')) + logging.root.addHandler(ha_file) + + if args.syslog: + ha_syslog = logging.handlers.SysLogHandler('/dev/log') + ha_syslog.setFormatter( + logging.Formatter('%(name)s[%(process)d]: %(message)s')) + logging.root.addHandler(ha_syslog) + + if args.kmsg: + try: + subprocess.check_call(('sudo', 'chmod', '666', '/dev/kmsg')) + except subprocess.CalledProcessError: + parser.error('could not chmod /dev/kmsg') + else: + ha_kmsg = logging.FileHandler('/dev/kmsg', 'w') + ha_kmsg.setFormatter( + logging.Formatter('%(name)s[%(process)d]: %(message)s')) + ha_kmsg.setLevel(logging.CRITICAL) + logging.root.addHandler(ha_kmsg) + + runner = unittest.TextTestRunner(stream=sys.stdout, + verbosity=(args.verbose-args.quiet), + failfast=args.failfast) + unittest.signals.installHandler() + + runner.resultclass = QubesDNCTestResult \ + if args.do_not_clean else QubesTestResult + return runner.run(suite).wasSuccessful() + if __name__ == '__main__': sys.exit(not main()) diff --git a/tests/__init__.py b/tests/__init__.py index c3c37401..dd786bea 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -1,346 +1,3 @@ -#!/usr/bin/python2 -O -# vim: fileencoding=utf-8 - -# -# The Qubes OS Project, https://www.qubes-os.org/ -# -# Copyright (C) 2014-2015 -# Marek Marczykowski-Górecki -# Copyright (C) 2015 Wojtek Porczyk -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. -# - -import multiprocessing -import logging -import os -import shutil -import subprocess -import unittest - -import lxml.etree -import sys - -import qubes.backup -import qubes.qubes - -VMPREFIX = 'test-' - - -#: :py:obj:`True` if running in dom0, :py:obj:`False` otherwise -in_dom0 = False - -#: :py:obj:`False` if outside of git repo, -#: path to root of the directory otherwise -in_git = False - -try: - import libvirt - libvirt.openReadOnly(qubes.qubes.defaults['libvirt_uri']).close() - in_dom0 = True -except libvirt.libvirtError: - pass - -try: - in_git = subprocess.check_output( - ['git', 'rev-parse', '--show-toplevel'], - stderr=open(os.devnull, 'w')).strip() -except subprocess.CalledProcessError: - # git returned nonzero, we are outside git repo - pass -except OSError: - # command not found; let's assume we're outside - pass - - -def skipUnlessDom0(test_item): - '''Decorator that skips test outside dom0. - - Some tests (especially integration tests) have to be run in more or less - working dom0. This is checked by connecting to libvirt. - ''' # pylint: disable=invalid-name - - return unittest.skipUnless(in_dom0, 'outside dom0')(test_item) - - -def skipUnlessGit(test_item): - '''Decorator that skips test outside git repo. - - There are very few tests that an be run only in git. One example is - correctness of example code that won't get included in RPM. - ''' # pylint: disable=invalid-name - - return unittest.skipUnless(in_git, 'outside git tree')(test_item) - - -class _AssertNotRaisesContext(object): - """A context manager used to implement TestCase.assertNotRaises methods. - - Stolen from unittest and hacked. Regexp support stripped. - """ - - def __init__(self, expected, test_case, expected_regexp=None): - self.expected = expected - self.failureException = test_case.failureException - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_value, tb): - if exc_type is None: - return True - - try: - exc_name = self.expected.__name__ - except AttributeError: - exc_name = str(self.expected) - - if issubclass(exc_type, self.expected): - raise self.failureException( - "{0} raised".format(exc_name)) - else: - # pass through - return False - - self.exception = exc_value # store for later retrieval - - -class BeforeCleanExit(BaseException): - pass - -class QubesTestCase(unittest.TestCase): - '''Base class for Qubes unit tests. - ''' - - def __init__(self, *args, **kwargs): - super(QubesTestCase, self).__init__(*args, **kwargs) - self.longMessage = True - self.log = logging.getLogger('{}.{}.{}'.format( - self.__class__.__module__, - self.__class__.__name__, - self._testMethodName)) - - - def __str__(self): - return '{}/{}/{}'.format( - '.'.join(self.__class__.__module__.split('.')[2:]), - self.__class__.__name__, - self._testMethodName) - - - def tearDown(self): - super(QubesTestCase, self).tearDown() - - result = self._resultForDoCleanups - l = result.failures \ - + result.errors \ - + [(tc, None) for tc in result.unexpectedSuccesses] - - if getattr(result, 'do_not_clean', False) \ - and filter((lambda (tc, exc): tc is self), l): - raise BeforeCleanExit() - - - def assertNotRaises(self, excClass, callableObj=None, *args, **kwargs): - """Fail if an exception of class excClass is raised - by callableObj when invoked with arguments args and keyword - arguments kwargs. If a different type of exception is - raised, it will not be caught, and the test case will be - deemed to have suffered an error, exactly as for an - unexpected exception. - - If called with callableObj omitted or None, will return a - context object used like this:: - - with self.assertRaises(SomeException): - do_something() - - The context manager keeps a reference to the exception as - the 'exception' attribute. This allows you to inspect the - exception after the assertion:: - - with self.assertRaises(SomeException) as cm: - do_something() - the_exception = cm.exception - self.assertEqual(the_exception.error_code, 3) - """ - context = _AssertNotRaisesContext(excClass, self) - if callableObj is None: - return context - with context: - callableObj(*args, **kwargs) - - - def assertXMLEqual(self, xml1, xml2): - '''Check for equality of two XML objects. - - :param xml1: first element - :param xml2: second element - :type xml1: :py:class:`lxml.etree._Element` - :type xml2: :py:class:`lxml.etree._Element` - ''' # pylint: disable=invalid-name - - self.assertEqual(xml1.tag, xml2.tag) - self.assertEqual(xml1.text, xml2.text) - self.assertItemsEqual(xml1.keys(), xml2.keys()) - for key in xml1.keys(): - self.assertEqual(xml1.get(key), xml2.get(key)) - - -class SystemTestsMixin(object): - def setUp(self): - '''Set up the test. - - .. warning:: - This method instantiates QubesVmCollection acquires write lock for - it. You can use is as :py:attr:`qc`. You can (and probably - should) release the lock at the end of setUp in subclass - ''' - - super(SystemTestsMixin, self).setUp() - - self.qc = qubes.qubes.QubesVmCollection() - self.qc.lock_db_for_writing() - self.qc.load() - - self.conn = libvirt.open(qubes.qubes.defaults['libvirt_uri']) - - self.remove_test_vms() - - - def tearDown(self): - super(SystemTestsMixin, self).tearDown() - - try: self.qc.lock_db_for_writing() - except qubes.qubes.QubesException: pass - self.qc.load() - - self.remove_test_vms() - - self.qc.save() - self.qc.unlock_db() - del self.qc - - self.conn.close() - - - def make_vm_name(self, name): - return VMPREFIX + name - - def save_and_reload_db(self): - self.qc.save() - self.qc.unlock_db() - self.qc.lock_db_for_writing() - self.qc.load() - - def _remove_vm_qubes(self, vm): - vmname = vm.name - - try: - # XXX .is_running() may throw libvirtError if undefined - if vm.is_running(): - vm.force_shutdown() - except: pass - - try: vm.remove_from_disk() - except: pass - - try: vm.libvirt_domain.undefine() - except libvirt.libvirtError: pass - - self.qc.pop(vm.qid) - del vm - - # Now ensure it really went away. This may not have happened, - # for example if vm.libvirtDomain malfunctioned. - try: - dom = self.conn.lookupByName(vmname) - except: pass - else: - self._remove_vm_libvirt(dom) - - self._remove_vm_disk(vmname) - - - def _remove_vm_libvirt(self, dom): - try: - dom.destroy() - except libvirt.libvirtError: # not running - pass - dom.undefine() - - - def _remove_vm_disk(self, vmname): - for dirspec in ( - 'qubes_appvms_dir', - 'qubes_servicevms_dir', - 'qubes_templates_dir'): - dirpath = os.path.join(qubes.qubes.system_path['qubes_base_dir'], - qubes.qubes.system_path[dirspec], vmname) - if os.path.exists(dirpath): - if os.path.isdir(dirpath): - shutil.rmtree(dirpath) - else: - os.unlink(dirpath) - - - def remove_vms(self, vms): - for vm in vms: self._remove_vm_qubes(vm) - self.save_and_reload_db() - - - def remove_test_vms(self): - '''Aggresively remove any domain that has name in testing namespace. - - .. warning:: - The test suite hereby claims any domain whose name starts with - :py:data:`VMPREFIX` as fair game. This is needed to enforce sane - test executing environment. If you have domains named ``test-*``, - don't run the tests. - ''' - - # first, remove them Qubes-way - something_removed = False - for vm in self.qc.values(): - if vm.name.startswith(VMPREFIX): - self._remove_vm_qubes(vm) - something_removed = True - if something_removed: - self.save_and_reload_db() - - # now remove what was only in libvirt - for dom in self.conn.listAllDomains(): - if dom.name().startswith(VMPREFIX): - self._remove_vm_libvirt(dom) - - # finally remove anything that is left on disk - vmnames = set() - for dirspec in ( - 'qubes_appvms_dir', - 'qubes_servicevms_dir', - 'qubes_templates_dir'): - dirpath = os.path.join(qubes.qubes.system_path['qubes_base_dir'], - qubes.qubes.system_path[dirspec]) - for name in os.listdir(dirpath): - if name.startswith(VMPREFIX): - vmnames.add(name) - for vmname in vmnames: - self._remove_vm_disk(vmname) - - - class BackupTestsMixin(SystemTestsMixin): def setUp(self): super(BackupTestsMixin, self).setUp() @@ -496,24 +153,3 @@ class BackupTestsMixin(SystemTestsMixin): f = open(path, "w") f.truncate(size) f.close() - - -def load_tests(loader, tests, pattern): - # discard any tests from this module, because it hosts base classes - tests = unittest.TestSuite() - - for modname in ( - 'qubes.tests.basic', - 'qubes.tests.dom0_update', - 'qubes.tests.network', - 'qubes.tests.vm_qrexec_gui', - 'qubes.tests.backup', - 'qubes.tests.backupcompatibility', - 'qubes.tests.regressions', - ): - tests.addTests(loader.loadTestsFromName(modname)) - - return tests - - -# vim: ts=4 sw=4 et diff --git a/tests/run.py b/tests/run.py deleted file mode 100755 index d4064898..00000000 --- a/tests/run.py +++ /dev/null @@ -1,361 +0,0 @@ -#!/usr/bin/python2 -O -# vim: fileencoding=utf-8 - -# -# The Qubes OS Project, https://www.qubes-os.org/ -# -# Copyright (C) 2014-2015 Joanna Rutkowska -# Copyright (C) 2014-2015 Wojtek Porczyk -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. -# - -import argparse -import curses -import importlib -import logging -import logging.handlers -import os -import socket -import subprocess -import sys -import unittest -import unittest.signals - -import qubes.tests - -class CursesColor(dict): - def __init__(self): - super(CursesColor, self).__init__() - try: - curses.setupterm() - except curses.error: - return - - # pylint: disable=bad-whitespace - self['black'] = curses.tparm(curses.tigetstr('setaf'), 0) - self['red'] = curses.tparm(curses.tigetstr('setaf'), 1) - self['green'] = curses.tparm(curses.tigetstr('setaf'), 2) - self['yellow'] = curses.tparm(curses.tigetstr('setaf'), 3) - self['blue'] = curses.tparm(curses.tigetstr('setaf'), 4) - self['magenta'] = curses.tparm(curses.tigetstr('setaf'), 5) - self['cyan'] = curses.tparm(curses.tigetstr('setaf'), 6) - self['white'] = curses.tparm(curses.tigetstr('setaf'), 7) - - self['bold'] = curses.tigetstr('bold') - self['normal'] = curses.tigetstr('sgr0') - - def __missing__(self, key): - # pylint: disable=unused-argument,no-self-use - return '' - - -class QubesTestResult(unittest.TestResult): - '''A test result class that can print colourful text results to a stream. - - Used by TextTestRunner. This is a lightly rewritten unittest.TextTestResult. - ''' - - separator1 = unittest.TextTestResult.separator1 - separator2 = unittest.TextTestResult.separator2 - - def __init__(self, stream, descriptions, verbosity): - super(QubesTestResult, self).__init__(stream, descriptions, verbosity) - self.stream = stream - self.showAll = verbosity > 1 # pylint: disable=invalid-name - self.dots = verbosity == 1 - self.descriptions = descriptions - - self.color = CursesColor() - self.hostname = socket.gethostname() - - self.log = logging.getLogger('qubes.tests') - - - def _fmtexc(self, err): - if str(err[1]): - return '{color[bold]}{}:{color[normal]} {!s}'.format( - err[0].__name__, err[1], color=self.color) - else: - return '{color[bold]}{}{color[normal]}'.format( - err[0].__name__, color=self.color) - - def getDescription(self, test): # pylint: disable=invalid-name - teststr = str(test).split('/') - for i in range(-2, 0): - try: - fullname = teststr[i].split('_', 2) - except IndexError: - continue - fullname[-1] = '{color[bold]}{}{color[normal]}'.format( - fullname[-1], color=self.color) - teststr[i] = '_'.join(fullname) - teststr = '/'.join(teststr) - - doc_first_line = test.shortDescription() - if self.descriptions and doc_first_line: - return '\n'.join((teststr, ' {}'.format( - doc_first_line, color=self.color))) - else: - return teststr - - def startTest(self, test): # pylint: disable=invalid-name - super(QubesTestResult, self).startTest(test) - test.log.critical('started') - if self.showAll: -# if not qubes.tests.in_git: - self.stream.write('{}: '.format(self.hostname)) - self.stream.write(self.getDescription(test)) - self.stream.write(' ... ') - self.stream.flush() - - def addSuccess(self, test): # pylint: disable=invalid-name - super(QubesTestResult, self).addSuccess(test) - test.log.warning('ok') - if self.showAll: - self.stream.writeln('{color[green]}ok{color[normal]}'.format( - color=self.color)) - elif self.dots: - self.stream.write('.') - self.stream.flush() - - def addError(self, test, err): # pylint: disable=invalid-name - super(QubesTestResult, self).addError(test, err) - test.log.critical('ERROR ({err[0].__name__}: {err[1]!r})'.format(err=err)) - if self.showAll: - self.stream.writeln( - '{color[red]}{color[bold]}ERROR{color[normal]} ({})'.format( - self._fmtexc(err), color=self.color)) - elif self.dots: - self.stream.write( - '{color[red]}{color[bold]}E{color[normal]}'.format( - color=self.color)) - self.stream.flush() - - def addFailure(self, test, err): # pylint: disable=invalid-name - super(QubesTestResult, self).addFailure(test, err) - test.log.error('FAIL ({err[0].__name__}: {err[1]!r})'.format(err=err)) - if self.showAll: - self.stream.writeln('{color[red]}FAIL{color[normal]}'.format( - color=self.color)) - elif self.dots: - self.stream.write('{color[red]}F{color[normal]}'.format( - color=self.color)) - self.stream.flush() - - def addSkip(self, test, reason): # pylint: disable=invalid-name - super(QubesTestResult, self).addSkip(test, reason) - test.log.warning('skipped ({})'.format(reason)) - if self.showAll: - self.stream.writeln( - '{color[cyan]}skipped{color[normal]} ({})'.format( - reason, color=self.color)) - elif self.dots: - self.stream.write('{color[cyan]}s{color[normal]}'.format( - color=self.color)) - self.stream.flush() - - def addExpectedFailure(self, test, err): # pylint: disable=invalid-name - super(QubesTestResult, self).addExpectedFailure(test, err) - test.log.warning('expected failure') - if self.showAll: - self.stream.writeln( - '{color[yellow]}expected failure{color[normal]}'.format( - color=self.color)) - elif self.dots: - self.stream.write('{color[yellow]}x{color[normal]}'.format( - color=self.color)) - self.stream.flush() - - def addUnexpectedSuccess(self, test): # pylint: disable=invalid-name - super(QubesTestResult, self).addUnexpectedSuccess(test) - test.log.error('unexpected success') - if self.showAll: - self.stream.writeln( - '{color[yellow]}{color[bold]}unexpected success' - '{color[normal]}'.format(color=self.color)) - elif self.dots: - self.stream.write( - '{color[yellow]}{color[bold]}u{color[normal]}'.format( - color=self.color)) - self.stream.flush() - - def printErrors(self): # pylint: disable=invalid-name - if self.dots or self.showAll: - self.stream.writeln() - self.printErrorList( - '{color[red]}{color[bold]}ERROR{color[normal]}'.format( - color=self.color), - self.errors) - self.printErrorList( - '{color[red]}FAIL{color[normal]}'.format(color=self.color), - self.failures) - - def printErrorList(self, flavour, errors): # pylint: disable=invalid-name - for test, err in errors: - self.stream.writeln(self.separator1) - self.stream.writeln('%s: %s' % (flavour, self.getDescription(test))) - self.stream.writeln(self.separator2) - self.stream.writeln('%s' % err) - - -class QubesDNCTestResult(QubesTestResult): - do_not_clean = True - - -parser = argparse.ArgumentParser( - epilog='''When running only specific tests, write their names like in log, - in format: MODULE+"/"+CLASS+"/"+FUNCTION. MODULE should omit initial - "qubes.tests.". Example: basic/TC_00_Basic/test_000_create''') - -parser.add_argument('--verbose', '-v', - action='count', - help='increase console verbosity level') -parser.add_argument('--quiet', '-q', - action='count', - help='decrease console verbosity level') - -parser.add_argument('--list', '-l', - action='store_true', dest='list', - help='list all available tests and exit') - -parser.add_argument('--failfast', '-f', - action='store_true', dest='failfast', - help='stop on the first fail, error or unexpected success') -parser.add_argument('--no-failfast', - action='store_false', dest='failfast', - help='disable --failfast') - -parser.add_argument('--do-not-clean', '--dnc', '-D', - action='store_true', dest='do_not_clean', - help='do not execute tearDown on failed tests. Implies --failfast.') -parser.add_argument('--do-clean', '-C', - action='store_false', dest='do_not_clean', - help='do execute tearDown even on failed tests.') - -parser.add_argument('--loglevel', '-L', metavar='LEVEL', - action='store', choices=tuple(k - for k in sorted(logging._levelNames.keys(), - key=lambda x: logging._levelNames[x]) - if isinstance(k, basestring)), - help='logging level for file and syslog forwarding ' - '(one of: %(choices)s; default: %(default)s)') - -parser.add_argument('--logfile', '-o', metavar='FILE', - action='store', - help='if set, test run will be also logged to file') - -parser.add_argument('--syslog', - action='store_true', dest='syslog', - help='reenable logging to syslog') -parser.add_argument('--no-syslog', - action='store_false', dest='syslog', - help='disable logging to syslog') - -parser.add_argument('--kmsg', '--very-brave-or-very-stupid', - action='store_true', dest='kmsg', - help='log most important things to kernel ring-buffer') -parser.add_argument('--no-kmsg', '--i-am-smarter-than-kay-sievers', - action='store_false', dest='kmsg', - help='do not abuse kernel ring-buffer') - -parser.add_argument('names', metavar='TESTNAME', - action='store', nargs='*', - help='list of tests to run named like in description ' - '(default: run all tests)') - -parser.set_defaults( - failfast=False, - loglevel='DEBUG', - logfile=None, - syslog=True, - kmsg=False, - verbose=2, - quiet=0) - - -def list_test_cases(suite): - for test in suite: - if isinstance(test, unittest.TestSuite): - #yield from - for i in list_test_cases(test): - yield i - else: - yield test - - -def main(): - args = parser.parse_args() - - suite = unittest.TestSuite() - loader = unittest.TestLoader() - - if args.names: - alltests = loader.loadTestsFromName('qubes.tests') - for name in args.names: - suite.addTests( - [test for test in list_test_cases(alltests) - if (str(test)+'/').startswith(name.replace('.', '/')+'/')]) - else: - suite.addTests(loader.loadTestsFromName('qubes.tests')) - - if args.list: - for test in list_test_cases(suite): - print(str(test)) - return True - - if args.do_not_clean: - args.failfast = True - - logging.root.setLevel(args.loglevel) - - if args.logfile is not None: - ha_file = logging.FileHandler( - os.path.join(os.environ['HOME'], args.logfile)) - ha_file.setFormatter( - logging.Formatter('%(asctime)s %(name)s[%(process)d]: %(message)s')) - logging.root.addHandler(ha_file) - - if args.syslog: - ha_syslog = logging.handlers.SysLogHandler('/dev/log') - ha_syslog.setFormatter( - logging.Formatter('%(name)s[%(process)d]: %(message)s')) - logging.root.addHandler(ha_syslog) - - if args.kmsg: - try: - subprocess.check_call(('sudo', 'chmod', '666', '/dev/kmsg')) - except subprocess.CalledProcessError: - parser.error('could not chmod /dev/kmsg') - else: - ha_kmsg = logging.FileHandler('/dev/kmsg', 'w') - ha_kmsg.setFormatter( - logging.Formatter('%(name)s[%(process)d]: %(message)s')) - ha_kmsg.setLevel(logging.CRITICAL) - logging.root.addHandler(ha_kmsg) - - runner = unittest.TextTestRunner(stream=sys.stdout, - verbosity=(args.verbose-args.quiet), - failfast=args.failfast) - unittest.signals.installHandler() - - runner.resultclass = QubesDNCTestResult \ - if args.do_not_clean else QubesTestResult - - return runner.run(suite).wasSuccessful() - - -if __name__ == '__main__': - sys.exit(not main())