qubesd.py 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231
  1. #!/usr/bin/env python3.6
  2. import asyncio
  3. import functools
  4. import io
  5. import os
  6. import shutil
  7. import signal
  8. import struct
  9. import traceback
  10. import libvirtaio
  11. import qubes
  12. import qubes.api
  13. import qubes.api.admin
  14. import qubes.api.internal
  15. import qubes.utils
  16. import qubes.vm.qubesvm
  17. QUBESD_SOCK = '/var/run/qubesd.sock'
  18. QUBESD_INTERNAL_SOCK = '/var/run/qubesd.internal.sock'
  19. class QubesDaemonProtocol(asyncio.Protocol):
  20. buffer_size = 65536
  21. header = struct.Struct('Bx')
  22. def __init__(self, handler, *args, app, debug=False, **kwargs):
  23. super().__init__(*args, **kwargs)
  24. self.handler = handler
  25. self.app = app
  26. self.untrusted_buffer = io.BytesIO()
  27. self.len_untrusted_buffer = 0
  28. self.transport = None
  29. self.debug = debug
  30. self.event_sent = False
  31. self.mgmt = None
  32. def connection_made(self, transport):
  33. print('connection_made()')
  34. self.transport = transport
  35. def connection_lost(self, exc):
  36. print('connection_lost(exc={!r})'.format(exc))
  37. self.untrusted_buffer.close()
  38. # for cancellable operation, interrupt it, otherwise it will do nothing
  39. if self.mgmt is not None:
  40. self.mgmt.cancel()
  41. self.transport = None
  42. def data_received(self, untrusted_data): # pylint: disable=arguments-differ
  43. print('data_received(untrusted_data={!r})'.format(untrusted_data))
  44. if self.len_untrusted_buffer + len(untrusted_data) > self.buffer_size:
  45. self.app.log.warning('request too long')
  46. self.transport.abort()
  47. self.untrusted_buffer.close()
  48. return
  49. self.len_untrusted_buffer += \
  50. self.untrusted_buffer.write(untrusted_data)
  51. def eof_received(self):
  52. print('eof_received()')
  53. try:
  54. src, method, dest, arg, untrusted_payload = \
  55. self.untrusted_buffer.getvalue().split(b'\0', 4)
  56. except ValueError:
  57. self.app.log.warning('framing error')
  58. self.transport.abort()
  59. return
  60. finally:
  61. self.untrusted_buffer.close()
  62. asyncio.ensure_future(self.respond(
  63. src, method, dest, arg, untrusted_payload=untrusted_payload))
  64. return True
  65. @asyncio.coroutine
  66. def respond(self, src, method, dest, arg, *, untrusted_payload):
  67. try:
  68. self.mgmt = self.handler(self.app, src, method, dest, arg,
  69. self.send_event)
  70. response = yield from self.mgmt.execute(
  71. untrusted_payload=untrusted_payload)
  72. assert not (self.event_sent and response)
  73. if self.transport is None:
  74. return
  75. # except clauses will fall through to transport.abort() below
  76. except qubes.api.PermissionDenied:
  77. self.app.log.warning(
  78. 'permission denied for call %s+%s (%s → %s) '
  79. 'with payload of %d bytes',
  80. method, arg, src, dest, len(untrusted_payload))
  81. except qubes.api.ProtocolError:
  82. self.app.log.warning(
  83. 'protocol error for call %s+%s (%s → %s) '
  84. 'with payload of %d bytes',
  85. method, arg, src, dest, len(untrusted_payload))
  86. except qubes.exc.QubesException as err:
  87. self.app.log.exception(
  88. 'error while calling '
  89. 'src=%r method=%r dest=%r arg=%r len(untrusted_payload)=%d',
  90. src, method, dest, arg, len(untrusted_payload))
  91. if self.transport is not None:
  92. self.send_exception(err)
  93. self.transport.write_eof()
  94. self.transport.close()
  95. return
  96. except Exception: # pylint: disable=broad-except
  97. self.app.log.exception(
  98. 'unhandled exception while calling '
  99. 'src=%r method=%r dest=%r arg=%r len(untrusted_payload)=%d',
  100. src, method, dest, arg, len(untrusted_payload))
  101. else:
  102. if not self.event_sent:
  103. self.send_response(response)
  104. try:
  105. self.transport.write_eof()
  106. except NotImplementedError:
  107. pass
  108. self.transport.close()
  109. return
  110. # this is reached if from except: blocks; do not put it in finally:,
  111. # because this will prevent the good case from sending the reply
  112. self.transport.abort()
  113. def send_header(self, *args):
  114. self.transport.write(self.header.pack(*args))
  115. def send_response(self, content):
  116. assert not self.event_sent
  117. self.send_header(0x30)
  118. if content is not None:
  119. self.transport.write(content.encode('utf-8'))
  120. def send_event(self, subject, event, **kwargs):
  121. self.event_sent = True
  122. self.send_header(0x31)
  123. if subject is not self.app:
  124. self.transport.write(subject.name.encode('ascii'))
  125. self.transport.write(b'\0')
  126. self.transport.write(event.encode('ascii') + b'\0')
  127. for k, v in kwargs.items():
  128. self.transport.write('{}\0{}\0'.format(k, str(v)).encode('ascii'))
  129. self.transport.write(b'\0')
  130. def send_exception(self, exc):
  131. self.send_header(0x32)
  132. self.transport.write(type(exc).__name__.encode() + b'\0')
  133. if self.debug:
  134. self.transport.write(''.join(traceback.format_exception(
  135. type(exc), exc, exc.__traceback__)).encode('utf-8'))
  136. self.transport.write(b'\0')
  137. self.transport.write(str(exc).encode('utf-8') + b'\0')
  138. def sighandler(loop, signame, server, server_internal):
  139. print('caught {}, exiting'.format(signame))
  140. server.close()
  141. server_internal.close()
  142. loop.stop()
  143. parser = qubes.tools.QubesArgumentParser(description='Qubes OS daemon')
  144. def main(args=None):
  145. loop = asyncio.get_event_loop()
  146. libvirtaio.virEventRegisterAsyncIOImpl(loop=loop)
  147. try:
  148. args = parser.parse_args(args)
  149. except:
  150. loop.close()
  151. raise
  152. args.app.vmm.register_event_handlers(args.app)
  153. try:
  154. os.unlink(QUBESD_SOCK)
  155. except FileNotFoundError:
  156. pass
  157. old_umask = os.umask(0o007)
  158. server = loop.run_until_complete(loop.create_unix_server(
  159. functools.partial(QubesDaemonProtocol, qubes.api.admin.QubesAdminAPI,
  160. app=args.app), QUBESD_SOCK))
  161. shutil.chown(QUBESD_SOCK, group='qubes')
  162. try:
  163. os.unlink(QUBESD_INTERNAL_SOCK)
  164. except FileNotFoundError:
  165. pass
  166. server_internal = loop.run_until_complete(loop.create_unix_server(
  167. functools.partial(QubesDaemonProtocol,
  168. qubes.api.internal.QubesInternalAPI,
  169. app=args.app), QUBESD_INTERNAL_SOCK))
  170. shutil.chown(QUBESD_INTERNAL_SOCK, group='qubes')
  171. os.umask(old_umask)
  172. del old_umask
  173. for signame in ('SIGINT', 'SIGTERM'):
  174. loop.add_signal_handler(getattr(signal, signame),
  175. sighandler, loop, signame, server, server_internal)
  176. qubes.utils.systemd_notify()
  177. try:
  178. loop.run_forever()
  179. loop.run_until_complete(asyncio.wait([
  180. server.wait_closed(),
  181. server_internal.wait_closed(),
  182. ]))
  183. finally:
  184. loop.close()
  185. if __name__ == '__main__':
  186. main()