123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206 |
- # -*- encoding: utf8 -*-
- #
- # The Qubes OS Project, http://www.qubes-os.org
- #
- # Copyright (C) 2017 Marek Marczykowski-Górecki
- # <marmarek@invisiblethingslab.com>
- #
- # This program is free software; you can redistribute it and/or modify
- # it under the terms of the GNU Lesser General Public License as published by
- # the Free Software Foundation; either version 2 of the License, or
- # (at your option) any later version.
- #
- # This program is distributed in the hope that it will be useful,
- # but WITHOUT ANY WARRANTY; without even the implied warranty of
- # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- # GNU Lesser General Public License for more details.
- #
- # You should have received a copy of the GNU Lesser General Public License along
- # with this program; if not, see <http://www.gnu.org/licenses/>.
- '''Event handling implementation, require Python >=3.5.2 for asyncio.'''
- import asyncio
- import subprocess
- import qubesadmin.config
- import qubesadmin.exc
- class EventsDispatcher(object):
- ''' Events dispatcher, responsible for receiving events and calling
- appropriate handlers'''
- def __init__(self, app, api_method='admin.Events'):
- '''Initialize EventsDispatcher'''
- #: Qubes() object
- self.app = app
- self._api_method = api_method
- #: event handlers - dict of event -> handlers
- self.handlers = {}
- def add_handler(self, event, handler):
- '''Register handler for event
- Use '*' as event to register a handler for all events.
- Handler function is called with:
- * subject (VM object or None)
- * event name (str)
- * keyword arguments related to the event, if any - all values as str
- :param event Event name, or '*' for all events
- :param handler Handler function'''
- self.handlers.setdefault(event, set()).add(handler)
- def remove_handler(self, event, handler):
- '''Remove previously registered event handler
- :param event Event name
- :param handler Handler function
- '''
- self.handlers[event].remove(handler)
- @asyncio.coroutine
- def _get_events_reader(self, vm=None) -> (asyncio.StreamReader, callable):
- '''Make connection to qubesd and return stream to read events from
- :param vm: Specific VM for which events should be handled, use None
- to handle events from all VMs (and non-VM objects)
- :return stream to read events from and a cleanup function
- (call it to terminate qubesd connection)'''
- if vm is not None:
- dest = vm.name
- else:
- dest = 'dom0'
- if self.app.qubesd_connection_type == 'socket':
- reader, writer = yield from asyncio.open_unix_connection(
- qubesadmin.config.QUBESD_SOCKET)
- writer.write(b'dom0\0') # source
- writer.write(self._api_method.encode() + b'\0') # method
- writer.write(dest.encode('ascii') + b'\0') # dest
- writer.write(b'\0') # arg
- writer.write_eof()
- def cleanup_func():
- '''Close connection to qubesd'''
- writer.close()
- elif self.app.qubesd_connection_type == 'qrexec':
- proc = yield from asyncio.create_subprocess_exec(
- 'qrexec-client-vm', dest, self._api_method,
- stdin=subprocess.PIPE, stdout=subprocess.PIPE)
- proc.stdin.write_eof()
- reader = proc.stdout
- def cleanup_func():
- '''Close connection to qubesd'''
- try:
- proc.kill()
- except ProcessLookupError:
- pass
- else:
- raise NotImplementedError('Unsupported qubesd connection type: '
- + self.app.qubesd_connection_type)
- return reader, cleanup_func
- @asyncio.coroutine
- def listen_for_events(self, vm=None, reconnect=True):
- '''
- Listen for events and call appropriate handlers.
- This function do not exit until manually terminated.
- This is coroutine.
- :param vm: Listen for events only for this VM, use None to listen for
- events about all VMs and not related to any particular VM.
- :param reconnect: should reconnect to qubesd if connection is
- interrupted?
- :rtype: None
- '''
- while True:
- try:
- yield from self._listen_for_events(vm)
- except (ConnectionRefusedError, ConnectionResetError,
- FileNotFoundError):
- pass
- if not reconnect:
- break
- self.app.log.warning(
- 'Connection to qubesd terminated, reconnecting in {} '
- 'seconds'.format(qubesadmin.config.QUBESD_RECONNECT_DELAY))
- # avoid busy-loop if qubesd is dead
- yield from asyncio.sleep(qubesadmin.config.QUBESD_RECONNECT_DELAY)
- @asyncio.coroutine
- def _listen_for_events(self, vm=None):
- '''
- Listen for events and call appropriate handlers.
- This function do not exit until manually terminated.
- This is coroutine.
- :param vm: Listen for events only for this VM, use None to listen for
- events about all VMs and not related to any particular VM.
- :return: True if any event was received, otherwise False
- :rtype: bool
- '''
- reader, cleanup_func = yield from self._get_events_reader(vm)
- try:
- some_event_received = False
- while not reader.at_eof():
- try:
- event_header = yield from reader.readuntil(b'\0')
- if event_header != b'1\0':
- raise qubesadmin.exc.QubesDaemonCommunicationError(
- 'Non-event received on events connection: '
- + repr(event_header))
- subject = (yield from reader.readuntil(b'\0'))[:-1].decode(
- 'utf-8')
- event = (yield from reader.readuntil(b'\0'))[:-1].decode(
- 'utf-8')
- kwargs = {}
- while True:
- key = (yield from reader.readuntil(b'\0'))[:-1].decode(
- 'utf-8')
- if not key:
- break
- value = (yield from reader.readuntil(b'\0'))[:-1].\
- decode('utf-8')
- kwargs[key] = value
- except asyncio.IncompleteReadError as err:
- if err.partial == b'':
- break
- else:
- raise
- if not subject:
- subject = None
- self.handle(subject, event, **kwargs)
- some_event_received = True
- finally:
- cleanup_func()
- return some_event_received
- def handle(self, subject, event, **kwargs):
- '''Call handlers for given event'''
- if subject:
- if event in ['property-set:name']:
- self.app.domains.clear_cache()
- try:
- subject = self.app.domains[subject]
- except KeyError:
- return
- else:
- # handle cache refreshing on best-effort basis
- if event in ['domain-add', 'domain-delete']:
- self.app.domains.clear_cache()
- subject = None
- for handler in self.handlers.get(event, []):
- handler(subject, event, **kwargs)
- for handler in self.handlers.get('*', []):
- handler(subject, event, **kwargs)
|