core-admin/qubespolicy/tests/__init__.py
Marek Marczykowski-Górecki eeec2e0ddd
qubespolicy: forbid qrexec loopback connections at policy level
libxenvchan currently can't handle loopback connections. Since error
reporting on vchan connection setup is far from perfect, try to avoid
making such connections at all.

QubesOS/qubes-issues#951
Fixes QubesOS/qubes-issues#4804
2019-02-27 06:03:57 +01:00

920 lines
44 KiB
Python

# -*- encoding: utf8 -*-
#
# The Qubes OS Project, http://www.qubes-os.org
#
# Copyright (C) 2017 Marek Marczykowski-Górecki
# <marmarek@invisiblethingslab.com>
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, see <https://www.gnu.org/licenses/>.
import os
import socket
import unittest.mock
import shutil
import qubes.tests
import qubespolicy
# Scratch directory in which the tests below create throw-away policy files.
tmp_policy_dir = '/tmp/policy'


def _vm(vm_type, tags, default_dispvm='default-dvm',
        template_for_dispvms=False):
    """Build one per-domain record of the fake ``system_info`` fixture."""
    return {
        'tags': list(tags),
        'type': vm_type,
        'default_dispvm': default_dispvm,
        'template_for_dispvms': template_for_dispvms,
    }


