backups: do not use tar multi-volume feature, backup format 3 (#902)

Tar multi-volume support is broken when used with sparse files[1], so do
not use it. Instead simply cut the archive manually and concatenate at
restore time. This change require a little modification in restore
process, so make this new backup format ("3"). Also add backup format
version to the header, instead of some guessing code.
For now only cleartext and encrypted backups implemented, compression
will come as a separate commit.
This commit is contained in:
Marek Marczykowski-Górecki 2014-09-26 03:24:19 +02:00
parent 2c3159c7f9
commit fc0c0adff8

View File

@ -44,6 +44,7 @@ BACKUP_DEBUG = False
HEADER_FILENAME = 'backup-header'
DEFAULT_CRYPTO_ALGORITHM = 'aes-256-cbc'
DEFAULT_HMAC_ALGORITHM = 'SHA512'
CURRENT_BACKUP_FORMAT_VERSION = '3'
# Maximum size of error message get from process stderr (including VM process)
MAX_STDERR_BYTES = 1024
# header + qubes.xml max size
@ -64,11 +65,13 @@ class BackupCanceledError(QubesException):
self.tmpdir = tmpdir
class BackupHeader:
version = 'version'
encrypted = 'encrypted'
compressed = 'compressed'
crypto_algorithm = 'crypto-algorithm'
hmac_algorithm = 'hmac-algorithm'
bool_options = ['encrypted', 'compressed']
int_options = ['version']
def file_to_backup (file_path, subdir = None):
@ -391,6 +394,8 @@ def prepare_backup_header(target_directory, passphrase, compressed=False,
crypto_algorithm=DEFAULT_CRYPTO_ALGORITHM):
header_file_path = os.path.join(target_directory, HEADER_FILENAME)
with open(header_file_path, "w") as f:
f.write(str("%s=%s\n" % (BackupHeader.version,
CURRENT_BACKUP_FORMAT_VERSION)))
f.write(str("%s=%s\n" % (BackupHeader.hmac_algorithm, hmac_algorithm)))
f.write(str("%s=%s\n" % (BackupHeader.crypto_algorithm,
crypto_algorithm)))
@ -513,7 +518,6 @@ def backup_do(base_backup_dir, files_to_backup, passphrase,
# Prefix the path in archive with filename["subdir"] to have it verified during untar
tar_cmdline = ["tar", "-Pc", '--sparse',
"-f", backup_pipe,
'--tape-length', str(100000),
'-C', os.path.dirname(filename["path"]),
'--xform', 's:^%s:%s\\0:' % (
os.path.basename(filename["path"]),
@ -535,8 +539,18 @@ def backup_do(base_backup_dir, files_to_backup, passphrase,
# subprocesses
i = 0
run_error = "paused"
while run_error == "paused":
encryptor = None
if encrypted:
# Start encrypt
# If no cipher is provided, the data is forwarded unencrypted !!!
encryptor = subprocess.Popen (["openssl", "enc",
"-e", "-" + crypto_algorithm,
"-pass", "pass:"+passphrase],
stdin=open(backup_pipe,'rb'), stdout=subprocess.PIPE)
pipe = encryptor.stdout
else:
pipe = open(backup_pipe,'rb')
while run_error == "paused":
# Start HMAC
hmac = subprocess.Popen (["openssl", "dgst",
@ -555,29 +569,11 @@ def backup_do(base_backup_dir, files_to_backup, passphrase,
'vmproc': vmproc,
'addproc': tar_sparse,
'progress_callback': compute_progress,
'size_limit': 100 * 1024 * 1024,
}
if encrypted:
# Start encrypt
# If no cipher is provided, the data is forwarded unencrypted !!!
encryptor = subprocess.Popen (["openssl", "enc",
"-e", "-" + crypto_algorithm,
"-pass", "pass:"+passphrase] +
(["-z"] if compressed else []),
stdin=pipe, stdout=subprocess.PIPE)
run_error = wait_backup_feedback(
in_stream=encryptor.stdout, streamproc=encryptor,
**common_args)
elif compressed:
compressor = subprocess.Popen (["gzip"],
stdin=pipe, stdout=subprocess.PIPE)
run_error = wait_backup_feedback(
in_stream=compressor.stdout, streamproc=compressor,
**common_args)
else:
run_error = wait_backup_feedback(
in_stream=pipe, streamproc=None,
**common_args)
run_error = wait_backup_feedback(
in_stream=pipe, streamproc=encryptor,
**common_args)
chunkfile_p.close()
if BACKUP_DEBUG:
@ -599,7 +595,7 @@ def backup_do(base_backup_dir, files_to_backup, passphrase,
shutil.rmtree(backup_tmpdir)
running_backup_operation = None
raise BackupCanceledError("Backup canceled")
if len(run_error) > 0:
if run_error and run_error != "size_limit":
send_proc.terminate()
if run_error == "VM" and vmproc:
raise QubesException("Failed to write the backup, VM output:\n" +
@ -626,24 +622,19 @@ def backup_do(base_backup_dir, files_to_backup, passphrase,
hmac_file.flush()
hmac_file.close()
pipe.close()
# Send the HMAC to the backup target
to_send.put(os.path.relpath(chunkfile, backup_tmpdir)+".hmac")
if tar_sparse.poll() is None:
# Release the next chunk
if BACKUP_DEBUG:
print "Release next chunk for process:", tar_sparse.poll()
#tar_sparse.stdout = subprocess.PIPE
tar_sparse.stdin.write("\n")
tar_sparse.stdin.flush()
if tar_sparse.poll() is None or run_error == "size_limit":
run_error="paused"
else:
running_backup_operation.processes_to_kill_on_cancel.remove(
tar_sparse)
if BACKUP_DEBUG:
print "Finished tar sparse with error", tar_sparse.poll()
print "Finished tar sparse with exit code", tar_sparse\
.poll()
pipe.close()
to_send.put("FINISHED")
send_proc.join()
@ -689,20 +680,24 @@ def backup_do(base_backup_dir, files_to_backup, passphrase,
' (vmproc termination is controlled by the python script)
' - streamproc does not delivers any data anymore (return with the error
' "")
' - size_limit is provided and is about to be exceeded
'''
def wait_backup_feedback(progress_callback, in_stream, streamproc,
backup_target, total_backup_sz, hmac=None, vmproc=None, addproc=None,
remove_trailing_bytes=0):
remove_trailing_bytes=0, size_limit=None):
buffer_size = 409600
run_error = None
run_count = 1
blocks_backedup = 0
bytes_copied = 0
while run_count > 0 and run_error == None:
if size_limit and bytes_copied + buffer_size > size_limit:
return "size_limit"
buffer = in_stream.read(buffer_size)
progress_callback(len(buffer), total_backup_sz)
bytes_copied += len(buffer)
run_count = 0
if hmac:
@ -795,12 +790,12 @@ def verify_hmac(filename, hmacfile, passphrase, algorithm):
return False
class ExtractWorker(Process):
class ExtractWorker2(Process):
def __init__(self, queue, base_dir, passphrase, encrypted, total_size,
print_callback, error_callback, progress_callback, vmproc=None,
compressed = False, crypto_algorithm=DEFAULT_CRYPTO_ALGORITHM,
verify_only=False):
super(ExtractWorker, self).__init__()
super(ExtractWorker2, self).__init__()
self.queue = queue
self.base_dir = base_dir
self.passphrase = passphrase
@ -1023,6 +1018,153 @@ class ExtractWorker(Process):
if BACKUP_DEBUG and callable(self.print_callback):
self.print_callback("Finished extracting thread")
class ExtractWorker3(ExtractWorker2):
def __init__(self, queue, base_dir, passphrase, encrypted, total_size,
print_callback, error_callback, progress_callback, vmproc=None,
compressed=False, crypto_algorithm=DEFAULT_CRYPTO_ALGORITHM,
verify_only=False):
super(ExtractWorker3, self).__init__(queue, base_dir, passphrase,
encrypted, total_size,
print_callback, error_callback,
progress_callback, vmproc,
compressed, crypto_algorithm,
verify_only)
os.unlink(self.restore_pipe)
def __run__(self):
if BACKUP_DEBUG and callable(self.print_callback):
self.print_callback("Started sending thread")
self.print_callback("Moving to dir "+self.base_dir)
os.chdir(self.base_dir)
filename = None
input_pipe = None
for filename in iter(self.queue.get, None):
if filename == "FINISHED" or filename == "ERROR":
break
if BACKUP_DEBUG and callable(self.print_callback):
self.print_callback("Extracting file "+filename)
if filename.endswith('.000'):
# next file
if self.tar2_process != None:
input_pipe.close()
if self.tar2_process.wait() != 0:
self.collect_tar_output()
self.error_callback(
"ERROR: unable to extract files for {0}, tar "
"output:\n {1}".\
format(self.tar2_current_file,
"\n ".join(self.tar2_stderr)))
else:
# Finished extracting the tar file
self.tar2_process = None
self.tar2_current_file = None
tar2_cmdline = ['tar',
'-%sk%s' % ("t" if self.verify_only else "x",
"v" if BACKUP_DEBUG else ""),
os.path.relpath(filename.rstrip('.000'))]
if BACKUP_DEBUG and callable(self.print_callback):
self.print_callback("Running command "+
unicode(tar2_cmdline))
if self.encrypted:
# Start decrypt
self.decryptor_process = subprocess.Popen (["openssl", "enc",
"-d", "-" + self.crypto_algorithm,
"-pass", "pass:"+self.passphrase],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE)
self.tar2_process = subprocess.Popen(tar2_cmdline,
stdin=self.decryptor_process.stdout,
stderr=subprocess.PIPE)
input_pipe = self.decryptor_process.stdin
else:
self.tar2_process = subprocess.Popen(tar2_cmdline,
stdin=subprocess.PIPE,
stderr=subprocess.PIPE)
input_pipe = self.tar2_process.stdin
fcntl.fcntl(self.tar2_process.stderr.fileno(), fcntl.F_SETFL,
fcntl.fcntl(self.tar2_process.stderr.fileno(),
fcntl.F_GETFL) | os.O_NONBLOCK)
self.tar2_stderr = []
elif not self.tar2_process:
# Extracting of the current archive failed, skip to the next
# archive
if not BACKUP_DEBUG:
os.remove(filename)
continue
else:
if BACKUP_DEBUG and callable(self.print_callback):
self.print_callback("Releasing next chunck")
self.tar2_current_file = filename
common_args = {
'backup_target': input_pipe,
'total_backup_sz': self.total_size,
'hmac': None,
'vmproc': self.vmproc,
'addproc': self.tar2_process
}
run_error = wait_backup_feedback(
progress_callback=self.compute_progress,
in_stream=open(filename,"rb"), streamproc=None,
**common_args)
if len(run_error):
if run_error == "target":
self.collect_tar_output()
details = "\n".join(self.tar2_stderr)
else:
details = "%s failed" % run_error
if self.decryptor_process:
self.decryptor_process.terminate()
self.decryptor_process.wait()
self.decryptor_process = None
self.tar2_process.terminate()
self.tar2_process.wait()
self.tar2_process = None
self.error_callback("Error while processing '%s': %s " % \
(self.tar2_current_file, details))
# Delete the file as we don't need it anymore
if BACKUP_DEBUG and callable(self.print_callback):
self.print_callback("Removing file "+filename)
os.remove(filename)
if self.tar2_process is not None:
input_pipe.close()
if filename == "ERROR":
if self.decryptor_process:
self.decryptor_process.terminate()
self.decryptor_process.wait()
self.decryptor_process = None
self.tar2_process.terminate()
self.tar2_process.wait()
elif self.tar2_process.wait() != 0:
self.collect_tar_output()
raise QubesException(
"unable to extract files for {0}.{1} Tar command "
"output: %s".
format(self.tar2_current_file,
(" Perhaps the backup is encrypted?"
if not self.encrypted else "",
"\n".join(self.tar2_stderr))))
else:
# Finished extracting the tar file
self.tar2_process = None
if BACKUP_DEBUG and callable(self.print_callback):
self.print_callback("Finished extracting thread")
def get_supported_hmac_algo(hmac_algorithm):
# Start with provided default
@ -1049,6 +1191,8 @@ def parse_backup_header(filename):
continue
if key in BackupHeader.bool_options:
value = value.lower() in ["1", "true", "yes"]
elif key in BackupHeader.int_options:
value = int(value)
header_data[key] = value
return header_data
@ -1057,7 +1201,7 @@ def restore_vm_dirs (backup_source, restore_tmpdir, passphrase, vms_dirs, vms,
progress_callback=None, encrypted=False, appvm=None,
compressed = False, hmac_algorithm=DEFAULT_HMAC_ALGORITHM,
crypto_algorithm=DEFAULT_CRYPTO_ALGORITHM,
verify_only=False):
verify_only=False, format_version = CURRENT_BACKUP_FORMAT_VERSION):
global running_backup_operation
@ -1168,6 +1312,8 @@ def restore_vm_dirs (backup_source, restore_tmpdir, passphrase, vms_dirs, vms,
"failed). Is the password correct?")
if os.path.basename(filename) == HEADER_FILENAME:
header_data = parse_backup_header(filename)
if BackupHeader.version in header_data:
format_version = header_data[BackupHeader.version]
if BackupHeader.crypto_algorithm in header_data:
crypto_algorithm = header_data[BackupHeader.crypto_algorithm]
if BackupHeader.hmac_algorithm in header_data:
@ -1191,17 +1337,25 @@ def restore_vm_dirs (backup_source, restore_tmpdir, passphrase, vms_dirs, vms,
# Setup worker to extract encrypted data chunks to the restore dirs
# Create the process here to pass it options extracted from backup header
extract_proc = ExtractWorker(queue=to_extract,
base_dir=restore_tmpdir,
passphrase=passphrase,
encrypted=encrypted,
compressed=compressed,
crypto_algorithm = crypto_algorithm,
verify_only=verify_only,
total_size=vms_size,
print_callback=print_callback,
error_callback=error_callback,
progress_callback=progress_callback)
extractor_params = {
'queue': to_extract,
'base_dir': restore_tmpdir,
'passphrase': passphrase,
'encrypted': encrypted,
'compressed': compressed,
'crypto_algorithm': crypto_algorithm,
'verify_only': verify_only,
'total_size': vms_size,
'print_callback': print_callback,
'error_callback': error_callback,
'progress_callback': progress_callback,
}
if format_version == 2:
extract_proc = ExtractWorker2(**extractor_params)
elif format_version == 3:
extract_proc = ExtractWorker3(**extractor_params)
else:
raise NotImplemented("Backup format version %d not supported" % format_version)
extract_proc.start()
try:
@ -1326,6 +1480,8 @@ def backup_detect_format_version(backup_location):
if os.path.exists(os.path.join(backup_location, 'qubes.xml')):
return 1
else:
# this could mean also 3, but not distinguishable until backup header
# is read
return 2
def backup_restore_header(source, passphrase,
@ -1359,6 +1515,7 @@ def backup_restore_header(source, passphrase,
vms_dirs=extract_filter,
vms=None,
vms_size=HEADER_QUBES_XML_MAX_SIZE,
format_version=format_version,
hmac_algorithm=hmac_algorithm,
crypto_algorithm=crypto_algorithm,
print_callback=print_callback,
@ -1480,7 +1637,7 @@ def backup_restore_prepare(backup_location, passphrase, options = {},
if format_version == 1:
is_vm_included_in_backup = is_vm_included_in_backup_v1
elif format_version == 2:
elif format_version in [2, 3]:
is_vm_included_in_backup = is_vm_included_in_backup_v2
if not appvm:
if not os.path.isfile(backup_location):
@ -1504,6 +1661,8 @@ def backup_restore_prepare(backup_location, passphrase, options = {},
format_version=format_version)
if header_data:
if BackupHeader.version in header_data:
format_version = header_data[BackupHeader.version]
if BackupHeader.crypto_algorithm in header_data:
crypto_algorithm = header_data[BackupHeader.crypto_algorithm]
if BackupHeader.hmac_algorithm in header_data:
@ -1599,7 +1758,7 @@ def backup_restore_prepare(backup_location, passphrase, options = {},
vms_to_restore['dom0']['good-to-go'] = True
# Not needed - all the data stored in vms_to_restore
if format_version == 2:
if format_version >= 2:
os.unlink(qubes_xml)
return vms_to_restore
@ -1756,14 +1915,14 @@ def backup_restore_do(restore_info,
if not vm_info['good-to-go']:
continue
vm = vm_info['vm']
if format_version == 2:
if format_version >= 2:
vms_size += vm.backup_size
vms_dirs.append(vm.backup_path)
vms[vm.name] = vm
running_backup_operation = BackupOperationInfo()
if format_version == 2:
if format_version >= 2:
if 'dom0' in restore_info.keys() and restore_info['dom0']['good-to-go']:
vms_dirs.append(os.path.dirname(restore_info['dom0']['subdir']))
vms_size += restore_info['dom0']['size']
@ -1775,6 +1934,7 @@ def backup_restore_do(restore_info,
vms_dirs=vms_dirs,
vms=vms,
vms_size=vms_size,
format_version=format_version,
hmac_algorithm=hmac_algorithm,
crypto_algorithm=crypto_algorithm,
verify_only=verify_only,
@ -1841,7 +2001,7 @@ def backup_restore_do(restore_info,
restore_vm_dir_v1(backup_location,
vm.dir_path,
os.path.dirname(new_vm.dir_path))
elif format_version == 2:
elif format_version >= 2:
shutil.move(os.path.join(restore_tmpdir, vm.backup_path),
new_vm.dir_path)
@ -1884,7 +2044,7 @@ def backup_restore_do(restore_info,
host_collection.unlock_db()
if running_backup_operation.canceled:
if format_version == 2:
if format_version >= 2:
raise BackupCanceledError("Restore canceled",
tmpdir=restore_tmpdir)
else:
@ -1912,7 +2072,7 @@ def backup_restore_do(restore_info,
os.rename(home_file, home_dir + '/' + restore_home_backupdir + '/' + f)
if format_version == 1:
retcode = subprocess.call (["cp", "-nrp", backup_dom0_home_dir + '/' + f, home_file])
elif format_version == 2:
elif format_version >= 2:
shutil.move(backup_dom0_home_dir + '/' + f, home_file)
retcode = subprocess.call(['sudo', 'chown', '-R', local_user, home_dir])
if retcode != 0: