'extra' tests run is getting ridiculously long. Allow splitting it into several jobs. Since this appears as just one class from the test loader perspective, implement it as environment variables: - QUBES_TEST_EXTRA_INCLUDE - load just selected tests - QUBES_TEST_EXTRA_EXCLUDE - skip selected tests (to select "the rest" tests)
		
			
				
	
	
		
			244 lines
		
	
	
		
			8.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			244 lines
		
	
	
		
			8.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
#
 | 
						|
# The Qubes OS Project, https://www.qubes-os.org/
 | 
						|
#
 | 
						|
# Copyright (C) 2016
 | 
						|
#                   Marek Marczykowski-Górecki <marmarek@invisiblethingslab.com>
 | 
						|
#
 | 
						|
# This library is free software; you can redistribute it and/or
 | 
						|
# modify it under the terms of the GNU Lesser General Public
 | 
						|
# License as published by the Free Software Foundation; either
 | 
						|
# version 2.1 of the License, or (at your option) any later version.
 | 
						|
#
 | 
						|
# This library is distributed in the hope that it will be useful,
 | 
						|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 | 
						|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 | 
						|
# Lesser General Public License for more details.
 | 
						|
#
 | 
						|
# You should have received a copy of the GNU Lesser General Public
 | 
						|
# License along with this library; if not, see <https://www.gnu.org/licenses/>.
 | 
						|
#
 | 
						|
 | 
						|
import asyncio
 | 
						|
import os
 | 
						|
import subprocess
 | 
						|
import sys
 | 
						|
 | 
						|
import pkg_resources
 | 
						|
 | 
						|
import qubes.tests
 | 
						|
import qubes.vm.appvm
 | 
						|
 | 
						|
class ProcessWrapper(object):
 | 
						|
    def __init__(self, proc, loop=None):
 | 
						|
        self._proc = proc
 | 
						|
        self._loop = loop or asyncio.get_event_loop()
 | 
						|
 | 
						|
    def __getattr__(self, item):
 | 
						|
        return getattr(self._proc, item)
 | 
						|
 | 
						|
    def __setattr__(self, key, value):
 | 
						|
        if key.startswith('_'):
 | 
						|
            return super(ProcessWrapper, self).__setattr__(key, value)
 | 
						|
        return setattr(self._proc, key, value)
 | 
						|
 | 
						|
    def communicate(self, input=None):
 | 
						|
        return self._loop.run_until_complete(self._proc.communicate(input))
 | 
						|
 | 
						|
    def wait(self):
 | 
						|
        return self._loop.run_until_complete(self._proc.wait())
 | 
						|
 | 
						|
