core-admin/qubes/firewall.py
Marek Marczykowski-Górecki 842efb577d
firewall: always use policy 'drop'
There is a problem with having separate default action ("policy") and
rules because it isn't possible to set both of them atomically at the
same time.
To solve this problem, always have policy 'drop' (as a safe default),
but by default have a single rule with action 'accept'.

Fixes QubesOS/qubes-issues#2869
2017-06-26 13:07:19 +02:00

518 lines
16 KiB
Python

# pylint: disable=too-few-public-methods
#
# The Qubes OS Project, https://www.qubes-os.org/
#
# Copyright (C) 2016
# Marek Marczykowski-Górecki <marmarek@invisiblethingslab.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
import datetime
import subprocess
import itertools
import os
import socket
import lxml.etree
import qubes
import qubes.vm.qubesvm
class RuleOption(object):
    '''Base class for a single option (component) of a firewall rule.

    The value is kept in its string form. Subclasses validate it and provide
    the :py:attr:`rule` property.

    :param value: option value; converted with :py:func:`str`
    '''

    def __init__(self, value):
        self._value = str(value)

    @property
    def rule(self):
        '''Serialized form of this option; subclasses must override it.

        :raises NotImplementedError: always, in this base class
        '''
        raise NotImplementedError

    def __str__(self):
        return self._value

    def __eq__(self, other):
        # Compare through the string form, so that an option is equal to
        # a plain string (and to another option) holding the same text.
        return str(self) == other

    def __ne__(self, other):
        # Explicit inverse of __eq__, so both operators always agree.
        return not self == other

    def __hash__(self):
        # Defining __eq__ alone makes the class unhashable on Python 3;
        # hash the string form so equal objects (including equal plain
        # strings) hash identically.
        return hash(str(self))
# noinspection PyAbstractClass
class RuleChoice(RuleOption):
    # pylint: disable=abstract-method
    '''Rule option limited to a fixed set of string values.

    The permitted values are the string-valued attributes found directly in
    the ``__dict__`` of the instantiated class, skipping any whose name or
    value starts with a double underscore.

    :param value: selected value
    :raises ValueError: if *value* is not one of the permitted values
    '''

    def __init__(self, value):
        super(RuleChoice, self).__init__(value)
        choices = []
        for attr_name, attr_value in self.__class__.__dict__.items():
            if attr_name.startswith('__'):
                continue
            if not isinstance(attr_value, str):
                continue
            if attr_value.startswith('__'):
                continue
            choices.append(attr_value)
        self.allowed_values = choices
        if value not in self.allowed_values:
            raise ValueError(value)
class Action(RuleChoice):
    '''Rule action: either ``accept`` or ``drop``.'''
    accept = 'accept'
    drop = 'drop'

    @property
    def rule(self):
        '''This option serialized as ``action=<value>``.'''
        return 'action={!s}'.format(self)
class Proto(RuleChoice):
    '''Protocol selector: one of ``tcp``, ``udp`` or ``icmp``.'''
    tcp = 'tcp'
    udp = 'udp'
    icmp = 'icmp'

    @property
    def rule(self):
        '''This option serialized as ``proto=<value>``.'''
        return 'proto={!s}'.format(self)
