0200fdadcb
In the end, the firewall is implemented as .Get and .Set rules, with the policy statically set to 'drop'. This allows atomic firewall updates. Since we already have appropriate firewall format handling in the qubes.firewall module, reuse it from there, but adjust the code to be prepared for potentially malicious input, and mark such variables with the untrusted_ prefix. There is also a third method, .Reload, which causes a firewall reload without making any change. QubesOS/qubes-issues#2622 Fixes QubesOS/qubes-issues#2869
588 lines
23 KiB
Python
588 lines
23 KiB
Python
#
|
|
# The Qubes OS Project, https://www.qubes-os.org/
|
|
#
|
|
# Copyright (C) 2016
|
|
# Marek Marczykowski-Górecki <marmarek@invisiblethingslab.com>
|
|
#
|
|
# This program is free software; you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation; either version 2 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License along
|
|
# with this program; if not, write to the Free Software Foundation, Inc.,
|
|
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
|
|
#
|
|
|
|
import datetime
import os
import time
import unittest

import lxml.etree

import qubes.firewall
import qubes.tests
|
|
|
|
|
|
class TestOption(qubes.firewall.RuleChoice):
    """RuleChoice fixture with three options, used by TC_00_RuleChoice."""

    # TC_00_RuleChoice expects exactly these three values to be accepted
    # and to show up in ``allowed_values``.
    opt1 = 'opt1'
    opt2 = 'opt2'
    another = 'another'
|
|
|
|
class TestVMM(object):
    """Stand-in for the ``vmm`` object; exposes only ``offline_mode``."""

    def __init__(self):
        # The fixture always reports offline mode.
        self.offline_mode = True
|
|
|
|
|
|
class TestApp(object):
    """Stand-in for the application object; carries only a ``vmm``."""

    def __init__(self):
        # Each fake app owns its own TestVMM instance.
        self.vmm = TestVMM()
|
|
|
|
|
|
class TestVM(object):
    """Fake VM handed to qubes.firewall.Firewall in TC_10_Firewall.

    Provides the attributes read in this file: ``app``, ``dir_path`` and
    ``firewall_conf`` (the latter two locate the firewall XML file).
    """

    def __init__(self):
        self.app = TestApp()
        self.dir_path = '/tmp'
        self.firewall_conf = 'test-firewall.xml'

    def fire_event(self, event):
        # Events are accepted and silently discarded.
        return None
|
|
|
|
# noinspection PyPep8Naming
|
|
class TC_00_RuleChoice(qubes.tests.QubesTestCase):
    """Value validation of RuleChoice, exercised through TestOption."""

    def test_000_accept_allowed(self):
        # every declared option must be constructible without ValueError
        with self.assertNotRaises(ValueError):
            for allowed in ('opt1', 'opt2', 'another'):
                TestOption(allowed)

    def test_001_value_list(self):
        # allowed_values lists the declared options (order is not checked)
        option = TestOption('opt1')
        expected = {'opt1', 'opt2', 'another'}
        self.assertEqual(set(option.allowed_values), expected)

    def test_010_reject_others(self):
        # a value that is not declared on the class is refused
        with self.assertRaises(ValueError):
            TestOption('invalid')
|
|
|
|
|
|
class TC_01_Action(qubes.tests.QubesTestCase):
    """Tests for qubes.firewall.Action."""

    def test_000_allowed_values(self):
        # 'accept' is valid; the full set of choices is accept/drop
        with self.assertNotRaises(ValueError):
            action = qubes.firewall.Action('accept')
        allowed = set(action.allowed_values)
        self.assertEqual(allowed, {'accept', 'drop'})

    def test_001_rule(self):
        # both serializations of the option use the same text
        action = qubes.firewall.Action('accept')
        expected = 'action=accept'
        self.assertEqual(action.rule, expected)
        self.assertEqual(action.api_rule, expected)
|
|
|
|
|
|
# noinspection PyPep8Naming
|
|
class TC_02_Proto(qubes.tests.QubesTestCase):
    """Tests for qubes.firewall.Proto."""

    def test_000_allowed_values(self):
        # 'tcp' is valid; the full set of choices is tcp/udp/icmp
        with self.assertNotRaises(ValueError):
            proto = qubes.firewall.Proto('tcp')
        allowed = set(proto.allowed_values)
        self.assertEqual(allowed, {'tcp', 'udp', 'icmp'})

    def test_001_rule(self):
        # both serializations of the option use the same text
        proto = qubes.firewall.Proto('tcp')
        expected = 'proto=tcp'
        self.assertEqual(proto.rule, expected)
        self.assertEqual(proto.api_rule, expected)