class VMWrapper(object):
 | 
						|
    '''Wrap VM object to provide stable API for basic operations'''
 | 
						|
    def __init__(self, vm, loop=None):
 | 
						|
        self._vm = vm
 | 
						|
        self._loop = loop or asyncio.get_event_loop()
 | 
						|
 | 
						|
    def __getattr__(self, item):
 | 
						|
        return getattr(self._vm, item)
 | 
						|
 | 
						|
    def __setattr__(self, key, value):
 | 
						|
        if key.startswith('_'):
 | 
						|
            return super(VMWrapper, self).__setattr__(key, value)
 | 
						|
        return setattr(self._vm, key, value)
 | 
						|
 | 
						|
    def __str__(self):
 | 
						|
        return str(self._vm)
 | 
						|
 | 
						|
    def __eq__(self, other):
 | 
						|
        return self._vm == other
 | 
						|
 | 
						|
    def __hash__(self):
 | 
						|
        return hash(self._vm)
 | 
						|
 | 
						|
    def start(self, start_guid=True):
 | 
						|
        return self._loop.run_until_complete(
 | 
						|
            self._vm.start(start_guid=start_guid))
 | 
						|
 | 
						|
    def shutdown(self):
 | 
						|
        return self._loop.run_until_complete(self._vm.shutdown())
 | 
						|
 | 
						|
    def run(self, command, wait=False, user=None, passio_popen=False,
 | 
						|
            passio_stderr=False, **kwargs):
 | 
						|
        if wait:
 | 
						|
            try:
 | 
						|
                self._loop.run_until_complete(
 | 
						|
                    self._vm.run_for_stdio(command, user=user))
 | 
						|
            except subprocess.CalledProcessError as err:
 | 
						|
                return err.returncode
 | 
						|
            return 0
 | 
						|
        elif passio_popen:
 | 
						|
            p = self._loop.run_until_complete(self._vm.run(command, user=user,
 | 
						|
                stdin=subprocess.PIPE,
 | 
						|
                stdout=subprocess.PIPE,
 | 
						|
                stderr=subprocess.PIPE if passio_stderr else None))
 | 
						|
            return ProcessWrapper(p, self._loop)
 | 
						|
        else:
 | 
						|
            asyncio.ensure_future(self._vm.run_for_stdio(command, user=user),
 | 
						|
                loop=self._loop)
 | 
						|
 | 
						|
    def run_service(self, service, wait=True, input=None, user=None,
 | 
						|
            passio_popen=False,
 | 
						|
            passio_stderr=False, **kwargs):
 | 
						|
        if wait:
 | 
						|
            try:
 | 
						|
                if isinstance(input, str):
 | 
						|
                    input = input.encode()
 | 
						|
                self._loop.run_until_complete(
 | 
						|
                    self._vm.run_service_for_stdio(service,
 | 
						|
                        input=input, user=user))
 | 
						|
            except subprocess.CalledProcessError as err:
 | 
						|
                return err.returncode
 | 
						|
            return 0
 | 
						|
        elif passio_popen:
 | 
						|
            p = self._loop.run_until_complete(self._vm.run_service(service,
 | 
						|
                user=user,
 | 
						|
                stdin=subprocess.PIPE,
 | 
						|
                stdout=subprocess.PIPE,
 | 
						|
                stderr=subprocess.PIPE if passio_stderr else None))
 | 
						|
            return ProcessWrapper(p, self._loop)
 | 
						|
 | 
						|
 | 
						|
class ExtraTestCase(qubes.tests.SystemTestCase):
 | 
						|
 | 
						|
    template = None
 | 
						|
 | 
						|
    def setUp(self):
 | 
						|
        super(ExtraTestCase, self).setUp()
 | 
						|
        self.init_default_template(self.template)
 | 
						|
        if self.template is not None:
 | 
						|
            # also use this template for DispVMs
 | 
						|
            dispvm_base = self.app.add_new_vm('AppVM',
 | 
						|
                name=self.make_vm_name('dvm'),
 | 
						|
                template=self.template, label='red', template_for_dispvms=True)
 | 
						|
            self.loop.run_until_complete(dispvm_base.create_on_disk())
 | 
						|
            self.app.default_dispvm = dispvm_base
 | 
						|
 | 
						|
    def tearDown(self):
 | 
						|
        self.app.default_dispvm = None
 | 
						|
        super(ExtraTestCase, self).tearDown()
 | 
						|
 | 
						|
    def create_vms(self, names):
 | 
						|
        """
 | 
						|
        Create AppVMs for the duration of the test. Will be automatically
 | 
						|
        removed after completing the test.
 | 
						|
        :param names: list of VM names to create (each of them will be
 | 
						|
        prefixed with some test specific string)
 | 
						|
        :return: list of created VM objects
 | 
						|
        """
 | 
						|
        if self.template:
 | 
						|
            template = self.app.domains[self.template]
 | 
						|
        else:
 | 
						|
            template = self.app.default_template
 | 
						|
        for vmname in names:
 | 
						|
            vm = self.app.add_new_vm(qubes.vm.appvm.AppVM,
 | 
						|
                                    name=self.make_vm_name(vmname),
 | 
						|
                                    template=template,
 | 
						|
                                    label='red')
 | 
						|
            self.loop.run_until_complete(vm.create_on_disk())
 | 
						|
        self.app.save()
 | 
						|
 | 
						|
        # get objects after reload
 | 
						|
        vms = []
 | 
						|
        for vmname in names:
 | 
						|
            vms.append(VMWrapper(self.app.domains[self.make_vm_name(vmname)],
 | 
						|
                loop=self.loop))
 | 
						|
        return vms
 | 
						|
 | 
						|
    def enable_network(self):
 | 
						|
        """
 | 
						|
        Enable access to the network. Must be called before creating VMs.
 | 
						|
        """
 | 
						|
        self.init_networking()
 | 
						|
 | 
						|
    def qrexec_policy(self, service, source, destination, allow=True):
 | 
						|
        """
 | 
						|
        Allow qrexec calls for duration of the test
 | 
						|
        :param service: service name
 | 
						|
        :param source: source VM name
 | 
						|
        :param destination: destination VM name
 | 
						|
        :return:
 | 
						|
        """
 | 
						|
 | 
						|
        def add_remove_rule(add=True):
 | 
						|
            with open('/etc/qubes-rpc/policy/{}'.format(service), 'r+') as policy:
 | 
						|
                policy_rules = policy.readlines()
 | 
						|
                rule = "{} {} {}\n".format(source, destination,
 | 
						|
                                              'allow' if allow else 'deny')
 | 
						|
                if add:
 | 
						|
                    policy_rules.insert(0, rule)
 | 
						|
                else:
 | 
						|
                    policy_rules.remove(rule)
 | 
						|
                policy.truncate(0)
 | 
						|
                policy.seek(0)
 | 
						|
                policy.write(''.join(policy_rules))
 | 
						|
        add_remove_rule(add=True)
 | 
						|
        self.addCleanup(add_remove_rule, add=False)
 | 
						|
 | 
						|
 | 
						|