# Fake system description handed to the qubespolicy functions under test.
system_info = {
    'domains': {
        'dom0': _vm('AdminVM', ['dom0-tag']),
        'test-vm1': _vm('AppVM', ['tag1', 'tag2']),
        'test-vm2': _vm('AppVM', ['tag2']),
        'test-vm3': _vm('AppVM', ['tag3'], template_for_dispvms=True),
        'default-dvm': _vm('AppVM', [], template_for_dispvms=True),
        # default_dispvm points at a VM that is not a DispVM template
        'test-invalid-dvm': _vm('AppVM', ['tag1', 'tag2'],
            default_dispvm='test-vm1'),
        # no default DispVM at all
        'test-no-dvm': _vm('AppVM', ['tag1', 'tag2'], default_dispvm=None),
        'test-template': _vm('TemplateVM', ['tag1', 'tag2']),
        'test-standalone': _vm('StandaloneVM', ['tag1', 'tag2']),
    }
}
class TC_00_PolicyRule(qubes.tests.QubesTestCase):
    """Tests for ``qubespolicy.PolicyRule`` and the value validators.

    Checks that consist of many independent input values are written as
    tables run under ``subTest``, so that one failing value does not hide
    the results for the remaining ones.
    """

    def _assert_parsed(self, rule, action, source, target, full_action,
            override_target=None, override_user=None, default_target=None):
        """Compare every parsed field of *rule* with the expected values.

        All parsing tests create their rule with ``('filename', 12)`` as the
        location, so these two values are checked unconditionally. Optional
        fields left at ``None`` are required to be ``None`` on the rule.
        """
        self.assertEqual(rule.filename, 'filename')
        self.assertEqual(rule.lineno, 12)
        self.assertEqual(rule.action, action)
        self.assertEqual(rule.source, source)
        self.assertEqual(rule.target, target)
        self.assertEqual(rule.full_action, full_action)
        optional = (
            ('override_target', override_target),
            ('override_user', override_user),
            ('default_target', default_target),
        )
        for attr, expected in optional:
            if expected is None:
                self.assertIsNone(getattr(rule, attr), attr)
            else:
                self.assertEqual(getattr(rule, attr), expected, attr)

    def _expand_override(self, rule_text, source):
        """Parse *rule_text* and expand its override target for *source*."""
        rule = qubespolicy.PolicyRule(rule_text)
        return rule.expand_override_target(system_info, source)

    def test_000_verify_target_value(self):
        accepted = [
            'test-vm1',
            'default-dvm',
            '@dispvm',
            '@dispvm:default-dvm',
            'test-template',
            'test-standalone',
            '@adminvm',
        ]
        rejected = [
            'no-such-vm',
            '@dispvm:test-invalid-dvm',
            '@dispvm:test-vm1',
            '',
            '@default',
            '@anyvm',
            '@tag:tag1',
            '@dispvm:@tag:tag1',
            '@invalid',
        ]
        for value in accepted:
            with self.subTest(value=value):
                self.assertTrue(
                    qubespolicy.verify_target_value(system_info, value))
        for value in rejected:
            with self.subTest(value=value):
                self.assertFalse(
                    qubespolicy.verify_target_value(system_info, value))

    def test_010_verify_special_value(self):
        # (value, for_target) pairs
        accepted = [
            ('@tag:tag', False),
            ('@tag:other-tag', False),
            ('@type:AppVM', False),
            ('@adminvm', False),
            ('@dispvm:some-vm', True),
            ('@dispvm:@tag:tag1', True),
        ]
        rejected = [
            ('@default', False),
            ('@dispvm', False),
            ('@dispvm:some-vm', False),
            ('@dispvm:@tag:tag1', False),
            ('@invalid', False),
            ('vm-name', False),
            ('@tag:', False),
            ('@type:', False),
        ]
        for value, for_target in accepted:
            with self.subTest(value=value, for_target=for_target):
                self.assertTrue(qubespolicy.verify_special_value(
                    value, for_target=for_target))
        for value, for_target in rejected:
            with self.subTest(value=value, for_target=for_target):
                self.assertFalse(qubespolicy.verify_special_value(
                    value, for_target=for_target))

    def test_020_line_simple(self):
        rule = qubespolicy.PolicyRule('@anyvm @anyvm ask', 'filename', 12)
        self._assert_parsed(rule,
            action=qubespolicy.Action.ask,
            source='@anyvm',
            target='@anyvm',
            full_action='ask')

    def test_021_line_simple(self):
        # also check spaces in action field
        rule = qubespolicy.PolicyRule(
            '@tag:tag1 @type:AppVM ask, target=test-vm2, user=user',
            'filename', 12)
        self._assert_parsed(rule,
            action=qubespolicy.Action.ask,
            source='@tag:tag1',
            target='@type:AppVM',
            full_action='ask, target=test-vm2, user=user',
            override_target='test-vm2',
            override_user='user')

    def test_022_line_simple(self):
        rule = qubespolicy.PolicyRule(
            '@anyvm @default allow,target=@dispvm:test-vm2',
            'filename', 12)
        self._assert_parsed(rule,
            action=qubespolicy.Action.allow,
            source='@anyvm',
            target='@default',
            full_action='allow,target=@dispvm:test-vm2',
            override_target='@dispvm:test-vm2')

    def test_023_line_simple(self):
        rule = qubespolicy.PolicyRule(
            '@anyvm @default ask,default_target=test-vm1',
            'filename', 12)
        self._assert_parsed(rule,
            action=qubespolicy.Action.ask,
            source='@anyvm',
            target='@default',
            full_action='ask,default_target=test-vm1',
            default_target='test-vm1')

    def test_024_line_simple(self):
        rule = qubespolicy.PolicyRule(
            '@anyvm @adminvm ask,default_target=@adminvm',
            'filename', 12)
        self._assert_parsed(rule,
            action=qubespolicy.Action.ask,
            source='@anyvm',
            target='@adminvm',
            full_action='ask,default_target=@adminvm',
            default_target='@adminvm')

    def test_030_line_invalid(self):
        invalid_lines = [
            '@dispvm @default allow',  # @dispvm can't be a source
            '@default @default allow',  # @default can't be a source
            '@anyvm @default allow,target=@dispvm:@tag:tag1',  # @dispvm:@tag
            #  as override target
            '@anyvm @default allow,target=@tag:tag1',  # @tag as override target
            '@anyvm @default deny,target=test-vm1',  # target= used with deny
            '@anyvm @anyvm deny,default_target=test-vm1',  # default_target=
            # with deny
            '@anyvm @anyvm deny,user=user',  # user= with deny
            '@anyvm @anyvm invalid',  # invalid action
            '@anyvm @anyvm allow,invalid=xx',  # invalid option
            '@anyvm @anyvm',  # missing action
            '@anyvm @anyvm allow,default_target=test-vm1',  # default_target=
            #  with allow
            '@invalid @anyvm allow',  # invalid source
            '@anyvm @invalid deny',  # invalid target
            '',  # empty line
            '@anyvm @anyvm allow extra',  # trailing words
            '@anyvm @default allow',  # @default allow without target=
        ]
        for line in invalid_lines:
            with self.subTest(line):
                with self.assertRaises(qubespolicy.PolicySyntaxError):
                    qubespolicy.PolicyRule(line, 'filename', 12)

    def test_040_match_single(self):
        is_match_single = qubespolicy.PolicyRule.is_match_single
        # (value from the policy line, value from the request)
        matching = [
            ('@anyvm', 'test-vm1'),
            ('@anyvm', '@default'),
            ('@default', '@default'),
            ('@tag:tag1', 'test-vm1'),
            ('@type:AppVM', 'test-vm1'),
            ('@type:TemplateVM', 'test-template'),
            ('@anyvm', '@dispvm'),
            ('@anyvm', '@dispvm:default-dvm'),
            ('@dispvm', '@dispvm'),
            ('@dispvm:@tag:tag3', '@dispvm:test-vm3'),
            ('@adminvm', '@adminvm'),
            ('@adminvm', 'dom0'),
            ('dom0', '@adminvm'),
            ('dom0', 'dom0'),
            ('@dispvm:default-dvm', '@dispvm:default-dvm'),
        ]
        not_matching = [
            ('@default', 'test-vm1'),
            ('@tag:tag1', 'test-vm3'),
            ('@anyvm', 'no-such-vm'),
            # test-vm1.template_for_dispvms=False
            ('@anyvm', '@dispvm:test-vm1'),
            # test-vm1.template_for_dispvms=False
            ('@dispvm:test-vm1', '@dispvm:test-vm1'),
            ('@dispvm:@tag:tag1', '@dispvm:test-vm1'),
            # test-vm3 has no tag1
            ('@dispvm:@tag:tag1', '@dispvm:test-vm3'),
            # default-dvm has no tag3
            ('@dispvm:@tag:tag3', '@dispvm:default-dvm'),
            ('@anyvm', 'dom0'),
            ('@anyvm', '@adminvm'),
            ('@tag:dom0-tag', '@adminvm'),
            ('@type:AdminVM', '@adminvm'),
            ('@tag:dom0-tag', 'dom0'),
            ('@type:AdminVM', 'dom0'),
            ('@tag:tag1', 'dom0'),
            ('@anyvm', '@tag:tag1'),
            ('@anyvm', '@type:AppVM'),
            ('@anyvm', '@invalid'),
            ('@invalid', '@invalid'),
            ('no-such-vm', 'no-such-vm'),
            ('@dispvm', 'test-vm1'),
            ('@dispvm', 'default-dvm'),
            ('@dispvm:default-dvm', 'default-dvm'),
            # trailing whitespace must not be ignored
            ('@anyvm', 'test-vm1\n'),
            ('@anyvm', 'test-vm1 '),
        ]
        for policy_value, value in matching:
            with self.subTest(policy_value=policy_value, value=value):
                self.assertTrue(
                    is_match_single(system_info, policy_value, value))
        for policy_value, value in not_matching:
            with self.subTest(policy_value=policy_value, value=value):
                self.assertFalse(
                    is_match_single(system_info, policy_value, value))

    def test_050_match(self):
        # (policy line, source, target, expected match result)
        cases = [
            ('@anyvm @anyvm allow', 'test-vm1', 'test-vm2', True),
            ('@anyvm @anyvm allow', 'no-such-vm', 'test-vm2', False),
            ('@anyvm @anyvm allow', 'test-vm1', 'no-such-vm', False),
            ('@anyvm @dispvm allow', 'test-vm1', '@dispvm', True),
            ('@anyvm @dispvm allow',
                'test-vm1', '@dispvm:default-dvm', False),
            ('@anyvm @dispvm:default-dvm allow',
                'test-vm1', '@dispvm', True),
            ('@anyvm @dispvm:default-dvm allow',
                'test-vm1', '@dispvm:default-dvm', True),
            ('@anyvm @dispvm:@tag:tag3 allow',
                'test-vm1', '@dispvm:test-vm3', True),
        ]
        for rule_text, source, target, expected in cases:
            with self.subTest(rule=rule_text, source=source, target=target):
                rule = qubespolicy.PolicyRule(rule_text)
                matched = rule.is_match(system_info, source, target)
                if expected:
                    self.assertTrue(matched)
                else:
                    self.assertFalse(matched)

    def test_060_expand_target(self):
        lines = {
            '@anyvm @anyvm allow': ['test-vm1', 'test-vm2', 'test-vm3',
                '@dispvm:test-vm3',
                'default-dvm', '@dispvm:default-dvm', 'test-invalid-dvm',
                'test-no-dvm', 'test-template', 'test-standalone', '@dispvm'],
            '@anyvm @dispvm allow': ['@dispvm'],
            '@anyvm @dispvm:default-dvm allow': ['@dispvm:default-dvm'],
            # no DispVM from test-vm1 allowed
            '@anyvm @dispvm:test-vm1 allow': [],
            '@anyvm @dispvm:test-vm3 allow': ['@dispvm:test-vm3'],
            '@anyvm @dispvm:@tag:tag1 allow': [],
            '@anyvm @dispvm:@tag:tag3 allow': ['@dispvm:test-vm3'],
            '@anyvm test-vm1 allow': ['test-vm1'],
            '@anyvm @type:AppVM allow': ['test-vm1', 'test-vm2', 'test-vm3',
                'default-dvm', 'test-invalid-dvm', 'test-no-dvm'],
            '@anyvm @type:TemplateVM allow': ['test-template'],
            '@anyvm @tag:tag1 allow': ['test-vm1', 'test-invalid-dvm',
                'test-template', 'test-standalone', 'test-no-dvm'],
            '@anyvm @tag:tag2 allow': ['test-vm1', 'test-vm2',
                'test-invalid-dvm', 'test-template', 'test-standalone',
                'test-no-dvm'],
            '@anyvm @tag:no-such-tag allow': [],
        }
        for line, expected_targets in lines.items():
            with self.subTest(line):
                policy_line = qubespolicy.PolicyRule(line)
                self.assertCountEqual(
                    list(policy_line.expand_target(system_info)),
                    expected_targets)

    def test_070_expand_override_target(self):
        self.assertEqual(
            self._expand_override(
                '@anyvm @anyvm allow,target=test-vm2', 'test-vm1'),
            'test-vm2')

    def test_071_expand_override_target_dispvm(self):
        self.assertEqual(
            self._expand_override(
                '@anyvm @anyvm allow,target=@dispvm', 'test-vm1'),
            '@dispvm:default-dvm')

    def test_072_expand_override_target_dispvm_specific(self):
        self.assertEqual(
            self._expand_override(
                '@anyvm @anyvm allow,target=@dispvm:test-vm3', 'test-vm1'),
            '@dispvm:test-vm3')

    def test_073_expand_override_target_dispvm_none(self):
        # test-no-dvm has default_dispvm=None in the fixture
        self.assertIsNone(
            self._expand_override(
                '@anyvm @anyvm allow,target=@dispvm', 'test-no-dvm'))

    def test_074_expand_override_target_dom0(self):
        self.assertEqual(
            self._expand_override(
                '@anyvm @anyvm allow,target=dom0', 'test-no-dvm'),
            'dom0')

    def test_075_expand_override_target_dom0(self):
        self.assertEqual(
            self._expand_override(
                '@anyvm @anyvm allow,target=@adminvm', 'test-no-dvm'),
            '@adminvm')
