65fc62a989
To simplify configuration, automatically enable 'yum-proxy-setup' pseudo-service when allowing access to the proxy. Also disable this service, when access is revoked. Thanks to this the user can enable this feature by one click in firewall settings.
324 lines
10 KiB
Python
Executable File
324 lines
10 KiB
Python
Executable File
#!/usr/bin/python2.6
|
|
#
|
|
# The Qubes OS Project, http://www.qubes-os.org
|
|
#
|
|
# Copyright (C) 2012 Marek Marczykowski <marmarek@invisiblethingslab.com>
|
|
#
|
|
# This program is free software; you can redistribute it and/or
|
|
# modify it under the terms of the GNU General Public License
|
|
# as published by the Free Software Foundation; either version 2
|
|
# of the License, or (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program; if not, write to the Free Software
|
|
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
|
#
|
|
#
|
|
|
|
from qubes.qubes import QubesVmCollection
|
|
from optparse import OptionParser;
|
|
import subprocess
|
|
import sys
|
|
import re
|
|
|
|
# Module-level cache of (name, port, protocol) tuples; load_services()
# rebinds it with the entries parsed from the services database.
services = []
|
|
|
|
def load_services(path='/etc/services'):
    """Fill the module-level ``services`` list from a services database.

    Every line that starts with ``<name> <port>/<protocol>`` is stored as a
    ``(name, port, protocol)`` tuple, the port converted to ``int``.  Lines
    that do not match (comments, blank lines, ...) are skipped.  Entries
    loaded by a previous call are discarded.

    path -- file to parse; defaults to the system database /etc/services.

    Raises IOError (OSError on Python 3) when the file cannot be opened.
    """
    global services
    services = []
    pattern = re.compile(
        r"(?P<name>[a-z][a-z0-9-]+)\s+(?P<port>[0-9]+)/(?P<protocol>[a-z]+)",
        re.IGNORECASE)
    # "with" closes the handle even when reading fails part-way through.
    with open(path, 'r') as f:
        for line in f:
            match = pattern.match(line)
            if match is None:
                continue
            services.append((match.group("name"),
                             int(match.group("port")),
                             match.group("protocol")))
|
|
|
|
def get_service_name(port):
    """Return the name of the first loaded service using *port*.

    When no entry of ``services`` has that port number, the port itself is
    returned, converted to a string.
    """
    matching_names = (entry[0] for entry in services if entry[1] == port)
    return next(matching_names, str(port))
|
|
|
|
def get_service_port(name):
    """Return the port (as int) of the first loaded service called *name*.

    Returns None when ``services`` holds no entry with that name.
    """
    matching_ports = (int(entry[1]) for entry in services if entry[0] == name)
    return next(matching_ports, None)
|
|
|
|
def _rule_error(message):
    """Print *message* on stderr and return None, the parse-failure result."""
    sys.stderr.write(message + "\n")
    return None


def parse_rule(args):
    """Parse a command-line rule specification into a firewall rule dict.

    args -- list of strings, one of:
            [address[/netmask], proto]            (proto may be "any")
            [address[/netmask], proto/port[-port]]
            [address[/netmask], proto, port[-port] | service_name]

    Returns a dict with the keys 'address', 'netmask', 'proto', 'portBegin'
    and 'portEnd'.  On an invalid specification an "ERROR: ..." line is
    written to stderr and None is returned.
    """
    if len(args) < 2:
        return _rule_error("ERROR: Rule must have at least address and protocol")

    address = args[0]
    netmask = 32
    proto = args[1]
    port = args[2] if len(args) > 2 else None
    port_end = None

    unmask = address.split("/", 1)
    if len(unmask) == 2:
        address, netmask = unmask
        if not netmask.isdigit():
            return _rule_error("ERROR: Invalid netmask")
        if re.match(r"^([0-9]{1,3}\.){3}[0-9]{1,3}$", address) is None:
            return _rule_error("ERROR: Only IP is allowed when specyfying netmask")
        # The pattern above only checks the shape; reject octets above 255.
        if not all(int(octet) <= 255 for octet in address.split(".")):
            return _rule_error("ERROR: Invalid IP address")
        netmask = int(netmask)
        # isdigit() already excludes negative values.
        if netmask > 32:
            return _rule_error("ERROR: Invalid netmask")

    # Accept a fully-qualified name written with a trailing dot.
    if address[-1:] == ".":
        address = address[:-1]

    allowed = re.compile(r"(?!-)[A-Z\d-]{1,63}(?<!-)$", re.IGNORECASE)
    if not all(allowed.match(label) for label in address.split(".")):
        return _rule_error("ERROR: Invalid hostname")

    # The port may be glued to the protocol as "proto/port".
    proto_split = proto.split('/', 1)
    if len(proto_split) == 2:
        proto, port = proto_split

    if proto not in ('tcp', 'udp', 'any'):
        return _rule_error("ERROR: Protocol must be one of: 'tcp', 'udp', 'any'")

    if proto != "any" and port is None:
        return _rule_error("ERROR: Port required for protocol %s" % args[1])

    if port is not None:
        # Try the whole token as a service name first: names such as
        # "ftp-data" contain a dash and must not be read as a port range.
        service_port = get_service_port(port)
        if service_port is not None:
            port = service_port
        else:
            port_range = port.split('-', 1)
            if len(port_range) == 2:
                port, port_end = port_range

            service_port = get_service_port(port)
            if service_port is not None:
                port = service_port
            elif not port.isdigit():
                return _rule_error("ERROR: Invalid port/service name '%s'" % port)
            else:
                port = int(port)

            if port_end is not None:
                if not port_end.isdigit():
                    return _rule_error("ERROR: Invalid port '%s'" % port_end)
                port_end = int(port_end)

        if port > 65535 or (port_end is not None and port_end > 65535):
            return _rule_error("ERROR: Port number must be in range 0-65535")
        if port_end is not None and port_end < port:
            return _rule_error("ERROR: Invalid port range")

    return {
        'address': address,
        'netmask': netmask,
        'proto': proto,
        'portBegin': port,
        'portEnd': port_end,
    }
