|
@@ -71,13 +71,11 @@ TEMPLATE = 'fedora-23'
|
|
|
VMPREFIX = 'test-inst-'
|
|
|
CLSVMPREFIX = 'test-cls-'
|
|
|
|
|
|
-
|
|
|
if 'DEFAULT_LVM_POOL' in os.environ.keys():
|
|
|
DEFAULT_LVM_POOL = os.environ['DEFAULT_LVM_POOL']
|
|
|
else:
|
|
|
DEFAULT_LVM_POOL = 'qubes_dom0/pool00'
|
|
|
|
|
|
-
|
|
|
POOL_CONF = {'name': 'test-lvm',
|
|
|
'driver': 'lvm_thin',
|
|
|
'volume_group': DEFAULT_LVM_POOL.split('/')[0],
|
|
@@ -92,6 +90,7 @@ in_git = False
|
|
|
|
|
|
try:
|
|
|
import libvirt
|
|
|
+
|
|
|
libvirt.openReadOnly(qubes.config.defaults['libvirt_uri']).close()
|
|
|
in_dom0 = True
|
|
|
except libvirt.libvirtError:
|
|
@@ -99,9 +98,8 @@ except libvirt.libvirtError:
|
|
|
|
|
|
if in_dom0:
|
|
|
import libvirtaio
|
|
|
- libvirt_event_impl = libvirtaio.virEventRegisterAsyncIOImpl()
|
|
|
-else:
|
|
|
- libvirt_event_impl = None
|
|
|
+
|
|
|
+libvirt_event_impl = None
|
|
|
|
|
|
try:
|
|
|
in_git = subprocess.check_output(
|
|
@@ -118,37 +116,40 @@ except OSError:
|
|
|
|
|
|
ha_syslog = None
|
|
|
|
|
|
+
|
|
|
def skipUnlessDom0(test_item):
|
|
|
- '''Decorator that skips test outside dom0.
|
|
|
+ """Decorator that skips test outside dom0.
|
|
|
|
|
|
Some tests (especially integration tests) have to be run in more or less
|
|
|
working dom0. This is checked by connecting to libvirt.
|
|
|
- '''
|
|
|
+ """
|
|
|
|
|
|
return unittest.skipUnless(in_dom0, 'outside dom0')(test_item)
|
|
|
|
|
|
+
|
|
|
def skipUnlessGit(test_item):
|
|
|
- '''Decorator that skips test outside git repo.
|
|
|
+ """Decorator that skips test outside git repo.
|
|
|
|
|
|
There are very few tests that an be run only in git. One example is
|
|
|
correctness of example code that won't get included in RPM.
|
|
|
- '''
|
|
|
+ """
|
|
|
|
|
|
return unittest.skipUnless(in_git, 'outside git tree')(test_item)
|
|
|
|
|
|
+
|
|
|
def skipUnlessEnv(varname):
|
|
|
- '''Decorator generator for skipping tests without environment variable set.
|
|
|
+ """Decorator generator for skipping tests without environment variable set.
|
|
|
|
|
|
Some tests require working X11 display, like those using GTK library, which
|
|
|
segfaults without connection to X.
|
|
|
Other require their own, custom variables.
|
|
|
- '''
|
|
|
+ """
|
|
|
|
|
|
return unittest.skipUnless(os.getenv(varname), 'no {} set'.format(varname))
|
|
|
|
|
|
|
|
|
class TestEmitter(qubes.events.Emitter):
|
|
|
- '''Dummy event emitter which records events fired on it.
|
|
|
+ """Dummy event emitter which records events fired on it.
|
|
|
|
|
|
Events are counted in :py:attr:`fired_events` attribute, which is
|
|
|
:py:class:`collections.Counter` instance. For each event, ``(event, args,
|
|
@@ -162,7 +163,7 @@ class TestEmitter(qubes.events.Emitter):
|
|
|
>>> emitter.fire_event('event', spam='eggs', foo='bar')
|
|
|
>>> emitter.fired_events
|
|
|
Counter({('event', (1, 2, 3), (('foo', 'bar'), ('spam', 'eggs'))): 1})
|
|
|
- '''
|
|
|
+ """
|
|
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
|
super(TestEmitter, self).__init__(*args, **kwargs)
|
|
@@ -174,9 +175,9 @@ class TestEmitter(qubes.events.Emitter):
|
|
|
effects = super(TestEmitter, self).fire_event(event, **kwargs)
|
|
|
ev_kwargs = frozenset(
|
|
|
(key,
|
|
|
- frozenset(value.items()) if isinstance(value, dict)
|
|
|
- else tuple(value) if isinstance(value, list)
|
|
|
- else value)
|
|
|
+ frozenset(value.items()) if isinstance(value, dict)
|
|
|
+ else tuple(value) if isinstance(value, list)
|
|
|
+ else value)
|
|
|
for key, value in kwargs.items()
|
|
|
)
|
|
|
self.fired_events[(event, ev_kwargs)] += 1
|
|
@@ -188,7 +189,7 @@ class TestEmitter(qubes.events.Emitter):
|
|
|
event, pre_event=pre_event, **kwargs)
|
|
|
ev_kwargs = frozenset(
|
|
|
(key,
|
|
|
- frozenset(value.items()) if isinstance(value, dict) else value)
|
|
|
+ frozenset(value.items()) if isinstance(value, dict) else value)
|
|
|
for key, value in kwargs.items()
|
|
|
)
|
|
|
self.fired_events[(event, ev_kwargs)] += 1
|
|
@@ -202,6 +203,7 @@ def expectedFailureIfTemplate(templates):
|
|
|
handle both 'whonix-ws' and 'whonix-gw'.
|
|
|
templates can be either a single string, or an iterable
|
|
|
"""
|
|
|
+
|
|
|
def decorator(func):
|
|
|
@functools.wraps(func)
|
|
|
def wrapper(self, *args, **kwargs):
|
|
@@ -219,7 +221,9 @@ def expectedFailureIfTemplate(templates):
|
|
|
else:
|
|
|
# Call directly:
|
|
|
func(self, *args, **kwargs)
|
|
|
+
|
|
|
return wrapper
|
|
|
+
|
|
|
return decorator
|
|
|
|
|
|
|
|
@@ -243,10 +247,12 @@ def wait_on_fail(func):
|
|
|
sys.stdout.flush()
|
|
|
reader = asyncio.StreamReader(loop=self.loop)
|
|
|
transport, protocol = self.loop.run_until_complete(
|
|
|
- self.loop.connect_read_pipe(lambda: asyncio.StreamReaderProtocol(reader),
|
|
|
- sys.stdin))
|
|
|
+ self.loop.connect_read_pipe(
|
|
|
+ lambda: asyncio.StreamReaderProtocol(reader),
|
|
|
+ sys.stdin))
|
|
|
self.loop.run_until_complete(reader.readline())
|
|
|
raise
|
|
|
+
|
|
|
return wrapper
|
|
|
|
|
|
|
|
@@ -254,7 +260,7 @@ class _AssertNotRaisesContext(object):
|
|
|
"""A context manager used to implement TestCase.assertNotRaises methods.
|
|
|
|
|
|
Stolen from unittest and hacked. Regexp support stripped.
|
|
|
- """ # pylint: disable=too-few-public-methods
|
|
|
+ """ # pylint: disable=too-few-public-methods
|
|
|
|
|
|
def __init__(self, expected, test_case, expected_regexp=None):
|
|
|
if expected_regexp is not None:
|
|
@@ -265,11 +271,9 @@ class _AssertNotRaisesContext(object):
|
|
|
|
|
|
self.failureException = test_case.failureException
|
|
|
|
|
|
-
|
|
|
def __enter__(self):
|
|
|
return self
|
|
|
|
|
|
-
|
|
|
def __exit__(self, exc_type, exc_value, tb):
|
|
|
if exc_type is None:
|
|
|
return True
|
|
@@ -284,8 +288,9 @@ class _AssertNotRaisesContext(object):
|
|
|
|
|
|
self.exception = exc_value # store for later retrieval
|
|
|
|
|
|
+
|
|
|
class _QrexecPolicyContext(object):
|
|
|
- '''Context manager for SystemTestCase.qrexec_policy'''
|
|
|
+ """Context manager for SystemTestCase.qrexec_policy"""
|
|
|
|
|
|
def __init__(self, service, source, destination, allow=True, action=None):
|
|
|
try:
|
|
@@ -345,8 +350,9 @@ class _QrexecPolicyContext(object):
|
|
|
self.close()
|
|
|
self._filename.unlink()
|
|
|
|
|
|
+
|
|
|
class substitute_entry_points(object):
|
|
|
- '''Monkey-patch pkg_resources to substitute one group in iter_entry_points
|
|
|
+ """Monkey-patch pkg_resources to substitute one group in iter_entry_points
|
|
|
|
|
|
This is for testing plugins, like device classes.
|
|
|
|
|
@@ -358,7 +364,7 @@ class substitute_entry_points(object):
|
|
|
|
|
|
This context manager is stackable. To substitute more than one entry point
|
|
|
group, just nest two contexts.
|
|
|
- ''' # pylint: disable=invalid-name
|
|
|
+ """ # pylint: disable=invalid-name
|
|
|
|
|
|
def __init__(self, group, tempgroup):
|
|
|
self.group = group
|
|
@@ -381,8 +387,8 @@ class substitute_entry_points(object):
|
|
|
|
|
|
|
|
|
class QubesTestCase(unittest.TestCase):
|
|
|
- '''Base class for Qubes unit tests.
|
|
|
- '''
|
|
|
+ """Base class for Qubes unit tests.
|
|
|
+ """
|
|
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
|
super(QubesTestCase, self).__init__(*args, **kwargs)
|
|
@@ -392,10 +398,14 @@ class QubesTestCase(unittest.TestCase):
|
|
|
self.__class__.__name__,
|
|
|
self._testMethodName))
|
|
|
self.addTypeEqualityFunc(qubes.devices.DeviceManager,
|
|
|
- self.assertDevicesEqual)
|
|
|
+ self.assertDevicesEqual)
|
|
|
|
|
|
self.loop = None
|
|
|
|
|
|
+ global libvirt_event_impl
|
|
|
+
|
|
|
+ if in_dom0 and not libvirt_event_impl:
|
|
|
+ libvirt_event_impl = libvirtaio.virEventRegisterAsyncIOImpl()
|
|
|
|
|
|
def __str__(self):
|
|
|
return '{}/{}/{}'.format(
|
|
@@ -403,7 +413,6 @@ class QubesTestCase(unittest.TestCase):
|
|
|
self.__class__.__name__,
|
|
|
self._testMethodName)
|
|
|
|
|
|
-
|
|
|
def setUp(self):
|
|
|
super().setUp()
|
|
|
self.addCleanup(self.cleanup_gc)
|
|
@@ -413,11 +422,11 @@ class QubesTestCase(unittest.TestCase):
|
|
|
self.addCleanup(self.cleanup_traceback)
|
|
|
|
|
|
def cleanup_traceback(self):
|
|
|
- '''Remove local variables reference from tracebacks to allow garbage
|
|
|
+ """Remove local variables reference from tracebacks to allow garbage
|
|
|
collector to clean all Qubes*() objects, otherwise file descriptors
|
|
|
- held by them will leak'''
|
|
|
+ held by them will leak"""
|
|
|
exc_infos = [e for test_case, e in self._outcome.errors
|
|
|
- if test_case is self]
|
|
|
+ if test_case is self]
|
|
|
if self._outcome.expectedFailure:
|
|
|
exc_infos.append(self._outcome.expectedFailure)
|
|
|
for exc_info in exc_infos:
|
|
@@ -433,16 +442,17 @@ class QubesTestCase(unittest.TestCase):
|
|
|
def cleanup_gc(self):
|
|
|
gc.collect()
|
|
|
leaked = [obj for obj in gc.get_objects() + gc.garbage
|
|
|
- if isinstance(obj,
|
|
|
- (qubes.Qubes, qubes.vm.BaseVM,
|
|
|
- libvirt.virConnect, libvirt.virDomain))]
|
|
|
+ if isinstance(obj,
|
|
|
+ (qubes.Qubes, qubes.vm.BaseVM,
|
|
|
+ libvirt.virConnect, libvirt.virDomain))]
|
|
|
|
|
|
if leaked:
|
|
|
try:
|
|
|
import objgraph
|
|
|
objgraph.show_backrefs(leaked,
|
|
|
- max_depth=15, extra_info=extra_info,
|
|
|
- filename='/tmp/objgraph-{}.png'.format(self.id()))
|
|
|
+ max_depth=15, extra_info=extra_info,
|
|
|
+ filename='/tmp/objgraph-{}.png'.format(
|
|
|
+ self.id()))
|
|
|
except ImportError:
|
|
|
pass
|
|
|
|
|
@@ -451,7 +461,7 @@ class QubesTestCase(unittest.TestCase):
|
|
|
assert not leaked
|
|
|
|
|
|
def cleanup_loop(self):
|
|
|
- '''Check if the loop is empty'''
|
|
|
+ """Check if the loop is empty"""
|
|
|
# XXX BEWARE this is touching undocumented, implementation-specific
|
|
|
# attributes of the loop. This is most certainly unsupported and likely
|
|
|
# will break when messing with: Python version, kernel family, loop
|
|
@@ -488,7 +498,7 @@ class QubesTestCase(unittest.TestCase):
|
|
|
# lifecycle, and it is unwatched only at loop.close(); so we cannot just
|
|
|
# check selector for non-emptiness
|
|
|
assert len(self.loop._selector.get_map()) \
|
|
|
- == int(self.loop._ssock is not None)
|
|
|
+ == int(self.loop._ssock is not None)
|
|
|
|
|
|
del self.loop
|
|
|
|
|
@@ -521,15 +531,14 @@ class QubesTestCase(unittest.TestCase):
|
|
|
with context:
|
|
|
callableObj(*args, **kwargs)
|
|
|
|
|
|
-
|
|
|
def assertXMLEqual(self, xml1, xml2, msg=''):
|
|
|
- '''Check for equality of two XML objects.
|
|
|
+ """Check for equality of two XML objects.
|
|
|
|
|
|
:param xml1: first element
|
|
|
:param xml2: second element
|
|
|
:type xml1: :py:class:`lxml.etree._Element`
|
|
|
:type xml2: :py:class:`lxml.etree._Element`
|
|
|
- '''
|
|
|
+ """
|
|
|
|
|
|
self.assertEqual(xml1.tag, xml2.tag)
|
|
|
msg += '/' + str(xml1.tag)
|
|
@@ -557,7 +566,7 @@ class QubesTestCase(unittest.TestCase):
|
|
|
)
|
|
|
|
|
|
def assertEventFired(self, subject, event, kwargs=None):
|
|
|
- '''Check whether event was fired on given emitter and fail if it did
|
|
|
+ """Check whether event was fired on given emitter and fail if it did
|
|
|
not.
|
|
|
|
|
|
:param subject: emitter which is being checked
|
|
@@ -565,7 +574,7 @@ class QubesTestCase(unittest.TestCase):
|
|
|
:param str event: event identifier
|
|
|
:param dict kwargs: when given, all items must appear in kwargs passed \
|
|
|
to an event
|
|
|
- '''
|
|
|
+ """
|
|
|
|
|
|
will_not_match = object()
|
|
|
for ev, ev_kwargs in subject.fired_events:
|
|
@@ -574,7 +583,7 @@ class QubesTestCase(unittest.TestCase):
|
|
|
if kwargs is not None:
|
|
|
ev_kwargs = dict(ev_kwargs)
|
|
|
if any(ev_kwargs.get(k, will_not_match) != v
|
|
|
- for k, v in kwargs.items()):
|
|
|
+ for k, v in kwargs.items()):
|
|
|
continue
|
|
|
|
|
|
return
|
|
@@ -582,16 +591,15 @@ class QubesTestCase(unittest.TestCase):
|
|
|
self.fail('event {!r} {}did not fire on {!r}'.format(
|
|
|
event, ('' if kwargs is None else '{!r} '.format(kwargs)), subject))
|
|
|
|
|
|
-
|
|
|
def assertEventNotFired(self, subject, event, kwargs=None):
|
|
|
- '''Check whether event was fired on given emitter. Fail if it did.
|
|
|
+ """Check whether event was fired on given emitter. Fail if it did.
|
|
|
|
|
|
:param subject: emitter which is being checked
|
|
|
:type emitter: :py:class:`TestEmitter`
|
|
|
:param str event: event identifier
|
|
|
:param list kwargs: when given, all items must appear in kwargs passed \
|
|
|
to an event
|
|
|
- '''
|
|
|
+ """
|
|
|
|
|
|
will_not_match = object()
|
|
|
for ev, ev_kwargs in subject.fired_events:
|
|
@@ -600,7 +608,7 @@ class QubesTestCase(unittest.TestCase):
|
|
|
if kwargs is not None:
|
|
|
ev_kwargs = dict(ev_kwargs)
|
|
|
if any(ev_kwargs.get(k, will_not_match) != v
|
|
|
- for k, v in kwargs.items()):
|
|
|
+ for k, v in kwargs.items()):
|
|
|
continue
|
|
|
|
|
|
self.fail('event {!r} {}did fire on {!r}'.format(
|
|
@@ -610,9 +618,8 @@ class QubesTestCase(unittest.TestCase):
|
|
|
|
|
|
return
|
|
|
|
|
|
-
|
|
|
def assertXMLIsValid(self, xml, file=None, schema=None):
|
|
|
- '''Check whether given XML fulfills Relax NG schema.
|
|
|
+ """Check whether given XML fulfills Relax NG schema.
|
|
|
|
|
|
Schema can be given in a couple of ways:
|
|
|
|
|
@@ -625,7 +632,7 @@ class QubesTestCase(unittest.TestCase):
|
|
|
:param lxml.etree._Element xml: XML element instance to check
|
|
|
:param str file: filename of Relax NG schema
|
|
|
:param str schema: optional explicit schema string
|
|
|
- ''' # pylint: disable=redefined-builtin
|
|
|
+ """ # pylint: disable=redefined-builtin
|
|
|
|
|
|
if schema is not None and file is None:
|
|
|
relaxng = schema
|
|
@@ -649,7 +656,7 @@ class QubesTestCase(unittest.TestCase):
|
|
|
|
|
|
else:
|
|
|
raise TypeError("There should be excactly one of 'file' and "
|
|
|
- "'schema' arguments specified.")
|
|
|
+ "'schema' arguments specified.")
|
|
|
|
|
|
# We have to be extra careful here in case someone messed up with
|
|
|
# self.failureException. It should by default be AssertionError, just
|
|
@@ -691,6 +698,7 @@ class SystemTestCase(QubesTestCase):
|
|
|
Such (group of) test need to take care about
|
|
|
:py:meth:`TestCase.tearDownClass` implementation itself.
|
|
|
"""
|
|
|
+
|
|
|
# noinspection PyAttributeOutsideInit
|
|
|
def setUp(self):
|
|
|
if not in_dom0:
|
|
@@ -803,11 +811,10 @@ class SystemTestCase(QubesTestCase):
|
|
|
|
|
|
self.app.default_netvm = str(default_netvm)
|
|
|
|
|
|
-
|
|
|
def _find_pool(self, volume_group, thin_pool):
|
|
|
- ''' Returns the pool matching the specified ``volume_group`` &
|
|
|
+ """ Returns the pool matching the specified ``volume_group`` &
|
|
|
``thin_pool``, or None.
|
|
|
- '''
|
|
|
+ """
|
|
|
pools = [p for p in self.app.pools
|
|
|
if issubclass(p.__class__, qubes.storage.lvm.ThinPool)]
|
|
|
for pool in pools:
|
|
@@ -821,7 +828,7 @@ class SystemTestCase(QubesTestCase):
|
|
|
path = "/dev/mapper/{!s}-{!s}".format(volume_group, thin_pool)
|
|
|
if not os.path.exists(path):
|
|
|
self.skipTest('LVM thin pool {!r} does not exist'.
|
|
|
- format(DEFAULT_LVM_POOL))
|
|
|
+ format(DEFAULT_LVM_POOL))
|
|
|
self.pool = self._find_pool(volume_group, thin_pool)
|
|
|
if not self.pool:
|
|
|
self.pool = self.loop.run_until_complete(
|
|
@@ -865,23 +872,21 @@ class SystemTestCase(QubesTestCase):
|
|
|
|
|
|
self._remove_vm_disk(vmname)
|
|
|
|
|
|
-
|
|
|
@staticmethod
|
|
|
def _remove_vm_libvirt(dom):
|
|
|
try:
|
|
|
dom.destroy()
|
|
|
- except libvirt.libvirtError: # not running
|
|
|
+ except libvirt.libvirtError: # not running
|
|
|
pass
|
|
|
dom.undefine()
|
|
|
|
|
|
-
|
|
|
@staticmethod
|
|
|
def _remove_vm_disk(vmname):
|
|
|
for dirspec in (
|
|
|
'qubes_appvms_dir',
|
|
|
'qubes_templates_dir'):
|
|
|
dirpath = os.path.join(qubes.config.qubes_base_dir,
|
|
|
- qubes.config.system_path[dirspec], vmname)
|
|
|
+ qubes.config.system_path[dirspec], vmname)
|
|
|
if os.path.exists(dirpath):
|
|
|
if os.path.isdir(dirpath):
|
|
|
shutil.rmtree(dirpath)
|
|
@@ -890,21 +895,21 @@ class SystemTestCase(QubesTestCase):
|
|
|
|
|
|
@staticmethod
|
|
|
def _remove_vm_disk_lvm(prefix=VMPREFIX):
|
|
|
- ''' Remove LVM volumes with given prefix
|
|
|
+ """ Remove LVM volumes with given prefix
|
|
|
|
|
|
This is "a bit" drastic, as it removes volumes regardless of volume
|
|
|
group, thin pool etc. But we assume no important data on test system.
|
|
|
- '''
|
|
|
+ """
|
|
|
try:
|
|
|
volumes = subprocess.check_output(
|
|
|
['lvs', '--noheadings', '-o', 'vg_name,name',
|
|
|
- '--separator', '/']).decode()
|
|
|
+ '--separator', '/']).decode()
|
|
|
if ('/vm-' + prefix) not in volumes:
|
|
|
return
|
|
|
subprocess.check_call(['sudo', 'lvremove', '-f'] +
|
|
|
- [vol.strip() for vol in volumes.splitlines()
|
|
|
- if ('/vm-' + prefix) in vol],
|
|
|
- stdout=subprocess.DEVNULL)
|
|
|
+ [vol.strip() for vol in volumes.splitlines()
|
|
|
+ if ('/vm-' + prefix) in vol],
|
|
|
+ stdout=subprocess.DEVNULL)
|
|
|
except subprocess.CalledProcessError:
|
|
|
pass
|
|
|
|
|
@@ -972,10 +977,10 @@ class SystemTestCase(QubesTestCase):
|
|
|
vm.startup_lock.release()
|
|
|
|
|
|
def remove_test_vms(self, xmlpath=XMLPATH, prefix=VMPREFIX):
|
|
|
- '''Aggressively remove any domain that has name in testing namespace.
|
|
|
+ """Aggressively remove any domain that has name in testing namespace.
|
|
|
|
|
|
:param prefix: name prefix of VMs to remove, can be a list of prefixes
|
|
|
- '''
|
|
|
+ """
|
|
|
|
|
|
if isinstance(prefix, str):
|
|
|
prefixes = [prefix]
|
|
@@ -994,9 +999,11 @@ class SystemTestCase(QubesTestCase):
|
|
|
except AttributeError:
|
|
|
host_app = qubes.Qubes()
|
|
|
self.remove_vms([vm for vm in app.domains
|
|
|
- if any(vm.name.startswith(prefix) for prefix in prefixes) or
|
|
|
- (isinstance(vm, qubes.vm.dispvm.DispVM) and vm.name
|
|
|
- not in host_app.domains)])
|
|
|
+ if any(
|
|
|
+ vm.name.startswith(prefix) for prefix in prefixes) or
|
|
|
+ (isinstance(vm,
|
|
|
+ qubes.vm.dispvm.DispVM) and vm.name
|
|
|
+ not in host_app.domains)])
|
|
|
if not hasattr(self, 'host_app'):
|
|
|
host_app.close()
|
|
|
del host_app
|
|
@@ -1020,7 +1027,7 @@ class SystemTestCase(QubesTestCase):
|
|
|
'qubes_appvms_dir',
|
|
|
'qubes_templates_dir'):
|
|
|
dirpath = os.path.join(qubes.config.qubes_base_dir,
|
|
|
- qubes.config.system_path[dirspec])
|
|
|
+ qubes.config.system_path[dirspec])
|
|
|
if not os.path.exists(dirpath):
|
|
|
continue
|
|
|
for name in os.listdir(dirpath):
|
|
@@ -1032,7 +1039,7 @@ class SystemTestCase(QubesTestCase):
|
|
|
self._remove_vm_disk_lvm(prefix)
|
|
|
|
|
|
def qrexec_policy(self, service, source, destination, allow=True,
|
|
|
- action=None):
|
|
|
+ action=None):
|
|
|
"""
|
|
|
Allow qrexec calls for duration of the test
|
|
|
:param service: service name
|
|
@@ -1044,7 +1051,7 @@ class SystemTestCase(QubesTestCase):
|
|
|
"""
|
|
|
|
|
|
return _QrexecPolicyContext(service, source, destination,
|
|
|
- allow=allow, action=action)
|
|
|
+ allow=allow, action=action)
|
|
|
|
|
|
@asyncio.coroutine
|
|
|
def wait_for_window_hide_coro(self, title, winid, timeout=30):
|
|
@@ -1055,7 +1062,8 @@ class SystemTestCase(QubesTestCase):
|
|
|
"""
|
|
|
wait_count = 0
|
|
|
while subprocess.call(['xdotool', 'getwindowname', str(winid)],
|
|
|
- stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT) == 0:
|
|
|
+ stdout=subprocess.DEVNULL,
|
|
|
+ stderr=subprocess.STDOUT) == 0:
|
|
|
wait_count += 1
|
|
|
if wait_count > timeout * 10:
|
|
|
self.fail("Timeout while waiting for {}({}) window to "
|
|
@@ -1064,7 +1072,7 @@ class SystemTestCase(QubesTestCase):
|
|
|
|
|
|
@asyncio.coroutine
|
|
|
def wait_for_window_coro(self, title, search_class=False, timeout=30,
|
|
|
- show=True):
|
|
|
+ show=True):
|
|
|
"""
|
|
|
Wait for a window with a given title. Depending on show parameter,
|
|
|
it will wait for either window to show or to disappear.
|
|
@@ -1087,12 +1095,12 @@ class SystemTestCase(QubesTestCase):
|
|
|
if not show:
|
|
|
try:
|
|
|
winid = subprocess.check_output(xdotool_search + [title],
|
|
|
- stderr=subprocess.DEVNULL).decode()
|
|
|
+ stderr=subprocess.DEVNULL).decode()
|
|
|
except subprocess.CalledProcessError:
|
|
|
# already gone
|
|
|
return
|
|
|
yield from self.wait_for_window_hide_coro(winid, title,
|
|
|
- timeout=timeout)
|
|
|
+ timeout=timeout)
|
|
|
return
|
|
|
|
|
|
winid = None
|
|
@@ -1147,7 +1155,8 @@ class SystemTestCase(QubesTestCase):
|
|
|
|
|
|
def shutdown_and_wait(self, vm, timeout=60):
|
|
|
try:
|
|
|
- self.loop.run_until_complete(vm.shutdown(wait=True, timeout=timeout))
|
|
|
+ self.loop.run_until_complete(
|
|
|
+ vm.shutdown(wait=True, timeout=timeout))
|
|
|
except qubes.exc.QubesException:
|
|
|
name = vm.name
|
|
|
del vm
|
|
@@ -1162,14 +1171,14 @@ class SystemTestCase(QubesTestCase):
|
|
|
self.skipTest('dracut not installed')
|
|
|
# create a single partition
|
|
|
p = subprocess.Popen(['sfdisk', '-q', '-L', vm.storage.root_img],
|
|
|
- stdin=subprocess.PIPE,
|
|
|
- stdout=subprocess.DEVNULL,
|
|
|
- stderr=subprocess.STDOUT)
|
|
|
+ stdin=subprocess.PIPE,
|
|
|
+ stdout=subprocess.DEVNULL,
|
|
|
+ stderr=subprocess.STDOUT)
|
|
|
p.communicate('2048,\n')
|
|
|
assert p.returncode == 0, 'sfdisk failed'
|
|
|
# TODO: check if root_img is really file, not already block device
|
|
|
p = subprocess.Popen(['sudo', 'losetup', '-f', '-P', '--show',
|
|
|
- vm.storage.root_img], stdout=subprocess.PIPE)
|
|
|
+ vm.storage.root_img], stdout=subprocess.PIPE)
|
|
|
(loopdev, _) = p.communicate()
|
|
|
loopdev = loopdev.strip()
|
|
|
looppart = loopdev + 'p1'
|
|
@@ -1179,11 +1188,11 @@ class SystemTestCase(QubesTestCase):
|
|
|
subprocess.check_call(['sudo', 'mount', looppart, mountpoint])
|
|
|
try:
|
|
|
subprocess.check_call(['sudo', 'grub2-install',
|
|
|
- '--target', 'i386-pc',
|
|
|
- '--modules', 'part_msdos ext2',
|
|
|
- '--boot-directory', mountpoint, loopdev],
|
|
|
- stderr=subprocess.DEVNULL
|
|
|
- )
|
|
|
+ '--target', 'i386-pc',
|
|
|
+ '--modules', 'part_msdos ext2',
|
|
|
+ '--boot-directory', mountpoint, loopdev],
|
|
|
+ stderr=subprocess.DEVNULL
|
|
|
+ )
|
|
|
grub_cfg = '{}/grub2/grub.cfg'.format(mountpoint)
|
|
|
subprocess.check_call(
|
|
|
['sudo', 'chown', '-R', os.getlogin(), mountpoint])
|
|
@@ -1216,7 +1225,7 @@ class SystemTestCase(QubesTestCase):
|
|
|
dracut_args += ['--install', ' '.join(extra_files)]
|
|
|
subprocess.check_call(
|
|
|
['dracut'] + dracut_args + [os.path.join(mountpoint,
|
|
|
- 'initrd')],
|
|
|
+ 'initrd')],
|
|
|
stderr=subprocess.DEVNULL
|
|
|
)
|
|
|
finally:
|
|
@@ -1225,9 +1234,9 @@ class SystemTestCase(QubesTestCase):
|
|
|
subprocess.check_call(['sudo', 'losetup', '-d', loopdev])
|
|
|
|
|
|
def create_bootable_iso(self):
|
|
|
- '''Create simple bootable ISO image.
|
|
|
+ """Create simple bootable ISO image.
|
|
|
Type 'poweroff' to it to terminate that VM.
|
|
|
- '''
|
|
|
+ """
|
|
|
isolinux_cfg = (
|
|
|
'prompt 1\n'
|
|
|
'label poweroff\n'
|
|
@@ -1242,13 +1251,13 @@ class SystemTestCase(QubesTestCase):
|
|
|
with open(os.path.join(tmp_dir, 'isolinux.cfg'), 'w') as cfg:
|
|
|
cfg.write(isolinux_cfg)
|
|
|
subprocess.check_call(['genisoimage', '-o', output_path,
|
|
|
- '-c', 'boot.cat',
|
|
|
- '-b', 'isolinux.bin',
|
|
|
- '-no-emul-boot',
|
|
|
- '-boot-load-size', '4',
|
|
|
- '-boot-info-table',
|
|
|
- '-q',
|
|
|
- tmp_dir])
|
|
|
+ '-c', 'boot.cat',
|
|
|
+ '-b', 'isolinux.bin',
|
|
|
+ '-no-emul-boot',
|
|
|
+ '-boot-load-size', '4',
|
|
|
+ '-boot-info-table',
|
|
|
+ '-q',
|
|
|
+ tmp_dir])
|
|
|
except FileNotFoundError:
|
|
|
self.skipTest('syslinux or genisoimage not installed')
|
|
|
os.close(output_fd)
|
|
@@ -1279,8 +1288,10 @@ class SystemTestCase(QubesTestCase):
|
|
|
|
|
|
|
|
|
_templates = None
|
|
|
+
|
|
|
+
|
|
|
def list_templates():
|
|
|
- '''Returns tuple of template names available in the system.'''
|
|
|
+ """Returns tuple of template names available in the system."""
|
|
|
global _templates
|
|
|
if _templates is None:
|
|
|
if 'QUBES_TEST_TEMPLATES' in os.environ:
|
|
@@ -1289,16 +1300,18 @@ def list_templates():
|
|
|
try:
|
|
|
app = qubes.Qubes()
|
|
|
_templates = tuple(vm.name for vm in app.domains
|
|
|
- if isinstance(vm, qubes.vm.templatevm.TemplateVM) and
|
|
|
- vm.features.get('os', None) != 'Windows')
|
|
|
+ if isinstance(vm,
|
|
|
+ qubes.vm.templatevm.TemplateVM) and
|
|
|
+ vm.features.get('os', None) != 'Windows')
|
|
|
app.close()
|
|
|
del app
|
|
|
except OSError:
|
|
|
_templates = ()
|
|
|
return _templates
|
|
|
|
|
|
+
|
|
|
def create_testcases_for_templates(name, *bases, module, **kwds):
|
|
|
- '''Do-it-all helper for generating per-template tests via load_tests proto
|
|
|
+ """Do-it-all helper for generating per-template tests via load_tests proto
|
|
|
|
|
|
This does several things:
|
|
|
- creates per-template classes
|
|
@@ -1324,7 +1337,7 @@ def create_testcases_for_templates(name, *bases, module, **kwds):
|
|
|
*NOTE* adding ``module=sys.modules[__name__]`` is *mandatory*, and to allow
|
|
|
enforcing this, it uses keyword-only argument syntax, which is only in
|
|
|
Python 3.
|
|
|
- '''
|
|
|
+ """
|
|
|
# Do not attempt to grab the module from traceback, since we are actually
|
|
|
# a generator and loadTestsFromNames may also be a generator, so it's not
|
|
|
# possible to correctly guess frame from stack. Explicit is better than
|
|
@@ -1340,8 +1353,9 @@ def create_testcases_for_templates(name, *bases, module, **kwds):
|
|
|
setattr(module, clsname, cls)
|
|
|
yield '.'.join((module.__name__, clsname))
|
|
|
|
|
|
+
|
|
|
def maybe_create_testcases_on_import(create_testcases_gen):
|
|
|
- '''If certain conditions are met, call *create_testcases_gen* to create
|
|
|
+ """If certain conditions are met, call *create_testcases_gen* to create
|
|
|
testcases for templates tests. The purpose is to use it on integration
|
|
|
tests module(s) import, so the test runner could discover tests without
|
|
|
using load tests protocol.
|
|
@@ -1350,17 +1364,18 @@ def maybe_create_testcases_on_import(create_testcases_gen):
|
|
|
- QUBES_TEST_TEMPLATES present in the environment (it's possible to
|
|
|
create test cases without opening qubes.xml)
|
|
|
- QUBES_TEST_LOAD_ALL present in the environment
|
|
|
- '''
|
|
|
+ """
|
|
|
if 'QUBES_TEST_TEMPLATES' in os.environ or \
|
|
|
'QUBES_TEST_LOAD_ALL' in os.environ:
|
|
|
list(create_testcases_gen())
|
|
|
|
|
|
+
|
|
|
def extra_info(obj):
|
|
|
- '''Return short info identifying object.
|
|
|
+ """Return short info identifying object.
|
|
|
|
|
|
For example, if obj is a qube, return its name. This is for use with
|
|
|
:py:mod:`objgraph` package.
|
|
|
- '''
|
|
|
+ """
|
|
|
# Feel free to extend to other cases.
|
|
|
|
|
|
if isinstance(obj, qubes.vm.qubesvm.QubesVM):
|
|
@@ -1373,7 +1388,8 @@ def extra_info(obj):
|
|
|
|
|
|
return ''
|
|
|
|
|
|
-def load_tests(loader, tests, pattern): # pylint: disable=unused-argument
|
|
|
+
|
|
|
+def load_tests(loader, tests, pattern): # pylint: disable=unused-argument
|
|
|
# discard any tests from this module, because it hosts base classes
|
|
|
tests = unittest.TestSuite()
|
|
|
|
|
@@ -1403,7 +1419,7 @@ def load_tests(loader, tests, pattern): # pylint: disable=unused-argument
|
|
|
'qubes.tests.api_misc',
|
|
|
'qubes.tests.api_internal',
|
|
|
'qubes.tests.rpc_import',
|
|
|
- ):
|
|
|
+ ):
|
|
|
tests.addTests(loader.loadTestsFromName(modname))
|
|
|
|
|
|
tests.addTests(loader.discover(
|
|
@@ -1434,7 +1450,7 @@ def load_tests(loader, tests, pattern): # pylint: disable=unused-argument
|
|
|
|
|
|
# external modules
|
|
|
'qubes.tests.extra',
|
|
|
- ):
|
|
|
+ ):
|
|
|
tests.addTests(loader.loadTestsFromName(modname))
|
|
|
|
|
|
return tests
|