Browse Source

storage: move @locked from lvm to Volume base class

And use it in reflink, instead of a synchronous lock.
Rusty Bird 4 years ago
parent
commit
e188b93c95
3 changed files with 39 additions and 49 deletions
  1. 20 0
      qubes/storage/__init__.py
  2. 9 28
      qubes/storage/lvm.py
  3. 10 21
      qubes/storage/reflink.py

+ 20 - 0
qubes/storage/__init__.py

@@ -22,6 +22,7 @@
 
 """ Qubes storage system"""
 
+import functools
 import inspect
 import os
 import os.path
@@ -133,6 +134,8 @@ class Volume:
         self.source = source
         #: Volume unique (inside given pool) identifier
         self.vid = vid
+        #: Asynchronous lock for @Volume.locked decorator
+        self._lock = asyncio.Lock()
 
     def __eq__(self, other):
         if isinstance(other, Volume):
@@ -155,6 +158,23 @@ class Volume:
         config = _sanitize_config(self.config)
         return lxml.etree.Element('volume', **config)
 
+    @staticmethod
+    def locked(method):
+        '''Decorator running given Volume's coroutine under a lock.
+        Needs to be added after wrapping with @asyncio.coroutine, for example:
+
+        >>>@Volume.locked
+        >>>@asyncio.coroutine
+        >>>def start(self):
+        >>>    pass
+        '''
+        @asyncio.coroutine
+        @functools.wraps(method)
+        def wrapper(self, *args, **kwargs):
+            with (yield from self._lock):  # pylint: disable=protected-access
+                return (yield from method(self, *args, **kwargs))
+        return wrapper
+
     def create(self):
         ''' Create the given volume on disk.
 

+ 9 - 28
qubes/storage/lvm.py

@@ -18,7 +18,6 @@
 #
 
 ''' Driver for storing vm images in a LVM thin pool '''
-import functools
 import logging
 import os
 import subprocess
@@ -292,22 +291,6 @@ def _revision_sort_key(revision):
         revision = revision.split('-')[0]
     return int(revision)
 
-def locked(method):
-    '''Decorator running given Volume's coroutine under a lock.
-    Needs to be added after wrapping with @asyncio.coroutine, for example:
-
-    >>>@locked
-    >>>@asyncio.coroutine
-    >>>def start(self):
-    >>>    pass
-    '''
-    @asyncio.coroutine
-    @functools.wraps(method)
-    def wrapper(self, *args, **kwargs):
-        with (yield from self._lock):  # pylint: disable=protected-access
-            return (yield from method(self, *args, **kwargs))
-    return wrapper
-
 class ThinVolume(qubes.storage.Volume):
     ''' Default LVM thin volume implementation
     '''  # pylint: disable=too-few-public-methods
@@ -323,8 +306,6 @@ class ThinVolume(qubes.storage.Volume):
         if self.save_on_stop:
             self._vid_import = self.vid + '-import'
 
-        self._lock = asyncio.Lock()
-
     @property
     def path(self):
         return '/dev/' + self._vid_current
@@ -461,7 +442,7 @@ class ThinVolume(qubes.storage.Volume):
         # and remove old snapshots, if needed
         yield from self._remove_revisions()
 
-    @locked
+    @qubes.storage.Volume.locked
     @asyncio.coroutine
     def create(self):
         assert self.vid
@@ -480,7 +461,7 @@ class ThinVolume(qubes.storage.Volume):
             yield from reset_cache_coro()
         return self
 
-    @locked
+    @qubes.storage.Volume.locked
     @asyncio.coroutine
     def remove(self):
         assert self.vid
@@ -514,7 +495,7 @@ class ThinVolume(qubes.storage.Volume):
         devpath = self.path
         return devpath
 
-    @locked
+    @qubes.storage.Volume.locked
     @asyncio.coroutine
     def import_volume(self, src_volume):
         if not src_volume.save_on_stop:
@@ -556,7 +537,7 @@ class ThinVolume(qubes.storage.Volume):
 
         return self
 
-    @locked
+    @qubes.storage.Volume.locked
     @asyncio.coroutine
     def import_data(self, size):
         ''' Returns an object that can be `open()`. '''
@@ -573,7 +554,7 @@ class ThinVolume(qubes.storage.Volume):
         devpath = '/dev/' + self._vid_import
         return devpath
 
-    @locked
+    @qubes.storage.Volume.locked
     @asyncio.coroutine
     def import_data_end(self, success):
         '''Either commit imported data, or discard temporary volume'''
@@ -609,7 +590,7 @@ class ThinVolume(qubes.storage.Volume):
         return (size_cache[self._vid_snap]['origin'] !=
                self.source.path.split('/')[-1])
 
-    @locked
+    @qubes.storage.Volume.locked
     @asyncio.coroutine
     def revert(self, revision=None):
         if self.is_dirty():
@@ -633,7 +614,7 @@ class ThinVolume(qubes.storage.Volume):
         yield from reset_cache_coro()
         return self
 
-    @locked
+    @qubes.storage.Volume.locked
     @asyncio.coroutine
     def resize(self, size):
         ''' Expands volume, throws
