core-admin/qubespolicy/tests/__init__.py
2018-10-29 22:37:15 +01:00

920 lines
44 KiB
Python

# -*- encoding: utf8 -*-
#
# The Qubes OS Project, http://www.qubes-os.org
#
# Copyright (C) 2017 Marek Marczykowski-Górecki
# <marmarek@invisiblethingslab.com>
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, see <https://www.gnu.org/licenses/>.
import os
import socket
import unittest.mock
import shutil
import qubes.tests
import qubespolicy
# Directory in which the tests below create their temporary policy files.
tmp_policy_dir = '/tmp/policy'


def _domain_info(vm_type, tags=(), default_dispvm='default-dvm',
                 template_for_dispvms=False):
    # Build a single per-domain record of the ``system_info`` fixture.
    # Every call returns a fresh dict holding a fresh ``tags`` list, so the
    # records never share mutable state with each other.
    return {
        'tags': list(tags),
        'type': vm_type,
        'default_dispvm': default_dispvm,
        'template_for_dispvms': template_for_dispvms,
    }


# Fake system description shared by all tests in this module: a mapping of
# domain name -> properties, stored under the 'domains' key.
system_info = {
    'domains': {
        'dom0': _domain_info('AdminVM', ['dom0-tag']),
        'test-vm1': _domain_info('AppVM', ['tag1', 'tag2']),
        'test-vm2': _domain_info('AppVM', ['tag2']),
        'test-vm3': _domain_info(
            'AppVM', ['tag3'], template_for_dispvms=True),
        'default-dvm': _domain_info('AppVM', template_for_dispvms=True),
        # default_dispvm points at a VM with template_for_dispvms=False
        'test-invalid-dvm': _domain_info(
            'AppVM', ['tag1', 'tag2'], default_dispvm='test-vm1'),
        # no default DispVM at all
        'test-no-dvm': _domain_info(
            'AppVM', ['tag1', 'tag2'], default_dispvm=None),
        'test-template': _domain_info('TemplateVM', ['tag1', 'tag2']),
        'test-standalone': _domain_info('StandaloneVM', ['tag1', 'tag2']),
    },
}
class TC_00_PolicyRule(qubes.tests.QubesTestCase):
    # Tests of the value validators (verify_target_value,
    # verify_special_value) and of PolicyRule parsing, matching and target
    # expansion.  All lookups use the module-level ``system_info`` fixture.
    #
    # Long runs of independent checks are table-driven and wrapped in
    # subTest(), so every failing case is reported, not only the first one.

    def test_000_verify_target_value(self):
        # values verify_target_value() is expected to accept
        accepted = [
            'test-vm1',
            'default-dvm',
            '@dispvm',
            '@dispvm:default-dvm',
            'test-template',
            'test-standalone',
            '@adminvm',
        ]
        # values verify_target_value() is expected to reject
        rejected = [
            'no-such-vm',
            '@dispvm:test-invalid-dvm',
            '@dispvm:test-vm1',
            '',
            '@default',
            '@anyvm',
            '@tag:tag1',
            '@dispvm:@tag:tag1',
            '@invalid',
        ]
        for value in accepted:
            with self.subTest(value=value):
                self.assertTrue(
                    qubespolicy.verify_target_value(system_info, value))
        for value in rejected:
            with self.subTest(value=value):
                self.assertFalse(
                    qubespolicy.verify_target_value(system_info, value))

    def test_010_verify_special_value(self):
        # (value, for_target) pairs expected to be accepted
        accepted = [
            ('@tag:tag', False),
            ('@tag:other-tag', False),
            ('@type:AppVM', False),
            ('@adminvm', False),
            ('@dispvm:some-vm', True),
            ('@dispvm:@tag:tag1', True),
        ]
        # (value, for_target) pairs expected to be rejected
        rejected = [
            ('@default', False),
            ('@dispvm', False),
            ('@dispvm:some-vm', False),
            ('@dispvm:@tag:tag1', False),
            ('@invalid', False),
            ('vm-name', False),
            ('@tag:', False),
            ('@type:', False),
        ]
        for value, for_target in accepted:
            with self.subTest(value=value, for_target=for_target):
                self.assertTrue(qubespolicy.verify_special_value(
                    value, for_target=for_target))
        for value, for_target in rejected:
            with self.subTest(value=value, for_target=for_target):
                self.assertFalse(qubespolicy.verify_special_value(
                    value, for_target=for_target))

    def test_020_line_simple(self):
        # bare action, no options
        line = qubespolicy.PolicyRule('@anyvm @anyvm ask', 'filename', 12)
        self.assertEqual(line.filename, 'filename')
        self.assertEqual(line.lineno, 12)
        self.assertEqual(line.action, qubespolicy.Action.ask)
        self.assertEqual(line.source, '@anyvm')
        self.assertEqual(line.target, '@anyvm')
        self.assertEqual(line.full_action, 'ask')
        self.assertIsNone(line.override_target)
        self.assertIsNone(line.override_user)
        self.assertIsNone(line.default_target)

    def test_021_line_simple(self):
        # also check spaces in action field
        line = qubespolicy.PolicyRule(
            '@tag:tag1 @type:AppVM ask, target=test-vm2, user=user',
            'filename', 12)
        self.assertEqual(line.filename, 'filename')
        self.assertEqual(line.lineno, 12)
        self.assertEqual(line.action, qubespolicy.Action.ask)
        self.assertEqual(line.source, '@tag:tag1')
        self.assertEqual(line.target, '@type:AppVM')
        self.assertEqual(line.full_action,
            'ask, target=test-vm2, user=user')
        self.assertEqual(line.override_target, 'test-vm2')
        self.assertEqual(line.override_user, 'user')
        self.assertIsNone(line.default_target)

    def test_022_line_simple(self):
        # allow with a @dispvm:<name> override target
        line = qubespolicy.PolicyRule(
            '@anyvm @default allow,target=@dispvm:test-vm2',
            'filename', 12)
        self.assertEqual(line.filename, 'filename')
        self.assertEqual(line.lineno, 12)
        self.assertEqual(line.action, qubespolicy.Action.allow)
        self.assertEqual(line.source, '@anyvm')
        self.assertEqual(line.target, '@default')
        self.assertEqual(line.full_action, 'allow,target=@dispvm:test-vm2')
        self.assertEqual(line.override_target, '@dispvm:test-vm2')
        self.assertIsNone(line.override_user)
        self.assertIsNone(line.default_target)

    def test_023_line_simple(self):
        # ask with default_target= set to a VM name
        line = qubespolicy.PolicyRule(
            '@anyvm @default ask,default_target=test-vm1',
            'filename', 12)
        self.assertEqual(line.filename, 'filename')
        self.assertEqual(line.lineno, 12)
        self.assertEqual(line.action, qubespolicy.Action.ask)
        self.assertEqual(line.source, '@anyvm')
        self.assertEqual(line.target, '@default')
        self.assertEqual(line.full_action, 'ask,default_target=test-vm1')
        self.assertIsNone(line.override_target)
        self.assertIsNone(line.override_user)
        self.assertEqual(line.default_target, 'test-vm1')

    def test_024_line_simple(self):
        # ask with default_target= set to the @adminvm keyword
        line = qubespolicy.PolicyRule(
            '@anyvm @adminvm ask,default_target=@adminvm',
            'filename', 12)
        self.assertEqual(line.filename, 'filename')
        self.assertEqual(line.lineno, 12)
        self.assertEqual(line.action, qubespolicy.Action.ask)
        self.assertEqual(line.source, '@anyvm')
        self.assertEqual(line.target, '@adminvm')
        self.assertEqual(line.full_action, 'ask,default_target=@adminvm')
        self.assertIsNone(line.override_target)
        self.assertIsNone(line.override_user)
        self.assertEqual(line.default_target, '@adminvm')

    def test_030_line_invalid(self):
        # every one of these lines must be refused with PolicySyntaxError
        invalid_lines = [
            '@dispvm @default allow',  # @dispvm can't be a source
            '@default @default allow',  # @default can't be a source
            '@anyvm @default allow,target=@dispvm:@tag:tag1',  # @dispvm:@tag
            #  as override target
            '@anyvm @default allow,target=@tag:tag1',  # @tag as override target
            '@anyvm @default deny,target=test-vm1',  # target= used with deny
            '@anyvm @anyvm deny,default_target=test-vm1',  # default_target=
            # with deny
            '@anyvm @anyvm deny,user=user',  # user= with deny
            '@anyvm @anyvm invalid',  # invalid action
            '@anyvm @anyvm allow,invalid=xx',  # invalid option
            '@anyvm @anyvm',  # missing action
            '@anyvm @anyvm allow,default_target=test-vm1',  # default_target=
            #  with allow
            '@invalid @anyvm allow',  # invalid source
            '@anyvm @invalid deny',  # invalid target
            '',  # empty line
            '@anyvm @anyvm allow extra',  # trailing words
            '@anyvm @default allow',  # @default allow without target=
        ]
        for line in invalid_lines:
            with self.subTest(line):
                with self.assertRaises(qubespolicy.PolicySyntaxError):
                    qubespolicy.PolicyRule(line, 'filename', 12)

    def test_040_match_single(self):
        is_match_single = qubespolicy.PolicyRule.is_match_single
        # (first value, second value) pairs, passed to is_match_single()
        # after system_info, that are expected to match
        matching = [
            ('@anyvm', 'test-vm1'),
            ('@anyvm', '@default'),
            ('@default', '@default'),
            ('@tag:tag1', 'test-vm1'),
            ('@type:AppVM', 'test-vm1'),
            ('@type:TemplateVM', 'test-template'),
            ('@anyvm', '@dispvm'),
            ('@anyvm', '@dispvm:default-dvm'),
            ('@dispvm', '@dispvm'),
            ('@dispvm:@tag:tag3', '@dispvm:test-vm3'),
            ('@adminvm', '@adminvm'),
            ('@adminvm', 'dom0'),
            ('dom0', '@adminvm'),
            ('dom0', 'dom0'),
            ('@dispvm:default-dvm', '@dispvm:default-dvm'),
        ]
        # pairs that are expected NOT to match
        not_matching = [
            ('@default', 'test-vm1'),
            ('@tag:tag1', 'test-vm3'),
            ('@anyvm', 'no-such-vm'),
            # test-vm1.template_for_dispvms=False
            ('@anyvm', '@dispvm:test-vm1'),
            # test-vm1.template_for_dispvms=False
            ('@dispvm:test-vm1', '@dispvm:test-vm1'),
            ('@dispvm:@tag:tag1', '@dispvm:test-vm1'),
            # test-vm3 does not have tag1
            ('@dispvm:@tag:tag1', '@dispvm:test-vm3'),
            # default-dvm has no tag3
            ('@dispvm:@tag:tag3', '@dispvm:default-dvm'),
            ('@anyvm', 'dom0'),
            ('@anyvm', '@adminvm'),
            ('@tag:dom0-tag', '@adminvm'),
            ('@type:AdminVM', '@adminvm'),
            ('@tag:dom0-tag', 'dom0'),
            ('@type:AdminVM', 'dom0'),
            ('@tag:tag1', 'dom0'),
            ('@anyvm', '@tag:tag1'),
            ('@anyvm', '@type:AppVM'),
            ('@anyvm', '@invalid'),
            ('@invalid', '@invalid'),
            ('no-such-vm', 'no-such-vm'),
            ('@dispvm', 'test-vm1'),
            ('@dispvm', 'default-dvm'),
            ('@dispvm:default-dvm', 'default-dvm'),
            # trailing whitespace must not be ignored
            ('@anyvm', 'test-vm1\n'),
            ('@anyvm', 'test-vm1 '),
        ]
        for first, second in matching:
            with self.subTest(first=first, second=second):
                self.assertTrue(is_match_single(system_info, first, second))
        for first, second in not_matching:
            with self.subTest(first=first, second=second):
                self.assertFalse(is_match_single(system_info, first, second))

    def test_050_match(self):
        # (rule text, source, target, expected is_match() outcome)
        cases = [
            ('@anyvm @anyvm allow', 'test-vm1', 'test-vm2', True),
            ('@anyvm @anyvm allow', 'no-such-vm', 'test-vm2', False),
            ('@anyvm @anyvm allow', 'test-vm1', 'no-such-vm', False),
            ('@anyvm @dispvm allow', 'test-vm1', '@dispvm', True),
            ('@anyvm @dispvm allow',
                'test-vm1', '@dispvm:default-dvm', False),
            ('@anyvm @dispvm:default-dvm allow',
                'test-vm1', '@dispvm', True),
            ('@anyvm @dispvm:default-dvm allow',
                'test-vm1', '@dispvm:default-dvm', True),
            ('@anyvm @dispvm:@tag:tag3 allow',
                'test-vm1', '@dispvm:test-vm3', True),
        ]
        for rule_text, source, target, expected in cases:
            with self.subTest(rule=rule_text, source=source, target=target):
                line = qubespolicy.PolicyRule(rule_text)
                matched = line.is_match(system_info, source, target)
                if expected:
                    self.assertTrue(matched)
                else:
                    self.assertFalse(matched)

    def test_060_expand_target(self):
        # rule text -> targets expand_target() should yield (in any order)
        lines = {
            '@anyvm @anyvm allow': ['test-vm1', 'test-vm2', 'test-vm3',
                '@dispvm:test-vm3',
                'default-dvm', '@dispvm:default-dvm', 'test-invalid-dvm',
                'test-no-dvm', 'test-template', 'test-standalone',
                '@dispvm'],
            '@anyvm @dispvm allow': ['@dispvm'],
            '@anyvm @dispvm:default-dvm allow': ['@dispvm:default-dvm'],
            # no DispVM from test-vm1 allowed
            '@anyvm @dispvm:test-vm1 allow': [],
            '@anyvm @dispvm:test-vm3 allow': ['@dispvm:test-vm3'],
            '@anyvm @dispvm:@tag:tag1 allow': [],
            '@anyvm @dispvm:@tag:tag3 allow': ['@dispvm:test-vm3'],
            '@anyvm test-vm1 allow': ['test-vm1'],
            '@anyvm @type:AppVM allow': ['test-vm1', 'test-vm2', 'test-vm3',
                'default-dvm', 'test-invalid-dvm', 'test-no-dvm'],
            '@anyvm @type:TemplateVM allow': ['test-template'],
            '@anyvm @tag:tag1 allow': ['test-vm1', 'test-invalid-dvm',
                'test-template', 'test-standalone', 'test-no-dvm'],
            '@anyvm @tag:tag2 allow': ['test-vm1', 'test-vm2',
                'test-invalid-dvm', 'test-template', 'test-standalone',
                'test-no-dvm'],
            '@anyvm @tag:no-such-tag allow': [],
        }
        for line, expected_targets in lines.items():
            with self.subTest(line):
                policy_line = qubespolicy.PolicyRule(line)
                self.assertCountEqual(
                    list(policy_line.expand_target(system_info)),
                    expected_targets)

    def test_070_expand_override_target(self):
        # plain VM name as override target is returned as is
        line = qubespolicy.PolicyRule(
            '@anyvm @anyvm allow,target=test-vm2')
        self.assertEqual(
            line.expand_override_target(system_info, 'test-vm1'),
            'test-vm2')

    def test_071_expand_override_target_dispvm(self):
        # bare @dispvm is resolved using test-vm1's default_dispvm
        line = qubespolicy.PolicyRule(
            '@anyvm @anyvm allow,target=@dispvm')
        self.assertEqual(
            line.expand_override_target(system_info, 'test-vm1'),
            '@dispvm:default-dvm')

    def test_072_expand_override_target_dispvm_specific(self):
        # explicit @dispvm:<name> is kept unchanged
        line = qubespolicy.PolicyRule(
            '@anyvm @anyvm allow,target=@dispvm:test-vm3')
        self.assertEqual(
            line.expand_override_target(system_info, 'test-vm1'),
            '@dispvm:test-vm3')

    def test_073_expand_override_target_dispvm_none(self):
        # test-no-dvm has default_dispvm=None, so @dispvm resolves to None
        line = qubespolicy.PolicyRule(
            '@anyvm @anyvm allow,target=@dispvm')
        self.assertIsNone(
            line.expand_override_target(system_info, 'test-no-dvm'))

    def test_074_expand_override_target_dom0(self):
        # literal dom0 as override target
        line = qubespolicy.PolicyRule(
            '@anyvm @anyvm allow,target=dom0')
        self.assertEqual(
            line.expand_override_target(system_info, 'test-no-dvm'),
            'dom0')

    def test_075_expand_override_target_dom0(self):
        # @adminvm keyword as override target is kept as a keyword
        line = qubespolicy.PolicyRule(
            '@anyvm @anyvm allow,target=@adminvm')
        self.assertEqual(
            line.expand_override_target(system_info, 'test-no-dvm'),
            '@adminvm')
