123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243 |
- #
- # The Qubes OS Project, https://www.qubes-os.org/
- #
- # Copyright (C) 2016
- # Marek Marczykowski-Górecki <marmarek@invisiblethingslab.com>
- #
- # This library is free software; you can redistribute it and/or
- # modify it under the terms of the GNU Lesser General Public
- # License as published by the Free Software Foundation; either
- # version 2.1 of the License, or (at your option) any later version.
- #
- # This library is distributed in the hope that it will be useful,
- # but WITHOUT ANY WARRANTY; without even the implied warranty of
- # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- # Lesser General Public License for more details.
- #
- # You should have received a copy of the GNU Lesser General Public
- # License along with this library; if not, see <https://www.gnu.org/licenses/>.
- #
- import asyncio
- import os
- import subprocess
- import sys
- import pkg_resources
- import qubes.tests
- import qubes.vm.appvm
class ProcessWrapper(object):
    '''Synchronous facade over a process object with coroutine methods.

    ``communicate()`` and ``wait()`` run the wrapped object's coroutines to
    completion on the event loop given at construction time. Any other
    attribute read is forwarded to the wrapped process. Attribute writes
    are forwarded too, except for names starting with ``_``, which are
    stored on the wrapper itself.
    '''

    def __init__(self, proc, loop=None):
        '''
        :param proc: process object whose ``communicate()`` and ``wait()`` \
            return awaitables
        :param loop: event loop used to run them; defaults to \
            ``asyncio.get_event_loop()``
        '''
        self._proc = proc
        self._loop = loop or asyncio.get_event_loop()

    def __getattr__(self, item):
        # __getattr__ is only consulted when normal lookup fails. If the
        # wrapper's own slots are missing (instance created without
        # __init__, e.g. by copy or pickle), reading self._proc below would
        # re-enter this method until RecursionError. Fail cleanly instead.
        if item in ('_proc', '_loop'):
            raise AttributeError(item)
        return getattr(self._proc, item)

    def __setattr__(self, key, value):
        # Private names belong to the wrapper; everything else is set on
        # the wrapped process.
        if key.startswith('_'):
            return super(ProcessWrapper, self).__setattr__(key, value)
        return setattr(self._proc, key, value)

    def communicate(self, input=None):
        '''Run the wrapped ``communicate(input)`` to completion.

        :param input: value passed through unchanged to the wrapped call
        :return: whatever the wrapped coroutine returns
        '''
        return self._loop.run_until_complete(self._proc.communicate(input))

    def wait(self):
        '''Run the wrapped ``wait()`` to completion and return its result.'''
        return self._loop.run_until_complete(self._proc.wait())
