storage/reflink: run synchronous volume methods in executor

Convert create(), verify(), remove(), start(), stop(), revert(),
resize(), and import_volume() into coroutine methods, via a decorator
that runs them in the event loop's thread-based default executor.

This reduces UI hangs by unblocking the event loop, and can e.g. speed
up VM starts by starting multiple volumes in parallel.
This commit is contained in:
Rusty Bird 2018-09-11 23:50:15 +00:00
parent 3d986be02a
commit 1889c9b75f
No known key found for this signature in database
GPG Key ID: 469D78F47AAF2ADF

View File

@ -22,9 +22,11 @@
but not required. but not required.
''' '''
import asyncio
import collections import collections
import errno import errno
import fcntl import fcntl
import functools
import glob import glob
import logging import logging
import os import os
@ -115,20 +117,37 @@ class ReflinkPool(qubes.storage.Pool):
[pool for pool in app.pools.values() if pool is not self], [pool for pool in app.pools.values() if pool is not self],
self.dir_path) self.dir_path)
def _unblock(method):
''' Decorator transforming a synchronous volume method into a
coroutine that runs the original method in the event loop's
thread-based default executor, under a per-volume lock.
'''
@asyncio.coroutine
@functools.wraps(method)
def wrapper(self, *args, **kwargs):
with (yield from self._lock): # pylint: disable=protected-access
return (yield from asyncio.get_event_loop().run_in_executor(
None, functools.partial(method, self, *args, **kwargs)))
return wrapper
class ReflinkVolume(qubes.storage.Volume): class ReflinkVolume(qubes.storage.Volume):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
self._lock = asyncio.Lock()
self._path_vid = os.path.join(self.pool.dir_path, self.vid) self._path_vid = os.path.join(self.pool.dir_path, self.vid)
self._path_clean = self._path_vid + '.img' self._path_clean = self._path_vid + '.img'
self._path_dirty = self._path_vid + '-dirty.img' self._path_dirty = self._path_vid + '-dirty.img'
self._path_import = self._path_vid + '-import.img' self._path_import = self._path_vid + '-import.img'
self.path = self._path_dirty self.path = self._path_dirty
@_unblock
def create(self): def create(self):
if self.save_on_stop and not self.snap_on_start: if self.save_on_stop and not self.snap_on_start:
_create_sparse_file(self._path_clean, self.size) _create_sparse_file(self._path_clean, self.size)
return self return self
@_unblock
def verify(self): def verify(self):
if self.snap_on_start: if self.snap_on_start:
img = self.source._path_clean # pylint: disable=protected-access img = self.source._path_clean # pylint: disable=protected-access
@ -142,6 +161,7 @@ class ReflinkVolume(qubes.storage.Volume):
raise qubes.storage.StoragePoolException( raise qubes.storage.StoragePoolException(
'Missing image file {!r} for volume {}'.format(img, self.vid)) 'Missing image file {!r} for volume {}'.format(img, self.vid))
@_unblock
def remove(self): def remove(self):
''' Drop volume object from pool; remove volume images from ''' Drop volume object from pool; remove volume images from
oldest to newest; remove empty VM directory. oldest to newest; remove empty VM directory.
@ -170,6 +190,7 @@ class ReflinkVolume(qubes.storage.Volume):
def is_dirty(self): def is_dirty(self):
return self.save_on_stop and os.path.exists(self._path_dirty) return self.save_on_stop and os.path.exists(self._path_dirty)
@_unblock
def start(self): def start(self):
self._cleanup() self._cleanup()
if self.is_dirty(): # implies self.save_on_stop if self.is_dirty(): # implies self.save_on_stop
@ -183,6 +204,7 @@ class ReflinkVolume(qubes.storage.Volume):
_create_sparse_file(self._path_dirty, self.size) _create_sparse_file(self._path_dirty, self.size)
return self return self
@_unblock
def stop(self): def stop(self):
if self.save_on_stop: if self.save_on_stop:
self._commit(self._path_dirty) self._commit(self._path_dirty)
@ -211,6 +233,7 @@ class ReflinkVolume(qubes.storage.Volume):
for number, timestamp in list(self.revisions.items())[:-keep or None]: for number, timestamp in list(self.revisions.items())[:-keep or None]:
_remove_file(self._path_revision(number, timestamp)) _remove_file(self._path_revision(number, timestamp))
@_unblock
def revert(self, revision=None): def revert(self, revision=None):
if self.is_dirty(): if self.is_dirty():
raise qubes.storage.StoragePoolException( raise qubes.storage.StoragePoolException(
@ -224,6 +247,7 @@ class ReflinkVolume(qubes.storage.Volume):
_rename_file(path_revision, self._path_clean) _rename_file(path_revision, self._path_clean)
return self return self
@_unblock
def resize(self, size): def resize(self, size):
''' Expand a read-write volume image; notify any corresponding ''' Expand a read-write volume image; notify any corresponding
loop devices of the size change. loop devices of the size change.
@ -269,6 +293,7 @@ class ReflinkVolume(qubes.storage.Volume):
_remove_file(self._path_import) _remove_file(self._path_import)
return self return self
@_unblock
def import_volume(self, src_volume): def import_volume(self, src_volume):
if not self.save_on_stop: if not self.save_on_stop:
return self return self