123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448 |
- # -*- encoding: utf8 -*-
- # pylint: disable=too-few-public-methods
- #
- # The Qubes OS Project, http://www.qubes-os.org
- #
- # Copyright (C) 2017 Marek Marczykowski-Górecki
- # <marmarek@invisiblethingslab.com>
- #
- # This program is free software; you can redistribute it and/or modify
- # it under the terms of the GNU Lesser General Public License as published by
- # the Free Software Foundation; either version 2.1 of the License, or
- # (at your option) any later version.
- #
- # This program is distributed in the hope that it will be useful,
- # but WITHOUT ANY WARRANTY; without even the implied warranty of
- # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- # GNU Lesser General Public License for more details.
- #
- # You should have received a copy of the GNU Lesser General Public License along
- # with this program; if not, see <http://www.gnu.org/licenses/>.
- '''Firewall configuration interface'''
- import datetime
- import socket
- class RuleOption(object):
- '''Base class for a single rule element'''
- def __init__(self, value):
- self._value = str(value)
- @property
- def rule(self):
- '''API representation of this rule element'''
- raise NotImplementedError
- def __str__(self):
- return self._value
- def __eq__(self, other):
- return str(self) == other
- # noinspection PyAbstractClass
- class RuleChoice(RuleOption):
- '''Base class for multiple-choices rule elements'''
- # pylint: disable=abstract-method
- def __init__(self, value):
- super(RuleChoice, self).__init__(value)
- self.allowed_values = \
- [v for k, v in self.__class__.__dict__.items()
- if not k.startswith('__') and isinstance(v, str) and
- not v.startswith('__')]
- if value not in self.allowed_values:
- raise ValueError(value)
- class Action(RuleChoice):
- '''Rule action'''
- accept = 'accept'
- drop = 'drop'
- @property
- def rule(self):
- '''API representation of this rule element'''
- return 'action=' + str(self)
- class Proto(RuleChoice):
- '''Protocol name'''
- tcp = 'tcp'
- udp = 'udp'
- icmp = 'icmp'
- @property
- def rule(self):
- '''API representation of this rule element'''
- return 'proto=' + str(self)
- class DstHost(RuleOption):
- '''Represent host/network address: either IPv4, IPv6, or DNS name'''
- def __init__(self, value, prefixlen=None):
- # TODO: in python >= 3.3 ipaddress module could be used
- if value.count('/') > 1:
- raise ValueError('Too many /: ' + value)
- elif not value.count('/'):
- # add prefix length to bare IP addresses
- try:
- socket.inet_pton(socket.AF_INET6, value)
- self.prefixlen = prefixlen or 128
- if self.prefixlen < 0 or self.prefixlen > 128:
- raise ValueError(
- 'netmask for IPv6 must be between 0 and 128')
- value += '/' + str(self.prefixlen)
- self.type = 'dst6'
- except socket.error:
- try:
- socket.inet_pton(socket.AF_INET, value)
- if value.count('.') != 3:
- raise ValueError(
- 'Invalid number of dots in IPv4 address')
- self.prefixlen = prefixlen or 32
- if self.prefixlen < 0 or self.prefixlen > 32:
- raise ValueError(
- 'netmask for IPv4 must be between 0 and 32')
- value += '/' + str(self.prefixlen)
- self.type = 'dst4'
- except socket.error:
- self.type = 'dsthost'
- self.prefixlen = 0
- else:
- host, prefixlen = value.split('/', 1)
- prefixlen = int(prefixlen)
- if prefixlen < 0:
- raise ValueError('netmask must be non-negative')
- self.prefixlen = prefixlen
- try:
- socket.inet_pton(socket.AF_INET6, host)
- if prefixlen > 128:
- raise ValueError('netmask for IPv6 must be <= 128')
- self.type = 'dst6'
- except socket.error:
- try:
- socket.inet_pton(socket.AF_INET, host)
- if prefixlen > 32:
- raise ValueError('netmask for IPv4 must be <= 32')
- self.type = 'dst4'
- if host.count('.') != 3:
- raise ValueError(
- 'Invalid number of dots in IPv4 address')
- except socket.error:
- raise ValueError('Invalid IP address: ' + host)
- super(DstHost, self).__init__(value)
- @property
- def rule(self):
- '''API representation of this rule element'''
- return self.type + '=' + str(self)
- class DstPorts(RuleOption):
- '''Destination port(s), for TCP/UDP only'''
- def __init__(self, value):
- if isinstance(value, int):
- value = str(value)
- if value.count('-') == 1:
- self.range = [int(x) for x in value.split('-', 1)]
- elif not value.count('-'):
- self.range = [int(value), int(value)]
- else:
- raise ValueError(value)
- if any(port < 0 or port > 65536 for port in self.range):
- raise ValueError('Ports out of range')
- if self.range[0] > self.range[1]:
- raise ValueError('Invalid port range')
- super(DstPorts, self).__init__(
- str(self.range[0]) if self.range[0] == self.range[1]
- else '{!s}-{!s}'.format(*self.range))
- @property
- def rule(self):
- '''API representation of this rule element'''
- return 'dstports=' + '{!s}-{!s}'.format(*self.range)
- class IcmpType(RuleOption):
- '''ICMP packet type'''
- def __init__(self, value):
- super(IcmpType, self).__init__(value)
- value = int(value)
- if value < 0 or value > 255:
- raise ValueError('ICMP type out of range')
- @property
- def rule(self):
- '''API representation of this rule element'''
- return 'icmptype=' + str(self)
- class SpecialTarget(RuleChoice):
- '''Special destination'''
- dns = 'dns'
- @property
- def rule(self):
- '''API representation of this rule element'''
- return 'specialtarget=' + str(self)
- class Expire(RuleOption):
- '''Rule expire time'''
- def __init__(self, value):
- super(Expire, self).__init__(value)
- self.datetime = datetime.datetime.utcfromtimestamp(int(value))
- @property
- def rule(self):
- '''API representation of this rule element'''
- return 'expire=' + str(self)
- @property
- def expired(self):
- '''Have this rule expired already?'''
- return self.datetime < datetime.datetime.utcnow()
- class Comment(RuleOption):
- '''User comment'''
- @property
- def rule(self):
- '''API representation of this rule element'''
- return 'comment=' + str(self)
- class Rule(object):
- '''A single firewall rule'''
- def __init__(self, rule, **kwargs):
- '''Single firewall rule
- :param xml: XML element describing rule, or None
- :param kwargs: rule elements
- '''
- self._action = None
- self._proto = None
- self._dsthost = None
- self._dstports = None
- self._icmptype = None
- self._specialtarget = None
- self._expire = None
- self._comment = None
- rule_dict = {}
- if rule is not None:
- rule_opts, _, comment = rule.partition('comment=')
- rule_dict = dict(rule_opt.split('=', 1) for rule_opt in
- rule_opts.split(' ') if rule_opt)
- if comment:
- rule_dict['comment'] = comment
- rule_dict.update(kwargs)
- rule_elements = ('action', 'proto', 'dsthost', 'dst4', 'dst6',
- 'specialtarget', 'dstports', 'icmptype', 'expire', 'comment')
- for rule_opt in rule_elements:
- value = rule_dict.pop(rule_opt, None)
- if value is None:
- continue
- if rule_opt in ('dst4', 'dst6'):
- rule_opt = 'dsthost'
- setattr(self, rule_opt, value)
- if rule_dict:
- raise ValueError('Unknown rule elements: {!r}'.format(
- rule_dict))
- if self.action is None:
- raise ValueError('missing action=')
- @property
- def action(self):
- '''rule action'''
- return self._action
- @action.setter
- def action(self, value):
- if not isinstance(value, Action):
- value = Action(value)
- self._action = value
- @property
- def proto(self):
- '''protocol to match'''
- return self._proto
- @proto.setter
- def proto(self, value):
- if value is not None and not isinstance(value, Proto):
- value = Proto(value)
- if value not in ('tcp', 'udp'):
- self.dstports = None
- if value not in ('icmp',):
- self.icmptype = None
- self._proto = value
- @property
- def dsthost(self):
- '''destination host/network'''
- return self._dsthost
- @dsthost.setter
- def dsthost(self, value):
- if value is not None and not isinstance(value, DstHost):
- value = DstHost(value)
- self._dsthost = value
- @property
- def dstports(self):
- ''''Destination port(s) (for \'tcp\' and \'udp\' protocol only)'''
- return self._dstports
- @dstports.setter
- def dstports(self, value):
- if value is not None:
- if self.proto not in ('tcp', 'udp'):
- raise ValueError(
- 'dstports valid only for \'tcp\' and \'udp\' protocols')
- if not isinstance(value, DstPorts):
- value = DstPorts(value)
- self._dstports = value
- @property
- def icmptype(self):
- '''ICMP packet type (for \'icmp\' protocol only)'''
- return self._icmptype
- @icmptype.setter
- def icmptype(self, value):
- if value is not None:
- if self.proto not in ('icmp',):
- raise ValueError('icmptype valid only for \'icmp\' protocol')
- if not isinstance(value, IcmpType):
- value = IcmpType(value)
- self._icmptype = value
- @property
- def specialtarget(self):
- '''Special target, for now only \'dns\' supported'''
- return self._specialtarget
- @specialtarget.setter
- def specialtarget(self, value):
- if not isinstance(value, SpecialTarget):
- value = SpecialTarget(value)
- self._specialtarget = value
- @property
- def expire(self):
- '''Timestamp (UNIX epoch) on which this rule expire'''
- return self._expire
- @expire.setter
- def expire(self, value):
- if not isinstance(value, Expire):
- value = Expire(value)
- self._expire = value
- @property
- def comment(self):
- '''User comment'''
- return self._comment
- @comment.setter
- def comment(self, value):
- if not isinstance(value, Comment):
- value = Comment(value)
- self._comment = value
- @property
- def rule(self):
- '''API representation of this rule'''
- values = []
- # comment must be the last one
- for prop in ('action', 'proto', 'dsthost', 'dstports', 'icmptype',
- 'specialtarget', 'expire', 'comment'):
- value = getattr(self, prop)
- if value is None:
- continue
- if value.rule is None:
- continue
- values.append(value.rule)
- return ' '.join(values)
- def __eq__(self, other):
- if isinstance(other, Rule):
- return self.rule == other.rule
- if isinstance(other, str):
- return self.rule == str
- return NotImplemented
- def __repr__(self):
- return 'Rule(\'{}\')'.format(self.rule)
- class Firewall(object):
- '''Firewal manager for a VM'''
- def __init__(self, vm):
- self.vm = vm
- self._rules = []
- self._policy = None
- self._loaded = False
- def load_rules(self):
- '''Force (re-)loading firewall rules'''
- rules_str = self.vm.qubesd_call(None, 'admin.vm.firewall.Get')
- rules = []
- for rule_str in rules_str.decode().splitlines():
- rules.append(Rule(rule_str))
- self._rules = rules
- self._loaded = True
- @property
- def rules(self):
- '''Firewall rules
- You can either copy them, edit and then assign new rules list to this
- property, or edit in-place and call :py:meth:`save_rules`.
- Once rules are loaded, they are cached. To reload rules,
- call :py:meth:`load_rules`.
- '''
- if not self._loaded:
- self.load_rules()
- return self._rules
- @rules.setter
- def rules(self, value):
- self.save_rules(value)
- self._rules = value
- def save_rules(self, rules=None):
- '''Save firewall rules. Needs to be called after in-place editing
- :py:attr:`rules`.
- '''
- if rules is None:
- rules = self._rules
- self.vm.qubesd_call(None, 'admin.vm.firewall.Set',
- payload=(''.join('{}\n'.format(rule.rule)
- for rule in rules)).encode('ascii'))
- @property
- def policy(self):
- '''Default action to take if no rule matches'''
- policy_str = self.vm.qubesd_call(None, 'admin.vm.firewall.GetPolicy')
- return Action(policy_str.decode())
- @policy.setter
- def policy(self, value):
- self.vm.qubesd_call(None, 'admin.vm.firewall.SetPolicy', payload=str(
- value).encode('ascii'))
- def reload(self):
- '''Force reload the same firewall rules.
- Can be used for example to force again names resolution.
- '''
- self.vm.qubesd_call(None, 'admin.vm.firewall.Reload')
|