def load_tests(loader, tests, pattern):
 | 
						|
    include_list = None
 | 
						|
    if 'QUBES_TEST_EXTRA_INCLUDE' in os.environ:
 | 
						|
        include_list = os.environ['QUBES_TEST_EXTRA_INCLUDE'].split()
 | 
						|
    exclude_list = []
 | 
						|
    if 'QUBES_TEST_EXTRA_EXCLUDE' in os.environ:
 | 
						|
        exclude_list = os.environ['QUBES_TEST_EXTRA_EXCLUDE'].split()
 | 
						|
 | 
						|
    for entry in pkg_resources.iter_entry_points('qubes.tests.extra'):
 | 
						|
        if include_list is not None and entry.name not in include_list:
 | 
						|
            continue
 | 
						|
        if entry.name in exclude_list:
 | 
						|
            continue
 | 
						|
        try:
 | 
						|
            for test_case in entry.load()():
 | 
						|
                tests.addTests(loader.loadTestsFromNames([
 | 
						|
                    '{}.{}'.format(test_case.__module__, test_case.__name__)]))
 | 
						|
        except Exception as err:  # pylint: disable=broad-except
 | 
						|
            def runTest(self):
 | 
						|
                raise err
 | 
						|
            ExtraLoadFailure = type('ExtraLoadFailure',
 | 
						|
                (qubes.tests.QubesTestCase,),
 | 
						|
                {entry.name: runTest})
 | 
						|
            tests.addTest(ExtraLoadFailure(entry.name))
 | 
						|
 | 
						|
    for entry in pkg_resources.iter_entry_points(
 | 
						|
            'qubes.tests.extra.for_template'):
 | 
						|
        if include_list is not None and entry.name not in include_list:
 | 
						|
            continue
 | 
						|
        if entry.name in exclude_list:
 | 
						|
            continue
 | 
						|
        try:
 | 
						|
            for test_case in entry.load()():
 | 
						|
                tests.addTests(loader.loadTestsFromNames(
 | 
						|
                    qubes.tests.create_testcases_for_templates(
 | 
						|
                        test_case.__name__, test_case,
 | 
						|
                        module=sys.modules[test_case.__module__])))
 | 
						|
        except Exception as err:  # pylint: disable=broad-except
 | 
						|
            def runTest(self):
 | 
						|
                raise err
 | 
						|
            ExtraForTemplateLoadFailure = type('ExtraForTemplateLoadFailure',
 | 
						|
                (qubes.tests.QubesTestCase,),
 | 
						|
                {entry.name: runTest})
 | 
						|
            tests.addTest(ExtraForTemplateLoadFailure(entry.name))
 | 
						|
 | 
						|
    return tests
 |