Merge remote-tracking branch 'origin/master' into core3-devel-mm

This commit is contained in:
Marek Marczykowski-Górecki 2016-04-11 13:03:12 +02:00
commit 8c6fe7ed90
No known key found for this signature in database
GPG Key ID: 063938BA42CFA724
25 changed files with 1177 additions and 175 deletions

View File

@ -74,8 +74,10 @@ endif
cp qubes-rpc-policy/qubes.NotifyUpdates.policy $(DESTDIR)/etc/qubes-rpc/policy/qubes.NotifyUpdates cp qubes-rpc-policy/qubes.NotifyUpdates.policy $(DESTDIR)/etc/qubes-rpc/policy/qubes.NotifyUpdates
cp qubes-rpc-policy/qubes.NotifyTools.policy $(DESTDIR)/etc/qubes-rpc/policy/qubes.NotifyTools cp qubes-rpc-policy/qubes.NotifyTools.policy $(DESTDIR)/etc/qubes-rpc/policy/qubes.NotifyTools
cp qubes-rpc-policy/qubes.GetImageRGBA.policy $(DESTDIR)/etc/qubes-rpc/policy/qubes.GetImageRGBA cp qubes-rpc-policy/qubes.GetImageRGBA.policy $(DESTDIR)/etc/qubes-rpc/policy/qubes.GetImageRGBA
cp qubes-rpc-policy/qubes.GetRandomizedTime.policy $(DESTDIR)/etc/qubes-rpc/policy/qubes.GetRandomizedTime
cp qubes-rpc/qubes.NotifyUpdates $(DESTDIR)/etc/qubes-rpc/ cp qubes-rpc/qubes.NotifyUpdates $(DESTDIR)/etc/qubes-rpc/
cp qubes-rpc/qubes.NotifyTools $(DESTDIR)/etc/qubes-rpc/ cp qubes-rpc/qubes.NotifyTools $(DESTDIR)/etc/qubes-rpc/
cp qubes-rpc/qubes.GetRandomizedTime $(DESTDIR)/etc/qubes-rpc/
cp qubes-rpc/qubes-notify-updates $(DESTDIR)/usr/libexec/qubes/ cp qubes-rpc/qubes-notify-updates $(DESTDIR)/usr/libexec/qubes/
cp qubes-rpc/qubes-notify-tools $(DESTDIR)/usr/libexec/qubes/ cp qubes-rpc/qubes-notify-tools $(DESTDIR)/usr/libexec/qubes/

View File

