qubesd.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187
  1. #!/usr/bin/env python3.6
  2. import asyncio
  3. import functools
  4. import io
  5. import os
  6. import shutil
  7. import signal
  8. import struct
  9. import traceback
  10. import libvirtaio
  11. import qubes
  12. import qubes.mgmt
  13. import qubes.utils
  14. import qubes.vm.qubesvm
  15. QUBESD_SOCK = '/var/run/qubesd.sock'
  16. class QubesDaemonProtocol(asyncio.Protocol):
  17. buffer_size = 65536
  18. header = struct.Struct('Bx')
  19. def __init__(self, *args, app, debug=False, **kwargs):
  20. super().__init__(*args, **kwargs)
  21. self.app = app
  22. self.untrusted_buffer = io.BytesIO()
  23. self.len_untrusted_buffer = 0
  24. self.transport = None
  25. self.debug = debug
  26. def connection_made(self, transport):
  27. print('connection_made()')
  28. self.transport = transport
  29. def connection_lost(self, exc):
  30. print('connection_lost(exc={!r})'.format(exc))
  31. self.untrusted_buffer.close()
  32. def data_received(self, untrusted_data):
  33. print('data_received(untrusted_data={!r})'.format(untrusted_data))
  34. if self.len_untrusted_buffer + len(untrusted_data) > self.buffer_size:
  35. self.app.log.warning('request too long')
  36. self.transport.abort()
  37. self.untrusted_buffer.close()
  38. return
  39. self.len_untrusted_buffer += \
  40. self.untrusted_buffer.write(untrusted_data)
  41. def eof_received(self):
  42. print('eof_received()')
  43. try:
  44. src, method, dest, arg, untrusted_payload = \
  45. self.untrusted_buffer.getvalue().split(b'\0', 4)
  46. except ValueError:
  47. self.app.log.warning('framing error')
  48. self.transport.abort()
  49. return
  50. finally:
  51. self.untrusted_buffer.close()
  52. asyncio.ensure_future(self.respond(
  53. src, method, dest, arg, untrusted_payload=untrusted_payload))
  54. return True
  55. @asyncio.coroutine
  56. def respond(self, src, method, dest, arg, *, untrusted_payload):
  57. try:
  58. mgmt = qubes.mgmt.QubesMgmt(self.app, src, method, dest, arg)
  59. response = yield from mgmt.execute(
  60. untrusted_payload=untrusted_payload)
  61. # except clauses will fall through to transport.abort() below
  62. except qubes.mgmt.PermissionDenied:
  63. self.app.log.warning(
  64. 'permission denied for call %s+%s (%s → %s) '
  65. 'with payload of %d bytes',
  66. method, arg, src, dest, len(untrusted_payload))
  67. except qubes.mgmt.ProtocolError:
  68. self.app.log.warning(
  69. 'protocol error for call %s+%s (%s → %s) '
  70. 'with payload of %d bytes',
  71. method, arg, src, dest, len(untrusted_payload))
  72. except qubes.exc.QubesException as err:
  73. self.send_exception(err)
  74. self.transport.write_eof()
  75. self.transport.close()
  76. return
  77. except Exception: # pylint: disable=broad-except
  78. self.app.log.exception(
  79. 'unhandled exception while calling '
  80. 'src=%r method=%r dest=%r arg=%r len(untrusted_payload)=%d',
  81. src, method, dest, arg, len(untrusted_payload))
  82. else:
  83. self.send_response(response)
  84. try:
  85. self.transport.write_eof()
  86. except NotImplementedError:
  87. pass
  88. self.transport.close()
  89. return
  90. # this is reached if from except: blocks; do not put it in finally:,
  91. # because this will prevent the good case from sending the reply
  92. self.transport.abort()
  93. def send_header(self, *args):
  94. self.transport.write(self.header.pack(*args))
  95. def send_response(self, content):
  96. self.send_header(0x30)
  97. if content is not None:
  98. self.transport.write(content.encode('utf-8'))
  99. def send_event(self, subject, event, **kwargs):
  100. self.send_header(0x31)
  101. if subject is not self.app:
  102. self.transport.write(subject.name.encode('ascii'))
  103. self.transport.write(b'\0')
  104. self.transport.write(event.encode('ascii') + b'\0')
  105. for k, v in kwargs.items():
  106. self.transport.write('{}\0{}\0'.format(k, str(v)).encode('ascii'))
  107. self.transport.write(b'\0')
  108. def send_exception(self, exc):
  109. self.send_header(0x32)
  110. self.transport.write(type(exc).__name__ + b'\0')
  111. if self.debug:
  112. self.transport.write(''.join(traceback.format_exception(
  113. type(exc), exc, exc.__traceback__)).encode('utf-8'))
  114. self.transport.write(b'\0')
  115. self.transport.write(str(exc).encode('utf-8') + b'\0')
  116. def sighandler(loop, signame, server):
  117. print('caught {}, exiting'.format(signame))
  118. server.close()
  119. loop.stop()
  120. parser = qubes.tools.QubesArgumentParser(description='Qubes OS daemon')
  121. def main(args=None):
  122. args = parser.parse_args(args)
  123. loop = asyncio.get_event_loop()
  124. libvirtaio.virEventRegisterAsyncIOImpl(loop=loop)
  125. try:
  126. os.unlink(QUBESD_SOCK)
  127. except FileNotFoundError:
  128. pass
  129. old_umask = os.umask(0o007)
  130. server = loop.run_until_complete(loop.create_unix_server(
  131. functools.partial(QubesDaemonProtocol, app=args.app), QUBESD_SOCK))
  132. shutil.chown(QUBESD_SOCK, group='qubes')
  133. os.umask(old_umask)
  134. del old_umask
  135. for signame in ('SIGINT', 'SIGTERM'):
  136. loop.add_signal_handler(getattr(signal, signame),
  137. sighandler, loop, signame, server)
  138. qubes.utils.systemd_notify()
  139. try:
  140. loop.run_forever()
  141. loop.run_until_complete(server.wait_closed())
  142. finally:
  143. loop.close()
  144. if __name__ == '__main__':
  145. main()