libvirtaio.py 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194
  1. #!/usr/bin/env python3
  2. import asyncio
  3. import ctypes
  4. import itertools
  5. import logging
  6. import libvirt
  7. ctypes.pythonapi.PyCapsule_GetPointer.restype = ctypes.c_void_p
  8. ctypes.pythonapi.PyCapsule_GetPointer.argtypes = (
  9. ctypes.py_object, ctypes.c_char_p)
  10. virFreeCallback = ctypes.CFUNCTYPE(None, ctypes.c_void_p)
  11. try:
  12. asyncio.ensure_future
  13. except AttributeError:
  14. asyncio.ensure_future = asyncio.async
  15. class LibvirtAsyncIOEventImpl(object):
  16. class Callback(object):
  17. # pylint: disable=too-few-public-methods
  18. _iden_counter = itertools.count()
  19. def __init__(self, impl, cb, opaque, *args, **kwargs):
  20. super().__init__(*args, **kwargs)
  21. self.iden = next(self._iden_counter)
  22. self.impl = impl
  23. self.cb = cb
  24. self.opaque = opaque
  25. assert self.iden not in self.impl.callbacks, \
  26. 'found {} callback: {!r}'.format(
  27. self.iden, self.impl.callbacks[self.iden])
  28. self.impl.callbacks[self.iden] = self
  29. def close(self):
  30. self.impl.log.debug('callback %d close(), scheduling ff', self.iden)
  31. # Now this is cheating but we have no better option.
  32. dummy, caps_opaque, caps_ff = self.opaque
  33. ff = virFreeCallback(ctypes.pythonapi.PyCapsule_GetPointer(
  34. caps_ff, b'virFreeCallback'))
  35. if ff:
  36. real_opaque = ctypes.pythonapi.PyCapsule_GetPointer(
  37. caps_opaque, b'void*')
  38. self.impl.loop.call_soon(ff, real_opaque)
  39. class FDCallback(Callback):
  40. # pylint: disable=too-few-public-methods
  41. def __init__(self, *args, descriptor, event, **kwargs):
  42. super().__init__(*args, **kwargs)
  43. self.descriptor = descriptor
  44. self.event = event
  45. self.descriptor.callbacks[self.iden] = self
  46. def close(self):
  47. del self.descriptor.callbacks[self.iden]
  48. super().close()
  49. class TimeoutCallback(Callback):
  50. def __init__(self, *args, timeout=None, **kwargs):
  51. super().__init__(*args, **kwargs)
  52. self.timeout = timeout
  53. self.task = None
  54. @asyncio.coroutine
  55. def timer(self):
  56. while True:
  57. try:
  58. timeout = self.timeout * 1e-3
  59. self.impl.log.debug('sleeping %r', timeout)
  60. yield from asyncio.sleep(timeout)
  61. except asyncio.CancelledError:
  62. self.impl.log.debug('timer %d cancelled', self.iden)
  63. break
  64. self.cb(self.iden, self.opaque)
  65. self.impl.log.debug('timer %r callback ended', self.iden)
  66. def start(self):
  67. self.impl.log.debug('timer %r start', self.iden)
  68. if self.task is not None:
  69. return
  70. self.task = asyncio.ensure_future(self.timer())
  71. def stop(self):
  72. self.impl.log.debug('timer %r stop', self.iden)
  73. if self.task is None:
  74. return
  75. self.task.cancel() # pylint: disable=no-member
  76. self.task = None
  77. def close(self):
  78. self.stop()
  79. super().close()
  80. class DescriptorDict(dict):
  81. class Descriptor(object):
  82. def __init__(self, loop, fd):
  83. self.loop = loop
  84. self.fd = fd
  85. self.callbacks = {}
  86. self.loop.add_reader(
  87. self.fd, self.handle, libvirt.VIR_EVENT_HANDLE_READABLE)
  88. self.loop.add_writer(
  89. self.fd, self.handle, libvirt.VIR_EVENT_HANDLE_WRITABLE)
  90. def close(self):
  91. self.loop.remove_reader(self.fd)
  92. self.loop.remove_writer(self.fd)
  93. def handle(self, event):
  94. for callback in self.callbacks.values():
  95. if callback.event is not None and callback.event & event:
  96. callback.cb(
  97. callback.iden, self.fd, event, callback.opaque)
  98. def __init__(self, loop):
  99. super().__init__()
  100. self.loop = loop
  101. def __missing__(self, fd):
  102. descriptor = self.Descriptor(self.loop, fd)
  103. self[fd] = descriptor
  104. return descriptor
  105. def __init__(self, loop):
  106. self.loop = loop
  107. self.callbacks = {}
  108. self.descriptors = self.DescriptorDict(self.loop)
  109. self.log = logging.getLogger(self.__class__.__name__)
  110. def register(self):
  111. # pylint: disable=bad-whitespace
  112. libvirt.virEventRegisterImpl(
  113. self.add_handle, self.update_handle, self.remove_handle,
  114. self.add_timeout, self.update_timeout, self.remove_timeout)
  115. def add_handle(self, fd, event, cb, opaque):
  116. self.log.debug('add_handle(fd=%d, event=%d, cb=%r, opaque=%r)',
  117. fd, event, cb, opaque)
  118. callback = self.FDCallback(self, cb, opaque,
  119. descriptor=self.descriptors[fd], event=event)
  120. return callback.iden
  121. def update_handle(self, watch, event):
  122. self.log.debug('update_handle(watch=%d, event=%d)', watch, event)
  123. self.callbacks[watch].event = event
  124. def remove_handle(self, watch):
  125. self.log.debug('remove_handle(watch=%d)', watch)
  126. callback = self.callbacks.pop(watch)
  127. callback.close()
  128. # libvirt-python.git/libvirt-override.c suggests that the opaque value
  129. # should be returned. This is horribly wrong, because this would cause
  130. # instant execution of ff callback, which is prohibited by libvirt's
  131. # C API documentation. We therefore intentionally return None.
  132. return None
  133. def add_timeout(self, timeout, cb, opaque):
  134. self.log.debug('add_timeout(timeout=%d, cb=%r, opaque=%r)',
  135. timeout, cb, opaque)
  136. if timeout <= 0:
  137. # TODO we could think about registering timeouts of -1 as a special
  138. # case and emulate 0 somehow (60 Hz?)
  139. self.log.warning('will not add timer with timeout %r', timeout)
  140. return -1
  141. callback = self.TimeoutCallback(self, cb, opaque, timeout=timeout)
  142. callback.start()
  143. return callback.iden
  144. def update_timeout(self, timer, timeout):
  145. self.log.debug('update_timeout(timer=%d, timeout=%d)', timer, timeout)
  146. callback = self.callbacks[timer]
  147. callback.timeout = timeout
  148. if timeout > 0:
  149. callback.start()
  150. else:
  151. callback.stop()
  152. def remove_timeout(self, timer):
  153. self.log.debug('remove_timeout(timer=%d)', timer)
  154. callback = self.callbacks.pop(timer)
  155. callback.close()
  156. # See remove_handle()
  157. return None