Parcourir la source

backup: use 'cat' instead of read-write loop in python

The most important part is fixing resize handling - call size_func
before data_func, but after tar gets initial data (and output file
size).
But other than that, it makes the process a little faster.

QubesOS/qubes-issues#1214
Marek Marczykowski-Górecki il y a 6 ans
Parent
commit
f2fa613dce
1 fichiers modifiés avec 106 ajouts et 106 suppressions
  1. 106 106
      qubesadmin/backup/__init__.py

+ 106 - 106
qubesadmin/backup/__init__.py

@@ -64,6 +64,8 @@ HMAC_MAX_SIZE = 4096
 BLKSIZE = 512
 
 _re_alphanum = re.compile(r'^[A-Za-z0-9-]*$')
+_tar_msg_re = re.compile(r".*#[0-9].*restore_pipe")
+_tar_file_size_re = re.compile(r"^[^ ]+ [^ ]+/[^ ]+ *([0-9]+) .*")
 
 class BackupCanceledError(QubesException):
     '''Exception raised when backup/restore was cancelled'''
@@ -305,8 +307,8 @@ class ExtractWorker3(Process):
         self.tar2_process = None
         #: current inner tar archive name
         self.tar2_current_file = None
-        #: call size_func handler for this file when tar report it on stderr
-        self.adjust_output_size = None
+        #: cat process feeding tar2_process
+        self.tar2_feeder = None
         #: decompressor subprocess.Popen instance
         self.decompressor_process = None
         #: decryptor subprocess.Popen instance
@@ -324,57 +326,6 @@ class ExtractWorker3(Process):
         self.tar2_stderr = []
         self.compression_filter = compression_filter
 
-    @staticmethod
-    def handle_streams(stream_in, streams_out, processes, size_limit=None,
-            progress_callback=None):
-        '''
-        Copy stream_in to all streams_out and monitor all mentioned processes.
-        If any of them terminate with non-zero code, interrupt the process. Copy
-        at most `size_limit` data (if given).
-
-        :param stream_in: file-like object to read data from
-        :param streams_out: dict of file-like objects to write data to
-        :param processes: dict of subprocess.Popen objects to monitor
-        :param size_limit: int maximum data amount to process
-        :param progress_callback: callable function to report progress, will be
-            given copied data size (it should accumulate internally)
-        :return: failed process name, failed stream name, "size_limit" or None (
-            no error)
-        '''
-        buffer_size = 409600
-        bytes_copied = 0
-        while True:
-            if size_limit:
-                to_copy = min(buffer_size, size_limit - bytes_copied)
-                if to_copy <= 0:
-                    return "size_limit"
-            else:
-                to_copy = buffer_size
-            buf = stream_in.read(to_copy)
-            if not buf:
-                # done
-                return None
-
-            if callable(progress_callback):
-                progress_callback(len(buf))
-            for name, stream in streams_out.items():
-                if stream is None:
-                    continue
-                try:
-                    stream.write(buf)
-                except IOError:
-                    return name
-            bytes_copied += len(buf)
-
-            for name, proc in processes.items():
-                if proc is None:
-                    continue
-                if isinstance(proc, Process):
-                    if not proc.is_alive() and proc.exitcode != 0:
-                        return name
-                elif proc.poll():
-                    return name
-
     def collect_tar_output(self):
         '''Retrieve tar stderr and handle it appropriately
 
@@ -398,22 +349,9 @@ class ExtractWorker3(Process):
 
         new_lines = [x.decode(self.stderr_encoding) for x in new_lines]
 
-        msg_re = re.compile(r".*#[0-9].*restore_pipe")
-        debug_msg = [msg for msg in new_lines if msg_re.match(msg)]
+        debug_msg = [msg for msg in new_lines if _tar_msg_re.match(msg)]
         self.log.debug('tar2_stderr: %s', '\n'.join(debug_msg))
-        new_lines = [msg for msg in new_lines if not msg_re.match(msg)]
-        if self.adjust_output_size:
-            # search for first file size reported by tar after setting
-            # self.adjust_output_size (so don't look at self.tar2_stderr)
-            # this is used only when extracting single-file archive, so don't
-            #  bother with checking file name
-            file_size_re = re.compile(r"^[^ ]+ [^ ]+/[^ ]+ *([0-9]+) .*")
-            for line in new_lines:
-                match = file_size_re.match(line)
-                if match:
-                    file_size = match.groups()[0]
-                    self.adjust_output_size(file_size)
-                    self.adjust_output_size = None
+        new_lines = [msg for msg in new_lines if not _tar_msg_re.match(msg)]
         self.tar2_stderr += new_lines
 
     def run(self):
@@ -490,26 +428,98 @@ class ExtractWorker3(Process):
                 self.handle_dir(
                     os.path.dirname(inner_name))
             self.tar2_current_file = None
-            self.adjust_output_size = None
         self.tar2_process = None
 
-    @staticmethod
-    def _data_func_wrapper(close_fds, data_func, data_stream):
-        '''Close not needed file descriptors, then call data_func(
-        data_stream).
+    def _data_import_wrapper(self, close_fds, data_func, size_func,
+            tar2_process):
+        '''Close not needed file descriptors, handle output size reported
+        by tar (if needed) then call data_func(tar2_process.stdout).
 
         This is to prevent holding write end of a pipe in subprocess,
         preventing EOF transfer.
         '''
         for fd in close_fds:
-            if fd == data_stream.fileno():
+            if fd in (tar2_process.stdout.fileno(),
+                    tar2_process.stderr.fileno()):
                 continue
             try:
                 os.close(fd)
             except OSError:
                 pass
