tests: migrate mime handlers test to core3

This commit is contained in:
Marek Marczykowski-Górecki 2018-10-01 06:02:22 +02:00
parent 5bc0baeafa
commit 02f9661169
No known key found for this signature in database
GPG Key ID: 063938BA42CFA724
3 changed files with 141 additions and 149 deletions

View File

@ -1268,6 +1268,7 @@ def load_tests(loader, tests, pattern): # pylint: disable=unused-argument
'qubes.tests.integ.network', 'qubes.tests.integ.network',
'qubes.tests.integ.dispvm', 'qubes.tests.integ.dispvm',
'qubes.tests.integ.vm_qrexec_gui', 'qubes.tests.integ.vm_qrexec_gui',
'qubes.tests.integ.mime',
'qubes.tests.integ.salt', 'qubes.tests.integ.salt',
'qubes.tests.integ.backup', 'qubes.tests.integ.backup',
'qubes.tests.integ.backupcompatibility', 'qubes.tests.integ.backupcompatibility',

View File

@ -21,112 +21,91 @@
# #
# #
from distutils import spawn from distutils import spawn
import os
import re import re
import subprocess import subprocess
import time import time
import unittest import unittest
import itertools
import asyncio
import sys
import qubes.tests import qubes.tests
import qubes.qubes import qubes
from qubes.qubes import QubesVmCollection
@unittest.skipUnless( @unittest.skipUnless(
spawn.find_executable('xprop') and spawn.find_executable('xprop') and
spawn.find_executable('xdotool') and spawn.find_executable('xdotool') and
spawn.find_executable('wmctrl'), spawn.find_executable('wmctrl'),
"xprop or xdotool or wmctrl not installed") "xprop or xdotool or wmctrl not installed")
class TC_50_MimeHandlers(qubes.tests.SystemTestsMixin): class TC_50_MimeHandlers:
@classmethod
def setUpClass(cls):
if cls.template == 'whonix-gw' or 'minimal' in cls.template:
raise unittest.SkipTest(
'Template {} not supported by this test'.format(cls.template))
if cls.template == 'whonix-ws':
# TODO remove when Whonix-based DispVMs will work (Whonix 13?)
raise unittest.SkipTest(
'Template {} not supported by this test'.format(cls.template))
qc = QubesVmCollection()
cls._kill_test_vms(qc, prefix=qubes.tests.CLSVMPREFIX)
qc.lock_db_for_writing()
qc.load()
cls._remove_test_vms(qc, qubes.qubes.vmm.libvirt_conn,
prefix=qubes.tests.CLSVMPREFIX)
cls.source_vmname = cls.make_vm_name('source', True)
source_vm = qc.add_new_vm("QubesAppVm",
template=qc.get_vm_by_name(cls.template),
name=cls.source_vmname)
source_vm.create_on_disk(verbose=False)
cls.target_vmname = cls.make_vm_name('target', True)
target_vm = qc.add_new_vm("QubesAppVm",
template=qc.get_vm_by_name(cls.template),
name=cls.target_vmname)
target_vm.create_on_disk(verbose=False)
qc.save()
qc.unlock_db()
source_vm.start()
target_vm.start()
# make sure that DispVMs will be started of the same template
retcode = subprocess.call(['/usr/bin/qvm-create-default-dvm',
cls.template],
stderr=open(os.devnull, 'w'))
assert retcode == 0, "Error preparing DispVM"
def setUp(self): def setUp(self):
super(TC_50_MimeHandlers, self).setUp() super(TC_50_MimeHandlers, self).setUp()
self.source_vm = self.qc.get_vm_by_name(self.source_vmname) if self.template.startswith('whonix-gw') or 'minimal' in self.template:
self.target_vm = self.qc.get_vm_by_name(self.target_vmname) raise unittest.SkipTest(
'Template {} not supported by this test'.format(self.template))
self.source_vmname = self.make_vm_name('source')
self.source_vm = self.app.add_new_vm("AppVM",
template=self.template,
name=self.source_vmname,
label='red')
self.loop.run_until_complete(self.source_vm.create_on_disk())
self.target_vmname = self.make_vm_name('target')
self.target_vm = self.app.add_new_vm("AppVM",
template=self.template,
name=self.target_vmname,
label='red')
self.loop.run_until_complete(self.target_vm.create_on_disk())
self.target_vm.template_for_dispvms = True
self.source_vm.default_dispvm = self.target_vm
done, not_done = self.loop.run_until_complete(asyncio.wait([
self.source_vm.start(),
self.target_vm.start()]))
for result in itertools.chain(done, not_done):
# catch any exceptions
result.result()
def get_window_class(self, winid, dispvm=False): def get_window_class(self, winid, dispvm=False):
(vm_winid, _) = subprocess.Popen( (vm_winid, _) = subprocess.Popen(
['xprop', '-id', winid, '_QUBES_VMWINDOWID'], ['xprop', '-id', winid, '_QUBES_VMWINDOWID'],
stdout=subprocess.PIPE stdout=subprocess.PIPE
).communicate() ).communicate()
vm_winid = vm_winid.split("#")[1].strip('\n" ') vm_winid = vm_winid.decode().split("#")[1].strip('\n" ')
if dispvm: if dispvm:
(vmname, _) = subprocess.Popen( (vmname, _) = subprocess.Popen(
['xprop', '-id', winid, '_QUBES_VMNAME'], ['xprop', '-id', winid, '_QUBES_VMNAME'],
stdout=subprocess.PIPE stdout=subprocess.PIPE
).communicate() ).communicate()
vmname = vmname.split("=")[1].strip('\n" ') vmname = vmname.decode().split("=")[1].strip('\n" ')
window_class = None vm = self.app.domains[vmname]
while window_class is None:
# XXX to use self.qc.get_vm_by_name would require reloading
# qubes.xml, so use qvm-run instead
xprop = subprocess.Popen(
['qvm-run', '-p', vmname, 'xprop -id {} WM_CLASS'.format(
vm_winid)], stdout=subprocess.PIPE)
(window_class, _) = xprop.communicate()
if xprop.returncode != 0:
self.skipTest("xprop failed, not installed?")
if 'not found' in window_class:
# WM_CLASS not set yet, wait a little
time.sleep(0.1)
window_class = None
else: else:
window_class = None vm = self.target_vm
while window_class is None: window_class = None
xprop = self.target_vm.run( while window_class is None:
'xprop -id {} WM_CLASS'.format(vm_winid), try:
passio_popen=True) window_class, _ = self.loop.run_until_complete(
(window_class, _) = xprop.communicate() vm.run_for_stdio('xprop -id {} WM_CLASS'.format(vm_winid)))
if xprop.returncode != 0: except subprocess.CalledProcessError as e:
self.skipTest("xprop failed, not installed?") if e.returncode == 127:
if 'not found' in window_class: self.skipTest('xprop not installed')
# WM_CLASS not set yet, wait a little self.fail(
time.sleep(0.1) "xprop -id {} WM_CLASS failed: {}".format(
window_class = None vm_winid, e.stderr.decode()))
if b'not found' in window_class:
# WM_CLASS not set yet, wait a little
time.sleep(0.1)
window_class = None
# output: WM_CLASS(STRING) = "gnome-terminal-server", "Gnome-terminal" # output: WM_CLASS(STRING) = "gnome-terminal-server", "Gnome-terminal"
try: try:
window_class = window_class.decode()
window_class = window_class.split("=")[1].split(",")[0].strip('\n" ') window_class = window_class.split("=")[1].split(",")[0].strip('\n" ')
except IndexError: except IndexError:
raise Exception( raise Exception(
@ -136,44 +115,45 @@ class TC_50_MimeHandlers(qubes.tests.SystemTestsMixin):
def open_file_and_check_viewer(self, filename, expected_app_titles, def open_file_and_check_viewer(self, filename, expected_app_titles,
expected_app_classes, dispvm=False): expected_app_classes, dispvm=False):
self.qc.unlock_db()
if dispvm: if dispvm:
p = self.source_vm.run("qvm-open-in-dvm {}".format(filename), p = self.loop.run_until_complete(self.source_vm.run(
passio_popen=True) "qvm-open-in-dvm {}".format(filename), stdout=subprocess.PIPE))
vmpattern = "disp*" vmpattern = "disp[0-9]*"
else: else:
self.qrexec_policy('qubes.OpenInVM', self.source_vm.name, p = self.loop.run_until_complete(self.source_vm.run(
self.target_vmname) "qvm-open-in-vm {} {}".format(self.target_vmname, filename),
self.qrexec_policy('qubes.OpenURL', self.source_vm.name, stdout=subprocess.PIPE))
self.target_vmname)
p = self.source_vm.run("qvm-open-in-vm {} {}".format(
self.target_vmname, filename), passio_popen=True)
vmpattern = self.target_vmname vmpattern = self.target_vmname
wait_count = 0 wait_count = 0
winid = None winid = None
window_title = None with self.qrexec_policy('qubes.OpenInVM', self.source_vm.name,
while True: self.target_vmname):
search = subprocess.Popen(['xdotool', 'search', with self.qrexec_policy('qubes.OpenURL', self.source_vm.name,
'--onlyvisible', '--class', vmpattern], self.target_vmname):
stdout=subprocess.PIPE, while True:
stderr=open(os.path.devnull, 'w')) search = subprocess.Popen(['xdotool', 'search',
retcode = search.wait() '--onlyvisible', '--class', vmpattern],
if retcode == 0: stdout=subprocess.PIPE,
winid = search.stdout.read().strip() stderr=subprocess.DEVNULL)
# get window title retcode = search.wait()
(window_title, _) = subprocess.Popen( if retcode == 0:
['xdotool', 'getwindowname', winid], stdout=subprocess.PIPE). \ winid = search.stdout.read().strip()
communicate() # get window title
window_title = window_title.strip() (window_title, _) = subprocess.Popen(
# ignore LibreOffice splash screen and window with no title ['xdotool', 'getwindowname', winid], stdout=subprocess.PIPE). \
# set yet communicate()
if window_title and not window_title.startswith("LibreOffice")\ window_title = window_title.decode('utf8').strip()
and not window_title == 'VMapp command': # ignore LibreOffice splash screen and window with no title
break # set yet
wait_count += 1 if window_title and \
if wait_count > 100: not window_title.startswith("LibreOffice") and\
self.fail("Timeout while waiting for editor window") not window_title.startswith("NetworkManager") and\
time.sleep(0.3) not window_title == 'VMapp command':
break
wait_count += 1
if wait_count > 100:
self.fail("Timeout while waiting for editor window")
self.loop.run_until_complete(asyncio.sleep(0.3))
# get window class # get window class
window_class = self.get_window_class(winid, dispvm) window_class = self.get_window_class(winid, dispvm)
@ -194,45 +174,66 @@ class TC_50_MimeHandlers(qubes.tests.SystemTestsMixin):
expected_app_titles, expected_app_classes)) expected_app_titles, expected_app_classes))
def prepare_txt(self, filename): def prepare_txt(self, filename):
p = self.source_vm.run("cat > {}".format(filename), passio_popen=True) self.loop.run_until_complete(
p.stdin.write("This is test\n") self.source_vm.run_for_stdio("cat > {}".format(filename),
p.stdin.close() input=b'This is test\n'))
retcode = p.wait()
assert retcode == 0, "Failed to write {} file".format(filename)
def prepare_pdf(self, filename): def prepare_pdf(self, filename):
self.prepare_txt("/tmp/source.txt") self.prepare_txt("/tmp/source.txt")
cmd = "convert /tmp/source.txt {}".format(filename) cmd = "convert text:/tmp/source.txt {}".format(filename)
retcode = self.source_vm.run(cmd, wait=True) try:
assert retcode == 0, "Failed to run '{}'".format(cmd) self.loop.run_until_complete(
self.source_vm.run_for_stdio(cmd))
except subprocess.CalledProcessError as e:
self.fail('{} failed: {}'.format(cmd, e.stderr.decode()))
def prepare_doc(self, filename): def prepare_doc(self, filename):
self.prepare_txt("/tmp/source.txt") self.prepare_txt("/tmp/source.txt")
cmd = "unoconv -f doc -o {} /tmp/source.txt".format(filename) cmd = "unoconv -f doc -o {} /tmp/source.txt".format(filename)
retcode = self.source_vm.run(cmd, wait=True) try:
if retcode != 0: self.loop.run_until_complete(
self.skipTest("Failed to run '{}', not installed?".format(cmd)) self.source_vm.run_for_stdio(cmd))
except subprocess.CalledProcessError as e:
if e.returncode == 127:
self.skipTest("unoconv not installed".format(cmd))
self.skipTest("Failed to run '{}': {}".format(cmd,
e.stderr.decode()))
def prepare_pptx(self, filename): def prepare_pptx(self, filename):
self.prepare_txt("/tmp/source.txt") self.prepare_txt("/tmp/source.txt")
cmd = "unoconv -f pptx -o {} /tmp/source.txt".format(filename) cmd = "unoconv -f pptx -o {} /tmp/source.txt".format(filename)
retcode = self.source_vm.run(cmd, wait=True) try:
if retcode != 0: self.loop.run_until_complete(
self.skipTest("Failed to run '{}', not installed?".format(cmd)) self.source_vm.run_for_stdio(cmd))
except subprocess.CalledProcessError as e:
if e.returncode == 127:
self.skipTest("unoconv not installed".format(cmd))
self.skipTest("Failed to run '{}': {}".format(cmd,
e.stderr.decode()))
def prepare_png(self, filename): def prepare_png(self, filename):
self.prepare_txt("/tmp/source.txt") self.prepare_txt("/tmp/source.txt")
cmd = "convert /tmp/source.txt {}".format(filename) cmd = "convert text:/tmp/source.txt {}".format(filename)
retcode = self.source_vm.run(cmd, wait=True) try:
if retcode != 0: self.loop.run_until_complete(
self.skipTest("Failed to run '{}', not installed?".format(cmd)) self.source_vm.run_for_stdio(cmd))
except subprocess.CalledProcessError as e:
if e.returncode == 127:
self.skipTest("convert not installed".format(cmd))
self.skipTest("Failed to run '{}': {}".format(cmd,
e.stderr.decode()))
def prepare_jpg(self, filename): def prepare_jpg(self, filename):
self.prepare_txt("/tmp/source.txt") self.prepare_txt("/tmp/source.txt")
cmd = "convert /tmp/source.txt {}".format(filename) cmd = "convert text:/tmp/source.txt {}".format(filename)
retcode = self.source_vm.run(cmd, wait=True) try:
if retcode != 0: self.loop.run_until_complete(
self.skipTest("Failed to run '{}', not installed?".format(cmd)) self.source_vm.run_for_stdio(cmd))
except subprocess.CalledProcessError as e:
if e.returncode == 127:
self.skipTest("convert not installed".format(cmd))
self.skipTest("Failed to run '{}': {}".format(cmd,
e.stderr.decode()))
def test_000_txt(self): def test_000_txt(self):
filename = "/home/user/test_file.txt" filename = "/home/user/test_file.txt"
@ -335,19 +336,8 @@ class TC_50_MimeHandlers(qubes.tests.SystemTestsMixin):
dispvm=True) dispvm=True)
def load_tests(loader, tests, pattern): def load_tests(loader, tests, pattern):
try: tests.addTests(loader.loadTestsFromNames(
qc = qubes.qubes.QubesVmCollection() qubes.tests.create_testcases_for_templates('TC_50_MimeHandlers',
qc.lock_db_for_reading() TC_50_MimeHandlers, qubes.tests.SystemTestCase,
qc.load() module=sys.modules[__name__])))
qc.unlock_db() return tests
templates = [vm.name for vm in qc.values() if
isinstance(vm, qubes.qubes.QubesTemplateVm)]
except OSError:
templates = []
for template in templates:
tests.addTests(loader.loadTestsFromTestCase(
type(
'TC_50_MimeHandlers_' + template,
(TC_50_MimeHandlers, qubes.tests.QubesTestCase),
{'template': template})))
return tests

View File

@ -342,6 +342,7 @@ fi
%{python3_sitelib}/qubes/tests/integ/devices_pci.py %{python3_sitelib}/qubes/tests/integ/devices_pci.py
%{python3_sitelib}/qubes/tests/integ/dispvm.py %{python3_sitelib}/qubes/tests/integ/dispvm.py
%{python3_sitelib}/qubes/tests/integ/dom0_update.py %{python3_sitelib}/qubes/tests/integ/dom0_update.py
%{python3_sitelib}/qubes/tests/integ/mime.py
%{python3_sitelib}/qubes/tests/integ/network.py %{python3_sitelib}/qubes/tests/integ/network.py
%{python3_sitelib}/qubes/tests/integ/pvgrub.py %{python3_sitelib}/qubes/tests/integ/pvgrub.py
%{python3_sitelib}/qubes/tests/integ/salt.py %{python3_sitelib}/qubes/tests/integ/salt.py