123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875 |
- #
- # The Qubes OS Project, http://www.qubes-os.org
- #
- # Copyright (C) 2013-2017 Marek Marczykowski-Górecki
- # <marmarek@invisiblethingslab.com>
- # Copyright (C) 2013 Olivier Médoc <o_medoc@yahoo.fr>
- #
- # This library is free software; you can redistribute it and/or
- # modify it under the terms of the GNU Lesser General Public
- # License as published by the Free Software Foundation; either
- # version 2.1 of the License, or (at your option) any later version.
- #
- # This library is distributed in the hope that it will be useful,
- # but WITHOUT ANY WARRANTY; without even the implied warranty of
- # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- # Lesser General Public License for more details.
- #
- # You should have received a copy of the GNU Lesser General Public
- # License along with this library; if not, see <https://www.gnu.org/licenses/>.
- #
- #
- from __future__ import unicode_literals
- import itertools
- import logging
- import functools
- import string
- import termios
- import asyncio
- from qubes.utils import size_to_human
- import stat
- import os
- import fcntl
- import subprocess
- import re
- import shutil
- import tempfile
- import time
- import grp
- import pwd
- import datetime
- import qubes
- import qubes.core2migration
- import qubes.storage
- import qubes.storage.file
- import qubes.vm.templatevm
- QUEUE_ERROR = "ERROR"
- QUEUE_FINISHED = "FINISHED"
- HEADER_FILENAME = 'backup-header'
- DEFAULT_CRYPTO_ALGORITHM = 'aes-256-cbc'
- # 'scrypt' is not exactly HMAC algorithm, but a tool we use to
- # integrity-protect the data
- DEFAULT_HMAC_ALGORITHM = 'scrypt'
- DEFAULT_COMPRESSION_FILTER = 'gzip'
- CURRENT_BACKUP_FORMAT_VERSION = '4'
- # Maximum size of error message get from process stderr (including VM process)
- MAX_STDERR_BYTES = 1024
- # header + qubes.xml max size
- HEADER_QUBES_XML_MAX_SIZE = 1024 * 1024
- # hmac file max size - regardless of backup format version!
- HMAC_MAX_SIZE = 4096
- BLKSIZE = 512
- _re_alphanum = re.compile(r'^[A-Za-z0-9-]*$')
- class BackupCanceledError(qubes.exc.QubesException):
- def __init__(self, msg, tmpdir=None):
- super(BackupCanceledError, self).__init__(msg)
- self.tmpdir = tmpdir
- class BackupHeader:
- '''Structure describing backup-header file included as the first file in
- backup archive
- '''
- # pylint: disable=too-few-public-methods
- header_keys = {
- 'version': 'version',
- 'encrypted': 'encrypted',
- 'compressed': 'compressed',
- 'compression-filter': 'compression_filter',
- 'crypto-algorithm': 'crypto_algorithm',
- 'hmac-algorithm': 'hmac_algorithm',
- 'backup-id': 'backup_id'
- }
- bool_options = ['encrypted', 'compressed']
- int_options = ['version']
- def __init__(self,
- version=None,
- encrypted=None,
- compressed=None,
- compression_filter=None,
- hmac_algorithm=None,
- crypto_algorithm=None,
- backup_id=None):
- # repeat the list to help code completion...
- self.version = version
- self.encrypted = encrypted
- self.compressed = compressed
- # Options introduced in backup format 3+, which always have a header,
- # so no need for fallback in function parameter
- self.compression_filter = compression_filter
- self.hmac_algorithm = hmac_algorithm
- self.crypto_algorithm = crypto_algorithm
- self.backup_id = backup_id
- def save(self, filename):
- with open(filename, "w") as f_header:
- # make sure 'version' is the first key
- f_header.write('version={}\n'.format(self.version))
- for key, attr in self.header_keys.items():
- if key == 'version':
- continue
- if getattr(self, attr) is None:
- continue
- f_header.write("{!s}={!s}\n".format(key, getattr(self, attr)))
- class SendWorker:
- # pylint: disable=too-few-public-methods
- def __init__(self, queue, base_dir, backup_stdout):
- super(SendWorker, self).__init__()
- self.queue = queue
- self.base_dir = base_dir
- self.backup_stdout = backup_stdout
- self.log = logging.getLogger('qubes.backup')
- @asyncio.coroutine
- def run(self):
- self.log.debug("Started sending thread")
- while True:
- filename = yield from self.queue.get()
- if filename in (QUEUE_FINISHED, QUEUE_ERROR):
- break
- self.log.debug("Sending file {}".format(filename))
- # This tar used for sending data out need to be as simple, as
- # simple, as featureless as possible. It will not be
- # verified before untaring.
- tar_final_cmd = ["tar", "-cO", "--posix",
- "-C", self.base_dir, filename]
- # pylint: disable=not-an-iterable
- final_proc = yield from asyncio.create_subprocess_exec(
- *tar_final_cmd,
- stdout=self.backup_stdout)
- retcode = yield from final_proc.wait()
- if retcode >= 2:
- # handle only exit code 2 (tar fatal error) or
- # greater (call failed?)
- raise qubes.exc.QubesException(
- "ERROR: Failed to write the backup, out of disk space? "
- "Check console output or ~/.xsession-errors for details.")
- # Delete the file as we don't need it anymore
- self.log.debug("Removing file {}".format(filename))
- os.remove(os.path.join(self.base_dir, filename))
- self.log.debug("Finished sending thread")
- @asyncio.coroutine
- def launch_proc_with_pty(args, stdin=None, stdout=None, stderr=None, echo=True):
- """Similar to pty.fork, but handle stdin/stdout according to parameters
- instead of connecting to the pty
- :return tuple (subprocess.Popen, pty_master)
- """
- def set_ctty(ctty_fd, master_fd):
- os.setsid()
- os.close(master_fd)
- fcntl.ioctl(ctty_fd, termios.TIOCSCTTY, 0)
- if not echo:
- termios_p = termios.tcgetattr(ctty_fd)
- # termios_p.c_lflags
- termios_p[3] &= ~termios.ECHO
- termios.tcsetattr(ctty_fd, termios.TCSANOW, termios_p)
- (pty_master, pty_slave) = os.openpty()
- # pylint: disable=not-an-iterable
- p = yield from asyncio.create_subprocess_exec(*args,
- stdin=stdin,
- stdout=stdout,
- stderr=stderr,
- preexec_fn=lambda: set_ctty(pty_slave, pty_master))
- os.close(pty_slave)
- return p, open(pty_master, 'wb+', buffering=0)
- @asyncio.coroutine
- def launch_scrypt(action, input_name, output_name, passphrase):
- '''
- Launch 'scrypt' process, pass passphrase to it and return
- subprocess.Popen object.
- :param action: 'enc' or 'dec'
- :param input_name: input path or '-' for stdin
- :param output_name: output path or '-' for stdout
- :param passphrase: passphrase
- :type passphrase: bytes
- :return: subprocess.Popen object
- '''
- command_line = ['scrypt', action, input_name, output_name]
- (p, pty) = yield from launch_proc_with_pty(command_line,
- stdin=subprocess.PIPE if input_name == '-' else None,
- stdout=subprocess.PIPE if output_name == '-' else None,
- stderr=subprocess.PIPE,
- echo=False)
- if action == 'enc':
- prompts = (b'Please enter passphrase: ', b'Please confirm passphrase: ')
- else:
- prompts = (b'Please enter passphrase: ',)
- for prompt in prompts:
- actual_prompt = yield from p.stderr.read(len(prompt))
- if actual_prompt != prompt:
- raise qubes.exc.QubesException(
- 'Unexpected prompt from scrypt: {}'.format(actual_prompt))
- pty.write(passphrase + b'\n')
- pty.flush()
- # save it here, so garbage collector would not close it (which would kill
- # the child)
- p.pty = pty
- return p
- class Backup:
- '''Backup operation manager. Usage:
- >>> app = qubes.Qubes()
- >>> # optional - you can use 'None' to use default list (based on
- >>> # vm.include_in_backups property)
- >>> vms = [app.domains[name] for name in ['my-vm1', 'my-vm2', 'my-vm3']]
- >>> exclude_vms = []
- >>> options = {
- >>> 'encrypted': True,
- >>> 'compressed': True,
- >>> 'passphrase': 'This is very weak backup passphrase',
- >>> 'target_vm': app.domains['sys-usb'],
- >>> 'target_dir': '/media/disk',
- >>> }
- >>> backup_op = Backup(app, vms, exclude_vms, **options)
- >>> print(backup_op.get_backup_summary())
- >>> asyncio.get_event_loop().run_until_complete(backup_op.backup_do())
- See attributes of this object for all available options.
- '''
- # pylint: disable=too-many-instance-attributes
- class FileToBackup:
- # pylint: disable=too-few-public-methods
- def __init__(self, file_path, subdir=None, name=None, size=None):
- if size is None:
- size = qubes.storage.file.get_disk_usage(file_path)
- if subdir is None:
- abs_file_path = os.path.abspath(file_path)
- abs_base_dir = os.path.abspath(
- qubes.config.system_path["qubes_base_dir"]) + '/'
- abs_file_dir = os.path.dirname(abs_file_path) + '/'
- (nothing, directory, subdir) = \
- abs_file_dir.partition(abs_base_dir)
- assert nothing == ""
- assert directory == abs_base_dir
- else:
- if subdir and not subdir.endswith('/'):
- subdir += '/'
- #: real path to the file
- self.path = file_path
- #: size of the file
- self.size = size
- #: directory in backup archive where file should be placed
- self.subdir = subdir
- #: use this name in the archive (aka rename)
- self.name = os.path.basename(file_path)
- if name is not None:
- self.name = name
- class VMToBackup:
- # pylint: disable=too-few-public-methods
- def __init__(self, vm, files, subdir):
- self.vm = vm
- self.files = files
- self.subdir = subdir
- @property
- def size(self):
- return functools.reduce(lambda x, y: x + y.size, self.files, 0)
- def __init__(self, app, vms_list=None, exclude_list=None, **kwargs):
- """
- If vms = None, include all (sensible) VMs;
- exclude_list is always applied
- """
- super(Backup, self).__init__()
- #: progress of the backup - bytes handled of the current VM
- self.chunk_size = 100 * 1024 * 1024
- self._current_vm_bytes = 0
- #: progress of the backup - bytes handled of finished VMs
- self._done_vms_bytes = 0
- #: total backup size (set by :py:meth:`get_files_to_backup`)
- self.total_backup_bytes = 0
- #: application object
- self.app = app
- #: directory for temporary files - set after creating the directory
- self.tmpdir = None
- # Backup settings - defaults
- #: should the backup be compressed?
- self.compressed = True
- #: what passphrase should be used to intergrity protect (and encrypt)
- #: the backup; required
- self.passphrase = None
- #: custom compression filter; a program which process stdin to stdout
- self.compression_filter = DEFAULT_COMPRESSION_FILTER
- #: VM to which backup should be sent (if any)
- self.target_vm = None
- #: directory to save backup in (either in dom0 or target VM,
- #: depending on :py:attr:`target_vm`
- self.target_dir = None
- #: callback for progress reporting. Will be called with one argument
- #: - progress in percents
- self.progress_callback = None
- #: backup ID, needs to be unique (for a given user),
- #: not necessary unpredictable; automatically generated
- self.backup_id = datetime.datetime.now().strftime(
- '%Y%m%dT%H%M%S-' + str(os.getpid()))
- for key, value in kwargs.items():
- if hasattr(self, key):
- setattr(self, key, value)
- else:
- raise AttributeError(key)
- self.log = logging.getLogger('qubes.backup')
- if exclude_list is None:
- exclude_list = []
- if vms_list is None:
- vms_list = [vm for vm in app.domains if vm.include_in_backups]
- # Apply exclude list
- self.vms_for_backup = [vm for vm in vms_list
- if vm.name not in exclude_list]
- self._files_to_backup = self.get_files_to_backup()
- def __del__(self):
- if self.tmpdir and os.path.exists(self.tmpdir):
- shutil.rmtree(self.tmpdir)
- def get_files_to_backup(self):
- files_to_backup = {}
- for vm in self.vms_for_backup:
- if vm.qid == 0:
- # handle dom0 later
- continue
- subdir = 'vm%d/' % vm.qid
- vm_files = []
- for name, volume in vm.volumes.items():
- if not volume.save_on_stop:
- continue
- vm_files.append(self.FileToBackup(
- volume.export(),
- subdir,
- name + '.img',
- volume.usage))
- vm_files.extend(self.FileToBackup(i, subdir)
- for i in vm.fire_event('backup-get-files'))
- firewall_conf = os.path.join(vm.dir_path, vm.firewall_conf)
- if os.path.exists(firewall_conf):
- vm_files.append(self.FileToBackup(firewall_conf, subdir))
- if not vm_files:
- # subdir/ is needed in the tar file, otherwise restore
- # of a (Disp)VM without any backed up files is going
- # to fail. Adding a zero-sized file here happens to be
- # more straightforward than adding an empty directory.
- empty = self.FileToBackup("/var/run/qubes/empty", subdir)
- assert empty.size == 0
- vm_files.append(empty)
- files_to_backup[vm.qid] = self.VMToBackup(vm, vm_files, subdir)
- # Dom0 user home
- if 0 in [vm.qid for vm in self.vms_for_backup]:
- local_user = grp.getgrnam('qubes').gr_mem[0]
- home_dir = pwd.getpwnam(local_user).pw_dir
- # Home dir should have only user-owned files, so fix it now
- # to prevent permissions problems - some root-owned files can
- # left after 'sudo bash' and similar commands
- subprocess.check_call(['sudo', 'chown', '-R', local_user, home_dir])
- home_to_backup = [
- self.FileToBackup(home_dir, 'dom0-home/')]
- vm_files = home_to_backup
- files_to_backup[0] = self.VMToBackup(self.app.domains[0],
- vm_files,
- os.path.join('dom0-home', os.path.basename(home_dir)))
- self.total_backup_bytes = functools.reduce(
- lambda x, y: x + y.size, files_to_backup.values(), 0)
- return files_to_backup
- def get_backup_summary(self):
- summary = ""
- fields_to_display = [
- {"name": "VM", "width": 16},
- {"name": "type", "width": 12},
- {"name": "size", "width": 12}
- ]
- # Display the header
- for field in fields_to_display:
- fmt = "{{0:-^{0}}}-+".format(field["width"] + 1)
- summary += fmt.format('-')
- summary += "\n"
- for field in fields_to_display:
- fmt = "{{0:>{0}}} |".format(field["width"] + 1)
- summary += fmt.format(field["name"])
- summary += "\n"
- for field in fields_to_display:
- fmt = "{{0:-^{0}}}-+".format(field["width"] + 1)
- summary += fmt.format('-')
- summary += "\n"
- files_to_backup = self._files_to_backup
- for qid, vm_info in files_to_backup.items():
- summary_line = ""
- fmt = "{{0:>{0}}} |".format(fields_to_display[0]["width"] + 1)
- summary_line += fmt.format(vm_info.vm.name)
- fmt = "{{0:>{0}}} |".format(fields_to_display[1]["width"] + 1)
- if qid == 0:
- summary_line += fmt.format("User home")
- elif isinstance(vm_info.vm, qubes.vm.templatevm.TemplateVM):
- summary_line += fmt.format("Template VM")
- else:
- summary_line += fmt.format("VM" + (" + Sys" if
- vm_info.vm.updateable else ""))
- vm_size = vm_info.size
- fmt = "{{0:>{0}}} |".format(fields_to_display[2]["width"] + 1)
- summary_line += fmt.format(size_to_human(vm_size))
- if qid != 0 and vm_info.vm.is_running():
- summary_line += " <-- The VM is running, backup will contain " \
- "its state from before its start!"
- summary += summary_line + "\n"
- for field in fields_to_display:
- fmt = "{{0:-^{0}}}-+".format(field["width"] + 1)
- summary += fmt.format('-')
- summary += "\n"
- fmt = "{{0:>{0}}} |".format(fields_to_display[0]["width"] + 1)
- summary += fmt.format("Total size:")
- fmt = "{{0:>{0}}} |".format(
- fields_to_display[1]["width"] + 1 + 2 + fields_to_display[2][
- "width"] + 1)
- summary += fmt.format(size_to_human(self.total_backup_bytes))
- summary += "\n"
- for field in fields_to_display:
- fmt = "{{0:-^{0}}}-+".format(field["width"] + 1)
- summary += fmt.format('-')
- summary += "\n"
- vms_not_for_backup = [vm.name for vm in self.app.domains
- if vm not in self.vms_for_backup]
- summary += "VMs not selected for backup:\n - " + "\n - ".join(
- sorted(vms_not_for_backup)) + "\n"
- return summary
- @asyncio.coroutine
- def _prepare_backup_header(self):
- header_file_path = os.path.join(self.tmpdir, HEADER_FILENAME)
- backup_header = BackupHeader(
- version=CURRENT_BACKUP_FORMAT_VERSION,
- hmac_algorithm=DEFAULT_HMAC_ALGORITHM,
- encrypted=True,
- compressed=self.compressed,
- compression_filter=self.compression_filter,
- backup_id=self.backup_id,
- )
- backup_header.save(header_file_path)
- # Start encrypt, scrypt will also handle integrity
- # protection
- scrypt_passphrase = '{filename}!'.format(
- filename=HEADER_FILENAME).encode() + self.passphrase
- scrypt = yield from launch_scrypt(
- 'enc', header_file_path, header_file_path + '.hmac',
- scrypt_passphrase)
- retcode = yield from scrypt.wait()
- if retcode:
- raise qubes.exc.QubesException(
- "Failed to compute hmac of header file: "
- + scrypt.stderr.read())
- return HEADER_FILENAME, HEADER_FILENAME + ".hmac"
- def _send_progress_update(self):
- if not self.total_backup_bytes:
- return
- if callable(self.progress_callback):
- progress = (
- 100 * (self._done_vms_bytes + self._current_vm_bytes) /
- self.total_backup_bytes)
- # pylint: disable=not-callable
- self.progress_callback(progress)
- def _add_vm_progress(self, bytes_done):
- self._current_vm_bytes += bytes_done
- self._send_progress_update()
- @asyncio.coroutine
- def _split_and_send(self, input_stream, file_basename,
- output_queue):
- '''Split *input_stream* into parts of max *chunk_size* bytes and send
- to *output_queue*.
- :param input_stream: stream (asyncio reader stream) of data to split
- :param file_basename: basename (i.e. without part number and '.enc')
- of output files
- :param output_queue: asyncio.Queue instance to put produced files to
- - queue will get only filenames of written chunks
- '''
- # Wait for compressor (tar) process to finish or for any
- # error of other subprocesses
- i = 0
- run_error = "size_limit"
- scrypt = None
- while run_error == "size_limit":
- # Prepare a first chunk
- chunkfile = file_basename + ".%03d.enc" % i
- i += 1
- # Start encrypt, scrypt will also handle integrity
- # protection
- scrypt_passphrase = \
- '{backup_id}!{filename}!'.format(
- backup_id=self.backup_id,
- filename=os.path.relpath(chunkfile[:-4],
- self.tmpdir)).encode() + self.passphrase
- try:
- scrypt = yield from launch_scrypt(
- "enc", "-", chunkfile, scrypt_passphrase)
- run_error = yield from handle_streams(
- input_stream,
- scrypt.stdin,
- self.chunk_size,
- self._add_vm_progress
- )
- self.log.debug(
- "handle_streams returned: {}".format(run_error))
- except:
- scrypt.terminate()
- raise
- scrypt.stdin.close()
- yield from scrypt.wait()
- self.log.debug("scrypt return code: {}".format(
- scrypt.returncode))
- # Send the chunk to the backup target
- yield from output_queue.put(
- os.path.relpath(chunkfile, self.tmpdir))
- @asyncio.coroutine
- def _wrap_and_send_files(self, files_to_backup, output_queue):
- for vm_info in files_to_backup:
- for file_info in vm_info.files:
- self.log.debug("Backing up {}".format(file_info))
- backup_tempfile = os.path.join(
- self.tmpdir, file_info.subdir,
- file_info.name)
- self.log.debug("Using temporary location: {}".format(
- backup_tempfile))
- # Ensure the temporary directory exists
- if not os.path.isdir(os.path.dirname(backup_tempfile)):
- os.makedirs(os.path.dirname(backup_tempfile))
- # The first tar cmd can use any complex feature as we want.
- # Files will be verified before untaring this.
- # Prefix the path in archive with filename["subdir"] to have it
- # verified during untar
- tar_cmdline = (["tar", "-Pc", '--sparse',
- '-C', os.path.dirname(file_info.path)] +
- (['--dereference'] if
- file_info.subdir != "dom0-home/" else []) +
- ['--xform=s:^%s:%s\\0:' % (
- os.path.basename(file_info.path),
- file_info.subdir),
- os.path.basename(file_info.path)
- ])
- file_stat = os.stat(file_info.path)
- if stat.S_ISBLK(file_stat.st_mode) or \
- file_info.name != os.path.basename(file_info.path):
- # tar doesn't handle content of block device, use our
- # writer
- # also use our tar writer when renaming file
- assert not stat.S_ISDIR(file_stat.st_mode), \
- "Renaming directories not supported"
- tar_cmdline = ['python3', '-m', 'qubes.tarwriter',
- '--override-name=%s' % (
- os.path.join(file_info.subdir, os.path.basename(
- file_info.name))),
- file_info.path]
- if self.compressed:
- tar_cmdline.insert(-2,
- "--use-compress-program=%s" % self.compression_filter)
- self.log.debug(" ".join(tar_cmdline))
- # Pipe: tar-sparse | scrypt | tar | backup_target
- # TODO: log handle stderr
- # pylint: disable=not-an-iterable
- tar_sparse = yield from asyncio.create_subprocess_exec(
- *tar_cmdline, stdout=subprocess.PIPE)
- try:
- yield from self._split_and_send(
- tar_sparse.stdout,
- backup_tempfile,
- output_queue)
- except:
- try:
- tar_sparse.terminate()
- except ProcessLookupError:
- pass
- raise
- yield from tar_sparse.wait()
- if tar_sparse.returncode:
- raise qubes.exc.QubesException(
- 'Failed to archive {} file'.format(file_info.path))
- # This VM done, update progress
- self._done_vms_bytes += vm_info.size
- self._current_vm_bytes = 0
- self._send_progress_update()
- yield from output_queue.put(QUEUE_FINISHED)
- @staticmethod
- @asyncio.coroutine
- def _monitor_process(proc, error_message):
- try:
- yield from proc.wait()
- except:
- proc.terminate()
- raise
- if proc.returncode:
- if proc.stderr is not None:
- proc_stderr = (yield from proc.stderr.read())
- proc_stderr = proc_stderr.decode('ascii', errors='ignore')
- proc_stderr = ''.join(
- c for c in proc_stderr if c in string.printable and
- c not in '\r\n%{}')
- error_message += ': ' + proc_stderr
- raise qubes.exc.QubesException(error_message)
- @staticmethod
- @asyncio.coroutine
- def _cancel_on_error(future, previous_task):
- '''If further element of chain fail, cancel previous one to
- avoid deadlock.
- When earlier element of chain fail, it will be handled by
- :py:meth:`backup_do`.
- The chain is:
- :py:meth:`_wrap_and_send_files` -> :py:class:`SendWorker` -> vmproc
- '''
- try:
- yield from future
- except: # pylint: disable=bare-except
- previous_task.cancel()
- @asyncio.coroutine
- def backup_do(self):
- # pylint: disable=too-many-statements
- if self.passphrase is None:
- raise qubes.exc.QubesException("No passphrase set")
- if not isinstance(self.passphrase, bytes):
- self.passphrase = self.passphrase.encode('utf-8')
- qubes_xml = self.app.store
- self.tmpdir = tempfile.mkdtemp()
- shutil.copy(qubes_xml, os.path.join(self.tmpdir, 'qubes.xml'))
- qubes_xml = os.path.join(self.tmpdir, 'qubes.xml')
- backup_app = qubes.Qubes(qubes_xml, offline_mode=True)
- backup_app.events_enabled = False
- files_to_backup = self._files_to_backup
- # make sure backup_content isn't set initially
- for vm in backup_app.domains:
- vm.events_enabled = False
- vm.features['backup-content'] = False
- for qid, vm_info in files_to_backup.items():
- # VM is included in the backup
- backup_app.domains[qid].features['backup-content'] = True
- backup_app.domains[qid].features['backup-path'] = vm_info.subdir
- backup_app.domains[qid].features['backup-size'] = vm_info.size
- backup_app.save()
- del backup_app
- vmproc = None
- if self.target_vm is not None:
- # Prepare the backup target (Qubes service call)
- # If APPVM, STDOUT is a PIPE
- read_fd, write_fd = os.pipe()
- vmproc = yield from self.target_vm.run_service('qubes.Backup',
- stdin=read_fd,
- stderr=subprocess.PIPE,
- stdout=subprocess.DEVNULL)
- os.close(read_fd)
- os.write(write_fd, (self.target_dir.
- replace("\r", "").replace("\n", "") + "\n").encode())
- backup_stdout = write_fd
- else:
- # Prepare the backup target (local file)
- if os.path.isdir(self.target_dir):
- backup_target = self.target_dir + "/qubes-{0}". \
- format(time.strftime("%Y-%m-%dT%H%M%S"))
- else:
- backup_target = self.target_dir
- # Create the target directory
- if not os.path.exists(os.path.dirname(self.target_dir)):
- raise qubes.exc.QubesException(
- "ERROR: the backup directory for {0} does not exists".
- format(self.target_dir))
- # If not APPVM, STDOUT is a local file
- backup_stdout = open(backup_target, 'wb')
- # Tar with tape length does not deals well with stdout
- # (close stdout between two tapes)
- # For this reason, we will use named pipes instead
- self.log.debug("Working in {}".format(self.tmpdir))
- self.log.debug("Will backup: {}".format(files_to_backup))
- header_files = yield from self._prepare_backup_header()
- # Setup worker to send encrypted data chunks to the backup_target
- to_send = asyncio.Queue(10)
- send_proc = SendWorker(to_send, self.tmpdir, backup_stdout)
- send_task = asyncio.ensure_future(send_proc.run())
- vmproc_task = None
- if vmproc is not None:
- vmproc_task = asyncio.ensure_future(
- self._monitor_process(vmproc,
- 'Writing backup to VM {} failed'.format(
- self.target_vm.name)))
- asyncio.ensure_future(self._cancel_on_error(
- vmproc_task, send_task))
- for file_name in header_files:
- yield from to_send.put(file_name)
- qubes_xml_info = self.VMToBackup(
- None,
- [self.FileToBackup(qubes_xml, '')],
- ''
- )
- inner_archive_task = asyncio.ensure_future(
- self._wrap_and_send_files(
- itertools.chain([qubes_xml_info], files_to_backup.values()),
- to_send
- ))
- asyncio.ensure_future(
- self._cancel_on_error(send_task, inner_archive_task))
- try:
- try:
- yield from inner_archive_task
- except:
- yield from to_send.put(QUEUE_ERROR)
- # in fact we may be handling CancelledError, induced by
- # exception in send_task or vmproc_task (and propagated by
- # self._cancel_on_error call above); in such a case this
- # yield from will raise exception, covering CancelledError -
- # this is intended behaviour
- if vmproc_task:
- yield from vmproc_task
- yield from send_task
- raise
- yield from send_task
- finally:
- if isinstance(backup_stdout, int):
- os.close(backup_stdout)
- else:
- backup_stdout.close()
- try:
- if vmproc_task:
- yield from vmproc_task
- finally:
- shutil.rmtree(self.tmpdir)
- # Save date of last backup, only when backup succeeded
- for qid, vm_info in files_to_backup.items():
- if vm_info.vm:
- vm_info.vm.backup_timestamp = \
- int(datetime.datetime.now().strftime('%s'))
- self.app.save()
- @asyncio.coroutine
- def handle_streams(stream_in, stream_out, size_limit=None,
- progress_callback=None):
- '''
- Copy stream_in to all streams_out and monitor all mentioned processes.
- If any of them terminate with non-zero code, interrupt the process. Copy
- at most `size_limit` data (if given).
- :param stream_in: StreamReader object to read data from
- :param stream_out: StreamWriter object to write data to
- :param size_limit: int maximum data amount to process
- :param progress_callback: callable function to report progress, will be
- given copied data size (it should accumulate internally)
- :return: "size_limit" or None (no error)
- '''
- buffer_size = 409600
- bytes_copied = 0
- while True:
- if size_limit:
- to_copy = min(buffer_size, size_limit - bytes_copied)
- if to_copy <= 0:
- return "size_limit"
- else:
- to_copy = buffer_size
- buf = yield from stream_in.read(to_copy)
- if not buf:
- # done
- break
- if callable(progress_callback):
- progress_callback(len(buf))
- stream_out.write(buf)
- bytes_copied += len(buf)
- return None
- # vim:sw=4:et:
|