diff --git a/qubes/storage/__init__.py b/qubes/storage/__init__.py index 8b79f16e..03af6312 100644 --- a/qubes/storage/__init__.py +++ b/qubes/storage/__init__.py @@ -22,6 +22,7 @@ """ Qubes storage system""" +import functools import inspect import os import os.path @@ -133,6 +134,8 @@ class Volume: self.source = source #: Volume unique (inside given pool) identifier self.vid = vid + #: Asynchronous lock for @Volume.locked decorator + self._lock = asyncio.Lock() def __eq__(self, other): if isinstance(other, Volume): @@ -155,6 +158,23 @@ class Volume: config = _sanitize_config(self.config) return lxml.etree.Element('volume', **config) + @staticmethod + def locked(method): + '''Decorator running given Volume's coroutine under a lock. + Needs to be added after wrapping with @asyncio.coroutine, for example: + + >>>@Volume.locked + >>>@asyncio.coroutine + >>>def start(self): + >>> pass + ''' + @asyncio.coroutine + @functools.wraps(method) + def wrapper(self, *args, **kwargs): + with (yield from self._lock): # pylint: disable=protected-access + return (yield from method(self, *args, **kwargs)) + return wrapper + def create(self): ''' Create the given volume on disk. diff --git a/qubes/storage/lvm.py b/qubes/storage/lvm.py index f0c1874e..61c9887e 100644 --- a/qubes/storage/lvm.py +++ b/qubes/storage/lvm.py @@ -18,7 +18,6 @@ # ''' Driver for storing vm images in a LVM thin pool ''' -import functools import logging import os import subprocess @@ -292,22 +291,6 @@ def _revision_sort_key(revision): revision = revision.split('-')[0] return int(revision) -def locked(method): - '''Decorator running given Volume's coroutine under a lock. - Needs to be added after wrapping with @asyncio.coroutine, for example: - - >>>@locked - >>>@asyncio.coroutine - >>>def start(self): - >>> pass - ''' - @asyncio.coroutine - @functools.wraps(method) - def wrapper(self, *args, **kwargs): - with (yield from self._lock): # pylint: disable=protected-access - return (yield from method(self, *args, **kwargs)) - return wrapper - class ThinVolume(qubes.storage.Volume): ''' Default LVM thin volume implementation ''' # pylint: disable=too-few-public-methods @@ -323,8 +306,6 @@ class ThinVolume(qubes.storage.Volume): if self.save_on_stop: self._vid_import = self.vid + '-import' - self._lock = asyncio.Lock() - @property def path(self): return '/dev/' + self._vid_current @@ -461,7 +442,7 @@ class ThinVolume(qubes.storage.Volume): # and remove old snapshots, if needed yield from self._remove_revisions() - @locked + @qubes.storage.Volume.locked @asyncio.coroutine def create(self): assert self.vid @@ -480,7 +461,7 @@ class ThinVolume(qubes.storage.Volume): yield from reset_cache_coro() return self - @locked + @qubes.storage.Volume.locked @asyncio.coroutine def remove(self): assert self.vid @@ -514,7 +495,7 @@ class ThinVolume(qubes.storage.Volume): devpath = self.path return devpath - @locked + @qubes.storage.Volume.locked @asyncio.coroutine def import_volume(self, src_volume): if not src_volume.save_on_stop: @@ -556,7 +537,7 @@ class ThinVolume(qubes.storage.Volume): return self - @locked + @qubes.storage.Volume.locked @asyncio.coroutine def import_data(self, size): ''' Returns an object that can be `open()`. ''' @@ -573,7 +554,7 @@ class ThinVolume(qubes.storage.Volume): devpath = '/dev/' + self._vid_import return devpath - @locked + @qubes.storage.Volume.locked @asyncio.coroutine def import_data_end(self, success): '''Either commit imported data, or discard temporary volume''' @@ -609,7 +590,7 @@ class ThinVolume(qubes.storage.Volume): return (size_cache[self._vid_snap]['origin'] != self.source.path.split('/')[-1]) - @locked + @qubes.storage.Volume.locked @asyncio.coroutine def revert(self, revision=None): if self.is_dirty(): @@ -633,7 +614,7 @@ class ThinVolume(qubes.storage.Volume): yield from reset_cache_coro() return self - @locked + @qubes.storage.Volume.locked @asyncio.coroutine def resize(self, size): ''' Expands volume, throws @@ -682,7 +663,7 @@ class ThinVolume(qubes.storage.Volume): cmd = ['clone', self.source.path, self._vid_snap] yield from qubes_lvm_coro(cmd, self.log) - @locked + @qubes.storage.Volume.locked @asyncio.coroutine def start(self): self.abort_if_import_in_progress() @@ -696,7 +677,7 @@ class ThinVolume(qubes.storage.Volume): yield from reset_cache_coro() return self - @locked + @qubes.storage.Volume.locked @asyncio.coroutine def stop(self): try: diff --git a/qubes/storage/reflink.py b/qubes/storage/reflink.py index 396dc5c7..ce8310d0 100644 --- a/qubes/storage/reflink.py +++ b/qubes/storage/reflink.py @@ -32,7 +32,6 @@ import logging import os import subprocess import tempfile -import threading from contextlib import contextmanager, suppress import qubes.storage @@ -131,28 +130,17 @@ class ReflinkPool(qubes.storage.Pool): self.dir_path) -def _locked(method): - ''' Decorator transforming a synchronous volume method to run - under the volume lock. - ''' - @functools.wraps(method) - def wrapper(self, *args, **kwargs): - with self._lock: # pylint: disable=protected-access - return method(self, *args, **kwargs) - return wrapper - class ReflinkVolume(qubes.storage.Volume): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) - self._lock = threading.Lock() self._path_vid = os.path.join(self.pool.dir_path, self.vid) self._path_clean = self._path_vid + '.img' self._path_dirty = self._path_vid + '-dirty.img' self._path_import = self._path_vid + '-import.img' self.path = self._path_dirty + @qubes.storage.Volume.locked @_coroutinized - @_locked def create(self): self._remove_all_images() if self.save_on_stop and not self.snap_on_start: @@ -173,8 +161,8 @@ class ReflinkVolume(qubes.storage.Volume): raise qubes.storage.StoragePoolException( 'Missing image file {!r} for volume {}'.format(img, self.vid)) + @qubes.storage.Volume.locked @_coroutinized - @_locked def remove(self): self.pool._volumes.pop(self, None) # pylint: disable=protected-access self._remove_all_images() @@ -203,8 +191,8 @@ class ReflinkVolume(qubes.storage.Volume): def is_dirty(self): return self.save_on_stop and os.path.exists(self._path_dirty) + @qubes.storage.Volume.locked @_coroutinized - @_locked def start(self): self._remove_incomplete_images() if not self.is_dirty(): @@ -220,8 +208,8 @@ class ReflinkVolume(qubes.storage.Volume): _create_sparse_file(self._path_dirty, self.size) return self + @qubes.storage.Volume.locked @_coroutinized - @_locked def stop(self): if self.save_on_stop: self._commit(self._path_dirty) @@ -253,8 +241,8 @@ class ReflinkVolume(qubes.storage.Volume): for number, timestamp in list(self.revisions.items())[:-keep or None]: _remove_file(self._path_revision(number, timestamp)) + @qubes.storage.Volume.locked @_coroutinized - @_locked def revert(self, revision=None): if self.is_dirty(): raise qubes.storage.StoragePoolException( @@ -268,8 +256,8 @@ class ReflinkVolume(qubes.storage.Volume): _rename_file(path_revision, self._path_clean) return self + @qubes.storage.Volume.locked @_coroutinized - @_locked def resize(self, size): ''' Resize a read-write volume; notify any corresponding loop devices of the size change. @@ -292,8 +280,8 @@ class ReflinkVolume(qubes.storage.Volume): 'Cannot export: {} is not save_on_stop'.format(self.vid)) return self._path_clean + @qubes.storage.Volume.locked @_coroutinized - @_locked def import_data(self, size): if not self.save_on_stop: raise NotImplementedError( @@ -305,10 +293,11 @@ class ReflinkVolume(qubes.storage.Volume): (self._commit if success else _remove_file)(self._path_import) return self - import_data_end = _coroutinized(_locked(_import_data_end)) + import_data_end = qubes.storage.Volume.locked(_coroutinized( + _import_data_end)) + @qubes.storage.Volume.locked @_coroutinized - @_locked def import_volume(self, src_volume): if self.save_on_stop: try: