12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466 |
- # pylint: disable=invalid-name
- #
- # The Qubes OS Project, https://www.qubes-os.org/
- #
- # Copyright (C) 2014-2015 Joanna Rutkowska <joanna@invisiblethingslab.com>
- # Copyright (C) 2014-2015
- # Marek Marczykowski-Górecki <marmarek@invisiblethingslab.com>
- # Copyright (C) 2014-2015 Wojtek Porczyk <woju@invisiblethingslab.com>
- #
- # This library is free software; you can redistribute it and/or
- # modify it under the terms of the GNU Lesser General Public
- # License as published by the Free Software Foundation; either
- # version 2.1 of the License, or (at your option) any later version.
- #
- # This library is distributed in the hope that it will be useful,
- # but WITHOUT ANY WARRANTY; without even the implied warranty of
- # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- # Lesser General Public License for more details.
- #
- # You should have received a copy of the GNU Lesser General Public
- # License along with this library; if not, see <https://www.gnu.org/licenses/>.
- #
- """
- .. warning::
- The test suite hereby claims any domain whose name starts with
- :py:data:`VMPREFIX` as fair game. This is needed to enforce sane
- test executing environment. If you have domains named ``test-*``,
- don't run the tests.
- """
- import asyncio
- import collections
- import functools
- import logging
- import os
- import pathlib
- import shlex
- import shutil
- import subprocess
- import sys
- import tempfile
- import time
- import traceback
- import unittest
- import warnings
- from distutils import spawn
- import gc
- import lxml.etree
- import pkg_resources
- import qubes
- import qubes.api
- import qubes.api.admin
- import qubes.api.internal
- import qubes.api.misc
- import qubes.backup
- import qubes.config
- import qubes.devices
- import qubes.events
- import qubes.exc
- import qubes.ext.pci
- import qubes.vm.standalonevm
- import qubes.vm.templatevm
# Locations of the qubes.xml copies used by per-test and per-class fixtures.
XMLPATH = '/var/lib/qubes/qubes-test.xml'
CLASS_XMLPATH = '/var/lib/qubes/qubes-class-test.xml'

TEMPLATE = 'fedora-23'

# Name prefixes claimed by the test suite (per-test and per-class VMs).
VMPREFIX = 'test-inst-'
CLSVMPREFIX = 'test-cls-'

# LVM thin pool in ``<volume group>/<thin pool>`` form; the environment
# variable of the same name overrides the built-in default.
DEFAULT_LVM_POOL = os.environ.get('DEFAULT_LVM_POOL', 'qubes_dom0/pool00')

POOL_CONF = {
    'name': 'test-lvm',
    'driver': 'lvm_thin',
    'volume_group': DEFAULT_LVM_POOL.split('/')[0],
    'thin_pool': DEFAULT_LVM_POOL.split('/')[1],
}

#: :py:obj:`True` if running in dom0, :py:obj:`False` otherwise
in_dom0 = False

#: :py:obj:`False` if outside of git repo,
#: path to root of the directory otherwise
in_git = False

# dom0 detection: a read-only libvirt connection must open successfully.
try:
    import libvirt
    libvirt.openReadOnly(qubes.config.defaults['libvirt_uri']).close()
except libvirt.libvirtError:
    pass
else:
    in_dom0 = True

if in_dom0:
    import libvirtaio

# asyncio event implementation for libvirt; registered lazily by test cases.
libvirt_event_impl = None

# git detection: a non-zero exit status means we are outside a repository,
# OSError means the git command is not available at all.
try:
    in_git = subprocess.check_output(
        ['git', 'rev-parse', '--show-toplevel'], stderr=subprocess.DEVNULL
    ).decode().strip()
except (subprocess.CalledProcessError, OSError):
    pass

# syslog handler shared by all system tests; created on first use.
ha_syslog = None
def skipUnlessDom0(test_item):
    """Skip the decorated test when not running in dom0.

    Integration tests in particular need a more or less working dom0. The
    decision is based on the module-level ``in_dom0`` flag, which is set by
    trying to connect to libvirt.
    """
    decorate = unittest.skipUnless(in_dom0, 'outside dom0')
    return decorate(test_item)
def skipUnlessGit(test_item):
    """Skip the decorated test when not running from a git checkout.

    Only a few tests can be run exclusively in git; one example is checking
    the correctness of example code that does not get included in the RPM.
    """
    decorate = unittest.skipUnless(in_git, 'outside git tree')
    return decorate(test_item)
def skipUnlessEnv(varname):
    """Build a decorator skipping tests when an environment variable is unset.

    Some tests require a working X11 display (for example those using the
    GTK library, which segfaults without a connection to X); others need
    their own, custom variables.

    :param str varname: name of the environment variable to look up
    :return: a :py:func:`unittest.skipUnless` decorator
    """
    reason = 'no {} set'.format(varname)
    return unittest.skipUnless(os.getenv(varname), reason)
def _freeze_event_kwargs(kwargs):
    """Return a hashable snapshot of event keyword arguments.

    Dict values are replaced by a frozenset of their items and list values
    by tuples; any other value is kept as is. The result is a frozenset of
    ``(key, value)`` pairs, usable as (part of) a dictionary key.
    """
    return frozenset(
        (key,
         frozenset(value.items()) if isinstance(value, dict)
         else tuple(value) if isinstance(value, list)
         else value)
        for key, value in kwargs.items())


class TestEmitter(qubes.events.Emitter):
    """Dummy event emitter which records events fired on it.

    Events are counted in :py:attr:`fired_events` attribute, which is
    :py:class:`collections.Counter` instance. For each event, an
    ``(event, kwargs)`` object is counted. *event* is event name (a string)
    and *kwargs* is a frozenset of ``(key, value)`` pairs built from the
    keyword arguments (dict values are frozen to a frozenset of items, list
    values to tuples).

    >>> emitter = TestEmitter()
    >>> emitter.fired_events
    Counter()
    >>> emitter.fire_event('event', spam='eggs')
    >>> emitter.fired_events
    Counter({('event', frozenset({('spam', 'eggs')})): 1})
    """

    def __init__(self, *args, **kwargs):
        super(TestEmitter, self).__init__(*args, **kwargs)

        #: :py:class:`collections.Counter` instance
        self.fired_events = collections.Counter()

    def fire_event(self, event, **kwargs):
        """Fire *event* through the parent class, then record it.

        :return: whatever the parent implementation returned
        """
        effects = super(TestEmitter, self).fire_event(event, **kwargs)
        self.fired_events[(event, _freeze_event_kwargs(kwargs))] += 1
        return effects

    @asyncio.coroutine
    def fire_event_async(self, event, pre_event=False, **kwargs):
        """Asynchronous counterpart of :py:meth:`fire_event`.

        *pre_event* is forwarded to the parent but, being a named parameter
        here, is not part of the recorded keyword arguments.
        """
        effects = yield from super(TestEmitter, self).fire_event_async(
            event, pre_event=pre_event, **kwargs)
        # same normalisation as in fire_event(), so list values do not make
        # the key unhashable
        self.fired_events[(event, _freeze_event_kwargs(kwargs))] += 1
        return effects
