Merge branch 'paranoid-restore'

* paranoid-restore:
  backup/restore: make error reporting work also for StandaloneVM based DispVM
  backup/restore: better error detection for --paranoid-mode
  doc: document 'tag-created-vm-with' feature
  tests: add paranoid backup restore unit tests
  tests: remove extra empty lines
  tests: extend run_service mockup for pre-recorded output
  rpm: add BR: python3-lxml and python3-xcffib
  backup/restore: add option for unattended restore and extracting log
  tools: remove obsolete _want_app argument
  Add "paranoid restore" mode
  rpm/deb: add dependency on scrypt
  utils: add simple locking primitive
  backup/restore: use qfile-unpacker in a VM, request disk space monitoring
  backup/restore: option for alternative qrexec service
  backup/restore: improve error message about restoring tags
  backup/restore: distinguish dom0 by name
This commit is contained in:
Marek Marczykowski-Górecki 2020-08-07 02:55:27 +02:00
commit b04a14685c
No known key found for this signature in database
GPG Key ID: 063938BA42CFA724
18 changed files with 958 additions and 55 deletions

1
debian/control vendored
View File

@ -23,6 +23,7 @@ Package: qubes-core-admin-client
Architecture: any
Depends:
python3-qubesadmin,
scrypt,
${python:Depends},
${python3:Depends},
${misc:Depends}

View File

@ -87,7 +87,26 @@ Options
Read passphrase from file, or use '-' to read from stdin
.. option:: --location-is-service
Provided backup location is a qrexec service name (optionally with an
argument, separated by ``+``), instead of file path or a command.
.. option:: --paranoid-mode, --plan-b
Isolate restore process in a DisposableVM, defend against potentially
compromised backup. In this mode some parts of the backup are skipped,
specifically:
- dom0 home directory (desktop environment settings)
- PCI devices assignments
.. option:: --auto-close
When running with --paranoid-mode (see above), automatically close restore
progress window after the restore process is finished and display restore log
on the standard output. The log will be colored red if the standard output is
a terminal.
Authors
=======

View File

@ -218,6 +218,13 @@ other modes it is ignored.
Default: True
tag-created-vm-with
^^^^^^^^^^^^^^^^^^^
When a qube with this feature create a new VM, it gets extra tags listed in this
feature value (separated with space) automatically. Tags are added before qube
creation finishes.
Authors
-------

343
qubesadmin/backup/dispvm.py Normal file
View File

