From 5b52d23478d3f96488bf928825bbc59fb1302cbf Mon Sep 17 00:00:00 2001 From: Rusty Bird Date: Fri, 28 Jun 2019 10:29:25 +0000 Subject: [PATCH] factor out utils.void_coros_maybe() --- qubes/storage/__init__.py | 66 ++++++++++----------------------------- qubes/utils.py | 13 ++++++++ 2 files changed, 30 insertions(+), 49 deletions(-) diff --git a/qubes/storage/__init__.py b/qubes/storage/__init__.py index 6c878945..c7911d05 100644 --- a/qubes/storage/__init__.py +++ b/qubes/storage/__init__.py @@ -507,16 +507,8 @@ class Storage: def create(self): ''' Creates volumes on disk ''' old_umask = os.umask(0o002) - - coros = [] - for volume in self.vm.volumes.values(): - # launch the operation, if it's asynchronous, then append to wait - # for them at the end - ret = volume.create() - if asyncio.iscoroutine(ret): - coros.append(ret) - yield from _wait_and_reraise(coros) - + yield from qubes.utils.void_coros_maybe( + vol.create() for vol in self.vm.volumes.values()) os.umask(old_umask) @asyncio.coroutine @@ -544,8 +536,9 @@ class Storage: self.vm.volumes = {} with VmCreationManager(self.vm): - yield from _wait_and_reraise([self.clone_volume(src_vm, vol_name) - for vol_name in self.vm.volume_config.keys()]) + yield from qubes.utils.void_coros_maybe( + self.clone_volume(src_vm, vol_name) + for vol_name in self.vm.volume_config.keys()) @property def outdated_volumes(self): @@ -571,12 +564,8 @@ class Storage: raise qubes.exc.QubesVMError( self.vm, 'VM directory does not exist: {}'.format(self.vm.dir_path)) - futures = [] - for volume in self.vm.volumes.values(): - ret = volume.verify() - if asyncio.iscoroutine(ret): - futures.append(ret) - yield from _wait_and_reraise(futures) + yield from qubes.utils.void_coros_maybe( + vol.verify() for vol in self.vm.volumes.values()) self.vm.fire_event('domain-verify-files') return True @@ -586,42 +575,29 @@ class Storage: Errors on removal are catched and logged. ''' - futures = [] - for name, volume in self.vm.volumes.items(): - self.log.info('Removing volume %s: %s' % (name, volume.vid)) + results = [] + for vol in self.vm.volumes.values(): + self.log.info('Removing volume %s: %s' % (vol.name, vol.vid)) try: - ret = volume.remove() - if asyncio.iscoroutine(ret): - futures.append(ret) + results.append(vol.remove()) except (IOError, OSError) as e: - self.vm.log.exception("Failed to remove volume %s", name, e) - + self.vm.log.exception("Failed to remove volume %s", vol.name, e) try: - yield from _wait_and_reraise(futures) + yield from qubes.utils.void_coros_maybe(results) except (IOError, OSError) as e: self.vm.log.exception("Failed to remove some volume", e) @asyncio.coroutine def start(self): ''' Execute the start method on each volume ''' - futures = [] - for volume in self.vm.volumes.values(): - ret = volume.start() - if asyncio.iscoroutine(ret): - futures.append(ret) - - yield from _wait_and_reraise(futures) + yield from qubes.utils.void_coros_maybe( + vol.start() for vol in self.vm.volumes.values()) @asyncio.coroutine def stop(self): ''' Execute the stop method on each volume ''' - futures = [] - for volume in self.vm.volumes.values(): - ret = volume.stop() - if asyncio.iscoroutine(ret): - futures.append(ret) - - yield from _wait_and_reraise(futures) + yield from qubes.utils.void_coros_maybe( + vol.stop() for vol in self.vm.volumes.values()) def unused_frontend(self): ''' Find an unused device name ''' @@ -826,14 +802,6 @@ class Pool: return NotImplementedError(msg) -@asyncio.coroutine -def _wait_and_reraise(futures): - if futures: - done, _ = yield from asyncio.wait(futures) - for task in done: # (re-)raise first exception in line - task.result() - - def _sanitize_config(config): ''' Helper function to convert types to appropriate strings ''' # FIXME: find another solution for serializing basic types diff --git a/qubes/utils.py b/qubes/utils.py index a0587cbb..6cfdf79b 100644 --- a/qubes/utils.py +++ b/qubes/utils.py @@ -190,3 +190,16 @@ def coro_maybe(value): if asyncio.iscoroutine(value): return (yield from value) return value + +@asyncio.coroutine +def void_coros_maybe(values): + ''' Ignore elements of the iterable values that are not coroutine + objects. Run all coroutine objects to completion, in parallel + to each other. If there were exceptions, re-raise the leftmost + one (not necessarily chronologically first). Return nothing. + ''' + coros = [val for val in values if asyncio.iscoroutine(val)] + if coros: + done, _ = yield from asyncio.wait(coros) + for task in done: + task.result() # re-raises exception if task failed