|
|
|
|
|
|
# noinspection PyPep8Naming
|
|
class TC_02_DstHost(qubes.tests.QubesTestCase):
    """Tests for qubes.firewall.DstHost.

    Covers hostnames as well as IPv4 and IPv6 addresses, with the prefix
    length given either as a separate argument or as an ``addr/len`` suffix.
    """

    def test_000_hostname(self):
        with self.assertNotRaises(ValueError):
            instance = qubes.firewall.DstHost('qubes-os.org')
        self.assertEqual(instance.type, 'dsthost')

    def test_001_ipv4(self):
        # a bare IPv4 address is expected to get the full /32 prefix
        with self.assertNotRaises(ValueError):
            instance = qubes.firewall.DstHost('127.0.0.1')
        self.assertEqual(instance.type, 'dst4')
        self.assertEqual(instance.prefixlen, 32)
        self.assertEqual(str(instance), '127.0.0.1/32')
        self.assertEqual(instance.rule, 'dst4=127.0.0.1/32')

    def test_002_ipv4_prefixlen(self):
        # prefix length passed as a separate argument
        with self.assertNotRaises(ValueError):
            instance = qubes.firewall.DstHost('127.0.0.0', 8)
        self.assertEqual(instance.type, 'dst4')
        self.assertEqual(instance.prefixlen, 8)
        self.assertEqual(str(instance), '127.0.0.0/8')
        self.assertEqual(instance.rule, 'dst4=127.0.0.0/8')

    def test_003_ipv4_parse_prefixlen(self):
        # prefix length parsed out of the 'addr/len' form
        with self.assertNotRaises(ValueError):
            instance = qubes.firewall.DstHost('127.0.0.0/8')
        self.assertEqual(instance.type, 'dst4')
        self.assertEqual(instance.prefixlen, 8)
        self.assertEqual(str(instance), '127.0.0.0/8')
        self.assertEqual(instance.rule, 'dst4=127.0.0.0/8')

    def test_004_ipv4_invalid_prefix(self):
        # IPv4 prefix length must stay within 0..32
        with self.assertRaises(ValueError):
            qubes.firewall.DstHost('127.0.0.0/33')
        with self.assertRaises(ValueError):
            qubes.firewall.DstHost('127.0.0.0', 33)
        with self.assertRaises(ValueError):
            qubes.firewall.DstHost('127.0.0.0/-1')

    def test_005_ipv4_reject_shortened(self):
        # not strictly required, but people are used to it
        with self.assertRaises(ValueError):
            qubes.firewall.DstHost('127/8')

    def test_006_ipv4_invalid_addr(self):
        with self.assertRaises(ValueError):
            qubes.firewall.DstHost('137.327.0.0/16')
        with self.assertRaises(ValueError):
            qubes.firewall.DstHost('1.2.3.4.5/32')

    @unittest.expectedFailure
    def test_007_ipv4_invalid_network(self):
        # An address with host bits set is not a valid /24 network.
        # (A /32 would be a perfectly valid single-host network, so it can
        # never be used to detect when this check gets implemented.)
        with self.assertRaises(ValueError):
            qubes.firewall.DstHost('127.0.0.1/24')

    def test_010_ipv6(self):
        # a bare IPv6 address is expected to get the full /128 prefix
        with self.assertNotRaises(ValueError):
            instance = qubes.firewall.DstHost('2001:abcd:efab::3')
        self.assertEqual(instance.type, 'dst6')
        self.assertEqual(instance.prefixlen, 128)
        self.assertEqual(str(instance), '2001:abcd:efab::3/128')
        self.assertEqual(instance.rule, 'dst6=2001:abcd:efab::3/128')
        self.assertEqual(instance.api_rule, 'dst6=2001:abcd:efab::3/128')

    def test_011_ipv6_prefixlen(self):
        # prefix length passed as a separate argument
        with self.assertNotRaises(ValueError):
            instance = qubes.firewall.DstHost('2001:abcd:efab::', 64)
        self.assertEqual(instance.type, 'dst6')
        self.assertEqual(instance.prefixlen, 64)
        self.assertEqual(str(instance), '2001:abcd:efab::/64')
        self.assertEqual(instance.rule, 'dst6=2001:abcd:efab::/64')
        self.assertEqual(instance.api_rule, 'dst6=2001:abcd:efab::/64')

    def test_012_ipv6_parse_prefixlen(self):
        # prefix length parsed out of the 'addr/len' form
        with self.assertNotRaises(ValueError):
            instance = qubes.firewall.DstHost('2001:abcd:efab::/64')
        self.assertEqual(instance.type, 'dst6')
        self.assertEqual(instance.prefixlen, 64)
        self.assertEqual(str(instance), '2001:abcd:efab::/64')
        self.assertEqual(instance.rule, 'dst6=2001:abcd:efab::/64')
        self.assertEqual(instance.api_rule, 'dst6=2001:abcd:efab::/64')

    def test_013_ipv6_invalid_prefix(self):
        # IPv6 prefix length must stay within 0..128
        with self.assertRaises(ValueError):
            qubes.firewall.DstHost('2001:abcd:efab::3/129')
        with self.assertRaises(ValueError):
            qubes.firewall.DstHost('2001:abcd:efab::3', 129)
        with self.assertRaises(ValueError):
            qubes.firewall.DstHost('2001:abcd:efab::3/-1')

    def test_014_ipv6_invalid_addr(self):
        # group too long
        with self.assertRaises(ValueError):
            qubes.firewall.DstHost('2001:abcd:efab0123::3/128')
        # too few groups and no '::'
        with self.assertRaises(ValueError):
            qubes.firewall.DstHost('2001:abcd:efab:3/128')
        # too many groups
        with self.assertRaises(ValueError):
            qubes.firewall.DstHost('2001:abcd:efab:a:a:a:a:a:a:3/128')
        # non-hexadecimal characters
        with self.assertRaises(ValueError):
            qubes.firewall.DstHost('2001:abcd:efgh::3/128')

    @unittest.expectedFailure
    def test_015_ipv6_invalid_network(self):
        # host bits set for a /64 network
        with self.assertRaises(ValueError):
            qubes.firewall.DstHost('2001:abcd:efab::3/64')

    @unittest.expectedFailure
    def test_020_invalid_hostname(self):
        with self.assertRaises(ValueError):
            qubes.firewall.DstHost('www qubes-os.org')
        with self.assertRaises(ValueError):
            qubes.firewall.DstHost('https://qubes-os.org')
