diff --git a/tests/backupcompatibility.py b/tests/backupcompatibility.py index 2aa3faf4..775db92f 100644 --- a/tests/backupcompatibility.py +++ b/tests/backupcompatibility.py @@ -28,6 +28,7 @@ import subprocess import unittest import sys +import re from qubes.qubes import QubesVmCollection, QubesException from qubes import backup @@ -53,6 +54,35 @@ QUBESXML_R2B2 = ''' ''' +QUBESXML_R2 = ''' + + + + + + + + + + + + + + + + + +''' + +MANGLED_SUBDIRS_R2 = { + "test-work": "vm5", + "test-template-clone": "vm9", + "test-custom-template-appvm": "vm10", + "test-standalonevm": "vm11", + "test-testproxy": "vm12", + "test-testhvm": "vm14", +} + APPTEMPLATE_R2B2 = ''' [Desktop Entry] Name=%VMNAME%: {name} @@ -107,6 +137,13 @@ QUBESXML_R1 = ''' ''' +BACKUP_HEADER_R2 = '''version=3 +hmac-algorithm=SHA512 +crypto-algorithm=aes-256-cbc +encrypted={encrypted} +compressed={compressed} +compression-filter=gzip +''' class TC_00_BackupCompatibility(qubes.tests.BackupTestsMixin, qubes.tests.QubesTestCase): def create_whitelisted_appmenus(self, filename): @@ -244,6 +281,118 @@ class TC_00_BackupCompatibility(qubes.tests.BackupTestsMixin, qubes.tests.QubesT "vm-templates/test-template-clone")), appmenus_list) + def calculate_hmac(self, f_name, algorithm="sha512", password="qubes"): + subprocess.check_call(["openssl", "dgst", "-"+algorithm, "-hmac", + password], + stdin=open(self.fullpath(f_name), "r"), + stdout=open(self.fullpath(f_name+".hmac"), "w")) + + def append_backup_stream(self, f_name, stream, basedir=None): + if not basedir: + basedir = self.backupdir + subprocess.check_call(["tar", "-cO", "--posix", "-C", basedir, + f_name], + stdout=stream) + + def handle_v3_file(self, f_name, subdir, stream, compressed=True, + encrypted=True): + # create inner archive + tar_cmdline = ["tar", "-Pc", '--sparse', + '-C', self.fullpath(os.path.dirname(f_name)), + '--xform', 's:^%s:%s\\0:' % ( + os.path.basename(f_name), + subdir), + os.path.basename(f_name) + ] + if compressed: + tar_cmdline.insert(-1, "--use-compress-program=%s" % "gzip") + tar = subprocess.Popen(tar_cmdline, stdout=subprocess.PIPE) + if encrypted: + encryptor = subprocess.Popen( + ["openssl", "enc", "-e", "-aes-256-cbc", "-pass", "pass:qubes"], + stdin=tar.stdout, + stdout=subprocess.PIPE) + data = encryptor.stdout + else: + data = tar.stdout + + stage1_dir = self.fullpath(os.path.join("stage1", subdir)) + if not os.path.exists(stage1_dir): + os.makedirs(stage1_dir) + subprocess.check_call(["split", "--numeric-suffixes", + "--suffix-length=3", + "--bytes="+str(100*1024*1024), "-", + os.path.join(stage1_dir, + os.path.basename(f_name+"."))], + stdin=data) + + for part in sorted(os.listdir(stage1_dir)): + if not re.match( + r"^{}.[0-9][0-9][0-9]$".format(os.path.basename(f_name)), + part): + continue + part_with_dir = os.path.join(subdir, part) + self.calculate_hmac(os.path.join("stage1", part_with_dir)) + self.append_backup_stream(part_with_dir, stream, + basedir=self.fullpath("stage1")) + self.append_backup_stream(part_with_dir+".hmac", stream, + basedir=self.fullpath("stage1")) + + def create_v3_backup(self, encrypted=True, compressed=True): + """ + Create "backup format 3" backup - used in R2 and R3.0 + + :param encrypt: Should the backup be encrypted + :return: + """ + output = open(self.fullpath("backup.bin"), "w") + f = open(self.fullpath("backup-header"), "w") + f.write(BACKUP_HEADER_R2.format( + encrypted=str(encrypted), + compressed=str(compressed) + )) + f.close() + self.calculate_hmac("backup-header") + self.append_backup_stream("backup-header", output) + self.append_backup_stream("backup-header.hmac", output) + f = open(self.fullpath("qubes.xml"), "w") + if encrypted: + qubesxml = QUBESXML_R2 + for vmname, subdir in MANGLED_SUBDIRS_R2.items(): + qubesxml = re.sub(r"[a-z-]*/{}".format(vmname), + subdir, qubesxml) + f.write(qubesxml) + else: + f.write(QUBESXML_R2) + f.close() + + self.handle_v3_file("qubes.xml", "", output, encrypted=encrypted, + compressed=compressed) + + self.create_v1_files(r2b2=True) + for vm_type in ["appvms", "servicevms"]: + for vm_name in os.listdir(self.fullpath(vm_type)): + vm_dir = os.path.join(vm_type, vm_name) + for f_name in os.listdir(self.fullpath(vm_dir)): + if encrypted: + subdir = MANGLED_SUBDIRS_R2[vm_name] + else: + subdir = vm_dir + self.handle_v3_file( + os.path.join(vm_dir, f_name), + subdir+'/', output, encrypted=encrypted) + + for vm_name in os.listdir(self.fullpath("vm-templates")): + vm_dir = os.path.join("vm-templates", vm_name) + if encrypted: + subdir = MANGLED_SUBDIRS_R2[vm_name] + else: + subdir = vm_dir + self.handle_v3_file( + os.path.join(vm_dir, "."), + subdir+'/', output, encrypted=encrypted) + + output.close() def test_100_r1(self): self.create_v1_files(r2b2=False) @@ -286,3 +435,39 @@ class TC_00_BackupCompatibility(qubes.tests.BackupTestsMixin, qubes.tests.QubesT self.assertEqual(self.qc.get_vm_by_name("test-custom-template-appvm") .template, self.qc.get_vm_by_name("test-template-clone")) + + def test_210_r2(self): + self.create_v3_backup(False) + + self.restore_backup(self.fullpath("backup.bin"), options={ + 'use-default-template': True, + 'use-default-netvm': True, + }) + self.assertIsNotNone(self.qc.get_vm_by_name("test-template-clone")) + self.assertIsNotNone(self.qc.get_vm_by_name("test-testproxy")) + self.assertIsNotNone(self.qc.get_vm_by_name("test-work")) + self.assertIsNotNone(self.qc.get_vm_by_name("test-testhvm")) + self.assertIsNotNone(self.qc.get_vm_by_name("test-standalonevm")) + self.assertIsNotNone(self.qc.get_vm_by_name( + "test-custom-template-appvm")) + self.assertEqual(self.qc.get_vm_by_name("test-custom-template-appvm") + .template, + self.qc.get_vm_by_name("test-template-clone")) + + def test_220_r2_encrypted(self): + self.create_v3_backup(True) + + self.restore_backup(self.fullpath("backup.bin"), options={ + 'use-default-template': True, + 'use-default-netvm': True, + }) + self.assertIsNotNone(self.qc.get_vm_by_name("test-template-clone")) + self.assertIsNotNone(self.qc.get_vm_by_name("test-testproxy")) + self.assertIsNotNone(self.qc.get_vm_by_name("test-work")) + self.assertIsNotNone(self.qc.get_vm_by_name("test-testhvm")) + self.assertIsNotNone(self.qc.get_vm_by_name("test-standalonevm")) + self.assertIsNotNone(self.qc.get_vm_by_name( + "test-custom-template-appvm")) + self.assertEqual(self.qc.get_vm_by_name("test-custom-template-appvm") + .template, + self.qc.get_vm_by_name("test-template-clone"))