Merge remote-tracking branch 'origin/master' into core3-devel

This commit is contained in:
Marek Marczykowski-Górecki 2016-08-08 00:11:46 +02:00
commit 4e022382a5
No known key found for this signature in database
GPG Key ID: 063938BA42CFA724
35 changed files with 1770 additions and 1135 deletions

View File

@ -202,7 +202,8 @@ class QubesVm(object):
'kernelopts', 'services', 'installed_by_rpm',\
'uses_default_netvm', 'include_in_backups', 'debug',\
'qrexec_timeout', 'autostart', 'uses_default_dispvm_netvm',
'backup_content', 'backup_size', 'backup_path', 'pool_name' ]:
'backup_content', 'backup_size', 'backup_path', 'pool_name',\
'pci_e820_host']:
attrs[prop]['save'] = lambda prop=prop: str(getattr(self, prop))
# Simple paths
for prop in ['conf_file', 'firewall_conf']:
@ -966,7 +967,7 @@ class QubesVm(object):
uuid = self.uuid
start_time = vmm.xs.read('', "/vm/%s/start_time" % str(uuid))
if start_time != '':
if start_time:
return datetime.datetime.fromtimestamp(float(start_time))
else:
return None
@ -1414,7 +1415,8 @@ class QubesVm(object):
raise
if os.path.exists("/etc/systemd/system/multi-user.target.wants/qubes-vm@" + self.name + ".service"):
subprocess.call(["sudo", "systemctl", "-q", "disable","qubes-vm@" + self.name + ".service"])
retcode = subprocess.call(["sudo", "systemctl", "-q", "disable",
"qubes-vm@" + self.name + ".service"])
if retcode != 0:
raise QubesException("Failed to delete autostart entry for VM")
@ -1690,13 +1692,14 @@ class QubesVm(object):
localcmd=localcmd, user=user, wait=wait, gui=gui)
elif input:
p = self.run("QUBESRPC %s %s" % (service, source),
user=user, wait=wait, gui=gui, passio_popen=True)
user=user, wait=wait, gui=gui, passio_popen=True,
passio_stderr=True)
p.communicate(input)
return p.returncode
else:
return self.run("QUBESRPC %s %s" % (service, source),
passio_popen=passio_popen, user=user, wait=wait,
gui=gui)
gui=gui, passio_stderr=passio_popen)
def attach_network(self, verbose = False, wait = True, netvm = None):
self.log.debug('attach_network(netvm={!r})'.format(netvm))
@ -2054,15 +2057,6 @@ class QubesVm(object):
if not self.is_running():
raise QubesException ("VM already stopped!")
# try to gracefully detach PCI devices before shutdown, to mitigate
# timeouts on forcible detach at domain destroy; if that fails, too bad
try:
for pcidev in self.pcidevs:
self.libvirt_domain.detachDevice(self._format_pci_dev(pcidev))
except libvirt.libvirtError as e:
print >>sys.stderr, "WARNING: {}, continuing VM shutdown " \
"anyway".format(str(e))
self.libvirt_domain.shutdown()
def force_shutdown(self, xid = None):

View File

@ -203,7 +203,30 @@ class QubesHVm(QubesResizableVm):
else:
raise QubesException("Needs qrexec agent installed in VM to use this function. See also qvm-prefs.")
@property
def stubdom_xid(self):
if self.xid < 0:
return -1
if vmm.xs is None:
return -1
stubdom_xid_str = vmm.xs.read('', '/local/domain/%d/image/device-model-domid' % self.xid)
if stubdom_xid_str is not None:
return int(stubdom_xid_str)
else:
return -1
def validate_drive_path(self, drive):
drive_type, drive_domain, drive_path = drive.split(':', 2)
if drive_domain == 'dom0':
if not os.path.exists(drive_path):
raise QubesException("Invalid drive path '{}'".format(
drive_path))
def start(self, *args, **kwargs):
if self.drive:
self.validate_drive_path(self.drive)
# make it available to storage.prepare_for_vm_startup, which is
# called before actually building VM libvirt configuration
self.storage.drive = self.drive

View File

@ -49,6 +49,9 @@ import xen.lowlevel.xs
AVAILABLE_FRONTENDS = ['xvd'+c for c in
string.lowercase[8:]+string.lowercase[:8]]
class USBProxyNotInstalled(QubesException):
pass
def mbytes_to_kmg(size):
if size > 1024:
return "%d GiB" % (size/1024)
@ -580,9 +583,14 @@ def usb_attach(qvmc, vm, device, auto_detach=False, wait=True):
p = vm.run_service('qubes.USBAttach', passio_popen=True, user='root')
(stdout, stderr) = p.communicate(
'{} {}\n'.format(device['vm'].name, device['device']))
if p.returncode != 0:
if p.returncode == 127:
raise USBProxyNotInstalled(
"qubes-usb-proxy not installed in the VM")
elif p.returncode != 0:
# TODO: sanitize and include stdout
raise QubesException('Device attach failed')
sanitized_stderr = ''.join([c for c in stderr if ord(c) >= 0x20])
raise QubesException('Device attach failed: {}'.format(
sanitized_stderr))
finally:
# FIXME: there is a race condition here - some other process might
# modify the file in the meantime. This may result in unexpected
@ -658,6 +666,7 @@ class QubesWatch(object):
# which can just remove the domain
if e.get_error_code() == libvirt.VIR_ERR_NO_DOMAIN:
pass
else:
raise
# and for dom0
self._register_watches(None)
@ -689,6 +698,10 @@ class QubesWatch(object):
return '/local/domain/%s/memory/meminfo' % xid
def _register_watches(self, libvirt_domain):
if libvirt_domain and libvirt_domain.ID() == 0:
# don't use libvirt object for dom0, to always have the same
# hardcoded "dom0" name
libvirt_domain = None
if libvirt_domain:
name = libvirt_domain.name()
if name in self._qdb:
@ -709,6 +722,8 @@ class QubesWatch(object):
return
else:
name = "dom0"
if name in self._qdb:
return
self._qdb[name] = QubesDB(name)
try:
self._qdb[name].watch('/qubes-block-devices')
@ -729,6 +744,9 @@ class QubesWatch(object):
self._register_watches(libvirt_domain)
def _unregister_watches(self, libvirt_domain):
if libvirt_domain and libvirt_domain.ID() == 0:
name = "dom0"
else:
name = libvirt_domain.name()
if name in self._qdb_events:
libvirt.virEventRemoveHandle(self._qdb_events[name])

View File

@ -150,7 +150,7 @@ class QfileDaemonDvm:
return self.do_get_dvm()
@staticmethod
def remove_disposable_from_qdb(name):
def finish_disposable(name):
qvm_collection = QubesVmCollection()
qvm_collection.lock_db_for_writing()
qvm_collection.load()
@ -158,6 +158,12 @@ class QfileDaemonDvm:
if vm is None:
qvm_collection.unlock_db()
return False
try:
vm.force_shutdown()
except QubesException:
# VM already destroyed
pass
qvm_collection.pop(vm.qid)
qvm_collection.save()
qvm_collection.unlock_db()
@ -165,6 +171,10 @@ class QfileDaemonDvm:
def main():
exec_index = sys.argv[1]
if exec_index == "FINISH":
QfileDaemonDvm.finish_disposable(sys.argv[2])
return
src_vmname = sys.argv[2]
user = sys.argv[3]
# accessed directly by get_dvm()
@ -177,15 +187,14 @@ def main():
qfile = QfileDaemonDvm(src_vmname)
dispvm = qfile.get_dvm()
if dispvm is not None:
if exec_index == "LAUNCH":
print dispvm.name
return
print >>sys.stderr, "time=%s, starting VM process" % (str(time.time()))
subprocess.call(['/usr/lib/qubes/qrexec-client', '-d', dispvm.name,
user+':exec /usr/lib/qubes/qubes-rpc-multiplexer ' +
exec_index + " " + src_vmname])
try:
dispvm.force_shutdown()
except QubesException:
# VM already destroyed
pass
qfile.remove_disposable_from_qdb(dispvm.name)
QfileDaemonDvm.finish_disposable(dispvm.name)
main()

View File

@ -74,8 +74,13 @@ fstype=`df --output=fstype $VMDIR | tail -n 1`
if [ "$fstype" = "tmpfs" ]; then
# bsdtar doesn't work on tmpfs because FS_IOC_FIEMAP ioctl isn't supported
# there
tar -cSf saved-cows.tar volatile.img
tar -cSf saved-cows.tar volatile.img || exit 1
else
bsdtar -cSf saved-cows.tar volatile.img
errors=`bsdtar -cSf saved-cows.tar volatile.img 2>&1`
if [ -n "$errors" ]; then
echo "Failed to create saved-cows.tar: $errors" >&2
rm -f saved-cows.tar
exit 1
fi
fi
echo "DVM savefile created successfully."

View File

@ -34,6 +34,11 @@ Options
:option:`--format`. All columns along with short descriptions can be listed
with :option:`--help-columns`.
.. option:: --raw-data
Output data in easy to parse format. Table header is skipped and columns are
separated by `|` character.
.. option:: --verbose, -v
Increase verbosity.

View File

@ -62,14 +62,26 @@ Options
.. option:: --colour-output=COLOUR, --color-output=COLOR
Mark the qube output with given ANSI colour (ie. "31" for red). The exact
apping of numbers to colours and styles depends of the particular terminal
mapping of numbers to colours and styles depends of the particular terminal
emulator.
Colouring can be disabled with :option:`--no-colour-output`.
.. option:: --colour-stderr=COLOUR, --color-stderr=COLOR
Mark the qube stderr with given ANSI colour (ie. "31" for red). The exact
mapping of numbers to colours and styles depends of the particular terminal
emulator.
Colouring can be disabled with :option:`--no-colour-stderr`.
.. option:: --no-colour-output, --no-color-output
Disable colouring the stdio.
Disable colouring the stdout.
.. option:: --no-colour-stderr, --no-color-stderr
Disable colouring the stderr.
.. option:: --filter-escape-chars

View File

@ -1,9 +1,5 @@
#!/bin/sh
if [ "`id -u`" != "0" ]; then
exec sudo $0 $*
fi
set -e
if ! echo $PATH | grep -q sbin; then
@ -24,5 +20,6 @@ if [ -e "$FILENAME" ]; then
exit 1
fi
umask 002
TOTAL_SIZE=$[ $ROOT_SIZE + $SWAP_SIZE + 512 ]
truncate -s ${TOTAL_SIZE}M "$FILENAME"

View File

@ -2271,5 +2271,7 @@ class BackupRestore(object):
self.log.error("*** Error while setting home directory owner")
shutil.rmtree(self.tmpdir)
self.log.info("-> Done. Please install updates for all the restored "
"templates.")
# vim:sw=4:et:

View File

@ -455,10 +455,15 @@ class SystemTestsMixin(object):
if not self.app.default_template:
self.skipTest('Default template required for testing networking')
default_netvm = self.host_app.default_netvm
# if testing Whonix Workstation based VMs, try to use sys-whonix instead
if self.app.default_template.name.startswith('whonix-ws'):
if 'sys-whonix' in self.host_app.domains:
default_netvm = self.host_app.domains['sys-whonix']
if default_netvm is None:
self.skipTest('Default netvm required')
if not default_netvm.is_running():
self.skipTest('Default netvm required to be running')
self.skipTest('VM {} required to be running'.format(
default_netvm.name))
# Add NetVM stub to qubes-test.xml matching the one on host.
# Keeping 'qid' the same is critical because IP addresses are
# calculated from it.
@ -670,7 +675,7 @@ class SystemTestsMixin(object):
# accessing window properties
self.wait_for_window(title)
command = ['xdotool', 'search', '--name', title,
'windowactivate',
'windowactivate', '--sync',
'key'] + keys
subprocess.check_call(command)
@ -819,6 +824,7 @@ class BackupTestsMixin(SystemTestsMixin):
testnet = self.app.add_new_vm(qubes.vm.appvm.AppVM,
name=vmname, template=template, provides_network=True, label='red')
testnet.create_on_disk()
testnet.features['services/ntpd'] = True
vms.append(testnet)
self.fill_image(testnet.volumes['private'].path, 20*1024*1024)

View File

@ -41,6 +41,32 @@ class TC_00_Backup(qubes.tests.BackupTestsMixin, qubes.tests.QubesTestCase):
self.restore_backup()
for vm in vms:
self.assertIn(vm.name, self.app.domains)
restored_vm = self.app.domains[vm.name]
for prop in ('name', 'kernel', 'uses_default_kernel',
'uses_default_netvm', 'memory', 'maxmem', 'kernelopts',
'uses_default_kernelopts', 'services', 'vcpus', 'pcidevs',
'include_in_backups', 'default_user', 'qrexec_timeout',
'autostart', 'pci_strictreset', 'pci_e820_host', 'debug',
'internal'):
if not hasattr(vm, prop):
continue
self.assertEquals(
getattr(vm, prop), getattr(restored_vm, prop),
"VM {} - property {} not properly restored".format(
vm.name, prop))
for prop in ('netvm', 'template', 'label'):
orig_value = getattr(vm, prop)
restored_value = getattr(restored_vm, prop)
if orig_value and restored_value:
self.assertEquals(orig_value.name, restored_value.name,
"VM {} - property {} not properly restored".format(
vm.name, prop))
else:
self.assertEquals(orig_value, restored_value,
"VM {} - property {} not properly restored".format(
vm.name, prop))
self.remove_vms(vms)
def test_001_compressed_backup(self):
vms = self.create_backup_vms()

View File

