# pylint: disable=invalid-name # # The Qubes OS Project, https://www.qubes-os.org/ # # Copyright (C) 2014-2015 # Marek Marczykowski-Górecki # Copyright (C) 2015 Wojtek Porczyk # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public # License as published by the Free Software Foundation; either # version 2.1 of the License, or (at your option) any later version. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public # License along with this library; if not, see . # from distutils import spawn import asyncio import os import subprocess import tempfile import time import unittest import collections import pkg_resources import shutil import sys import qubes import qubes.firewall import qubes.tests import qubes.storage import qubes.vm.appvm import qubes.vm.qubesvm import qubes.vm.standalonevm import qubes.vm.templatevm import libvirt # pylint: disable=import-error class TC_00_Basic(qubes.tests.SystemTestCase): def setUp(self): super(TC_00_Basic, self).setUp() self.init_default_template() def test_000_qubes_create(self): self.assertIsInstance(self.app, qubes.Qubes) def test_100_qvm_create(self): vmname = self.make_vm_name('appvm') vm = self.app.add_new_vm(qubes.vm.appvm.AppVM, name=vmname, template=self.app.default_template, label='red') self.assertIsNotNone(vm) self.assertEqual(vm.name, vmname) self.assertEqual(vm.template, self.app.default_template) self.loop.run_until_complete(vm.create_on_disk()) with self.assertNotRaises(qubes.exc.QubesException): self.loop.run_until_complete(vm.storage.verify()) def test_040_qdb_watch(self): flag = set() def handler(vm, event, path): if path == '/test-watch-path': flag.add(True) vm = self.app.domains[0] vm.watch_qdb_path('/test-watch-path') vm.add_handler('domain-qdb-change:/test-watch-path', handler) self.assertFalse(flag) vm.untrusted_qdb.write('/test-watch-path', 'test-value') self.loop.run_until_complete(asyncio.sleep(0.1)) self.assertTrue(flag) @unittest.skipUnless( spawn.find_executable('xdotool'), "xdotool not installed") def test_120_start_standalone_with_cdrom_dom0(self): vmname = self.make_vm_name('appvm') self.vm = self.app.add_new_vm('StandaloneVM', label='red', name=vmname) self.loop.run_until_complete(self.vm.create_on_disk()) self.vm.kernel = None self.vm.virt_mode = 'hvm' iso_path = self.create_bootable_iso() # start the VM using qvm-start tool, to test --cdrom option there p = self.loop.run_until_complete(asyncio.create_subprocess_exec( 'qvm-start', '--cdrom=dom0:' + iso_path, self.vm.name, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)) (stdout, _) = self.loop.run_until_complete(p.communicate()) self.assertEqual(p.returncode, 0, stdout) # check if VM do not crash instantly self.loop.run_until_complete(asyncio.sleep(5)) self.assertTrue(self.vm.is_running()) # Type 'poweroff' subprocess.check_call(['xdotool', 'search', '--name', self.vm.name, 'type', '--window', '%1', 'poweroff\r']) for _ in range(10): if not self.vm.is_running(): break self.loop.run_until_complete(asyncio.sleep(1)) self.assertFalse(self.vm.is_running()) def _test_200_on_domain_start(self, vm, event, **_kwargs): '''Simulate domain crash just after startup''' vm.libvirt_domain.destroy() def test_200_shutdown_event_race(self): '''Regression test for 3164''' vmname = self.make_vm_name('appvm') self.vm = self.app.add_new_vm(qubes.vm.appvm.AppVM, name=vmname, template=self.app.default_template, label='red') # help the luck a little - don't wait for qrexec to easier win the race self.vm.features['qrexec'] = False self.loop.run_until_complete(self.vm.create_on_disk()) # another way to help the luck a little - make sure the private # volume is first in (normally unordered) dict - this way if any # volume action fails, it will be at or after private volume - not # before (preventing private volume action) old_volumes = self.vm.volumes self.vm.volumes = collections.OrderedDict() self.vm.volumes['private'] = old_volumes.pop('private') self.vm.volumes.update(old_volumes.items()) del old_volumes self.loop.run_until_complete(self.vm.start()) # kill it the way it does not give a chance for domain-shutdown it # execute self.vm.libvirt_domain.destroy() # now, lets try to start the VM again, before domain-shutdown event # got handled (#3164), and immediately trigger second domain-shutdown self.vm.add_handler('domain-start', self._test_200_on_domain_start) self.loop.run_until_complete(self.vm.start()) # and give a chance for both domain-shutdown handlers to execute self.loop.run_until_complete(asyncio.sleep(1)) with self.assertNotRaises(qubes.exc.QubesException): # if the above caused two domain-shutdown handlers being called # one after another, private volume is gone self.loop.run_until_complete(self.vm.storage.verify()) def _test_201_on_domain_pre_start(self, vm, event, **_kwargs): '''Simulate domain crash just after startup''' if not self.domain_shutdown_handled and not self.test_failure_reason: self.test_failure_reason = \ 'domain-shutdown event was not dispatched before subsequent ' \ 'start' self.domain_shutdown_handled = False def _test_201_domain_shutdown_handler(self, vm, event, **kwargs): if self.domain_shutdown_handled and not self.test_failure_reason: self.test_failure_reason = 'domain-shutdown event received twice' self.domain_shutdown_handled = True def test_201_shutdown_event_race(self): '''Regression test for 3164 - pure events edition''' vmname = self.make_vm_name('appvm') self.vm = self.app.add_new_vm(qubes.vm.appvm.AppVM, name=vmname, template=self.app.default_template, label='red') # help the luck a little - don't wait for qrexec to easier win the race self.vm.features['qrexec'] = False self.loop.run_until_complete(self.vm.create_on_disk()) # do not throw exception from inside event handler - test framework # will not recover from it (various objects leaks) self.test_failure_reason = None self.domain_shutdown_handled = False self.vm.add_handler('domain-shutdown', self._test_201_domain_shutdown_handler) self.loop.run_until_complete(self.vm.start()) if self.test_failure_reason: self.fail(self.test_failure_reason) self.vm.add_handler('domain-pre-start', self._test_201_on_domain_pre_start) # kill it the way it does not give a chance for domain-shutdown it # execute self.vm.libvirt_domain.destroy() # now, lets try to start the VM again, before domain-shutdown event # got handled (#3164), and immediately trigger second domain-shutdown self.vm.add_handler('domain-start', self._test_200_on_domain_start) self.loop.run_until_complete(self.vm.start()) if self.test_failure_reason: self.fail(self.test_failure_reason) while self.vm.get_power_state() != 'Halted': self.loop.run_until_complete(asyncio.sleep(1)) # and give a chance for both domain-shutdown handlers to execute self.loop.run_until_complete(asyncio.sleep(3)) if self.test_failure_reason: self.fail(self.test_failure_reason) self.assertTrue(self.domain_shutdown_handled, 'second domain-shutdown event was not dispatched after domain ' 'shutdown') def _check_udev_for_uuid(self, uuid_value): udev_data_path = '/run/udev/data' for udev_item in os.listdir(udev_data_path): # check only block devices if not udev_item.startswith('b'): continue with open(os.path.join(udev_data_path, udev_item)) as udev_file: self.assertNotIn(uuid_value, udev_file.read(), 'udev parsed filesystem UUID! ' + udev_item) def assertVolumesExcludedFromUdev(self, vm): try: # first boot, mkfs private volume self.loop.run_until_complete(vm.start()) # get private volume UUID private_uuid, _ = self.loop.run_until_complete( vm.run_for_stdio('blkid -o value /dev/xvdb', user='root')) private_uuid = private_uuid.decode().splitlines()[0] # now check if dom0 udev know about it - it shouldn't self._check_udev_for_uuid(private_uuid) # now restart the VM and check again self.loop.run_until_complete(vm.shutdown(wait=True)) self.loop.run_until_complete(vm.start()) self._check_udev_for_uuid(private_uuid) finally: del vm def test_202_udev_block_exclude_default(self): '''Check if VM images are excluded from udev parsing - default volume pool''' vmname = self.make_vm_name('appvm') self.vm = self.app.add_new_vm(qubes.vm.appvm.AppVM, name=vmname, template=self.app.default_template, label='red') self.loop.run_until_complete(self.vm.create_on_disk()) self.assertVolumesExcludedFromUdev(self.vm) def test_203_udev_block_exclude_varlibqubes(self): '''Check if VM images are excluded from udev parsing - varlibqubes pool''' vmname = self.make_vm_name('appvm') self.vm = self.app.add_new_vm(qubes.vm.appvm.AppVM, name=vmname, template=self.app.default_template, label='red') self.loop.run_until_complete(self.vm.create_on_disk( pool=self.app.pools['varlibqubes'])) self.assertVolumesExcludedFromUdev(self.vm) def test_204_udev_block_exclude_custom_file(self): '''Check if VM images are excluded from udev parsing - custom file pool''' vmname = self.make_vm_name('appvm') pool_path = tempfile.mkdtemp( prefix='qubes-pool-', dir='/var/tmp') self.addCleanup(shutil.rmtree, pool_path) pool = self.app.add_pool('test-filep', dir_path=pool_path, driver='file') self.vm = self.app.add_new_vm(qubes.vm.appvm.AppVM, name=vmname, template=self.app.default_template, label='red') self.loop.run_until_complete(self.vm.create_on_disk( pool=pool)) self.assertVolumesExcludedFromUdev(self.vm) class TC_01_Properties(qubes.tests.SystemTestCase): # pylint: disable=attribute-defined-outside-init def setUp(self): super(TC_01_Properties, self).setUp() self.init_default_template() self.vmname = self.make_vm_name('appvm') self.vm = self.app.add_new_vm(qubes.vm.appvm.AppVM, name=self.vmname, template=self.app.default_template, label='red') self.loop.run_until_complete(self.vm.create_on_disk()) self.addCleanup(self.cleanup_props) def cleanup_props(self): del self.vm def test_030_clone(self): try: testvm1 = self.app.add_new_vm( qubes.vm.appvm.AppVM, name=self.make_vm_name("vm"), template=self.app.default_template, label='red') self.loop.run_until_complete(testvm1.create_on_disk()) testvm2 = self.app.add_new_vm(testvm1.__class__, name=self.make_vm_name("clone"), template=testvm1.template, label='red') testvm2.clone_properties(testvm1) testvm2.firewall.clone(testvm1.firewall) self.loop.run_until_complete(testvm2.clone_disk_files(testvm1)) self.assertTrue(self.loop.run_until_complete(testvm1.storage.verify())) self.assertIn('source', testvm1.volumes['root'].config) self.assertNotEquals(testvm2, None) self.assertNotEquals(testvm2.volumes, {}) self.assertIn('source', testvm2.volumes['root'].config) # qubes.xml reload self.app.save() testvm1 = self.app.domains[testvm1.qid] testvm2 = self.app.domains[testvm2.qid] self.assertEqual(testvm1.label, testvm2.label) self.assertEqual(testvm1.netvm, testvm2.netvm) self.assertEqual(testvm1.property_is_default('netvm'), testvm2.property_is_default('netvm')) self.assertEqual(testvm1.kernel, testvm2.kernel) self.assertEqual(testvm1.kernelopts, testvm2.kernelopts) self.assertEqual(testvm1.property_is_default('kernel'), testvm2.property_is_default('kernel')) self.assertEqual(testvm1.property_is_default('kernelopts'), testvm2.property_is_default('kernelopts')) self.assertEqual(testvm1.memory, testvm2.memory) self.assertEqual(testvm1.maxmem, testvm2.maxmem) self.assertEqual(testvm1.devices, testvm2.devices) self.assertEqual(testvm1.include_in_backups, testvm2.include_in_backups) self.assertEqual(testvm1.default_user, testvm2.default_user) self.assertEqual(testvm1.features, testvm2.features) self.assertEqual(testvm1.firewall.rules, testvm2.firewall.rules) # now some non-default values testvm1.netvm = None testvm1.label = 'orange' testvm1.memory = 512 firewall = testvm1.firewall firewall.rules = [ qubes.firewall.Rule(None, action='accept', dsthost='1.2.3.0/24', proto='tcp', dstports=22)] firewall.save() testvm3 = self.app.add_new_vm(testvm1.__class__, name=self.make_vm_name("clone2"), template=testvm1.template, label='red',) testvm3.clone_properties(testvm1) testvm3.firewall.clone(testvm1.firewall) self.loop.run_until_complete(testvm3.clone_disk_files(testvm1)) # qubes.xml reload self.app.save() testvm1 = self.app.domains[testvm1.qid] testvm3 = self.app.domains[testvm3.qid] self.assertEqual(testvm1.label, testvm3.label) self.assertEqual(testvm1.netvm, testvm3.netvm) self.assertEqual(testvm1.property_is_default('netvm'), testvm3.property_is_default('netvm')) self.assertEqual(testvm1.kernel, testvm3.kernel) self.assertEqual(testvm1.kernelopts, testvm3.kernelopts) self.assertEqual(testvm1.property_is_default('kernel'), testvm3.property_is_default('kernel')) self.assertEqual(testvm1.property_is_default('kernelopts'), testvm3.property_is_default('kernelopts')) self.assertEqual(testvm1.memory, testvm3.memory) self.assertEqual(testvm1.maxmem, testvm3.maxmem) self.assertEqual(testvm1.devices, testvm3.devices) self.assertEqual(testvm1.include_in_backups, testvm3.include_in_backups) self.assertEqual(testvm1.default_user, testvm3.default_user) self.assertEqual(testvm1.features, testvm3.features) self.assertEqual(testvm1.firewall.rules, testvm3.firewall.rules) finally: try: del firewall except NameError: pass try: del testvm1 except NameError: pass try: del testvm2 except NameError: pass try: del testvm3 except NameError: pass def test_020_name_conflict_app(self): # TODO decide what exception should be here with self.assertRaises((qubes.exc.QubesException, ValueError)): self.vm2 = self.app.add_new_vm(qubes.vm.appvm.AppVM, name=self.vmname, template=self.app.default_template, label='red') self.loop.run_until_complete(self.vm2.create_on_disk()) def test_021_name_conflict_template(self): # TODO decide what exception should be here with self.assertRaises((qubes.exc.QubesException, ValueError)): self.vm2 = self.app.add_new_vm(qubes.vm.templatevm.TemplateVM, name=self.vmname, label='red') self.loop.run_until_complete(self.vm2.create_on_disk()) class TC_03_QvmRevertTemplateChanges(qubes.tests.SystemTestCase): # pylint: disable=attribute-defined-outside-init def setUp(self): super(TC_03_QvmRevertTemplateChanges, self).setUp() self.init_default_template() def cleanup_template(self): del self.test_template def setup_template(self): self.test_template = self.app.add_new_vm( qubes.vm.templatevm.TemplateVM, name=self.make_vm_name("pv-clone"), label='red' ) self.addCleanup(self.cleanup_template) self.test_template.clone_properties(self.app.default_template) self.test_template.features.update(self.app.default_template.features) self.test_template.tags.update(self.app.default_template.tags) self.loop.run_until_complete( self.test_template.clone_disk_files(self.app.default_template)) self.test_template.volumes['root'].revisions_to_keep = 3 self.app.save() def get_rootimg_checksum(self): return subprocess.check_output( ['sha1sum', self.test_template.volumes['root'].export()]).\ decode().split(' ')[0] def _do_test(self): checksum_before = self.get_rootimg_checksum() self.loop.run_until_complete(self.test_template.start()) self.shutdown_and_wait(self.test_template) checksum_changed = self.get_rootimg_checksum() if checksum_before == checksum_changed: self.log.warning("template not modified, test result will be " "unreliable") self.assertNotEqual(self.test_template.volumes['root'].revisions, {}) revert_cmd = ['qvm-volume', 'revert', self.test_template.name + ':root'] p = self.loop.run_until_complete(asyncio.create_subprocess_exec( *revert_cmd)) self.loop.run_until_complete(p.wait()) self.assertEqual(p.returncode, 0) del p checksum_after = self.get_rootimg_checksum() self.assertEqual(checksum_before, checksum_after) def test_000_revert_linux(self): """ Test qvm-revert-template-changes for PV template """ self.setup_template() self._do_test() @unittest.skip('TODO: some non-linux system') def test_001_revert_non_linux(self): """ Test qvm-revert-template-changes for HVM template """ # TODO: have some system there, so the root.img will get modified self.setup_template() self._do_test() class TC_30_Gui_daemon(qubes.tests.SystemTestCase): def setUp(self): super(TC_30_Gui_daemon, self).setUp() self.init_default_template() @unittest.skipUnless( spawn.find_executable('xdotool'), "xdotool not installed") def test_000_clipboard(self): testvm1 = self.app.add_new_vm(qubes.vm.appvm.AppVM, name=self.make_vm_name('vm1'), label='red') self.loop.run_until_complete(testvm1.create_on_disk()) testvm2 = self.app.add_new_vm(qubes.vm.appvm.AppVM, name=self.make_vm_name('vm2'), label='red') self.loop.run_until_complete(testvm2.create_on_disk()) self.app.save() self.loop.run_until_complete(asyncio.wait([ testvm1.start(), testvm2.start()])) self.loop.run_until_complete(asyncio.wait([ self.wait_for_session(testvm1), self.wait_for_session(testvm2)])) window_title = 'user@{}'.format(testvm1.name) self.loop.run_until_complete(testvm1.run( 'zenity --text-info --editable --title={}'.format(window_title))) self.wait_for_window(window_title) time.sleep(0.5) test_string = "test{}".format(testvm1.xid) # Type and copy some text subprocess.check_call(['xdotool', 'search', '--name', window_title, 'windowactivate', '--sync', 'type', test_string]) # second xdotool call because type --terminator do not work (SEGV) # additionally do not use search here, so window stack will be empty # and xdotool will use XTEST instead of generating events manually - # this will be much better - at least because events will have # correct timestamp (so gui-daemon would not drop the copy request) subprocess.check_call(['xdotool', 'key', 'ctrl+a', 'ctrl+c', 'ctrl+shift+c', 'Escape']) clipboard_content = \ open('/var/run/qubes/qubes-clipboard.bin', 'r').read().strip() self.assertEqual(clipboard_content, test_string, "Clipboard copy operation failed - content") clipboard_source = \ open('/var/run/qubes/qubes-clipboard.bin.source', 'r').read().strip() self.assertEqual(clipboard_source, testvm1.name, "Clipboard copy operation failed - owner") # Then paste it to the other window window_title = 'user@{}'.format(testvm2.name) p = self.loop.run_until_complete(testvm2.run( 'zenity --entry --title={} > /tmp/test.txt'.format(window_title))) self.wait_for_window(window_title) subprocess.check_call(['xdotool', 'key', '--delay', '100', 'ctrl+shift+v', 'ctrl+v', 'Return']) self.loop.run_until_complete(p.wait()) # And compare the result (test_output, _) = self.loop.run_until_complete( testvm2.run_for_stdio('cat /tmp/test.txt')) self.assertEqual(test_string, test_output.strip().decode('ascii')) clipboard_content = \ open('/var/run/qubes/qubes-clipboard.bin', 'r').read().strip() self.assertEqual(clipboard_content, "", "Clipboard not wiped after paste - content") clipboard_source = \ open('/var/run/qubes/qubes-clipboard.bin.source', 'r').\ read().strip() self.assertEqual(clipboard_source, "", "Clipboard not wiped after paste - owner") class TC_05_StandaloneVMMixin(object): def setUp(self): super(TC_05_StandaloneVMMixin, self).setUp() self.init_default_template(self.template) def test_000_create_start(self): self.testvm1 = self.app.add_new_vm(qubes.vm.standalonevm.StandaloneVM, name=self.make_vm_name('vm1'), label='red') self.testvm1.features.update(self.app.default_template.features) self.loop.run_until_complete( self.testvm1.clone_disk_files(self.app.default_template)) self.app.save() self.loop.run_until_complete(self.testvm1.start()) self.assertEqual(self.testvm1.get_power_state(), "Running") def test_100_resize_root_img(self): self.testvm1 = self.app.add_new_vm(qubes.vm.standalonevm.StandaloneVM, name=self.make_vm_name('vm1'), label='red') self.testvm1.features.update(self.app.default_template.features) self.loop.run_until_complete( self.testvm1.clone_disk_files(self.app.default_template)) self.app.save() try: self.loop.run_until_complete( self.testvm1.storage.resize(self.testvm1.volumes['root'], 20 * 1024 ** 3)) except (subprocess.CalledProcessError, qubes.storage.StoragePoolException) as e: # exception object would leak VM reference self.fail(str(e)) self.assertEqual(self.testvm1.volumes['root'].size, 20 * 1024 ** 3) self.loop.run_until_complete(self.testvm1.start()) # new_size in 1k-blocks (new_size, _) = self.loop.run_until_complete( self.testvm1.run_for_stdio('df --output=size /|tail -n 1')) # some safety margin for FS metadata self.assertGreater(int(new_size.strip()), 19 * 1024 ** 2) def test_101_resize_root_img_online(self): self.testvm1 = self.app.add_new_vm(qubes.vm.standalonevm.StandaloneVM, name=self.make_vm_name('vm1'), label='red') self.testvm1.features['qrexec'] = True self.loop.run_until_complete( self.testvm1.clone_disk_files(self.app.default_template)) self.testvm1.features.update(self.app.default_template.features) self.app.save() self.loop.run_until_complete(self.testvm1.start()) try: self.loop.run_until_complete( self.testvm1.storage.resize(self.testvm1.volumes['root'], 20 * 1024 ** 3)) except (subprocess.CalledProcessError, qubes.storage.StoragePoolException) as e: # exception object would leak VM reference self.fail(str(e)) self.assertEqual(self.testvm1.volumes['root'].size, 20 * 1024 ** 3) # new_size in 1k-blocks (new_size, _) = self.loop.run_until_complete( self.testvm1.run_for_stdio('df --output=size /|tail -n 1')) # some safety margin for FS metadata self.assertGreater(int(new_size.strip()), 19 * 1024 ** 2) class TC_06_AppVMMixin(object): template = None def setUp(self): super(TC_06_AppVMMixin, self).setUp() self.init_default_template(self.template) @unittest.skipUnless( spawn.find_executable('xdotool'), "xdotool not installed") def test_121_start_standalone_with_cdrom_vm(self): cdrom_vmname = self.make_vm_name('cdrom') self.cdrom_vm = self.app.add_new_vm('AppVM', label='red', name=cdrom_vmname) self.loop.run_until_complete(self.cdrom_vm.create_on_disk()) self.loop.run_until_complete(self.cdrom_vm.start()) iso_path = self.create_bootable_iso() with open(iso_path, 'rb') as iso_f: self.loop.run_until_complete( self.cdrom_vm.run_for_stdio('cat > /home/user/boot.iso', stdin=iso_f)) vmname = self.make_vm_name('appvm') self.vm = self.app.add_new_vm('StandaloneVM', label='red', name=vmname) self.loop.run_until_complete(self.vm.create_on_disk()) self.vm.kernel = None self.vm.virt_mode = 'hvm' # start the VM using qvm-start tool, to test --cdrom option there p = self.loop.run_until_complete(asyncio.create_subprocess_exec( 'qvm-start', '--cdrom=' + cdrom_vmname + ':/home/user/boot.iso', self.vm.name, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)) (stdout, _) = self.loop.run_until_complete(p.communicate()) self.assertEqual(p.returncode, 0, stdout) # check if VM do not crash instantly self.loop.run_until_complete(asyncio.sleep(5)) self.assertTrue(self.vm.is_running()) # Type 'poweroff' subprocess.check_call(['xdotool', 'search', '--name', self.vm.name, 'type', '--window', '%1', 'poweroff\r']) for _ in range(10): if not self.vm.is_running(): break self.loop.run_until_complete(asyncio.sleep(1)) self.assertFalse(self.vm.is_running()) def create_testcases_for_templates(): yield from qubes.tests.create_testcases_for_templates('TC_05_StandaloneVM', TC_05_StandaloneVMMixin, qubes.tests.SystemTestCase, module=sys.modules[__name__]) yield from qubes.tests.create_testcases_for_templates('TC_06_AppVM', TC_06_AppVMMixin, qubes.tests.SystemTestCase, module=sys.modules[__name__]) def load_tests(loader, tests, pattern): tests.addTests(loader.loadTestsFromNames( create_testcases_for_templates())) return tests qubes.tests.maybe_create_testcases_on_import(create_testcases_for_templates) # vim: ts=4 sw=4 et