|
@@ -2,7 +2,7 @@
|
|
|
#
|
|
|
# The Qubes OS Project, https://www.qubes-os.org/
|
|
|
#
|
|
|
-# Copyright (C) 2016
|
|
|
+# Copyright (C) 2018
|
|
|
# Marek Marczykowski-Górecki <marmarek@invisiblethingslab.com>
|
|
|
#
|
|
|
# This library is free software; you can redistribute it and/or
|
|
@@ -20,42 +20,44 @@
|
|
|
#
|
|
|
import os
|
|
|
|
|
|
+import sys
|
|
|
+
|
|
|
+import qubes
|
|
|
import qubes.tests
|
|
|
-import qubes.qubesutils
|
|
|
import subprocess
|
|
|
|
|
|
# the same class for both dom0 and VMs
|
|
|
-class TC_00_List(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
|
|
|
+class TC_00_List(qubes.tests.SystemTestCase):
|
|
|
template = None
|
|
|
|
|
|
def setUp(self):
|
|
|
- super(TC_00_List, self).setUp()
|
|
|
+ super().setUp()
|
|
|
self.img_path = '/tmp/test.img'
|
|
|
self.mount_point = '/tmp/test-dir'
|
|
|
if self.template is not None:
|
|
|
- self.vm = self.qc.add_new_vm(
|
|
|
- "QubesAppVm",
|
|
|
- name=self.make_vm_name("vm"),
|
|
|
- template=self.qc.get_vm_by_name(self.template))
|
|
|
- self.vm.create_on_disk(verbose=False)
|
|
|
+ self.vm = self.app.add_new_vm(
|
|
|
+ "AppVM",
|
|
|
+ label='red',
|
|
|
+ name=self.make_vm_name("vm"))
|
|
|
+ self.loop.run_until_complete(
|
|
|
+ self.vm.create_on_disk())
|
|
|
self.app.save()
|
|
|
- self.qc.unlock_db()
|
|
|
- self.vm.start()
|
|
|
+ self.loop.run_until_complete(self.vm.start())
|
|
|
else:
|
|
|
- self.qc.unlock_db()
|
|
|
- self.vm = self.qc[0]
|
|
|
+ self.vm = self.app.domains[0]
|
|
|
|
|
|
def tearDown(self):
|
|
|
- super(TC_00_List, self).tearDown()
|
|
|
+ super().tearDown()
|
|
|
if self.template is None:
|
|
|
if os.path.exists(self.mount_point):
|
|
|
subprocess.call(['sudo', 'umount', self.mount_point])
|
|
|
subprocess.call(['sudo', 'rmdir', self.mount_point])
|
|
|
- subprocess.call(['sudo', 'dmsetup', 'remove', 'test-dm'])
|
|
|
+ if os.path.exists('/dev/mapper/test-dm'):
|
|
|
+ subprocess.call(['sudo', 'dmsetup', 'remove', 'test-dm'])
|
|
|
if os.path.exists(self.img_path):
|
|
|
loopdev = subprocess.check_output(['losetup', '-j',
|
|
|
self.img_path])
|
|
|
- for dev in loopdev.splitlines():
|
|
|
+ for dev in loopdev.decode().splitlines():
|
|
|
subprocess.call(
|
|
|
['sudo', 'losetup', '-d', dev.split(':')[0]])
|
|
|
subprocess.call(['sudo', 'rm', '-f', self.img_path])
|
|
@@ -67,9 +69,8 @@ class TC_00_List(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
|
|
|
elif user == "root":
|
|
|
subprocess.check_call(['sudo', 'sh', '-c', script])
|
|
|
else:
|
|
|
- retcode = self.vm.run(script, user=user, wait=True)
|
|
|
- if retcode != 0:
|
|
|
- raise subprocess.CalledProcessError
|
|
|
+ self.loop.run_until_complete(
|
|
|
+ self.vm.run_for_stdio(script, user=user))
|
|
|
|
|
|
def test_000_list_loop(self):
|
|
|
if self.template is None:
|
|
@@ -80,19 +81,18 @@ class TC_00_List(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
|
|
|
"losetup -f {path}; "
|
|
|
"udevadm settle".format(path=self.img_path), user="root")
|
|
|
|
|
|
- dev_list = qubes.qubesutils.block_list_vm(self.vm)
|
|
|
+ dev_list = list(self.vm.devices['block'])
|
|
|
found = False
|
|
|
- for dev in dev_list.keys():
|
|
|
- if dev_list[dev]['desc'] == self.img_path:
|
|
|
- self.assertTrue(dev.startswith(self.vm.name + ':loop'))
|
|
|
- self.assertEquals(dev_list[dev]['mode'], 'w')
|
|
|
- self.assertEquals(dev_list[dev]['size'], 1024 * 1024 * 128)
|
|
|
- self.assertEquals(
|
|
|
- dev_list[dev]['device'], '/dev/' + dev.split(':')[1])
|
|
|
+ for dev in dev_list:
|
|
|
+ if dev.description == self.img_path:
|
|
|
+ self.assertTrue(dev.ident.startswith('loop'))
|
|
|
+ self.assertEquals(dev.mode, 'w')
|
|
|
+ self.assertEquals(dev.size, 1024 * 1024 * 128)
|
|
|
found = True
|
|
|
|
|
|
if not found:
|
|
|
- self.fail("Device {} not found in {!r}".format(self.img_path, dev_list))
|
|
|
+ self.fail("Device {} not found in {!r}".format(
|
|
|
+ self.img_path, dev_list))
|
|
|
|
|
|
def test_001_list_loop_mounted(self):
|
|
|
if self.template is None:
|
|
@@ -108,9 +108,9 @@ class TC_00_List(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
|
|
|
mntdir=self.mount_point),
|
|
|
user="root")
|
|
|
|
|
|
- dev_list = qubes.qubesutils.block_list_vm(self.vm)
|
|
|
- for dev in dev_list.keys():
|
|
|
- if dev_list[dev]['desc'] == self.img_path:
|
|
|
+ dev_list = list(self.vm.devices['block'])
|
|
|
+ for dev in dev_list:
|
|
|
+ if dev.description == self.img_path:
|
|
|
self.fail(
|
|
|
'Device {} ({}) should not be listed because is mounted'
|
|
|
.format(dev, self.img_path))
|
|
@@ -125,19 +125,17 @@ class TC_00_List(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
|
|
|
"/sys/block/$(basename $loopdev)/dev) 0\";"
|
|
|
"udevadm settle".format(path=self.img_path), user="root")
|
|
|
|
|
|
- dev_list = qubes.qubesutils.block_list_vm(self.vm)
|
|
|
+ dev_list = list(self.vm.devices['block'])
|
|
|
found = False
|
|
|
- for dev in dev_list.keys():
|
|
|
- if dev.startswith(self.vm.name + ':loop'):
|
|
|
- self.assertNotEquals(dev_list[dev]['desc'], self.img_path,
|
|
|
+ for dev in dev_list:
|
|
|
+ if dev.ident.startswith('loop'):
|
|
|
+ self.assertNotEquals(dev.description, self.img_path,
|
|
|
"Device {} ({}) should not be listed as it is used in "
|
|
|
"device-mapper".format(dev, self.img_path)
|
|
|
)
|
|
|
- elif dev_list[dev]['desc'] == 'test-dm':
|
|
|
- self.assertEquals(dev_list[dev]['mode'], 'w')
|
|
|
- self.assertEquals(dev_list[dev]['size'], 1024 * 1024 * 128)
|
|
|
- self.assertEquals(
|
|
|
- dev_list[dev]['device'], '/dev/' + dev.split(':')[1])
|
|
|
+ elif dev.description == 'test-dm':
|
|
|
+ self.assertEquals(dev.mode, 'w')
|
|
|
+ self.assertEquals(dev.size, 1024 * 1024 * 128)
|
|
|
found = True
|
|
|
|
|
|
if not found:
|
|
@@ -159,15 +157,15 @@ class TC_00_List(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
|
|
|
mntdir=self.mount_point),
|
|
|
user="root")
|
|
|
|
|
|
- dev_list = qubes.qubesutils.block_list_vm(self.vm)
|
|
|
- for dev in dev_list.keys():
|
|
|
- if dev.startswith(self.vm.name + ':loop'):
|
|
|
- self.assertNotEquals(dev_list[dev]['desc'], self.img_path,
|
|
|
+ dev_list = list(self.vm.devices['block'])
|
|
|
+ for dev in dev_list:
|
|
|
+ if dev.ident.startswith('loop'):
|
|
|
+ self.assertNotEquals(dev.description, self.img_path,
|
|
|
"Device {} ({}) should not be listed as it is used in "
|
|
|
"device-mapper".format(dev, self.img_path)
|
|
|
)
|
|
|
else:
|
|
|
- self.assertNotEquals(dev_list[dev]['desc'], 'test-dm',
|
|
|
+ self.assertNotEquals(dev.description, 'test-dm',
|
|
|
"Device {} ({}) should not be listed as it is "
|
|
|
"mounted".format(dev, 'test-dm')
|
|
|
)
|
|
@@ -183,19 +181,17 @@ class TC_00_List(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
|
|
|
"/sys/block/$(basename $loopdev)/dev) 0\";"
|
|
|
"udevadm settle".format(path=self.img_path), user="root")
|
|
|
|
|
|
- dev_list = qubes.qubesutils.block_list_vm(self.vm)
|
|
|
+ dev_list = list(self.vm.devices['block'])
|
|
|
found = False
|
|
|
- for dev in dev_list.keys():
|
|
|
- if dev.startswith(self.vm.name + ':loop'):
|
|
|
- self.assertNotEquals(dev_list[dev]['desc'], self.img_path,
|
|
|
+ for dev in dev_list:
|
|
|
+ if dev.ident.startswith('loop'):
|
|
|
+ self.assertNotEquals(dev.description, self.img_path,
|
|
|
"Device {} ({}) should not be listed as it is used in "
|
|
|
"device-mapper".format(dev, self.img_path)
|
|
|
)
|
|
|
- elif dev_list[dev]['desc'] == 'test-dm':
|
|
|
- self.assertEquals(dev_list[dev]['mode'], 'w')
|
|
|
- self.assertEquals(dev_list[dev]['size'], 1024 * 1024 * 128)
|
|
|
- self.assertEquals(
|
|
|
- dev_list[dev]['device'], '/dev/' + dev.split(':')[1])
|
|
|
+ elif dev.description == 'test-dm':
|
|
|
+ self.assertEquals(dev.mode, 'w')
|
|
|
+ self.assertEquals(dev.size, 1024 * 1024 * 128)
|
|
|
found = True
|
|
|
|
|
|
if not found:
|
|
@@ -216,15 +212,13 @@ class TC_00_List(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
|
|
|
"dmsetup remove test-dm;"
|
|
|
"udevadm settle".format(path=self.img_path), user="root")
|
|
|
|
|
|
- dev_list = qubes.qubesutils.block_list_vm(self.vm)
|
|
|
+ dev_list = list(self.vm.devices['block'])
|
|
|
found = False
|
|
|
- for dev in dev_list.keys():
|
|
|
- if dev_list[dev]['desc'] == self.img_path:
|
|
|
- self.assertTrue(dev.startswith(self.vm.name + ':loop'))
|
|
|
- self.assertEquals(dev_list[dev]['mode'], 'w')
|
|
|
- self.assertEquals(dev_list[dev]['size'], 1024 * 1024 * 128)
|
|
|
- self.assertEquals(
|
|
|
- dev_list[dev]['device'], '/dev/' + dev.split(':')[1])
|
|
|
+ for dev in dev_list:
|
|
|
+ if dev.description == self.img_path:
|
|
|
+ self.assertTrue(dev.ident.startswith('loop'))
|
|
|
+ self.assertEquals(dev.mode, 'w')
|
|
|
+ self.assertEquals(dev.size, 1024 * 1024 * 128)
|
|
|
found = True
|
|
|
|
|
|
if not found:
|
|
@@ -242,16 +236,14 @@ class TC_00_List(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
|
|
|
"blockdev --rereadpt $loopdev; "
|
|
|
"udevadm settle".format(path=self.img_path), user="root")
|
|
|
|
|
|
- dev_list = qubes.qubesutils.block_list_vm(self.vm)
|
|
|
+ dev_list = list(self.vm.devices['block'])
|
|
|
found = False
|
|
|
- for dev in dev_list.keys():
|
|
|
- if dev_list[dev]['desc'] == self.img_path:
|
|
|
- self.assertTrue(dev.startswith(self.vm.name + ':loop'))
|
|
|
- self.assertEquals(dev_list[dev]['mode'], 'w')
|
|
|
- self.assertEquals(dev_list[dev]['size'], 1024 * 1024 * 128)
|
|
|
- self.assertEquals(
|
|
|
- dev_list[dev]['device'], '/dev/' + dev.split(':')[1])
|
|
|
- self.assertIn(dev + 'p1', dev_list)
|
|
|
+ for dev in dev_list:
|
|
|
+ if dev.description == self.img_path:
|
|
|
+ self.assertTrue(dev.ident.startswith('loop'))
|
|
|
+ self.assertEquals(dev.mode, 'w')
|
|
|
+ self.assertEquals(dev.size, 1024 * 1024 * 128)
|
|
|
+ self.assertIn(dev.ident + 'p1', [d.ident for d in dev_list])
|
|
|
found = True
|
|
|
|
|
|
if not found:
|
|
@@ -274,14 +266,14 @@ class TC_00_List(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
|
|
|
path=self.img_path, mntdir=self.mount_point),
|
|
|
user="root")
|
|
|
|
|
|
- dev_list = qubes.qubesutils.block_list_vm(self.vm)
|
|
|
- for dev in dev_list.keys():
|
|
|
- if dev_list[dev]['desc'] == self.img_path:
|
|
|
+ dev_list = list(self.vm.devices['block'])
|
|
|
+ for dev in dev_list:
|
|
|
+ if dev.description == self.img_path:
|
|
|
self.fail(
|
|
|
'Device {} ({}) should not be listed because its '
|
|
|
'partition is mounted'
|
|
|
.format(dev, self.img_path))
|
|
|
- elif dev.startswith(self.vm.name + ':loop') and dev.endswith('p1'):
|
|
|
+ elif dev.ident.startswith('loop') and dev.ident.endswith('p1'):
|
|
|
# FIXME: risky assumption that only tests create partitioned
|
|
|
# loop devices
|
|
|
self.fail(
|
|
@@ -289,21 +281,14 @@ class TC_00_List(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
|
|
|
.format(dev, self.img_path))
|
|
|
|
|
|
|
|
|
-def load_tests(loader, tests, pattern):
|
|
|
- try:
|
|
|
- qc = qubes.qubes.QubesVmCollection()
|
|
|
- qc.lock_db_for_reading()
|
|
|
- qc.load()
|
|
|
- qc.unlock_db()
|
|
|
- templates = [vm.name for vm in qc.values() if
|
|
|
- isinstance(vm, qubes.qubes.QubesTemplateVm)]
|
|
|
- except OSError:
|
|
|
- templates = []
|
|
|
- for template in templates:
|
|
|
- tests.addTests(loader.loadTestsFromTestCase(
|
|
|
- type(
|
|
|
- 'TC_00_List_' + template,
|
|
|
- (TC_00_List, qubes.tests.QubesTestCase),
|
|
|
- {'template': template})))
|
|
|
+def create_testcases_for_templates():
|
|
|
+ return qubes.tests.create_testcases_for_templates('TC_00_List',
|
|
|
+ TC_00_List,
|
|
|
+ module=sys.modules[__name__])
|
|
|
|
|
|
+def load_tests(loader, tests, pattern):
|
|
|
+ tests.addTests(loader.loadTestsFromNames(
|
|
|
+ create_testcases_for_templates()))
|
|
|
return tests
|
|
|
+
|
|
|
+qubes.tests.maybe_create_testcases_on_import(create_testcases_for_templates)
|