class DstHost(RuleOption):
    '''Represent host/network address: either IPv4, IPv6, or DNS name

    After construction, ``type`` is ``'dst4'``, ``'dst6'`` or ``'dsthost'``
    and ``prefixlen`` holds the prefix length (``0`` for DNS names). IP
    addresses are always stored in ``address/prefixlen`` form.

    :param value: ``address``, ``address/prefixlen`` or a host name
    :param prefixlen: prefix length to use when *value* is a bare IP \
        address; ``None`` selects the full host length (32 or 128). \
        Ignored for host names and for values already containing ``/``.
    :raises ValueError: on malformed address or prefix length out of range
    '''

    def __init__(self, value, prefixlen=None):
        # TODO: in python >= 3.3 ipaddress module could be used
        slashes = value.count('/')
        if slashes > 1:
            raise ValueError('Too many /: ' + value)
        if not slashes:
            # bare address or host name: add prefix length to IP addresses
            if self._is_address(socket.AF_INET6, value):
                # "is None" instead of "or": an explicit prefixlen=0 must
                # not be silently replaced with the host prefix length
                self.prefixlen = 128 if prefixlen is None else prefixlen
                if self.prefixlen < 0 or self.prefixlen > 128:
                    raise ValueError(
                        'netmask for IPv6 must be between 0 and 128')
                value += '/' + str(self.prefixlen)
                self.type = 'dst6'
            elif self._is_address(socket.AF_INET, value):
                if value.count('.') != 3:
                    raise ValueError(
                        'Invalid number of dots in IPv4 address')
                self.prefixlen = 32 if prefixlen is None else prefixlen
                if self.prefixlen < 0 or self.prefixlen > 32:
                    raise ValueError(
                        'netmask for IPv4 must be between 0 and 32')
                value += '/' + str(self.prefixlen)
                self.type = 'dst4'
            else:
                # neither IPv6 nor IPv4: treat as a DNS name
                self.type = 'dsthost'
                self.prefixlen = 0
        else:
            host, prefixlen = value.split('/', 1)
            prefixlen = int(prefixlen)
            if prefixlen < 0:
                raise ValueError('netmask must be non-negative')
            self.prefixlen = prefixlen
            if self._is_address(socket.AF_INET6, host):
                if prefixlen > 128:
                    raise ValueError('netmask for IPv6 must be <= 128')
                self.type = 'dst6'
            elif self._is_address(socket.AF_INET, host):
                if prefixlen > 32:
                    raise ValueError('netmask for IPv4 must be <= 32')
                self.type = 'dst4'
                if host.count('.') != 3:
                    raise ValueError(
                        'Invalid number of dots in IPv4 address')
            else:
                # a prefix length makes sense only for IP addresses
                raise ValueError('Invalid IP address: ' + host)

        super(DstHost, self).__init__(value)

    @staticmethod
    def _is_address(family, address):
        '''Check whether *address* parses as an address of *family*.'''
        try:
            socket.inet_pton(family, address)
        except socket.error:
            return False
        return True

    @property
    def rule(self):
        '''This option serialized as ``<type>=<value>``.'''
        return self.type + '=' + str(self)
class DstPorts(RuleOption):
    '''Destination port or inclusive range of ports.

    ``range`` holds ``[first, last]`` as integers; the string form is a
    single number when both ends are equal, ``first-last`` otherwise.

    :param value: port number (``int`` or ``str``) or ``first-last`` string
    :raises ValueError: on malformed value, port outside 0-65535, or \
        a range whose first port is greater than the last one
    '''

    def __init__(self, value):
        if isinstance(value, int):
            value = str(value)
        if value.count('-') == 1:
            self.range = [int(x) for x in value.split('-', 1)]
        elif not value.count('-'):
            self.range = [int(value), int(value)]
        else:
            raise ValueError(value)
        # port numbers are 16-bit, so 65535 is the highest valid one
        if any(port < 0 or port > 65535 for port in self.range):
            raise ValueError('Ports out of range')
        if self.range[0] > self.range[1]:
            raise ValueError('Invalid port range')
        super(DstPorts, self).__init__(
            str(self.range[0]) if self.range[0] == self.range[1]
            else '-'.join(map(str, self.range)))

    @property
    def rule(self):
        '''This option serialized as ``dstports=<first>-<last>``.

        Both ends are always given, even for a single port.
        '''
        return 'dstports=' + '{!s}-{!s}'.format(*self.range)
class IcmpType(RuleOption):
    '''ICMP type option; the value must be an integer in range 0-255.

    :param value: ICMP type, anything accepted by :py:func:`int`
    :raises ValueError: if the value is not an integer or is out of range
    '''

    def __init__(self, value):
        super(IcmpType, self).__init__(value)
        icmp_type = int(value)
        if not 0 <= icmp_type <= 255:
            raise ValueError('ICMP type out of range')

    @property
    def rule(self):
        '''This option serialized as ``icmptype=<value>``.'''
        return 'icmptype={!s}'.format(self)
class SpecialTarget(RuleChoice):
    '''Special (symbolic) rule target; ``dns`` is the only defined value.'''
    dns = 'dns'

    @property
    def rule(self):
        '''This option serialized as ``specialtarget=<value>``.'''
        return 'specialtarget={!s}'.format(self)
class Expire(RuleOption):
    '''Expiration time of a rule, given as a UNIX timestamp.

    The timestamp is converted to a naive UTC :py:class:`datetime.datetime`
    stored in the ``datetime`` attribute.

    :param value: timestamp, anything accepted by :py:func:`int`
    '''

    def __init__(self, value):
        super(Expire, self).__init__(value)
        timestamp = int(value)
        self.datetime = datetime.datetime.utcfromtimestamp(timestamp)

    @property
    def rule(self):
        '''Always ``None``: this option has no serialized rule form.'''
        return None

    @property
    def expired(self):
        '''``True`` if the expiration time is earlier than current UTC time.'''
        now = datetime.datetime.utcnow()
        return self.datetime < now
