__init__.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394
  1. #!/usr/bin/python2 -O
  2. # vim: fileencoding=utf-8
  3. #
  4. # The Qubes OS Project, https://www.qubes-os.org/
  5. #
  6. # Copyright (C) 2014-2015
  7. # Marek Marczykowski-Górecki <marmarek@invisiblethingslab.com>
  8. # Copyright (C) 2015 Wojtek Porczyk <woju@invisiblethingslab.com>
  9. #
  10. # This program is free software; you can redistribute it and/or modify
  11. # it under the terms of the GNU General Public License as published by
  12. # the Free Software Foundation; either version 2 of the License, or
  13. # (at your option) any later version.
  14. #
  15. # This program is distributed in the hope that it will be useful,
  16. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  17. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  18. # GNU General Public License for more details.
  19. #
  20. # You should have received a copy of the GNU General Public License along
  21. # with this program; if not, write to the Free Software Foundation, Inc.,
  22. # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  23. #
  24. import multiprocessing
  25. import os
  26. import shutil
  27. import unittest
  28. import libvirt
  29. import qubes.backup
  30. import qubes.qubes
  31. VMPREFIX = 'test-'
  32. class _AssertNotRaisesContext(object):
  33. """A context manager used to implement TestCase.assertNotRaises methods.
  34. Stolen from unittest and hacked. Regexp support stripped.
  35. """
  36. def __init__(self, expected, test_case, expected_regexp=None):
  37. self.expected = expected
  38. self.failureException = test_case.failureException
  39. def __enter__(self):
  40. return self
  41. def __exit__(self, exc_type, exc_value, tb):
  42. if exc_type is None:
  43. return True
  44. try:
  45. exc_name = self.expected.__name__
  46. except AttributeError:
  47. exc_name = str(self.expected)
  48. if issubclass(exc_type, self.expected):
  49. raise self.failureException(
  50. "{0} raised".format(exc_name))
  51. else:
  52. # pass through
  53. return False
  54. self.exception = exc_value # store for later retrieval
  55. class SystemTestsMixin(object):
  56. def setUp(self):
  57. '''Set up the test.
  58. .. warning::
  59. This method instantiates QubesVmCollection acquires write lock for
  60. it. You can use is as :py:attr:`qc`. Do not release the lock
  61. yourself.
  62. '''
  63. super(SystemTestsMixin, self).setUp()
  64. self.qc = qubes.qubes.QubesVmCollection()
  65. self.qc.lock_db_for_writing()
  66. self.qc.load()
  67. self.conn = libvirt.open(qubes.qubes.defaults['libvirt_uri'])
  68. self.remove_test_vms()
  69. def tearDown(self):
  70. super(SystemTestsMixin, self).tearDown()
  71. self.remove_test_vms()
  72. self.qc.save()
  73. self.qc.unlock_db()
  74. del self.qc
  75. self.conn.close()
  76. def assertNotRaises(self, excClass, callableObj=None, *args, **kwargs):
  77. """Fail if an exception of class excClass is raised
  78. by callableObj when invoked with arguments args and keyword
  79. arguments kwargs. If a different type of exception is
  80. raised, it will not be caught, and the test case will be
  81. deemed to have suffered an error, exactly as for an
  82. unexpected exception.
  83. If called with callableObj omitted or None, will return a
  84. context object used like this::
  85. with self.assertRaises(SomeException):
  86. do_something()
  87. The context manager keeps a reference to the exception as
  88. the 'exception' attribute. This allows you to inspect the
  89. exception after the assertion::
  90. with self.assertRaises(SomeException) as cm:
  91. do_something()
  92. the_exception = cm.exception
  93. self.assertEqual(the_exception.error_code, 3)
  94. """
  95. context = _AssertNotRaisesContext(excClass, self)
  96. if callableObj is None:
  97. return context
  98. with context:
  99. callableObj(*args, **kwargs)
  100. def make_vm_name(self, name):
  101. return VMPREFIX + name
  102. def _remove_vm_qubes(self, vm):
  103. vmname = vm.name
  104. try:
  105. # XXX .is_running() may throw libvirtError if undefined
  106. if vm.is_running():
  107. vm.force_shutdown()
  108. except: pass
  109. try: vm.remove_from_disk()
  110. except: pass
  111. try: vm.libvirtDomain.undefine()
  112. except (AttributeError, libvirt.libvirtError): pass
  113. self.qc.pop(vm.qid)
  114. self.qc.save()
  115. del vm
  116. # Now ensure it really went away. This may not have happened,
  117. # for example if vm.libvirtDomain malfunctioned.
  118. try:
  119. dom = self.conn.lookupByName(vmname)
  120. except: pass
  121. else:
  122. self._remove_vm_libvirt(dom)
  123. self._remove_vm_disk(vmname)
  124. def _remove_vm_libvirt(self, dom):
  125. try:
  126. dom.destroy()
  127. except libvirt.libvirtError: # not running
  128. pass
  129. dom.undefine()
  130. def _remove_vm_disk(self, vmname):
  131. for dirspec in (
  132. 'qubes_appvms_dir',
  133. 'qubes_servicevms_dir',
  134. 'qubes_templates_dir'):
  135. dirpath = os.path.join(qubes.qubes.system_path['qubes_base_dir'],
  136. qubes.qubes.system_path[dirspec], vmname)
  137. if os.path.exists(dirpath):
  138. if os.path.isdir(dirpath):
  139. shutil.rmtree(dirpath)
  140. else:
  141. os.unlink(dirpath)
  142. def remove_vms(self, vms):
  143. for vm in vms: self._remove_vm_qubes(vm)
  144. def remove_test_vms(self):
  145. '''Aggresively remove any domain that has name in testing namespace.
  146. .. warning::
  147. The test suite hereby claims any domain whose name starts with
  148. :py:data:`VMPREFIX` as fair game. This is needed to enforce sane
  149. test executing environment. If you have domains named ``test-*``,
  150. don't run the tests.
  151. '''
  152. # first, remove them Qubes-way
  153. for vm in self.qc.values():
  154. if vm.name.startswith(VMPREFIX):
  155. self._remove_vm_qubes(vm)
  156. # now remove what was only in libvirt
  157. for dom in self.conn.listAllDomains():
  158. if dom.name().startswith(VMPREFIX):
  159. self._remove_vm_libvirt(dom)
  160. # finally remove anything that is left on disk
  161. vmnames = set()
  162. for dirspec in (
  163. 'qubes_appvms_dir',
  164. 'qubes_servicevms_dir',
  165. 'qubes_templates_dir'):
  166. dirpath = os.path.join(qubes.qubes.system_path['qubes_base_dir'],
  167. qubes.qubes.system_path[dirspec])
  168. for name in os.listdir(dirpath):
  169. if name.startswith(VMPREFIX):
  170. vmnames.add(name)
  171. for vmname in vmnames:
  172. self._remove_vm_disk(vmname)
  173. class BackupTestsMixin(SystemTestsMixin):
  174. def setUp(self):
  175. super(BackupTestsMixin, self).setUp()
  176. self.error_detected = multiprocessing.Queue()
  177. self.verbose = False
  178. if self.verbose:
  179. print >>sys.stderr, "-> Creating backupvm"
  180. self.backupvm = self.qc.add_new_vm("QubesAppVm",
  181. name=self.make_vm_name('backupvm'),
  182. template=self.qc.get_default_template())
  183. self.backupvm.create_on_disk(verbose=self.verbose)
  184. self.backupdir = os.path.join(os.environ["HOME"], "test-backup")
  185. if os.path.exists(self.backupdir):
  186. shutil.rmtree(self.backupdir)
  187. os.mkdir(self.backupdir)
  188. def tearDown(self):
  189. super(BackupTestsMixin, self).tearDown()
  190. shutil.rmtree(self.backupdir)
  191. def print_progress(self, progress):
  192. if self.verbose:
  193. print >> sys.stderr, "\r-> Backing up files: {0}%...".format(progress)
  194. def error_callback(self, message):
  195. self.error_detected.put(message)
  196. if self.verbose:
  197. print >> sys.stderr, "ERROR: {0}".format(message)
  198. def print_callback(self, msg):
  199. if self.verbose:
  200. print msg
  201. def fill_image(self, path, size=None, sparse=False):
  202. block_size = 4096
  203. if self.verbose:
  204. print >>sys.stderr, "-> Filling %s" % path
  205. f = open(path, 'w+')
  206. if size is None:
  207. f.seek(0, 2)
  208. size = f.tell()
  209. f.seek(0)
  210. for block_num in xrange(size/block_size):
  211. f.write('a' * block_size)
  212. if sparse:
  213. f.seek(block_size, 1)
  214. f.close()
  215. # NOTE: this was create_basic_vms
  216. def create_backup_vms(self):
  217. template=self.qc.get_default_template()
  218. vms = []
  219. vmname = self.make_vm_name('test1')
  220. if self.verbose:
  221. print >>sys.stderr, "-> Creating %s" % vmname
  222. testvm1 = self.qc.add_new_vm('QubesAppVm',
  223. name=vmname, template=template)
  224. testvm1.create_on_disk(verbose=self.verbose)
  225. vms.append(testvm1)
  226. self.fill_image(testvm1.private_img, 100*1024*1024)
  227. vmname = self.make_vm_name('testhvm1')
  228. if self.verbose:
  229. print >>sys.stderr, "-> Creating %s" % vmname
  230. testvm2 = self.qc.add_new_vm('QubesHVm', name=vmname)
  231. testvm2.create_on_disk(verbose=self.verbose)
  232. self.fill_image(testvm2.root_img, 1024*1024*1024, True)
  233. vms.append(testvm2)
  234. return vms
  235. def make_backup(self, vms, prepare_kwargs=dict(), do_kwargs=dict(),
  236. target=None):
  237. # XXX: bakup_prepare and backup_do don't support host_collection
  238. self.qc.unlock_db()
  239. if target is None:
  240. target = self.backupdir
  241. try:
  242. files_to_backup = \
  243. qubes.backup.backup_prepare(vms,
  244. print_callback=self.print_callback,
  245. **prepare_kwargs)
  246. except QubesException as e:
  247. self.fail("QubesException during backup_prepare: %s" % str(e))
  248. try:
  249. qubes.backup.backup_do(target, files_to_backup, "qubes",
  250. progress_callback=self.print_progress,
  251. **do_kwargs)
  252. except QubesException as e:
  253. self.fail("QubesException during backup_do: %s" % str(e))
  254. self.qc.lock_db_for_writing()
  255. self.qc.load()
  256. def restore_backup(self, source=None, appvm=None):
  257. if source is None:
  258. backupfile = os.path.join(self.backupdir,
  259. sorted(os.listdir(self.backupdir))[-1])
  260. else:
  261. backupfile = source
  262. with self.assertNotRaises(qubes.qubes.QubesException):
  263. backup_info = qubes.backup.backup_restore_prepare(
  264. backupfile, "qubes",
  265. host_collection=self.qc,
  266. print_callback=self.print_callback,
  267. appvm=appvm)
  268. if self.verbose:
  269. qubes.backup.backup_restore_print_summary(backup_info)
  270. with self.assertNotRaises(qubes.qubes.QubesException):
  271. qubes.backup.backup_restore_do(
  272. backup_info,
  273. host_collection=self.qc,
  274. print_callback=self.print_callback if self.verbose else None,
  275. error_callback=self.error_callback)
  276. errors = []
  277. while not self.error_detected.empty():
  278. errors.append(self.error_detected.get())
  279. self.assertTrue(len(errors) == 0,
  280. "Error(s) detected during backup_restore_do: %s" %
  281. '\n'.join(errors))
  282. if not appvm:
  283. os.unlink(backupfile)
  284. def create_sparse(self, path, size):
  285. f = open(path, "w")
  286. f.truncate(size)
  287. f.close()
  288. def load_tests(loader, tests, pattern):
  289. for modname in (
  290. 'qubes.tests.basic',
  291. 'qubes.tests.network',
  292. 'qubes.tests.vm_qrexec_gui',
  293. 'qubes.tests.backup',
  294. 'qubes.tests.backupcompatibility',
  295. 'qubes.tests.regressions',
  296. ):
  297. tests.addTests(loader.loadTestsFromName(modname))
  298. return tests
  299. # vim: ts=4 sw=4 et