core-admin-client/qubesadmin/tests/events.py
Marek Marczykowski-Górecki ce7d4865b6
events: fix calling mgmt.Events from VM
asyncio.create_subprocess_exec expects program and arguments directly,
not as a list.
2017-06-25 14:01:14 +02:00

238 lines
9.5 KiB
Python

# -*- encoding: utf8 -*-
#
# The Qubes OS Project, http://www.qubes-os.org
#
# Copyright (C) 2017 Marek Marczykowski-Górecki
# <marmarek@invisiblethingslab.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation; either version 2.1 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with this program; if not, see <http://www.gnu.org/licenses/>.
import socket
import subprocess
import qubesadmin.tests
import unittest
try:
# qubesadmin.events require python3, so this tests can also use python3 features
import asyncio
import unittest.mock
import qubesadmin.events
except ImportError:
# don't run any tests on python2
def load_tests(loader, tests, pattern):
return unittest.TestSuite()
# don't fail on coroutine decorator
class asyncio(object):
@staticmethod
def coroutine(f):
return f
class TC_00_Events(qubesadmin.tests.QubesTestCase):
def setUp(self):
super().setUp()
self.app = unittest.mock.MagicMock()
self.dispatcher = qubesadmin.events.EventsDispatcher(self.app)
def test_000_handler_specific(self):
handler = unittest.mock.Mock()
self.dispatcher.add_handler('some-event', handler)
self.dispatcher.handle('', 'some-event', arg1='value1')
handler.assert_called_once_with(None, 'some-event', arg1='value1')
handler.reset_mock()
self.dispatcher.handle('test-vm', 'some-event', arg1='value1')
handler.assert_called_once_with(
self.app.domains['test-vm'], 'some-event', arg1='value1')
handler.reset_mock()
self.dispatcher.handle('', 'other-event', arg1='value1')
self.assertFalse(handler.called)
self.dispatcher.remove_handler('some-event', handler)
self.dispatcher.handle('', 'some-event', arg1='value1')
self.assertFalse(handler.called)
def test_001_handler_glob(self):
handler = unittest.mock.Mock()
self.dispatcher.add_handler('*', handler)
self.dispatcher.handle('', 'some-event', arg1='value1')
handler.assert_called_once_with(None, 'some-event', arg1='value1')
handler.reset_mock()
self.dispatcher.handle('test-vm', 'some-event', arg1='value1')
handler.assert_called_once_with(
self.app.domains['test-vm'], 'some-event', arg1='value1')
handler.reset_mock()
self.dispatcher.handle('', 'other-event', arg1='value1')
handler.assert_called_once_with(None, 'other-event', arg1='value1')
handler.reset_mock()
self.dispatcher.remove_handler('*', handler)
self.dispatcher.handle('', 'some-event', arg1='value1')
self.assertFalse(handler.called)
@asyncio.coroutine
def mock_get_events_reader(self, stream, cleanup_func, expected_vm,
vm=None):
self.assertEqual(expected_vm, vm)
return stream, cleanup_func
@asyncio.coroutine
def send_events(self, stream, events):
for event in events:
stream.feed_data(event)
# don't use yield from...
for x in asyncio.sleep(0.01):
yield x
stream.feed_eof()
def test_010_listen_for_events(self):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
stream = asyncio.StreamReader()
cleanup_func = unittest.mock.Mock()
self.dispatcher._get_events_reader = \
lambda vm: self.mock_get_events_reader(stream, cleanup_func,
None, vm)
handler = unittest.mock.Mock()
self.dispatcher.add_handler('some-event', handler)
events = [
b'1\0\0some-event\0arg1\0value1\0\0',
b'1\0some-vm\0some-event\0arg1\0value1\0\0',
b'1\0some-vm\0some-event\0arg_without_value\0\0arg2\0value\0\0',
b'1\0some-vm\0other-event\0\0',
]
asyncio.ensure_future(self.send_events(stream, events))
loop.run_until_complete(self.dispatcher.listen_for_events(
reconnect=False))
self.assertEqual(handler.mock_calls, [
unittest.mock.call(None, 'some-event', arg1='value1'),
unittest.mock.call(
self.app.domains['some-vm'], 'some-event', arg1='value1'),
unittest.mock.call(
self.app.domains['some-vm'], 'some-event',
arg_without_value='', arg2='value'),
])
cleanup_func.assert_called_once_with()
loop.close()
def mock_open_unix_connection(self, expected_path, sock, path):
self.assertEqual(expected_path, path)
return asyncio.open_connection(sock=sock)
def read_all(self, sock):
buf = b''
for data in iter(lambda: sock.recv(4096), b''):
buf += data
return buf
def test_020_get_events_reader_local(self):
self.app.qubesd_connection_type = 'socket'
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
sock1, sock2 = socket.socketpair()
with unittest.mock.patch('asyncio.open_unix_connection',
lambda path: self.mock_open_unix_connection(
qubesadmin.config.QUBESD_SOCKET, sock1, path)):
task = asyncio.ensure_future(self.dispatcher._get_events_reader())
reader = asyncio.ensure_future(loop.run_in_executor(None,
self.read_all, sock2))
loop.run_until_complete(asyncio.wait([task, reader]))
self.assertEqual(reader.result(),
b'dom0\0admin.Events\0dom0\0\0')
self.assertIsInstance(task.result()[0], asyncio.StreamReader)
cleanup_func = task.result()[1]
cleanup_func()
sock2.close()
# run socket cleanup functions
loop.stop()
loop.run_forever()
loop.close()
def test_021_get_events_reader_local_vm(self):
self.app.qubesd_connection_type = 'socket'
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
sock1, sock2 = socket.socketpair()
vm = unittest.mock.Mock()
vm.name = 'test-vm'
with unittest.mock.patch('asyncio.open_unix_connection',
lambda path: self.mock_open_unix_connection(
qubesadmin.config.QUBESD_SOCKET, sock1, path)):
task = asyncio.ensure_future(self.dispatcher._get_events_reader(vm))
reader = asyncio.ensure_future(loop.run_in_executor(None,
self.read_all, sock2))
loop.run_until_complete(asyncio.wait([task, reader]))
self.assertEqual(reader.result(),
b'dom0\0admin.Events\0test-vm\0\0')
self.assertIsInstance(task.result()[0], asyncio.StreamReader)
cleanup_func = task.result()[1]
cleanup_func()
sock2.close()
# run socket cleanup functions
loop.stop()
loop.run_forever()
loop.close()
@asyncio.coroutine
def mock_coroutine(self, mock, *args, **kwargs):
return mock(*args, **kwargs)
def test_022_get_events_reader_remote(self):
self.app.qubesd_connection_type = 'qrexec'
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
mock_proc = unittest.mock.Mock()
with unittest.mock.patch('asyncio.create_subprocess_exec',
lambda *args, **kwargs: self.mock_coroutine(mock_proc,
*args, **kwargs)):
task = asyncio.ensure_future(self.dispatcher._get_events_reader())
loop.run_until_complete(task)
self.assertEqual(mock_proc.mock_calls, [
unittest.mock.call('qrexec-client-vm', 'dom0',
'admin.Events', stdin=subprocess.PIPE,
stdout=subprocess.PIPE),
unittest.mock.call().stdin.write_eof()
])
self.assertEqual(task.result()[0], mock_proc().stdout)
cleanup_func = task.result()[1]
cleanup_func()
unittest.mock.call().kill.assert_called_once_with()
loop.close()
def test_023_get_events_reader_remote_vm(self):
self.app.qubesd_connection_type = 'qrexec'
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
mock_proc = unittest.mock.Mock()
vm = unittest.mock.Mock()
vm.name = 'test-vm'
with unittest.mock.patch('asyncio.create_subprocess_exec',
lambda *args, **kwargs: self.mock_coroutine(mock_proc,
*args, **kwargs)):
task = asyncio.ensure_future(self.dispatcher._get_events_reader(vm))
loop.run_until_complete(task)
self.assertEqual(mock_proc.mock_calls, [
unittest.mock.call('qrexec-client-vm', 'test-vm',
'admin.Events', stdin=subprocess.PIPE,
stdout=subprocess.PIPE),
unittest.mock.call().stdin.write_eof()
])
self.assertEqual(task.result()[0], mock_proc().stdout)
cleanup_func = task.result()[1]
cleanup_func()
unittest.mock.call().kill.assert_called_once_with()
loop.close()