@ -170,14 +170,20 @@ class TC_00_BackupCompatibility(qubes.tests.BackupTestsMixin, qubes.tests.QubesT
def create_volatile_img(self, filename):
self.create_sparse(filename, 11.5*2**30)
sfdisk_input="0,1024,S\n,10240,L\n"
p = subprocess.Popen(["/usr/sbin/sfdisk", "--no-reread", "-u",
"M",
filename], stdout=open("/dev/null","w"),
stderr=subprocess.STDOUT, stdin=subprocess.PIPE)
p.communicate(input=sfdisk_input)
self.assertEqual(p.returncode, 0, "sfdisk failed with code %d" % p
.returncode)
# here used to be sfdisk call with "0,1024,S\n,10240,L\n" input,
# but since sfdisk folks like to change command arguments in
# incompatible way, have an partition table verbatim here
ptable = (
'\x00\x00\x00\x00\x00\x00\x00\x00\xab\x39\xd5\xd4\x00\x00\x20\x00'
'\x00\x21\xaa\x82\x82\x28\x08\x00\x00\x00\x00\x00\x00\x20\xaa\x00'
'\x82\x29\x15\x83\x9c\x79\x08\x00\x00\x20\x00\x00\x01\x40\x00\x00'
'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xaa\x55'
)
with open(filename, 'r+') as f:
f.seek(0x1b0)
f.write(ptable)
# TODO: mkswap
def fullpath(self, name):
@ -430,6 +436,7 @@ class TC_00_BackupCompatibility(qubes.tests.BackupTestsMixin, qubes.tests.QubesT
self.restore_backup(self.backupdir, options={
'use-default-template': True,
'use-default-netvm': True,
})
with self.assertNotRaises(KeyError):
vm = self.app.domains["test-template-clone"]

View File

@ -22,6 +22,7 @@
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
from distutils import spawn
import os
import subprocess
@ -433,200 +434,121 @@ class TC_03_QvmRevertTemplateChanges(qubes.tests.SystemTestsMixin,
self.setup_hvm_template()
self._do_test()
class TC_04_DispVM(qubes.tests.SystemTestsMixin,
qubes.tests.QubesTestCase):
def setUp(self):
self.skipTest('DisposableVMs not implemented in core3 yet')
super(TC_04_DispVM, self).setUp()
self.init_default_template()
disp_tpl = self.host_app.domains[self.get_dispvm_template_name()]
self.disp_tpl = self.app.add_new_vm(disp_tpl.__class__,
name=disp_tpl.name,
template=disp_tpl.template,
label='red')
self.app.save()
@staticmethod
def get_dispvm_template_name():
# FIXME: currently qubes.xml doesn't contain this information...
vmdir = os.readlink('/var/lib/qubes/dvmdata/vmdir')
return os.path.basename(vmdir)
def test_000_firewall_propagation(self):
"""
Check firewall propagation VM->DispVM, when VM have some firewall rules
"""
testvm1 = self.app.add_new_vm(qubes.vm.appvm.AppVM,
template=self.app.default_template,
@unittest.skip('test not converted to core3 API')
class TC_30_Gui_daemon(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
@unittest.skipUnless(
spawn.find_executable('xdotool'),
"xdotool not installed")
def test_000_clipboard(self):
testvm1 = self.qc.add_new_vm("QubesAppVm",
name=self.make_vm_name('vm1'),
label='red')
testvm1.create_on_disk()
firewall = testvm1.get_firewall_conf()
firewall['allowDns'] = False
firewall['allowYumProxy'] = False
firewall['rules'] = [{'address': '1.2.3.4',
'netmask': 24,
'proto': 'tcp',
'portBegin': 22,
'portEnd': 22,
}]
testvm1.write_firewall_conf(firewall)
self.app.save()
template=self.qc.get_default_template())
testvm1.create_on_disk(verbose=False)
testvm2 = self.qc.add_new_vm("QubesAppVm",
name=self.make_vm_name('vm2'),
template=self.qc.get_default_template())
testvm2.create_on_disk(verbose=False)
self.qc.save()
self.qc.unlock_db()
testvm1.start()
testvm2.start()
p = testvm1.run("qvm-run --dispvm 'qubesdb-read /name; echo ERROR;"
" read x'",
passio_popen=True)
window_title = 'user@{}'.format(testvm1.name)
testvm1.run('zenity --text-info --editable --title={}'.format(
window_title))
dispvm_name = p.stdout.readline().strip()
self.reload_db()
dispvm = self.app.domains[dispvm_name]
self.assertIsNotNone(dispvm, "DispVM {} not found in qubes.xml".format(
dispvm_name))
# check if firewall was propagated to the DispVM
self.assertEquals(testvm1.get_firewall_conf(),
dispvm.get_firewall_conf())
# and only there (#1608)
self.assertNotEquals(self.disp_tpl.get_firewall_conf(),
dispvm.get_firewall_conf())
# then modify some rule
firewall = dispvm.get_firewall_conf()
firewall['rules'] = [{'address': '4.3.2.1',
'netmask': 24,
'proto': 'tcp',
'portBegin': 22,
'portEnd': 22,
}]
dispvm.write_firewall_conf(firewall)
# and check again if wasn't saved anywhere else (#1608)
self.assertNotEquals(self.disp_tpl.get_firewall_conf(),
dispvm.get_firewall_conf())
self.assertNotEquals(testvm1.get_firewall_conf(),
dispvm.get_firewall_conf())
p.stdin.write('\n')
self.wait_for_window(window_title)
time.sleep(0.5)
test_string = "test{}".format(testvm1.xid)
# Type and copy some text
subprocess.check_call(['xdotool', 'search', '--name', window_title,
'windowactivate', '--sync',
'type', '{}'.format(test_string)])
# second xdotool call because type --terminator do not work (SEGV)
# additionally do not use search here, so window stack will be empty
# and xdotool will use XTEST instead of generating events manually -
# this will be much better - at least because events will have
# correct timestamp (so gui-daemon would not drop the copy request)
subprocess.check_call(['xdotool',
'key', 'ctrl+a', 'ctrl+c', 'ctrl+shift+c',
'Escape'])
clipboard_content = \
open('/var/run/qubes/qubes-clipboard.bin', 'r').read().strip()
self.assertEquals(clipboard_content, test_string,
"Clipboard copy operation failed - content")
clipboard_source = \
open('/var/run/qubes/qubes-clipboard.bin.source',
'r').read().strip()
self.assertEquals(clipboard_source, testvm1.name,
"Clipboard copy operation failed - owner")
# Then paste it to the other window
window_title = 'user@{}'.format(testvm2.name)
p = testvm2.run('zenity --entry --title={} > test.txt'.format(
window_title), passio_popen=True)
self.wait_for_window(window_title)
subprocess.check_call(['xdotool', 'key', '--delay', '100',
'ctrl+shift+v', 'ctrl+v', 'Return'])
p.wait()
def test_001_firewall_propagation(self):
"""
Check firewall propagation VM->DispVM, when VM have no firewall rules
"""
testvm1 = self.app.add_new_vm(qubes.vm.appvm.AppVM,
template=self.app.default_template,
name=self.make_vm_name('vm1'),
label='red')
testvm1.create_on_disk()
self.app.save()
# And compare the result
(test_output, _) = testvm2.run('cat test.txt',
passio_popen=True).communicate()
self.assertEquals(test_string, test_output.strip())
original_firewall = None
if os.path.exists(self.disp_tpl.firewall_conf):
original_firewall = tempfile.TemporaryFile()
with open(self.disp_tpl.firewall_conf) as f:
original_firewall.write(f.read())
try:
clipboard_content = \
open('/var/run/qubes/qubes-clipboard.bin', 'r').read().strip()
self.assertEquals(clipboard_content, "",
"Clipboard not wiped after paste - content")
clipboard_source = \
open('/var/run/qubes/qubes-clipboard.bin.source', 'r').read(
firewall = self.disp_tpl.get_firewall_conf()
firewall['allowDns'] = False
firewall['allowYumProxy'] = False
firewall['rules'] = [{'address': '1.2.3.4',
'netmask': 24,
'proto': 'tcp',
'portBegin': 22,
'portEnd': 22,
}]
self.disp_tpl.write_firewall_conf(firewall)
).strip()
self.assertEquals(clipboard_source, "",
"Clipboard not wiped after paste - owner")
class TC_05_StandaloneVM(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
def test_000_create_start(self):
testvm1 = self.qc.add_new_vm("QubesAppVm",
template=None,
name=self.make_vm_name('vm1'))
testvm1.create_on_disk(verbose=False,
source_template=self.qc.get_default_template())
self.qc.save()
self.qc.unlock_db()
testvm1.start()
self.assertEquals(testvm1.get_power_state(), "Running")
p = testvm1.run("qvm-run --dispvm 'qubesdb-read /name; echo ERROR;"
" read x'",
passio_popen=True)
dispvm_name = p.stdout.readline().strip()
self.reload_db()
dispvm = self.app.domains[dispvm_name]
self.assertIsNotNone(
dispvm, "DispVM {} not found in qubes.xml".format(dispvm_name))
# check if firewall was propagated to the DispVM from the right VM
self.assertEquals(testvm1.get_firewall_conf(),
dispvm.get_firewall_conf())
# and only there (#1608)
self.assertNotEquals(self.disp_tpl.get_firewall_conf(),
dispvm.get_firewall_conf())
# then modify some rule
firewall = dispvm.get_firewall_conf()
firewall['rules'] = [{'address': '4.3.2.1',
'netmask': 24,
'proto': 'tcp',
'portBegin': 22,
'portEnd': 22,
}]
dispvm.write_firewall_conf(firewall)
# and check again if wasn't saved anywhere else (#1608)
self.assertNotEquals(self.disp_tpl.get_firewall_conf(),
dispvm.get_firewall_conf())
self.assertNotEquals(testvm1.get_firewall_conf(),
dispvm.get_firewall_conf())
p.stdin.write('\n')
p.wait()
finally:
if original_firewall:
original_firewall.seek(0)
with open(self.disp_tpl.firewall_conf, 'w') as f:
f.write(original_firewall.read())
original_firewall.close()
else:
os.unlink(self.disp_tpl.firewall_conf)
def test_002_cleanup(self):
p = subprocess.Popen(['/usr/lib/qubes/qfile-daemon-dvm',
'qubes.VMShell', 'dom0', 'DEFAULT'],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=open(os.devnull, 'w'))
(stdout, _) = p.communicate(input="echo test; qubesdb-read /name; "
"echo ERROR\n")
self.assertEquals(p.returncode, 0)
lines = stdout.splitlines()
self.assertEqual(lines[0], "test")
dispvm_name = lines[1]
self.reload_db()
dispvm = self.app.domains[dispvm_name]
self.assertIsNone(dispvm, "DispVM {} still exists in qubes.xml".format(
dispvm_name))
def test_003_cleanup_destroyed(self):
"""
Check if DispVM is properly removed even if it terminated itself (#1660)
:return:
"""
p = subprocess.Popen(['/usr/lib/qubes/qfile-daemon-dvm',
'qubes.VMShell', 'dom0', 'DEFAULT'],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=open(os.devnull, 'w'))
p.stdin.write("qubesdb-read /name\n")
p.stdin.write("echo ERROR\n")
p.stdin.write("poweroff\n")
# do not close p.stdin on purpose - wait to automatic disconnect when
# domain is destroyed
timeout = 30
while timeout > 0:
if p.poll():
break
def test_100_resize_root_img(self):
testvm1 = self.qc.add_new_vm("QubesAppVm",
template=None,
name=self.make_vm_name('vm1'))
testvm1.create_on_disk(verbose=False,
source_template=self.qc.get_default_template())
self.qc.save()
self.qc.unlock_db()
with self.assertRaises(QubesException):
testvm1.resize_root_img(20*1024**3)
testvm1.resize_root_img(20*1024**3, allow_start=True)
timeout = 60
while testvm1.is_running():
time.sleep(1)
timeout -= 1
# includes check for None - timeout
self.assertEquals(p.returncode, 0)
lines = p.stdout.read().splitlines()
dispvm_name = lines[0]
self.assertNotEquals(dispvm_name, "ERROR")
self.reload_db()
dispvm = self.app.domains[dispvm_name]
self.assertIsNone(dispvm, "DispVM {} still exists in qubes.xml".format(
dispvm_name))
if timeout == 0:
self.fail("Timeout while waiting for VM shutdown")
self.assertEquals(testvm1.get_root_img_sz(), 20*1024**3)
testvm1.start()
p = testvm1.run('df --output=size /|tail -n 1',
passio_popen=True)
# new_size in 1k-blocks
(new_size, _) = p.communicate()
# some safety margin for FS metadata
self.assertGreater(int(new_size.strip()), 19*1024**2)
# vim: ts=4 sw=4 et

View File

@ -99,6 +99,10 @@ enabled = 1
def setUp(self):
super(TC_00_Dom0UpgradeMixin, self).setUp()
if self.template.startswith('whonix-'):
# Whonix redirect all the traffic through tor, so repository
# on http://localhost:8080/ is unavailable
self.skipTest("Test not supported for this template")
self.init_default_template(self.template)
self.updatevm = self.app.add_new_vm(
qubes.vm.appvm.AppVM,
@ -260,6 +264,7 @@ Test package
open(self.update_flag_path, 'a').close()
# remove also repodata to test #1685
if os.path.exists('/var/lib/qubes/updates/repodata'):
shutil.rmtree('/var/lib/qubes/updates/repodata')
logpath = os.path.join(self.tmpdir, 'dom0-update-output.txt')
try:

View File

@ -58,6 +58,9 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin):
def setUp(self):
super(VmNetworkingMixin, self).setUp()
if self.template.startswith('whonix-'):
self.skipTest("Test not supported here - Whonix uses its own "
"firewall settings")
self.init_default_template(self.template)
self.testnetvm = self.app.add_new_vm(qubes.vm.appvm.AppVM,
name=self.make_vm_name('netvm1'),
@ -91,6 +94,8 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin):
run_netvm_cmd("ip link set test0 up")
run_netvm_cmd("ip addr add {}/24 dev test0".format(self.test_ip))
run_netvm_cmd("iptables -I INPUT -d {} -j ACCEPT".format(self.test_ip))
# ignore failure
self.run_cmd(self.testnetvm, "killall --wait dnsmasq")
run_netvm_cmd("dnsmasq -a {ip} -A /{name}/{ip} -i test0 -z".format(
ip=self.test_ip, name=self.test_name))
run_netvm_cmd("echo nameserver {} > /etc/resolv.conf".format(
@ -165,8 +170,8 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin):
# check for nm-applet presence
self.assertEqual(subprocess.call([
'xdotool', 'search', '--all', '--name',
'--class', '^(NetworkManager Applet|{})$'.format(self.proxy.name)],
'xdotool', 'search', '--class', '{}:nm-applet'.format(
self.proxy.name)],
stdout=open('/dev/null', 'w')), 0, "nm-applet window not found")
self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
"Ping by IP failed (after NM reconnection")
@ -329,6 +334,19 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin):
self.assertNotEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
"Spoofed ping should be blocked")
def test_100_late_xldevd_startup(self):
"""Regression test for #1990"""
# Simulater late xl devd startup
cmd = "systemctl stop xendriverdomain"
if self.run_cmd(self.testnetvm, cmd) != 0:
self.fail("Command '%s' failed" % cmd)
self.testvm1.start()
cmd = "systemctl start xendriverdomain"
if self.run_cmd(self.testnetvm, cmd) != 0:
self.fail("Command '%s' failed" % cmd)
self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
# noinspection PyAttributeOutsideInit
class VmUpdatesMixin(qubes.tests.SystemTestsMixin):
@ -504,10 +522,10 @@ class VmUpdatesMixin(qubes.tests.SystemTestsMixin):
p = self.netvm_repo.run(
"mkdir -p /tmp/apt-repo/dists/test && "
"cd /tmp/apt-repo/dists/test && "
"cat > Release <<EOF && "
"echo '' $(sha1sum {p} | cut -f 1 -d ' ') $(stat -c %s {p}) {p}"
"cat > Release && "
"echo '' $(sha256sum {p} | cut -f 1 -d ' ') $(stat -c %s {p}) {p}"
" >> Release && "
"echo '' $(sha1sum {z} | cut -f 1 -d ' ') $(stat -c %s {z}) {z}"
"echo '' $(sha256sum {z} | cut -f 1 -d ' ') $(stat -c %s {z}) {z}"
" >> Release"
.format(p="main/binary-amd64/Packages",
z="main/binary-amd64/Packages.gz"),
@ -517,11 +535,10 @@ class VmUpdatesMixin(qubes.tests.SystemTestsMixin):
"Label: Test repo\n"
"Suite: test\n"
"Codename: test\n"
"Date: Tue, 27 Oct 2015 03:22:09 +0100\n"
"Date: Tue, 27 Oct 2015 03:22:09 UTC\n"
"Architectures: amd64\n"
"Components: main\n"
"SHA1:\n"
"EOF\n"
"SHA256:\n"
)
p.stdin.close()
if p.wait() != 0:

View File

@ -29,19 +29,46 @@ import sys
import qubes.tools
parser = qubes.tools.QubesArgumentParser(description=__doc__, vmname_nargs='+')
parser.add_argument("--running", action="store_true", dest="running",
default=False, help="Determine if (any of given) VM is running")
parser.add_argument("--paused", action="store_true", dest="paused",
default=False, help="Determine if (any of given) VM is paused")
parser.add_argument("--template", action="store_true", dest="template",
default=False, help="Determine if (any of given) VM is a template")
def print_msg(domains, what_single, what_plural):
if len(domains) == 0:
print("None of given VM {!s}".format(what_single))
if len(domains) == 1:
print("VM {!s} {!s}".format(domains[0], what_single))
else:
txt = ", ".join([vm.name for vm in domains])
print("VMs {!s} {!s}".format(txt, what_plural))
def main(args=None):
args = parser.parse_args(args)
domains = args.domains
if args.running:
running = [vm for vm in domains if vm.is_running()]
if args.verbose:
if len(domains) == 1:
print("VM {!s} exist".format(domains[0]))
print_msg(running, "is running", "are running")
return 0 if running else 1
elif args.paused:
paused = [vm for vm in domains if vm.is_paused()]
if args.verbose:
print_msg(paused, "is paused", "are running")
return 0 if paused else 1
elif args.template:
template = [vm for vm in domains if vm.is_template()]
if args.verbose:
print_msg(template, "is a template", "are templates")
return 0 if template else 1
else:
txt = ", ".join([vm.name for vm in domains]).strip()
msg = "VMs {!s} exist".format(txt)
print(msg.format(txt))
if args.verbose:
print_msg(domains, "exists", "exist")
return 0 if domains else 1
if __name__ == '__main__':
sys.exit(main())

View File

@ -463,9 +463,10 @@ class Table(object):
:param qubes.Qubes app: Qubes application object.
:param list colnames: Names of the columns (need not to be uppercase).
'''
def __init__(self, app, colnames):
def __init__(self, app, colnames, raw_data=False):
self.app = app
self.columns = tuple(Column.columns[col.upper()] for col in colnames)
self.raw_data = raw_data
def format_head(self):
@ -478,6 +479,9 @@ class Table(object):
def format_row(self, vm):
'''Format single table row (all columns for one domain).'''
if self.raw_data:
return '|'.join(col.format(vm) for col in self.columns)
else:
return ''.join(col.cell(vm) for col in self.columns)
@ -487,6 +491,7 @@ class Table(object):
:param file stream: Stream to write the table to.
'''
if not self.raw_data:
stream.write(self.format_head() + '\n')
for vm in self.app.domains:
stream.write(self.format_row(vm) + '\n')
@ -589,6 +594,10 @@ def get_parser():
help='user specified format (see available columns below)')
parser.add_argument('--raw-data', action='store_true',
help='Display specify data of specified VMs. Intended for '
'bash-parsing.')
# parser.add_argument('--conf', '-c',
# action='store', metavar='CFGFILE',
# help='Qubes config file')

View File

@ -44,6 +44,10 @@ def main(args=None): # pylint: disable=missing-docstring
args.app.save()
if not args.just_db:
vm.remove_from_disk()
else:
# normally it is done by vm.remove_from_disk(), but it isn't
# called in this case
vm.libvirt_domain.undefine()
return 0

View File

@ -59,10 +59,18 @@ parser.add_argument('--colour-output', '--color-output', metavar='COLOUR',
action='store', dest='color_output', default=None,
help='mark the qube output with given ANSI colour (ie. "31" for red)')
parser.add_argument('--colour-stderr', '--color-stderr', metavar='COLOUR',
action='store', dest='color_stderr', default=None,
help='mark the qube stderr with given ANSI colour (ie. "31" for red)')
parser.add_argument('--no-colour-output', '--no-color-output',
action='store_false', dest='color_output',
help='disable colouring the stdio')
parser.add_argument('--no-colour-stderr', '--no-color-stderr',
action='store_false', dest='color_stderr',
help='disable colouring the stderr')
parser.add_argument('--filter-escape-chars',
action='store_true', dest='filter_esc',
default=os.isatty(sys.stdout.fileno()),
@ -82,6 +90,9 @@ def main(args=None):
if args.color_output is None and args.filter_esc:
args.color_output = '31'
if args.color_output is None and os.isatty(sys.stderr.fileno()):
args.color_stderr = 31
if len(args.domains) > 1 and args.passio:
parser.error('--passio cannot be used when more than 1 qube is chosen')
if args.localcmd and not args.passio:
@ -97,6 +108,9 @@ def main(args=None):
if args.color_output:
sys.stdout.write('\033[0;{}m'.format(args.color_output))
sys.stdout.flush()
if args.color_stderr:
sys.stderr.write('\033[0;{}m'.format(args.color_stderr))
sys.stderr.flush()
try:
retcode = max(retcode, vm.run(args.cmd,
@ -117,6 +131,9 @@ def main(args=None):
if args.color_output:
sys.stdout.write('\033[0m')
sys.stdout.flush()
if args.color_stderr:
sys.stderr.write('\033[0m')
sys.stderr.flush()
return retcode

View File

@ -26,6 +26,7 @@
# TODO notification in tray
import argparse
import os
import sys
import qubes
@ -71,7 +72,7 @@ parser_drive.add_argument('--cdrom', metavar='IMAGE',
parser_drive.add_argument('--install-windows-tools',
action='store_const', dest='drive', default=False,
const='cdrom:dom0:/usr/lib/qubes/qubes-windows-tools.iso',
help='temporarily ttach Windows tools CDROM to the domain')
help='temporarily attach Windows tools CDROM to the domain')
parser.add_argument('--conf-file', metavar='FILE',
@ -120,6 +121,11 @@ def main(args=None):
if 'drive' not in (prop.__name__ for prop in vm.property_list()):
parser.error(
'domain {!r} does not support attaching drives'.format(vm.name))
else:
if args.drive == 'cdrom:dom0:/usr/lib/qubes/qubes-windows-tools.iso':
path = args.drive.split(':', 2)[2]
if not os.path.exists(path):
parser.error('qubes-windows-tools package not installed')
if args.conf_file is not None:
vm.conf_file = args.conf_file

View File

@ -755,19 +755,6 @@ class QubesVM(qubes.vm.mix.net.NetVMMixin, qubes.vm.BaseVM):
self.fire_event_pre('domain-pre-shutdown', force=force)
# try to gracefully detach PCI devices before shutdown, to mitigate
# timeouts on forcible detach at domain destroy; if that fails, too bad
for device in self.devices['pci']:
try:
self.libvirt_domain.detachDevice(self.app.env.get_template(
'libvirt/devices/pci.xml').render(device=device))
except libvirt.libvirtError as e:
self.log.warning(
'error while gracefully detaching PCI device ({!r}) during'
' shutdown of {!r}; error code: {!r}; continuing'
' anyway'.format(device, self.name, e.get_error_code()),
exc_info=1)
self.libvirt_domain.shutdown()
self.storage.stop()
@ -975,6 +962,7 @@ class QubesVM(qubes.vm.mix.net.NetVMMixin, qubes.vm.BaseVM):
# Internally use passio_popen, but do not return POpen object to
# the user - use internally for p.communicate()
passio_popen = True
passio_stderr = True
source = 'dom0' if source is None else self.app.domains[source].name

View File

@ -209,7 +209,7 @@ versions:
FIXLINK
---
" >> $HOME/$FILENAME.yml
" >> "$HOME/$FILENAME.yml"
if [[ "$SUPPORT_FILES" == 1 ]]
@ -217,7 +217,7 @@ if [[ "$SUPPORT_FILES" == 1 ]]
# cpio
cd $TEMP_DIR
find -print0 |cpio --quiet -o -H crc --null |gzip >$HOME/$FILENAME.cpio.gz
find -print0 | cpio --quiet -o -H crc --null | gzip > "$HOME/$FILENAME.cpio.gz"
cd
fi
@ -225,16 +225,16 @@ fi
if [[ "$COPY2VM" != "dom0" ]]
then
# Copy to VM
qvm-start -q $COPY2VM 2>/dev/null
qvm-start -q $COPY2VM 2> /dev/null
if [[ -f "$HOME/$FILENAME.cpio.gz" ]]
then
cat $HOME/$FILENAME.cpio.gz | qvm-run -a -q --pass-io $COPY2VM "cat >/home/user/$FILENAME.cpio.gz"
cat "$HOME/$FILENAME.cpio.gz" | qvm-run -a -q --pass-io $COPY2VM "cat > \"/home/user/$FILENAME.cpio.gz\""
fi
if [[ -f "$HOME/$FILENAME.yml" ]]
then
cat $HOME/$FILENAME.yml | qvm-run -a -q --pass-io $COPY2VM "cat >/home/user/$FILENAME.yml"
cat "$HOME/$FILENAME.yml" | qvm-run -a -q --pass-io $COPY2VM "cat > \"/home/user/$FILENAME.yml\""
fi
fi

View File

@ -41,39 +41,41 @@ def main():
parser.add_option ("-x", "--exclude", action="append",
dest="exclude_list", default=[],
help="Exclude the specified VM from backup (may be "
help="Exclude the specified VM from the backup (may be "
"repeated)")
parser.add_option ("--force-root", action="store_true", dest="force_root", default=False,
help="Force to run, even with root privileges")
help="Force to run with root privileges")
parser.add_option ("-d", "--dest-vm", action="store", dest="appvm",
help="The AppVM to send backups to (implies -e)")
help="Specify the destination VM to which the backup "
"will be sent (implies -e)")
parser.add_option ("-e", "--encrypt", action="store_true", dest="encrypt", default=False,
help="Encrypts the backup")
help="Encrypt the backup")
parser.add_option ("--no-encrypt", action="store_true",
dest="no_encrypt", default=False,
help="Skip encryption even if sending the backup to VM")
help="Skip encryption even if sending the backup to a "
"VM")
parser.add_option ("-p", "--passphrase-file", action="store",
dest="pass_file", default=None,
help="File containing the pass phrase to use, or '-' "
"to read it from stdin")
help="Read passphrase from a file, or use '-' to read "
"from stdin")
parser.add_option ("-E", "--enc-algo", action="store",
dest="crypto_algorithm", default=None,
help="Specify non-default encryption algorithm. For "
"list of supported algos execute 'openssl "
help="Specify a non-default encryption algorithm. For a "
"list of supported algorithms, execute 'openssl "
"list-cipher-algorithms' (implies -e)")
parser.add_option ("-H", "--hmac-algo", action="store",
dest="hmac_algorithm", default=None,
help="Specify non-default hmac algorithm. For list of "
"supported algos execute 'openssl "
help="Specify a non-default HMAC algorithm. For a list "
"of supported algorithms, execute 'openssl "
"list-message-digest-algorithms'")
parser.add_option ("-z", "--compress", action="store_true", dest="compress", default=False,
help="Compress the backup")
parser.add_option ("-Z", "--compress-filter", action="store",
dest="compress_filter", default=False,
help="Compress the backup using specified filter "
"program (default: gzip)")
help="Specify a non-default compression filter program "
"(default: gzip)")
parser.add_option("--tmpdir", action="store", dest="tmpdir", default=None,
help="Custom temporary directory (if you have at least "
help="Specify a temporary directory (if you have at least "
"1GB free RAM in dom0, use of /tmp is advised) ("
"default: /var/tmp)")
parser.add_option ("--debug", action="store_true", dest="debug",
@ -82,17 +84,21 @@ def main():
(options, args) = parser.parse_args ()
if (len (args) < 1):
print >> sys.stderr, "You must specify the target backup directory (e.g. /mnt/backup)"
print >> sys.stderr, "qvm-backup will create a subdirectory there for each individual backup."
print >> sys.stderr, "You must specify the target backup directory "\
" (e.g. /mnt/backup)."
print >> sys.stderr, "qvm-backup will create a subdirectory there for "\
" each individual backup."
exit (0)
base_backup_dir = args[0]
if hasattr(os, "geteuid") and os.geteuid() == 0:
if not options.force_root:
print >> sys.stderr, "*** Running this tool as root is strongly discouraged, this will lead you in permissions problems."
print >> sys.stderr, "Retry as unprivileged user."
print >> sys.stderr, "... or use --force-root to continue anyway."
print >> sys.stderr, "*** Running this tool as root is strongly "\
"discouraged. This will lead to permissions "\
"problems."
print >> sys.stderr, "Retry as an unprivileged user, or use "\
"--force-root to continue anyway."
exit(1)
# Only for locking
@ -136,14 +142,15 @@ def main():
backup_fs_free_sz = stat.f_bsize * stat.f_bavail
print
if (total_backup_sz > backup_fs_free_sz):
print >>sys.stderr, "ERROR: Not enough space available on the backup filesystem!"
print >>sys.stderr, "ERROR: Not enough space available on the "\
"backup filesystem!"
exit(1)
print "-> Available space: {0}".format(size_to_human(backup_fs_free_sz))
else:
appvm = qvm_collection.get_vm_by_name(options.appvm)
if appvm is None:
print >>sys.stderr, "ERROR: VM {0} does not exist".format(options.appvm)
print >>sys.stderr, "ERROR: VM {0} does not exist!".format(options.appvm)
exit(1)
stat = os.statvfs('/var/tmp')
@ -151,19 +158,19 @@ def main():
print
if (backup_fs_free_sz < 1000000000):
print >>sys.stderr, "ERROR: Not enough space available " \
"on the local filesystem (needs 1GB for temporary files)!"
"on the local filesystem (1GB required for temporary files)!"
exit(1)
if not appvm.is_running():
appvm.start(verbose=True)
if options.appvm:
print >>sys.stderr, ("WARNING: VM {} excluded because it's used to "
"store the backup.").format(options.appvm)
print >>sys.stderr, ("NOTE: VM {} will be excluded because it is "
"the backup destination.").format(options.appvm)
options.exclude_list.append(options.appvm)
if not options.encrypt:
print >>sys.stderr, "WARNING: encryption will not be used"
print >>sys.stderr, "WARNING: The backup will NOT be encrypted!"
if options.pass_file is not None:
f = open(options.pass_file) if options.pass_file != "-" else sys.stdin
@ -175,12 +182,12 @@ def main():
if raw_input("Do you want to proceed? [y/N] ").upper() != "Y":
exit(0)
s = ("Please enter the pass phrase that will be used to {}verify "
s = ("Please enter the passphrase that will be used to {}verify "
"the backup: ").format('encrypt and ' if options.encrypt else '')
passphrase = getpass.getpass(s)
if getpass.getpass("Enter again for verification: ") != passphrase:
print >>sys.stderr, "ERROR: Password mismatch"
print >>sys.stderr, "ERROR: Passphrase mismatch!"
exit(1)
encoding = sys.stdin.encoding or getpreferredencoding()

View File

@ -43,27 +43,31 @@ def main():
parser.add_option ("--verify-only", action="store_true",
dest="verify_only", default=False,
help="Do not restore the data, only verify backup "
"integrify.")
help="Verify backup integrity without restoring any "
"data")
parser.add_option ("--skip-broken", action="store_true", dest="skip_broken", default=False,
help="Do not restore VMs that have missing templates or netvms")
help="Do not restore VMs that have missing TemplateVMs "
"or NetVMs")
parser.add_option ("--ignore-missing", action="store_true", dest="ignore_missing", default=False,
help="Ignore missing templates or netvms, restore VMs anyway")
help="Restore VMs even if their associated TemplateVMs "
"and NetVMs are missing")
parser.add_option ("--skip-conflicting", action="store_true", dest="skip_conflicting", default=False,
help="Do not restore VMs that are already present on the host")
help="Do not restore VMs that are already present on "
"the host")
parser.add_option ("--rename-conflicting", action="store_true",
dest="rename_conflicting", default=False,
help="Restore VMs that are already present on the host under different name")
help="Restore VMs that are already present on the host "
"under different names")
parser.add_option ("--force-root", action="store_true", dest="force_root", default=False,
help="Force to run, even with root privileges")
help="Force to run with root privileges")
parser.add_option ("--replace-template", action="append", dest="replace_template", default=[],
help="Restore VMs using another template, syntax: "
help="Restore VMs using another TemplateVM; syntax: "
"old-template-name:new-template-name (may be "
"repeated)")
@ -71,20 +75,21 @@ def main():
help="Skip restore of specified VM (may be repeated)")
parser.add_option ("--skip-dom0-home", action="store_false", dest="dom0_home", default=True,
help="Do not restore dom0 user home dir")
help="Do not restore dom0 user home directory")
parser.add_option ("--ignore-username-mismatch", action="store_true", dest="ignore_username_mismatch", default=False,
help="Ignore dom0 username mismatch while restoring homedir")
help="Ignore dom0 username mismatch when restoring home "
"directory")
parser.add_option ("-d", "--dest-vm", action="store", dest="appvm",
help="The AppVM to send backups to")
help="Specify VM containing the backup to be restored")
parser.add_option ("-e", "--encrypted", action="store_true", dest="decrypt", default=False,
help="The backup is encrypted")
parser.add_option ("-p", "--passphrase-file", action="store",
dest="pass_file", default=None,
help="File containing the pass phrase to use, or '-' to read it from stdin")
help="Read passphrase from file, or use '-' to read from stdin")
parser.add_option ("-z", "--compressed", action="store_true", dest="compressed", default=False,
help="The backup is compressed")
@ -95,7 +100,8 @@ def main():
(options, args) = parser.parse_args ()
if (len (args) < 1):
print >> sys.stderr, "You must specify the backup directory (e.g. /mnt/backup/qubes-2010-12-01-235959)"
print >> sys.stderr, "You must specify the backup directory "\
"(e.g. /mnt/backup/qubes-2010-12-01-235959)"
exit (0)
backup_dir = args[0]
@ -141,7 +147,8 @@ def main():
if f is not sys.stdin:
f.close()
else:
passphrase = getpass.getpass("Please enter the pass phrase to decrypt/verify the backup: ")
passphrase = getpass.getpass("Please enter the passphrase to verify "
"and (if encrypted) decrypt the backup: ")
encoding = sys.stdin.encoding or getpreferredencoding()
passphrase = passphrase.decode(encoding)
@ -197,67 +204,93 @@ def main():
print
if hasattr(os, "geteuid") and os.geteuid() == 0:
print >> sys.stderr, "*** Running this tool as root is strongly discouraged, this will lead you in permissions problems."
print >> sys.stderr, "*** Running this tool as root is strongly "\
"discouraged. This will lead to permissions "\
"problems."
if options.force_root:
print >> sys.stderr, "Continuing as commanded. You have been warned."
print >> sys.stderr, "Continuing as commanded. You have been "\
"warned."
else:
print >> sys.stderr, "Retry as unprivileged user."
print >> sys.stderr, "... or use --force-root to continue anyway."
print >> sys.stderr, "Retry as an unprivileged user, or use "\
"--force-root to continue anyway."
exit(1)
if there_are_conflicting_vms:
print >> sys.stderr, "*** There VMs with conflicting names on the host! ***"
print >> sys.stderr, "*** There are VMs with conflicting names on the "\
"host! ***"
if options.skip_conflicting:
print >> sys.stderr, "Those VMs will not be restored, the host VMs will not be overwritten!"
print >> sys.stderr, "Those VMs will not be restored. The host "\
"VMs will NOT be overwritten."
else:
print >> sys.stderr, "Remove VMs with conflicting names from the host before proceeding."
print >> sys.stderr, "... or use --skip-conflicting to restore only those VMs that do not exist on the host."
print >> sys.stderr, "... or use --rename-conflicting to " \
"restore those VMs under modified " \
"name (with number at the end)"
print >> sys.stderr, "Remove VMs with conflicting names from the "\
"host before proceeding."
print >> sys.stderr, "Or use --skip-conflicting to restore only "\
"those VMs that do not exist on the host."
print >> sys.stderr, "Or use --rename-conflicting to restore " \
"those VMs under modified names (with "\
"numbers at the end)."
exit (1)
print "The above VMs will be copied and added to your system."
print "Exisiting VMs will not be removed."
print "Exisiting VMs will NOT be removed."
if there_are_missing_templates:
print >> sys.stderr, "*** One or more template VM is missing on the host! ***"
print >> sys.stderr, "*** One or more TemplateVMs are missing on the"\
"host! ***"
if not (options.skip_broken or options.ignore_missing):
print >> sys.stderr, "Install it first, before proceeding with backup restore."
print >> sys.stderr, "Or pass: --skip-broken or --ignore-missing switch."
print >> sys.stderr, "Install them before proceeding with the "\
"restore."
print >> sys.stderr, "Or pass: --skip-broken or --ignore-missing."
exit (1)
elif options.skip_broken:
print >> sys.stderr, "... VMs that depend on it will not be restored (--skip-broken used)"
print >> sys.stderr, "Skipping broken entries: VMs that depend on "\
"missing TemplateVMs will NOT be restored."
elif options.ignore_missing:
print >> sys.stderr, "... VMs that depend on it will be restored anyway (--ignore-missing used)"
print >> sys.stderr, "Ignoring missing entries: VMs that depend "\
"on missing TemplateVMs will NOT be restored."
else:
print >> sys.stderr, "INTERNAL ERROR?!"
print >> sys.stderr, "INTERNAL ERROR! Please report this to the "\
"Qubes OS team!"
exit (1)
if there_are_missing_netvms:
print >> sys.stderr, "*** One or more network VM is missing on the host! ***"
print >> sys.stderr, "*** One or more NetVMs are missing on the "\
"host! ***"
if not (options.skip_broken or options.ignore_missing):
print >> sys.stderr, "Install it first, before proceeding with backup restore."
print >> sys.stderr, "Or pass: --skip_broken or --ignore_missing switch."
print >> sys.stderr, "Install them before proceeding with the "\
"restore."
print >> sys.stderr, "Or pass: --skip-broken or --ignore-missing."
exit (1)
elif options.skip_broken:
print >> sys.stderr, "... VMs that depend on it will not be restored (--skip-broken used)"
print >> sys.stderr, "Skipping broken entries: VMs that depend on "\
"missing NetVMs will NOT be restored."
elif options.ignore_missing:
print >> sys.stderr, "... VMs that depend on it be restored anyway (--ignore-missing used)"
print >> sys.stderr, "Ignoring missing entries: VMs that depend "\
"on missing NetVMs will NOT be restored."
else:
print >> sys.stderr, "INTERNAL ERROR?!"
print >> sys.stderr, "INTERNAL ERROR! Please report this to the "\
"Qubes OS team!"
exit (1)
if 'dom0' in restore_info.keys() and options.dom0_home:
if dom0_username_mismatch:
print >> sys.stderr, "*** Dom0 username mismatch! This can break some settings ***"
print >> sys.stderr, "*** Dom0 username mismatch! This can break "\
"some settings! ***"
if not options.ignore_username_mismatch:
print >> sys.stderr, "Skip dom0 home restore (--skip-dom0-home)"
print >> sys.stderr, "Or pass: --ignore-username-mismatch to continue anyway"
print >> sys.stderr, "Skip restoring the dom0 home directory "\
"(--skip-dom0-home), or pass "\
"--ignore-username-mismatch to continue "\
"anyway."
exit(1)
else:
print >> sys.stderr, "Continuing as directed"
print >> sys.stderr, "While restoring user homedir, existing files/dirs will be backed up in 'home-pre-restore-<current-time>' dir"
print >> sys.stderr, "Continuing as directed."
print >> sys.stderr, "NOTE: Before restoring the dom0 home directory, "
"a new directory named "\
"'home-pre-restore-<current-time>' will be "\
"created inside the dom0 home directory. If any "\
"restored files conflict with existing files, "\
"the existing files will be moved to this new "\
"directory."
if options.pass_file is None:
if raw_input("Do you want to proceed? [y/N] ").upper() != "Y":

