Browse Source

factor out utils.void_coros_maybe()

Rusty Bird 4 years ago
parent
commit
5b52d23478
2 changed files with 30 additions and 49 deletions
  1. 17 49
      qubes/storage/__init__.py
  2. 13 0
      qubes/utils.py

+ 17 - 49
qubes/storage/__init__.py

@@ -507,16 +507,8 @@ class Storage:
     def create(self):
         ''' Creates volumes on disk '''
         old_umask = os.umask(0o002)
-
-        coros = []
-        for volume in self.vm.volumes.values():
-            # launch the operation, if it's asynchronous, then append to wait
-            #  for them at the end
-            ret = volume.create()
-            if asyncio.iscoroutine(ret):
-                coros.append(ret)
-        yield from _wait_and_reraise(coros)
-
+        yield from qubes.utils.void_coros_maybe(
+            vol.create() for vol in self.vm.volumes.values())
         os.umask(old_umask)
 
     @asyncio.coroutine
@@ -544,8 +536,9 @@ class Storage:
 
         self.vm.volumes = {}
         with VmCreationManager(self.vm):
-            yield from _wait_and_reraise([self.clone_volume(src_vm, vol_name)
-                for vol_name in self.vm.volume_config.keys()])
+            yield from qubes.utils.void_coros_maybe(
+                self.clone_volume(src_vm, vol_name)
+                for vol_name in self.vm.volume_config.keys())
 
     @property
     def outdated_volumes(self):
@@ -571,12 +564,8 @@ class Storage:
             raise qubes.exc.QubesVMError(
                 self.vm,
                 'VM directory does not exist: {}'.format(self.vm.dir_path))
-        futures = []
-        for volume in self.vm.volumes.values():
-            ret = volume.verify()
-            if asyncio.iscoroutine(ret):
-                futures.append(ret)
-        yield from _wait_and_reraise(futures)
+        yield from qubes.utils.void_coros_maybe(
+            vol.verify() for vol in self.vm.volumes.values())
         self.vm.fire_event('domain-verify-files')
         return True
 
@@ -586,42 +575,29 @@ class Storage:
 
             Errors on removal are catched and logged.
         '''
-        futures = []
-        for name, volume in self.vm.volumes.items():
-            self.log.info('Removing volume %s: %s' % (name, volume.vid))
+        results = []
+        for vol in self.vm.volumes.values():
+            self.log.info('Removing volume %s: %s' % (vol.name, vol.vid))
             try:
-                ret = volume.remove()
-                if asyncio.iscoroutine(ret):
-                    futures.append(ret)
+                results.append(vol.remove())
             except (IOError, OSError) as e:
-                self.vm.log.exception("Failed to remove volume %s", name, e)
-
+                self.vm.log.exception("Failed to remove volume %s", vol.name, e)
         try:
-            yield from _wait_and_reraise(futures)
+            yield from qubes.utils.void_coros_maybe(results)
         except (IOError, OSError) as e:
             self.vm.log.exception("Failed to remove some volume", e)
 
     @asyncio.coroutine
     def start(self):
         ''' Execute the start method on each volume '''
-        futures = []
-        for volume in self.vm.volumes.values():
-            ret = volume.start()
-            if asyncio.iscoroutine(ret):
-                futures.append(ret)
-
-        yield from _wait_and_reraise(futures)
+        yield from qubes.utils.void_coros_maybe(
+            vol.start() for vol in self.vm.volumes.values())
 
     @asyncio.coroutine
     def stop(self):
         ''' Execute the stop method on each volume '''
-        futures = []
-        for volume in self.vm.volumes.values():
-            ret = volume.stop()
-            if asyncio.iscoroutine(ret):
-                futures.append(ret)
-
-        yield from _wait_and_reraise(futures)
+        yield from qubes.utils.void_coros_maybe(
+            vol.stop() for vol in self.vm.volumes.values())
 
     def unused_frontend(self):
         ''' Find an unused device name '''
@@ -826,14 +802,6 @@ class Pool:
         return NotImplementedError(msg)
 
 
-@asyncio.coroutine
-def _wait_and_reraise(futures):
-    if futures:
-        done, _ = yield from asyncio.wait(futures)
-        for task in done:  # (re-)raise first exception in line
-            task.result()
-
-
 def _sanitize_config(config):
     ''' Helper function to convert types to appropriate strings
     '''  # FIXME: find another solution for serializing basic types

+ 13 - 0
qubes/utils.py

@@ -190,3 +190,16 @@ def coro_maybe(value):
     if asyncio.iscoroutine(value):
         return (yield from value)
     return value
+
+@asyncio.coroutine
+def void_coros_maybe(values):
+    ''' Ignore elements of the iterable values that are not coroutine
+        objects. Run all coroutine objects to completion, in parallel
+        to each other. If there were exceptions, re-raise the leftmost
+        one (not necessarily chronologically first). Return nothing.
+    '''
+    coros = [val for val in values if asyncio.iscoroutine(val)]
+    if coros:
+        done, _ = yield from asyncio.wait(coros)
+        for task in done:
+            task.result()  # re-raises exception if task failed