From e499b529ad5ebb51050dc2e7d5589faed8480451 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Marek=20Marczykowski-G=C3=B3recki?=
 <marmarek@invisiblethingslab.com>
Date: Sun, 25 Sep 2016 16:31:31 +0200
Subject: [PATCH] tests: move BackupTestMixin to qubes.tests.int.backup

This is much more logical place, don't pollute main qubes.tests module.
---
 qubes/tests/__init__.py                | 195 +-----------------------
 qubes/tests/int/backup.py              | 198 ++++++++++++++++++++++++-
 qubes/tests/int/backupcompatibility.py |   4 +-
 3 files changed, 202 insertions(+), 195 deletions(-)

diff --git a/qubes/tests/__init__.py b/qubes/tests/__init__.py
index 6776eeba..bc086ef0 100644
--- a/qubes/tests/__init__.py
+++ b/qubes/tests/__init__.py
@@ -33,27 +33,26 @@
     don't run the tests.
 """
 
+import __builtin__
 import collections
-from distutils import spawn
 import functools
-import multiprocessing
 import logging
 import os
 import shutil
 import subprocess
 import sys
 import tempfile
+import time
 import traceback
 import unittest
-import __builtin__
+from distutils import spawn
 
 import lxml.etree
-import time
 
+import qubes.backup
 import qubes.config
 import qubes.devices
 import qubes.events
-import qubes.backup
 import qubes.exc
 import qubes.vm.standalonevm
 
@@ -771,192 +770,6 @@ class SystemTestsMixin(object):
             shutil.rmtree(mountpoint)
             subprocess.check_call(['sudo', 'losetup', '-d', loopdev])
 
-# noinspection PyAttributeOutsideInit
-class BackupTestsMixin(SystemTestsMixin):
-    class BackupErrorHandler(logging.Handler):
-        def __init__(self, errors_queue, level=logging.NOTSET):
-            super(BackupTestsMixin.BackupErrorHandler, self).__init__(level)
-            self.errors_queue = errors_queue
-
-        def emit(self, record):
-            self.errors_queue.put(record.getMessage())
-
-    def setUp(self):
-        super(BackupTestsMixin, self).setUp()
-        try:
-            self.init_default_template(self.template)
-        except AttributeError:
-            self.init_default_template()
-        self.error_detected = multiprocessing.Queue()
-        self.verbose = False
-
-        if self.verbose:
-            print >>sys.stderr, "-> Creating backupvm"
-
-        self.backupdir = os.path.join(os.environ["HOME"], "test-backup")
-        if os.path.exists(self.backupdir):
-            shutil.rmtree(self.backupdir)
-        os.mkdir(self.backupdir)
-
-        self.error_handler = self.BackupErrorHandler(self.error_detected,
-            level=logging.WARNING)
-        backup_log = logging.getLogger('qubes.backup')
-        backup_log.addHandler(self.error_handler)
-
-    def tearDown(self):
-        super(BackupTestsMixin, self).tearDown()
-        shutil.rmtree(self.backupdir)
-
-        backup_log = logging.getLogger('qubes.backup')
-        backup_log.removeHandler(self.error_handler)
-
-    def fill_image(self, path, size=None, sparse=False):
-        block_size = 4096
-
-        if self.verbose:
-            print >>sys.stderr, "-> Filling %s" % path
-        f = open(path, 'w+')
-        if size is None:
-            f.seek(0, 2)
-            size = f.tell()
-        f.seek(0)
-
-        for block_num in xrange(size/block_size):
-            f.write('a' * block_size)
-            if sparse:
-                f.seek(block_size, 1)
-
-        f.close()
-
-    # NOTE: this was create_basic_vms
-    def create_backup_vms(self):
-        template = self.app.default_template
-
-        vms = []
-        vmname = self.make_vm_name('test-net')
-        if self.verbose:
-            print >>sys.stderr, "-> Creating %s" % vmname
-        testnet = self.app.add_new_vm(qubes.vm.appvm.AppVM,
-            name=vmname, template=template, provides_network=True, label='red')
-        testnet.create_on_disk()
-        testnet.features['services/ntpd'] = True
-        vms.append(testnet)
-        self.fill_image(testnet.volumes['private'].path, 20*1024*1024)
-
-        vmname = self.make_vm_name('test1')
-        if self.verbose:
-            print >>sys.stderr, "-> Creating %s" % vmname
-        testvm1 = self.app.add_new_vm(qubes.vm.appvm.AppVM,
-            name=vmname, template=template, label='red')
-        testvm1.uses_default_netvm = False
-        testvm1.netvm = testnet
-        testvm1.create_on_disk()
-        vms.append(testvm1)
-        self.fill_image(testvm1.volumes['private'].path, 100*1024*1024)
-
-        vmname = self.make_vm_name('testhvm1')
-        if self.verbose:
-            print >>sys.stderr, "-> Creating %s" % vmname
-        testvm2 = self.app.add_new_vm(qubes.vm.standalonevm.StandaloneVM,
-                                      name=vmname,
-                                      hvm=True,
-                                      label='red')
-        testvm2.create_on_disk()
-        self.fill_image(testvm2.volumes['root'].path, 1024 * 1024 * 1024, True)
-        vms.append(testvm2)
-
-        vmname = self.make_vm_name('template')
-        if self.verbose:
-            print >>sys.stderr, "-> Creating %s" % vmname
-        testvm3 = self.app.add_new_vm(qubes.vm.templatevm.TemplateVM,
-            name=vmname, label='red')
-        testvm3.create_on_disk()
-        self.fill_image(testvm3.volumes['root'].path, 100 * 1024 * 1024, True)
-        vms.append(testvm3)
-
-        vmname = self.make_vm_name('custom')
-        if self.verbose:
-            print >>sys.stderr, "-> Creating %s" % vmname
-        testvm4 = self.app.add_new_vm(qubes.vm.appvm.AppVM,
-            name=vmname, template=testvm3, label='red')
-        testvm4.create_on_disk()
-        vms.append(testvm4)
-
-        self.app.save()
-
-        return vms
-
-    def make_backup(self, vms, target=None, expect_failure=False, **kwargs):
-        if target is None:
-            target = self.backupdir
-        try:
-            backup = qubes.backup.Backup(self.app, vms, **kwargs)
-        except qubes.exc.QubesException as e:
-            if not expect_failure:
-                self.fail("QubesException during backup_prepare: %s" % str(e))
-            else:
-                raise
-
-        backup.passphrase = 'qubes'
-        backup.target_dir = target
-
-        try:
-            backup.backup_do()
-        except qubes.exc.QubesException as e:
-            if not expect_failure:
-                self.fail("QubesException during backup_do: %s" % str(e))
-            else:
-                raise
-
-        # FIXME why?
-        #self.reload_db()
-
-    def restore_backup(self, source=None, appvm=None, options=None,
-                       expect_errors=None):
-        if source is None:
-            backupfile = os.path.join(self.backupdir,
-                                      sorted(os.listdir(self.backupdir))[-1])
-        else:
-            backupfile = source
-
-        with self.assertNotRaises(qubes.exc.QubesException):
-            restore_op = qubes.backup.BackupRestore(
-                self.app, backupfile, appvm, "qubes")
-            if options:
-                for key, value in options.iteritems():
-                    setattr(restore_op.options, key, value)
-            restore_info = restore_op.get_restore_info()
-        if self.verbose:
-            print restore_op.get_restore_summary(restore_info)
-
-        with self.assertNotRaises(qubes.exc.QubesException):
-            restore_op.restore_do(restore_info)
-
-        # maybe someone forgot to call .save()
-        self.reload_db()
-
-        errors = []
-        if expect_errors is None:
-            expect_errors = []
-        else:
-            self.assertFalse(self.error_detected.empty(),
-                "Restore errors expected, but none detected")
-        while not self.error_detected.empty():
-            current_error = self.error_detected.get()
-            if any(map(current_error.startswith, expect_errors)):
-                continue
-            errors.append(current_error)
-        self.assertTrue(len(errors) == 0,
-                         "Error(s) detected during backup_restore_do: %s" %
-                         '\n'.join(errors))
-        if not appvm and not os.path.isdir(backupfile):
-            os.unlink(backupfile)
-
-    def create_sparse(self, path, size):
-        f = open(path, "w")
-        f.truncate(size)
-        f.close()
-
 
 def load_tests(loader, tests, pattern): # pylint: disable=unused-argument
     # discard any tests from this module, because it hosts base classes
diff --git a/qubes/tests/int/backup.py b/qubes/tests/int/backup.py
index 7caae244..c23bd1b7 100644
--- a/qubes/tests/int/backup.py
+++ b/qubes/tests/int/backup.py
@@ -22,18 +22,210 @@
 # with this program; if not, write to the Free Software Foundation, Inc.,
 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
 #
-
+import logging
+import multiprocessing
 
 import os
+import shutil
 
 import sys
 import qubes
+import qubes.backup
 import qubes.exc
 import qubes.tests
+import qubes.vm
 import qubes.vm.appvm
 import qubes.vm.templatevm
 
-class TC_00_Backup(qubes.tests.BackupTestsMixin, qubes.tests.QubesTestCase):
+
+# noinspection PyAttributeOutsideInit
+class BackupTestsMixin(qubes.tests.SystemTestsMixin):
+    class BackupErrorHandler(logging.Handler):
+        def __init__(self, errors_queue, level=logging.NOTSET):
+            super(BackupTestsMixin.BackupErrorHandler, self).__init__(level)
+            self.errors_queue = errors_queue
+
+        def emit(self, record):
+            self.errors_queue.put(record.getMessage())
+
+    def setUp(self):
+        super(BackupTestsMixin, self).setUp()
+        try:
+            self.init_default_template(self.template)
+        except AttributeError:
+            self.init_default_template()
+        self.error_detected = multiprocessing.Queue()
+        self.verbose = False
+
+        if self.verbose:
+            print >>sys.stderr, "-> Creating backupvm"
+
+        self.backupdir = os.path.join(os.environ["HOME"], "test-backup")
+        if os.path.exists(self.backupdir):
+            shutil.rmtree(self.backupdir)
+        os.mkdir(self.backupdir)
+
+        self.error_handler = self.BackupErrorHandler(self.error_detected,
+            level=logging.WARNING)
+        backup_log = logging.getLogger('qubes.backup')
+        backup_log.addHandler(self.error_handler)
+
+    def tearDown(self):
+        super(BackupTestsMixin, self).tearDown()
+        shutil.rmtree(self.backupdir)
+
+        backup_log = logging.getLogger('qubes.backup')
+        backup_log.removeHandler(self.error_handler)
+
+    def fill_image(self, path, size=None, sparse=False):
+        block_size = 4096
+
+        if self.verbose:
+            print >>sys.stderr, "-> Filling %s" % path
+        f = open(path, 'w+')
+        if size is None:
+            f.seek(0, 2)
+            size = f.tell()
+        f.seek(0)
+
+        for block_num in xrange(size/block_size):
+            f.write('a' * block_size)
+            if sparse:
+                f.seek(block_size, 1)
+
+        f.close()
+
+    # NOTE: this was create_basic_vms
+    def create_backup_vms(self):
+        template = self.app.default_template
+
+        vms = []
+        vmname = self.make_vm_name('test-net')
+        if self.verbose:
+            print >>sys.stderr, "-> Creating %s" % vmname
+        testnet = self.app.add_new_vm(qubes.vm.appvm.AppVM,
+            name=vmname, template=template, provides_network=True, label='red')
+        testnet.create_on_disk()
+        testnet.features['services/ntpd'] = True
+        vms.append(testnet)
+        self.fill_image(testnet.volumes['private'].path, 20*1024*1024)
+
+        vmname = self.make_vm_name('test1')
+        if self.verbose:
+            print >>sys.stderr, "-> Creating %s" % vmname
+        testvm1 = self.app.add_new_vm(qubes.vm.appvm.AppVM,
+            name=vmname, template=template, label='red')
+        testvm1.uses_default_netvm = False
+        testvm1.netvm = testnet
+        testvm1.create_on_disk()
+        vms.append(testvm1)
+        self.fill_image(testvm1.volumes['private'].path, 100*1024*1024)
+
+        vmname = self.make_vm_name('testhvm1')
+        if self.verbose:
+            print >>sys.stderr, "-> Creating %s" % vmname
+        testvm2 = self.app.add_new_vm(qubes.vm.standalonevm.StandaloneVM,
+                                      name=vmname,
+                                      hvm=True,
+                                      label='red')
+        testvm2.create_on_disk()
+        self.fill_image(testvm2.volumes['root'].path, 1024 * 1024 * 1024, True)
+        vms.append(testvm2)
+
+        vmname = self.make_vm_name('template')
+        if self.verbose:
+            print >>sys.stderr, "-> Creating %s" % vmname
+        testvm3 = self.app.add_new_vm(qubes.vm.templatevm.TemplateVM,
+            name=vmname, label='red')
+        testvm3.create_on_disk()
+        self.fill_image(testvm3.volumes['root'].path, 100 * 1024 * 1024, True)
+        vms.append(testvm3)
+
+        vmname = self.make_vm_name('custom')
+        if self.verbose:
+            print >>sys.stderr, "-> Creating %s" % vmname
+        testvm4 = self.app.add_new_vm(qubes.vm.appvm.AppVM,
+            name=vmname, template=testvm3, label='red')
+        testvm4.create_on_disk()
+        vms.append(testvm4)
+
+        self.app.save()
+
+        return vms
+
+    def make_backup(self, vms, target=None, expect_failure=False, **kwargs):
+        if target is None:
+            target = self.backupdir
+        try:
+            backup = qubes.backup.Backup(self.app, vms, **kwargs)
+        except qubes.exc.QubesException as e:
+            if not expect_failure:
+                self.fail("QubesException during backup_prepare: %s" % str(e))
+            else:
+                raise
+
+        backup.passphrase = 'qubes'
+        backup.target_dir = target
+
+        try:
+            backup.backup_do()
+        except qubes.exc.QubesException as e:
+            if not expect_failure:
+                self.fail("QubesException during backup_do: %s" % str(e))
+            else:
+                raise
+
+        # FIXME why?
+        #self.reload_db()
+
+    def restore_backup(self, source=None, appvm=None, options=None,
+                       expect_errors=None):
+        if source is None:
+            backupfile = os.path.join(self.backupdir,
+                                      sorted(os.listdir(self.backupdir))[-1])
+        else:
+            backupfile = source
+
+        with self.assertNotRaises(qubes.exc.QubesException):
+            restore_op = qubes.backup.BackupRestore(
+                self.app, backupfile, appvm, "qubes")
+            if options:
+                for key, value in options.iteritems():
+                    setattr(restore_op.options, key, value)
+            restore_info = restore_op.get_restore_info()
+        if self.verbose:
+            print restore_op.get_restore_summary(restore_info)
+
+        with self.assertNotRaises(qubes.exc.QubesException):
+            restore_op.restore_do(restore_info)
+
+        # maybe someone forgot to call .save()
+        self.reload_db()
+
+        errors = []
+        if expect_errors is None:
+            expect_errors = []
+        else:
+            self.assertFalse(self.error_detected.empty(),
+                "Restore errors expected, but none detected")
+        while not self.error_detected.empty():
+            current_error = self.error_detected.get()
+            if any(map(current_error.startswith, expect_errors)):
+                continue
+            errors.append(current_error)
+        self.assertTrue(len(errors) == 0,
+                         "Error(s) detected during backup_restore_do: %s" %
+                         '\n'.join(errors))
+        if not appvm and not os.path.isdir(backupfile):
+            os.unlink(backupfile)
+
+    def create_sparse(self, path, size):
+        f = open(path, "w")
+        f.truncate(size)
+        f.close()
+
+
+class TC_00_Backup(BackupTestsMixin, qubes.tests.QubesTestCase):
     def test_000_basic_backup(self):
         vms = self.create_backup_vms()
         self.make_backup(vms)
@@ -177,7 +369,7 @@ class TC_00_Backup(qubes.tests.BackupTestsMixin, qubes.tests.QubesTestCase):
                 self.assertEqual(restored_vm.netvm.name, vm.netvm.name + '1')
 
 
-class TC_10_BackupVMMixin(qubes.tests.BackupTestsMixin):
+class TC_10_BackupVMMixin(BackupTestsMixin):
     def setUp(self):
         super(TC_10_BackupVMMixin, self).setUp()
         self.backupvm = self.app.add_new_vm(
diff --git a/qubes/tests/int/backupcompatibility.py b/qubes/tests/int/backupcompatibility.py
index dda87e69..cfede7c6 100644
--- a/qubes/tests/int/backupcompatibility.py
+++ b/qubes/tests/int/backupcompatibility.py
@@ -31,6 +31,7 @@ import sys
 import re
 
 import qubes.tests
+import qubes.tests.int.backup
 
 QUBESXML_R2B2 = '''
 <QubesVmCollection updatevm="3" default_kernel="3.7.6-2" default_netvm="3" default_fw_netvm="2" default_template="1" clockvm="2">
@@ -143,7 +144,8 @@ compressed={compressed}
 compression-filter=gzip
 '''
 
-class TC_00_BackupCompatibility(qubes.tests.BackupTestsMixin, qubes.tests.QubesTestCase):
+class TC_00_BackupCompatibility(
+    qubes.tests.int.backup.BackupTestsMixin, qubes.tests.QubesTestCase):
 
     def tearDown(self):
         self.remove_test_vms(prefix="test-")