tests: "empty" firewall propagation
Check if "empty" firewall is also properly propagated. QubesOS/qubes-issues#1608
This commit is contained in:
parent
04415e13fe
commit
ce75ba411f
@ -27,6 +27,7 @@ import multiprocessing
|
|||||||
import os
|
import os
|
||||||
import shutil
|
import shutil
|
||||||
import subprocess
|
import subprocess
|
||||||
|
import tempfile
|
||||||
|
|
||||||
import unittest
|
import unittest
|
||||||
import time
|
import time
|
||||||
@ -633,7 +634,20 @@ class TC_03_QvmRevertTemplateChanges(qubes.tests.SystemTestsMixin,
|
|||||||
class TC_04_DispVM(qubes.tests.SystemTestsMixin,
|
class TC_04_DispVM(qubes.tests.SystemTestsMixin,
|
||||||
qubes.tests.QubesTestCase):
|
qubes.tests.QubesTestCase):
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def get_dispvm_template_name():
|
||||||
|
vmdir = os.readlink('/var/lib/qubes/dvmdata/vmdir')
|
||||||
|
return os.path.basename(vmdir)
|
||||||
|
|
||||||
def test_000_firewall_propagation(self):
|
def test_000_firewall_propagation(self):
|
||||||
|
"""
|
||||||
|
Check firewall propagation VM->DispVM, when VM have some firewall rules
|
||||||
|
"""
|
||||||
|
|
||||||
|
# FIXME: currently qubes.xml doesn't contain this information...
|
||||||
|
dispvm_template_name = self.get_dispvm_template_name()
|
||||||
|
dispvm_template = self.qc.get_vm_by_name(dispvm_template_name)
|
||||||
|
|
||||||
testvm1 = self.qc.add_new_vm("QubesAppVm",
|
testvm1 = self.qc.add_new_vm("QubesAppVm",
|
||||||
name=self.make_vm_name('vm1'),
|
name=self.make_vm_name('vm1'),
|
||||||
template=self.qc.get_default_template())
|
template=self.qc.get_default_template())
|
||||||
@ -664,9 +678,6 @@ class TC_04_DispVM(qubes.tests.SystemTestsMixin,
|
|||||||
dispvm = self.qc.get_vm_by_name(dispvm_name)
|
dispvm = self.qc.get_vm_by_name(dispvm_name)
|
||||||
self.assertIsNotNone(dispvm, "DispVM {} not found in qubes.xml".format(
|
self.assertIsNotNone(dispvm, "DispVM {} not found in qubes.xml".format(
|
||||||
dispvm_name))
|
dispvm_name))
|
||||||
# FIXME: currently qubes.xml doesn't contain this information...
|
|
||||||
dispvm_template_name = os.path.basename(dispvm.dir_path)
|
|
||||||
dispvm_template = self.qc.get_vm_by_name(dispvm_template_name)
|
|
||||||
# check if firewall was propagated to the DispVM
|
# check if firewall was propagated to the DispVM
|
||||||
self.assertEquals(testvm1.get_firewall_conf(),
|
self.assertEquals(testvm1.get_firewall_conf(),
|
||||||
dispvm.get_firewall_conf())
|
dispvm.get_firewall_conf())
|
||||||
@ -690,5 +701,81 @@ class TC_04_DispVM(qubes.tests.SystemTestsMixin,
|
|||||||
p.stdin.write('\n')
|
p.stdin.write('\n')
|
||||||
p.wait()
|
p.wait()
|
||||||
|
|
||||||
|
def test_001_firewall_propagation(self):
|
||||||
|
"""
|
||||||
|
Check firewall propagation VM->DispVM, when VM have no firewall rules
|
||||||
|
"""
|
||||||
|
testvm1 = self.qc.add_new_vm("QubesAppVm",
|
||||||
|
name=self.make_vm_name('vm1'),
|
||||||
|
template=self.qc.get_default_template())
|
||||||
|
testvm1.create_on_disk(verbose=False)
|
||||||
|
self.qc.save()
|
||||||
|
self.qc.unlock_db()
|
||||||
|
|
||||||
|
# FIXME: currently qubes.xml doesn't contain this information...
|
||||||
|
dispvm_template_name = self.get_dispvm_template_name()
|
||||||
|
dispvm_template = self.qc.get_vm_by_name(dispvm_template_name)
|
||||||
|
original_firewall = None
|
||||||
|
if os.path.exists(dispvm_template.firewall_conf):
|
||||||
|
original_firewall = tempfile.TemporaryFile()
|
||||||
|
with open(dispvm_template.firewall_conf) as f:
|
||||||
|
original_firewall.write(f.read())
|
||||||
|
try:
|
||||||
|
|
||||||
|
firewall = dispvm_template.get_firewall_conf()
|
||||||
|
firewall['allowDns'] = False
|
||||||
|
firewall['allowYumProxy'] = False
|
||||||
|
firewall['rules'] = [{'address': '1.2.3.4',
|
||||||
|
'netmask': 24,
|
||||||
|
'proto': 'tcp',
|
||||||
|
'portBegin': 22,
|
||||||
|
'portEnd': 22,
|
||||||
|
}]
|
||||||
|
dispvm_template.write_firewall_conf(firewall)
|
||||||
|
|
||||||
|
testvm1.start()
|
||||||
|
|
||||||
|
p = testvm1.run("qvm-run --dispvm 'qubesdb-read /name; echo ERROR;"
|
||||||
|
" read x'",
|
||||||
|
passio_popen=True)
|
||||||
|
|
||||||
|
dispvm_name = p.stdout.readline().strip()
|
||||||
|
self.qc.lock_db_for_reading()
|
||||||
|
self.qc.load()
|
||||||
|
self.qc.unlock_db()
|
||||||
|
dispvm = self.qc.get_vm_by_name(dispvm_name)
|
||||||
|
self.assertIsNotNone(dispvm, "DispVM {} not found in qubes.xml".format(
|
||||||
|
dispvm_name))
|
||||||
|
# check if firewall was propagated to the DispVM from the right VM
|
||||||
|
self.assertEquals(testvm1.get_firewall_conf(),
|
||||||
|
dispvm.get_firewall_conf())
|
||||||
|
# and only there (#1608)
|
||||||
|
self.assertNotEquals(dispvm_template.get_firewall_conf(),
|
||||||
|
dispvm.get_firewall_conf())
|
||||||
|
# then modify some rule
|
||||||
|
firewall = dispvm.get_firewall_conf()
|
||||||
|
firewall['rules'] = [{'address': '4.3.2.1',
|
||||||
|
'netmask': 24,
|
||||||
|
'proto': 'tcp',
|
||||||
|
'portBegin': 22,
|
||||||
|
'portEnd': 22,
|
||||||
|
}]
|
||||||
|
dispvm.write_firewall_conf(firewall)
|
||||||
|
# and check again if wasn't saved anywhere else (#1608)
|
||||||
|
self.assertNotEquals(dispvm_template.get_firewall_conf(),
|
||||||
|
dispvm.get_firewall_conf())
|
||||||
|
self.assertNotEquals(testvm1.get_firewall_conf(),
|
||||||
|
dispvm.get_firewall_conf())
|
||||||
|
p.stdin.write('\n')
|
||||||
|
p.wait()
|
||||||
|
finally:
|
||||||
|
if original_firewall:
|
||||||
|
original_firewall.seek(0)
|
||||||
|
with open(dispvm_template.firewall_conf, 'w') as f:
|
||||||
|
f.write(original_firewall.read())
|
||||||
|
original_firewall.close()
|
||||||
|
else:
|
||||||
|
os.unlink(dispvm_template.firewall_conf)
|
||||||
|
|
||||||
|
|
||||||
# vim: ts=4 sw=4 et
|
# vim: ts=4 sw=4 et
|
||||||
|
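Review bot: In test_001_firewall_propagation the `p.stdin.write('\n')` / `p.wait()` pair that lets the `qvm-run --dispvm ... read x` child finish sits inside the `try` body, so any failing assertion — the exact outcome this test exists to surface for #1608 — skips it and leaves the child blocked on `read x` (and the DispVM it spawned running) while the `finally` block is already restoring the template's firewall file; the SystemTestsMixin teardown may remove the VMs afterwards, but the blocked process is never signalled. Moving the shutdown into the cleanup path fixes this: add `p = None` before `try:`, and at the top of `finally:` put `if p is not None: p.stdin.write('\n'); p.wait()` (or register it with `self.addCleanup`), then leave the firewall restore as is. Since the test deliberately writes a non-default rule set into the shared DispVM template, the restore is the only thing keeping later tests from inheriting the `1.2.3.4` rule, so the same `addCleanup` treatment would make it robust to errors raised before `try` is entered (e.g. the `with open(...)` read). On style, `assertEquals` / `assertNotEquals` used throughout this hunk are deprecated aliases; `assertEqual` / `assertNotEqual` are the supported names.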