Merge remote-tracking branch 'origin/pr/257'

* origin/pr/257:
  tests: AdminVM.run_service*()
  tests: QubesVM.run_service function
  vm/adminvm: add run_service* functions
This commit is contained in:
Marek Marczykowski-Górecki 2019-06-23 03:43:35 +02:00
commit fa0ae0c921
No known key found for this signature in database
GPG Key ID: 063938BA42CFA724
4 changed files with 322 additions and 2 deletions

View File

@ -33,6 +33,7 @@ system_path = {
'qubes_guid_path': '/usr/bin/qubes-guid',
'qrexec_daemon_path': '/usr/sbin/qrexec-daemon',
'qrexec_client_path': '/usr/bin/qrexec-client',
'qrexec_rpc_multiplexer': '/usr/lib/qubes/qubes-rpc-multiplexer',
'qubesdb_daemon_path': '/usr/sbin/qubesdb-daemon',
# Relative to qubes_base_dir

View File

@ -17,10 +17,14 @@
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, see <https://www.gnu.org/licenses/>.
#
import subprocess
import unittest
import unittest.mock
import functools
import asyncio
import qubes
import qubes.exc
import qubes.vm
@ -44,6 +48,24 @@ class TC_00_AdminVM(qubes.tests.QubesTestCase):
raise
self.skipTest('setup failed')
def tearDown(self) -> None:
self.app.domains.clear()
def add_vm(self, name, cls=qubes.vm.qubesvm.QubesVM, **kwargs):
vm = cls(self.app, None,
qid=kwargs.pop('qid', 1), name=qubes.tests.VMPREFIX + name,
**kwargs)
self.app.domains[vm.qid] = vm
self.app.domains[vm.uuid] = vm
self.app.domains[vm.name] = vm
self.app.domains[vm] = vm
self.addCleanup(vm.close)
return vm
@asyncio.coroutine
def coroutine_mock(self, mock, *args, **kwargs):
return mock(*args, **kwargs)
def cleanup_adminvm(self):
self.vm.close()
del self.vm
@ -82,3 +104,87 @@ class TC_00_AdminVM(qubes.tests.QubesTestCase):
def test_311_suspend(self):
with self.assertRaises(qubes.exc.QubesException):
self.vm.suspend()
@unittest.mock.patch('asyncio.create_subprocess_exec')
def test_700_run_service(self, mock_subprocess):
func_mock = unittest.mock.Mock()
mock_subprocess.side_effect = functools.partial(
self.coroutine_mock, func_mock)
self.add_vm('vm')
with self.subTest('running'):
self.loop.run_until_complete(self.vm.run_service('test.service'))
func_mock.assert_called_once_with(
'/usr/lib/qubes/qubes-rpc-multiplexer',
'test.service', 'dom0', 'name', 'dom0')
func_mock.reset_mock()
with self.subTest('other_user'):
self.loop.run_until_complete(
self.vm.run_service('test.service', user='other'))
func_mock.assert_called_once_with(
'runuser', '-u', 'other', '--',
'/usr/lib/qubes/qubes-rpc-multiplexer',
'test.service', 'dom0', 'name', 'dom0')
func_mock.reset_mock()
with self.subTest('other_source'):
self.loop.run_until_complete(
self.vm.run_service('test.service', source='test-inst-vm'))
func_mock.assert_called_once_with(
'/usr/lib/qubes/qubes-rpc-multiplexer',
'test.service', 'test-inst-vm', 'name', 'dom0')
@unittest.mock.patch('qubes.vm.adminvm.AdminVM.run_service')
def test_710_run_service_for_stdio(self, mock_run_service):
func_mock = unittest.mock.Mock()
mock_run_service.side_effect = functools.partial(
self.coroutine_mock, func_mock)
communicate_mock = unittest.mock.Mock()
func_mock.return_value.communicate.side_effect = functools.partial(
self.coroutine_mock, communicate_mock)
communicate_mock.return_value = (b'stdout', b'stderr')
func_mock.return_value.returncode = 0
with self.subTest('default'):
value = self.loop.run_until_complete(
self.vm.run_service_for_stdio('test.service'))
func_mock.assert_called_once_with(
'test.service',
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
stdin=subprocess.PIPE)
communicate_mock.assert_called_once_with(input=None)
self.assertEqual(value, (b'stdout', b'stderr'))
func_mock.reset_mock()
communicate_mock.reset_mock()
with self.subTest('with_input'):
value = self.loop.run_until_complete(
self.vm.run_service_for_stdio('test.service', input=b'abc'))
func_mock.assert_called_once_with(
'test.service',
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
stdin=subprocess.PIPE)
communicate_mock.assert_called_once_with(input=b'abc')
self.assertEqual(value, (b'stdout', b'stderr'))
func_mock.reset_mock()
communicate_mock.reset_mock()
with self.subTest('error'):
func_mock.return_value.returncode = 1
with self.assertRaises(subprocess.CalledProcessError) as exc:
self.loop.run_until_complete(
self.vm.run_service_for_stdio('test.service'))
func_mock.assert_called_once_with(
'test.service',
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
stdin=subprocess.PIPE)
communicate_mock.assert_called_once_with(input=None)
self.assertEqual(exc.exception.returncode, 1)
self.assertEqual(exc.exception.output, b'stdout')
self.assertEqual(exc.exception.stderr, b'stderr')

View File

@ -21,11 +21,16 @@
#
import base64
import os
import subprocess
import tempfile
import unittest
import uuid
import datetime
import asyncio
import functools
import lxml.etree
import unittest.mock
@ -1463,3 +1468,135 @@ class TC_90_QubesVM(QubesVMTestsMixin, qubes.tests.QubesTestCase):
lambda _: True):
netvm.create_qdb_entries()
self.assertEqual(test_qubesdb.data, expected)
@asyncio.coroutine
def coroutine_mock(self, mock, *args, **kwargs):
return mock(*args, **kwargs)
@unittest.mock.patch('asyncio.create_subprocess_exec')
def test_700_run_service(self, mock_subprocess):
func_mock = unittest.mock.Mock()
mock_subprocess.side_effect = functools.partial(
self.coroutine_mock, func_mock)
start_mock = unittest.mock.Mock()
vm = self.get_vm(cls=qubes.vm.standalonevm.StandaloneVM,
name='vm')
vm.is_running = lambda: True
vm.is_qrexec_running = lambda: True
vm.start = functools.partial(self.coroutine_mock, start_mock)
with self.subTest('running'):
self.loop.run_until_complete(vm.run_service('test.service'))
func_mock.assert_called_once_with(
'/usr/bin/qrexec-client', '-d', 'test-inst-vm',
'user:QUBESRPC test.service dom0')
self.assertFalse(start_mock.called)
func_mock.reset_mock()
start_mock.reset_mock()
with self.subTest('not_running'):
vm.is_running = lambda: False
with self.assertRaises(qubes.exc.QubesVMNotRunningError):
self.loop.run_until_complete(vm.run_service('test.service'))
self.assertFalse(func_mock.called)
func_mock.reset_mock()
start_mock.reset_mock()
with self.subTest('autostart'):
vm.is_running = lambda: False
self.loop.run_until_complete(vm.run_service(
'test.service', autostart=True))
func_mock.assert_called_once_with(
'/usr/bin/qrexec-client', '-d', 'test-inst-vm',
'user:QUBESRPC test.service dom0')
self.assertTrue(start_mock.called)
func_mock.reset_mock()
start_mock.reset_mock()
with self.subTest('no_qrexec'):
vm.is_running = lambda: True
vm.is_qrexec_running = lambda: False
with self.assertRaises(qubes.exc.QubesVMError):
self.loop.run_until_complete(vm.run_service('test.service'))
self.assertFalse(start_mock.called)
self.assertFalse(func_mock.called)
func_mock.reset_mock()
start_mock.reset_mock()
with self.subTest('other_user'):
vm.is_running = lambda: True
vm.is_qrexec_running = lambda: True
self.loop.run_until_complete(vm.run_service('test.service',
user='other'))
func_mock.assert_called_once_with(
'/usr/bin/qrexec-client', '-d', 'test-inst-vm',
'other:QUBESRPC test.service dom0')
self.assertFalse(start_mock.called)
func_mock.reset_mock()
start_mock.reset_mock()
with self.subTest('other_source'):
vm.is_running = lambda: True
vm.is_qrexec_running = lambda: True
self.loop.run_until_complete(vm.run_service('test.service',
source='test-inst-vm'))
func_mock.assert_called_once_with(
'/usr/bin/qrexec-client', '-d', 'test-inst-vm',
'user:QUBESRPC test.service test-inst-vm')
self.assertFalse(start_mock.called)
@unittest.mock.patch('qubes.vm.qubesvm.QubesVM.run_service')
def test_710_run_service_for_stdio(self, mock_run_service):
vm = self.get_vm(cls=qubes.vm.standalonevm.StandaloneVM,
name='vm')
func_mock = unittest.mock.Mock()
mock_run_service.side_effect = functools.partial(
self.coroutine_mock, func_mock)
communicate_mock = unittest.mock.Mock()
func_mock.return_value.communicate.side_effect = functools.partial(
self.coroutine_mock, communicate_mock)
communicate_mock.return_value = (b'stdout', b'stderr')
func_mock.return_value.returncode = 0
with self.subTest('default'):
value = self.loop.run_until_complete(
vm.run_service_for_stdio('test.service'))
func_mock.assert_called_once_with(
'test.service',
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
stdin=subprocess.PIPE)
communicate_mock.assert_called_once_with(input=None)
self.assertEqual(value, (b'stdout', b'stderr'))
func_mock.reset_mock()
communicate_mock.reset_mock()
with self.subTest('with_input'):
value = self.loop.run_until_complete(
vm.run_service_for_stdio('test.service', input=b'abc'))
func_mock.assert_called_once_with(
'test.service',
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
stdin=subprocess.PIPE)
communicate_mock.assert_called_once_with(input=b'abc')
self.assertEqual(value, (b'stdout', b'stderr'))
func_mock.reset_mock()
communicate_mock.reset_mock()
with self.subTest('error'):
func_mock.return_value.returncode = 1
with self.assertRaises(subprocess.CalledProcessError) as exc:
self.loop.run_until_complete(
vm.run_service_for_stdio('test.service'))
func_mock.assert_called_once_with(
'test.service',
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
stdin=subprocess.PIPE)
communicate_mock.assert_called_once_with(input=None)
self.assertEqual(exc.exception.returncode, 1)
self.assertEqual(exc.exception.output, b'stdout')
self.assertEqual(exc.exception.stderr, b'stderr')

