123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343 |
- # -*- encoding: utf-8 -*-
- #
- # The Qubes OS Project, http://www.qubes-os.org
- #
- # Copyright (C) 2017 Marek Marczykowski-Górecki
- # <marmarek@invisiblethingslab.com>
- #
- # This library is free software; you can redistribute it and/or
- # modify it under the terms of the GNU Lesser General Public
- # License as published by the Free Software Foundation; either
- # version 2.1 of the License, or (at your option) any later version.
- #
- # This library is distributed in the hope that it will be useful,
- # but WITHOUT ANY WARRANTY; without even the implied warranty of
- # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- # Lesser General Public License for more details.
- #
- # You should have received a copy of the GNU Lesser General Public
- # License along with this library; if not, see <https://www.gnu.org/licenses/>.
- import os
- import tempfile
- import unittest.mock
- import shutil
- import qubes.tests
- import qubespolicy
- import qubespolicy.cli
- import qubespolicy.tests
class TC_00_qrexec_policy(qubes.tests.QubesTestCase):
    """Tests for ``qubespolicy.cli.main`` with its collaborators mocked.

    ``qubespolicy.Policy``, ``qubespolicy.get_system_info`` and
    ``pydbus.SystemBus`` are replaced by mocks and ``qubespolicy.POLICY_DIR``
    is redirected to a temporary directory.  Each test checks the value
    returned by ``main()`` and the exact sequence of calls recorded on the
    ``Policy`` and ``SystemBus`` mocks.
    """

    # Positional arguments handed to main() by every test.
    _CLI_ARGS = ('source-id', 'source', 'target', 'service', 'process_ident')

    # Icon mapping the "ask" tests expect to be passed to the agent's Ask().
    # It has one entry per domain from setUp() plus '@dispvm:test-vm3'
    # (presumably because test-vm3 is the only domain flagged
    # template_for_dispvms there).
    _EXPECTED_ICONS = {
        'dom0': 'black',
        'test-vm1': 'red',
        'test-vm2': 'red',
        'test-vm3': 'green',
        '@dispvm:test-vm3': 'green',
    }

    # Calls expected on the SystemBus mock, before the actual agent method,
    # in every test where the policy agent is contacted.
    _AGENT_LOOKUP_CALLS = [
        ('', (), {}),
        ('().get', ('org.qubesos.PolicyAgent',
            '/org/qubesos/PolicyAgent'), {}),
    ]

    def setUp(self):
        """Install the mocks and the temporary policy directory."""
        super().setUp()
        self.policy_patch = unittest.mock.patch('qubespolicy.Policy')
        self.policy_mock = self.policy_patch.start()

        self.system_info_patch = unittest.mock.patch(
            'qubespolicy.get_system_info')
        self.system_info_mock = self.system_info_patch.start()

        self.system_info = {
            'domains': {
                'dom0': {'icon': 'black', 'template_for_dispvms': False},
                'test-vm1': {'icon': 'red', 'template_for_dispvms': False},
                'test-vm2': {'icon': 'red', 'template_for_dispvms': False},
                'test-vm3': {'icon': 'green', 'template_for_dispvms': True},
            }
        }
        self.system_info_mock.return_value = self.system_info

        self.dbus_patch = unittest.mock.patch('pydbus.SystemBus')
        self.dbus_mock = self.dbus_patch.start()

        self.policy_dir = tempfile.TemporaryDirectory()
        self.policydir_patch = unittest.mock.patch('qubespolicy.POLICY_DIR',
            self.policy_dir.name)
        self.policydir_patch.start()

    def tearDown(self):
        """Undo everything setUp() installed, in reverse order."""
        self.policydir_patch.stop()
        self.policy_dir.cleanup()
        # Must be stop(), not start(): starting the patcher again would
        # leave pydbus.SystemBus mocked after this test has finished.
        self.dbus_patch.stop()
        self.system_info_patch.stop()
        self.policy_patch.stop()
        super().tearDown()

    def _run_cli(self, *options):
        """Call ``qubespolicy.cli.main`` and return its result.

        :param options: extra command-line options placed before the \
            standard positional arguments
        """
        # A fresh list is built on every call, so each test passes main()
        # its own argv object.
        return qubespolicy.cli.main(list(options) + list(self._CLI_ARGS))

    def test_000_allow(self):
        # Action "allow": main() returns 0, executes the evaluation result
        # and never touches D-Bus.
        self.policy_mock.configure_mock(**{
            'return_value.evaluate.return_value.action':
                qubespolicy.Action.allow,
        })
        retval = self._run_cli()
        self.assertEqual(retval, 0)
        self.assertEqual(self.policy_mock.mock_calls, [
            ('', ('service',), {}),
            ('().evaluate', (self.system_info, 'source',
                'target'), {}),
            ('().evaluate().target.__str__', (), {}),
            ('().evaluate().execute', ('process_ident,source,source-id', ), {}),
        ])
        self.assertEqual(self.dbus_mock.mock_calls, [])

    def test_010_ask_allow(self):
        # Action "ask" without a default target; the agent mock answers
        # 'test-vm1'.  Expect 0, handle_user_response(True, 'test-vm1')
        # followed by execute(), and Ask() called with '' as the default.
        self.policy_mock.configure_mock(**{
            'return_value.evaluate.return_value.action':
                qubespolicy.Action.ask,
            'return_value.evaluate.return_value.target':
                None,
            'return_value.evaluate.return_value.targets_for_ask':
                ['test-vm1', 'test-vm2'],
        })
        self.dbus_mock.configure_mock(**{
            'return_value.get.return_value.Ask.return_value': 'test-vm1'
        })
        retval = self._run_cli()
        self.assertEqual(retval, 0)
        self.assertEqual(self.policy_mock.mock_calls, [
            ('', ('service',), {}),
            ('().evaluate', (self.system_info, 'source',
                'target'), {}),
            ('().evaluate().handle_user_response', (True, 'test-vm1'), {}),
            ('().evaluate().execute', ('process_ident,source,source-id', ), {}),
        ])
        self.assertEqual(self.dbus_mock.mock_calls, self._AGENT_LOOKUP_CALLS + [
            ('().get().Ask', ('source', 'service', ['test-vm1', 'test-vm2'],
                '', self._EXPECTED_ICONS), {}),
        ])

    def test_011_ask_deny(self):
        # Action "ask"; the agent mock answers '' and handle_user_response
        # raises AccessDenied.  Expect 1, handle_user_response(False) and
        # no execute() call.
        self.policy_mock.configure_mock(**{
            'return_value.evaluate.return_value.action':
                qubespolicy.Action.ask,
            'return_value.evaluate.return_value.target':
                None,
            'return_value.evaluate.return_value.targets_for_ask':
                ['test-vm1', 'test-vm2'],
            'return_value.evaluate.return_value.handle_user_response'
            '.side_effect':
                qubespolicy.AccessDenied,
        })
        self.dbus_mock.configure_mock(**{
            'return_value.get.return_value.Ask.return_value': ''
        })
        retval = self._run_cli()
        self.assertEqual(retval, 1)
        self.assertEqual(self.policy_mock.mock_calls, [
            ('', ('service',), {}),
            ('().evaluate', (self.system_info, 'source',
                'target'), {}),
            ('().evaluate().handle_user_response', (False,), {}),
        ])
        self.assertEqual(self.dbus_mock.mock_calls, self._AGENT_LOOKUP_CALLS + [
            ('().get().Ask', ('source', 'service', ['test-vm1', 'test-vm2'],
                '', self._EXPECTED_ICONS), {}),
        ])

    def test_012_ask_default_target(self):
        # Action "ask" with target 'test-vm1' set on the evaluation result:
        # Ask() must receive 'test-vm1' as the default target.
        self.policy_mock.configure_mock(**{
            'return_value.evaluate.return_value.action':
                qubespolicy.Action.ask,
            'return_value.evaluate.return_value.target':
                'test-vm1',
            'return_value.evaluate.return_value.targets_for_ask':
                ['test-vm1', 'test-vm2'],
        })
        self.dbus_mock.configure_mock(**{
            'return_value.get.return_value.Ask.return_value': 'test-vm1'
        })
        retval = self._run_cli()
        self.assertEqual(retval, 0)
        self.assertEqual(self.policy_mock.mock_calls, [
            ('', ('service',), {}),
            ('().evaluate', (self.system_info, 'source',
                'target'), {}),
            ('().evaluate().handle_user_response', (True, 'test-vm1'), {}),
            ('().evaluate().execute', ('process_ident,source,source-id',), {}),
        ])
        self.assertEqual(self.dbus_mock.mock_calls, self._AGENT_LOOKUP_CALLS + [
            ('().get().Ask', ('source', 'service', ['test-vm1', 'test-vm2'],
                'test-vm1', self._EXPECTED_ICONS), {}),
        ])

    def test_020_deny(self):
        # Action "deny" with execute() raising AccessDenied: main() returns
        # 1 and never touches D-Bus.
        self.policy_mock.configure_mock(**{
            'return_value.evaluate.return_value.action':
                qubespolicy.Action.deny,
            'return_value.evaluate.return_value.execute.side_effect':
                qubespolicy.AccessDenied,
        })
        retval = self._run_cli()
        self.assertEqual(retval, 1)
        self.assertEqual(self.policy_mock.mock_calls, [
            ('', ('service',), {}),
            ('().evaluate', (self.system_info, 'source',
                'target'), {}),
            ('().evaluate().target.__str__', (), {}),
            ('().evaluate().execute', ('process_ident,source,source-id',), {}),
        ])
        self.assertEqual(self.dbus_mock.mock_calls, [])

    def test_030_just_evaluate_allow(self):
        # --just-evaluate with action "allow": returns 0 after evaluate()
        # only; nothing is executed and D-Bus is untouched.
        self.policy_mock.configure_mock(**{
            'return_value.evaluate.return_value.action':
                qubespolicy.Action.allow,
        })
        retval = self._run_cli('--just-evaluate')
        self.assertEqual(retval, 0)
        self.assertEqual(self.policy_mock.mock_calls, [
            ('', ('service',), {}),
            ('().evaluate', (self.system_info, 'source',
                'target'), {}),
        ])
        self.assertEqual(self.dbus_mock.mock_calls, [])

    def test_031_just_evaluate_deny(self):
        # --just-evaluate with action "deny": returns 1 after evaluate()
        # only.
        self.policy_mock.configure_mock(**{
            'return_value.evaluate.return_value.action':
                qubespolicy.Action.deny,
        })
        retval = self._run_cli('--just-evaluate')
        self.assertEqual(retval, 1)
        self.assertEqual(self.policy_mock.mock_calls, [
            ('', ('service',), {}),
            ('().evaluate', (self.system_info, 'source',
                'target'), {}),
        ])
        self.assertEqual(self.dbus_mock.mock_calls, [])

    def test_032_just_evaluate_ask(self):
        # --just-evaluate with action "ask": returns 1 and the agent is not
        # contacted.
        self.policy_mock.configure_mock(**{
            'return_value.evaluate.return_value.action':
                qubespolicy.Action.ask,
        })
        retval = self._run_cli('--just-evaluate')
        self.assertEqual(retval, 1)
        self.assertEqual(self.policy_mock.mock_calls, [
            ('', ('service',), {}),
            ('().evaluate', (self.system_info, 'source',
                'target'), {}),
        ])
        self.assertEqual(self.dbus_mock.mock_calls, [])

    def test_033_just_evaluate_ask_assume_yes(self):
        # --just-evaluate --assume-yes-for-ask with action "ask": returns 0
        # and the agent is still not contacted.
        self.policy_mock.configure_mock(**{
            'return_value.evaluate.return_value.action':
                qubespolicy.Action.ask,
        })
        retval = self._run_cli('--just-evaluate', '--assume-yes-for-ask')
        self.assertEqual(retval, 0)
        self.assertEqual(self.policy_mock.mock_calls, [
            ('', ('service',), {}),
            ('().evaluate', (self.system_info, 'source',
                'target'), {}),
        ])
        self.assertEqual(self.dbus_mock.mock_calls, [])

    def test_040_create_policy(self):
        # The first Policy('service') call raises PolicyNotFound and the
        # agent mock confirms policy creation.  Expect a second
        # Policy('service') call, a normal "allow" flow, and a default
        # policy file written to the temporary policy directory.
        self.policy_mock.configure_mock(**{
            'side_effect':
                [qubespolicy.PolicyNotFound('service'), unittest.mock.DEFAULT],
            'return_value.evaluate.return_value.action':
                qubespolicy.Action.allow,
        })
        self.dbus_mock.configure_mock(**{
            'return_value.get.return_value.ConfirmPolicyCreate.return_value':
                True
        })
        retval = self._run_cli()
        self.assertEqual(retval, 0)
        self.assertEqual(self.policy_mock.mock_calls, [
            ('', ('service',), {}),
            ('', ('service',), {}),
            ('().evaluate', (self.system_info, 'source',
                'target'), {}),
            ('().evaluate().target.__str__', (), {}),
            ('().evaluate().execute', ('process_ident,source,source-id',), {}),
        ])
        self.assertEqual(self.dbus_mock.mock_calls, self._AGENT_LOOKUP_CALLS + [
            ('().get().ConfirmPolicyCreate', ('source', 'service'), {}),
        ])
        policy_path = os.path.join(self.policy_dir.name, 'service')
        self.assertTrue(os.path.exists(policy_path))
        with open(policy_path) as policy_file:
            # Compared byte-for-byte with what the code under test wrote, so
            # the wording below must not be "corrected" here.
            self.assertEqual(policy_file.read(),
                "## Policy file automatically created on first service call.\n"
                "## Fill free to edit.\n"
                "## Note that policy parsing stops at the first match\n"
                "\n"
                "## Please use a single # to start your custom comments\n"
                "\n"
                "@anyvm  @anyvm  ask\n")

    def test_041_create_policy_abort(self):
        # The first Policy('service') call raises PolicyNotFound and the
        # agent mock refuses policy creation.  Expect 1, no second Policy()
        # call and no policy file created.
        self.policy_mock.configure_mock(**{
            'side_effect':
                [qubespolicy.PolicyNotFound('service'), unittest.mock.DEFAULT],
            'return_value.evaluate.return_value.action':
                qubespolicy.Action.deny,
        })
        self.dbus_mock.configure_mock(**{
            'return_value.get.return_value.ConfirmPolicyCreate.return_value':
                False
        })
        retval = self._run_cli()
        self.assertEqual(retval, 1)
        self.assertEqual(self.policy_mock.mock_calls, [
            ('', ('service',), {}),
        ])
        self.assertEqual(self.dbus_mock.mock_calls, self._AGENT_LOOKUP_CALLS + [
            ('().get().ConfirmPolicyCreate', ('source', 'service'), {}),
        ])
        policy_path = os.path.join(self.policy_dir.name, 'service')
        self.assertFalse(os.path.exists(policy_path))
|