# libvirtaio.py
  1. #!/usr/bin/env python3
  2. import asyncio
  3. import collections
  4. import ctypes
  5. import functools
  6. import itertools
  7. import logging
  8. import signal
  9. import libvirt
  10. import pdb
  11. ctypes.pythonapi.PyCapsule_GetPointer.restype = ctypes.c_void_p
  12. ctypes.pythonapi.PyCapsule_GetPointer.argtypes = (
  13. ctypes.py_object, ctypes.c_char_p)
  14. virFreeCallback = ctypes.CFUNCTYPE(None, ctypes.c_void_p)
  15. try:
  16. asyncio.ensure_future
  17. except AttributeError:
  18. asyncio.ensure_future = asyncio.async
  19. class LibvirtAsyncIOEventImpl(object):
  20. class Callback(object):
  21. _iden_counter = itertools.count()
  22. def __init__(self, impl, cb, opaque, *args, **kwargs):
  23. super().__init__(*args, **kwargs)
  24. self.iden = next(self._iden_counter)
  25. self.impl = impl
  26. self.cb = cb
  27. self.opaque = opaque
  28. assert self.iden not in self.impl.callbacks, \
  29. 'found {} callback: {!r}'.format(
  30. self.iden, self.impl.callbacks[self.iden])
  31. self.impl.callbacks[self.iden] = self
  32. def close(self):
  33. self.impl.log.debug('callback %d close(), scheduling ff', self.iden)
  34. # Now this is cheating but we have no better option.
  35. caps_cb, caps_opaque, caps_ff = self.opaque
  36. ff = virFreeCallback(ctypes.pythonapi.PyCapsule_GetPointer(
  37. caps_ff, b'virFreeCallback'))
  38. if ff:
  39. real_opaque = ctypes.pythonapi.PyCapsule_GetPointer(
  40. caps_opaque, b'void*')
  41. self.impl.loop.call_soon(ff, real_opaque)
  42. class FDCallback(Callback):
  43. def __init__(self, *args, descriptor, event, **kwargs):
  44. super().__init__(*args, **kwargs)
  45. self.descriptor = descriptor
  46. self.event = event
  47. self.descriptor.callbacks[self.iden] = self
  48. def close(self):
  49. del self.descriptor.callbacks[self.iden]
  50. super().close()
  51. class TimeoutCallback(Callback):
  52. def __init__(self, *args, timeout=None, **kwargs):
  53. super().__init__(*args, **kwargs)
  54. self.timeout = timeout
  55. self.task = None
  56. @asyncio.coroutine
  57. def timer(self):
  58. while True:
  59. try:
  60. t = self.timeout * 1e-3
  61. self.impl.log.debug('sleeping %r', t)
  62. yield from asyncio.sleep(t)
  63. except asyncio.CancelledError:
  64. self.impl.log.debug('timer %d cancelled', self.iden)
  65. break
  66. self.cb(self.iden, self.opaque)
  67. self.impl.log.debug('timer %r callback ended', self.iden)
  68. def start(self):
  69. self.impl.log.debug('timer %r start', self.iden)
  70. if self.task is not None:
  71. return
  72. self.task = asyncio.ensure_future(self.timer())
  73. def stop(self):
  74. self.impl.log.debug('timer %r stop', self.iden)
  75. if self.task is None:
  76. return
  77. self.task.cancel()
  78. self.task = None
  79. def close(self):
  80. self.stop()
  81. super().close()
  82. class DescriptorDict(dict):
  83. class Descriptor(object):
  84. def __init__(self, loop, fd):
  85. self.loop = loop
  86. self.fd = fd
  87. self.callbacks = {}
  88. self.loop.add_reader(
  89. self.fd, self.handle, libvirt.VIR_EVENT_HANDLE_READABLE)
  90. self.loop.add_writer(
  91. self.fd, self.handle, libvirt.VIR_EVENT_HANDLE_WRITABLE)
  92. def close(self):
  93. self.loop.remove_reader(self.fd)
  94. self.loop.remove_writer(self.fd)
  95. def handle(self, event):
  96. for callback in self.callbacks.values():
  97. if callback.event is not None and callback.event & event:
  98. callback.cb(
  99. callback.iden, self.fd, event, callback.opaque)
  100. def __init__(self, loop):
  101. super().__init__()
  102. self.loop = loop
  103. def __missing__(self, fd):
  104. descriptor = self.Descriptor(loop, fd)
  105. self[fd] = descriptor
  106. return descriptor
  107. def __init__(self, loop):
  108. self.loop = loop
  109. self.callbacks = {}
  110. self.descriptors = self.DescriptorDict(self.loop)
  111. self.log = logging.getLogger(self.__class__.__name__)
  112. def register(self):
  113. libvirt.virEventRegisterImpl(
  114. self.add_handle, self.update_handle, self.remove_handle,
  115. self.add_timeout, self.update_timeout, self.remove_timeout)
  116. def add_handle(self, fd, event, cb, opaque):
  117. self.log.debug('add_handle(fd=%d, event=%d, cb=%r, opaque=%r)',
  118. fd, event, cb, opaque)
  119. callback = self.FDCallback(self, cb, opaque,
  120. descriptor=self.descriptors[fd], event=event)
  121. return callback.iden
  122. def update_handle(self, watch, event):
  123. self.log.debug('update_handle(watch=%d, event=%d)', watch, event)
  124. self.callbacks[watch].event = event
  125. def remove_handle(self, watch):
  126. self.log.debug('remove_handle(watch=%d)', watch)
  127. callback = self.callbacks.pop(watch)
  128. callback.close()
  129. # libvirt-python.git/libvirt-override.c suggests that the opaque value
  130. # should be returned. This is horribly wrong, because this would cause
  131. # instant execution of ff callback, which is prohibited by libvirt's
  132. # C API documentation. We therefore intentionally return None.
  133. return None
  134. def add_timeout(self, timeout, cb, opaque):
  135. self.log.debug('add_timeout(timeout=%d, cb=%r, opaque=%r)',
  136. timeout, cb, opaque)
  137. if timeout <= 0:
  138. # TODO we could think about registering timeouts of -1 as a special
  139. # case and emulate 0 somehow (60 Hz?)
  140. self.log.warning('will not add timer with timeout %r', timeout)
  141. return -1
  142. callback = self.TimeoutCallback(self, cb, opaque, timeout=timeout)
  143. callback.start()
  144. return callback.iden
  145. def update_timeout(self, timer, timeout):
  146. self.log.debug('update_timeout(timer=%d, timeout=%d)', timer, timeout)
  147. callback = self.callbacks[timer]
  148. callback.timeout = timeout
  149. if timeout > 0:
  150. callback.start()
  151. else:
  152. callback.stop()
  153. def remove_timeout(self, timer):
  154. self.log.debug('remove_timeout(timer=%d)', timer)
  155. callback = self.callbacks.pop(timer)
  156. callback.close()
  157. # See remove_handle()
  158. return None