123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607 |
- #
- # The Qubes OS Project, https://www.qubes-os.org/
- #
- # Copyright (C) 2016
- # Marek Marczykowski-Górecki <marmarek@invisiblethingslab.com>
- #
- # This library is free software; you can redistribute it and/or
- # modify it under the terms of the GNU Lesser General Public
- # License as published by the Free Software Foundation; either
- # version 2.1 of the License, or (at your option) any later version.
- #
- # This library is distributed in the hope that it will be useful,
- # but WITHOUT ANY WARRANTY; without even the implied warranty of
- # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- # Lesser General Public License for more details.
- #
- # You should have received a copy of the GNU Lesser General Public
- # License along with this library; if not, see <https://www.gnu.org/licenses/>.
- #
- import datetime
- import os
- import asyncio
- import lxml.etree
- import unittest
- import qubes.firewall
- import qubes.tests
- class TestOption(qubes.firewall.RuleChoice):
- opt1 = 'opt1'
- opt2 = 'opt2'
- another = 'another'
- class TestVMM(object):
- def __init__(self):
- self.offline_mode = True
- class TestApp(object):
- def __init__(self):
- self.vmm = TestVMM()
- class TestVM(object):
- def __init__(self):
- self.firewall_conf = 'test-firewall.xml'
- self.dir_path = '/tmp'
- self.app = TestApp()
- def fire_event(self, event):
- pass
- # noinspection PyPep8Naming
- class TC_00_RuleChoice(qubes.tests.QubesTestCase):
- def test_000_accept_allowed(self):
- with self.assertNotRaises(ValueError):
- TestOption('opt1')
- TestOption('opt2')
- TestOption('another')
- def test_001_value_list(self):
- instance = TestOption('opt1')
- self.assertEqual(
- set(instance.allowed_values), {'opt1', 'opt2', 'another'})
- def test_010_reject_others(self):
- self.assertRaises(ValueError, lambda: TestOption('invalid'))
- class TC_01_Action(qubes.tests.QubesTestCase):
- def test_000_allowed_values(self):
- with self.assertNotRaises(ValueError):
- instance = qubes.firewall.Action('accept')
- self.assertEqual(
- set(instance.allowed_values), {'accept', 'drop'})
- def test_001_rule(self):
- instance = qubes.firewall.Action('accept')
- self.assertEqual(instance.rule, 'action=accept')
- self.assertEqual(instance.api_rule, 'action=accept')
- # noinspection PyPep8Naming
- class TC_02_Proto(qubes.tests.QubesTestCase):
- def test_000_allowed_values(self):
- with self.assertNotRaises(ValueError):
- instance = qubes.firewall.Proto('tcp')
- self.assertEqual(
- set(instance.allowed_values), {'tcp', 'udp', 'icmp'})
- def test_001_rule(self):
- instance = qubes.firewall.Proto('tcp')
- self.assertEqual(instance.rule, 'proto=tcp')
- self.assertEqual(instance.api_rule, 'proto=tcp')
- # noinspection PyPep8Naming
- class TC_02_DstHost(qubes.tests.QubesTestCase):
- def test_000_hostname(self):
- with self.assertNotRaises(ValueError):
- instance = qubes.firewall.DstHost('qubes-os.org')
- self.assertEqual(instance.type, 'dsthost')
- def test_001_ipv4(self):
- with self.assertNotRaises(ValueError):
- instance = qubes.firewall.DstHost('127.0.0.1')
- self.assertEqual(instance.type, 'dst4')
- self.assertEqual(instance.prefixlen, 32)
- self.assertEqual(str(instance), '127.0.0.1/32')
- self.assertEqual(instance.rule, 'dst4=127.0.0.1/32')
- def test_002_ipv4_prefixlen(self):
- with self.assertNotRaises(ValueError):
- instance = qubes.firewall.DstHost('127.0.0.0', 8)
- self.assertEqual(instance.type, 'dst4')
- self.assertEqual(instance.prefixlen, 8)
- self.assertEqual(str(instance), '127.0.0.0/8')
- self.assertEqual(instance.rule, 'dst4=127.0.0.0/8')
- def test_003_ipv4_parse_prefixlen(self):
- with self.assertNotRaises(ValueError):
- instance = qubes.firewall.DstHost('127.0.0.0/8')
- self.assertEqual(instance.type, 'dst4')
- self.assertEqual(instance.prefixlen, 8)
- self.assertEqual(str(instance), '127.0.0.0/8')
- self.assertEqual(instance.rule, 'dst4=127.0.0.0/8')
- def test_004_ipv4_invalid_prefix(self):
- with self.assertRaises(ValueError):
- qubes.firewall.DstHost('127.0.0.0/33')
- with self.assertRaises(ValueError):
- qubes.firewall.DstHost('127.0.0.0', 33)
- with self.assertRaises(ValueError):
- qubes.firewall.DstHost('127.0.0.0/-1')
- def test_005_ipv4_reject_shortened(self):
- # not strictly required, but ppl are used to it
- with self.assertRaises(ValueError):
- qubes.firewall.DstHost('127/8')
- def test_006_ipv4_invalid_addr(self):
- with self.assertRaises(ValueError):
- qubes.firewall.DstHost('137.327.0.0/16')
- with self.assertRaises(ValueError):
- qubes.firewall.DstHost('1.2.3.4.5/32')
- @unittest.expectedFailure
- def test_007_ipv4_invalid_network(self):
- with self.assertRaises(ValueError):
- qubes.firewall.DstHost('127.0.0.1/32')
- def test_010_ipv6(self):
- with self.assertNotRaises(ValueError):
- instance = qubes.firewall.DstHost('2001:abcd:efab::3')
- self.assertEqual(instance.type, 'dst6')
- self.assertEqual(instance.prefixlen, 128)
- self.assertEqual(str(instance), '2001:abcd:efab::3/128')
- self.assertEqual(instance.rule, 'dst6=2001:abcd:efab::3/128')
- self.assertEqual(instance.api_rule, 'dst6=2001:abcd:efab::3/128')
- def test_011_ipv6_prefixlen(self):
- with self.assertNotRaises(ValueError):
- instance = qubes.firewall.DstHost('2001:abcd:efab::', 64)
- self.assertEqual(instance.type, 'dst6')
- self.assertEqual(instance.prefixlen, 64)
- self.assertEqual(str(instance), '2001:abcd:efab::/64')
- self.assertEqual(instance.rule, 'dst6=2001:abcd:efab::/64')
- self.assertEqual(instance.api_rule, 'dst6=2001:abcd:efab::/64')
- def test_012_ipv6_parse_prefixlen(self):
- with self.assertNotRaises(ValueError):
- instance = qubes.firewall.DstHost('2001:abcd:efab::/64')
- self.assertEqual(instance.type, 'dst6')
- self.assertEqual(instance.prefixlen, 64)
- self.assertEqual(str(instance), '2001:abcd:efab::/64')
- self.assertEqual(instance.rule, 'dst6=2001:abcd:efab::/64')
- self.assertEqual(instance.api_rule, 'dst6=2001:abcd:efab::/64')
- def test_013_ipv6_invalid_prefix(self):
- with self.assertRaises(ValueError):
- qubes.firewall.DstHost('2001:abcd:efab::3/129')
- with self.assertRaises(ValueError):
- qubes.firewall.DstHost('2001:abcd:efab::3', 129)
- with self.assertRaises(ValueError):
- qubes.firewall.DstHost('2001:abcd:efab::3/-1')
- def test_014_ipv6_invalid_addr(self):
- with self.assertRaises(ValueError):
- qubes.firewall.DstHost('2001:abcd:efab0123::3/128')
- with self.assertRaises(ValueError):
- qubes.firewall.DstHost('2001:abcd:efab:3/128')
- with self.assertRaises(ValueError):
- qubes.firewall.DstHost('2001:abcd:efab:a:a:a:a:a:a:3/128')
- with self.assertRaises(ValueError):
- qubes.firewall.DstHost('2001:abcd:efgh::3/128')
- @unittest.expectedFailure
- def test_015_ipv6_invalid_network(self):
- with self.assertRaises(ValueError):
- qubes.firewall.DstHost('2001:abcd:efab::3/64')
- def test_020_invalid_hostname(self):
- with self.assertRaises(ValueError):
- qubes.firewall.DstHost('www qubes-os.org')
- with self.assertRaises(ValueError):
- qubes.firewall.DstHost('https://qubes-os.org')
- class TC_03_DstPorts(qubes.tests.QubesTestCase):
- def test_000_single_str(self):
- with self.assertNotRaises(ValueError):
- instance = qubes.firewall.DstPorts('80')
- self.assertEqual(str(instance), '80')
- self.assertEqual(instance.range, [80, 80])
- self.assertEqual(instance.rule, 'dstports=80-80')
- self.assertEqual(instance.api_rule, 'dstports=80-80')
- def test_001_single_int(self):
- with self.assertNotRaises(ValueError):
- instance = qubes.firewall.DstPorts(80)
- self.assertEqual(str(instance), '80')
- self.assertEqual(instance.range, [80, 80])
- self.assertEqual(instance.rule, 'dstports=80-80')
- self.assertEqual(instance.api_rule, 'dstports=80-80')
- def test_002_range(self):
- with self.assertNotRaises(ValueError):
- instance = qubes.firewall.DstPorts('80-90')
- self.assertEqual(str(instance), '80-90')
- self.assertEqual(instance.range, [80, 90])
- self.assertEqual(instance.rule, 'dstports=80-90')
- def test_003_invalid(self):
- with self.assertRaises(ValueError):
- qubes.firewall.DstPorts('80-90-100')
- with self.assertRaises(ValueError):
- qubes.firewall.DstPorts('abcdef')
- with self.assertRaises(ValueError):
- qubes.firewall.DstPorts('80 90')
- with self.assertRaises(ValueError):
- qubes.firewall.DstPorts('')
- def test_004_reversed_range(self):
- with self.assertRaises(ValueError):
- qubes.firewall.DstPorts('100-20')
- def test_005_out_of_range(self):
- with self.assertRaises(ValueError):
- qubes.firewall.DstPorts('1000000000000')
- with self.assertRaises(ValueError):
- qubes.firewall.DstPorts(1000000000000)
- with self.assertRaises(ValueError):
- qubes.firewall.DstPorts('1-1000000000000')
- class TC_04_IcmpType(qubes.tests.QubesTestCase):
- def test_000_number(self):
- with self.assertNotRaises(ValueError):
- instance = qubes.firewall.IcmpType(8)
- self.assertEqual(str(instance), '8')
- self.assertEqual(instance.rule, 'icmptype=8')
- def test_001_str(self):
- with self.assertNotRaises(ValueError):
- instance = qubes.firewall.IcmpType('8')
- self.assertEqual(str(instance), '8')
- self.assertEqual(instance.rule, 'icmptype=8')
- self.assertEqual(instance.api_rule, 'icmptype=8')
- def test_002_invalid(self):
- with self.assertRaises(ValueError):
- qubes.firewall.IcmpType(600)
- with self.assertRaises(ValueError):
- qubes.firewall.IcmpType(-1)
- with self.assertRaises(ValueError):
- qubes.firewall.IcmpType('abcde')
- with self.assertRaises(ValueError):
- qubes.firewall.IcmpType('')
- class TC_05_SpecialTarget(qubes.tests.QubesTestCase):
- def test_000_allowed_values(self):
- with self.assertNotRaises(ValueError):
- instance = qubes.firewall.SpecialTarget('dns')
- self.assertEqual(
- set(instance.allowed_values), {'dns'})
- def test_001_rule(self):
- instance = qubes.firewall.SpecialTarget('dns')
- self.assertEqual(instance.rule, 'specialtarget=dns')
- self.assertEqual(instance.api_rule, 'specialtarget=dns')
- class TC_06_Expire(qubes.tests.QubesTestCase):
- def test_000_number(self):
- with self.assertNotRaises(ValueError):
- instance = qubes.firewall.Expire(1463292452)
- self.assertEqual(str(instance), '1463292452')
- self.assertEqual(instance.api_rule, 'expire=1463292452')
- self.assertEqual(instance.datetime,
- datetime.datetime.fromtimestamp(1463292452))
- self.assertIsNone(instance.rule)
- def test_001_str(self):
- with self.assertNotRaises(ValueError):
- instance = qubes.firewall.Expire('1463292452')
- self.assertEqual(str(instance), '1463292452')
- self.assertEqual(instance.datetime,
- datetime.datetime.fromtimestamp(1463292452))
- self.assertIsNone(instance.rule)
- def test_002_invalid(self):
- with self.assertRaises(ValueError):
- qubes.firewall.Expire('abcdef')
- with self.assertRaises(ValueError):
- qubes.firewall.Expire('')
- def test_003_expired(self):
- with self.assertNotRaises(ValueError):
- instance = qubes.firewall.Expire('1463292452')
- self.assertTrue(instance.expired)
- with self.assertNotRaises(ValueError):
- instance = qubes.firewall.Expire('2583292452')
- self.assertFalse(instance.expired)
- class TC_07_Comment(qubes.tests.QubesTestCase):
- def test_000_str(self):
- with self.assertNotRaises(ValueError):
- instance = qubes.firewall.Comment('Some comment')
- self.assertEqual(str(instance), 'Some comment')
- self.assertEqual(instance.api_rule, 'comment=Some comment')
- self.assertIsNone(instance.rule)
- class TC_08_Rule(qubes.tests.QubesTestCase):
- def test_000_simple(self):
- with self.assertNotRaises(ValueError):
- rule = qubes.firewall.Rule(None, action='accept', proto='icmp')
- self.assertEqual(rule.rule, 'action=accept proto=icmp')
- self.assertIsNone(rule.dsthost)
- self.assertIsNone(rule.dstports)
- self.assertIsNone(rule.icmptype)
- self.assertIsNone(rule.comment)
- self.assertIsNone(rule.expire)
- self.assertEqual(str(rule.action), 'accept')
- self.assertEqual(str(rule.proto), 'icmp')
- def test_001_expire(self):
- with self.assertNotRaises(ValueError):
- rule = qubes.firewall.Rule(None, action='accept', proto='icmp',
- expire='1463292452')
- self.assertIsNone(rule.rule)
- with self.assertNotRaises(ValueError):
- rule = qubes.firewall.Rule(None, action='accept', proto='icmp',
- expire='1663292452')
- self.assertIsNotNone(rule.rule)
- def test_002_dstports(self):
- with self.assertNotRaises(ValueError):
- rule = qubes.firewall.Rule(None, action='accept', proto='tcp',
- dstports=80)
- self.assertEqual(str(rule.dstports), '80')
- with self.assertNotRaises(ValueError):
- rule = qubes.firewall.Rule(None, action='accept', proto='udp',
- dstports=80)
- self.assertEqual(str(rule.dstports), '80')
- def test_003_reject_invalid(self):
- with self.assertRaises((ValueError, AssertionError)):
- # missing action
- qubes.firewall.Rule(None, proto='icmp')
- with self.assertRaises(ValueError):
- # not proto=tcp or proto=udp for dstports
- qubes.firewall.Rule(None, action='accept', proto='icmp',
- dstports=80)
- with self.assertRaises(ValueError):
- # not proto=tcp or proto=udp for dstports
- qubes.firewall.Rule(None, action='accept', dstports=80)
- with self.assertRaises(ValueError):
- # not proto=icmp for icmptype
- qubes.firewall.Rule(None, action='accept', proto='tcp',
- icmptype=8)
- with self.assertRaises(ValueError):
- # not proto=icmp for icmptype
- qubes.firewall.Rule(None, action='accept', icmptype=8)
- def test_004_proto_change(self):
- rule = qubes.firewall.Rule(None, action='accept', proto='tcp')
- with self.assertNotRaises(ValueError):
- rule.proto = 'udp'
- self.assertEqual(rule.rule, 'action=accept proto=udp')
- rule = qubes.firewall.Rule(None, action='accept', proto='tcp',
- dstports=80)
- with self.assertNotRaises(ValueError):
- rule.proto = 'udp'
- self.assertEqual(rule.rule, 'action=accept proto=udp dstports=80-80')
- rule = qubes.firewall.Rule(None, action='accept')
- with self.assertNotRaises(ValueError):
- rule.proto = 'udp'
- self.assertEqual(rule.rule, 'action=accept proto=udp')
- with self.assertNotRaises(ValueError):
- rule.dstports = 80
- self.assertEqual(rule.rule, 'action=accept proto=udp dstports=80-80')
- with self.assertNotRaises(ValueError):
- rule.proto = 'icmp'
- self.assertEqual(rule.rule, 'action=accept proto=icmp')
- self.assertIsNone(rule.dstports)
- rule.icmptype = 8
- self.assertEqual(rule.rule, 'action=accept proto=icmp icmptype=8')
- with self.assertNotRaises(ValueError):
- rule.proto = qubes.property.DEFAULT
- self.assertEqual(rule.rule, 'action=accept')
- self.assertIsNone(rule.dstports)
- def test_005_from_xml_v1(self):
- xml_txt = \
- '<rule address="192.168.0.0" proto="tcp" netmask="24" port="443"/>'
- with self.assertNotRaises(ValueError):
- rule = qubes.firewall.Rule.from_xml_v1(
- lxml.etree.fromstring(xml_txt), 'accept')
- self.assertEqual(rule.dsthost, '192.168.0.0/24')
- self.assertEqual(rule.proto, 'tcp')
- self.assertEqual(rule.dstports, '443')
- self.assertIsNone(rule.expire)
- self.assertIsNone(rule.comment)
- def test_006_from_xml_v1(self):
- xml_txt = \
- '<rule address="qubes-os.org" proto="tcp" ' \
- 'port="443" toport="1024"/>'
- with self.assertNotRaises(ValueError):
- rule = qubes.firewall.Rule.from_xml_v1(
- lxml.etree.fromstring(xml_txt), 'drop')
- self.assertEqual(rule.dsthost, 'qubes-os.org')
- self.assertEqual(rule.proto, 'tcp')
- self.assertEqual(rule.dstports, '443-1024')
- self.assertEqual(rule.action, 'drop')
- self.assertIsNone(rule.expire)
- self.assertIsNone(rule.comment)
- def test_007_from_xml_v1(self):
- xml_txt = \
- '<rule address="192.168.0.0" netmask="24" expire="1463292452"/>'
- with self.assertNotRaises(ValueError):
- rule = qubes.firewall.Rule.from_xml_v1(
- lxml.etree.fromstring(xml_txt), 'accept')
- self.assertEqual(rule.dsthost, '192.168.0.0/24')
- self.assertEqual(rule.expire, '1463292452')
- self.assertEqual(rule.action, 'accept')
- self.assertIsNone(rule.proto)
- self.assertIsNone(rule.dstports)
- def test_008_from_api_string(self):
- rule_txt = 'action=drop proto=tcp dstports=80-80'
- with self.assertNotRaises(ValueError):
- rule = qubes.firewall.Rule.from_api_string(
- rule_txt)
- self.assertEqual(rule.dstports.range, [80, 80])
- self.assertEqual(rule.proto, 'tcp')
- self.assertEqual(rule.action, 'drop')
- self.assertIsNone(rule.dsthost)
- self.assertIsNone(rule.expire)
- self.assertIsNone(rule.comment)
- self.assertEqual(rule.api_rule, rule_txt)
- def test_009_from_api_string(self):
- rule_txt = 'action=accept expire=2063292452 proto=tcp ' \
- 'comment=Some comment, with spaces'
- with self.assertNotRaises(ValueError):
- rule = qubes.firewall.Rule.from_api_string(
- rule_txt)
- self.assertEqual(rule.comment, 'Some comment, with spaces')
- self.assertEqual(rule.proto, 'tcp')
- self.assertEqual(rule.action, 'accept')
- self.assertEqual(rule.expire, '2063292452')
- self.assertIsNone(rule.dstports)
- self.assertIsNone(rule.dsthost)
- self.assertEqual(rule.api_rule, rule_txt)
- class TC_10_Firewall(qubes.tests.QubesTestCase):
- def setUp(self):
- super(TC_10_Firewall, self).setUp()
- self.vm = TestVM()
- firewall_path = os.path.join('/tmp', self.vm.firewall_conf)
- if os.path.exists(firewall_path):
- os.unlink(firewall_path)
- def tearDown(self):
- firewall_path = os.path.join('/tmp', self.vm.firewall_conf)
- if os.path.exists(firewall_path):
- os.unlink(firewall_path)
- return super(TC_10_Firewall, self).tearDown()
- def test_000_defaults(self):
- fw = qubes.firewall.Firewall(self.vm, False)
- fw.load_defaults()
- self.assertEqual(fw.policy, 'drop')
- self.assertEqual(fw.rules, [qubes.firewall.Rule(None, action='accept')])
- def test_001_save_load_empty(self):
- fw = qubes.firewall.Firewall(self.vm, True)
- self.assertEqual(fw.policy, 'drop')
- self.assertEqual(fw.rules, [qubes.firewall.Rule(None, action='accept')])
- fw.save()
- fw.load()
- self.assertEqual(fw.policy, 'drop')
- self.assertEqual(fw.rules, [qubes.firewall.Rule(None, action='accept')])
- def test_002_save_load_rules(self):
- fw = qubes.firewall.Firewall(self.vm, True)
- rules = [
- qubes.firewall.Rule(None, action='drop', proto='icmp'),
- qubes.firewall.Rule(None, action='drop', proto='tcp', dstports=80),
- qubes.firewall.Rule(None, action='accept', proto='udp',
- dstports=67),
- qubes.firewall.Rule(None, action='accept', specialtarget='dns'),
- ]
- fw.rules.extend(rules)
- fw.save()
- self.assertTrue(os.path.exists(os.path.join(
- self.vm.dir_path, self.vm.firewall_conf)))
- fw = qubes.firewall.Firewall(TestVM(), True)
- self.assertEqual(fw.policy, qubes.firewall.Action.drop)
- self.assertEqual(fw.rules,
- [qubes.firewall.Rule(None, action='accept')] + rules)
- def test_003_load_v1(self):
- xml_txt = """<QubesFirewallRules dns="allow" icmp="allow"
- policy="deny" yumProxy="allow">
- <rule address="192.168.0.0" proto="tcp" netmask="24" port="80"/>
- <rule address="qubes-os.org" proto="tcp" port="443"/>
- </QubesFirewallRules>
- """
- with open(os.path.join('/tmp', self.vm.firewall_conf), 'w') as f:
- f.write(xml_txt)
- with self.assertNotRaises(ValueError):
- fw = qubes.firewall.Firewall(self.vm)
- self.assertEqual(str(fw.policy), 'drop')
- rules = [
- qubes.firewall.Rule(None, action='accept', specialtarget='dns'),
- qubes.firewall.Rule(None, action='accept', proto='icmp'),
- qubes.firewall.Rule(None, action='accept', proto='tcp',
- dsthost='192.168.0.0/24', dstports='80'),
- qubes.firewall.Rule(None, action='accept', proto='tcp',
- dsthost='qubes-os.org', dstports='443')
- ]
- self.assertEqual(fw.rules, rules)
- def test_004_save_skip_expired(self):
- fw = qubes.firewall.Firewall(self.vm, True)
- rules = [
- qubes.firewall.Rule(None, action='drop', proto='icmp'),
- qubes.firewall.Rule(None, action='drop', proto='tcp', dstports=80),
- qubes.firewall.Rule(None, action='accept', proto='udp',
- dstports=67, expire=1373300257),
- qubes.firewall.Rule(None, action='accept', specialtarget='dns'),
- ]
- fw.rules = rules
- fw.save()
- rules.pop(2)
- fw = qubes.firewall.Firewall(self.vm, True)
- self.assertEqual(fw.rules, rules)
- def test_005_qdb_entries(self):
- fw = qubes.firewall.Firewall(self.vm, True)
- rules = [
- qubes.firewall.Rule(None, action='drop', proto='icmp'),
- qubes.firewall.Rule(None, action='drop', proto='tcp', dstports=80),
- qubes.firewall.Rule(None, action='accept', proto='udp'),
- qubes.firewall.Rule(None, action='accept', specialtarget='dns'),
- ]
- fw.rules = rules
- expected_qdb_entries = {
- 'policy': 'drop',
- '0000': 'action=drop proto=icmp',
- '0001': 'action=drop proto=tcp dstports=80-80',
- '0002': 'action=accept proto=udp',
- '0003': 'action=accept specialtarget=dns',
- }
- self.assertEqual(fw.qdb_entries(), expected_qdb_entries)
- def test_006_auto_expire_rules(self):
- fw = qubes.firewall.Firewall(self.vm, True)
- rules = [
- qubes.firewall.Rule(None, action='drop', proto='icmp'),
- qubes.firewall.Rule(None, action='drop', proto='tcp', dstports=80),
- qubes.firewall.Rule(None, action='accept', proto='udp',
- dstports=67, expire=self.loop.time() + 5),
- qubes.firewall.Rule(None, action='accept', specialtarget='dns'),
- ]
- fw.rules = rules
- fw.save()
- self.assertEqual(fw.rules, rules)
- self.loop.run_until_complete(asyncio.sleep(3))
- # still old rules should be there
- self.assertEqual(fw.rules, rules)
- rules.pop(2)
- self.loop.run_until_complete(asyncio.sleep(3))
- # expect new rules
- self.assertEqual(fw.rules, rules)
|