From 785706af2f5fbc8f573a4139bd111e456e794c9d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marek=20Marczykowski-G=C3=B3recki?= Date: Fri, 14 Apr 2017 13:23:02 +0200 Subject: [PATCH] events: initial implementation This module use asyncio, so require Python 3. And actually >= 3.5.2 because of asyncio.StreamReader.readuntil(). Modules are designed the way it's still possible to use non-events API on Python 2. --- qubesmgmt/app.py | 8 ++ qubesmgmt/events/__init__.py | 158 +++++++++++++++++++++++++++++++++++ 2 files changed, 166 insertions(+) create mode 100644 qubesmgmt/events/__init__.py diff --git a/qubesmgmt/app.py b/qubesmgmt/app.py index 486a77e..bee14cd 100644 --- a/qubesmgmt/app.py +++ b/qubesmgmt/app.py @@ -112,6 +112,8 @@ class QubesBase(qubesmgmt.base.PropertyHolder): labels = None #: storage pools pools = None + #: type of qubesd connection: either 'socket' or 'qrexec' + qubesd_connection_type = None def __init__(self): super(QubesBase, self).__init__(self, 'mgmt.property.', 'dom0') @@ -178,6 +180,9 @@ class QubesLocal(QubesBase): Used when running in dom0. ''' + + qubesd_connection_type = 'socket' + def qubesd_call(self, dest, method, arg=None, payload=None): try: client_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) @@ -205,6 +210,9 @@ class QubesRemote(QubesBase): Used when running in VM. ''' + + qubesd_connection_type = 'qrexec' + def qubesd_call(self, dest, method, arg=None, payload=None): service_name = method if arg is not None: diff --git a/qubesmgmt/events/__init__.py b/qubesmgmt/events/__init__.py new file mode 100644 index 0000000..cd9ab84 --- /dev/null +++ b/qubesmgmt/events/__init__.py @@ -0,0 +1,158 @@ +# -*- encoding: utf8 -*- +# +# The Qubes OS Project, http://www.qubes-os.org +# +# Copyright (C) 2017 Marek Marczykowski-Górecki +# +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU Lesser General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License along +# with this program; if not, see . + +'''Event handling implementation, require Python >=3.5.2 for asyncio.''' + +import asyncio +import subprocess + +import qubesmgmt.config +import qubesmgmt.exc + + +class EventsDispatcher(object): + ''' Events dispatcher, responsible for receiving events and calling + appropriate handlers''' + def __init__(self, app): + '''Initialize EventsDispatcher''' + #: Qubes() object + self.app = app + + #: event handlers - dict of event -> handlers + self.handlers = {} + + def add_handler(self, event, handler): + '''Register handler for event + + Use '*' as event to register a handler for all events. + + Handler function is called with: + * subject (VM object or None) + * event name (str) + * keyword arguments related to the event, if any - all values as str + + :param event Event name, or '*' for all events + :param handler Handler function''' + self.handlers.setdefault(event, set()).add(handler) + + def remove_handler(self, event, handler): + '''Remove previously registered event handler + + :param event Event name + :param handler Handler function + ''' + self.handlers[event].remove(handler) + + @asyncio.coroutine + def _get_events_reader(self, vm=None) -> (asyncio.StreamReader, callable): + '''Make connection to qubesd and return stream to read events from + + :param vm: Specific VM for which events should be handled, use None + to handle events from all VMs (and non-VM objects) + :return stream to read events from and a cleanup function + (call it to terminate qubesd connection)''' + if vm is not None: + dest = vm.name + else: + dest = 'dom0' + + if self.app.qubesd_connection_type == 'socket': + reader, writer = yield from asyncio.open_unix_connection( + qubesmgmt.config.QUBESD_SOCKET) + writer.write(b'dom0\0') # source + writer.write(b'mgmt.Events\0') # method + writer.write(dest.encode('ascii') + b'\0') # dest + writer.write(b'\0') # arg + writer.write_eof() + + def cleanup_func(): + '''Close connection to qubesd''' + writer.close() + elif self.app.qubesd_connection_type == 'qrexec': + proc = yield from asyncio.create_subprocess_exec( + ['qrexec-client-vm', dest, 'mgmt.Events'], + stdin=subprocess.PIPE, stdout=subprocess.PIPE) + + proc.stdin.write_eof() + reader = proc.stdout + + def cleanup_func(): + '''Close connection to qubesd''' + proc.kill() + else: + raise NotImplementedError('Unsupported qubesd connection type: ' + + self.app.qubesd_connection_type) + return reader, cleanup_func + + @asyncio.coroutine + def listen_for_events(self, vm=None): + ''' + Listen for events and call appropriate handlers. + This function do not exit until manually terminated. + + This is coroutine. + + :param vm: Listen for events only for this VM, use None to listen for + events about all VMs and not related to any particular VM. + :return: None + ''' + + try: + reader, cleanup_func = yield from self._get_events_reader(vm) + except asyncio.CancelledError: + return + + while not reader.at_eof(): + try: + event_data = yield from reader.readuntil(b'\0\0') + if event_data == b'1\0\0': + # event with non-VM subject contains \0\0 inside of + # event, need to receive rest of the data + event_data += yield from reader.readuntil(b'\0\0') + except asyncio.CancelledError: + break + except asyncio.IncompleteReadError as err: + if err.partial == b'': + break + else: + raise + + if not event_data.startswith(b'1\0'): + raise qubesmgmt.exc.QubesDaemonCommunicationError( + 'Non-event received on events connection: ' + + repr(event_data)) + event_data = event_data.decode('utf-8') + _, subject, event, *kwargs = event_data.split('\0') + # convert list to dict, remove last empty entry + kwargs = dict(zip(kwargs[:-2:2], kwargs[1:-2:2])) + self.handle(subject, event, **kwargs) + + cleanup_func() + + def handle(self, subject, event, **kwargs): + '''Call handlers for given event''' + if subject: + subject = self.app.domains[subject] + else: + subject = None + for handler in self.handlers.get(event, []): + handler(subject, event, **kwargs) + for handler in self.handlers.get('*', []): + handler(subject, event, **kwargs)