class TC_10_PolicyAction(qubes.tests.QubesTestCase):
    """Tests for ``qubespolicy.PolicyAction``.

    Covers construction, handling of the user's answer to an ``ask`` prompt
    and ``execute()``; the latter runs with ``qubespolicy.qubesd_call`` and
    ``subprocess.call`` replaced by mocks.
    """

    def test_000_init(self):
        # an action cannot be built from a 'deny' rule
        deny_rule = qubespolicy.PolicyRule('@anyvm @anyvm deny')
        self.assertRaises(qubespolicy.AccessDenied,
            qubespolicy.PolicyAction,
            'test.service', 'test-vm1', 'test-vm2', deny_rule, 'test-vm2')

    def test_001_init(self):
        ask_rule = qubespolicy.PolicyRule('@anyvm @anyvm ask')
        pending = qubespolicy.PolicyAction(
            'test.service', 'test-vm1', None, ask_rule, 'test-vm2',
            ['test-vm2', 'test-vm3'])
        self.assertEqual(pending.service, 'test.service')
        self.assertEqual(pending.source, 'test-vm1')
        self.assertIsNone(pending.target)
        self.assertEqual(pending.original_target, 'test-vm2')
        self.assertEqual(pending.targets_for_ask, ['test-vm2', 'test-vm3'])
        self.assertEqual(pending.rule, ask_rule)
        self.assertEqual(pending.action, qubespolicy.Action.ask)

    def test_002_init_invalid(self):
        rule_ask = qubespolicy.PolicyRule('@anyvm @anyvm ask')
        rule_allow = qubespolicy.PolicyRule('@anyvm @anyvm allow')
        # (target, rule, targets_for_ask) combinations that must be refused
        refused = (
            (None, rule_allow, None),
            ('test-vm2', rule_allow, ['test-vm2', 'test-vm3']),
            (None, rule_ask, None),
        )
        for target, rule, targets_for_ask in refused:
            with self.assertRaises(qubespolicy.AccessDenied):
                qubespolicy.PolicyAction('test.service', 'test-vm1',
                    target, rule, 'test-vm2', targets_for_ask)

    def test_003_init_default_target(self):
        rule_ask = qubespolicy.PolicyRule('@anyvm @anyvm ask')

        # proposed target 'test-vm1' is not kept ...
        dropped = qubespolicy.PolicyAction('test.service', 'test-vm1',
            'test-vm1', rule_ask, 'test-vm2', ['test-vm2'])
        self.assertIsNone(dropped.target)

        # ... while 'test-vm2' is
        kept = qubespolicy.PolicyAction('test.service', 'test-vm1',
            'test-vm2', rule_ask, 'test-vm2', ['test-vm2'])
        self.assertEqual(kept.target, 'test-vm2')

    def test_010_handle_user_response(self):
        ask_rule = qubespolicy.PolicyRule('@anyvm @anyvm ask')
        pending = qubespolicy.PolicyAction('test.service', 'test-vm1',
            None, ask_rule, 'test-vm2', ['test-vm2', 'test-vm3'])
        pending.handle_user_response(True, 'test-vm2')
        self.assertEqual(pending.action, qubespolicy.Action.allow)
        self.assertEqual(pending.target, 'test-vm2')

    def test_011_handle_user_response(self):
        ask_rule = qubespolicy.PolicyRule('@anyvm @anyvm ask')
        pending = qubespolicy.PolicyAction('test.service', 'test-vm1',
            None, ask_rule, 'test-vm2', ['test-vm2', 'test-vm3'])
        # 'test-no-dvm' is not among the targets offered to the user
        self.assertRaises(AssertionError,
            pending.handle_user_response, True, 'test-no-dvm')

    def test_012_handle_user_response(self):
        ask_rule = qubespolicy.PolicyRule('@anyvm @anyvm ask')
        pending = qubespolicy.PolicyAction('test.service', 'test-vm1',
            None, ask_rule, 'test-vm2', ['test-vm2', 'test-vm3'])
        self.assertRaises(qubespolicy.AccessDenied,
            pending.handle_user_response, False, None)
        self.assertEqual(pending.action, qubespolicy.Action.deny)

    def test_013_handle_user_response_with_default_target(self):
        ask_rule = qubespolicy.PolicyRule(
            '@anyvm @anyvm ask,default_target=test-vm2')
        pending = qubespolicy.PolicyAction('test.service', 'test-vm1',
            None, ask_rule, 'test-vm2', ['test-vm2', 'test-vm3'])
        pending.handle_user_response(True, 'test-vm2')
        self.assertEqual(pending.action, qubespolicy.Action.allow)
        self.assertEqual(pending.target, 'test-vm2')

    @unittest.mock.patch('qubespolicy.qubesd_call')
    @unittest.mock.patch('subprocess.call')
    def test_020_execute(self, mock_subprocess, mock_qubesd_call):
        allow_rule = qubespolicy.PolicyRule('@anyvm @anyvm allow')
        allowed = qubespolicy.PolicyAction('test.service', 'test-vm1',
            'test-vm2', allow_rule, 'test-vm2')
        allowed.execute('some-ident')

        expected_qubesd = [
            unittest.mock.call('test-vm2', 'admin.vm.Start'),
        ]
        expected_client = [
            unittest.mock.call([
                qubespolicy.QREXEC_CLIENT, '-d', 'test-vm2',
                '-c', 'some-ident',
                'DEFAULT:QUBESRPC test.service test-vm1']),
        ]
        self.assertEqual(mock_qubesd_call.mock_calls, expected_qubesd)
        self.assertEqual(mock_subprocess.mock_calls, expected_client)

    @unittest.mock.patch('qubespolicy.qubesd_call')
    @unittest.mock.patch('subprocess.call')
    def test_021_execute_dom0(self, mock_subprocess, mock_qubesd_call):
        allow_rule = qubespolicy.PolicyRule('@anyvm dom0 allow')
        allowed = qubespolicy.PolicyAction('test.service', 'test-vm1',
            'dom0', allow_rule, 'dom0')
        allowed.execute('some-ident')

        expected_client = [
            unittest.mock.call([
                qubespolicy.QREXEC_CLIENT, '-d', 'dom0',
                '-c', 'some-ident',
                'QUBESRPC test.service test-vm1 name dom0']),
        ]
        # no qubesd call is expected for a dom0 target
        self.assertEqual(mock_qubesd_call.mock_calls, [])
        self.assertEqual(mock_subprocess.mock_calls, expected_client)

    @unittest.mock.patch('qubespolicy.qubesd_call')
    @unittest.mock.patch('subprocess.call')
    def test_021_execute_dom0_keyword(self, mock_subprocess, mock_qubesd_call):
        allow_rule = qubespolicy.PolicyRule('@anyvm dom0 allow')
        allowed = qubespolicy.PolicyAction('test.service', 'test-vm1',
            'dom0', allow_rule, '@adminvm')
        allowed.execute('some-ident')

        expected_client = [
            unittest.mock.call([
                qubespolicy.QREXEC_CLIENT, '-d', 'dom0',
                '-c', 'some-ident',
                'QUBESRPC test.service test-vm1 keyword adminvm']),
        ]
        self.assertEqual(mock_qubesd_call.mock_calls, [])
        self.assertEqual(mock_subprocess.mock_calls, expected_client)

    @unittest.mock.patch('qubespolicy.qubesd_call')
    @unittest.mock.patch('subprocess.call')
    def test_022_execute_dispvm(self, mock_subprocess, mock_qubesd_call):
        allow_rule = qubespolicy.PolicyRule('@anyvm @dispvm:default-dvm allow')
        allowed = qubespolicy.PolicyAction('test.service', 'test-vm1',
            '@dispvm:default-dvm', allow_rule, '@dispvm:default-dvm')

        def fake_qubesd(target, call):
            # only the DispVM creation call returns a value of its own
            if call == 'admin.vm.CreateDisposable':
                return b'dispvm-name'
            return unittest.mock.DEFAULT

        mock_qubesd_call.side_effect = fake_qubesd
        allowed.execute('some-ident')

        expected_qubesd = [
            unittest.mock.call('default-dvm', 'admin.vm.CreateDisposable'),
            unittest.mock.call('dispvm-name', 'admin.vm.Start'),
            unittest.mock.call('dispvm-name', 'admin.vm.Kill'),
        ]
        expected_client = [
            unittest.mock.call([
                qubespolicy.QREXEC_CLIENT, '-d', 'dispvm-name',
                '-c', 'some-ident', '-W',
                'DEFAULT:QUBESRPC test.service test-vm1']),
        ]
        self.assertEqual(mock_qubesd_call.mock_calls, expected_qubesd)
        self.assertEqual(mock_subprocess.mock_calls, expected_client)

    @unittest.mock.patch('qubespolicy.qubesd_call')
    @unittest.mock.patch('subprocess.call')
    def test_023_execute_already_running(self, mock_subprocess,
            mock_qubesd_call):
        allow_rule = qubespolicy.PolicyRule('@anyvm @anyvm allow')
        allowed = qubespolicy.PolicyAction('test.service', 'test-vm1',
            'test-vm2', allow_rule, 'test-vm2')
        not_halted = qubespolicy.QubesMgmtException('QubesVMNotHaltedError')
        mock_qubesd_call.side_effect = not_halted
        # this particular error must not stop the call from going through
        allowed.execute('some-ident')

        expected_qubesd = [
            unittest.mock.call('test-vm2', 'admin.vm.Start'),
        ]
        expected_client = [
            unittest.mock.call([
                qubespolicy.QREXEC_CLIENT, '-d', 'test-vm2',
                '-c', 'some-ident',
                'DEFAULT:QUBESRPC test.service test-vm1']),
        ]
        self.assertEqual(mock_qubesd_call.mock_calls, expected_qubesd)
        self.assertEqual(mock_subprocess.mock_calls, expected_client)

    @unittest.mock.patch('qubespolicy.qubesd_call')
    @unittest.mock.patch('subprocess.call')
    def test_024_execute_startup_error(self, mock_subprocess,
            mock_qubesd_call):
        allow_rule = qubespolicy.PolicyRule('@anyvm @anyvm allow')
        allowed = qubespolicy.PolicyAction('test.service', 'test-vm1',
            'test-vm2', allow_rule, 'test-vm2')
        startup_error = qubespolicy.QubesMgmtException('QubesVMError')
        mock_qubesd_call.side_effect = startup_error
        self.assertRaises(qubespolicy.QubesMgmtException,
            allowed.execute, 'some-ident')

        expected_qubesd = [
            unittest.mock.call('test-vm2', 'admin.vm.Start'),
        ]
        self.assertEqual(mock_qubesd_call.mock_calls, expected_qubesd)
        # the qrexec client must not have been started
        self.assertEqual(mock_subprocess.mock_calls, [])