class Comment(RuleOption):
    '''Comment text attached to a rule.'''

    @property
    def rule(self):
        '''Always ``None``: a comment has no serialized rule form.'''
        return None
class Rule(qubes.PropertyHolder):
    '''Firewall rule made of typed properties (action, proto, dsthost...).

    The serialized, space-separated form is available as :py:attr:`rule`;
    two rules are equal when their serialized forms are equal.
    '''

    def __init__(self, xml=None, **kwargs):
        '''Single firewall rule

        :param xml: XML element describing rule, or None
        :param kwargs: rule elements
        '''
        super(Rule, self).__init__(xml, **kwargs)
        self.load_properties()
        self.events_enabled = True
        # validate dependencies
        # Events are enabled only at this point, so the proto-dependent
        # checks are invoked by hand for values that are already set
        # (presumably the handlers did not run while loading - TODO confirm).
        if self.dstports:
            self.on_set_dstports('property-set:dstports', 'dstports',
                self.dstports, None)
        if self.icmptype:
            self.on_set_icmptype('property-set:icmptype', 'icmptype',
                self.icmptype, None)
        self.property_require('action', False, True)

    action = qubes.property('action',
        type=Action,
        order=0,
        doc='rule action')

    proto = qubes.property('proto',
        type=Proto,
        default=None,
        order=1,
        doc='protocol to match')

    dsthost = qubes.property('dsthost',
        type=DstHost,
        default=None,
        order=1,
        doc='destination host/network')

    dstports = qubes.property('dstports',
        type=DstPorts,
        default=None,
        order=2,
        doc='Destination port(s) (for \'tcp\' and \'udp\' protocol only)')

    icmptype = qubes.property('icmptype',
        type=IcmpType,
        default=None,
        order=2,
        doc='ICMP packet type (for \'icmp\' protocol only)')

    specialtarget = qubes.property('specialtarget',
        type=SpecialTarget,
        default=None,
        order=1,
        doc='Special target, for now only \'dns\' supported')

    expire = qubes.property('expire',
        type=Expire,
        default=None,
        doc='Timestamp (UNIX epoch) on which this rule expire')

    comment = qubes.property('comment',
        type=Comment,
        default=None,
        doc='User comment')

    # noinspection PyUnusedLocal
    @qubes.events.handler('property-pre-set:dstports')
    def on_set_dstports(self, event, name, newvalue, oldvalue=None):
        '''Reject ``dstports`` unless the protocol is ``tcp`` or ``udp``.

        :raises ValueError: if ``proto`` is neither ``tcp`` nor ``udp``
        '''
        # pylint: disable=unused-argument
        if self.proto not in ('tcp', 'udp'):
            raise ValueError(
                'dstports valid only for \'tcp\' and \'udp\' protocols')

    # noinspection PyUnusedLocal
    @qubes.events.handler('property-pre-set:icmptype')
    def on_set_icmptype(self, event, name, newvalue, oldvalue=None):
        '''Reject ``icmptype`` unless the protocol is ``icmp``.

        :raises ValueError: if ``proto`` is not ``icmp``
        '''
        # pylint: disable=unused-argument
        if self.proto not in ('icmp',):
            raise ValueError('icmptype valid only for \'icmp\' protocol')

    # noinspection PyUnusedLocal
    @qubes.events.handler('property-set:proto')
    def on_set_proto(self, event, name, newvalue, oldvalue=None):
        '''Reset options that do not apply to the newly set protocol.'''
        # pylint: disable=unused-argument
        if newvalue not in ('tcp', 'udp'):
            self.dstports = qubes.property.DEFAULT
        if newvalue not in ('icmp',):
            self.icmptype = qubes.property.DEFAULT

    @qubes.events.handler('property-del:proto')
    def on_del_proto(self, event, name, oldvalue):
        '''Reset both protocol-dependent options when proto is removed.'''
        # pylint: disable=unused-argument
        self.dstports = qubes.property.DEFAULT
        self.icmptype = qubes.property.DEFAULT

    @property
    def rule(self):
        '''Space-separated serialized rule, or ``None`` if the rule expired.

        Properties that are ``None``, or whose own ``rule`` is ``None``,
        are left out.
        '''
        if self.expire and self.expire.expired:
            return None
        values = []
        for prop in self.property_list():
            value = getattr(self, prop.__name__)
            if value is None:
                continue
            if value.rule is None:
                continue
            values.append(value.rule)
        return ' '.join(values)

    @classmethod
    def from_xml_v1(cls, node, action):
        '''Build a rule from an XML element of the old (v1) format.

        :param node: element with optional ``address``, ``netmask``, \
            ``proto``, ``port``, ``toport`` and ``expire`` attributes
        :param action: action to assign to the created rule
        :return: new :py:class:`Rule` instance
        '''
        netmask = node.get('netmask')
        # Without an explicit netmask let DstHost choose the prefix length
        # (32 for IPv4, 128 for IPv6) rather than forcing 32 here, which
        # would produce a /32 network for an IPv6 address.
        prefixlen = None if netmask is None else int(netmask)
        address = node.get('address')
        if address:
            dsthost = DstHost(address, prefixlen)
        else:
            dsthost = None

        proto = node.get('proto')

        port = node.get('port')
        toport = node.get('toport')
        if port and toport:
            dstports = port + '-' + toport
        elif port:
            dstports = port
        else:
            dstports = None

        # backward compatibility: protocol defaults to TCP if port is specified
        if dstports and not proto:
            proto = 'tcp'

        if proto == 'any':
            proto = None

        expire = node.get('expire')

        kwargs = {
            'action': action,
        }
        if dsthost:
            kwargs['dsthost'] = dsthost
        if dstports:
            kwargs['dstports'] = dstports
        if proto:
            kwargs['proto'] = proto
        if expire:
            kwargs['expire'] = expire

        return cls(**kwargs)

    def __eq__(self, other):
        # Only rules are comparable; for anything else defer to the other
        # operand instead of failing on a missing ``rule`` attribute.
        if not isinstance(other, Rule):
            return NotImplemented
        return self.rule == other.rule

    def __hash__(self):
        # Keep hashing consistent with __eq__ (both use the serialized
        # form). The value changes when the rule is modified or expires.
        return hash(self.rule)
