tests: make PEP8 happier
This commit is contained in:
parent
40156c3e78
commit
d8f80c9687
@ -71,13 +71,11 @@ TEMPLATE = 'fedora-23'
|
||||
VMPREFIX = 'test-inst-'
|
||||
CLSVMPREFIX = 'test-cls-'
|
||||
|
||||
|
||||
if 'DEFAULT_LVM_POOL' in os.environ.keys():
|
||||
DEFAULT_LVM_POOL = os.environ['DEFAULT_LVM_POOL']
|
||||
else:
|
||||
DEFAULT_LVM_POOL = 'qubes_dom0/pool00'
|
||||
|
||||
|
||||
POOL_CONF = {'name': 'test-lvm',
|
||||
'driver': 'lvm_thin',
|
||||
'volume_group': DEFAULT_LVM_POOL.split('/')[0],
|
||||
@ -92,6 +90,7 @@ in_git = False
|
||||
|
||||
try:
|
||||
import libvirt
|
||||
|
||||
libvirt.openReadOnly(qubes.config.defaults['libvirt_uri']).close()
|
||||
in_dom0 = True
|
||||
except libvirt.libvirtError:
|
||||
@ -116,37 +115,40 @@ except OSError:
|
||||
|
||||
ha_syslog = None
|
||||
|
||||
|
||||
def skipUnlessDom0(test_item):
|
||||
'''Decorator that skips test outside dom0.
|
||||
"""Decorator that skips test outside dom0.
|
||||
|
||||
Some tests (especially integration tests) have to be run in more or less
|
||||
working dom0. This is checked by connecting to libvirt.
|
||||
'''
|
||||
"""
|
||||
|
||||
return unittest.skipUnless(in_dom0, 'outside dom0')(test_item)
|
||||
|
||||
|
||||
def skipUnlessGit(test_item):
|
||||
'''Decorator that skips test outside git repo.
|
||||
"""Decorator that skips test outside git repo.
|
||||
|
||||
There are very few tests that an be run only in git. One example is
|
||||
correctness of example code that won't get included in RPM.
|
||||
'''
|
||||
"""
|
||||
|
||||
return unittest.skipUnless(in_git, 'outside git tree')(test_item)
|
||||
|
||||
|
||||
def skipUnlessEnv(varname):
|
||||
'''Decorator generator for skipping tests without environment variable set.
|
||||
"""Decorator generator for skipping tests without environment variable set.
|
||||
|
||||
Some tests require working X11 display, like those using GTK library, which
|
||||
segfaults without connection to X.
|
||||
Other require their own, custom variables.
|
||||
'''
|
||||
"""
|
||||
|
||||
return unittest.skipUnless(os.getenv(varname), 'no {} set'.format(varname))
|
||||
|
||||
|
||||
class TestEmitter(qubes.events.Emitter):
|
||||
'''Dummy event emitter which records events fired on it.
|
||||
"""Dummy event emitter which records events fired on it.
|
||||
|
||||
Events are counted in :py:attr:`fired_events` attribute, which is
|
||||
:py:class:`collections.Counter` instance. For each event, ``(event, args,
|
||||
@ -160,7 +162,7 @@ class TestEmitter(qubes.events.Emitter):
|
||||
>>> emitter.fire_event('event', spam='eggs', foo='bar')
|
||||
>>> emitter.fired_events
|
||||
Counter({('event', (1, 2, 3), (('foo', 'bar'), ('spam', 'eggs'))): 1})
|
||||
'''
|
||||
"""
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(TestEmitter, self).__init__(*args, **kwargs)
|
||||
@ -172,9 +174,9 @@ class TestEmitter(qubes.events.Emitter):
|
||||
effects = super(TestEmitter, self).fire_event(event, **kwargs)
|
||||
ev_kwargs = frozenset(
|
||||
(key,
|
||||
frozenset(value.items()) if isinstance(value, dict)
|
||||
else tuple(value) if isinstance(value, list)
|
||||
else value)
|
||||
frozenset(value.items()) if isinstance(value, dict)
|
||||
else tuple(value) if isinstance(value, list)
|
||||
else value)
|
||||
for key, value in kwargs.items()
|
||||
)
|
||||
self.fired_events[(event, ev_kwargs)] += 1
|
||||
@ -186,7 +188,7 @@ class TestEmitter(qubes.events.Emitter):
|
||||
event, pre_event=pre_event, **kwargs)
|
||||
ev_kwargs = frozenset(
|
||||
(key,
|
||||
frozenset(value.items()) if isinstance(value, dict) else value)
|
||||
frozenset(value.items()) if isinstance(value, dict) else value)
|
||||
for key, value in kwargs.items()
|
||||
)
|
||||
self.fired_events[(event, ev_kwargs)] += 1
|
||||
@ -200,6 +202,7 @@ def expectedFailureIfTemplate(templates):
|
||||
handle both 'whonix-ws' and 'whonix-gw'.
|
||||
templates can be either a single string, or an iterable
|
||||
"""
|
||||
|
||||
def decorator(func):
|
||||
@functools.wraps(func)
|
||||
def wrapper(self, *args, **kwargs):
|
||||
@ -217,7 +220,9 @@ def expectedFailureIfTemplate(templates):
|
||||
else:
|
||||
# Call directly:
|
||||
func(self, *args, **kwargs)
|
||||
|
||||
return wrapper
|
||||
|
||||
return decorator
|
||||
|
||||
|
||||
@ -241,10 +246,12 @@ def wait_on_fail(func):
|
||||
sys.stdout.flush()
|
||||
reader = asyncio.StreamReader(loop=self.loop)
|
||||
transport, protocol = self.loop.run_until_complete(
|
||||
self.loop.connect_read_pipe(lambda: asyncio.StreamReaderProtocol(reader),
|
||||
sys.stdin))
|
||||
self.loop.connect_read_pipe(
|
||||
lambda: asyncio.StreamReaderProtocol(reader),
|
||||
sys.stdin))
|
||||
self.loop.run_until_complete(reader.readline())
|
||||
raise
|
||||
|
||||
return wrapper
|
||||
|
||||
|
||||
@ -252,7 +259,7 @@ class _AssertNotRaisesContext(object):
|
||||
"""A context manager used to implement TestCase.assertNotRaises methods.
|
||||
|
||||
Stolen from unittest and hacked. Regexp support stripped.
|
||||
""" # pylint: disable=too-few-public-methods
|
||||
""" # pylint: disable=too-few-public-methods
|
||||
|
||||
def __init__(self, expected, test_case, expected_regexp=None):
|
||||
if expected_regexp is not None:
|
||||
@ -263,11 +270,9 @@ class _AssertNotRaisesContext(object):
|
||||
|
||||
self.failureException = test_case.failureException
|
||||
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
|
||||
def __exit__(self, exc_type, exc_value, tb):
|
||||
if exc_type is None:
|
||||
return True
|
||||
@ -282,8 +287,9 @@ class _AssertNotRaisesContext(object):
|
||||
|
||||
self.exception = exc_value # store for later retrieval
|
||||
|
||||
|
||||
class _QrexecPolicyContext(object):
|
||||
'''Context manager for SystemTestCase.qrexec_policy'''
|
||||
"""Context manager for SystemTestCase.qrexec_policy"""
|
||||
|
||||
def __init__(self, service, source, destination, allow=True, action=None):
|
||||
try:
|
||||
@ -343,8 +349,9 @@ class _QrexecPolicyContext(object):
|
||||
self.close()
|
||||
self._filename.unlink()
|
||||
|
||||
|
||||
class substitute_entry_points(object):
|
||||
'''Monkey-patch pkg_resources to substitute one group in iter_entry_points
|
||||
"""Monkey-patch pkg_resources to substitute one group in iter_entry_points
|
||||
|
||||
This is for testing plugins, like device classes.
|
||||
|
||||
@ -356,7 +363,7 @@ class substitute_entry_points(object):
|
||||
|
||||
This context manager is stackable. To substitute more than one entry point
|
||||
group, just nest two contexts.
|
||||
''' # pylint: disable=invalid-name
|
||||
""" # pylint: disable=invalid-name
|
||||
|
||||
def __init__(self, group, tempgroup):
|
||||
self.group = group
|
||||
@ -379,8 +386,8 @@ class substitute_entry_points(object):
|
||||
|
||||
|
||||
class QubesTestCase(unittest.TestCase):
|
||||
'''Base class for Qubes unit tests.
|
||||
'''
|
||||
"""Base class for Qubes unit tests.
|
||||
"""
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(QubesTestCase, self).__init__(*args, **kwargs)
|
||||
@ -390,7 +397,7 @@ class QubesTestCase(unittest.TestCase):
|
||||
self.__class__.__name__,
|
||||
self._testMethodName))
|
||||
self.addTypeEqualityFunc(qubes.devices.DeviceManager,
|
||||
self.assertDevicesEqual)
|
||||
self.assertDevicesEqual)
|
||||
|
||||
self.loop = None
|
||||
|
||||
@ -405,7 +412,6 @@ class QubesTestCase(unittest.TestCase):
|
||||
self.__class__.__name__,
|
||||
self._testMethodName)
|
||||
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.addCleanup(self.cleanup_gc)
|
||||
@ -415,11 +421,11 @@ class QubesTestCase(unittest.TestCase):
|
||||
self.addCleanup(self.cleanup_traceback)
|
||||
|
||||
def cleanup_traceback(self):
|
||||
'''Remove local variables reference from tracebacks to allow garbage
|
||||
"""Remove local variables reference from tracebacks to allow garbage
|
||||
collector to clean all Qubes*() objects, otherwise file descriptors
|
||||
held by them will leak'''
|
||||
held by them will leak"""
|
||||
exc_infos = [e for test_case, e in self._outcome.errors
|
||||
if test_case is self]
|
||||
if test_case is self]
|
||||
if self._outcome.expectedFailure:
|
||||
exc_infos.append(self._outcome.expectedFailure)
|
||||
for exc_info in exc_infos:
|
||||
@ -435,16 +441,17 @@ class QubesTestCase(unittest.TestCase):
|
||||
def cleanup_gc(self):
|
||||
gc.collect()
|
||||
leaked = [obj for obj in gc.get_objects() + gc.garbage
|
||||
if isinstance(obj,
|
||||
(qubes.Qubes, qubes.vm.BaseVM,
|
||||
libvirt.virConnect, libvirt.virDomain))]
|
||||
if isinstance(obj,
|
||||
(qubes.Qubes, qubes.vm.BaseVM,
|
||||
libvirt.virConnect, libvirt.virDomain))]
|
||||
|
||||
if leaked:
|
||||
try:
|
||||
import objgraph
|
||||
objgraph.show_backrefs(leaked,
|
||||
max_depth=15, extra_info=extra_info,
|
||||
filename='/tmp/objgraph-{}.png'.format(self.id()))
|
||||
max_depth=15, extra_info=extra_info,
|
||||
filename='/tmp/objgraph-{}.png'.format(
|
||||
self.id()))
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
@ -453,7 +460,7 @@ class QubesTestCase(unittest.TestCase):
|
||||
assert not leaked
|
||||
|
||||
def cleanup_loop(self):
|
||||
'''Check if the loop is empty'''
|
||||
"""Check if the loop is empty"""
|
||||
# XXX BEWARE this is touching undocumented, implementation-specific
|
||||
# attributes of the loop. This is most certainly unsupported and likely
|
||||
# will break when messing with: Python version, kernel family, loop
|
||||
@ -490,7 +497,7 @@ class QubesTestCase(unittest.TestCase):
|
||||
# lifecycle, and it is unwatched only at loop.close(); so we cannot just
|
||||
# check selector for non-emptiness
|
||||
assert len(self.loop._selector.get_map()) \
|
||||
== int(self.loop._ssock is not None)
|
||||
== int(self.loop._ssock is not None)
|
||||
|
||||
del self.loop
|
||||
|
||||
@ -523,15 +530,14 @@ class QubesTestCase(unittest.TestCase):
|
||||
with context:
|
||||
callableObj(*args, **kwargs)
|
||||
|
||||
|
||||
def assertXMLEqual(self, xml1, xml2, msg=''):
|
||||
'''Check for equality of two XML objects.
|
||||
"""Check for equality of two XML objects.
|
||||
|
||||
:param xml1: first element
|
||||
:param xml2: second element
|
||||
:type xml1: :py:class:`lxml.etree._Element`
|
||||
:type xml2: :py:class:`lxml.etree._Element`
|
||||
'''
|
||||
"""
|
||||
|
||||
self.assertEqual(xml1.tag, xml2.tag)
|
||||
msg += '/' + str(xml1.tag)
|
||||
@ -559,7 +565,7 @@ class QubesTestCase(unittest.TestCase):
|
||||
)
|
||||
|
||||
def assertEventFired(self, subject, event, kwargs=None):
|
||||
'''Check whether event was fired on given emitter and fail if it did
|
||||
"""Check whether event was fired on given emitter and fail if it did
|
||||
not.
|
||||
|
||||
:param subject: emitter which is being checked
|
||||
@ -567,7 +573,7 @@ class QubesTestCase(unittest.TestCase):
|
||||
:param str event: event identifier
|
||||
:param dict kwargs: when given, all items must appear in kwargs passed \
|
||||
to an event
|
||||
'''
|
||||
"""
|
||||
|
||||
will_not_match = object()
|
||||
for ev, ev_kwargs in subject.fired_events:
|
||||
@ -576,7 +582,7 @@ class QubesTestCase(unittest.TestCase):
|
||||
if kwargs is not None:
|
||||
ev_kwargs = dict(ev_kwargs)
|
||||
if any(ev_kwargs.get(k, will_not_match) != v
|
||||
for k, v in kwargs.items()):
|
||||
for k, v in kwargs.items()):
|
||||
continue
|
||||
|
||||
return
|
||||
@ -584,16 +590,15 @@ class QubesTestCase(unittest.TestCase):
|
||||
self.fail('event {!r} {}did not fire on {!r}'.format(
|
||||
event, ('' if kwargs is None else '{!r} '.format(kwargs)), subject))
|
||||
|
||||
|
||||
def assertEventNotFired(self, subject, event, kwargs=None):
|
||||
'''Check whether event was fired on given emitter. Fail if it did.
|
||||
"""Check whether event was fired on given emitter. Fail if it did.
|
||||
|
||||
:param subject: emitter which is being checked
|
||||
:type emitter: :py:class:`TestEmitter`
|
||||
:param str event: event identifier
|
||||
:param list kwargs: when given, all items must appear in kwargs passed \
|
||||
to an event
|
||||
'''
|
||||
"""
|
||||
|
||||
will_not_match = object()
|
||||
for ev, ev_kwargs in subject.fired_events:
|
||||
@ -602,7 +607,7 @@ class QubesTestCase(unittest.TestCase):
|
||||
if kwargs is not None:
|
||||
ev_kwargs = dict(ev_kwargs)
|
||||
if any(ev_kwargs.get(k, will_not_match) != v
|
||||
for k, v in kwargs.items()):
|
||||
for k, v in kwargs.items()):
|
||||
continue
|
||||
|
||||
self.fail('event {!r} {}did fire on {!r}'.format(
|
||||
@ -612,9 +617,8 @@ class QubesTestCase(unittest.TestCase):
|
||||
|
||||
return
|
||||
|
||||
|
||||
def assertXMLIsValid(self, xml, file=None, schema=None):
|
||||
'''Check whether given XML fulfills Relax NG schema.
|
||||
"""Check whether given XML fulfills Relax NG schema.
|
||||
|
||||
Schema can be given in a couple of ways:
|
||||
|
||||
@ -627,7 +631,7 @@ class QubesTestCase(unittest.TestCase):
|
||||
:param lxml.etree._Element xml: XML element instance to check
|
||||
:param str file: filename of Relax NG schema
|
||||
:param str schema: optional explicit schema string
|
||||
''' # pylint: disable=redefined-builtin
|
||||
""" # pylint: disable=redefined-builtin
|
||||
|
||||
if schema is not None and file is None:
|
||||
relaxng = schema
|
||||
@ -651,7 +655,7 @@ class QubesTestCase(unittest.TestCase):
|
||||
|
||||
else:
|
||||
raise TypeError("There should be excactly one of 'file' and "
|
||||
"'schema' arguments specified.")
|
||||
"'schema' arguments specified.")
|
||||
|
||||
# We have to be extra careful here in case someone messed up with
|
||||
# self.failureException. It should by default be AssertionError, just
|
||||
@ -693,6 +697,7 @@ class SystemTestCase(QubesTestCase):
|
||||
Such (group of) test need to take care about
|
||||
:py:meth:`TestCase.tearDownClass` implementation itself.
|
||||
"""
|
||||
|
||||
# noinspection PyAttributeOutsideInit
|
||||
def setUp(self):
|
||||
if not in_dom0:
|
||||
@ -805,11 +810,10 @@ class SystemTestCase(QubesTestCase):
|
||||
|
||||
self.app.default_netvm = str(default_netvm)
|
||||
|
||||
|
||||
def _find_pool(self, volume_group, thin_pool):
|
||||
''' Returns the pool matching the specified ``volume_group`` &
|
||||
""" Returns the pool matching the specified ``volume_group`` &
|
||||
``thin_pool``, or None.
|
||||
'''
|
||||
"""
|
||||
pools = [p for p in self.app.pools
|
||||
if issubclass(p.__class__, qubes.storage.lvm.ThinPool)]
|
||||
for pool in pools:
|
||||
@ -823,7 +827,7 @@ class SystemTestCase(QubesTestCase):
|
||||
path = "/dev/mapper/{!s}-{!s}".format(volume_group, thin_pool)
|
||||
if not os.path.exists(path):
|
||||
self.skipTest('LVM thin pool {!r} does not exist'.
|
||||
format(DEFAULT_LVM_POOL))
|
||||
format(DEFAULT_LVM_POOL))
|
||||
self.pool = self._find_pool(volume_group, thin_pool)
|
||||
if not self.pool:
|
||||
self.pool = self.loop.run_until_complete(
|
||||
@ -867,23 +871,21 @@ class SystemTestCase(QubesTestCase):
|
||||
|
||||
self._remove_vm_disk(vmname)
|
||||
|
||||
|
||||
@staticmethod
|
||||
def _remove_vm_libvirt(dom):
|
||||
try:
|
||||
dom.destroy()
|
||||
except libvirt.libvirtError: # not running
|
||||
except libvirt.libvirtError: # not running
|
||||
pass
|
||||
dom.undefine()
|
||||
|
||||
|
||||
@staticmethod
|
||||
def _remove_vm_disk(vmname):
|
||||
for dirspec in (
|
||||
'qubes_appvms_dir',
|
||||
'qubes_templates_dir'):
|
||||
dirpath = os.path.join(qubes.config.qubes_base_dir,
|
||||
qubes.config.system_path[dirspec], vmname)
|
||||
qubes.config.system_path[dirspec], vmname)
|
||||
if os.path.exists(dirpath):
|
||||
if os.path.isdir(dirpath):
|
||||
shutil.rmtree(dirpath)
|
||||
@ -892,21 +894,21 @@ class SystemTestCase(QubesTestCase):
|
||||
|
||||
@staticmethod
|
||||
def _remove_vm_disk_lvm(prefix=VMPREFIX):
|
||||
''' Remove LVM volumes with given prefix
|
||||
""" Remove LVM volumes with given prefix
|
||||
|
||||
This is "a bit" drastic, as it removes volumes regardless of volume
|
||||
group, thin pool etc. But we assume no important data on test system.
|
||||
'''
|
||||
"""
|
||||
try:
|
||||
volumes = subprocess.check_output(
|
||||
['lvs', '--noheadings', '-o', 'vg_name,name',
|
||||
'--separator', '/']).decode()
|
||||
'--separator', '/']).decode()
|
||||
if ('/vm-' + prefix) not in volumes:
|
||||
return
|
||||
subprocess.check_call(['sudo', 'lvremove', '-f'] +
|
||||
[vol.strip() for vol in volumes.splitlines()
|
||||
if ('/vm-' + prefix) in vol],
|
||||
stdout=subprocess.DEVNULL)
|
||||
[vol.strip() for vol in volumes.splitlines()
|
||||
if ('/vm-' + prefix) in vol],
|
||||
stdout=subprocess.DEVNULL)
|
||||
except subprocess.CalledProcessError:
|
||||
pass
|
||||
|
||||
@ -974,10 +976,10 @@ class SystemTestCase(QubesTestCase):
|
||||
vm.startup_lock.release()
|
||||
|
||||
def remove_test_vms(self, xmlpath=XMLPATH, prefix=VMPREFIX):
|
||||
'''Aggressively remove any domain that has name in testing namespace.
|
||||
"""Aggressively remove any domain that has name in testing namespace.
|
||||
|
||||
:param prefix: name prefix of VMs to remove, can be a list of prefixes
|
||||
'''
|
||||
"""
|
||||
|
||||
if isinstance(prefix, str):
|
||||
prefixes = [prefix]
|
||||
@ -996,9 +998,11 @@ class SystemTestCase(QubesTestCase):
|
||||
except AttributeError:
|
||||
host_app = qubes.Qubes()
|
||||
self.remove_vms([vm for vm in app.domains
|
||||
if any(vm.name.startswith(prefix) for prefix in prefixes) or
|
||||
(isinstance(vm, qubes.vm.dispvm.DispVM) and vm.name
|
||||
not in host_app.domains)])
|
||||
if any(
|
||||
vm.name.startswith(prefix) for prefix in prefixes) or
|
||||
(isinstance(vm,
|
||||
qubes.vm.dispvm.DispVM) and vm.name
|
||||
not in host_app.domains)])
|
||||
if not hasattr(self, 'host_app'):
|
||||
host_app.close()
|
||||
del host_app
|
||||
@ -1022,7 +1026,7 @@ class SystemTestCase(QubesTestCase):
|
||||
'qubes_appvms_dir',
|
||||
'qubes_templates_dir'):
|
||||
dirpath = os.path.join(qubes.config.qubes_base_dir,
|
||||
qubes.config.system_path[dirspec])
|
||||
qubes.config.system_path[dirspec])
|
||||
if not os.path.exists(dirpath):
|
||||
continue
|
||||
for name in os.listdir(dirpath):
|
||||
@ -1034,7 +1038,7 @@ class SystemTestCase(QubesTestCase):
|
||||
self._remove_vm_disk_lvm(prefix)
|
||||
|
||||
def qrexec_policy(self, service, source, destination, allow=True,
|
||||
action=None):
|
||||
action=None):
|
||||
"""
|
||||
Allow qrexec calls for duration of the test
|
||||
:param service: service name
|
||||
@ -1046,7 +1050,7 @@ class SystemTestCase(QubesTestCase):
|
||||
"""
|
||||
|
||||
return _QrexecPolicyContext(service, source, destination,
|
||||
allow=allow, action=action)
|
||||
allow=allow, action=action)
|
||||
|
||||
@asyncio.coroutine
|
||||
def wait_for_window_hide_coro(self, title, winid, timeout=30):
|
||||
@ -1057,7 +1061,8 @@ class SystemTestCase(QubesTestCase):
|
||||
"""
|
||||
wait_count = 0
|
||||
while subprocess.call(['xdotool', 'getwindowname', str(winid)],
|
||||
stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT) == 0:
|
||||
stdout=subprocess.DEVNULL,
|
||||
stderr=subprocess.STDOUT) == 0:
|
||||
wait_count += 1
|
||||
if wait_count > timeout * 10:
|
||||
self.fail("Timeout while waiting for {}({}) window to "
|
||||
@ -1066,7 +1071,7 @@ class SystemTestCase(QubesTestCase):
|
||||
|
||||
@asyncio.coroutine
|
||||
def wait_for_window_coro(self, title, search_class=False, timeout=30,
|
||||
show=True):
|
||||
show=True):
|
||||
"""
|
||||
Wait for a window with a given title. Depending on show parameter,
|
||||
it will wait for either window to show or to disappear.
|
||||
@ -1089,12 +1094,12 @@ class SystemTestCase(QubesTestCase):
|
||||
if not show:
|
||||
try:
|
||||
winid = subprocess.check_output(xdotool_search + [title],
|
||||
stderr=subprocess.DEVNULL).decode()
|
||||
stderr=subprocess.DEVNULL).decode()
|
||||
except subprocess.CalledProcessError:
|
||||
# already gone
|
||||
return
|
||||
yield from self.wait_for_window_hide_coro(winid, title,
|
||||
timeout=timeout)
|
||||
timeout=timeout)
|
||||
return
|
||||
|
||||
winid = None
|
||||
@ -1149,7 +1154,8 @@ class SystemTestCase(QubesTestCase):
|
||||
|
||||
def shutdown_and_wait(self, vm, timeout=60):
|
||||
try:
|
||||
self.loop.run_until_complete(vm.shutdown(wait=True, timeout=timeout))
|
||||
self.loop.run_until_complete(
|
||||
vm.shutdown(wait=True, timeout=timeout))
|
||||
except qubes.exc.QubesException:
|
||||
name = vm.name
|
||||
del vm
|
||||
@ -1164,14 +1170,14 @@ class SystemTestCase(QubesTestCase):
|
||||
self.skipTest('dracut not installed')
|
||||
# create a single partition
|
||||
p = subprocess.Popen(['sfdisk', '-q', '-L', vm.storage.root_img],
|
||||
stdin=subprocess.PIPE,
|
||||
stdout=subprocess.DEVNULL,
|
||||
stderr=subprocess.STDOUT)
|
||||
stdin=subprocess.PIPE,
|
||||
stdout=subprocess.DEVNULL,
|
||||
stderr=subprocess.STDOUT)
|
||||
p.communicate('2048,\n')
|
||||
assert p.returncode == 0, 'sfdisk failed'
|
||||
# TODO: check if root_img is really file, not already block device
|
||||
p = subprocess.Popen(['sudo', 'losetup', '-f', '-P', '--show',
|
||||
vm.storage.root_img], stdout=subprocess.PIPE)
|
||||
vm.storage.root_img], stdout=subprocess.PIPE)
|
||||
(loopdev, _) = p.communicate()
|
||||
loopdev = loopdev.strip()
|
||||
looppart = loopdev + 'p1'
|
||||
@ -1181,11 +1187,11 @@ class SystemTestCase(QubesTestCase):
|
||||
subprocess.check_call(['sudo', 'mount', looppart, mountpoint])
|
||||
try:
|
||||
subprocess.check_call(['sudo', 'grub2-install',
|
||||
'--target', 'i386-pc',
|
||||
'--modules', 'part_msdos ext2',
|
||||
'--boot-directory', mountpoint, loopdev],
|
||||
stderr=subprocess.DEVNULL
|
||||
)
|
||||
'--target', 'i386-pc',
|
||||
'--modules', 'part_msdos ext2',
|
||||
'--boot-directory', mountpoint, loopdev],
|
||||
stderr=subprocess.DEVNULL
|
||||
)
|
||||
grub_cfg = '{}/grub2/grub.cfg'.format(mountpoint)
|
||||
subprocess.check_call(
|
||||
['sudo', 'chown', '-R', os.getlogin(), mountpoint])
|
||||
@ -1218,7 +1224,7 @@ class SystemTestCase(QubesTestCase):
|
||||
dracut_args += ['--install', ' '.join(extra_files)]
|
||||
subprocess.check_call(
|
||||
['dracut'] + dracut_args + [os.path.join(mountpoint,
|
||||
'initrd')],
|
||||
'initrd')],
|
||||
stderr=subprocess.DEVNULL
|
||||
)
|
||||
finally:
|
||||
@ -1227,9 +1233,9 @@ class SystemTestCase(QubesTestCase):
|
||||
subprocess.check_call(['sudo', 'losetup', '-d', loopdev])
|
||||
|
||||
def create_bootable_iso(self):
|
||||
'''Create simple bootable ISO image.
|
||||
"""Create simple bootable ISO image.
|
||||
Type 'poweroff' to it to terminate that VM.
|
||||
'''
|
||||
"""
|
||||
isolinux_cfg = (
|
||||
'prompt 1\n'
|
||||
'label poweroff\n'
|
||||
@ -1244,13 +1250,13 @@ class SystemTestCase(QubesTestCase):
|
||||
with open(os.path.join(tmp_dir, 'isolinux.cfg'), 'w') as cfg:
|
||||
cfg.write(isolinux_cfg)
|
||||
subprocess.check_call(['genisoimage', '-o', output_path,
|
||||
'-c', 'boot.cat',
|
||||
'-b', 'isolinux.bin',
|
||||
'-no-emul-boot',
|
||||
'-boot-load-size', '4',
|
||||
'-boot-info-table',
|
||||
'-q',
|
||||
tmp_dir])
|
||||
'-c', 'boot.cat',
|
||||
'-b', 'isolinux.bin',
|
||||
'-no-emul-boot',
|
||||
'-boot-load-size', '4',
|
||||
'-boot-info-table',
|
||||
'-q',
|
||||
tmp_dir])
|
||||
except FileNotFoundError:
|
||||
self.skipTest('syslinux or genisoimage not installed')
|
||||
os.close(output_fd)
|
||||
@ -1281,8 +1287,10 @@ class SystemTestCase(QubesTestCase):
|
||||
|
||||
|
||||
_templates = None
|
||||
|
||||
|
||||
def list_templates():
|
||||
'''Returns tuple of template names available in the system.'''
|
||||
"""Returns tuple of template names available in the system."""
|
||||
global _templates
|
||||
if _templates is None:
|
||||
if 'QUBES_TEST_TEMPLATES' in os.environ:
|
||||
@ -1291,16 +1299,18 @@ def list_templates():
|
||||
try:
|
||||
app = qubes.Qubes()
|
||||
_templates = tuple(vm.name for vm in app.domains
|
||||
if isinstance(vm, qubes.vm.templatevm.TemplateVM) and
|
||||
vm.features.get('os', None) != 'Windows')
|
||||
if isinstance(vm,
|
||||
qubes.vm.templatevm.TemplateVM) and
|
||||
vm.features.get('os', None) != 'Windows')
|
||||
app.close()
|
||||
del app
|
||||
except OSError:
|
||||
_templates = ()
|
||||
return _templates
|
||||
|
||||
|
||||
def create_testcases_for_templates(name, *bases, module, **kwds):
|
||||
'''Do-it-all helper for generating per-template tests via load_tests proto
|
||||
"""Do-it-all helper for generating per-template tests via load_tests proto
|
||||
|
||||
This does several things:
|
||||
- creates per-template classes
|
||||
@ -1326,7 +1336,7 @@ def create_testcases_for_templates(name, *bases, module, **kwds):
|
||||
*NOTE* adding ``module=sys.modules[__name__]`` is *mandatory*, and to allow
|
||||
enforcing this, it uses keyword-only argument syntax, which is only in
|
||||
Python 3.
|
||||
'''
|
||||
"""
|
||||
# Do not attempt to grab the module from traceback, since we are actually
|
||||
# a generator and loadTestsFromNames may also be a generator, so it's not
|
||||
# possible to correctly guess frame from stack. Explicit is better than
|
||||
@ -1342,8 +1352,9 @@ def create_testcases_for_templates(name, *bases, module, **kwds):
|
||||
setattr(module, clsname, cls)
|
||||
yield '.'.join((module.__name__, clsname))
|
||||
|
||||
|
||||
def maybe_create_testcases_on_import(create_testcases_gen):
|
||||
'''If certain conditions are met, call *create_testcases_gen* to create
|
||||
"""If certain conditions are met, call *create_testcases_gen* to create
|
||||
testcases for templates tests. The purpose is to use it on integration
|
||||
tests module(s) import, so the test runner could discover tests without
|
||||
using load tests protocol.
|
||||
@ -1352,17 +1363,18 @@ def maybe_create_testcases_on_import(create_testcases_gen):
|
||||
- QUBES_TEST_TEMPLATES present in the environment (it's possible to
|
||||
create test cases without opening qubes.xml)
|
||||
- QUBES_TEST_LOAD_ALL present in the environment
|
||||
'''
|
||||
"""
|
||||
if 'QUBES_TEST_TEMPLATES' in os.environ or \
|
||||
'QUBES_TEST_LOAD_ALL' in os.environ:
|
||||
list(create_testcases_gen())
|
||||
|
||||
|
||||
def extra_info(obj):
|
||||
'''Return short info identifying object.
|
||||
"""Return short info identifying object.
|
||||
|
||||
For example, if obj is a qube, return its name. This is for use with
|
||||
:py:mod:`objgraph` package.
|
||||
'''
|
||||
"""
|
||||
# Feel free to extend to other cases.
|
||||
|
||||
if isinstance(obj, qubes.vm.qubesvm.QubesVM):
|
||||
@ -1375,7 +1387,8 @@ def extra_info(obj):
|
||||
|
||||
return ''
|
||||
|
||||
def load_tests(loader, tests, pattern): # pylint: disable=unused-argument
|
||||
|
||||
def load_tests(loader, tests, pattern): # pylint: disable=unused-argument
|
||||
# discard any tests from this module, because it hosts base classes
|
||||
tests = unittest.TestSuite()
|
||||
|
||||
@ -1405,7 +1418,7 @@ def load_tests(loader, tests, pattern): # pylint: disable=unused-argument
|
||||
'qubes.tests.api_misc',
|
||||
'qubes.tests.api_internal',
|
||||
'qubes.tests.rpc_import',
|
||||
):
|
||||
):
|
||||
tests.addTests(loader.loadTestsFromName(modname))
|
||||
|
||||
tests.addTests(loader.discover(
|
||||
@ -1436,7 +1449,7 @@ def load_tests(loader, tests, pattern): # pylint: disable=unused-argument
|
||||
|
||||
# external modules
|
||||
'qubes.tests.extra',
|
||||
):
|
||||
):
|
||||
tests.addTests(loader.loadTestsFromName(modname))
|
||||
|
||||
return tests
|
||||
|
Loading…
Reference in New Issue
Block a user