class TC_20_Policy(qubes.tests.QubesTestCase):
def setUp(self):
    # Create the scratch policy directory used by every test in this class.
    super(TC_20_Policy, self).setUp()
    # exist_ok=True replaces the separate os.path.exists() check, which
    # left a window between the check and the creation; a directory left
    # behind by an earlier run is still accepted.
    os.makedirs(tmp_policy_dir, exist_ok=True)
def tearDown(self):
    # Remove the scratch policy directory together with everything the
    # test wrote into it.
    try:
        shutil.rmtree(tmp_policy_dir)
    finally:
        # run the base class cleanup even when the removal above failed
        super(TC_20_Policy, self).tearDown()
def test_000_load(self):
    # Of the six lines written, only the three rule lines are expected to
    # be loaded; the empty line and the two comment lines are not.
    policy_path = os.path.join(tmp_policy_dir, 'test.service')
    with open(policy_path, 'w') as policy_file:
        policy_file.writelines((
            'test-vm1 test-vm2 allow\n',
            '\n',
            '# comment\n',
            'test-vm2 test-vm3 ask\n',
            ' # comment \n',
            '@anyvm @anyvm ask\n',
        ))
    loaded = qubespolicy.Policy('test.service', tmp_policy_dir)
    self.assertEqual(loaded.service, 'test.service')
    self.assertEqual(len(loaded.policy_rules), 3)
    first_rule = loaded.policy_rules[0]
    self.assertEqual(first_rule.source, 'test-vm1')
    self.assertEqual(first_rule.target, 'test-vm2')
    self.assertEqual(first_rule.action, qubespolicy.Action.allow)