@ -0,0 +1,343 @@
#
# The Qubes OS Project, http://www.qubes-os.org
#
# Copyright (C) 2019 Marek Marczykowski-Górecki
# <marmarek@invisiblethingslab.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation; either version 2.1 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
"""Handle backup extraction using DisposableVM"""
import collections
import datetime
import itertools
import logging
import os
import string
import subprocess
#import typing
import qubesadmin
import qubesadmin.exc
import qubesadmin.utils
import qubesadmin.vm
LOCKFILE = '/var/run/qubes/backup-paranoid-restore.lock'
Option = collections.namedtuple('Option', ('opts', 'handler'))
# Convenient functions for 'handler' value of Option object
# (see RestoreInDisposableVM.arguments):
def handle_store_true(option, value):
"""Handle argument enabling an option (action="store_true")"""
if value:
return [option.opts[0]]
return []
def handle_store_false(option, value):
"""Handle argument disabling an option (action="false")"""
if not value:
return [option.opts[0]]
return []
def handle_verbose(option, value):
"""Handle argument --quiet / --verbose options (action="count")"""
if option.opts[0] == '--verbose':
value -= 1 # verbose defaults to 1
return [option.opts[0]] * value
def handle_store(option, value):
"""Handle argument with arbitrary string value (action="store")"""
if value:
return [option.opts[0], str(value)]
return []
def handle_append(option, value):
"""Handle argument with a list of values (action="append")"""
return itertools.chain(*([option.opts[0], v] for v in value))
def skip(_option, _value):
"""Skip argument"""
return []
class RestoreInDisposableVM:
"""Perform backup restore with actual archive extraction isolated
within DisposableVM"""
#dispvm: typing.Optional[qubesadmin.vm.QubesVM]
#: map of args attr -> original option
arguments = {
'quiet': Option(('--quiet', '-q'), handle_verbose),
'verbose': Option(('--verbose', '-v'), handle_verbose),
'verify_only': Option(('--verify-only',), handle_store_true),
'skip_broken': Option(('--skip-broken',), handle_store_true),
'ignore_missing': Option(('--ignore-missing',), handle_store_true),
'skip_conflicting': Option(('--skip-conflicting',), handle_store_true),
'rename_conflicting': Option(('--rename-conflicting',),
handle_store_true),
'exclude': Option(('--exclude', '-x'), handle_append),
'dom0_home': Option(('--skip-dom0-home',), handle_store_false),
'ignore_username_mismatch': Option(('--ignore-username-mismatch',),
handle_store_true),
'ignore_size_limit': Option(('--ignore-size-limit',),
handle_store_true),
'compression': Option(('--compression-filter', '-Z'), handle_store),
'appvm': Option(('--dest-vm', '-d'), handle_store),
'pass_file': Option(('--passphrase-file', '-p'), handle_store),
'location_is_service': Option(('--location-is-service',),
handle_store_true),
'paranoid_mode': Option(('--paranoid-mode', '--plan-b',), skip),
'auto_close': Option(('--auto-close',), skip),
# make the verification easier, those don't really matter
'help': Option(('--help', '-h'), skip),
'force_root': Option(('--force-root',), skip),
}
def __init__(self, app, args):
"""
:param app: Qubes() instance
:param args: namespace instance as with qvm-backup-restore arguments
parsed. See :py:module:`qubesadmin.tools.qvm_backup_restore`.
"""
self.app = app
self.args = args
# only one backup restore is allowed at the time, use constant names
#: name of DisposableVM using to extract the backup
self.dispvm_name = 'disp-backup-restore'
#: tag given to this DisposableVM - qrexec policy is configured for it
self.dispvm_tag = 'backup-restore-mgmt'
#: tag automatically added to restored VMs
self.restored_tag = 'backup-restore-in-progress'
#: tag added to a VM storing the backup archive
self.storage_tag = 'backup-restore-storage'
# FIXME: make it random, collision free
# (when considering non-disposable case)
self.backup_log_path = '/var/tmp/backup-restore.log'
self.terminal_app = ('xterm', '-hold', '-title', 'Backup restore', '-e',
'/bin/sh', '-c',
'("$0" "$@" 2>&1; echo exit code: $?) | tee {}'.
format(self.backup_log_path))
if args.auto_close:
# filter-out '-hold'
self.terminal_app = tuple(a for a in self.terminal_app
if a != '-hold')
self.dispvm = None
if args.appvm:
self.backup_storage_vm = self.app.domains[args.appvm]
else:
self.backup_storage_vm = self.app.domains['dom0']
self.storage_access_proc = None
self.storage_access_id = None
self.log = logging.getLogger('qubesadmin.backup.dispvm')
def clear_old_tags(self):
"""Remove tags from old restore operation"""
for domain in self.app.domains:
domain.tags.discard(self.restored_tag)
domain.tags.discard(self.dispvm_tag)
domain.tags.discard(self.storage_tag)
def create_dispvm(self):
"""Create DisposableVM used to restore"""
self.dispvm = self.app.add_new_vm('DispVM', self.dispvm_name, 'red',
template=self.app.management_dispvm)
self.dispvm.auto_cleanup = True
self.dispvm.features['tag-created-vm-with'] = self.restored_tag
def transfer_pass_file(self, path):
"""Copy passhprase file to the DisposableVM"""
subprocess.check_call(
['qvm-copy-to-vm', self.dispvm_name, path],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL)
return '/home/{}/QubesIncoming/{}/{}'.format(
self.dispvm.default_user,
os.uname()[1],
os.path.basename(path)
)
def register_backup_source(self):
"""Tell backup archive holding VM we want this content.
This function registers a backup source, receives a token needed to
access it (stored in *storage_access_id* attribute). The access is
revoked when connection referenced in *storage_access_proc* attribute
is closed.
"""
self.storage_access_proc = self.backup_storage_vm.run_service(
'qubes.RegisterBackupLocation', stdin=subprocess.PIPE,
stdout=subprocess.PIPE)
self.storage_access_proc.stdin.write(
(self.args.backup_location.
replace("\r", "").replace("\n", "") + "\n").encode())
self.storage_access_proc.stdin.flush()
storage_access_id = self.storage_access_proc.stdout.readline().strip()
allowed_chars = (string.ascii_letters + string.digits).encode()
if not storage_access_id or \
not all(c in allowed_chars for c in storage_access_id):
if self.storage_access_proc.returncode == 127:
raise qubesadmin.exc.QubesException(
'Backup source registration failed - qubes-core-agent '
'package too old?')
raise qubesadmin.exc.QubesException(
'Backup source registration failed - got invalid id')
self.storage_access_id = storage_access_id.decode('ascii')
# keep connection open, closing it invalidates the access
self.backup_storage_vm.tags.add(self.storage_tag)
def invalidate_backup_access(self):
"""Revoke access to backup archive"""
self.backup_storage_vm.tags.discard(self.storage_tag)
self.storage_access_proc.stdin.close()
self.storage_access_proc.wait()
def prepare_inner_args(self):
"""Prepare arguments for inner (in-DispVM) qvm-backup-restore command"""
new_options = []
new_positional_args = []
for attr, opt in self.arguments.items():
if not hasattr(self.args, attr):
continue
new_options.extend(opt.handler(opt, getattr(self.args, attr)))
new_options.append('--location-is-service')
# backup location, replace by qrexec service to be called
new_positional_args.append(
'qubes.RestoreById+' + self.storage_access_id)
if self.args.vms:
new_positional_args.extend(self.args.vms)
return new_options + new_positional_args
def finalize_tags(self):
"""Make sure all the restored VMs are marked with
restored-from-backup-xxx tag, then remove backup-restore-in-progress
tag"""
self.app.domains.clear_cache()
for domain in self.app.domains:
if 'backup-restore-in-progress' not in domain.tags:
continue
if not any(t.startswith('restored-from-backup-')
for t in domain.tags):
self.log.warning('Restored domain %s was not tagged with '
'restored-from-backup-* tag',
domain.name)
# add fallback tag
domain.tags.add('restored-from-backup-at-{}'.format(
datetime.date.strftime(datetime.date.today(), '%F')))
domain.tags.discard('backup-restore-in-progress')
@staticmethod
def sanitize_log(untrusted_log):
"""Replace characters potentially dangerouns to terminal in
a backup log"""
allowed_set = set(range(0x20, 0x7e))
allowed_set.update({0x0a})
return bytes(c if c in allowed_set else ord('.') for c in untrusted_log)
def extract_log(self):
"""Extract restore log from the DisposableVM"""
untrusted_backup_log, _ = self.dispvm.run_with_args(
'cat', self.backup_log_path,
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL)
backup_log = self.sanitize_log(untrusted_backup_log)
return backup_log
def run(self):
"""Run the backup restore operation"""
lock = qubesadmin.utils.LockFile(LOCKFILE, True)
lock.acquire()
try:
self.create_dispvm()
self.clear_old_tags()
self.register_backup_source()
self.dispvm.start()
self.dispvm.run_service_for_stdio('qubes.WaitForSession')
if self.args.pass_file:
self.args.pass_file = self.transfer_pass_file(
self.args.pass_file)
args = self.prepare_inner_args()
self.dispvm.tags.add(self.dispvm_tag)
self.dispvm.run_with_args(*self.terminal_app,
'qvm-backup-restore', *args,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL)
backup_log = self.extract_log()
last_line = backup_log.splitlines()[-1]
if not last_line.startswith(b'exit code:'):
raise qubesadmin.exc.BackupRestoreError(
'qvm-backup-restore did not reported exit code',
backup_log=backup_log)
try:
exit_code = int(last_line.split()[-1])
except ValueError:
raise qubesadmin.exc.BackupRestoreError(
'qvm-backup-restore reported unexpected exit code',
backup_log=backup_log)
if exit_code == 127:
raise qubesadmin.exc.QubesException(
'qvm-backup-restore tool '
'missing in {} template, install qubes-core-admin-client '
'package there'.format(
getattr(self.dispvm.template,
'template',
self.dispvm.template).name)
)
if exit_code != 0:
raise qubesadmin.exc.BackupRestoreError(
'qvm-backup-restore failed with {}'.format(exit_code),
backup_log=backup_log)
return backup_log
except subprocess.CalledProcessError as e:
if e.returncode == 127:
raise qubesadmin.exc.QubesException(
'{} missing in {} template, install it there '
'package there'.format(self.terminal_app[0],
self.dispvm.template.template.name)
)
try:
backup_log = self.extract_log()
except: # pylint: disable=bare-except
backup_log = None
raise qubesadmin.exc.BackupRestoreError(
'qvm-backup-restore failed with {}'.format(e.returncode),
backup_log=backup_log)
finally:
if self.dispvm is not None:
# first revoke permission, then cleanup
self.dispvm.tags.discard(self.dispvm_tag)
# autocleanup removes the VM
try:
self.dispvm.kill()
except qubesadmin.exc.QubesVMNotStartedError:
# delete it manually
del self.app.domains[self.dispvm]
self.finalize_tags()
lock.release()

