From 564408eb3f5dc6ead8f86c5def7ea8a7cddff3e7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marek=20Marczykowski-G=C3=B3recki?= Date: Mon, 20 Mar 2017 22:54:03 +0100 Subject: [PATCH] tests: qubespolicy tests Fixes QubesOS/qubes-issues#2460 --- qubes/tests/__init__.py | 1 + qubespolicy/tests/__init__.py | 765 ++++++++++++++++++++++++++++++++++ 2 files changed, 766 insertions(+) create mode 100644 qubespolicy/tests/__init__.py diff --git a/qubes/tests/__init__.py b/qubes/tests/__init__.py index 7b0b2684..2e46db02 100644 --- a/qubes/tests/__init__.py +++ b/qubes/tests/__init__.py @@ -909,6 +909,7 @@ def load_tests(loader, tests, pattern): # pylint: disable=unused-argument 'qubes.tests.tools.qvm_device', 'qubes.tests.tools.qvm_firewall', 'qubes.tests.tools.qvm_ls', + 'qubespolicy.tests', ): tests.addTests(loader.loadTestsFromName(modname)) diff --git a/qubespolicy/tests/__init__.py b/qubespolicy/tests/__init__.py new file mode 100644 index 00000000..9cca2726 --- /dev/null +++ b/qubespolicy/tests/__init__.py @@ -0,0 +1,765 @@ +# -*- encoding: utf8 -*- +# +# The Qubes OS Project, http://www.qubes-os.org +# +# Copyright (C) 2017 Marek Marczykowski-Górecki +# +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, see . +import os +import socket +import unittest.mock + +import shutil + +import qubes.tests +import qubespolicy + +tmp_policy_dir = '/tmp/policy' + +system_info = { + 'domains': { + 'dom0': { + 'tags': [], + 'type': 'AdminVM', + 'default_dispvm': 'default-dvm', + 'dispvm_allowed': False, + }, + 'test-vm1': { + 'tags': ['tag1', 'tag2'], + 'type': 'AppVM', + 'default_dispvm': 'default-dvm', + 'dispvm_allowed': False, + }, + 'test-vm2': { + 'tags': ['tag2'], + 'type': 'AppVM', + 'default_dispvm': 'default-dvm', + 'dispvm_allowed': False, + }, + 'test-vm3': { + 'tags': [], + 'type': 'AppVM', + 'default_dispvm': 'default-dvm', + 'dispvm_allowed': True, + }, + 'default-dvm': { + 'tags': [], + 'type': 'AppVM', + 'default_dispvm': 'default-dvm', + 'dispvm_allowed': True, + }, + 'test-invalid-dvm': { + 'tags': ['tag1', 'tag2'], + 'type': 'AppVM', + 'default_dispvm': 'test-vm1', + 'dispvm_allowed': False, + }, + 'test-no-dvm': { + 'tags': ['tag1', 'tag2'], + 'type': 'AppVM', + 'default_dispvm': None, + 'dispvm_allowed': False, + }, + 'test-template': { + 'tags': ['tag1', 'tag2'], + 'type': 'TemplateVM', + 'default_dispvm': 'default-dvm', + 'dispvm_allowed': False, + }, + 'test-standalone': { + 'tags': ['tag1', 'tag2'], + 'type': 'StandaloneVM', + 'default_dispvm': 'default-dvm', + 'dispvm_allowed': False, + }, + } +} + + +class TC_00_PolicyRule(qubes.tests.QubesTestCase): + def test_000_verify_target_value(self): + self.assertTrue( + qubespolicy.verify_target_value(system_info, 'test-vm1')) + self.assertTrue( + qubespolicy.verify_target_value(system_info, 'default-dvm')) + self.assertTrue( + qubespolicy.verify_target_value(system_info, '$dispvm')) + self.assertTrue( + qubespolicy.verify_target_value(system_info, '$dispvm:default-dvm')) + self.assertTrue( + qubespolicy.verify_target_value(system_info, 'test-template')) + self.assertTrue( + qubespolicy.verify_target_value(system_info, 'test-standalone')) + self.assertFalse( + qubespolicy.verify_target_value(system_info, 'no-such-vm')) + self.assertFalse( + qubespolicy.verify_target_value(system_info, + '$dispvm:test-invalid-dvm')) + self.assertFalse( + qubespolicy.verify_target_value(system_info, '$dispvm:test-vm1')) + self.assertFalse( + qubespolicy.verify_target_value(system_info, '')) + self.assertFalse( + qubespolicy.verify_target_value(system_info, '$default')) + self.assertFalse( + qubespolicy.verify_target_value(system_info, '$anyvm')) + self.assertFalse( + qubespolicy.verify_target_value(system_info, '$tag:tag1')) + self.assertFalse( + qubespolicy.verify_target_value(system_info, '$invalid')) + + def test_010_verify_special_value(self): + self.assertTrue(qubespolicy.verify_special_value('$tag:tag', + for_target=False)) + self.assertTrue(qubespolicy.verify_special_value('$tag:other-tag', + for_target=False)) + self.assertTrue(qubespolicy.verify_special_value('$type:AppVM', + for_target=False)) + self.assertFalse(qubespolicy.verify_special_value('$default', + for_target=False)) + self.assertFalse(qubespolicy.verify_special_value('$dispvm', + for_target=False)) + self.assertFalse(qubespolicy.verify_special_value('$dispvm:some-vm', + for_target=False)) + self.assertFalse(qubespolicy.verify_special_value('$invalid', + for_target=False)) + self.assertFalse(qubespolicy.verify_special_value('vm-name', + for_target=False)) + self.assertFalse(qubespolicy.verify_special_value('$tag:', + for_target=False)) + self.assertFalse(qubespolicy.verify_special_value('$type:', + for_target=False)) + + def test_020_line_simple(self): + line = qubespolicy.PolicyRule('$anyvm $anyvm ask', 'filename', 12) + self.assertEqual(line.filename, 'filename') + self.assertEqual(line.lineno, 12) + self.assertEqual(line.action, qubespolicy.Action.ask) + self.assertEqual(line.source, '$anyvm') + self.assertEqual(line.target, '$anyvm') + self.assertEqual(line.full_action, 'ask') + self.assertIsNone(line.override_target) + self.assertIsNone(line.override_user) + self.assertIsNone(line.default_target) + + def test_021_line_simple(self): + line = qubespolicy.PolicyRule( + '$tag:tag1 $type:AppVM ask,target=test-vm2,user=user', + 'filename', 12) + self.assertEqual(line.filename, 'filename') + self.assertEqual(line.lineno, 12) + self.assertEqual(line.action, qubespolicy.Action.ask) + self.assertEqual(line.source, '$tag:tag1') + self.assertEqual(line.target, '$type:AppVM') + self.assertEqual(line.full_action, 'ask,target=test-vm2,user=user') + self.assertEqual(line.override_target, 'test-vm2') + self.assertEqual(line.override_user, 'user') + self.assertIsNone(line.default_target) + + def test_022_line_simple(self): + line = qubespolicy.PolicyRule( + '$anyvm $default allow,target=$dispvm:test-vm2', + 'filename', 12) + self.assertEqual(line.filename, 'filename') + self.assertEqual(line.lineno, 12) + self.assertEqual(line.action, qubespolicy.Action.allow) + self.assertEqual(line.source, '$anyvm') + self.assertEqual(line.target, '$default') + self.assertEqual(line.full_action, 'allow,target=$dispvm:test-vm2') + self.assertEqual(line.override_target, '$dispvm:test-vm2') + self.assertIsNone(line.override_user) + self.assertIsNone(line.default_target) + + def test_023_line_simple(self): + line = qubespolicy.PolicyRule( + '$anyvm $default ask,default_target=test-vm1', + 'filename', 12) + self.assertEqual(line.filename, 'filename') + self.assertEqual(line.lineno, 12) + self.assertEqual(line.action, qubespolicy.Action.ask) + self.assertEqual(line.source, '$anyvm') + self.assertEqual(line.target, '$default') + self.assertEqual(line.full_action, 'ask,default_target=test-vm1') + self.assertIsNone(line.override_target) + self.assertIsNone(line.override_user) + self.assertEqual(line.default_target, 'test-vm1') + + def test_030_line_invalid(self): + invalid_lines = [ + '$dispvm $default allow', # $dispvm can't be a source + '$default $default allow', # $default can't be a source + '$anyvm $default deny,target=test-vm1', # target= used with deny + '$anyvm $anyvm deny,default_target=test-vm1', # default_target= + # with deny + '$anyvm $anyvm deny,user=user', # user= with deny + '$anyvm $anyvm invalid', # invalid action + '$anyvm $anyvm allow,invalid=xx', # invalid option + '$anyvm $anyvm', # missing action + '$anyvm $anyvm allow,default_target=test-vm1', # default_target= + # with allow + '$invalid $anyvm allow', # invalid source + '$anyvm $invalid deny', # invalid target + '', # empty line + '$anyvm $anyvm allow extra', # trailing words + '$anyvm $default allow', # $default allow without target= + ] + for line in invalid_lines: + with self.subTest(line): + with self.assertRaises(qubespolicy.PolicySyntaxError): + qubespolicy.PolicyRule(line, 'filename', 12) + + def test_040_match_single(self): + is_match_single = qubespolicy.PolicyRule.is_match_single + self.assertTrue(is_match_single(system_info, '$anyvm', 'test-vm1')) + self.assertTrue(is_match_single(system_info, '$anyvm', '$default')) + self.assertTrue(is_match_single(system_info, '$anyvm', '')) + self.assertTrue(is_match_single(system_info, '$default', '')) + self.assertTrue(is_match_single(system_info, '$default', '$default')) + self.assertTrue(is_match_single(system_info, '$tag:tag1', 'test-vm1')) + self.assertTrue(is_match_single(system_info, '$type:AppVM', 'test-vm1')) + self.assertTrue(is_match_single(system_info, + '$type:TemplateVM', 'test-template')) + self.assertTrue(is_match_single(system_info, '$anyvm', '$dispvm')) + self.assertTrue(is_match_single(system_info, + '$anyvm', '$dispvm:default-dvm')) + self.assertTrue(is_match_single(system_info, '$dispvm', '$dispvm')) + self.assertTrue(is_match_single(system_info, 'dom0', 'dom0')) + self.assertTrue(is_match_single(system_info, + '$dispvm:default-dvm', '$dispvm:default-dvm')) + self.assertTrue(is_match_single(system_info, '$anyvm', '$dispvm')) + self.assertTrue(is_match_single(system_info, '$anyvm', 'test-vm1')) + self.assertTrue(is_match_single(system_info, '$anyvm', 'test-vm1')) + self.assertTrue(is_match_single(system_info, '$anyvm', 'test-vm1')) + + self.assertFalse(is_match_single(system_info, '$default', 'test-vm1')) + self.assertFalse(is_match_single(system_info, '$tag:tag1', 'test-vm3')) + self.assertFalse(is_match_single(system_info, '$anyvm', 'no-such-vm')) + # test-vm1.dispvm_allowed=False + self.assertFalse(is_match_single(system_info, + '$anyvm', '$dispvm:test-vm1')) + # test-vm1.dispvm_allowed=False + self.assertFalse(is_match_single(system_info, + '$dispvm:test-vm1', '$dispvm:test-vm1')) + self.assertFalse(is_match_single(system_info, '$anyvm', 'dom0')) + self.assertFalse(is_match_single(system_info, '$tag:tag1', 'dom0')) + self.assertFalse(is_match_single(system_info, '$anyvm', '$tag:tag1')) + self.assertFalse(is_match_single(system_info, '$anyvm', '$type:AppVM')) + self.assertFalse(is_match_single(system_info, '$anyvm', '$invalid')) + self.assertFalse(is_match_single(system_info, '$invalid', '$invalid')) + self.assertFalse(is_match_single(system_info, '$anyvm', 'no-such-vm')) + self.assertFalse(is_match_single(system_info, + 'no-such-vm', 'no-such-vm')) + self.assertFalse(is_match_single(system_info, '$dispvm', 'test-vm1')) + self.assertFalse(is_match_single(system_info, '$dispvm', 'default-dvm')) + self.assertFalse(is_match_single(system_info, + '$dispvm:default-dvm', 'default-dvm')) + self.assertFalse(is_match_single(system_info, '$anyvm', 'test-vm1\n')) + self.assertFalse(is_match_single(system_info, '$anyvm', 'test-vm1 ')) + + def test_050_match(self): + line = qubespolicy.PolicyRule('$anyvm $anyvm allow') + self.assertTrue(line.is_match(system_info, 'test-vm1', 'test-vm2')) + line = qubespolicy.PolicyRule('$anyvm $anyvm allow') + self.assertFalse(line.is_match(system_info, 'no-such-vm', 'test-vm2')) + line = qubespolicy.PolicyRule('$anyvm $anyvm allow') + self.assertFalse(line.is_match(system_info, 'test-vm1', 'no-such-vm')) + + def test_060_expand_target(self): + lines = { + '$anyvm $anyvm allow': ['test-vm1', 'test-vm2', 'test-vm3', + '$dispvm:test-vm3', + 'default-dvm', '$dispvm:default-dvm', 'test-invalid-dvm', + 'test-no-dvm', 'test-template', 'test-standalone', '$dispvm'], + '$anyvm $dispvm allow': ['$dispvm'], + '$anyvm $dispvm:default-dvm allow': ['$dispvm:default-dvm'], + # no DispVM from test-vm1 allowed + '$anyvm $dispvm:test-vm1 allow': [], + '$anyvm test-vm1 allow': ['test-vm1'], + '$anyvm $type:AppVM allow': ['test-vm1', 'test-vm2', 'test-vm3', + 'default-dvm', 'test-invalid-dvm', 'test-no-dvm'], + '$anyvm $type:TemplateVM allow': ['test-template'], + '$anyvm $tag:tag1 allow': ['test-vm1', 'test-invalid-dvm', + 'test-template', 'test-standalone', 'test-no-dvm'], + '$anyvm $tag:tag2 allow': ['test-vm1', 'test-vm2', + 'test-invalid-dvm', 'test-template', 'test-standalone', + 'test-no-dvm'], + '$anyvm $tag:no-such-tag allow': [], + } + for line in lines: + with self.subTest(line): + policy_line = qubespolicy.PolicyRule(line) + self.assertCountEqual(list(policy_line.expand_target(system_info)), + lines[line]) + + def test_070_expand_override_target(self): + line = qubespolicy.PolicyRule( + '$anyvm $anyvm allow,target=test-vm2') + self.assertEqual( + line.expand_override_target(system_info, 'test-vm1'), + 'test-vm2') + + def test_071_expand_override_target_dispvm(self): + line = qubespolicy.PolicyRule( + '$anyvm $anyvm allow,target=$dispvm') + self.assertEqual( + line.expand_override_target(system_info, 'test-vm1'), + '$dispvm:default-dvm') + + def test_072_expand_override_target_dispvm_specific(self): + line = qubespolicy.PolicyRule( + '$anyvm $anyvm allow,target=$dispvm:test-vm3') + self.assertEqual( + line.expand_override_target(system_info, 'test-vm1'), + '$dispvm:test-vm3') + + def test_073_expand_override_target_dispvm_none(self): + line = qubespolicy.PolicyRule( + '$anyvm $anyvm allow,target=$dispvm') + self.assertEqual( + line.expand_override_target(system_info, 'test-no-dvm'), + None) + + def test_074_expand_override_target_dom0(self): + line = qubespolicy.PolicyRule( + '$anyvm $anyvm allow,target=dom0') + self.assertEqual( + line.expand_override_target(system_info, 'test-no-dvm'), + 'dom0') + + +class TC_10_PolicyAction(qubes.tests.QubesTestCase): + def test_000_init(self): + rule = qubespolicy.PolicyRule('$anyvm $anyvm deny') + with self.assertRaises(qubespolicy.AccessDenied): + qubespolicy.PolicyAction('test.service', 'test-vm1', 'test-vm2', + rule, 'test-vm2') + + def test_001_init(self): + rule = qubespolicy.PolicyRule('$anyvm $anyvm ask') + action = qubespolicy.PolicyAction('test.service', 'test-vm1', + None, rule, 'test-vm2', ['test-vm2', 'test-vm3']) + self.assertEqual(action.service, 'test.service') + self.assertEqual(action.source, 'test-vm1') + self.assertIsNone(action.target) + self.assertEqual(action.original_target, 'test-vm2') + self.assertEqual(action.targets_for_ask, ['test-vm2', 'test-vm3']) + self.assertEqual(action.rule, rule) + self.assertEqual(action.action, qubespolicy.Action.ask) + + def test_002_init_invalid(self): + rule_ask = qubespolicy.PolicyRule('$anyvm $anyvm ask') + rule_allow = qubespolicy.PolicyRule('$anyvm $anyvm allow') + with self.assertRaises(AssertionError): + qubespolicy.PolicyAction('test.service', 'test-vm1', + None, rule_allow, 'test-vm2', None) + with self.assertRaises(AssertionError): + qubespolicy.PolicyAction('test.service', 'test-vm1', + 'test-vm2', rule_allow, 'test-vm2', ['test-vm2', 'test-vm3']) + + with self.assertRaises(AssertionError): + qubespolicy.PolicyAction('test.service', 'test-vm1', + None, rule_ask, 'test-vm2', None) + + def test_003_init_default_target(self): + rule_ask = qubespolicy.PolicyRule('$anyvm $anyvm ask') + + action = qubespolicy.PolicyAction('test.service', 'test-vm1', + 'test-vm1', rule_ask, 'test-vm2', ['test-vm2']) + self.assertIsNone(action.target) + + action = qubespolicy.PolicyAction('test.service', 'test-vm1', + 'test-vm2', rule_ask, 'test-vm2', ['test-vm2']) + self.assertEqual(action.target, 'test-vm2') + + def test_010_handle_user_response(self): + rule = qubespolicy.PolicyRule('$anyvm $anyvm ask') + action = qubespolicy.PolicyAction('test.service', 'test-vm1', + None, rule, 'test-vm2', ['test-vm2', 'test-vm3']) + action.handle_user_response(True, 'test-vm2') + self.assertEqual(action.action, qubespolicy.Action.allow) + self.assertEqual(action.target, 'test-vm2') + + def test_011_handle_user_response(self): + rule = qubespolicy.PolicyRule('$anyvm $anyvm ask') + action = qubespolicy.PolicyAction('test.service', 'test-vm1', + None, rule, 'test-vm2', ['test-vm2', 'test-vm3']) + with self.assertRaises(AssertionError): + action.handle_user_response(True, 'test-no-dvm') + + def test_012_handle_user_response(self): + rule = qubespolicy.PolicyRule('$anyvm $anyvm ask') + action = qubespolicy.PolicyAction('test.service', 'test-vm1', + None, rule, 'test-vm2', ['test-vm2', 'test-vm3']) + with self.assertRaises(qubespolicy.AccessDenied): + action.handle_user_response(False, None) + self.assertEqual(action.action, qubespolicy.Action.deny) + + @unittest.mock.patch('qubespolicy.qubesd_call') + @unittest.mock.patch('subprocess.call') + def test_020_execute(self, mock_subprocess, mock_qubesd_call): + rule = qubespolicy.PolicyRule('$anyvm $anyvm allow') + action = qubespolicy.PolicyAction('test.service', 'test-vm1', + 'test-vm2', rule, 'test-vm2') + action.execute('some-ident') + self.assertEqual(mock_qubesd_call.mock_calls, + [unittest.mock.call('test-vm2', 'mgmtinternal.vm.Start')]) + self.assertEqual(mock_subprocess.mock_calls, + [unittest.mock.call([qubespolicy.QREXEC_CLIENT, '-d', 'test-vm2', + '-c', 'some-ident', 'DEFAULT:QUBESRPC test.service test-vm1'])]) + + @unittest.mock.patch('qubespolicy.qubesd_call') + @unittest.mock.patch('subprocess.call') + def test_021_execute_dom0(self, mock_subprocess, mock_qubesd_call): + rule = qubespolicy.PolicyRule('$anyvm dom0 allow') + action = qubespolicy.PolicyAction('test.service', 'test-vm1', + 'dom0', rule, 'dom0') + action.execute('some-ident') + self.assertEqual(mock_qubesd_call.mock_calls, + [unittest.mock.call('dom0', 'mgmtinternal.vm.Start')]) + self.assertEqual(mock_subprocess.mock_calls, + [unittest.mock.call([qubespolicy.QREXEC_CLIENT, '-d', 'dom0', + '-c', 'some-ident', + qubespolicy.QUBES_RPC_MULTIPLEXER_PATH + + ' test.service test-vm1 dom0'])]) + + @unittest.mock.patch('qubespolicy.qubesd_call') + @unittest.mock.patch('subprocess.call') + def test_022_execute_dispvm(self, mock_subprocess, mock_qubesd_call): + rule = qubespolicy.PolicyRule('$anyvm $dispvm:default-dvm allow') + action = qubespolicy.PolicyAction('test.service', 'test-vm1', + '$dispvm:default-dvm', rule, '$dispvm:default-dvm') + mock_qubesd_call.side_effect = (lambda target, call: + b'dispvm-name' if call == 'mgmtinternal.vm.Create.DispVM' else + unittest.mock.DEFAULT) + action.execute('some-ident') + self.assertEqual(mock_qubesd_call.mock_calls, + [unittest.mock.call('default-dvm', 'mgmtinternal.vm.Create.DispVM'), + unittest.mock.call('dispvm-name', 'mgmtinternal.vm.Start'), + unittest.mock.call('dispvm-name', + 'mgmtinternal.vm.CleanupDispVM')]) + self.assertEqual(mock_subprocess.mock_calls, + [unittest.mock.call([qubespolicy.QREXEC_CLIENT, '-d', 'dispvm-name', + '-c', 'some-ident', '-W', + 'DEFAULT:QUBESRPC test.service test-vm1'])]) + + @unittest.mock.patch('qubespolicy.qubesd_call') + @unittest.mock.patch('subprocess.call') + def test_023_execute_already_running(self, mock_subprocess, + mock_qubesd_call): + rule = qubespolicy.PolicyRule('$anyvm $anyvm allow') + action = qubespolicy.PolicyAction('test.service', 'test-vm1', + 'test-vm2', rule, 'test-vm2') + mock_qubesd_call.side_effect = \ + qubespolicy.QubesMgmtException('QubesVMNotHaltedError') + action.execute('some-ident') + self.assertEqual(mock_qubesd_call.mock_calls, + [unittest.mock.call('test-vm2', 'mgmtinternal.vm.Start')]) + self.assertEqual(mock_subprocess.mock_calls, + [unittest.mock.call([qubespolicy.QREXEC_CLIENT, '-d', 'test-vm2', + '-c', 'some-ident', 'DEFAULT:QUBESRPC test.service test-vm1'])]) + + @unittest.mock.patch('qubespolicy.qubesd_call') + @unittest.mock.patch('subprocess.call') + def test_024_execute_startup_error(self, mock_subprocess, + mock_qubesd_call): + rule = qubespolicy.PolicyRule('$anyvm $anyvm allow') + action = qubespolicy.PolicyAction('test.service', 'test-vm1', + 'test-vm2', rule, 'test-vm2') + mock_qubesd_call.side_effect = \ + qubespolicy.QubesMgmtException('QubesVMError') + with self.assertRaises(qubespolicy.QubesMgmtException): + action.execute('some-ident') + self.assertEqual(mock_qubesd_call.mock_calls, + [unittest.mock.call('test-vm2', 'mgmtinternal.vm.Start')]) + self.assertEqual(mock_subprocess.mock_calls, []) + +@unittest.mock.patch('qubespolicy.POLICY_DIR', tmp_policy_dir) +class TC_20_Policy(qubes.tests.QubesTestCase): + + def setUp(self): + super(TC_20_Policy, self).setUp() + if not os.path.exists(tmp_policy_dir): + os.mkdir(tmp_policy_dir) + + def tearDown(self): + shutil.rmtree(tmp_policy_dir) + super(TC_20_Policy, self).tearDown() + + def test_000_load(self): + with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f: + f.write('test-vm1 test-vm2 allow\n') + f.write('\n') + f.write('# comment\n') + f.write('test-vm2 test-vm3 ask\n') + f.write(' # comment \n') + f.write('$anyvm $anyvm ask\n') + policy = qubespolicy.Policy('test.service') + self.assertEqual(policy.service, 'test.service') + self.assertEqual(len(policy.policy_rules), 3) + self.assertEqual(policy.policy_rules[0].source, 'test-vm1') + self.assertEqual(policy.policy_rules[0].target, 'test-vm2') + self.assertEqual(policy.policy_rules[0].action, + qubespolicy.Action.allow) + + def test_001_not_existent(self): + with self.assertRaises(qubespolicy.AccessDenied): + qubespolicy.Policy('no-such.service') + + def test_002_include(self): + with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f: + f.write('test-vm1 test-vm2 allow\n') + f.write('$include:test.service2\n') + f.write('$anyvm $anyvm deny\n') + with open(os.path.join(tmp_policy_dir, 'test.service2'), 'w') as f: + f.write('test-vm3 $default allow,target=test-vm2\n') + policy = qubespolicy.Policy('test.service') + self.assertEqual(policy.service, 'test.service') + self.assertEqual(len(policy.policy_rules), 3) + self.assertEqual(policy.policy_rules[0].source, 'test-vm1') + self.assertEqual(policy.policy_rules[0].target, 'test-vm2') + self.assertEqual(policy.policy_rules[0].action, + qubespolicy.Action.allow) + self.assertEqual(policy.policy_rules[0].filename, + tmp_policy_dir + '/test.service') + self.assertEqual(policy.policy_rules[0].lineno, 1) + self.assertEqual(policy.policy_rules[1].source, 'test-vm3') + self.assertEqual(policy.policy_rules[1].target, '$default') + self.assertEqual(policy.policy_rules[1].action, + qubespolicy.Action.allow) + self.assertEqual(policy.policy_rules[1].filename, + tmp_policy_dir + '/test.service2') + self.assertEqual(policy.policy_rules[1].lineno, 1) + self.assertEqual(policy.policy_rules[2].source, '$anyvm') + self.assertEqual(policy.policy_rules[2].target, '$anyvm') + self.assertEqual(policy.policy_rules[2].action, + qubespolicy.Action.deny) + self.assertEqual(policy.policy_rules[2].filename, + tmp_policy_dir + '/test.service') + self.assertEqual(policy.policy_rules[2].lineno, 3) + + def test_010_find_rule(self): + with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f: + f.write('test-vm1 test-vm2 allow\n') + f.write('test-vm1 $anyvm ask\n') + f.write('test-vm2 $tag:tag1 deny\n') + f.write('test-vm2 $tag:tag2 allow\n') + f.write('$type:AppVM $default allow,target=test-vm3\n') + f.write('$tag:tag1 $type:AppVM allow\n') + policy = qubespolicy.Policy('test.service') + self.assertEqual(policy.find_matching_rule( + system_info, 'test-vm1', 'test-vm2'), policy.policy_rules[0]) + self.assertEqual(policy.find_matching_rule( + system_info, 'test-vm1', 'test-vm3'), policy.policy_rules[1]) + self.assertEqual(policy.find_matching_rule( + system_info, 'test-vm2', 'test-vm2'), policy.policy_rules[3]) + self.assertEqual(policy.find_matching_rule( + system_info, 'test-vm2', 'test-no-dvm'), policy.policy_rules[2]) + # $anyvm matches $default too + self.assertEqual(policy.find_matching_rule( + system_info, 'test-vm1', ''), policy.policy_rules[1]) + self.assertEqual(policy.find_matching_rule( + system_info, 'test-vm2', ''), policy.policy_rules[4]) + self.assertEqual(policy.find_matching_rule( + system_info, 'test-vm2', '$default'), policy.policy_rules[4]) + self.assertEqual(policy.find_matching_rule( + system_info, 'test-no-dvm', 'test-vm3'), policy.policy_rules[5]) + with self.assertRaises(qubespolicy.AccessDenied): + policy.find_matching_rule( + system_info, 'test-no-dvm', 'test-standalone') + with self.assertRaises(qubespolicy.AccessDenied): + policy.find_matching_rule( + system_info, 'test-standalone', '$default') + + def test_020_collect_targets_for_ask(self): + with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f: + f.write('test-vm1 test-vm2 allow\n') + f.write('test-vm1 $anyvm ask\n') + f.write('test-vm2 $tag:tag1 deny\n') + f.write('test-vm2 $tag:tag2 allow\n') + f.write('test-no-dvm $type:AppVM deny\n') + f.write('$type:AppVM $default allow,target=test-vm3\n') + f.write('$tag:tag1 $type:AppVM allow\n') + f.write('test-no-dvm $dispvm allow\n') + f.write('test-standalone $dispvm allow\n') + policy = qubespolicy.Policy('test.service') + self.assertCountEqual(policy.collect_targets_for_ask(system_info, + 'test-vm1'), ['test-vm1', 'test-vm2', 'test-vm3', + '$dispvm:test-vm3', + 'default-dvm', '$dispvm:default-dvm', 'test-invalid-dvm', + 'test-no-dvm', 'test-template', 'test-standalone']) + self.assertCountEqual(policy.collect_targets_for_ask(system_info, + 'test-vm2'), ['test-vm2', 'test-vm3']) + self.assertCountEqual(policy.collect_targets_for_ask(system_info, + 'test-vm3'), ['test-vm3']) + self.assertCountEqual(policy.collect_targets_for_ask(system_info, + 'test-standalone'), ['test-vm1', 'test-vm2', 'test-vm3', + 'default-dvm', 'test-no-dvm', 'test-invalid-dvm', + '$dispvm:default-dvm']) + self.assertCountEqual(policy.collect_targets_for_ask(system_info, + 'test-no-dvm'), []) + + def test_030_eval_simple(self): + with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f: + f.write('test-vm1 test-vm2 allow\n') + + policy = qubespolicy.Policy('test.service') + action = policy.evaluate(system_info, 'test-vm1', 'test-vm2') + self.assertEqual(action.rule, policy.policy_rules[0]) + self.assertEqual(action.action, qubespolicy.Action.allow) + self.assertEqual(action.target, 'test-vm2') + self.assertEqual(action.original_target, 'test-vm2') + self.assertEqual(action.service, 'test.service') + self.assertIsNone(action.targets_for_ask) + with self.assertRaises(qubespolicy.AccessDenied): + policy.evaluate(system_info, 'test-vm2', '$default') + + def test_031_eval_default(self): + with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f: + f.write('test-vm1 test-vm2 allow\n') + f.write('test-vm1 $default allow,target=test-vm2\n') + f.write('$tag:tag1 test-vm2 ask\n') + f.write('$tag:tag2 $anyvm allow\n') + f.write('test-vm3 $anyvm deny\n') + + policy = qubespolicy.Policy('test.service') + action = policy.evaluate(system_info, 'test-vm1', '$default') + self.assertEqual(action.rule, policy.policy_rules[1]) + self.assertEqual(action.action, qubespolicy.Action.allow) + self.assertEqual(action.target, 'test-vm2') + self.assertEqual(action.original_target, '$default') + self.assertEqual(action.service, 'test.service') + self.assertIsNone(action.targets_for_ask) + with self.assertRaises(qubespolicy.AccessDenied): + # action allow should hit, but no target specified (either by + # caller or policy) + policy.evaluate(system_info, 'test-standalone', '$default') + + def test_032_eval_ask(self): + with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f: + f.write('test-vm1 test-vm2 allow\n') + f.write('test-vm1 $default allow,target=test-vm2\n') + f.write('$tag:tag1 test-vm2 ask\n') + f.write('$tag:tag1 test-vm3 ask,default_target=test-vm3\n') + f.write('$tag:tag2 $anyvm allow\n') + f.write('test-vm3 $anyvm deny\n') + + policy = qubespolicy.Policy('test.service') + action = policy.evaluate(system_info, 'test-standalone', 'test-vm2') + self.assertEqual(action.rule, policy.policy_rules[2]) + self.assertEqual(action.action, qubespolicy.Action.ask) + self.assertIsNone(action.target) + self.assertEqual(action.original_target, 'test-vm2') + self.assertEqual(action.service, 'test.service') + self.assertCountEqual(action.targets_for_ask, + ['test-vm1', 'test-vm2', 'test-vm3', '$dispvm:test-vm3', + 'default-dvm', '$dispvm:default-dvm', 'test-invalid-dvm', + 'test-no-dvm', 'test-template', 'test-standalone']) + + def test_033_eval_ask(self): + with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f: + f.write('test-vm1 test-vm2 allow\n') + f.write('test-vm1 $default allow,target=test-vm2\n') + f.write('$tag:tag1 test-vm2 ask\n') + f.write('$tag:tag1 test-vm3 ask,default_target=test-vm3\n') + f.write('$tag:tag2 $anyvm allow\n') + f.write('test-vm3 $anyvm deny\n') + + policy = qubespolicy.Policy('test.service') + action = policy.evaluate(system_info, 'test-standalone', 'test-vm3') + self.assertEqual(action.rule, policy.policy_rules[3]) + self.assertEqual(action.action, qubespolicy.Action.ask) + self.assertEqual(action.target, 'test-vm3') + self.assertEqual(action.original_target, 'test-vm3') + self.assertEqual(action.service, 'test.service') + self.assertCountEqual(action.targets_for_ask, + ['test-vm1', 'test-vm2', 'test-vm3', '$dispvm:test-vm3', + 'default-dvm', '$dispvm:default-dvm', 'test-invalid-dvm', + 'test-no-dvm', 'test-template', 'test-standalone']) + + +class TC_30_Misc(qubes.tests.QubesTestCase): + @unittest.mock.patch('socket.socket') + def test_000_qubesd_call(self, mock_socket): + mock_config = { + 'return_value.makefile.return_value.read.return_value': b'0\x00data' + } + mock_socket.configure_mock(**mock_config) + result = qubespolicy.qubesd_call('test', 'method') + self.assertEqual(result, b'data') + self.assertEqual(mock_socket.mock_calls, [ + unittest.mock.call(socket.AF_UNIX, socket.SOCK_STREAM), + unittest.mock.call().connect(qubespolicy.QUBESD_INTERNAL_SOCK), + unittest.mock.call().sendall(b'dom0'), + unittest.mock.call().sendall(b'\x00'), + unittest.mock.call().sendall(b'method'), + unittest.mock.call().sendall(b'\x00'), + unittest.mock.call().sendall(b'test'), + unittest.mock.call().sendall(b'\x00'), + unittest.mock.call().sendall(b'\x00'), + unittest.mock.call().shutdown(socket.SHUT_WR), + unittest.mock.call().makefile('rb'), + unittest.mock.call().makefile().read(), + ]) + + @unittest.mock.patch('socket.socket') + def test_001_qubesd_call_arg_payload(self, mock_socket): + mock_config = { + 'return_value.makefile.return_value.read.return_value': b'0\x00data' + } + mock_socket.configure_mock(**mock_config) + result = qubespolicy.qubesd_call('test', 'method', 'arg', b'payload') + self.assertEqual(result, b'data') + self.assertEqual(mock_socket.mock_calls, [ + unittest.mock.call(socket.AF_UNIX, socket.SOCK_STREAM), + unittest.mock.call().connect(qubespolicy.QUBESD_INTERNAL_SOCK), + unittest.mock.call().sendall(b'dom0'), + unittest.mock.call().sendall(b'\x00'), + unittest.mock.call().sendall(b'method'), + unittest.mock.call().sendall(b'\x00'), + unittest.mock.call().sendall(b'test'), + unittest.mock.call().sendall(b'\x00'), + unittest.mock.call().sendall(b'arg'), + unittest.mock.call().sendall(b'\x00'), + unittest.mock.call().sendall(b'payload'), + unittest.mock.call().shutdown(socket.SHUT_WR), + unittest.mock.call().makefile('rb'), + unittest.mock.call().makefile().read(), + ]) + + @unittest.mock.patch('socket.socket') + def test_002_qubesd_call_exception(self, mock_socket): + mock_config = { + 'return_value.makefile.return_value.read.return_value': + b'2\x00SomeError\x00traceback\x00message\x00' + } + mock_socket.configure_mock(**mock_config) + with self.assertRaises(qubespolicy.QubesMgmtException) as e: + qubespolicy.qubesd_call('test', 'method') + self.assertEqual(e.exception.exc_type, 'SomeError') + self.assertEqual(mock_socket.mock_calls, [ + unittest.mock.call(socket.AF_UNIX, socket.SOCK_STREAM), + unittest.mock.call().connect(qubespolicy.QUBESD_INTERNAL_SOCK), + unittest.mock.call().sendall(b'dom0'), + unittest.mock.call().sendall(b'\x00'), + unittest.mock.call().sendall(b'method'), + unittest.mock.call().sendall(b'\x00'), + unittest.mock.call().sendall(b'test'), + unittest.mock.call().sendall(b'\x00'), + unittest.mock.call().sendall(b'\x00'), + unittest.mock.call().shutdown(socket.SHUT_WR), + unittest.mock.call().makefile('rb'), + unittest.mock.call().makefile().read(), + ]) +