From 088c2553553cea334dae8f97f8797ec900bf867b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marek=20Marczykowski-G=C3=B3recki?= Date: Sun, 3 Dec 2017 03:21:35 +0100 Subject: [PATCH] tests: add create_qdb_entries() unit test --- qubes/tests/vm/__init__.py | 27 ++++++- qubes/tests/vm/mix/net.py | 5 +- qubes/tests/vm/qubesvm.py | 151 ++++++++++++++++++++++++++++++++++++- 3 files changed, 176 insertions(+), 7 deletions(-) diff --git a/qubes/tests/vm/__init__.py b/qubes/tests/vm/__init__.py index 14eb2638..92a3a044 100644 --- a/qubes/tests/vm/__init__.py +++ b/qubes/tests/vm/__init__.py @@ -38,6 +38,23 @@ class TestHost(object): self.memory_total = 1000 * 1024 self.no_cpus = 4 +class TestVMsCollection(dict): + def get_vms_connected_to(self, vm): + return set() + + def close(self): + self.clear() + +class TestVolume(object): + def __init__(self, pool): + self.pool = pool + self.size = 0 + self.source = None + +class TestPool(object): + def init_volume(self, *args, **kwargs): + return TestVolume(self) + class TestApp(qubes.tests.TestEmitter): labels = {1: qubes.Label(1, '0xcc0000', 'red')} check_updates_vm = False @@ -58,12 +75,18 @@ class TestApp(qubes.tests.TestEmitter): super(TestApp, self).__init__() self.vmm = TestVMM() self.host = TestHost() - self.pools = {} + default_pool = TestPool() + self.pools = { + 'default': default_pool, + default_pool: default_pool, + 'linux-kernel': TestPool(), + } self.default_pool_volatile = 'default' self.default_pool_root = 'default' self.default_pool_private = 'default' self.default_pool_kernel = 'linux-kernel' - self.domains = {} + self.default_netvm = None + self.domains = TestVMsCollection() #: jinja2 environment for libvirt XML templates self.env = jinja2.Environment( loader=jinja2.FileSystemLoader([ diff --git a/qubes/tests/vm/mix/net.py b/qubes/tests/vm/mix/net.py index aa089eec..2e63ae6d 100644 --- a/qubes/tests/vm/mix/net.py +++ b/qubes/tests/vm/mix/net.py @@ -57,7 +57,10 @@ class TC_00_NetVMMixin( self.netvm1.close() self.netvm2.close() self.nonetvm.close() - self.app.domains.close() + try: + self.app.domains.close() + except AttributeError: + pass del self.netvm1 del self.netvm2 del self.nonetvm diff --git a/qubes/tests/vm/qubesvm.py b/qubes/tests/vm/qubesvm.py index 65572f9d..3288a507 100644 --- a/qubes/tests/vm/qubesvm.py +++ b/qubes/tests/vm/qubesvm.py @@ -19,7 +19,7 @@ # You should have received a copy of the GNU Lesser General Public # License along with this library; if not, see . # - +import base64 import os import unittest @@ -54,6 +54,20 @@ class TestDeviceCollection(object): def persistent(self): return self._list +class TestQubesDB(object): + def __init__(self): + self.data = {} + + def write(self, path, value): + self.data[path] = value + + def rm(self, path): + if path.endswith('/'): + for key in [x for x in self.data if x.startswith(path)]: + del self.data[key] + else: + self.data.pop(path, None) + class TestVM(object): # pylint: disable=too-few-public-methods app = TestApp() @@ -133,11 +147,25 @@ class QubesVMTestsMixin(object): super(QubesVMTestsMixin, self).setUp() self.app = qubes.tests.vm.TestApp() self.app.vmm.offline_mode = True + # when full test run is called, extensions are loaded by earlier + # tests, but if just this test class is run, load them manually here, + # to have the same behaviour + qubes.ext.get_extensions() - def get_vm(self, name='test', **kwargs): - vm = qubes.vm.qubesvm.QubesVM(self.app, None, - qid=1, name=qubes.tests.VMPREFIX + name, + def tearDown(self): + try: + self.app.domains.close() + except AttributeError: + pass + super(QubesVMTestsMixin, self).tearDown() + + def get_vm(self, name='test', cls=qubes.vm.qubesvm.QubesVM, **kwargs): + vm = cls(self.app, None, + qid=kwargs.pop('qid', 1), name=qubes.tests.VMPREFIX + name, **kwargs) + self.app.domains[vm.qid] = vm + self.app.domains[vm.uuid] = vm + self.app.domains[vm] = vm self.addCleanup(vm.close) return vm @@ -685,3 +713,118 @@ class TC_90_QubesVM(QubesVMTestsMixin, qubes.tests.QubesTestCase): libvirt_xml = vm.create_config_file() self.assertXMLEqual(lxml.etree.XML(libvirt_xml), lxml.etree.XML(expected)) + + @unittest.mock.patch('qubes.utils.get_timezone') + @unittest.mock.patch('qubes.utils.urandom') + @unittest.mock.patch('qubes.vm.qubesvm.QubesVM.untrusted_qdb') + def test_620_qdb_standalone(self, mock_qubesdb, mock_urandom, + mock_timezone): + mock_urandom.return_value = b'A' * 64 + mock_timezone.return_value = 'UTC' + vm = self.get_vm(cls=qubes.vm.standalonevm.StandaloneVM) + vm.netvm = None + vm.events_enabled = True + test_qubesdb = TestQubesDB() + mock_qubesdb.write.side_effect = test_qubesdb.write + mock_qubesdb.rm.side_effect = test_qubesdb.rm + vm.create_qdb_entries() + self.maxDiff = None + + iptables_header = ( + '# Generated by Qubes Core on {}\n' + '*filter\n' + ':INPUT DROP [0:0]\n' + ':FORWARD DROP [0:0]\n' + ':OUTPUT ACCEPT [0:0]\n' + '-A INPUT -i vif+ -p udp -m udp --dport 68 -j DROP\n' + '-A INPUT -m conntrack --ctstate ' + 'RELATED,ESTABLISHED -j ACCEPT\n' + '-A INPUT -p icmp -j ACCEPT\n' + '-A INPUT -i lo -j ACCEPT\n' + '-A INPUT -j REJECT --reject-with ' + 'icmp-host-prohibited\n' + '-A FORWARD -m conntrack --ctstate ' + 'RELATED,ESTABLISHED -j ACCEPT\n' + '-A FORWARD -i vif+ -o vif+ -j DROP\n' + 'COMMIT\n'.format(datetime.datetime.now().ctime())) + + self.assertEqual(test_qubesdb.data, { + '/name': 'test-inst-test', + '/type': 'StandaloneVM', + '/qubes-vm-type': 'AppVM', + '/qubes-debug-mode': '0', + '/qubes-base-template': '', + '/qubes-timezone': 'UTC', + '/qubes-random-seed': base64.b64encode(b'A' * 64), + '/qubes-vm-persistence': 'full', + '/qubes-vm-updateable': 'True', + '/qubes-block-devices': '', + '/qubes-usb-devices': '', + '/qubes-iptables': 'reload', + '/qubes-iptables-error': '', + '/qubes-iptables-header': iptables_header, + '/qubes-service/qubes-update-check': '0', + }) + + @unittest.mock.patch('qubes.utils.get_timezone') + @unittest.mock.patch('qubes.utils.urandom') + @unittest.mock.patch('qubes.vm.qubesvm.QubesVM.untrusted_qdb') + def test_621_qdb_appvm_with_network(self, mock_qubesdb, mock_urandom, + mock_timezone): + mock_urandom.return_value = b'A' * 64 + mock_timezone.return_value = 'UTC' + template = self.get_vm(cls=qubes.vm.templatevm.TemplateVM, name='template') + template.netvm = None + netvm = self.get_vm(cls=qubes.vm.standalonevm.StandaloneVM, + name='netvm', qid=2, provides_network=True) + vm = self.get_vm(cls=qubes.vm.appvm.AppVM, template=template, + name='appvm', qid=3) + vm.netvm = netvm + test_qubesdb = TestQubesDB() + mock_qubesdb.write.side_effect = test_qubesdb.write + mock_qubesdb.rm.side_effect = test_qubesdb.rm + self.maxDiff = None + + iptables_header = ( + '# Generated by Qubes Core on {}\n' + '*filter\n' + ':INPUT DROP [0:0]\n' + ':FORWARD DROP [0:0]\n' + ':OUTPUT ACCEPT [0:0]\n' + '-A INPUT -i vif+ -p udp -m udp --dport 68 -j DROP\n' + '-A INPUT -m conntrack --ctstate ' + 'RELATED,ESTABLISHED -j ACCEPT\n' + '-A INPUT -p icmp -j ACCEPT\n' + '-A INPUT -i lo -j ACCEPT\n' + '-A INPUT -j REJECT --reject-with ' + 'icmp-host-prohibited\n' + '-A FORWARD -m conntrack --ctstate ' + 'RELATED,ESTABLISHED -j ACCEPT\n' + '-A FORWARD -i vif+ -o vif+ -j DROP\n' + 'COMMIT\n'.format(datetime.datetime.now().ctime())) + + expected = { + '/name': 'test-inst-appvm', + '/type': 'AppVM', + '/qubes-vm-type': 'AppVM', + '/qubes-debug-mode': '0', + '/qubes-base-template': 'test-inst-template', + '/qubes-timezone': 'UTC', + '/qubes-random-seed': base64.b64encode(b'A' * 64), + '/qubes-vm-persistence': 'rw-only', + '/qubes-vm-updateable': 'False', + '/qubes-block-devices': '', + '/qubes-usb-devices': '', + '/qubes-iptables': 'reload', + '/qubes-iptables-error': '', + '/qubes-iptables-header': iptables_header, + '/qubes-service/qubes-update-check': '0', + '/qubes-ip': '10.137.0.3', + '/qubes-netmask': '255.255.255.255', + '/qubes-gateway': '10.137.0.2', + '/qubes-primary-dns': '10.139.1.1', + '/qubes-secondary-dns': '10.139.1.2', + } + + vm.create_qdb_entries() + self.assertEqual(test_qubesdb.data, expected)