Merge remote-tracking branch 'woju/master'

This commit is contained in:
Marek Marczykowski-Górecki 2015-02-18 15:11:05 +01:00
commit affb39f435
10 changed files with 1047 additions and 709 deletions

View File

@ -23,3 +23,7 @@ endif
cp network.py[co] $(DESTDIR)$(PYTHON_TESTSPATH)
cp vm_qrexec_gui.py $(DESTDIR)$(PYTHON_TESTSPATH)
cp vm_qrexec_gui.py[co] $(DESTDIR)$(PYTHON_TESTSPATH)
cp regressions.py $(DESTDIR)$(PYTHON_TESTSPATH)
cp regressions.py[co] $(DESTDIR)$(PYTHON_TESTSPATH)
cp run.py $(DESTDIR)$(PYTHON_TESTSPATH)
cp run.py[co] $(DESTDIR)$(PYTHON_TESTSPATH)

View File

@ -0,0 +1,483 @@
#!/usr/bin/python2 -O
# vim: fileencoding=utf-8
#
# The Qubes OS Project, https://www.qubes-os.org/
#
# Copyright (C) 2014-2015
# Marek Marczykowski-Górecki <marmarek@invisiblethingslab.com>
# Copyright (C) 2015 Wojtek Porczyk <woju@invisiblethingslab.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
import multiprocessing
import logging
import os
import shutil
import subprocess
import unittest
import lxml.etree
import qubes.backup
import qubes.qubes
VMPREFIX = 'test-'
#: :py:obj:`True` if running in dom0, :py:obj:`False` otherwise
in_dom0 = False
#: :py:obj:`False` if outside of git repo,
#: path to root of the directory otherwise
in_git = False
try:
import libvirt
libvirt.openReadOnly(qubes.qubes.defaults['libvirt_uri']).close()
in_dom0 = True
except libvirt.libvirtError:
pass
try:
in_git = subprocess.check_output(
['git', 'rev-parse', '--show-toplevel']).strip()
except subprocess.CalledProcessError:
# git returned nonzero, we are outside git repo
pass
except OSError:
# command not found; let's assume we're outside
pass
def skipUnlessDom0(test_item):
'''Decorator that skips test outside dom0.
Some tests (especially integration tests) have to be run in more or less
working dom0. This is checked by connecting to libvirt.
''' # pylint: disable=invalid-name
return unittest.skipUnless(in_dom0, 'outside dom0')(test_item)
def skipUnlessGit(test_item):
'''Decorator that skips test outside git repo.
There are very few tests that an be run only in git. One example is
correctness of example code that won't get included in RPM.
''' # pylint: disable=invalid-name
return unittest.skipUnless(in_git, 'outside git tree')(test_item)
class _AssertNotRaisesContext(object):
"""A context manager used to implement TestCase.assertNotRaises methods.
Stolen from unittest and hacked. Regexp support stripped.
"""
def __init__(self, expected, test_case, expected_regexp=None):
self.expected = expected
self.failureException = test_case.failureException
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, tb):
if exc_type is None:
return True
try:
exc_name = self.expected.__name__
except AttributeError:
exc_name = str(self.expected)
if issubclass(exc_type, self.expected):
raise self.failureException(
"{0} raised".format(exc_name))
else:
# pass through
return False
self.exception = exc_value # store for later retrieval
class QubesTestCase(unittest.TestCase):
'''Base class for Qubes unit tests.
'''
def __init__(self, *args, **kwargs):
super(QubesTestCase, self).__init__(*args, **kwargs)
self.log = logging.getLogger('{}.{}.{}'.format(
self.__class__.__module__,
self.__class__.__name__,
self._testMethodName))
def __str__(self):
return '{}/{}/{}'.format(
'.'.join(self.__class__.__module__.split('.')[2:]),
self.__class__.__name__,
self._testMethodName)
def assertNotRaises(self, excClass, callableObj=None, *args, **kwargs):
"""Fail if an exception of class excClass is raised
by callableObj when invoked with arguments args and keyword
arguments kwargs. If a different type of exception is
raised, it will not be caught, and the test case will be
deemed to have suffered an error, exactly as for an
unexpected exception.
If called with callableObj omitted or None, will return a
context object used like this::
with self.assertRaises(SomeException):
do_something()
The context manager keeps a reference to the exception as
the 'exception' attribute. This allows you to inspect the
exception after the assertion::
with self.assertRaises(SomeException) as cm:
do_something()
the_exception = cm.exception
self.assertEqual(the_exception.error_code, 3)
"""
context = _AssertNotRaisesContext(excClass, self)
if callableObj is None:
return context
with context:
callableObj(*args, **kwargs)
def assertXMLEqual(self, xml1, xml2):
'''Check for equality of two XML objects.
:param xml1: first element
:param xml2: second element
:type xml1: :py:class:`lxml.etree._Element`
:type xml2: :py:class:`lxml.etree._Element`
''' # pylint: disable=invalid-name
self.assertEqual(xml1.tag, xml2.tag)
self.assertEqual(xml1.text, xml2.text)
self.assertItemsEqual(xml1.keys(), xml2.keys())
for key in xml1.keys():
self.assertEqual(xml1.get(key), xml2.get(key))
class SystemTestsMixin(object):
def setUp(self):
'''Set up the test.
.. warning::
This method instantiates QubesVmCollection acquires write lock for
it. You can use is as :py:attr:`qc`. Do not release the lock
yourself.
'''
super(SystemTestsMixin, self).setUp()
self.qc = qubes.qubes.QubesVmCollection()
self.qc.lock_db_for_writing()
self.qc.load()
self.conn = libvirt.open(qubes.qubes.defaults['libvirt_uri'])
self.remove_test_vms()
def tearDown(self):
super(SystemTestsMixin, self).tearDown()
self.remove_test_vms()
self.qc.save()
self.qc.unlock_db()
del self.qc
self.conn.close()
def make_vm_name(self, name):
return VMPREFIX + name
def _remove_vm_qubes(self, vm):
vmname = vm.name
try:
# XXX .is_running() may throw libvirtError if undefined
if vm.is_running():
vm.force_shutdown()
except: pass
try: vm.remove_from_disk()
except: pass
try: vm.libvirt_domain.undefine()
except libvirt.libvirtError: pass
self.qc.pop(vm.qid)
self.qc.save()
del vm
# Now ensure it really went away. This may not have happened,
# for example if vm.libvirtDomain malfunctioned.
try:
dom = self.conn.lookupByName(vmname)
except: pass
else:
self._remove_vm_libvirt(dom)
self._remove_vm_disk(vmname)
def _remove_vm_libvirt(self, dom):
try:
dom.destroy()
except libvirt.libvirtError: # not running
pass
dom.undefine()
def _remove_vm_disk(self, vmname):
for dirspec in (
'qubes_appvms_dir',
'qubes_servicevms_dir',
'qubes_templates_dir'):
dirpath = os.path.join(qubes.qubes.system_path['qubes_base_dir'],
qubes.qubes.system_path[dirspec], vmname)
if os.path.exists(dirpath):
if os.path.isdir(dirpath):
shutil.rmtree(dirpath)
else:
os.unlink(dirpath)
def remove_vms(self, vms):
for vm in vms: self._remove_vm_qubes(vm)
def remove_test_vms(self):
'''Aggresively remove any domain that has name in testing namespace.
.. warning::
The test suite hereby claims any domain whose name starts with
:py:data:`VMPREFIX` as fair game. This is needed to enforce sane
test executing environment. If you have domains named ``test-*``,
don't run the tests.
'''
# first, remove them Qubes-way
for vm in self.qc.values():
if vm.name.startswith(VMPREFIX):
self._remove_vm_qubes(vm)
# now remove what was only in libvirt
for dom in self.conn.listAllDomains():
if dom.name().startswith(VMPREFIX):
self._remove_vm_libvirt(dom)
# finally remove anything that is left on disk
vmnames = set()
for dirspec in (
'qubes_appvms_dir',
'qubes_servicevms_dir',
'qubes_templates_dir'):
dirpath = os.path.join(qubes.qubes.system_path['qubes_base_dir'],
qubes.qubes.system_path[dirspec])
for name in os.listdir(dirpath):
if name.startswith(VMPREFIX):
vmnames.add(name)
for vmname in vmnames:
self._remove_vm_disk(vmname)
class BackupTestsMixin(SystemTestsMixin):
def setUp(self):
super(BackupTestsMixin, self).setUp()
self.error_detected = multiprocessing.Queue()
self.verbose = False
if self.verbose:
print >>sys.stderr, "-> Creating backupvm"
self.backupvm = self.qc.add_new_vm("QubesAppVm",
name=self.make_vm_name('backupvm'),
template=self.qc.get_default_template())
self.backupvm.create_on_disk(verbose=self.verbose)
self.backupdir = os.path.join(os.environ["HOME"], "test-backup")
if os.path.exists(self.backupdir):
shutil.rmtree(self.backupdir)
os.mkdir(self.backupdir)
def tearDown(self):
super(BackupTestsMixin, self).tearDown()
shutil.rmtree(self.backupdir)
def print_progress(self, progress):
if self.verbose:
print >> sys.stderr, "\r-> Backing up files: {0}%...".format(progress)
def error_callback(self, message):
self.error_detected.put(message)
if self.verbose:
print >> sys.stderr, "ERROR: {0}".format(message)
def print_callback(self, msg):
if self.verbose:
print msg
def fill_image(self, path, size=None, sparse=False):
block_size = 4096
if self.verbose:
print >>sys.stderr, "-> Filling %s" % path
f = open(path, 'w+')
if size is None:
f.seek(0, 2)
size = f.tell()
f.seek(0)
for block_num in xrange(size/block_size):
f.write('a' * block_size)
if sparse:
f.seek(block_size, 1)
f.close()
# NOTE: this was create_basic_vms
def create_backup_vms(self):
template=self.qc.get_default_template()
vms = []
vmname = self.make_vm_name('test1')
if self.verbose:
print >>sys.stderr, "-> Creating %s" % vmname
testvm1 = self.qc.add_new_vm('QubesAppVm',
name=vmname, template=template)
testvm1.create_on_disk(verbose=self.verbose)
vms.append(testvm1)
self.fill_image(testvm1.private_img, 100*1024*1024)
vmname = self.make_vm_name('testhvm1')
if self.verbose:
print >>sys.stderr, "-> Creating %s" % vmname
testvm2 = self.qc.add_new_vm('QubesHVm', name=vmname)
testvm2.create_on_disk(verbose=self.verbose)
self.fill_image(testvm2.root_img, 1024*1024*1024, True)
vms.append(testvm2)
return vms
def make_backup(self, vms, prepare_kwargs=dict(), do_kwargs=dict(),
target=None):
# XXX: bakup_prepare and backup_do don't support host_collection
self.qc.unlock_db()
if target is None:
target = self.backupdir
try:
files_to_backup = \
qubes.backup.backup_prepare(vms,
print_callback=self.print_callback,
**prepare_kwargs)
except QubesException as e:
self.fail("QubesException during backup_prepare: %s" % str(e))
try:
qubes.backup.backup_do(target, files_to_backup, "qubes",
progress_callback=self.print_progress,
**do_kwargs)
except QubesException as e:
self.fail("QubesException during backup_do: %s" % str(e))
self.qc.lock_db_for_writing()
self.qc.load()
def restore_backup(self, source=None, appvm=None, options=None):
if source is None:
backupfile = os.path.join(self.backupdir,
sorted(os.listdir(self.backupdir))[-1])
else:
backupfile = source
with self.assertNotRaises(qubes.qubes.QubesException):
backup_info = qubes.backup.backup_restore_prepare(
backupfile, "qubes",
host_collection=self.qc,
print_callback=self.print_callback,
appvm=appvm,
options=options or {})
if self.verbose:
qubes.backup.backup_restore_print_summary(backup_info)
with self.assertNotRaises(qubes.qubes.QubesException):
qubes.backup.backup_restore_do(
backup_info,
host_collection=self.qc,
print_callback=self.print_callback if self.verbose else None,
error_callback=self.error_callback)
# maybe someone forgot to call .save()
self.qc.load()
errors = []
while not self.error_detected.empty():
errors.append(self.error_detected.get())
self.assertTrue(len(errors) == 0,
"Error(s) detected during backup_restore_do: %s" %
'\n'.join(errors))
if not appvm:
os.unlink(backupfile)
def create_sparse(self, path, size):
f = open(path, "w")
f.truncate(size)
f.close()
def load_tests(loader, tests, pattern):
# discard any tests from this module, because it hosts base classes
tests = unittest.TestSuite()
for modname in (
'qubes.tests.basic',
'qubes.tests.network',
'qubes.tests.vm_qrexec_gui',
'qubes.tests.backup',
'qubes.tests.backupcompatibility',
'qubes.tests.regressions',
):
tests.addTests(loader.loadTestsFromName(modname))
return tests
# vim: ts=4 sw=4 et

