#!/usr/bin/python # -*- coding: utf-8 -*- # # The Qubes OS Project, http://www.qubes-os.org # # Copyright (C) 2014 Marek Marczykowski-Górecki # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU General Public License # as published by the Free Software Foundation; either version 2 # of the License, or (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. # # from multiprocessing import Queue import os import shutil import subprocess import unittest import sys from qubes.qubes import QubesVmCollection, QubesException from qubes import backup VM_PREFIX = "test-" QUBESXML_R2B2 = """ """ APPTEMPLATE_R2B2 = """ [Desktop Entry] Name=%VMNAME%: {name} GenericName=%VMNAME%: {name} GenericName[ca]=%VMNAME%: Navegador web GenericName[cs]=%VMNAME%: Webový prohlížeč GenericName[es]=%VMNAME%: Navegador web GenericName[fa]=%VMNAME%: مرورر اینترنتی GenericName[fi]=%VMNAME%: WWW-selain GenericName[fr]=%VMNAME%: Navigateur Web GenericName[hu]=%VMNAME%: Webböngésző GenericName[it]=%VMNAME%: Browser Web GenericName[ja]=%VMNAME%: ウェブ・ブラウザ GenericName[ko]=%VMNAME%: 웹 브라우저 GenericName[nb]=%VMNAME%: Nettleser GenericName[nl]=%VMNAME%: Webbrowser GenericName[nn]=%VMNAME%: Nettlesar GenericName[no]=%VMNAME%: Nettleser GenericName[pl]=%VMNAME%: Przeglądarka WWW GenericName[pt]=%VMNAME%: Navegador Web GenericName[pt_BR]=%VMNAME%: Navegador Web GenericName[sk]=%VMNAME%: Internetový prehliadač GenericName[sv]=%VMNAME%: Webbläsare Comment={comment} Comment[ca]=Navegueu per el web Comment[cs]=Prohlížení stránek World Wide Webu Comment[de]=Im Internet surfen Comment[es]=Navegue por la web Comment[fa]=صفحات شبه جهانی اینترنت را مرور نمایید Comment[fi]=Selaa Internetin WWW-sivuja Comment[fr]=Navigue sur Internet Comment[hu]=A világháló böngészése Comment[it]=Esplora il web Comment[ja]=ウェブを閲覧します Comment[ko]=웹을 돌아 다닙니다 Comment[nb]=Surf på nettet Comment[nl]=Verken het internet Comment[nn]=Surf på nettet Comment[no]=Surf på nettet Comment[pl]=Przeglądanie stron WWW Comment[pt]=Navegue na Internet Comment[pt_BR]=Navegue na Internet Comment[sk]=Prehliadanie internetu Comment[sv]=Surfa på webben Exec=qvm-run -q --tray -a %VMNAME% '{command} %u' Categories=Network;WebBrowser; X-Qubes-VmName=%VMNAME% Icon=%VMDIR%/icon.png """ QUBESXML_R1 = """ """ class BackupCompatibilityTests(unittest.TestCase): def setUp(self): self.error_detected = Queue() self.verbose = False self.qc = QubesVmCollection() self.reload_qc() self.backupdir = os.path.join(os.environ["HOME"], "test-backup") os.mkdir(self.backupdir) def tearDown(self): vmlist = [vm for vm in self.qc.values() if vm.name.startswith( VM_PREFIX)] self.remove_vms(vmlist) shutil.rmtree(self.backupdir) def reload_qc(self): self.qc.lock_db_for_reading() self.qc.load() self.qc.unlock_db() def print_progress(self, progress): if self.verbose: print >> sys.stderr, "\r-> Backing up files: {0}%...".format(progress) def error_callback(self, message): self.error_detected.put(message) if self.verbose: print >> sys.stderr, "ERROR: {0}".format(message) def print_callback(self, msg): if self.verbose: print msg def fill_image(self, path, size=None, sparse=False): block_size = 4096 if self.verbose: print >>sys.stderr, "-> Filling %s" % path f = open(path, 'w+') if size is None: f.seek(0, 2) size = f.tell() f.seek(0) for block_num in xrange(size/block_size): f.write('a' * block_size) if sparse: f.seek(block_size, 1) f.close() def create_sparse(self, path, size): f = open(path, "w") f.truncate(size) f.close() def remove_vms(self, vms): self.qc.lock_db_for_writing() self.qc.load() for vm in vms: if isinstance(vm, str): vm = self.qc.get_vm_by_name(vm) else: vm = self.qc[vm.qid] if self.verbose: print >>sys.stderr, "-> Removing %s" % vm.name vm.remove_from_disk() self.qc.pop(vm.qid) self.qc.save() self.qc.unlock_db() def restore_backup(self, source=None, appvm=None, options=None): if source is None: backupfile = os.path.join(self.backupdir, sorted(os.listdir(self.backupdir))[-1]) else: backupfile = source try: backup_info = backup.backup_restore_prepare( backupfile, "qubes", print_callback=self.print_callback, appvm=appvm, options=options) except QubesException as e: self.fail( "QubesException during backup_restore_prepare: %s" % str(e)) if self.verbose: backup.backup_restore_print_summary(backup_info) try: backup.backup_restore_do( backup_info, print_callback=self.print_callback if self.verbose else None, error_callback=self.error_callback) except QubesException as e: self.fail("QubesException during backup_restore_do: %s" % str(e)) errors = [] while not self.error_detected.empty(): errors.append(self.error_detected.get()) self.assertTrue(len(errors) == 0, "Error(s) detected during backup_restore_do: %s" % '\n'.join(errors)) if os.path.isfile(backupfile): os.unlink(backupfile) def create_whitelisted_appmenus(self, filename): f = open(filename, "w") f.write("gnome-terminal.desktop\n") f.write("nautilus.desktop\n") f.write("firefox.desktop\n") f.write("mozilla-thunderbird.desktop\n") f.write("libreoffice-startcenter.desktop\n") f.close() def create_appmenus(self, dir, template, list): for name in list: f = open(os.path.join(dir, name + ".desktop"), "w") f.write(template.format(name=name, comment=name, command=name)) f.close() def create_private_img(self, filename): self.create_sparse(filename, 2*2**30) subprocess.check_call(["/usr/sbin/mkfs.ext4", "-q", "-F", filename]) def create_volatile_img(self, filename): self.create_sparse(filename, 11.5*2**30) sfdisk_input="0,1024,S\n,10240,L\n" p = subprocess.Popen(["/usr/sbin/sfdisk", "--no-reread", "-u", "M", filename], stdout=open("/dev/null","w"), stderr=subprocess.STDOUT, stdin=subprocess.PIPE) p.communicate(input=sfdisk_input) self.assertEqual(p.returncode, 0, "sfdisk failed with code %d" % p .returncode) # TODO: mkswap def fullpath(self, name): return os.path.join(self.backupdir, name) def create_v1_files(self, r2b2=False): appmenus_list = [ "firefox", "gnome-terminal", "evince", "evolution", "mozilla-thunderbird", "libreoffice-startcenter", "nautilus", "gedit", "gpk-update-viewer", "gpk-application" ] os.mkdir(self.fullpath("appvms")) os.mkdir(self.fullpath("servicevms")) os.mkdir(self.fullpath("vm-templates")) # normal AppVM os.mkdir(self.fullpath("appvms/test-work")) self.create_whitelisted_appmenus(self.fullpath( "appvms/test-work/whitelisted-appmenus.list")) os.symlink("/usr/share/qubes/icons/green.png", self.fullpath("appvms/test-work/icon.png")) self.create_private_img(self.fullpath("appvms/test-work/private.img")) # StandaloneVM os.mkdir(self.fullpath("appvms/test-standalonevm")) self.create_whitelisted_appmenus(self.fullpath( "appvms/test-standalonevm/whitelisted-appmenus.list")) os.symlink("/usr/share/qubes/icons/blue.png", self.fullpath("appvms/test-standalonevm/icon.png")) self.create_private_img(self.fullpath( "appvms/test-standalonevm/private.img")) self.create_sparse( self.fullpath("appvms/test-standalonevm/root.img"), 10*2**30) self.fill_image(self.fullpath("appvms/test-standalonevm/root.img"), 100*1024*1024, True) os.mkdir(self.fullpath("appvms/test-standalonevm/apps.templates")) self.create_appmenus(self.fullpath("appvms/test-standalonevm/apps" ".templates"), APPTEMPLATE_R2B2, appmenus_list) os.mkdir(self.fullpath("appvms/test-standalonevm/kernels")) for k_file in ["initramfs", "vmlinuz", "modules.img"]: self.fill_image(self.fullpath("appvms/test-standalonevm/kernels/" + k_file), 10*1024*1024) # VM based on custom template subprocess.check_call( ["/bin/cp", "-a", self.fullpath("appvms/test-work"), self.fullpath("appvms/test-custom-template-appvm")]) # HVM if r2b2: subprocess.check_call( ["/bin/cp", "-a", self.fullpath("appvms/test-standalonevm"), self.fullpath("appvms/test-testhvm")]) # ProxyVM os.mkdir(self.fullpath("servicevms/test-testproxy")) self.create_whitelisted_appmenus(self.fullpath( "servicevms/test-testproxy/whitelisted-appmenus.list")) self.create_private_img( self.fullpath("servicevms/test-testproxy/private.img")) # Custom template os.mkdir(self.fullpath("vm-templates/test-template-clone")) self.create_private_img( self.fullpath("vm-templates/test-template-clone/private.img")) self.create_sparse(self.fullpath( "vm-templates/test-template-clone/root-cow.img"), 10*2**30) self.create_sparse(self.fullpath( "vm-templates/test-template-clone/root.img"), 10*2**30) self.fill_image(self.fullpath( "vm-templates/test-template-clone/root.img"), 1*2**30, True) self.create_volatile_img(self.fullpath( "vm-templates/test-template-clone/volatile.img")) subprocess.check_call([ "/bin/tar", "cS", "-f", self.fullpath( "vm-templates/test-template-clone/clean-volatile.img.tar"), "-C", self.fullpath("vm-templates/test-template-clone"), "volatile.img"]) self.create_whitelisted_appmenus(self.fullpath( "vm-templates/test-template-clone/whitelisted-appmenus.list")) self.create_whitelisted_appmenus(self.fullpath( "vm-templates/test-template-clone/vm-whitelisted-appmenus.list")) if r2b2: self.create_whitelisted_appmenus(self.fullpath( "vm-templates/test-template-clone/netvm-whitelisted-appmenus" ".list")) os.symlink("/usr/share/qubes/icons/green.png", self.fullpath("vm-templates/test-template-clone/icon.png")) os.mkdir( self.fullpath("vm-templates/test-template-clone/apps.templates")) self.create_appmenus( self.fullpath("vm-templates/test-template-clone/apps.templates"), APPTEMPLATE_R2B2, appmenus_list) os.mkdir(self.fullpath("vm-templates/test-template-clone/apps")) self.create_appmenus( self.fullpath("vm-templates/test-template-clone/apps"), APPTEMPLATE_R2B2.replace("%VMNAME%", "test-template-clone") .replace("%VMDIR%", self.fullpath( "vm-templates/test-template-clone")), appmenus_list) def test_r2b2(self): self.create_v1_files(r2b2=True) f = open(self.fullpath("qubes.xml"), "w") f.write(QUBESXML_R2B2) f.close() self.restore_backup(self.backupdir, options={ 'use-default-template': True, }) self.reload_qc() self.assertIsNotNone(self.qc.get_vm_by_name("test-template-clone")) self.assertIsNotNone(self.qc.get_vm_by_name("test-testproxy")) self.assertIsNotNone(self.qc.get_vm_by_name("test-work")) self.assertIsNotNone(self.qc.get_vm_by_name("test-testhvm")) self.assertIsNotNone(self.qc.get_vm_by_name("test-standalonevm")) self.assertIsNotNone(self.qc.get_vm_by_name( "test-custom-template-appvm")) self.assertEqual(self.qc.get_vm_by_name("test-custom-template-appvm") .template, self.qc.get_vm_by_name("test-template-clone")) def test_r1(self): self.create_v1_files(r2b2=False) f = open(self.fullpath("qubes.xml"), "w") f.write(QUBESXML_R1) f.close() self.restore_backup(self.backupdir, options={ 'use-default-template': True, }) self.reload_qc() self.assertIsNotNone(self.qc.get_vm_by_name("test-template-clone")) self.assertIsNotNone(self.qc.get_vm_by_name("test-testproxy")) self.assertIsNotNone(self.qc.get_vm_by_name("test-work")) self.assertIsNotNone(self.qc.get_vm_by_name("test-standalonevm")) self.assertIsNotNone(self.qc.get_vm_by_name( "test-custom-template-appvm")) self.assertEqual(self.qc.get_vm_by_name("test-custom-template-appvm") .template, self.qc.get_vm_by_name("test-template-clone"))