123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423 |
- #
- # The Qubes OS Project, https://www.qubes-os.org/
- #
- # Copyright (C) 2014-2015 Joanna Rutkowska <joanna@invisiblethingslab.com>
- # Copyright (C) 2014-2015 Wojtek Porczyk <woju@invisiblethingslab.com>
- #
- # This library is free software; you can redistribute it and/or
- # modify it under the terms of the GNU Lesser General Public
- # License as published by the Free Software Foundation; either
- # version 2.1 of the License, or (at your option) any later version.
- #
- # This library is distributed in the hope that it will be useful,
- # but WITHOUT ANY WARRANTY; without even the implied warranty of
- # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- # Lesser General Public License for more details.
- #
- # You should have received a copy of the GNU Lesser General Public
- # License along with this library; if not, see <https://www.gnu.org/licenses/>.
- #
- import argparse
- import code
- import curses
- import itertools
- import logging
- import logging.handlers
- import os
- import socket
- import subprocess
- import sys
- import unittest
- import unittest.signals
- import qubes.tests
- import qubes.api.admin
- class CursesColor(dict):
- colors = (
- 'black', 'red', 'green', 'yellow', 'blue', 'magenta', 'cyan', 'white')
- attrs = {
- 'bold': 'bold', 'normal': 'sgr0'}
- def __init__(self):
- super(CursesColor, self).__init__()
- self.has_colors = False
- try:
- curses.setupterm()
- self.has_colors = True
- except curses.error:
- return
- def __missing__(self, key):
- # pylint: disable=unused-argument,no-self-use
- if not self.has_colors:
- return ''
- try:
- value = curses.tigetstr(self.attrs[key])
- except KeyError:
- try:
- value = curses.tparm(
- curses.tigetstr('setaf'), self.colors.index(key))
- except ValueError:
- return ''
- value = value.decode()
- self[key] = value
- return value
- class QubesTestResult(unittest.TestResult):
- '''A test result class that can print colourful text results to a stream.
- Used by TextTestRunner. This is a lightly rewritten unittest.TextTestResult.
- '''
- separator1 = unittest.TextTestResult.separator1
- separator2 = unittest.TextTestResult.separator2
- def __init__(self, stream, descriptions, verbosity):
- super(QubesTestResult, self).__init__(stream, descriptions, verbosity)
- self.stream = stream
- self.showAll = verbosity > 1 # pylint: disable=invalid-name
- self.dots = verbosity == 1
- self.descriptions = descriptions
- self.color = CursesColor()
- self.hostname = socket.gethostname()
- self.log = logging.getLogger('qubes.tests')
- def _fmtexc(self, err):
- if str(err[1]):
- return '{color[bold]}{}:{color[normal]} {!s}'.format(
- err[0].__name__, err[1], color=self.color)
- else:
- return '{color[bold]}{}{color[normal]}'.format(
- err[0].__name__, color=self.color)
- def get_log(self, test):
- try:
- return test.log
- except AttributeError:
- return self.log
- def getDescription(self, test): # pylint: disable=invalid-name
- teststr = str(test).split('/')
- for i in range(-2, 0):
- try:
- fullname = teststr[i].split('_', 2)
- except IndexError:
- continue
- fullname[-1] = '{color[bold]}{}{color[normal]}'.format(
- fullname[-1], color=self.color)
- teststr[i] = '_'.join(fullname)
- teststr = '/'.join(teststr)
- doc_first_line = test.shortDescription()
- if self.descriptions and doc_first_line:
- return '\n'.join((teststr, ' {}'.format(
- doc_first_line, color=self.color)))
- else:
- return teststr
- def startTest(self, test): # pylint: disable=invalid-name
- super(QubesTestResult, self).startTest(test)
- self.get_log(test).critical('started')
- if self.showAll:
- if not qubes.tests.in_git:
- self.stream.write('{}: '.format(self.hostname))
- self.stream.write(self.getDescription(test))
- self.stream.write(' ... ')
- self.stream.flush()
- def addSuccess(self, test): # pylint: disable=invalid-name
- super(QubesTestResult, self).addSuccess(test)
- self.get_log(test).warning('ok')
- if self.showAll:
- self.stream.writeln('{color[green]}ok{color[normal]}'.format(
- color=self.color))
- elif self.dots:
- self.stream.write('.')
- self.stream.flush()
- def addError(self, test, err): # pylint: disable=invalid-name
- super(QubesTestResult, self).addError(test, err)
- self.get_log(test).critical(
- 'ERROR ({err[0].__name__}: {err[1]!r})'.format(err=err))
- if self.showAll:
- self.stream.writeln(
- '{color[red]}{color[bold]}ERROR{color[normal]} ({})'.format(
- self._fmtexc(err), color=self.color))
- elif self.dots:
- self.stream.write(
- '{color[red]}{color[bold]}E{color[normal]}'.format(
- color=self.color))
- self.stream.flush()
- def addFailure(self, test, err): # pylint: disable=invalid-name
- super(QubesTestResult, self).addFailure(test, err)
- self.get_log(test).error(
- 'FAIL ({err[0].__name__}: {err[1]!r})'.format(err=err))
- if self.showAll:
- self.stream.writeln('{color[red]}FAIL{color[normal]}'.format(
- color=self.color))
- elif self.dots:
- self.stream.write('{color[red]}F{color[normal]}'.format(
- color=self.color))
- self.stream.flush()
- def addSkip(self, test, reason): # pylint: disable=invalid-name
- super(QubesTestResult, self).addSkip(test, reason)
- self.get_log(test).warning('skipped ({})'.format(reason))
- if self.showAll:
- self.stream.writeln(
- '{color[cyan]}skipped{color[normal]} ({})'.format(
- reason, color=self.color))
- elif self.dots:
- self.stream.write('{color[cyan]}s{color[normal]}'.format(
- color=self.color))
- self.stream.flush()
- def addExpectedFailure(self, test, err): # pylint: disable=invalid-name
- super(QubesTestResult, self).addExpectedFailure(test, err)
- self.get_log(test).warning('expected failure')
- if self.showAll:
- self.stream.writeln(
- '{color[yellow]}expected failure{color[normal]}'.format(
- color=self.color))
- elif self.dots:
- self.stream.write('{color[yellow]}x{color[normal]}'.format(
- color=self.color))
- self.stream.flush()
- def addUnexpectedSuccess(self, test): # pylint: disable=invalid-name
- super(QubesTestResult, self).addUnexpectedSuccess(test)
- self.get_log(test).error('unexpected success')
- if self.showAll:
- self.stream.writeln(
- '{color[yellow]}{color[bold]}unexpected success'
- '{color[normal]}'.format(color=self.color))
- elif self.dots:
- self.stream.write(
- '{color[yellow]}{color[bold]}u{color[normal]}'.format(
- color=self.color))
- self.stream.flush()
- def printErrors(self): # pylint: disable=invalid-name
- if self.dots or self.showAll:
- self.stream.writeln()
- self.printErrorList(
- '{color[red]}{color[bold]}ERROR{color[normal]}'.format(
- color=self.color),
- self.errors)
- self.printErrorList(
- '{color[red]}FAIL{color[normal]}'.format(
- color=self.color),
- self.failures)
- self.printErrorList(
- '{color[yellow]}EXPECTED{color[normal]}'.format(
- color=self.color),
- self.expectedFailures)
- def printErrorList(self, flavour, errors): # pylint: disable=invalid-name
- for test, err in errors:
- self.stream.writeln(self.separator1)
- self.stream.writeln('%s: %s' % (flavour, self.getDescription(test)))
- self.stream.writeln(self.separator2)
- self.stream.writeln('%s' % err)
- def demo(verbosity=2):
- class TC_00_Demo(qubes.tests.QubesTestCase):
- '''Demo class'''
- # pylint: disable=no-self-use
- def test_0_success(self):
- '''Demo test (success)'''
- pass
- def test_1_error(self):
- '''Demo test (error)'''
- raise Exception()
- def test_2_failure(self):
- '''Demo test (failure)'''
- self.fail('boo')
- def test_3_skip(self):
- '''Demo test (skipped by call to self.skipTest())'''
- self.skipTest('skip')
- @unittest.skip(None)
- def test_4_skip_decorator(self):
- '''Demo test (skipped by decorator)'''
- pass
- @unittest.expectedFailure
- def test_5_expected_failure(self):
- '''Demo test (expected failure)'''
- self.fail()
- @unittest.expectedFailure
- def test_6_unexpected_success(self):
- '''Demo test (unexpected success)'''
- pass
- suite = unittest.TestLoader().loadTestsFromTestCase(TC_00_Demo)
- runner = unittest.TextTestRunner(stream=sys.stdout, verbosity=verbosity)
- runner.resultclass = QubesTestResult
- return runner.run(suite).wasSuccessful()
- parser = argparse.ArgumentParser(
- epilog='''When running only specific tests, write their names like in log,
- in format: MODULE+"/"+CLASS+"/"+FUNCTION. MODULE should omit initial
- "qubes.tests.". Example: basic/TC_00_Basic/test_000_create''')
- parser.add_argument('--verbose', '-v',
- action='count',
- help='increase console verbosity level')
- parser.add_argument('--quiet', '-q',
- action='count',
- help='decrease console verbosity level')
- parser.add_argument('--list', '-l',
- action='store_true', dest='list',
- help='list all available tests and exit')
- parser.add_argument('--failfast', '-f',
- action='store_true', dest='failfast',
- help='stop on the first fail, error or unexpected success')
- parser.add_argument('--no-failfast',
- action='store_false', dest='failfast',
- help='disable --failfast')
- # pylint: disable=protected-access
- try:
- name_to_level = logging._nameToLevel
- except AttributeError:
- name_to_level = logging._levelNames
- parser.add_argument('--loglevel', '-L', metavar='LEVEL',
- action='store', choices=tuple(k
- for k in sorted(name_to_level.keys(),
- key=lambda x: name_to_level[x])
- if isinstance(k, str)),
- help='logging level for file and syslog forwarding '
- '(one of: %(choices)s; default: %(default)s)')
- del name_to_level
- # pylint: enable=protected-access
- parser.add_argument('--logfile', '-o', metavar='FILE',
- action='store',
- help='if set, test run will be also logged to file')
- parser.add_argument('--syslog',
- action='store_true', dest='syslog',
- help='reenable logging to syslog')
- parser.add_argument('--no-syslog',
- action='store_false', dest='syslog',
- help='disable logging to syslog')
- parser.add_argument('--kmsg', '--very-brave-or-very-stupid',
- action='store_true', dest='kmsg',
- help='log most important things to kernel ring-buffer')
- parser.add_argument('--no-kmsg', '--i-am-smarter-than-kay-sievers',
- action='store_false', dest='kmsg',
- help='do not abuse kernel ring-buffer')
- parser.add_argument('--allow-running-along-qubesd',
- action='store_true', default=False,
- help='allow running in parallel with qubesd;'
- ' this is DANGEROUS and WILL RESULT IN INCONSISTENT SYSTEM STATE')
- parser.add_argument('--break-to-repl',
- action='store_true', default=False,
- help='break to REPL after tests')
- parser.add_argument('names', metavar='TESTNAME',
- action='store', nargs='*',
- help='list of tests to run named like in description '
- '(default: run all tests)')
- parser.set_defaults(
- failfast=False,
- loglevel='DEBUG',
- logfile=None,
- syslog=True,
- kmsg=False,
- verbose=2,
- quiet=0)
- def list_test_cases(suite):
- for test in suite:
- if isinstance(test, unittest.TestSuite):
- #yield from
- for i in list_test_cases(test):
- yield i
- else:
- yield test
- def main(args=None):
- args = parser.parse_args(args)
- suite = unittest.TestSuite()
- loader = unittest.TestLoader()
- if args.names:
- alltests = loader.loadTestsFromName('qubes.tests')
- for name in args.names:
- suite.addTests(
- [test for test in list_test_cases(alltests)
- if str(test).startswith(name)])
- else:
- suite.addTests(loader.loadTestsFromName('qubes.tests'))
- if args.list:
- for test in list_test_cases(suite):
- print(str(test)) # pylint: disable=superfluous-parens
- return True
- logging.root.setLevel(args.loglevel)
- if args.logfile is not None:
- ha_file = logging.FileHandler(
- os.path.join(os.environ['HOME'], args.logfile))
- ha_file.setFormatter(
- logging.Formatter('%(asctime)s %(name)s[%(process)d]: %(message)s'))
- logging.root.addHandler(ha_file)
- if args.syslog:
- ha_syslog = logging.handlers.SysLogHandler('/dev/log')
- ha_syslog.setFormatter(
- logging.Formatter('%(name)s[%(process)d]: %(message)s'))
- logging.root.addHandler(ha_syslog)
- if args.kmsg:
- try:
- subprocess.check_call(('sudo', 'chmod', '666', '/dev/kmsg'))
- except subprocess.CalledProcessError:
- parser.error('could not chmod /dev/kmsg')
- else:
- ha_kmsg = logging.FileHandler('/dev/kmsg', 'w')
- ha_kmsg.setFormatter(
- logging.Formatter('%(name)s[%(process)d]: %(message)s'))
- ha_kmsg.setLevel(logging.CRITICAL)
- logging.root.addHandler(ha_kmsg)
- if not args.allow_running_along_qubesd \
- and os.path.exists(qubes.api.admin.QubesAdminAPI.SOCKNAME):
- parser.error('refusing to run until qubesd is disabled')
- runner = unittest.TextTestRunner(stream=sys.stdout,
- verbosity=(args.verbose-args.quiet),
- failfast=args.failfast)
- unittest.signals.installHandler()
- runner.resultclass = QubesTestResult
- result = runner.run(suite)
- if args.break_to_repl:
- code.interact(local=locals())
- return result.wasSuccessful()
- if __name__ == '__main__':
- sys.exit(not main())
|