|
@@ -1,5 +1,6 @@
|
|
|
import logging
|
|
|
import operator
|
|
|
+import re
|
|
|
from unittest import TestCase
|
|
|
from unittest.mock import patch
|
|
|
|
|
@@ -29,6 +30,17 @@ class DummyQubesDB(object):
|
|
|
except KeyError:
|
|
|
return None
|
|
|
|
|
|
+ def rm(self, path):
|
|
|
+ if path.endswith('/'):
|
|
|
+ for key in list(self.entries):
|
|
|
+ if key.startswith(path):
|
|
|
+ self.entries.pop(key)
|
|
|
+ else:
|
|
|
+ self.entries.pop(path)
|
|
|
+
|
|
|
+ def write(self, path, val):
|
|
|
+ self.entries[path] = val
|
|
|
+
|
|
|
def multiread(self, prefix):
|
|
|
result = {}
|
|
|
for key, value in self.entries.items():
|
|
@@ -154,8 +166,31 @@ class NftablesWorker(qubesagent.firewall.NftablesWorker):
|
|
|
else:
|
|
|
return ['2001::1', '2001::2']
|
|
|
|
|
|
+class WorkerCommon(object):
|
|
|
+ def assertPrepareRulesDnsRet(self, dns_ret, expected_domain, family):
|
|
|
+ self.assertEqual(dns_ret.keys(), {expected_domain})
|
|
|
+ self.assertIsInstance(dns_ret[expected_domain], set)
|
|
|
+ if family == 4:
|
|
|
+ self.assertIsNotNone(re.match('^\d+\.\d+\.\d+\.\d+/32$',
|
|
|
+ dns_ret[expected_domain].pop()))
|
|
|
+ elif family == 6:
|
|
|
+ self.assertIsNotNone(re.match('^[0-9a-f:]+/\d+$',
|
|
|
+ dns_ret[expected_domain].pop()))
|
|
|
+ else:
|
|
|
+ raise ValueError()
|
|
|
+
|
|
|
+ def test_701_dns_info(self):
|
|
|
+ rules = [
|
|
|
+ {'action': 'accept', 'proto': 'tcp',
|
|
|
+ 'dstports': '80-80', 'dsthost': 'ripe.net'},
|
|
|
+ {'action': 'drop'},
|
|
|
+ ]
|
|
|
+ self.obj.apply_rules('10.137.0.1', rules)
|
|
|
+ self.assertIsNotNone(self.obj.qdb.read('/dns/10.137.0.1/ripe.net'))
|
|
|
+ self.obj.apply_rules('10.137.0.1', [{'action': 'drop'}])
|
|
|
+ self.assertIsNone(self.obj.qdb.read('/dns/10.137.0.1/ripe.net'))
|
|
|
|
|
|
-class TestIptablesWorker(TestCase):
|
|
|
+class TestIptablesWorker(TestCase, WorkerCommon):
|
|
|
def setUp(self):
|
|
|
super(TestIptablesWorker, self).setUp()
|
|
|
self.obj = IptablesWorker()
|
|
@@ -212,8 +247,9 @@ class TestIptablesWorker(TestCase):
|
|
|
"--reject-with icmp-admin-prohibited\n"
|
|
|
"COMMIT\n"
|
|
|
)
|
|
|
- self.assertEqual(self.obj.prepare_rules('chain', rules, 4),
|
|
|
- expected_iptables)
|
|
|
+ ret = self.obj.prepare_rules('chain', rules, 4)
|
|
|
+ self.assertEqual(ret[0], expected_iptables)
|
|
|
+ self.assertPrepareRulesDnsRet(ret[1], 'yum.qubes-os.org', 4)
|
|
|
with self.assertRaises(qubesagent.firewall.RuleParseError):
|
|
|
self.obj.prepare_rules('chain', [{'unknown': 'xxx'}], 4)
|
|
|
with self.assertRaises(qubesagent.firewall.RuleParseError):
|
|
@@ -250,8 +286,9 @@ class TestIptablesWorker(TestCase):
|
|
|
"--reject-with icmp6-adm-prohibited\n"
|
|
|
"COMMIT\n"
|
|
|
)
|
|
|
- self.assertEqual(self.obj.prepare_rules('chain', rules, 6),
|
|
|
- expected_iptables)
|
|
|
+ ret = self.obj.prepare_rules('chain', rules, 6)
|
|
|
+ self.assertEqual(ret[0], expected_iptables)
|
|
|
+ self.assertPrepareRulesDnsRet(ret[1], 'ripe.net', 6)
|
|
|
|
|
|
def test_004_apply_rules4(self):
|
|
|
rules = [{'action': 'accept'}]
|
|
@@ -263,7 +300,7 @@ class TestIptablesWorker(TestCase):
|
|
|
['-I', 'QBS-FORWARD', '-s', '10.137.0.1', '-j', chain],
|
|
|
['-F', chain]])
|
|
|
self.assertEqual(self.obj.loaded_iptables[4],
|
|
|
- self.obj.prepare_rules(chain, rules, 4))
|
|
|
+ self.obj.prepare_rules(chain, rules, 4)[0])
|
|
|
self.assertEqual(self.obj.called_commands[6], [])
|
|
|
self.assertIsNone(self.obj.loaded_iptables[6])
|
|
|
|
|
@@ -277,7 +314,7 @@ class TestIptablesWorker(TestCase):
|
|
|
['-I', 'QBS-FORWARD', '-s', '2000::a', '-j', chain],
|
|
|
['-F', chain]])
|
|
|
self.assertEqual(self.obj.loaded_iptables[6],
|
|
|
- self.obj.prepare_rules(chain, rules, 6))
|
|
|
+ self.obj.prepare_rules(chain, rules, 6)[0])
|
|
|
self.assertEqual(self.obj.called_commands[4], [])
|
|
|
self.assertIsNone(self.obj.loaded_iptables[4])
|
|
|
|
|
@@ -372,8 +409,7 @@ class TestIptablesWorker(TestCase):
|
|
|
['-t', 'mangle', '-F', 'QBS-POSTROUTING'],
|
|
|
])
|
|
|
|
|
|
-
|
|
|
-class TestNftablesWorker(TestCase):
|
|
|
+class TestNftablesWorker(TestCase, WorkerCommon):
|
|
|
def setUp(self):
|
|
|
super(TestNftablesWorker, self).setUp()
|
|
|
self.obj = NftablesWorker()
|
|
@@ -440,8 +476,9 @@ class TestNftablesWorker(TestCase):
|
|
|
' }\n'
|
|
|
'}\n'
|
|
|
)
|
|
|
- self.assertEqual(self.obj.prepare_rules('chain', rules, 4),
|
|
|
- expected_nft)
|
|
|
+ ret = self.obj.prepare_rules('chain', rules, 4)
|
|
|
+ self.assertEqual(ret[0], expected_nft)
|
|
|
+ self.assertPrepareRulesDnsRet(ret[1], 'yum.qubes-os.org', 4)
|
|
|
with self.assertRaises(qubesagent.firewall.RuleParseError):
|
|
|
self.obj.prepare_rules('chain', [{'unknown': 'xxx'}], 4)
|
|
|
with self.assertRaises(qubesagent.firewall.RuleParseError):
|
|
@@ -477,8 +514,9 @@ class TestNftablesWorker(TestCase):
|
|
|
' }\n'
|
|
|
'}\n'
|
|
|
)
|
|
|
- self.assertEqual(self.obj.prepare_rules('chain', rules, 6),
|
|
|
- expected_nft)
|
|
|
+ ret = self.obj.prepare_rules('chain', rules, 6)
|
|
|
+ self.assertEqual(ret[0], expected_nft)
|
|
|
+ self.assertPrepareRulesDnsRet(ret[1], 'ripe.net', 6)
|
|
|
|
|
|
def test_004_apply_rules4(self):
|
|
|
rules = [{'action': 'accept'}]
|
|
@@ -486,7 +524,7 @@ class TestNftablesWorker(TestCase):
|
|
|
self.obj.apply_rules('10.137.0.1', rules)
|
|
|
self.assertEqual(self.obj.loaded_rules,
|
|
|
[self.expected_create_chain('ip', '10.137.0.1', chain),
|
|
|
- self.obj.prepare_rules(chain, rules, 4),
|
|
|
+ self.obj.prepare_rules(chain, rules, 4)[0],
|
|
|
])
|
|
|
|
|
|
def test_005_apply_rules6(self):
|
|
@@ -495,7 +533,7 @@ class TestNftablesWorker(TestCase):
|
|
|
self.obj.apply_rules('2000::a', rules)
|
|
|
self.assertEqual(self.obj.loaded_rules,
|
|
|
[self.expected_create_chain('ip6', '2000::a', chain),
|
|
|
- self.obj.prepare_rules(chain, rules, 6),
|
|
|
+ self.obj.prepare_rules(chain, rules, 6)[0],
|
|
|
])
|
|
|
|
|
|
def test_006_init(self):
|
|
@@ -647,11 +685,17 @@ class TestFirewallWorker(TestCase):
|
|
|
def test_handle_addr(self):
|
|
|
self.obj.handle_addr('10.137.0.2')
|
|
|
self.assertEqual(self.obj.rules['10.137.0.2'], [{'action': 'accept'}])
|
|
|
+ self.assertEqual(self.obj.qdb.entries['/qubes-firewall-handled/10.137.0.2'], '1')
|
|
|
+ self.obj.handle_addr('10.137.0.2')
|
|
|
+ self.assertEqual(self.obj.rules['10.137.0.2'], [{'action': 'accept'}])
|
|
|
+ self.assertEqual(self.obj.qdb.entries['/qubes-firewall-handled/10.137.0.2'], '2')
|
|
|
# fallback to block all
|
|
|
self.obj.handle_addr('10.137.0.3')
|
|
|
self.assertEqual(self.obj.rules['10.137.0.3'], [{'action': 'drop'}])
|
|
|
+ self.assertEqual(self.obj.qdb.entries['/qubes-firewall-handled/10.137.0.3'], '1')
|
|
|
self.obj.handle_addr('10.137.0.4')
|
|
|
self.assertEqual(self.obj.rules['10.137.0.4'], [{'action': 'drop'}])
|
|
|
+ self.assertEqual(self.obj.qdb.entries['/qubes-firewall-handled/10.137.0.4'], '1')
|
|
|
|
|
|
@patch('os.path.isfile')
|
|
|
@patch('os.access')
|