-        return data_func(data_stream)
 
+        # retrieve file size from tar's stderr; warning: we do
+        # not read data from tar's stdout at this point, it will
+        # hang if it tries to output file content before
+        # reporting its size on stderr first
+        if size_func:
+            # process lines on stderr until we get file size
+            # search for first file size reported by tar -
+            # this is used only when extracting single-file archive, so don't
+            #  bother with checking file name
+            # Also, this needs to be called before anything is retrieved
+            # from tar stderr, otherwise the process may deadlock waiting for
+            # size (at this point nothing is retrieving data from tar stdout
+            # yet, so it will hang on write() when the output pipe fill up).
+            while True:
+                line = tar2_process.stderr.readline()
+                line = line.decode()
+                if _tar_msg_re.match(line):
+                    self.log.debug('tar2_stderr: %s', line)
+                else:
+                    match = _tar_file_size_re.match(line)
+                    if match:
+                        file_size = match.groups()[0]
+                        size_func(file_size)
+                        break
+                    else:
+                        self.log.warning(
+                            'unexpected tar output (no file size report): %s',
+                            line)
+
+        return data_func(tar2_process.stdout)
+
+    def feed_tar2(self, filename, input_pipe):
+        '''Feed data from *filename* to *input_pipe*
+
+        Start a cat process to do that (do not block this process). Cat
+        subprocess instance will be in :py:attr:`tar2_feeder`
+        '''
+        assert self.tar2_feeder is None
+
+        self.tar2_feeder = subprocess.Popen(['cat', filename],
+            stdout=input_pipe)
+
+    def check_processes(self, processes):
+        '''Check if any process failed.
+
+        And if so, wait for other relevant processes to cleanup.
+        '''
+        run_error = None
+        for name, proc in processes.items():
+            if proc is None:
+                continue
+
+            if isinstance(proc, Process):
+                if not proc.is_alive() and proc.exitcode != 0:
+                    run_error = name
+                    break
+            elif proc.poll():
+                run_error = name
+                break
+
+        if run_error:
+            if run_error == "target":
+                self.collect_tar_output()
+                details = "\n".join(self.tar2_stderr)
+            else:
+                details = "%s failed" % run_error
+            if self.decryptor_process:
+                self.decryptor_process.terminate()
+                self.decryptor_process.wait()
+                self.decryptor_process = None
+            self.log.error('Error while processing \'%s\': %s',
+                self.tar2_current_file, details)
+            self.cleanup_tar2(wait=True, terminate=True)
 
     def __run__(self):
         self.log.debug("Started sending thread")
@@ -596,20 +606,19 @@ class ExtractWorker3(Process):
                         stderr=subprocess.PIPE)
                     input_pipe = self.tar2_process.stdin
 
+                self.feed_tar2(filename, input_pipe)
+
                 if inner_name in self.handlers:
                     assert redirect_stdout is subprocess.PIPE
                     data_func, size_func = self.handlers[inner_name]
                     self.import_process = multiprocessing.Process(
-                        target=self._data_func_wrapper,
+                        target=self._data_import_wrapper,
                         args=([input_pipe.fileno()],
-                        data_func, self.tar2_process.stdout))
+                        data_func, size_func, self.tar2_process))
+
                     self.import_process.start()
                     self.tar2_process.stdout.close()
-                    self.adjust_output_size = size_func
 
-                fcntl.fcntl(self.tar2_process.stderr.fileno(), fcntl.F_SETFL,
-                            fcntl.fcntl(self.tar2_process.stderr.fileno(),
-                                        fcntl.F_GETFL) | os.O_NONBLOCK)
                 self.tar2_stderr = []
             elif not self.tar2_process:
                 # Extracting of the current archive failed, skip to the next
@@ -628,35 +637,26 @@ class ExtractWorker3(Process):
                             filename, expected_filename)
                     os.remove(filename)
                     continue
+
                 self.log.debug("Releasing next chunck")
+                self.feed_tar2(filename, input_pipe)
 
             self.tar2_current_file = filename
 
-            input_file = open(filename, 'rb')
-
-            run_error = self.handle_streams(
-                input_file,
-                {'target': input_pipe},
-                {'vmproc': self.vmproc,
-                 'addproc': self.tar2_process,
-                 'data_import': self.import_process,
-                 'decryptor': self.decryptor_process,
-                },
-                progress_callback=self.progress_callback)
-            input_file.close()
-            if run_error:
-                if run_error == "target":
-                    self.collect_tar_output()
-                    details = "\n".join(self.tar2_stderr)
-                else:
-                    details = "%s failed" % run_error
-                if self.decryptor_process:
-                    self.decryptor_process.terminate()
-                    self.decryptor_process.wait()
-                    self.decryptor_process = None
-                self.log.error('Error while processing \'%s\': %s',
-                    self.tar2_current_file, details)
-                self.cleanup_tar2(wait=True, terminate=True)
+            self.tar2_feeder.wait()
+            # check if any process failed
+            processes = {
+                'target': self.tar2_feeder,
+                'vmproc': self.vmproc,
+                'addproc': self.tar2_process,
+                'data_import': self.import_process,
+                'decryptor': self.decryptor_process,
+            }
+            self.check_processes(processes)
+            self.tar2_feeder = None
+
+            if callable(self.progress_callback):
+                self.progress_callback(os.path.getsize(filename))
 
             # Delete the file as we don't need it anymore
             self.log.debug('Removing file %s', filename)