diff --git a/tests/basic.py b/tests/basic.py index e53a0b55..8293b3d9 100644 --- a/tests/basic.py +++ b/tests/basic.py @@ -27,6 +27,7 @@ import multiprocessing import os import shutil import subprocess +import tempfile import unittest import time @@ -633,7 +634,20 @@ class TC_03_QvmRevertTemplateChanges(qubes.tests.SystemTestsMixin, class TC_04_DispVM(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase): + @staticmethod + def get_dispvm_template_name(): + vmdir = os.readlink('/var/lib/qubes/dvmdata/vmdir') + return os.path.basename(vmdir) + def test_000_firewall_propagation(self): + """ + Check firewall propagation VM->DispVM, when VM have some firewall rules + """ + + # FIXME: currently qubes.xml doesn't contain this information... + dispvm_template_name = self.get_dispvm_template_name() + dispvm_template = self.qc.get_vm_by_name(dispvm_template_name) + testvm1 = self.qc.add_new_vm("QubesAppVm", name=self.make_vm_name('vm1'), template=self.qc.get_default_template()) @@ -664,9 +678,6 @@ class TC_04_DispVM(qubes.tests.SystemTestsMixin, dispvm = self.qc.get_vm_by_name(dispvm_name) self.assertIsNotNone(dispvm, "DispVM {} not found in qubes.xml".format( dispvm_name)) - # FIXME: currently qubes.xml doesn't contain this information... - dispvm_template_name = os.path.basename(dispvm.dir_path) - dispvm_template = self.qc.get_vm_by_name(dispvm_template_name) # check if firewall was propagated to the DispVM self.assertEquals(testvm1.get_firewall_conf(), dispvm.get_firewall_conf()) @@ -690,5 +701,81 @@ class TC_04_DispVM(qubes.tests.SystemTestsMixin, p.stdin.write('\n') p.wait() + def test_001_firewall_propagation(self): + """ + Check firewall propagation VM->DispVM, when VM have no firewall rules + """ + testvm1 = self.qc.add_new_vm("QubesAppVm", + name=self.make_vm_name('vm1'), + template=self.qc.get_default_template()) + testvm1.create_on_disk(verbose=False) + self.qc.save() + self.qc.unlock_db() + + # FIXME: currently qubes.xml doesn't contain this information... + dispvm_template_name = self.get_dispvm_template_name() + dispvm_template = self.qc.get_vm_by_name(dispvm_template_name) + original_firewall = None + if os.path.exists(dispvm_template.firewall_conf): + original_firewall = tempfile.TemporaryFile() + with open(dispvm_template.firewall_conf) as f: + original_firewall.write(f.read()) + try: + + firewall = dispvm_template.get_firewall_conf() + firewall['allowDns'] = False + firewall['allowYumProxy'] = False + firewall['rules'] = [{'address': '1.2.3.4', + 'netmask': 24, + 'proto': 'tcp', + 'portBegin': 22, + 'portEnd': 22, + }] + dispvm_template.write_firewall_conf(firewall) + + testvm1.start() + + p = testvm1.run("qvm-run --dispvm 'qubesdb-read /name; echo ERROR;" + " read x'", + passio_popen=True) + + dispvm_name = p.stdout.readline().strip() + self.qc.lock_db_for_reading() + self.qc.load() + self.qc.unlock_db() + dispvm = self.qc.get_vm_by_name(dispvm_name) + self.assertIsNotNone(dispvm, "DispVM {} not found in qubes.xml".format( + dispvm_name)) + # check if firewall was propagated to the DispVM from the right VM + self.assertEquals(testvm1.get_firewall_conf(), + dispvm.get_firewall_conf()) + # and only there (#1608) + self.assertNotEquals(dispvm_template.get_firewall_conf(), + dispvm.get_firewall_conf()) + # then modify some rule + firewall = dispvm.get_firewall_conf() + firewall['rules'] = [{'address': '4.3.2.1', + 'netmask': 24, + 'proto': 'tcp', + 'portBegin': 22, + 'portEnd': 22, + }] + dispvm.write_firewall_conf(firewall) + # and check again if wasn't saved anywhere else (#1608) + self.assertNotEquals(dispvm_template.get_firewall_conf(), + dispvm.get_firewall_conf()) + self.assertNotEquals(testvm1.get_firewall_conf(), + dispvm.get_firewall_conf()) + p.stdin.write('\n') + p.wait() + finally: + if original_firewall: + original_firewall.seek(0) + with open(dispvm_template.firewall_conf, 'w') as f: + f.write(original_firewall.read()) + original_firewall.close() + else: + os.unlink(dispvm_template.firewall_conf) + # vim: ts=4 sw=4 et