backup: use a thread to send data to AppVM in parallel to tar main operations.

Additionnally, temporary files are removed once data has been sent
This commit is contained in:
Olivier MEDOC 2013-09-10 09:25:44 +02:00
parent a85f3a7d8e
commit 23065f6fa0

View File

@ -1068,6 +1068,40 @@ def backup_do_copy(base_backup_dir, files_to_backup, progress_callback = None, e
print "Will backup:",files_to_backup print "Will backup:",files_to_backup
# Setup worker to send encrypted data chunks to the backup_target
from multiprocessing import Queue,Process
class Send_Worker(Process):
def __init__(self,queue,base_dir,backup_stdout):
super(Send_Worker, self).__init__()
self.queue = queue
self.base_dir = base_dir
self.backup_stdout = backup_stdout
def run(self):
print "Started sending thread"
print "Moving to temporary dir",self.base_dir
os.chdir(self.base_dir)
for filename in iter(self.queue.get,None):
if filename == "FINISHED":
break
print "Sending file",filename
tar_final_cmd = ["tar", "-cO", "-C", self.base_dir, filename]
final_proc = subprocess.Popen (tar_final_cmd, stdin=subprocess.PIPE, stdout=self.backup_stdout)
final_proc.wait()
# Delete the file as we don't need it anymore
print "Removing file",filename
os.remove(filename)
print "Finished sending thread"
to_send = Queue()
send_proc = Send_Worker(to_send, backup_tmpdir, backup_stdout)
send_proc.start()
for filename in files_to_backup: for filename in files_to_backup:
print "Backing up",filename print "Backing up",filename
@ -1117,9 +1151,10 @@ def backup_do_copy(base_backup_dir, files_to_backup, progress_callback = None, e
raise QubesException("Failed to perform backup: error with "+run_error) raise QubesException("Failed to perform backup: error with "+run_error)
# Send the chunk to the backup target # Send the chunk to the backup target
tar_final_cmd = ["tar", "-cO", "-C", backup_tmpdir, chunkfile.split(os.path.normpath(backup_tmpdir)+"/")[1]] to_send.put(chunkfile.split(os.path.normpath(backup_tmpdir)+"/")[1])
final_proc = subprocess.Popen (tar_final_cmd, stdin=subprocess.PIPE, stdout=backup_stdout) #tar_final_cmd = ["tar", "-cO", "-C", backup_tmpdir, chunkfile.split(os.path.normpath(backup_tmpdir)+"/")[1]]
final_proc.wait() #final_proc = subprocess.Popen (tar_final_cmd, stdin=subprocess.PIPE, stdout=backup_stdout)
#final_proc.wait()
# Close HMAC # Close HMAC
hmac.stdin.close() hmac.stdin.close()
@ -1135,9 +1170,10 @@ def backup_do_copy(base_backup_dir, files_to_backup, progress_callback = None, e
hmac_file.close() hmac_file.close()
# Send the HMAC to the backup target # Send the HMAC to the backup target
tar_final_cmd = ["tar", "-cO", "-C", backup_tmpdir, chunkfile.split(os.path.normpath(backup_tmpdir)+"/")[1]+".hmac"] to_send.put(chunkfile.split(os.path.normpath(backup_tmpdir)+"/")[1]+".hmac")
final_proc = subprocess.Popen (tar_final_cmd, stdin=subprocess.PIPE, stdout=backup_stdout) #tar_final_cmd = ["tar", "-cO", "-C", backup_tmpdir, chunkfile.split(os.path.normpath(backup_tmpdir)+"/")[1]+".hmac"]
final_proc.wait() #final_proc = subprocess.Popen (tar_final_cmd, stdin=subprocess.PIPE, stdout=backup_stdout)
#final_proc.wait()
if tar_sparse.poll() == None: if tar_sparse.poll() == None:
# Release the next chunk # Release the next chunk
@ -1148,18 +1184,13 @@ def backup_do_copy(base_backup_dir, files_to_backup, progress_callback = None, e
else: else:
print "Finished tar sparse with error",tar_sparse.poll() print "Finished tar sparse with error",tar_sparse.poll()
# Wait for all remaining subprocess to finish
#if addproc:
# addproc.wait()
# print "Addproc:",addproc.poll()
#streamproc.wait()
#print "Streamproc:",streamproc.poll()
#streamproc.wait()
# Close the backup target and wait for it to finish # Close the backup target and wait for it to finish
#backup_stdout.close() #backup_stdout.close()
to_send.put("FINISHED")
send_proc.join()
if vmproc: if vmproc:
print "VMProc1:",vmproc.poll() print "VMProc1:",vmproc.poll()
print "Sparse1:",tar_sparse.poll() print "Sparse1:",tar_sparse.poll()
@ -1189,7 +1220,7 @@ def wait_backup_feedback(progress_callback, streamproc, backup_target, total_bac
blocks_backedup += len(buffer) blocks_backedup += len(buffer)
progress = blocks_backedup / float(total_backup_sz) progress = blocks_backedup / float(total_backup_sz)
#progress_callback(round(progress*100,2)) progress_callback(round(progress*100,2))
run_count = 0 run_count = 0
if hmac: if hmac:
@ -1202,7 +1233,7 @@ def wait_backup_feedback(progress_callback, streamproc, backup_target, total_bac
if addproc: if addproc:
retcode=addproc.poll() retcode=addproc.poll()
print "Tar proc status:",retcode #print "Tar proc status:",retcode
if retcode != None: if retcode != None:
if retcode != 0: if retcode != 0:
run_error = "addproc" run_error = "addproc"
@ -1231,7 +1262,7 @@ def wait_backup_feedback(progress_callback, streamproc, backup_target, total_bac
run_count += 1 run_count += 1
else: else:
print "Process running:",len(buffer) #print "Process running:",len(buffer)
# Process still running # Process still running
backup_target.write(buffer) backup_target.write(buffer)