From 6c33652ed45a6abac14ed3b34100dc54c3f5cf9a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marek=20Marczykowski-G=C3=B3recki?= Date: Mon, 5 Feb 2018 16:27:52 +0100 Subject: [PATCH 1/2] qubes-firewall: call firewall-user-script at service startup Call it just after creating base chains in iptables/nftables. This allow the user to modify how those rules are plugged in, add custom rules at beginning/end etc. Fixes QubesOS/qubes-issues#3260 --- qubesagent/firewall.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/qubesagent/firewall.py b/qubesagent/firewall.py index 7e36f7f..6145804 100755 --- a/qubesagent/firewall.py +++ b/qubesagent/firewall.py @@ -62,6 +62,13 @@ class FirewallWorker(object): '''Apply rules in given source address''' raise NotImplementedError + def run_user_script(self): + '''Run user script in /rw/config''' + user_script_path = '/rw/config/qubes-firewall-user-script' + if os.path.isfile(user_script_path) and \ + os.access(user_script_path, os.X_OK): + subprocess.call([user_script_path]) + def read_rules(self, target): '''Read rules from QubesDB and return them as a list of dicts''' entries = self.qdb.multiread('/qubes-firewall/{}/'.format(target)) @@ -133,6 +140,7 @@ class FirewallWorker(object): def main(self): self.terminate_requested = False self.init() + self.run_user_script() # initial load for source_addr in self.list_targets(): self.handle_addr(source_addr) From 6b48d79d8cca76528ddcf1ade3f84ef1370c9776 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marek=20Marczykowski-G=C3=B3recki?= Date: Mon, 5 Feb 2018 18:17:29 +0100 Subject: [PATCH 2/2] tests: check if qubes-firewall-user-script is called QubesOS/qubes-issues#3260 --- qubesagent/test_firewall.py | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/qubesagent/test_firewall.py b/qubesagent/test_firewall.py index f122eb6..7270afd 100644 --- a/qubesagent/test_firewall.py +++ b/qubesagent/test_firewall.py @@ -63,6 +63,7 @@ class FirewallWorker(qubesagent.firewall.FirewallWorker): self.init_called = False self.cleanup_called = False + self.user_script_called = False self.rules = {} def apply_rules(self, source_addr, rules): @@ -74,6 +75,9 @@ class FirewallWorker(qubesagent.firewall.FirewallWorker): def init(self): self.cleanup_called = True + def run_user_script(self): + self.user_script_called = True + class IptablesWorker(qubesagent.firewall.IptablesWorker): '''Override methods actually modifying system state to only log what @@ -522,10 +526,31 @@ class TestFirewallWorker(TestCase): self.obj.handle_addr('10.137.0.4') self.assertEqual(self.obj.rules['10.137.0.4'], [{'action': 'drop'}]) + @patch('os.path.isfile') + @patch('os.access') + @patch('subprocess.call') + def test_run_user_script(self, mock_subprocess, mock_os_access, + mock_os_path_isfile): + mock_os_path_isfile.return_value = False + mock_os_access.return_value = False + super(FirewallWorker, self.obj).run_user_script() + self.assertFalse(mock_subprocess.called) + + mock_os_path_isfile.return_value = True + mock_os_access.return_value = False + super(FirewallWorker, self.obj).run_user_script() + self.assertFalse(mock_subprocess.called) + + mock_os_path_isfile.return_value = True + mock_os_access.return_value = True + super(FirewallWorker, self.obj).run_user_script() + mock_subprocess.assert_called_once_with( + ['/rw/config/qubes-firewall-user-script']) def test_main(self): self.obj.main() self.assertTrue(self.obj.init_called) self.assertTrue(self.obj.cleanup_called) + self.assertTrue(self.obj.user_script_called) self.assertEqual(set(self.obj.rules.keys()), self.obj.list_targets()) # rules content were already tested