Browse Source

tests: split vm_qrexec_gui

Marek Marczykowski-Górecki 7 years ago
parent
commit
fd5b357232
8 changed files with 1197 additions and 1028 deletions
  1. 14 6
      tests/Makefile
  2. 4 0
      tests/__init__.py
  3. 97 180
      tests/basic.py
  4. 431 0
      tests/dispvm.py
  5. 125 0
      tests/hvm.py
  6. 354 0
      tests/mime.py
  7. 172 0
      tests/pvgrub.py
  8. 0 842
      tests/vm_qrexec_gui.py

+ 14 - 6
tests/Makefile

@@ -19,12 +19,22 @@ endif
 	cp basic.py[co] $(DESTDIR)$(PYTHON_TESTSPATH)
 	cp block.py $(DESTDIR)$(PYTHON_TESTSPATH)
 	cp block.py[co] $(DESTDIR)$(PYTHON_TESTSPATH)
+	cp dispvm.py $(DESTDIR)$(PYTHON_TESTSPATH)
+	cp dispvm.py[co] $(DESTDIR)$(PYTHON_TESTSPATH)
 	cp dom0_update.py $(DESTDIR)$(PYTHON_TESTSPATH)
 	cp dom0_update.py[co] $(DESTDIR)$(PYTHON_TESTSPATH)
+	cp extra.py $(DESTDIR)$(PYTHON_TESTSPATH)
+	cp extra.py[co] $(DESTDIR)$(PYTHON_TESTSPATH)
+	cp hardware.py $(DESTDIR)$(PYTHON_TESTSPATH)
+	cp hardware.py[co] $(DESTDIR)$(PYTHON_TESTSPATH)
+	cp hvm.py $(DESTDIR)$(PYTHON_TESTSPATH)
+	cp hvm.py[co] $(DESTDIR)$(PYTHON_TESTSPATH)
+	cp mime.py $(DESTDIR)$(PYTHON_TESTSPATH)
+	cp mime.py[co] $(DESTDIR)$(PYTHON_TESTSPATH)
 	cp network.py $(DESTDIR)$(PYTHON_TESTSPATH)
 	cp network.py[co] $(DESTDIR)$(PYTHON_TESTSPATH)
-	cp vm_qrexec_gui.py $(DESTDIR)$(PYTHON_TESTSPATH)
-	cp vm_qrexec_gui.py[co] $(DESTDIR)$(PYTHON_TESTSPATH)
+	cp pvgrub.py $(DESTDIR)$(PYTHON_TESTSPATH)
+	cp pvgrub.py[co] $(DESTDIR)$(PYTHON_TESTSPATH)
 	cp regressions.py $(DESTDIR)$(PYTHON_TESTSPATH)
 	cp regressions.py[co] $(DESTDIR)$(PYTHON_TESTSPATH)
 	cp run.py $(DESTDIR)$(PYTHON_TESTSPATH)
@@ -33,7 +43,5 @@ endif
 	cp storage.py[co] $(DESTDIR)$(PYTHON_TESTSPATH)
 	cp storage_xen.py $(DESTDIR)$(PYTHON_TESTSPATH)
 	cp storage_xen.py[co] $(DESTDIR)$(PYTHON_TESTSPATH)
-	cp hardware.py $(DESTDIR)$(PYTHON_TESTSPATH)
-	cp hardware.py[co] $(DESTDIR)$(PYTHON_TESTSPATH)
-	cp extra.py $(DESTDIR)$(PYTHON_TESTSPATH)
-	cp extra.py[co] $(DESTDIR)$(PYTHON_TESTSPATH)
+	cp vm_qrexec_gui.py $(DESTDIR)$(PYTHON_TESTSPATH)
+	cp vm_qrexec_gui.py[co] $(DESTDIR)$(PYTHON_TESTSPATH)

+ 4 - 0
tests/__init__.py

@@ -755,7 +755,11 @@ def load_tests(loader, tests, pattern):
             'qubes.tests.basic',
             'qubes.tests.dom0_update',
             'qubes.tests.network',
+            'qubes.tests.dispvm',
             'qubes.tests.vm_qrexec_gui',
+            'qubes.tests.mime',
+            'qubes.tests.hvm',
+            'qubes.tests.pvgrub',
             'qubes.tests.backup',
             'qubes.tests.backupcompatibility',
             'qubes.tests.regressions',

+ 97 - 180
tests/basic.py

@@ -22,6 +22,7 @@
 # with this program; if not, write to the Free Software Foundation, Inc.,
 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
 #
+from distutils import spawn
 
 import multiprocessing
 import os