View File

@ -1,217 +1,60 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# vim: fileencoding=utf-8
#
# The Qubes OS Project, http://www.qubes-os.org
# The Qubes OS Project, https://www.qubes-os.org/
#
# Copyright (C) 2014 Marek Marczykowski-Górecki <marmarek@invisiblethingslab.com>
# Copyright (C) 2014-2015
# Marek Marczykowski-Górecki <marmarek@invisiblethingslab.com>
# Copyright (C) 2015 Wojtek Porczyk <woju@invisiblethingslab.com>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
#
from multiprocessing import Event, Queue
import os
import shutil
import unittest
import sys
from qubes.qubes import QubesVmCollection, QubesException
from qubes import backup
VM_PREFIX = "test-"
import qubes.tests
class BackupTests(unittest.TestCase):
def setUp(self):
self.error_detected = Queue()
self.verbose = False
self.qc = QubesVmCollection()
self.qc.lock_db_for_writing()
self.qc.load()
if self.verbose:
print >>sys.stderr, "-> Creating backupvm"
self.backupvm = self.qc.add_new_vm("QubesAppVm",
name="%sbackupvm" % VM_PREFIX,
template=self.qc.get_default_template())
self.backupvm.create_on_disk(verbose=self.verbose)
self.qc.save()
self.qc.unlock_db()
self.backupdir = os.path.join(os.environ["HOME"], "test-backup")
os.mkdir(self.backupdir)
def tearDown(self):
if self.backupvm.is_running():
self.backupvm.force_shutdown()
vmlist = [vm for vm in self.qc.values() if vm.name.startswith(
VM_PREFIX)]
self.remove_vms(vmlist)
shutil.rmtree(self.backupdir)
def print_progress(self, progress):
if self.verbose:
print >> sys.stderr, "\r-> Backing up files: {0}%...".format(progress)
def error_callback(self, message):
self.error_detected.put(message)
if self.verbose:
print >> sys.stderr, "ERROR: {0}".format(message)
def print_callback(self, msg):
if self.verbose:
print msg
def fill_image(self, path, size=None, sparse=False):
block_size = 4096
if self.verbose:
print >>sys.stderr, "-> Filling %s" % path
f = open(path, 'w+')
if size is None:
f.seek(0, 2)
size = f.tell()
f.seek(0)
for block_num in xrange(size/block_size):
f.write('a' * block_size)
if sparse:
f.seek(block_size, 1)
f.close()
def create_basic_vms(self, leave_locked=False):
template=self.qc.get_default_template()
vms = []
self.qc.lock_db_for_writing()
self.qc.load()
vmname = "%stest1" % VM_PREFIX
if self.verbose:
print >>sys.stderr, "-> Creating %s" % vmname
testvm1 = self.qc.add_new_vm("QubesAppVm",
name=vmname,
template=template)
testvm1.create_on_disk(verbose=self.verbose)
vms.append(testvm1)
self.fill_image(testvm1.private_img, 100*1024*1024)
vmname = "%stesthvm1" % VM_PREFIX
if self.verbose:
print >>sys.stderr, "-> Creating %s" % vmname
testvm2 = self.qc.add_new_vm("QubesHVm",
name=vmname)
testvm2.create_on_disk(verbose=self.verbose)
self.fill_image(testvm2.root_img, 1024*1024*1024, True)
vms.append(testvm2)
if not leave_locked:
self.qc.save()
self.qc.unlock_db()
return vms
def remove_vms(self, vms):
self.qc.lock_db_for_writing()
self.qc.load()
for vm in vms:
if isinstance(vm, str):
vm = self.qc.get_vm_by_name(vm)
else:
vm = self.qc[vm.qid]
if self.verbose:
print >>sys.stderr, "-> Removing %s" % vm.name
vm.remove_from_disk()
self.qc.pop(vm.qid)
self.qc.save()
self.qc.unlock_db()
def make_backup(self, vms, prepare_kwargs=dict(), do_kwargs=dict(),
target=None):
if target is None:
target = self.backupdir
try:
files_to_backup = \
backup.backup_prepare(vms,
print_callback=self.print_callback,
**prepare_kwargs)
except QubesException as e:
self.fail("QubesException during backup_prepare: %s" % str(e))
try:
backup.backup_do(target, files_to_backup, "qubes",
progress_callback=self.print_progress,
**do_kwargs)
except QubesException as e:
self.fail("QubesException during backup_do: %s" % str(e))
def restore_backup(self, source=None, appvm=None):
if source is None:
backupfile = os.path.join(self.backupdir,
sorted(os.listdir(self.backupdir))[-1])
else:
backupfile = source
try:
backup_info = backup.backup_restore_prepare(
backupfile, "qubes", print_callback=self.print_callback,
appvm=appvm)
except QubesException as e: self.fail(
"QubesException during backup_restore_prepare: %s" % str(e))
if self.verbose:
backup.backup_restore_print_summary(backup_info)
try:
backup.backup_restore_do(
backup_info,
print_callback=self.print_callback if self.verbose else None,
error_callback=self.error_callback)
except QubesException as e:
self.fail("QubesException during backup_restore_do: %s" % str(e))
errors = []
while not self.error_detected.empty():
errors.append(self.error_detected.get())
self.assertTrue(len(errors) == 0,
"Error(s) detected during backup_restore_do: %s" %
'\n'.join(errors))
if not appvm:
os.unlink(backupfile)
def test_basic_backup(self):
vms = self.create_basic_vms()
class TC_00_Backup(qubes.tests.BackupTestsMixin, qubes.tests.QubesTestCase):
def test_000_basic_backup(self):
vms = self.create_backup_vms()
self.make_backup(vms)
self.remove_vms(vms)
self.restore_backup()
self.remove_vms(vms)
def test_compressed_backup(self):
vms = self.create_basic_vms()
def test_001_compressed_backup(self):
vms = self.create_backup_vms()
self.make_backup(vms, do_kwargs={'compressed': True})
self.remove_vms(vms)
self.restore_backup()
self.remove_vms(vms)
def test_encrypted_backup(self):
vms = self.create_basic_vms()
def test_002_encrypted_backup(self):
vms = self.create_backup_vms()
self.make_backup(vms, do_kwargs={'encrypted': True})
self.remove_vms(vms)
self.restore_backup()
self.remove_vms(vms)
def test_compressed_encrypted_backup(self):
vms = self.create_basic_vms()
def test_003_compressed_encrypted_backup(self):
vms = self.create_backup_vms()
self.make_backup(vms,
do_kwargs={
'compressed': True,
@ -220,8 +63,30 @@ class BackupTests(unittest.TestCase):
self.restore_backup()
self.remove_vms(vms)
def test_send_to_vm(self):
vms = self.create_basic_vms()
def test_004_sparse_multipart(self):
vms = []
vmname = self.make_vm_name('testhvm2')
if self.verbose:
print >>sys.stderr, "-> Creating %s" % vmname
hvmtemplate = self.qc.add_new_vm("QubesTemplateHVm", name=vmname)
hvmtemplate.create_on_disk(verbose=self.verbose)
self.fill_image(os.path.join(hvmtemplate.dir_path, '00file'),
195*1024*1024-4096*3)
self.fill_image(hvmtemplate.private_img, 195*1024*1024-4096*3)
self.fill_image(hvmtemplate.root_img, 1024*1024*1024, sparse=True)
vms.append(hvmtemplate)
self.make_backup(vms)
self.remove_vms(vms)
self.restore_backup()
self.remove_vms(vms)
def test_100_send_to_vm(self):
vms = self.create_backup_vms()
self.backupvm.start()
self.make_backup(vms,
do_kwargs={
@ -233,33 +98,3 @@ class BackupTests(unittest.TestCase):
self.restore_backup(source='dd if=/var/tmp/backup-test',
appvm=self.backupvm)
self.remove_vms(vms)
def test_sparse_multipart(self):
vms = []
self.qc.lock_db_for_writing()
self.qc.load()
vmname = "%stesthvm2" % VM_PREFIX
if self.verbose:
print >>sys.stderr, "-> Creating %s" % vmname
hvmtemplate = self.qc.add_new_vm("QubesTemplateHVm",
name=vmname)
hvmtemplate.create_on_disk(verbose=self.verbose)
self.fill_image(os.path.join(hvmtemplate.dir_path, '00file'),
195*1024*1024-4096*3)
self.fill_image(hvmtemplate.private_img, 195*1024*1024-4096*3)
self.fill_image(hvmtemplate.root_img, 1024*1024*1024, sparse=True)
vms.append(hvmtemplate)
self.qc.save()
self.qc.unlock_db()
self.make_backup(vms)
self.remove_vms(vms)
self.restore_backup()
self.remove_vms(vms)

File diff suppressed because one or more lines are too long

View File

@ -1,24 +1,28 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# vim: fileencoding=utf-8
#
# The Qubes OS Project, http://www.qubes-os.org
# The Qubes OS Project, https://www.qubes-os.org/
#
# Copyright (C) 2014 Marek Marczykowski-Górecki <marmarek@invisiblethingslab.com>
# Copyright (C) 2014-2015
# Marek Marczykowski-Górecki <marmarek@invisiblethingslab.com>
# Copyright (C) 2015 Wojtek Porczyk <woju@invisiblethingslab.com>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
import multiprocessing
import os
import shutil
@ -27,115 +31,44 @@ import unittest
import time
from qubes.qubes import QubesVmCollection, QubesException, system_path
VM_PREFIX = "test-"
import qubes.qubes
import qubes.tests
class BasicTests(unittest.TestCase):
def setUp(self):
self.qc = QubesVmCollection()
self.qc.lock_db_for_reading()
self.qc.load()
self.qc.unlock_db()
class TC_00_Basic(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
def test_000_create(self):
vmname = self.make_vm_name('appvm')
vm = self.qc.add_new_vm('QubesAppVm',
name=vmname, template=self.qc.get_default_template())
def remove_vms(self, vms):
self.qc.lock_db_for_writing()
self.qc.load()
for vm in vms:
if isinstance(vm, str):
vm = self.qc.get_vm_by_name(vm)
else:
vm = self.qc[vm.qid]
try:
vm.remove_from_disk()
except OSError:
pass
self.qc.pop(vm.qid)
self.qc.save()
self.qc.unlock_db()
def tearDown(self):
vmlist = [vm for vm in self.qc.values() if vm.name.startswith(
VM_PREFIX)]
self.remove_vms(vmlist)
def test_create(self):
self.qc.lock_db_for_writing()
self.qc.load()
vmname = VM_PREFIX + "appvm"
vm = self.qc.add_new_vm("QubesAppVm", name=vmname,
template=self.qc.get_default_template())
self.qc.save()
self.qc.unlock_db()
self.assertIsNotNone(vm)
self.assertEqual(vm.name, vmname)
self.assertEqual(vm.template, self.qc.get_default_template())
vm.create_on_disk(verbose=False)
try:
with self.assertNotRaises(qubes.qubes.QubesException):
vm.verify_files()
except QubesException:
self.fail("verify_files() failed")
# Bug: #906
def test_db_locking(self):
def create_vm(name):
qc = QubesVmCollection()
qc.lock_db_for_writing()
qc.load()
time.sleep(1)
vmname = VM_PREFIX + name
qc.add_new_vm("QubesAppVm", name=vmname, template=qc.get_default_template())
qc.save()
qc.unlock_db()
t = multiprocessing.Process(target=create_vm, args=("test1",))
t.start()
create_vm("test2")
t.join()
self.qc.lock_db_for_reading()
self.qc.load()
self.qc.unlock_db()
self.assertIsNotNone(self.qc.get_vm_by_name(VM_PREFIX + "test1"))
self.assertIsNotNone(self.qc.get_vm_by_name(VM_PREFIX + "test2"))
class VmPropTests(unittest.TestCase):
class TC_01_Properties(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
def setUp(self):
self.qc = QubesVmCollection()
self.qc.lock_db_for_writing()
self.qc.load()
self.vmname = VM_PREFIX + "appvm"
self.vm = self.qc.add_new_vm("QubesAppVm", name=self.vmname,
template=self.qc.get_default_template())
self.qc.save()
super(TC_01_Properties, self).setUp()
self.vmname = self.make_vm_name('appvm')
self.vm = self.qc.add_new_vm('QubesAppVm',
name=self.vmname, template=self.qc.get_default_template())
self.vm.create_on_disk(verbose=False)
# WARNING: lock remains taken
def remove_vms(self, vms):
for vm in vms:
if isinstance(vm, str):
vm = self.qc.get_vm_by_name(vm)
else:
vm = self.qc[vm.qid]
vm.remove_from_disk()
self.qc.pop(vm.qid)
def test_000_rename(self):
newname = self.make_vm_name('newname')
def tearDown(self):
# WARNING: lock still taken in setUp()
vmlist = [vm for vm in self.qc.values() if vm.name.startswith(
VM_PREFIX)]
self.remove_vms(vmlist)
self.qc.save()
self.qc.unlock_db()
def test_rename(self):
self.assertEqual(self.vm.name, self.vmname)
newname = VM_PREFIX + "newname"
#TODO: change to setting property when implemented
self.vm.set_name(newname)
self.assertEqual(self.vm.name, newname)
self.assertEqual(self.vm.dir_path,
os.path.join(system_path['qubes_appvms_dir'], newname))
os.path.join(system_path['qubes_appvms_dir'], newname))
self.assertEqual(self.vm.conf_file,
os.path.join(self.vm.dir_path, newname + '.conf'))
os.path.join(self.vm.dir_path, newname + '.conf'))
self.assertTrue(os.path.exists(
os.path.join(self.vm.dir_path, "apps", newname + "-vm.directory")))
# FIXME: set whitelisted-appmenus.list first
@ -143,13 +76,15 @@ class VmPropTests(unittest.TestCase):
os.path.join(self.vm.dir_path, "apps", newname + "-firefox.desktop")))
self.assertTrue(os.path.exists(
os.path.join(os.getenv("HOME"), ".local/share/desktop-directories",
newname + "-vm.directory")))
newname + "-vm.directory")))
self.assertTrue(os.path.exists(
os.path.join(os.getenv("HOME"), ".local/share/applications",
newname + "-firefox.desktop")))
newname + "-firefox.desktop")))
self.assertFalse(os.path.exists(
os.path.join(os.getenv("HOME"), ".local/share/desktop-directories",
self.vmname + "-vm.directory")))
self.vmname + "-vm.directory")))
self.assertFalse(os.path.exists(
os.path.join(os.getenv("HOME"), ".local/share/applications",
self.vmname + "-firefox.desktop")))
self.vmname + "-firefox.desktop")))
# vim: ts=4 sw=4 et

