run.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409
  1. #
  2. # The Qubes OS Project, https://www.qubes-os.org/
  3. #
  4. # Copyright (C) 2014-2015 Joanna Rutkowska <joanna@invisiblethingslab.com>
  5. # Copyright (C) 2014-2015 Wojtek Porczyk <woju@invisiblethingslab.com>
  6. #
  7. # This library is free software; you can redistribute it and/or
  8. # modify it under the terms of the GNU Lesser General Public
  9. # License as published by the Free Software Foundation; either
  10. # version 2.1 of the License, or (at your option) any later version.
  11. #
  12. # This library is distributed in the hope that it will be useful,
  13. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  14. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  15. # Lesser General Public License for more details.
  16. #
  17. # You should have received a copy of the GNU Lesser General Public
  18. # License along with this library; if not, see <https://www.gnu.org/licenses/>.
  19. #
  20. import argparse
  21. import code
  22. import curses
  23. import itertools
  24. import logging
  25. import logging.handlers
  26. import os
  27. import socket
  28. import subprocess
  29. import sys
  30. import unittest
  31. import unittest.signals
  32. import qubes.tests
  33. import qubes.api.admin
  34. class CursesColor(dict):
  35. colors = (
  36. 'black', 'red', 'green', 'yellow', 'blue', 'magenta', 'cyan', 'white')
  37. attrs = {
  38. 'bold': 'bold', 'normal': 'sgr0'}
  39. def __init__(self):
  40. super(CursesColor, self).__init__()
  41. self.has_colors = False
  42. try:
  43. curses.setupterm()
  44. self.has_colors = True
  45. except curses.error:
  46. return
  47. def __missing__(self, key):
  48. # pylint: disable=unused-argument,no-self-use
  49. if not self.has_colors:
  50. return ''
  51. try:
  52. value = curses.tigetstr(self.attrs[key])
  53. except KeyError:
  54. try:
  55. value = curses.tparm(
  56. curses.tigetstr('setaf'), self.colors.index(key))
  57. except ValueError:
  58. return ''
  59. value = value.decode()
  60. self[key] = value
  61. return value
  62. class QubesTestResult(unittest.TestResult):
  63. '''A test result class that can print colourful text results to a stream.
  64. Used by TextTestRunner. This is a lightly rewritten unittest.TextTestResult.
  65. '''
  66. separator1 = unittest.TextTestResult.separator1
  67. separator2 = unittest.TextTestResult.separator2
  68. def __init__(self, stream, descriptions, verbosity):
  69. super(QubesTestResult, self).__init__(stream, descriptions, verbosity)
  70. self.stream = stream
  71. self.showAll = verbosity > 1 # pylint: disable=invalid-name
  72. self.dots = verbosity == 1
  73. self.descriptions = descriptions
  74. self.color = CursesColor()
  75. self.hostname = socket.gethostname()
  76. self.log = logging.getLogger('qubes.tests')
  77. def _fmtexc(self, err):
  78. if str(err[1]):
  79. return '{color[bold]}{}:{color[normal]} {!s}'.format(
  80. err[0].__name__, err[1], color=self.color)
  81. else:
  82. return '{color[bold]}{}{color[normal]}'.format(
  83. err[0].__name__, color=self.color)
  84. def get_log(self, test):
  85. try:
  86. return test.log
  87. except AttributeError:
  88. return self.log
  89. def getDescription(self, test): # pylint: disable=invalid-name
  90. teststr = str(test).split('/')
  91. for i in range(-2, 0):
  92. try:
  93. fullname = teststr[i].split('_', 2)
  94. except IndexError:
  95. continue
  96. fullname[-1] = '{color[bold]}{}{color[normal]}'.format(
  97. fullname[-1], color=self.color)
  98. teststr[i] = '_'.join(fullname)
  99. teststr = '/'.join(teststr)
  100. doc_first_line = test.shortDescription()
  101. if self.descriptions and doc_first_line:
  102. return '\n'.join((teststr, ' {}'.format(
  103. doc_first_line, color=self.color)))
  104. else:
  105. return teststr
  106. def startTest(self, test): # pylint: disable=invalid-name
  107. super(QubesTestResult, self).startTest(test)
  108. self.get_log(test).critical('started')
  109. if self.showAll:
  110. if not qubes.tests.in_git:
  111. self.stream.write('{}: '.format(self.hostname))
  112. self.stream.write(self.getDescription(test))
  113. self.stream.write(' ... ')
  114. self.stream.flush()
  115. def addSuccess(self, test): # pylint: disable=invalid-name
  116. super(QubesTestResult, self).addSuccess(test)
  117. self.get_log(test).warning('ok')
  118. if self.showAll:
  119. self.stream.writeln('{color[green]}ok{color[normal]}'.format(
  120. color=self.color))
  121. elif self.dots:
  122. self.stream.write('.')
  123. self.stream.flush()
  124. def addError(self, test, err): # pylint: disable=invalid-name
  125. super(QubesTestResult, self).addError(test, err)
  126. self.get_log(test).critical(
  127. 'ERROR ({err[0].__name__}: {err[1]!r})'.format(err=err))
  128. if self.showAll:
  129. self.stream.writeln(
  130. '{color[red]}{color[bold]}ERROR{color[normal]} ({})'.format(
  131. self._fmtexc(err), color=self.color))
  132. elif self.dots:
  133. self.stream.write(
  134. '{color[red]}{color[bold]}E{color[normal]}'.format(
  135. color=self.color))
  136. self.stream.flush()
  137. def addFailure(self, test, err): # pylint: disable=invalid-name
  138. super(QubesTestResult, self).addFailure(test, err)
  139. self.get_log(test).error(
  140. 'FAIL ({err[0].__name__}: {err[1]!r})'.format(err=err))
  141. if self.showAll:
  142. self.stream.writeln('{color[red]}FAIL{color[normal]}'.format(
  143. color=self.color))
  144. elif self.dots:
  145. self.stream.write('{color[red]}F{color[normal]}'.format(
  146. color=self.color))
  147. self.stream.flush()
  148. def addSkip(self, test, reason): # pylint: disable=invalid-name
  149. super(QubesTestResult, self).addSkip(test, reason)
  150. self.get_log(test).warning('skipped ({})'.format(reason))
  151. if self.showAll:
  152. self.stream.writeln(
  153. '{color[cyan]}skipped{color[normal]} ({})'.format(
  154. reason, color=self.color))
  155. elif self.dots:
  156. self.stream.write('{color[cyan]}s{color[normal]}'.format(
  157. color=self.color))
  158. self.stream.flush()
  159. def addExpectedFailure(self, test, err): # pylint: disable=invalid-name
  160. super(QubesTestResult, self).addExpectedFailure(test, err)
  161. self.get_log(test).warning('expected failure')
  162. if self.showAll:
  163. self.stream.writeln(
  164. '{color[yellow]}expected failure{color[normal]}'.format(
  165. color=self.color))
  166. elif self.dots:
  167. self.stream.write('{color[yellow]}x{color[normal]}'.format(
  168. color=self.color))
  169. self.stream.flush()
  170. def addUnexpectedSuccess(self, test): # pylint: disable=invalid-name
  171. super(QubesTestResult, self).addUnexpectedSuccess(test)
  172. self.get_log(test).error('unexpected success')
  173. if self.showAll:
  174. self.stream.writeln(
  175. '{color[yellow]}{color[bold]}unexpected success'
  176. '{color[normal]}'.format(color=self.color))
  177. elif self.dots:
  178. self.stream.write(
  179. '{color[yellow]}{color[bold]}u{color[normal]}'.format(
  180. color=self.color))
  181. self.stream.flush()
  182. def printErrors(self): # pylint: disable=invalid-name
  183. if self.dots or self.showAll:
  184. self.stream.writeln()
  185. self.printErrorList(
  186. '{color[red]}{color[bold]}ERROR{color[normal]}'.format(
  187. color=self.color),
  188. self.errors)
  189. self.printErrorList(
  190. '{color[red]}FAIL{color[normal]}'.format(
  191. color=self.color),
  192. self.failures)
  193. self.printErrorList(
  194. '{color[yellow]}EXPECTED{color[normal]}'.format(
  195. color=self.color),
  196. self.expectedFailures)
  197. def printErrorList(self, flavour, errors): # pylint: disable=invalid-name
  198. for test, err in errors:
  199. self.stream.writeln(self.separator1)
  200. self.stream.writeln('%s: %s' % (flavour, self.getDescription(test)))
  201. self.stream.writeln(self.separator2)
  202. self.stream.writeln('%s' % err)
  203. def demo(verbosity=2):
  204. class TC_00_Demo(qubes.tests.QubesTestCase):
  205. '''Demo class'''
  206. # pylint: disable=no-self-use
  207. def test_0_success(self):
  208. '''Demo test (success)'''
  209. pass
  210. def test_1_error(self):
  211. '''Demo test (error)'''
  212. raise Exception()
  213. def test_2_failure(self):
  214. '''Demo test (failure)'''
  215. self.fail('boo')
  216. def test_3_skip(self):
  217. '''Demo test (skipped by call to self.skipTest())'''
  218. self.skipTest('skip')
  219. @unittest.skip(None)
  220. def test_4_skip_decorator(self):
  221. '''Demo test (skipped by decorator)'''
  222. pass
  223. @unittest.expectedFailure
  224. def test_5_expected_failure(self):
  225. '''Demo test (expected failure)'''
  226. self.fail()
  227. @unittest.expectedFailure
  228. def test_6_unexpected_success(self):
  229. '''Demo test (unexpected success)'''
  230. pass
  231. suite = unittest.TestLoader().loadTestsFromTestCase(TC_00_Demo)
  232. runner = unittest.TextTestRunner(stream=sys.stdout, verbosity=verbosity)
  233. runner.resultclass = QubesTestResult
  234. return runner.run(suite).wasSuccessful()
  235. parser = argparse.ArgumentParser(
  236. epilog='''When running only specific tests, write their names like in log,
  237. in format: MODULE+"/"+CLASS+"/"+FUNCTION. MODULE should omit initial
  238. "qubes.tests.". Example: basic/TC_00_Basic/test_000_create''')
  239. parser.add_argument('--verbose', '-v',
  240. action='count',
  241. help='increase console verbosity level')
  242. parser.add_argument('--quiet', '-q',
  243. action='count',
  244. help='decrease console verbosity level')
  245. parser.add_argument('--list', '-l',
  246. action='store_true', dest='list',
  247. help='list all available tests and exit')
  248. parser.add_argument('--failfast', '-f',
  249. action='store_true', dest='failfast',
  250. help='stop on the first fail, error or unexpected success')
  251. parser.add_argument('--no-failfast',
  252. action='store_false', dest='failfast',
  253. help='disable --failfast')
  254. # pylint: disable=protected-access
  255. try:
  256. name_to_level = logging._nameToLevel
  257. except AttributeError:
  258. name_to_level = logging._levelNames
  259. parser.add_argument('--loglevel', '-L', metavar='LEVEL',
  260. action='store', choices=tuple(k
  261. for k in sorted(name_to_level.keys(),
  262. key=lambda x: name_to_level[x])
  263. if isinstance(k, str)),
  264. help='logging level for file and syslog forwarding '
  265. '(one of: %(choices)s; default: %(default)s)')
  266. del name_to_level
  267. # pylint: enable=protected-access
  268. parser.add_argument('--logfile', '-o', metavar='FILE',
  269. action='store',
  270. help='if set, test run will be also logged to file')
  271. parser.add_argument('--kmsg', '--very-brave-or-very-stupid',
  272. action='store_true', dest='kmsg',
  273. help='log most important things to kernel ring-buffer')
  274. parser.add_argument('--no-kmsg', '--i-am-smarter-than-kay-sievers',
  275. action='store_false', dest='kmsg',
  276. help='do not abuse kernel ring-buffer')
  277. parser.add_argument('--allow-running-along-qubesd',
  278. action='store_true', default=False,
  279. help='allow running in parallel with qubesd;'
  280. ' this is DANGEROUS and WILL RESULT IN INCONSISTENT SYSTEM STATE')
  281. parser.add_argument('--break-to-repl',
  282. action='store_true', default=False,
  283. help='break to REPL after tests')
  284. parser.add_argument('names', metavar='TESTNAME',
  285. action='store', nargs='*',
  286. help='list of tests to run named like in description '
  287. '(default: run all tests)')
  288. parser.set_defaults(
  289. failfast=False,
  290. loglevel='DEBUG',
  291. logfile=None,
  292. kmsg=False,
  293. verbose=2,
  294. quiet=0)
  295. def list_test_cases(suite):
  296. for test in suite:
  297. if isinstance(test, unittest.TestSuite):
  298. #yield from
  299. for i in list_test_cases(test):
  300. yield i
  301. else:
  302. yield test
  303. def main(args=None):
  304. args = parser.parse_args(args)
  305. suite = unittest.TestSuite()
  306. loader = unittest.TestLoader()
  307. if args.names:
  308. alltests = loader.loadTestsFromName('qubes.tests')
  309. for name in args.names:
  310. suite.addTests(
  311. [test for test in list_test_cases(alltests)
  312. if str(test).startswith(name)])
  313. else:
  314. suite.addTests(loader.loadTestsFromName('qubes.tests'))
  315. if args.list:
  316. for test in list_test_cases(suite):
  317. print(str(test)) # pylint: disable=superfluous-parens
  318. return True
  319. logging.root.setLevel(args.loglevel)
  320. if args.logfile is not None:
  321. ha_file = logging.FileHandler(
  322. os.path.join(os.environ['HOME'], args.logfile))
  323. ha_file.setFormatter(
  324. logging.Formatter('%(asctime)s %(name)s[%(process)d]: %(message)s'))
  325. logging.root.addHandler(ha_file)
  326. if args.kmsg:
  327. try:
  328. subprocess.check_call(('sudo', 'chmod', '666', '/dev/kmsg'))
  329. except subprocess.CalledProcessError:
  330. parser.error('could not chmod /dev/kmsg')
  331. else:
  332. ha_kmsg = logging.FileHandler('/dev/kmsg', 'w')
  333. ha_kmsg.setFormatter(
  334. logging.Formatter('%(name)s[%(process)d]: %(message)s'))
  335. ha_kmsg.setLevel(logging.CRITICAL)
  336. logging.root.addHandler(ha_kmsg)
  337. if not args.allow_running_along_qubesd \
  338. and os.path.exists(qubes.api.admin.QubesAdminAPI.SOCKNAME):
  339. parser.error('refusing to run until qubesd is disabled')
  340. runner = unittest.TextTestRunner(stream=sys.stdout,
  341. verbosity=(args.verbose-args.quiet),
  342. failfast=args.failfast)
  343. unittest.signals.installHandler()
  344. runner.resultclass = QubesTestResult
  345. result = runner.run(suite)
  346. if args.break_to_repl:
  347. code.interact(local=locals())
  348. return result.wasSuccessful()
  349. if __name__ == '__main__':
  350. sys.exit(not main())