events: add support for async event handlers
See documentation for details.
This commit is contained in:
parent
6238254f49
commit
ea1a04cb19
@ -81,7 +81,7 @@ class EventVisitor(ast.NodeVisitor):
|
||||
# events this way
|
||||
return
|
||||
|
||||
if name.endswith('.fire_event') or name.endswith('.fire_event_pre'):
|
||||
if name.endswith('.fire_event') or name.endswith('.fire_event_async'):
|
||||
# here we throw events; event name is the first argument; sometimes
|
||||
# it is expressed as 'event-stem:' + some_variable
|
||||
eventnode = node.args[0]
|
||||
|
@ -143,6 +143,75 @@ returned to the caller as list. The order of this list is undefined.
|
||||
effect = o.fire_event('event1')
|
||||
|
||||
|
||||
Asynchronous event handling
|
||||
---------------------------
|
||||
|
||||
Event handlers can be defined as coroutine. This way they can execute long
|
||||
running actions without blocking the whole qubesd process. To define
|
||||
asynchronous event handler, annotate a coroutine (a function defined with
|
||||
`async def`, or decorated with `py:func:`asyncio.coroutine`) with
|
||||
py:func:`qubes.events.handler` decorator. By definition, order of
|
||||
such handlers is undefined.
|
||||
|
||||
Asynchronous events can be fired using
|
||||
:py:meth:`qubes.events.Emitter.fire_event_async` method. It will handle both
|
||||
synchronous and asynchronous handlers. It's an error to register asynchronous
|
||||
handler (a coroutine) for synchronous event (the one fired with
|
||||
:py:meth:`qubes.events.Emitter.fire_event`) - it will result in
|
||||
:py:exc:`RuntimeError` exception.
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
import asyncio
|
||||
import qubes.events
|
||||
|
||||
class MyClass(qubes.events.Emitter):
|
||||
@qubes.events.handler('event1', 'event2')
|
||||
@asyncio.coroutine
|
||||
def event_handler(self, event):
|
||||
if event == 'event1':
|
||||
print('Got event 1, starting long running action')
|
||||
yield from asyncio.sleep(10)
|
||||
print('Done')
|
||||
|
||||
o = MyClass()
|
||||
loop = asyncio.get_event_loop()
|
||||
loop.run_until_complete(o.fire_event_async('event1'))
|
||||
|
||||
Asynchronous event handlers can also return value - but only a collection, not
|
||||
yield individual values (because of python limitation):
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
import asyncio
|
||||
import qubes.events
|
||||
|
||||
class MyClass(qubes.events.Emitter):
|
||||
@qubes.events.handler('event1')
|
||||
@asyncio.coroutine
|
||||
def event_handler(self, event):
|
||||
print('Got event, starting long running action')
|
||||
yield from asyncio.sleep(10)
|
||||
return ('result1', 'result2')
|
||||
|
||||
@qubes.events.handler('event1')
|
||||
@asyncio.coroutine
|
||||
def another_handler(self, event):
|
||||
print('Got event, starting long running action')
|
||||
yield from asyncio.sleep(10)
|
||||
return ('result3', 'result4')
|
||||
|
||||
@qubes.events.handler('event1')
|
||||
def synchronous_handler(self, event):
|
||||
yield 'sync result'
|
||||
|
||||
o = MyClass()
|
||||
loop = asyncio.get_event_loop()
|
||||
# returns ['sync result', 'result1', 'result2', 'result3', 'result4'],
|
||||
# possibly not in order
|
||||
effects = loop.run_until_complete(o.fire_event_async('event1'))
|
||||
|
||||
|
||||
Module contents
|
||||
---------------
|
||||
|
||||
|
@ -24,7 +24,7 @@
|
||||
Events are fired when something happens, like VM start or stop, property change
|
||||
etc.
|
||||
'''
|
||||
|
||||
import asyncio
|
||||
import collections
|
||||
|
||||
import itertools
|
||||
@ -36,14 +36,14 @@ def handler(*events):
|
||||
To hook an event, decorate a method in your plugin class with this
|
||||
decorator.
|
||||
|
||||
It probably makes no sense to specify more than one handler for specific
|
||||
event in one class, because handlers are not run concurrently and there is
|
||||
no guarantee of the order of execution.
|
||||
Some event handlers may be defined as coroutine. In such a case, *async*
|
||||
should be set to :py:obj:``True``.
|
||||
See appropriate event documentation for details.
|
||||
|
||||
.. note::
|
||||
For hooking events from extensions, see :py:func:`qubes.ext.handler`.
|
||||
|
||||
:param str event: event type
|
||||
:param str events: events
|
||||
'''
|
||||
|
||||
def decorator(func):
|
||||
@ -141,13 +141,14 @@ class Emitter(object, metaclass=EmitterMeta):
|
||||
'''
|
||||
|
||||
if not self.events_enabled:
|
||||
return []
|
||||
return [], []
|
||||
|
||||
order = itertools.chain((self,), self.__class__.__mro__)
|
||||
if not pre_event:
|
||||
order = reversed(list(order))
|
||||
|
||||
effects = []
|
||||
async_effects = []
|
||||
for i in order:
|
||||
try:
|
||||
handlers_dict = i.__handlers__
|
||||
@ -160,9 +161,11 @@ class Emitter(object, metaclass=EmitterMeta):
|
||||
key=(lambda handler: hasattr(handler, 'ha_bound')),
|
||||
reverse=True):
|
||||
effect = func(self, event, **kwargs)
|
||||
if effect is not None:
|
||||
if asyncio.iscoroutinefunction(func):
|
||||
async_effects.append(effect)
|
||||
elif effect is not None:
|
||||
effects.extend(effect)
|
||||
return effects
|
||||
return effects, async_effects
|
||||
|
||||
def fire_event(self, event, pre_event=False, **kwargs):
|
||||
'''Call all handlers for an event.
|
||||
@ -173,6 +176,13 @@ class Emitter(object, metaclass=EmitterMeta):
|
||||
(specified in class definition), then handlers from extensions. Aside
|
||||
from above, remaining order is undefined.
|
||||
|
||||
This method call only synchronous handlers. If any asynchronous
|
||||
handler is registered for the event, :py:class:``RuntimeError`` is
|
||||
raised.
|
||||
|
||||
.. seealso::
|
||||
:py:meth:`fire_event_async`
|
||||
|
||||
:param str event: event identifier
|
||||
:param pre_event: is this -pre- event? reverse handlers calling order
|
||||
:returns: list of effects
|
||||
@ -181,4 +191,46 @@ class Emitter(object, metaclass=EmitterMeta):
|
||||
events.
|
||||
'''
|
||||
|
||||
return self._fire_event(event, kwargs, pre_event=pre_event)
|
||||
sync_effects, async_effects = self._fire_event(event, kwargs,
|
||||
pre_event=pre_event)
|
||||
if async_effects:
|
||||
raise RuntimeError(
|
||||
'unexpected async-handler(s) {!r} for sync event {!s}'.format(
|
||||
async_effects, event))
|
||||
return sync_effects
|
||||
|
||||
|
||||
@asyncio.coroutine
|
||||
def fire_event_async(self, event, pre_event=False, **kwargs):
|
||||
'''Call all handlers for an event, allowing async calls.
|
||||
|
||||
Handlers are called for class and all parent classes, in **reversed**
|
||||
or **true** (depending on *pre_event* parameter)
|
||||
method resolution order. For each class first are called bound handlers
|
||||
(specified in class definition), then handlers from extensions. Aside
|
||||
from above, remaining order is undefined.
|
||||
|
||||
This method call both synchronous and asynchronous handlers. Order of
|
||||
asynchronous calls is, by definition, undefined.
|
||||
|
||||
.. seealso::
|
||||
:py:meth:`fire_event`
|
||||
|
||||
:param str event: event identifier
|
||||
:param pre_event: is this -pre- event? reverse handlers calling order
|
||||
:returns: list of effects
|
||||
|
||||
All *kwargs* are passed verbatim. They are different for different
|
||||
events.
|
||||
'''
|
||||
|
||||
sync_effects, async_effects = self._fire_event(event,
|
||||
kwargs, pre_event=pre_event)
|
||||
effects = sync_effects
|
||||
if async_effects:
|
||||
async_tasks, _ = yield from asyncio.wait(async_effects)
|
||||
for task in async_tasks:
|
||||
effect = task.result()
|
||||
if effect is not None:
|
||||
effects.extend(effect)
|
||||
return effects
|
||||
|
@ -159,6 +159,18 @@ class TestEmitter(qubes.events.Emitter):
|
||||
self.fired_events[(event, ev_kwargs)] += 1
|
||||
return effects
|
||||
|
||||
@asyncio.coroutine
|
||||
def fire_event_async(self, event, pre_event=False, **kwargs):
|
||||
effects = yield from super(TestEmitter, self).fire_event_async(
|
||||
event, pre_event=pre_event, **kwargs)
|
||||
ev_kwargs = frozenset(
|
||||
(key,
|
||||
frozenset(value.items()) if isinstance(value, dict) else value)
|
||||
for key, value in kwargs.items()
|
||||
)
|
||||
self.fired_events[(event, ev_kwargs)] += 1
|
||||
return effects
|
||||
|
||||
|
||||
def expectedFailureIfTemplate(templates):
|
||||
"""
|
||||
|
@ -18,6 +18,7 @@
|
||||
# with this program; if not, write to the Free Software Foundation, Inc.,
|
||||
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
|
||||
#
|
||||
import asyncio
|
||||
|
||||
import qubes.events
|
||||
import qubes.tests
|
||||
@ -134,3 +135,36 @@ class TC_00_Emitter(qubes.tests.QubesTestCase):
|
||||
['testevent_2', 'testevent_1'])
|
||||
self.assertEqual(list(effect2),
|
||||
['testevent_1'])
|
||||
|
||||
def test_005_fire_for_effect_async(self):
|
||||
class TestEmitter(qubes.events.Emitter):
|
||||
@qubes.events.handler('testevent')
|
||||
@asyncio.coroutine
|
||||
def on_testevent_1(self, event):
|
||||
pass
|
||||
|
||||
@qubes.events.handler('testevent')
|
||||
@asyncio.coroutine
|
||||
def on_testevent_2(self, event):
|
||||
yield from asyncio.sleep(0.01)
|
||||
return ['testvalue1']
|
||||
|
||||
@qubes.events.handler('testevent')
|
||||
@asyncio.coroutine
|
||||
def on_testevent_3(self, event):
|
||||
return ('testvalue2', 'testvalue3')
|
||||
|
||||
@qubes.events.handler('testevent')
|
||||
def on_testevent_4(self, event):
|
||||
return ('testvalue4',)
|
||||
|
||||
loop = asyncio.get_event_loop()
|
||||
emitter = TestEmitter()
|
||||
emitter.events_enabled = True
|
||||
|
||||
effect = loop.run_until_complete(emitter.fire_event_async('testevent'))
|
||||
loop.close()
|
||||
asyncio.set_event_loop(None)
|
||||
|
||||
self.assertCountEqual(effect,
|
||||
('testvalue1', 'testvalue2', 'testvalue3', 'testvalue4'))
|
||||
|
Loading…
Reference in New Issue
Block a user