tests: QubesVM.run_service function

This commit is contained in:
Marek Marczykowski-Górecki 2019-06-09 23:03:15 +02:00
parent 50adc60882
commit e352fc25d7
No known key found for this signature in database
GPG Key ID: 063938BA42CFA724

View File

@ -21,11 +21,16 @@
# #
import base64 import base64
import os import os
import subprocess
import tempfile import tempfile
import unittest import unittest
import uuid import uuid
import datetime import datetime
import asyncio
import functools
import lxml.etree import lxml.etree
import unittest.mock import unittest.mock
@ -1463,3 +1468,135 @@ class TC_90_QubesVM(QubesVMTestsMixin, qubes.tests.QubesTestCase):
lambda _: True): lambda _: True):
netvm.create_qdb_entries() netvm.create_qdb_entries()
self.assertEqual(test_qubesdb.data, expected) self.assertEqual(test_qubesdb.data, expected)
@asyncio.coroutine
def coroutine_mock(self, mock, *args, **kwargs):
return mock(*args, **kwargs)
@unittest.mock.patch('asyncio.create_subprocess_exec')
def test_700_run_service(self, mock_subprocess):
func_mock = unittest.mock.Mock()
mock_subprocess.side_effect = functools.partial(
self.coroutine_mock, func_mock)
start_mock = unittest.mock.Mock()
vm = self.get_vm(cls=qubes.vm.standalonevm.StandaloneVM,
name='vm')
vm.is_running = lambda: True
vm.is_qrexec_running = lambda: True
vm.start = functools.partial(self.coroutine_mock, start_mock)
with self.subTest('running'):
self.loop.run_until_complete(vm.run_service('test.service'))
func_mock.assert_called_once_with(
'/usr/bin/qrexec-client', '-d', 'test-inst-vm',
'user:QUBESRPC test.service dom0')
self.assertFalse(start_mock.called)
func_mock.reset_mock()
start_mock.reset_mock()
with self.subTest('not_running'):
vm.is_running = lambda: False
with self.assertRaises(qubes.exc.QubesVMNotRunningError):
self.loop.run_until_complete(vm.run_service('test.service'))
self.assertFalse(func_mock.called)
func_mock.reset_mock()
start_mock.reset_mock()
with self.subTest('autostart'):
vm.is_running = lambda: False
self.loop.run_until_complete(vm.run_service(
'test.service', autostart=True))
func_mock.assert_called_once_with(
'/usr/bin/qrexec-client', '-d', 'test-inst-vm',
'user:QUBESRPC test.service dom0')
self.assertTrue(start_mock.called)
func_mock.reset_mock()
start_mock.reset_mock()
with self.subTest('no_qrexec'):
vm.is_running = lambda: True
vm.is_qrexec_running = lambda: False
with self.assertRaises(qubes.exc.QubesVMError):
self.loop.run_until_complete(vm.run_service('test.service'))
self.assertFalse(start_mock.called)
self.assertFalse(func_mock.called)
func_mock.reset_mock()
start_mock.reset_mock()
with self.subTest('other_user'):
vm.is_running = lambda: True
vm.is_qrexec_running = lambda: True
self.loop.run_until_complete(vm.run_service('test.service',
user='other'))
func_mock.assert_called_once_with(
'/usr/bin/qrexec-client', '-d', 'test-inst-vm',
'other:QUBESRPC test.service dom0')
self.assertFalse(start_mock.called)
func_mock.reset_mock()
start_mock.reset_mock()
with self.subTest('other_source'):
vm.is_running = lambda: True
vm.is_qrexec_running = lambda: True
self.loop.run_until_complete(vm.run_service('test.service',
source='test-inst-vm'))
func_mock.assert_called_once_with(
'/usr/bin/qrexec-client', '-d', 'test-inst-vm',
'user:QUBESRPC test.service test-inst-vm')
self.assertFalse(start_mock.called)
@unittest.mock.patch('qubes.vm.qubesvm.QubesVM.run_service')
def test_710_run_service_for_stdio(self, mock_run_service):
vm = self.get_vm(cls=qubes.vm.standalonevm.StandaloneVM,
name='vm')
func_mock = unittest.mock.Mock()
mock_run_service.side_effect = functools.partial(
self.coroutine_mock, func_mock)
communicate_mock = unittest.mock.Mock()
func_mock.return_value.communicate.side_effect = functools.partial(
self.coroutine_mock, communicate_mock)
communicate_mock.return_value = (b'stdout', b'stderr')
func_mock.return_value.returncode = 0
with self.subTest('default'):
value = self.loop.run_until_complete(
vm.run_service_for_stdio('test.service'))
func_mock.assert_called_once_with(
'test.service',
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
stdin=subprocess.PIPE)
communicate_mock.assert_called_once_with(input=None)
self.assertEqual(value, (b'stdout', b'stderr'))
func_mock.reset_mock()
communicate_mock.reset_mock()
with self.subTest('with_input'):
value = self.loop.run_until_complete(
vm.run_service_for_stdio('test.service', input=b'abc'))
func_mock.assert_called_once_with(
'test.service',
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
stdin=subprocess.PIPE)
communicate_mock.assert_called_once_with(input=b'abc')
self.assertEqual(value, (b'stdout', b'stderr'))
func_mock.reset_mock()
communicate_mock.reset_mock()
with self.subTest('error'):
func_mock.return_value.returncode = 1
with self.assertRaises(subprocess.CalledProcessError) as exc:
self.loop.run_until_complete(
vm.run_service_for_stdio('test.service'))
func_mock.assert_called_once_with(
'test.service',
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
stdin=subprocess.PIPE)
communicate_mock.assert_called_once_with(input=None)
self.assertEqual(exc.exception.returncode, 1)
self.assertEqual(exc.exception.output, b'stdout')
self.assertEqual(exc.exception.stderr, b'stderr')