874 行
		
	
	
		
			32 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			874 行
		
	
	
		
			32 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
#
# The Qubes OS Project, http://www.qubes-os.org
#
# Copyright (C) 2013-2017  Marek Marczykowski-Górecki
#                                   <marmarek@invisiblethingslab.com>
# Copyright (C) 2013  Olivier Médoc <o_medoc@yahoo.fr>
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, see <https://www.gnu.org/licenses/>.
#
#
 | 
						|
from __future__ import unicode_literals
 | 
						|
 | 
						|
import asyncio
 | 
						|
import datetime
 | 
						|
import fcntl
 | 
						|
import functools
 | 
						|
import grp
 | 
						|
import itertools
 | 
						|
import logging
 | 
						|
import os
 | 
						|
import pathlib
 | 
						|
import pwd
 | 
						|
import re
 | 
						|
import shutil
 | 
						|
import stat
 | 
						|
import string
 | 
						|
import subprocess
 | 
						|
import tempfile
 | 
						|
import termios
 | 
						|
import time
 | 
						|
 | 
						|
from .utils import size_to_human
 | 
						|
import qubes
 | 
						|
import qubes.core2migration
 | 
						|
import qubes.storage
 | 
						|
import qubes.storage.file
 | 
						|
import qubes.vm.templatevm
 | 
						|
 | 
						|
# Sentinel values passed through the output queue; the sending worker stops
# as soon as it receives either of them instead of a file name.
QUEUE_ERROR = 'ERROR'

QUEUE_FINISHED = 'FINISHED'

# Name of the backup header file (see BackupHeader)
HEADER_FILENAME = "backup-header"
DEFAULT_CRYPTO_ALGORITHM = "aes-256-cbc"
# 'scrypt' is not, strictly speaking, an HMAC algorithm - it is the tool used
# here to integrity-protect the data
DEFAULT_HMAC_ALGORITHM = "scrypt"
DEFAULT_COMPRESSION_FILTER = "gzip"
CURRENT_BACKUP_FORMAT_VERSION = "4"
# Upper bound on the size of an error message taken from a process stderr
# (including a VM process)
MAX_STDERR_BYTES = 1024
# Upper bound on the size of header + qubes.xml
HEADER_QUBES_XML_MAX_SIZE = 1024 * 1024
# Upper bound on the size of an hmac file - regardless of backup format
# version!
HMAC_MAX_SIZE = 4096

BLKSIZE = 512

# Matches strings made only of ASCII letters, digits and '-' (may be empty)
_re_alphanum = re.compile(r'^[A-Za-z0-9-]*$')
 | 
						|
 | 
						|
 | 
						|
class BackupCanceledError(qubes.exc.QubesException):
    '''Qubes exception that additionally carries an optional *tmpdir* value.

    :param msg: message passed on to the base exception
    :param tmpdir: stored as-is in :py:attr:`tmpdir` (presumably the \
        temporary directory of the interrupted operation - confirm with \
        the code raising it)
    '''

    def __init__(self, msg, tmpdir=None):
        super().__init__(msg)
        self.tmpdir = tmpdir
 | 
						|
 | 
						|
 | 
						|
class BackupHeader:
    '''Structure describing backup-header file included as the first file in
    backup archive

    Every constructor argument is stored unchanged in the attribute of the
    same name; ``None`` means "not set" and such values (except ``version``)
    are left out by :py:meth:`save`.
    '''
    # pylint: disable=too-few-public-methods

    #: maps a key as written in the header file to the attribute holding it
    header_keys = {
        'version': 'version',
        'encrypted': 'encrypted',
        'compressed': 'compressed',
        'compression-filter': 'compression_filter',
        'crypto-algorithm': 'crypto_algorithm',
        'hmac-algorithm': 'hmac_algorithm',
        'backup-id': 'backup_id'
    }
    bool_options = ['encrypted', 'compressed']
    int_options = ['version']

    def __init__(self,
            version=None,
            encrypted=None,
            compressed=None,
            compression_filter=None,
            hmac_algorithm=None,
            crypto_algorithm=None,
            backup_id=None):
        # attributes are spelled out one by one to help code completion
        self.version = version
        self.encrypted = encrypted
        self.compressed = compressed
        # The options below were introduced in backup format 3+, which always
        # has a header, so no fallback value is needed in the parameters
        self.compression_filter = compression_filter
        self.hmac_algorithm = hmac_algorithm
        self.crypto_algorithm = crypto_algorithm
        self.backup_id = backup_id

    def save(self, filename):
        '''Write the header to *filename* as ``key=value`` lines.

        The ``version`` line always comes first (even when unset); the other
        keys follow in :py:attr:`header_keys` order, skipping unset ones.

        :param filename: path of the file to (over)write
        '''
        with open(filename, "w") as f_header:
            lines = ['version={}\n'.format(self.version)]
            for key, attr in self.header_keys.items():
                if key == 'version':
                    # already emitted as the first line
                    continue
                value = getattr(self, attr)
                if value is not None:
                    lines.append("{!s}={!s}\n".format(key, value))
            f_header.writelines(lines)
 | 
						|
 | 
						|
 | 
						|