View File

@ -910,7 +910,7 @@ class BackupRestore(object):
self.username = os.path.basename(subdir)
def __init__(self, app, backup_location, backup_vm, passphrase,
force_compression_filter=None):
location_is_service=False, force_compression_filter=None):
super(BackupRestore, self).__init__()
#: qubes.Qubes instance
@ -921,12 +921,16 @@ class BackupRestore(object):
#: VM from which backup should be retrieved
self.backup_vm = backup_vm
if backup_vm and backup_vm.qid == 0:
if backup_vm and backup_vm.name == 'dom0':
self.backup_vm = None
#: backup path, inside VM pointed by :py:attr:`backup_vm`
self.backup_location = backup_location
#: use alternative qrexec service to retrieve backup data, instead of
#: ``qubes.Restore`` with *backup_location* given on stdin
self.location_is_service = location_is_service
#: force using specific application for (de)compression, instead of
#: the one named in the backup header
self.force_compression_filter = force_compression_filter
@ -973,11 +977,14 @@ class BackupRestore(object):
vmproc = None
if self.backup_vm is not None:
# If APPVM, STDOUT is a PIPE
vmproc = self.backup_vm.run_service('qubes.Restore')
vmproc.stdin.write(
(self.backup_location.replace("\r", "").replace("\n",
"") + "\n").encode())
vmproc.stdin.flush()
if self.location_is_service:
vmproc = self.backup_vm.run_service(self.backup_location)
else:
vmproc = self.backup_vm.run_service('qubes.Restore')
vmproc.stdin.write(
(self.backup_location.replace("\r", "").replace("\n",
"") + "\n").encode())
vmproc.stdin.flush()
# Send to tar2qfile the VMs that should be extracted
vmproc.stdin.write((" ".join(filelist) + "\n").encode())
@ -985,9 +992,14 @@ class BackupRestore(object):
self.processes_to_kill_on_cancel.append(vmproc)
backup_stdin = vmproc.stdout
# FIXME use /usr/lib/qubes/qfile-unpacker in non-dom0
tar1_command = ['/usr/libexec/qubes/qfile-dom0-unpacker',
str(os.getuid()), self.tmpdir, '-v']
if isinstance(self.app, qubesadmin.app.QubesRemote):
qfile_unpacker_path = '/usr/lib/qubes/qfile-unpacker'
else:
qfile_unpacker_path = '/usr/libexec/qubes/qfile-dom0-unpacker'
# keep at least 500M free for decryption of a previous chunk
tar1_command = [qfile_unpacker_path,
str(os.getuid()), self.tmpdir, '-v',
'-w', str(500 * 1024 * 1024)]
else:
backup_stdin = open(self.backup_location, 'rb')
@ -2035,8 +2047,9 @@ class BackupRestore(object):
try:
new_vm.tags.add(tag)
except Exception as err: # pylint: disable=broad-except
self.log.error('Error adding tag %s to %s: %s',
tag, vm.name, err)
if tag not in new_vm.tags:
self.log.error('Error adding tag %s to %s: %s',
tag, vm.name, err)
for bus in vm.devices:
for backend_domain, ident in vm.devices[bus]:

View File

@ -154,6 +154,12 @@ class DeviceAlreadyAttached(QubesException, KeyError):
return QubesException.__str__(self)
class BackupRestoreError(QubesException):
'''Restoring a backup failed'''
def __init__(self, msg, backup_log=None):
super(BackupRestoreError, self).__init__(msg)
self.backup_log = backup_log
# pylint: disable=too-many-ancestors
class QubesDaemonNoResponseError(QubesDaemonCommunicationError):
'''Got empty response from qubesd'''

View File

