123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500 |
- # pylint: disable=invalid-name
- #
- # The Qubes OS Project, https://www.qubes-os.org/
- #
- # Copyright (C) 2014-2015
- # Marek Marczykowski-Górecki <marmarek@invisiblethingslab.com>
- # Copyright (C) 2015 Wojtek Porczyk <woju@invisiblethingslab.com>
- #
- # This program is free software; you can redistribute it and/or modify
- # it under the terms of the GNU General Public License as published by
- # the Free Software Foundation; either version 2 of the License, or
- # (at your option) any later version.
- #
- # This program is distributed in the hope that it will be useful,
- # but WITHOUT ANY WARRANTY; without even the implied warranty of
- # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- # GNU General Public License for more details.
- #
- # You should have received a copy of the GNU General Public License along
- # with this program; if not, write to the Free Software Foundation, Inc.,
- # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
- #
- from distutils import spawn
- import asyncio
- import os
- import subprocess
- import tempfile
- import time
- import unittest
- import qubes
- import qubes.firewall
- import qubes.tests
- import qubes.vm.appvm
- import qubes.vm.qubesvm
- import qubes.vm.standalonevm
- import qubes.vm.templatevm
- import libvirt # pylint: disable=import-error
class TC_00_Basic(qubes.tests.SystemTestCase):
    """Basic system tests: app object, AppVM creation and a QubesDB watch."""

    def setUp(self):
        """Run the base-class setup, then initialise the default template."""
        super().setUp()
        self.init_default_template()

    def test_000_qubes_create(self):
        """``self.app`` provided by the fixture is a ``qubes.Qubes`` object."""
        self.assertIsInstance(self.app, qubes.Qubes)

    def test_100_qvm_create(self):
        """Register an AppVM, create it on disk and verify its storage."""
        name = self.make_vm_name('appvm')
        appvm = self.app.add_new_vm(
            qubes.vm.appvm.AppVM,
            name=name,
            template=self.app.default_template,
            label='red')

        # The registered domain must carry the settings we asked for.
        self.assertIsNotNone(appvm)
        self.assertEqual(appvm.name, name)
        self.assertEqual(appvm.template, self.app.default_template)

        run = self.loop.run_until_complete
        run(appvm.create_on_disk())
        with self.assertNotRaises(qubes.exc.QubesException):
            run(appvm.storage.verify())

    def test_040_qdb_watch(self):
        """Writing to a watched QubesDB path triggers the event handler."""
        watched_path = '/test-watch-path'
        fired = set()

        # Parameter names are kept as-is: the event machinery may pass
        # them by keyword.
        def handler(vm, event, path):
            # pylint: disable=unused-argument
            if path != watched_path:
                return
            fired.add(True)

        # NOTE(review): domain with qid 0 is presumably dom0 - confirm.
        domain = self.app.domains[0]
        domain.watch_qdb_path(watched_path)
        domain.add_handler('domain-qdb-change:/test-watch-path', handler)

        # Nothing has been written yet, so the handler must not have run.
        self.assertFalse(fired)

        domain.untrusted_qdb.write(watched_path, 'test-value')
        # Let the event loop run briefly so the change event can be handled.
        self.loop.run_until_complete(asyncio.sleep(0.1))
        self.assertTrue(fired)