class SendWorker:
    '''Worker taking file names from a queue and streaming them, wrapped in
    tar, to the backup output.

    :param queue: queue (awaited with ``yield from queue.get()``) delivering \
        file names relative to *base_dir*, terminated by ``QUEUE_FINISHED`` \
        or ``QUEUE_ERROR``
    :param base_dir: directory the file names are relative to
    :param backup_stdout: passed as ``stdout`` to the tar subprocess
    '''
    # pylint: disable=too-few-public-methods
    def __init__(self, queue, base_dir, backup_stdout):
        super().__init__()
        self.queue = queue
        self.base_dir = base_dir
        self.backup_stdout = backup_stdout
        self.log = logging.getLogger('qubes.backup')

    @asyncio.coroutine
    def run(self):
        '''Send queued files one by one until a terminating value arrives.

        Each file is removed from *base_dir* once tar finished with it.

        :raises qubes.exc.QubesException: when tar exits with code 2 or \
            greater
        '''
        self.log.debug("Started sending thread")

        filename = yield from self.queue.get()
        while filename not in (QUEUE_FINISHED, QUEUE_ERROR):
            self.log.debug("Sending file {}".format(filename))
            # The tar used for sending data out needs to be as simple and as
            # featureless as possible. It will not be verified before
            # untaring.
            tar_final_cmd = ["tar", "-cO", "--posix",
                             "-C", self.base_dir, filename]
            # pylint: disable=not-an-iterable
            final_proc = yield from asyncio.create_subprocess_exec(
                *tar_final_cmd,
                stdout=self.backup_stdout)
            retcode = yield from final_proc.wait()
            # handle only exit code 2 (tar fatal error) or greater (call
            # failed?)
            if retcode >= 2:
                raise qubes.exc.QubesException(
                    "ERROR: Failed to write the backup, out of disk space? "
                    "Check console output or ~/.xsession-errors for details.")

            # The file was sent, so it is not needed anymore
            self.log.debug("Removing file {}".format(filename))
            os.remove(os.path.join(self.base_dir, filename))

            filename = yield from self.queue.get()

        self.log.debug("Finished sending thread")
 | 
						|
 | 
						|
@asyncio.coroutine
def launch_proc_with_pty(args, stdin=None, stdout=None, stderr=None, echo=True):
    """Similar to pty.fork, but handle stdin/stdout according to parameters
    instead of connecting to the pty

    The child gets the pty slave as its controlling terminal only; its
    standard streams are whatever *stdin*, *stdout* and *stderr* say.

    :param args: command line (sequence) to execute
    :param stdin: passed to :py:func:`asyncio.create_subprocess_exec`
    :param stdout: passed to :py:func:`asyncio.create_subprocess_exec`
    :param stderr: passed to :py:func:`asyncio.create_subprocess_exec`
    :param echo: when false, terminal echo is switched off on the pty
    :return: tuple (process object returned by \
        :py:func:`asyncio.create_subprocess_exec`, unbuffered binary file \
        object wrapping the pty master)
    """

    def set_ctty(ctty_fd, master_fd):
        '''Run in the child before exec: start a new session and make
        *ctty_fd* its controlling terminal.'''
        os.setsid()
        # the master end belongs to the parent only
        os.close(master_fd)
        fcntl.ioctl(ctty_fd, termios.TIOCSCTTY, 0)
        if not echo:
            termios_p = termios.tcgetattr(ctty_fd)
            # termios_p.c_lflags
            termios_p[3] &= ~termios.ECHO
            termios.tcsetattr(ctty_fd, termios.TCSANOW, termios_p)

    (pty_master, pty_slave) = os.openpty()
    try:
        # pylint: disable=not-an-iterable
        p = yield from asyncio.create_subprocess_exec(*args,
            stdin=stdin,
            stdout=stdout,
            stderr=stderr,
            preexec_fn=lambda: set_ctty(pty_slave, pty_master))
    except BaseException:
        # nobody will ever use the master end - do not leak the descriptor
        os.close(pty_master)
        raise
    finally:
        # the parent never needs the slave end, whether or not the child
        # was started
        os.close(pty_slave)
    return p, open(pty_master, 'wb+', buffering=0)
 | 
						|
 | 
						|
 | 
						|