def test_001_not_existent(self):
    # no file for this service has been written to the policy directory
    self.assertRaises(qubespolicy.AccessDenied,
        qubespolicy.Policy, 'no-such.service', tmp_policy_dir)
def test_002_include(self):
    # The rule from the included file is expected between the two rules of
    # the including file, and to report its own file name and line number.
    main_path = os.path.join(tmp_policy_dir, 'test.service')
    included_path = os.path.join(tmp_policy_dir, 'test.service2')
    with open(main_path, 'w') as main_file:
        main_file.writelines((
            'test-vm1 test-vm2 allow\n',
            '@include:test.service2\n',
            '@anyvm @anyvm deny\n',
        ))
    with open(included_path, 'w') as included_file:
        included_file.write('test-vm3 @default allow,target=test-vm2\n')

    loaded = qubespolicy.Policy('test.service', tmp_policy_dir)
    self.assertEqual(loaded.service, 'test.service')
    self.assertEqual(len(loaded.policy_rules), 3)

    # (index, source, target, action, file name, line number)
    expected_rules = (
        (0, 'test-vm1', 'test-vm2', qubespolicy.Action.allow,
            tmp_policy_dir + '/test.service', 1),
        (1, 'test-vm3', '@default', qubespolicy.Action.allow,
            tmp_policy_dir + '/test.service2', 1),
        (2, '@anyvm', '@anyvm', qubespolicy.Action.deny,
            tmp_policy_dir + '/test.service', 3),
    )
    for index, source, target, action, filename, lineno in expected_rules:
        rule = loaded.policy_rules[index]
        self.assertEqual(rule.source, source)
        self.assertEqual(rule.target, target)
        self.assertEqual(rule.action, action)
        self.assertEqual(rule.filename, filename)
        self.assertEqual(rule.lineno, lineno)
