events: simplify wait_for_domain_shutdown coroutine

1. Handle timeout externally - using asyncio.wait_for.
2. Add support for waiting for multiple VMs.
This commit is contained in:
Marek Marczykowski-Górecki 2017-07-05 14:12:27 +02:00
parent 43ef244eaa
commit 2052b32202
No known key found for this signature in database
GPG Key ID: 063938BA42CFA724
3 changed files with 29 additions and 33 deletions

View File

@ -32,43 +32,37 @@ class Interrupt(Exception):
'''Interrupt events processing''' '''Interrupt events processing'''
def interrupt_on_vm_shutdown(vm, subject, event): def interrupt_on_vm_shutdown(vms, subject, event):
'''Interrupt events processing when given VM was shutdown''' '''Interrupt events processing when given VM was shutdown'''
# pylint: disable=unused-argument # pylint: disable=unused-argument
if event == 'connection-established': if event == 'connection-established':
if vm.is_halted(): if all(vm.is_halted() for vm in vms):
raise Interrupt raise Interrupt
elif event == 'domain-shutdown' and vm == subject: elif event == 'domain-shutdown' and subject in vms:
vms.remove(subject)
if not vms:
raise Interrupt raise Interrupt
@asyncio.coroutine @asyncio.coroutine
def wait_for_domain_shutdown(vm, timeout, loop=None): def wait_for_domain_shutdown(vms):
''' Helper function to wait for domain shutdown. ''' Helper function to wait for domain shutdown.
This function wait for domain shutdown, but do not initiate the shutdown This function wait for domain shutdown, but do not initiate the shutdown
itself. itself.
:param vm: QubesVM object to wait for shutdown on :param vms: QubesVM object collection to wait for shutdown on
:param timeout: Timeout in seconds, use 0 for no timeout
:param loop: asyncio event loop
''' '''
if loop is None: if not vms:
loop = asyncio.get_event_loop() return
events = qubesadmin.events.EventsDispatcher(vm.app) app = list(vms)[0].app
vms = set(vms)
events = qubesadmin.events.EventsDispatcher(app)
events.add_handler('domain-shutdown', events.add_handler('domain-shutdown',
functools.partial(interrupt_on_vm_shutdown, vm)) functools.partial(interrupt_on_vm_shutdown, vms))
events.add_handler('connection-established', events.add_handler('connection-established',
functools.partial(interrupt_on_vm_shutdown, vm)) functools.partial(interrupt_on_vm_shutdown, vms))
events_task = asyncio.ensure_future(events.listen_for_events(),
loop=loop)
if timeout:
# pylint: disable=no-member
loop.call_later(timeout, events_task.cancel)
try: try:
yield from events_task yield from events.listen_for_events()
except asyncio.CancelledError:
raise qubesadmin.exc.QubesVMShutdownTimeout(
'VM %s shutdown timeout expired', vm.name)
except Interrupt: except Interrupt:
pass pass

View File