@asyncio.coroutine
def launch_scrypt(action, input_name, output_name, passphrase):
    '''
    Launch 'scrypt' process, pass passphrase to it and return
    the process object.

    The passphrase is typed into the pseudo-terminal of the process after
    each expected prompt was seen on its stderr.

    :param action: 'enc' or 'dec'
    :param input_name: input path or '-' for stdin
    :param output_name: output path or '-' for stdout
    :param passphrase: passphrase
    :type passphrase: bytes
    :return: process object (as returned by \
        :py:func:`launch_proc_with_pty`), with the pty file object attached \
        as its ``pty`` attribute
    :raises qubes.exc.QubesException: when scrypt prints something else \
        than the expected prompt
    '''
    command_line = ['scrypt', action, input_name, output_name]
    (p, pty) = yield from launch_proc_with_pty(command_line,
        stdin=subprocess.PIPE if input_name == '-' else None,
        stdout=subprocess.PIPE if output_name == '-' else None,
        stderr=subprocess.PIPE,
        echo=False)
    if action == 'enc':
        prompts = (b'Please enter passphrase: ', b'Please confirm passphrase: ')
    else:
        prompts = (b'Please enter passphrase: ',)
    try:
        for prompt in prompts:
            # read(n) may return less than n bytes when the prompt arrives
            # in pieces; readexactly() waits for the whole prompt (or EOF)
            try:
                actual_prompt = yield from p.stderr.readexactly(len(prompt))
            except asyncio.IncompleteReadError as err:
                # EOF before a complete prompt - report what was received
                actual_prompt = err.partial
            if actual_prompt != prompt:
                raise qubes.exc.QubesException(
                    'Unexpected prompt from scrypt: {}'.format(actual_prompt))
            pty.write(passphrase + b'\n')
            pty.flush()
    except BaseException:
        # the process is not handed to the caller, so release its pty now
        # instead of waiting for the garbage collector
        pty.close()
        raise
    # save it here, so garbage collector would not close it (which would kill
    #  the child)
    p.pty = pty
    return p
 | 
						|
 | 
						|
 | 
						|
