qubesd.py 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237
  1. #!/usr/bin/env python3.6
  2. import asyncio
  3. import functools
  4. import io
  5. import os
  6. import shutil
  7. import signal
  8. import struct
  9. import traceback
  10. import libvirtaio
  11. import qubes
  12. import qubes.api
  13. import qubes.api.admin
  14. import qubes.api.internal
  15. import qubes.utils
  16. import qubes.vm.qubesvm
  17. QUBESD_SOCK = '/var/run/qubesd.sock'
  18. QUBESD_INTERNAL_SOCK = '/var/run/qubesd.internal.sock'
  19. class QubesDaemonProtocol(asyncio.Protocol):
  20. buffer_size = 65536
  21. header = struct.Struct('Bx')
  22. def __init__(self, handler, *args, app, debug=False, **kwargs):
  23. super().__init__(*args, **kwargs)
  24. self.handler = handler
  25. self.app = app
  26. self.untrusted_buffer = io.BytesIO()
  27. self.len_untrusted_buffer = 0
  28. self.transport = None
  29. self.debug = debug
  30. self.event_sent = False
  31. self.mgmt = None
  32. def connection_made(self, transport):
  33. self.transport = transport
  34. def connection_lost(self, exc):
  35. self.untrusted_buffer.close()
  36. # for cancellable operation, interrupt it, otherwise it will do nothing
  37. if self.mgmt is not None:
  38. self.mgmt.cancel()
  39. self.transport = None
  40. def data_received(self, untrusted_data): # pylint: disable=arguments-differ
  41. if self.len_untrusted_buffer + len(untrusted_data) > self.buffer_size:
  42. self.app.log.warning('request too long')
  43. self.transport.abort()
  44. self.untrusted_buffer.close()
  45. return
  46. self.len_untrusted_buffer += \
  47. self.untrusted_buffer.write(untrusted_data)
  48. def eof_received(self):
  49. try:
  50. src, method, dest, arg, untrusted_payload = \
  51. self.untrusted_buffer.getvalue().split(b'\0', 4)
  52. except ValueError:
  53. self.app.log.warning('framing error')
  54. self.transport.abort()
  55. return
  56. finally:
  57. self.untrusted_buffer.close()
  58. asyncio.ensure_future(self.respond(
  59. src, method, dest, arg, untrusted_payload=untrusted_payload))
  60. return True
  61. @asyncio.coroutine
  62. def respond(self, src, method, dest, arg, *, untrusted_payload):
  63. try:
  64. self.mgmt = self.handler(self.app, src, method, dest, arg,
  65. self.send_event)
  66. response = yield from self.mgmt.execute(
  67. untrusted_payload=untrusted_payload)
  68. assert not (self.event_sent and response)
  69. if self.transport is None:
  70. return
  71. # except clauses will fall through to transport.abort() below
  72. except qubes.api.PermissionDenied:
  73. self.app.log.warning(
  74. 'permission denied for call %s+%s (%s → %s) '
  75. 'with payload of %d bytes',
  76. method, arg, src, dest, len(untrusted_payload))
  77. except qubes.api.ProtocolError:
  78. self.app.log.warning(
  79. 'protocol error for call %s+%s (%s → %s) '
  80. 'with payload of %d bytes',
  81. method, arg, src, dest, len(untrusted_payload))
  82. except qubes.exc.QubesException as err:
  83. msg = ('%r while calling '
  84. 'src=%r method=%r dest=%r arg=%r len(untrusted_payload)=%d')
  85. if self.debug:
  86. self.app.log.exception(msg,
  87. err, src, method, dest, arg, len(untrusted_payload))
  88. else:
  89. self.app.log.info(msg,
  90. err, src, method, dest, arg, len(untrusted_payload))
  91. if self.transport is not None:
  92. self.send_exception(err)
  93. self.transport.write_eof()
  94. self.transport.close()
  95. return
  96. except Exception: # pylint: disable=broad-except
  97. self.app.log.exception(
  98. 'unhandled exception while calling '
  99. 'src=%r method=%r dest=%r arg=%r len(untrusted_payload)=%d',
  100. src, method, dest, arg, len(untrusted_payload))
  101. else:
  102. if not self.event_sent:
  103. self.send_response(response)
  104. try:
  105. self.transport.write_eof()
  106. except NotImplementedError:
  107. pass
  108. self.transport.close()
  109. return
  110. # this is reached if from except: blocks; do not put it in finally:,
  111. # because this will prevent the good case from sending the reply
  112. self.transport.abort()
  113. def send_header(self, *args):
  114. self.transport.write(self.header.pack(*args))
  115. def send_response(self, content):
  116. assert not self.event_sent
  117. self.send_header(0x30)
  118. if content is not None:
  119. self.transport.write(content.encode('utf-8'))
  120. def send_event(self, subject, event, **kwargs):
  121. self.event_sent = True
  122. self.send_header(0x31)
  123. if subject is not self.app:
  124. self.transport.write(subject.name.encode('ascii'))
  125. self.transport.write(b'\0')
  126. self.transport.write(event.encode('ascii') + b'\0')
  127. for k, v in kwargs.items():
  128. self.transport.write('{}\0{}\0'.format(k, str(v)).encode('ascii'))
  129. self.transport.write(b'\0')
  130. def send_exception(self, exc):
  131. self.send_header(0x32)
  132. self.transport.write(type(exc).__name__.encode() + b'\0')
  133. if self.debug:
  134. self.transport.write(''.join(traceback.format_exception(
  135. type(exc), exc, exc.__traceback__)).encode('utf-8'))
  136. self.transport.write(b'\0')
  137. self.transport.write(str(exc).encode('utf-8') + b'\0')
  138. def sighandler(loop, signame, server, server_internal):
  139. print('caught {}, exiting'.format(signame))
  140. server.close()
  141. server_internal.close()
  142. loop.stop()
  143. parser = qubes.tools.QubesArgumentParser(description='Qubes OS daemon')
  144. parser.add_argument('--debug', action='store_true', default=False,
  145. help='Enable verbose error logging (all exceptions with full '
  146. 'tracebacks) and also send tracebacks to Admin API clients')
  147. def main(args=None):
  148. loop = asyncio.get_event_loop()
  149. libvirtaio.virEventRegisterAsyncIOImpl(loop=loop)
  150. try:
  151. args = parser.parse_args(args)
  152. except:
  153. loop.close()
  154. raise
  155. args.app.vmm.register_event_handlers(args.app)
  156. try:
  157. os.unlink(QUBESD_SOCK)
  158. except FileNotFoundError:
  159. pass
  160. old_umask = os.umask(0o007)
  161. server = loop.run_until_complete(loop.create_unix_server(
  162. functools.partial(QubesDaemonProtocol, qubes.api.admin.QubesAdminAPI,
  163. app=args.app, debug=args.debug), QUBESD_SOCK))
  164. shutil.chown(QUBESD_SOCK, group='qubes')
  165. try:
  166. os.unlink(QUBESD_INTERNAL_SOCK)
  167. except FileNotFoundError:
  168. pass
  169. server_internal = loop.run_until_complete(loop.create_unix_server(
  170. functools.partial(QubesDaemonProtocol,
  171. qubes.api.internal.QubesInternalAPI,
  172. app=args.app, debug=args.debug), QUBESD_INTERNAL_SOCK))
  173. shutil.chown(QUBESD_INTERNAL_SOCK, group='qubes')
  174. os.umask(old_umask)
  175. del old_umask
  176. for signame in ('SIGINT', 'SIGTERM'):
  177. loop.add_signal_handler(getattr(signal, signame),
  178. sighandler, loop, signame, server, server_internal)
  179. qubes.utils.systemd_notify()
  180. # make sure children will not inherit this
  181. os.environ.pop('NOTIFY_SOCKET', None)
  182. try:
  183. loop.run_forever()
  184. loop.run_until_complete(asyncio.wait([
  185. server.wait_closed(),
  186. server_internal.wait_closed(),
  187. ]))
  188. finally:
  189. loop.close()
  190. if __name__ == '__main__':
  191. main()