diff --git a/qubesmgmt/tests/events.py b/qubesmgmt/tests/events.py new file mode 100644 index 0000000..a593494 --- /dev/null +++ b/qubesmgmt/tests/events.py @@ -0,0 +1,234 @@ +# -*- encoding: utf8 -*- +# +# The Qubes OS Project, http://www.qubes-os.org +# +# Copyright (C) 2017 Marek Marczykowski-Górecki +# +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU Lesser General Public License as published by +# the Free Software Foundation; either version 2.1 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License along +# with this program; if not, see . + + + +import socket +import subprocess +import qubesmgmt.tests +import unittest +try: + # qubesmgmt.events require python3, so this tests can also use python3 features + import asyncio + import unittest.mock + import qubesmgmt.events +except ImportError: + # don't run any tests on python2 + def load_tests(loader, tests, pattern): + return unittest.TestSuite() + # don't fail on coroutine decorator + class asyncio(object): + @staticmethod + def coroutine(f): + return f + + +class TC_00_Events(qubesmgmt.tests.QubesTestCase): + def setUp(self): + super().setUp() + self.app = unittest.mock.MagicMock() + self.dispatcher = qubesmgmt.events.EventsDispatcher(self.app) + + def test_000_handler_specific(self): + handler = unittest.mock.Mock() + self.dispatcher.add_handler('some-event', handler) + self.dispatcher.handle('', 'some-event', arg1='value1') + handler.assert_called_once_with(None, 'some-event', arg1='value1') + handler.reset_mock() + self.dispatcher.handle('test-vm', 'some-event', arg1='value1') + handler.assert_called_once_with( + self.app.domains['test-vm'], 'some-event', arg1='value1') + handler.reset_mock() + self.dispatcher.handle('', 'other-event', arg1='value1') + self.assertFalse(handler.called) + self.dispatcher.remove_handler('some-event', handler) + self.dispatcher.handle('', 'some-event', arg1='value1') + self.assertFalse(handler.called) + + def test_001_handler_glob(self): + handler = unittest.mock.Mock() + self.dispatcher.add_handler('*', handler) + self.dispatcher.handle('', 'some-event', arg1='value1') + handler.assert_called_once_with(None, 'some-event', arg1='value1') + handler.reset_mock() + self.dispatcher.handle('test-vm', 'some-event', arg1='value1') + handler.assert_called_once_with( + self.app.domains['test-vm'], 'some-event', arg1='value1') + handler.reset_mock() + self.dispatcher.handle('', 'other-event', arg1='value1') + handler.assert_called_once_with(None, 'other-event', arg1='value1') + handler.reset_mock() + self.dispatcher.remove_handler('*', handler) + self.dispatcher.handle('', 'some-event', arg1='value1') + self.assertFalse(handler.called) + + @asyncio.coroutine + def mock_get_events_reader(self, stream, cleanup_func, expected_vm, + vm=None): + self.assertEqual(expected_vm, vm) + return stream, cleanup_func + + @asyncio.coroutine + def send_events(self, stream, events): + for event in events: + stream.feed_data(event) + # don't use yield from... + for x in asyncio.sleep(0.01): + yield x + stream.feed_eof() + + def test_010_listen_for_events(self): + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + stream = asyncio.StreamReader() + cleanup_func = unittest.mock.Mock() + self.dispatcher._get_events_reader = \ + lambda vm: self.mock_get_events_reader(stream, cleanup_func, + None, vm) + handler = unittest.mock.Mock() + self.dispatcher.add_handler('some-event', handler) + events = [ + b'1\0\0some-event\0arg1\0value1\0\0', + b'1\0some-vm\0some-event\0arg1\0value1\0\0', + b'1\0some-vm\0some-event\0\0', + b'1\0some-vm\0other-event\0\0', + ] + asyncio.ensure_future(self.send_events(stream, events)) + loop.run_until_complete(self.dispatcher.listen_for_events()) + self.assertEqual(handler.mock_calls, [ + unittest.mock.call(None, 'some-event', arg1='value1'), + unittest.mock.call( + self.app.domains['some-vm'], 'some-event', arg1='value1'), + unittest.mock.call(self.app.domains['some-vm'], 'some-event'), + ]) + cleanup_func.assert_called_once_with() + loop.close() + + def mock_open_unix_connection(self, expected_path, sock, path): + self.assertEqual(expected_path, path) + return asyncio.open_connection(sock=sock) + + def read_all(self, sock): + buf = b'' + for data in iter(lambda: sock.recv(4096), b''): + buf += data + return buf + + def test_020_get_events_reader_local(self): + self.app.qubesd_connection_type = 'socket' + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + sock1, sock2 = socket.socketpair() + with unittest.mock.patch('asyncio.open_unix_connection', + lambda path: self.mock_open_unix_connection( + qubesmgmt.config.QUBESD_SOCKET, sock1, path)): + task = asyncio.ensure_future(self.dispatcher._get_events_reader()) + reader = asyncio.ensure_future(loop.run_in_executor(None, + self.read_all, sock2)) + loop.run_until_complete(asyncio.wait([task, reader])) + self.assertEqual(reader.result(), + b'dom0\0mgmt.Events\0dom0\0\0') + self.assertIsInstance(task.result()[0], asyncio.StreamReader) + cleanup_func = task.result()[1] + cleanup_func() + sock2.close() + + # run socket cleanup functions + loop.stop() + loop.run_forever() + loop.close() + + def test_021_get_events_reader_local_vm(self): + self.app.qubesd_connection_type = 'socket' + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + sock1, sock2 = socket.socketpair() + vm = unittest.mock.Mock() + vm.name = 'test-vm' + with unittest.mock.patch('asyncio.open_unix_connection', + lambda path: self.mock_open_unix_connection( + qubesmgmt.config.QUBESD_SOCKET, sock1, path)): + task = asyncio.ensure_future(self.dispatcher._get_events_reader(vm)) + reader = asyncio.ensure_future(loop.run_in_executor(None, + self.read_all, sock2)) + loop.run_until_complete(asyncio.wait([task, reader])) + self.assertEqual(reader.result(), + b'dom0\0mgmt.Events\0test-vm\0\0') + self.assertIsInstance(task.result()[0], asyncio.StreamReader) + cleanup_func = task.result()[1] + cleanup_func() + sock2.close() + + # run socket cleanup functions + loop.stop() + loop.run_forever() + loop.close() + + @asyncio.coroutine + def mock_coroutine(self, mock, *args, **kwargs): + return mock(*args, **kwargs) + + def test_022_get_events_reader_remote(self): + self.app.qubesd_connection_type = 'qrexec' + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + mock_proc = unittest.mock.Mock() + with unittest.mock.patch('asyncio.create_subprocess_exec', + lambda *args, **kwargs: self.mock_coroutine(mock_proc, + *args, **kwargs)): + task = asyncio.ensure_future(self.dispatcher._get_events_reader()) + loop.run_until_complete(task) + self.assertEqual(mock_proc.mock_calls, [ + unittest.mock.call(['qrexec-client-vm', 'dom0', + 'mgmt.Events'], stdin=subprocess.PIPE, + stdout=subprocess.PIPE), + unittest.mock.call().stdin.write_eof() + ]) + self.assertEqual(task.result()[0], mock_proc().stdout) + cleanup_func = task.result()[1] + cleanup_func() + unittest.mock.call().kill.assert_called_once_with() + + loop.close() + + def test_023_get_events_reader_remote_vm(self): + self.app.qubesd_connection_type = 'qrexec' + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + mock_proc = unittest.mock.Mock() + vm = unittest.mock.Mock() + vm.name = 'test-vm' + with unittest.mock.patch('asyncio.create_subprocess_exec', + lambda *args, **kwargs: self.mock_coroutine(mock_proc, + *args, **kwargs)): + task = asyncio.ensure_future(self.dispatcher._get_events_reader(vm)) + loop.run_until_complete(task) + self.assertEqual(mock_proc.mock_calls, [ + unittest.mock.call(['qrexec-client-vm', 'test-vm', + 'mgmt.Events'], stdin=subprocess.PIPE, + stdout=subprocess.PIPE), + unittest.mock.call().stdin.write_eof() + ]) + self.assertEqual(task.result()[0], mock_proc().stdout) + cleanup_func = task.result()[1] + cleanup_func() + unittest.mock.call().kill.assert_called_once_with() + + loop.close()