Merge branch 'devel-fdleaks'

This commit is contained in:
Wojtek Porczyk 2017-09-01 22:17:09 +02:00
commit a3f542edcd
20 changed files with 250 additions and 171 deletions

View File

@ -689,6 +689,21 @@ class PropertyHolder(qubes.events.Emitter):
# pylint: disable=no-member
self.log.fatal(msg)
def close(self):
super().close()
# Remove all properties -- somewhere in them there are cyclic
# references. This just removes all the properties, just in case.
# They are removed directly, bypassing write_once.
for prop in self.property_list():
# pylint: disable=protected-access
try:
delattr(self, prop._attr_name)
except AttributeError:
pass
# pylint: disable=wrong-import-position
from qubes.vm import VMProperty
from qubes.app import Qubes

View File

@ -32,6 +32,7 @@ import subprocess
import sys
import tempfile
import time
import traceback
import uuid
import lxml.etree
@ -144,14 +145,15 @@ class VMMConnection(object):
:param offline_mode: enable/disable offline mode; default is to
enable when running in chroot as root, otherwise disable
'''
self._libvirt_conn = None
self._xs = None
self._xc = None
if offline_mode is None:
offline_mode = bool(os.getuid() == 0 and
os.stat('/') != os.stat('/proc/1/root/.'))
self._offline_mode = offline_mode
self._libvirt_conn = None
self._xs = None
self._xc = None
@property
def offline_mode(self):
'''Check or enable offline mode (do not actually connect to vmm)'''
@ -218,36 +220,15 @@ class VMMConnection(object):
self.init_vmm_connection()
return self._xc
def register_event_handlers(self, app):
'''Register libvirt event handlers, which will translate libvirt
events into qubes.events. This function should be called only in
'qubesd' process and only when mainloop has been already set.
'''
self.libvirt_conn.domainEventRegisterAny(
None, # any domain
libvirt.VIR_DOMAIN_EVENT_ID_LIFECYCLE,
self._domain_event_callback,
app
)
@staticmethod
def _domain_event_callback(_conn, domain, event, _detail, opaque):
'''Generic libvirt event handler (virConnectDomainEventCallback),
translate libvirt event into qubes.events.
'''
app = opaque
try:
vm = app.domains[domain.name()]
except KeyError:
# ignore events for unknown domains
return
if event == libvirt.VIR_DOMAIN_EVENT_STOPPED:
vm.fire_event('domain-shutdown')
def __del__(self):
def close(self):
libvirt.registerErrorHandler(None, None)
if self._xs:
self._xs.close()
self._xs = None
if self._libvirt_conn:
self._libvirt_conn.close()
self._libvirt_conn = None
self._xc = None # and pray it will get garbage-collected
class QubesHost(object):
@ -398,6 +379,12 @@ class VMCollection(object):
self._dict = dict()
def close(self):
del self.app
self._dict.clear()
del self._dict
def __repr__(self):
return '<{} {!r}>'.format(
self.__class__.__name__, list(sorted(self.keys())))
@ -729,6 +716,10 @@ class Qubes(qubes.PropertyHolder):
**kwargs):
#: logger instance for logging global messages
self.log = logging.getLogger('app')
self.log.debug('init() -> %#x', id(self))
self.log.debug('stack:')
for frame in traceback.extract_stack():
self.log.debug('%s', frame)
self._extensions = qubes.ext.get_extensions()
@ -759,6 +750,7 @@ class Qubes(qubes.PropertyHolder):
self.__load_timestamp = None
self.__locked_fh = None
self._domain_event_callback_id = None
#: jinja2 environment for libvirt XML templates
self.env = jinja2.Environment(
@ -910,6 +902,40 @@ class Qubes(qubes.PropertyHolder):
self._release_lock()
def close(self):
'''Deconstruct the object and break circular references
After calling this the object is unusable, not even for saving.'''
self.log.debug('close() <- %#x', id(self))
for frame in traceback.extract_stack():
self.log.debug('%s', frame)
super().close()
if self._domain_event_callback_id is not None:
self.vmm.libvirt_conn.domainEventDeregisterAny(
self._domain_event_callback_id)
self._domain_event_callback_id = None
# Only our Lord, The God Almighty, knows what references
# are kept in extensions.
del self._extensions
for vm in self.domains:
vm.close()
self.domains.close()
del self.domains
self.vmm.close()
del self.vmm
del self.host
if self.__locked_fh:
self._release_lock()
def _acquire_lock(self, for_save=False):
assert self.__locked_fh is None, 'double lock'
@ -1129,6 +1155,34 @@ class Qubes(qubes.PropertyHolder):
raise qubes.exc.QubesException('No driver %s for pool %s' %
(driver, name))
def register_event_handlers(self):
'''Register libvirt event handlers, which will translate libvirt
events into qubes.events. This function should be called only in
'qubesd' process and only when mainloop has been already set.
'''
self._domain_event_callback_id = (
self.vmm.libvirt_conn.domainEventRegisterAny(
None, # any domain
libvirt.VIR_DOMAIN_EVENT_ID_LIFECYCLE,
self._domain_event_callback,
None))
def _domain_event_callback(self, _conn, domain, event, _detail, _opaque):
'''Generic libvirt event handler (virConnectDomainEventCallback),
translate libvirt event into qubes.events.
'''
if not self.events_enabled:
return
try:
vm = self.domains[domain.name()]
except KeyError:
# ignore events for unknown domains
return
if event == libvirt.VIR_DOMAIN_EVENT_STOPPED:
vm.fire_event('domain-shutdown')
@qubes.events.handler('domain-pre-delete')
def on_domain_pre_deleted(self, event, vm):
# pylint: disable=unused-argument

View File

@ -106,6 +106,8 @@ class Emitter(object, metaclass=EmitterMeta):
self.events_enabled = False
self.__handlers__ = collections.defaultdict(set)
def close(self):
self.events_enabled = False
def add_handler(self, event, func):
'''Add event handler to subject's class.

