From 02f966116931612745d6f95e5be3c8ce41a2cb71 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marek=20Marczykowski-G=C3=B3recki?= Date: Mon, 1 Oct 2018 06:02:22 +0200 Subject: [PATCH] tests: migrate mime handlers test to core3 --- qubes/tests/__init__.py | 1 + {tests => qubes/tests/integ}/mime.py | 288 +++++++++++++-------------- rpm_spec/core-dom0.spec.in | 1 + 3 files changed, 141 insertions(+), 149 deletions(-) rename {tests => qubes/tests/integ}/mime.py (54%) diff --git a/qubes/tests/__init__.py b/qubes/tests/__init__.py index 131dcb64..01937069 100644 --- a/qubes/tests/__init__.py +++ b/qubes/tests/__init__.py @@ -1268,6 +1268,7 @@ def load_tests(loader, tests, pattern): # pylint: disable=unused-argument 'qubes.tests.integ.network', 'qubes.tests.integ.dispvm', 'qubes.tests.integ.vm_qrexec_gui', + 'qubes.tests.integ.mime', 'qubes.tests.integ.salt', 'qubes.tests.integ.backup', 'qubes.tests.integ.backupcompatibility', diff --git a/tests/mime.py b/qubes/tests/integ/mime.py similarity index 54% rename from tests/mime.py rename to qubes/tests/integ/mime.py index 5128585d..e2a5ea0f 100644 --- a/tests/mime.py +++ b/qubes/tests/integ/mime.py @@ -21,112 +21,91 @@ # # from distutils import spawn -import os import re import subprocess import time import unittest +import itertools + +import asyncio + +import sys + import qubes.tests -import qubes.qubes -from qubes.qubes import QubesVmCollection +import qubes @unittest.skipUnless( spawn.find_executable('xprop') and spawn.find_executable('xdotool') and spawn.find_executable('wmctrl'), "xprop or xdotool or wmctrl not installed") -class TC_50_MimeHandlers(qubes.tests.SystemTestsMixin): - @classmethod - def setUpClass(cls): - if cls.template == 'whonix-gw' or 'minimal' in cls.template: - raise unittest.SkipTest( - 'Template {} not supported by this test'.format(cls.template)) - - if cls.template == 'whonix-ws': - # TODO remove when Whonix-based DispVMs will work (Whonix 13?) - raise unittest.SkipTest( - 'Template {} not supported by this test'.format(cls.template)) - - qc = QubesVmCollection() - - cls._kill_test_vms(qc, prefix=qubes.tests.CLSVMPREFIX) - - qc.lock_db_for_writing() - qc.load() - - cls._remove_test_vms(qc, qubes.qubes.vmm.libvirt_conn, - prefix=qubes.tests.CLSVMPREFIX) - - cls.source_vmname = cls.make_vm_name('source', True) - source_vm = qc.add_new_vm("QubesAppVm", - template=qc.get_vm_by_name(cls.template), - name=cls.source_vmname) - source_vm.create_on_disk(verbose=False) - - cls.target_vmname = cls.make_vm_name('target', True) - target_vm = qc.add_new_vm("QubesAppVm", - template=qc.get_vm_by_name(cls.template), - name=cls.target_vmname) - target_vm.create_on_disk(verbose=False) - - qc.save() - qc.unlock_db() - source_vm.start() - target_vm.start() - - # make sure that DispVMs will be started of the same template - retcode = subprocess.call(['/usr/bin/qvm-create-default-dvm', - cls.template], - stderr=open(os.devnull, 'w')) - assert retcode == 0, "Error preparing DispVM" - +class TC_50_MimeHandlers: def setUp(self): super(TC_50_MimeHandlers, self).setUp() - self.source_vm = self.qc.get_vm_by_name(self.source_vmname) - self.target_vm = self.qc.get_vm_by_name(self.target_vmname) + if self.template.startswith('whonix-gw') or 'minimal' in self.template: + raise unittest.SkipTest( + 'Template {} not supported by this test'.format(self.template)) + + self.source_vmname = self.make_vm_name('source') + self.source_vm = self.app.add_new_vm("AppVM", + template=self.template, + name=self.source_vmname, + label='red') + self.loop.run_until_complete(self.source_vm.create_on_disk()) + + self.target_vmname = self.make_vm_name('target') + self.target_vm = self.app.add_new_vm("AppVM", + template=self.template, + name=self.target_vmname, + label='red') + self.loop.run_until_complete(self.target_vm.create_on_disk()) + + self.target_vm.template_for_dispvms = True + self.source_vm.default_dispvm = self.target_vm + + done, not_done = self.loop.run_until_complete(asyncio.wait([ + self.source_vm.start(), + self.target_vm.start()])) + for result in itertools.chain(done, not_done): + # catch any exceptions + result.result() + def get_window_class(self, winid, dispvm=False): (vm_winid, _) = subprocess.Popen( ['xprop', '-id', winid, '_QUBES_VMWINDOWID'], stdout=subprocess.PIPE ).communicate() - vm_winid = vm_winid.split("#")[1].strip('\n" ') + vm_winid = vm_winid.decode().split("#")[1].strip('\n" ') if dispvm: (vmname, _) = subprocess.Popen( ['xprop', '-id', winid, '_QUBES_VMNAME'], stdout=subprocess.PIPE ).communicate() - vmname = vmname.split("=")[1].strip('\n" ') - window_class = None - while window_class is None: - # XXX to use self.qc.get_vm_by_name would require reloading - # qubes.xml, so use qvm-run instead - xprop = subprocess.Popen( - ['qvm-run', '-p', vmname, 'xprop -id {} WM_CLASS'.format( - vm_winid)], stdout=subprocess.PIPE) - (window_class, _) = xprop.communicate() - if xprop.returncode != 0: - self.skipTest("xprop failed, not installed?") - if 'not found' in window_class: - # WM_CLASS not set yet, wait a little - time.sleep(0.1) - window_class = None + vmname = vmname.decode().split("=")[1].strip('\n" ') + vm = self.app.domains[vmname] else: - window_class = None - while window_class is None: - xprop = self.target_vm.run( - 'xprop -id {} WM_CLASS'.format(vm_winid), - passio_popen=True) - (window_class, _) = xprop.communicate() - if xprop.returncode != 0: - self.skipTest("xprop failed, not installed?") - if 'not found' in window_class: - # WM_CLASS not set yet, wait a little - time.sleep(0.1) - window_class = None + vm = self.target_vm + window_class = None + while window_class is None: + try: + window_class, _ = self.loop.run_until_complete( + vm.run_for_stdio('xprop -id {} WM_CLASS'.format(vm_winid))) + except subprocess.CalledProcessError as e: + if e.returncode == 127: + self.skipTest('xprop not installed') + self.fail( + "xprop -id {} WM_CLASS failed: {}".format( + vm_winid, e.stderr.decode())) + if b'not found' in window_class: + # WM_CLASS not set yet, wait a little + time.sleep(0.1) + window_class = None + # output: WM_CLASS(STRING) = "gnome-terminal-server", "Gnome-terminal" try: + window_class = window_class.decode() window_class = window_class.split("=")[1].split(",")[0].strip('\n" ') except IndexError: raise Exception( @@ -136,44 +115,45 @@ class TC_50_MimeHandlers(qubes.tests.SystemTestsMixin): def open_file_and_check_viewer(self, filename, expected_app_titles, expected_app_classes, dispvm=False): - self.qc.unlock_db() if dispvm: - p = self.source_vm.run("qvm-open-in-dvm {}".format(filename), - passio_popen=True) - vmpattern = "disp*" + p = self.loop.run_until_complete(self.source_vm.run( + "qvm-open-in-dvm {}".format(filename), stdout=subprocess.PIPE)) + vmpattern = "disp[0-9]*" else: - self.qrexec_policy('qubes.OpenInVM', self.source_vm.name, - self.target_vmname) - self.qrexec_policy('qubes.OpenURL', self.source_vm.name, - self.target_vmname) - p = self.source_vm.run("qvm-open-in-vm {} {}".format( - self.target_vmname, filename), passio_popen=True) + p = self.loop.run_until_complete(self.source_vm.run( + "qvm-open-in-vm {} {}".format(self.target_vmname, filename), + stdout=subprocess.PIPE)) vmpattern = self.target_vmname wait_count = 0 winid = None - window_title = None - while True: - search = subprocess.Popen(['xdotool', 'search', - '--onlyvisible', '--class', vmpattern], - stdout=subprocess.PIPE, - stderr=open(os.path.devnull, 'w')) - retcode = search.wait() - if retcode == 0: - winid = search.stdout.read().strip() - # get window title - (window_title, _) = subprocess.Popen( - ['xdotool', 'getwindowname', winid], stdout=subprocess.PIPE). \ - communicate() - window_title = window_title.strip() - # ignore LibreOffice splash screen and window with no title - # set yet - if window_title and not window_title.startswith("LibreOffice")\ - and not window_title == 'VMapp command': - break - wait_count += 1 - if wait_count > 100: - self.fail("Timeout while waiting for editor window") - time.sleep(0.3) + with self.qrexec_policy('qubes.OpenInVM', self.source_vm.name, + self.target_vmname): + with self.qrexec_policy('qubes.OpenURL', self.source_vm.name, + self.target_vmname): + while True: + search = subprocess.Popen(['xdotool', 'search', + '--onlyvisible', '--class', vmpattern], + stdout=subprocess.PIPE, + stderr=subprocess.DEVNULL) + retcode = search.wait() + if retcode == 0: + winid = search.stdout.read().strip() + # get window title + (window_title, _) = subprocess.Popen( + ['xdotool', 'getwindowname', winid], stdout=subprocess.PIPE). \ + communicate() + window_title = window_title.decode('utf8').strip() + # ignore LibreOffice splash screen and window with no title + # set yet + if window_title and \ + not window_title.startswith("LibreOffice") and\ + not window_title.startswith("NetworkManager") and\ + not window_title == 'VMapp command': + break + wait_count += 1 + if wait_count > 100: + self.fail("Timeout while waiting for editor window") + self.loop.run_until_complete(asyncio.sleep(0.3)) # get window class window_class = self.get_window_class(winid, dispvm) @@ -194,45 +174,66 @@ class TC_50_MimeHandlers(qubes.tests.SystemTestsMixin): expected_app_titles, expected_app_classes)) def prepare_txt(self, filename): - p = self.source_vm.run("cat > {}".format(filename), passio_popen=True) - p.stdin.write("This is test\n") - p.stdin.close() - retcode = p.wait() - assert retcode == 0, "Failed to write {} file".format(filename) + self.loop.run_until_complete( + self.source_vm.run_for_stdio("cat > {}".format(filename), + input=b'This is test\n')) def prepare_pdf(self, filename): self.prepare_txt("/tmp/source.txt") - cmd = "convert /tmp/source.txt {}".format(filename) - retcode = self.source_vm.run(cmd, wait=True) - assert retcode == 0, "Failed to run '{}'".format(cmd) + cmd = "convert text:/tmp/source.txt {}".format(filename) + try: + self.loop.run_until_complete( + self.source_vm.run_for_stdio(cmd)) + except subprocess.CalledProcessError as e: + self.fail('{} failed: {}'.format(cmd, e.stderr.decode())) def prepare_doc(self, filename): self.prepare_txt("/tmp/source.txt") cmd = "unoconv -f doc -o {} /tmp/source.txt".format(filename) - retcode = self.source_vm.run(cmd, wait=True) - if retcode != 0: - self.skipTest("Failed to run '{}', not installed?".format(cmd)) + try: + self.loop.run_until_complete( + self.source_vm.run_for_stdio(cmd)) + except subprocess.CalledProcessError as e: + if e.returncode == 127: + self.skipTest("unoconv not installed".format(cmd)) + self.skipTest("Failed to run '{}': {}".format(cmd, + e.stderr.decode())) def prepare_pptx(self, filename): self.prepare_txt("/tmp/source.txt") cmd = "unoconv -f pptx -o {} /tmp/source.txt".format(filename) - retcode = self.source_vm.run(cmd, wait=True) - if retcode != 0: - self.skipTest("Failed to run '{}', not installed?".format(cmd)) + try: + self.loop.run_until_complete( + self.source_vm.run_for_stdio(cmd)) + except subprocess.CalledProcessError as e: + if e.returncode == 127: + self.skipTest("unoconv not installed".format(cmd)) + self.skipTest("Failed to run '{}': {}".format(cmd, + e.stderr.decode())) def prepare_png(self, filename): self.prepare_txt("/tmp/source.txt") - cmd = "convert /tmp/source.txt {}".format(filename) - retcode = self.source_vm.run(cmd, wait=True) - if retcode != 0: - self.skipTest("Failed to run '{}', not installed?".format(cmd)) + cmd = "convert text:/tmp/source.txt {}".format(filename) + try: + self.loop.run_until_complete( + self.source_vm.run_for_stdio(cmd)) + except subprocess.CalledProcessError as e: + if e.returncode == 127: + self.skipTest("convert not installed".format(cmd)) + self.skipTest("Failed to run '{}': {}".format(cmd, + e.stderr.decode())) def prepare_jpg(self, filename): self.prepare_txt("/tmp/source.txt") - cmd = "convert /tmp/source.txt {}".format(filename) - retcode = self.source_vm.run(cmd, wait=True) - if retcode != 0: - self.skipTest("Failed to run '{}', not installed?".format(cmd)) + cmd = "convert text:/tmp/source.txt {}".format(filename) + try: + self.loop.run_until_complete( + self.source_vm.run_for_stdio(cmd)) + except subprocess.CalledProcessError as e: + if e.returncode == 127: + self.skipTest("convert not installed".format(cmd)) + self.skipTest("Failed to run '{}': {}".format(cmd, + e.stderr.decode())) def test_000_txt(self): filename = "/home/user/test_file.txt" @@ -335,19 +336,8 @@ class TC_50_MimeHandlers(qubes.tests.SystemTestsMixin): dispvm=True) def load_tests(loader, tests, pattern): - try: - qc = qubes.qubes.QubesVmCollection() - qc.lock_db_for_reading() - qc.load() - qc.unlock_db() - templates = [vm.name for vm in qc.values() if - isinstance(vm, qubes.qubes.QubesTemplateVm)] - except OSError: - templates = [] - for template in templates: - tests.addTests(loader.loadTestsFromTestCase( - type( - 'TC_50_MimeHandlers_' + template, - (TC_50_MimeHandlers, qubes.tests.QubesTestCase), - {'template': template}))) - return tests \ No newline at end of file + tests.addTests(loader.loadTestsFromNames( + qubes.tests.create_testcases_for_templates('TC_50_MimeHandlers', + TC_50_MimeHandlers, qubes.tests.SystemTestCase, + module=sys.modules[__name__]))) + return tests diff --git a/rpm_spec/core-dom0.spec.in b/rpm_spec/core-dom0.spec.in index 17abc376..5c5034db 100644 --- a/rpm_spec/core-dom0.spec.in +++ b/rpm_spec/core-dom0.spec.in @@ -342,6 +342,7 @@ fi %{python3_sitelib}/qubes/tests/integ/devices_pci.py %{python3_sitelib}/qubes/tests/integ/dispvm.py %{python3_sitelib}/qubes/tests/integ/dom0_update.py +%{python3_sitelib}/qubes/tests/integ/mime.py %{python3_sitelib}/qubes/tests/integ/network.py %{python3_sitelib}/qubes/tests/integ/pvgrub.py %{python3_sitelib}/qubes/tests/integ/salt.py