extra.py 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223
  1. #
  2. # The Qubes OS Project, https://www.qubes-os.org/
  3. #
  4. # Copyright (C) 2016
  5. # Marek Marczykowski-Górecki <marmarek@invisiblethingslab.com>
  6. #
  7. # This library is free software; you can redistribute it and/or
  8. # modify it under the terms of the GNU Lesser General Public
  9. # License as published by the Free Software Foundation; either
  10. # version 2.1 of the License, or (at your option) any later version.
  11. #
  12. # This library is distributed in the hope that it will be useful,
  13. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  14. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  15. # Lesser General Public License for more details.
  16. #
  17. # You should have received a copy of the GNU Lesser General Public
  18. # License along with this library; if not, see <https://www.gnu.org/licenses/>.
  19. #
  20. import sys
  21. import asyncio
  22. import subprocess
  23. import pkg_resources
  24. import qubes.tests
  25. import qubes.vm.appvm
  26. class ProcessWrapper(object):
  27. def __init__(self, proc, loop=None):
  28. self._proc = proc
  29. self._loop = loop or asyncio.get_event_loop()
  30. def __getattr__(self, item):
  31. return getattr(self._proc, item)
  32. def __setattr__(self, key, value):
  33. if key.startswith('_'):
  34. return super(ProcessWrapper, self).__setattr__(key, value)
  35. return setattr(self._proc, key, value)
  36. def communicate(self, input=None):
  37. return self._loop.run_until_complete(self._proc.communicate(input))
  38. class VMWrapper(object):
  39. '''Wrap VM object to provide stable API for basic operations'''
  40. def __init__(self, vm, loop=None):
  41. self._vm = vm
  42. self._loop = loop or asyncio.get_event_loop()
  43. def __getattr__(self, item):
  44. return getattr(self._vm, item)
  45. def __setattr__(self, key, value):
  46. if key.startswith('_'):
  47. return super(VMWrapper, self).__setattr__(key, value)
  48. return setattr(self._vm, key, value)
  49. def __str__(self):
  50. return str(self._vm)
  51. def __eq__(self, other):
  52. return self._vm == other
  53. def __hash__(self):
  54. return hash(self._vm)
  55. def start(self):
  56. return self._loop.run_until_complete(self._vm.start())
  57. def shutdown(self):
  58. return self._loop.run_until_complete(self._vm.shutdown())
  59. def run(self, command, wait=False, user=None, passio_popen=False,
  60. passio_stderr=False, **kwargs):
  61. if wait:
  62. try:
  63. self._loop.run_until_complete(
  64. self._vm.run_for_stdio(command, user=user))
  65. except subprocess.CalledProcessError as err:
  66. return err.returncode
  67. return 0
  68. elif passio_popen:
  69. p = self._loop.run_until_complete(self._vm.run(command, user=user,
  70. stdin=subprocess.PIPE,
  71. stdout=subprocess.PIPE,
  72. stderr=subprocess.PIPE if passio_stderr else None))
  73. return ProcessWrapper(p, self._loop)
  74. def run_service(self, service, wait=True, input=None, user=None,
  75. passio_popen=False,
  76. passio_stderr=False, **kwargs):
  77. if wait:
  78. try:
  79. if isinstance(input, str):
  80. input = input.encode()
  81. self._loop.run_until_complete(
  82. self._vm.run_service_for_stdio(service,
  83. input=input, user=user))
  84. except subprocess.CalledProcessError as err:
  85. return err.returncode
  86. return 0
  87. elif passio_popen:
  88. p = self._loop.run_until_complete(self._vm.run_service(service,
  89. user=user,
  90. stdin=subprocess.PIPE,
  91. stdout=subprocess.PIPE,
  92. stderr=subprocess.PIPE if passio_stderr else None))
  93. return ProcessWrapper(p, self._loop)
  94. class ExtraTestCase(qubes.tests.SystemTestCase):
  95. template = None
  96. def setUp(self):
  97. super(ExtraTestCase, self).setUp()
  98. self.init_default_template(self.template)
  99. if self.template is not None:
  100. # also use this template for DispVMs
  101. dispvm_base = self.app.add_new_vm('AppVM',
  102. name=self.make_vm_name('dvm'),
  103. template=self.template, label='red', template_for_dispvms=True)
  104. self.loop.run_until_complete(dispvm_base.create_on_disk())
  105. self.app.default_dispvm = dispvm_base
  106. def tearDown(self):
  107. self.app.default_dispvm = None
  108. super(ExtraTestCase, self).tearDown()
  109. def create_vms(self, names):
  110. """
  111. Create AppVMs for the duration of the test. Will be automatically
  112. removed after completing the test.
  113. :param names: list of VM names to create (each of them will be
  114. prefixed with some test specific string)
  115. :return: list of created VM objects
  116. """
  117. if self.template:
  118. template = self.app.domains[self.template]
  119. else:
  120. template = self.app.default_template
  121. for vmname in names:
  122. vm = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  123. name=self.make_vm_name(vmname),
  124. template=template,
  125. label='red')
  126. self.loop.run_until_complete(vm.create_on_disk())
  127. self.app.save()
  128. # get objects after reload
  129. vms = []
  130. for vmname in names:
  131. vms.append(VMWrapper(self.app.domains[self.make_vm_name(vmname)],
  132. loop=self.loop))
  133. return vms
  134. def enable_network(self):
  135. """
  136. Enable access to the network. Must be called before creating VMs.
  137. """
  138. self.init_networking()
  139. def qrexec_policy(self, service, source, destination, allow=True):
  140. """
  141. Allow qrexec calls for duration of the test
  142. :param service: service name
  143. :param source: source VM name
  144. :param destination: destination VM name
  145. :return:
  146. """
  147. def add_remove_rule(add=True):
  148. with open('/etc/qubes-rpc/policy/{}'.format(service), 'r+') as policy:
  149. policy_rules = policy.readlines()
  150. rule = "{} {} {}\n".format(source, destination,
  151. 'allow' if allow else 'deny')
  152. if add:
  153. policy_rules.insert(0, rule)
  154. else:
  155. policy_rules.remove(rule)
  156. policy.truncate(0)
  157. policy.seek(0)
  158. policy.write(''.join(policy_rules))
  159. add_remove_rule(add=True)
  160. self.addCleanup(add_remove_rule, add=False)
  161. def load_tests(loader, tests, pattern):
  162. for entry in pkg_resources.iter_entry_points('qubes.tests.extra'):
  163. try:
  164. for test_case in entry.load()():
  165. tests.addTests(loader.loadTestsFromTestCase(test_case))
  166. except Exception as err: # pylint: disable=broad-except
  167. def runTest(self):
  168. raise err
  169. ExtraLoadFailure = type('ExtraLoadFailure',
  170. (qubes.tests.QubesTestCase,),
  171. {entry.name: runTest})
  172. tests.addTest(ExtraLoadFailure(entry.name))
  173. for entry in pkg_resources.iter_entry_points(
  174. 'qubes.tests.extra.for_template'):
  175. try:
  176. for test_case in entry.load()():
  177. for template in qubes.tests.list_templates():
  178. tests.addTests(loader.loadTestsFromTestCase(
  179. type(
  180. '{}_{}_{}'.format(
  181. entry.name, test_case.__name__, template),
  182. (test_case,),
  183. {'template': template}
  184. )
  185. ))
  186. except Exception as err: # pylint: disable=broad-except
  187. def runTest(self):
  188. raise err
  189. ExtraForTemplateLoadFailure = type('ExtraForTemplateLoadFailure',
  190. (qubes.tests.QubesTestCase,),
  191. {entry.name: runTest})
  192. tests.addTest(ExtraForTemplateLoadFailure(entry.name))
  193. return tests