class TC_10_PolicyAction(qubes.tests.QubesTestCase):
    # Tests of PolicyAction: construction, handling of the user's answer to
    # an "ask" rule, and execute() with qubesd_call and subprocess.call
    # replaced by mocks.

    def test_000_init(self):
        # an action cannot be built from a "deny" rule
        deny_rule = qubespolicy.PolicyRule('@anyvm @anyvm deny')
        with self.assertRaises(qubespolicy.AccessDenied):
            qubespolicy.PolicyAction(
                'test.service', 'test-vm1', 'test-vm2',
                deny_rule, 'test-vm2')

    def test_001_init(self):
        # constructor arguments are exposed as attributes
        ask_rule = qubespolicy.PolicyRule('@anyvm @anyvm ask')
        choices = ['test-vm2', 'test-vm3']
        result = qubespolicy.PolicyAction(
            'test.service', 'test-vm1', None, ask_rule, 'test-vm2', choices)
        self.assertEqual(result.service, 'test.service')
        self.assertEqual(result.source, 'test-vm1')
        self.assertIsNone(result.target)
        self.assertEqual(result.original_target, 'test-vm2')
        self.assertEqual(result.targets_for_ask, ['test-vm2', 'test-vm3'])
        self.assertEqual(result.rule, ask_rule)
        self.assertEqual(result.action, qubespolicy.Action.ask)

    def test_002_init_invalid(self):
        rule_ask = qubespolicy.PolicyRule('@anyvm @anyvm ask')
        rule_allow = qubespolicy.PolicyRule('@anyvm @anyvm allow')
        # (target, rule, targets_for_ask) combinations that must be refused
        refused = (
            (None, rule_allow, None),
            ('test-vm2', rule_allow, ['test-vm2', 'test-vm3']),
            (None, rule_ask, None),
        )
        for target, rule, targets_for_ask in refused:
            with self.assertRaises(qubespolicy.AccessDenied):
                qubespolicy.PolicyAction(
                    'test.service', 'test-vm1',
                    target, rule, 'test-vm2', targets_for_ask)

    def test_003_init_default_target(self):
        rule_ask = qubespolicy.PolicyRule('@anyvm @anyvm ask')

        # proposed target is not among targets_for_ask -> target is None
        not_offered = qubespolicy.PolicyAction(
            'test.service', 'test-vm1',
            'test-vm1', rule_ask, 'test-vm2', ['test-vm2'])
        self.assertIsNone(not_offered.target)

        # proposed target is among targets_for_ask -> it is kept
        offered = qubespolicy.PolicyAction(
            'test.service', 'test-vm1',
            'test-vm2', rule_ask, 'test-vm2', ['test-vm2'])
        self.assertEqual(offered.target, 'test-vm2')

    def test_010_handle_user_response(self):
        # positive answer with one of the offered targets
        ask_rule = qubespolicy.PolicyRule('@anyvm @anyvm ask')
        pending = qubespolicy.PolicyAction(
            'test.service', 'test-vm1',
            None, ask_rule, 'test-vm2', ['test-vm2', 'test-vm3'])
        pending.handle_user_response(True, 'test-vm2')
        self.assertEqual(pending.action, qubespolicy.Action.allow)
        self.assertEqual(pending.target, 'test-vm2')

    def test_011_handle_user_response(self):
        # positive answer with a target that was never offered
        ask_rule = qubespolicy.PolicyRule('@anyvm @anyvm ask')
        pending = qubespolicy.PolicyAction(
            'test.service', 'test-vm1',
            None, ask_rule, 'test-vm2', ['test-vm2', 'test-vm3'])
        with self.assertRaises(AssertionError):
            pending.handle_user_response(True, 'test-no-dvm')

    def test_012_handle_user_response(self):
        # negative answer raises AccessDenied and leaves a "deny" action
        ask_rule = qubespolicy.PolicyRule('@anyvm @anyvm ask')
        pending = qubespolicy.PolicyAction(
            'test.service', 'test-vm1',
            None, ask_rule, 'test-vm2', ['test-vm2', 'test-vm3'])
        with self.assertRaises(qubespolicy.AccessDenied):
            pending.handle_user_response(False, None)
        self.assertEqual(pending.action, qubespolicy.Action.deny)

    def test_013_handle_user_response_with_default_target(self):
        # same as test_010, but the rule carries default_target=
        ask_rule = qubespolicy.PolicyRule(
            '@anyvm @anyvm ask,default_target=test-vm2')
        pending = qubespolicy.PolicyAction(
            'test.service', 'test-vm1',
            None, ask_rule, 'test-vm2', ['test-vm2', 'test-vm3'])
        pending.handle_user_response(True, 'test-vm2')
        self.assertEqual(pending.action, qubespolicy.Action.allow)
        self.assertEqual(pending.target, 'test-vm2')

    @unittest.mock.patch('qubespolicy.qubesd_call')
    @unittest.mock.patch('subprocess.call')
    def test_020_execute(self, mock_subprocess, mock_qubesd_call):
        # regular VM target: one admin.vm.Start, then one qrexec-client call
        allow_rule = qubespolicy.PolicyRule('@anyvm @anyvm allow')
        allowed = qubespolicy.PolicyAction(
            'test.service', 'test-vm1', 'test-vm2', allow_rule, 'test-vm2')
        allowed.execute('some-ident')

        expected_qubesd = [unittest.mock.call('test-vm2', 'admin.vm.Start')]
        self.assertEqual(mock_qubesd_call.mock_calls, expected_qubesd)
        expected_qrexec = [unittest.mock.call([
            qubespolicy.QREXEC_CLIENT, '-d', 'test-vm2',
            '-c', 'some-ident', 'DEFAULT:QUBESRPC test.service test-vm1'])]
        self.assertEqual(mock_subprocess.mock_calls, expected_qrexec)

    @unittest.mock.patch('qubespolicy.qubesd_call')
    @unittest.mock.patch('subprocess.call')
    def test_021_execute_dom0(self, mock_subprocess, mock_qubesd_call):
        # dom0 target requested by name: no qubesd call is expected
        allow_rule = qubespolicy.PolicyRule('@anyvm dom0 allow')
        allowed = qubespolicy.PolicyAction(
            'test.service', 'test-vm1', 'dom0', allow_rule, 'dom0')
        allowed.execute('some-ident')

        self.assertEqual(mock_qubesd_call.mock_calls, [])
        expected_qrexec = [unittest.mock.call([
            qubespolicy.QREXEC_CLIENT, '-d', 'dom0',
            '-c', 'some-ident',
            'QUBESRPC test.service test-vm1 name dom0'])]
        self.assertEqual(mock_subprocess.mock_calls, expected_qrexec)

    @unittest.mock.patch('qubespolicy.qubesd_call')
    @unittest.mock.patch('subprocess.call')
    def test_021_execute_dom0_keyword(self, mock_subprocess, mock_qubesd_call):
        # dom0 target requested through the @adminvm keyword
        allow_rule = qubespolicy.PolicyRule('@anyvm dom0 allow')
        allowed = qubespolicy.PolicyAction(
            'test.service', 'test-vm1', 'dom0', allow_rule, '@adminvm')
        allowed.execute('some-ident')

        self.assertEqual(mock_qubesd_call.mock_calls, [])
        expected_qrexec = [unittest.mock.call([
            qubespolicy.QREXEC_CLIENT, '-d', 'dom0',
            '-c', 'some-ident',
            'QUBESRPC test.service test-vm1 keyword adminvm'])]
        self.assertEqual(mock_subprocess.mock_calls, expected_qrexec)

    @unittest.mock.patch('qubespolicy.qubesd_call')
    @unittest.mock.patch('subprocess.call')
    def test_022_execute_dispvm(self, mock_subprocess, mock_qubesd_call):
        # DispVM target: expect create, start and kill calls to qubesd and
        # a qrexec-client call with '-W' addressed to the created VM
        allow_rule = qubespolicy.PolicyRule(
            '@anyvm @dispvm:default-dvm allow')
        allowed = qubespolicy.PolicyAction(
            'test.service', 'test-vm1',
            '@dispvm:default-dvm', allow_rule, '@dispvm:default-dvm')

        def fake_qubesd_call(target, call):
            # only the create call yields data (the new VM's name); any
            # other call falls back to the mock's default return value
            if call == 'admin.vm.CreateDisposable':
                return b'dispvm-name'
            return unittest.mock.DEFAULT

        mock_qubesd_call.side_effect = fake_qubesd_call
        allowed.execute('some-ident')

        expected_qubesd = [
            unittest.mock.call('default-dvm', 'admin.vm.CreateDisposable'),
            unittest.mock.call('dispvm-name', 'admin.vm.Start'),
            unittest.mock.call('dispvm-name', 'admin.vm.Kill'),
        ]
        self.assertEqual(mock_qubesd_call.mock_calls, expected_qubesd)
        expected_qrexec = [unittest.mock.call([
            qubespolicy.QREXEC_CLIENT, '-d', 'dispvm-name',
            '-c', 'some-ident', '-W',
            'DEFAULT:QUBESRPC test.service test-vm1'])]
        self.assertEqual(mock_subprocess.mock_calls, expected_qrexec)

    @unittest.mock.patch('qubespolicy.qubesd_call')
    @unittest.mock.patch('subprocess.call')
    def test_023_execute_already_running(self, mock_subprocess,
            mock_qubesd_call):
        # admin.vm.Start failing with QubesVMNotHaltedError must not stop
        # execute() from calling qrexec-client
        allow_rule = qubespolicy.PolicyRule('@anyvm @anyvm allow')
        allowed = qubespolicy.PolicyAction(
            'test.service', 'test-vm1', 'test-vm2', allow_rule, 'test-vm2')
        not_halted = qubespolicy.QubesMgmtException('QubesVMNotHaltedError')
        mock_qubesd_call.side_effect = not_halted
        allowed.execute('some-ident')

        expected_qubesd = [unittest.mock.call('test-vm2', 'admin.vm.Start')]
        self.assertEqual(mock_qubesd_call.mock_calls, expected_qubesd)
        expected_qrexec = [unittest.mock.call([
            qubespolicy.QREXEC_CLIENT, '-d', 'test-vm2',
            '-c', 'some-ident', 'DEFAULT:QUBESRPC test.service test-vm1'])]
        self.assertEqual(mock_subprocess.mock_calls, expected_qrexec)

    @unittest.mock.patch('qubespolicy.qubesd_call')
    @unittest.mock.patch('subprocess.call')
    def test_024_execute_startup_error(self, mock_subprocess,
            mock_qubesd_call):
        # any other start failure propagates and qrexec-client is not run
        allow_rule = qubespolicy.PolicyRule('@anyvm @anyvm allow')
        allowed = qubespolicy.PolicyAction(
            'test.service', 'test-vm1', 'test-vm2', allow_rule, 'test-vm2')
        start_failure = qubespolicy.QubesMgmtException('QubesVMError')
        mock_qubesd_call.side_effect = start_failure
        with self.assertRaises(qubespolicy.QubesMgmtException):
            allowed.execute('some-ident')

        expected_qubesd = [unittest.mock.call('test-vm2', 'admin.vm.Start')]
        self.assertEqual(mock_qubesd_call.mock_calls, expected_qubesd)
        self.assertEqual(mock_subprocess.mock_calls, [])
