From 58586a0e38caf57b745241541853201b96b01c1b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marek=20Marczykowski-G=C3=B3recki?= Date: Sun, 28 Sep 2014 03:17:37 +0200 Subject: [PATCH] tests: add test for restoring R2B2 backup One of "format 1" backup. --- tests/backupcompatibility.py | 354 +++++++++++++++++++++++++++++++++++ 1 file changed, 354 insertions(+) create mode 100644 tests/backupcompatibility.py diff --git a/tests/backupcompatibility.py b/tests/backupcompatibility.py new file mode 100644 index 00000000..13c4d257 --- /dev/null +++ b/tests/backupcompatibility.py @@ -0,0 +1,354 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- +# +# The Qubes OS Project, http://www.qubes-os.org +# +# Copyright (C) 2014 Marek Marczykowski-Górecki +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. +# +# +from multiprocessing import Queue + +import os +import shutil +import subprocess + +import unittest +import sys +from qubes.qubes import QubesVmCollection, QubesException +from qubes import backup + +VM_PREFIX = "test-" + +QUBESXML_R2B2 = """ + + + + + + + + + + + + + + + + +""" + +APPTEMPLATE_R2B2 = """ +[Desktop Entry] +Name=%VMNAME%: {name} +GenericName=%VMNAME%: {name} +GenericName[ca]=%VMNAME%: Navegador web +GenericName[cs]=%VMNAME%: Webový prohlížeč +GenericName[es]=%VMNAME%: Navegador web +GenericName[fa]=%VMNAME%: مرورر اینترنتی +GenericName[fi]=%VMNAME%: WWW-selain +GenericName[fr]=%VMNAME%: Navigateur Web +GenericName[hu]=%VMNAME%: Webböngésző +GenericName[it]=%VMNAME%: Browser Web +GenericName[ja]=%VMNAME%: ウェブ・ブラウザ +GenericName[ko]=%VMNAME%: 웹 브라우저 +GenericName[nb]=%VMNAME%: Nettleser +GenericName[nl]=%VMNAME%: Webbrowser +GenericName[nn]=%VMNAME%: Nettlesar +GenericName[no]=%VMNAME%: Nettleser +GenericName[pl]=%VMNAME%: Przeglądarka WWW +GenericName[pt]=%VMNAME%: Navegador Web +GenericName[pt_BR]=%VMNAME%: Navegador Web +GenericName[sk]=%VMNAME%: Internetový prehliadač +GenericName[sv]=%VMNAME%: Webbläsare +Comment={comment} +Comment[ca]=Navegueu per el web +Comment[cs]=Prohlížení stránek World Wide Webu +Comment[de]=Im Internet surfen +Comment[es]=Navegue por la web +Comment[fa]=صفحات شبه جهانی اینترنت را مرور نمایید +Comment[fi]=Selaa Internetin WWW-sivuja +Comment[fr]=Navigue sur Internet +Comment[hu]=A világháló böngészése +Comment[it]=Esplora il web +Comment[ja]=ウェブを閲覧します +Comment[ko]=웹을 돌아 다닙니다 +Comment[nb]=Surf på nettet +Comment[nl]=Verken het internet +Comment[nn]=Surf på nettet +Comment[no]=Surf på nettet +Comment[pl]=Przeglądanie stron WWW +Comment[pt]=Navegue na Internet +Comment[pt_BR]=Navegue na Internet +Comment[sk]=Prehliadanie internetu +Comment[sv]=Surfa på webben +Exec=qvm-run -q --tray -a %VMNAME% '{command} %u' +Categories=Network;WebBrowser; +X-Qubes-VmName=%VMNAME% +Icon=%VMDIR%/icon.png +""" + +class BackupCompatibilityTests(unittest.TestCase): + def setUp(self): + self.error_detected = Queue() + self.verbose = False + self.qc = QubesVmCollection() + self.reload_qc() + + self.backupdir = os.path.join(os.environ["HOME"], "test-backup") + os.mkdir(self.backupdir) + + def tearDown(self): + vmlist = [vm for vm in self.qc.values() if vm.name.startswith( + VM_PREFIX)] + self.remove_vms(vmlist) + shutil.rmtree(self.backupdir) + + def reload_qc(self): + self.qc.lock_db_for_reading() + self.qc.load() + self.qc.unlock_db() + + def print_progress(self, progress): + if self.verbose: + print >> sys.stderr, "\r-> Backing up files: {0}%...".format(progress) + + def error_callback(self, message): + self.error_detected.put(message) + if self.verbose: + print >> sys.stderr, "ERROR: {0}".format(message) + + def print_callback(self, msg): + if self.verbose: + print msg + + def fill_image(self, path, size=None, sparse=False): + block_size = 4096 + + if self.verbose: + print >>sys.stderr, "-> Filling %s" % path + f = open(path, 'w+') + if size is None: + f.seek(0, 2) + size = f.tell() + f.seek(0) + + for block_num in xrange(size/block_size): + f.write('a' * block_size) + if sparse: + f.seek(block_size, 1) + + f.close() + + def create_sparse(self, path, size): + f = open(path, "w") + f.truncate(size) + f.close() + + def remove_vms(self, vms): + self.qc.lock_db_for_writing() + self.qc.load() + + for vm in vms: + if isinstance(vm, str): + vm = self.qc.get_vm_by_name(vm) + else: + vm = self.qc[vm.qid] + if self.verbose: + print >>sys.stderr, "-> Removing %s" % vm.name + vm.remove_from_disk() + self.qc.pop(vm.qid) + self.qc.save() + self.qc.unlock_db() + + def restore_backup(self, source=None, appvm=None, options=None): + if source is None: + backupfile = os.path.join(self.backupdir, + sorted(os.listdir(self.backupdir))[-1]) + else: + backupfile = source + try: + backup_info = backup.backup_restore_prepare( + backupfile, "qubes", print_callback=self.print_callback, + appvm=appvm, options=options) + except QubesException as e: self.fail( + "QubesException during backup_restore_prepare: %s" % str(e)) + if self.verbose: + backup.backup_restore_print_summary(backup_info) + + try: + backup.backup_restore_do( + backup_info, + print_callback=self.print_callback if self.verbose else None, + error_callback=self.error_callback) + except QubesException as e: + self.fail("QubesException during backup_restore_do: %s" % str(e)) + errors = [] + while not self.error_detected.empty(): + errors.append(self.error_detected.get()) + self.assertTrue(len(errors) == 0, + "Error(s) detected during backup_restore_do: %s" % + '\n'.join(errors)) + if os.path.isfile(backupfile): + os.unlink(backupfile) + + def create_whitelisted_appmenus(self, filename): + f = open(filename, "w") + f.write("gnome-terminal.desktop\n") + f.write("nautilus.desktop\n") + f.write("firefox.desktop\n") + f.write("mozilla-thunderbird.desktop\n") + f.write("libreoffice-startcenter.desktop\n") + f.close() + + def create_appmenus(self, dir, template, list): + for name in list: + f = open(os.path.join(dir, name + ".desktop"), "w") + f.write(template.format(name=name, comment=name, command=name)) + f.close() + + def create_private_img(self, filename): + self.create_sparse(filename, 2*2**30) + subprocess.check_call(["/usr/sbin/mkfs.ext4", "-q", "-F", filename]) + + def create_volatile_img(self, filename): + self.create_sparse(filename, 11.5*2**30) + sfdisk_input="0,1024,S\n,10240,L\n" + p = subprocess.Popen(["/usr/sbin/sfdisk", "--no-reread", "-u", + "M", + filename], stdout=open("/dev/null","w"), + stderr=subprocess.STDOUT, stdin=subprocess.PIPE) + p.communicate(input=sfdisk_input) + self.assertEqual(p.returncode, 0, "sfdisk failed with code %d" % p + .returncode) + # TODO: mkswap + + def fullpath(self, name): + return os.path.join(self.backupdir, name) + + def test_r2b2(self): + appmenus_list = [ + "firefox", "gnome-terminal", "evince", "evolution", + "mozilla-thunderbird", "libreoffice-startcenter", "nautilus", + "gedit", "gpk-update-viewer", "gpk-application" + ] + + f = open(self.fullpath("qubes.xml"), "w") + f.write(QUBESXML_R2B2) + f.close() + + os.mkdir(self.fullpath("appvms")) + os.mkdir(self.fullpath("servicevms")) + os.mkdir(self.fullpath("vm-templates")) + + # normal AppVM + os.mkdir(self.fullpath("appvms/test-work")) + self.create_whitelisted_appmenus(self.fullpath( + "appvms/test-work/whitelisted-appmenus.list")) + os.symlink("/usr/share/qubes/icons/green.png", + self.fullpath("appvms/test-work/icon.png")) + self.create_private_img(self.fullpath("appvms/test-work/private.img")) + + # StandaloneVM + os.mkdir(self.fullpath("appvms/test-standalonevm")) + self.create_whitelisted_appmenus(self.fullpath( + "appvms/test-standalonevm/whitelisted-appmenus.list")) + os.symlink("/usr/share/qubes/icons/blue.png", + self.fullpath("appvms/test-standalonevm/icon.png")) + self.create_private_img(self.fullpath( + "appvms/test-standalonevm/private.img")) + self.create_sparse( + self.fullpath("appvms/test-standalonevm/root.img"), 10*2**30) + self.fill_image(self.fullpath("appvms/test-standalonevm/root.img"), + 100*1024*1024, True) + os.mkdir(self.fullpath("appvms/test-standalonevm/apps.templates")) + self.create_appmenus(self.fullpath("appvms/test-standalonevm/apps" + ".templates"), + APPTEMPLATE_R2B2, + appmenus_list) + + # VM based on custom template + subprocess.check_call( + ["/bin/cp", "-a", self.fullpath("appvms/test-work"), + self.fullpath("appvms/test-custom-template-appvm")]) + + # HVM + subprocess.check_call( + ["/bin/cp", "-a", self.fullpath("appvms/test-standalonevm"), + self.fullpath("appvms/test-testhvm")]) + + # ProxyVM + os.mkdir(self.fullpath("servicevms/test-testproxy")) + self.create_whitelisted_appmenus(self.fullpath( + "servicevms/test-testproxy/whitelisted-appmenus.list")) + self.create_private_img( + self.fullpath("servicevms/test-testproxy/private.img")) + + # Custom template + os.mkdir(self.fullpath("vm-templates/test-template-clone")) + self.create_private_img( + self.fullpath("vm-templates/test-template-clone/private.img")) + self.create_sparse(self.fullpath( + "vm-templates/test-template-clone/root-cow.img"), 10*2**30) + self.create_sparse(self.fullpath( + "vm-templates/test-template-clone/root.img"), 10*2**30) + self.fill_image(self.fullpath( + "vm-templates/test-template-clone/root.img"), 1*2**30, True) + self.create_volatile_img(self.fullpath( + "vm-templates/test-template-clone/volatile.img")) + subprocess.check_call([ + "/bin/tar", "cS", + "-f", self.fullpath( + "vm-templates/test-template-clone/clean-volatile.img.tar"), + "-C", self.fullpath("vm-templates/test-template-clone"), + "volatile.img"]) + self.create_whitelisted_appmenus(self.fullpath( + "vm-templates/test-template-clone/whitelisted-appmenus.list")) + self.create_whitelisted_appmenus(self.fullpath( + "vm-templates/test-template-clone/vm-whitelisted-appmenus.list")) + self.create_whitelisted_appmenus(self.fullpath( + "vm-templates/test-template-clone/netvm-whitelisted-appmenus" + ".list")) + os.symlink("/usr/share/qubes/icons/green.png", + self.fullpath("vm-templates/test-template-clone/icon.png")) + os.mkdir( + self.fullpath("vm-templates/test-template-clone/apps.templates")) + self.create_appmenus( + self.fullpath("vm-templates/test-template-clone/apps.templates"), + APPTEMPLATE_R2B2, + appmenus_list) + os.mkdir(self.fullpath("vm-templates/test-template-clone/apps")) + self.create_appmenus( + self.fullpath("vm-templates/test-template-clone/apps"), + APPTEMPLATE_R2B2.replace("%VMNAME%", "test-template-clone") + .replace("%VMDIR%", self.fullpath( + "vm-templates/test-template-clone")), + appmenus_list) + + self.restore_backup(self.backupdir, options={ + 'use-default-template': True, + }) + self.reload_qc() + self.assertIsNotNone(self.qc.get_vm_by_name("test-template-clone")) + self.assertIsNotNone(self.qc.get_vm_by_name("test-testproxy")) + self.assertIsNotNone(self.qc.get_vm_by_name("test-work")) + self.assertIsNotNone(self.qc.get_vm_by_name("test-standalonevm")) + self.assertIsNotNone(self.qc.get_vm_by_name( + "test-custom-template-appvm")) + self.assertEqual(self.qc.get_vm_by_name("test-custom-template-appvm") + .template, + self.qc.get_vm_by_name("test-template-clone"))