From 0a35bd06aa12f9dc936ba5bdb5ec84693f671c68 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marek=20Marczykowski-G=C3=B3recki?= Date: Thu, 29 Sep 2016 01:52:31 +0200 Subject: [PATCH] backup: support relocating files to different storage pool To ease this, rework the restore workflow: first create QubesVM objects and all their files (as for a fresh VM), then override them with data from the backup, possibly redirecting some files to a new location. This allows generic code to create LVM volumes and then only restore their content. --- qubes/backup.py | 462 ++++++++++++++++++++++++++------------ qubes/tests/int/backup.py | 22 ++ 2 files changed, 346 insertions(+), 138 deletions(-) diff --git a/qubes/backup.py b/qubes/backup.py index fb413426..8bbb05e0 100644 --- a/qubes/backup.py +++ b/qubes/backup.py @@ -859,25 +859,48 @@ class ExtractWorker2(Process): def __init__(self, queue, base_dir, passphrase, encrypted, progress_callback, vmproc=None, compressed=False, crypto_algorithm=DEFAULT_CRYPTO_ALGORITHM, - verify_only=False): + verify_only=False, relocate=None): super(ExtractWorker2, self).__init__() + #: queue with files to extract self.queue = queue + #: paths on the queue are relative to this dir self.base_dir = base_dir + #: passphrase to decrypt/authenticate data self.passphrase = passphrase + #: extract those files/directories to alternative locations (truncate, + # but do not unlink, the target beforehand); if a specific file is in + # the map, redirect it accordingly, otherwise check if its whole + # directory is there + self.relocate = relocate + #: is the backup encrypted? self.encrypted = encrypted + #: is the backup compressed? self.compressed = compressed + #: what crypto algorithm is used for encryption? self.crypto_algorithm = crypto_algorithm + #: only verify integrity, don't extract anything self.verify_only = verify_only + #: progress self.blocks_backedup = 0 + #: inner tar layer extraction (subprocess.Popen instance) self.tar2_process = None + #: current inner tar archive name self.tar2_current_file = None + #: set size of this file when tar reports it on stderr (adjust LVM + # volume size) + self.adjust_output_size = None + #: decompressor subprocess.Popen instance self.decompressor_process = None + #: decryptor subprocess.Popen instance self.decryptor_process = None - + #: callback reporting progress to UI self.progress_callback = progress_callback - + #: process (subprocess.Popen instance) feeding the data into the + # extraction tool self.vmproc = vmproc + #: pipe to feed the data into tar (use a pipe instead of stdin, + # as stdin is used for tar control commands) self.restore_pipe = os.path.join(self.base_dir, "restore_pipe") self.log = logging.getLogger('qubes.backup.extract') @@ -929,6 +952,24 @@ class ExtractWorker2(Process): self.log.error("ERROR: " + unicode(e)) raise e, None, exc_traceback + def handle_dir_relocations(self, dirname): + ''' Relocate files in the given directory once it has been extracted + + :param dirname: directory path to handle (relative to backup root), + without trailing slash + ''' + + for old, new in self.relocate.items(): + if not old.startswith(dirname + '/'): + continue + # if the directory is relocated too (it most likely is), the file + # was extracted there + if dirname in self.relocate: + old = old.replace(dirname, self.relocate[dirname], 1) + subprocess.check_call( + ['dd', 'if='+old, 'of='+new, 'conv=sparse']) + os.unlink(old) + def __run__(self): self.log.debug("Started sending thread") self.log.debug("Moving to dir " + self.base_dir) @@ -954,17 +995,47 @@ class
ExtractWorker2(Process): "\n ".join(self.tar2_stderr))) else: # Finished extracting the tar file + self.collect_tar_output() self.tar2_process = None + # if that was whole-directory archive, handle + # relocated files now + inner_name = os.path.relpath( + os.path.splitext(self.tar2_current_file)[0]) + if os.path.basename(inner_name) == '.': + self.handle_dir_relocations( + os.path.dirname(inner_name)) self.tar2_current_file = None - tar2_cmdline = ['tar', - '-%sMkvf' % ("t" if self.verify_only else "x"), - self.restore_pipe, - os.path.relpath(filename.rstrip('.000'))] + inner_name = os.path.relpath(filename.rstrip('.000')) + redirect_stdout = None + if self.relocate and inner_name in self.relocate: + # TODO: add `dd conv=sparse` when removing tar layer + tar2_cmdline = ['tar', + '-%sMOf' % ("t" if self.verify_only else "x"), + self.restore_pipe, + inner_name] + output_file = self.relocate[inner_name] + redirect_stdout = open(output_file, 'w') + elif self.relocate and \ + os.path.dirname(inner_name) in self.relocate: + tar2_cmdline = ['tar', + '-%sMf' % ("t" if self.verify_only else "x"), + self.restore_pipe, + '-C', self.relocate[os.path.dirname(inner_name)], + # strip all directories - leave only final filename + '--strip-components', str(inner_name.count(os.sep)), + inner_name] + + else: + tar2_cmdline = ['tar', + '-%sMkf' % ("t" if self.verify_only else "x"), + self.restore_pipe, + inner_name] + self.log.debug("Running command " + unicode(tar2_cmdline)) self.tar2_process = subprocess.Popen(tar2_cmdline, - stdin=subprocess.PIPE, - stderr=subprocess.PIPE) + stdin=subprocess.PIPE, stderr=subprocess.PIPE, + stdout=redirect_stdout) fcntl.fcntl(self.tar2_process.stderr.fileno(), fcntl.F_SETFL, fcntl.fcntl(self.tar2_process.stderr.fileno(), fcntl.F_GETFL) | os.O_NONBLOCK) @@ -1066,6 +1137,13 @@ class ExtractWorker2(Process): else: # Finished extracting the tar file self.tar2_process = None + # if that was whole-directory archive, handle + # relocated files now + inner_name = os.path.relpath( + os.path.splitext(self.tar2_current_file)[0]) + if os.path.basename(inner_name) == '.': + self.handle_dir_relocations( + os.path.dirname(inner_name)) self.log.debug("Finished extracting thread") @@ -1074,12 +1152,12 @@ class ExtractWorker3(ExtractWorker2): def __init__(self, queue, base_dir, passphrase, encrypted, progress_callback, vmproc=None, compressed=False, crypto_algorithm=DEFAULT_CRYPTO_ALGORITHM, - compression_filter=None, verify_only=False): + compression_filter=None, verify_only=False, relocate=None): super(ExtractWorker3, self).__init__(queue, base_dir, passphrase, encrypted, progress_callback, vmproc, compressed, crypto_algorithm, - verify_only) + verify_only, relocate) self.compression_filter = compression_filter os.unlink(self.restore_pipe) @@ -1111,11 +1189,37 @@ class ExtractWorker3(ExtractWorker2): else: # Finished extracting the tar file self.tar2_process = None + # if that was whole-directory archive, handle + # relocated files now + inner_name = os.path.relpath( + os.path.splitext(self.tar2_current_file)[0]) + if os.path.basename(inner_name) == '.': + self.handle_dir_relocations( + os.path.dirname(inner_name)) self.tar2_current_file = None - tar2_cmdline = ['tar', - '-%sk' % ("t" if self.verify_only else "x"), - os.path.relpath(filename.rstrip('.000'))] + inner_name = os.path.relpath(filename.rstrip('.000')) + redirect_stdout = None + if self.relocate and inner_name in self.relocate: + # TODO: add dd conv=sparse when removing tar layer + tar2_cmdline = ['tar', + '-%sO' % ("t" if 
self.verify_only else "x"), + inner_name] + output_file = self.relocate[inner_name] + redirect_stdout = open(output_file, 'w') + elif self.relocate and \ + os.path.dirname(inner_name) in self.relocate: + tar2_cmdline = ['tar', + '-%s' % ("t" if self.verify_only else "x"), + '-C', self.relocate[os.path.dirname(inner_name)], + # strip all directories - leave only final filename + '--strip-components', str(inner_name.count(os.sep)), + inner_name] + else: + tar2_cmdline = ['tar', + '-%sk' % ("t" if self.verify_only else "x"), + inner_name] + if self.compressed: if self.compression_filter: tar2_cmdline.insert(-1, @@ -1140,12 +1244,14 @@ class ExtractWorker3(ExtractWorker2): self.tar2_process = subprocess.Popen( tar2_cmdline, stdin=self.decryptor_process.stdout, + stdout=redirect_stdout, stderr=subprocess.PIPE) input_pipe = self.decryptor_process.stdin else: self.tar2_process = subprocess.Popen( tar2_cmdline, stdin=subprocess.PIPE, + stdout=redirect_stdout, stderr=subprocess.PIPE) input_pipe = self.tar2_process.stdin @@ -1215,6 +1321,13 @@ class ExtractWorker3(ExtractWorker2): else: # Finished extracting the tar file self.tar2_process = None + # if that was whole-directory archive, handle + # relocated files now + inner_name = os.path.relpath( + os.path.splitext(self.tar2_current_file)[0]) + if os.path.basename(inner_name) == '.': + self.handle_dir_relocations( + os.path.dirname(inner_name)) self.log.debug("Finished extracting thread") @@ -1260,6 +1373,8 @@ class BackupRestoreOptions(object): self.rename_conflicting = True #: list of VM names to exclude self.exclude = [] + #: restore VMs into selected storage pool + self.override_pool = None class BackupRestore(object): @@ -1304,6 +1419,7 @@ class BackupRestore(object): self.netvm = None self.name = vm.name self.orig_template = None + self.restored_vm = None @property def good_to_go(self): @@ -1576,7 +1692,7 @@ class BackupRestore(object): ) return header_data - def _start_inner_extraction_worker(self, queue): + def _start_inner_extraction_worker(self, queue, relocate): """Start a worker process, extracting inner layer of bacup archive, extract them to :py:attr:`tmpdir`. End the data by pushing QUEUE_FINISHED or QUEUE_ERROR to the queue. @@ -1596,7 +1712,10 @@ class BackupRestore(object): 'crypto_algorithm': self.header_data.crypto_algorithm, 'verify_only': self.options.verify_only, 'progress_callback': self.progress_callback, + 'relocate': relocate, } + self.log.debug('Starting extraction worker in {}, file relocation ' + 'map: {!r}'.format(self.tmpdir, relocate)) format_version = self.header_data.version if format_version == 2: extract_proc = ExtractWorker2(**extractor_params) @@ -1626,7 +1745,7 @@ class BackupRestore(object): queue.put("qubes.xml.000") queue.put(QUEUE_FINISHED) - extract_proc = self._start_inner_extraction_worker(queue) + extract_proc = self._start_inner_extraction_worker(queue, None) extract_proc.join() if extract_proc.exitcode != 0: raise qubes.exc.QubesException( @@ -1643,7 +1762,7 @@ class BackupRestore(object): os.unlink(os.path.join(self.tmpdir, 'qubes.xml')) return backup_app - def _restore_vm_dirs(self, vms_dirs, vms_size): + def _restore_vm_dirs(self, vms_dirs, vms_size, relocate): # Currently each VM consists of at most 7 archives (count # file_to_backup calls in backup_prepare()), but add some safety # margin for further extensions. 
Each archive is divided into 100MB @@ -1658,12 +1777,14 @@ class BackupRestore(object): # retrieve backup from the backup stream (either VM, or dom0 file) (retrieve_proc, filelist_pipe, error_pipe) = \ - self._start_retrieval_process(vms_dirs, limit_count, vms_size) + self._start_retrieval_process( + vms_dirs, limit_count, vms_size) to_extract = Queue() # extract data retrieved by retrieve_proc - extract_proc = self._start_inner_extraction_worker(to_extract) + extract_proc = self._start_inner_extraction_worker( + to_extract, relocate) try: filename = None @@ -1721,7 +1842,7 @@ class BackupRestore(object): if retrieve_proc.wait() != 0: raise qubes.exc.QubesException( - "unable to read the qubes backup file {0} ({1}): {2}" + "unable to read the qubes backup file {0}: {1}" .format(self.backup_location, error_pipe.read( MAX_STDERR_BYTES))) # wait for other processes (if any) @@ -2074,25 +2195,57 @@ class BackupRestore(object): "*** Error while copying file {0} to {1}".format(backup_src_dir, dst_dir)) + @staticmethod + def _templates_first(vms): + def key_function(instance): + if isinstance(instance, qubes.vm.BaseVM): + return isinstance(instance, qubes.vm.templatevm.TemplateVM) + elif hasattr(instance, 'vm'): + return key_function(instance.vm) + else: + return 0 + return sorted(vms, + key=key_function, + reverse=True) + def restore_do(self, restore_info): + ''' Restore VMs based on restore_info + + High level workflow: + 1. Create VM objects in the host collection (qubes.xml) + 2. Create them on disk (vm.create_on_disk) + 3. Restore VM data, overriding/converting VM files + 4. Apply possible fixups and save qubes.xml + + :param restore_info: mapping of VM name to VMToRestore + :return: + ''' + # FIXME handle locking + self._restore_vms_metadata(restore_info) + # Perform VM restoration in backup order vms_dirs = [] + relocate = {} vms_size = 0 - vms = {} - for vm_info in restore_info.values(): - assert isinstance(vm_info, self.VMToRestore) - if not vm_info.vm: - continue - if not vm_info.good_to_go: - continue - vm = vm_info.vm - if self.header_data.version >= 2: - if vm.features['backup-size']: - vms_size += int(vm.features['backup-size']) - vms_dirs.append(vm.features['backup-path']) - vms[vm.name] = vm + for vm_info in self._templates_first(restore_info.values()): + vm = vm_info.restored_vm + if vm: + vms_size += int(vm_info.size) + vms_dirs.append(vm_info.subdir) + relocate[vm_info.subdir.rstrip('/')] = vm.dir_path + for name, volume in vm.volumes.items(): + if not volume.save_on_stop: + continue + export_path = vm.storage.export(name) + backup_path = os.path.join( + vm_info.vm.dir_path, name + '.img') + if backup_path != export_path: + relocate[ + os.path.join(vm_info.subdir, name + '.img')] = \ + export_path if self.header_data.version >= 2: if 'dom0' in restore_info.keys() and \ @@ -2101,7 +2254,8 @@ class BackupRestore(object): vms_size += restore_info['dom0'].size try: - self._restore_vm_dirs(vms_dirs=vms_dirs, vms_size=vms_size) + self._restore_vm_dirs(vms_dirs=vms_dirs, vms_size=vms_size, + relocate=relocate) except qubes.exc.QubesException: if self.options.verify_only: raise @@ -2111,6 +2265,22 @@ class BackupRestore(object): "continuing anyway to restore at least some " "VMs") else: + for vm_info in self._templates_first(restore_info.values()): + vm = vm_info.restored_vm + if vm: + try: + self._restore_vm_dir_v1(vm_info.vm.dir_path, + os.path.dirname(vm.dir_path)) + except qubes.exc.QubesException as e: + if self.options.verify_only: + raise + else: + self.log.error( + "Failed to restore VM '{}': {}".format( + vm.name, str(e))) +
vm.remove_from_disk() + del self.app.domains[vm] + if self.options.verify_only: self.log.warning( "Backup verification not supported for this backup format.") @@ -2119,117 +2289,24 @@ class BackupRestore(object): shutil.rmtree(self.tmpdir) return - # First load templates, then other VMs - for vm in sorted(vms.values(), - key=lambda x: isinstance(x, qubes.vm.templatevm.TemplateVM), - reverse=True): - if self.canceled: - # only break the loop to save qubes.xml - # with already restored VMs - break - self.log.info("-> Restoring {0}...".format(vm.name)) - retcode = subprocess.call( - ["mkdir", "-p", os.path.dirname(vm.dir_path)]) - if retcode != 0: - self.log.error("*** Cannot create directory: {0}?!".format( - vm.dir_path)) - self.log.warning("Skipping VM {}...".format(vm.name)) + for vm_info in self._templates_first(restore_info.values()): + if not vm_info.restored_vm: continue - - kwargs = {} - if hasattr(vm, 'template'): - template = restore_info[vm.name].template - # handle potentially renamed template - if template in restore_info \ - and restore_info[template].good_to_go: - template = restore_info[template].name - kwargs['template'] = template - - new_vm = None - vm_name = restore_info[vm.name].name - try: - # first only minimal set, later clone_properties - # will be called - new_vm = self.app.add_new_vm( - vm.__class__, - name=vm_name, - label=vm.label, - installed_by_rpm=False, - **kwargs) - if os.path.exists(new_vm.dir_path): - move_to_path = tempfile.mkdtemp('', os.path.basename( - new_vm.dir_path), os.path.dirname(new_vm.dir_path)) - try: - os.rename(new_vm.dir_path, move_to_path) - self.log.warning( - "*** Directory {} already exists! It has " - "been moved to {}".format(new_vm.dir_path, - move_to_path)) - except OSError: - self.log.error( - "*** Directory {} already exists and " - "cannot be moved!".format(new_vm.dir_path)) - self.log.warning("Skipping VM {}...".format( - vm.name)) - continue - - if self.header_data.version == 1: - self._restore_vm_dir_v1(vm.dir_path, - os.path.dirname(new_vm.dir_path)) - else: - shutil.move(os.path.join(self.tmpdir, - vm.features['backup-path']), - new_vm.dir_path) - - new_vm.storage.verify() - except Exception as err: - self.log.error("ERROR: {0}".format(err)) - self.log.warning("*** Skipping VM: {0}".format(vm.name)) - if new_vm: - del self.app.domains[new_vm.qid] - continue - - # remove no longer needed backup metadata - if 'backup-content' in vm.features: - del vm.features['backup-content'] - del vm.features['backup-size'] - del vm.features['backup-path'] - try: - # exclude VM references - handled manually according to - # restore options - proplist = [prop for prop in new_vm.property_list() - if prop.clone and prop.__name__ not in - ['template', 'netvm', 'dispvm_netvm']] - new_vm.clone_properties(vm, proplist=proplist) - except Exception as err: - self.log.error("ERROR: {0}".format(err)) - self.log.warning("*** Some VM property will not be " - "restored") - - try: - new_vm.fire_event('domain-restore') + vm_info.restored_vm.fire_event('domain-restore') except Exception as err: self.log.error("ERROR during appmenu restore: " - "{0}".format(err)) + "{0}".format(err)) self.log.warning( - "*** VM '{0}' will not have appmenus".format(vm.name)) + "*** VM '{0}' will not have appmenus".format(vm_info.name)) - # Set network dependencies - only non-default netvm setting - for vm in vms.values(): - vm_info = restore_info[vm.name] - vm_name = vm_info.name try: - host_vm = self.app.domains[vm_name] - except KeyError: - # Failed/skipped VM - continue - - 
if not vm.property_is_default('netvm'): - if vm_info.netvm in restore_info: - host_vm.netvm = restore_info[vm_info.netvm].name - else: - host_vm.netvm = vm_info.netvm + vm_info.restored_vm.storage.verify() + except Exception as err: + self.log.error("ERROR: {0}".format(err)) + if vm_info.restored_vm: + vm_info.restored_vm.remove_from_disk() + del self.app.domains[vm_info.restored_vm] self.app.save() @@ -2279,4 +2356,113 @@ class BackupRestore(object): self.log.info("-> Done. Please install updates for all the restored " "templates.") + def _restore_vms_metadata(self, restore_info): + vms = {} + for vm_info in restore_info.values(): + assert isinstance(vm_info, self.VMToRestore) + if not vm_info.vm: + continue + if not vm_info.good_to_go: + continue + vm = vm_info.vm + vms[vm.name] = vm + + # First load templates, then other VMs + for vm in self._templates_first(vms.values()): + if self.canceled: + # only break the loop to save qubes.xml + # with already restored VMs + break + self.log.info("-> Restoring {0}...".format(vm.name)) + kwargs = {} + if hasattr(vm, 'template'): + template = restore_info[vm.name].template + # handle potentially renamed template + if template in restore_info \ + and restore_info[template].good_to_go: + template = restore_info[template].name + kwargs['template'] = template + + new_vm = None + vm_name = restore_info[vm.name].name + + try: + # first only minimal set, later clone_properties + # will be called + new_vm = self.app.add_new_vm( + vm.__class__, + name=vm_name, + label=vm.label, + installed_by_rpm=False, + **kwargs) + if os.path.exists(new_vm.dir_path): + move_to_path = tempfile.mkdtemp('', os.path.basename( + new_vm.dir_path), os.path.dirname(new_vm.dir_path)) + try: + os.rename(new_vm.dir_path, move_to_path) + self.log.warning( + "*** Directory {} already exists! 
It has " + "been moved to {}".format(new_vm.dir_path, + move_to_path)) + except OSError: + self.log.error( + "*** Directory {} already exists and " + "cannot be moved!".format(new_vm.dir_path)) + self.log.warning("Skipping VM {}...".format( + vm.name)) + continue + except Exception as err: + self.log.error("ERROR: {0}".format(err)) + self.log.warning("*** Skipping VM: {0}".format(vm.name)) + if new_vm: + del self.app.domains[new_vm.qid] + continue + + # remove no longer needed backup metadata + if 'backup-content' in vm.features: + del vm.features['backup-content'] + del vm.features['backup-size'] + del vm.features['backup-path'] + try: + # exclude VM references - handled manually according to + # restore options + proplist = [prop for prop in new_vm.property_list() + if prop.clone and prop.__name__ not in + ['template', 'netvm', 'dispvm_netvm']] + new_vm.clone_properties(vm, proplist=proplist) + except Exception as err: + self.log.error("ERROR: {0}".format(err)) + self.log.warning("*** Some VM property will not be " + "restored") + + if not self.options.verify_only: + try: + # have it here, to (maybe) patch storage config before + # creating child VMs (template first) + # TODO: adjust volumes config - especially size + new_vm.create_on_disk(pool=self.options.override_pool) + except qubes.exc.QubesException as e: + self.log.warning("Failed to create VM {}: {}".format( + vm.name, str(e))) + del self.app.domains[new_vm] + continue + + restore_info[vm.name].restored_vm = new_vm + + # Set network dependencies - only non-default netvm setting + for vm in vms.values(): + vm_info = restore_info[vm.name] + vm_name = vm_info.name + try: + host_vm = self.app.domains[vm_name] + except KeyError: + # Failed/skipped VM + continue + + if not vm.property_is_default('netvm'): + if vm_info.netvm in restore_info: + host_vm.netvm = restore_info[vm_info.netvm].name + else: + host_vm.netvm = vm_info.netvm + # vim:sw=4:et: diff --git a/qubes/tests/int/backup.py b/qubes/tests/int/backup.py index 18da633e..0e04d034 100644 --- a/qubes/tests/int/backup.py +++ b/qubes/tests/int/backup.py @@ -427,6 +427,28 @@ class TC_00_Backup(BackupTestsMixin, qubes.tests.QubesTestCase): self.assertCorrectlyRestored(vms, orig_hashes) self.remove_vms(reversed(vms)) + @qubes.tests.storage_lvm.skipUnlessLvmPoolExists + def test_301_restore_to_lvm(self): + volume_group, thin_pool = \ + qubes.tests.storage_lvm.DEFAULT_LVM_POOL.split('/', 1) + self.pool = self._find_pool(volume_group, thin_pool) + if not self.pool: + self.pool = self.app.add_pool( + **qubes.tests.storage_lvm.POOL_CONF) + self.created_pool = True + vms = self.create_backup_vms() + orig_hashes = self.vm_checksum(vms) + self.make_backup(vms) + self.remove_vms(reversed(vms)) + self.restore_backup(options={'override_pool': self.pool.name}) + self.assertCorrectlyRestored(vms, orig_hashes) + for vm in vms: + vm = self.app.domains[vm.name] + for volume in vm.volumes.values(): + if volume.save_on_stop: + self.assertEqual(volume.pool, self.pool.name) + self.remove_vms(reversed(vms)) + class TC_10_BackupVMMixin(BackupTestsMixin): def setUp(self):