Fix asyncio.Lock usage for Python 3.9+

'with (yield from alock):' is incompatible with Python 3.9+.

Change it to 'async with alock:', and then change the affected functions
to 'async def'.

This makes the test suite pass again in a Fedora 33 VM.

QubesOS/qubes-issues#2738
This commit is contained in:
Rusty Bird 2021-02-11 11:17:41 +00:00
parent 7a91af9b2c
commit a7fe59449a
No known key found for this signature in database
GPG Key ID: 469D78F47AAF2ADF
4 changed files with 59 additions and 67 deletions

View File

@ -1191,13 +1191,12 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI):
@qubes.api.method('admin.vm.Remove', no_payload=True,
scope='global', write=True)
@asyncio.coroutine
def vm_remove(self):
async def vm_remove(self):
self.enforce(not self.arg)
self.fire_event_for_permission()
with (yield from self.dest.startup_lock):
async with self.dest.startup_lock:
if not self.dest.is_halted():
raise qubes.exc.QubesVMNotHaltedError(self.dest)
@ -1207,7 +1206,7 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI):
del self.app.domains[self.dest]
try:
yield from self.dest.remove_from_disk()
await self.dest.remove_from_disk()
except: # pylint: disable=bare-except
self.app.log.exception('Error while removing VM \'%s\' files',
self.dest.name)

View File

