diff --git a/tests/Makefile b/tests/Makefile index 8f01e2ad..243fe681 100644 --- a/tests/Makefile +++ b/tests/Makefile @@ -19,12 +19,22 @@ endif cp basic.py[co] $(DESTDIR)$(PYTHON_TESTSPATH) cp block.py $(DESTDIR)$(PYTHON_TESTSPATH) cp block.py[co] $(DESTDIR)$(PYTHON_TESTSPATH) + cp dispvm.py $(DESTDIR)$(PYTHON_TESTSPATH) + cp dispvm.py[co] $(DESTDIR)$(PYTHON_TESTSPATH) cp dom0_update.py $(DESTDIR)$(PYTHON_TESTSPATH) cp dom0_update.py[co] $(DESTDIR)$(PYTHON_TESTSPATH) + cp extra.py $(DESTDIR)$(PYTHON_TESTSPATH) + cp extra.py[co] $(DESTDIR)$(PYTHON_TESTSPATH) + cp hardware.py $(DESTDIR)$(PYTHON_TESTSPATH) + cp hardware.py[co] $(DESTDIR)$(PYTHON_TESTSPATH) + cp hvm.py $(DESTDIR)$(PYTHON_TESTSPATH) + cp hvm.py[co] $(DESTDIR)$(PYTHON_TESTSPATH) + cp mime.py $(DESTDIR)$(PYTHON_TESTSPATH) + cp mime.py[co] $(DESTDIR)$(PYTHON_TESTSPATH) cp network.py $(DESTDIR)$(PYTHON_TESTSPATH) cp network.py[co] $(DESTDIR)$(PYTHON_TESTSPATH) - cp vm_qrexec_gui.py $(DESTDIR)$(PYTHON_TESTSPATH) - cp vm_qrexec_gui.py[co] $(DESTDIR)$(PYTHON_TESTSPATH) + cp pvgrub.py $(DESTDIR)$(PYTHON_TESTSPATH) + cp pvgrub.py[co] $(DESTDIR)$(PYTHON_TESTSPATH) cp regressions.py $(DESTDIR)$(PYTHON_TESTSPATH) cp regressions.py[co] $(DESTDIR)$(PYTHON_TESTSPATH) cp run.py $(DESTDIR)$(PYTHON_TESTSPATH) @@ -33,7 +43,5 @@ endif cp storage.py[co] $(DESTDIR)$(PYTHON_TESTSPATH) cp storage_xen.py $(DESTDIR)$(PYTHON_TESTSPATH) cp storage_xen.py[co] $(DESTDIR)$(PYTHON_TESTSPATH) - cp hardware.py $(DESTDIR)$(PYTHON_TESTSPATH) - cp hardware.py[co] $(DESTDIR)$(PYTHON_TESTSPATH) - cp extra.py $(DESTDIR)$(PYTHON_TESTSPATH) - cp extra.py[co] $(DESTDIR)$(PYTHON_TESTSPATH) + cp vm_qrexec_gui.py $(DESTDIR)$(PYTHON_TESTSPATH) + cp vm_qrexec_gui.py[co] $(DESTDIR)$(PYTHON_TESTSPATH) diff --git a/tests/__init__.py b/tests/__init__.py index 0e6891d0..cc100bd1 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -755,7 +755,11 @@ def load_tests(loader, tests, pattern): 'qubes.tests.basic', 'qubes.tests.dom0_update', 'qubes.tests.network', + 'qubes.tests.dispvm', 'qubes.tests.vm_qrexec_gui', + 'qubes.tests.mime', + 'qubes.tests.hvm', + 'qubes.tests.pvgrub', 'qubes.tests.backup', 'qubes.tests.backupcompatibility', 'qubes.tests.regressions', diff --git a/tests/basic.py b/tests/basic.py index 2bbe2696..c07bd272 100644 --- a/tests/basic.py +++ b/tests/basic.py @@ -22,6 +22,7 @@ # with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. # +from distutils import spawn import multiprocessing import os @@ -699,205 +700,121 @@ class TC_03_QvmRevertTemplateChanges(qubes.tests.SystemTestsMixin, self.setup_hvm_template() self._do_test() -class TC_04_DispVM(qubes.tests.SystemTestsMixin, - qubes.tests.QubesTestCase): - - @staticmethod - def get_dispvm_template_name(): - vmdir = os.readlink('/var/lib/qubes/dvmdata/vmdir') - return os.path.basename(vmdir) - - def test_000_firewall_propagation(self): - """ - Check firewall propagation VM->DispVM, when VM have some firewall rules - """ - - # FIXME: currently qubes.xml doesn't contain this information... - dispvm_template_name = self.get_dispvm_template_name() - dispvm_template = self.qc.get_vm_by_name(dispvm_template_name) +class TC_30_Gui_daemon(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase): + @unittest.skipUnless(spawn.find_executable('xdotool'), + "xdotool not installed") + def test_000_clipboard(self): testvm1 = self.qc.add_new_vm("QubesAppVm", name=self.make_vm_name('vm1'), template=self.qc.get_default_template()) testvm1.create_on_disk(verbose=False) - firewall = testvm1.get_firewall_conf() - firewall['allowDns'] = False - firewall['allowYumProxy'] = False - firewall['rules'] = [{'address': '1.2.3.4', - 'netmask': 24, - 'proto': 'tcp', - 'portBegin': 22, - 'portEnd': 22, - }] - testvm1.write_firewall_conf(firewall) + testvm2 = self.qc.add_new_vm("QubesAppVm", + name=self.make_vm_name('vm2'), + template=self.qc.get_default_template()) + testvm2.create_on_disk(verbose=False) self.qc.save() self.qc.unlock_db() testvm1.start() + testvm2.start() - p = testvm1.run("qvm-run --dispvm 'qubesdb-read /name; echo ERROR;" - " read x'", - passio_popen=True) + window_title = 'user@{}'.format(testvm1.name) + testvm1.run('zenity --text-info --editable --title={}'.format( + window_title)) - dispvm_name = p.stdout.readline().strip() - self.qc.lock_db_for_reading() - self.qc.load() - self.qc.unlock_db() - dispvm = self.qc.get_vm_by_name(dispvm_name) - self.assertIsNotNone(dispvm, "DispVM {} not found in qubes.xml".format( - dispvm_name)) - # check if firewall was propagated to the DispVM - self.assertEquals(testvm1.get_firewall_conf(), - dispvm.get_firewall_conf()) - # and only there (#1608) - self.assertNotEquals(dispvm_template.get_firewall_conf(), - dispvm.get_firewall_conf()) - # then modify some rule - firewall = dispvm.get_firewall_conf() - firewall['rules'] = [{'address': '4.3.2.1', - 'netmask': 24, - 'proto': 'tcp', - 'portBegin': 22, - 'portEnd': 22, - }] - dispvm.write_firewall_conf(firewall) - # and check again if wasn't saved anywhere else (#1608) - self.assertNotEquals(dispvm_template.get_firewall_conf(), - dispvm.get_firewall_conf()) - self.assertNotEquals(testvm1.get_firewall_conf(), - dispvm.get_firewall_conf()) - p.stdin.write('\n') + self.wait_for_window(window_title) + time.sleep(0.5) + test_string = "test{}".format(testvm1.xid) + + # Type and copy some text + subprocess.check_call(['xdotool', 'search', '--name', window_title, + 'windowactivate', '--sync', + 'type', '{}'.format(test_string)]) + # second xdotool call because type --terminator do not work (SEGV) + # additionally do not use search here, so window stack will be empty + # and xdotool will use XTEST instead of generating events manually - + # this will be much better - at least because events will have + # correct timestamp (so gui-daemon would not drop the copy request) + subprocess.check_call(['xdotool', + 'key', 'ctrl+a', 'ctrl+c', 'ctrl+shift+c', + 'Escape']) + + clipboard_content = \ + open('/var/run/qubes/qubes-clipboard.bin', 'r').read().strip() + self.assertEquals(clipboard_content, test_string, + "Clipboard copy operation failed - content") + clipboard_source = \ + open('/var/run/qubes/qubes-clipboard.bin.source', + 'r').read().strip() + self.assertEquals(clipboard_source, testvm1.name, + "Clipboard copy operation failed - owner") + + # Then paste it to the other window + window_title = 'user@{}'.format(testvm2.name) + p = testvm2.run('zenity --entry --title={} > test.txt'.format( + window_title), passio_popen=True) + self.wait_for_window(window_title) + + subprocess.check_call(['xdotool', 'key', '--delay', '100', + 'ctrl+shift+v', 'ctrl+v', 'Return']) p.wait() - def test_001_firewall_propagation(self): - """ - Check firewall propagation VM->DispVM, when VM have no firewall rules - """ + # And compare the result + (test_output, _) = testvm2.run('cat test.txt', + passio_popen=True).communicate() + self.assertEquals(test_string, test_output.strip()) + + clipboard_content = \ + open('/var/run/qubes/qubes-clipboard.bin', 'r').read().strip() + self.assertEquals(clipboard_content, "", + "Clipboard not wiped after paste - content") + clipboard_source = \ + open('/var/run/qubes/qubes-clipboard.bin.source', 'r').read( + + ).strip() + self.assertEquals(clipboard_source, "", + "Clipboard not wiped after paste - owner") + +class TC_05_StandaloneVM(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase): + def test_000_create_start(self): testvm1 = self.qc.add_new_vm("QubesAppVm", - name=self.make_vm_name('vm1'), - template=self.qc.get_default_template()) - testvm1.create_on_disk(verbose=False) + template=None, + name=self.make_vm_name('vm1')) + testvm1.create_on_disk(verbose=False, + source_template=self.qc.get_default_template()) self.qc.save() self.qc.unlock_db() + testvm1.start() + self.assertEquals(testvm1.get_power_state(), "Running") - # FIXME: currently qubes.xml doesn't contain this information... - dispvm_template_name = self.get_dispvm_template_name() - dispvm_template = self.qc.get_vm_by_name(dispvm_template_name) - original_firewall = None - if os.path.exists(dispvm_template.firewall_conf): - original_firewall = tempfile.TemporaryFile() - with open(dispvm_template.firewall_conf) as f: - original_firewall.write(f.read()) - try: - - firewall = dispvm_template.get_firewall_conf() - firewall['allowDns'] = False - firewall['allowYumProxy'] = False - firewall['rules'] = [{'address': '1.2.3.4', - 'netmask': 24, - 'proto': 'tcp', - 'portBegin': 22, - 'portEnd': 22, - }] - dispvm_template.write_firewall_conf(firewall) - - testvm1.start() - - p = testvm1.run("qvm-run --dispvm 'qubesdb-read /name; echo ERROR;" - " read x'", - passio_popen=True) - - dispvm_name = p.stdout.readline().strip() - self.qc.lock_db_for_reading() - self.qc.load() - self.qc.unlock_db() - dispvm = self.qc.get_vm_by_name(dispvm_name) - self.assertIsNotNone(dispvm, "DispVM {} not found in qubes.xml".format( - dispvm_name)) - # check if firewall was propagated to the DispVM from the right VM - self.assertEquals(testvm1.get_firewall_conf(), - dispvm.get_firewall_conf()) - # and only there (#1608) - self.assertNotEquals(dispvm_template.get_firewall_conf(), - dispvm.get_firewall_conf()) - # then modify some rule - firewall = dispvm.get_firewall_conf() - firewall['rules'] = [{'address': '4.3.2.1', - 'netmask': 24, - 'proto': 'tcp', - 'portBegin': 22, - 'portEnd': 22, - }] - dispvm.write_firewall_conf(firewall) - # and check again if wasn't saved anywhere else (#1608) - self.assertNotEquals(dispvm_template.get_firewall_conf(), - dispvm.get_firewall_conf()) - self.assertNotEquals(testvm1.get_firewall_conf(), - dispvm.get_firewall_conf()) - p.stdin.write('\n') - p.wait() - finally: - if original_firewall: - original_firewall.seek(0) - with open(dispvm_template.firewall_conf, 'w') as f: - f.write(original_firewall.read()) - original_firewall.close() - else: - os.unlink(dispvm_template.firewall_conf) - - def test_002_cleanup(self): + def test_100_resize_root_img(self): + testvm1 = self.qc.add_new_vm("QubesAppVm", + template=None, + name=self.make_vm_name('vm1')) + testvm1.create_on_disk(verbose=False, + source_template=self.qc.get_default_template()) + self.qc.save() self.qc.unlock_db() - p = subprocess.Popen(['/usr/lib/qubes/qfile-daemon-dvm', - 'qubes.VMShell', 'dom0', 'DEFAULT'], - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=open(os.devnull, 'w')) - (stdout, _) = p.communicate(input="echo test; qubesdb-read /name; " - "echo ERROR\n") - self.assertEquals(p.returncode, 0) - lines = stdout.splitlines() - self.assertEqual(lines[0], "test") - dispvm_name = lines[1] - self.qc.lock_db_for_reading() - self.qc.load() - self.qc.unlock_db() - dispvm = self.qc.get_vm_by_name(dispvm_name) - self.assertIsNone(dispvm, "DispVM {} still exists in qubes.xml".format( - dispvm_name)) - - def test_003_cleanup_destroyed(self): - """ - Check if DispVM is properly removed even if it terminated itself (#1660) - :return: - """ - self.qc.unlock_db() - p = subprocess.Popen(['/usr/lib/qubes/qfile-daemon-dvm', - 'qubes.VMShell', 'dom0', 'DEFAULT'], - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=open(os.devnull, 'w')) - p.stdin.write("qubesdb-read /name\n") - p.stdin.write("echo ERROR\n") - p.stdin.write("sudo poweroff\n") - # do not close p.stdin on purpose - wait to automatic disconnect when - # domain is destroyed - timeout = 30 - while timeout > 0: - if p.poll(): - break + with self.assertRaises(QubesException): + testvm1.resize_root_img(20*1024**3) + testvm1.resize_root_img(20*1024**3, allow_start=True) + timeout = 60 + while testvm1.is_running(): time.sleep(1) timeout -= 1 - # includes check for None - timeout - self.assertEquals(p.returncode, 0) - lines = p.stdout.read().splitlines() - dispvm_name = lines[0] - self.assertNotEquals(dispvm_name, "ERROR") - self.qc.lock_db_for_reading() - self.qc.load() - self.qc.unlock_db() - dispvm = self.qc.get_vm_by_name(dispvm_name) - self.assertIsNone(dispvm, "DispVM {} still exists in qubes.xml".format( - dispvm_name)) + if timeout == 0: + self.fail("Timeout while waiting for VM shutdown") + self.assertEquals(testvm1.get_root_img_sz(), 20*1024**3) + testvm1.start() + p = testvm1.run('df --output=size /|tail -n 1', + passio_popen=True) + # new_size in 1k-blocks + (new_size, _) = p.communicate() + # some safety margin for FS metadata + self.assertGreater(int(new_size.strip()), 19*1024**2) + + diff --git a/tests/dispvm.py b/tests/dispvm.py new file mode 100644 index 00000000..2c5c056c --- /dev/null +++ b/tests/dispvm.py @@ -0,0 +1,431 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- +# +# The Qubes OS Project, http://www.qubes-os.org +# +# Copyright (C) 2016 Marek Marczykowski-Górecki +# +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. +# +# + +from distutils import spawn +import qubes.tests +import subprocess +import tempfile +import unittest +import os +import time + +class TC_04_DispVM(qubes.tests.SystemTestsMixin, + qubes.tests.QubesTestCase): + + @staticmethod + def get_dispvm_template_name(): + vmdir = os.readlink('/var/lib/qubes/dvmdata/vmdir') + return os.path.basename(vmdir) + + def test_000_firewall_propagation(self): + """ + Check firewall propagation VM->DispVM, when VM have some firewall rules + """ + + # FIXME: currently qubes.xml doesn't contain this information... + dispvm_template_name = self.get_dispvm_template_name() + dispvm_template = self.qc.get_vm_by_name(dispvm_template_name) + + testvm1 = self.qc.add_new_vm("QubesAppVm", + name=self.make_vm_name('vm1'), + template=self.qc.get_default_template()) + testvm1.create_on_disk(verbose=False) + firewall = testvm1.get_firewall_conf() + firewall['allowDns'] = False + firewall['allowYumProxy'] = False + firewall['rules'] = [{'address': '1.2.3.4', + 'netmask': 24, + 'proto': 'tcp', + 'portBegin': 22, + 'portEnd': 22, + }] + testvm1.write_firewall_conf(firewall) + self.qc.save() + self.qc.unlock_db() + + testvm1.start() + + p = testvm1.run("qvm-run --dispvm 'qubesdb-read /name; echo ERROR;" + " read x'", + passio_popen=True) + + dispvm_name = p.stdout.readline().strip() + self.qc.lock_db_for_reading() + self.qc.load() + self.qc.unlock_db() + dispvm = self.qc.get_vm_by_name(dispvm_name) + self.assertIsNotNone(dispvm, "DispVM {} not found in qubes.xml".format( + dispvm_name)) + # check if firewall was propagated to the DispVM + self.assertEquals(testvm1.get_firewall_conf(), + dispvm.get_firewall_conf()) + # and only there (#1608) + self.assertNotEquals(dispvm_template.get_firewall_conf(), + dispvm.get_firewall_conf()) + # then modify some rule + firewall = dispvm.get_firewall_conf() + firewall['rules'] = [{'address': '4.3.2.1', + 'netmask': 24, + 'proto': 'tcp', + 'portBegin': 22, + 'portEnd': 22, + }] + dispvm.write_firewall_conf(firewall) + # and check again if wasn't saved anywhere else (#1608) + self.assertNotEquals(dispvm_template.get_firewall_conf(), + dispvm.get_firewall_conf()) + self.assertNotEquals(testvm1.get_firewall_conf(), + dispvm.get_firewall_conf()) + p.stdin.write('\n') + p.wait() + + def test_001_firewall_propagation(self): + """ + Check firewall propagation VM->DispVM, when VM have no firewall rules + """ + testvm1 = self.qc.add_new_vm("QubesAppVm", + name=self.make_vm_name('vm1'), + template=self.qc.get_default_template()) + testvm1.create_on_disk(verbose=False) + self.qc.save() + self.qc.unlock_db() + + # FIXME: currently qubes.xml doesn't contain this information... + dispvm_template_name = self.get_dispvm_template_name() + dispvm_template = self.qc.get_vm_by_name(dispvm_template_name) + original_firewall = None + if os.path.exists(dispvm_template.firewall_conf): + original_firewall = tempfile.TemporaryFile() + with open(dispvm_template.firewall_conf) as f: + original_firewall.write(f.read()) + try: + + firewall = dispvm_template.get_firewall_conf() + firewall['allowDns'] = False + firewall['allowYumProxy'] = False + firewall['rules'] = [{'address': '1.2.3.4', + 'netmask': 24, + 'proto': 'tcp', + 'portBegin': 22, + 'portEnd': 22, + }] + dispvm_template.write_firewall_conf(firewall) + + testvm1.start() + + p = testvm1.run("qvm-run --dispvm 'qubesdb-read /name; echo ERROR;" + " read x'", + passio_popen=True) + + dispvm_name = p.stdout.readline().strip() + self.qc.lock_db_for_reading() + self.qc.load() + self.qc.unlock_db() + dispvm = self.qc.get_vm_by_name(dispvm_name) + self.assertIsNotNone(dispvm, "DispVM {} not found in qubes.xml".format( + dispvm_name)) + # check if firewall was propagated to the DispVM from the right VM + self.assertEquals(testvm1.get_firewall_conf(), + dispvm.get_firewall_conf()) + # and only there (#1608) + self.assertNotEquals(dispvm_template.get_firewall_conf(), + dispvm.get_firewall_conf()) + # then modify some rule + firewall = dispvm.get_firewall_conf() + firewall['rules'] = [{'address': '4.3.2.1', + 'netmask': 24, + 'proto': 'tcp', + 'portBegin': 22, + 'portEnd': 22, + }] + dispvm.write_firewall_conf(firewall) + # and check again if wasn't saved anywhere else (#1608) + self.assertNotEquals(dispvm_template.get_firewall_conf(), + dispvm.get_firewall_conf()) + self.assertNotEquals(testvm1.get_firewall_conf(), + dispvm.get_firewall_conf()) + p.stdin.write('\n') + p.wait() + finally: + if original_firewall: + original_firewall.seek(0) + with open(dispvm_template.firewall_conf, 'w') as f: + f.write(original_firewall.read()) + original_firewall.close() + else: + os.unlink(dispvm_template.firewall_conf) + + def test_002_cleanup(self): + self.qc.unlock_db() + p = subprocess.Popen(['/usr/lib/qubes/qfile-daemon-dvm', + 'qubes.VMShell', 'dom0', 'DEFAULT'], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=open(os.devnull, 'w')) + (stdout, _) = p.communicate(input="echo test; qubesdb-read /name; " + "echo ERROR\n") + self.assertEquals(p.returncode, 0) + lines = stdout.splitlines() + self.assertEqual(lines[0], "test") + dispvm_name = lines[1] + self.qc.lock_db_for_reading() + self.qc.load() + self.qc.unlock_db() + dispvm = self.qc.get_vm_by_name(dispvm_name) + self.assertIsNone(dispvm, "DispVM {} still exists in qubes.xml".format( + dispvm_name)) + + def test_003_cleanup_destroyed(self): + """ + Check if DispVM is properly removed even if it terminated itself (#1660) + :return: + """ + self.qc.unlock_db() + p = subprocess.Popen(['/usr/lib/qubes/qfile-daemon-dvm', + 'qubes.VMShell', 'dom0', 'DEFAULT'], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=open(os.devnull, 'w')) + p.stdin.write("qubesdb-read /name\n") + p.stdin.write("echo ERROR\n") + p.stdin.write("sudo poweroff\n") + # do not close p.stdin on purpose - wait to automatic disconnect when + # domain is destroyed + timeout = 30 + while timeout > 0: + if p.poll(): + break + time.sleep(1) + timeout -= 1 + # includes check for None - timeout + self.assertEquals(p.returncode, 0) + lines = p.stdout.read().splitlines() + dispvm_name = lines[0] + self.assertNotEquals(dispvm_name, "ERROR") + self.qc.lock_db_for_reading() + self.qc.load() + self.qc.unlock_db() + dispvm = self.qc.get_vm_by_name(dispvm_name) + self.assertIsNone(dispvm, "DispVM {} still exists in qubes.xml".format( + dispvm_name)) + + +class TC_20_DispVMMixin(qubes.tests.SystemTestsMixin): + def test_000_prepare_dvm(self): + self.qc.unlock_db() + retcode = subprocess.call(['/usr/bin/qvm-create-default-dvm', + self.template], + stderr=open(os.devnull, 'w')) + self.assertEqual(retcode, 0) + self.qc.lock_db_for_writing() + self.qc.load() + self.assertIsNotNone(self.qc.get_vm_by_name( + self.template + "-dvm")) + # TODO: check mtime of snapshot file + + def test_010_simple_dvm_run(self): + self.qc.unlock_db() + p = subprocess.Popen(['/usr/lib/qubes/qfile-daemon-dvm', + 'qubes.VMShell', 'dom0', 'DEFAULT'], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=open(os.devnull, 'w')) + (stdout, _) = p.communicate(input="echo test") + self.assertEqual(stdout, "test\n") + # TODO: check if DispVM is destroyed + + @unittest.skipUnless(spawn.find_executable('xdotool'), + "xdotool not installed") + def test_020_gui_app(self): + self.qc.unlock_db() + p = subprocess.Popen(['/usr/lib/qubes/qfile-daemon-dvm', + 'qubes.VMShell', 'dom0', 'DEFAULT'], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=open(os.devnull, 'w')) + + # wait for DispVM startup: + p.stdin.write("echo test\n") + p.stdin.flush() + l = p.stdout.readline() + self.assertEqual(l, "test\n") + + # potential race condition, but our tests are supposed to be + # running on dedicated machine, so should not be a problem + self.qc.lock_db_for_reading() + self.qc.load() + self.qc.unlock_db() + + max_qid = 0 + for vm in self.qc.values(): + if not vm.is_disposablevm(): + continue + if vm.qid > max_qid: + max_qid = vm.qid + dispvm = self.qc[max_qid] + self.assertNotEqual(dispvm.qid, 0, "DispVM not found in qubes.xml") + self.assertTrue(dispvm.is_running()) + try: + window_title = 'user@%s' % (dispvm.template.name + "-dvm") + p.stdin.write("xterm -e " + "\"sh -c 'echo \\\"\033]0;{}\007\\\";read x;'\"\n". + format(window_title)) + self.wait_for_window(window_title) + + time.sleep(0.5) + self.enter_keys_in_window(window_title, ['Return']) + # Wait for window to close + self.wait_for_window(window_title, show=False) + finally: + p.stdin.close() + + wait_count = 0 + while dispvm.is_running(): + wait_count += 1 + if wait_count > 100: + self.fail("Timeout while waiting for DispVM destruction") + time.sleep(0.1) + wait_count = 0 + while p.poll() is None: + wait_count += 1 + if wait_count > 100: + self.fail("Timeout while waiting for qfile-daemon-dvm " + "termination") + time.sleep(0.1) + self.assertEqual(p.returncode, 0) + + self.qc.lock_db_for_reading() + self.qc.load() + self.qc.unlock_db() + self.assertIsNone(self.qc.get_vm_by_name(dispvm.name), + "DispVM not removed from qubes.xml") + + def _handle_editor(self, winid): + (window_title, _) = subprocess.Popen( + ['xdotool', 'getwindowname', winid], stdout=subprocess.PIPE).\ + communicate() + window_title = window_title.strip().\ + replace('(', '\(').replace(')', '\)') + time.sleep(1) + if "gedit" in window_title: + subprocess.check_call(['xdotool', 'windowactivate', '--sync', winid, + 'type', 'Test test 2\n']) + time.sleep(0.5) + subprocess.check_call(['xdotool', + 'key', 'ctrl+s', 'ctrl+q']) + elif "LibreOffice" in window_title: + # wait for actual editor (we've got splash screen) + search = subprocess.Popen(['xdotool', 'search', '--sync', + '--onlyvisible', '--all', '--name', '--class', 'disp*|Writer'], + stdout=subprocess.PIPE, + stderr=open(os.path.devnull, 'w')) + retcode = search.wait() + if retcode == 0: + winid = search.stdout.read().strip() + time.sleep(0.5) + subprocess.check_call(['xdotool', 'windowactivate', '--sync', winid, + 'type', 'Test test 2\n']) + time.sleep(0.5) + subprocess.check_call(['xdotool', + 'key', '--delay', '100', 'ctrl+s', + 'Return', 'ctrl+q']) + elif "emacs" in window_title: + subprocess.check_call(['xdotool', 'windowactivate', '--sync', winid, + 'type', 'Test test 2\n']) + time.sleep(0.5) + subprocess.check_call(['xdotool', + 'key', 'ctrl+x', 'ctrl+s']) + subprocess.check_call(['xdotool', + 'key', 'ctrl+x', 'ctrl+c']) + elif "vim" in window_title or "user@" in window_title: + subprocess.check_call(['xdotool', 'windowactivate', '--sync', winid, + 'key', 'i', 'type', 'Test test 2\n']) + subprocess.check_call( + ['xdotool', + 'key', 'Escape', 'colon', 'w', 'q', 'Return']) + else: + self.fail("Unknown editor window: {}".format(window_title)) + + @unittest.skipUnless(spawn.find_executable('xdotool'), + "xdotool not installed") + def test_030_edit_file(self): + testvm1 = self.qc.add_new_vm("QubesAppVm", + name=self.make_vm_name('vm1'), + template=self.qc.get_vm_by_name( + self.template)) + testvm1.create_on_disk(verbose=False) + self.qc.save() + + testvm1.start() + testvm1.run("echo test1 > /home/user/test.txt", wait=True) + + self.qc.unlock_db() + p = testvm1.run("qvm-open-in-dvm /home/user/test.txt", + passio_popen=True) + + wait_count = 0 + winid = None + while True: + search = subprocess.Popen(['xdotool', 'search', + '--onlyvisible', '--class', 'disp*'], + stdout=subprocess.PIPE, + stderr=open(os.path.devnull, 'w')) + retcode = search.wait() + if retcode == 0: + winid = search.stdout.read().strip() + break + wait_count += 1 + if wait_count > 100: + self.fail("Timeout while waiting for editor window") + time.sleep(0.3) + + time.sleep(0.5) + self._handle_editor(winid) + p.wait() + p = testvm1.run("cat /home/user/test.txt", + passio_popen=True) + (test_txt_content, _) = p.communicate() + # Drop BOM if added by editor + if test_txt_content.startswith('\xef\xbb\xbf'): + test_txt_content = test_txt_content[3:] + self.assertEqual(test_txt_content, "Test test 2\ntest1\n") + +def load_tests(loader, tests, pattern): + try: + qc = qubes.qubes.QubesVmCollection() + qc.lock_db_for_reading() + qc.load() + qc.unlock_db() + templates = [vm.name for vm in qc.values() if + isinstance(vm, qubes.qubes.QubesTemplateVm)] + except OSError: + templates = [] + for template in templates: + tests.addTests(loader.loadTestsFromTestCase( + type( + 'TC_20_DispVM_' + template, + (TC_20_DispVMMixin, qubes.tests.QubesTestCase), + {'template': template}))) + + return tests \ No newline at end of file diff --git a/tests/hvm.py b/tests/hvm.py new file mode 100644 index 00000000..9cc3e132 --- /dev/null +++ b/tests/hvm.py @@ -0,0 +1,125 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- +# +# The Qubes OS Project, http://www.qubes-os.org +# +# Copyright (C) 2016 Marek Marczykowski-Górecki +# +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. +# +# + +import qubes.tests +from qubes.qubes import QubesException + +class TC_10_HVM(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase): + # TODO: test with some OS inside + # TODO: windows tools tests + + def test_000_create_start(self): + testvm1 = self.qc.add_new_vm("QubesHVm", + name=self.make_vm_name('vm1')) + testvm1.create_on_disk(verbose=False) + self.qc.save() + self.qc.unlock_db() + testvm1.start() + self.assertEquals(testvm1.get_power_state(), "Running") + + def test_010_create_start_template(self): + templatevm = self.qc.add_new_vm("QubesTemplateHVm", + name=self.make_vm_name('template')) + templatevm.create_on_disk(verbose=False) + self.qc.save() + self.qc.unlock_db() + + templatevm.start() + self.assertEquals(templatevm.get_power_state(), "Running") + + def test_020_create_start_template_vm(self): + templatevm = self.qc.add_new_vm("QubesTemplateHVm", + name=self.make_vm_name('template')) + templatevm.create_on_disk(verbose=False) + testvm2 = self.qc.add_new_vm("QubesHVm", + name=self.make_vm_name('vm2'), + template=templatevm) + testvm2.create_on_disk(verbose=False) + self.qc.save() + self.qc.unlock_db() + + testvm2.start() + self.assertEquals(testvm2.get_power_state(), "Running") + + def test_030_prevent_simultaneus_start(self): + templatevm = self.qc.add_new_vm("QubesTemplateHVm", + name=self.make_vm_name('template')) + templatevm.create_on_disk(verbose=False) + testvm2 = self.qc.add_new_vm("QubesHVm", + name=self.make_vm_name('vm2'), + template=templatevm) + testvm2.create_on_disk(verbose=False) + self.qc.save() + self.qc.unlock_db() + + templatevm.start() + self.assertEquals(templatevm.get_power_state(), "Running") + self.assertRaises(QubesException, testvm2.start) + templatevm.force_shutdown() + testvm2.start() + self.assertEquals(testvm2.get_power_state(), "Running") + self.assertRaises(QubesException, templatevm.start) + + def test_100_resize_root_img(self): + testvm1 = self.qc.add_new_vm("QubesHVm", + name=self.make_vm_name('vm1')) + testvm1.create_on_disk(verbose=False) + self.qc.save() + self.qc.unlock_db() + testvm1.resize_root_img(30*1024**3) + self.assertEquals(testvm1.get_root_img_sz(), 30*1024**3) + testvm1.start() + self.assertEquals(testvm1.get_power_state(), "Running") + # TODO: launch some OS there and check the size + + def test_200_start_invalid_drive(self): + """Regression test for #1619""" + testvm1 = self.qc.add_new_vm("QubesHVm", + name=self.make_vm_name('vm1')) + testvm1.create_on_disk(verbose=False) + testvm1.drive = 'hd:dom0:/invalid' + self.qc.save() + self.qc.unlock_db() + try: + testvm1.start() + except Exception as e: + self.assertIsInstance(e, QubesException) + else: + self.fail('No exception raised') + + def test_201_start_invalid_drive_cdrom(self): + """Regression test for #1619""" + testvm1 = self.qc.add_new_vm("QubesHVm", + name=self.make_vm_name('vm1')) + testvm1.create_on_disk(verbose=False) + testvm1.drive = 'cdrom:dom0:/invalid' + self.qc.save() + self.qc.unlock_db() + try: + testvm1.start() + except Exception as e: + self.assertIsInstance(e, QubesException) + else: + self.fail('No exception raised') + diff --git a/tests/mime.py b/tests/mime.py new file mode 100644 index 00000000..660a0338 --- /dev/null +++ b/tests/mime.py @@ -0,0 +1,354 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- +# +# The Qubes OS Project, http://www.qubes-os.org +# +# Copyright (C) 2016 Marek Marczykowski-Górecki +# +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. +# +# +from distutils import spawn +import os +import re +import subprocess +import time +import unittest + +import qubes.tests +import qubes.qubes +from qubes.qubes import QubesVmCollection + +@unittest.skipUnless( + spawn.find_executable('xprop') and + spawn.find_executable('xdotool') and + spawn.find_executable('wmctrl'), + "xprop or xdotool or wmctrl not installed") +class TC_50_MimeHandlers(qubes.tests.SystemTestsMixin): + @classmethod + def setUpClass(cls): + if cls.template == 'whonix-gw' or 'minimal' in cls.template: + raise unittest.SkipTest( + 'Template {} not supported by this test'.format(cls.template)) + + if cls.template == 'whonix-ws': + # TODO remove when Whonix-based DispVMs will work (Whonix 13?) + raise unittest.SkipTest( + 'Template {} not supported by this test'.format(cls.template)) + + qc = QubesVmCollection() + + cls._kill_test_vms(qc, prefix=qubes.tests.CLSVMPREFIX) + + qc.lock_db_for_writing() + qc.load() + + cls._remove_test_vms(qc, qubes.qubes.vmm.libvirt_conn, + prefix=qubes.tests.CLSVMPREFIX) + + cls.source_vmname = cls.make_vm_name('source', True) + source_vm = qc.add_new_vm("QubesAppVm", + template=qc.get_vm_by_name(cls.template), + name=cls.source_vmname) + source_vm.create_on_disk(verbose=False) + + cls.target_vmname = cls.make_vm_name('target', True) + target_vm = qc.add_new_vm("QubesAppVm", + template=qc.get_vm_by_name(cls.template), + name=cls.target_vmname) + target_vm.create_on_disk(verbose=False) + + qc.save() + qc.unlock_db() + source_vm.start() + target_vm.start() + + # make sure that DispVMs will be started of the same template + retcode = subprocess.call(['/usr/bin/qvm-create-default-dvm', + cls.template], + stderr=open(os.devnull, 'w')) + assert retcode == 0, "Error preparing DispVM" + + def setUp(self): + super(TC_50_MimeHandlers, self).setUp() + self.source_vm = self.qc.get_vm_by_name(self.source_vmname) + self.target_vm = self.qc.get_vm_by_name(self.target_vmname) + + def get_window_class(self, winid, dispvm=False): + (vm_winid, _) = subprocess.Popen( + ['xprop', '-id', winid, '_QUBES_VMWINDOWID'], + stdout=subprocess.PIPE + ).communicate() + vm_winid = vm_winid.split("#")[1].strip('\n" ') + if dispvm: + (vmname, _) = subprocess.Popen( + ['xprop', '-id', winid, '_QUBES_VMNAME'], + stdout=subprocess.PIPE + ).communicate() + vmname = vmname.split("=")[1].strip('\n" ') + window_class = None + while window_class is None: + # XXX to use self.qc.get_vm_by_name would require reloading + # qubes.xml, so use qvm-run instead + xprop = subprocess.Popen( + ['qvm-run', '-p', vmname, 'xprop -id {} WM_CLASS'.format( + vm_winid)], stdout=subprocess.PIPE) + (window_class, _) = xprop.communicate() + if xprop.returncode != 0: + self.skipTest("xprop failed, not installed?") + if 'not found' in window_class: + # WM_CLASS not set yet, wait a little + time.sleep(0.1) + window_class = None + else: + window_class = None + while window_class is None: + xprop = self.target_vm.run( + 'xprop -id {} WM_CLASS'.format(vm_winid), + passio_popen=True) + (window_class, _) = xprop.communicate() + if xprop.returncode != 0: + self.skipTest("xprop failed, not installed?") + if 'not found' in window_class: + # WM_CLASS not set yet, wait a little + time.sleep(0.1) + window_class = None + # output: WM_CLASS(STRING) = "gnome-terminal-server", "Gnome-terminal" + try: + window_class = window_class.split("=")[1].split(",")[0].strip('\n" ') + except IndexError: + raise Exception( + "Unexpected output from xprop: '{}'".format(window_class)) + + return window_class + + def open_file_and_check_viewer(self, filename, expected_app_titles, + expected_app_classes, dispvm=False): + self.qc.unlock_db() + if dispvm: + p = self.source_vm.run("qvm-open-in-dvm {}".format(filename), + passio_popen=True) + vmpattern = "disp*" + else: + self.qrexec_policy('qubes.OpenInVM', self.source_vm.name, + self.target_vmname) + self.qrexec_policy('qubes.OpenURL', self.source_vm.name, + self.target_vmname) + p = self.source_vm.run("qvm-open-in-vm {} {}".format( + self.target_vmname, filename), passio_popen=True) + vmpattern = self.target_vmname + wait_count = 0 + winid = None + window_title = None + while True: + search = subprocess.Popen(['xdotool', 'search', + '--onlyvisible', '--class', vmpattern], + stdout=subprocess.PIPE, + stderr=open(os.path.devnull, 'w')) + retcode = search.wait() + if retcode == 0: + winid = search.stdout.read().strip() + # get window title + (window_title, _) = subprocess.Popen( + ['xdotool', 'getwindowname', winid], stdout=subprocess.PIPE). \ + communicate() + window_title = window_title.strip() + # ignore LibreOffice splash screen and window with no title + # set yet + if window_title and not window_title.startswith("LibreOffice")\ + and not window_title == 'VMapp command': + break + wait_count += 1 + if wait_count > 100: + self.fail("Timeout while waiting for editor window") + time.sleep(0.3) + + # get window class + window_class = self.get_window_class(winid, dispvm) + # close the window - we've got the window class, it is no longer needed + subprocess.check_call(['wmctrl', '-i', '-c', winid]) + p.wait() + self.wait_for_window(window_title, show=False) + + def check_matches(obj, patterns): + return any((pat.search(obj) if isinstance(pat, type(re.compile(''))) + else pat in obj) for pat in patterns) + + if not check_matches(window_title, expected_app_titles) and \ + not check_matches(window_class, expected_app_classes): + self.fail("Opening file {} resulted in window '{} ({})', which is " + "none of {!r} ({!r})".format( + filename, window_title, window_class, + expected_app_titles, expected_app_classes)) + + def prepare_txt(self, filename): + p = self.source_vm.run("cat > {}".format(filename), passio_popen=True) + p.stdin.write("This is test\n") + p.stdin.close() + retcode = p.wait() + assert retcode == 0, "Failed to write {} file".format(filename) + + def prepare_pdf(self, filename): + self.prepare_txt("/tmp/source.txt") + cmd = "convert /tmp/source.txt {}".format(filename) + retcode = self.source_vm.run(cmd, wait=True) + assert retcode == 0, "Failed to run '{}'".format(cmd) + + def prepare_doc(self, filename): + self.prepare_txt("/tmp/source.txt") + cmd = "unoconv -f doc -o {} /tmp/source.txt".format(filename) + retcode = self.source_vm.run(cmd, wait=True) + if retcode != 0: + self.skipTest("Failed to run '{}', not installed?".format(cmd)) + + def prepare_pptx(self, filename): + self.prepare_txt("/tmp/source.txt") + cmd = "unoconv -f pptx -o {} /tmp/source.txt".format(filename) + retcode = self.source_vm.run(cmd, wait=True) + if retcode != 0: + self.skipTest("Failed to run '{}', not installed?".format(cmd)) + + def prepare_png(self, filename): + self.prepare_txt("/tmp/source.txt") + cmd = "convert /tmp/source.txt {}".format(filename) + retcode = self.source_vm.run(cmd, wait=True) + if retcode != 0: + self.skipTest("Failed to run '{}', not installed?".format(cmd)) + + def prepare_jpg(self, filename): + self.prepare_txt("/tmp/source.txt") + cmd = "convert /tmp/source.txt {}".format(filename) + retcode = self.source_vm.run(cmd, wait=True) + if retcode != 0: + self.skipTest("Failed to run '{}', not installed?".format(cmd)) + + def test_000_txt(self): + filename = "/home/user/test_file.txt" + self.prepare_txt(filename) + self.open_file_and_check_viewer(filename, ["vim", "user@"], + ["gedit", "emacs", "libreoffice"]) + + def test_001_pdf(self): + filename = "/home/user/test_file.pdf" + self.prepare_pdf(filename) + self.open_file_and_check_viewer(filename, [], + ["evince"]) + + def test_002_doc(self): + filename = "/home/user/test_file.doc" + self.prepare_doc(filename) + self.open_file_and_check_viewer(filename, [], + ["libreoffice", "abiword"]) + + def test_003_pptx(self): + filename = "/home/user/test_file.pptx" + self.prepare_pptx(filename) + self.open_file_and_check_viewer(filename, [], + ["libreoffice"]) + + def test_004_png(self): + filename = "/home/user/test_file.png" + self.prepare_png(filename) + self.open_file_and_check_viewer(filename, [], + ["shotwell", "eog", "display"]) + + def test_005_jpg(self): + filename = "/home/user/test_file.jpg" + self.prepare_jpg(filename) + self.open_file_and_check_viewer(filename, [], + ["shotwell", "eog", "display"]) + + def test_006_jpeg(self): + filename = "/home/user/test_file.jpeg" + self.prepare_jpg(filename) + self.open_file_and_check_viewer(filename, [], + ["shotwell", "eog", "display"]) + + def test_010_url(self): + self.open_file_and_check_viewer("https://www.qubes-os.org/", [], + ["Firefox", "Iceweasel", "Navigator"]) + + def test_100_txt_dispvm(self): + filename = "/home/user/test_file.txt" + self.prepare_txt(filename) + self.open_file_and_check_viewer(filename, ["vim", "user@"], + ["gedit", "emacs", "libreoffice"], + dispvm=True) + + def test_101_pdf_dispvm(self): + filename = "/home/user/test_file.pdf" + self.prepare_pdf(filename) + self.open_file_and_check_viewer(filename, [], + ["evince"], + dispvm=True) + + def test_102_doc_dispvm(self): + filename = "/home/user/test_file.doc" + self.prepare_doc(filename) + self.open_file_and_check_viewer(filename, [], + ["libreoffice", "abiword"], + dispvm=True) + + def test_103_pptx_dispvm(self): + filename = "/home/user/test_file.pptx" + self.prepare_pptx(filename) + self.open_file_and_check_viewer(filename, [], + ["libreoffice"], + dispvm=True) + + def test_104_png_dispvm(self): + filename = "/home/user/test_file.png" + self.prepare_png(filename) + self.open_file_and_check_viewer(filename, [], + ["shotwell", "eog", "display"], + dispvm=True) + + def test_105_jpg_dispvm(self): + filename = "/home/user/test_file.jpg" + self.prepare_jpg(filename) + self.open_file_and_check_viewer(filename, [], + ["shotwell", "eog", "display"], + dispvm=True) + + def test_106_jpeg_dispvm(self): + filename = "/home/user/test_file.jpeg" + self.prepare_jpg(filename) + self.open_file_and_check_viewer(filename, [], + ["shotwell", "eog", "display"], + dispvm=True) + + def test_110_url_dispvm(self): + self.open_file_and_check_viewer("https://www.qubes-os.org/", [], + ["Firefox", "Iceweasel", "Navigator"], + dispvm=True) + +def load_tests(loader, tests, pattern): + try: + qc = qubes.qubes.QubesVmCollection() + qc.lock_db_for_reading() + qc.load() + qc.unlock_db() + templates = [vm.name for vm in qc.values() if + isinstance(vm, qubes.qubes.QubesTemplateVm)] + except OSError: + templates = [] + for template in templates: + tests.addTests(loader.loadTestsFromTestCase( + type( + 'TC_50_MimeHandlers_' + template, + (TC_50_MimeHandlers, qubes.tests.QubesTestCase), + {'template': template}))) + return tests \ No newline at end of file diff --git a/tests/pvgrub.py b/tests/pvgrub.py new file mode 100644 index 00000000..5467d2a3 --- /dev/null +++ b/tests/pvgrub.py @@ -0,0 +1,172 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- +# +# The Qubes OS Project, http://www.qubes-os.org +# +# Copyright (C) 2016 Marek Marczykowski-Górecki +# +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. +# +# + +import os +import unittest +import qubes.tests +@unittest.skipUnless(os.path.exists('/var/lib/qubes/vm-kernels/pvgrub2'), + 'grub-xen package not installed') +class TC_40_PVGrub(qubes.tests.SystemTestsMixin): + def setUp(self): + super(TC_40_PVGrub, self).setUp() + supported = False + if self.template.startswith('fedora-'): + supported = True + elif self.template.startswith('debian-'): + supported = True + if not supported: + self.skipTest("Template {} not supported by this test".format( + self.template)) + + def install_packages(self, vm): + if self.template.startswith('fedora-'): + cmd_install1 = 'dnf clean expire-cache && ' \ + 'dnf install -y qubes-kernel-vm-support grub2-tools' + cmd_install2 = 'dnf install -y kernel && ' \ + 'KVER=$(rpm -q --qf %{VERSION}-%{RELEASE}.%{ARCH} kernel) && ' \ + 'dnf install --allowerasing -y kernel-devel-$KVER && ' \ + 'dkms autoinstall -k $KVER' + cmd_update_grub = 'grub2-mkconfig -o /boot/grub2/grub.cfg' + elif self.template.startswith('debian-'): + cmd_install1 = 'apt-get update && apt-get install -y ' \ + 'qubes-kernel-vm-support grub2-common' + cmd_install2 = 'apt-get install -y linux-image-amd64' + cmd_update_grub = 'mkdir /boot/grub && update-grub2' + else: + assert False, "Unsupported template?!" + + for cmd in [cmd_install1, cmd_install2, cmd_update_grub]: + p = vm.run(cmd, user="root", passio_popen=True, passio_stderr=True) + (stdout, stderr) = p.communicate() + self.assertEquals(p.returncode, 0, + "Failed command: {}\nSTDOUT: {}\nSTDERR: {}" + .format(cmd, stdout, stderr)) + + def get_kernel_version(self, vm): + if self.template.startswith('fedora-'): + cmd_get_kernel_version = 'rpm -q kernel|sort -n|tail -1|' \ + 'cut -d - -f 2-' + elif self.template.startswith('debian-'): + cmd_get_kernel_version = \ + 'dpkg-query --showformat=\'${Package}\\n\' --show ' \ + '\'linux-image-*-amd64\'|sort -n|tail -1|cut -d - -f 3-' + else: + raise RuntimeError("Unsupported template?!") + + p = vm.run(cmd_get_kernel_version, user="root", passio_popen=True) + (kver, _) = p.communicate() + self.assertEquals(p.returncode, 0, + "Failed command: {}".format(cmd_get_kernel_version)) + return kver.strip() + + def test_000_standalone_vm(self): + testvm1 = self.qc.add_new_vm("QubesAppVm", + template=None, + name=self.make_vm_name('vm1')) + testvm1.create_on_disk(verbose=False, + source_template=self.qc.get_vm_by_name( + self.template)) + self.save_and_reload_db() + self.qc.unlock_db() + testvm1 = self.qc[testvm1.qid] + testvm1.start() + self.install_packages(testvm1) + kver = self.get_kernel_version(testvm1) + self.shutdown_and_wait(testvm1) + + self.qc.lock_db_for_writing() + self.qc.load() + testvm1 = self.qc[testvm1.qid] + testvm1.kernel = 'pvgrub2' + self.save_and_reload_db() + self.qc.unlock_db() + testvm1 = self.qc[testvm1.qid] + testvm1.start() + p = testvm1.run('uname -r', passio_popen=True) + (actual_kver, _) = p.communicate() + self.assertEquals(actual_kver.strip(), kver) + + def test_010_template_based_vm(self): + test_template = self.qc.add_new_vm("QubesTemplateVm", + template=None, + name=self.make_vm_name('template')) + test_template.clone_attrs(self.qc.get_vm_by_name(self.template)) + test_template.clone_disk_files( + src_vm=self.qc.get_vm_by_name(self.template), + verbose=False) + + testvm1 = self.qc.add_new_vm("QubesAppVm", + template=test_template, + name=self.make_vm_name('vm1')) + testvm1.create_on_disk(verbose=False, + source_template=test_template) + self.save_and_reload_db() + self.qc.unlock_db() + test_template = self.qc[test_template.qid] + testvm1 = self.qc[testvm1.qid] + test_template.start() + self.install_packages(test_template) + kver = self.get_kernel_version(test_template) + self.shutdown_and_wait(test_template) + + self.qc.lock_db_for_writing() + self.qc.load() + test_template = self.qc[test_template.qid] + test_template.kernel = 'pvgrub2' + testvm1 = self.qc[testvm1.qid] + testvm1.kernel = 'pvgrub2' + self.save_and_reload_db() + self.qc.unlock_db() + + # Check if TemplateBasedVM boots and has the right kernel + testvm1 = self.qc[testvm1.qid] + testvm1.start() + p = testvm1.run('uname -r', passio_popen=True) + (actual_kver, _) = p.communicate() + self.assertEquals(actual_kver.strip(), kver) + + # And the same for the TemplateVM itself + test_template = self.qc[test_template.qid] + test_template.start() + p = test_template.run('uname -r', passio_popen=True) + (actual_kver, _) = p.communicate() + self.assertEquals(actual_kver.strip(), kver) + +def load_tests(loader, tests, pattern): + try: + qc = qubes.qubes.QubesVmCollection() + qc.lock_db_for_reading() + qc.load() + qc.unlock_db() + templates = [vm.name for vm in qc.values() if + isinstance(vm, qubes.qubes.QubesTemplateVm)] + except OSError: + templates = [] + for template in templates: + tests.addTests(loader.loadTestsFromTestCase( + type( + 'TC_40_PVGrub_' + template, + (TC_40_PVGrub, qubes.tests.QubesTestCase), + {'template': template}))) + return tests \ No newline at end of file diff --git a/tests/vm_qrexec_gui.py b/tests/vm_qrexec_gui.py index c82f43e2..22fff2fd 100644 --- a/tests/vm_qrexec_gui.py +++ b/tests/vm_qrexec_gui.py @@ -862,831 +862,6 @@ class TC_00_AppVMMixin(qubes.tests.SystemTestsMixin): # some safety margin for FS metadata self.assertGreater(int(new_size.strip()), 5.8*1024**2) -class TC_05_StandaloneVM(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase): - def test_000_create_start(self): - testvm1 = self.qc.add_new_vm("QubesAppVm", - template=None, - name=self.make_vm_name('vm1')) - testvm1.create_on_disk(verbose=False, - source_template=self.qc.get_default_template()) - self.qc.save() - self.qc.unlock_db() - testvm1.start() - self.assertEquals(testvm1.get_power_state(), "Running") - - def test_100_resize_root_img(self): - testvm1 = self.qc.add_new_vm("QubesAppVm", - template=None, - name=self.make_vm_name('vm1')) - testvm1.create_on_disk(verbose=False, - source_template=self.qc.get_default_template()) - self.qc.save() - self.qc.unlock_db() - with self.assertRaises(QubesException): - testvm1.resize_root_img(20*1024**3) - testvm1.resize_root_img(20*1024**3, allow_start=True) - timeout = 60 - while testvm1.is_running(): - time.sleep(1) - timeout -= 1 - if timeout == 0: - self.fail("Timeout while waiting for VM shutdown") - self.assertEquals(testvm1.get_root_img_sz(), 20*1024**3) - testvm1.start() - p = testvm1.run('df --output=size /|tail -n 1', - passio_popen=True) - # new_size in 1k-blocks - (new_size, _) = p.communicate() - # some safety margin for FS metadata - self.assertGreater(int(new_size.strip()), 19*1024**2) - - -class TC_10_HVM(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase): - # TODO: test with some OS inside - # TODO: windows tools tests - - def test_000_create_start(self): - testvm1 = self.qc.add_new_vm("QubesHVm", - name=self.make_vm_name('vm1')) - testvm1.create_on_disk(verbose=False) - self.qc.save() - self.qc.unlock_db() - testvm1.start() - self.assertEquals(testvm1.get_power_state(), "Running") - - def test_010_create_start_template(self): - templatevm = self.qc.add_new_vm("QubesTemplateHVm", - name=self.make_vm_name('template')) - templatevm.create_on_disk(verbose=False) - self.qc.save() - self.qc.unlock_db() - - templatevm.start() - self.assertEquals(templatevm.get_power_state(), "Running") - - def test_020_create_start_template_vm(self): - templatevm = self.qc.add_new_vm("QubesTemplateHVm", - name=self.make_vm_name('template')) - templatevm.create_on_disk(verbose=False) - testvm2 = self.qc.add_new_vm("QubesHVm", - name=self.make_vm_name('vm2'), - template=templatevm) - testvm2.create_on_disk(verbose=False) - self.qc.save() - self.qc.unlock_db() - - testvm2.start() - self.assertEquals(testvm2.get_power_state(), "Running") - - def test_030_prevent_simultaneus_start(self): - templatevm = self.qc.add_new_vm("QubesTemplateHVm", - name=self.make_vm_name('template')) - templatevm.create_on_disk(verbose=False) - testvm2 = self.qc.add_new_vm("QubesHVm", - name=self.make_vm_name('vm2'), - template=templatevm) - testvm2.create_on_disk(verbose=False) - self.qc.save() - self.qc.unlock_db() - - templatevm.start() - self.assertEquals(templatevm.get_power_state(), "Running") - self.assertRaises(QubesException, testvm2.start) - templatevm.force_shutdown() - testvm2.start() - self.assertEquals(testvm2.get_power_state(), "Running") - self.assertRaises(QubesException, templatevm.start) - - def test_100_resize_root_img(self): - testvm1 = self.qc.add_new_vm("QubesHVm", - name=self.make_vm_name('vm1')) - testvm1.create_on_disk(verbose=False) - self.qc.save() - self.qc.unlock_db() - testvm1.resize_root_img(30*1024**3) - self.assertEquals(testvm1.get_root_img_sz(), 30*1024**3) - testvm1.start() - self.assertEquals(testvm1.get_power_state(), "Running") - # TODO: launch some OS there and check the size - - def test_200_start_invalid_drive(self): - """Regression test for #1619""" - testvm1 = self.qc.add_new_vm("QubesHVm", - name=self.make_vm_name('vm1')) - testvm1.create_on_disk(verbose=False) - testvm1.drive = 'hd:dom0:/invalid' - self.qc.save() - self.qc.unlock_db() - try: - testvm1.start() - except Exception as e: - self.assertIsInstance(e, QubesException) - else: - self.fail('No exception raised') - - def test_201_start_invalid_drive_cdrom(self): - """Regression test for #1619""" - testvm1 = self.qc.add_new_vm("QubesHVm", - name=self.make_vm_name('vm1')) - testvm1.create_on_disk(verbose=False) - testvm1.drive = 'cdrom:dom0:/invalid' - self.qc.save() - self.qc.unlock_db() - try: - testvm1.start() - except Exception as e: - self.assertIsInstance(e, QubesException) - else: - self.fail('No exception raised') - -class TC_20_DispVMMixin(qubes.tests.SystemTestsMixin): - def test_000_prepare_dvm(self): - self.qc.unlock_db() - retcode = subprocess.call(['/usr/bin/qvm-create-default-dvm', - self.template], - stderr=open(os.devnull, 'w')) - self.assertEqual(retcode, 0) - self.qc.lock_db_for_writing() - self.qc.load() - self.assertIsNotNone(self.qc.get_vm_by_name( - self.template + "-dvm")) - # TODO: check mtime of snapshot file - - def test_010_simple_dvm_run(self): - self.qc.unlock_db() - p = subprocess.Popen(['/usr/lib/qubes/qfile-daemon-dvm', - 'qubes.VMShell', 'dom0', 'DEFAULT'], - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=open(os.devnull, 'w')) - (stdout, _) = p.communicate(input="echo test") - self.assertEqual(stdout, "test\n") - # TODO: check if DispVM is destroyed - - @unittest.skipUnless(spawn.find_executable('xdotool'), - "xdotool not installed") - def test_020_gui_app(self): - self.qc.unlock_db() - p = subprocess.Popen(['/usr/lib/qubes/qfile-daemon-dvm', - 'qubes.VMShell', 'dom0', 'DEFAULT'], - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=open(os.devnull, 'w')) - - # wait for DispVM startup: - p.stdin.write("echo test\n") - p.stdin.flush() - l = p.stdout.readline() - self.assertEqual(l, "test\n") - - # potential race condition, but our tests are supposed to be - # running on dedicated machine, so should not be a problem - self.qc.lock_db_for_reading() - self.qc.load() - self.qc.unlock_db() - - max_qid = 0 - for vm in self.qc.values(): - if not vm.is_disposablevm(): - continue - if vm.qid > max_qid: - max_qid = vm.qid - dispvm = self.qc[max_qid] - self.assertNotEqual(dispvm.qid, 0, "DispVM not found in qubes.xml") - self.assertTrue(dispvm.is_running()) - try: - window_title = 'user@%s' % (dispvm.template.name + "-dvm") - p.stdin.write("xterm -e " - "\"sh -c 'echo \\\"\033]0;{}\007\\\";read x;'\"\n". - format(window_title)) - self.wait_for_window(window_title) - - time.sleep(0.5) - self.enter_keys_in_window(window_title, ['Return']) - # Wait for window to close - self.wait_for_window(window_title, show=False) - finally: - p.stdin.close() - - wait_count = 0 - while dispvm.is_running(): - wait_count += 1 - if wait_count > 100: - self.fail("Timeout while waiting for DispVM destruction") - time.sleep(0.1) - wait_count = 0 - while p.poll() is None: - wait_count += 1 - if wait_count > 100: - self.fail("Timeout while waiting for qfile-daemon-dvm " - "termination") - time.sleep(0.1) - self.assertEqual(p.returncode, 0) - - self.qc.lock_db_for_reading() - self.qc.load() - self.qc.unlock_db() - self.assertIsNone(self.qc.get_vm_by_name(dispvm.name), - "DispVM not removed from qubes.xml") - - def _handle_editor(self, winid): - (window_title, _) = subprocess.Popen( - ['xdotool', 'getwindowname', winid], stdout=subprocess.PIPE).\ - communicate() - window_title = window_title.strip().\ - replace('(', '\(').replace(')', '\)') - time.sleep(1) - if "gedit" in window_title: - subprocess.check_call(['xdotool', 'windowactivate', '--sync', winid, - 'type', 'Test test 2\n']) - time.sleep(0.5) - subprocess.check_call(['xdotool', - 'key', 'ctrl+s', 'ctrl+q']) - elif "LibreOffice" in window_title: - # wait for actual editor (we've got splash screen) - search = subprocess.Popen(['xdotool', 'search', '--sync', - '--onlyvisible', '--all', '--name', '--class', 'disp*|Writer'], - stdout=subprocess.PIPE, - stderr=open(os.path.devnull, 'w')) - retcode = search.wait() - if retcode == 0: - winid = search.stdout.read().strip() - time.sleep(0.5) - subprocess.check_call(['xdotool', 'windowactivate', '--sync', winid, - 'type', 'Test test 2\n']) - time.sleep(0.5) - subprocess.check_call(['xdotool', - 'key', '--delay', '100', 'ctrl+s', - 'Return', 'ctrl+q']) - elif "emacs" in window_title: - subprocess.check_call(['xdotool', 'windowactivate', '--sync', winid, - 'type', 'Test test 2\n']) - time.sleep(0.5) - subprocess.check_call(['xdotool', - 'key', 'ctrl+x', 'ctrl+s']) - subprocess.check_call(['xdotool', - 'key', 'ctrl+x', 'ctrl+c']) - elif "vim" in window_title or "user@" in window_title: - subprocess.check_call(['xdotool', 'windowactivate', '--sync', winid, - 'key', 'i', 'type', 'Test test 2\n']) - subprocess.check_call( - ['xdotool', - 'key', 'Escape', 'colon', 'w', 'q', 'Return']) - else: - self.fail("Unknown editor window: {}".format(window_title)) - - @unittest.skipUnless(spawn.find_executable('xdotool'), - "xdotool not installed") - def test_030_edit_file(self): - testvm1 = self.qc.add_new_vm("QubesAppVm", - name=self.make_vm_name('vm1'), - template=self.qc.get_vm_by_name( - self.template)) - testvm1.create_on_disk(verbose=False) - self.qc.save() - - testvm1.start() - testvm1.run("echo test1 > /home/user/test.txt", wait=True) - - self.qc.unlock_db() - p = testvm1.run("qvm-open-in-dvm /home/user/test.txt", - passio_popen=True) - - wait_count = 0 - winid = None - while True: - search = subprocess.Popen(['xdotool', 'search', - '--onlyvisible', '--class', 'disp*'], - stdout=subprocess.PIPE, - stderr=open(os.path.devnull, 'w')) - retcode = search.wait() - if retcode == 0: - winid = search.stdout.read().strip() - break - wait_count += 1 - if wait_count > 100: - self.fail("Timeout while waiting for editor window") - time.sleep(0.3) - - time.sleep(0.5) - self._handle_editor(winid) - p.wait() - p = testvm1.run("cat /home/user/test.txt", - passio_popen=True) - (test_txt_content, _) = p.communicate() - # Drop BOM if added by editor - if test_txt_content.startswith('\xef\xbb\xbf'): - test_txt_content = test_txt_content[3:] - self.assertEqual(test_txt_content, "Test test 2\ntest1\n") - - -class TC_30_Gui_daemon(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase): - @unittest.skipUnless(spawn.find_executable('xdotool'), - "xdotool not installed") - def test_000_clipboard(self): - testvm1 = self.qc.add_new_vm("QubesAppVm", - name=self.make_vm_name('vm1'), - template=self.qc.get_default_template()) - testvm1.create_on_disk(verbose=False) - testvm2 = self.qc.add_new_vm("QubesAppVm", - name=self.make_vm_name('vm2'), - template=self.qc.get_default_template()) - testvm2.create_on_disk(verbose=False) - self.qc.save() - self.qc.unlock_db() - - testvm1.start() - testvm2.start() - - window_title = 'user@{}'.format(testvm1.name) - testvm1.run('zenity --text-info --editable --title={}'.format( - window_title)) - - self.wait_for_window(window_title) - time.sleep(0.5) - test_string = "test{}".format(testvm1.xid) - - # Type and copy some text - subprocess.check_call(['xdotool', 'search', '--name', window_title, - 'windowactivate', '--sync', - 'type', '{}'.format(test_string)]) - # second xdotool call because type --terminator do not work (SEGV) - # additionally do not use search here, so window stack will be empty - # and xdotool will use XTEST instead of generating events manually - - # this will be much better - at least because events will have - # correct timestamp (so gui-daemon would not drop the copy request) - subprocess.check_call(['xdotool', - 'key', 'ctrl+a', 'ctrl+c', 'ctrl+shift+c', - 'Escape']) - - clipboard_content = \ - open('/var/run/qubes/qubes-clipboard.bin', 'r').read().strip() - self.assertEquals(clipboard_content, test_string, - "Clipboard copy operation failed - content") - clipboard_source = \ - open('/var/run/qubes/qubes-clipboard.bin.source', - 'r').read().strip() - self.assertEquals(clipboard_source, testvm1.name, - "Clipboard copy operation failed - owner") - - # Then paste it to the other window - window_title = 'user@{}'.format(testvm2.name) - p = testvm2.run('zenity --entry --title={} > test.txt'.format( - window_title), passio_popen=True) - self.wait_for_window(window_title) - - subprocess.check_call(['xdotool', 'key', '--delay', '100', - 'ctrl+shift+v', 'ctrl+v', 'Return']) - p.wait() - - # And compare the result - (test_output, _) = testvm2.run('cat test.txt', - passio_popen=True).communicate() - self.assertEquals(test_string, test_output.strip()) - - clipboard_content = \ - open('/var/run/qubes/qubes-clipboard.bin', 'r').read().strip() - self.assertEquals(clipboard_content, "", - "Clipboard not wiped after paste - content") - clipboard_source = \ - open('/var/run/qubes/qubes-clipboard.bin.source', 'r').read( - - ).strip() - self.assertEquals(clipboard_source, "", - "Clipboard not wiped after paste - owner") - - -@unittest.skipUnless(os.path.exists('/var/lib/qubes/vm-kernels/pvgrub2'), - 'grub-xen package not installed') -class TC_40_PVGrub(qubes.tests.SystemTestsMixin): - def setUp(self): - super(TC_40_PVGrub, self).setUp() - supported = False - if self.template.startswith('fedora-'): - supported = True - elif self.template.startswith('debian-'): - supported = True - if not supported: - self.skipTest("Template {} not supported by this test".format( - self.template)) - - def install_packages(self, vm): - if self.template.startswith('fedora-'): - cmd_install1 = 'dnf clean expire-cache && ' \ - 'dnf install -y qubes-kernel-vm-support grub2-tools' - cmd_install2 = 'dnf install -y kernel && ' \ - 'KVER=$(rpm -q --qf %{VERSION}-%{RELEASE}.%{ARCH} kernel) && ' \ - 'dnf install --allowerasing -y kernel-devel-$KVER && ' \ - 'dkms autoinstall -k $KVER' - cmd_update_grub = 'grub2-mkconfig -o /boot/grub2/grub.cfg' - elif self.template.startswith('debian-'): - cmd_install1 = 'apt-get update && apt-get install -y ' \ - 'qubes-kernel-vm-support grub2-common' - cmd_install2 = 'apt-get install -y linux-image-amd64' - cmd_update_grub = 'mkdir /boot/grub && update-grub2' - else: - assert False, "Unsupported template?!" - - for cmd in [cmd_install1, cmd_install2, cmd_update_grub]: - p = vm.run(cmd, user="root", passio_popen=True, passio_stderr=True) - (stdout, stderr) = p.communicate() - self.assertEquals(p.returncode, 0, - "Failed command: {}\nSTDOUT: {}\nSTDERR: {}" - .format(cmd, stdout, stderr)) - - def get_kernel_version(self, vm): - if self.template.startswith('fedora-'): - cmd_get_kernel_version = 'rpm -q kernel|sort -n|tail -1|' \ - 'cut -d - -f 2-' - elif self.template.startswith('debian-'): - cmd_get_kernel_version = \ - 'dpkg-query --showformat=\'${Package}\\n\' --show ' \ - '\'linux-image-*-amd64\'|sort -n|tail -1|cut -d - -f 3-' - else: - raise RuntimeError("Unsupported template?!") - - p = vm.run(cmd_get_kernel_version, user="root", passio_popen=True) - (kver, _) = p.communicate() - self.assertEquals(p.returncode, 0, - "Failed command: {}".format(cmd_get_kernel_version)) - return kver.strip() - - def test_000_standalone_vm(self): - testvm1 = self.qc.add_new_vm("QubesAppVm", - template=None, - name=self.make_vm_name('vm1')) - testvm1.create_on_disk(verbose=False, - source_template=self.qc.get_vm_by_name( - self.template)) - self.save_and_reload_db() - self.qc.unlock_db() - testvm1 = self.qc[testvm1.qid] - testvm1.start() - self.install_packages(testvm1) - kver = self.get_kernel_version(testvm1) - self.shutdown_and_wait(testvm1) - - self.qc.lock_db_for_writing() - self.qc.load() - testvm1 = self.qc[testvm1.qid] - testvm1.kernel = 'pvgrub2' - self.save_and_reload_db() - self.qc.unlock_db() - testvm1 = self.qc[testvm1.qid] - testvm1.start() - p = testvm1.run('uname -r', passio_popen=True) - (actual_kver, _) = p.communicate() - self.assertEquals(actual_kver.strip(), kver) - - def test_010_template_based_vm(self): - test_template = self.qc.add_new_vm("QubesTemplateVm", - template=None, - name=self.make_vm_name('template')) - test_template.clone_attrs(self.qc.get_vm_by_name(self.template)) - test_template.clone_disk_files( - src_vm=self.qc.get_vm_by_name(self.template), - verbose=False) - - testvm1 = self.qc.add_new_vm("QubesAppVm", - template=test_template, - name=self.make_vm_name('vm1')) - testvm1.create_on_disk(verbose=False, - source_template=test_template) - self.save_and_reload_db() - self.qc.unlock_db() - test_template = self.qc[test_template.qid] - testvm1 = self.qc[testvm1.qid] - test_template.start() - self.install_packages(test_template) - kver = self.get_kernel_version(test_template) - self.shutdown_and_wait(test_template) - - self.qc.lock_db_for_writing() - self.qc.load() - test_template = self.qc[test_template.qid] - test_template.kernel = 'pvgrub2' - testvm1 = self.qc[testvm1.qid] - testvm1.kernel = 'pvgrub2' - self.save_and_reload_db() - self.qc.unlock_db() - - # Check if TemplateBasedVM boots and has the right kernel - testvm1 = self.qc[testvm1.qid] - testvm1.start() - p = testvm1.run('uname -r', passio_popen=True) - (actual_kver, _) = p.communicate() - self.assertEquals(actual_kver.strip(), kver) - - # And the same for the TemplateVM itself - test_template = self.qc[test_template.qid] - test_template.start() - p = test_template.run('uname -r', passio_popen=True) - (actual_kver, _) = p.communicate() - self.assertEquals(actual_kver.strip(), kver) - -@unittest.skipUnless( - spawn.find_executable('xprop') and - spawn.find_executable('xdotool') and - spawn.find_executable('wmctrl'), - "xprop or xdotool or wmctrl not installed") -class TC_50_MimeHandlers(qubes.tests.SystemTestsMixin): - @classmethod - def setUpClass(cls): - if cls.template == 'whonix-gw' or 'minimal' in cls.template: - raise unittest.SkipTest( - 'Template {} not supported by this test'.format(cls.template)) - - if cls.template == 'whonix-ws': - # TODO remove when Whonix-based DispVMs will work (Whonix 13?) - raise unittest.SkipTest( - 'Template {} not supported by this test'.format(cls.template)) - - qc = QubesVmCollection() - - cls._kill_test_vms(qc, prefix=qubes.tests.CLSVMPREFIX) - - qc.lock_db_for_writing() - qc.load() - - cls._remove_test_vms(qc, qubes.qubes.vmm.libvirt_conn, - prefix=qubes.tests.CLSVMPREFIX) - - cls.source_vmname = cls.make_vm_name('source', True) - source_vm = qc.add_new_vm("QubesAppVm", - template=qc.get_vm_by_name(cls.template), - name=cls.source_vmname) - source_vm.create_on_disk(verbose=False) - - cls.target_vmname = cls.make_vm_name('target', True) - target_vm = qc.add_new_vm("QubesAppVm", - template=qc.get_vm_by_name(cls.template), - name=cls.target_vmname) - target_vm.create_on_disk(verbose=False) - - qc.save() - qc.unlock_db() - source_vm.start() - target_vm.start() - - # make sure that DispVMs will be started of the same template - retcode = subprocess.call(['/usr/bin/qvm-create-default-dvm', - cls.template], - stderr=open(os.devnull, 'w')) - assert retcode == 0, "Error preparing DispVM" - - def setUp(self): - super(TC_50_MimeHandlers, self).setUp() - self.source_vm = self.qc.get_vm_by_name(self.source_vmname) - self.target_vm = self.qc.get_vm_by_name(self.target_vmname) - - def get_window_class(self, winid, dispvm=False): - (vm_winid, _) = subprocess.Popen( - ['xprop', '-id', winid, '_QUBES_VMWINDOWID'], - stdout=subprocess.PIPE - ).communicate() - vm_winid = vm_winid.split("#")[1].strip('\n" ') - if dispvm: - (vmname, _) = subprocess.Popen( - ['xprop', '-id', winid, '_QUBES_VMNAME'], - stdout=subprocess.PIPE - ).communicate() - vmname = vmname.split("=")[1].strip('\n" ') - window_class = None - while window_class is None: - # XXX to use self.qc.get_vm_by_name would require reloading - # qubes.xml, so use qvm-run instead - xprop = subprocess.Popen( - ['qvm-run', '-p', vmname, 'xprop -id {} WM_CLASS'.format( - vm_winid)], stdout=subprocess.PIPE) - (window_class, _) = xprop.communicate() - if xprop.returncode != 0: - self.skipTest("xprop failed, not installed?") - if 'not found' in window_class: - # WM_CLASS not set yet, wait a little - time.sleep(0.1) - window_class = None - else: - window_class = None - while window_class is None: - xprop = self.target_vm.run( - 'xprop -id {} WM_CLASS'.format(vm_winid), - passio_popen=True) - (window_class, _) = xprop.communicate() - if xprop.returncode != 0: - self.skipTest("xprop failed, not installed?") - if 'not found' in window_class: - # WM_CLASS not set yet, wait a little - time.sleep(0.1) - window_class = None - # output: WM_CLASS(STRING) = "gnome-terminal-server", "Gnome-terminal" - try: - window_class = window_class.split("=")[1].split(",")[0].strip('\n" ') - except IndexError: - raise Exception( - "Unexpected output from xprop: '{}'".format(window_class)) - - return window_class - - def open_file_and_check_viewer(self, filename, expected_app_titles, - expected_app_classes, dispvm=False): - self.qc.unlock_db() - if dispvm: - p = self.source_vm.run("qvm-open-in-dvm {}".format(filename), - passio_popen=True) - vmpattern = "disp*" - else: - self.qrexec_policy('qubes.OpenInVM', self.source_vm.name, - self.target_vmname) - self.qrexec_policy('qubes.OpenURL', self.source_vm.name, - self.target_vmname) - p = self.source_vm.run("qvm-open-in-vm {} {}".format( - self.target_vmname, filename), passio_popen=True) - vmpattern = self.target_vmname - wait_count = 0 - winid = None - window_title = None - while True: - search = subprocess.Popen(['xdotool', 'search', - '--onlyvisible', '--class', vmpattern], - stdout=subprocess.PIPE, - stderr=open(os.path.devnull, 'w')) - retcode = search.wait() - if retcode == 0: - winid = search.stdout.read().strip() - # get window title - (window_title, _) = subprocess.Popen( - ['xdotool', 'getwindowname', winid], stdout=subprocess.PIPE). \ - communicate() - window_title = window_title.strip() - # ignore LibreOffice splash screen and window with no title - # set yet - if window_title and not window_title.startswith("LibreOffice")\ - and not window_title == 'VMapp command': - break - wait_count += 1 - if wait_count > 100: - self.fail("Timeout while waiting for editor window") - time.sleep(0.3) - - # get window class - window_class = self.get_window_class(winid, dispvm) - # close the window - we've got the window class, it is no longer needed - subprocess.check_call(['wmctrl', '-i', '-c', winid]) - p.wait() - self.wait_for_window(window_title, show=False) - - def check_matches(obj, patterns): - return any((pat.search(obj) if isinstance(pat, type(re.compile(''))) - else pat in obj) for pat in patterns) - - if not check_matches(window_title, expected_app_titles) and \ - not check_matches(window_class, expected_app_classes): - self.fail("Opening file {} resulted in window '{} ({})', which is " - "none of {!r} ({!r})".format( - filename, window_title, window_class, - expected_app_titles, expected_app_classes)) - - def prepare_txt(self, filename): - p = self.source_vm.run("cat > {}".format(filename), passio_popen=True) - p.stdin.write("This is test\n") - p.stdin.close() - retcode = p.wait() - assert retcode == 0, "Failed to write {} file".format(filename) - - def prepare_pdf(self, filename): - self.prepare_txt("/tmp/source.txt") - cmd = "convert /tmp/source.txt {}".format(filename) - retcode = self.source_vm.run(cmd, wait=True) - assert retcode == 0, "Failed to run '{}'".format(cmd) - - def prepare_doc(self, filename): - self.prepare_txt("/tmp/source.txt") - cmd = "unoconv -f doc -o {} /tmp/source.txt".format(filename) - retcode = self.source_vm.run(cmd, wait=True) - if retcode != 0: - self.skipTest("Failed to run '{}', not installed?".format(cmd)) - - def prepare_pptx(self, filename): - self.prepare_txt("/tmp/source.txt") - cmd = "unoconv -f pptx -o {} /tmp/source.txt".format(filename) - retcode = self.source_vm.run(cmd, wait=True) - if retcode != 0: - self.skipTest("Failed to run '{}', not installed?".format(cmd)) - - def prepare_png(self, filename): - self.prepare_txt("/tmp/source.txt") - cmd = "convert /tmp/source.txt {}".format(filename) - retcode = self.source_vm.run(cmd, wait=True) - if retcode != 0: - self.skipTest("Failed to run '{}', not installed?".format(cmd)) - - def prepare_jpg(self, filename): - self.prepare_txt("/tmp/source.txt") - cmd = "convert /tmp/source.txt {}".format(filename) - retcode = self.source_vm.run(cmd, wait=True) - if retcode != 0: - self.skipTest("Failed to run '{}', not installed?".format(cmd)) - - def test_000_txt(self): - filename = "/home/user/test_file.txt" - self.prepare_txt(filename) - self.open_file_and_check_viewer(filename, ["vim", "user@"], - ["gedit", "emacs", "libreoffice"]) - - def test_001_pdf(self): - filename = "/home/user/test_file.pdf" - self.prepare_pdf(filename) - self.open_file_and_check_viewer(filename, [], - ["evince"]) - - def test_002_doc(self): - filename = "/home/user/test_file.doc" - self.prepare_doc(filename) - self.open_file_and_check_viewer(filename, [], - ["libreoffice", "abiword"]) - - def test_003_pptx(self): - filename = "/home/user/test_file.pptx" - self.prepare_pptx(filename) - self.open_file_and_check_viewer(filename, [], - ["libreoffice"]) - - def test_004_png(self): - filename = "/home/user/test_file.png" - self.prepare_png(filename) - self.open_file_and_check_viewer(filename, [], - ["shotwell", "eog", "display"]) - - def test_005_jpg(self): - filename = "/home/user/test_file.jpg" - self.prepare_jpg(filename) - self.open_file_and_check_viewer(filename, [], - ["shotwell", "eog", "display"]) - - def test_006_jpeg(self): - filename = "/home/user/test_file.jpeg" - self.prepare_jpg(filename) - self.open_file_and_check_viewer(filename, [], - ["shotwell", "eog", "display"]) - - def test_010_url(self): - self.open_file_and_check_viewer("https://www.qubes-os.org/", [], - ["Firefox", "Iceweasel", "Navigator"]) - - def test_100_txt_dispvm(self): - filename = "/home/user/test_file.txt" - self.prepare_txt(filename) - self.open_file_and_check_viewer(filename, ["vim", "user@"], - ["gedit", "emacs", "libreoffice"], - dispvm=True) - - def test_101_pdf_dispvm(self): - filename = "/home/user/test_file.pdf" - self.prepare_pdf(filename) - self.open_file_and_check_viewer(filename, [], - ["evince"], - dispvm=True) - - def test_102_doc_dispvm(self): - filename = "/home/user/test_file.doc" - self.prepare_doc(filename) - self.open_file_and_check_viewer(filename, [], - ["libreoffice", "abiword"], - dispvm=True) - - def test_103_pptx_dispvm(self): - filename = "/home/user/test_file.pptx" - self.prepare_pptx(filename) - self.open_file_and_check_viewer(filename, [], - ["libreoffice"], - dispvm=True) - - def test_104_png_dispvm(self): - filename = "/home/user/test_file.png" - self.prepare_png(filename) - self.open_file_and_check_viewer(filename, [], - ["shotwell", "eog", "display"], - dispvm=True) - - def test_105_jpg_dispvm(self): - filename = "/home/user/test_file.jpg" - self.prepare_jpg(filename) - self.open_file_and_check_viewer(filename, [], - ["shotwell", "eog", "display"], - dispvm=True) - - def test_106_jpeg_dispvm(self): - filename = "/home/user/test_file.jpeg" - self.prepare_jpg(filename) - self.open_file_and_check_viewer(filename, [], - ["shotwell", "eog", "display"], - dispvm=True) - - def test_110_url_dispvm(self): - self.open_file_and_check_viewer("https://www.qubes-os.org/", [], - ["Firefox", "Iceweasel", "Navigator"], - dispvm=True) - def load_tests(loader, tests, pattern): try: @@ -1705,21 +880,4 @@ def load_tests(loader, tests, pattern): (TC_00_AppVMMixin, qubes.tests.QubesTestCase), {'template': template}))) - tests.addTests(loader.loadTestsFromTestCase( - type( - 'TC_20_DispVM_' + template, - (TC_20_DispVMMixin, qubes.tests.QubesTestCase), - {'template': template}))) - tests.addTests(loader.loadTestsFromTestCase( - type( - 'TC_40_PVGrub_' + template, - (TC_40_PVGrub, qubes.tests.QubesTestCase), - {'template': template}))) - - tests.addTests(loader.loadTestsFromTestCase( - type( - 'TC_50_MimeHandlers_' + template, - (TC_50_MimeHandlers, qubes.tests.QubesTestCase), - {'template': template}))) - return tests