View File

@ -27,10 +27,17 @@ import sys
import time
def main():
usage = "usage: %prog [options] <vm-name>"
usage = """usage: %prog [options] <vm-name>\n
Specify no state options to check if VM exists"""
parser = OptionParser (usage)
parser.add_option ("-q", "--quiet", action="store_false", dest="verbose", default=True)
parser.add_option ("--running", action="store_true", dest="running", default=False,
help="Determine if VM is running")
parser.add_option ("--paused", action="store_true", dest="paused", default=False,
help="Determine if VM is paused")
parser.add_option ("--template", action="store_true", dest="template", default=False,
help="Determine if VM is a template")
(options, args) = parser.parse_args ()
if (len (args) != 1):
@ -48,6 +55,24 @@ def main():
print >> sys.stdout, "A VM with the name '{0}' does not exist in the system!".format(vmname)
exit(1)
elif options.running:
vm_state = not vm.is_running()
if options.verbose:
print >> sys.stdout, "A VM with the name {0} is {1}running.".format(vmname, "not " * vm_state)
exit(vm_state)
elif options.paused:
vm_state = not vm.is_paused()
if options.verbose:
print >> sys.stdout, "A VM with the name {0} is {1}paused.".format(vmname, "not " * vm_state)
exit(vm_state)
elif options.template:
vm_state = not vm.is_template()
if options.verbose:
print >> sys.stdout, "A VM with the name {0} is {1}a template.".format(vmname, "not " * vm_state)
exit(vm_state)
else:
if options.verbose:
print >> sys.stdout, "A VM with the name '{0}' does exist.".format(vmname)