class TC_01_Properties(qubes.tests.SystemTestCase):
    """Tests for cloning VM properties and for VM name conflicts."""
    # pylint: disable=attribute-defined-outside-init

    def setUp(self):
        """Create one AppVM (``self.vm`` / ``self.vmname``) on disk."""
        super().setUp()
        self.init_default_template()
        self.vmname = self.make_vm_name('appvm')
        self.vm = self.app.add_new_vm(qubes.vm.appvm.AppVM, name=self.vmname,
                                      template=self.app.default_template,
                                      label='red')
        self.loop.run_until_complete(self.vm.create_on_disk())

    def _assert_same_properties(self, original, clone):
        """Assert that *clone* carries the same settings as *original*.

        Compares a fixed set of property values, whether selected
        properties are at their default, and the firewall rules.

        :param original: VM the properties were cloned from
        :param clone: VM the properties were cloned to
        """
        self.assertEqual(original.label, clone.label)
        self.assertEqual(original.netvm, clone.netvm)
        self.assertEqual(original.property_is_default('netvm'),
                         clone.property_is_default('netvm'))
        self.assertEqual(original.kernel, clone.kernel)
        self.assertEqual(original.kernelopts, clone.kernelopts)
        for prop in ('kernel', 'kernelopts'):
            self.assertEqual(original.property_is_default(prop),
                             clone.property_is_default(prop))
        for attr in ('memory', 'maxmem', 'devices', 'include_in_backups',
                     'default_user', 'features'):
            self.assertEqual(getattr(original, attr), getattr(clone, attr))
        self.assertEqual(original.firewall.rules, clone.firewall.rules)

    @unittest.expectedFailure
    def test_030_clone(self):
        """Clone a VM with default and with non-default properties."""
        testvm1 = self.app.add_new_vm(
            qubes.vm.appvm.AppVM,
            name=self.make_vm_name("vm"),
            template=self.app.default_template,
            label='red')
        self.loop.run_until_complete(testvm1.create_on_disk())
        testvm2 = self.app.add_new_vm(testvm1.__class__,
                                      name=self.make_vm_name("clone"),
                                      template=testvm1.template,
                                      label='red')
        testvm2.clone_properties(testvm1)
        self.loop.run_until_complete(testvm2.clone_disk_files(testvm1))
        self.assertTrue(
            self.loop.run_until_complete(testvm1.storage.verify()))
        self.assertIn('source', testvm1.volumes['root'].config)
        self.assertNotEqual(testvm2, None)
        self.assertNotEqual(testvm2.volumes, {})
        self.assertIn('source', testvm2.volumes['root'].config)

        # Save qubes.xml and look both domains up again by qid.
        self.app.save()
        testvm1 = self.app.domains[testvm1.qid]
        testvm2 = self.app.domains[testvm2.qid]
        self._assert_same_properties(testvm1, testvm2)

        # now some non-default values
        testvm1.netvm = None
        testvm1.label = 'orange'
        testvm1.memory = 512
        firewall = testvm1.firewall
        firewall.rules = [
            qubes.firewall.Rule(None, action='accept', dsthost='1.2.3.0/24',
                                proto='tcp', dstports=22)]
        firewall.save()

        testvm3 = self.app.add_new_vm(testvm1.__class__,
                                      name=self.make_vm_name("clone2"),
                                      template=testvm1.template,
                                      label='red',)
        testvm3.clone_properties(testvm1)
        self.loop.run_until_complete(testvm3.clone_disk_files(testvm1))

        # Save qubes.xml and look both domains up again by qid.
        self.app.save()
        testvm1 = self.app.domains[testvm1.qid]
        testvm3 = self.app.domains[testvm3.qid]
        # The second clone (testvm3) is the one that must match here,
        # including the firewall rules set above.
        self._assert_same_properties(testvm1, testvm3)

    def test_020_name_conflict_app(self):
        """Adding a second AppVM under an already used name must fail."""
        # TODO decide what exception should be here
        with self.assertRaises((qubes.exc.QubesException, ValueError)):
            self.vm2 = self.app.add_new_vm(
                qubes.vm.appvm.AppVM,
                name=self.vmname, template=self.app.default_template,
                label='red')
            self.loop.run_until_complete(self.vm2.create_on_disk())

    def test_021_name_conflict_template(self):
        """Adding a TemplateVM under an already used name must fail."""
        # TODO decide what exception should be here
        with self.assertRaises((qubes.exc.QubesException, ValueError)):
            self.vm2 = self.app.add_new_vm(
                qubes.vm.templatevm.TemplateVM,
                name=self.vmname, label='red')
            self.loop.run_until_complete(self.vm2.create_on_disk())
