storage/callback: asyncio implementation

This commit is contained in:
3hhh 2020-07-01 19:48:28 +02:00
parent 170e5f5d7a
commit 889c9238fe
No known key found for this signature in database
GPG Key ID: EB03A691DB2F0833

View File

@ -20,10 +20,17 @@
import logging import logging
import subprocess import subprocess
import json import json
import asyncio
import threading
from shlex import quote from shlex import quote
from qubes.utils import coro_maybe
import qubes.storage import qubes.storage
class UnhandledSignalException(qubes.storage.StoragePoolException):
def __init__(self, pool, signal):
super().__init__('The pool %s failed to handle the signal %s, likely because it was run from synchronous code.' % (pool.name, signal))
class CallbackPool(qubes.storage.Pool): class CallbackPool(qubes.storage.Pool):
''' Proxy storage pool driver adding callback functionality to other pool drivers. ''' Proxy storage pool driver adding callback functionality to other pool drivers.
@ -37,6 +44,12 @@ class CallbackPool(qubes.storage.Pool):
- custom pool mounts - custom pool mounts
- encryption - encryption
- debugging - debugging
- run synchronous pool drivers asynchronously
A word of caution:
This implementation runs all methods that `qubes.storage.Pool` allows to be asynchronous asynchronously. So if a backend pool driver does
not support a particular method to be run asynchronously, there may be issues. In short, it is always preferable to use the original backend
driver over this one unless the functionality of this driver is required for a particular use case.
**Integration tests**: **Integration tests**:
@ -58,7 +71,7 @@ class CallbackPool(qubes.storage.Pool):
ls /mnt/test01 ls /mnt/test01
qvm-pool -r test && sudo rm -rf /mnt/test01 qvm-pool -r test && sudo rm -rf /mnt/test01
echo '#!/bin/bash'$'\n''i=0 ; for arg in "$@" ; do echo "$i: $arg" >> /tmp/callback.log ; (( i++)) ; done ; exit 0' > /usr/bin/testCbLogArgs && chmod +x /usr/bin/testCbLogArgs echo '#!/bin/bash'$'\n''i=1 ; for arg in "$@" ; do echo "$i: $arg" >> /tmp/callback.log ; (( i++)) ; done ; exit 0' > /usr/bin/testCbLogArgs && chmod +x /usr/bin/testCbLogArgs
rm -f /tmp/callback.log rm -f /tmp/callback.log
qvm-pool -o conf_id=testing-succ-file-02 -a test callback qvm-pool -o conf_id=testing-succ-file-02 -a test callback
qvm-pool qvm-pool
@ -141,7 +154,7 @@ class CallbackPool(qubes.storage.Pool):
systemctl restart qubesd systemctl restart qubesd
qvm-start test-eph2 (trigger storage re-init) qvm-start test-eph2 (trigger storage re-init)
md5sum /mnt/ram/teph.key (same as in (2)) md5sum /mnt/ram/teph.key (same as in (2))
qvm-shutdown test-eph2 qvm-shutdown --wait test-eph2
sudo umount /mnt/test_eph sudo umount /mnt/test_eph
qvm-create -l red -P teph test-eph-fail (must fail with error in journalctl) qvm-create -l red -P teph test-eph-fail (must fail with error in journalctl)
ls /mnt/test_eph/ (should be empty) ls /mnt/test_eph/ (should be empty)
@ -150,7 +163,7 @@ class CallbackPool(qubes.storage.Pool):
qvm-create -l red -P teph test-eph3 qvm-create -l red -P teph test-eph3
md5sum /mnt/ram/teph.key (same as in (2)) md5sum /mnt/ram/teph.key (same as in (2))
sudo mount|grep -E 'ram|test' sudo mount|grep -E 'ram|test'
ls /mnt/test_eph/appvms/test_eph3 ls /mnt/test_eph/appvms/test-eph3
qvm-remove test-eph3 qvm-remove test-eph3
qvm-ls | grep test-eph qvm-ls | grep test-eph
qvm-pool -r teph qvm-pool -r teph
@ -172,13 +185,14 @@ class CallbackPool(qubes.storage.Pool):
may be called from an untrusted VM (not by default though). In those cases it may be security-relevant may be called from an untrusted VM (not by default though). In those cases it may be security-relevant
not to pick easily guessable `conf_id` values for your configuration as untrusted VMs may otherwise not to pick easily guessable `conf_id` values for your configuration as untrusted VMs may otherwise
execute callbacks meant for other pools. execute callbacks meant for other pools.
:raise StoragePoolException: For user configuration issues.
''' '''
#NOTE: attribute names **must** start with `_cb_` unless they are meant to be stored as self._cb_impl attributes #NOTE: attribute names **must** start with `_cb_` unless they are meant to be stored as self._cb_impl attributes
self._cb_ctor_done = False #: Boolean to indicate whether or not `__init__` successfully ran through. self._cb_ctor_done = False #: Boolean to indicate whether or not `__init__` successfully ran through.
self._cb_log = logging.getLogger('qubes.storage.callback') #: Logger instance. self._cb_log = logging.getLogger('qubes.storage.callback') #: Logger instance.
if not isinstance(conf_id, str): if not isinstance(conf_id, str):
raise qubes.storage.StoragePoolException('conf_id is no String. VM attack?!') raise qubes.storage.StoragePoolException('conf_id is no String. VM attack?!')
self._cb_conf_id = conf_id #: Configuration ID as passed to `__init__`. self._cb_conf_id = conf_id #: Configuration ID as passed to `__init__()`.
with open(CallbackPool.config_path) as json_file: with open(CallbackPool.config_path) as json_file:
conf_all = json.load(json_file) conf_all = json.load(json_file)
@ -207,12 +221,13 @@ class CallbackPool(qubes.storage.Pool):
raise qubes.storage.StoragePoolException('The class %s must be a subclass of qubes.storage.Pool.' % cls) raise qubes.storage.StoragePoolException('The class %s must be a subclass of qubes.storage.Pool.' % cls)
self._cb_requires_init = self._check_init() #: Boolean indicating whether late storage initialization yet has to be done or not. self._cb_requires_init = self._check_init() #: Boolean indicating whether late storage initialization yet has to be done or not.
self._cb_init_lock = threading.Lock() #: Lock ensuring that late storage initialization is only run exactly once. Currently a `threading.Lock()` to make it accessible from synchronous code as well.
bdriver_args = self._cb_conf.get('bdriver_args', {}) bdriver_args = self._cb_conf.get('bdriver_args', {})
self._cb_impl = cls(name=name, **bdriver_args) #: Instance of the backend pool driver. self._cb_impl = cls(name=name, **bdriver_args) #: Instance of the backend pool driver.
super().__init__(name=name, revisions_to_keep=int(bdriver_args.get('revisions_to_keep', 1))) super().__init__(name=name, revisions_to_keep=int(bdriver_args.get('revisions_to_keep', 1)))
self._cb_ctor_done = True self._cb_ctor_done = True
self._callback('on_ctor') self._callback_nocoro('on_ctor')
def _check_init(self): def _check_init(self):
''' Whether or not this object requires late storage initialization via callback. ''' ''' Whether or not this object requires late storage initialization via callback. '''
@ -221,24 +236,38 @@ class CallbackPool(qubes.storage.Pool):
cmd = self._cb_conf.get('cmd') cmd = self._cb_conf.get('cmd')
return bool(cmd and cmd != '-') return bool(cmd and cmd != '-')
@asyncio.coroutine
def _init(self, callback=True): def _init(self, callback=True):
''' Late storage initialization on first use for e.g. decryption on first usage request. ''' Late storage initialization on first use for e.g. decryption on first usage request.
:param callback: Whether to trigger the `on_sinit` callback or not. :param callback: Whether to trigger the `on_sinit` callback or not.
''' '''
#maybe TODO: if this function is meant to be run in parallel (are Pool operations asynchronous?), a function lock is required! with self._cb_init_lock:
if callback: if self._cb_requires_init:
self._callback('on_sinit') if callback:
self._cb_requires_init = False yield from self._callback('on_sinit')
self._cb_requires_init = False
def _init_nocoro(self, callback=True):
''' `_init()` in synchronous code. '''
with self._cb_init_lock:
if self._cb_requires_init:
if callback:
self._callback_nocoro('on_sinit')
self._cb_requires_init = False
@asyncio.coroutine
def _assert_initialized(self, **kwargs): def _assert_initialized(self, **kwargs):
if self._cb_requires_init: if self._cb_requires_init:
self._init(**kwargs) yield from self._init(**kwargs)
def _callback(self, cb, cb_args=None): def _callback_nocoro(self, cb, cb_args=None, handle_signals=True):
'''Run a callback. '''Run a callback (variant that can be used outside of coroutines / from synchronous code).
:param cb: Callback identifier string. :param cb: Callback identifier string.
:param cb_args: Optional list of arguments to pass to the command as last arguments. :param cb_args: Optional list of arguments to pass to the command as last arguments.
Only passed on for the generic command specified as `cmd`, not for `on_xyz` callbacks. Only passed on for the generic command specified as `cmd`, not for `on_xyz` callbacks.
:param handle_signals: Attempt to handle signals locally in synchronous code.
May throw an exception, if a callback signal cannot be handled locally.
:return: String with potentially unhandled signals, if `handle_signals` is `False`. Nothing otherwise.
''' '''
if self._cb_ctor_done: if self._cb_ctor_done:
cmd = self._cb_conf.get(cb) cmd = self._cb_conf.get(cb)
@ -249,7 +278,6 @@ class CallbackPool(qubes.storage.Pool):
cmd = self._cb_conf.get('cmd') cmd = self._cb_conf.get('cmd')
args = [self.name, self._cb_conf['bdriver'], cb, self._cb_cmd_arg, *cb_args] args = [self.name, self._cb_conf['bdriver'], cb, self._cb_cmd_arg, *cb_args]
if cmd and cmd != '-': if cmd and cmd != '-':
args = filter(None, args)
args = ' '.join(quote(str(a)) for a in args) args = ' '.join(quote(str(a)) for a in args)
cmd = ' '.join(filter(None, [cmd, args])) cmd = ' '.join(filter(None, [cmd, args]))
self._cb_log.info('callback driver executing (%s, %s %s): %s', self._cb_conf_id, cb, cb_args, cmd) self._cb_log.info('callback driver executing (%s, %s %s): %s', self._cb_conf_id, cb, cb_args, cmd)
@ -258,8 +286,24 @@ class CallbackPool(qubes.storage.Pool):
self._cb_log.debug('callback driver stdout (%s, %s %s): %s', self._cb_conf_id, cb, cb_args, res.stdout) self._cb_log.debug('callback driver stdout (%s, %s %s): %s', self._cb_conf_id, cb, cb_args, res.stdout)
self._cb_log.debug('callback driver stderr (%s, %s %s): %s', self._cb_conf_id, cb, cb_args, res.stderr) self._cb_log.debug('callback driver stderr (%s, %s %s): %s', self._cb_conf_id, cb, cb_args, res.stderr)
if self._cb_conf.get('signal_back', False) is True: if self._cb_conf.get('signal_back', False) is True:
self._process_signals(res.stdout) if handle_signals:
self._process_signals_nocoro(res.stdout)
else:
return res.stdout
return None
@asyncio.coroutine
def _callback(self, cb, cb_args=None):
'''Run a callback.
:param cb: Callback identifier string.
:param cb_args: Optional list of arguments to pass to the command as last arguments.
Only passed on for the generic command specified as `cmd`, not for `on_xyz` callbacks.
'''
ret = self._callback_nocoro(cb, cb_args=cb_args, handle_signals=False)
if ret:
yield from self._process_signals(ret)
@asyncio.coroutine
def _process_signals(self, out): def _process_signals(self, out):
'''Process any signals found inside a string. '''Process any signals found inside a string.
:param out: String to check for signals. Each signal must be on a dedicated line. :param out: String to check for signals. Each signal must be on a dedicated line.
@ -268,7 +312,18 @@ class CallbackPool(qubes.storage.Pool):
for line in out.splitlines(): for line in out.splitlines():
if line == 'SIGNAL_setup': if line == 'SIGNAL_setup':
self._cb_log.info('callback driver processing SIGNAL_setup for %s', self._cb_conf_id) self._cb_log.info('callback driver processing SIGNAL_setup for %s', self._cb_conf_id)
self._setup_cb(callback=False) #NOTE: calling our own methods may lead to a deadlock / qubesd freeze due to `self._assert_initialized()` / `self._cb_init_lock`
yield from coro_maybe(self._cb_impl.setup())
def _process_signals_nocoro(self, out):
'''Variant of `process_signals` to be used with synchronous code.
:param out: String to check for signals. Each signal must be on a dedicated line.
They are executed in the order they are found. Callbacks are not triggered.
:raise UnhandledSignalException: If signals cannot be handled here / in synchronous code.
'''
for line in out.splitlines():
if line == 'SIGNAL_setup':
raise UnhandledSignalException(self, line)
@property @property
def config(self): def config(self):
@ -278,23 +333,21 @@ class CallbackPool(qubes.storage.Pool):
'conf_id': self._cb_conf_id, 'conf_id': self._cb_conf_id,
} }
@asyncio.coroutine
def destroy(self): def destroy(self):
self._assert_initialized() yield from self._assert_initialized()
ret = self._cb_impl.destroy() ret = yield from coro_maybe(self._cb_impl.destroy())
self._callback('on_destroy') yield from self._callback('on_destroy')
return ret return ret
def init_volume(self, vm, volume_config): def init_volume(self, vm, volume_config):
return CallbackVolume(self, self._cb_impl.init_volume(vm, volume_config)) return CallbackVolume(self, self._cb_impl.init_volume(vm, volume_config))
def _setup_cb(self, callback=True): @asyncio.coroutine
if callback:
self._callback('on_setup')
self._assert_initialized(callback=False) #setup is assumed to include storage initialization
return self._cb_impl.setup()
def setup(self): def setup(self):
return self._setup_cb() yield from self._assert_initialized(callback=False) #setup is assumed to include storage initialization
yield from self._callback('on_setup')
return (yield from coro_maybe(self._cb_impl.setup()))
@property @property
def volumes(self): def volumes(self):
@ -365,72 +418,107 @@ class CallbackVolume:
self._cb_pool = pool #: CallbackPool instance the Volume belongs to. self._cb_pool = pool #: CallbackPool instance the Volume belongs to.
self._cb_impl = impl #: Backend volume implementation instance. self._cb_impl = impl #: Backend volume implementation instance.
@asyncio.coroutine
def _assert_initialized(self, **kwargs): def _assert_initialized(self, **kwargs):
return self._cb_pool._assert_initialized(**kwargs) # pylint: disable=protected-access yield from self._cb_pool._assert_initialized(**kwargs) # pylint: disable=protected-access
@asyncio.coroutine
def _callback(self, cb, cb_args=None, **kwargs): def _callback(self, cb, cb_args=None, **kwargs):
if cb_args is None: if cb_args is None:
cb_args = [] cb_args = []
vol_args = [self.name, self.vid, *cb_args] vol_args = [self.name, self.vid, *cb_args]
return self._cb_pool._callback(cb, cb_args=vol_args, **kwargs) # pylint: disable=protected-access yield from self._cb_pool._callback(cb, cb_args=vol_args, **kwargs) # pylint: disable=protected-access
@asyncio.coroutine
def create(self): def create(self):
self._assert_initialized() yield from self._assert_initialized()
self._callback('on_volume_create') yield from self._callback('on_volume_create')
return self._cb_impl.create() return (yield from coro_maybe(self._cb_impl.create()))
@asyncio.coroutine
def remove(self): def remove(self):
self._assert_initialized() yield from self._assert_initialized()
ret = self._cb_impl.remove() ret = yield from coro_maybe(self._cb_impl.remove())
self._callback('on_volume_remove') yield from self._callback('on_volume_remove')
return ret return ret
@asyncio.coroutine
def resize(self, size): def resize(self, size):
self._assert_initialized() yield from self._assert_initialized()
self._callback('on_volume_resize', cb_args=[size]) yield from self._callback('on_volume_resize', cb_args=[size])
return self._cb_impl.resize(size) return (yield from coro_maybe(self._cb_impl.resize(size)))
@asyncio.coroutine
def start(self): def start(self):
self._assert_initialized() yield from self._assert_initialized()
self._callback('on_volume_start') yield from self._callback('on_volume_start')
return self._cb_impl.start() return (yield from coro_maybe(self._cb_impl.start()))
@asyncio.coroutine
def stop(self): def stop(self):
self._assert_initialized() yield from self._assert_initialized()
ret = self._cb_impl.stop() ret = yield from coro_maybe(self._cb_impl.stop())
self._callback('on_volume_stop') yield from self._callback('on_volume_stop')
return ret return ret
@asyncio.coroutine
def import_data(self): def import_data(self):
self._assert_initialized() yield from self._assert_initialized()
self._callback('on_volume_import_data') yield from self._callback('on_volume_import_data')
return self._cb_impl.import_data() return (yield from coro_maybe(self._cb_impl.import_data()))
@asyncio.coroutine
def import_data_end(self, success): def import_data_end(self, success):
self._assert_initialized() yield from self._assert_initialized()
ret = self._cb_impl.import_data_end(success) ret = yield from coro_maybe(self._cb_impl.import_data_end(success))
self._callback('on_volume_import_data_end', cb_args=[success]) yield from self._callback('on_volume_import_data_end', cb_args=[success])
return ret return ret
@asyncio.coroutine
def import_volume(self, src_volume): def import_volume(self, src_volume):
self._assert_initialized() yield from self._assert_initialized()
self._callback('on_volume_import', cb_args=[src_volume.vid]) yield from self._callback('on_volume_import', cb_args=[src_volume.vid])
return self._cb_impl.import_volume(src_volume) return (yield from coro_maybe(self._cb_impl.import_volume(src_volume)))
def is_dirty(self): def is_dirty(self):
if self._cb_pool._cb_requires_init: # pylint: disable=protected-access # pylint: disable=protected-access
if self._cb_pool._cb_requires_init:
return False return False
return self._cb_impl.is_dirty() return self._cb_impl.is_dirty()
def is_outdated(self): def is_outdated(self):
if self._cb_pool._cb_requires_init: # pylint: disable=protected-access # pylint: disable=protected-access
if self._cb_pool._cb_requires_init:
return False return False
return self._cb_impl.is_outdated() return self._cb_impl.is_outdated()
def block_device(self):
# pylint: disable=protected-access
if self._cb_pool._cb_requires_init:
# usually Volume.start() is called beforehand
# --> we should be initialized in 99% of cases
return None
return self._cb_impl.block_device()
def export(self, volume):
# pylint: disable=protected-access
#TODO: once this becomes a coroutine in the Volume class, avoid the below blocking & potentially exception-throwing code; maybe also add a callback
if self._cb_pool._cb_requires_init:
self._cb_pool._init_nocoro()
return self._cb_impl.export(volume)
@asyncio.coroutine
def verify(self):
yield from self._assert_initialized()
return (yield from coro_maybe(self._cb_impl.verify()))
@asyncio.coroutine
def revert(self, revision=None):
yield from self._assert_initialized()
return (yield from coro_maybe(self._cb_impl.revert(revision=revision)))
#remaining method & attribute delegation #remaining method & attribute delegation
def __getattr__(self, name): def __getattr__(self, name):
if name in ['block_device', 'verify', 'revert', 'export']:
self._assert_initialized()
return getattr(self._cb_impl, name) return getattr(self._cb_impl, name)
def __setattr__(self, name, value): def __setattr__(self, name, value):