core-admin-client/qubesadmin/events/__init__.py
Marek Marczykowski-Górecki 218d43a2e0
Add simple properties caching
Reduce Admin API calls by caching returned values. The cache is not
enabled by default, because it could result in stale values being
returned. It can be enabled by setting 'cache_enabled' to True on
Qubes() object. This is safe in two cases:
 - the application don't care about changed values - like a short-lived
   process that retrieve values once (for example qvm-ls)
 - the application listen for events and invalidate cache when property
   is changed

For the second case, invalidating the cache on appropriate event
(property-set:*, property-reset:*) is done before calling other event
handlers. This is because the event may try to access the property value
(not necessary from the event arguments), so we need to be sure it will
see the new value.

Fixes QubesOS/qubes-issues#5415
2020-05-22 18:59:44 +02:00

225 lines
8.4 KiB
Python

# -*- encoding: utf8 -*-
#
# The Qubes OS Project, http://www.qubes-os.org
#
# Copyright (C) 2017 Marek Marczykowski-Górecki
# <marmarek@invisiblethingslab.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with this program; if not, see <http://www.gnu.org/licenses/>.
'''Event handling implementation, require Python >=3.5.2 for asyncio.'''
import asyncio
import fnmatch
import subprocess
import qubesadmin.config
import qubesadmin.exc
class EventsDispatcher(object):
''' Events dispatcher, responsible for receiving events and calling
appropriate handlers'''
def __init__(self, app, api_method='admin.Events'):
'''Initialize EventsDispatcher'''
#: Qubes() object
self.app = app
self._api_method = api_method
#: event handlers - dict of event -> handlers
self.handlers = {}
def add_handler(self, event, handler):
'''Register handler for event
Use '*' as event to register a handler for all events.
Handler function is called with:
* subject (VM object or None)
* event name (str)
* keyword arguments related to the event, if any - all values as str
:param event Event name, or '*' for all events
:param handler Handler function'''
self.handlers.setdefault(event, set()).add(handler)
def remove_handler(self, event, handler):
'''Remove previously registered event handler
:param event Event name
:param handler Handler function
'''
self.handlers[event].remove(handler)
@asyncio.coroutine
def _get_events_reader(self, vm=None) -> (asyncio.StreamReader, callable):
'''Make connection to qubesd and return stream to read events from
:param vm: Specific VM for which events should be handled, use None
to handle events from all VMs (and non-VM objects)
:return stream to read events from and a cleanup function
(call it to terminate qubesd connection)'''
if vm is not None:
dest = vm.name
else:
dest = 'dom0'
if self.app.qubesd_connection_type == 'socket':
reader, writer = yield from asyncio.open_unix_connection(
qubesadmin.config.QUBESD_SOCKET)
writer.write(b'dom0\0') # source
writer.write(self._api_method.encode() + b'\0') # method
writer.write(dest.encode('ascii') + b'\0') # dest
writer.write(b'\0') # arg
writer.write_eof()
def cleanup_func():
'''Close connection to qubesd'''
writer.close()
elif self.app.qubesd_connection_type == 'qrexec':
proc = yield from asyncio.create_subprocess_exec(
'qrexec-client-vm', dest, self._api_method,
stdin=subprocess.PIPE, stdout=subprocess.PIPE)
proc.stdin.write_eof()
reader = proc.stdout
def cleanup_func():
'''Close connection to qubesd'''
try:
proc.kill()
except ProcessLookupError:
pass
else:
raise NotImplementedError('Unsupported qubesd connection type: '
+ self.app.qubesd_connection_type)
return reader, cleanup_func
@asyncio.coroutine
def listen_for_events(self, vm=None, reconnect=True):
'''
Listen for events and call appropriate handlers.
This function do not exit until manually terminated.
This is coroutine.
:param vm: Listen for events only for this VM, use None to listen for
events about all VMs and not related to any particular VM.
:param reconnect: should reconnect to qubesd if connection is
interrupted?
:rtype: None
'''
while True:
try:
yield from self._listen_for_events(vm)
except (OSError, qubesadmin.exc.QubesDaemonCommunicationError):
pass
if not reconnect:
break
self.app.log.warning(
'Connection to qubesd terminated, reconnecting in {} '
'seconds'.format(qubesadmin.config.QUBESD_RECONNECT_DELAY))
# avoid busy-loop if qubesd is dead
yield from asyncio.sleep(qubesadmin.config.QUBESD_RECONNECT_DELAY)
@asyncio.coroutine
def _listen_for_events(self, vm=None):
'''
Listen for events and call appropriate handlers.
This function do not exit until manually terminated.
This is coroutine.
:param vm: Listen for events only for this VM, use None to listen for
events about all VMs and not related to any particular VM.
:return: True if any event was received, otherwise False
:rtype: bool
'''
reader, cleanup_func = yield from self._get_events_reader(vm)
try:
some_event_received = False
while not reader.at_eof():
try:
event_header = yield from reader.readuntil(b'\0')
if event_header != b'1\0':
raise qubesadmin.exc.QubesDaemonCommunicationError(
'Non-event received on events connection: '
+ repr(event_header))
subject = (yield from reader.readuntil(b'\0'))[:-1].decode(
'utf-8')
event = (yield from reader.readuntil(b'\0'))[:-1].decode(
'utf-8')
kwargs = {}
while True:
key = (yield from reader.readuntil(b'\0'))[:-1].decode(
'utf-8')
if not key:
break
value = (yield from reader.readuntil(b'\0'))[:-1].\
decode('utf-8')
kwargs[key] = value
except BrokenPipeError:
break
except asyncio.IncompleteReadError as err:
if err.partial == b'':
break
raise
if not subject:
subject = None
self.handle(subject, event, **kwargs)
some_event_received = True
finally:
cleanup_func()
return some_event_received
def handle(self, subject, event, **kwargs):
"""Call handlers for given event"""
# pylint: disable=protected-access
if subject:
if event in ['property-set:name']:
self.app.domains.clear_cache()
try:
subject = self.app.domains.get_blind(subject)
except KeyError:
return
else:
# handle cache refreshing on best-effort basis
if event in ['domain-add', 'domain-delete']:
self.app.domains.clear_cache()
subject = None
# deserialize known attributes
if event.startswith('device-') and 'device' in kwargs:
try:
devclass = event.split(':', 1)[1]
backend_domain, ident = kwargs['device'].split(':', 1)
kwargs['device'] = self.app.domains.get_blind(backend_domain)\
.devices[devclass][ident]
except (KeyError, ValueError):
pass
# invalidate cache if needed; call it before other handlers
# as those may want to use cached value
if event.startswith('property-set:') or \
event.startswith('property-reset:'):
self.app._invalidate_cache(subject, event, **kwargs)
handlers = [h_func for h_name, h_func_set in self.handlers.items()
for h_func in h_func_set
if fnmatch.fnmatch(event, h_name)]
for handler in handlers:
handler(subject, event, **kwargs)