Merge remote-tracking branch 'origin/master' into qvm-template

This commit is contained in:
WillyPillow 2020-08-31 01:49:47 +08:00
commit fbf6c4e3c3
No known key found for this signature in database
GPG Key ID: 3839E194B1415A9C
55 changed files with 2065 additions and 449 deletions

View File

@ -13,6 +13,7 @@ extension-pkg-whitelist=lxml.etree
disable=
bad-continuation,
raising-format-tuple,
raise-missing-from,
import-outside-toplevel,
inconsistent-return-statements,
duplicate-code,

View File

@ -1,50 +1,48 @@
sudo: required
dist: bionic
language: python
python:
- '3.5'
- '3.6'
- '3.7'
install:
- pip install --quiet docutils
- pip install --quiet -r ci/requirements.txt
- git clone https://github.com/"${TRAVIS_REPO_SLUG%%/*}"/qubes-builder ~/qubes-builder
script:
- test -z "$TESTS_ONLY" || python setup.py build
- test -z "$TESTS_ONLY" || { cd build/lib; PYTHONPATH=../../test-packages pylint --rcfile=../../.pylintrc qubesadmin; }
- test -z "$TESTS_ONLY" || { cd build/lib; ROOTDIR=../.. ../../run-tests; }
- test -n "$TESTS_ONLY" || ~/qubes-builder/scripts/travis-build
env:
- TESTS_ONLY=1 ENABLE_SLOW_TESTS=1
after_success:
- codecov
import:
- source: QubesOS/qubes-continuous-integration:R4.1/travis-base-r4.1.yml
mode: deep_merge_prepend
- source: QubesOS/qubes-continuous-integration:R4.1/travis-dom0-r4.1.yml
- source: QubesOS/qubes-continuous-integration:R4.1/travis-vms-r4.1.yml
jobs:
include:
- env: DIST_DOM0=fc25 USE_QUBES_REPO_VERSION=4.0 USE_QUBES_REPO_TESTING=1 TESTS_ONLY=
python: '3.5'
- env: DISTS_VM=fc29 USE_QUBES_REPO_VERSION=4.0 USE_QUBES_REPO_TESTING=1 TESTS_ONLY=
python: '3.5'
- env: DISTS_VM=fc30 USE_QUBES_REPO_VERSION=4.0 USE_QUBES_REPO_TESTING=1 TESTS_ONLY=
python: '3.5'
- env: DISTS_VM=stretch USE_QUBES_REPO_VERSION=4.0 USE_QUBES_REPO_TESTING=1 TESTS_ONLY=
python: '3.5'
- env: DISTS_VM=buster USE_QUBES_REPO_VERSION=4.0 USE_QUBES_REPO_TESTING=1 TESTS_ONLY=
python: '3.5'
- env:
- ENABLE_SLOW_TESTS=1
language: python
python: '3.6'
install:
- pip install --quiet -r ci/requirements.txt
script:
- python setup.py build
- PYTHONPATH=test-packages pylint qubesadmin
- ./run-tests
after_success:
- codecov
- env:
- ENABLE_SLOW_TESTS=1
language: python
python: '3.7'
install:
- pip install --quiet -r ci/requirements.txt
script:
- python setup.py build
- PYTHONPATH=test-packages pylint qubesadmin
- ./run-tests
after_success:
- codecov
- env:
- ENABLE_SLOW_TESTS=1
language: python
python: '3.8'
install:
- pip install --quiet -r ci/requirements.txt
script:
- python setup.py build
- PYTHONPATH=test-packages pylint qubesadmin
- ./run-tests
after_success:
- codecov
- stage: deploy
python: '3.5'
env: DIST_DOM0=fc25 TESTS_ONLY=
script: ~/qubes-builder/scripts/travis-deploy
# don't build tags which are meant for code signing only
branches:
except:
- /.*_.*/
addons:
apt:
packages:
- debootstrap
# vim: ts=2 sts=2 sw=2 et
env: DIST_DOM0=fc32
script:
- ~/qubes-builder/scripts/travis-deploy

View File

@ -12,8 +12,10 @@ install:
$(PYTHON) setup.py install -O1 $(PYTHON_PREFIX_ARG) --root $(DESTDIR)
install -d $(DESTDIR)/etc/xdg/autostart
install -m 0644 etc/qvm-start-daemon.desktop $(DESTDIR)/etc/xdg/autostart/
install -m 0644 etc/qvm-start-daemon-kde.desktop $(DESTDIR)/etc/xdg/autostart/
install -d $(DESTDIR)/usr/bin
ln -sf qvm-start-daemon $(DESTDIR)/usr/bin/qvm-start-gui
install -m 0755 scripts/qubes-guivm-session $(DESTDIR)/usr/bin/
clean:
rm -rf test-packages/__pycache__ qubesadmin/__pycache__

View File

@ -9,3 +9,4 @@ mock
lxml
PyYAML
xcffib
asynctest

98
debian/changelog vendored
View File

