tests: DispVM tests
This commit is contained in:
		
							parent
							
								
									256e35a62c
								
							
						
					
					
						commit
						49ca49017e
					
				@ -293,3 +293,169 @@ class VmRunningTests(unittest.TestCase):
 | 
			
		||||
                         "/home/user/QubesIncoming/%s/passwd" % self.testvm1.name, wait=True)
 | 
			
		||||
        self.assertEqual(retcode, 0, "file differs")
 | 
			
		||||
 | 
			
		||||
class DispVmTests(unittest.TestCase):
 | 
			
		||||
    def setUp(self):
 | 
			
		||||
        self.qc = QubesVmCollection()
 | 
			
		||||
        self.qc.lock_db_for_reading()
 | 
			
		||||
        self.qc.load()
 | 
			
		||||
        self.qc.unlock_db()
 | 
			
		||||
 | 
			
		||||
    def remove_vms(self, vms):
 | 
			
		||||
        self.qc.lock_db_for_writing()
 | 
			
		||||
        self.qc.load()
 | 
			
		||||
 | 
			
		||||
        for vm in vms:
 | 
			
		||||
            if isinstance(vm, str):
 | 
			
		||||
                vm = self.qc.get_vm_by_name(vm)
 | 
			
		||||
            else:
 | 
			
		||||
                vm = self.qc[vm.qid]
 | 
			
		||||
            if vm.is_running():
 | 
			
		||||
                try:
 | 
			
		||||
                    vm.force_shutdown()
 | 
			
		||||
                except:
 | 
			
		||||
                    pass
 | 
			
		||||
            try:
 | 
			
		||||
                vm.remove_from_disk()
 | 
			
		||||
            except OSError:
 | 
			
		||||
                pass
 | 
			
		||||
            self.qc.pop(vm.qid)
 | 
			
		||||
        self.qc.save()
 | 
			
		||||
        self.qc.unlock_db()
 | 
			
		||||
 | 
			
		||||
    def tearDown(self):
 | 
			
		||||
        vmlist = [vm for vm in self.qc.values() if vm.name.startswith(
 | 
			
		||||
            VM_PREFIX)]
 | 
			
		||||
        self.remove_vms(vmlist)
 | 
			
		||||
 | 
			
		||||
    def test_000_prepare_dvm(self):
 | 
			
		||||
        retcode = subprocess.call(['/usr/bin/qvm-create-default-dvm',
 | 
			
		||||
                                   '--default-template'],
 | 
			
		||||
                                  stderr=open(os.devnull, 'w'))
 | 
			
		||||
        self.assertEqual(retcode, 0)
 | 
			
		||||
        self.qc.lock_db_for_reading()
 | 
			
		||||
        self.qc.load()
 | 
			
		||||
        self.qc.unlock_db()
 | 
			
		||||
        self.assertIsNotNone(self.qc.get_vm_by_name(
 | 
			
		||||
            self.qc.get_default_template().name + "-dvm"))
 | 
			
		||||
        # TODO: check mtime of snapshot file
 | 
			
		||||
 | 
			
		||||
    def test_010_simple_dvm_run(self):
 | 
			
		||||
        p = subprocess.Popen(['/usr/lib/qubes/qfile-daemon-dvm',
 | 
			
		||||
                              'qubes.VMShell', 'dom0', 'DEFAULT'],
 | 
			
		||||
                             stdin=subprocess.PIPE,
 | 
			
		||||
                             stdout=subprocess.PIPE,
 | 
			
		||||
                             stderr=open(os.devnull, 'w'))
 | 
			
		||||
        (stdout, _) = p.communicate(input="echo test")
 | 
			
		||||
        self.assertEqual(stdout, "test\n")
 | 
			
		||||
        # TODO: check if DispVM is destroyed
 | 
			
		||||
 | 
			
		||||
    def test_020_gui_app(self):
 | 
			
		||||
        p = subprocess.Popen(['/usr/lib/qubes/qfile-daemon-dvm',
 | 
			
		||||
                              'qubes.VMShell', 'dom0', 'DEFAULT'],
 | 
			
		||||
                             stdin=subprocess.PIPE,
 | 
			
		||||
                             stdout=subprocess.PIPE,
 | 
			
		||||
                             stderr=open(os.devnull, 'w'))
 | 
			
		||||
        # wait for DispVM startup:
 | 
			
		||||
        p.stdin.write("echo test\n")
 | 
			
		||||
        p.stdin.flush()
 | 
			
		||||
        l = p.stdout.readline()
 | 
			
		||||
        self.assertEqual(l, "test\n")
 | 
			
		||||
        # potential race condition, but our tests are supposed to be
 | 
			
		||||
        # running on dedicated machine, so should not be a problem
 | 
			
		||||
        self.qc.lock_db_for_reading()
 | 
			
		||||
        self.qc.load()
 | 
			
		||||
        self.qc.unlock_db()
 | 
			
		||||
        max_qid = 0
 | 
			
		||||
        for vm in self.qc.values():
 | 
			
		||||
            if not vm.is_disposablevm():
 | 
			
		||||
                continue
 | 
			
		||||
            if vm.qid > max_qid:
 | 
			
		||||
                max_qid = vm.qid
 | 
			
		||||
        dispvm = self.qc[max_qid]
 | 
			
		||||
        self.assertNotEqual(dispvm.qid, 0, "DispVM not found in qubes.xml")
 | 
			
		||||
        self.assertTrue(dispvm.is_running())
 | 
			
		||||
        p.stdin.write("gnome-terminal\n")
 | 
			
		||||
        wait_count = 0
 | 
			
		||||
        window_title = 'user@%s' % (dispvm.template.name + "-dvm")
 | 
			
		||||
        while subprocess.call(['xdotool', 'search', '--name', window_title],
 | 
			
		||||
                              stdout=open(os.path.devnull, 'w'),
 | 
			
		||||
                              stderr=subprocess.STDOUT) > 0:
 | 
			
		||||
            wait_count += 1
 | 
			
		||||
            if wait_count > 100:
 | 
			
		||||
                self.fail("Timeout while waiting for gnome-terminal window")
 | 
			
		||||
            time.sleep(0.1)
 | 
			
		||||
 | 
			
		||||
        time.sleep(0.5)
 | 
			
		||||
        subprocess.check_call(['xdotool', 'search', '--name', window_title,
 | 
			
		||||
                              'windowactivate', 'type', 'exit\n'])
 | 
			
		||||
 | 
			
		||||
        wait_count = 0
 | 
			
		||||
        while subprocess.call(['xdotool', 'search', '--name', window_title],
 | 
			
		||||
                              stdout=open(os.path.devnull, 'w'),
 | 
			
		||||
                              stderr=subprocess.STDOUT) == 0:
 | 
			
		||||
            wait_count += 1
 | 
			
		||||
            if wait_count > 100:
 | 
			
		||||
                self.fail("Timeout while waiting for gnome-terminal "
 | 
			
		||||
                          "termination")
 | 
			
		||||
            time.sleep(0.1)
 | 
			
		||||
 | 
			
		||||
        p.stdin.close()
 | 
			
		||||
 | 
			
		||||
        wait_count = 0
 | 
			
		||||
        while dispvm.is_running():
 | 
			
		||||
            wait_count += 1
 | 
			
		||||
            if wait_count > 100:
 | 
			
		||||
                self.fail("Timeout while waiting for DispVM destruction")
 | 
			
		||||
            time.sleep(0.1)
 | 
			
		||||
        wait_count = 0
 | 
			
		||||
        while p.poll() is None:
 | 
			
		||||
            wait_count += 1
 | 
			
		||||
            if wait_count > 100:
 | 
			
		||||
                self.fail("Timeout while waiting for qfile-daemon-dvm "
 | 
			
		||||
                          "termination")
 | 
			
		||||
            time.sleep(0.1)
 | 
			
		||||
        self.assertEqual(p.returncode, 0)
 | 
			
		||||
 | 
			
		||||
        self.qc.lock_db_for_reading()
 | 
			
		||||
        self.qc.load()
 | 
			
		||||
        self.qc.unlock_db()
 | 
			
		||||
        self.assertIsNone(self.qc.get_vm_by_name(dispvm.name),
 | 
			
		||||
                          "DispVM not removed from qubes.xml")
 | 
			
		||||
 | 
			
		||||
    def test_030_edit_file(self):
 | 
			
		||||
        self.qc.lock_db_for_writing()
 | 
			
		||||
        self.qc.load()
 | 
			
		||||
        self.testvm1 = self.qc.add_new_vm("QubesAppVm",
 | 
			
		||||
                                         name="%svm1" % VM_PREFIX,
 | 
			
		||||
                                         template=self.qc.get_default_template())
 | 
			
		||||
        self.testvm1.create_on_disk(verbose=False)
 | 
			
		||||
        self.qc.save()
 | 
			
		||||
        self.qc.unlock_db()
 | 
			
		||||
 | 
			
		||||
        self.testvm1.start()
 | 
			
		||||
        self.testvm1.run("echo test1 > /home/user/test.txt", wait=True)
 | 
			
		||||
        p = self.testvm1.run("qvm-open-in-dvm /home/user/test.txt",
 | 
			
		||||
                         passio_popen=True)
 | 
			
		||||
 | 
			
		||||
        wait_count = 0
 | 
			
		||||
        # TODO: ensure that gedit is default editor?
 | 
			
		||||
        window_title = '(/tmp/%s)' % self.testvm1.name
 | 
			
		||||
        while subprocess.call(['xdotool', 'search', '--name', window_title],
 | 
			
		||||
                              stdout=open(os.path.devnull, 'w'),
 | 
			
		||||
                              stderr=subprocess.STDOUT) > 0:
 | 
			
		||||
            wait_count += 1
 | 
			
		||||
            if wait_count > 100:
 | 
			
		||||
                self.fail("Timeout while waiting for editor window")
 | 
			
		||||
            time.sleep(0.3)
 | 
			
		||||
 | 
			
		||||
        time.sleep(0.5)
 | 
			
		||||
        subprocess.check_call(['xdotool', 'search', '--name', window_title,
 | 
			
		||||
                               'windowactivate', 'type', 'test line 2\n'])
 | 
			
		||||
        subprocess.check_call(['xdotool', 'search', '--name', window_title,
 | 
			
		||||
                               'key', 'ctrl+s', 'ctrl+q'])
 | 
			
		||||
        p.wait()
 | 
			
		||||
        p = self.testvm1.run("cat /home/user/test.txt",
 | 
			
		||||
                         passio_popen=True)
 | 
			
		||||
        (test_txt_content, _) = p.communicate()
 | 
			
		||||
        self.assertEqual(test_txt_content, "test line 2\ntest1\n")
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
		Loading…
	
		Reference in New Issue
	
	Block a user