qubes/firewall: new firewall interface
First part - handling firewall.xml and rules formatting. Specification on https://qubes-os.org/doc/vm-interface/ TODO (for dom0): - plug into QubesVM object - expose rules in QubesDB (including reloading) - drop old functions (vm.get_firewall_conf etc) QubesOS/qubes-issues#1815
This commit is contained in:
parent
1af1784c69
commit
1da75a676f
464
qubes/firewall.py
Normal file
464
qubes/firewall.py
Normal file
@ -0,0 +1,464 @@
|
||||
#!/usr/bin/python2 -O
|
||||
# vim: fileencoding=utf-8
|
||||
# pylint: disable=too-few-public-methods
|
||||
#
|
||||
# The Qubes OS Project, https://www.qubes-os.org/
|
||||
#
|
||||
# Copyright (C) 2016
|
||||
# Marek Marczykowski-Górecki <marmarek@invisiblethingslab.com>
|
||||
#
|
||||
# This program is free software; you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation; either version 2 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License along
|
||||
# with this program; if not, write to the Free Software Foundation, Inc.,
|
||||
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
|
||||
#
|
||||
import datetime
|
||||
import subprocess
|
||||
|
||||
import lxml.etree
|
||||
import os
|
||||
import socket
|
||||
|
||||
import qubes
|
||||
import qubes.vm.qubesvm
|
||||
|
||||
|
||||
class RuleOption(object):
|
||||
def __init__(self, value):
|
||||
self._value = str(value)
|
||||
|
||||
@property
|
||||
def rule(self):
|
||||
raise NotImplementedError
|
||||
|
||||
def __str__(self):
|
||||
return self._value
|
||||
|
||||
def __eq__(self, other):
|
||||
return str(self) == other
|
||||
|
||||
# noinspection PyAbstractClass
|
||||
class RuleChoice(RuleOption):
|
||||
# pylint: disable=abstract-method
|
||||
def __init__(self, value):
|
||||
super(RuleChoice, self).__init__(value)
|
||||
self.allowed_values = \
|
||||
[v for k, v in self.__class__.__dict__.items()
|
||||
if not k.startswith('__') and isinstance(v, basestring) and
|
||||
not v.startswith('__')]
|
||||
if value not in self.allowed_values:
|
||||
raise ValueError(value)
|
||||
|
||||
|
||||
class Action(RuleChoice):
|
||||
accept = 'accept'
|
||||
drop = 'drop'
|
||||
|
||||
@property
|
||||
def rule(self):
|
||||
return 'action=' + str(self)
|
||||
|
||||
|
||||
class Proto(RuleChoice):
|
||||
tcp = 'tcp'
|
||||
udp = 'udp'
|
||||
icmp = 'icmp'
|
||||
|
||||
@property
|
||||
def rule(self):
|
||||
return 'proto=' + str(self)
|
||||
|
||||
|
||||
class DstHost(RuleOption):
|
||||
'''Represent host/network address: either IPv4, IPv6, or DNS name'''
|
||||
def __init__(self, value, prefixlen=None):
|
||||
# TODO: in python >= 3.3 ipaddress module could be used
|
||||
if value.count('/') > 1:
|
||||
raise ValueError('Too many /: ' + value)
|
||||
elif not value.count('/'):
|
||||
# add prefix length to bare IP addresses
|
||||
try:
|
||||
socket.inet_pton(socket.AF_INET6, value)
|
||||
self.prefixlen = prefixlen or 128
|
||||
if self.prefixlen < 0 or self.prefixlen > 128:
|
||||
raise ValueError(
|
||||
'netmask for IPv6 must be between 0 and 128')
|
||||
value += '/' + str(self.prefixlen)
|
||||
self.type = 'dst6'
|
||||
except socket.error:
|
||||
try:
|
||||
socket.inet_pton(socket.AF_INET, value)
|
||||
if value.count('.') != 3:
|
||||
raise ValueError(
|
||||
'Invalid number of dots in IPv4 address')
|
||||
self.prefixlen = prefixlen or 32
|
||||
if self.prefixlen < 0 or self.prefixlen > 32:
|
||||
raise ValueError(
|
||||
'netmask for IPv4 must be between 0 and 32')
|
||||
value += '/' + str(self.prefixlen)
|
||||
self.type = 'dst4'
|
||||
except socket.error:
|
||||
self.type = 'dsthost'
|
||||
self.prefixlen = 0
|
||||
else:
|
||||
host, prefixlen = value.split('/', 1)
|
||||
prefixlen = int(prefixlen)
|
||||
if prefixlen < 0:
|
||||
raise ValueError('netmask must be non-negative')
|
||||
self.prefixlen = prefixlen
|
||||
try:
|
||||
socket.inet_pton(socket.AF_INET6, host)
|
||||
if prefixlen > 128:
|
||||
raise ValueError('netmask for IPv6 must be <= 128')
|
||||
self.type = 'dst6'
|
||||
except socket.error:
|
||||
try:
|
||||
socket.inet_pton(socket.AF_INET, host)
|
||||
if prefixlen > 32:
|
||||
raise ValueError('netmask for IPv4 must be <= 32')
|
||||
self.type = 'dst4'
|
||||
if host.count('.') != 3:
|
||||
raise ValueError(
|
||||
'Invalid number of dots in IPv4 address')
|
||||
except socket.error:
|
||||
raise ValueError('Invalid IP address: ' + host)
|
||||
|
||||
super(DstHost, self).__init__(value)
|
||||
|
||||
@property
|
||||
def rule(self):
|
||||
return self.type + '=' + str(self)
|
||||
|
||||
|
||||
class DstPorts(RuleOption):
|
||||
def __init__(self, value):
|
||||
if isinstance(value, int):
|
||||
value = str(value)
|
||||
if value.count('-') == 1:
|
||||
self.range = [int(x) for x in value.split('-', 1)]
|
||||
elif not value.count('-'):
|
||||
self.range = [int(value), int(value)]
|
||||
else:
|
||||
raise ValueError(value)
|
||||
if any(port < 0 or port > 65536 for port in self.range):
|
||||
raise ValueError('Ports out of range')
|
||||
if self.range[0] > self.range[1]:
|
||||
raise ValueError('Invalid port range')
|
||||
super(DstPorts, self).__init__(
|
||||
str(self.range[0]) if self.range[0] == self.range[1]
|
||||
else '-'.join(map(str, self.range)))
|
||||
|
||||
@property
|
||||
def rule(self):
|
||||
return 'dstports=' + '{!s}-{!s}'.format(*self.range)
|
||||
|
||||
|
||||
class IcmpType(RuleOption):
|
||||
def __init__(self, value):
|
||||
super(IcmpType, self).__init__(value)
|
||||
value = int(value)
|
||||
if value < 0 or value > 255:
|
||||
raise ValueError('ICMP type out of range')
|
||||
|
||||
@property
|
||||
def rule(self):
|
||||
return 'icmptype=' + str(self)
|
||||
|
||||
|
||||
class SpecialTarget(RuleChoice):
|
||||
dns = 'dns'
|
||||
|
||||
@property
|
||||
def rule(self):
|
||||
return 'specialtarget=' + str(self)
|
||||
|
||||
|
||||
class Expire(RuleOption):
|
||||
def __init__(self, value):
|
||||
super(Expire, self).__init__(value)
|
||||
self.datetime = datetime.datetime.utcfromtimestamp(int(value))
|
||||
|
||||
@property
|
||||
def rule(self):
|
||||
return None
|
||||
|
||||
@property
|
||||
def expired(self):
|
||||
return self.datetime < datetime.datetime.utcnow()
|
||||
|
||||
|
||||
class Comment(RuleOption):
|
||||
@property
|
||||
def rule(self):
|
||||
return None
|
||||
|
||||
|
||||
class Rule(qubes.PropertyHolder):
|
||||
def __init__(self, xml, **kwargs):
|
||||
super(Rule, self).__init__(xml, **kwargs)
|
||||
self.load_properties()
|
||||
self.events_enabled = True
|
||||
# validate dependencies
|
||||
if self.dstports:
|
||||
self.on_set_dstports('property-set:dstports', 'dstports',
|
||||
self.dstports, None)
|
||||
if self.icmptype:
|
||||
self.on_set_icmptype('property-set:icmptype', 'icmptype',
|
||||
self.icmptype, None)
|
||||
self.property_require('action', False, True)
|
||||
|
||||
action = qubes.property('action',
|
||||
type=Action,
|
||||
order=0,
|
||||
doc='rule action')
|
||||
|
||||
proto = qubes.property('proto',
|
||||
type=Proto,
|
||||
default=None,
|
||||
order=1,
|
||||
doc='protocol to match')
|
||||
|
||||
dsthost = qubes.property('dsthost',
|
||||
type=DstHost,
|
||||
default=None,
|
||||
order=1,
|
||||
doc='destination host/network')
|
||||
|
||||
dstports = qubes.property('dstports',
|
||||
type=DstPorts,
|
||||
default=None,
|
||||
order=2,
|
||||
doc='Destination port(s) (for \'tcp\' and \'udp\' protocol only)')
|
||||
|
||||
icmptype = qubes.property('icmptype',
|
||||
type=IcmpType,
|
||||
default=None,
|
||||
order=2,
|
||||
doc='ICMP packet type (for \'icmp\' protocol only)')
|
||||
|
||||
specialtarget = qubes.property('specialtarget',
|
||||
type=SpecialTarget,
|
||||
default=None,
|
||||
order=1,
|
||||
doc='Special target, for now only \'dns\' supported')
|
||||
|
||||
expire = qubes.property('expire',
|
||||
type=Expire,
|
||||
default=None,
|
||||
doc='Timestamp (UNIX epoch) on which this rule expire')
|
||||
|
||||
comment = qubes.property('comment',
|
||||
type=Comment,
|
||||
default=None,
|
||||
doc='User comment')
|
||||
|
||||
# noinspection PyUnusedLocal
|
||||
@qubes.events.handler('property-pre-set:dstports')
|
||||
def on_set_dstports(self, _event, _prop, _new_value, _old_value=None):
|
||||
if self.proto not in ('tcp', 'udp'):
|
||||
raise ValueError(
|
||||
'dstports valid only for \'tcp\' and \'udp\' protocols')
|
||||
|
||||
# noinspection PyUnusedLocal
|
||||
@qubes.events.handler('property-pre-set:icmptype')
|
||||
def on_set_icmptype(self, _event, _prop, _new_value, _old_value=None):
|
||||
if self.proto not in ('icmp',):
|
||||
raise ValueError('icmptype valid only for \'icmp\' protocol')
|
||||
|
||||
# noinspection PyUnusedLocal
|
||||
@qubes.events.handler('property-set:proto')
|
||||
def on_set_proto(self, _event, _prop, new_value, _old_value=None):
|
||||
if new_value not in ('tcp', 'udp'):
|
||||
self.dstports = qubes.property.DEFAULT
|
||||
if new_value not in ('icmp',):
|
||||
self.icmptype = qubes.property.DEFAULT
|
||||
|
||||
@qubes.events.handler('property-del:proto')
|
||||
def on_del_proto(self, _event, _prop, _old_value):
|
||||
self.dstports = qubes.property.DEFAULT
|
||||
self.icmptype = qubes.property.DEFAULT
|
||||
|
||||
@property
|
||||
def rule(self):
|
||||
if self.expire and self.expire.expired:
|
||||
return None
|
||||
values = []
|
||||
for prop in self.property_list():
|
||||
value = getattr(self, prop.__name__)
|
||||
if value is None:
|
||||
continue
|
||||
if value.rule is None:
|
||||
continue
|
||||
values.append(value.rule)
|
||||
return ' '.join(values)
|
||||
|
||||
@classmethod
|
||||
def from_xml_v1(cls, node, action):
|
||||
netmask = node.get('netmask')
|
||||
if netmask is None:
|
||||
netmask = 32
|
||||
else:
|
||||
netmask = int(netmask)
|
||||
address = node.get('address')
|
||||
if address:
|
||||
dsthost = DstHost(address, netmask)
|
||||
else:
|
||||
dsthost = None
|
||||
|
||||
proto = node.get('proto')
|
||||
|
||||
port = node.get('port')
|
||||
toport = node.get('toport')
|
||||
if port and toport:
|
||||
dstports = port + '-' + toport
|
||||
elif port:
|
||||
dstports = port
|
||||
else:
|
||||
dstports = None
|
||||
|
||||
# backward compatibility: protocol defaults to TCP if port is specified
|
||||
if dstports and not proto:
|
||||
proto = 'tcp'
|
||||
|
||||
if proto == 'any':
|
||||
proto = None
|
||||
|
||||
expire = node.get('expire')
|
||||
|
||||
kwargs = {
|
||||
'action': action,
|
||||
}
|
||||
if dsthost:
|
||||
kwargs['dsthost'] = dsthost
|
||||
if dstports:
|
||||
kwargs['dstports'] = dstports
|
||||
if proto:
|
||||
kwargs['proto'] = proto
|
||||
if expire:
|
||||
kwargs['expire'] = expire
|
||||
|
||||
return cls(None, **kwargs)
|
||||
|
||||
def __eq__(self, other):
|
||||
return self.rule == other.rule
|
||||
|
||||
class Firewall(object):
|
||||
def __init__(self, vm, load=True):
|
||||
assert hasattr(vm, 'firewall_conf')
|
||||
self.vm = vm
|
||||
#: firewall rules
|
||||
self.rules = []
|
||||
#: default action
|
||||
self.policy = None
|
||||
|
||||
if load:
|
||||
self.load()
|
||||
|
||||
def load_defaults(self):
|
||||
self.rules = []
|
||||
self.policy = Action('accept')
|
||||
|
||||
def load(self):
|
||||
firewall_conf = os.path.join(self.vm.dir_path, self.vm.firewall_conf)
|
||||
if os.path.exists(firewall_conf):
|
||||
self.rules = []
|
||||
tree = lxml.etree.parse(firewall_conf)
|
||||
root = tree.getroot()
|
||||
|
||||
version = root.get('version', '1')
|
||||
if version == '1':
|
||||
self.load_v1(root)
|
||||
elif version == '2':
|
||||
self.load_v2(root)
|
||||
else:
|
||||
raise qubes.exc.QubesVMError(self.vm,
|
||||
'Unsupported firewall.xml version: {}'.format(version))
|
||||
else:
|
||||
self.load_defaults()
|
||||
|
||||
def load_v1(self, xml_root):
|
||||
policy_v1 = xml_root.get('policy')
|
||||
assert policy_v1 in ('allow', 'deny')
|
||||
if policy_v1 == 'allow':
|
||||
self.policy = Action('accept')
|
||||
else:
|
||||
self.policy = Action('drop')
|
||||
|
||||
def _translate_action(key):
|
||||
if xml_root.get(key, policy_v1) == 'allow':
|
||||
return Action.accept
|
||||
else:
|
||||
return Action.drop
|
||||
|
||||
self.rules.append(Rule(None,
|
||||
action=_translate_action('dns'),
|
||||
specialtarget=SpecialTarget('dns')))
|
||||
|
||||
self.rules.append(Rule(None,
|
||||
action=_translate_action('icmp'),
|
||||
proto=Proto.icmp))
|
||||
|
||||
if self.policy == Action.accept:
|
||||
rule_action = Action.drop
|
||||
else:
|
||||
rule_action = Action.accept
|
||||
|
||||
for element in xml_root:
|
||||
rule = Rule.from_xml_v1(element, rule_action)
|
||||
self.rules.append(rule)
|
||||
|
||||
def load_v2(self, xml_root):
|
||||
self.policy = Action(xml_root.findtext('policy'))
|
||||
|
||||
xml_rules = xml_root.find('rules')
|
||||
for xml_rule in xml_rules:
|
||||
rule = Rule(xml_rule)
|
||||
self.rules.append(rule)
|
||||
|
||||
def save(self):
|
||||
firewall_conf = os.path.join(self.vm.dir_path, self.vm.firewall_conf)
|
||||
expiring_rules_present = False
|
||||
|
||||
xml_root = lxml.etree.Element('firewall', version=str(2))
|
||||
|
||||
xml_policy = lxml.etree.Element('policy')
|
||||
xml_policy.text = str(self.policy)
|
||||
xml_root.append(xml_policy)
|
||||
|
||||
xml_rules = lxml.etree.Element('rules')
|
||||
for rule in self.rules:
|
||||
if rule.expire:
|
||||
if rule.expire and rule.expire.expired:
|
||||
continue
|
||||
else:
|
||||
expiring_rules_present = True
|
||||
xml_rule = lxml.etree.Element('rule')
|
||||
xml_rule.append(rule.xml_properties())
|
||||
xml_rules.append(xml_rule)
|
||||
|
||||
xml_root.append(xml_rules)
|
||||
|
||||
xml_tree = lxml.etree.ElementTree(xml_root)
|
||||
|
||||
try:
|
||||
old_umask = os.umask(0o002)
|
||||
with open(firewall_conf, 'w') as firewall_xml:
|
||||
xml_tree.write(firewall_xml, encoding="UTF-8",
|
||||
pretty_print=True)
|
||||
os.umask(old_umask)
|
||||
except EnvironmentError as err:
|
||||
self.vm.log.error("save error: {}".format(err))
|
||||
raise qubes.exc.QubesException('save error: {}'.format(err))
|
||||
|
||||
if expiring_rules_present and not self.vm.app.vmm.offline_mode:
|
||||
subprocess.call(["sudo", "systemctl", "start",
|
||||
"qubes-reload-firewall@%s.timer" % self.vm.name])
|
@ -966,6 +966,7 @@ def load_tests(loader, tests, pattern): # pylint: disable=unused-argument
|
||||
# unit tests
|
||||
'qubes.tests.events',
|
||||
'qubes.tests.devices',
|
||||
'qubes.tests.firewall',
|
||||
'qubes.tests.init',
|
||||
'qubes.tests.vm.init',
|
||||
'qubes.tests.storage',
|
||||
|
534
qubes/tests/firewall.py
Normal file
534
qubes/tests/firewall.py
Normal file
@ -0,0 +1,534 @@
|
||||
#!/usr/bin/python2 -O
|
||||
# vim: fileencoding=utf-8
|
||||
|
||||
#
|
||||
# The Qubes OS Project, https://www.qubes-os.org/
|
||||
#
|
||||
# Copyright (C) 2016
|
||||
# Marek Marczykowski-Górecki <marmarek@invisiblethingslab.com>
|
||||
#
|
||||
# This program is free software; you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation; either version 2 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License along
|
||||
# with this program; if not, write to the Free Software Foundation, Inc.,
|
||||
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
|
||||
#
|
||||
|
||||
import datetime
|
||||
import os
|
||||
|
||||
import lxml.etree
|
||||
import unittest
|
||||
|
||||
import qubes.firewall
|
||||
import qubes.tests
|
||||
|
||||
|
||||
class TestOption(qubes.firewall.RuleChoice):
|
||||
opt1 = 'opt1'
|
||||
opt2 = 'opt2'
|
||||
another = 'another'
|
||||
|
||||
class TestVMM(object):
|
||||
def __init__(self):
|
||||
self.offline_mode = True
|
||||
|
||||
|
||||
class TestApp(object):
|
||||
def __init__(self):
|
||||
self.vmm = TestVMM()
|
||||
|
||||
|
||||
class TestVM(object):
|
||||
def __init__(self):
|
||||
self.firewall_conf = 'test-firewall.xml'
|
||||
self.dir_path = '/tmp'
|
||||
self.app = TestApp()
|
||||
|
||||
|
||||
# noinspection PyPep8Naming
|
||||
class TC_00_RuleChoice(qubes.tests.QubesTestCase):
|
||||
def test_000_accept_allowed(self):
|
||||
with self.assertNotRaises(ValueError):
|
||||
TestOption('opt1')
|
||||
TestOption('opt2')
|
||||
TestOption('another')
|
||||
|
||||
def test_001_value_list(self):
|
||||
instance = TestOption('opt1')
|
||||
self.assertEqual(
|
||||
set(instance.allowed_values), {'opt1', 'opt2', 'another'})
|
||||
|
||||
def test_010_reject_others(self):
|
||||
self.assertRaises(ValueError, lambda: TestOption('invalid'))
|
||||
|
||||
|
||||
class TC_01_Action(qubes.tests.QubesTestCase):
|
||||
def test_000_allowed_values(self):
|
||||
with self.assertNotRaises(ValueError):
|
||||
instance = qubes.firewall.Action('accept')
|
||||
self.assertEqual(
|
||||
set(instance.allowed_values), {'accept', 'drop'})
|
||||
|
||||
def test_001_rule(self):
|
||||
instance = qubes.firewall.Action('accept')
|
||||
self.assertEqual(instance.rule, 'action=accept')
|
||||
|
||||
|
||||
# noinspection PyPep8Naming
|
||||
class TC_02_Proto(qubes.tests.QubesTestCase):
|
||||
def test_000_allowed_values(self):
|
||||
with self.assertNotRaises(ValueError):
|
||||
instance = qubes.firewall.Proto('tcp')
|
||||
self.assertEqual(
|
||||
set(instance.allowed_values), {'tcp', 'udp', 'icmp'})
|
||||
|
||||
def test_001_rule(self):
|
||||
instance = qubes.firewall.Proto('tcp')
|
||||
self.assertEqual(instance.rule, 'proto=tcp')
|
||||
|
||||
|
||||
# noinspection PyPep8Naming
|
||||
class TC_02_DstHost(qubes.tests.QubesTestCase):
|
||||
def test_000_hostname(self):
|
||||
with self.assertNotRaises(ValueError):
|
||||
instance = qubes.firewall.DstHost('qubes-os.org')
|
||||
self.assertEqual(instance.type, 'dsthost')
|
||||
|
||||
def test_001_ipv4(self):
|
||||
with self.assertNotRaises(ValueError):
|
||||
instance = qubes.firewall.DstHost('127.0.0.1')
|
||||
self.assertEqual(instance.type, 'dst4')
|
||||
self.assertEqual(instance.prefixlen, 32)
|
||||
self.assertEqual(str(instance), '127.0.0.1/32')
|
||||
self.assertEqual(instance.rule, 'dst4=127.0.0.1/32')
|
||||
|
||||
def test_002_ipv4_prefixlen(self):
|
||||
with self.assertNotRaises(ValueError):
|
||||
instance = qubes.firewall.DstHost('127.0.0.0', 8)
|
||||
self.assertEqual(instance.type, 'dst4')
|
||||
self.assertEqual(instance.prefixlen, 8)
|
||||
self.assertEqual(str(instance), '127.0.0.0/8')
|
||||
self.assertEqual(instance.rule, 'dst4=127.0.0.0/8')
|
||||
|
||||
def test_003_ipv4_parse_prefixlen(self):
|
||||
with self.assertNotRaises(ValueError):
|
||||
instance = qubes.firewall.DstHost('127.0.0.0/8')
|
||||
self.assertEqual(instance.type, 'dst4')
|
||||
self.assertEqual(instance.prefixlen, 8)
|
||||
self.assertEqual(str(instance), '127.0.0.0/8')
|
||||
self.assertEqual(instance.rule, 'dst4=127.0.0.0/8')
|
||||
|
||||
def test_004_ipv4_invalid_prefix(self):
|
||||
with self.assertRaises(ValueError):
|
||||
qubes.firewall.DstHost('127.0.0.0/33')
|
||||
with self.assertRaises(ValueError):
|
||||
qubes.firewall.DstHost('127.0.0.0', 33)
|
||||
with self.assertRaises(ValueError):
|
||||
qubes.firewall.DstHost('127.0.0.0/-1')
|
||||
|
||||
def test_005_ipv4_reject_shortened(self):
|
||||
# not strictly required, but ppl are used to it
|
||||
with self.assertRaises(ValueError):
|
||||
qubes.firewall.DstHost('127/8')
|
||||
|
||||
def test_006_ipv4_invalid_addr(self):
|
||||
with self.assertRaises(ValueError):
|
||||
qubes.firewall.DstHost('137.327.0.0/16')
|
||||
with self.assertRaises(ValueError):
|
||||
qubes.firewall.DstHost('1.2.3.4.5/32')
|
||||
|
||||
@unittest.expectedFailure
|
||||
def test_007_ipv4_invalid_network(self):
|
||||
with self.assertRaises(ValueError):
|
||||
qubes.firewall.DstHost('127.0.0.1/32')
|
||||
|
||||
def test_010_ipv6(self):
|
||||
with self.assertNotRaises(ValueError):
|
||||
instance = qubes.firewall.DstHost('2001:abcd:efab::3')
|
||||
self.assertEqual(instance.type, 'dst6')
|
||||
self.assertEqual(instance.prefixlen, 128)
|
||||
self.assertEqual(str(instance), '2001:abcd:efab::3/128')
|
||||
self.assertEqual(instance.rule, 'dst6=2001:abcd:efab::3/128')
|
||||
|
||||
def test_011_ipv6_prefixlen(self):
|
||||
with self.assertNotRaises(ValueError):
|
||||
instance = qubes.firewall.DstHost('2001:abcd:efab::', 64)
|
||||
self.assertEqual(instance.type, 'dst6')
|
||||
self.assertEqual(instance.prefixlen, 64)
|
||||
self.assertEqual(str(instance), '2001:abcd:efab::/64')
|
||||
self.assertEqual(instance.rule, 'dst6=2001:abcd:efab::/64')
|
||||
|
||||
def test_012_ipv6_parse_prefixlen(self):
|
||||
with self.assertNotRaises(ValueError):
|
||||
instance = qubes.firewall.DstHost('2001:abcd:efab::/64')
|
||||
self.assertEqual(instance.type, 'dst6')
|
||||
self.assertEqual(instance.prefixlen, 64)
|
||||
self.assertEqual(str(instance), '2001:abcd:efab::/64')
|
||||
self.assertEqual(instance.rule, 'dst6=2001:abcd:efab::/64')
|
||||
|
||||
def test_013_ipv6_invalid_prefix(self):
|
||||
with self.assertRaises(ValueError):
|
||||
qubes.firewall.DstHost('2001:abcd:efab::3/129')
|
||||
with self.assertRaises(ValueError):
|
||||
qubes.firewall.DstHost('2001:abcd:efab::3', 129)
|
||||
with self.assertRaises(ValueError):
|
||||
qubes.firewall.DstHost('2001:abcd:efab::3/-1')
|
||||
|
||||
def test_014_ipv6_invalid_addr(self):
|
||||
with self.assertRaises(ValueError):
|
||||
qubes.firewall.DstHost('2001:abcd:efab0123::3/128')
|
||||
with self.assertRaises(ValueError):
|
||||
qubes.firewall.DstHost('2001:abcd:efab:3/128')
|
||||
with self.assertRaises(ValueError):
|
||||
qubes.firewall.DstHost('2001:abcd:efab:a:a:a:a:a:a:3/128')
|
||||
with self.assertRaises(ValueError):
|
||||
qubes.firewall.DstHost('2001:abcd:efgh::3/128')
|
||||
|
||||
@unittest.expectedFailure
|
||||
def test_015_ipv6_invalid_network(self):
|
||||
with self.assertRaises(ValueError):
|
||||
qubes.firewall.DstHost('2001:abcd:efab::3/64')
|
||||
|
||||
@unittest.expectedFailure
|
||||
def test_020_invalid_hostname(self):
|
||||
with self.assertRaises(ValueError):
|
||||
qubes.firewall.DstHost('www qubes-os.org')
|
||||
with self.assertRaises(ValueError):
|
||||
qubes.firewall.DstHost('https://qubes-os.org')
|
||||
|
||||
class TC_03_DstPorts(qubes.tests.QubesTestCase):
|
||||
def test_000_single_str(self):
|
||||
with self.assertNotRaises(ValueError):
|
||||
instance = qubes.firewall.DstPorts('80')
|
||||
self.assertEqual(str(instance), '80')
|
||||
self.assertEqual(instance.range, [80, 80])
|
||||
self.assertEqual(instance.rule, 'dstports=80-80')
|
||||
|
||||
def test_001_single_int(self):
|
||||
with self.assertNotRaises(ValueError):
|
||||
instance = qubes.firewall.DstPorts(80)
|
||||
self.assertEqual(str(instance), '80')
|
||||
self.assertEqual(instance.range, [80, 80])
|
||||
self.assertEqual(instance.rule, 'dstports=80-80')
|
||||
|
||||
def test_002_range(self):
|
||||
with self.assertNotRaises(ValueError):
|
||||
instance = qubes.firewall.DstPorts('80-90')
|
||||
self.assertEqual(str(instance), '80-90')
|
||||
self.assertEqual(instance.range, [80, 90])
|
||||
self.assertEqual(instance.rule, 'dstports=80-90')
|
||||
|
||||
def test_003_invalid(self):
|
||||
with self.assertRaises(ValueError):
|
||||
qubes.firewall.DstPorts('80-90-100')
|
||||
with self.assertRaises(ValueError):
|
||||
qubes.firewall.DstPorts('abcdef')
|
||||
with self.assertRaises(ValueError):
|
||||
qubes.firewall.DstPorts('80 90')
|
||||
with self.assertRaises(ValueError):
|
||||
qubes.firewall.DstPorts('')
|
||||
|
||||
def test_004_reversed_range(self):
|
||||
with self.assertRaises(ValueError):
|
||||
qubes.firewall.DstPorts('100-20')
|
||||
|
||||
def test_005_out_of_range(self):
|
||||
with self.assertRaises(ValueError):
|
||||
qubes.firewall.DstPorts('1000000000000')
|
||||
with self.assertRaises(ValueError):
|
||||
qubes.firewall.DstPorts(1000000000000)
|
||||
with self.assertRaises(ValueError):
|
||||
qubes.firewall.DstPorts('1-1000000000000')
|
||||
|
||||
|
||||
class TC_04_IcmpType(qubes.tests.QubesTestCase):
|
||||
def test_000_number(self):
|
||||
with self.assertNotRaises(ValueError):
|
||||
instance = qubes.firewall.IcmpType(8)
|
||||
self.assertEqual(str(instance), '8')
|
||||
self.assertEqual(instance.rule, 'icmptype=8')
|
||||
|
||||
def test_001_str(self):
|
||||
with self.assertNotRaises(ValueError):
|
||||
instance = qubes.firewall.IcmpType('8')
|
||||
self.assertEqual(str(instance), '8')
|
||||
self.assertEqual(instance.rule, 'icmptype=8')
|
||||
|
||||
def test_002_invalid(self):
|
||||
with self.assertRaises(ValueError):
|
||||
qubes.firewall.IcmpType(600)
|
||||
with self.assertRaises(ValueError):
|
||||
qubes.firewall.IcmpType(-1)
|
||||
with self.assertRaises(ValueError):
|
||||
qubes.firewall.IcmpType('abcde')
|
||||
with self.assertRaises(ValueError):
|
||||
qubes.firewall.IcmpType('')
|
||||
|
||||
|
||||
class TC_05_SpecialTarget(qubes.tests.QubesTestCase):
|
||||
def test_000_allowed_values(self):
|
||||
with self.assertNotRaises(ValueError):
|
||||
instance = qubes.firewall.SpecialTarget('dns')
|
||||
self.assertEqual(
|
||||
set(instance.allowed_values), {'dns'})
|
||||
|
||||
def test_001_rule(self):
|
||||
instance = qubes.firewall.SpecialTarget('dns')
|
||||
self.assertEqual(instance.rule, 'specialtarget=dns')
|
||||
|
||||
|
||||
class TC_06_Expire(qubes.tests.QubesTestCase):
|
||||
def test_000_number(self):
|
||||
with self.assertNotRaises(ValueError):
|
||||
instance = qubes.firewall.Expire(1463292452)
|
||||
self.assertEqual(str(instance), '1463292452')
|
||||
self.assertEqual(instance.datetime,
|
||||
datetime.datetime(2016, 5, 15, 6, 7, 32))
|
||||
self.assertIsNone(instance.rule)
|
||||
|
||||
def test_001_str(self):
|
||||
with self.assertNotRaises(ValueError):
|
||||
instance = qubes.firewall.Expire('1463292452')
|
||||
self.assertEqual(str(instance), '1463292452')
|
||||
self.assertEqual(instance.datetime,
|
||||
datetime.datetime(2016, 5, 15, 6, 7, 32))
|
||||
self.assertIsNone(instance.rule)
|
||||
|
||||
def test_002_invalid(self):
|
||||
with self.assertRaises(ValueError):
|
||||
qubes.firewall.Expire('abcdef')
|
||||
with self.assertRaises(ValueError):
|
||||
qubes.firewall.Expire('')
|
||||
|
||||
def test_003_expired(self):
|
||||
with self.assertNotRaises(ValueError):
|
||||
instance = qubes.firewall.Expire('1463292452')
|
||||
self.assertTrue(instance.expired)
|
||||
with self.assertNotRaises(ValueError):
|
||||
instance = qubes.firewall.Expire('1583292452')
|
||||
self.assertFalse(instance.expired)
|
||||
|
||||
|
||||
class TC_07_Comment(qubes.tests.QubesTestCase):
|
||||
def test_000_str(self):
|
||||
with self.assertNotRaises(ValueError):
|
||||
instance = qubes.firewall.Comment('Some comment')
|
||||
self.assertEqual(str(instance), 'Some comment')
|
||||
self.assertIsNone(instance.rule)
|
||||
|
||||
|
||||
class TC_08_Rule(qubes.tests.QubesTestCase):
|
||||
def test_000_simple(self):
|
||||
with self.assertNotRaises(ValueError):
|
||||
rule = qubes.firewall.Rule(None, action='accept', proto='icmp')
|
||||
self.assertEqual(rule.rule, 'action=accept proto=icmp')
|
||||
self.assertIsNone(rule.dsthost)
|
||||
self.assertIsNone(rule.dstports)
|
||||
self.assertIsNone(rule.icmptype)
|
||||
self.assertIsNone(rule.comment)
|
||||
self.assertIsNone(rule.expire)
|
||||
self.assertEqual(str(rule.action), 'accept')
|
||||
self.assertEqual(str(rule.proto), 'icmp')
|
||||
|
||||
def test_001_expire(self):
|
||||
with self.assertNotRaises(ValueError):
|
||||
rule = qubes.firewall.Rule(None, action='accept', proto='icmp',
|
||||
expire='1463292452')
|
||||
self.assertIsNone(rule.rule)
|
||||
|
||||
with self.assertNotRaises(ValueError):
|
||||
rule = qubes.firewall.Rule(None, action='accept', proto='icmp',
|
||||
expire='1663292452')
|
||||
self.assertIsNotNone(rule.rule)
|
||||
|
||||
|
||||
def test_002_dstports(self):
|
||||
with self.assertNotRaises(ValueError):
|
||||
rule = qubes.firewall.Rule(None, action='accept', proto='tcp',
|
||||
dstports=80)
|
||||
self.assertEqual(str(rule.dstports), '80')
|
||||
with self.assertNotRaises(ValueError):
|
||||
rule = qubes.firewall.Rule(None, action='accept', proto='udp',
|
||||
dstports=80)
|
||||
self.assertEqual(str(rule.dstports), '80')
|
||||
|
||||
def test_003_reject_invalid(self):
|
||||
with self.assertRaises((ValueError, AssertionError)):
|
||||
# missing action
|
||||
qubes.firewall.Rule(None, proto='icmp')
|
||||
with self.assertRaises(ValueError):
|
||||
# not proto=tcp or proto=udp for dstports
|
||||
qubes.firewall.Rule(None, action='accept', proto='icmp',
|
||||
dstports=80)
|
||||
with self.assertRaises(ValueError):
|
||||
# not proto=tcp or proto=udp for dstports
|
||||
qubes.firewall.Rule(None, action='accept', dstports=80)
|
||||
with self.assertRaises(ValueError):
|
||||
# not proto=icmp for icmptype
|
||||
qubes.firewall.Rule(None, action='accept', proto='tcp',
|
||||
icmptype=8)
|
||||
with self.assertRaises(ValueError):
|
||||
# not proto=icmp for icmptype
|
||||
qubes.firewall.Rule(None, action='accept', icmptype=8)
|
||||
|
||||
def test_004_proto_change(self):
|
||||
rule = qubes.firewall.Rule(None, action='accept', proto='tcp')
|
||||
with self.assertNotRaises(ValueError):
|
||||
rule.proto = 'udp'
|
||||
self.assertEqual(rule.rule, 'action=accept proto=udp')
|
||||
rule = qubes.firewall.Rule(None, action='accept', proto='tcp',
|
||||
dstports=80)
|
||||
with self.assertNotRaises(ValueError):
|
||||
rule.proto = 'udp'
|
||||
self.assertEqual(rule.rule, 'action=accept proto=udp dstports=80-80')
|
||||
rule = qubes.firewall.Rule(None, action='accept')
|
||||
with self.assertNotRaises(ValueError):
|
||||
rule.proto = 'udp'
|
||||
self.assertEqual(rule.rule, 'action=accept proto=udp')
|
||||
with self.assertNotRaises(ValueError):
|
||||
rule.dstports = 80
|
||||
self.assertEqual(rule.rule, 'action=accept proto=udp dstports=80-80')
|
||||
with self.assertNotRaises(ValueError):
|
||||
rule.proto = 'icmp'
|
||||
self.assertEqual(rule.rule, 'action=accept proto=icmp')
|
||||
self.assertIsNone(rule.dstports)
|
||||
rule.icmptype = 8
|
||||
self.assertEqual(rule.rule, 'action=accept proto=icmp icmptype=8')
|
||||
with self.assertNotRaises(ValueError):
|
||||
rule.proto = qubes.property.DEFAULT
|
||||
self.assertEqual(rule.rule, 'action=accept')
|
||||
self.assertIsNone(rule.dstports)
|
||||
|
||||
def test_005_from_xml_v1(self):
|
||||
xml_txt = \
|
||||
'<rule address="192.168.0.0" proto="tcp" netmask="24" port="443"/>'
|
||||
with self.assertNotRaises(ValueError):
|
||||
rule = qubes.firewall.Rule.from_xml_v1(
|
||||
lxml.etree.fromstring(xml_txt), 'accept')
|
||||
self.assertEqual(rule.dsthost, '192.168.0.0/24')
|
||||
self.assertEqual(rule.proto, 'tcp')
|
||||
self.assertEqual(rule.dstports, '443')
|
||||
self.assertIsNone(rule.expire)
|
||||
self.assertIsNone(rule.comment)
|
||||
|
||||
def test_006_from_xml_v1(self):
|
||||
xml_txt = \
|
||||
'<rule address="qubes-os.org" proto="tcp" ' \
|
||||
'port="443" toport="1024"/>'
|
||||
with self.assertNotRaises(ValueError):
|
||||
rule = qubes.firewall.Rule.from_xml_v1(
|
||||
lxml.etree.fromstring(xml_txt), 'drop')
|
||||
self.assertEqual(rule.dsthost, 'qubes-os.org')
|
||||
self.assertEqual(rule.proto, 'tcp')
|
||||
self.assertEqual(rule.dstports, '443-1024')
|
||||
self.assertEqual(rule.action, 'drop')
|
||||
self.assertIsNone(rule.expire)
|
||||
self.assertIsNone(rule.comment)
|
||||
|
||||
def test_007_from_xml_v1(self):
|
||||
xml_txt = \
|
||||
'<rule address="192.168.0.0" netmask="24" expire="1463292452"/>'
|
||||
with self.assertNotRaises(ValueError):
|
||||
rule = qubes.firewall.Rule.from_xml_v1(
|
||||
lxml.etree.fromstring(xml_txt), 'accept')
|
||||
self.assertEqual(rule.dsthost, '192.168.0.0/24')
|
||||
self.assertEqual(rule.expire, '1463292452')
|
||||
self.assertEqual(rule.action, 'accept')
|
||||
self.assertIsNone(rule.proto)
|
||||
self.assertIsNone(rule.dstports)
|
||||
|
||||
|
||||
class TC_10_Firewall(qubes.tests.QubesTestCase):
|
||||
def setUp(self):
|
||||
super(TC_10_Firewall, self).setUp()
|
||||
self.vm = TestVM()
|
||||
firewall_path = os.path.join('/tmp', self.vm.firewall_conf)
|
||||
if os.path.exists(firewall_path):
|
||||
os.unlink(firewall_path)
|
||||
|
||||
def tearDown(self):
|
||||
firewall_path = os.path.join('/tmp', self.vm.firewall_conf)
|
||||
if os.path.exists(firewall_path):
|
||||
os.unlink(firewall_path)
|
||||
return super(TC_10_Firewall, self).tearDown()
|
||||
|
||||
def test_000_defaults(self):
|
||||
fw = qubes.firewall.Firewall(self.vm, False)
|
||||
fw.load_defaults()
|
||||
self.assertEqual(fw.policy, 'accept')
|
||||
self.assertEqual(fw.rules, [])
|
||||
|
||||
def test_001_save_load_empty(self):
|
||||
fw = qubes.firewall.Firewall(self.vm, True)
|
||||
self.assertEqual(fw.policy, 'accept')
|
||||
self.assertEqual(fw.rules, [])
|
||||
fw.save()
|
||||
fw.load()
|
||||
self.assertEqual(fw.policy, 'accept')
|
||||
self.assertEqual(fw.rules, [])
|
||||
|
||||
def test_002_save_load_rules(self):
|
||||
fw = qubes.firewall.Firewall(self.vm, True)
|
||||
rules = [
|
||||
qubes.firewall.Rule(None, action='drop', proto='icmp'),
|
||||
qubes.firewall.Rule(None, action='drop', proto='tcp', dstports=80),
|
||||
qubes.firewall.Rule(None, action='accept', proto='udp',
|
||||
dstports=67),
|
||||
qubes.firewall.Rule(None, action='accept', specialtarget='dns'),
|
||||
]
|
||||
fw.rules.extend(rules)
|
||||
fw.policy = qubes.firewall.Action.drop
|
||||
fw.save()
|
||||
self.assertTrue(os.path.exists(os.path.join(
|
||||
self.vm.dir_path, self.vm.firewall_conf)))
|
||||
fw = qubes.firewall.Firewall(TestVM(), True)
|
||||
self.assertEqual(fw.policy, qubes.firewall.Action.drop)
|
||||
self.assertEqual(fw.rules, rules)
|
||||
|
||||
def test_003_load_v1(self):
|
||||
xml_txt = """<QubesFirewallRules dns="allow" icmp="allow"
|
||||
policy="deny" yumProxy="allow">
|
||||
<rule address="192.168.0.0" proto="tcp" netmask="24" port="80"/>
|
||||
<rule address="qubes-os.org" proto="tcp" port="443"/>
|
||||
</QubesFirewallRules>
|
||||
"""
|
||||
with open(os.path.join('/tmp', self.vm.firewall_conf), 'w') as f:
|
||||
f.write(xml_txt)
|
||||
with self.assertNotRaises(ValueError):
|
||||
fw = qubes.firewall.Firewall(self.vm)
|
||||
self.assertEqual(str(fw.policy), 'drop')
|
||||
rules = [
|
||||
qubes.firewall.Rule(None, action='accept', specialtarget='dns'),
|
||||
qubes.firewall.Rule(None, action='accept', proto='icmp'),
|
||||
qubes.firewall.Rule(None, action='accept', proto='tcp',
|
||||
dsthost='192.168.0.0/24', dstports='80'),
|
||||
qubes.firewall.Rule(None, action='accept', proto='tcp',
|
||||
dsthost='qubes-os.org', dstports='443')
|
||||
]
|
||||
self.assertEqual(fw.rules, rules)
|
||||
|
||||
def test_004_save_skip_expired(self):
|
||||
fw = qubes.firewall.Firewall(self.vm, True)
|
||||
rules = [
|
||||
qubes.firewall.Rule(None, action='drop', proto='icmp'),
|
||||
qubes.firewall.Rule(None, action='drop', proto='tcp', dstports=80),
|
||||
qubes.firewall.Rule(None, action='accept', proto='udp',
|
||||
dstports=67, expire=1373300257),
|
||||
qubes.firewall.Rule(None, action='accept', specialtarget='dns'),
|
||||
]
|
||||
fw.rules.extend(rules)
|
||||
fw.policy = qubes.firewall.Action.drop
|
||||
fw.save()
|
||||
rules.pop(2)
|
||||
fw = qubes.firewall.Firewall(self.vm, True)
|
||||
self.assertEqual(fw.rules, rules)
|
||||
|
@ -216,6 +216,7 @@ fi
|
||||
%{python_sitelib}/qubes/devices.py*
|
||||
%{python_sitelib}/qubes/dochelpers.py*
|
||||
%{python_sitelib}/qubes/events.py*
|
||||
%{python_sitelib}/qubes/firewall.py*
|
||||
%{python_sitelib}/qubes/exc.py*
|
||||
%{python_sitelib}/qubes/log.py*
|
||||
%{python_sitelib}/qubes/rngdoc.py*
|
||||
@ -281,6 +282,7 @@ fi
|
||||
%{python_sitelib}/qubes/tests/app.py*
|
||||
%{python_sitelib}/qubes/tests/devices.py*
|
||||
%{python_sitelib}/qubes/tests/events.py*
|
||||
%{python_sitelib}/qubes/tests/firewall.py*
|
||||
%{python_sitelib}/qubes/tests/init.py*
|
||||
%{python_sitelib}/qubes/tests/storage.py*
|
||||
%{python_sitelib}/qubes/tests/storage_file.py*
|
||||
|
Loading…
Reference in New Issue
Block a user