core-admin/qubes/tests/run.py
Wojtek Porczyk 6ff1bfdc16 qubes/tests: convert some tearDown into addCleanup
This is because .tearDown() is not executed if the exception occurs in
setUp() [for example self.skipTest() raises an exception]. The lower
levels of .tearDown() being executed are critical to not leaking file
descriptors.
2017-08-31 20:30:24 +02:00

425 lines
14 KiB
Python
Executable File

#
# The Qubes OS Project, https://www.qubes-os.org/
#
# Copyright (C) 2014-2015 Joanna Rutkowska <joanna@invisiblethingslab.com>
# Copyright (C) 2014-2015 Wojtek Porczyk <woju@invisiblethingslab.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
import argparse
import code
import curses
import itertools
import logging
import logging.handlers
import os
import socket
import subprocess
import sys
import unittest
import unittest.signals
import qubes.tests
import qubes.api.admin
class CursesColor(dict):
    '''Lazy mapping from a colour/attribute name to a terminal escape string.

    Valid keys are the names in :py:attr:`colors` and :py:attr:`attrs`.
    Values are looked up in terminfo on first access (see
    :py:meth:`__missing__`) and then cached in the dict itself.

    A lookup never fails: it yields an empty string when the terminal could
    not be initialised, when the key is unknown, or when the terminal does
    not provide the required capability. This keeps format strings such as
    ``'{color[red]}'`` usable on any terminal.
    '''

    # The position of a name in this tuple is the number passed to ``setaf``.
    colors = (
        'black', 'red', 'green', 'yellow', 'blue', 'magenta', 'cyan', 'white')

    # Attribute name -> terminfo string capability name.
    attrs = {
        'bold': 'bold', 'normal': 'sgr0'}

    def __init__(self):
        super(CursesColor, self).__init__()
        self.has_colors = False
        try:
            curses.setupterm()
        except curses.error:
            # Terminal could not be set up: stay in "no colours" mode.
            return
        self.has_colors = True

    def __missing__(self, key):
        '''Resolve *key* to an escape sequence, caching successful lookups.

        :param key: colour name from :py:attr:`colors` or attribute name \
            from :py:attr:`attrs`
        :returns: the escape sequence as text, or ``''`` if unavailable
        '''
        # pylint: disable=unused-argument,no-self-use
        if not self.has_colors:
            return ''

        try:
            value = curses.tigetstr(self.attrs[key])
        except KeyError:
            # Not an attribute, so it has to be a colour name.
            try:
                index = self.colors.index(key)
            except ValueError:
                return ''
            setaf = curses.tigetstr('setaf')
            # tigetstr() gives None for a capability the terminal lacks;
            # tparm() must not be called with it.
            value = None if setaf is None else curses.tparm(setaf, index)

        if value is None:
            # Capability absent (e.g. TERM=dumb): degrade to plain text
            # instead of failing on None.decode().
            return ''

        value = value.decode()
        self[key] = value
        return value