class TC_20_Policy(qubes.tests.QubesTestCase):
def setUp(self):
super(TC_20_Policy, self).setUp()
if not os.path.exists(tmp_policy_dir):
os.mkdir(tmp_policy_dir)
def tearDown(self):
shutil.rmtree(tmp_policy_dir)
super(TC_20_Policy, self).tearDown()
def test_000_load(self):
with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f:
f.write('test-vm1 test-vm2 allow\n')
f.write('\n')
f.write('# comment\n')
f.write('test-vm2 test-vm3 ask\n')
f.write(' # comment \n')
f.write('@anyvm @anyvm ask\n')
policy = qubespolicy.Policy('test.service', tmp_policy_dir)
self.assertEqual(policy.service, 'test.service')
self.assertEqual(len(policy.policy_rules), 3)
self.assertEqual(policy.policy_rules[0].source, 'test-vm1')
self.assertEqual(policy.policy_rules[0].target, 'test-vm2')
self.assertEqual(policy.policy_rules[0].action,
qubespolicy.Action.allow)
def test_001_not_existent(self):
with self.assertRaises(qubespolicy.AccessDenied):
qubespolicy.Policy('no-such.service', tmp_policy_dir)
def test_002_include(self):
with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f:
f.write('test-vm1 test-vm2 allow\n')
f.write('@include:test.service2\n')
f.write('@anyvm @anyvm deny\n')
with open(os.path.join(tmp_policy_dir, 'test.service2'), 'w') as f:
f.write('test-vm3 @default allow,target=test-vm2\n')
policy = qubespolicy.Policy('test.service', tmp_policy_dir)
self.assertEqual(policy.service, 'test.service')
self.assertEqual(len(policy.policy_rules), 3)
self.assertEqual(policy.policy_rules[0].source, 'test-vm1')
self.assertEqual(policy.policy_rules[0].target, 'test-vm2')
self.assertEqual(policy.policy_rules[0].action,
qubespolicy.Action.allow)
self.assertEqual(policy.policy_rules[0].filename,
tmp_policy_dir + '/test.service')
self.assertEqual(policy.policy_rules[0].lineno, 1)
self.assertEqual(policy.policy_rules[1].source, 'test-vm3')
self.assertEqual(policy.policy_rules[1].target, '@default')
self.assertEqual(policy.policy_rules[1].action,
qubespolicy.Action.allow)
self.assertEqual(policy.policy_rules[1].filename,
tmp_policy_dir + '/test.service2')
self.assertEqual(policy.policy_rules[1].lineno, 1)
self.assertEqual(policy.policy_rules[2].source, '@anyvm')
self.assertEqual(policy.policy_rules[2].target, '@anyvm')
self.assertEqual(policy.policy_rules[2].action,
qubespolicy.Action.deny)
self.assertEqual(policy.policy_rules[2].filename,
tmp_policy_dir + '/test.service')
self.assertEqual(policy.policy_rules[2].lineno, 3)
def test_003_load_convert(self):
with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f:
f.write('test-vm2 test-vm3 ask\n')
f.write(' # comment \n')
f.write('$anyvm $dispvm ask,default_target=$dispvm\n')
policy = qubespolicy.Policy('test.service', tmp_policy_dir)
self.assertEqual(policy.service, 'test.service')
self.assertEqual(len(policy.policy_rules), 2)
self.assertEqual(policy.policy_rules[1].source, '@anyvm')
self.assertEqual(policy.policy_rules[1].target, '@dispvm')
self.assertEqual(policy.policy_rules[1].action,
qubespolicy.Action.ask)
self.assertEqual(policy.policy_rules[1].default_target,
'@dispvm')
def test_010_find_rule(self):
with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f:
f.write('test-vm1 test-vm2 allow\n')
f.write('test-vm1 @anyvm ask\n')
f.write('test-vm2 @tag:tag1 deny\n')
f.write('test-vm2 @tag:tag2 allow\n')
f.write('test-vm2 @dispvm:@tag:tag3 allow\n')
f.write('test-vm2 @dispvm:@tag:tag2 allow\n')
f.write('test-vm2 @dispvm:default-dvm allow\n')
f.write('@type:AppVM @default allow,target=test-vm3\n')
f.write('@tag:tag1 @type:AppVM allow\n')
policy = qubespolicy.Policy('test.service', tmp_policy_dir)
self.assertEqual(policy.find_matching_rule(
system_info, 'test-vm1', 'test-vm2'), policy.policy_rules[0])
self.assertEqual(policy.find_matching_rule(
system_info, 'test-vm1', 'test-vm3'), policy.policy_rules[1])
self.assertEqual(policy.find_matching_rule(
system_info, 'test-vm2', 'test-vm2'), policy.policy_rules[3])
self.assertEqual(policy.find_matching_rule(
system_info, 'test-vm2', 'test-no-dvm'), policy.policy_rules[2])
# @anyvm matches @default too
self.assertEqual(policy.find_matching_rule(
system_info, 'test-vm1', '@default'), policy.policy_rules[1])
self.assertEqual(policy.find_matching_rule(
system_info, 'test-vm2', '@default'), policy.policy_rules[7])
self.assertEqual(policy.find_matching_rule(
system_info, 'test-no-dvm', 'test-vm3'), policy.policy_rules[8])
self.assertEqual(policy.find_matching_rule(
system_info, 'test-vm2', '@dispvm:test-vm3'),
policy.policy_rules[4])
self.assertEqual(policy.find_matching_rule(
system_info, 'test-vm2', '@dispvm'),
policy.policy_rules[6])
with self.assertRaises(qubespolicy.AccessDenied):
policy.find_matching_rule(
system_info, 'test-no-dvm', 'test-standalone')
with self.assertRaises(qubespolicy.AccessDenied):
policy.find_matching_rule(system_info, 'test-no-dvm', '@dispvm')
with self.assertRaises(qubespolicy.AccessDenied):
policy.find_matching_rule(
system_info, 'test-standalone', '@default')
def test_020_collect_targets_for_ask(self):
with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f:
f.write('test-vm1 test-vm2 allow\n')
f.write('test-vm1 @anyvm ask\n')
f.write('test-vm2 @tag:tag1 deny\n')
f.write('test-vm2 @tag:tag2 allow\n')
f.write('test-no-dvm @type:AppVM deny\n')
f.write('@type:AppVM @default allow,target=test-vm3\n')
f.write('@tag:tag1 @type:AppVM allow\n')
f.write('test-no-dvm @dispvm allow\n')
f.write('test-standalone @dispvm allow\n')
f.write('test-standalone @adminvm allow\n')
policy = qubespolicy.Policy('test.service', tmp_policy_dir)
self.assertCountEqual(policy.collect_targets_for_ask(system_info,
'test-vm1'), ['test-vm1', 'test-vm2', 'test-vm3',
'@dispvm:test-vm3',
'default-dvm', '@dispvm:default-dvm', 'test-invalid-dvm',
'test-no-dvm', 'test-template', 'test-standalone'])
self.assertCountEqual(policy.collect_targets_for_ask(system_info,
'test-vm2'), ['test-vm2', 'test-vm3'])
self.assertCountEqual(policy.collect_targets_for_ask(system_info,
'test-vm3'), ['test-vm3'])
self.assertCountEqual(policy.collect_targets_for_ask(system_info,
'test-standalone'), ['test-vm1', 'test-vm2', 'test-vm3',
'default-dvm', 'test-no-dvm', 'test-invalid-dvm',
'@dispvm:default-dvm', 'dom0'])
self.assertCountEqual(policy.collect_targets_for_ask(system_info,
'test-no-dvm'), [])
def test_030_eval_simple(self):
with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f:
f.write('test-vm1 test-vm2 allow\n')
policy = qubespolicy.Policy('test.service', tmp_policy_dir)
action = policy.evaluate(system_info, 'test-vm1', 'test-vm2')
self.assertEqual(action.rule, policy.policy_rules[0])
self.assertEqual(action.action, qubespolicy.Action.allow)
self.assertEqual(action.target, 'test-vm2')
self.assertEqual(action.original_target, 'test-vm2')
self.assertEqual(action.service, 'test.service')
self.assertIsNone(action.targets_for_ask)
with self.assertRaises(qubespolicy.AccessDenied):
policy.evaluate(system_info, 'test-vm2', '@default')
def test_031_eval_default(self):
with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f:
f.write('test-vm1 test-vm2 allow\n')
f.write('test-vm1 @default allow,target=test-vm2\n')
f.write('@tag:tag1 test-vm2 ask\n')
f.write('@tag:tag2 @anyvm allow\n')
f.write('test-vm3 @anyvm deny\n')
policy = qubespolicy.Policy('test.service', tmp_policy_dir)
action = policy.evaluate(system_info, 'test-vm1', '@default')
self.assertEqual(action.rule, policy.policy_rules[1])
self.assertEqual(action.action, qubespolicy.Action.allow)
self.assertEqual(action.target, 'test-vm2')
self.assertEqual(action.original_target, '@default')
self.assertEqual(action.service, 'test.service')
self.assertIsNone(action.targets_for_ask)
with self.assertRaises(qubespolicy.AccessDenied):
# action allow should hit, but no target specified (either by
# caller or policy)
policy.evaluate(system_info, 'test-standalone', '@default')
def test_032_eval_ask(self):
with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f:
f.write('test-vm1 test-vm2 allow\n')
f.write('test-vm1 @default allow,target=test-vm2\n')
f.write('@tag:tag1 test-vm2 ask\n')
f.write('@tag:tag1 test-vm3 ask,default_target=test-vm3\n')
f.write('@tag:tag2 @anyvm allow\n')
f.write('test-vm3 @anyvm deny\n')
policy = qubespolicy.Policy('test.service', tmp_policy_dir)
action = policy.evaluate(system_info, 'test-standalone', 'test-vm2')
self.assertEqual(action.rule, policy.policy_rules[2])
self.assertEqual(action.action, qubespolicy.Action.ask)
self.assertIsNone(action.target)
self.assertEqual(action.original_target, 'test-vm2')
self.assertEqual(action.service, 'test.service')
self.assertCountEqual(action.targets_for_ask,
['test-vm1', 'test-vm2', 'test-vm3', '@dispvm:test-vm3',
'default-dvm', '@dispvm:default-dvm', 'test-invalid-dvm',
'test-no-dvm', 'test-template', 'test-standalone'])
def test_033_eval_ask(self):
with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f:
f.write('test-vm1 test-vm2 allow\n')
f.write('test-vm1 @default allow,target=test-vm2\n')
f.write('@tag:tag1 test-vm2 ask\n')
f.write('@tag:tag1 test-vm3 ask,default_target=test-vm3\n')
f.write('@tag:tag2 @anyvm allow\n')
f.write('test-vm3 @anyvm deny\n')
policy = qubespolicy.Policy('test.service', tmp_policy_dir)
action = policy.evaluate(system_info, 'test-standalone', 'test-vm3')
self.assertEqual(action.rule, policy.policy_rules[3])
self.assertEqual(action.action, qubespolicy.Action.ask)
self.assertEqual(action.target, 'test-vm3')
self.assertEqual(action.original_target, 'test-vm3')
self.assertEqual(action.service, 'test.service')
self.assertCountEqual(action.targets_for_ask,
['test-vm1', 'test-vm2', 'test-vm3', '@dispvm:test-vm3',
'default-dvm', '@dispvm:default-dvm', 'test-invalid-dvm',
'test-no-dvm', 'test-template', 'test-standalone'])
def test_034_eval_resolve_dispvm(self):
    """Check that an allowed ``@dispvm`` target is resolved.

    For a call from 'test-vm3' to '@dispvm', the resulting action is
    expected to carry the target '@dispvm:default-dvm' while keeping
    '@dispvm' as the original target.
    """
    policy_path = os.path.join(tmp_policy_dir, 'test.service')
    with open(policy_path, 'w') as policy_file:
        policy_file.write('test-vm3 @dispvm allow\n')

    policy = qubespolicy.Policy('test.service', tmp_policy_dir)
    verdict = policy.evaluate(system_info, 'test-vm3', '@dispvm')

    self.assertEqual(verdict.rule, policy.policy_rules[0])
    self.assertEqual(verdict.action, qubespolicy.Action.allow)
    self.assertEqual(verdict.target, '@dispvm:default-dvm')
    self.assertEqual(verdict.original_target, '@dispvm')
    self.assertEqual(verdict.service, 'test.service')
    # An allow action is expected to have no list of targets to ask about.
    self.assertIsNone(verdict.targets_for_ask)
