From 812e28c97f407c7472149830c2158964bb99f505 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marek=20Marczykowski-G=C3=B3recki?= Date: Mon, 1 May 2017 02:21:17 +0200 Subject: [PATCH] tools: qvm-firewall The tool and tests. Use '-' for empty fields in the table, to work also with old 'column' program version, which doesn't support empty table fields. --- qubesmgmt/tests/tools/qvm_firewall.py | 218 ++++++++++++++++++++++++++ qubesmgmt/tools/qvm_firewall.py | 186 ++++++++++++++++++++++ 2 files changed, 404 insertions(+) create mode 100644 qubesmgmt/tests/tools/qvm_firewall.py create mode 100644 qubesmgmt/tools/qvm_firewall.py diff --git a/qubesmgmt/tests/tools/qvm_firewall.py b/qubesmgmt/tests/tools/qvm_firewall.py new file mode 100644 index 0000000..bcd6aa4 --- /dev/null +++ b/qubesmgmt/tests/tools/qvm_firewall.py @@ -0,0 +1,218 @@ +# encoding=utf-8 +# +# The Qubes OS Project, http://www.qubes-os.org +# +# Copyright (C) 2016 Marek Marczykowski-Górecki +# +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +# + +import argparse + +import qubesmgmt.firewall +import qubesmgmt.tests +import qubesmgmt.tests.tools +import qubesmgmt.tools.qvm_firewall + + +class TC_00_RuleAction(qubesmgmt.tests.QubesTestCase): + def setUp(self): + super(TC_00_RuleAction, self).setUp() + self.action = qubesmgmt.tools.qvm_firewall.RuleAction( + None, dest='rule') + + def test_000_named_opts(self): + ns = argparse.Namespace() + self.action(None, ns, ['dsthost=127.0.0.1', 'action=accept']) + self.assertEqual(ns.rule, + qubesmgmt.firewall.Rule( + None, action='accept', dsthost='127.0.0.1/32')) + + def test_001_unnamed_opts(self): + ns = argparse.Namespace() + self.action(None, ns, ['accept', '127.0.0.1', 'tcp', '80']) + self.assertEqual(ns.rule, + qubesmgmt.firewall.Rule( + None, action='accept', dsthost='127.0.0.1/32', + proto='tcp', dstports=80)) + + def test_002_unnamed_opts(self): + ns = argparse.Namespace() + self.action(None, ns, ['accept', '127.0.0.1', 'icmp', '8']) + self.assertEqual(ns.rule, + qubesmgmt.firewall.Rule( + None, action='accept', dsthost='127.0.0.1/32', + proto='icmp', icmptype=8)) + + def test_003_mixed_opts(self): + ns = argparse.Namespace() + self.action(None, ns, ['dsthost=127.0.0.1', 'accept', + 'dstports=443', 'tcp']) + self.assertEqual(ns.rule, + qubesmgmt.firewall.Rule( + None, action='accept', dsthost='127.0.0.1/32', + proto='tcp', dstports=443)) + + +class TC_10_qvm_firewall(qubesmgmt.tests.QubesTestCase): + def setUp(self): + super(TC_10_qvm_firewall, self).setUp() + self.app.expected_calls[('dom0', 'mgmt.vm.List', None, None)] = \ + b'0\0test-vm class=AppVM state=Halted\n' + + def test_000_list(self): + self.app.expected_calls[('test-vm', 'mgmt.vm.firewall.Get', + None, None)] = \ + b'0\0action=accept dsthost=qubes-os.org\n' \ + b'action=drop proto=icmp\n' + with qubesmgmt.tests.tools.StdoutBuffer() as stdout: + qubesmgmt.tools.qvm_firewall.main(['test-vm', 'list'], app=self.app) + self.assertEqual( + [l.strip() for l in stdout.getvalue().splitlines()], + ['NO ACTION HOST PROTOCOL PORT(S) SPECIAL ' + 'TARGET ICMP TYPE COMMENT', + '0 accept qubes-os.org - - - ' + ' - -', + '1 drop - icmp - - ' + ' - -', + ]) + + def test_001_list(self): + self.app.expected_calls[('test-vm', 'mgmt.vm.firewall.Get', + None, None)] = \ + b'0\0action=accept dsthost=qubes-os.org proto=tcp ' \ + b'dstports=443-443\n' \ + b'action=drop proto=icmp icmptype=8\n' \ + b'action=accept specialtarget=dns comment=Allow DNS\n' + with qubesmgmt.tests.tools.StdoutBuffer() as stdout: + qubesmgmt.tools.qvm_firewall.main(['test-vm', 'list'], app=self.app) + self.assertEqual( + [l.strip() for l in stdout.getvalue().splitlines()], + ['NO ACTION HOST PROTOCOL PORT(S) SPECIAL ' + 'TARGET ICMP TYPE COMMENT', + '0 accept qubes-os.org tcp 443 - ' + ' - -', + '1 drop - icmp - - ' + ' 8 -', + '2 accept - - - dns ' + ' - Allow DNS', + ]) + + def test_002_list_raw(self): + self.app.expected_calls[('test-vm', 'mgmt.vm.firewall.Get', + None, None)] = \ + b'0\0action=accept dsthost=qubes-os.org\n' \ + b'action=drop proto=icmp\n' + with qubesmgmt.tests.tools.StdoutBuffer() as stdout: + qubesmgmt.tools.qvm_firewall.main(['test-vm', '--raw', 'list'], + app=self.app) + self.assertEqual( + [l.strip() for l in stdout.getvalue().splitlines()], + ['action=accept dsthost=qubes-os.org', + 'action=drop proto=icmp', + ]) + + def test_003_list_raw_reload(self): + self.app.expected_calls[('test-vm', 'mgmt.vm.firewall.Get', + None, None)] = \ + b'0\0action=accept dsthost=qubes-os.org\n' \ + b'action=drop proto=icmp\n' + self.app.expected_calls[('test-vm', 'mgmt.vm.firewall.Reload', + None, None)] = b'0\0' + with qubesmgmt.tests.tools.StdoutBuffer() as stdout: + qubesmgmt.tools.qvm_firewall.main( + ['test-vm', '--raw', '--reload', 'list'], + app=self.app) + self.assertEqual( + [l.strip() for l in stdout.getvalue().splitlines()], + ['action=accept dsthost=qubes-os.org', + 'action=drop proto=icmp', + ]) + + def test_010_add_after(self): + self.app.expected_calls[('test-vm', 'mgmt.vm.firewall.Get', + None, None)] = \ + b'0\0action=accept dsthost=qubes-os.org\n' \ + b'action=drop proto=icmp\n' + self.app.expected_calls[('test-vm', 'mgmt.vm.firewall.Set', None, + b'action=accept dsthost=qubes-os.org\n' + b'action=drop proto=icmp\n' + b'action=accept dst4=192.168.0.0/24 comment=Allow LAN\n')] = \ + b'0\0' + qubesmgmt.tools.qvm_firewall.main( + ['test-vm', 'add', 'accept', '192.168.0.0/24', 'comment=Allow LAN'], + app=self.app + ) + + def test_011_add_before(self): + self.app.expected_calls[('test-vm', 'mgmt.vm.firewall.Get', + None, None)] = \ + b'0\0action=accept dsthost=qubes-os.org\n' \ + b'action=drop proto=icmp\n' + self.app.expected_calls[('test-vm', 'mgmt.vm.firewall.Set', None, + b'action=accept dsthost=qubes-os.org\n' + b'action=accept dst4=192.168.0.0/24 comment=Allow LAN\n' + b'action=drop proto=icmp\n')] = b'0\0' + qubesmgmt.tools.qvm_firewall.main( + ['test-vm', 'add', '--before', '1', 'accept', '192.168.0.0/24', + 'comment=Allow LAN'], + app=self.app + ) + + def test_020_del_number(self): + self.app.expected_calls[('test-vm', 'mgmt.vm.firewall.Get', + None, None)] = \ + b'0\0action=accept dsthost=qubes-os.org\n' \ + b'action=drop proto=icmp\n' + self.app.expected_calls[('test-vm', 'mgmt.vm.firewall.Set', None, + b'action=accept dsthost=qubes-os.org\n')] = b'0\0' + qubesmgmt.tools.qvm_firewall.main( + ['test-vm', 'del', '--rule-no', '1'], + app=self.app + ) + + def test_021_del_rule(self): + self.app.expected_calls[('test-vm', 'mgmt.vm.firewall.Get', + None, None)] = \ + b'0\0action=accept dsthost=qubes-os.org\n' \ + b'action=drop proto=icmp\n' + self.app.expected_calls[('test-vm', 'mgmt.vm.firewall.Set', None, + b'action=accept dsthost=qubes-os.org\n')] = b'0\0' + qubesmgmt.tools.qvm_firewall.main( + ['test-vm', 'del', 'drop', 'proto=icmp'], + app=self.app + ) + + def test_030_policy_get(self): + self.app.expected_calls[('test-vm', 'mgmt.vm.firewall.GetPolicy', + None, None)] = b'0\0accept' + with qubesmgmt.tests.tools.StdoutBuffer() as stdout: + qubesmgmt.tools.qvm_firewall.main( + ['test-vm', 'policy'], + app=self.app + ) + self.assertEqual(stdout.getvalue(), 'accept\n') + + def test_031_policy_set(self): + self.app.expected_calls[('test-vm', 'mgmt.vm.firewall.SetPolicy', + None, b'accept')] = b'0\0' + with qubesmgmt.tests.tools.StdoutBuffer() as stdout: + qubesmgmt.tools.qvm_firewall.main( + ['test-vm', 'policy', 'accept'], + app=self.app + ) + self.assertEqual(stdout.getvalue(), '') + diff --git a/qubesmgmt/tools/qvm_firewall.py b/qubesmgmt/tools/qvm_firewall.py new file mode 100644 index 0000000..73d7990 --- /dev/null +++ b/qubesmgmt/tools/qvm_firewall.py @@ -0,0 +1,186 @@ +# encoding=utf-8 +# +# The Qubes OS Project, http://www.qubes-os.org +# +# Copyright (C) 2016 Marek Marczykowski-Górecki +# +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU Lesser General Public License as published by +# the Free Software Foundation; either version 2.1 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +'''qvm-firewall tool''' + +import argparse +import sys +import itertools + +import qubesmgmt.exc +import qubesmgmt.firewall +import qubesmgmt.tools + + +class RuleAction(argparse.Action): + # pylint: disable=too-few-public-methods + '''Parser action for a single firewall rule. It accept syntax: + - [ [ [|]]] + - action= [specialtarget=dns] [dsthost=] + [proto=] [dstports=] [icmptype=] + + Or a mix of them. + ''' + def __call__(self, _parser, namespace, values, option_string=None): + if not values: + setattr(namespace, self.dest, None) + return + assumed_order = ['action', 'dsthost', 'proto', 'dstports', 'icmptype'] + allowed_opts = assumed_order + ['specialtarget', 'comment'] + kwargs = {} + for opt in values: + opt_elements = opt.split('=') + if len(opt_elements) == 2: + key, value = opt_elements + elif len(opt_elements) == 1: + key, value = assumed_order[0], opt + else: + raise argparse.ArgumentError(None, + 'invalid rule description: {}'.format(opt)) + if key not in allowed_opts: + raise argparse.ArgumentError(None, + 'Invalid rule element: {}'.format(opt)) + kwargs[key] = value + if key in assumed_order: + assumed_order.remove(key) + if key == 'proto' and value in ['tcp', 'udp']: + assumed_order.remove('icmptype') + elif key == 'proto' and value in ['icmp']: + assumed_order.remove('dstports') + rule = qubesmgmt.firewall.Rule(None, **kwargs) + setattr(namespace, self.dest, rule) + +parser = qubesmgmt.tools.QubesArgumentParser(vmname_nargs=1) + +action = parser.add_subparsers(dest='command', help='action to perform') + +action_add = action.add_parser('add', help='add rule') +action_add.add_argument('--before', type=int, default=None, + help='Add rule before rule with given number, instead of at the end') +action_add.add_argument('rule', nargs='+', action=RuleAction, + help='rule description') + +action_del = action.add_parser('del', help='remove rule') +action_del.add_argument('--rule-no', dest='rule_no', type=int, + action='store', help='rule number') +action_del.add_argument('rule', nargs='*', action=RuleAction, + help='rule to be removed') + +action_list = action.add_parser('list', help='list rules') + +action_policy = action.add_parser('policy', + help='get/set policy - default action') +action_policy.add_argument('policy', choices=['accept', 'drop'], + help='policy value', default=None, nargs='?') + +parser.add_argument('--reload', '-r', action='store_true', + help='force reloading rules even when unchanged') + +parser.add_argument('--raw', action='store_true', + help='output rules as raw strings, instead of nice table') + + +def rules_list_table(vm): + '''Print rules to stdout in human-readable form (table) + + :param vm: VM object + :return: None + ''' + header = ['NO', 'ACTION', 'HOST', 'PROTOCOL', 'PORT(S)', + 'SPECIAL TARGET', 'ICMP TYPE', 'COMMENT'] + rows = [] + for (rule, rule_no) in zip(vm.firewall.rules, itertools.count()): + row = [str(x) if x is not None else '-' for x in [ + rule_no, + rule.action, + rule.dsthost, + rule.proto, + rule.dstports, + rule.specialtarget, + rule.icmptype, + rule.comment, + ]] + rows.append(row) + qubesmgmt.tools.print_table([header] + rows) + + +def rules_list_raw(vm): + '''Print rules in machine-readable form (as specified in Admin API) + + :param vm: VM object + :return: None + ''' + for rule in vm.firewall.rules: + sys.stdout.write(rule.rule + '\n') + + +def rules_add(vm, args): + '''Add a rule defined by args.rule''' + if args.before is not None: + vm.firewall.rules.insert(args.before, args.rule) + else: + vm.firewall.rules.append(args.rule) + vm.firewall.save_rules() + + +def rules_del(vm, args): + '''Delete a rule according to args.rule/args.rule_no''' + if args.rule_no is not None: + vm.firewall.rules.pop(args.rule_no) + else: + vm.firewall.rules.remove(args.rule) + vm.firewall.save_rules() + + +def policy(vm, args): + '''Get/Set default action (policy)''' + if args.policy is not None: + vm.firewall.policy = args.policy + else: + print(vm.firewall.policy) + + +def main(args=None, app=None): + '''Main routine of :program:`qvm-firewall`.''' + try: + args = parser.parse_args(args, app=app) + vm = args.domains[0] + if args.command == 'add': + rules_add(vm, args) + elif args.command == 'del': + rules_del(vm, args) + elif args.command == 'policy': + policy(vm, args) + elif args.command == 'list': + if args.raw: + rules_list_raw(vm) + else: + rules_list_table(vm) + if args.reload: + vm.firewall.reload() + except qubesmgmt.exc.QubesException as e: + parser.print_error(str(e)) + return 1 + return 0 + + +if __name__ == '__main__': + sys.exit(main())