|
|
|
|
class TC_03_DstPorts(qubes.tests.QubesTestCase):
    """Tests for qubes.firewall.DstPorts (single ports and port ranges)."""

    def _make_ports(self, value):
        # Build a DstPorts; a ValueError here fails the test.
        with self.assertNotRaises(ValueError):
            ports = qubes.firewall.DstPorts(value)
        return ports

    def _assert_rejected(self, value):
        # Building a DstPorts from *value* must raise ValueError.
        with self.assertRaises(ValueError):
            qubes.firewall.DstPorts(value)

    def test_000_single_str(self):
        ports = self._make_ports('80')
        self.assertEqual(str(ports), '80')
        self.assertEqual(ports.range, [80, 80])
        self.assertEqual(ports.rule, 'dstports=80-80')
        self.assertEqual(ports.api_rule, 'dstports=80-80')

    def test_001_single_int(self):
        ports = self._make_ports(80)
        self.assertEqual(str(ports), '80')
        self.assertEqual(ports.range, [80, 80])
        self.assertEqual(ports.rule, 'dstports=80-80')
        self.assertEqual(ports.api_rule, 'dstports=80-80')

    def test_002_range(self):
        ports = self._make_ports('80-90')
        self.assertEqual(str(ports), '80-90')
        self.assertEqual(ports.range, [80, 90])
        self.assertEqual(ports.rule, 'dstports=80-90')

    def test_003_invalid(self):
        self._assert_rejected('80-90-100')
        self._assert_rejected('abcdef')
        self._assert_rejected('80 90')
        self._assert_rejected('')

    def test_004_reversed_range(self):
        self._assert_rejected('100-20')

    def test_005_out_of_range(self):
        self._assert_rejected('1000000000000')
        self._assert_rejected(1000000000000)
        self._assert_rejected('1-1000000000000')