class TC_02_QvmPrefs(qubes.tests.SystemTestCase):
    """Tests driving the ``qvm-prefs`` command line tool."""
    # pylint: disable=attribute-defined-outside-init

    def setUp(self):
        """Initialise the default template and the shared CLI options."""
        super().setUp()
        self.init_default_template()
        self.sharedopts = ['--qubesxml', qubes.tests.XMLPATH]

    def setup_appvm(self):
        """Create an AppVM on disk and store it as ``self.testvm``."""
        self.testvm = self.app.add_new_vm(
            qubes.vm.appvm.AppVM,
            name=self.make_vm_name("vm"),
            label='red')
        self.loop.run_until_complete(self.testvm.create_on_disk())
        self.app.save()

    def setup_hvm(self):
        """Create an AppVM with ``virt_mode = 'hvm'`` as ``self.testvm``."""
        self.testvm = self.app.add_new_vm(
            qubes.vm.appvm.AppVM,
            name=self.make_vm_name("hvm"),
            label='red')
        self.testvm.virt_mode = 'hvm'
        self.loop.run_until_complete(self.testvm.create_on_disk())
        self.app.save()

    def pref_set(self, name, value, valid=True):
        """Synchronous wrapper around :py:meth:`_pref_set`."""
        self.loop.run_until_complete(self._pref_set(name, value, valid))

    async def _pref_set(self, name, value, valid=True):
        """Set property *name* of ``self.testvm`` to *value* via qvm-prefs.

        :param name: property name
        :param value: value passed on the command line; ``'-D'`` is passed \
            without the preceding ``--`` separator
        :param valid: if true, the command must exit with 0; otherwise it \
            must exit with a non-zero code
        """
        # NOTE(review): unlike _pref_get, self.sharedopts is not passed
        # here - confirm whether that is intentional.
        cmd = ['qvm-prefs']
        if value != '-D':
            cmd.append('--')
        cmd.extend((self.testvm.name, name, value))
        p = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE)
        (stdout, stderr) = await p.communicate()
        if valid:
            self.assertEqual(
                p.returncode, 0,
                "qvm-prefs .. '{}' '{}' failed: {}{}".format(
                    name, value, stdout, stderr))
        else:
            self.assertNotEqual(
                p.returncode, 0,
                "qvm-prefs should reject value '{}' for "
                "property '{}'".format(value, name))

    def pref_get(self, name):
        """Return the value of property *name* as printed by qvm-prefs.

        Synchronous wrapper around :py:meth:`_pref_get`.
        """
        return self.loop.run_until_complete(self._pref_get(name))

    async def _pref_get(self, name):
        """Run qvm-prefs for *name* and return its stripped output.

        The command must exit with 0. The output is decoded to ``str`` so
        it can be compared with the textual expectations used by
        :py:meth:`execute_tests`.
        """
        p = await asyncio.create_subprocess_exec(
            'qvm-prefs', *self.sharedopts, '--', self.testvm.name, name,
            stdout=subprocess.PIPE)
        (stdout, _) = await p.communicate()
        self.assertEqual(p.returncode, 0)
        return stdout.decode().strip()

    # (value to set, expected qvm-prefs output, is the value valid)
    bool_test_values = [
        ('true', 'True', True),
        ('False', 'False', True),
        ('0', 'False', True),
        ('1', 'True', True),
        ('invalid', '', False)
    ]

    def execute_tests(self, name, values):
        """
        Helper function, which executes tests for the given property.

        :param name: name of the property under test
        :param values: list of tuples (value, expected, valid),
            where 'value' is what should be set, 'expected' is what
            qvm-prefs should then return as the property value, and 'valid'
            marks valid and invalid values - if it's False, qvm-prefs
            should reject the value
        :return: None
        """
        for (value, expected, valid) in values:
            self.pref_set(name, value, valid)
            if valid:
                self.assertEqual(self.pref_get(name), expected)

    @unittest.skip('test not converted to core3 API')
    def test_006_template(self):
        """Set the 'template' property to a valid and an invalid value."""
        templates = [tpl for tpl in self.app.domains.values() if
                     isinstance(tpl, qubes.vm.templatevm.TemplateVM)]
        if not templates:
            self.skipTest("No templates installed")
        some_template = templates[0].name
        self.setup_appvm()
        self.execute_tests('template', [
            (some_template, some_template, True),
            ('invalid', '', False),
        ])

    @unittest.skip('test not converted to core3 API')
    def test_014_pcidevs(self):
        """Set the 'pcidevs' property to valid and invalid lists."""
        self.setup_appvm()
        self.execute_tests('pcidevs', [
            ('[]', '[]', True),
            ('[ "00:00.0" ]', "['00:00.0']", True),
            ('invalid', '', False),
            ('[invalid]', '', False),
            # TODO:
            # ('["12:12.0"]', '', False)
        ])

    @unittest.skip('test not converted to core3 API')
    def test_024_pv_reject_hvm_props(self):
        """A VM from setup_appvm() must reject these four properties."""
        self.setup_appvm()
        self.execute_tests('guiagent_installed', [('False', '', False)])
        self.execute_tests('qrexec_installed', [('False', '', False)])
        self.execute_tests('drive', [('/tmp/drive.img', '', False)])
        self.execute_tests('timezone', [('localtime', '', False)])

    @unittest.skip('test not converted to core3 API')
    def test_025_hvm_reject_pv_props(self):
        """A VM from setup_hvm() must reject 'kernel' and 'kernelopts'."""
        self.setup_hvm()
        self.execute_tests('kernel', [('default', '', False)])
        self.execute_tests('kernelopts', [('default', '', False)])
