diff --git a/qubes/config.py b/qubes/config.py index d4a15117..b3c979d1 100644 --- a/qubes/config.py +++ b/qubes/config.py @@ -33,6 +33,7 @@ system_path = { 'qubes_guid_path': '/usr/bin/qubes-guid', 'qrexec_daemon_path': '/usr/sbin/qrexec-daemon', 'qrexec_client_path': '/usr/bin/qrexec-client', + 'qrexec_rpc_multiplexer': '/usr/lib/qubes/qubes-rpc-multiplexer', 'qubesdb_daemon_path': '/usr/sbin/qubesdb-daemon', # Relative to qubes_base_dir diff --git a/qubes/tests/vm/adminvm.py b/qubes/tests/vm/adminvm.py index b847a707..92643637 100644 --- a/qubes/tests/vm/adminvm.py +++ b/qubes/tests/vm/adminvm.py @@ -17,10 +17,14 @@ # You should have received a copy of the GNU Lesser General Public # License along with this library; if not, see . # - +import subprocess import unittest import unittest.mock +import functools + +import asyncio + import qubes import qubes.exc import qubes.vm @@ -44,6 +48,24 @@ class TC_00_AdminVM(qubes.tests.QubesTestCase): raise self.skipTest('setup failed') + def tearDown(self) -> None: + self.app.domains.clear() + + def add_vm(self, name, cls=qubes.vm.qubesvm.QubesVM, **kwargs): + vm = cls(self.app, None, + qid=kwargs.pop('qid', 1), name=qubes.tests.VMPREFIX + name, + **kwargs) + self.app.domains[vm.qid] = vm + self.app.domains[vm.uuid] = vm + self.app.domains[vm.name] = vm + self.app.domains[vm] = vm + self.addCleanup(vm.close) + return vm + + @asyncio.coroutine + def coroutine_mock(self, mock, *args, **kwargs): + return mock(*args, **kwargs) + def cleanup_adminvm(self): self.vm.close() del self.vm @@ -82,3 +104,87 @@ class TC_00_AdminVM(qubes.tests.QubesTestCase): def test_311_suspend(self): with self.assertRaises(qubes.exc.QubesException): self.vm.suspend() + + @unittest.mock.patch('asyncio.create_subprocess_exec') + def test_700_run_service(self, mock_subprocess): + func_mock = unittest.mock.Mock() + mock_subprocess.side_effect = functools.partial( + self.coroutine_mock, func_mock) + + self.add_vm('vm') + + with self.subTest('running'): + self.loop.run_until_complete(self.vm.run_service('test.service')) + func_mock.assert_called_once_with( + '/usr/lib/qubes/qubes-rpc-multiplexer', + 'test.service', 'dom0', 'name', 'dom0') + + func_mock.reset_mock() + with self.subTest('other_user'): + self.loop.run_until_complete( + self.vm.run_service('test.service', user='other')) + func_mock.assert_called_once_with( + 'runuser', '-u', 'other', '--', + '/usr/lib/qubes/qubes-rpc-multiplexer', + 'test.service', 'dom0', 'name', 'dom0') + + func_mock.reset_mock() + with self.subTest('other_source'): + self.loop.run_until_complete( + self.vm.run_service('test.service', source='test-inst-vm')) + func_mock.assert_called_once_with( + '/usr/lib/qubes/qubes-rpc-multiplexer', + 'test.service', 'test-inst-vm', 'name', 'dom0') + + @unittest.mock.patch('qubes.vm.adminvm.AdminVM.run_service') + def test_710_run_service_for_stdio(self, mock_run_service): + + func_mock = unittest.mock.Mock() + mock_run_service.side_effect = functools.partial( + self.coroutine_mock, func_mock) + communicate_mock = unittest.mock.Mock() + func_mock.return_value.communicate.side_effect = functools.partial( + self.coroutine_mock, communicate_mock) + communicate_mock.return_value = (b'stdout', b'stderr') + func_mock.return_value.returncode = 0 + + with self.subTest('default'): + value = self.loop.run_until_complete( + self.vm.run_service_for_stdio('test.service')) + func_mock.assert_called_once_with( + 'test.service', + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + stdin=subprocess.PIPE) + communicate_mock.assert_called_once_with(input=None) + self.assertEqual(value, (b'stdout', b'stderr')) + + func_mock.reset_mock() + communicate_mock.reset_mock() + with self.subTest('with_input'): + value = self.loop.run_until_complete( + self.vm.run_service_for_stdio('test.service', input=b'abc')) + func_mock.assert_called_once_with( + 'test.service', + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + stdin=subprocess.PIPE) + communicate_mock.assert_called_once_with(input=b'abc') + self.assertEqual(value, (b'stdout', b'stderr')) + + func_mock.reset_mock() + communicate_mock.reset_mock() + with self.subTest('error'): + func_mock.return_value.returncode = 1 + with self.assertRaises(subprocess.CalledProcessError) as exc: + self.loop.run_until_complete( + self.vm.run_service_for_stdio('test.service')) + func_mock.assert_called_once_with( + 'test.service', + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + stdin=subprocess.PIPE) + communicate_mock.assert_called_once_with(input=None) + self.assertEqual(exc.exception.returncode, 1) + self.assertEqual(exc.exception.output, b'stdout') + self.assertEqual(exc.exception.stderr, b'stderr') diff --git a/qubes/tests/vm/qubesvm.py b/qubes/tests/vm/qubesvm.py index ba7767ed..f5550a48 100644 --- a/qubes/tests/vm/qubesvm.py +++ b/qubes/tests/vm/qubesvm.py @@ -21,11 +21,16 @@ # import base64 import os +import subprocess import tempfile import unittest import uuid import datetime + +import asyncio + +import functools import lxml.etree import unittest.mock @@ -1463,3 +1468,135 @@ class TC_90_QubesVM(QubesVMTestsMixin, qubes.tests.QubesTestCase): lambda _: True): netvm.create_qdb_entries() self.assertEqual(test_qubesdb.data, expected) + + @asyncio.coroutine + def coroutine_mock(self, mock, *args, **kwargs): + return mock(*args, **kwargs) + + @unittest.mock.patch('asyncio.create_subprocess_exec') + def test_700_run_service(self, mock_subprocess): + func_mock = unittest.mock.Mock() + mock_subprocess.side_effect = functools.partial( + self.coroutine_mock, func_mock) + + start_mock = unittest.mock.Mock() + + vm = self.get_vm(cls=qubes.vm.standalonevm.StandaloneVM, + name='vm') + vm.is_running = lambda: True + vm.is_qrexec_running = lambda: True + vm.start = functools.partial(self.coroutine_mock, start_mock) + with self.subTest('running'): + self.loop.run_until_complete(vm.run_service('test.service')) + func_mock.assert_called_once_with( + '/usr/bin/qrexec-client', '-d', 'test-inst-vm', + 'user:QUBESRPC test.service dom0') + self.assertFalse(start_mock.called) + + func_mock.reset_mock() + start_mock.reset_mock() + with self.subTest('not_running'): + vm.is_running = lambda: False + with self.assertRaises(qubes.exc.QubesVMNotRunningError): + self.loop.run_until_complete(vm.run_service('test.service')) + self.assertFalse(func_mock.called) + + func_mock.reset_mock() + start_mock.reset_mock() + with self.subTest('autostart'): + vm.is_running = lambda: False + self.loop.run_until_complete(vm.run_service( + 'test.service', autostart=True)) + func_mock.assert_called_once_with( + '/usr/bin/qrexec-client', '-d', 'test-inst-vm', + 'user:QUBESRPC test.service dom0') + self.assertTrue(start_mock.called) + + func_mock.reset_mock() + start_mock.reset_mock() + with self.subTest('no_qrexec'): + vm.is_running = lambda: True + vm.is_qrexec_running = lambda: False + with self.assertRaises(qubes.exc.QubesVMError): + self.loop.run_until_complete(vm.run_service('test.service')) + self.assertFalse(start_mock.called) + self.assertFalse(func_mock.called) + + func_mock.reset_mock() + start_mock.reset_mock() + with self.subTest('other_user'): + vm.is_running = lambda: True + vm.is_qrexec_running = lambda: True + self.loop.run_until_complete(vm.run_service('test.service', + user='other')) + func_mock.assert_called_once_with( + '/usr/bin/qrexec-client', '-d', 'test-inst-vm', + 'other:QUBESRPC test.service dom0') + self.assertFalse(start_mock.called) + + func_mock.reset_mock() + start_mock.reset_mock() + with self.subTest('other_source'): + vm.is_running = lambda: True + vm.is_qrexec_running = lambda: True + self.loop.run_until_complete(vm.run_service('test.service', + source='test-inst-vm')) + func_mock.assert_called_once_with( + '/usr/bin/qrexec-client', '-d', 'test-inst-vm', + 'user:QUBESRPC test.service test-inst-vm') + self.assertFalse(start_mock.called) + + @unittest.mock.patch('qubes.vm.qubesvm.QubesVM.run_service') + def test_710_run_service_for_stdio(self, mock_run_service): + vm = self.get_vm(cls=qubes.vm.standalonevm.StandaloneVM, + name='vm') + + func_mock = unittest.mock.Mock() + mock_run_service.side_effect = functools.partial( + self.coroutine_mock, func_mock) + communicate_mock = unittest.mock.Mock() + func_mock.return_value.communicate.side_effect = functools.partial( + self.coroutine_mock, communicate_mock) + communicate_mock.return_value = (b'stdout', b'stderr') + func_mock.return_value.returncode = 0 + + with self.subTest('default'): + value = self.loop.run_until_complete( + vm.run_service_for_stdio('test.service')) + func_mock.assert_called_once_with( + 'test.service', + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + stdin=subprocess.PIPE) + communicate_mock.assert_called_once_with(input=None) + self.assertEqual(value, (b'stdout', b'stderr')) + + func_mock.reset_mock() + communicate_mock.reset_mock() + with self.subTest('with_input'): + value = self.loop.run_until_complete( + vm.run_service_for_stdio('test.service', input=b'abc')) + func_mock.assert_called_once_with( + 'test.service', + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + stdin=subprocess.PIPE) + communicate_mock.assert_called_once_with(input=b'abc') + self.assertEqual(value, (b'stdout', b'stderr')) + + func_mock.reset_mock() + communicate_mock.reset_mock() + with self.subTest('error'): + func_mock.return_value.returncode = 1 + with self.assertRaises(subprocess.CalledProcessError) as exc: + self.loop.run_until_complete( + vm.run_service_for_stdio('test.service')) + func_mock.assert_called_once_with( + 'test.service', + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + stdin=subprocess.PIPE) + communicate_mock.assert_called_once_with(input=None) + self.assertEqual(exc.exception.returncode, 1) + self.assertEqual(exc.exception.output, b'stdout') + self.assertEqual(exc.exception.stderr, b'stderr') diff --git a/qubes/vm/adminvm.py b/qubes/vm/adminvm.py index a7c949c5..46871b7b 100644 --- a/qubes/vm/adminvm.py +++ b/qubes/vm/adminvm.py @@ -21,7 +21,8 @@ # ''' This module contains the AdminVM implementation ''' - +import asyncio +import subprocess import libvirt import qubes @@ -212,6 +213,81 @@ class AdminVM(qubes.vm.BaseVM): self._qdb_connection = qubesdb.QubesDB(self.name) return self._qdb_connection + @asyncio.coroutine + def run_service(self, service, source=None, user=None, + filter_esc=False, autostart=False, gui=False, **kwargs): + '''Run service on this VM + + :param str service: service name + :param qubes.vm.qubesvm.QubesVM source: source domain as presented to + this VM + :param str user: username to run service as + :param bool filter_esc: filter escape sequences to protect terminal \ + emulator + :param bool autostart: if :py:obj:`True`, machine will be started if \ + it is not running + :param bool gui: when autostarting, also start gui daemon + :rtype: asyncio.subprocess.Process + + .. note:: + User ``root`` is redefined to ``SYSTEM`` in the Windows agent code + ''' + # pylint: disable=unused-argument + + source = 'dom0' if source is None else self.app.domains[source].name + + if filter_esc: + raise NotImplementedError( + 'filter_esc=True not supported on calls to dom0') + + if user is None: + user = 'root' + + yield from self.fire_event_async('domain-cmd-pre-run', pre_event=True, + start_guid=gui) + + if user != 'root': + cmd = ['runuser', '-u', user, '--'] + else: + cmd = [] + cmd.extend([ + qubes.config.system_path['qrexec_rpc_multiplexer'], + service, + source, + 'name', + self.name, + ]) + return (yield from asyncio.create_subprocess_exec( + *cmd, + **kwargs)) + + @asyncio.coroutine + def run_service_for_stdio(self, *args, input=None, **kwargs): + '''Run a service, pass an optional input and return (stdout, stderr). + + Raises an exception if return code != 0. + + *args* and *kwargs* are passed verbatim to :py:meth:`run_service`. + + .. warning:: + There are some combinations if stdio-related *kwargs*, which are + not filtered for problems originating between the keyboard and the + chair. + ''' # pylint: disable=redefined-builtin + + kwargs.setdefault('stdin', subprocess.PIPE) + kwargs.setdefault('stdout', subprocess.PIPE) + kwargs.setdefault('stderr', subprocess.PIPE) + p = yield from self.run_service(*args, **kwargs) + + # this one is actually a tuple, but there is no need to unpack it + stdouterr = yield from p.communicate(input=input) + + if p.returncode: + raise subprocess.CalledProcessError(p.returncode, + args[0], *stdouterr) + + return stdouterr # def __init__(self, **kwargs): # super(QubesAdminVm, self).__init__(qid=0, name="dom0",