api.py 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177
  1. # -*- encoding: utf8 -*-
  2. #
  3. # The Qubes OS Project, http://www.qubes-os.org
  4. #
  5. # Copyright (C) 2017 Marek Marczykowski-Górecki
  6. # <marmarek@invisiblethingslab.com>
  7. #
  8. # This program is free software; you can redistribute it and/or modify
  9. # it under the terms of the GNU General Public License as published by
  10. # the Free Software Foundation; either version 2 of the License, or
  11. # (at your option) any later version.
  12. #
  13. # This program is distributed in the hope that it will be useful,
  14. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  15. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  16. # GNU General Public License for more details.
  17. #
  18. # You should have received a copy of the GNU General Public License along
  19. # with this program; if not, see <http://www.gnu.org/licenses/>.
  20. import asyncio
  21. import socket
  22. import unittest.mock
  23. import qubes.api
  24. import qubes.tests
  25. class TestMgmt(object):
  26. def __init__(self, app, src, method, dest, arg, send_event=None):
  27. self.app = app
  28. self.src = src
  29. self.method = method
  30. self.dest = dest
  31. self.arg = arg
  32. self.send_event = send_event
  33. try:
  34. self.function = {
  35. 'mgmt.success': self.success,
  36. 'mgmt.success_none': self.success_none,
  37. 'mgmt.qubesexception': self.qubesexception,
  38. 'mgmt.exception': self.exception,
  39. 'mgmt.event': self.event,
  40. }[self.method.decode()]
  41. except KeyError:
  42. raise qubes.api.ProtocolError('Invalid method')
  43. def execute(self, untrusted_payload):
  44. self.task = asyncio.Task(self.function(
  45. untrusted_payload=untrusted_payload))
  46. return self.task
  47. def cancel(self):
  48. self.task.cancel()
  49. @asyncio.coroutine
  50. def success(self, untrusted_payload):
  51. return 'src: {!r}, dest: {!r}, arg: {!r}, payload: {!r}'.format(
  52. self.src, self.dest, self.arg, untrusted_payload
  53. )
  54. @asyncio.coroutine
  55. def success_none(self, untrusted_payload):
  56. pass
  57. @asyncio.coroutine
  58. def qubesexception(self, untrusted_payload):
  59. raise qubes.exc.QubesException('qubes-exception')
  60. @asyncio.coroutine
  61. def exception(self, untrusted_payload):
  62. raise Exception('exception')
  63. @asyncio.coroutine
  64. def event(self, untrusted_payload):
  65. future = asyncio.get_event_loop().create_future()
  66. class Subject:
  67. name = 'subject'
  68. def __str__(self):
  69. return 'subject'
  70. self.send_event(Subject(), 'event', payload=untrusted_payload.decode())
  71. try:
  72. # give some time to close the other end
  73. yield from asyncio.sleep(0.1)
  74. # should be canceled
  75. self.send_event(Subject, 'event2',
  76. payload=untrusted_payload.decode())
  77. yield from future
  78. except asyncio.CancelledError:
  79. pass
  80. class TC_00_QubesDaemonProtocol(qubes.tests.QubesTestCase):
  81. def setUp(self):
  82. super(TC_00_QubesDaemonProtocol, self).setUp()
  83. self.app = unittest.mock.Mock()
  84. self.app.log = self.log
  85. self.loop = asyncio.new_event_loop()
  86. asyncio.set_event_loop(self.loop)
  87. self.sock_client, self.sock_server = socket.socketpair()
  88. self.reader, self.writer = self.loop.run_until_complete(
  89. asyncio.open_connection(sock=self.sock_client))
  90. connect_coro = self.loop.create_connection(
  91. lambda: qubes.api.QubesDaemonProtocol(
  92. TestMgmt, app=self.app),
  93. sock=self.sock_server)
  94. self.transport, self.protocol = self.loop.run_until_complete(
  95. connect_coro)
  96. def tearDown(self):
  97. self.sock_server.close()
  98. self.sock_client.close()
  99. self.loop.stop()
  100. self.loop.run_forever()
  101. self.loop.close()
  102. super(TC_00_QubesDaemonProtocol, self).tearDown()
  103. def test_000_message_ok(self):
  104. self.writer.write(b'dom0\0mgmt.success\0dom0\0arg\0payload')
  105. self.writer.write_eof()
  106. with self.assertNotRaises(asyncio.TimeoutError):
  107. response = self.loop.run_until_complete(
  108. asyncio.wait_for(self.reader.read(), 1))
  109. self.assertEqual(response,
  110. b"0\0src: b'dom0', dest: b'dom0', arg: b'arg', payload: b'payload'")
  111. def test_001_message_ok_in_parts(self):
  112. self.writer.write(b'dom0\0mgmt.')
  113. self.loop.run_until_complete(self.writer.drain())
  114. self.writer.write(b'success\0dom0\0arg\0payload')
  115. self.writer.write_eof()
  116. with self.assertNotRaises(asyncio.TimeoutError):
  117. response = self.loop.run_until_complete(
  118. asyncio.wait_for(self.reader.read(), 1))
  119. self.assertEqual(response,
  120. b"0\0src: b'dom0', dest: b'dom0', arg: b'arg', payload: b'payload'")
  121. def test_002_message_ok_empty(self):
  122. self.writer.write(b'dom0\0mgmt.success_none\0dom0\0arg\0payload')
  123. self.writer.write_eof()
  124. with self.assertNotRaises(asyncio.TimeoutError):
  125. response = self.loop.run_until_complete(
  126. asyncio.wait_for(self.reader.read(), 1))
  127. self.assertEqual(response, b"0\0")
  128. def test_003_exception_qubes(self):
  129. self.writer.write(b'dom0\0mgmt.qubesexception\0dom0\0arg\0payload')
  130. self.writer.write_eof()
  131. with self.assertNotRaises(asyncio.TimeoutError):
  132. response = self.loop.run_until_complete(
  133. asyncio.wait_for(self.reader.read(), 1))
  134. self.assertEqual(response, b"2\0QubesException\0\0qubes-exception\0")
  135. def test_004_exception_generic(self):
  136. self.writer.write(b'dom0\0mgmt.exception\0dom0\0arg\0payload')
  137. self.writer.write_eof()
  138. with self.assertNotRaises(asyncio.TimeoutError):
  139. response = self.loop.run_until_complete(
  140. asyncio.wait_for(self.reader.read(), 1))
  141. self.assertEqual(response, b"")
  142. def test_005_event(self):
  143. self.writer.write(b'dom0\0mgmt.event\0dom0\0arg\0payload')
  144. self.writer.write_eof()
  145. with self.assertNotRaises(asyncio.TimeoutError):
  146. response = self.loop.run_until_complete(
  147. asyncio.wait_for(self.reader.readuntil(b'\0\0'), 1))
  148. self.assertEqual(response, b"1\0subject\0event\0payload\0payload\0\0")
  149. # this will trigger connection_lost, but only when next event is sent
  150. self.sock_client.shutdown(socket.SHUT_RD)
  151. # check if event-producing method is interrupted
  152. with self.assertNotRaises(asyncio.TimeoutError):
  153. self.loop.run_until_complete(
  154. asyncio.wait_for(self.protocol.mgmt.task, 1))