diff --git a/doc/manpages/qvm-firewall.rst b/doc/manpages/qvm-firewall.rst index 840c78da..c0c1c8d3 100644 --- a/doc/manpages/qvm-firewall.rst +++ b/doc/manpages/qvm-firewall.rst @@ -1,62 +1,85 @@ .. program:: qvm-firewall -======================================================= -:program:`qvm-firewall` -- Qubes firewall configuration -======================================================= +:program:`qvm-firewall` -- Manage VM outbound firewall +====================================================== Synopsis -======== -:command:`qvm-firewall` [-n] <*vm-name*> [*action*] [*rule spec*] +-------- -Rule specification can be one of: - 1. *address*\ |\ *hostname*\ [/*netmask*] tcp|udp *port*\ [-*port*] - 2. *address*\ |\ *hostname*\ [/*netmask*] tcp|udp *service_name* - 3. *address*\ |\ *hostname*\ [/*netmask*] any +:command:`qvm-firewall` [-h] [--verbose] [--quiet] [--reload] *VMNAME* add *RULE* +:command:`qvm-firewall` [-h] [--verbose] [--quiet] [--reload] *VMNAME* del [--rule-no=*RULE_NUMBER*] [*RULE*] +:command:`qvm-firewall` [-h] [--verbose] [--quiet] [--reload] *VMNAME* list [--raw] +:command:`qvm-firewall` [-h] [--verbose] [--quiet] [--reload] *VMNAME* policy {accept,drop} Options -======= +------- .. option:: --help, -h - Show this help message and exit + show help message and exit -.. option:: --list, -l +.. option:: --verbose, -v - List firewall settings (default action) + increase verbosity -.. option:: --add, -a +.. option:: --quiet, -q - Add rule + decrease verbosity -.. option:: --del, -d +.. option:: --reload, -r - Remove rule (given by number or by rule spec) + force reloading rules even when unchanged -.. option:: --policy=SET_POLICY, -P SET_POLICY +.. option:: --raw - Set firewall policy (allow/deny) + Print raw rules when listing -.. option:: --icmp=SET_ICMP, -i SET_ICMP - Set ICMP access (allow/deny) +Actions description +------------------- -.. option:: --dns=SET_DNS, -D SET_DNS +Available actions: - Set DNS access (allow/deny) +* add - add specified rule. See `Rule syntax` section below. -.. option:: --yum-proxy=SET_YUM_PROXY, -Y SET_YUM_PROXY +* del - delete specified rule. Can be selected either by rule number using +:option:`--rule-no`, or specifying rule itself. - Set access to Qubes yum proxy (allow/deny). +* list - list all the rules for a given VM. - .. note:: - if set to "deny", access will be rejected even if policy set to "allow" +* policy - set default action if no rule matches. -.. option:: --numeric, -n - Display port numbers instead of services (makes sense only with :option:`--list`) +Rule syntax +----------- + +A single rule is built from: + - action - either ``drop`` or ``accept`` + - zero or more matches + +Selected action is applied on given packet when all specified matches do match, +further rules are not evaluated. If none of the rules match, default action +(``policy``) is applied. + +Supported matches: + - ``dsthost`` - destination host or network. Can be either IP address in CIDR + notation, or a host name. Both IPv4 and IPv6 are supported by the rule syntax. + - ``proto`` - specific IP protocol. Supported values: ``tcp``, ``udp``, + ``icmp``. + - ``dstports`` - destination port or ports range. Can be either a single port, + or a range separated by ``-``. Valid only together with ``proto=udp`` or + ``proto=tcp``. + - ``icmptype`` - ICMP message type, specified as numeric value. Valid only + together with ``proto=icmp``. + - ``specialtarget`` - predefined target. Currently the only supported value is + ``dns``. This can be combined with other matches to narrow it down. Authors -======= +------- + | Joanna Rutkowska | Rafal Wojtczuk | Marek Marczykowski +| Wojtek Porczyk + +.. vim: ts=3 sw=3 et tw=80 diff --git a/qubes/tests/__init__.py b/qubes/tests/__init__.py index b13effa7..6776eeba 100644 --- a/qubes/tests/__init__.py +++ b/qubes/tests/__init__.py @@ -977,6 +977,7 @@ def load_tests(loader, tests, pattern): # pylint: disable=unused-argument 'qubes.tests.vm.adminvm', 'qubes.tests.app', 'qubes.tests.tools.qvm_device', + 'qubes.tests.tools.qvm_firewall', 'qubes.tests.tools.qvm_ls', ): tests.addTests(loader.loadTestsFromName(modname)) @@ -1002,6 +1003,7 @@ def load_tests(loader, tests, pattern): # pylint: disable=unused-argument # tool tests 'qubes.tests.int.tools.qubes_create', 'qubes.tests.int.tools.qvm_check', + 'qubes.tests.int.tools.qvm_firewall', 'qubes.tests.int.tools.qvm_prefs', 'qubes.tests.int.tools.qvm_run', # external modules diff --git a/qubes/tests/int/tools/qvm_firewall.py b/qubes/tests/int/tools/qvm_firewall.py new file mode 100644 index 00000000..0bfbaa98 --- /dev/null +++ b/qubes/tests/int/tools/qvm_firewall.py @@ -0,0 +1,161 @@ +#!/usr/bin/python2 +# -*- encoding: utf8 -*- +# +# The Qubes OS Project, http://www.qubes-os.org +# +# Copyright (C) 2016 Marek Marczykowski-Górecki +# +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +import qubes.firewall +import qubes.tests +import qubes.tests.tools +import qubes.tools.qvm_firewall +import qubes.vm.appvm + + +class TC_10_ArgParser(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase): + list_header = ['NO', 'ACTION', 'HOST', 'PROTOCOL', 'PORT(S)', + 'SPECIAL TARGET', 'ICMP TYPE'] + + def setUp(self): + super(TC_10_ArgParser, self).setUp() + self.init_default_template() + self.vm = self.app.add_new_vm(qubes.vm.appvm.AppVM, None, + name=self.make_vm_name('vm'), label='red') + self.vm.create_on_disk() + self.app.save() + + def test_000_list(self): + with qubes.tests.tools.StdoutBuffer() as stdout: + qubes.tools.qvm_firewall.main([self.vm.name, 'list']) + self.assertEqual(stdout.getvalue(), + ' '.join(self.list_header) + '\n') + + def test_001_list(self): + self.vm.firewall.rules.append( + qubes.firewall.Rule(action='accept', dsthost='127.0.0.2', + proto='tcp', dstports=80)) + self.vm.firewall.rules.append( + qubes.firewall.Rule(action='accept', dsthost='127.0.0.3', + proto='icmp', icmptype=8)) + self.vm.firewall.rules.append( + qubes.firewall.Rule(action='accept', specialtarget='dns')) + self.vm.firewall.save() + expected_output = ( + 'NO ACTION HOST PROTOCOL PORT(S) SPECIAL TARGET ICMP ' + 'TYPE\n' + '0 accept 127.0.0.2/32 tcp 80 ' + ' \n' + '1 accept 127.0.0.3/32 icmp 8 ' + ' \n' + '2 accept dns ' + ' \n' + ) + with qubes.tests.tools.StdoutBuffer() as stdout: + qubes.tools.qvm_firewall.main([self.vm.name, 'list']) + self.assertEqual( + '\n'.join(l.rstrip() for l in stdout.getvalue().splitlines()), + '\n'.join(l.rstrip() for l in expected_output.splitlines())) + + def test_002_list_raw(self): + self.vm.firewall.rules = [ + qubes.firewall.Rule(action='accept', dsthost='127.0.0.2', + proto='tcp', dstports=80), + qubes.firewall.Rule(action='accept', dsthost='127.0.0.3', + proto='icmp', icmptype=8), + qubes.firewall.Rule(action='accept', specialtarget='dns'), + ] + self.vm.firewall.save() + expected_output = '\n'.join(rule.rule for rule in + self.vm.firewall.rules) + '\n' + with qubes.tests.tools.StdoutBuffer() as stdout: + qubes.tools.qvm_firewall.main(['--raw', self.vm.name, 'list']) + self.assertEqual(stdout.getvalue(), expected_output) + + def test_010_add(self): + qubes.tools.qvm_firewall.main( + [self.vm.name, 'add', 'accept', '1.2.3.0/24', 'tcp', '443']) + self.assertEqual(self.vm.firewall.rules, + [qubes.firewall.Rule(action='accept', dsthost='1.2.3.0/24', + proto='tcp', dstports='443')]) + + def test_011_add_before(self): + self.vm.firewall.rules = [ + qubes.firewall.Rule(action='accept', dsthost='1.2.3.1'), + qubes.firewall.Rule(action='accept', dsthost='1.2.3.2'), + qubes.firewall.Rule(action='accept', dsthost='1.2.3.3'), + ] + self.vm.firewall.save() + qubes.tools.qvm_firewall.main( + [self.vm.name, 'add', '--before', '2', + 'accept', '1.2.3.0/24', 'tcp', '443']) + self.vm.firewall.load() + self.assertEqual(self.vm.firewall.rules, + [qubes.firewall.Rule(action='accept', dsthost='1.2.3.1'), + qubes.firewall.Rule(action='accept', dsthost='1.2.3.2'), + qubes.firewall.Rule(action='accept', dsthost='1.2.3.0/24', + proto='tcp', dstports='443'), + qubes.firewall.Rule(action='accept', dsthost='1.2.3.3'), + ]) + + def test_020_del(self): + self.vm.firewall.rules = [ + qubes.firewall.Rule(action='accept', dsthost='1.2.3.1'), + qubes.firewall.Rule(action='accept', dsthost='1.2.3.2'), + qubes.firewall.Rule(action='accept', dsthost='1.2.3.3'), + ] + self.vm.firewall.save() + qubes.tools.qvm_firewall.main( + [self.vm.name, 'del', 'accept', '1.2.3.2']) + self.vm.firewall.load() + self.assertEqual(self.vm.firewall.rules, + [qubes.firewall.Rule(action='accept', dsthost='1.2.3.1'), + qubes.firewall.Rule(action='accept', dsthost='1.2.3.3'), + ]) + + def test_021_del_by_number(self): + self.vm.firewall.rules = [ + qubes.firewall.Rule(action='accept', dsthost='1.2.3.1'), + qubes.firewall.Rule(action='accept', dsthost='1.2.3.2'), + qubes.firewall.Rule(action='accept', dsthost='1.2.3.3'), + ] + self.vm.firewall.save() + qubes.tools.qvm_firewall.main( + [self.vm.name, 'del', '--rule-no', '1']) + self.vm.firewall.load() + self.assertEqual(self.vm.firewall.rules, + [qubes.firewall.Rule(action='accept', dsthost='1.2.3.1'), + qubes.firewall.Rule(action='accept', dsthost='1.2.3.3'), + ]) + + def test_030_policy(self): + with qubes.tests.tools.StdoutBuffer() as stdout: + qubes.tools.qvm_firewall.main([self.vm.name, 'policy']) + self.assertEqual(stdout.getvalue(), 'accept\n') + self.vm.firewall.policy = 'drop' + self.vm.firewall.save() + with qubes.tests.tools.StdoutBuffer() as stdout: + qubes.tools.qvm_firewall.main([self.vm.name, 'policy']) + self.assertEqual(stdout.getvalue(), 'drop\n') + + def test_031_policy_set(self): + qubes.tools.qvm_firewall.main([self.vm.name, 'policy', 'drop']) + self.assertEqual(self.vm.firewall.policy, 'drop') + qubes.tools.qvm_firewall.main([self.vm.name, 'policy', 'accept']) + self.vm.firewall.load() + self.assertEqual(self.vm.firewall.policy, 'accept') + diff --git a/qubes/tests/tools/qvm_firewall.py b/qubes/tests/tools/qvm_firewall.py new file mode 100644 index 00000000..0d3e72c9 --- /dev/null +++ b/qubes/tests/tools/qvm_firewall.py @@ -0,0 +1,61 @@ +#!/usr/bin/python2 +# -*- encoding: utf8 -*- +# +# The Qubes OS Project, http://www.qubes-os.org +# +# Copyright (C) 2016 Marek Marczykowski-Górecki +# +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +import argparse + +import qubes.firewall +import qubes.tests +import qubes.tests.firewall +import qubes.tools.qvm_firewall + + +class TC_00_RuleAction(qubes.tests.QubesTestCase): + def setUp(self): + super(TC_00_RuleAction, self).setUp() + self.action = qubes.tools.qvm_firewall.RuleAction(None, dest='rule') + + def test_000_named_opts(self): + ns = argparse.Namespace() + self.action(None, ns, ['dsthost=127.0.0.1', 'action=accept']) + self.assertEqual(ns.rule, + qubes.firewall.Rule(None, action='accept', dsthost='127.0.0.1/32')) + + def test_001_unnamed_opts(self): + ns = argparse.Namespace() + self.action(None, ns, ['accept', '127.0.0.1', 'tcp', '80']) + self.assertEqual(ns.rule, + qubes.firewall.Rule(None, action='accept', dsthost='127.0.0.1/32', + proto='tcp', dstports=80)) + + def test_002_unnamed_opts(self): + ns = argparse.Namespace() + self.action(None, ns, ['accept', '127.0.0.1', 'icmp', '8']) + self.assertEqual(ns.rule, + qubes.firewall.Rule(None, action='accept', dsthost='127.0.0.1/32', + proto='icmp', icmptype=8)) + + def test_003_mixed_opts(self): + ns = argparse.Namespace() + self.action(None, ns, ['dsthost=127.0.0.1', 'accept', + 'dstports=443', 'tcp']) + self.assertEqual(ns.rule, + qubes.firewall.Rule(None, action='accept', dsthost='127.0.0.1/32', + proto='tcp', dstports=443)) \ No newline at end of file diff --git a/qubes/tools/qvm_firewall.py b/qubes/tools/qvm_firewall.py new file mode 100644 index 00000000..47e730dd --- /dev/null +++ b/qubes/tools/qvm_firewall.py @@ -0,0 +1,172 @@ +#!/usr/bin/python2 +# -*- encoding: utf8 -*- +# +# The Qubes OS Project, http://www.qubes-os.org +# +# Copyright (C) 2016 Marek Marczykowski-Górecki +# +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +from __future__ import print_function +import argparse +import sys + +import itertools + +import qubes.firewall +import qubes.tools + + +class RuleAction(argparse.Action): + # pylint: disable=too-few-public-methods + '''Parser action for a single firewall rule. It accept syntax: + - [ [ [|]]] + - action= [specialtarget=dns] [dsthost=] + [proto=] [dstports=] [icmptype=] + + Or a mix of them. + ''' + def __call__(self, _parser, namespace, values, option_string=None): + if not values: + setattr(namespace, self.dest, None) + return + assumed_order = ['action', 'dsthost', 'proto', 'dstports', 'icmptype'] + allowed_opts = assumed_order + ['specialtarget'] + kwargs = {} + for opt in values: + opt_elements = opt.split('=') + if len(opt_elements) == 2: + key, value = opt_elements + elif len(opt_elements) == 1: + key, value = assumed_order[0], opt + else: + raise argparse.ArgumentError(None, + 'invalid rule description: {}'.format(opt)) + if key not in allowed_opts: + raise argparse.ArgumentError(None, + 'Invalid rule element: {}'.format(opt)) + kwargs[key] = value + if key in assumed_order: + assumed_order.remove(key) + if key == 'proto' and value in ['tcp', 'udp']: + assumed_order.remove('icmptype') + elif key == 'proto' and value in ['icmp']: + assumed_order.remove('dstports') + rule = qubes.firewall.Rule(**kwargs) + setattr(namespace, self.dest, rule) + +parser = qubes.tools.QubesArgumentParser(vmname_nargs=1) + +action = parser.add_subparsers(dest='command', help='action to perform') + +action_add = action.add_parser('add', help='add rule') +action_add.add_argument('--before', type=int, default=None, + help='Add rule before rule with given number, instead of at the end') +action_add.add_argument('rule', nargs='+', action=RuleAction, + help='rule description') + +action_del = action.add_parser('del', help='remove rule') +action_del.add_argument('--rule-no', dest='rule_no', type=int, + action='store', help='rule number') +action_del.add_argument('rule', nargs='*', action=RuleAction, + help='rule to be removed') + +action_list = action.add_parser('list', help='list rules') + +action_policy = action.add_parser('policy', + help='get/set policy - default action') +action_policy.add_argument('policy', choices=['accept', 'drop'], + help='policy value', default=None, nargs='?') + +parser.add_argument('--reload', '-r', action='store_true', + help='force reloading rules even when unchanged') + +parser.add_argument('--raw', action='store_true', + help='output rules as raw strings, instead of nice table') + + +def rules_list_table(vm): + header = ['NO', 'ACTION', 'HOST', 'PROTOCOL', 'PORT(S)', + 'SPECIAL TARGET', 'ICMP TYPE'] + rows = [] + for (rule, rule_no) in zip(vm.firewall.rules, itertools.count()): + row = [str(x) if x is not None else '' for x in [ + rule_no, + rule.action, + rule.dsthost, + rule.proto, + rule.dstports, + rule.specialtarget, + rule.icmptype, + ]] + rows.append(row) + qubes.tools.print_table([header] + rows) + + +def rules_list_raw(vm): + for rule in vm.firewall.rules: + sys.stdout.write(rule.rule + '\n') + + +def rules_add(vm, args): + if args.before is not None: + vm.firewall.rules.insert(args.before, args.rule) + else: + vm.firewall.rules.append(args.rule) + vm.firewall.save() + + +def rules_del(vm, args): + if args.rule_no is not None: + vm.firewall.rules.pop(args.rule_no) + else: + vm.firewall.rules.remove(args.rule) + vm.firewall.save() + + +def policy(vm, args): + if args.policy is not None: + vm.firewall.policy = args.policy + vm.firewall.save() + else: + print(vm.firewall.policy) + + +def main(args=None): + '''Main routine of :program:`qvm-firewall`.''' + try: + args = parser.parse_args(args) + vm = args.domains[0] + if args.command == 'add': + rules_add(vm, args) + elif args.command == 'del': + rules_del(vm, args) + elif args.command == 'policy': + policy(vm, args) + elif args.command == 'list': + if args.raw: + rules_list_raw(vm) + else: + rules_list_table(vm) + if args.reload: + vm.fire_event('firewall-changed') + except qubes.exc.QubesException as e: + parser.print_error(e.message) + return 1 + return 0 + + +if __name__ == '__main__': + sys.exit(main()) diff --git a/rpm_spec/core-dom0.spec b/rpm_spec/core-dom0.spec index 3ffb12e2..276c40a2 100644 --- a/rpm_spec/core-dom0.spec +++ b/rpm_spec/core-dom0.spec @@ -253,6 +253,7 @@ fi %{python_sitelib}/qubes/tools/qvm_create.py* %{python_sitelib}/qubes/tools/qvm_device.py* %{python_sitelib}/qubes/tools/qvm_features.py* +%{python_sitelib}/qubes/tools/qvm_firewall.py* %{python_sitelib}/qubes/tools/qvm_check.py* %{python_sitelib}/qubes/tools/qvm_clone.py* %{python_sitelib}/qubes/tools/qvm_kill.py* @@ -302,6 +303,7 @@ fi %{python_sitelib}/qubes/tests/tools/__init__.py* %{python_sitelib}/qubes/tests/tools/init.py* %{python_sitelib}/qubes/tests/tools/qvm_device.py* +%{python_sitelib}/qubes/tests/tools/qvm_firewall.py* %{python_sitelib}/qubes/tests/tools/qvm_ls.py* %dir %{python_sitelib}/qubes/tests/int @@ -318,6 +320,7 @@ fi %dir %{python_sitelib}/qubes/tests/int/tools %{python_sitelib}/qubes/tests/int/tools/__init__.py* %{python_sitelib}/qubes/tests/int/tools/qubes_create.py* +%{python_sitelib}/qubes/tests/int/tools/qvm_firewall.py* %{python_sitelib}/qubes/tests/int/tools/qvm_check.py* %{python_sitelib}/qubes/tests/int/tools/qvm_prefs.py* %{python_sitelib}/qubes/tests/int/tools/qvm_run.py*