From 55af04293b190215c4e2ecced4fbca60f6ef7a58 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marek=20Marczykowski-G=C3=B3recki?= Date: Mon, 16 May 2016 04:48:29 +0200 Subject: [PATCH] tests: block devices listing QubesOS/qubes-issues#1600 --- tests/Makefile | 2 + tests/__init__.py | 1 + tests/block.py | 302 ++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 305 insertions(+) create mode 100644 tests/block.py diff --git a/tests/Makefile b/tests/Makefile index 993c741a..8f01e2ad 100644 --- a/tests/Makefile +++ b/tests/Makefile @@ -17,6 +17,8 @@ endif cp backupcompatibility.py[co] $(DESTDIR)$(PYTHON_TESTSPATH) cp basic.py $(DESTDIR)$(PYTHON_TESTSPATH) cp basic.py[co] $(DESTDIR)$(PYTHON_TESTSPATH) + cp block.py $(DESTDIR)$(PYTHON_TESTSPATH) + cp block.py[co] $(DESTDIR)$(PYTHON_TESTSPATH) cp dom0_update.py $(DESTDIR)$(PYTHON_TESTSPATH) cp dom0_update.py[co] $(DESTDIR)$(PYTHON_TESTSPATH) cp network.py $(DESTDIR)$(PYTHON_TESTSPATH) diff --git a/tests/__init__.py b/tests/__init__.py index c9d26b49..67ceeaee 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -758,6 +758,7 @@ def load_tests(loader, tests, pattern): 'qubes.tests.regressions', 'qubes.tests.storage', 'qubes.tests.storage_xen', + 'qubes.tests.block', 'qubes.tests.hardware', 'qubes.tests.extra', ): diff --git a/tests/block.py b/tests/block.py new file mode 100644 index 00000000..927c9c14 --- /dev/null +++ b/tests/block.py @@ -0,0 +1,302 @@ +# vim: fileencoding=utf-8 +# +# The Qubes OS Project, https://www.qubes-os.org/ +# +# Copyright (C) 2016 +# Marek Marczykowski-Górecki +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +# +import os + +import qubes.tests +import qubes.qubesutils +import subprocess + +# the same class for both dom0 and VMs +class TC_00_List(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase): + template = None + + def setUp(self): + super(TC_00_List, self).setUp() + self.img_path = '/tmp/test.img' + self.mount_point = '/tmp/test-dir' + if self.template is not None: + self.vm = self.qc.add_new_vm( + "QubesAppVm", + name=self.make_vm_name("vm"), + template=self.qc.get_vm_by_name(self.template)) + self.vm.create_on_disk(verbose=False) + self.vm.start() + else: + self.vm = self.qc[0] + + def tearDown(self): + super(TC_00_List, self).tearDown() + if self.template is None: + if os.path.exists(self.mount_point): + subprocess.call(['sudo', 'umount', self.mount_point]) + subprocess.call(['sudo', 'rmdir', self.mount_point]) + subprocess.call(['sudo', 'dmsetup', 'remove', 'test-dm']) + if os.path.exists(self.img_path): + loopdev = subprocess.check_output(['losetup', '-j', + self.img_path]) + for dev in loopdev.splitlines(): + subprocess.call( + ['sudo', 'losetup', '-d', dev.split(':')[0]]) + subprocess.call(['sudo', 'rm', '-f', self.img_path]) + + def run_script(self, script, user="user"): + if self.template is None: + if user == "user": + subprocess.check_call(script, shell=True) + elif user == "root": + subprocess.check_call(['sudo', 'sh', '-c', script]) + else: + retcode = self.vm.run(script, user=user, wait=True) + if retcode != 0: + raise subprocess.CalledProcessError + + def test_000_list_loop(self): + if self.template is None: + self.skipTest('loop devices excluded in dom0') + self.run_script( + "set -e;" + "truncate -s 128M {path}; " + "losetup -f {path}; " + "udevadm settle".format(path=self.img_path), user="root") + + dev_list = qubes.qubesutils.block_list_vm(self.vm) + found = False + for dev in dev_list.keys(): + if dev_list[dev]['desc'] == self.img_path: + self.assertTrue(dev.startswith(self.vm.name + ':loop')) + self.assertEquals(dev_list[dev]['mode'], 'w') + self.assertEquals(dev_list[dev]['size'], 1024 * 1024 * 128) + self.assertEquals( + dev_list[dev]['device'], '/dev/' + dev.split(':')[1]) + found = True + + if not found: + self.fail("Device {} not found in {!r}".format(self.img_path, dev_list)) + + def test_001_list_loop_mounted(self): + if self.template is None: + self.skipTest('loop devices excluded in dom0') + self.run_script( + "set -e;" + "truncate -s 128M {path}; " + "mkfs.ext4 -q -F {path}; " + "mkdir -p {mntdir}; " + "mount {path} {mntdir} -o loop; " + "udevadm settle".format( + path=self.img_path, + mntdir=self.mount_point), + user="root") + + dev_list = qubes.qubesutils.block_list_vm(self.vm) + for dev in dev_list.keys(): + if dev_list[dev]['desc'] == self.img_path: + self.fail( + 'Device {} ({}) should not be listed because is mounted' + .format(dev, self.img_path)) + + def test_010_list_dm(self): + self.run_script( + "set -e;" + "truncate -s 128M {path}; " + "loopdev=`losetup -f`; " + "losetup $loopdev {path}; " + "dmsetup create test-dm --table \"0 262144 linear $(cat " + "/sys/block/$(basename $loopdev)/dev) 0\";" + "udevadm settle".format(path=self.img_path), user="root") + + dev_list = qubes.qubesutils.block_list_vm(self.vm) + found = False + for dev in dev_list.keys(): + if dev.startswith(self.vm.name + ':loop'): + self.assertNotEquals(dev_list[dev]['desc'], self.img_path, + "Device {} ({}) should not be listed as it is used in " + "device-mapper".format(dev, self.img_path) + ) + elif dev_list[dev]['desc'] == 'test-dm': + self.assertEquals(dev_list[dev]['mode'], 'w') + self.assertEquals(dev_list[dev]['size'], 1024 * 1024 * 128) + self.assertEquals( + dev_list[dev]['device'], '/dev/' + dev.split(':')[1]) + found = True + + if not found: + self.fail("Device {} not found in {!r}".format('test-dm', dev_list)) + + def test_011_list_dm_mounted(self): + self.run_script( + "set -e;" + "truncate -s 128M {path}; " + "loopdev=`losetup -f`; " + "losetup $loopdev {path}; " + "dmsetup create test-dm --table \"0 262144 linear $(cat " + "/sys/block/$(basename $loopdev)/dev) 0\";" + "mkfs.ext4 -q -F /dev/mapper/test-dm;" + "mkdir -p {mntdir};" + "mount /dev/mapper/test-dm {mntdir};" + "udevadm settle".format( + path=self.img_path, + mntdir=self.mount_point), + user="root") + + dev_list = qubes.qubesutils.block_list_vm(self.vm) + for dev in dev_list.keys(): + if dev.startswith(self.vm.name + ':loop'): + self.assertNotEquals(dev_list[dev]['desc'], self.img_path, + "Device {} ({}) should not be listed as it is used in " + "device-mapper".format(dev, self.img_path) + ) + else: + self.assertNotEquals(dev_list[dev]['desc'], 'test-dm', + "Device {} ({}) should not be listed as it is " + "mounted".format(dev, 'test-dm') + ) + + def test_012_list_dm_delayed(self): + self.run_script( + "set -e;" + "truncate -s 128M {path}; " + "loopdev=`losetup -f`; " + "losetup $loopdev {path}; " + "udevadm settle; " + "dmsetup create test-dm --table \"0 262144 linear $(cat " + "/sys/block/$(basename $loopdev)/dev) 0\";" + "udevadm settle".format(path=self.img_path), user="root") + + dev_list = qubes.qubesutils.block_list_vm(self.vm) + found = False + for dev in dev_list.keys(): + if dev.startswith(self.vm.name + ':loop'): + self.assertNotEquals(dev_list[dev]['desc'], self.img_path, + "Device {} ({}) should not be listed as it is used in " + "device-mapper".format(dev, self.img_path) + ) + elif dev_list[dev]['desc'] == 'test-dm': + self.assertEquals(dev_list[dev]['mode'], 'w') + self.assertEquals(dev_list[dev]['size'], 1024 * 1024 * 128) + self.assertEquals( + dev_list[dev]['device'], '/dev/' + dev.split(':')[1]) + found = True + + if not found: + self.fail("Device {} not found in {!r}".format('test-dm', dev_list)) + + def test_013_list_dm_removed(self): + self.run_script( + "set -e;" + "truncate -s 128M {path}; " + "loopdev=`losetup -f`; " + "losetup $loopdev {path}; " + "dmsetup create test-dm --table \"0 262144 linear $(cat " + "/sys/block/$(basename $loopdev)/dev) 0\";" + "udevadm settle;" + "dmsetup remove test-dm;" + "udevadm settle".format(path=self.img_path), user="root") + + dev_list = qubes.qubesutils.block_list_vm(self.vm) + found = False + for dev in dev_list.keys(): + if dev_list[dev]['desc'] == self.img_path: + self.assertTrue(dev.startswith(self.vm.name + ':loop')) + self.assertEquals(dev_list[dev]['mode'], 'w') + self.assertEquals(dev_list[dev]['size'], 1024 * 1024 * 128) + self.assertEquals( + dev_list[dev]['device'], '/dev/' + dev.split(':')[1]) + found = True + + if not found: + self.fail("Device {} not found in {!r}".format(self.img_path, dev_list)) + + def test_020_list_loop_partition(self): + if self.template is None: + self.skipTest('loop devices excluded in dom0') + self.run_script( + "set -e;" + "truncate -s 128M {path}; " + "echo ,,L | sfdisk {path};" + "loopdev=`losetup -f`; " + "losetup -P $loopdev {path}; " + "udevadm settle".format(path=self.img_path), user="root") + + dev_list = qubes.qubesutils.block_list_vm(self.vm) + found = False + for dev in dev_list.keys(): + if dev_list[dev]['desc'] == self.img_path: + self.assertTrue(dev.startswith(self.vm.name + ':loop')) + self.assertEquals(dev_list[dev]['mode'], 'w') + self.assertEquals(dev_list[dev]['size'], 1024 * 1024 * 128) + self.assertEquals( + dev_list[dev]['device'], '/dev/' + dev.split(':')[1]) + self.assertIn(dev + 'p1', dev_list) + found = True + + if not found: + self.fail("Device {} not found in {!r}".format(self.img_path, dev_list)) + + def test_021_list_loop_partition_mounted(self): + if self.template is None: + self.skipTest('loop devices excluded in dom0') + self.run_script( + "set -e;" + "truncate -s 128M {path}; " + "echo ,,L | sfdisk {path};" + "loopdev=`losetup -f`; " + "losetup -P $loopdev {path}; " + "mkfs.ext4 -q -F ${{loopdev}}p1; " + "mkdir -p {mntdir}; " + "mount ${{loopdev}}p1 {mntdir}; " + "udevadm settle".format( + path=self.img_path, mntdir=self.mount_point), + user="root") + + dev_list = qubes.qubesutils.block_list_vm(self.vm) + for dev in dev_list.keys(): + if dev_list[dev]['desc'] == self.img_path: + self.fail( + 'Device {} ({}) should not be listed because its ' + 'partition is mounted' + .format(dev, self.img_path)) + elif dev.startswith(self.vm.name + ':loop') and dev.endswith('p1'): + # FIXME: risky assumption that only tests create partitioned + # loop devices + self.fail( + 'Device {} ({}) should not be listed because is mounted' + .format(dev, self.img_path)) + + +def load_tests(loader, tests, pattern): + try: + qc = qubes.qubes.QubesVmCollection() + qc.lock_db_for_reading() + qc.load() + qc.unlock_db() + templates = [vm.name for vm in qc.values() if + isinstance(vm, qubes.qubes.QubesTemplateVm)] + except OSError: + templates = [] + for template in templates: + tests.addTests(loader.loadTestsFromTestCase( + type( + 'TC_00_List_' + template, + (TC_00_List, qubes.tests.QubesTestCase), + {'template': template}))) + + return tests