From 842efb577dda9fc3901616b669a847594fb6e994 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marek=20Marczykowski-G=C3=B3recki?= Date: Mon, 26 Jun 2017 05:11:24 +0200 Subject: [PATCH] firewall: always use policy 'drop' There is a problem with having separate default action ("policy") and rules because it isn't possible to set both of them atomically at the same time. To solve this problem, always have policy 'drop' (as a safe default), but by default have a single rule with action 'accept' Fixes QubesOS/qubes-issues#2869 --- qubes/firewall.py | 27 ++++++++++++--------------- qubes/tests/firewall.py | 22 ++++++++++------------ 2 files changed, 22 insertions(+), 27 deletions(-) diff --git a/qubes/firewall.py b/qubes/firewall.py index 502fe7a1..1748047d 100644 --- a/qubes/firewall.py +++ b/qubes/firewall.py @@ -368,21 +368,23 @@ class Firewall(object): self.vm = vm #: firewall rules self.rules = [] - #: default action - self.policy = None if load: self.load() + @property + def policy(self): + ''' Default action - always 'drop' ''' + return Action('drop') + def __eq__(self, other): if isinstance(other, Firewall): - return self.policy == other.policy and self.rules == other.rules + return self.rules == other.rules return NotImplemented def load_defaults(self): '''Load default firewall settings''' - self.rules = [] - self.policy = Action('accept') + self.rules = [Rule(None, action='accept')] def clone(self, other): '''Clone firewall settings from other instance. @@ -390,7 +392,6 @@ class Firewall(object): :param other: other :py:class:`Firewall` instance ''' - self.policy = other.policy rules = [] for rule in other.rules: new_rule = Rule() @@ -422,9 +423,9 @@ class Firewall(object): policy_v1 = xml_root.get('policy') assert policy_v1 in ('allow', 'deny') if policy_v1 == 'allow': - self.policy = Action('accept') + policy = Action('accept') else: - self.policy = Action('drop') + policy = Action('drop') def _translate_action(key): if xml_root.get(key, policy_v1) == 'allow': @@ -439,7 +440,7 @@ class Firewall(object): action=_translate_action('icmp'), proto=Proto.icmp)) - if self.policy == Action.accept: + if policy == Action.accept: rule_action = Action.drop else: rule_action = Action.accept @@ -447,11 +448,11 @@ class Firewall(object): for element in xml_root: rule = Rule.from_xml_v1(element, rule_action) self.rules.append(rule) + if policy == Action.accept: + self.rules.append(Rule(None, action='accept')) def load_v2(self, xml_root): '''Load new (Qubes >= 4.0) firewall XML format''' - self.policy = Action(xml_root.findtext('policy')) - xml_rules = xml_root.find('rules') for xml_rule in xml_rules: rule = Rule(xml_rule) @@ -464,10 +465,6 @@ class Firewall(object): xml_root = lxml.etree.Element('firewall', version=str(2)) - xml_policy = lxml.etree.Element('policy') - xml_policy.text = str(self.policy) - xml_root.append(xml_policy) - xml_rules = lxml.etree.Element('rules') for rule in self.rules: if rule.expire: diff --git a/qubes/tests/firewall.py b/qubes/tests/firewall.py index e17a616d..5c1a330e 100644 --- a/qubes/tests/firewall.py +++ b/qubes/tests/firewall.py @@ -463,17 +463,17 @@ class TC_10_Firewall(qubes.tests.QubesTestCase): def test_000_defaults(self): fw = qubes.firewall.Firewall(self.vm, False) fw.load_defaults() - self.assertEqual(fw.policy, 'accept') - self.assertEqual(fw.rules, []) + self.assertEqual(fw.policy, 'drop') + self.assertEqual(fw.rules, [qubes.firewall.Rule(None, action='accept')]) def test_001_save_load_empty(self): fw = qubes.firewall.Firewall(self.vm, True) - self.assertEqual(fw.policy, 'accept') - self.assertEqual(fw.rules, []) + self.assertEqual(fw.policy, 'drop') + self.assertEqual(fw.rules, [qubes.firewall.Rule(None, action='accept')]) fw.save() fw.load() - self.assertEqual(fw.policy, 'accept') - self.assertEqual(fw.rules, []) + self.assertEqual(fw.policy, 'drop') + self.assertEqual(fw.rules, [qubes.firewall.Rule(None, action='accept')]) def test_002_save_load_rules(self): fw = qubes.firewall.Firewall(self.vm, True) @@ -485,13 +485,13 @@ class TC_10_Firewall(qubes.tests.QubesTestCase): qubes.firewall.Rule(None, action='accept', specialtarget='dns'), ] fw.rules.extend(rules) - fw.policy = qubes.firewall.Action.drop fw.save() self.assertTrue(os.path.exists(os.path.join( self.vm.dir_path, self.vm.firewall_conf))) fw = qubes.firewall.Firewall(TestVM(), True) self.assertEqual(fw.policy, qubes.firewall.Action.drop) - self.assertEqual(fw.rules, rules) + self.assertEqual(fw.rules, + [qubes.firewall.Rule(None, action='accept')] + rules) def test_003_load_v1(self): xml_txt = """