From f2fa613dce03389dbab45b8dc387a5fc3b27c28d Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Marek=20Marczykowski-G=C3=B3recki?=
Date: Mon, 17 Jul 2017 23:30:37 +0200
Subject: [PATCH] backup: use 'cat' instead of read-write loop in python

The most important part is fixing resize handling: call size_func
before data_func, but only after tar has received the initial data
(and reported the output file size on stderr). Besides that, it also
makes the whole process a little faster.

QubesOS/qubes-issues#1214
---
 qubesadmin/backup/__init__.py | 210 +++++++++++++++++-----------------
 1 file changed, 105 insertions(+), 105 deletions(-)

diff --git a/qubesadmin/backup/__init__.py b/qubesadmin/backup/__init__.py
index 6feaac3..132ce1c 100644
--- a/qubesadmin/backup/__init__.py
+++ b/qubesadmin/backup/__init__.py
@@ -64,6 +64,8 @@ HMAC_MAX_SIZE = 4096
 BLKSIZE = 512
 
 _re_alphanum = re.compile(r'^[A-Za-z0-9-]*$')
+_tar_msg_re = re.compile(r".*#[0-9].*restore_pipe")
+_tar_file_size_re = re.compile(r"^[^ ]+ [^ ]+/[^ ]+ *([0-9]+) .*")
 
 class BackupCanceledError(QubesException):
     '''Exception raised when backup/restore was cancelled'''
@@ -305,8 +307,8 @@
         self.tar2_process = None
         #: current inner tar archive name
         self.tar2_current_file = None
-        #: call size_func handler for this file when tar report it on stderr
-        self.adjust_output_size = None
+        #: cat process feeding tar2_process
+        self.tar2_feeder = None
         #: decompressor subprocess.Popen instance
         self.decompressor_process = None
         #: decryptor subprocess.Popen instance
@@ -324,57 +326,6 @@
         self.tar2_stderr = []
         self.compression_filter = compression_filter
 
-    @staticmethod
-    def handle_streams(stream_in, streams_out, processes, size_limit=None,
-            progress_callback=None):
-        '''
-        Copy stream_in to all streams_out and monitor all mentioned processes.
-        If any of them terminate with non-zero code, interrupt the process. Copy
-        at most `size_limit` data (if given).
-
-        :param stream_in: file-like object to read data from
-        :param streams_out: dict of file-like objects to write data to
-        :param processes: dict of subprocess.Popen objects to monitor
-        :param size_limit: int maximum data amount to process
-        :param progress_callback: callable function to report progress, will be
-            given copied data size (it should accumulate internally)
-        :return: failed process name, failed stream name, "size_limit" or None (
-            no error)
-        '''
-        buffer_size = 409600
-        bytes_copied = 0
-        while True:
-            if size_limit:
-                to_copy = min(buffer_size, size_limit - bytes_copied)
-                if to_copy <= 0:
-                    return "size_limit"
-            else:
-                to_copy = buffer_size
-            buf = stream_in.read(to_copy)
-            if not buf:
-                # done
-                return None
-
-            if callable(progress_callback):
-                progress_callback(len(buf))
-            for name, stream in streams_out.items():
-                if stream is None:
-                    continue
-                try:
-                    stream.write(buf)
-                except IOError:
-                    return name
-            bytes_copied += len(buf)
-
-            for name, proc in processes.items():
-                if proc is None:
-                    continue
-                if isinstance(proc, Process):
-                    if not proc.is_alive() and proc.exitcode != 0:
-                        return name
-                elif proc.poll():
-                    return name
-
     def collect_tar_output(self):
         '''Retrieve tar stderr and handle it appropriately
 
@@ -398,22 +349,9 @@
         new_lines = [x.decode(self.stderr_encoding) for x in new_lines]
 
-        msg_re = re.compile(r".*#[0-9].*restore_pipe")
-        debug_msg = [msg for msg in new_lines if msg_re.match(msg)]
+        debug_msg = [msg for msg in new_lines if _tar_msg_re.match(msg)]
         self.log.debug('tar2_stderr: %s', '\n'.join(debug_msg))
-        new_lines = [msg for msg in new_lines if not msg_re.match(msg)]
-        if self.adjust_output_size:
-            # search for first file size reported by tar after setting
-            # self.adjust_output_size (so don't look at self.tar2_stderr)
-            # this is used only when extracting single-file archive, so don't
-            # bother with checking file name
-            file_size_re = re.compile(r"^[^ ]+ [^ ]+/[^ ]+ *([0-9]+) .*")
-            for line in new_lines:
-                match = file_size_re.match(line)
-                if match:
-                    file_size = match.groups()[0]
-                    self.adjust_output_size(file_size)
-                    self.adjust_output_size = None
+        new_lines = [msg for msg in new_lines if not _tar_msg_re.match(msg)]
 
         self.tar2_stderr += new_lines
 
     def run(self):
@@ -490,26 +428,98 @@
                         self.handle_dir(
                             os.path.dirname(inner_name))
                     self.tar2_current_file = None
-                    self.adjust_output_size = None
                     self.tar2_process = None
 
-    @staticmethod
-    def _data_func_wrapper(close_fds, data_func, data_stream):
-        '''Close not needed file descriptors, then call data_func(
-        data_stream).
+    def _data_import_wrapper(self, close_fds, data_func, size_func,
+            tar2_process):
+        '''Close unneeded file descriptors, handle the output size reported
+        by tar (if needed), then call data_func(tar2_process.stdout).
 
         This is to prevent holding write end of a pipe in subprocess,
         preventing EOF transfer.
         '''
         for fd in close_fds:
