From 1da75a676f685c452e223de68388ab0c258d9f0d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marek=20Marczykowski-G=C3=B3recki?= Date: Thu, 8 Sep 2016 04:10:02 +0200 Subject: [PATCH] qubes/firewall: new firewall interface First part - handling firewall.xml and rules formatting. Specification on https://qubes-os.org/doc/vm-interface/ TODO (for dom0): - plug into QubesVM object - expose rules in QubesDB (including reloading) - drop old functions (vm.get_firewall_conf etc) QubesOS/qubes-issues#1815 --- qubes/firewall.py | 464 ++++++++++++++++++++++++++++++++++ qubes/tests/__init__.py | 1 + qubes/tests/firewall.py | 534 ++++++++++++++++++++++++++++++++++++++++ rpm_spec/core-dom0.spec | 2 + 4 files changed, 1001 insertions(+) create mode 100644 qubes/firewall.py create mode 100644 qubes/tests/firewall.py diff --git a/qubes/firewall.py b/qubes/firewall.py new file mode 100644 index 00000000..aec973f2 --- /dev/null +++ b/qubes/firewall.py @@ -0,0 +1,464 @@ +#!/usr/bin/python2 -O +# vim: fileencoding=utf-8 +# pylint: disable=too-few-public-methods +# +# The Qubes OS Project, https://www.qubes-os.org/ +# +# Copyright (C) 2016 +# Marek Marczykowski-Górecki +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +# +import datetime +import subprocess + +import lxml.etree +import os +import socket + +import qubes +import qubes.vm.qubesvm + + +class RuleOption(object): + def __init__(self, value): + self._value = str(value) + + @property + def rule(self): + raise NotImplementedError + + def __str__(self): + return self._value + + def __eq__(self, other): + return str(self) == other + +# noinspection PyAbstractClass +class RuleChoice(RuleOption): + # pylint: disable=abstract-method + def __init__(self, value): + super(RuleChoice, self).__init__(value) + self.allowed_values = \ + [v for k, v in self.__class__.__dict__.items() + if not k.startswith('__') and isinstance(v, basestring) and + not v.startswith('__')] + if value not in self.allowed_values: + raise ValueError(value) + + +class Action(RuleChoice): + accept = 'accept' + drop = 'drop' + + @property + def rule(self): + return 'action=' + str(self) + + +class Proto(RuleChoice): + tcp = 'tcp' + udp = 'udp' + icmp = 'icmp' + + @property + def rule(self): + return 'proto=' + str(self) + + +class DstHost(RuleOption): + '''Represent host/network address: either IPv4, IPv6, or DNS name''' + def __init__(self, value, prefixlen=None): + # TODO: in python >= 3.3 ipaddress module could be used + if value.count('/') > 1: + raise ValueError('Too many /: ' + value) + elif not value.count('/'): + # add prefix length to bare IP addresses + try: + socket.inet_pton(socket.AF_INET6, value) + self.prefixlen = prefixlen or 128 + if self.prefixlen < 0 or self.prefixlen > 128: + raise ValueError( + 'netmask for IPv6 must be between 0 and 128') + value += '/' + str(self.prefixlen) + self.type = 'dst6' + except socket.error: + try: + socket.inet_pton(socket.AF_INET, value) + if value.count('.') != 3: + raise ValueError( + 'Invalid number of dots in IPv4 address') + self.prefixlen = prefixlen or 32 + if self.prefixlen < 0 or self.prefixlen > 32: + raise ValueError( + 'netmask for IPv4 must be between 0 and 32') + value += '/' + str(self.prefixlen) + self.type = 'dst4' + except socket.error: + self.type = 'dsthost' + self.prefixlen = 0 + else: + host, prefixlen = value.split('/', 1) + prefixlen = int(prefixlen) + if prefixlen < 0: + raise ValueError('netmask must be non-negative') + self.prefixlen = prefixlen + try: + socket.inet_pton(socket.AF_INET6, host) + if prefixlen > 128: + raise ValueError('netmask for IPv6 must be <= 128') + self.type = 'dst6' + except socket.error: + try: + socket.inet_pton(socket.AF_INET, host) + if prefixlen > 32: + raise ValueError('netmask for IPv4 must be <= 32') + self.type = 'dst4' + if host.count('.') != 3: + raise ValueError( + 'Invalid number of dots in IPv4 address') + except socket.error: + raise ValueError('Invalid IP address: ' + host) + + super(DstHost, self).__init__(value) + + @property + def rule(self): + return self.type + '=' + str(self) + + +class DstPorts(RuleOption): + def __init__(self, value): + if isinstance(value, int): + value = str(value) + if value.count('-') == 1: + self.range = [int(x) for x in value.split('-', 1)] + elif not value.count('-'): + self.range = [int(value), int(value)] + else: + raise ValueError(value) + if any(port < 0 or port > 65536 for port in self.range): + raise ValueError('Ports out of range') + if self.range[0] > self.range[1]: + raise ValueError('Invalid port range') + super(DstPorts, self).__init__( + str(self.range[0]) if self.range[0] == self.range[1] + else '-'.join(map(str, self.range))) + + @property + def rule(self): + return 'dstports=' + '{!s}-{!s}'.format(*self.range) + + +class IcmpType(RuleOption): + def __init__(self, value): + super(IcmpType, self).__init__(value) + value = int(value) + if value < 0 or value > 255: + raise ValueError('ICMP type out of range') + + @property + def rule(self): + return 'icmptype=' + str(self) + + +class SpecialTarget(RuleChoice): + dns = 'dns' + + @property + def rule(self): + return 'specialtarget=' + str(self) + + +class Expire(RuleOption): + def __init__(self, value): + super(Expire, self).__init__(value) + self.datetime = datetime.datetime.utcfromtimestamp(int(value)) + + @property + def rule(self): + return None + + @property + def expired(self): + return self.datetime < datetime.datetime.utcnow() + + +class Comment(RuleOption): + @property + def rule(self): + return None + + +class Rule(qubes.PropertyHolder): + def __init__(self, xml, **kwargs): + super(Rule, self).__init__(xml, **kwargs) + self.load_properties() + self.events_enabled = True + # validate dependencies + if self.dstports: + self.on_set_dstports('property-set:dstports', 'dstports', + self.dstports, None) + if self.icmptype: + self.on_set_icmptype('property-set:icmptype', 'icmptype', + self.icmptype, None) + self.property_require('action', False, True) + + action = qubes.property('action', + type=Action, + order=0, + doc='rule action') + + proto = qubes.property('proto', + type=Proto, + default=None, + order=1, + doc='protocol to match') + + dsthost = qubes.property('dsthost', + type=DstHost, + default=None, + order=1, + doc='destination host/network') + + dstports = qubes.property('dstports', + type=DstPorts, + default=None, + order=2, + doc='Destination port(s) (for \'tcp\' and \'udp\' protocol only)') + + icmptype = qubes.property('icmptype', + type=IcmpType, + default=None, + order=2, + doc='ICMP packet type (for \'icmp\' protocol only)') + + specialtarget = qubes.property('specialtarget', + type=SpecialTarget, + default=None, + order=1, + doc='Special target, for now only \'dns\' supported') + + expire = qubes.property('expire', + type=Expire, + default=None, + doc='Timestamp (UNIX epoch) on which this rule expire') + + comment = qubes.property('comment', + type=Comment, + default=None, + doc='User comment') + + # noinspection PyUnusedLocal + @qubes.events.handler('property-pre-set:dstports') + def on_set_dstports(self, _event, _prop, _new_value, _old_value=None): + if self.proto not in ('tcp', 'udp'): + raise ValueError( + 'dstports valid only for \'tcp\' and \'udp\' protocols') + + # noinspection PyUnusedLocal + @qubes.events.handler('property-pre-set:icmptype') + def on_set_icmptype(self, _event, _prop, _new_value, _old_value=None): + if self.proto not in ('icmp',): + raise ValueError('icmptype valid only for \'icmp\' protocol') + + # noinspection PyUnusedLocal + @qubes.events.handler('property-set:proto') + def on_set_proto(self, _event, _prop, new_value, _old_value=None): + if new_value not in ('tcp', 'udp'): + self.dstports = qubes.property.DEFAULT + if new_value not in ('icmp',): + self.icmptype = qubes.property.DEFAULT + + @qubes.events.handler('property-del:proto') + def on_del_proto(self, _event, _prop, _old_value): + self.dstports = qubes.property.DEFAULT + self.icmptype = qubes.property.DEFAULT + + @property + def rule(self): + if self.expire and self.expire.expired: + return None + values = [] + for prop in self.property_list(): + value = getattr(self, prop.__name__) + if value is None: + continue + if value.rule is None: + continue + values.append(value.rule) + return ' '.join(values) + + @classmethod + def from_xml_v1(cls, node, action): + netmask = node.get('netmask') + if netmask is None: + netmask = 32 + else: + netmask = int(netmask) + address = node.get('address') + if address: + dsthost = DstHost(address, netmask) + else: + dsthost = None + + proto = node.get('proto') + + port = node.get('port') + toport = node.get('toport') + if port and toport: + dstports = port + '-' + toport + elif port: + dstports = port + else: + dstports = None + + # backward compatibility: protocol defaults to TCP if port is specified + if dstports and not proto: + proto = 'tcp' + + if proto == 'any': + proto = None + + expire = node.get('expire') + + kwargs = { + 'action': action, + } + if dsthost: + kwargs['dsthost'] = dsthost + if dstports: + kwargs['dstports'] = dstports + if proto: + kwargs['proto'] = proto + if expire: + kwargs['expire'] = expire + + return cls(None, **kwargs) + + def __eq__(self, other): + return self.rule == other.rule + +class Firewall(object): + def __init__(self, vm, load=True): + assert hasattr(vm, 'firewall_conf') + self.vm = vm + #: firewall rules + self.rules = [] + #: default action + self.policy = None + + if load: + self.load() + + def load_defaults(self): + self.rules = [] + self.policy = Action('accept') + + def load(self): + firewall_conf = os.path.join(self.vm.dir_path, self.vm.firewall_conf) + if os.path.exists(firewall_conf): + self.rules = [] + tree = lxml.etree.parse(firewall_conf) + root = tree.getroot() + + version = root.get('version', '1') + if version == '1': + self.load_v1(root) + elif version == '2': + self.load_v2(root) + else: + raise qubes.exc.QubesVMError(self.vm, + 'Unsupported firewall.xml version: {}'.format(version)) + else: + self.load_defaults() + + def load_v1(self, xml_root): + policy_v1 = xml_root.get('policy') + assert policy_v1 in ('allow', 'deny') + if policy_v1 == 'allow': + self.policy = Action('accept') + else: + self.policy = Action('drop') + + def _translate_action(key): + if xml_root.get(key, policy_v1) == 'allow': + return Action.accept + else: + return Action.drop + + self.rules.append(Rule(None, + action=_translate_action('dns'), + specialtarget=SpecialTarget('dns'))) + + self.rules.append(Rule(None, + action=_translate_action('icmp'), + proto=Proto.icmp)) + + if self.policy == Action.accept: + rule_action = Action.drop + else: + rule_action = Action.accept + + for element in xml_root: + rule = Rule.from_xml_v1(element, rule_action) + self.rules.append(rule) + + def load_v2(self, xml_root): + self.policy = Action(xml_root.findtext('policy')) + + xml_rules = xml_root.find('rules') + for xml_rule in xml_rules: + rule = Rule(xml_rule) + self.rules.append(rule) + + def save(self): + firewall_conf = os.path.join(self.vm.dir_path, self.vm.firewall_conf) + expiring_rules_present = False + + xml_root = lxml.etree.Element('firewall', version=str(2)) + + xml_policy = lxml.etree.Element('policy') + xml_policy.text = str(self.policy) + xml_root.append(xml_policy) + + xml_rules = lxml.etree.Element('rules') + for rule in self.rules: + if rule.expire: + if rule.expire and rule.expire.expired: + continue + else: + expiring_rules_present = True + xml_rule = lxml.etree.Element('rule') + xml_rule.append(rule.xml_properties()) + xml_rules.append(xml_rule) + + xml_root.append(xml_rules) + + xml_tree = lxml.etree.ElementTree(xml_root) + + try: + old_umask = os.umask(0o002) + with open(firewall_conf, 'w') as firewall_xml: + xml_tree.write(firewall_xml, encoding="UTF-8", + pretty_print=True) + os.umask(old_umask) + except EnvironmentError as err: + self.vm.log.error("save error: {}".format(err)) + raise qubes.exc.QubesException('save error: {}'.format(err)) + + if expiring_rules_present and not self.vm.app.vmm.offline_mode: + subprocess.call(["sudo", "systemctl", "start", + "qubes-reload-firewall@%s.timer" % self.vm.name]) diff --git a/qubes/tests/__init__.py b/qubes/tests/__init__.py index d3c58779..b13effa7 100644 --- a/qubes/tests/__init__.py +++ b/qubes/tests/__init__.py @@ -966,6 +966,7 @@ def load_tests(loader, tests, pattern): # pylint: disable=unused-argument # unit tests 'qubes.tests.events', 'qubes.tests.devices', + 'qubes.tests.firewall', 'qubes.tests.init', 'qubes.tests.vm.init', 'qubes.tests.storage', diff --git a/qubes/tests/firewall.py b/qubes/tests/firewall.py new file mode 100644 index 00000000..8e4be47d --- /dev/null +++ b/qubes/tests/firewall.py @@ -0,0 +1,534 @@ +#!/usr/bin/python2 -O +# vim: fileencoding=utf-8 + +# +# The Qubes OS Project, https://www.qubes-os.org/ +# +# Copyright (C) 2016 +# Marek Marczykowski-Górecki +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +# + +import datetime +import os + +import lxml.etree +import unittest + +import qubes.firewall +import qubes.tests + + +class TestOption(qubes.firewall.RuleChoice): + opt1 = 'opt1' + opt2 = 'opt2' + another = 'another' + +class TestVMM(object): + def __init__(self): + self.offline_mode = True + + +class TestApp(object): + def __init__(self): + self.vmm = TestVMM() + + +class TestVM(object): + def __init__(self): + self.firewall_conf = 'test-firewall.xml' + self.dir_path = '/tmp' + self.app = TestApp() + + +# noinspection PyPep8Naming +class TC_00_RuleChoice(qubes.tests.QubesTestCase): + def test_000_accept_allowed(self): + with self.assertNotRaises(ValueError): + TestOption('opt1') + TestOption('opt2') + TestOption('another') + + def test_001_value_list(self): + instance = TestOption('opt1') + self.assertEqual( + set(instance.allowed_values), {'opt1', 'opt2', 'another'}) + + def test_010_reject_others(self): + self.assertRaises(ValueError, lambda: TestOption('invalid')) + + +class TC_01_Action(qubes.tests.QubesTestCase): + def test_000_allowed_values(self): + with self.assertNotRaises(ValueError): + instance = qubes.firewall.Action('accept') + self.assertEqual( + set(instance.allowed_values), {'accept', 'drop'}) + + def test_001_rule(self): + instance = qubes.firewall.Action('accept') + self.assertEqual(instance.rule, 'action=accept') + + +# noinspection PyPep8Naming +class TC_02_Proto(qubes.tests.QubesTestCase): + def test_000_allowed_values(self): + with self.assertNotRaises(ValueError): + instance = qubes.firewall.Proto('tcp') + self.assertEqual( + set(instance.allowed_values), {'tcp', 'udp', 'icmp'}) + + def test_001_rule(self): + instance = qubes.firewall.Proto('tcp') + self.assertEqual(instance.rule, 'proto=tcp') + + +# noinspection PyPep8Naming +class TC_02_DstHost(qubes.tests.QubesTestCase): + def test_000_hostname(self): + with self.assertNotRaises(ValueError): + instance = qubes.firewall.DstHost('qubes-os.org') + self.assertEqual(instance.type, 'dsthost') + + def test_001_ipv4(self): + with self.assertNotRaises(ValueError): + instance = qubes.firewall.DstHost('127.0.0.1') + self.assertEqual(instance.type, 'dst4') + self.assertEqual(instance.prefixlen, 32) + self.assertEqual(str(instance), '127.0.0.1/32') + self.assertEqual(instance.rule, 'dst4=127.0.0.1/32') + + def test_002_ipv4_prefixlen(self): + with self.assertNotRaises(ValueError): + instance = qubes.firewall.DstHost('127.0.0.0', 8) + self.assertEqual(instance.type, 'dst4') + self.assertEqual(instance.prefixlen, 8) + self.assertEqual(str(instance), '127.0.0.0/8') + self.assertEqual(instance.rule, 'dst4=127.0.0.0/8') + + def test_003_ipv4_parse_prefixlen(self): + with self.assertNotRaises(ValueError): + instance = qubes.firewall.DstHost('127.0.0.0/8') + self.assertEqual(instance.type, 'dst4') + self.assertEqual(instance.prefixlen, 8) + self.assertEqual(str(instance), '127.0.0.0/8') + self.assertEqual(instance.rule, 'dst4=127.0.0.0/8') + + def test_004_ipv4_invalid_prefix(self): + with self.assertRaises(ValueError): + qubes.firewall.DstHost('127.0.0.0/33') + with self.assertRaises(ValueError): + qubes.firewall.DstHost('127.0.0.0', 33) + with self.assertRaises(ValueError): + qubes.firewall.DstHost('127.0.0.0/-1') + + def test_005_ipv4_reject_shortened(self): + # not strictly required, but ppl are used to it + with self.assertRaises(ValueError): + qubes.firewall.DstHost('127/8') + + def test_006_ipv4_invalid_addr(self): + with self.assertRaises(ValueError): + qubes.firewall.DstHost('137.327.0.0/16') + with self.assertRaises(ValueError): + qubes.firewall.DstHost('1.2.3.4.5/32') + + @unittest.expectedFailure + def test_007_ipv4_invalid_network(self): + with self.assertRaises(ValueError): + qubes.firewall.DstHost('127.0.0.1/32') + + def test_010_ipv6(self): + with self.assertNotRaises(ValueError): + instance = qubes.firewall.DstHost('2001:abcd:efab::3') + self.assertEqual(instance.type, 'dst6') + self.assertEqual(instance.prefixlen, 128) + self.assertEqual(str(instance), '2001:abcd:efab::3/128') + self.assertEqual(instance.rule, 'dst6=2001:abcd:efab::3/128') + + def test_011_ipv6_prefixlen(self): + with self.assertNotRaises(ValueError): + instance = qubes.firewall.DstHost('2001:abcd:efab::', 64) + self.assertEqual(instance.type, 'dst6') + self.assertEqual(instance.prefixlen, 64) + self.assertEqual(str(instance), '2001:abcd:efab::/64') + self.assertEqual(instance.rule, 'dst6=2001:abcd:efab::/64') + + def test_012_ipv6_parse_prefixlen(self): + with self.assertNotRaises(ValueError): + instance = qubes.firewall.DstHost('2001:abcd:efab::/64') + self.assertEqual(instance.type, 'dst6') + self.assertEqual(instance.prefixlen, 64) + self.assertEqual(str(instance), '2001:abcd:efab::/64') + self.assertEqual(instance.rule, 'dst6=2001:abcd:efab::/64') + + def test_013_ipv6_invalid_prefix(self): + with self.assertRaises(ValueError): + qubes.firewall.DstHost('2001:abcd:efab::3/129') + with self.assertRaises(ValueError): + qubes.firewall.DstHost('2001:abcd:efab::3', 129) + with self.assertRaises(ValueError): + qubes.firewall.DstHost('2001:abcd:efab::3/-1') + + def test_014_ipv6_invalid_addr(self): + with self.assertRaises(ValueError): + qubes.firewall.DstHost('2001:abcd:efab0123::3/128') + with self.assertRaises(ValueError): + qubes.firewall.DstHost('2001:abcd:efab:3/128') + with self.assertRaises(ValueError): + qubes.firewall.DstHost('2001:abcd:efab:a:a:a:a:a:a:3/128') + with self.assertRaises(ValueError): + qubes.firewall.DstHost('2001:abcd:efgh::3/128') + + @unittest.expectedFailure + def test_015_ipv6_invalid_network(self): + with self.assertRaises(ValueError): + qubes.firewall.DstHost('2001:abcd:efab::3/64') + + @unittest.expectedFailure + def test_020_invalid_hostname(self): + with self.assertRaises(ValueError): + qubes.firewall.DstHost('www qubes-os.org') + with self.assertRaises(ValueError): + qubes.firewall.DstHost('https://qubes-os.org') + +class TC_03_DstPorts(qubes.tests.QubesTestCase): + def test_000_single_str(self): + with self.assertNotRaises(ValueError): + instance = qubes.firewall.DstPorts('80') + self.assertEqual(str(instance), '80') + self.assertEqual(instance.range, [80, 80]) + self.assertEqual(instance.rule, 'dstports=80-80') + + def test_001_single_int(self): + with self.assertNotRaises(ValueError): + instance = qubes.firewall.DstPorts(80) + self.assertEqual(str(instance), '80') + self.assertEqual(instance.range, [80, 80]) + self.assertEqual(instance.rule, 'dstports=80-80') + + def test_002_range(self): + with self.assertNotRaises(ValueError): + instance = qubes.firewall.DstPorts('80-90') + self.assertEqual(str(instance), '80-90') + self.assertEqual(instance.range, [80, 90]) + self.assertEqual(instance.rule, 'dstports=80-90') + + def test_003_invalid(self): + with self.assertRaises(ValueError): + qubes.firewall.DstPorts('80-90-100') + with self.assertRaises(ValueError): + qubes.firewall.DstPorts('abcdef') + with self.assertRaises(ValueError): + qubes.firewall.DstPorts('80 90') + with self.assertRaises(ValueError): + qubes.firewall.DstPorts('') + + def test_004_reversed_range(self): + with self.assertRaises(ValueError): + qubes.firewall.DstPorts('100-20') + + def test_005_out_of_range(self): + with self.assertRaises(ValueError): + qubes.firewall.DstPorts('1000000000000') + with self.assertRaises(ValueError): + qubes.firewall.DstPorts(1000000000000) + with self.assertRaises(ValueError): + qubes.firewall.DstPorts('1-1000000000000') + + +class TC_04_IcmpType(qubes.tests.QubesTestCase): + def test_000_number(self): + with self.assertNotRaises(ValueError): + instance = qubes.firewall.IcmpType(8) + self.assertEqual(str(instance), '8') + self.assertEqual(instance.rule, 'icmptype=8') + + def test_001_str(self): + with self.assertNotRaises(ValueError): + instance = qubes.firewall.IcmpType('8') + self.assertEqual(str(instance), '8') + self.assertEqual(instance.rule, 'icmptype=8') + + def test_002_invalid(self): + with self.assertRaises(ValueError): + qubes.firewall.IcmpType(600) + with self.assertRaises(ValueError): + qubes.firewall.IcmpType(-1) + with self.assertRaises(ValueError): + qubes.firewall.IcmpType('abcde') + with self.assertRaises(ValueError): + qubes.firewall.IcmpType('') + + +class TC_05_SpecialTarget(qubes.tests.QubesTestCase): + def test_000_allowed_values(self): + with self.assertNotRaises(ValueError): + instance = qubes.firewall.SpecialTarget('dns') + self.assertEqual( + set(instance.allowed_values), {'dns'}) + + def test_001_rule(self): + instance = qubes.firewall.SpecialTarget('dns') + self.assertEqual(instance.rule, 'specialtarget=dns') + + +class TC_06_Expire(qubes.tests.QubesTestCase): + def test_000_number(self): + with self.assertNotRaises(ValueError): + instance = qubes.firewall.Expire(1463292452) + self.assertEqual(str(instance), '1463292452') + self.assertEqual(instance.datetime, + datetime.datetime(2016, 5, 15, 6, 7, 32)) + self.assertIsNone(instance.rule) + + def test_001_str(self): + with self.assertNotRaises(ValueError): + instance = qubes.firewall.Expire('1463292452') + self.assertEqual(str(instance), '1463292452') + self.assertEqual(instance.datetime, + datetime.datetime(2016, 5, 15, 6, 7, 32)) + self.assertIsNone(instance.rule) + + def test_002_invalid(self): + with self.assertRaises(ValueError): + qubes.firewall.Expire('abcdef') + with self.assertRaises(ValueError): + qubes.firewall.Expire('') + + def test_003_expired(self): + with self.assertNotRaises(ValueError): + instance = qubes.firewall.Expire('1463292452') + self.assertTrue(instance.expired) + with self.assertNotRaises(ValueError): + instance = qubes.firewall.Expire('1583292452') + self.assertFalse(instance.expired) + + +class TC_07_Comment(qubes.tests.QubesTestCase): + def test_000_str(self): + with self.assertNotRaises(ValueError): + instance = qubes.firewall.Comment('Some comment') + self.assertEqual(str(instance), 'Some comment') + self.assertIsNone(instance.rule) + + +class TC_08_Rule(qubes.tests.QubesTestCase): + def test_000_simple(self): + with self.assertNotRaises(ValueError): + rule = qubes.firewall.Rule(None, action='accept', proto='icmp') + self.assertEqual(rule.rule, 'action=accept proto=icmp') + self.assertIsNone(rule.dsthost) + self.assertIsNone(rule.dstports) + self.assertIsNone(rule.icmptype) + self.assertIsNone(rule.comment) + self.assertIsNone(rule.expire) + self.assertEqual(str(rule.action), 'accept') + self.assertEqual(str(rule.proto), 'icmp') + + def test_001_expire(self): + with self.assertNotRaises(ValueError): + rule = qubes.firewall.Rule(None, action='accept', proto='icmp', + expire='1463292452') + self.assertIsNone(rule.rule) + + with self.assertNotRaises(ValueError): + rule = qubes.firewall.Rule(None, action='accept', proto='icmp', + expire='1663292452') + self.assertIsNotNone(rule.rule) + + + def test_002_dstports(self): + with self.assertNotRaises(ValueError): + rule = qubes.firewall.Rule(None, action='accept', proto='tcp', + dstports=80) + self.assertEqual(str(rule.dstports), '80') + with self.assertNotRaises(ValueError): + rule = qubes.firewall.Rule(None, action='accept', proto='udp', + dstports=80) + self.assertEqual(str(rule.dstports), '80') + + def test_003_reject_invalid(self): + with self.assertRaises((ValueError, AssertionError)): + # missing action + qubes.firewall.Rule(None, proto='icmp') + with self.assertRaises(ValueError): + # not proto=tcp or proto=udp for dstports + qubes.firewall.Rule(None, action='accept', proto='icmp', + dstports=80) + with self.assertRaises(ValueError): + # not proto=tcp or proto=udp for dstports + qubes.firewall.Rule(None, action='accept', dstports=80) + with self.assertRaises(ValueError): + # not proto=icmp for icmptype + qubes.firewall.Rule(None, action='accept', proto='tcp', + icmptype=8) + with self.assertRaises(ValueError): + # not proto=icmp for icmptype + qubes.firewall.Rule(None, action='accept', icmptype=8) + + def test_004_proto_change(self): + rule = qubes.firewall.Rule(None, action='accept', proto='tcp') + with self.assertNotRaises(ValueError): + rule.proto = 'udp' + self.assertEqual(rule.rule, 'action=accept proto=udp') + rule = qubes.firewall.Rule(None, action='accept', proto='tcp', + dstports=80) + with self.assertNotRaises(ValueError): + rule.proto = 'udp' + self.assertEqual(rule.rule, 'action=accept proto=udp dstports=80-80') + rule = qubes.firewall.Rule(None, action='accept') + with self.assertNotRaises(ValueError): + rule.proto = 'udp' + self.assertEqual(rule.rule, 'action=accept proto=udp') + with self.assertNotRaises(ValueError): + rule.dstports = 80 + self.assertEqual(rule.rule, 'action=accept proto=udp dstports=80-80') + with self.assertNotRaises(ValueError): + rule.proto = 'icmp' + self.assertEqual(rule.rule, 'action=accept proto=icmp') + self.assertIsNone(rule.dstports) + rule.icmptype = 8 + self.assertEqual(rule.rule, 'action=accept proto=icmp icmptype=8') + with self.assertNotRaises(ValueError): + rule.proto = qubes.property.DEFAULT + self.assertEqual(rule.rule, 'action=accept') + self.assertIsNone(rule.dstports) + + def test_005_from_xml_v1(self): + xml_txt = \ + '' + with self.assertNotRaises(ValueError): + rule = qubes.firewall.Rule.from_xml_v1( + lxml.etree.fromstring(xml_txt), 'accept') + self.assertEqual(rule.dsthost, '192.168.0.0/24') + self.assertEqual(rule.proto, 'tcp') + self.assertEqual(rule.dstports, '443') + self.assertIsNone(rule.expire) + self.assertIsNone(rule.comment) + + def test_006_from_xml_v1(self): + xml_txt = \ + '' + with self.assertNotRaises(ValueError): + rule = qubes.firewall.Rule.from_xml_v1( + lxml.etree.fromstring(xml_txt), 'drop') + self.assertEqual(rule.dsthost, 'qubes-os.org') + self.assertEqual(rule.proto, 'tcp') + self.assertEqual(rule.dstports, '443-1024') + self.assertEqual(rule.action, 'drop') + self.assertIsNone(rule.expire) + self.assertIsNone(rule.comment) + + def test_007_from_xml_v1(self): + xml_txt = \ + '' + with self.assertNotRaises(ValueError): + rule = qubes.firewall.Rule.from_xml_v1( + lxml.etree.fromstring(xml_txt), 'accept') + self.assertEqual(rule.dsthost, '192.168.0.0/24') + self.assertEqual(rule.expire, '1463292452') + self.assertEqual(rule.action, 'accept') + self.assertIsNone(rule.proto) + self.assertIsNone(rule.dstports) + + +class TC_10_Firewall(qubes.tests.QubesTestCase): + def setUp(self): + super(TC_10_Firewall, self).setUp() + self.vm = TestVM() + firewall_path = os.path.join('/tmp', self.vm.firewall_conf) + if os.path.exists(firewall_path): + os.unlink(firewall_path) + + def tearDown(self): + firewall_path = os.path.join('/tmp', self.vm.firewall_conf) + if os.path.exists(firewall_path): + os.unlink(firewall_path) + return super(TC_10_Firewall, self).tearDown() + + def test_000_defaults(self): + fw = qubes.firewall.Firewall(self.vm, False) + fw.load_defaults() + self.assertEqual(fw.policy, 'accept') + self.assertEqual(fw.rules, []) + + def test_001_save_load_empty(self): + fw = qubes.firewall.Firewall(self.vm, True) + self.assertEqual(fw.policy, 'accept') + self.assertEqual(fw.rules, []) + fw.save() + fw.load() + self.assertEqual(fw.policy, 'accept') + self.assertEqual(fw.rules, []) + + def test_002_save_load_rules(self): + fw = qubes.firewall.Firewall(self.vm, True) + rules = [ + qubes.firewall.Rule(None, action='drop', proto='icmp'), + qubes.firewall.Rule(None, action='drop', proto='tcp', dstports=80), + qubes.firewall.Rule(None, action='accept', proto='udp', + dstports=67), + qubes.firewall.Rule(None, action='accept', specialtarget='dns'), + ] + fw.rules.extend(rules) + fw.policy = qubes.firewall.Action.drop + fw.save() + self.assertTrue(os.path.exists(os.path.join( + self.vm.dir_path, self.vm.firewall_conf))) + fw = qubes.firewall.Firewall(TestVM(), True) + self.assertEqual(fw.policy, qubes.firewall.Action.drop) + self.assertEqual(fw.rules, rules) + + def test_003_load_v1(self): + xml_txt = """ + + + + """ + with open(os.path.join('/tmp', self.vm.firewall_conf), 'w') as f: + f.write(xml_txt) + with self.assertNotRaises(ValueError): + fw = qubes.firewall.Firewall(self.vm) + self.assertEqual(str(fw.policy), 'drop') + rules = [ + qubes.firewall.Rule(None, action='accept', specialtarget='dns'), + qubes.firewall.Rule(None, action='accept', proto='icmp'), + qubes.firewall.Rule(None, action='accept', proto='tcp', + dsthost='192.168.0.0/24', dstports='80'), + qubes.firewall.Rule(None, action='accept', proto='tcp', + dsthost='qubes-os.org', dstports='443') + ] + self.assertEqual(fw.rules, rules) + + def test_004_save_skip_expired(self): + fw = qubes.firewall.Firewall(self.vm, True) + rules = [ + qubes.firewall.Rule(None, action='drop', proto='icmp'), + qubes.firewall.Rule(None, action='drop', proto='tcp', dstports=80), + qubes.firewall.Rule(None, action='accept', proto='udp', + dstports=67, expire=1373300257), + qubes.firewall.Rule(None, action='accept', specialtarget='dns'), + ] + fw.rules.extend(rules) + fw.policy = qubes.firewall.Action.drop + fw.save() + rules.pop(2) + fw = qubes.firewall.Firewall(self.vm, True) + self.assertEqual(fw.rules, rules) + diff --git a/rpm_spec/core-dom0.spec b/rpm_spec/core-dom0.spec index a0cca0db..3ffb12e2 100644 --- a/rpm_spec/core-dom0.spec +++ b/rpm_spec/core-dom0.spec @@ -216,6 +216,7 @@ fi %{python_sitelib}/qubes/devices.py* %{python_sitelib}/qubes/dochelpers.py* %{python_sitelib}/qubes/events.py* +%{python_sitelib}/qubes/firewall.py* %{python_sitelib}/qubes/exc.py* %{python_sitelib}/qubes/log.py* %{python_sitelib}/qubes/rngdoc.py* @@ -281,6 +282,7 @@ fi %{python_sitelib}/qubes/tests/app.py* %{python_sitelib}/qubes/tests/devices.py* %{python_sitelib}/qubes/tests/events.py* +%{python_sitelib}/qubes/tests/firewall.py* %{python_sitelib}/qubes/tests/init.py* %{python_sitelib}/qubes/tests/storage.py* %{python_sitelib}/qubes/tests/storage_file.py*