__init__.py 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240
  1. # -*- encoding: utf8 -*-
  2. #
  3. # The Qubes OS Project, http://www.qubes-os.org
  4. #
  5. # Copyright (C) 2017 Marek Marczykowski-Górecki
  6. # <marmarek@invisiblethingslab.com>
  7. #
  8. # This program is free software; you can redistribute it and/or modify
  9. # it under the terms of the GNU Lesser General Public License as published by
  10. # the Free Software Foundation; either version 2 of the License, or
  11. # (at your option) any later version.
  12. #
  13. # This program is distributed in the hope that it will be useful,
  14. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  15. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  16. # GNU Lesser General Public License for more details.
  17. #
  18. # You should have received a copy of the GNU Lesser General Public License along
  19. # with this program; if not, see <http://www.gnu.org/licenses/>.
  20. '''Event handling implementation; requires Python >=3.5.2 for asyncio.'''
  21. import asyncio
  22. import fnmatch
  23. import subprocess
  24. import qubesadmin.config
  25. import qubesadmin.exc
  26. class EventsDispatcher(object):
  27. ''' Events dispatcher, responsible for receiving events and calling
  28. appropriate handlers'''
  29. def __init__(self, app, api_method='admin.Events', enable_cache=True):
  30. """Initialize EventsDispatcher
  31. :param app :py:class:`qubesadmin.Qubes` object
  32. :param api_method Admin API method producing events
  33. :param enable_cache Enable caching (see below)
  34. Connecting :py:class:`EventsDispatcher` object to a
  35. :py:class:`qubesadmin.Qubes` implicitly enables caching. It is important
  36. to actually run the dispatcher (:py:meth:`listen_for_events`), otherwise
  37. the cache won't be updated. Alternatively, disable caching by setting
  38. :py:attr:`qubesadmin.Qubes.cache_enabled` property to `False`.
  39. """
  40. #: Qubes() object
  41. self.app = app
  42. self._api_method = api_method
  43. #: event handlers - dict of event -> handlers
  44. self.handlers = {}
  45. if enable_cache:
  46. self.app.cache_enabled = True
  47. def add_handler(self, event, handler):
  48. '''Register handler for event
  49. Use '*' as event to register a handler for all events.
  50. Handler function is called with:
  51. * subject (VM object or None)
  52. * event name (str)
  53. * keyword arguments related to the event, if any - all values as str
  54. :param event Event name, or '*' for all events
  55. :param handler Handler function'''
  56. self.handlers.setdefault(event, set()).add(handler)
  57. def remove_handler(self, event, handler):
  58. '''Remove previously registered event handler
  59. :param event Event name
  60. :param handler Handler function
  61. '''
  62. self.handlers[event].remove(handler)
  63. @asyncio.coroutine
  64. def _get_events_reader(self, vm=None) -> (asyncio.StreamReader, callable):
  65. '''Make connection to qubesd and return stream to read events from
  66. :param vm: Specific VM for which events should be handled, use None
  67. to handle events from all VMs (and non-VM objects)
  68. :return stream to read events from and a cleanup function
  69. (call it to terminate qubesd connection)'''
  70. if vm is not None:
  71. dest = vm.name
  72. else:
  73. dest = 'dom0'
  74. if self.app.qubesd_connection_type == 'socket':
  75. reader, writer = yield from asyncio.open_unix_connection(
  76. qubesadmin.config.QUBESD_SOCKET)
  77. writer.write(self._api_method.encode() + b'+ ') # method+arg
  78. writer.write(b'dom0 ') # source
  79. writer.write(b'name ' + dest.encode('ascii') + b'\0') # dest
  80. writer.write_eof()
  81. def cleanup_func():
  82. '''Close connection to qubesd'''
  83. writer.close()
  84. elif self.app.qubesd_connection_type == 'qrexec':
  85. proc = yield from asyncio.create_subprocess_exec(
  86. 'qrexec-client-vm', dest, self._api_method,
  87. stdin=subprocess.PIPE, stdout=subprocess.PIPE)
  88. proc.stdin.write_eof()
  89. reader = proc.stdout
  90. def cleanup_func():
  91. '''Close connection to qubesd'''
  92. try:
  93. proc.kill()
  94. except ProcessLookupError:
  95. pass
  96. else:
  97. raise NotImplementedError('Unsupported qubesd connection type: '
  98. + self.app.qubesd_connection_type)
  99. return reader, cleanup_func
  100. @asyncio.coroutine
  101. def listen_for_events(self, vm=None, reconnect=True):
  102. '''
  103. Listen for events and call appropriate handlers.
  104. This function do not exit until manually terminated.
  105. This is coroutine.
  106. :param vm: Listen for events only for this VM, use None to listen for
  107. events about all VMs and not related to any particular VM.
  108. :param reconnect: should reconnect to qubesd if connection is
  109. interrupted?
  110. :rtype: None
  111. '''
  112. while True:
  113. try:
  114. yield from self._listen_for_events(vm)
  115. except (OSError, qubesadmin.exc.QubesDaemonCommunicationError):
  116. pass
  117. if not reconnect:
  118. break
  119. self.app.log.warning(
  120. 'Connection to qubesd terminated, reconnecting in {} '
  121. 'seconds'.format(qubesadmin.config.QUBESD_RECONNECT_DELAY))
  122. # avoid busy-loop if qubesd is dead
  123. yield from asyncio.sleep(qubesadmin.config.QUBESD_RECONNECT_DELAY)
  124. @asyncio.coroutine
  125. def _listen_for_events(self, vm=None):
  126. '''
  127. Listen for events and call appropriate handlers.
  128. This function do not exit until manually terminated.
  129. This is coroutine.
  130. :param vm: Listen for events only for this VM, use None to listen for
  131. events about all VMs and not related to any particular VM.
  132. :return: True if any event was received, otherwise False
  133. :rtype: bool
  134. '''
  135. reader, cleanup_func = yield from self._get_events_reader(vm)
  136. try:
  137. some_event_received = False
  138. while not reader.at_eof():
  139. try:
  140. event_header = yield from reader.readuntil(b'\0')
  141. if event_header != b'1\0':
  142. raise qubesadmin.exc.QubesDaemonCommunicationError(
  143. 'Non-event received on events connection: '
  144. + repr(event_header))
  145. subject = (yield from reader.readuntil(b'\0'))[:-1].decode(
  146. 'utf-8')
  147. event = (yield from reader.readuntil(b'\0'))[:-1].decode(
  148. 'utf-8')
  149. kwargs = {}
  150. while True:
  151. key = (yield from reader.readuntil(b'\0'))[:-1].decode(
  152. 'utf-8')
  153. if not key:
  154. break
  155. value = (yield from reader.readuntil(b'\0'))[:-1].\
  156. decode('utf-8')
  157. kwargs[key] = value
  158. except BrokenPipeError:
  159. break
  160. except asyncio.IncompleteReadError as err:
  161. if err.partial == b'':
  162. break
  163. raise
  164. if not subject:
  165. subject = None
  166. self.handle(subject, event, **kwargs)
  167. some_event_received = True
  168. finally:
  169. cleanup_func()
  170. return some_event_received
  171. def handle(self, subject, event, **kwargs):
  172. """Call handlers for given event"""
  173. # pylint: disable=protected-access
  174. if subject:
  175. if event in ['property-set:name']:
  176. self.app.domains.clear_cache()
  177. try:
  178. subject = self.app.domains.get_blind(subject)
  179. except KeyError:
  180. return
  181. else:
  182. # handle cache refreshing on best-effort basis
  183. if event in ['domain-add', 'domain-delete']:
  184. self.app.domains.clear_cache()
  185. subject = None
  186. # deserialize known attributes
  187. if event.startswith('device-') and 'device' in kwargs:
  188. try:
  189. devclass = event.split(':', 1)[1]
  190. backend_domain, ident = kwargs['device'].split(':', 1)
  191. kwargs['device'] = self.app.domains.get_blind(backend_domain)\
  192. .devices[devclass][ident]
  193. except (KeyError, ValueError):
  194. pass
  195. # invalidate cache if needed; call it before other handlers
  196. # as those may want to use cached value
  197. if event.startswith('property-set:') or \
  198. event.startswith('property-reset:'):
  199. self.app._invalidate_cache(subject, event, **kwargs)
  200. elif event in ('domain-pre-start', 'domain-start', 'domain-shutdown',
  201. 'domain-paused', 'domain-unpaused'):
  202. self.app._update_power_state_cache(subject, event, **kwargs)
  203. handlers = [h_func for h_name, h_func_set in self.handlers.items()
  204. for h_func in h_func_set
  205. if fnmatch.fnmatch(event, h_name)]
  206. for handler in handlers:
  207. handler(subject, event, **kwargs)