Merge remote-tracking branch 'qubesos/pr/87'
* qubesos/pr/87: tests: check if qubes-firewall-user-script is called qubes-firewall: call firewall-user-script at service startup
This commit is contained in:
commit
06f0d865b4
@ -62,6 +62,13 @@ class FirewallWorker(object):
|
|||||||
'''Apply rules in given source address'''
|
'''Apply rules in given source address'''
|
||||||
raise NotImplementedError
|
raise NotImplementedError
|
||||||
|
|
||||||
|
def run_user_script(self):
|
||||||
|
'''Run user script in /rw/config'''
|
||||||
|
user_script_path = '/rw/config/qubes-firewall-user-script'
|
||||||
|
if os.path.isfile(user_script_path) and \
|
||||||
|
os.access(user_script_path, os.X_OK):
|
||||||
|
subprocess.call([user_script_path])
|
||||||
|
|
||||||
def read_rules(self, target):
|
def read_rules(self, target):
|
||||||
'''Read rules from QubesDB and return them as a list of dicts'''
|
'''Read rules from QubesDB and return them as a list of dicts'''
|
||||||
entries = self.qdb.multiread('/qubes-firewall/{}/'.format(target))
|
entries = self.qdb.multiread('/qubes-firewall/{}/'.format(target))
|
||||||
@ -133,6 +140,7 @@ class FirewallWorker(object):
|
|||||||
def main(self):
|
def main(self):
|
||||||
self.terminate_requested = False
|
self.terminate_requested = False
|
||||||
self.init()
|
self.init()
|
||||||
|
self.run_user_script()
|
||||||
# initial load
|
# initial load
|
||||||
for source_addr in self.list_targets():
|
for source_addr in self.list_targets():
|
||||||
self.handle_addr(source_addr)
|
self.handle_addr(source_addr)
|
||||||
|
@ -63,6 +63,7 @@ class FirewallWorker(qubesagent.firewall.FirewallWorker):
|
|||||||
|
|
||||||
self.init_called = False
|
self.init_called = False
|
||||||
self.cleanup_called = False
|
self.cleanup_called = False
|
||||||
|
self.user_script_called = False
|
||||||
self.rules = {}
|
self.rules = {}
|
||||||
|
|
||||||
def apply_rules(self, source_addr, rules):
|
def apply_rules(self, source_addr, rules):
|
||||||
@ -74,6 +75,9 @@ class FirewallWorker(qubesagent.firewall.FirewallWorker):
|
|||||||
def init(self):
|
def init(self):
|
||||||
self.cleanup_called = True
|
self.cleanup_called = True
|
||||||
|
|
||||||
|
def run_user_script(self):
|
||||||
|
self.user_script_called = True
|
||||||
|
|
||||||
|
|
||||||
class IptablesWorker(qubesagent.firewall.IptablesWorker):
|
class IptablesWorker(qubesagent.firewall.IptablesWorker):
|
||||||
'''Override methods actually modifying system state to only log what
|
'''Override methods actually modifying system state to only log what
|
||||||
@ -522,10 +526,31 @@ class TestFirewallWorker(TestCase):
|
|||||||
self.obj.handle_addr('10.137.0.4')
|
self.obj.handle_addr('10.137.0.4')
|
||||||
self.assertEqual(self.obj.rules['10.137.0.4'], [{'action': 'drop'}])
|
self.assertEqual(self.obj.rules['10.137.0.4'], [{'action': 'drop'}])
|
||||||
|
|
||||||
|
@patch('os.path.isfile')
|
||||||
|
@patch('os.access')
|
||||||
|
@patch('subprocess.call')
|
||||||
|
def test_run_user_script(self, mock_subprocess, mock_os_access,
|
||||||
|
mock_os_path_isfile):
|
||||||
|
mock_os_path_isfile.return_value = False
|
||||||
|
mock_os_access.return_value = False
|
||||||
|
super(FirewallWorker, self.obj).run_user_script()
|
||||||
|
self.assertFalse(mock_subprocess.called)
|
||||||
|
|
||||||
|
mock_os_path_isfile.return_value = True
|
||||||
|
mock_os_access.return_value = False
|
||||||
|
super(FirewallWorker, self.obj).run_user_script()
|
||||||
|
self.assertFalse(mock_subprocess.called)
|
||||||
|
|
||||||
|
mock_os_path_isfile.return_value = True
|
||||||
|
mock_os_access.return_value = True
|
||||||
|
super(FirewallWorker, self.obj).run_user_script()
|
||||||
|
mock_subprocess.assert_called_once_with(
|
||||||
|
['/rw/config/qubes-firewall-user-script'])
|
||||||
|
|
||||||
def test_main(self):
|
def test_main(self):
|
||||||
self.obj.main()
|
self.obj.main()
|
||||||
self.assertTrue(self.obj.init_called)
|
self.assertTrue(self.obj.init_called)
|
||||||
self.assertTrue(self.obj.cleanup_called)
|
self.assertTrue(self.obj.cleanup_called)
|
||||||
|
self.assertTrue(self.obj.user_script_called)
|
||||||
self.assertEqual(set(self.obj.rules.keys()), self.obj.list_targets())
|
self.assertEqual(set(self.obj.rules.keys()), self.obj.list_targets())
|
||||||
# rules content were already tested
|
# rules content were already tested
|
||||||
|
Loading…
Reference in New Issue
Block a user