Merge remote-tracking branch 'qubesos/pr/174'

* qubesos/pr/174:
  tests: fix (system) network tests after switching to ipaddress module
  tests: resurrect extra tests loader
  tests: basic salt integration tests
This commit is contained in:
Marek Marczykowski-Górecki 2018-01-15 04:20:32 +01:00
commit b20c3d3458
No known key found for this signature in database
GPG Key ID: 063938BA42CFA724
5 changed files with 489 additions and 13 deletions

View File

@ -1198,12 +1198,13 @@ def load_tests(loader, tests, pattern): # pylint: disable=unused-argument
'qubes.tests.integ.network',
'qubes.tests.integ.dispvm',
'qubes.tests.integ.vm_qrexec_gui',
'qubes.tests.integ.salt',
'qubes.tests.integ.backup',
'qubes.tests.integ.backupcompatibility',
# 'qubes.tests.regressions',
# external modules
# 'qubes.tests.extra',
'qubes.tests.extra',
):
tests.addTests(loader.loadTestsFromName(modname))

View File

@ -19,10 +19,71 @@
#
import sys
import asyncio
import subprocess
import pkg_resources
import qubes.tests
import qubes.vm.appvm
class ProcessWrapper(object):
def __init__(self, proc, loop=None):
self._proc = proc
self._loop = loop or asyncio.get_event_loop()
def __getattr__(self, item):
return getattr(self._proc, item)
def __setattr__(self, key, value):
if key.startswith('_'):
return super(ProcessWrapper, self).__setattr__(key, value)
return setattr(self._proc, key, value)
def communicate(self, input=None):
return self._loop.run_until_complete(self._proc.communicate(input))
class VMWrapper(object):
'''Wrap VM object to provide stable API for basic operations'''
def __init__(self, vm, loop=None):
self._vm = vm
self._loop = loop or asyncio.get_event_loop()
def __getattr__(self, item):
return getattr(self._vm, item)
def __setattr__(self, key, value):
if key.startswith('_'):
return super(VMWrapper, self).__setattr__(key, value)
return setattr(self._vm, key, value)
def __str__(self):
return str(self._vm)
def __eq__(self, other):
return self._vm == other
def start(self):
return self._loop.run_until_complete(self._vm.start())
def shutdown(self):
return self._loop.run_until_complete(self._vm.shutdown())
def run(self, command, wait=False, user=None, passio_popen=False,
passio_stderr=False, **kwargs):
if wait:
try:
self._loop.run_until_complete(
self._vm.run_for_stdio(command, user=user))
except subprocess.CalledProcessError as err:
return err.returncode
return 0
elif passio_popen:
p = self._loop.run_until_complete(self._vm.run(command, user=user,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE if passio_stderr else None))
return ProcessWrapper(p, self._loop)
class ExtraTestCase(qubes.tests.SystemTestCase):
@ -31,6 +92,17 @@ class ExtraTestCase(qubes.tests.SystemTestCase):
def setUp(self):
super(ExtraTestCase, self).setUp()
self.init_default_template(self.template)
if self.template is not None:
# also use this template for DispVMs
dispvm_base = self.app.add_new_vm('AppVM',
name=self.make_vm_name('dvm'),
template=self.template, label='red', template_for_dispvms=True)
self.loop.run_until_complete(dispvm_base.create_on_disk())
self.app.default_dispvm = dispvm_base
def tearDown(self):
self.app.default_dispvm = None
super(ExtraTestCase, self).tearDown()
def create_vms(self, names):
"""
@ -49,13 +121,14 @@ class ExtraTestCase(qubes.tests.SystemTestCase):
name=self.make_vm_name(vmname),
template=template,
label='red')
vm.create_on_disk()
self.loop.run_until_complete(vm.create_on_disk())
self.app.save()
# get objects after reload
vms = []
for vmname in names:
vms.append(self.app.domains[self.make_vm_name(vmname)])
vms.append(VMWrapper(self.app.domains[self.make_vm_name(vmname)],
loop=self.loop))
return vms
def enable_network(self):
@ -97,6 +170,6 @@ def load_tests(loader, tests, pattern):
ExtraForTemplateLoadFailure = type('ExtraForTemplateLoadFailure',
(qubes.tests.QubesTestCase,),
{entry.name: runTest})
tests.addTest(ExtraLoadFailure(entry.name))
tests.addTest(ExtraForTemplateLoadFailure(entry.name))
return tests

View File

@ -379,7 +379,7 @@ class VmNetworkingMixin(object):
output = output.decode()
self.assertIn('192.168.1.128', output)
self.assertNotIn(self.testvm1.ip, output)
self.assertNotIn(str(self.testvm1.ip), output)
try:
(output, _) = self.loop.run_until_complete(
@ -389,7 +389,7 @@ class VmNetworkingMixin(object):
output = output.decode()
self.assertIn('192.168.1.1', output)
self.assertNotIn(self.testvm1.netvm.ip, output)
self.assertNotIn(str(self.testvm1.netvm.ip), output)
def test_201_fake_ip_without_gw(self):
'''Test hiding VM real IP'''
@ -408,7 +408,7 @@ class VmNetworkingMixin(object):
output = output.decode()
self.assertIn('192.168.1.128', output)
self.assertNotIn(self.testvm1.ip, output)
self.assertNotIn(str(self.testvm1.ip), output)
def test_202_fake_ip_firewall(self):
'''Test hiding VM real IP, firewall'''
@ -546,7 +546,7 @@ class VmNetworkingMixin(object):
self.fail('ip addr show dev eth0 failed')
output = output.decode()
self.assertIn('192.168.1.128', output)
self.assertNotIn(self.testvm1.ip, output)
self.assertNotIn(str(self.testvm1.ip), output)
try:
(output, _) = self.loop.run_until_complete(
@ -556,7 +556,7 @@ class VmNetworkingMixin(object):
self.fail('ip route show failed')
output = output.decode()
self.assertIn('192.168.1.1', output)
self.assertNotIn(self.testvm1.netvm.ip, output)
self.assertNotIn(str(self.testvm1.netvm.ip), output)
try:
(output, _) = self.loop.run_until_complete(
@ -566,7 +566,7 @@ class VmNetworkingMixin(object):
self.fail('ip addr show dev eth0 failed')
output = output.decode()
self.assertNotIn('192.168.1.128', output)
self.assertIn(self.testvm1.ip, output)
self.assertIn(str(self.testvm1.ip), output)
try:
(output, _) = self.loop.run_until_complete(
@ -576,7 +576,7 @@ class VmNetworkingMixin(object):
self.fail('ip route show failed')
output = output.decode()
self.assertIn('192.168.1.128', output)
self.assertNotIn(self.proxy.ip, output)
self.assertNotIn(str(self.proxy.ip), output)
def test_210_custom_ip_simple(self):
'''Custom AppVM IP'''
@ -932,8 +932,8 @@ class VmIPv6NetworkingMixin(VmNetworkingMixin):
'ip -6 addr flush dev eth0 && '
'ip -6 addr add {}/128 dev eth0 && '
'ip -6 route add default via {} dev eth0'.format(
self.testvm1.visible_ip6 + '1',
self.testvm1.visible_gateway6),
str(self.testvm1.visible_ip6) + '1',
str(self.testvm1.visible_gateway6)),
user='root'))
self.assertNotEqual(self.run_cmd(self.testvm1, self.ping6_ip), 0,
"Spoofed ping should be blocked")

401
qubes/tests/integ/salt.py Normal file
View File

@ -0,0 +1,401 @@
# -*- encoding: utf-8 -*-
#
# The Qubes OS Project, http://www.qubes-os.org
#
# Copyright (C) 2017 Marek Marczykowski-Górecki
# <marmarek@invisiblethingslab.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, see <http://www.gnu.org/licenses/>.
import os
import subprocess
import json
import shutil
import asyncio
import qubes.tests
class SaltTestMixin(object):
def setUp(self):
super().setUp()
self.salt_testdir = '/srv/salt/test_salt'
os.makedirs(self.salt_testdir, exist_ok=True)
def tearDown(self):
shutil.rmtree(self.salt_testdir)
try:
tops = '/srv/salt/_tops/base'
for top_link in os.listdir(tops):
path = os.path.join(tops, top_link)
target = os.readlink(path)
if target.startswith(self.salt_testdir):
os.unlink(path)
except FileNotFoundError:
pass
super().tearDown()
def salt_call(self, cmd):
full_cmd = ['qubesctl']
if '--dom0-only' in cmd:
full_cmd.insert(1, '--dom0-only')
cmd.remove('--dom0-only')
full_cmd.extend(cmd)
full_cmd.append('--out=json')
p = self.loop.run_until_complete(asyncio.create_subprocess_exec(
*full_cmd, stdout=subprocess.PIPE))
output, _ = self.loop.run_until_complete(p.communicate())
if p.returncode != 0:
raise subprocess.CalledProcessError(p.returncode, full_cmd, output)
return output.decode()
def dom0_salt_call_json(self, cmd):
return json.loads(self.salt_call(['--dom0-only'] + cmd))
class TC_00_Dom0(SaltTestMixin, qubes.tests.SystemTestCase):
def test_000_top_enable_disable(self):
with open(os.path.join(self.salt_testdir, 'something.sls'), 'w') as f:
f.write('test-top-enable:\n')
f.write(' test.succeed_without_changes: []\n')
with open(os.path.join(self.salt_testdir, 'something.top'), 'w') as f:
f.write('base:\n')
f.write(' dom0:\n')
f.write(' - test_salt.something\n')
cmd_output = self.dom0_salt_call_json(
['top.enable', 'test_salt.something'])
self.assertEqual(cmd_output,
{'local': {'test_salt.something.top': {'status': 'enabled'}}})
cmd_output = self.dom0_salt_call_json(['state.show_top'])
self.assertIn('local', cmd_output)
self.assertIn('base', cmd_output['local'])
self.assertIn('test_salt.something', cmd_output['local']['base'])
cmd_output = self.dom0_salt_call_json(
['top.disable', 'test_salt.something'])
#self.assertEqual(cmd_output,
# {'local': {'test_salt.something.top': {'status': 'disabled'}}})
cmd_output = self.dom0_salt_call_json(['state.show_top'])
self.assertIn('local', cmd_output)
self.assertIn('base', cmd_output['local'])
self.assertNotIn('test_salt.something', cmd_output['local']['base'])
def test_001_state_sls(self):
with open(os.path.join(self.salt_testdir, 'something.sls'), 'w') as f:
f.write('test-top-enable:\n')
f.write(' test.succeed_without_changes: []\n')
cmd_output = self.dom0_salt_call_json(
['state.sls', 'test_salt.something'])
state_id = 'test_|-test-top-enable_|-test-top-enable_|-succeed_without_changes'
self.assertIn('local', cmd_output)
self.assertIn(state_id, cmd_output['local'])
self.assertIn('start_time', cmd_output['local'][state_id])
del cmd_output['local'][state_id]['start_time']
self.assertIn('duration', cmd_output['local'][state_id])
del cmd_output['local'][state_id]['duration']
self.assertEqual(cmd_output,
{'local': {state_id: {
'name': 'test-top-enable',
'comment': 'Success!',
'result': True,
'__run_num__': 0,
'__sls__': 'test_salt.something',
'changes': {},
'__id__': 'test-top-enable'
}}})
def test_010_create_vm(self):
vmname = self.make_vm_name('appvm')
with open(os.path.join(self.salt_testdir, 'create_vm.sls'), 'w') as f:
f.write(vmname + ':\n')
f.write(' qvm.vm:\n')
f.write(' - present:\n')
f.write(' - label: orange\n')
f.write(' - prefs:\n')
f.write(' - vcpus: 1\n')
cmd_output = self.dom0_salt_call_json(
['state.sls', 'test_salt.create_vm'])
state_out = list(cmd_output['local'].values())[0]
del state_out['start_time']
del state_out['duration']
self.assertEqual(state_out, {
'comment': '====== [\'present\'] ======\n'
'/usr/bin/qvm-create {} --class=AppVM --label=orange \n'
'\n'
'====== [\'prefs\'] ======\n'.format(vmname),
'name': vmname,
'result': True,
'changes': {
'qvm.prefs': {'qvm.create': {
'vcpus': {'new': 1, 'old': '*default*'}
}
},
},
'__sls__': 'test_salt.create_vm',
'__run_num__': 0,
'__id__': vmname,
})
self.assertIn(vmname, self.app.domains)
vm = self.app.domains[vmname]
self.assertEqual(str(vm.label), 'orange')
self.assertEqual(vm.vcpus, 1)
def test_011_set_prefs(self):
vmname = self.make_vm_name('appvm')
vm = self.app.add_new_vm('AppVM', label='red',
name=vmname)
self.loop.run_until_complete(vm.create_on_disk())
with open(os.path.join(self.salt_testdir, 'create_vm.sls'), 'w') as f:
f.write(vmname + ':\n')
f.write(' qvm.vm:\n')
f.write(' - present:\n')
f.write(' - label: orange\n')
f.write(' - prefs:\n')
f.write(' - vcpus: 1\n')
cmd_output = self.dom0_salt_call_json(
['state.sls', 'test_salt.create_vm'])
state_out = list(cmd_output['local'].values())[0]
del state_out['start_time']
del state_out['duration']
self.assertEqual(state_out, {
'comment': '====== [\'present\'] ======\n'
'[SKIP] A VM with the name \'{}\' already exists.\n'
'\n'
'====== [\'prefs\'] ======\n'.format(vmname),
'name': vmname,
'result': True,
'changes': {
'qvm.prefs': {'qvm.create': {
'vcpus': {'new': 1, 'old': '*default*'}
}
},
},
'__sls__': 'test_salt.create_vm',
'__run_num__': 0,
'__id__': vmname,
})
self.assertIn(vmname, self.app.domains)
vm = self.app.domains[vmname]
self.assertEqual(str(vm.label), 'red')
self.assertEqual(vm.vcpus, 1)
def test_012_tags(self):
vmname = self.make_vm_name('appvm')
vm = self.app.add_new_vm('AppVM', label='red',
name=vmname)
self.loop.run_until_complete(vm.create_on_disk())
vm.tags.add('tag1')
vm.tags.add('tag3')
with open(os.path.join(self.salt_testdir, 'test_state.sls'), 'w') as f:
f.write(vmname + ':\n')
f.write(' qvm.vm:\n')
f.write(' - tags:\n')
f.write(' - present:\n')
f.write(' - tag1\n')
f.write(' - tag2\n')
f.write(' - absent:\n')
f.write(' - tag3\n')
f.write(' - tag4\n')
cmd_output = self.dom0_salt_call_json(
['state.sls', 'test_salt.test_state'])
state_out = list(cmd_output['local'].values())[0]
del state_out['start_time']
del state_out['duration']
self.assertEqual(state_out, {
'comment': '====== [\'tags\'] ======\n',
'name': vmname,
'result': True,
'changes': {
'qvm.tags': {'qvm.tags': {
'new': ['tag1', 'tag2'], 'old': ['tag1', 'tag3'],
}
},
},
'__sls__': 'test_salt.test_state',
'__run_num__': 0,
'__id__': vmname,
})
self.assertIn(vmname, self.app.domains)
vm = self.app.domains[vmname]
self.assertEqual(set(vm.tags), {'tag1', 'tag2'})
def test_020_qubes_pillar(self):
vmname = self.make_vm_name('appvm')
vm = self.app.add_new_vm('AppVM', label='red',
name=vmname)
self.loop.run_until_complete(vm.create_on_disk())
cmd_output = self.dom0_salt_call_json(
['pillar.items', '--id=' + vmname])
self.assertIn('local', cmd_output)
self.assertIn('qubes', cmd_output['local'])
qubes_pillar = cmd_output['local']['qubes']
self.assertEqual(qubes_pillar, {
'type': 'app',
'netvm': str(vm.netvm),
'template': str(vm.template),
})
class SaltVMTestMixin(SaltTestMixin):
template = None
def setUp(self):
if self.template.startswith('whonix'):
self.skipTest('Whonix not supported as salt VM')
super(SaltVMTestMixin, self).setUp()
self.init_default_template(self.template)
dispvm_tpl_name = self.make_vm_name('disp-tpl')
dispvm_tpl = self.app.add_new_vm('AppVM', label='red',
template_for_dispvms=True, name=dispvm_tpl_name)
self.loop.run_until_complete(dispvm_tpl.create_on_disk())
self.app.default_dispvm = dispvm_tpl
def tearDown(self):
self.app.default_dispvm = None
super(SaltVMTestMixin, self).tearDown()
def test_000_simple_sls(self):
vmname = self.make_vm_name('target')
self.vm = self.app.add_new_vm('AppVM', name=vmname, label='red')
self.loop.run_until_complete(self.vm.create_on_disk())
# start the VM manually, so it stays running after applying salt state
self.loop.run_until_complete(self.vm.start())
with open(os.path.join(self.salt_testdir, 'something.sls'), 'w') as f:
f.write('/home/user/testfile:\n')
f.write(' file.managed:\n')
f.write(' - contents: |\n')
f.write(' this is test\n')
with open(os.path.join(self.salt_testdir, 'something.top'), 'w') as f:
f.write('base:\n')
f.write(' {}:\n'.format(vmname))
f.write(' - test_salt.something\n')
# enable so state.show_top will not be empty, otherwise qubesctl will
# skip the VM; but we don't use state.highstate
self.dom0_salt_call_json(['top.enable', 'test_salt.something'])
state_output = self.salt_call(
['--skip-dom0', '--show-output', '--targets=' + vmname,
'state.sls', 'test_salt.something'])
expected_output = vmname + ':\n'
self.assertTrue(state_output.startswith(expected_output),
'Full output: ' + state_output)
state_id = 'file_|-/home/user/testfile_|-/home/user/testfile_|-managed'
# drop the header
state_output_json = json.loads(state_output[len(expected_output):])
state_output_json = state_output_json[vmname][state_id]
try:
del state_output_json['duration']
del state_output_json['start_time']
except KeyError:
pass
try:
del state_output_json['pchanges']
except KeyError:
pass
try:
# older salt do not report this
self.assertEqual(state_output_json['__id__'], '/home/user/testfile')
del state_output_json['__id__']
except KeyError:
pass
try:
# or sls file
self.assertEqual(state_output_json['__sls__'],
'test_salt.something')
del state_output_json['__sls__']
except KeyError:
pass
# different output depending on salt version
expected_output = {
'__run_num__': 0,
'changes': {
'diff': 'New file',
},
'name': '/home/user/testfile',
'comment': 'File /home/user/testfile updated',
'result': True,
}
self.assertEqual(state_output_json, expected_output)
stdout, stderr = self.loop.run_until_complete(self.vm.run_for_stdio(
'cat /home/user/testfile'))
self.assertEqual(stdout, b'this is test\n')
self.assertEqual(stderr, b'')
def test_001_multi_state_highstate(self):
vmname = self.make_vm_name('target')
self.vm = self.app.add_new_vm('AppVM', name=vmname, label='red')
self.loop.run_until_complete(self.vm.create_on_disk())
# start the VM manually, so it stays running after applying salt state
self.loop.run_until_complete(self.vm.start())
states = ('something', 'something2')
for state in states:
with open(os.path.join(self.salt_testdir, state + '.sls'), 'w') as f:
f.write('/home/user/{}:\n'.format(state))
f.write(' file.managed:\n')
f.write(' - contents: |\n')
f.write(' this is test\n')
with open(os.path.join(self.salt_testdir, state + '.top'), 'w') as f:
f.write('base:\n')
f.write(' {}:\n'.format(vmname))
f.write(' - test_salt.{}\n'.format(state))
self.dom0_salt_call_json(['top.enable', 'test_salt.something'])
self.dom0_salt_call_json(['top.enable', 'test_salt.something2'])
state_output = self.salt_call(
['--skip-dom0', '--show-output', '--targets=' + vmname,
'state.highstate'])
expected_output = vmname + ':\n'
self.assertTrue(state_output.startswith(expected_output),
'Full output: ' + state_output)
state_output_json = json.loads(state_output[len(expected_output):])
for state in states:
state_id = \
'file_|-/home/user/{0}_|-/home/user/{0}_|-managed'.format(state)
# drop the header
self.assertIn(state_id, state_output_json[vmname])
state_output_single = state_output_json[vmname][state_id]
self.assertTrue(state_output_single['result'])
self.assertNotEqual(state_output_single['changes'], {})
stdout, stderr = self.loop.run_until_complete(self.vm.run_for_stdio(
'cat /home/user/' + state))
self.assertEqual(stdout, b'this is test\n')
self.assertEqual(stderr, b'')
def load_tests(loader, tests, pattern):
for template in qubes.tests.list_templates():
tests.addTests(loader.loadTestsFromTestCase(
type(
'TC_10_VMSalt_' + template,
(SaltVMTestMixin, qubes.tests.SystemTestCase),
{'template': template})))
return tests

View File

@ -348,6 +348,7 @@ fi
%{python3_sitelib}/qubes/tests/integ/dispvm.py
%{python3_sitelib}/qubes/tests/integ/dom0_update.py
%{python3_sitelib}/qubes/tests/integ/network.py
%{python3_sitelib}/qubes/tests/integ/salt.py
%{python3_sitelib}/qubes/tests/integ/storage.py
%{python3_sitelib}/qubes/tests/integ/vm_qrexec_gui.py