|
|
|
|
|
|
class TC_04_IcmpType(qubes.tests.QubesTestCase):
    """Tests for qubes.firewall.IcmpType."""

    def _assert_rejected(self, value):
        # Building an IcmpType from *value* must raise ValueError.
        with self.assertRaises(ValueError):
            qubes.firewall.IcmpType(value)

    def test_000_number(self):
        with self.assertNotRaises(ValueError):
            icmp_type = qubes.firewall.IcmpType(8)
        self.assertEqual(str(icmp_type), '8')
        self.assertEqual(icmp_type.rule, 'icmptype=8')

    def test_001_str(self):
        with self.assertNotRaises(ValueError):
            icmp_type = qubes.firewall.IcmpType('8')
        self.assertEqual(str(icmp_type), '8')
        self.assertEqual(icmp_type.rule, 'icmptype=8')
        self.assertEqual(icmp_type.api_rule, 'icmptype=8')

    def test_002_invalid(self):
        self._assert_rejected(600)
        self._assert_rejected(-1)
        self._assert_rejected('abcde')
        self._assert_rejected('')
|
|
|
|
|
|
class TC_05_SpecialTarget(qubes.tests.QubesTestCase):
    """Tests for qubes.firewall.SpecialTarget."""

    def test_000_allowed_values(self):
        # 'dns' is valid and is the only choice
        with self.assertNotRaises(ValueError):
            target = qubes.firewall.SpecialTarget('dns')
        allowed = set(target.allowed_values)
        self.assertEqual(allowed, {'dns'})

    def test_001_rule(self):
        # both serializations of the option use the same text
        target = qubes.firewall.SpecialTarget('dns')
        expected = 'specialtarget=dns'
        self.assertEqual(target.rule, expected)
        self.assertEqual(target.api_rule, expected)
|
|
|
|
|
|
class TC_06_Expire(qubes.tests.QubesTestCase):
    """Tests for qubes.firewall.Expire (rule expiration timestamps)."""

    @staticmethod
    def _future_timestamp():
        # Epoch seconds one day ahead of the current time.  Computed at
        # run time: a hard-coded "future" value turns into a past one
        # sooner or later and silently breaks the test.
        return int(time.time()) + 24 * 60 * 60

    def test_000_number(self):
        with self.assertNotRaises(ValueError):
            instance = qubes.firewall.Expire(1463292452)
        self.assertEqual(str(instance), '1463292452')
        self.assertEqual(instance.api_rule, 'expire=1463292452')
        self.assertEqual(instance.datetime,
            datetime.datetime(2016, 5, 15, 6, 7, 32))
        # expiration has no plain ``rule`` form, only the api_rule one
        self.assertIsNone(instance.rule)

    def test_001_str(self):
        with self.assertNotRaises(ValueError):
            instance = qubes.firewall.Expire('1463292452')
        self.assertEqual(str(instance), '1463292452')
        self.assertEqual(instance.datetime,
            datetime.datetime(2016, 5, 15, 6, 7, 32))
        self.assertIsNone(instance.rule)

    def test_002_invalid(self):
        with self.assertRaises(ValueError):
            qubes.firewall.Expire('abcdef')
        with self.assertRaises(ValueError):
            qubes.firewall.Expire('')

    def test_003_expired(self):
        # a timestamp from 2016 is in the past
        with self.assertNotRaises(ValueError):
            instance = qubes.firewall.Expire('1463292452')
        self.assertTrue(instance.expired)
        # a timestamp one day ahead of now is not expired yet
        with self.assertNotRaises(ValueError):
            instance = qubes.firewall.Expire(str(self._future_timestamp()))
        self.assertFalse(instance.expired)
|
|
|
|
|
|
class TC_07_Comment(qubes.tests.QubesTestCase):
    """Tests for qubes.firewall.Comment."""

    def test_000_str(self):
        text = 'Some comment'
        with self.assertNotRaises(ValueError):
            comment = qubes.firewall.Comment(text)
        self.assertEqual(str(comment), 'Some comment')
        self.assertEqual(comment.api_rule, 'comment=Some comment')
        # a comment has no plain ``rule`` form, only the api_rule one
        self.assertIsNone(comment.rule)
