tests: events

Since qubesmgmt.events require python3, those tests can use python3
features. But it should be still importable on python2 (so, no 'yield
from', not type hits etc).
This commit is contained in:
Marek Marczykowski-Górecki 2017-04-14 22:02:43 +02:00
parent 5ed3d8d262
commit e40875ee5f
No known key found for this signature in database
GPG Key ID: 063938BA42CFA724

234
qubesmgmt/tests/events.py Normal file
View File

@ -0,0 +1,234 @@
# -*- encoding: utf8 -*-
#
# The Qubes OS Project, http://www.qubes-os.org
#
# Copyright (C) 2017 Marek Marczykowski-Górecki
# <marmarek@invisiblethingslab.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation; either version 2.1 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with this program; if not, see <http://www.gnu.org/licenses/>.
import socket
import subprocess
import qubesmgmt.tests
import unittest
try:
# qubesmgmt.events require python3, so this tests can also use python3 features
import asyncio
import unittest.mock
import qubesmgmt.events
except ImportError:
# don't run any tests on python2
def load_tests(loader, tests, pattern):
return unittest.TestSuite()
# don't fail on coroutine decorator
class asyncio(object):
@staticmethod
def coroutine(f):
return f
class TC_00_Events(qubesmgmt.tests.QubesTestCase):
def setUp(self):
super().setUp()
self.app = unittest.mock.MagicMock()
self.dispatcher = qubesmgmt.events.EventsDispatcher(self.app)
def test_000_handler_specific(self):
handler = unittest.mock.Mock()
self.dispatcher.add_handler('some-event', handler)
self.dispatcher.handle('', 'some-event', arg1='value1')
handler.assert_called_once_with(None, 'some-event', arg1='value1')
handler.reset_mock()
self.dispatcher.handle('test-vm', 'some-event', arg1='value1')
handler.assert_called_once_with(
self.app.domains['test-vm'], 'some-event', arg1='value1')
handler.reset_mock()
self.dispatcher.handle('', 'other-event', arg1='value1')
self.assertFalse(handler.called)
self.dispatcher.remove_handler('some-event', handler)
self.dispatcher.handle('', 'some-event', arg1='value1')
self.assertFalse(handler.called)
def test_001_handler_glob(self):
handler = unittest.mock.Mock()
self.dispatcher.add_handler('*', handler)
self.dispatcher.handle('', 'some-event', arg1='value1')
handler.assert_called_once_with(None, 'some-event', arg1='value1')
handler.reset_mock()
self.dispatcher.handle('test-vm', 'some-event', arg1='value1')
handler.assert_called_once_with(
self.app.domains['test-vm'], 'some-event', arg1='value1')
handler.reset_mock()
self.dispatcher.handle('', 'other-event', arg1='value1')
handler.assert_called_once_with(None, 'other-event', arg1='value1')
handler.reset_mock()
self.dispatcher.remove_handler('*', handler)
self.dispatcher.handle('', 'some-event', arg1='value1')
self.assertFalse(handler.called)
@asyncio.coroutine
def mock_get_events_reader(self, stream, cleanup_func, expected_vm,
vm=None):
self.assertEqual(expected_vm, vm)
return stream, cleanup_func
@asyncio.coroutine
def send_events(self, stream, events):
for event in events:
stream.feed_data(event)
# don't use yield from...
for x in asyncio.sleep(0.01):
yield x
stream.feed_eof()
def test_010_listen_for_events(self):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
stream = asyncio.StreamReader()
cleanup_func = unittest.mock.Mock()
self.dispatcher._get_events_reader = \
lambda vm: self.mock_get_events_reader(stream, cleanup_func,
None, vm)
handler = unittest.mock.Mock()
self.dispatcher.add_handler('some-event', handler)
events = [
b'1\0\0some-event\0arg1\0value1\0\0',
b'1\0some-vm\0some-event\0arg1\0value1\0\0',
b'1\0some-vm\0some-event\0\0',
b'1\0some-vm\0other-event\0\0',
]
asyncio.ensure_future(self.send_events(stream, events))
loop.run_until_complete(self.dispatcher.listen_for_events())
self.assertEqual(handler.mock_calls, [
unittest.mock.call(None, 'some-event', arg1='value1'),
unittest.mock.call(
self.app.domains['some-vm'], 'some-event', arg1='value1'),
unittest.mock.call(self.app.domains['some-vm'], 'some-event'),
])
cleanup_func.assert_called_once_with()
loop.close()
def mock_open_unix_connection(self, expected_path, sock, path):
self.assertEqual(expected_path, path)
return asyncio.open_connection(sock=sock)
def read_all(self, sock):
buf = b''
for data in iter(lambda: sock.recv(4096), b''):
buf += data
return buf
def test_020_get_events_reader_local(self):
self.app.qubesd_connection_type = 'socket'
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
sock1, sock2 = socket.socketpair()
with unittest.mock.patch('asyncio.open_unix_connection',
lambda path: self.mock_open_unix_connection(
qubesmgmt.config.QUBESD_SOCKET, sock1, path)):
task = asyncio.ensure_future(self.dispatcher._get_events_reader())
reader = asyncio.ensure_future(loop.run_in_executor(None,
self.read_all, sock2))
loop.run_until_complete(asyncio.wait([task, reader]))
self.assertEqual(reader.result(),
b'dom0\0mgmt.Events\0dom0\0\0')
self.assertIsInstance(task.result()[0], asyncio.StreamReader)
cleanup_func = task.result()[1]
cleanup_func()
sock2.close()
# run socket cleanup functions
loop.stop()
loop.run_forever()
loop.close()
def test_021_get_events_reader_local_vm(self):
self.app.qubesd_connection_type = 'socket'
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
sock1, sock2 = socket.socketpair()
vm = unittest.mock.Mock()
vm.name = 'test-vm'
with unittest.mock.patch('asyncio.open_unix_connection',
lambda path: self.mock_open_unix_connection(
qubesmgmt.config.QUBESD_SOCKET, sock1, path)):
task = asyncio.ensure_future(self.dispatcher._get_events_reader(vm))
reader = asyncio.ensure_future(loop.run_in_executor(None,
self.read_all, sock2))
loop.run_until_complete(asyncio.wait([task, reader]))
self.assertEqual(reader.result(),
b'dom0\0mgmt.Events\0test-vm\0\0')
self.assertIsInstance(task.result()[0], asyncio.StreamReader)
cleanup_func = task.result()[1]
cleanup_func()
sock2.close()
# run socket cleanup functions
loop.stop()
loop.run_forever()
loop.close()
@asyncio.coroutine
def mock_coroutine(self, mock, *args, **kwargs):
return mock(*args, **kwargs)
def test_022_get_events_reader_remote(self):
self.app.qubesd_connection_type = 'qrexec'
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
mock_proc = unittest.mock.Mock()
with unittest.mock.patch('asyncio.create_subprocess_exec',
lambda *args, **kwargs: self.mock_coroutine(mock_proc,
*args, **kwargs)):
task = asyncio.ensure_future(self.dispatcher._get_events_reader())
loop.run_until_complete(task)
self.assertEqual(mock_proc.mock_calls, [
unittest.mock.call(['qrexec-client-vm', 'dom0',
'mgmt.Events'], stdin=subprocess.PIPE,
stdout=subprocess.PIPE),
unittest.mock.call().stdin.write_eof()
])
self.assertEqual(task.result()[0], mock_proc().stdout)
cleanup_func = task.result()[1]
cleanup_func()
unittest.mock.call().kill.assert_called_once_with()
loop.close()
def test_023_get_events_reader_remote_vm(self):
self.app.qubesd_connection_type = 'qrexec'
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
mock_proc = unittest.mock.Mock()
vm = unittest.mock.Mock()
vm.name = 'test-vm'
with unittest.mock.patch('asyncio.create_subprocess_exec',
lambda *args, **kwargs: self.mock_coroutine(mock_proc,
*args, **kwargs)):
task = asyncio.ensure_future(self.dispatcher._get_events_reader(vm))
loop.run_until_complete(task)
self.assertEqual(mock_proc.mock_calls, [
unittest.mock.call(['qrexec-client-vm', 'test-vm',
'mgmt.Events'], stdin=subprocess.PIPE,
stdout=subprocess.PIPE),
unittest.mock.call().stdin.write_eof()
])
self.assertEqual(task.result()[0], mock_proc().stdout)
cleanup_func = task.result()[1]
cleanup_func()
unittest.mock.call().kill.assert_called_once_with()
loop.close()