@ -591,9 +591,18 @@ class QubesVm(object):
if self.installed_by_rpm: if self.installed_by_rpm:
raise QubesException("Cannot rename VM installed by RPM -- first clone VM and then use yum to remove package.") raise QubesException("Cannot rename VM installed by RPM -- first clone VM and then use yum to remove package.")
assert self._collection is not None
if self._collection.get_vm_by_name(name):
raise QubesException("VM with this name already exists")
self.pre_rename(name) self.pre_rename(name)
if self.libvirt_domain: try:
self.libvirt_domain.undefine() self.libvirt_domain.undefine()
except libvirt.libvirtError as e:
if e.get_error_code() == libvirt.VIR_ERR_NO_DOMAIN:
pass
else:
raise
if self._qdb_connection: if self._qdb_connection:
self._qdb_connection.close() self._qdb_connection.close()
self._qdb_connection = None self._qdb_connection = None
@ -779,6 +788,8 @@ class QubesVm(object):
# libxl_domain_info failed - domain no longer exists # libxl_domain_info failed - domain no longer exists
elif e.get_error_code() == libvirt.VIR_ERR_INTERNAL_ERROR: elif e.get_error_code() == libvirt.VIR_ERR_INTERNAL_ERROR:
return 0 return 0
elif e.get_error_code() is None: # unknown...
return 0
else: else:
print >>sys.stderr, "libvirt error code: {!r}".format( print >>sys.stderr, "libvirt error code: {!r}".format(
e.get_error_code()) e.get_error_code())
@ -796,7 +807,9 @@ class QubesVm(object):
if e.get_error_code() == libvirt.VIR_ERR_NO_DOMAIN: if e.get_error_code() == libvirt.VIR_ERR_NO_DOMAIN:
return 0 return 0
# libxl_domain_info failed - domain no longer exists # libxl_domain_info failed - domain no longer exists
elif e.get_error_code() == libvirt.VIR_INTERNAL_ERROR: elif e.get_error_code() == libvirt.VIR_ERR_INTERNAL_ERROR:
return 0
elif e.get_error_code() is None: # unknown...
return 0 return 0
else: else:
print >>sys.stderr, "libvirt error code: {!r}".format( print >>sys.stderr, "libvirt error code: {!r}".format(
@ -918,6 +931,11 @@ class QubesVm(object):
except libvirt.libvirtError as e: except libvirt.libvirtError as e:
if e.get_error_code() == libvirt.VIR_ERR_NO_DOMAIN: if e.get_error_code() == libvirt.VIR_ERR_NO_DOMAIN:
return False return False
# libxl_domain_info failed - domain no longer exists
elif e.get_error_code() == libvirt.VIR_ERR_INTERNAL_ERROR:
return False
elif e.get_error_code() is None: # unknown...
return False
else: else:
print >>sys.stderr, "libvirt error code: {!r}".format( print >>sys.stderr, "libvirt error code: {!r}".format(
e.get_error_code()) e.get_error_code())
@ -932,6 +950,11 @@ class QubesVm(object):
except libvirt.libvirtError as e: except libvirt.libvirtError as e:
if e.get_error_code() == libvirt.VIR_ERR_NO_DOMAIN: if e.get_error_code() == libvirt.VIR_ERR_NO_DOMAIN:
return False return False
# libxl_domain_info failed - domain no longer exists
elif e.get_error_code() == libvirt.VIR_ERR_INTERNAL_ERROR:
return False
elif e.get_error_code() is None: # unknown...
return False
else: else:
print >>sys.stderr, "libvirt error code: {!r}".format( print >>sys.stderr, "libvirt error code: {!r}".format(
e.get_error_code()) e.get_error_code())
@ -1082,6 +1105,7 @@ class QubesVm(object):
if self.is_netvm(): if self.is_netvm():
self.qdb.write("/qubes-netvm-gateway", self.gateway) self.qdb.write("/qubes-netvm-gateway", self.gateway)
self.qdb.write("/qubes-netvm-primary-dns", self.gateway)
self.qdb.write("/qubes-netvm-secondary-dns", self.secondary_dns) self.qdb.write("/qubes-netvm-secondary-dns", self.secondary_dns)
self.qdb.write("/qubes-netvm-netmask", self.netmask) self.qdb.write("/qubes-netvm-netmask", self.netmask)
self.qdb.write("/qubes-netvm-network", self.network) self.qdb.write("/qubes-netvm-network", self.network)
@ -1090,6 +1114,7 @@ class QubesVm(object):
self.qdb.write("/qubes-ip", self.ip) self.qdb.write("/qubes-ip", self.ip)
self.qdb.write("/qubes-netmask", self.netvm.netmask) self.qdb.write("/qubes-netmask", self.netvm.netmask)
self.qdb.write("/qubes-gateway", self.netvm.gateway) self.qdb.write("/qubes-gateway", self.netvm.gateway)
self.qdb.write("/qubes-primary-dns", self.netvm.gateway)
self.qdb.write("/qubes-secondary-dns", self.netvm.secondary_dns) self.qdb.write("/qubes-secondary-dns", self.netvm.secondary_dns)
tzname = self.get_timezone() tzname = self.get_timezone()
@ -1651,13 +1676,17 @@ class QubesVm(object):
if bool(input) + bool(passio_popen) + bool(localcmd) > 1: if bool(input) + bool(passio_popen) + bool(localcmd) > 1:
raise ValueError("'input', 'passio_popen', 'localcmd' cannot be " raise ValueError("'input', 'passio_popen', 'localcmd' cannot be "
"used together") "used together")
if not wait and (localcmd or input):
raise ValueError("Cannot use wait=False with input or "
"localcmd specified")
if localcmd: if localcmd:
return self.run("QUBESRPC %s %s" % (service, source), return self.run("QUBESRPC %s %s" % (service, source),
localcmd=localcmd, user=user, wait=wait, gui=gui) localcmd=localcmd, user=user, wait=wait, gui=gui)
elif input: elif input:
return self.run("QUBESRPC %s %s" % (service, source), p = self.run("QUBESRPC %s %s" % (service, source),
localcmd="echo %s" % input, user=user, wait=wait, user=user, wait=wait, gui=gui, passio_popen=True)
gui=gui) p.communicate(input)
return p.returncode
else: else:
return self.run("QUBESRPC %s %s" % (service, source), return self.run("QUBESRPC %s %s" % (service, source),
passio_popen=passio_popen, user=user, wait=wait, passio_popen=passio_popen, user=user, wait=wait,

View File

@ -51,6 +51,7 @@ class QfileDaemonDvm:
qvm_collection = QubesVmCollection() qvm_collection = QubesVmCollection()
qvm_collection.lock_db_for_writing() qvm_collection.lock_db_for_writing()
try:
tar_process = subprocess.Popen( tar_process = subprocess.Popen(
['bsdtar', '-C', current_savefile_vmdir, ['bsdtar', '-C', current_savefile_vmdir,
@ -62,7 +63,6 @@ class QfileDaemonDvm:
vm = qvm_collection.get_vm_by_name(self.name) vm = qvm_collection.get_vm_by_name(self.name)
if vm is None: if vm is None:
sys.stderr.write('Domain ' + self.name + ' does not exist ?') sys.stderr.write('Domain ' + self.name + ' does not exist ?')
qvm_collection.unlock_db()
return None return None
label = vm.label label = vm.label
if len(sys.argv) > 4 and len(sys.argv[4]) > 0: if len(sys.argv) > 4 and len(sys.argv[4]) > 0:
@ -72,7 +72,6 @@ class QfileDaemonDvm:
vm_disptempl = qvm_collection.get_vm_by_name(disp_templ) vm_disptempl = qvm_collection.get_vm_by_name(disp_templ)
if vm_disptempl is None: if vm_disptempl is None:
sys.stderr.write('Domain ' + disp_templ + ' does not exist ?') sys.stderr.write('Domain ' + disp_templ + ' does not exist ?')
qvm_collection.unlock_db()
return None return None
dispvm = qvm_collection.add_new_vm('QubesDisposableVm', dispvm = qvm_collection.add_new_vm('QubesDisposableVm',
disp_template=vm_disptempl, disp_template=vm_disptempl,
@ -98,7 +97,6 @@ class QfileDaemonDvm:
# Wait for tar to finish # Wait for tar to finish
if tar_process.wait() != 0: if tar_process.wait() != 0:
sys.stderr.write('Failed to unpack saved-cows.tar') sys.stderr.write('Failed to unpack saved-cows.tar')
qvm_collection.unlock_db()
return None return None
print >>sys.stderr, "time=%s, VM starting" % (str(time.time())) print >>sys.stderr, "time=%s, VM starting" % (str(time.time()))
try: try:
@ -112,6 +110,7 @@ class QfileDaemonDvm:
dispvm.netvm = vm.dispvm_netvm dispvm.netvm = vm.dispvm_netvm
print >>sys.stderr, "time=%s, VM started" % (str(time.time())) print >>sys.stderr, "time=%s, VM started" % (str(time.time()))
qvm_collection.save() qvm_collection.save()
finally:
qvm_collection.unlock_db() qvm_collection.unlock_db()
# Reload firewall rules # Reload firewall rules
print >>sys.stderr, "time=%s, reloading firewall" % (str(time.time())) print >>sys.stderr, "time=%s, reloading firewall" % (str(time.time()))

View File

@ -15,6 +15,10 @@ Options
Show this help message and exit Show this help message and exit
.. option:: --verify-only
Do not restore the data, only verify backup integrity
.. option:: --skip-broken .. option:: --skip-broken
Do not restore VMs that have missing templates or netvms Do not restore VMs that have missing templates or netvms
@ -48,6 +52,22 @@ Options
Ignore dom0 username mismatch while restoring homedir Ignore dom0 username mismatch while restoring homedir
.. option:: --dest-vm=APPVM, -d APPVM
Restore from a backup located in a specific AppVM
.. option:: --encrypted, -e
The backup is encrypted
.. option:: --compressed. -z
The backup is compressed
.. option:: --debug
Enable (a lot of) debug output
Authors Authors
======= =======
| Joanna Rutkowska <joanna at invisiblethingslab dot com> | Joanna Rutkowska <joanna at invisiblethingslab dot com>

View File

@ -156,7 +156,7 @@ mac
Can be used to force specific of virtual ethernet card in the VM. Setting Can be used to force specific of virtual ethernet card in the VM. Setting
to ``auto`` will use automatic-generated MAC - based on VM id. Especially to ``auto`` will use automatic-generated MAC - based on VM id. Especially
useful when some licencing depending on static MAC address. useful when licensing requires a static MAC address.
For template-based HVM ``auto`` mode means to clone template MAC. For template-based HVM ``auto`` mode means to clone template MAC.
default_user default_user
@ -169,7 +169,7 @@ debug
Accepted values: ``on``, ``off`` Accepted values: ``on``, ``off``
Enables debug mode for VM. This can be used to turn on/off verbose logging Enables debug mode for VM. This can be used to turn on/off verbose logging
in many qubes components at once (gui virtualization, VM kernel, some other in many Qubes components at once (gui virtualization, VM kernel, some other
services). services).
For template-based HVM, enabling debug mode also disables automatic reset For template-based HVM, enabling debug mode also disables automatic reset
@ -196,7 +196,7 @@ guiagent_installed
This HVM have gui agent installed. This option disables full screen GUI This HVM have gui agent installed. This option disables full screen GUI
virtualization and enables per-window seemless GUI mode. This option will virtualization and enables per-window seemless GUI mode. This option will
be automatically turned on during Qubes Windows Tools installation, but if be automatically turned on during Qubes Windows Tools installation, but if
you install qubes gui agent in some other OS, you need to turn this option you install Qubes gui agent in some other OS, you need to turn this option
on manually. You can turn this option off to troubleshoot some early HVM OS on manually. You can turn this option off to troubleshoot some early HVM OS
boot problems (enter safe mode etc), but the option will be automatically boot problems (enter safe mode etc), but the option will be automatically
enabled at first VM normal startup (and will take effect from the next enabled at first VM normal startup (and will take effect from the next

View File

@ -0,0 +1,6 @@
## Note that policy parsing stops at the first match,
## so adding anything below "$anyvm $anyvm action" line will have no effect
## Please use a single # to start your custom comments
$anyvm dom0 allow

View File

@ -0,0 +1,77 @@
#!/bin/bash
# The Qubes OS Project, http://www.qubes-os.org
#
# Copyright (C) 2016 Patrick Schleizer <adrelanos@riseup.net>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
## Similar code as Boot Clock Randomization.
## https://www.whonix.org/wiki/Boot_Clock_Randomization
set -e
## Get a random 0 or 1.
## Will use this to decide to use plus or minus.
ZERO_OR_ONE="$(shuf -i0-1 -n1 --random-source=/dev/random)"
## Create a random number between 0 and 180.
DELAY="$(shuf -i0-180 -n1 --random-source=/dev/random)"
## Create a random number between 0 and 999999999.
##
## Thanks to
## https://stackoverflow.com/questions/22887891/how-can-i-get-a-random-dev-random-number-between-0-and-999999999-in-bash
NANOSECONDS="$(shuf -i0-999999999 -n1 --random-source=/dev/random)"
## Examples NANOSECONDS:
## 117752805
## 38653957
## Add leading zeros, because `date` expects 9 digits.
NANOSECONDS="$(printf '%0*d\n' 9 "$NANOSECONDS")"
## Using
## printf '%0*d\n' 9 "38653957"
## 38653957
## becomes
## 038653957
## Examples NANOSECONDS:
## 117752805
## 038653957
if [ "$ZERO_OR_ONE" = "0" ]; then
PLUS_OR_MINUS="-"
elif [ "$ZERO_OR_ONE" = "1" ]; then
PLUS_OR_MINUS="+"
else
exit 2
fi
#OLD_TIME="$(date)"
#OLD_TIME_NANOSECONDS="$(date +%s.%N)"
OLD_UNIXTIME="$(date +%s)"
NEW_TIME="$(( $OLD_UNIXTIME $PLUS_OR_MINUS $DELAY ))"
NEW_TIME_NANOSECONDS="$NEW_TIME.$NANOSECONDS"
echo "$NEW_TIME_NANOSECONDS"
## Testing the `date` syntax:
## date --date @1396733199.112834496
## date --date "@$NEW_TIME_NANOSECONDS"

View File

@ -451,7 +451,7 @@ class Backup(object):
vms_not_for_backup = [vm.name for vm in self.app.domains vms_not_for_backup = [vm.name for vm in self.app.domains
if vm not in self.vms_for_backup] if vm not in self.vms_for_backup]
summary += "VMs not selected for backup:\n - " + "\n - ".join( summary += "VMs not selected for backup:\n - " + "\n - ".join(
vms_not_for_backup) sorted(vms_not_for_backup))
return summary return summary

View File

@ -25,13 +25,24 @@
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
# #
"""
.. warning::
The test suite hereby claims any domain whose name starts with
:py:data:`VMPREFIX` as fair game. This is needed to enforce sane
test executing environment. If you have domains named ``test-*``,
don't run the tests.
"""
import collections import collections
from distutils import spawn
import functools
import multiprocessing import multiprocessing
import logging import logging
import os import os
import shutil import shutil
import subprocess import subprocess
import sys import sys
import tempfile
import traceback import traceback
import unittest import unittest
@ -129,6 +140,32 @@ class TestEmitter(qubes.events.Emitter):
super(TestEmitter, self).fire_event_pre(event, *args, **kwargs) super(TestEmitter, self).fire_event_pre(event, *args, **kwargs)
self.fired_events[(event, args, tuple(sorted(kwargs.items())))] += 1 self.fired_events[(event, args, tuple(sorted(kwargs.items())))] += 1
def expectedFailureIfTemplate(templates):
"""
Decorator for marking specific test as expected to fail only for some
templates. Template name is compared as substring, so 'whonix' will
handle both 'whonix-ws' and 'whonix-gw'.
templates can be either a single string, or an iterable
"""
def decorator(func):
@functools.wraps(func)
def wrapper(self, *args, **kwargs):
template = self.template
if isinstance(templates, basestring):
should_expect_fail = template in templates
else:
should_expect_fail = any([template in x for x in templates])
if should_expect_fail:
try:
func(self, *args, **kwargs)
except Exception:
raise unittest.case._ExpectedFailure(sys.exc_info())
raise unittest.case._UnexpectedSuccess()
else:
# Call directly:
func(self, *args, **kwargs)
return wrapper
return decorator
class _AssertNotRaisesContext(object): class _AssertNotRaisesContext(object):
"""A context manager used to implement TestCase.assertNotRaises methods. """A context manager used to implement TestCase.assertNotRaises methods.
@ -537,12 +574,6 @@ class SystemTestsMixin(object):
@classmethod @classmethod
def remove_test_vms(cls, xmlpath=XMLPATH, prefix=VMPREFIX): def remove_test_vms(cls, xmlpath=XMLPATH, prefix=VMPREFIX):
'''Aggresively remove any domain that has name in testing namespace. '''Aggresively remove any domain that has name in testing namespace.
.. warning::
The test suite hereby claims any domain whose name starts with
:py:data:`VMPREFIX` as fair game. This is needed to enforce sane
test executing environment. If you have domains named ``test-*``,
don't run the tests.
''' '''
# first, remove them Qubes-way # first, remove them Qubes-way
@ -578,6 +609,30 @@ class SystemTestsMixin(object):
for vmname in vmnames: for vmname in vmnames:
cls._remove_vm_disk(vmname) cls._remove_vm_disk(vmname)
def qrexec_policy(self, service, source, destination, allow=True):
"""
Allow qrexec calls for duration of the test
:param service: service name
:param source: source VM name
:param destination: destination VM name
:return:
"""
def add_remove_rule(add=True):
with open('/etc/qubes-rpc/policy/{}'.format(service), 'r+') as policy:
policy_rules = policy.readlines()
rule = "{} {} {}\n".format(source, destination,
'allow' if allow else 'deny')
if add:
policy_rules.insert(0, rule)
else:
policy_rules.remove(rule)
policy.truncate(0)
policy.seek(0)
policy.write(''.join(policy_rules))
add_remove_rule(add=True)
self.addCleanup(add_remove_rule, add=False)
def wait_for_window(self, title, timeout=30, show=True): def wait_for_window(self, title, timeout=30, show=True):
""" """
Wait for a window with a given title. Depending on show parameter, Wait for a window with a given title. Depending on show parameter,
@ -628,6 +683,76 @@ class SystemTestsMixin(object):
timeout -= 1 timeout -= 1
self.fail("Timeout while waiting for VM {} shutdown".format(vm.name)) self.fail("Timeout while waiting for VM {} shutdown".format(vm.name))
def prepare_hvm_system_linux(self, vm, init_script, extra_files=None):
if not os.path.exists('/usr/lib/grub/i386-pc'):
self.skipTest('grub2 not installed')
if not spawn.find_executable('grub2-install'):
self.skipTest('grub2-tools not installed')
if not spawn.find_executable('dracut'):
self.skipTest('dracut not installed')
# create a single partition
p = subprocess.Popen(['sfdisk', '-q', '-L', vm.storage.root_img],
stdin=subprocess.PIPE,
stdout=open(os.devnull, 'w'),
stderr=subprocess.STDOUT)
p.communicate('2048,\n')
assert p.returncode == 0, 'sfdisk failed'
# TODO: check if root_img is really file, not already block device
p = subprocess.Popen(['sudo', 'losetup', '-f', '-P', '--show',
vm.storage.root_img], stdout=subprocess.PIPE)
(loopdev, _) = p.communicate()
loopdev = loopdev.strip()
looppart = loopdev + 'p1'
assert p.returncode == 0, 'losetup failed'
subprocess.check_call(['sudo', 'mkfs.ext2', '-q', '-F', looppart])
mountpoint = tempfile.mkdtemp()
subprocess.check_call(['sudo', 'mount', looppart, mountpoint])
try:
subprocess.check_call(['sudo', 'grub2-install',
'--target', 'i386-pc',
'--modules', 'part_msdos ext2',
'--boot-directory', mountpoint, loopdev],
stderr=open(os.devnull, 'w')
)
grub_cfg = '{}/grub2/grub.cfg'.format(mountpoint)
subprocess.check_call(
['sudo', 'chown', '-R', os.getlogin(), mountpoint])
with open(grub_cfg, 'w') as f:
f.write(
"set timeout=1\n"
"menuentry 'Default' {\n"
" linux /vmlinuz root=/dev/xvda1 "
"rd.driver.blacklist=bochs_drm "
"rd.driver.blacklist=uhci_hcd\n"
" initrd /initrd\n"
"}"
)
p = subprocess.Popen(['uname', '-r'], stdout=subprocess.PIPE)
(kernel_version, _) = p.communicate()
kernel_version = kernel_version.strip()
kernel = '/boot/vmlinuz-{}'.format(kernel_version)
shutil.copy(kernel, os.path.join(mountpoint, 'vmlinuz'))
init_path = os.path.join(mountpoint, 'init')
with open(init_path, 'w') as f:
f.write(init_script)
os.chmod(init_path, 0755)
dracut_args = [
'--kver', kernel_version,
'--include', init_path,
'/usr/lib/dracut/hooks/pre-pivot/initscript.sh',
'--no-hostonly', '--nolvmconf', '--nomdadmconf',
]
if extra_files:
dracut_args += ['--install', ' '.join(extra_files)]
subprocess.check_call(
['dracut'] + dracut_args + [os.path.join(mountpoint,
'initrd')],
stderr=open(os.devnull, 'w')
)
finally:
subprocess.check_call(['sudo', 'umount', mountpoint])
shutil.rmtree(mountpoint)
subprocess.check_call(['sudo', 'losetup', '-d', loopdev])
# noinspection PyAttributeOutsideInit # noinspection PyAttributeOutsideInit
class BackupTestsMixin(SystemTestsMixin): class BackupTestsMixin(SystemTestsMixin):

View File

@ -187,6 +187,8 @@ class TC_10_BackupVMMixin(qubes.tests.BackupTestsMixin):
vms = self.create_backup_vms() vms = self.create_backup_vms()
self.backupvm.start() self.backupvm.start()
retcode = self.backupvm.run( retcode = self.backupvm.run(
# Debian 7 has too old losetup to handle loop-control device
"mknod /dev/loop0 b 7 0;"
"truncate -s 50M /home/user/backup.img && " "truncate -s 50M /home/user/backup.img && "
"mkfs.ext4 -F /home/user/backup.img && " "mkfs.ext4 -F /home/user/backup.img && "
"mkdir /home/user/backup && " "mkdir /home/user/backup && "

View File

@ -40,6 +40,7 @@ class TC_00_Dom0UpgradeMixin(qubes.tests.SystemTestsMixin):
pkg_name = 'qubes-test-pkg' pkg_name = 'qubes-test-pkg'
dom0_update_common_opts = ['--disablerepo=*', '--enablerepo=test', dom0_update_common_opts = ['--disablerepo=*', '--enablerepo=test',
'--setopt=test.copy_local=1'] '--setopt=test.copy_local=1']
update_flag_path = '/var/lib/qubes/updates/dom0-updates-available'
@classmethod @classmethod
def generate_key(cls, keydir): def generate_key(cls, keydir):
@ -181,10 +182,18 @@ Test package
"test".format(retcode)) "test".format(retcode))
def test_000_update(self): def test_000_update(self):
"""Dom0 update tests
Check if package update is:
- detected
- installed
- "updates pending" flag is cleared
"""
filename = self.create_pkg(self.tmpdir, self.pkg_name, '1.0') filename = self.create_pkg(self.tmpdir, self.pkg_name, '1.0')
subprocess.check_call(['sudo', 'rpm', '-i', filename]) subprocess.check_call(['sudo', 'rpm', '-i', filename])
filename = self.create_pkg(self.tmpdir, self.pkg_name, '2.0') filename = self.create_pkg(self.tmpdir, self.pkg_name, '2.0')
self.send_pkg(filename) self.send_pkg(filename)
open(self.update_flag_path, 'a').close()
logpath = os.path.join(self.tmpdir, 'dom0-update-output.txt') logpath = os.path.join(self.tmpdir, 'dom0-update-output.txt')
try: try:
@ -204,6 +213,67 @@ Test package
self.pkg_name)], stdout=open(os.devnull, 'w')) self.pkg_name)], stdout=open(os.devnull, 'w'))
self.assertEqual(retcode, 0, 'Package {}-2.0 not installed after ' self.assertEqual(retcode, 0, 'Package {}-2.0 not installed after '
'update'.format(self.pkg_name)) 'update'.format(self.pkg_name))
self.assertFalse(os.path.exists(self.update_flag_path),
"'updates pending' flag not cleared")
def test_005_update_flag_clear(self):
"""Check if 'updates pending' flag is creared"""
# create any pkg (but not install it) to initialize repo in the VM
filename = self.create_pkg(self.tmpdir, self.pkg_name, '1.0')
self.send_pkg(filename)
open(self.update_flag_path, 'a').close()
logpath = os.path.join(self.tmpdir, 'dom0-update-output.txt')
try:
subprocess.check_call(['sudo', 'qubes-dom0-update', '-y'] +
self.dom0_update_common_opts,
stdout=open(logpath, 'w'),
stderr=subprocess.STDOUT)
except subprocess.CalledProcessError:
self.fail("qubes-dom0-update failed: " + open(
logpath).read())
with open(logpath) as f:
dom0_update_output = f.read()
self.assertFalse('Errno' in dom0_update_output or
'Couldn\'t' in dom0_update_output,
"qubes-dom0-update reported an error: {}".
format(dom0_update_output))
self.assertFalse(os.path.exists(self.update_flag_path),
"'updates pending' flag not cleared")
def test_006_update_flag_clear(self):
"""Check if 'updates pending' flag is creared, using --clean"""
# create any pkg (but not install it) to initialize repo in the VM
filename = self.create_pkg(self.tmpdir, self.pkg_name, '1.0')
self.send_pkg(filename)
open(self.update_flag_path, 'a').close()
# remove also repodata to test #1685
shutil.rmtree('/var/lib/qubes/updates/repodata')
logpath = os.path.join(self.tmpdir, 'dom0-update-output.txt')
try:
subprocess.check_call(['sudo', 'qubes-dom0-update', '-y',
'--clean'] +
self.dom0_update_common_opts,
stdout=open(logpath, 'w'),
stderr=subprocess.STDOUT)
except subprocess.CalledProcessError:
self.fail("qubes-dom0-update failed: " + open(
logpath).read())
with open(logpath) as f:
dom0_update_output = f.read()
self.assertFalse('Errno' in dom0_update_output or
'Couldn\'t' in dom0_update_output,
"qubes-dom0-update reported an error: {}".
format(dom0_update_output))
self.assertFalse(os.path.exists(self.update_flag_path),
"'updates pending' flag not cleared")
def test_010_instal(self): def test_010_instal(self):
filename = self.create_pkg(self.tmpdir, self.pkg_name, '1.0') filename = self.create_pkg(self.tmpdir, self.pkg_name, '1.0')

View File

@ -126,6 +126,7 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin):
"Ping by IP from AppVM failed") "Ping by IP from AppVM failed")
@qubes.tests.expectedFailureIfTemplate('debian-7')
@unittest.skipUnless(spawn.find_executable('xdotool'), @unittest.skipUnless(spawn.find_executable('xdotool'),
"xdotool not installed") "xdotool not installed")
def test_020_simple_proxyvm_nm(self): def test_020_simple_proxyvm_nm(self):

View File

@ -176,6 +176,7 @@ class QMemmanReqHandler(SocketServer.BaseRequestHandler):
self.log = logging.getLogger('qmemman.daemon.reqhandler') self.log = logging.getLogger('qmemman.daemon.reqhandler')
got_lock = False got_lock = False
try:
# self.request is the TCP socket connected to the client # self.request is the TCP socket connected to the client
while True: while True:
self.data = self.request.recv(1024).strip() self.data = self.request.recv(1024).strip()
@ -185,8 +186,6 @@ class QMemmanReqHandler(SocketServer.BaseRequestHandler):
if got_lock: if got_lock:
global force_refresh_domain_list global force_refresh_domain_list
force_refresh_domain_list = True force_refresh_domain_list = True
global_lock.release()
self.log.debug('global_lock released')
return return
# XXX something is wrong here: return without release? # XXX something is wrong here: return without release?
@ -205,8 +204,13 @@ class QMemmanReqHandler(SocketServer.BaseRequestHandler):
resp = "FAIL\n" resp = "FAIL\n"
self.log.debug('resp={!r}'.format(resp)) self.log.debug('resp={!r}'.format(resp))
self.request.send(resp) self.request.send(resp)
except BaseException as e:
# XXX no release of lock? self.log.exception(
"exception while handling request: {!r}".format(e))
finally:
if got_lock:
global_lock.release()
self.log.debug('global_lock released')
parser = qubes.tools.QubesArgumentParser(want_app=False) parser = qubes.tools.QubesArgumentParser(want_app=False)

View File

@ -86,6 +86,8 @@ def main(args=None):
if args.vm is qubes.tools.VM_ALL and args.passio: if args.vm is qubes.tools.VM_ALL and args.passio:
parser.error('--all and --passio are mutually exclusive') parser.error('--all and --passio are mutually exclusive')
if args.localcmd and not passio.passio:
parser.error('--localcmd have no effect without --pass-io')
if args.color_output and not args.filter_esc: if args.color_output and not args.filter_esc:
parser.error('--color-output must be used with --filter-escape-chars') parser.error('--color-output must be used with --filter-escape-chars')

View File

@ -511,6 +511,13 @@ class QubesVM(qubes.vm.mix.net.NetVMMixin, qubes.vm.BaseVM):
subprocess.check_call(['sudo', 'systemctl', '-q', 'disable', subprocess.check_call(['sudo', 'systemctl', '-q', 'disable',
'qubes-vm@{}.service'.format(oldvalue)]) 'qubes-vm@{}.service'.format(oldvalue)])
try:
self.app.domains[newvalue]
except KeyError:
pass
else:
raise qubes.exc.QubesValueError(
'VM named {!r} already exists'.format(newvalue))
@qubes.events.handler('property-set:name') @qubes.events.handler('property-set:name')
def on_property_set_name(self, event, name, new_name, old_name=None): def on_property_set_name(self, event, name, new_name, old_name=None):
@ -896,18 +903,28 @@ class QubesVM(qubes.vm.mix.net.NetVMMixin, qubes.vm.BaseVM):
raise TypeError( raise TypeError(
'input, passio_popen and localcmd cannot be used together') 'input, passio_popen and localcmd cannot be used together')
if not wait and (localcmd or input):
raise ValueError("Cannot use wait=False with input or "
"localcmd specified")
if passio_stderr and not passio_popen: if passio_stderr and not passio_popen:
raise TypeError('passio_stderr can be used only with passio_popen') raise TypeError('passio_stderr can be used only with passio_popen')
if input: if input:
localcmd = 'printf %s {}'.format(pipes.quote(input)) # Internally use passio_popen, but do not return POpen object to
# the user - use internally for p.communicate()
passio_popen = True
source = 'dom0' if source is None else self.app.domains[source].name source = 'dom0' if source is None else self.app.domains[source].name
return self.run('QUBESRPC {} {}'.format(service, source), p = self.run('QUBESRPC {} {}'.format(service, source),
localcmd=localcmd, passio_popen=passio_popen, user=user, wait=wait, localcmd=localcmd, passio_popen=passio_popen, user=user, wait=wait,
gui=gui, passio_stderr=passio_stderr) gui=gui, passio_stderr=passio_stderr)
if input:
p.communicate(input)
return p.returncode
else:
return p
def request_memory(self, mem_required=None): def request_memory(self, mem_required=None):
# overhead of per-qube/per-vcpu Xen structures, # overhead of per-qube/per-vcpu Xen structures,

View File

@ -18,7 +18,7 @@
# along with this program; if not, write to the Free Software # along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
VERSION=2.4 VERSION=2.5
COPY2VM="dom0" COPY2VM="dom0"
SUPPORT_FILES=0 SUPPORT_FILES=0
@ -103,7 +103,7 @@ XL_VTX=`cat $TEMP_DIR/xl-info |grep xen_caps | grep hvm`
XL_VTD=`cat $TEMP_DIR/xl-info |grep virt_caps |grep hvm_directio` XL_VTD=`cat $TEMP_DIR/xl-info |grep virt_caps |grep hvm_directio`
PCRS=`find /sys/devices/ -name pcrs` PCRS=`find /sys/devices/ -name pcrs`
FILENAME="Qubes-HCL-${BRAND// /_}-${PRODUCT// /_}-$DATE" FILENAME="Qubes-HCL-${BRAND//+([^[:alnum:]])/_}-${PRODUCT//+([^[:alnum:]])/_}-$DATE"
if [[ $XL_VTX ]] if [[ $XL_VTX ]]
then then

View File

@ -30,6 +30,7 @@ import qubes.backup
import os import os
import sys import sys
import getpass import getpass
from locale import getpreferredencoding
def print_progress(progress): def print_progress(progress):
print >> sys.stderr, "\r-> Backing up files: {0}%...".format (progress), print >> sys.stderr, "\r-> Backing up files: {0}%...".format (progress),
@ -51,6 +52,10 @@ def main():
parser.add_option ("--no-encrypt", action="store_true", parser.add_option ("--no-encrypt", action="store_true",
dest="no_encrypt", default=False, dest="no_encrypt", default=False,
help="Skip encryption even if sending the backup to VM") help="Skip encryption even if sending the backup to VM")
parser.add_option ("-p", "--passphrase-file", action="store",
dest="pass_file", default=None,
help="File containing the pass phrase to use, or '-' "
"to read it from stdin")
parser.add_option ("-E", "--enc-algo", action="store", parser.add_option ("-E", "--enc-algo", action="store",
dest="crypto_algorithm", default=None, dest="crypto_algorithm", default=None,
help="Specify non-default encryption algorithm. For " help="Specify non-default encryption algorithm. For "
@ -156,24 +161,26 @@ def main():
if not options.encrypt: if not options.encrypt:
print >>sys.stderr, "WARNING: encryption will not be used" print >>sys.stderr, "WARNING: encryption will not be used"
prompt = raw_input ("Do you want to proceed? [y/N] ") if options.pass_file is not None:
if not (prompt == "y" or prompt == "Y"): f = open(options.pass_file) if options.pass_file != "-" else sys.stdin
passphrase = f.readline().rstrip()
if f is not sys.stdin:
f.close()
else:
if raw_input("Do you want to proceed? [y/N] ").upper() != "Y":
exit(0) exit(0)
if options.encrypt: s = ("Please enter the pass phrase that will be used to {}verify "
passphrase = getpass.getpass("Please enter the pass phrase that will " "the backup: ").format('encrypt and ' if options.encrypt else '')
"be used to encrypt and verify the " passphrase = getpass.getpass(s)
"backup: ")
else:
passphrase = getpass.getpass("Please enter the pass phrase that will "
"be used to verify the backup: ")
passphrase2 = getpass.getpass("Enter again for verification: ") if getpass.getpass("Enter again for verification: ") != passphrase:
if passphrase != passphrase2:
print >>sys.stderr, "ERROR: Password mismatch" print >>sys.stderr, "ERROR: Password mismatch"
exit(1) exit(1)
passphrase = passphrase.decode(sys.stdin.encoding) encoding = sys.stdin.encoding or getpreferredencoding()
passphrase = passphrase.decode(encoding)
kwargs = {} kwargs = {}
if options.hmac_algorithm: if options.hmac_algorithm:

View File

@ -31,6 +31,7 @@ from qubes.backup import backup_restore_do
import qubes.backup import qubes.backup
import sys import sys
from optparse import OptionParser from optparse import OptionParser
from locale import getpreferredencoding
import os import os
import sys import sys
@ -81,6 +82,10 @@ def main():
parser.add_option ("-e", "--encrypted", action="store_true", dest="decrypt", default=False, parser.add_option ("-e", "--encrypted", action="store_true", dest="decrypt", default=False,
help="The backup is encrypted") help="The backup is encrypted")
parser.add_option ("-p", "--passphrase-file", action="store",
dest="pass_file", default=None,
help="File containing the pass phrase to use, or '-' to read it from stdin")
parser.add_option ("-z", "--compressed", action="store_true", dest="compressed", default=False, parser.add_option ("-z", "--compressed", action="store_true", dest="compressed", default=False,
help="The backup is compressed") help="The backup is compressed")
@ -110,6 +115,8 @@ def main():
restore_options['use-default-netvm'] = True restore_options['use-default-netvm'] = True
if options.replace_template: if options.replace_template:
restore_options['replace-template'] = options.replace_template restore_options['replace-template'] = options.replace_template
if options.rename_conflicting:
restore_options['rename-conflicting'] = True
if not options.dom0_home: if not options.dom0_home:
restore_options['dom0-home'] = False restore_options['dom0-home'] = False
if options.ignore_username_mismatch: if options.ignore_username_mismatch:
@ -128,8 +135,16 @@ def main():
print >>sys.stderr, "ERROR: VM {0} does not exist".format(options.appvm) print >>sys.stderr, "ERROR: VM {0} does not exist".format(options.appvm)
exit(1) exit(1)
passphrase = getpass.getpass("Please enter the pass phrase that will be used to decrypt/verify the backup: ") if options.pass_file is not None:
passphrase = passphrase.decode(sys.stdin.encoding) f = open(options.pass_file) if options.pass_file != "-" else sys.stdin
passphrase = f.readline().rstrip()
if f is not sys.stdin:
f.close()
else:
passphrase = getpass.getpass("Please enter the pass phrase to decrypt/verify the backup: ")
encoding = sys.stdin.encoding or getpreferredencoding()
passphrase = passphrase.decode(encoding)
print >> sys.stderr, "Checking backup content..." print >> sys.stderr, "Checking backup content..."
@ -244,11 +259,10 @@ def main():
print >> sys.stderr, "Continuing as directed" print >> sys.stderr, "Continuing as directed"
print >> sys.stderr, "While restoring user homedir, existing files/dirs will be backed up in 'home-pre-restore-<current-time>' dir" print >> sys.stderr, "While restoring user homedir, existing files/dirs will be backed up in 'home-pre-restore-<current-time>' dir"
prompt = raw_input ("Do you want to proceed? [y/N] ") if options.pass_file is None:
if not (prompt == "y" or prompt == "Y"): if raw_input("Do you want to proceed? [y/N] ").upper() != "Y":
exit(0) exit(0)
try: try:
backup_restore_do(restore_info, backup_restore_do(restore_info,
host_collection=host_collection, host_collection=host_collection,

View File

@ -22,6 +22,7 @@
# #
import fcntl import fcntl
from optparse import OptionParser
from qubes.qubes import QubesVmCollection from qubes.qubes import QubesVmCollection
import os.path import os.path
import os import os
@ -41,9 +42,11 @@ def get_netvm_of_vm(vm):
return netvm return netvm
def main(): def main():
verbose = False parser = OptionParser()
if len(sys.argv) > 1 and sys.argv[1] in [ '--verbose', '-v' ]: parser.add_option ("-v", "--verbose", action="store_true", dest="verbose", default=False)
verbose = True parser.add_option ("-f", "--force", action="store_true", dest="force", default=False)
(options, args) = parser.parse_args ()
lockfile_name = "/var/run/qubes/qvm-sync-clock.lock" lockfile_name = "/var/run/qubes/qvm-sync-clock.lock"
if os.path.exists(lockfile_name): if os.path.exists(lockfile_name):
@ -74,23 +77,27 @@ def main():
sys.exit(1) sys.exit(1)
net_vm = get_netvm_of_vm(clock_vm) net_vm = get_netvm_of_vm(clock_vm)
if verbose: if options.verbose:
print >> sys.stderr, '--> Waiting for network for ClockVM.' print >> sys.stderr, '--> Waiting for network for ClockVM.'
# Ignore retcode, try even if nm-online failed - user can setup network manually # Ignore retcode, try even if nm-online failed - user can setup network manually
# on-online has timeout 30sec by default # on-online has timeout 30sec by default
net_vm.run('nm-online -x', verbose=verbose, gui=False, wait=True, net_vm.run('nm-online -x', verbose=options.verbose, gui=False, wait=True,
ignore_stderr=True) ignore_stderr=True)
# Sync clock # Sync clock
if clock_vm.run('QUBESRPC qubes.SyncNtpClock dom0', user="root", if clock_vm.run('QUBESRPC qubes.SyncNtpClock dom0', user="root",
verbose=verbose, gui=False, wait=True, ignore_stderr=True) \ verbose=options.verbose, gui=False, wait=True, ignore_stderr=True) \
!= 0: != 0:
print >> sys.stderr, 'Time sync failed, aborting!' if options.force:
print >> sys.stderr, 'Time sync failed! - Syncing with dom0 ' \
'anyway as requested'
else:
print >> sys.stderr, 'Time sync failed! - Exiting'
sys.exit(1) sys.exit(1)
else:
# Use the date format based on RFC2822 to avoid localisation issues # Use the date format based on RFC2822 to avoid localisation issues
p = clock_vm.run('date -u -Iseconds', verbose=verbose, p = clock_vm.run('date -u -Iseconds', verbose=options.verbose,
gui=False, passio_popen=True, ignore_stderr=True) gui=False, passio_popen=True, ignore_stderr=True)
date_out = p.stdout.read(100) date_out = p.stdout.read(100)
date_out = date_out.strip() date_out = date_out.strip()
@ -99,18 +106,18 @@ def main():
sys.exit(1) sys.exit(1)
# Sync dom0 time # Sync dom0 time
if verbose: if options.verbose:
print >> sys.stderr, '--> Syncing dom0 clock.' print >> sys.stderr, '--> Syncing dom0 clock.'
subprocess.check_call(['sudo', 'date', '-u', '-Iseconds', '-s', date_out], subprocess.check_call(['sudo', 'date', '-u', '-Iseconds', '-s', date_out],
stdout=None if verbose else open(os.devnull, 'w')) stdout=None if options.verbose else open(os.devnull, 'w'))
subprocess.check_call(['sudo', 'hwclock', '--systohc'], subprocess.check_call(['sudo', 'hwclock', '--systohc'],
stdout=None if verbose else open(os.devnull, 'w')) stdout=None if options.verbose else open(os.devnull, 'w'))
# Sync other VMs clock # Sync other VMs clock
for vm in qvm_collection.values(): for vm in qvm_collection.values():
if vm.is_running() and vm.qid != 0 and vm.qid != clock_vm.qid: if vm.is_running() and vm.qid != 0 and vm.qid != clock_vm.qid:
if verbose: if options.verbose:
print >> sys.stderr, '--> Syncing \'%s\' clock.' % vm.name print >> sys.stderr, '--> Syncing \'%s\' clock.' % vm.name
try: try:
vm.run_service("qubes.SetDateTime", user="root", vm.run_service("qubes.SetDateTime", user="root",

View File

@ -27,6 +27,9 @@
%{!?version: %define version %(cat version)} %{!?version: %define version %(cat version)}
# debug_package hack should be removed when BuildArch:noarch is enabled below
%define debug_package %{nil}
%define _dracutmoddir /usr/lib/dracut/modules.d %define _dracutmoddir /usr/lib/dracut/modules.d
%if %{fedora} < 17 %if %{fedora} < 17
%define _dracutmoddir /usr/share/dracut/modules.d %define _dracutmoddir /usr/share/dracut/modules.d
@ -48,6 +51,9 @@ URL: http://www.qubes-os.org
# /bin -> usr/bin symlink). python*.rpm provides only /usr/bin/python. # /bin -> usr/bin symlink). python*.rpm provides only /usr/bin/python.
AutoReq: no AutoReq: no
# FIXME: Enable this and disable debug_package
#BuildArch: noarch
BuildRequires: ImageMagick BuildRequires: ImageMagick
BuildRequires: systemd-units BuildRequires: systemd-units
@ -61,7 +67,7 @@ Requires(preun): systemd-units
Requires(postun): systemd-units Requires(postun): systemd-units
Requires: python, pciutils, python-inotify, python-daemon Requires: python, pciutils, python-inotify, python-daemon
Requires: python-setuptools Requires: python-setuptools
Requires: qubes-core-dom0-linux >= 2.0.24 Requires: qubes-core-dom0-linux >= 3.1.8
Requires: qubes-db-dom0 Requires: qubes-db-dom0
Requires: python-lxml Requires: python-lxml
# TODO: R: qubes-gui-dom0 >= 2.1.11 # TODO: R: qubes-gui-dom0 >= 2.1.11
@ -333,8 +339,10 @@ fi
%attr(0664,root,qubes) %config(noreplace) /etc/qubes-rpc/policy/qubes.NotifyTools %attr(0664,root,qubes) %config(noreplace) /etc/qubes-rpc/policy/qubes.NotifyTools
%attr(0664,root,qubes) %config(noreplace) /etc/qubes-rpc/policy/qubes.NotifyUpdates %attr(0664,root,qubes) %config(noreplace) /etc/qubes-rpc/policy/qubes.NotifyUpdates
%attr(0664,root,qubes) %config(noreplace) /etc/qubes-rpc/policy/qubes.VMShell %attr(0664,root,qubes) %config(noreplace) /etc/qubes-rpc/policy/qubes.VMShell
%attr(0664,root,qubes) %config(noreplace) /etc/qubes-rpc/policy/qubes.GetRandomizedTime
/etc/qubes-rpc/qubes.NotifyTools /etc/qubes-rpc/qubes.NotifyTools
/etc/qubes-rpc/qubes.NotifyUpdates /etc/qubes-rpc/qubes.NotifyUpdates
/etc/qubes-rpc/qubes.GetRandomizedTime
%attr(2770,root,qubes) %dir /var/log/qubes %attr(2770,root,qubes) %dir /var/log/qubes
%attr(0770,root,qubes) %dir /var/run/qubes %attr(0770,root,qubes) %dir /var/run/qubes
/etc/xdg/autostart/qubes-guid.desktop /etc/xdg/autostart/qubes-guid.desktop

View File

@ -31,3 +31,7 @@ endif
cp storage.py[co] $(DESTDIR)$(PYTHON_TESTSPATH) cp storage.py[co] $(DESTDIR)$(PYTHON_TESTSPATH)
cp storage_xen.py $(DESTDIR)$(PYTHON_TESTSPATH) cp storage_xen.py $(DESTDIR)$(PYTHON_TESTSPATH)
cp storage_xen.py[co] $(DESTDIR)$(PYTHON_TESTSPATH) cp storage_xen.py[co] $(DESTDIR)$(PYTHON_TESTSPATH)
cp hardware.py $(DESTDIR)$(PYTHON_TESTSPATH)
cp hardware.py[co] $(DESTDIR)$(PYTHON_TESTSPATH)
cp extra.py $(DESTDIR)$(PYTHON_TESTSPATH)
cp extra.py[co] $(DESTDIR)$(PYTHON_TESTSPATH)

109
tests/extra.py Normal file
View File

@ -0,0 +1,109 @@
#!/usr/bin/python2 -O
# vim: fileencoding=utf-8
#
# The Qubes OS Project, https://www.qubes-os.org/
#
# Copyright (C) 2016
# Marek Marczykowski-Górecki <marmarek@invisiblethingslab.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
import pkg_resources
import qubes.tests
import qubes.qubes
class ExtraTestMixin(qubes.tests.SystemTestsMixin):
template = None
def setUp(self):
super(ExtraTestMixin, self).setUp()
self.qc.unlock_db()
def create_vms(self, names):
"""
Create AppVMs for the duration of the test. Will be automatically
removed after completing the test.
:param names: list of VM names to create (each of them will be
prefixed with some test specific string)
:return: list of created VM objects
"""
self.qc.lock_db_for_writing()
self.qc.load()
if self.template:
template = self.qc.get_vm_by_name(self.template)
else:
template = self.qc.get_default_template()
for vmname in names:
vm = self.qc.add_new_vm("QubesAppVm",
name=self.make_vm_name(vmname),
template=template)
vm.create_on_disk(verbose=False)
self.save_and_reload_db()
self.qc.unlock_db()
# get objects after reload
vms = []
for vmname in names:
vms.append(self.qc.get_vm_by_name(self.make_vm_name(vmname)))
return vms
def enable_network(self):
"""
Enable access to the network. Must be called before creating VMs.
"""
# nothing to do in core2
pass
def load_tests(loader, tests, pattern):
for entry in pkg_resources.iter_entry_points('qubes.tests.extra'):
for test_case in entry():
tests.addTests(loader.loadTestsFromTestCase(
type(
entry.name + '_' + test_case.__name__,
(test_case, ExtraTestMixin, qubes.tests.QubesTestCase),
{}
)
))
try:
qc = qubes.qubes.QubesVmCollection()
qc.lock_db_for_reading()
qc.load()
qc.unlock_db()
templates = [vm.name for vm in qc.values() if
isinstance(vm, qubes.qubes.QubesTemplateVm)]
except OSError:
templates = []
for entry in pkg_resources.iter_entry_points(
'qubes.tests.extra.for_template'):
for test_case in entry.load()():
for template in templates:
tests.addTests(loader.loadTestsFromTestCase(
type(
'{}_{}_{}'.format(
entry.name, test_case.__name__, template),
(test_case, ExtraTestMixin,
qubes.tests.QubesTestCase),
{'template': template}
)
))
return tests

75
tests/hardware.py Normal file
View File

@ -0,0 +1,75 @@
#!/usr/bin/python2
# -*- coding: utf-8 -*-
#
# The Qubes OS Project, http://www.qubes-os.org
#
# Copyright (C) 2016 Marek Marczykowski-Górecki
# <marmarek@invisiblethingslab.com>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
#
import os
import qubes.tests
import time
import subprocess
from unittest import expectedFailure
class TC_00_HVM(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
def setUp(self):
super(TC_00_HVM, self).setUp()
self.vm = self.qc.add_new_vm("QubesHVm",
name=self.make_vm_name('vm1'))
self.vm.create_on_disk(verbose=False)
@expectedFailure
def test_000_pci_passthrough_presence(self):
pcidev = os.environ.get('QUBES_TEST_PCIDEV', None)
if pcidev is None:
self.skipTest('Specify PCI device with QUBES_TEST_PCIDEV '
'environment variable')
self.vm.pcidevs = [pcidev]
self.vm.pci_strictreset = False
self.qc.save()
self.qc.unlock_db()
init_script = (
"#!/bin/sh\n"
"set -e\n"
"lspci -n > /dev/xvdb\n"
"poweroff\n"
)
self.prepare_hvm_system_linux(self.vm, init_script,
['/usr/sbin/lspci'])
self.vm.start()
timeout = 60
while timeout > 0:
if not self.vm.is_running():
break
time.sleep(1)
timeout -= 1
if self.vm.is_running():
self.fail("Timeout while waiting for VM shutdown")
with open(self.vm.storage.private_img, 'r') as f:
lspci_vm = f.read(512).strip('\0')
p = subprocess.Popen(['lspci', '-ns', pcidev], stdout=subprocess.PIPE)
(lspci_host, _) = p.communicate()
# strip BDF, as it is different in VM
pcidev_desc = ' '.join(lspci_host.strip().split(' ')[1:])
self.assertIn(pcidev_desc, lspci_vm)

View File

@ -33,6 +33,7 @@ import time
from qubes.qubes import QubesVmCollection, defaults, QubesException from qubes.qubes import QubesVmCollection, defaults, QubesException
import qubes.tests import qubes.tests
import re
TEST_DATA = "0123456789" * 1024 TEST_DATA = "0123456789" * 1024
@ -528,16 +529,123 @@ class TC_00_AppVMMixin(qubes.tests.SystemTestsMixin):
self.fail("Timeout, probably deadlock") self.fail("Timeout, probably deadlock")
self.assertEqual(result.value, 0, "Service call failed") self.assertEqual(result.value, 0, "Service call failed")
@unittest.skipUnless(spawn.find_executable('xdotool'), def test_080_qrexec_service_argument_allow_default(self):
"xdotool not installed") """Qrexec service call with argument"""
self.testvm1.start()
self.testvm2.start()
p = self.testvm2.run("cat > /etc/qubes-rpc/test.Argument", user="root",
passio_popen=True)
p.communicate("/bin/echo $1")
with open("/etc/qubes-rpc/policy/test.Argument", "w") as policy:
policy.write("%s %s allow" % (self.testvm1.name, self.testvm2.name))
self.addCleanup(os.unlink, "/etc/qubes-rpc/policy/test.Argument")
p = self.testvm1.run("/usr/lib/qubes/qrexec-client-vm {} "
"test.Argument+argument".format(self.testvm2.name),
passio_popen=True)
(stdout, stderr) = p.communicate()
self.assertEqual(stdout, "argument\n")
def test_081_qrexec_service_argument_allow_specific(self):
"""Qrexec service call with argument - allow only specific value"""
self.testvm1.start()
self.testvm2.start()
p = self.testvm2.run("cat > /etc/qubes-rpc/test.Argument", user="root",
passio_popen=True)
p.communicate("/bin/echo $1")
with open("/etc/qubes-rpc/policy/test.Argument", "w") as policy:
policy.write("$anyvm $anyvm deny")
self.addCleanup(os.unlink, "/etc/qubes-rpc/policy/test.Argument")
with open("/etc/qubes-rpc/policy/test.Argument+argument", "w") as \
policy:
policy.write("%s %s allow" % (self.testvm1.name, self.testvm2.name))
self.addCleanup(os.unlink,
"/etc/qubes-rpc/policy/test.Argument+argument")
p = self.testvm1.run("/usr/lib/qubes/qrexec-client-vm {} "
"test.Argument+argument".format(self.testvm2.name),
passio_popen=True)
(stdout, stderr) = p.communicate()
self.assertEqual(stdout, "argument\n")
def test_082_qrexec_service_argument_deny_specific(self):
"""Qrexec service call with argument - deny specific value"""
self.testvm1.start()
self.testvm2.start()
p = self.testvm2.run("cat > /etc/qubes-rpc/test.Argument", user="root",
passio_popen=True)
p.communicate("/bin/echo $1")
with open("/etc/qubes-rpc/policy/test.Argument", "w") as policy:
policy.write("$anyvm $anyvm allow")
self.addCleanup(os.unlink, "/etc/qubes-rpc/policy/test.Argument")
with open("/etc/qubes-rpc/policy/test.Argument+argument", "w") as \
policy:
policy.write("%s %s deny" % (self.testvm1.name, self.testvm2.name))
self.addCleanup(os.unlink,
"/etc/qubes-rpc/policy/test.Argument+argument")
p = self.testvm1.run("/usr/lib/qubes/qrexec-client-vm {} "
"test.Argument+argument".format(self.testvm2.name),
passio_popen=True)
(stdout, stderr) = p.communicate()
self.assertEqual(stdout, "")
self.assertEqual(p.returncode, 1, "Service request should be denied")
def test_083_qrexec_service_argument_specific_implementation(self):
"""Qrexec service call with argument - argument specific
implementatation"""
self.testvm1.start()
self.testvm2.start()
p = self.testvm2.run("cat > /etc/qubes-rpc/test.Argument", user="root",
passio_popen=True)
p.communicate("/bin/echo $1")
p = self.testvm2.run("cat > /etc/qubes-rpc/test.Argument+argument",
user="root", passio_popen=True)
p.communicate("/bin/echo specific: $1")
with open("/etc/qubes-rpc/policy/test.Argument", "w") as policy:
policy.write("%s %s allow" % (self.testvm1.name, self.testvm2.name))
self.addCleanup(os.unlink, "/etc/qubes-rpc/policy/test.Argument")
p = self.testvm1.run("/usr/lib/qubes/qrexec-client-vm {} "
"test.Argument+argument".format(self.testvm2.name),
passio_popen=True)
(stdout, stderr) = p.communicate()
self.assertEqual(stdout, "specific: argument\n")
def test_084_qrexec_service_argument_extra_env(self):
"""Qrexec service call with argument - extra env variables"""
self.testvm1.start()
self.testvm2.start()
p = self.testvm2.run("cat > /etc/qubes-rpc/test.Argument", user="root",
passio_popen=True)
p.communicate("/bin/echo $QREXEC_SERVICE_FULL_NAME "
"$QREXEC_SERVICE_ARGUMENT")
with open("/etc/qubes-rpc/policy/test.Argument", "w") as policy:
policy.write("%s %s allow" % (self.testvm1.name, self.testvm2.name))
self.addCleanup(os.unlink, "/etc/qubes-rpc/policy/test.Argument")
p = self.testvm1.run("/usr/lib/qubes/qrexec-client-vm {} "
"test.Argument+argument".format(self.testvm2.name),
passio_popen=True)
(stdout, stderr) = p.communicate()
self.assertEqual(stdout, "test.Argument+argument argument\n")
def test_100_qrexec_filecopy(self): def test_100_qrexec_filecopy(self):
self.testvm1.start() self.testvm1.start()
self.testvm2.start() self.testvm2.start()
self.qrexec_policy('qubes.Filecopy', self.testvm1.name,
self.testvm2.name)
p = self.testvm1.run("qvm-copy-to-vm %s /etc/passwd" % p = self.testvm1.run("qvm-copy-to-vm %s /etc/passwd" %
self.testvm2.name, passio_popen=True, self.testvm2.name, passio_popen=True,
passio_stderr=True) passio_stderr=True)
# Confirm transfer
self.enter_keys_in_window('Question', ['y'])
p.wait() p.wait()
self.assertEqual(p.returncode, 0, "qvm-copy-to-vm failed: %s" % self.assertEqual(p.returncode, 0, "qvm-copy-to-vm failed: %s" %
p.stderr.read()) p.stderr.read())
@ -547,15 +655,34 @@ class TC_00_AppVMMixin(qubes.tests.SystemTestsMixin):
wait=True) wait=True)
self.assertEqual(retcode, 0, "file differs") self.assertEqual(retcode, 0, "file differs")
@unittest.skipUnless(spawn.find_executable('xdotool'), def test_105_qrexec_filemove(self):
"xdotool not installed") self.testvm1.start()
self.testvm2.start()
self.qrexec_policy('qubes.Filecopy', self.testvm1.name,
self.testvm2.name)
retcode = self.testvm1.run("cp /etc/passwd passwd", wait=True)
assert retcode == 0, "Failed to prepare source file"
p = self.testvm1.run("qvm-move-to-vm %s passwd" %
self.testvm2.name, passio_popen=True,
passio_stderr=True)
p.wait()
self.assertEqual(p.returncode, 0, "qvm-move-to-vm failed: %s" %
p.stderr.read())
retcode = self.testvm2.run("diff /etc/passwd "
"/home/user/QubesIncoming/{}/passwd".format(
self.testvm1.name),
wait=True)
self.assertEqual(retcode, 0, "file differs")
retcode = self.testvm1.run("test -f passwd", wait=True)
self.assertEqual(retcode, 1, "source file not removed")
def test_101_qrexec_filecopy_with_autostart(self): def test_101_qrexec_filecopy_with_autostart(self):
self.testvm1.start() self.testvm1.start()
self.qrexec_policy('qubes.Filecopy', self.testvm1.name,
self.testvm2.name)
p = self.testvm1.run("qvm-copy-to-vm %s /etc/passwd" % p = self.testvm1.run("qvm-copy-to-vm %s /etc/passwd" %
self.testvm2.name, passio_popen=True, self.testvm2.name, passio_popen=True,
passio_stderr=True) passio_stderr=True)
# Confirm transfer
self.enter_keys_in_window('Question', ['y'])
p.wait() p.wait()
self.assertEqual(p.returncode, 0, "qvm-copy-to-vm failed: %s" % self.assertEqual(p.returncode, 0, "qvm-copy-to-vm failed: %s" %
p.stderr.read()) p.stderr.read())
@ -570,15 +697,13 @@ class TC_00_AppVMMixin(qubes.tests.SystemTestsMixin):
wait=True) wait=True)
self.assertEqual(retcode, 0, "file differs") self.assertEqual(retcode, 0, "file differs")
@unittest.skipUnless(spawn.find_executable('xdotool'),
"xdotool not installed")
def test_110_qrexec_filecopy_deny(self): def test_110_qrexec_filecopy_deny(self):
self.testvm1.start() self.testvm1.start()
self.testvm2.start() self.testvm2.start()
self.qrexec_policy('qubes.Filecopy', self.testvm1.name,
self.testvm2.name, allow=False)
p = self.testvm1.run("qvm-copy-to-vm %s /etc/passwd" % p = self.testvm1.run("qvm-copy-to-vm %s /etc/passwd" %
self.testvm2.name, passio_popen=True) self.testvm2.name, passio_popen=True)
# Deny transfer
self.enter_keys_in_window('Question', ['n'])
p.wait() p.wait()
self.assertNotEqual(p.returncode, 0, "qvm-copy-to-vm unexpectedly " self.assertNotEqual(p.returncode, 0, "qvm-copy-to-vm unexpectedly "
"succeeded") "succeeded")
@ -590,15 +715,13 @@ class TC_00_AppVMMixin(qubes.tests.SystemTestsMixin):
@unittest.skip("Xen gntalloc driver crashes when page is mapped in the " @unittest.skip("Xen gntalloc driver crashes when page is mapped in the "
"same domain") "same domain")
@unittest.skipUnless(spawn.find_executable('xdotool'),
"xdotool not installed")
def test_120_qrexec_filecopy_self(self): def test_120_qrexec_filecopy_self(self):
self.testvm1.start() self.testvm1.start()
self.qrexec_policy('qubes.Filecopy', self.testvm1.name,
self.testvm1.name)
p = self.testvm1.run("qvm-copy-to-vm %s /etc/passwd" % p = self.testvm1.run("qvm-copy-to-vm %s /etc/passwd" %
self.testvm1.name, passio_popen=True, self.testvm1.name, passio_popen=True,
passio_stderr=True) passio_stderr=True)
# Confirm transfer
self.enter_keys_in_window('Question', ['y'])
p.wait() p.wait()
self.assertEqual(p.returncode, 0, "qvm-copy-to-vm failed: %s" % self.assertEqual(p.returncode, 0, "qvm-copy-to-vm failed: %s" %
p.stderr.read()) p.stderr.read())
@ -613,6 +736,8 @@ class TC_00_AppVMMixin(qubes.tests.SystemTestsMixin):
def test_130_qrexec_filemove_disk_full(self): def test_130_qrexec_filemove_disk_full(self):
self.testvm1.start() self.testvm1.start()
self.testvm2.start() self.testvm2.start()
self.qrexec_policy('qubes.Filecopy', self.testvm1.name,
self.testvm2.name)
# Prepare test file # Prepare test file
prepare_cmd = ("yes teststring | dd of=testfile bs=1M " prepare_cmd = ("yes teststring | dd of=testfile bs=1M "
"count=50 iflag=fullblock") "count=50 iflag=fullblock")
@ -633,8 +758,6 @@ class TC_00_AppVMMixin(qubes.tests.SystemTestsMixin):
p = self.testvm1.run("qvm-move-to-vm %s testfile" % p = self.testvm1.run("qvm-move-to-vm %s testfile" %
self.testvm2.name, passio_popen=True, self.testvm2.name, passio_popen=True,
passio_stderr=True) passio_stderr=True)
# Confirm transfer
self.enter_keys_in_window('Question', ['y'])
# Close GUI error message # Close GUI error message
self.enter_keys_in_window('Error', ['Return']) self.enter_keys_in_window('Error', ['Return'])
p.wait() p.wait()
@ -689,6 +812,10 @@ class TC_00_AppVMMixin(qubes.tests.SystemTestsMixin):
self.assertEquals(retcode, 0, self.assertEquals(retcode, 0,
"qvm-sync-clock failed with code {}". "qvm-sync-clock failed with code {}".
format(retcode)) format(retcode))
# qvm-sync-clock is asynchronous - it spawns qubes.SetDateTime
# service, send it timestamp value and exists without waiting for
# actual time set
time.sleep(1)
(vm_time, _) = self.testvm1.run("date -u +%s", (vm_time, _) = self.testvm1.run("date -u +%s",
passio_popen=True).communicate() passio_popen=True).communicate()
self.assertAlmostEquals(int(vm_time), int(start_time), delta=30) self.assertAlmostEquals(int(vm_time), int(start_time), delta=30)
@ -733,7 +860,6 @@ class TC_00_AppVMMixin(qubes.tests.SystemTestsMixin):
# some safety margin for FS metadata # some safety margin for FS metadata
self.assertGreater(int(new_size.strip()), 5.8*1024**2) self.assertGreater(int(new_size.strip()), 5.8*1024**2)
class TC_05_StandaloneVM(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase): class TC_05_StandaloneVM(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
def test_000_create_start(self): def test_000_create_start(self):
testvm1 = self.qc.add_new_vm("QubesAppVm", testvm1 = self.qc.add_new_vm("QubesAppVm",
@ -1204,6 +1330,298 @@ class TC_40_PVGrub(qubes.tests.SystemTestsMixin):
(actual_kver, _) = p.communicate() (actual_kver, _) = p.communicate()
self.assertEquals(actual_kver.strip(), kver) self.assertEquals(actual_kver.strip(), kver)
@unittest.skipUnless(
spawn.find_executable('xprop') and
spawn.find_executable('xdotool') and
spawn.find_executable('wmctrl'),
"xprop or xdotool or wmctrl not installed")
class TC_50_MimeHandlers(qubes.tests.SystemTestsMixin):
@classmethod
def setUpClass(cls):
if cls.template == 'whonix-gw' or 'minimal' in cls.template:
raise unittest.SkipTest(
'Template {} not supported by this test'.format(cls.template))
if cls.template == 'whonix-ws':
# TODO remove when Whonix-based DispVMs will work (Whonix 13?)
raise unittest.SkipTest(
'Template {} not supported by this test'.format(cls.template))
qc = QubesVmCollection()
cls._kill_test_vms(qc, prefix=qubes.tests.CLSVMPREFIX)
qc.lock_db_for_writing()
qc.load()
cls._remove_test_vms(qc, qubes.qubes.vmm.libvirt_conn,
prefix=qubes.tests.CLSVMPREFIX)
cls.source_vmname = cls.make_vm_name('source', True)
source_vm = qc.add_new_vm("QubesAppVm",
template=qc.get_vm_by_name(cls.template),
name=cls.source_vmname)
source_vm.create_on_disk(verbose=False)
cls.target_vmname = cls.make_vm_name('target', True)
target_vm = qc.add_new_vm("QubesAppVm",
template=qc.get_vm_by_name(cls.template),
name=cls.target_vmname)
target_vm.create_on_disk(verbose=False)
qc.save()
qc.unlock_db()
source_vm.start()
target_vm.start()
# make sure that DispVMs will be started of the same template
retcode = subprocess.call(['/usr/bin/qvm-create-default-dvm',
cls.template],
stderr=open(os.devnull, 'w'))
assert retcode == 0, "Error preparing DispVM"
def setUp(self):
super(TC_50_MimeHandlers, self).setUp()
self.source_vm = self.qc.get_vm_by_name(self.source_vmname)
self.target_vm = self.qc.get_vm_by_name(self.target_vmname)
def get_window_class(self, winid, dispvm=False):
(vm_winid, _) = subprocess.Popen(
['xprop', '-id', winid, '_QUBES_VMWINDOWID'],
stdout=subprocess.PIPE
).communicate()
vm_winid = vm_winid.split("#")[1].strip('\n" ')
if dispvm:
(vmname, _) = subprocess.Popen(
['xprop', '-id', winid, '_QUBES_VMNAME'],
stdout=subprocess.PIPE
).communicate()
vmname = vmname.split("=")[1].strip('\n" ')
window_class = None
while window_class is None:
# XXX to use self.qc.get_vm_by_name would require reloading
# qubes.xml, so use qvm-run instead
xprop = subprocess.Popen(
['qvm-run', '-p', vmname, 'xprop -id {} WM_CLASS'.format(
vm_winid)], stdout=subprocess.PIPE)
(window_class, _) = xprop.communicate()
if xprop.returncode != 0:
self.skipTest("xprop failed, not installed?")
if 'not found' in window_class:
# WM_CLASS not set yet, wait a little
time.sleep(0.1)
window_class = None
else:
window_class = None
while window_class is None:
xprop = self.target_vm.run(
'xprop -id {} WM_CLASS'.format(vm_winid),
passio_popen=True)
(window_class, _) = xprop.communicate()
if xprop.returncode != 0:
self.skipTest("xprop failed, not installed?")
if 'not found' in window_class:
# WM_CLASS not set yet, wait a little
time.sleep(0.1)
window_class = None
# output: WM_CLASS(STRING) = "gnome-terminal-server", "Gnome-terminal"
try:
window_class = window_class.split("=")[1].split(",")[0].strip('\n" ')
except IndexError:
raise Exception(
"Unexpected output from xprop: '{}'".format(window_class))
return window_class
def open_file_and_check_viewer(self, filename, expected_app_titles,
expected_app_classes, dispvm=False):
self.qc.unlock_db()
if dispvm:
p = self.source_vm.run("qvm-open-in-dvm {}".format(filename),
passio_popen=True)
vmpattern = "disp*"
else:
self.qrexec_policy('qubes.Filecopy', self.source_vm.name,
self.target_vmname)
p = self.source_vm.run("qvm-open-in-vm {} {}".format(
self.target_vmname, filename), passio_popen=True)
vmpattern = self.target_vmname
wait_count = 0
winid = None
window_title = None
while True:
search = subprocess.Popen(['xdotool', 'search',
'--onlyvisible', '--class', vmpattern],
stdout=subprocess.PIPE,
stderr=open(os.path.devnull, 'w'))
retcode = search.wait()
if retcode == 0:
winid = search.stdout.read().strip()
# get window title
(window_title, _) = subprocess.Popen(
['xdotool', 'getwindowname', winid], stdout=subprocess.PIPE). \
communicate()
window_title = window_title.strip()
# ignore LibreOffice splash screen and window with no title
# set yet
if window_title and not window_title.startswith("LibreOffice")\
and not window_title == 'VMapp command':
break
wait_count += 1
if wait_count > 100:
self.fail("Timeout while waiting for editor window")
time.sleep(0.3)
# get window class
window_class = self.get_window_class(winid, dispvm)
# close the window - we've got the window class, it is no longer needed
subprocess.check_call(['wmctrl', '-i', '-c', winid])
p.wait()
self.wait_for_window(window_title, show=False)
def check_matches(obj, patterns):
return any((pat.search(obj) if isinstance(pat, type(re.compile('')))
else pat in obj) for pat in patterns)
if not check_matches(window_title, expected_app_titles) and \
not check_matches(window_class, expected_app_classes):
self.fail("Opening file {} resulted in window '{} ({})', which is "
"none of {!r} ({!r})".format(
filename, window_title, window_class,
expected_app_titles, expected_app_classes))
def prepare_txt(self, filename):
p = self.source_vm.run("cat > {}".format(filename), passio_popen=True)
p.stdin.write("This is test\n")
p.stdin.close()
retcode = p.wait()
assert retcode == 0, "Failed to write {} file".format(filename)
def prepare_pdf(self, filename):
self.prepare_txt("/tmp/source.txt")
cmd = "convert /tmp/source.txt {}".format(filename)
retcode = self.source_vm.run(cmd, wait=True)
assert retcode == 0, "Failed to run '{}'".format(cmd)
def prepare_doc(self, filename):
self.prepare_txt("/tmp/source.txt")
cmd = "unoconv -f doc -o {} /tmp/source.txt".format(filename)
retcode = self.source_vm.run(cmd, wait=True)
if retcode != 0:
self.skipTest("Failed to run '{}', not installed?".format(cmd))
def prepare_pptx(self, filename):
self.prepare_txt("/tmp/source.txt")
cmd = "unoconv -f pptx -o {} /tmp/source.txt".format(filename)
retcode = self.source_vm.run(cmd, wait=True)
if retcode != 0:
self.skipTest("Failed to run '{}', not installed?".format(cmd))
def prepare_png(self, filename):
self.prepare_txt("/tmp/source.txt")
cmd = "convert /tmp/source.txt {}".format(filename)
retcode = self.source_vm.run(cmd, wait=True)
if retcode != 0:
self.skipTest("Failed to run '{}', not installed?".format(cmd))
def prepare_jpg(self, filename):
self.prepare_txt("/tmp/source.txt")
cmd = "convert /tmp/source.txt {}".format(filename)
retcode = self.source_vm.run(cmd, wait=True)
if retcode != 0:
self.skipTest("Failed to run '{}', not installed?".format(cmd))
def test_000_txt(self):
filename = "/home/user/test_file.txt"
self.prepare_txt(filename)
self.open_file_and_check_viewer(filename, ["vim"],
["gedit", "emacs"])
def test_001_pdf(self):
filename = "/home/user/test_file.pdf"
self.prepare_pdf(filename)
self.open_file_and_check_viewer(filename, [],
["evince"])
def test_002_doc(self):
filename = "/home/user/test_file.doc"
self.prepare_doc(filename)
self.open_file_and_check_viewer(filename, [],
["libreoffice", "abiword"])
def test_003_pptx(self):
filename = "/home/user/test_file.pptx"
self.prepare_pptx(filename)
self.open_file_and_check_viewer(filename, [],
["libreoffice"])
def test_004_png(self):
filename = "/home/user/test_file.png"
self.prepare_png(filename)
self.open_file_and_check_viewer(filename, [],
["shotwell", "eog", "display"])
def test_005_jpg(self):
filename = "/home/user/test_file.jpg"
self.prepare_jpg(filename)
self.open_file_and_check_viewer(filename, [],
["shotwell", "eog", "display"])
def test_006_jpeg(self):
filename = "/home/user/test_file.jpeg"
self.prepare_jpg(filename)
self.open_file_and_check_viewer(filename, [],
["shotwell", "eog", "display"])
def test_100_txt_dispvm(self):
filename = "/home/user/test_file.txt"
self.prepare_txt(filename)
self.open_file_and_check_viewer(filename, ["vim"],
["gedit", "emacs"],
dispvm=True)
def test_101_pdf_dispvm(self):
filename = "/home/user/test_file.pdf"
self.prepare_pdf(filename)
self.open_file_and_check_viewer(filename, [],
["evince"],
dispvm=True)
def test_102_doc_dispvm(self):
filename = "/home/user/test_file.doc"
self.prepare_doc(filename)
self.open_file_and_check_viewer(filename, [],
["libreoffice", "abiword"],
dispvm=True)
def test_103_pptx_dispvm(self):
filename = "/home/user/test_file.pptx"
self.prepare_pptx(filename)
self.open_file_and_check_viewer(filename, [],
["libreoffice"],
dispvm=True)
def test_104_png_dispvm(self):
filename = "/home/user/test_file.png"
self.prepare_png(filename)
self.open_file_and_check_viewer(filename, [],
["shotwell", "eog", "display"],
dispvm=True)
def test_105_jpg_dispvm(self):
filename = "/home/user/test_file.jpg"
self.prepare_jpg(filename)
self.open_file_and_check_viewer(filename, [],
["shotwell", "eog", "display"],
dispvm=True)
def test_106_jpeg_dispvm(self):
filename = "/home/user/test_file.jpeg"
self.prepare_jpg(filename)
self.open_file_and_check_viewer(filename, [],
["shotwell", "eog", "display"],
dispvm=True)
def load_tests(loader, tests, pattern): def load_tests(loader, tests, pattern):
try: try:
@ -1233,4 +1651,10 @@ def load_tests(loader, tests, pattern):
(TC_40_PVGrub, qubes.tests.QubesTestCase), (TC_40_PVGrub, qubes.tests.QubesTestCase),
{'template': template}))) {'template': template})))
tests.addTests(loader.loadTestsFromTestCase(
type(
'TC_50_MimeHandlers_' + template,
(TC_50_MimeHandlers, qubes.tests.QubesTestCase),
{'template': template})))
return tests return tests

View File

@ -1 +1 @@
3.1.11 3.2.0