@ -54,14 +54,12 @@ class TestVMCollection(dict):
class TestProcess(object):
def __init__(self, input_callback=None, stdout=None, stderr=None):
self.input_callback = input_callback
self.got_any_input = False
self.stdin = io.BytesIO()
# don't let anyone close it, before we get the value
self.stdin_close = self.stdin.close
if self.input_callback:
self.stdin.close = (
lambda: self.input_callback(self.stdin.getvalue()))
else:
self.stdin.close = lambda: None
self.stdin.close = self.store_input
self.stdin.flush = self.store_input
if stdout == subprocess.PIPE:
self.stdout = io.BytesIO()
else:
@ -72,6 +70,13 @@ class TestProcess(object):
self.stderr = stderr
self.returncode = 0
def store_input(self):
value = self.stdin.getvalue()
if (not self.got_any_input or value) and self.input_callback:
self.input_callback(self.stdin.getvalue())
self.got_any_input = True
self.stdin.truncate(0)
def communicate(self, input=None):
if input is not None:
self.stdin.write(input)
@ -102,11 +107,9 @@ class _AssertNotRaisesContext(object):
self.failureException = test_case.failureException
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, tb):
if exc_type is None:
return True
@ -121,14 +124,17 @@ class _AssertNotRaisesContext(object):
class QubesTest(qubesadmin.app.QubesBase):
expected_service_calls = None
expected_calls = None
actual_calls = None
service_calls = None
def __init__(self):
super(QubesTest, self).__init__()
#: expected calls and saved replies for them
#: expected Admin API calls and saved replies for them
self.expected_calls = {}
#: expected qrexec service calls and saved replies for them
self.expected_service_calls = {}
#: actual calls made
self.actual_calls = []
#: rpc service calls
@ -152,6 +158,14 @@ class QubesTest(qubesadmin.app.QubesBase):
def run_service(self, dest, service, **kwargs):
self.service_calls.append((dest, service, kwargs))
call_key = (dest, service)
# TODO: consider it as a future extension, as a replacement for
# checking app.service_calls later
# if call_key not in self.expected_service_calls:
# raise AssertionError('Unexpected service call {!r}'.format(call_key))
if call_key in self.expected_service_calls:
kwargs = kwargs.copy()
kwargs['stdout'] = io.BytesIO(self.expected_service_calls[call_key])
return TestProcess(lambda input: self.service_calls.append((dest,
service, input)),
stdout=kwargs.get('stdout', None),

View File

@ -1440,8 +1440,14 @@ class TC_10_BackupCompatibility(qubesadmin.tests.backup.BackupTestCase):
str(value).encode())] = b'0\0'
for tag in vm['tags']:
self.app.expected_calls[
(name, 'admin.vm.tag.Set', tag, None)] = b'0\0'
if tag.startswith('created-by-'):
self.app.expected_calls[
(name, 'admin.vm.tag.Set', tag, None)] = b''
self.app.expected_calls[
(name, 'admin.vm.tag.Get', tag, None)] = b'0\0001'
else:
self.app.expected_calls[
(name, 'admin.vm.tag.Set', tag, None)] = b'0\0'
if vm['backup_path']:
appmenus = (
@ -1727,7 +1733,8 @@ class TC_10_BackupCompatibility(qubesadmin.tests.backup.BackupTestCase):
# retrieve calls from other multiprocess.Process instances
while not qubesd_calls_queue.empty():
call_args = qubesd_calls_queue.get()
self.app.qubesd_call(*call_args)
with contextlib.suppress(qubesadmin.exc.QubesException):
self.app.qubesd_call(*call_args)
qubesd_calls_queue.close()
self.assertAllCalled()
@ -1797,7 +1804,8 @@ class TC_10_BackupCompatibility(qubesadmin.tests.backup.BackupTestCase):
# retrieve calls from other multiprocess.Process instances
while not qubesd_calls_queue.empty():
call_args = qubesd_calls_queue.get()
self.app.qubesd_call(*call_args)
with contextlib.suppress(qubesadmin.exc.QubesException):
self.app.qubesd_call(*call_args)
qubesd_calls_queue.close()
self.assertAllCalled()
@ -1867,7 +1875,8 @@ class TC_10_BackupCompatibility(qubesadmin.tests.backup.BackupTestCase):
# retrieve calls from other multiprocess.Process instances
while not qubesd_calls_queue.empty():
call_args = qubesd_calls_queue.get()
self.app.qubesd_call(*call_args)
with contextlib.suppress(qubesadmin.exc.QubesException):
self.app.qubesd_call(*call_args)
qubesd_calls_queue.close()
self.assertAllCalled()
@ -1968,7 +1977,8 @@ class TC_10_BackupCompatibility(qubesadmin.tests.backup.BackupTestCase):
# retrieve calls from other multiprocess.Process instances
while not qubesd_calls_queue.empty():
call_args = qubesd_calls_queue.get()
self.app.qubesd_call(*call_args)
with contextlib.suppress(qubesadmin.exc.QubesException):
self.app.qubesd_call(*call_args)
qubesd_calls_queue.close()
self.assertAllCalled()

View File

@ -0,0 +1,409 @@
# -*- encoding: utf8 -*-
#
# The Qubes OS Project, http://www.qubes-os.org
#
# Copyright (C) 2019 Marek Marczykowski-Górecki
# <marmarek@invisiblethingslab.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation; either version 2.1 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with this program; if not, see <http://www.gnu.org/licenses/>.
import datetime
import tempfile
import unittest
import unittest.mock
from unittest.mock import call
import subprocess
import qubesadmin.tests
from qubesadmin.tools import qvm_backup_restore
from qubesadmin.backup.dispvm import RestoreInDisposableVM
class TC_00_RestoreInDispVM(qubesadmin.tests.QubesTestCase):
def setUp(self):
super().setUp()
def test_000_prepare_inner_args(self):
self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = (
b'0\0dom0 class=AdminVM state=Running\n'
b'fedora-25 class=TemplateVM state=Halted\n'
b'testvm class=AppVM state=Running\n'
)
argv = ['--verbose', '--skip-broken', '--skip-dom0-home',
'--dest-vm', 'testvm',
'--compression-filter', 'gzip', '/backup/location']
args = qvm_backup_restore.parser.parse_args(argv)
obj = RestoreInDisposableVM(self.app, args)
obj.storage_access_id = 'abc'
reconstructed_argv = obj.prepare_inner_args()
expected_argv = argv[:-1] + \
['--location-is-service', 'qubes.RestoreById+abc']
self.assertCountEqual(expected_argv, reconstructed_argv)
def test_001_prepare_inner_args_exclude(self):
self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = (
b'0\0dom0 class=AdminVM state=Running\n'
b'fedora-25 class=TemplateVM state=Halted\n'
b'testvm class=AppVM state=Running\n'
)
argv = ['--exclude', 'vm1', '--exclude', 'vm2',
'/backup/location']
args = qvm_backup_restore.parser.parse_args(argv)
print(repr(args))
obj = RestoreInDisposableVM(self.app, args)
obj.storage_access_id = 'abc'
reconstructed_argv = obj.prepare_inner_args()
expected_argv = argv[:-1] + \
['--location-is-service', 'qubes.RestoreById+abc']
self.assertCountEqual(expected_argv, reconstructed_argv)
def test_002_prepare_inner_args_pass_file(self):
self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = (
b'0\0dom0 class=AdminVM state=Running\n'
b'fedora-25 class=TemplateVM state=Halted\n'
b'testvm class=AppVM state=Running\n'
)
argv = ['--passphrase-file=/tmp/some/file',
'/backup/location']
args = qvm_backup_restore.parser.parse_args(argv)
print(repr(args))
obj = RestoreInDisposableVM(self.app, args)
obj.storage_access_id = 'abc'
reconstructed_argv = obj.prepare_inner_args()
expected_argv = ['--passphrase-file', '/tmp/some/file',
'--location-is-service', 'qubes.RestoreById+abc']
self.assertEqual(expected_argv, reconstructed_argv)
def test_003_prepare_inner_args_auto_close(self):
self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = (
b'0\0dom0 class=AdminVM state=Running\n'
b'fedora-25 class=TemplateVM state=Halted\n'
b'testvm class=AppVM state=Running\n'
)
argv = ['--auto-close', '/backup/location']
args = qvm_backup_restore.parser.parse_args(argv)
print(repr(args))
obj = RestoreInDisposableVM(self.app, args)
obj.storage_access_id = 'abc'
reconstructed_argv = obj.prepare_inner_args()
expected_argv = ['--location-is-service', 'qubes.RestoreById+abc']
self.assertEqual(expected_argv, reconstructed_argv)
def test_010_clear_old_tags(self):
self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = (
b'0\0dom0 class=AdminVM state=Running\n'
b'fedora-25 class=TemplateVM state=Halted\n'
b'testvm class=AppVM state=Running\n'
)
for tag in ('backup-restore-mgmt',
'backup-restore-in-progress',
'backup-restore-storage'):
self.app.expected_calls[
('dom0', 'admin.vm.tag.Remove', tag, None)] = \
b'2\x00QubesTagNotFoundError\x00\x00Tag not found\x00'
self.app.expected_calls[
('fedora-25', 'admin.vm.tag.Remove', tag, None)] = b'0\0'
self.app.expected_calls[
('testvm', 'admin.vm.tag.Remove', tag, None)] = b'0\0'
args = unittest.mock.Mock(appvm='testvm')
obj = RestoreInDisposableVM(self.app, args)
obj.clear_old_tags()
self.assertAllCalled()
@unittest.mock.patch('subprocess.check_call')
def test_020_create_dispvm(self, mock_check_call):
self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = (
b'0\0dom0 class=AdminVM state=Running\n'
b'fedora-25 class=TemplateVM state=Halted\n'
b'testvm class=AppVM state=Running\n'
b'mgmt-dvm class=AppVM state=Halted\n'
# this should be only after creating...
b'disp-backup-restore class=DispVM state=Halted\n'
)
self.app.expected_calls[
('dom0', 'admin.property.Get', 'management_dispvm', None)] = \
b'0\0default=False type=vm mgmt-dvm'
self.app.expected_calls[
('dom0', 'admin.vm.Create.DispVM', 'mgmt-dvm',
b'name=disp-backup-restore label=red')] = b'0\0'
self.app.expected_calls[
('disp-backup-restore', 'admin.vm.property.Set', 'auto_cleanup',
b'True')] = \
b'0\0'
self.app.expected_calls[
('disp-backup-restore', 'admin.vm.feature.Set', 'tag-created-vm-with',
b'backup-restore-in-progress')] = \
b'0\0'
args = unittest.mock.Mock(appvm='dom0')
obj = RestoreInDisposableVM(self.app, args)
obj.create_dispvm()
self.assertAllCalled()
@unittest.mock.patch('subprocess.check_call')
@unittest.mock.patch('os.uname')
def test_030_transfer_pass_file(self, mock_uname, mock_check_call):
self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = (
b'0\0dom0 class=AdminVM state=Running\n'
b'fedora-25 class=TemplateVM state=Halted\n'
b'testvm class=AppVM state=Running\n'
)
mock_uname.return_value = ('Linux', 'dom0', '5.0.0', '#1', 'x86_64')
args = unittest.mock.Mock(appvm='testvm')
obj = RestoreInDisposableVM(self.app, args)
obj.dispvm = unittest.mock.Mock(default_user='user2')
new_path = obj.transfer_pass_file('/some/path')
self.assertEqual(new_path, '/home/user2/QubesIncoming/dom0/path')
mock_check_call.assert_called_once_with(
['qvm-copy-to-vm', 'disp-backup-restore', '/some/path'],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL)
self.assertAllCalled()
def test_040_register_backup_source(self):
self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = (
b'0\0dom0 class=AdminVM state=Running\n'
b'fedora-25 class=TemplateVM state=Halted\n'
b'backup-storage class=AppVM state=Running\n'
)
self.app.expected_service_calls[
('backup-storage', 'qubes.RegisterBackupLocation')] = \
b'someid\nsomething that should not be read'
self.app.expected_calls[
('backup-storage', 'admin.vm.tag.Set', 'backup-restore-storage',
None)] = b'0\0'
args = unittest.mock.Mock(backup_location='/backup/path',
appvm='backup-storage')
obj = RestoreInDisposableVM(self.app, args)
obj.dispvm = unittest.mock.Mock(default_user='user2')
obj.register_backup_source()
self.assertEqual(obj.storage_access_id, 'someid')
self.assertEqual(self.app.service_calls, [
('backup-storage', 'qubes.RegisterBackupLocation',
{'stdin':subprocess.PIPE, 'stdout':subprocess.PIPE}),
('backup-storage', 'qubes.RegisterBackupLocation', b'/backup/path\n'),
])
self.assertAllCalled()
def test_050_invalidate_backup_access(self):
self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = (
b'0\0dom0 class=AdminVM state=Running\n'
b'fedora-25 class=TemplateVM state=Halted\n'
b'backup-storage class=AppVM state=Running\n'
)
self.app.expected_calls[
('backup-storage', 'admin.vm.tag.Remove', 'backup-restore-storage',
None)] = b'0\0'
args = unittest.mock.Mock(backup_location='/backup/path',
appvm='backup-storage')
obj = RestoreInDisposableVM(self.app, args)
obj.storage_access_proc = unittest.mock.Mock()
obj.invalidate_backup_access()
self.assertEqual(obj.storage_access_proc.mock_calls, [
call.stdin.close(),
call.wait(),
])
self.assertAllCalled()
@unittest.mock.patch('datetime.date')
def test_060_finalize_tags(self, mock_date):
self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = (
b'0\0dom0 class=AdminVM state=Running\n'
b'fedora-25 class=TemplateVM state=Halted\n'
b'disp-backup-restore class=DispVM state=Running\n'
b'restored1 class=AppVM state=Halted\n'
b'restored2 class=AppVM state=Halted\n'
)
self.app.expected_calls[
('dom0', 'admin.vm.tag.Get', 'backup-restore-in-progress',
None)] = b'0\x000'
self.app.expected_calls[
('fedora-25', 'admin.vm.tag.Get', 'backup-restore-in-progress',
None)] = b'0\x000'
self.app.expected_calls[
('disp-backup-restore', 'admin.vm.tag.Get', 'backup-restore-in-progress',
None)] = b'0\x000'
self.app.expected_calls[
('restored1', 'admin.vm.tag.Get', 'backup-restore-in-progress',
None)] = b'0\x001'
self.app.expected_calls[
('restored1', 'admin.vm.tag.List', None, None)] = \
b'0\0backup-restore-in-progress\n' \
b'restored-from-backup-12345678\n' \
b'created-by-disp-backup-restore\n'
self.app.expected_calls[
('restored1', 'admin.vm.tag.Remove', 'backup-restore-in-progress',
None)] = b'0\0'
self.app.expected_calls[
('restored2', 'admin.vm.tag.Get', 'backup-restore-in-progress',
None)] = b'0\x001'
self.app.expected_calls[
('restored2', 'admin.vm.tag.List', None, None)] = \
b'0\0backup-restore-in-progress\n' \
b'created-by-disp-backup-restore\n'
self.app.expected_calls[
('restored2', 'admin.vm.tag.Set',
'restored-from-backup-at-2019-10-01',
None)] = b'0\0'
self.app.expected_calls[
('restored2', 'admin.vm.tag.Remove', 'backup-restore-in-progress',
None)] = b'0\0'
mock_date.today.return_value = datetime.date.fromisoformat('2019-10-01')
mock_date.strftime.return_value = '2019-10-01'
args = unittest.mock.Mock(backup_location='/backup/path',
appvm=None)
obj = RestoreInDisposableVM(self.app, args)
obj.finalize_tags()
self.assertAllCalled()
def test_070_sanitize_log(self):
sanitized = RestoreInDisposableVM.sanitize_log(b'sample message')
self.assertEqual(sanitized, b'sample message')
sanitized = RestoreInDisposableVM.sanitize_log(
b'sample message\nmultiline\n')
self.assertEqual(sanitized, b'sample message\nmultiline\n')
sanitized = RestoreInDisposableVM.sanitize_log(
b'\033[0;33m\xff\xfe\x80')
self.assertEqual(sanitized, b'.[0;33m...')
def test_080_extract_log(self):
self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = (
b'0\0dom0 class=AdminVM state=Running\n'
b'fedora-25 class=TemplateVM state=Halted\n'
)
args = unittest.mock.Mock(backup_location='/backup/path',
appvm=None)
obj = RestoreInDisposableVM(self.app, args)
obj.dispvm = unittest.mock.Mock()
obj.dispvm.run_with_args.return_value = b'this is a log', None
backup_log = obj.extract_log()
obj.dispvm.run_with_args.assert_called_once_with(
'cat', '/var/tmp/backup-restore.log',
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL)
self.assertEqual(backup_log, b'this is a log')
def test_100_run(self):
self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = (
b'0\0dom0 class=AdminVM state=Running\n'
b'fedora-25 class=TemplateVM state=Halted\n'
)
args = unittest.mock.Mock(backup_location='/backup/path',
pass_file=None,
appvm=None)
obj = RestoreInDisposableVM(self.app, args)
methods = ['create_dispvm', 'clear_old_tags', 'register_backup_source',
'prepare_inner_args', 'extract_log', 'finalize_tags']
for m in methods:
setattr(obj, m, unittest.mock.Mock())
obj.extract_log.return_value = b'Some logs\nexit code: 0\n'
obj.transfer_pass_file = unittest.mock.Mock()
obj.prepare_inner_args.return_value = ['args']
obj.terminal_app = ('terminal',)
obj.dispvm = unittest.mock.Mock()
with tempfile.NamedTemporaryFile() as tmp:
with unittest.mock.patch('qubesadmin.backup.dispvm.LOCKFILE',
tmp.name):
obj.run()
for m in methods:
self.assertEqual(len(getattr(obj, m).mock_calls), 1)
self.assertEqual(obj.dispvm.mock_calls, [
call.start(),
call.run_service_for_stdio('qubes.WaitForSession'),
call.tags.add('backup-restore-mgmt'),
call.run_with_args('terminal', 'qvm-backup-restore', 'args',
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL),
call.tags.discard('backup-restore-mgmt'),
call.kill()
])
obj.transfer_pass_file.assert_not_called()
def test_101_run_pass_file(self):
self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = (
b'0\0dom0 class=AdminVM state=Running\n'
b'fedora-25 class=TemplateVM state=Halted\n'
)
args = unittest.mock.Mock(backup_location='/backup/path',
pass_file='/some/path',
appvm=None)
obj = RestoreInDisposableVM(self.app, args)
methods = ['create_dispvm', 'clear_old_tags', 'register_backup_source',
'prepare_inner_args', 'extract_log', 'finalize_tags',
'transfer_pass_file']
for m in methods:
setattr(obj, m, unittest.mock.Mock())
obj.extract_log.return_value = b'Some logs\nexit code: 0\n'
obj.prepare_inner_args.return_value = ['args']
obj.terminal_app = ('terminal',)
obj.dispvm = unittest.mock.Mock()
with tempfile.NamedTemporaryFile() as tmp:
with unittest.mock.patch('qubesadmin.backup.dispvm.LOCKFILE',
tmp.name):
obj.run()
for m in methods:
self.assertEqual(len(getattr(obj, m).mock_calls), 1)
self.assertEqual(obj.dispvm.mock_calls, [
call.start(),
call.run_service_for_stdio('qubes.WaitForSession'),
call.tags.add('backup-restore-mgmt'),
call.run_with_args('terminal', 'qvm-backup-restore', 'args',
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL),
call.tags.discard('backup-restore-mgmt'),
call.kill()
])
def test_102_run_error(self):
self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = (
b'0\0dom0 class=AdminVM state=Running\n'
b'fedora-25 class=TemplateVM state=Halted\n'
)
args = unittest.mock.Mock(backup_location='/backup/path',
pass_file=None,
appvm=None)
obj = RestoreInDisposableVM(self.app, args)
methods = ['create_dispvm', 'clear_old_tags', 'register_backup_source',
'prepare_inner_args', 'extract_log', 'finalize_tags']
for m in methods:
setattr(obj, m, unittest.mock.Mock())
obj.extract_log.return_value = b'Some error\nexit code: 1\n'
obj.transfer_pass_file = unittest.mock.Mock()
obj.prepare_inner_args.return_value = ['args']
obj.terminal_app = ('terminal',)
obj.dispvm = unittest.mock.Mock()
with tempfile.NamedTemporaryFile() as tmp:
with unittest.mock.patch('qubesadmin.backup.dispvm.LOCKFILE',
tmp.name):
with self.assertRaises(qubesadmin.exc.BackupRestoreError):
obj.run()
for m in methods:
self.assertEqual(len(getattr(obj, m).mock_calls), 1)
self.assertEqual(obj.dispvm.mock_calls, [
call.start(),
call.run_service_for_stdio('qubes.WaitForSession'),
call.tags.add('backup-restore-mgmt'),
call.run_with_args('terminal', 'qvm-backup-restore', 'args',
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL),
call.tags.discard('backup-restore-mgmt'),
call.kill()
])
obj.transfer_pass_file.assert_not_called()

View File

@ -17,12 +17,15 @@
#
# You should have received a copy of the GNU Lesser General Public License along
# with this program; if not, see <http://www.gnu.org/licenses/>.
import itertools
import qubesadmin.tests
import qubesadmin.tests.tools
import qubesadmin.tools.qvm_backup_restore
from unittest import mock
from qubesadmin.backup import BackupVM
from qubesadmin.backup.restore import BackupRestore
from qubesadmin.backup.dispvm import RestoreInDisposableVM
class TC_00_qvm_backup_restore(qubesadmin.tests.QubesTestCase):
@ -59,7 +62,7 @@ class TC_00_qvm_backup_restore(qubesadmin.tests.QubesTestCase):
self.app, mock.ANY, mock_restore_info)
mock_backup.assert_called_once_with(
self.app, '/some/path', None, 'testpass',
force_compression_filter=None)
force_compression_filter=None, location_is_service=False)
self.assertAllCalled()
@mock.patch('qubesadmin.tools.qvm_backup_restore.input', create=True)
@ -94,7 +97,7 @@ class TC_00_qvm_backup_restore(qubesadmin.tests.QubesTestCase):
app=self.app)
mock_backup.assert_called_once_with(
self.app, '/some/path', None, 'testpass',
force_compression_filter=None)
force_compression_filter=None, location_is_service=False)
self.assertEqual(mock_backup.return_value.options.exclude, ['test-vm2'])
self.assertAllCalled()
@ -231,3 +234,14 @@ class TC_00_qvm_backup_restore(qubesadmin.tests.QubesTestCase):
qubesadmin.tools.qvm_backup_restore.handle_broken(
self.app, mock_args, mock_restore_info)
self.assertAppropriateLogging('NetVM', 'error')
def test_100_restore_in_dispvm_parser(self):
"""Verify if qvm-backup-restore tool options matches un-parser
for paranoid restore mode"""
parser = qubesadmin.tools.qvm_backup_restore.parser
actions = parser._get_optional_actions()
options_tool = set(itertools.chain(*(a.option_strings for a in actions)))
options_parser = set(itertools.chain(
*(o.opts for o in RestoreInDisposableVM.arguments.values())))
self.assertEqual(options_tool, options_parser)

