From 8afb425271e3b932d9f509787fb3e5c0a857c953 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Marek=20Marczykowski-G=C3=B3recki?=
Date: Tue, 27 Jun 2017 05:38:54 +0200
Subject: [PATCH] qubespolicy: allow non-default policy directory

This will allow to evaluate policy extracted from other system. And also ease tests.

QubesOS/qubes-issues#2873
---
 qubespolicy/__init__.py | 11 +++++++----
 qubespolicy/tests/__init__.py | 19 +++++++++----------
 2 files changed, 16 insertions(+), 14 deletions(-)

diff --git a/qubespolicy/__init__.py b/qubespolicy/__init__.py
index acba1dd6..7aaabc22 100755
--- a/qubespolicy/__init__.py
+++ b/qubespolicy/__init__.py
@@ -479,11 +479,14 @@ class Policy(object):
 
 '''
 
-def __init__(self, service):
-policy_file = os.path.join(POLICY_DIR, service)
+def __init__(self, service, policy_dir=POLICY_DIR):
+policy_file = os.path.join(policy_dir, service)
 if not os.path.exists(policy_file):
 # fallback to policy without specific argument set (if any)
-policy_file = os.path.join(POLICY_DIR, service.split('+')[0])
+policy_file = os.path.join(policy_dir, service.split('+')[0])
+
+#: policy storage directory
+self.policy_dir = policy_dir
 
 #: service name
 self.service = service
@@ -515,7 +518,7 @@ class Policy(object):
 include_path = line.split(':', 1)[1]
 # os.path.join will leave include_path unchanged if it's
 # already absolute
-include_path = os.path.join(POLICY_DIR, include_path)
+include_path = os.path.join(self.policy_dir, include_path)
 self.load_policy_file(include_path)
 else:
 self.policy_rules.append(PolicyRule(line, path, lineno))
diff --git a/qubespolicy/tests/__init__.py b/qubespolicy/tests/__init__.py
index fe5a42c0..6c3ee4a7 100644
--- a/qubespolicy/tests/__init__.py
+++ b/qubespolicy/tests/__init__.py
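Review bot: The new `policy_dir` parameter of `__init__` is left undocumented, although the class docstring closes on the hunk's first line and is the natural place for it; since this directory decides which rules govern an access decision, I would add `:param policy_dir: directory the policy file and its relative include paths are read from (default: POLICY_DIR)` there. Separately, `policy_dir=POLICY_DIR` binds the default once at import time, so patching `qubespolicy.POLICY_DIR` at runtime no longer affects it, which is why the tests hunk has to drop that patch; if a runtime override is meant to keep working, `def __init__(self, service, policy_dir=None):` followed by `if policy_dir is None: policy_dir = POLICY_DIR` preserves it without changing the interface. The rest of `__init__` is outside the hunk.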
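Review bot: Joining the include path onto `self.policy_dir` only confines relative includes: as the comment directly above the changed line states, an absolute `include_path` passes through `os.path.join` unchanged, and a relative one containing `..` also leaves the directory. For the use case named in the commit message, evaluating a policy tree extracted from another system, an absolute include in that tree is therefore read from the local filesystem, silently mixing the two policies, and nothing in the hunk would flag it. I would record this at the changed line, `# NOTE: absolute or ..-relative includes are not confined to self.policy_dir`, and, if such escapes are not intended for a non-default directory, compare `os.path.realpath(include_path)` against `os.path.realpath(self.policy_dir)` and raise instead of loading. The enclosing method starts and ends outside the hunk.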
@@ -523,7 +523,6 @@ class TC_10_PolicyAction(qubes.tests.QubesTestCase):
 [unittest.mock.call('test-vm2', 'internal.vm.Start')])
 self.assertEqual(mock_subprocess.mock_calls, [])
 
-@unittest.mock.patch('qubespolicy.POLICY_DIR', tmp_policy_dir)
 class TC_20_Policy(qubes.tests.QubesTestCase):
 
 def setUp(self):
@@ -543,7 +542,7 @@ class TC_20_Policy(qubes.tests.QubesTestCase):
 f.write('test-vm2 test-vm3 ask\n')
 f.write(' # comment \n')
 f.write('$anyvm $anyvm ask\n')
-policy = qubespolicy.Policy('test.service')
+policy = qubespolicy.Policy('test.service', tmp_policy_dir)
 self.assertEqual(policy.service, 'test.service')
 self.assertEqual(len(policy.policy_rules), 3)
 self.assertEqual(policy.policy_rules[0].source, 'test-vm1')
@@ -553,7 +552,7 @@ class TC_20_Policy(qubes.tests.QubesTestCase):
 
 def test_001_not_existent(self):
 with self.assertRaises(qubespolicy.AccessDenied):
-qubespolicy.Policy('no-such.service')
+qubespolicy.Policy('no-such.service', tmp_policy_dir)
 
 def test_002_include(self):
 with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f:
@@ -562,7 +561,7 @@ class TC_20_Policy(qubes.tests.QubesTestCase):
 f.write('$anyvm $anyvm deny\n')
 with open(os.path.join(tmp_policy_dir, 'test.service2'), 'w') as f:
 f.write('test-vm3 $default allow,target=test-vm2\n')
-policy = qubespolicy.Policy('test.service')
+policy = qubespolicy.Policy('test.service', tmp_policy_dir)
 self.assertEqual(policy.service, 'test.service')
 self.assertEqual(len(policy.policy_rules), 3)
 self.assertEqual(policy.policy_rules[0].source, 'test-vm1')
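Review bot: Dropping the class-level `unittest.mock.patch` of `qubespolicy.POLICY_DIR` means any test in `TC_20_Policy` that still constructs `Policy` without a directory argument now reads the host's real policy directory and passes or fails depending on what is installed there; every `Policy(...)` call visible in these hunks passes `tmp_policy_dir`, but the class body between the hunks is not shown, so the remaining calls should be checked before merging. Passing the directory as a keyword, `qubespolicy.Policy('test.service', policy_dir=tmp_policy_dir)`, would also make the intent explicit in each test and survive a later reordering of the constructor's parameters. The same remark applies to the later test hunks in this file.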
@@ -595,7 +594,7 @@ class TC_20_Policy(qubes.tests.QubesTestCase):
 f.write('test-vm2 $tag:tag2 allow\n')
 f.write('$type:AppVM $default allow,target=test-vm3\n')
 f.write('$tag:tag1 $type:AppVM allow\n')
-policy = qubespolicy.Policy('test.service')
+policy = qubespolicy.Policy('test.service', tmp_policy_dir)
 self.assertEqual(policy.find_matching_rule(
 system_info, 'test-vm1', 'test-vm2'), policy.policy_rules[0])
 self.assertEqual(policy.find_matching_rule(
@@ -631,7 +630,7 @@ class TC_20_Policy(qubes.tests.QubesTestCase):
 f.write('$tag:tag1 $type:AppVM allow\n')
 f.write('test-no-dvm $dispvm allow\n')
 f.write('test-standalone $dispvm allow\n')
-policy = qubespolicy.Policy('test.service')
+policy = qubespolicy.Policy('test.service', tmp_policy_dir)
 self.assertCountEqual(policy.collect_targets_for_ask(system_info,
 'test-vm1'),
 ['test-vm1', 'test-vm2', 'test-vm3', '$dispvm:test-vm3',
@@ -652,7 +651,7 @@ class TC_20_Policy(qubes.tests.QubesTestCase):
 with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f:
 f.write('test-vm1 test-vm2 allow\n')
 
-policy = qubespolicy.Policy('test.service')
+policy = qubespolicy.Policy('test.service', tmp_policy_dir)
 action = policy.evaluate(system_info, 'test-vm1', 'test-vm2')
 self.assertEqual(action.rule, policy.policy_rules[0])
 self.assertEqual(action.action, qubespolicy.Action.allow)
@@ -671,7 +670,7 @@ class TC_20_Policy(qubes.tests.QubesTestCase):
 f.write('$tag:tag2 $anyvm allow\n')
 f.write('test-vm3 $anyvm deny\n')
 
-policy = qubespolicy.Policy('test.service')
+policy = qubespolicy.Policy('test.service', tmp_policy_dir)
 action = policy.evaluate(system_info, 'test-vm1', '$default')
 self.assertEqual(action.rule, policy.policy_rules[1])
 self.assertEqual(action.action, qubespolicy.Action.allow)
@@ -693,7 +692,7 @@ class TC_20_Policy(qubes.tests.QubesTestCase):
 f.write('$tag:tag2 $anyvm allow\n')
 f.write('test-vm3 $anyvm deny\n')
 
-policy = qubespolicy.Policy('test.service')
+policy = qubespolicy.Policy('test.service', tmp_policy_dir)
 action = policy.evaluate(system_info, 'test-standalone', 'test-vm2')
 self.assertEqual(action.rule, policy.policy_rules[2])
 self.assertEqual(action.action, qubespolicy.Action.ask)
@@ -714,7 +713,7 @@ class TC_20_Policy(qubes.tests.QubesTestCase):
 f.write('$tag:tag2 $anyvm allow\n')
 f.write('test-vm3 $anyvm deny\n')
 
-policy = qubespolicy.Policy('test.service')
+policy = qubespolicy.Policy('test.service', tmp_policy_dir)
 action = policy.evaluate(system_info, 'test-standalone', 'test-vm3')
 self.assertEqual(action.rule, policy.policy_rules[3])
 self.assertEqual(action.action, qubespolicy.Action.ask)