backups: implement async backup/restore cancel method (#793)

The backup_cancel() method kills processes registered by main thread and
set "running_backup_operation.canceled" to True. Then main thread get an
error because of killed processes and check if that was because of
cancel request.

Introduce BackupCanceledError, which can report temporary dir to remove.
This commit is contained in:
Marek Marczykowski-Górecki 2014-03-08 03:55:47 +01:00
parent b52d1a4379
commit f4194c9d08

View File

@ -47,6 +47,20 @@ MAX_STDERR_BYTES = 1024
# header + qubes.xml max size
HEADER_QUBES_XML_MAX_SIZE = 1024 * 1024
# global state for backup_cancel()
running_backup_operation = None
class BackupOperationInfo:
def __init__(self):
self.canceled = False
self.processes_to_kill_on_cancel = []
self.tmpdir_to_remove = None
class BackupCanceledError(QubesException):
def __init__(self, msg, tmpdir=None):
super(BackupCanceledError, self).__init__(msg)
self.tmpdir = tmpdir
class BackupHeader:
encrypted = 'encrypted'
compressed = 'compressed'
@ -81,6 +95,23 @@ def file_to_backup (file_path, subdir = None):
subdir += '/'
return [ { "path" : file_path, "size": sz, "subdir": subdir} ]
def backup_cancel():
"""
Cancel currently running backup/restore operation
@return: True if any operation was signaled
"""
if running_backup_operation is None:
return False
running_backup_operation.canceled = True
for proc in running_backup_operation.processes_to_kill_on_cancel:
try:
proc.terminate()
except:
pass
return True
def backup_prepare(vms_list = None, exclude_list = None,
print_callback = print_stdout, hide_vm_names=True):
"""If vms = None, include all (sensible) VMs; exclude_list is always applied"""
@ -333,7 +364,7 @@ class SendWorker(Process):
os.chdir(self.base_dir)
for filename in iter(self.queue.get,None):
if filename == "FINISHED":
if filename == "FINISHED" or filename == "ERROR":
break
if BACKUP_DEBUG:
@ -381,6 +412,8 @@ def backup_do(base_backup_dir, files_to_backup, passphrase,
progress_callback = None, encrypted=False, appvm=None,
compressed=False, hmac_algorithm=DEFAULT_HMAC_ALGORITHM,
crypto_algorithm=DEFAULT_CRYPTO_ALGORITHM):
global running_backup_operation
total_backup_sz = 0
passphrase = passphrase.encode('utf-8')
for file in files_to_backup:
@ -390,6 +423,7 @@ def backup_do(base_backup_dir, files_to_backup, passphrase,
raise QubesException("Compressed and encrypted backups are not "
"supported (yet).")
running_backup_operation = BackupOperationInfo()
vmproc = None
if appvm != None:
# Prepare the backup target (Qubes service call)
@ -401,6 +435,7 @@ def backup_do(base_backup_dir, files_to_backup, passphrase,
vmproc.stdin.write(base_backup_dir.
replace("\r", "").replace("\n", "")+"\n")
backup_stdout = vmproc.stdin
running_backup_operation.processes_to_kill_on_cancel.append(vmproc)
else:
# Prepare the backup target (local file)
backup_target = base_backup_dir + "/qubes-{0}". \
@ -421,6 +456,7 @@ def backup_do(base_backup_dir, files_to_backup, passphrase,
progress_callback(progress)
backup_tmpdir = tempfile.mkdtemp(prefix="/var/tmp/backup_")
running_backup_operation.tmpdir_to_remove = backup_tmpdir
# Tar with tapelength does not deals well with stdout (close stdout between
# two tapes)
@ -489,6 +525,7 @@ def backup_do(base_backup_dir, files_to_backup, passphrase,
# Pipe: tar-sparse [| hmac] | tar | backup_target
tar_sparse = subprocess.Popen (tar_cmdline, stdin=subprocess.PIPE,
stderr=(open(os.devnull, 'w') if not BACKUP_DEBUG else None))
running_backup_operation.processes_to_kill_on_cancel.append(tar_sparse)
# Wait for compressor (tar) process to finish or for any error of other
# subprocesses
@ -542,6 +579,22 @@ def backup_do(base_backup_dir, files_to_backup, passphrase,
if BACKUP_DEBUG:
print "Wait_backup_feedback returned:", run_error
if running_backup_operation.canceled:
try:
tar_sparse.terminate()
except:
pass
try:
hmac.terminate()
except:
pass
tar_sparse.wait()
hmac.wait()
to_send.put("ERROR")
send_proc.join()
shutil.rmtree(backup_tmpdir)
running_backup_operation = None
raise BackupCanceledError("Backup canceled")
if len(run_error) > 0:
send_proc.terminate()
if run_error == "VM" and vmproc:
@ -583,11 +636,20 @@ def backup_do(base_backup_dir, files_to_backup, passphrase,
tar_sparse.stdin.flush()
run_error="paused"
else:
running_backup_operation.processes_to_kill_on_cancel.remove(
tar_sparse)
if BACKUP_DEBUG:
print "Finished tar sparse with error", tar_sparse.poll()
to_send.put("FINISHED")
send_proc.join()
shutil.rmtree(backup_tmpdir)
if running_backup_operation.canceled:
running_backup_operation = None
raise BackupCanceledError("Backup canceled")
running_backup_operation = None
if send_proc.exitcode != 0:
raise QubesException("Failed to send backup: error in the sending process")
@ -598,7 +660,6 @@ def backup_do(base_backup_dir, files_to_backup, passphrase,
print "Sparse1 proc return code:", tar_sparse.poll()
vmproc.stdin.close()
shutil.rmtree(backup_tmpdir)
'''
' Wait for backup chunk to finish
@ -920,6 +981,8 @@ def restore_vm_dirs (backup_source, restore_tmpdir, passphrase, vms_dirs, vms,
compressed = False, hmac_algorithm=DEFAULT_HMAC_ALGORITHM,
crypto_algorithm=DEFAULT_CRYPTO_ALGORITHM):
global running_backup_operation
if BACKUP_DEBUG:
print_callback("Working in temporary dir:"+restore_tmpdir)
print_callback("Extracting data: " + size_to_human(vms_size)+" to restore")
@ -937,6 +1000,8 @@ def restore_vm_dirs (backup_source, restore_tmpdir, passphrase, vms_dirs, vms,
# Send to tar2qfile the VMs that should be extracted
vmproc.stdin.write(" ".join(vms_dirs)+"\n")
if running_backup_operation:
running_backup_operation.processes_to_kill_on_cancel.append(vmproc)
backup_stdin = vmproc.stdout
tar1_command = ['/usr/libexec/qubes/qfile-dom0-unpacker',
@ -970,6 +1035,8 @@ def restore_vm_dirs (backup_source, restore_tmpdir, passphrase, vms_dirs, vms,
stdout=vmproc.stdin if vmproc else subprocess.PIPE,
stderr=subprocess.PIPE,
env=tar1_env)
if running_backup_operation:
running_backup_operation.processes_to_kill_on_cancel.append(command)
# qfile-dom0-unpacker output filelist on stderr (and have stdout connected
# to the VM), while tar output filelist on stdout
@ -1059,6 +1126,8 @@ def restore_vm_dirs (backup_source, restore_tmpdir, passphrase, vms_dirs, vms,
try:
filename = None
while True:
if running_backup_operation and running_backup_operation.canceled:
break
if nextfile is not None:
filename = nextfile
else:
@ -1071,6 +1140,9 @@ def restore_vm_dirs (backup_source, restore_tmpdir, passphrase, vms_dirs, vms,
break
hmacfile = filelist_pipe.readline().strip()
if running_backup_operation and running_backup_operation.canceled:
break
# if reading archive directly with tar, wait for next filename -
# tar prints filename before processing it, so wait for the next one to be
# sure that whole file was extracted
@ -1095,6 +1167,10 @@ def restore_vm_dirs (backup_source, restore_tmpdir, passphrase, vms_dirs, vms,
passphrase, hmac_algorithm):
to_extract.put(os.path.join(restore_tmpdir, filename))
if running_backup_operation and running_backup_operation.canceled:
raise BackupCanceledError("Restore canceled",
tmpdir=restore_tmpdir)
if command.wait() != 0 and not expect_tar_error:
raise QubesException(
"ERROR: unable to read the qubes backup file {0} ({1}). " \
@ -1111,6 +1187,7 @@ def restore_vm_dirs (backup_source, restore_tmpdir, passphrase, vms_dirs, vms,
except:
to_extract.put("ERROR")
extract_proc.join()
running_backup_operation = None
raise
else:
to_extract.put("FINISHED")
@ -1163,7 +1240,9 @@ def backup_restore_header(source, passphrase,
hmac_algorithm = DEFAULT_HMAC_ALGORITHM,
crypto_algorithm = DEFAULT_CRYPTO_ALGORITHM):
global running_backup_operation
vmproc = None
running_backup_operation = None
restore_tmpdir = tempfile.mkdtemp(prefix="/var/tmp/restore_")
@ -1531,6 +1610,8 @@ def backup_restore_do(restore_info,
error_callback = print_stderr, progress_callback = None,
):
global running_backup_operation
### Private functions begin
def restore_vm_dir_v1 (backup_dir, src_dir, dst_dir):
@ -1580,6 +1661,8 @@ def backup_restore_do(restore_info,
vms_dirs.append(vm.backup_path)
vms[vm.name] = vm
running_backup_operation = BackupOperationInfo()
if format_version == 2:
if 'dom0' in restore_info.keys() and restore_info['dom0']['good-to-go']:
vms_dirs.append('dom0-home')
@ -1603,7 +1686,13 @@ def backup_restore_do(restore_info,
# Add VM in right order
for (vm_class_name, vm_class) in sorted(QubesVmClasses.items(),
key=lambda _x: _x[1].load_order):
if running_backup_operation.canceled:
break
for vm in vms.values():
if running_backup_operation.canceled:
# only break the loop to save qubes.xml with already restored
# VMs
break
if not vm.__class__ == vm_class:
continue
print_callback("-> Restoring {type} {0}...".format(vm.name, type=vm_class_name))
@ -1674,6 +1763,14 @@ def backup_restore_do(restore_info,
if lock_obtained:
host_collection.unlock_db()
if running_backup_operation.canceled:
if format_version == 2:
raise BackupCanceledError("Restore canceled",
tmpdir=restore_tmpdir)
else:
raise BackupCanceledError("Restore canceled")
# ... and dom0 home as last step
if 'dom0' in restore_info.keys() and restore_info['dom0']['good-to-go']:
backup_path = restore_info['dom0']['subdir']