View File

@ -322,9 +322,6 @@ class PoolsAction(QubesAction):
class QubesArgumentParser(argparse.ArgumentParser):
'''Parser preconfigured for use in most of the Qubes command-line tools.
:param bool want_app: instantiate :py:class:`qubes.Qubes` object
:param bool want_app_no_instance: don't actually instantiate \
:py:class:`qubes.Qubes` object, just add argument for custom xml file
:param mixed vmname_nargs: The number of ``VMNAME`` arguments that should be
consumed. Values include:
* N (an integer) consumes N arguments (and produces a list)
@ -340,20 +337,11 @@ class QubesArgumentParser(argparse.ArgumentParser):
``--verbose`` and ``--quiet``
'''
def __init__(self, want_app=True, want_app_no_instance=False,
vmname_nargs=None, **kwargs):
def __init__(self, vmname_nargs=None, **kwargs):
super(QubesArgumentParser, self).__init__(add_help=False, **kwargs)
self._want_app = want_app
self._want_app_no_instance = want_app_no_instance
self._vmname_nargs = vmname_nargs
if self._want_app:
self.add_argument('--qubesxml', metavar='FILE', action='store',
dest='app', help=argparse.SUPPRESS)
self.add_argument('--offline-mode', action='store_true',
default=None, dest='offline_mode', help=argparse.SUPPRESS)
self.add_argument('--verbose', '-v', action='count',
help='increase verbosity')
@ -384,12 +372,11 @@ class QubesArgumentParser(argparse.ArgumentParser):
app = kwargs.pop('app', None)
namespace = super(QubesArgumentParser, self).parse_args(*args, **kwargs)
if self._want_app and not self._want_app_no_instance:
self.set_qubes_verbosity(namespace)
if app is not None:
namespace.app = app
else:
namespace.app = qubesadmin.Qubes()
self.set_qubes_verbosity(namespace)
if app is not None:
namespace.app = app
else:
namespace.app = qubesadmin.Qubes()
for action in self._actions:
# pylint: disable=protected-access

