qubes.tests asyncio

QubesOS/qubes-issues#2622
This commit is contained in:
Wojtek Porczyk 2017-04-18 10:16:14 +02:00
parent 345c16aa47
commit b256af3bfb
13 changed files with 1013 additions and 1015 deletions

View File

@ -31,10 +31,12 @@
don't run the tests.
"""
import asyncio
import collections
import functools
import logging
import os
import pathlib
import shutil
import subprocess
import sys
@ -42,6 +44,7 @@ import tempfile
import time
import traceback
import unittest
import warnings
from distutils import spawn
import lxml.etree
@ -223,6 +226,41 @@ class _AssertNotRaisesContext(object):
self.exception = exc_value # store for later retrieval
class _QrexecPolicyContext(object):
'''Context manager for SystemTestsMixin.qrexec_policy'''
def __init__(self, service, source, destination, allow=True):
try:
source = source.name
except AttributeError:
pass
try:
destination = destination.name
except AttributeError:
pass
self._filename = pathlib.Path('/etc/qubes-rpc/policy') / service
self._rule = '{} {} {}\n'.format(source, destination,
'allow' if allow else 'deny')
def _change(self, add=True):
with self._filename.open('r+') as policy:
policy_rules = policy.readlines()
if add:
policy_rules.insert(0, self._rule)
else:
policy_rules.remove(self._rule)
policy.truncate(0)
policy.seek(0)
policy.write(''.join(policy_rules))
def __enter__(self):
self._change(add=True)
return self
def __exit__(self, exc_type, exc_value, tb):
self._change(add=False)
class substitute_entry_points(object):
'''Monkey-patch pkg_resources to substitute one group in iter_entry_points
@ -279,6 +317,8 @@ class QubesTestCase(unittest.TestCase):
self.addTypeEqualityFunc(qubes.devices.DeviceManager,
self.assertDevicesEqual)
self.loop = None
def __str__(self):
return '{}/{}/{}'.format(
@ -287,9 +327,20 @@ class QubesTestCase(unittest.TestCase):
self._testMethodName)
def setUp(self):
super().setUp()
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
def tearDown(self):
super(QubesTestCase, self).tearDown()
# The loop, when closing, throws a warning if there is
# some unfinished bussiness. Let's catch that.
with warnings.catch_warnings():
warnings.simplefilter('error')
self.loop.close()
# TODO: find better way in py3
try:
result = self._outcome.result
@ -749,20 +800,7 @@ class SystemTestsMixin(object):
:return:
"""
def add_remove_rule(add=True):
with open('/etc/qubes-rpc/policy/{}'.format(service), 'r+') as policy:
policy_rules = policy.readlines()
rule = "{} {} {}\n".format(source, destination,
'allow' if allow else 'deny')
if add:
policy_rules.insert(0, rule)
else:
policy_rules.remove(rule)
policy.truncate(0)
policy.seek(0)
policy.write(''.join(policy_rules))
add_remove_rule(add=True)
self.addCleanup(add_remove_rule, add=False)
return _QrexecPolicyContext(service, source, destination, allow=allow)
def wait_for_window(self, title, timeout=30, show=True):
"""

View File

@ -37,6 +37,7 @@ class TestApp(qubes.tests.TestEmitter):
class TC_30_VMCollection(qubes.tests.QubesTestCase):
def setUp(self):
super().setUp()
self.app = TestApp()
self.vms = qubes.app.VMCollection(self.app)

View File

@ -75,6 +75,7 @@ class TestVM(qubes.tests.TestEmitter):
class TC_00_DeviceCollection(qubes.tests.QubesTestCase):
def setUp(self):
super().setUp()
self.app = TestApp()
self.emitter = TestVM(self.app, 'vm')
self.app.domains['vm'] = self.emitter
@ -152,6 +153,7 @@ class TC_00_DeviceCollection(qubes.tests.QubesTestCase):
class TC_01_DeviceManager(qubes.tests.QubesTestCase):
def setUp(self):
super().setUp()
self.app = TestApp()
self.emitter = TestVM(self.app, 'vm')
self.manager = qubes.devices.DeviceManager(self.emitter)

View File

@ -64,6 +64,7 @@ class TC_00_Label(qubes.tests.QubesTestCase):
class TC_10_property(qubes.tests.QubesTestCase):
def setUp(self):
super().setUp()
try:
class MyTestHolder(qubes.tests.TestEmitter, qubes.PropertyHolder):
testprop1 = qubes.property('testprop1')
@ -206,6 +207,7 @@ class TestHolder(qubes.tests.TestEmitter, qubes.PropertyHolder):
class TC_20_PropertyHolder(qubes.tests.QubesTestCase):
def setUp(self):
super().setUp()
xml = lxml.etree.XML('''
<qubes version="3">
<properties>
@ -314,6 +316,7 @@ class TestApp(qubes.tests.TestEmitter):
class TC_30_VMCollection(qubes.tests.QubesTestCase):
def setUp(self):
super().setUp()
self.app = TestApp()
self.vms = qubes.app.VMCollection(self.app)

View File

@ -500,14 +500,14 @@ class TC_10_BackupVMMixin(BackupTestsMixin):
def test_100_send_to_vm_file_with_spaces(self):
vms = self.create_backup_vms()
self.backupvm.start()
self.backupvm.run("mkdir '/var/tmp/backup directory'", wait=True)
self.loop.run_until_complete(self.backupvm.run_for_stdio(
"mkdir '/var/tmp/backup directory'"))
self.make_backup(vms, target_vm=self.backupvm,
compressed=True, encrypted=True,
target='/var/tmp/backup directory')
self.remove_vms(reversed(vms))
p = self.backupvm.run("ls /var/tmp/backup*/qubes-backup*",
passio_popen=True)
(backup_path, _) = p.communicate()
(backup_path, _) = self.loop.run_until_complete(
self.backupvm.run_for_stdio("ls /var/tmp/backup*/qubes-backup*"))
backup_path = backup_path.decode().strip()
self.restore_backup(source=backup_path,
appvm=self.backupvm)
@ -530,7 +530,7 @@ class TC_10_BackupVMMixin(BackupTestsMixin):
"""
vms = self.create_backup_vms()
self.backupvm.start()
retcode = self.backupvm.run(
self.loop.run_until_complete(self.backupvm.run_for_stdio(
# Debian 7 has too old losetup to handle loop-control device
"mknod /dev/loop0 b 7 0;"
"truncate -s 50M /home/user/backup.img && "
@ -538,9 +538,7 @@ class TC_10_BackupVMMixin(BackupTestsMixin):
"mkdir /home/user/backup && "
"mount /home/user/backup.img /home/user/backup -o loop &&"
"chmod 777 /home/user/backup",
user="root", wait=True)
if retcode != 0:
raise RuntimeError("Failed to prepare backup directory")
user="root"))
with self.assertRaises(qubes.exc.QubesException):
self.make_backup(vms, target_vm=self.backupvm,
compressed=False, encrypted=True,

View File

@ -24,6 +24,7 @@
from distutils import spawn
import asyncio
import os
import subprocess
import tempfile
@ -74,7 +75,7 @@ class TC_01_Properties(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
self.vm = self.app.add_new_vm(qubes.vm.appvm.AppVM, name=self.vmname,
template=self.app.default_template,
label='red')
self.vm.create_on_disk()
self.loop.run_until_complete(self.vm.create_on_disk())
def save_and_reload_db(self):
super(TC_01_Properties, self).save_and_reload_db()
@ -152,13 +153,13 @@ class TC_01_Properties(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
name=self.make_vm_name("vm"),
template=self.app.default_template,
label='red')
testvm1.create_on_disk()
self.loop.run_until_complete(testvm1.create_on_disk())
testvm2 = self.app.add_new_vm(testvm1.__class__,
name=self.make_vm_name("clone"),
template=testvm1.template,
label='red')
testvm2.clone_properties(testvm1)
testvm2.clone_disk_files(testvm1)
self.loop.run_until_complete(testvm2.clone_disk_files(testvm1))
self.assertTrue(testvm1.storage.verify())
self.assertIn('source', testvm1.volumes['root'].config)
self.assertNotEquals(testvm2, None)
@ -206,7 +207,7 @@ class TC_01_Properties(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
template=testvm1.template,
label='red',)
testvm3.clone_properties(testvm1)
testvm3.clone_disk_files(testvm1)
self.loop.run_until_complete(testvm3.clone_disk_files(testvm1))
# qubes.xml reload
self.save_and_reload_db()
@ -239,21 +240,21 @@ class TC_01_Properties(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
self.vm2 = self.app.add_new_vm(qubes.vm.appvm.AppVM,
name=self.vmname, template=self.app.default_template,
label='red')
self.vm2.create_on_disk()
self.loop.run_until_complete(self.vm2.create_on_disk())
def test_021_name_conflict_template(self):
# TODO decide what exception should be here
with self.assertRaises((qubes.exc.QubesException, ValueError)):
self.vm2 = self.app.add_new_vm(qubes.vm.templatevm.TemplateVM,
name=self.vmname, label='red')
self.vm2.create_on_disk()
self.loop.run_until_complete(self.vm2.create_on_disk())
def test_030_rename_conflict_app(self):
vm2name = self.make_vm_name('newname')
self.vm2 = self.app.add_new_vm(qubes.vm.appvm.AppVM,
name=vm2name, template=self.app.default_template, label='red')
self.vm2.create_on_disk()
self.loop.run_until_complete(self.vm2.create_on_disk())
with self.assertNotRaises(OSError):
with self.assertRaises(qubes.exc.QubesException):
@ -272,7 +273,7 @@ class TC_02_QvmPrefs(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
qubes.vm.appvm.AppVM,
name=self.make_vm_name("vm"),
label='red')
self.testvm.create_on_disk()
self.loop.run_until_complete(self.testvm.create_on_disk())
self.save_and_reload_db()
def setup_hvm(self):
@ -281,7 +282,7 @@ class TC_02_QvmPrefs(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
name=self.make_vm_name("hvm"),
label='red')
self.testvm.hvm = True
self.testvm.create_on_disk()
self.loop.run_until_complete(self.testvm.create_on_disk())
self.save_and_reload_db()
def pref_set(self, name, value, valid=True):
@ -385,7 +386,8 @@ class TC_03_QvmRevertTemplateChanges(qubes.tests.SystemTestsMixin,
label='red'
)
self.test_template.clone_properties(self.app.default_template)
self.test_template.clone_disk_files(self.app.default_template)
self.loop.run_until_complete(
self.test_template.clone_disk_files(self.app.default_template))
self.save_and_reload_db()
def setup_hvm_template(self):
@ -395,7 +397,7 @@ class TC_03_QvmRevertTemplateChanges(qubes.tests.SystemTestsMixin,
label='red',
hvm=True
)
self.test_template.create_on_disk()
self.loop.run_until_complete(self.test_template.create_on_disk())
self.save_and_reload_db()
def get_rootimg_checksum(self):
@ -406,7 +408,7 @@ class TC_03_QvmRevertTemplateChanges(qubes.tests.SystemTestsMixin,
def _do_test(self):
checksum_before = self.get_rootimg_checksum()
self.test_template.start()
self.loop.run_until_complete(self.test_template.start())
self.shutdown_and_wait(self.test_template)
checksum_changed = self.get_rootimg_checksum()
if checksum_before == checksum_changed:
@ -449,18 +451,19 @@ class TC_30_Gui_daemon(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
def test_000_clipboard(self):
testvm1 = self.app.add_new_vm(qubes.vm.appvm.AppVM,
name=self.make_vm_name('vm1'), label='red')
testvm1.create_on_disk()
self.loop.run_until_complete(testvm1.create_on_disk())
testvm2 = self.app.add_new_vm(qubes.vm.appvm.AppVM,
name=self.make_vm_name('vm2'), label='red')
testvm2.create_on_disk()
self.loop.run_until_complete(testvm2.create_on_disk())
self.app.save()
testvm1.start()
testvm2.start()
self.loop.run_until_complete(asyncio.wait([
testvm1.start(),
testvm2.start()]))
window_title = 'user@{}'.format(testvm1.name)
testvm1.run('zenity --text-info --editable --title={}'.format(
window_title))
self.loop.run_until_complete(testvm1.run(
'zenity --text-info --editable --title={}'.format(window_title)))
self.wait_for_window(window_title)
time.sleep(0.5)
@ -491,17 +494,17 @@ class TC_30_Gui_daemon(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
# Then paste it to the other window
window_title = 'user@{}'.format(testvm2.name)
p = testvm2.run('zenity --entry --title={} > test.txt'.format(
window_title), passio_popen=True)
p = self.loop.run_until_complete(testvm2.run(
'zenity --entry --title={} > test.txt'.format(window_title)))
self.wait_for_window(window_title)
subprocess.check_call(['xdotool', 'key', '--delay', '100',
'ctrl+shift+v', 'ctrl+v', 'Return'])
p.wait()
self.loop.run_until_complete(p.wait())
# And compare the result
(test_output, _) = testvm2.run('cat test.txt',
passio_popen=True).communicate()
(test_output, _) = self.loop.run_until_complete(
testvm2.run_for_stdio('cat test.txt'))
self.assertEquals(test_string, test_output.strip().decode('ascii'))
clipboard_content = \
@ -523,24 +526,26 @@ class TC_05_StandaloneVM(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase
def test_000_create_start(self):
testvm1 = self.app.add_new_vm(qubes.vm.standalonevm.StandaloneVM,
name=self.make_vm_name('vm1'), label='red')
testvm1.clone_disk_files(self.app.default_template)
self.loop.run_until_complete(
testvm1.clone_disk_files(self.app.default_template))
self.app.save()
testvm1.start()
self.loop.run_until_complete(testvm1.start())
self.assertEquals(testvm1.get_power_state(), "Running")
@unittest.expectedFailure
def test_100_resize_root_img(self):
testvm1 = self.app.add_new_vm(qubes.vm.standalonevm.StandaloneVM,
name=self.make_vm_name('vm1'), label='red')
testvm1.clone_disk_files(self.app.default_template)
self.loop.run_until_complete(
testvm1.clone_disk_files(self.app.default_template))
self.app.save()
testvm1.storage.resize(testvm1.volumes['root'], 20 * 1024 ** 3)
self.loop.run_until_complete(
testvm1.storage.resize(testvm1.volumes['root'], 20 * 1024 ** 3))
self.assertEquals(testvm1.volumes['root'].size, 20 * 1024 ** 3)
testvm1.start()
p = testvm1.run('df --output=size /|tail -n 1',
passio_popen=True)
self.loop.run_until_complete(testvm1.start())
# new_size in 1k-blocks
(new_size, _) = p.communicate()
(new_size, _) = self.loop.run_until_complete(
testvm1.run_for_stdio('df --output=size /|tail -n 1'))
# some safety margin for FS metadata
self.assertGreater(int(new_size.strip()), 19 * 1024 ** 2)

View File

@ -108,14 +108,14 @@ enabled = 1
name=self.make_vm_name("updatevm"),
label='red'
)
self.updatevm.create_on_disk()
self.loop.run_until_complete(self.updatevm.create_on_disk())
self.app.updatevm = self.updatevm
self.app.save()
subprocess.call(['sudo', 'rpm', '-e', self.pkg_name],
stderr=open(os.devnull, 'w'))
subprocess.check_call(['sudo', 'rpm', '--import',
os.path.join(self.tmpdir, 'pubkey.asc')])
self.updatevm.start()
self.loop.run_until_complete(self.updatevm.start())
self.repo_running = False
def tearDown(self):
@ -170,26 +170,28 @@ Test package
return pkg_path
def send_pkg(self, filename):
p = self.updatevm.run('mkdir -p /tmp/repo; cat > /tmp/repo/{}'.format(
os.path.basename(
filename)), passio_popen=True)
p.stdin.write(open(filename, 'rb').read())
p.stdin.close()
p.wait()
retcode = self.updatevm.run('cd /tmp/repo; createrepo .', wait=True)
if retcode == 127:
self.skipTest("createrepo not installed in template {}".format(
self.template))
elif retcode != 0:
self.skipTest("createrepo failed with code {}, cannot perform the "
"test".format(retcode))
self.loop.run_until_complete(self.updatevm.run_for_stdio(
'mkdir -p /tmp/repo; cat > /tmp/repo/{}'.format(
os.path.basename(filename)),
input=open(filename, 'rb').read()))
try:
self.loop.run_until_complete(
self.updatevm.run_for_stdio('cd /tmp/repo; createrepo .'))
except subprocess.CalledProcessError as e:
if e.returncode == 127:
self.skipTest('createrepo not installed in template {}'.format(
self.template))
else:
self.skipTest('createrepo failed with code {}, '
'cannot perform the test'.format(retcode))
self.start_repo()
def start_repo(self):
if not self.repo_running:
self.updatevm.run("cd /tmp/repo &&"
"python -m SimpleHTTPServer 8080")
self.repo_running = True
if self.repo_running:
return
self.loop.run_until_complete(self.updatevm.run(
'cd /tmp/repo && python -m SimpleHTTPServer 8080'))
self.repo_running = True
def test_000_update(self):
"""Dom0 update tests

View File

@ -22,6 +22,7 @@
from distutils import spawn
import asyncio
import multiprocessing
import os
import subprocess
@ -35,7 +36,6 @@ class NcVersion:
Trad = 1
Nmap = 2
# noinspection PyAttributeOutsideInit
class VmNetworkingMixin(qubes.tests.SystemTestsMixin):
test_ip = '192.168.123.45'
@ -49,10 +49,11 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin):
template = None
def run_cmd(self, vm, cmd, user="root"):
p = vm.run(cmd, user=user, passio_popen=True, ignore_stderr=True)
p.stdin.close()
p.stdout.read().decode()
return p.wait()
try:
self.loop.run_until_complete(vm.run_for_stdio(cmd))
except subprocess.CalledProcessError as e:
return e.returncode
return 0
def setUp(self):
super(VmNetworkingMixin, self).setUp()
@ -81,11 +82,12 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin):
self.fail("Command '%s' failed" % cmd)
if not self.testnetvm.is_running():
self.testnetvm.start()
self.loop.run_until_complete(self.testnetvm.start())
# Ensure that dnsmasq is installed:
p = self.testnetvm.run("dnsmasq --version", user="root",
passio_popen=True)
if p.wait() != 0:
try:
self.loop.run_until_complete(self.testnetvm.run_for_stdio(
'dnsmasq --version', user='root'))
except subprocess.CalledProcessError:
self.skipTest("dnsmasq not installed")
run_netvm_cmd("ip link add test0 type dummy")
@ -102,7 +104,7 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin):
def test_000_simple_networking(self):
self.testvm1.start()
self.loop.run_until_complete(self.testvm1.start())
self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0)
@ -113,11 +115,11 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin):
label='red')
self.proxy.provides_network = True
self.proxy.netvm = self.testnetvm
self.proxy.create_on_disk()
self.loop.run_until_complete(self.proxy.create_on_disk())
self.testvm1.netvm = self.proxy
self.app.save()
self.testvm1.start()
self.loop.run_until_complete(self.testvm1.start())
self.assertTrue(self.proxy.is_running())
self.assertEqual(self.run_cmd(self.proxy, self.ping_ip), 0,
"Ping by IP from ProxyVM failed")
@ -137,13 +139,13 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin):
name=self.make_vm_name('proxy'),
label='red')
self.proxy.provides_network = True
self.proxy.create_on_disk()
self.loop.run_until_complete(self.proxy.create_on_disk())
self.proxy.netvm = self.testnetvm
self.proxy.features['network-manager'] = True
self.testvm1.netvm = self.proxy
self.app.save()
self.testvm1.start()
self.loop.run_until_complete(self.testvm1.start())
self.assertTrue(self.proxy.is_running())
self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
"Ping by IP failed")
@ -182,7 +184,7 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin):
name=self.make_vm_name('proxy'),
label='red')
self.proxy.provides_network = True
self.proxy.create_on_disk()
self.loop.run_until_complete(self.proxy.create_on_disk())
self.proxy.netvm = self.testnetvm
self.testvm1.netvm = self.proxy
self.app.save()
@ -196,14 +198,13 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin):
self.testvm1.firewall.policy = 'drop'
self.testvm1.firewall.save()
self.testvm1.start()
self.loop.run_until_complete(self.testvm1.start())
self.assertTrue(self.proxy.is_running())
if nc_version == NcVersion.Nmap:
self.testnetvm.run("nc -l --send-only -e /bin/hostname -k 1234")
else:
self.testnetvm.run("while nc -l -e /bin/hostname -p 1234; do "
"true; done")
self.loop.run_until_complete(self.testnetvm.run_for_stdio(
'nc -l --send-only -e /bin/hostname -k 1234'
if nc_version == NcVersion.Nmap
else 'while nc -l -e /bin/hostname -p 1234; do true; done'))
self.assertEqual(self.run_cmd(self.proxy, self.ping_ip), 0,
"Ping by IP from ProxyVM failed")
@ -278,7 +279,7 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin):
self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
name=self.make_vm_name('proxy'),
label='red')
self.proxy.create_on_disk()
self.loop.run_until_complete(self.proxy.create_on_disk())
self.proxy.provides_network = True
self.proxy.netvm = self.testnetvm
self.testvm1.netvm = self.proxy
@ -286,12 +287,13 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin):
self.testvm2 = self.app.add_new_vm(qubes.vm.appvm.AppVM,
name=self.make_vm_name('vm3'),
label='red')
self.testvm2.create_on_disk()
self.loop.run_until_complete(self.testvm2.create_on_disk())
self.testvm2.netvm = self.proxy
self.app.save()
self.testvm1.start()
self.testvm2.start()
self.loop.run_until_complete(asyncio.wait([
self.testvm1.start(),
self.testvm2.start()]))
self.assertNotEqual(self.run_cmd(self.testvm1,
self.ping_cmd.format(target=self.testvm2.ip)), 0)
@ -312,14 +314,14 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin):
def test_050_spoof_ip(self):
"""Test if VM IP spoofing is blocked"""
self.testvm1.start()
self.loop.run_until_complete(self.testvm1.start())
self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
self.testvm1.run("ip addr flush dev eth0", user="root", wait=True)
self.testvm1.run("ip addr add 10.137.1.128/24 dev eth0", user="root",
wait=True)
self.testvm1.run("ip route add default dev eth0", user="root",
wait=True)
self.loop.run_until_complete(self.testvm1.run_for_stdio('''
ip addr flush dev eth0
ip addr add 10.137.1.128/24 dev eth0
ip route add default dev eth0
''', user='root'))
self.assertNotEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
"Spoofed ping should be blocked")
@ -329,7 +331,7 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin):
cmd = "systemctl stop xendriverdomain"
if self.run_cmd(self.testnetvm, cmd) != 0:
self.fail("Command '%s' failed" % cmd)
self.testvm1.start()
self.loop.run_until_complete(self.testvm1.start())
cmd = "systemctl start xendriverdomain"
if self.run_cmd(self.testnetvm, cmd) != 0:
@ -343,24 +345,26 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin):
self.testvm1.features['net/fake-gateway'] = '192.168.1.1'
self.testvm1.features['net/fake-netmask'] = '255.255.255.0'
self.app.save()
self.testvm1.start()
self.loop.run_until_complete(self.testvm1.start())
self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0)
p = self.testvm1.run('ip addr show dev eth0', user='root',
passio_popen=True,
ignore_stderr=True)
p.stdin.close()
output = p.stdout.read().decode()
self.assertEqual(p.wait(), 0, 'ip addr show dev eth0 failed')
try:
(output, _) = self.loop.run_until_complete(
self.testvm1.run_for_stdio(
'ip addr show dev eth0', user='root'))
except subprocess.CalledProcessError:
self.fail('ip addr show dev eth0 failed')
self.assertIn('192.168.1.128', output)
self.assertNotIn(self.testvm1.ip, output)
p = self.testvm1.run('ip route show', user='root',
passio_popen=True,
ignore_stderr=True)
p.stdin.close()
output = p.stdout.read().decode()
self.assertEqual(p.wait(), 0, 'ip route show failed')
try:
(output, _) = self.loop.run_until_complete(
self.testvm1.run_for_stdio('ip route show', user='root'))
except subprocess.CalledProcessError:
self.fail('ip route show failed')
self.assertIn('192.168.1.1', output)
self.assertNotIn(self.testvm1.netvm.ip, output)
@ -368,15 +372,17 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin):
'''Test hiding VM real IP'''
self.testvm1.features['net/fake-ip'] = '192.168.1.128'
self.app.save()
self.testvm1.start()
self.loop.run_until_complete(self.testvm1.start())
self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0)
p = self.testvm1.run('ip addr show dev eth0', user='root',
passio_popen=True,
ignore_stderr=True)
p.stdin.close()
output = p.stdout.read().decode()
self.assertEqual(p.wait(), 0, 'ip addr show dev eth0 failed')
try:
(output, _) = self.loop.run_until_complete(
self.testvm1.run_for_stdio('ip addr show dev eth0',
user='root'))
except subprocess.CalledProcessError:
self.fail('ip addr show dev eth0 failed')
self.assertIn('192.168.1.128', output)
self.assertNotIn(self.testvm1.ip, output)
@ -390,7 +396,7 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin):
name=self.make_vm_name('proxy'),
label='red')
self.proxy.provides_network = True
self.proxy.create_on_disk()
self.loop.run_until_complete(self.proxy.create_on_disk())
self.proxy.netvm = self.testnetvm
self.testvm1.netvm = self.proxy
self.app.save()
@ -408,14 +414,13 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin):
qubes.firewall.Rule(None, action='accept', specialtarget='dns'),
]
self.testvm1.firewall.save()
self.testvm1.start()
self.loop.run_until_complete(self.testvm1.start())
self.assertTrue(self.proxy.is_running())
if nc_version == NcVersion.Nmap:
self.testnetvm.run("nc -l --send-only -e /bin/hostname -k 1234")
else:
self.testnetvm.run("while nc -l -e /bin/hostname -p 1234; do "
"true; done")
self.loop.run_until_complete(self.testnetvm.run_for_stdio(
'nc -l --send-only -e /bin/hostname -k 1234'
if nc_version == NcVersion.Nmap
else 'while nc -l -e /bin/hostname -p 1234; do true; done'))
self.assertEqual(self.run_cmd(self.proxy, self.ping_ip), 0,
"Ping by IP from ProxyVM failed")
@ -437,7 +442,7 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin):
self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
name=self.make_vm_name('proxy'),
label='red')
self.proxy.create_on_disk()
self.loop.run_until_complete(self.proxy.create_on_disk())
self.proxy.provides_network = True
self.proxy.netvm = self.testnetvm
self.testvm1.netvm = self.proxy
@ -448,31 +453,36 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin):
self.testvm2 = self.app.add_new_vm(qubes.vm.appvm.AppVM,
name=self.make_vm_name('vm3'),
label='red')
self.testvm2.create_on_disk()
self.loop.run_until_complete(self.testvm2.create_on_disk())
self.testvm2.netvm = self.proxy
self.app.save()
self.testvm1.start()
self.testvm2.start()
self.loop.run_until_complete(self.testvm1.start())
self.loop.run_until_complete(self.testvm2.start())
cmd = 'iptables -I FORWARD -s {} -d {} -j ACCEPT'.format(
self.testvm2.ip, self.testvm1.ip)
retcode = self.proxy.run(cmd, user='root', wait=True)
self.assertEqual(retcode, 0, '{} failed with: {}'.format(cmd, retcode))
try:
self.loop.run_until_complete(self.proxy.run_for_stdio(
'iptables -I FORWARD -s {} -d {} -j ACCEPT'.format(
self.testvm2.ip, self.testvm1.ip), user='root'))
except subprocess.CalledProcessError as e:
self.fail('{} failed with: {}'.format(cmd, e.returncode))
cmd = 'iptables -I INPUT -s {} -j ACCEPT'.format(
self.testvm2.ip)
retcode = self.testvm1.run(cmd, user='root', wait=True)
self.assertEqual(retcode, 0, '{} failed with: {}'.format(cmd, retcode))
try:
self.loop.run_until_complete(self.proxy.run_for_stdio(
'iptables -I INPUT -s {} -j ACCEPT'.format(
self.testvm2.ip), user='root'))
except subprocess.CalledProcessError as e:
self.fail('{} failed with: {}'.format(cmd, e.returncode))
self.assertEqual(self.run_cmd(self.testvm2,
self.ping_cmd.format(target=self.testvm1.ip)), 0)
cmd = 'iptables -nvxL INPUT | grep {}'.format(self.testvm2.ip)
p = self.testvm1.run(cmd, user='root', passio_popen=True)
(stdout, _) = p.communicate()
self.assertEqual(p.returncode, 0,
'{} failed with {}'.format(cmd, p.returncode))
try:
(stdout, _) = self.loop.run_until_complete(self.testvm1.run_for_stdio(
'iptables -nvxL INPUT | grep {}'.format(self.testvm2.ip), user='root'))
except subprocess.CalledProcessError as e:
self.fail(
'{} failed with {}'.format(cmd, e.returncode))
self.assertNotEqual(stdout.decode().split()[0], '0',
'Packets didn\'t managed to the VM')
@ -481,7 +491,7 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin):
self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
name=self.make_vm_name('proxy'),
label='red')
self.proxy.create_on_disk()
self.loop.run_until_complete(self.proxy.create_on_disk())
self.proxy.provides_network = True
self.proxy.netvm = self.testnetvm
self.proxy.features['net/fake-ip'] = '192.168.1.128'
@ -489,7 +499,7 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin):
self.proxy.features['net/fake-netmask'] = '255.255.255.0'
self.testvm1.netvm = self.proxy
self.app.save()
self.testvm1.start()
self.loop.run_until_complete(self.testvm1.start())
self.assertEqual(self.run_cmd(self.proxy, self.ping_ip), 0)
self.assertEqual(self.run_cmd(self.proxy, self.ping_name), 0)
@ -497,39 +507,39 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin):
self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0)
p = self.proxy.run('ip addr show dev eth0', user='root',
passio_popen=True,
ignore_stderr=True)
p.stdin.close()
output = p.stdout.read().decode()
self.assertEqual(p.wait(), 0, 'ip addr show dev eth0 failed')
try:
(output, _) = self.loop.run_until_complete(
self.proxy.run_for_stdio(
'ip addr show dev eth0', user='root'))
except subprocess.CalledProcessError as e:
self.fail('ip addr show dev eth0 failed')
self.assertIn('192.168.1.128', output)
self.assertNotIn(self.testvm1.ip, output)
p = self.proxy.run('ip route show', user='root',
passio_popen=True,
ignore_stderr=True)
p.stdin.close()
output = p.stdout.read().decode()
self.assertEqual(p.wait(), 0, 'ip route show failed')
try:
(output, _) = self.loop.run_until_complete(
self.proxy.run_for_stdio(
'ip route show', user='root'))
except subprocess.CalledProcessError as e:
self.fail('ip route show failed')
self.assertIn('192.168.1.1', output)
self.assertNotIn(self.testvm1.netvm.ip, output)
p = self.testvm1.run('ip addr show dev eth0', user='root',
passio_popen=True,
ignore_stderr=True)
p.stdin.close()
output = p.stdout.read().decode()
self.assertEqual(p.wait(), 0, 'ip addr show dev eth0 failed')
try:
(output, _) = self.loop.run_until_complete(
self.testvm1.run_for_stdio(
'ip addr show dev eth0', user='root'))
except subprocess.CalledProcessError as e:
self.fail('ip addr show dev eth0 failed')
self.assertNotIn('192.168.1.128', output)
self.assertIn(self.testvm1.ip, output)
p = self.testvm1.run('ip route show', user='root',
passio_popen=True,
ignore_stderr=True)
p.stdin.close()
output = p.stdout.read().decode()
self.assertEqual(p.wait(), 0, 'ip route show failed')
try:
(output, _) = self.loop.run_until_complete(
self.testvm1.run_for_stdio(
'ip route show', user='root'))
except subprocess.CalledProcessError as e:
self.fail('ip route show failed')
self.assertIn('192.168.1.128', output)
self.assertNotIn(self.proxy.ip, output)
@ -537,7 +547,7 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin):
'''Custom AppVM IP'''
self.testvm1.ip = '192.168.1.1'
self.app.save()
self.testvm1.start()
self.loop.run_until_complete(self.testvm1.start())
self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0)
@ -546,14 +556,14 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin):
self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
name=self.make_vm_name('proxy'),
label='red')
self.proxy.create_on_disk()
self.loop.run_until_complete(self.proxy.create_on_disk())
self.proxy.provides_network = True
self.proxy.netvm = self.testnetvm
self.proxy.ip = '192.168.1.1'
self.testvm1.netvm = self.proxy
self.app.save()
self.testvm1.start()
self.loop.run_until_complete(self.testvm1.start())
self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0)
@ -566,7 +576,7 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin):
name=self.make_vm_name('proxy'),
label='red')
self.proxy.provides_network = True
self.proxy.create_on_disk()
self.loop.run_until_complete(self.proxy.create_on_disk())
self.proxy.netvm = self.testnetvm
self.testvm1.netvm = self.proxy
self.app.save()
@ -584,14 +594,13 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin):
qubes.firewall.Rule(None, action='accept', specialtarget='dns'),
]
self.testvm1.firewall.save()
self.testvm1.start()
self.loop.run_until_complete(self.testvm1.start())
self.assertTrue(self.proxy.is_running())
if nc_version == NcVersion.Nmap:
self.testnetvm.run("nc -l --send-only -e /bin/hostname -k 1234")
else:
self.testnetvm.run("while nc -l -e /bin/hostname -p 1234; do "
"true; done")
self.loop.run_until_complete(self.testnetvm.run_for_stdio(
'nc -l --send-only -e /bin/hostname -k 1234'
if nc_version == NcVersion.Nmap
else 'while nc -l -e /bin/hostname -p 1234; do true; done'))
self.assertEqual(self.run_cmd(self.proxy, self.ping_ip), 0,
"Ping by IP from ProxyVM failed")
@ -686,10 +695,11 @@ class VmUpdatesMixin(qubes.tests.SystemTestsMixin):
)
def run_cmd(self, vm, cmd, user="root"):
p = vm.run(cmd, user=user, passio_popen=True, ignore_stderr=True)
p.stdin.close()
p.stdout.read().decode()
return p.wait()
try:
self.loop.run_until_complete(vm.run_for_stdio(cmd))
except subprocess.CalledProcessError as e:
return e.returncode
return 0
def setUp(self):
super(VmUpdatesMixin, self).setUp()
@ -724,116 +734,95 @@ class VmUpdatesMixin(qubes.tests.SystemTestsMixin):
qubes.vm.appvm.AppVM,
name=self.make_vm_name('vm1'),
label='red')
self.testvm1.create_on_disk()
self.loop.run_until_complete(self.testvm1.create_on_disk())
def test_000_simple_update(self):
self.save_and_reload_db()
# reload the VM to have all the properties properly set (especially
# default netvm)
self.testvm1 = self.app.domains[self.testvm1.qid]
self.testvm1.start()
p = self.testvm1.run(self.update_cmd, wait=True, user="root",
passio_popen=True, passio_stderr=True)
(stdout, stderr) = p.communicate()
self.assertIn(p.wait(), self.exit_code_ok,
"{}: {}\n{}".format(self.update_cmd, stdout, stderr)
)
self.loop.run_until_complete(self.testvm1.start())
p = self.loop.run_until_complete(
self.testvm1.run(self.update_cmd, user='root'))
(stdout, stderr) = self.loop.run_until_complete(p.communicate())
self.assertIn(p.returncode, self.exit_code_ok,
'{}: {}\n{}'.format(self.update_cmd, stdout, stderr))
def create_repo_apt(self):
pkg_file_name = "test-pkg_1.0-1_amd64.deb"
p = self.netvm_repo.run("mkdir /tmp/apt-repo && cd /tmp/apt-repo &&"
"base64 -d | zcat > {}".format(pkg_file_name),
passio_popen=True)
p.stdin.write(self.DEB_PACKAGE_GZIP_BASE64)
p.stdin.close()
if p.wait() != 0:
raise RuntimeError("Failed to write {}".format(pkg_file_name))
self.loop.run_until_complete(self.netvm_repo.run_for_stdio('''
mkdir /tmp/apt-repo \
&& cd /tmp/apt-repo \
&& base64 -d | zcat > {}
'''.format(pkg_file_name),
input=self.DEB_PACKAGE_GZIP_BASE64))
# do not assume dpkg-scanpackage installed
packages_path = "dists/test/main/binary-amd64/Packages"
p = self.netvm_repo.run(
"mkdir -p /tmp/apt-repo/dists/test/main/binary-amd64 && "
"cd /tmp/apt-repo && "
"cat > {packages} && "
"echo MD5sum: $(openssl md5 -r {pkg} | cut -f 1 -d ' ')"
" >> {packages} && "
"echo SHA1: $(openssl sha1 -r {pkg} | cut -f 1 -d ' ')"
" >> {packages} && "
"echo SHA256: $(openssl sha256 -r {pkg} | cut -f 1 -d ' ')"
" >> {packages} && "
"gzip < {packages} > {packages}.gz".format(pkg=pkg_file_name,
packages=packages_path),
passio_popen=True, passio_stderr=True)
p.stdin.write(
"Package: test-pkg\n"
"Version: 1.0-1\n"
"Architecture: amd64\n"
"Maintainer: unknown <user@host>\n"
"Installed-Size: 25\n"
"Filename: {pkg}\n"
"Size: 994\n"
"Section: unknown\n"
"Priority: optional\n"
"Description: Test package\n".format(pkg=pkg_file_name)
)
p.stdin.close()
if p.wait() != 0:
raise RuntimeError("Failed to write Packages file: {}".format(
p.stderr.read().decode()))
self.loop.run_until_complete(self.netvm_repo.run_for_stdio('''
mkdir -p /tmp/apt-repo/dists/test/main/binary-amd64 \
&& cd /tmp/apt-repo \
&& cat > {packages} \
&& echo MD5sum: $(openssl md5 -r {pkg} | cut -f 1 -d ' ') \
>> {packages} \
&& echo SHA1: $(openssl sha1 -r {pkg} | cut -f 1 -d ' ') \
>> {packages} \
&& echo SHA256: $(openssl sha256 -r {pkg} | cut -f 1 -d ' ') \
>> {packages} \
&& gzip < {packages} > {packages}.gz
'''.format(pkg=pkg_file_name, packages=packages_path),
input='''\
Package: test-pkg
Version: 1.0-1
Architecture: amd64
Maintainer: unknown <user@host>
Installed-Size: 25
Filename: {pkg}
Size: 994
Section: unknown
Priority: optional
Description: Test package'''.format(pkg=pkg_file_name).encode('utf-8')))
p = self.netvm_repo.run(
"mkdir -p /tmp/apt-repo/dists/test && "
"cd /tmp/apt-repo/dists/test && "
"cat > Release && "
"echo '' $(sha256sum {p} | cut -f 1 -d ' ') $(stat -c %s {p}) {p}"
" >> Release && "
"echo '' $(sha256sum {z} | cut -f 1 -d ' ') $(stat -c %s {z}) {z}"
" >> Release"
.format(p="main/binary-amd64/Packages",
z="main/binary-amd64/Packages.gz"),
passio_popen=True, passio_stderr=True
)
p.stdin.write(
"Label: Test repo\n"
"Suite: test\n"
"Codename: test\n"
"Date: Tue, 27 Oct 2015 03:22:09 UTC\n"
"Architectures: amd64\n"
"Components: main\n"
"SHA256:\n"
)
p.stdin.close()
if p.wait() != 0:
raise RuntimeError("Failed to write Release file: {}".format(
p.stderr.read().decode()))
self.loop.run_until_complete(self.netvm_repo.run_for_stdio('''
mkdir -p /tmp/apt-repo/dists/test \
&& cd /tmp/apt-repo/dists/test \
&& cat > Release \
&& echo '' $(sha256sum {p} | cut -f 1 -d ' ') $(stat -c %s {p}) {p}\
>> Release \
&& echo '' $(sha256sum {z} | cut -f 1 -d ' ') $(stat -c %s {z}) {z}\
>> Release
'''.format(p='main/binary-amd64/Packages',
z='main/binary-amd64/Packages.gz'),
input='''\
Label: Test repo
Suite: test
Codename: test
Date: Tue, 27 Oct 2015 03:22:09 UTC
Architectures: amd64
Components: main
SHA256:
'''))
def create_repo_yum(self):
pkg_file_name = "test-pkg-1.0-1.fc21.x86_64.rpm"
p = self.netvm_repo.run("mkdir /tmp/yum-repo && cd /tmp/yum-repo &&"
"base64 -d | zcat > {}".format(pkg_file_name),
passio_popen=True, passio_stderr=True)
p.stdin.write(self.RPM_PACKAGE_GZIP_BASE64)
p.stdin.close()
if p.wait() != 0:
raise RuntimeError("Failed to write {}: {}".format(pkg_file_name,
p.stderr.read().decode()))
self.loop.run_until_complete(self.netvm_repo.run_for_stdio('''
mkdir /tmp/yum-repo \
&& cd /tmp/yum-repo \
&& base64 -d | zcat > {}
'''.format(pkg_file_name), input=self.RPM_PACKAGE_GZIP_BASE64))
# createrepo is installed by default in Fedora template
p = self.netvm_repo.run("createrepo /tmp/yum-repo",
passio_popen=True,
passio_stderr=True)
if p.wait() != 0:
raise RuntimeError("Failed to create yum metadata: {}".format(
p.stderr.read().decode()))
self.loop.run_until_complete(self.netvm_repo.run_for_stdio(
'createrepo /tmp/yum-repo'))
def create_repo_and_serve(self):
if self.template.count("debian") or self.template.count("whonix"):
self.create_repo_apt()
self.netvm_repo.run("cd /tmp/apt-repo &&"
"python -m SimpleHTTPServer 8080")
self.loop.run_until_complete(self.netvm_repo.run(
'cd /tmp/apt-repo && python -m SimpleHTTPServer 8080'))
elif self.template.count("fedora"):
self.create_repo_yum()
self.netvm_repo.run("cd /tmp/yum-repo &&"
"python -m SimpleHTTPServer 8080")
self.loop.run_until_complete(self.netvm_repo.run(
'cd /tmp/yum-repo && python -m SimpleHTTPServer 8080'))
else:
# not reachable...
self.skipTest("Template {} not supported by this test".format(
@ -848,13 +837,13 @@ class VmUpdatesMixin(qubes.tests.SystemTestsMixin):
"""
if self.template.count("debian") or self.template.count("whonix"):
self.testvm1.run(
self.loop.run_until_complete(self.testvm1.run_for_stdio(
"rm -f /etc/apt/sources.list.d/* &&"
"echo 'deb [trusted=yes] http://localhost:8080 test main' "
"> /etc/apt/sources.list",
user="root")
user="root"))
elif self.template.count("fedora"):
self.testvm1.run(
self.loop.run_until_complete(self.testvm1.run_for_stdio(
"rm -f /etc/yum.repos.d/*.repo &&"
"echo '[test]' > /etc/yum.repos.d/test.repo &&"
"echo 'name=Test repo' >> /etc/yum.repos.d/test.repo &&"
@ -862,7 +851,7 @@ class VmUpdatesMixin(qubes.tests.SystemTestsMixin):
"echo 'baseurl=http://localhost:8080/'"
" >> /etc/yum.repos.d/test.repo",
user="root"
)
))
else:
# not reachable...
self.skipTest("Template {} not supported by this test".format(
@ -881,7 +870,7 @@ class VmUpdatesMixin(qubes.tests.SystemTestsMixin):
name=self.make_vm_name('net'),
label='red')
self.netvm_repo.provides_network = True
self.netvm_repo.create_on_disk()
self.loop.run_until_complete(self.netvm_repo.create_on_disk())
self.testvm1.netvm = self.netvm_repo
# NetVM should have qubes-updates-proxy enabled by default
#self.netvm_repo.features['qubes-updates-proxy'] = True
@ -890,38 +879,33 @@ class VmUpdatesMixin(qubes.tests.SystemTestsMixin):
self.app.save()
# Setup test repo
self.netvm_repo.start()
self.loop.run_until_complete(self.netvm_repo.start())
self.create_repo_and_serve()
# Configure local repo
self.testvm1.start()
self.loop.run_until_complete(self.testvm1.start())
self.configure_test_repo()
# update repository metadata
p = self.testvm1.run(self.update_cmd, wait=True, user="root",
passio_popen=True, passio_stderr=True)
(stdout, stderr) = p.communicate()
self.assertIn(p.wait(), self.exit_code_ok,
"{}: {}\n{}".format(self.update_cmd, stdout, stderr)
)
p = self.loop.run_until_complete(self.testvm1.run(
self.update_cmd, user='root'))
(stdout, stderr) = self.loop.run_until_complete(p.communicate())
self.assertIn(self.loop.run_until_complete(p.wait()), self.exit_code_ok,
'{}: {}\n{}'.format(self.update_cmd, stdout, stderr))
# install test package
p = self.testvm1.run(self.install_cmd.format('test-pkg'),
wait=True, user="root",
passio_popen=True, passio_stderr=True)
(stdout, stderr) = p.communicate()
self.assertIn(p.wait(), self.exit_code_ok,
"{}: {}\n{}".format(self.update_cmd, stdout, stderr)
)
p = self.loop.run_until_complete(self.testvm1.run(
self.install_cmd.format('test-pkg'), user='root'))
(stdout, stderr) = self.loop.run_until_complete(p.communicate())
self.assertIn(self.loop.run_until_complete(p.wait()), self.exit_code_ok,
'{}: {}\n{}'.format(self.update_cmd, stdout, stderr))
# verify if it was really installed
p = self.testvm1.run(self.install_test_cmd.format('test-pkg'),
wait=True, user="root",
passio_popen=True, passio_stderr=True)
(stdout, stderr) = p.communicate()
self.assertIn(p.wait(), self.exit_code_ok,
"{}: {}\n{}".format(self.update_cmd, stdout, stderr)
)
p = self.loop.run_until_complete(self.testvm1.run(
self.install_test_cmd.format('test-pkg'), user='root'))
(stdout, stderr) = self.loop.run_until_complete(p.communicate())
self.assertIn(self.loop.run_until_complete(p.wait()), self.exit_code_ok,
'{}: {}\n{}'.format(self.update_cmd, stdout, stderr))
def load_tests(loader, tests, pattern):
try:

View File

@ -19,8 +19,8 @@
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
import asyncio
import os
import shutil
import qubes.storage.lvm
@ -51,6 +51,10 @@ class StorageTestMixin(qubes.tests.SystemTestsMixin):
def test_000_volatile(self):
'''Test if volatile volume is really volatile'''
return self.loop.run_until_complete(self._test_000_volatile())
@asyncio.coroutine
def _test_000_volatile(self):
size = 32*1024*1024
volume_config = {
'pool': self.pool.name,
@ -60,27 +64,29 @@ class StorageTestMixin(qubes.tests.SystemTestsMixin):
'rw': True,
}
testvol = self.vm1.storage.init_volume('testvol', volume_config)
self.vm1.storage.get_pool(testvol).create(testvol)
yield from self.vm1.storage.get_pool(testvol).create(testvol)
self.app.save()
self.vm1.start()
p = self.vm1.run(
yield from (self.vm1.start())
# volatile image not clean
yield from (self.vm1.run_for_stdio(
'head -c {} /dev/zero 2>&1 | diff -q /dev/xvde - 2>&1'.format(size),
user='root', passio_popen=True)
stdout, _ = p.communicate()
self.assertEqual(p.returncode, 0,
'volatile image not clean: {}'.format(stdout))
self.vm1.run('echo test123 > /dev/xvde', user='root', wait=True)
self.vm1.shutdown(wait=True)
self.vm1.start()
p = self.vm1.run(
user='root'))
# volatile image not volatile
yield from (
self.vm1.run_for_stdio('echo test123 > /dev/xvde', user='root'))
yield from (self.vm1.shutdown(wait=True))
yield from (self.vm1.start())
yield from (self.vm1.run_for_stdio(
'head -c {} /dev/zero 2>&1 | diff -q /dev/xvde - 2>&1'.format(size),
user='root', passio_popen=True)
stdout, _ = p.communicate()
self.assertEqual(p.returncode, 0,
'volatile image not volatile: {}'.format(stdout))
user='root'))
def test_001_non_volatile(self):
'''Test if non-volatile volume is really non-volatile'''
return self.loop.run_until_complete(self._test_001_non_volatile())
@asyncio.coroutine
def _test_001_non_volatile(self):
size = 32*1024*1024
volume_config = {
'pool': self.pool.name,
@ -89,28 +95,31 @@ class StorageTestMixin(qubes.tests.SystemTestsMixin):
'save_on_stop': True,
'rw': True,
}
testvol = self.vm1.storage.init_volume('testvol', volume_config)
self.vm1.storage.get_pool(testvol).create(testvol)
testvol = yield from self.vm1.storage.init_volume(
'testvol', volume_config)
yield from self.vm1.storage.get_pool(testvol).create(testvol)
self.app.save()
self.vm1.start()
p = self.vm1.run(
yield from self.vm1.start()
# non-volatile image not clean
yield from self.vm1.run_for_stdio(
'head -c {} /dev/zero 2>&1 | diff -q /dev/xvde - 2>&1'.format(size),
user='root', passio_popen=True)
stdout, _ = p.communicate()
self.assertEqual(p.returncode, 0,
'non-volatile image not clean: {}'.format(stdout))
self.vm1.run('echo test123 > /dev/xvde', user='root', wait=True)
self.vm1.shutdown(wait=True)
self.vm1.start()
p = self.vm1.run(
user='root')
yield from self.vm1.run_for_stdio('echo test123 > /dev/xvde',
user='root')
yield from self.vm1.shutdown(wait=True)
yield from self.vm1.start()
# non-volatile image volatile
yield from self.vm1.run_for_stdio(
'head -c {} /dev/zero 2>&1 | diff -q /dev/xvde - 2>&1'.format(size),
user='root', passio_popen=True)
stdout, _ = p.communicate()
self.assertNotEqual(p.returncode, 0,
'non-volatile image volatile: {}'.format(stdout))
user='root')
def test_002_read_only(self):
'''Test read-only volume'''
self.loop.run_until_complete(self._test_002_read_only())
@asyncio.coroutine
def _test_002_read_only(self):
size = 32 * 1024 * 1024
volume_config = {
'pool': self.pool.name,
@ -120,29 +129,28 @@ class StorageTestMixin(qubes.tests.SystemTestsMixin):
'rw': False,
}
testvol = self.vm1.storage.init_volume('testvol', volume_config)
self.vm1.storage.get_pool(testvol).create(testvol)
yield from self.vm1.storage.get_pool(testvol).create(testvol)
self.app.save()
self.vm1.start()
p = self.vm1.run(
yield from self.vm1.start()
# non-volatile image not clean
yield from self.vm1.run_for_stdio(
'head -c {} /dev/zero 2>&1 | diff -q /dev/xvde - 2>&1'.format(size),
user='root', passio_popen=True)
stdout, _ = p.communicate()
self.assertEqual(p.returncode, 0,
'non-volatile image not clean: {}'.format(stdout))
p = self.vm1.run('echo test123 > /dev/xvde', user='root',
passio_popen=True)
p.wait()
self.assertNotEqual(p.returncode, 0,
'Write to read-only volume unexpectedly succeeded')
p = self.vm1.run(
user='root')
# Write to read-only volume unexpectedly succeeded
with self.assertRaises(subprocess.CalledProcessError):
yield from self.vm1.run_for_stdio('echo test123 > /dev/xvde',
user='root')
# read-only volume modified
yield from self.vm1.run_for_stdio(
'head -c {} /dev/zero 2>&1 | diff -q /dev/xvde - 2>&1'.format(size),
user='root', passio_popen=True)
stdout, _ = p.communicate()
self.assertEqual(p.returncode, 0,
'read-only volume modified: {}'.format(stdout))
user='root')
def test_003_snapshot(self):
'''Test snapshot volume data propagation'''
self.loop.run_until_complete(self._test_003_snapshot())
@asyncio.coroutine
def _test_003_snapshot(self):
size = 128 * 1024 * 1024
volume_config = {
'pool': self.pool.name,
@ -152,7 +160,7 @@ class StorageTestMixin(qubes.tests.SystemTestsMixin):
'rw': True,
}
testvol = self.vm1.storage.init_volume('testvol', volume_config)
self.vm1.storage.get_pool(testvol).create(testvol)
yield from self.vm1.storage.get_pool(testvol).create(testvol)
volume_config = {
'pool': self.pool.name,
'size': size,
@ -162,57 +170,55 @@ class StorageTestMixin(qubes.tests.SystemTestsMixin):
'rw': True,
}
testvol_snap = self.vm2.storage.init_volume('testvol', volume_config)
self.vm2.storage.get_pool(testvol_snap).create(testvol_snap)
yield from self.vm2.storage.get_pool(testvol_snap).create(testvol_snap)
self.app.save()
self.vm1.start()
self.vm2.start()
p = self.vm1.run(
yield from self.vm1.start()
yield from self.vm2.start()
# origin image not clean
yield from self.vm1.run_for_stdio(
'head -c {} /dev/zero 2>&1 | diff -q /dev/xvde - 2>&1'.format(size),
user='root', passio_popen=True)
stdout, _ = p.communicate()
self.assertEqual(p.returncode, 0,
'origin image not clean: {}'.format(stdout))
user='root')
p = self.vm2.run(
# snapshot image not clean
yield from self.vm2.run_for_stdio(
'head -c {} /dev/zero | diff -q /dev/xvde -'.format(size),
user='root', passio_popen=True)
stdout, _ = p.communicate()
self.assertEqual(p.returncode, 0,
'snapshot image not clean: {}'.format(stdout))
user='root')
self.vm1.run('echo test123 > /dev/xvde && sync', user='root', wait=True)
p.wait()
self.assertEqual(p.returncode, 0,
'Write to read-write volume failed')
p = self.vm2.run(
# Write to read-write volume failed
yield from self.vm1.run_for_stdio('echo test123 > /dev/xvde && sync',
user='root')
# origin changes propagated to snapshot too early
yield from self.vm2.run_for_stdio(
'head -c {} /dev/zero 2>&1 | diff -q /dev/xvde - 2>&1'.format(size),
user='root', passio_popen=True)
stdout, _ = p.communicate()
self.assertEqual(p.returncode, 0,
'origin changes propagated to snapshot too early: {}'.format(
stdout))
self.vm1.shutdown(wait=True)
user='root')
yield from self.vm1.shutdown(wait=True)
# after origin shutdown there should be still no change
p = self.vm2.run(
'head -c {} /dev/zero 2>&1 | diff -q /dev/xvde - 2>&1'.format(size),
user='root', passio_popen=True)
stdout, _ = p.communicate()
self.assertEqual(p.returncode, 0,
'origin changes propagated to snapshot too early2: {}'.format(
stdout))
self.vm2.shutdown(wait=True)
self.vm2.start()
# only after target VM restart changes should be visible
p = self.vm2.run(
# origin changes propagated to snapshot too early2
yield from self.vm2.run_for_stdio(
'head -c {} /dev/zero 2>&1 | diff -q /dev/xvde - 2>&1'.format(size),
user='root', passio_popen=True)
stdout, _ = p.communicate()
self.assertNotEqual(p.returncode, 0,
'origin changes not visible in snapshot: {}'.format(stdout))
user='root')
yield from self.vm2.shutdown(wait=True)
yield from self.vm2.start()
# only after target VM restart changes should be visible
# origin changes not visible in snapshot
with self.assertRaises(subprocess.CalledProcessError):
yield from self.vm2.run(
'head -c {} /dev/zero 2>&1 | diff -q /dev/xvde - 2>&1'.format(
size),
user='root')
def test_004_snapshot_non_persistent(self):
'''Test snapshot volume non-persistence'''
return self.loop.run_until_complete(
self._test_004_snapshot_non_persistent())
@asyncio.coroutine
def _test_004_snapshot_non_persistent(self):
size = 128 * 1024 * 1024
volume_config = {
'pool': self.pool.name,
@ -222,7 +228,7 @@ class StorageTestMixin(qubes.tests.SystemTestsMixin):
'rw': True,
}
testvol = self.vm1.storage.init_volume('testvol', volume_config)
self.vm1.storage.get_pool(testvol).create(testvol)
yield from self.vm1.storage.get_pool(testvol).create(testvol)
volume_config = {
'pool': self.pool.name,
'size': size,
@ -232,30 +238,25 @@ class StorageTestMixin(qubes.tests.SystemTestsMixin):
'rw': True,
}
testvol_snap = self.vm2.storage.init_volume('testvol', volume_config)
self.vm2.storage.get_pool(testvol_snap).create(testvol_snap)
yield from self.vm2.storage.get_pool(testvol_snap).create(testvol_snap)
self.app.save()
self.vm2.start()
yield from self.vm2.start()
p = self.vm2.run(
# snapshot image not clean
yield from self.vm2.run_for_stdio(
'head -c {} /dev/zero | diff -q /dev/xvde -'.format(size),
user='root', passio_popen=True)
stdout, _ = p.communicate()
self.assertEqual(p.returncode, 0,
'snapshot image not clean: {}'.format(stdout))
user='root')
self.vm2.run('echo test123 > /dev/xvde && sync', user='root', wait=True)
p.wait()
self.assertEqual(p.returncode, 0,
'Write to read-write snapshot volume failed')
self.vm2.shutdown(wait=True)
self.vm2.start()
p = self.vm2.run(
# Write to read-write snapshot volume failed
yield from self.vm2.run_for_stdio('echo test123 > /dev/xvde && sync',
user='root')
yield from self.vm2.shutdown(wait=True)
yield from self.vm2.start()
# changes on snapshot survived VM restart
yield from self.vm2.run_for_stdio(
'head -c {} /dev/zero 2>&1 | diff -q /dev/xvde - 2>&1'.format(size),
user='root', passio_popen=True)
stdout, _ = p.communicate()
self.assertEqual(p.returncode, 0,
'changes on snapshot survived VM restart: {}'.format(
stdout))
user='root')
class StorageFile(StorageTestMixin, qubes.tests.QubesTestCase):

File diff suppressed because it is too large Load Diff

View File

@ -31,6 +31,7 @@ import qubes.tests
@qubes.tests.skipUnlessDom0
class TC_00_AdminVM(qubes.tests.QubesTestCase):
def setUp(self):
super().setUp()
try:
self.app = qubes.tests.vm.TestApp()
self.vm = qubes.vm.adminvm.AdminVM(self.app,

View File

@ -51,6 +51,7 @@ class TestVM(qubes.vm.BaseVM):
class TC_10_BaseVM(qubes.tests.QubesTestCase):
def setUp(self):
super().setUp()
self.xml = lxml.etree.XML('''
<qubes version="3"> <!-- xmlns="https://qubes-os.org/QubesXML/1" -->
<labels>

View File

@ -60,6 +60,7 @@ class TestVM(object):
class TC_00_setters(qubes.tests.QubesTestCase):
def setUp(self):
super().setUp()
self.vm = TestVM()
self.prop = TestProp()