449 lines
14 KiB
Python
449 lines
14 KiB
Python
# -*- encoding: utf8 -*-
|
|
# pylint: disable=too-few-public-methods
|
|
#
|
|
# The Qubes OS Project, http://www.qubes-os.org
|
|
#
|
|
# Copyright (C) 2017 Marek Marczykowski-Górecki
|
|
# <marmarek@invisiblethingslab.com>
|
|
#
|
|
# This program is free software; you can redistribute it and/or modify
|
|
# it under the terms of the GNU Lesser General Public License as published by
|
|
# the Free Software Foundation; either version 2.1 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Lesser General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Lesser General Public License along
|
|
# with this program; if not, see <http://www.gnu.org/licenses/>.
|
|
|
|
'''Firewall configuration interface'''
|
|
|
|
import datetime
|
|
import socket
|
|
|
|
class RuleOption(object):
|
|
'''Base class for a single rule element'''
|
|
def __init__(self, value):
|
|
self._value = str(value)
|
|
|
|
@property
|
|
def rule(self):
|
|
'''API representation of this rule element'''
|
|
raise NotImplementedError
|
|
|
|
def __str__(self):
|
|
return self._value
|
|
|
|
def __eq__(self, other):
|
|
return str(self) == other
|
|
|
|
|
|
# noinspection PyAbstractClass
|
|
class RuleChoice(RuleOption):
|
|
'''Base class for multiple-choices rule elements'''
|
|
# pylint: disable=abstract-method
|
|
def __init__(self, value):
|
|
super(RuleChoice, self).__init__(value)
|
|
self.allowed_values = \
|
|
[v for k, v in self.__class__.__dict__.items()
|
|
if not k.startswith('__') and isinstance(v, str) and
|
|
not v.startswith('__')]
|
|
if value not in self.allowed_values:
|
|
raise ValueError(value)
|
|
|
|
|
|
class Action(RuleChoice):
|
|
'''Rule action'''
|
|
accept = 'accept'
|
|
drop = 'drop'
|
|
|
|
@property
|
|
def rule(self):
|
|
'''API representation of this rule element'''
|
|
return 'action=' + str(self)
|
|
|
|
|
|
class Proto(RuleChoice):
|
|
'''Protocol name'''
|
|
tcp = 'tcp'
|
|
udp = 'udp'
|
|
icmp = 'icmp'
|
|
|
|
@property
|
|
def rule(self):
|
|
'''API representation of this rule element'''
|
|
return 'proto=' + str(self)
|
|
|
|
|
|
class DstHost(RuleOption):
|
|
'''Represent host/network address: either IPv4, IPv6, or DNS name'''
|
|
def __init__(self, value, prefixlen=None):
|
|
# TODO: in python >= 3.3 ipaddress module could be used
|
|
if value.count('/') > 1:
|
|
raise ValueError('Too many /: ' + value)
|
|
elif not value.count('/'):
|
|
# add prefix length to bare IP addresses
|
|
try:
|
|
socket.inet_pton(socket.AF_INET6, value)
|
|
self.prefixlen = prefixlen or 128
|
|
if self.prefixlen < 0 or self.prefixlen > 128:
|
|
raise ValueError(
|
|
'netmask for IPv6 must be between 0 and 128')
|
|
value += '/' + str(self.prefixlen)
|
|
self.type = 'dst6'
|
|
except socket.error:
|
|
try:
|
|
socket.inet_pton(socket.AF_INET, value)
|
|
if value.count('.') != 3:
|
|
raise ValueError(
|
|
'Invalid number of dots in IPv4 address')
|
|
self.prefixlen = prefixlen or 32
|
|
if self.prefixlen < 0 or self.prefixlen > 32:
|
|
raise ValueError(
|
|
'netmask for IPv4 must be between 0 and 32')
|
|
value += '/' + str(self.prefixlen)
|
|
self.type = 'dst4'
|
|
except socket.error:
|
|
self.type = 'dsthost'
|
|
self.prefixlen = 0
|
|
else:
|
|
host, prefixlen = value.split('/', 1)
|
|
prefixlen = int(prefixlen)
|
|
if prefixlen < 0:
|
|
raise ValueError('netmask must be non-negative')
|
|
self.prefixlen = prefixlen
|
|
try:
|
|
socket.inet_pton(socket.AF_INET6, host)
|
|
if prefixlen > 128:
|
|
raise ValueError('netmask for IPv6 must be <= 128')
|
|
self.type = 'dst6'
|
|
except socket.error:
|
|
try:
|
|
socket.inet_pton(socket.AF_INET, host)
|
|
if prefixlen > 32:
|
|
raise ValueError('netmask for IPv4 must be <= 32')
|
|
self.type = 'dst4'
|
|
if host.count('.') != 3:
|
|
raise ValueError(
|
|
'Invalid number of dots in IPv4 address')
|
|
except socket.error:
|
|
raise ValueError('Invalid IP address: ' + host)
|
|
|
|
super(DstHost, self).__init__(value)
|
|
|
|
@property
|
|
def rule(self):
|
|
'''API representation of this rule element'''
|
|
return self.type + '=' + str(self)
|
|
|
|
|
|
class DstPorts(RuleOption):
|
|
'''Destination port(s), for TCP/UDP only'''
|
|
def __init__(self, value):
|
|
if isinstance(value, int):
|
|
value = str(value)
|
|
if value.count('-') == 1:
|
|
self.range = [int(x) for x in value.split('-', 1)]
|
|
elif not value.count('-'):
|
|
self.range = [int(value), int(value)]
|
|
else:
|
|
raise ValueError(value)
|
|
if any(port < 0 or port > 65536 for port in self.range):
|
|
raise ValueError('Ports out of range')
|
|
if self.range[0] > self.range[1]:
|
|
raise ValueError('Invalid port range')
|
|
super(DstPorts, self).__init__(
|
|
str(self.range[0]) if self.range[0] == self.range[1]
|
|
else '{!s}-{!s}'.format(*self.range))
|
|
|
|
@property
|
|
def rule(self):
|
|
'''API representation of this rule element'''
|
|
return 'dstports=' + '{!s}-{!s}'.format(*self.range)
|
|
|
|
|
|
class IcmpType(RuleOption):
|
|
'''ICMP packet type'''
|
|
def __init__(self, value):
|
|
super(IcmpType, self).__init__(value)
|
|
value = int(value)
|
|
if value < 0 or value > 255:
|
|
raise ValueError('ICMP type out of range')
|
|
|
|
@property
|
|
def rule(self):
|
|
'''API representation of this rule element'''
|
|
return 'icmptype=' + str(self)
|
|
|
|
|
|
class SpecialTarget(RuleChoice):
|
|
'''Special destination'''
|
|
dns = 'dns'
|
|
|
|
@property
|
|
def rule(self):
|
|
'''API representation of this rule element'''
|
|
return 'specialtarget=' + str(self)
|
|
|
|
|
|
class Expire(RuleOption):
|
|
'''Rule expire time'''
|
|
def __init__(self, value):
|
|
super(Expire, self).__init__(value)
|
|
self.datetime = datetime.datetime.utcfromtimestamp(int(value))
|
|
|
|
@property
|
|
def rule(self):
|
|
'''API representation of this rule element'''
|
|
return 'expire=' + str(self)
|
|
|
|
@property
|
|
def expired(self):
|
|
'''Have this rule expired already?'''
|
|
return self.datetime < datetime.datetime.utcnow()
|
|
|
|
|
|
class Comment(RuleOption):
|
|
'''User comment'''
|
|
@property
|
|
def rule(self):
|
|
'''API representation of this rule element'''
|
|
return 'comment=' + str(self)
|
|
|
|
|
|
class Rule(object):
|
|
'''A single firewall rule'''
|
|
|
|
def __init__(self, rule, **kwargs):
|
|
'''Single firewall rule
|
|
|
|
:param xml: XML element describing rule, or None
|
|
:param kwargs: rule elements
|
|
'''
|
|
self._action = None
|
|
self._proto = None
|
|
self._dsthost = None
|
|
self._dstports = None
|
|
self._icmptype = None
|
|
self._specialtarget = None
|
|
self._expire = None
|
|
self._comment = None
|
|
|
|
rule_dict = {}
|
|
if rule is not None:
|
|
rule_opts, _, comment = rule.partition('comment=')
|
|
|
|
rule_dict = dict(rule_opt.split('=', 1) for rule_opt in
|
|
rule_opts.split(' ') if rule_opt)
|
|
if comment:
|
|
rule_dict['comment'] = comment
|
|
rule_dict.update(kwargs)
|
|
|
|
rule_elements = ('action', 'proto', 'dsthost', 'dst4', 'dst6',
|
|
'specialtarget', 'dstports', 'icmptype', 'expire', 'comment')
|
|
for rule_opt in rule_elements:
|
|
value = rule_dict.pop(rule_opt, None)
|
|
if value is None:
|
|
continue
|
|
if rule_opt in ('dst4', 'dst6'):
|
|
rule_opt = 'dsthost'
|
|
setattr(self, rule_opt, value)
|
|
|
|
if rule_dict:
|
|
raise ValueError('Unknown rule elements: {!r}'.format(
|
|
rule_dict))
|
|
|
|
if self.action is None:
|
|
raise ValueError('missing action=')
|
|
|
|
@property
|
|
def action(self):
|
|
'''rule action'''
|
|
return self._action
|
|
|
|
@action.setter
|
|
def action(self, value):
|
|
if not isinstance(value, Action):
|
|
value = Action(value)
|
|
self._action = value
|
|
|
|
@property
|
|
def proto(self):
|
|
'''protocol to match'''
|
|
return self._proto
|
|
|
|
@proto.setter
|
|
def proto(self, value):
|
|
if value is not None and not isinstance(value, Proto):
|
|
value = Proto(value)
|
|
if value not in ('tcp', 'udp'):
|
|
self.dstports = None
|
|
if value not in ('icmp',):
|
|
self.icmptype = None
|
|
self._proto = value
|
|
|
|
@property
|
|
def dsthost(self):
|
|
'''destination host/network'''
|
|
return self._dsthost
|
|
|
|
@dsthost.setter
|
|
def dsthost(self, value):
|
|
if value is not None and not isinstance(value, DstHost):
|
|
value = DstHost(value)
|
|
self._dsthost = value
|
|
|
|
@property
|
|
def dstports(self):
|
|
''''Destination port(s) (for \'tcp\' and \'udp\' protocol only)'''
|
|
return self._dstports
|
|
|
|
@dstports.setter
|
|
def dstports(self, value):
|
|
if value is not None:
|
|
if self.proto not in ('tcp', 'udp'):
|
|
raise ValueError(
|
|
'dstports valid only for \'tcp\' and \'udp\' protocols')
|
|
if not isinstance(value, DstPorts):
|
|
value = DstPorts(value)
|
|
self._dstports = value
|
|
|
|
@property
|
|
def icmptype(self):
|
|
'''ICMP packet type (for \'icmp\' protocol only)'''
|
|
return self._icmptype
|
|
|
|
@icmptype.setter
|
|
def icmptype(self, value):
|
|
if value is not None:
|
|
if self.proto not in ('icmp',):
|
|
raise ValueError('icmptype valid only for \'icmp\' protocol')
|
|
if not isinstance(value, IcmpType):
|
|
value = IcmpType(value)
|
|
self._icmptype = value
|
|
|
|
@property
|
|
def specialtarget(self):
|
|
'''Special target, for now only \'dns\' supported'''
|
|
return self._specialtarget
|
|
|
|
@specialtarget.setter
|
|
def specialtarget(self, value):
|
|
if not isinstance(value, SpecialTarget):
|
|
value = SpecialTarget(value)
|
|
self._specialtarget = value
|
|
|
|
@property
|
|
def expire(self):
|
|
'''Timestamp (UNIX epoch) on which this rule expire'''
|
|
return self._expire
|
|
|
|
@expire.setter
|
|
def expire(self, value):
|
|
if not isinstance(value, Expire):
|
|
value = Expire(value)
|
|
self._expire = value
|
|
|
|
@property
|
|
def comment(self):
|
|
'''User comment'''
|
|
return self._comment
|
|
|
|
@comment.setter
|
|
def comment(self, value):
|
|
if not isinstance(value, Comment):
|
|
value = Comment(value)
|
|
self._comment = value
|
|
|
|
@property
|
|
def rule(self):
|
|
'''API representation of this rule'''
|
|
values = []
|
|
# comment must be the last one
|
|
for prop in ('action', 'proto', 'dsthost', 'dstports', 'icmptype',
|
|
'specialtarget', 'expire', 'comment'):
|
|
value = getattr(self, prop)
|
|
if value is None:
|
|
continue
|
|
if value.rule is None:
|
|
continue
|
|
values.append(value.rule)
|
|
return ' '.join(values)
|
|
|
|
def __eq__(self, other):
|
|
if isinstance(other, Rule):
|
|
return self.rule == other.rule
|
|
if isinstance(other, str):
|
|
return self.rule == str
|
|
return NotImplemented
|
|
|
|
def __repr__(self):
|
|
return 'Rule(\'{}\')'.format(self.rule)
|
|
|
|
|
|
class Firewall(object):
|
|
'''Firewal manager for a VM'''
|
|
def __init__(self, vm):
|
|
self.vm = vm
|
|
self._rules = []
|
|
self._policy = None
|
|
self._loaded = False
|
|
|
|
def load_rules(self):
|
|
'''Force (re-)loading firewall rules'''
|
|
rules_str = self.vm.qubesd_call(None, 'mgmt.vm.firewall.Get')
|
|
rules = []
|
|
for rule_str in rules_str.decode().splitlines():
|
|
rules.append(Rule(rule_str))
|
|
self._rules = rules
|
|
self._loaded = True
|
|
|
|
@property
|
|
def rules(self):
|
|
'''Firewall rules
|
|
|
|
You can either copy them, edit and then assign new rules list to this
|
|
property, or edit in-place and call :py:meth:`save_rules`.
|
|
Once rules are loaded, they are cached. To reload rules,
|
|
call :py:meth:`load_rules`.
|
|
'''
|
|
if not self._loaded:
|
|
self.load_rules()
|
|
return self._rules
|
|
|
|
@rules.setter
|
|
def rules(self, value):
|
|
self.save_rules(value)
|
|
self._rules = value
|
|
|
|
def save_rules(self, rules=None):
|
|
'''Save firewall rules. Needs to be called after in-place editing
|
|
:py:attr:`rules`.
|
|
'''
|
|
if rules is None:
|
|
rules = self._rules
|
|
self.vm.qubesd_call(None, 'mgmt.vm.firewall.Set',
|
|
payload=(''.join('{}\n'.format(rule.rule)
|
|
for rule in rules)).encode('ascii'))
|
|
|
|
@property
|
|
def policy(self):
|
|
'''Default action to take if no rule matches'''
|
|
policy_str = self.vm.qubesd_call(None, 'mgmt.vm.firewall.GetPolicy')
|
|
return Action(policy_str.decode())
|
|
|
|
@policy.setter
|
|
def policy(self, value):
|
|
self.vm.qubesd_call(None, 'mgmt.vm.firewall.SetPolicy', payload=str(
|
|
value).encode('ascii'))
|
|
|
|
def reload(self):
|
|
'''Force reload the same firewall rules.
|
|
|
|
Can be used for example to force again names resolution.
|
|
'''
|
|
self.vm.qubesd_call(None, 'mgmt.vm.firewall.Reload')
|