View File

@ -101,7 +101,7 @@ def main():
gui=False, passio_popen=True, ignore_stderr=True)
date_out = p.stdout.read(100)
date_out = date_out.strip()
if not re.match(r'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\+0000$', date_out):
if not re.match(r'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\+00:?00$', date_out):
print >> sys.stderr, 'Invalid date output, aborting!'
sys.exit(1)

View File

@ -19,12 +19,22 @@ endif
cp basic.py[co] $(DESTDIR)$(PYTHON_TESTSPATH)
cp block.py $(DESTDIR)$(PYTHON_TESTSPATH)
cp block.py[co] $(DESTDIR)$(PYTHON_TESTSPATH)
cp dispvm.py $(DESTDIR)$(PYTHON_TESTSPATH)
cp dispvm.py[co] $(DESTDIR)$(PYTHON_TESTSPATH)
cp dom0_update.py $(DESTDIR)$(PYTHON_TESTSPATH)
cp dom0_update.py[co] $(DESTDIR)$(PYTHON_TESTSPATH)
cp extra.py $(DESTDIR)$(PYTHON_TESTSPATH)
cp extra.py[co] $(DESTDIR)$(PYTHON_TESTSPATH)
cp hardware.py $(DESTDIR)$(PYTHON_TESTSPATH)
cp hardware.py[co] $(DESTDIR)$(PYTHON_TESTSPATH)
cp hvm.py $(DESTDIR)$(PYTHON_TESTSPATH)
cp hvm.py[co] $(DESTDIR)$(PYTHON_TESTSPATH)
cp mime.py $(DESTDIR)$(PYTHON_TESTSPATH)
cp mime.py[co] $(DESTDIR)$(PYTHON_TESTSPATH)
cp network.py $(DESTDIR)$(PYTHON_TESTSPATH)
cp network.py[co] $(DESTDIR)$(PYTHON_TESTSPATH)
cp vm_qrexec_gui.py $(DESTDIR)$(PYTHON_TESTSPATH)
cp vm_qrexec_gui.py[co] $(DESTDIR)$(PYTHON_TESTSPATH)
cp pvgrub.py $(DESTDIR)$(PYTHON_TESTSPATH)
cp pvgrub.py[co] $(DESTDIR)$(PYTHON_TESTSPATH)
cp regressions.py $(DESTDIR)$(PYTHON_TESTSPATH)
cp regressions.py[co] $(DESTDIR)$(PYTHON_TESTSPATH)
cp run.py $(DESTDIR)$(PYTHON_TESTSPATH)
@ -33,7 +43,7 @@ endif
cp storage.py[co] $(DESTDIR)$(PYTHON_TESTSPATH)
cp storage_file.py $(DESTDIR)$(PYTHON_TESTSPATH)
cp storage_file.py[co] $(DESTDIR)$(PYTHON_TESTSPATH)
cp hardware.py $(DESTDIR)$(PYTHON_TESTSPATH)
cp hardware.py[co] $(DESTDIR)$(PYTHON_TESTSPATH)
cp extra.py $(DESTDIR)$(PYTHON_TESTSPATH)
cp extra.py[co] $(DESTDIR)$(PYTHON_TESTSPATH)
cp storage_xen.py $(DESTDIR)$(PYTHON_TESTSPATH)
cp storage_xen.py[co] $(DESTDIR)$(PYTHON_TESTSPATH)
cp vm_qrexec_gui.py $(DESTDIR)$(PYTHON_TESTSPATH)
cp vm_qrexec_gui.py[co] $(DESTDIR)$(PYTHON_TESTSPATH)

View File

