diff --git a/qubes/tests/__init__.py b/qubes/tests/__init__.py index 80b18eba..d41e2739 100644 --- a/qubes/tests/__init__.py +++ b/qubes/tests/__init__.py @@ -1010,6 +1010,7 @@ def load_tests(loader, tests, pattern): # pylint: disable=unused-argument 'qubes.tests.api_admin', 'qubes.tests.api_misc', 'qubespolicy.tests', + 'qubespolicy.tests.cli', ): tests.addTests(loader.loadTestsFromName(modname)) diff --git a/qubespolicy/tests/cli.py b/qubespolicy/tests/cli.py new file mode 100644 index 00000000..a7387939 --- /dev/null +++ b/qubespolicy/tests/cli.py @@ -0,0 +1,343 @@ +# -*- encoding: utf-8 -*- +# +# The Qubes OS Project, http://www.qubes-os.org +# +# Copyright (C) 2017 Marek Marczykowski-Górecki +# +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, see . +import os +import tempfile +import unittest.mock + +import shutil + +import qubes.tests +import qubespolicy +import qubespolicy.cli +import qubespolicy.tests + + + +class TC_00_qrexec_policy(qubes.tests.QubesTestCase): + def setUp(self): + super(TC_00_qrexec_policy, self).setUp() + self.policy_patch = unittest.mock.patch('qubespolicy.Policy') + self.policy_mock = self.policy_patch.start() + + self.system_info_patch = unittest.mock.patch( + 'qubespolicy.get_system_info') + self.system_info_mock = self.system_info_patch.start() + + self.system_info = { + 'domains': {'dom0': {'icon': 'black', 'dispvm_allowed': False}, + 'test-vm1': {'icon': 'red', 'dispvm_allowed': False}, + 'test-vm2': {'icon': 'red', 'dispvm_allowed': False}, + 'test-vm3': {'icon': 'green', 'dispvm_allowed': True}, }} + self.system_info_mock.return_value = self.system_info + + self.dbus_patch = unittest.mock.patch('pydbus.SystemBus') + self.dbus_mock = self.dbus_patch.start() + + self.policy_dir = tempfile.TemporaryDirectory() + self.policydir_patch = unittest.mock.patch('qubespolicy.POLICY_DIR', + self.policy_dir.name) + self.policydir_patch.start() + + def tearDown(self): + self.policydir_patch.stop() + self.policy_dir.cleanup() + self.dbus_patch.start() + self.system_info_patch.stop() + self.policy_patch.stop() + super(TC_00_qrexec_policy, self).tearDown() + + def test_000_allow(self): + self.policy_mock.configure_mock(**{ + 'return_value.evaluate.return_value.action': + qubespolicy.Action.allow, + }) + retval = qubespolicy.cli.main( + ['source-id', 'source', 'target', 'service', 'process_ident']) + self.assertEqual(retval, 0) + self.assertEqual(self.policy_mock.mock_calls, [ + ('', ('service',), {}), + ('().evaluate', (self.system_info, 'source', + 'target'), {}), + ('().evaluate().target.__str__', (), {}), + ('().evaluate().execute', ('process_ident,source,source-id', ), {}), + ]) + self.assertEqual(self.dbus_mock.mock_calls, []) + + def test_010_ask_allow(self): + self.policy_mock.configure_mock(**{ + 'return_value.evaluate.return_value.action': + qubespolicy.Action.ask, + 'return_value.evaluate.return_value.target': + None, + 'return_value.evaluate.return_value.targets_for_ask': + ['test-vm1', 'test-vm2'], + }) + self.dbus_mock.configure_mock(**{ + 'return_value.get.return_value.Ask.return_value': 'test-vm1' + }) + retval = qubespolicy.cli.main( + ['source-id', 'source', 'target', 'service', 'process_ident']) + self.assertEqual(retval, 0) + self.assertEqual(self.policy_mock.mock_calls, [ + ('', ('service',), {}), + ('().evaluate', (self.system_info, 'source', + 'target'), {}), + ('().evaluate().handle_user_response', (True, 'test-vm1'), {}), + ('().evaluate().execute', ('process_ident,source,source-id', ), {}), + ]) + icons = { + 'dom0': 'black', + 'test-vm1': 'red', + 'test-vm2': 'red', + 'test-vm3': 'green', + '$dispvm:test-vm3': 'green', + } + self.assertEqual(self.dbus_mock.mock_calls, [ + ('', (), {}), + ('().get', ('org.qubesos.PolicyAgent', + '/org/qubesos/PolicyAgent'), {}), + ('().get().Ask', ('source', 'service', ['test-vm1', 'test-vm2'], + '', icons), {}), + ]) + + def test_011_ask_deny(self): + self.policy_mock.configure_mock(**{ + 'return_value.evaluate.return_value.action': + qubespolicy.Action.ask, + 'return_value.evaluate.return_value.target': + None, + 'return_value.evaluate.return_value.targets_for_ask': + ['test-vm1', 'test-vm2'], + 'return_value.evaluate.return_value.handle_user_response' + '.side_effect': + qubespolicy.AccessDenied, + }) + self.dbus_mock.configure_mock(**{ + 'return_value.get.return_value.Ask.return_value': '' + }) + retval = qubespolicy.cli.main( + ['source-id', 'source', 'target', 'service', 'process_ident']) + self.assertEqual(retval, 1) + self.assertEqual(self.policy_mock.mock_calls, [ + ('', ('service',), {}), + ('().evaluate', (self.system_info, 'source', + 'target'), {}), + ('().evaluate().handle_user_response', (False,), {}), + ]) + icons = { + 'dom0': 'black', + 'test-vm1': 'red', + 'test-vm2': 'red', + 'test-vm3': 'green', + '$dispvm:test-vm3': 'green', + } + self.assertEqual(self.dbus_mock.mock_calls, [ + ('', (), {}), + ('().get', ('org.qubesos.PolicyAgent', + '/org/qubesos/PolicyAgent'), {}), + ('().get().Ask', ('source', 'service', ['test-vm1', 'test-vm2'], + '', icons), {}), + ]) + + def test_012_ask_default_target(self): + self.policy_mock.configure_mock(**{ + 'return_value.evaluate.return_value.action': + qubespolicy.Action.ask, + 'return_value.evaluate.return_value.target': + 'test-vm1', + 'return_value.evaluate.return_value.targets_for_ask': + ['test-vm1', 'test-vm2'], + }) + self.dbus_mock.configure_mock(**{ + 'return_value.get.return_value.Ask.return_value': 'test-vm1' + }) + retval = qubespolicy.cli.main( + ['source-id', 'source', 'target', 'service', 'process_ident']) + self.assertEqual(retval, 0) + self.assertEqual(self.policy_mock.mock_calls, [ + ('', ('service',), {}), + ('().evaluate', (self.system_info, 'source', + 'target'), {}), + ('().evaluate().handle_user_response', (True, 'test-vm1'), {}), + ('().evaluate().execute', ('process_ident,source,source-id',), {}), + ]) + icons = { + 'dom0': 'black', + 'test-vm1': 'red', + 'test-vm2': 'red', + 'test-vm3': 'green', + '$dispvm:test-vm3': 'green', + } + self.assertEqual(self.dbus_mock.mock_calls, [ + ('', (), {}), + ('().get', ('org.qubesos.PolicyAgent', + '/org/qubesos/PolicyAgent'), {}), + ('().get().Ask', ('source', 'service', ['test-vm1', 'test-vm2'], + 'test-vm1', icons), {}), + ]) + + def test_020_deny(self): + self.policy_mock.configure_mock(**{ + 'return_value.evaluate.return_value.action': + qubespolicy.Action.deny, + 'return_value.evaluate.return_value.execute.side_effect': + qubespolicy.AccessDenied, + }) + retval = qubespolicy.cli.main( + ['source-id', 'source', 'target', 'service', 'process_ident']) + self.assertEqual(retval, 1) + self.assertEqual(self.policy_mock.mock_calls, [ + ('', ('service',), {}), + ('().evaluate', (self.system_info, 'source', + 'target'), {}), + ('().evaluate().target.__str__', (), {}), + ('().evaluate().execute', ('process_ident,source,source-id',), {}), + ]) + self.assertEqual(self.dbus_mock.mock_calls, []) + + def test_030_just_evaluate_allow(self): + self.policy_mock.configure_mock(**{ + 'return_value.evaluate.return_value.action': + qubespolicy.Action.allow, + }) + retval = qubespolicy.cli.main( + ['--just-evaluate', + 'source-id', 'source', 'target', 'service', 'process_ident']) + self.assertEqual(retval, 0) + self.assertEqual(self.policy_mock.mock_calls, [ + ('', ('service',), {}), + ('().evaluate', (self.system_info, 'source', + 'target'), {}), + ]) + self.assertEqual(self.dbus_mock.mock_calls, []) + + def test_031_just_evaluate_deny(self): + self.policy_mock.configure_mock(**{ + 'return_value.evaluate.return_value.action': + qubespolicy.Action.deny, + }) + retval = qubespolicy.cli.main( + ['--just-evaluate', + 'source-id', 'source', 'target', 'service', 'process_ident']) + self.assertEqual(retval, 1) + self.assertEqual(self.policy_mock.mock_calls, [ + ('', ('service',), {}), + ('().evaluate', (self.system_info, 'source', + 'target'), {}), + ]) + self.assertEqual(self.dbus_mock.mock_calls, []) + + def test_032_just_evaluate_ask(self): + self.policy_mock.configure_mock(**{ + 'return_value.evaluate.return_value.action': + qubespolicy.Action.ask, + }) + retval = qubespolicy.cli.main( + ['--just-evaluate', + 'source-id', 'source', 'target', 'service', 'process_ident']) + self.assertEqual(retval, 1) + self.assertEqual(self.policy_mock.mock_calls, [ + ('', ('service',), {}), + ('().evaluate', (self.system_info, 'source', + 'target'), {}), + ]) + self.assertEqual(self.dbus_mock.mock_calls, []) + + def test_033_just_evaluate_ask_assume_yes(self): + self.policy_mock.configure_mock(**{ + 'return_value.evaluate.return_value.action': + qubespolicy.Action.ask, + }) + retval = qubespolicy.cli.main( + ['--just-evaluate', '--assume-yes-for-ask', + 'source-id', 'source', 'target', 'service', 'process_ident']) + self.assertEqual(retval, 0) + self.assertEqual(self.policy_mock.mock_calls, [ + ('', ('service',), {}), + ('().evaluate', (self.system_info, 'source', + 'target'), {}), + ]) + self.assertEqual(self.dbus_mock.mock_calls, []) + + def test_040_create_policy(self): + self.policy_mock.configure_mock(**{ + 'side_effect': + [qubespolicy.PolicyNotFound('service'), unittest.mock.DEFAULT], + 'return_value.evaluate.return_value.action': + qubespolicy.Action.allow, + }) + self.dbus_mock.configure_mock(**{ + 'return_value.get.return_value.ConfirmPolicyCreate.return_value': + True + }) + retval = qubespolicy.cli.main( + ['source-id', 'source', 'target', 'service', 'process_ident']) + self.assertEqual(retval, 0) + self.assertEqual(self.policy_mock.mock_calls, [ + ('', ('service',), {}), + ('', ('service',), {}), + ('().evaluate', (self.system_info, 'source', + 'target'), {}), + ('().evaluate().target.__str__', (), {}), + ('().evaluate().execute', ('process_ident,source,source-id',), {}), + ]) + self.assertEqual(self.dbus_mock.mock_calls, [ + ('', (), {}), + ('().get', ('org.qubesos.PolicyAgent', + '/org/qubesos/PolicyAgent'), {}), + ('().get().ConfirmPolicyCreate', ('source', 'service'), {}), + ]) + policy_path = os.path.join(self.policy_dir.name, 'service') + self.assertTrue(os.path.exists(policy_path)) + with open(policy_path) as policy_file: + self.assertEqual(policy_file.read(), + "## Policy file automatically created on first service call.\n" + "## Fill free to edit.\n" + "## Note that policy parsing stops at the first match\n" + "\n" + "## Please use a single # to start your custom comments\n" + "\n" + "$anyvm $anyvm ask\n") + + def test_041_create_policy_abort(self): + self.policy_mock.configure_mock(**{ + 'side_effect': + [qubespolicy.PolicyNotFound('service'), unittest.mock.DEFAULT], + 'return_value.evaluate.return_value.action': + qubespolicy.Action.deny, + }) + self.dbus_mock.configure_mock(**{ + 'return_value.get.return_value.ConfirmPolicyCreate.return_value': + False + }) + retval = qubespolicy.cli.main( + ['source-id', 'source', 'target', 'service', 'process_ident']) + self.assertEqual(retval, 1) + self.assertEqual(self.policy_mock.mock_calls, [ + ('', ('service',), {}), + ]) + self.assertEqual(self.dbus_mock.mock_calls, [ + ('', (), {}), + ('().get', ('org.qubesos.PolicyAgent', + '/org/qubesos/PolicyAgent'), {}), + ('().get().ConfirmPolicyCreate', ('source', 'service'), {}), + ]) + policy_path = os.path.join(self.policy_dir.name, 'service') + self.assertFalse(os.path.exists(policy_path)) diff --git a/rpm_spec/core-dom0.spec b/rpm_spec/core-dom0.spec index f2e6d326..9534fa16 100644 --- a/rpm_spec/core-dom0.spec +++ b/rpm_spec/core-dom0.spec @@ -383,6 +383,7 @@ fi %dir %{python3_sitelib}/qubespolicy/tests/__pycache__ %{python3_sitelib}/qubespolicy/tests/__pycache__/* %{python3_sitelib}/qubespolicy/tests/__init__.py +%{python3_sitelib}/qubespolicy/tests/cli.py %{python3_sitelib}/qubespolicy/tests/gtkhelpers.py %{python3_sitelib}/qubespolicy/tests/rpcconfirmation.py