storage/callback: do not run sync code async
This commit is contained in:
parent
a53781b114
commit
287a4a0429
@ -24,7 +24,6 @@ import subprocess
|
|||||||
import json
|
import json
|
||||||
import asyncio
|
import asyncio
|
||||||
from shlex import quote
|
from shlex import quote
|
||||||
from qubes.utils import coro_maybe
|
|
||||||
|
|
||||||
import qubes.storage
|
import qubes.storage
|
||||||
|
|
||||||
@ -45,13 +44,6 @@ class CallbackPool(qubes.storage.Pool):
|
|||||||
- custom pool mounts
|
- custom pool mounts
|
||||||
- encryption
|
- encryption
|
||||||
- debugging
|
- debugging
|
||||||
- run synchronous pool drivers asynchronously
|
|
||||||
|
|
||||||
A word of caution:
|
|
||||||
This implementation runs all methods that `qubes.storage.Pool` allows to be asynchronous asynchronously. So if a backend pool driver does
|
|
||||||
not support a particular method to be run asynchronously, there may be issues. In short, it is always preferable to use the original backend
|
|
||||||
driver over this one unless the functionality of this driver is required for a particular use case.
|
|
||||||
|
|
||||||
|
|
||||||
**Integration tests**:
|
**Integration tests**:
|
||||||
(all of these tests assume the `qubes_callback.json.example` configuration)
|
(all of these tests assume the `qubes_callback.json.example` configuration)
|
||||||
@ -234,6 +226,7 @@ class CallbackPool(qubes.storage.Pool):
|
|||||||
|
|
||||||
self._cb_requires_init = self._check_init() #: Boolean indicating whether late storage initialization yet has to be done or not.
|
self._cb_requires_init = self._check_init() #: Boolean indicating whether late storage initialization yet has to be done or not.
|
||||||
self._cb_init_lock = asyncio.Lock() #: Lock ensuring that late storage initialization is only run exactly once.
|
self._cb_init_lock = asyncio.Lock() #: Lock ensuring that late storage initialization is only run exactly once.
|
||||||
|
self._cb_sync_lock = asyncio.Lock() #: Lock to prevent sync code from running async.
|
||||||
bdriver_args = self._cb_conf.get('bdriver_args', {})
|
bdriver_args = self._cb_conf.get('bdriver_args', {})
|
||||||
self._cb_impl = cls(name=name, **bdriver_args) #: Instance of the backend pool driver.
|
self._cb_impl = cls(name=name, **bdriver_args) #: Instance of the backend pool driver.
|
||||||
|
|
||||||
@ -241,6 +234,13 @@ class CallbackPool(qubes.storage.Pool):
|
|||||||
self._cb_ctor_done = True
|
self._cb_ctor_done = True
|
||||||
self._callback_nocoro('post_ctor')
|
self._callback_nocoro('post_ctor')
|
||||||
|
|
||||||
|
@asyncio.coroutine
|
||||||
|
def _coro_maybe(self, value):
|
||||||
|
if asyncio.iscoroutine(value):
|
||||||
|
return (yield from value)
|
||||||
|
with (yield from self._cb_sync_lock):
|
||||||
|
return value
|
||||||
|
|
||||||
def _check_init(self):
|
def _check_init(self):
|
||||||
''' Whether or not this object requires late storage initialization via callback. '''
|
''' Whether or not this object requires late storage initialization via callback. '''
|
||||||
cmd = self._cb_conf.get('pre_sinit')
|
cmd = self._cb_conf.get('pre_sinit')
|
||||||
@ -317,7 +317,7 @@ class CallbackPool(qubes.storage.Pool):
|
|||||||
if line == 'SIGNAL_setup':
|
if line == 'SIGNAL_setup':
|
||||||
self._cb_log.info('callback driver processing SIGNAL_setup for %s', self._cb_conf_id)
|
self._cb_log.info('callback driver processing SIGNAL_setup for %s', self._cb_conf_id)
|
||||||
#NOTE: calling our own methods may lead to a deadlock / qubesd freeze due to `self._assert_initialized()` / `self._cb_init_lock`
|
#NOTE: calling our own methods may lead to a deadlock / qubesd freeze due to `self._assert_initialized()` / `self._cb_init_lock`
|
||||||
yield from coro_maybe(self._cb_impl.setup())
|
yield from self._coro_maybe(self._cb_impl.setup())
|
||||||
|
|
||||||
def _process_signals_nocoro(self, out):
|
def _process_signals_nocoro(self, out):
|
||||||
'''Variant of `process_signals` to be used with synchronous code.
|
'''Variant of `process_signals` to be used with synchronous code.
|
||||||
@ -347,7 +347,7 @@ class CallbackPool(qubes.storage.Pool):
|
|||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
def destroy(self):
|
def destroy(self):
|
||||||
yield from self._assert_initialized()
|
yield from self._assert_initialized()
|
||||||
ret = yield from coro_maybe(self._cb_impl.destroy())
|
ret = yield from self._coro_maybe(self._cb_impl.destroy())
|
||||||
yield from self._callback('post_destroy')
|
yield from self._callback('post_destroy')
|
||||||
return ret
|
return ret
|
||||||
|
|
||||||
@ -360,7 +360,7 @@ class CallbackPool(qubes.storage.Pool):
|
|||||||
def setup(self):
|
def setup(self):
|
||||||
yield from self._assert_initialized(callback=False) #setup is assumed to include storage initialization
|
yield from self._assert_initialized(callback=False) #setup is assumed to include storage initialization
|
||||||
yield from self._callback('pre_setup')
|
yield from self._callback('pre_setup')
|
||||||
return (yield from coro_maybe(self._cb_impl.setup()))
|
return (yield from self._coro_maybe(self._cb_impl.setup()))
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def volumes(self):
|
def volumes(self):
|
||||||
@ -455,6 +455,10 @@ class CallbackVolume(qubes.storage.Volume):
|
|||||||
self._cb_pool = pool #: CallbackPool instance the Volume belongs to.
|
self._cb_pool = pool #: CallbackPool instance the Volume belongs to.
|
||||||
self._cb_impl = impl #: Backend volume implementation instance.
|
self._cb_impl = impl #: Backend volume implementation instance.
|
||||||
|
|
||||||
|
@asyncio.coroutine
|
||||||
|
def _coro_maybe(self, value):
|
||||||
|
return (yield from self._cb_pool._coro_maybe(value)) # pylint: disable=protected-access
|
||||||
|
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
def _assert_initialized(self, **kwargs):
|
def _assert_initialized(self, **kwargs):
|
||||||
yield from self._cb_pool._assert_initialized(**kwargs) # pylint: disable=protected-access
|
yield from self._cb_pool._assert_initialized(**kwargs) # pylint: disable=protected-access
|
||||||
@ -477,12 +481,12 @@ class CallbackVolume(qubes.storage.Volume):
|
|||||||
def create(self):
|
def create(self):
|
||||||
yield from self._assert_initialized()
|
yield from self._assert_initialized()
|
||||||
yield from self._callback('pre_volume_create')
|
yield from self._callback('pre_volume_create')
|
||||||
return (yield from coro_maybe(self._cb_impl.create()))
|
return (yield from self._coro_maybe(self._cb_impl.create()))
|
||||||
|
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
def remove(self):
|
def remove(self):
|
||||||
yield from self._assert_initialized()
|
yield from self._assert_initialized()
|
||||||
ret = yield from coro_maybe(self._cb_impl.remove())
|
ret = yield from self._coro_maybe(self._cb_impl.remove())
|
||||||
yield from self._callback('post_volume_remove')
|
yield from self._callback('post_volume_remove')
|
||||||
return ret
|
return ret
|
||||||
|
|
||||||
@ -490,20 +494,20 @@ class CallbackVolume(qubes.storage.Volume):
|
|||||||
def resize(self, size):
|
def resize(self, size):
|
||||||
yield from self._assert_initialized()
|
yield from self._assert_initialized()
|
||||||
yield from self._callback('pre_volume_resize', cb_args=[size])
|
yield from self._callback('pre_volume_resize', cb_args=[size])
|
||||||
return (yield from coro_maybe(self._cb_impl.resize(size)))
|
return (yield from self._coro_maybe(self._cb_impl.resize(size)))
|
||||||
|
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
def start(self):
|
def start(self):
|
||||||
yield from self._assert_initialized()
|
yield from self._assert_initialized()
|
||||||
yield from self._callback('pre_volume_start')
|
yield from self._callback('pre_volume_start')
|
||||||
ret = yield from coro_maybe(self._cb_impl.start())
|
ret = yield from self._coro_maybe(self._cb_impl.start())
|
||||||
yield from self._callback('post_volume_start')
|
yield from self._callback('post_volume_start')
|
||||||
return ret
|
return ret
|
||||||
|
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
def stop(self):
|
def stop(self):
|
||||||
yield from self._assert_initialized()
|
yield from self._assert_initialized()
|
||||||
ret = yield from coro_maybe(self._cb_impl.stop())
|
ret = yield from self._coro_maybe(self._cb_impl.stop())
|
||||||
yield from self._callback('post_volume_stop')
|
yield from self._callback('post_volume_stop')
|
||||||
return ret
|
return ret
|
||||||
|
|
||||||
@ -511,12 +515,12 @@ class CallbackVolume(qubes.storage.Volume):
|
|||||||
def import_data(self, size):
|
def import_data(self, size):
|
||||||
yield from self._assert_initialized()
|
yield from self._assert_initialized()
|
||||||
yield from self._callback('pre_volume_import_data', cb_args=[size])
|
yield from self._callback('pre_volume_import_data', cb_args=[size])
|
||||||
return (yield from coro_maybe(self._cb_impl.import_data(size)))
|
return (yield from self._coro_maybe(self._cb_impl.import_data(size)))
|
||||||
|
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
def import_data_end(self, success):
|
def import_data_end(self, success):
|
||||||
yield from self._assert_initialized()
|
yield from self._assert_initialized()
|
||||||
ret = yield from coro_maybe(self._cb_impl.import_data_end(success))
|
ret = yield from self._coro_maybe(self._cb_impl.import_data_end(success))
|
||||||
yield from self._callback('post_volume_import_data_end', cb_args=[success])
|
yield from self._callback('post_volume_import_data_end', cb_args=[success])
|
||||||
return ret
|
return ret
|
||||||
|
|
||||||
@ -524,7 +528,7 @@ class CallbackVolume(qubes.storage.Volume):
|
|||||||
def import_volume(self, src_volume):
|
def import_volume(self, src_volume):
|
||||||
yield from self._assert_initialized()
|
yield from self._assert_initialized()
|
||||||
yield from self._callback('pre_volume_import', cb_args=[src_volume.vid])
|
yield from self._callback('pre_volume_import', cb_args=[src_volume.vid])
|
||||||
return (yield from coro_maybe(self._cb_impl.import_volume(src_volume)))
|
return (yield from self._coro_maybe(self._cb_impl.import_volume(src_volume)))
|
||||||
|
|
||||||
def is_dirty(self):
|
def is_dirty(self):
|
||||||
# pylint: disable=protected-access
|
# pylint: disable=protected-access
|
||||||
@ -566,24 +570,24 @@ class CallbackVolume(qubes.storage.Volume):
|
|||||||
def export(self):
|
def export(self):
|
||||||
yield from self._assert_initialized()
|
yield from self._assert_initialized()
|
||||||
yield from self._callback('pre_volume_export')
|
yield from self._callback('pre_volume_export')
|
||||||
return (yield from coro_maybe(self._cb_impl.export()))
|
return (yield from self._coro_maybe(self._cb_impl.export()))
|
||||||
|
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
def export_end(self, path):
|
def export_end(self, path):
|
||||||
yield from self._assert_initialized()
|
yield from self._assert_initialized()
|
||||||
ret = yield from coro_maybe(self._cb_impl.export_end(path))
|
ret = yield from self._coro_maybe(self._cb_impl.export_end(path))
|
||||||
yield from self._callback('post_volume_export_end', cb_args=[path])
|
yield from self._callback('post_volume_export_end', cb_args=[path])
|
||||||
return ret
|
return ret
|
||||||
|
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
def verify(self):
|
def verify(self):
|
||||||
yield from self._assert_initialized()
|
yield from self._assert_initialized()
|
||||||
return (yield from coro_maybe(self._cb_impl.verify()))
|
return (yield from self._coro_maybe(self._cb_impl.verify()))
|
||||||
|
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
def revert(self, revision=None):
|
def revert(self, revision=None):
|
||||||
yield from self._assert_initialized()
|
yield from self._assert_initialized()
|
||||||
return (yield from coro_maybe(self._cb_impl.revert(revision=revision)))
|
return (yield from self._coro_maybe(self._cb_impl.revert(revision=revision)))
|
||||||
|
|
||||||
#shadow all qubes.storage.Volume class attributes as instance properties
|
#shadow all qubes.storage.Volume class attributes as instance properties
|
||||||
#NOTE: this will cause a subtle difference to using an actual _cb_impl instance: CallbackVolume.devtype will return a property object, Volume.devtype the actual value
|
#NOTE: this will cause a subtle difference to using an actual _cb_impl instance: CallbackVolume.devtype will return a property object, Volume.devtype the actual value
|
||||||
|
Loading…
Reference in New Issue
Block a user