# -*- encoding: utf8 -*-
#
# The Qubes OS Project, http://www.qubes-os.org
#
# Copyright (C) 2017 Marek Marczykowski-Górecki
#
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation; either version 2.1 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with this program; if not, see <http://www.gnu.org/licenses/>.

'''Tests for :py:class:`qubesmgmt.events.EventsDispatcher`.'''

import socket
import subprocess

import qubesmgmt.tests
import unittest

try:
    # qubesmgmt.events requires python3, so these tests can also use
    # python3 features
    import asyncio
    import unittest.mock
    import qubesmgmt.events
except ImportError:
    # don't run any tests on python2
    def load_tests(loader, tests, pattern):
        '''unittest ``load_tests`` hook: always return an empty suite.'''
        return unittest.TestSuite()

    # don't fail on coroutine decorator
    class asyncio(object):
        '''Minimal stand-in so ``@asyncio.coroutine`` below still resolves
        when the real module could not be imported.'''

        @staticmethod
        def coroutine(f):
            '''Return *f* unchanged.'''
            return f


class TC_00_Events(qubesmgmt.tests.QubesTestCase):
    '''Exercise handler registration, event parsing and the qubesd
    connection set-up of ``EventsDispatcher`` against mocks.'''

    def setUp(self):
        '''Create a dispatcher bound to a ``MagicMock`` application.'''
        super().setUp()
        self.app = unittest.mock.MagicMock()
        self.dispatcher = qubesmgmt.events.EventsDispatcher(self.app)

    def test_000_handler_specific(self):
        '''A handler registered for one event name receives only that
        event, and nothing after it is removed.'''
        handler = unittest.mock.Mock()
        self.dispatcher.add_handler('some-event', handler)

        # empty subject -> handler gets None as the subject
        self.dispatcher.handle('', 'some-event', arg1='value1')
        handler.assert_called_once_with(None, 'some-event', arg1='value1')
        handler.reset_mock()

        # named subject -> handler gets the object from app.domains
        self.dispatcher.handle('test-vm', 'some-event', arg1='value1')
        handler.assert_called_once_with(
            self.app.domains['test-vm'], 'some-event', arg1='value1')
        handler.reset_mock()

        # different event name -> not delivered
        self.dispatcher.handle('', 'other-event', arg1='value1')
        self.assertFalse(handler.called)

        self.dispatcher.remove_handler('some-event', handler)
        self.dispatcher.handle('', 'some-event', arg1='value1')
        self.assertFalse(handler.called)

    def test_001_handler_glob(self):
        '''A handler registered for ``'*'`` receives every event until it
        is removed.'''
        handler = unittest.mock.Mock()
        self.dispatcher.add_handler('*', handler)

        self.dispatcher.handle('', 'some-event', arg1='value1')
        handler.assert_called_once_with(None, 'some-event', arg1='value1')
        handler.reset_mock()

        self.dispatcher.handle('test-vm', 'some-event', arg1='value1')
        handler.assert_called_once_with(
            self.app.domains['test-vm'], 'some-event', arg1='value1')
        handler.reset_mock()

        self.dispatcher.handle('', 'other-event', arg1='value1')
        handler.assert_called_once_with(None, 'other-event', arg1='value1')
        handler.reset_mock()

        self.dispatcher.remove_handler('*', handler)
        self.dispatcher.handle('', 'some-event', arg1='value1')
        self.assertFalse(handler.called)

    @asyncio.coroutine
    def mock_get_events_reader(self, stream, cleanup_func, expected_vm,
            vm=None):
        '''Replacement for ``EventsDispatcher._get_events_reader``.

        Checks that the dispatcher asked for *expected_vm* and hands back
        the prepared ``(stream, cleanup_func)`` pair.
        '''
        self.assertEqual(expected_vm, vm)
        return stream, cleanup_func

    @asyncio.coroutine
    def send_events(self, stream, events):
        '''Feed each item of *events* into *stream*, pausing briefly after
        each one, then signal EOF.'''
        for event in events:
            stream.feed_data(event)
            # don't use yield from... (presumably so this module still
            # parses on python2, where load_tests above disables the tests)
            for x in asyncio.sleep(0.01):
                yield x
        stream.feed_eof()

    def test_010_listen_for_events(self):
        '''Events read from the stream are dispatched to the matching
        handler and the cleanup function is called once at EOF.'''
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            stream = asyncio.StreamReader()
            cleanup_func = unittest.mock.Mock()
            self.dispatcher._get_events_reader = \
                lambda vm: self.mock_get_events_reader(
                    stream, cleanup_func, None, vm)
            handler = unittest.mock.Mock()
            self.dispatcher.add_handler('some-event', handler)

            # NUL-separated records; judging by the expected calls below the
            # fields look like: '1', subject, event name, key/value pairs,
            # then an empty terminator
            events = [
                b'1\0\0some-event\0arg1\0value1\0\0',
                b'1\0some-vm\0some-event\0arg1\0value1\0\0',
                b'1\0some-vm\0some-event\0\0',
                b'1\0some-vm\0other-event\0\0',
            ]
            asyncio.ensure_future(self.send_events(stream, events))
            loop.run_until_complete(self.dispatcher.listen_for_events(
                reconnect=False))

            # 'other-event' has no handler, so only three calls are expected
            self.assertEqual(handler.mock_calls, [
                unittest.mock.call(None, 'some-event', arg1='value1'),
                unittest.mock.call(
                    self.app.domains['some-vm'], 'some-event',
                    arg1='value1'),
                unittest.mock.call(
                    self.app.domains['some-vm'], 'some-event'),
            ])
            cleanup_func.assert_called_once_with()
        finally:
            # release the loop even when an assertion above fails
            loop.close()

    def mock_open_unix_connection(self, expected_path, sock, path):
        '''Replacement for ``asyncio.open_unix_connection``.

        Verifies the requested *path* and opens the connection over the
        already-connected *sock* instead.
        '''
        self.assertEqual(expected_path, path)
        return asyncio.open_connection(sock=sock)

    def read_all(self, sock):
        '''Blocking read of *sock* until EOF; return everything received.'''
        return b''.join(iter(lambda: sock.recv(4096), b''))

    def test_020_get_events_reader_local(self):
        '''With a 'socket' connection and no VM, the request written to
        the qubesd socket is addressed to dom0.'''
        self.app.qubesd_connection_type = 'socket'
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        sock1, sock2 = socket.socketpair()
        try:
            with unittest.mock.patch('asyncio.open_unix_connection',
                    lambda path: self.mock_open_unix_connection(
                        qubesmgmt.config.QUBESD_SOCKET, sock1, path)):
                task = asyncio.ensure_future(
                    self.dispatcher._get_events_reader())
                # recv() blocks, so drain the peer socket in an executor
                reader = asyncio.ensure_future(loop.run_in_executor(
                    None, self.read_all, sock2))
                loop.run_until_complete(asyncio.wait([task, reader]))
                self.assertEqual(reader.result(),
                    b'dom0\0mgmt.Events\0dom0\0\0')
                self.assertIsInstance(
                    task.result()[0], asyncio.StreamReader)
                cleanup_func = task.result()[1]
                cleanup_func()
        finally:
            sock2.close()
            # run socket cleanup functions
            loop.stop()
            loop.run_forever()
            loop.close()

    def test_021_get_events_reader_local_vm(self):
        '''With a 'socket' connection and a VM given, the request written
        to the qubesd socket names that VM.'''
        self.app.qubesd_connection_type = 'socket'
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        sock1, sock2 = socket.socketpair()
        vm = unittest.mock.Mock()
        vm.name = 'test-vm'
        try:
            with unittest.mock.patch('asyncio.open_unix_connection',
                    lambda path: self.mock_open_unix_connection(
                        qubesmgmt.config.QUBESD_SOCKET, sock1, path)):
                task = asyncio.ensure_future(
                    self.dispatcher._get_events_reader(vm))
                # recv() blocks, so drain the peer socket in an executor
                reader = asyncio.ensure_future(loop.run_in_executor(
                    None, self.read_all, sock2))
                loop.run_until_complete(asyncio.wait([task, reader]))
                self.assertEqual(reader.result(),
                    b'dom0\0mgmt.Events\0test-vm\0\0')
                self.assertIsInstance(
                    task.result()[0], asyncio.StreamReader)
                cleanup_func = task.result()[1]
                cleanup_func()
        finally:
            sock2.close()
            # run socket cleanup functions
            loop.stop()
            loop.run_forever()
            loop.close()

    @asyncio.coroutine
    def mock_coroutine(self, mock, *args, **kwargs):
        '''Coroutine wrapper that calls *mock* with the given arguments
        and returns its result.'''
        return mock(*args, **kwargs)

    def test_022_get_events_reader_remote(self):
        '''With a 'qrexec' connection and no VM, qrexec-client-vm is
        started towards dom0 and the cleanup function kills it.'''
        self.app.qubesd_connection_type = 'qrexec'
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        mock_proc = unittest.mock.Mock()
        try:
            with unittest.mock.patch('asyncio.create_subprocess_exec',
                    lambda *args, **kwargs: self.mock_coroutine(
                        mock_proc, *args, **kwargs)):
                task = asyncio.ensure_future(
                    self.dispatcher._get_events_reader())
                loop.run_until_complete(task)
                self.assertEqual(mock_proc.mock_calls, [
                    unittest.mock.call(
                        ['qrexec-client-vm', 'dom0', 'mgmt.Events'],
                        stdin=subprocess.PIPE, stdout=subprocess.PIPE),
                    unittest.mock.call().stdin.write_eof()
                ])
                # mock_proc.return_value is the fake process object that
                # mock_coroutine handed to the dispatcher
                self.assertEqual(
                    task.result()[0], mock_proc.return_value.stdout)
                cleanup_func = task.result()[1]
                cleanup_func()
                # assert on the mock itself; unittest.mock.call() only
                # builds a call descriptor and would never fail
                mock_proc.return_value.kill.assert_called_once_with()
        finally:
            loop.close()

    def test_023_get_events_reader_remote_vm(self):
        '''With a 'qrexec' connection and a VM given, qrexec-client-vm is
        started towards that VM and the cleanup function kills it.'''
        self.app.qubesd_connection_type = 'qrexec'
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        mock_proc = unittest.mock.Mock()
        vm = unittest.mock.Mock()
        vm.name = 'test-vm'
        try:
            with unittest.mock.patch('asyncio.create_subprocess_exec',
                    lambda *args, **kwargs: self.mock_coroutine(
                        mock_proc, *args, **kwargs)):
                task = asyncio.ensure_future(
                    self.dispatcher._get_events_reader(vm))
                loop.run_until_complete(task)
                self.assertEqual(mock_proc.mock_calls, [
                    unittest.mock.call(
                        ['qrexec-client-vm', 'test-vm', 'mgmt.Events'],
                        stdin=subprocess.PIPE, stdout=subprocess.PIPE),
                    unittest.mock.call().stdin.write_eof()
                ])
                # mock_proc.return_value is the fake process object that
                # mock_coroutine handed to the dispatcher
                self.assertEqual(
                    task.result()[0], mock_proc.return_value.stdout)
                cleanup_func = task.result()[1]
                cleanup_func()
                # assert on the mock itself; unittest.mock.call() only
                # builds a call descriptor and would never fail
                mock_proc.return_value.kill.assert_called_once_with()
        finally:
            loop.close()