View File

@ -52,6 +52,7 @@ import gc
import lxml.etree
import pkg_resources
import qubes
import qubes.api
import qubes.api.admin
import qubes.api.internal
@ -61,6 +62,7 @@ import qubes.devices
import qubes.events
import qubes.exc
import qubes.vm.standalonevm
import qubes.vm.templatevm
XMLPATH = '/var/lib/qubes/qubes-test.xml'
CLASS_XMLPATH = '/var/lib/qubes/qubes-class-test.xml'
@ -109,6 +111,7 @@ except OSError:
# command not found; let's assume we're outside
pass
def skipUnlessDom0(test_item):
'''Decorator that skips test outside dom0.
@ -118,7 +121,6 @@ def skipUnlessDom0(test_item):
return unittest.skipUnless(in_dom0, 'outside dom0')(test_item)
def skipUnlessGit(test_item):
'''Decorator that skips test outside git repo.
@ -128,6 +130,16 @@ def skipUnlessGit(test_item):
return unittest.skipUnless(in_git, 'outside git tree')(test_item)
def skipUnlessEnv(varname):
'''Decorator generator for skipping tests without environment variable set.
Some tests require working X11 display, like those using GTK library, which
segfaults without connection to X.
Other require their own, custom variables.
'''
return unittest.skipUnless(os.getenv(varname), 'no {} set'.format(varname))
class TestEmitter(qubes.events.Emitter):
'''Dummy event emitter which records events fired on it.
@ -333,12 +345,6 @@ class substitute_entry_points(object):
self._orig_iter_entry_points = None
class BeforeCleanExit(BaseException):
'''Raised from :py:meth:`QubesTestCase.tearDown` when
:py:attr:`qubes.tests.run.QubesDNCTestResult.do_not_clean` is set.'''
pass
class QubesTestCase(unittest.TestCase):
'''Base class for Qubes unit tests.
'''
@ -367,27 +373,15 @@ class QubesTestCase(unittest.TestCase):
super().setUp()
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
self.addCleanup(self.cleanup_loop)
def tearDown(self):
def cleanup_loop(self):
# The loop, when closing, throws a warning if there is
# some unfinished bussiness. Let's catch that.
with warnings.catch_warnings():
warnings.simplefilter('error')
self.loop.close()
# TODO: find better way in py3
try:
result = self._outcome.result
except:
result = self._resultForDoCleanups
failed_test_cases = result.failures \
+ result.errors \
+ [(tc, None) for tc in result.unexpectedSuccesses]
if getattr(result, 'do_not_clean', False) \
and any(tc is self for tc, exc in failed_test_cases):
raise BeforeCleanExit()
del self.loop
def assertNotRaises(self, excClass, callableObj=None, *args, **kwargs):
"""Fail if an exception of class excClass is raised
@ -593,7 +587,8 @@ class SystemTestCase(QubesTestCase):
if not in_dom0:
self.skipTest('outside dom0')
super(SystemTestCase, self).setUp()
libvirtaio.virEventRegisterAsyncIOImpl(loop=self.loop)
self.libvirt_event_impl = libvirtaio.virEventRegisterAsyncIOImpl(
loop=self.loop)
self.remove_test_vms()
# need some information from the real qubes.xml - at least installed
@ -607,7 +602,7 @@ class SystemTestCase(QubesTestCase):
shutil.copy(self.host_app.store, XMLPATH)
self.app = qubes.Qubes(XMLPATH)
os.environ['QUBES_XML_PATH'] = XMLPATH
self.app.vmm.register_event_handlers(self.app)
self.app.register_event_handlers()
self.qubesd = self.loop.run_until_complete(
qubes.api.create_servers(
@ -615,6 +610,55 @@ class SystemTestCase(QubesTestCase):
qubes.api.internal.QubesInternalAPI,
app=self.app, debug=True))
self.addCleanup(self.cleanup_app)
def cleanup_app(self):
self.remove_test_vms()
server = None
for server in self.qubesd:
for sock in server.sockets:
os.unlink(sock.getsockname())
server.close()
del server
# close all existing connections, especially this will interrupt
# running admin.Events calls, which do keep reference to Qubes() and
# libvirt connection
conn = None
for conn in qubes.api.QubesDaemonProtocol.connections:
if conn.transport:
conn.transport.abort()
del conn
self.loop.run_until_complete(asyncio.wait([
server.wait_closed() for server in self.qubesd]))
del self.qubesd
# remove all references to any complex qubes objects, to release
# resources - most importantly file descriptors; this object will live
# during the whole test run, but all the file descriptors would be
# depleted earlier
self.app.close()
self.host_app.close()
del self.app
del self.host_app
for attr in dir(self):
obj_type = type(getattr(self, attr))
if obj_type.__module__.startswith('qubes'):
delattr(self, attr)
# then trigger garbage collector to really destroy those objects
gc.collect()
self.loop.run_until_complete(self.libvirt_event_impl.drain())
if not self.libvirt_event_impl.is_idle():
self.log.warning(
'libvirt event impl not clean: callbacks %r descriptors %r',
self.libvirt_event_impl.callbacks,
self.libvirt_event_impl.descriptors)
def init_default_template(self, template=None):
if template is None:
template = self.host_app.default_template
@ -663,47 +707,6 @@ class SystemTestCase(QubesTestCase):
self.pool = self.app.add_pool(**POOL_CONF)
self.created_pool = True
def tearDown(self):
self.remove_test_vms()
# close the servers before super(), because that might close the loop
server = None
for server in self.qubesd:
for sock in server.sockets:
os.unlink(sock.getsockname())
server.close()
del server
# close all existing connections, especially this will interrupt
# running admin.Events calls, which do keep reference to Qubes() and
# libvirt connection
conn = None
for conn in qubes.api.QubesDaemonProtocol.connections:
if conn.transport:
conn.transport.abort()
del conn
self.loop.run_until_complete(asyncio.wait([
server.wait_closed() for server in self.qubesd]))
del self.qubesd
# remove all references to any complex qubes objects, to release
# resources - most importantly file descriptors; this object will live
# during the whole test run, but all the file descriptors would be
# depleted earlier
self.app.vmm._libvirt_conn = None
del self.app
del self.host_app
for attr in dir(self):
obj_type = type(getattr(self, attr))
if obj_type.__module__.startswith('qubes'):
delattr(self, attr)
# then trigger garbage collector to really destroy those objects
gc.collect()
super(SystemTestCase, self).tearDown()
def _remove_vm_qubes(self, vm):
vmname = vm.name
app = vm.app
@ -982,6 +985,22 @@ class SystemTestCase(QubesTestCase):
timeout=30)
_templates = None
def list_templates():
'''Returns tuple of template names available in the system.'''
global _templates
if _templates is None:
try:
app = qubes.Qubes()
_templates = tuple(vm.name for vm in app.domains
if isinstance(vm, qubes.vm.templatevm.TemplateVM))
app.close()
del app
except OSError:
_templates = ()
return _templates
def load_tests(loader, tests, pattern): # pylint: disable=unused-argument
# discard any tests from this module, because it hosts base classes
tests = unittest.TestSuite()

View File

@ -275,7 +275,7 @@ class TC_90_Qubes(qubes.tests.QubesTestCase):
os.unlink('/tmp/qubestest.xml')
except FileNotFoundError:
pass
qubes.Qubes.create_empty_store('/tmp/qubestest.xml')
qubes.Qubes.create_empty_store('/tmp/qubestest.xml').close()
def test_100_clockvm(self):
app = qubes.Qubes('/tmp/qubestest.xml', load=False, offline_mode=True)
@ -295,6 +295,7 @@ class TC_90_Qubes(qubes.tests.QubesTestCase):
self.assertNotIn('service.clocksync', appvm.features)
self.assertIn('service.clocksync', template.features)
self.assertTrue(template.features['service.clocksync'])
app.close()
@qubes.tests.skipUnlessGit
def test_900_example_xml_in_doc(self):

View File

@ -23,7 +23,6 @@ import sys
import pkg_resources
import qubes.tests
import qubes.vm.appvm
import qubes.vm.templatevm
class ExtraTestCase(qubes.tests.SystemTestCase):
@ -80,18 +79,11 @@ def load_tests(loader, tests, pattern):
{entry.name: runTest})
tests.addTest(ExtraLoadFailure(entry.name))
try:
app = qubes.Qubes()
templates = [vm.name for vm in app.domains if
isinstance(vm, qubes.vm.templatevm.TemplateVM)]
except OSError:
templates = []
for entry in pkg_resources.iter_entry_points(
'qubes.tests.extra.for_template'):
try:
for test_case in entry.load()():
for template in templates:
for template in qubes.tests.list_templates():
tests.addTests(loader.loadTestsFromTestCase(
type(
'{}_{}_{}'.format(

View File

@ -541,13 +541,7 @@ class TC_10_BackupVMMixin(BackupTestsMixin):
def load_tests(loader, tests, pattern):
try:
app = qubes.Qubes()
templates = [vm.name for vm in app.domains if
isinstance(vm, qubes.vm.templatevm.TemplateVM)]
except OSError:
templates = []
for template in templates:
for template in qubes.tests.list_templates():
tests.addTests(loader.loadTestsFromTestCase(
type(
'TC_10_BackupVM_' + template,

View File

@ -31,14 +31,12 @@ import qubes.ext.pci
import qubes.tests
@qubes.tests.skipUnlessEnv('QUBES_TEST_PCIDEV')
class TC_00_Devices_PCI(qubes.tests.SystemTestCase):
def setUp(self):
super(TC_00_Devices_PCI, self).setUp()
if self._testMethodName not in ['test_000_list']:
pcidev = os.environ.get('QUBES_TEST_PCIDEV', None)
if pcidev is None:
self.skipTest('Specify PCI device with QUBES_TEST_PCIDEV '
'environment variable')
pcidev = os.environ['QUBES_TEST_PCIDEV']
self.dev = self.app.domains[0].devices['pci'][pcidev]
self.assignment = qubes.devices.DeviceAssignment(backend_domain=self.dev.backend_domain, ident=self.dev.ident, persistent=True)
if isinstance(self.dev, qubes.devices.UnknownDevice):

View File

@ -19,13 +19,15 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
from distutils import spawn
import qubes.tests
import os
import subprocess
import tempfile
import unittest
import os
import time
import unittest
from distutils import spawn
import qubes.tests
class TC_04_DispVM(qubes.tests.SystemTestCase):
@ -254,13 +256,7 @@ class TC_20_DispVMMixin(object):
self.assertEqual(test_txt_content, b"Test test 2\ntest1\n")
def load_tests(loader, tests, pattern):
try:
app = qubes.Qubes()
templates = [vm.name for vm in app.domains if
isinstance(vm, qubes.vm.templatevm.TemplateVM)]
except OSError:
templates = []
for template in templates:
for template in qubes.tests.list_templates():
tests.addTests(loader.loadTestsFromTestCase(
type(
'TC_20_DispVM_' + template,

View File

@ -26,6 +26,7 @@ import tempfile
import unittest
import qubes
import qubes.tests
VM_PREFIX = "test-"
@ -359,13 +360,7 @@ Test package
def load_tests(loader, tests, pattern):
try:
app = qubes.Qubes()
templates = [vm.name for vm in app.domains if
isinstance(vm, qubes.vm.templatevm.TemplateVM)]
except OSError:
templates = []
for template in templates:
for template in qubes.tests.list_templates():
tests.addTests(loader.loadTestsFromTestCase(
type(
'TC_00_Dom0Upgrade_' + template,

View File

@ -934,13 +934,7 @@ SHA256:
'{}: {}\n{}'.format(self.update_cmd, stdout, stderr))
def load_tests(loader, tests, pattern):
try:
app = qubes.Qubes()
templates = [vm.name for vm in app.domains if
isinstance(vm, qubes.vm.templatevm.TemplateVM)]
except OSError:
templates = []
for template in templates:
for template in qubes.tests.list_templates():
tests.addTests(loader.loadTestsFromTestCase(
type(
'VmNetworking_' + template,

View File

@ -25,6 +25,7 @@ import multiprocessing
import os
import subprocess
import unittest
from distutils import spawn
import qubes.config
@ -1001,13 +1002,7 @@ class TC_10_Generic(qubes.tests.SystemTestCase):
def load_tests(loader, tests, pattern):
try:
app = qubes.Qubes()
templates = [vm.name for vm in app.domains if
isinstance(vm, qubes.vm.templatevm.TemplateVM)]
except OSError:
templates = []
for template in templates:
for template in qubes.tests.list_templates():
tests.addTests(loader.loadTestsFromTestCase(
type(
'TC_00_AppVM_' + template,

View File

@ -20,6 +20,7 @@
#
import argparse
import code
import curses
import itertools
import logging
@ -230,10 +231,6 @@ class QubesTestResult(unittest.TestResult):
self.stream.writeln('%s' % err)
class QubesDNCTestResult(QubesTestResult):
do_not_clean = True
def demo(verbosity=2):
class TC_00_Demo(qubes.tests.QubesTestCase):
'''Demo class'''
@ -292,13 +289,6 @@ parser.add_argument('--no-failfast',
action='store_false', dest='failfast',
help='disable --failfast')
parser.add_argument('--do-not-clean', '--dnc', '-D',
action='store_true', dest='do_not_clean',
help='do not execute tearDown on failed tests. Implies --failfast.')
parser.add_argument('--do-clean', '-C',
action='store_false', dest='do_not_clean',
help='do execute tearDown even on failed tests.')
# pylint: disable=protected-access
try:
name_to_level = logging._nameToLevel
@ -337,6 +327,10 @@ parser.add_argument('--allow-running-along-qubesd',
help='allow running in parallel with qubesd;'
' this is DANGEROUS and WILL RESULT IN INCONSISTENT SYSTEM STATE')
parser.add_argument('--break-to-repl',
action='store_true', default=False,
help='break to REPL after tests')
parser.add_argument('names', metavar='TESTNAME',
action='store', nargs='*',
help='list of tests to run named like in description '
@ -362,8 +356,8 @@ def list_test_cases(suite):
yield test
def main():
args = parser.parse_args()
def main(args=None):
args = parser.parse_args(args)
suite = unittest.TestSuite()
loader = unittest.TestLoader()
@ -382,9 +376,6 @@ def main():
print(str(test)) # pylint: disable=superfluous-parens
return True
if args.do_not_clean:
args.failfast = True
logging.root.setLevel(args.loglevel)
if args.logfile is not None:
@ -420,11 +411,13 @@ def main():
verbosity=(args.verbose-args.quiet),
failfast=args.failfast)
unittest.signals.installHandler()
runner.resultclass = QubesTestResult
result = runner.run(suite)
runner.resultclass = QubesDNCTestResult \
if args.do_not_clean else QubesTestResult
if args.break_to_repl:
code.interact(local=locals())
return runner.run(suite).wasSuccessful()
return result.wasSuccessful()
if __name__ == '__main__':

View File

@ -92,6 +92,7 @@ class TC_00_Pool(SystemTestCase):
def setUp(self):
super(TC_00_Pool, self).setUp()
self.app.close()
self.app = TestApp()
def test_000_unknown_pool_driver(self):

View File

@ -35,7 +35,7 @@ def main(args=None):
loop.close()
raise
args.app.vmm.register_event_handlers(args.app)
args.app.register_event_handlers()
if args.debug:
qubes.log.enable_debug()

View File

@ -297,6 +297,22 @@ class BaseVM(qubes.PropertyHolder):
if hasattr(self, 'name'):
self.init_log()
def close(self):
super().close()
if self._qdb_connection_watch is not None:
asyncio.get_event_loop().remove_reader(
self._qdb_connection_watch.watch_fd())
self._qdb_connection_watch.close()
del self._qdb_connection_watch
del self.app
del self.features
del self.storage
# TODO storage may have circ references, but it doesn't leak fds
del self.devices
del self.tags
def load_extras(self):
# features
for node in self.xml.xpath('./features/feature'):

View File

@ -706,6 +706,12 @@ class QubesVM(qubes.vm.mix.net.NetVMMixin, qubes.vm.BaseVM):
self.events_enabled = True
self.fire_event('domain-init')
def close(self):
if self._qdb_connection is not None:
self._qdb_connection.close()
self._qdb_connection = None
super().close()
def __hash__(self):
return self.qid

View File

@ -69,8 +69,9 @@ def main(args=None):
caller_ident = args.process_ident + "," + args.domain + "," + args.domain_id
log = logging.getLogger('qubespolicy')
log.setLevel(logging.INFO)
handler = logging.handlers.SysLogHandler(address='/dev/log')
log.addHandler(handler)
if not log.handlers:
handler = logging.handlers.SysLogHandler(address='/dev/log')
log.addHandler(handler)
log_prefix = 'qrexec: {}: {} -> {}: '.format(
args.service_name, args.domain, args.target)
try:

View File

@ -26,6 +26,8 @@ import gi # isort:skip
gi.require_version('Gtk', '3.0') # isort:skip
from gi.repository import Gtk # isort:skip pylint:
from qubes.tests import skipUnlessEnv
from qubespolicy.gtkhelpers import VMListModeler, GtkOneTimerHelper, \
FocusStealingHelper
@ -81,6 +83,7 @@ class GtkTestCase(unittest.TestCase):
return iterations, time_length
@skipUnlessEnv('DISPLAY')
class VMListModelerTest(VMListModeler, unittest.TestCase):
def __init__(self, *args, **kwargs):
unittest.TestCase.__init__(self, *args, **kwargs)
@ -305,6 +308,7 @@ class FocusStealingHelperMock(FocusStealingHelper):
self._window_changed_focus(True)
@skipUnlessEnv('DISPLAY')
class FocusStealingHelperTest(FocusStealingHelperMock, GtkTestCase):
def __init__(self, *args, **kwargs):
GtkTestCase.__init__(self, *args, **kwargs)

View File

@ -14,6 +14,9 @@ class libvirtError(Exception):
def openReadOnly(*args, **kwargs):
raise libvirtError('mock module, always raises')
def registerErrorHandler(f, ctx):
pass
VIR_DOMAIN_BLOCKED = 2
VIR_DOMAIN_RUNNING = 1
VIR_DOMAIN_PAUSED = 3