events.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266
  1. # -*- encoding: utf8 -*-
  2. #
  3. # The Qubes OS Project, http://www.qubes-os.org
  4. #
  5. # Copyright (C) 2017 Marek Marczykowski-Górecki
  6. # <marmarek@invisiblethingslab.com>
  7. #
  8. # This program is free software; you can redistribute it and/or modify
  9. # it under the terms of the GNU Lesser General Public License as published by
  10. # the Free Software Foundation; either version 2.1 of the License, or
  11. # (at your option) any later version.
  12. #
  13. # This program is distributed in the hope that it will be useful,
  14. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  15. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  16. # GNU Lesser General Public License for more details.
  17. #
  18. # You should have received a copy of the GNU Lesser General Public License along
  19. # with this program; if not, see <http://www.gnu.org/licenses/>.
  20. import socket
  21. import subprocess
  22. import qubesadmin.tests
  23. import unittest
  24. try:
  25. # qubesadmin.events require python3, so this tests can also use python3 features
  26. import asyncio
  27. import unittest.mock
  28. import qubesadmin.events
  29. except ImportError:
  30. # don't run any tests on python2
  31. def load_tests(loader, tests, pattern):
  32. return unittest.TestSuite()
  33. # don't fail on coroutine decorator
  34. class asyncio(object):
  35. @staticmethod
  36. def coroutine(f):
  37. return f
  38. class TC_00_Events(qubesadmin.tests.QubesTestCase):
  39. def setUp(self):
  40. super().setUp()
  41. self.app = unittest.mock.MagicMock()
  42. self.dispatcher = qubesadmin.events.EventsDispatcher(self.app)
  43. def test_000_handler_specific(self):
  44. handler = unittest.mock.Mock()
  45. self.dispatcher.add_handler('some-event', handler)
  46. self.dispatcher.handle('', 'some-event', arg1='value1')
  47. handler.assert_called_once_with(None, 'some-event', arg1='value1')
  48. handler.reset_mock()
  49. self.dispatcher.handle('test-vm', 'some-event', arg1='value1')
  50. handler.assert_called_once_with(
  51. self.app.domains.get_blind('test-vm'), 'some-event', arg1='value1')
  52. handler.reset_mock()
  53. self.dispatcher.handle('', 'other-event', arg1='value1')
  54. self.assertFalse(handler.called)
  55. self.dispatcher.remove_handler('some-event', handler)
  56. self.dispatcher.handle('', 'some-event', arg1='value1')
  57. self.assertFalse(handler.called)
  58. def test_001_handler_glob(self):
  59. handler = unittest.mock.Mock()
  60. self.dispatcher.add_handler('*', handler)
  61. self.dispatcher.handle('', 'some-event', arg1='value1')
  62. handler.assert_called_once_with(None, 'some-event', arg1='value1')
  63. handler.reset_mock()
  64. self.dispatcher.handle('test-vm', 'some-event', arg1='value1')
  65. handler.assert_called_once_with(
  66. self.app.domains.get_blind('test-vm'), 'some-event', arg1='value1')
  67. handler.reset_mock()
  68. self.dispatcher.handle('', 'other-event', arg1='value1')
  69. handler.assert_called_once_with(None, 'other-event', arg1='value1')
  70. handler.reset_mock()
  71. self.dispatcher.remove_handler('*', handler)
  72. self.dispatcher.handle('', 'some-event', arg1='value1')
  73. self.assertFalse(handler.called)
  74. def test_002_handler_glob_partial(self):
  75. handler = unittest.mock.Mock()
  76. self.dispatcher.add_handler('some-*', handler)
  77. self.dispatcher.handle('', 'some-event', arg1='value1')
  78. handler.assert_called_once_with(None, 'some-event', arg1='value1')
  79. handler.reset_mock()
  80. self.dispatcher.handle('test-vm', 'some-event', arg1='value1')
  81. handler.assert_called_once_with(
  82. self.app.domains.get_blind('test-vm'), 'some-event', arg1='value1')
  83. handler.reset_mock()
  84. self.dispatcher.handle('', 'other-event', arg1='value1')
  85. self.assertFalse(handler.called)
  86. handler.reset_mock()
  87. self.dispatcher.remove_handler('some-*', handler)
  88. self.dispatcher.handle('', 'some-event', arg1='value1')
  89. self.assertFalse(handler.called)
  90. @asyncio.coroutine
  91. def mock_get_events_reader(self, stream, cleanup_func, expected_vm,
  92. vm=None):
  93. self.assertEqual(expected_vm, vm)
  94. return stream, cleanup_func
  95. @asyncio.coroutine
  96. def send_events(self, stream, events):
  97. for event in events:
  98. stream.feed_data(event)
  99. # don't use yield from...
  100. sleep = asyncio.sleep(0.01)
  101. for x in iter(lambda: sleep.send(None), None):
  102. yield x
  103. stream.feed_eof()
  104. def test_010_listen_for_events(self):
  105. loop = asyncio.new_event_loop()
  106. asyncio.set_event_loop(loop)
  107. stream = asyncio.StreamReader()
  108. cleanup_func = unittest.mock.Mock()
  109. self.dispatcher._get_events_reader = \
  110. lambda vm: self.mock_get_events_reader(stream, cleanup_func,
  111. None, vm)
  112. handler = unittest.mock.Mock()
  113. self.dispatcher.add_handler('some-event', handler)
  114. events = [
  115. b'1\0\0some-event\0arg1\0value1\0\0',
  116. b'1\0some-vm\0some-event\0arg1\0value1\0\0',
  117. b'1\0some-vm\0some-event\0arg_without_value\0\0arg2\0value\0\0',
  118. b'1\0some-vm\0other-event\0\0',
  119. ]
  120. asyncio.ensure_future(self.send_events(stream, events))
  121. loop.run_until_complete(self.dispatcher.listen_for_events(
  122. reconnect=False))
  123. self.assertEqual(handler.mock_calls, [
  124. unittest.mock.call(None, 'some-event', arg1='value1'),
  125. unittest.mock.call(
  126. self.app.domains.get_blind('some-vm'), 'some-event',
  127. arg1='value1'),
  128. unittest.mock.call(
  129. self.app.domains.get_blind('some-vm'), 'some-event',
  130. arg_without_value='', arg2='value'),
  131. ])
  132. cleanup_func.assert_called_once_with()
  133. loop.close()
  134. def mock_open_unix_connection(self, expected_path, sock, path):
  135. self.assertEqual(expected_path, path)
  136. return asyncio.open_connection(sock=sock)
  137. def read_all(self, sock):
  138. buf = b''
  139. for data in iter(lambda: sock.recv(4096), b''):
  140. buf += data
  141. return buf
  142. def test_020_get_events_reader_local(self):
  143. self.app.qubesd_connection_type = 'socket'
  144. loop = asyncio.new_event_loop()
  145. asyncio.set_event_loop(loop)
  146. sock1, sock2 = socket.socketpair()
  147. with unittest.mock.patch('asyncio.open_unix_connection',
  148. lambda path: self.mock_open_unix_connection(
  149. qubesadmin.config.QUBESD_SOCKET, sock1, path)):
  150. task = asyncio.ensure_future(self.dispatcher._get_events_reader())
  151. reader = asyncio.ensure_future(loop.run_in_executor(None,
  152. self.read_all, sock2))
  153. loop.run_until_complete(asyncio.wait([task, reader]))
  154. self.assertEqual(reader.result(),
  155. b'admin.Events+ dom0 name dom0\0')
  156. self.assertIsInstance(task.result()[0], asyncio.StreamReader)
  157. cleanup_func = task.result()[1]
  158. cleanup_func()
  159. sock2.close()
  160. # run socket cleanup functions
  161. loop.stop()
  162. loop.run_forever()
  163. loop.close()
  164. def test_021_get_events_reader_local_vm(self):
  165. self.app.qubesd_connection_type = 'socket'
  166. loop = asyncio.new_event_loop()
  167. asyncio.set_event_loop(loop)
  168. sock1, sock2 = socket.socketpair()
  169. vm = unittest.mock.Mock()
  170. vm.name = 'test-vm'
  171. with unittest.mock.patch('asyncio.open_unix_connection',
  172. lambda path: self.mock_open_unix_connection(
  173. qubesadmin.config.QUBESD_SOCKET, sock1, path)):
  174. task = asyncio.ensure_future(self.dispatcher._get_events_reader(vm))
  175. reader = asyncio.ensure_future(loop.run_in_executor(None,
  176. self.read_all, sock2))
  177. loop.run_until_complete(asyncio.wait([task, reader]))
  178. self.assertEqual(reader.result(),
  179. b'admin.Events+ dom0 name test-vm\0')
  180. self.assertIsInstance(task.result()[0], asyncio.StreamReader)
  181. cleanup_func = task.result()[1]
  182. cleanup_func()
  183. sock2.close()
  184. # run socket cleanup functions
  185. loop.stop()
  186. loop.run_forever()
  187. loop.close()
  188. @asyncio.coroutine
  189. def mock_coroutine(self, mock, *args, **kwargs):
  190. return mock(*args, **kwargs)
  191. def test_022_get_events_reader_remote(self):
  192. self.app.qubesd_connection_type = 'qrexec'
  193. loop = asyncio.new_event_loop()
  194. asyncio.set_event_loop(loop)
  195. mock_proc = unittest.mock.Mock()
  196. with unittest.mock.patch('asyncio.create_subprocess_exec',
  197. lambda *args, **kwargs: self.mock_coroutine(mock_proc,
  198. *args, **kwargs)):
  199. task = asyncio.ensure_future(self.dispatcher._get_events_reader())
  200. loop.run_until_complete(task)
  201. self.assertEqual(mock_proc.mock_calls, [
  202. unittest.mock.call('qrexec-client-vm', 'dom0',
  203. 'admin.Events', stdin=subprocess.PIPE,
  204. stdout=subprocess.PIPE),
  205. unittest.mock.call().stdin.write_eof()
  206. ])
  207. self.assertEqual(task.result()[0], mock_proc().stdout)
  208. cleanup_func = task.result()[1]
  209. cleanup_func()
  210. unittest.mock.call().kill.assert_called_once_with()
  211. loop.close()
  212. def test_023_get_events_reader_remote_vm(self):
  213. self.app.qubesd_connection_type = 'qrexec'
  214. loop = asyncio.new_event_loop()
  215. asyncio.set_event_loop(loop)
  216. mock_proc = unittest.mock.Mock()
  217. vm = unittest.mock.Mock()
  218. vm.name = 'test-vm'
  219. with unittest.mock.patch('asyncio.create_subprocess_exec',
  220. lambda *args, **kwargs: self.mock_coroutine(mock_proc,
  221. *args, **kwargs)):
  222. task = asyncio.ensure_future(self.dispatcher._get_events_reader(vm))
  223. loop.run_until_complete(task)
  224. self.assertEqual(mock_proc.mock_calls, [
  225. unittest.mock.call('qrexec-client-vm', 'test-vm',
  226. 'admin.Events', stdin=subprocess.PIPE,
  227. stdout=subprocess.PIPE),
  228. unittest.mock.call().stdin.write_eof()
  229. ])
  230. self.assertEqual(task.result()[0], mock_proc().stdout)
  231. cleanup_func = task.result()[1]
  232. cleanup_func()
  233. unittest.mock.call().kill.assert_called_once_with()
  234. loop.close()
  235. def test_030_events_device(self):
  236. handler = unittest.mock.Mock()
  237. self.dispatcher.add_handler('device-attach:test', handler)
  238. self.dispatcher.handle('test-vm', 'device-attach:test',
  239. device='test-vm2:dev', options='{}')
  240. vm = self.app.domains.get_blind('test-vm')
  241. dev = self.app.domains.get_blind('test-vm2').devices['test']['dev']
  242. handler.assert_called_once_with(vm, 'device-attach:test', device=dev,
  243. options='{}')