diff --git a/qubesmgmt/config.py b/qubesmgmt/config.py index 22cb271..4a08944 100644 --- a/qubesmgmt/config.py +++ b/qubesmgmt/config.py @@ -24,3 +24,4 @@ QUBESD_SOCKET = '/var/run/qubesd.sock' QREXEC_CLIENT = '/usr/lib/qubes/qrexec-client' QREXEC_CLIENT_VM = '/usr/bin/qrexec-client-vm' +QUBESD_RECONNECT_DELAY = 1.0 diff --git a/qubesmgmt/events/__init__.py b/qubesmgmt/events/__init__.py index cd9ab84..5b805c7 100644 --- a/qubesmgmt/events/__init__.py +++ b/qubesmgmt/events/__init__.py @@ -102,7 +102,7 @@ class EventsDispatcher(object): return reader, cleanup_func @asyncio.coroutine - def listen_for_events(self, vm=None): + def listen_for_events(self, vm=None, reconnect=True): ''' Listen for events and call appropriate handlers. This function do not exit until manually terminated. @@ -111,40 +111,67 @@ class EventsDispatcher(object): :param vm: Listen for events only for this VM, use None to listen for events about all VMs and not related to any particular VM. - :return: None + :param reconnect: should reconnect to qubesd if connection is + interrupted? + :rtype: None + ''' + while True: + try: + yield from self._listen_for_events(vm) + except ConnectionRefusedError: + pass + if not reconnect: + break + self.app.log.warning( + 'Connection to qubesd terminated, reconnecting in {} ' + 'seconds'.format(qubesmgmt.config.QUBESD_RECONNECT_DELAY)) + # avoid busy-loop if qubesd is dead + yield from asyncio.sleep(qubesmgmt.config.QUBESD_RECONNECT_DELAY) + + @asyncio.coroutine + def _listen_for_events(self, vm=None): + ''' + Listen for events and call appropriate handlers. + This function do not exit until manually terminated. + + This is coroutine. + + :param vm: Listen for events only for this VM, use None to listen for + events about all VMs and not related to any particular VM. + :return: True if any event was received, otherwise False + :rtype: bool ''' + reader, cleanup_func = yield from self._get_events_reader(vm) try: - reader, cleanup_func = yield from self._get_events_reader(vm) - except asyncio.CancelledError: - return + some_event_received = False + while not reader.at_eof(): + try: + event_data = yield from reader.readuntil(b'\0\0') + if event_data == b'1\0\0': + # event with non-VM subject contains \0\0 inside of + # event, need to receive rest of the data + event_data += yield from reader.readuntil(b'\0\0') + except asyncio.IncompleteReadError as err: + if err.partial == b'': + break + else: + raise - while not reader.at_eof(): - try: - event_data = yield from reader.readuntil(b'\0\0') - if event_data == b'1\0\0': - # event with non-VM subject contains \0\0 inside of - # event, need to receive rest of the data - event_data += yield from reader.readuntil(b'\0\0') - except asyncio.CancelledError: - break - except asyncio.IncompleteReadError as err: - if err.partial == b'': - break - else: - raise + if not event_data.startswith(b'1\0'): + raise qubesmgmt.exc.QubesDaemonCommunicationError( + 'Non-event received on events connection: ' + + repr(event_data)) + event_data = event_data.decode('utf-8') + _, subject, event, *kwargs = event_data.split('\0') + # convert list to dict, remove last empty entry + kwargs = dict(zip(kwargs[:-2:2], kwargs[1:-2:2])) + self.handle(subject, event, **kwargs) - if not event_data.startswith(b'1\0'): - raise qubesmgmt.exc.QubesDaemonCommunicationError( - 'Non-event received on events connection: ' - + repr(event_data)) - event_data = event_data.decode('utf-8') - _, subject, event, *kwargs = event_data.split('\0') - # convert list to dict, remove last empty entry - kwargs = dict(zip(kwargs[:-2:2], kwargs[1:-2:2])) - self.handle(subject, event, **kwargs) - - cleanup_func() + some_event_received = True + finally: + cleanup_func() + return some_event_received def handle(self, subject, event, **kwargs): '''Call handlers for given event''' diff --git a/qubesmgmt/tests/events.py b/qubesmgmt/tests/events.py index a593494..4dfccb2 100644 --- a/qubesmgmt/tests/events.py +++ b/qubesmgmt/tests/events.py @@ -111,7 +111,8 @@ class TC_00_Events(qubesmgmt.tests.QubesTestCase): b'1\0some-vm\0other-event\0\0', ] asyncio.ensure_future(self.send_events(stream, events)) - loop.run_until_complete(self.dispatcher.listen_for_events()) + loop.run_until_complete(self.dispatcher.listen_for_events( + reconnect=False)) self.assertEqual(handler.mock_calls, [ unittest.mock.call(None, 'some-event', arg1='value1'), unittest.mock.call( diff --git a/qubesmgmt/tools/qvm_start_gui.py b/qubesmgmt/tools/qvm_start_gui.py index 85aba57..f60b29b 100644 --- a/qubesmgmt/tools/qvm_start_gui.py +++ b/qubesmgmt/tools/qvm_start_gui.py @@ -209,7 +209,10 @@ def main(args=None): loop.add_signal_handler(getattr(signal, signame), events_listener.cancel) # pylint: disable=no-member - loop.run_until_complete(events_listener) + try: + loop.run_until_complete(events_listener) + except asyncio.CancelledError: + pass loop.stop() loop.run_forever() loop.close()