123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356 |
- #!/usr/bin/python
- # -*- coding: utf-8 -*-
- #
- # The Qubes OS Project, http://www.qubes-os.org
- #
- # Copyright (C) 2016 Marek Marczykowski-Górecki
- # <marmarek@invisiblethingslab.com>
- #
- # This library is free software; you can redistribute it and/or
- # modify it under the terms of the GNU Lesser General Public
- # License as published by the Free Software Foundation; either
- # version 2.1 of the License, or (at your option) any later version.
- #
- # This library is distributed in the hope that it will be useful,
- # but WITHOUT ANY WARRANTY; without even the implied warranty of
- # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- # Lesser General Public License for more details.
- #
- # You should have received a copy of the GNU Lesser General Public
- # License along with this library; if not, see <https://www.gnu.org/licenses/>.
- #
- #
- import os
- from distutils import spawn
- import re
- import subprocess
- import time
- import unittest
- import itertools
- import asyncio
- import sys
- import qubes.tests
- import qubes
- @unittest.skipUnless(
- spawn.find_executable('xprop') and
- spawn.find_executable('xdotool') and
- spawn.find_executable('wmctrl'),
- "xprop or xdotool or wmctrl not installed")
- class TC_50_MimeHandlers:
- def setUp(self):
- super(TC_50_MimeHandlers, self).setUp()
- if self.template.startswith('whonix-gw') or 'minimal' in self.template:
- raise unittest.SkipTest(
- 'Template {} not supported by this test'.format(self.template))
- self.source_vmname = self.make_vm_name('source')
- self.source_vm = self.app.add_new_vm("AppVM",
- template=self.template,
- name=self.source_vmname,
- label='red')
- self.loop.run_until_complete(self.source_vm.create_on_disk())
- self.target_vmname = self.make_vm_name('target')
- self.target_vm = self.app.add_new_vm("AppVM",
- template=self.template,
- name=self.target_vmname,
- label='red')
- self.loop.run_until_complete(self.target_vm.create_on_disk())
- self.target_vm.template_for_dispvms = True
- self.source_vm.default_dispvm = self.target_vm
- done, not_done = self.loop.run_until_complete(asyncio.gather(
- self.source_vm.start(),
- self.target_vm.start()))
- for result in itertools.chain(done, not_done):
- # catch any exceptions
- result.result()
- def get_window_class(self, winid, dispvm=False):
- (vm_winid, _) = subprocess.Popen(
- ['xprop', '-id', winid, '_QUBES_VMWINDOWID'],
- stdout=subprocess.PIPE
- ).communicate()
- vm_winid = vm_winid.decode().split("#")[1].strip('\n" ')
- if dispvm:
- (vmname, _) = subprocess.Popen(
- ['xprop', '-id', winid, '_QUBES_VMNAME'],
- stdout=subprocess.PIPE
- ).communicate()
- vmname = vmname.decode().split("=")[1].strip('\n" ')
- vm = self.app.domains[vmname]
- else:
- vm = self.target_vm
- window_class = None
- while window_class is None:
- try:
- window_class, _ = self.loop.run_until_complete(
- vm.run_for_stdio('xprop -id {} WM_CLASS'.format(vm_winid)))
- except subprocess.CalledProcessError as e:
- if e.returncode == 127:
- self.skipTest('xprop not installed')
- self.fail(
- "xprop -id {} WM_CLASS failed: {}".format(
- vm_winid, e.stderr.decode()))
- if b'not found' in window_class:
- # WM_CLASS not set yet, wait a little
- time.sleep(0.1)
- window_class = None
- # output: WM_CLASS(STRING) = "gnome-terminal-server", "Gnome-terminal"
- try:
- window_class = window_class.decode()
- window_class = window_class.split("=")[1].split(",")[0].strip('\n" ')
- except IndexError:
- raise Exception(
- "Unexpected output from xprop: '{}'".format(window_class))
- return window_class
- def open_file_and_check_viewer(self, filename, expected_app_titles,
- expected_app_classes, dispvm=False):
- if dispvm:
- p = self.loop.run_until_complete(self.source_vm.run(
- "qvm-open-in-dvm {}".format(filename), stdout=subprocess.PIPE))
- vmpattern = "disp[0-9]*"
- else:
- p = self.loop.run_until_complete(self.source_vm.run(
- "qvm-open-in-vm {} {}".format(self.target_vmname, filename),
- stdout=subprocess.PIPE))
- vmpattern = self.target_vmname
- wait_count = 0
- winid = None
- with self.qrexec_policy('qubes.OpenInVM', self.source_vm.name,
- self.target_vmname):
- with self.qrexec_policy('qubes.OpenURL', self.source_vm.name,
- self.target_vmname):
- while True:
- search = subprocess.Popen(['xdotool', 'search',
- '--onlyvisible', '--class', vmpattern],
- stdout=subprocess.PIPE,
- stderr=subprocess.DEVNULL)
- retcode = search.wait()
- if retcode == 0:
- winid = search.stdout.read().strip()
- # get window title
- (window_title, _) = subprocess.Popen(
- ['xdotool', 'getwindowname', winid], stdout=subprocess.PIPE). \
- communicate()
- window_title = window_title.decode('utf8').strip()
- # ignore LibreOffice splash screen and window with no title
- # set yet
- if window_title and \
- not window_title.startswith("LibreOffice") and\
- not window_title.startswith("NetworkManager") and\
- not window_title == 'VMapp command':
- break
- wait_count += 1
- if wait_count > 100:
- self.fail("Timeout while waiting for editor window")
- self.loop.run_until_complete(asyncio.sleep(0.3))
- # get window class
- window_class = self.get_window_class(winid, dispvm)
- # close the window - we've got the window class, it is no longer needed
- subprocess.check_call(['wmctrl', '-i', '-c', winid])
- # confirm quit for example for Firefox
- self.loop.run_until_complete(asyncio.sleep(1))
- subprocess.call(['xdotool', 'search', '--onlyvisible', '--name',
- 'Quit', 'windowfocus', 'key', 'Return'])
- try:
- self.loop.run_until_complete(asyncio.wait_for(p.wait(), 30))
- except asyncio.TimeoutError:
- self.fail('qvm-open-in-vm did not exited')
- self.wait_for_window(window_title, show=False)
- def check_matches(obj, patterns):
- return any((pat.search(obj) if isinstance(pat, type(re.compile('')))
- else pat in obj) for pat in patterns)
- if not check_matches(window_title, expected_app_titles) and \
- not check_matches(window_class, expected_app_classes):
- self.fail("Opening file {} resulted in window '{} ({})', which is "
- "none of {!r} ({!r})".format(
- filename, window_title, window_class,
- expected_app_titles, expected_app_classes))
- def prepare_txt(self, filename):
- self.loop.run_until_complete(
- self.source_vm.run_for_stdio("cat > {}".format(filename),
- input=b'This is test\n'))
- def prepare_pdf(self, filename):
- self.prepare_txt("/tmp/source.txt")
- cmd = "convert text:/tmp/source.txt {}".format(filename)
- try:
- self.loop.run_until_complete(
- self.source_vm.run_for_stdio(cmd))
- except subprocess.CalledProcessError as e:
- self.fail('{} failed: {}'.format(cmd, e.stderr.decode()))
- def prepare_doc(self, filename):
- self.prepare_txt("/tmp/source.txt")
- cmd = "unoconv -f doc -o {} /tmp/source.txt".format(filename)
- try:
- self.loop.run_until_complete(
- self.source_vm.run_for_stdio(cmd))
- except subprocess.CalledProcessError as e:
- if e.returncode == 127:
- self.skipTest("unoconv not installed".format(cmd))
- self.skipTest("Failed to run '{}': {}".format(cmd,
- e.stderr.decode()))
- def prepare_pptx(self, filename):
- self.prepare_txt("/tmp/source.txt")
- cmd = "unoconv -f pptx -o {} /tmp/source.txt".format(filename)
- try:
- self.loop.run_until_complete(
- self.source_vm.run_for_stdio(cmd))
- except subprocess.CalledProcessError as e:
- if e.returncode == 127:
- self.skipTest("unoconv not installed".format(cmd))
- self.skipTest("Failed to run '{}': {}".format(cmd,
- e.stderr.decode()))
- def prepare_png(self, filename):
- self.prepare_txt("/tmp/source.txt")
- cmd = "convert text:/tmp/source.txt {}".format(filename)
- try:
- self.loop.run_until_complete(
- self.source_vm.run_for_stdio(cmd))
- except subprocess.CalledProcessError as e:
- if e.returncode == 127:
- self.skipTest("convert not installed".format(cmd))
- self.skipTest("Failed to run '{}': {}".format(cmd,
- e.stderr.decode()))
- def prepare_jpg(self, filename):
- self.prepare_txt("/tmp/source.txt")
- cmd = "convert text:/tmp/source.txt {}".format(filename)
- try:
- self.loop.run_until_complete(
- self.source_vm.run_for_stdio(cmd))
- except subprocess.CalledProcessError as e:
- if e.returncode == 127:
- self.skipTest("convert not installed".format(cmd))
- self.skipTest("Failed to run '{}': {}".format(cmd,
- e.stderr.decode()))
- def test_000_txt(self):
- filename = "/home/user/test_file.txt"
- self.prepare_txt(filename)
- self.open_file_and_check_viewer(filename, ["vim", "user@"],
- ["gedit", "emacs", "libreoffice"])
- def test_001_pdf(self):
- filename = "/home/user/test_file.pdf"
- self.prepare_pdf(filename)
- self.open_file_and_check_viewer(filename, [],
- ["evince"])
- def test_002_doc(self):
- filename = "/home/user/test_file.doc"
- self.prepare_doc(filename)
- self.open_file_and_check_viewer(filename, [],
- ["libreoffice", "abiword"])
- def test_003_pptx(self):
- filename = "/home/user/test_file.pptx"
- self.prepare_pptx(filename)
- self.open_file_and_check_viewer(filename, [],
- ["libreoffice"])
- def test_004_png(self):
- filename = "/home/user/test_file.png"
- self.prepare_png(filename)
- self.open_file_and_check_viewer(filename, [],
- ["shotwell", "eog", "display"])
- def test_005_jpg(self):
- filename = "/home/user/test_file.jpg"
- self.prepare_jpg(filename)
- self.open_file_and_check_viewer(filename, [],
- ["shotwell", "eog", "display"])
- def test_006_jpeg(self):
- filename = "/home/user/test_file.jpeg"
- self.prepare_jpg(filename)
- self.open_file_and_check_viewer(filename, [],
- ["shotwell", "eog", "display"])
- def test_010_url(self):
- self.open_file_and_check_viewer("https://www.qubes-os.org/", [],
- ["Firefox", "Iceweasel", "Navigator"])
- def test_100_txt_dispvm(self):
- filename = "/home/user/test_file.txt"
- self.prepare_txt(filename)
- self.open_file_and_check_viewer(filename, ["vim", "user@"],
- ["gedit", "emacs", "libreoffice"],
- dispvm=True)
- def test_101_pdf_dispvm(self):
- filename = "/home/user/test_file.pdf"
- self.prepare_pdf(filename)
- self.open_file_and_check_viewer(filename, [],
- ["evince"],
- dispvm=True)
- def test_102_doc_dispvm(self):
- filename = "/home/user/test_file.doc"
- self.prepare_doc(filename)
- self.open_file_and_check_viewer(filename, [],
- ["libreoffice", "abiword"],
- dispvm=True)
- def test_103_pptx_dispvm(self):
- filename = "/home/user/test_file.pptx"
- self.prepare_pptx(filename)
- self.open_file_and_check_viewer(filename, [],
- ["libreoffice"],
- dispvm=True)
- def test_104_png_dispvm(self):
- filename = "/home/user/test_file.png"
- self.prepare_png(filename)
- self.open_file_and_check_viewer(filename, [],
- ["shotwell", "eog", "display"],
- dispvm=True)
- def test_105_jpg_dispvm(self):
- filename = "/home/user/test_file.jpg"
- self.prepare_jpg(filename)
- self.open_file_and_check_viewer(filename, [],
- ["shotwell", "eog", "display"],
- dispvm=True)
- def test_106_jpeg_dispvm(self):
- filename = "/home/user/test_file.jpeg"
- self.prepare_jpg(filename)
- self.open_file_and_check_viewer(filename, [],
- ["shotwell", "eog", "display"],
- dispvm=True)
- def test_110_url_dispvm(self):
- self.open_file_and_check_viewer("https://www.qubes-os.org/", [],
- ["Firefox", "Iceweasel", "Navigator"],
- dispvm=True)
- def create_testcases_for_templates():
- return qubes.tests.create_testcases_for_templates('TC_50_MimeHandlers',
- TC_50_MimeHandlers, qubes.tests.SystemTestCase,
- module=sys.modules[__name__])
- def load_tests(loader, tests, pattern):
- tests.addTests(loader.loadTestsFromNames(
- create_testcases_for_templates()))
- return tests
- qubes.tests.maybe_create_testcases_on_import(create_testcases_for_templates)
|