Revert "storage/callback: do not run sync code async"
This reverts commit 287a4a0429
.
As Marek correctly pointed out, sync functions cannot be run async against one another even if run inside an async function
(the python interpreter will remain active until the next yield and that's at the end of the sync func / inside the async function).
--> So there's no need for a lock.
I still cannot protect against assumptions made by sync code authors about blocking the Qubes OS main loop. Those will be broken.
Moreover the code of this commit was botched anyway.
This commit is contained in:
parent
fdceb064fe
commit
3db5e9f8bf
@ -24,6 +24,7 @@ import subprocess
|
|||||||
import json
|
import json
|
||||||
import asyncio
|
import asyncio
|
||||||
from shlex import quote
|
from shlex import quote
|
||||||
|
from qubes.utils import coro_maybe
|
||||||
|
|
||||||
import qubes.storage
|
import qubes.storage
|
||||||
|
|
||||||
@ -44,6 +45,13 @@ class CallbackPool(qubes.storage.Pool):
|
|||||||
- custom pool mounts
|
- custom pool mounts
|
||||||
- encryption
|
- encryption
|
||||||
- debugging
|
- debugging
|
||||||
|
- run synchronous pool drivers asynchronously
|
||||||
|
|
||||||
|
A word of caution:
|
||||||
|
This implementation runs all methods that `qubes.storage.Pool` allows to be asynchronous asynchronously. So if a backend pool driver does
|
||||||
|
not support a particular method to be run asynchronously, there may be issues. In short, it is always preferable to use the original backend
|
||||||
|
driver over this one unless the functionality of this driver is required for a particular use case.
|
||||||
|
|
||||||
|
|
||||||
**Integration tests**:
|
**Integration tests**:
|
||||||
(all of these tests assume the `qubes_callback.json.example` configuration)
|
(all of these tests assume the `qubes_callback.json.example` configuration)
|
||||||
@ -226,7 +234,6 @@ class CallbackPool(qubes.storage.Pool):
|
|||||||
|
|
||||||
self._cb_requires_init = self._check_init() #: Boolean indicating whether late storage initialization yet has to be done or not.
|
self._cb_requires_init = self._check_init() #: Boolean indicating whether late storage initialization yet has to be done or not.
|
||||||
self._cb_init_lock = asyncio.Lock() #: Lock ensuring that late storage initialization is only run exactly once.
|
self._cb_init_lock = asyncio.Lock() #: Lock ensuring that late storage initialization is only run exactly once.
|
||||||
self._cb_sync_lock = asyncio.Lock() #: Lock to prevent sync code from running async.
|
|
||||||
bdriver_args = self._cb_conf.get('bdriver_args', {})
|
bdriver_args = self._cb_conf.get('bdriver_args', {})
|
||||||
self._cb_impl = cls(name=name, **bdriver_args) #: Instance of the backend pool driver.
|
self._cb_impl = cls(name=name, **bdriver_args) #: Instance of the backend pool driver.
|
||||||
|
|
||||||
@ -234,13 +241,6 @@ class CallbackPool(qubes.storage.Pool):
|
|||||||
self._cb_ctor_done = True
|
self._cb_ctor_done = True
|
||||||
self._callback_nocoro('post_ctor')
|
self._callback_nocoro('post_ctor')
|
||||||
|
|
||||||
@asyncio.coroutine
|
|
||||||
def _coro_maybe(self, value):
|
|
||||||
if asyncio.iscoroutine(value):
|
|
||||||
return (yield from value)
|
|
||||||
with (yield from self._cb_sync_lock):
|
|
||||||
return value
|
|
||||||
|
|
||||||
def _check_init(self):
|
def _check_init(self):
|
||||||
''' Whether or not this object requires late storage initialization via callback. '''
|
''' Whether or not this object requires late storage initialization via callback. '''
|
||||||
cmd = self._cb_conf.get('pre_sinit')
|
cmd = self._cb_conf.get('pre_sinit')
|
||||||
@ -317,7 +317,7 @@ class CallbackPool(qubes.storage.Pool):
|
|||||||
if line == 'SIGNAL_setup':
|
if line == 'SIGNAL_setup':
|
||||||
self._cb_log.info('callback driver processing SIGNAL_setup for %s', self._cb_conf_id)
|
self._cb_log.info('callback driver processing SIGNAL_setup for %s', self._cb_conf_id)
|
||||||
#NOTE: calling our own methods may lead to a deadlock / qubesd freeze due to `self._assert_initialized()` / `self._cb_init_lock`
|
#NOTE: calling our own methods may lead to a deadlock / qubesd freeze due to `self._assert_initialized()` / `self._cb_init_lock`
|
||||||
yield from self._coro_maybe(self._cb_impl.setup())
|
yield from coro_maybe(self._cb_impl.setup())
|
||||||
|
|
||||||
def _process_signals_nocoro(self, out):
|
def _process_signals_nocoro(self, out):
|
||||||
'''Variant of `process_signals` to be used with synchronous code.
|
'''Variant of `process_signals` to be used with synchronous code.
|
||||||
@ -347,7 +347,7 @@ class CallbackPool(qubes.storage.Pool):
|
|||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
def destroy(self):
|
def destroy(self):
|
||||||
yield from self._assert_initialized()
|
yield from self._assert_initialized()
|
||||||
ret = yield from self._coro_maybe(self._cb_impl.destroy())
|
ret = yield from coro_maybe(self._cb_impl.destroy())
|
||||||
yield from self._callback('post_destroy')
|
yield from self._callback('post_destroy')
|
||||||
return ret
|
return ret
|
||||||
|
|
||||||
@ -360,7 +360,7 @@ class CallbackPool(qubes.storage.Pool):
|
|||||||
def setup(self):
|
def setup(self):
|
||||||
yield from self._assert_initialized(callback=False) #setup is assumed to include storage initialization
|
yield from self._assert_initialized(callback=False) #setup is assumed to include storage initialization
|
||||||
yield from self._callback('pre_setup')
|
yield from self._callback('pre_setup')
|
||||||
return (yield from self._coro_maybe(self._cb_impl.setup()))
|
return (yield from coro_maybe(self._cb_impl.setup()))
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def volumes(self):
|
def volumes(self):
|
||||||
@ -455,10 +455,6 @@ class CallbackVolume(qubes.storage.Volume):
|
|||||||
self._cb_pool = pool #: CallbackPool instance the Volume belongs to.
|
self._cb_pool = pool #: CallbackPool instance the Volume belongs to.
|
||||||
self._cb_impl = impl #: Backend volume implementation instance.
|
self._cb_impl = impl #: Backend volume implementation instance.
|
||||||
|
|
||||||
@asyncio.coroutine
|
|
||||||
def _coro_maybe(self, value):
|
|
||||||
return (yield from self._cb_pool._coro_maybe(value)) # pylint: disable=protected-access
|
|
||||||
|
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
def _assert_initialized(self, **kwargs):
|
def _assert_initialized(self, **kwargs):
|
||||||
yield from self._cb_pool._assert_initialized(**kwargs) # pylint: disable=protected-access
|
yield from self._cb_pool._assert_initialized(**kwargs) # pylint: disable=protected-access
|
||||||
@ -481,12 +477,12 @@ class CallbackVolume(qubes.storage.Volume):
|
|||||||
def create(self):
|
def create(self):
|
||||||
yield from self._assert_initialized()
|
yield from self._assert_initialized()
|
||||||
yield from self._callback('pre_volume_create')
|
yield from self._callback('pre_volume_create')
|
||||||
return (yield from self._coro_maybe(self._cb_impl.create()))
|
return (yield from coro_maybe(self._cb_impl.create()))
|
||||||
|
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
def remove(self):
|
def remove(self):
|
||||||
yield from self._assert_initialized()
|
yield from self._assert_initialized()
|
||||||
ret = yield from self._coro_maybe(self._cb_impl.remove())
|
ret = yield from coro_maybe(self._cb_impl.remove())
|
||||||
yield from self._callback('post_volume_remove')
|
yield from self._callback('post_volume_remove')
|
||||||
return ret
|
return ret
|
||||||
|
|
||||||
@ -494,20 +490,20 @@ class CallbackVolume(qubes.storage.Volume):
|
|||||||
def resize(self, size):
|
def resize(self, size):
|
||||||
yield from self._assert_initialized()
|
yield from self._assert_initialized()
|
||||||
yield from self._callback('pre_volume_resize', cb_args=[size])
|
yield from self._callback('pre_volume_resize', cb_args=[size])
|
||||||
return (yield from self._coro_maybe(self._cb_impl.resize(size)))
|
return (yield from coro_maybe(self._cb_impl.resize(size)))
|
||||||
|
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
def start(self):
|
def start(self):
|
||||||
yield from self._assert_initialized()
|
yield from self._assert_initialized()
|
||||||
yield from self._callback('pre_volume_start')
|
yield from self._callback('pre_volume_start')
|
||||||
ret = yield from self._coro_maybe(self._cb_impl.start())
|
ret = yield from coro_maybe(self._cb_impl.start())
|
||||||
yield from self._callback('post_volume_start')
|
yield from self._callback('post_volume_start')
|
||||||
return ret
|
return ret
|
||||||
|
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
def stop(self):
|
def stop(self):
|
||||||
yield from self._assert_initialized()
|
yield from self._assert_initialized()
|
||||||
ret = yield from self._coro_maybe(self._cb_impl.stop())
|
ret = yield from coro_maybe(self._cb_impl.stop())
|
||||||
yield from self._callback('post_volume_stop')
|
yield from self._callback('post_volume_stop')
|
||||||
return ret
|
return ret
|
||||||
|
|
||||||
@ -515,12 +511,12 @@ class CallbackVolume(qubes.storage.Volume):
|
|||||||
def import_data(self, size):
|
def import_data(self, size):
|
||||||
yield from self._assert_initialized()
|
yield from self._assert_initialized()
|
||||||
yield from self._callback('pre_volume_import_data', cb_args=[size])
|
yield from self._callback('pre_volume_import_data', cb_args=[size])
|
||||||
return (yield from self._coro_maybe(self._cb_impl.import_data(size)))
|
return (yield from coro_maybe(self._cb_impl.import_data(size)))
|
||||||
|
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
def import_data_end(self, success):
|
def import_data_end(self, success):
|
||||||
yield from self._assert_initialized()
|
yield from self._assert_initialized()
|
||||||
ret = yield from self._coro_maybe(self._cb_impl.import_data_end(success))
|
ret = yield from coro_maybe(self._cb_impl.import_data_end(success))
|
||||||
yield from self._callback('post_volume_import_data_end', cb_args=[success])
|
yield from self._callback('post_volume_import_data_end', cb_args=[success])
|
||||||
return ret
|
return ret
|
||||||
|
|
||||||
@ -528,7 +524,7 @@ class CallbackVolume(qubes.storage.Volume):
|
|||||||
def import_volume(self, src_volume):
|
def import_volume(self, src_volume):
|
||||||
yield from self._assert_initialized()
|
yield from self._assert_initialized()
|
||||||
yield from self._callback('pre_volume_import', cb_args=[src_volume.vid])
|
yield from self._callback('pre_volume_import', cb_args=[src_volume.vid])
|
||||||
return (yield from self._coro_maybe(self._cb_impl.import_volume(src_volume)))
|
return (yield from coro_maybe(self._cb_impl.import_volume(src_volume)))
|
||||||
|
|
||||||
def is_dirty(self):
|
def is_dirty(self):
|
||||||
# pylint: disable=protected-access
|
# pylint: disable=protected-access
|
||||||
@ -570,24 +566,24 @@ class CallbackVolume(qubes.storage.Volume):
|
|||||||
def export(self):
|
def export(self):
|
||||||
yield from self._assert_initialized()
|
yield from self._assert_initialized()
|
||||||
yield from self._callback('pre_volume_export')
|
yield from self._callback('pre_volume_export')
|
||||||
return (yield from self._coro_maybe(self._cb_impl.export()))
|
return (yield from coro_maybe(self._cb_impl.export()))
|
||||||
|
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
def export_end(self, path):
|
def export_end(self, path):
|
||||||
yield from self._assert_initialized()
|
yield from self._assert_initialized()
|
||||||
ret = yield from self._coro_maybe(self._cb_impl.export_end(path))
|
ret = yield from coro_maybe(self._cb_impl.export_end(path))
|
||||||
yield from self._callback('post_volume_export_end', cb_args=[path])
|
yield from self._callback('post_volume_export_end', cb_args=[path])
|
||||||
return ret
|
return ret
|
||||||
|
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
def verify(self):
|
def verify(self):
|
||||||
yield from self._assert_initialized()
|
yield from self._assert_initialized()
|
||||||
return (yield from self._coro_maybe(self._cb_impl.verify()))
|
return (yield from coro_maybe(self._cb_impl.verify()))
|
||||||
|
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
def revert(self, revision=None):
|
def revert(self, revision=None):
|
||||||
yield from self._assert_initialized()
|
yield from self._assert_initialized()
|
||||||
return (yield from self._coro_maybe(self._cb_impl.revert(revision=revision)))
|
return (yield from coro_maybe(self._cb_impl.revert(revision=revision)))
|
||||||
|
|
||||||
#shadow all qubes.storage.Volume class attributes as instance properties
|
#shadow all qubes.storage.Volume class attributes as instance properties
|
||||||
#NOTE: this will cause a subtle difference to using an actual _cb_impl instance: CallbackVolume.devtype will return a property object, Volume.devtype the actual value
|
#NOTE: this will cause a subtle difference to using an actual _cb_impl instance: CallbackVolume.devtype will return a property object, Volume.devtype the actual value
|
||||||
|
Loading…
Reference in New Issue
Block a user