@ -1,3 +1,101 @@
qubes-core-admin-client (4.1.9-1) unstable; urgency=medium
*
-- Marek Marczykowski-Górecki <marmarek@invisiblethingslab.com> Wed, 12 Aug 2020 11:01:43 +0200
qubes-core-admin-client (4.1.8-1) unstable; urgency=medium
[ WillyPillow ]
* Add admin.vm.volume.Clear call (QubesOS/qubes-issues#5946)
[ Marek Marczykowski-Górecki ]
* tests: update for admin.vm.volume.Clear usage in qvm-template-
postprocess
* doc: document qvm-start-daemon --force, --kde
* Make Label() object hashable
[ Paweł Marczewski ]
* qvm-start-daemon: convert to async/await syntax
* qvm-start-daemon: allow --watch without --all
* Add qubes-guivm-session utility
[ Marek Marczykowski-Górecki ]
* travis: use sourced config, switch to R4.1
* tests: use asynctest some more
* utils: fix encoding '+' for qubes.VMExec
* backup/restore: distinguish dom0 by name
* backup/restore: improve error message about restoring tags
* backup/restore: option for alternative qrexec service
* backup/restore: use qfile-unpacker in a VM, request disk space
monitoring
* utils: add simple locking primitive
* rpm/deb: add dependency on scrypt
* Add "paranoid restore" mode
* tools: remove obsolete _want_app argument
* backup/restore: add option for unattended restore and extracting log
* rpm: add BR: python3-lxml and python3-xcffib
* tests: extend run_service mockup for pre-recorded output
* tests: remove extra empty lines
* tests: add paranoid backup restore unit tests
* doc: document 'tag-created-vm-with' feature
* backup/restore: better error detection for --paranoid-mode
* backup/restore: make error reporting work also for StandaloneVM
based DispVM
* Cleanup Admin API denial reporting
[ Marta Marczykowska-Górecka ]
* Added more resilience to missing permissions to utils
* qvm-run will unpause paused VMs by defaults
-- Marek Marczykowski-Górecki <marmarek@invisiblethingslab.com> Tue, 11 Aug 2020 19:26:32 +0200
qubes-core-admin-client (4.1.7-1) unstable; urgency=medium
[ Marta Marczykowska-Górecka ]
* Added a safeguard for invalid firewall rules
[ Marek Marczykowski-Górecki ]
* tools/qvm-start-daemon: reduce required permissions to sys-gui
itself
[ Frédéric Pierret (fepitre) ]
* debian: add guivm related content
* Makefile: add clean of pkgs and debian changelog.*
[ Dmitry Fedorov ]
* connect to PA in stubdom if audio-model enabled run pacat in low
latency mode by default
* use function to determine pacat domid
[ Marta Marczykowska-Górecka ]
* Added better __eq__ method to Label class
[ Frédéric Pierret (fepitre) ]
* Handle KDE with specific arg/desktop file
* Fix missing semi-colon and new line
* tests: kde_args are passed with property of launcher
* qvm-start-daemon: common_guid_args is now a staticmethod
[ Marek Marczykowski-Górecki ]
* Fix VM validity check for cached VM objects
[ Marta Marczykowska-Górecka ]
* Fixed inconsistent firewall address checking
[ Paweł Marczewski ]
* Generate qubes-guid options based on features
* Clean up the guid-conf file on domain stop
[ Marek Marczykowski-Górecki ]
* Wrap too long line
* rpm/deb: require new enough qubes-gui-daemon
[ Marta Marczykowska-Górecka ]
* Added dynamic X keyboard event monitoring to qvm_start_daemon.py
-- Marek Marczykowski-Górecki <marmarek@invisiblethingslab.com> Wed, 15 Jul 2020 16:18:20 +0200
qubes-core-admin-client (4.1.6-1) unstable; urgency=medium
* Make pylint happy

5
debian/control vendored
View File

@ -23,10 +23,13 @@ Package: qubes-core-admin-client
Architecture: any
Depends:
python3-qubesadmin,
scrypt,
${python:Depends},
${python3:Depends},
${misc:Depends}
Conflicts: qubes-core-agent (<< 4.1.9)
Conflicts:
qubes-core-agent (<< 4.1.9),
qubes-gui-daemon (<< 4.1.7)
Description: Qubes administrative tools
Tools to manage Qubes system using Admin API

View File

@ -87,7 +87,26 @@ Options
Read passphrase from file, or use '-' to read from stdin
.. option:: --location-is-service
Provided backup location is a qrexec service name (optionally with an
argument, separated by ``+``), instead of file path or a command.
.. option:: --paranoid-mode, --plan-b
Isolate restore process in a DisposableVM, defend against potentially
compromised backup. In this mode some parts of the backup are skipped,
specifically:
- dom0 home directory (desktop environment settings)
- PCI devices assignments
.. option:: --auto-close
When running with --paranoid-mode (see above), automatically close restore
progress window after the restore process is finished and display restore log
on the standard output. The log will be colored red if the standard output is
a terminal.
Authors
=======

View File

@ -82,6 +82,19 @@ See also `gui` feature.
If neither `gui` nor `gui-emulated` is set, emulated VGA is used (if
applicable for given VM virtualization mode).
gui-\*, gui-default-\*
^^^^^^^^^^^^^^^^^^^^^^
GUI daemon configuration. See `/etc/qubes/guid.conf` for a list of supported
options.
To change a given GUI option for a specific qube, set the `gui-{option}`
feature (with underscores replaced with dashes). For example, to enable
`allow_utf8_titles` for a qube, set `gui-allow-utf8-titles` to `True`.
To change a given GUI option globally, set the `gui-default-{option}` feature
on the GuiVM for that qube.
qrexec
^^^^^^
@ -205,6 +218,13 @@ other modes it is ignored.
Default: True
tag-created-vm-with
^^^^^^^^^^^^^^^^^^^
When a qube with this feature create a new VM, it gets extra tags listed in this
feature value (separated with space) automatically. Tags are added before qube
creation finishes.
Authors
-------

View File

@ -20,7 +20,7 @@
Synopsis
--------
:command:`qvm-start-daemon` [-h] [--verbose] [--quiet] [--all] [--exclude *EXCLUDE*] [--watch] [--force-stubdomain] [--pidfile *PIDFILE*] [--notify-monitory-layout] [*VMNAME* [*VMNAME* ...]]
:command:`qvm-start-daemon` [-h] [--verbose] [--quiet] [--all] [--exclude *EXCLUDE*] [--watch] [--kde] [--force] [--force-stubdomain] [--pidfile *PIDFILE*] [--notify-monitory-layout] [*VMNAME* [*VMNAME* ...]]
Options
-------
@ -47,12 +47,23 @@ Options
.. option:: --watch
Keep watching for further domains startups, must be used with --all
Keep watching for further domain startups
.. option:: --force-stubdomain
Start GUI to stubdomain-emulated VGA, even if gui-agent is running in the VM
.. option:: --force
Force running, even if this isn't GUI/Audio domain. GUI domain is a domain
with 'guivm-gui-agent' qvm-service enabled. Similarly for Audio domain it is
'audiovm-audio-agent' qvm-service.
.. option:: --kde
Set KDE specific arguments to gui-daemon - required for proper windows
decoration on KDE.
.. option:: --pidfile
Pidfile path to create in --watch mode

View File

@ -0,0 +1,9 @@
[Desktop Entry]
Name=Qubes Guid/Pacat
Comment=Starts GUI/AUDIO daemon for Qubes VMs in KDE
Icon=qubes
Exec=qvm-start-daemon --all --watch --kde
Terminal=false
Type=Application
OnlyShowIn=KDE;

View File

@ -5,3 +5,5 @@ Icon=qubes
Exec=qvm-start-daemon --all --watch
Terminal=false
Type=Application
NotShowIn=KDE;

View File

@ -82,7 +82,7 @@ class VMCollection(object):
if vm.name not in self._vm_list:
# VM no longer exists
del self._vm_objects[name]
elif vm.__class__.__name__ != self._vm_list[vm.name]['class']:
elif vm.klass != self._vm_list[vm.name]['class']:
# VM class have changed
del self._vm_objects[name]
# TODO: some generation ID, to detect VM re-creation
@ -167,7 +167,7 @@ class QubesBase(qubesadmin.base.PropertyHolder):
cache_enabled = False
def __init__(self):
super(QubesBase, self).__init__(self, 'admin.property.', 'dom0')
super().__init__(self, 'admin.property.', 'dom0')
self.domains = VMCollection(self)
self.labels = qubesadmin.base.WrapperObjectsCollection(
self, 'admin.label.List', qubesadmin.label.Label)
@ -250,7 +250,7 @@ class QubesBase(qubesadmin.base.PropertyHolder):
def get_label(self, label):
"""Get label as identified by index or name
:throws KeyError: when label is not found
:throws QubesLabelNotFoundError: when label is not found
"""
# first search for name, verbatim
@ -264,7 +264,7 @@ class QubesBase(qubesadmin.base.PropertyHolder):
for i in self.labels.values():
if i.index == int(label):
return i
raise KeyError(label)
raise qubesadmin.exc.QubesLabelNotFoundError(label)
@staticmethod
def get_vm_class(clsname):
@ -454,19 +454,19 @@ class QubesBase(qubesadmin.base.PropertyHolder):
['qvm-appmenus', '--init', '--update',
'--source', src_vm.name, dst_vm.name]
subprocess.check_output(appmenus_cmd, stderr=subprocess.STDOUT)
except OSError:
except OSError as e:
# this file needs to be python 2.7 compatible,
# so no FileNotFoundError
self.log.error('Failed to clone appmenus, qvm-appmenus missing')
if not ignore_errors:
raise qubesadmin.exc.QubesException(
'Failed to clone appmenus')
'Failed to clone appmenus') from e
except subprocess.CalledProcessError as e:
self.log.error('Failed to clone appmenus: %s',
e.output.decode())
if not ignore_errors:
raise qubesadmin.exc.QubesException(
'Failed to clone appmenus')
'Failed to clone appmenus') from e
except qubesadmin.exc.QubesException:
if not ignore_errors:
@ -838,7 +838,7 @@ class QubesRemote(QubesBase):
stderr=subprocess.PIPE)
(stdout, stderr) = p.communicate(payload)
if p.returncode != 0:
raise qubesadmin.exc.QubesDaemonNoResponseError(
raise qubesadmin.exc.QubesDaemonAccessError(
'Service call error: %s', stderr.decode())
return self._parse_qubesd_response(stdout)

View File

@ -38,7 +38,7 @@ class Core2VM(qubesadmin.backup.BackupVM):
'''VM object'''
# pylint: disable=too-few-public-methods
def __init__(self):
super(Core2VM, self).__init__()
super().__init__()
self.backup_content = False
@property
@ -148,7 +148,7 @@ class Core2Qubes(qubesadmin.backup.BackupApp):
raise ValueError("store path required")
self.qid_map = {}
self.log = logging.getLogger('qubesadmin.backup.core2')
super(Core2Qubes, self).__init__(store)
super().__init__(store)
def load_globals(self, element):
'''Load global settings

View File

@ -57,7 +57,7 @@ class Core3Qubes(qubesadmin.backup.BackupApp):
raise ValueError("store path required")
self.log = logging.getLogger('qubesadmin.backup.core3')
self.labels = {}
super(Core3Qubes, self).__init__(store)
super().__init__(store)
@staticmethod
def get_property(xml_obj, prop):

343
qubesadmin/backup/dispvm.py Normal file
View File

@ -0,0 +1,343 @@
#
# The Qubes OS Project, http://www.qubes-os.org
#
# Copyright (C) 2019 Marek Marczykowski-Górecki
# <marmarek@invisiblethingslab.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation; either version 2.1 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
"""Handle backup extraction using DisposableVM"""
import collections
import datetime
import itertools
import logging
import os
import string
import subprocess
#import typing
import qubesadmin
import qubesadmin.exc
import qubesadmin.utils
import qubesadmin.vm
LOCKFILE = '/var/run/qubes/backup-paranoid-restore.lock'
Option = collections.namedtuple('Option', ('opts', 'handler'))
# Convenient functions for 'handler' value of Option object
# (see RestoreInDisposableVM.arguments):
def handle_store_true(option, value):
"""Handle argument enabling an option (action="store_true")"""
if value:
return [option.opts[0]]
return []
def handle_store_false(option, value):
"""Handle argument disabling an option (action="false")"""
if not value:
return [option.opts[0]]
return []
def handle_verbose(option, value):
"""Handle argument --quiet / --verbose options (action="count")"""
if option.opts[0] == '--verbose':
value -= 1 # verbose defaults to 1
return [option.opts[0]] * value
def handle_store(option, value):
"""Handle argument with arbitrary string value (action="store")"""
if value:
return [option.opts[0], str(value)]
return []
def handle_append(option, value):
"""Handle argument with a list of values (action="append")"""
return itertools.chain(*([option.opts[0], v] for v in value))
def skip(_option, _value):
"""Skip argument"""
return []
class RestoreInDisposableVM:
"""Perform backup restore with actual archive extraction isolated
within DisposableVM"""
#dispvm: typing.Optional[qubesadmin.vm.QubesVM]
#: map of args attr -> original option
arguments = {
'quiet': Option(('--quiet', '-q'), handle_verbose),
'verbose': Option(('--verbose', '-v'), handle_verbose),
'verify_only': Option(('--verify-only',), handle_store_true),
'skip_broken': Option(('--skip-broken',), handle_store_true),
'ignore_missing': Option(('--ignore-missing',), handle_store_true),
'skip_conflicting': Option(('--skip-conflicting',), handle_store_true),
'rename_conflicting': Option(('--rename-conflicting',),
handle_store_true),
'exclude': Option(('--exclude', '-x'), handle_append),
'dom0_home': Option(('--skip-dom0-home',), handle_store_false),
'ignore_username_mismatch': Option(('--ignore-username-mismatch',),
handle_store_true),
'ignore_size_limit': Option(('--ignore-size-limit',),
handle_store_true),
'compression': Option(('--compression-filter', '-Z'), handle_store),
'appvm': Option(('--dest-vm', '-d'), handle_store),
'pass_file': Option(('--passphrase-file', '-p'), handle_store),
'location_is_service': Option(('--location-is-service',),
handle_store_true),
'paranoid_mode': Option(('--paranoid-mode', '--plan-b',), skip),
'auto_close': Option(('--auto-close',), skip),
# make the verification easier, those don't really matter
'help': Option(('--help', '-h'), skip),
'force_root': Option(('--force-root',), skip),
}
def __init__(self, app, args):
"""
:param app: Qubes() instance
:param args: namespace instance as with qvm-backup-restore arguments
parsed. See :py:module:`qubesadmin.tools.qvm_backup_restore`.
"""
self.app = app
self.args = args
# only one backup restore is allowed at the time, use constant names
#: name of DisposableVM using to extract the backup
self.dispvm_name = 'disp-backup-restore'
#: tag given to this DisposableVM - qrexec policy is configured for it
self.dispvm_tag = 'backup-restore-mgmt'
#: tag automatically added to restored VMs
self.restored_tag = 'backup-restore-in-progress'
#: tag added to a VM storing the backup archive
self.storage_tag = 'backup-restore-storage'
# FIXME: make it random, collision free
# (when considering non-disposable case)
self.backup_log_path = '/var/tmp/backup-restore.log'
self.terminal_app = ('xterm', '-hold', '-title', 'Backup restore', '-e',
'/bin/sh', '-c',
'("$0" "$@" 2>&1; echo exit code: $?) | tee {}'.
format(self.backup_log_path))
if args.auto_close:
# filter-out '-hold'
self.terminal_app = tuple(a for a in self.terminal_app
if a != '-hold')
self.dispvm = None
if args.appvm:
self.backup_storage_vm = self.app.domains[args.appvm]
else:
self.backup_storage_vm = self.app.domains['dom0']
self.storage_access_proc = None
self.storage_access_id = None
self.log = logging.getLogger('qubesadmin.backup.dispvm')
def clear_old_tags(self):
"""Remove tags from old restore operation"""
for domain in self.app.domains:
domain.tags.discard(self.restored_tag)
domain.tags.discard(self.dispvm_tag)
domain.tags.discard(self.storage_tag)
def create_dispvm(self):
"""Create DisposableVM used to restore"""
self.dispvm = self.app.add_new_vm('DispVM', self.dispvm_name, 'red',
template=self.app.management_dispvm)
self.dispvm.auto_cleanup = True
self.dispvm.features['tag-created-vm-with'] = self.restored_tag
def transfer_pass_file(self, path):
"""Copy passhprase file to the DisposableVM"""
subprocess.check_call(
['qvm-copy-to-vm', self.dispvm_name, path],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL)
return '/home/{}/QubesIncoming/{}/{}'.format(
self.dispvm.default_user,
os.uname()[1],
os.path.basename(path)
)
def register_backup_source(self):
"""Tell backup archive holding VM we want this content.
This function registers a backup source, receives a token needed to
access it (stored in *storage_access_id* attribute). The access is
revoked when connection referenced in *storage_access_proc* attribute
is closed.
"""
self.storage_access_proc = self.backup_storage_vm.run_service(
'qubes.RegisterBackupLocation', stdin=subprocess.PIPE,
stdout=subprocess.PIPE)
self.storage_access_proc.stdin.write(
(self.args.backup_location.
replace("\r", "").replace("\n", "") + "\n").encode())
self.storage_access_proc.stdin.flush()
storage_access_id = self.storage_access_proc.stdout.readline().strip()
allowed_chars = (string.ascii_letters + string.digits).encode()
if not storage_access_id or \
not all(c in allowed_chars for c in storage_access_id):
if self.storage_access_proc.returncode == 127:
raise qubesadmin.exc.QubesException(
'Backup source registration failed - qubes-core-agent '
'package too old?')
raise qubesadmin.exc.QubesException(
'Backup source registration failed - got invalid id')
self.storage_access_id = storage_access_id.decode('ascii')
# keep connection open, closing it invalidates the access
self.backup_storage_vm.tags.add(self.storage_tag)
def invalidate_backup_access(self):
"""Revoke access to backup archive"""
self.backup_storage_vm.tags.discard(self.storage_tag)
self.storage_access_proc.stdin.close()
self.storage_access_proc.wait()
def prepare_inner_args(self):
"""Prepare arguments for inner (in-DispVM) qvm-backup-restore command"""
new_options = []
new_positional_args = []
for attr, opt in self.arguments.items():
if not hasattr(self.args, attr):
continue
new_options.extend(opt.handler(opt, getattr(self.args, attr)))
new_options.append('--location-is-service')
# backup location, replace by qrexec service to be called
new_positional_args.append(
'qubes.RestoreById+' + self.storage_access_id)
if self.args.vms:
new_positional_args.extend(self.args.vms)
return new_options + new_positional_args
def finalize_tags(self):
"""Make sure all the restored VMs are marked with
restored-from-backup-xxx tag, then remove backup-restore-in-progress
tag"""
self.app.domains.clear_cache()
for domain in self.app.domains:
if 'backup-restore-in-progress' not in domain.tags:
continue
if not any(t.startswith('restored-from-backup-')
for t in domain.tags):
self.log.warning('Restored domain %s was not tagged with '
'restored-from-backup-* tag',
domain.name)
# add fallback tag
domain.tags.add('restored-from-backup-at-{}'.format(
datetime.date.strftime(datetime.date.today(), '%F')))
domain.tags.discard('backup-restore-in-progress')
@staticmethod
def sanitize_log(untrusted_log):
"""Replace characters potentially dangerouns to terminal in
a backup log"""
allowed_set = set(range(0x20, 0x7e))
allowed_set.update({0x0a})
return bytes(c if c in allowed_set else ord('.') for c in untrusted_log)
def extract_log(self):
"""Extract restore log from the DisposableVM"""
untrusted_backup_log, _ = self.dispvm.run_with_args(
'cat', self.backup_log_path,
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL)
backup_log = self.sanitize_log(untrusted_backup_log)
return backup_log
def run(self):
"""Run the backup restore operation"""
lock = qubesadmin.utils.LockFile(LOCKFILE, True)
lock.acquire()
try:
self.create_dispvm()
self.clear_old_tags()
self.register_backup_source()
self.dispvm.start()
self.dispvm.run_service_for_stdio('qubes.WaitForSession')
if self.args.pass_file:
self.args.pass_file = self.transfer_pass_file(
self.args.pass_file)
args = self.prepare_inner_args()
self.dispvm.tags.add(self.dispvm_tag)
self.dispvm.run_with_args(*self.terminal_app,
'qvm-backup-restore', *args,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL)
backup_log = self.extract_log()
last_line = backup_log.splitlines()[-1]
if not last_line.startswith(b'exit code:'):
raise qubesadmin.exc.BackupRestoreError(
'qvm-backup-restore did not reported exit code',
backup_log=backup_log)
try:
exit_code = int(last_line.split()[-1])
except ValueError:
raise qubesadmin.exc.BackupRestoreError(
'qvm-backup-restore reported unexpected exit code',
backup_log=backup_log)
if exit_code == 127:
raise qubesadmin.exc.QubesException(
'qvm-backup-restore tool '
'missing in {} template, install qubes-core-admin-client '
'package there'.format(
getattr(self.dispvm.template,
'template',
self.dispvm.template).name)
)
if exit_code != 0:
raise qubesadmin.exc.BackupRestoreError(
'qvm-backup-restore failed with {}'.format(exit_code),
backup_log=backup_log)
return backup_log
except subprocess.CalledProcessError as e:
if e.returncode == 127:
raise qubesadmin.exc.QubesException(
'{} missing in {} template, install it there '
'package there'.format(self.terminal_app[0],
self.dispvm.template.template.name)
)
try:
backup_log = self.extract_log()
except: # pylint: disable=bare-except
backup_log = None
raise qubesadmin.exc.BackupRestoreError(
'qvm-backup-restore failed with {}'.format(e.returncode),
backup_log=backup_log)
finally:
if self.dispvm is not None:
# first revoke permission, then cleanup
self.dispvm.tags.discard(self.dispvm_tag)
# autocleanup removes the VM
try:
self.dispvm.kill()
except qubesadmin.exc.QubesVMNotStartedError:
# delete it manually
del self.app.domains[self.dispvm]
self.finalize_tags()
lock.release()

View File

@ -82,7 +82,7 @@ _tar_file_size_re = re.compile(r"^[^ ]+ [^ ]+/[^ ]+ *([0-9]+) .*")
class BackupCanceledError(QubesException):
'''Exception raised when backup/restore was cancelled'''
def __init__(self, msg, tmpdir=None):
super(BackupCanceledError, self).__init__(msg)
super().__init__(msg)
self.tmpdir = tmpdir
def init_supported_hmac_and_crypto():
@ -361,7 +361,7 @@ class ExtractWorker3(Process):
:param bool verify_only: only verify data integrity, do not extract
:param dict handlers: handlers for actual data
'''
super(ExtractWorker3, self).__init__()
super().__init__()
#: queue with files to extract
self.queue = queue
#: paths on the queue are relative to this dir
@ -904,14 +904,14 @@ class BackupRestore(object):
USERNAME_MISMATCH = object()
def __init__(self, vm, subdir=None):
super(BackupRestore.Dom0ToRestore, self).__init__(vm)
super().__init__(vm)
if subdir:
self.subdir = subdir
self.username = os.path.basename(subdir)
def __init__(self, app, backup_location, backup_vm, passphrase,
force_compression_filter=None):
super(BackupRestore, self).__init__()
location_is_service=False, force_compression_filter=None):
super().__init__()
#: qubes.Qubes instance
self.app = app
@ -921,12 +921,16 @@ class BackupRestore(object):
#: VM from which backup should be retrieved
self.backup_vm = backup_vm
if backup_vm and backup_vm.qid == 0:
if backup_vm and backup_vm.name == 'dom0':
self.backup_vm = None
#: backup path, inside VM pointed by :py:attr:`backup_vm`
self.backup_location = backup_location
#: use alternative qrexec service to retrieve backup data, instead of
#: ``qubes.Restore`` with *backup_location* given on stdin
self.location_is_service = location_is_service
#: force using specific application for (de)compression, instead of
#: the one named in the backup header
self.force_compression_filter = force_compression_filter
@ -973,11 +977,14 @@ class BackupRestore(object):
vmproc = None
if self.backup_vm is not None:
# If APPVM, STDOUT is a PIPE
vmproc = self.backup_vm.run_service('qubes.Restore')
vmproc.stdin.write(
(self.backup_location.replace("\r", "").replace("\n",
"") + "\n").encode())
vmproc.stdin.flush()
if self.location_is_service:
vmproc = self.backup_vm.run_service(self.backup_location)
else:
vmproc = self.backup_vm.run_service('qubes.Restore')
vmproc.stdin.write(
(self.backup_location.replace("\r", "").replace("\n",
"") + "\n").encode())
vmproc.stdin.flush()
# Send to tar2qfile the VMs that should be extracted
vmproc.stdin.write((" ".join(filelist) + "\n").encode())
@ -985,9 +992,14 @@ class BackupRestore(object):
self.processes_to_kill_on_cancel.append(vmproc)
backup_stdin = vmproc.stdout
# FIXME use /usr/lib/qubes/qfile-unpacker in non-dom0
tar1_command = ['/usr/libexec/qubes/qfile-dom0-unpacker',
str(os.getuid()), self.tmpdir, '-v']
if isinstance(self.app, qubesadmin.app.QubesRemote):
qfile_unpacker_path = '/usr/lib/qubes/qfile-unpacker'
else:
qfile_unpacker_path = '/usr/libexec/qubes/qfile-dom0-unpacker'
# keep at least 500M free for decryption of a previous chunk
tar1_command = [qfile_unpacker_path,
str(os.getuid()), self.tmpdir, '-v',
'-w', str(500 * 1024 * 1024)]
else:
backup_stdin = open(self.backup_location, 'rb')
@ -2035,8 +2047,9 @@ class BackupRestore(object):
try:
new_vm.tags.add(tag)
except Exception as err: # pylint: disable=broad-except
self.log.error('Error adding tag %s to %s: %s',
tag, vm.name, err)
if tag not in new_vm.tags:
self.log.error('Error adding tag %s to %s: %s',
tag, vm.name, err)
for bus in vm.devices:
for backend_domain, ident in vm.devices[bus]:

View File

