195 lines
6.7 KiB
Python
195 lines
6.7 KiB
Python
#!/usr/bin/env python3
|
|
|
|
import asyncio
|
|
import ctypes
|
|
import itertools
|
|
import logging
|
|
|
|
import libvirt
|
|
|
|
ctypes.pythonapi.PyCapsule_GetPointer.restype = ctypes.c_void_p
|
|
ctypes.pythonapi.PyCapsule_GetPointer.argtypes = (
|
|
ctypes.py_object, ctypes.c_char_p)
|
|
|
|
virFreeCallback = ctypes.CFUNCTYPE(None, ctypes.c_void_p)
|
|
|
|
try:
|
|
asyncio.ensure_future
|
|
except AttributeError:
|
|
asyncio.ensure_future = asyncio.async
|
|
|
|
class LibvirtAsyncIOEventImpl(object):
|
|
class Callback(object):
|
|
# pylint: disable=too-few-public-methods
|
|
_iden_counter = itertools.count()
|
|
def __init__(self, impl, cb, opaque, *args, **kwargs):
|
|
super().__init__(*args, **kwargs)
|
|
self.iden = next(self._iden_counter)
|
|
self.impl = impl
|
|
self.cb = cb
|
|
self.opaque = opaque
|
|
|
|
assert self.iden not in self.impl.callbacks, \
|
|
'found {} callback: {!r}'.format(
|
|
self.iden, self.impl.callbacks[self.iden])
|
|
self.impl.callbacks[self.iden] = self
|
|
|
|
def close(self):
|
|
self.impl.log.debug('callback %d close(), scheduling ff', self.iden)
|
|
|
|
# Now this is cheating but we have no better option.
|
|
dummy, caps_opaque, caps_ff = self.opaque
|
|
ff = virFreeCallback(ctypes.pythonapi.PyCapsule_GetPointer(
|
|
caps_ff, b'virFreeCallback'))
|
|
|
|
if ff:
|
|
real_opaque = ctypes.pythonapi.PyCapsule_GetPointer(
|
|
caps_opaque, b'void*')
|
|
self.impl.loop.call_soon(ff, real_opaque)
|
|
|
|
|
|
class FDCallback(Callback):
|
|
# pylint: disable=too-few-public-methods
|
|
def __init__(self, *args, descriptor, event, **kwargs):
|
|
super().__init__(*args, **kwargs)
|
|
self.descriptor = descriptor
|
|
self.event = event
|
|
|
|
self.descriptor.callbacks[self.iden] = self
|
|
|
|
def close(self):
|
|
del self.descriptor.callbacks[self.iden]
|
|
super().close()
|
|
|
|
|
|
class TimeoutCallback(Callback):
|
|
def __init__(self, *args, timeout=None, **kwargs):
|
|
super().__init__(*args, **kwargs)
|
|
self.timeout = timeout
|
|
self.task = None
|
|
|
|
@asyncio.coroutine
|
|
def timer(self):
|
|
while True:
|
|
try:
|
|
timeout = self.timeout * 1e-3
|
|
self.impl.log.debug('sleeping %r', timeout)
|
|
yield from asyncio.sleep(timeout)
|
|
except asyncio.CancelledError:
|
|
self.impl.log.debug('timer %d cancelled', self.iden)
|
|
break
|
|
self.cb(self.iden, self.opaque)
|
|
self.impl.log.debug('timer %r callback ended', self.iden)
|
|
|
|
def start(self):
|
|
self.impl.log.debug('timer %r start', self.iden)
|
|
if self.task is not None:
|
|
return
|
|
self.task = asyncio.ensure_future(self.timer())
|
|
|
|
def stop(self):
|
|
self.impl.log.debug('timer %r stop', self.iden)
|
|
if self.task is None:
|
|
return
|
|
self.task.cancel() # pylint: disable=no-member
|
|
self.task = None
|
|
|
|
def close(self):
|
|
self.stop()
|
|
super().close()
|
|
|
|
|
|
class DescriptorDict(dict):
|
|
class Descriptor(object):
|
|
def __init__(self, loop, fd):
|
|
self.loop = loop
|
|
self.fd = fd
|
|
self.callbacks = {}
|
|
|
|
self.loop.add_reader(
|
|
self.fd, self.handle, libvirt.VIR_EVENT_HANDLE_READABLE)
|
|
self.loop.add_writer(
|
|
self.fd, self.handle, libvirt.VIR_EVENT_HANDLE_WRITABLE)
|
|
|
|
def close(self):
|
|
self.loop.remove_reader(self.fd)
|
|
self.loop.remove_writer(self.fd)
|
|
|
|
def handle(self, event):
|
|
for callback in self.callbacks.values():
|
|
if callback.event is not None and callback.event & event:
|
|
callback.cb(
|
|
callback.iden, self.fd, event, callback.opaque)
|
|
|
|
def __init__(self, loop):
|
|
super().__init__()
|
|
self.loop = loop
|
|
def __missing__(self, fd):
|
|
descriptor = self.Descriptor(self.loop, fd)
|
|
self[fd] = descriptor
|
|
return descriptor
|
|
|
|
def __init__(self, loop):
|
|
self.loop = loop
|
|
self.callbacks = {}
|
|
self.descriptors = self.DescriptorDict(self.loop)
|
|
self.log = logging.getLogger(self.__class__.__name__)
|
|
|
|
def register(self):
|
|
# pylint: disable=bad-whitespace
|
|
libvirt.virEventRegisterImpl(
|
|
self.add_handle, self.update_handle, self.remove_handle,
|
|
self.add_timeout, self.update_timeout, self.remove_timeout)
|
|
|
|
def add_handle(self, fd, event, cb, opaque):
|
|
self.log.debug('add_handle(fd=%d, event=%d, cb=%r, opaque=%r)',
|
|
fd, event, cb, opaque)
|
|
callback = self.FDCallback(self, cb, opaque,
|
|
descriptor=self.descriptors[fd], event=event)
|
|
return callback.iden
|
|
|
|
def update_handle(self, watch, event):
|
|
self.log.debug('update_handle(watch=%d, event=%d)', watch, event)
|
|
self.callbacks[watch].event = event
|
|
|
|
def remove_handle(self, watch):
|
|
self.log.debug('remove_handle(watch=%d)', watch)
|
|
callback = self.callbacks.pop(watch)
|
|
callback.close()
|
|
|
|
# libvirt-python.git/libvirt-override.c suggests that the opaque value
|
|
# should be returned. This is horribly wrong, because this would cause
|
|
# instant execution of ff callback, which is prohibited by libvirt's
|
|
# C API documentation. We therefore intentionally return None.
|
|
return None
|
|
|
|
def add_timeout(self, timeout, cb, opaque):
|
|
self.log.debug('add_timeout(timeout=%d, cb=%r, opaque=%r)',
|
|
timeout, cb, opaque)
|
|
if timeout <= 0:
|
|
# TODO we could think about registering timeouts of -1 as a special
|
|
# case and emulate 0 somehow (60 Hz?)
|
|
self.log.warning('will not add timer with timeout %r', timeout)
|
|
return -1
|
|
|
|
callback = self.TimeoutCallback(self, cb, opaque, timeout=timeout)
|
|
callback.start()
|
|
return callback.iden
|
|
|
|
def update_timeout(self, timer, timeout):
|
|
self.log.debug('update_timeout(timer=%d, timeout=%d)', timer, timeout)
|
|
callback = self.callbacks[timer]
|
|
callback.timeout = timeout
|
|
if timeout > 0:
|
|
callback.start()
|
|
else:
|
|
callback.stop()
|
|
|
|
def remove_timeout(self, timer):
|
|
self.log.debug('remove_timeout(timer=%d)', timer)
|
|
callback = self.callbacks.pop(timer)
|
|
callback.close()
|
|
|
|
# See remove_handle()
|
|
return None
|