From 226e257d2aff4c5bd10bf39978b51eca70fb4cde Mon Sep 17 00:00:00 2001 From: Wojtek Porczyk Date: Thu, 5 Feb 2015 15:46:40 +0100 Subject: [PATCH] tests: refactored --- tests/Makefile | 2 + tests/__init__.py | 379 +++++++++++++++++++++++++++++ tests/backup.py | 261 ++++---------------- tests/backupcompatibility.py | 457 +++++++++++++++++++++++------------ tests/basic.py | 147 ++++------- tests/network.py | 159 +++++------- tests/regressions.py | 58 +++++ tests/vm_qrexec_gui.py | 232 ++++++------------ 8 files changed, 971 insertions(+), 724 deletions(-) create mode 100644 tests/regressions.py diff --git a/tests/Makefile b/tests/Makefile index 08c3a8c6..b0829491 100644 --- a/tests/Makefile +++ b/tests/Makefile @@ -23,3 +23,5 @@ endif cp network.py[co] $(DESTDIR)$(PYTHON_TESTSPATH) cp vm_qrexec_gui.py $(DESTDIR)$(PYTHON_TESTSPATH) cp vm_qrexec_gui.py[co] $(DESTDIR)$(PYTHON_TESTSPATH) + cp regressions.py $(DESTDIR)$(PYTHON_TESTSPATH) + cp regressions.py[co] $(DESTDIR)$(PYTHON_TESTSPATH) diff --git a/tests/__init__.py b/tests/__init__.py index e69de29b..ecb8964b 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -0,0 +1,379 @@ +#!/usr/bin/python2 -O +# vim: fileencoding=utf-8 + +# +# The Qubes OS Project, https://www.qubes-os.org/ +# +# Copyright (C) 2014-2015 +# Marek Marczykowski-Górecki +# Copyright (C) 2015 Wojtek Porczyk +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +# + +import multiprocessing +import os +import shutil +import unittest + +import libvirt + +import qubes.backup +import qubes.qubes + +VMPREFIX = 'test-' + +class _AssertNotRaisesContext(object): + """A context manager used to implement TestCase.assertNotRaises methods. + + Stolen from unittest and hacked. Regexp support stripped. + """ + + def __init__(self, expected, test_case, expected_regexp=None): + self.expected = expected + self.failureException = test_case.failureException + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, tb): + if exc_type is None: + return True + + try: + exc_name = self.expected.__name__ + except AttributeError: + exc_name = str(self.expected) + + if issubclass(exc_type, self.expected): + raise self.failureException( + "{0} raised".format(exc_name)) + else: + # pass through + return False + + self.exception = exc_value # store for later retrieval + + + +class SystemTestsMixin(object): + def setUp(self): + '''Set up the test. + + .. warning:: + This method instantiates QubesVmCollection acquires write lock for + it. You can use is as :py:attr:`qc`. Do not release the lock + yourself. + ''' + + super(SystemTestsMixin, self).setUp() + + self.qc = qubes.qubes.QubesVmCollection() + self.qc.lock_db_for_writing() + self.qc.load() + + self.conn = libvirt.open(qubes.qubes.defaults['libvirt_uri']) + + self.remove_test_vms() + + + def tearDown(self): + super(SystemTestsMixin, self).tearDown() + + self.remove_test_vms() + + self.qc.save() + self.qc.unlock_db() + del self.qc + + self.conn.close() + + + def assertNotRaises(self, excClass, callableObj=None, *args, **kwargs): + """Fail if an exception of class excClass is raised + by callableObj when invoked with arguments args and keyword + arguments kwargs. If a different type of exception is + raised, it will not be caught, and the test case will be + deemed to have suffered an error, exactly as for an + unexpected exception. + + If called with callableObj omitted or None, will return a + context object used like this:: + + with self.assertRaises(SomeException): + do_something() + + The context manager keeps a reference to the exception as + the 'exception' attribute. This allows you to inspect the + exception after the assertion:: + + with self.assertRaises(SomeException) as cm: + do_something() + the_exception = cm.exception + self.assertEqual(the_exception.error_code, 3) + """ + context = _AssertNotRaisesContext(excClass, self) + if callableObj is None: + return context + with context: + callableObj(*args, **kwargs) + + + def make_vm_name(self, name): + return VMPREFIX + name + + + def _remove_vm_qubes(self, vm): + vmname = vm.name + + try: + # XXX .is_running() may throw libvirtError if undefined + if vm.is_running(): + vm.force_shutdown() + except: pass + + try: vm.remove_from_disk() + except: pass + + try: vm.libvirtDomain.undefine() + except (AttributeError, libvirt.libvirtError): pass + + self.qc.pop(vm.qid) + self.qc.save() + del vm + + # Now ensure it really went away. This may not have happened, + # for example if vm.libvirtDomain malfunctioned. + try: + dom = self.conn.lookupByName(vmname) + except: pass + else: + self._remove_vm_libvirt(dom) + + self._remove_vm_disk(vmname) + + + def _remove_vm_libvirt(self, dom): + try: + dom.destroy() + except libvirt.libvirtError: # not running + pass + dom.undefine() + + + def _remove_vm_disk(self, vmname): + for dirspec in ( + 'qubes_appvms_dir', + 'qubes_servicevms_dir', + 'qubes_templates_dir'): + dirpath = os.path.join(qubes.qubes.system_path['qubes_base_dir'], + qubes.qubes.system_path[dirspec], vmname) + if os.path.exists(dirpath): + if os.path.isdir(dirpath): + shutil.rmtree(dirpath) + else: + os.unlink(dirpath) + + + def remove_vms(self, vms): + for vm in vms: self._remove_vm_qubes(vm) + + + def remove_test_vms(self): + '''Aggresively remove any domain that has name in testing namespace. + + .. warning:: + The test suite hereby claims any domain whose name starts with + :py:data:`VMPREFIX` as fair game. This is needed to enforce sane + test executing environment. If you have domains named ``test-*``, + don't run the tests. + ''' + + # first, remove them Qubes-way + for vm in self.qc.values(): + if vm.name.startswith(VMPREFIX): + self._remove_vm_qubes(vm) + + # now remove what was only in libvirt + for dom in self.conn.listAllDomains(): + if dom.name().startswith(VMPREFIX): + self._remove_vm_libvirt(dom) + + # finally remove anything that is left on disk + vmnames = set() + for dirspec in ( + 'qubes_appvms_dir', + 'qubes_servicevms_dir', + 'qubes_templates_dir'): + dirpath = os.path.join(qubes.qubes.system_path['qubes_base_dir'], + qubes.qubes.system_path[dirspec]) + for name in os.listdir(dirpath): + if name.startswith(VMPREFIX): + vmnames.add(name) + for vmname in vmnames: + self._remove_vm_disk(vmname) + + + +class BackupTestsMixin(SystemTestsMixin): + def setUp(self): + super(BackupTestsMixin, self).setUp() + self.error_detected = multiprocessing.Queue() + self.verbose = False + + if self.verbose: + print >>sys.stderr, "-> Creating backupvm" + + self.backupvm = self.qc.add_new_vm("QubesAppVm", + name=self.make_vm_name('backupvm'), + template=self.qc.get_default_template()) + self.backupvm.create_on_disk(verbose=self.verbose) + + self.backupdir = os.path.join(os.environ["HOME"], "test-backup") + if os.path.exists(self.backupdir): + shutil.rmtree(self.backupdir) + os.mkdir(self.backupdir) + + + def tearDown(self): + super(BackupTestsMixin, self).tearDown() + shutil.rmtree(self.backupdir) + + + def print_progress(self, progress): + if self.verbose: + print >> sys.stderr, "\r-> Backing up files: {0}%...".format(progress) + + + def error_callback(self, message): + self.error_detected.put(message) + if self.verbose: + print >> sys.stderr, "ERROR: {0}".format(message) + + + def print_callback(self, msg): + if self.verbose: + print msg + + + def fill_image(self, path, size=None, sparse=False): + block_size = 4096 + + if self.verbose: + print >>sys.stderr, "-> Filling %s" % path + f = open(path, 'w+') + if size is None: + f.seek(0, 2) + size = f.tell() + f.seek(0) + + for block_num in xrange(size/block_size): + f.write('a' * block_size) + if sparse: + f.seek(block_size, 1) + + f.close() + + + # NOTE: this was create_basic_vms + def create_backup_vms(self): + template=self.qc.get_default_template() + + vms = [] + vmname = self.make_vm_name('test1') + if self.verbose: + print >>sys.stderr, "-> Creating %s" % vmname + testvm1 = self.qc.add_new_vm('QubesAppVm', + name=vmname, template=template) + testvm1.create_on_disk(verbose=self.verbose) + vms.append(testvm1) + self.fill_image(testvm1.private_img, 100*1024*1024) + + vmname = self.make_vm_name('testhvm1') + if self.verbose: + print >>sys.stderr, "-> Creating %s" % vmname + testvm2 = self.qc.add_new_vm('QubesHVm', name=vmname) + testvm2.create_on_disk(verbose=self.verbose) + self.fill_image(testvm2.root_img, 1024*1024*1024, True) + vms.append(testvm2) + + return vms + + + def make_backup(self, vms, prepare_kwargs=dict(), do_kwargs=dict(), + target=None): + # XXX: bakup_prepare and backup_do don't support host_collection + self.qc.unlock_db() + if target is None: + target = self.backupdir + try: + files_to_backup = \ + qubes.backup.backup_prepare(vms, + print_callback=self.print_callback, + **prepare_kwargs) + except QubesException as e: + self.fail("QubesException during backup_prepare: %s" % str(e)) + + try: + qubes.backup.backup_do(target, files_to_backup, "qubes", + progress_callback=self.print_progress, + **do_kwargs) + except QubesException as e: + self.fail("QubesException during backup_do: %s" % str(e)) + + self.qc.lock_db_for_writing() + self.qc.load() + + + def restore_backup(self, source=None, appvm=None): + if source is None: + backupfile = os.path.join(self.backupdir, + sorted(os.listdir(self.backupdir))[-1]) + else: + backupfile = source + + with self.assertNotRaises(qubes.qubes.QubesException): + backup_info = qubes.backup.backup_restore_prepare( + backupfile, "qubes", + host_collection=self.qc, + print_callback=self.print_callback, + appvm=appvm) + + if self.verbose: + qubes.backup.backup_restore_print_summary(backup_info) + + with self.assertNotRaises(qubes.qubes.QubesException): + qubes.backup.backup_restore_do( + backup_info, + host_collection=self.qc, + print_callback=self.print_callback if self.verbose else None, + error_callback=self.error_callback) + + errors = [] + while not self.error_detected.empty(): + errors.append(self.error_detected.get()) + self.assertTrue(len(errors) == 0, + "Error(s) detected during backup_restore_do: %s" % + '\n'.join(errors)) + if not appvm: + os.unlink(backupfile) + + + def create_sparse(self, path, size): + f = open(path, "w") + f.truncate(size) + f.close() + +# vim: ts=4 sw=4 et diff --git a/tests/backup.py b/tests/backup.py index c430e8d8..f143b335 100644 --- a/tests/backup.py +++ b/tests/backup.py @@ -1,217 +1,60 @@ #!/usr/bin/python -# -*- coding: utf-8 -*- +# vim: fileencoding=utf-8 + # -# The Qubes OS Project, http://www.qubes-os.org +# The Qubes OS Project, https://www.qubes-os.org/ # -# Copyright (C) 2014 Marek Marczykowski-Górecki +# Copyright (C) 2014-2015 +# Marek Marczykowski-Górecki +# Copyright (C) 2015 Wojtek Porczyk # -# This program is free software; you can redistribute it and/or -# modify it under the terms of the GNU General Public License -# as published by the Free Software Foundation; either version 2 -# of the License, or (at your option) any later version. +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. # -# -from multiprocessing import Event, Queue + import os -import shutil import unittest import sys -from qubes.qubes import QubesVmCollection, QubesException -from qubes import backup -VM_PREFIX = "test-" +import qubes.tests -class BackupTests(unittest.TestCase): - def setUp(self): - self.error_detected = Queue() - self.verbose = False - self.qc = QubesVmCollection() - self.qc.lock_db_for_writing() - self.qc.load() - - if self.verbose: - print >>sys.stderr, "-> Creating backupvm" - self.backupvm = self.qc.add_new_vm("QubesAppVm", - name="%sbackupvm" % VM_PREFIX, - template=self.qc.get_default_template()) - self.backupvm.create_on_disk(verbose=self.verbose) - self.qc.save() - self.qc.unlock_db() - - self.backupdir = os.path.join(os.environ["HOME"], "test-backup") - os.mkdir(self.backupdir) - - - def tearDown(self): - if self.backupvm.is_running(): - self.backupvm.force_shutdown() - vmlist = [vm for vm in self.qc.values() if vm.name.startswith( - VM_PREFIX)] - self.remove_vms(vmlist) - shutil.rmtree(self.backupdir) - - def print_progress(self, progress): - if self.verbose: - print >> sys.stderr, "\r-> Backing up files: {0}%...".format(progress) - - def error_callback(self, message): - self.error_detected.put(message) - if self.verbose: - print >> sys.stderr, "ERROR: {0}".format(message) - - def print_callback(self, msg): - if self.verbose: - print msg - - def fill_image(self, path, size=None, sparse=False): - block_size = 4096 - - if self.verbose: - print >>sys.stderr, "-> Filling %s" % path - f = open(path, 'w+') - if size is None: - f.seek(0, 2) - size = f.tell() - f.seek(0) - - for block_num in xrange(size/block_size): - f.write('a' * block_size) - if sparse: - f.seek(block_size, 1) - - f.close() - - def create_basic_vms(self, leave_locked=False): - template=self.qc.get_default_template() - - vms = [] - self.qc.lock_db_for_writing() - self.qc.load() - vmname = "%stest1" % VM_PREFIX - if self.verbose: - print >>sys.stderr, "-> Creating %s" % vmname - testvm1 = self.qc.add_new_vm("QubesAppVm", - name=vmname, - template=template) - testvm1.create_on_disk(verbose=self.verbose) - vms.append(testvm1) - self.fill_image(testvm1.private_img, 100*1024*1024) - - vmname = "%stesthvm1" % VM_PREFIX - if self.verbose: - print >>sys.stderr, "-> Creating %s" % vmname - testvm2 = self.qc.add_new_vm("QubesHVm", - name=vmname) - testvm2.create_on_disk(verbose=self.verbose) - self.fill_image(testvm2.root_img, 1024*1024*1024, True) - vms.append(testvm2) - - if not leave_locked: - self.qc.save() - self.qc.unlock_db() - - return vms - - def remove_vms(self, vms): - self.qc.lock_db_for_writing() - self.qc.load() - - for vm in vms: - if isinstance(vm, str): - vm = self.qc.get_vm_by_name(vm) - else: - vm = self.qc[vm.qid] - if self.verbose: - print >>sys.stderr, "-> Removing %s" % vm.name - vm.remove_from_disk() - self.qc.pop(vm.qid) - self.qc.save() - self.qc.unlock_db() - - def make_backup(self, vms, prepare_kwargs=dict(), do_kwargs=dict(), - target=None): - if target is None: - target = self.backupdir - try: - files_to_backup = \ - backup.backup_prepare(vms, - print_callback=self.print_callback, - **prepare_kwargs) - except QubesException as e: - self.fail("QubesException during backup_prepare: %s" % str(e)) - - try: - backup.backup_do(target, files_to_backup, "qubes", - progress_callback=self.print_progress, - **do_kwargs) - except QubesException as e: - self.fail("QubesException during backup_do: %s" % str(e)) - - def restore_backup(self, source=None, appvm=None): - if source is None: - backupfile = os.path.join(self.backupdir, - sorted(os.listdir(self.backupdir))[-1]) - else: - backupfile = source - try: - backup_info = backup.backup_restore_prepare( - backupfile, "qubes", print_callback=self.print_callback, - appvm=appvm) - except QubesException as e: self.fail( - "QubesException during backup_restore_prepare: %s" % str(e)) - if self.verbose: - backup.backup_restore_print_summary(backup_info) - - try: - backup.backup_restore_do( - backup_info, - print_callback=self.print_callback if self.verbose else None, - error_callback=self.error_callback) - except QubesException as e: - self.fail("QubesException during backup_restore_do: %s" % str(e)) - errors = [] - while not self.error_detected.empty(): - errors.append(self.error_detected.get()) - self.assertTrue(len(errors) == 0, - "Error(s) detected during backup_restore_do: %s" % - '\n'.join(errors)) - if not appvm: - os.unlink(backupfile) - - def test_basic_backup(self): - vms = self.create_basic_vms() +class TC_00_Backup(qubes.tests.BackupTestsMixin, unittest.TestCase): + def test_000_basic_backup(self): + vms = self.create_backup_vms() self.make_backup(vms) self.remove_vms(vms) self.restore_backup() self.remove_vms(vms) - def test_compressed_backup(self): - vms = self.create_basic_vms() + def test_001_compressed_backup(self): + vms = self.create_backup_vms() self.make_backup(vms, do_kwargs={'compressed': True}) self.remove_vms(vms) self.restore_backup() self.remove_vms(vms) - def test_encrypted_backup(self): - vms = self.create_basic_vms() + def test_002_encrypted_backup(self): + vms = self.create_backup_vms() self.make_backup(vms, do_kwargs={'encrypted': True}) self.remove_vms(vms) self.restore_backup() self.remove_vms(vms) - def test_compressed_encrypted_backup(self): - vms = self.create_basic_vms() + def test_003_compressed_encrypted_backup(self): + vms = self.create_backup_vms() self.make_backup(vms, do_kwargs={ 'compressed': True, @@ -220,8 +63,30 @@ class BackupTests(unittest.TestCase): self.restore_backup() self.remove_vms(vms) - def test_send_to_vm(self): - vms = self.create_basic_vms() + + def test_004_sparse_multipart(self): + vms = [] + + vmname = self.make_vm_name('testhvm2') + if self.verbose: + print >>sys.stderr, "-> Creating %s" % vmname + + hvmtemplate = self.qc.add_new_vm("QubesTemplateHVm", name=vmname) + hvmtemplate.create_on_disk(verbose=self.verbose) + self.fill_image(os.path.join(hvmtemplate.dir_path, '00file'), + 195*1024*1024-4096*3) + self.fill_image(hvmtemplate.private_img, 195*1024*1024-4096*3) + self.fill_image(hvmtemplate.root_img, 1024*1024*1024, sparse=True) + vms.append(hvmtemplate) + + self.make_backup(vms) + self.remove_vms(vms) + self.restore_backup() + self.remove_vms(vms) + + + def test_100_send_to_vm(self): + vms = self.create_backup_vms() self.backupvm.start() self.make_backup(vms, do_kwargs={ @@ -233,33 +98,3 @@ class BackupTests(unittest.TestCase): self.restore_backup(source='dd if=/var/tmp/backup-test', appvm=self.backupvm) self.remove_vms(vms) - - def test_sparse_multipart(self): - vms = [] - self.qc.lock_db_for_writing() - self.qc.load() - - vmname = "%stesthvm2" % VM_PREFIX - if self.verbose: - print >>sys.stderr, "-> Creating %s" % vmname - - hvmtemplate = self.qc.add_new_vm("QubesTemplateHVm", - name=vmname) - hvmtemplate.create_on_disk(verbose=self.verbose) - self.fill_image(os.path.join(hvmtemplate.dir_path, '00file'), - 195*1024*1024-4096*3) - self.fill_image(hvmtemplate.private_img, 195*1024*1024-4096*3) - self.fill_image(hvmtemplate.root_img, 1024*1024*1024, sparse=True) - vms.append(hvmtemplate) - - self.qc.save() - self.qc.unlock_db() - - self.make_backup(vms) - self.remove_vms(vms) - self.restore_backup() - self.remove_vms(vms) - - - - diff --git a/tests/backupcompatibility.py b/tests/backupcompatibility.py index 72f677bc..5e35ab4f 100644 --- a/tests/backupcompatibility.py +++ b/tests/backupcompatibility.py @@ -31,28 +31,157 @@ import sys from qubes.qubes import QubesVmCollection, QubesException from qubes import backup -VM_PREFIX = "test-" +import qubes.tests -QUBESXML_R2B2 = """ - - - - - - - - - - - - - - - +QUBESXML_R2B2 = ''' + + + + + + + + + + + + + + + -""" +''' -APPTEMPLATE_R2B2 = """ +APPTEMPLATE_R2B2 = ''' [Desktop Entry] Name=%VMNAME%: {name} GenericName=%VMNAME%: {name} @@ -100,116 +229,147 @@ Exec=qvm-run -q --tray -a %VMNAME% '{command} %u' Categories=Network;WebBrowser; X-Qubes-VmName=%VMNAME% Icon=%VMDIR%/icon.png -""" +''' -QUBESXML_R1 = """ - -""" - -class BackupCompatibilityTests(unittest.TestCase): - def setUp(self): - self.error_detected = Queue() - self.verbose = False - self.qc = QubesVmCollection() - self.reload_qc() - - self.backupdir = os.path.join(os.environ["HOME"], "test-backup") - os.mkdir(self.backupdir) - - def tearDown(self): - vmlist = [vm for vm in self.qc.values() if vm.name.startswith( - VM_PREFIX)] - self.remove_vms(vmlist) - shutil.rmtree(self.backupdir) - - def reload_qc(self): - self.qc.lock_db_for_reading() - self.qc.load() - self.qc.unlock_db() - - def print_progress(self, progress): - if self.verbose: - print >> sys.stderr, "\r-> Backing up files: {0}%...".format(progress) - - def error_callback(self, message): - self.error_detected.put(message) - if self.verbose: - print >> sys.stderr, "ERROR: {0}".format(message) - - def print_callback(self, msg): - if self.verbose: - print msg - - def fill_image(self, path, size=None, sparse=False): - block_size = 4096 - - if self.verbose: - print >>sys.stderr, "-> Filling %s" % path - f = open(path, 'w+') - if size is None: - f.seek(0, 2) - size = f.tell() - f.seek(0) - - for block_num in xrange(size/block_size): - f.write('a' * block_size) - if sparse: - f.seek(block_size, 1) - - f.close() - - def create_sparse(self, path, size): - f = open(path, "w") - f.truncate(size) - f.close() - - def remove_vms(self, vms): - self.qc.lock_db_for_writing() - self.qc.load() - - for vm in vms: - if isinstance(vm, str): - vm = self.qc.get_vm_by_name(vm) - else: - vm = self.qc[vm.qid] - if self.verbose: - print >>sys.stderr, "-> Removing %s" % vm.name - vm.remove_from_disk() - self.qc.pop(vm.qid) - self.qc.save() - self.qc.unlock_db() - - def restore_backup(self, source=None, appvm=None, options=None): - if source is None: - backupfile = os.path.join(self.backupdir, - sorted(os.listdir(self.backupdir))[-1]) - else: - backupfile = source - try: - backup_info = backup.backup_restore_prepare( - backupfile, "qubes", print_callback=self.print_callback, - appvm=appvm, options=options) - except QubesException as e: self.fail( - "QubesException during backup_restore_prepare: %s" % str(e)) - if self.verbose: - backup.backup_restore_print_summary(backup_info) - - try: - backup.backup_restore_do( - backup_info, - print_callback=self.print_callback if self.verbose else None, - error_callback=self.error_callback) - except QubesException as e: - self.fail("QubesException during backup_restore_do: %s" % str(e)) - errors = [] - while not self.error_detected.empty(): - errors.append(self.error_detected.get()) - self.assertTrue(len(errors) == 0, - "Error(s) detected during backup_restore_do: %s" % - '\n'.join(errors)) - if os.path.isfile(backupfile): - os.unlink(backupfile) +QUBESXML_R1 = ''' + + + + + + + + + + + + + + +''' +class TC_00_BackupCompatibility(qubes.tests.BackupTestsMixin, unittest.TestCase): def create_whitelisted_appmenus(self, filename): f = open(filename, "w") f.write("gnome-terminal.desktop\n") @@ -346,7 +506,27 @@ class BackupCompatibilityTests(unittest.TestCase): appmenus_list) - def test_r2b2(self): + def test_100_r1(self): + self.create_v1_files(r2b2=False) + + f = open(self.fullpath("qubes.xml"), "w") + f.write(QUBESXML_R1) + f.close() + + self.restore_backup(self.backupdir, options={ + 'use-default-template': True, + }) + self.assertIsNotNone(self.qc.get_vm_by_name("test-template-clone")) + self.assertIsNotNone(self.qc.get_vm_by_name("test-testproxy")) + self.assertIsNotNone(self.qc.get_vm_by_name("test-work")) + self.assertIsNotNone(self.qc.get_vm_by_name("test-standalonevm")) + self.assertIsNotNone(self.qc.get_vm_by_name( + "test-custom-template-appvm")) + self.assertEqual(self.qc.get_vm_by_name("test-custom-template-appvm") + .template, + self.qc.get_vm_by_name("test-template-clone")) + + def test_200_r2b2(self): self.create_v1_files(r2b2=True) f = open(self.fullpath("qubes.xml"), "w") @@ -356,7 +536,6 @@ class BackupCompatibilityTests(unittest.TestCase): self.restore_backup(self.backupdir, options={ 'use-default-template': True, }) - self.reload_qc() self.assertIsNotNone(self.qc.get_vm_by_name("test-template-clone")) self.assertIsNotNone(self.qc.get_vm_by_name("test-testproxy")) self.assertIsNotNone(self.qc.get_vm_by_name("test-work")) @@ -368,23 +547,3 @@ class BackupCompatibilityTests(unittest.TestCase): .template, self.qc.get_vm_by_name("test-template-clone")) - def test_r1(self): - self.create_v1_files(r2b2=False) - - f = open(self.fullpath("qubes.xml"), "w") - f.write(QUBESXML_R1) - f.close() - - self.restore_backup(self.backupdir, options={ - 'use-default-template': True, - }) - self.reload_qc() - self.assertIsNotNone(self.qc.get_vm_by_name("test-template-clone")) - self.assertIsNotNone(self.qc.get_vm_by_name("test-testproxy")) - self.assertIsNotNone(self.qc.get_vm_by_name("test-work")) - self.assertIsNotNone(self.qc.get_vm_by_name("test-standalonevm")) - self.assertIsNotNone(self.qc.get_vm_by_name( - "test-custom-template-appvm")) - self.assertEqual(self.qc.get_vm_by_name("test-custom-template-appvm") - .template, - self.qc.get_vm_by_name("test-template-clone")) diff --git a/tests/basic.py b/tests/basic.py index 855eec6b..e2484a69 100644 --- a/tests/basic.py +++ b/tests/basic.py @@ -1,24 +1,28 @@ #!/usr/bin/python -# -*- coding: utf-8 -*- +# vim: fileencoding=utf-8 + # -# The Qubes OS Project, http://www.qubes-os.org +# The Qubes OS Project, https://www.qubes-os.org/ # -# Copyright (C) 2014 Marek Marczykowski-Górecki -# -# This program is free software; you can redistribute it and/or -# modify it under the terms of the GNU General Public License -# as published by the Free Software Foundation; either version 2 -# of the License, or (at your option) any later version. +# Copyright (C) 2014-2015 +# Marek Marczykowski-Górecki +# Copyright (C) 2015 Wojtek Porczyk # +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. # -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. -# + import multiprocessing import os import shutil @@ -27,115 +31,44 @@ import unittest import time from qubes.qubes import QubesVmCollection, QubesException, system_path -VM_PREFIX = "test-" +import qubes.qubes +import qubes.tests -class BasicTests(unittest.TestCase): - def setUp(self): - self.qc = QubesVmCollection() - self.qc.lock_db_for_reading() - self.qc.load() - self.qc.unlock_db() +class TC_00_Basic(qubes.tests.SystemTestsMixin, unittest.TestCase): + def test_000_create(self): + vmname = self.make_vm_name('appvm') + vm = self.qc.add_new_vm('QubesAppVm', + name=vmname, template=self.qc.get_default_template()) - def remove_vms(self, vms): - self.qc.lock_db_for_writing() - self.qc.load() - - for vm in vms: - if isinstance(vm, str): - vm = self.qc.get_vm_by_name(vm) - else: - vm = self.qc[vm.qid] - try: - vm.remove_from_disk() - except OSError: - pass - self.qc.pop(vm.qid) - self.qc.save() - self.qc.unlock_db() - - def tearDown(self): - vmlist = [vm for vm in self.qc.values() if vm.name.startswith( - VM_PREFIX)] - self.remove_vms(vmlist) - - def test_create(self): - self.qc.lock_db_for_writing() - self.qc.load() - - vmname = VM_PREFIX + "appvm" - vm = self.qc.add_new_vm("QubesAppVm", name=vmname, - template=self.qc.get_default_template()) - self.qc.save() - self.qc.unlock_db() self.assertIsNotNone(vm) self.assertEqual(vm.name, vmname) self.assertEqual(vm.template, self.qc.get_default_template()) vm.create_on_disk(verbose=False) - try: + + with self.assertNotRaises(qubes.qubes.QubesException): vm.verify_files() - except QubesException: - self.fail("verify_files() failed") - # Bug: #906 - def test_db_locking(self): - def create_vm(name): - qc = QubesVmCollection() - qc.lock_db_for_writing() - qc.load() - time.sleep(1) - vmname = VM_PREFIX + name - qc.add_new_vm("QubesAppVm", name=vmname, template=qc.get_default_template()) - qc.save() - qc.unlock_db() - t = multiprocessing.Process(target=create_vm, args=("test1",)) - t.start() - create_vm("test2") - t.join() - self.qc.lock_db_for_reading() - self.qc.load() - self.qc.unlock_db() - self.assertIsNotNone(self.qc.get_vm_by_name(VM_PREFIX + "test1")) - self.assertIsNotNone(self.qc.get_vm_by_name(VM_PREFIX + "test2")) -class VmPropTests(unittest.TestCase): +class TC_01_Properties(qubes.tests.SystemTestsMixin, unittest.TestCase): def setUp(self): - self.qc = QubesVmCollection() - self.qc.lock_db_for_writing() - self.qc.load() - self.vmname = VM_PREFIX + "appvm" - self.vm = self.qc.add_new_vm("QubesAppVm", name=self.vmname, - template=self.qc.get_default_template()) - self.qc.save() + super(TC_01_Properties, self).setUp() + self.vmname = self.make_vm_name('appvm') + self.vm = self.qc.add_new_vm('QubesAppVm', + name=self.vmname, template=self.qc.get_default_template()) self.vm.create_on_disk(verbose=False) - # WARNING: lock remains taken - def remove_vms(self, vms): - for vm in vms: - if isinstance(vm, str): - vm = self.qc.get_vm_by_name(vm) - else: - vm = self.qc[vm.qid] - vm.remove_from_disk() - self.qc.pop(vm.qid) + def test_000_rename(self): + newname = self.make_vm_name('newname') - def tearDown(self): - # WARNING: lock still taken in setUp() - vmlist = [vm for vm in self.qc.values() if vm.name.startswith( - VM_PREFIX)] - self.remove_vms(vmlist) - self.qc.save() - self.qc.unlock_db() - - def test_rename(self): self.assertEqual(self.vm.name, self.vmname) - newname = VM_PREFIX + "newname" + #TODO: change to setting property when implemented self.vm.set_name(newname) self.assertEqual(self.vm.name, newname) self.assertEqual(self.vm.dir_path, - os.path.join(system_path['qubes_appvms_dir'], newname)) + os.path.join(system_path['qubes_appvms_dir'], newname)) self.assertEqual(self.vm.conf_file, - os.path.join(self.vm.dir_path, newname + '.conf')) + os.path.join(self.vm.dir_path, newname + '.conf')) self.assertTrue(os.path.exists( os.path.join(self.vm.dir_path, "apps", newname + "-vm.directory"))) # FIXME: set whitelisted-appmenus.list first @@ -143,13 +76,15 @@ class VmPropTests(unittest.TestCase): os.path.join(self.vm.dir_path, "apps", newname + "-firefox.desktop"))) self.assertTrue(os.path.exists( os.path.join(os.getenv("HOME"), ".local/share/desktop-directories", - newname + "-vm.directory"))) + newname + "-vm.directory"))) self.assertTrue(os.path.exists( os.path.join(os.getenv("HOME"), ".local/share/applications", - newname + "-firefox.desktop"))) + newname + "-firefox.desktop"))) self.assertFalse(os.path.exists( os.path.join(os.getenv("HOME"), ".local/share/desktop-directories", - self.vmname + "-vm.directory"))) + self.vmname + "-vm.directory"))) self.assertFalse(os.path.exists( os.path.join(os.getenv("HOME"), ".local/share/applications", - self.vmname + "-firefox.desktop"))) + self.vmname + "-firefox.desktop"))) + +# vim: ts=4 sw=4 et diff --git a/tests/network.py b/tests/network.py index 17c55e20..1121fc20 100644 --- a/tests/network.py +++ b/tests/network.py @@ -1,25 +1,28 @@ #!/usr/bin/python -# -*- coding: utf-8 -*- +# vim: fileencoding=utf-8 + # -# The Qubes OS Project, http://www.qubes-os.org +# The Qubes OS Project, https://www.qubes-os.org/ # -# Copyright (C) 2015 Marek Marczykowski-Górecki +# Copyright (C) 2015 +# Marek Marczykowski-Górecki +# Copyright (C) 2015 Wojtek Porczyk # -# This program is free software; you can redistribute it and/or -# modify it under the terms of the GNU General Public License -# as published by the Free Software Foundation; either version 2 -# of the License, or (at your option) any later version. +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, -# USA. +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. # + import multiprocessing import os import subprocess @@ -28,12 +31,16 @@ import time from qubes.qubes import QubesVmCollection, defaults +import qubes.tests -VM_PREFIX = "test-" -class VmNetworkingTests(unittest.TestCase): - ping_ip = "ping -W 1 -n -c 1 192.168.123.45" - ping_name = "ping -W 1 -c 1 test.example.com" +class VmNetworkingTests(qubes.tests.SystemTestsMixin, unittest.TestCase): + test_ip = '192.168.123.45' + test_name = 'test.example.com' + + ping_cmd = 'ping -W 1 -n -c 1 {target}' + ping_ip = ping_cmd.format(target=test_ip) + ping_name = ping_cmd.format(target=test_name) def run_cmd(self, vm, cmd, user="root"): p = vm.run(cmd, user=user, passio_popen=True, ignore_stderr=True) @@ -42,22 +49,21 @@ class VmNetworkingTests(unittest.TestCase): return p.wait() def setUp(self): - self.qc = QubesVmCollection() - self.qc.lock_db_for_writing() - self.qc.load() + super(VmNetworkingTests, self).setUp() self.testnetvm = self.qc.add_new_vm("QubesNetVm", - name="%snetvm1" % VM_PREFIX, - template=self.qc.get_default_template()) + name=self.make_vm_name('netvm1'), + template=self.qc.get_default_template()) self.testnetvm.create_on_disk(verbose=False) self.testvm1 = self.qc.add_new_vm("QubesAppVm", - name="%svm2" % VM_PREFIX, - template=self.qc.get_default_template()) + name=self.make_vm_name('vm2'), + template=self.qc.get_default_template()) self.testvm1.create_on_disk(verbose=False) self.testvm1.netvm = self.testnetvm self.qc.save() - self.qc.unlock_db() + self.configure_netvm() + def configure_netvm(self): def run_netvm_cmd(cmd): if self.run_cmd(self.testnetvm, cmd) != 0: @@ -73,57 +79,29 @@ class VmNetworkingTests(unittest.TestCase): run_netvm_cmd("ip link add test0 type dummy") run_netvm_cmd("ip link set test0 up") - run_netvm_cmd("ip addr add 192.168.123.45/24 dev test0") - run_netvm_cmd("iptables -I INPUT -d 192.168.123.45 -j ACCEPT") - run_netvm_cmd("dnsmasq -a 192.168.123.45 -A /example.com/192.168.123.45 -i test0 -z") - run_netvm_cmd("echo nameserver 192.168.123.45 > /etc/resolv.conf") + run_netvm_cmd("ip addr add {}/24 dev test0".format(self.test_ip)) + run_netvm_cmd("iptables -I INPUT -d {} -j ACCEPT".format(self.test_ip)) + run_netvm_cmd("dnsmasq -a {ip} -A /{name}/{ip} -i test0 -z".format( + ip=self.test_ip, name=self.test_name)) + run_netvm_cmd("echo nameserver {} > /etc/resolv.conf".format( + self.test_ip)) run_netvm_cmd("/usr/lib/qubes/qubes-setup-dnat-to-ns") - def remove_vms(self, vms): - self.qc.lock_db_for_writing() - self.qc.load() - - for vm in vms: - if isinstance(vm, str): - vm = self.qc.get_vm_by_name(vm) - else: - vm = self.qc[vm.qid] - if vm.is_running(): - try: - vm.force_shutdown() - except: - pass - try: - vm.remove_from_disk() - except OSError: - pass - self.qc.pop(vm.qid) - self.qc.save() - self.qc.unlock_db() - - def tearDown(self): - vmlist = [vm for vm in self.qc.values() if vm.name.startswith( - VM_PREFIX)] - self.remove_vms(vmlist) def test_000_simple_networking(self): self.testvm1.start() - self.assertEqual(self.run_cmd(self.testvm1, - "ping -c 1 192.168.123.45"), 0) - self.assertEqual(self.run_cmd(self.testvm1, - "ping -c 1 test.example.com"), 0) + self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0) + self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0) + def test_010_simple_proxyvm(self): - self.qc.lock_db_for_writing() - self.qc.load() self.proxy = self.qc.add_new_vm("QubesProxyVm", - name="%sproxy" % VM_PREFIX, - template=self.qc.get_default_template()) + name=self.make_vm_name('proxy'), + template=self.qc.get_default_template()) self.proxy.create_on_disk(verbose=False) self.proxy.netvm = self.testnetvm self.testvm1.netvm = self.proxy self.qc.save() - self.qc.unlock_db() self.testvm1.start() self.assertTrue(self.proxy.is_running()) @@ -136,18 +114,16 @@ class VmNetworkingTests(unittest.TestCase): self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0, "Ping by IP from AppVM failed") + def test_020_simple_proxyvm_nm(self): - self.qc.lock_db_for_writing() - self.qc.load() self.proxy = self.qc.add_new_vm("QubesProxyVm", - name="%sproxy" % VM_PREFIX, - template=self.qc.get_default_template()) + name=self.make_vm_name('proxy'), + template=self.qc.get_default_template()) self.proxy.create_on_disk(verbose=False) self.proxy.netvm = self.testnetvm self.proxy.services['network-manager'] = True self.testvm1.netvm = self.proxy self.qc.save() - self.qc.unlock_db() self.testvm1.start() self.assertTrue(self.proxy.is_running()) @@ -155,42 +131,42 @@ class VmNetworkingTests(unittest.TestCase): "Ping by IP failed") self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0, "Ping by name failed") + # reconnect to make sure that device was configured by NM self.assertEqual( self.run_cmd(self.proxy, "nmcli device disconnect eth0", - user="user"), + user="user"), 0, "Failed to disconnect eth0 using nmcli") self.assertNotEqual(self.run_cmd(self.testvm1, self.ping_ip), 0, - "Network should be disabled, but apparently it isn't") + "Network should be disabled, but apparently it isn't") self.assertEqual( - self.run_cmd(self.proxy, "nmcli connection up \"VM uplink eth0\" " - "ifname eth0", - user="user"), + self.run_cmd(self.proxy, + 'nmcli connection up "VM uplink eth0" ifname eth0', + user="user"), 0, "Failed to connect eth0 using nmcli") self.assertEqual(self.run_cmd(self.proxy, "nm-online", user="user"), 0, "Failed to wait for NM connection") + # check for nm-applet presence self.assertEqual(subprocess.call([ 'xdotool', 'search', '--all', '--name', - '--class', '^(NetworkManager Applet|{})$'.format(self.proxy.name) - ], stdout=open('/dev/null', 'w')), 0, "nm-applet window not found") + '--class', '^(NetworkManager Applet|{})$'.format(self.proxy.name)], + stdout=open('/dev/null', 'w')), 0, "nm-applet window not found") self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0, "Ping by IP failed (after NM reconnection") self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0, "Ping by name failed (after NM reconnection)") + def test_030_firewallvm_firewall(self): - self.qc.lock_db_for_writing() - self.qc.load() self.proxy = self.qc.add_new_vm("QubesProxyVm", - name="%sproxy" % VM_PREFIX, - template=self.qc.get_default_template()) + name=self.make_vm_name('proxy'), + template=self.qc.get_default_template()) self.proxy.create_on_disk(verbose=False) self.proxy.netvm = self.testnetvm self.testvm1.netvm = self.proxy self.qc.save() - self.qc.unlock_db() # block all for first @@ -250,7 +226,7 @@ class VmNetworkingTests(unittest.TestCase): 'allow': False, 'allowDns': True, 'allowIcmp': True, - 'rules': [{'address': '192.168.123.45', + 'rules': [{'address': self.ping_ip, 'netmask': 32, 'proto': 'tcp', 'portBegin': 1234 @@ -267,7 +243,7 @@ class VmNetworkingTests(unittest.TestCase): 'allow': True, 'allowDns': True, 'allowIcmp': True, - 'rules': [{'address': '192.168.123.45', + 'rules': [{'address': self.ping_ip, 'netmask': 32, 'proto': 'tcp', 'portBegin': 1234 @@ -281,39 +257,36 @@ class VmNetworkingTests(unittest.TestCase): def test_040_inter_vm(self): - self.qc.lock_db_for_writing() - self.qc.load() self.proxy = self.qc.add_new_vm("QubesProxyVm", - name="%sproxy" % VM_PREFIX, - template=self.qc.get_default_template()) + name=self.make_vm_name('proxy'), + template=self.qc.get_default_template()) self.proxy.create_on_disk(verbose=False) self.proxy.netvm = self.testnetvm self.testvm1.netvm = self.proxy self.testvm2 = self.qc.add_new_vm("QubesAppVm", - name="%svm3" % VM_PREFIX, - template=self.qc.get_default_template()) + name=self.make_vm_name('vm3'), + template=self.qc.get_default_template()) self.testvm2.create_on_disk(verbose=False) self.testvm2.netvm = self.proxy self.qc.save() - self.qc.unlock_db() self.testvm1.start() self.testvm2.start() self.assertNotEqual(self.run_cmd(self.testvm1, - "ping -W 1 -n -c 1 {}".format(self.testvm2.ip)), 0) + self.ping_cmd.format(target=self.testvm2.ip)), 0) self.testvm2.netvm = self.testnetvm self.assertNotEqual(self.run_cmd(self.testvm1, - "ping -W 1 -n -c 1 {}".format(self.testvm2.ip)), 0) + self.ping_cmd.format(target=self.testvm2.ip)), 0) self.assertNotEqual(self.run_cmd(self.testvm2, - "ping -W 1 -n -c 1 {}".format(self.testvm1.ip)), 0) + self.ping_cmd.format(target=self.testvm1.ip)), 0) self.testvm1.netvm = self.testnetvm self.assertNotEqual(self.run_cmd(self.testvm1, - "ping -W 1 -n -c 1 {}".format(self.testvm2.ip)), 0) + self.ping_cmd.format(target=self.testvm2.ip)), 0) self.assertNotEqual(self.run_cmd(self.testvm2, - "ping -W 1 -n -c 1 {}".format(self.testvm1.ip)), 0) + self.ping_cmd.format(target=self.testvm1.ip)), 0) diff --git a/tests/regressions.py b/tests/regressions.py new file mode 100644 index 00000000..3b8da430 --- /dev/null +++ b/tests/regressions.py @@ -0,0 +1,58 @@ +#!/usr/bin/python2 -O + +# +# The Qubes OS Project, https://www.qubes-os.org/ +# +# Copyright (C) 2014-2015 +# Marek Marczykowski-Górecki +# Copyright (C) 2015 Wojtek Porczyk +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +# + +import multiprocessing +import time +import unittest + +import qubes.qubes +import qubes.tests + +class TC_00_Regressions(qubes.tests.SystemTestsMixin, unittest.TestCase): + # Bug: #906 + def test_000_bug_906_db_locking(self): + def create_vm(vmname): + qc = qubes.qubes.QubesVmCollection() + qc.lock_db_for_writing() + qc.load() + time.sleep(1) + qc.add_new_vm('QubesAppVm', + name=vmname, template=qc.get_default_template()) + qc.save() + qc.unlock_db() + + vmname1, vmname2 = map(self.make_vm_name, ('test1', 'test2')) + t = multiprocessing.Process(target=create_vm, args=(vmname1,)) + t.start() + create_vm(vmname2) + t.join() + + qc = qubes.qubes.QubesVmCollection() + qc.lock_db_for_reading() + qc.load() + qc.unlock_db() + + self.assertIsNotNone(qc.get_vm_by_name(vmname1)) + self.assertIsNotNone(qc.get_vm_by_name(vmname2)) + diff --git a/tests/vm_qrexec_gui.py b/tests/vm_qrexec_gui.py index 918a2de2..e1a2ed47 100644 --- a/tests/vm_qrexec_gui.py +++ b/tests/vm_qrexec_gui.py @@ -1,24 +1,28 @@ #!/usr/bin/python -# -*- coding: utf-8 -*- +# vim: fileencoding=utf-8 + # -# The Qubes OS Project, http://www.qubes-os.org +# The Qubes OS Project, https://www.qubes-os.org/ # -# Copyright (C) 2014 Marek Marczykowski-Górecki +# Copyright (C) 2014-2015 +# Marek Marczykowski-Górecki +# Copyright (C) 2015 Wojtek Porczyk # -# This program is free software; you can redistribute it and/or -# modify it under the terms of the GNU General Public License -# as published by the Free Software Foundation; either version 2 -# of the License, or (at your option) any later version. +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. # + import multiprocessing import os import subprocess @@ -27,53 +31,23 @@ import time from qubes.qubes import QubesVmCollection, defaults, QubesException - -VM_PREFIX = "test-" +import qubes.tests TEST_DATA = "0123456789" * 1024 -class VmRunningTests(unittest.TestCase): +class TC_00_AppVM(qubes.tests.SystemTestsMixin, unittest.TestCase): def setUp(self): - self.qc = QubesVmCollection() - self.qc.lock_db_for_writing() - self.qc.load() + super(TC_00_AppVM, self).setUp() self.testvm1 = self.qc.add_new_vm("QubesAppVm", - name="%svm1" % VM_PREFIX, - template=self.qc.get_default_template()) + name=self.make_vm_name('vm1'), + template=self.qc.get_default_template()) self.testvm1.create_on_disk(verbose=False) self.testvm2 = self.qc.add_new_vm("QubesAppVm", - name="%svm2" % VM_PREFIX, - template=self.qc.get_default_template()) + name=self.make_vm_name('vm2'), + template=self.qc.get_default_template()) self.testvm2.create_on_disk(verbose=False) self.qc.save() - self.qc.unlock_db() - def remove_vms(self, vms): - self.qc.lock_db_for_writing() - self.qc.load() - - for vm in vms: - if isinstance(vm, str): - vm = self.qc.get_vm_by_name(vm) - else: - vm = self.qc[vm.qid] - if vm.is_running(): - try: - vm.force_shutdown() - except: - pass - try: - vm.remove_from_disk() - except OSError: - pass - self.qc.pop(vm.qid) - self.qc.save() - self.qc.unlock_db() - - def tearDown(self): - vmlist = [vm for vm in self.qc.values() if vm.name.startswith( - VM_PREFIX)] - self.remove_vms(vmlist) def test_000_start_shutdown(self): self.testvm1.start() @@ -89,6 +63,7 @@ class VmRunningTests(unittest.TestCase): time.sleep(1) self.assertEquals(self.testvm1.get_power_state(), "Halted") + def test_010_run_gui_app(self): self.testvm1.start() self.assertEquals(self.testvm1.get_power_state(), "Running") @@ -116,6 +91,7 @@ class VmRunningTests(unittest.TestCase): "termination") time.sleep(0.1) + def test_050_qrexec_simple_eof(self): """Test for data and EOF transmission dom0->VM""" result = multiprocessing.Value('i', 0) @@ -143,6 +119,7 @@ class VmRunningTests(unittest.TestCase): elif result.value == 2: self.fail("Some data was printed to stderr") + @unittest.expectedFailure def test_051_qrexec_simple_eof_reverse(self): """Test for EOF transmission VM->dom0""" @@ -178,6 +155,7 @@ class VmRunningTests(unittest.TestCase): elif result.value == 3: self.fail("VM proceess didn't terminated on EOF") + def test_052_qrexec_vm_service_eof(self): """Test for EOF transmission VM(src)->VM(dst)""" result = multiprocessing.Value('i', 0) @@ -211,6 +189,7 @@ class VmRunningTests(unittest.TestCase): if result.value == 1: self.fail("Received data differs from what was expected") + @unittest.expectedFailure def test_053_qrexec_vm_service_eof_reverse(self): """Test for EOF transmission VM(src)<-VM(dst)""" @@ -245,6 +224,7 @@ class VmRunningTests(unittest.TestCase): if result.value == 1: self.fail("Received data differs from what was expected") + def test_060_qrexec_exit_code_dom0(self): self.testvm1.start() @@ -256,6 +236,7 @@ class VmRunningTests(unittest.TestCase): p.wait() self.assertEqual(3, p.returncode) + @unittest.expectedFailure def test_065_qrexec_exit_code_vm(self): self.testvm1.start() @@ -292,6 +273,7 @@ class VmRunningTests(unittest.TestCase): (stdout, stderr) = p.communicate() self.assertEqual(stdout, "3\n") + def test_100_qrexec_filecopy(self): self.testvm1.start() self.testvm2.start() @@ -308,6 +290,7 @@ class VmRunningTests(unittest.TestCase): "/home/user/QubesIncoming/%s/passwd" % self.testvm1.name, wait=True) self.assertEqual(retcode, 0, "file differs") + def test_110_qrexec_filecopy_deny(self): self.testvm1.start() self.testvm2.start() @@ -343,99 +326,49 @@ class VmRunningTests(unittest.TestCase): "/home/user/QubesIncoming/%s/passwd" % self.testvm1.name, wait=True) self.assertEqual(retcode, 0, "file differs") -class HVMTests(unittest.TestCase): + +class TC_10_HVM(qubes.tests.SystemTestsMixin, unittest.TestCase): # TODO: test with some OS inside # TODO: windows tools tests - def setUp(self): - self.qc = QubesVmCollection() - - def remove_vms(self, vms): - self.qc.lock_db_for_writing() - self.qc.load() - - for vm in vms: - if isinstance(vm, str): - vm = self.qc.get_vm_by_name(vm) - else: - vm = self.qc[vm.qid] - if vm.is_running(): - try: - vm.force_shutdown() - except: - pass - try: - vm.remove_from_disk() - except OSError: - pass - self.qc.pop(vm.qid) - self.qc.save() - self.qc.unlock_db() - - def tearDown(self): - vmlist = [vm for vm in self.qc.values() if vm.name.startswith( - VM_PREFIX)] - self.remove_vms(vmlist) - def test_000_create_start(self): - self.qc.lock_db_for_writing() - try: - self.qc.load() - self.testvm1 = self.qc.add_new_vm("QubesHVm", - name="%svm1" % VM_PREFIX) - self.testvm1.create_on_disk(verbose=False) - self.qc.save() - finally: - self.qc.unlock_db() + self.testvm1 = self.qc.add_new_vm("QubesHVm", + name=self.make_vm_name('vm1')) + self.testvm1.create_on_disk(verbose=False) + self.qc.save() self.testvm1.start() self.assertEquals(self.testvm1.get_power_state(), "Running") def test_010_create_start_template(self): - self.qc.lock_db_for_writing() - try: - self.qc.load() - self.templatevm = self.qc.add_new_vm("QubesTemplateHVm", - name="%stemplate" % VM_PREFIX) - self.templatevm.create_on_disk(verbose=False) - self.qc.save() - finally: - self.qc.unlock_db() + self.templatevm = self.qc.add_new_vm("QubesTemplateHVm", + name=self.make_vm_name('template')) + self.templatevm.create_on_disk(verbose=False) self.templatevm.start() self.assertEquals(self.templatevm.get_power_state(), "Running") def test_020_create_start_template_vm(self): - self.qc.lock_db_for_writing() - try: - self.qc.load() - self.templatevm = self.qc.add_new_vm("QubesTemplateHVm", - name="%stemplate" % VM_PREFIX) - self.templatevm.create_on_disk(verbose=False) - self.testvm2 = self.qc.add_new_vm("QubesHVm", - name="%svm2" % VM_PREFIX, - template=self.templatevm) - self.testvm2.create_on_disk(verbose=False) - self.qc.save() - finally: - self.qc.unlock_db() + self.templatevm = self.qc.add_new_vm("QubesTemplateHVm", + name=self.make_vm_name('template')) + self.templatevm.create_on_disk(verbose=False) + self.testvm2 = self.qc.add_new_vm("QubesHVm", + name=self.make_vm_name('vm2'), + template=self.templatevm) + self.testvm2.create_on_disk(verbose=False) + self.qc.save() self.testvm2.start() self.assertEquals(self.testvm2.get_power_state(), "Running") def test_030_prevent_simultaneus_start(self): - self.qc.lock_db_for_writing() - try: - self.qc.load() - self.templatevm = self.qc.add_new_vm("QubesTemplateHVm", - name="%stemplate" % VM_PREFIX) - self.templatevm.create_on_disk(verbose=False) - self.testvm2 = self.qc.add_new_vm("QubesHVm", - name="%svm2" % VM_PREFIX, - template=self.templatevm) - self.testvm2.create_on_disk(verbose=False) - self.qc.save() - finally: - self.qc.unlock_db() + self.templatevm = self.qc.add_new_vm("QubesTemplateHVm", + name=self.make_vm_name('template')) + self.templatevm.create_on_disk(verbose=False) + self.testvm2 = self.qc.add_new_vm("QubesHVm", + name=self.make_vm_name('vm2'), + template=self.templatevm) + self.testvm2.create_on_disk(verbose=False) + self.qc.save() self.templatevm.start() self.assertEquals(self.templatevm.get_power_state(), "Running") @@ -446,53 +379,21 @@ class HVMTests(unittest.TestCase): self.assertRaises(QubesException, self.templatevm.start) -class DispVmTests(unittest.TestCase): - def setUp(self): - self.qc = QubesVmCollection() - self.qc.lock_db_for_reading() - self.qc.load() - self.qc.unlock_db() - - def remove_vms(self, vms): - self.qc.lock_db_for_writing() - self.qc.load() - - for vm in vms: - if isinstance(vm, str): - vm = self.qc.get_vm_by_name(vm) - else: - vm = self.qc[vm.qid] - if vm.is_running(): - try: - vm.force_shutdown() - except: - pass - try: - vm.remove_from_disk() - except OSError: - pass - self.qc.pop(vm.qid) - self.qc.save() - self.qc.unlock_db() - - def tearDown(self): - vmlist = [vm for vm in self.qc.values() if vm.name.startswith( - VM_PREFIX)] - self.remove_vms(vmlist) - +class TC_20_DispVM(qubes.tests.SystemTestsMixin, unittest.TestCase): def test_000_prepare_dvm(self): + self.qc.unlock_db() retcode = subprocess.call(['/usr/bin/qvm-create-default-dvm', '--default-template'], stderr=open(os.devnull, 'w')) self.assertEqual(retcode, 0) - self.qc.lock_db_for_reading() + self.qc.lock_db_for_writing() self.qc.load() - self.qc.unlock_db() self.assertIsNotNone(self.qc.get_vm_by_name( self.qc.get_default_template().name + "-dvm")) # TODO: check mtime of snapshot file def test_010_simple_dvm_run(self): + self.qc.unlock_db() p = subprocess.Popen(['/usr/lib/qubes/qfile-daemon-dvm', 'qubes.VMShell', 'dom0', 'DEFAULT'], stdin=subprocess.PIPE, @@ -502,22 +403,27 @@ class DispVmTests(unittest.TestCase): self.assertEqual(stdout, "test\n") # TODO: check if DispVM is destroyed + def test_020_gui_app(self): + self.qc.unlock_db() p = subprocess.Popen(['/usr/lib/qubes/qfile-daemon-dvm', 'qubes.VMShell', 'dom0', 'DEFAULT'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=open(os.devnull, 'w')) + # wait for DispVM startup: p.stdin.write("echo test\n") p.stdin.flush() l = p.stdout.readline() self.assertEqual(l, "test\n") + # potential race condition, but our tests are supposed to be # running on dedicated machine, so should not be a problem self.qc.lock_db_for_reading() self.qc.load() self.qc.unlock_db() + max_qid = 0 for vm in self.qc.values(): if not vm.is_disposablevm(): @@ -527,6 +433,7 @@ class DispVmTests(unittest.TestCase): dispvm = self.qc[max_qid] self.assertNotEqual(dispvm.qid, 0, "DispVM not found in qubes.xml") self.assertTrue(dispvm.is_running()) + p.stdin.write("gnome-terminal\n") wait_count = 0 window_title = 'user@%s' % (dispvm.template.name + "-dvm") @@ -576,17 +483,16 @@ class DispVmTests(unittest.TestCase): "DispVM not removed from qubes.xml") def test_030_edit_file(self): - self.qc.lock_db_for_writing() - self.qc.load() self.testvm1 = self.qc.add_new_vm("QubesAppVm", - name="%svm1" % VM_PREFIX, - template=self.qc.get_default_template()) + name=self.make_vm_name('vm1'), + template=self.qc.get_default_template()) self.testvm1.create_on_disk(verbose=False) self.qc.save() - self.qc.unlock_db() self.testvm1.start() self.testvm1.run("echo test1 > /home/user/test.txt", wait=True) + + self.qc.unlock_db() p = self.testvm1.run("qvm-open-in-dvm /home/user/test.txt", passio_popen=True)