backup: support relocating files to different storage pool

To ease all this, rework restore workflow: first create QubesVM objects,
and all their files (as for fresh VM), then override them with data
from backup - possibly redirecting some files to new location. This
allows generic code to create LVM volumes and then only restore its
content.
This commit is contained in:
Marek Marczykowski-Górecki 2016-09-29 01:52:31 +02:00
parent 4d45dd5549
commit 0a35bd06aa
No known key found for this signature in database
GPG Key ID: 063938BA42CFA724
2 changed files with 346 additions and 138 deletions

View File

@ -859,25 +859,48 @@ class ExtractWorker2(Process):
def __init__(self, queue, base_dir, passphrase, encrypted,
progress_callback, vmproc=None,
compressed=False, crypto_algorithm=DEFAULT_CRYPTO_ALGORITHM,
verify_only=False):
verify_only=False, relocate=None):
super(ExtractWorker2, self).__init__()
#: queue with files to extract
self.queue = queue
#: paths on the queue are relative to this dir
self.base_dir = base_dir
#: passphrase to decrypt/authenticate data
self.passphrase = passphrase
#: extract those files/directories to alternative locations (truncate,
# but not unlink target beforehand); if specific file is in the map,
# redirect it accordingly, otherwise check if the whole directory is
# there
self.relocate = relocate
#: is the backup encrypted?
self.encrypted = encrypted
#: is the backup compressed?
self.compressed = compressed
#: what crypto algorithm is used for encryption?
self.crypto_algorithm = crypto_algorithm
#: only verify integrity, don't extract anything
self.verify_only = verify_only
#: progress
self.blocks_backedup = 0
#: inner tar layer extraction (subprocess.Popen instance)
self.tar2_process = None
#: current inner tar archive name
self.tar2_current_file = None
#: set size of this file when tar report it on stderr (adjust LVM
# volume size)
self.adjust_output_size = None
#: decompressor subprocess.Popen instance
self.decompressor_process = None
#: decryptor subprocess.Popen instance
self.decryptor_process = None
#: callback reporting progress to UI
self.progress_callback = progress_callback
#: process (subprocess.Popen instance) feeding the data into
# extraction tool
self.vmproc = vmproc
#: pipe to feed the data into tar (use pipe instead of stdin,
# as stdin is used for tar control commands)
self.restore_pipe = os.path.join(self.base_dir, "restore_pipe")
self.log = logging.getLogger('qubes.backup.extract')
@ -929,6 +952,24 @@ class ExtractWorker2(Process):
self.log.error("ERROR: " + unicode(e))
raise e, None, exc_traceback
def handle_dir_relocations(self, dirname):
''' Relocate files in given director when it's already extracted
:param dirname: directory path to handle (relative to backup root),
without trailing slash
'''
for old, new in self.relocate:
if not old.startswith(dirname + '/'):
continue
# if directory is relocated too (most likely is), the file
# is extracted there
if dirname in self.relocate:
old = old.replace(dirname, self.relocate[dirname], 1)
subprocess.check_call(
['dd', 'if='+old, 'of='+new, 'conv=sparse'])
os.unlink(old)
def __run__(self):
self.log.debug("Started sending thread")
self.log.debug("Moving to dir " + self.base_dir)
@ -954,17 +995,47 @@ class ExtractWorker2(Process):
"\n ".join(self.tar2_stderr)))
else:
# Finished extracting the tar file
self.collect_tar_output()
self.tar2_process = None
# if that was whole-directory archive, handle
# relocated files now
inner_name = os.path.relpath(
os.path.splitext(self.tar2_current_file)[0])
if os.path.basename(inner_name) == '.':
self.handle_dir_relocations(
os.path.dirname(inner_name))
self.tar2_current_file = None
tar2_cmdline = ['tar',
'-%sMkvf' % ("t" if self.verify_only else "x"),
self.restore_pipe,
os.path.relpath(filename.rstrip('.000'))]
inner_name = os.path.relpath(filename.rstrip('.000'))
redirect_stdout = None
if self.relocate and inner_name in self.relocate:
# TODO: add `dd conv=sparse` when removing tar layer
tar2_cmdline = ['tar',
'-%sMOf' % ("t" if self.verify_only else "x"),
self.restore_pipe,
inner_name]
output_file = self.relocate[inner_name]
redirect_stdout = open(output_file, 'w')
elif self.relocate and \
os.path.dirname(inner_name) in self.relocate:
tar2_cmdline = ['tar',
'-%sMf' % ("t" if self.verify_only else "x"),
self.restore_pipe,
'-C', self.relocate[os.path.dirname(inner_name)],
# strip all directories - leave only final filename
'--strip-components', str(inner_name.count(os.sep)),
inner_name]
else:
tar2_cmdline = ['tar',
'-%sMkf' % ("t" if self.verify_only else "x"),
self.restore_pipe,
inner_name]
self.log.debug("Running command " + unicode(tar2_cmdline))
self.tar2_process = subprocess.Popen(tar2_cmdline,
stdin=subprocess.PIPE,
stderr=subprocess.PIPE)
stdin=subprocess.PIPE, stderr=subprocess.PIPE,
stdout=redirect_stdout)
fcntl.fcntl(self.tar2_process.stderr.fileno(), fcntl.F_SETFL,
fcntl.fcntl(self.tar2_process.stderr.fileno(),
fcntl.F_GETFL) | os.O_NONBLOCK)
@ -1066,6 +1137,13 @@ class ExtractWorker2(Process):
else:
# Finished extracting the tar file
self.tar2_process = None
# if that was whole-directory archive, handle
# relocated files now
inner_name = os.path.relpath(
os.path.splitext(self.tar2_current_file)[0])
if os.path.basename(inner_name) == '.':
self.handle_dir_relocations(
os.path.dirname(inner_name))
self.log.debug("Finished extracting thread")
@ -1074,12 +1152,12 @@ class ExtractWorker3(ExtractWorker2):
def __init__(self, queue, base_dir, passphrase, encrypted,
progress_callback, vmproc=None,
compressed=False, crypto_algorithm=DEFAULT_CRYPTO_ALGORITHM,
compression_filter=None, verify_only=False):
compression_filter=None, verify_only=False, relocate=None):
super(ExtractWorker3, self).__init__(queue, base_dir, passphrase,
encrypted,
progress_callback, vmproc,
compressed, crypto_algorithm,
verify_only)
verify_only, relocate)
self.compression_filter = compression_filter
os.unlink(self.restore_pipe)
@ -1111,11 +1189,37 @@ class ExtractWorker3(ExtractWorker2):
else:
# Finished extracting the tar file
self.tar2_process = None
# if that was whole-directory archive, handle
# relocated files now
inner_name = os.path.relpath(
os.path.splitext(self.tar2_current_file)[0])
if os.path.basename(inner_name) == '.':
self.handle_dir_relocations(
os.path.dirname(inner_name))
self.tar2_current_file = None
tar2_cmdline = ['tar',
'-%sk' % ("t" if self.verify_only else "x"),
os.path.relpath(filename.rstrip('.000'))]
inner_name = os.path.relpath(filename.rstrip('.000'))
redirect_stdout = None
if self.relocate and inner_name in self.relocate:
# TODO: add dd conv=sparse when removing tar layer
tar2_cmdline = ['tar',
'-%sO' % ("t" if self.verify_only else "x"),
inner_name]
output_file = self.relocate[inner_name]
redirect_stdout = open(output_file, 'w')
elif self.relocate and \
os.path.dirname(inner_name) in self.relocate:
tar2_cmdline = ['tar',
'-%s' % ("t" if self.verify_only else "x"),
'-C', self.relocate[os.path.dirname(inner_name)],
# strip all directories - leave only final filename
'--strip-components', str(inner_name.count(os.sep)),
inner_name]
else:
tar2_cmdline = ['tar',
'-%sk' % ("t" if self.verify_only else "x"),
inner_name]
if self.compressed:
if self.compression_filter:
tar2_cmdline.insert(-1,
@ -1140,12 +1244,14 @@ class ExtractWorker3(ExtractWorker2):
self.tar2_process = subprocess.Popen(
tar2_cmdline,
stdin=self.decryptor_process.stdout,
stdout=redirect_stdout,
stderr=subprocess.PIPE)
input_pipe = self.decryptor_process.stdin
else:
self.tar2_process = subprocess.Popen(
tar2_cmdline,
stdin=subprocess.PIPE,
stdout=redirect_stdout,
stderr=subprocess.PIPE)
input_pipe = self.tar2_process.stdin
@ -1215,6 +1321,13 @@ class ExtractWorker3(ExtractWorker2):
else:
# Finished extracting the tar file
self.tar2_process = None
# if that was whole-directory archive, handle
# relocated files now
inner_name = os.path.relpath(
os.path.splitext(self.tar2_current_file)[0])
if os.path.basename(inner_name) == '.':
self.handle_dir_relocations(
os.path.dirname(inner_name))
self.log.debug("Finished extracting thread")
@ -1260,6 +1373,8 @@ class BackupRestoreOptions(object):
self.rename_conflicting = True
#: list of VM names to exclude
self.exclude = []
#: restore VMs into selected storage pool
self.override_pool = None
class BackupRestore(object):
@ -1304,6 +1419,7 @@ class BackupRestore(object):
self.netvm = None
self.name = vm.name
self.orig_template = None
self.restored_vm = None
@property
def good_to_go(self):
@ -1576,7 +1692,7 @@ class BackupRestore(object):
)
return header_data
def _start_inner_extraction_worker(self, queue):
def _start_inner_extraction_worker(self, queue, relocate):
"""Start a worker process, extracting inner layer of bacup archive,
extract them to :py:attr:`tmpdir`.
End the data by pushing QUEUE_FINISHED or QUEUE_ERROR to the queue.
@ -1596,7 +1712,10 @@ class BackupRestore(object):
'crypto_algorithm': self.header_data.crypto_algorithm,
'verify_only': self.options.verify_only,
'progress_callback': self.progress_callback,
'relocate': relocate,
}
self.log.debug('Starting extraction worker in {}, file relocation '
'map: {!r}'.format(self.tmpdir, relocate))
format_version = self.header_data.version
if format_version == 2:
extract_proc = ExtractWorker2(**extractor_params)
@ -1626,7 +1745,7 @@ class BackupRestore(object):
queue.put("qubes.xml.000")
queue.put(QUEUE_FINISHED)
extract_proc = self._start_inner_extraction_worker(queue)
extract_proc = self._start_inner_extraction_worker(queue, None)
extract_proc.join()
if extract_proc.exitcode != 0:
raise qubes.exc.QubesException(
@ -1643,7 +1762,7 @@ class BackupRestore(object):
os.unlink(os.path.join(self.tmpdir, 'qubes.xml'))
return backup_app
def _restore_vm_dirs(self, vms_dirs, vms_size):
def _restore_vm_dirs(self, vms_dirs, vms_size, relocate):
# Currently each VM consists of at most 7 archives (count
# file_to_backup calls in backup_prepare()), but add some safety
# margin for further extensions. Each archive is divided into 100MB
@ -1658,12 +1777,14 @@ class BackupRestore(object):
# retrieve backup from the backup stream (either VM, or dom0 file)
(retrieve_proc, filelist_pipe, error_pipe) = \
self._start_retrieval_process(vms_dirs, limit_count, vms_size)
self._start_retrieval_process(
vms_dirs, limit_count, vms_size)
to_extract = Queue()
# extract data retrieved by retrieve_proc
extract_proc = self._start_inner_extraction_worker(to_extract)
extract_proc = self._start_inner_extraction_worker(
to_extract, relocate)
try:
filename = None
@ -1721,7 +1842,7 @@ class BackupRestore(object):
if retrieve_proc.wait() != 0:
raise qubes.exc.QubesException(
"unable to read the qubes backup file {0} ({1}): {2}"
"unable to read the qubes backup file {0}: {1}"
.format(self.backup_location, error_pipe.read(
MAX_STDERR_BYTES)))
# wait for other processes (if any)
@ -2074,25 +2195,57 @@ class BackupRestore(object):
"*** Error while copying file {0} to {1}".format(backup_src_dir,
dst_dir))
@staticmethod
def _templates_first(vms):
def key_function(instance):
if isinstance(instance, qubes.vm.BaseVM):
return isinstance(instance, qubes.vm.templatevm.TemplateVM)
elif hasattr(instance, 'vm'):
return key_function(instance.vm)
else:
return 0
return sorted(vms,
key=key_function,
reverse=True)
def restore_do(self, restore_info):
'''
High level workflow:
1. Create VMs object in host collection (qubes.xml)
2. Create them on disk (vm.create_on_disk)
3. Restore VM data, overriding/converting VM files
4. Apply possible fixups and save qubes.xml
:param restore_info:
:return:
'''
# FIXME handle locking
self._restore_vms_metadata(restore_info)
# Perform VM restoration in backup order
vms_dirs = []
relocate = {}
vms_size = 0
vms = {}
for vm_info in restore_info.values():
assert isinstance(vm_info, self.VMToRestore)
if not vm_info.vm:
continue
if not vm_info.good_to_go:
continue
vm = vm_info.vm
if self.header_data.version >= 2:
if vm.features['backup-size']:
vms_size += int(vm.features['backup-size'])
vms_dirs.append(vm.features['backup-path'])
vms[vm.name] = vm
for vm_info in self._templates_first(restore_info.values()):
vm = vm_info.restored_vm
if vm:
vms_size += int(vm_info.size)
vms_dirs.append(vm_info.subdir)
relocate[vm_info.subdir.rstrip('/')] = vm.dir_path
for name, volume in vm.volumes.items():
if not volume.save_on_stop:
continue
export_path = vm.storage.export(name)
backup_path = os.path.join(
vm_info.vm.dir_path, name + '.img')
if backup_path != export_path:
relocate[
os.path.join(vm_info.subdir, name + '.img')] = \
export_path
if self.header_data.version >= 2:
if 'dom0' in restore_info.keys() and \
@ -2101,7 +2254,8 @@ class BackupRestore(object):
vms_size += restore_info['dom0'].size
try:
self._restore_vm_dirs(vms_dirs=vms_dirs, vms_size=vms_size)
self._restore_vm_dirs(vms_dirs=vms_dirs, vms_size=vms_size,
relocate=relocate)
except qubes.exc.QubesException:
if self.options.verify_only:
raise
@ -2111,6 +2265,22 @@ class BackupRestore(object):
"continuing anyway to restore at least some "
"VMs")
else:
for vm_info in self._templates_first(restore_info.values()):
vm = vm_info.restored_vm
if vm:
try:
self._restore_vm_dir_v1(vm_info.vm.dir_path,
os.path.dirname(vm.dir_path))
except qubes.exc.QubesException as e:
if self.options.verify_only:
raise
else:
self.log.error(
"Failed to restore VM '{}': {}".format(
vm.name, str(e)))
vm.remove_from_disk()
del self.app.domains[vm]
if self.options.verify_only:
self.log.warning(
"Backup verification not supported for this backup format.")
@ -2119,117 +2289,24 @@ class BackupRestore(object):
shutil.rmtree(self.tmpdir)
return
# First load templates, then other VMs
for vm in sorted(vms.values(),
key=lambda x: isinstance(x, qubes.vm.templatevm.TemplateVM),
reverse=True):
if self.canceled:
# only break the loop to save qubes.xml
# with already restored VMs
break
self.log.info("-> Restoring {0}...".format(vm.name))
retcode = subprocess.call(
["mkdir", "-p", os.path.dirname(vm.dir_path)])
if retcode != 0:
self.log.error("*** Cannot create directory: {0}?!".format(
vm.dir_path))
self.log.warning("Skipping VM {}...".format(vm.name))
for vm_info in self._templates_first(restore_info.values()):
if not vm_info.restored_vm:
continue
kwargs = {}
if hasattr(vm, 'template'):
template = restore_info[vm.name].template
# handle potentially renamed template
if template in restore_info \
and restore_info[template].good_to_go:
template = restore_info[template].name
kwargs['template'] = template
new_vm = None
vm_name = restore_info[vm.name].name
try:
# first only minimal set, later clone_properties
# will be called
new_vm = self.app.add_new_vm(
vm.__class__,
name=vm_name,
label=vm.label,
installed_by_rpm=False,
**kwargs)
if os.path.exists(new_vm.dir_path):
move_to_path = tempfile.mkdtemp('', os.path.basename(
new_vm.dir_path), os.path.dirname(new_vm.dir_path))
try:
os.rename(new_vm.dir_path, move_to_path)
self.log.warning(
"*** Directory {} already exists! It has "
"been moved to {}".format(new_vm.dir_path,
move_to_path))
except OSError:
self.log.error(
"*** Directory {} already exists and "
"cannot be moved!".format(new_vm.dir_path))
self.log.warning("Skipping VM {}...".format(
vm.name))
continue
if self.header_data.version == 1:
self._restore_vm_dir_v1(vm.dir_path,
os.path.dirname(new_vm.dir_path))
else:
shutil.move(os.path.join(self.tmpdir,
vm.features['backup-path']),
new_vm.dir_path)
new_vm.storage.verify()
except Exception as err:
self.log.error("ERROR: {0}".format(err))
self.log.warning("*** Skipping VM: {0}".format(vm.name))
if new_vm:
del self.app.domains[new_vm.qid]
continue
# remove no longer needed backup metadata
if 'backup-content' in vm.features:
del vm.features['backup-content']
del vm.features['backup-size']
del vm.features['backup-path']
try:
# exclude VM references - handled manually according to
# restore options
proplist = [prop for prop in new_vm.property_list()
if prop.clone and prop.__name__ not in
['template', 'netvm', 'dispvm_netvm']]
new_vm.clone_properties(vm, proplist=proplist)
except Exception as err:
self.log.error("ERROR: {0}".format(err))
self.log.warning("*** Some VM property will not be "
"restored")
try:
new_vm.fire_event('domain-restore')
vm_info.restored_vm.fire_event('domain-restore')
except Exception as err:
self.log.error("ERROR during appmenu restore: "
"{0}".format(err))
"{0}".format(err))
self.log.warning(
"*** VM '{0}' will not have appmenus".format(vm.name))
"*** VM '{0}' will not have appmenus".format(vm_info.name))
# Set network dependencies - only non-default netvm setting
for vm in vms.values():
vm_info = restore_info[vm.name]
vm_name = vm_info.name
try:
host_vm = self.app.domains[vm_name]
except KeyError:
# Failed/skipped VM
continue
if not vm.property_is_default('netvm'):
if vm_info.netvm in restore_info:
host_vm.netvm = restore_info[vm_info.netvm].name
else:
host_vm.netvm = vm_info.netvm
vm_info.restored_vm.storage.verify()
except Exception as err:
self.log.error("ERROR: {0}".format(err))
if vm_info.restored_vm:
vm_info.restored_vm.remove_from_disk()
del self.app.domains[vm_info.restored_vm]
self.app.save()
@ -2279,4 +2356,113 @@ class BackupRestore(object):
self.log.info("-> Done. Please install updates for all the restored "
"templates.")
def _restore_vms_metadata(self, restore_info):
vms = {}
for vm_info in restore_info.values():
assert isinstance(vm_info, self.VMToRestore)
if not vm_info.vm:
continue
if not vm_info.good_to_go:
continue
vm = vm_info.vm
vms[vm.name] = vm
# First load templates, then other VMs
for vm in self._templates_first(vms.values()):
if self.canceled:
# only break the loop to save qubes.xml
# with already restored VMs
break
self.log.info("-> Restoring {0}...".format(vm.name))
kwargs = {}
if hasattr(vm, 'template'):
template = restore_info[vm.name].template
# handle potentially renamed template
if template in restore_info \
and restore_info[template].good_to_go:
template = restore_info[template].name
kwargs['template'] = template
new_vm = None
vm_name = restore_info[vm.name].name
try:
# first only minimal set, later clone_properties
# will be called
new_vm = self.app.add_new_vm(
vm.__class__,
name=vm_name,
label=vm.label,
installed_by_rpm=False,
**kwargs)
if os.path.exists(new_vm.dir_path):
move_to_path = tempfile.mkdtemp('', os.path.basename(
new_vm.dir_path), os.path.dirname(new_vm.dir_path))
try:
os.rename(new_vm.dir_path, move_to_path)
self.log.warning(
"*** Directory {} already exists! It has "
"been moved to {}".format(new_vm.dir_path,
move_to_path))
except OSError:
self.log.error(
"*** Directory {} already exists and "
"cannot be moved!".format(new_vm.dir_path))
self.log.warning("Skipping VM {}...".format(
vm.name))
continue
except Exception as err:
self.log.error("ERROR: {0}".format(err))
self.log.warning("*** Skipping VM: {0}".format(vm.name))
if new_vm:
del self.app.domains[new_vm.qid]
continue
# remove no longer needed backup metadata
if 'backup-content' in vm.features:
del vm.features['backup-content']
del vm.features['backup-size']
del vm.features['backup-path']
try:
# exclude VM references - handled manually according to
# restore options
proplist = [prop for prop in new_vm.property_list()
if prop.clone and prop.__name__ not in
['template', 'netvm', 'dispvm_netvm']]
new_vm.clone_properties(vm, proplist=proplist)
except Exception as err:
self.log.error("ERROR: {0}".format(err))
self.log.warning("*** Some VM property will not be "
"restored")
if not self.options.verify_only:
try:
# have it here, to (maybe) patch storage config before
# creating child VMs (template first)
# TODO: adjust volumes config - especially size
new_vm.create_on_disk(pool=self.options.override_pool)
except qubes.exc.QubesException as e:
self.log.warning("Failed to create VM {}: {}".format(
vm.name, str(e)))
del self.app.domains[new_vm]
continue
restore_info[vm.name].restored_vm = new_vm
# Set network dependencies - only non-default netvm setting
for vm in vms.values():
vm_info = restore_info[vm.name]
vm_name = vm_info.name
try:
host_vm = self.app.domains[vm_name]
except KeyError:
# Failed/skipped VM
continue
if not vm.property_is_default('netvm'):
if vm_info.netvm in restore_info:
host_vm.netvm = restore_info[vm_info.netvm].name
else:
host_vm.netvm = vm_info.netvm
# vim:sw=4:et:

View File

@ -427,6 +427,28 @@ class TC_00_Backup(BackupTestsMixin, qubes.tests.QubesTestCase):
self.assertCorrectlyRestored(vms, orig_hashes)
self.remove_vms(reversed(vms))
@qubes.tests.storage_lvm.skipUnlessLvmPoolExists
def test_301_restore_to_lvm(self):
volume_group, thin_pool = \
qubes.tests.storage_lvm.DEFAULT_LVM_POOL.split('/', 1)
self.pool = self._find_pool(volume_group, thin_pool)
if not self.pool:
self.pool = self.app.add_pool(
**qubes.tests.storage_lvm.POOL_CONF)
self.created_pool = True
vms = self.create_backup_vms()
orig_hashes = self.vm_checksum(vms)
self.make_backup(vms)
self.remove_vms(reversed(vms))
self.restore_backup(options={'override_pool': self.pool.name})
self.assertCorrectlyRestored(vms, orig_hashes)
for vm in vms:
vm = self.app.domains[vm.name]
for volume in vm.volumes.values():
if volume.save_on_stop:
self.assertEqual(volume.pool, self.pool.name)
self.remove_vms(reversed(vms))
class TC_10_BackupVMMixin(BackupTestsMixin):
def setUp(self):