@ -226,6 +226,10 @@ class TC_00_qvm_template_postprocess(qubesadmin.tests.QubesTestCase):
self.app.domains.clear_cache() self.app.domains.clear_cache()
return self.app.domains['test-vm'] return self.app.domains['test-vm']
@asyncio.coroutine
def wait_for_shutdown(self, vm):
pass
@mock.patch('qubesadmin.tools.qvm_template_postprocess.import_appmenus') @mock.patch('qubesadmin.tools.qvm_template_postprocess.import_appmenus')
@mock.patch('qubesadmin.tools.qvm_template_postprocess.import_root_img') @mock.patch('qubesadmin.tools.qvm_template_postprocess.import_root_img')
def test_020_post_install(self, mock_import_root_img, def test_020_post_install(self, mock_import_root_img,
@ -250,6 +254,7 @@ class TC_00_qvm_template_postprocess(qubesadmin.tests.QubesTestCase):
'qubesadmin.events.utils.wait_for_domain_shutdown') 'qubesadmin.events.utils.wait_for_domain_shutdown')
self.addCleanup(patch_domain_shutdown.stop) self.addCleanup(patch_domain_shutdown.stop)
mock_domain_shutdown = patch_domain_shutdown.start() mock_domain_shutdown = patch_domain_shutdown.start()
mock_domain_shutdown.side_effect = self.wait_for_shutdown
else: else:
self.app.expected_calls[ self.app.expected_calls[
('test-vm', 'admin.vm.List', None, None)] = \ ('test-vm', 'admin.vm.List', None, None)] = \
@ -267,18 +272,14 @@ class TC_00_qvm_template_postprocess(qubesadmin.tests.QubesTestCase):
mock_import_appmenus.assert_called_once_with(self.app.domains[ mock_import_appmenus.assert_called_once_with(self.app.domains[
'test-vm'], self.source_dir.name) 'test-vm'], self.source_dir.name)
if qubesadmin.tools.qvm_template_postprocess.have_events: if qubesadmin.tools.qvm_template_postprocess.have_events:
mock_domain_shutdown.assert_called_once_with(self.app.domains[ mock_domain_shutdown.assert_called_once_with([self.app.domains[
'test-vm'], 60) 'test-vm']])
self.assertEqual(self.app.service_calls, [ self.assertEqual(self.app.service_calls, [
('test-vm', 'qubes.PostInstall', {}), ('test-vm', 'qubes.PostInstall', {}),
('test-vm', 'qubes.PostInstall', b''), ('test-vm', 'qubes.PostInstall', b''),
]) ])
self.assertAllCalled() self.assertAllCalled()
@asyncio.coroutine
def wait_for_shutdown(self, vm, timeout):
pass
@mock.patch('qubesadmin.tools.qvm_template_postprocess.import_appmenus') @mock.patch('qubesadmin.tools.qvm_template_postprocess.import_appmenus')
@mock.patch('qubesadmin.tools.qvm_template_postprocess.import_root_img') @mock.patch('qubesadmin.tools.qvm_template_postprocess.import_root_img')
def test_021_post_install_reinstall(self, mock_import_root_img, def test_021_post_install_reinstall(self, mock_import_root_img,
@ -320,8 +321,8 @@ class TC_00_qvm_template_postprocess(qubesadmin.tests.QubesTestCase):
mock_import_appmenus.assert_called_once_with(self.app.domains[ mock_import_appmenus.assert_called_once_with(self.app.domains[
'test-vm'], self.source_dir.name) 'test-vm'], self.source_dir.name)
if qubesadmin.tools.qvm_template_postprocess.have_events: if qubesadmin.tools.qvm_template_postprocess.have_events:
mock_domain_shutdown.assert_called_once_with(self.app.domains[ mock_domain_shutdown.assert_called_once_with([self.app.domains[
'test-vm'], 60) 'test-vm']])
self.assertEqual(self.app.service_calls, [ self.assertEqual(self.app.service_calls, [
('test-vm', 'qubes.PostInstall', {}), ('test-vm', 'qubes.PostInstall', {}),
('test-vm', 'qubes.PostInstall', b''), ('test-vm', 'qubes.PostInstall', b''),

View File

@ -164,9 +164,10 @@ def call_postinstall_service(vm):
if have_events: if have_events:
try: try:
# pylint: disable=no-member # pylint: disable=no-member
yield from qubesadmin.events.utils.wait_for_domain_shutdown( yield from asyncio.wait_for(
vm, qubesadmin.config.defaults['shutdown_timeout']) qubesadmin.events.utils.wait_for_domain_shutdown([vm]),
except qubesadmin.exc.QubesVMShutdownTimeout: qubesadmin.config.defaults['shutdown_timeout'])
except asyncio.TimeoutError:
vm.kill() vm.kill()
else: else:
timeout = qubesadmin.config.defaults['shutdown_timeout'] timeout = qubesadmin.config.defaults['shutdown_timeout']