qubesd.py 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206
  1. #!/usr/bin/env python3.6
  2. import asyncio
  3. import functools
  4. import io
  5. import os
  6. import shutil
  7. import signal
  8. import struct
  9. import traceback
  10. import libvirtaio
  11. import qubes
  12. import qubes.mgmt
  13. import qubes.mgmtinternal
  14. import qubes.utils
  15. import qubes.vm.qubesvm
  16. QUBESD_SOCK = '/var/run/qubesd.sock'
  17. QUBESD_INTERNAL_SOCK = '/var/run/qubesd.internal.sock'
  18. class QubesDaemonProtocol(asyncio.Protocol):
  19. buffer_size = 65536
  20. header = struct.Struct('Bx')
  21. def __init__(self, handler, *args, app, debug=False, **kwargs):
  22. super().__init__(*args, **kwargs)
  23. self.handler = handler
  24. self.app = app
  25. self.untrusted_buffer = io.BytesIO()
  26. self.len_untrusted_buffer = 0
  27. self.transport = None
  28. self.debug = debug
  29. def connection_made(self, transport):
  30. print('connection_made()')
  31. self.transport = transport
  32. def connection_lost(self, exc):
  33. print('connection_lost(exc={!r})'.format(exc))
  34. self.untrusted_buffer.close()
  35. def data_received(self, untrusted_data):
  36. print('data_received(untrusted_data={!r})'.format(untrusted_data))
  37. if self.len_untrusted_buffer + len(untrusted_data) > self.buffer_size:
  38. self.app.log.warning('request too long')
  39. self.transport.abort()
  40. self.untrusted_buffer.close()
  41. return
  42. self.len_untrusted_buffer += \
  43. self.untrusted_buffer.write(untrusted_data)
  44. def eof_received(self):
  45. print('eof_received()')
  46. try:
  47. src, method, dest, arg, untrusted_payload = \
  48. self.untrusted_buffer.getvalue().split(b'\0', 4)
  49. except ValueError:
  50. self.app.log.warning('framing error')
  51. self.transport.abort()
  52. return
  53. finally:
  54. self.untrusted_buffer.close()
  55. asyncio.ensure_future(self.respond(
  56. src, method, dest, arg, untrusted_payload=untrusted_payload))
  57. return True
  58. @asyncio.coroutine
  59. def respond(self, src, method, dest, arg, *, untrusted_payload):
  60. try:
  61. mgmt = self.handler(self.app, src, method, dest, arg)
  62. response = yield from mgmt.execute(
  63. untrusted_payload=untrusted_payload)
  64. # except clauses will fall through to transport.abort() below
  65. except qubes.mgmt.PermissionDenied:
  66. self.app.log.warning(
  67. 'permission denied for call %s+%s (%s → %s) '
  68. 'with payload of %d bytes',
  69. method, arg, src, dest, len(untrusted_payload))
  70. except qubes.mgmt.ProtocolError:
  71. self.app.log.warning(
  72. 'protocol error for call %s+%s (%s → %s) '
  73. 'with payload of %d bytes',
  74. method, arg, src, dest, len(untrusted_payload))
  75. except qubes.exc.QubesException as err:
  76. self.send_exception(err)
  77. self.transport.write_eof()
  78. self.transport.close()
  79. return
  80. except Exception: # pylint: disable=broad-except
  81. self.app.log.exception(
  82. 'unhandled exception while calling '
  83. 'src=%r method=%r dest=%r arg=%r len(untrusted_payload)=%d',
  84. src, method, dest, arg, len(untrusted_payload))
  85. else:
  86. self.send_response(response)
  87. try:
  88. self.transport.write_eof()
  89. except NotImplementedError:
  90. pass
  91. self.transport.close()
  92. return
  93. # this is reached if from except: blocks; do not put it in finally:,
  94. # because this will prevent the good case from sending the reply
  95. self.transport.abort()
  96. def send_header(self, *args):
  97. self.transport.write(self.header.pack(*args))
  98. def send_response(self, content):
  99. self.send_header(0x30)
  100. if content is not None:
  101. self.transport.write(content.encode('utf-8'))
  102. def send_event(self, subject, event, **kwargs):
  103. self.send_header(0x31)
  104. if subject is not self.app:
  105. self.transport.write(subject.name.encode('ascii'))
  106. self.transport.write(b'\0')
  107. self.transport.write(event.encode('ascii') + b'\0')
  108. for k, v in kwargs.items():
  109. self.transport.write('{}\0{}\0'.format(k, str(v)).encode('ascii'))
  110. self.transport.write(b'\0')
  111. def send_exception(self, exc):
  112. self.send_header(0x32)
  113. self.transport.write(type(exc).__name__ + b'\0')
  114. if self.debug:
  115. self.transport.write(''.join(traceback.format_exception(
  116. type(exc), exc, exc.__traceback__)).encode('utf-8'))
  117. self.transport.write(b'\0')
  118. self.transport.write(str(exc).encode('utf-8') + b'\0')
  119. def sighandler(loop, signame, server, server_internal):
  120. print('caught {}, exiting'.format(signame))
  121. server.close()
  122. server_internal.close()
  123. loop.stop()
  124. parser = qubes.tools.QubesArgumentParser(description='Qubes OS daemon')
  125. def main(args=None):
  126. args = parser.parse_args(args)
  127. loop = asyncio.get_event_loop()
  128. libvirtaio.virEventRegisterAsyncIOImpl(loop=loop)
  129. try:
  130. os.unlink(QUBESD_SOCK)
  131. except FileNotFoundError:
  132. pass
  133. old_umask = os.umask(0o007)
  134. server = loop.run_until_complete(loop.create_unix_server(
  135. functools.partial(QubesDaemonProtocol, qubes.mgmt.QubesMgmt,
  136. app=args.app), QUBESD_SOCK))
  137. shutil.chown(QUBESD_SOCK, group='qubes')
  138. try:
  139. os.unlink(QUBESD_INTERNAL_SOCK)
  140. except FileNotFoundError:
  141. pass
  142. server_internal = loop.run_until_complete(loop.create_unix_server(
  143. functools.partial(QubesDaemonProtocol,
  144. qubes.mgmtinternal.QubesInternalMgmt,
  145. app=args.app), QUBESD_INTERNAL_SOCK))
  146. shutil.chown(QUBESD_INTERNAL_SOCK, group='qubes')
  147. os.umask(old_umask)
  148. del old_umask
  149. for signame in ('SIGINT', 'SIGTERM'):
  150. loop.add_signal_handler(getattr(signal, signame),
  151. sighandler, loop, signame, server, server_internal)
  152. qubes.utils.systemd_notify()
  153. try:
  154. loop.run_forever()
  155. loop.run_until_complete(asyncio.wait([
  156. server.wait_closed(),
  157. server_internal.wait_closed(),
  158. ]))
  159. finally:
  160. loop.close()
  161. if __name__ == '__main__':
  162. main()