class Backup:
    '''Backup operation manager. Usage:

    >>> app = qubes.Qubes()
    >>> # optional - you can use 'None' to use default list (based on
    >>> #  vm.include_in_backups property)
    >>> vms = [app.domains[name] for name in ['my-vm1', 'my-vm2', 'my-vm3']]
    >>> exclude_vms = []
    >>> options = {
    >>>     'encrypted': True,
    >>>     'compressed': True,
    >>>     'passphrase': 'This is very weak backup passphrase',
    >>>     'target_vm': app.domains['sys-usb'],
    >>>     'target_dir': '/media/disk',
    >>> }
    >>> backup_op = Backup(app, vms, exclude_vms, **options)
    >>> print(backup_op.get_backup_summary())
    >>> asyncio.get_event_loop().run_until_complete(backup_op.backup_do())

    See attributes of this object for all available options.

    NOTE(review): the constructor raises :py:class:`AttributeError` for any
    keyword option that is not already an attribute of the object; confirm
    that ``'encrypted'`` from the example above is still an accepted option.
    The passphrase is concatenated with ``bytes`` when the scrypt passphrases
    are built, so presumably it has to be ``bytes`` by then - confirm whether
    a ``str`` (as in the example) gets converted before that.

    '''
    # pylint: disable=too-many-instance-attributes
 | 
						|
    class FileToBackup:
        '''Description of a single file (or directory) to put in the backup.

        :param file_path: real path to the file
        :param subdir: directory inside the backup archive; when omitted, \
            the directory of *file_path* relative to the Qubes base \
            directory is used (:py:class:`ValueError` if it is outside)
        :param name: name to use inside the archive, defaults to the \
            basename of *file_path*
        :param size: size of the file; when omitted it is measured with \
            :py:func:`qubes.storage.file.get_disk_usage`
        '''
        # pylint: disable=too-few-public-methods
        def __init__(self, file_path, subdir=None, name=None, size=None):
            if size is None:
                size = qubes.storage.file.get_disk_usage(file_path)

            if subdir is None:
                containing_dir = pathlib.Path(file_path).resolve().parent
                base_dir = pathlib.Path(
                    qubes.config.system_path["qubes_base_dir"]).resolve()
                # relative_to() raises ValueError when containing_dir is not
                # located under base_dir
                subdir = str(containing_dir.relative_to(base_dir))

            # subdir is always kept with a trailing path separator
            if not subdir.endswith(os.path.sep):
                subdir = subdir + os.path.sep

            #: real path to the file
            self.path = file_path
            #: size of the file
            self.size = size
            #: directory in backup archive where file should be placed
            self.subdir = subdir
            #: use this name in the archive (aka rename)
            self.name = (
                name if name is not None else os.path.basename(file_path))
 | 
						|
 | 
						|
    class VMToBackup:
        '''A VM together with the files selected for its backup.

        :param vm: the VM object
        :param files: iterable of objects having a ``size`` attribute
        :param subdir: directory of this VM inside the backup archive
        '''
        # pylint: disable=too-few-public-methods
        def __init__(self, vm, files, subdir):
            self.vm = vm
            self.files = files
            self.subdir = subdir

        @property
        def size(self):
            '''Sum of the sizes of all the files (0 when there are none)'''
            return sum(file_info.size for file_info in self.files)
 | 
						|
 | 
						|
    def __init__(self, app, vms_list=None, exclude_list=None, **kwargs):
        """
        If vms = None, include all (sensible) VMs;
        exclude_list is always applied

        :param app: application object; ``app.domains`` is consulted when \
            *vms_list* is not given
        :param vms_list: VMs to back up; by default all the domains having \
            ``include_in_backups`` set
        :param exclude_list: names of VMs to leave out
        :param kwargs: initial values for already existing attributes of \
            this object (the backup settings)
        :raises AttributeError: for a keyword not naming an existing attribute
        """
        super().__init__()

        #: size limit used when splitting the data stream into chunks
        self.chunk_size = 100 * 1024 * 1024
        #: progress of the backup - bytes handled of the current VM
        self._current_vm_bytes = 0
        #: progress of the backup - bytes handled of finished VMs
        self._done_vms_bytes = 0
        #: total backup size (set by :py:meth:`get_files_to_backup`)
        self.total_backup_bytes = 0
        #: application object
        self.app = app
        #: directory for temporary files - set after creating the directory
        self.tmpdir = None

        # Backup settings - defaults
        #: should the backup be compressed?
        self.compressed = True
        #: what passphrase should be used to integrity protect (and encrypt)
        #: the backup; required
        self.passphrase = None
        #: custom compression filter; a program which process stdin to stdout
        self.compression_filter = DEFAULT_COMPRESSION_FILTER
        #: VM to which backup should be sent (if any)
        self.target_vm = None
        #: directory to save backup in (either in dom0 or target VM,
        #: depending on :py:attr:`target_vm`
        self.target_dir = None
        #: callback for progress reporting. Will be called with one argument
        #: - progress in percents
        self.progress_callback = None
        #: backup ID, needs to be unique (for a given user),
        #: not necessary unpredictable; automatically generated
        self.backup_id = '{}-{}'.format(
            datetime.datetime.now().strftime('%Y%m%dT%H%M%S'), os.getpid())

        # only attributes defined above (or on the class) may be overridden
        for option, value in kwargs.items():
            if not hasattr(self, option):
                raise AttributeError(option)
            setattr(self, option, value)

        self.log = logging.getLogger('qubes.backup')

        excluded_names = [] if exclude_list is None else exclude_list
        candidates = vms_list
        if candidates is None:
            candidates = [vm for vm in app.domains if vm.include_in_backups]

        # Apply exclude list
        self.vms_for_backup = [
            vm for vm in candidates if vm.name not in excluded_names]

        self._files_to_backup = self.get_files_to_backup()
 | 
						|
 | 
						|
    def __del__(self):
 | 
						|
        if self.tmpdir and os.path.exists(self.tmpdir):
 | 
						|
            shutil.rmtree(self.tmpdir)
 | 
						|
 | 
						|
    def get_files_to_backup(self):
        '''Collect the files to back up for every VM selected for backup.

        For each VM other than dom0 this is every volume with
        ``save_on_stop`` set, the files returned by handlers of the
        ``backup-get-files`` event and the firewall configuration, if it
        exists. For dom0 (qid 0) it is the home directory of the first
        member of the ``qubes`` group.

        As a side effect :py:attr:`total_backup_bytes` is updated.

        :return: dict mapping qid to :py:class:`VMToBackup`
        '''
        collected = {}
        for vm in self.vms_for_backup:
            if vm.qid == 0:
                # handle dom0 later
                continue

            vm_subdir = 'vm%d/' % vm.qid

            entries = [
                self.FileToBackup(
                    volume.export(),
                    vm_subdir,
                    volume_name + '.img',
                    volume.usage)
                for volume_name, volume in vm.volumes.items()
                if volume.save_on_stop]

            for extra_path in vm.fire_event('backup-get-files'):
                entries.append(self.FileToBackup(extra_path, vm_subdir))

            firewall_conf = os.path.join(vm.dir_path, vm.firewall_conf)
            if os.path.exists(firewall_conf):
                entries.append(self.FileToBackup(firewall_conf, vm_subdir))

            if not entries:
                # subdir/ is needed in the tar file, otherwise restore
                # of a (Disp)VM without any backed up files is going
                # to fail. Adding a zero-sized file here happens to be
                # more straightforward than adding an empty directory.
                empty = self.FileToBackup("/var/run/qubes/empty", vm_subdir)
                assert empty.size == 0
                entries.append(empty)

            collected[vm.qid] = self.VMToBackup(vm, entries, vm_subdir)

        # Dom0 user home
        if any(vm.qid == 0 for vm in self.vms_for_backup):
            local_user = grp.getgrnam('qubes').gr_mem[0]
            home_dir = pwd.getpwnam(local_user).pw_dir
            # Home dir should have only user-owned files, so fix it now
            # to prevent permissions problems - some root-owned files can
            # be left after 'sudo bash' and similar commands
            subprocess.check_call(['sudo', 'chown', '-R', local_user, home_dir])

            collected[0] = self.VMToBackup(
                self.app.domains[0],
                [self.FileToBackup(home_dir, 'dom0-home/')],
                os.path.join('dom0-home', os.path.basename(home_dir)))

        self.total_backup_bytes = sum(
            vm_info.size for vm_info in collected.values())
        return collected
 | 
						|
 | 
						|
 | 
						|
    def get_backup_summary(self):
        '''Return a human readable summary of what is going to be backed up.

        The summary is a text table with one row per VM (name, type, size),
        a row with the total size and, below the table, the list of VMs not
        selected for backup.

        :return: multi-line string
        '''
        fields_to_display = [
            {"name": "VM", "width": 16},
            {"name": "type", "width": 12},
            {"name": "size", "width": 12}
        ]
        name_width, type_width, size_width = (
            field["width"] for field in fields_to_display)

        def cell(text, width):
            '''Right-align *text* in *width* + 1 characters, add " |"'''
            return "{0:>{1}} |".format(text, width + 1)

        # a field of width N is underlined with N + 2 dashes closed by '+'
        separator = "".join(
            "-" * (field["width"] + 2) + "+"
            for field in fields_to_display) + "\n"

        # Display the header
        parts = [
            separator,
            "".join(cell(field["name"], field["width"])
                for field in fields_to_display) + "\n",
            separator,
        ]

        for qid, vm_info in self._files_to_backup.items():
            summary_line = cell(vm_info.vm.name, name_width)

            if qid == 0:
                vm_type = "User home"
            elif isinstance(vm_info.vm, qubes.vm.templatevm.TemplateVM):
                vm_type = "Template VM"
            elif vm_info.vm.updateable:
                vm_type = "VM + Sys"
            else:
                vm_type = "VM"
            summary_line += cell(vm_type, type_width)

            summary_line += cell(size_to_human(vm_info.size), size_width)

            if qid != 0 and vm_info.vm.is_running():
                summary_line += " <-- The VM is running, backup will contain " \
                                "its state from before its start!"

            parts.append(summary_line + "\n")

        parts.append(separator)

        # the total spans the "type" and "size" columns, including the " |"
        # that would otherwise separate them
        parts.append(cell("Total size:", name_width))
        parts.append(cell(size_to_human(self.total_backup_bytes),
            type_width + 1 + 2 + size_width))
        parts.append("\n")

        parts.append(separator)

        vms_not_for_backup = sorted(
            vm.name for vm in self.app.domains
            if vm not in self.vms_for_backup)
        parts.append("VMs not selected for backup:\n - " + "\n - ".join(
            vms_not_for_backup) + "\n")

        return "".join(parts)
 | 
						|
 | 
						|
    @asyncio.coroutine
    def _prepare_backup_header(self):
        '''Write the backup header into the temporary directory and create
        its scrypt-protected ``.hmac`` companion file.

        :return: tuple of two file names (header, header hmac), relative to \
            the temporary directory
        :raises qubes.exc.QubesException: when scrypt exits with non-zero \
            code
        '''
        header_file_path = os.path.join(self.tmpdir, HEADER_FILENAME)
        backup_header = BackupHeader(
            version=CURRENT_BACKUP_FORMAT_VERSION,
            hmac_algorithm=DEFAULT_HMAC_ALGORITHM,
            encrypted=True,
            compressed=self.compressed,
            compression_filter=self.compression_filter,
            backup_id=self.backup_id,
        )
        backup_header.save(header_file_path)
        # Start encrypt, scrypt will also handle integrity
        # protection
        scrypt_passphrase = '{filename}!'.format(
            filename=HEADER_FILENAME).encode() + self.passphrase
        scrypt = yield from launch_scrypt(
            'enc', header_file_path, header_file_path + '.hmac',
            scrypt_passphrase)

        retcode = yield from scrypt.wait()
        if retcode:
            # stderr is an asyncio stream: reading it yields bytes and has to
            # be awaited, so decode before appending to the message
            scrypt_stderr = yield from scrypt.stderr.read(MAX_STDERR_BYTES)
            raise qubes.exc.QubesException(
                "Failed to compute hmac of header file: "
                + scrypt_stderr.decode('ascii', errors='ignore'))
        return HEADER_FILENAME, HEADER_FILENAME + ".hmac"
 | 
						|
 | 
						|
 | 
						|
    def _send_progress_update(self):
 | 
						|
        if not self.total_backup_bytes:
 | 
						|
            return
 | 
						|
        if callable(self.progress_callback):
 | 
						|
            progress = (
 | 
						|
                100 * (self._done_vms_bytes + self._current_vm_bytes) /
 | 
						|
                self.total_backup_bytes)
 | 
						|
            # pylint: disable=not-callable
 | 
						|
            self.progress_callback(progress)
 | 
						|
 | 
						|
    def _add_vm_progress(self, bytes_done):
 | 
						|
        self._current_vm_bytes += bytes_done
 | 
						|
        self._send_progress_update()
 | 
						|
 | 
						|
    @asyncio.coroutine
    def _split_and_send(self, input_stream, file_basename,
            output_queue):
        '''Split *input_stream* into parts of max *chunk_size* bytes and send
        to *output_queue*.

        Every part is piped through its own scrypt process, writing to
        ``<file_basename>.NNN.enc``; the passphrase given to scrypt is
        prefixed with the backup ID and the chunk name (without ``.enc``)
        relative to the temporary directory.

        :param input_stream: stream (asyncio reader stream) of data to split
        :param file_basename: basename (i.e. without part number and '.enc')
        of output files
        :param output_queue: asyncio.Queue instance to put produced files to
        - queue will get only filenames of written chunks
        '''
        # Wait for compressor (tar) process to finish or for any
        # error of other subprocesses
        i = 0
        run_error = "size_limit"
        # "size_limit" means the chunk got full and more data is pending
        while run_error == "size_limit":
            # Prepare the next chunk
            chunkfile = file_basename + ".%03d.enc" % i
            i += 1

            # Start encrypt, scrypt will also handle integrity
            # protection
            scrypt_passphrase = \
                '{backup_id}!{filename}!'.format(
                    backup_id=self.backup_id,
                    filename=os.path.relpath(chunkfile[:-4],
                        self.tmpdir)).encode() + self.passphrase
            # reset for every chunk, so the error handler below never acts
            # on the (already finished) process of the previous chunk
            scrypt = None
            try:
                scrypt = yield from launch_scrypt(
                    "enc", "-", chunkfile, scrypt_passphrase)

                run_error = yield from handle_streams(
                    input_stream,
                    scrypt.stdin,
                    self.chunk_size,
                    self._add_vm_progress
                )

                self.log.debug(
                    "handle_streams returned: {}".format(run_error))
            except BaseException:
                # scrypt is None when launching it was what failed; do not
                # let the cleanup mask the original exception
                if scrypt is not None:
                    try:
                        scrypt.terminate()
                    except ProcessLookupError:
                        # already exited
                        pass
                raise

            scrypt.stdin.close()
            yield from scrypt.wait()
            # NOTE(review): the return code is only logged, a failed scrypt
            # does not abort the backup here - confirm this is intended
            self.log.debug("scrypt return code: {}".format(
                scrypt.returncode))

            # Send the chunk to the backup target
            yield from output_queue.put(
                os.path.relpath(chunkfile, self.tmpdir))
 | 
						|
 | 
						|
    @asyncio.coroutine
    def _wrap_and_send_files(self, files_to_backup, output_queue):
        '''Archive every file of every entry in *files_to_backup* and feed
        the result, split into chunks, to *output_queue*.

        Progress counters are updated after each entry; ``QUEUE_FINISHED``
        is put on the queue once everything was handled.

        :param files_to_backup: iterable of objects with ``files`` and \
            ``size`` attributes (like :py:class:`VMToBackup`)
        :param output_queue: queue receiving names of the produced chunks
        :raises qubes.exc.QubesException: when the archiving process exits \
            with non-zero code
        '''
        for vm_info in files_to_backup:
            for file_info in vm_info.files:

                self.log.debug("Backing up {}".format(file_info))

                backup_tempfile = os.path.join(
                    self.tmpdir, file_info.subdir,
                    file_info.name)
                self.log.debug("Using temporary location: {}".format(
                    backup_tempfile))

                # Ensure the temporary directory exists
                tempfile_dir = os.path.dirname(backup_tempfile)
                if not os.path.isdir(tempfile_dir):
                    os.makedirs(tempfile_dir)

                source_name = os.path.basename(file_info.path)
                file_stat = os.stat(file_info.path)
                if stat.S_ISBLK(file_stat.st_mode) or \
                        file_info.name != source_name:
                    # tar doesn't handle content of block device, use our
                    # writer
                    # also use our tar writer when renaming file
                    assert not stat.S_ISDIR(file_stat.st_mode), \
                        "Renaming directories not supported"
                    tar_cmdline = ['python3', '-m', 'qubes.tarwriter',
                        '--override-name=%s' % (
                            os.path.join(file_info.subdir, os.path.basename(
                                file_info.name))),
                        file_info.path]
                else:
                    # The first tar cmd can use any complex feature as we
                    # want. Files will be verified before untaring this.
                    # Prefix the path in archive with the subdir to have it
                    # verified during untar
                    tar_cmdline = ["tar", "-Pc", '--sparse',
                                   '-C', os.path.dirname(file_info.path)]
                    if file_info.subdir != "dom0-home/":
                        tar_cmdline.append('--dereference')
                    tar_cmdline.append('--xform=s:^%s:%s\\0:' % (
                        source_name, file_info.subdir))
                    tar_cmdline.append(source_name)

                if self.compressed:
                    # goes right before the last two arguments of either
                    # command line
                    tar_cmdline.insert(-2,
                        "--use-compress-program=%s" % self.compression_filter)

                self.log.debug(" ".join(tar_cmdline))

                # Pipe: tar-sparse | scrypt | tar | backup_target
                # TODO: log handle stderr
                # pylint: disable=not-an-iterable
                tar_sparse = yield from asyncio.create_subprocess_exec(
                    *tar_cmdline, stdout=subprocess.PIPE)

                try:
                    yield from self._split_and_send(
                        tar_sparse.stdout,
                        backup_tempfile,
                        output_queue)
                except BaseException:
                    try:
                        tar_sparse.terminate()
                    except ProcessLookupError:
                        # the archiver is already gone
                        pass
                    raise

                yield from tar_sparse.wait()
                if tar_sparse.returncode:
                    raise qubes.exc.QubesException(
                        'Failed to archive {} file'.format(file_info.path))

            # This VM done, update progress
            self._done_vms_bytes += vm_info.size
            self._current_vm_bytes = 0
            self._send_progress_update()

        yield from output_queue.put(QUEUE_FINISHED)
 | 
						|
 | 
						|
    @staticmethod
 | 
						|
    @asyncio.coroutine
 | 
						|
    def _monitor_process(proc, error_message):
 | 
						|
        try:
 | 
						|
            yield from proc.wait()
 | 
						|
        except:
 | 
						|
            proc.terminate()
 | 
						|
            raise
 | 
						|
 | 
						|
        if proc.returncode:
 | 
						|
            if proc.stderr is not None:
 | 
						|
                proc_stderr = (yield from proc.stderr.read())
 | 
						|
                proc_stderr = proc_stderr.decode('ascii', errors='ignore')
 | 
						|
                proc_stderr = ''.join(
 | 
						|
                    c for c in proc_stderr if c in string.printable and
 | 
						|
                                              c not in '\r\n%{}')
 | 
						|
                error_message += ': ' + proc_stderr
 | 
						|
            raise qubes.exc.QubesException(error_message)
 | 
						|
 | 
						|
    @staticmethod
 | 
						|
    @asyncio.coroutine
 | 
						|
    def _cancel_on_error(future, previous_task):
 | 
						|
        '''If further element of chain fail, cancel previous one to
 | 
						|
        avoid deadlock.
 | 
						|
        When earlier element of chain fail, it will be handled by
 | 
						|
        :py:meth:`backup_do`.
 | 
						|
 | 
						|
        The chain is:
 | 
						|
        :py:meth:`_wrap_and_send_files` -> :py:class:`SendWorker` -> vmproc
 | 
						|
        '''
 | 
						|
        try:
 | 
						|
            yield from future
 | 
						|
        except:  # pylint: disable=bare-except
 | 
						|
            previous_task.cancel()
 | 
						|
 | 
						|
    @asyncio.coroutine
    def backup_do(self):
        '''Create the backup and write it to the configured target.

        Steps performed:

        1. Copy ``qubes.xml`` into a fresh temporary directory and, in that
           copy, mark which VMs are included in the backup
           (``backup-content``, ``backup-path`` and ``backup-size``
           features).
        2. Open the backup target: either ``qubes.Backup`` service in
           ``self.target_vm`` (data is sent through a pipe), or a local
           file when ``self.target_vm`` is ``None``.
        3. Start :py:class:`SendWorker` and feed it with header files,
           then with the data produced by :py:meth:`_wrap_and_send_files`.
        4. On success, set ``backup_timestamp`` of every VM included in the
           backup and save ``self.app``.

        The temporary directory is removed and the backup target closed
        once the data transfer ends, regardless of its result.

        :raises qubes.exc.QubesException: when no passphrase is set, or \
            when the directory supposed to hold a local backup file does \
            not exist; errors of the underlying tasks are propagated too
        '''
        # pylint: disable=too-many-statements
        # NOTE(review): failures between tempfile.mkdtemp() and the main
        # try/finally below (for example in run_service(), open() or
        # _prepare_backup_header()) leave self.tmpdir and the already opened
        # target descriptors behind - consider extending the cleanup.
        if self.passphrase is None:
            raise qubes.exc.QubesException("No passphrase set")
        if not isinstance(self.passphrase, bytes):
            self.passphrase = self.passphrase.encode('utf-8')
        qubes_xml = self.app.store
        self.tmpdir = tempfile.mkdtemp()
        shutil.copy(qubes_xml, os.path.join(self.tmpdir, 'qubes.xml'))
        # from now on operate on the copy only
        qubes_xml = os.path.join(self.tmpdir, 'qubes.xml')
        backup_app = qubes.Qubes(qubes_xml, offline_mode=True)
        backup_app.events_enabled = False

        files_to_backup = self._files_to_backup
        # make sure backup_content isn't set initially
        for vm in backup_app.domains:
            vm.events_enabled = False
            vm.features['backup-content'] = False

        for qid, vm_info in files_to_backup.items():
            # VM is included in the backup
            backup_app.domains[qid].features['backup-content'] = True
            backup_app.domains[qid].features['backup-path'] = vm_info.subdir
            backup_app.domains[qid].features['backup-size'] = vm_info.size
        backup_app.save()
        del backup_app

        vmproc = None
        if self.target_vm is not None:
            # Prepare the backup target (Qubes service call)
            # If APPVM, STDOUT is a PIPE
            read_fd, write_fd = os.pipe()
            vmproc = yield from self.target_vm.run_service('qubes.Backup',
                stdin=read_fd,
                stderr=subprocess.PIPE,
                stdout=subprocess.DEVNULL)
            # the service holds its own copy of the read end now
            os.close(read_fd)
            # first line sent to the service is the target directory,
            # stripped of any line break characters
            os.write(write_fd, (self.target_dir.
                replace("\r", "").replace("\n", "") + "\n").encode())
            backup_stdout = write_fd
        else:
            # Prepare the backup target (local file)
            if os.path.isdir(self.target_dir):
                backup_target = self.target_dir + "/qubes-{0}". \
                    format(time.strftime("%Y-%m-%dT%H%M%S"))
            else:
                backup_target = self.target_dir

                # The directory holding the target file must already exist
                # (it is not created here)
                if not os.path.exists(os.path.dirname(self.target_dir)):
                    raise qubes.exc.QubesException(
                        "ERROR: the backup directory for {0} does not exists".
                        format(self.target_dir))

            # If not APPVM, STDOUT is a local file
            backup_stdout = open(backup_target, 'wb')

        # Tar with tape length does not deal well with stdout
        # (close stdout between two tapes)
        # For this reason, we will use named pipes instead
        self.log.debug("Working in {}".format(self.tmpdir))

        self.log.debug("Will backup: {}".format(files_to_backup))

        header_files = yield from self._prepare_backup_header()

        # Setup worker to send encrypted data chunks to the backup_target
        to_send = asyncio.Queue(10)
        send_proc = SendWorker(to_send, self.tmpdir, backup_stdout)
        send_task = asyncio.ensure_future(send_proc.run())

        vmproc_task = None
        if vmproc is not None:
            vmproc_task = asyncio.ensure_future(
                self._monitor_process(vmproc,
                    'Writing backup to VM {} failed'.format(
                        self.target_vm.name)))
            # if the receiving process fails, stop sending data to it
            asyncio.ensure_future(self._cancel_on_error(
                vmproc_task, send_task))

        for file_name in header_files:
            yield from to_send.put(file_name)

        # qubes.xml (the modified copy) goes first, before VMs data
        qubes_xml_info = self.VMToBackup(
            None,
            [self.FileToBackup(qubes_xml, '')],
            ''
        )
        inner_archive_task = asyncio.ensure_future(
            self._wrap_and_send_files(
                itertools.chain([qubes_xml_info], files_to_backup.values()),
                to_send
            ))
        # if sending fails, stop producing more data
        asyncio.ensure_future(
            self._cancel_on_error(send_task, inner_archive_task))

        try:
            try:
                yield from inner_archive_task
            except:
                yield from to_send.put(QUEUE_ERROR)
                # in fact we may be handling CancelledError, induced by
                # exception in send_task or vmproc_task (and propagated by
                # self._cancel_on_error call above); in such a case this
                # yield from will raise exception, covering CancelledError -
                # this is intended behaviour
                if vmproc_task:
                    yield from vmproc_task
                yield from send_task
                raise

            yield from send_task

        finally:
            # backup_stdout is either a raw file descriptor (pipe to VM) or
            # a file object (local file)
            if isinstance(backup_stdout, int):
                os.close(backup_stdout)
            else:
                backup_stdout.close()
            try:
                if vmproc_task:
                    yield from vmproc_task
            finally:
                shutil.rmtree(self.tmpdir)

        # Save date of last backup, only when backup succeeded
        # time.time() is used instead of strftime('%s'), as the latter is a
        # platform-specific extension
        backup_timestamp = int(time.time())
        for vm_info in files_to_backup.values():
            if vm_info.vm:
                vm_info.vm.backup_timestamp = backup_timestamp

        self.app.save()
 | 
						|
 | 
						|
 | 
						|
@asyncio.coroutine
def handle_streams(stream_in, stream_out, size_limit=None,
        progress_callback=None):
    '''
    Copy data from stream_in to stream_out, in chunks, until end of input
    or until `size_limit` bytes (if given) were copied.

    :param stream_in: StreamReader object to read data from
    :param stream_out: StreamWriter object to write data to
    :param size_limit: int maximum data amount to process
    :param progress_callback: callable function to report progress, will be
        given size of each copied chunk (it should accumulate internally)
    :return: "size_limit" when the limit was reached, None when the input
        ended first
    '''
    max_chunk = 409600
    copied_total = 0
    while True:
        chunk_size = max_chunk
        if size_limit:
            # never read past the requested limit
            remaining = size_limit - copied_total
            chunk_size = min(max_chunk, remaining)
            if chunk_size <= 0:
                return "size_limit"

        chunk = yield from stream_in.read(chunk_size)
        if not chunk:
            # end of input
            return None

        chunk_len = len(chunk)
        if callable(progress_callback):
            progress_callback(chunk_len)
        stream_out.write(chunk)
        copied_total += chunk_len
 | 
						|
 | 
						|
# vim:sw=4:et:
 |