diff --git a/qubes/tests/__init__.py b/qubes/tests/__init__.py index 6776eeba..bc086ef0 100644 --- a/qubes/tests/__init__.py +++ b/qubes/tests/__init__.py @@ -33,27 +33,26 @@ don't run the tests. """ +import __builtin__ import collections -from distutils import spawn import functools -import multiprocessing import logging import os import shutil import subprocess import sys import tempfile +import time import traceback import unittest -import __builtin__ +from distutils import spawn import lxml.etree -import time +import qubes.backup import qubes.config import qubes.devices import qubes.events -import qubes.backup import qubes.exc import qubes.vm.standalonevm @@ -771,192 +770,6 @@ class SystemTestsMixin(object): shutil.rmtree(mountpoint) subprocess.check_call(['sudo', 'losetup', '-d', loopdev]) -# noinspection PyAttributeOutsideInit -class BackupTestsMixin(SystemTestsMixin): - class BackupErrorHandler(logging.Handler): - def __init__(self, errors_queue, level=logging.NOTSET): - super(BackupTestsMixin.BackupErrorHandler, self).__init__(level) - self.errors_queue = errors_queue - - def emit(self, record): - self.errors_queue.put(record.getMessage()) - - def setUp(self): - super(BackupTestsMixin, self).setUp() - try: - self.init_default_template(self.template) - except AttributeError: - self.init_default_template() - self.error_detected = multiprocessing.Queue() - self.verbose = False - - if self.verbose: - print >>sys.stderr, "-> Creating backupvm" - - self.backupdir = os.path.join(os.environ["HOME"], "test-backup") - if os.path.exists(self.backupdir): - shutil.rmtree(self.backupdir) - os.mkdir(self.backupdir) - - self.error_handler = self.BackupErrorHandler(self.error_detected, - level=logging.WARNING) - backup_log = logging.getLogger('qubes.backup') - backup_log.addHandler(self.error_handler) - - def tearDown(self): - super(BackupTestsMixin, self).tearDown() - shutil.rmtree(self.backupdir) - - backup_log = logging.getLogger('qubes.backup') - backup_log.removeHandler(self.error_handler) - - def fill_image(self, path, size=None, sparse=False): - block_size = 4096 - - if self.verbose: - print >>sys.stderr, "-> Filling %s" % path - f = open(path, 'w+') - if size is None: - f.seek(0, 2) - size = f.tell() - f.seek(0) - - for block_num in xrange(size/block_size): - f.write('a' * block_size) - if sparse: - f.seek(block_size, 1) - - f.close() - - # NOTE: this was create_basic_vms - def create_backup_vms(self): - template = self.app.default_template - - vms = [] - vmname = self.make_vm_name('test-net') - if self.verbose: - print >>sys.stderr, "-> Creating %s" % vmname - testnet = self.app.add_new_vm(qubes.vm.appvm.AppVM, - name=vmname, template=template, provides_network=True, label='red') - testnet.create_on_disk() - testnet.features['services/ntpd'] = True - vms.append(testnet) - self.fill_image(testnet.volumes['private'].path, 20*1024*1024) - - vmname = self.make_vm_name('test1') - if self.verbose: - print >>sys.stderr, "-> Creating %s" % vmname - testvm1 = self.app.add_new_vm(qubes.vm.appvm.AppVM, - name=vmname, template=template, label='red') - testvm1.uses_default_netvm = False - testvm1.netvm = testnet - testvm1.create_on_disk() - vms.append(testvm1) - self.fill_image(testvm1.volumes['private'].path, 100*1024*1024) - - vmname = self.make_vm_name('testhvm1') - if self.verbose: - print >>sys.stderr, "-> Creating %s" % vmname - testvm2 = self.app.add_new_vm(qubes.vm.standalonevm.StandaloneVM, - name=vmname, - hvm=True, - label='red') - testvm2.create_on_disk() - self.fill_image(testvm2.volumes['root'].path, 1024 * 1024 * 1024, True) - vms.append(testvm2) - - vmname = self.make_vm_name('template') - if self.verbose: - print >>sys.stderr, "-> Creating %s" % vmname - testvm3 = self.app.add_new_vm(qubes.vm.templatevm.TemplateVM, - name=vmname, label='red') - testvm3.create_on_disk() - self.fill_image(testvm3.volumes['root'].path, 100 * 1024 * 1024, True) - vms.append(testvm3) - - vmname = self.make_vm_name('custom') - if self.verbose: - print >>sys.stderr, "-> Creating %s" % vmname - testvm4 = self.app.add_new_vm(qubes.vm.appvm.AppVM, - name=vmname, template=testvm3, label='red') - testvm4.create_on_disk() - vms.append(testvm4) - - self.app.save() - - return vms - - def make_backup(self, vms, target=None, expect_failure=False, **kwargs): - if target is None: - target = self.backupdir - try: - backup = qubes.backup.Backup(self.app, vms, **kwargs) - except qubes.exc.QubesException as e: - if not expect_failure: - self.fail("QubesException during backup_prepare: %s" % str(e)) - else: - raise - - backup.passphrase = 'qubes' - backup.target_dir = target - - try: - backup.backup_do() - except qubes.exc.QubesException as e: - if not expect_failure: - self.fail("QubesException during backup_do: %s" % str(e)) - else: - raise - - # FIXME why? - #self.reload_db() - - def restore_backup(self, source=None, appvm=None, options=None, - expect_errors=None): - if source is None: - backupfile = os.path.join(self.backupdir, - sorted(os.listdir(self.backupdir))[-1]) - else: - backupfile = source - - with self.assertNotRaises(qubes.exc.QubesException): - restore_op = qubes.backup.BackupRestore( - self.app, backupfile, appvm, "qubes") - if options: - for key, value in options.iteritems(): - setattr(restore_op.options, key, value) - restore_info = restore_op.get_restore_info() - if self.verbose: - print restore_op.get_restore_summary(restore_info) - - with self.assertNotRaises(qubes.exc.QubesException): - restore_op.restore_do(restore_info) - - # maybe someone forgot to call .save() - self.reload_db() - - errors = [] - if expect_errors is None: - expect_errors = [] - else: - self.assertFalse(self.error_detected.empty(), - "Restore errors expected, but none detected") - while not self.error_detected.empty(): - current_error = self.error_detected.get() - if any(map(current_error.startswith, expect_errors)): - continue - errors.append(current_error) - self.assertTrue(len(errors) == 0, - "Error(s) detected during backup_restore_do: %s" % - '\n'.join(errors)) - if not appvm and not os.path.isdir(backupfile): - os.unlink(backupfile) - - def create_sparse(self, path, size): - f = open(path, "w") - f.truncate(size) - f.close() - def load_tests(loader, tests, pattern): # pylint: disable=unused-argument # discard any tests from this module, because it hosts base classes diff --git a/qubes/tests/int/backup.py b/qubes/tests/int/backup.py index 7caae244..c23bd1b7 100644 --- a/qubes/tests/int/backup.py +++ b/qubes/tests/int/backup.py @@ -22,18 +22,210 @@ # with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. # - +import logging +import multiprocessing import os +import shutil import sys import qubes +import qubes.backup import qubes.exc import qubes.tests +import qubes.vm import qubes.vm.appvm import qubes.vm.templatevm -class TC_00_Backup(qubes.tests.BackupTestsMixin, qubes.tests.QubesTestCase): + +# noinspection PyAttributeOutsideInit +class BackupTestsMixin(qubes.tests.SystemTestsMixin): + class BackupErrorHandler(logging.Handler): + def __init__(self, errors_queue, level=logging.NOTSET): + super(BackupTestsMixin.BackupErrorHandler, self).__init__(level) + self.errors_queue = errors_queue + + def emit(self, record): + self.errors_queue.put(record.getMessage()) + + def setUp(self): + super(BackupTestsMixin, self).setUp() + try: + self.init_default_template(self.template) + except AttributeError: + self.init_default_template() + self.error_detected = multiprocessing.Queue() + self.verbose = False + + if self.verbose: + print >>sys.stderr, "-> Creating backupvm" + + self.backupdir = os.path.join(os.environ["HOME"], "test-backup") + if os.path.exists(self.backupdir): + shutil.rmtree(self.backupdir) + os.mkdir(self.backupdir) + + self.error_handler = self.BackupErrorHandler(self.error_detected, + level=logging.WARNING) + backup_log = logging.getLogger('qubes.backup') + backup_log.addHandler(self.error_handler) + + def tearDown(self): + super(BackupTestsMixin, self).tearDown() + shutil.rmtree(self.backupdir) + + backup_log = logging.getLogger('qubes.backup') + backup_log.removeHandler(self.error_handler) + + def fill_image(self, path, size=None, sparse=False): + block_size = 4096 + + if self.verbose: + print >>sys.stderr, "-> Filling %s" % path + f = open(path, 'w+') + if size is None: + f.seek(0, 2) + size = f.tell() + f.seek(0) + + for block_num in xrange(size/block_size): + f.write('a' * block_size) + if sparse: + f.seek(block_size, 1) + + f.close() + + # NOTE: this was create_basic_vms + def create_backup_vms(self): + template = self.app.default_template + + vms = [] + vmname = self.make_vm_name('test-net') + if self.verbose: + print >>sys.stderr, "-> Creating %s" % vmname + testnet = self.app.add_new_vm(qubes.vm.appvm.AppVM, + name=vmname, template=template, provides_network=True, label='red') + testnet.create_on_disk() + testnet.features['services/ntpd'] = True + vms.append(testnet) + self.fill_image(testnet.volumes['private'].path, 20*1024*1024) + + vmname = self.make_vm_name('test1') + if self.verbose: + print >>sys.stderr, "-> Creating %s" % vmname + testvm1 = self.app.add_new_vm(qubes.vm.appvm.AppVM, + name=vmname, template=template, label='red') + testvm1.uses_default_netvm = False + testvm1.netvm = testnet + testvm1.create_on_disk() + vms.append(testvm1) + self.fill_image(testvm1.volumes['private'].path, 100*1024*1024) + + vmname = self.make_vm_name('testhvm1') + if self.verbose: + print >>sys.stderr, "-> Creating %s" % vmname + testvm2 = self.app.add_new_vm(qubes.vm.standalonevm.StandaloneVM, + name=vmname, + hvm=True, + label='red') + testvm2.create_on_disk() + self.fill_image(testvm2.volumes['root'].path, 1024 * 1024 * 1024, True) + vms.append(testvm2) + + vmname = self.make_vm_name('template') + if self.verbose: + print >>sys.stderr, "-> Creating %s" % vmname + testvm3 = self.app.add_new_vm(qubes.vm.templatevm.TemplateVM, + name=vmname, label='red') + testvm3.create_on_disk() + self.fill_image(testvm3.volumes['root'].path, 100 * 1024 * 1024, True) + vms.append(testvm3) + + vmname = self.make_vm_name('custom') + if self.verbose: + print >>sys.stderr, "-> Creating %s" % vmname + testvm4 = self.app.add_new_vm(qubes.vm.appvm.AppVM, + name=vmname, template=testvm3, label='red') + testvm4.create_on_disk() + vms.append(testvm4) + + self.app.save() + + return vms + + def make_backup(self, vms, target=None, expect_failure=False, **kwargs): + if target is None: + target = self.backupdir + try: + backup = qubes.backup.Backup(self.app, vms, **kwargs) + except qubes.exc.QubesException as e: + if not expect_failure: + self.fail("QubesException during backup_prepare: %s" % str(e)) + else: + raise + + backup.passphrase = 'qubes' + backup.target_dir = target + + try: + backup.backup_do() + except qubes.exc.QubesException as e: + if not expect_failure: + self.fail("QubesException during backup_do: %s" % str(e)) + else: + raise + + # FIXME why? + #self.reload_db() + + def restore_backup(self, source=None, appvm=None, options=None, + expect_errors=None): + if source is None: + backupfile = os.path.join(self.backupdir, + sorted(os.listdir(self.backupdir))[-1]) + else: + backupfile = source + + with self.assertNotRaises(qubes.exc.QubesException): + restore_op = qubes.backup.BackupRestore( + self.app, backupfile, appvm, "qubes") + if options: + for key, value in options.iteritems(): + setattr(restore_op.options, key, value) + restore_info = restore_op.get_restore_info() + if self.verbose: + print restore_op.get_restore_summary(restore_info) + + with self.assertNotRaises(qubes.exc.QubesException): + restore_op.restore_do(restore_info) + + # maybe someone forgot to call .save() + self.reload_db() + + errors = [] + if expect_errors is None: + expect_errors = [] + else: + self.assertFalse(self.error_detected.empty(), + "Restore errors expected, but none detected") + while not self.error_detected.empty(): + current_error = self.error_detected.get() + if any(map(current_error.startswith, expect_errors)): + continue + errors.append(current_error) + self.assertTrue(len(errors) == 0, + "Error(s) detected during backup_restore_do: %s" % + '\n'.join(errors)) + if not appvm and not os.path.isdir(backupfile): + os.unlink(backupfile) + + def create_sparse(self, path, size): + f = open(path, "w") + f.truncate(size) + f.close() + + +class TC_00_Backup(BackupTestsMixin, qubes.tests.QubesTestCase): def test_000_basic_backup(self): vms = self.create_backup_vms() self.make_backup(vms) @@ -177,7 +369,7 @@ class TC_00_Backup(qubes.tests.BackupTestsMixin, qubes.tests.QubesTestCase): self.assertEqual(restored_vm.netvm.name, vm.netvm.name + '1') -class TC_10_BackupVMMixin(qubes.tests.BackupTestsMixin): +class TC_10_BackupVMMixin(BackupTestsMixin): def setUp(self): super(TC_10_BackupVMMixin, self).setUp() self.backupvm = self.app.add_new_vm( diff --git a/qubes/tests/int/backupcompatibility.py b/qubes/tests/int/backupcompatibility.py index dda87e69..cfede7c6 100644 --- a/qubes/tests/int/backupcompatibility.py +++ b/qubes/tests/int/backupcompatibility.py @@ -31,6 +31,7 @@ import sys import re import qubes.tests +import qubes.tests.int.backup QUBESXML_R2B2 = ''' @@ -143,7 +144,8 @@ compressed={compressed} compression-filter=gzip ''' -class TC_00_BackupCompatibility(qubes.tests.BackupTestsMixin, qubes.tests.QubesTestCase): +class TC_00_BackupCompatibility( + qubes.tests.int.backup.BackupTestsMixin, qubes.tests.QubesTestCase): def tearDown(self): self.remove_test_vms(prefix="test-")