diff --git a/qubes/tests/__init__.py b/qubes/tests/__init__.py index 508e9e1e..6f8786a9 100644 --- a/qubes/tests/__init__.py +++ b/qubes/tests/__init__.py @@ -31,10 +31,12 @@ don't run the tests. """ +import asyncio import collections import functools import logging import os +import pathlib import shutil import subprocess import sys @@ -42,6 +44,7 @@ import tempfile import time import traceback import unittest +import warnings from distutils import spawn import lxml.etree @@ -223,6 +226,41 @@ class _AssertNotRaisesContext(object): self.exception = exc_value # store for later retrieval +class _QrexecPolicyContext(object): + '''Context manager for SystemTestsMixin.qrexec_policy''' + + def __init__(self, service, source, destination, allow=True): + try: + source = source.name + except AttributeError: + pass + + try: + destination = destination.name + except AttributeError: + pass + + self._filename = pathlib.Path('/etc/qubes-rpc/policy') / service + self._rule = '{} {} {}\n'.format(source, destination, + 'allow' if allow else 'deny') + + def _change(self, add=True): + with self._filename.open('r+') as policy: + policy_rules = policy.readlines() + if add: + policy_rules.insert(0, self._rule) + else: + policy_rules.remove(self._rule) + policy.truncate(0) + policy.seek(0) + policy.write(''.join(policy_rules)) + + def __enter__(self): + self._change(add=True) + return self + + def __exit__(self, exc_type, exc_value, tb): + self._change(add=False) class substitute_entry_points(object): '''Monkey-patch pkg_resources to substitute one group in iter_entry_points @@ -279,6 +317,8 @@ class QubesTestCase(unittest.TestCase): self.addTypeEqualityFunc(qubes.devices.DeviceManager, self.assertDevicesEqual) + self.loop = None + def __str__(self): return '{}/{}/{}'.format( @@ -287,9 +327,20 @@ class QubesTestCase(unittest.TestCase): self._testMethodName) + def setUp(self): + super().setUp() + self.loop = asyncio.new_event_loop() + asyncio.set_event_loop(self.loop) + def tearDown(self): super(QubesTestCase, self).tearDown() + # The loop, when closing, throws a warning if there is + # some unfinished bussiness. Let's catch that. + with warnings.catch_warnings(): + warnings.simplefilter('error') + self.loop.close() + # TODO: find better way in py3 try: result = self._outcome.result @@ -749,20 +800,7 @@ class SystemTestsMixin(object): :return: """ - def add_remove_rule(add=True): - with open('/etc/qubes-rpc/policy/{}'.format(service), 'r+') as policy: - policy_rules = policy.readlines() - rule = "{} {} {}\n".format(source, destination, - 'allow' if allow else 'deny') - if add: - policy_rules.insert(0, rule) - else: - policy_rules.remove(rule) - policy.truncate(0) - policy.seek(0) - policy.write(''.join(policy_rules)) - add_remove_rule(add=True) - self.addCleanup(add_remove_rule, add=False) + return _QrexecPolicyContext(service, source, destination, allow=allow) def wait_for_window(self, title, timeout=30, show=True): """ diff --git a/qubes/tests/app.py b/qubes/tests/app.py index 9de827ca..f9c6987f 100644 --- a/qubes/tests/app.py +++ b/qubes/tests/app.py @@ -37,6 +37,7 @@ class TestApp(qubes.tests.TestEmitter): class TC_30_VMCollection(qubes.tests.QubesTestCase): def setUp(self): + super().setUp() self.app = TestApp() self.vms = qubes.app.VMCollection(self.app) diff --git a/qubes/tests/devices.py b/qubes/tests/devices.py index 06a79915..ff48800a 100644 --- a/qubes/tests/devices.py +++ b/qubes/tests/devices.py @@ -75,6 +75,7 @@ class TestVM(qubes.tests.TestEmitter): class TC_00_DeviceCollection(qubes.tests.QubesTestCase): def setUp(self): + super().setUp() self.app = TestApp() self.emitter = TestVM(self.app, 'vm') self.app.domains['vm'] = self.emitter @@ -152,6 +153,7 @@ class TC_00_DeviceCollection(qubes.tests.QubesTestCase): class TC_01_DeviceManager(qubes.tests.QubesTestCase): def setUp(self): + super().setUp() self.app = TestApp() self.emitter = TestVM(self.app, 'vm') self.manager = qubes.devices.DeviceManager(self.emitter) diff --git a/qubes/tests/init.py b/qubes/tests/init.py index 0662bae9..22c7cfa9 100644 --- a/qubes/tests/init.py +++ b/qubes/tests/init.py @@ -64,6 +64,7 @@ class TC_00_Label(qubes.tests.QubesTestCase): class TC_10_property(qubes.tests.QubesTestCase): def setUp(self): + super().setUp() try: class MyTestHolder(qubes.tests.TestEmitter, qubes.PropertyHolder): testprop1 = qubes.property('testprop1') @@ -206,6 +207,7 @@ class TestHolder(qubes.tests.TestEmitter, qubes.PropertyHolder): class TC_20_PropertyHolder(qubes.tests.QubesTestCase): def setUp(self): + super().setUp() xml = lxml.etree.XML(''' @@ -314,6 +316,7 @@ class TestApp(qubes.tests.TestEmitter): class TC_30_VMCollection(qubes.tests.QubesTestCase): def setUp(self): + super().setUp() self.app = TestApp() self.vms = qubes.app.VMCollection(self.app) diff --git a/qubes/tests/integ/backup.py b/qubes/tests/integ/backup.py index 5d85134e..90fcfc0d 100644 --- a/qubes/tests/integ/backup.py +++ b/qubes/tests/integ/backup.py @@ -500,14 +500,14 @@ class TC_10_BackupVMMixin(BackupTestsMixin): def test_100_send_to_vm_file_with_spaces(self): vms = self.create_backup_vms() self.backupvm.start() - self.backupvm.run("mkdir '/var/tmp/backup directory'", wait=True) + self.loop.run_until_complete(self.backupvm.run_for_stdio( + "mkdir '/var/tmp/backup directory'")) self.make_backup(vms, target_vm=self.backupvm, compressed=True, encrypted=True, target='/var/tmp/backup directory') self.remove_vms(reversed(vms)) - p = self.backupvm.run("ls /var/tmp/backup*/qubes-backup*", - passio_popen=True) - (backup_path, _) = p.communicate() + (backup_path, _) = self.loop.run_until_complete( + self.backupvm.run_for_stdio("ls /var/tmp/backup*/qubes-backup*")) backup_path = backup_path.decode().strip() self.restore_backup(source=backup_path, appvm=self.backupvm) @@ -530,7 +530,7 @@ class TC_10_BackupVMMixin(BackupTestsMixin): """ vms = self.create_backup_vms() self.backupvm.start() - retcode = self.backupvm.run( + self.loop.run_until_complete(self.backupvm.run_for_stdio( # Debian 7 has too old losetup to handle loop-control device "mknod /dev/loop0 b 7 0;" "truncate -s 50M /home/user/backup.img && " @@ -538,9 +538,7 @@ class TC_10_BackupVMMixin(BackupTestsMixin): "mkdir /home/user/backup && " "mount /home/user/backup.img /home/user/backup -o loop &&" "chmod 777 /home/user/backup", - user="root", wait=True) - if retcode != 0: - raise RuntimeError("Failed to prepare backup directory") + user="root")) with self.assertRaises(qubes.exc.QubesException): self.make_backup(vms, target_vm=self.backupvm, compressed=False, encrypted=True, diff --git a/qubes/tests/integ/basic.py b/qubes/tests/integ/basic.py index 68c5c188..2689c471 100644 --- a/qubes/tests/integ/basic.py +++ b/qubes/tests/integ/basic.py @@ -24,6 +24,7 @@ from distutils import spawn +import asyncio import os import subprocess import tempfile @@ -74,7 +75,7 @@ class TC_01_Properties(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase): self.vm = self.app.add_new_vm(qubes.vm.appvm.AppVM, name=self.vmname, template=self.app.default_template, label='red') - self.vm.create_on_disk() + self.loop.run_until_complete(self.vm.create_on_disk()) def save_and_reload_db(self): super(TC_01_Properties, self).save_and_reload_db() @@ -152,13 +153,13 @@ class TC_01_Properties(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase): name=self.make_vm_name("vm"), template=self.app.default_template, label='red') - testvm1.create_on_disk() + self.loop.run_until_complete(testvm1.create_on_disk()) testvm2 = self.app.add_new_vm(testvm1.__class__, name=self.make_vm_name("clone"), template=testvm1.template, label='red') testvm2.clone_properties(testvm1) - testvm2.clone_disk_files(testvm1) + self.loop.run_until_complete(testvm2.clone_disk_files(testvm1)) self.assertTrue(testvm1.storage.verify()) self.assertIn('source', testvm1.volumes['root'].config) self.assertNotEquals(testvm2, None) @@ -206,7 +207,7 @@ class TC_01_Properties(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase): template=testvm1.template, label='red',) testvm3.clone_properties(testvm1) - testvm3.clone_disk_files(testvm1) + self.loop.run_until_complete(testvm3.clone_disk_files(testvm1)) # qubes.xml reload self.save_and_reload_db() @@ -239,21 +240,21 @@ class TC_01_Properties(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase): self.vm2 = self.app.add_new_vm(qubes.vm.appvm.AppVM, name=self.vmname, template=self.app.default_template, label='red') - self.vm2.create_on_disk() + self.loop.run_until_complete(self.vm2.create_on_disk()) def test_021_name_conflict_template(self): # TODO decide what exception should be here with self.assertRaises((qubes.exc.QubesException, ValueError)): self.vm2 = self.app.add_new_vm(qubes.vm.templatevm.TemplateVM, name=self.vmname, label='red') - self.vm2.create_on_disk() + self.loop.run_until_complete(self.vm2.create_on_disk()) def test_030_rename_conflict_app(self): vm2name = self.make_vm_name('newname') self.vm2 = self.app.add_new_vm(qubes.vm.appvm.AppVM, name=vm2name, template=self.app.default_template, label='red') - self.vm2.create_on_disk() + self.loop.run_until_complete(self.vm2.create_on_disk()) with self.assertNotRaises(OSError): with self.assertRaises(qubes.exc.QubesException): @@ -272,7 +273,7 @@ class TC_02_QvmPrefs(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase): qubes.vm.appvm.AppVM, name=self.make_vm_name("vm"), label='red') - self.testvm.create_on_disk() + self.loop.run_until_complete(self.testvm.create_on_disk()) self.save_and_reload_db() def setup_hvm(self): @@ -281,7 +282,7 @@ class TC_02_QvmPrefs(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase): name=self.make_vm_name("hvm"), label='red') self.testvm.hvm = True - self.testvm.create_on_disk() + self.loop.run_until_complete(self.testvm.create_on_disk()) self.save_and_reload_db() def pref_set(self, name, value, valid=True): @@ -385,7 +386,8 @@ class TC_03_QvmRevertTemplateChanges(qubes.tests.SystemTestsMixin, label='red' ) self.test_template.clone_properties(self.app.default_template) - self.test_template.clone_disk_files(self.app.default_template) + self.loop.run_until_complete( + self.test_template.clone_disk_files(self.app.default_template)) self.save_and_reload_db() def setup_hvm_template(self): @@ -395,7 +397,7 @@ class TC_03_QvmRevertTemplateChanges(qubes.tests.SystemTestsMixin, label='red', hvm=True ) - self.test_template.create_on_disk() + self.loop.run_until_complete(self.test_template.create_on_disk()) self.save_and_reload_db() def get_rootimg_checksum(self): @@ -406,7 +408,7 @@ class TC_03_QvmRevertTemplateChanges(qubes.tests.SystemTestsMixin, def _do_test(self): checksum_before = self.get_rootimg_checksum() - self.test_template.start() + self.loop.run_until_complete(self.test_template.start()) self.shutdown_and_wait(self.test_template) checksum_changed = self.get_rootimg_checksum() if checksum_before == checksum_changed: @@ -449,18 +451,19 @@ class TC_30_Gui_daemon(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase): def test_000_clipboard(self): testvm1 = self.app.add_new_vm(qubes.vm.appvm.AppVM, name=self.make_vm_name('vm1'), label='red') - testvm1.create_on_disk() + self.loop.run_until_complete(testvm1.create_on_disk()) testvm2 = self.app.add_new_vm(qubes.vm.appvm.AppVM, name=self.make_vm_name('vm2'), label='red') - testvm2.create_on_disk() + self.loop.run_until_complete(testvm2.create_on_disk()) self.app.save() - testvm1.start() - testvm2.start() + self.loop.run_until_complete(asyncio.wait([ + testvm1.start(), + testvm2.start()])) window_title = 'user@{}'.format(testvm1.name) - testvm1.run('zenity --text-info --editable --title={}'.format( - window_title)) + self.loop.run_until_complete(testvm1.run( + 'zenity --text-info --editable --title={}'.format(window_title))) self.wait_for_window(window_title) time.sleep(0.5) @@ -491,17 +494,17 @@ class TC_30_Gui_daemon(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase): # Then paste it to the other window window_title = 'user@{}'.format(testvm2.name) - p = testvm2.run('zenity --entry --title={} > test.txt'.format( - window_title), passio_popen=True) + p = self.loop.run_until_complete(testvm2.run( + 'zenity --entry --title={} > test.txt'.format(window_title))) self.wait_for_window(window_title) subprocess.check_call(['xdotool', 'key', '--delay', '100', 'ctrl+shift+v', 'ctrl+v', 'Return']) - p.wait() + self.loop.run_until_complete(p.wait()) # And compare the result - (test_output, _) = testvm2.run('cat test.txt', - passio_popen=True).communicate() + (test_output, _) = self.loop.run_until_complete( + testvm2.run_for_stdio('cat test.txt')) self.assertEquals(test_string, test_output.strip().decode('ascii')) clipboard_content = \ @@ -523,24 +526,26 @@ class TC_05_StandaloneVM(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase def test_000_create_start(self): testvm1 = self.app.add_new_vm(qubes.vm.standalonevm.StandaloneVM, name=self.make_vm_name('vm1'), label='red') - testvm1.clone_disk_files(self.app.default_template) + self.loop.run_until_complete( + testvm1.clone_disk_files(self.app.default_template)) self.app.save() - testvm1.start() + self.loop.run_until_complete(testvm1.start()) self.assertEquals(testvm1.get_power_state(), "Running") @unittest.expectedFailure def test_100_resize_root_img(self): testvm1 = self.app.add_new_vm(qubes.vm.standalonevm.StandaloneVM, name=self.make_vm_name('vm1'), label='red') - testvm1.clone_disk_files(self.app.default_template) + self.loop.run_until_complete( + testvm1.clone_disk_files(self.app.default_template)) self.app.save() - testvm1.storage.resize(testvm1.volumes['root'], 20 * 1024 ** 3) + self.loop.run_until_complete( + testvm1.storage.resize(testvm1.volumes['root'], 20 * 1024 ** 3)) self.assertEquals(testvm1.volumes['root'].size, 20 * 1024 ** 3) - testvm1.start() - p = testvm1.run('df --output=size /|tail -n 1', - passio_popen=True) + self.loop.run_until_complete(testvm1.start()) # new_size in 1k-blocks - (new_size, _) = p.communicate() + (new_size, _) = self.loop.run_until_complete( + testvm1.run_for_stdio('df --output=size /|tail -n 1')) # some safety margin for FS metadata self.assertGreater(int(new_size.strip()), 19 * 1024 ** 2) diff --git a/qubes/tests/integ/dom0_update.py b/qubes/tests/integ/dom0_update.py index 77412d48..e0fc1b70 100644 --- a/qubes/tests/integ/dom0_update.py +++ b/qubes/tests/integ/dom0_update.py @@ -108,14 +108,14 @@ enabled = 1 name=self.make_vm_name("updatevm"), label='red' ) - self.updatevm.create_on_disk() + self.loop.run_until_complete(self.updatevm.create_on_disk()) self.app.updatevm = self.updatevm self.app.save() subprocess.call(['sudo', 'rpm', '-e', self.pkg_name], stderr=open(os.devnull, 'w')) subprocess.check_call(['sudo', 'rpm', '--import', os.path.join(self.tmpdir, 'pubkey.asc')]) - self.updatevm.start() + self.loop.run_until_complete(self.updatevm.start()) self.repo_running = False def tearDown(self): @@ -170,26 +170,28 @@ Test package return pkg_path def send_pkg(self, filename): - p = self.updatevm.run('mkdir -p /tmp/repo; cat > /tmp/repo/{}'.format( - os.path.basename( - filename)), passio_popen=True) - p.stdin.write(open(filename, 'rb').read()) - p.stdin.close() - p.wait() - retcode = self.updatevm.run('cd /tmp/repo; createrepo .', wait=True) - if retcode == 127: - self.skipTest("createrepo not installed in template {}".format( - self.template)) - elif retcode != 0: - self.skipTest("createrepo failed with code {}, cannot perform the " - "test".format(retcode)) + self.loop.run_until_complete(self.updatevm.run_for_stdio( + 'mkdir -p /tmp/repo; cat > /tmp/repo/{}'.format( + os.path.basename(filename)), + input=open(filename, 'rb').read())) + try: + self.loop.run_until_complete( + self.updatevm.run_for_stdio('cd /tmp/repo; createrepo .')) + except subprocess.CalledProcessError as e: + if e.returncode == 127: + self.skipTest('createrepo not installed in template {}'.format( + self.template)) + else: + self.skipTest('createrepo failed with code {}, ' + 'cannot perform the test'.format(retcode)) self.start_repo() def start_repo(self): - if not self.repo_running: - self.updatevm.run("cd /tmp/repo &&" - "python -m SimpleHTTPServer 8080") - self.repo_running = True + if self.repo_running: + return + self.loop.run_until_complete(self.updatevm.run( + 'cd /tmp/repo && python -m SimpleHTTPServer 8080')) + self.repo_running = True def test_000_update(self): """Dom0 update tests diff --git a/qubes/tests/integ/network.py b/qubes/tests/integ/network.py index 7207f0dc..18f04500 100644 --- a/qubes/tests/integ/network.py +++ b/qubes/tests/integ/network.py @@ -22,6 +22,7 @@ from distutils import spawn +import asyncio import multiprocessing import os import subprocess @@ -35,7 +36,6 @@ class NcVersion: Trad = 1 Nmap = 2 - # noinspection PyAttributeOutsideInit class VmNetworkingMixin(qubes.tests.SystemTestsMixin): test_ip = '192.168.123.45' @@ -49,10 +49,11 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin): template = None def run_cmd(self, vm, cmd, user="root"): - p = vm.run(cmd, user=user, passio_popen=True, ignore_stderr=True) - p.stdin.close() - p.stdout.read().decode() - return p.wait() + try: + self.loop.run_until_complete(vm.run_for_stdio(cmd)) + except subprocess.CalledProcessError as e: + return e.returncode + return 0 def setUp(self): super(VmNetworkingMixin, self).setUp() @@ -81,11 +82,12 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin): self.fail("Command '%s' failed" % cmd) if not self.testnetvm.is_running(): - self.testnetvm.start() + self.loop.run_until_complete(self.testnetvm.start()) # Ensure that dnsmasq is installed: - p = self.testnetvm.run("dnsmasq --version", user="root", - passio_popen=True) - if p.wait() != 0: + try: + self.loop.run_until_complete(self.testnetvm.run_for_stdio( + 'dnsmasq --version', user='root')) + except subprocess.CalledProcessError: self.skipTest("dnsmasq not installed") run_netvm_cmd("ip link add test0 type dummy") @@ -102,7 +104,7 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin): def test_000_simple_networking(self): - self.testvm1.start() + self.loop.run_until_complete(self.testvm1.start()) self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0) self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0) @@ -113,11 +115,11 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin): label='red') self.proxy.provides_network = True self.proxy.netvm = self.testnetvm - self.proxy.create_on_disk() + self.loop.run_until_complete(self.proxy.create_on_disk()) self.testvm1.netvm = self.proxy self.app.save() - self.testvm1.start() + self.loop.run_until_complete(self.testvm1.start()) self.assertTrue(self.proxy.is_running()) self.assertEqual(self.run_cmd(self.proxy, self.ping_ip), 0, "Ping by IP from ProxyVM failed") @@ -137,13 +139,13 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin): name=self.make_vm_name('proxy'), label='red') self.proxy.provides_network = True - self.proxy.create_on_disk() + self.loop.run_until_complete(self.proxy.create_on_disk()) self.proxy.netvm = self.testnetvm self.proxy.features['network-manager'] = True self.testvm1.netvm = self.proxy self.app.save() - self.testvm1.start() + self.loop.run_until_complete(self.testvm1.start()) self.assertTrue(self.proxy.is_running()) self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0, "Ping by IP failed") @@ -182,7 +184,7 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin): name=self.make_vm_name('proxy'), label='red') self.proxy.provides_network = True - self.proxy.create_on_disk() + self.loop.run_until_complete(self.proxy.create_on_disk()) self.proxy.netvm = self.testnetvm self.testvm1.netvm = self.proxy self.app.save() @@ -196,14 +198,13 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin): self.testvm1.firewall.policy = 'drop' self.testvm1.firewall.save() - self.testvm1.start() + self.loop.run_until_complete(self.testvm1.start()) self.assertTrue(self.proxy.is_running()) - if nc_version == NcVersion.Nmap: - self.testnetvm.run("nc -l --send-only -e /bin/hostname -k 1234") - else: - self.testnetvm.run("while nc -l -e /bin/hostname -p 1234; do " - "true; done") + self.loop.run_until_complete(self.testnetvm.run_for_stdio( + 'nc -l --send-only -e /bin/hostname -k 1234' + if nc_version == NcVersion.Nmap + else 'while nc -l -e /bin/hostname -p 1234; do true; done')) self.assertEqual(self.run_cmd(self.proxy, self.ping_ip), 0, "Ping by IP from ProxyVM failed") @@ -278,7 +279,7 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin): self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM, name=self.make_vm_name('proxy'), label='red') - self.proxy.create_on_disk() + self.loop.run_until_complete(self.proxy.create_on_disk()) self.proxy.provides_network = True self.proxy.netvm = self.testnetvm self.testvm1.netvm = self.proxy @@ -286,12 +287,13 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin): self.testvm2 = self.app.add_new_vm(qubes.vm.appvm.AppVM, name=self.make_vm_name('vm3'), label='red') - self.testvm2.create_on_disk() + self.loop.run_until_complete(self.testvm2.create_on_disk()) self.testvm2.netvm = self.proxy self.app.save() - self.testvm1.start() - self.testvm2.start() + self.loop.run_until_complete(asyncio.wait([ + self.testvm1.start(), + self.testvm2.start()])) self.assertNotEqual(self.run_cmd(self.testvm1, self.ping_cmd.format(target=self.testvm2.ip)), 0) @@ -312,14 +314,14 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin): def test_050_spoof_ip(self): """Test if VM IP spoofing is blocked""" - self.testvm1.start() + self.loop.run_until_complete(self.testvm1.start()) self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0) - self.testvm1.run("ip addr flush dev eth0", user="root", wait=True) - self.testvm1.run("ip addr add 10.137.1.128/24 dev eth0", user="root", - wait=True) - self.testvm1.run("ip route add default dev eth0", user="root", - wait=True) + self.loop.run_until_complete(self.testvm1.run_for_stdio(''' + ip addr flush dev eth0 + ip addr add 10.137.1.128/24 dev eth0 + ip route add default dev eth0 + ''', user='root')) self.assertNotEqual(self.run_cmd(self.testvm1, self.ping_ip), 0, "Spoofed ping should be blocked") @@ -329,7 +331,7 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin): cmd = "systemctl stop xendriverdomain" if self.run_cmd(self.testnetvm, cmd) != 0: self.fail("Command '%s' failed" % cmd) - self.testvm1.start() + self.loop.run_until_complete(self.testvm1.start()) cmd = "systemctl start xendriverdomain" if self.run_cmd(self.testnetvm, cmd) != 0: @@ -343,24 +345,26 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin): self.testvm1.features['net/fake-gateway'] = '192.168.1.1' self.testvm1.features['net/fake-netmask'] = '255.255.255.0' self.app.save() - self.testvm1.start() + self.loop.run_until_complete(self.testvm1.start()) self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0) self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0) - p = self.testvm1.run('ip addr show dev eth0', user='root', - passio_popen=True, - ignore_stderr=True) - p.stdin.close() - output = p.stdout.read().decode() - self.assertEqual(p.wait(), 0, 'ip addr show dev eth0 failed') + + try: + (output, _) = self.loop.run_until_complete( + self.testvm1.run_for_stdio( + 'ip addr show dev eth0', user='root')) + except subprocess.CalledProcessError: + self.fail('ip addr show dev eth0 failed') + self.assertIn('192.168.1.128', output) self.assertNotIn(self.testvm1.ip, output) - p = self.testvm1.run('ip route show', user='root', - passio_popen=True, - ignore_stderr=True) - p.stdin.close() - output = p.stdout.read().decode() - self.assertEqual(p.wait(), 0, 'ip route show failed') + try: + (output, _) = self.loop.run_until_complete( + self.testvm1.run_for_stdio('ip route show', user='root')) + except subprocess.CalledProcessError: + self.fail('ip route show failed') + self.assertIn('192.168.1.1', output) self.assertNotIn(self.testvm1.netvm.ip, output) @@ -368,15 +372,17 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin): '''Test hiding VM real IP''' self.testvm1.features['net/fake-ip'] = '192.168.1.128' self.app.save() - self.testvm1.start() + self.loop.run_until_complete(self.testvm1.start()) self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0) self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0) - p = self.testvm1.run('ip addr show dev eth0', user='root', - passio_popen=True, - ignore_stderr=True) - p.stdin.close() - output = p.stdout.read().decode() - self.assertEqual(p.wait(), 0, 'ip addr show dev eth0 failed') + + try: + (output, _) = self.loop.run_until_complete( + self.testvm1.run_for_stdio('ip addr show dev eth0', + user='root')) + except subprocess.CalledProcessError: + self.fail('ip addr show dev eth0 failed') + self.assertIn('192.168.1.128', output) self.assertNotIn(self.testvm1.ip, output) @@ -390,7 +396,7 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin): name=self.make_vm_name('proxy'), label='red') self.proxy.provides_network = True - self.proxy.create_on_disk() + self.loop.run_until_complete(self.proxy.create_on_disk()) self.proxy.netvm = self.testnetvm self.testvm1.netvm = self.proxy self.app.save() @@ -408,14 +414,13 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin): qubes.firewall.Rule(None, action='accept', specialtarget='dns'), ] self.testvm1.firewall.save() - self.testvm1.start() + self.loop.run_until_complete(self.testvm1.start()) self.assertTrue(self.proxy.is_running()) - if nc_version == NcVersion.Nmap: - self.testnetvm.run("nc -l --send-only -e /bin/hostname -k 1234") - else: - self.testnetvm.run("while nc -l -e /bin/hostname -p 1234; do " - "true; done") + self.loop.run_until_complete(self.testnetvm.run_for_stdio( + 'nc -l --send-only -e /bin/hostname -k 1234' + if nc_version == NcVersion.Nmap + else 'while nc -l -e /bin/hostname -p 1234; do true; done')) self.assertEqual(self.run_cmd(self.proxy, self.ping_ip), 0, "Ping by IP from ProxyVM failed") @@ -437,7 +442,7 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin): self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM, name=self.make_vm_name('proxy'), label='red') - self.proxy.create_on_disk() + self.loop.run_until_complete(self.proxy.create_on_disk()) self.proxy.provides_network = True self.proxy.netvm = self.testnetvm self.testvm1.netvm = self.proxy @@ -448,31 +453,36 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin): self.testvm2 = self.app.add_new_vm(qubes.vm.appvm.AppVM, name=self.make_vm_name('vm3'), label='red') - self.testvm2.create_on_disk() + self.loop.run_until_complete(self.testvm2.create_on_disk()) self.testvm2.netvm = self.proxy self.app.save() - self.testvm1.start() - self.testvm2.start() + self.loop.run_until_complete(self.testvm1.start()) + self.loop.run_until_complete(self.testvm2.start()) - cmd = 'iptables -I FORWARD -s {} -d {} -j ACCEPT'.format( - self.testvm2.ip, self.testvm1.ip) - retcode = self.proxy.run(cmd, user='root', wait=True) - self.assertEqual(retcode, 0, '{} failed with: {}'.format(cmd, retcode)) + try: + self.loop.run_until_complete(self.proxy.run_for_stdio( + 'iptables -I FORWARD -s {} -d {} -j ACCEPT'.format( + self.testvm2.ip, self.testvm1.ip), user='root')) + except subprocess.CalledProcessError as e: + self.fail('{} failed with: {}'.format(cmd, e.returncode)) - cmd = 'iptables -I INPUT -s {} -j ACCEPT'.format( - self.testvm2.ip) - retcode = self.testvm1.run(cmd, user='root', wait=True) - self.assertEqual(retcode, 0, '{} failed with: {}'.format(cmd, retcode)) + try: + self.loop.run_until_complete(self.proxy.run_for_stdio( + 'iptables -I INPUT -s {} -j ACCEPT'.format( + self.testvm2.ip), user='root')) + except subprocess.CalledProcessError as e: + self.fail('{} failed with: {}'.format(cmd, e.returncode)) self.assertEqual(self.run_cmd(self.testvm2, self.ping_cmd.format(target=self.testvm1.ip)), 0) - cmd = 'iptables -nvxL INPUT | grep {}'.format(self.testvm2.ip) - p = self.testvm1.run(cmd, user='root', passio_popen=True) - (stdout, _) = p.communicate() - self.assertEqual(p.returncode, 0, - '{} failed with {}'.format(cmd, p.returncode)) + try: + (stdout, _) = self.loop.run_until_complete(self.testvm1.run_for_stdio( + 'iptables -nvxL INPUT | grep {}'.format(self.testvm2.ip), user='root')) + except subprocess.CalledProcessError as e: + self.fail( + '{} failed with {}'.format(cmd, e.returncode)) self.assertNotEqual(stdout.decode().split()[0], '0', 'Packets didn\'t managed to the VM') @@ -481,7 +491,7 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin): self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM, name=self.make_vm_name('proxy'), label='red') - self.proxy.create_on_disk() + self.loop.run_until_complete(self.proxy.create_on_disk()) self.proxy.provides_network = True self.proxy.netvm = self.testnetvm self.proxy.features['net/fake-ip'] = '192.168.1.128' @@ -489,7 +499,7 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin): self.proxy.features['net/fake-netmask'] = '255.255.255.0' self.testvm1.netvm = self.proxy self.app.save() - self.testvm1.start() + self.loop.run_until_complete(self.testvm1.start()) self.assertEqual(self.run_cmd(self.proxy, self.ping_ip), 0) self.assertEqual(self.run_cmd(self.proxy, self.ping_name), 0) @@ -497,39 +507,39 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin): self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0) self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0) - p = self.proxy.run('ip addr show dev eth0', user='root', - passio_popen=True, - ignore_stderr=True) - p.stdin.close() - output = p.stdout.read().decode() - self.assertEqual(p.wait(), 0, 'ip addr show dev eth0 failed') + try: + (output, _) = self.loop.run_until_complete( + self.proxy.run_for_stdio( + 'ip addr show dev eth0', user='root')) + except subprocess.CalledProcessError as e: + self.fail('ip addr show dev eth0 failed') self.assertIn('192.168.1.128', output) self.assertNotIn(self.testvm1.ip, output) - p = self.proxy.run('ip route show', user='root', - passio_popen=True, - ignore_stderr=True) - p.stdin.close() - output = p.stdout.read().decode() - self.assertEqual(p.wait(), 0, 'ip route show failed') + try: + (output, _) = self.loop.run_until_complete( + self.proxy.run_for_stdio( + 'ip route show', user='root')) + except subprocess.CalledProcessError as e: + self.fail('ip route show failed') self.assertIn('192.168.1.1', output) self.assertNotIn(self.testvm1.netvm.ip, output) - p = self.testvm1.run('ip addr show dev eth0', user='root', - passio_popen=True, - ignore_stderr=True) - p.stdin.close() - output = p.stdout.read().decode() - self.assertEqual(p.wait(), 0, 'ip addr show dev eth0 failed') + try: + (output, _) = self.loop.run_until_complete( + self.testvm1.run_for_stdio( + 'ip addr show dev eth0', user='root')) + except subprocess.CalledProcessError as e: + self.fail('ip addr show dev eth0 failed') self.assertNotIn('192.168.1.128', output) self.assertIn(self.testvm1.ip, output) - p = self.testvm1.run('ip route show', user='root', - passio_popen=True, - ignore_stderr=True) - p.stdin.close() - output = p.stdout.read().decode() - self.assertEqual(p.wait(), 0, 'ip route show failed') + try: + (output, _) = self.loop.run_until_complete( + self.testvm1.run_for_stdio( + 'ip route show', user='root')) + except subprocess.CalledProcessError as e: + self.fail('ip route show failed') self.assertIn('192.168.1.128', output) self.assertNotIn(self.proxy.ip, output) @@ -537,7 +547,7 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin): '''Custom AppVM IP''' self.testvm1.ip = '192.168.1.1' self.app.save() - self.testvm1.start() + self.loop.run_until_complete(self.testvm1.start()) self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0) self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0) @@ -546,14 +556,14 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin): self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM, name=self.make_vm_name('proxy'), label='red') - self.proxy.create_on_disk() + self.loop.run_until_complete(self.proxy.create_on_disk()) self.proxy.provides_network = True self.proxy.netvm = self.testnetvm self.proxy.ip = '192.168.1.1' self.testvm1.netvm = self.proxy self.app.save() - self.testvm1.start() + self.loop.run_until_complete(self.testvm1.start()) self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0) self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0) @@ -566,7 +576,7 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin): name=self.make_vm_name('proxy'), label='red') self.proxy.provides_network = True - self.proxy.create_on_disk() + self.loop.run_until_complete(self.proxy.create_on_disk()) self.proxy.netvm = self.testnetvm self.testvm1.netvm = self.proxy self.app.save() @@ -584,14 +594,13 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin): qubes.firewall.Rule(None, action='accept', specialtarget='dns'), ] self.testvm1.firewall.save() - self.testvm1.start() + self.loop.run_until_complete(self.testvm1.start()) self.assertTrue(self.proxy.is_running()) - if nc_version == NcVersion.Nmap: - self.testnetvm.run("nc -l --send-only -e /bin/hostname -k 1234") - else: - self.testnetvm.run("while nc -l -e /bin/hostname -p 1234; do " - "true; done") + self.loop.run_until_complete(self.testnetvm.run_for_stdio( + 'nc -l --send-only -e /bin/hostname -k 1234' + if nc_version == NcVersion.Nmap + else 'while nc -l -e /bin/hostname -p 1234; do true; done')) self.assertEqual(self.run_cmd(self.proxy, self.ping_ip), 0, "Ping by IP from ProxyVM failed") @@ -686,10 +695,11 @@ class VmUpdatesMixin(qubes.tests.SystemTestsMixin): ) def run_cmd(self, vm, cmd, user="root"): - p = vm.run(cmd, user=user, passio_popen=True, ignore_stderr=True) - p.stdin.close() - p.stdout.read().decode() - return p.wait() + try: + self.loop.run_until_complete(vm.run_for_stdio(cmd)) + except subprocess.CalledProcessError as e: + return e.returncode + return 0 def setUp(self): super(VmUpdatesMixin, self).setUp() @@ -724,116 +734,95 @@ class VmUpdatesMixin(qubes.tests.SystemTestsMixin): qubes.vm.appvm.AppVM, name=self.make_vm_name('vm1'), label='red') - self.testvm1.create_on_disk() + self.loop.run_until_complete(self.testvm1.create_on_disk()) def test_000_simple_update(self): self.save_and_reload_db() # reload the VM to have all the properties properly set (especially # default netvm) self.testvm1 = self.app.domains[self.testvm1.qid] - self.testvm1.start() - p = self.testvm1.run(self.update_cmd, wait=True, user="root", - passio_popen=True, passio_stderr=True) - (stdout, stderr) = p.communicate() - self.assertIn(p.wait(), self.exit_code_ok, - "{}: {}\n{}".format(self.update_cmd, stdout, stderr) - ) + self.loop.run_until_complete(self.testvm1.start()) + p = self.loop.run_until_complete( + self.testvm1.run(self.update_cmd, user='root')) + (stdout, stderr) = self.loop.run_until_complete(p.communicate()) + self.assertIn(p.returncode, self.exit_code_ok, + '{}: {}\n{}'.format(self.update_cmd, stdout, stderr)) def create_repo_apt(self): pkg_file_name = "test-pkg_1.0-1_amd64.deb" - p = self.netvm_repo.run("mkdir /tmp/apt-repo && cd /tmp/apt-repo &&" - "base64 -d | zcat > {}".format(pkg_file_name), - passio_popen=True) - p.stdin.write(self.DEB_PACKAGE_GZIP_BASE64) - p.stdin.close() - if p.wait() != 0: - raise RuntimeError("Failed to write {}".format(pkg_file_name)) + self.loop.run_until_complete(self.netvm_repo.run_for_stdio(''' + mkdir /tmp/apt-repo \ + && cd /tmp/apt-repo \ + && base64 -d | zcat > {} + '''.format(pkg_file_name), + input=self.DEB_PACKAGE_GZIP_BASE64)) # do not assume dpkg-scanpackage installed packages_path = "dists/test/main/binary-amd64/Packages" - p = self.netvm_repo.run( - "mkdir -p /tmp/apt-repo/dists/test/main/binary-amd64 && " - "cd /tmp/apt-repo && " - "cat > {packages} && " - "echo MD5sum: $(openssl md5 -r {pkg} | cut -f 1 -d ' ')" - " >> {packages} && " - "echo SHA1: $(openssl sha1 -r {pkg} | cut -f 1 -d ' ')" - " >> {packages} && " - "echo SHA256: $(openssl sha256 -r {pkg} | cut -f 1 -d ' ')" - " >> {packages} && " - "gzip < {packages} > {packages}.gz".format(pkg=pkg_file_name, - packages=packages_path), - passio_popen=True, passio_stderr=True) - p.stdin.write( - "Package: test-pkg\n" - "Version: 1.0-1\n" - "Architecture: amd64\n" - "Maintainer: unknown \n" - "Installed-Size: 25\n" - "Filename: {pkg}\n" - "Size: 994\n" - "Section: unknown\n" - "Priority: optional\n" - "Description: Test package\n".format(pkg=pkg_file_name) - ) - p.stdin.close() - if p.wait() != 0: - raise RuntimeError("Failed to write Packages file: {}".format( - p.stderr.read().decode())) + self.loop.run_until_complete(self.netvm_repo.run_for_stdio(''' + mkdir -p /tmp/apt-repo/dists/test/main/binary-amd64 \ + && cd /tmp/apt-repo \ + && cat > {packages} \ + && echo MD5sum: $(openssl md5 -r {pkg} | cut -f 1 -d ' ') \ + >> {packages} \ + && echo SHA1: $(openssl sha1 -r {pkg} | cut -f 1 -d ' ') \ + >> {packages} \ + && echo SHA256: $(openssl sha256 -r {pkg} | cut -f 1 -d ' ') \ + >> {packages} \ + && gzip < {packages} > {packages}.gz + '''.format(pkg=pkg_file_name, packages=packages_path), + input='''\ +Package: test-pkg +Version: 1.0-1 +Architecture: amd64 +Maintainer: unknown +Installed-Size: 25 +Filename: {pkg} +Size: 994 +Section: unknown +Priority: optional +Description: Test package'''.format(pkg=pkg_file_name).encode('utf-8'))) - p = self.netvm_repo.run( - "mkdir -p /tmp/apt-repo/dists/test && " - "cd /tmp/apt-repo/dists/test && " - "cat > Release && " - "echo '' $(sha256sum {p} | cut -f 1 -d ' ') $(stat -c %s {p}) {p}" - " >> Release && " - "echo '' $(sha256sum {z} | cut -f 1 -d ' ') $(stat -c %s {z}) {z}" - " >> Release" - .format(p="main/binary-amd64/Packages", - z="main/binary-amd64/Packages.gz"), - passio_popen=True, passio_stderr=True - ) - p.stdin.write( - "Label: Test repo\n" - "Suite: test\n" - "Codename: test\n" - "Date: Tue, 27 Oct 2015 03:22:09 UTC\n" - "Architectures: amd64\n" - "Components: main\n" - "SHA256:\n" - ) - p.stdin.close() - if p.wait() != 0: - raise RuntimeError("Failed to write Release file: {}".format( - p.stderr.read().decode())) + self.loop.run_until_complete(self.netvm_repo.run_for_stdio(''' + mkdir -p /tmp/apt-repo/dists/test \ + && cd /tmp/apt-repo/dists/test \ + && cat > Release \ + && echo '' $(sha256sum {p} | cut -f 1 -d ' ') $(stat -c %s {p}) {p}\ + >> Release \ + && echo '' $(sha256sum {z} | cut -f 1 -d ' ') $(stat -c %s {z}) {z}\ + >> Release + '''.format(p='main/binary-amd64/Packages', + z='main/binary-amd64/Packages.gz'), + input='''\ +Label: Test repo +Suite: test +Codename: test +Date: Tue, 27 Oct 2015 03:22:09 UTC +Architectures: amd64 +Components: main +SHA256: +''')) def create_repo_yum(self): pkg_file_name = "test-pkg-1.0-1.fc21.x86_64.rpm" - p = self.netvm_repo.run("mkdir /tmp/yum-repo && cd /tmp/yum-repo &&" - "base64 -d | zcat > {}".format(pkg_file_name), - passio_popen=True, passio_stderr=True) - p.stdin.write(self.RPM_PACKAGE_GZIP_BASE64) - p.stdin.close() - if p.wait() != 0: - raise RuntimeError("Failed to write {}: {}".format(pkg_file_name, - p.stderr.read().decode())) + self.loop.run_until_complete(self.netvm_repo.run_for_stdio(''' + mkdir /tmp/yum-repo \ + && cd /tmp/yum-repo \ + && base64 -d | zcat > {} + '''.format(pkg_file_name), input=self.RPM_PACKAGE_GZIP_BASE64)) # createrepo is installed by default in Fedora template - p = self.netvm_repo.run("createrepo /tmp/yum-repo", - passio_popen=True, - passio_stderr=True) - if p.wait() != 0: - raise RuntimeError("Failed to create yum metadata: {}".format( - p.stderr.read().decode())) + self.loop.run_until_complete(self.netvm_repo.run_for_stdio( + 'createrepo /tmp/yum-repo')) def create_repo_and_serve(self): if self.template.count("debian") or self.template.count("whonix"): self.create_repo_apt() - self.netvm_repo.run("cd /tmp/apt-repo &&" - "python -m SimpleHTTPServer 8080") + self.loop.run_until_complete(self.netvm_repo.run( + 'cd /tmp/apt-repo && python -m SimpleHTTPServer 8080')) elif self.template.count("fedora"): self.create_repo_yum() - self.netvm_repo.run("cd /tmp/yum-repo &&" - "python -m SimpleHTTPServer 8080") + self.loop.run_until_complete(self.netvm_repo.run( + 'cd /tmp/yum-repo && python -m SimpleHTTPServer 8080')) else: # not reachable... self.skipTest("Template {} not supported by this test".format( @@ -848,13 +837,13 @@ class VmUpdatesMixin(qubes.tests.SystemTestsMixin): """ if self.template.count("debian") or self.template.count("whonix"): - self.testvm1.run( + self.loop.run_until_complete(self.testvm1.run_for_stdio( "rm -f /etc/apt/sources.list.d/* &&" "echo 'deb [trusted=yes] http://localhost:8080 test main' " "> /etc/apt/sources.list", - user="root") + user="root")) elif self.template.count("fedora"): - self.testvm1.run( + self.loop.run_until_complete(self.testvm1.run_for_stdio( "rm -f /etc/yum.repos.d/*.repo &&" "echo '[test]' > /etc/yum.repos.d/test.repo &&" "echo 'name=Test repo' >> /etc/yum.repos.d/test.repo &&" @@ -862,7 +851,7 @@ class VmUpdatesMixin(qubes.tests.SystemTestsMixin): "echo 'baseurl=http://localhost:8080/'" " >> /etc/yum.repos.d/test.repo", user="root" - ) + )) else: # not reachable... self.skipTest("Template {} not supported by this test".format( @@ -881,7 +870,7 @@ class VmUpdatesMixin(qubes.tests.SystemTestsMixin): name=self.make_vm_name('net'), label='red') self.netvm_repo.provides_network = True - self.netvm_repo.create_on_disk() + self.loop.run_until_complete(self.netvm_repo.create_on_disk()) self.testvm1.netvm = self.netvm_repo # NetVM should have qubes-updates-proxy enabled by default #self.netvm_repo.features['qubes-updates-proxy'] = True @@ -890,38 +879,33 @@ class VmUpdatesMixin(qubes.tests.SystemTestsMixin): self.app.save() # Setup test repo - self.netvm_repo.start() + self.loop.run_until_complete(self.netvm_repo.start()) self.create_repo_and_serve() # Configure local repo - self.testvm1.start() + self.loop.run_until_complete(self.testvm1.start()) self.configure_test_repo() # update repository metadata - p = self.testvm1.run(self.update_cmd, wait=True, user="root", - passio_popen=True, passio_stderr=True) - (stdout, stderr) = p.communicate() - self.assertIn(p.wait(), self.exit_code_ok, - "{}: {}\n{}".format(self.update_cmd, stdout, stderr) - ) + p = self.loop.run_until_complete(self.testvm1.run( + self.update_cmd, user='root')) + (stdout, stderr) = self.loop.run_until_complete(p.communicate()) + self.assertIn(self.loop.run_until_complete(p.wait()), self.exit_code_ok, + '{}: {}\n{}'.format(self.update_cmd, stdout, stderr)) # install test package - p = self.testvm1.run(self.install_cmd.format('test-pkg'), - wait=True, user="root", - passio_popen=True, passio_stderr=True) - (stdout, stderr) = p.communicate() - self.assertIn(p.wait(), self.exit_code_ok, - "{}: {}\n{}".format(self.update_cmd, stdout, stderr) - ) + p = self.loop.run_until_complete(self.testvm1.run( + self.install_cmd.format('test-pkg'), user='root')) + (stdout, stderr) = self.loop.run_until_complete(p.communicate()) + self.assertIn(self.loop.run_until_complete(p.wait()), self.exit_code_ok, + '{}: {}\n{}'.format(self.update_cmd, stdout, stderr)) # verify if it was really installed - p = self.testvm1.run(self.install_test_cmd.format('test-pkg'), - wait=True, user="root", - passio_popen=True, passio_stderr=True) - (stdout, stderr) = p.communicate() - self.assertIn(p.wait(), self.exit_code_ok, - "{}: {}\n{}".format(self.update_cmd, stdout, stderr) - ) + p = self.loop.run_until_complete(self.testvm1.run( + self.install_test_cmd.format('test-pkg'), user='root')) + (stdout, stderr) = self.loop.run_until_complete(p.communicate()) + self.assertIn(self.loop.run_until_complete(p.wait()), self.exit_code_ok, + '{}: {}\n{}'.format(self.update_cmd, stdout, stderr)) def load_tests(loader, tests, pattern): try: diff --git a/qubes/tests/integ/storage.py b/qubes/tests/integ/storage.py index a4ab4596..f06f82c6 100644 --- a/qubes/tests/integ/storage.py +++ b/qubes/tests/integ/storage.py @@ -19,8 +19,8 @@ # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. # +import asyncio import os - import shutil import qubes.storage.lvm @@ -51,6 +51,10 @@ class StorageTestMixin(qubes.tests.SystemTestsMixin): def test_000_volatile(self): '''Test if volatile volume is really volatile''' + return self.loop.run_until_complete(self._test_000_volatile()) + + @asyncio.coroutine + def _test_000_volatile(self): size = 32*1024*1024 volume_config = { 'pool': self.pool.name, @@ -60,27 +64,29 @@ class StorageTestMixin(qubes.tests.SystemTestsMixin): 'rw': True, } testvol = self.vm1.storage.init_volume('testvol', volume_config) - self.vm1.storage.get_pool(testvol).create(testvol) + yield from self.vm1.storage.get_pool(testvol).create(testvol) self.app.save() - self.vm1.start() - p = self.vm1.run( + yield from (self.vm1.start()) + + # volatile image not clean + yield from (self.vm1.run_for_stdio( 'head -c {} /dev/zero 2>&1 | diff -q /dev/xvde - 2>&1'.format(size), - user='root', passio_popen=True) - stdout, _ = p.communicate() - self.assertEqual(p.returncode, 0, - 'volatile image not clean: {}'.format(stdout)) - self.vm1.run('echo test123 > /dev/xvde', user='root', wait=True) - self.vm1.shutdown(wait=True) - self.vm1.start() - p = self.vm1.run( + user='root')) + # volatile image not volatile + yield from ( + self.vm1.run_for_stdio('echo test123 > /dev/xvde', user='root')) + yield from (self.vm1.shutdown(wait=True)) + yield from (self.vm1.start()) + yield from (self.vm1.run_for_stdio( 'head -c {} /dev/zero 2>&1 | diff -q /dev/xvde - 2>&1'.format(size), - user='root', passio_popen=True) - stdout, _ = p.communicate() - self.assertEqual(p.returncode, 0, - 'volatile image not volatile: {}'.format(stdout)) + user='root')) def test_001_non_volatile(self): '''Test if non-volatile volume is really non-volatile''' + return self.loop.run_until_complete(self._test_001_non_volatile()) + + @asyncio.coroutine + def _test_001_non_volatile(self): size = 32*1024*1024 volume_config = { 'pool': self.pool.name, @@ -89,28 +95,31 @@ class StorageTestMixin(qubes.tests.SystemTestsMixin): 'save_on_stop': True, 'rw': True, } - testvol = self.vm1.storage.init_volume('testvol', volume_config) - self.vm1.storage.get_pool(testvol).create(testvol) + testvol = yield from self.vm1.storage.init_volume( + 'testvol', volume_config) + yield from self.vm1.storage.get_pool(testvol).create(testvol) self.app.save() - self.vm1.start() - p = self.vm1.run( + yield from self.vm1.start() + # non-volatile image not clean + yield from self.vm1.run_for_stdio( 'head -c {} /dev/zero 2>&1 | diff -q /dev/xvde - 2>&1'.format(size), - user='root', passio_popen=True) - stdout, _ = p.communicate() - self.assertEqual(p.returncode, 0, - 'non-volatile image not clean: {}'.format(stdout)) - self.vm1.run('echo test123 > /dev/xvde', user='root', wait=True) - self.vm1.shutdown(wait=True) - self.vm1.start() - p = self.vm1.run( + user='root') + + yield from self.vm1.run_for_stdio('echo test123 > /dev/xvde', + user='root') + yield from self.vm1.shutdown(wait=True) + yield from self.vm1.start() + # non-volatile image volatile + yield from self.vm1.run_for_stdio( 'head -c {} /dev/zero 2>&1 | diff -q /dev/xvde - 2>&1'.format(size), - user='root', passio_popen=True) - stdout, _ = p.communicate() - self.assertNotEqual(p.returncode, 0, - 'non-volatile image volatile: {}'.format(stdout)) + user='root') def test_002_read_only(self): '''Test read-only volume''' + self.loop.run_until_complete(self._test_002_read_only()) + + @asyncio.coroutine + def _test_002_read_only(self): size = 32 * 1024 * 1024 volume_config = { 'pool': self.pool.name, @@ -120,29 +129,28 @@ class StorageTestMixin(qubes.tests.SystemTestsMixin): 'rw': False, } testvol = self.vm1.storage.init_volume('testvol', volume_config) - self.vm1.storage.get_pool(testvol).create(testvol) + yield from self.vm1.storage.get_pool(testvol).create(testvol) self.app.save() - self.vm1.start() - p = self.vm1.run( + yield from self.vm1.start() + # non-volatile image not clean + yield from self.vm1.run_for_stdio( 'head -c {} /dev/zero 2>&1 | diff -q /dev/xvde - 2>&1'.format(size), - user='root', passio_popen=True) - stdout, _ = p.communicate() - self.assertEqual(p.returncode, 0, - 'non-volatile image not clean: {}'.format(stdout)) - p = self.vm1.run('echo test123 > /dev/xvde', user='root', - passio_popen=True) - p.wait() - self.assertNotEqual(p.returncode, 0, - 'Write to read-only volume unexpectedly succeeded') - p = self.vm1.run( + user='root') + # Write to read-only volume unexpectedly succeeded + with self.assertRaises(subprocess.CalledProcessError): + yield from self.vm1.run_for_stdio('echo test123 > /dev/xvde', + user='root') + # read-only volume modified + yield from self.vm1.run_for_stdio( 'head -c {} /dev/zero 2>&1 | diff -q /dev/xvde - 2>&1'.format(size), - user='root', passio_popen=True) - stdout, _ = p.communicate() - self.assertEqual(p.returncode, 0, - 'read-only volume modified: {}'.format(stdout)) + user='root') def test_003_snapshot(self): '''Test snapshot volume data propagation''' + self.loop.run_until_complete(self._test_003_snapshot()) + + @asyncio.coroutine + def _test_003_snapshot(self): size = 128 * 1024 * 1024 volume_config = { 'pool': self.pool.name, @@ -152,7 +160,7 @@ class StorageTestMixin(qubes.tests.SystemTestsMixin): 'rw': True, } testvol = self.vm1.storage.init_volume('testvol', volume_config) - self.vm1.storage.get_pool(testvol).create(testvol) + yield from self.vm1.storage.get_pool(testvol).create(testvol) volume_config = { 'pool': self.pool.name, 'size': size, @@ -162,57 +170,55 @@ class StorageTestMixin(qubes.tests.SystemTestsMixin): 'rw': True, } testvol_snap = self.vm2.storage.init_volume('testvol', volume_config) - self.vm2.storage.get_pool(testvol_snap).create(testvol_snap) + yield from self.vm2.storage.get_pool(testvol_snap).create(testvol_snap) self.app.save() - self.vm1.start() - self.vm2.start() - p = self.vm1.run( + yield from self.vm1.start() + yield from self.vm2.start() + # origin image not clean + yield from self.vm1.run_for_stdio( 'head -c {} /dev/zero 2>&1 | diff -q /dev/xvde - 2>&1'.format(size), - user='root', passio_popen=True) - stdout, _ = p.communicate() - self.assertEqual(p.returncode, 0, - 'origin image not clean: {}'.format(stdout)) + user='root') - p = self.vm2.run( + # snapshot image not clean + yield from self.vm2.run_for_stdio( 'head -c {} /dev/zero | diff -q /dev/xvde -'.format(size), - user='root', passio_popen=True) - stdout, _ = p.communicate() - self.assertEqual(p.returncode, 0, - 'snapshot image not clean: {}'.format(stdout)) + user='root') - self.vm1.run('echo test123 > /dev/xvde && sync', user='root', wait=True) - p.wait() - self.assertEqual(p.returncode, 0, - 'Write to read-write volume failed') - p = self.vm2.run( + # Write to read-write volume failed + yield from self.vm1.run_for_stdio('echo test123 > /dev/xvde && sync', + user='root') + # origin changes propagated to snapshot too early + yield from self.vm2.run_for_stdio( 'head -c {} /dev/zero 2>&1 | diff -q /dev/xvde - 2>&1'.format(size), - user='root', passio_popen=True) - stdout, _ = p.communicate() - self.assertEqual(p.returncode, 0, - 'origin changes propagated to snapshot too early: {}'.format( - stdout)) - self.vm1.shutdown(wait=True) + user='root') + yield from self.vm1.shutdown(wait=True) + # after origin shutdown there should be still no change - p = self.vm2.run( - 'head -c {} /dev/zero 2>&1 | diff -q /dev/xvde - 2>&1'.format(size), - user='root', passio_popen=True) - stdout, _ = p.communicate() - self.assertEqual(p.returncode, 0, - 'origin changes propagated to snapshot too early2: {}'.format( - stdout)) - self.vm2.shutdown(wait=True) - self.vm2.start() - # only after target VM restart changes should be visible - p = self.vm2.run( + # origin changes propagated to snapshot too early2 + yield from self.vm2.run_for_stdio( 'head -c {} /dev/zero 2>&1 | diff -q /dev/xvde - 2>&1'.format(size), - user='root', passio_popen=True) - stdout, _ = p.communicate() - self.assertNotEqual(p.returncode, 0, - 'origin changes not visible in snapshot: {}'.format(stdout)) + user='root') + + yield from self.vm2.shutdown(wait=True) + yield from self.vm2.start() + + # only after target VM restart changes should be visible + + # origin changes not visible in snapshot + with self.assertRaises(subprocess.CalledProcessError): + yield from self.vm2.run( + 'head -c {} /dev/zero 2>&1 | diff -q /dev/xvde - 2>&1'.format( + size), + user='root') def test_004_snapshot_non_persistent(self): '''Test snapshot volume non-persistence''' + return self.loop.run_until_complete( + self._test_004_snapshot_non_persistent()) + + @asyncio.coroutine + def _test_004_snapshot_non_persistent(self): size = 128 * 1024 * 1024 volume_config = { 'pool': self.pool.name, @@ -222,7 +228,7 @@ class StorageTestMixin(qubes.tests.SystemTestsMixin): 'rw': True, } testvol = self.vm1.storage.init_volume('testvol', volume_config) - self.vm1.storage.get_pool(testvol).create(testvol) + yield from self.vm1.storage.get_pool(testvol).create(testvol) volume_config = { 'pool': self.pool.name, 'size': size, @@ -232,30 +238,25 @@ class StorageTestMixin(qubes.tests.SystemTestsMixin): 'rw': True, } testvol_snap = self.vm2.storage.init_volume('testvol', volume_config) - self.vm2.storage.get_pool(testvol_snap).create(testvol_snap) + yield from self.vm2.storage.get_pool(testvol_snap).create(testvol_snap) self.app.save() - self.vm2.start() + yield from self.vm2.start() - p = self.vm2.run( + # snapshot image not clean + yield from self.vm2.run_for_stdio( 'head -c {} /dev/zero | diff -q /dev/xvde -'.format(size), - user='root', passio_popen=True) - stdout, _ = p.communicate() - self.assertEqual(p.returncode, 0, - 'snapshot image not clean: {}'.format(stdout)) + user='root') - self.vm2.run('echo test123 > /dev/xvde && sync', user='root', wait=True) - p.wait() - self.assertEqual(p.returncode, 0, - 'Write to read-write snapshot volume failed') - self.vm2.shutdown(wait=True) - self.vm2.start() - p = self.vm2.run( + # Write to read-write snapshot volume failed + yield from self.vm2.run_for_stdio('echo test123 > /dev/xvde && sync', + user='root') + yield from self.vm2.shutdown(wait=True) + yield from self.vm2.start() + + # changes on snapshot survived VM restart + yield from self.vm2.run_for_stdio( 'head -c {} /dev/zero 2>&1 | diff -q /dev/xvde - 2>&1'.format(size), - user='root', passio_popen=True) - stdout, _ = p.communicate() - self.assertEqual(p.returncode, 0, - 'changes on snapshot survived VM restart: {}'.format( - stdout)) + user='root') class StorageFile(StorageTestMixin, qubes.tests.QubesTestCase): diff --git a/qubes/tests/integ/vm_qrexec_gui.py b/qubes/tests/integ/vm_qrexec_gui.py index ce336292..b03e5fca 100644 --- a/qubes/tests/integ/vm_qrexec_gui.py +++ b/qubes/tests/integ/vm_qrexec_gui.py @@ -22,11 +22,13 @@ from distutils import spawn +import asyncio import multiprocessing import os +import shlex import subprocess -import unittest import time +import unittest import qubes.config import qubes.tests @@ -48,19 +50,29 @@ class TC_00_AppVMMixin(qubes.tests.SystemTestsMixin): label='red', name=self.make_vm_name('vm1'), template=self.app.domains[self.template]) - self.testvm1.create_on_disk() + self.loop.run_until_complete(self.testvm1.create_on_disk()) self.testvm2 = self.app.add_new_vm( qubes.vm.appvm.AppVM, label='red', name=self.make_vm_name('vm2'), template=self.app.domains[self.template]) - self.testvm2.create_on_disk() + self.loop.run_until_complete(self.testvm2.create_on_disk()) self.app.save() + def create_local_file(self, filename, content, mode='w'): + with open(filename, mode) as file: + file.write(content) + self.addCleanup(os.unlink, filename) + + def create_remote_file(self, vm, filename, content): + self.loop.run_until_complete(vm.run_for_stdio( + 'cat > {}'.format(shlex.quote(filename)), + user='root', input=content.encode('utf-8'))) + def test_000_start_shutdown(self): - self.testvm1.start() + self.loop.run_until_complete(self.testvm1.start()) self.assertEquals(self.testvm1.get_power_state(), "Running") - self.testvm1.shutdown() + self.loop.run_until_complete(self.testvm1.shutdown()) shutdown_counter = 0 while self.testvm1.is_running(): @@ -74,87 +86,101 @@ class TC_00_AppVMMixin(qubes.tests.SystemTestsMixin): @unittest.skipUnless(spawn.find_executable('xdotool'), "xdotool not installed") def test_010_run_xterm(self): - self.testvm1.start() + self.loop.run_until_complete(self.testvm1.start()) self.assertEquals(self.testvm1.get_power_state(), "Running") - self.testvm1.run("xterm") - wait_count = 0 - title = 'user@{}'.format(self.testvm1.name) - if self.template.count("whonix"): - title = 'user@host' - while subprocess.call( - ['xdotool', 'search', '--name', title], - stdout=open(os.path.devnull, 'w'), - stderr=subprocess.STDOUT) > 0: - wait_count += 1 - if wait_count > 100: - self.fail("Timeout while waiting for xterm window") - time.sleep(0.1) - time.sleep(0.5) - subprocess.check_call( - ['xdotool', 'search', '--name', title, - 'windowactivate', 'type', 'exit\n']) + p = self.loop.run_until_complete(self.testvm1.run('xterm')) + try: + wait_count = 0 + title = 'user@{}'.format(self.testvm1.name) + if self.template.count("whonix"): + title = 'user@host' + while subprocess.call( + ['xdotool', 'search', '--name', title], + stdout=open(os.path.devnull, 'w'), + stderr=subprocess.STDOUT) > 0: + wait_count += 1 + if wait_count > 100: + self.fail("Timeout while waiting for xterm window") + time.sleep(0.1) - wait_count = 0 - while subprocess.call(['xdotool', 'search', '--name', title], - stdout=open(os.path.devnull, 'w'), - stderr=subprocess.STDOUT) == 0: - wait_count += 1 - if wait_count > 100: - self.fail("Timeout while waiting for xterm " - "termination") - time.sleep(0.1) + time.sleep(0.5) + subprocess.check_call( + ['xdotool', 'search', '--name', title, + 'windowactivate', 'type', 'exit\n']) + + wait_count = 0 + while subprocess.call(['xdotool', 'search', '--name', title], + stdout=open(os.path.devnull, 'w'), + stderr=subprocess.STDOUT) == 0: + wait_count += 1 + if wait_count > 100: + self.fail("Timeout while waiting for xterm " + "termination") + time.sleep(0.1) + finally: + p.terminate() + self.loop.run_until_complete(p.wait()) @unittest.skipUnless(spawn.find_executable('xdotool'), "xdotool not installed") def test_011_run_gnome_terminal(self): if "minimal" in self.template: self.skipTest("Minimal template doesn't have 'gnome-terminal'") - self.testvm1.start() + self.loop.run_until_complete(self.testvm1.start()) self.assertEquals(self.testvm1.get_power_state(), "Running") - self.testvm1.run("gnome-terminal") - title = 'user@{}'.format(self.testvm1.name) - if self.template.count("whonix"): - title = 'user@host' - wait_count = 0 - while subprocess.call( - ['xdotool', 'search', '--name', title], - stdout=open(os.path.devnull, 'w'), - stderr=subprocess.STDOUT) > 0: - wait_count += 1 - if wait_count > 100: - self.fail("Timeout while waiting for gnome-terminal window") - time.sleep(0.1) + p = self.loop.run_until_complete(self.testvm1.run('gnome-terminal')) + try: + title = 'user@{}'.format(self.testvm1.name) + if self.template.count("whonix"): + title = 'user@host' + wait_count = 0 + while subprocess.call( + ['xdotool', 'search', '--name', title], + stdout=open(os.path.devnull, 'w'), + stderr=subprocess.STDOUT) > 0: + wait_count += 1 + if wait_count > 100: + self.fail("Timeout while waiting for gnome-terminal window") + time.sleep(0.1) - time.sleep(0.5) - subprocess.check_call( - ['xdotool', 'search', '--name', title, - 'windowactivate', '--sync', 'type', 'exit\n']) + time.sleep(0.5) + subprocess.check_call( + ['xdotool', 'search', '--name', title, + 'windowactivate', '--sync', 'type', 'exit\n']) - wait_count = 0 - while subprocess.call(['xdotool', 'search', '--name', title], - stdout=open(os.path.devnull, 'w'), - stderr=subprocess.STDOUT) == 0: - wait_count += 1 - if wait_count > 100: - self.fail("Timeout while waiting for gnome-terminal " - "termination") - time.sleep(0.1) + wait_count = 0 + while subprocess.call(['xdotool', 'search', '--name', title], + stdout=open(os.path.devnull, 'w'), + stderr=subprocess.STDOUT) == 0: + wait_count += 1 + if wait_count > 100: + self.fail("Timeout while waiting for gnome-terminal " + "termination") + time.sleep(0.1) + finally: + p.terminate() + self.loop.run_until_complete(p.wait()) @unittest.skipUnless(spawn.find_executable('xdotool'), "xdotool not installed") @unittest.expectedFailure def test_012_qubes_desktop_run(self): - self.testvm1.start() + self.loop.run_until_complete(self.testvm1.start()) self.assertEquals(self.testvm1.get_power_state(), "Running") xterm_desktop_path = "/usr/share/applications/xterm.desktop" # Debian has it different... xterm_desktop_path_debian = \ "/usr/share/applications/debian-xterm.desktop" - if self.testvm1.run("test -r {}".format(xterm_desktop_path_debian), - wait=True) == 0: + try: + self.loop.run_until_complete(self.testvm1.run_for_stdio( + 'test -r {}'.format(xterm_desktop_path_debian))) + except subprocess.CalledProcessError: + pass + else: xterm_desktop_path = xterm_desktop_path_debian - self.testvm1.run("qubes-desktop-run {}".format(xterm_desktop_path)) + self.loop.run_until_complete( + self.testvm1.run('qubes-desktop-run {}'.format(xterm_desktop_path))) title = 'user@{}'.format(self.testvm1.name) if self.template.count("whonix"): title = 'user@host' @@ -185,137 +211,101 @@ class TC_00_AppVMMixin(qubes.tests.SystemTestsMixin): def test_050_qrexec_simple_eof(self): """Test for data and EOF transmission dom0->VM""" - result = multiprocessing.Value('i', 0) - def run(self, result): - p = self.testvm1.run("cat", passio_popen=True, - passio_stderr=True) + # XXX is this still correct? this is no longer simple qrexec, + # but qubes.VMShell - (stdout, stderr) = p.communicate(TEST_DATA) - if stdout != TEST_DATA: - result.value = 1 - if len(stderr) > 0: - result.value = 2 + self.loop.run_until_complete(self.testvm1.start()) + try: + (stdout, stderr) = self.loop.run_until_complete(asyncio.wait_for( + self.testvm1.run_for_stdio('cat', input=TEST_DATA), + timeout=10)) + except asyncio.TimeoutError: + self.fail( + "Timeout, probably EOF wasn't transferred to the VM process") - self.testvm1.start() - - t = multiprocessing.Process(target=run, args=(self, result)) - t.start() - t.join(timeout=10) - if t.is_alive(): - t.terminate() - self.fail("Timeout, probably EOF wasn't transferred to the VM " - "process") - if result.value == 1: - self.fail("Received data differs from what was sent") - elif result.value == 2: - self.fail("Some data was printed to stderr") + self.assertEquals(stdout, TEST_DATA, + 'Received data differs from what was sent') + self.assertFalse(stderr, + 'Some data was printed to stderr') def test_051_qrexec_simple_eof_reverse(self): """Test for EOF transmission VM->dom0""" - result = multiprocessing.Value('i', 0) - def run(self, result): - p = self.testvm1.run("echo test; exec >&-; cat > /dev/null", - passio_popen=True, passio_stderr=True) + @asyncio.coroutine + def run(self): + p = yield from self.testvm1.run( + 'echo test; exec >&-; cat > /dev/null') + # this will hang on test failure - stdout = p.stdout.read() + stdout = yield from p.stdout.read() + p.stdin.write(TEST_DATA) + yield from p.stdin.drain() p.stdin.close() - if stdout.strip() != b"test": - result.value = 1 + self.assertEquals(stdout.strip(), 'test', + 'Received data differs from what was expected') # this may hang in some buggy cases - elif len(p.stderr.read()) > 0: - result.value = 2 - elif p.poll() is None: - time.sleep(1) - if p.poll() is None: - result.value = 3 + self.assertFalse((yield from p.stderr.read()), + 'Some data was printed to stderr') - self.testvm1.start() + try: + yield from asyncio.wait_for(p.wait(), timeout=1) + except asyncio.TimeoutError: + self.fail("Timeout, " + "probably EOF wasn't transferred from the VM process") - t = multiprocessing.Process(target=run, args=(self, result)) - t.start() - t.join(timeout=10) - if t.is_alive(): - t.terminate() - self.fail("Timeout, probably EOF wasn't transferred from the VM " - "process") - if result.value == 1: - self.fail("Received data differs from what was expected") - elif result.value == 2: - self.fail("Some data was printed to stderr") - elif result.value == 3: - self.fail("VM proceess didn't terminated on EOF") + self.loop.run_until_complete(self.testvm1.start()) + self.loop.run_until_complete(run(self)) def test_052_qrexec_vm_service_eof(self): """Test for EOF transmission VM(src)->VM(dst)""" - result = multiprocessing.Value('i', 0) - def run(self, result): - p = self.testvm1.run("/usr/lib/qubes/qrexec-client-vm %s test.EOF " - "/bin/sh -c 'echo test; exec >&-; cat " - ">&$SAVED_FD_1'" % self.testvm2.name, - passio_popen=True) - (stdout, stderr) = p.communicate() - if stdout != b"test\n": - result.value = 1 + self.loop.run_until_complete(asyncio.wait([ + self.testvm1.start(), + self.testvm2.start()])) + self.loop.run_until_complete(self.testvm2.run_for_stdio( + 'cat > /etc/qubes-rpc/test.EOF', + user='root', + input=b'/bin/cat')) - self.testvm1.start() - self.testvm2.start() - p = self.testvm2.run("cat > /etc/qubes-rpc/test.EOF", user="root", - passio_popen=True) - p.stdin.write(b"/bin/cat") - p.stdin.close() - p.wait() - policy = open("/etc/qubes-rpc/policy/test.EOF", "w") - policy.write("%s %s allow" % (self.testvm1.name, self.testvm2.name)) - policy.close() - self.addCleanup(os.unlink, "/etc/qubes-rpc/policy/test.EOF") + with self.qrexec_policy('test.EOF', self.testvm1, self.testvm2): + try: + stdout, _ = self.loop.run_until_complete(asyncio.wait_for( + self.testvm1.run_for_stdio('''\ + /usr/lib/qubes/qrexec-client-vm {} test.EOF \ + /bin/sh -c 'echo test; exec >&-; cat >&$SAVED_FD_1' + '''.format(self.testvm2.name)), + timeout=10)) + except asyncio.TimeoutError: + self.fail("Timeout, probably EOF wasn't transferred") - t = multiprocessing.Process(target=run, args=(self, result)) - t.start() - t.join(timeout=10) - if t.is_alive(): - t.terminate() - self.fail("Timeout, probably EOF wasn't transferred") - if result.value == 1: - self.fail("Received data differs from what was expected") + self.assertEquals(stdout, b'test', + 'Received data differs from what was expected') @unittest.expectedFailure def test_053_qrexec_vm_service_eof_reverse(self): """Test for EOF transmission VM(src)<-VM(dst)""" - result = multiprocessing.Value('i', 0) - def run(self, result): - p = self.testvm1.run("/usr/lib/qubes/qrexec-client-vm %s test.EOF " - "/bin/sh -c 'cat >&$SAVED_FD_1'" - % self.testvm2.name, - passio_popen=True) - (stdout, stderr) = p.communicate() - if stdout != b"test\n": - result.value = 1 + self.loop.run_until_complete(asyncio.wait([ + self.testvm1.start(), + self.testvm2.start()])) + self.create_remote_file(self.testvm2, '/etc/qubes-rpc/test.EOF', + 'echo test; exec >&-; cat >/dev/null') - self.testvm1.start() - self.testvm2.start() - p = self.testvm2.run("cat > /etc/qubes-rpc/test.EOF", user="root", - passio_popen=True) - p.stdin.write(b"echo test; exec >&-; cat >/dev/null") - p.stdin.close() - p.wait() - policy = open("/etc/qubes-rpc/policy/test.EOF", "w") - policy.write("%s %s allow" % (self.testvm1.name, self.testvm2.name)) - policy.close() - self.addCleanup(os.unlink, "/etc/qubes-rpc/policy/test.EOF") + with self.qrexec_policy('test.EOF', self.testvm1, self.testvm2): + try: + stdout, _ = self.loop.run_until_complete(asyncio.wait_for( + self.testvm1.run_for_stdio('''\ + /usr/lib/qubes/qrexec-client-vm {} test.EOF \ + /bin/sh -c 'cat >&$SAVED_FD_1' + '''.format(self.testvm2.name)), + timeout=10)) + except asyncio.TimeoutError: + self.fail("Timeout, probably EOF wasn't transferred") - t = multiprocessing.Process(target=run, args=(self, result)) - t.start() - t.join(timeout=10) - if t.is_alive(): - t.terminate() - self.fail("Timeout, probably EOF wasn't transferred") - if result.value == 1: - self.fail("Received data differs from what was expected") + self.assertEquals(stdout, b'test', + 'Received data differs from what was expected') def test_055_qrexec_dom0_service_abort(self): """ @@ -327,78 +317,50 @@ class TC_00_AppVMMixin(qubes.tests.SystemTestsMixin): its stdout - otherwise such service might hang on write(2) call. """ - def run (src): - p = src.run("/usr/lib/qubes/qrexec-client-vm dom0 " - "test.Abort /bin/cat /dev/zero", - passio_popen=True) - - p.communicate() - p.wait() - - self.testvm1.start() - service = open("/etc/qubes-rpc/test.Abort", "w") - service.write("sleep 1") - service.close() - self.addCleanup(os.unlink, "/etc/qubes-rpc/test.Abort") - policy = open("/etc/qubes-rpc/policy/test.Abort", "w") - policy.write("%s dom0 allow" % (self.testvm1.name)) - policy.close() - self.addCleanup(os.unlink, "/etc/qubes-rpc/policy/test.Abort") - - t = multiprocessing.Process(target=run, args=(self.testvm1,)) - t.start() - t.join(timeout=10) - if t.is_alive(): - t.terminate() - self.fail("Timeout, probably stdout wasn't closed") + self.loop.run_until_complete(self.testvm1.start()) + self.create_local_file('/etc/qubes-rpc/test.Abort', + 'sleep 1') + with self.qrexec_policy('test.Abort', self.testvm1, 'dom0'): + try: + stdout, _ = self.loop.run_until_complete(asyncio.wait_for( + self.testvm1.run_for_stdio('''\ + /usr/lib/qubes/qrexec-client-vm dom0 test.Abort \ + /bin/cat /dev/zero'''), + timeout=10)) + except asyncio.TimeoutError: + self.fail("Timeout, probably stdout wasn't closed") def test_060_qrexec_exit_code_dom0(self): - self.testvm1.start() - - p = self.testvm1.run("exit 0", passio_popen=True) - p.wait() - self.assertEqual(0, p.returncode) - - p = self.testvm1.run("exit 3", passio_popen=True) - p.wait() - self.assertEqual(3, p.returncode) + self.loop.run_until_complete(self.testvm1.start()) + self.loop.run_until_complete(self.testvm1.run_for_stdio('exit 0')) + with self.assertRaises(subprocess.CalledProcessError): + self.loop.run_until_complete(self.testvm1.run_for_stdio('exit 3')) @unittest.expectedFailure def test_065_qrexec_exit_code_vm(self): - self.testvm1.start() - self.testvm2.start() + self.loop.run_until_complete(asyncio.wait([ + self.testvm1.start(), + self.testvm2.start()])) - policy = open("/etc/qubes-rpc/policy/test.Retcode", "w") - policy.write("%s %s allow" % (self.testvm1.name, self.testvm2.name)) - policy.close() - self.addCleanup(os.unlink, "/etc/qubes-rpc/policy/test.Retcode") + with self.qrexec_policy('test.Retcode', self.testvm1, self.testvm2): + self.create_remote_file(self.testvm2, '/etc/qubes-rpc/test.Retcode', + 'exit 0') + (stdout, stderr) = self.loop.run_until_complete( + self.testvm1.run_for_stdio('''\ + /usr/lib/qubes/qrexec-client-vm {} test.Retcode \ + /bin/sh -c 'cat >/dev/null'; + echo $?'''.format(self.testvm1.name))) + self.assertEqual(stdout, b'0\n') - p = self.testvm2.run("cat > /etc/qubes-rpc/test.Retcode", user="root", - passio_popen=True) - p.stdin.write(b"exit 0") - p.stdin.close() - p.wait() - - p = self.testvm1.run("/usr/lib/qubes/qrexec-client-vm %s test.Retcode " - "/bin/sh -c 'cat >/dev/null'; echo $?" - % self.testvm1.name, - passio_popen=True) - (stdout, stderr) = p.communicate() - self.assertEqual(stdout, b"0\n") - - p = self.testvm2.run("cat > /etc/qubes-rpc/test.Retcode", user="root", - passio_popen=True) - p.stdin.write(b"exit 3") - p.stdin.close() - p.wait() - - p = self.testvm1.run("/usr/lib/qubes/qrexec-client-vm %s test.Retcode " - "/bin/sh -c 'cat >/dev/null'; echo $?" - % self.testvm1.name, - passio_popen=True) - (stdout, stderr) = p.communicate() - self.assertEqual(stdout, b"3\n") + self.create_remote_file(self.testvm2, '/etc/qubes-rpc/test.Retcode', + 'exit 3') + (stdout, stderr) = self.loop.run_until_complete( + self.testvm1.run_for_stdio('''\ + /usr/lib/qubes/qrexec-client-vm {} test.Retcode \ + /bin/sh -c 'cat >/dev/null'; + echo $?'''.format(self.testvm1.name))) + self.assertEqual(stdout, b'3\n') def test_070_qrexec_vm_simultaneous_write(self): """Test for simultaneous write in VM(src)->VM(dst) connection @@ -411,53 +373,47 @@ class TC_00_AppVMMixin(qubes.tests.SystemTestsMixin): There was a bug where remote side was waiting on write(2) and not handling anything else. """ - result = multiprocessing.Value('i', -1) def run(self): - p = self.testvm1.run( - "/usr/lib/qubes/qrexec-client-vm %s test.write " - "/bin/sh -c '" - # first write a lot of data to fill all the buffers - "dd if=/dev/zero bs=993 count=10000 iflag=fullblock & " - # then after some time start reading - "sleep 1; " - "dd of=/dev/null bs=993 count=10000 iflag=fullblock; " - "wait" - "'" % self.testvm2.name, passio_popen=True) + # first write a lot of data to fill all the buffers + # then after some time start reading p.communicate() result.value = p.returncode - self.testvm1.start() - self.testvm2.start() - p = self.testvm2.run("cat > /etc/qubes-rpc/test.write", user="root", - passio_popen=True) - # first write a lot of data - p.stdin.write(b"dd if=/dev/zero bs=993 count=10000 iflag=fullblock\n") - # and only then read something - p.stdin.write(b"dd of=/dev/null bs=993 count=10000 iflag=fullblock\n") - p.stdin.close() - p.wait() - policy = open("/etc/qubes-rpc/policy/test.write", "w") - policy.write("%s %s allow" % (self.testvm1.name, self.testvm2.name)) - policy.close() - self.addCleanup(os.unlink, "/etc/qubes-rpc/policy/test.write") + self.loop.run_until_complete(asyncio.wait([ + self.testvm1.start(), + self.testvm2.start()])) - t = multiprocessing.Process(target=run, args=(self,)) - t.start() - t.join(timeout=10) - if t.is_alive(): - t.terminate() - self.fail("Timeout, probably deadlock") - self.assertEqual(result.value, 0, "Service call failed") + self.create_remote_file(self.testvm2, '/etc/qubes-rpc/test.write', '''\ + # first write a lot of data + dd if=/dev/zero bs=993 count=10000 iflag=fullblock + # and only then read something + dd of=/dev/null bs=993 count=10000 iflag=fullblock + ''') + with self.qrexec_policy('test.write', self.testvm1, self.testvm2): + try: + self.loop.run_until_complete(asyncio.wait_for( + self.testvm1.run_for_stdio('''\ + /usr/lib/qubes/qrexec-client-vm {} test.write \ + /bin/sh -c ' + dd if=/dev/zero bs=993 count=10000 iflag=fullblock & + sleep 1; + dd of=/dev/null bs=993 count=10000 iflag=fullblock; + wait' + '''.format(self.testvm2.name)), timeout=10)) + except subprocess.CalledProcessError: + self.fail('Service call failed') + except asyncio.TimeoutError: + self.fail('Timeout, probably deadlock') + + @unittest.skip('localcmd= argument went away') def test_071_qrexec_dom0_simultaneous_write(self): """Test for simultaneous write in dom0(src)->VM(dst) connection Similar to test_070_qrexec_vm_simultaneous_write, but with dom0 as a source. """ - result = multiprocessing.Value('i', -1) - def run(self): result.value = self.testvm2.run_service( "test.write", localcmd="/bin/sh -c '" @@ -469,19 +425,14 @@ class TC_00_AppVMMixin(qubes.tests.SystemTestsMixin): "wait" "'") - self.testvm2.start() - p = self.testvm2.run("cat > /etc/qubes-rpc/test.write", user="root", - passio_popen=True) - # first write a lot of data - p.stdin.write(b"dd if=/dev/zero bs=993 count=10000 iflag=fullblock\n") - # and only then read something - p.stdin.write(b"dd of=/dev/null bs=993 count=10000 iflag=fullblock\n") - p.stdin.close() - p.wait() - policy = open("/etc/qubes-rpc/policy/test.write", "w") - policy.write("%s %s allow" % (self.testvm1.name, self.testvm2.name)) - policy.close() - self.addCleanup(os.unlink, "/etc/qubes-rpc/policy/test.write") + self.create_remote_file(self.testvm2, '/etc/qubes-rpc/test.write', '''\ + # first write a lot of data + dd if=/dev/zero bs=993 count=10000 iflag=fullblock + # and only then read something + dd of=/dev/null bs=993 count=10000 iflag=fullblock + ''') + self.create_local_file('/etc/qubes-rpc/policy/test.write', + '{} {} allow'.format(self.testvm1.name, self.testvm2.name)) t = multiprocessing.Process(target=run, args=(self,)) t.start() @@ -491,6 +442,7 @@ class TC_00_AppVMMixin(qubes.tests.SystemTestsMixin): self.fail("Timeout, probably deadlock") self.assertEqual(result.value, 0, "Service call failed") + @unittest.skip('localcmd= argument went away') def test_072_qrexec_to_dom0_simultaneous_write(self): """Test for simultaneous write in dom0(src)<-VM(dst) connection @@ -508,7 +460,7 @@ class TC_00_AppVMMixin(qubes.tests.SystemTestsMixin): "dd of=/dev/null bs=993 count=10000 iflag=fullblock; " "'") - self.testvm2.start() + self.loop.run_until_complete(self.testvm2.start()) p = self.testvm2.run("cat > /etc/qubes-rpc/test.write", user="root", passio_popen=True) # first write a lot of data @@ -534,187 +486,194 @@ class TC_00_AppVMMixin(qubes.tests.SystemTestsMixin): def test_080_qrexec_service_argument_allow_default(self): """Qrexec service call with argument""" - self.testvm1.start() - self.testvm2.start() - p = self.testvm2.run("cat > /etc/qubes-rpc/test.Argument", user="root", - passio_popen=True) - p.communicate(b"/bin/echo $1") - with open("/etc/qubes-rpc/policy/test.Argument", "w") as policy: - policy.write("%s %s allow" % (self.testvm1.name, self.testvm2.name)) - self.addCleanup(os.unlink, "/etc/qubes-rpc/policy/test.Argument") + self.loop.run_until_complete(asyncio.wait([ + self.testvm1.start(), + self.testvm2.start()])) - p = self.testvm1.run("/usr/lib/qubes/qrexec-client-vm {} " - "test.Argument+argument".format(self.testvm2.name), - passio_popen=True) - (stdout, stderr) = p.communicate() - self.assertEqual(stdout, b"argument\n") + self.create_remote_file(self.testvm2, '/etc/qubes-rpc/test.Argument', + '/usr/bin/printf %s "$1"') + with self.qrexec_policy('test.Argument', self.testvm1, self.testvm2): + stdout, stderr = self.loop.run_until_complete( + self.testvm1.run_for_stdio('/usr/lib/qubes/qrexec-client-vm ' + '{} test.Argument+argument'.format(self.testvm2.name))) + self.assertEqual(stdout, b'argument') def test_081_qrexec_service_argument_allow_specific(self): """Qrexec service call with argument - allow only specific value""" - self.testvm1.start() - self.testvm2.start() - p = self.testvm2.run("cat > /etc/qubes-rpc/test.Argument", user="root", - passio_popen=True) - p.communicate(b"/bin/echo $1") - with open("/etc/qubes-rpc/policy/test.Argument", "w") as policy: - policy.write("$anyvm $anyvm deny") - self.addCleanup(os.unlink, "/etc/qubes-rpc/policy/test.Argument") + self.loop.run_until_complete(asyncio.wait([ + self.testvm1.start(), + self.testvm2.start()])) - with open("/etc/qubes-rpc/policy/test.Argument+argument", "w") as \ - policy: - policy.write("%s %s allow" % (self.testvm1.name, self.testvm2.name)) - self.addCleanup(os.unlink, - "/etc/qubes-rpc/policy/test.Argument+argument") + self.create_remote_file(self.testvm2, '/etc/qubes-rpc/test.Argument', + '/usr/bin/printf %s "$1"') - p = self.testvm1.run("/usr/lib/qubes/qrexec-client-vm {} " - "test.Argument+argument".format(self.testvm2.name), - passio_popen=True) - (stdout, stderr) = p.communicate() - self.assertEqual(stdout, b"argument\n") + with self.qrexec_policy('test.Argument', '$anyvm', '$anyvm', False): + with self.qrexec_policy('test.Argument+argument', + self.testvm1.name, self.testvm2.name): + stdout, stderr = self.loop.run_until_complete(self.testvm1.run( + '/usr/lib/qubes/qrexec-client-vm ' + '{} test.Argument+argument'.format(self.testvm2.name))) + self.assertEqual(stdout, b'argument') def test_082_qrexec_service_argument_deny_specific(self): """Qrexec service call with argument - deny specific value""" - self.testvm1.start() - self.testvm2.start() - p = self.testvm2.run("cat > /etc/qubes-rpc/test.Argument", user="root", - passio_popen=True) - p.communicate(b"/bin/echo $1") + self.loop.run_until_complete(asyncio.wait([ + self.testvm1.start(), + self.testvm2.start()])) - with open("/etc/qubes-rpc/policy/test.Argument", "w") as policy: - policy.write("$anyvm $anyvm allow") - self.addCleanup(os.unlink, "/etc/qubes-rpc/policy/test.Argument") - - with open("/etc/qubes-rpc/policy/test.Argument+argument", "w") as \ - policy: - policy.write("%s %s deny" % (self.testvm1.name, self.testvm2.name)) - self.addCleanup(os.unlink, - "/etc/qubes-rpc/policy/test.Argument+argument") - - p = self.testvm1.run("/usr/lib/qubes/qrexec-client-vm {} " - "test.Argument+argument".format(self.testvm2.name), - passio_popen=True) - (stdout, stderr) = p.communicate() - self.assertEqual(stdout, b"") - self.assertEqual(p.returncode, 1, "Service request should be denied") + self.create_remote_file(self.testvm2, '/etc/qubes-rpc/test.Argument', + '/usr/bin/printf %s "$1"') + with self.qrexec_policy('test.Argument', '$anyvm', '$anyvm'): + with self.qrexec_policy('test.Argument+argument', + self.testvm1, self.testvm2, allow=False): + with self.assertRaises(subprocess.CalledProcessError, + 'Service request should be denied'): + self.loop.run_until_complete( + self.testvm1.run('/usr/lib/qubes/qrexec-client-vm {} ' + 'test.Argument+argument'.format(self.testvm2.name))) def test_083_qrexec_service_argument_specific_implementation(self): """Qrexec service call with argument - argument specific implementatation""" - self.testvm1.start() - self.testvm2.start() - p = self.testvm2.run("cat > /etc/qubes-rpc/test.Argument", user="root", - passio_popen=True) - p.communicate(b"/bin/echo $1") + self.loop.run_until_complete(asyncio.wait([ + self.testvm1.start(), + self.testvm2.start()])) - p = self.testvm2.run("cat > /etc/qubes-rpc/test.Argument+argument", - user="root", passio_popen=True) - p.communicate(b"/bin/echo specific: $1") + self.create_remote_file(self.testvm2, + '/etc/qubes-rpc/test.Argument', + '/usr/bin/printf %s "$1"') + self.create_remote_file(self.testvm2, + '/etc/qubes-rpc/test.Argument+argument', + '/usr/bin/printf "specific: %s" "$1"') - with open("/etc/qubes-rpc/policy/test.Argument", "w") as policy: - policy.write("%s %s allow" % (self.testvm1.name, self.testvm2.name)) - self.addCleanup(os.unlink, "/etc/qubes-rpc/policy/test.Argument") + with self.qrexec_policy('test.Argument', self.testvm1, self.testvm2): + stdout, stderr = self.loop.run_until_complete( + self.testvm1.run_for_stdio('/usr/lib/qubes/qrexec-client-vm ' + '{} test.Argument+argument'.format(self.testvm2.name))) - p = self.testvm1.run("/usr/lib/qubes/qrexec-client-vm {} " - "test.Argument+argument".format(self.testvm2.name), - passio_popen=True) - (stdout, stderr) = p.communicate() - self.assertEqual(stdout, b"specific: argument\n") + self.assertEqual(stdout, b'specific: argument') def test_084_qrexec_service_argument_extra_env(self): """Qrexec service call with argument - extra env variables""" - self.testvm1.start() - self.testvm2.start() - p = self.testvm2.run("cat > /etc/qubes-rpc/test.Argument", user="root", - passio_popen=True) - p.communicate(b"/bin/echo $QREXEC_SERVICE_FULL_NAME " - b"$QREXEC_SERVICE_ARGUMENT") + self.loop.run_until_complete(asyncio.wait([ + self.testvm1.start(), + self.testvm2.start()])) - with open("/etc/qubes-rpc/policy/test.Argument", "w") as policy: - policy.write("%s %s allow" % (self.testvm1.name, self.testvm2.name)) - self.addCleanup(os.unlink, "/etc/qubes-rpc/policy/test.Argument") + self.create_remote_file(self.testvm2, '/etc/qubes-rpc/test.Argument', + '/usr/bin/printf "%s %s" ' + '"$QREXEC_SERVICE_FULL_NAME" "$QREXEC_SERVICE_ARGUMENT"') - p = self.testvm1.run("/usr/lib/qubes/qrexec-client-vm {} " - "test.Argument+argument".format(self.testvm2.name), - passio_popen=True) - (stdout, stderr) = p.communicate() - self.assertEqual(stdout, b"test.Argument+argument argument\n") + with self.qrexec_policy('test.Argument', self.testvm1, self.testvm2): + stdout, stderr = self.loop.run_until_complete( + self.testvm1.run_for_stdio('/usr/lib/qubes/qrexec-client-vm ' + '{} test.Argument+argument'.format(self.testvm2.name))) + + self.assertEqual(stdout, b'test.Argument+argument argument') def test_100_qrexec_filecopy(self): - self.testvm1.start() - self.testvm2.start() - self.qrexec_policy('qubes.Filecopy', self.testvm1.name, - self.testvm2.name) - p = self.testvm1.run("qvm-copy-to-vm %s /etc/passwd" % - self.testvm2.name, passio_popen=True, - passio_stderr=True) - p.wait() - self.assertEqual(p.returncode, 0, "qvm-copy-to-vm failed: %s" % - p.stderr.read()) - retcode = self.testvm2.run("diff /etc/passwd " - "/home/user/QubesIncoming/{}/passwd".format( - self.testvm1.name), - wait=True) - self.assertEqual(retcode, 0, "file differs") + self.loop.run_until_complete(asyncio.wait([ + self.testvm1.start(), + self.testvm2.start()])) + + with self.qrexec_policy('qubes.Filecopy', self.testvm1, self.testvm2): + try: + stdout, stderr = self.loop.run_until_complete( + self.testvm1.run_for_stdio( + 'qvm-copy-to-vm {} /etc/passwd'.format( + self.testvm2.name))) + except subprocess.CalledProcessError: + self.fail('qvm-copy-to-vm failed: {}'.format(stderr)) + + try: + self.loop.run_until_complete(self.testvm2.run_for_stdio( + 'diff /etc/passwd /home/user/QubesIncoming/{}/passwd'.format( + self.testvm1.name))) + except subprocess.CalledProcessError: + self.fail('file differs') + + try: + self.loop.run_until_complete(self.testvm1.run_for_stdio( + 'test -f /etc/passwd')) + except subprocess.CalledProcessError: + self.fail('source file got removed') def test_105_qrexec_filemove(self): - self.testvm1.start() - self.testvm2.start() - self.qrexec_policy('qubes.Filecopy', self.testvm1.name, - self.testvm2.name) - retcode = self.testvm1.run("cp /etc/passwd passwd", wait=True) - assert retcode == 0, "Failed to prepare source file" - p = self.testvm1.run("qvm-move-to-vm %s passwd" % - self.testvm2.name, passio_popen=True, - passio_stderr=True) - p.wait() - self.assertEqual(p.returncode, 0, "qvm-move-to-vm failed: %s" % - p.stderr.read()) - retcode = self.testvm2.run("diff /etc/passwd " - "/home/user/QubesIncoming/{}/passwd".format( - self.testvm1.name), - wait=True) - self.assertEqual(retcode, 0, "file differs") - retcode = self.testvm1.run("test -f passwd", wait=True) - self.assertEqual(retcode, 1, "source file not removed") + self.loop.run_until_complete(asyncio.wait([ + self.testvm1.start(), + self.testvm2.start()])) + + self.loop.run_until_complete(self.testvm1.run_for_stdio( + 'cp /etc/passwd passwd')) + with self.qrexec_policy('qubes.Filecopy', self.testvm1, self.testvm2): + try: + stdout, stderr = self.loop.run_until_complete( + self.testvm1.run_for_stdio( + 'qvm-move-to-vm {} passwd'.format(self.testvm2.name))) + except subprocess.CalledProcessError: + self.fail('qvm-move-to-vm failed: {}'.format(stderr)) + + try: + self.loop.run_until_complete(self.testvm2.run_for_stdio( + 'diff /etc/passwd /home/user/QubesIncoming/{}/passwd'.format( + self.testvm1.name))) + except subprocess.CalledProcessError: + self.fail('file differs') + + with self.assertRaises(subprocess.CalledProcessError): + self.loop.run_until_complete(self.testvm1.run_for_stdio( + 'test -f passwd')) def test_101_qrexec_filecopy_with_autostart(self): - self.testvm1.start() - self.qrexec_policy('qubes.Filecopy', self.testvm1.name, - self.testvm2.name) - p = self.testvm1.run("qvm-copy-to-vm %s /etc/passwd" % - self.testvm2.name, passio_popen=True, - passio_stderr=True) - p.wait() - self.assertEqual(p.returncode, 0, "qvm-copy-to-vm failed: %s" % - p.stderr.read()) + self.loop.run_until_complete(self.testvm1.start()) + + with self.qrexec_policy('qubes.Filecopy', self.testvm1, self.testvm2): + try: + stdout, stderr = self.loop.run_until_complete( + self.testvm1.run_for_stdio( + 'qvm-copy-to-vm {} /etc/passwd'.format( + self.testvm2.name))) + except subprocess.CalledProcessError: + self.fail('qvm-copy-to-vm failed: {}'.format(stderr)) + # workaround for libvirt bug (domain ID isn't updated when is started # from other application) - details in # QubesOS/qubes-core-libvirt@63ede4dfb4485c4161dd6a2cc809e8fb45ca664f + # XXX is it still true with qubesd? --woju 20170523 self.testvm2._libvirt_domain = None self.assertTrue(self.testvm2.is_running()) - retcode = self.testvm2.run("diff /etc/passwd " - "/home/user/QubesIncoming/{}/passwd".format( - self.testvm1.name), - wait=True) - self.assertEqual(retcode, 0, "file differs") + + try: + self.loop.run_until_complete(self.testvm2.run_for_stdio( + 'diff /etc/passwd /home/user/QubesIncoming/{}/passwd'.format( + self.testvm1.name))) + except subprocess.CalledProcessError: + self.fail('file differs') + + try: + self.loop.run_until_complete(self.testvm1.run_for_stdio( + 'test -f /etc/passwd')) + except subprocess.CalledProcessError: + self.fail('source file got removed') def test_110_qrexec_filecopy_deny(self): - self.testvm1.start() - self.testvm2.start() - self.qrexec_policy('qubes.Filecopy', self.testvm1.name, - self.testvm2.name, allow=False) - p = self.testvm1.run("qvm-copy-to-vm %s /etc/passwd" % - self.testvm2.name, passio_popen=True) - p.wait() - self.assertNotEqual(p.returncode, 0, "qvm-copy-to-vm unexpectedly " - "succeeded") - retcode = self.testvm1.run("ls /home/user/QubesIncoming/%s" % - self.testvm1.name, wait=True, - ignore_stderr=True) - self.assertNotEqual(retcode, 0, "QubesIncoming exists although file " - "copy was denied") + self.loop.run_until_complete(asyncio.wait([ + self.testvm1.start(), + self.testvm2.start()])) + + with self.qrexec_policy('qubes.Filecopy', self.testvm1, self.testvm2, + allow=False): + with self.assertRaises(subprocess.CalledProcessError): + stdout, stderr = self.loop.run_until_complete( + self.testvm1.run_for_stdio( + 'qvm-copy-to-vm {} /etc/passwd'.format( + self.testvm2.name))) + + with self.assertRaises(subprocess.CalledProcessError): + self.loop.run_until_complete(self.testvm1.run_for_stdio( + 'test -d /home/user/QubesIncoming/{}'.format( + self.testvm1.name))) @unittest.skip("Xen gntalloc driver crashes when page is mapped in the " "same domain") @@ -737,93 +696,86 @@ class TC_00_AppVMMixin(qubes.tests.SystemTestsMixin): @unittest.skipUnless(spawn.find_executable('xdotool'), "xdotool not installed") def test_130_qrexec_filemove_disk_full(self): - self.testvm1.start() - self.testvm2.start() - self.qrexec_policy('qubes.Filecopy', self.testvm1.name, - self.testvm2.name) + self.loop.run_until_complete(asyncio.wait([ + self.testvm1.start(), + self.testvm2.start()])) + # Prepare test file - prepare_cmd = ("yes teststring | dd of=testfile bs=1M " - "count=50 iflag=fullblock") - retcode = self.testvm1.run(prepare_cmd, wait=True) - if retcode != 0: - raise RuntimeError("Failed '{}' in {}".format(prepare_cmd, - self.testvm1.name)) + self.loop.run_until_complete(self.testvm1.run( + 'yes teststring | dd of=testfile bs=1M count=50 iflag=fullblock')) + # Prepare target directory with limited size - prepare_cmd = ( - "mkdir -p /home/user/QubesIncoming && " - "chown user /home/user/QubesIncoming && " - "mount -t tmpfs none /home/user/QubesIncoming -o size=48M" - ) - retcode = self.testvm2.run(prepare_cmd, user="root", wait=True) - if retcode != 0: - raise RuntimeError("Failed '{}' in {}".format(prepare_cmd, - self.testvm2.name)) - p = self.testvm1.run("qvm-move-to-vm %s testfile" % - self.testvm2.name, passio_popen=True, - passio_stderr=True) + self.loop.run_until_complete(self.testvm2.run( + 'mkdir -p /home/user/QubesIncoming && ' + 'chown user /home/user/QubesIncoming && ' + 'mount -t tmpfs none /home/user/QubesIncoming -o size=48M', + user='root')) + + with self.qrexec_policy('qubes.Filecopy', self.testvm1, self.testvm2): + with self.assertRaises(subprocess.CalledProcessError): + self.loop.run_until_complete(self.testvm1.run_for_stdio( + 'qvm-move-to-vm {} testfile'.format(self.testvm2.name))) + # Close GUI error message self.enter_keys_in_window('Error', ['Return']) - p.wait() - self.assertNotEqual(p.returncode, 0, "qvm-move-to-vm should fail") - retcode = self.testvm1.run("test -f testfile", wait=True) - self.assertEqual(retcode, 0, "testfile should not be deleted in " - "source VM") + + # the file shouldn't be removed in source vm + self.loop.run_until_complete(self.testvm1.run_for_stdio( + 'test -f testfile')) def test_200_timezone(self): """Test whether timezone setting is properly propagated to the VM""" if "whonix" in self.template: self.skipTest("Timezone propagation disabled on Whonix templates") - self.testvm1.start() - (vm_tz, _) = self.testvm1.run("date +%Z", - passio_popen=True).communicate() - (dom0_tz, _) = subprocess.Popen(["date", "+%Z"], - stdout=subprocess.PIPE).communicate() + self.loop.run_until_complete(self.testvm1.start()) + vm_tz, _ = self.loop.run_until_complete(self.testvm1.run_for_stdio( + 'date +%Z')) + dom0_tz = subprocess.check_output(['date', '+%Z']) self.assertEqual(vm_tz.strip(), dom0_tz.strip()) # Check if reverting back to UTC works - (vm_tz, _) = self.testvm1.run("TZ=UTC date +%Z", - passio_popen=True).communicate() - self.assertEqual(vm_tz.strip(), b"UTC") + vm_tz, _ = self.loop.run_until_complete(self.testvm1.run_for_stdio( + 'TZ=UTC date +%Z')) + self.assertEqual(vm_tz.strip(), b'UTC') def test_210_time_sync(self): """Test time synchronization mechanism""" if self.template.startswith('whonix-'): self.skipTest('qvm-sync-clock disabled for Whonix VMs') - self.testvm1.start() - self.testvm2.start() - (start_time, _) = subprocess.Popen(["date", "-u", "+%s"], - stdout=subprocess.PIPE).communicate() + self.loop.run_until_complete(asyncio.wait([ + self.testvm1.start(), + self.testvm2.start()])) + start_time = subprocess.check_output(['date', '-u', '+%s']) + try: self.app.clockvm = self.testvm1 self.app.save() # break vm and dom0 time, to check if qvm-sync-clock would fix it - subprocess.check_call(["sudo", "date", "-s", - "2001-01-01T12:34:56"], - stdout=open(os.devnull, 'w')) - retcode = self.testvm1.run("date -s 2001-01-01T12:34:56", - user="root", wait=True) - self.assertEquals(retcode, 0, "Failed to break the VM(1) time") - retcode = self.testvm2.run("date -s 2001-01-01T12:34:56", - user="root", wait=True) - self.assertEquals(retcode, 0, "Failed to break the VM(2) time") - retcode = subprocess.call(["qvm-sync-clock"]) - self.assertEquals(retcode, 0, - "qvm-sync-clock failed with code {}". - format(retcode)) + subprocess.check_call(['sudo', 'date', '-s', '2001-01-01T12:34:56'], + stdout=subprocess.DEVNULL) + self.loop.run_until_complete(asyncio.wait([ + self.testvm1.run_for_stdio('date -s 2001-01-01T12:34:56', + user='root'), + self.testvm2.run_for_stdio('date -s 2001-01-01T12:34:56', + user='root'), + ])) + + subprocess.check_call(['qvm-sync-clock'], stdout=subprocess.DEVNULL) # qvm-sync-clock is asynchronous - it spawns qubes.SetDateTime # service, send it timestamp value and exists without waiting for # actual time set + time.sleep(1) - (vm_time, _) = self.testvm1.run("date -u +%s", - passio_popen=True).communicate() + vm_time, _ = self.loop.run_until_complete( + self.testvm1.run_for_stdio('date -u +%s')) self.assertAlmostEquals(int(vm_time), int(start_time), delta=30) - (vm_time, _) = self.testvm2.run("date -u +%s", - passio_popen=True).communicate() + + vm_time, _ = self.loop.run_until_complete( + self.testvm2.run_for_stdio('date -u +%s')) self.assertAlmostEquals(int(vm_time), int(start_time), delta=30) - (dom0_time, _) = subprocess.Popen(["date", "-u", "+%s"], - stdout=subprocess.PIPE - ).communicate() + + dom0_time, _ = subprocess.check_output(['date', '-u', '+%s']) self.assertAlmostEquals(int(dom0_time), int(start_time), delta=30) except: @@ -840,21 +792,20 @@ class TC_00_AppVMMixin(qubes.tests.SystemTestsMixin): """ # First offline test self.testvm1.storage.resize('private', 4*1024**3) - self.testvm1.start() + self.loop.run_until_complete(self.testvm1.start()) df_cmd = '( df --output=size /rw || df /rw | awk \'{print $2}\' )|' \ 'tail -n 1' - p = self.testvm1.run(df_cmd, - passio_popen=True) # new_size in 1k-blocks - (new_size, _) = p.communicate() + new_size, _ = self.loop.run_until_complete( + self.testvm1.run_for_stdio(df_cmd)) # some safety margin for FS metadata self.assertGreater(int(new_size.strip()), 3.8*1024**2) # Then online test - self.testvm1.storage.resize('private', 6*1024**3) - p = self.testvm1.run(df_cmd, - passio_popen=True) + self.loop.run_until_complete( + self.testvm1.storage.resize('private', 6*1024**3)) # new_size in 1k-blocks - (new_size, _) = p.communicate() + new_size, _ = self.loop.run_until_complete( + self.testvm1.run_for_stdio(df_cmd)) # some safety margin for FS metadata self.assertGreater(int(new_size.strip()), 5.8*1024**2) @@ -866,140 +817,152 @@ class TC_00_AppVMMixin(qubes.tests.SystemTestsMixin): still use old pointers and will display old pages :return: """ + + # this test does too much asynchronous operations, + # so let's rewrite it as a coroutine and call it as such + return self.loop.run_until_complete( + self._test_300_bug_1028_gui_memory_pinning()) + + @asyncio.coroutine + def _test_300_bug_1028_gui_memory_pinning(self): self.testvm1.memory = 800 self.testvm1.maxmem = 800 + # exclude from memory balancing self.testvm1.features['services/meminfo-writer'] = False - self.testvm1.start() - # and allow large map count - self.testvm1.run("echo 256000 > /proc/sys/vm/max_map_count", - user="root", wait=True) - allocator_c = ( - "#include \n" - "#include \n" - "#include \n" - "\n" - "int main(int argc, char **argv) {\n" - " int total_pages;\n" - " char *addr, *iter;\n" - "\n" - " total_pages = atoi(argv[1]);\n" - " addr = mmap(NULL, total_pages * 0x1000, PROT_READ | " - "PROT_WRITE, MAP_ANONYMOUS | MAP_PRIVATE | MAP_POPULATE, -1, 0);\n" - " if (addr == MAP_FAILED) {\n" - " perror(\"mmap\");\n" - " exit(1);\n" - " }\n" - " printf(\"Stage1\\n\");\n" - " fflush(stdout);\n" - " getchar();\n" - " for (iter = addr; iter < addr + total_pages*0x1000; iter += " - "0x2000) {\n" - " if (mlock(iter, 0x1000) == -1) {\n" - " perror(\"mlock\");\n" - " fprintf(stderr, \"%d of %d\\n\", (iter-addr)/0x1000, " - "total_pages);\n" - " exit(1);\n" - " }\n" - " }\n" - " printf(\"Stage2\\n\");\n" - " fflush(stdout);\n" - " for (iter = addr+0x1000; iter < addr + total_pages*0x1000; " - "iter += 0x2000) {\n" - " if (munmap(iter, 0x1000) == -1) {\n" - " perror(\"munmap\");\n" - " exit(1);\n" - " }\n" - " }\n" - " printf(\"Stage3\\n\");\n" - " fflush(stdout);\n" - " fclose(stdout);\n" - " getchar();\n" - "\n" - " return 0;\n" - "}\n") + yield from self.testvm1.start() - p = self.testvm1.run("cat > allocator.c", passio_popen=True) - p.communicate(allocator_c.encode()) - p = self.testvm1.run("gcc allocator.c -o allocator", - passio_popen=True, passio_stderr=True) - (stdout, stderr) = p.communicate() - if p.returncode != 0: - self.skipTest("allocator compile failed: {}".format(stderr)) + # and allow large map count + yield from self.testvm1.run('echo 256000 > /proc/sys/vm/max_map_count', + user="root") + + allocator_c = ''' +#include +#include +#include + +int main(int argc, char **argv) { + int total_pages; + char *addr, *iter; + + total_pages = atoi(argv[1]); + addr = mmap(NULL, total_pages * 0x1000, PROT_READ | PROT_WRITE, + MAP_ANONYMOUS | MAP_PRIVATE | MAP_POPULATE, -1, 0); + if (addr == MAP_FAILED) { + perror("mmap"); + exit(1); + } + + printf("Stage1\\n"); + fflush(stdout); + getchar(); + for (iter = addr; iter < addr + total_pages*0x1000; iter += 0x2000) { + if (mlock(iter, 0x1000) == -1) { + perror("mlock"); + fprintf(stderr, "%d of %d\\n", (iter-addr)/0x1000, total_pages); + exit(1); + } + } + + printf("Stage2\\n"); + fflush(stdout); + for (iter = addr+0x1000; iter < addr + total_pages*0x1000; iter += 0x2000) { + if (munmap(iter, 0x1000) == -1) { + perror(\"munmap\"); + exit(1); + } + } + + printf("Stage3\\n"); + fflush(stdout); + fclose(stdout); + getchar(); + + return 0; +} +''' + + yield from self.testvm1.run_for_stdio('cat > allocator.c', + input=allocator_c.encode()) + + try: + stdout, stderr = yield from self.testvm1.run_for_stdio( + 'gcc allocator.c -o allocator') + except subprocess.CalledProcessError: + self.skipTest('allocator compile failed: {}'.format(stderr)) # drop caches to have even more memory pressure - self.testvm1.run("echo 3 > /proc/sys/vm/drop_caches", - user="root", wait=True) + yield from self.testvm1.run( + 'echo 3 > /proc/sys/vm/drop_caches', user='root') # now fragment all free memory - p = self.testvm1.run("grep ^MemFree: /proc/meminfo|awk '{print $2}'", - passio_popen=True) - memory_pages = int(p.communicate()[0].strip()) - memory_pages //= 4 # 4k pages - alloc1 = self.testvm1.run( - "ulimit -l unlimited; exec /home/user/allocator {}".format( + stdout, _ = yield from self.testvm1.run_for_stdio( + "grep ^MemFree: /proc/meminfo|awk '{print $2}'") + memory_pages = int(stdout) // 4 # 4k pages + + alloc1 = yield from self.testvm1.run( + 'ulimit -l unlimited; exec /home/user/allocator {}'.format( memory_pages), - user="root", passio_popen=True, passio_stderr=True) + user="root") + # wait for memory being allocated; can't use just .read(), because EOF # passing is unreliable while the process is still running - alloc1.stdin.write(b"\n") - alloc1.stdin.flush() - alloc_out = alloc1.stdout.read(len("Stage1\nStage2\nStage3\n")) + yield from alloc1.stdin.write(b'\n') + yield from alloc1.stdin.flush() + alloc_out = yield from alloc1.stdout.read( + len('Stage1\nStage2\nStage3\n')) - if b"Stage3" not in alloc_out: + if b'Stage3' not in alloc_out: # read stderr only in case of failed assert, but still have nice # failure message (don't use self.fail() directly) - self.assertIn(b"Stage3", alloc_out, alloc1.stderr.read()) + # + # XXX why don't read stderr always? --woju 20170523 + self.assertIn(b'Stage3', alloc_out, + (yield from alloc1.stderr.read())) # now, launch some window - it should get fragmented composition buffer # it is important to have some changing content there, to generate # content update events (aka damage notify) - proc = self.testvm1.run("gnome-terminal --full-screen -e top", - passio_popen=True) + proc = yield from self.testvm1.run( + 'gnome-terminal --full-screen -e top') # help xdotool a little... - time.sleep(2) + yield from asyncio.sleep(2) # get window ID - search = subprocess.Popen(['xdotool', 'search', '--sync', - '--onlyvisible', '--class', self.testvm1.name + ':.*erminal'], - stdout=subprocess.PIPE) - winid = search.communicate()[0].strip() - xprop = subprocess.Popen(['xprop', '-notype', '-id', winid, - '_QUBES_VMWINDOWID'], stdout=subprocess.PIPE) - vm_winid = xprop.stdout.read().decode().strip().split(' ')[4] + winid = yield from asyncio.get_event_loop().run_in_executor( + subprocess.check_output, + ['xdotool', 'search', '--sync', '--onlyvisible', '--class', + self.testvm1.name + ':.*erminal']).decode() + xprop = yield from asyncio.get_event_loop().run_in_executor( + subprocess.check_output, + ['xprop', '-notype', '-id', winid, '_QUBES_VMWINDOWID']) + vm_winid = xprop.decode().strip().split(' ')[4] # now free the fragmented memory and trigger compaction - alloc1.stdin.write(b"\n") - alloc1.stdin.flush() - alloc1.wait() - self.testvm1.run("echo 1 > /proc/sys/vm/compact_memory", user="root") + yield from alloc1.stdin.write(b'\n') + yield from alloc1.stdin.flush() + yield from alloc1.wait() + yield from self.testvm1.run_for_stdio( + 'echo 1 > /proc/sys/vm/compact_memory', user='root') # now window may be already "broken"; to be sure, allocate (=zero) # some memory - alloc2 = self.testvm1.run( - "ulimit -l unlimited; /home/user/allocator {}".format(memory_pages), - user="root", passio_popen=True, passio_stderr=True) - alloc2.stdout.read(len("Stage1\n")) + alloc2 = yield from self.testvm1.run( + 'ulimit -l unlimited; /home/user/allocator {}'.format(memory_pages), + user='root') + yield from alloc2.stdout.read(len('Stage1\n')) # wait for damage notify - top updates every 3 sec by default - time.sleep(6) + yield from asyncio.sleep(6) # now take screenshot of the window, from dom0 and VM # choose pnm format, as it doesn't have any useless metadata - easy # to compare - p = self.testvm1.run("import -window {} pnm:-".format(vm_winid), - passio_popen=True, passio_stderr=True) - (vm_image, stderr) = p.communicate() - if p.returncode != 0: - raise Exception("Failed to get VM window image: {}".format( - stderr)) + vm_image, _ = yield from self.testvm1.run_for_stdio( + 'import -window {} pnm:-'.format(vm_winid)) - p = subprocess.Popen(["import", "-window", winid, "pnm:-"], - stdout=subprocess.PIPE, stderr=subprocess.PIPE) - (dom0_image, stderr) = p.communicate() - if p.returncode != 0: - raise Exception("Failed to get dom0 window image: {}".format( - stderr)) + dom0_image = yield from asyncio.get_event_loop().run_in_executor( + subprocess.check_output, ['import', '-window', winid, 'pnm:-']) if vm_image != dom0_image: self.fail("Dom0 window doesn't match VM window content") @@ -1013,7 +976,7 @@ class TC_10_Generic(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase): name=self.make_vm_name('vm'), label='red', template=self.app.default_template) - self.vm.create_on_disk() + self.loop.run_until_complete(self.vm.create_on_disk()) self.save_and_reload_db() self.vm = self.app.domains[self.vm.qid] @@ -1027,21 +990,19 @@ class TC_10_Generic(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase): flagfile = '/tmp/test-anyvmdeny-flag' if os.path.exists(flagfile): os.remove(flagfile) - with open('/etc/qubes-rpc/test.AnyvmDeny', 'w') as f: - f.write('touch {}\n'.format(flagfile)) - f.write('echo service output\n') - self.addCleanup(os.unlink, "/etc/qubes-rpc/test.AnyvmDeny") - self.vm.start() - p = self.vm.run("/usr/lib/qubes/qrexec-client-vm dom0 test.AnyvmDeny", - passio_popen=True, passio_stderr=True) - (stdout, stderr) = p.communicate() - self.assertEqual(p.returncode, 1, - '$anyvm matched dom0, qrexec-client-vm output: {}'. - format(stdout + stderr)) + self.create_local_file('/etc/qubes-rpc/test.AnyvmDeny', + 'touch {}\necho service output\n'.format(flagfile)) + + self.loop.run_until_complete(self.vm.start()) + with self.qrexec_policy('test.AnyvmDeny', self.vm, '$anyvm'): + with self.assertRaises(subprocess.CalledProcessError): + stdout, stderr = self.loop.run_until_complete( + self.vm.run_for_stdio( + '/usr/lib/qubes/qrexec-client-vm dom0 test.AnyvmDeny')) self.assertFalse(os.path.exists(flagfile), 'Flag file created (service was run) even though should be denied,' - ' qrexec-client-vm output: {}'.format(stdout + stderr)) + ' qrexec-client-vm output: {} {}'.format(stdout, stderr)) def load_tests(loader, tests, pattern): diff --git a/qubes/tests/vm/adminvm.py b/qubes/tests/vm/adminvm.py index 87310913..e2d41198 100644 --- a/qubes/tests/vm/adminvm.py +++ b/qubes/tests/vm/adminvm.py @@ -31,6 +31,7 @@ import qubes.tests @qubes.tests.skipUnlessDom0 class TC_00_AdminVM(qubes.tests.QubesTestCase): def setUp(self): + super().setUp() try: self.app = qubes.tests.vm.TestApp() self.vm = qubes.vm.adminvm.AdminVM(self.app, diff --git a/qubes/tests/vm/init.py b/qubes/tests/vm/init.py index c3fb2f99..1df527d1 100644 --- a/qubes/tests/vm/init.py +++ b/qubes/tests/vm/init.py @@ -51,6 +51,7 @@ class TestVM(qubes.vm.BaseVM): class TC_10_BaseVM(qubes.tests.QubesTestCase): def setUp(self): + super().setUp() self.xml = lxml.etree.XML(''' diff --git a/qubes/tests/vm/qubesvm.py b/qubes/tests/vm/qubesvm.py index a8d9f626..9e99240e 100644 --- a/qubes/tests/vm/qubesvm.py +++ b/qubes/tests/vm/qubesvm.py @@ -60,6 +60,7 @@ class TestVM(object): class TC_00_setters(qubes.tests.QubesTestCase): def setUp(self): + super().setUp() self.vm = TestVM() self.prop = TestProp()