firewall: adjust tests to the new tuple returned by prepare_rules()

This commit is contained in:
3hhh 2021-05-15 22:13:01 +02:00
parent 78de37da92
commit 3067e469d3
No known key found for this signature in database
GPG Key ID: EB03A691DB2F0833

View File

@ -1,5 +1,6 @@
import logging import logging
import operator import operator
import re
from unittest import TestCase from unittest import TestCase
from unittest.mock import patch from unittest.mock import patch
@ -37,6 +38,9 @@ class DummyQubesDB(object):
else: else:
self.entries.pop(path) self.entries.pop(path)
def write(self, path, val):
self.entries[path] = val
def multiread(self, prefix): def multiread(self, prefix):
result = {} result = {}
for key, value in self.entries.items(): for key, value in self.entries.items():
@ -220,8 +224,12 @@ class TestIptablesWorker(TestCase):
"--reject-with icmp-admin-prohibited\n" "--reject-with icmp-admin-prohibited\n"
"COMMIT\n" "COMMIT\n"
) )
self.assertEqual(self.obj.prepare_rules('chain', rules, 4), ret = self.obj.prepare_rules('chain', rules, 4)
expected_iptables) self.assertEqual(ret[0], expected_iptables)
self.assertEqual(ret[1].keys(), {'yum.qubes-os.org'})
self.assertIsInstance(ret[1]['yum.qubes-os.org'], set)
self.assertIsNotNone(re.match('^\d+\.\d+\.\d+\.\d+/32$',
ret[1]['yum.qubes-os.org'].pop()))
with self.assertRaises(qubesagent.firewall.RuleParseError): with self.assertRaises(qubesagent.firewall.RuleParseError):
self.obj.prepare_rules('chain', [{'unknown': 'xxx'}], 4) self.obj.prepare_rules('chain', [{'unknown': 'xxx'}], 4)
with self.assertRaises(qubesagent.firewall.RuleParseError): with self.assertRaises(qubesagent.firewall.RuleParseError):
@ -258,8 +266,12 @@ class TestIptablesWorker(TestCase):
"--reject-with icmp6-adm-prohibited\n" "--reject-with icmp6-adm-prohibited\n"
"COMMIT\n" "COMMIT\n"
) )
self.assertEqual(self.obj.prepare_rules('chain', rules, 6), ret = self.obj.prepare_rules('chain', rules, 6)
expected_iptables) self.assertEqual(ret[0], expected_iptables)
self.assertEqual(ret[1].keys(), {'ripe.net'})
self.assertIsInstance(ret[1]['ripe.net'], set)
self.assertIsNotNone(re.match('^[0-9a-f:]+/\d+$',
ret[1]['ripe.net'].pop()))
def test_004_apply_rules4(self): def test_004_apply_rules4(self):
rules = [{'action': 'accept'}] rules = [{'action': 'accept'}]
@ -271,7 +283,7 @@ class TestIptablesWorker(TestCase):
['-I', 'QBS-FORWARD', '-s', '10.137.0.1', '-j', chain], ['-I', 'QBS-FORWARD', '-s', '10.137.0.1', '-j', chain],
['-F', chain]]) ['-F', chain]])
self.assertEqual(self.obj.loaded_iptables[4], self.assertEqual(self.obj.loaded_iptables[4],
self.obj.prepare_rules(chain, rules, 4)) self.obj.prepare_rules(chain, rules, 4)[0])
self.assertEqual(self.obj.called_commands[6], []) self.assertEqual(self.obj.called_commands[6], [])
self.assertIsNone(self.obj.loaded_iptables[6]) self.assertIsNone(self.obj.loaded_iptables[6])
@ -285,7 +297,7 @@ class TestIptablesWorker(TestCase):
['-I', 'QBS-FORWARD', '-s', '2000::a', '-j', chain], ['-I', 'QBS-FORWARD', '-s', '2000::a', '-j', chain],
['-F', chain]]) ['-F', chain]])
self.assertEqual(self.obj.loaded_iptables[6], self.assertEqual(self.obj.loaded_iptables[6],
self.obj.prepare_rules(chain, rules, 6)) self.obj.prepare_rules(chain, rules, 6)[0])
self.assertEqual(self.obj.called_commands[4], []) self.assertEqual(self.obj.called_commands[4], [])
self.assertIsNone(self.obj.loaded_iptables[4]) self.assertIsNone(self.obj.loaded_iptables[4])
@ -448,8 +460,12 @@ class TestNftablesWorker(TestCase):
' }\n' ' }\n'
'}\n' '}\n'
) )
self.assertEqual(self.obj.prepare_rules('chain', rules, 4), ret = self.obj.prepare_rules('chain', rules, 4)
expected_nft) self.assertEqual(ret[0], expected_nft)
self.assertEqual(ret[1].keys(), {'yum.qubes-os.org'})
self.assertIsInstance(ret[1]['yum.qubes-os.org'], set)
self.assertIsNotNone(re.match('^\d+\.\d+\.\d+\.\d+/32$',
ret[1]['yum.qubes-os.org'].pop()))
with self.assertRaises(qubesagent.firewall.RuleParseError): with self.assertRaises(qubesagent.firewall.RuleParseError):
self.obj.prepare_rules('chain', [{'unknown': 'xxx'}], 4) self.obj.prepare_rules('chain', [{'unknown': 'xxx'}], 4)
with self.assertRaises(qubesagent.firewall.RuleParseError): with self.assertRaises(qubesagent.firewall.RuleParseError):
@ -485,8 +501,12 @@ class TestNftablesWorker(TestCase):
' }\n' ' }\n'
'}\n' '}\n'
) )
self.assertEqual(self.obj.prepare_rules('chain', rules, 6), ret = self.obj.prepare_rules('chain', rules, 6)
expected_nft) self.assertEqual(ret[0], expected_nft)
self.assertEqual(ret[1].keys(), {'ripe.net'})
self.assertIsInstance(ret[1]['ripe.net'], set)
self.assertIsNotNone(re.match('^[0-9a-f:]+/\d+$',
ret[1]['ripe.net'].pop()))
def test_004_apply_rules4(self): def test_004_apply_rules4(self):
rules = [{'action': 'accept'}] rules = [{'action': 'accept'}]
@ -494,7 +514,7 @@ class TestNftablesWorker(TestCase):
self.obj.apply_rules('10.137.0.1', rules) self.obj.apply_rules('10.137.0.1', rules)
self.assertEqual(self.obj.loaded_rules, self.assertEqual(self.obj.loaded_rules,
[self.expected_create_chain('ip', '10.137.0.1', chain), [self.expected_create_chain('ip', '10.137.0.1', chain),
self.obj.prepare_rules(chain, rules, 4), self.obj.prepare_rules(chain, rules, 4)[0],
]) ])
def test_005_apply_rules6(self): def test_005_apply_rules6(self):
@ -503,7 +523,7 @@ class TestNftablesWorker(TestCase):
self.obj.apply_rules('2000::a', rules) self.obj.apply_rules('2000::a', rules)
self.assertEqual(self.obj.loaded_rules, self.assertEqual(self.obj.loaded_rules,
[self.expected_create_chain('ip6', '2000::a', chain), [self.expected_create_chain('ip6', '2000::a', chain),
self.obj.prepare_rules(chain, rules, 6), self.obj.prepare_rules(chain, rules, 6)[0],
]) ])
def test_006_init(self): def test_006_init(self):