|
|
|
|
def list_rules(rules):
    """Print *rules* on stdout as an ASCII table.

    The table has the columns "num", "address", "proto" and "port(s)"; each
    column is as wide as its longest cell.  Rule numbers start at 1.  For
    tcp/udp rules with a single port the port is shown through
    get_service_name(); port ranges are shown as "begin-end".
    """
    columns = ["num", "address", "proto", "port(s)"]

    # Turn every rule into a dict of ready-to-print strings.
    rows = []
    for number, rule in enumerate(rules, 1):
        shown_address = rule['address']
        if rule['netmask'] < 32:
            shown_address += '/' + str(rule['netmask'])

        shown_ports = ''
        if rule['proto'] in ('tcp', 'udp'):
            first, last = rule['portBegin'], rule['portEnd']
            if last is not None:
                shown_ports = str(first) + '-' + str(last)
            elif first is not None:
                shown_ports = get_service_name(first)
            else:
                shown_ports = str(first)

        rows.append({
            'num': "{0:>2}".format(number),
            'address': shown_address,
            'proto': rule['proto'],
            'port(s)': shown_ports,
        })

    # A column is as wide as its title or its widest cell.
    widths = dict(
        (col, max([len(col)] + [len(row[col]) for row in rows]))
        for col in columns)

    def emit(text):
        sys.stdout.write(text + "\n")

    # Display the header
    separator = "".join("-" * (widths[col] + 2) + "+" for col in columns)
    emit(separator)
    emit("".join(
        " " + format(col, "^%d" % widths[col]) + " |" for col in columns))
    emit(separator)

    # And the content
    for row in rows:
        emit("".join(
            " " + row[col].ljust(widths[col]) + " |" for col in columns))
|
|
|
|
def display_firewall(conf):
    """Print the firewall settings held in *conf* on stdout.

    Shows the default policy and the ICMP, DNS and Qubes yum proxy switches
    (read from the 'allow', 'allowIcmp', 'allowDns' and 'allowYumProxy'
    keys), then the table of conf['rules'] via list_rules().
    """
    def emit(text):
        sys.stdout.write(text + "\n")

    def verdict(flag):
        return "ALLOW" if flag else 'DENY'

    if conf['allow']:
        policy = "ALLOW all traffic except"
    else:
        policy = "DENY all traffic except"
    emit("Firewall policy: %s" % policy)
    emit("ICMP: %s" % verdict(conf['allowIcmp']))
    emit("DNS: %s" % verdict(conf['allowDns']))
    emit("Qubes yum proxy: %s" % verdict(conf['allowYumProxy']))
    list_rules(conf['rules'])
|
|
|
|
def add_rule(conf, args):
    """Parse *args* with parse_rule() and append the rule to conf['rules'].

    Returns True when the rule was appended, False when parse_rule()
    rejected the specification (conf is left untouched in that case).
    """
    parsed = parse_rule(args)
    if parsed is not None:
        conf['rules'].append(parsed)
        return True
    return False
|
|
|
|
def del_rule(conf, args):
    """Remove one rule from conf['rules'].

    args is either a single decimal string, taken as the 1-based rule
    number, or a rule specification understood by parse_rule(), in which
    case the first equal rule is removed.

    Returns True when a rule was removed; otherwise an error is written to
    stderr (by this function or by parse_rule()) and False is returned.
    """
    selected_by_number = len(args) == 1 and args[0].isdigit()
    if selected_by_number:
        index = int(args[0]) - 1
        if not 0 <= index < len(conf['rules']):
            sys.stderr.write("ERROR: Rule number out of range" + "\n")
            return False
        del conf['rules'][index]
        return True

    wanted = parse_rule(args)
    if wanted is None:
        return False
    if wanted not in conf['rules']:
        sys.stderr.write("ERROR: Rule not found" + "\n")
        return False
    conf['rules'].remove(wanted)
    return True