View File

@ -27,12 +27,14 @@ import tempfile
import unittest
from qubes.qubes import QubesVmCollection
import qubes.qubes
VM_PREFIX = "test-"
@unittest.skipUnless(os.path.exists('/usr/bin/rpmsign') and
os.path.exists('/usr/bin/rpmbuild'),
'rpm-sign and/or rpm-buid not installed')
class TC_00_Dom0Upgrade(unittest.TestCase):
class TC_00_Dom0Upgrade(qubes.tests.QubesTestCase):
cleanup_paths = []
pkg_name = 'qubes-test-pkg'
dom0_update_common_opts = ['--disablerepo=*', '--enablerepo=test',

View File

@ -1,25 +1,28 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# vim: fileencoding=utf-8
#
# The Qubes OS Project, http://www.qubes-os.org
# The Qubes OS Project, https://www.qubes-os.org/
#
# Copyright (C) 2015 Marek Marczykowski-Górecki <marmarek@invisiblethingslab.com>
# Copyright (C) 2015
# Marek Marczykowski-Górecki <marmarek@invisiblethingslab.com>
# Copyright (C) 2015 Wojtek Porczyk <woju@invisiblethingslab.com>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301,
# USA.
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
import multiprocessing
import os
import subprocess
@ -28,12 +31,16 @@ import time
from qubes.qubes import QubesVmCollection, defaults
import qubes.tests
VM_PREFIX = "test-"
class VmNetworkingTests(unittest.TestCase):
ping_ip = "ping -W 1 -n -c 1 192.168.123.45"
ping_name = "ping -W 1 -c 1 test.example.com"
class VmNetworkingTests(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
test_ip = '192.168.123.45'
test_name = 'test.example.com'
ping_cmd = 'ping -W 1 -n -c 1 {target}'
ping_ip = ping_cmd.format(target=test_ip)
ping_name = ping_cmd.format(target=test_name)
def run_cmd(self, vm, cmd, user="root"):
p = vm.run(cmd, user=user, passio_popen=True, ignore_stderr=True)
@ -42,22 +49,21 @@ class VmNetworkingTests(unittest.TestCase):
return p.wait()
def setUp(self):
self.qc = QubesVmCollection()
self.qc.lock_db_for_writing()
self.qc.load()
super(VmNetworkingTests, self).setUp()
self.testnetvm = self.qc.add_new_vm("QubesNetVm",
name="%snetvm1" % VM_PREFIX,
template=self.qc.get_default_template())
name=self.make_vm_name('netvm1'),
template=self.qc.get_default_template())
self.testnetvm.create_on_disk(verbose=False)
self.testvm1 = self.qc.add_new_vm("QubesAppVm",
name="%svm2" % VM_PREFIX,
template=self.qc.get_default_template())
name=self.make_vm_name('vm2'),
template=self.qc.get_default_template())
self.testvm1.create_on_disk(verbose=False)
self.testvm1.netvm = self.testnetvm
self.qc.save()
self.qc.unlock_db()
self.configure_netvm()
def configure_netvm(self):
def run_netvm_cmd(cmd):
if self.run_cmd(self.testnetvm, cmd) != 0:
@ -73,57 +79,29 @@ class VmNetworkingTests(unittest.TestCase):
run_netvm_cmd("ip link add test0 type dummy")
run_netvm_cmd("ip link set test0 up")
run_netvm_cmd("ip addr add 192.168.123.45/24 dev test0")
run_netvm_cmd("iptables -I INPUT -d 192.168.123.45 -j ACCEPT")
run_netvm_cmd("dnsmasq -a 192.168.123.45 -A /example.com/192.168.123.45 -i test0 -z")
run_netvm_cmd("echo nameserver 192.168.123.45 > /etc/resolv.conf")
run_netvm_cmd("ip addr add {}/24 dev test0".format(self.test_ip))
run_netvm_cmd("iptables -I INPUT -d {} -j ACCEPT".format(self.test_ip))
run_netvm_cmd("dnsmasq -a {ip} -A /{name}/{ip} -i test0 -z".format(
ip=self.test_ip, name=self.test_name))
run_netvm_cmd("echo nameserver {} > /etc/resolv.conf".format(
self.test_ip))
run_netvm_cmd("/usr/lib/qubes/qubes-setup-dnat-to-ns")
def remove_vms(self, vms):
self.qc.lock_db_for_writing()
self.qc.load()
for vm in vms:
if isinstance(vm, str):
vm = self.qc.get_vm_by_name(vm)
else:
vm = self.qc[vm.qid]
if vm.is_running():
try:
vm.force_shutdown()
except:
pass
try:
vm.remove_from_disk()
except OSError:
pass
self.qc.pop(vm.qid)
self.qc.save()
self.qc.unlock_db()
def tearDown(self):
vmlist = [vm for vm in self.qc.values() if vm.name.startswith(
VM_PREFIX)]
self.remove_vms(vmlist)
def test_000_simple_networking(self):
self.testvm1.start()
self.assertEqual(self.run_cmd(self.testvm1,
"ping -c 1 192.168.123.45"), 0)
self.assertEqual(self.run_cmd(self.testvm1,
"ping -c 1 test.example.com"), 0)
self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0)
def test_010_simple_proxyvm(self):
self.qc.lock_db_for_writing()
self.qc.load()
self.proxy = self.qc.add_new_vm("QubesProxyVm",
name="%sproxy" % VM_PREFIX,
template=self.qc.get_default_template())
name=self.make_vm_name('proxy'),
template=self.qc.get_default_template())
self.proxy.create_on_disk(verbose=False)
self.proxy.netvm = self.testnetvm
self.testvm1.netvm = self.proxy
self.qc.save()
self.qc.unlock_db()
self.testvm1.start()
self.assertTrue(self.proxy.is_running())
@ -136,18 +114,16 @@ class VmNetworkingTests(unittest.TestCase):
self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0,
"Ping by IP from AppVM failed")
def test_020_simple_proxyvm_nm(self):
self.qc.lock_db_for_writing()
self.qc.load()
self.proxy = self.qc.add_new_vm("QubesProxyVm",
name="%sproxy" % VM_PREFIX,
template=self.qc.get_default_template())
name=self.make_vm_name('proxy'),
template=self.qc.get_default_template())
self.proxy.create_on_disk(verbose=False)
self.proxy.netvm = self.testnetvm
self.proxy.services['network-manager'] = True
self.testvm1.netvm = self.proxy
self.qc.save()
self.qc.unlock_db()
self.testvm1.start()
self.assertTrue(self.proxy.is_running())
@ -155,42 +131,42 @@ class VmNetworkingTests(unittest.TestCase):
"Ping by IP failed")
self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0,
"Ping by name failed")
# reconnect to make sure that device was configured by NM
self.assertEqual(
self.run_cmd(self.proxy, "nmcli device disconnect eth0",
user="user"),
user="user"),
0, "Failed to disconnect eth0 using nmcli")
self.assertNotEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
"Network should be disabled, but apparently it isn't")
"Network should be disabled, but apparently it isn't")
self.assertEqual(
self.run_cmd(self.proxy, "nmcli connection up \"VM uplink eth0\" "
"ifname eth0",
user="user"),
self.run_cmd(self.proxy,
'nmcli connection up "VM uplink eth0" ifname eth0',
user="user"),
0, "Failed to connect eth0 using nmcli")
self.assertEqual(self.run_cmd(self.proxy, "nm-online", user="user"), 0,
"Failed to wait for NM connection")
# check for nm-applet presence
self.assertEqual(subprocess.call([
'xdotool', 'search', '--all', '--name',
'--class', '^(NetworkManager Applet|{})$'.format(self.proxy.name)
], stdout=open('/dev/null', 'w')), 0, "nm-applet window not found")
'--class', '^(NetworkManager Applet|{})$'.format(self.proxy.name)],
stdout=open('/dev/null', 'w')), 0, "nm-applet window not found")
self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
"Ping by IP failed (after NM reconnection")
self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0,
"Ping by name failed (after NM reconnection)")
def test_030_firewallvm_firewall(self):
self.qc.lock_db_for_writing()
self.qc.load()
self.proxy = self.qc.add_new_vm("QubesProxyVm",
name="%sproxy" % VM_PREFIX,
template=self.qc.get_default_template())
name=self.make_vm_name('proxy'),
template=self.qc.get_default_template())
self.proxy.create_on_disk(verbose=False)
self.proxy.netvm = self.testnetvm
self.testvm1.netvm = self.proxy
self.qc.save()
self.qc.unlock_db()
# block all for first
@ -250,7 +226,7 @@ class VmNetworkingTests(unittest.TestCase):
'allow': False,
'allowDns': True,
'allowIcmp': True,
'rules': [{'address': '192.168.123.45',
'rules': [{'address': self.ping_ip,
'netmask': 32,
'proto': 'tcp',
'portBegin': 1234
@ -267,7 +243,7 @@ class VmNetworkingTests(unittest.TestCase):
'allow': True,
'allowDns': True,
'allowIcmp': True,
'rules': [{'address': '192.168.123.45',
'rules': [{'address': self.ping_ip,
'netmask': 32,
'proto': 'tcp',
'portBegin': 1234
@ -281,39 +257,36 @@ class VmNetworkingTests(unittest.TestCase):
def test_040_inter_vm(self):
self.qc.lock_db_for_writing()
self.qc.load()
self.proxy = self.qc.add_new_vm("QubesProxyVm",
name="%sproxy" % VM_PREFIX,
template=self.qc.get_default_template())
name=self.make_vm_name('proxy'),
template=self.qc.get_default_template())
self.proxy.create_on_disk(verbose=False)
self.proxy.netvm = self.testnetvm
self.testvm1.netvm = self.proxy
self.testvm2 = self.qc.add_new_vm("QubesAppVm",
name="%svm3" % VM_PREFIX,
template=self.qc.get_default_template())
name=self.make_vm_name('vm3'),
template=self.qc.get_default_template())
self.testvm2.create_on_disk(verbose=False)
self.testvm2.netvm = self.proxy
self.qc.save()
self.qc.unlock_db()
self.testvm1.start()
self.testvm2.start()
self.assertNotEqual(self.run_cmd(self.testvm1,
"ping -W 1 -n -c 1 {}".format(self.testvm2.ip)), 0)
self.ping_cmd.format(target=self.testvm2.ip)), 0)
self.testvm2.netvm = self.testnetvm
self.assertNotEqual(self.run_cmd(self.testvm1,
"ping -W 1 -n -c 1 {}".format(self.testvm2.ip)), 0)
self.ping_cmd.format(target=self.testvm2.ip)), 0)
self.assertNotEqual(self.run_cmd(self.testvm2,
"ping -W 1 -n -c 1 {}".format(self.testvm1.ip)), 0)
self.ping_cmd.format(target=self.testvm1.ip)), 0)
self.testvm1.netvm = self.testnetvm
self.assertNotEqual(self.run_cmd(self.testvm1,
"ping -W 1 -n -c 1 {}".format(self.testvm2.ip)), 0)
self.ping_cmd.format(target=self.testvm2.ip)), 0)
self.assertNotEqual(self.run_cmd(self.testvm2,
"ping -W 1 -n -c 1 {}".format(self.testvm1.ip)), 0)
self.ping_cmd.format(target=self.testvm1.ip)), 0)

