diff --git a/qubes/storage/callback.py b/qubes/storage/callback.py index b51b6cb3..cf7254a3 100644 --- a/qubes/storage/callback.py +++ b/qubes/storage/callback.py @@ -20,10 +20,17 @@ import logging import subprocess import json +import asyncio +import threading from shlex import quote +from qubes.utils import coro_maybe import qubes.storage +class UnhandledSignalException(qubes.storage.StoragePoolException): + def __init__(self, pool, signal): + super().__init__('The pool %s failed to handle the signal %s, likely because it was run from synchronous code.' % (pool.name, signal)) + class CallbackPool(qubes.storage.Pool): ''' Proxy storage pool driver adding callback functionality to other pool drivers. @@ -37,6 +44,12 @@ class CallbackPool(qubes.storage.Pool): - custom pool mounts - encryption - debugging + - run synchronous pool drivers asynchronously + + A word of caution: + This implementation runs all methods that `qubes.storage.Pool` allows to be asynchronous asynchronously. So if a backend pool driver does + not support a particular method to be run asynchronously, there may be issues. In short, it is always preferable to use the original backend + driver over this one unless the functionality of this driver is required for a particular use case. **Integration tests**: @@ -58,7 +71,7 @@ class CallbackPool(qubes.storage.Pool): ls /mnt/test01 qvm-pool -r test && sudo rm -rf /mnt/test01 - echo '#!/bin/bash'$'\n''i=0 ; for arg in "$@" ; do echo "$i: $arg" >> /tmp/callback.log ; (( i++)) ; done ; exit 0' > /usr/bin/testCbLogArgs && chmod +x /usr/bin/testCbLogArgs + echo '#!/bin/bash'$'\n''i=1 ; for arg in "$@" ; do echo "$i: $arg" >> /tmp/callback.log ; (( i++)) ; done ; exit 0' > /usr/bin/testCbLogArgs && chmod +x /usr/bin/testCbLogArgs rm -f /tmp/callback.log qvm-pool -o conf_id=testing-succ-file-02 -a test callback qvm-pool @@ -141,7 +154,7 @@ class CallbackPool(qubes.storage.Pool): systemctl restart qubesd qvm-start test-eph2 (trigger storage re-init) md5sum /mnt/ram/teph.key (same as in (2)) - qvm-shutdown test-eph2 + qvm-shutdown --wait test-eph2 sudo umount /mnt/test_eph qvm-create -l red -P teph test-eph-fail (must fail with error in journalctl) ls /mnt/test_eph/ (should be empty) @@ -150,7 +163,7 @@ class CallbackPool(qubes.storage.Pool): qvm-create -l red -P teph test-eph3 md5sum /mnt/ram/teph.key (same as in (2)) sudo mount|grep -E 'ram|test' - ls /mnt/test_eph/appvms/test_eph3 + ls /mnt/test_eph/appvms/test-eph3 qvm-remove test-eph3 qvm-ls | grep test-eph qvm-pool -r teph @@ -172,13 +185,14 @@ class CallbackPool(qubes.storage.Pool): may be called from an untrusted VM (not by default though). In those cases it may be security-relevant not to pick easily guessable `conf_id` values for your configuration as untrusted VMs may otherwise execute callbacks meant for other pools. + :raise StoragePoolException: For user configuration issues. ''' #NOTE: attribute names **must** start with `_cb_` unless they are meant to be stored as self._cb_impl attributes self._cb_ctor_done = False #: Boolean to indicate whether or not `__init__` successfully ran through. self._cb_log = logging.getLogger('qubes.storage.callback') #: Logger instance. if not isinstance(conf_id, str): raise qubes.storage.StoragePoolException('conf_id is no String. VM attack?!') - self._cb_conf_id = conf_id #: Configuration ID as passed to `__init__`. + self._cb_conf_id = conf_id #: Configuration ID as passed to `__init__()`. with open(CallbackPool.config_path) as json_file: conf_all = json.load(json_file) @@ -207,12 +221,13 @@ class CallbackPool(qubes.storage.Pool): raise qubes.storage.StoragePoolException('The class %s must be a subclass of qubes.storage.Pool.' % cls) self._cb_requires_init = self._check_init() #: Boolean indicating whether late storage initialization yet has to be done or not. + self._cb_init_lock = threading.Lock() #: Lock ensuring that late storage initialization is only run exactly once. Currently a `threading.Lock()` to make it accessible from synchronous code as well. bdriver_args = self._cb_conf.get('bdriver_args', {}) self._cb_impl = cls(name=name, **bdriver_args) #: Instance of the backend pool driver. super().__init__(name=name, revisions_to_keep=int(bdriver_args.get('revisions_to_keep', 1))) self._cb_ctor_done = True - self._callback('on_ctor') + self._callback_nocoro('on_ctor') def _check_init(self): ''' Whether or not this object requires late storage initialization via callback. ''' @@ -221,24 +236,38 @@ class CallbackPool(qubes.storage.Pool): cmd = self._cb_conf.get('cmd') return bool(cmd and cmd != '-') + @asyncio.coroutine def _init(self, callback=True): ''' Late storage initialization on first use for e.g. decryption on first usage request. :param callback: Whether to trigger the `on_sinit` callback or not. ''' - #maybe TODO: if this function is meant to be run in parallel (are Pool operations asynchronous?), a function lock is required! - if callback: - self._callback('on_sinit') - self._cb_requires_init = False + with self._cb_init_lock: + if self._cb_requires_init: + if callback: + yield from self._callback('on_sinit') + self._cb_requires_init = False + def _init_nocoro(self, callback=True): + ''' `_init()` in synchronous code. ''' + with self._cb_init_lock: + if self._cb_requires_init: + if callback: + self._callback_nocoro('on_sinit') + self._cb_requires_init = False + + @asyncio.coroutine def _assert_initialized(self, **kwargs): if self._cb_requires_init: - self._init(**kwargs) + yield from self._init(**kwargs) - def _callback(self, cb, cb_args=None): - '''Run a callback. + def _callback_nocoro(self, cb, cb_args=None, handle_signals=True): + '''Run a callback (variant that can be used outside of coroutines / from synchronous code). :param cb: Callback identifier string. :param cb_args: Optional list of arguments to pass to the command as last arguments. Only passed on for the generic command specified as `cmd`, not for `on_xyz` callbacks. + :param handle_signals: Attempt to handle signals locally in synchronous code. + May throw an exception, if a callback signal cannot be handled locally. + :return: String with potentially unhandled signals, if `handle_signals` is `False`. Nothing otherwise. ''' if self._cb_ctor_done: cmd = self._cb_conf.get(cb) @@ -249,7 +278,6 @@ class CallbackPool(qubes.storage.Pool): cmd = self._cb_conf.get('cmd') args = [self.name, self._cb_conf['bdriver'], cb, self._cb_cmd_arg, *cb_args] if cmd and cmd != '-': - args = filter(None, args) args = ' '.join(quote(str(a)) for a in args) cmd = ' '.join(filter(None, [cmd, args])) self._cb_log.info('callback driver executing (%s, %s %s): %s', self._cb_conf_id, cb, cb_args, cmd) @@ -258,8 +286,24 @@ class CallbackPool(qubes.storage.Pool): self._cb_log.debug('callback driver stdout (%s, %s %s): %s', self._cb_conf_id, cb, cb_args, res.stdout) self._cb_log.debug('callback driver stderr (%s, %s %s): %s', self._cb_conf_id, cb, cb_args, res.stderr) if self._cb_conf.get('signal_back', False) is True: - self._process_signals(res.stdout) + if handle_signals: + self._process_signals_nocoro(res.stdout) + else: + return res.stdout + return None + @asyncio.coroutine + def _callback(self, cb, cb_args=None): + '''Run a callback. + :param cb: Callback identifier string. + :param cb_args: Optional list of arguments to pass to the command as last arguments. + Only passed on for the generic command specified as `cmd`, not for `on_xyz` callbacks. + ''' + ret = self._callback_nocoro(cb, cb_args=cb_args, handle_signals=False) + if ret: + yield from self._process_signals(ret) + + @asyncio.coroutine def _process_signals(self, out): '''Process any signals found inside a string. :param out: String to check for signals. Each signal must be on a dedicated line. @@ -268,7 +312,18 @@ class CallbackPool(qubes.storage.Pool): for line in out.splitlines(): if line == 'SIGNAL_setup': self._cb_log.info('callback driver processing SIGNAL_setup for %s', self._cb_conf_id) - self._setup_cb(callback=False) + #NOTE: calling our own methods may lead to a deadlock / qubesd freeze due to `self._assert_initialized()` / `self._cb_init_lock` + yield from coro_maybe(self._cb_impl.setup()) + + def _process_signals_nocoro(self, out): + '''Variant of `process_signals` to be used with synchronous code. + :param out: String to check for signals. Each signal must be on a dedicated line. + They are executed in the order they are found. Callbacks are not triggered. + :raise UnhandledSignalException: If signals cannot be handled here / in synchronous code. + ''' + for line in out.splitlines(): + if line == 'SIGNAL_setup': + raise UnhandledSignalException(self, line) @property def config(self): @@ -278,23 +333,21 @@ class CallbackPool(qubes.storage.Pool): 'conf_id': self._cb_conf_id, } + @asyncio.coroutine def destroy(self): - self._assert_initialized() - ret = self._cb_impl.destroy() - self._callback('on_destroy') + yield from self._assert_initialized() + ret = yield from coro_maybe(self._cb_impl.destroy()) + yield from self._callback('on_destroy') return ret def init_volume(self, vm, volume_config): return CallbackVolume(self, self._cb_impl.init_volume(vm, volume_config)) - def _setup_cb(self, callback=True): - if callback: - self._callback('on_setup') - self._assert_initialized(callback=False) #setup is assumed to include storage initialization - return self._cb_impl.setup() - + @asyncio.coroutine def setup(self): - return self._setup_cb() + yield from self._assert_initialized(callback=False) #setup is assumed to include storage initialization + yield from self._callback('on_setup') + return (yield from coro_maybe(self._cb_impl.setup())) @property def volumes(self): @@ -365,72 +418,107 @@ class CallbackVolume: self._cb_pool = pool #: CallbackPool instance the Volume belongs to. self._cb_impl = impl #: Backend volume implementation instance. + @asyncio.coroutine def _assert_initialized(self, **kwargs): - return self._cb_pool._assert_initialized(**kwargs) # pylint: disable=protected-access + yield from self._cb_pool._assert_initialized(**kwargs) # pylint: disable=protected-access + @asyncio.coroutine def _callback(self, cb, cb_args=None, **kwargs): if cb_args is None: cb_args = [] vol_args = [self.name, self.vid, *cb_args] - return self._cb_pool._callback(cb, cb_args=vol_args, **kwargs) # pylint: disable=protected-access + yield from self._cb_pool._callback(cb, cb_args=vol_args, **kwargs) # pylint: disable=protected-access + @asyncio.coroutine def create(self): - self._assert_initialized() - self._callback('on_volume_create') - return self._cb_impl.create() + yield from self._assert_initialized() + yield from self._callback('on_volume_create') + return (yield from coro_maybe(self._cb_impl.create())) + @asyncio.coroutine def remove(self): - self._assert_initialized() - ret = self._cb_impl.remove() - self._callback('on_volume_remove') + yield from self._assert_initialized() + ret = yield from coro_maybe(self._cb_impl.remove()) + yield from self._callback('on_volume_remove') return ret + @asyncio.coroutine def resize(self, size): - self._assert_initialized() - self._callback('on_volume_resize', cb_args=[size]) - return self._cb_impl.resize(size) + yield from self._assert_initialized() + yield from self._callback('on_volume_resize', cb_args=[size]) + return (yield from coro_maybe(self._cb_impl.resize(size))) + @asyncio.coroutine def start(self): - self._assert_initialized() - self._callback('on_volume_start') - return self._cb_impl.start() + yield from self._assert_initialized() + yield from self._callback('on_volume_start') + return (yield from coro_maybe(self._cb_impl.start())) + @asyncio.coroutine def stop(self): - self._assert_initialized() - ret = self._cb_impl.stop() - self._callback('on_volume_stop') + yield from self._assert_initialized() + ret = yield from coro_maybe(self._cb_impl.stop()) + yield from self._callback('on_volume_stop') return ret + @asyncio.coroutine def import_data(self): - self._assert_initialized() - self._callback('on_volume_import_data') - return self._cb_impl.import_data() + yield from self._assert_initialized() + yield from self._callback('on_volume_import_data') + return (yield from coro_maybe(self._cb_impl.import_data())) + @asyncio.coroutine def import_data_end(self, success): - self._assert_initialized() - ret = self._cb_impl.import_data_end(success) - self._callback('on_volume_import_data_end', cb_args=[success]) + yield from self._assert_initialized() + ret = yield from coro_maybe(self._cb_impl.import_data_end(success)) + yield from self._callback('on_volume_import_data_end', cb_args=[success]) return ret + @asyncio.coroutine def import_volume(self, src_volume): - self._assert_initialized() - self._callback('on_volume_import', cb_args=[src_volume.vid]) - return self._cb_impl.import_volume(src_volume) + yield from self._assert_initialized() + yield from self._callback('on_volume_import', cb_args=[src_volume.vid]) + return (yield from coro_maybe(self._cb_impl.import_volume(src_volume))) def is_dirty(self): - if self._cb_pool._cb_requires_init: # pylint: disable=protected-access + # pylint: disable=protected-access + if self._cb_pool._cb_requires_init: return False return self._cb_impl.is_dirty() def is_outdated(self): - if self._cb_pool._cb_requires_init: # pylint: disable=protected-access + # pylint: disable=protected-access + if self._cb_pool._cb_requires_init: return False return self._cb_impl.is_outdated() + def block_device(self): + # pylint: disable=protected-access + if self._cb_pool._cb_requires_init: + # usually Volume.start() is called beforehand + # --> we should be initialized in 99% of cases + return None + return self._cb_impl.block_device() + + def export(self, volume): + # pylint: disable=protected-access + #TODO: once this becomes a coroutine in the Volume class, avoid the below blocking & potentially exception-throwing code; maybe also add a callback + if self._cb_pool._cb_requires_init: + self._cb_pool._init_nocoro() + return self._cb_impl.export(volume) + + @asyncio.coroutine + def verify(self): + yield from self._assert_initialized() + return (yield from coro_maybe(self._cb_impl.verify())) + + @asyncio.coroutine + def revert(self, revision=None): + yield from self._assert_initialized() + return (yield from coro_maybe(self._cb_impl.revert(revision=revision))) + #remaining method & attribute delegation def __getattr__(self, name): - if name in ['block_device', 'verify', 'revert', 'export']: - self._assert_initialized() return getattr(self._cb_impl, name) def __setattr__(self, name, value):