backup: use constants for queue control strings
This commit is contained in:
parent
ad1f1738fa
commit
c08f5986a9
@ -41,6 +41,10 @@ from multiprocessing import Queue, Process
|
|||||||
import qubes
|
import qubes
|
||||||
import qubes.core2migration
|
import qubes.core2migration
|
||||||
|
|
||||||
|
QUEUE_ERROR = "ERROR"
|
||||||
|
|
||||||
|
QUEUE_FINISHED = "FINISHED"
|
||||||
|
|
||||||
HEADER_FILENAME = 'backup-header'
|
HEADER_FILENAME = 'backup-header'
|
||||||
DEFAULT_CRYPTO_ALGORITHM = 'aes-256-cbc'
|
DEFAULT_CRYPTO_ALGORITHM = 'aes-256-cbc'
|
||||||
DEFAULT_HMAC_ALGORITHM = 'SHA512'
|
DEFAULT_HMAC_ALGORITHM = 'SHA512'
|
||||||
@ -191,7 +195,7 @@ class SendWorker(Process):
|
|||||||
os.chdir(self.base_dir)
|
os.chdir(self.base_dir)
|
||||||
|
|
||||||
for filename in iter(self.queue.get, None):
|
for filename in iter(self.queue.get, None):
|
||||||
if filename == "FINISHED" or filename == "ERROR":
|
if filename in (QUEUE_FINISHED, QUEUE_ERROR):
|
||||||
break
|
break
|
||||||
|
|
||||||
self.log.debug("Sending file {}".format(filename))
|
self.log.debug("Sending file {}".format(filename))
|
||||||
@ -759,7 +763,7 @@ class Backup(object):
|
|||||||
pass
|
pass
|
||||||
tar_sparse.wait()
|
tar_sparse.wait()
|
||||||
hmac.wait()
|
hmac.wait()
|
||||||
to_send.put("ERROR")
|
to_send.put(QUEUE_ERROR)
|
||||||
send_proc.join()
|
send_proc.join()
|
||||||
shutil.rmtree(self.tmpdir)
|
shutil.rmtree(self.tmpdir)
|
||||||
raise BackupCanceledError("Backup canceled")
|
raise BackupCanceledError("Backup canceled")
|
||||||
@ -813,7 +817,7 @@ class Backup(object):
|
|||||||
self._current_vm_bytes = 0
|
self._current_vm_bytes = 0
|
||||||
self._send_progress_update()
|
self._send_progress_update()
|
||||||
|
|
||||||
self._queue_put_with_check(send_proc, vmproc, to_send, "FINISHED")
|
self._queue_put_with_check(send_proc, vmproc, to_send, QUEUE_FINISHED)
|
||||||
send_proc.join()
|
send_proc.join()
|
||||||
shutil.rmtree(self.tmpdir)
|
shutil.rmtree(self.tmpdir)
|
||||||
|
|
||||||
@ -1013,7 +1017,7 @@ class ExtractWorker2(Process):
|
|||||||
filename = None
|
filename = None
|
||||||
|
|
||||||
for filename in iter(self.queue.get, None):
|
for filename in iter(self.queue.get, None):
|
||||||
if filename == "FINISHED" or filename == "ERROR":
|
if filename in (QUEUE_FINISHED, QUEUE_ERROR):
|
||||||
break
|
break
|
||||||
|
|
||||||
self.log.debug("Extracting file " + filename)
|
self.log.debug("Extracting file " + filename)
|
||||||
@ -1128,7 +1132,7 @@ class ExtractWorker2(Process):
|
|||||||
os.unlink(self.restore_pipe)
|
os.unlink(self.restore_pipe)
|
||||||
|
|
||||||
if self.tar2_process is not None:
|
if self.tar2_process is not None:
|
||||||
if filename == "ERROR":
|
if filename == QUEUE_ERROR:
|
||||||
self.tar2_process.terminate()
|
self.tar2_process.terminate()
|
||||||
self.tar2_process.wait()
|
self.tar2_process.wait()
|
||||||
elif self.tar2_process.wait() != 0:
|
elif self.tar2_process.wait() != 0:
|
||||||
@ -1169,7 +1173,7 @@ class ExtractWorker3(ExtractWorker2):
|
|||||||
|
|
||||||
input_pipe = None
|
input_pipe = None
|
||||||
for filename in iter(self.queue.get, None):
|
for filename in iter(self.queue.get, None):
|
||||||
if filename == "FINISHED" or filename == "ERROR":
|
if filename in (QUEUE_FINISHED, QUEUE_ERROR):
|
||||||
break
|
break
|
||||||
|
|
||||||
self.log.debug("Extracting file " + filename)
|
self.log.debug("Extracting file " + filename)
|
||||||
@ -1274,7 +1278,7 @@ class ExtractWorker3(ExtractWorker2):
|
|||||||
|
|
||||||
if self.tar2_process is not None:
|
if self.tar2_process is not None:
|
||||||
input_pipe.close()
|
input_pipe.close()
|
||||||
if filename == "ERROR":
|
if filename == QUEUE_ERROR:
|
||||||
if self.decryptor_process:
|
if self.decryptor_process:
|
||||||
self.decryptor_process.terminate()
|
self.decryptor_process.terminate()
|
||||||
self.decryptor_process.wait()
|
self.decryptor_process.wait()
|
||||||
@ -1608,7 +1612,7 @@ class BackupRestore(object):
|
|||||||
def _start_inner_extraction_worker(self, queue):
|
def _start_inner_extraction_worker(self, queue):
|
||||||
"""Start a worker process, extracting inner layer of bacup archive,
|
"""Start a worker process, extracting inner layer of bacup archive,
|
||||||
extract them to :py:attr:`tmpdir`.
|
extract them to :py:attr:`tmpdir`.
|
||||||
End the data by pushing "FINISHED" or "ERROR" to the queue.
|
End the data by pushing QUEUE_FINISHED or QUEUE_ERROR to the queue.
|
||||||
|
|
||||||
:param queue :py:class:`Queue` object to handle files from
|
:param queue :py:class:`Queue` object to handle files from
|
||||||
"""
|
"""
|
||||||
@ -1652,7 +1656,7 @@ class BackupRestore(object):
|
|||||||
self._verify_hmac("qubes.xml.000", "qubes.xml.000.hmac")
|
self._verify_hmac("qubes.xml.000", "qubes.xml.000.hmac")
|
||||||
queue = Queue()
|
queue = Queue()
|
||||||
queue.put("qubes.xml.000")
|
queue.put("qubes.xml.000")
|
||||||
queue.put("FINISHED")
|
queue.put(QUEUE_FINISHED)
|
||||||
|
|
||||||
extract_proc = self._start_inner_extraction_worker(queue)
|
extract_proc = self._start_inner_extraction_worker(queue)
|
||||||
extract_proc.join()
|
extract_proc.join()
|
||||||
@ -1761,11 +1765,11 @@ class BackupRestore(object):
|
|||||||
raise qubes.exc.QubesException(
|
raise qubes.exc.QubesException(
|
||||||
"Premature end of archive, the last file was %s" % filename)
|
"Premature end of archive, the last file was %s" % filename)
|
||||||
except:
|
except:
|
||||||
to_extract.put("ERROR")
|
to_extract.put(QUEUE_ERROR)
|
||||||
extract_proc.join()
|
extract_proc.join()
|
||||||
raise
|
raise
|
||||||
else:
|
else:
|
||||||
to_extract.put("FINISHED")
|
to_extract.put(QUEUE_FINISHED)
|
||||||
|
|
||||||
self.log.debug("Waiting for the extraction process to finish...")
|
self.log.debug("Waiting for the extraction process to finish...")
|
||||||
extract_proc.join()
|
extract_proc.join()
|
||||||
|
Loading…
Reference in New Issue
Block a user