Start using 'async def' syntax

The Python version (and pylint) are new enough finally.
Adjust QubesVM.run* functions for now.
Python 3.8 brings also AsyncMock() which makes tests slightly less ugly.
This commit is contained in:
Marek Marczykowski-Górecki 2020-11-25 03:51:52 +01:00
parent 8b321e8fb0
commit 77323cd30d
No known key found for this signature in database
GPG Key ID: 063938BA42CFA724
4 changed files with 60 additions and 89 deletions

View File

@ -110,51 +110,41 @@ class TC_00_AdminVM(qubes.tests.QubesTestCase):
@unittest.mock.patch('asyncio.create_subprocess_exec')
def test_700_run_service(self, mock_subprocess):
func_mock = unittest.mock.Mock()
mock_subprocess.side_effect = functools.partial(
self.coroutine_mock, func_mock)
self.add_vm('vm')
with self.subTest('running'):
self.loop.run_until_complete(self.vm.run_service('test.service'))
func_mock.assert_called_once_with(
mock_subprocess.assert_called_once_with(
'/usr/lib/qubes/qubes-rpc-multiplexer',
'test.service', 'dom0', 'name', 'dom0')
func_mock.reset_mock()
mock_subprocess.reset_mock()
with self.subTest('other_user'):
self.loop.run_until_complete(
self.vm.run_service('test.service', user='other'))
func_mock.assert_called_once_with(
mock_subprocess.assert_called_once_with(
'runuser', '-u', 'other', '--',
'/usr/lib/qubes/qubes-rpc-multiplexer',
'test.service', 'dom0', 'name', 'dom0')
func_mock.reset_mock()
mock_subprocess.reset_mock()
with self.subTest('other_source'):
self.loop.run_until_complete(
self.vm.run_service('test.service', source='test-inst-vm'))
func_mock.assert_called_once_with(
mock_subprocess.assert_called_once_with(
'/usr/lib/qubes/qubes-rpc-multiplexer',
'test.service', 'test-inst-vm', 'name', 'dom0')
@unittest.mock.patch('qubes.vm.adminvm.AdminVM.run_service')
def test_710_run_service_for_stdio(self, mock_run_service):
func_mock = unittest.mock.Mock()
mock_run_service.side_effect = functools.partial(
self.coroutine_mock, func_mock)
communicate_mock = unittest.mock.Mock()
func_mock.return_value.communicate.side_effect = functools.partial(
self.coroutine_mock, communicate_mock)
communicate_mock = mock_run_service.return_value.communicate
communicate_mock.return_value = (b'stdout', b'stderr')
func_mock.return_value.returncode = 0
mock_run_service.return_value.returncode = 0
with self.subTest('default'):
value = self.loop.run_until_complete(
self.vm.run_service_for_stdio('test.service'))
func_mock.assert_called_once_with(
mock_run_service.assert_called_once_with(
'test.service',
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
@ -162,12 +152,12 @@ class TC_00_AdminVM(qubes.tests.QubesTestCase):
communicate_mock.assert_called_once_with(input=None)
self.assertEqual(value, (b'stdout', b'stderr'))
func_mock.reset_mock()
mock_run_service.reset_mock()
communicate_mock.reset_mock()
with self.subTest('with_input'):
value = self.loop.run_until_complete(
self.vm.run_service_for_stdio('test.service', input=b'abc'))
func_mock.assert_called_once_with(
mock_run_service.assert_called_once_with(
'test.service',
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
@ -175,14 +165,14 @@ class TC_00_AdminVM(qubes.tests.QubesTestCase):
communicate_mock.assert_called_once_with(input=b'abc')
self.assertEqual(value, (b'stdout', b'stderr'))
func_mock.reset_mock()
mock_run_service.reset_mock()
communicate_mock.reset_mock()
with self.subTest('error'):
func_mock.return_value.returncode = 1
mock_run_service.return_value.returncode = 1
with self.assertRaises(subprocess.CalledProcessError) as exc:
self.loop.run_until_complete(
self.vm.run_service_for_stdio('test.service'))
func_mock.assert_called_once_with(
mock_run_service.assert_called_once_with(
'test.service',
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,

View File

@ -2018,44 +2018,40 @@ class TC_90_QubesVM(QubesVMTestsMixin, qubes.tests.QubesTestCase):
@unittest.mock.patch('asyncio.create_subprocess_exec')
def test_700_run_service(self, mock_subprocess):
func_mock = unittest.mock.Mock()
mock_subprocess.side_effect = functools.partial(
self.coroutine_mock, func_mock)
start_mock = unittest.mock.Mock()
start_mock = unittest.mock.AsyncMock()
vm = self.get_vm(cls=qubes.vm.standalonevm.StandaloneVM,
name='vm')
vm.is_running = lambda: True
vm.is_qrexec_running = lambda: True
vm.start = functools.partial(self.coroutine_mock, start_mock)
vm.start = start_mock
with self.subTest('running'):
self.loop.run_until_complete(vm.run_service('test.service'))
func_mock.assert_called_once_with(
mock_subprocess.assert_called_once_with(
'/usr/bin/qrexec-client', '-d', 'test-inst-vm',
'user:QUBESRPC test.service dom0')
self.assertFalse(start_mock.called)
func_mock.reset_mock()
mock_subprocess.reset_mock()
start_mock.reset_mock()
with self.subTest('not_running'):
vm.is_running = lambda: False
with self.assertRaises(qubes.exc.QubesVMNotRunningError):
self.loop.run_until_complete(vm.run_service('test.service'))
self.assertFalse(func_mock.called)
self.assertFalse(mock_subprocess.called)
func_mock.reset_mock()
mock_subprocess.reset_mock()
start_mock.reset_mock()
with self.subTest('autostart'):
vm.is_running = lambda: False
self.loop.run_until_complete(vm.run_service(
'test.service', autostart=True))
func_mock.assert_called_once_with(
mock_subprocess.assert_called_once_with(
'/usr/bin/qrexec-client', '-d', 'test-inst-vm',
'user:QUBESRPC test.service dom0')
self.assertTrue(start_mock.called)
func_mock.reset_mock()
mock_subprocess.reset_mock()
start_mock.reset_mock()
with self.subTest('no_qrexec'):
vm.is_running = lambda: True
@ -2063,28 +2059,28 @@ class TC_90_QubesVM(QubesVMTestsMixin, qubes.tests.QubesTestCase):
with self.assertRaises(qubes.exc.QubesVMError):
self.loop.run_until_complete(vm.run_service('test.service'))
self.assertFalse(start_mock.called)
self.assertFalse(func_mock.called)
self.assertFalse(mock_subprocess.called)
func_mock.reset_mock()
mock_subprocess.reset_mock()
start_mock.reset_mock()
with self.subTest('other_user'):
vm.is_running = lambda: True
vm.is_qrexec_running = lambda: True
self.loop.run_until_complete(vm.run_service('test.service',
user='other'))
func_mock.assert_called_once_with(
mock_subprocess.assert_called_once_with(
'/usr/bin/qrexec-client', '-d', 'test-inst-vm',
'other:QUBESRPC test.service dom0')
self.assertFalse(start_mock.called)
func_mock.reset_mock()
mock_subprocess.reset_mock()
start_mock.reset_mock()
with self.subTest('other_source'):
vm.is_running = lambda: True
vm.is_qrexec_running = lambda: True
self.loop.run_until_complete(vm.run_service('test.service',
source='test-inst-vm'))
func_mock.assert_called_once_with(
mock_subprocess.assert_called_once_with(
'/usr/bin/qrexec-client', '-d', 'test-inst-vm',
'user:QUBESRPC test.service test-inst-vm')
self.assertFalse(start_mock.called)
@ -2094,19 +2090,14 @@ class TC_90_QubesVM(QubesVMTestsMixin, qubes.tests.QubesTestCase):
vm = self.get_vm(cls=qubes.vm.standalonevm.StandaloneVM,
name='vm')
func_mock = unittest.mock.Mock()
mock_run.side_effect = functools.partial(
self.coroutine_mock, func_mock)
communicate_mock = unittest.mock.Mock()
func_mock.return_value.communicate.side_effect = functools.partial(
self.coroutine_mock, communicate_mock)
communicate_mock = mock_run.return_value.communicate
communicate_mock.return_value = (b'stdout', b'stderr')
func_mock.return_value.returncode = 0
mock_run.return_value.returncode = 0
with self.subTest('default'):
value = self.loop.run_until_complete(
vm.run_for_stdio('cat'))
func_mock.assert_called_once_with(
mock_run.assert_called_once_with(
'cat',
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
@ -2114,12 +2105,12 @@ class TC_90_QubesVM(QubesVMTestsMixin, qubes.tests.QubesTestCase):
communicate_mock.assert_called_once_with(input=b'')
self.assertEqual(value, (b'stdout', b'stderr'))
func_mock.reset_mock()
mock_run.reset_mock()
communicate_mock.reset_mock()
with self.subTest('with_input'):
value = self.loop.run_until_complete(
vm.run_for_stdio('cat', input=b'abc'))
func_mock.assert_called_once_with(
mock_run.assert_called_once_with(
'cat',
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
@ -2127,14 +2118,14 @@ class TC_90_QubesVM(QubesVMTestsMixin, qubes.tests.QubesTestCase):
communicate_mock.assert_called_once_with(input=b'abc')
self.assertEqual(value, (b'stdout', b'stderr'))
func_mock.reset_mock()
mock_run.reset_mock()
communicate_mock.reset_mock()
with self.subTest('error'):
func_mock.return_value.returncode = 1
mock_run.return_value.returncode = 1
with self.assertRaises(subprocess.CalledProcessError) as exc:
self.loop.run_until_complete(
vm.run_for_stdio('cat'))
func_mock.assert_called_once_with(
mock_run.assert_called_once_with(
'cat',
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
@ -2149,19 +2140,14 @@ class TC_90_QubesVM(QubesVMTestsMixin, qubes.tests.QubesTestCase):
vm = self.get_vm(cls=qubes.vm.standalonevm.StandaloneVM,
name='vm')
func_mock = unittest.mock.Mock()
mock_run_service.side_effect = functools.partial(
self.coroutine_mock, func_mock)
communicate_mock = unittest.mock.Mock()
func_mock.return_value.communicate.side_effect = functools.partial(
self.coroutine_mock, communicate_mock)
communicate_mock = mock_run_service.return_value.communicate
communicate_mock.return_value = (b'stdout', b'stderr')
func_mock.return_value.returncode = 0
mock_run_service.return_value.returncode = 0
with self.subTest('default'):
value = self.loop.run_until_complete(
vm.run_service_for_stdio('test.service'))
func_mock.assert_called_once_with(
mock_run_service.assert_called_once_with(
'test.service',
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
@ -2169,12 +2155,12 @@ class TC_90_QubesVM(QubesVMTestsMixin, qubes.tests.QubesTestCase):
communicate_mock.assert_called_once_with(input=b'')
self.assertEqual(value, (b'stdout', b'stderr'))
func_mock.reset_mock()
mock_run_service.reset_mock()
communicate_mock.reset_mock()
with self.subTest('with_input'):
value = self.loop.run_until_complete(
vm.run_service_for_stdio('test.service', input=b'abc'))
func_mock.assert_called_once_with(
mock_run_service.assert_called_once_with(
'test.service',
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
@ -2182,14 +2168,14 @@ class TC_90_QubesVM(QubesVMTestsMixin, qubes.tests.QubesTestCase):
communicate_mock.assert_called_once_with(input=b'abc')
self.assertEqual(value, (b'stdout', b'stderr'))
func_mock.reset_mock()
mock_run_service.reset_mock()
communicate_mock.reset_mock()
with self.subTest('error'):
func_mock.return_value.returncode = 1
mock_run_service.return_value.returncode = 1
with self.assertRaises(subprocess.CalledProcessError) as exc:
self.loop.run_until_complete(
vm.run_service_for_stdio('test.service'))
func_mock.assert_called_once_with(
mock_run_service.assert_called_once_with(
'test.service',
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,

View File

@ -237,8 +237,7 @@ class AdminVM(qubes.vm.BaseVM):
self._qdb_connection = qubesdb.QubesDB(self.name)
return self._qdb_connection
@asyncio.coroutine
def run_service(self, service, source=None, user=None,
async def run_service(self, service, source=None, user=None,
filter_esc=False, autostart=False, gui=False, **kwargs):
'''Run service on this VM
@ -267,7 +266,7 @@ class AdminVM(qubes.vm.BaseVM):
if user is None:
user = 'root'
yield from self.fire_event_async('domain-cmd-pre-run', pre_event=True,
await self.fire_event_async('domain-cmd-pre-run', pre_event=True,
start_guid=gui)
if user != 'root':
@ -281,12 +280,11 @@ class AdminVM(qubes.vm.BaseVM):
'name',
self.name,
])
return (yield from asyncio.create_subprocess_exec(
return (await asyncio.create_subprocess_exec(
*cmd,
**kwargs))
@asyncio.coroutine
def run_service_for_stdio(self, *args, input=None, **kwargs):
async def run_service_for_stdio(self, *args, input=None, **kwargs):
'''Run a service, pass an optional input and return (stdout, stderr).
Raises an exception if return code != 0.
@ -302,10 +300,10 @@ class AdminVM(qubes.vm.BaseVM):
kwargs.setdefault('stdin', subprocess.PIPE)
kwargs.setdefault('stdout', subprocess.PIPE)
kwargs.setdefault('stderr', subprocess.PIPE)
p = yield from self.run_service(*args, **kwargs)
p = await self.run_service(*args, **kwargs)
# this one is actually a tuple, but there is no need to unpack it
stdouterr = yield from p.communicate(input=input)
stdouterr = await p.communicate(input=input)
if p.returncode:
raise subprocess.CalledProcessError(p.returncode,

View File

@ -1415,8 +1415,7 @@ class QubesVM(qubes.vm.mix.net.NetVMMixin, qubes.vm.BaseVM):
return self
@asyncio.coroutine
def run_service(self, service, source=None, user=None,
async def run_service(self, service, source=None, user=None,
filter_esc=False, autostart=False, gui=False, **kwargs):
"""Run service on this VM
@ -1456,24 +1455,23 @@ class QubesVM(qubes.vm.mix.net.NetVMMixin, qubes.vm.BaseVM):
if not self.is_running():
if not autostart:
raise qubes.exc.QubesVMNotRunningError(self)
yield from self.start(start_guid=gui)
await self.start(start_guid=gui)
if not self.is_qrexec_running():
raise qubes.exc.QubesVMError(
self, 'Domain {!r}: qrexec not connected'.format(self.name))
yield from self.fire_event_async('domain-cmd-pre-run', pre_event=True,
await self.fire_event_async('domain-cmd-pre-run', pre_event=True,
start_guid=gui)
return (yield from asyncio.create_subprocess_exec(
return (await asyncio.create_subprocess_exec(
qubes.config.system_path['qrexec_client_path'],
'-d', str(self.name),
*(('-t', '-T') if filter_esc else ()),
'{}:QUBESRPC {} {}'.format(user, service, source),
**kwargs))
@asyncio.coroutine
def run_service_for_stdio(self, *args, input=None, **kwargs):
async def run_service_for_stdio(self, *args, input=None, **kwargs):
"""Run a service, pass an optional input and return (stdout, stderr).
Raises an exception if return code != 0.
@ -1492,10 +1490,10 @@ class QubesVM(qubes.vm.mix.net.NetVMMixin, qubes.vm.BaseVM):
if kwargs['stdin'] == subprocess.PIPE and input is None:
# workaround for https://bugs.python.org/issue39744
input = b''
p = yield from self.run_service(*args, **kwargs)
p = await self.run_service(*args, **kwargs)
# this one is actually a tuple, but there is no need to unpack it
stdouterr = yield from p.communicate(input=input)
stdouterr = await p.communicate(input=input)
if p.returncode:
raise subprocess.CalledProcessError(p.returncode,
@ -1503,7 +1501,7 @@ class QubesVM(qubes.vm.mix.net.NetVMMixin, qubes.vm.BaseVM):
return stdouterr
def run(self, command, user=None, **kwargs):
async def run(self, command, user=None, **kwargs):
"""Run a shell command inside the domain using qrexec.
This method is a coroutine.
@ -1512,14 +1510,13 @@ class QubesVM(qubes.vm.mix.net.NetVMMixin, qubes.vm.BaseVM):
if user is None:
user = self.default_user
return asyncio.create_subprocess_exec(
return await asyncio.create_subprocess_exec(
qubes.config.system_path['qrexec_client_path'],
'-d', str(self.name),
'{}:{}'.format(user, command),
**kwargs)
@asyncio.coroutine
def run_for_stdio(self, *args, input=None, **kwargs):
async def run_for_stdio(self, *args, input=None, **kwargs):
"""Run a shell command inside the domain using qrexec.
This method is a coroutine.
@ -1531,8 +1528,8 @@ class QubesVM(qubes.vm.mix.net.NetVMMixin, qubes.vm.BaseVM):
if kwargs['stdin'] == subprocess.PIPE and input is None:
# workaround for https://bugs.python.org/issue39744
input = b''
p = yield from self.run(*args, **kwargs)
stdouterr = yield from p.communicate(input=input)
p = await self.run(*args, **kwargs)
stdouterr = await p.communicate(input=input)
if p.returncode:
raise subprocess.CalledProcessError(p.returncode,