events: add qubesd reconnection support
If connection is interrupted (for example qubesd restart), attempt to reconnect.
This commit is contained in:
parent
6dd7c69b3f
commit
ef683485e2
@ -24,3 +24,4 @@
|
||||
QUBESD_SOCKET = '/var/run/qubesd.sock'
|
||||
QREXEC_CLIENT = '/usr/lib/qubes/qrexec-client'
|
||||
QREXEC_CLIENT_VM = '/usr/bin/qrexec-client-vm'
|
||||
QUBESD_RECONNECT_DELAY = 1.0
|
||||
|
@ -102,7 +102,7 @@ class EventsDispatcher(object):
|
||||
return reader, cleanup_func
|
||||
|
||||
@asyncio.coroutine
|
||||
def listen_for_events(self, vm=None):
|
||||
def listen_for_events(self, vm=None, reconnect=True):
|
||||
'''
|
||||
Listen for events and call appropriate handlers.
|
||||
This function do not exit until manually terminated.
|
||||
@ -111,40 +111,67 @@ class EventsDispatcher(object):
|
||||
|
||||
:param vm: Listen for events only for this VM, use None to listen for
|
||||
events about all VMs and not related to any particular VM.
|
||||
:return: None
|
||||
:param reconnect: should reconnect to qubesd if connection is
|
||||
interrupted?
|
||||
:rtype: None
|
||||
'''
|
||||
while True:
|
||||
try:
|
||||
yield from self._listen_for_events(vm)
|
||||
except ConnectionRefusedError:
|
||||
pass
|
||||
if not reconnect:
|
||||
break
|
||||
self.app.log.warning(
|
||||
'Connection to qubesd terminated, reconnecting in {} '
|
||||
'seconds'.format(qubesmgmt.config.QUBESD_RECONNECT_DELAY))
|
||||
# avoid busy-loop if qubesd is dead
|
||||
yield from asyncio.sleep(qubesmgmt.config.QUBESD_RECONNECT_DELAY)
|
||||
|
||||
@asyncio.coroutine
|
||||
def _listen_for_events(self, vm=None):
|
||||
'''
|
||||
Listen for events and call appropriate handlers.
|
||||
This function do not exit until manually terminated.
|
||||
|
||||
This is coroutine.
|
||||
|
||||
:param vm: Listen for events only for this VM, use None to listen for
|
||||
events about all VMs and not related to any particular VM.
|
||||
:return: True if any event was received, otherwise False
|
||||
:rtype: bool
|
||||
'''
|
||||
|
||||
reader, cleanup_func = yield from self._get_events_reader(vm)
|
||||
try:
|
||||
reader, cleanup_func = yield from self._get_events_reader(vm)
|
||||
except asyncio.CancelledError:
|
||||
return
|
||||
some_event_received = False
|
||||
while not reader.at_eof():
|
||||
try:
|
||||
event_data = yield from reader.readuntil(b'\0\0')
|
||||
if event_data == b'1\0\0':
|
||||
# event with non-VM subject contains \0\0 inside of
|
||||
# event, need to receive rest of the data
|
||||
event_data += yield from reader.readuntil(b'\0\0')
|
||||
except asyncio.IncompleteReadError as err:
|
||||
if err.partial == b'':
|
||||
break
|
||||
else:
|
||||
raise
|
||||
|
||||
while not reader.at_eof():
|
||||
try:
|
||||
event_data = yield from reader.readuntil(b'\0\0')
|
||||
if event_data == b'1\0\0':
|
||||
# event with non-VM subject contains \0\0 inside of
|
||||
# event, need to receive rest of the data
|
||||
event_data += yield from reader.readuntil(b'\0\0')
|
||||
except asyncio.CancelledError:
|
||||
break
|
||||
except asyncio.IncompleteReadError as err:
|
||||
if err.partial == b'':
|
||||
break
|
||||
else:
|
||||
raise
|
||||
if not event_data.startswith(b'1\0'):
|
||||
raise qubesmgmt.exc.QubesDaemonCommunicationError(
|
||||
'Non-event received on events connection: '
|
||||
+ repr(event_data))
|
||||
event_data = event_data.decode('utf-8')
|
||||
_, subject, event, *kwargs = event_data.split('\0')
|
||||
# convert list to dict, remove last empty entry
|
||||
kwargs = dict(zip(kwargs[:-2:2], kwargs[1:-2:2]))
|
||||
self.handle(subject, event, **kwargs)
|
||||
|
||||
if not event_data.startswith(b'1\0'):
|
||||
raise qubesmgmt.exc.QubesDaemonCommunicationError(
|
||||
'Non-event received on events connection: '
|
||||
+ repr(event_data))
|
||||
event_data = event_data.decode('utf-8')
|
||||
_, subject, event, *kwargs = event_data.split('\0')
|
||||
# convert list to dict, remove last empty entry
|
||||
kwargs = dict(zip(kwargs[:-2:2], kwargs[1:-2:2]))
|
||||
self.handle(subject, event, **kwargs)
|
||||
|
||||
cleanup_func()
|
||||
some_event_received = True
|
||||
finally:
|
||||
cleanup_func()
|
||||
return some_event_received
|
||||
|
||||
def handle(self, subject, event, **kwargs):
|
||||
'''Call handlers for given event'''
|
||||
|
@ -111,7 +111,8 @@ class TC_00_Events(qubesmgmt.tests.QubesTestCase):
|
||||
b'1\0some-vm\0other-event\0\0',
|
||||
]
|
||||
asyncio.ensure_future(self.send_events(stream, events))
|
||||
loop.run_until_complete(self.dispatcher.listen_for_events())
|
||||
loop.run_until_complete(self.dispatcher.listen_for_events(
|
||||
reconnect=False))
|
||||
self.assertEqual(handler.mock_calls, [
|
||||
unittest.mock.call(None, 'some-event', arg1='value1'),
|
||||
unittest.mock.call(
|
||||
|
@ -209,7 +209,10 @@ def main(args=None):
|
||||
loop.add_signal_handler(getattr(signal, signame),
|
||||
events_listener.cancel) # pylint: disable=no-member
|
||||
|
||||
loop.run_until_complete(events_listener)
|
||||
try:
|
||||
loop.run_until_complete(events_listener)
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
loop.stop()
|
||||
loop.run_forever()
|
||||
loop.close()
|
||||
|
Loading…
Reference in New Issue
Block a user