diff --git a/tests/vm_qrexec_gui.py b/tests/vm_qrexec_gui.py index 3b1b776f..20ce71ca 100644 --- a/tests/vm_qrexec_gui.py +++ b/tests/vm_qrexec_gui.py @@ -293,3 +293,169 @@ class VmRunningTests(unittest.TestCase): "/home/user/QubesIncoming/%s/passwd" % self.testvm1.name, wait=True) self.assertEqual(retcode, 0, "file differs") +class DispVmTests(unittest.TestCase): + def setUp(self): + self.qc = QubesVmCollection() + self.qc.lock_db_for_reading() + self.qc.load() + self.qc.unlock_db() + + def remove_vms(self, vms): + self.qc.lock_db_for_writing() + self.qc.load() + + for vm in vms: + if isinstance(vm, str): + vm = self.qc.get_vm_by_name(vm) + else: + vm = self.qc[vm.qid] + if vm.is_running(): + try: + vm.force_shutdown() + except: + pass + try: + vm.remove_from_disk() + except OSError: + pass + self.qc.pop(vm.qid) + self.qc.save() + self.qc.unlock_db() + + def tearDown(self): + vmlist = [vm for vm in self.qc.values() if vm.name.startswith( + VM_PREFIX)] + self.remove_vms(vmlist) + + def test_000_prepare_dvm(self): + retcode = subprocess.call(['/usr/bin/qvm-create-default-dvm', + '--default-template'], + stderr=open(os.devnull, 'w')) + self.assertEqual(retcode, 0) + self.qc.lock_db_for_reading() + self.qc.load() + self.qc.unlock_db() + self.assertIsNotNone(self.qc.get_vm_by_name( + self.qc.get_default_template().name + "-dvm")) + # TODO: check mtime of snapshot file + + def test_010_simple_dvm_run(self): + p = subprocess.Popen(['/usr/lib/qubes/qfile-daemon-dvm', + 'qubes.VMShell', 'dom0', 'DEFAULT'], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=open(os.devnull, 'w')) + (stdout, _) = p.communicate(input="echo test") + self.assertEqual(stdout, "test\n") + # TODO: check if DispVM is destroyed + + def test_020_gui_app(self): + p = subprocess.Popen(['/usr/lib/qubes/qfile-daemon-dvm', + 'qubes.VMShell', 'dom0', 'DEFAULT'], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=open(os.devnull, 'w')) + # wait for DispVM startup: + p.stdin.write("echo test\n") + p.stdin.flush() + l = p.stdout.readline() + self.assertEqual(l, "test\n") + # potential race condition, but our tests are supposed to be + # running on dedicated machine, so should not be a problem + self.qc.lock_db_for_reading() + self.qc.load() + self.qc.unlock_db() + max_qid = 0 + for vm in self.qc.values(): + if not vm.is_disposablevm(): + continue + if vm.qid > max_qid: + max_qid = vm.qid + dispvm = self.qc[max_qid] + self.assertNotEqual(dispvm.qid, 0, "DispVM not found in qubes.xml") + self.assertTrue(dispvm.is_running()) + p.stdin.write("gnome-terminal\n") + wait_count = 0 + window_title = 'user@%s' % (dispvm.template.name + "-dvm") + while subprocess.call(['xdotool', 'search', '--name', window_title], + stdout=open(os.path.devnull, 'w'), + stderr=subprocess.STDOUT) > 0: + wait_count += 1 + if wait_count > 100: + self.fail("Timeout while waiting for gnome-terminal window") + time.sleep(0.1) + + time.sleep(0.5) + subprocess.check_call(['xdotool', 'search', '--name', window_title, + 'windowactivate', 'type', 'exit\n']) + + wait_count = 0 + while subprocess.call(['xdotool', 'search', '--name', window_title], + stdout=open(os.path.devnull, 'w'), + stderr=subprocess.STDOUT) == 0: + wait_count += 1 + if wait_count > 100: + self.fail("Timeout while waiting for gnome-terminal " + "termination") + time.sleep(0.1) + + p.stdin.close() + + wait_count = 0 + while dispvm.is_running(): + wait_count += 1 + if wait_count > 100: + self.fail("Timeout while waiting for DispVM destruction") + time.sleep(0.1) + wait_count = 0 + while p.poll() is None: + wait_count += 1 + if wait_count > 100: + self.fail("Timeout while waiting for qfile-daemon-dvm " + "termination") + time.sleep(0.1) + self.assertEqual(p.returncode, 0) + + self.qc.lock_db_for_reading() + self.qc.load() + self.qc.unlock_db() + self.assertIsNone(self.qc.get_vm_by_name(dispvm.name), + "DispVM not removed from qubes.xml") + + def test_030_edit_file(self): + self.qc.lock_db_for_writing() + self.qc.load() + self.testvm1 = self.qc.add_new_vm("QubesAppVm", + name="%svm1" % VM_PREFIX, + template=self.qc.get_default_template()) + self.testvm1.create_on_disk(verbose=False) + self.qc.save() + self.qc.unlock_db() + + self.testvm1.start() + self.testvm1.run("echo test1 > /home/user/test.txt", wait=True) + p = self.testvm1.run("qvm-open-in-dvm /home/user/test.txt", + passio_popen=True) + + wait_count = 0 + # TODO: ensure that gedit is default editor? + window_title = '(/tmp/%s)' % self.testvm1.name + while subprocess.call(['xdotool', 'search', '--name', window_title], + stdout=open(os.path.devnull, 'w'), + stderr=subprocess.STDOUT) > 0: + wait_count += 1 + if wait_count > 100: + self.fail("Timeout while waiting for editor window") + time.sleep(0.3) + + time.sleep(0.5) + subprocess.check_call(['xdotool', 'search', '--name', window_title, + 'windowactivate', 'type', 'test line 2\n']) + subprocess.check_call(['xdotool', 'search', '--name', window_title, + 'key', 'ctrl+s', 'ctrl+q']) + p.wait() + p = self.testvm1.run("cat /home/user/test.txt", + passio_popen=True) + (test_txt_content, _) = p.communicate() + self.assertEqual(test_txt_content, "test line 2\ntest1\n") +