123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194 |
- #!/usr/bin/env python3
- import asyncio
- import ctypes
- import itertools
- import logging
- import libvirt
- ctypes.pythonapi.PyCapsule_GetPointer.restype = ctypes.c_void_p
- ctypes.pythonapi.PyCapsule_GetPointer.argtypes = (
- ctypes.py_object, ctypes.c_char_p)
- virFreeCallback = ctypes.CFUNCTYPE(None, ctypes.c_void_p)
- try:
- asyncio.ensure_future
- except AttributeError:
- asyncio.ensure_future = asyncio.async
- class LibvirtAsyncIOEventImpl(object):
- class Callback(object):
- # pylint: disable=too-few-public-methods
- _iden_counter = itertools.count()
- def __init__(self, impl, cb, opaque, *args, **kwargs):
- super().__init__(*args, **kwargs)
- self.iden = next(self._iden_counter)
- self.impl = impl
- self.cb = cb
- self.opaque = opaque
- assert self.iden not in self.impl.callbacks, \
- 'found {} callback: {!r}'.format(
- self.iden, self.impl.callbacks[self.iden])
- self.impl.callbacks[self.iden] = self
- def close(self):
- self.impl.log.debug('callback %d close(), scheduling ff', self.iden)
- # Now this is cheating but we have no better option.
- dummy, caps_opaque, caps_ff = self.opaque
- ff = virFreeCallback(ctypes.pythonapi.PyCapsule_GetPointer(
- caps_ff, b'virFreeCallback'))
- if ff:
- real_opaque = ctypes.pythonapi.PyCapsule_GetPointer(
- caps_opaque, b'void*')
- self.impl.loop.call_soon(ff, real_opaque)
- class FDCallback(Callback):
- # pylint: disable=too-few-public-methods
- def __init__(self, *args, descriptor, event, **kwargs):
- super().__init__(*args, **kwargs)
- self.descriptor = descriptor
- self.event = event
- self.descriptor.callbacks[self.iden] = self
- def close(self):
- del self.descriptor.callbacks[self.iden]
- super().close()
- class TimeoutCallback(Callback):
- def __init__(self, *args, timeout=None, **kwargs):
- super().__init__(*args, **kwargs)
- self.timeout = timeout
- self.task = None
- @asyncio.coroutine
- def timer(self):
- while True:
- try:
- timeout = self.timeout * 1e-3
- self.impl.log.debug('sleeping %r', timeout)
- yield from asyncio.sleep(timeout)
- except asyncio.CancelledError:
- self.impl.log.debug('timer %d cancelled', self.iden)
- break
- self.cb(self.iden, self.opaque)
- self.impl.log.debug('timer %r callback ended', self.iden)
- def start(self):
- self.impl.log.debug('timer %r start', self.iden)
- if self.task is not None:
- return
- self.task = asyncio.ensure_future(self.timer())
- def stop(self):
- self.impl.log.debug('timer %r stop', self.iden)
- if self.task is None:
- return
- self.task.cancel() # pylint: disable=no-member
- self.task = None
- def close(self):
- self.stop()
- super().close()
- class DescriptorDict(dict):
- class Descriptor(object):
- def __init__(self, loop, fd):
- self.loop = loop
- self.fd = fd
- self.callbacks = {}
- self.loop.add_reader(
- self.fd, self.handle, libvirt.VIR_EVENT_HANDLE_READABLE)
- self.loop.add_writer(
- self.fd, self.handle, libvirt.VIR_EVENT_HANDLE_WRITABLE)
- def close(self):
- self.loop.remove_reader(self.fd)
- self.loop.remove_writer(self.fd)
- def handle(self, event):
- for callback in self.callbacks.values():
- if callback.event is not None and callback.event & event:
- callback.cb(
- callback.iden, self.fd, event, callback.opaque)
- def __init__(self, loop):
- super().__init__()
- self.loop = loop
- def __missing__(self, fd):
- descriptor = self.Descriptor(self.loop, fd)
- self[fd] = descriptor
- return descriptor
- def __init__(self, loop):
- self.loop = loop
- self.callbacks = {}
- self.descriptors = self.DescriptorDict(self.loop)
- self.log = logging.getLogger(self.__class__.__name__)
- def register(self):
- # pylint: disable=bad-whitespace
- libvirt.virEventRegisterImpl(
- self.add_handle, self.update_handle, self.remove_handle,
- self.add_timeout, self.update_timeout, self.remove_timeout)
- def add_handle(self, fd, event, cb, opaque):
- self.log.debug('add_handle(fd=%d, event=%d, cb=%r, opaque=%r)',
- fd, event, cb, opaque)
- callback = self.FDCallback(self, cb, opaque,
- descriptor=self.descriptors[fd], event=event)
- return callback.iden
- def update_handle(self, watch, event):
- self.log.debug('update_handle(watch=%d, event=%d)', watch, event)
- self.callbacks[watch].event = event
- def remove_handle(self, watch):
- self.log.debug('remove_handle(watch=%d)', watch)
- callback = self.callbacks.pop(watch)
- callback.close()
- # libvirt-python.git/libvirt-override.c suggests that the opaque value
- # should be returned. This is horribly wrong, because this would cause
- # instant execution of ff callback, which is prohibited by libvirt's
- # C API documentation. We therefore intentionally return None.
- return None
- def add_timeout(self, timeout, cb, opaque):
- self.log.debug('add_timeout(timeout=%d, cb=%r, opaque=%r)',
- timeout, cb, opaque)
- if timeout <= 0:
- # TODO we could think about registering timeouts of -1 as a special
- # case and emulate 0 somehow (60 Hz?)
- self.log.warning('will not add timer with timeout %r', timeout)
- return -1
- callback = self.TimeoutCallback(self, cb, opaque, timeout=timeout)
- callback.start()
- return callback.iden
- def update_timeout(self, timer, timeout):
- self.log.debug('update_timeout(timer=%d, timeout=%d)', timer, timeout)
- callback = self.callbacks[timer]
- callback.timeout = timeout
- if timeout > 0:
- callback.start()
- else:
- callback.stop()
- def remove_timeout(self, timer):
- self.log.debug('remove_timeout(timer=%d)', timer)
- callback = self.callbacks.pop(timer)
- callback.close()
- # See remove_handle()
- return None
|