class QubesTestResult(unittest.TestResult):
    '''Test result which writes colourful progress reports to a stream.

    Intended as the ``resultclass`` of TextTestRunner; it is modelled on
    unittest.TextTestResult. Besides writing to the stream, every event is
    sent to a logger (see :py:meth:`get_log`).
    '''

    separator1 = unittest.TextTestResult.separator1
    separator2 = unittest.TextTestResult.separator2

    def __init__(self, stream, descriptions, verbosity):
        super(QubesTestResult, self).__init__(stream, descriptions, verbosity)
        self.stream = stream
        self.descriptions = descriptions
        # verbosity 1: one mark per test; above 1: one line per test
        self.dots = verbosity == 1
        self.showAll = verbosity > 1 # pylint: disable=invalid-name
        self.color = CursesColor()
        self.hostname = socket.gethostname()
        self.log = logging.getLogger('qubes.tests')

    def _paint(self, template, *args):
        '''Fill *template* with *args*, exposing the palette as ``color``.'''
        return template.format(*args, color=self.color)

    def _mark(self, template):
        '''Write a progress mark (no newline) and flush the stream.'''
        self.stream.write(self._paint(template))
        self.stream.flush()

    def _fmtexc(self, err):
        '''Format an exc_info triple as exception name plus message, if any.'''
        exc_name = err[0].__name__
        exc_value = err[1]
        if not str(exc_value):
            return self._paint('{color[bold]}{}{color[normal]}', exc_name)
        return self._paint(
            '{color[bold]}{}:{color[normal]} {!s}', exc_name, exc_value)

    def get_log(self, test):
        '''Return the test's own ``log`` attribute, or the shared logger.'''
        return getattr(test, 'log', self.log)

    def getDescription(self, test): # pylint: disable=invalid-name
        '''Return the test name with highlighted parts, plus its summary.

        The string form of the test is split on ``/``; in each of the last
        two components the part after the second underscore is emboldened.
        When descriptions are enabled and the test has a short description,
        it is appended on a second line.
        '''
        components = str(test).split('/')
        for position in (-2, -1):
            try:
                pieces = components[position].split('_', 2)
            except IndexError:
                # fewer than two components
                continue
            pieces[-1] = self._paint(
                '{color[bold]}{}{color[normal]}', pieces[-1])
            components[position] = '_'.join(pieces)
        title = '/'.join(components)

        summary = test.shortDescription()
        if not (self.descriptions and summary):
            return title
        return '\n'.join((title, ' {}'.format(summary)))

    def startTest(self, test): # pylint: disable=invalid-name
        '''Log the start of the test; in verbose mode announce it.'''
        super(QubesTestResult, self).startTest(test)
        self.get_log(test).critical('started')
        if not self.showAll:
            return
        out = self.stream
        if not qubes.tests.in_git:
            out.write('{}: '.format(self.hostname))
        out.write(self.getDescription(test))
        out.write(' ... ')
        out.flush()

    def addSuccess(self, test): # pylint: disable=invalid-name
        '''Record and report a passed test.'''
        super(QubesTestResult, self).addSuccess(test)
        self.get_log(test).warning('ok')
        if self.showAll:
            self.stream.writeln(
                self._paint('{color[green]}ok{color[normal]}'))
            return
        if self.dots:
            self._mark('.')

    def addError(self, test, err): # pylint: disable=invalid-name
        '''Record and report a test which raised an unexpected exception.'''
        super(QubesTestResult, self).addError(test, err)
        self.get_log(test).critical(
            'ERROR ({err[0].__name__}: {err[1]!r})'.format(err=err))
        if self.showAll:
            self.stream.writeln(self._paint(
                '{color[red]}{color[bold]}ERROR{color[normal]} ({})',
                self._fmtexc(err)))
            return
        if self.dots:
            self._mark('{color[red]}{color[bold]}E{color[normal]}')

    def addFailure(self, test, err): # pylint: disable=invalid-name
        '''Record and report a failed assertion.'''
        super(QubesTestResult, self).addFailure(test, err)
        self.get_log(test).error(
            'FAIL ({err[0].__name__}: {err[1]!r})'.format(err=err))
        if self.showAll:
            self.stream.writeln(
                self._paint('{color[red]}FAIL{color[normal]}'))
            return
        if self.dots:
            self._mark('{color[red]}F{color[normal]}')

    def addSkip(self, test, reason): # pylint: disable=invalid-name
        '''Record and report a skipped test together with the reason.'''
        super(QubesTestResult, self).addSkip(test, reason)
        self.get_log(test).warning('skipped ({})'.format(reason))
        if self.showAll:
            self.stream.writeln(self._paint(
                '{color[cyan]}skipped{color[normal]} ({})', reason))
            return
        if self.dots:
            self._mark('{color[cyan]}s{color[normal]}')

    def addExpectedFailure(self, test, err): # pylint: disable=invalid-name
        '''Record and report a failure which was marked as expected.'''
        super(QubesTestResult, self).addExpectedFailure(test, err)
        self.get_log(test).warning('expected failure')
        if self.showAll:
            self.stream.writeln(self._paint(
                '{color[yellow]}expected failure{color[normal]}'))
            return
        if self.dots:
            self._mark('{color[yellow]}x{color[normal]}')

    def addUnexpectedSuccess(self, test): # pylint: disable=invalid-name
        '''Record and report a test that passed but was expected to fail.'''
        super(QubesTestResult, self).addUnexpectedSuccess(test)
        self.get_log(test).error('unexpected success')
        if self.showAll:
            self.stream.writeln(self._paint(
                '{color[yellow]}{color[bold]}unexpected success'
                '{color[normal]}'))
            return
        if self.dots:
            self._mark('{color[yellow]}{color[bold]}u{color[normal]}')

    def printErrors(self): # pylint: disable=invalid-name
        '''Print details of errors, failures and expected failures.'''
        if self.dots or self.showAll:
            self.stream.writeln()
        sections = (
            ('{color[red]}{color[bold]}ERROR{color[normal]}', self.errors),
            ('{color[red]}FAIL{color[normal]}', self.failures),
            ('{color[yellow]}EXPECTED{color[normal]}',
                self.expectedFailures),
        )
        for template, entries in sections:
            self.printErrorList(self._paint(template), entries)

    def printErrorList(self, flavour, errors): # pylint: disable=invalid-name
        '''Print one separator-framed entry for each (test, text) pair.'''
        out = self.stream
        for test, err in errors:
            out.writeln(self.separator1)
            out.writeln('%s: %s' % (flavour, self.getDescription(test)))
            out.writeln(self.separator2)
            out.writeln('%s' % err)
