Rework QubesWatch implementation for libvirt events

This commit is contained in:
Marek Marczykowski-Górecki 2014-12-26 02:56:38 +01:00
parent d4ab70ae9d
commit adff88101a
2 changed files with 80 additions and 65 deletions

View File

@ -157,6 +157,7 @@ class QubesVMMConnection(object):
if 'xen.lowlevel.xs' in sys.modules: if 'xen.lowlevel.xs' in sys.modules:
self._xs = xen.lowlevel.xs.xs() self._xs = xen.lowlevel.xs.xs()
libvirt.virEventRegisterDefaultImpl()
self._libvirt_conn = libvirt.open(defaults['libvirt_uri']) self._libvirt_conn = libvirt.open(defaults['libvirt_uri'])
if self._libvirt_conn == None: if self._libvirt_conn == None:
raise QubesException("Failed connect to libvirt driver") raise QubesException("Failed connect to libvirt driver")

View File

@ -21,6 +21,7 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
# #
# #
import string import string
from lxml import etree from lxml import etree
from lxml.etree import ElementTree, SubElement, Element from lxml.etree import ElementTree, SubElement, Element
@ -34,6 +35,8 @@ import subprocess
import re import re
import time import time
import stat import stat
import libvirt
from qdb import QubesDB,Error,DisconnectedError
import xen.lowlevel.xc import xen.lowlevel.xc
import xen.lowlevel.xs import xen.lowlevel.xs
@ -695,94 +698,105 @@ def only_in_first_list(l1, l2):
return ret return ret
class QubesWatch(object): class QubesWatch(object):
class WatchType(object):
def __init__(self, fn, param):
self.fn = fn
self.param = param
def __init__(self): def __init__(self):
self.xs = xen.lowlevel.xs.xs() self._qdb = {}
self.watch_tokens_block = {} self._qdb_events = {}
self.watch_tokens_vbd = {}
self.watch_tokens_meminfo = {}
self.block_callback = None self.block_callback = None
self.meminfo_callback = None self.meminfo_callback = None
self.domain_callback = None self.domain_callback = None
self.xs.watch('@introduceDomain', QubesWatch.WatchType(self.domain_list_changed, None)) vmm.libvirt_conn.domainEventRegisterAny(
self.xs.watch('@releaseDomain', QubesWatch.WatchType(self.domain_list_changed, None)) None,
libvirt.VIR_DOMAIN_EVENT_ID_LIFECYCLE,
self._domain_list_changed, None)
vmm.libvirt_conn.domainEventRegisterAny(
None,
libvirt.VIR_DOMAIN_EVENT_ID_DEVICE_REMOVED,
self._device_removed, None)
# TODO: device attach libvirt event
for vm in vmm.libvirt_conn.listAllDomains():
if vm.isActive():
self._register_watches(vm)
# and for dom0
self._register_watches(None)
def _qdb_handler(self, watch, fd, events, domain_name):
try:
path = self._qdb[domain_name].read_watch()
except DisconnectedError:
libvirt.virEventRemoveHandle(watch)
del(self._qdb_events[domain_name])
self._qdb[domain_name].close()
del(self._qdb[domain_name])
return
if path.startswith('/qubes-block-devices'):
if self.block_callback is not None:
self.block_callback(domain_name)
def setup_block_watch(self, callback): def setup_block_watch(self, callback):
old_block_callback = self.block_callback
self.block_callback = callback self.block_callback = callback
if old_block_callback is not None and callback is None:
# remove watches
self.update_watches_block([])
else:
# possibly add watches
self.domain_list_changed(None)
def setup_meminfo_watch(self, callback): def setup_meminfo_watch(self, callback):
old_meminfo_callback = self.meminfo_callback raise NotImplemented
self.meminfo_callback = callback
if old_meminfo_callback is not None and callback is None:
# remove watches
self.update_watches_meminfo([])
else:
# possibly add watches
self.domain_list_changed(None)
def setup_domain_watch(self, callback): def setup_domain_watch(self, callback):
self.domain_callback = callback self.domain_callback = callback
def get_block_key(self, xid):
return '/local/domain/%s/qubes-block-devices' % xid
def get_vbd_key(self, xid):
return '/local/domain/%s/device/vbd' % xid
def get_meminfo_key(self, xid): def get_meminfo_key(self, xid):
return '/local/domain/%s/memory/meminfo' % xid return '/local/domain/%s/memory/meminfo' % xid
def update_watches(self, xid_list, watch_tokens, xs_key_func, callback): def _register_watches(self, libvirt_domain):
for i in only_in_first_list(xid_list, watch_tokens.keys()): if libvirt_domain:
#new domain has been created name = libvirt_domain.name()
watch = QubesWatch.WatchType(callback, i) if name in self._qdb:
watch_tokens[i] = watch return
self.xs.watch(xs_key_func(i), watch) # open separate connection to Qubes DB:
for i in only_in_first_list(watch_tokens.keys(), xid_list): # 1. to not confuse pull() with responses to real commands sent from
#domain destroyed # other threads (like read, write etc) with watch events
self.xs.unwatch(xs_key_func(i), watch_tokens[i]) # 2. to not think whether QubesDB is thread-safe (it isn't)
watch_tokens.pop(i) while libvirt_domain.isActive() and name not in self._qdb:
try:
self._qdb[name] = QubesDB(name)
except Error as e:
if e.args[0] != 2:
raise
time.sleep(0.5)
else:
name = "dom0"
self._qdb[name] = QubesDB(name)
self._qdb[name].watch('/qubes-block-devices')
self._qdb_events[name] = libvirt.virEventAddHandle(
self._qdb[name].watch_fd(),
libvirt.VIR_EVENT_HANDLE_READABLE,
self._qdb_handler, name)
def update_watches_block(self, xid_list): def _unregister_watches(self, libvirt_domain):
self.update_watches(xid_list, self.watch_tokens_block, name = libvirt_domain.name()
self.get_block_key, self.block_callback) if name in self._qdb_events:
self.update_watches(xid_list, self.watch_tokens_vbd, libvirt.virEventRemoveHandle(self._qdb_events[name])
self.get_vbd_key, self.block_callback) del(self._qdb_events[name])
if name in self._qdb:
self._qdb[name].close()
del(self._qdb[name])
def update_watches_meminfo(self, xid_list): def _domain_list_changed(self, conn, domain, event, reason, param):
self.update_watches(xid_list, self.watch_tokens_meminfo, if event == libvirt.VIR_DOMAIN_EVENT_STARTED:
self.get_meminfo_key, self.meminfo_callback) self._register_watches(domain)
elif event == libvirt.VIR_DOMAIN_EVENT_STOPPED:
def domain_list_changed(self, param): self._unregister_watches(domain)
curr = self.xs.ls('', '/local/domain') else:
if curr == None: # ignore other events for now
return return None
if self.domain_callback: if self.domain_callback:
self.domain_callback() self.domain_callback(name=domain.name())
if self.block_callback:
self.update_watches_block(curr)
if self.meminfo_callback:
self.update_watches_meminfo(curr)
def watch_single(self): def _device_removed(self, conn, domain, device, param):
result = self.xs.read_watch() if self.block_callback is not None:
token = result[1] self.block_callback(domain.name())
token.fn(token.param)
def watch_loop(self): def watch_loop(self):
while True: while True:
self.watch_single() libvirt.virEventRunDefaultImpl()
##### updates check ##### ##### updates check #####