Merge remote-tracking branch 'qubesos/pr/150'
* qubesos/pr/150: qubes/tests: moar fixes test-packages: add missing libvirt classes qubes/tests: do not deadlock on .drain() qubes/vm: put name= first in __repr__ tests: fix some memory leaks tests: complain about memory leaks tests: use one event loop and one libvirtaio impl
This commit is contained in:
commit
e0cadfdd67
@ -98,6 +98,9 @@ except libvirt.libvirtError:
|
|||||||
|
|
||||||
if in_dom0:
|
if in_dom0:
|
||||||
import libvirtaio
|
import libvirtaio
|
||||||
|
libvirt_event_impl = libvirtaio.virEventRegisterAsyncIOImpl()
|
||||||
|
else:
|
||||||
|
libvirt_event_impl = None
|
||||||
|
|
||||||
try:
|
try:
|
||||||
in_git = subprocess.check_output(
|
in_git = subprocess.check_output(
|
||||||
@ -371,16 +374,59 @@ class QubesTestCase(unittest.TestCase):
|
|||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
super().setUp()
|
super().setUp()
|
||||||
self.loop = asyncio.new_event_loop()
|
self.addCleanup(self.cleanup_gc)
|
||||||
asyncio.set_event_loop(self.loop)
|
|
||||||
|
self.loop = asyncio.get_event_loop()
|
||||||
self.addCleanup(self.cleanup_loop)
|
self.addCleanup(self.cleanup_loop)
|
||||||
|
|
||||||
|
def cleanup_gc(self):
|
||||||
|
gc.collect()
|
||||||
|
leaked = [obj for obj in gc.get_objects() + gc.garbage
|
||||||
|
if isinstance(obj,
|
||||||
|
(qubes.Qubes, qubes.vm.BaseVM,
|
||||||
|
libvirt.virConnect, libvirt.virDomain))]
|
||||||
|
|
||||||
|
if leaked:
|
||||||
|
try:
|
||||||
|
import objgraph
|
||||||
|
objgraph.show_backrefs(leaked,
|
||||||
|
max_depth=15, extra_info=extra_info,
|
||||||
|
filename='/tmp/objgraph-{}.png'.format(self.id()))
|
||||||
|
except ImportError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
assert not leaked
|
||||||
|
|
||||||
def cleanup_loop(self):
|
def cleanup_loop(self):
|
||||||
# The loop, when closing, throws a warning if there is
|
'''Check if the loop is empty'''
|
||||||
# some unfinished bussiness. Let's catch that.
|
# XXX BEWARE this is touching undocumented, implementation-specific
|
||||||
with warnings.catch_warnings():
|
# attributes of the loop. This is most certainly unsupported and likely
|
||||||
warnings.simplefilter('error')
|
# will break when messing with: Python version, kernel family, loop
|
||||||
self.loop.close()
|
# implementation, a combination thereof, or other things.
|
||||||
|
# KEYWORDS for searching:
|
||||||
|
# win32, SelectorEventLoop, ProactorEventLoop, uvloop, gevent
|
||||||
|
|
||||||
|
global libvirt_event_impl
|
||||||
|
|
||||||
|
# Check for unfinished libvirt business.
|
||||||
|
if libvirt_event_impl is not None:
|
||||||
|
try:
|
||||||
|
self.loop.run_until_complete(asyncio.wait_for(
|
||||||
|
libvirt_event_impl.drain(), timeout=4))
|
||||||
|
except asyncio.TimeoutError:
|
||||||
|
raise AssertionError('libvirt event impl drain timeout')
|
||||||
|
|
||||||
|
# Check there are no Tasks left.
|
||||||
|
assert not self.loop._ready
|
||||||
|
assert not self.loop._scheduled
|
||||||
|
|
||||||
|
# Check the loop watches no descriptors.
|
||||||
|
# NOTE the loop has a pipe for self-interrupting, created once per
|
||||||
|
# lifecycle, and it is unwatched only at loop.close(); so we cannot just
|
||||||
|
# check selector for non-emptiness
|
||||||
|
assert len(self.loop._selector.get_map()) \
|
||||||
|
== int(self.loop._ssock is not None)
|
||||||
|
|
||||||
del self.loop
|
del self.loop
|
||||||
|
|
||||||
def assertNotRaises(self, excClass, callableObj=None, *args, **kwargs):
|
def assertNotRaises(self, excClass, callableObj=None, *args, **kwargs):
|
||||||
@ -587,8 +633,6 @@ class SystemTestCase(QubesTestCase):
|
|||||||
if not in_dom0:
|
if not in_dom0:
|
||||||
self.skipTest('outside dom0')
|
self.skipTest('outside dom0')
|
||||||
super(SystemTestCase, self).setUp()
|
super(SystemTestCase, self).setUp()
|
||||||
self.libvirt_event_impl = libvirtaio.virEventRegisterAsyncIOImpl(
|
|
||||||
loop=self.loop)
|
|
||||||
self.remove_test_vms()
|
self.remove_test_vms()
|
||||||
|
|
||||||
# need some information from the real qubes.xml - at least installed
|
# need some information from the real qubes.xml - at least installed
|
||||||
@ -652,13 +696,6 @@ class SystemTestCase(QubesTestCase):
|
|||||||
# then trigger garbage collector to really destroy those objects
|
# then trigger garbage collector to really destroy those objects
|
||||||
gc.collect()
|
gc.collect()
|
||||||
|
|
||||||
self.loop.run_until_complete(self.libvirt_event_impl.drain())
|
|
||||||
if not self.libvirt_event_impl.is_idle():
|
|
||||||
self.log.warning(
|
|
||||||
'libvirt event impl not clean: callbacks %r descriptors %r',
|
|
||||||
self.libvirt_event_impl.callbacks,
|
|
||||||
self.libvirt_event_impl.descriptors)
|
|
||||||
|
|
||||||
def init_default_template(self, template=None):
|
def init_default_template(self, template=None):
|
||||||
if template is None:
|
if template is None:
|
||||||
template = self.host_app.default_template
|
template = self.host_app.default_template
|
||||||
@ -1000,6 +1037,23 @@ def list_templates():
|
|||||||
_templates = ()
|
_templates = ()
|
||||||
return _templates
|
return _templates
|
||||||
|
|
||||||
|
def extra_info(obj):
|
||||||
|
'''Return short info identifying object.
|
||||||
|
|
||||||
|
For example, if obj is a qube, return its name. This is for use with
|
||||||
|
:py:mod:`objgraph` package.
|
||||||
|
'''
|
||||||
|
# Feel free to extend to other cases.
|
||||||
|
|
||||||
|
if isinstance(obj, qubes.vm.qubesvm.QubesVM):
|
||||||
|
try:
|
||||||
|
return obj.name
|
||||||
|
except AttributeError:
|
||||||
|
pass
|
||||||
|
if isinstance(obj, unittest.TestCase):
|
||||||
|
return obj.id()
|
||||||
|
|
||||||
|
return ''
|
||||||
|
|
||||||
def load_tests(loader, tests, pattern): # pylint: disable=unused-argument
|
def load_tests(loader, tests, pattern): # pylint: disable=unused-argument
|
||||||
# discard any tests from this module, because it hosts base classes
|
# discard any tests from this module, because it hosts base classes
|
||||||
|
@ -97,8 +97,6 @@ class TC_00_QubesDaemonProtocol(qubes.tests.QubesTestCase):
|
|||||||
super(TC_00_QubesDaemonProtocol, self).setUp()
|
super(TC_00_QubesDaemonProtocol, self).setUp()
|
||||||
self.app = unittest.mock.Mock()
|
self.app = unittest.mock.Mock()
|
||||||
self.app.log = self.log
|
self.app.log = self.log
|
||||||
self.loop = asyncio.new_event_loop()
|
|
||||||
asyncio.set_event_loop(self.loop)
|
|
||||||
self.sock_client, self.sock_server = socket.socketpair()
|
self.sock_client, self.sock_server = socket.socketpair()
|
||||||
self.reader, self.writer = self.loop.run_until_complete(
|
self.reader, self.writer = self.loop.run_until_complete(
|
||||||
asyncio.open_connection(sock=self.sock_client))
|
asyncio.open_connection(sock=self.sock_client))
|
||||||
@ -113,9 +111,6 @@ class TC_00_QubesDaemonProtocol(qubes.tests.QubesTestCase):
|
|||||||
def tearDown(self):
|
def tearDown(self):
|
||||||
self.sock_server.close()
|
self.sock_server.close()
|
||||||
self.sock_client.close()
|
self.sock_client.close()
|
||||||
self.loop.stop()
|
|
||||||
self.loop.run_forever()
|
|
||||||
self.loop.close()
|
|
||||||
super(TC_00_QubesDaemonProtocol, self).tearDown()
|
super(TC_00_QubesDaemonProtocol, self).tearDown()
|
||||||
|
|
||||||
def test_000_message_ok(self):
|
def test_000_message_ok(self):
|
||||||
|
@ -90,6 +90,11 @@ class AdminAPITestCase(qubes.tests.QubesTestCase):
|
|||||||
self.base_dir_patch.stop()
|
self.base_dir_patch.stop()
|
||||||
if os.path.exists(self.test_base_dir):
|
if os.path.exists(self.test_base_dir):
|
||||||
shutil.rmtree(self.test_base_dir)
|
shutil.rmtree(self.test_base_dir)
|
||||||
|
del self.vm
|
||||||
|
del self.template
|
||||||
|
self.app.close()
|
||||||
|
del self.app
|
||||||
|
del self.emitter
|
||||||
super(AdminAPITestCase, self).tearDown()
|
super(AdminAPITestCase, self).tearDown()
|
||||||
|
|
||||||
def call_mgmt_func(self, method, dest, arg=b'', payload=b''):
|
def call_mgmt_func(self, method, dest, arg=b'', payload=b''):
|
||||||
@ -1595,6 +1600,12 @@ class TC_00_VMs(AdminAPITestCase):
|
|||||||
self.vm2.volumes['private'].import_volume.return_value = \
|
self.vm2.volumes['private'].import_volume.return_value = \
|
||||||
self.vm2.volumes['private']
|
self.vm2.volumes['private']
|
||||||
|
|
||||||
|
self.addCleanup(self.cleanup_for_clone)
|
||||||
|
|
||||||
|
def cleanup_for_clone(self):
|
||||||
|
del self.vm2
|
||||||
|
del self.pool
|
||||||
|
|
||||||
def test_520_vm_volume_clone(self):
|
def test_520_vm_volume_clone(self):
|
||||||
self.setup_for_clone()
|
self.setup_for_clone()
|
||||||
token = self.call_mgmt_func(b'admin.vm.volume.CloneFrom',
|
token = self.call_mgmt_func(b'admin.vm.volume.CloneFrom',
|
||||||
|
@ -160,6 +160,17 @@ class TC_30_VMCollection(qubes.tests.QubesTestCase):
|
|||||||
self.testvm2 = qubes.tests.init.TestVM(
|
self.testvm2 = qubes.tests.init.TestVM(
|
||||||
None, None, qid=2, name='testvm2')
|
None, None, qid=2, name='testvm2')
|
||||||
|
|
||||||
|
self.addCleanup(self.cleanup_vmcollection)
|
||||||
|
|
||||||
|
def cleanup_vmcollection(self):
|
||||||
|
self.testvm1.close()
|
||||||
|
self.testvm2.close()
|
||||||
|
self.vms.close()
|
||||||
|
del self.testvm1
|
||||||
|
del self.testvm2
|
||||||
|
del self.vms
|
||||||
|
del self.app
|
||||||
|
|
||||||
def test_000_contains(self):
|
def test_000_contains(self):
|
||||||
self.vms._dict = {1: self.testvm1}
|
self.vms._dict = {1: self.testvm1}
|
||||||
|
|
||||||
|
@ -163,8 +163,6 @@ class TC_00_Emitter(qubes.tests.QubesTestCase):
|
|||||||
emitter.events_enabled = True
|
emitter.events_enabled = True
|
||||||
|
|
||||||
effect = loop.run_until_complete(emitter.fire_event_async('testevent'))
|
effect = loop.run_until_complete(emitter.fire_event_async('testevent'))
|
||||||
loop.close()
|
|
||||||
asyncio.set_event_loop(None)
|
|
||||||
|
|
||||||
self.assertCountEqual(effect,
|
self.assertCountEqual(effect,
|
||||||
('testvalue1', 'testvalue2', 'testvalue3', 'testvalue4'))
|
('testvalue1', 'testvalue2', 'testvalue3', 'testvalue4'))
|
||||||
|
@ -320,6 +320,16 @@ class TC_30_VMCollection(qubes.tests.QubesTestCase):
|
|||||||
|
|
||||||
self.testvm1 = TestVM(None, None, qid=1, name='testvm1')
|
self.testvm1 = TestVM(None, None, qid=1, name='testvm1')
|
||||||
self.testvm2 = TestVM(None, None, qid=2, name='testvm2')
|
self.testvm2 = TestVM(None, None, qid=2, name='testvm2')
|
||||||
|
self.addCleanup(self.cleanup_testvm)
|
||||||
|
|
||||||
|
def cleanup_testvm(self):
|
||||||
|
self.vms.close()
|
||||||
|
self.testvm1.close()
|
||||||
|
self.testvm2.close()
|
||||||
|
del self.testvm1
|
||||||
|
del self.testvm2
|
||||||
|
del self.vms
|
||||||
|
del self.app
|
||||||
|
|
||||||
def test_000_contains(self):
|
def test_000_contains(self):
|
||||||
self.vms._dict = {1: self.testvm1}
|
self.vms._dict = {1: self.testvm1}
|
||||||
|
@ -90,93 +90,111 @@ class TC_01_Properties(qubes.tests.SystemTestCase):
|
|||||||
template=self.app.default_template,
|
template=self.app.default_template,
|
||||||
label='red')
|
label='red')
|
||||||
self.loop.run_until_complete(self.vm.create_on_disk())
|
self.loop.run_until_complete(self.vm.create_on_disk())
|
||||||
|
self.addCleanup(self.cleanup_props)
|
||||||
|
|
||||||
|
def cleanup_props(self):
|
||||||
|
del self.vm
|
||||||
|
|
||||||
@unittest.expectedFailure
|
@unittest.expectedFailure
|
||||||
def test_030_clone(self):
|
def test_030_clone(self):
|
||||||
testvm1 = self.app.add_new_vm(
|
try:
|
||||||
qubes.vm.appvm.AppVM,
|
testvm1 = self.app.add_new_vm(
|
||||||
name=self.make_vm_name("vm"),
|
qubes.vm.appvm.AppVM,
|
||||||
template=self.app.default_template,
|
name=self.make_vm_name("vm"),
|
||||||
label='red')
|
template=self.app.default_template,
|
||||||
self.loop.run_until_complete(testvm1.create_on_disk())
|
label='red')
|
||||||
testvm2 = self.app.add_new_vm(testvm1.__class__,
|
self.loop.run_until_complete(testvm1.create_on_disk())
|
||||||
name=self.make_vm_name("clone"),
|
testvm2 = self.app.add_new_vm(testvm1.__class__,
|
||||||
template=testvm1.template,
|
name=self.make_vm_name("clone"),
|
||||||
label='red')
|
template=testvm1.template,
|
||||||
testvm2.clone_properties(testvm1)
|
label='red')
|
||||||
self.loop.run_until_complete(testvm2.clone_disk_files(testvm1))
|
testvm2.clone_properties(testvm1)
|
||||||
self.assertTrue(self.loop.run_until_complete(testvm1.storage.verify()))
|
self.loop.run_until_complete(testvm2.clone_disk_files(testvm1))
|
||||||
self.assertIn('source', testvm1.volumes['root'].config)
|
self.assertTrue(self.loop.run_until_complete(testvm1.storage.verify()))
|
||||||
self.assertNotEquals(testvm2, None)
|
self.assertIn('source', testvm1.volumes['root'].config)
|
||||||
self.assertNotEquals(testvm2.volumes, {})
|
self.assertNotEquals(testvm2, None)
|
||||||
self.assertIn('source', testvm2.volumes['root'].config)
|
self.assertNotEquals(testvm2.volumes, {})
|
||||||
|
self.assertIn('source', testvm2.volumes['root'].config)
|
||||||
|
|
||||||
# qubes.xml reload
|
# qubes.xml reload
|
||||||
self.app.save()
|
self.app.save()
|
||||||
testvm1 = self.app.domains[testvm1.qid]
|
testvm1 = self.app.domains[testvm1.qid]
|
||||||
testvm2 = self.app.domains[testvm2.qid]
|
testvm2 = self.app.domains[testvm2.qid]
|
||||||
|
|
||||||
self.assertEqual(testvm1.label, testvm2.label)
|
self.assertEqual(testvm1.label, testvm2.label)
|
||||||
self.assertEqual(testvm1.netvm, testvm2.netvm)
|
self.assertEqual(testvm1.netvm, testvm2.netvm)
|
||||||
self.assertEqual(testvm1.property_is_default('netvm'),
|
self.assertEqual(testvm1.property_is_default('netvm'),
|
||||||
testvm2.property_is_default('netvm'))
|
testvm2.property_is_default('netvm'))
|
||||||
self.assertEqual(testvm1.kernel, testvm2.kernel)
|
self.assertEqual(testvm1.kernel, testvm2.kernel)
|
||||||
self.assertEqual(testvm1.kernelopts, testvm2.kernelopts)
|
self.assertEqual(testvm1.kernelopts, testvm2.kernelopts)
|
||||||
self.assertEqual(testvm1.property_is_default('kernel'),
|
self.assertEqual(testvm1.property_is_default('kernel'),
|
||||||
testvm2.property_is_default('kernel'))
|
testvm2.property_is_default('kernel'))
|
||||||
self.assertEqual(testvm1.property_is_default('kernelopts'),
|
self.assertEqual(testvm1.property_is_default('kernelopts'),
|
||||||
testvm2.property_is_default('kernelopts'))
|
testvm2.property_is_default('kernelopts'))
|
||||||
self.assertEqual(testvm1.memory, testvm2.memory)
|
self.assertEqual(testvm1.memory, testvm2.memory)
|
||||||
self.assertEqual(testvm1.maxmem, testvm2.maxmem)
|
self.assertEqual(testvm1.maxmem, testvm2.maxmem)
|
||||||
self.assertEqual(testvm1.devices, testvm2.devices)
|
self.assertEqual(testvm1.devices, testvm2.devices)
|
||||||
self.assertEqual(testvm1.include_in_backups,
|
self.assertEqual(testvm1.include_in_backups,
|
||||||
testvm2.include_in_backups)
|
testvm2.include_in_backups)
|
||||||
self.assertEqual(testvm1.default_user, testvm2.default_user)
|
self.assertEqual(testvm1.default_user, testvm2.default_user)
|
||||||
self.assertEqual(testvm1.features, testvm2.features)
|
self.assertEqual(testvm1.features, testvm2.features)
|
||||||
self.assertEqual(testvm1.firewall.rules,
|
self.assertEqual(testvm1.firewall.rules,
|
||||||
testvm2.firewall.rules)
|
testvm2.firewall.rules)
|
||||||
|
|
||||||
# now some non-default values
|
# now some non-default values
|
||||||
testvm1.netvm = None
|
testvm1.netvm = None
|
||||||
testvm1.label = 'orange'
|
testvm1.label = 'orange'
|
||||||
testvm1.memory = 512
|
testvm1.memory = 512
|
||||||
firewall = testvm1.firewall
|
firewall = testvm1.firewall
|
||||||
firewall.rules = [
|
firewall.rules = [
|
||||||
qubes.firewall.Rule(None, action='accept', dsthost='1.2.3.0/24',
|
qubes.firewall.Rule(None, action='accept', dsthost='1.2.3.0/24',
|
||||||
proto='tcp', dstports=22)]
|
proto='tcp', dstports=22)]
|
||||||
firewall.save()
|
firewall.save()
|
||||||
|
|
||||||
testvm3 = self.app.add_new_vm(testvm1.__class__,
|
testvm3 = self.app.add_new_vm(testvm1.__class__,
|
||||||
name=self.make_vm_name("clone2"),
|
name=self.make_vm_name("clone2"),
|
||||||
template=testvm1.template,
|
template=testvm1.template,
|
||||||
label='red',)
|
label='red',)
|
||||||
testvm3.clone_properties(testvm1)
|
testvm3.clone_properties(testvm1)
|
||||||
self.loop.run_until_complete(testvm3.clone_disk_files(testvm1))
|
self.loop.run_until_complete(testvm3.clone_disk_files(testvm1))
|
||||||
|
|
||||||
# qubes.xml reload
|
# qubes.xml reload
|
||||||
self.app.save()
|
self.app.save()
|
||||||
testvm1 = self.app.domains[testvm1.qid]
|
testvm1 = self.app.domains[testvm1.qid]
|
||||||
testvm3 = self.app.domains[testvm3.qid]
|
testvm3 = self.app.domains[testvm3.qid]
|
||||||
|
|
||||||
self.assertEqual(testvm1.label, testvm3.label)
|
self.assertEqual(testvm1.label, testvm3.label)
|
||||||
self.assertEqual(testvm1.netvm, testvm3.netvm)
|
self.assertEqual(testvm1.netvm, testvm3.netvm)
|
||||||
self.assertEqual(testvm1.property_is_default('netvm'),
|
self.assertEqual(testvm1.property_is_default('netvm'),
|
||||||
testvm3.property_is_default('netvm'))
|
testvm3.property_is_default('netvm'))
|
||||||
self.assertEqual(testvm1.kernel, testvm3.kernel)
|
self.assertEqual(testvm1.kernel, testvm3.kernel)
|
||||||
self.assertEqual(testvm1.kernelopts, testvm3.kernelopts)
|
self.assertEqual(testvm1.kernelopts, testvm3.kernelopts)
|
||||||
self.assertEqual(testvm1.property_is_default('kernel'),
|
self.assertEqual(testvm1.property_is_default('kernel'),
|
||||||
testvm3.property_is_default('kernel'))
|
testvm3.property_is_default('kernel'))
|
||||||
self.assertEqual(testvm1.property_is_default('kernelopts'),
|
self.assertEqual(testvm1.property_is_default('kernelopts'),
|
||||||
testvm3.property_is_default('kernelopts'))
|
testvm3.property_is_default('kernelopts'))
|
||||||
self.assertEqual(testvm1.memory, testvm3.memory)
|
self.assertEqual(testvm1.memory, testvm3.memory)
|
||||||
self.assertEqual(testvm1.maxmem, testvm3.maxmem)
|
self.assertEqual(testvm1.maxmem, testvm3.maxmem)
|
||||||
self.assertEqual(testvm1.devices, testvm3.devices)
|
self.assertEqual(testvm1.devices, testvm3.devices)
|
||||||
self.assertEqual(testvm1.include_in_backups,
|
self.assertEqual(testvm1.include_in_backups,
|
||||||
testvm3.include_in_backups)
|
testvm3.include_in_backups)
|
||||||
self.assertEqual(testvm1.default_user, testvm3.default_user)
|
self.assertEqual(testvm1.default_user, testvm3.default_user)
|
||||||
self.assertEqual(testvm1.features, testvm3.features)
|
self.assertEqual(testvm1.features, testvm3.features)
|
||||||
self.assertEqual(testvm1.firewall.rules,
|
self.assertEqual(testvm1.firewall.rules,
|
||||||
testvm2.firewall.rules)
|
testvm2.firewall.rules)
|
||||||
|
finally:
|
||||||
|
try:
|
||||||
|
del testvm1
|
||||||
|
except NameError:
|
||||||
|
pass
|
||||||
|
try:
|
||||||
|
del testvm2
|
||||||
|
except NameError:
|
||||||
|
pass
|
||||||
|
try:
|
||||||
|
del testvm3
|
||||||
|
except NameError:
|
||||||
|
pass
|
||||||
|
|
||||||
def test_020_name_conflict_app(self):
|
def test_020_name_conflict_app(self):
|
||||||
# TODO decide what exception should be here
|
# TODO decide what exception should be here
|
||||||
@ -322,12 +340,16 @@ class TC_03_QvmRevertTemplateChanges(qubes.tests.SystemTestCase):
|
|||||||
super(TC_03_QvmRevertTemplateChanges, self).setUp()
|
super(TC_03_QvmRevertTemplateChanges, self).setUp()
|
||||||
self.init_default_template()
|
self.init_default_template()
|
||||||
|
|
||||||
|
def cleanup_template(self):
|
||||||
|
del self.test_template
|
||||||
|
|
||||||
def setup_pv_template(self):
|
def setup_pv_template(self):
|
||||||
self.test_template = self.app.add_new_vm(
|
self.test_template = self.app.add_new_vm(
|
||||||
qubes.vm.templatevm.TemplateVM,
|
qubes.vm.templatevm.TemplateVM,
|
||||||
name=self.make_vm_name("pv-clone"),
|
name=self.make_vm_name("pv-clone"),
|
||||||
label='red'
|
label='red'
|
||||||
)
|
)
|
||||||
|
self.addCleanup(self.cleanup_template)
|
||||||
self.test_template.clone_properties(self.app.default_template)
|
self.test_template.clone_properties(self.app.default_template)
|
||||||
self.loop.run_until_complete(
|
self.loop.run_until_complete(
|
||||||
self.test_template.clone_disk_files(self.app.default_template))
|
self.test_template.clone_disk_files(self.app.default_template))
|
||||||
@ -340,6 +362,7 @@ class TC_03_QvmRevertTemplateChanges(qubes.tests.SystemTestCase):
|
|||||||
label='red',
|
label='red',
|
||||||
virt_mode='hvm',
|
virt_mode='hvm',
|
||||||
)
|
)
|
||||||
|
self.addCleanup(self.cleanup_template)
|
||||||
self.loop.run_until_complete(self.test_template.create_on_disk())
|
self.loop.run_until_complete(self.test_template.create_on_disk())
|
||||||
self.app.save()
|
self.app.save()
|
||||||
|
|
||||||
|
@ -75,6 +75,8 @@ class TC_00_FilePool(qubes.tests.QubesTestCase):
|
|||||||
|
|
||||||
def tearDown(self):
|
def tearDown(self):
|
||||||
self.app.cleanup()
|
self.app.cleanup()
|
||||||
|
self.app.close()
|
||||||
|
del self.app
|
||||||
super(TC_00_FilePool, self).tearDown()
|
super(TC_00_FilePool, self).tearDown()
|
||||||
|
|
||||||
def test000_default_pool_dir(self):
|
def test000_default_pool_dir(self):
|
||||||
@ -120,6 +122,8 @@ class TC_01_FileVolumes(qubes.tests.QubesTestCase):
|
|||||||
""" Remove the file based storage pool after testing """
|
""" Remove the file based storage pool after testing """
|
||||||
self.app.remove_pool("test-pool")
|
self.app.remove_pool("test-pool")
|
||||||
self.app.cleanup()
|
self.app.cleanup()
|
||||||
|
self.app.close()
|
||||||
|
del self.app
|
||||||
super(TC_01_FileVolumes, self).tearDown()
|
super(TC_01_FileVolumes, self).tearDown()
|
||||||
shutil.rmtree(self.POOL_DIR, ignore_errors=True)
|
shutil.rmtree(self.POOL_DIR, ignore_errors=True)
|
||||||
|
|
||||||
@ -328,6 +332,8 @@ class TC_03_FilePool(qubes.tests.QubesTestCase):
|
|||||||
""" Remove the file based storage pool after testing """
|
""" Remove the file based storage pool after testing """
|
||||||
self.app.remove_pool("test-pool")
|
self.app.remove_pool("test-pool")
|
||||||
self.app.cleanup()
|
self.app.cleanup()
|
||||||
|
self.app.close()
|
||||||
|
del self.app
|
||||||
self.base_dir_patch3.stop()
|
self.base_dir_patch3.stop()
|
||||||
self.base_dir_patch2.stop()
|
self.base_dir_patch2.stop()
|
||||||
self.base_dir_patch.stop()
|
self.base_dir_patch.stop()
|
||||||
|
@ -80,6 +80,8 @@ class TC_01_KernelVolumes(qubes.tests.QubesTestCase):
|
|||||||
""" Remove the file based storage pool after testing """
|
""" Remove the file based storage pool after testing """
|
||||||
self.app.remove_pool("test-pool")
|
self.app.remove_pool("test-pool")
|
||||||
self.app.cleanup()
|
self.app.cleanup()
|
||||||
|
self.app.close()
|
||||||
|
del self.app
|
||||||
super(TC_01_KernelVolumes, self).tearDown()
|
super(TC_01_KernelVolumes, self).tearDown()
|
||||||
shutil.rmtree(self.POOL_DIR, ignore_errors=True)
|
shutil.rmtree(self.POOL_DIR, ignore_errors=True)
|
||||||
|
|
||||||
@ -233,6 +235,8 @@ class TC_03_KernelPool(qubes.tests.QubesTestCase):
|
|||||||
""" Remove the file based storage pool after testing """
|
""" Remove the file based storage pool after testing """
|
||||||
self.app.remove_pool("test-pool")
|
self.app.remove_pool("test-pool")
|
||||||
self.app.cleanup()
|
self.app.cleanup()
|
||||||
|
self.app.close()
|
||||||
|
del self.app
|
||||||
super(TC_03_KernelPool, self).tearDown()
|
super(TC_03_KernelPool, self).tearDown()
|
||||||
shutil.rmtree(self.POOL_DIR, ignore_errors=True)
|
shutil.rmtree(self.POOL_DIR, ignore_errors=True)
|
||||||
if os.path.exists('/tmp/qubes-test'):
|
if os.path.exists('/tmp/qubes-test'):
|
||||||
|
@ -39,11 +39,16 @@ class TC_00_AdminVM(qubes.tests.QubesTestCase):
|
|||||||
self.vm = qubes.vm.adminvm.AdminVM(self.app,
|
self.vm = qubes.vm.adminvm.AdminVM(self.app,
|
||||||
xml=None)
|
xml=None)
|
||||||
mock_qdb.assert_called_once_with('dom0')
|
mock_qdb.assert_called_once_with('dom0')
|
||||||
|
self.addCleanup(self.cleanup_adminvm)
|
||||||
except: # pylint: disable=bare-except
|
except: # pylint: disable=bare-except
|
||||||
if self.id().endswith('.test_000_init'):
|
if self.id().endswith('.test_000_init'):
|
||||||
raise
|
raise
|
||||||
self.skipTest('setup failed')
|
self.skipTest('setup failed')
|
||||||
|
|
||||||
|
def cleanup_adminvm(self):
|
||||||
|
self.vm.close()
|
||||||
|
del self.vm
|
||||||
|
|
||||||
def test_000_init(self):
|
def test_000_init(self):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
@ -69,12 +69,21 @@ class TC_90_AppVM(qubes.tests.vm.qubesvm.QubesVMTestsMixin,
|
|||||||
qid=1, name=qubes.tests.VMPREFIX + 'template')
|
qid=1, name=qubes.tests.VMPREFIX + 'template')
|
||||||
self.app.domains[self.template.name] = self.template
|
self.app.domains[self.template.name] = self.template
|
||||||
self.app.domains[self.template] = self.template
|
self.app.domains[self.template] = self.template
|
||||||
|
self.addCleanup(self.cleanup_appvm)
|
||||||
|
|
||||||
|
def cleanup_appvm(self):
|
||||||
|
self.template.close()
|
||||||
|
del self.template
|
||||||
|
self.app.domains.clear()
|
||||||
|
self.app.pools.clear()
|
||||||
|
|
||||||
def get_vm(self, **kwargs):
|
def get_vm(self, **kwargs):
|
||||||
return qubes.vm.appvm.AppVM(self.app, None,
|
vm = qubes.vm.appvm.AppVM(self.app, None,
|
||||||
qid=2, name=qubes.tests.VMPREFIX + 'test',
|
qid=2, name=qubes.tests.VMPREFIX + 'test',
|
||||||
template=self.template,
|
template=self.template,
|
||||||
**kwargs)
|
**kwargs)
|
||||||
|
self.addCleanup(vm.close)
|
||||||
|
return vm
|
||||||
|
|
||||||
def test_000_init(self):
|
def test_000_init(self):
|
||||||
self.get_vm()
|
self.get_vm()
|
||||||
|
@ -57,6 +57,15 @@ class TC_00_DispVM(qubes.tests.QubesTestCase):
|
|||||||
name='test-vm', template=self.template, label='red')
|
name='test-vm', template=self.template, label='red')
|
||||||
self.app.domains[self.appvm.name] = self.appvm
|
self.app.domains[self.appvm.name] = self.appvm
|
||||||
self.app.domains[self.appvm] = self.appvm
|
self.app.domains[self.appvm] = self.appvm
|
||||||
|
self.addCleanup(self.cleanup_dispvm)
|
||||||
|
|
||||||
|
def cleanup_dispvm(self):
|
||||||
|
self.template.close()
|
||||||
|
self.appvm.close()
|
||||||
|
del self.template
|
||||||
|
del self.appvm
|
||||||
|
self.app.domains.clear()
|
||||||
|
self.app.pools.clear()
|
||||||
|
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
def mock_coro(self, *args, **kwargs):
|
def mock_coro(self, *args, **kwargs):
|
||||||
|
@ -52,6 +52,18 @@ class TC_00_NetVMMixin(
|
|||||||
self.app.domains._dict[domain.qid] = domain
|
self.app.domains._dict[domain.qid] = domain
|
||||||
self.app.default_netvm = self.netvm1
|
self.app.default_netvm = self.netvm1
|
||||||
self.app.default_fw_netvm = self.netvm1
|
self.app.default_fw_netvm = self.netvm1
|
||||||
|
self.addCleanup(self.cleanup_netvms)
|
||||||
|
|
||||||
|
def cleanup_netvms(self):
|
||||||
|
self.netvm1.close()
|
||||||
|
self.netvm2.close()
|
||||||
|
self.nonetvm.close()
|
||||||
|
self.app.domains.close()
|
||||||
|
del self.netvm1
|
||||||
|
del self.netvm2
|
||||||
|
del self.nonetvm
|
||||||
|
del self.app.default_netvm
|
||||||
|
del self.app.default_fw_netvm
|
||||||
|
|
||||||
|
|
||||||
@qubes.tests.skipUnlessDom0
|
@qubes.tests.skipUnlessDom0
|
||||||
@ -81,6 +93,7 @@ class TC_00_NetVMMixin(
|
|||||||
def test_143_netvm_loopback(self):
|
def test_143_netvm_loopback(self):
|
||||||
vm = self.get_vm()
|
vm = self.get_vm()
|
||||||
self.app.domains = {1: vm, vm: vm}
|
self.app.domains = {1: vm, vm: vm}
|
||||||
|
self.addCleanup(self.app.domains.clear)
|
||||||
self.assertPropertyInvalidValue(vm, 'netvm', vm)
|
self.assertPropertyInvalidValue(vm, 'netvm', vm)
|
||||||
|
|
||||||
def test_150_ip(self):
|
def test_150_ip(self):
|
||||||
|
@ -122,9 +122,11 @@ class QubesVMTestsMixin(object):
|
|||||||
self.app.vmm.offline_mode = True
|
self.app.vmm.offline_mode = True
|
||||||
|
|
||||||
def get_vm(self, **kwargs):
|
def get_vm(self, **kwargs):
|
||||||
return qubes.vm.qubesvm.QubesVM(self.app, None,
|
vm = qubes.vm.qubesvm.QubesVM(self.app, None,
|
||||||
qid=1, name=qubes.tests.VMPREFIX + 'test',
|
qid=1, name=qubes.tests.VMPREFIX + 'test',
|
||||||
**kwargs)
|
**kwargs)
|
||||||
|
self.addCleanup(vm.close)
|
||||||
|
return vm
|
||||||
|
|
||||||
def assertPropertyValue(self, vm, prop_name, set_value, expected_value,
|
def assertPropertyValue(self, vm, prop_name, set_value, expected_value,
|
||||||
expected_xml_content=None):
|
expected_xml_content=None):
|
||||||
|
@ -384,14 +384,16 @@ class BaseVM(qubes.PropertyHolder):
|
|||||||
def __repr__(self):
|
def __repr__(self):
|
||||||
proprepr = []
|
proprepr = []
|
||||||
for prop in self.property_list():
|
for prop in self.property_list():
|
||||||
|
if prop.__name__ in ('name', 'qid'):
|
||||||
|
continue
|
||||||
try:
|
try:
|
||||||
proprepr.append('{}={!s}'.format(
|
proprepr.append('{}={!s}'.format(
|
||||||
prop.__name__, getattr(self, prop.__name__)))
|
prop.__name__, getattr(self, prop.__name__)))
|
||||||
except AttributeError:
|
except AttributeError:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
return '<{} object at {:#x} {}>'.format(
|
return '<{} at {:#x} name={!r} qid={!r} {}>'.format(type(self).__name__,
|
||||||
self.__class__.__name__, id(self), ' '.join(proprepr))
|
id(self), self.name, self.qid, ' '.join(proprepr))
|
||||||
|
|
||||||
#
|
#
|
||||||
# xml serialising methods
|
# xml serialising methods
|
||||||
|
@ -710,6 +710,8 @@ class QubesVM(qubes.vm.mix.net.NetVMMixin, qubes.vm.BaseVM):
|
|||||||
if self._qdb_connection is not None:
|
if self._qdb_connection is not None:
|
||||||
self._qdb_connection.close()
|
self._qdb_connection.close()
|
||||||
self._qdb_connection = None
|
self._qdb_connection = None
|
||||||
|
if self._libvirt_domain is not None:
|
||||||
|
self._libvirt_domain = None
|
||||||
super().close()
|
super().close()
|
||||||
|
|
||||||
def __hash__(self):
|
def __hash__(self):
|
||||||
|
@ -11,6 +11,12 @@ added as needed.
|
|||||||
class libvirtError(Exception):
|
class libvirtError(Exception):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
class virConnect:
|
||||||
|
pass
|
||||||
|
|
||||||
|
class virDomain:
|
||||||
|
pass
|
||||||
|
|
||||||
def openReadOnly(*args, **kwargs):
|
def openReadOnly(*args, **kwargs):
|
||||||
raise libvirtError('mock module, always raises')
|
raise libvirtError('mock module, always raises')
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user