__init__.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502
  1. #!/usr/bin/python2 -O
  2. # vim: fileencoding=utf-8
  3. # pylint: disable=invalid-name
  4. #
  5. # The Qubes OS Project, https://www.qubes-os.org/
  6. #
  7. # Copyright (C) 2014-2015 Joanna Rutkowska <joanna@invisiblethingslab.com>
  8. # Copyright (C) 2014-2015
  9. # Marek Marczykowski-Górecki <marmarek@invisiblethingslab.com>
  10. # Copyright (C) 2014-2015 Wojtek Porczyk <woju@invisiblethingslab.com>
  11. #
  12. # This program is free software; you can redistribute it and/or modify
  13. # it under the terms of the GNU General Public License as published by
  14. # the Free Software Foundation; either version 2 of the License, or
  15. # (at your option) any later version.
  16. #
  17. # This program is distributed in the hope that it will be useful,
  18. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  19. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  20. # GNU General Public License for more details.
  21. #
  22. # You should have received a copy of the GNU General Public License along
  23. # with this program; if not, write to the Free Software Foundation, Inc.,
  24. # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  25. #
  26. import collections
  27. import multiprocessing
  28. import logging
  29. import os
  30. import shutil
  31. import subprocess
  32. import sys
  33. import unittest
  34. import lxml.etree
  35. import qubes.config
  36. import qubes.events
  37. XMLPATH = '/var/lib/qubes/qubes-test.xml'
  38. TEMPLATE = 'fedora-23'
  39. VMPREFIX = 'test-'
  40. #: :py:obj:`True` if running in dom0, :py:obj:`False` otherwise
  41. in_dom0 = False
  42. #: :py:obj:`False` if outside of git repo,
  43. #: path to root of the directory otherwise
  44. in_git = False
  45. try:
  46. import libvirt
  47. libvirt.openReadOnly(qubes.config.defaults['libvirt_uri']).close()
  48. in_dom0 = True
  49. except libvirt.libvirtError:
  50. pass
  51. try:
  52. in_git = subprocess.check_output(
  53. ['git', 'rev-parse', '--show-toplevel']).strip()
  54. qubes.log.LOGPATH = '/tmp'
  55. qubes.log.LOGFILE = '/tmp/qubes.log'
  56. except subprocess.CalledProcessError:
  57. # git returned nonzero, we are outside git repo
  58. pass
  59. except OSError:
  60. # command not found; let's assume we're outside
  61. pass
  62. def skipUnlessDom0(test_item):
  63. '''Decorator that skips test outside dom0.
  64. Some tests (especially integration tests) have to be run in more or less
  65. working dom0. This is checked by connecting to libvirt.
  66. '''
  67. return unittest.skipUnless(in_dom0, 'outside dom0')(test_item)
  68. def skipUnlessGit(test_item):
  69. '''Decorator that skips test outside git repo.
  70. There are very few tests that an be run only in git. One example is
  71. correctness of example code that won't get included in RPM.
  72. '''
  73. return unittest.skipUnless(in_git, 'outside git tree')(test_item)
  74. class TestEmitter(qubes.events.Emitter):
  75. '''Dummy event emitter which records events fired on it.
  76. Events are counted in :py:attr:`fired_events` attribute, which is
  77. :py:class:`collections.Counter` instance. For each event, ``(event, args,
  78. kwargs)`` object is counted. *event* is event name (a string), *args* is
  79. tuple with positional arguments and *kwargs* is sorted tuple of items from
  80. keyword arguments.
  81. >>> emitter = TestEmitter()
  82. >>> emitter.fired_events
  83. Counter()
  84. >>> emitter.fire_event('event', 1, 2, 3, spam='eggs', foo='bar')
  85. >>> emitter.fired_events
  86. Counter({('event', (1, 2, 3), (('foo', 'bar'), ('spam', 'eggs'))): 1})
  87. '''
  88. def __init__(self, *args, **kwargs):
  89. super(TestEmitter, self).__init__(*args, **kwargs)
  90. #: :py:class:`collections.Counter` instance
  91. self.fired_events = collections.Counter()
  92. def fire_event(self, event, *args, **kwargs):
  93. super(TestEmitter, self).fire_event(event, *args, **kwargs)
  94. self.fired_events[(event, args, tuple(sorted(kwargs.items())))] += 1
  95. def fire_event_pre(self, event, *args, **kwargs):
  96. super(TestEmitter, self).fire_event_pre(event, *args, **kwargs)
  97. self.fired_events[(event, args, tuple(sorted(kwargs.items())))] += 1
  98. class _AssertNotRaisesContext(object):
  99. """A context manager used to implement TestCase.assertNotRaises methods.
  100. Stolen from unittest and hacked. Regexp support stripped.
  101. """ # pylint: disable=too-few-public-methods
  102. def __init__(self, expected, test_case, expected_regexp=None):
  103. if expected_regexp is not None:
  104. raise NotImplementedError('expected_regexp is unsupported')
  105. self.expected = expected
  106. self.exception = None
  107. self.failureException = test_case.failureException
  108. def __enter__(self):
  109. return self
  110. def __exit__(self, exc_type, exc_value, tb):
  111. if exc_type is None:
  112. return True
  113. try:
  114. exc_name = self.expected.__name__
  115. except AttributeError:
  116. exc_name = str(self.expected)
  117. if issubclass(exc_type, self.expected):
  118. raise self.failureException(
  119. "{0} raised".format(exc_name))
  120. else:
  121. # pass through
  122. return False
  123. self.exception = exc_value # store for later retrieval
  124. class BeforeCleanExit(BaseException):
  125. '''Raised from :py:meth:`QubesTestCase.tearDown` when
  126. :py:attr:`qubes.tests.run.QubesDNCTestResult.do_not_clean` is set.'''
  127. pass
  128. class QubesTestCase(unittest.TestCase):
  129. '''Base class for Qubes unit tests.
  130. '''
  131. def __init__(self, *args, **kwargs):
  132. super(QubesTestCase, self).__init__(*args, **kwargs)
  133. self.longMessage = True
  134. self.log = logging.getLogger('{}.{}.{}'.format(
  135. self.__class__.__module__,
  136. self.__class__.__name__,
  137. self._testMethodName))
  138. def __str__(self):
  139. return '{}/{}/{}'.format(
  140. '.'.join(self.__class__.__module__.split('.')[2:]),
  141. self.__class__.__name__,
  142. self._testMethodName)
  143. def tearDown(self):
  144. super(QubesTestCase, self).tearDown()
  145. result = self._resultForDoCleanups
  146. failed_test_cases = result.failures \
  147. + result.errors \
  148. + [(tc, None) for tc in result.unexpectedSuccesses]
  149. if getattr(result, 'do_not_clean', False) \
  150. and any(tc is self for tc, exc in failed_test_cases):
  151. raise BeforeCleanExit()
  152. def assertNotRaises(self, excClass, callableObj=None, *args, **kwargs):
  153. """Fail if an exception of class excClass is raised
  154. by callableObj when invoked with arguments args and keyword
  155. arguments kwargs. If a different type of exception is
  156. raised, it will not be caught, and the test case will be
  157. deemed to have suffered an error, exactly as for an
  158. unexpected exception.
  159. If called with callableObj omitted or None, will return a
  160. context object used like this::
  161. with self.assertRaises(SomeException):
  162. do_something()
  163. The context manager keeps a reference to the exception as
  164. the 'exception' attribute. This allows you to inspect the
  165. exception after the assertion::
  166. with self.assertRaises(SomeException) as cm:
  167. do_something()
  168. the_exception = cm.exception
  169. self.assertEqual(the_exception.error_code, 3)
  170. """
  171. context = _AssertNotRaisesContext(excClass, self)
  172. if callableObj is None:
  173. return context
  174. with context:
  175. callableObj(*args, **kwargs)
  176. def assertXMLEqual(self, xml1, xml2):
  177. '''Check for equality of two XML objects.
  178. :param xml1: first element
  179. :param xml2: second element
  180. :type xml1: :py:class:`lxml.etree._Element`
  181. :type xml2: :py:class:`lxml.etree._Element`
  182. ''' # pylint: disable=invalid-name
  183. self.assertEqual(xml1.tag, xml2.tag)
  184. self.assertEqual(xml1.text, xml2.text)
  185. self.assertItemsEqual(xml1.keys(), xml2.keys())
  186. for key in xml1.keys():
  187. self.assertEqual(xml1.get(key), xml2.get(key))
  188. def assertEventFired(self, emitter, event, args=None, kwargs=None):
  189. '''Check whether event was fired on given emitter and fail if it did
  190. not.
  191. :param emitter: emitter which is being checked
  192. :type emitter: :py:class:`TestEmitter`
  193. :param str event: event identifier
  194. :param list args: when given, all items must appear in args passed to \
  195. an event
  196. :param list kwargs: when given, all items must appear in kwargs passed \
  197. to an event
  198. '''
  199. for ev, ev_args, ev_kwargs in emitter.fired_events:
  200. if ev != event:
  201. continue
  202. if args is not None and any(i not in ev_args for i in args):
  203. continue
  204. if kwargs is not None and any(i not in ev_kwargs for i in kwargs):
  205. continue
  206. return
  207. self.fail('event {!r} did not fire on {!r}'.format(event, emitter))
  208. def assertEventNotFired(self, emitter, event, args=None, kwargs=None):
  209. '''Check whether event was fired on given emitter. Fail if it did.
  210. :param emitter: emitter which is being checked
  211. :type emitter: :py:class:`TestEmitter`
  212. :param str event: event identifier
  213. :param list args: when given, all items must appear in args passed to \
  214. an event
  215. :param list kwargs: when given, all items must appear in kwargs passed \
  216. to an event
  217. '''
  218. for ev, ev_args, ev_kwargs in emitter.fired_events:
  219. if ev != event:
  220. continue
  221. if args is not None and any(i not in ev_args for i in args):
  222. continue
  223. if kwargs is not None and any(i not in ev_kwargs for i in kwargs):
  224. continue
  225. self.fail('event {!r} did fire on {!r}'.format(event, emitter))
  226. return
  227. def assertXMLIsValid(self, xml, file=None, schema=None):
  228. '''Check whether given XML fulfills Relax NG schema.
  229. Schema can be given in a couple of ways:
  230. - As separate file. This is most common, and also the only way to
  231. handle file inclusion. Call with file name as second argument.
  232. - As string containing actual schema. Put that string in *schema*
  233. keyword argument.
  234. :param lxml.etree._Element xml: XML element instance to check
  235. :param str file: filename of Relax NG schema
  236. :param str schema: optional explicit schema string
  237. ''' # pylint: disable=redefined-builtin
  238. if schema is not None and file is None:
  239. relaxng = schema
  240. if isinstance(relaxng, str):
  241. relaxng = lxml.etree.XML(relaxng)
  242. # pylint: disable=protected-access
  243. if isinstance(relaxng, lxml.etree._Element):
  244. relaxng = lxml.etree.RelaxNG(relaxng)
  245. elif file is not None and schema is None:
  246. if not os.path.isabs(file):
  247. basedirs = ['/usr/share/doc/qubes/relaxng']
  248. if in_git:
  249. basedirs.insert(0, os.path.join(in_git, 'relaxng'))
  250. for basedir in basedirs:
  251. abspath = os.path.join(basedir, file)
  252. if os.path.exists(abspath):
  253. file = abspath
  254. break
  255. relaxng = lxml.etree.RelaxNG(file=file)
  256. else:
  257. raise TypeError("There should be excactly one of 'file' and "
  258. "'schema' arguments specified.")
  259. # We have to be extra careful here in case someone messed up with
  260. # self.failureException. It should by default be AssertionError, just
  261. # what is spewed by RelaxNG(), but who knows what might happen.
  262. try:
  263. relaxng.assert_(xml)
  264. except self.failureException:
  265. raise
  266. except AssertionError as e:
  267. self.fail(str(e))
  268. class SystemTestsMixin(object):
  269. def setUp(self):
  270. super(SystemTestsMixin, self).setUp()
  271. self.remove_test_vms()
  272. def tearDown(self):
  273. super(SystemTestsMixin, self).tearDown()
  274. self.remove_test_vms()
  275. @staticmethod
  276. def make_vm_name(name):
  277. return VMPREFIX + name
  278. def _remove_vm_qubes(self, vm):
  279. vmname = vm.name
  280. app = vm.app
  281. try:
  282. # XXX .is_running() may throw libvirtError if undefined
  283. if vm.is_running():
  284. vm.force_shutdown()
  285. except: # pylint: disable=bare-except
  286. pass
  287. try:
  288. vm.remove_from_disk()
  289. except: # pylint: disable=bare-except
  290. pass
  291. try:
  292. vm.libvirt_domain.undefine()
  293. except (AttributeError, libvirt.libvirtError):
  294. pass
  295. del app.domains[vm]
  296. del vm
  297. app.save()
  298. del app
  299. # Now ensure it really went away. This may not have happened,
  300. # for example if vm.libvirt_domain malfunctioned.
  301. try:
  302. conn = libvirt.open(qubes.config.defaults['libvirt_uri'])
  303. dom = conn.lookupByName(vmname)
  304. except: # pylint: disable=bare-except
  305. pass
  306. else:
  307. self._remove_vm_libvirt(dom)
  308. self._remove_vm_disk(vmname)
  309. @staticmethod
  310. def _remove_vm_libvirt(dom):
  311. try:
  312. dom.destroy()
  313. except libvirt.libvirtError: # not running
  314. pass
  315. dom.undefine()
  316. @staticmethod
  317. def _remove_vm_disk(vmname):
  318. for dirspec in (
  319. 'qubes_appvms_dir',
  320. 'qubes_servicevms_dir',
  321. 'qubes_templates_dir'):
  322. dirpath = os.path.join(qubes.config.system_path['qubes_base_dir'],
  323. qubes.config.system_path[dirspec], vmname)
  324. if os.path.exists(dirpath):
  325. if os.path.isdir(dirpath):
  326. shutil.rmtree(dirpath)
  327. else:
  328. os.unlink(dirpath)
  329. def remove_vms(self, vms):
  330. for vm in vms:
  331. self._remove_vm_qubes(vm)
  332. def remove_test_vms(self):
  333. '''Aggresively remove any domain that has name in testing namespace.
  334. .. warning::
  335. The test suite hereby claims any domain whose name starts with
  336. :py:data:`VMPREFIX` as fair game. This is needed to enforce sane
  337. test executing environment. If you have domains named ``test-*``,
  338. don't run the tests.
  339. '''
  340. # first, remove them Qubes-way
  341. if os.path.exists(XMLPATH):
  342. self.remove_vms(vm for vm in qubes.Qubes(XMLPATH).domains
  343. if vm.name != TEMPLATE)
  344. os.unlink(XMLPATH)
  345. # now remove what was only in libvirt
  346. conn = libvirt.open(qubes.config.defaults['libvirt_uri'])
  347. for dom in conn.listAllDomains():
  348. if dom.name().startswith(VMPREFIX):
  349. self._remove_vm_libvirt(dom)
  350. # finally remove anything that is left on disk
  351. vmnames = set()
  352. for dirspec in (
  353. 'qubes_appvms_dir',
  354. 'qubes_servicevms_dir',
  355. 'qubes_templates_dir'):
  356. dirpath = os.path.join(qubes.config.system_path['qubes_base_dir'],
  357. qubes.config.system_path[dirspec])
  358. for name in os.listdir(dirpath):
  359. if name.startswith(VMPREFIX):
  360. vmnames.add(name)
  361. for vmname in vmnames:
  362. self._remove_vm_disk(vmname)
  363. def load_tests(loader, tests, pattern): # pylint: disable=unused-argument
  364. # discard any tests from this module, because it hosts base classes
  365. tests = unittest.TestSuite()
  366. for modname in (
  367. # unit tests
  368. 'qubes.tests.events',
  369. 'qubes.tests.init1',
  370. 'qubes.tests.vm.init',
  371. 'qubes.tests.vm.qubesvm',
  372. 'qubes.tests.vm.adminvm',
  373. 'qubes.tests.init2',
  374. 'qubes.tests.tools',
  375. # integration tests
  376. # 'qubes.tests.int.basic',
  377. # 'qubes.tests.dom0_update',
  378. # 'qubes.tests.network',
  379. # 'qubes.tests.vm_qrexec_gui',
  380. # 'qubes.tests.backup',
  381. # 'qubes.tests.backupcompatibility',
  382. # 'qubes.tests.regressions',
  383. # tool tests
  384. 'qubes.tests.int.tools.qubes_create',
  385. 'qubes.tests.int.tools.qvm_run',
  386. ):
  387. tests.addTests(loader.loadTestsFromName(modname))
  388. return tests