def test_035_eval_resolve_dispvm_fail(self):
    """Expect AccessDenied for a ``@dispvm`` call from 'test-no-dvm'.

    The policy allows the call, but evaluation is still expected to be
    refused (presumably because the source has no default DispVM in the
    fixture - confirm against ``system_info``).
    """
    policy_path = os.path.join(tmp_policy_dir, 'test.service')
    with open(policy_path, 'w') as policy_file:
        policy_file.write('test-no-dvm @dispvm allow\n')

    policy = qubespolicy.Policy('test.service', tmp_policy_dir)
    self.assertRaises(
        qubespolicy.AccessDenied,
        policy.evaluate, system_info, 'test-no-dvm', '@dispvm')
def test_036_eval_invalid_override_target(self):
    """Expect AccessDenied when the rule's ``target=`` is 'no-such-vm'.

    The rule matches the call from 'test-vm3' to '@default' but redirects
    it to 'no-such-vm', and evaluation is expected to be refused.
    """
    policy_path = os.path.join(tmp_policy_dir, 'test.service')
    with open(policy_path, 'w') as policy_file:
        policy_file.write('test-vm3 @anyvm allow,target=no-such-vm\n')

    policy = qubespolicy.Policy('test.service', tmp_policy_dir)
    self.assertRaises(
        qubespolicy.AccessDenied,
        policy.evaluate, system_info, 'test-vm3', '@default')