def demo(verbosity=2):
    '''Run a throw-away test case which produces every kind of outcome.

    The results are written to standard output through
    :py:class:`QubesTestResult`.

    :param verbosity: verbosity handed to the text test runner
    :returns: value of ``wasSuccessful()`` of the finished run
    '''

    class TC_00_Demo(qubes.tests.QubesTestCase):
        '''Demo class'''
        # pylint: disable=no-self-use

        def test_0_success(self):
            '''Demo test (success)'''

        def test_1_error(self):
            '''Demo test (error)'''
            raise Exception()

        def test_2_failure(self):
            '''Demo test (failure)'''
            self.fail('boo')

        def test_3_skip(self):
            '''Demo test (skipped by call to self.skipTest())'''
            self.skipTest('skip')

        @unittest.skip(None)
        def test_4_skip_decorator(self):
            '''Demo test (skipped by decorator)'''

        @unittest.expectedFailure
        def test_5_expected_failure(self):
            '''Demo test (expected failure)'''
            self.fail()

        @unittest.expectedFailure
        def test_6_unexpected_success(self):
            '''Demo test (unexpected success)'''

    demo_suite = unittest.TestLoader().loadTestsFromTestCase(TC_00_Demo)
    demo_runner = unittest.TextTestRunner(
        stream=sys.stdout,
        verbosity=verbosity,
        resultclass=QubesTestResult)
    outcome = demo_runner.run(demo_suite)
    return outcome.wasSuccessful()
# Command-line interface of the test runner.
parser = argparse.ArgumentParser(
    epilog='''When running only specific tests, write their names like in log,
in format: MODULE+"/"+CLASS+"/"+FUNCTION. MODULE should omit initial
"qubes.tests.". Example: basic/TC_00_Basic/test_000_create''')

# --- console verbosity -----------------------------------------------------
parser.add_argument('--verbose', '-v', action='count',
    help='increase console verbosity level')
parser.add_argument('--quiet', '-q', action='count',
    help='decrease console verbosity level')

# --- what to run and when to stop ------------------------------------------
parser.add_argument('--list', '-l', action='store_true',
    help='list all available tests and exit')
parser.add_argument('--failfast', '-f', action='store_true',
    help='stop on the first fail, error or unexpected success')
parser.add_argument('--no-failfast', dest='failfast', action='store_false',
    help='disable --failfast')

# --- logging ---------------------------------------------------------------
# The name -> level table of the logging module is private and is called
# differently depending on the Python version.
# pylint: disable=protected-access
_level_by_name = getattr(logging, '_nameToLevel', None)
if _level_by_name is None:
    _level_by_name = logging._levelNames
# pylint: enable=protected-access

# Offer only textual names, ordered by the level they stand for.
_level_choices = tuple(
    name
    for name in sorted(_level_by_name, key=_level_by_name.__getitem__)
    if isinstance(name, str))

parser.add_argument('--loglevel', '-L', metavar='LEVEL',
    choices=_level_choices,
    help='logging level for file and syslog forwarding '
        '(one of: %(choices)s; default: %(default)s)')

# Do not leave the helpers behind in the module namespace.
del _level_by_name, _level_choices

parser.add_argument('--logfile', '-o', metavar='FILE',
    help='if set, test run will be also logged to file')

parser.add_argument('--syslog', action='store_true',
    help='reenable logging to syslog')
parser.add_argument('--no-syslog', dest='syslog', action='store_false',
    help='disable logging to syslog')

parser.add_argument('--kmsg', '--very-brave-or-very-stupid',
    action='store_true',
    help='log most important things to kernel ring-buffer')
parser.add_argument('--no-kmsg', '--i-am-smarter-than-kay-sievers',
    dest='kmsg', action='store_false',
    help='do not abuse kernel ring-buffer')

# --- miscellaneous switches ------------------------------------------------
parser.add_argument('--allow-running-along-qubesd', action='store_true',
    help='allow running in parallel with qubesd;'
        ' this is DANGEROUS and WILL RESULT IN INCONSISTENT SYSTEM STATE')
