core-admin/qubes/tests/int/basic.py

555 lines
22 KiB
Python
Raw Normal View History

2014-10-02 10:56:04 +02:00
#!/usr/bin/python
2015-02-05 15:46:40 +01:00
# vim: fileencoding=utf-8
# pylint: disable=invalid-name
2014-10-02 10:56:04 +02:00
#
2015-02-05 15:46:40 +01:00
# The Qubes OS Project, https://www.qubes-os.org/
2014-10-02 10:56:04 +02:00
#
2015-02-05 15:46:40 +01:00
# Copyright (C) 2014-2015
# Marek Marczykowski-Górecki <marmarek@invisiblethingslab.com>
# Copyright (C) 2015 Wojtek Porczyk <woju@invisiblethingslab.com>
2014-10-02 10:56:04 +02:00
#
2015-02-05 15:46:40 +01:00
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
2014-10-02 10:56:04 +02:00
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
2015-02-05 15:46:40 +01:00
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
2014-10-02 10:56:04 +02:00
#
from distutils import spawn
2015-02-05 15:46:40 +01:00
2014-10-02 10:56:04 +02:00
import os
import subprocess
import tempfile
import time
import unittest
2014-10-02 10:56:04 +02:00
import qubes
import qubes.tests
import qubes.vm.appvm
import qubes.vm.qubesvm
import qubes.vm.templatevm
import libvirt # pylint: disable=import-error
2015-07-08 04:26:11 +02:00
2014-10-02 10:56:04 +02:00
class TC_00_Basic(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
def setUp(self):
super(TC_00_Basic, self).setUp()
self.init_default_template()
def test_000_qubes_create(self):
self.assertIsInstance(self.app, qubes.Qubes)
def test_100_qvm_create(self):
2015-02-05 15:46:40 +01:00
vmname = self.make_vm_name('appvm')
vm = self.app.add_new_vm(qubes.vm.appvm.AppVM,
name=vmname, template=self.app.default_template,
label='red')
2014-10-02 10:56:04 +02:00
self.assertIsNotNone(vm)
self.assertEqual(vm.name, vmname)
self.assertEqual(vm.template, self.app.default_template)
vm.create_on_disk()
2015-02-05 15:46:40 +01:00
with self.assertNotRaises(qubes.exc.QubesException):
vm.storage.verify()
2014-10-02 10:56:04 +02:00
class TC_01_Properties(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
# pylint: disable=attribute-defined-outside-init
2014-10-02 10:56:04 +02:00
def setUp(self):
2015-02-05 15:46:40 +01:00
super(TC_01_Properties, self).setUp()
self.init_default_template()
2015-02-05 15:46:40 +01:00
self.vmname = self.make_vm_name('appvm')
self.vm = self.app.add_new_vm(qubes.vm.appvm.AppVM, name=self.vmname,
template=self.app.default_template,
label='red')
self.vm.create_on_disk()
2014-10-02 10:56:04 +02:00
def save_and_reload_db(self):
super(TC_01_Properties, self).save_and_reload_db()
if hasattr(self, 'vm'):
self.vm = self.app.domains[self.vm.qid]
if hasattr(self, 'netvm'):
self.netvm = self.app.domains[self.netvm.qid]
2015-02-05 15:46:40 +01:00
def test_000_rename(self):
newname = self.make_vm_name('newname')
2014-10-02 10:56:04 +02:00
self.assertEqual(self.vm.name, self.vmname)
self.vm.write_firewall_conf({'allow': False, 'allowDns': False})
self.vm.autostart = True
self.addCleanup(os.system,
'sudo systemctl -q disable qubes-vm@{}.service || :'.
format(self.vmname))
pre_rename_firewall = self.vm.get_firewall_conf()
2015-02-05 15:46:40 +01:00
with self.assertNotRaises(
(OSError, libvirt.libvirtError, qubes.exc.QubesException)):
self.vm.name = newname
2014-10-02 10:56:04 +02:00
self.assertEqual(self.vm.name, newname)
self.assertEqual(self.vm.dir_path,
2016-05-21 04:15:09 +02:00
os.path.join(
qubes.config.system_path['qubes_base_dir'],
qubes.config.system_path['qubes_appvms_dir'], newname))
2014-10-02 10:56:04 +02:00
self.assertTrue(os.path.exists(
os.path.join(self.vm.dir_path, "apps", newname + "-vm.directory")))
# FIXME: set whitelisted-appmenus.list first
self.assertTrue(os.path.exists(os.path.join(
self.vm.dir_path, "apps", newname + "-firefox.desktop")))
2014-10-02 10:56:04 +02:00
self.assertTrue(os.path.exists(
os.path.join(os.getenv("HOME"), ".local/share/desktop-directories",
2015-02-05 15:46:40 +01:00
newname + "-vm.directory")))
2014-10-02 10:56:04 +02:00
self.assertTrue(os.path.exists(
os.path.join(os.getenv("HOME"), ".local/share/applications",
2015-02-05 15:46:40 +01:00
newname + "-firefox.desktop")))
2014-10-02 10:56:04 +02:00
self.assertFalse(os.path.exists(
os.path.join(os.getenv("HOME"), ".local/share/desktop-directories",
2015-02-05 15:46:40 +01:00
self.vmname + "-vm.directory")))
2014-10-02 10:56:04 +02:00
self.assertFalse(os.path.exists(
os.path.join(os.getenv("HOME"), ".local/share/applications",
2015-02-05 15:46:40 +01:00
self.vmname + "-firefox.desktop")))
self.assertEquals(pre_rename_firewall, self.vm.get_firewall_conf())
with self.assertNotRaises((qubes.exc.QubesException, OSError)):
self.vm.write_firewall_conf({'allow': False})
self.assertTrue(self.vm.autostart)
self.assertTrue(os.path.exists(
'/etc/systemd/system/multi-user.target.wants/'
'qubes-vm@{}.service'.format(newname)))
self.assertFalse(os.path.exists(
'/etc/systemd/system/multi-user.target.wants/'
'qubes-vm@{}.service'.format(self.vmname)))
2015-02-05 15:46:40 +01:00
def test_001_rename_libvirt_undefined(self):
self.vm.libvirt_domain.undefine()
self.vm._libvirt_domain = None # pylint: disable=protected-access
newname = self.make_vm_name('newname')
with self.assertNotRaises(
(OSError, libvirt.libvirtError, qubes.exc.QubesException)):
self.vm.name = newname
2015-07-08 04:26:11 +02:00
def test_030_clone(self):
testvm1 = self.app.add_new_vm(
qubes.vm.appvm.AppVM,
2015-07-08 04:26:11 +02:00
name=self.make_vm_name("vm"),
template=self.app.default_template,
label='red')
testvm1.create_on_disk()
testvm2 = self.app.add_new_vm(testvm1.__class__,
2015-07-08 04:26:11 +02:00
name=self.make_vm_name("clone"),
template=testvm1.template,
label='red')
testvm2.clone_properties(testvm1)
testvm2.clone_disk_files(testvm1)
self.assertTrue(testvm1.storage.verify())
self.assertIn('source', testvm1.volumes['root'].config)
self.assertNotEquals(testvm2, None)
self.assertNotEquals(testvm2.volumes, {})
self.assertIn('source', testvm2.volumes['root'].config)
2015-07-08 04:26:11 +02:00
# qubes.xml reload
self.save_and_reload_db()
testvm1 = self.app.domains[testvm1.qid]
testvm2 = self.app.domains[testvm2.qid]
2015-07-08 04:26:11 +02:00
self.assertEquals(testvm1.label, testvm2.label)
self.assertEquals(testvm1.netvm, testvm2.netvm)
self.assertEquals(testvm1.property_is_default('netvm'),
testvm2.property_is_default('netvm'))
2015-07-08 04:26:11 +02:00
self.assertEquals(testvm1.kernel, testvm2.kernel)
self.assertEquals(testvm1.kernelopts, testvm2.kernelopts)
self.assertEquals(testvm1.property_is_default('kernel'),
testvm2.property_is_default('kernel'))
self.assertEquals(testvm1.property_is_default('kernelopts'),
testvm2.property_is_default('kernelopts'))
2015-07-08 04:26:11 +02:00
self.assertEquals(testvm1.memory, testvm2.memory)
self.assertEquals(testvm1.maxmem, testvm2.maxmem)
# TODO
# self.assertEquals(testvm1.pcidevs, testvm2.pcidevs)
2015-07-08 04:26:11 +02:00
self.assertEquals(testvm1.include_in_backups,
testvm2.include_in_backups)
self.assertEquals(testvm1.default_user, testvm2.default_user)
2016-03-03 13:03:27 +01:00
self.assertEquals(testvm1.features, testvm2.features)
2015-07-08 04:26:11 +02:00
self.assertEquals(testvm1.get_firewall_conf(),
testvm2.get_firewall_conf())
# now some non-default values
testvm1.netvm = None
testvm1.label = 'orange'
2015-07-08 04:26:11 +02:00
testvm1.memory = 512
firewall = testvm1.get_firewall_conf()
firewall['allowDns'] = False
firewall['allowYumProxy'] = False
firewall['rules'] = [{'address': '1.2.3.4',
'netmask': 24,
'proto': 'tcp',
'portBegin': 22,
'portEnd': 22,
}]
testvm1.write_firewall_conf(firewall)
testvm3 = self.app.add_new_vm(testvm1.__class__,
2015-07-08 04:26:11 +02:00
name=self.make_vm_name("clone2"),
template=testvm1.template,
label='red',)
testvm3.clone_properties(testvm1)
testvm3.clone_disk_files(testvm1)
2015-07-08 04:26:11 +02:00
# qubes.xml reload
self.save_and_reload_db()
testvm1 = self.app.domains[testvm1.qid]
testvm3 = self.app.domains[testvm3.qid]
2015-07-08 04:26:11 +02:00
self.assertEquals(testvm1.label, testvm3.label)
self.assertEquals(testvm1.netvm, testvm3.netvm)
self.assertEquals(testvm1.property_is_default('netvm'),
testvm3.property_is_default('netvm'))
2015-07-08 04:26:11 +02:00
self.assertEquals(testvm1.kernel, testvm3.kernel)
self.assertEquals(testvm1.kernelopts, testvm3.kernelopts)
self.assertEquals(testvm1.property_is_default('kernel'),
testvm3.property_is_default('kernel'))
self.assertEquals(testvm1.property_is_default('kernelopts'),
testvm3.property_is_default('kernelopts'))
2015-07-08 04:26:11 +02:00
self.assertEquals(testvm1.memory, testvm3.memory)
self.assertEquals(testvm1.maxmem, testvm3.maxmem)
# TODO
# self.assertEquals(testvm1.pcidevs, testvm3.pcidevs)
2015-07-08 04:26:11 +02:00
self.assertEquals(testvm1.include_in_backups,
testvm3.include_in_backups)
self.assertEquals(testvm1.default_user, testvm3.default_user)
2016-03-03 13:03:27 +01:00
self.assertEquals(testvm1.features, testvm3.features)
2015-07-08 04:26:11 +02:00
self.assertEquals(testvm1.get_firewall_conf(),
testvm3.get_firewall_conf())
def test_020_name_conflict_app(self):
# TODO decide what exception should be here
with self.assertRaises((qubes.exc.QubesException, ValueError)):
self.vm2 = self.app.add_new_vm(qubes.vm.appvm.AppVM,
name=self.vmname, template=self.app.default_template,
label='red')
self.vm2.create_on_disk()
def test_021_name_conflict_template(self):
# TODO decide what exception should be here
with self.assertRaises((qubes.exc.QubesException, ValueError)):
self.vm2 = self.app.add_new_vm(qubes.vm.templatevm.TemplateVM,
name=self.vmname, label='red')
self.vm2.create_on_disk()
def test_030_rename_conflict_app(self):
vm2name = self.make_vm_name('newname')
self.vm2 = self.app.add_new_vm(qubes.vm.appvm.AppVM,
name=vm2name, template=self.app.default_template, label='red')
self.vm2.create_on_disk()
with self.assertNotRaises(OSError):
with self.assertRaises(qubes.exc.QubesException):
self.vm2.name = self.vmname
class TC_02_QvmPrefs(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
# pylint: disable=attribute-defined-outside-init
def setUp(self):
super(TC_02_QvmPrefs, self).setUp()
self.init_default_template()
self.sharedopts = ['--qubesxml', qubes.tests.XMLPATH]
def setup_appvm(self):
self.testvm = self.app.add_new_vm(
qubes.vm.appvm.AppVM,
name=self.make_vm_name("vm"),
label='red')
self.testvm.create_on_disk()
self.save_and_reload_db()
def setup_hvm(self):
self.testvm = self.app.add_new_vm(
qubes.vm.appvm.AppVM,
name=self.make_vm_name("hvm"),
label='red')
self.testvm.hvm = True
self.testvm.create_on_disk()
self.save_and_reload_db()
def pref_set(self, name, value, valid=True):
p = subprocess.Popen(
['qvm-prefs'] + self.sharedopts +
(['--'] if value != '-D' else []) + [self.testvm.name, name, value],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
(stdout, stderr) = p.communicate()
if valid:
self.assertEquals(p.returncode, 0,
"qvm-prefs .. '{}' '{}' failed: {}{}".format(
name, value, stdout, stderr
))
else:
self.assertNotEquals(p.returncode, 0,
"qvm-prefs should reject value '{}' for "
"property '{}'".format(value, name))
def pref_get(self, name):
p = subprocess.Popen(['qvm-prefs'] + self.sharedopts +
['--', self.testvm.name, name], stdout=subprocess.PIPE)
(stdout, _) = p.communicate()
self.assertEquals(p.returncode, 0)
return stdout.strip()
bool_test_values = [
('true', 'True', True),
('False', 'False', True),
('0', 'False', True),
('1', 'True', True),
('invalid', '', False)
]
def execute_tests(self, name, values):
"""
Helper function, which executes tests for given property.
:param values: list of tuples (value, expected, valid),
where 'value' is what should be set and 'expected' is what should
qvm-prefs returns as a property value and 'valid' marks valid and
invalid values - if it's False, qvm-prefs should reject the value
:return: None
"""
for (value, expected, valid) in values:
self.pref_set(name, value, valid)
if valid:
self.assertEquals(self.pref_get(name), expected)
@unittest.skip('test not converted to core3 API')
def test_006_template(self):
templates = [tpl for tpl in self.app.domains.values() if
2016-06-16 21:27:56 +02:00
isinstance(tpl, qubes.vm.templatevm.TemplateVM)]
if not templates:
self.skipTest("No templates installed")
some_template = templates[0].name
self.setup_appvm()
self.execute_tests('template', [
(some_template, some_template, True),
('invalid', '', False),
])
@unittest.skip('test not converted to core3 API')
def test_014_pcidevs(self):
self.setup_appvm()
self.execute_tests('pcidevs', [
('[]', '[]', True),
('[ "00:00.0" ]', "['00:00.0']", True),
('invalid', '', False),
('[invalid]', '', False),
# TODO:
# ('["12:12.0"]', '', False)
])
@unittest.skip('test not converted to core3 API')
def test_024_pv_reject_hvm_props(self):
self.setup_appvm()
self.execute_tests('guiagent_installed', [('False', '', False)])
self.execute_tests('qrexec_installed', [('False', '', False)])
self.execute_tests('drive', [('/tmp/drive.img', '', False)])
self.execute_tests('timezone', [('localtime', '', False)])
@unittest.skip('test not converted to core3 API')
def test_025_hvm_reject_pv_props(self):
self.setup_hvm()
self.execute_tests('kernel', [('default', '', False)])
self.execute_tests('kernelopts', [('default', '', False)])
class TC_03_QvmRevertTemplateChanges(qubes.tests.SystemTestsMixin,
qubes.tests.QubesTestCase):
# pylint: disable=attribute-defined-outside-init
def setUp(self):
super(TC_03_QvmRevertTemplateChanges, self).setUp()
self.init_default_template()
def setup_pv_template(self):
self.test_template = self.app.add_new_vm(
qubes.vm.templatevm.TemplateVM,
name=self.make_vm_name("pv-clone"),
label='red'
)
self.test_template.clone_properties(self.app.default_template)
self.test_template.clone_disk_files(self.app.default_template)
self.save_and_reload_db()
def setup_hvm_template(self):
self.test_template = self.app.add_new_vm(
qubes.vm.templatevm.TemplateVM,
name=self.make_vm_name("hvm"),
label='red',
hvm=True
)
self.test_template.create_on_disk()
self.save_and_reload_db()
def get_rootimg_checksum(self):
2016-04-15 20:37:35 +02:00
p = subprocess.Popen(
['sha1sum', self.test_template.volumes['root'].path_cow],
2016-04-15 20:37:35 +02:00
stdout=subprocess.PIPE)
return p.communicate()[0]
def _do_test(self):
checksum_before = self.get_rootimg_checksum()
self.test_template.start()
self.shutdown_and_wait(self.test_template)
checksum_changed = self.get_rootimg_checksum()
if checksum_before == checksum_changed:
self.log.warning("template not modified, test result will be "
"unreliable")
self.assertNotEqual(self.test_template.volumes['root'].revisions, {})
with self.assertNotRaises(subprocess.CalledProcessError):
pool_vid = repr(self.test_template.volumes['root']).strip("'")
revert_cmd = ['qvm-block', 'revert', pool_vid]
subprocess.check_call(revert_cmd)
checksum_after = self.get_rootimg_checksum()
self.assertEquals(checksum_before, checksum_after)
def test_000_revert_pv(self):
"""
Test qvm-revert-template-changes for PV template
"""
self.setup_pv_template()
self._do_test()
@unittest.skip('HVM not yet implemented')
def test_000_revert_hvm(self):
"""
Test qvm-revert-template-changes for HVM template
"""
# TODO: have some system there, so the root.img will get modified
self.setup_hvm_template()
self._do_test()
@unittest.skip('test not converted to core3 API')
class TC_30_Gui_daemon(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
@unittest.skipUnless(
spawn.find_executable('xdotool'),
"xdotool not installed")
def test_000_clipboard(self):
testvm1 = self.qc.add_new_vm("QubesAppVm",
name=self.make_vm_name('vm1'),
template=self.qc.get_default_template())
testvm1.create_on_disk(verbose=False)
testvm2 = self.qc.add_new_vm("QubesAppVm",
name=self.make_vm_name('vm2'),
template=self.qc.get_default_template())
testvm2.create_on_disk(verbose=False)
self.qc.save()
self.qc.unlock_db()
testvm1.start()
testvm2.start()
window_title = 'user@{}'.format(testvm1.name)
testvm1.run('zenity --text-info --editable --title={}'.format(
window_title))
self.wait_for_window(window_title)
time.sleep(0.5)
test_string = "test{}".format(testvm1.xid)
# Type and copy some text
subprocess.check_call(['xdotool', 'search', '--name', window_title,
'windowactivate', '--sync',
'type', '{}'.format(test_string)])
# second xdotool call because type --terminator do not work (SEGV)
# additionally do not use search here, so window stack will be empty
# and xdotool will use XTEST instead of generating events manually -
# this will be much better - at least because events will have
# correct timestamp (so gui-daemon would not drop the copy request)
subprocess.check_call(['xdotool',
'key', 'ctrl+a', 'ctrl+c', 'ctrl+shift+c',
'Escape'])
clipboard_content = \
open('/var/run/qubes/qubes-clipboard.bin', 'r').read().strip()
self.assertEquals(clipboard_content, test_string,
"Clipboard copy operation failed - content")
clipboard_source = \
open('/var/run/qubes/qubes-clipboard.bin.source',
'r').read().strip()
self.assertEquals(clipboard_source, testvm1.name,
"Clipboard copy operation failed - owner")
# Then paste it to the other window
window_title = 'user@{}'.format(testvm2.name)
p = testvm2.run('zenity --entry --title={} > test.txt'.format(
window_title), passio_popen=True)
self.wait_for_window(window_title)
subprocess.check_call(['xdotool', 'key', '--delay', '100',
'ctrl+shift+v', 'ctrl+v', 'Return'])
p.wait()
# And compare the result
(test_output, _) = testvm2.run('cat test.txt',
passio_popen=True).communicate()
self.assertEquals(test_string, test_output.strip())
clipboard_content = \
open('/var/run/qubes/qubes-clipboard.bin', 'r').read().strip()
self.assertEquals(clipboard_content, "",
"Clipboard not wiped after paste - content")
clipboard_source = \
open('/var/run/qubes/qubes-clipboard.bin.source', 'r').read(
).strip()
self.assertEquals(clipboard_source, "",
"Clipboard not wiped after paste - owner")
class TC_05_StandaloneVM(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
def test_000_create_start(self):
testvm1 = self.qc.add_new_vm("QubesAppVm",
template=None,
name=self.make_vm_name('vm1'))
testvm1.create_on_disk(verbose=False,
source_template=self.qc.get_default_template())
self.qc.save()
self.qc.unlock_db()
testvm1.start()
self.assertEquals(testvm1.get_power_state(), "Running")
def test_100_resize_root_img(self):
testvm1 = self.qc.add_new_vm("QubesAppVm",
template=None,
name=self.make_vm_name('vm1'))
testvm1.create_on_disk(verbose=False,
source_template=self.qc.get_default_template())
self.qc.save()
self.qc.unlock_db()
with self.assertRaises(QubesException):
testvm1.resize_root_img(20*1024**3)
testvm1.resize_root_img(20*1024**3, allow_start=True)
timeout = 60
while testvm1.is_running():
time.sleep(1)
timeout -= 1
if timeout == 0:
self.fail("Timeout while waiting for VM shutdown")
self.assertEquals(testvm1.get_root_img_sz(), 20*1024**3)
testvm1.start()
p = testvm1.run('df --output=size /|tail -n 1',
passio_popen=True)
# new_size in 1k-blocks
(new_size, _) = p.communicate()
# some safety margin for FS metadata
self.assertGreater(int(new_size.strip()), 19*1024**2)
2015-02-05 15:46:40 +01:00
# vim: ts=4 sw=4 et