def test_037_eval_ask_no_targets(self):
    """Expect AccessDenied for an ``ask`` rule limited to ``@default``.

    The only rule in the policy file is 'test-vm3 @default ask'; evaluating
    a call from 'test-vm3' to '@default' is expected to be refused.
    """
    policy_path = os.path.join(tmp_policy_dir, 'test.service')
    with open(policy_path, 'w') as policy_file:
        policy_file.write('test-vm3 @default ask\n')

    policy = qubespolicy.Policy('test.service', tmp_policy_dir)
    self.assertRaises(
        qubespolicy.AccessDenied,
        policy.evaluate, system_info, 'test-vm3', '@default')
class TC_30_Misc(qubes.tests.QubesTestCase):
@unittest.mock.patch('socket.socket')
def test_000_qubesd_call(self, mock_socket):
    """Call qubesd with no argument and no payload.

    The patched socket's ``makefile().read()`` returns ``b'0\\x00data'``;
    the call is expected to return ``b'data'`` and to perform exactly the
    socket operations listed below, in order.
    """
    mock_socket.configure_mock(**{
        'return_value.makefile.return_value.read.return_value': b'0\x00data'
    })

    response = qubespolicy.qubesd_call('test', 'internal.method')
    self.assertEqual(response, b'data')

    call = unittest.mock.call
    # Chunks expected to be passed to sendall(), one call per chunk.
    sent_chunks = (
        b'dom0', b'\x00',
        b'internal.method', b'\x00',
        b'test', b'\x00',
        b'\x00',
    )
    expected_calls = [
        call(socket.AF_UNIX, socket.SOCK_STREAM),
        call().connect(qubespolicy.QUBESD_INTERNAL_SOCK),
    ]
    expected_calls.extend(call().sendall(chunk) for chunk in sent_chunks)
    expected_calls.extend([
        call().shutdown(socket.SHUT_WR),
        call().makefile('rb'),
        call().makefile().read(),
    ])
    self.assertEqual(mock_socket.mock_calls, expected_calls)