class Firewall(object):
    '''Firewall rules of a single VM.

    Rules are kept in the ordered list ``rules`` and stored in the XML file
    ``vm.firewall_conf`` located in ``vm.dir_path``.

    :param vm: VM object; must have a ``firewall_conf`` attribute
    :param load: if true, load rules from the file right away
    '''

    def __init__(self, vm, load=True):
        assert hasattr(vm, 'firewall_conf')
        self.vm = vm
        #: firewall rules
        self.rules = []

        if load:
            self.load()

    @property
    def policy(self):
        ''' Default action - always 'drop' '''
        return Action('drop')

    def __eq__(self, other):
        if isinstance(other, Firewall):
            return self.rules == other.rules
        return NotImplemented

    def load_defaults(self):
        '''Load default firewall settings'''
        self.rules = [Rule(None, action='accept')]

    def clone(self, other):
        '''Clone firewall settings from other instance.
        This method discards pre-existing firewall settings.

        :param other: other :py:class:`Firewall` instance
        '''
        rules = []
        for rule in other.rules:
            # Rule.__init__ requires 'action' to be set, so construct the
            # copy with a placeholder; clone_properties() is expected to
            # overwrite it with the action of the source rule
            # (NOTE(review): confirm against PropertyHolder).
            new_rule = Rule(action='drop')
            new_rule.clone_properties(rule)
            rules.append(new_rule)
        self.rules = rules

    def load(self):
        '''Load firewall settings from a file

        If the file does not exist, the default settings are loaded.

        :raises qubes.exc.QubesVMError: on unsupported file format version
        '''
        firewall_conf = os.path.join(self.vm.dir_path, self.vm.firewall_conf)
        if os.path.exists(firewall_conf):
            self.rules = []
            tree = lxml.etree.parse(firewall_conf)
            root = tree.getroot()

            # files without a version attribute are treated as version 1
            version = root.get('version', '1')
            if version == '1':
                self.load_v1(root)
            elif version == '2':
                self.load_v2(root)
            else:
                raise qubes.exc.QubesVMError(self.vm,
                    'Unsupported firewall.xml version: {}'.format(version))
        else:
            self.load_defaults()

    def load_v1(self, xml_root):
        '''Load old (Qubes < 4.0) firewall XML format

        The default policy of the old format is expressed with rules: the
        listed rules get the action opposite to the policy and, for policy
        ``allow``, a final accept-all rule is appended.

        :raises qubes.exc.QubesVMError: if the ``policy`` attribute is \
            neither ``allow`` nor ``deny``
        '''
        policy_v1 = xml_root.get('policy')
        # validate file content explicitly; an assert would be skipped
        # when running with python -O
        if policy_v1 not in ('allow', 'deny'):
            raise qubes.exc.QubesVMError(self.vm,
                'Invalid firewall.xml policy: {!r}'.format(policy_v1))
        default_allow = policy_v1 == 'allow'

        def _translate_action(key):
            # per-service attribute, falling back to the default policy
            if xml_root.get(key, policy_v1) == 'allow':
                return Action.accept
            return Action.drop

        self.rules.append(Rule(None,
            action=_translate_action('dns'),
            specialtarget=SpecialTarget('dns')))

        self.rules.append(Rule(None,
            action=_translate_action('icmp'),
            proto=Proto.icmp))

        # listed rules are exceptions from the default policy
        rule_action = Action.drop if default_allow else Action.accept

        for element in xml_root:
            self.rules.append(Rule.from_xml_v1(element, rule_action))
        if default_allow:
            self.rules.append(Rule(None, action='accept'))

    def load_v2(self, xml_root):
        '''Load new (Qubes >= 4.0) firewall XML format

        :raises qubes.exc.QubesVMError: if the ``rules`` element is missing
        '''
        xml_rules = xml_root.find('rules')
        if xml_rules is None:
            raise qubes.exc.QubesVMError(self.vm,
                'Malformed firewall.xml: missing <rules> element')
        for xml_rule in xml_rules:
            self.rules.append(Rule(xml_rule))

    def save(self):
        '''Save firewall rules to a file

        Expired rules are not written. After a successful write the
        ``firewall-changed`` event is fired on the VM and, when rules with
        an expiration time remain and the VMM is not in offline mode, the
        ``qubes-reload-firewall@<vm name>.timer`` unit is started.

        :raises qubes.exc.QubesException: if the file cannot be written
        '''
        firewall_conf = os.path.join(self.vm.dir_path, self.vm.firewall_conf)
        expiring_rules_present = False

        xml_root = lxml.etree.Element('firewall', version=str(2))

        xml_rules = lxml.etree.Element('rules')
        for rule in self.rules:
            if rule.expire:
                if rule.expire.expired:
                    continue
                expiring_rules_present = True
            xml_rule = lxml.etree.Element('rule')
            xml_rule.append(rule.xml_properties())
            xml_rules.append(xml_rule)

        xml_root.append(xml_rules)

        xml_tree = lxml.etree.ElementTree(xml_root)

        old_umask = os.umask(0o002)
        try:
            with open(firewall_conf, 'wb') as firewall_xml:
                xml_tree.write(firewall_xml, encoding="UTF-8",
                    pretty_print=True)
        except EnvironmentError as err:
            self.vm.log.error("save error: {}".format(err))
            raise qubes.exc.QubesException('save error: {}'.format(err))
        finally:
            # restore the process umask also when writing failed
            os.umask(old_umask)

        self.vm.fire_event('firewall-changed')

        if expiring_rules_present and not self.vm.app.vmm.offline_mode:
            subprocess.call(["sudo", "systemctl", "start",
                "qubes-reload-firewall@%s.timer" % self.vm.name])

    def qdb_entries(self, addr_family=None):
        '''Return firewall settings serialized for QubesDB entries

        Keys are ``policy`` and four-digit rule indexes. Skipped rules keep
        their index, so numbering may have gaps.

        :param addr_family: include rules only for IPv4 (4) or IPv6 (6); if
        None, include both
        :return: dict of entry name -> serialized value
        '''
        entries = {
            'policy': str(self.policy)
        }
        exclude_dsttype = None
        if addr_family is not None:
            exclude_dsttype = 'dst4' if addr_family == 6 else 'dst6'
        for ruleno, rule in zip(itertools.count(), self.rules):
            # an expired rule serializes to None; do not emit such entry
            if rule.expire and rule.expire.expired:
                continue
            # exclude rules for another address family
            if rule.dsthost and rule.dsthost.type == exclude_dsttype:
                continue
            entries['{:04}'.format(ruleno)] = rule.rule
        return entries