storage/reflink: split @_unblock into @_coroutinized @_locked
And change the volume lock from an asyncio.Lock to a threading.Lock - locking is now handled before coroutinization. This will allow the coroutinized resize() and a new *not* coroutinized size() getter from one of the next commits ("storage/reflink: preferably get volume size from image size") to both run under the volume lock.
This commit is contained in:
parent
ab929baa38
commit
4c9c0a88d5
@ -32,6 +32,7 @@ import logging
|
|||||||
import os
|
import os
|
||||||
import subprocess
|
import subprocess
|
||||||
import tempfile
|
import tempfile
|
||||||
|
import threading
|
||||||
from contextlib import contextmanager, suppress
|
from contextlib import contextmanager, suppress
|
||||||
|
|
||||||
import qubes.storage
|
import qubes.storage
|
||||||
@ -117,36 +118,47 @@ class ReflinkPool(qubes.storage.Pool):
|
|||||||
self.dir_path)
|
self.dir_path)
|
||||||
|
|
||||||
|
|
||||||
def _unblock(method):
|
def _coroutinized(function):
|
||||||
''' Decorator transforming a synchronous volume method into a
|
''' Decorator transforming a synchronous function into a coroutine
|
||||||
coroutine that runs the original method in the event loop's
|
that runs the function in the event loop's thread-based
|
||||||
thread-based default executor, under a per-volume lock.
|
default executor.
|
||||||
'''
|
'''
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
|
@functools.wraps(function)
|
||||||
|
def wrapper(*args, **kwargs):
|
||||||
|
return (yield from asyncio.get_event_loop().run_in_executor(
|
||||||
|
None, functools.partial(function, *args, **kwargs)))
|
||||||
|
return wrapper
|
||||||
|
|
||||||
|
def _locked(method):
|
||||||
|
''' Decorator transforming a synchronous volume method to run
|
||||||
|
under the volume lock.
|
||||||
|
'''
|
||||||
@functools.wraps(method)
|
@functools.wraps(method)
|
||||||
def wrapper(self, *args, **kwargs):
|
def wrapper(self, *args, **kwargs):
|
||||||
with (yield from self._lock): # pylint: disable=protected-access
|
with self._lock: # pylint: disable=protected-access
|
||||||
return (yield from asyncio.get_event_loop().run_in_executor(
|
return method(self, *args, **kwargs)
|
||||||
None, functools.partial(method, self, *args, **kwargs)))
|
|
||||||
return wrapper
|
return wrapper
|
||||||
|
|
||||||
class ReflinkVolume(qubes.storage.Volume):
|
class ReflinkVolume(qubes.storage.Volume):
|
||||||
def __init__(self, *args, **kwargs):
|
def __init__(self, *args, **kwargs):
|
||||||
super().__init__(*args, **kwargs)
|
super().__init__(*args, **kwargs)
|
||||||
self._lock = asyncio.Lock()
|
self._lock = threading.Lock()
|
||||||
self._path_vid = os.path.join(self.pool.dir_path, self.vid)
|
self._path_vid = os.path.join(self.pool.dir_path, self.vid)
|
||||||
self._path_clean = self._path_vid + '.img'
|
self._path_clean = self._path_vid + '.img'
|
||||||
self._path_dirty = self._path_vid + '-dirty.img'
|
self._path_dirty = self._path_vid + '-dirty.img'
|
||||||
self._path_import = self._path_vid + '-import.img'
|
self._path_import = self._path_vid + '-import.img'
|
||||||
self.path = self._path_dirty
|
self.path = self._path_dirty
|
||||||
|
|
||||||
@_unblock
|
@_coroutinized
|
||||||
|
@_locked
|
||||||
def create(self):
|
def create(self):
|
||||||
if self.save_on_stop and not self.snap_on_start:
|
if self.save_on_stop and not self.snap_on_start:
|
||||||
_create_sparse_file(self._path_clean, self.size)
|
_create_sparse_file(self._path_clean, self.size)
|
||||||
return self
|
return self
|
||||||
|
|
||||||
@_unblock
|
@_coroutinized
|
||||||
|
@_locked
|
||||||
def verify(self):
|
def verify(self):
|
||||||
if self.snap_on_start:
|
if self.snap_on_start:
|
||||||
img = self.source._path_clean # pylint: disable=protected-access
|
img = self.source._path_clean # pylint: disable=protected-access
|
||||||
@ -160,7 +172,8 @@ class ReflinkVolume(qubes.storage.Volume):
|
|||||||
raise qubes.storage.StoragePoolException(
|
raise qubes.storage.StoragePoolException(
|
||||||
'Missing image file {!r} for volume {}'.format(img, self.vid))
|
'Missing image file {!r} for volume {}'.format(img, self.vid))
|
||||||
|
|
||||||
@_unblock
|
@_coroutinized
|
||||||
|
@_locked
|
||||||
def remove(self):
|
def remove(self):
|
||||||
''' Drop volume object from pool; remove volume images from
|
''' Drop volume object from pool; remove volume images from
|
||||||
oldest to newest; remove empty VM directory.
|
oldest to newest; remove empty VM directory.
|
||||||
@ -189,7 +202,8 @@ class ReflinkVolume(qubes.storage.Volume):
|
|||||||
def is_dirty(self):
|
def is_dirty(self):
|
||||||
return self.save_on_stop and os.path.exists(self._path_dirty)
|
return self.save_on_stop and os.path.exists(self._path_dirty)
|
||||||
|
|
||||||
@_unblock
|
@_coroutinized
|
||||||
|
@_locked
|
||||||
def start(self):
|
def start(self):
|
||||||
self._cleanup()
|
self._cleanup()
|
||||||
if self.is_dirty(): # implies self.save_on_stop
|
if self.is_dirty(): # implies self.save_on_stop
|
||||||
@ -203,7 +217,8 @@ class ReflinkVolume(qubes.storage.Volume):
|
|||||||
_create_sparse_file(self._path_dirty, self.size)
|
_create_sparse_file(self._path_dirty, self.size)
|
||||||
return self
|
return self
|
||||||
|
|
||||||
@_unblock
|
@_coroutinized
|
||||||
|
@_locked
|
||||||
def stop(self):
|
def stop(self):
|
||||||
if self.save_on_stop:
|
if self.save_on_stop:
|
||||||
self._commit(self._path_dirty)
|
self._commit(self._path_dirty)
|
||||||
@ -232,7 +247,8 @@ class ReflinkVolume(qubes.storage.Volume):
|
|||||||
for number, timestamp in list(self.revisions.items())[:-keep or None]:
|
for number, timestamp in list(self.revisions.items())[:-keep or None]:
|
||||||
_remove_file(self._path_revision(number, timestamp))
|
_remove_file(self._path_revision(number, timestamp))
|
||||||
|
|
||||||
@_unblock
|
@_coroutinized
|
||||||
|
@_locked
|
||||||
def revert(self, revision=None):
|
def revert(self, revision=None):
|
||||||
if self.is_dirty():
|
if self.is_dirty():
|
||||||
raise qubes.storage.StoragePoolException(
|
raise qubes.storage.StoragePoolException(
|
||||||
@ -246,7 +262,8 @@ class ReflinkVolume(qubes.storage.Volume):
|
|||||||
_rename_file(path_revision, self._path_clean)
|
_rename_file(path_revision, self._path_clean)
|
||||||
return self
|
return self
|
||||||
|
|
||||||
@_unblock
|
@_coroutinized
|
||||||
|
@_locked
|
||||||
def resize(self, size):
|
def resize(self, size):
|
||||||
''' Resize a read-write volume image; notify any corresponding
|
''' Resize a read-write volume image; notify any corresponding
|
||||||
loop devices of the size change.
|
loop devices of the size change.
|
||||||
@ -271,7 +288,8 @@ class ReflinkVolume(qubes.storage.Volume):
|
|||||||
'Cannot export: {} is not save_on_stop'.format(self.vid))
|
'Cannot export: {} is not save_on_stop'.format(self.vid))
|
||||||
return self._path_clean
|
return self._path_clean
|
||||||
|
|
||||||
@_unblock
|
@_coroutinized
|
||||||
|
@_locked
|
||||||
def import_data(self):
|
def import_data(self):
|
||||||
if not self.save_on_stop:
|
if not self.save_on_stop:
|
||||||
raise NotImplementedError(
|
raise NotImplementedError(
|
||||||
@ -286,9 +304,10 @@ class ReflinkVolume(qubes.storage.Volume):
|
|||||||
_remove_file(self._path_import)
|
_remove_file(self._path_import)
|
||||||
return self
|
return self
|
||||||
|
|
||||||
import_data_end = _unblock(_import_data_end)
|
import_data_end = _coroutinized(_locked(_import_data_end))
|
||||||
|
|
||||||
@_unblock
|
@_coroutinized
|
||||||
|
@_locked
|
||||||
def import_volume(self, src_volume):
|
def import_volume(self, src_volume):
|
||||||
if not self.save_on_stop:
|
if not self.save_on_stop:
|
||||||
return self
|
return self
|
||||||
|
Loading…
Reference in New Issue
Block a user