def test_003_load_convert(self):
    # Keywords written with the '$' prefix are expected to show up with
    # the '@' prefix on the loaded rule.
    policy_path = os.path.join(tmp_policy_dir, 'test.service')
    with open(policy_path, 'w') as policy_file:
        policy_file.writelines((
            'test-vm2 test-vm3 ask\n',
            ' # comment \n',
            '$anyvm $dispvm ask,default_target=$dispvm\n',
        ))
    loaded = qubespolicy.Policy('test.service', tmp_policy_dir)
    self.assertEqual(loaded.service, 'test.service')
    self.assertEqual(len(loaded.policy_rules), 2)
    converted = loaded.policy_rules[1]
    self.assertEqual(converted.source, '@anyvm')
    self.assertEqual(converted.target, '@dispvm')
    self.assertEqual(converted.action, qubespolicy.Action.ask)
    self.assertEqual(converted.default_target, '@dispvm')
def test_010_find_rule(self):
    policy_path = os.path.join(tmp_policy_dir, 'test.service')
    with open(policy_path, 'w') as policy_file:
        policy_file.writelines((
            'test-vm1 test-vm2 allow\n',                     # rule 0
            'test-vm1 @anyvm ask\n',                         # rule 1
            'test-vm2 @tag:tag1 deny\n',                     # rule 2
            'test-vm2 @tag:tag2 allow\n',                    # rule 3
            'test-vm2 @dispvm:@tag:tag3 allow\n',            # rule 4
            'test-vm2 @dispvm:@tag:tag2 allow\n',            # rule 5
            'test-vm2 @dispvm:default-dvm allow\n',          # rule 6
            '@type:AppVM @default allow,target=test-vm3\n',  # rule 7
            '@tag:tag1 @type:AppVM allow\n',                 # rule 8
        ))
    loaded = qubespolicy.Policy('test.service', tmp_policy_dir)

    # (source, target, index of the rule expected to match)
    expected_matches = (
        ('test-vm1', 'test-vm2', 0),
        ('test-vm1', 'test-vm3', 1),
        ('test-vm2', 'test-vm2', 3),
        ('test-vm2', 'test-no-dvm', 2),
        # @anyvm matches @default too
        ('test-vm1', '@default', 1),
        ('test-vm2', '@default', 7),
        ('test-no-dvm', 'test-vm3', 8),
        ('test-vm2', '@dispvm:test-vm3', 4),
        ('test-vm2', '@dispvm', 6),
    )
    for source, target, rule_index in expected_matches:
        found = loaded.find_matching_rule(system_info, source, target)
        self.assertEqual(found, loaded.policy_rules[rule_index])

    # requests for which no rule is expected to match
    unmatched = (
        ('test-no-dvm', 'test-standalone'),
        ('test-no-dvm', '@dispvm'),
        ('test-standalone', '@default'),
    )
    for source, target in unmatched:
        with self.assertRaises(qubespolicy.AccessDenied):
            loaded.find_matching_rule(system_info, source, target)
def test_020_collect_targets_for_ask(self):
    policy_path = os.path.join(tmp_policy_dir, 'test.service')
    with open(policy_path, 'w') as policy_file:
        policy_file.writelines((
            'test-vm1 test-vm2 allow\n',
            'test-vm1 @anyvm ask\n',
            'test-vm2 @tag:tag1 deny\n',
            'test-vm2 @tag:tag2 allow\n',
            'test-no-dvm @type:AppVM deny\n',
            '@type:AppVM @default allow,target=test-vm3\n',
            '@tag:tag1 @type:AppVM allow\n',
            'test-no-dvm @dispvm allow\n',
            'test-standalone @dispvm allow\n',
            'test-standalone @adminvm allow\n',
        ))
    loaded = qubespolicy.Policy('test.service', tmp_policy_dir)

    # (source, targets expected to be offered; order is not significant)
    expected_targets = (
        ('test-vm1', ['test-vm2', 'test-vm3',
            '@dispvm:test-vm3',
            'default-dvm', '@dispvm:default-dvm', 'test-invalid-dvm',
            'test-no-dvm', 'test-template', 'test-standalone']),
        ('test-vm2', ['test-vm3']),
        ('test-vm3', []),
        ('test-standalone', ['test-vm1', 'test-vm2', 'test-vm3',
            'default-dvm', 'test-no-dvm', 'test-invalid-dvm',
            '@dispvm:default-dvm', 'dom0']),
        ('test-no-dvm', []),
    )
    for source, targets in expected_targets:
        collected = loaded.collect_targets_for_ask(system_info, source)
        self.assertCountEqual(collected, targets)
