diff --git a/dom0/qvm-core/qubesutils.py b/dom0/qvm-core/qubesutils.py index dcf510d6..afcec82d 100644 --- a/dom0/qvm-core/qubesutils.py +++ b/dom0/qvm-core/qubesutils.py @@ -1068,6 +1068,40 @@ def backup_do_copy(base_backup_dir, files_to_backup, progress_callback = None, e print "Will backup:",files_to_backup + # Setup worker to send encrypted data chunks to the backup_target + from multiprocessing import Queue,Process + class Send_Worker(Process): + def __init__(self,queue,base_dir,backup_stdout): + super(Send_Worker, self).__init__() + self.queue = queue + self.base_dir = base_dir + self.backup_stdout = backup_stdout + + def run(self): + print "Started sending thread" + + print "Moving to temporary dir",self.base_dir + os.chdir(self.base_dir) + + for filename in iter(self.queue.get,None): + if filename == "FINISHED": + break + + print "Sending file",filename + tar_final_cmd = ["tar", "-cO", "-C", self.base_dir, filename] + final_proc = subprocess.Popen (tar_final_cmd, stdin=subprocess.PIPE, stdout=self.backup_stdout) + final_proc.wait() + + # Delete the file as we don't need it anymore + print "Removing file",filename + os.remove(filename) + + print "Finished sending thread" + + to_send = Queue() + send_proc = Send_Worker(to_send, backup_tmpdir, backup_stdout) + send_proc.start() + for filename in files_to_backup: print "Backing up",filename @@ -1117,9 +1151,10 @@ def backup_do_copy(base_backup_dir, files_to_backup, progress_callback = None, e raise QubesException("Failed to perform backup: error with "+run_error) # Send the chunk to the backup target - tar_final_cmd = ["tar", "-cO", "-C", backup_tmpdir, chunkfile.split(os.path.normpath(backup_tmpdir)+"/")[1]] - final_proc = subprocess.Popen (tar_final_cmd, stdin=subprocess.PIPE, stdout=backup_stdout) - final_proc.wait() + to_send.put(chunkfile.split(os.path.normpath(backup_tmpdir)+"/")[1]) + #tar_final_cmd = ["tar", "-cO", "-C", backup_tmpdir, chunkfile.split(os.path.normpath(backup_tmpdir)+"/")[1]] + #final_proc = subprocess.Popen (tar_final_cmd, stdin=subprocess.PIPE, stdout=backup_stdout) + #final_proc.wait() # Close HMAC hmac.stdin.close() @@ -1135,9 +1170,10 @@ def backup_do_copy(base_backup_dir, files_to_backup, progress_callback = None, e hmac_file.close() # Send the HMAC to the backup target - tar_final_cmd = ["tar", "-cO", "-C", backup_tmpdir, chunkfile.split(os.path.normpath(backup_tmpdir)+"/")[1]+".hmac"] - final_proc = subprocess.Popen (tar_final_cmd, stdin=subprocess.PIPE, stdout=backup_stdout) - final_proc.wait() + to_send.put(chunkfile.split(os.path.normpath(backup_tmpdir)+"/")[1]+".hmac") + #tar_final_cmd = ["tar", "-cO", "-C", backup_tmpdir, chunkfile.split(os.path.normpath(backup_tmpdir)+"/")[1]+".hmac"] + #final_proc = subprocess.Popen (tar_final_cmd, stdin=subprocess.PIPE, stdout=backup_stdout) + #final_proc.wait() if tar_sparse.poll() == None: # Release the next chunk @@ -1148,18 +1184,13 @@ def backup_do_copy(base_backup_dir, files_to_backup, progress_callback = None, e else: print "Finished tar sparse with error",tar_sparse.poll() - # Wait for all remaining subprocess to finish - #if addproc: - # addproc.wait() - # print "Addproc:",addproc.poll() - - #streamproc.wait() - #print "Streamproc:",streamproc.poll() - - #streamproc.wait() # Close the backup target and wait for it to finish #backup_stdout.close() + + to_send.put("FINISHED") + send_proc.join() + if vmproc: print "VMProc1:",vmproc.poll() print "Sparse1:",tar_sparse.poll() @@ -1189,7 +1220,7 @@ def wait_backup_feedback(progress_callback, streamproc, backup_target, total_bac blocks_backedup += len(buffer) progress = blocks_backedup / float(total_backup_sz) - #progress_callback(round(progress*100,2)) + progress_callback(round(progress*100,2)) run_count = 0 if hmac: @@ -1202,7 +1233,7 @@ def wait_backup_feedback(progress_callback, streamproc, backup_target, total_bac if addproc: retcode=addproc.poll() - print "Tar proc status:",retcode + #print "Tar proc status:",retcode if retcode != None: if retcode != 0: run_error = "addproc" @@ -1231,7 +1262,7 @@ def wait_backup_feedback(progress_callback, streamproc, backup_target, total_bac run_count += 1 else: - print "Process running:",len(buffer) + #print "Process running:",len(buffer) # Process still running backup_target.write(buffer)