@unittest.mock.patch('socket.socket')
def test_001_qubesd_call_arg_payload(self, mock_socket):
    """Call qubesd with both an argument and a payload.

    The patched socket's ``makefile().read()`` returns ``b'0\\x00data'``;
    the call is expected to return ``b'data'`` and to send the argument
    and payload as part of the socket operations listed below, in order.
    """
    mock_socket.configure_mock(**{
        'return_value.makefile.return_value.read.return_value': b'0\x00data'
    })

    response = qubespolicy.qubesd_call(
        'test', 'internal.method', 'arg', b'payload')
    self.assertEqual(response, b'data')

    call = unittest.mock.call
    # Chunks expected to be passed to sendall(), one call per chunk.
    sent_chunks = (
        b'dom0', b'\x00',
        b'internal.method', b'\x00',
        b'test', b'\x00',
        b'arg', b'\x00',
        b'payload',
    )
    expected_calls = [
        call(socket.AF_UNIX, socket.SOCK_STREAM),
        call().connect(qubespolicy.QUBESD_INTERNAL_SOCK),
    ]
    expected_calls.extend(call().sendall(chunk) for chunk in sent_chunks)
    expected_calls.extend([
        call().shutdown(socket.SHUT_WR),
        call().makefile('rb'),
        call().makefile().read(),
    ])
    self.assertEqual(mock_socket.mock_calls, expected_calls)