|
|
|
|
def allow_deny_value(s):
    """Translate the option value "allow"/"deny" into True/False.

    Any other value is a usage error: a message is written to stderr and the
    program terminates with exit status 1 (SystemExit is raised).
    """
    if s == "allow":
        return True
    if s == "deny":
        return False
    sys.stderr.write('ERROR: Only "allow" or "deny" allowed' + "\n")
    # sys.exit() rather than the bare exit(): the latter is injected by the
    # site module for interactive use and is not guaranteed to exist.
    sys.exit(1)
|
|
|
|
def _build_option_parser():
    """Return the OptionParser describing this tool's command line."""
    usage = "usage: %prog [-n] <vm-name> [action] [rule spec]\n"
    usage += " rule specification can be one of:\n"
    usage += " address|hostname[/netmask] tcp|udp port[-port]\n"
    usage += " address|hostname[/netmask] tcp|udp service_name\n"
    usage += " address|hostname[/netmask] any\n"
    parser = OptionParser(usage)
    parser.add_option("-l", "--list", dest="do_list", action="store_true", default=True,
                      help="List firewall settings (default action)")
    parser.add_option("-a", "--add", dest="do_add", action="store_true", default=False,
                      help="Add rule")
    parser.add_option("-d", "--del", dest="do_del", action="store_true", default=False,
                      help="Remove rule (given by number or by rule spec)")
    parser.add_option("-P", "--policy", dest="set_policy", action="store", default=None,
                      help="Set firewall policy (allow/deny)")
    parser.add_option("-i", "--icmp", dest="set_icmp", action="store", default=None,
                      help="Set ICMP access (allow/deny)")
    parser.add_option("-D", "--dns", dest="set_dns", action="store", default=None,
                      help="Set DNS access (allow/deny)")
    parser.add_option("-Y", "--yum-proxy", dest="set_yum_proxy", action="store", default=None,
                      help="Set access to Qubes yum proxy (allow/deny)")

    parser.add_option("-n", "--numeric", dest="numeric", action="store_true", default=False,
                      help="Display port numbers instead of services (makes sense only with --list)")
    return parser


def _run_action(qvm_collection, vmname, options, args):
    """Perform the requested list/modify action on the firewall of *vmname*.

    Exits with status 1 when the VM does not exist.  When the configuration
    was modified it is written back through the VM object and the collection
    is saved.
    """
    vm = qvm_collection.get_vm_by_name(vmname)
    if vm is None:
        sys.stderr.write(
            "A VM with the name '{0}' does not exist in the system.".format(vmname) + "\n")
        sys.exit(1)

    changed = False
    conf = vm.get_firewall_conf()

    # Simple allow/deny switches, applied in command-line option order.
    switches = (
        (options.set_policy, 'allow'),
        (options.set_icmp, 'allowIcmp'),
        (options.set_dns, 'allowDns'),
        (options.set_yum_proxy, 'allowYumProxy'),
    )
    for value, key in switches:
        if value:
            conf[key] = allow_deny_value(value)
            changed = True

    if options.do_add:
        load_services()
        # NOTE: this overwrites "changed", so a rejected rule also prevents
        # the switch changes made above from being written.
        changed = add_rule(conf, args)
    elif options.do_del:
        load_services()
        changed = del_rule(conf, args)
    elif options.do_list:
        # Without loaded services, ports are displayed as plain numbers.
        if not options.numeric:
            load_services()
        if not vm.has_firewall():
            sys.stdout.write(
                "INFO: This VM has no firewall rules set, below defaults are listed" + "\n")
        display_firewall(conf)

    if changed:
        vm.write_firewall_conf(conf)
        if vm.is_running():
            if vm.netvm is not None and vm.netvm.is_proxyvm():
                vm.netvm.write_iptables_xenstore_entry()
        qvm_collection.save()


def main():
    """Entry point: parse the command line and list or modify a VM firewall.

    The first positional argument is the VM name; the remaining ones form
    the rule specification for --add / --del.  Listing is the default action
    and is switched off as soon as any modifying option is given.
    """
    parser = _build_option_parser()
    (options, args) = parser.parse_args()
    if len(args) < 1:
        parser.error("You must specify VM name!")
    vmname = args[0]
    args = args[1:]

    if (options.do_add or options.do_del or options.set_policy or
            options.set_icmp or options.set_dns or options.set_yum_proxy):
        options.do_list = False

    qvm_collection = QubesVmCollection()
    if options.do_list:
        # The read lock is released as soon as the collection is loaded.
        qvm_collection.lock_db_for_reading()
        try:
            qvm_collection.load()
        finally:
            qvm_collection.unlock_db()
        _run_action(qvm_collection, vmname, options, args)
    else:
        qvm_collection.lock_db_for_writing()
        # try/finally: release the write lock on every path, including the
        # sys.exit() calls made on invalid input.
        try:
            qvm_collection.load()
            _run_action(qvm_collection, vmname, options, args)
        finally:
            qvm_collection.unlock_db()
|
|
|
|
|
|
# Run only when executed as a script, so that importing this file (for
# example from a test) does not start parsing the command line.
if __name__ == "__main__":
    main()
|