@ -39,8 +39,11 @@ class TC_00_List(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
name=self.make_vm_name("vm"),
template=self.qc.get_vm_by_name(self.template))
self.vm.create_on_disk(verbose=False)
self.save_and_reload_db()
self.qc.unlock_db()
self.vm.start()
else:
self.qc.unlock_db()
self.vm = self.qc[0]
def tearDown(self):
@ -200,6 +203,9 @@ class TC_00_List(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
self.fail("Device {} not found in {!r}".format('test-dm', dev_list))
def test_013_list_dm_removed(self):
if self.template is None:
self.skipTest('test not supported in dom0 - loop devices excluded '
'in dom0')
self.run_script(
"set -e;"
"truncate -s 128M {path}; "

448
tests/dispvm.py Normal file
View File

@ -0,0 +1,448 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
#
# The Qubes OS Project, http://www.qubes-os.org
#
# Copyright (C) 2016 Marek Marczykowski-Górecki
# <marmarek@invisiblethingslab.com>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
#
from distutils import spawn
import qubes.tests
import subprocess
import tempfile
import unittest
import os
import time
class TC_04_DispVM(qubes.tests.SystemTestsMixin,
qubes.tests.QubesTestCase):
@staticmethod
def get_dispvm_template_name():
vmdir = os.readlink('/var/lib/qubes/dvmdata/vmdir')
return os.path.basename(vmdir)
def test_000_firewall_propagation(self):
"""
Check firewall propagation VM->DispVM, when VM have some firewall rules
"""
# FIXME: currently qubes.xml doesn't contain this information...
dispvm_template_name = self.get_dispvm_template_name()
dispvm_template = self.qc.get_vm_by_name(dispvm_template_name)
testvm1 = self.qc.add_new_vm("QubesAppVm",
name=self.make_vm_name('vm1'),
template=self.qc.get_default_template())
testvm1.create_on_disk(verbose=False)
firewall = testvm1.get_firewall_conf()
firewall['allowDns'] = False
firewall['allowYumProxy'] = False
firewall['rules'] = [{'address': '1.2.3.4',
'netmask': 24,
'proto': 'tcp',
'portBegin': 22,
'portEnd': 22,
}]
testvm1.write_firewall_conf(firewall)
self.qc.save()
self.qc.unlock_db()
testvm1.start()
p = testvm1.run("qvm-run --dispvm 'qubesdb-read /name; echo ERROR;"
" read x'",
passio_popen=True)
dispvm_name = p.stdout.readline().strip()
self.qc.lock_db_for_reading()
self.qc.load()
self.qc.unlock_db()
dispvm = self.qc.get_vm_by_name(dispvm_name)
self.assertIsNotNone(dispvm, "DispVM {} not found in qubes.xml".format(
dispvm_name))
# check if firewall was propagated to the DispVM
self.assertEquals(testvm1.get_firewall_conf(),
dispvm.get_firewall_conf())
# and only there (#1608)
self.assertNotEquals(dispvm_template.get_firewall_conf(),
dispvm.get_firewall_conf())
# then modify some rule
firewall = dispvm.get_firewall_conf()
firewall['rules'] = [{'address': '4.3.2.1',
'netmask': 24,
'proto': 'tcp',
'portBegin': 22,
'portEnd': 22,
}]
dispvm.write_firewall_conf(firewall)
# and check again if wasn't saved anywhere else (#1608)
self.assertNotEquals(dispvm_template.get_firewall_conf(),
dispvm.get_firewall_conf())
self.assertNotEquals(testvm1.get_firewall_conf(),
dispvm.get_firewall_conf())
p.stdin.write('\n')
p.wait()
def test_001_firewall_propagation(self):
"""
Check firewall propagation VM->DispVM, when VM have no firewall rules
"""
testvm1 = self.qc.add_new_vm("QubesAppVm",
name=self.make_vm_name('vm1'),
template=self.qc.get_default_template())
testvm1.create_on_disk(verbose=False)
self.qc.save()
self.qc.unlock_db()
# FIXME: currently qubes.xml doesn't contain this information...
dispvm_template_name = self.get_dispvm_template_name()
dispvm_template = self.qc.get_vm_by_name(dispvm_template_name)
original_firewall = None
if os.path.exists(dispvm_template.firewall_conf):
original_firewall = tempfile.TemporaryFile()
with open(dispvm_template.firewall_conf) as f:
original_firewall.write(f.read())
try:
firewall = dispvm_template.get_firewall_conf()
firewall['allowDns'] = False
firewall['allowYumProxy'] = False
firewall['rules'] = [{'address': '1.2.3.4',
'netmask': 24,
'proto': 'tcp',
'portBegin': 22,
'portEnd': 22,
}]
dispvm_template.write_firewall_conf(firewall)
testvm1.start()
p = testvm1.run("qvm-run --dispvm 'qubesdb-read /name; echo ERROR;"
" read x'",
passio_popen=True)
dispvm_name = p.stdout.readline().strip()
self.qc.lock_db_for_reading()
self.qc.load()
self.qc.unlock_db()
dispvm = self.qc.get_vm_by_name(dispvm_name)
self.assertIsNotNone(dispvm, "DispVM {} not found in qubes.xml".format(
dispvm_name))
# check if firewall was propagated to the DispVM from the right VM
self.assertEquals(testvm1.get_firewall_conf(),
dispvm.get_firewall_conf())
# and only there (#1608)
self.assertNotEquals(dispvm_template.get_firewall_conf(),
dispvm.get_firewall_conf())
# then modify some rule
firewall = dispvm.get_firewall_conf()
firewall['rules'] = [{'address': '4.3.2.1',
'netmask': 24,
'proto': 'tcp',
'portBegin': 22,
'portEnd': 22,
}]
dispvm.write_firewall_conf(firewall)
# and check again if wasn't saved anywhere else (#1608)
self.assertNotEquals(dispvm_template.get_firewall_conf(),
dispvm.get_firewall_conf())
self.assertNotEquals(testvm1.get_firewall_conf(),
dispvm.get_firewall_conf())
p.stdin.write('\n')
p.wait()
finally:
if original_firewall:
original_firewall.seek(0)
with open(dispvm_template.firewall_conf, 'w') as f:
f.write(original_firewall.read())
original_firewall.close()
else:
os.unlink(dispvm_template.firewall_conf)
def test_002_cleanup(self):
self.qc.unlock_db()
p = subprocess.Popen(['/usr/lib/qubes/qfile-daemon-dvm',
'qubes.VMShell', 'dom0', 'DEFAULT'],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=open(os.devnull, 'w'))
(stdout, _) = p.communicate(input="echo test; qubesdb-read /name; "
"echo ERROR\n")
self.assertEquals(p.returncode, 0)
lines = stdout.splitlines()
self.assertEqual(lines[0], "test")
dispvm_name = lines[1]
self.qc.lock_db_for_reading()
self.qc.load()
self.qc.unlock_db()
dispvm = self.qc.get_vm_by_name(dispvm_name)
self.assertIsNone(dispvm, "DispVM {} still exists in qubes.xml".format(
dispvm_name))
def test_003_cleanup_destroyed(self):
"""
Check if DispVM is properly removed even if it terminated itself (#1660)
:return:
"""
self.qc.unlock_db()
p = subprocess.Popen(['/usr/lib/qubes/qfile-daemon-dvm',
'qubes.VMShell', 'dom0', 'DEFAULT'],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=open(os.devnull, 'w'))
p.stdin.write("qubesdb-read /name\n")
p.stdin.write("echo ERROR\n")
p.stdin.write("sudo poweroff\n")
# do not close p.stdin on purpose - wait to automatic disconnect when
# domain is destroyed
timeout = 30
while timeout > 0:
if p.poll():
break
time.sleep(1)
timeout -= 1
# includes check for None - timeout
self.assertEquals(p.returncode, 0)
lines = p.stdout.read().splitlines()
dispvm_name = lines[0]
self.assertNotEquals(dispvm_name, "ERROR")
self.qc.lock_db_for_reading()
self.qc.load()
self.qc.unlock_db()
dispvm = self.qc.get_vm_by_name(dispvm_name)
self.assertIsNone(dispvm, "DispVM {} still exists in qubes.xml".format(
dispvm_name))
class TC_20_DispVMMixin(qubes.tests.SystemTestsMixin):
def test_000_prepare_dvm(self):
self.qc.unlock_db()
retcode = subprocess.call(['/usr/bin/qvm-create-default-dvm',
self.template],
stderr=open(os.devnull, 'w'))
self.assertEqual(retcode, 0)
self.qc.lock_db_for_writing()
self.qc.load()
self.assertIsNotNone(self.qc.get_vm_by_name(
self.template + "-dvm"))
# TODO: check mtime of snapshot file
def test_010_simple_dvm_run(self):
self.qc.unlock_db()
p = subprocess.Popen(['/usr/lib/qubes/qfile-daemon-dvm',
'qubes.VMShell', 'dom0', 'DEFAULT'],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=open(os.devnull, 'w'))
(stdout, _) = p.communicate(input="echo test")
self.assertEqual(stdout, "test\n")
# TODO: check if DispVM is destroyed
@unittest.skipUnless(spawn.find_executable('xdotool'),
"xdotool not installed")
def test_020_gui_app(self):
self.qc.unlock_db()
p = subprocess.Popen(['/usr/lib/qubes/qfile-daemon-dvm',
'qubes.VMShell', 'dom0', 'DEFAULT'],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=open(os.devnull, 'w'))
# wait for DispVM startup:
p.stdin.write("echo test\n")
p.stdin.flush()
l = p.stdout.readline()
self.assertEqual(l, "test\n")
# potential race condition, but our tests are supposed to be
# running on dedicated machine, so should not be a problem
self.qc.lock_db_for_reading()
self.qc.load()
self.qc.unlock_db()
max_qid = 0
for vm in self.qc.values():
if not vm.is_disposablevm():
continue
if vm.qid > max_qid:
max_qid = vm.qid
dispvm = self.qc[max_qid]
self.assertNotEqual(dispvm.qid, 0, "DispVM not found in qubes.xml")
self.assertTrue(dispvm.is_running())
try:
window_title = 'user@%s' % (dispvm.template.name + "-dvm")
p.stdin.write("xterm -e "
"\"sh -c 'echo \\\"\033]0;{}\007\\\";read x;'\"\n".
format(window_title))
self.wait_for_window(window_title)
time.sleep(0.5)
self.enter_keys_in_window(window_title, ['Return'])
# Wait for window to close
self.wait_for_window(window_title, show=False)
finally:
p.stdin.close()
wait_count = 0
while dispvm.is_running():
wait_count += 1
if wait_count > 100:
self.fail("Timeout while waiting for DispVM destruction")
time.sleep(0.1)
wait_count = 0
while p.poll() is None:
wait_count += 1
if wait_count > 100:
self.fail("Timeout while waiting for qfile-daemon-dvm "
"termination")
time.sleep(0.1)
self.assertEqual(p.returncode, 0)
self.qc.lock_db_for_reading()
self.qc.load()
self.qc.unlock_db()
self.assertIsNone(self.qc.get_vm_by_name(dispvm.name),
"DispVM not removed from qubes.xml")
def _handle_editor(self, winid):
(window_title, _) = subprocess.Popen(
['xdotool', 'getwindowname', winid], stdout=subprocess.PIPE).\
communicate()
window_title = window_title.strip().\
replace('(', '\(').replace(')', '\)')
time.sleep(1)
if "gedit" in window_title:
subprocess.check_call(['xdotool', 'windowactivate', '--sync', winid,
'type', 'Test test 2'])
subprocess.check_call(['xdotool', 'key', '--window', winid,
'key', 'Return'])
time.sleep(0.5)
subprocess.check_call(['xdotool',
'key', 'ctrl+s', 'ctrl+q'])
elif "LibreOffice" in window_title:
# wait for actual editor (we've got splash screen)
search = subprocess.Popen(['xdotool', 'search', '--sync',
'--onlyvisible', '--all', '--name', '--class', 'disp*|Writer'],
stdout=subprocess.PIPE,
stderr=open(os.path.devnull, 'w'))
retcode = search.wait()
if retcode == 0:
winid = search.stdout.read().strip()
time.sleep(0.5)
subprocess.check_call(['xdotool', 'windowactivate', '--sync', winid,
'type', 'Test test 2'])
subprocess.check_call(['xdotool', 'key', '--window', winid,
'key', 'Return'])
time.sleep(0.5)
subprocess.check_call(['xdotool',
'key', '--delay', '100', 'ctrl+s',
'Return', 'ctrl+q'])
elif "emacs" in window_title:
subprocess.check_call(['xdotool', 'windowactivate', '--sync', winid,
'type', 'Test test 2'])
subprocess.check_call(['xdotool', 'key', '--window', winid,
'key', 'Return'])
time.sleep(0.5)
subprocess.check_call(['xdotool',
'key', 'ctrl+x', 'ctrl+s'])
subprocess.check_call(['xdotool',
'key', 'ctrl+x', 'ctrl+c'])
elif "vim" in window_title or "user@" in window_title:
subprocess.check_call(['xdotool', 'windowactivate', '--sync', winid,
'key', 'i', 'type', 'Test test 2'])
subprocess.check_call(['xdotool', 'key', '--window', winid,
'key', 'Return'])
subprocess.check_call(
['xdotool',
'key', 'Escape', 'colon', 'w', 'q', 'Return'])
else:
self.fail("Unknown editor window: {}".format(window_title))
@unittest.skipUnless(spawn.find_executable('xdotool'),
"xdotool not installed")
def test_030_edit_file(self):
testvm1 = self.qc.add_new_vm("QubesAppVm",
name=self.make_vm_name('vm1'),
template=self.qc.get_vm_by_name(
self.template))
testvm1.create_on_disk(verbose=False)
self.qc.save()
testvm1.start()
testvm1.run("echo test1 > /home/user/test.txt", wait=True)
self.qc.unlock_db()
p = testvm1.run("qvm-open-in-dvm /home/user/test.txt",
passio_popen=True)
wait_count = 0
winid = None
while True:
search = subprocess.Popen(['xdotool', 'search',
'--onlyvisible', '--class', 'disp*'],
stdout=subprocess.PIPE,
stderr=open(os.path.devnull, 'w'))
retcode = search.wait()
if retcode == 0:
winid = search.stdout.read().strip()
# get window title
(window_title, _) = subprocess.Popen(
['xdotool', 'getwindowname', winid], stdout=subprocess.PIPE). \
communicate()
window_title = window_title.strip()
# ignore LibreOffice splash screen and window with no title
# set yet
if window_title and not window_title.startswith("LibreOffice")\
and not window_title == 'VMapp command':
break
wait_count += 1
if wait_count > 100:
self.fail("Timeout while waiting for editor window")
time.sleep(0.3)
time.sleep(0.5)
self._handle_editor(winid)
p.wait()
p = testvm1.run("cat /home/user/test.txt",
passio_popen=True)
(test_txt_content, _) = p.communicate()
# Drop BOM if added by editor
if test_txt_content.startswith('\xef\xbb\xbf'):
test_txt_content = test_txt_content[3:]
self.assertEqual(test_txt_content, "Test test 2\ntest1\n")
def load_tests(loader, tests, pattern):
try:
qc = qubes.qubes.QubesVmCollection()
qc.lock_db_for_reading()
qc.load()
qc.unlock_db()
templates = [vm.name for vm in qc.values() if
isinstance(vm, qubes.qubes.QubesTemplateVm)]
except OSError:
templates = []
for template in templates:
tests.addTests(loader.loadTestsFromTestCase(
type(
'TC_20_DispVM_' + template,
(TC_20_DispVMMixin, qubes.tests.QubesTestCase),
{'template': template})))
return tests

125
tests/hvm.py Normal file
View File

@ -0,0 +1,125 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
#
# The Qubes OS Project, http://www.qubes-os.org
#
# Copyright (C) 2016 Marek Marczykowski-Górecki
# <marmarek@invisiblethingslab.com>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
#
import qubes.tests
from qubes.qubes import QubesException
class TC_10_HVM(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
# TODO: test with some OS inside
# TODO: windows tools tests
def test_000_create_start(self):
testvm1 = self.qc.add_new_vm("QubesHVm",
name=self.make_vm_name('vm1'))
testvm1.create_on_disk(verbose=False)
self.qc.save()
self.qc.unlock_db()
testvm1.start()
self.assertEquals(testvm1.get_power_state(), "Running")
def test_010_create_start_template(self):
templatevm = self.qc.add_new_vm("QubesTemplateHVm",
name=self.make_vm_name('template'))
templatevm.create_on_disk(verbose=False)
self.qc.save()
self.qc.unlock_db()
templatevm.start()
self.assertEquals(templatevm.get_power_state(), "Running")
def test_020_create_start_template_vm(self):
templatevm = self.qc.add_new_vm("QubesTemplateHVm",
name=self.make_vm_name('template'))
templatevm.create_on_disk(verbose=False)
testvm2 = self.qc.add_new_vm("QubesHVm",
name=self.make_vm_name('vm2'),
template=templatevm)
testvm2.create_on_disk(verbose=False)
self.qc.save()
self.qc.unlock_db()
testvm2.start()
self.assertEquals(testvm2.get_power_state(), "Running")
def test_030_prevent_simultaneus_start(self):
templatevm = self.qc.add_new_vm("QubesTemplateHVm",
name=self.make_vm_name('template'))
templatevm.create_on_disk(verbose=False)
testvm2 = self.qc.add_new_vm("QubesHVm",
name=self.make_vm_name('vm2'),
template=templatevm)
testvm2.create_on_disk(verbose=False)
self.qc.save()
self.qc.unlock_db()
templatevm.start()
self.assertEquals(templatevm.get_power_state(), "Running")
self.assertRaises(QubesException, testvm2.start)
templatevm.force_shutdown()
testvm2.start()
self.assertEquals(testvm2.get_power_state(), "Running")
self.assertRaises(QubesException, templatevm.start)
def test_100_resize_root_img(self):
testvm1 = self.qc.add_new_vm("QubesHVm",
name=self.make_vm_name('vm1'))
testvm1.create_on_disk(verbose=False)
self.qc.save()
self.qc.unlock_db()
testvm1.resize_root_img(30*1024**3)
self.assertEquals(testvm1.get_root_img_sz(), 30*1024**3)
testvm1.start()
self.assertEquals(testvm1.get_power_state(), "Running")
# TODO: launch some OS there and check the size
def test_200_start_invalid_drive(self):
"""Regression test for #1619"""
testvm1 = self.qc.add_new_vm("QubesHVm",
name=self.make_vm_name('vm1'))
testvm1.create_on_disk(verbose=False)
testvm1.drive = 'hd:dom0:/invalid'
self.qc.save()
self.qc.unlock_db()
try:
testvm1.start()
except Exception as e:
self.assertIsInstance(e, QubesException)
else:
self.fail('No exception raised')
def test_201_start_invalid_drive_cdrom(self):
"""Regression test for #1619"""
testvm1 = self.qc.add_new_vm("QubesHVm",
name=self.make_vm_name('vm1'))
testvm1.create_on_disk(verbose=False)
testvm1.drive = 'cdrom:dom0:/invalid'
self.qc.save()
self.qc.unlock_db()
try:
testvm1.start()
except Exception as e:
self.assertIsInstance(e, QubesException)
else:
self.fail('No exception raised')

354
tests/mime.py Normal file
View File

@ -0,0 +1,354 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
#
# The Qubes OS Project, http://www.qubes-os.org
#
# Copyright (C) 2016 Marek Marczykowski-Górecki
# <marmarek@invisiblethingslab.com>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
#
from distutils import spawn
import os
import re
import subprocess
import time
import unittest
import qubes.tests
import qubes.qubes
from qubes.qubes import QubesVmCollection
@unittest.skipUnless(
spawn.find_executable('xprop') and
spawn.find_executable('xdotool') and
spawn.find_executable('wmctrl'),
"xprop or xdotool or wmctrl not installed")
class TC_50_MimeHandlers(qubes.tests.SystemTestsMixin):
@classmethod
def setUpClass(cls):
if cls.template == 'whonix-gw' or 'minimal' in cls.template:
raise unittest.SkipTest(
'Template {} not supported by this test'.format(cls.template))
if cls.template == 'whonix-ws':
# TODO remove when Whonix-based DispVMs will work (Whonix 13?)
raise unittest.SkipTest(
'Template {} not supported by this test'.format(cls.template))
qc = QubesVmCollection()
cls._kill_test_vms(qc, prefix=qubes.tests.CLSVMPREFIX)
qc.lock_db_for_writing()
qc.load()
cls._remove_test_vms(qc, qubes.qubes.vmm.libvirt_conn,
prefix=qubes.tests.CLSVMPREFIX)
cls.source_vmname = cls.make_vm_name('source', True)
source_vm = qc.add_new_vm("QubesAppVm",
template=qc.get_vm_by_name(cls.template),
name=cls.source_vmname)
source_vm.create_on_disk(verbose=False)
cls.target_vmname = cls.make_vm_name('target', True)
target_vm = qc.add_new_vm("QubesAppVm",
template=qc.get_vm_by_name(cls.template),
name=cls.target_vmname)
target_vm.create_on_disk(verbose=False)
qc.save()
qc.unlock_db()
source_vm.start()
target_vm.start()
# make sure that DispVMs will be started of the same template
retcode = subprocess.call(['/usr/bin/qvm-create-default-dvm',
cls.template],
stderr=open(os.devnull, 'w'))
assert retcode == 0, "Error preparing DispVM"
def setUp(self):
super(TC_50_MimeHandlers, self).setUp()
self.source_vm = self.qc.get_vm_by_name(self.source_vmname)
self.target_vm = self.qc.get_vm_by_name(self.target_vmname)
def get_window_class(self, winid, dispvm=False):
(vm_winid, _) = subprocess.Popen(
['xprop', '-id', winid, '_QUBES_VMWINDOWID'],
stdout=subprocess.PIPE
).communicate()
vm_winid = vm_winid.split("#")[1].strip('\n" ')
if dispvm:
(vmname, _) = subprocess.Popen(
['xprop', '-id', winid, '_QUBES_VMNAME'],
stdout=subprocess.PIPE
).communicate()
vmname = vmname.split("=")[1].strip('\n" ')
window_class = None
while window_class is None:
# XXX to use self.qc.get_vm_by_name would require reloading
# qubes.xml, so use qvm-run instead
xprop = subprocess.Popen(
['qvm-run', '-p', vmname, 'xprop -id {} WM_CLASS'.format(
vm_winid)], stdout=subprocess.PIPE)
(window_class, _) = xprop.communicate()
if xprop.returncode != 0:
self.skipTest("xprop failed, not installed?")
if 'not found' in window_class:
# WM_CLASS not set yet, wait a little
time.sleep(0.1)
window_class = None
else:
window_class = None
while window_class is None:
xprop = self.target_vm.run(
'xprop -id {} WM_CLASS'.format(vm_winid),
passio_popen=True)
(window_class, _) = xprop.communicate()
if xprop.returncode != 0:
self.skipTest("xprop failed, not installed?")
if 'not found' in window_class:
# WM_CLASS not set yet, wait a little
time.sleep(0.1)
window_class = None
# output: WM_CLASS(STRING) = "gnome-terminal-server", "Gnome-terminal"
try:
window_class = window_class.split("=")[1].split(",")[0].strip('\n" ')
except IndexError:
raise Exception(
"Unexpected output from xprop: '{}'".format(window_class))
return window_class
def open_file_and_check_viewer(self, filename, expected_app_titles,
expected_app_classes, dispvm=False):
self.qc.unlock_db()
if dispvm:
p = self.source_vm.run("qvm-open-in-dvm {}".format(filename),
passio_popen=True)
vmpattern = "disp*"
else:
self.qrexec_policy('qubes.OpenInVM', self.source_vm.name,
self.target_vmname)
self.qrexec_policy('qubes.OpenURL', self.source_vm.name,
self.target_vmname)
p = self.source_vm.run("qvm-open-in-vm {} {}".format(
self.target_vmname, filename), passio_popen=True)
vmpattern = self.target_vmname
wait_count = 0
winid = None
window_title = None
while True:
search = subprocess.Popen(['xdotool', 'search',
'--onlyvisible', '--class', vmpattern],
stdout=subprocess.PIPE,
stderr=open(os.path.devnull, 'w'))
retcode = search.wait()
if retcode == 0:
winid = search.stdout.read().strip()
# get window title
(window_title, _) = subprocess.Popen(
['xdotool', 'getwindowname', winid], stdout=subprocess.PIPE). \
communicate()
window_title = window_title.strip()
# ignore LibreOffice splash screen and window with no title
# set yet
if window_title and not window_title.startswith("LibreOffice")\
and not window_title == 'VMapp command':
break
wait_count += 1
if wait_count > 100:
self.fail("Timeout while waiting for editor window")
time.sleep(0.3)
# get window class
window_class = self.get_window_class(winid, dispvm)
# close the window - we've got the window class, it is no longer needed
subprocess.check_call(['wmctrl', '-i', '-c', winid])
p.wait()
self.wait_for_window(window_title, show=False)
def check_matches(obj, patterns):
return any((pat.search(obj) if isinstance(pat, type(re.compile('')))
else pat in obj) for pat in patterns)
if not check_matches(window_title, expected_app_titles) and \
not check_matches(window_class, expected_app_classes):
self.fail("Opening file {} resulted in window '{} ({})', which is "
"none of {!r} ({!r})".format(
filename, window_title, window_class,
expected_app_titles, expected_app_classes))
def prepare_txt(self, filename):
p = self.source_vm.run("cat > {}".format(filename), passio_popen=True)
p.stdin.write("This is test\n")
p.stdin.close()
retcode = p.wait()
assert retcode == 0, "Failed to write {} file".format(filename)
def prepare_pdf(self, filename):
self.prepare_txt("/tmp/source.txt")
cmd = "convert /tmp/source.txt {}".format(filename)
retcode = self.source_vm.run(cmd, wait=True)
assert retcode == 0, "Failed to run '{}'".format(cmd)
def prepare_doc(self, filename):
self.prepare_txt("/tmp/source.txt")
cmd = "unoconv -f doc -o {} /tmp/source.txt".format(filename)
retcode = self.source_vm.run(cmd, wait=True)
if retcode != 0:
self.skipTest("Failed to run '{}', not installed?".format(cmd))
def prepare_pptx(self, filename):
self.prepare_txt("/tmp/source.txt")
cmd = "unoconv -f pptx -o {} /tmp/source.txt".format(filename)
retcode = self.source_vm.run(cmd, wait=True)
if retcode != 0:
self.skipTest("Failed to run '{}', not installed?".format(cmd))
def prepare_png(self, filename):
self.prepare_txt("/tmp/source.txt")
cmd = "convert /tmp/source.txt {}".format(filename)
retcode = self.source_vm.run(cmd, wait=True)
if retcode != 0:
self.skipTest("Failed to run '{}', not installed?".format(cmd))
def prepare_jpg(self, filename):
self.prepare_txt("/tmp/source.txt")
cmd = "convert /tmp/source.txt {}".format(filename)
retcode = self.source_vm.run(cmd, wait=True)
if retcode != 0:
self.skipTest("Failed to run '{}', not installed?".format(cmd))
def test_000_txt(self):
filename = "/home/user/test_file.txt"
self.prepare_txt(filename)
self.open_file_and_check_viewer(filename, ["vim", "user@"],
["gedit", "emacs", "libreoffice"])
def test_001_pdf(self):
filename = "/home/user/test_file.pdf"
self.prepare_pdf(filename)
self.open_file_and_check_viewer(filename, [],
["evince"])
def test_002_doc(self):
filename = "/home/user/test_file.doc"
self.prepare_doc(filename)
self.open_file_and_check_viewer(filename, [],
["libreoffice", "abiword"])
def test_003_pptx(self):
filename = "/home/user/test_file.pptx"
self.prepare_pptx(filename)
self.open_file_and_check_viewer(filename, [],
["libreoffice"])
def test_004_png(self):
filename = "/home/user/test_file.png"
self.prepare_png(filename)
self.open_file_and_check_viewer(filename, [],
["shotwell", "eog", "display"])
def test_005_jpg(self):
filename = "/home/user/test_file.jpg"
self.prepare_jpg(filename)
self.open_file_and_check_viewer(filename, [],
["shotwell", "eog", "display"])
def test_006_jpeg(self):
filename = "/home/user/test_file.jpeg"
self.prepare_jpg(filename)
self.open_file_and_check_viewer(filename, [],
["shotwell", "eog", "display"])
def test_010_url(self):
self.open_file_and_check_viewer("https://www.qubes-os.org/", [],
["Firefox", "Iceweasel", "Navigator"])
def test_100_txt_dispvm(self):
filename = "/home/user/test_file.txt"
self.prepare_txt(filename)
self.open_file_and_check_viewer(filename, ["vim", "user@"],
["gedit", "emacs", "libreoffice"],
dispvm=True)
def test_101_pdf_dispvm(self):
filename = "/home/user/test_file.pdf"
self.prepare_pdf(filename)
self.open_file_and_check_viewer(filename, [],
["evince"],
dispvm=True)
def test_102_doc_dispvm(self):
filename = "/home/user/test_file.doc"
self.prepare_doc(filename)
self.open_file_and_check_viewer(filename, [],
["libreoffice", "abiword"],
dispvm=True)
def test_103_pptx_dispvm(self):
filename = "/home/user/test_file.pptx"
self.prepare_pptx(filename)
self.open_file_and_check_viewer(filename, [],
["libreoffice"],
dispvm=True)
def test_104_png_dispvm(self):
filename = "/home/user/test_file.png"
self.prepare_png(filename)
self.open_file_and_check_viewer(filename, [],
["shotwell", "eog", "display"],
dispvm=True)
def test_105_jpg_dispvm(self):
filename = "/home/user/test_file.jpg"
self.prepare_jpg(filename)
self.open_file_and_check_viewer(filename, [],
["shotwell", "eog", "display"],
dispvm=True)
def test_106_jpeg_dispvm(self):
filename = "/home/user/test_file.jpeg"
self.prepare_jpg(filename)
self.open_file_and_check_viewer(filename, [],
["shotwell", "eog", "display"],
dispvm=True)
def test_110_url_dispvm(self):
self.open_file_and_check_viewer("https://www.qubes-os.org/", [],
["Firefox", "Iceweasel", "Navigator"],
dispvm=True)
def load_tests(loader, tests, pattern):
try:
qc = qubes.qubes.QubesVmCollection()
qc.lock_db_for_reading()
qc.load()
qc.unlock_db()
templates = [vm.name for vm in qc.values() if
isinstance(vm, qubes.qubes.QubesTemplateVm)]
except OSError:
templates = []
for template in templates:
tests.addTests(loader.loadTestsFromTestCase(
type(
'TC_50_MimeHandlers_' + template,
(TC_50_MimeHandlers, qubes.tests.QubesTestCase),
{'template': template})))
return tests

172
tests/pvgrub.py Normal file
View File

@ -0,0 +1,172 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
#
# The Qubes OS Project, http://www.qubes-os.org
#
# Copyright (C) 2016 Marek Marczykowski-Górecki
# <marmarek@invisiblethingslab.com>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
#
import os
import unittest
import qubes.tests
@unittest.skipUnless(os.path.exists('/var/lib/qubes/vm-kernels/pvgrub2'),
'grub-xen package not installed')
class TC_40_PVGrub(qubes.tests.SystemTestsMixin):
def setUp(self):
super(TC_40_PVGrub, self).setUp()
supported = False
if self.template.startswith('fedora-'):
supported = True
elif self.template.startswith('debian-'):
supported = True
if not supported:
self.skipTest("Template {} not supported by this test".format(
self.template))
def install_packages(self, vm):
if self.template.startswith('fedora-'):
cmd_install1 = 'dnf clean expire-cache && ' \
'dnf install -y qubes-kernel-vm-support grub2-tools'
cmd_install2 = 'dnf install -y kernel && ' \
'KVER=$(rpm -q --qf %{VERSION}-%{RELEASE}.%{ARCH} kernel) && ' \
'dnf install --allowerasing -y kernel-devel-$KVER && ' \
'dkms autoinstall -k $KVER'
cmd_update_grub = 'grub2-mkconfig -o /boot/grub2/grub.cfg'
elif self.template.startswith('debian-'):
cmd_install1 = 'apt-get update && apt-get install -y ' \
'qubes-kernel-vm-support grub2-common'
cmd_install2 = 'apt-get install -y linux-image-amd64'
cmd_update_grub = 'mkdir /boot/grub && update-grub2'
else:
assert False, "Unsupported template?!"
for cmd in [cmd_install1, cmd_install2, cmd_update_grub]:
p = vm.run(cmd, user="root", passio_popen=True, passio_stderr=True)
(stdout, stderr) = p.communicate()
self.assertEquals(p.returncode, 0,
"Failed command: {}\nSTDOUT: {}\nSTDERR: {}"
.format(cmd, stdout, stderr))
def get_kernel_version(self, vm):
if self.template.startswith('fedora-'):
cmd_get_kernel_version = 'rpm -q kernel|sort -n|tail -1|' \
'cut -d - -f 2-'
elif self.template.startswith('debian-'):
cmd_get_kernel_version = \
'dpkg-query --showformat=\'${Package}\\n\' --show ' \
'\'linux-image-*-amd64\'|sort -n|tail -1|cut -d - -f 3-'
else:
raise RuntimeError("Unsupported template?!")
p = vm.run(cmd_get_kernel_version, user="root", passio_popen=True)
(kver, _) = p.communicate()
self.assertEquals(p.returncode, 0,
"Failed command: {}".format(cmd_get_kernel_version))
return kver.strip()
def test_000_standalone_vm(self):
testvm1 = self.qc.add_new_vm("QubesAppVm",
template=None,
name=self.make_vm_name('vm1'))
testvm1.create_on_disk(verbose=False,
source_template=self.qc.get_vm_by_name(
self.template))
self.save_and_reload_db()
self.qc.unlock_db()
testvm1 = self.qc[testvm1.qid]
testvm1.start()
self.install_packages(testvm1)
kver = self.get_kernel_version(testvm1)
self.shutdown_and_wait(testvm1)
self.qc.lock_db_for_writing()
self.qc.load()
testvm1 = self.qc[testvm1.qid]
testvm1.kernel = 'pvgrub2'
self.save_and_reload_db()
self.qc.unlock_db()
testvm1 = self.qc[testvm1.qid]
testvm1.start()
p = testvm1.run('uname -r', passio_popen=True)
(actual_kver, _) = p.communicate()
self.assertEquals(actual_kver.strip(), kver)
def test_010_template_based_vm(self):
test_template = self.qc.add_new_vm("QubesTemplateVm",
template=None,
name=self.make_vm_name('template'))
test_template.clone_attrs(self.qc.get_vm_by_name(self.template))
test_template.clone_disk_files(
src_vm=self.qc.get_vm_by_name(self.template),
verbose=False)
testvm1 = self.qc.add_new_vm("QubesAppVm",
template=test_template,
name=self.make_vm_name('vm1'))
testvm1.create_on_disk(verbose=False,
source_template=test_template)
self.save_and_reload_db()
self.qc.unlock_db()
test_template = self.qc[test_template.qid]
testvm1 = self.qc[testvm1.qid]
test_template.start()
self.install_packages(test_template)
kver = self.get_kernel_version(test_template)
self.shutdown_and_wait(test_template)
self.qc.lock_db_for_writing()
self.qc.load()
test_template = self.qc[test_template.qid]
test_template.kernel = 'pvgrub2'
testvm1 = self.qc[testvm1.qid]
testvm1.kernel = 'pvgrub2'
self.save_and_reload_db()
self.qc.unlock_db()
# Check if TemplateBasedVM boots and has the right kernel
testvm1 = self.qc[testvm1.qid]
testvm1.start()
p = testvm1.run('uname -r', passio_popen=True)
(actual_kver, _) = p.communicate()
self.assertEquals(actual_kver.strip(), kver)
# And the same for the TemplateVM itself
test_template = self.qc[test_template.qid]
test_template.start()
p = test_template.run('uname -r', passio_popen=True)
(actual_kver, _) = p.communicate()
self.assertEquals(actual_kver.strip(), kver)
def load_tests(loader, tests, pattern):
try:
qc = qubes.qubes.QubesVmCollection()
qc.lock_db_for_reading()
qc.load()
qc.unlock_db()
templates = [vm.name for vm in qc.values() if
isinstance(vm, qubes.qubes.QubesTemplateVm)]
except OSError:
templates = []
for template in templates:
tests.addTests(loader.loadTestsFromTestCase(
type(
'TC_40_PVGrub_' + template,
(TC_40_PVGrub, qubes.tests.QubesTestCase),
{'template': template})))
return tests

View File

@ -78,4 +78,3 @@ class TC_00_Regressions(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase)
p.stdin.close()
self.assertTrue(dispvm_name.startswith("disp"),
"Try {} failed".format(try_no))

View File

@ -127,7 +127,7 @@ class TC_00_AppVMMixin(qubes.tests.SystemTestsMixin):
time.sleep(0.5)
subprocess.check_call(
['xdotool', 'search', '--name', title,
'windowactivate', 'type', 'exit\n'])
'windowactivate', '--sync', 'type', 'exit\n'])
wait_count = 0
while subprocess.call(['xdotool', 'search', '--name', title],
@ -168,7 +168,7 @@ class TC_00_AppVMMixin(qubes.tests.SystemTestsMixin):
time.sleep(0.5)
subprocess.check_call(
['xdotool', 'search', '--name', title,
'windowactivate', 'type', 'exit\n'])
'windowactivate', '--sync', 'type', 'exit\n'])
wait_count = 0
while subprocess.call(['xdotool', 'search', '--name', title],
@ -785,6 +785,8 @@ class TC_00_AppVMMixin(qubes.tests.SystemTestsMixin):
def test_210_time_sync(self):
"""Test time synchronization mechanism"""
if self.template.startswith('whonix-'):
self.skipTest('qvm-sync-clock disabled for Whonix VMs')
self.testvm1.start()
self.testvm2.start()
(start_time, _) = subprocess.Popen(["date", "-u", "+%s"],
@ -860,778 +862,150 @@ class TC_00_AppVMMixin(qubes.tests.SystemTestsMixin):
# some safety margin for FS metadata
self.assertGreater(int(new_size.strip()), 5.8*1024**2)
class TC_05_StandaloneVM(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
def test_000_create_start(self):
testvm1 = self.qc.add_new_vm("QubesAppVm",
template=None,
name=self.make_vm_name('vm1'))
testvm1.create_on_disk(verbose=False,
source_template=self.qc.get_default_template())
self.qc.save()
self.qc.unlock_db()
testvm1.start()
self.assertEquals(testvm1.get_power_state(), "Running")
def test_100_resize_root_img(self):
testvm1 = self.qc.add_new_vm("QubesAppVm",
template=None,
name=self.make_vm_name('vm1'))
testvm1.create_on_disk(verbose=False,
source_template=self.qc.get_default_template())
self.qc.save()
self.qc.unlock_db()
with self.assertRaises(QubesException):
testvm1.resize_root_img(20*1024**3)
testvm1.resize_root_img(20*1024**3, allow_start=True)
timeout = 60
while testvm1.is_running():
time.sleep(1)
timeout -= 1
if timeout == 0:
self.fail("Timeout while waiting for VM shutdown")
self.assertEquals(testvm1.get_root_img_sz(), 20*1024**3)
testvm1.start()
p = testvm1.run('df --output=size /|tail -n 1',
passio_popen=True)
# new_size in 1k-blocks
(new_size, _) = p.communicate()
# some safety margin for FS metadata
self.assertGreater(int(new_size.strip()), 19*1024**2)
class TC_10_HVM(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
# TODO: test with some OS inside
# TODO: windows tools tests
def test_000_create_start(self):
testvm1 = self.qc.add_new_vm("QubesHVm",
name=self.make_vm_name('vm1'))
testvm1.create_on_disk(verbose=False)
self.qc.save()
self.qc.unlock_db()
testvm1.start()
self.assertEquals(testvm1.get_power_state(), "Running")
def test_010_create_start_template(self):
templatevm = self.qc.add_new_vm("QubesTemplateHVm",
name=self.make_vm_name('template'))
templatevm.create_on_disk(verbose=False)
self.qc.save()
self.qc.unlock_db()
templatevm.start()
self.assertEquals(templatevm.get_power_state(), "Running")
def test_020_create_start_template_vm(self):
templatevm = self.qc.add_new_vm("QubesTemplateHVm",
name=self.make_vm_name('template'))
templatevm.create_on_disk(verbose=False)
testvm2 = self.qc.add_new_vm("QubesHVm",
name=self.make_vm_name('vm2'),
template=templatevm)
testvm2.create_on_disk(verbose=False)
self.qc.save()
self.qc.unlock_db()
testvm2.start()
self.assertEquals(testvm2.get_power_state(), "Running")
def test_030_prevent_simultaneus_start(self):
templatevm = self.qc.add_new_vm("QubesTemplateHVm",
name=self.make_vm_name('template'))
templatevm.create_on_disk(verbose=False)
testvm2 = self.qc.add_new_vm("QubesHVm",
name=self.make_vm_name('vm2'),
template=templatevm)
testvm2.create_on_disk(verbose=False)
self.qc.save()
self.qc.unlock_db()
templatevm.start()
self.assertEquals(templatevm.get_power_state(), "Running")
self.assertRaises(QubesException, testvm2.start)
templatevm.force_shutdown()
testvm2.start()
self.assertEquals(testvm2.get_power_state(), "Running")
self.assertRaises(QubesException, templatevm.start)
def test_100_resize_root_img(self):
testvm1 = self.qc.add_new_vm("QubesHVm",
name=self.make_vm_name('vm1'))
testvm1.create_on_disk(verbose=False)
self.qc.save()
self.qc.unlock_db()
testvm1.resize_root_img(30*1024**3)
self.assertEquals(testvm1.get_root_img_sz(), 30*1024**3)
testvm1.start()
self.assertEquals(testvm1.get_power_state(), "Running")
# TODO: launch some OS there and check the size
class TC_20_DispVMMixin(qubes.tests.SystemTestsMixin):
def test_000_prepare_dvm(self):
self.qc.unlock_db()
retcode = subprocess.call(['/usr/bin/qvm-create-default-dvm',
self.template],
stderr=open(os.devnull, 'w'))
self.assertEqual(retcode, 0)
self.qc.lock_db_for_writing()
self.qc.load()
self.assertIsNotNone(self.qc.get_vm_by_name(
self.template + "-dvm"))
# TODO: check mtime of snapshot file
def test_010_simple_dvm_run(self):
self.qc.unlock_db()
p = subprocess.Popen(['/usr/lib/qubes/qfile-daemon-dvm',
'qubes.VMShell', 'dom0', 'DEFAULT'],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=open(os.devnull, 'w'))
(stdout, _) = p.communicate(input="echo test")
self.assertEqual(stdout, "test\n")
# TODO: check if DispVM is destroyed
@unittest.skipUnless(spawn.find_executable('xdotool'),
"xdotool not installed")
def test_020_gui_app(self):
self.qc.unlock_db()
p = subprocess.Popen(['/usr/lib/qubes/qfile-daemon-dvm',
'qubes.VMShell', 'dom0', 'DEFAULT'],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=open(os.devnull, 'w'))
def test_300_bug_1028_gui_memory_pinning(self):
"""
If VM window composition buffers are relocated in memory, GUI will
still use old pointers and will display old pages
:return:
"""
self.testvm1.memory = 800
self.testvm1.maxmem = 800
# exclude from memory balancing
self.testvm1.services['meminfo-writer'] = False
self.testvm1.start()
# and allow large map count
self.testvm1.run("echo 256000 > /proc/sys/vm/max_map_count",
user="root", wait=True)
allocator_c = (
"#include <sys/mman.h>\n"
"#include <stdlib.h>\n"
"#include <stdio.h>\n"
"\n"
"int main(int argc, char **argv) {\n"
" int total_pages;\n"
" char *addr, *iter;\n"
"\n"
" total_pages = atoi(argv[1]);\n"
" addr = mmap(NULL, total_pages * 0x1000, PROT_READ | "
"PROT_WRITE, MAP_ANONYMOUS | MAP_PRIVATE | MAP_POPULATE, -1, 0);\n"
" if (addr == MAP_FAILED) {\n"
" perror(\"mmap\");\n"
" exit(1);\n"
" }\n"
" printf(\"Stage1\\n\");\n"
" fflush(stdout);\n"
" getchar();\n"
" for (iter = addr; iter < addr + total_pages*0x1000; iter += "
"0x2000) {\n"
" if (mlock(iter, 0x1000) == -1) {\n"
" perror(\"mlock\");\n"
" fprintf(stderr, \"%d of %d\\n\", (iter-addr)/0x1000, "
"total_pages);\n"
" exit(1);\n"
" }\n"
" }\n"
" printf(\"Stage2\\n\");\n"
" fflush(stdout);\n"
" for (iter = addr+0x1000; iter < addr + total_pages*0x1000; "
"iter += 0x2000) {\n"
" if (munmap(iter, 0x1000) == -1) {\n"
" perror(\"munmap\");\n"
" exit(1);\n"
" }\n"
" }\n"
" printf(\"Stage3\\n\");\n"
" fflush(stdout);\n"
" fclose(stdout);\n"
" getchar();\n"
"\n"
" return 0;\n"
"}\n")
# wait for DispVM startup:
p.stdin.write("echo test\n")
p.stdin.flush()
l = p.stdout.readline()
self.assertEqual(l, "test\n")
# potential race condition, but our tests are supposed to be
# running on dedicated machine, so should not be a problem
self.qc.lock_db_for_reading()
self.qc.load()
self.qc.unlock_db()
max_qid = 0
for vm in self.qc.values():
if not vm.is_disposablevm():
continue
if vm.qid > max_qid:
max_qid = vm.qid
dispvm = self.qc[max_qid]
self.assertNotEqual(dispvm.qid, 0, "DispVM not found in qubes.xml")
self.assertTrue(dispvm.is_running())
try:
window_title = 'user@%s' % (dispvm.template.name + "-dvm")
p.stdin.write("xterm -e "
"\"sh -s -c 'echo \\\"\033]0;{}\007\\\";read x;'\"\n".
format(window_title))
self.wait_for_window(window_title)
time.sleep(0.5)
self.enter_keys_in_window(window_title, ['Return'])
# Wait for window to close
self.wait_for_window(window_title, show=False)
finally:
p.stdin.close()
wait_count = 0
while dispvm.is_running():
wait_count += 1
if wait_count > 100:
self.fail("Timeout while waiting for DispVM destruction")
time.sleep(0.1)
wait_count = 0
while p.poll() is None:
wait_count += 1
if wait_count > 100:
self.fail("Timeout while waiting for qfile-daemon-dvm "
"termination")
time.sleep(0.1)
self.assertEqual(p.returncode, 0)
self.qc.lock_db_for_reading()
self.qc.load()
self.qc.unlock_db()
self.assertIsNone(self.qc.get_vm_by_name(dispvm.name),
"DispVM not removed from qubes.xml")
def _handle_editor(self, winid):
(window_title, _) = subprocess.Popen(
['xdotool', 'getwindowname', winid], stdout=subprocess.PIPE).\
communicate()
window_title = window_title.strip().\
replace('(', '\(').replace(')', '\)')
time.sleep(1)
if "gedit" in window_title:
subprocess.check_call(['xdotool', 'search', '--name', window_title,
'windowactivate', 'type', 'test test 2\n'])
time.sleep(0.5)
subprocess.check_call(['xdotool', 'search', '--name', window_title,
'key', 'ctrl+s', 'ctrl+q'])
elif "emacs" in window_title:
subprocess.check_call(['xdotool', 'search', '--name', window_title,
'windowactivate', 'type', 'test test 2\n'])
time.sleep(0.5)
subprocess.check_call(['xdotool', 'search', '--name', window_title,
'key', 'ctrl+x', 'ctrl+s'])
subprocess.check_call(['xdotool', 'search', '--name', window_title,
'key', 'ctrl+x', 'ctrl+c'])
elif "vim" in window_title:
subprocess.check_call(['xdotool', 'search', '--name', window_title,
'windowactivate', 'key', 'i',
'type', 'test test 2\n'])
subprocess.check_call(
['xdotool', 'search', '--name', window_title,
'key', 'Escape', 'colon', 'w', 'q', 'Return'])
else:
self.fail("Unknown editor window: {}".format(window_title))
@unittest.skipUnless(spawn.find_executable('xdotool'),
"xdotool not installed")
def test_030_edit_file(self):
testvm1 = self.qc.add_new_vm("QubesAppVm",
name=self.make_vm_name('vm1'),
template=self.qc.get_vm_by_name(
self.template))
testvm1.create_on_disk(verbose=False)
self.qc.save()
testvm1.start()
testvm1.run("echo test1 > /home/user/test.txt", wait=True)
self.qc.unlock_db()
p = testvm1.run("qvm-open-in-dvm /home/user/test.txt",
passio_popen=True)
wait_count = 0
winid = None
while True:
search = subprocess.Popen(['xdotool', 'search',
'--onlyvisible', '--class', 'disp*'],
stdout=subprocess.PIPE,
stderr=open(os.path.devnull, 'w'))
retcode = search.wait()
if retcode == 0:
winid = search.stdout.read().strip()
break
wait_count += 1
if wait_count > 100:
self.fail("Timeout while waiting for editor window")
time.sleep(0.3)
self._handle_editor(winid)
p.wait()
p = testvm1.run("cat /home/user/test.txt",
passio_popen=True)
(test_txt_content, _) = p.communicate()
self.assertEqual(test_txt_content, "test test 2\ntest1\n")
class TC_30_Gui_daemon(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
@unittest.skipUnless(spawn.find_executable('xdotool'),
"xdotool not installed")
def test_000_clipboard(self):
testvm1 = self.qc.add_new_vm("QubesAppVm",
name=self.make_vm_name('vm1'),
template=self.qc.get_default_template())
testvm1.create_on_disk(verbose=False)
testvm2 = self.qc.add_new_vm("QubesAppVm",
name=self.make_vm_name('vm2'),
template=self.qc.get_default_template())
testvm2.create_on_disk(verbose=False)
self.qc.save()
self.qc.unlock_db()
testvm1.start()
testvm2.start()
window_title = 'user@{}'.format(testvm1.name)
testvm1.run('zenity --text-info --editable --title={}'.format(
window_title))
self.wait_for_window(window_title)
time.sleep(0.5)
test_string = "test{}".format(testvm1.xid)
# Type and copy some text
subprocess.check_call(['xdotool', 'search', '--name', window_title,
'windowactivate',
'type', '{}'.format(test_string)])
# second xdotool call because type --terminator do not work (SEGV)
# additionally do not use search here, so window stack will be empty
# and xdotool will use XTEST instead of generating events manually -
# this will be much better - at least because events will have
# correct timestamp (so gui-daemon would not drop the copy request)
subprocess.check_call(['xdotool',
'key', 'ctrl+a', 'ctrl+c', 'ctrl+shift+c',
'Escape'])
clipboard_content = \
open('/var/run/qubes/qubes-clipboard.bin', 'r').read().strip()
self.assertEquals(clipboard_content, test_string,
"Clipboard copy operation failed - content")
clipboard_source = \
open('/var/run/qubes/qubes-clipboard.bin.source',
'r').read().strip()
self.assertEquals(clipboard_source, testvm1.name,
"Clipboard copy operation failed - owner")
# Then paste it to the other window
window_title = 'user@{}'.format(testvm2.name)
p = testvm2.run('zenity --entry --title={} > test.txt'.format(
window_title), passio_popen=True)
self.wait_for_window(window_title)
subprocess.check_call(['xdotool', 'key', '--delay', '100',
'ctrl+shift+v', 'ctrl+v', 'Return'])
p.wait()
# And compare the result
(test_output, _) = testvm2.run('cat test.txt',
passio_popen=True).communicate()
self.assertEquals(test_string, test_output.strip())
clipboard_content = \
open('/var/run/qubes/qubes-clipboard.bin', 'r').read().strip()
self.assertEquals(clipboard_content, "",
"Clipboard not wiped after paste - content")
clipboard_source = \
open('/var/run/qubes/qubes-clipboard.bin.source', 'r').read(
).strip()
self.assertEquals(clipboard_source, "",
"Clipboard not wiped after paste - owner")
@unittest.skipUnless(os.path.exists('/var/lib/qubes/vm-kernels/pvgrub2'),
'grub-xen package not installed')
class TC_40_PVGrub(qubes.tests.SystemTestsMixin):
def setUp(self):
super(TC_40_PVGrub, self).setUp()
supported = False
if self.template.startswith('fedora-'):
supported = True
elif self.template.startswith('debian-'):
supported = True
if not supported:
self.skipTest("Template {} not supported by this test".format(
self.template))
def install_packages(self, vm):
if self.template.startswith('fedora-'):
cmd_install1 = 'yum clean expire-cache && ' \
'yum install -y qubes-kernel-vm-support grub2-tools'
cmd_install2 = 'yum install -y kernel kernel-devel'
cmd_update_grub = 'grub2-mkconfig -o /boot/grub2/grub.cfg'
elif self.template.startswith('debian-'):
cmd_install1 = 'apt-get update && apt-get install -y ' \
'qubes-kernel-vm-support grub2-common'
cmd_install2 = 'apt-get install -y linux-image-amd64'
cmd_update_grub = 'mkdir /boot/grub && update-grub2'
else:
assert False, "Unsupported template?!"
for cmd in [cmd_install1, cmd_install2, cmd_update_grub]:
p = vm.run(cmd, user="root", passio_popen=True, passio_stderr=True)
p = self.testvm1.run("cat > allocator.c", passio_popen=True)
p.communicate(allocator_c)
p = self.testvm1.run("gcc allocator.c -o allocator",
passio_popen=True, passio_stderr=True)
(stdout, stderr) = p.communicate()
self.assertEquals(p.returncode, 0,
"Failed command: {}\nSTDOUT: {}\nSTDERR: {}"
.format(cmd, stdout, stderr))
if p.returncode != 0:
self.skipTest("allocator compile failed: {}".format(stderr))
def get_kernel_version(self, vm):
if self.template.startswith('fedora-'):
cmd_get_kernel_version = 'rpm -q kernel|sort -n|tail -1|' \
'cut -d - -f 2-'
elif self.template.startswith('debian-'):
cmd_get_kernel_version = \
'dpkg-query --showformat=\'${Package}\\n\' --show ' \
'\'linux-image-*-amd64\'|sort -n|tail -1|cut -d - -f 3-'
else:
raise RuntimeError("Unsupported template?!")
# drop caches to have even more memory pressure
self.testvm1.run("echo 3 > /proc/sys/vm/drop_caches",
user="root", wait=True)
p = vm.run(cmd_get_kernel_version, user="root", passio_popen=True)
(kver, _) = p.communicate()
self.assertEquals(p.returncode, 0,
"Failed command: {}".format(cmd_get_kernel_version))
return kver.strip()
def test_000_standalone_vm(self):
testvm1 = self.qc.add_new_vm("QubesAppVm",
template=None,
name=self.make_vm_name('vm1'))
testvm1.create_on_disk(verbose=False,
source_template=self.qc.get_vm_by_name(
self.template))
self.save_and_reload_db()
self.qc.unlock_db()
testvm1 = self.qc[testvm1.qid]
testvm1.start()
self.install_packages(testvm1)
kver = self.get_kernel_version(testvm1)
self.shutdown_and_wait(testvm1)
self.qc.lock_db_for_writing()
self.qc.load()
testvm1 = self.qc[testvm1.qid]
testvm1.kernel = 'pvgrub2'
self.save_and_reload_db()
self.qc.unlock_db()
testvm1 = self.qc[testvm1.qid]
testvm1.start()
p = testvm1.run('uname -r', passio_popen=True)
(actual_kver, _) = p.communicate()
self.assertEquals(actual_kver.strip(), kver)
def test_010_template_based_vm(self):
test_template = self.qc.add_new_vm("QubesTemplateVm",
template=None,
name=self.make_vm_name('template'))
test_template.clone_attrs(self.qc.get_vm_by_name(self.template))
test_template.clone_disk_files(
src_vm=self.qc.get_vm_by_name(self.template),
verbose=False)
testvm1 = self.qc.add_new_vm("QubesAppVm",
template=test_template,
name=self.make_vm_name('vm1'))
testvm1.create_on_disk(verbose=False,
source_template=test_template)
self.save_and_reload_db()
self.qc.unlock_db()
test_template = self.qc[test_template.qid]
testvm1 = self.qc[testvm1.qid]
test_template.start()
self.install_packages(test_template)
kver = self.get_kernel_version(test_template)
self.shutdown_and_wait(test_template)
self.qc.lock_db_for_writing()
self.qc.load()
test_template = self.qc[test_template.qid]
test_template.kernel = 'pvgrub2'
testvm1 = self.qc[testvm1.qid]
testvm1.kernel = 'pvgrub2'
self.save_and_reload_db()
self.qc.unlock_db()
# Check if TemplateBasedVM boots and has the right kernel
testvm1 = self.qc[testvm1.qid]
testvm1.start()
p = testvm1.run('uname -r', passio_popen=True)
(actual_kver, _) = p.communicate()
self.assertEquals(actual_kver.strip(), kver)
# And the same for the TemplateVM itself
test_template = self.qc[test_template.qid]
test_template.start()
p = test_template.run('uname -r', passio_popen=True)
(actual_kver, _) = p.communicate()
self.assertEquals(actual_kver.strip(), kver)
@unittest.skipUnless(
spawn.find_executable('xprop') and
spawn.find_executable('xdotool') and
spawn.find_executable('wmctrl'),
"xprop or xdotool or wmctrl not installed")
class TC_50_MimeHandlers(qubes.tests.SystemTestsMixin):
@classmethod
def setUpClass(cls):
if cls.template == 'whonix-gw' or 'minimal' in cls.template:
raise unittest.SkipTest(
'Template {} not supported by this test'.format(cls.template))
if cls.template == 'whonix-ws':
# TODO remove when Whonix-based DispVMs will work (Whonix 13?)
raise unittest.SkipTest(
'Template {} not supported by this test'.format(cls.template))
qc = QubesVmCollection()
cls._kill_test_vms(qc, prefix=qubes.tests.CLSVMPREFIX)
qc.lock_db_for_writing()
qc.load()
cls._remove_test_vms(qc, qubes.qubes.vmm.libvirt_conn,
prefix=qubes.tests.CLSVMPREFIX)
cls.source_vmname = cls.make_vm_name('source', True)
source_vm = qc.add_new_vm("QubesAppVm",
template=qc.get_vm_by_name(cls.template),
name=cls.source_vmname)
source_vm.create_on_disk(verbose=False)
cls.target_vmname = cls.make_vm_name('target', True)
target_vm = qc.add_new_vm("QubesAppVm",
template=qc.get_vm_by_name(cls.template),
name=cls.target_vmname)
target_vm.create_on_disk(verbose=False)
qc.save()
qc.unlock_db()
source_vm.start()
target_vm.start()
# make sure that DispVMs will be started of the same template
retcode = subprocess.call(['/usr/bin/qvm-create-default-dvm',
cls.template],
stderr=open(os.devnull, 'w'))
assert retcode == 0, "Error preparing DispVM"
def setUp(self):
super(TC_50_MimeHandlers, self).setUp()
self.source_vm = self.qc.get_vm_by_name(self.source_vmname)
self.target_vm = self.qc.get_vm_by_name(self.target_vmname)
def get_window_class(self, winid, dispvm=False):
(vm_winid, _) = subprocess.Popen(
['xprop', '-id', winid, '_QUBES_VMWINDOWID'],
stdout=subprocess.PIPE
).communicate()
vm_winid = vm_winid.split("#")[1].strip('\n" ')
if dispvm:
(vmname, _) = subprocess.Popen(
['xprop', '-id', winid, '_QUBES_VMNAME'],
stdout=subprocess.PIPE
).communicate()
vmname = vmname.split("=")[1].strip('\n" ')
window_class = None
while window_class is None:
# XXX to use self.qc.get_vm_by_name would require reloading
# qubes.xml, so use qvm-run instead
xprop = subprocess.Popen(
['qvm-run', '-p', vmname, 'xprop -id {} WM_CLASS'.format(
vm_winid)], stdout=subprocess.PIPE)
(window_class, _) = xprop.communicate()
if xprop.returncode != 0:
self.skipTest("xprop failed, not installed?")
if 'not found' in window_class:
# WM_CLASS not set yet, wait a little
time.sleep(0.1)
window_class = None
else:
window_class = None
while window_class is None:
xprop = self.target_vm.run(
'xprop -id {} WM_CLASS'.format(vm_winid),
# now fragment all free memory
p = self.testvm1.run("grep ^MemFree: /proc/meminfo|awk '{print $2}'",
passio_popen=True)
(window_class, _) = xprop.communicate()
if xprop.returncode != 0:
self.skipTest("xprop failed, not installed?")
if 'not found' in window_class:
# WM_CLASS not set yet, wait a little
time.sleep(0.1)
window_class = None
# output: WM_CLASS(STRING) = "gnome-terminal-server", "Gnome-terminal"
try:
window_class = window_class.split("=")[1].split(",")[0].strip('\n" ')
except IndexError:
raise Exception(
"Unexpected output from xprop: '{}'".format(window_class))
memory_pages = int(p.communicate()[0].strip())
memory_pages /= 4 # 4k pages
alloc1 = self.testvm1.run(
"ulimit -l unlimited; exec /home/user/allocator {}".format(
memory_pages),
user="root", passio_popen=True, passio_stderr=True)
# wait for memory being allocated; can't use just .read(), because EOF
# passing is unreliable while the process is still running
alloc1.stdin.write("\n")
alloc1.stdin.flush()
alloc_out = alloc1.stdout.read(len("Stage1\nStage2\nStage3\n"))
return window_class
if "Stage3" not in alloc_out:
# read stderr only in case of failed assert, but still have nice
# failure message (don't use self.fail() directly)
self.assertIn("Stage3", alloc_out, alloc1.stderr.read())
def open_file_and_check_viewer(self, filename, expected_app_titles,
expected_app_classes, dispvm=False):
self.qc.unlock_db()
if dispvm:
p = self.source_vm.run("qvm-open-in-dvm {}".format(filename),
# now, launch some window - it should get fragmented composition buffer
# it is important to have some changing content there, to generate
# content update events (aka damage notify)
proc = self.testvm1.run("gnome-terminal --full-screen -e top",
passio_popen=True)
vmpattern = "disp*"
else:
self.qrexec_policy('qubes.OpenInVM', self.source_vm.name,
self.target_vmname)
self.qrexec_policy('qubes.OpenURL', self.source_vm.name,
self.target_vmname)
p = self.source_vm.run("qvm-open-in-vm {} {}".format(
self.target_vmname, filename), passio_popen=True)
vmpattern = self.target_vmname
wait_count = 0
winid = None
window_title = None
while True:
search = subprocess.Popen(['xdotool', 'search',
'--onlyvisible', '--class', vmpattern],
stdout=subprocess.PIPE,
stderr=open(os.path.devnull, 'w'))
retcode = search.wait()
if retcode == 0:
winid = search.stdout.read().strip()
# get window title
(window_title, _) = subprocess.Popen(
['xdotool', 'getwindowname', winid], stdout=subprocess.PIPE). \
communicate()
window_title = window_title.strip()
# ignore LibreOffice splash screen and window with no title
# set yet
if window_title and not window_title.startswith("LibreOffice")\
and not window_title == 'VMapp command':
break
wait_count += 1
if wait_count > 100:
self.fail("Timeout while waiting for editor window")
time.sleep(0.3)
# get window class
window_class = self.get_window_class(winid, dispvm)
# close the window - we've got the window class, it is no longer needed
subprocess.check_call(['wmctrl', '-i', '-c', winid])
p.wait()
self.wait_for_window(window_title, show=False)
# help xdotool a little...
time.sleep(2)
# get window ID
search = subprocess.Popen(['xdotool', 'search', '--sync',
'--onlyvisible', '--class', self.testvm1.name + ':.*erminal'],
stdout=subprocess.PIPE)
winid = search.communicate()[0].strip()
xprop = subprocess.Popen(['xprop', '-notype', '-id', winid,
'_QUBES_VMWINDOWID'], stdout=subprocess.PIPE)
vm_winid = xprop.stdout.read().strip().split(' ')[4]
def check_matches(obj, patterns):
return any((pat.search(obj) if isinstance(pat, type(re.compile('')))
else pat in obj) for pat in patterns)
# now free the fragmented memory and trigger compaction
alloc1.stdin.write("\n")
alloc1.wait()
self.testvm1.run("echo 1 > /proc/sys/vm/compact_memory", user="root")
if not check_matches(window_title, expected_app_titles) and \
not check_matches(window_class, expected_app_classes):
self.fail("Opening file {} resulted in window '{} ({})', which is "
"none of {!r} ({!r})".format(
filename, window_title, window_class,
expected_app_titles, expected_app_classes))
# now window may be already "broken"; to be sure, allocate (=zero)
# some memory
alloc2 = self.testvm1.run(
"ulimit -l unlimited; /home/user/allocator {}".format(memory_pages),
user="root", passio_popen=True, passio_stderr=True)
alloc2.stdout.read(len("Stage1\n"))
def prepare_txt(self, filename):
p = self.source_vm.run("cat > {}".format(filename), passio_popen=True)
p.stdin.write("This is test\n")
p.stdin.close()
retcode = p.wait()
assert retcode == 0, "Failed to write {} file".format(filename)
# wait for damage notify - top updates every 3 sec by default
time.sleep(6)
def prepare_pdf(self, filename):
self.prepare_txt("/tmp/source.txt")
cmd = "convert /tmp/source.txt {}".format(filename)
retcode = self.source_vm.run(cmd, wait=True)
assert retcode == 0, "Failed to run '{}'".format(cmd)
# now take screenshot of the window, from dom0 and VM
# choose pnm format, as it doesn't have any useless metadata - easy
# to compare
p = self.testvm1.run("import -window {} pnm:-".format(vm_winid),
passio_popen=True, passio_stderr=True)
(vm_image, stderr) = p.communicate()
if p.returncode != 0:
raise Exception("Failed to get VM window image: {}".format(
stderr))
def prepare_doc(self, filename):
self.prepare_txt("/tmp/source.txt")
cmd = "unoconv -f doc -o {} /tmp/source.txt".format(filename)
retcode = self.source_vm.run(cmd, wait=True)
if retcode != 0:
self.skipTest("Failed to run '{}', not installed?".format(cmd))
p = subprocess.Popen(["import", "-window", winid, "pnm:-"],
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
(dom0_image, stderr) = p.communicate()
if p.returncode != 0:
raise Exception("Failed to get dom0 window image: {}".format(
stderr))
def prepare_pptx(self, filename):
self.prepare_txt("/tmp/source.txt")
cmd = "unoconv -f pptx -o {} /tmp/source.txt".format(filename)
retcode = self.source_vm.run(cmd, wait=True)
if retcode != 0:
self.skipTest("Failed to run '{}', not installed?".format(cmd))
def prepare_png(self, filename):
self.prepare_txt("/tmp/source.txt")
cmd = "convert /tmp/source.txt {}".format(filename)
retcode = self.source_vm.run(cmd, wait=True)
if retcode != 0:
self.skipTest("Failed to run '{}', not installed?".format(cmd))
def prepare_jpg(self, filename):
self.prepare_txt("/tmp/source.txt")
cmd = "convert /tmp/source.txt {}".format(filename)
retcode = self.source_vm.run(cmd, wait=True)
if retcode != 0:
self.skipTest("Failed to run '{}', not installed?".format(cmd))
def test_000_txt(self):
filename = "/home/user/test_file.txt"
self.prepare_txt(filename)
self.open_file_and_check_viewer(filename, ["vim"],
["gedit", "emacs"])
def test_001_pdf(self):
filename = "/home/user/test_file.pdf"
self.prepare_pdf(filename)
self.open_file_and_check_viewer(filename, [],
["evince"])
def test_002_doc(self):
filename = "/home/user/test_file.doc"
self.prepare_doc(filename)
self.open_file_and_check_viewer(filename, [],
["libreoffice", "abiword"])
def test_003_pptx(self):
filename = "/home/user/test_file.pptx"
self.prepare_pptx(filename)
self.open_file_and_check_viewer(filename, [],
["libreoffice"])
def test_004_png(self):
filename = "/home/user/test_file.png"
self.prepare_png(filename)
self.open_file_and_check_viewer(filename, [],
["shotwell", "eog", "display"])
def test_005_jpg(self):
filename = "/home/user/test_file.jpg"
self.prepare_jpg(filename)
self.open_file_and_check_viewer(filename, [],
["shotwell", "eog", "display"])
def test_006_jpeg(self):
filename = "/home/user/test_file.jpeg"
self.prepare_jpg(filename)
self.open_file_and_check_viewer(filename, [],
["shotwell", "eog", "display"])
def test_010_url(self):
self.open_file_and_check_viewer("https://www.qubes-os.org/", [],
["Firefox", "Iceweasel"])
def test_100_txt_dispvm(self):
filename = "/home/user/test_file.txt"
self.prepare_txt(filename)
self.open_file_and_check_viewer(filename, ["vim"],
["gedit", "emacs"],
dispvm=True)
def test_101_pdf_dispvm(self):
filename = "/home/user/test_file.pdf"
self.prepare_pdf(filename)
self.open_file_and_check_viewer(filename, [],
["evince"],
dispvm=True)
def test_102_doc_dispvm(self):
filename = "/home/user/test_file.doc"
self.prepare_doc(filename)
self.open_file_and_check_viewer(filename, [],
["libreoffice", "abiword"],
dispvm=True)
def test_103_pptx_dispvm(self):
filename = "/home/user/test_file.pptx"
self.prepare_pptx(filename)
self.open_file_and_check_viewer(filename, [],
["libreoffice"],
dispvm=True)
def test_104_png_dispvm(self):
filename = "/home/user/test_file.png"
self.prepare_png(filename)
self.open_file_and_check_viewer(filename, [],
["shotwell", "eog", "display"],
dispvm=True)
def test_105_jpg_dispvm(self):
filename = "/home/user/test_file.jpg"
self.prepare_jpg(filename)
self.open_file_and_check_viewer(filename, [],
["shotwell", "eog", "display"],
dispvm=True)
def test_106_jpeg_dispvm(self):
filename = "/home/user/test_file.jpeg"
self.prepare_jpg(filename)
self.open_file_and_check_viewer(filename, [],
["shotwell", "eog", "display"],
dispvm=True)
def test_110_url_dispvm(self):
self.open_file_and_check_viewer("https://www.qubes-os.org/", [],
["Firefox", "Iceweasel"],
dispvm=True)
if vm_image != dom0_image:
self.fail("Dom0 window doesn't match VM window content")
def load_tests(loader, tests, pattern):
@ -1651,21 +1025,4 @@ def load_tests(loader, tests, pattern):
(TC_00_AppVMMixin, qubes.tests.QubesTestCase),
{'template': template})))
tests.addTests(loader.loadTestsFromTestCase(
type(
'TC_20_DispVM_' + template,
(TC_20_DispVMMixin, qubes.tests.QubesTestCase),
{'template': template})))
tests.addTests(loader.loadTestsFromTestCase(
type(
'TC_40_PVGrub_' + template,
(TC_40_PVGrub, qubes.tests.QubesTestCase),
{'template': template})))
tests.addTests(loader.loadTestsFromTestCase(
type(
'TC_50_MimeHandlers_' + template,
(TC_50_MimeHandlers, qubes.tests.QubesTestCase),
{'template': template})))
return tests

View File

@ -1 +1 @@
3.2.3
3.2.8