@ -168,11 +168,10 @@ class Volume:
>>>def start(self):
>>> pass
'''
@asyncio.coroutine
@functools.wraps(method)
def wrapper(self, *args, **kwargs):
with (yield from self._lock): # pylint: disable=protected-access
return (yield from method(self, *args, **kwargs))
async def wrapper(self, *args, **kwargs):
async with self._lock: # pylint: disable=protected-access
return await method(self, *args, **kwargs)
return wrapper
def create(self):

View File

@ -242,15 +242,14 @@ class CallbackPool(qubes.storage.Pool):
cmd = self._cb_conf.get('cmd')
return bool(cmd and cmd != '-')
@asyncio.coroutine
def _init(self, callback=True):
async def _init(self, callback=True):
''' Late storage initialization on first use for e.g. decryption on first usage request.
:param callback: Whether to trigger the `pre_sinit` callback or not.
'''
with (yield from self._cb_init_lock):
async with self._cb_init_lock:
if self._cb_requires_init:
if callback:
yield from self._callback('pre_sinit')
await self._callback('pre_sinit')
self._cb_requires_init = False
@asyncio.coroutine

View File

@ -1060,12 +1060,11 @@ class QubesVM(qubes.vm.mix.net.NetVMMixin, qubes.vm.BaseVM):
# methods for changing domain state
#
@asyncio.coroutine
def _ensure_shutdown_handled(self):
async def _ensure_shutdown_handled(self):
"""Make sure previous shutdown is fully handled.
MUST NOT be called when domain is running.
"""
with (yield from self._domain_stopped_lock):
async with self._domain_stopped_lock:
# Don't accept any new stopped event's till a new VM has been
# created. If we didn't received any stopped event or it wasn't
# handled yet we will handle this in the next lines.
@ -1086,11 +1085,10 @@ class QubesVM(qubes.vm.mix.net.NetVMMixin, qubes.vm.BaseVM):
# twice if an exception gets thrown.
self._domain_stopped_event_handled = True
yield from self.fire_event_async('domain-stopped')
yield from self.fire_event_async('domain-shutdown')
await self.fire_event_async('domain-stopped')
await self.fire_event_async('domain-shutdown')
@asyncio.coroutine
def start(self, start_guid=True, notify_function=None,
async def start(self, start_guid=True, notify_function=None,
mem_required=None):
"""Start domain
@ -1099,7 +1097,7 @@ class QubesVM(qubes.vm.mix.net.NetVMMixin, qubes.vm.BaseVM):
:param int mem_required: FIXME
"""
with (yield from self.startup_lock):
async with self.startup_lock:
# check if domain wasn't removed in the meantime
if self not in self.app.domains:
raise qubes.exc.QubesVMNotFoundError(self.name)
@ -1108,19 +1106,19 @@ class QubesVM(qubes.vm.mix.net.NetVMMixin, qubes.vm.BaseVM):
if self.get_power_state() != 'Halted':
return self
yield from self._ensure_shutdown_handled()
await self._ensure_shutdown_handled()
self.log.info('Starting {}'.format(self.name))
try:
yield from self.fire_event_async('domain-pre-start',
pre_event=True,
start_guid=start_guid,
mem_required=mem_required)
await self.fire_event_async('domain-pre-start',
pre_event=True,
start_guid=start_guid,
mem_required=mem_required)
except Exception as exc:
self.log.error('Start failed: %s', str(exc))
yield from self.fire_event_async('domain-start-failed',
reason=str(exc))
await self.fire_event_async('domain-start-failed',
reason=str(exc))
raise
qmemman_client = None
@ -1135,26 +1133,26 @@ class QubesVM(qubes.vm.mix.net.NetVMMixin, qubes.vm.BaseVM):
if self.virt_mode == 'pvh' and not self.kernel:
raise qubes.exc.QubesException(
'virt_mode PVH require kernel to be set')
yield from self.storage.verify()
await self.storage.verify()
if self.netvm is not None:
# pylint: disable = no-member
if self.netvm.qid != 0:
if not self.netvm.is_running():
yield from self.netvm.start(
await self.netvm.start(
start_guid=start_guid,
notify_function=notify_function)
qmemman_client = yield from asyncio.get_event_loop(). \
qmemman_client = await asyncio.get_event_loop(). \
run_in_executor(None, self.request_memory, mem_required)
yield from self.storage.start()
await self.storage.start()
except Exception as exc:
self.log.error('Start failed: %s', str(exc))
# let anyone receiving domain-pre-start know that startup failed
yield from self.fire_event_async('domain-start-failed',
reason=str(exc))
await self.fire_event_async('domain-start-failed',
reason=str(exc))
if qmemman_client:
qmemman_client.close()
raise
@ -1179,16 +1177,16 @@ class QubesVM(qubes.vm.mix.net.NetVMMixin, qubes.vm.BaseVM):
'Failed to start an HVM qube with PCI devices assigned '
'- hardware does not support IOMMU/VT-d/AMD-Vi')
self.log.error('Start failed: %s', str(exc))
yield from self.fire_event_async('domain-start-failed',
reason=str(exc))
yield from self.storage.stop()
await self.fire_event_async('domain-start-failed',
reason=str(exc))
await self.storage.stop()
raise exc
except Exception as exc:
self.log.error('Start failed: %s', str(exc))
# let anyone receiving domain-pre-start know that startup failed
yield from self.fire_event_async('domain-start-failed',
reason=str(exc))
yield from self.storage.stop()
await self.fire_event_async('domain-start-failed',
reason=str(exc))
await self.storage.stop()
raise
finally:
@ -1199,21 +1197,21 @@ class QubesVM(qubes.vm.mix.net.NetVMMixin, qubes.vm.BaseVM):
self._domain_stopped_event_handled = False
try:
yield from self.fire_event_async('domain-spawn',
start_guid=start_guid)
await self.fire_event_async('domain-spawn',
start_guid=start_guid)
self.log.info('Setting Qubes DB info for the VM')
yield from self.start_qubesdb()
await self.start_qubesdb()
self.create_qdb_entries()
self.start_qdb_watch()
self.log.warning('Activating the {} VM'.format(self.name))
self.libvirt_domain.resume()
yield from self.start_qrexec_daemon()
await self.start_qrexec_daemon()
yield from self.fire_event_async('domain-start',
start_guid=start_guid)
await self.fire_event_async('domain-start',
start_guid=start_guid)
except Exception as exc: # pylint: disable=bare-except
self.log.error('Start failed: %s', str(exc))
@ -1221,13 +1219,13 @@ class QubesVM(qubes.vm.mix.net.NetVMMixin, qubes.vm.BaseVM):
# raised in self._kill_locked(), because the vm is not
# running or paused
try:
yield from self._kill_locked()
await self._kill_locked()
except qubes.exc.QubesVMNotStartedError:
pass
# let anyone receiving domain-pre-start know that startup failed
yield from self.fire_event_async('domain-start-failed',
reason=str(exc))
await self.fire_event_async('domain-start-failed',
reason=str(exc))
raise
return self
@ -1255,9 +1253,8 @@ class QubesVM(qubes.vm.mix.net.NetVMMixin, qubes.vm.BaseVM):
self._domain_stopped_future = \
asyncio.ensure_future(self._domain_stopped_coro())
@asyncio.coroutine
def _domain_stopped_coro(self):
with (yield from self._domain_stopped_lock):
async def _domain_stopped_coro(self):
async with self._domain_stopped_lock:
assert not self._domain_stopped_event_handled
# Set this immediately such that we don't generate events twice if
@ -1265,9 +1262,9 @@ class QubesVM(qubes.vm.mix.net.NetVMMixin, qubes.vm.BaseVM):
self._domain_stopped_event_handled = True
while self.get_power_state() == 'Dying':
yield from asyncio.sleep(0.25)
yield from self.fire_event_async('domain-stopped')
yield from self.fire_event_async('domain-shutdown')
await asyncio.sleep(0.25)
await self.fire_event_async('domain-stopped')
await self.fire_event_async('domain-shutdown')
@qubes.events.handler('domain-stopped')
@asyncio.coroutine
@ -1282,8 +1279,7 @@ class QubesVM(qubes.vm.mix.net.NetVMMixin, qubes.vm.BaseVM):
self.fire_event('property-reset:stubdom_xid', name='stubdom_xid')
self.fire_event('property-reset:start_time', name='start_time')
@asyncio.coroutine
def shutdown(self, force=False, wait=False, timeout=None):
async def shutdown(self, force=False, wait=False, timeout=None):
"""Shutdown domain.
:param force: ignored
@ -1298,8 +1294,8 @@ class QubesVM(qubes.vm.mix.net.NetVMMixin, qubes.vm.BaseVM):
raise qubes.exc.QubesVMNotStartedError(self)
try:
yield from self.fire_event_async('domain-pre-shutdown',
pre_event=True, force=force)
await self.fire_event_async('domain-pre-shutdown',
pre_event=True, force=force)
self.libvirt_domain.shutdown()
@ -1307,23 +1303,22 @@ class QubesVM(qubes.vm.mix.net.NetVMMixin, qubes.vm.BaseVM):
if timeout is None:
timeout = self.shutdown_timeout
while timeout > 0 and not self.is_halted():
yield from asyncio.sleep(0.25)
await asyncio.sleep(0.25)
timeout -= 0.25
with (yield from self.startup_lock):
async with self.startup_lock:
if self.is_halted():
# make sure all shutdown tasks are completed
yield from self._ensure_shutdown_handled()
await self._ensure_shutdown_handled()
else:
raise qubes.exc.QubesVMShutdownTimeoutError(self)
except Exception as ex:
yield from self.fire_event_async('domain-shutdown-failed',
reason=str(ex))
await self.fire_event_async('domain-shutdown-failed',
reason=str(ex))
raise
return self
@asyncio.coroutine
def kill(self):
async def kill(self):
"""Forcefully shutdown (destroy) domain.
:raises qubes.exc.QubesVMNotStartedError: \
@ -1333,8 +1328,8 @@ class QubesVM(qubes.vm.mix.net.NetVMMixin, qubes.vm.BaseVM):
if not self.is_running() and not self.is_paused():
raise qubes.exc.QubesVMNotStartedError(self)
with (yield from self.startup_lock):
yield from self._kill_locked()
async with self.startup_lock:
await self._kill_locked()
return self