58
tests/regressions.py Normal file
View File

@ -0,0 +1,58 @@
#!/usr/bin/python2 -O
#
# The Qubes OS Project, https://www.qubes-os.org/
#
# Copyright (C) 2014-2015
# Marek Marczykowski-Górecki <marmarek@invisiblethingslab.com>
# Copyright (C) 2015 Wojtek Porczyk <woju@invisiblethingslab.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
import multiprocessing
import time
import unittest
import qubes.qubes
import qubes.tests
class TC_00_Regressions(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
# Bug: #906
def test_000_bug_906_db_locking(self):
def create_vm(vmname):
qc = qubes.qubes.QubesVmCollection()
qc.lock_db_for_writing()
qc.load()
time.sleep(1)
qc.add_new_vm('QubesAppVm',
name=vmname, template=qc.get_default_template())
qc.save()
qc.unlock_db()
vmname1, vmname2 = map(self.make_vm_name, ('test1', 'test2'))
t = multiprocessing.Process(target=create_vm, args=(vmname1,))
t.start()
create_vm(vmname2)
t.join()
qc = qubes.qubes.QubesVmCollection()
qc.lock_db_for_reading()
qc.load()
qc.unlock_db()
self.assertIsNotNone(qc.get_vm_by_name(vmname1))
self.assertIsNotNone(qc.get_vm_by_name(vmname2))

246
tests/run.py Executable file
View File

@ -0,0 +1,246 @@
#!/usr/bin/python2 -O
# vim: fileencoding=utf-8
#
# The Qubes OS Project, https://www.qubes-os.org/
#
# Copyright (C) 2014-2015 Joanna Rutkowska <joanna@invisiblethingslab.com>
# Copyright (C) 2014-2015 Wojtek Porczyk <woju@invisiblethingslab.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
import curses
import importlib
import logging
import logging.handlers
import os
import socket
import subprocess
import sys
import unittest
import unittest.signals
import qubes.tests
class CursesColor(dict):
def __init__(self):
super(CursesColor, self).__init__()
try:
curses.setupterm()
except curses.error:
return
# pylint: disable=bad-whitespace
self['black'] = curses.tparm(curses.tigetstr('setaf'), 0)
self['red'] = curses.tparm(curses.tigetstr('setaf'), 1)
self['green'] = curses.tparm(curses.tigetstr('setaf'), 2)
self['yellow'] = curses.tparm(curses.tigetstr('setaf'), 3)
self['blue'] = curses.tparm(curses.tigetstr('setaf'), 4)
self['magenta'] = curses.tparm(curses.tigetstr('setaf'), 5)
self['cyan'] = curses.tparm(curses.tigetstr('setaf'), 6)
self['white'] = curses.tparm(curses.tigetstr('setaf'), 7)
self['bold'] = curses.tigetstr('bold')
self['normal'] = curses.tigetstr('sgr0')
def __missing__(self, key):
# pylint: disable=unused-argument,no-self-use
return ''
class QubesTestResult(unittest.TestResult):
'''A test result class that can print colourful text results to a stream.
Used by TextTestRunner. This is a lightly rewritten unittest.TextTestResult.
'''
separator1 = unittest.TextTestResult.separator1
separator2 = unittest.TextTestResult.separator2
def __init__(self, stream, descriptions, verbosity):
super(QubesTestResult, self).__init__(stream, descriptions, verbosity)
self.stream = stream
self.showAll = verbosity > 1 # pylint: disable=invalid-name
self.dots = verbosity == 1
self.descriptions = descriptions
self.color = CursesColor()
self.hostname = socket.gethostname()
self.log = logging.getLogger('qubes.tests')
def _fmtexc(self, err):
if str(err[1]):
return '{color[bold]}{}:{color[normal]} {!s}'.format(
err[0].__name__, err[1], color=self.color)
else:
return '{color[bold]}{}{color[normal]}'.format(
err[0].__name__, color=self.color)
def getDescription(self, test): # pylint: disable=invalid-name
teststr = str(test).split('/')
for i in range(-2, 0):
try:
fullname = teststr[i].split('_', 2)
except IndexError:
continue
fullname[-1] = '{color[bold]}{}{color[normal]}'.format(
fullname[-1], color=self.color)
teststr[i] = '_'.join(fullname)
teststr = '/'.join(teststr)
doc_first_line = test.shortDescription()
if self.descriptions and doc_first_line:
return '\n'.join((teststr, ' {}'.format(
doc_first_line, color=self.color)))
else:
return teststr
def startTest(self, test): # pylint: disable=invalid-name
super(QubesTestResult, self).startTest(test)
test.log.critical('started')
if self.showAll:
# if not qubes.tests.in_git:
self.stream.write('{}: '.format(self.hostname))
self.stream.write(self.getDescription(test))
self.stream.write(' ... ')
self.stream.flush()
def addSuccess(self, test): # pylint: disable=invalid-name
super(QubesTestResult, self).addSuccess(test)
test.log.warning('ok')
if self.showAll:
self.stream.writeln('{color[green]}ok{color[normal]}'.format(
color=self.color))
elif self.dots:
self.stream.write('.')
self.stream.flush()
def addError(self, test, err): # pylint: disable=invalid-name
super(QubesTestResult, self).addError(test, err)
test.log.critical('ERROR ({err[0].__name__}: {err[1]!r})'.format(err=err))
if self.showAll:
self.stream.writeln(
'{color[red]}{color[bold]}ERROR{color[normal]} ({})'.format(
self._fmtexc(err), color=self.color))
elif self.dots:
self.stream.write(
'{color[red]}{color[bold]}E{color[normal]}'.format(
color=self.color))
self.stream.flush()
def addFailure(self, test, err): # pylint: disable=invalid-name
super(QubesTestResult, self).addFailure(test, err)
test.log.error('FAIL ({err[0]!s}: {err[1]!r})'.format(err=err))
if self.showAll:
self.stream.writeln('{color[red]}FAIL{color[normal]}'.format(
color=self.color))
elif self.dots:
self.stream.write('{color[red]}F{color[normal]}'.format(
color=self.color))
self.stream.flush()
def addSkip(self, test, reason): # pylint: disable=invalid-name
super(QubesTestResult, self).addSkip(test, reason)
test.log.warning('skipped ({})'.format(reason))
if self.showAll:
self.stream.writeln(
'{color[cyan]}skipped{color[normal]} ({})'.format(
reason, color=self.color))
elif self.dots:
self.stream.write('{color[cyan]}s{color[normal]}'.format(
color=self.color))
self.stream.flush()
def addExpectedFailure(self, test, err): # pylint: disable=invalid-name
super(QubesTestResult, self).addExpectedFailure(test, err)
test.log.warning('expected failure')
if self.showAll:
self.stream.writeln(
'{color[yellow]}expected failure{color[normal]}'.format(
color=self.color))
elif self.dots:
self.stream.write('{color[yellow]}x{color[normal]}'.format(
color=self.color))
self.stream.flush()
def addUnexpectedSuccess(self, test): # pylint: disable=invalid-name
super(QubesTestResult, self).addUnexpectedSuccess(test)
test.log.error('unexpected success')
if self.showAll:
self.stream.writeln(
'{color[yellow]}{color[bold]}unexpected success'
'{color[normal]}'.format(color=self.color))
elif self.dots:
self.stream.write(
'{color[yellow]}{color[bold]}u{color[normal]}'.format(
color=self.color))
self.stream.flush()
def printErrors(self): # pylint: disable=invalid-name
if self.dots or self.showAll:
self.stream.writeln()
self.printErrorList(
'{color[red]}{color[bold]}ERROR{color[normal]}'.format(
color=self.color),
self.errors)
self.printErrorList(
'{color[red]}FAIL{color[normal]}'.format(color=self.color),
self.failures)
def printErrorList(self, flavour, errors): # pylint: disable=invalid-name
for test, err in errors:
self.stream.writeln(self.separator1)
self.stream.writeln('%s: %s' % (flavour, self.getDescription(test)))
self.stream.writeln(self.separator2)
self.stream.writeln('%s' % err)
def main():
ha_file = logging.FileHandler(
os.path.join(os.environ['HOME'], 'qubes-tests.log'))
ha_file.setFormatter(
logging.Formatter('%(asctime)s %(name)s[%(process)d]: %(message)s'))
logging.root.addHandler(ha_file)
ha_syslog = logging.handlers.SysLogHandler('/dev/log')
ha_syslog.setFormatter(
logging.Formatter('%(name)s[%(process)d]: %(message)s'))
try:
subprocess.check_call(('sudo', 'chmod', '666', '/dev/kmsg'))
except subprocess.CalledProcessError:
pass
else:
ha_kmsg = logging.FileHandler('/dev/kmsg', 'w')
ha_kmsg.setFormatter(
logging.Formatter('%(name)s[%(process)d]: %(message)s'))
ha_kmsg.setLevel(logging.CRITICAL)
logging.root.addHandler(ha_kmsg)
suite = unittest.TestSuite()
loader = unittest.TestLoader()
suite.addTests(loader.loadTestsFromName('qubes.tests'))
runner = unittest.TextTestRunner(stream=sys.stdout, verbosity=2)
unittest.signals.installHandler()
runner.resultclass = QubesTestResult
return runner.run(suite).wasSuccessful()
if __name__ == '__main__':
sys.exit(not main())

View File

@ -1,24 +1,28 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# vim: fileencoding=utf-8
#
# The Qubes OS Project, http://www.qubes-os.org
# The Qubes OS Project, https://www.qubes-os.org/
#
# Copyright (C) 2014 Marek Marczykowski-Górecki <marmarek@invisiblethingslab.com>
# Copyright (C) 2014-2015
# Marek Marczykowski-Górecki <marmarek@invisiblethingslab.com>
# Copyright (C) 2015 Wojtek Porczyk <woju@invisiblethingslab.com>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
import multiprocessing
import os
import subprocess
@ -27,53 +31,23 @@ import time
from qubes.qubes import QubesVmCollection, defaults, QubesException
VM_PREFIX = "test-"
import qubes.tests
TEST_DATA = "0123456789" * 1024
class VmRunningTests(unittest.TestCase):
class TC_00_AppVM(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
def setUp(self):
self.qc = QubesVmCollection()
self.qc.lock_db_for_writing()
self.qc.load()
super(TC_00_AppVM, self).setUp()
self.testvm1 = self.qc.add_new_vm("QubesAppVm",
name="%svm1" % VM_PREFIX,
template=self.qc.get_default_template())
name=self.make_vm_name('vm1'),
template=self.qc.get_default_template())
self.testvm1.create_on_disk(verbose=False)
self.testvm2 = self.qc.add_new_vm("QubesAppVm",
name="%svm2" % VM_PREFIX,
template=self.qc.get_default_template())
name=self.make_vm_name('vm2'),
template=self.qc.get_default_template())
self.testvm2.create_on_disk(verbose=False)
self.qc.save()
self.qc.unlock_db()
def remove_vms(self, vms):
self.qc.lock_db_for_writing()
self.qc.load()
for vm in vms:
if isinstance(vm, str):
vm = self.qc.get_vm_by_name(vm)
else:
vm = self.qc[vm.qid]
if vm.is_running():
try:
vm.force_shutdown()
except:
pass
try:
vm.remove_from_disk()
except OSError:
pass
self.qc.pop(vm.qid)
self.qc.save()
self.qc.unlock_db()
def tearDown(self):
vmlist = [vm for vm in self.qc.values() if vm.name.startswith(
VM_PREFIX)]
self.remove_vms(vmlist)
def test_000_start_shutdown(self):
self.testvm1.start()
@ -89,6 +63,7 @@ class VmRunningTests(unittest.TestCase):
time.sleep(1)
self.assertEquals(self.testvm1.get_power_state(), "Halted")
def test_010_run_gui_app(self):
self.testvm1.start()
self.assertEquals(self.testvm1.get_power_state(), "Running")
@ -116,6 +91,7 @@ class VmRunningTests(unittest.TestCase):
"termination")
time.sleep(0.1)
def test_050_qrexec_simple_eof(self):
"""Test for data and EOF transmission dom0->VM"""
result = multiprocessing.Value('i', 0)
@ -143,6 +119,7 @@ class VmRunningTests(unittest.TestCase):
elif result.value == 2:
self.fail("Some data was printed to stderr")
@unittest.expectedFailure
def test_051_qrexec_simple_eof_reverse(self):
"""Test for EOF transmission VM->dom0"""
@ -178,6 +155,7 @@ class VmRunningTests(unittest.TestCase):
elif result.value == 3:
self.fail("VM proceess didn't terminated on EOF")
def test_052_qrexec_vm_service_eof(self):
"""Test for EOF transmission VM(src)->VM(dst)"""
result = multiprocessing.Value('i', 0)
@ -211,6 +189,7 @@ class VmRunningTests(unittest.TestCase):
if result.value == 1:
self.fail("Received data differs from what was expected")
@unittest.expectedFailure
def test_053_qrexec_vm_service_eof_reverse(self):
"""Test for EOF transmission VM(src)<-VM(dst)"""
@ -245,6 +224,7 @@ class VmRunningTests(unittest.TestCase):
if result.value == 1:
self.fail("Received data differs from what was expected")
def test_060_qrexec_exit_code_dom0(self):
self.testvm1.start()
@ -256,6 +236,7 @@ class VmRunningTests(unittest.TestCase):
p.wait()
self.assertEqual(3, p.returncode)
@unittest.expectedFailure
def test_065_qrexec_exit_code_vm(self):
self.testvm1.start()
@ -292,6 +273,7 @@ class VmRunningTests(unittest.TestCase):
(stdout, stderr) = p.communicate()
self.assertEqual(stdout, "3\n")
def test_100_qrexec_filecopy(self):
self.testvm1.start()
self.testvm2.start()
@ -308,6 +290,7 @@ class VmRunningTests(unittest.TestCase):
"/home/user/QubesIncoming/%s/passwd" % self.testvm1.name, wait=True)
self.assertEqual(retcode, 0, "file differs")
def test_110_qrexec_filecopy_deny(self):
self.testvm1.start()
self.testvm2.start()
@ -343,99 +326,49 @@ class VmRunningTests(unittest.TestCase):
"/home/user/QubesIncoming/%s/passwd" % self.testvm1.name, wait=True)
self.assertEqual(retcode, 0, "file differs")
class HVMTests(unittest.TestCase):
class TC_10_HVM(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
# TODO: test with some OS inside
# TODO: windows tools tests
def setUp(self):
self.qc = QubesVmCollection()
def remove_vms(self, vms):
self.qc.lock_db_for_writing()
self.qc.load()
for vm in vms:
if isinstance(vm, str):
vm = self.qc.get_vm_by_name(vm)
else:
vm = self.qc[vm.qid]
if vm.is_running():
try:
vm.force_shutdown()
except:
pass
try:
vm.remove_from_disk()
except OSError:
pass
self.qc.pop(vm.qid)
self.qc.save()
self.qc.unlock_db()
def tearDown(self):
vmlist = [vm for vm in self.qc.values() if vm.name.startswith(
VM_PREFIX)]
self.remove_vms(vmlist)
def test_000_create_start(self):
self.qc.lock_db_for_writing()
try:
self.qc.load()
self.testvm1 = self.qc.add_new_vm("QubesHVm",
name="%svm1" % VM_PREFIX)
self.testvm1.create_on_disk(verbose=False)
self.qc.save()
finally:
self.qc.unlock_db()
self.testvm1 = self.qc.add_new_vm("QubesHVm",
name=self.make_vm_name('vm1'))
self.testvm1.create_on_disk(verbose=False)
self.qc.save()
self.testvm1.start()
self.assertEquals(self.testvm1.get_power_state(), "Running")
def test_010_create_start_template(self):
self.qc.lock_db_for_writing()
try:
self.qc.load()
self.templatevm = self.qc.add_new_vm("QubesTemplateHVm",
name="%stemplate" % VM_PREFIX)
self.templatevm.create_on_disk(verbose=False)
self.qc.save()
finally:
self.qc.unlock_db()
self.templatevm = self.qc.add_new_vm("QubesTemplateHVm",
name=self.make_vm_name('template'))
self.templatevm.create_on_disk(verbose=False)
self.templatevm.start()
self.assertEquals(self.templatevm.get_power_state(), "Running")
def test_020_create_start_template_vm(self):
self.qc.lock_db_for_writing()
try:
self.qc.load()
self.templatevm = self.qc.add_new_vm("QubesTemplateHVm",
name="%stemplate" % VM_PREFIX)
self.templatevm.create_on_disk(verbose=False)
self.testvm2 = self.qc.add_new_vm("QubesHVm",
name="%svm2" % VM_PREFIX,
template=self.templatevm)
self.testvm2.create_on_disk(verbose=False)
self.qc.save()
finally:
self.qc.unlock_db()
self.templatevm = self.qc.add_new_vm("QubesTemplateHVm",
name=self.make_vm_name('template'))
self.templatevm.create_on_disk(verbose=False)
self.testvm2 = self.qc.add_new_vm("QubesHVm",
name=self.make_vm_name('vm2'),
template=self.templatevm)
self.testvm2.create_on_disk(verbose=False)
self.qc.save()
self.testvm2.start()
self.assertEquals(self.testvm2.get_power_state(), "Running")
def test_030_prevent_simultaneus_start(self):
self.qc.lock_db_for_writing()
try:
self.qc.load()
self.templatevm = self.qc.add_new_vm("QubesTemplateHVm",
name="%stemplate" % VM_PREFIX)
self.templatevm.create_on_disk(verbose=False)
self.testvm2 = self.qc.add_new_vm("QubesHVm",
name="%svm2" % VM_PREFIX,
template=self.templatevm)
self.testvm2.create_on_disk(verbose=False)
self.qc.save()
finally:
self.qc.unlock_db()
self.templatevm = self.qc.add_new_vm("QubesTemplateHVm",
name=self.make_vm_name('template'))
self.templatevm.create_on_disk(verbose=False)
self.testvm2 = self.qc.add_new_vm("QubesHVm",
name=self.make_vm_name('vm2'),
template=self.templatevm)
self.testvm2.create_on_disk(verbose=False)
self.qc.save()
self.templatevm.start()
self.assertEquals(self.templatevm.get_power_state(), "Running")
@ -446,53 +379,21 @@ class HVMTests(unittest.TestCase):
self.assertRaises(QubesException, self.templatevm.start)
class DispVmTests(unittest.TestCase):
def setUp(self):
self.qc = QubesVmCollection()
self.qc.lock_db_for_reading()
self.qc.load()
self.qc.unlock_db()
def remove_vms(self, vms):
self.qc.lock_db_for_writing()
self.qc.load()
for vm in vms:
if isinstance(vm, str):
vm = self.qc.get_vm_by_name(vm)
else:
vm = self.qc[vm.qid]
if vm.is_running():
try:
vm.force_shutdown()
except:
pass
try:
vm.remove_from_disk()
except OSError:
pass
self.qc.pop(vm.qid)
self.qc.save()
self.qc.unlock_db()
def tearDown(self):
vmlist = [vm for vm in self.qc.values() if vm.name.startswith(
VM_PREFIX)]
self.remove_vms(vmlist)
class TC_20_DispVM(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
def test_000_prepare_dvm(self):
self.qc.unlock_db()
retcode = subprocess.call(['/usr/bin/qvm-create-default-dvm',
'--default-template'],
stderr=open(os.devnull, 'w'))
self.assertEqual(retcode, 0)
self.qc.lock_db_for_reading()
self.qc.lock_db_for_writing()
self.qc.load()
self.qc.unlock_db()
self.assertIsNotNone(self.qc.get_vm_by_name(
self.qc.get_default_template().name + "-dvm"))
# TODO: check mtime of snapshot file
def test_010_simple_dvm_run(self):
self.qc.unlock_db()
p = subprocess.Popen(['/usr/lib/qubes/qfile-daemon-dvm',
'qubes.VMShell', 'dom0', 'DEFAULT'],
stdin=subprocess.PIPE,
@ -502,22 +403,27 @@ class DispVmTests(unittest.TestCase):
self.assertEqual(stdout, "test\n")
# TODO: check if DispVM is destroyed
def test_020_gui_app(self):
self.qc.unlock_db()
p = subprocess.Popen(['/usr/lib/qubes/qfile-daemon-dvm',
'qubes.VMShell', 'dom0', 'DEFAULT'],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=open(os.devnull, 'w'))
# wait for DispVM startup:
p.stdin.write("echo test\n")
p.stdin.flush()
l = p.stdout.readline()
self.assertEqual(l, "test\n")
# potential race condition, but our tests are supposed to be
# running on dedicated machine, so should not be a problem
self.qc.lock_db_for_reading()
self.qc.load()
self.qc.unlock_db()
max_qid = 0
for vm in self.qc.values():
if not vm.is_disposablevm():
@ -527,6 +433,7 @@ class DispVmTests(unittest.TestCase):
dispvm = self.qc[max_qid]
self.assertNotEqual(dispvm.qid, 0, "DispVM not found in qubes.xml")
self.assertTrue(dispvm.is_running())
p.stdin.write("gnome-terminal\n")
wait_count = 0
window_title = 'user@%s' % (dispvm.template.name + "-dvm")
@ -576,17 +483,16 @@ class DispVmTests(unittest.TestCase):
"DispVM not removed from qubes.xml")
def test_030_edit_file(self):
self.qc.lock_db_for_writing()
self.qc.load()
self.testvm1 = self.qc.add_new_vm("QubesAppVm",
name="%svm1" % VM_PREFIX,
template=self.qc.get_default_template())
name=self.make_vm_name('vm1'),
template=self.qc.get_default_template())
self.testvm1.create_on_disk(verbose=False)
self.qc.save()
self.qc.unlock_db()
self.testvm1.start()
self.testvm1.run("echo test1 > /home/user/test.txt", wait=True)
self.qc.unlock_db()
p = self.testvm1.run("qvm-open-in-dvm /home/user/test.txt",
passio_popen=True)