__init__.py 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185
  1. # -*- encoding: utf8 -*-
  2. #
  3. # The Qubes OS Project, http://www.qubes-os.org
  4. #
  5. # Copyright (C) 2017 Marek Marczykowski-Górecki
  6. # <marmarek@invisiblethingslab.com>
  7. #
  8. # This program is free software; you can redistribute it and/or modify
  9. # it under the terms of the GNU Lesser General Public License as published by
  10. # the Free Software Foundation; either version 2 of the License, or
  11. # (at your option) any later version.
  12. #
  13. # This program is distributed in the hope that it will be useful,
  14. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  15. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  16. # GNU Lesser General Public License for more details.
  17. #
  18. # You should have received a copy of the GNU Lesser General Public License along
  19. # with this program; if not, see <http://www.gnu.org/licenses/>.
  20. '''Event handling implementation; requires Python >=3.5.2 for asyncio.'''
  21. import asyncio
  22. import subprocess
  23. import qubesadmin.config
  24. import qubesadmin.exc
  25. class EventsDispatcher(object):
  26. ''' Events dispatcher, responsible for receiving events and calling
  27. appropriate handlers'''
  28. def __init__(self, app):
  29. '''Initialize EventsDispatcher'''
  30. #: Qubes() object
  31. self.app = app
  32. #: event handlers - dict of event -> handlers
  33. self.handlers = {}
  34. def add_handler(self, event, handler):
  35. '''Register handler for event
  36. Use '*' as event to register a handler for all events.
  37. Handler function is called with:
  38. * subject (VM object or None)
  39. * event name (str)
  40. * keyword arguments related to the event, if any - all values as str
  41. :param event Event name, or '*' for all events
  42. :param handler Handler function'''
  43. self.handlers.setdefault(event, set()).add(handler)
  44. def remove_handler(self, event, handler):
  45. '''Remove previously registered event handler
  46. :param event Event name
  47. :param handler Handler function
  48. '''
  49. self.handlers[event].remove(handler)
  50. @asyncio.coroutine
  51. def _get_events_reader(self, vm=None) -> (asyncio.StreamReader, callable):
  52. '''Make connection to qubesd and return stream to read events from
  53. :param vm: Specific VM for which events should be handled, use None
  54. to handle events from all VMs (and non-VM objects)
  55. :return stream to read events from and a cleanup function
  56. (call it to terminate qubesd connection)'''
  57. if vm is not None:
  58. dest = vm.name
  59. else:
  60. dest = 'dom0'
  61. if self.app.qubesd_connection_type == 'socket':
  62. reader, writer = yield from asyncio.open_unix_connection(
  63. qubesadmin.config.QUBESD_SOCKET)
  64. writer.write(b'dom0\0') # source
  65. writer.write(b'mgmt.Events\0') # method
  66. writer.write(dest.encode('ascii') + b'\0') # dest
  67. writer.write(b'\0') # arg
  68. writer.write_eof()
  69. def cleanup_func():
  70. '''Close connection to qubesd'''
  71. writer.close()
  72. elif self.app.qubesd_connection_type == 'qrexec':
  73. proc = yield from asyncio.create_subprocess_exec(
  74. ['qrexec-client-vm', dest, 'mgmt.Events'],
  75. stdin=subprocess.PIPE, stdout=subprocess.PIPE)
  76. proc.stdin.write_eof()
  77. reader = proc.stdout
  78. def cleanup_func():
  79. '''Close connection to qubesd'''
  80. proc.kill()
  81. else:
  82. raise NotImplementedError('Unsupported qubesd connection type: '
  83. + self.app.qubesd_connection_type)
  84. return reader, cleanup_func
  85. @asyncio.coroutine
  86. def listen_for_events(self, vm=None, reconnect=True):
  87. '''
  88. Listen for events and call appropriate handlers.
  89. This function do not exit until manually terminated.
  90. This is coroutine.
  91. :param vm: Listen for events only for this VM, use None to listen for
  92. events about all VMs and not related to any particular VM.
  93. :param reconnect: should reconnect to qubesd if connection is
  94. interrupted?
  95. :rtype: None
  96. '''
  97. while True:
  98. try:
  99. yield from self._listen_for_events(vm)
  100. except ConnectionRefusedError:
  101. pass
  102. if not reconnect:
  103. break
  104. self.app.log.warning(
  105. 'Connection to qubesd terminated, reconnecting in {} '
  106. 'seconds'.format(qubesadmin.config.QUBESD_RECONNECT_DELAY))
  107. # avoid busy-loop if qubesd is dead
  108. yield from asyncio.sleep(qubesadmin.config.QUBESD_RECONNECT_DELAY)
  109. @asyncio.coroutine
  110. def _listen_for_events(self, vm=None):
  111. '''
  112. Listen for events and call appropriate handlers.
  113. This function do not exit until manually terminated.
  114. This is coroutine.
  115. :param vm: Listen for events only for this VM, use None to listen for
  116. events about all VMs and not related to any particular VM.
  117. :return: True if any event was received, otherwise False
  118. :rtype: bool
  119. '''
  120. reader, cleanup_func = yield from self._get_events_reader(vm)
  121. try:
  122. some_event_received = False
  123. while not reader.at_eof():
  124. try:
  125. event_data = yield from reader.readuntil(b'\0\0')
  126. if event_data == b'1\0\0':
  127. # event with non-VM subject contains \0\0 inside of
  128. # event, need to receive rest of the data
  129. event_data += yield from reader.readuntil(b'\0\0')
  130. except asyncio.IncompleteReadError as err:
  131. if err.partial == b'':
  132. break
  133. else:
  134. raise
  135. if not event_data.startswith(b'1\0'):
  136. raise qubesadmin.exc.QubesDaemonCommunicationError(
  137. 'Non-event received on events connection: '
  138. + repr(event_data))
  139. event_data = event_data.decode('utf-8')
  140. _, subject, event, *kwargs = event_data.split('\0')
  141. # convert list to dict, remove last empty entry
  142. kwargs = dict(zip(kwargs[:-2:2], kwargs[1:-2:2]))
  143. self.handle(subject, event, **kwargs)
  144. some_event_received = True
  145. finally:
  146. cleanup_func()
  147. return some_event_received
  148. def handle(self, subject, event, **kwargs):
  149. '''Call handlers for given event'''
  150. if subject:
  151. subject = self.app.domains[subject]
  152. else:
  153. subject = None
  154. for handler in self.handlers.get(event, []):
  155. handler(subject, event, **kwargs)
  156. for handler in self.handlers.get('*', []):
  157. handler(subject, event, **kwargs)