@@ -682,7 +663,7 @@ class ThinVolume(qubes.storage.Volume):
             cmd = ['clone', self.source.path, self._vid_snap]
         yield from qubes_lvm_coro(cmd, self.log)
 
-    @locked
+    @qubes.storage.Volume.locked
     @asyncio.coroutine
     def start(self):
         self.abort_if_import_in_progress()
@@ -696,7 +677,7 @@ class ThinVolume(qubes.storage.Volume):
             yield from reset_cache_coro()
         return self
 
-    @locked
+    @qubes.storage.Volume.locked
     @asyncio.coroutine
     def stop(self):
         try:

+ 10 - 21
qubes/storage/reflink.py

@@ -32,7 +32,6 @@ import logging
 import os
 import subprocess
 import tempfile
-import threading
 from contextlib import contextmanager, suppress
 
 import qubes.storage
@@ -131,28 +130,17 @@ class ReflinkPool(qubes.storage.Pool):
             self.dir_path)
 
 
-def _locked(method):
-    ''' Decorator transforming a synchronous volume method to run
-        under the volume lock.
-    '''
-    @functools.wraps(method)
-    def wrapper(self, *args, **kwargs):
-        with self._lock:  # pylint: disable=protected-access
-            return method(self, *args, **kwargs)
-    return wrapper
-
 class ReflinkVolume(qubes.storage.Volume):
     def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)
-        self._lock = threading.Lock()
         self._path_vid = os.path.join(self.pool.dir_path, self.vid)
         self._path_clean = self._path_vid + '.img'
         self._path_dirty = self._path_vid + '-dirty.img'
         self._path_import = self._path_vid + '-import.img'
         self.path = self._path_dirty
 
+    @qubes.storage.Volume.locked
     @_coroutinized
-    @_locked
     def create(self):
         self._remove_all_images()
         if self.save_on_stop and not self.snap_on_start:
@@ -173,8 +161,8 @@ class ReflinkVolume(qubes.storage.Volume):
         raise qubes.storage.StoragePoolException(
             'Missing image file {!r} for volume {}'.format(img, self.vid))
 
+    @qubes.storage.Volume.locked
     @_coroutinized
-    @_locked
     def remove(self):
         self.pool._volumes.pop(self, None)  # pylint: disable=protected-access
         self._remove_all_images()
@@ -203,8 +191,8 @@ class ReflinkVolume(qubes.storage.Volume):
     def is_dirty(self):
         return self.save_on_stop and os.path.exists(self._path_dirty)
 
+    @qubes.storage.Volume.locked
     @_coroutinized
-    @_locked
     def start(self):
         self._remove_incomplete_images()
         if not self.is_dirty():
@@ -220,8 +208,8 @@ class ReflinkVolume(qubes.storage.Volume):
                 _create_sparse_file(self._path_dirty, self.size)
         return self
 
+    @qubes.storage.Volume.locked
     @_coroutinized
-    @_locked
     def stop(self):
         if self.save_on_stop:
             self._commit(self._path_dirty)
@@ -253,8 +241,8 @@ class ReflinkVolume(qubes.storage.Volume):
         for number, timestamp in list(self.revisions.items())[:-keep or None]:
             _remove_file(self._path_revision(number, timestamp))
 
+    @qubes.storage.Volume.locked
     @_coroutinized
-    @_locked
     def revert(self, revision=None):
         if self.is_dirty():
             raise qubes.storage.StoragePoolException(
@@ -268,8 +256,8 @@ class ReflinkVolume(qubes.storage.Volume):
         _rename_file(path_revision, self._path_clean)
         return self
 
+    @qubes.storage.Volume.locked
     @_coroutinized
-    @_locked
     def resize(self, size):
         ''' Resize a read-write volume; notify any corresponding loop
             devices of the size change.
@@ -292,8 +280,8 @@ class ReflinkVolume(qubes.storage.Volume):
                 'Cannot export: {} is not save_on_stop'.format(self.vid))
         return self._path_clean
 
+    @qubes.storage.Volume.locked
     @_coroutinized
-    @_locked
     def import_data(self, size):
         if not self.save_on_stop:
             raise NotImplementedError(
@@ -305,10 +293,11 @@ class ReflinkVolume(qubes.storage.Volume):
         (self._commit if success else _remove_file)(self._path_import)
         return self
 
-    import_data_end = _coroutinized(_locked(_import_data_end))
+    import_data_end = qubes.storage.Volume.locked(_coroutinized(
+        _import_data_end))
 
+    @qubes.storage.Volume.locked
     @_coroutinized
-    @_locked
     def import_volume(self, src_volume):
         if self.save_on_stop:
             try: