tests: convert tests/int/basic to core3 API

Not all tests converted - some of them will be removed in the next
commit. Also it doesn't mean that all tested functionality already
works...
This commit is contained in:
Marek Marczykowski-Górecki 2016-02-10 17:30:29 +01:00 committed by Wojtek Porczyk
parent 2daba9e0eb
commit 81bc615bce

View File

@ -31,17 +31,19 @@ import tempfile
import unittest import unittest
import time import time
from qubes.qubes import QubesVmCollection, QubesException, system_path import libvirt
import qubes import qubes
import qubes.vm.qubesvm import qubes.vm.qubesvm
import qubes.vm.appvm
import qubes.vm.templatevm
import qubes.tests import qubes.tests
from qubes.qubes import QubesVmLabels
class TC_00_Basic(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase): class TC_00_Basic(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
def setUp(self): def setUp(self):
self.app = qubes.Qubes.create_empty_store(qubes.tests.XMLPATH) super(TC_00_Basic, self).setUp()
self.init_default_template()
def test_000_qubes_create(self): def test_000_qubes_create(self):
self.assertIsInstance(self.app, qubes.Qubes) self.assertIsInstance(self.app, qubes.Qubes)
@ -51,35 +53,38 @@ class TC_00_Basic(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
def test_100_qvm_create(self): def test_100_qvm_create(self):
app = qubes.Qubes(qubes.tests.XMLPATH)
vmname = self.make_vm_name('appvm') vmname = self.make_vm_name('appvm')
vm = app.add_new_vm(qubes.vm.appvm.AppVM, vm = self.app.add_new_vm(qubes.vm.appvm.AppVM,
name=vmname, template=self.qc.get_default_template()) name=vmname, template=self.app.default_template,
label='red'
)
self.assertIsNotNone(vm) self.assertIsNotNone(vm)
self.assertEqual(vm.name, vmname) self.assertEqual(vm.name, vmname)
self.assertEqual(vm.template, self.qc.get_default_template()) self.assertEqual(vm.template, self.app.default_template)
vm.create_on_disk(verbose=False) vm.create_on_disk()
with self.assertNotRaises(qubes.qubes.QubesException): with self.assertNotRaises(qubes.exc.QubesException):
vm.verify_files() vm.verify_files()
class TC_01_Properties(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase): class TC_01_Properties(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
def setUp(self): def setUp(self):
super(TC_01_Properties, self).setUp() super(TC_01_Properties, self).setUp()
self.init_default_template()
self.vmname = self.make_vm_name('appvm') self.vmname = self.make_vm_name('appvm')
self.vm = self.qc.add_new_vm('QubesAppVm', self.vm = self.app.add_new_vm(qubes.vm.appvm.AppVM,
name=self.vmname, template=self.qc.get_default_template()) name=self.vmname,
self.vm.create_on_disk(verbose=False) label='red')
self.vm.create_on_disk()
def save_and_reload_db(self): def save_and_reload_db(self):
super(TC_01_Properties, self).save_and_reload_db() super(TC_01_Properties, self).save_and_reload_db()
if hasattr(self, 'vm'): if hasattr(self, 'vm'):
self.vm = self.qc.get(self.vm.qid, None) self.vm = self.app.domains[self.vm.qid]
if hasattr(self, 'netvm'): if hasattr(self, 'netvm'):
self.netvm = self.qc.get(self.netvm.qid, None) self.netvm = self.app[self.netvm.qid]
def test_000_rename(self): def test_000_rename(self):
newname = self.make_vm_name('newname') newname = self.make_vm_name('newname')
@ -92,11 +97,12 @@ class TC_01_Properties(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
format(self.vmname)) format(self.vmname))
pre_rename_firewall = self.vm.get_firewall_conf() pre_rename_firewall = self.vm.get_firewall_conf()
#TODO: change to setting property when implemented with self.assertNotRaises(
self.vm.set_name(newname) (OSError, libvirt.libvirtError, qubes.exc.QubesException)):
self.vm.name = newname
self.assertEqual(self.vm.name, newname) self.assertEqual(self.vm.name, newname)
self.assertEqual(self.vm.dir_path, self.assertEqual(self.vm.dir_path,
os.path.join(system_path['qubes_appvms_dir'], newname)) os.path.join(qubes.config.system_path['qubes_appvms_dir'], newname))
self.assertEqual(self.vm.conf_file, self.assertEqual(self.vm.conf_file,
os.path.join(self.vm.dir_path, newname + '.conf')) os.path.join(self.vm.dir_path, newname + '.conf'))
self.assertTrue(os.path.exists( self.assertTrue(os.path.exists(
@ -117,7 +123,7 @@ class TC_01_Properties(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
os.path.join(os.getenv("HOME"), ".local/share/applications", os.path.join(os.getenv("HOME"), ".local/share/applications",
self.vmname + "-firefox.desktop"))) self.vmname + "-firefox.desktop")))
self.assertEquals(pre_rename_firewall, self.vm.get_firewall_conf()) self.assertEquals(pre_rename_firewall, self.vm.get_firewall_conf())
with self.assertNotRaises((QubesException, OSError)): with self.assertNotRaises((qubes.exc.QubesException, OSError)):
self.vm.write_firewall_conf({'allow': False}) self.vm.write_firewall_conf({'allow': False})
self.assertTrue(self.vm.autostart) self.assertTrue(self.vm.autostart)
self.assertTrue(os.path.exists( self.assertTrue(os.path.exists(
@ -187,36 +193,38 @@ class TC_01_Properties(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
self.vm.dispvm_netvm = self.vm self.vm.dispvm_netvm = self.vm
def test_030_clone(self): def test_030_clone(self):
testvm1 = self.qc.add_new_vm( testvm1 = self.app.add_new_vm(
"QubesAppVm", qubes.vm.appvm.AppVM,
name=self.make_vm_name("vm"), name=self.make_vm_name("vm"),
template=self.qc.get_default_template()) label='red')
testvm1.create_on_disk(verbose=False) testvm1.create_on_disk()
testvm2 = self.qc.add_new_vm(testvm1.__class__.__name__, testvm2 = self.app.add_new_vm(testvm1.__class__,
name=self.make_vm_name("clone"), name=self.make_vm_name("clone"),
template=testvm1.template, template=testvm1.template,
label='red',
) )
testvm2.clone_attrs(src_vm=testvm1) testvm2.clone_properties(testvm1)
testvm2.clone_disk_files(src_vm=testvm1, verbose=False) testvm2.clone_disk_files(testvm1)
# qubes.xml reload # qubes.xml reload
self.save_and_reload_db() self.save_and_reload_db()
testvm1 = self.qc[testvm1.qid] testvm1 = self.app.domains[testvm1.qid]
testvm2 = self.qc[testvm2.qid] testvm2 = self.app.domains[testvm2.qid]
self.assertEquals(testvm1.label, testvm2.label) self.assertEquals(testvm1.label, testvm2.label)
self.assertEquals(testvm1.netvm, testvm2.netvm) self.assertEquals(testvm1.netvm, testvm2.netvm)
self.assertEquals(testvm1.uses_default_netvm, self.assertEquals(testvm1.property_is_default('netvm'),
testvm2.uses_default_netvm) testvm2.property_is_default('netvm'))
self.assertEquals(testvm1.kernel, testvm2.kernel) self.assertEquals(testvm1.kernel, testvm2.kernel)
self.assertEquals(testvm1.kernelopts, testvm2.kernelopts) self.assertEquals(testvm1.kernelopts, testvm2.kernelopts)
self.assertEquals(testvm1.uses_default_kernel, self.assertEquals(testvm1.property_is_default('kernel'),
testvm2.uses_default_kernel) testvm2.property_is_default('kernel'))
self.assertEquals(testvm1.uses_default_kernelopts, self.assertEquals(testvm1.property_is_default('kernelopts'),
testvm2.uses_default_kernelopts) testvm2.property_is_default('kernelopts'))
self.assertEquals(testvm1.memory, testvm2.memory) self.assertEquals(testvm1.memory, testvm2.memory)
self.assertEquals(testvm1.maxmem, testvm2.maxmem) self.assertEquals(testvm1.maxmem, testvm2.maxmem)
self.assertEquals(testvm1.pcidevs, testvm2.pcidevs) # TODO
# self.assertEquals(testvm1.pcidevs, testvm2.pcidevs)
self.assertEquals(testvm1.include_in_backups, self.assertEquals(testvm1.include_in_backups,
testvm2.include_in_backups) testvm2.include_in_backups)
self.assertEquals(testvm1.default_user, testvm2.default_user) self.assertEquals(testvm1.default_user, testvm2.default_user)
@ -226,8 +234,7 @@ class TC_01_Properties(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
# now some non-default values # now some non-default values
testvm1.netvm = None testvm1.netvm = None
testvm1.uses_default_netvm = False testvm1.label = 'orange'
testvm1.label = QubesVmLabels['orange']
testvm1.memory = 512 testvm1.memory = 512
firewall = testvm1.get_firewall_conf() firewall = testvm1.get_firewall_conf()
firewall['allowDns'] = False firewall['allowDns'] = False
@ -240,31 +247,33 @@ class TC_01_Properties(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
}] }]
testvm1.write_firewall_conf(firewall) testvm1.write_firewall_conf(firewall)
testvm3 = self.qc.add_new_vm(testvm1.__class__.__name__, testvm3 = self.app.add_new_vm(testvm1.__class__,
name=self.make_vm_name("clone2"), name=self.make_vm_name("clone2"),
template=testvm1.template, template=testvm1.template,
label='red',
) )
testvm3.clone_attrs(src_vm=testvm1) testvm3.clone_properties(testvm1)
testvm3.clone_disk_files(src_vm=testvm1, verbose=False) testvm3.clone_disk_files(testvm1)
# qubes.xml reload # qubes.xml reload
self.save_and_reload_db() self.save_and_reload_db()
testvm1 = self.qc[testvm1.qid] testvm1 = self.app.domains[testvm1.qid]
testvm3 = self.qc[testvm3.qid] testvm3 = self.app.domains[testvm3.qid]
self.assertEquals(testvm1.label, testvm3.label) self.assertEquals(testvm1.label, testvm3.label)
self.assertEquals(testvm1.netvm, testvm3.netvm) self.assertEquals(testvm1.netvm, testvm3.netvm)
self.assertEquals(testvm1.uses_default_netvm, self.assertEquals(testvm1.property_is_default('netvm'),
testvm3.uses_default_netvm) testvm3.property_is_default('netvm'))
self.assertEquals(testvm1.kernel, testvm3.kernel) self.assertEquals(testvm1.kernel, testvm3.kernel)
self.assertEquals(testvm1.kernelopts, testvm3.kernelopts) self.assertEquals(testvm1.kernelopts, testvm3.kernelopts)
self.assertEquals(testvm1.uses_default_kernel, self.assertEquals(testvm1.property_is_default('kernel'),
testvm3.uses_default_kernel) testvm3.property_is_default('kernel'))
self.assertEquals(testvm1.uses_default_kernelopts, self.assertEquals(testvm1.property_is_default('kernelopts'),
testvm3.uses_default_kernelopts) testvm3.property_is_default('kernelopts'))
self.assertEquals(testvm1.memory, testvm3.memory) self.assertEquals(testvm1.memory, testvm3.memory)
self.assertEquals(testvm1.maxmem, testvm3.maxmem) self.assertEquals(testvm1.maxmem, testvm3.maxmem)
self.assertEquals(testvm1.pcidevs, testvm3.pcidevs) # TODO
# self.assertEquals(testvm1.pcidevs, testvm3.pcidevs)
self.assertEquals(testvm1.include_in_backups, self.assertEquals(testvm1.include_in_backups,
testvm3.include_in_backups) testvm3.include_in_backups)
self.assertEquals(testvm1.default_user, testvm3.default_user) self.assertEquals(testvm1.default_user, testvm3.default_user)
@ -273,33 +282,40 @@ class TC_01_Properties(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
testvm3.get_firewall_conf()) testvm3.get_firewall_conf())
class TC_02_QvmPrefs(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase): class TC_02_QvmPrefs(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
def setUp(self):
super(TC_02_QvmPrefs, self).setUp()
self.init_default_template()
self.sharedopts = ['--qubesxml', qubes.tests.XMLPATH]
def setup_appvm(self): def setup_appvm(self):
self.testvm = self.qc.add_new_vm( self.testvm = self.app.add_new_vm(
"QubesAppVm", qubes.vm.appvm.AppVM,
name=self.make_vm_name("vm"), name=self.make_vm_name("vm"),
template=self.qc.get_default_template()) label='red')
self.testvm.create_on_disk(verbose=False) self.testvm.create_on_disk()
self.save_and_reload_db() self.save_and_reload_db()
self.qc.unlock_db()
def setup_hvm(self): def setup_hvm(self):
self.testvm = self.qc.add_new_vm( self.testvm = self.app.add_new_vm(
"QubesHVm", qubes.vm.appvm.AppVM,
name=self.make_vm_name("hvm")) name=self.make_vm_name("hvm"),
self.testvm.create_on_disk(verbose=False) label='red')
self.testvm.hvm = True
self.testvm.create_on_disk()
self.save_and_reload_db() self.save_and_reload_db()
self.qc.unlock_db()
def pref_set(self, name, value, valid=True): def pref_set(self, name, value, valid=True):
p = subprocess.Popen( p = subprocess.Popen(
['qvm-prefs', '-s', '--', self.testvm.name, name, value], ['qvm-prefs'] + self.sharedopts +
(['--'] if value != '-D' else []) + [self.testvm.name, name, value],
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, stderr=subprocess.PIPE,
) )
(stdout, stderr) = p.communicate() (stdout, stderr) = p.communicate()
if valid: if valid:
self.assertEquals(p.returncode, 0, self.assertEquals(p.returncode, 0,
"qvm-prefs -s .. '{}' '{}' failed: {}{}".format( "qvm-prefs .. '{}' '{}' failed: {}{}".format(
name, value, stdout, stderr name, value, stdout, stderr
)) ))
else: else:
@ -308,8 +324,8 @@ class TC_02_QvmPrefs(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
"property '{}'".format(value, name)) "property '{}'".format(value, name))
def pref_get(self, name): def pref_get(self, name):
p = subprocess.Popen(['qvm-prefs', '-g', self.testvm.name, name], p = subprocess.Popen(['qvm-prefs'] + self.sharedopts +
stdout=subprocess.PIPE) ['--', self.testvm.name, name], stdout=subprocess.PIPE)
(stdout, _) = p.communicate() (stdout, _) = p.communicate()
self.assertEquals(p.returncode, 0) self.assertEquals(p.returncode, 0)
return stdout.strip() return stdout.strip()
@ -379,10 +395,12 @@ class TC_02_QvmPrefs(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
('', '', True), ('', '', True),
]) ])
@unittest.skip('test not converted to core3 API')
def test_006_template(self): def test_006_template(self):
templates = [tpl for tpl in self.qc.values() if tpl.is_template()] templates = [tpl for tpl in self.app.domains.values() if
tpl.is_template()]
if not templates: if not templates:
self.skip("No templates installed") self.skipTest("No templates installed")
some_template = templates[0].name some_template = templates[0].name
self.setup_appvm() self.setup_appvm()
self.execute_tests('template', [ self.execute_tests('template', [
@ -467,6 +485,7 @@ class TC_02_QvmPrefs(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
# TODO: tests for standalone VMs # TODO: tests for standalone VMs
]) ])
@unittest.skip('test not converted to core3 API')
def test_014_pcidevs(self): def test_014_pcidevs(self):
self.setup_appvm() self.setup_appvm()
self.execute_tests('pcidevs', [ self.execute_tests('pcidevs', [
@ -573,6 +592,7 @@ class TC_02_QvmPrefs(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
('invalid', '', False), ('invalid', '', False),
]) ])
@unittest.skip('test not converted to core3 API')
def test_024_pv_reject_hvm_props(self): def test_024_pv_reject_hvm_props(self):
self.setup_appvm() self.setup_appvm()
self.execute_tests('guiagent_installed', [('False', '', False)]) self.execute_tests('guiagent_installed', [('False', '', False)])
@ -580,6 +600,7 @@ class TC_02_QvmPrefs(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
self.execute_tests('drive', [('/tmp/drive.img', '', False)]) self.execute_tests('drive', [('/tmp/drive.img', '', False)])
self.execute_tests('timezone', [('localtime', '', False)]) self.execute_tests('timezone', [('localtime', '', False)])
@unittest.skip('test not converted to core3 API')
def test_025_hvm_reject_pv_props(self): def test_025_hvm_reject_pv_props(self):
self.setup_hvm() self.setup_hvm()
self.execute_tests('kernel', [('default', '', False)]) self.execute_tests('kernel', [('default', '', False)])
@ -588,26 +609,29 @@ class TC_02_QvmPrefs(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
class TC_03_QvmRevertTemplateChanges(qubes.tests.SystemTestsMixin, class TC_03_QvmRevertTemplateChanges(qubes.tests.SystemTestsMixin,
qubes.tests.QubesTestCase): qubes.tests.QubesTestCase):
def setUp(self):
super(TC_03_QvmRevertTemplateChanges, self).setUp()
self.init_default_template()
def setup_pv_template(self): def setup_pv_template(self):
self.test_template = self.qc.add_new_vm( self.test_template = self.app.add_new_vm(
"QubesTemplateVm", qubes.vm.templatevm.TemplateVM,
name=self.make_vm_name("pv-clone"), name=self.make_vm_name("pv-clone"),
label='red'
) )
self.test_template.clone_attrs(src_vm=self.qc.get_default_template()) self.test_template.clone_properties(self.app.default_template)
self.test_template.clone_disk_files( self.test_template.clone_disk_files(self.app.default_template)
src_vm=self.qc.get_default_template(),
verbose=False)
self.save_and_reload_db() self.save_and_reload_db()
self.qc.unlock_db()
def setup_hvm_template(self): def setup_hvm_template(self):
self.test_template = self.qc.add_new_vm( self.test_template = self.app.add_new_vm(
"QubesTemplateHVm", qubes.vm.templatevm.TemplateVM,
name=self.make_vm_name("hvm"), name=self.make_vm_name("hvm"),
label='red',
hvm=True
) )
self.test_template.create_on_disk(verbose=False) self.test_template.create_on_disk()
self.save_and_reload_db() self.save_and_reload_db()
self.qc.unlock_db()
def get_rootimg_checksum(self): def get_rootimg_checksum(self):
p = subprocess.Popen(['sha1sum', self.test_template.root_img], p = subprocess.Popen(['sha1sum', self.test_template.root_img],
@ -616,7 +640,7 @@ class TC_03_QvmRevertTemplateChanges(qubes.tests.SystemTestsMixin,
def _do_test(self): def _do_test(self):
checksum_before = self.get_rootimg_checksum() checksum_before = self.get_rootimg_checksum()
self.test_template.start(verbose=False) self.test_template.start()
self.shutdown_and_wait(self.test_template) self.shutdown_and_wait(self.test_template)
checksum_changed = self.get_rootimg_checksum() checksum_changed = self.get_rootimg_checksum()
if checksum_before == checksum_changed: if checksum_before == checksum_changed:
@ -644,11 +668,25 @@ class TC_03_QvmRevertTemplateChanges(qubes.tests.SystemTestsMixin,
self.setup_hvm_template() self.setup_hvm_template()
self._do_test() self._do_test()
class TC_04_DispVM(qubes.tests.SystemTestsMixin, class TC_04_DispVM(qubes.tests.SystemTestsMixin,
qubes.tests.QubesTestCase): qubes.tests.QubesTestCase):
def setUp(self):
self.skipTest('DisposableVMs not implemented in core3 yet')
super(TC_04_DispVM, self).setUp()
self.init_default_template()
disp_tpl = self.host_app.domains[self.get_dispvm_template_name()]
self.disp_tpl = self.app.add_new_vm(disp_tpl.__class__,
name=disp_tpl.name,
template=disp_tpl.template,
label='red'
)
self.app.save()
@staticmethod @staticmethod
def get_dispvm_template_name(): def get_dispvm_template_name():
# FIXME: currently qubes.xml doesn't contain this information...
vmdir = os.readlink('/var/lib/qubes/dvmdata/vmdir') vmdir = os.readlink('/var/lib/qubes/dvmdata/vmdir')
return os.path.basename(vmdir) return os.path.basename(vmdir)
@ -657,14 +695,10 @@ class TC_04_DispVM(qubes.tests.SystemTestsMixin,
Check firewall propagation VM->DispVM, when VM have some firewall rules Check firewall propagation VM->DispVM, when VM have some firewall rules
""" """
# FIXME: currently qubes.xml doesn't contain this information... testvm1 = self.app.add_new_vm(qubes.vm.appvm.AppVM,
dispvm_template_name = self.get_dispvm_template_name()
dispvm_template = self.qc.get_vm_by_name(dispvm_template_name)
testvm1 = self.qc.add_new_vm("QubesAppVm",
name=self.make_vm_name('vm1'), name=self.make_vm_name('vm1'),
template=self.qc.get_default_template()) label='red')
testvm1.create_on_disk(verbose=False) testvm1.create_on_disk()
firewall = testvm1.get_firewall_conf() firewall = testvm1.get_firewall_conf()
firewall['allowDns'] = False firewall['allowDns'] = False
firewall['allowYumProxy'] = False firewall['allowYumProxy'] = False
@ -675,8 +709,7 @@ class TC_04_DispVM(qubes.tests.SystemTestsMixin,
'portEnd': 22, 'portEnd': 22,
}] }]
testvm1.write_firewall_conf(firewall) testvm1.write_firewall_conf(firewall)
self.qc.save() self.app.save()
self.qc.unlock_db()
testvm1.start() testvm1.start()
@ -685,17 +718,15 @@ class TC_04_DispVM(qubes.tests.SystemTestsMixin,
passio_popen=True) passio_popen=True)
dispvm_name = p.stdout.readline().strip() dispvm_name = p.stdout.readline().strip()
self.qc.lock_db_for_reading() self.reload_db()
self.qc.load() dispvm = self.app.domains[dispvm_name]
self.qc.unlock_db()
dispvm = self.qc.get_vm_by_name(dispvm_name)
self.assertIsNotNone(dispvm, "DispVM {} not found in qubes.xml".format( self.assertIsNotNone(dispvm, "DispVM {} not found in qubes.xml".format(
dispvm_name)) dispvm_name))
# check if firewall was propagated to the DispVM # check if firewall was propagated to the DispVM
self.assertEquals(testvm1.get_firewall_conf(), self.assertEquals(testvm1.get_firewall_conf(),
dispvm.get_firewall_conf()) dispvm.get_firewall_conf())
# and only there (#1608) # and only there (#1608)
self.assertNotEquals(dispvm_template.get_firewall_conf(), self.assertNotEquals(self.disp_tpl.get_firewall_conf(),
dispvm.get_firewall_conf()) dispvm.get_firewall_conf())
# then modify some rule # then modify some rule
firewall = dispvm.get_firewall_conf() firewall = dispvm.get_firewall_conf()
@ -707,7 +738,7 @@ class TC_04_DispVM(qubes.tests.SystemTestsMixin,
}] }]
dispvm.write_firewall_conf(firewall) dispvm.write_firewall_conf(firewall)
# and check again if wasn't saved anywhere else (#1608) # and check again if wasn't saved anywhere else (#1608)
self.assertNotEquals(dispvm_template.get_firewall_conf(), self.assertNotEquals(self.disp_tpl.get_firewall_conf(),
dispvm.get_firewall_conf()) dispvm.get_firewall_conf())
self.assertNotEquals(testvm1.get_firewall_conf(), self.assertNotEquals(testvm1.get_firewall_conf(),
dispvm.get_firewall_conf()) dispvm.get_firewall_conf())
@ -718,24 +749,20 @@ class TC_04_DispVM(qubes.tests.SystemTestsMixin,
""" """
Check firewall propagation VM->DispVM, when VM have no firewall rules Check firewall propagation VM->DispVM, when VM have no firewall rules
""" """
testvm1 = self.qc.add_new_vm("QubesAppVm", testvm1 = self.app.add_new_vm(qubes.vm.appvm.AppVM,
name=self.make_vm_name('vm1'), name=self.make_vm_name('vm1'),
template=self.qc.get_default_template()) label='red')
testvm1.create_on_disk(verbose=False) testvm1.create_on_disk()
self.qc.save() self.app.save()
self.qc.unlock_db()
# FIXME: currently qubes.xml doesn't contain this information...
dispvm_template_name = self.get_dispvm_template_name()
dispvm_template = self.qc.get_vm_by_name(dispvm_template_name)
original_firewall = None original_firewall = None
if os.path.exists(dispvm_template.firewall_conf): if os.path.exists(self.disp_tpl.firewall_conf):
original_firewall = tempfile.TemporaryFile() original_firewall = tempfile.TemporaryFile()
with open(dispvm_template.firewall_conf) as f: with open(self.disp_tpl.firewall_conf) as f:
original_firewall.write(f.read()) original_firewall.write(f.read())
try: try:
firewall = dispvm_template.get_firewall_conf() firewall = self.disp_tpl.get_firewall_conf()
firewall['allowDns'] = False firewall['allowDns'] = False
firewall['allowYumProxy'] = False firewall['allowYumProxy'] = False
firewall['rules'] = [{'address': '1.2.3.4', firewall['rules'] = [{'address': '1.2.3.4',
@ -744,7 +771,7 @@ class TC_04_DispVM(qubes.tests.SystemTestsMixin,
'portBegin': 22, 'portBegin': 22,
'portEnd': 22, 'portEnd': 22,
}] }]
dispvm_template.write_firewall_conf(firewall) self.disp_tpl.write_firewall_conf(firewall)
testvm1.start() testvm1.start()
@ -753,17 +780,15 @@ class TC_04_DispVM(qubes.tests.SystemTestsMixin,
passio_popen=True) passio_popen=True)
dispvm_name = p.stdout.readline().strip() dispvm_name = p.stdout.readline().strip()
self.qc.lock_db_for_reading() self.reload_db()
self.qc.load() dispvm = self.app.domains[dispvm_name]
self.qc.unlock_db()
dispvm = self.qc.get_vm_by_name(dispvm_name)
self.assertIsNotNone(dispvm, "DispVM {} not found in qubes.xml".format( self.assertIsNotNone(dispvm, "DispVM {} not found in qubes.xml".format(
dispvm_name)) dispvm_name))
# check if firewall was propagated to the DispVM from the right VM # check if firewall was propagated to the DispVM from the right VM
self.assertEquals(testvm1.get_firewall_conf(), self.assertEquals(testvm1.get_firewall_conf(),
dispvm.get_firewall_conf()) dispvm.get_firewall_conf())
# and only there (#1608) # and only there (#1608)
self.assertNotEquals(dispvm_template.get_firewall_conf(), self.assertNotEquals(self.disp_tpl.get_firewall_conf(),
dispvm.get_firewall_conf()) dispvm.get_firewall_conf())
# then modify some rule # then modify some rule
firewall = dispvm.get_firewall_conf() firewall = dispvm.get_firewall_conf()
@ -775,7 +800,7 @@ class TC_04_DispVM(qubes.tests.SystemTestsMixin,
}] }]
dispvm.write_firewall_conf(firewall) dispvm.write_firewall_conf(firewall)
# and check again if wasn't saved anywhere else (#1608) # and check again if wasn't saved anywhere else (#1608)
self.assertNotEquals(dispvm_template.get_firewall_conf(), self.assertNotEquals(self.disp_tpl.get_firewall_conf(),
dispvm.get_firewall_conf()) dispvm.get_firewall_conf())
self.assertNotEquals(testvm1.get_firewall_conf(), self.assertNotEquals(testvm1.get_firewall_conf(),
dispvm.get_firewall_conf()) dispvm.get_firewall_conf())
@ -784,14 +809,13 @@ class TC_04_DispVM(qubes.tests.SystemTestsMixin,
finally: finally:
if original_firewall: if original_firewall:
original_firewall.seek(0) original_firewall.seek(0)
with open(dispvm_template.firewall_conf, 'w') as f: with open(self.disp_tpl.firewall_conf, 'w') as f:
f.write(original_firewall.read()) f.write(original_firewall.read())
original_firewall.close() original_firewall.close()
else: else:
os.unlink(dispvm_template.firewall_conf) os.unlink(self.disp_tpl.firewall_conf)
def test_002_cleanup(self): def test_002_cleanup(self):
self.qc.unlock_db()
p = subprocess.Popen(['/usr/lib/qubes/qfile-daemon-dvm', p = subprocess.Popen(['/usr/lib/qubes/qfile-daemon-dvm',
'qubes.VMShell', 'dom0', 'DEFAULT'], 'qubes.VMShell', 'dom0', 'DEFAULT'],
stdin=subprocess.PIPE, stdin=subprocess.PIPE,
@ -803,10 +827,8 @@ class TC_04_DispVM(qubes.tests.SystemTestsMixin,
lines = stdout.splitlines() lines = stdout.splitlines()
self.assertEqual(lines[0], "test") self.assertEqual(lines[0], "test")
dispvm_name = lines[1] dispvm_name = lines[1]
self.qc.lock_db_for_reading() self.reload_db()
self.qc.load() dispvm = self.app.domains[dispvm_name]
self.qc.unlock_db()
dispvm = self.qc.get_vm_by_name(dispvm_name)
self.assertIsNone(dispvm, "DispVM {} still exists in qubes.xml".format( self.assertIsNone(dispvm, "DispVM {} still exists in qubes.xml".format(
dispvm_name)) dispvm_name))
@ -815,7 +837,6 @@ class TC_04_DispVM(qubes.tests.SystemTestsMixin,
Check if DispVM is properly removed even if it terminated itself (#1660) Check if DispVM is properly removed even if it terminated itself (#1660)
:return: :return:
""" """
self.qc.unlock_db()
p = subprocess.Popen(['/usr/lib/qubes/qfile-daemon-dvm', p = subprocess.Popen(['/usr/lib/qubes/qfile-daemon-dvm',
'qubes.VMShell', 'dom0', 'DEFAULT'], 'qubes.VMShell', 'dom0', 'DEFAULT'],
stdin=subprocess.PIPE, stdin=subprocess.PIPE,
@ -837,10 +858,8 @@ class TC_04_DispVM(qubes.tests.SystemTestsMixin,
lines = p.stdout.read().splitlines() lines = p.stdout.read().splitlines()
dispvm_name = lines[0] dispvm_name = lines[0]
self.assertNotEquals(dispvm_name, "ERROR") self.assertNotEquals(dispvm_name, "ERROR")
self.qc.lock_db_for_reading() self.reload_db()
self.qc.load() dispvm = self.app.domains[dispvm_name]
self.qc.unlock_db()
dispvm = self.qc.get_vm_by_name(dispvm_name)
self.assertIsNone(dispvm, "DispVM {} still exists in qubes.xml".format( self.assertIsNone(dispvm, "DispVM {} still exists in qubes.xml".format(
dispvm_name)) dispvm_name))