View File

@ -21,15 +21,21 @@
'''Console frontend for backup restore code'''
import getpass
import os
import sys
from qubesadmin.backup.restore import BackupRestore
from qubesadmin.backup.dispvm import RestoreInDisposableVM
import qubesadmin.exc
import qubesadmin.tools
import qubesadmin.utils
parser = qubesadmin.tools.QubesArgumentParser()
# WARNING:
# When adding options, update/verify also
# qubeadmin.restore.dispvm.RestoreInDisposableVM.arguments
#
parser.add_argument("--verify-only", action="store_true",
dest="verify_only", default=False,
help="Verify backup integrity without restoring any "
@ -84,6 +90,18 @@ parser.add_argument("-p", "--passphrase-file", action="store",
dest="pass_file", default=None,
help="Read passphrase from file, or use '-' to read from stdin")
parser.add_argument('--auto-close', action="store_true",
help="Auto-close restore window and display log on the stdout "
"(applies to --paranoid-mode)")
parser.add_argument("--location-is-service", action="store_true",
help="Interpret backup location as a qrexec service name,"
"possibly with an argument separated by +.Requires -d option.")
parser.add_argument('--paranoid-mode', '--plan-b', action="store_true",
help="Isolate restore process in a DispVM, defend against untrusted backup;"
"implies --skip-dom0-home")
parser.add_argument('backup_location', action='store',
help="Backup directory name, or command to pipe from")
@ -193,6 +211,18 @@ def handle_broken(app, args, restore_info):
"files should be copied or moved out of the new "
"directory before using them.")
def print_backup_log(backup_log):
"""Print a log on stdout, coloring it red if it's a terminal"""
if os.isatty(sys.stdout.fileno()):
sys.stdout.write('\033[0;31m')
sys.stdout.flush()
sys.stdout.buffer.write(backup_log)
if os.isatty(sys.stdout.fileno()):
sys.stdout.write('\033[0m')
sys.stdout.flush()
def main(args=None, app=None):
'''Main function of qvm-backup-restore'''
# pylint: disable=too-many-return-statements
@ -205,6 +235,29 @@ def main(args=None, app=None):
except KeyError:
parser.error('no such domain: {!r}'.format(args.appvm))
if args.location_is_service and not args.appvm:
parser.error('--location-is-service option requires -d')
if args.paranoid_mode:
args.dom0_home = False
args.app.log.info("Starting restore process in a DisposableVM...")
args.app.log.info("When operation completes, close its window "
"manually.")
restore_in_dispvm = RestoreInDisposableVM(args.app, args)
try:
backup_log = restore_in_dispvm.run()
if args.auto_close:
print_backup_log(backup_log)
except qubesadmin.exc.BackupRestoreError as e:
if e.backup_log is not None:
print_backup_log(e.backup_log)
parser.error_runtime(str(e))
return 1
except qubesadmin.exc.QubesException as e:
parser.error_runtime(str(e))
return 1
return
if args.pass_file is not None:
pass_f = open(args.pass_file) if args.pass_file != "-" else sys.stdin
passphrase = pass_f.readline().rstrip()
@ -218,7 +271,7 @@ def main(args=None, app=None):
try:
backup = BackupRestore(args.app, args.backup_location,
appvm, passphrase,
appvm, passphrase, location_is_service=args.location_is_service,
force_compression_filter=args.compression)
except qubesadmin.exc.QubesException as e:
parser.error_runtime(str(e))