class VMWrapper(object):
    '''Wrap VM object to provide stable API for basic operations

    The methods defined here drive the wrapped VM's coroutines on the event
    loop given at construction time, so callers can use them synchronously.
    Any other attribute read is forwarded to the wrapped VM. Attribute
    writes are forwarded too, except for names starting with ``_``, which
    are stored on the wrapper itself.
    '''

    def __init__(self, vm, loop=None):
        '''
        :param vm: VM object to wrap
        :param loop: event loop used to run the VM's coroutines; defaults \
            to ``asyncio.get_event_loop()``
        '''
        self._vm = vm
        self._loop = loop or asyncio.get_event_loop()

    def __getattr__(self, item):
        # __getattr__ is only consulted when normal lookup fails. If the
        # wrapper's own slots are missing (instance created without
        # __init__, e.g. by copy or pickle), reading self._vm below would
        # re-enter this method until RecursionError. Fail cleanly instead.
        if item in ('_vm', '_loop'):
            raise AttributeError(item)
        return getattr(self._vm, item)

    def __setattr__(self, key, value):
        # Private names belong to the wrapper; everything else is set on
        # the wrapped VM.
        if key.startswith('_'):
            return super(VMWrapper, self).__setattr__(key, value)
        return setattr(self._vm, key, value)

    def __str__(self):
        return str(self._vm)

    def __eq__(self, other):
        # Compare by the wrapped VM, so a wrapper equals the VM it wraps.
        return self._vm == other

    def __hash__(self):
        # Kept consistent with __eq__: hash of the wrapped VM.
        return hash(self._vm)

    def start(self, start_guid=True):
        '''Run the VM's ``start()`` coroutine to completion.

        :param start_guid: passed through to the wrapped ``start()``
        :return: whatever the wrapped coroutine returns
        '''
        return self._loop.run_until_complete(
            self._vm.start(start_guid=start_guid))

    def shutdown(self):
        '''Run the VM's ``shutdown()`` coroutine to completion.'''
        return self._loop.run_until_complete(self._vm.shutdown())

    def run(self, command, wait=False, user=None, passio_popen=False,
            passio_stderr=False, **kwargs):
        '''Run a command in the VM.

        :param command: command passed to the wrapped VM
        :param wait: block until the command finishes and return its exit \
            code (0, or the ``returncode`` of the raised \
            ``subprocess.CalledProcessError``)
        :param user: passed through to the wrapped VM
        :param passio_popen: when *wait* is false, start the command with \
            piped stdin/stdout and return a :py:class:`ProcessWrapper`
        :param passio_stderr: with *passio_popen*, also pipe stderr
        :param kwargs: accepted and ignored
        :return: exit code, :py:class:`ProcessWrapper`, or None when the \
            command is only scheduled on the loop
        '''
        if wait:
            try:
                self._loop.run_until_complete(
                    self._vm.run_for_stdio(command, user=user))
            except subprocess.CalledProcessError as err:
                return err.returncode
            return 0
        elif passio_popen:
            p = self._loop.run_until_complete(self._vm.run(command, user=user,
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE if passio_stderr else None))
            return ProcessWrapper(p, self._loop)
        else:
            # Fire and forget: the command is scheduled on the loop and
            # nothing is returned to the caller.
            asyncio.ensure_future(self._vm.run_for_stdio(command, user=user),
                loop=self._loop)

    def run_service(self, service, wait=True, input=None, user=None,
                    passio_popen=False,
                    passio_stderr=False, **kwargs):
        '''Call a service in the VM.

        :param service: service name passed to the wrapped VM
        :param wait: block until the call finishes and return its exit \
            code (0, or the ``returncode`` of the raised \
            ``subprocess.CalledProcessError``)
        :param input: data for the service; ``str`` is encoded to bytes \
            first (only used when *wait* is true)
        :param user: passed through to the wrapped VM
        :param passio_popen: when *wait* is false, start the service with \
            piped stdin/stdout and return a :py:class:`ProcessWrapper`
        :param passio_stderr: with *passio_popen*, also pipe stderr
        :param kwargs: accepted and ignored
        :return: exit code or :py:class:`ProcessWrapper`; None (and no \
            call is made) when both *wait* and *passio_popen* are false
        '''
        if wait:
            try:
                if isinstance(input, str):
                    input = input.encode()
                self._loop.run_until_complete(
                    self._vm.run_service_for_stdio(service,
                        input=input, user=user))
            except subprocess.CalledProcessError as err:
                return err.returncode
            return 0
        elif passio_popen:
            p = self._loop.run_until_complete(self._vm.run_service(service,
                user=user,
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE if passio_stderr else None))
            return ProcessWrapper(p, self._loop)
class ExtraTestCase(qubes.tests.SystemTestCase):
    """
    Base class for system tests provided by external packages.

    ``template`` is handed to ``init_default_template()`` during set-up.
    When it is not None it is also used to create a DispVM template that
    becomes the default DispVM for the duration of the test.
    """

    template = None

    def setUp(self):
        super().setUp()
        self.init_default_template(self.template)
        if self.template is None:
            return
        # DispVMs should be based on the same template as the other VMs.
        dvm_template = self.app.add_new_vm(
            'AppVM',
            name=self.make_vm_name('dvm'),
            template=self.template,
            label='red',
            template_for_dispvms=True)
        self.loop.run_until_complete(dvm_template.create_on_disk())
        self.app.default_dispvm = dvm_template

    def tearDown(self):
        # Clear the default DispVM before the parent tearDown runs.
        self.app.default_dispvm = None
        super().tearDown()

    def create_vms(self, names):
        """
        Create one AppVM per entry in *names* and return them wrapped.

        The VMs are based on ``self.template`` when it is set, otherwise on
        the application's default template.

        :param names: list of VM names to create (each of them will be
            prefixed with some test specific string)
        :return: list of :py:class:`VMWrapper` objects, one per name
        """
        base_template = (self.app.domains[self.template]
                         if self.template
                         else self.app.default_template)
        for short_name in names:
            new_vm = self.app.add_new_vm(
                qubes.vm.appvm.AppVM,
                name=self.make_vm_name(short_name),
                template=base_template,
                label='red')
            self.loop.run_until_complete(new_vm.create_on_disk())
        self.app.save()

        # Look the VMs up again after saving instead of reusing the
        # objects returned by add_new_vm().
        return [
            VMWrapper(self.app.domains[self.make_vm_name(short_name)],
                      loop=self.loop)
            for short_name in names
        ]

    def enable_network(self):
        """
        Enable access to the network. Must be called before creating VMs.
        """
        self.init_networking()

    def qrexec_policy(self, service, source, destination, allow=True):
        """
        Put a qrexec policy rule in place for the duration of the test.

        The rule is inserted as the first line of the service's policy file
        and removed again by a registered cleanup.

        :param service: service name
        :param source: source VM name
        :param destination: destination VM name
        :param allow: write an ``allow`` rule if true, ``deny`` otherwise
        :return: None
        """
        policy_path = '/etc/qubes-rpc/policy/{}'.format(service)
        if allow:
            action = 'allow'
        else:
            action = 'deny'
        rule_line = "{} {} {}\n".format(source, destination, action)

        def update_policy(add=True):
            # Rewrite the whole file with the rule added or removed.
            with open(policy_path, 'r+') as policy_file:
                lines = policy_file.readlines()
                if add:
                    lines.insert(0, rule_line)
                else:
                    lines.remove(rule_line)
                policy_file.truncate(0)
                policy_file.seek(0)
                policy_file.writelines(lines)

        update_policy(add=True)
        self.addCleanup(update_policy, add=False)