def test_030_eval_simple(self):
    policy_path = os.path.join(tmp_policy_dir, 'test.service')
    with open(policy_path, 'w') as policy_file:
        policy_file.write('test-vm1 test-vm2 allow\n')
    loaded = qubespolicy.Policy('test.service', tmp_policy_dir)

    result = loaded.evaluate(system_info, 'test-vm1', 'test-vm2')
    self.assertEqual(result.rule, loaded.policy_rules[0])
    self.assertEqual(result.action, qubespolicy.Action.allow)
    self.assertEqual(result.target, 'test-vm2')
    self.assertEqual(result.original_target, 'test-vm2')
    self.assertEqual(result.service, 'test.service')
    self.assertIsNone(result.targets_for_ask)

    # the single rule says nothing about test-vm2 calling @default
    self.assertRaises(qubespolicy.AccessDenied,
        loaded.evaluate, system_info, 'test-vm2', '@default')
def test_031_eval_default(self):
    policy_path = os.path.join(tmp_policy_dir, 'test.service')
    with open(policy_path, 'w') as policy_file:
        policy_file.writelines((
            'test-vm1 test-vm2 allow\n',
            'test-vm1 @default allow,target=test-vm2\n',
            '@tag:tag1 test-vm2 ask\n',
            '@tag:tag2 @anyvm allow\n',
            'test-vm3 @anyvm deny\n',
        ))
    loaded = qubespolicy.Policy('test.service', tmp_policy_dir)

    result = loaded.evaluate(system_info, 'test-vm1', '@default')
    self.assertEqual(result.rule, loaded.policy_rules[1])
    self.assertEqual(result.action, qubespolicy.Action.allow)
    self.assertEqual(result.target, 'test-vm2')
    self.assertEqual(result.original_target, '@default')
    self.assertEqual(result.service, 'test.service')
    self.assertIsNone(result.targets_for_ask)

    # action allow should hit, but no target specified (either by
    # caller or policy)
    self.assertRaises(qubespolicy.AccessDenied,
        loaded.evaluate, system_info, 'test-standalone', '@default')
def test_032_eval_ask(self):
    policy_path = os.path.join(tmp_policy_dir, 'test.service')
    with open(policy_path, 'w') as policy_file:
        policy_file.writelines((
            'test-vm1 test-vm2 allow\n',
            'test-vm1 @default allow,target=test-vm2\n',
            '@tag:tag1 test-vm2 ask\n',
            '@tag:tag1 test-vm3 ask,default_target=test-vm3\n',
            '@tag:tag2 @anyvm allow\n',
            'test-vm3 @anyvm deny\n',
        ))
    loaded = qubespolicy.Policy('test.service', tmp_policy_dir)

    result = loaded.evaluate(system_info, 'test-standalone', 'test-vm2')
    self.assertEqual(result.rule, loaded.policy_rules[2])
    self.assertEqual(result.action, qubespolicy.Action.ask)
    # the matching 'ask' rule carries no default_target
    self.assertIsNone(result.target)
    self.assertEqual(result.original_target, 'test-vm2')
    self.assertEqual(result.service, 'test.service')
    offered = [
        'test-vm1', 'test-vm2', 'test-vm3', '@dispvm:test-vm3',
        'default-dvm', '@dispvm:default-dvm', 'test-invalid-dvm',
        'test-no-dvm', 'test-template',
    ]
    self.assertCountEqual(result.targets_for_ask, offered)
def test_033_eval_ask(self):
    # An 'ask' rule with default_target=test-vm3: the resulting action is
    # expected to have that VM preselected as its target.
    policy_lines = [
        'test-vm1 test-vm2 allow\n',
        'test-vm1 @default allow,target=test-vm2\n',
        '@tag:tag1 test-vm2 ask\n',
        '@tag:tag1 test-vm3 ask,default_target=test-vm3\n',
        '@tag:tag2 @anyvm allow\n',
        'test-vm3 @anyvm deny\n',
    ]
    policy_path = os.path.join(tmp_policy_dir, 'test.service')
    with open(policy_path, 'w') as policy_file:
        policy_file.writelines(policy_lines)
    policy = qubespolicy.Policy('test.service', tmp_policy_dir)

    decision = policy.evaluate(system_info, 'test-standalone', 'test-vm3')
    self.assertEqual(decision.rule, policy.policy_rules[3])
    self.assertEqual(decision.action, qubespolicy.Action.ask)
    self.assertEqual(decision.target, 'test-vm3')
    self.assertEqual(decision.original_target, 'test-vm3')
    self.assertEqual(decision.service, 'test.service')

    # Order is irrelevant, hence assertCountEqual.
    expected_choices = [
        'test-vm1', 'test-vm2', 'test-vm3', '@dispvm:test-vm3',
        'default-dvm', '@dispvm:default-dvm', 'test-invalid-dvm',
        'test-no-dvm', 'test-template',
    ]
    self.assertCountEqual(decision.targets_for_ask, expected_choices)
def test_034_eval_resolve_dispvm(self):
    # test-vm3's default_dispvm in system_info is 'default-dvm', so a bare
    # '@dispvm' request is expected to come back as '@dispvm:default-dvm'.
    policy_path = os.path.join(tmp_policy_dir, 'test.service')
    with open(policy_path, 'w') as policy_file:
        policy_file.write('test-vm3 @dispvm allow\n')
    policy = qubespolicy.Policy('test.service', tmp_policy_dir)

    decision = policy.evaluate(system_info, 'test-vm3', '@dispvm')
    only_rule = policy.policy_rules[0]
    self.assertEqual(decision.rule, only_rule)
    self.assertEqual(decision.action, qubespolicy.Action.allow)
    self.assertEqual(decision.target, '@dispvm:default-dvm')
    self.assertEqual(decision.original_target, '@dispvm')
    self.assertEqual(decision.service, 'test.service')
    self.assertIsNone(decision.targets_for_ask)