View File

@ -21,7 +21,8 @@
#
''' This module contains the AdminVM implementation '''
import asyncio
import subprocess
import libvirt
import qubes
@ -212,6 +213,81 @@ class AdminVM(qubes.vm.BaseVM):
self._qdb_connection = qubesdb.QubesDB(self.name)
return self._qdb_connection
@asyncio.coroutine
def run_service(self, service, source=None, user=None,
filter_esc=False, autostart=False, gui=False, **kwargs):
'''Run service on this VM
:param str service: service name
:param qubes.vm.qubesvm.QubesVM source: source domain as presented to
this VM
:param str user: username to run service as
:param bool filter_esc: filter escape sequences to protect terminal \
emulator
:param bool autostart: if :py:obj:`True`, machine will be started if \
it is not running
:param bool gui: when autostarting, also start gui daemon
:rtype: asyncio.subprocess.Process
.. note::
User ``root`` is redefined to ``SYSTEM`` in the Windows agent code
'''
# pylint: disable=unused-argument
source = 'dom0' if source is None else self.app.domains[source].name
if filter_esc:
raise NotImplementedError(
'filter_esc=True not supported on calls to dom0')
if user is None:
user = 'root'
yield from self.fire_event_async('domain-cmd-pre-run', pre_event=True,
start_guid=gui)
if user != 'root':
cmd = ['runuser', '-u', user, '--']
else:
cmd = []
cmd.extend([
qubes.config.system_path['qrexec_rpc_multiplexer'],
service,
source,
'name',
self.name,
])
return (yield from asyncio.create_subprocess_exec(
*cmd,
**kwargs))
@asyncio.coroutine
def run_service_for_stdio(self, *args, input=None, **kwargs):
'''Run a service, pass an optional input and return (stdout, stderr).
Raises an exception if return code != 0.
*args* and *kwargs* are passed verbatim to :py:meth:`run_service`.
.. warning::
There are some combinations if stdio-related *kwargs*, which are
not filtered for problems originating between the keyboard and the
chair.
''' # pylint: disable=redefined-builtin
kwargs.setdefault('stdin', subprocess.PIPE)
kwargs.setdefault('stdout', subprocess.PIPE)
kwargs.setdefault('stderr', subprocess.PIPE)
p = yield from self.run_service(*args, **kwargs)
# this one is actually a tuple, but there is no need to unpack it
stdouterr = yield from p.communicate(input=input)
if p.returncode:
raise subprocess.CalledProcessError(p.returncode,
args[0], *stdouterr)
return stdouterr
# def __init__(self, **kwargs):
# super(QubesAdminVm, self).__init__(qid=0, name="dom0",