class TC_03_QvmRevertTemplateChanges(qubes.tests.SystemTestCase):
    """Tests reverting a template's root volume with ``qvm-block revert``."""
    # pylint: disable=attribute-defined-outside-init

    def setUp(self):
        """Run the base-class setup, then initialise the default template."""
        super().setUp()
        self.init_default_template()

    def setup_pv_template(self):
        """Clone the default template into ``self.test_template``."""
        self.test_template = self.app.add_new_vm(
            qubes.vm.templatevm.TemplateVM,
            name=self.make_vm_name("pv-clone"),
            label='red'
        )
        self.test_template.clone_properties(self.app.default_template)
        self.loop.run_until_complete(
            self.test_template.clone_disk_files(self.app.default_template))
        self.app.save()

    def setup_hvm_template(self):
        """Create a new TemplateVM with ``virt_mode='hvm'`` on disk."""
        self.test_template = self.app.add_new_vm(
            qubes.vm.templatevm.TemplateVM,
            name=self.make_vm_name("hvm"),
            label='red',
            virt_mode='hvm',
        )
        self.loop.run_until_complete(self.test_template.create_on_disk())
        self.app.save()

    def get_rootimg_checksum(self):
        """Return the raw ``sha1sum`` output for the template's root volume.

        :raises subprocess.CalledProcessError: if ``sha1sum`` exits with a \
            non-zero code; otherwise two failed runs would both yield empty \
            output and compare as equal
        """
        return subprocess.check_output(
            ['sha1sum', self.test_template.volumes['root'].path])

    def _do_test(self):
        """Start and stop the template, revert its root volume, compare.

        The root volume checksum after the revert must equal the checksum
        taken before the template was started.
        """
        checksum_before = self.get_rootimg_checksum()
        self.loop.run_until_complete(self.test_template.start())
        self.shutdown_and_wait(self.test_template)
        checksum_changed = self.get_rootimg_checksum()
        if checksum_before == checksum_changed:
            # Without a modification the final comparison proves little.
            self.log.warning("template not modified, test result will be "
                             "unreliable")
        self.assertNotEqual(self.test_template.volumes['root'].revisions, {})

        # The volume's repr, with surrounding quotes stripped, is used as
        # the volume identifier on the command line.
        pool_vid = repr(self.test_template.volumes['root']).strip("'")
        revert_cmd = ['qvm-block', 'revert', pool_vid]
        p = self.loop.run_until_complete(asyncio.create_subprocess_exec(
            *revert_cmd))
        self.loop.run_until_complete(p.wait())
        self.assertEqual(p.returncode, 0)

        checksum_after = self.get_rootimg_checksum()
        self.assertEqual(checksum_before, checksum_after)

    @unittest.expectedFailure
    def test_000_revert_pv(self):
        """
        Test reverting root volume changes for a PV template
        """
        self.setup_pv_template()
        self._do_test()

    @unittest.skip('HVM not yet implemented')
    def test_000_revert_hvm(self):
        """
        Test reverting root volume changes for an HVM template
        """
        # TODO: have some system there, so the root.img will get modified
        self.setup_hvm_template()
        self._do_test()
