qubes/tests: port again the backported test runner
part of QubesOS/qubes-issues#1248
This commit is contained in:
parent
2e42a408e5
commit
dc3fcc3141
@ -129,7 +129,7 @@ variable-rgx=[a-z_][a-z0-9_]{2,30}$
|
||||
inlinevar-rgx=[A-Za-z_][A-Za-z0-9_]*$
|
||||
|
||||
# Good variable names which should always be accepted, separated by a comma
|
||||
good-names=e,i,j,k,m,p,ex,Run,_,log,vm,xc,xs,ip,fd,fh,rw,st
|
||||
good-names=e,i,j,k,m,p,ex,Run,_,log,vm,xc,xs,ip,fd,fh,rw,st,tb
|
||||
|
||||
# Bad variable names which should always be refused, separated by a comma
|
||||
bad-names=foo,bar,baz,toto,tutu,tata
|
||||
|
@ -1,10 +1,13 @@
|
||||
#!/usr/bin/python2 -O
|
||||
# vim: fileencoding=utf-8
|
||||
# pylint: disable=invalid-name
|
||||
|
||||
#
|
||||
# The Qubes OS Project, https://www.qubes-os.org/
|
||||
#
|
||||
# Copyright (C) 2014-2015 Joanna Rutkowska <joanna@invisiblethingslab.com>
|
||||
# Copyright (C) 2014-2015
|
||||
# Marek Marczykowski-Górecki <marmarek@invisiblethingslab.com>
|
||||
# Copyright (C) 2014-2015 Wojtek Porczyk <woju@invisiblethingslab.com>
|
||||
#
|
||||
# This program is free software; you can redistribute it and/or modify
|
||||
@ -23,8 +26,12 @@
|
||||
#
|
||||
|
||||
import collections
|
||||
import multiprocessing
|
||||
import logging
|
||||
import os
|
||||
import shutil
|
||||
import subprocess
|
||||
import sys
|
||||
import unittest
|
||||
|
||||
import lxml.etree
|
||||
@ -32,6 +39,8 @@ import lxml.etree
|
||||
import qubes.config
|
||||
import qubes.events
|
||||
|
||||
VMPREFIX = 'test-'
|
||||
|
||||
|
||||
#: :py:obj:`True` if running in dom0, :py:obj:`False` otherwise
|
||||
in_dom0 = False
|
||||
@ -44,7 +53,6 @@ try:
|
||||
import libvirt
|
||||
libvirt.openReadOnly(qubes.config.defaults['libvirt_uri']).close()
|
||||
in_dom0 = True
|
||||
del libvirt
|
||||
except libvirt.libvirtError:
|
||||
pass
|
||||
|
||||
@ -66,7 +74,7 @@ def skipUnlessDom0(test_item):
|
||||
|
||||
Some tests (especially integration tests) have to be run in more or less
|
||||
working dom0. This is checked by connecting to libvirt.
|
||||
''' # pylint: disable=invalid-name
|
||||
'''
|
||||
|
||||
return unittest.skipUnless(in_dom0, 'outside dom0')(test_item)
|
||||
|
||||
@ -76,7 +84,7 @@ def skipUnlessGit(test_item):
|
||||
|
||||
There are very few tests that an be run only in git. One example is
|
||||
correctness of example code that won't get included in RPM.
|
||||
''' # pylint: disable=invalid-name
|
||||
'''
|
||||
|
||||
return unittest.skipUnless(in_git, 'outside git tree')(test_item)
|
||||
|
||||
@ -113,10 +121,64 @@ class TestEmitter(qubes.events.Emitter):
|
||||
self.fired_events[(event, args, tuple(sorted(kwargs.items())))] += 1
|
||||
|
||||
|
||||
class _AssertNotRaisesContext(object):
|
||||
"""A context manager used to implement TestCase.assertNotRaises methods.
|
||||
|
||||
Stolen from unittest and hacked. Regexp support stripped.
|
||||
""" # pylint: disable=too-few-public-methods
|
||||
|
||||
def __init__(self, expected, test_case, expected_regexp=None):
|
||||
if expected_regexp is not None:
|
||||
raise NotImplementedError('expected_regexp is unsupported')
|
||||
|
||||
self.expected = expected
|
||||
self.exception = None
|
||||
|
||||
self.failureException = test_case.failureException
|
||||
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
|
||||
def __exit__(self, exc_type, exc_value, tb):
|
||||
if exc_type is None:
|
||||
return True
|
||||
|
||||
try:
|
||||
exc_name = self.expected.__name__
|
||||
except AttributeError:
|
||||
exc_name = str(self.expected)
|
||||
|
||||
if issubclass(exc_type, self.expected):
|
||||
raise self.failureException(
|
||||
"{0} raised".format(exc_name))
|
||||
else:
|
||||
# pass through
|
||||
return False
|
||||
|
||||
self.exception = exc_value # store for later retrieval
|
||||
|
||||
|
||||
class BeforeCleanExit(BaseException):
|
||||
'''Raised from :py:meth:`QubesTestCase.tearDown` when
|
||||
:py:attr:`qubes.tests.run.QubesDNCTestResult.do_not_clean` is set.'''
|
||||
pass
|
||||
|
||||
|
||||
class QubesTestCase(unittest.TestCase):
|
||||
'''Base class for Qubes unit tests.
|
||||
'''
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(QubesTestCase, self).__init__(*args, **kwargs)
|
||||
self.longMessage = True
|
||||
self.log = logging.getLogger('{}.{}.{}'.format(
|
||||
self.__class__.__module__,
|
||||
self.__class__.__name__,
|
||||
self._testMethodName))
|
||||
|
||||
|
||||
def __str__(self):
|
||||
return '{}/{}/{}'.format(
|
||||
'.'.join(self.__class__.__module__.split('.')[2:]),
|
||||
@ -124,6 +186,49 @@ class QubesTestCase(unittest.TestCase):
|
||||
self._testMethodName)
|
||||
|
||||
|
||||
def tearDown(self):
|
||||
super(QubesTestCase, self).tearDown()
|
||||
|
||||
result = self._resultForDoCleanups
|
||||
failed_test_cases = result.failures \
|
||||
+ result.errors \
|
||||
+ [(tc, None) for tc in result.unexpectedSuccesses]
|
||||
|
||||
if getattr(result, 'do_not_clean', False) \
|
||||
and any(tc is self for tc, exc in failed_test_cases):
|
||||
raise BeforeCleanExit()
|
||||
|
||||
|
||||
def assertNotRaises(self, excClass, callableObj=None, *args, **kwargs):
|
||||
"""Fail if an exception of class excClass is raised
|
||||
by callableObj when invoked with arguments args and keyword
|
||||
arguments kwargs. If a different type of exception is
|
||||
raised, it will not be caught, and the test case will be
|
||||
deemed to have suffered an error, exactly as for an
|
||||
unexpected exception.
|
||||
|
||||
If called with callableObj omitted or None, will return a
|
||||
context object used like this::
|
||||
|
||||
with self.assertRaises(SomeException):
|
||||
do_something()
|
||||
|
||||
The context manager keeps a reference to the exception as
|
||||
the 'exception' attribute. This allows you to inspect the
|
||||
exception after the assertion::
|
||||
|
||||
with self.assertRaises(SomeException) as cm:
|
||||
do_something()
|
||||
the_exception = cm.exception
|
||||
self.assertEqual(the_exception.error_code, 3)
|
||||
"""
|
||||
context = _AssertNotRaisesContext(excClass, self)
|
||||
if callableObj is None:
|
||||
return context
|
||||
with context:
|
||||
callableObj(*args, **kwargs)
|
||||
|
||||
|
||||
def assertXMLEqual(self, xml1, xml2):
|
||||
'''Check for equality of two XML objects.
|
||||
|
||||
@ -131,7 +236,7 @@ class QubesTestCase(unittest.TestCase):
|
||||
:param xml2: second element
|
||||
:type xml1: :py:class:`lxml.etree._Element`
|
||||
:type xml2: :py:class:`lxml.etree._Element`
|
||||
''' # pylint: disable=invalid-name
|
||||
'''
|
||||
|
||||
self.assertEqual(xml1.tag, xml2.tag)
|
||||
self.assertEqual(xml1.text, xml2.text)
|
||||
@ -151,7 +256,7 @@ class QubesTestCase(unittest.TestCase):
|
||||
an event
|
||||
:param list kwargs: when given, all items must appear in kwargs passed \
|
||||
to an event
|
||||
''' # pylint: disable=invalid-name
|
||||
'''
|
||||
|
||||
for ev, ev_args, ev_kwargs in emitter.fired_events:
|
||||
if ev != event:
|
||||
@ -176,7 +281,7 @@ class QubesTestCase(unittest.TestCase):
|
||||
an event
|
||||
:param list kwargs: when given, all items must appear in kwargs passed \
|
||||
to an event
|
||||
''' # pylint: disable=invalid-name
|
||||
'''
|
||||
|
||||
for ev, ev_args, ev_kwargs in emitter.fired_events:
|
||||
if ev != event:
|
||||
@ -205,7 +310,7 @@ class QubesTestCase(unittest.TestCase):
|
||||
:param lxml.etree._Element xml: XML element instance to check
|
||||
:param str file: filename of Relax NG schema
|
||||
:param str schema: optional explicit schema string
|
||||
''' # pylint: disable=invalid-name,redefined-builtin
|
||||
''' # pylint: disable=redefined-builtin
|
||||
|
||||
if schema is not None and file is None:
|
||||
relaxng = schema
|
||||
@ -240,3 +345,147 @@ class QubesTestCase(unittest.TestCase):
|
||||
raise
|
||||
except AssertionError as e:
|
||||
self.fail(str(e))
|
||||
|
||||
|
||||
class SystemTestsMixin(object):
|
||||
def setUp(self):
|
||||
super(SystemTestsMixin, self).setUp()
|
||||
self.remove_test_vms()
|
||||
|
||||
tearDown = setUp
|
||||
|
||||
|
||||
@staticmethod
|
||||
def make_vm_name(name):
|
||||
return VMPREFIX + name
|
||||
|
||||
|
||||
def _remove_vm_qubes(self, vm):
|
||||
vmname = vm.name
|
||||
|
||||
try:
|
||||
# XXX .is_running() may throw libvirtError if undefined
|
||||
if vm.is_running():
|
||||
vm.force_shutdown()
|
||||
except: # pylint: disable=bare-except
|
||||
pass
|
||||
|
||||
try:
|
||||
vm.remove_from_disk()
|
||||
except: # pylint: disable=bare-except
|
||||
pass
|
||||
|
||||
try:
|
||||
vm.libvirt_domain.undefine()
|
||||
except libvirt.libvirtError:
|
||||
pass
|
||||
|
||||
del self.app.domains[vm]
|
||||
del vm
|
||||
|
||||
# Now ensure it really went away. This may not have happened,
|
||||
# for example if vm.libvirt_domain malfunctioned.
|
||||
try:
|
||||
dom = self.conn.lookupByName(vmname)
|
||||
except: # pylint: disable=bare-except
|
||||
pass
|
||||
else:
|
||||
self._remove_vm_libvirt(dom)
|
||||
|
||||
self._remove_vm_disk(vmname)
|
||||
|
||||
|
||||
@staticmethod
|
||||
def _remove_vm_libvirt(dom):
|
||||
try:
|
||||
dom.destroy()
|
||||
except libvirt.libvirtError: # not running
|
||||
pass
|
||||
dom.undefine()
|
||||
|
||||
|
||||
@staticmethod
|
||||
def _remove_vm_disk(vmname):
|
||||
for dirspec in (
|
||||
'qubes_appvms_dir',
|
||||
'qubes_servicevms_dir',
|
||||
'qubes_templates_dir'):
|
||||
dirpath = os.path.join(qubes.config.system_path['qubes_base_dir'],
|
||||
qubes.config.system_path[dirspec], vmname)
|
||||
if os.path.exists(dirpath):
|
||||
if os.path.isdir(dirpath):
|
||||
shutil.rmtree(dirpath)
|
||||
else:
|
||||
os.unlink(dirpath)
|
||||
|
||||
|
||||
def remove_vms(self, vms):
|
||||
for vm in vms:
|
||||
self._remove_vm_qubes(vm)
|
||||
self.save_and_reload_db()
|
||||
|
||||
|
||||
def remove_test_vms(self):
|
||||
'''Aggresively remove any domain that has name in testing namespace.
|
||||
|
||||
.. warning::
|
||||
The test suite hereby claims any domain whose name starts with
|
||||
:py:data:`VMPREFIX` as fair game. This is needed to enforce sane
|
||||
test executing environment. If you have domains named ``test-*``,
|
||||
don't run the tests.
|
||||
'''
|
||||
|
||||
# first, remove them Qubes-way
|
||||
something_removed = False
|
||||
for vm in self.app.domains:
|
||||
if vm.name.startswith(VMPREFIX):
|
||||
self._remove_vm_qubes(vm)
|
||||
something_removed = True
|
||||
if something_removed:
|
||||
self.save_and_reload_db()
|
||||
|
||||
# now remove what was only in libvirt
|
||||
for dom in self.conn.listAllDomains():
|
||||
if dom.name().startswith(VMPREFIX):
|
||||
self._remove_vm_libvirt(dom)
|
||||
|
||||
# finally remove anything that is left on disk
|
||||
vmnames = set()
|
||||
for dirspec in (
|
||||
'qubes_appvms_dir',
|
||||
'qubes_servicevms_dir',
|
||||
'qubes_templates_dir'):
|
||||
dirpath = os.path.join(qubes.config.system_path['qubes_base_dir'],
|
||||
qubes.config.system_path[dirspec])
|
||||
for name in os.listdir(dirpath):
|
||||
if name.startswith(VMPREFIX):
|
||||
vmnames.add(name)
|
||||
for vmname in vmnames:
|
||||
self._remove_vm_disk(vmname)
|
||||
|
||||
|
||||
def load_tests(loader, tests, pattern): # pylint: disable=unused-argument
|
||||
# discard any tests from this module, because it hosts base classes
|
||||
tests = unittest.TestSuite()
|
||||
|
||||
for modname in (
|
||||
# unit tests
|
||||
'qubes.tests.events',
|
||||
'qubes.tests.vm.init',
|
||||
'qubes.tests.vm.qubesvm',
|
||||
'qubes.tests.vm.adminvm',
|
||||
'qubes.tests.init',
|
||||
'qubes.tests.tools',
|
||||
|
||||
# integration tests
|
||||
# 'qubes.tests.init.basic',
|
||||
# 'qubes.tests.dom0_update',
|
||||
# 'qubes.tests.network',
|
||||
# 'qubes.tests.vm_qrexec_gui',
|
||||
# 'qubes.tests.backup',
|
||||
# 'qubes.tests.backupcompatibility',
|
||||
# 'qubes.tests.regressions',
|
||||
):
|
||||
tests.addTests(loader.loadTestsFromName(modname))
|
||||
|
||||
return tests
|
||||
|
@ -22,28 +22,22 @@
|
||||
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
|
||||
#
|
||||
|
||||
import argparse
|
||||
import curses
|
||||
import logging
|
||||
import logging.handlers
|
||||
import os
|
||||
import socket
|
||||
import subprocess
|
||||
import sys
|
||||
import unittest
|
||||
|
||||
test_order = [
|
||||
'qubes.tests.events',
|
||||
'qubes.tests.vm.init',
|
||||
'qubes.tests.vm.qubesvm',
|
||||
'qubes.tests.vm.adminvm',
|
||||
'qubes.tests.init',
|
||||
|
||||
'qubes.tests.tools',
|
||||
]
|
||||
|
||||
sys.path.insert(1, '../../')
|
||||
import unittest.signals
|
||||
|
||||
import qubes.tests
|
||||
|
||||
class ANSIColor(dict):
|
||||
class CursesColor(dict):
|
||||
def __init__(self):
|
||||
super(ANSIColor, self).__init__()
|
||||
super(CursesColor, self).__init__()
|
||||
try:
|
||||
curses.setupterm()
|
||||
except curses.error:
|
||||
@ -67,7 +61,7 @@ class ANSIColor(dict):
|
||||
return ''
|
||||
|
||||
|
||||
class ANSITestResult(unittest.TestResult):
|
||||
class QubesTestResult(unittest.TestResult):
|
||||
'''A test result class that can print colourful text results to a stream.
|
||||
|
||||
Used by TextTestRunner. This is a lightly rewritten unittest.TextTestResult.
|
||||
@ -77,15 +71,18 @@ class ANSITestResult(unittest.TestResult):
|
||||
separator2 = unittest.TextTestResult.separator2
|
||||
|
||||
def __init__(self, stream, descriptions, verbosity):
|
||||
super(ANSITestResult, self).__init__(stream, descriptions, verbosity)
|
||||
super(QubesTestResult, self).__init__(stream, descriptions, verbosity)
|
||||
self.stream = stream
|
||||
self.showAll = verbosity > 1 # pylint: disable=invalid-name
|
||||
self.dots = verbosity == 1
|
||||
self.descriptions = descriptions
|
||||
|
||||
self.color = ANSIColor()
|
||||
self.color = CursesColor()
|
||||
self.hostname = socket.gethostname()
|
||||
|
||||
self.log = logging.getLogger('qubes.tests')
|
||||
|
||||
|
||||
def _fmtexc(self, err):
|
||||
if str(err[1]):
|
||||
return '{color[bold]}{}:{color[normal]} {!s}'.format(
|
||||
@ -114,7 +111,8 @@ class ANSITestResult(unittest.TestResult):
|
||||
return teststr
|
||||
|
||||
def startTest(self, test): # pylint: disable=invalid-name
|
||||
super(ANSITestResult, self).startTest(test)
|
||||
super(QubesTestResult, self).startTest(test)
|
||||
test.log.critical('started')
|
||||
if self.showAll:
|
||||
if not qubes.tests.in_git:
|
||||
self.stream.write('{}: '.format(self.hostname))
|
||||
@ -123,7 +121,8 @@ class ANSITestResult(unittest.TestResult):
|
||||
self.stream.flush()
|
||||
|
||||
def addSuccess(self, test): # pylint: disable=invalid-name
|
||||
super(ANSITestResult, self).addSuccess(test)
|
||||
super(QubesTestResult, self).addSuccess(test)
|
||||
test.log.warning('ok')
|
||||
if self.showAll:
|
||||
self.stream.writeln('{color[green]}ok{color[normal]}'.format(
|
||||
color=self.color))
|
||||
@ -132,7 +131,9 @@ class ANSITestResult(unittest.TestResult):
|
||||
self.stream.flush()
|
||||
|
||||
def addError(self, test, err): # pylint: disable=invalid-name
|
||||
super(ANSITestResult, self).addError(test, err)
|
||||
super(QubesTestResult, self).addError(test, err)
|
||||
test.log.critical('ERROR ({err[0].__name__}: {err[1]!r})'.format(
|
||||
err=err))
|
||||
if self.showAll:
|
||||
self.stream.writeln(
|
||||
'{color[red]}{color[bold]}ERROR{color[normal]} ({})'.format(
|
||||
@ -144,7 +145,8 @@ class ANSITestResult(unittest.TestResult):
|
||||
self.stream.flush()
|
||||
|
||||
def addFailure(self, test, err): # pylint: disable=invalid-name
|
||||
super(ANSITestResult, self).addFailure(test, err)
|
||||
super(QubesTestResult, self).addFailure(test, err)
|
||||
test.log.error('FAIL ({err[0].__name__}: {err[1]!r})'.format(err=err))
|
||||
if self.showAll:
|
||||
self.stream.writeln('{color[red]}FAIL{color[normal]}'.format(
|
||||
color=self.color))
|
||||
@ -154,7 +156,8 @@ class ANSITestResult(unittest.TestResult):
|
||||
self.stream.flush()
|
||||
|
||||
def addSkip(self, test, reason): # pylint: disable=invalid-name
|
||||
super(ANSITestResult, self).addSkip(test, reason)
|
||||
super(QubesTestResult, self).addSkip(test, reason)
|
||||
test.log.warning('skipped ({})'.format(reason))
|
||||
if self.showAll:
|
||||
self.stream.writeln(
|
||||
'{color[cyan]}skipped{color[normal]} ({})'.format(
|
||||
@ -165,7 +168,8 @@ class ANSITestResult(unittest.TestResult):
|
||||
self.stream.flush()
|
||||
|
||||
def addExpectedFailure(self, test, err): # pylint: disable=invalid-name
|
||||
super(ANSITestResult, self).addExpectedFailure(test, err)
|
||||
super(QubesTestResult, self).addExpectedFailure(test, err)
|
||||
test.log.warning('expected failure')
|
||||
if self.showAll:
|
||||
self.stream.writeln(
|
||||
'{color[yellow]}expected failure{color[normal]}'.format(
|
||||
@ -176,7 +180,8 @@ class ANSITestResult(unittest.TestResult):
|
||||
self.stream.flush()
|
||||
|
||||
def addUnexpectedSuccess(self, test): # pylint: disable=invalid-name
|
||||
super(ANSITestResult, self).addUnexpectedSuccess(test)
|
||||
super(QubesTestResult, self).addUnexpectedSuccess(test)
|
||||
test.log.error('unexpected success')
|
||||
if self.showAll:
|
||||
self.stream.writeln(
|
||||
'{color[yellow]}{color[bold]}unexpected success'
|
||||
@ -206,6 +211,10 @@ class ANSITestResult(unittest.TestResult):
|
||||
self.stream.writeln('%s' % err)
|
||||
|
||||
|
||||
class QubesDNCTestResult(QubesTestResult):
|
||||
do_not_clean = True
|
||||
|
||||
|
||||
def demo(verbosity=2):
|
||||
class TC_00_Demo(qubes.tests.QubesTestCase):
|
||||
'''Demo class'''
|
||||
@ -237,19 +246,153 @@ def demo(verbosity=2):
|
||||
|
||||
suite = unittest.TestLoader().loadTestsFromTestCase(TC_00_Demo)
|
||||
runner = unittest.TextTestRunner(stream=sys.stdout, verbosity=verbosity)
|
||||
runner.resultclass = ANSITestResult
|
||||
runner.resultclass = QubesTestResult
|
||||
return runner.run(suite).wasSuccessful()
|
||||
|
||||
|
||||
parser = argparse.ArgumentParser(
|
||||
epilog='''When running only specific tests, write their names like in log,
|
||||
in format: MODULE+"/"+CLASS+"/"+FUNCTION. MODULE should omit initial
|
||||
"qubes.tests.". Example: basic/TC_00_Basic/test_000_create''')
|
||||
|
||||
parser.add_argument('--verbose', '-v',
|
||||
action='count',
|
||||
help='increase console verbosity level')
|
||||
parser.add_argument('--quiet', '-q',
|
||||
action='count',
|
||||
help='decrease console verbosity level')
|
||||
|
||||
parser.add_argument('--list', '-l',
|
||||
action='store_true', dest='list',
|
||||
help='list all available tests and exit')
|
||||
|
||||
parser.add_argument('--failfast', '-f',
|
||||
action='store_true', dest='failfast',
|
||||
help='stop on the first fail, error or unexpected success')
|
||||
parser.add_argument('--no-failfast',
|
||||
action='store_false', dest='failfast',
|
||||
help='disable --failfast')
|
||||
|
||||
parser.add_argument('--do-not-clean', '--dnc', '-D',
|
||||
action='store_true', dest='do_not_clean',
|
||||
help='do not execute tearDown on failed tests. Implies --failfast.')
|
||||
parser.add_argument('--do-clean', '-C',
|
||||
action='store_false', dest='do_not_clean',
|
||||
help='do execute tearDown even on failed tests.')
|
||||
|
||||
# pylint: disable=protected-access
|
||||
parser.add_argument('--loglevel', '-L', metavar='LEVEL',
|
||||
action='store', choices=tuple(k
|
||||
for k in sorted(logging._levelNames.keys(),
|
||||
key=lambda x: logging._levelNames[x])
|
||||
if isinstance(k, basestring)),
|
||||
help='logging level for file and syslog forwarding '
|
||||
'(one of: %(choices)s; default: %(default)s)')
|
||||
# pylint: enable=protected-access
|
||||
|
||||
parser.add_argument('--logfile', '-o', metavar='FILE',
|
||||
action='store',
|
||||
help='if set, test run will be also logged to file')
|
||||
|
||||
parser.add_argument('--syslog',
|
||||
action='store_true', dest='syslog',
|
||||
help='reenable logging to syslog')
|
||||
parser.add_argument('--no-syslog',
|
||||
action='store_false', dest='syslog',
|
||||
help='disable logging to syslog')
|
||||
|
||||
parser.add_argument('--kmsg', '--very-brave-or-very-stupid',
|
||||
action='store_true', dest='kmsg',
|
||||
help='log most important things to kernel ring-buffer')
|
||||
parser.add_argument('--no-kmsg', '--i-am-smarter-than-kay-sievers',
|
||||
action='store_false', dest='kmsg',
|
||||
help='do not abuse kernel ring-buffer')
|
||||
|
||||
parser.add_argument('names', metavar='TESTNAME',
|
||||
action='store', nargs='*',
|
||||
help='list of tests to run named like in description '
|
||||
'(default: run all tests)')
|
||||
|
||||
parser.set_defaults(
|
||||
failfast=False,
|
||||
loglevel='DEBUG',
|
||||
logfile=None,
|
||||
syslog=True,
|
||||
kmsg=False,
|
||||
verbose=2,
|
||||
quiet=0)
|
||||
|
||||
|
||||
def list_test_cases(suite):
|
||||
for test in suite:
|
||||
if isinstance(test, unittest.TestSuite):
|
||||
#yield from
|
||||
for i in list_test_cases(test):
|
||||
yield i
|
||||
else:
|
||||
yield test
|
||||
|
||||
|
||||
def main():
|
||||
args = parser.parse_args()
|
||||
|
||||
suite = unittest.TestSuite()
|
||||
loader = unittest.TestLoader()
|
||||
for modname in test_order:
|
||||
suite.addTests(loader.loadTestsFromName(modname))
|
||||
|
||||
runner = unittest.TextTestRunner(stream=sys.stdout, verbosity=2)
|
||||
runner.resultclass = ANSITestResult
|
||||
if args.names:
|
||||
alltests = loader.loadTestsFromName('qubes.tests')
|
||||
for name in args.names:
|
||||
suite.addTests(
|
||||
[test for test in list_test_cases(alltests)
|
||||
if (str(test)+'/').startswith(name.replace('.', '/')+'/')])
|
||||
else:
|
||||
suite.addTests(loader.loadTestsFromName('qubes.tests'))
|
||||
|
||||
if args.list:
|
||||
for test in list_test_cases(suite):
|
||||
print(str(test)) # pylint: disable=superfluous-parens
|
||||
return True
|
||||
|
||||
if args.do_not_clean:
|
||||
args.failfast = True
|
||||
|
||||
logging.root.setLevel(args.loglevel)
|
||||
|
||||
if args.logfile is not None:
|
||||
ha_file = logging.FileHandler(
|
||||
os.path.join(os.environ['HOME'], args.logfile))
|
||||
ha_file.setFormatter(
|
||||
logging.Formatter('%(asctime)s %(name)s[%(process)d]: %(message)s'))
|
||||
logging.root.addHandler(ha_file)
|
||||
|
||||
if args.syslog:
|
||||
ha_syslog = logging.handlers.SysLogHandler('/dev/log')
|
||||
ha_syslog.setFormatter(
|
||||
logging.Formatter('%(name)s[%(process)d]: %(message)s'))
|
||||
logging.root.addHandler(ha_syslog)
|
||||
|
||||
if args.kmsg:
|
||||
try:
|
||||
subprocess.check_call(('sudo', 'chmod', '666', '/dev/kmsg'))
|
||||
except subprocess.CalledProcessError:
|
||||
parser.error('could not chmod /dev/kmsg')
|
||||
else:
|
||||
ha_kmsg = logging.FileHandler('/dev/kmsg', 'w')
|
||||
ha_kmsg.setFormatter(
|
||||
logging.Formatter('%(name)s[%(process)d]: %(message)s'))
|
||||
ha_kmsg.setLevel(logging.CRITICAL)
|
||||
logging.root.addHandler(ha_kmsg)
|
||||
|
||||
runner = unittest.TextTestRunner(stream=sys.stdout,
|
||||
verbosity=(args.verbose-args.quiet),
|
||||
failfast=args.failfast)
|
||||
unittest.signals.installHandler()
|
||||
|
||||
runner.resultclass = QubesDNCTestResult \
|
||||
if args.do_not_clean else QubesTestResult
|
||||
|
||||
return runner.run(suite).wasSuccessful()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
sys.exit(not main())
|
||||
|
@ -1,346 +1,3 @@
|
||||
#!/usr/bin/python2 -O
|
||||
# vim: fileencoding=utf-8
|
||||
|
||||
#
|
||||
# The Qubes OS Project, https://www.qubes-os.org/
|
||||
#
|
||||
# Copyright (C) 2014-2015
|
||||
# Marek Marczykowski-Górecki <marmarek@invisiblethingslab.com>
|
||||
# Copyright (C) 2015 Wojtek Porczyk <woju@invisiblethingslab.com>
|
||||
#
|
||||
# This program is free software; you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation; either version 2 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License along
|
||||
# with this program; if not, write to the Free Software Foundation, Inc.,
|
||||
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
|
||||
#
|
||||
|
||||
import multiprocessing
|
||||
import logging
|
||||
import os
|
||||
import shutil
|
||||
import subprocess
|
||||
import unittest
|
||||
|
||||
import lxml.etree
|
||||
import sys
|
||||
|
||||
import qubes.backup
|
||||
import qubes.qubes
|
||||
|
||||
VMPREFIX = 'test-'
|
||||
|
||||
|
||||
#: :py:obj:`True` if running in dom0, :py:obj:`False` otherwise
|
||||
in_dom0 = False
|
||||
|
||||
#: :py:obj:`False` if outside of git repo,
|
||||
#: path to root of the directory otherwise
|
||||
in_git = False
|
||||
|
||||
try:
|
||||
import libvirt
|
||||
libvirt.openReadOnly(qubes.qubes.defaults['libvirt_uri']).close()
|
||||
in_dom0 = True
|
||||
except libvirt.libvirtError:
|
||||
pass
|
||||
|
||||
try:
|
||||
in_git = subprocess.check_output(
|
||||
['git', 'rev-parse', '--show-toplevel'],
|
||||
stderr=open(os.devnull, 'w')).strip()
|
||||
except subprocess.CalledProcessError:
|
||||
# git returned nonzero, we are outside git repo
|
||||
pass
|
||||
except OSError:
|
||||
# command not found; let's assume we're outside
|
||||
pass
|
||||
|
||||
|
||||
def skipUnlessDom0(test_item):
|
||||
'''Decorator that skips test outside dom0.
|
||||
|
||||
Some tests (especially integration tests) have to be run in more or less
|
||||
working dom0. This is checked by connecting to libvirt.
|
||||
''' # pylint: disable=invalid-name
|
||||
|
||||
return unittest.skipUnless(in_dom0, 'outside dom0')(test_item)
|
||||
|
||||
|
||||
def skipUnlessGit(test_item):
|
||||
'''Decorator that skips test outside git repo.
|
||||
|
||||
There are very few tests that an be run only in git. One example is
|
||||
correctness of example code that won't get included in RPM.
|
||||
''' # pylint: disable=invalid-name
|
||||
|
||||
return unittest.skipUnless(in_git, 'outside git tree')(test_item)
|
||||
|
||||
|
||||
class _AssertNotRaisesContext(object):
|
||||
"""A context manager used to implement TestCase.assertNotRaises methods.
|
||||
|
||||
Stolen from unittest and hacked. Regexp support stripped.
|
||||
"""
|
||||
|
||||
def __init__(self, expected, test_case, expected_regexp=None):
|
||||
self.expected = expected
|
||||
self.failureException = test_case.failureException
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_value, tb):
|
||||
if exc_type is None:
|
||||
return True
|
||||
|
||||
try:
|
||||
exc_name = self.expected.__name__
|
||||
except AttributeError:
|
||||
exc_name = str(self.expected)
|
||||
|
||||
if issubclass(exc_type, self.expected):
|
||||
raise self.failureException(
|
||||
"{0} raised".format(exc_name))
|
||||
else:
|
||||
# pass through
|
||||
return False
|
||||
|
||||
self.exception = exc_value # store for later retrieval
|
||||
|
||||
|
||||
class BeforeCleanExit(BaseException):
|
||||
pass
|
||||
|
||||
class QubesTestCase(unittest.TestCase):
|
||||
'''Base class for Qubes unit tests.
|
||||
'''
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(QubesTestCase, self).__init__(*args, **kwargs)
|
||||
self.longMessage = True
|
||||
self.log = logging.getLogger('{}.{}.{}'.format(
|
||||
self.__class__.__module__,
|
||||
self.__class__.__name__,
|
||||
self._testMethodName))
|
||||
|
||||
|
||||
def __str__(self):
|
||||
return '{}/{}/{}'.format(
|
||||
'.'.join(self.__class__.__module__.split('.')[2:]),
|
||||
self.__class__.__name__,
|
||||
self._testMethodName)
|
||||
|
||||
|
||||
def tearDown(self):
|
||||
super(QubesTestCase, self).tearDown()
|
||||
|
||||
result = self._resultForDoCleanups
|
||||
l = result.failures \
|
||||
+ result.errors \
|
||||
+ [(tc, None) for tc in result.unexpectedSuccesses]
|
||||
|
||||
if getattr(result, 'do_not_clean', False) \
|
||||
and filter((lambda (tc, exc): tc is self), l):
|
||||
raise BeforeCleanExit()
|
||||
|
||||
|
||||
def assertNotRaises(self, excClass, callableObj=None, *args, **kwargs):
|
||||
"""Fail if an exception of class excClass is raised
|
||||
by callableObj when invoked with arguments args and keyword
|
||||
arguments kwargs. If a different type of exception is
|
||||
raised, it will not be caught, and the test case will be
|
||||
deemed to have suffered an error, exactly as for an
|
||||
unexpected exception.
|
||||
|
||||
If called with callableObj omitted or None, will return a
|
||||
context object used like this::
|
||||
|
||||
with self.assertRaises(SomeException):
|
||||
do_something()
|
||||
|
||||
The context manager keeps a reference to the exception as
|
||||
the 'exception' attribute. This allows you to inspect the
|
||||
exception after the assertion::
|
||||
|
||||
with self.assertRaises(SomeException) as cm:
|
||||
do_something()
|
||||
the_exception = cm.exception
|
||||
self.assertEqual(the_exception.error_code, 3)
|
||||
"""
|
||||
context = _AssertNotRaisesContext(excClass, self)
|
||||
if callableObj is None:
|
||||
return context
|
||||
with context:
|
||||
callableObj(*args, **kwargs)
|
||||
|
||||
|
||||
def assertXMLEqual(self, xml1, xml2):
|
||||
'''Check for equality of two XML objects.
|
||||
|
||||
:param xml1: first element
|
||||
:param xml2: second element
|
||||
:type xml1: :py:class:`lxml.etree._Element`
|
||||
:type xml2: :py:class:`lxml.etree._Element`
|
||||
''' # pylint: disable=invalid-name
|
||||
|
||||
self.assertEqual(xml1.tag, xml2.tag)
|
||||
self.assertEqual(xml1.text, xml2.text)
|
||||
self.assertItemsEqual(xml1.keys(), xml2.keys())
|
||||
for key in xml1.keys():
|
||||
self.assertEqual(xml1.get(key), xml2.get(key))
|
||||
|
||||
|
||||
class SystemTestsMixin(object):
|
||||
def setUp(self):
|
||||
'''Set up the test.
|
||||
|
||||
.. warning::
|
||||
This method instantiates QubesVmCollection acquires write lock for
|
||||
it. You can use is as :py:attr:`qc`. You can (and probably
|
||||
should) release the lock at the end of setUp in subclass
|
||||
'''
|
||||
|
||||
super(SystemTestsMixin, self).setUp()
|
||||
|
||||
self.qc = qubes.qubes.QubesVmCollection()
|
||||
self.qc.lock_db_for_writing()
|
||||
self.qc.load()
|
||||
|
||||
self.conn = libvirt.open(qubes.qubes.defaults['libvirt_uri'])
|
||||
|
||||
self.remove_test_vms()
|
||||
|
||||
|
||||
def tearDown(self):
|
||||
super(SystemTestsMixin, self).tearDown()
|
||||
|
||||
try: self.qc.lock_db_for_writing()
|
||||
except qubes.qubes.QubesException: pass
|
||||
self.qc.load()
|
||||
|
||||
self.remove_test_vms()
|
||||
|
||||
self.qc.save()
|
||||
self.qc.unlock_db()
|
||||
del self.qc
|
||||
|
||||
self.conn.close()
|
||||
|
||||
|
||||
def make_vm_name(self, name):
|
||||
return VMPREFIX + name
|
||||
|
||||
def save_and_reload_db(self):
|
||||
self.qc.save()
|
||||
self.qc.unlock_db()
|
||||
self.qc.lock_db_for_writing()
|
||||
self.qc.load()
|
||||
|
||||
def _remove_vm_qubes(self, vm):
|
||||
vmname = vm.name
|
||||
|
||||
try:
|
||||
# XXX .is_running() may throw libvirtError if undefined
|
||||
if vm.is_running():
|
||||
vm.force_shutdown()
|
||||
except: pass
|
||||
|
||||
try: vm.remove_from_disk()
|
||||
except: pass
|
||||
|
||||
try: vm.libvirt_domain.undefine()
|
||||
except libvirt.libvirtError: pass
|
||||
|
||||
self.qc.pop(vm.qid)
|
||||
del vm
|
||||
|
||||
# Now ensure it really went away. This may not have happened,
|
||||
# for example if vm.libvirtDomain malfunctioned.
|
||||
try:
|
||||
dom = self.conn.lookupByName(vmname)
|
||||
except: pass
|
||||
else:
|
||||
self._remove_vm_libvirt(dom)
|
||||
|
||||
self._remove_vm_disk(vmname)
|
||||
|
||||
|
||||
def _remove_vm_libvirt(self, dom):
|
||||
try:
|
||||
dom.destroy()
|
||||
except libvirt.libvirtError: # not running
|
||||
pass
|
||||
dom.undefine()
|
||||
|
||||
|
||||
def _remove_vm_disk(self, vmname):
|
||||
for dirspec in (
|
||||
'qubes_appvms_dir',
|
||||
'qubes_servicevms_dir',
|
||||
'qubes_templates_dir'):
|
||||
dirpath = os.path.join(qubes.qubes.system_path['qubes_base_dir'],
|
||||
qubes.qubes.system_path[dirspec], vmname)
|
||||
if os.path.exists(dirpath):
|
||||
if os.path.isdir(dirpath):
|
||||
shutil.rmtree(dirpath)
|
||||
else:
|
||||
os.unlink(dirpath)
|
||||
|
||||
|
||||
def remove_vms(self, vms):
|
||||
for vm in vms: self._remove_vm_qubes(vm)
|
||||
self.save_and_reload_db()
|
||||
|
||||
|
||||
def remove_test_vms(self):
|
||||
'''Aggresively remove any domain that has name in testing namespace.
|
||||
|
||||
.. warning::
|
||||
The test suite hereby claims any domain whose name starts with
|
||||
:py:data:`VMPREFIX` as fair game. This is needed to enforce sane
|
||||
test executing environment. If you have domains named ``test-*``,
|
||||
don't run the tests.
|
||||
'''
|
||||
|
||||
# first, remove them Qubes-way
|
||||
something_removed = False
|
||||
for vm in self.qc.values():
|
||||
if vm.name.startswith(VMPREFIX):
|
||||
self._remove_vm_qubes(vm)
|
||||
something_removed = True
|
||||
if something_removed:
|
||||
self.save_and_reload_db()
|
||||
|
||||
# now remove what was only in libvirt
|
||||
for dom in self.conn.listAllDomains():
|
||||
if dom.name().startswith(VMPREFIX):
|
||||
self._remove_vm_libvirt(dom)
|
||||
|
||||
# finally remove anything that is left on disk
|
||||
vmnames = set()
|
||||
for dirspec in (
|
||||
'qubes_appvms_dir',
|
||||
'qubes_servicevms_dir',
|
||||
'qubes_templates_dir'):
|
||||
dirpath = os.path.join(qubes.qubes.system_path['qubes_base_dir'],
|
||||
qubes.qubes.system_path[dirspec])
|
||||
for name in os.listdir(dirpath):
|
||||
if name.startswith(VMPREFIX):
|
||||
vmnames.add(name)
|
||||
for vmname in vmnames:
|
||||
self._remove_vm_disk(vmname)
|
||||
|
||||
|
||||
|
||||
class BackupTestsMixin(SystemTestsMixin):
|
||||
def setUp(self):
|
||||
super(BackupTestsMixin, self).setUp()
|
||||
@ -496,24 +153,3 @@ class BackupTestsMixin(SystemTestsMixin):
|
||||
f = open(path, "w")
|
||||
f.truncate(size)
|
||||
f.close()
|
||||
|
||||
|
||||
def load_tests(loader, tests, pattern):
|
||||
# discard any tests from this module, because it hosts base classes
|
||||
tests = unittest.TestSuite()
|
||||
|
||||
for modname in (
|
||||
'qubes.tests.basic',
|
||||
'qubes.tests.dom0_update',
|
||||
'qubes.tests.network',
|
||||
'qubes.tests.vm_qrexec_gui',
|
||||
'qubes.tests.backup',
|
||||
'qubes.tests.backupcompatibility',
|
||||
'qubes.tests.regressions',
|
||||
):
|
||||
tests.addTests(loader.loadTestsFromName(modname))
|
||||
|
||||
return tests
|
||||
|
||||
|
||||
# vim: ts=4 sw=4 et
|
||||
|
361
tests/run.py
361
tests/run.py
@ -1,361 +0,0 @@
|
||||
#!/usr/bin/python2 -O
|
||||
# vim: fileencoding=utf-8
|
||||
|
||||
#
|
||||
# The Qubes OS Project, https://www.qubes-os.org/
|
||||
#
|
||||
# Copyright (C) 2014-2015 Joanna Rutkowska <joanna@invisiblethingslab.com>
|
||||
# Copyright (C) 2014-2015 Wojtek Porczyk <woju@invisiblethingslab.com>
|
||||
#
|
||||
# This program is free software; you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation; either version 2 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License along
|
||||
# with this program; if not, write to the Free Software Foundation, Inc.,
|
||||
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
|
||||
#
|
||||
|
||||
import argparse
|
||||
import curses
|
||||
import importlib
|
||||
import logging
|
||||
import logging.handlers
|
||||
import os
|
||||
import socket
|
||||
import subprocess
|
||||
import sys
|
||||
import unittest
|
||||
import unittest.signals
|
||||
|
||||
import qubes.tests
|
||||
|
||||
class CursesColor(dict):
|
||||
def __init__(self):
|
||||
super(CursesColor, self).__init__()
|
||||
try:
|
||||
curses.setupterm()
|
||||
except curses.error:
|
||||
return
|
||||
|
||||
# pylint: disable=bad-whitespace
|
||||
self['black'] = curses.tparm(curses.tigetstr('setaf'), 0)
|
||||
self['red'] = curses.tparm(curses.tigetstr('setaf'), 1)
|
||||
self['green'] = curses.tparm(curses.tigetstr('setaf'), 2)
|
||||
self['yellow'] = curses.tparm(curses.tigetstr('setaf'), 3)
|
||||
self['blue'] = curses.tparm(curses.tigetstr('setaf'), 4)
|
||||
self['magenta'] = curses.tparm(curses.tigetstr('setaf'), 5)
|
||||
self['cyan'] = curses.tparm(curses.tigetstr('setaf'), 6)
|
||||
self['white'] = curses.tparm(curses.tigetstr('setaf'), 7)
|
||||
|
||||
self['bold'] = curses.tigetstr('bold')
|
||||
self['normal'] = curses.tigetstr('sgr0')
|
||||
|
||||
def __missing__(self, key):
|
||||
# pylint: disable=unused-argument,no-self-use
|
||||
return ''
|
||||
|
||||
|
||||
class QubesTestResult(unittest.TestResult):
|
||||
'''A test result class that can print colourful text results to a stream.
|
||||
|
||||
Used by TextTestRunner. This is a lightly rewritten unittest.TextTestResult.
|
||||
'''
|
||||
|
||||
separator1 = unittest.TextTestResult.separator1
|
||||
separator2 = unittest.TextTestResult.separator2
|
||||
|
||||
def __init__(self, stream, descriptions, verbosity):
|
||||
super(QubesTestResult, self).__init__(stream, descriptions, verbosity)
|
||||
self.stream = stream
|
||||
self.showAll = verbosity > 1 # pylint: disable=invalid-name
|
||||
self.dots = verbosity == 1
|
||||
self.descriptions = descriptions
|
||||
|
||||
self.color = CursesColor()
|
||||
self.hostname = socket.gethostname()
|
||||
|
||||
self.log = logging.getLogger('qubes.tests')
|
||||
|
||||
|
||||
def _fmtexc(self, err):
|
||||
if str(err[1]):
|
||||
return '{color[bold]}{}:{color[normal]} {!s}'.format(
|
||||
err[0].__name__, err[1], color=self.color)
|
||||
else:
|
||||
return '{color[bold]}{}{color[normal]}'.format(
|
||||
err[0].__name__, color=self.color)
|
||||
|
||||
def getDescription(self, test): # pylint: disable=invalid-name
|
||||
teststr = str(test).split('/')
|
||||
for i in range(-2, 0):
|
||||
try:
|
||||
fullname = teststr[i].split('_', 2)
|
||||
except IndexError:
|
||||
continue
|
||||
fullname[-1] = '{color[bold]}{}{color[normal]}'.format(
|
||||
fullname[-1], color=self.color)
|
||||
teststr[i] = '_'.join(fullname)
|
||||
teststr = '/'.join(teststr)
|
||||
|
||||
doc_first_line = test.shortDescription()
|
||||
if self.descriptions and doc_first_line:
|
||||
return '\n'.join((teststr, ' {}'.format(
|
||||
doc_first_line, color=self.color)))
|
||||
else:
|
||||
return teststr
|
||||
|
||||
def startTest(self, test): # pylint: disable=invalid-name
|
||||
super(QubesTestResult, self).startTest(test)
|
||||
test.log.critical('started')
|
||||
if self.showAll:
|
||||
# if not qubes.tests.in_git:
|
||||
self.stream.write('{}: '.format(self.hostname))
|
||||
self.stream.write(self.getDescription(test))
|
||||
self.stream.write(' ... ')
|
||||
self.stream.flush()
|
||||
|
||||
def addSuccess(self, test): # pylint: disable=invalid-name
|
||||
super(QubesTestResult, self).addSuccess(test)
|
||||
test.log.warning('ok')
|
||||
if self.showAll:
|
||||
self.stream.writeln('{color[green]}ok{color[normal]}'.format(
|
||||
color=self.color))
|
||||
elif self.dots:
|
||||
self.stream.write('.')
|
||||
self.stream.flush()
|
||||
|
||||
def addError(self, test, err): # pylint: disable=invalid-name
|
||||
super(QubesTestResult, self).addError(test, err)
|
||||
test.log.critical('ERROR ({err[0].__name__}: {err[1]!r})'.format(err=err))
|
||||
if self.showAll:
|
||||
self.stream.writeln(
|
||||
'{color[red]}{color[bold]}ERROR{color[normal]} ({})'.format(
|
||||
self._fmtexc(err), color=self.color))
|
||||
elif self.dots:
|
||||
self.stream.write(
|
||||
'{color[red]}{color[bold]}E{color[normal]}'.format(
|
||||
color=self.color))
|
||||
self.stream.flush()
|
||||
|
||||
def addFailure(self, test, err): # pylint: disable=invalid-name
|
||||
super(QubesTestResult, self).addFailure(test, err)
|
||||
test.log.error('FAIL ({err[0].__name__}: {err[1]!r})'.format(err=err))
|
||||
if self.showAll:
|
||||
self.stream.writeln('{color[red]}FAIL{color[normal]}'.format(
|
||||
color=self.color))
|
||||
elif self.dots:
|
||||
self.stream.write('{color[red]}F{color[normal]}'.format(
|
||||
color=self.color))
|
||||
self.stream.flush()
|
||||
|
||||
def addSkip(self, test, reason): # pylint: disable=invalid-name
|
||||
super(QubesTestResult, self).addSkip(test, reason)
|
||||
test.log.warning('skipped ({})'.format(reason))
|
||||
if self.showAll:
|
||||
self.stream.writeln(
|
||||
'{color[cyan]}skipped{color[normal]} ({})'.format(
|
||||
reason, color=self.color))
|
||||
elif self.dots:
|
||||
self.stream.write('{color[cyan]}s{color[normal]}'.format(
|
||||
color=self.color))
|
||||
self.stream.flush()
|
||||
|
||||
def addExpectedFailure(self, test, err): # pylint: disable=invalid-name
|
||||
super(QubesTestResult, self).addExpectedFailure(test, err)
|
||||
test.log.warning('expected failure')
|
||||
if self.showAll:
|
||||
self.stream.writeln(
|
||||
'{color[yellow]}expected failure{color[normal]}'.format(
|
||||
color=self.color))
|
||||
elif self.dots:
|
||||
self.stream.write('{color[yellow]}x{color[normal]}'.format(
|
||||
color=self.color))
|
||||
self.stream.flush()
|
||||
|
||||
def addUnexpectedSuccess(self, test): # pylint: disable=invalid-name
|
||||
super(QubesTestResult, self).addUnexpectedSuccess(test)
|
||||
test.log.error('unexpected success')
|
||||
if self.showAll:
|
||||
self.stream.writeln(
|
||||
'{color[yellow]}{color[bold]}unexpected success'
|
||||
'{color[normal]}'.format(color=self.color))
|
||||
elif self.dots:
|
||||
self.stream.write(
|
||||
'{color[yellow]}{color[bold]}u{color[normal]}'.format(
|
||||
color=self.color))
|
||||
self.stream.flush()
|
||||
|
||||
def printErrors(self): # pylint: disable=invalid-name
|
||||
if self.dots or self.showAll:
|
||||
self.stream.writeln()
|
||||
self.printErrorList(
|
||||
'{color[red]}{color[bold]}ERROR{color[normal]}'.format(
|
||||
color=self.color),
|
||||
self.errors)
|
||||
self.printErrorList(
|
||||
'{color[red]}FAIL{color[normal]}'.format(color=self.color),
|
||||
self.failures)
|
||||
|
||||
def printErrorList(self, flavour, errors): # pylint: disable=invalid-name
|
||||
for test, err in errors:
|
||||
self.stream.writeln(self.separator1)
|
||||
self.stream.writeln('%s: %s' % (flavour, self.getDescription(test)))
|
||||
self.stream.writeln(self.separator2)
|
||||
self.stream.writeln('%s' % err)
|
||||
|
||||
|
||||
class QubesDNCTestResult(QubesTestResult):
|
||||
do_not_clean = True
|
||||
|
||||
|
||||
parser = argparse.ArgumentParser(
|
||||
epilog='''When running only specific tests, write their names like in log,
|
||||
in format: MODULE+"/"+CLASS+"/"+FUNCTION. MODULE should omit initial
|
||||
"qubes.tests.". Example: basic/TC_00_Basic/test_000_create''')
|
||||
|
||||
parser.add_argument('--verbose', '-v',
|
||||
action='count',
|
||||
help='increase console verbosity level')
|
||||
parser.add_argument('--quiet', '-q',
|
||||
action='count',
|
||||
help='decrease console verbosity level')
|
||||
|
||||
parser.add_argument('--list', '-l',
|
||||
action='store_true', dest='list',
|
||||
help='list all available tests and exit')
|
||||
|
||||
parser.add_argument('--failfast', '-f',
|
||||
action='store_true', dest='failfast',
|
||||
help='stop on the first fail, error or unexpected success')
|
||||
parser.add_argument('--no-failfast',
|
||||
action='store_false', dest='failfast',
|
||||
help='disable --failfast')
|
||||
|
||||
parser.add_argument('--do-not-clean', '--dnc', '-D',
|
||||
action='store_true', dest='do_not_clean',
|
||||
help='do not execute tearDown on failed tests. Implies --failfast.')
|
||||
parser.add_argument('--do-clean', '-C',
|
||||
action='store_false', dest='do_not_clean',
|
||||
help='do execute tearDown even on failed tests.')
|
||||
|
||||
parser.add_argument('--loglevel', '-L', metavar='LEVEL',
|
||||
action='store', choices=tuple(k
|
||||
for k in sorted(logging._levelNames.keys(),
|
||||
key=lambda x: logging._levelNames[x])
|
||||
if isinstance(k, basestring)),
|
||||
help='logging level for file and syslog forwarding '
|
||||
'(one of: %(choices)s; default: %(default)s)')
|
||||
|
||||
parser.add_argument('--logfile', '-o', metavar='FILE',
|
||||
action='store',
|
||||
help='if set, test run will be also logged to file')
|
||||
|
||||
parser.add_argument('--syslog',
|
||||
action='store_true', dest='syslog',
|
||||
help='reenable logging to syslog')
|
||||
parser.add_argument('--no-syslog',
|
||||
action='store_false', dest='syslog',
|
||||
help='disable logging to syslog')
|
||||
|
||||
parser.add_argument('--kmsg', '--very-brave-or-very-stupid',
|
||||
action='store_true', dest='kmsg',
|
||||
help='log most important things to kernel ring-buffer')
|
||||
parser.add_argument('--no-kmsg', '--i-am-smarter-than-kay-sievers',
|
||||
action='store_false', dest='kmsg',
|
||||
help='do not abuse kernel ring-buffer')
|
||||
|
||||
parser.add_argument('names', metavar='TESTNAME',
|
||||
action='store', nargs='*',
|
||||
help='list of tests to run named like in description '
|
||||
'(default: run all tests)')
|
||||
|
||||
parser.set_defaults(
|
||||
failfast=False,
|
||||
loglevel='DEBUG',
|
||||
logfile=None,
|
||||
syslog=True,
|
||||
kmsg=False,
|
||||
verbose=2,
|
||||
quiet=0)
|
||||
|
||||
|
||||
def list_test_cases(suite):
|
||||
for test in suite:
|
||||
if isinstance(test, unittest.TestSuite):
|
||||
#yield from
|
||||
for i in list_test_cases(test):
|
||||
yield i
|
||||
else:
|
||||
yield test
|
||||
|
||||
|
||||
def main():
|
||||
args = parser.parse_args()
|
||||
|
||||
suite = unittest.TestSuite()
|
||||
loader = unittest.TestLoader()
|
||||
|
||||
if args.names:
|
||||
alltests = loader.loadTestsFromName('qubes.tests')
|
||||
for name in args.names:
|
||||
suite.addTests(
|
||||
[test for test in list_test_cases(alltests)
|
||||
if (str(test)+'/').startswith(name.replace('.', '/')+'/')])
|
||||
else:
|
||||
suite.addTests(loader.loadTestsFromName('qubes.tests'))
|
||||
|
||||
if args.list:
|
||||
for test in list_test_cases(suite):
|
||||
print(str(test))
|
||||
return True
|
||||
|
||||
if args.do_not_clean:
|
||||
args.failfast = True
|
||||
|
||||
logging.root.setLevel(args.loglevel)
|
||||
|
||||
if args.logfile is not None:
|
||||
ha_file = logging.FileHandler(
|
||||
os.path.join(os.environ['HOME'], args.logfile))
|
||||
ha_file.setFormatter(
|
||||
logging.Formatter('%(asctime)s %(name)s[%(process)d]: %(message)s'))
|
||||
logging.root.addHandler(ha_file)
|
||||
|
||||
if args.syslog:
|
||||
ha_syslog = logging.handlers.SysLogHandler('/dev/log')
|
||||
ha_syslog.setFormatter(
|
||||
logging.Formatter('%(name)s[%(process)d]: %(message)s'))
|
||||
logging.root.addHandler(ha_syslog)
|
||||
|
||||
if args.kmsg:
|
||||
try:
|
||||
subprocess.check_call(('sudo', 'chmod', '666', '/dev/kmsg'))
|
||||
except subprocess.CalledProcessError:
|
||||
parser.error('could not chmod /dev/kmsg')
|
||||
else:
|
||||
ha_kmsg = logging.FileHandler('/dev/kmsg', 'w')
|
||||
ha_kmsg.setFormatter(
|
||||
logging.Formatter('%(name)s[%(process)d]: %(message)s'))
|
||||
ha_kmsg.setLevel(logging.CRITICAL)
|
||||
logging.root.addHandler(ha_kmsg)
|
||||
|
||||
runner = unittest.TextTestRunner(stream=sys.stdout,
|
||||
verbosity=(args.verbose-args.quiet),
|
||||
failfast=args.failfast)
|
||||
unittest.signals.installHandler()
|
||||
|
||||
runner.resultclass = QubesDNCTestResult \
|
||||
if args.do_not_clean else QubesTestResult
|
||||
|
||||
return runner.run(suite).wasSuccessful()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
sys.exit(not main())
|
Loading…
Reference in New Issue
Block a user