e40875ee5f
Since qubesmgmt.events require python3, those tests can use python3 features. But it should be still importable on python2 (so, no 'yield from', not type hits etc).
235 lines
9.3 KiB
Python
235 lines
9.3 KiB
Python
# -*- encoding: utf8 -*-
|
|
#
|
|
# The Qubes OS Project, http://www.qubes-os.org
|
|
#
|
|
# Copyright (C) 2017 Marek Marczykowski-Górecki
|
|
# <marmarek@invisiblethingslab.com>
|
|
#
|
|
# This program is free software; you can redistribute it and/or modify
|
|
# it under the terms of the GNU Lesser General Public License as published by
|
|
# the Free Software Foundation; either version 2.1 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Lesser General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Lesser General Public License along
|
|
# with this program; if not, see <http://www.gnu.org/licenses/>.
|
|
|
|
|
|
|
|
import socket
|
|
import subprocess
|
|
import qubesmgmt.tests
|
|
import unittest
|
|
try:
|
|
# qubesmgmt.events require python3, so this tests can also use python3 features
|
|
import asyncio
|
|
import unittest.mock
|
|
import qubesmgmt.events
|
|
except ImportError:
|
|
# don't run any tests on python2
|
|
def load_tests(loader, tests, pattern):
|
|
return unittest.TestSuite()
|
|
# don't fail on coroutine decorator
|
|
class asyncio(object):
|
|
@staticmethod
|
|
def coroutine(f):
|
|
return f
|
|
|
|
|
|
class TC_00_Events(qubesmgmt.tests.QubesTestCase):
|
|
def setUp(self):
|
|
super().setUp()
|
|
self.app = unittest.mock.MagicMock()
|
|
self.dispatcher = qubesmgmt.events.EventsDispatcher(self.app)
|
|
|
|
def test_000_handler_specific(self):
|
|
handler = unittest.mock.Mock()
|
|
self.dispatcher.add_handler('some-event', handler)
|
|
self.dispatcher.handle('', 'some-event', arg1='value1')
|
|
handler.assert_called_once_with(None, 'some-event', arg1='value1')
|
|
handler.reset_mock()
|
|
self.dispatcher.handle('test-vm', 'some-event', arg1='value1')
|
|
handler.assert_called_once_with(
|
|
self.app.domains['test-vm'], 'some-event', arg1='value1')
|
|
handler.reset_mock()
|
|
self.dispatcher.handle('', 'other-event', arg1='value1')
|
|
self.assertFalse(handler.called)
|
|
self.dispatcher.remove_handler('some-event', handler)
|
|
self.dispatcher.handle('', 'some-event', arg1='value1')
|
|
self.assertFalse(handler.called)
|
|
|
|
def test_001_handler_glob(self):
|
|
handler = unittest.mock.Mock()
|
|
self.dispatcher.add_handler('*', handler)
|
|
self.dispatcher.handle('', 'some-event', arg1='value1')
|
|
handler.assert_called_once_with(None, 'some-event', arg1='value1')
|
|
handler.reset_mock()
|
|
self.dispatcher.handle('test-vm', 'some-event', arg1='value1')
|
|
handler.assert_called_once_with(
|
|
self.app.domains['test-vm'], 'some-event', arg1='value1')
|
|
handler.reset_mock()
|
|
self.dispatcher.handle('', 'other-event', arg1='value1')
|
|
handler.assert_called_once_with(None, 'other-event', arg1='value1')
|
|
handler.reset_mock()
|
|
self.dispatcher.remove_handler('*', handler)
|
|
self.dispatcher.handle('', 'some-event', arg1='value1')
|
|
self.assertFalse(handler.called)
|
|
|
|
@asyncio.coroutine
|
|
def mock_get_events_reader(self, stream, cleanup_func, expected_vm,
|
|
vm=None):
|
|
self.assertEqual(expected_vm, vm)
|
|
return stream, cleanup_func
|
|
|
|
@asyncio.coroutine
|
|
def send_events(self, stream, events):
|
|
for event in events:
|
|
stream.feed_data(event)
|
|
# don't use yield from...
|
|
for x in asyncio.sleep(0.01):
|
|
yield x
|
|
stream.feed_eof()
|
|
|
|
def test_010_listen_for_events(self):
|
|
loop = asyncio.new_event_loop()
|
|
asyncio.set_event_loop(loop)
|
|
stream = asyncio.StreamReader()
|
|
cleanup_func = unittest.mock.Mock()
|
|
self.dispatcher._get_events_reader = \
|
|
lambda vm: self.mock_get_events_reader(stream, cleanup_func,
|
|
None, vm)
|
|
handler = unittest.mock.Mock()
|
|
self.dispatcher.add_handler('some-event', handler)
|
|
events = [
|
|
b'1\0\0some-event\0arg1\0value1\0\0',
|
|
b'1\0some-vm\0some-event\0arg1\0value1\0\0',
|
|
b'1\0some-vm\0some-event\0\0',
|
|
b'1\0some-vm\0other-event\0\0',
|
|
]
|
|
asyncio.ensure_future(self.send_events(stream, events))
|
|
loop.run_until_complete(self.dispatcher.listen_for_events())
|
|
self.assertEqual(handler.mock_calls, [
|
|
unittest.mock.call(None, 'some-event', arg1='value1'),
|
|
unittest.mock.call(
|
|
self.app.domains['some-vm'], 'some-event', arg1='value1'),
|
|
unittest.mock.call(self.app.domains['some-vm'], 'some-event'),
|
|
])
|
|
cleanup_func.assert_called_once_with()
|
|
loop.close()
|
|
|
|
def mock_open_unix_connection(self, expected_path, sock, path):
|
|
self.assertEqual(expected_path, path)
|
|
return asyncio.open_connection(sock=sock)
|
|
|
|
def read_all(self, sock):
|
|
buf = b''
|
|
for data in iter(lambda: sock.recv(4096), b''):
|
|
buf += data
|
|
return buf
|
|
|
|
def test_020_get_events_reader_local(self):
|
|
self.app.qubesd_connection_type = 'socket'
|
|
loop = asyncio.new_event_loop()
|
|
asyncio.set_event_loop(loop)
|
|
sock1, sock2 = socket.socketpair()
|
|
with unittest.mock.patch('asyncio.open_unix_connection',
|
|
lambda path: self.mock_open_unix_connection(
|
|
qubesmgmt.config.QUBESD_SOCKET, sock1, path)):
|
|
task = asyncio.ensure_future(self.dispatcher._get_events_reader())
|
|
reader = asyncio.ensure_future(loop.run_in_executor(None,
|
|
self.read_all, sock2))
|
|
loop.run_until_complete(asyncio.wait([task, reader]))
|
|
self.assertEqual(reader.result(),
|
|
b'dom0\0mgmt.Events\0dom0\0\0')
|
|
self.assertIsInstance(task.result()[0], asyncio.StreamReader)
|
|
cleanup_func = task.result()[1]
|
|
cleanup_func()
|
|
sock2.close()
|
|
|
|
# run socket cleanup functions
|
|
loop.stop()
|
|
loop.run_forever()
|
|
loop.close()
|
|
|
|
def test_021_get_events_reader_local_vm(self):
|
|
self.app.qubesd_connection_type = 'socket'
|
|
loop = asyncio.new_event_loop()
|
|
asyncio.set_event_loop(loop)
|
|
sock1, sock2 = socket.socketpair()
|
|
vm = unittest.mock.Mock()
|
|
vm.name = 'test-vm'
|
|
with unittest.mock.patch('asyncio.open_unix_connection',
|
|
lambda path: self.mock_open_unix_connection(
|
|
qubesmgmt.config.QUBESD_SOCKET, sock1, path)):
|
|
task = asyncio.ensure_future(self.dispatcher._get_events_reader(vm))
|
|
reader = asyncio.ensure_future(loop.run_in_executor(None,
|
|
self.read_all, sock2))
|
|
loop.run_until_complete(asyncio.wait([task, reader]))
|
|
self.assertEqual(reader.result(),
|
|
b'dom0\0mgmt.Events\0test-vm\0\0')
|
|
self.assertIsInstance(task.result()[0], asyncio.StreamReader)
|
|
cleanup_func = task.result()[1]
|
|
cleanup_func()
|
|
sock2.close()
|
|
|
|
# run socket cleanup functions
|
|
loop.stop()
|
|
loop.run_forever()
|
|
loop.close()
|
|
|
|
@asyncio.coroutine
|
|
def mock_coroutine(self, mock, *args, **kwargs):
|
|
return mock(*args, **kwargs)
|
|
|
|
def test_022_get_events_reader_remote(self):
|
|
self.app.qubesd_connection_type = 'qrexec'
|
|
loop = asyncio.new_event_loop()
|
|
asyncio.set_event_loop(loop)
|
|
mock_proc = unittest.mock.Mock()
|
|
with unittest.mock.patch('asyncio.create_subprocess_exec',
|
|
lambda *args, **kwargs: self.mock_coroutine(mock_proc,
|
|
*args, **kwargs)):
|
|
task = asyncio.ensure_future(self.dispatcher._get_events_reader())
|
|
loop.run_until_complete(task)
|
|
self.assertEqual(mock_proc.mock_calls, [
|
|
unittest.mock.call(['qrexec-client-vm', 'dom0',
|
|
'mgmt.Events'], stdin=subprocess.PIPE,
|
|
stdout=subprocess.PIPE),
|
|
unittest.mock.call().stdin.write_eof()
|
|
])
|
|
self.assertEqual(task.result()[0], mock_proc().stdout)
|
|
cleanup_func = task.result()[1]
|
|
cleanup_func()
|
|
unittest.mock.call().kill.assert_called_once_with()
|
|
|
|
loop.close()
|
|
|
|
def test_023_get_events_reader_remote_vm(self):
|
|
self.app.qubesd_connection_type = 'qrexec'
|
|
loop = asyncio.new_event_loop()
|
|
asyncio.set_event_loop(loop)
|
|
mock_proc = unittest.mock.Mock()
|
|
vm = unittest.mock.Mock()
|
|
vm.name = 'test-vm'
|
|
with unittest.mock.patch('asyncio.create_subprocess_exec',
|
|
lambda *args, **kwargs: self.mock_coroutine(mock_proc,
|
|
*args, **kwargs)):
|
|
task = asyncio.ensure_future(self.dispatcher._get_events_reader(vm))
|
|
loop.run_until_complete(task)
|
|
self.assertEqual(mock_proc.mock_calls, [
|
|
unittest.mock.call(['qrexec-client-vm', 'test-vm',
|
|
'mgmt.Events'], stdin=subprocess.PIPE,
|
|
stdout=subprocess.PIPE),
|
|
unittest.mock.call().stdin.write_eof()
|
|
])
|
|
self.assertEqual(task.result()[0], mock_proc().stdout)
|
|
cleanup_func = task.result()[1]
|
|
cleanup_func()
|
|
unittest.mock.call().kill.assert_called_once_with()
|
|
|
|
loop.close()
|