tests: move BackupTestMixin to qubes.tests.int.backup

This is much more logical place, don't pollute main qubes.tests module.
This commit is contained in:
Marek Marczykowski-Górecki 2016-09-25 16:31:31 +02:00
parent 533804ebdc
commit e499b529ad
No known key found for this signature in database
GPG Key ID: 063938BA42CFA724
3 changed files with 202 additions and 195 deletions

View File

@ -33,27 +33,26 @@
don't run the tests. don't run the tests.
""" """
import __builtin__
import collections import collections
from distutils import spawn
import functools import functools
import multiprocessing
import logging import logging
import os import os
import shutil import shutil
import subprocess import subprocess
import sys import sys
import tempfile import tempfile
import time
import traceback import traceback
import unittest import unittest
import __builtin__ from distutils import spawn
import lxml.etree import lxml.etree
import time
import qubes.backup
import qubes.config import qubes.config
import qubes.devices import qubes.devices
import qubes.events import qubes.events
import qubes.backup
import qubes.exc import qubes.exc
import qubes.vm.standalonevm import qubes.vm.standalonevm
@ -771,192 +770,6 @@ class SystemTestsMixin(object):
shutil.rmtree(mountpoint) shutil.rmtree(mountpoint)
subprocess.check_call(['sudo', 'losetup', '-d', loopdev]) subprocess.check_call(['sudo', 'losetup', '-d', loopdev])
# noinspection PyAttributeOutsideInit
class BackupTestsMixin(SystemTestsMixin):
class BackupErrorHandler(logging.Handler):
def __init__(self, errors_queue, level=logging.NOTSET):
super(BackupTestsMixin.BackupErrorHandler, self).__init__(level)
self.errors_queue = errors_queue
def emit(self, record):
self.errors_queue.put(record.getMessage())
def setUp(self):
super(BackupTestsMixin, self).setUp()
try:
self.init_default_template(self.template)
except AttributeError:
self.init_default_template()
self.error_detected = multiprocessing.Queue()
self.verbose = False
if self.verbose:
print >>sys.stderr, "-> Creating backupvm"
self.backupdir = os.path.join(os.environ["HOME"], "test-backup")
if os.path.exists(self.backupdir):
shutil.rmtree(self.backupdir)
os.mkdir(self.backupdir)
self.error_handler = self.BackupErrorHandler(self.error_detected,
level=logging.WARNING)
backup_log = logging.getLogger('qubes.backup')
backup_log.addHandler(self.error_handler)
def tearDown(self):
super(BackupTestsMixin, self).tearDown()
shutil.rmtree(self.backupdir)
backup_log = logging.getLogger('qubes.backup')
backup_log.removeHandler(self.error_handler)
def fill_image(self, path, size=None, sparse=False):
block_size = 4096
if self.verbose:
print >>sys.stderr, "-> Filling %s" % path
f = open(path, 'w+')
if size is None:
f.seek(0, 2)
size = f.tell()
f.seek(0)
for block_num in xrange(size/block_size):
f.write('a' * block_size)
if sparse:
f.seek(block_size, 1)
f.close()
# NOTE: this was create_basic_vms
def create_backup_vms(self):
template = self.app.default_template
vms = []
vmname = self.make_vm_name('test-net')
if self.verbose:
print >>sys.stderr, "-> Creating %s" % vmname
testnet = self.app.add_new_vm(qubes.vm.appvm.AppVM,
name=vmname, template=template, provides_network=True, label='red')
testnet.create_on_disk()
testnet.features['services/ntpd'] = True
vms.append(testnet)
self.fill_image(testnet.volumes['private'].path, 20*1024*1024)
vmname = self.make_vm_name('test1')
if self.verbose:
print >>sys.stderr, "-> Creating %s" % vmname
testvm1 = self.app.add_new_vm(qubes.vm.appvm.AppVM,
name=vmname, template=template, label='red')
testvm1.uses_default_netvm = False
testvm1.netvm = testnet
testvm1.create_on_disk()
vms.append(testvm1)
self.fill_image(testvm1.volumes['private'].path, 100*1024*1024)
vmname = self.make_vm_name('testhvm1')
if self.verbose:
print >>sys.stderr, "-> Creating %s" % vmname
testvm2 = self.app.add_new_vm(qubes.vm.standalonevm.StandaloneVM,
name=vmname,
hvm=True,
label='red')
testvm2.create_on_disk()
self.fill_image(testvm2.volumes['root'].path, 1024 * 1024 * 1024, True)
vms.append(testvm2)
vmname = self.make_vm_name('template')
if self.verbose:
print >>sys.stderr, "-> Creating %s" % vmname
testvm3 = self.app.add_new_vm(qubes.vm.templatevm.TemplateVM,
name=vmname, label='red')
testvm3.create_on_disk()
self.fill_image(testvm3.volumes['root'].path, 100 * 1024 * 1024, True)
vms.append(testvm3)
vmname = self.make_vm_name('custom')
if self.verbose:
print >>sys.stderr, "-> Creating %s" % vmname
testvm4 = self.app.add_new_vm(qubes.vm.appvm.AppVM,
name=vmname, template=testvm3, label='red')
testvm4.create_on_disk()
vms.append(testvm4)
self.app.save()
return vms
def make_backup(self, vms, target=None, expect_failure=False, **kwargs):
if target is None:
target = self.backupdir
try:
backup = qubes.backup.Backup(self.app, vms, **kwargs)
except qubes.exc.QubesException as e:
if not expect_failure:
self.fail("QubesException during backup_prepare: %s" % str(e))
else:
raise
backup.passphrase = 'qubes'
backup.target_dir = target
try:
backup.backup_do()
except qubes.exc.QubesException as e:
if not expect_failure:
self.fail("QubesException during backup_do: %s" % str(e))
else:
raise
# FIXME why?
#self.reload_db()
def restore_backup(self, source=None, appvm=None, options=None,
expect_errors=None):
if source is None:
backupfile = os.path.join(self.backupdir,
sorted(os.listdir(self.backupdir))[-1])
else:
backupfile = source
with self.assertNotRaises(qubes.exc.QubesException):
restore_op = qubes.backup.BackupRestore(
self.app, backupfile, appvm, "qubes")
if options:
for key, value in options.iteritems():
setattr(restore_op.options, key, value)
restore_info = restore_op.get_restore_info()
if self.verbose:
print restore_op.get_restore_summary(restore_info)
with self.assertNotRaises(qubes.exc.QubesException):
restore_op.restore_do(restore_info)
# maybe someone forgot to call .save()
self.reload_db()
errors = []
if expect_errors is None:
expect_errors = []
else:
self.assertFalse(self.error_detected.empty(),
"Restore errors expected, but none detected")
while not self.error_detected.empty():
current_error = self.error_detected.get()
if any(map(current_error.startswith, expect_errors)):
continue
errors.append(current_error)
self.assertTrue(len(errors) == 0,
"Error(s) detected during backup_restore_do: %s" %
'\n'.join(errors))
if not appvm and not os.path.isdir(backupfile):
os.unlink(backupfile)
def create_sparse(self, path, size):
f = open(path, "w")
f.truncate(size)
f.close()
def load_tests(loader, tests, pattern): # pylint: disable=unused-argument def load_tests(loader, tests, pattern): # pylint: disable=unused-argument
# discard any tests from this module, because it hosts base classes # discard any tests from this module, because it hosts base classes