def expectedFailureIfTemplate(templates):
    """
    Decorator for marking specific test as expected to fail only for some
    templates. Template name is compared as substring, so 'whonix' will
    handle both 'whonix-ws' and 'whonix-gw'.

    The decorated test must be a method of an object with a ``template``
    attribute holding the template name.

    :param templates: either a single string, or an iterable of strings
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(self, *args, **kwargs):
            template = self.template
            if isinstance(templates, str):
                patterns = (templates,)
            else:
                patterns = templates
            # each pattern is looked up *inside* the template name, not the
            # other way around
            should_expect_fail = any(
                pattern in template for pattern in patterns)

            if not should_expect_fail:
                # Call directly:
                func(self, *args, **kwargs)
                return

            # NOTE(review): these are private unittest helpers; confirm that
            # both exist in the Python version the suite targets.
            try:
                func(self, *args, **kwargs)
            except Exception:
                raise unittest.case._ExpectedFailure(sys.exc_info())
            raise unittest.case._UnexpectedSuccess()
        return wrapper
    return decorator
def wait_on_fail(func):
    """Debugging decorator: pause a failing test until the user hits return.

    When the decorated test raises, the traceback is printed and execution
    blocks reading a line from standard input, then the exception is
    re-raised. This lets you inspect the system state right after the
    failure, before any cleanup is executed. The test object must provide
    the event loop as ``self.loop``.

    Usage: decorate a test you are debugging.
    DO IT ONLY TEMPORARILY, DO NOT COMMIT!
    """
    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        try:
            func(self, *args, **kwargs)
        except BaseException:
            print('FAIL\n')
            traceback.print_exc()
            print('Press return to continue:', end='')
            sys.stdout.flush()

            # read one line from a duplicate of stdin through the test's loop
            stdin_reader = asyncio.StreamReader(loop=self.loop)
            connecting = self.loop.connect_read_pipe(
                lambda: asyncio.StreamReaderProtocol(stdin_reader),
                os.fdopen(os.dup(sys.stdin.fileno())))
            transport, _protocol = self.loop.run_until_complete(connecting)
            self.loop.run_until_complete(stdin_reader.readline())
            transport.close()
            raise
    return wrapper
- class _AssertNotRaisesContext(object):
- """A context manager used to implement TestCase.assertNotRaises methods.
- Stolen from unittest and hacked. Regexp support stripped.
- """ # pylint: disable=too-few-public-methods
- def __init__(self, expected, test_case, expected_regexp=None):
- if expected_regexp is not None:
- raise NotImplementedError('expected_regexp is unsupported')
- self.expected = expected
- self.exception = None
- self.failureException = test_case.failureException
- def __enter__(self):
- return self
- def __exit__(self, exc_type, exc_value, tb):
- if exc_type is None:
- return True
- if issubclass(exc_type, self.expected):
- raise self.failureException(
- "{!r} raised, traceback:\n{!s}".format(
- exc_value, ''.join(traceback.format_tb(tb))))
- else:
- # pass through
- return False
- self.exception = exc_value # store for later retrieval
- class _QrexecPolicyContext(object):
- """Context manager for SystemTestCase.qrexec_policy"""
- def __init__(self, service, source, destination, allow=True, action=None):
- try:
- source = source.name
- except AttributeError:
- pass
- try:
- destination = destination.name
- except AttributeError:
- pass
- self._filename = pathlib.Path('/etc/qubes-rpc/policy') / service
- if action is None:
- action = 'allow' if allow else 'deny'
- self._rule = '{} {} {}\n'.format(source, destination, action)
- self._did_create = False
- self._handle = None
- def load(self):
- if self._handle is None:
- try:
- self._handle = self._filename.open('r+')
- except FileNotFoundError:
- self._handle = self._filename.open('w+')
- self._did_create = True
- self._handle.seek(0)
- return self._handle.readlines()
- def save(self, rules):
- assert self._handle is not None
- self._handle.truncate(0)
- self._handle.seek(0)
- self._handle.write(''.join(rules))
- self._handle.flush()
- def close(self):
- assert self._handle is not None
- self._handle.close()
- self._handle = None
- def __enter__(self):
- rules = self.load()
- rules.insert(0, self._rule)
- self.save(rules)
- return self
- def __exit__(self, exc_type, exc_value, tb):
- if not self._did_create:
- try:
- rules = self.load()
- rules.remove(self._rule)
- self.save(rules)
- finally:
- self.close()
- else:
- self.close()
- self._filename.unlink()
class substitute_entry_points(object):
    """Monkey-patch pkg_resources to substitute one group in iter_entry_points

    This is for testing plugins, like device classes.

    :param str group: The group that is to be overloaded.
    :param str tempgroup: The substitute group.

    While the context is active, asking for entry points of the overloaded
    group yields the entry points of the substitute group instead.

    This context manager is stackable. To substitute more than one entry point
    group, just nest two contexts.
    """  # pylint: disable=invalid-name

    def __init__(self, group, tempgroup):
        self.group = group
        self.tempgroup = tempgroup
        # original pkg_resources.iter_entry_points, kept while patched
        self._orig_iter_entry_points = None

    def _iter_entry_points(self, group, *args, **kwargs):
        """Replacement function: redirect the overloaded group only."""
        effective = self.tempgroup if group == self.group else group
        return self._orig_iter_entry_points(effective, *args, **kwargs)

    def __enter__(self):
        original = pkg_resources.iter_entry_points
        self._orig_iter_entry_points = original
        pkg_resources.iter_entry_points = self._iter_entry_points
        return self

    def __exit__(self, exc_type, exc_value, tb):
        original = self._orig_iter_entry_points
        pkg_resources.iter_entry_points = original
        self._orig_iter_entry_points = None
class QubesTestCase(unittest.TestCase):
    """Base class for Qubes unit tests.

    Besides a handful of extra assertions, every test gets the asyncio event
    loop as ``self.loop`` and a set of cleanups checking that the test left
    neither garbage on the loop nor leaked Qubes/libvirt objects.
    """

    def __init__(self, *args, **kwargs):
        global libvirt_event_impl

        super(QubesTestCase, self).__init__(*args, **kwargs)
        self.longMessage = True
        self.log = logging.getLogger('{}.{}.{}'.format(
            self.__class__.__module__,
            self.__class__.__name__,
            self._testMethodName))
        self.addTypeEqualityFunc(qubes.devices.DeviceManager,
                                 self.assertDevicesEqual)

        self.loop = None

        # the libvirt event implementation is registered once per process
        if in_dom0 and not libvirt_event_impl:
            libvirt_event_impl = libvirtaio.virEventRegisterAsyncIOImpl()

    def __str__(self):
        return '{}/{}/{}'.format(
            self.__class__.__module__,
            self.__class__.__name__,
            self._testMethodName)

    def setUp(self):
        super().setUp()
        # cleanups run in reverse order of registration:
        # cleanup_traceback, then cleanup_loop, then cleanup_gc
        self.addCleanup(self.cleanup_gc)

        self.loop = asyncio.get_event_loop()
        self.addCleanup(self.cleanup_loop)
        self.addCleanup(self.cleanup_traceback)

    def cleanup_traceback(self):
        """Remove local variables reference from tracebacks to allow garbage
        collector to clean all Qubes*() objects, otherwise file descriptors
        held by them will leak"""
        exc_infos = [e for test_case, e in self._outcome.errors
                     if test_case is self]
        if self._outcome.expectedFailure:
            exc_infos.append(self._outcome.expectedFailure)
        for exc_info in exc_infos:
            if exc_info is None:
                continue
            ex = exc_info[1]
            # walk the whole chain of implicitly chained exceptions
            while ex is not None:
                if isinstance(ex, qubes.exc.QubesVMError):
                    ex.vm = None
                traceback.clear_frames(ex.__traceback__)
                ex = ex.__context__

    def cleanup_gc(self):
        """Fail if any Qubes or libvirt object survived the test."""
        gc.collect()
        leaked = [obj for obj in gc.get_objects() + gc.garbage
                  if isinstance(obj,
                                (qubes.Qubes, qubes.vm.BaseVM,
                                 libvirt.virConnect, libvirt.virDomain))]

        if leaked:
            try:
                # optional debugging aid: draw who references leaked objects
                import objgraph
                # NOTE(review): ``extra_info`` is not defined in the visible
                # part of this module; presumably a module-level helper
                # defined elsewhere. Confirm.
                objgraph.show_backrefs(leaked,
                                       max_depth=15, extra_info=extra_info,
                                       filename='/tmp/objgraph-{}.png'.format(
                                           self.id()))
            except ImportError:
                pass

        # do not keep leaked object references in locals()
        leaked = bool(leaked)
        assert not leaked

    def cleanup_loop(self):
        """Check if the loop is empty"""
        # XXX BEWARE this is touching undocumented, implementation-specific
        # attributes of the loop. This is most certainly unsupported and likely
        # will break when messing with: Python version, kernel family, loop
        # implementation, a combination thereof, or other things.
        # KEYWORDS for searching:
        #   win32, SelectorEventLoop, ProactorEventLoop, uvloop, gevent

        global libvirt_event_impl

        # really destroy all objects that could have used loop and/or libvirt
        gc.collect()

        # Check for unfinished libvirt business.
        if libvirt_event_impl is not None:
            try:
                self.loop.run_until_complete(asyncio.wait_for(
                    libvirt_event_impl.drain(), timeout=4))
            except asyncio.TimeoutError:
                raise AssertionError('libvirt event impl drain timeout')

        # this is stupid, but apparently it requires two passes
        # to cleanup SIGCHLD handlers
        self.loop.stop()
        self.loop.run_forever()
        self.loop.stop()
        self.loop.run_forever()

        # Check there are no Tasks left.
        assert not self.loop._ready
        assert not self.loop._scheduled

        # Check the loop watches no descriptors.
        # NOTE the loop has a pipe for self-interrupting, created once per
        # lifecycle, and it is unwatched only at loop.close(); so we cannot just
        # check selector for non-emptiness
        assert len(self.loop._selector.get_map()) \
            == int(self.loop._ssock is not None)

        del self.loop

    def assertNotRaises(self, excClass, callableObj=None, *args, **kwargs):
        """Fail if an exception of class excClass is raised
        by callableObj when invoked with arguments args and keyword
        arguments kwargs. If a different type of exception is
        raised, it will not be caught, and the test case will be
        deemed to have suffered an error, exactly as for an
        unexpected exception.

        If called with callableObj omitted or None, will return a
        context object used like this::

            with self.assertNotRaises(SomeException):
                do_something()
        """
        context = _AssertNotRaisesContext(excClass, self)
        if callableObj is None:
            return context
        with context:
            callableObj(*args, **kwargs)

    def assertXMLEqual(self, xml1, xml2, msg=''):
        """Check for equality of two XML objects.

        Tags, stripped text, attributes and (recursively, in order) children
        are compared. *msg* accumulates the path of tags leading to the
        element being compared and is included in failure messages.

        :param xml1: first element
        :param xml2: second element
        :param str msg: path prefix for failure messages
        :type xml1: :py:class:`lxml.etree._Element`
        :type xml2: :py:class:`lxml.etree._Element`
        """
        msg += '/' + str(xml1.tag)
        # report the path also when it is the tags that differ
        self.assertEqual(xml1.tag, xml2.tag, msg)

        if xml1.text is not None and xml2.text is not None:
            self.assertEqual(xml1.text.strip(), xml2.text.strip(), msg)
        else:
            self.assertEqual(xml1.text, xml2.text, msg)
        self.assertCountEqual(xml1.keys(), xml2.keys(), msg)
        for key in xml1.keys():
            self.assertEqual(xml1.get(key), xml2.get(key), msg)

        self.assertEqual(len(xml1), len(xml2), msg + ' children count')
        for child1, child2 in zip(xml1, xml2):
            self.assertXMLEqual(child1, child2, msg=msg)

    def assertDevicesEqual(self, devices1, devices2, msg=None):
        """Check that two device managers hold the same devices.

        Device classes must match, and for each class the string
        representations of the devices are compared in order.
        """
        self.assertEqual(devices1.keys(), devices2.keys(), msg)
        for dev_class in devices1.keys():
            self.assertEqual(
                [str(dev) for dev in devices1[dev_class]],
                [str(dev) for dev in devices2[dev_class]],
                "Devices of class {} differs{}".format(
                    dev_class, (": " + msg) if msg else "")
            )

    def assertEventFired(self, subject, event, kwargs=None):
        """Check whether event was fired on given emitter and fail if it did
        not.

        :param subject: emitter which is being checked
        :type subject: :py:class:`TestEmitter`
        :param str event: event identifier
        :param dict kwargs: when given, all items must appear in kwargs passed \
            to an event
        """
        # sentinel which compares unequal to any expected value
        will_not_match = object()
        for ev, ev_kwargs in subject.fired_events:
            if ev != event:
                continue
            if kwargs is not None:
                ev_kwargs = dict(ev_kwargs)
                if any(ev_kwargs.get(k, will_not_match) != v
                       for k, v in kwargs.items()):
                    continue

            return

        self.fail('event {!r} {}did not fire on {!r}'.format(
            event, ('' if kwargs is None else '{!r} '.format(kwargs)), subject))

    def assertEventNotFired(self, subject, event, kwargs=None):
        """Check whether event was fired on given emitter. Fail if it did.

        :param subject: emitter which is being checked
        :type subject: :py:class:`TestEmitter`
        :param str event: event identifier
        :param dict kwargs: when given, only events whose kwargs contain all \
            these items count as a match
        """
        # sentinel which compares unequal to any expected value
        will_not_match = object()
        for ev, ev_kwargs in subject.fired_events:
            if ev != event:
                continue
            if kwargs is not None:
                ev_kwargs = dict(ev_kwargs)
                if any(ev_kwargs.get(k, will_not_match) != v
                       for k, v in kwargs.items()):
                    continue

            self.fail('event {!r} {}did fire on {!r}'.format(
                event,
                ('' if kwargs is None else '{!r} '.format(kwargs)),
                subject))

    def assertXMLIsValid(self, xml, file=None, schema=None):
        """Check whether given XML fulfills Relax NG schema.

        Schema can be given in a couple of ways:

        - As separate file. This is most common, and also the only way to
          handle file inclusion. Call with file name as second argument.

        - As string containing actual schema. Put that string in *schema*
          keyword argument.

        :param lxml.etree._Element xml: XML element instance to check
        :param str file: filename of Relax NG schema
        :param str schema: optional explicit schema string
        :raises TypeError: unless exactly one of *file* and *schema* is given
        """  # pylint: disable=redefined-builtin

        if schema is not None and file is None:
            relaxng = schema
            if isinstance(relaxng, str):
                relaxng = lxml.etree.XML(relaxng)
            # pylint: disable=protected-access
            if isinstance(relaxng, lxml.etree._Element):
                relaxng = lxml.etree.RelaxNG(relaxng)

        elif file is not None and schema is None:
            if not os.path.isabs(file):
                # relative names are searched in the git tree first (if any),
                # then in the installed documentation
                basedirs = ['/usr/share/doc/qubes/relaxng']
                if in_git:
                    basedirs.insert(0, os.path.join(in_git, 'relaxng'))
                for basedir in basedirs:
                    abspath = os.path.join(basedir, file)
                    if os.path.exists(abspath):
                        file = abspath
                        break
            relaxng = lxml.etree.RelaxNG(file=file)

        else:
            raise TypeError("There should be exactly one of 'file' and "
                            "'schema' arguments specified.")

        # We have to be extra careful here in case someone messed up with
        # self.failureException. It should by default be AssertionError, just
        # what is spewed by RelaxNG(), but who knows what might happen.
        try:
            relaxng.assert_(xml)
        except self.failureException:
            raise
        except AssertionError as e:
            self.fail(str(e))

    @staticmethod
    def make_vm_name(name, class_teardown=False):
        """Return *name* prefixed with the testing namespace.

        :param str name: base name of the VM
        :param bool class_teardown: use the prefix of class-wide VMs \
            instead of the per-test one
        """
        prefix = CLSVMPREFIX if class_teardown else VMPREFIX
        return prefix + name
- class SystemTestCase(QubesTestCase):
- """
- Mixin for integration tests. All the tests here should use self.app
- object and when need qubes.xml path - should use :py:data:`XMLPATH`
- defined in this file.
- Every VM created by test, must use :py:meth:`SystemTestCase.make_vm_name`
- for VM name.
- By default self.app represents empty collection, if anything is needed
- there from the real collection it can be imported from self.host_app in
- :py:meth:`SystemTestCase.setUp`. But *can not be modified* in any way -
- this include both changing attributes in
- :py:attr:`SystemTestCase.host_app` and modifying files of such imported
- VM. If test need to make some modification, it must clone the VM first.
- If some group of tests needs class-wide initialization, first of all the
- author should consider if it is really needed. But if so, setUpClass can
- be used to create Qubes(CLASS_XMLPATH) object and create/import required
- stuff there. VMs created in :py:meth:`TestCase.setUpClass` should
- use self.make_vm_name('...', class_teardown=True) for name creation.
- Such (group of) test need to take care about
- :py:meth:`TestCase.tearDownClass` implementation itself.
- """
- # noinspection PyAttributeOutsideInit
def setUp(self):
    """Prepare an isolated Qubes collection and API servers for one test.

    The test is skipped outside dom0. Leftover test VMs are removed, logging
    to syslog is set up (once per process), ``self.host_app`` is loaded from
    the real store and ``self.app`` from a copy placed at ``XMLPATH``.
    """
    # logging.handlers is a submodule and has to be imported explicitly
    import logging.handlers

    global ha_syslog

    if not in_dom0:
        self.skipTest('outside dom0')
    super(SystemTestCase, self).setUp()
    self.remove_test_vms()

    if ha_syslog is None:
        ha_syslog = logging.handlers.SysLogHandler('/dev/log')
        ha_syslog.setFormatter(
            logging.Formatter('%(name)s[%(process)d]: %(message)s'))
        logging.root.addHandler(ha_syslog)

    self.log.critical('starting')

    # need some information from the real qubes.xml - at least installed
    # templates; should not be used for testing, only to initialize self.app
    self.host_app = qubes.Qubes(os.path.join(
        qubes.config.qubes_base_dir,
        qubes.config.system_path['qubes_store_filename']))
    # a class-wide store, when present, takes precedence over the host one
    if os.path.exists(CLASS_XMLPATH):
        shutil.copy(CLASS_XMLPATH, XMLPATH)
    else:
        shutil.copy(self.host_app.store, XMLPATH)
    self.app = qubes.Qubes(XMLPATH)
    os.environ['QUBES_XML_PATH'] = XMLPATH
    self.app.register_event_handlers()

    self.qubesd = self.loop.run_until_complete(
        qubes.api.create_servers(
            qubes.api.admin.QubesAdminAPI,
            qubes.api.internal.QubesInternalAPI,
            qubes.api.misc.QubesMiscAPI,
            app=self.app, debug=True))
    self.addCleanup(self.cleanup_app)

    self.app.add_handler('domain-delete', self.close_qdb_on_remove)
def close_qdb_on_remove(self, app, event, vm, **kwargs):
    """Event handler: drop the QubesDB watch of a domain being deleted.

    Only the QubesDB connection is closed; the other (destructive) actions
    of ``vm.close()`` are not performed.
    """
    watch = vm._qdb_connection_watch
    if watch is None:
        return
    asyncio.get_event_loop().remove_reader(watch.watch_fd())
    watch.close()
    vm._qdb_connection_watch = None
def cleanup_app(self):
    """Tear down everything created by :py:meth:`setUp`.

    Removes test VMs, shuts down the API servers (unlinking their sockets),
    aborts open API connections, closes both Qubes() objects and drops every
    attribute of the test case whose type comes from a ``qubes*`` module.
    """
    self.remove_test_vms()

    # ``server`` is pre-set so that ``del`` below works even when the loop
    # body never runs; deleting it drops the local reference
    server = None
    for server in self.qubesd:
        for sock in server.sockets:
            os.unlink(sock.getsockname())
        server.close()
    del server

    # close all existing connections, especially this will interrupt
    # running admin.Events calls, which do keep reference to Qubes() and
    # libvirt connection
    conn = None
    for conn in qubes.api.QubesDaemonProtocol.connections:
        if conn.transport:
            conn.transport.abort()
    del conn

    # wait until every server has finished closing
    self.loop.run_until_complete(asyncio.wait([
        server.wait_closed() for server in self.qubesd]))

    del self.qubesd

    # remove all references to any complex qubes objects, to release
    # resources - most importantly file descriptors; this object will live
    # during the whole test run, but all the file descriptors would be
    # depleted earlier
    self.app.close()
    self.host_app.close()
    del self.app
    del self.host_app
    for attr in dir(self):
        obj_type = type(getattr(self, attr))
        if obj_type.__module__.startswith('qubes'):
            delattr(self, attr)

    # then trigger garbage collector to really destroy those objects
    gc.collect()
def init_default_template(self, template=None):
    """Set the default template of the test collection.

    :param template: :py:obj:`None` to take the default template of the \
        host collection, a string to look it up in the domains of the host \
        collection, or any other object to be used as is; in every case \
        ``str()`` of the result is what gets assigned
    """
    if template is None:
        chosen = self.host_app.default_template
    elif isinstance(template, str):
        chosen = self.host_app.domains[template]
    else:
        chosen = template
    self.app.default_template = str(chosen)
def init_networking(self):
    """Set the default netvm of the test collection, or skip the test.

    The default netvm of the host collection is used, except that
    ``sys-whonix`` (if the host has it) is preferred when the default
    template name starts with ``whonix-ws``. The test is skipped when there
    is no default template, no netvm, or the netvm is not running.
    """
    if not self.app.default_template:
        self.skipTest('Default template required for testing networking')

    netvm = self.host_app.default_netvm
    # if testing Whonix Workstation based VMs, try to use sys-whonix instead
    if self.app.default_template.name.startswith('whonix-ws') \
            and 'sys-whonix' in self.host_app.domains:
        netvm = self.host_app.domains['sys-whonix']

    if netvm is None:
        self.skipTest('Default netvm required')
    if not netvm.is_running():
        self.skipTest('VM {} required to be running'.format(
            netvm.name))

    self.app.default_netvm = str(netvm)
- def _find_pool(self, volume_group, thin_pool):
- """ Returns the pool matching the specified ``volume_group`` &
- ``thin_pool``, or None.
- """
- pools = [p for p in self.app.pools
- if issubclass(p.__class__, qubes.storage.lvm.ThinPool)]
- for pool in pools:
- if pool.volume_group == volume_group \
- and pool.thin_pool == thin_pool:
- return pool
- return None
def init_lvm_pool(self):
    """Make the LVM thin pool named by ``DEFAULT_LVM_POOL`` available.

    The test is skipped when the device of the thin pool does not exist.
    The pool object ends up in ``self.pool``; when it had to be added to the
    collection, ``self.created_pool`` is set to :py:obj:`True` as well.
    """
    volume_group, thin_pool = DEFAULT_LVM_POOL.split('/', 1)
    device = "/dev/mapper/{!s}-{!s}".format(volume_group, thin_pool)
    if not os.path.exists(device):
        self.skipTest('LVM thin pool {!r} does not exist'.
                      format(DEFAULT_LVM_POOL))

    self.pool = self._find_pool(volume_group, thin_pool)
    if self.pool:
        # already known to the collection, nothing to create
        return

    self.pool = self.loop.run_until_complete(
        self.app.add_pool(**POOL_CONF))
    self.created_pool = True
def _remove_vm_qubes(self, vm):
    """Remove *vm* from its collection, from disk and from libvirt.

    Every step is best-effort: a failure of one step must not prevent the
    following ones from being attempted.

    :param vm: domain object to remove
    """
    vmname = vm.name
    app = vm.app

    try:
        del app.domains[vm.qid]
    except KeyError:
        pass

    try:
        self.loop.run_until_complete(vm.remove_from_disk())
    except:  # pylint: disable=bare-except
        pass

    vm.close()
    del vm

    app.save()
    del app

    # Now ensure it really went away. This may not have happened,
    # for example if vm.libvirt_domain malfunctioned.
    try:
        conn = libvirt.open(qubes.config.defaults['libvirt_uri'])
    except:  # pylint: disable=bare-except
        pass
    else:
        # close the connection even if removing the domain fails, so the
        # connection does not leak
        try:
            try:
                dom = conn.lookupByName(vmname)
            except:  # pylint: disable=bare-except
                pass
            else:
                self._remove_vm_libvirt(dom)
        finally:
            conn.close()

    self._remove_vm_disk(vmname)
- @staticmethod
- def _remove_vm_libvirt(dom):
- try:
- dom.destroy()
- except libvirt.libvirtError: # not running
- pass
- dom.undefine()
@staticmethod
def _remove_vm_disk(vmname):
    """Delete what is left of *vmname* in the appvms and templates dirs.

    A directory is removed recursively; anything else is unlinked.
    """
    for dirspec in ('qubes_appvms_dir', 'qubes_templates_dir'):
        leftover = os.path.join(qubes.config.qubes_base_dir,
                                qubes.config.system_path[dirspec], vmname)
        if not os.path.exists(leftover):
            continue
        if os.path.isdir(leftover):
            shutil.rmtree(leftover)
        else:
            os.unlink(leftover)
@staticmethod
def _remove_vm_disk_lvm(prefix=VMPREFIX):
    """ Remove LVM volumes with given prefix

    This is "a bit" drastic, as it removes volumes regardless of volume
    group, thin pool etc. But we assume no important data on test system.

    Failures of the LVM commands (non-zero exit status) are ignored.
    """
    marker = '/vm-' + prefix
    try:
        listing = subprocess.check_output(
            ['lvs', '--noheadings', '-o', 'vg_name,name',
             '--separator', '/']).decode()
        if marker not in listing:
            # nothing of ours there
            return
        doomed = [line.strip() for line in listing.splitlines()
                  if marker in line]
        subprocess.check_call(['sudo', 'lvremove', '-f'] + doomed,
                              stdout=subprocess.DEVNULL)
    except subprocess.CalledProcessError:
        pass
def remove_vms(self, vms):
    """Remove given domains, killing them first if they are running.

    All domains are locked against startup, killed, detached from each
    other (netvm, default dispvm) and then removed in an order that removes
    template-based domains before their template.

    :param vms: iterable of domain objects; all of them are expected to \
        belong to the same collection, and every domain based on one of \
        them has to be in the set too
    """
    vms = list(vms)
    if not vms:
        return

    # workaround for https://phabricator.whonix.org/T930
    # unregister all the VMs from sys-whonix, otherwise it will start them
    # again (possibly in further test)
    # The default netvm may be unset (this very method clears it when the
    # netvm itself gets removed), so guard against None here.
    default_netvm = getattr(
        getattr(self, 'app', None), 'default_netvm', None)
    if default_netvm is not None and 'whonix' in default_netvm.name:
        for vm in vms:
            try:
                self.loop.run_until_complete(
                    default_netvm.run_service_for_stdio(
                        'whonix.NewStatus+{}_shutdown'.format(vm.name)))
            except:  # pylint: disable=bare-except
                pass
    del default_netvm

    locked_vms = set()
    # first take startup lock
    for vm in vms:
        self.loop.run_until_complete(vm.startup_lock.acquire())
        locked_vms.add(vm)

    # first kill all the domains, to avoid side effects of changing netvm
    for vm in vms:
        try:
            # XXX .is_running() may throw libvirtError if undefined
            if vm.is_running():
                self.loop.run_until_complete(vm._kill_locked())
        except:  # pylint: disable=bare-except
            pass

    # break dependencies
    for vm in vms:
        vm.default_dispvm = None
        vm.netvm = None

    # take app instance from any VM to be removed
    app = vms[0].app
    if app.default_dispvm in vms:
        app.default_dispvm = None
    if app.default_netvm in vms:
        app.default_netvm = None
    del app

    # then remove in reverse topological order (wrt template), using naive
    # algorithm
    # this heavily depends on lack of template loops, but those are
    # impossible
    while vms:
        vm = vms.pop(0)
        # make sure that all connected VMs are going to be removed,
        # otherwise this will loop forever
        child_vms = list(getattr(vm, 'appvms', []))
        assert all(x in vms for x in child_vms)
        if child_vms:
            # if still something use this VM, put it at the end of queue
            # and try next one
            vms.append(vm)
            continue
        self._remove_vm_qubes(vm)

    # release startup_lock, if anything was waiting at vm.start(),
    # it will detect the VM is gone
    for vm in locked_vms:
        vm.startup_lock.release()
def remove_test_vms(self, xmlpath=XMLPATH, prefix=VMPREFIX):
    """Aggressively remove any domain that has name in testing namespace.

    Cleanup happens in stages: qubes known to the Qubes collection stored
    at *xmlpath* (which is deleted afterwards), then libvirt domains, then
    directories left on disk and finally LVM volumes.

    :param xmlpath: path of the test qubes.xml; skipped if it does not exist
    :param prefix: name prefix of VMs to remove, can be a list of prefixes
    """
    # normalise to a tuple: it can be iterated many times and is what
    # str.startswith() accepts for "any of these prefixes"
    if isinstance(prefix, str):
        prefixes = (prefix,)
    else:
        prefixes = tuple(prefix)
    del prefix

    # first, remove them Qubes-way
    if os.path.exists(xmlpath):
        # only close the collections opened here, never the ones owned by
        # the test case itself
        own_app = not hasattr(self, 'app')
        own_host_app = not hasattr(self, 'host_app')
        app = host_app = None
        try:
            app = qubes.Qubes(xmlpath) if own_app else self.app
            host_app = qubes.Qubes() if own_host_app else self.host_app
            self.remove_vms([
                vm for vm in app.domains
                if vm.name.startswith(prefixes) or (
                    isinstance(vm, qubes.vm.dispvm.DispVM)
                    and vm.name not in host_app.domains)])
        except qubes.exc.QubesException:
            pass
        finally:
            # close also when remove_vms() failed, to not leak open
            # collections between tests
            if own_host_app and host_app is not None:
                host_app.close()
            if own_app and app is not None:
                app.close()
            del host_app
            del app
        os.unlink(xmlpath)

    # now remove what was only in libvirt
    conn = libvirt.open(qubes.config.defaults['libvirt_uri'])
    try:
        for dom in conn.listAllDomains():
            if dom.name().startswith(prefixes):
                self._remove_vm_libvirt(dom)
    finally:
        conn.close()

    # finally remove anything that is left on disk
    vmnames = set()
    for dirspec in (
            'qubes_appvms_dir',
            'qubes_templates_dir'):
        dirpath = os.path.join(qubes.config.qubes_base_dir,
            qubes.config.system_path[dirspec])
        if not os.path.exists(dirpath):
            continue
        vmnames.update(
            name for name in os.listdir(dirpath)
            if name.startswith(prefixes))
    for vmname in vmnames:
        self._remove_vm_disk(vmname)
    for single_prefix in prefixes:
        self._remove_vm_disk_lvm(single_prefix)
def qrexec_policy(self, service, source, destination, allow=True,
        action=None):
    """
    Allow qrexec calls for duration of the test

    The rule itself is handled by the returned ``_QrexecPolicyContext``
    object; all arguments are passed to it unchanged.

    :param service: service name
    :param source: source VM name
    :param destination: destination VM name
    :param allow: add rule with 'allow' action, otherwise 'deny'
    :param action: custom action, if specified *allow* argument is ignored
    :return: ``_QrexecPolicyContext`` instance for the requested rule
    """
    policy_context = _QrexecPolicyContext(
        service, source, destination, allow=allow, action=action)
    return policy_context
@asyncio.coroutine
def wait_for_window_hide_coro(self, title, winid, timeout=30):
    """
    Wait for window to disappear

    The window is polled with ``xdotool getwindowname`` every 0.1 s until
    that command stops succeeding; the test fails once more than
    ``timeout * 10`` polls found the window still there.

    :param title: window title, used only in the failure message
    :param winid: window id
    :param timeout: timeout of the operation, in seconds
    :return:
    """
    probe = ['xdotool', 'getwindowname', str(winid)]
    polls_allowed = timeout * 10
    polls_done = 0
    while True:
        status = subprocess.call(
            probe, stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT)
        if status != 0:
            # xdotool cannot find the window any more - it is gone
            return
        polls_done += 1
        if polls_done > polls_allowed:
            self.fail("Timeout while waiting for {}({}) window to "
                      "disappear".format(title, winid))
        yield from asyncio.sleep(0.1)
@asyncio.coroutine
def wait_for_window_coro(self, title, search_class=False, timeout=30,
        show=True):
    """
    Wait for a window with a given title. Depending on show parameter,
    it will wait for either window to show or to disappear.

    :param title: title of the window to wait for
    :param timeout: timeout of the operation, in seconds
    :param show: if True - wait for the window to be visible,
        otherwise - to not be visible
    :param search_class: search based on window class instead of title
    :return: window id of found window, if show=True
    """
    xdotool_search = ['xdotool', 'search', '--onlyvisible']
    if search_class:
        xdotool_search.append('--class')
    else:
        xdotool_search.append('--name')
    if show:
        xdotool_search.append('--sync')

    if not show:
        try:
            found = subprocess.check_output(xdotool_search + [title],
                stderr=subprocess.DEVNULL).decode()
        except subprocess.CalledProcessError:
            # already gone
            return
        # xdotool prints one window id per line; wait for each of them.
        # Argument order matters here: the helper takes (title, winid).
        for winid in found.split():
            yield from self.wait_for_window_hide_coro(title, winid,
                timeout=timeout)
        return

    winid = None
    while not winid:
        p = yield from asyncio.create_subprocess_exec(
            *xdotool_search, title,
            stderr=subprocess.DEVNULL, stdout=subprocess.PIPE)
        try:
            (winid, _) = yield from asyncio.wait_for(
                p.communicate(), timeout)
            # don't check exit code, getting winid on stdout is enough
            # indicator of success; specifically ignore xdotool failing
            # with BadWindow or such - when some window appears only for a
            # moment by xdotool didn't manage to get its properties
        except asyncio.TimeoutError:
            # 'xdotool search --sync' would otherwise keep waiting in the
            # background after the test has already failed
            try:
                p.kill()
            except ProcessLookupError:
                pass
            yield from p.wait()
            self.fail(
                "Timeout while waiting for {} window to show".format(title))
        # whitespace-only output means nothing was found yet
        winid = winid.strip()
    return winid.decode()
def wait_for_window(self, *args, **kwargs):
    """
    Blocking variant of :py:meth:`wait_for_window_coro`: all arguments are
    passed to it and the resulting coroutine is run on ``self.loop`` until
    it completes.

    Wait for a window with a given title. Depending on show parameter,
    it will wait for either window to show or to disappear.

    :param title: title of the window to wait for
    :param timeout: timeout of the operation, in seconds
    :param show: if True - wait for the window to be visible,
        otherwise - to not be visible
    :param search_class: search based on window class instead of title
    :return: window id of found window, if show=True
    """
    pending = self.wait_for_window_coro(*args, **kwargs)
    return self.loop.run_until_complete(pending)
def enter_keys_in_window(self, title, keys):
    """
    Search for window with given title, then enter listed keys there.
    The function will wait for said window to appear.

    :param title: title of window
    :param keys: list of keys to enter, as for `xdotool key`
    :return: None
    """
    # 'xdotool search --sync' sometimes crashes on some race when
    # accessing window properties
    # so wait for the window separately and search without --sync below
    self.wait_for_window(title)
    activate_and_type = [
        'xdotool', 'search', '--name', title,
        'windowactivate', '--sync',
        'key',
    ]
    subprocess.check_call(activate_and_type + keys)
def shutdown_and_wait(self, vm, timeout=60):
    """Shut *vm* down and block until the shutdown has finished.

    ``vm.shutdown(wait=True, timeout=timeout)`` is run on ``self.loop``; a
    ``QubesException`` raised by it is reported as a test failure.

    :param vm: qube to shut down
    :param timeout: value passed to ``vm.shutdown()`` as ``timeout``
    """
    try:
        pending_shutdown = vm.shutdown(wait=True, timeout=timeout)
        self.loop.run_until_complete(pending_shutdown)
    except qubes.exc.QubesException:
        vm_name = vm.name
        # drop the local reference before failing the test
        del vm
        self.fail(
            "Timeout while waiting for VM {} shutdown".format(vm_name))
def prepare_hvm_system_linux(self, vm, init_script, extra_files=None):
    """Install a minimal bootable Linux system on the root image of *vm*.

    The image gets a single partition with an ext2 filesystem, GRUB2, a copy
    of the currently running kernel and a dracut-built initrd which includes
    *init_script* as a ``pre-pivot`` hook. The test is skipped when grub2 or
    dracut is not installed.

    :param vm: qube whose ``storage.root_img`` is going to be overwritten
    :param init_script: content of the script included in the initrd
    :param extra_files: optional list of files for dracut to ``--install``
    """
    if not os.path.exists('/usr/lib/grub/i386-pc'):
        self.skipTest('grub2 not installed')
    if not shutil.which('grub2-install'):
        self.skipTest('grub2-tools not installed')
    if not shutil.which('dracut'):
        self.skipTest('dracut not installed')

    # create a single partition
    p = subprocess.Popen(['sfdisk', '-q', '-L', vm.storage.root_img],
        stdin=subprocess.PIPE,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.STDOUT)
    # the pipe is opened in binary mode, so it has to be fed with bytes
    p.communicate(b'2048,\n')
    assert p.returncode == 0, 'sfdisk failed'

    # TODO: check if root_img is really file, not already block device
    p = subprocess.Popen(['sudo', 'losetup', '-f', '-P', '--show',
        vm.storage.root_img], stdout=subprocess.PIPE)
    (loopdev, _) = p.communicate()
    assert p.returncode == 0, 'losetup failed'
    # decode before use: mixing bytes with str below would either raise
    # TypeError or leak "b'...'" into paths
    loopdev = loopdev.decode().strip()
    looppart = loopdev + 'p1'

    try:
        subprocess.check_call(['sudo', 'mkfs.ext2', '-q', '-F', looppart])
        mountpoint = tempfile.mkdtemp()
        mounted = False
        try:
            subprocess.check_call(['sudo', 'mount', looppart, mountpoint])
            mounted = True
            subprocess.check_call(['sudo', 'grub2-install',
                '--target', 'i386-pc',
                '--modules', 'part_msdos ext2',
                '--boot-directory', mountpoint, loopdev],
                stderr=subprocess.DEVNULL
            )
            grub_cfg = '{}/grub2/grub.cfg'.format(mountpoint)
            # files created by sudo above belong to root; take them over
            # to be able to write there without sudo
            subprocess.check_call(
                ['sudo', 'chown', '-R', os.getlogin(), mountpoint])
            with open(grub_cfg, 'w') as f:
                f.write(
                    "set timeout=1\n"
                    "menuentry 'Default' {\n"
                    "  linux /vmlinuz root=/dev/xvda1 "
                    "rd.driver.blacklist=bochs_drm "
                    "rd.driver.blacklist=uhci_hcd console=hvc0\n"
                    "  initrd /initrd\n"
                    "}"
                )
            kernel_version = subprocess.check_output(
                ['uname', '-r']).decode().strip()
            kernel = '/boot/vmlinuz-{}'.format(kernel_version)
            shutil.copy(kernel, os.path.join(mountpoint, 'vmlinuz'))
            init_path = os.path.join(mountpoint, 'init')
            with open(init_path, 'w') as f:
                f.write(init_script)
            os.chmod(init_path, 0o755)
            dracut_args = [
                '--kver', kernel_version,
                '--include', init_path,
                '/usr/lib/dracut/hooks/pre-pivot/initscript.sh',
                '--no-hostonly', '--nolvmconf', '--nomdadmconf',
            ]
            if extra_files:
                dracut_args += ['--install', ' '.join(extra_files)]
            subprocess.check_call(
                ['dracut'] + dracut_args + [os.path.join(mountpoint,
                    'initrd')],
                stderr=subprocess.DEVNULL
            )
        finally:
            # a failed umount raises before rmtree, so files of a still
            # mounted filesystem are never deleted from here
            if mounted:
                subprocess.check_call(['sudo', 'umount', mountpoint])
            shutil.rmtree(mountpoint)
    finally:
        # detach the loop device no matter which step above failed
        subprocess.check_call(['sudo', 'losetup', '-d', loopdev])
def create_bootable_iso(self):
    """Create simple bootable ISO image.
    Type 'poweroff' to it to terminate that VM.

    The image is written to a temporary file which is removed on test
    cleanup, also when the test gets skipped because syslinux or
    genisoimage is missing.

    :return: path of the created ISO image
    """
    isolinux_cfg = (
        'prompt 1\n'
        'label poweroff\n'
        '   kernel poweroff.c32\n'
    )
    output_fd, output_path = tempfile.mkstemp('.iso')
    # genisoimage opens the output file by path, so the descriptor is not
    # needed; close it and register the cleanup right away, before anything
    # below can skip or fail the test and leak them
    os.close(output_fd)
    self.addCleanup(os.unlink, output_path)
    with tempfile.TemporaryDirectory() as tmp_dir:
        try:
            shutil.copy('/usr/share/syslinux/isolinux.bin', tmp_dir)
            shutil.copy('/usr/share/syslinux/ldlinux.c32', tmp_dir)
            shutil.copy('/usr/share/syslinux/poweroff.c32', tmp_dir)
            with open(os.path.join(tmp_dir, 'isolinux.cfg'), 'w') as cfg:
                cfg.write(isolinux_cfg)
            subprocess.check_call(['genisoimage', '-o', output_path,
                                   '-c', 'boot.cat',
                                   '-b', 'isolinux.bin',
                                   '-no-emul-boot',
                                   '-boot-load-size', '4',
                                   '-boot-info-table',
                                   '-q',
                                   tmp_dir])
        except FileNotFoundError:
            self.skipTest('syslinux or genisoimage not installed')
    return output_path
def create_local_file(self, filename, content, mode='w'):
    """Write *content* to local file *filename*, removed on test cleanup.

    :param filename: path of the file to create
    :param content: data to write
    :param mode: mode passed to :py:func:`open`
    """
    with open(filename, mode) as local_file:
        local_file.write(content)
    # registered only after the file has been written successfully
    self.addCleanup(os.unlink, filename)
def create_remote_file(self, vm, filename, content, mode=0o755):
    """Create file *filename* inside *vm* with the given content and mode.

    *content* is UTF-8 encoded and sent on standard input of a shell
    command, run in the qube as root, which stores it under the
    shell-quoted *filename* and then applies *mode* with ``chmod``.

    :param vm: qube to create the file in
    :param filename: path of the file inside the qube
    :param content: text to put into the file
    :param mode: permissions of the file, rendered in octal for ``chmod``
    """
    quoted_path = shlex.quote(filename)
    script = 'cat > {0}; chmod {1:o} {0}'.format(quoted_path, mode)
    payload = content.encode('utf-8')
    self.loop.run_until_complete(
        vm.run_for_stdio(script, user='root', input=payload))
@asyncio.coroutine
def wait_for_session(self, vm):
    """Wait until the ``qubes.WaitForSession`` service call in *vm* returns.

    The call is given ``vm.default_user`` on its standard input and is
    limited to ``vm.qrexec_timeout`` seconds, or to 120 seconds for qubes
    whose template name contains ``whonix-ws``.

    :param vm: qube to wait for
    """
    timeout = vm.qrexec_timeout
    template = getattr(vm, 'template', None)
    if template and 'whonix-ws' in template.name:
        # first boot of whonix-ws takes more time because of /home
        # initialization, including Tor Browser copying
        timeout = 120
    session_ready = vm.run_service_for_stdio(
        'qubes.WaitForSession', input=vm.default_user.encode())
    yield from asyncio.wait_for(session_ready, timeout=timeout)
@asyncio.coroutine
def start_vm(self, vm):
    """Start a VM and wait for it to be fully up

    "Fully up" means that :py:meth:`wait_for_session` has returned for it.

    :param vm: qube to start
    """
    # start the qube first, only then wait for the session inside it
    yield from vm.start()
    yield from self.wait_for_session(vm)
# cache for list_templates(); None means "not determined yet"
_templates = None


def list_templates():
    """Returns tuple of template names available in the system.

    The names are taken from whitespace-separated ``QUBES_TEST_TEMPLATES``
    environment variable if it is set; otherwise they are names of all the
    templates in the system, excluding those having ``os`` feature set to
    ``Windows``. When loading them fails with :py:class:`OSError`, the
    result is an empty tuple.

    The result is computed once and cached for subsequent calls.
    """
    global _templates
    if _templates is not None:
        return _templates

    if 'QUBES_TEST_TEMPLATES' in os.environ:
        # convert to tuple, to return the same (immutable) type on both
        # paths, as promised above
        _templates = tuple(os.environ['QUBES_TEST_TEMPLATES'].split())
        return _templates

    try:
        app = qubes.Qubes()
        try:
            _templates = tuple(vm.name for vm in app.domains
                if isinstance(vm, qubes.vm.templatevm.TemplateVM) and
                vm.features.get('os', None) != 'Windows')
        finally:
            # close the collection also when enumerating qubes failed
            app.close()
            del app
    except OSError:
        _templates = ()
    return _templates
def create_testcases_for_templates(name, *bases, module, **kwds):
    """Do-it-all helper for generating per-template tests via load_tests proto

    This does several things:
        - creates per-template classes
        - adds them to module's :py:func:`globals`
        - returns an iterable suitable for passing to loader.loadTestsFromNames

    TestCase classes created by this function have implicit `.template`
    attribute, which contains name of the respective template. They are also
    named with given prefix, underscore and template name. If template name
    contains characters not valid as part of Python identifier, they are
    impossible to get via standard ``.`` operator, though :py:func:`getattr` is
    still usable.

    A template for which the module already has an attribute of that name is
    skipped: no class is created and no name is yielded for it.

    >>> class MyTestsMixIn:
    ...     def test_000_my_test(self):
    ...         assert self.template.startswith('debian')
    >>> def load_tests(loader, tests, pattern):
    ...     tests.addTests(loader.loadTestsFromNames(
    ...         qubes.tests.create_testcases_for_templates(
    ...             'TC_00_MyTests', MyTestsMixIn, qubes.tests.SystemTestCase,
    ...             module=sys.modules[__name__])))

    *NOTE* adding ``module=sys.modules[__name__]`` is *mandatory*, and to allow
    enforcing this, it uses keyword-only argument syntax, which is only in
    Python 3.
    """
    # Do not attempt to grab the module from traceback, since we are actually
    # a generator and loadTestsFromNames may also be a generator, so it's not
    # possible to correctly guess frame from stack. Explicit is better than
    # implicit!

    module_name = module.__name__
    for template in list_templates():
        clsname = '{}_{}'.format(name, template)
        if hasattr(module, clsname):
            continue
        # extra keyword arguments are applied last, so they take precedence
        # over the implicit 'template' attribute
        namespace = {'template': template}
        namespace.update(kwds)
        cls = type(clsname, bases, namespace)
        cls.__module__ = module_name
        # XXX I wonder what other __dunder__ attrs did I miss
        setattr(module, clsname, cls)
        yield '.'.join((module_name, clsname))
def maybe_create_testcases_on_import(create_testcases_gen):
    """If certain conditions are met, call *create_testcases_gen* to create
    testcases for templates tests. The purpose is to use it on integration
    tests module(s) import, so the test runner could discover tests without
    using load tests protocol.

    The conditions - any of:
     - QUBES_TEST_TEMPLATES present in the environment (it's possible to
       create test cases without opening qubes.xml)
     - QUBES_TEST_LOAD_ALL present in the environment

    :param create_testcases_gen: callable returning an iterable; it is \
        called and its result consumed completely, the items are discarded
    """
    trigger_variables = ('QUBES_TEST_TEMPLATES', 'QUBES_TEST_LOAD_ALL')
    if not any(variable in os.environ for variable in trigger_variables):
        return
    # the generator does its work while being iterated, so just drain it
    for _ in create_testcases_gen():
        pass
def extra_info(obj):
    """Return short info identifying object.

    For example, if obj is a qube, return its name. This is for use with
    :py:mod:`objgraph` package.

    :return: name of a qube, id of a test case, or an empty string for \
        anything else (including a qube having no ``name`` attribute)
    """
    # Feel free to extend to other cases.

    if isinstance(obj, qubes.vm.qubesvm.QubesVM):
        try:
            qube_name = obj.name
        except AttributeError:
            # fall through to the remaining checks
            pass
        else:
            return qube_name

    return obj.id() if isinstance(obj, unittest.TestCase) else ''
def load_tests(loader, tests, pattern): # pylint: disable=unused-argument
    """Build the test suite of this package (load_tests protocol).

    Tests passed in *tests* are discarded. The suite consists of the unit
    test modules listed below and whatever is discovered in the ``tools``
    directory next to this file; when ``in_dom0`` is true, qrexec tests,
    integration tests and external modules are added too.

    :param loader: test loader used for loading the modules
    :param tests: ignored
    :param pattern: ignored
    :return: :py:class:`unittest.TestSuite` with the loaded tests
    """
    unit_modules = (
        # unit tests
        'qubes.tests.events',
        'qubes.tests.devices',
        'qubes.tests.devices_block',
        'qubes.tests.firewall',
        'qubes.tests.init',
        'qubes.tests.vm.init',
        'qubes.tests.storage',
        'qubes.tests.storage_file',
        'qubes.tests.storage_reflink',
        'qubes.tests.storage_lvm',
        'qubes.tests.storage_callback',
        'qubes.tests.storage_kernels',
        'qubes.tests.ext',
        'qubes.tests.vm.qubesvm',
        'qubes.tests.vm.mix.net',
        'qubes.tests.vm.adminvm',
        'qubes.tests.vm.appvm',
        'qubes.tests.vm.dispvm',
        'qubes.tests.app',
        'qubes.tests.tarwriter',
        'qubes.tests.api',
        'qubes.tests.api_admin',
        'qubes.tests.api_misc',
        'qubes.tests.api_internal',
        'qubes.tests.rpc_import',
    )
    dom0_modules = (
        'qrexec.tests',
        'qrexec.tests.cli',
        'qrexec.tests.gtkhelpers',
        'qrexec.tests.rpcconfirmation',
        # integration tests
        'qubes.tests.integ.basic',
        'qubes.tests.integ.storage',
        'qubes.tests.integ.grub',
        'qubes.tests.integ.devices_block',
        'qubes.tests.integ.devices_pci',
        'qubes.tests.integ.qrexec',
        'qubes.tests.integ.dom0_update',
        'qubes.tests.integ.vm_update',
        'qubes.tests.integ.network',
        'qubes.tests.integ.network_ipv6',
        'qubes.tests.integ.dispvm',
        'qubes.tests.integ.vm_qrexec_gui',
        'qubes.tests.integ.mime',
        'qubes.tests.integ.salt',
        'qubes.tests.integ.backup',
        'qubes.tests.integ.backupcompatibility',
        'qubes.tests.integ.backupdispvm',
        # external modules
        'qubes.tests.extra',
    )

    # discard any tests from this module, because it hosts base classes
    suite = unittest.TestSuite()

    for modname in unit_modules:
        suite.addTests(loader.loadTestsFromName(modname))

    tools_dir = os.path.join(os.path.dirname(__file__), 'tools')
    suite.addTests(loader.discover(tools_dir))

    if in_dom0:
        for modname in dom0_modules:
            suite.addTests(loader.loadTestsFromName(modname))

    return suite
|