storage: factor out _wait_and_reraise(); fix clone/create

_wait_and_reraise() is similar to asyncio.gather(), but it preserves the
current behavior of waiting for all futures and only _then_ reraising
the first exception (if there is any) in line.

Also switch Storage.create() and Storage.clone() to _wait_and_reraise().
Previously, they called asyncio.wait() and implicitly swallowed all
exceptions.
This commit is contained in:
Rusty Bird 2018-09-11 23:50:22 +00:00
parent d33bd3f2b6
commit d181bf1aa4
No known key found for this signature in database
GPG Key ID: 469D78F47AAF2ADF

View File

@ -509,8 +509,7 @@ class Storage:
ret = volume.create()
if asyncio.iscoroutine(ret):
coros.append(ret)
if coros:
yield from asyncio.wait(coros)
yield from _wait_and_reraise(coros)
os.umask(old_umask)
@ -552,7 +551,7 @@ class Storage:
self.vm.volumes = {}
with VmCreationManager(self.vm):
yield from asyncio.wait([self.clone_volume(src_vm, vol_name)
yield from _wait_and_reraise([self.clone_volume(src_vm, vol_name)
for vol_name in self.vm.volume_config.keys()])
@property
@ -584,11 +583,7 @@ class Storage:
ret = volume.verify()
if asyncio.iscoroutine(ret):
futures.append(ret)
if futures:
done, _ = yield from asyncio.wait(futures)
for task in done:
# re-raise any exception from async task
task.result()
yield from _wait_and_reraise(futures)
self.vm.fire_event('domain-verify-files')
return True
@ -608,14 +603,10 @@ class Storage:
except (IOError, OSError) as e:
self.vm.log.exception("Failed to remove volume %s", name, e)
if futures:
try:
done, _ = yield from asyncio.wait(futures)
for task in done:
# re-raise any exception from async task
task.result()
except (IOError, OSError) as e:
self.vm.log.exception("Failed to remove some volume", e)
try:
yield from _wait_and_reraise(futures)
except (IOError, OSError) as e:
self.vm.log.exception("Failed to remove some volume", e)
@asyncio.coroutine
def start(self):
@ -626,11 +617,7 @@ class Storage:
if asyncio.iscoroutine(ret):
futures.append(ret)
if futures:
done, _ = yield from asyncio.wait(futures)
for task in done:
# re-raise any exception from async task
task.result()
yield from _wait_and_reraise(futures)
@asyncio.coroutine
def stop(self):
@ -641,11 +628,7 @@ class Storage:
if asyncio.iscoroutine(ret):
futures.append(ret)
if futures:
done, _ = yield from asyncio.wait(futures)
for task in done:
# re-raise any exception from async task
task.result()
yield from _wait_and_reraise(futures)
def unused_frontend(self):
''' Find an unused device name '''
@ -845,6 +828,14 @@ class Pool:
return NotImplementedError(msg)
@asyncio.coroutine
def _wait_and_reraise(futures):
if futures:
done, _ = yield from asyncio.wait(futures)
for task in done: # (re-)raise first exception in line
task.result()
def _sanitize_config(config):
''' Helper function to convert types to appropriate strings
''' # FIXME: find another solution for serializing basic types