View File

@ -22,18 +22,210 @@
# with this program; if not, write to the Free Software Foundation, Inc., # with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
# #
import logging
import multiprocessing
import os import os
import shutil
import sys import sys
import qubes import qubes
import qubes.backup
import qubes.exc import qubes.exc
import qubes.tests import qubes.tests
import qubes.vm
import qubes.vm.appvm import qubes.vm.appvm
import qubes.vm.templatevm import qubes.vm.templatevm
class TC_00_Backup(qubes.tests.BackupTestsMixin, qubes.tests.QubesTestCase):
# noinspection PyAttributeOutsideInit
class BackupTestsMixin(qubes.tests.SystemTestsMixin):
class BackupErrorHandler(logging.Handler):
def __init__(self, errors_queue, level=logging.NOTSET):
super(BackupTestsMixin.BackupErrorHandler, self).__init__(level)
self.errors_queue = errors_queue
def emit(self, record):
self.errors_queue.put(record.getMessage())
def setUp(self):
super(BackupTestsMixin, self).setUp()
try:
self.init_default_template(self.template)
except AttributeError:
self.init_default_template()
self.error_detected = multiprocessing.Queue()
self.verbose = False
if self.verbose:
print >>sys.stderr, "-> Creating backupvm"
self.backupdir = os.path.join(os.environ["HOME"], "test-backup")
if os.path.exists(self.backupdir):
shutil.rmtree(self.backupdir)
os.mkdir(self.backupdir)
self.error_handler = self.BackupErrorHandler(self.error_detected,
level=logging.WARNING)
backup_log = logging.getLogger('qubes.backup')
backup_log.addHandler(self.error_handler)
def tearDown(self):
super(BackupTestsMixin, self).tearDown()
shutil.rmtree(self.backupdir)
backup_log = logging.getLogger('qubes.backup')
backup_log.removeHandler(self.error_handler)
def fill_image(self, path, size=None, sparse=False):
block_size = 4096
if self.verbose:
print >>sys.stderr, "-> Filling %s" % path
f = open(path, 'w+')
if size is None:
f.seek(0, 2)
size = f.tell()
f.seek(0)
for block_num in xrange(size/block_size):
f.write('a' * block_size)
if sparse:
f.seek(block_size, 1)
f.close()
# NOTE: this was create_basic_vms
def create_backup_vms(self):
template = self.app.default_template
vms = []
vmname = self.make_vm_name('test-net')
if self.verbose:
print >>sys.stderr, "-> Creating %s" % vmname
testnet = self.app.add_new_vm(qubes.vm.appvm.AppVM,
name=vmname, template=template, provides_network=True, label='red')
testnet.create_on_disk()
testnet.features['services/ntpd'] = True
vms.append(testnet)
self.fill_image(testnet.volumes['private'].path, 20*1024*1024)
vmname = self.make_vm_name('test1')
if self.verbose:
print >>sys.stderr, "-> Creating %s" % vmname
testvm1 = self.app.add_new_vm(qubes.vm.appvm.AppVM,
name=vmname, template=template, label='red')
testvm1.uses_default_netvm = False
testvm1.netvm = testnet
testvm1.create_on_disk()
vms.append(testvm1)
self.fill_image(testvm1.volumes['private'].path, 100*1024*1024)
vmname = self.make_vm_name('testhvm1')
if self.verbose:
print >>sys.stderr, "-> Creating %s" % vmname
testvm2 = self.app.add_new_vm(qubes.vm.standalonevm.StandaloneVM,
name=vmname,
hvm=True,
label='red')
testvm2.create_on_disk()
self.fill_image(testvm2.volumes['root'].path, 1024 * 1024 * 1024, True)
vms.append(testvm2)
vmname = self.make_vm_name('template')
if self.verbose:
print >>sys.stderr, "-> Creating %s" % vmname
testvm3 = self.app.add_new_vm(qubes.vm.templatevm.TemplateVM,
name=vmname, label='red')
testvm3.create_on_disk()
self.fill_image(testvm3.volumes['root'].path, 100 * 1024 * 1024, True)
vms.append(testvm3)
vmname = self.make_vm_name('custom')
if self.verbose:
print >>sys.stderr, "-> Creating %s" % vmname
testvm4 = self.app.add_new_vm(qubes.vm.appvm.AppVM,
name=vmname, template=testvm3, label='red')
testvm4.create_on_disk()
vms.append(testvm4)
self.app.save()
return vms
def make_backup(self, vms, target=None, expect_failure=False, **kwargs):
if target is None:
target = self.backupdir
try:
backup = qubes.backup.Backup(self.app, vms, **kwargs)
except qubes.exc.QubesException as e:
if not expect_failure:
self.fail("QubesException during backup_prepare: %s" % str(e))
else:
raise
backup.passphrase = 'qubes'
backup.target_dir = target
try:
backup.backup_do()
except qubes.exc.QubesException as e:
if not expect_failure:
self.fail("QubesException during backup_do: %s" % str(e))
else:
raise
# FIXME why?
#self.reload_db()
def restore_backup(self, source=None, appvm=None, options=None,
expect_errors=None):
if source is None:
backupfile = os.path.join(self.backupdir,
sorted(os.listdir(self.backupdir))[-1])
else:
backupfile = source
with self.assertNotRaises(qubes.exc.QubesException):
restore_op = qubes.backup.BackupRestore(
self.app, backupfile, appvm, "qubes")
if options:
for key, value in options.iteritems():
setattr(restore_op.options, key, value)
restore_info = restore_op.get_restore_info()
if self.verbose:
print restore_op.get_restore_summary(restore_info)
with self.assertNotRaises(qubes.exc.QubesException):
restore_op.restore_do(restore_info)
# maybe someone forgot to call .save()
self.reload_db()
errors = []
if expect_errors is None:
expect_errors = []
else:
self.assertFalse(self.error_detected.empty(),
"Restore errors expected, but none detected")
while not self.error_detected.empty():
current_error = self.error_detected.get()
if any(map(current_error.startswith, expect_errors)):
continue
errors.append(current_error)
self.assertTrue(len(errors) == 0,
"Error(s) detected during backup_restore_do: %s" %
'\n'.join(errors))
if not appvm and not os.path.isdir(backupfile):
os.unlink(backupfile)
def create_sparse(self, path, size):
f = open(path, "w")
f.truncate(size)
f.close()
class TC_00_Backup(BackupTestsMixin, qubes.tests.QubesTestCase):
def test_000_basic_backup(self): def test_000_basic_backup(self):
vms = self.create_backup_vms() vms = self.create_backup_vms()
self.make_backup(vms) self.make_backup(vms)
@ -177,7 +369,7 @@ class TC_00_Backup(qubes.tests.BackupTestsMixin, qubes.tests.QubesTestCase):
self.assertEqual(restored_vm.netvm.name, vm.netvm.name + '1') self.assertEqual(restored_vm.netvm.name, vm.netvm.name + '1')
class TC_10_BackupVMMixin(qubes.tests.BackupTestsMixin): class TC_10_BackupVMMixin(BackupTestsMixin):
def setUp(self): def setUp(self):
super(TC_10_BackupVMMixin, self).setUp() super(TC_10_BackupVMMixin, self).setUp()
self.backupvm = self.app.add_new_vm( self.backupvm = self.app.add_new_vm(

View File

@ -31,6 +31,7 @@ import sys
import re import re
import qubes.tests import qubes.tests
import qubes.tests.int.backup
QUBESXML_R2B2 = ''' QUBESXML_R2B2 = '''
<QubesVmCollection updatevm="3" default_kernel="3.7.6-2" default_netvm="3" default_fw_netvm="2" default_template="1" clockvm="2"> <QubesVmCollection updatevm="3" default_kernel="3.7.6-2" default_netvm="3" default_fw_netvm="2" default_template="1" clockvm="2">
@ -143,7 +144,8 @@ compressed={compressed}
compression-filter=gzip compression-filter=gzip
''' '''
class TC_00_BackupCompatibility(qubes.tests.BackupTestsMixin, qubes.tests.QubesTestCase): class TC_00_BackupCompatibility(
qubes.tests.int.backup.BackupTestsMixin, qubes.tests.QubesTestCase):
def tearDown(self): def tearDown(self):
self.remove_test_vms(prefix="test-") self.remove_test_vms(prefix="test-")