class TC_30_Gui_daemon(qubes.tests.SystemTestCase):
    """GUI daemon tests: inter-VM clipboard copy and paste."""

    def setUp(self):
        """Run the base-class setup, then initialise the default template."""
        super().setUp()
        self.init_default_template()

    @staticmethod
    def _read_stripped(path):
        """Return the text content of *path* with surrounding whitespace
        stripped, closing the file afterwards."""
        with open(path, 'r') as handle:
            return handle.read().strip()

    @unittest.skipUnless(
        spawn.find_executable('xdotool'),
        "xdotool not installed")
    def test_000_clipboard(self):
        """Copy text in one VM, paste it in another, check the clipboard.

        After the copy, the clipboard file must hold the typed text and the
        source file the name of the first VM; after the paste both files
        must be empty.
        """
        testvm1 = self.app.add_new_vm(qubes.vm.appvm.AppVM,
                                      name=self.make_vm_name('vm1'),
                                      label='red')
        self.loop.run_until_complete(testvm1.create_on_disk())
        testvm2 = self.app.add_new_vm(qubes.vm.appvm.AppVM,
                                      name=self.make_vm_name('vm2'),
                                      label='red')
        self.loop.run_until_complete(testvm2.create_on_disk())
        self.app.save()

        # asyncio.wait() no longer accepts bare coroutines on current
        # Python versions, so wrap them in futures first.
        start_tasks = [
            asyncio.ensure_future(testvm1.start(), loop=self.loop),
            asyncio.ensure_future(testvm2.start(), loop=self.loop)]
        self.loop.run_until_complete(asyncio.wait(start_tasks))
        # asyncio.wait() does not propagate exceptions; surface a failed
        # start here instead of timing out on the window below.
        for task in start_tasks:
            task.result()

        window_title = 'user@{}'.format(testvm1.name)
        self.loop.run_until_complete(testvm1.run(
            'zenity --text-info --editable --title={}'.format(window_title)))

        self.wait_for_window(window_title)
        time.sleep(0.5)
        test_string = "test{}".format(testvm1.xid)

        # Type and copy some text
        subprocess.check_call(['xdotool', 'search', '--name', window_title,
                               'windowactivate', '--sync',
                               'type', test_string])
        # second xdotool call because type --terminator does not work (SEGV)
        # additionally do not use search here, so window stack will be empty
        # and xdotool will use XTEST instead of generating events manually -
        # this will be much better - at least because events will have
        # correct timestamp (so gui-daemon would not drop the copy request)
        subprocess.check_call(['xdotool',
                               'key', 'ctrl+a', 'ctrl+c', 'ctrl+shift+c',
                               'Escape'])

        clipboard_content = self._read_stripped(
            '/var/run/qubes/qubes-clipboard.bin')
        self.assertEqual(clipboard_content, test_string,
                         "Clipboard copy operation failed - content")
        clipboard_source = self._read_stripped(
            '/var/run/qubes/qubes-clipboard.bin.source')
        self.assertEqual(clipboard_source, testvm1.name,
                         "Clipboard copy operation failed - owner")

        # Then paste it to the other window
        window_title = 'user@{}'.format(testvm2.name)
        p = self.loop.run_until_complete(testvm2.run(
            'zenity --entry --title={} > /tmp/test.txt'.format(window_title)))
        self.wait_for_window(window_title)

        subprocess.check_call(['xdotool', 'key', '--delay', '100',
                               'ctrl+shift+v', 'ctrl+v', 'Return'])
        self.loop.run_until_complete(p.wait())

        # And compare the result
        (test_output, _) = self.loop.run_until_complete(
            testvm2.run_for_stdio('cat /tmp/test.txt'))
        self.assertEqual(test_string, test_output.strip().decode('ascii'))

        clipboard_content = self._read_stripped(
            '/var/run/qubes/qubes-clipboard.bin')
        self.assertEqual(clipboard_content, "",
                         "Clipboard not wiped after paste - content")
        clipboard_source = self._read_stripped(
            '/var/run/qubes/qubes-clipboard.bin.source')
        self.assertEqual(clipboard_source, "",
                         "Clipboard not wiped after paste - owner")
class TC_05_StandaloneVM(qubes.tests.SystemTestCase):
    """Tests for StandaloneVM: creation, start and root volume resize."""

    def setUp(self):
        """Run the base-class setup, then initialise the default template."""
        super().setUp()
        self.init_default_template()

    def _new_standalone(self):
        """Register a StandaloneVM with the 'qrexec' feature enabled, clone
        the default template's disk files into it and save the app."""
        standalone = self.app.add_new_vm(
            qubes.vm.standalonevm.StandaloneVM,
            name=self.make_vm_name('vm1'),
            label='red')
        standalone.features['qrexec'] = True
        self.loop.run_until_complete(
            standalone.clone_disk_files(self.app.default_template))
        self.app.save()
        return standalone

    def test_000_create_start(self):
        """A freshly created StandaloneVM starts and reports "Running"."""
        standalone = self._new_standalone()
        self.loop.run_until_complete(standalone.start())
        self.assertEqual(standalone.get_power_state(), "Running")

    def test_100_resize_root_img(self):
        """Resize the root volume to 20 GiB and check the size seen by
        ``df`` inside the running VM."""
        run = self.loop.run_until_complete
        standalone = self._new_standalone()

        run(standalone.storage.resize(
            standalone.volumes['root'], 20 * 1024 ** 3))
        self.assertEqual(standalone.volumes['root'].size, 20 * 1024 ** 3)

        run(standalone.start())
        # df reports the size in 1k-blocks
        df_output, _ = run(
            standalone.run_for_stdio('df --output=size /|tail -n 1'))
        reported_kblocks = int(df_output.strip())
        # some safety margin for FS metadata
        self.assertGreater(reported_kblocks, 19 * 1024 ** 2)
- # vim: ts=4 sw=4 et
|