From 4c9c0a88d54a90c42419ac153cbe7fc6c9cf4032 Mon Sep 17 00:00:00 2001 From: Rusty Bird Date: Sun, 23 Jun 2019 12:47:58 +0000 Subject: [PATCH] storage/reflink: split @_unblock into @_coroutinized @_locked And change the volume lock from an asyncio.Lock to a threading.Lock - locking is now handled before coroutinization. This will allow the coroutinized resize() and a new *not* coroutinized size() getter from one of the next commits ("storage/reflink: preferably get volume size from image size") to both run under the volume lock. --- qubes/storage/reflink.py | 55 +++++++++++++++++++++++++++------------- 1 file changed, 37 insertions(+), 18 deletions(-) diff --git a/qubes/storage/reflink.py b/qubes/storage/reflink.py index 5ed21f81..9da74113 100644 --- a/qubes/storage/reflink.py +++ b/qubes/storage/reflink.py @@ -32,6 +32,7 @@ import logging import os import subprocess import tempfile +import threading from contextlib import contextmanager, suppress import qubes.storage @@ -117,36 +118,47 @@ class ReflinkPool(qubes.storage.Pool): self.dir_path) -def _unblock(method): - ''' Decorator transforming a synchronous volume method into a - coroutine that runs the original method in the event loop's - thread-based default executor, under a per-volume lock. +def _coroutinized(function): + ''' Decorator transforming a synchronous function into a coroutine + that runs the function in the event loop's thread-based + default executor. ''' @asyncio.coroutine + @functools.wraps(function) + def wrapper(*args, **kwargs): + return (yield from asyncio.get_event_loop().run_in_executor( + None, functools.partial(function, *args, **kwargs))) + return wrapper + +def _locked(method): + ''' Decorator transforming a synchronous volume method to run + under the volume lock. + ''' @functools.wraps(method) def wrapper(self, *args, **kwargs): - with (yield from self._lock): # pylint: disable=protected-access - return (yield from asyncio.get_event_loop().run_in_executor( - None, functools.partial(method, self, *args, **kwargs))) + with self._lock: # pylint: disable=protected-access + return method(self, *args, **kwargs) return wrapper class ReflinkVolume(qubes.storage.Volume): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) - self._lock = asyncio.Lock() + self._lock = threading.Lock() self._path_vid = os.path.join(self.pool.dir_path, self.vid) self._path_clean = self._path_vid + '.img' self._path_dirty = self._path_vid + '-dirty.img' self._path_import = self._path_vid + '-import.img' self.path = self._path_dirty - @_unblock + @_coroutinized + @_locked def create(self): if self.save_on_stop and not self.snap_on_start: _create_sparse_file(self._path_clean, self.size) return self - @_unblock + @_coroutinized + @_locked def verify(self): if self.snap_on_start: img = self.source._path_clean # pylint: disable=protected-access @@ -160,7 +172,8 @@ class ReflinkVolume(qubes.storage.Volume): raise qubes.storage.StoragePoolException( 'Missing image file {!r} for volume {}'.format(img, self.vid)) - @_unblock + @_coroutinized + @_locked def remove(self): ''' Drop volume object from pool; remove volume images from oldest to newest; remove empty VM directory. @@ -189,7 +202,8 @@ class ReflinkVolume(qubes.storage.Volume): def is_dirty(self): return self.save_on_stop and os.path.exists(self._path_dirty) - @_unblock + @_coroutinized + @_locked def start(self): self._cleanup() if self.is_dirty(): # implies self.save_on_stop @@ -203,7 +217,8 @@ class ReflinkVolume(qubes.storage.Volume): _create_sparse_file(self._path_dirty, self.size) return self - @_unblock + @_coroutinized + @_locked def stop(self): if self.save_on_stop: self._commit(self._path_dirty) @@ -232,7 +247,8 @@ class ReflinkVolume(qubes.storage.Volume): for number, timestamp in list(self.revisions.items())[:-keep or None]: _remove_file(self._path_revision(number, timestamp)) - @_unblock + @_coroutinized + @_locked def revert(self, revision=None): if self.is_dirty(): raise qubes.storage.StoragePoolException( @@ -246,7 +262,8 @@ class ReflinkVolume(qubes.storage.Volume): _rename_file(path_revision, self._path_clean) return self - @_unblock + @_coroutinized + @_locked def resize(self, size): ''' Resize a read-write volume image; notify any corresponding loop devices of the size change. @@ -271,7 +288,8 @@ class ReflinkVolume(qubes.storage.Volume): 'Cannot export: {} is not save_on_stop'.format(self.vid)) return self._path_clean - @_unblock + @_coroutinized + @_locked def import_data(self): if not self.save_on_stop: raise NotImplementedError( @@ -286,9 +304,10 @@ class ReflinkVolume(qubes.storage.Volume): _remove_file(self._path_import) return self - import_data_end = _unblock(_import_data_end) + import_data_end = _coroutinized(_locked(_import_data_end)) - @_unblock + @_coroutinized + @_locked def import_volume(self, src_volume): if not self.save_on_stop: return self