def test_035_eval_resolve_dispvm_fail(self):
    # A bare '@dispvm' request from test-no-dvm must be refused even though
    # the rule says 'allow'.
    # NOTE(review): presumably test-no-dvm has no default DispVM in
    # system_info; its entry is not visible from here - confirm.
    policy_path = os.path.join(tmp_policy_dir, 'test.service')
    with open(policy_path, 'w') as policy_file:
        policy_file.write('test-no-dvm @dispvm allow\n')
    policy = qubespolicy.Policy('test.service', tmp_policy_dir)

    self.assertRaises(
        qubespolicy.AccessDenied,
        policy.evaluate, system_info, 'test-no-dvm', '@dispvm')
def test_036_eval_invalid_override_target(self):
    # The only rule redirects every request to 'no-such-vm'; evaluation is
    # expected to be denied rather than return that target.
    # NOTE(review): presumably because 'no-such-vm' is absent from
    # system_info - confirm against the full fixture.
    policy_path = os.path.join(tmp_policy_dir, 'test.service')
    with open(policy_path, 'w') as policy_file:
        policy_file.write('test-vm3 @anyvm allow,target=no-such-vm\n')
    policy = qubespolicy.Policy('test.service', tmp_policy_dir)

    self.assertRaises(
        qubespolicy.AccessDenied,
        policy.evaluate, system_info, 'test-vm3', '@default')
def test_037_eval_ask_no_targets(self):
    # The single 'ask' rule matches only the '@default' target; evaluating
    # such a request is expected to end in AccessDenied.
    # NOTE(review): per the test name, the denial is due to there being no
    # target left to offer to the user - confirm in Policy.evaluate.
    policy_path = os.path.join(tmp_policy_dir, 'test.service')
    with open(policy_path, 'w') as policy_file:
        policy_file.write('test-vm3 @default ask\n')
    policy = qubespolicy.Policy('test.service', tmp_policy_dir)

    self.assertRaises(
        qubespolicy.AccessDenied,
        policy.evaluate, system_info, 'test-vm3', '@default')
class TC_30_Misc(qubes.tests.QubesTestCase):
    """Tests of qubespolicy.qubesd_call() against a mocked socket.socket."""

    @staticmethod
    def _stub_qubesd_reply(mock_socket, raw_reply):
        """Make socket().makefile().read() return *raw_reply*."""
        mock_socket.configure_mock(**{
            'return_value.makefile.return_value.read.return_value':
                raw_reply,
        })

    @staticmethod
    def _expected_socket_calls(*sent_chunks):
        """Build the expected mock_calls list for one qubesd_call().

        The list is: socket creation, connect to the internal qubesd
        socket, one sendall() per item of *sent_chunks* (in order), then
        shutdown of the write side, makefile('rb') and read().
        """
        conn = unittest.mock.call()
        expected = [
            unittest.mock.call(socket.AF_UNIX, socket.SOCK_STREAM),
            conn.connect(qubespolicy.QUBESD_INTERNAL_SOCK),
        ]
        expected += [conn.sendall(chunk) for chunk in sent_chunks]
        expected += [
            conn.shutdown(socket.SHUT_WR),
            conn.makefile('rb'),
            conn.makefile().read(),
        ]
        return expected

    @unittest.mock.patch('socket.socket')
    def test_000_qubesd_call(self, mock_socket):
        # No argument and no payload: the argument field is sent only as
        # its b'\x00' terminator and nothing follows it.
        self._stub_qubesd_reply(mock_socket, b'0\x00data')

        reply = qubespolicy.qubesd_call('test', 'internal.method')

        self.assertEqual(reply, b'data')
        self.assertEqual(
            mock_socket.mock_calls,
            self._expected_socket_calls(
                b'dom0', b'\x00',
                b'internal.method', b'\x00',
                b'test', b'\x00',
                b'\x00',
            ))

    @unittest.mock.patch('socket.socket')
    def test_001_qubesd_call_arg_payload(self, mock_socket):
        # With argument and payload: the argument is sent followed by
        # b'\x00', then the payload is sent as the last chunk.
        self._stub_qubesd_reply(mock_socket, b'0\x00data')

        reply = qubespolicy.qubesd_call(
            'test', 'internal.method', 'arg', b'payload')

        self.assertEqual(reply, b'data')
        self.assertEqual(
            mock_socket.mock_calls,
            self._expected_socket_calls(
                b'dom0', b'\x00',
                b'internal.method', b'\x00',
                b'test', b'\x00',
                b'arg', b'\x00',
                b'payload',
            ))

    @unittest.mock.patch('socket.socket')
    def test_002_qubesd_call_exception(self, mock_socket):
        # A reply starting with b'2\x00' is expected to surface as
        # QubesMgmtException whose exc_type is the first field after it.
        self._stub_qubesd_reply(
            mock_socket, b'2\x00SomeError\x00traceback\x00message\x00')

        with self.assertRaises(qubespolicy.QubesMgmtException) as caught:
            qubespolicy.qubesd_call('test', 'internal.method')

        self.assertEqual(caught.exception.exc_type, 'SomeError')
        # The request itself is the same as in the successful case.
        self.assertEqual(
            mock_socket.mock_calls,
            self._expected_socket_calls(
                b'dom0', b'\x00',
                b'internal.method', b'\x00',
                b'test', b'\x00',
                b'\x00',
            ))