# -*- encoding: utf8 -*-
#
# The Qubes OS Project, http://www.qubes-os.org
#
# Copyright (C) 2017 Marek Marczykowski-Górecki
#
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with this program; if not, see <http://www.gnu.org/licenses/>.

'''Event handling implementation, require Python >=3.5.2 for asyncio.'''

import asyncio
import fnmatch
import subprocess

import qubesadmin.config
import qubesadmin.exc


class EventsDispatcher(object):
    ''' Events dispatcher, responsible for receiving events and calling
    appropriate handlers'''

    def __init__(self, app, api_method='admin.Events', enable_cache=True):
        """Initialize EventsDispatcher

        :param app: :py:class:`qubesadmin.Qubes` object
        :param api_method: Admin API method producing events
        :param enable_cache: Enable caching (see below)

        Connecting :py:class:`EventsDispatcher` object to a
        :py:class:`qubesadmin.Qubes` implicitly enables caching. It is
        important to actually run the dispatcher
        (:py:meth:`listen_for_events`), otherwise the cache won't be
        updated. Alternatively, disable caching by setting
        :py:attr:`qubesadmin.Qubes.cache_enabled` property to `False`.
        """
        #: Qubes() object
        self.app = app

        self._api_method = api_method

        #: event handlers - dict of event -> set of handlers
        self.handlers = {}

        if enable_cache:
            self.app.cache_enabled = True

    def add_handler(self, event, handler):
        '''Register handler for event

        Use '*' as event to register a handler for all events. The event
        name is matched against incoming events with :py:mod:`fnmatch`, so
        other shell-style patterns work too.

        Handler function is called with:

        * subject (VM object or None)
        * event name (str)
        * keyword arguments related to the event, if any - all values as
          str

        :param event: Event name, or '*' for all events
        :param handler: Handler function
        '''
        self.handlers.setdefault(event, set()).add(handler)

    def remove_handler(self, event, handler):
        '''Remove previously registered event handler

        :param event: Event name
        :param handler: Handler function
        :raises KeyError: if no handler was registered for *event*, or
            *handler* is not among them
        '''
        self.handlers[event].remove(handler)

    async def _get_events_reader(self, vm=None) -> (
            asyncio.StreamReader, callable):
        '''Make connection to qubesd and return stream to read events from

        This is a coroutine.

        :param vm: Specific VM for which events should be handled, use None
            to handle events from all VMs (and non-VM objects)
        :return: stream to read events from and a cleanup function
            (call it to terminate qubesd connection)
        :raises NotImplementedError: if ``app.qubesd_connection_type`` is
            neither ``'socket'`` nor ``'qrexec'``
        '''
        if vm is not None:
            dest = vm.name
        else:
            dest = 'dom0'

        if self.app.qubesd_connection_type == 'socket':
            reader, writer = await asyncio.open_unix_connection(
                qubesadmin.config.QUBESD_SOCKET)
            # request header, then EOF as no payload follows
            writer.write(self._api_method.encode() + b'+ ')  # method+arg
            writer.write(b'dom0 ')  # source
            writer.write(b'name ' + dest.encode('ascii') + b'\0')  # dest
            writer.write_eof()

            def cleanup_func():
                '''Close connection to qubesd'''
                writer.close()
        elif self.app.qubesd_connection_type == 'qrexec':
            proc = await asyncio.create_subprocess_exec(
                'qrexec-client-vm', dest, self._api_method,
                stdin=subprocess.PIPE, stdout=subprocess.PIPE)
            # no payload to send
            proc.stdin.write_eof()
            reader = proc.stdout

            def cleanup_func():
                '''Close connection to qubesd'''
                try:
                    proc.kill()
                except ProcessLookupError:
                    # process already gone, nothing to terminate
                    pass
        else:
            raise NotImplementedError('Unsupported qubesd connection type: '
                                      + self.app.qubesd_connection_type)
        return reader, cleanup_func

    async def listen_for_events(self, vm=None, reconnect=True):
        '''
        Listen for events and call appropriate handlers.
        With *reconnect* enabled this function does not exit until
        manually terminated; otherwise it returns once the first
        connection ends.

        This is a coroutine.

        :param vm: Listen for events only for this VM, use None to listen
            for events about all VMs and not related to any particular VM.
        :param reconnect: should reconnect to qubesd if connection is
            interrupted?
        :rtype: None
        '''
        while True:
            try:
                await self._listen_for_events(vm)
            except (OSError, qubesadmin.exc.QubesDaemonCommunicationError):
                # connection problems are handled the same way as a clean
                # disconnect: by the reconnect logic below
                pass
            if not reconnect:
                break
            self.app.log.warning(
                'Connection to qubesd terminated, reconnecting in {} '
                'seconds'.format(qubesadmin.config.QUBESD_RECONNECT_DELAY))
            # avoid busy-loop if qubesd is dead
            await asyncio.sleep(qubesadmin.config.QUBESD_RECONNECT_DELAY)

    @staticmethod
    async def _read_field(reader):
        '''Read one NUL-terminated field from *reader*, return it as str
        (UTF-8 decoded, without the terminator)'''
        raw = await reader.readuntil(b'\0')
        return raw[:-1].decode('utf-8')

    async def _listen_for_events(self, vm=None):
        '''
        Listen for events on a single connection and call appropriate
        handlers. Returns when the connection reaches EOF or is broken.

        Each event on the wire is a ``1\\0`` header, followed by
        NUL-terminated subject, event name and key/value pairs; an empty
        key ends the event.

        This is a coroutine.

        :param vm: Listen for events only for this VM, use None to listen
            for events about all VMs and not related to any particular VM.
        :return: True if any event was received, otherwise False
        :rtype: bool
        :raises qubesadmin.exc.QubesDaemonCommunicationError: if something
            other than an event header is received
        :raises asyncio.IncompleteReadError: if the stream ends in the
            middle of a field
        '''
        reader, cleanup_func = await self._get_events_reader(vm)
        try:
            some_event_received = False
            while not reader.at_eof():
                try:
                    event_header = await reader.readuntil(b'\0')
                    if event_header != b'1\0':
                        raise qubesadmin.exc.QubesDaemonCommunicationError(
                            'Non-event received on events connection: '
                            + repr(event_header))
                    subject = await self._read_field(reader)
                    event = await self._read_field(reader)
                    kwargs = {}
                    while True:
                        key = await self._read_field(reader)
                        if not key:
                            # empty key terminates the argument list
                            break
                        kwargs[key] = await self._read_field(reader)
                except BrokenPipeError:
                    break
                except asyncio.IncompleteReadError as err:
                    # EOF exactly on a field boundary is a normal
                    # disconnect; anything else is a truncated message
                    if err.partial == b'':
                        break
                    raise

                if not subject:
                    subject = None

                self.handle(subject, event, **kwargs)

                some_event_received = True
        finally:
            cleanup_func()
        return some_event_received

    def handle(self, subject, event, **kwargs):
        """Call handlers for given event

        Before handlers are called, *subject* is replaced with the object
        returned by ``app.domains.get_blind()`` (the event is dropped if
        that raises :py:class:`KeyError`), the ``device`` argument of
        ``device-*`` events is resolved to a device object where possible,
        and the relevant caches of ``app`` are refreshed.

        :param subject: name of the VM the event is about, or None/empty
            for events not related to any VM
        :param event: event name
        :param kwargs: event arguments
        """
        # pylint: disable=protected-access
        if subject:
            if event == 'property-set:name':
                self.app.domains.clear_cache()
            try:
                subject = self.app.domains.get_blind(subject)
            except KeyError:
                return
        else:
            # handle cache refreshing on best-effort basis
            if event in ('domain-add', 'domain-delete'):
                self.app.domains.clear_cache()
            subject = None

        # deserialize known attributes
        if event.startswith('device-') and 'device' in kwargs:
            try:
                devclass = event.split(':', 1)[1]
                backend_domain, ident = kwargs['device'].split(':', 1)
                kwargs['device'] = self.app.domains.get_blind(backend_domain)\
                    .devices[devclass][ident]
            except (KeyError, ValueError, IndexError):
                # best effort: leave the raw string in place. IndexError
                # covers an event name without the ':<class>' suffix, which
                # would otherwise terminate the whole listener.
                pass

        # invalidate cache if needed; call it before other handlers
        # as those may want to use cached value
        if event.startswith(('property-set:', 'property-reset:')):
            self.app._invalidate_cache(subject, event, **kwargs)
        elif event in ('domain-pre-start', 'domain-start', 'domain-shutdown',
                       'domain-paused', 'domain-unpaused'):
            self.app._update_power_state_cache(subject, event, **kwargs)

        # snapshot matching handlers first, so a handler may safely
        # (un)register handlers while being called
        handlers = [h_func
                    for h_name, h_func_set in self.handlers.items()
                    for h_func in h_func_set
                    if fnmatch.fnmatch(event, h_name)]
        for handler in handlers:
            handler(subject, event, **kwargs)