@ -78,7 +78,7 @@ class PropertyHolder(object):
'''
if response_data == b'':
raise qubesadmin.exc.QubesDaemonNoResponseError(
raise qubesadmin.exc.QubesDaemonAccessError(
'Got empty response from qubesd. See journalctl in dom0 for '
'details.')
@ -151,11 +151,14 @@ class PropertyHolder(object):
# cached properties list
if self._properties is not None and item not in self._properties:
raise AttributeError(item)
property_str = self.qubesd_call(
self._method_dest,
self._method_prefix + 'Get',
item,
None)
try:
property_str = self.qubesd_call(
self._method_dest,
self._method_prefix + 'Get',
item,
None)
except qubesadmin.exc.QubesDaemonAccessError:
raise qubesadmin.exc.QubesPropertyAccessError(item)
is_default, value = self._deserialize_property(property_str)
if self.app.cache_enabled:
self._properties_cache[item] = (is_default, value)
@ -170,11 +173,14 @@ class PropertyHolder(object):
'''
if item.startswith('_'):
raise AttributeError(item)
property_str = self.qubesd_call(
self._method_dest,
self._method_prefix + 'GetDefault',
item,
None)
try:
property_str = self.qubesd_call(
self._method_dest,
self._method_prefix + 'GetDefault',
item,
None)
except qubesadmin.exc.QubesDaemonAccessError:
raise qubesadmin.exc.QubesPropertyAccessError(item)
if not property_str:
raise AttributeError(item + ' has no default')
(prop_type, value) = property_str.split(b' ', 1)
@ -339,7 +345,7 @@ class PropertyHolder(object):
def __setattr__(self, key, value):
if key.startswith('_') or key in self._local_properties():
return super(PropertyHolder, self).__setattr__(key, value)
return super().__setattr__(key, value)
if value is qubesadmin.DEFAULT:
try:
self.qubesd_call(
@ -365,7 +371,7 @@ class PropertyHolder(object):
def __delattr__(self, name):
if name.startswith('_') or name in self._local_properties():
return super(PropertyHolder, self).__delattr__(name)
return super().__delattr__(name)
try:
self.qubesd_call(
self._method_dest,

View File

@ -115,7 +115,7 @@ class UnknownDevice(DeviceInfo):
**kwargs):
if description is None:
description = "Unknown device"
super(UnknownDevice, self).__init__(backend_domain, devclass, ident,
super().__init__(backend_domain, devclass, ident,
description, **kwargs)
@ -295,7 +295,7 @@ class DeviceManager(dict):
"""
def __init__(self, vm):
super(DeviceManager, self).__init__()
super().__init__()
self._vm = vm
def __missing__(self, key):

View File

@ -25,7 +25,7 @@ class QubesException(Exception):
'''Base exception for all Qubes-related errors.'''
def __init__(self, message_format, *args, **kwargs):
# TODO: handle translations
super(QubesException, self).__init__(
super().__init__(
message_format % tuple(int(d) if d.isdigit() else d for d in args),
**kwargs)
@ -138,6 +138,13 @@ class QubesTagNotFoundError(QubesException, KeyError):
return QubesException.__str__(self)
class QubesLabelNotFoundError(QubesException, KeyError):
"""Label does not exists"""
def __str__(self):
# KeyError overrides __str__ method
return QubesException.__str__(self)
class StoragePoolException(QubesException):
''' A general storage exception '''
@ -154,14 +161,23 @@ class DeviceAlreadyAttached(QubesException, KeyError):
return QubesException.__str__(self)
class BackupRestoreError(QubesException):
'''Restoring a backup failed'''
def __init__(self, msg, backup_log=None):
super().__init__(msg)
self.backup_log = backup_log
# pylint: disable=too-many-ancestors
class QubesDaemonNoResponseError(QubesDaemonCommunicationError):
'''Got empty response from qubesd'''
class QubesDaemonAccessError(QubesDaemonCommunicationError):
'''Got empty response from qubesd. This can be lack of permission,
or some server-side issue.'''
class QubesPropertyAccessError(QubesException, AttributeError):
class QubesPropertyAccessError(QubesDaemonAccessError, AttributeError):
'''Failed to read/write property value, cause is unknown (insufficient
permissions, no such property, invalid value, other)'''
def __init__(self, prop):
super(QubesPropertyAccessError, self).__init__(
'Failed to access \'%s\' property' % prop)
super().__init__('Failed to access \'%s\' property' % prop)
# legacy name
QubesDaemonNoResponseError = QubesDaemonAccessError

View File

@ -35,7 +35,7 @@ class Features(object):
# pylint: disable=too-few-public-methods
def __init__(self, vm):
super(Features, self).__init__()
super().__init__()
self.vm = vm
def __delitem__(self, key):

View File

@ -23,6 +23,8 @@
import datetime
import socket
import string
class RuleOption(object):
'''Base class for a single rule element'''
@ -51,7 +53,7 @@ class RuleChoice(RuleOption):
'''Base class for multiple-choices rule elements'''
# pylint: disable=abstract-method
def __init__(self, value):
super(RuleChoice, self).__init__(value)
super().__init__(value)
self.allowed_values = \
[v for k, v in self.__class__.__dict__.items()
if not k.startswith('__') and isinstance(v, str) and
@ -120,6 +122,9 @@ class DstHost(RuleOption):
except socket.error:
self.type = 'dsthost'
self.prefixlen = 0
safe_set = string.ascii_lowercase + string.digits + '-._'
if not all(c in safe_set for c in value):
raise ValueError('Invalid hostname')
else:
host, prefixlen = value.split('/', 1)
prefixlen = int(prefixlen)
@ -143,7 +148,7 @@ class DstHost(RuleOption):
except socket.error:
raise ValueError('Invalid IP address: ' + host)
super(DstHost, self).__init__(value)
super().__init__(value)
@property
def rule(self):
@ -170,7 +175,7 @@ class DstPorts(RuleOption):
raise ValueError('Ports out of range')
if self.range[0] > self.range[1]:
raise ValueError('Invalid port range')
super(DstPorts, self).__init__(
super().__init__(
str(self.range[0]) if self.range[0] == self.range[1]
else '{!s}-{!s}'.format(*self.range))
@ -183,7 +188,7 @@ class DstPorts(RuleOption):
class IcmpType(RuleOption):
'''ICMP packet type'''
def __init__(self, value):
super(IcmpType, self).__init__(value)
super().__init__(value)
value = int(value)
if value < 0 or value > 255:
raise ValueError('ICMP type out of range')
@ -207,7 +212,7 @@ class SpecialTarget(RuleChoice):
class Expire(RuleOption):
'''Rule expire time'''
def __init__(self, value):
super(Expire, self).__init__(value)
super().__init__(value)
self.datetime = datetime.datetime.utcfromtimestamp(int(value))
@property

View File

@ -46,7 +46,7 @@ class Label(object):
qubesd_response = self.app.qubesd_call(
'dom0', 'admin.label.Get', self._name, None)
except qubesadmin.exc.QubesDaemonNoResponseError:
raise AttributeError
raise qubesadmin.exc.QubesPropertyAccessError('label.color')
self._color = qubesd_response.decode()
return self._color
@ -63,15 +63,23 @@ class Label(object):
@property
def index(self):
'''color specification as in HTML (``#abcdef``)'''
'''label numeric identifier'''
if self._index is None:
try:
qubesd_response = self.app.qubesd_call(
'dom0', 'admin.label.Index', self._name, None)
except qubesadmin.exc.QubesDaemonNoResponseError:
raise AttributeError
raise qubesadmin.exc.QubesPropertyAccessError('label.index')
self._index = int(qubesd_response.decode())
return self._index
def __str__(self):
return self._name
def __eq__(self, other):
if isinstance(other, Label):
return self.name == other.name
return NotImplemented
def __hash__(self):
return hash(self.name)

View File

@ -91,7 +91,7 @@ class QubesSpinner(AbstractSpinner):
This spinner uses standard ASCII control characters'''
def __init__(self, *args, **kwargs):
super(QubesSpinner, self).__init__(*args, **kwargs)
super().__init__(*args, **kwargs)
self.hidelen = 0
self.cub1 = '\b'
@ -120,7 +120,7 @@ class QubesSpinnerEnterpriseEdition(QubesSpinner):
if charset is None:
charset = ENTERPRISE_CHARSET if self.stream_isatty else '.'
super(QubesSpinnerEnterpriseEdition, self).__init__(stream, charset)
super().__init__(stream, charset)
if self.stream_isatty:
try:

View File

@ -19,6 +19,7 @@
# with this program; if not, see <http://www.gnu.org/licenses/>.
'''Storage subsystem.'''
import qubesadmin.exc
class Volume(object):
'''Storage volume.'''
@ -112,7 +113,10 @@ class Volume(object):
'''Storage volume pool name.'''
if self._pool is not None:
return self._pool
self._fetch_info()
try:
self._fetch_info()
except qubesadmin.exc.QubesDaemonAccessError:
raise qubesadmin.exc.QubesPropertyAccessError('pool')
return str(self._info['pool'])
@property
@ -120,25 +124,37 @@ class Volume(object):
'''Storage volume id, unique within given pool.'''
if self._vid is not None:
return self._vid
self._fetch_info()
try:
self._fetch_info()
except qubesadmin.exc.QubesDaemonAccessError:
raise qubesadmin.exc.QubesPropertyAccessError('vid')
return str(self._info['vid'])
@property
def size(self):
'''Size of volume, in bytes.'''
self._fetch_info(True)
try:
self._fetch_info()
except qubesadmin.exc.QubesDaemonAccessError:
raise qubesadmin.exc.QubesPropertyAccessError('size')
return int(self._info['size'])
@property
def usage(self):
'''Used volume space, in bytes.'''
self._fetch_info(True)
try:
self._fetch_info()
except qubesadmin.exc.QubesDaemonAccessError:
raise qubesadmin.exc.QubesPropertyAccessError('usage')
return int(self._info['usage'])
@property
def rw(self):
'''True if volume is read-write.'''
self._fetch_info()
try:
self._fetch_info()
except qubesadmin.exc.QubesDaemonAccessError:
raise qubesadmin.exc.QubesPropertyAccessError('rw')
return self._info['rw'] == 'True'
@rw.setter
@ -150,13 +166,19 @@ class Volume(object):
@property
def snap_on_start(self):
'''Create a snapshot from source on VM start.'''
self._fetch_info()
try:
self._fetch_info()
except qubesadmin.exc.QubesDaemonAccessError:
raise qubesadmin.exc.QubesPropertyAccessError('snap_on_start')
return self._info['snap_on_start'] == 'True'
@property
def save_on_stop(self):
'''Commit changes to original volume on VM stop.'''
self._fetch_info()
try:
self._fetch_info()
except qubesadmin.exc.QubesDaemonAccessError:
raise qubesadmin.exc.QubesPropertyAccessError('save_on_stop')
return self._info['save_on_stop'] == 'True'
@property
@ -165,7 +187,10 @@ class Volume(object):
If None, this volume itself will be used.
'''
self._fetch_info()
try:
self._fetch_info()
except qubesadmin.exc.QubesDaemonAccessError:
raise qubesadmin.exc.QubesPropertyAccessError('source')
if self._info['source']:
return self._info['source']
return None
@ -173,7 +198,10 @@ class Volume(object):
@property
def revisions_to_keep(self):
'''Number of revisions to keep around'''
self._fetch_info()
try:
self._fetch_info()
except qubesadmin.exc.QubesDaemonAccessError:
raise qubesadmin.exc.QubesPropertyAccessError('revisions_to_keep')
return int(self._info['revisions_to_keep'])
@revisions_to_keep.setter
@ -186,7 +214,10 @@ class Volume(object):
'''Returns `True` if this snapshot of a source volume (for
`snap_on_start`=True) is outdated.
'''
self._fetch_info(True)
try:
self._fetch_info()
except qubesadmin.exc.QubesDaemonAccessError:
raise qubesadmin.exc.QubesPropertyAccessError('is_outdated')
return self._info.get('is_outdated', False) == 'True'
def resize(self, size):
@ -290,8 +321,11 @@ class Pool(object):
@property
def usage_details(self):
''' Storage pool usage details (current - not cached) '''
pool_usage_data = self.app.qubesd_call(
'dom0', 'admin.pool.UsageDetails', self.name, None)
try:
pool_usage_data = self.app.qubesd_call(
'dom0', 'admin.pool.UsageDetails', self.name, None)
except qubesadmin.exc.QubesDaemonAccessError:
raise qubesadmin.exc.QubesPropertyAccessError('usage_details')
pool_usage_data = pool_usage_data.decode('utf-8')
assert pool_usage_data.endswith('\n') or pool_usage_data == ''
pool_usage_data = pool_usage_data[:-1]
@ -306,8 +340,11 @@ class Pool(object):
def config(self):
''' Storage pool config '''
if self._config is None:
pool_info_data = self.app.qubesd_call(
'dom0', 'admin.pool.Info', self.name, None)
try:
pool_info_data = self.app.qubesd_call(
'dom0', 'admin.pool.Info', self.name, None)
except qubesadmin.exc.QubesDaemonAccessError:
raise qubesadmin.exc.QubesPropertyAccessError('config')
pool_info_data = pool_info_data.decode('utf-8')
assert pool_info_data.endswith('\n')
pool_info_data = pool_info_data[:-1]
@ -355,8 +392,11 @@ class Pool(object):
@property
def volumes(self):
''' Volumes managed by this pool '''
volumes_data = self.app.qubesd_call(
'dom0', 'admin.pool.volume.List', self.name, None)
try:
volumes_data = self.app.qubesd_call(
'dom0', 'admin.pool.volume.List', self.name, None)
except qubesadmin.exc.QubesDaemonAccessError:
raise qubesadmin.exc.QubesPropertyAccessError('volumes')
assert volumes_data.endswith(b'\n')
volumes_data = volumes_data[:-1].decode('ascii')
for vid in volumes_data.splitlines():

View File

@ -31,7 +31,7 @@ class Tags(object):
# pylint: disable=too-few-public-methods
def __init__(self, vm):
super(Tags, self).__init__()
super().__init__()
self.vm = vm
def remove(self, elem):

View File

@ -54,14 +54,12 @@ class TestVMCollection(dict):
class TestProcess(object):
def __init__(self, input_callback=None, stdout=None, stderr=None):
self.input_callback = input_callback
self.got_any_input = False
self.stdin = io.BytesIO()
# don't let anyone close it, before we get the value
self.stdin_close = self.stdin.close
if self.input_callback:
self.stdin.close = (
lambda: self.input_callback(self.stdin.getvalue()))
else:
self.stdin.close = lambda: None
self.stdin.close = self.store_input
self.stdin.flush = self.store_input
if stdout == subprocess.PIPE:
self.stdout = io.BytesIO()
else:
@ -72,6 +70,13 @@ class TestProcess(object):
self.stderr = stderr
self.returncode = 0
def store_input(self):
value = self.stdin.getvalue()
if (not self.got_any_input or value) and self.input_callback:
self.input_callback(self.stdin.getvalue())
self.got_any_input = True
self.stdin.truncate(0)
def communicate(self, input=None):
if input is not None:
self.stdin.write(input)
@ -102,11 +107,9 @@ class _AssertNotRaisesContext(object):
self.failureException = test_case.failureException
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, tb):
if exc_type is None:
return True
@ -121,14 +124,17 @@ class _AssertNotRaisesContext(object):
class QubesTest(qubesadmin.app.QubesBase):
expected_service_calls = None
expected_calls = None
actual_calls = None
service_calls = None
def __init__(self):
super(QubesTest, self).__init__()
#: expected calls and saved replies for them
#: expected Admin API calls and saved replies for them
self.expected_calls = {}
#: expected qrexec service calls and saved replies for them
self.expected_service_calls = {}
#: actual calls made
self.actual_calls = []
#: rpc service calls
@ -152,6 +158,14 @@ class QubesTest(qubesadmin.app.QubesBase):
def run_service(self, dest, service, **kwargs):
self.service_calls.append((dest, service, kwargs))
call_key = (dest, service)
# TODO: consider it as a future extension, as a replacement for
# checking app.service_calls later
# if call_key not in self.expected_service_calls:
# raise AssertionError('Unexpected service call {!r}'.format(call_key))
if call_key in self.expected_service_calls:
kwargs = kwargs.copy()
kwargs['stdout'] = io.BytesIO(self.expected_service_calls[call_key])
return TestProcess(lambda input: self.service_calls.append((dest,
service, input)),
stdout=kwargs.get('stdout', None),

View File

@ -158,6 +158,30 @@ class TC_00_VMCollection(qubesadmin.tests.QubesTestCase):
self.fail('VM not found in collection')
self.assertAllCalled()
def test_012_getitem_cached_object(self):
self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
b'0\x00test-vm class=AppVM state=Running\n'
try:
vm1 = self.app.domains['test-vm']
vm2 = self.app.domains['test-vm']
self.assertIs(vm1, vm2)
except KeyError:
self.fail('VM not found in collection')
self.app.domains.clear_cache()
# even after clearing the cache, the same instance should be returned
# if the class haven't changed
vm3 = self.app.domains['test-vm']
self.assertIs(vm1, vm3)
# change the class and expected different object
self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
b'0\x00test-vm class=StandaloneVM state=Running\n'
self.app.domains.clear_cache()
vm4 = self.app.domains['test-vm']
self.assertIsNot(vm1, vm4)
self.assertAllCalled()
class TC_10_QubesBase(qubesadmin.tests.QubesTestCase):
def setUp(self):
@ -254,6 +278,13 @@ class TC_10_QubesBase(qubesadmin.tests.QubesTestCase):
self.assertEqual(label.name, 'red')
self.assertAllCalled()
def test_021_get_nonexistant_label(self):
self.app.expected_calls[('dom0', 'admin.label.List', None, None)] = \
b'0\x00red\nblue\n'
with self.assertRaises(qubesadmin.exc.QubesLabelNotFoundError):
self.app.get_label('green')
self.assertAllCalled()
def clone_setup_common_calls(self, src, dst):
# have each property type with default=no, each special-cased,
# and some with default=yes

View File

@ -1440,8 +1440,14 @@ class TC_10_BackupCompatibility(qubesadmin.tests.backup.BackupTestCase):
str(value).encode())] = b'0\0'
for tag in vm['tags']:
self.app.expected_calls[
(name, 'admin.vm.tag.Set', tag, None)] = b'0\0'
if tag.startswith('created-by-'):
self.app.expected_calls[
(name, 'admin.vm.tag.Set', tag, None)] = b''
self.app.expected_calls[
(name, 'admin.vm.tag.Get', tag, None)] = b'0\0001'
else:
self.app.expected_calls[
(name, 'admin.vm.tag.Set', tag, None)] = b'0\0'
if vm['backup_path']:
appmenus = (
@ -1727,7 +1733,8 @@ class TC_10_BackupCompatibility(qubesadmin.tests.backup.BackupTestCase):
# retrieve calls from other multiprocess.Process instances
while not qubesd_calls_queue.empty():
call_args = qubesd_calls_queue.get()
self.app.qubesd_call(*call_args)
with contextlib.suppress(qubesadmin.exc.QubesException):
self.app.qubesd_call(*call_args)
qubesd_calls_queue.close()
self.assertAllCalled()
@ -1797,7 +1804,8 @@ class TC_10_BackupCompatibility(qubesadmin.tests.backup.BackupTestCase):
# retrieve calls from other multiprocess.Process instances
while not qubesd_calls_queue.empty():
call_args = qubesd_calls_queue.get()
self.app.qubesd_call(*call_args)
with contextlib.suppress(qubesadmin.exc.QubesException):
self.app.qubesd_call(*call_args)
qubesd_calls_queue.close()
self.assertAllCalled()
@ -1867,7 +1875,8 @@ class TC_10_BackupCompatibility(qubesadmin.tests.backup.BackupTestCase):
# retrieve calls from other multiprocess.Process instances
while not qubesd_calls_queue.empty():
call_args = qubesd_calls_queue.get()
self.app.qubesd_call(*call_args)
with contextlib.suppress(qubesadmin.exc.QubesException):
self.app.qubesd_call(*call_args)
qubesd_calls_queue.close()
self.assertAllCalled()
@ -1968,7 +1977,8 @@ class TC_10_BackupCompatibility(qubesadmin.tests.backup.BackupTestCase):
# retrieve calls from other multiprocess.Process instances
while not qubesd_calls_queue.empty():
call_args = qubesd_calls_queue.get()
self.app.qubesd_call(*call_args)
with contextlib.suppress(qubesadmin.exc.QubesException):
self.app.qubesd_call(*call_args)
qubesd_calls_queue.close()
self.assertAllCalled()

View File

@ -0,0 +1,409 @@
# -*- encoding: utf8 -*-
#
# The Qubes OS Project, http://www.qubes-os.org
#
# Copyright (C) 2019 Marek Marczykowski-Górecki
# <marmarek@invisiblethingslab.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation; either version 2.1 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with this program; if not, see <http://www.gnu.org/licenses/>.
import datetime
import tempfile
import unittest
import unittest.mock
from unittest.mock import call
import subprocess
import qubesadmin.tests
from qubesadmin.tools import qvm_backup_restore
from qubesadmin.backup.dispvm import RestoreInDisposableVM
class TC_00_RestoreInDispVM(qubesadmin.tests.QubesTestCase):
def setUp(self):
super().setUp()
def test_000_prepare_inner_args(self):
self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = (
b'0\0dom0 class=AdminVM state=Running\n'
b'fedora-25 class=TemplateVM state=Halted\n'
b'testvm class=AppVM state=Running\n'
)
argv = ['--verbose', '--skip-broken', '--skip-dom0-home',
'--dest-vm', 'testvm',
'--compression-filter', 'gzip', '/backup/location']
args = qvm_backup_restore.parser.parse_args(argv)
obj = RestoreInDisposableVM(self.app, args)
obj.storage_access_id = 'abc'
reconstructed_argv = obj.prepare_inner_args()
expected_argv = argv[:-1] + \
['--location-is-service', 'qubes.RestoreById+abc']
self.assertCountEqual(expected_argv, reconstructed_argv)
def test_001_prepare_inner_args_exclude(self):
self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = (
b'0\0dom0 class=AdminVM state=Running\n'
b'fedora-25 class=TemplateVM state=Halted\n'
b'testvm class=AppVM state=Running\n'
)
argv = ['--exclude', 'vm1', '--exclude', 'vm2',
'/backup/location']
args = qvm_backup_restore.parser.parse_args(argv)
print(repr(args))
obj = RestoreInDisposableVM(self.app, args)
obj.storage_access_id = 'abc'
reconstructed_argv = obj.prepare_inner_args()
expected_argv = argv[:-1] + \
['--location-is-service', 'qubes.RestoreById+abc']
self.assertCountEqual(expected_argv, reconstructed_argv)
def test_002_prepare_inner_args_pass_file(self):
self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = (
b'0\0dom0 class=AdminVM state=Running\n'
b'fedora-25 class=TemplateVM state=Halted\n'
b'testvm class=AppVM state=Running\n'
)
argv = ['--passphrase-file=/tmp/some/file',
'/backup/location']
args = qvm_backup_restore.parser.parse_args(argv)
print(repr(args))
obj = RestoreInDisposableVM(self.app, args)
obj.storage_access_id = 'abc'
reconstructed_argv = obj.prepare_inner_args()
expected_argv = ['--passphrase-file', '/tmp/some/file',
'--location-is-service', 'qubes.RestoreById+abc']
self.assertEqual(expected_argv, reconstructed_argv)
def test_003_prepare_inner_args_auto_close(self):
self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = (
b'0\0dom0 class=AdminVM state=Running\n'
b'fedora-25 class=TemplateVM state=Halted\n'
b'testvm class=AppVM state=Running\n'
)
argv = ['--auto-close', '/backup/location']
args = qvm_backup_restore.parser.parse_args(argv)
print(repr(args))
obj = RestoreInDisposableVM(self.app, args)
obj.storage_access_id = 'abc'
reconstructed_argv = obj.prepare_inner_args()
expected_argv = ['--location-is-service', 'qubes.RestoreById+abc']
self.assertEqual(expected_argv, reconstructed_argv)
def test_010_clear_old_tags(self):
self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = (
b'0\0dom0 class=AdminVM state=Running\n'
b'fedora-25 class=TemplateVM state=Halted\n'
b'testvm class=AppVM state=Running\n'
)
for tag in ('backup-restore-mgmt',
'backup-restore-in-progress',
'backup-restore-storage'):
self.app.expected_calls[
('dom0', 'admin.vm.tag.Remove', tag, None)] = \
b'2\x00QubesTagNotFoundError\x00\x00Tag not found\x00'
self.app.expected_calls[
('fedora-25', 'admin.vm.tag.Remove', tag, None)] = b'0\0'
self.app.expected_calls[
('testvm', 'admin.vm.tag.Remove', tag, None)] = b'0\0'
args = unittest.mock.Mock(appvm='testvm')
obj = RestoreInDisposableVM(self.app, args)
obj.clear_old_tags()
self.assertAllCalled()
@unittest.mock.patch('subprocess.check_call')
def test_020_create_dispvm(self, mock_check_call):
self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = (
b'0\0dom0 class=AdminVM state=Running\n'
b'fedora-25 class=TemplateVM state=Halted\n'
b'testvm class=AppVM state=Running\n'
b'mgmt-dvm class=AppVM state=Halted\n'
# this should be only after creating...
b'disp-backup-restore class=DispVM state=Halted\n'
)
self.app.expected_calls[
('dom0', 'admin.property.Get', 'management_dispvm', None)] = \
b'0\0default=False type=vm mgmt-dvm'
self.app.expected_calls[
('dom0', 'admin.vm.Create.DispVM', 'mgmt-dvm',
b'name=disp-backup-restore label=red')] = b'0\0'
self.app.expected_calls[
('disp-backup-restore', 'admin.vm.property.Set', 'auto_cleanup',
b'True')] = \
b'0\0'
self.app.expected_calls[
('disp-backup-restore', 'admin.vm.feature.Set', 'tag-created-vm-with',
b'backup-restore-in-progress')] = \
b'0\0'
args = unittest.mock.Mock(appvm='dom0')
obj = RestoreInDisposableVM(self.app, args)
obj.create_dispvm()
self.assertAllCalled()
@unittest.mock.patch('subprocess.check_call')
@unittest.mock.patch('os.uname')
def test_030_transfer_pass_file(self, mock_uname, mock_check_call):
self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = (
b'0\0dom0 class=AdminVM state=Running\n'
b'fedora-25 class=TemplateVM state=Halted\n'
b'testvm class=AppVM state=Running\n'
)
mock_uname.return_value = ('Linux', 'dom0', '5.0.0', '#1', 'x86_64')
args = unittest.mock.Mock(appvm='testvm')
obj = RestoreInDisposableVM(self.app, args)
obj.dispvm = unittest.mock.Mock(default_user='user2')
new_path = obj.transfer_pass_file('/some/path')
self.assertEqual(new_path, '/home/user2/QubesIncoming/dom0/path')
mock_check_call.assert_called_once_with(
['qvm-copy-to-vm', 'disp-backup-restore', '/some/path'],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL)
self.assertAllCalled()
def test_040_register_backup_source(self):
self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = (
b'0\0dom0 class=AdminVM state=Running\n'
b'fedora-25 class=TemplateVM state=Halted\n'
b'backup-storage class=AppVM state=Running\n'
)
self.app.expected_service_calls[
('backup-storage', 'qubes.RegisterBackupLocation')] = \
b'someid\nsomething that should not be read'
self.app.expected_calls[
('backup-storage', 'admin.vm.tag.Set', 'backup-restore-storage',
None)] = b'0\0'
args = unittest.mock.Mock(backup_location='/backup/path',
appvm='backup-storage')
obj = RestoreInDisposableVM(self.app, args)
obj.dispvm = unittest.mock.Mock(default_user='user2')
obj.register_backup_source()
self.assertEqual(obj.storage_access_id, 'someid')
self.assertEqual(self.app.service_calls, [
('backup-storage', 'qubes.RegisterBackupLocation',
{'stdin':subprocess.PIPE, 'stdout':subprocess.PIPE}),
('backup-storage', 'qubes.RegisterBackupLocation', b'/backup/path\n'),
])
self.assertAllCalled()
def test_050_invalidate_backup_access(self):
self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = (
b'0\0dom0 class=AdminVM state=Running\n'
b'fedora-25 class=TemplateVM state=Halted\n'
b'backup-storage class=AppVM state=Running\n'
)
self.app.expected_calls[
('backup-storage', 'admin.vm.tag.Remove', 'backup-restore-storage',
None)] = b'0\0'
args = unittest.mock.Mock(backup_location='/backup/path',
appvm='backup-storage')
obj = RestoreInDisposableVM(self.app, args)
obj.storage_access_proc = unittest.mock.Mock()
obj.invalidate_backup_access()
self.assertEqual(obj.storage_access_proc.mock_calls, [
call.stdin.close(),
call.wait(),
])
self.assertAllCalled()
@unittest.mock.patch('datetime.date')
def test_060_finalize_tags(self, mock_date):
self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = (
b'0\0dom0 class=AdminVM state=Running\n'
b'fedora-25 class=TemplateVM state=Halted\n'
b'disp-backup-restore class=DispVM state=Running\n'
b'restored1 class=AppVM state=Halted\n'
b'restored2 class=AppVM state=Halted\n'
)
self.app.expected_calls[
('dom0', 'admin.vm.tag.Get', 'backup-restore-in-progress',
None)] = b'0\x000'
self.app.expected_calls[
('fedora-25', 'admin.vm.tag.Get', 'backup-restore-in-progress',
None)] = b'0\x000'
self.app.expected_calls[
('disp-backup-restore', 'admin.vm.tag.Get', 'backup-restore-in-progress',
None)] = b'0\x000'
self.app.expected_calls[
('restored1', 'admin.vm.tag.Get', 'backup-restore-in-progress',
None)] = b'0\x001'
self.app.expected_calls[
('restored1', 'admin.vm.tag.List', None, None)] = \
b'0\0backup-restore-in-progress\n' \
b'restored-from-backup-12345678\n' \
b'created-by-disp-backup-restore\n'
self.app.expected_calls[
('restored1', 'admin.vm.tag.Remove', 'backup-restore-in-progress',
None)] = b'0\0'
self.app.expected_calls[
('restored2', 'admin.vm.tag.Get', 'backup-restore-in-progress',
None)] = b'0\x001'
self.app.expected_calls[
('restored2', 'admin.vm.tag.List', None, None)] = \
b'0\0backup-restore-in-progress\n' \
b'created-by-disp-backup-restore\n'
self.app.expected_calls[
('restored2', 'admin.vm.tag.Set',
'restored-from-backup-at-2019-10-01',
None)] = b'0\0'
self.app.expected_calls[
('restored2', 'admin.vm.tag.Remove', 'backup-restore-in-progress',
None)] = b'0\0'
mock_date.today.return_value = datetime.date.fromisoformat('2019-10-01')
mock_date.strftime.return_value = '2019-10-01'
args = unittest.mock.Mock(backup_location='/backup/path',
appvm=None)
obj = RestoreInDisposableVM(self.app, args)
obj.finalize_tags()
self.assertAllCalled()
def test_070_sanitize_log(self):
sanitized = RestoreInDisposableVM.sanitize_log(b'sample message')
self.assertEqual(sanitized, b'sample message')
sanitized = RestoreInDisposableVM.sanitize_log(
b'sample message\nmultiline\n')
self.assertEqual(sanitized, b'sample message\nmultiline\n')
sanitized = RestoreInDisposableVM.sanitize_log(
b'\033[0;33m\xff\xfe\x80')
self.assertEqual(sanitized, b'.[0;33m...')
def test_080_extract_log(self):
self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = (
b'0\0dom0 class=AdminVM state=Running\n'
b'fedora-25 class=TemplateVM state=Halted\n'
)
args = unittest.mock.Mock(backup_location='/backup/path',
appvm=None)
obj = RestoreInDisposableVM(self.app, args)
obj.dispvm = unittest.mock.Mock()
obj.dispvm.run_with_args.return_value = b'this is a log', None
backup_log = obj.extract_log()
obj.dispvm.run_with_args.assert_called_once_with(
'cat', '/var/tmp/backup-restore.log',
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL)
self.assertEqual(backup_log, b'this is a log')
def test_100_run(self):
self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = (
b'0\0dom0 class=AdminVM state=Running\n'
b'fedora-25 class=TemplateVM state=Halted\n'
)
args = unittest.mock.Mock(backup_location='/backup/path',
pass_file=None,
appvm=None)
obj = RestoreInDisposableVM(self.app, args)
methods = ['create_dispvm', 'clear_old_tags', 'register_backup_source',
'prepare_inner_args', 'extract_log', 'finalize_tags']
for m in methods:
setattr(obj, m, unittest.mock.Mock())
obj.extract_log.return_value = b'Some logs\nexit code: 0\n'
obj.transfer_pass_file = unittest.mock.Mock()
obj.prepare_inner_args.return_value = ['args']
obj.terminal_app = ('terminal',)
obj.dispvm = unittest.mock.Mock()
with tempfile.NamedTemporaryFile() as tmp:
with unittest.mock.patch('qubesadmin.backup.dispvm.LOCKFILE',
tmp.name):
obj.run()
for m in methods:
self.assertEqual(len(getattr(obj, m).mock_calls), 1)
self.assertEqual(obj.dispvm.mock_calls, [
call.start(),
call.run_service_for_stdio('qubes.WaitForSession'),
call.tags.add('backup-restore-mgmt'),
call.run_with_args('terminal', 'qvm-backup-restore', 'args',
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL),
call.tags.discard('backup-restore-mgmt'),
call.kill()
])
obj.transfer_pass_file.assert_not_called()
def test_101_run_pass_file(self):
self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = (
b'0\0dom0 class=AdminVM state=Running\n'
b'fedora-25 class=TemplateVM state=Halted\n'
)
args = unittest.mock.Mock(backup_location='/backup/path',
pass_file='/some/path',
appvm=None)
obj = RestoreInDisposableVM(self.app, args)
methods = ['create_dispvm', 'clear_old_tags', 'register_backup_source',
'prepare_inner_args', 'extract_log', 'finalize_tags',
'transfer_pass_file']
for m in methods:
setattr(obj, m, unittest.mock.Mock())
obj.extract_log.return_value = b'Some logs\nexit code: 0\n'
obj.prepare_inner_args.return_value = ['args']
obj.terminal_app = ('terminal',)
obj.dispvm = unittest.mock.Mock()
with tempfile.NamedTemporaryFile() as tmp:
with unittest.mock.patch('qubesadmin.backup.dispvm.LOCKFILE',
tmp.name):
obj.run()
for m in methods:
self.assertEqual(len(getattr(obj, m).mock_calls), 1)
self.assertEqual(obj.dispvm.mock_calls, [
call.start(),
call.run_service_for_stdio('qubes.WaitForSession'),
call.tags.add('backup-restore-mgmt'),
call.run_with_args('terminal', 'qvm-backup-restore', 'args',
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL),
call.tags.discard('backup-restore-mgmt'),
call.kill()
])
def test_102_run_error(self):
self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = (
b'0\0dom0 class=AdminVM state=Running\n'
b'fedora-25 class=TemplateVM state=Halted\n'
)
args = unittest.mock.Mock(backup_location='/backup/path',
pass_file=None,
appvm=None)
obj = RestoreInDisposableVM(self.app, args)
methods = ['create_dispvm', 'clear_old_tags', 'register_backup_source',
'prepare_inner_args', 'extract_log', 'finalize_tags']
for m in methods:
setattr(obj, m, unittest.mock.Mock())
obj.extract_log.return_value = b'Some error\nexit code: 1\n'
obj.transfer_pass_file = unittest.mock.Mock()
obj.prepare_inner_args.return_value = ['args']
obj.terminal_app = ('terminal',)
obj.dispvm = unittest.mock.Mock()
with tempfile.NamedTemporaryFile() as tmp:
with unittest.mock.patch('qubesadmin.backup.dispvm.LOCKFILE',
tmp.name):
with self.assertRaises(qubesadmin.exc.BackupRestoreError):
obj.run()
for m in methods:
self.assertEqual(len(getattr(obj, m).mock_calls), 1)
self.assertEqual(obj.dispvm.mock_calls, [
call.start(),
call.run_service_for_stdio('qubes.WaitForSession'),
call.tags.add('backup-restore-mgmt'),
call.run_with_args('terminal', 'qvm-backup-restore', 'args',
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL),
call.tags.discard('backup-restore-mgmt'),
call.kill()
])
obj.transfer_pass_file.assert_not_called()

View File

@ -176,7 +176,6 @@ class TC_02_DstHost(qubesadmin.tests.QubesTestCase):
with self.assertRaises(ValueError):
qubesadmin.firewall.DstHost('2001:abcd:efab::3/64')
@unittest.expectedFailure
def test_020_invalid_hostname(self):
with self.assertRaises(ValueError):
qubesadmin.firewall.DstHost('www qubes-os.org')

View File

@ -22,6 +22,7 @@ import os
import unittest.mock as mock
import asyncio
import asynctest
import qubesadmin.tests
import qubesadmin.tests.tools
@ -177,9 +178,11 @@ class TC_00_qvm_backup(qubesadmin.tests.QubesTestCase):
None)] = \
b'0\0'
try:
mock_events = asynctest.CoroutineMock()
patch = mock.patch(
'qubesadmin.events.EventsDispatcher._get_events_reader')
mock_events = patch.start()
'qubesadmin.events.EventsDispatcher._get_events_reader',
mock_events)
patch.start()
self.addCleanup(patch.stop)
mock_events.side_effect = qubesadmin.tests.tools.MockEventsReader([
b'1\0\0connection-established\0\0',

View File

@ -17,12 +17,15 @@
#
# You should have received a copy of the GNU Lesser General Public License along
# with this program; if not, see <http://www.gnu.org/licenses/>.
import itertools
import qubesadmin.tests
import qubesadmin.tests.tools
import qubesadmin.tools.qvm_backup_restore
from unittest import mock
from qubesadmin.backup import BackupVM
from qubesadmin.backup.restore import BackupRestore
from qubesadmin.backup.dispvm import RestoreInDisposableVM
class TC_00_qvm_backup_restore(qubesadmin.tests.QubesTestCase):
@ -59,7 +62,7 @@ class TC_00_qvm_backup_restore(qubesadmin.tests.QubesTestCase):
self.app, mock.ANY, mock_restore_info)
mock_backup.assert_called_once_with(
self.app, '/some/path', None, 'testpass',
force_compression_filter=None)
force_compression_filter=None, location_is_service=False)
self.assertAllCalled()
@mock.patch('qubesadmin.tools.qvm_backup_restore.input', create=True)
@ -94,7 +97,7 @@ class TC_00_qvm_backup_restore(qubesadmin.tests.QubesTestCase):
app=self.app)
mock_backup.assert_called_once_with(
self.app, '/some/path', None, 'testpass',
force_compression_filter=None)
force_compression_filter=None, location_is_service=False)
self.assertEqual(mock_backup.return_value.options.exclude, ['test-vm2'])
self.assertAllCalled()
@ -231,3 +234,14 @@ class TC_00_qvm_backup_restore(qubesadmin.tests.QubesTestCase):
qubesadmin.tools.qvm_backup_restore.handle_broken(
self.app, mock_args, mock_restore_info)
self.assertAppropriateLogging('NetVM', 'error')
def test_100_restore_in_dispvm_parser(self):
"""Verify if qvm-backup-restore tool options matches un-parser
for paranoid restore mode"""
parser = qubesadmin.tools.qvm_backup_restore.parser
actions = parser._get_optional_actions()
options_tool = set(itertools.chain(*(a.option_strings for a in actions)))
options_parser = set(itertools.chain(
*(o.opts for o in RestoreInDisposableVM.arguments.values())))
self.assertEqual(options_tool, options_parser)

View File

@ -45,9 +45,9 @@ class TC_00_qvm_run(qubesadmin.tests.QubesTestCase):
self.app.expected_calls[
('test-vm', 'admin.vm.feature.CheckWithTemplate', 'os', None)] = \
b'2\x00QubesFeatureNotFoundError\x00\x00Feature \'os\' not set\x00'
# self.app.expected_calls[
# ('test-vm', 'admin.vm.List', None, None)] = \
# b'0\x00test-vm class=AppVM state=Running\n'
self.app.expected_calls[
('test-vm', 'admin.vm.CurrentState', None, None)] = \
b'0\x00power_state=Running'
ret = qubesadmin.tools.qvm_run.main(
['--no-gui', 'test-vm', 'command'],
app=self.app)
@ -110,6 +110,9 @@ class TC_00_qvm_run(qubesadmin.tests.QubesTestCase):
self.app.expected_calls[
('test-vm', 'admin.vm.feature.CheckWithTemplate', 'os', None)] = \
b'2\x00QubesFeatureNotFoundError\x00\x00Feature \'os\' not set\x00'
self.app.expected_calls[
('test-vm', 'admin.vm.CurrentState', None, None)] = \
b'0\x00power_state=Running'
# self.app.expected_calls[
# ('test-vm', 'admin.vm.List', None, None)] = \
# b'0\x00test-vm class=AppVM state=Running\n'
@ -139,9 +142,9 @@ class TC_00_qvm_run(qubesadmin.tests.QubesTestCase):
self.app.expected_calls[
('dom0', 'admin.vm.List', None, None)] = \
b'0\x00test-vm class=AppVM state=Running\n'
# self.app.expected_calls[
# ('test-vm', 'admin.vm.List', None, None)] = \
# b'0\x00test-vm class=AppVM state=Running\n'
self.app.expected_calls[
('test-vm', 'admin.vm.CurrentState', None, None)] = \
b'0\x00power_state=Running'
echo = subprocess.Popen(['echo', 'some-data'], stdout=subprocess.PIPE)
with unittest.mock.patch('sys.stdin', echo.stdout):
ret = qubesadmin.tools.qvm_run.main(
@ -276,9 +279,9 @@ class TC_00_qvm_run(qubesadmin.tests.QubesTestCase):
self.app.expected_calls[
('test-vm', 'admin.vm.feature.CheckWithTemplate', 'os', None)] = \
b'2\x00QubesFeatureNotFoundError\x00\x00Feature \'os\' not set\x00'
# self.app.expected_calls[
# ('test-vm', 'admin.vm.List', None, None)] = \
# b'0\x00test-vm class=AppVM state=Running\n'
self.app.expected_calls[
('test-vm', 'admin.vm.CurrentState', None, None)] = \
b'0\x00power_state=Running'
mock_popen.return_value.wait.return_value = 0
ret = qubesadmin.tools.qvm_run.main(
['--no-gui', '--pass-io', '--localcmd', 'local-command',
@ -309,9 +312,9 @@ class TC_00_qvm_run(qubesadmin.tests.QubesTestCase):
self.app.expected_calls[
('test-vm', 'admin.vm.feature.CheckWithTemplate', 'os', None)] = \
b'2\x00QubesFeatureNotFoundError\x00\x00Feature \'os\' not set\x00'
# self.app.expected_calls[
# ('test-vm', 'admin.vm.List', None, None)] = \
# b'0\x00test-vm class=AppVM state=Running\n'
self.app.expected_calls[
('test-vm', 'admin.vm.CurrentState', None, None)] = \
b'0\x00power_state=Running'
ret = qubesadmin.tools.qvm_run.main(
['test-vm', 'command'],
app=self.app)
@ -339,9 +342,9 @@ class TC_00_qvm_run(qubesadmin.tests.QubesTestCase):
self.app.expected_calls[
('test-vm', 'admin.vm.property.Get', 'default_user', None)] = \
b'0\x00default=yes type=str user'
# self.app.expected_calls[
# ('test-vm', 'admin.vm.List', None, None)] = \
# b'0\x00test-vm class=AppVM state=Running\n'
self.app.expected_calls[
('test-vm', 'admin.vm.CurrentState', None, None)] = \
b'0\x00power_state=Running'
ret = qubesadmin.tools.qvm_run.main(
['--service', 'test-vm', 'service.name'],
app=self.app)
@ -363,6 +366,9 @@ class TC_00_qvm_run(qubesadmin.tests.QubesTestCase):
self.assertAllCalled()
def test_008_dispvm_remote(self):
self.app.expected_calls[
('$dispvm', 'admin.vm.CurrentState', None, None)] = \
b'0\x00power_state=Running'
ret = qubesadmin.tools.qvm_run.main(
['--dispvm', '--service', 'test.service'], app=self.app)
self.assertEqual(ret, 0)
@ -377,6 +383,9 @@ class TC_00_qvm_run(qubesadmin.tests.QubesTestCase):
self.assertAllCalled()
def test_009_dispvm_remote_specific(self):
self.app.expected_calls[
('$dispvm:test-vm', 'admin.vm.CurrentState', None, None)] = \
b'0\x00power_state=Running'
ret = qubesadmin.tools.qvm_run.main(
['--dispvm=test-vm', '--service', 'test.service'], app=self.app)
self.assertEqual(ret, 0)
@ -400,6 +409,9 @@ class TC_00_qvm_run(qubesadmin.tests.QubesTestCase):
self.app.expected_calls[
('disp123', 'admin.vm.property.Get', 'qrexec_timeout', None)] = \
b'0\0default=yes type=int 30'
self.app.expected_calls[
('$dispvm', 'admin.vm.CurrentState', None, None)] = \
b'0\x00power_state=Running'
ret = qubesadmin.tools.qvm_run.main(
['--dispvm', '--service', 'test.service'], app=self.app)
self.assertEqual(ret, 0)
@ -424,6 +436,9 @@ class TC_00_qvm_run(qubesadmin.tests.QubesTestCase):
self.app.expected_calls[
('disp123', 'admin.vm.property.Get', 'qrexec_timeout', None)] = \
b'0\0default=yes type=int 30'
self.app.expected_calls[
('$dispvm:test-vm', 'admin.vm.CurrentState', None, None)] = \
b'0\x00power_state=Running'
ret = qubesadmin.tools.qvm_run.main(
['--dispvm=test-vm', '--service', 'test.service'], app=self.app)
self.assertEqual(ret, 0)
@ -496,6 +511,9 @@ class TC_00_qvm_run(qubesadmin.tests.QubesTestCase):
self.app.expected_calls[
('disp123', 'admin.vm.feature.CheckWithTemplate', 'os', None)] = \
b'2\x00QubesFeatureNotFoundError\x00\x00Feature \'os\' not set\x00'
self.app.expected_calls[
('$dispvm', 'admin.vm.CurrentState', None, None)] = \
b'0\x00power_state=Running'
ret = qubesadmin.tools.qvm_run.main(
['--dispvm', '--', 'test.command'], app=self.app)
self.assertEqual(ret, 0)
@ -524,6 +542,9 @@ class TC_00_qvm_run(qubesadmin.tests.QubesTestCase):
self.app.expected_calls[
('disp123', 'admin.vm.feature.CheckWithTemplate', 'os', None)] = \
b'2\x00QubesFeatureNotFoundError\x00\x00Feature \'os\' not set\x00'
self.app.expected_calls[
('$dispvm', 'admin.vm.CurrentState', None, None)] = \
b'0\x00power_state=Running'
ret = qubesadmin.tools.qvm_run.main(
['--dispvm', '--no-gui', 'test.command'], app=self.app)
self.assertEqual(ret, 0)
@ -545,9 +566,9 @@ class TC_00_qvm_run(qubesadmin.tests.QubesTestCase):
self.app.expected_calls[
('test-vm', 'admin.vm.feature.CheckWithTemplate', 'os', None)] = \
b'0\x00Windows'
# self.app.expected_calls[
# ('test-vm', 'admin.vm.List', None, None)] = \
# b'0\x00test-vm class=AppVM state=Running\n'
self.app.expected_calls[
('test-vm', 'admin.vm.CurrentState', None, None)] = \
b'0\x00power_state=Running'
ret = qubesadmin.tools.qvm_run.main(
['--no-gui', 'test-vm', 'command'],
app=self.app)
@ -572,9 +593,9 @@ class TC_00_qvm_run(qubesadmin.tests.QubesTestCase):
self.app.expected_calls[
('test-vm', 'admin.vm.feature.CheckWithTemplate', 'vmexec', None)] = \
b'2\x00QubesFeatureNotFoundError\x00\x00Feature \'vmexec\' not set\x00'
# self.app.expected_calls[
# ('test-vm', 'admin.vm.List', None, None)] = \
# b'0\x00test-vm class=AppVM state=Running\n'
self.app.expected_calls[
('test-vm', 'admin.vm.CurrentState', None, None)] = \
b'0\x00power_state=Running'
ret = qubesadmin.tools.qvm_run.main(
['--no-gui', 'test-vm', 'command', 'arg'],
app=self.app)
@ -597,9 +618,9 @@ class TC_00_qvm_run(qubesadmin.tests.QubesTestCase):
('test-vm', 'admin.vm.feature.CheckWithTemplate',
'vmexec', None)] = \
b'0\x001'
# self.app.expected_calls[
# ('test-vm', 'admin.vm.List', None, None)] = \
# b'0\x00test-vm class=AppVM state=Running\n'
self.app.expected_calls[
('test-vm', 'admin.vm.CurrentState', None, None)] = \
b'0\x00power_state=Running'
ret = qubesadmin.tools.qvm_run.main(
['--no-gui', 'test-vm', 'command', 'arg'],
app=self.app)
@ -613,3 +634,30 @@ class TC_00_qvm_run(qubesadmin.tests.QubesTestCase):
('test-vm', 'qubes.VMExec+command+arg', b'')
])
self.assertAllCalled()
def test_021_paused_vm(self):
self.app.expected_calls[
('dom0', 'admin.vm.List', None, None)] = \
b'0\x00test-vm class=AppVM state=Paused\n'
self.app.expected_calls[
('test-vm', 'admin.vm.feature.CheckWithTemplate', 'os', None)] = \
b'2\x00QubesFeatureNotFoundError\x00\x00Feature \'os\' not set\x00'
self.app.expected_calls[
('test-vm', 'admin.vm.CurrentState', None, None)] = \
b'0\x00power_state=Paused'
self.app.expected_calls[
('test-vm', 'admin.vm.Unpause', None, None)] = \
b'0\x00'
ret = qubesadmin.tools.qvm_run.main(
['--no-gui', 'test-vm', 'command'],
app=self.app)
self.assertEqual(ret, 0)
self.assertEqual(self.app.service_calls, [
('test-vm', 'qubes.VMShell', {
'stdout': subprocess.DEVNULL,
'stderr': subprocess.DEVNULL,
'user': None,
}),
('test-vm', 'qubes.VMShell', b'command; exit\n')
])
self.assertAllCalled()

View File

@ -18,7 +18,9 @@
# You should have received a copy of the GNU Lesser General Public License along
# with this program; if not, see <http://www.gnu.org/licenses/>.
import asyncio
import asynctest
import unittest.mock
import qubesadmin.tests
import qubesadmin.tests.tools
import qubesadmin.tools.qvm_shutdown
@ -85,9 +87,11 @@ class TC_00_qvm_shutdown(qubesadmin.tests.QubesTestCase):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
mock_events = asynctest.CoroutineMock()
patch = unittest.mock.patch(
'qubesadmin.events.EventsDispatcher._get_events_reader')
mock_events = patch.start()
'qubesadmin.events.EventsDispatcher._get_events_reader',
mock_events)
patch.start()
self.addCleanup(patch.stop)
mock_events.side_effect = qubesadmin.tests.tools.MockEventsReader([
b'1\0\0connection-established\0\0',
@ -114,9 +118,11 @@ class TC_00_qvm_shutdown(qubesadmin.tests.QubesTestCase):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
mock_events = asynctest.CoroutineMock()
patch = unittest.mock.patch(
'qubesadmin.events.EventsDispatcher._get_events_reader')
mock_events = patch.start()
'qubesadmin.events.EventsDispatcher._get_events_reader',
mock_events)
patch.start()
self.addCleanup(patch.stop)
mock_events.side_effect = qubesadmin.tests.tools.MockEventsReader([
b'1\0\0connection-established\0\0',
@ -159,9 +165,11 @@ class TC_00_qvm_shutdown(qubesadmin.tests.QubesTestCase):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
mock_events = asynctest.CoroutineMock()
patch = unittest.mock.patch(
'qubesadmin.events.EventsDispatcher._get_events_reader')
mock_events = patch.start()
'qubesadmin.events.EventsDispatcher._get_events_reader',
mock_events)
patch.start()
self.addCleanup(patch.stop)
mock_events.side_effect = qubesadmin.tests.tools.MockEventsReader([
b'1\0\0connection-established\0\0',

View File

@ -22,11 +22,14 @@ import os
import signal
import tempfile
import unittest.mock
import re
import asyncio
import asynctest
import qubesadmin.tests
import qubesadmin.tools.qvm_start_daemon
from qubesadmin.tools.qvm_start_daemon import GUI_DAEMON_OPTIONS
import qubesadmin.vm
@ -53,6 +56,7 @@ class TC_00_qvm_start_gui(qubesadmin.tests.QubesTestCase):
]
args = self.launcher.kde_guid_args(self.app.domains['test-vm'])
self.launcher.kde = True
self.assertEqual(args, ['-T', '-p',
'_KDE_NET_WM_COLOR_SCHEME=s:' +
os.path.expanduser(
@ -60,29 +64,20 @@ class TC_00_qvm_start_gui(qubesadmin.tests.QubesTestCase):
self.assertAllCalled()
@unittest.mock.patch('subprocess.check_output')
def test_001_kde_args_none(self, proc_mock):
def setup_common_args(self):
self.app.expected_calls[
('dom0', 'admin.vm.List', None, None)] = \
b'0\x00test-vm class=AppVM state=Running\n'
proc_mock.side_effect = [b'']
args = self.launcher.kde_guid_args(self.app.domains['test-vm'])
self.assertEqual(args, [])
self.assertAllCalled()
def test_010_common_args(self):
self.app.expected_calls[
('dom0', 'admin.vm.List', None, None)] = \
b'0\x00test-vm class=AppVM state=Running\n'
b'0\x00test-vm class=AppVM state=Running\n' \
b'gui-vm class=AppVM state=Running'
self.app.expected_calls[
('test-vm', 'admin.vm.property.Get', 'label', None)] = \
b'0\x00default=False type=label red'
self.app.expected_calls[
('test-vm', 'admin.vm.property.Get', 'debug', None)] = \
b'0\x00default=False type=bool False'
self.app.expected_calls[
('test-vm', 'admin.vm.property.Get', 'guivm', None)] = \
b'0\x00default=False type=vm gui-vm'
self.app.expected_calls[
('dom0', 'admin.label.Get', 'red', None)] = \
b'0\x000xff0000'
@ -94,88 +89,126 @@ class TC_00_qvm_start_gui(qubesadmin.tests.QubesTestCase):
'rpc-clipboard', None)] = \
b'2\x00QubesFeatureNotFoundError\x00\x00Feature not set\x00'
with unittest.mock.patch.object(self.launcher, 'kde_guid_args') as \
kde_mock:
self.app.expected_calls[
('test-vm', 'admin.vm.property.Get', 'xid', None)] = \
b'0\x00default=99 type=int 99'
for name, _kind in GUI_DAEMON_OPTIONS:
self.app.expected_calls[
('test-vm', 'admin.vm.feature.Get',
'gui-' + name.replace('_', '-'), None)] = \
b'2\x00QubesFeatureNotFoundError\x00\x00Feature not set\x00'
self.app.expected_calls[
('gui-vm', 'admin.vm.feature.Get',
'gui-default-' + name.replace('_', '-'), None)] = \
b'2\x00QubesFeatureNotFoundError\x00\x00Feature not set\x00'
def run_common_args(self):
with unittest.mock.patch.object(
self.launcher, 'kde_guid_args') as kde_mock, \
unittest.mock.patch.object(
self.launcher, 'write_guid_config') as write_config_mock:
kde_mock.return_value = []
args = self.launcher.common_guid_args(self.app.domains['test-vm'])
self.assertEqual(args, [
'/usr/bin/qubes-guid', '-N', 'test-vm',
'-c', '0xff0000',
'-i', '/usr/share/icons/hicolor/128x128/devices/appvm-red.png',
'-l', '1', '-q'])
self.assertEqual(len(write_config_mock.mock_calls), 1)
config_args = write_config_mock.mock_calls[0][1]
self.assertEqual(config_args[0], '/var/run/qubes/guid-conf.99')
config = config_args[1]
# Strip comments and empty lines
config = re.sub(r'^#.*\n', '', config)
config = re.sub(r'^\n', '', config)
self.assertAllCalled()
return args, config
def test_010_common_args(self):
self.setup_common_args()
args, config = self.run_common_args()
self.assertEqual(args, [
'/usr/bin/qubes-guid', '-N', 'test-vm',
'-c', '0xff0000',
'-i', '/usr/share/icons/hicolor/128x128/devices/appvm-red.png',
'-l', '1', '-q',
'-C', '/var/run/qubes/guid-conf.99',
])
self.assertEqual(config, '''\
global: {
}
''')
def test_011_common_args_debug(self):
self.app.expected_calls[
('dom0', 'admin.vm.List', None, None)] = \
b'0\x00test-vm class=AppVM state=Running\n'
self.app.expected_calls[
('test-vm', 'admin.vm.property.Get', 'label', None)] = \
b'0\x00default=False type=label red'
self.setup_common_args()
self.app.expected_calls[
('test-vm', 'admin.vm.property.Get', 'debug', None)] = \
b'0\x00default=False type=bool True'
self.app.expected_calls[
('dom0', 'admin.label.Get', 'red', None)] = \
b'0\x000xff0000'
self.app.expected_calls[
('dom0', 'admin.label.Index', 'red', None)] = \
b'0\x001'
self.app.expected_calls[
('test-vm', 'admin.vm.feature.CheckWithTemplate',
'rpc-clipboard', None)] = \
b'2\x00QubesFeatureNotFoundError\x00\x00Feature not set\x00'
with unittest.mock.patch.object(self.launcher, 'kde_guid_args') as \
kde_mock:
kde_mock.return_value = []
args = self.launcher.common_guid_args(self.app.domains['test-vm'])
self.assertEqual(args, [
'/usr/bin/qubes-guid', '-N', 'test-vm',
'-c', '0xff0000',
'-i', '/usr/share/icons/hicolor/128x128/devices/appvm-red.png',
'-l', '1', '-v', '-v'])
self.assertAllCalled()
args, config = self.run_common_args()
self.assertEqual(args, [
'/usr/bin/qubes-guid', '-N', 'test-vm',
'-c', '0xff0000',
'-i', '/usr/share/icons/hicolor/128x128/devices/appvm-red.png',
'-l', '1', '-v', '-v',
'-C', '/var/run/qubes/guid-conf.99',
])
self.assertEqual(config, '''\
global: {
}
''')
def test_012_common_args_rpc_clipboard(self):
self.app.expected_calls[
('dom0', 'admin.vm.List', None, None)] = \
b'0\x00test-vm class=AppVM state=Running\n'
self.app.expected_calls[
('test-vm', 'admin.vm.property.Get', 'label', None)] = \
b'0\x00default=False type=label red'
self.app.expected_calls[
('test-vm', 'admin.vm.property.Get', 'debug', None)] = \
b'0\x00default=False type=bool False'
self.app.expected_calls[
('dom0', 'admin.label.Get', 'red', None)] = \
b'0\x000xff0000'
self.app.expected_calls[
('dom0', 'admin.label.Index', 'red', None)] = \
b'0\x001'
self.setup_common_args()
self.app.expected_calls[
('test-vm', 'admin.vm.feature.CheckWithTemplate',
'rpc-clipboard', None)] = \
b'0\x001'
with unittest.mock.patch.object(self.launcher, 'kde_guid_args') as \
kde_mock:
kde_mock.return_value = []
args, config = self.run_common_args()
args = self.launcher.common_guid_args(self.app.domains['test-vm'])
self.assertEqual(args, [
'/usr/bin/qubes-guid', '-N', 'test-vm',
'-c', '0xff0000',
'-i', '/usr/share/icons/hicolor/128x128/devices/appvm-red.png',
'-l', '1', '-q', '-Q'])
self.assertEqual(args, [
'/usr/bin/qubes-guid', '-N', 'test-vm',
'-c', '0xff0000',
'-i', '/usr/share/icons/hicolor/128x128/devices/appvm-red.png',
'-l', '1', '-q', '-Q',
'-C', '/var/run/qubes/guid-conf.99',
])
self.assertEqual(config, '''\
global: {
}
''')
self.assertAllCalled()
def test_013_common_args_guid_config(self):
self.setup_common_args()
@unittest.mock.patch('asyncio.create_subprocess_exec')
self.app.expected_calls[
('test-vm', 'admin.vm.feature.Get',
'gui-allow-fullscreen', None)] = \
b'0\x001'
# The template will not be asked for this feature
del self.app.expected_calls[
('gui-vm', 'admin.vm.feature.Get',
'gui-default-allow-fullscreen', None)]
self.app.expected_calls[
('gui-vm', 'admin.vm.feature.Get',
'gui-default-secure-copy-sequence', None)] = \
b'0\x00Ctrl-Alt-Shift-c'
_args, config = self.run_common_args()
self.assertEqual(config, '''\
global: {
allow_fullscreen = true;
secure_copy_sequence = "Ctrl-Alt-Shift-c";
}
''')
@asynctest.patch('asyncio.create_subprocess_exec')
def test_020_start_gui_for_vm(self, proc_mock):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
@ -206,7 +239,7 @@ class TC_00_qvm_start_gui(qubesadmin.tests.QubesTestCase):
self.assertAllCalled()
@unittest.mock.patch('asyncio.create_subprocess_exec')
@asynctest.patch('asyncio.create_subprocess_exec')
def test_021_start_gui_for_vm_hvm(self, proc_mock):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
@ -275,7 +308,7 @@ class TC_00_qvm_start_gui(qubesadmin.tests.QubesTestCase):
pidfile.flush()
self.addCleanup(pidfile.close)
patch_proc = unittest.mock.patch('asyncio.create_subprocess_exec')
patch_proc = asynctest.patch('asyncio.create_subprocess_exec')
patch_args = unittest.mock.patch.object(self.launcher,
'common_guid_args',
lambda vm: [])
@ -318,7 +351,7 @@ class TC_00_qvm_start_gui(qubesadmin.tests.QubesTestCase):
None)] = \
b'2\x00QubesFeatureNotFoundError\x00\x00Feature not set\x00'
proc_mock = unittest.mock.Mock()
with unittest.mock.patch('asyncio.create_subprocess_exec',
with asynctest.patch('asyncio.create_subprocess_exec',
lambda *args: self.mock_coroutine(proc_mock,
*args)):
with unittest.mock.patch.object(self.launcher,
@ -352,7 +385,7 @@ class TC_00_qvm_start_gui(qubesadmin.tests.QubesTestCase):
None)] = \
b'0\x001'
proc_mock = unittest.mock.Mock()
with unittest.mock.patch('asyncio.create_subprocess_exec',
with asynctest.patch('asyncio.create_subprocess_exec',
lambda *args: self.mock_coroutine(proc_mock,
*args)):
with unittest.mock.patch.object(self.launcher,

View File

@ -168,8 +168,8 @@ class TC_00_qvm_template_postprocess(qubesadmin.tests.QubesTestCase):
self.app.expected_calls[
('test-vm', 'admin.vm.volume.List', None, None)] = \
b'0\0root\nprivate\nvolatile\nkernel\n'
self.app.expected_calls[('test-vm', 'admin.vm.volume.Import', 'private',
b'')] = b'0\0'
self.app.expected_calls[('test-vm', 'admin.vm.volume.Clear', 'private',
None)] = b'0\0'
vm = self.app.domains['test-vm']
qubesadmin.tools.qvm_template_postprocess.reset_private_img(vm)

View File

@ -116,8 +116,8 @@ class TestVMUsage(qubesadmin.tests.QubesTestCase):
class TestVMExecEncode(qubesadmin.tests.QubesTestCase):
def test_00_encode(self):
self.assertEqual(
qubesadmin.utils.encode_for_vmexec(['ls', '-a']),
'ls+--a')
qubesadmin.utils.encode_for_vmexec(['ls', '-a', '+x']),
'ls+--a+-2Bx')
self.assertEqual(
qubesadmin.utils.encode_for_vmexec(
['touch', '/home/user/.profile']),

View File

@ -62,7 +62,7 @@ class PropertyAction(argparse.Action):
metavar='NAME=VALUE',
required=False,
help='set property to a value'):
super(PropertyAction, self).__init__(option_strings, 'properties',
super().__init__(option_strings, 'properties',
metavar=metavar, default={}, help=help)
def __call__(self, parser, namespace, values, option_string=None):
@ -99,7 +99,7 @@ class SinglePropertyAction(argparse.Action):
if const is not None:
nargs = 0
super(SinglePropertyAction, self).__init__(option_strings, 'properties',
super().__init__(option_strings, 'properties',
metavar=metavar, help=help, default={}, const=const,
nargs=nargs)
@ -141,7 +141,7 @@ class VmNameAction(QubesAction):
nargs, "Passed unexpected value {!s} as {!s} nargs ".format(
nargs, dest))
super(VmNameAction, self).__init__(option_strings, dest=dest, help=help,
super().__init__(option_strings, dest=dest, help=help,
nargs=nargs, **kwargs)
def __call__(self, parser, namespace, values, option_string=None):
@ -200,11 +200,11 @@ class RunningVmNameAction(VmNameAction):
raise argparse.ArgumentError(
nargs, "Passed unexpected value {!s} as {!s} nargs ".format(
nargs, dest))
super(RunningVmNameAction, self).__init__(
super().__init__(
option_strings, dest=dest, help=help, nargs=nargs, **kwargs)
def parse_qubes_app(self, parser, namespace):
super(RunningVmNameAction, self).parse_qubes_app(parser, namespace)
super().parse_qubes_app(parser, namespace)
for vm in namespace.domains:
if not vm.is_running():
parser.error_runtime("domain {!r} is not running".format(
@ -220,7 +220,7 @@ class VolumeAction(QubesAction):
def __init__(self, help='A pool & volume id combination',
required=True, **kwargs):
# pylint: disable=redefined-builtin
super(VolumeAction, self).__init__(help=help, required=required,
super().__init__(help=help, required=required,
**kwargs)
def __call__(self, parser, namespace, values, option_string=None):
@ -261,7 +261,7 @@ class VMVolumeAction(QubesAction):
def __init__(self, help='A pool & volume id combination',
required=True, **kwargs):
# pylint: disable=redefined-builtin
super(VMVolumeAction, self).__init__(help=help, required=required,
super().__init__(help=help, required=required,
**kwargs)
def __call__(self, parser, namespace, values, option_string=None):
@ -322,9 +322,6 @@ class PoolsAction(QubesAction):
class QubesArgumentParser(argparse.ArgumentParser):
'''Parser preconfigured for use in most of the Qubes command-line tools.
:param bool want_app: instantiate :py:class:`qubes.Qubes` object
:param bool want_app_no_instance: don't actually instantiate \
:py:class:`qubes.Qubes` object, just add argument for custom xml file
:param mixed vmname_nargs: The number of ``VMNAME`` arguments that should be
consumed. Values include:
* N (an integer) consumes N arguments (and produces a list)
@ -340,20 +337,11 @@ class QubesArgumentParser(argparse.ArgumentParser):
``--verbose`` and ``--quiet``
'''
def __init__(self, want_app=True, want_app_no_instance=False,
vmname_nargs=None, **kwargs):
def __init__(self, vmname_nargs=None, **kwargs):
super(QubesArgumentParser, self).__init__(add_help=False, **kwargs)
super().__init__(add_help=False, **kwargs)
self._want_app = want_app
self._want_app_no_instance = want_app_no_instance
self._vmname_nargs = vmname_nargs
if self._want_app:
self.add_argument('--qubesxml', metavar='FILE', action='store',
dest='app', help=argparse.SUPPRESS)
self.add_argument('--offline-mode', action='store_true',
default=None, dest='offline_mode', help=argparse.SUPPRESS)
self.add_argument('--verbose', '-v', action='count',
help='increase verbosity')
@ -382,14 +370,13 @@ class QubesArgumentParser(argparse.ArgumentParser):
# pylint: disable=arguments-differ,signature-differs
# hack for tests
app = kwargs.pop('app', None)
namespace = super(QubesArgumentParser, self).parse_args(*args, **kwargs)
namespace = super().parse_args(*args, **kwargs)
if self._want_app and not self._want_app_no_instance:
self.set_qubes_verbosity(namespace)
if app is not None:
namespace.app = app
else:
namespace.app = qubesadmin.Qubes()
self.set_qubes_verbosity(namespace)
if app is not None:
namespace.app = app
else:
namespace.app = qubesadmin.Qubes()
for action in self._actions:
# pylint: disable=protected-access
@ -485,8 +472,7 @@ class AliasedSubParsersAction(argparse._SubParsersAction):
dest = name
if aliases:
dest += ' (%s)' % ','.join(aliases)
super(AliasedSubParsersAction._AliasedPseudoAction, self).\
__init__(option_strings=[], dest=dest, help=help)
super().__init__(option_strings=[], dest=dest, help=help)
def __call__(self, parser, namespace, values, option_string=None):
pass
@ -498,8 +484,7 @@ class AliasedSubParsersAction(argparse._SubParsersAction):
else:
aliases = []
local_parser = super(AliasedSubParsersAction, self).add_parser(
name, **kwargs)
local_parser = super().add_parser(name, **kwargs)
# Make the aliases work.
for alias in aliases:
@ -545,7 +530,7 @@ class VmNameGroup(argparse._MutuallyExclusiveGroup):
def __init__(self, container, required, vm_action=VmNameAction, help=None):
# pylint: disable=redefined-builtin
super(VmNameGroup, self).__init__(container, required=required)
super().__init__(container, required=required)
if not help:
help = 'perform the action on all qubes'
self.add_argument('--all', action='store_true', dest='all_domains',

View File

@ -21,15 +21,21 @@
'''Console frontend for backup restore code'''
import getpass
import os
import sys
from qubesadmin.backup.restore import BackupRestore
from qubesadmin.backup.dispvm import RestoreInDisposableVM
import qubesadmin.exc
import qubesadmin.tools
import qubesadmin.utils
parser = qubesadmin.tools.QubesArgumentParser()
# WARNING:
# When adding options, update/verify also
# qubeadmin.restore.dispvm.RestoreInDisposableVM.arguments
#
parser.add_argument("--verify-only", action="store_true",
dest="verify_only", default=False,
help="Verify backup integrity without restoring any "
@ -84,6 +90,18 @@ parser.add_argument("-p", "--passphrase-file", action="store",
dest="pass_file", default=None,
help="Read passphrase from file, or use '-' to read from stdin")
parser.add_argument('--auto-close', action="store_true",
help="Auto-close restore window and display log on the stdout "
"(applies to --paranoid-mode)")
parser.add_argument("--location-is-service", action="store_true",
help="Interpret backup location as a qrexec service name,"
"possibly with an argument separated by +.Requires -d option.")
parser.add_argument('--paranoid-mode', '--plan-b', action="store_true",
help="Isolate restore process in a DispVM, defend against untrusted backup;"
"implies --skip-dom0-home")
parser.add_argument('backup_location', action='store',
help="Backup directory name, or command to pipe from")
@ -193,6 +211,18 @@ def handle_broken(app, args, restore_info):
"files should be copied or moved out of the new "
"directory before using them.")
def print_backup_log(backup_log):
"""Print a log on stdout, coloring it red if it's a terminal"""
if os.isatty(sys.stdout.fileno()):
sys.stdout.write('\033[0;31m')
sys.stdout.flush()
sys.stdout.buffer.write(backup_log)
if os.isatty(sys.stdout.fileno()):
sys.stdout.write('\033[0m')
sys.stdout.flush()
def main(args=None, app=None):
'''Main function of qvm-backup-restore'''
# pylint: disable=too-many-return-statements
@ -205,6 +235,29 @@ def main(args=None, app=None):
except KeyError:
parser.error('no such domain: {!r}'.format(args.appvm))
if args.location_is_service and not args.appvm:
parser.error('--location-is-service option requires -d')
if args.paranoid_mode:
args.dom0_home = False
args.app.log.info("Starting restore process in a DisposableVM...")
args.app.log.info("When operation completes, close its window "
"manually.")
restore_in_dispvm = RestoreInDisposableVM(args.app, args)
try:
backup_log = restore_in_dispvm.run()
if args.auto_close:
print_backup_log(backup_log)
except qubesadmin.exc.BackupRestoreError as e:
if e.backup_log is not None:
print_backup_log(e.backup_log)
parser.error_runtime(str(e))
return 1
except qubesadmin.exc.QubesException as e:
parser.error_runtime(str(e))
return 1
return
if args.pass_file is not None:
pass_f = open(args.pass_file) if args.pass_file != "-" else sys.stdin
passphrase = pass_f.readline().rstrip()
@ -218,7 +271,7 @@ def main(args=None, app=None):
try:
backup = BackupRestore(args.app, args.backup_location,
appvm, passphrase,
appvm, passphrase, location_is_service=args.location_is_service,
force_compression_filter=args.compression)
except qubesadmin.exc.QubesException as e:
parser.error_runtime(str(e))

View File

@ -163,7 +163,7 @@ class DeviceAction(qubesadmin.tools.QubesAction):
required=True, allow_unknown=False, **kwargs):
# pylint: disable=redefined-builtin
self.allow_unknown = allow_unknown
super(DeviceAction, self).__init__(help=help, required=required,
super().__init__(help=help, required=required,
**kwargs)
def __call__(self, parser, namespace, values, option_string=None):
@ -207,8 +207,7 @@ def get_parser(device_class=None):
"""Create :py:class:`argparse.ArgumentParser` suitable for
:program:`qvm-block`.
"""
parser = qubesadmin.tools.QubesArgumentParser(description=__doc__,
want_app=True)
parser = qubesadmin.tools.QubesArgumentParser(description=__doc__)
parser.register('action', 'parsers',
qubesadmin.tools.AliasedSubParsersAction)
parser.allow_abbrev = False

View File

@ -141,9 +141,7 @@ class PropertyColumn(Column):
def __init__(self, name):
ls_head = name.replace('_', '-').upper()
super(PropertyColumn, self).__init__(
head=ls_head,
attr=name)
super().__init__(head=ls_head, attr=name)
def __repr__(self):
return '{}(head={!r}'.format(
@ -201,9 +199,7 @@ class FlagsColumn(Column):
# pylint: disable=no-self-use
def __init__(self):
super(FlagsColumn, self).__init__(
head='FLAGS',
doc=self.__class__.__doc__)
super().__init__(head='FLAGS', doc=self.__class__.__doc__)
@flag(1)
@ -505,7 +501,7 @@ class _HelpColumnsAction(argparse.Action):
dest=argparse.SUPPRESS,
default=argparse.SUPPRESS,
help='list all available columns with short descriptions and exit'):
super(_HelpColumnsAction, self).__init__(
super().__init__(
option_strings=option_strings,
dest=dest,
default=default,
@ -536,7 +532,7 @@ class _HelpFormatsAction(argparse.Action):
dest=argparse.SUPPRESS,
default=argparse.SUPPRESS,
help='list all available formats with their definitions and exit'):
super(_HelpFormatsAction, self).__init__(
super().__init__(
option_strings=option_strings,
dest=dest,
default=default,

View File

@ -156,8 +156,7 @@ def get_parser():
''' Creates :py:class:`argparse.ArgumentParser` suitable for
:program:`qvm-pool`.
'''
parser = qubesadmin.tools.QubesArgumentParser(description=__doc__,
want_app=True)
parser = qubesadmin.tools.QubesArgumentParser(description=__doc__)
parser.register('action', 'parsers',
qubesadmin.tools.AliasedSubParsersAction)

View File

@ -36,11 +36,11 @@ class _Info(qubesadmin.tools.PoolsAction):
def __init__(self, option_strings, help='print pool info and exit',
**kwargs):
# pylint: disable=redefined-builtin
super(_Info, self).__init__(option_strings, help=help, **kwargs)
super().__init__(option_strings, help=help, **kwargs)
def __call__(self, parser, namespace, values, option_string=None):
setattr(namespace, 'command', 'info')
super(_Info, self).__call__(parser, namespace, values, option_string)
super().__call__(parser, namespace, values, option_string)
def pool_info(pool):
@ -62,11 +62,11 @@ class _Remove(argparse.Action):
''' Action for argument parser that removes a pool '''
def __init__(self, option_strings, dest=None, default=None, metavar=None):
super(_Remove, self).__init__(option_strings=option_strings,
dest=dest,
metavar=metavar,
default=default,
help='remove pool')
super().__init__(option_strings=option_strings,
dest=dest,
metavar=metavar,
default=default,
help='remove pool')
def __call__(self, parser, namespace, name, option_string=None):
setattr(namespace, 'command', 'remove')
@ -77,12 +77,12 @@ class _Add(argparse.Action):
''' Action for argument parser that adds a pool. '''
def __init__(self, option_strings, dest=None, default=None, metavar=None):
super(_Add, self).__init__(option_strings=option_strings,
dest=dest,
metavar=metavar,
default=default,
nargs=2,
help='add pool')
super().__init__(option_strings=option_strings,
dest=dest,
metavar=metavar,
default=default,
nargs=2,
help='add pool')
def __call__(self, parser, namespace, values, option_string=None):
name, driver = values
@ -95,23 +95,23 @@ class _Set(qubesadmin.tools.PoolsAction):
''' Action for argument parser that sets pool options. '''
def __init__(self, option_strings, dest=None, default=None, metavar=None):
super(_Set, self).__init__(option_strings=option_strings,
dest=dest,
metavar=metavar,
default=default,
help='modify pool (use -o to specify '
'modifications)')
super().__init__(option_strings=option_strings,
dest=dest,
metavar=metavar,
default=default,
help='modify pool (use -o to specify '
'modifications)')
def __call__(self, parser, namespace, name, option_string=None):
setattr(namespace, 'command', 'set')
super(_Set, self).__call__(parser, namespace, name, option_string)
super().__call__(parser, namespace, name, option_string)
class _Options(argparse.Action):
''' Action for argument parser that parsers options. '''
def __init__(self, option_strings, dest, default, metavar='options'):
super(_Options, self).__init__(
super().__init__(
option_strings=option_strings,
dest=dest,
metavar=metavar,

View File

@ -28,7 +28,6 @@ from qubesadmin.tools import QubesArgumentParser
import qubesadmin.utils
parser = QubesArgumentParser(description=__doc__,
want_app=True,
vmname_nargs='+')
parser.add_argument("--force", "-f", action="store_true", dest="no_confirm",
default=False, help="Do not prompt for confirmation")

View File

@ -45,7 +45,7 @@ parser.add_argument('--autostart', '--auto', '-a',
parser.add_argument('--no-autostart', '--no-auto', '-n',
action='store_false', dest='autostart',
help='do not autostart qube')
help='do not autostart/unpause qube')
parser.add_argument('--pass-io', '-p',
action='store_true', dest='passio', default=False,
@ -270,9 +270,27 @@ def main(args=None, app=None):
if not args.autostart and not vm.is_running():
if verbose > 0:
print_no_color('Qube \'{}\' not started'.format(vm.name),
file=sys.stderr, color=args.color_stderr)
file=sys.stderr, color=args.color_stderr)
retcode = max(retcode, 1)
continue
if vm.is_paused():
if not args.autostart:
if verbose > 0:
print_no_color(
'Qube \'{}\' is paused'.format(vm.name),
file=sys.stderr, color=args.color_stderr)
retcode = max(retcode, 1)
continue
try:
vm.unpause()
except qubesadmin.exc.QubesException:
if verbose > 0:
print_no_color(
'Qube \'{}\' cannot be unpaused'.format(
vm.name),
file=sys.stderr, color=args.color_stderr)
retcode = max(retcode, 1)
continue
try:
if verbose > 0:
print_no_color(

View File

@ -42,7 +42,7 @@ class DriveAction(argparse.Action):
metavar='IMAGE',
required=False,
help='Attach drive'):
super(DriveAction, self).__init__(option_strings, dest,
super().__init__(option_strings, dest,
metavar=metavar, help=help)
self.prefix = prefix

View File

@ -1,4 +1,4 @@
# -*- encoding: utf8 -*-
# -*- encoding: utf-8 -*-
#
# The Qubes OS Project, http://www.qubes-os.org
#
@ -32,23 +32,120 @@ import xcffib.xproto # pylint: disable=unused-import
import daemon.pidfile
import qubesadmin
import qubesadmin.events
import qubesadmin.exc
import qubesadmin.tools
import qubesadmin.vm
have_events = False
try:
# pylint: disable=wrong-import-position
import qubesadmin.events
have_events = True
except ImportError:
pass
from . import xcffibhelpers
GUI_DAEMON_PATH = '/usr/bin/qubes-guid'
PACAT_DAEMON_PATH = '/usr/bin/pacat-simple-vchan'
QUBES_ICON_DIR = '/usr/share/icons/hicolor/128x128/devices'
GUI_DAEMON_OPTIONS = [
('allow_fullscreen', 'bool'),
('override_redirect_protection', 'bool'),
('allow_utf8_titles', 'bool'),
('secure_copy_sequence', 'str'),
('secure_paste_sequence', 'str'),
('windows_count_limit', 'int'),
('trayicon_mode', 'str'),
('startup_timeout', 'int'),
]
def retrieve_gui_daemon_options(vm, guivm):
'''
Construct a list of GUI daemon options based on VM features.
This checks 'gui-*' features on the VM, and if they're absent,
'gui-default-*' features on the GuiVM.
'''
options = {}
for name, kind in GUI_DAEMON_OPTIONS:
feature_value = vm.features.get(
'gui-' + name.replace('_', '-'), None)
if feature_value is None:
feature_value = guivm.features.get(
'gui-default-' + name.replace('_', '-'), None)
if feature_value is None:
continue
if kind == 'bool':
value = bool(feature_value)
elif kind == 'int':
value = int(feature_value)
elif kind == 'str':
value = feature_value
else:
assert False, kind
options[name] = value
return options
def serialize_gui_daemon_options(options):
'''
Prepare configuration file content for GUI daemon. Currently uses libconfig
format.
'''
lines = [
'# Auto-generated file, do not edit!',
'',
'global: {',
]
for name, kind in GUI_DAEMON_OPTIONS:
if name in options:
value = options[name]
if kind == 'bool':
serialized = 'true' if value else 'false'
elif kind == 'int':
serialized = str(value)
elif kind == 'str':
serialized = escape_config_string(value)
else:
assert False, kind
lines.append(' {} = {};'.format(name, serialized))
lines.append('}')
lines.append('')
return '\n'.join(lines)
NON_ASCII_RE = re.compile(r'[^\x00-\x7F]')
UNPRINTABLE_CHARACTER_RE = re.compile(r'[\x00-\x1F\x7F]')
def escape_config_string(value):
'''
Convert a string to libconfig format.
Format specification:
http://www.hyperrealm.com/libconfig/libconfig_manual.html#String-Values
See dump_string() for python-libconf:
https://github.com/Grk0/python-libconf/blob/master/libconf.py
'''
assert not NON_ASCII_RE.match(value),\
'expected an ASCII string: {!r}'.format(value)
value = (
value.replace('\\', '\\\\')
.replace('"', '\\"')
.replace('\f', r'\f')
.replace('\n', r'\n')
.replace('\r', r'\r')
.replace('\t', r'\t')
)
value = UNPRINTABLE_CHARACTER_RE.sub(
lambda m: r'\x{:02x}'.format(ord(m.group(0))),
value)
return '"' + value + '"'
# "LVDS connected 1024x768+0+0 (normal left inverted right) 304mm x 228mm"
REGEX_OUTPUT = re.compile(r"""
(?x) # ignore whitespace
@ -73,6 +170,106 @@ REGEX_OUTPUT = re.compile(r"""
""")
class KeyboardLayout:
"""Class to store and parse X Keyboard layout data"""
# pylint: disable=too-few-public-methods
def __init__(self, binary_string):
split_string = binary_string.split(b'\0')
self.languages = split_string[2].decode().split(',')
self.variants = split_string[3].decode().split(',')
self.options = split_string[4].decode()
def get_property(self, layout_num):
"""Return the selected keyboard layout as formatted for keyboard_layout
property."""
return '+'.join([self.languages[layout_num],
self.variants[layout_num],
self.options])
class XWatcher:
"""Watch and react for X events related to the keyboard layout changes."""
def __init__(self, conn, app):
self.app = app
self.current_vm = self.app.domains[self.app.local_name]
self.conn = conn
self.ext = self.initialize_extension()
# get root window
self.setup = self.conn.get_setup()
self.root = self.setup.roots[0].root
# atoms (strings) of events we need to watch
# keyboard layout was switched
self.atom_xklavier = self.conn.core.InternAtom(
False, len("XKLAVIER_ALLOW_SECONDARY"),
"XKLAVIER_ALLOW_SECONDARY").reply().atom
# keyboard layout was changed
self.atom_xkb_rules = self.conn.core.InternAtom(
False, len("_XKB_RULES_NAMES"),
"_XKB_RULES_NAMES").reply().atom
self.conn.core.ChangeWindowAttributesChecked(
self.root, xcffib.xproto.CW.EventMask,
[xcffib.xproto.EventMask.PropertyChange])
self.conn.flush()
# initialize state
self.keyboard_layout = KeyboardLayout(self.get_keyboard_layout())
self.selected_layout = self.get_selected_layout()
def initialize_extension(self):
"""Initialize XKB extension (not supported by xcffib by default"""
ext = self.conn(xcffibhelpers.key)
ext.UseExtension()
return ext
def get_keyboard_layout(self):
"""Check what is current keyboard layout definition"""
property_cookie = self.conn.core.GetProperty(
False, # delete
self.root, # window
self.atom_xkb_rules,
xcffib.xproto.Atom.STRING,
0, 1000
)
prop_reply = property_cookie.reply()
return prop_reply.value.buf()
def get_selected_layout(self):
"""Check which keyboard layout is currently selected"""
state_reply = self.ext.GetState().reply()
return state_reply.lockedGroup[0]
def update_keyboard_layout(self):
"""Update current vm's keyboard_layout property"""
new_property = self.keyboard_layout.get_property(
self.selected_layout)
current_property = self.current_vm.keyboard_layout
if new_property != current_property:
self.current_vm.keyboard_layout = new_property
def event_reader(self, callback):
"""Poll for X events related to keyboard layout"""
try:
for event in iter(self.conn.poll_for_event, None):
if isinstance(event, xcffib.xproto.PropertyNotifyEvent):
if event.atom == self.atom_xklavier:
self.selected_layout = self.get_selected_layout()
elif event.atom == self.atom_xkb_rules:
self.keyboard_layout = KeyboardLayout(
self.get_keyboard_layout())
else:
continue
self.update_keyboard_layout()
except xcffib.ConnectionException:
callback()
def get_monitor_layout():
"""Get list of monitors and their size/position"""
outputs = []
@ -114,44 +311,22 @@ def get_monitor_layout():
return outputs
def set_keyboard_layout(vm):
"""Set layout configuration into features for Gui admin extension"""
try:
# Examples of 'xprop -root _XKB_RULES_NAMES' output values:
# "evdev", "pc105", "fr", "oss", ""
# "evdev", "pc105", "pl,us", ",", "grp:win_switch,compose:caps"
# We use the first layout provided
xkb_re = r'_XKB_RULES_NAMES\(STRING\) = ' \
r'\"(.*)\", \"(.*)\", \"(.*)\", \"(.*)\", \"(.*)\"\n'
xkb_rules_names = subprocess.check_output(
['xprop', '-root', '_XKB_RULES_NAMES']).decode()
xkb_parsed = re.match(xkb_re, xkb_rules_names)
if xkb_parsed:
xkb_layout = [x.split(',')[0] for x in xkb_parsed.groups()[2:4]]
# We keep all options
xkb_layout.append(xkb_parsed.group(5))
keyboard_layout = '+'.join(xkb_layout)
vm.features['keyboard-layout'] = keyboard_layout
else:
vm.log.warning('Failed to parse layout for %s', vm)
except subprocess.CalledProcessError as e:
vm.log.warning('Failed to set layout for %s: %s', vm, str(e))
class DAEMONLauncher:
"""Launch GUI/AUDIO daemon for VMs"""
def __init__(self, app: qubesadmin.app.QubesBase):
def __init__(self, app: qubesadmin.app.QubesBase, vm_names=None, kde=False):
""" Initialize DAEMONLauncher.
:param app: :py:class:`qubesadmin.Qubes` instance
:param vm_names: VM names to watch for, or None if watching for all
:param kde: add KDE-specific arguments for guid
"""
self.app = app
self.started_processes = {}
self.vm_names = vm_names
self.kde = kde
@asyncio.coroutine
def send_monitor_layout(self, vm, layout=None, startup=False):
async def send_monitor_layout(self, vm, layout=None, startup=False):
"""Send monitor layout to a given VM
This function is a coroutine.
@ -186,7 +361,7 @@ class DAEMONLauncher:
pass
try:
yield from asyncio.get_event_loop(). \
await asyncio.get_event_loop(). \
run_in_executor(None,
functools.partial(
vm.run_service_for_stdio,
@ -215,33 +390,28 @@ class DAEMONLauncher:
"""Return KDE-specific arguments for gui-daemon, if applicable"""
guid_cmd = []
# Avoid using environment variables for checking the current session,
# because this script may be called with cleared env (like with sudo).
if subprocess.check_output(
['xprop', '-root', '-notype', 'KWIN_RUNNING']) == \
b'KWIN_RUNNING = 0x1\n':
# native decoration plugins is used, so adjust window properties
# accordingly
guid_cmd += ['-T'] # prefix window titles with VM name
# get owner of X11 session
session_owner = None
for line in subprocess.check_output(['xhost']).splitlines():
if line == b'SI:localuser:root':
pass
elif line.startswith(b'SI:localuser:'):
session_owner = line.split(b':')[2].decode()
if session_owner is not None:
data_dir = os.path.expanduser(
'~{}/.local/share'.format(session_owner))
else:
# fallback to current user
data_dir = os.path.expanduser('~/.local/share')
# native decoration plugins is used, so adjust window properties
# accordingly
guid_cmd += ['-T'] # prefix window titles with VM name
# get owner of X11 session
session_owner = None
for line in subprocess.check_output(['xhost']).splitlines():
if line == b'SI:localuser:root':
pass
elif line.startswith(b'SI:localuser:'):
session_owner = line.split(b':')[2].decode()
if session_owner is not None:
data_dir = os.path.expanduser(
'~{}/.local/share'.format(session_owner))
else:
# fallback to current user
data_dir = os.path.expanduser('~/.local/share')
guid_cmd += ['-p',
'_KDE_NET_WM_COLOR_SCHEME=s:{}'.format(
os.path.join(data_dir,
'qubes-kde',
vm.label.name + '.colors'))]
guid_cmd += ['-p',
'_KDE_NET_WM_COLOR_SCHEME=s:{}'.format(
os.path.join(data_dir,
'qubes-kde',
vm.label.name + '.colors'))]
return guid_cmd
def common_guid_args(self, vm):
@ -262,14 +432,30 @@ class DAEMONLauncher:
if vm.features.check_with_template('rpc-clipboard', False):
guid_cmd.extend(['-Q'])
guid_cmd += self.kde_guid_args(vm)
guivm = self.app.domains[vm.guivm]
options = retrieve_gui_daemon_options(vm, guivm)
config = serialize_gui_daemon_options(options)
config_path = self.guid_config_file(vm.xid)
self.write_guid_config(config_path, config)
guid_cmd.extend(['-C', config_path])
return guid_cmd
@staticmethod
def write_guid_config(config_path, config):
"""Write guid configuration to a file"""
with open(config_path, 'w') as config_file:
config_file.write(config)
@staticmethod
def guid_pidfile(xid):
"""Helper function to construct a GUI pidfile path"""
return '/var/run/qubes/guid-running.{}'.format(xid)
@staticmethod
def guid_config_file(xid):
"""Helper function to construct a GUI configuration file path"""
return '/var/run/qubes/guid-conf.{}'.format(xid)
@staticmethod
def pacat_pidfile(xid):
"""Helper function to construct an AUDIO pidfile path"""
@ -284,8 +470,7 @@ class DAEMONLauncher:
else vm.xid
return xid
@asyncio.coroutine
def start_gui_for_vm(self, vm, monitor_layout=None):
async def start_gui_for_vm(self, vm, monitor_layout=None):
"""Start GUI daemon (qubes-guid) connected directly to a VM
This function is a coroutine.
@ -295,6 +480,8 @@ class DAEMONLauncher:
local X server.
"""
guid_cmd = self.common_guid_args(vm)
if self.kde:
guid_cmd.extend(self.kde_guid_args(vm))
guid_cmd.extend(['-d', str(vm.xid)])
if vm.virt_mode == 'hvm':
@ -309,13 +496,12 @@ class DAEMONLauncher:
vm.log.info('Starting GUI')
yield from asyncio.create_subprocess_exec(*guid_cmd)
await asyncio.create_subprocess_exec(*guid_cmd)
yield from self.send_monitor_layout(vm, layout=monitor_layout,
await self.send_monitor_layout(vm, layout=monitor_layout,
startup=True)
@asyncio.coroutine
def start_gui_for_stubdomain(self, vm, force=False):
async def start_gui_for_stubdomain(self, vm, force=False):
"""Start GUI daemon (qubes-guid) connected to a stubdomain
This function is a coroutine.
@ -339,10 +525,9 @@ class DAEMONLauncher:
guid_cmd = self.common_guid_args(vm)
guid_cmd.extend(['-d', str(vm.stubdom_xid), '-t', str(vm.xid)])
yield from asyncio.create_subprocess_exec(*guid_cmd)
await asyncio.create_subprocess_exec(*guid_cmd)
@asyncio.coroutine
def start_audio_for_vm(self, vm):
async def start_audio_for_vm(self, vm):
"""Start AUDIO daemon (pacat-simple-vchan) connected directly to a VM
This function is a coroutine.
@ -353,10 +538,9 @@ class DAEMONLauncher:
pacat_cmd = [PACAT_DAEMON_PATH, '-l', self.pacat_domid(vm), vm.name]
vm.log.info('Starting AUDIO')
yield from asyncio.create_subprocess_exec(*pacat_cmd)
await asyncio.create_subprocess_exec(*pacat_cmd)
@asyncio.coroutine
def start_gui(self, vm, force_stubdom=False, monitor_layout=None):
async def start_gui(self, vm, force_stubdom=False, monitor_layout=None):
"""Start GUI daemon regardless of start event.
This function is a coroutine.
@ -372,16 +556,15 @@ class DAEMONLauncher:
return
if vm.virt_mode == 'hvm':
yield from self.start_gui_for_stubdomain(vm, force=force_stubdom)
await self.start_gui_for_stubdomain(vm, force=force_stubdom)
if not vm.features.check_with_template('gui', True):
return
if not os.path.exists(self.guid_pidfile(vm.xid)):
yield from self.start_gui_for_vm(vm, monitor_layout=monitor_layout)
await self.start_gui_for_vm(vm, monitor_layout=monitor_layout)
@asyncio.coroutine
def start_audio(self, vm):
async def start_audio(self, vm):
"""Start AUDIO daemon regardless of start event.
This function is a coroutine.
@ -398,10 +581,14 @@ class DAEMONLauncher:
xid = self.pacat_domid(vm)
if not os.path.exists(self.pacat_pidfile(xid)):
yield from self.start_audio_for_vm(vm)
await self.start_audio_for_vm(vm)
def on_domain_spawn(self, vm, _event, **kwargs):
"""Handler of 'domain-spawn' event, starts GUI daemon for stubdomain"""
if not self.is_watched(vm):
return
try:
if getattr(vm, 'guivm', None) != vm.app.local_name:
return
@ -416,6 +603,10 @@ class DAEMONLauncher:
def on_domain_start(self, vm, _event, **kwargs):
"""Handler of 'domain-start' event, starts GUI/AUDIO daemon for
actual VM """
if not self.is_watched(vm):
return
try:
if getattr(vm, 'guivm', None) == vm.app.local_name and \
vm.features.check_with_template('gui', True) and \
@ -442,6 +633,9 @@ class DAEMONLauncher:
if vm.klass == 'AdminVM':
continue
if not self.is_watched(vm):
continue
power_state = vm.get_power_state()
if power_state == 'Running':
asyncio.ensure_future(
@ -454,22 +648,42 @@ class DAEMONLauncher:
asyncio.ensure_future(
self.start_gui_for_stubdomain(vm))
def on_domain_stopped(self, vm, _event, **_kwargs):
"""Handler of 'domain-stopped' event, cleans up"""
if not self.is_watched(vm):
return
self.cleanup_guid(vm.xid)
if vm.virt_mode == 'hvm':
self.cleanup_guid(vm.stubdom_xid)
def cleanup_guid(self, xid):
"""
Clean up after qubes-guid. Removes the auto-generated configuration
file, if any.
"""
config_path = self.guid_config_file(xid)
if os.path.exists(config_path):
os.unlink(config_path)
def register_events(self, events):
"""Register domain startup events in app.events dispatcher"""
events.add_handler('domain-spawn', self.on_domain_spawn)
events.add_handler('domain-start', self.on_domain_start)
events.add_handler('connection-established',
self.on_connection_established)
events.add_handler('domain-stopped', self.on_domain_stopped)
def is_watched(self, vm):
"""
Should we watch this VM for changes
"""
def x_reader(conn, callback):
"""Try reading something from X connection to check if it's still alive.
In case it isn't, call *callback*.
"""
try:
conn.poll_for_event()
except xcffib.ConnectionException:
callback()
if self.vm_names is None:
return True
return vm.name in self.vm_names
if 'XDG_RUNTIME_DIR' in os.environ:
@ -482,8 +696,7 @@ else:
parser = qubesadmin.tools.QubesArgumentParser(
description='start GUI for qube(s)', vmname_nargs='*')
parser.add_argument('--watch', action='store_true',
help='Keep watching for further domains'
' startups, must be used with --all')
help='Keep watching for further domain startups')
parser.add_argument('--force-stubdomain', action='store_true',
help='Start GUI to stubdomain-emulated VGA,'
' even if gui-agent is running in the VM')
@ -492,9 +705,8 @@ parser.add_argument('--pidfile', action='store', default=pidfile_path,
parser.add_argument('--notify-monitor-layout', action='store_true',
help='Notify running instance in --watch mode'
' about changed monitor layout')
parser.add_argument('--set-keyboard-layout', action='store_true',
help='Set keyboard layout values into GuiVM features.'
'This option is implied by --watch')
parser.add_argument('--kde', action='store_true',
help='Set KDE specific arguments to gui-daemon.')
# Add it for the help only
parser.add_argument('--force', action='store_true', default=False,
help='Force running daemon without enabled services'
@ -511,19 +723,19 @@ def main(args=None):
print(parser.format_help())
return
args = parser.parse_args(args)
if args.watch and not args.all_domains:
parser.error('--watch option must be used with --all')
if args.watch and args.notify_monitor_layout:
parser.error('--watch cannot be used with --notify-monitor-layout')
if args.watch and 'guivm-gui-agent' in enabled_services:
args.set_keyboard_layout = True
if args.set_keyboard_layout or os.path.exists('/etc/qubes-release'):
guivm = args.app.domains.get_blind(args.app.local_name)
set_keyboard_layout(guivm)
launcher = DAEMONLauncher(args.app)
if args.all_domains:
vm_names = None
else:
vm_names = [vm.name for vm in args.domains]
launcher = DAEMONLauncher(
args.app,
vm_names=vm_names,
kde=args.kde)
if args.watch:
if not have_events:
parser.error('--watch option require Python >= 3.5')
with daemon.pidfile.TimeoutPIDLockFile(args.pidfile):
loop = asyncio.get_event_loop()
# pylint: disable=no-member
@ -541,8 +753,10 @@ def main(args=None):
launcher.send_monitor_layout_all)
conn = xcffib.connect()
x_watcher = XWatcher(conn, args.app)
x_fd = conn.get_file_descriptor()
loop.add_reader(x_fd, x_reader, conn, events_listener.cancel)
loop.add_reader(x_fd, x_watcher.event_reader,
events_listener.cancel)
try:
loop.run_until_complete(events_listener)

View File

@ -323,8 +323,7 @@ def get_parser():
'''Create :py:class:`argparse.ArgumentParser` suitable for
:program:`qvm-volume`.
'''
parser = qubesadmin.tools.QubesArgumentParser(description=__doc__,
want_app=True)
parser = qubesadmin.tools.QubesArgumentParser(description=__doc__)
parser.register('action', 'parsers',
qubesadmin.tools.AliasedSubParsersAction)
sub_parsers = parser.add_subparsers(

View File

@ -0,0 +1,128 @@
# -*- encoding: utf8 -*-
#
# The Qubes OS Project, http://www.qubes-os.org
#
# Copyright (C) 2020 Marek Marczykowski-Górecki
# <marmarek@invisiblethingslab.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation; either version 2.1 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with this program; if not, see <http://www.gnu.org/licenses/>.
"""
This is a set of helper classes, designed to facilitate importing an X extension
that's not supported by default by xcffib.
"""
import io
import struct
import xcffib
class XkbUseExtensionReply(xcffib.Reply):
"""Helper class to parse XkbUseExtensionReply
Contains hardcoded values based on X11/XKBproto.h"""
# pylint: disable=too-few-public-methods
def __init__(self, unpacker):
if isinstance(unpacker, xcffib.Protobj):
unpacker = xcffib.MemoryUnpacker(unpacker.pack())
xcffib.Reply.__init__(self, unpacker)
base = unpacker.offset
self.major_version, self.minor_version = unpacker.unpack(
"xx2x4xHH4x4x4x4x")
self.bufsize = unpacker.offset - base
class XkbUseExtensionCookie(xcffib.Cookie):
"""Helper class for use in loading Xkb extension"""
reply_type = XkbUseExtensionReply
class XkbGetStateReply(xcffib.Reply):
"""Helper class to parse XkbGetState; copy&paste from X11/XKBproto.h"""
# pylint: disable=too-few-public-methods
_typedef = """
BYTE type;
BYTE deviceID;
CARD16 sequenceNumber B16;
CARD32 length B32;
CARD8 mods;
CARD8 baseMods;
CARD8 latchedMods;
CARD8 lockedMods;
CARD8 group;
CARD8 lockedGroup;
INT16 baseGroup B16;
INT16 latchedGroup B16;
CARD8 compatState;
CARD8 grabMods;
CARD8 compatGrabMods;
CARD8 lookupMods;
CARD8 compatLookupMods;
CARD8 pad1;
CARD16 ptrBtnState B16;
CARD16 pad2 B16;
CARD32 pad3 B32;"""
_type_mapping = {
"BYTE": "B",
"CARD16": "H",
"CARD8": "B",
"CARD32": "I",
"INT16": "h",
}
def __init__(self, unpacker):
if isinstance(unpacker, xcffib.Protobj):
unpacker = xcffib.MemoryUnpacker(unpacker.pack())
xcffib.Reply.__init__(self, unpacker)
base = unpacker.offset
# dynamic parse of copy&pasted struct content, for easy re-usability
for line in self._typedef.splitlines():
line = line.strip()
line = line.rstrip(';')
if not line:
continue
typename, name = line.split()[:2] # ignore optional third part
setattr(self, name, unpacker.unpack(self._type_mapping[typename]))
self.bufsize = unpacker.offset - base
class XkbGetStateCookie(xcffib.Cookie):
"""Helper class for use in parsing Xkb GetState"""
reply_type = XkbGetStateReply
class XkbExtension(xcffib.Extension):
"""Helper class to load and use Xkb xcffib extension; needed
because there is not XKB support in xcffib."""
# pylint: disable=invalid-name,missing-function-docstring
def UseExtension(self, is_checked=True):
buf = io.BytesIO()
buf.write(struct.pack("=xx2xHH", 1, 0))
return self.send_request(0, buf, XkbGetStateCookie,
is_checked=is_checked)
def GetState(self, deviceSpec=0x100, is_checked=True):
buf = io.BytesIO()
buf.write(struct.pack("=xx2xHxx", deviceSpec))
return self.send_request(4, buf, XkbGetStateCookie,
is_checked=is_checked)
key = xcffib.ExtensionKey("XKEYBOARD")
# this is a lie: there are events and errors types
_events = {}
_errors = {}
# pylint: disable=protected-access
xcffib._add_ext(key, XkbExtension, _events, _errors)

View File

@ -23,6 +23,8 @@
#
"""Various utility functions."""
import fcntl
import os
import re
@ -142,8 +144,13 @@ def vm_dependencies(app, reference_vm):
if vm == reference_vm:
continue
for prop in vm_properties:
if reference_vm == getattr(vm, prop, None) and \
not vm.property_is_default(prop):
if not hasattr(vm, prop):
continue
try:
is_prop_default = vm.property_is_default(prop)
except qubesadmin.exc.QubesPropertyAccessError:
is_prop_default = False
if reference_vm == getattr(vm, prop, None) and not is_prop_default:
result.append((vm, prop))
return result
@ -161,6 +168,32 @@ def encode_for_vmexec(args):
parts = []
for arg in args:
part = re.sub(br'[^a-zA-Z0-9_.+]', encode, arg.encode('utf-8'))
part = re.sub(br'[^a-zA-Z0-9_.]', encode, arg.encode('utf-8'))
parts.append(part)
return b'+'.join(parts).decode('ascii')
class LockFile(object):
"""Simple locking context manager. It opens a file with an advisory lock
taken (fcntl.lockf)"""
def __init__(self, path, nonblock=False):
"""Open the file. Call *acquire* or enter the context to lock
the file"""
self.file = open(path, "w")
self.nonblock = nonblock
def __enter__(self, *args, **kwargs):
self.acquire()
return self
def acquire(self):
"""Lock the opened file"""
fcntl.lockf(self.file,
fcntl.LOCK_EX | (fcntl.LOCK_NB if self.nonblock else 0))
def __exit__(self, exc_type=None, exc_value=None, traceback=None):
self.release()
def release(self):
"""Unlock the file and close the file object"""
fcntl.lockf(self.file, fcntl.LOCK_UN)
self.file.close()

View File

@ -53,7 +53,7 @@ class QubesVM(qubesadmin.base.PropertyHolder):
firewall = None
def __init__(self, app, name, klass=None, power_state=None):
super(QubesVM, self).__init__(app, 'admin.vm.property.', name)
super().__init__(app, 'admin.vm.property.', name)
self._volumes = None
self._klass = klass
self._power_state_cache = power_state
@ -373,7 +373,7 @@ class QubesVM(qubesadmin.base.PropertyHolder):
# use cached value if available
if self._klass is None:
# pylint: disable=no-member
self._klass = super(QubesVM, self).klass
self._klass = super().klass
return self._klass
class DispVMWrapper(QubesVM):
@ -398,7 +398,7 @@ class DispVMWrapper(QubesVM):
# Service call may wait for session start, give it more time
# than default 5s
kwargs['connect_timeout'] = self.qrexec_timeout
return super(DispVMWrapper, self).run_service(service, **kwargs)
return super().run_service(service, **kwargs)
def cleanup(self):
'''Cleanup after DispVM usage'''

View File

@ -11,11 +11,16 @@ BuildRequires: python%{python3_pkgversion}-setuptools
BuildRequires: python%{python3_pkgversion}-devel
BuildRequires: python%{python3_pkgversion}-sphinx
BuildRequires: python%{python3_pkgversion}-dbus
BuildRequires: python%{python3_pkgversion}-lxml
BuildRequires: python%{python3_pkgversion}-xcffib
Requires: python%{python3_pkgversion}-qubesadmin
Requires: python%{python3_pkgversion}-yaml
Requires: scrypt
BuildArch: noarch
Source0: %{name}-%{version}.tar.gz
Conflicts: qubes-core-agent < 4.1.9
# qubes-guid -C option
Conflicts: qubes-gui-daemon < 4.1.7
%description
This package include managemt tools, like qvm-*.
@ -53,6 +58,7 @@ make -C doc DESTDIR=$RPM_BUILD_ROOT \
%defattr(-,root,root,-)
%doc LICENSE
%config /etc/xdg/autostart/qvm-start-daemon.desktop
%config /etc/xdg/autostart/qvm-start-daemon-kde.desktop
%{_bindir}/qubes-*
%{_bindir}/qvm-*
%{_mandir}/man1/qvm-*.1*

21
scripts/qubes-guivm-session Executable file
View File

@ -0,0 +1,21 @@
#!/bin/bash -e
print_usage() {
cat >&2 <<USAGE
Usage: $0 vmname
Starts given VM and runs its associated GUI daemon. Used as X session for the
GUI domain.
USAGE
}
if [ $# -lt 1 ] ; then
print_usage
exit 1
fi
# Start VM, gui-daemon and audio
qvm-start --skip-if-running "$1"
qvm-start-daemon --watch "$1" &
# Run the inner session (Xephyr) and wait until it exits
exec qvm-run -p --no-gui --service "$1" qubes.GuiVMSession

View File

@ -17,9 +17,11 @@ def get_console_scripts():
if sys.version_info[0:2] >= (3, 4):
for filename in os.listdir('./qubesadmin/tools'):
basename, ext = os.path.splitext(os.path.basename(filename))
if basename in ['__init__', 'dochelpers'] or ext != '.py':
if basename in ['__init__', 'dochelpers', 'xcffibhelpers']\
or ext != '.py':
continue
yield basename.replace('_', '-'), 'qubesadmin.tools.{}'.format(basename)
yield basename.replace('_', '-'), 'qubesadmin.tools.{}'.format(
basename)
# create simple scripts that run much faster than "console entry points"
class CustomInstall(setuptools.command.install.install):

View File

@ -1 +1 @@
4.1.6
4.1.9