-            if fd == data_stream.fileno():
+            if fd in (tar2_process.stdout.fileno(),
+                    tar2_process.stderr.fileno()):
                 continue
             try:
                 os.close(fd)
             except OSError:
                 pass
-        return data_func(data_stream)
+
+        # retrieve the file size from tar's stderr; warning: we do not
+        # read data from tar's stdout at this point, so tar will hang
+        # if it tries to output file content before reporting its size
+        # on stderr first
+        if size_func:
+            # process lines on stderr until we get the file size
+            # search for the first file size reported by tar -
+            # this is used only when extracting a single-file archive, so
+            # don't bother checking the file name
+            # Also, this needs to happen before anything else reads tar's
+            # stderr, otherwise the process may deadlock waiting for the
+            # size (at this point nothing is reading tar's stdout yet,
+            # so tar would hang on write() when the output pipe fills up).
+            while True:
+                line = tar2_process.stderr.readline()
+                line = line.decode()
+                if _tar_msg_re.match(line):
+                    self.log.debug('tar2_stderr: %s', line)
+                else:
+                    match = _tar_file_size_re.match(line)
+                    if match:
+                        file_size = match.groups()[0]
+                        size_func(file_size)
+                        break
+                    else:
+                        self.log.warning(
+                            'unexpected tar output (no file size report): %s',
+                            line)
+
+        return data_func(tar2_process.stdout)
+
+    def feed_tar2(self, filename, input_pipe):
+        '''Feed data from *filename* to *input_pipe*
+
+        Start a cat process to do that (so this process is not blocked).
+        The cat subprocess instance is stored in :py:attr:`tar2_feeder`.
+        '''
+        assert self.tar2_feeder is None
+
+        self.tar2_feeder = subprocess.Popen(['cat', filename],
+            stdout=input_pipe)
+
+    def check_processes(self, processes):
+        '''Check if any process failed.
+
+        And if so, wait for other relevant processes and clean up.
+        '''
+        run_error = None
+        for name, proc in processes.items():
+            if proc is None:
+                continue
+
+            if isinstance(proc, Process):
+                if not proc.is_alive() and proc.exitcode != 0:
+                    run_error = name
+                    break
+            elif proc.poll():
+                run_error = name
+                break
+
+        if run_error:
+            if run_error == "target":
+                self.collect_tar_output()
+                details = "\n".join(self.tar2_stderr)
+            else:
+                details = "%s failed" % run_error
+            if self.decryptor_process:
+                self.decryptor_process.terminate()
+                self.decryptor_process.wait()
+                self.decryptor_process = None
+            self.log.error('Error while processing \'%s\': %s',
+                self.tar2_current_file, details)
+            self.cleanup_tar2(wait=True, terminate=True)
 
     def __run__(self):
         self.log.debug("Started sending thread")
@@ -596,20 +606,19 @@
                     stderr=subprocess.PIPE)
                 input_pipe = self.tar2_process.stdin
 
+                self.feed_tar2(filename, input_pipe)
+
                 if inner_name in self.handlers:
                     assert redirect_stdout is subprocess.PIPE
                     data_func, size_func = self.handlers[inner_name]
                     self.import_process = multiprocessing.Process(
-                        target=self._data_func_wrapper,
+                        target=self._data_import_wrapper,
                         args=([input_pipe.fileno()],
-                            data_func, self.tar2_process.stdout))
+                            data_func, size_func, self.tar2_process))
+
                     self.import_process.start()
                     self.tar2_process.stdout.close()
-                    self.adjust_output_size = size_func
 
-                fcntl.fcntl(self.tar2_process.stderr.fileno(), fcntl.F_SETFL,
-                    fcntl.fcntl(self.tar2_process.stderr.fileno(),
-                        fcntl.F_GETFL) | os.O_NONBLOCK)
                 self.tar2_stderr = []
             elif not self.tar2_process:
                 # Extracting of the current archive failed, skip to the next
@@ -628,35 +637,26 @@
                         filename, expected_filename)
                     os.remove(filename)
                     continue
 
+            self.log.debug("Releasing next chunk")
+            self.feed_tar2(filename, input_pipe)
 
             self.tar2_current_file = filename
 
-            input_file = open(filename, 'rb')
+            self.tar2_feeder.wait()
+            # check if any process failed
+            processes = {
+                'target': self.tar2_feeder,
+                'vmproc': self.vmproc,
+                'addproc': self.tar2_process,
+                'data_import': self.import_process,
+                'decryptor': self.decryptor_process,
+            }
+            self.check_processes(processes)
+            self.tar2_feeder = None
 
-            run_error = self.handle_streams(
-                input_file,
-                {'target': input_pipe},
-                {'vmproc': self.vmproc,
-                 'addproc': self.tar2_process,
-                 'data_import': self.import_process,
-                 'decryptor': self.decryptor_process,
-                },
-                progress_callback=self.progress_callback)
-            input_file.close()
-            if run_error:
-                if run_error == "target":
-                    self.collect_tar_output()
-                    details = "\n".join(self.tar2_stderr)
-                else:
-                    details = "%s failed" % run_error
-                if self.decryptor_process:
-                    self.decryptor_process.terminate()
-                    self.decryptor_process.wait()
-                    self.decryptor_process = None
-                self.log.error('Error while processing \'%s\': %s',
-                    self.tar2_current_file, details)
-                self.cleanup_tar2(wait=True, terminate=True)
+            if callable(self.progress_callback):
+                self.progress_callback(os.path.getsize(filename))
 
             # Delete the file as we don't need it anymore
             self.log.debug('Removing file %s', filename)