run.py 13 KB


  1. #!/usr/bin/python2 -O
  2. # vim: fileencoding=utf-8
  3. #
  4. # The Qubes OS Project, https://www.qubes-os.org/
  5. #
  6. # Copyright (C) 2014-2015 Joanna Rutkowska <joanna@invisiblethingslab.com>
  7. # Copyright (C) 2014-2015 Wojtek Porczyk <woju@invisiblethingslab.com>
  8. #
  9. # This program is free software; you can redistribute it and/or modify
  10. # it under the terms of the GNU General Public License as published by
  11. # the Free Software Foundation; either version 2 of the License, or
  12. # (at your option) any later version.
  13. #
  14. # This program is distributed in the hope that it will be useful,
  15. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  16. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  17. # GNU General Public License for more details.
  18. #
  19. # You should have received a copy of the GNU General Public License along
  20. # with this program; if not, write to the Free Software Foundation, Inc.,
  21. # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  22. #
  23. import argparse
  24. import curses
  25. import importlib
  26. import logging
  27. import logging.handlers
  28. import os
  29. import socket
  30. import subprocess
  31. import sys
  32. import unittest
  33. import unittest.signals
  34. import qubes.tests
  35. class CursesColor(dict):
  36. def __init__(self):
  37. super(CursesColor, self).__init__()
  38. try:
  39. curses.setupterm()
  40. except curses.error:
  41. return
  42. # pylint: disable=bad-whitespace
  43. self['black'] = curses.tparm(curses.tigetstr('setaf'), 0)
  44. self['red'] = curses.tparm(curses.tigetstr('setaf'), 1)
  45. self['green'] = curses.tparm(curses.tigetstr('setaf'), 2)
  46. self['yellow'] = curses.tparm(curses.tigetstr('setaf'), 3)
  47. self['blue'] = curses.tparm(curses.tigetstr('setaf'), 4)
  48. self['magenta'] = curses.tparm(curses.tigetstr('setaf'), 5)
  49. self['cyan'] = curses.tparm(curses.tigetstr('setaf'), 6)
  50. self['white'] = curses.tparm(curses.tigetstr('setaf'), 7)
  51. self['bold'] = curses.tigetstr('bold')
  52. self['normal'] = curses.tigetstr('sgr0')
  53. def __missing__(self, key):
  54. # pylint: disable=unused-argument,no-self-use
  55. return ''
  56. class QubesTestResult(unittest.TestResult):
  57. '''A test result class that can print colourful text results to a stream.
  58. Used by TextTestRunner. This is a lightly rewritten unittest.TextTestResult.
  59. '''
  60. separator1 = unittest.TextTestResult.separator1
  61. separator2 = unittest.TextTestResult.separator2
  62. def __init__(self, stream, descriptions, verbosity):
  63. super(QubesTestResult, self).__init__(stream, descriptions, verbosity)
  64. self.stream = stream
  65. self.showAll = verbosity > 1 # pylint: disable=invalid-name
  66. self.dots = verbosity == 1
  67. self.descriptions = descriptions
  68. self.color = CursesColor()
  69. self.hostname = socket.gethostname()
  70. self.log = logging.getLogger('qubes.tests')
  71. def _fmtexc(self, err):
  72. if str(err[1]):
  73. return '{color[bold]}{}:{color[normal]} {!s}'.format(
  74. err[0].__name__, err[1], color=self.color)
  75. else:
  76. return '{color[bold]}{}{color[normal]}'.format(
  77. err[0].__name__, color=self.color)
  78. def getDescription(self, test): # pylint: disable=invalid-name
  79. teststr = str(test).split('/')
  80. for i in range(-2, 0):
  81. try:
  82. fullname = teststr[i].split('_', 2)
  83. except IndexError:
  84. continue
  85. fullname[-1] = '{color[bold]}{}{color[normal]}'.format(
  86. fullname[-1], color=self.color)
  87. teststr[i] = '_'.join(fullname)
  88. teststr = '/'.join(teststr)
  89. doc_first_line = test.shortDescription()
  90. if self.descriptions and doc_first_line:
  91. return '\n'.join((teststr, ' {}'.format(
  92. doc_first_line, color=self.color)))
  93. else:
  94. return teststr
  95. def startTest(self, test): # pylint: disable=invalid-name
  96. super(QubesTestResult, self).startTest(test)
  97. test.log.critical('started')
  98. if self.showAll:
  99. # if not qubes.tests.in_git:
  100. self.stream.write('{}: '.format(self.hostname))
  101. self.stream.write(self.getDescription(test))
  102. self.stream.write(' ... ')
  103. self.stream.flush()
  104. def addSuccess(self, test): # pylint: disable=invalid-name
  105. super(QubesTestResult, self).addSuccess(test)
  106. test.log.warning('ok')
  107. if self.showAll:
  108. self.stream.writeln('{color[green]}ok{color[normal]}'.format(
  109. color=self.color))
  110. elif self.dots:
  111. self.stream.write('.')
  112. self.stream.flush()
  113. def addError(self, test, err): # pylint: disable=invalid-name
  114. super(QubesTestResult, self).addError(test, err)
  115. test.log.critical('ERROR ({err[0].__name__}: {err[1]!r})'.format(err=err))
  116. if self.showAll:
  117. self.stream.writeln(
  118. '{color[red]}{color[bold]}ERROR{color[normal]} ({})'.format(
  119. self._fmtexc(err), color=self.color))
  120. elif self.dots:
  121. self.stream.write(
  122. '{color[red]}{color[bold]}E{color[normal]}'.format(
  123. color=self.color))
  124. self.stream.flush()
  125. def addFailure(self, test, err): # pylint: disable=invalid-name
  126. super(QubesTestResult, self).addFailure(test, err)
  127. test.log.error('FAIL ({err[0].__name__}: {err[1]!r})'.format(err=err))
  128. if self.showAll:
  129. self.stream.writeln('{color[red]}FAIL{color[normal]}'.format(
  130. color=self.color))
  131. elif self.dots:
  132. self.stream.write('{color[red]}F{color[normal]}'.format(
  133. color=self.color))
  134. self.stream.flush()
  135. def addSkip(self, test, reason): # pylint: disable=invalid-name
  136. super(QubesTestResult, self).addSkip(test, reason)
  137. test.log.warning('skipped ({})'.format(reason))
  138. if self.showAll:
  139. self.stream.writeln(
  140. '{color[cyan]}skipped{color[normal]} ({})'.format(
  141. reason, color=self.color))
  142. elif self.dots:
  143. self.stream.write('{color[cyan]}s{color[normal]}'.format(
  144. color=self.color))
  145. self.stream.flush()
  146. def addExpectedFailure(self, test, err): # pylint: disable=invalid-name
  147. super(QubesTestResult, self).addExpectedFailure(test, err)
  148. test.log.warning('expected failure')
  149. if self.showAll:
  150. self.stream.writeln(
  151. '{color[yellow]}expected failure{color[normal]}'.format(
  152. color=self.color))
  153. elif self.dots:
  154. self.stream.write('{color[yellow]}x{color[normal]}'.format(
  155. color=self.color))
  156. self.stream.flush()
  157. def addUnexpectedSuccess(self, test): # pylint: disable=invalid-name
  158. super(QubesTestResult, self).addUnexpectedSuccess(test)
  159. test.log.error('unexpected success')
  160. if self.showAll:
  161. self.stream.writeln(
  162. '{color[yellow]}{color[bold]}unexpected success'
  163. '{color[normal]}'.format(color=self.color))
  164. elif self.dots:
  165. self.stream.write(
  166. '{color[yellow]}{color[bold]}u{color[normal]}'.format(
  167. color=self.color))
  168. self.stream.flush()
  169. def printErrors(self): # pylint: disable=invalid-name
  170. if self.dots or self.showAll:
  171. self.stream.writeln()
  172. self.printErrorList(
  173. '{color[red]}{color[bold]}ERROR{color[normal]}'.format(
  174. color=self.color),
  175. self.errors)
  176. self.printErrorList(
  177. '{color[red]}FAIL{color[normal]}'.format(color=self.color),
  178. self.failures)
  179. def printErrorList(self, flavour, errors): # pylint: disable=invalid-name
  180. for test, err in errors:
  181. self.stream.writeln(self.separator1)
  182. self.stream.writeln('%s: %s' % (flavour, self.getDescription(test)))
  183. self.stream.writeln(self.separator2)
  184. self.stream.writeln('%s' % err)
  185. class QubesDNCTestResult(QubesTestResult):
  186. do_not_clean = True
  187. parser = argparse.ArgumentParser(
  188. epilog='''When running only specific tests, write their names like in log,
  189. in format: MODULE+"/"+CLASS+"/"+FUNCTION. MODULE should omit initial
  190. "qubes.tests.". Example: basic/TC_00_Basic/test_000_create''')
  191. parser.add_argument('--verbose', '-v',
  192. action='count',
  193. help='increase console verbosity level')
  194. parser.add_argument('--quiet', '-q',
  195. action='count',
  196. help='decrease console verbosity level')
  197. parser.add_argument('--list', '-l',
  198. action='store_true', dest='list',
  199. help='list all available tests and exit')
  200. parser.add_argument('--failfast', '-f',
  201. action='store_true', dest='failfast',
  202. help='stop on the first fail, error or unexpected success')
  203. parser.add_argument('--no-failfast',
  204. action='store_false', dest='failfast',
  205. help='disable --failfast')
  206. parser.add_argument('--do-not-clean', '--dnc', '-D',
  207. action='store_true', dest='do_not_clean',
  208. help='do not execute tearDown on failed tests. Implies --failfast.')
  209. parser.add_argument('--do-clean', '-C',
  210. action='store_false', dest='do_not_clean',
  211. help='do execute tearDown even on failed tests.')
  212. parser.add_argument('--loglevel', '-L', metavar='LEVEL',
  213. action='store', choices=tuple(k
  214. for k in sorted(logging._levelNames.keys(),
  215. key=lambda x: logging._levelNames[x])
  216. if isinstance(k, basestring)),
  217. help='logging level for file and syslog forwarding '
  218. '(one of: %(choices)s; default: %(default)s)')
  219. parser.add_argument('--logfile', '-o', metavar='FILE',
  220. action='store',
  221. help='if set, test run will be also logged to file')
  222. parser.add_argument('--syslog',
  223. action='store_true', dest='syslog',
  224. help='reenable logging to syslog')
  225. parser.add_argument('--no-syslog',
  226. action='store_false', dest='syslog',
  227. help='disable logging to syslog')
  228. parser.add_argument('--kmsg', '--very-brave-or-very-stupid',
  229. action='store_true', dest='kmsg',
  230. help='log most important things to kernel ring-buffer')
  231. parser.add_argument('--no-kmsg', '--i-am-smarter-than-kay-sievers',
  232. action='store_false', dest='kmsg',
  233. help='do not abuse kernel ring-buffer')
  234. parser.add_argument('names', metavar='TESTNAME',
  235. action='store', nargs='*',
  236. help='list of tests to run named like in description '
  237. '(default: run all tests)')
  238. parser.set_defaults(
  239. failfast=False,
  240. loglevel='DEBUG',
  241. logfile=None,
  242. syslog=True,
  243. kmsg=False,
  244. verbose=2,
  245. quiet=0)
  246. def list_test_cases(suite):
  247. for test in suite:
  248. if isinstance(test, unittest.TestSuite):
  249. #yield from
  250. for i in list_test_cases(test):
  251. yield i
  252. else:
  253. yield test
  254. def main():
  255. args = parser.parse_args()
  256. suite = unittest.TestSuite()
  257. loader = unittest.TestLoader()
  258. if args.names:
  259. alltests = loader.loadTestsFromName('qubes.tests')
  260. for name in args.names:
  261. suite.addTests(
  262. [test for test in list_test_cases(alltests)
  263. if (str(test)+'/').startswith(name.replace('.', '/')+'/')])
  264. else:
  265. suite.addTests(loader.loadTestsFromName('qubes.tests'))
  266. if args.list:
  267. for test in list_test_cases(suite):
  268. print(str(test))
  269. return True
  270. if args.do_not_clean:
  271. args.failfast = True
  272. logging.root.setLevel(args.loglevel)
  273. if args.logfile is not None:
  274. ha_file = logging.FileHandler(
  275. os.path.join(os.environ['HOME'], args.logfile))
  276. ha_file.setFormatter(
  277. logging.Formatter('%(asctime)s %(name)s[%(process)d]: %(message)s'))
  278. logging.root.addHandler(ha_file)
  279. if args.syslog:
  280. ha_syslog = logging.handlers.SysLogHandler('/dev/log')
  281. ha_syslog.setFormatter(
  282. logging.Formatter('%(name)s[%(process)d]: %(message)s'))
  283. logging.root.addHandler(ha_syslog)
  284. if args.kmsg:
  285. try:
  286. subprocess.check_call(('sudo', 'chmod', '666', '/dev/kmsg'))
  287. except subprocess.CalledProcessError:
  288. parser.error('could not chmod /dev/kmsg')
  289. else:
  290. ha_kmsg = logging.FileHandler('/dev/kmsg', 'w')
  291. ha_kmsg.setFormatter(
  292. logging.Formatter('%(name)s[%(process)d]: %(message)s'))
  293. ha_kmsg.setLevel(logging.CRITICAL)
  294. logging.root.addHandler(ha_kmsg)
  295. runner = unittest.TextTestRunner(stream=sys.stdout,
  296. verbosity=(args.verbose-args.quiet),
  297. failfast=args.failfast)
  298. unittest.signals.installHandler()
  299. runner.resultclass = QubesDNCTestResult \
  300. if args.do_not_clean else QubesTestResult
  301. return runner.run(suite).wasSuccessful()
  302. if __name__ == '__main__':
  303. sys.exit(not main())