123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823 |
- # -*- encoding: utf8 -*-
- #
- # The Qubes OS Project, http://www.qubes-os.org
- #
- # Copyright (C) 2017 Marek Marczykowski-Górecki
- # <marmarek@invisiblethingslab.com>
- #
- # This program is free software; you can redistribute it and/or modify
- # it under the terms of the GNU General Public License as published by
- # the Free Software Foundation; either version 2 of the License, or
- # (at your option) any later version.
- #
- # This program is distributed in the hope that it will be useful,
- # but WITHOUT ANY WARRANTY; without even the implied warranty of
- # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- # GNU General Public License for more details.
- #
- # You should have received a copy of the GNU General Public License along
- # with this program; if not, see <http://www.gnu.org/licenses/>.
- import os
- import socket
- import unittest.mock
- import shutil
- import qubes.tests
- import qubespolicy
- tmp_policy_dir = '/tmp/policy'
- system_info = {
- 'domains': {
- 'dom0': {
- 'tags': ['dom0-tag'],
- 'type': 'AdminVM',
- 'default_dispvm': 'default-dvm',
- 'dispvm_allowed': False,
- },
- 'test-vm1': {
- 'tags': ['tag1', 'tag2'],
- 'type': 'AppVM',
- 'default_dispvm': 'default-dvm',
- 'dispvm_allowed': False,
- },
- 'test-vm2': {
- 'tags': ['tag2'],
- 'type': 'AppVM',
- 'default_dispvm': 'default-dvm',
- 'dispvm_allowed': False,
- },
- 'test-vm3': {
- 'tags': [],
- 'type': 'AppVM',
- 'default_dispvm': 'default-dvm',
- 'dispvm_allowed': True,
- },
- 'default-dvm': {
- 'tags': [],
- 'type': 'AppVM',
- 'default_dispvm': 'default-dvm',
- 'dispvm_allowed': True,
- },
- 'test-invalid-dvm': {
- 'tags': ['tag1', 'tag2'],
- 'type': 'AppVM',
- 'default_dispvm': 'test-vm1',
- 'dispvm_allowed': False,
- },
- 'test-no-dvm': {
- 'tags': ['tag1', 'tag2'],
- 'type': 'AppVM',
- 'default_dispvm': None,
- 'dispvm_allowed': False,
- },
- 'test-template': {
- 'tags': ['tag1', 'tag2'],
- 'type': 'TemplateVM',
- 'default_dispvm': 'default-dvm',
- 'dispvm_allowed': False,
- },
- 'test-standalone': {
- 'tags': ['tag1', 'tag2'],
- 'type': 'StandaloneVM',
- 'default_dispvm': 'default-dvm',
- 'dispvm_allowed': False,
- },
- }
- }
- class TC_00_PolicyRule(qubes.tests.QubesTestCase):
- def test_000_verify_target_value(self):
- self.assertTrue(
- qubespolicy.verify_target_value(system_info, 'test-vm1'))
- self.assertTrue(
- qubespolicy.verify_target_value(system_info, 'default-dvm'))
- self.assertTrue(
- qubespolicy.verify_target_value(system_info, '$dispvm'))
- self.assertTrue(
- qubespolicy.verify_target_value(system_info, '$dispvm:default-dvm'))
- self.assertTrue(
- qubespolicy.verify_target_value(system_info, 'test-template'))
- self.assertTrue(
- qubespolicy.verify_target_value(system_info, 'test-standalone'))
- self.assertTrue(
- qubespolicy.verify_target_value(system_info, '$adminvm'))
- self.assertFalse(
- qubespolicy.verify_target_value(system_info, 'no-such-vm'))
- self.assertFalse(
- qubespolicy.verify_target_value(system_info,
- '$dispvm:test-invalid-dvm'))
- self.assertFalse(
- qubespolicy.verify_target_value(system_info, '$dispvm:test-vm1'))
- self.assertFalse(
- qubespolicy.verify_target_value(system_info, ''))
- self.assertFalse(
- qubespolicy.verify_target_value(system_info, '$default'))
- self.assertFalse(
- qubespolicy.verify_target_value(system_info, '$anyvm'))
- self.assertFalse(
- qubespolicy.verify_target_value(system_info, '$tag:tag1'))
- self.assertFalse(
- qubespolicy.verify_target_value(system_info, '$invalid'))
- def test_010_verify_special_value(self):
- self.assertTrue(qubespolicy.verify_special_value('$tag:tag',
- for_target=False))
- self.assertTrue(qubespolicy.verify_special_value('$tag:other-tag',
- for_target=False))
- self.assertTrue(qubespolicy.verify_special_value('$type:AppVM',
- for_target=False))
- self.assertTrue(qubespolicy.verify_special_value('$adminvm',
- for_target=False))
- self.assertFalse(qubespolicy.verify_special_value('$default',
- for_target=False))
- self.assertFalse(qubespolicy.verify_special_value('$dispvm',
- for_target=False))
- self.assertFalse(qubespolicy.verify_special_value('$dispvm:some-vm',
- for_target=False))
- self.assertFalse(qubespolicy.verify_special_value('$invalid',
- for_target=False))
- self.assertFalse(qubespolicy.verify_special_value('vm-name',
- for_target=False))
- self.assertFalse(qubespolicy.verify_special_value('$tag:',
- for_target=False))
- self.assertFalse(qubespolicy.verify_special_value('$type:',
- for_target=False))
- def test_020_line_simple(self):
- line = qubespolicy.PolicyRule('$anyvm $anyvm ask', 'filename', 12)
- self.assertEqual(line.filename, 'filename')
- self.assertEqual(line.lineno, 12)
- self.assertEqual(line.action, qubespolicy.Action.ask)
- self.assertEqual(line.source, '$anyvm')
- self.assertEqual(line.target, '$anyvm')
- self.assertEqual(line.full_action, 'ask')
- self.assertIsNone(line.override_target)
- self.assertIsNone(line.override_user)
- self.assertIsNone(line.default_target)
- def test_021_line_simple(self):
- # also check spaces in action field
- line = qubespolicy.PolicyRule(
- '$tag:tag1 $type:AppVM ask, target=test-vm2, user=user',
- 'filename', 12)
- self.assertEqual(line.filename, 'filename')
- self.assertEqual(line.lineno, 12)
- self.assertEqual(line.action, qubespolicy.Action.ask)
- self.assertEqual(line.source, '$tag:tag1')
- self.assertEqual(line.target, '$type:AppVM')
- self.assertEqual(line.full_action, 'ask, target=test-vm2, user=user')
- self.assertEqual(line.override_target, 'test-vm2')
- self.assertEqual(line.override_user, 'user')
- self.assertIsNone(line.default_target)
- def test_022_line_simple(self):
- line = qubespolicy.PolicyRule(
- '$anyvm $default allow,target=$dispvm:test-vm2',
- 'filename', 12)
- self.assertEqual(line.filename, 'filename')
- self.assertEqual(line.lineno, 12)
- self.assertEqual(line.action, qubespolicy.Action.allow)
- self.assertEqual(line.source, '$anyvm')
- self.assertEqual(line.target, '$default')
- self.assertEqual(line.full_action, 'allow,target=$dispvm:test-vm2')
- self.assertEqual(line.override_target, '$dispvm:test-vm2')
- self.assertIsNone(line.override_user)
- self.assertIsNone(line.default_target)
- def test_023_line_simple(self):
- line = qubespolicy.PolicyRule(
- '$anyvm $default ask,default_target=test-vm1',
- 'filename', 12)
- self.assertEqual(line.filename, 'filename')
- self.assertEqual(line.lineno, 12)
- self.assertEqual(line.action, qubespolicy.Action.ask)
- self.assertEqual(line.source, '$anyvm')
- self.assertEqual(line.target, '$default')
- self.assertEqual(line.full_action, 'ask,default_target=test-vm1')
- self.assertIsNone(line.override_target)
- self.assertIsNone(line.override_user)
- self.assertEqual(line.default_target, 'test-vm1')
- def test_024_line_simple(self):
- line = qubespolicy.PolicyRule(
- '$anyvm $adminvm ask,default_target=$adminvm',
- 'filename', 12)
- self.assertEqual(line.filename, 'filename')
- self.assertEqual(line.lineno, 12)
- self.assertEqual(line.action, qubespolicy.Action.ask)
- self.assertEqual(line.source, '$anyvm')
- self.assertEqual(line.target, '$adminvm')
- self.assertEqual(line.full_action, 'ask,default_target=$adminvm')
- self.assertIsNone(line.override_target)
- self.assertIsNone(line.override_user)
- self.assertEqual(line.default_target, '$adminvm')
- def test_030_line_invalid(self):
- invalid_lines = [
- '$dispvm $default allow', # $dispvm can't be a source
- '$default $default allow', # $default can't be a source
- '$anyvm $default deny,target=test-vm1', # target= used with deny
- '$anyvm $anyvm deny,default_target=test-vm1', # default_target=
- # with deny
- '$anyvm $anyvm deny,user=user', # user= with deny
- '$anyvm $anyvm invalid', # invalid action
- '$anyvm $anyvm allow,invalid=xx', # invalid option
- '$anyvm $anyvm', # missing action
- '$anyvm $anyvm allow,default_target=test-vm1', # default_target=
- # with allow
- '$invalid $anyvm allow', # invalid source
- '$anyvm $invalid deny', # invalid target
- '', # empty line
- '$anyvm $anyvm allow extra', # trailing words
- '$anyvm $default allow', # $default allow without target=
- ]
- for line in invalid_lines:
- with self.subTest(line):
- with self.assertRaises(qubespolicy.PolicySyntaxError):
- qubespolicy.PolicyRule(line, 'filename', 12)
- def test_040_match_single(self):
- is_match_single = qubespolicy.PolicyRule.is_match_single
- self.assertTrue(is_match_single(system_info, '$anyvm', 'test-vm1'))
- self.assertTrue(is_match_single(system_info, '$anyvm', '$default'))
- self.assertTrue(is_match_single(system_info, '$anyvm', ''))
- self.assertTrue(is_match_single(system_info, '$default', ''))
- self.assertTrue(is_match_single(system_info, '$default', '$default'))
- self.assertTrue(is_match_single(system_info, '$tag:tag1', 'test-vm1'))
- self.assertTrue(is_match_single(system_info, '$type:AppVM', 'test-vm1'))
- self.assertTrue(is_match_single(system_info,
- '$type:TemplateVM', 'test-template'))
- self.assertTrue(is_match_single(system_info, '$anyvm', '$dispvm'))
- self.assertTrue(is_match_single(system_info,
- '$anyvm', '$dispvm:default-dvm'))
- self.assertTrue(is_match_single(system_info, '$dispvm', '$dispvm'))
- self.assertTrue(is_match_single(system_info, '$adminvm', '$adminvm'))
- self.assertTrue(is_match_single(system_info, '$adminvm', 'dom0'))
- self.assertTrue(is_match_single(system_info, 'dom0', '$adminvm'))
- self.assertTrue(is_match_single(system_info, 'dom0', 'dom0'))
- self.assertTrue(is_match_single(system_info,
- '$dispvm:default-dvm', '$dispvm:default-dvm'))
- self.assertTrue(is_match_single(system_info, '$anyvm', '$dispvm'))
- self.assertTrue(is_match_single(system_info, '$anyvm', 'test-vm1'))
- self.assertTrue(is_match_single(system_info, '$anyvm', 'test-vm1'))
- self.assertTrue(is_match_single(system_info, '$anyvm', 'test-vm1'))
- self.assertFalse(is_match_single(system_info, '$default', 'test-vm1'))
- self.assertFalse(is_match_single(system_info, '$tag:tag1', 'test-vm3'))
- self.assertFalse(is_match_single(system_info, '$anyvm', 'no-such-vm'))
- # test-vm1.dispvm_allowed=False
- self.assertFalse(is_match_single(system_info,
- '$anyvm', '$dispvm:test-vm1'))
- # test-vm1.dispvm_allowed=False
- self.assertFalse(is_match_single(system_info,
- '$dispvm:test-vm1', '$dispvm:test-vm1'))
- self.assertFalse(is_match_single(system_info, '$anyvm', 'dom0'))
- self.assertFalse(is_match_single(system_info, '$anyvm', '$adminvm'))
- self.assertFalse(is_match_single(system_info,
- '$tag:dom0-tag', '$adminvm'))
- self.assertFalse(is_match_single(system_info,
- '$type:AdminVM', '$adminvm'))
- self.assertFalse(is_match_single(system_info,
- '$tag:dom0-tag', 'dom0'))
- self.assertFalse(is_match_single(system_info,
- '$type:AdminVM', 'dom0'))
- self.assertFalse(is_match_single(system_info, '$tag:tag1', 'dom0'))
- self.assertFalse(is_match_single(system_info, '$anyvm', '$tag:tag1'))
- self.assertFalse(is_match_single(system_info, '$anyvm', '$type:AppVM'))
- self.assertFalse(is_match_single(system_info, '$anyvm', '$invalid'))
- self.assertFalse(is_match_single(system_info, '$invalid', '$invalid'))
- self.assertFalse(is_match_single(system_info, '$anyvm', 'no-such-vm'))
- self.assertFalse(is_match_single(system_info,
- 'no-such-vm', 'no-such-vm'))
- self.assertFalse(is_match_single(system_info, '$dispvm', 'test-vm1'))
- self.assertFalse(is_match_single(system_info, '$dispvm', 'default-dvm'))
- self.assertFalse(is_match_single(system_info,
- '$dispvm:default-dvm', 'default-dvm'))
- self.assertFalse(is_match_single(system_info, '$anyvm', 'test-vm1\n'))
- self.assertFalse(is_match_single(system_info, '$anyvm', 'test-vm1 '))
- def test_050_match(self):
- line = qubespolicy.PolicyRule('$anyvm $anyvm allow')
- self.assertTrue(line.is_match(system_info, 'test-vm1', 'test-vm2'))
- line = qubespolicy.PolicyRule('$anyvm $anyvm allow')
- self.assertFalse(line.is_match(system_info, 'no-such-vm', 'test-vm2'))
- line = qubespolicy.PolicyRule('$anyvm $anyvm allow')
- self.assertFalse(line.is_match(system_info, 'test-vm1', 'no-such-vm'))
- def test_060_expand_target(self):
- lines = {
- '$anyvm $anyvm allow': ['test-vm1', 'test-vm2', 'test-vm3',
- '$dispvm:test-vm3',
- 'default-dvm', '$dispvm:default-dvm', 'test-invalid-dvm',
- 'test-no-dvm', 'test-template', 'test-standalone', '$dispvm'],
- '$anyvm $dispvm allow': ['$dispvm'],
- '$anyvm $dispvm:default-dvm allow': ['$dispvm:default-dvm'],
- # no DispVM from test-vm1 allowed
- '$anyvm $dispvm:test-vm1 allow': [],
- '$anyvm test-vm1 allow': ['test-vm1'],
- '$anyvm $type:AppVM allow': ['test-vm1', 'test-vm2', 'test-vm3',
- 'default-dvm', 'test-invalid-dvm', 'test-no-dvm'],
- '$anyvm $type:TemplateVM allow': ['test-template'],
- '$anyvm $tag:tag1 allow': ['test-vm1', 'test-invalid-dvm',
- 'test-template', 'test-standalone', 'test-no-dvm'],
- '$anyvm $tag:tag2 allow': ['test-vm1', 'test-vm2',
- 'test-invalid-dvm', 'test-template', 'test-standalone',
- 'test-no-dvm'],
- '$anyvm $tag:no-such-tag allow': [],
- }
- for line in lines:
- with self.subTest(line):
- policy_line = qubespolicy.PolicyRule(line)
- self.assertCountEqual(list(policy_line.expand_target(system_info)),
- lines[line])
- def test_070_expand_override_target(self):
- line = qubespolicy.PolicyRule(
- '$anyvm $anyvm allow,target=test-vm2')
- self.assertEqual(
- line.expand_override_target(system_info, 'test-vm1'),
- 'test-vm2')
- def test_071_expand_override_target_dispvm(self):
- line = qubespolicy.PolicyRule(
- '$anyvm $anyvm allow,target=$dispvm')
- self.assertEqual(
- line.expand_override_target(system_info, 'test-vm1'),
- '$dispvm:default-dvm')
- def test_072_expand_override_target_dispvm_specific(self):
- line = qubespolicy.PolicyRule(
- '$anyvm $anyvm allow,target=$dispvm:test-vm3')
- self.assertEqual(
- line.expand_override_target(system_info, 'test-vm1'),
- '$dispvm:test-vm3')
- def test_073_expand_override_target_dispvm_none(self):
- line = qubespolicy.PolicyRule(
- '$anyvm $anyvm allow,target=$dispvm')
- self.assertEqual(
- line.expand_override_target(system_info, 'test-no-dvm'),
- None)
- def test_074_expand_override_target_dom0(self):
- line = qubespolicy.PolicyRule(
- '$anyvm $anyvm allow,target=dom0')
- self.assertEqual(
- line.expand_override_target(system_info, 'test-no-dvm'),
- 'dom0')
- def test_075_expand_override_target_dom0(self):
- line = qubespolicy.PolicyRule(
- '$anyvm $anyvm allow,target=$adminvm')
- self.assertEqual(
- line.expand_override_target(system_info, 'test-no-dvm'),
- '$adminvm')
- class TC_10_PolicyAction(qubes.tests.QubesTestCase):
- def test_000_init(self):
- rule = qubespolicy.PolicyRule('$anyvm $anyvm deny')
- with self.assertRaises(qubespolicy.AccessDenied):
- qubespolicy.PolicyAction('test.service', 'test-vm1', 'test-vm2',
- rule, 'test-vm2')
- def test_001_init(self):
- rule = qubespolicy.PolicyRule('$anyvm $anyvm ask')
- action = qubespolicy.PolicyAction('test.service', 'test-vm1',
- None, rule, 'test-vm2', ['test-vm2', 'test-vm3'])
- self.assertEqual(action.service, 'test.service')
- self.assertEqual(action.source, 'test-vm1')
- self.assertIsNone(action.target)
- self.assertEqual(action.original_target, 'test-vm2')
- self.assertEqual(action.targets_for_ask, ['test-vm2', 'test-vm3'])
- self.assertEqual(action.rule, rule)
- self.assertEqual(action.action, qubespolicy.Action.ask)
- def test_002_init_invalid(self):
- rule_ask = qubespolicy.PolicyRule('$anyvm $anyvm ask')
- rule_allow = qubespolicy.PolicyRule('$anyvm $anyvm allow')
- with self.assertRaises(AssertionError):
- qubespolicy.PolicyAction('test.service', 'test-vm1',
- None, rule_allow, 'test-vm2', None)
- with self.assertRaises(AssertionError):
- qubespolicy.PolicyAction('test.service', 'test-vm1',
- 'test-vm2', rule_allow, 'test-vm2', ['test-vm2', 'test-vm3'])
- with self.assertRaises(AssertionError):
- qubespolicy.PolicyAction('test.service', 'test-vm1',
- None, rule_ask, 'test-vm2', None)
- def test_003_init_default_target(self):
- rule_ask = qubespolicy.PolicyRule('$anyvm $anyvm ask')
- action = qubespolicy.PolicyAction('test.service', 'test-vm1',
- 'test-vm1', rule_ask, 'test-vm2', ['test-vm2'])
- self.assertIsNone(action.target)
- action = qubespolicy.PolicyAction('test.service', 'test-vm1',
- 'test-vm2', rule_ask, 'test-vm2', ['test-vm2'])
- self.assertEqual(action.target, 'test-vm2')
- def test_010_handle_user_response(self):
- rule = qubespolicy.PolicyRule('$anyvm $anyvm ask')
- action = qubespolicy.PolicyAction('test.service', 'test-vm1',
- None, rule, 'test-vm2', ['test-vm2', 'test-vm3'])
- action.handle_user_response(True, 'test-vm2')
- self.assertEqual(action.action, qubespolicy.Action.allow)
- self.assertEqual(action.target, 'test-vm2')
- def test_011_handle_user_response(self):
- rule = qubespolicy.PolicyRule('$anyvm $anyvm ask')
- action = qubespolicy.PolicyAction('test.service', 'test-vm1',
- None, rule, 'test-vm2', ['test-vm2', 'test-vm3'])
- with self.assertRaises(AssertionError):
- action.handle_user_response(True, 'test-no-dvm')
- def test_012_handle_user_response(self):
- rule = qubespolicy.PolicyRule('$anyvm $anyvm ask')
- action = qubespolicy.PolicyAction('test.service', 'test-vm1',
- None, rule, 'test-vm2', ['test-vm2', 'test-vm3'])
- with self.assertRaises(qubespolicy.AccessDenied):
- action.handle_user_response(False, None)
- self.assertEqual(action.action, qubespolicy.Action.deny)
- def test_013_handle_user_response_with_default_target(self):
- rule = qubespolicy.PolicyRule(
- '$anyvm $anyvm ask,default_target=test-vm2')
- action = qubespolicy.PolicyAction('test.service', 'test-vm1',
- None, rule, 'test-vm2', ['test-vm2', 'test-vm3'])
- action.handle_user_response(True, 'test-vm2')
- self.assertEqual(action.action, qubespolicy.Action.allow)
- self.assertEqual(action.target, 'test-vm2')
- @unittest.mock.patch('qubespolicy.qubesd_call')
- @unittest.mock.patch('subprocess.call')
- def test_020_execute(self, mock_subprocess, mock_qubesd_call):
- rule = qubespolicy.PolicyRule('$anyvm $anyvm allow')
- action = qubespolicy.PolicyAction('test.service', 'test-vm1',
- 'test-vm2', rule, 'test-vm2')
- action.execute('some-ident')
- self.assertEqual(mock_qubesd_call.mock_calls,
- [unittest.mock.call('test-vm2', 'admin.vm.Start')])
- self.assertEqual(mock_subprocess.mock_calls,
- [unittest.mock.call([qubespolicy.QREXEC_CLIENT, '-d', 'test-vm2',
- '-c', 'some-ident', 'DEFAULT:QUBESRPC test.service test-vm1'])])
- @unittest.mock.patch('qubespolicy.qubesd_call')
- @unittest.mock.patch('subprocess.call')
- def test_021_execute_dom0(self, mock_subprocess, mock_qubesd_call):
- rule = qubespolicy.PolicyRule('$anyvm dom0 allow')
- action = qubespolicy.PolicyAction('test.service', 'test-vm1',
- 'dom0', rule, 'dom0')
- action.execute('some-ident')
- self.assertEqual(mock_qubesd_call.mock_calls, [])
- self.assertEqual(mock_subprocess.mock_calls,
- [unittest.mock.call([qubespolicy.QREXEC_CLIENT, '-d', 'dom0',
- '-c', 'some-ident',
- qubespolicy.QUBES_RPC_MULTIPLEXER_PATH +
- ' test.service test-vm1 dom0'])])
- @unittest.mock.patch('qubespolicy.qubesd_call')
- @unittest.mock.patch('subprocess.call')
- def test_022_execute_dispvm(self, mock_subprocess, mock_qubesd_call):
- rule = qubespolicy.PolicyRule('$anyvm $dispvm:default-dvm allow')
- action = qubespolicy.PolicyAction('test.service', 'test-vm1',
- '$dispvm:default-dvm', rule, '$dispvm:default-dvm')
- mock_qubesd_call.side_effect = (lambda target, call:
- b'dispvm-name' if call == 'admin.vm.CreateDisposable' else
- unittest.mock.DEFAULT)
- action.execute('some-ident')
- self.assertEqual(mock_qubesd_call.mock_calls,
- [unittest.mock.call('default-dvm', 'admin.vm.CreateDisposable'),
- unittest.mock.call('dispvm-name', 'admin.vm.Start'),
- unittest.mock.call('dispvm-name', 'admin.vm.Kill')])
- self.assertEqual(mock_subprocess.mock_calls,
- [unittest.mock.call([qubespolicy.QREXEC_CLIENT, '-d', 'dispvm-name',
- '-c', 'some-ident', '-W',
- 'DEFAULT:QUBESRPC test.service test-vm1'])])
- @unittest.mock.patch('qubespolicy.qubesd_call')
- @unittest.mock.patch('subprocess.call')
- def test_023_execute_already_running(self, mock_subprocess,
- mock_qubesd_call):
- rule = qubespolicy.PolicyRule('$anyvm $anyvm allow')
- action = qubespolicy.PolicyAction('test.service', 'test-vm1',
- 'test-vm2', rule, 'test-vm2')
- mock_qubesd_call.side_effect = \
- qubespolicy.QubesMgmtException('QubesVMNotHaltedError')
- action.execute('some-ident')
- self.assertEqual(mock_qubesd_call.mock_calls,
- [unittest.mock.call('test-vm2', 'admin.vm.Start')])
- self.assertEqual(mock_subprocess.mock_calls,
- [unittest.mock.call([qubespolicy.QREXEC_CLIENT, '-d', 'test-vm2',
- '-c', 'some-ident', 'DEFAULT:QUBESRPC test.service test-vm1'])])
- @unittest.mock.patch('qubespolicy.qubesd_call')
- @unittest.mock.patch('subprocess.call')
- def test_024_execute_startup_error(self, mock_subprocess,
- mock_qubesd_call):
- rule = qubespolicy.PolicyRule('$anyvm $anyvm allow')
- action = qubespolicy.PolicyAction('test.service', 'test-vm1',
- 'test-vm2', rule, 'test-vm2')
- mock_qubesd_call.side_effect = \
- qubespolicy.QubesMgmtException('QubesVMError')
- with self.assertRaises(qubespolicy.QubesMgmtException):
- action.execute('some-ident')
- self.assertEqual(mock_qubesd_call.mock_calls,
- [unittest.mock.call('test-vm2', 'admin.vm.Start')])
- self.assertEqual(mock_subprocess.mock_calls, [])
- class TC_20_Policy(qubes.tests.QubesTestCase):
- def setUp(self):
- super(TC_20_Policy, self).setUp()
- if not os.path.exists(tmp_policy_dir):
- os.mkdir(tmp_policy_dir)
- def tearDown(self):
- shutil.rmtree(tmp_policy_dir)
- super(TC_20_Policy, self).tearDown()
- def test_000_load(self):
- with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f:
- f.write('test-vm1 test-vm2 allow\n')
- f.write('\n')
- f.write('# comment\n')
- f.write('test-vm2 test-vm3 ask\n')
- f.write(' # comment \n')
- f.write('$anyvm $anyvm ask\n')
- policy = qubespolicy.Policy('test.service', tmp_policy_dir)
- self.assertEqual(policy.service, 'test.service')
- self.assertEqual(len(policy.policy_rules), 3)
- self.assertEqual(policy.policy_rules[0].source, 'test-vm1')
- self.assertEqual(policy.policy_rules[0].target, 'test-vm2')
- self.assertEqual(policy.policy_rules[0].action,
- qubespolicy.Action.allow)
- def test_001_not_existent(self):
- with self.assertRaises(qubespolicy.AccessDenied):
- qubespolicy.Policy('no-such.service', tmp_policy_dir)
- def test_002_include(self):
- with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f:
- f.write('test-vm1 test-vm2 allow\n')
- f.write('$include:test.service2\n')
- f.write('$anyvm $anyvm deny\n')
- with open(os.path.join(tmp_policy_dir, 'test.service2'), 'w') as f:
- f.write('test-vm3 $default allow,target=test-vm2\n')
- policy = qubespolicy.Policy('test.service', tmp_policy_dir)
- self.assertEqual(policy.service, 'test.service')
- self.assertEqual(len(policy.policy_rules), 3)
- self.assertEqual(policy.policy_rules[0].source, 'test-vm1')
- self.assertEqual(policy.policy_rules[0].target, 'test-vm2')
- self.assertEqual(policy.policy_rules[0].action,
- qubespolicy.Action.allow)
- self.assertEqual(policy.policy_rules[0].filename,
- tmp_policy_dir + '/test.service')
- self.assertEqual(policy.policy_rules[0].lineno, 1)
- self.assertEqual(policy.policy_rules[1].source, 'test-vm3')
- self.assertEqual(policy.policy_rules[1].target, '$default')
- self.assertEqual(policy.policy_rules[1].action,
- qubespolicy.Action.allow)
- self.assertEqual(policy.policy_rules[1].filename,
- tmp_policy_dir + '/test.service2')
- self.assertEqual(policy.policy_rules[1].lineno, 1)
- self.assertEqual(policy.policy_rules[2].source, '$anyvm')
- self.assertEqual(policy.policy_rules[2].target, '$anyvm')
- self.assertEqual(policy.policy_rules[2].action,
- qubespolicy.Action.deny)
- self.assertEqual(policy.policy_rules[2].filename,
- tmp_policy_dir + '/test.service')
- self.assertEqual(policy.policy_rules[2].lineno, 3)
- def test_010_find_rule(self):
- with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f:
- f.write('test-vm1 test-vm2 allow\n')
- f.write('test-vm1 $anyvm ask\n')
- f.write('test-vm2 $tag:tag1 deny\n')
- f.write('test-vm2 $tag:tag2 allow\n')
- f.write('$type:AppVM $default allow,target=test-vm3\n')
- f.write('$tag:tag1 $type:AppVM allow\n')
- policy = qubespolicy.Policy('test.service', tmp_policy_dir)
- self.assertEqual(policy.find_matching_rule(
- system_info, 'test-vm1', 'test-vm2'), policy.policy_rules[0])
- self.assertEqual(policy.find_matching_rule(
- system_info, 'test-vm1', 'test-vm3'), policy.policy_rules[1])
- self.assertEqual(policy.find_matching_rule(
- system_info, 'test-vm2', 'test-vm2'), policy.policy_rules[3])
- self.assertEqual(policy.find_matching_rule(
- system_info, 'test-vm2', 'test-no-dvm'), policy.policy_rules[2])
- # $anyvm matches $default too
- self.assertEqual(policy.find_matching_rule(
- system_info, 'test-vm1', ''), policy.policy_rules[1])
- self.assertEqual(policy.find_matching_rule(
- system_info, 'test-vm2', ''), policy.policy_rules[4])
- self.assertEqual(policy.find_matching_rule(
- system_info, 'test-vm2', '$default'), policy.policy_rules[4])
- self.assertEqual(policy.find_matching_rule(
- system_info, 'test-no-dvm', 'test-vm3'), policy.policy_rules[5])
- with self.assertRaises(qubespolicy.AccessDenied):
- policy.find_matching_rule(
- system_info, 'test-no-dvm', 'test-standalone')
- with self.assertRaises(qubespolicy.AccessDenied):
- policy.find_matching_rule(
- system_info, 'test-standalone', '$default')
- def test_020_collect_targets_for_ask(self):
- with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f:
- f.write('test-vm1 test-vm2 allow\n')
- f.write('test-vm1 $anyvm ask\n')
- f.write('test-vm2 $tag:tag1 deny\n')
- f.write('test-vm2 $tag:tag2 allow\n')
- f.write('test-no-dvm $type:AppVM deny\n')
- f.write('$type:AppVM $default allow,target=test-vm3\n')
- f.write('$tag:tag1 $type:AppVM allow\n')
- f.write('test-no-dvm $dispvm allow\n')
- f.write('test-standalone $dispvm allow\n')
- policy = qubespolicy.Policy('test.service', tmp_policy_dir)
- self.assertCountEqual(policy.collect_targets_for_ask(system_info,
- 'test-vm1'), ['test-vm1', 'test-vm2', 'test-vm3',
- '$dispvm:test-vm3',
- 'default-dvm', '$dispvm:default-dvm', 'test-invalid-dvm',
- 'test-no-dvm', 'test-template', 'test-standalone'])
- self.assertCountEqual(policy.collect_targets_for_ask(system_info,
- 'test-vm2'), ['test-vm2', 'test-vm3'])
- self.assertCountEqual(policy.collect_targets_for_ask(system_info,
- 'test-vm3'), ['test-vm3'])
- self.assertCountEqual(policy.collect_targets_for_ask(system_info,
- 'test-standalone'), ['test-vm1', 'test-vm2', 'test-vm3',
- 'default-dvm', 'test-no-dvm', 'test-invalid-dvm',
- '$dispvm:default-dvm'])
- self.assertCountEqual(policy.collect_targets_for_ask(system_info,
- 'test-no-dvm'), [])
- def test_030_eval_simple(self):
- with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f:
- f.write('test-vm1 test-vm2 allow\n')
- policy = qubespolicy.Policy('test.service', tmp_policy_dir)
- action = policy.evaluate(system_info, 'test-vm1', 'test-vm2')
- self.assertEqual(action.rule, policy.policy_rules[0])
- self.assertEqual(action.action, qubespolicy.Action.allow)
- self.assertEqual(action.target, 'test-vm2')
- self.assertEqual(action.original_target, 'test-vm2')
- self.assertEqual(action.service, 'test.service')
- self.assertIsNone(action.targets_for_ask)
- with self.assertRaises(qubespolicy.AccessDenied):
- policy.evaluate(system_info, 'test-vm2', '$default')
- def test_031_eval_default(self):
- with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f:
- f.write('test-vm1 test-vm2 allow\n')
- f.write('test-vm1 $default allow,target=test-vm2\n')
- f.write('$tag:tag1 test-vm2 ask\n')
- f.write('$tag:tag2 $anyvm allow\n')
- f.write('test-vm3 $anyvm deny\n')
- policy = qubespolicy.Policy('test.service', tmp_policy_dir)
- action = policy.evaluate(system_info, 'test-vm1', '$default')
- self.assertEqual(action.rule, policy.policy_rules[1])
- self.assertEqual(action.action, qubespolicy.Action.allow)
- self.assertEqual(action.target, 'test-vm2')
- self.assertEqual(action.original_target, '$default')
- self.assertEqual(action.service, 'test.service')
- self.assertIsNone(action.targets_for_ask)
- with self.assertRaises(qubespolicy.AccessDenied):
- # action allow should hit, but no target specified (either by
- # caller or policy)
- policy.evaluate(system_info, 'test-standalone', '$default')
- def test_032_eval_ask(self):
- with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f:
- f.write('test-vm1 test-vm2 allow\n')
- f.write('test-vm1 $default allow,target=test-vm2\n')
- f.write('$tag:tag1 test-vm2 ask\n')
- f.write('$tag:tag1 test-vm3 ask,default_target=test-vm3\n')
- f.write('$tag:tag2 $anyvm allow\n')
- f.write('test-vm3 $anyvm deny\n')
- policy = qubespolicy.Policy('test.service', tmp_policy_dir)
- action = policy.evaluate(system_info, 'test-standalone', 'test-vm2')
- self.assertEqual(action.rule, policy.policy_rules[2])
- self.assertEqual(action.action, qubespolicy.Action.ask)
- self.assertIsNone(action.target)
- self.assertEqual(action.original_target, 'test-vm2')
- self.assertEqual(action.service, 'test.service')
- self.assertCountEqual(action.targets_for_ask,
- ['test-vm1', 'test-vm2', 'test-vm3', '$dispvm:test-vm3',
- 'default-dvm', '$dispvm:default-dvm', 'test-invalid-dvm',
- 'test-no-dvm', 'test-template', 'test-standalone'])
- def test_033_eval_ask(self):
- with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f:
- f.write('test-vm1 test-vm2 allow\n')
- f.write('test-vm1 $default allow,target=test-vm2\n')
- f.write('$tag:tag1 test-vm2 ask\n')
- f.write('$tag:tag1 test-vm3 ask,default_target=test-vm3\n')
- f.write('$tag:tag2 $anyvm allow\n')
- f.write('test-vm3 $anyvm deny\n')
- policy = qubespolicy.Policy('test.service', tmp_policy_dir)
- action = policy.evaluate(system_info, 'test-standalone', 'test-vm3')
- self.assertEqual(action.rule, policy.policy_rules[3])
- self.assertEqual(action.action, qubespolicy.Action.ask)
- self.assertEqual(action.target, 'test-vm3')
- self.assertEqual(action.original_target, 'test-vm3')
- self.assertEqual(action.service, 'test.service')
- self.assertCountEqual(action.targets_for_ask,
- ['test-vm1', 'test-vm2', 'test-vm3', '$dispvm:test-vm3',
- 'default-dvm', '$dispvm:default-dvm', 'test-invalid-dvm',
- 'test-no-dvm', 'test-template', 'test-standalone'])
- def test_034_eval_resolve_dispvm(self):
- with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f:
- f.write('test-vm3 $dispvm allow\n')
- policy = qubespolicy.Policy('test.service', tmp_policy_dir)
- action = policy.evaluate(system_info, 'test-vm3', '$dispvm')
- self.assertEqual(action.rule, policy.policy_rules[0])
- self.assertEqual(action.action, qubespolicy.Action.allow)
- self.assertEqual(action.target, '$dispvm:default-dvm')
- self.assertEqual(action.original_target, '$dispvm')
- self.assertEqual(action.service, 'test.service')
- self.assertIsNone(action.targets_for_ask)
- class TC_30_Misc(qubes.tests.QubesTestCase):
- @unittest.mock.patch('socket.socket')
- def test_000_qubesd_call(self, mock_socket):
- mock_config = {
- 'return_value.makefile.return_value.read.return_value': b'0\x00data'
- }
- mock_socket.configure_mock(**mock_config)
- result = qubespolicy.qubesd_call('test', 'internal.method')
- self.assertEqual(result, b'data')
- self.assertEqual(mock_socket.mock_calls, [
- unittest.mock.call(socket.AF_UNIX, socket.SOCK_STREAM),
- unittest.mock.call().connect(qubespolicy.QUBESD_INTERNAL_SOCK),
- unittest.mock.call().sendall(b'dom0'),
- unittest.mock.call().sendall(b'\x00'),
- unittest.mock.call().sendall(b'internal.method'),
- unittest.mock.call().sendall(b'\x00'),
- unittest.mock.call().sendall(b'test'),
- unittest.mock.call().sendall(b'\x00'),
- unittest.mock.call().sendall(b'\x00'),
- unittest.mock.call().shutdown(socket.SHUT_WR),
- unittest.mock.call().makefile('rb'),
- unittest.mock.call().makefile().read(),
- ])
- @unittest.mock.patch('socket.socket')
- def test_001_qubesd_call_arg_payload(self, mock_socket):
- mock_config = {
- 'return_value.makefile.return_value.read.return_value': b'0\x00data'
- }
- mock_socket.configure_mock(**mock_config)
- result = qubespolicy.qubesd_call('test', 'internal.method', 'arg',
- b'payload')
- self.assertEqual(result, b'data')
- self.assertEqual(mock_socket.mock_calls, [
- unittest.mock.call(socket.AF_UNIX, socket.SOCK_STREAM),
- unittest.mock.call().connect(qubespolicy.QUBESD_INTERNAL_SOCK),
- unittest.mock.call().sendall(b'dom0'),
- unittest.mock.call().sendall(b'\x00'),
- unittest.mock.call().sendall(b'internal.method'),
- unittest.mock.call().sendall(b'\x00'),
- unittest.mock.call().sendall(b'test'),
- unittest.mock.call().sendall(b'\x00'),
- unittest.mock.call().sendall(b'arg'),
- unittest.mock.call().sendall(b'\x00'),
- unittest.mock.call().sendall(b'payload'),
- unittest.mock.call().shutdown(socket.SHUT_WR),
- unittest.mock.call().makefile('rb'),
- unittest.mock.call().makefile().read(),
- ])
- @unittest.mock.patch('socket.socket')
- def test_002_qubesd_call_exception(self, mock_socket):
- mock_config = {
- 'return_value.makefile.return_value.read.return_value':
- b'2\x00SomeError\x00traceback\x00message\x00'
- }
- mock_socket.configure_mock(**mock_config)
- with self.assertRaises(qubespolicy.QubesMgmtException) as e:
- qubespolicy.qubesd_call('test', 'internal.method')
- self.assertEqual(e.exception.exc_type, 'SomeError')
- self.assertEqual(mock_socket.mock_calls, [
- unittest.mock.call(socket.AF_UNIX, socket.SOCK_STREAM),
- unittest.mock.call().connect(qubespolicy.QUBESD_INTERNAL_SOCK),
- unittest.mock.call().sendall(b'dom0'),
- unittest.mock.call().sendall(b'\x00'),
- unittest.mock.call().sendall(b'internal.method'),
- unittest.mock.call().sendall(b'\x00'),
- unittest.mock.call().sendall(b'test'),
- unittest.mock.call().sendall(b'\x00'),
- unittest.mock.call().sendall(b'\x00'),
- unittest.mock.call().shutdown(socket.SHUT_WR),
- unittest.mock.call().makefile('rb'),
- unittest.mock.call().makefile().read(),
- ])
|