@unittest.mock.patch('socket.socket')
def test_002_qubesd_call_exception(self, mock_socket):
    """Check that an error response is raised as QubesMgmtException.

    The patched socket's ``makefile().read()`` returns
    ``b'2\\x00SomeError\\x00traceback\\x00message\\x00'``; the call is
    expected to raise with ``exc_type`` equal to 'SomeError', after
    performing the socket operations listed below, in order.
    """
    mock_socket.configure_mock(**{
        'return_value.makefile.return_value.read.return_value':
            b'2\x00SomeError\x00traceback\x00message\x00'
    })

    with self.assertRaises(qubespolicy.QubesMgmtException) as caught:
        qubespolicy.qubesd_call('test', 'internal.method')
    self.assertEqual(caught.exception.exc_type, 'SomeError')

    call = unittest.mock.call
    # Chunks expected to be passed to sendall(), one call per chunk.
    sent_chunks = (
        b'dom0', b'\x00',
        b'internal.method', b'\x00',
        b'test', b'\x00',
        b'\x00',
    )
    expected_calls = [
        call(socket.AF_UNIX, socket.SOCK_STREAM),
        call().connect(qubespolicy.QUBESD_INTERNAL_SOCK),
    ]
    expected_calls.extend(call().sendall(chunk) for chunk in sent_chunks)
    expected_calls.extend([
        call().shutdown(socket.SHUT_WR),
        call().makefile('rb'),
        call().makefile().read(),
    ])
    self.assertEqual(mock_socket.mock_calls, expected_calls)