extra.py 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252
  1. #
  2. # The Qubes OS Project, https://www.qubes-os.org/
  3. #
  4. # Copyright (C) 2016
  5. # Marek Marczykowski-Górecki <marmarek@invisiblethingslab.com>
  6. #
  7. # This library is free software; you can redistribute it and/or
  8. # modify it under the terms of the GNU Lesser General Public
  9. # License as published by the Free Software Foundation; either
  10. # version 2.1 of the License, or (at your option) any later version.
  11. #
  12. # This library is distributed in the hope that it will be useful,
  13. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  14. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  15. # Lesser General Public License for more details.
  16. #
  17. # You should have received a copy of the GNU Lesser General Public
  18. # License along with this library; if not, see <https://www.gnu.org/licenses/>.
  19. #
  20. import asyncio
  21. import os
  22. import subprocess
  23. import sys
  24. import pkg_resources
  25. import qubes.tests
  26. import qubes.vm.appvm
  27. class ProcessWrapper(object):
  28. def __init__(self, proc, loop=None):
  29. self._proc = proc
  30. self._loop = loop or asyncio.get_event_loop()
  31. def __getattr__(self, item):
  32. return getattr(self._proc, item)
  33. def __setattr__(self, key, value):
  34. if key.startswith('_'):
  35. return super(ProcessWrapper, self).__setattr__(key, value)
  36. return setattr(self._proc, key, value)
  37. def communicate(self, input=None):
  38. if self._proc.stdin is not None and input is None:
  39. input = b''
  40. return self._loop.run_until_complete(self._proc.communicate(input))
  41. def wait(self):
  42. return self._loop.run_until_complete(self._proc.wait())
  43. class VMWrapper(object):
  44. '''Wrap VM object to provide stable API for basic operations'''
  45. def __init__(self, vm, loop=None):
  46. self._vm = vm
  47. self._loop = loop or asyncio.get_event_loop()
  48. def __getattr__(self, item):
  49. return getattr(self._vm, item)
  50. def __setattr__(self, key, value):
  51. if key.startswith('_'):
  52. return super(VMWrapper, self).__setattr__(key, value)
  53. return setattr(self._vm, key, value)
  54. def __str__(self):
  55. return str(self._vm)
  56. def __eq__(self, other):
  57. return self._vm == other
  58. def __hash__(self):
  59. return hash(self._vm)
  60. def start(self, start_guid=True):
  61. return self._loop.run_until_complete(
  62. self._vm.start(start_guid=start_guid))
  63. def shutdown(self):
  64. return self._loop.run_until_complete(self._vm.shutdown())
  65. def run(self, command, wait=False, user=None, passio_popen=False,
  66. passio_stderr=False, gui=False, **kwargs):
  67. if gui:
  68. try:
  69. self._loop.run_until_complete(
  70. self._vm.run_service_for_stdio('qubes.WaitForSession',
  71. user=user))
  72. except subprocess.CalledProcessError as err:
  73. return err.returncode
  74. if wait:
  75. try:
  76. self._loop.run_until_complete(
  77. self._vm.run_for_stdio(command, user=user))
  78. except subprocess.CalledProcessError as err:
  79. return err.returncode
  80. return 0
  81. elif passio_popen:
  82. p = self._loop.run_until_complete(self._vm.run(command, user=user,
  83. stdin=subprocess.PIPE,
  84. stdout=subprocess.PIPE,
  85. stderr=subprocess.PIPE if passio_stderr else None))
  86. return ProcessWrapper(p, self._loop)
  87. else:
  88. asyncio.ensure_future(self._vm.run_for_stdio(command, user=user),
  89. loop=self._loop)
  90. def run_service(self, service, wait=True, input=None, user=None,
  91. passio_popen=False,
  92. passio_stderr=False, **kwargs):
  93. if wait:
  94. try:
  95. if isinstance(input, str):
  96. input = input.encode()
  97. self._loop.run_until_complete(
  98. self._vm.run_service_for_stdio(service,
  99. input=input, user=user))
  100. except subprocess.CalledProcessError as err:
  101. return err.returncode
  102. return 0
  103. elif passio_popen:
  104. p = self._loop.run_until_complete(self._vm.run_service(service,
  105. user=user,
  106. stdin=subprocess.PIPE,
  107. stdout=subprocess.PIPE,
  108. stderr=subprocess.PIPE if passio_stderr else None))
  109. return ProcessWrapper(p, self._loop)
  110. class ExtraTestCase(qubes.tests.SystemTestCase):
  111. template = None
  112. def setUp(self):
  113. super(ExtraTestCase, self).setUp()
  114. self.init_default_template(self.template)
  115. if self.template is not None:
  116. # also use this template for DispVMs
  117. dispvm_base = self.app.add_new_vm('AppVM',
  118. name=self.make_vm_name('dvm'),
  119. template=self.template, label='red', template_for_dispvms=True)
  120. self.loop.run_until_complete(dispvm_base.create_on_disk())
  121. self.app.default_dispvm = dispvm_base
  122. def tearDown(self):
  123. self.app.default_dispvm = None
  124. super(ExtraTestCase, self).tearDown()
  125. def create_vms(self, names):
  126. """
  127. Create AppVMs for the duration of the test. Will be automatically
  128. removed after completing the test.
  129. :param names: list of VM names to create (each of them will be
  130. prefixed with some test specific string)
  131. :return: list of created VM objects
  132. """
  133. if self.template:
  134. template = self.app.domains[self.template]
  135. else:
  136. template = self.app.default_template
  137. for vmname in names:
  138. vm = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  139. name=self.make_vm_name(vmname),
  140. template=template,
  141. label='red')
  142. self.loop.run_until_complete(vm.create_on_disk())
  143. self.app.save()
  144. # get objects after reload
  145. vms = []
  146. for vmname in names:
  147. vms.append(VMWrapper(self.app.domains[self.make_vm_name(vmname)],
  148. loop=self.loop))
  149. return vms
  150. def enable_network(self):
  151. """
  152. Enable access to the network. Must be called before creating VMs.
  153. """
  154. self.init_networking()
  155. def qrexec_policy(self, service, source, destination, allow=True):
  156. """
  157. Allow qrexec calls for duration of the test
  158. :param service: service name
  159. :param source: source VM name
  160. :param destination: destination VM name
  161. :return:
  162. """
  163. def add_remove_rule(add=True):
  164. with open('/etc/qubes-rpc/policy/{}'.format(service), 'r+') as policy:
  165. policy_rules = policy.readlines()
  166. rule = "{} {} {}\n".format(source, destination,
  167. 'allow' if allow else 'deny')
  168. if add:
  169. policy_rules.insert(0, rule)
  170. else:
  171. policy_rules.remove(rule)
  172. policy.truncate(0)
  173. policy.seek(0)
  174. policy.write(''.join(policy_rules))
  175. add_remove_rule(add=True)
  176. self.addCleanup(add_remove_rule, add=False)
  177. def load_tests(loader, tests, pattern):
  178. include_list = None
  179. if 'QUBES_TEST_EXTRA_INCLUDE' in os.environ:
  180. include_list = os.environ['QUBES_TEST_EXTRA_INCLUDE'].split()
  181. exclude_list = []
  182. if 'QUBES_TEST_EXTRA_EXCLUDE' in os.environ:
  183. exclude_list = os.environ['QUBES_TEST_EXTRA_EXCLUDE'].split()
  184. for entry in pkg_resources.iter_entry_points('qubes.tests.extra'):
  185. if include_list is not None and entry.name not in include_list:
  186. continue
  187. if entry.name in exclude_list:
  188. continue
  189. try:
  190. for test_case in entry.resolve()():
  191. tests.addTests(loader.loadTestsFromNames([
  192. '{}.{}'.format(test_case.__module__, test_case.__name__)]))
  193. except Exception as err: # pylint: disable=broad-except
  194. def runTest(self, err=err):
  195. raise err
  196. ExtraLoadFailure = type('ExtraLoadFailure',
  197. (qubes.tests.QubesTestCase,),
  198. {entry.name: runTest})
  199. tests.addTest(ExtraLoadFailure(entry.name))
  200. for entry in pkg_resources.iter_entry_points(
  201. 'qubes.tests.extra.for_template'):
  202. if include_list is not None and entry.name not in include_list:
  203. continue
  204. if entry.name in exclude_list:
  205. continue
  206. try:
  207. for test_case in entry.resolve()():
  208. tests.addTests(loader.loadTestsFromNames(
  209. qubes.tests.create_testcases_for_templates(
  210. test_case.__name__, test_case,
  211. module=sys.modules[test_case.__module__])))
  212. except Exception as err: # pylint: disable=broad-except
  213. def runTest(self, err=err):
  214. raise err
  215. ExtraForTemplateLoadFailure = type('ExtraForTemplateLoadFailure',
  216. (qubes.tests.QubesTestCase,),
  217. {entry.name: runTest})
  218. tests.addTest(ExtraForTemplateLoadFailure(entry.name))
  219. return tests