|
|
|
|
|
|
class TC_08_Rule(qubes.tests.QubesTestCase):
    """Tests for qubes.firewall.Rule.

    Covers construction, option consistency (dstports/icmptype versus
    proto), expiration, and loading from v1 XML and from API strings.
    """

    def test_000_simple(self):
        with self.assertNotRaises(ValueError):
            rule = qubes.firewall.Rule(None, action='accept', proto='icmp')
        self.assertEqual(rule.rule, 'action=accept proto=icmp')
        # options that were not given stay unset
        self.assertIsNone(rule.dsthost)
        self.assertIsNone(rule.dstports)
        self.assertIsNone(rule.icmptype)
        self.assertIsNone(rule.comment)
        self.assertIsNone(rule.expire)
        self.assertEqual(str(rule.action), 'accept')
        self.assertEqual(str(rule.proto), 'icmp')

    def test_001_expire(self):
        # a rule whose expire time (2016) has passed yields no rule text
        with self.assertNotRaises(ValueError):
            rule = qubes.firewall.Rule(None, action='accept', proto='icmp',
                expire='1463292452')
        self.assertIsNone(rule.rule)

        # One day ahead of now, computed at run time: a hard-coded
        # "future" timestamp eventually ends up in the past.
        future_expire = str(int(time.time()) + 24 * 60 * 60)
        with self.assertNotRaises(ValueError):
            rule = qubes.firewall.Rule(None, action='accept', proto='icmp',
                expire=future_expire)
        self.assertIsNotNone(rule.rule)

    def test_002_dstports(self):
        # dstports is accepted together with proto=tcp and proto=udp
        with self.assertNotRaises(ValueError):
            rule = qubes.firewall.Rule(None, action='accept', proto='tcp',
                dstports=80)
        self.assertEqual(str(rule.dstports), '80')
        with self.assertNotRaises(ValueError):
            rule = qubes.firewall.Rule(None, action='accept', proto='udp',
                dstports=80)
        self.assertEqual(str(rule.dstports), '80')

    def test_003_reject_invalid(self):
        with self.assertRaises((ValueError, AssertionError)):
            # missing action
            qubes.firewall.Rule(None, proto='icmp')
        with self.assertRaises(ValueError):
            # not proto=tcp or proto=udp for dstports
            qubes.firewall.Rule(None, action='accept', proto='icmp',
                dstports=80)
        with self.assertRaises(ValueError):
            # not proto=tcp or proto=udp for dstports
            qubes.firewall.Rule(None, action='accept', dstports=80)
        with self.assertRaises(ValueError):
            # not proto=icmp for icmptype
            qubes.firewall.Rule(None, action='accept', proto='tcp',
                icmptype=8)
        with self.assertRaises(ValueError):
            # not proto=icmp for icmptype
            qubes.firewall.Rule(None, action='accept', icmptype=8)

    def test_004_proto_change(self):
        # tcp -> udp without ports
        rule = qubes.firewall.Rule(None, action='accept', proto='tcp')
        with self.assertNotRaises(ValueError):
            rule.proto = 'udp'
        self.assertEqual(rule.rule, 'action=accept proto=udp')

        # tcp -> udp keeps dstports
        rule = qubes.firewall.Rule(None, action='accept', proto='tcp',
            dstports=80)
        with self.assertNotRaises(ValueError):
            rule.proto = 'udp'
        self.assertEqual(rule.rule, 'action=accept proto=udp dstports=80-80')

        # start without proto and change it step by step
        rule = qubes.firewall.Rule(None, action='accept')
        with self.assertNotRaises(ValueError):
            rule.proto = 'udp'
        self.assertEqual(rule.rule, 'action=accept proto=udp')
        with self.assertNotRaises(ValueError):
            rule.dstports = 80
        self.assertEqual(rule.rule, 'action=accept proto=udp dstports=80-80')

        # switching to icmp drops dstports
        with self.assertNotRaises(ValueError):
            rule.proto = 'icmp'
        self.assertEqual(rule.rule, 'action=accept proto=icmp')
        self.assertIsNone(rule.dstports)
        rule.icmptype = 8
        self.assertEqual(rule.rule, 'action=accept proto=icmp icmptype=8')

        # resetting proto to default drops the proto-specific options
        with self.assertNotRaises(ValueError):
            rule.proto = qubes.property.DEFAULT
        self.assertEqual(rule.rule, 'action=accept')
        self.assertIsNone(rule.dstports)

    def test_005_from_xml_v1(self):
        # address + netmask + single port
        xml_txt = \
            '<rule address="192.168.0.0" proto="tcp" netmask="24" port="443"/>'
        with self.assertNotRaises(ValueError):
            rule = qubes.firewall.Rule.from_xml_v1(
                lxml.etree.fromstring(xml_txt), 'accept')
        self.assertEqual(rule.dsthost, '192.168.0.0/24')
        self.assertEqual(rule.proto, 'tcp')
        self.assertEqual(rule.dstports, '443')
        self.assertIsNone(rule.expire)
        self.assertIsNone(rule.comment)

    def test_006_from_xml_v1(self):
        # hostname + port range (port..toport)
        xml_txt = \
            '<rule address="qubes-os.org" proto="tcp" ' \
            'port="443" toport="1024"/>'
        with self.assertNotRaises(ValueError):
            rule = qubes.firewall.Rule.from_xml_v1(
                lxml.etree.fromstring(xml_txt), 'drop')
        self.assertEqual(rule.dsthost, 'qubes-os.org')
        self.assertEqual(rule.proto, 'tcp')
        self.assertEqual(rule.dstports, '443-1024')
        self.assertEqual(rule.action, 'drop')
        self.assertIsNone(rule.expire)
        self.assertIsNone(rule.comment)

    def test_007_from_xml_v1(self):
        # no proto/port, but with an expire attribute
        xml_txt = \
            '<rule address="192.168.0.0" netmask="24" expire="1463292452"/>'
        with self.assertNotRaises(ValueError):
            rule = qubes.firewall.Rule.from_xml_v1(
                lxml.etree.fromstring(xml_txt), 'accept')
        self.assertEqual(rule.dsthost, '192.168.0.0/24')
        self.assertEqual(rule.expire, '1463292452')
        self.assertEqual(rule.action, 'accept')
        self.assertIsNone(rule.proto)
        self.assertIsNone(rule.dstports)

    def test_008_from_api_string(self):
        rule_txt = 'action=drop proto=tcp dstports=80-80'
        with self.assertNotRaises(ValueError):
            rule = qubes.firewall.Rule.from_api_string(
                rule_txt)
        self.assertEqual(rule.dstports.range, [80, 80])
        self.assertEqual(rule.proto, 'tcp')
        self.assertEqual(rule.action, 'drop')
        self.assertIsNone(rule.dsthost)
        self.assertIsNone(rule.expire)
        self.assertIsNone(rule.comment)
        # parsing and serializing again must round-trip
        self.assertEqual(rule.api_rule, rule_txt)

    def test_009_from_api_string(self):
        # the comment is the last option and may contain spaces
        rule_txt = 'action=accept expire=1463292452 proto=tcp ' \
            'comment=Some comment, with spaces'
        with self.assertNotRaises(ValueError):
            rule = qubes.firewall.Rule.from_api_string(
                rule_txt)
        self.assertEqual(rule.comment, 'Some comment, with spaces')
        self.assertEqual(rule.proto, 'tcp')
        self.assertEqual(rule.action, 'accept')
        self.assertEqual(rule.expire, '1463292452')
        self.assertIsNone(rule.dstports)
        self.assertIsNone(rule.dsthost)
        # parsing and serializing again must round-trip
        self.assertEqual(rule.api_rule, rule_txt)