View File

@ -207,8 +207,7 @@ def get_parser(device_class=None):
"""Create :py:class:`argparse.ArgumentParser` suitable for
:program:`qvm-block`.
"""
parser = qubesadmin.tools.QubesArgumentParser(description=__doc__,
want_app=True)
parser = qubesadmin.tools.QubesArgumentParser(description=__doc__)
parser.register('action', 'parsers',
qubesadmin.tools.AliasedSubParsersAction)
parser.allow_abbrev = False

View File

@ -156,8 +156,7 @@ def get_parser():
''' Creates :py:class:`argparse.ArgumentParser` suitable for
:program:`qvm-pool`.
'''
parser = qubesadmin.tools.QubesArgumentParser(description=__doc__,
want_app=True)
parser = qubesadmin.tools.QubesArgumentParser(description=__doc__)
parser.register('action', 'parsers',
qubesadmin.tools.AliasedSubParsersAction)

View File

@ -28,7 +28,6 @@ from qubesadmin.tools import QubesArgumentParser
import qubesadmin.utils
parser = QubesArgumentParser(description=__doc__,
want_app=True,
vmname_nargs='+')
parser.add_argument("--force", "-f", action="store_true", dest="no_confirm",
default=False, help="Do not prompt for confirmation")

View File

@ -323,8 +323,7 @@ def get_parser():
'''Create :py:class:`argparse.ArgumentParser` suitable for
:program:`qvm-volume`.
'''
parser = qubesadmin.tools.QubesArgumentParser(description=__doc__,
want_app=True)
parser = qubesadmin.tools.QubesArgumentParser(description=__doc__)
parser.register('action', 'parsers',
qubesadmin.tools.AliasedSubParsersAction)
sub_parsers = parser.add_subparsers(

View File

@ -23,6 +23,8 @@
#
"""Various utility functions."""
import fcntl
import os
import re
@ -164,3 +166,29 @@ def encode_for_vmexec(args):
part = re.sub(br'[^a-zA-Z0-9_.]', encode, arg.encode('utf-8'))
parts.append(part)
return b'+'.join(parts).decode('ascii')
class LockFile(object):
"""Simple locking context manager. It opens a file with an advisory lock
taken (fcntl.lockf)"""
def __init__(self, path, nonblock=False):
"""Open the file. Call *acquire* or enter the context to lock
the file"""
self.file = open(path, "w")
self.nonblock = nonblock
def __enter__(self, *args, **kwargs):
self.acquire()
return self
def acquire(self):
"""Lock the opened file"""
fcntl.lockf(self.file,
fcntl.LOCK_EX | (fcntl.LOCK_NB if self.nonblock else 0))
def __exit__(self, exc_type=None, exc_value=None, traceback=None):
self.release()
def release(self):
"""Unlock the file and close the file object"""
fcntl.lockf(self.file, fcntl.LOCK_UN)
self.file.close()

View File

@ -11,8 +11,11 @@ BuildRequires: python%{python3_pkgversion}-setuptools
BuildRequires: python%{python3_pkgversion}-devel
BuildRequires: python%{python3_pkgversion}-sphinx
BuildRequires: python%{python3_pkgversion}-dbus
BuildRequires: python%{python3_pkgversion}-lxml
BuildRequires: python%{python3_pkgversion}-xcffib
Requires: python%{python3_pkgversion}-qubesadmin
Requires: python%{python3_pkgversion}-yaml
Requires: scrypt
BuildArch: noarch
Source0: %{name}-%{version}.tar.gz
Conflicts: qubes-core-agent < 4.1.9