7def96c248
Move this functionality from our custom runner (qubes.tests.run), into base test class. This is very useful for correlating logs, so lets have it with nose2 runner too.
410 lines
14 KiB
Python
Executable File
410 lines
14 KiB
Python
Executable File
#
|
|
# The Qubes OS Project, https://www.qubes-os.org/
|
|
#
|
|
# Copyright (C) 2014-2015 Joanna Rutkowska <joanna@invisiblethingslab.com>
|
|
# Copyright (C) 2014-2015 Wojtek Porczyk <woju@invisiblethingslab.com>
|
|
#
|
|
# This library is free software; you can redistribute it and/or
|
|
# modify it under the terms of the GNU Lesser General Public
|
|
# License as published by the Free Software Foundation; either
|
|
# version 2.1 of the License, or (at your option) any later version.
|
|
#
|
|
# This library is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
# Lesser General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Lesser General Public
|
|
# License along with this library; if not, see <https://www.gnu.org/licenses/>.
|
|
#
|
|
|
|
import argparse
|
|
import code
|
|
import curses
|
|
import itertools
|
|
import logging
|
|
import logging.handlers
|
|
import os
|
|
import socket
|
|
import subprocess
|
|
import sys
|
|
import unittest
|
|
import unittest.signals
|
|
|
|
import qubes.tests
|
|
import qubes.api.admin
|
|
|
|
class CursesColor(dict):
|
|
colors = (
|
|
'black', 'red', 'green', 'yellow', 'blue', 'magenta', 'cyan', 'white')
|
|
attrs = {
|
|
'bold': 'bold', 'normal': 'sgr0'}
|
|
def __init__(self):
|
|
super(CursesColor, self).__init__()
|
|
self.has_colors = False
|
|
try:
|
|
curses.setupterm()
|
|
self.has_colors = True
|
|
except curses.error:
|
|
return
|
|
|
|
|
|
def __missing__(self, key):
|
|
# pylint: disable=unused-argument,no-self-use
|
|
if not self.has_colors:
|
|
return ''
|
|
|
|
try:
|
|
value = curses.tigetstr(self.attrs[key])
|
|
except KeyError:
|
|
try:
|
|
value = curses.tparm(
|
|
curses.tigetstr('setaf'), self.colors.index(key))
|
|
except ValueError:
|
|
return ''
|
|
|
|
value = value.decode()
|
|
self[key] = value
|
|
return value
|
|
|
|
|
|
class QubesTestResult(unittest.TestResult):
|
|
'''A test result class that can print colourful text results to a stream.
|
|
|
|
Used by TextTestRunner. This is a lightly rewritten unittest.TextTestResult.
|
|
'''
|
|
|
|
separator1 = unittest.TextTestResult.separator1
|
|
separator2 = unittest.TextTestResult.separator2
|
|
|
|
def __init__(self, stream, descriptions, verbosity):
|
|
super(QubesTestResult, self).__init__(stream, descriptions, verbosity)
|
|
self.stream = stream
|
|
self.showAll = verbosity > 1 # pylint: disable=invalid-name
|
|
self.dots = verbosity == 1
|
|
self.descriptions = descriptions
|
|
|
|
self.color = CursesColor()
|
|
self.hostname = socket.gethostname()
|
|
|
|
self.log = logging.getLogger('qubes.tests')
|
|
|
|
|
|
def _fmtexc(self, err):
|
|
if str(err[1]):
|
|
return '{color[bold]}{}:{color[normal]} {!s}'.format(
|
|
err[0].__name__, err[1], color=self.color)
|
|
else:
|
|
return '{color[bold]}{}{color[normal]}'.format(
|
|
err[0].__name__, color=self.color)
|
|
|
|
def get_log(self, test):
|
|
try:
|
|
return test.log
|
|
except AttributeError:
|
|
return self.log
|
|
|
|
def getDescription(self, test): # pylint: disable=invalid-name
|
|
teststr = str(test).split('/')
|
|
for i in range(-2, 0):
|
|
try:
|
|
fullname = teststr[i].split('_', 2)
|
|
except IndexError:
|
|
continue
|
|
fullname[-1] = '{color[bold]}{}{color[normal]}'.format(
|
|
fullname[-1], color=self.color)
|
|
teststr[i] = '_'.join(fullname)
|
|
teststr = '/'.join(teststr)
|
|
|
|
doc_first_line = test.shortDescription()
|
|
if self.descriptions and doc_first_line:
|
|
return '\n'.join((teststr, ' {}'.format(
|
|
doc_first_line, color=self.color)))
|
|
else:
|
|
return teststr
|
|
|
|
def startTest(self, test): # pylint: disable=invalid-name
|
|
super(QubesTestResult, self).startTest(test)
|
|
self.get_log(test).critical('started')
|
|
if self.showAll:
|
|
if not qubes.tests.in_git:
|
|
self.stream.write('{}: '.format(self.hostname))
|
|
self.stream.write(self.getDescription(test))
|
|
self.stream.write(' ... ')
|
|
self.stream.flush()
|
|
|
|
def addSuccess(self, test): # pylint: disable=invalid-name
|
|
super(QubesTestResult, self).addSuccess(test)
|
|
self.get_log(test).warning('ok')
|
|
if self.showAll:
|
|
self.stream.writeln('{color[green]}ok{color[normal]}'.format(
|
|
color=self.color))
|
|
elif self.dots:
|
|
self.stream.write('.')
|
|
self.stream.flush()
|
|
|
|
def addError(self, test, err): # pylint: disable=invalid-name
|
|
super(QubesTestResult, self).addError(test, err)
|
|
self.get_log(test).critical(
|
|
'ERROR ({err[0].__name__}: {err[1]!r})'.format(err=err))
|
|
if self.showAll:
|
|
self.stream.writeln(
|
|
'{color[red]}{color[bold]}ERROR{color[normal]} ({})'.format(
|
|
self._fmtexc(err), color=self.color))
|
|
elif self.dots:
|
|
self.stream.write(
|
|
'{color[red]}{color[bold]}E{color[normal]}'.format(
|
|
color=self.color))
|
|
self.stream.flush()
|
|
|
|
def addFailure(self, test, err): # pylint: disable=invalid-name
|
|
super(QubesTestResult, self).addFailure(test, err)
|
|
self.get_log(test).error(
|
|
'FAIL ({err[0].__name__}: {err[1]!r})'.format(err=err))
|
|
if self.showAll:
|
|
self.stream.writeln('{color[red]}FAIL{color[normal]}'.format(
|
|
color=self.color))
|
|
elif self.dots:
|
|
self.stream.write('{color[red]}F{color[normal]}'.format(
|
|
color=self.color))
|
|
self.stream.flush()
|
|
|
|
def addSkip(self, test, reason): # pylint: disable=invalid-name
|
|
super(QubesTestResult, self).addSkip(test, reason)
|
|
self.get_log(test).warning('skipped ({})'.format(reason))
|
|
if self.showAll:
|
|
self.stream.writeln(
|
|
'{color[cyan]}skipped{color[normal]} ({})'.format(
|
|
reason, color=self.color))
|
|
elif self.dots:
|
|
self.stream.write('{color[cyan]}s{color[normal]}'.format(
|
|
color=self.color))
|
|
self.stream.flush()
|
|
|
|
def addExpectedFailure(self, test, err): # pylint: disable=invalid-name
|
|
super(QubesTestResult, self).addExpectedFailure(test, err)
|
|
self.get_log(test).warning('expected failure')
|
|
if self.showAll:
|
|
self.stream.writeln(
|
|
'{color[yellow]}expected failure{color[normal]}'.format(
|
|
color=self.color))
|
|
elif self.dots:
|
|
self.stream.write('{color[yellow]}x{color[normal]}'.format(
|
|
color=self.color))
|
|
self.stream.flush()
|
|
|
|
def addUnexpectedSuccess(self, test): # pylint: disable=invalid-name
|
|
super(QubesTestResult, self).addUnexpectedSuccess(test)
|
|
self.get_log(test).error('unexpected success')
|
|
if self.showAll:
|
|
self.stream.writeln(
|
|
'{color[yellow]}{color[bold]}unexpected success'
|
|
'{color[normal]}'.format(color=self.color))
|
|
elif self.dots:
|
|
self.stream.write(
|
|
'{color[yellow]}{color[bold]}u{color[normal]}'.format(
|
|
color=self.color))
|
|
self.stream.flush()
|
|
|
|
def printErrors(self): # pylint: disable=invalid-name
|
|
if self.dots or self.showAll:
|
|
self.stream.writeln()
|
|
self.printErrorList(
|
|
'{color[red]}{color[bold]}ERROR{color[normal]}'.format(
|
|
color=self.color),
|
|
self.errors)
|
|
self.printErrorList(
|
|
'{color[red]}FAIL{color[normal]}'.format(
|
|
color=self.color),
|
|
self.failures)
|
|
self.printErrorList(
|
|
'{color[yellow]}EXPECTED{color[normal]}'.format(
|
|
color=self.color),
|
|
self.expectedFailures)
|
|
|
|
def printErrorList(self, flavour, errors): # pylint: disable=invalid-name
|
|
for test, err in errors:
|
|
self.stream.writeln(self.separator1)
|
|
self.stream.writeln('%s: %s' % (flavour, self.getDescription(test)))
|
|
self.stream.writeln(self.separator2)
|
|
self.stream.writeln('%s' % err)
|
|
|
|
|
|
def demo(verbosity=2):
|
|
class TC_00_Demo(qubes.tests.QubesTestCase):
|
|
'''Demo class'''
|
|
# pylint: disable=no-self-use
|
|
def test_0_success(self):
|
|
'''Demo test (success)'''
|
|
pass
|
|
def test_1_error(self):
|
|
'''Demo test (error)'''
|
|
raise Exception()
|
|
def test_2_failure(self):
|
|
'''Demo test (failure)'''
|
|
self.fail('boo')
|
|
def test_3_skip(self):
|
|
'''Demo test (skipped by call to self.skipTest())'''
|
|
self.skipTest('skip')
|
|
@unittest.skip(None)
|
|
def test_4_skip_decorator(self):
|
|
'''Demo test (skipped by decorator)'''
|
|
pass
|
|
@unittest.expectedFailure
|
|
def test_5_expected_failure(self):
|
|
'''Demo test (expected failure)'''
|
|
self.fail()
|
|
@unittest.expectedFailure
|
|
def test_6_unexpected_success(self):
|
|
'''Demo test (unexpected success)'''
|
|
pass
|
|
|
|
suite = unittest.TestLoader().loadTestsFromTestCase(TC_00_Demo)
|
|
runner = unittest.TextTestRunner(stream=sys.stdout, verbosity=verbosity)
|
|
runner.resultclass = QubesTestResult
|
|
return runner.run(suite).wasSuccessful()
|
|
|
|
|
|
parser = argparse.ArgumentParser(
|
|
epilog='''When running only specific tests, write their names like in log,
|
|
in format: MODULE+"/"+CLASS+"/"+FUNCTION. MODULE should omit initial
|
|
"qubes.tests.". Example: basic/TC_00_Basic/test_000_create''')
|
|
|
|
parser.add_argument('--verbose', '-v',
|
|
action='count',
|
|
help='increase console verbosity level')
|
|
parser.add_argument('--quiet', '-q',
|
|
action='count',
|
|
help='decrease console verbosity level')
|
|
|
|
parser.add_argument('--list', '-l',
|
|
action='store_true', dest='list',
|
|
help='list all available tests and exit')
|
|
|
|
parser.add_argument('--failfast', '-f',
|
|
action='store_true', dest='failfast',
|
|
help='stop on the first fail, error or unexpected success')
|
|
parser.add_argument('--no-failfast',
|
|
action='store_false', dest='failfast',
|
|
help='disable --failfast')
|
|
|
|
# pylint: disable=protected-access
|
|
try:
|
|
name_to_level = logging._nameToLevel
|
|
except AttributeError:
|
|
name_to_level = logging._levelNames
|
|
parser.add_argument('--loglevel', '-L', metavar='LEVEL',
|
|
action='store', choices=tuple(k
|
|
for k in sorted(name_to_level.keys(),
|
|
key=lambda x: name_to_level[x])
|
|
if isinstance(k, str)),
|
|
help='logging level for file and syslog forwarding '
|
|
'(one of: %(choices)s; default: %(default)s)')
|
|
del name_to_level
|
|
# pylint: enable=protected-access
|
|
|
|
parser.add_argument('--logfile', '-o', metavar='FILE',
|
|
action='store',
|
|
help='if set, test run will be also logged to file')
|
|
|
|
parser.add_argument('--kmsg', '--very-brave-or-very-stupid',
|
|
action='store_true', dest='kmsg',
|
|
help='log most important things to kernel ring-buffer')
|
|
parser.add_argument('--no-kmsg', '--i-am-smarter-than-kay-sievers',
|
|
action='store_false', dest='kmsg',
|
|
help='do not abuse kernel ring-buffer')
|
|
|
|
parser.add_argument('--allow-running-along-qubesd',
|
|
action='store_true', default=False,
|
|
help='allow running in parallel with qubesd;'
|
|
' this is DANGEROUS and WILL RESULT IN INCONSISTENT SYSTEM STATE')
|
|
|
|
parser.add_argument('--break-to-repl',
|
|
action='store_true', default=False,
|
|
help='break to REPL after tests')
|
|
|
|
parser.add_argument('names', metavar='TESTNAME',
|
|
action='store', nargs='*',
|
|
help='list of tests to run named like in description '
|
|
'(default: run all tests)')
|
|
|
|
parser.set_defaults(
|
|
failfast=False,
|
|
loglevel='DEBUG',
|
|
logfile=None,
|
|
kmsg=False,
|
|
verbose=2,
|
|
quiet=0)
|
|
|
|
|
|
def list_test_cases(suite):
|
|
for test in suite:
|
|
if isinstance(test, unittest.TestSuite):
|
|
#yield from
|
|
for i in list_test_cases(test):
|
|
yield i
|
|
else:
|
|
yield test
|
|
|
|
|
|
def main(args=None):
|
|
args = parser.parse_args(args)
|
|
|
|
suite = unittest.TestSuite()
|
|
loader = unittest.TestLoader()
|
|
|
|
if args.names:
|
|
alltests = loader.loadTestsFromName('qubes.tests')
|
|
for name in args.names:
|
|
suite.addTests(
|
|
[test for test in list_test_cases(alltests)
|
|
if str(test).startswith(name)])
|
|
else:
|
|
suite.addTests(loader.loadTestsFromName('qubes.tests'))
|
|
|
|
if args.list:
|
|
for test in list_test_cases(suite):
|
|
print(str(test)) # pylint: disable=superfluous-parens
|
|
return True
|
|
|
|
logging.root.setLevel(args.loglevel)
|
|
|
|
if args.logfile is not None:
|
|
ha_file = logging.FileHandler(
|
|
os.path.join(os.environ['HOME'], args.logfile))
|
|
ha_file.setFormatter(
|
|
logging.Formatter('%(asctime)s %(name)s[%(process)d]: %(message)s'))
|
|
logging.root.addHandler(ha_file)
|
|
|
|
if args.kmsg:
|
|
try:
|
|
subprocess.check_call(('sudo', 'chmod', '666', '/dev/kmsg'))
|
|
except subprocess.CalledProcessError:
|
|
parser.error('could not chmod /dev/kmsg')
|
|
else:
|
|
ha_kmsg = logging.FileHandler('/dev/kmsg', 'w')
|
|
ha_kmsg.setFormatter(
|
|
logging.Formatter('%(name)s[%(process)d]: %(message)s'))
|
|
ha_kmsg.setLevel(logging.CRITICAL)
|
|
logging.root.addHandler(ha_kmsg)
|
|
|
|
if not args.allow_running_along_qubesd \
|
|
and os.path.exists(qubes.api.admin.QubesAdminAPI.SOCKNAME):
|
|
parser.error('refusing to run until qubesd is disabled')
|
|
|
|
runner = unittest.TextTestRunner(stream=sys.stdout,
|
|
verbosity=(args.verbose-args.quiet),
|
|
failfast=args.failfast)
|
|
unittest.signals.installHandler()
|
|
runner.resultclass = QubesTestResult
|
|
result = runner.run(suite)
|
|
|
|
if args.break_to_repl:
|
|
code.interact(local=locals())
|
|
|
|
return result.wasSuccessful()
|
|
|
|
|
|
if __name__ == '__main__':
|
|
sys.exit(not main())
|