parser.add_argument('--break-to-repl', action='store_true',
    help='break to REPL after tests')

parser.add_argument('names', metavar='TESTNAME', nargs='*',
    help='list of tests to run named like in description '
        '(default: run all tests)')

# Defaults are kept in one place; paired on/off switches share a dest.
parser.set_defaults(
    verbose=2,
    quiet=0,
    failfast=False,
    loglevel='DEBUG',
    logfile=None,
    syslog=True,
    kmsg=False)
def list_test_cases(suite):
    '''Yield every single test contained in a possibly nested suite.

    Nested :py:class:`unittest.TestSuite` objects are expanded depth-first,
    in the order they are iterated; anything else is yielded unchanged.

    :param suite: iterable of tests and/or test suites
    '''
    # Iterators of the suites entered so far, innermost last.
    pending = [iter(suite)]
    while pending:
        try:
            item = next(pending[-1])
        except StopIteration:
            # innermost suite is exhausted, resume its parent
            pending.pop()
            continue
        if isinstance(item, unittest.TestSuite):
            pending.append(iter(item))
        else:
            yield item
def main(args=None):
    '''Command-line entry point: select tests, set up logging and run them.

    :param args: list of command-line arguments given to \
        ``parser.parse_args()``; with ``None`` argparse uses ``sys.argv``
    :returns: ``True`` if the test run was successful, or if only the \
        list of tests was requested; ``False`` otherwise

    Exits through ``parser.error()`` when qubesd appears to be running
    (unless explicitly allowed) or when ``/dev/kmsg`` cannot be made
    writable.
    '''
    args = parser.parse_args(args)

    suite = unittest.TestSuite()
    loader = unittest.TestLoader()

    if args.names:
        # Flatten the whole collection once, then pick tests by name prefix;
        # selection keeps the order in which the names were given.
        alltests = list(
            list_test_cases(loader.loadTestsFromName('qubes.tests')))
        for name in args.names:
            suite.addTests(
                [test for test in alltests if str(test).startswith(name)])
    else:
        suite.addTests(loader.loadTestsFromName('qubes.tests'))

    if args.list:
        for test in list_test_cases(suite):
            print(str(test)) # pylint: disable=superfluous-parens
        return True

    # Refuse before any side effect (creating the log file, chmod of
    # /dev/kmsg) has taken place.
    if not args.allow_running_along_qubesd \
            and os.path.exists(qubes.api.admin.QubesAdminAPI.SOCKNAME):
        parser.error('refusing to run until qubesd is disabled')

    logging.root.setLevel(args.loglevel)

    if args.logfile is not None:
        # The log file path is taken relative to the home directory.
        # expanduser() also works when $HOME is not set in the environment.
        ha_file = logging.FileHandler(
            os.path.join(os.path.expanduser('~'), args.logfile))
        ha_file.setFormatter(
            logging.Formatter('%(asctime)s %(name)s[%(process)d]: %(message)s'))
        logging.root.addHandler(ha_file)

    if args.syslog:
        ha_syslog = logging.handlers.SysLogHandler('/dev/log')
        ha_syslog.setFormatter(
            logging.Formatter('%(name)s[%(process)d]: %(message)s'))
        logging.root.addHandler(ha_syslog)

    if args.kmsg:
        try:
            subprocess.check_call(('sudo', 'chmod', '666', '/dev/kmsg'))
        except subprocess.CalledProcessError:
            parser.error('could not chmod /dev/kmsg')
        else:
            ha_kmsg = logging.FileHandler('/dev/kmsg', 'w')
            ha_kmsg.setFormatter(
                logging.Formatter('%(name)s[%(process)d]: %(message)s'))
            # only CRITICAL records are sent to the kernel ring-buffer
            ha_kmsg.setLevel(logging.CRITICAL)
            logging.root.addHandler(ha_kmsg)

    runner = unittest.TextTestRunner(stream=sys.stdout,
        verbosity=(args.verbose-args.quiet),
        failfast=args.failfast)
    unittest.signals.installHandler()
    runner.resultclass = QubesTestResult
    result = runner.run(suite)

    if args.break_to_repl:
        # local variables (result, suite, args, ...) are visible in the REPL
        code.interact(local=locals())

    return result.wasSuccessful()
if __name__ == '__main__':
    # main() returns True on success, while a process signals success with
    # exit status 0 -- hence the negation.
    tests_passed = main()
    sys.exit(not tests_passed)