def _extra_load_failure(class_name, entry_name, err):
    """Build a placeholder test that re-raises *err* when it is run.

    The exception is received as a parameter on purpose. A function defined
    directly inside ``except ... as err`` would close over a name that
    Python unbinds when the except clause ends, and would fail with a
    NameError instead of reporting the real load error.

    :param class_name: name of the generated test case class
    :param entry_name: entry point name, used as the test method name
    :param err: exception raised while loading the entry point
    :return: test case instance whose only test raises *err*
    """
    def runTest(self):
        raise err

    failure_class = type(class_name,
                         (qubes.tests.QubesTestCase,),
                         {entry_name: runTest})
    return failure_class(entry_name)


def load_tests(loader, tests, pattern):
    """Collect tests published through the extra-tests entry points.

    Entry points in the ``qubes.tests.extra`` group are loaded as they are.
    Those in ``qubes.tests.extra.for_template`` are expanded with
    ``qubes.tests.create_testcases_for_templates()``. The environment
    variable ``QUBES_TEST_EXTRA_INCLUDE``, when set, restricts loading to
    the whitespace-separated entry point names it lists;
    ``QUBES_TEST_EXTRA_EXCLUDE`` lists names to skip. An entry point that
    fails to load is replaced by a test that raises the original error, so
    the failure shows up in the test results.

    :param loader: test loader used to load tests by name
    :param tests: test suite to extend
    :param pattern: unused here
    :return: the *tests* suite that was passed in, extended in place
    """
    # pylint: disable=unused-argument
    include_list = None
    if 'QUBES_TEST_EXTRA_INCLUDE' in os.environ:
        include_list = os.environ['QUBES_TEST_EXTRA_INCLUDE'].split()
    exclude_list = []
    if 'QUBES_TEST_EXTRA_EXCLUDE' in os.environ:
        exclude_list = os.environ['QUBES_TEST_EXTRA_EXCLUDE'].split()

    for entry in pkg_resources.iter_entry_points('qubes.tests.extra'):
        if include_list is not None and entry.name not in include_list:
            continue
        if entry.name in exclude_list:
            continue
        try:
            for test_case in entry.load()():
                tests.addTests(loader.loadTestsFromNames([
                    '{}.{}'.format(test_case.__module__, test_case.__name__)]))
        except Exception as err:  # pylint: disable=broad-except
            tests.addTest(
                _extra_load_failure('ExtraLoadFailure', entry.name, err))

    for entry in pkg_resources.iter_entry_points(
            'qubes.tests.extra.for_template'):
        if include_list is not None and entry.name not in include_list:
            continue
        if entry.name in exclude_list:
            continue
        try:
            for test_case in entry.load()():
                tests.addTests(loader.loadTestsFromNames(
                    qubes.tests.create_testcases_for_templates(
                        test_case.__name__, test_case,
                        module=sys.modules[test_case.__module__])))
        except Exception as err:  # pylint: disable=broad-except
            tests.addTest(
                _extra_load_failure(
                    'ExtraForTemplateLoadFailure', entry.name, err))

    return tests
|