diff --git a/core/qubes.py b/core/qubes.py index fd7c23d8..797e1e14 100755 --- a/core/qubes.py +++ b/core/qubes.py @@ -157,6 +157,7 @@ class QubesVMMConnection(object): if 'xen.lowlevel.xs' in sys.modules: self._xs = xen.lowlevel.xs.xs() + libvirt.virEventRegisterDefaultImpl() self._libvirt_conn = libvirt.open(defaults['libvirt_uri']) if self._libvirt_conn == None: raise QubesException("Failed connect to libvirt driver") diff --git a/core/qubesutils.py b/core/qubesutils.py index b99ed5e4..f6763f04 100644 --- a/core/qubesutils.py +++ b/core/qubesutils.py @@ -21,6 +21,7 @@ # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. # # + import string from lxml import etree from lxml.etree import ElementTree, SubElement, Element @@ -34,6 +35,8 @@ import subprocess import re import time import stat +import libvirt +from qdb import QubesDB,Error,DisconnectedError import xen.lowlevel.xc import xen.lowlevel.xs @@ -695,94 +698,105 @@ def only_in_first_list(l1, l2): return ret class QubesWatch(object): - class WatchType(object): - def __init__(self, fn, param): - self.fn = fn - self.param = param - def __init__(self): - self.xs = xen.lowlevel.xs.xs() - self.watch_tokens_block = {} - self.watch_tokens_vbd = {} - self.watch_tokens_meminfo = {} + self._qdb = {} + self._qdb_events = {} self.block_callback = None self.meminfo_callback = None self.domain_callback = None - self.xs.watch('@introduceDomain', QubesWatch.WatchType(self.domain_list_changed, None)) - self.xs.watch('@releaseDomain', QubesWatch.WatchType(self.domain_list_changed, None)) + vmm.libvirt_conn.domainEventRegisterAny( + None, + libvirt.VIR_DOMAIN_EVENT_ID_LIFECYCLE, + self._domain_list_changed, None) + vmm.libvirt_conn.domainEventRegisterAny( + None, + libvirt.VIR_DOMAIN_EVENT_ID_DEVICE_REMOVED, + self._device_removed, None) + # TODO: device attach libvirt event + for vm in vmm.libvirt_conn.listAllDomains(): + if vm.isActive(): + self._register_watches(vm) + # and for dom0 + self._register_watches(None) + + def _qdb_handler(self, watch, fd, events, domain_name): + try: + path = self._qdb[domain_name].read_watch() + except DisconnectedError: + libvirt.virEventRemoveHandle(watch) + del(self._qdb_events[domain_name]) + self._qdb[domain_name].close() + del(self._qdb[domain_name]) + return + if path.startswith('/qubes-block-devices'): + if self.block_callback is not None: + self.block_callback(domain_name) + def setup_block_watch(self, callback): - old_block_callback = self.block_callback self.block_callback = callback - if old_block_callback is not None and callback is None: - # remove watches - self.update_watches_block([]) - else: - # possibly add watches - self.domain_list_changed(None) def setup_meminfo_watch(self, callback): - old_meminfo_callback = self.meminfo_callback - self.meminfo_callback = callback - if old_meminfo_callback is not None and callback is None: - # remove watches - self.update_watches_meminfo([]) - else: - # possibly add watches - self.domain_list_changed(None) + raise NotImplemented def setup_domain_watch(self, callback): self.domain_callback = callback - def get_block_key(self, xid): - return '/local/domain/%s/qubes-block-devices' % xid - - def get_vbd_key(self, xid): - return '/local/domain/%s/device/vbd' % xid - def get_meminfo_key(self, xid): return '/local/domain/%s/memory/meminfo' % xid - def update_watches(self, xid_list, watch_tokens, xs_key_func, callback): - for i in only_in_first_list(xid_list, watch_tokens.keys()): - #new domain has been created - watch = QubesWatch.WatchType(callback, i) - watch_tokens[i] = watch - self.xs.watch(xs_key_func(i), watch) - for i in only_in_first_list(watch_tokens.keys(), xid_list): - #domain destroyed - self.xs.unwatch(xs_key_func(i), watch_tokens[i]) - watch_tokens.pop(i) + def _register_watches(self, libvirt_domain): + if libvirt_domain: + name = libvirt_domain.name() + if name in self._qdb: + return + # open separate connection to Qubes DB: + # 1. to not confuse pull() with responses to real commands sent from + # other threads (like read, write etc) with watch events + # 2. to not think whether QubesDB is thread-safe (it isn't) + while libvirt_domain.isActive() and name not in self._qdb: + try: + self._qdb[name] = QubesDB(name) + except Error as e: + if e.args[0] != 2: + raise + time.sleep(0.5) + else: + name = "dom0" + self._qdb[name] = QubesDB(name) + self._qdb[name].watch('/qubes-block-devices') + self._qdb_events[name] = libvirt.virEventAddHandle( + self._qdb[name].watch_fd(), + libvirt.VIR_EVENT_HANDLE_READABLE, + self._qdb_handler, name) - def update_watches_block(self, xid_list): - self.update_watches(xid_list, self.watch_tokens_block, - self.get_block_key, self.block_callback) - self.update_watches(xid_list, self.watch_tokens_vbd, - self.get_vbd_key, self.block_callback) + def _unregister_watches(self, libvirt_domain): + name = libvirt_domain.name() + if name in self._qdb_events: + libvirt.virEventRemoveHandle(self._qdb_events[name]) + del(self._qdb_events[name]) + if name in self._qdb: + self._qdb[name].close() + del(self._qdb[name]) - def update_watches_meminfo(self, xid_list): - self.update_watches(xid_list, self.watch_tokens_meminfo, - self.get_meminfo_key, self.meminfo_callback) - - def domain_list_changed(self, param): - curr = self.xs.ls('', '/local/domain') - if curr == None: - return + def _domain_list_changed(self, conn, domain, event, reason, param): + if event == libvirt.VIR_DOMAIN_EVENT_STARTED: + self._register_watches(domain) + elif event == libvirt.VIR_DOMAIN_EVENT_STOPPED: + self._unregister_watches(domain) + else: + # ignore other events for now + return None if self.domain_callback: - self.domain_callback() - if self.block_callback: - self.update_watches_block(curr) - if self.meminfo_callback: - self.update_watches_meminfo(curr) + self.domain_callback(name=domain.name()) - def watch_single(self): - result = self.xs.read_watch() - token = result[1] - token.fn(token.param) + def _device_removed(self, conn, domain, device, param): + if self.block_callback is not None: + self.block_callback(domain.name()) def watch_loop(self): while True: - self.watch_single() + libvirt.virEventRunDefaultImpl() ##### updates check #####