|
|
|
|
|
|
class TC_10_Firewall(qubes.tests.QubesTestCase):
    """Tests for qubes.firewall.Firewall: defaults, save/load, qdb export.

    The firewall file lives at ``<vm.dir_path>/<vm.firewall_conf>`` of the
    TestVM fixture; it is removed before and after every test.
    """

    def _firewall_path(self):
        # Single source of truth for the file location: derived from the
        # VM fixture instead of repeating a hard-coded directory.
        return os.path.join(self.vm.dir_path, self.vm.firewall_conf)

    def _remove_firewall_conf(self):
        # Delete a firewall file left over from this or a previous test.
        firewall_path = self._firewall_path()
        if os.path.exists(firewall_path):
            os.unlink(firewall_path)

    def setUp(self):
        super(TC_10_Firewall, self).setUp()
        self.vm = TestVM()
        self._remove_firewall_conf()

    def tearDown(self):
        self._remove_firewall_conf()
        return super(TC_10_Firewall, self).tearDown()

    def test_000_defaults(self):
        fw = qubes.firewall.Firewall(self.vm, False)
        fw.load_defaults()
        self.assertEqual(fw.policy, 'drop')
        self.assertEqual(fw.rules, [qubes.firewall.Rule(None, action='accept')])

    def test_001_save_load_empty(self):
        # with no file on disk the defaults are in effect ...
        fw = qubes.firewall.Firewall(self.vm, True)
        self.assertEqual(fw.policy, 'drop')
        self.assertEqual(fw.rules, [qubes.firewall.Rule(None, action='accept')])
        # ... and they survive a save/load round trip
        fw.save()
        fw.load()
        self.assertEqual(fw.policy, 'drop')
        self.assertEqual(fw.rules, [qubes.firewall.Rule(None, action='accept')])

    def test_002_save_load_rules(self):
        fw = qubes.firewall.Firewall(self.vm, True)
        rules = [
            qubes.firewall.Rule(None, action='drop', proto='icmp'),
            qubes.firewall.Rule(None, action='drop', proto='tcp', dstports=80),
            qubes.firewall.Rule(None, action='accept', proto='udp',
                dstports=67),
            qubes.firewall.Rule(None, action='accept', specialtarget='dns'),
        ]
        fw.rules.extend(rules)
        fw.save()
        self.assertTrue(os.path.exists(self._firewall_path()))
        # load through a fresh VM object to make sure nothing is cached
        fw = qubes.firewall.Firewall(TestVM(), True)
        self.assertEqual(fw.policy, qubes.firewall.Action.drop)
        self.assertEqual(fw.rules,
            [qubes.firewall.Rule(None, action='accept')] + rules)

    def test_003_load_v1(self):
        xml_txt = """<QubesFirewallRules dns="allow" icmp="allow"
        policy="deny" yumProxy="allow">
            <rule address="192.168.0.0" proto="tcp" netmask="24" port="80"/>
            <rule address="qubes-os.org" proto="tcp" port="443"/>
        </QubesFirewallRules>
        """
        with open(self._firewall_path(), 'w') as f:
            f.write(xml_txt)
        with self.assertNotRaises(ValueError):
            fw = qubes.firewall.Firewall(self.vm)
        self.assertEqual(str(fw.policy), 'drop')
        # dns="allow" and icmp="allow" become explicit accept rules placed
        # before the rules listed in the file
        rules = [
            qubes.firewall.Rule(None, action='accept', specialtarget='dns'),
            qubes.firewall.Rule(None, action='accept', proto='icmp'),
            qubes.firewall.Rule(None, action='accept', proto='tcp',
                dsthost='192.168.0.0/24', dstports='80'),
            qubes.firewall.Rule(None, action='accept', proto='tcp',
                dsthost='qubes-os.org', dstports='443')
        ]
        self.assertEqual(fw.rules, rules)

    def test_004_save_skip_expired(self):
        fw = qubes.firewall.Firewall(self.vm, True)
        rules = [
            qubes.firewall.Rule(None, action='drop', proto='icmp'),
            qubes.firewall.Rule(None, action='drop', proto='tcp', dstports=80),
            # expire time from 2013, i.e. already expired
            qubes.firewall.Rule(None, action='accept', proto='udp',
                dstports=67, expire=1373300257),
            qubes.firewall.Rule(None, action='accept', specialtarget='dns'),
        ]
        fw.rules = rules
        fw.save()
        # the expired rule (index 2) must not come back after reloading
        rules.pop(2)
        fw = qubes.firewall.Firewall(self.vm, True)
        self.assertEqual(fw.rules, rules)

    def test_005_qdb_entries(self):
        fw = qubes.firewall.Firewall(self.vm, True)
        rules = [
            qubes.firewall.Rule(None, action='drop', proto='icmp'),
            qubes.firewall.Rule(None, action='drop', proto='tcp', dstports=80),
            qubes.firewall.Rule(None, action='accept', proto='udp'),
            qubes.firewall.Rule(None, action='accept', specialtarget='dns'),
        ]
        fw.rules = rules
        # one zero-padded, four-digit key per rule, plus the policy
        expected_qdb_entries = {
            'policy': 'drop',
            '0000': 'action=drop proto=icmp',
            '0001': 'action=drop proto=tcp dstports=80-80',
            '0002': 'action=accept proto=udp',
            '0003': 'action=accept specialtarget=dns',
        }
        self.assertEqual(fw.qdb_entries(), expected_qdb_entries)
|