@@ -699,205 +700,121 @@ class TC_03_QvmRevertTemplateChanges(qubes.tests.SystemTestsMixin,
         self.setup_hvm_template()
         self._do_test()
 
-class TC_04_DispVM(qubes.tests.SystemTestsMixin,
-                   qubes.tests.QubesTestCase):
-
-    @staticmethod
-    def get_dispvm_template_name():
-        vmdir = os.readlink('/var/lib/qubes/dvmdata/vmdir')
-        return os.path.basename(vmdir)
-
-    def test_000_firewall_propagation(self):
-        """
-        Check firewall propagation VM->DispVM, when VM have some firewall rules
-        """
-
-        # FIXME: currently qubes.xml doesn't contain this information...
-        dispvm_template_name = self.get_dispvm_template_name()
-        dispvm_template = self.qc.get_vm_by_name(dispvm_template_name)
 
+class TC_30_Gui_daemon(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
+    @unittest.skipUnless(spawn.find_executable('xdotool'),
+                         "xdotool not installed")
+    def test_000_clipboard(self):
         testvm1 = self.qc.add_new_vm("QubesAppVm",
                                      name=self.make_vm_name('vm1'),
                                      template=self.qc.get_default_template())
         testvm1.create_on_disk(verbose=False)
-        firewall = testvm1.get_firewall_conf()
-        firewall['allowDns'] = False
-        firewall['allowYumProxy'] = False
-        firewall['rules'] = [{'address': '1.2.3.4',
-                              'netmask': 24,
-                              'proto': 'tcp',
-                              'portBegin': 22,
-                              'portEnd': 22,
-                              }]
-        testvm1.write_firewall_conf(firewall)
+        testvm2 = self.qc.add_new_vm("QubesAppVm",
+                                     name=self.make_vm_name('vm2'),
+                                     template=self.qc.get_default_template())
+        testvm2.create_on_disk(verbose=False)
         self.qc.save()
         self.qc.unlock_db()
 
         testvm1.start()
+        testvm2.start()
+
+        window_title = 'user@{}'.format(testvm1.name)
+        testvm1.run('zenity --text-info --editable --title={}'.format(
+            window_title))
+
+        self.wait_for_window(window_title)
+        time.sleep(0.5)
+        test_string = "test{}".format(testvm1.xid)
+
+        # Type and copy some text
+        subprocess.check_call(['xdotool', 'search', '--name', window_title,
+                               'windowactivate', '--sync',
+                               'type', '{}'.format(test_string)])
+        # second xdotool call because type --terminator do not work (SEGV)
+        # additionally do not use search here, so window stack will be empty
+        # and xdotool will use XTEST instead of generating events manually -
+        # this will be much better - at least because events will have
+        # correct timestamp (so gui-daemon would not drop the copy request)
+        subprocess.check_call(['xdotool',
+                               'key', 'ctrl+a', 'ctrl+c', 'ctrl+shift+c',
+                               'Escape'])
+
+        clipboard_content = \
+            open('/var/run/qubes/qubes-clipboard.bin', 'r').read().strip()
+        self.assertEquals(clipboard_content, test_string,
+                          "Clipboard copy operation failed - content")
+        clipboard_source = \
+            open('/var/run/qubes/qubes-clipboard.bin.source',
+                 'r').read().strip()
+        self.assertEquals(clipboard_source, testvm1.name,
+                          "Clipboard copy operation failed - owner")
+
+        # Then paste it to the other window
+        window_title = 'user@{}'.format(testvm2.name)
+        p = testvm2.run('zenity --entry --title={} > test.txt'.format(
+                        window_title), passio_popen=True)
+        self.wait_for_window(window_title)
+
+        subprocess.check_call(['xdotool', 'key', '--delay', '100',
+                               'ctrl+shift+v', 'ctrl+v', 'Return'])
+        p.wait()
 
-        p = testvm1.run("qvm-run --dispvm 'qubesdb-read /name; echo ERROR;"
-                        " read x'",
-                        passio_popen=True)
+        # And compare the result
+        (test_output, _) = testvm2.run('cat test.txt',
+                                       passio_popen=True).communicate()
+        self.assertEquals(test_string, test_output.strip())
 
-        dispvm_name = p.stdout.readline().strip()
-        self.qc.lock_db_for_reading()
-        self.qc.load()
-        self.qc.unlock_db()
-        dispvm = self.qc.get_vm_by_name(dispvm_name)
-        self.assertIsNotNone(dispvm, "DispVM {} not found in qubes.xml".format(
-            dispvm_name))
-        # check if firewall was propagated to the DispVM
-        self.assertEquals(testvm1.get_firewall_conf(),
-                          dispvm.get_firewall_conf())
-        # and only there (#1608)
-        self.assertNotEquals(dispvm_template.get_firewall_conf(),
-                             dispvm.get_firewall_conf())
-        # then modify some rule
-        firewall = dispvm.get_firewall_conf()
-        firewall['rules'] = [{'address': '4.3.2.1',
-                              'netmask': 24,
-                              'proto': 'tcp',
-                              'portBegin': 22,
-                              'portEnd': 22,
-                              }]
-        dispvm.write_firewall_conf(firewall)
-        # and check again if wasn't saved anywhere else (#1608)
-        self.assertNotEquals(dispvm_template.get_firewall_conf(),
-                             dispvm.get_firewall_conf())
-        self.assertNotEquals(testvm1.get_firewall_conf(),
-                             dispvm.get_firewall_conf())
-        p.stdin.write('\n')
-        p.wait()
+        clipboard_content = \
+            open('/var/run/qubes/qubes-clipboard.bin', 'r').read().strip()
+        self.assertEquals(clipboard_content, "",
+                          "Clipboard not wiped after paste - content")
+        clipboard_source = \
+            open('/var/run/qubes/qubes-clipboard.bin.source', 'r').read(
 
-    def test_001_firewall_propagation(self):
-        """
-        Check firewall propagation VM->DispVM, when VM have no firewall rules
-        """
+            ).strip()
+        self.assertEquals(clipboard_source, "",
+                          "Clipboard not wiped after paste - owner")
+
+class TC_05_StandaloneVM(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
+    def test_000_create_start(self):
         testvm1 = self.qc.add_new_vm("QubesAppVm",
-                                     name=self.make_vm_name('vm1'),
-                                     template=self.qc.get_default_template())
-        testvm1.create_on_disk(verbose=False)
+                                     template=None,
+                                     name=self.make_vm_name('vm1'))
+        testvm1.create_on_disk(verbose=False,
+                               source_template=self.qc.get_default_template())
         self.qc.save()
         self.qc.unlock_db()
+        testvm1.start()
+        self.assertEquals(testvm1.get_power_state(), "Running")
 
-        # FIXME: currently qubes.xml doesn't contain this information...
-        dispvm_template_name = self.get_dispvm_template_name()
-        dispvm_template = self.qc.get_vm_by_name(dispvm_template_name)
-        original_firewall = None
-        if os.path.exists(dispvm_template.firewall_conf):
-            original_firewall = tempfile.TemporaryFile()
-            with open(dispvm_template.firewall_conf) as f:
-                original_firewall.write(f.read())
-        try:
-
-            firewall = dispvm_template.get_firewall_conf()
-            firewall['allowDns'] = False
-            firewall['allowYumProxy'] = False
-            firewall['rules'] = [{'address': '1.2.3.4',
-                                  'netmask': 24,
-                                  'proto': 'tcp',
-                                  'portBegin': 22,
-                                  'portEnd': 22,
-                                  }]
-            dispvm_template.write_firewall_conf(firewall)
-
-            testvm1.start()
-
-            p = testvm1.run("qvm-run --dispvm 'qubesdb-read /name; echo ERROR;"
-                            " read x'",
-                            passio_popen=True)
-
-            dispvm_name = p.stdout.readline().strip()
-            self.qc.lock_db_for_reading()
-            self.qc.load()
-            self.qc.unlock_db()
-            dispvm = self.qc.get_vm_by_name(dispvm_name)
-            self.assertIsNotNone(dispvm, "DispVM {} not found in qubes.xml".format(
-                dispvm_name))
-            # check if firewall was propagated to the DispVM from the right VM
-            self.assertEquals(testvm1.get_firewall_conf(),
-                              dispvm.get_firewall_conf())
-            # and only there (#1608)
-            self.assertNotEquals(dispvm_template.get_firewall_conf(),
-                                 dispvm.get_firewall_conf())
-            # then modify some rule
-            firewall = dispvm.get_firewall_conf()
-            firewall['rules'] = [{'address': '4.3.2.1',
-                                  'netmask': 24,
-                                  'proto': 'tcp',
-                                  'portBegin': 22,
-                                  'portEnd': 22,
-                                  }]
-            dispvm.write_firewall_conf(firewall)
-            # and check again if wasn't saved anywhere else (#1608)
-            self.assertNotEquals(dispvm_template.get_firewall_conf(),
-                                 dispvm.get_firewall_conf())
-            self.assertNotEquals(testvm1.get_firewall_conf(),
-                                 dispvm.get_firewall_conf())
-            p.stdin.write('\n')
-            p.wait()
-        finally:
-            if original_firewall:
-                original_firewall.seek(0)
-                with open(dispvm_template.firewall_conf, 'w') as f:
-                    f.write(original_firewall.read())
-                original_firewall.close()
-            else:
-                os.unlink(dispvm_template.firewall_conf)
-
-    def test_002_cleanup(self):
-        self.qc.unlock_db()
-        p = subprocess.Popen(['/usr/lib/qubes/qfile-daemon-dvm',
-                              'qubes.VMShell', 'dom0', 'DEFAULT'],
-                             stdin=subprocess.PIPE,
-                             stdout=subprocess.PIPE,
-                             stderr=open(os.devnull, 'w'))
-        (stdout, _) = p.communicate(input="echo test; qubesdb-read /name; "
-                                          "echo ERROR\n")
-        self.assertEquals(p.returncode, 0)
-        lines = stdout.splitlines()
-        self.assertEqual(lines[0], "test")
-        dispvm_name = lines[1]
-        self.qc.lock_db_for_reading()
-        self.qc.load()
-        self.qc.unlock_db()
-        dispvm = self.qc.get_vm_by_name(dispvm_name)
-        self.assertIsNone(dispvm, "DispVM {} still exists in qubes.xml".format(
-            dispvm_name))
-
-    def test_003_cleanup_destroyed(self):
-        """
-        Check if DispVM is properly removed even if it terminated itself (#1660)
-        :return:
-        """
+    def test_100_resize_root_img(self):
+        testvm1 = self.qc.add_new_vm("QubesAppVm",
+                                     template=None,
+                                     name=self.make_vm_name('vm1'))
+        testvm1.create_on_disk(verbose=False,
+                               source_template=self.qc.get_default_template())
+        self.qc.save()
         self.qc.unlock_db()
-        p = subprocess.Popen(['/usr/lib/qubes/qfile-daemon-dvm',
-                              'qubes.VMShell', 'dom0', 'DEFAULT'],
-                             stdin=subprocess.PIPE,
-                             stdout=subprocess.PIPE,
-                             stderr=open(os.devnull, 'w'))
-        p.stdin.write("qubesdb-read /name\n")
-        p.stdin.write("echo ERROR\n")
-        p.stdin.write("sudo poweroff\n")
-        # do not close p.stdin on purpose - wait to automatic disconnect when
-        #  domain is destroyed
-        timeout = 30
-        while timeout > 0:
-            if p.poll():
-                break
+        with self.assertRaises(QubesException):
+            testvm1.resize_root_img(20*1024**3)
+        testvm1.resize_root_img(20*1024**3, allow_start=True)
+        timeout = 60
+        while testvm1.is_running():
             time.sleep(1)
             timeout -= 1
-        # includes check for None - timeout
-        self.assertEquals(p.returncode, 0)
-        lines = p.stdout.read().splitlines()
-        dispvm_name = lines[0]
-        self.assertNotEquals(dispvm_name, "ERROR")
-        self.qc.lock_db_for_reading()
-        self.qc.load()
-        self.qc.unlock_db()
-        dispvm = self.qc.get_vm_by_name(dispvm_name)
-        self.assertIsNone(dispvm, "DispVM {} still exists in qubes.xml".format(
-            dispvm_name))
+            if timeout == 0:
+                self.fail("Timeout while waiting for VM shutdown")
+        self.assertEquals(testvm1.get_root_img_sz(), 20*1024**3)
+        testvm1.start()
+        p = testvm1.run('df --output=size /|tail -n 1',
+                        passio_popen=True)
+        # new_size in 1k-blocks
+        (new_size, _) = p.communicate()
+        # some safety margin for FS metadata
+        self.assertGreater(int(new_size.strip()), 19*1024**2)
+
+
 
 
 

+ 431 - 0
tests/dispvm.py

@@ -0,0 +1,431 @@
+#!/usr/bin/python
+# -*- coding: utf-8 -*-
+#
+# The Qubes OS Project, http://www.qubes-os.org
+#
+# Copyright (C) 2016 Marek Marczykowski-Górecki
+#                                        <marmarek@invisiblethingslab.com>
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+#
+#
+
+from distutils import spawn
+import qubes.tests
+import subprocess
+import tempfile
+import unittest
+import os
+import time
+
+class TC_04_DispVM(qubes.tests.SystemTestsMixin,
+                   qubes.tests.QubesTestCase):
+
+    @staticmethod
+    def get_dispvm_template_name():
+        vmdir = os.readlink('/var/lib/qubes/dvmdata/vmdir')
+        return os.path.basename(vmdir)
+
+    def test_000_firewall_propagation(self):
+        """
+        Check firewall propagation VM->DispVM, when VM have some firewall rules
+        """
+
+        # FIXME: currently qubes.xml doesn't contain this information...
+        dispvm_template_name = self.get_dispvm_template_name()
+        dispvm_template = self.qc.get_vm_by_name(dispvm_template_name)
+
+        testvm1 = self.qc.add_new_vm("QubesAppVm",
+                                     name=self.make_vm_name('vm1'),
+                                     template=self.qc.get_default_template())
+        testvm1.create_on_disk(verbose=False)
+        firewall = testvm1.get_firewall_conf()
+        firewall['allowDns'] = False
+        firewall['allowYumProxy'] = False
+        firewall['rules'] = [{'address': '1.2.3.4',
+                              'netmask': 24,
+                              'proto': 'tcp',
+                              'portBegin': 22,
+                              'portEnd': 22,
+                              }]
+        testvm1.write_firewall_conf(firewall)
+        self.qc.save()
+        self.qc.unlock_db()
+
+        testvm1.start()
+
+        p = testvm1.run("qvm-run --dispvm 'qubesdb-read /name; echo ERROR;"
+                        " read x'",
+                        passio_popen=True)
+
+        dispvm_name = p.stdout.readline().strip()
+        self.qc.lock_db_for_reading()
+        self.qc.load()
+        self.qc.unlock_db()
+        dispvm = self.qc.get_vm_by_name(dispvm_name)
+        self.assertIsNotNone(dispvm, "DispVM {} not found in qubes.xml".format(
+            dispvm_name))
+        # check if firewall was propagated to the DispVM
+        self.assertEquals(testvm1.get_firewall_conf(),
+                          dispvm.get_firewall_conf())
+        # and only there (#1608)
+        self.assertNotEquals(dispvm_template.get_firewall_conf(),
+                             dispvm.get_firewall_conf())
+        # then modify some rule
+        firewall = dispvm.get_firewall_conf()
+        firewall['rules'] = [{'address': '4.3.2.1',
+                              'netmask': 24,
+                              'proto': 'tcp',
+                              'portBegin': 22,
+                              'portEnd': 22,
+                              }]
+        dispvm.write_firewall_conf(firewall)
+        # and check again if wasn't saved anywhere else (#1608)
+        self.assertNotEquals(dispvm_template.get_firewall_conf(),
+                             dispvm.get_firewall_conf())
+        self.assertNotEquals(testvm1.get_firewall_conf(),
+                             dispvm.get_firewall_conf())
+        p.stdin.write('\n')
+        p.wait()
+
+    def test_001_firewall_propagation(self):
+        """
+        Check firewall propagation VM->DispVM, when VM have no firewall rules
+        """
+        testvm1 = self.qc.add_new_vm("QubesAppVm",
+                                     name=self.make_vm_name('vm1'),
+                                     template=self.qc.get_default_template())
+        testvm1.create_on_disk(verbose=False)
+        self.qc.save()
+        self.qc.unlock_db()
+
+        # FIXME: currently qubes.xml doesn't contain this information...
+        dispvm_template_name = self.get_dispvm_template_name()
+        dispvm_template = self.qc.get_vm_by_name(dispvm_template_name)
+        original_firewall = None
+        if os.path.exists(dispvm_template.firewall_conf):
+            original_firewall = tempfile.TemporaryFile()
+            with open(dispvm_template.firewall_conf) as f:
+                original_firewall.write(f.read())
+        try:
+
+            firewall = dispvm_template.get_firewall_conf()
+            firewall['allowDns'] = False
+            firewall['allowYumProxy'] = False
+            firewall['rules'] = [{'address': '1.2.3.4',
+                                  'netmask': 24,
+                                  'proto': 'tcp',
+                                  'portBegin': 22,
+                                  'portEnd': 22,
+                                  }]
+            dispvm_template.write_firewall_conf(firewall)
+
+            testvm1.start()
+
+            p = testvm1.run("qvm-run --dispvm 'qubesdb-read /name; echo ERROR;"
+                            " read x'",
+                            passio_popen=True)
+
+            dispvm_name = p.stdout.readline().strip()
+            self.qc.lock_db_for_reading()
+            self.qc.load()
+            self.qc.unlock_db()
+            dispvm = self.qc.get_vm_by_name(dispvm_name)
+            self.assertIsNotNone(dispvm, "DispVM {} not found in qubes.xml".format(
+                dispvm_name))
+            # check if firewall was propagated to the DispVM from the right VM
+            self.assertEquals(testvm1.get_firewall_conf(),
+                              dispvm.get_firewall_conf())
+            # and only there (#1608)
+            self.assertNotEquals(dispvm_template.get_firewall_conf(),
+                                 dispvm.get_firewall_conf())
+            # then modify some rule
+            firewall = dispvm.get_firewall_conf()
+            firewall['rules'] = [{'address': '4.3.2.1',
+                                  'netmask': 24,
+                                  'proto': 'tcp',
+                                  'portBegin': 22,
+                                  'portEnd': 22,
+                                  }]
+            dispvm.write_firewall_conf(firewall)
+            # and check again if wasn't saved anywhere else (#1608)
+            self.assertNotEquals(dispvm_template.get_firewall_conf(),
+                                 dispvm.get_firewall_conf())
+            self.assertNotEquals(testvm1.get_firewall_conf(),
+                                 dispvm.get_firewall_conf())
+            p.stdin.write('\n')
+            p.wait()
+        finally:
+            if original_firewall:
+                original_firewall.seek(0)
+                with open(dispvm_template.firewall_conf, 'w') as f:
+                    f.write(original_firewall.read())
+                original_firewall.close()
+            else:
+                os.unlink(dispvm_template.firewall_conf)
+
+    def test_002_cleanup(self):
+        self.qc.unlock_db()
+        p = subprocess.Popen(['/usr/lib/qubes/qfile-daemon-dvm',
+                              'qubes.VMShell', 'dom0', 'DEFAULT'],
+                             stdin=subprocess.PIPE,
+                             stdout=subprocess.PIPE,
+                             stderr=open(os.devnull, 'w'))
+        (stdout, _) = p.communicate(input="echo test; qubesdb-read /name; "
+                                          "echo ERROR\n")
+        self.assertEquals(p.returncode, 0)
+        lines = stdout.splitlines()
+        self.assertEqual(lines[0], "test")
+        dispvm_name = lines[1]
+        self.qc.lock_db_for_reading()
+        self.qc.load()
+        self.qc.unlock_db()
+        dispvm = self.qc.get_vm_by_name(dispvm_name)
+        self.assertIsNone(dispvm, "DispVM {} still exists in qubes.xml".format(
+            dispvm_name))
+
+    def test_003_cleanup_destroyed(self):
+        """
+        Check if DispVM is properly removed even if it terminated itself (#1660)
+        :return:
+        """
+        self.qc.unlock_db()
+        p = subprocess.Popen(['/usr/lib/qubes/qfile-daemon-dvm',
+                              'qubes.VMShell', 'dom0', 'DEFAULT'],
+                             stdin=subprocess.PIPE,
+                             stdout=subprocess.PIPE,
+                             stderr=open(os.devnull, 'w'))
+        p.stdin.write("qubesdb-read /name\n")
+        p.stdin.write("echo ERROR\n")
+        p.stdin.write("sudo poweroff\n")
+        # do not close p.stdin on purpose - wait to automatic disconnect when
+        #  domain is destroyed
+        timeout = 30
+        while timeout > 0:
+            if p.poll():
+                break
+            time.sleep(1)
+            timeout -= 1
+        # includes check for None - timeout
+        self.assertEquals(p.returncode, 0)
+        lines = p.stdout.read().splitlines()
+        dispvm_name = lines[0]
+        self.assertNotEquals(dispvm_name, "ERROR")
+        self.qc.lock_db_for_reading()
+        self.qc.load()
+        self.qc.unlock_db()
+        dispvm = self.qc.get_vm_by_name(dispvm_name)
+        self.assertIsNone(dispvm, "DispVM {} still exists in qubes.xml".format(
+            dispvm_name))
+
+
+class TC_20_DispVMMixin(qubes.tests.SystemTestsMixin):
+    def test_000_prepare_dvm(self):
+        self.qc.unlock_db()
+        retcode = subprocess.call(['/usr/bin/qvm-create-default-dvm',
+                                   self.template],
+                                  stderr=open(os.devnull, 'w'))
+        self.assertEqual(retcode, 0)
+        self.qc.lock_db_for_writing()
+        self.qc.load()
+        self.assertIsNotNone(self.qc.get_vm_by_name(
+            self.template + "-dvm"))
+        # TODO: check mtime of snapshot file
+
+    def test_010_simple_dvm_run(self):
+        self.qc.unlock_db()
+        p = subprocess.Popen(['/usr/lib/qubes/qfile-daemon-dvm',
+                              'qubes.VMShell', 'dom0', 'DEFAULT'],
+                             stdin=subprocess.PIPE,
+                             stdout=subprocess.PIPE,
+                             stderr=open(os.devnull, 'w'))
+        (stdout, _) = p.communicate(input="echo test")
+        self.assertEqual(stdout, "test\n")
+        # TODO: check if DispVM is destroyed
+
+    @unittest.skipUnless(spawn.find_executable('xdotool'),
+                         "xdotool not installed")
+    def test_020_gui_app(self):
+        self.qc.unlock_db()
+        p = subprocess.Popen(['/usr/lib/qubes/qfile-daemon-dvm',
+                              'qubes.VMShell', 'dom0', 'DEFAULT'],
+                             stdin=subprocess.PIPE,
+                             stdout=subprocess.PIPE,
+                             stderr=open(os.devnull, 'w'))
+
+        # wait for DispVM startup:
+        p.stdin.write("echo test\n")
+        p.stdin.flush()
+        l = p.stdout.readline()
+        self.assertEqual(l, "test\n")
+
+        # potential race condition, but our tests are supposed to be
+        # running on dedicated machine, so should not be a problem
+        self.qc.lock_db_for_reading()
+        self.qc.load()
+        self.qc.unlock_db()
+
+        max_qid = 0
+        for vm in self.qc.values():
+            if not vm.is_disposablevm():
+                continue
+            if vm.qid > max_qid:
+                max_qid = vm.qid
+        dispvm = self.qc[max_qid]
+        self.assertNotEqual(dispvm.qid, 0, "DispVM not found in qubes.xml")
+        self.assertTrue(dispvm.is_running())
+        try:
+            window_title = 'user@%s' % (dispvm.template.name + "-dvm")
+            p.stdin.write("xterm -e "
+                "\"sh -c 'echo \\\"\033]0;{}\007\\\";read x;'\"\n".
+                format(window_title))
+            self.wait_for_window(window_title)
+
+            time.sleep(0.5)
+            self.enter_keys_in_window(window_title, ['Return'])
+            # Wait for window to close
+            self.wait_for_window(window_title, show=False)
+        finally:
+            p.stdin.close()
+
+        wait_count = 0
+        while dispvm.is_running():
+            wait_count += 1
+            if wait_count > 100:
+                self.fail("Timeout while waiting for DispVM destruction")
+            time.sleep(0.1)
+        wait_count = 0
+        while p.poll() is None:
+            wait_count += 1
+            if wait_count > 100:
+                self.fail("Timeout while waiting for qfile-daemon-dvm "
+                          "termination")
+            time.sleep(0.1)
+        self.assertEqual(p.returncode, 0)
+
+        self.qc.lock_db_for_reading()
+        self.qc.load()
+        self.qc.unlock_db()
+        self.assertIsNone(self.qc.get_vm_by_name(dispvm.name),
+                          "DispVM not removed from qubes.xml")
+
+    def _handle_editor(self, winid):
+        (window_title, _) = subprocess.Popen(
+            ['xdotool', 'getwindowname', winid], stdout=subprocess.PIPE).\
+            communicate()
+        window_title = window_title.strip().\
+            replace('(', '\(').replace(')', '\)')
+        time.sleep(1)
+        if "gedit" in window_title:
+            subprocess.check_call(['xdotool', 'windowactivate', '--sync', winid,
+                'type', 'Test test 2\n'])
+            time.sleep(0.5)
+            subprocess.check_call(['xdotool',
+                                   'key', 'ctrl+s', 'ctrl+q'])
+        elif "LibreOffice" in window_title:
+            # wait for actual editor (we've got splash screen)
+            search = subprocess.Popen(['xdotool', 'search', '--sync',
+                '--onlyvisible', '--all', '--name', '--class', 'disp*|Writer'],
+                stdout=subprocess.PIPE,
+                                  stderr=open(os.path.devnull, 'w'))
+            retcode = search.wait()
+            if retcode == 0:
+                winid = search.stdout.read().strip()
+            time.sleep(0.5)
+            subprocess.check_call(['xdotool', 'windowactivate', '--sync', winid,
+                'type', 'Test test 2\n'])
+            time.sleep(0.5)
+            subprocess.check_call(['xdotool',
+                                   'key', '--delay', '100', 'ctrl+s',
+                'Return', 'ctrl+q'])
+        elif "emacs" in window_title:
+            subprocess.check_call(['xdotool', 'windowactivate', '--sync', winid,
+                                   'type', 'Test test 2\n'])
+            time.sleep(0.5)
+            subprocess.check_call(['xdotool',
+                                   'key', 'ctrl+x', 'ctrl+s'])
+            subprocess.check_call(['xdotool',
+                                   'key', 'ctrl+x', 'ctrl+c'])
+        elif "vim" in window_title or "user@" in window_title:
+            subprocess.check_call(['xdotool', 'windowactivate', '--sync', winid,
+                                   'key', 'i', 'type', 'Test test 2\n'])
+            subprocess.check_call(
+                ['xdotool',
+                 'key', 'Escape', 'colon', 'w', 'q', 'Return'])
+        else:
+            self.fail("Unknown editor window: {}".format(window_title))
+
+    @unittest.skipUnless(spawn.find_executable('xdotool'),
+                         "xdotool not installed")
+    def test_030_edit_file(self):
+        testvm1 = self.qc.add_new_vm("QubesAppVm",
+                                     name=self.make_vm_name('vm1'),
+                                     template=self.qc.get_vm_by_name(
+                                         self.template))
+        testvm1.create_on_disk(verbose=False)
+        self.qc.save()
+
+        testvm1.start()
+        testvm1.run("echo test1 > /home/user/test.txt", wait=True)
+
+        self.qc.unlock_db()
+        p = testvm1.run("qvm-open-in-dvm /home/user/test.txt",
+                        passio_popen=True)
+
+        wait_count = 0
+        winid = None
+        while True:
+            search = subprocess.Popen(['xdotool', 'search',
+                                       '--onlyvisible', '--class', 'disp*'],
+                                      stdout=subprocess.PIPE,
+                                      stderr=open(os.path.devnull, 'w'))
+            retcode = search.wait()
+            if retcode == 0:
+                winid = search.stdout.read().strip()
+                break
+            wait_count += 1
+            if wait_count > 100:
+                self.fail("Timeout while waiting for editor window")
+            time.sleep(0.3)
+
+        time.sleep(0.5)
+        self._handle_editor(winid)
+        p.wait()
+        p = testvm1.run("cat /home/user/test.txt",
+                        passio_popen=True)
+        (test_txt_content, _) = p.communicate()
+        # Drop BOM if added by editor
+        if test_txt_content.startswith('\xef\xbb\xbf'):
+            test_txt_content = test_txt_content[3:]
+        self.assertEqual(test_txt_content, "Test test 2\ntest1\n")
+
+def load_tests(loader, tests, pattern):
+    try:
+        qc = qubes.qubes.QubesVmCollection()
+        qc.lock_db_for_reading()
+        qc.load()
+        qc.unlock_db()
+        templates = [vm.name for vm in qc.values() if
+                     isinstance(vm, qubes.qubes.QubesTemplateVm)]
+    except OSError:
+        templates = []
+    for template in templates:
+        tests.addTests(loader.loadTestsFromTestCase(
+            type(
+                'TC_20_DispVM_' + template,
+                (TC_20_DispVMMixin, qubes.tests.QubesTestCase),
+                {'template': template})))
+
+    return tests

+ 125 - 0
tests/hvm.py

@@ -0,0 +1,125 @@
+#!/usr/bin/python
+# -*- coding: utf-8 -*-
+#
+# The Qubes OS Project, http://www.qubes-os.org
+#
+# Copyright (C) 2016 Marek Marczykowski-Górecki
+#                                        <marmarek@invisiblethingslab.com>
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+#
+#
+
+import qubes.tests
+from qubes.qubes import QubesException
+
+class TC_10_HVM(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
+    # TODO: test with some OS inside
+    # TODO: windows tools tests
+
+    def test_000_create_start(self):
+        testvm1 = self.qc.add_new_vm("QubesHVm",
+                                     name=self.make_vm_name('vm1'))
+        testvm1.create_on_disk(verbose=False)
+        self.qc.save()
+        self.qc.unlock_db()
+        testvm1.start()
+        self.assertEquals(testvm1.get_power_state(), "Running")
+
+    def test_010_create_start_template(self):
+        templatevm = self.qc.add_new_vm("QubesTemplateHVm",
+                                        name=self.make_vm_name('template'))
+        templatevm.create_on_disk(verbose=False)
+        self.qc.save()
+        self.qc.unlock_db()
+
+        templatevm.start()
+        self.assertEquals(templatevm.get_power_state(), "Running")
+
+    def test_020_create_start_template_vm(self):
+        templatevm = self.qc.add_new_vm("QubesTemplateHVm",
+                                        name=self.make_vm_name('template'))
+        templatevm.create_on_disk(verbose=False)
+        testvm2 = self.qc.add_new_vm("QubesHVm",
+                                     name=self.make_vm_name('vm2'),
+                                     template=templatevm)
+        testvm2.create_on_disk(verbose=False)
+        self.qc.save()
+        self.qc.unlock_db()
+
+        testvm2.start()
+        self.assertEquals(testvm2.get_power_state(), "Running")
+
+    def test_030_prevent_simultaneus_start(self):
+        templatevm = self.qc.add_new_vm("QubesTemplateHVm",
+                                        name=self.make_vm_name('template'))
+        templatevm.create_on_disk(verbose=False)
+        testvm2 = self.qc.add_new_vm("QubesHVm",
+                                     name=self.make_vm_name('vm2'),
+                                     template=templatevm)
+        testvm2.create_on_disk(verbose=False)
+        self.qc.save()
+        self.qc.unlock_db()
+
+        templatevm.start()
+        self.assertEquals(templatevm.get_power_state(), "Running")
+        self.assertRaises(QubesException, testvm2.start)
+        templatevm.force_shutdown()
+        testvm2.start()
+        self.assertEquals(testvm2.get_power_state(), "Running")
+        self.assertRaises(QubesException, templatevm.start)
+
+    def test_100_resize_root_img(self):
+        testvm1 = self.qc.add_new_vm("QubesHVm",
+                                     name=self.make_vm_name('vm1'))
+        testvm1.create_on_disk(verbose=False)
+        self.qc.save()
+        self.qc.unlock_db()
+        testvm1.resize_root_img(30*1024**3)
+        self.assertEquals(testvm1.get_root_img_sz(), 30*1024**3)
+        testvm1.start()
+        self.assertEquals(testvm1.get_power_state(), "Running")
+        # TODO: launch some OS there and check the size
+
+    def test_200_start_invalid_drive(self):
+        """Regression test for #1619"""
+        testvm1 = self.qc.add_new_vm("QubesHVm",
+                                     name=self.make_vm_name('vm1'))
+        testvm1.create_on_disk(verbose=False)
+        testvm1.drive = 'hd:dom0:/invalid'
+        self.qc.save()
+        self.qc.unlock_db()
+        try:
+            testvm1.start()
+        except Exception as e:
+            self.assertIsInstance(e, QubesException)
+        else:
+            self.fail('No exception raised')
+
+    def test_201_start_invalid_drive_cdrom(self):
+        """Regression test for #1619"""
+        testvm1 = self.qc.add_new_vm("QubesHVm",
+                                     name=self.make_vm_name('vm1'))
+        testvm1.create_on_disk(verbose=False)
+        testvm1.drive = 'cdrom:dom0:/invalid'
+        self.qc.save()
+        self.qc.unlock_db()
+        try:
+            testvm1.start()
+        except Exception as e:
+            self.assertIsInstance(e, QubesException)
+        else:
+            self.fail('No exception raised')
+

+ 354 - 0
tests/mime.py

@@ -0,0 +1,354 @@
+#!/usr/bin/python
+# -*- coding: utf-8 -*-
+#
+# The Qubes OS Project, http://www.qubes-os.org
+#
+# Copyright (C) 2016 Marek Marczykowski-Górecki
+#                                        <marmarek@invisiblethingslab.com>
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+#
+#
+from distutils import spawn
+import os
+import re
+import subprocess
+import time
+import unittest
+
+import qubes.tests
+import qubes.qubes
+from qubes.qubes import QubesVmCollection
+
+@unittest.skipUnless(
+    spawn.find_executable('xprop') and
+    spawn.find_executable('xdotool') and
+    spawn.find_executable('wmctrl'),
+    "xprop or xdotool or wmctrl not installed")
+class TC_50_MimeHandlers(qubes.tests.SystemTestsMixin):
+    @classmethod
+    def setUpClass(cls):
+        if cls.template == 'whonix-gw' or 'minimal' in cls.template:
+            raise unittest.SkipTest(
+                'Template {} not supported by this test'.format(cls.template))
+
+        if cls.template == 'whonix-ws':
+            # TODO remove when Whonix-based DispVMs will work (Whonix 13?)
+            raise unittest.SkipTest(
+                'Template {} not supported by this test'.format(cls.template))
+
+        qc = QubesVmCollection()
+
+        cls._kill_test_vms(qc, prefix=qubes.tests.CLSVMPREFIX)
+
+        qc.lock_db_for_writing()
+        qc.load()
+
+        cls._remove_test_vms(qc, qubes.qubes.vmm.libvirt_conn,
+                            prefix=qubes.tests.CLSVMPREFIX)
+
+        cls.source_vmname = cls.make_vm_name('source', True)
+        source_vm = qc.add_new_vm("QubesAppVm",
+                                  template=qc.get_vm_by_name(cls.template),
+                                  name=cls.source_vmname)
+        source_vm.create_on_disk(verbose=False)
+
+        cls.target_vmname = cls.make_vm_name('target', True)
+        target_vm = qc.add_new_vm("QubesAppVm",
+                                  template=qc.get_vm_by_name(cls.template),
+                                  name=cls.target_vmname)
+        target_vm.create_on_disk(verbose=False)
+
+        qc.save()
+        qc.unlock_db()
+        source_vm.start()
+        target_vm.start()
+
+        # make sure that DispVMs will be started of the same template
+        retcode = subprocess.call(['/usr/bin/qvm-create-default-dvm',
+                                   cls.template],
+                                  stderr=open(os.devnull, 'w'))
+        assert retcode == 0, "Error preparing DispVM"
+
+    def setUp(self):
+        super(TC_50_MimeHandlers, self).setUp()
+        self.source_vm = self.qc.get_vm_by_name(self.source_vmname)
+        self.target_vm = self.qc.get_vm_by_name(self.target_vmname)
+
+    def get_window_class(self, winid, dispvm=False):
+        (vm_winid, _) = subprocess.Popen(
+            ['xprop', '-id', winid, '_QUBES_VMWINDOWID'],
+            stdout=subprocess.PIPE
+        ).communicate()
+        vm_winid = vm_winid.split("#")[1].strip('\n" ')
+        if dispvm:
+            (vmname, _) = subprocess.Popen(
+                ['xprop', '-id', winid, '_QUBES_VMNAME'],
+                stdout=subprocess.PIPE
+            ).communicate()
+            vmname = vmname.split("=")[1].strip('\n" ')
+            window_class = None
+            while window_class is None:
+                # XXX to use self.qc.get_vm_by_name would require reloading
+                # qubes.xml, so use qvm-run instead
+                xprop = subprocess.Popen(
+                    ['qvm-run', '-p', vmname, 'xprop -id {} WM_CLASS'.format(
+                        vm_winid)], stdout=subprocess.PIPE)
+                (window_class, _) = xprop.communicate()
+                if xprop.returncode != 0:
+                    self.skipTest("xprop failed, not installed?")
+                if 'not found' in window_class:
+                    # WM_CLASS not set yet, wait a little
+                    time.sleep(0.1)
+                    window_class = None
+        else:
+            window_class = None
+            while window_class is None:
+                xprop = self.target_vm.run(
+                    'xprop -id {} WM_CLASS'.format(vm_winid),
+                    passio_popen=True)
+                (window_class, _) = xprop.communicate()
+                if xprop.returncode != 0:
+                    self.skipTest("xprop failed, not installed?")
+                if 'not found' in window_class:
+                    # WM_CLASS not set yet, wait a little
+                    time.sleep(0.1)
+                    window_class = None
+        # output: WM_CLASS(STRING) = "gnome-terminal-server", "Gnome-terminal"
+        try:
+            window_class = window_class.split("=")[1].split(",")[0].strip('\n" ')
+        except IndexError:
+            raise Exception(
+                "Unexpected output from xprop: '{}'".format(window_class))
+
+        return window_class
+
+    def open_file_and_check_viewer(self, filename, expected_app_titles,
+                                   expected_app_classes, dispvm=False):
+        self.qc.unlock_db()
+        if dispvm:
+            p = self.source_vm.run("qvm-open-in-dvm {}".format(filename),
+                                   passio_popen=True)
+            vmpattern = "disp*"
+        else:
+            self.qrexec_policy('qubes.OpenInVM', self.source_vm.name,
+                self.target_vmname)
+            self.qrexec_policy('qubes.OpenURL', self.source_vm.name,
+                self.target_vmname)
+            p = self.source_vm.run("qvm-open-in-vm {} {}".format(
+                self.target_vmname, filename), passio_popen=True)
+            vmpattern = self.target_vmname
+        wait_count = 0
+        winid = None
+        window_title = None
+        while True:
+            search = subprocess.Popen(['xdotool', 'search',
+                                       '--onlyvisible', '--class', vmpattern],
+                                      stdout=subprocess.PIPE,
+                                      stderr=open(os.path.devnull, 'w'))
+            retcode = search.wait()
+            if retcode == 0:
+                winid = search.stdout.read().strip()
+                # get window title
+                (window_title, _) = subprocess.Popen(
+                    ['xdotool', 'getwindowname', winid], stdout=subprocess.PIPE). \
+                    communicate()
+                window_title = window_title.strip()
+                # ignore LibreOffice splash screen and window with no title
+                # set yet
+                if window_title and not window_title.startswith("LibreOffice")\
+                        and not window_title == 'VMapp command':
+                    break
+            wait_count += 1
+            if wait_count > 100:
+                self.fail("Timeout while waiting for editor window")
+            time.sleep(0.3)
+
+        # get window class
+        window_class = self.get_window_class(winid, dispvm)
+        # close the window - we've got the window class, it is no longer needed
+        subprocess.check_call(['wmctrl', '-i', '-c', winid])
+        p.wait()
+        self.wait_for_window(window_title, show=False)
+
+        def check_matches(obj, patterns):
+            return any((pat.search(obj) if isinstance(pat, type(re.compile('')))
+                        else pat in obj) for pat in patterns)
+
+        if not check_matches(window_title, expected_app_titles) and \
+                not check_matches(window_class, expected_app_classes):
+            self.fail("Opening file {} resulted in window '{} ({})', which is "
+                      "none of {!r} ({!r})".format(
+                          filename, window_title, window_class,
+                          expected_app_titles, expected_app_classes))
+
+    def prepare_txt(self, filename):
+        p = self.source_vm.run("cat > {}".format(filename), passio_popen=True)
+        p.stdin.write("This is test\n")
+        p.stdin.close()
+        retcode = p.wait()
+        assert retcode == 0, "Failed to write {} file".format(filename)
+
+    def prepare_pdf(self, filename):
+        self.prepare_txt("/tmp/source.txt")
+        cmd = "convert /tmp/source.txt {}".format(filename)
+        retcode = self.source_vm.run(cmd, wait=True)
+        assert retcode == 0, "Failed to run '{}'".format(cmd)
+
+    def prepare_doc(self, filename):
+        self.prepare_txt("/tmp/source.txt")
+        cmd = "unoconv -f doc -o {} /tmp/source.txt".format(filename)
+        retcode = self.source_vm.run(cmd, wait=True)
+        if retcode != 0:
+            self.skipTest("Failed to run '{}', not installed?".format(cmd))
+
+    def prepare_pptx(self, filename):
+        self.prepare_txt("/tmp/source.txt")
+        cmd = "unoconv -f pptx -o {} /tmp/source.txt".format(filename)
+        retcode = self.source_vm.run(cmd, wait=True)
+        if retcode != 0:
+            self.skipTest("Failed to run '{}', not installed?".format(cmd))
+
+    def prepare_png(self, filename):
+        self.prepare_txt("/tmp/source.txt")
+        cmd = "convert /tmp/source.txt {}".format(filename)
+        retcode = self.source_vm.run(cmd, wait=True)
+        if retcode != 0:
+            self.skipTest("Failed to run '{}', not installed?".format(cmd))
+
+    def prepare_jpg(self, filename):
+        self.prepare_txt("/tmp/source.txt")
+        cmd = "convert /tmp/source.txt {}".format(filename)
+        retcode = self.source_vm.run(cmd, wait=True)
+        if retcode != 0:
+            self.skipTest("Failed to run '{}', not installed?".format(cmd))
+
+    def test_000_txt(self):
+        filename = "/home/user/test_file.txt"
+        self.prepare_txt(filename)
+        self.open_file_and_check_viewer(filename, ["vim", "user@"],
+                                        ["gedit", "emacs", "libreoffice"])
+
+    def test_001_pdf(self):
+        filename = "/home/user/test_file.pdf"
+        self.prepare_pdf(filename)
+        self.open_file_and_check_viewer(filename, [],
+                                        ["evince"])
+
+    def test_002_doc(self):
+        filename = "/home/user/test_file.doc"
+        self.prepare_doc(filename)
+        self.open_file_and_check_viewer(filename, [],
+                                        ["libreoffice", "abiword"])
+
+    def test_003_pptx(self):
+        filename = "/home/user/test_file.pptx"
+        self.prepare_pptx(filename)
+        self.open_file_and_check_viewer(filename, [],
+                                        ["libreoffice"])
+
+    def test_004_png(self):
+        filename = "/home/user/test_file.png"
+        self.prepare_png(filename)
+        self.open_file_and_check_viewer(filename, [],
+                                        ["shotwell", "eog", "display"])
+
+    def test_005_jpg(self):
+        filename = "/home/user/test_file.jpg"
+        self.prepare_jpg(filename)
+        self.open_file_and_check_viewer(filename, [],
+                                        ["shotwell", "eog", "display"])
+
+    def test_006_jpeg(self):
+        filename = "/home/user/test_file.jpeg"
+        self.prepare_jpg(filename)
+        self.open_file_and_check_viewer(filename, [],
+                                        ["shotwell", "eog", "display"])
+
+    def test_010_url(self):
+        self.open_file_and_check_viewer("https://www.qubes-os.org/", [],
+                                        ["Firefox", "Iceweasel", "Navigator"])
+
+    def test_100_txt_dispvm(self):
+        filename = "/home/user/test_file.txt"
+        self.prepare_txt(filename)
+        self.open_file_and_check_viewer(filename, ["vim", "user@"],
+                                        ["gedit", "emacs", "libreoffice"],
+                                        dispvm=True)
+
+    def test_101_pdf_dispvm(self):
+        filename = "/home/user/test_file.pdf"
+        self.prepare_pdf(filename)
+        self.open_file_and_check_viewer(filename, [],
+                                        ["evince"],
+                                        dispvm=True)
+
+    def test_102_doc_dispvm(self):
+        filename = "/home/user/test_file.doc"
+        self.prepare_doc(filename)
+        self.open_file_and_check_viewer(filename, [],
+                                        ["libreoffice", "abiword"],
+                                        dispvm=True)
+
+    def test_103_pptx_dispvm(self):
+        filename = "/home/user/test_file.pptx"
+        self.prepare_pptx(filename)
+        self.open_file_and_check_viewer(filename, [],
+                                        ["libreoffice"],
+                                        dispvm=True)
+
+    def test_104_png_dispvm(self):
+        filename = "/home/user/test_file.png"
+        self.prepare_png(filename)
+        self.open_file_and_check_viewer(filename, [],
+                                        ["shotwell", "eog", "display"],
+                                        dispvm=True)
+
+    def test_105_jpg_dispvm(self):
+        filename = "/home/user/test_file.jpg"
+        self.prepare_jpg(filename)
+        self.open_file_and_check_viewer(filename, [],
+                                        ["shotwell", "eog", "display"],
+                                        dispvm=True)
+
+    def test_106_jpeg_dispvm(self):
+        filename = "/home/user/test_file.jpeg"
+        self.prepare_jpg(filename)
+        self.open_file_and_check_viewer(filename, [],
+                                        ["shotwell", "eog", "display"],
+                                        dispvm=True)
+
+    def test_110_url_dispvm(self):
+        self.open_file_and_check_viewer("https://www.qubes-os.org/", [],
+                                        ["Firefox", "Iceweasel", "Navigator"],
+                                        dispvm=True)
+
+def load_tests(loader, tests, pattern):
+    try:
+        qc = qubes.qubes.QubesVmCollection()
+        qc.lock_db_for_reading()
+        qc.load()
+        qc.unlock_db()
+        templates = [vm.name for vm in qc.values() if
+                     isinstance(vm, qubes.qubes.QubesTemplateVm)]
+    except OSError:
+        templates = []
+    for template in templates:
+        tests.addTests(loader.loadTestsFromTestCase(
+            type(
+                'TC_50_MimeHandlers_' + template,
+                (TC_50_MimeHandlers, qubes.tests.QubesTestCase),
+                {'template': template})))
+    return tests

+ 172 - 0
tests/pvgrub.py

@@ -0,0 +1,172 @@
+#!/usr/bin/python
+# -*- coding: utf-8 -*-
+#
+# The Qubes OS Project, http://www.qubes-os.org
+#
+# Copyright (C) 2016 Marek Marczykowski-Górecki
+#                                        <marmarek@invisiblethingslab.com>
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+#
+#
+
+import os
+import unittest
+import qubes.tests
+@unittest.skipUnless(os.path.exists('/var/lib/qubes/vm-kernels/pvgrub2'),
+                     'grub-xen package not installed')
+class TC_40_PVGrub(qubes.tests.SystemTestsMixin):
+    def setUp(self):
+        super(TC_40_PVGrub, self).setUp()
+        supported = False
+        if self.template.startswith('fedora-'):
+            supported = True
+        elif self.template.startswith('debian-'):
+            supported = True
+        if not supported:
+            self.skipTest("Template {} not supported by this test".format(
+                self.template))
+
+    def install_packages(self, vm):
+        if self.template.startswith('fedora-'):
+            cmd_install1 = 'dnf clean expire-cache && ' \
+                'dnf install -y qubes-kernel-vm-support grub2-tools'
+            cmd_install2 = 'dnf install -y kernel && ' \
+                'KVER=$(rpm -q --qf %{VERSION}-%{RELEASE}.%{ARCH} kernel) && ' \
+                'dnf install --allowerasing  -y kernel-devel-$KVER && ' \
+                'dkms autoinstall -k $KVER'
+            cmd_update_grub = 'grub2-mkconfig -o /boot/grub2/grub.cfg'
+        elif self.template.startswith('debian-'):
+            cmd_install1 = 'apt-get update && apt-get install -y ' \
+                           'qubes-kernel-vm-support grub2-common'
+            cmd_install2 = 'apt-get install -y linux-image-amd64'
+            cmd_update_grub = 'mkdir /boot/grub && update-grub2'
+        else:
+            assert False, "Unsupported template?!"
+
+        for cmd in [cmd_install1, cmd_install2, cmd_update_grub]:
+            p = vm.run(cmd, user="root", passio_popen=True, passio_stderr=True)
+            (stdout, stderr) = p.communicate()
+            self.assertEquals(p.returncode, 0,
+                              "Failed command: {}\nSTDOUT: {}\nSTDERR: {}"
+                              .format(cmd, stdout, stderr))
+
+    def get_kernel_version(self, vm):
+        if self.template.startswith('fedora-'):
+            cmd_get_kernel_version = 'rpm -q kernel|sort -n|tail -1|' \
+                                     'cut -d - -f 2-'
+        elif self.template.startswith('debian-'):
+            cmd_get_kernel_version = \
+                'dpkg-query --showformat=\'${Package}\\n\' --show ' \
+                '\'linux-image-*-amd64\'|sort -n|tail -1|cut -d - -f 3-'
+        else:
+            raise RuntimeError("Unsupported template?!")
+
+        p = vm.run(cmd_get_kernel_version, user="root", passio_popen=True)
+        (kver, _) = p.communicate()
+        self.assertEquals(p.returncode, 0,
+                          "Failed command: {}".format(cmd_get_kernel_version))
+        return kver.strip()
+
+    def test_000_standalone_vm(self):
+        testvm1 = self.qc.add_new_vm("QubesAppVm",
+                                     template=None,
+                                     name=self.make_vm_name('vm1'))
+        testvm1.create_on_disk(verbose=False,
+                               source_template=self.qc.get_vm_by_name(
+                                   self.template))
+        self.save_and_reload_db()
+        self.qc.unlock_db()
+        testvm1 = self.qc[testvm1.qid]
+        testvm1.start()
+        self.install_packages(testvm1)
+        kver = self.get_kernel_version(testvm1)
+        self.shutdown_and_wait(testvm1)
+
+        self.qc.lock_db_for_writing()
+        self.qc.load()
+        testvm1 = self.qc[testvm1.qid]
+        testvm1.kernel = 'pvgrub2'
+        self.save_and_reload_db()
+        self.qc.unlock_db()
+        testvm1 = self.qc[testvm1.qid]
+        testvm1.start()
+        p = testvm1.run('uname -r', passio_popen=True)
+        (actual_kver, _) = p.communicate()
+        self.assertEquals(actual_kver.strip(), kver)
+
+    def test_010_template_based_vm(self):
+        test_template = self.qc.add_new_vm("QubesTemplateVm",
+                                           template=None,
+                                           name=self.make_vm_name('template'))
+        test_template.clone_attrs(self.qc.get_vm_by_name(self.template))
+        test_template.clone_disk_files(
+            src_vm=self.qc.get_vm_by_name(self.template),
+            verbose=False)
+
+        testvm1 = self.qc.add_new_vm("QubesAppVm",
+                                     template=test_template,
+                                     name=self.make_vm_name('vm1'))
+        testvm1.create_on_disk(verbose=False,
+                               source_template=test_template)
+        self.save_and_reload_db()
+        self.qc.unlock_db()
+        test_template = self.qc[test_template.qid]
+        testvm1 = self.qc[testvm1.qid]
+        test_template.start()
+        self.install_packages(test_template)
+        kver = self.get_kernel_version(test_template)
+        self.shutdown_and_wait(test_template)
+
+        self.qc.lock_db_for_writing()
+        self.qc.load()
+        test_template = self.qc[test_template.qid]
+        test_template.kernel = 'pvgrub2'
+        testvm1 = self.qc[testvm1.qid]
+        testvm1.kernel = 'pvgrub2'
+        self.save_and_reload_db()
+        self.qc.unlock_db()
+
+        # Check if TemplateBasedVM boots and has the right kernel
+        testvm1 = self.qc[testvm1.qid]
+        testvm1.start()
+        p = testvm1.run('uname -r', passio_popen=True)
+        (actual_kver, _) = p.communicate()
+        self.assertEquals(actual_kver.strip(), kver)
+
+        # And the same for the TemplateVM itself
+        test_template = self.qc[test_template.qid]
+        test_template.start()
+        p = test_template.run('uname -r', passio_popen=True)
+        (actual_kver, _) = p.communicate()
+        self.assertEquals(actual_kver.strip(), kver)
+
+def load_tests(loader, tests, pattern):
+    try:
+        qc = qubes.qubes.QubesVmCollection()
+        qc.lock_db_for_reading()
+        qc.load()
+        qc.unlock_db()
+        templates = [vm.name for vm in qc.values() if
+                     isinstance(vm, qubes.qubes.QubesTemplateVm)]
+    except OSError:
+        templates = []
+    for template in templates:
+        tests.addTests(loader.loadTestsFromTestCase(
+            type(
+                'TC_40_PVGrub_' + template,
+                (TC_40_PVGrub, qubes.tests.QubesTestCase),
+                {'template': template})))
+    return tests

+ 0 - 842
tests/vm_qrexec_gui.py

@@ -862,831 +862,6 @@ class TC_00_AppVMMixin(qubes.tests.SystemTestsMixin):
         # some safety margin for FS metadata
         self.assertGreater(int(new_size.strip()), 5.8*1024**2)
 
-class TC_05_StandaloneVM(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
-    def test_000_create_start(self):
-        testvm1 = self.qc.add_new_vm("QubesAppVm",
-                                     template=None,
-                                     name=self.make_vm_name('vm1'))
-        testvm1.create_on_disk(verbose=False,
-                               source_template=self.qc.get_default_template())
-        self.qc.save()
-        self.qc.unlock_db()
-        testvm1.start()
-        self.assertEquals(testvm1.get_power_state(), "Running")
-
-    def test_100_resize_root_img(self):
-        testvm1 = self.qc.add_new_vm("QubesAppVm",
-                                     template=None,
-                                     name=self.make_vm_name('vm1'))
-        testvm1.create_on_disk(verbose=False,
-                               source_template=self.qc.get_default_template())
-        self.qc.save()
-        self.qc.unlock_db()
-        with self.assertRaises(QubesException):
-            testvm1.resize_root_img(20*1024**3)
-        testvm1.resize_root_img(20*1024**3, allow_start=True)
-        timeout = 60
-        while testvm1.is_running():
-            time.sleep(1)
-            timeout -= 1
-            if timeout == 0:
-                self.fail("Timeout while waiting for VM shutdown")
-        self.assertEquals(testvm1.get_root_img_sz(), 20*1024**3)
-        testvm1.start()
-        p = testvm1.run('df --output=size /|tail -n 1',
-                        passio_popen=True)
-        # new_size in 1k-blocks
-        (new_size, _) = p.communicate()
-        # some safety margin for FS metadata
-        self.assertGreater(int(new_size.strip()), 19*1024**2)
-
-
-class TC_10_HVM(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
-    # TODO: test with some OS inside
-    # TODO: windows tools tests
-
-    def test_000_create_start(self):
-        testvm1 = self.qc.add_new_vm("QubesHVm",
-                                     name=self.make_vm_name('vm1'))
-        testvm1.create_on_disk(verbose=False)
-        self.qc.save()
-        self.qc.unlock_db()
-        testvm1.start()
-        self.assertEquals(testvm1.get_power_state(), "Running")
-
-    def test_010_create_start_template(self):
-        templatevm = self.qc.add_new_vm("QubesTemplateHVm",
-                                        name=self.make_vm_name('template'))
-        templatevm.create_on_disk(verbose=False)
-        self.qc.save()
-        self.qc.unlock_db()
-
-        templatevm.start()
-        self.assertEquals(templatevm.get_power_state(), "Running")
-
-    def test_020_create_start_template_vm(self):
-        templatevm = self.qc.add_new_vm("QubesTemplateHVm",
-                                        name=self.make_vm_name('template'))
-        templatevm.create_on_disk(verbose=False)
-        testvm2 = self.qc.add_new_vm("QubesHVm",
-                                     name=self.make_vm_name('vm2'),
-                                     template=templatevm)
-        testvm2.create_on_disk(verbose=False)
-        self.qc.save()
-        self.qc.unlock_db()
-
-        testvm2.start()
-        self.assertEquals(testvm2.get_power_state(), "Running")
-
-    def test_030_prevent_simultaneus_start(self):
-        templatevm = self.qc.add_new_vm("QubesTemplateHVm",
-                                        name=self.make_vm_name('template'))
-        templatevm.create_on_disk(verbose=False)
-        testvm2 = self.qc.add_new_vm("QubesHVm",
-                                     name=self.make_vm_name('vm2'),
-                                     template=templatevm)
-        testvm2.create_on_disk(verbose=False)
-        self.qc.save()
-        self.qc.unlock_db()
-
-        templatevm.start()
-        self.assertEquals(templatevm.get_power_state(), "Running")
-        self.assertRaises(QubesException, testvm2.start)
-        templatevm.force_shutdown()
-        testvm2.start()
-        self.assertEquals(testvm2.get_power_state(), "Running")
-        self.assertRaises(QubesException, templatevm.start)
-
-    def test_100_resize_root_img(self):
-        testvm1 = self.qc.add_new_vm("QubesHVm",
-                                     name=self.make_vm_name('vm1'))
-        testvm1.create_on_disk(verbose=False)
-        self.qc.save()
-        self.qc.unlock_db()
-        testvm1.resize_root_img(30*1024**3)
-        self.assertEquals(testvm1.get_root_img_sz(), 30*1024**3)
-        testvm1.start()
-        self.assertEquals(testvm1.get_power_state(), "Running")
-        # TODO: launch some OS there and check the size
-
-    def test_200_start_invalid_drive(self):
-        """Regression test for #1619"""
-        testvm1 = self.qc.add_new_vm("QubesHVm",
-                                     name=self.make_vm_name('vm1'))
-        testvm1.create_on_disk(verbose=False)
-        testvm1.drive = 'hd:dom0:/invalid'
-        self.qc.save()
-        self.qc.unlock_db()
-        try:
-            testvm1.start()
-        except Exception as e:
-            self.assertIsInstance(e, QubesException)
-        else:
-            self.fail('No exception raised')
-
-    def test_201_start_invalid_drive_cdrom(self):
-        """Regression test for #1619"""
-        testvm1 = self.qc.add_new_vm("QubesHVm",
-                                     name=self.make_vm_name('vm1'))
-        testvm1.create_on_disk(verbose=False)
-        testvm1.drive = 'cdrom:dom0:/invalid'
-        self.qc.save()
-        self.qc.unlock_db()
-        try:
-            testvm1.start()
-        except Exception as e:
-            self.assertIsInstance(e, QubesException)
-        else:
-            self.fail('No exception raised')
-
-class TC_20_DispVMMixin(qubes.tests.SystemTestsMixin):
-    def test_000_prepare_dvm(self):
-        self.qc.unlock_db()
-        retcode = subprocess.call(['/usr/bin/qvm-create-default-dvm',
-                                   self.template],
-                                  stderr=open(os.devnull, 'w'))
-        self.assertEqual(retcode, 0)
-        self.qc.lock_db_for_writing()
-        self.qc.load()
-        self.assertIsNotNone(self.qc.get_vm_by_name(
-            self.template + "-dvm"))
-        # TODO: check mtime of snapshot file
-
-    def test_010_simple_dvm_run(self):
-        self.qc.unlock_db()
-        p = subprocess.Popen(['/usr/lib/qubes/qfile-daemon-dvm',
-                              'qubes.VMShell', 'dom0', 'DEFAULT'],
-                             stdin=subprocess.PIPE,
-                             stdout=subprocess.PIPE,
-                             stderr=open(os.devnull, 'w'))
-        (stdout, _) = p.communicate(input="echo test")
-        self.assertEqual(stdout, "test\n")
-        # TODO: check if DispVM is destroyed
-
-    @unittest.skipUnless(spawn.find_executable('xdotool'),
-                         "xdotool not installed")
-    def test_020_gui_app(self):
-        self.qc.unlock_db()
-        p = subprocess.Popen(['/usr/lib/qubes/qfile-daemon-dvm',
-                              'qubes.VMShell', 'dom0', 'DEFAULT'],
-                             stdin=subprocess.PIPE,
-                             stdout=subprocess.PIPE,
-                             stderr=open(os.devnull, 'w'))
-
-        # wait for DispVM startup:
-        p.stdin.write("echo test\n")
-        p.stdin.flush()
-        l = p.stdout.readline()
-        self.assertEqual(l, "test\n")
-
-        # potential race condition, but our tests are supposed to be
-        # running on dedicated machine, so should not be a problem
-        self.qc.lock_db_for_reading()
-        self.qc.load()
-        self.qc.unlock_db()
-
-        max_qid = 0
-        for vm in self.qc.values():
-            if not vm.is_disposablevm():
-                continue
-            if vm.qid > max_qid:
-                max_qid = vm.qid
-        dispvm = self.qc[max_qid]
-        self.assertNotEqual(dispvm.qid, 0, "DispVM not found in qubes.xml")
-        self.assertTrue(dispvm.is_running())
-        try:
-            window_title = 'user@%s' % (dispvm.template.name + "-dvm")
-            p.stdin.write("xterm -e "
-                "\"sh -c 'echo \\\"\033]0;{}\007\\\";read x;'\"\n".
-                format(window_title))
-            self.wait_for_window(window_title)
-
-            time.sleep(0.5)
-            self.enter_keys_in_window(window_title, ['Return'])
-            # Wait for window to close
-            self.wait_for_window(window_title, show=False)
-        finally:
-            p.stdin.close()
-
-        wait_count = 0
-        while dispvm.is_running():
-            wait_count += 1
-            if wait_count > 100:
-                self.fail("Timeout while waiting for DispVM destruction")
-            time.sleep(0.1)
-        wait_count = 0
-        while p.poll() is None:
-            wait_count += 1
-            if wait_count > 100:
-                self.fail("Timeout while waiting for qfile-daemon-dvm "
-                          "termination")
-            time.sleep(0.1)
-        self.assertEqual(p.returncode, 0)
-
-        self.qc.lock_db_for_reading()
-        self.qc.load()
-        self.qc.unlock_db()
-        self.assertIsNone(self.qc.get_vm_by_name(dispvm.name),
-                          "DispVM not removed from qubes.xml")
-
-    def _handle_editor(self, winid):
-        (window_title, _) = subprocess.Popen(
-            ['xdotool', 'getwindowname', winid], stdout=subprocess.PIPE).\
-            communicate()
-        window_title = window_title.strip().\
-            replace('(', '\(').replace(')', '\)')
-        time.sleep(1)
-        if "gedit" in window_title:
-            subprocess.check_call(['xdotool', 'windowactivate', '--sync', winid,
-                'type', 'Test test 2\n'])
-            time.sleep(0.5)
-            subprocess.check_call(['xdotool',
-                                   'key', 'ctrl+s', 'ctrl+q'])
-        elif "LibreOffice" in window_title:
-            # wait for actual editor (we've got splash screen)
-            search = subprocess.Popen(['xdotool', 'search', '--sync',
-                '--onlyvisible', '--all', '--name', '--class', 'disp*|Writer'],
-                stdout=subprocess.PIPE,
-                                  stderr=open(os.path.devnull, 'w'))
-            retcode = search.wait()
-            if retcode == 0:
-                winid = search.stdout.read().strip()
-            time.sleep(0.5)
-            subprocess.check_call(['xdotool', 'windowactivate', '--sync', winid,
-                'type', 'Test test 2\n'])
-            time.sleep(0.5)
-            subprocess.check_call(['xdotool',
-                                   'key', '--delay', '100', 'ctrl+s',
-                'Return', 'ctrl+q'])
-        elif "emacs" in window_title:
-            subprocess.check_call(['xdotool', 'windowactivate', '--sync', winid,
-                                   'type', 'Test test 2\n'])
-            time.sleep(0.5)
-            subprocess.check_call(['xdotool',
-                                   'key', 'ctrl+x', 'ctrl+s'])
-            subprocess.check_call(['xdotool',
-                                   'key', 'ctrl+x', 'ctrl+c'])
-        elif "vim" in window_title or "user@" in window_title:
-            subprocess.check_call(['xdotool', 'windowactivate', '--sync', winid,
-                                   'key', 'i', 'type', 'Test test 2\n'])
-            subprocess.check_call(
-                ['xdotool',
-                 'key', 'Escape', 'colon', 'w', 'q', 'Return'])
-        else:
-            self.fail("Unknown editor window: {}".format(window_title))
-
-    @unittest.skipUnless(spawn.find_executable('xdotool'),
-                         "xdotool not installed")
-    def test_030_edit_file(self):
-        testvm1 = self.qc.add_new_vm("QubesAppVm",
-                                     name=self.make_vm_name('vm1'),
-                                     template=self.qc.get_vm_by_name(
-                                         self.template))
-        testvm1.create_on_disk(verbose=False)
-        self.qc.save()
-
-        testvm1.start()
-        testvm1.run("echo test1 > /home/user/test.txt", wait=True)
-
-        self.qc.unlock_db()
-        p = testvm1.run("qvm-open-in-dvm /home/user/test.txt",
-                        passio_popen=True)
-
-        wait_count = 0
-        winid = None
-        while True:
-            search = subprocess.Popen(['xdotool', 'search',
-                                       '--onlyvisible', '--class', 'disp*'],
-                                      stdout=subprocess.PIPE,
-                                      stderr=open(os.path.devnull, 'w'))
-            retcode = search.wait()
-            if retcode == 0:
-                winid = search.stdout.read().strip()
-                break
-            wait_count += 1
-            if wait_count > 100:
-                self.fail("Timeout while waiting for editor window")
-            time.sleep(0.3)
-
-        time.sleep(0.5)
-        self._handle_editor(winid)
-        p.wait()
-        p = testvm1.run("cat /home/user/test.txt",
-                        passio_popen=True)
-        (test_txt_content, _) = p.communicate()
-        # Drop BOM if added by editor
-        if test_txt_content.startswith('\xef\xbb\xbf'):
-            test_txt_content = test_txt_content[3:]
-        self.assertEqual(test_txt_content, "Test test 2\ntest1\n")
-
-
-class TC_30_Gui_daemon(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
-    @unittest.skipUnless(spawn.find_executable('xdotool'),
-                         "xdotool not installed")
-    def test_000_clipboard(self):
-        testvm1 = self.qc.add_new_vm("QubesAppVm",
-                                     name=self.make_vm_name('vm1'),
-                                     template=self.qc.get_default_template())
-        testvm1.create_on_disk(verbose=False)
-        testvm2 = self.qc.add_new_vm("QubesAppVm",
-                                     name=self.make_vm_name('vm2'),
-                                     template=self.qc.get_default_template())
-        testvm2.create_on_disk(verbose=False)
-        self.qc.save()
-        self.qc.unlock_db()
-
-        testvm1.start()
-        testvm2.start()
-
-        window_title = 'user@{}'.format(testvm1.name)
-        testvm1.run('zenity --text-info --editable --title={}'.format(
-            window_title))
-
-        self.wait_for_window(window_title)
-        time.sleep(0.5)
-        test_string = "test{}".format(testvm1.xid)
-
-        # Type and copy some text
-        subprocess.check_call(['xdotool', 'search', '--name', window_title,
-                               'windowactivate', '--sync',
-                               'type', '{}'.format(test_string)])
-        # second xdotool call because type --terminator do not work (SEGV)
-        # additionally do not use search here, so window stack will be empty
-        # and xdotool will use XTEST instead of generating events manually -
-        # this will be much better - at least because events will have
-        # correct timestamp (so gui-daemon would not drop the copy request)
-        subprocess.check_call(['xdotool',
-                               'key', 'ctrl+a', 'ctrl+c', 'ctrl+shift+c',
-                               'Escape'])
-
-        clipboard_content = \
-            open('/var/run/qubes/qubes-clipboard.bin', 'r').read().strip()
-        self.assertEquals(clipboard_content, test_string,
-                          "Clipboard copy operation failed - content")
-        clipboard_source = \
-            open('/var/run/qubes/qubes-clipboard.bin.source',
-                 'r').read().strip()
-        self.assertEquals(clipboard_source, testvm1.name,
-                          "Clipboard copy operation failed - owner")
-
-        # Then paste it to the other window
-        window_title = 'user@{}'.format(testvm2.name)
-        p = testvm2.run('zenity --entry --title={} > test.txt'.format(
-                        window_title), passio_popen=True)
-        self.wait_for_window(window_title)
-
-        subprocess.check_call(['xdotool', 'key', '--delay', '100',
-                               'ctrl+shift+v', 'ctrl+v', 'Return'])
-        p.wait()
-
-        # And compare the result
-        (test_output, _) = testvm2.run('cat test.txt',
-                                       passio_popen=True).communicate()
-        self.assertEquals(test_string, test_output.strip())
-
-        clipboard_content = \
-            open('/var/run/qubes/qubes-clipboard.bin', 'r').read().strip()
-        self.assertEquals(clipboard_content, "",
-                          "Clipboard not wiped after paste - content")
-        clipboard_source = \
-            open('/var/run/qubes/qubes-clipboard.bin.source', 'r').read(
-
-            ).strip()
-        self.assertEquals(clipboard_source, "",
-                          "Clipboard not wiped after paste - owner")
-
-
-@unittest.skipUnless(os.path.exists('/var/lib/qubes/vm-kernels/pvgrub2'),
-                     'grub-xen package not installed')
-class TC_40_PVGrub(qubes.tests.SystemTestsMixin):
-    def setUp(self):
-        super(TC_40_PVGrub, self).setUp()
-        supported = False
-        if self.template.startswith('fedora-'):
-            supported = True
-        elif self.template.startswith('debian-'):
-            supported = True
-        if not supported:
-            self.skipTest("Template {} not supported by this test".format(
-                self.template))
-
-    def install_packages(self, vm):
-        if self.template.startswith('fedora-'):
-            cmd_install1 = 'dnf clean expire-cache && ' \
-                'dnf install -y qubes-kernel-vm-support grub2-tools'
-            cmd_install2 = 'dnf install -y kernel && ' \
-                'KVER=$(rpm -q --qf %{VERSION}-%{RELEASE}.%{ARCH} kernel) && ' \
-                'dnf install --allowerasing  -y kernel-devel-$KVER && ' \
-                'dkms autoinstall -k $KVER'
-            cmd_update_grub = 'grub2-mkconfig -o /boot/grub2/grub.cfg'
-        elif self.template.startswith('debian-'):
-            cmd_install1 = 'apt-get update && apt-get install -y ' \
-                           'qubes-kernel-vm-support grub2-common'
-            cmd_install2 = 'apt-get install -y linux-image-amd64'
-            cmd_update_grub = 'mkdir /boot/grub && update-grub2'
-        else:
-            assert False, "Unsupported template?!"
-
-        for cmd in [cmd_install1, cmd_install2, cmd_update_grub]:
-            p = vm.run(cmd, user="root", passio_popen=True, passio_stderr=True)
-            (stdout, stderr) = p.communicate()
-            self.assertEquals(p.returncode, 0,
-                              "Failed command: {}\nSTDOUT: {}\nSTDERR: {}"
-                              .format(cmd, stdout, stderr))
-
-    def get_kernel_version(self, vm):
-        if self.template.startswith('fedora-'):
-            cmd_get_kernel_version = 'rpm -q kernel|sort -n|tail -1|' \
-                                     'cut -d - -f 2-'
-        elif self.template.startswith('debian-'):
-            cmd_get_kernel_version = \
-                'dpkg-query --showformat=\'${Package}\\n\' --show ' \
-                '\'linux-image-*-amd64\'|sort -n|tail -1|cut -d - -f 3-'
-        else:
-            raise RuntimeError("Unsupported template?!")
-
-        p = vm.run(cmd_get_kernel_version, user="root", passio_popen=True)
-        (kver, _) = p.communicate()
-        self.assertEquals(p.returncode, 0,
-                          "Failed command: {}".format(cmd_get_kernel_version))
-        return kver.strip()
-
-    def test_000_standalone_vm(self):
-        testvm1 = self.qc.add_new_vm("QubesAppVm",
-                                     template=None,
-                                     name=self.make_vm_name('vm1'))
-        testvm1.create_on_disk(verbose=False,
-                               source_template=self.qc.get_vm_by_name(
-                                   self.template))
-        self.save_and_reload_db()
-        self.qc.unlock_db()
-        testvm1 = self.qc[testvm1.qid]
-        testvm1.start()
-        self.install_packages(testvm1)
-        kver = self.get_kernel_version(testvm1)
-        self.shutdown_and_wait(testvm1)
-
-        self.qc.lock_db_for_writing()
-        self.qc.load()
-        testvm1 = self.qc[testvm1.qid]
-        testvm1.kernel = 'pvgrub2'
-        self.save_and_reload_db()
-        self.qc.unlock_db()
-        testvm1 = self.qc[testvm1.qid]
-        testvm1.start()
-        p = testvm1.run('uname -r', passio_popen=True)
-        (actual_kver, _) = p.communicate()
-        self.assertEquals(actual_kver.strip(), kver)
-
-    def test_010_template_based_vm(self):
-        test_template = self.qc.add_new_vm("QubesTemplateVm",
-                                           template=None,
-                                           name=self.make_vm_name('template'))
-        test_template.clone_attrs(self.qc.get_vm_by_name(self.template))
-        test_template.clone_disk_files(
-            src_vm=self.qc.get_vm_by_name(self.template),
-            verbose=False)
-
-        testvm1 = self.qc.add_new_vm("QubesAppVm",
-                                     template=test_template,
-                                     name=self.make_vm_name('vm1'))
-        testvm1.create_on_disk(verbose=False,
-                               source_template=test_template)
-        self.save_and_reload_db()
-        self.qc.unlock_db()
-        test_template = self.qc[test_template.qid]
-        testvm1 = self.qc[testvm1.qid]
-        test_template.start()
-        self.install_packages(test_template)
-        kver = self.get_kernel_version(test_template)
-        self.shutdown_and_wait(test_template)
-
-        self.qc.lock_db_for_writing()
-        self.qc.load()
-        test_template = self.qc[test_template.qid]
-        test_template.kernel = 'pvgrub2'
-        testvm1 = self.qc[testvm1.qid]
-        testvm1.kernel = 'pvgrub2'
-        self.save_and_reload_db()
-        self.qc.unlock_db()
-
-        # Check if TemplateBasedVM boots and has the right kernel
-        testvm1 = self.qc[testvm1.qid]
-        testvm1.start()
-        p = testvm1.run('uname -r', passio_popen=True)
-        (actual_kver, _) = p.communicate()
-        self.assertEquals(actual_kver.strip(), kver)
-
-        # And the same for the TemplateVM itself
-        test_template = self.qc[test_template.qid]
-        test_template.start()
-        p = test_template.run('uname -r', passio_popen=True)
-        (actual_kver, _) = p.communicate()
-        self.assertEquals(actual_kver.strip(), kver)
-
-@unittest.skipUnless(
-    spawn.find_executable('xprop') and
-    spawn.find_executable('xdotool') and
-    spawn.find_executable('wmctrl'),
-    "xprop or xdotool or wmctrl not installed")
-class TC_50_MimeHandlers(qubes.tests.SystemTestsMixin):
-    @classmethod
-    def setUpClass(cls):
-        if cls.template == 'whonix-gw' or 'minimal' in cls.template:
-            raise unittest.SkipTest(
-                'Template {} not supported by this test'.format(cls.template))
-
-        if cls.template == 'whonix-ws':
-            # TODO remove when Whonix-based DispVMs will work (Whonix 13?)
-            raise unittest.SkipTest(
-                'Template {} not supported by this test'.format(cls.template))
-
-        qc = QubesVmCollection()
-
-        cls._kill_test_vms(qc, prefix=qubes.tests.CLSVMPREFIX)
-
-        qc.lock_db_for_writing()
-        qc.load()
-
-        cls._remove_test_vms(qc, qubes.qubes.vmm.libvirt_conn,
-                            prefix=qubes.tests.CLSVMPREFIX)
-
-        cls.source_vmname = cls.make_vm_name('source', True)
-        source_vm = qc.add_new_vm("QubesAppVm",
-                                  template=qc.get_vm_by_name(cls.template),
-                                  name=cls.source_vmname)
-        source_vm.create_on_disk(verbose=False)
-
-        cls.target_vmname = cls.make_vm_name('target', True)
-        target_vm = qc.add_new_vm("QubesAppVm",
-                                  template=qc.get_vm_by_name(cls.template),
-                                  name=cls.target_vmname)
-        target_vm.create_on_disk(verbose=False)
-
-        qc.save()
-        qc.unlock_db()
-        source_vm.start()
-        target_vm.start()
-
-        # make sure that DispVMs will be started of the same template
-        retcode = subprocess.call(['/usr/bin/qvm-create-default-dvm',
-                                   cls.template],
-                                  stderr=open(os.devnull, 'w'))
-        assert retcode == 0, "Error preparing DispVM"
-
-    def setUp(self):
-        super(TC_50_MimeHandlers, self).setUp()
-        self.source_vm = self.qc.get_vm_by_name(self.source_vmname)
-        self.target_vm = self.qc.get_vm_by_name(self.target_vmname)
-
-    def get_window_class(self, winid, dispvm=False):
-        (vm_winid, _) = subprocess.Popen(
-            ['xprop', '-id', winid, '_QUBES_VMWINDOWID'],
-            stdout=subprocess.PIPE
-        ).communicate()
-        vm_winid = vm_winid.split("#")[1].strip('\n" ')
-        if dispvm:
-            (vmname, _) = subprocess.Popen(
-                ['xprop', '-id', winid, '_QUBES_VMNAME'],
-                stdout=subprocess.PIPE
-            ).communicate()
-            vmname = vmname.split("=")[1].strip('\n" ')
-            window_class = None
-            while window_class is None:
-                # XXX to use self.qc.get_vm_by_name would require reloading
-                # qubes.xml, so use qvm-run instead
-                xprop = subprocess.Popen(
-                    ['qvm-run', '-p', vmname, 'xprop -id {} WM_CLASS'.format(
-                        vm_winid)], stdout=subprocess.PIPE)
-                (window_class, _) = xprop.communicate()
-                if xprop.returncode != 0:
-                    self.skipTest("xprop failed, not installed?")
-                if 'not found' in window_class:
-                    # WM_CLASS not set yet, wait a little
-                    time.sleep(0.1)
-                    window_class = None
-        else:
-            window_class = None
-            while window_class is None:
-                xprop = self.target_vm.run(
-                    'xprop -id {} WM_CLASS'.format(vm_winid),
-                    passio_popen=True)
-                (window_class, _) = xprop.communicate()
-                if xprop.returncode != 0:
-                    self.skipTest("xprop failed, not installed?")
-                if 'not found' in window_class:
-                    # WM_CLASS not set yet, wait a little
-                    time.sleep(0.1)
-                    window_class = None
-        # output: WM_CLASS(STRING) = "gnome-terminal-server", "Gnome-terminal"
-        try:
-            window_class = window_class.split("=")[1].split(",")[0].strip('\n" ')
-        except IndexError:
-            raise Exception(
-                "Unexpected output from xprop: '{}'".format(window_class))
-
-        return window_class
-
-    def open_file_and_check_viewer(self, filename, expected_app_titles,
-                                   expected_app_classes, dispvm=False):
-        self.qc.unlock_db()
-        if dispvm:
-            p = self.source_vm.run("qvm-open-in-dvm {}".format(filename),
-                                   passio_popen=True)
-            vmpattern = "disp*"
-        else:
-            self.qrexec_policy('qubes.OpenInVM', self.source_vm.name,
-                self.target_vmname)
-            self.qrexec_policy('qubes.OpenURL', self.source_vm.name,
-                self.target_vmname)
-            p = self.source_vm.run("qvm-open-in-vm {} {}".format(
-                self.target_vmname, filename), passio_popen=True)
-            vmpattern = self.target_vmname
-        wait_count = 0
-        winid = None
-        window_title = None
-        while True:
-            search = subprocess.Popen(['xdotool', 'search',
-                                       '--onlyvisible', '--class', vmpattern],
-                                      stdout=subprocess.PIPE,
-                                      stderr=open(os.path.devnull, 'w'))
-            retcode = search.wait()
-            if retcode == 0:
-                winid = search.stdout.read().strip()
-                # get window title
-                (window_title, _) = subprocess.Popen(
-                    ['xdotool', 'getwindowname', winid], stdout=subprocess.PIPE). \
-                    communicate()
-                window_title = window_title.strip()
-                # ignore LibreOffice splash screen and window with no title
-                # set yet
-                if window_title and not window_title.startswith("LibreOffice")\
-                        and not window_title == 'VMapp command':
-                    break
-            wait_count += 1
-            if wait_count > 100:
-                self.fail("Timeout while waiting for editor window")
-            time.sleep(0.3)
-
-        # get window class
-        window_class = self.get_window_class(winid, dispvm)
-        # close the window - we've got the window class, it is no longer needed
-        subprocess.check_call(['wmctrl', '-i', '-c', winid])
-        p.wait()
-        self.wait_for_window(window_title, show=False)
-
-        def check_matches(obj, patterns):
-            return any((pat.search(obj) if isinstance(pat, type(re.compile('')))
-                        else pat in obj) for pat in patterns)
-
-        if not check_matches(window_title, expected_app_titles) and \
-                not check_matches(window_class, expected_app_classes):
-            self.fail("Opening file {} resulted in window '{} ({})', which is "
-                      "none of {!r} ({!r})".format(
-                          filename, window_title, window_class,
-                          expected_app_titles, expected_app_classes))
-
-    def prepare_txt(self, filename):
-        p = self.source_vm.run("cat > {}".format(filename), passio_popen=True)
-        p.stdin.write("This is test\n")
-        p.stdin.close()
-        retcode = p.wait()
-        assert retcode == 0, "Failed to write {} file".format(filename)
-
-    def prepare_pdf(self, filename):
-        self.prepare_txt("/tmp/source.txt")
-        cmd = "convert /tmp/source.txt {}".format(filename)
-        retcode = self.source_vm.run(cmd, wait=True)
-        assert retcode == 0, "Failed to run '{}'".format(cmd)
-
-    def prepare_doc(self, filename):
-        self.prepare_txt("/tmp/source.txt")
-        cmd = "unoconv -f doc -o {} /tmp/source.txt".format(filename)
-        retcode = self.source_vm.run(cmd, wait=True)
-        if retcode != 0:
-            self.skipTest("Failed to run '{}', not installed?".format(cmd))
-
-    def prepare_pptx(self, filename):
-        self.prepare_txt("/tmp/source.txt")
-        cmd = "unoconv -f pptx -o {} /tmp/source.txt".format(filename)
-        retcode = self.source_vm.run(cmd, wait=True)
-        if retcode != 0:
-            self.skipTest("Failed to run '{}', not installed?".format(cmd))
-
-    def prepare_png(self, filename):
-        self.prepare_txt("/tmp/source.txt")
-        cmd = "convert /tmp/source.txt {}".format(filename)
-        retcode = self.source_vm.run(cmd, wait=True)
-        if retcode != 0:
-            self.skipTest("Failed to run '{}', not installed?".format(cmd))
-
-    def prepare_jpg(self, filename):
-        self.prepare_txt("/tmp/source.txt")
-        cmd = "convert /tmp/source.txt {}".format(filename)
-        retcode = self.source_vm.run(cmd, wait=True)
-        if retcode != 0:
-            self.skipTest("Failed to run '{}', not installed?".format(cmd))
-
-    def test_000_txt(self):
-        filename = "/home/user/test_file.txt"
-        self.prepare_txt(filename)
-        self.open_file_and_check_viewer(filename, ["vim", "user@"],
-                                        ["gedit", "emacs", "libreoffice"])
-
-    def test_001_pdf(self):
-        filename = "/home/user/test_file.pdf"
-        self.prepare_pdf(filename)
-        self.open_file_and_check_viewer(filename, [],
-                                        ["evince"])
-
-    def test_002_doc(self):
-        filename = "/home/user/test_file.doc"
-        self.prepare_doc(filename)
-        self.open_file_and_check_viewer(filename, [],
-                                        ["libreoffice", "abiword"])
-
-    def test_003_pptx(self):
-        filename = "/home/user/test_file.pptx"
-        self.prepare_pptx(filename)
-        self.open_file_and_check_viewer(filename, [],
-                                        ["libreoffice"])
-
-    def test_004_png(self):
-        filename = "/home/user/test_file.png"
-        self.prepare_png(filename)
-        self.open_file_and_check_viewer(filename, [],
-                                        ["shotwell", "eog", "display"])
-
-    def test_005_jpg(self):
-        filename = "/home/user/test_file.jpg"
-        self.prepare_jpg(filename)
-        self.open_file_and_check_viewer(filename, [],
-                                        ["shotwell", "eog", "display"])
-
-    def test_006_jpeg(self):
-        filename = "/home/user/test_file.jpeg"
-        self.prepare_jpg(filename)
-        self.open_file_and_check_viewer(filename, [],
-                                        ["shotwell", "eog", "display"])
-
-    def test_010_url(self):
-        self.open_file_and_check_viewer("https://www.qubes-os.org/", [],
-                                        ["Firefox", "Iceweasel", "Navigator"])
-
-    def test_100_txt_dispvm(self):
-        filename = "/home/user/test_file.txt"
-        self.prepare_txt(filename)
-        self.open_file_and_check_viewer(filename, ["vim", "user@"],
-                                        ["gedit", "emacs", "libreoffice"],
-                                        dispvm=True)
-
-    def test_101_pdf_dispvm(self):
-        filename = "/home/user/test_file.pdf"
-        self.prepare_pdf(filename)
-        self.open_file_and_check_viewer(filename, [],
-                                        ["evince"],
-                                        dispvm=True)
-
-    def test_102_doc_dispvm(self):
-        filename = "/home/user/test_file.doc"
-        self.prepare_doc(filename)
-        self.open_file_and_check_viewer(filename, [],
-                                        ["libreoffice", "abiword"],
-                                        dispvm=True)
-
-    def test_103_pptx_dispvm(self):
-        filename = "/home/user/test_file.pptx"
-        self.prepare_pptx(filename)
-        self.open_file_and_check_viewer(filename, [],
-                                        ["libreoffice"],
-                                        dispvm=True)
-
-    def test_104_png_dispvm(self):
-        filename = "/home/user/test_file.png"
-        self.prepare_png(filename)
-        self.open_file_and_check_viewer(filename, [],
-                                        ["shotwell", "eog", "display"],
-                                        dispvm=True)
-
-    def test_105_jpg_dispvm(self):
-        filename = "/home/user/test_file.jpg"
-        self.prepare_jpg(filename)
-        self.open_file_and_check_viewer(filename, [],
-                                        ["shotwell", "eog", "display"],
-                                        dispvm=True)
-
-    def test_106_jpeg_dispvm(self):
-        filename = "/home/user/test_file.jpeg"
-        self.prepare_jpg(filename)
-        self.open_file_and_check_viewer(filename, [],
-                                        ["shotwell", "eog", "display"],
-                                        dispvm=True)
-
-    def test_110_url_dispvm(self):
-        self.open_file_and_check_viewer("https://www.qubes-os.org/", [],
-                                        ["Firefox", "Iceweasel", "Navigator"],
-                                        dispvm=True)
-
 
 def load_tests(loader, tests, pattern):
     try:
@@ -1705,21 +880,4 @@ def load_tests(loader, tests, pattern):
                 (TC_00_AppVMMixin, qubes.tests.QubesTestCase),
                 {'template': template})))
 
-        tests.addTests(loader.loadTestsFromTestCase(
-            type(
-                'TC_20_DispVM_' + template,
-                (TC_20_DispVMMixin, qubes.tests.QubesTestCase),
-                {'template': template})))
-        tests.addTests(loader.loadTestsFromTestCase(
-            type(
-                'TC_40_PVGrub_' + template,
-                (TC_40_PVGrub, qubes.tests.QubesTestCase),
-                {'template': template})))
-
-        tests.addTests(loader.loadTestsFromTestCase(
-            type(
-                'TC_50_MimeHandlers_' + template,
-                (TC_50_MimeHandlers, qubes.tests.QubesTestCase),
-                {'template': template})))
-
     return tests