backup: convert from multiprocessing to asyncio

QubesOS/qubes-issues#2931
Marek Marczykowski-Górecki 2017-07-20 02:48:44 +02:00
parent c17e63588e
commit d4e9120903
No known key found for this signature in database
GPG Key ID: 063938BA42CFA724


@@ -1,7 +1,7 @@
#
# The Qubes OS Project, http://www.qubes-os.org
#
# Copyright (C) 2013-2015 Marek Marczykowski-Górecki
# Copyright (C) 2013-2017 Marek Marczykowski-Górecki
# <marmarek@invisiblethingslab.com>
# Copyright (C) 2013 Olivier Médoc <o_medoc@yahoo.fr>
#
@@ -25,6 +25,8 @@ import logging
import functools
import termios
import asyncio
from qubes.utils import size_to_human
import stat
import os
@@ -37,7 +39,6 @@ import time
import grp
import pwd
import datetime
from multiprocessing import Queue, Process
import qubes
import qubes.core2migration
import qubes.storage
@@ -182,7 +183,8 @@ class BackupHeader(object):
f_header.write("{!s}={!s}\n".format(key, getattr(self, attr)))
class SendWorker(Process):
class SendWorker(object):
# pylint: disable=too-few-public-methods
def __init__(self, queue, base_dir, backup_stdout):
super(SendWorker, self).__init__()
self.queue = queue
@@ -190,13 +192,12 @@ class SendWorker(Process):
self.backup_stdout = backup_stdout
self.log = logging.getLogger('qubes.backup')
@asyncio.coroutine
def run(self):
self.log.debug("Started sending thread")
self.log.debug("Moving to temporary dir %s", self.base_dir)
os.chdir(self.base_dir)
for filename in iter(self.queue.get, None):
while True:
filename = yield from self.queue.get()
if filename in (QUEUE_FINISHED, QUEUE_ERROR):
break
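
The core of the conversion: SendWorker is no longer a multiprocessing.Process; its run() is a coroutine that drains an asyncio.Queue until it sees a sentinel. A minimal, standalone sketch of that consumer pattern, in the pre-3.5 coroutine style this patch uses (names are illustrative, not taken from the patch):

import asyncio

QUEUE_FINISHED = 'FINISHED'   # sentinel; the patch uses module-level constants

@asyncio.coroutine
def sender(queue):
    # drain the queue until the producer posts the sentinel
    while True:
        filename = yield from queue.get()
        if filename == QUEUE_FINISHED:
            break
        print('sending', filename)

@asyncio.coroutine
def producer(queue):
    for name in ('a.000.enc', 'a.001.enc'):
        yield from queue.put(name)
    yield from queue.put(QUEUE_FINISHED)

loop = asyncio.get_event_loop()
queue = asyncio.Queue(10)
loop.run_until_complete(asyncio.gather(producer(queue), sender(queue)))
loop.close()
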
@@ -206,14 +207,11 @@ class SendWorker(Process):
# verified before untarring.
tar_final_cmd = ["tar", "-cO", "--posix",
"-C", self.base_dir, filename]
final_proc = subprocess.Popen(tar_final_cmd,
stdin=subprocess.PIPE,
stdout=self.backup_stdout)
if final_proc.wait() >= 2:
if self.queue.full():
# if queue is already full, remove some entry to wake up
# main thread, so it will be able to notice error
self.queue.get()
final_proc = yield from asyncio.create_subprocess_exec(
*tar_final_cmd,
stdout=self.backup_stdout)
retcode = yield from final_proc.wait()
if retcode >= 2:
# handle only exit code 2 (tar fatal error) or
# greater (call failed?)
raise qubes.exc.QubesException(
@@ -222,11 +220,11 @@ class SendWorker(Process):
# Delete the file as we don't need it anymore
self.log.debug("Removing file {}".format(filename))
os.remove(filename)
os.remove(os.path.join(self.base_dir, filename))
self.log.debug("Finished sending thread")
@asyncio.coroutine
def launch_proc_with_pty(args, stdin=None, stdout=None, stderr=None, echo=True):
"""Similar to pty.fork, but handle stdin/stdout according to parameters
instead of connecting to the pty
@@ -244,12 +242,16 @@ def launch_proc_with_pty(args, stdin=None, stdout=None, stderr=None, echo=True):
termios_p[3] &= ~termios.ECHO
termios.tcsetattr(ctty_fd, termios.TCSANOW, termios_p)
(pty_master, pty_slave) = os.openpty()
p = subprocess.Popen(args, stdin=stdin, stdout=stdout, stderr=stderr,
p = yield from asyncio.create_subprocess_exec(*args,
stdin=stdin,
stdout=stdout,
stderr=stderr,
preexec_fn=lambda: set_ctty(pty_slave, pty_master))
os.close(pty_slave)
return p, os.fdopen(pty_master, 'wb+', buffering=0)
return p, open(pty_master, 'wb+', buffering=0)
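
Note that asyncio.create_subprocess_exec() forwards preexec_fn (and other keyword arguments) to the underlying subprocess.Popen, which is what lets the pty setup survive the conversion. A simplified sketch of the same idea, using os.setsid in place of the patch's set_ctty() helper (which additionally makes the pty slave the controlling terminal):

import asyncio, os

@asyncio.coroutine
def spawn_on_pty(args):
    pty_master, pty_slave = os.openpty()
    proc = yield from asyncio.create_subprocess_exec(
        *args,
        stdin=pty_slave, stdout=pty_slave, stderr=pty_slave,
        preexec_fn=os.setsid)   # forwarded to subprocess.Popen
    os.close(pty_slave)         # the child keeps its own copy
    return proc, open(pty_master, 'wb+', buffering=0)
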
@asyncio.coroutine
def launch_scrypt(action, input_name, output_name, passphrase):
'''
Launch 'scrypt' process, pass passphrase to it and return
@@ -262,7 +264,7 @@ def launch_scrypt(action, input_name, output_name, passphrase):
:return: asyncio.subprocess.Process object
'''
command_line = ['scrypt', action, input_name, output_name]
(p, pty) = launch_proc_with_pty(command_line,
(p, pty) = yield from launch_proc_with_pty(command_line,
stdin=subprocess.PIPE if input_name == '-' else None,
stdout=subprocess.PIPE if output_name == '-' else None,
stderr=subprocess.PIPE,
@@ -272,7 +274,7 @@ def launch_scrypt(action, input_name, output_name, passphrase):
else:
prompts = (b'Please enter passphrase: ',)
for prompt in prompts:
actual_prompt = p.stderr.read(len(prompt))
actual_prompt = yield from p.stderr.read(len(prompt))
if actual_prompt != prompt:
raise qubes.exc.QubesException(
'Unexpected prompt from scrypt: {}'.format(actual_prompt))
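
One subtlety of the converted prompt handling: StreamReader.read(n) may legally return fewer than n bytes, so the comparison above relies on the whole prompt arriving in a single read. A stricter variant would use readexactly(), sketched here (expect_prompt is illustrative, not part of the patch):

import asyncio

@asyncio.coroutine
def expect_prompt(proc, prompt):
    # readexactly() raises asyncio.IncompleteReadError on a short read
    data = yield from proc.stderr.readexactly(len(prompt))
    if data != prompt:
        raise ValueError('unexpected prompt: {!r}'.format(data))
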
@@ -301,7 +303,7 @@ class Backup(object):
>>> }
>>> backup_op = Backup(app, vms, exclude_vms, **options)
>>> print(backup_op.get_backup_summary())
>>> backup_op.backup_do()
>>> asyncio.get_event_loop().run_until_complete(backup_op.backup_do())
See attributes of this object for all available options.
@@ -393,11 +395,6 @@ class Backup(object):
else:
raise AttributeError(key)
#: whether backup was canceled
self.canceled = False
#: list of PIDs to kill on backup cancel
self.processes_to_kill_on_cancel = []
self.log = logging.getLogger('qubes.backup')
if exclude_list is None:
@@ -416,17 +413,6 @@ class Backup(object):
if self.tmpdir and os.path.exists(self.tmpdir):
shutil.rmtree(self.tmpdir)
def cancel(self):
"""Cancel running backup operation. Can be called from another thread.
"""
self.canceled = True
for proc in self.processes_to_kill_on_cancel:
try:
proc.terminate()
except OSError:
pass
def get_files_to_backup(self):
files_to_backup = {}
for vm in self.vms_for_backup:
@@ -554,7 +540,8 @@ class Backup(object):
return summary
def prepare_backup_header(self):
@asyncio.coroutine
def _prepare_backup_header(self):
header_file_path = os.path.join(self.tmpdir, HEADER_FILENAME)
backup_header = BackupHeader(
version=CURRENT_BACKUP_FORMAT_VERSION,
@@ -569,30 +556,21 @@ class Backup(object):
# protection
scrypt_passphrase = u'{filename}!{passphrase}'.format(
filename=HEADER_FILENAME, passphrase=self.passphrase)
scrypt = launch_scrypt(
scrypt = yield from launch_scrypt(
'enc', header_file_path, header_file_path + '.hmac',
scrypt_passphrase)
if scrypt.wait() != 0:
retcode = yield from scrypt.wait()
if retcode:
raise qubes.exc.QubesException(
"Failed to compute hmac of header file: "
+ scrypt.stderr.read())
return HEADER_FILENAME, HEADER_FILENAME + ".hmac"
@staticmethod
def _queue_put_with_check(proc, vmproc, queue, element):
if queue.full():
if not proc.is_alive():
if vmproc:
message = ("Failed to write the backup, VM output:\n" +
vmproc.stderr.read())
else:
message = "Failed to write the backup. Out of disk space?"
raise qubes.exc.QubesException(message)
queue.put(element)
def _send_progress_update(self):
if not self.total_backup_bytes:
return
if callable(self.progress_callback):
progress = (
100 * (self._done_vms_bytes + self._current_vm_bytes) /
@@ -604,6 +582,168 @@ class Backup(object):
self._current_vm_bytes += bytes_done
self._send_progress_update()
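
The progress contract here (and in handle_streams() below) is that the callback receives per-read byte counts and accumulates them itself. An illustrative accumulating callback (not part of the patch):

class ProgressCounter:
    def __init__(self, total_bytes):
        self.total_bytes = total_bytes
        self.done_bytes = 0
    def __call__(self, nbytes):
        # receives the size of each copied buffer, keeps the running total
        self.done_bytes += nbytes
        print('{:.1f}%'.format(100 * self.done_bytes / self.total_bytes))
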
@asyncio.coroutine
def _split_and_send(self, input_stream, file_basename,
output_queue):
'''Split *input_stream* into parts of max *chunk_size* bytes and send
to *output_queue*.
:param input_stream: stream (asyncio reader stream) of data to split
:param file_basename: basename (i.e. without part number and '.enc')
of output files
:param output_queue: asyncio.Queue instance to put produced files to
- queue will get only filenames of written chunks
'''
# Wait for compressor (tar) process to finish or for any
# error of other subprocesses
i = 0
run_error = "size_limit"
scrypt = None
while run_error == "size_limit":
# Prepare a first chunk
chunkfile = file_basename + ".%03d.enc" % i
i += 1
# Start encrypt, scrypt will also handle integrity
# protection
scrypt_passphrase = \
u'{backup_id}!{filename}!{passphrase}'.format(
backup_id=self.backup_id,
filename=os.path.relpath(chunkfile[:-4],
self.tmpdir),
passphrase=self.passphrase)
try:
scrypt = yield from launch_scrypt(
"enc", "-", chunkfile, scrypt_passphrase)
run_error = yield from handle_streams(
input_stream,
scrypt.stdin,
self.chunk_size,
self._add_vm_progress
)
self.log.debug(
"handle_streams returned: {}".format(run_error))
except:
scrypt.terminate()
raise
scrypt.stdin.close()
yield from scrypt.wait()
self.log.debug("scrypt return code: {}".format(
scrypt.returncode))
# Send the chunk to the backup target
yield from output_queue.put(
os.path.relpath(chunkfile, self.tmpdir))
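
The loop above keeps starting a fresh scrypt instance for a new chunk as long as handle_streams() reports "size_limit". Stripped of the encryption and error handling, the splitting logic has this shape (split_stream and open_part are hypothetical):

import asyncio

@asyncio.coroutine
def split_stream(reader, chunk_size, open_part):
    # open_part(n) returns a writable file object for chunk n
    part = 0
    while True:
        out = open_part(part)
        copied = 0
        while copied < chunk_size:
            buf = yield from reader.read(min(65536, chunk_size - copied))
            if not buf:
                out.close()
                return part + 1   # total number of chunks written
            out.write(buf)
            copied += len(buf)
        out.close()
        part += 1
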
@asyncio.coroutine
def _wrap_and_send_files(self, files_to_backup, output_queue):
for vm_info in files_to_backup:
for file_info in vm_info.files:
self.log.debug("Backing up {}".format(file_info))
backup_tempfile = os.path.join(
self.tmpdir, file_info.subdir,
file_info.name)
self.log.debug("Using temporary location: {}".format(
backup_tempfile))
# Ensure the temporary directory exists
if not os.path.isdir(os.path.dirname(backup_tempfile)):
os.makedirs(os.path.dirname(backup_tempfile))
# The first tar cmd can use any complex features we want.
# Files will be verified before untarring.
# Prefix the path in archive with filename["subdir"] to have it
# verified during untar
tar_cmdline = (["tar", "-Pc", '--sparse',
'-C', os.path.dirname(file_info.path)] +
(['--dereference'] if
file_info.subdir != "dom0-home/" else []) +
['--xform=s:^%s:%s\\0:' % (
os.path.basename(file_info.path),
file_info.subdir),
os.path.basename(file_info.path)
])
file_stat = os.stat(file_info.path)
if stat.S_ISBLK(file_stat.st_mode) or \
file_info.name != os.path.basename(file_info.path):
# tar doesn't handle content of block device, use our
# writer
# also use our tar writer when renaming file
assert not stat.S_ISDIR(file_stat.st_mode), \
"Renaming directories not supported"
tar_cmdline = ['python3', '-m', 'qubes.tarwriter',
'--override-name=%s' % (
os.path.join(file_info.subdir, os.path.basename(
file_info.name))),
file_info.path]
if self.compressed:
tar_cmdline.insert(-2,
"--use-compress-program=%s" % self.compression_filter)
self.log.debug(" ".join(tar_cmdline))
# Pipe: tar-sparse | scrypt | tar | backup_target
# TODO: log handle stderr
tar_sparse = yield from asyncio.create_subprocess_exec(
*tar_cmdline, stdout=subprocess.PIPE)
try:
yield from self._split_and_send(
tar_sparse.stdout,
backup_tempfile,
output_queue)
except:
try:
tar_sparse.terminate()
except ProcessLookupError:
pass
raise
# This VM done, update progress
self._done_vms_bytes += vm_info.size
self._current_vm_bytes = 0
self._send_progress_update()
# Save date of last backup
if vm_info.vm:
vm_info.vm.backup_timestamp = datetime.datetime.now()
yield from output_queue.put(QUEUE_FINISHED)
@staticmethod
@asyncio.coroutine
def _monitor_process(proc, error_message):
try:
yield from proc.wait()
except:
proc.terminate()
raise
if proc.returncode:
raise qubes.exc.QubesException(error_message)
@staticmethod
@asyncio.coroutine
def _cancel_on_error(future, previous_task):
'''If a later element of the chain fails, cancel the earlier one to
avoid a deadlock.
When an earlier element of the chain fails, it will be handled by
:py:meth:`backup_do`.
The chain is:
:py:meth:`_wrap_and_send_files` -> :py:class:`SendWorker` -> vmproc
'''
try:
yield from future
except: # pylint: disable=bare-except
previous_task.cancel()
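
This watcher replaces the old _queue_put_with_check() polling: instead of checking before every put whether the consumer process is still alive, a task awaits the downstream stage and cancels the upstream one on failure, so the producer can never block forever on a full queue. A self-contained sketch of the wiring (produce/consume stand in for _wrap_and_send_files and SendWorker.run):

import asyncio

@asyncio.coroutine
def produce(queue):                 # stands in for _wrap_and_send_files
    yield from queue.put('chunk')
    yield from queue.put(None)      # sentinel

@asyncio.coroutine
def consume(queue):                 # stands in for SendWorker.run
    while (yield from queue.get()) is not None:
        pass

@asyncio.coroutine
def cancel_on_error(future, previous_task):
    try:
        yield from future
    except:                         # bare: also catches CancelledError
        previous_task.cancel()

loop = asyncio.get_event_loop()
queue = asyncio.Queue(1)
producer = asyncio.ensure_future(produce(queue))
consumer = asyncio.ensure_future(consume(queue))
loop.run_until_complete(asyncio.gather(
    producer, cancel_on_error(consumer, producer)))
loop.close()
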
@asyncio.coroutine
def backup_do(self):
# pylint: disable=too-many-statements
if self.passphrase is None:
@@ -613,10 +753,12 @@ class Backup(object):
shutil.copy(qubes_xml, os.path.join(self.tmpdir, 'qubes.xml'))
qubes_xml = os.path.join(self.tmpdir, 'qubes.xml')
backup_app = qubes.Qubes(qubes_xml)
backup_app.events_enabled = False
files_to_backup = self._files_to_backup
# make sure backup_content isn't set initially
for vm in backup_app.domains:
vm.events_enabled = False
vm.features['backup-content'] = False
for qid, vm_info in files_to_backup.items():
@@ -629,17 +771,16 @@ class Backup(object):
backup_app.save()
vmproc = None
tar_sparse = None
if self.target_vm is not None:
# Prepare the backup target (Qubes service call)
# If APPVM, STDOUT is a PIPE
vmproc = self.target_vm.run_service('qubes.Backup',
passio_popen=True, passio_stderr=True)
vmproc.stdin.write((self.target_dir.
read_fd, write_fd = os.pipe()
vmproc = yield from self.target_vm.run_service('qubes.Backup',
stdin=read_fd, stderr=subprocess.PIPE)
os.close(read_fd)
os.write(write_fd, (self.target_dir.
replace("\r", "").replace("\n", "") + "\n").encode())
vmproc.stdin.flush()
backup_stdout = vmproc.stdin
self.processes_to_kill_on_cancel.append(vmproc)
backup_stdout = write_fd
else:
# Prepare the backup target (local file)
if os.path.isdir(self.target_dir):
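
For the VM target, the patch swaps the old passio_popen handling for an explicit os.pipe(): the child inherits the read end as its stdin, and the parent keeps the raw write end (closed later in the finally block). The shape of that setup, with create_subprocess_exec standing in for the Qubes-specific run_service() call:

import asyncio, os, subprocess

@asyncio.coroutine
def start_backup_target(argv, target_dir):
    read_fd, write_fd = os.pipe()
    proc = yield from asyncio.create_subprocess_exec(
        *argv, stdin=read_fd, stderr=subprocess.PIPE)
    os.close(read_fd)   # the child holds its own duplicate
    os.write(write_fd, (target_dir.replace('\r', '').replace('\n', '')
                        + '\n').encode())
    return proc, write_fd   # caller must os.close(write_fd) when done
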
@@ -662,202 +803,80 @@ class Backup(object):
# For this reason, we will use named pipes instead
self.log.debug("Working in {}".format(self.tmpdir))
backup_pipe = os.path.join(self.tmpdir, "backup_pipe")
self.log.debug("Creating pipe in: {}".format(backup_pipe))
os.mkfifo(backup_pipe)
self.log.debug("Will backup: {}".format(files_to_backup))
header_files = self.prepare_backup_header()
header_files = yield from self._prepare_backup_header()
# Setup worker to send encrypted data chunks to the backup_target
to_send = Queue(10)
to_send = asyncio.Queue(10)
send_proc = SendWorker(to_send, self.tmpdir, backup_stdout)
send_proc.start()
send_task = asyncio.ensure_future(send_proc.run())
vmproc_task = None
if vmproc is not None:
vmproc_task = asyncio.ensure_future(self._cancel_on_error(
self._monitor_process(vmproc,
'Writing backup to VM {} failed'.format(
self.target_vm.name)),
send_task))
for file_name in header_files:
to_send.put(file_name)
yield from to_send.put(file_name)
qubes_xml_info = self.VMToBackup(
None,
[self.FileToBackup(qubes_xml, '')],
''
)
for vm_info in itertools.chain([qubes_xml_info],
files_to_backup.values()):
for file_info in vm_info.files:
inner_archive_task = asyncio.ensure_future(
self._wrap_and_send_files(
itertools.chain([qubes_xml_info], files_to_backup.values()),
to_send
))
asyncio.ensure_future(
self._cancel_on_error(send_task, inner_archive_task))
self.log.debug("Backing up {}".format(file_info))
try:
try:
yield from inner_archive_task
except:
yield from to_send.put(QUEUE_ERROR)
# in fact we may be handling CancelledError, induced by
# exception in send_task (and propagated by
# self._cancel_on_error call above); in such a case this
# yield from will raise exception, covering CancelledError -
# this is intended behaviour
yield from send_task
raise
backup_tempfile = os.path.join(
self.tmpdir, file_info.subdir,
file_info.name)
self.log.debug("Using temporary location: {}".format(
backup_tempfile))
yield from send_task
# Ensure the temporary directory exists
if not os.path.isdir(os.path.dirname(backup_tempfile)):
os.makedirs(os.path.dirname(backup_tempfile))
# The first tar cmd can use any complex features we want.
# Files will be verified before untarring.
# Prefix the path in archive with filename["subdir"] to have it
# verified during untar
tar_cmdline = (["tar", "-Pc", '--sparse',
"-f", backup_pipe,
'-C', os.path.dirname(file_info.path)] +
(['--dereference'] if
file_info.subdir != "dom0-home/" else []) +
['--xform=s:^%s:%s\\0:' % (
os.path.basename(file_info.path),
file_info.subdir),
os.path.basename(file_info.path)
])
file_stat = os.stat(file_info.path)
if stat.S_ISBLK(file_stat.st_mode) or \
file_info.name != os.path.basename(file_info.path):
# tar doesn't handle content of block device, use our
# writer
# also use our tar writer when renaming file
assert not stat.S_ISDIR(file_stat.st_mode),\
"Renaming directories not supported"
tar_cmdline = ['python3', '-m', 'qubes.tarwriter',
'--override-name=%s' % (
os.path.join(file_info.subdir, os.path.basename(
file_info.name))),
file_info.path,
backup_pipe]
if self.compressed:
tar_cmdline.insert(-2,
"--use-compress-program=%s" % self.compression_filter)
self.log.debug(" ".join(tar_cmdline))
# Pipe: tar-sparse | scrypt | tar | backup_target
# TODO: log handle stderr
tar_sparse = subprocess.Popen(
tar_cmdline)
self.processes_to_kill_on_cancel.append(tar_sparse)
# Wait for compressor (tar) process to finish or for any
# error of other subprocesses
i = 0
pipe = open(backup_pipe, 'rb')
run_error = "paused"
while run_error == "paused":
# Prepare a first chunk
chunkfile = backup_tempfile + ".%03d.enc" % i
i += 1
# Start encrypt, scrypt will also handle integrity
# protection
scrypt_passphrase = \
u'{backup_id}!{filename}!{passphrase}'.format(
backup_id=self.backup_id,
filename=os.path.relpath(chunkfile[:-4],
self.tmpdir),
passphrase=self.passphrase)
scrypt = launch_scrypt(
"enc", "-", chunkfile, scrypt_passphrase)
run_error = handle_streams(
pipe,
{'backup_target': scrypt.stdin},
{'vmproc': vmproc,
'addproc': tar_sparse,
'scrypt': scrypt,
},
self.chunk_size,
self._add_vm_progress
)
self.log.debug(
"Wait_backup_feedback returned: {}".format(run_error))
if self.canceled:
try:
tar_sparse.terminate()
except OSError:
pass
tar_sparse.wait()
to_send.put(QUEUE_ERROR)
send_proc.join()
shutil.rmtree(self.tmpdir)
raise BackupCanceledError("Backup canceled")
if run_error and run_error != "size_limit":
send_proc.terminate()
if run_error == "VM" and vmproc:
raise qubes.exc.QubesException(
"Failed to write the backup, VM output:\n" +
vmproc.stderr.read(MAX_STDERR_BYTES))
else:
raise qubes.exc.QubesException(
"Failed to perform backup: error in " +
run_error)
scrypt.stdin.close()
scrypt.wait()
self.log.debug("scrypt return code: {}".format(
scrypt.poll()))
# Send the chunk to the backup target
self._queue_put_with_check(
send_proc, vmproc, to_send,
os.path.relpath(chunkfile, self.tmpdir))
if tar_sparse.poll() is None or run_error == "size_limit":
run_error = "paused"
else:
self.processes_to_kill_on_cancel.remove(tar_sparse)
self.log.debug(
"Finished tar sparse with exit code {}".format(
tar_sparse.poll()))
pipe.close()
# This VM done, update progress
self._done_vms_bytes += vm_info.size
self._current_vm_bytes = 0
self._send_progress_update()
# Save date of last backup
if vm_info.vm:
vm_info.vm.backup_timestamp = datetime.datetime.now()
self._queue_put_with_check(send_proc, vmproc, to_send, QUEUE_FINISHED)
send_proc.join()
shutil.rmtree(self.tmpdir)
if self.canceled:
raise BackupCanceledError("Backup canceled")
if send_proc.exitcode != 0:
raise qubes.exc.QubesException(
"Failed to send backup: error in the sending process")
if vmproc:
self.log.debug("VMProc1 proc return code: {}".format(vmproc.poll()))
if tar_sparse is not None:
self.log.debug("Sparse1 proc return code: {}".format(
tar_sparse.poll()))
vmproc.stdin.close()
finally:
if isinstance(backup_stdout, int):
os.close(backup_stdout)
else:
backup_stdout.close()
if vmproc_task:
yield from vmproc_task
shutil.rmtree(self.tmpdir)
self.app.save()
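
The error propagation in the try/except above deserves a closer look: if the producer fails, QUEUE_ERROR is posted so the sender wakes up and stops, the sender is awaited, and only then is the original exception re-raised. Condensed to its shape (drive is illustrative, not part of the patch):

import asyncio

QUEUE_ERROR = 'ERROR'   # sentinel, mirroring the module-level constant

@asyncio.coroutine
def drive(producer_task, send_task, queue):
    try:
        yield from producer_task
    except:
        # wake the sender so it can exit, wait for it, then re-raise
        yield from queue.put(QUEUE_ERROR)
        yield from send_task
        raise
    yield from send_task
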
def handle_streams(stream_in, streams_out, processes, size_limit=None,
@asyncio.coroutine
def handle_streams(stream_in, stream_out, size_limit=None,
progress_callback=None):
'''
Copy stream_in to stream_out. Copy
at most `size_limit` bytes (if given).
:param stream_in: file-like object to read data from
:param streams_out: dict of file-like objects to write data to
:param processes: dict of subprocess.Popen objects to monitor
:param stream_in: StreamReader object to read data from
:param stream_out: StreamWriter object to write data to
:param size_limit: int maximum data amount to process
:param progress_callback: callable function to report progress, will be
given copied data size (it should accumulate internally)
:return: failed process name, failed stream name, "size_limit" or None (
no error)
:return: "size_limit" or None (no error)
'''
buffer_size = 409600
bytes_copied = 0
@@ -868,42 +887,14 @@ def handle_streams(stream_in, streams_out, processes, size_limit=None,
return "size_limit"
else:
to_copy = buffer_size
buf = stream_in.read(to_copy)
buf = yield from stream_in.read(to_copy)
if not buf:
# done
return None
if callable(progress_callback):
progress_callback(len(buf))
for name, stream in streams_out.items():
if stream is None:
continue
try:
stream.write(buf)
except IOError:
return name
stream_out.write(buf)
bytes_copied += len(buf)
for name, proc in processes.items():
if proc is None:
continue
if proc.poll():
return name
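
The rewritten copy loop no longer polls a process table; subprocess failures surface through the watcher tasks instead. A minimal rendition of the loop (copy_limited is illustrative; a production version would also 'yield from writer.drain()' after each write for flow control):

import asyncio

@asyncio.coroutine
def copy_limited(reader, writer, size_limit=None, buffer_size=409600):
    copied = 0
    while True:
        to_copy = buffer_size
        if size_limit is not None:
            if copied >= size_limit:
                return 'size_limit'
            to_copy = min(buffer_size, size_limit - copied)
        buf = yield from reader.read(to_copy)
        if not buf:
            return None   # clean EOF
        writer.write(buf)
        copied += len(buf)
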
def get_supported_hmac_algo(hmac_algorithm=None):
# Start with provided default
if hmac_algorithm:
yield hmac_algorithm
if hmac_algorithm != 'scrypt':
yield 'scrypt'
proc = subprocess.Popen(['openssl', 'list-message-digest-algorithms'],
stdout=subprocess.PIPE)
for algo in proc.stdout.readlines():
algo = algo.decode('ascii')
if '=>' in algo:
continue
yield algo.strip()
proc.wait()
# vim:sw=4:et: