Kaynağa Gözat

tests: qubespolicy tests

Fixes QubesOS/qubes-issues#2460
Marek Marczykowski-Górecki 7 yıl önce
ebeveyn
işleme
564408eb3f
2 değiştirilmiş dosya ile 766 ekleme ve 0 silme
  1. 1 0
      qubes/tests/__init__.py
  2. 765 0
      qubespolicy/tests/__init__.py

+ 1 - 0
qubes/tests/__init__.py

@@ -909,6 +909,7 @@ def load_tests(loader, tests, pattern): # pylint: disable=unused-argument
             'qubes.tests.tools.qvm_device',
             'qubes.tests.tools.qvm_firewall',
             'qubes.tests.tools.qvm_ls',
+            'qubespolicy.tests',
             ):
         tests.addTests(loader.loadTestsFromName(modname))
 

+ 765 - 0
qubespolicy/tests/__init__.py

@@ -0,0 +1,765 @@
+# -*- encoding: utf8 -*-
+#
+# The Qubes OS Project, http://www.qubes-os.org
+#
+# Copyright (C) 2017 Marek Marczykowski-Górecki
+#                               <marmarek@invisiblethingslab.com>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, see <http://www.gnu.org/licenses/>.
+import os
+import socket
+import unittest.mock
+
+import shutil
+
+import qubes.tests
+import qubespolicy
+
+tmp_policy_dir = '/tmp/policy'
+
+system_info = {
+    'domains': {
+        'dom0': {
+            'tags': [],
+            'type': 'AdminVM',
+            'default_dispvm': 'default-dvm',
+            'dispvm_allowed': False,
+        },
+        'test-vm1': {
+            'tags': ['tag1', 'tag2'],
+            'type': 'AppVM',
+            'default_dispvm': 'default-dvm',
+            'dispvm_allowed': False,
+        },
+        'test-vm2': {
+            'tags': ['tag2'],
+            'type': 'AppVM',
+            'default_dispvm': 'default-dvm',
+            'dispvm_allowed': False,
+        },
+        'test-vm3': {
+            'tags': [],
+            'type': 'AppVM',
+            'default_dispvm': 'default-dvm',
+            'dispvm_allowed': True,
+        },
+        'default-dvm': {
+            'tags': [],
+            'type': 'AppVM',
+            'default_dispvm': 'default-dvm',
+            'dispvm_allowed': True,
+        },
+        'test-invalid-dvm': {
+            'tags': ['tag1', 'tag2'],
+            'type': 'AppVM',
+            'default_dispvm': 'test-vm1',
+            'dispvm_allowed': False,
+        },
+        'test-no-dvm': {
+            'tags': ['tag1', 'tag2'],
+            'type': 'AppVM',
+            'default_dispvm': None,
+            'dispvm_allowed': False,
+        },
+        'test-template': {
+            'tags': ['tag1', 'tag2'],
+            'type': 'TemplateVM',
+            'default_dispvm': 'default-dvm',
+            'dispvm_allowed': False,
+        },
+        'test-standalone': {
+            'tags': ['tag1', 'tag2'],
+            'type': 'StandaloneVM',
+            'default_dispvm': 'default-dvm',
+            'dispvm_allowed': False,
+        },
+    }
+}
+
+
+class TC_00_PolicyRule(qubes.tests.QubesTestCase):
+    def test_000_verify_target_value(self):
+        self.assertTrue(
+            qubespolicy.verify_target_value(system_info, 'test-vm1'))
+        self.assertTrue(
+            qubespolicy.verify_target_value(system_info, 'default-dvm'))
+        self.assertTrue(
+            qubespolicy.verify_target_value(system_info, '$dispvm'))
+        self.assertTrue(
+            qubespolicy.verify_target_value(system_info, '$dispvm:default-dvm'))
+        self.assertTrue(
+            qubespolicy.verify_target_value(system_info, 'test-template'))
+        self.assertTrue(
+            qubespolicy.verify_target_value(system_info, 'test-standalone'))
+        self.assertFalse(
+            qubespolicy.verify_target_value(system_info, 'no-such-vm'))
+        self.assertFalse(
+            qubespolicy.verify_target_value(system_info,
+                '$dispvm:test-invalid-dvm'))
+        self.assertFalse(
+            qubespolicy.verify_target_value(system_info, '$dispvm:test-vm1'))
+        self.assertFalse(
+            qubespolicy.verify_target_value(system_info, ''))
+        self.assertFalse(
+            qubespolicy.verify_target_value(system_info, '$default'))
+        self.assertFalse(
+            qubespolicy.verify_target_value(system_info, '$anyvm'))
+        self.assertFalse(
+            qubespolicy.verify_target_value(system_info, '$tag:tag1'))
+        self.assertFalse(
+            qubespolicy.verify_target_value(system_info, '$invalid'))
+
+    def test_010_verify_special_value(self):
+        self.assertTrue(qubespolicy.verify_special_value('$tag:tag',
+            for_target=False))
+        self.assertTrue(qubespolicy.verify_special_value('$tag:other-tag',
+            for_target=False))
+        self.assertTrue(qubespolicy.verify_special_value('$type:AppVM',
+            for_target=False))
+        self.assertFalse(qubespolicy.verify_special_value('$default',
+            for_target=False))
+        self.assertFalse(qubespolicy.verify_special_value('$dispvm',
+            for_target=False))
+        self.assertFalse(qubespolicy.verify_special_value('$dispvm:some-vm',
+            for_target=False))
+        self.assertFalse(qubespolicy.verify_special_value('$invalid',
+            for_target=False))
+        self.assertFalse(qubespolicy.verify_special_value('vm-name',
+            for_target=False))
+        self.assertFalse(qubespolicy.verify_special_value('$tag:',
+            for_target=False))
+        self.assertFalse(qubespolicy.verify_special_value('$type:',
+            for_target=False))
+
+    def test_020_line_simple(self):
+        line = qubespolicy.PolicyRule('$anyvm $anyvm ask', 'filename', 12)
+        self.assertEqual(line.filename, 'filename')
+        self.assertEqual(line.lineno, 12)
+        self.assertEqual(line.action, qubespolicy.Action.ask)
+        self.assertEqual(line.source, '$anyvm')
+        self.assertEqual(line.target, '$anyvm')
+        self.assertEqual(line.full_action, 'ask')
+        self.assertIsNone(line.override_target)
+        self.assertIsNone(line.override_user)
+        self.assertIsNone(line.default_target)
+
+    def test_021_line_simple(self):
+        line = qubespolicy.PolicyRule(
+            '$tag:tag1 $type:AppVM ask,target=test-vm2,user=user',
+            'filename', 12)
+        self.assertEqual(line.filename, 'filename')
+        self.assertEqual(line.lineno, 12)
+        self.assertEqual(line.action, qubespolicy.Action.ask)
+        self.assertEqual(line.source, '$tag:tag1')
+        self.assertEqual(line.target, '$type:AppVM')
+        self.assertEqual(line.full_action, 'ask,target=test-vm2,user=user')
+        self.assertEqual(line.override_target, 'test-vm2')
+        self.assertEqual(line.override_user, 'user')
+        self.assertIsNone(line.default_target)
+
+    def test_022_line_simple(self):
+        line = qubespolicy.PolicyRule(
+            '$anyvm $default allow,target=$dispvm:test-vm2',
+            'filename', 12)
+        self.assertEqual(line.filename, 'filename')
+        self.assertEqual(line.lineno, 12)
+        self.assertEqual(line.action, qubespolicy.Action.allow)
+        self.assertEqual(line.source, '$anyvm')
+        self.assertEqual(line.target, '$default')
+        self.assertEqual(line.full_action, 'allow,target=$dispvm:test-vm2')
+        self.assertEqual(line.override_target, '$dispvm:test-vm2')
+        self.assertIsNone(line.override_user)
+        self.assertIsNone(line.default_target)
+
+    def test_023_line_simple(self):
+        line = qubespolicy.PolicyRule(
+            '$anyvm $default ask,default_target=test-vm1',
+            'filename', 12)
+        self.assertEqual(line.filename, 'filename')
+        self.assertEqual(line.lineno, 12)
+        self.assertEqual(line.action, qubespolicy.Action.ask)
+        self.assertEqual(line.source, '$anyvm')
+        self.assertEqual(line.target, '$default')
+        self.assertEqual(line.full_action, 'ask,default_target=test-vm1')
+        self.assertIsNone(line.override_target)
+        self.assertIsNone(line.override_user)
+        self.assertEqual(line.default_target, 'test-vm1')
+
+    def test_030_line_invalid(self):
+        invalid_lines = [
+            '$dispvm $default allow',  # $dispvm can't be a source
+            '$default $default allow',  # $default can't be a source
+            '$anyvm $default deny,target=test-vm1',  # target= used with deny
+            '$anyvm $anyvm deny,default_target=test-vm1',  # default_target=
+            # with deny
+            '$anyvm $anyvm deny,user=user',  # user= with deny
+            '$anyvm $anyvm invalid',  # invalid action
+            '$anyvm $anyvm allow,invalid=xx',  # invalid option
+            '$anyvm $anyvm',  # missing action
+            '$anyvm $anyvm allow,default_target=test-vm1',  # default_target=
+            #  with allow
+            '$invalid $anyvm allow',  # invalid source
+            '$anyvm $invalid deny',  # invalid target
+            '',  # empty line
+            '$anyvm $anyvm allow extra',  # trailing words
+            '$anyvm $default allow',  # $default allow without target=
+        ]
+        for line in invalid_lines:
+            with self.subTest(line):
+                with self.assertRaises(qubespolicy.PolicySyntaxError):
+                    qubespolicy.PolicyRule(line, 'filename', 12)
+
+    def test_040_match_single(self):
+        is_match_single = qubespolicy.PolicyRule.is_match_single
+        self.assertTrue(is_match_single(system_info, '$anyvm', 'test-vm1'))
+        self.assertTrue(is_match_single(system_info, '$anyvm', '$default'))
+        self.assertTrue(is_match_single(system_info, '$anyvm', ''))
+        self.assertTrue(is_match_single(system_info, '$default', ''))
+        self.assertTrue(is_match_single(system_info, '$default', '$default'))
+        self.assertTrue(is_match_single(system_info, '$tag:tag1', 'test-vm1'))
+        self.assertTrue(is_match_single(system_info, '$type:AppVM', 'test-vm1'))
+        self.assertTrue(is_match_single(system_info,
+            '$type:TemplateVM', 'test-template'))
+        self.assertTrue(is_match_single(system_info, '$anyvm', '$dispvm'))
+        self.assertTrue(is_match_single(system_info,
+            '$anyvm', '$dispvm:default-dvm'))
+        self.assertTrue(is_match_single(system_info, '$dispvm', '$dispvm'))
+        self.assertTrue(is_match_single(system_info, 'dom0', 'dom0'))
+        self.assertTrue(is_match_single(system_info,
+            '$dispvm:default-dvm', '$dispvm:default-dvm'))
+        self.assertTrue(is_match_single(system_info, '$anyvm', '$dispvm'))
+        self.assertTrue(is_match_single(system_info, '$anyvm', 'test-vm1'))
+        self.assertTrue(is_match_single(system_info, '$anyvm', 'test-vm1'))
+        self.assertTrue(is_match_single(system_info, '$anyvm', 'test-vm1'))
+
+        self.assertFalse(is_match_single(system_info, '$default', 'test-vm1'))
+        self.assertFalse(is_match_single(system_info, '$tag:tag1', 'test-vm3'))
+        self.assertFalse(is_match_single(system_info, '$anyvm', 'no-such-vm'))
+        # test-vm1.dispvm_allowed=False
+        self.assertFalse(is_match_single(system_info,
+            '$anyvm', '$dispvm:test-vm1'))
+        # test-vm1.dispvm_allowed=False
+        self.assertFalse(is_match_single(system_info,
+            '$dispvm:test-vm1', '$dispvm:test-vm1'))
+        self.assertFalse(is_match_single(system_info, '$anyvm', 'dom0'))
+        self.assertFalse(is_match_single(system_info, '$tag:tag1', 'dom0'))
+        self.assertFalse(is_match_single(system_info, '$anyvm', '$tag:tag1'))
+        self.assertFalse(is_match_single(system_info, '$anyvm', '$type:AppVM'))
+        self.assertFalse(is_match_single(system_info, '$anyvm', '$invalid'))
+        self.assertFalse(is_match_single(system_info, '$invalid', '$invalid'))
+        self.assertFalse(is_match_single(system_info, '$anyvm', 'no-such-vm'))
+        self.assertFalse(is_match_single(system_info,
+            'no-such-vm', 'no-such-vm'))
+        self.assertFalse(is_match_single(system_info, '$dispvm', 'test-vm1'))
+        self.assertFalse(is_match_single(system_info, '$dispvm', 'default-dvm'))
+        self.assertFalse(is_match_single(system_info,
+            '$dispvm:default-dvm', 'default-dvm'))
+        self.assertFalse(is_match_single(system_info, '$anyvm', 'test-vm1\n'))
+        self.assertFalse(is_match_single(system_info, '$anyvm', 'test-vm1  '))
+
+    def test_050_match(self):
+        line = qubespolicy.PolicyRule('$anyvm $anyvm allow')
+        self.assertTrue(line.is_match(system_info, 'test-vm1', 'test-vm2'))
+        line = qubespolicy.PolicyRule('$anyvm $anyvm allow')
+        self.assertFalse(line.is_match(system_info, 'no-such-vm', 'test-vm2'))
+        line = qubespolicy.PolicyRule('$anyvm $anyvm allow')
+        self.assertFalse(line.is_match(system_info, 'test-vm1', 'no-such-vm'))
+
+    def test_060_expand_target(self):
+        lines = {
+            '$anyvm $anyvm allow': ['test-vm1', 'test-vm2', 'test-vm3',
+                '$dispvm:test-vm3',
+                'default-dvm', '$dispvm:default-dvm', 'test-invalid-dvm',
+                'test-no-dvm', 'test-template', 'test-standalone', '$dispvm'],
+            '$anyvm $dispvm allow': ['$dispvm'],
+            '$anyvm $dispvm:default-dvm allow': ['$dispvm:default-dvm'],
+            # no DispVM from test-vm1 allowed
+            '$anyvm $dispvm:test-vm1 allow': [],
+            '$anyvm test-vm1 allow': ['test-vm1'],
+            '$anyvm $type:AppVM allow': ['test-vm1', 'test-vm2', 'test-vm3',
+                'default-dvm', 'test-invalid-dvm', 'test-no-dvm'],
+            '$anyvm $type:TemplateVM allow': ['test-template'],
+            '$anyvm $tag:tag1 allow': ['test-vm1', 'test-invalid-dvm',
+                'test-template', 'test-standalone', 'test-no-dvm'],
+            '$anyvm $tag:tag2 allow': ['test-vm1', 'test-vm2',
+                'test-invalid-dvm', 'test-template', 'test-standalone',
+                'test-no-dvm'],
+            '$anyvm $tag:no-such-tag allow': [],
+        }
+        for line in lines:
+            with self.subTest(line):
+                policy_line = qubespolicy.PolicyRule(line)
+                self.assertCountEqual(list(policy_line.expand_target(system_info)),
+                    lines[line])
+
+    def test_070_expand_override_target(self):
+        line = qubespolicy.PolicyRule(
+            '$anyvm $anyvm allow,target=test-vm2')
+        self.assertEqual(
+            line.expand_override_target(system_info, 'test-vm1'),
+            'test-vm2')
+
+    def test_071_expand_override_target_dispvm(self):
+        line = qubespolicy.PolicyRule(
+            '$anyvm $anyvm allow,target=$dispvm')
+        self.assertEqual(
+            line.expand_override_target(system_info, 'test-vm1'),
+            '$dispvm:default-dvm')
+
+    def test_072_expand_override_target_dispvm_specific(self):
+        line = qubespolicy.PolicyRule(
+            '$anyvm $anyvm allow,target=$dispvm:test-vm3')
+        self.assertEqual(
+            line.expand_override_target(system_info, 'test-vm1'),
+            '$dispvm:test-vm3')
+
+    def test_073_expand_override_target_dispvm_none(self):
+        line = qubespolicy.PolicyRule(
+            '$anyvm $anyvm allow,target=$dispvm')
+        self.assertEqual(
+            line.expand_override_target(system_info, 'test-no-dvm'),
+            None)
+
+    def test_074_expand_override_target_dom0(self):
+        line = qubespolicy.PolicyRule(
+            '$anyvm $anyvm allow,target=dom0')
+        self.assertEqual(
+            line.expand_override_target(system_info, 'test-no-dvm'),
+            'dom0')
+
+
+class TC_10_PolicyAction(qubes.tests.QubesTestCase):
+    def test_000_init(self):
+        rule = qubespolicy.PolicyRule('$anyvm $anyvm deny')
+        with self.assertRaises(qubespolicy.AccessDenied):
+            qubespolicy.PolicyAction('test.service', 'test-vm1', 'test-vm2',
+                rule, 'test-vm2')
+
+    def test_001_init(self):
+        rule = qubespolicy.PolicyRule('$anyvm $anyvm ask')
+        action = qubespolicy.PolicyAction('test.service', 'test-vm1',
+            None, rule, 'test-vm2', ['test-vm2', 'test-vm3'])
+        self.assertEqual(action.service, 'test.service')
+        self.assertEqual(action.source, 'test-vm1')
+        self.assertIsNone(action.target)
+        self.assertEqual(action.original_target, 'test-vm2')
+        self.assertEqual(action.targets_for_ask, ['test-vm2', 'test-vm3'])
+        self.assertEqual(action.rule, rule)
+        self.assertEqual(action.action, qubespolicy.Action.ask)
+
+    def test_002_init_invalid(self):
+        rule_ask = qubespolicy.PolicyRule('$anyvm $anyvm ask')
+        rule_allow = qubespolicy.PolicyRule('$anyvm $anyvm allow')
+        with self.assertRaises(AssertionError):
+            qubespolicy.PolicyAction('test.service', 'test-vm1',
+            None, rule_allow, 'test-vm2', None)
+        with self.assertRaises(AssertionError):
+            qubespolicy.PolicyAction('test.service', 'test-vm1',
+            'test-vm2', rule_allow, 'test-vm2', ['test-vm2', 'test-vm3'])
+
+        with self.assertRaises(AssertionError):
+            qubespolicy.PolicyAction('test.service', 'test-vm1',
+            None, rule_ask, 'test-vm2', None)
+
+    def test_003_init_default_target(self):
+        rule_ask = qubespolicy.PolicyRule('$anyvm $anyvm ask')
+
+        action = qubespolicy.PolicyAction('test.service', 'test-vm1',
+            'test-vm1', rule_ask, 'test-vm2', ['test-vm2'])
+        self.assertIsNone(action.target)
+
+        action = qubespolicy.PolicyAction('test.service', 'test-vm1',
+            'test-vm2', rule_ask, 'test-vm2', ['test-vm2'])
+        self.assertEqual(action.target, 'test-vm2')
+
+    def test_010_handle_user_response(self):
+        rule = qubespolicy.PolicyRule('$anyvm $anyvm ask')
+        action = qubespolicy.PolicyAction('test.service', 'test-vm1',
+            None, rule, 'test-vm2', ['test-vm2', 'test-vm3'])
+        action.handle_user_response(True, 'test-vm2')
+        self.assertEqual(action.action, qubespolicy.Action.allow)
+        self.assertEqual(action.target, 'test-vm2')
+
+    def test_011_handle_user_response(self):
+        rule = qubespolicy.PolicyRule('$anyvm $anyvm ask')
+        action = qubespolicy.PolicyAction('test.service', 'test-vm1',
+            None, rule, 'test-vm2', ['test-vm2', 'test-vm3'])
+        with self.assertRaises(AssertionError):
+            action.handle_user_response(True, 'test-no-dvm')
+
+    def test_012_handle_user_response(self):
+        rule = qubespolicy.PolicyRule('$anyvm $anyvm ask')
+        action = qubespolicy.PolicyAction('test.service', 'test-vm1',
+            None, rule, 'test-vm2', ['test-vm2', 'test-vm3'])
+        with self.assertRaises(qubespolicy.AccessDenied):
+            action.handle_user_response(False, None)
+        self.assertEqual(action.action, qubespolicy.Action.deny)
+
+    @unittest.mock.patch('qubespolicy.qubesd_call')
+    @unittest.mock.patch('subprocess.call')
+    def test_020_execute(self, mock_subprocess, mock_qubesd_call):
+        rule = qubespolicy.PolicyRule('$anyvm $anyvm allow')
+        action = qubespolicy.PolicyAction('test.service', 'test-vm1',
+            'test-vm2', rule, 'test-vm2')
+        action.execute('some-ident')
+        self.assertEqual(mock_qubesd_call.mock_calls,
+            [unittest.mock.call('test-vm2', 'mgmtinternal.vm.Start')])
+        self.assertEqual(mock_subprocess.mock_calls,
+            [unittest.mock.call([qubespolicy.QREXEC_CLIENT, '-d', 'test-vm2',
+             '-c', 'some-ident', 'DEFAULT:QUBESRPC test.service test-vm1'])])
+
+    @unittest.mock.patch('qubespolicy.qubesd_call')
+    @unittest.mock.patch('subprocess.call')
+    def test_021_execute_dom0(self, mock_subprocess, mock_qubesd_call):
+        rule = qubespolicy.PolicyRule('$anyvm dom0 allow')
+        action = qubespolicy.PolicyAction('test.service', 'test-vm1',
+            'dom0', rule, 'dom0')
+        action.execute('some-ident')
+        self.assertEqual(mock_qubesd_call.mock_calls,
+            [unittest.mock.call('dom0', 'mgmtinternal.vm.Start')])
+        self.assertEqual(mock_subprocess.mock_calls,
+            [unittest.mock.call([qubespolicy.QREXEC_CLIENT, '-d', 'dom0',
+             '-c', 'some-ident',
+             qubespolicy.QUBES_RPC_MULTIPLEXER_PATH +
+             ' test.service test-vm1 dom0'])])
+
+    @unittest.mock.patch('qubespolicy.qubesd_call')
+    @unittest.mock.patch('subprocess.call')
+    def test_022_execute_dispvm(self, mock_subprocess, mock_qubesd_call):
+        rule = qubespolicy.PolicyRule('$anyvm $dispvm:default-dvm allow')
+        action = qubespolicy.PolicyAction('test.service', 'test-vm1',
+            '$dispvm:default-dvm', rule, '$dispvm:default-dvm')
+        mock_qubesd_call.side_effect = (lambda target, call:
+            b'dispvm-name' if call == 'mgmtinternal.vm.Create.DispVM' else
+            unittest.mock.DEFAULT)
+        action.execute('some-ident')
+        self.assertEqual(mock_qubesd_call.mock_calls,
+            [unittest.mock.call('default-dvm', 'mgmtinternal.vm.Create.DispVM'),
+             unittest.mock.call('dispvm-name', 'mgmtinternal.vm.Start'),
+             unittest.mock.call('dispvm-name',
+                'mgmtinternal.vm.CleanupDispVM')])
+        self.assertEqual(mock_subprocess.mock_calls,
+            [unittest.mock.call([qubespolicy.QREXEC_CLIENT, '-d', 'dispvm-name',
+             '-c', 'some-ident', '-W',
+             'DEFAULT:QUBESRPC test.service test-vm1'])])
+
+    @unittest.mock.patch('qubespolicy.qubesd_call')
+    @unittest.mock.patch('subprocess.call')
+    def test_023_execute_already_running(self, mock_subprocess,
+            mock_qubesd_call):
+        rule = qubespolicy.PolicyRule('$anyvm $anyvm allow')
+        action = qubespolicy.PolicyAction('test.service', 'test-vm1',
+            'test-vm2', rule, 'test-vm2')
+        mock_qubesd_call.side_effect = \
+            qubespolicy.QubesMgmtException('QubesVMNotHaltedError')
+        action.execute('some-ident')
+        self.assertEqual(mock_qubesd_call.mock_calls,
+            [unittest.mock.call('test-vm2', 'mgmtinternal.vm.Start')])
+        self.assertEqual(mock_subprocess.mock_calls,
+            [unittest.mock.call([qubespolicy.QREXEC_CLIENT, '-d', 'test-vm2',
+             '-c', 'some-ident', 'DEFAULT:QUBESRPC test.service test-vm1'])])
+
+    @unittest.mock.patch('qubespolicy.qubesd_call')
+    @unittest.mock.patch('subprocess.call')
+    def test_024_execute_startup_error(self, mock_subprocess,
+            mock_qubesd_call):
+        rule = qubespolicy.PolicyRule('$anyvm $anyvm allow')
+        action = qubespolicy.PolicyAction('test.service', 'test-vm1',
+            'test-vm2', rule, 'test-vm2')
+        mock_qubesd_call.side_effect = \
+            qubespolicy.QubesMgmtException('QubesVMError')
+        with self.assertRaises(qubespolicy.QubesMgmtException):
+            action.execute('some-ident')
+        self.assertEqual(mock_qubesd_call.mock_calls,
+            [unittest.mock.call('test-vm2', 'mgmtinternal.vm.Start')])
+        self.assertEqual(mock_subprocess.mock_calls, [])
+
+@unittest.mock.patch('qubespolicy.POLICY_DIR', tmp_policy_dir)
+class TC_20_Policy(qubes.tests.QubesTestCase):
+
+    def setUp(self):
+        super(TC_20_Policy, self).setUp()
+        if not os.path.exists(tmp_policy_dir):
+            os.mkdir(tmp_policy_dir)
+
+    def tearDown(self):
+        shutil.rmtree(tmp_policy_dir)
+        super(TC_20_Policy, self).tearDown()
+
+    def test_000_load(self):
+        with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f:
+            f.write('test-vm1 test-vm2 allow\n')
+            f.write('\n')
+            f.write('# comment\n')
+            f.write('test-vm2 test-vm3 ask\n')
+            f.write('   # comment  \n')
+            f.write('$anyvm $anyvm ask\n')
+        policy = qubespolicy.Policy('test.service')
+        self.assertEqual(policy.service, 'test.service')
+        self.assertEqual(len(policy.policy_rules), 3)
+        self.assertEqual(policy.policy_rules[0].source, 'test-vm1')
+        self.assertEqual(policy.policy_rules[0].target, 'test-vm2')
+        self.assertEqual(policy.policy_rules[0].action,
+            qubespolicy.Action.allow)
+
+    def test_001_not_existent(self):
+        with self.assertRaises(qubespolicy.AccessDenied):
+            qubespolicy.Policy('no-such.service')
+
+    def test_002_include(self):
+        with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f:
+            f.write('test-vm1 test-vm2 allow\n')
+            f.write('$include:test.service2\n')
+            f.write('$anyvm $anyvm deny\n')
+        with open(os.path.join(tmp_policy_dir, 'test.service2'), 'w') as f:
+            f.write('test-vm3 $default allow,target=test-vm2\n')
+        policy = qubespolicy.Policy('test.service')
+        self.assertEqual(policy.service, 'test.service')
+        self.assertEqual(len(policy.policy_rules), 3)
+        self.assertEqual(policy.policy_rules[0].source, 'test-vm1')
+        self.assertEqual(policy.policy_rules[0].target, 'test-vm2')
+        self.assertEqual(policy.policy_rules[0].action,
+            qubespolicy.Action.allow)
+        self.assertEqual(policy.policy_rules[0].filename,
+            tmp_policy_dir + '/test.service')
+        self.assertEqual(policy.policy_rules[0].lineno, 1)
+        self.assertEqual(policy.policy_rules[1].source, 'test-vm3')
+        self.assertEqual(policy.policy_rules[1].target, '$default')
+        self.assertEqual(policy.policy_rules[1].action,
+            qubespolicy.Action.allow)
+        self.assertEqual(policy.policy_rules[1].filename,
+            tmp_policy_dir + '/test.service2')
+        self.assertEqual(policy.policy_rules[1].lineno, 1)
+        self.assertEqual(policy.policy_rules[2].source, '$anyvm')
+        self.assertEqual(policy.policy_rules[2].target, '$anyvm')
+        self.assertEqual(policy.policy_rules[2].action,
+            qubespolicy.Action.deny)
+        self.assertEqual(policy.policy_rules[2].filename,
+            tmp_policy_dir + '/test.service')
+        self.assertEqual(policy.policy_rules[2].lineno, 3)
+
+    def test_010_find_rule(self):
+        with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f:
+            f.write('test-vm1 test-vm2 allow\n')
+            f.write('test-vm1 $anyvm ask\n')
+            f.write('test-vm2 $tag:tag1 deny\n')
+            f.write('test-vm2 $tag:tag2 allow\n')
+            f.write('$type:AppVM $default allow,target=test-vm3\n')
+            f.write('$tag:tag1 $type:AppVM allow\n')
+        policy = qubespolicy.Policy('test.service')
+        self.assertEqual(policy.find_matching_rule(
+            system_info, 'test-vm1', 'test-vm2'), policy.policy_rules[0])
+        self.assertEqual(policy.find_matching_rule(
+            system_info, 'test-vm1', 'test-vm3'), policy.policy_rules[1])
+        self.assertEqual(policy.find_matching_rule(
+            system_info, 'test-vm2', 'test-vm2'), policy.policy_rules[3])
+        self.assertEqual(policy.find_matching_rule(
+            system_info, 'test-vm2', 'test-no-dvm'), policy.policy_rules[2])
+        # $anyvm matches $default too
+        self.assertEqual(policy.find_matching_rule(
+            system_info, 'test-vm1', ''), policy.policy_rules[1])
+        self.assertEqual(policy.find_matching_rule(
+            system_info, 'test-vm2', ''), policy.policy_rules[4])
+        self.assertEqual(policy.find_matching_rule(
+            system_info, 'test-vm2', '$default'), policy.policy_rules[4])
+        self.assertEqual(policy.find_matching_rule(
+            system_info, 'test-no-dvm', 'test-vm3'), policy.policy_rules[5])
+        with self.assertRaises(qubespolicy.AccessDenied):
+            policy.find_matching_rule(
+                system_info, 'test-no-dvm', 'test-standalone')
+        with self.assertRaises(qubespolicy.AccessDenied):
+            policy.find_matching_rule(
+                system_info, 'test-standalone', '$default')
+
+    def test_020_collect_targets_for_ask(self):
+        with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f:
+            f.write('test-vm1 test-vm2 allow\n')
+            f.write('test-vm1 $anyvm ask\n')
+            f.write('test-vm2 $tag:tag1 deny\n')
+            f.write('test-vm2 $tag:tag2 allow\n')
+            f.write('test-no-dvm $type:AppVM deny\n')
+            f.write('$type:AppVM $default allow,target=test-vm3\n')
+            f.write('$tag:tag1 $type:AppVM allow\n')
+            f.write('test-no-dvm $dispvm allow\n')
+            f.write('test-standalone $dispvm allow\n')
+        policy = qubespolicy.Policy('test.service')
+        self.assertCountEqual(policy.collect_targets_for_ask(system_info,
+            'test-vm1'), ['test-vm1', 'test-vm2', 'test-vm3',
+                '$dispvm:test-vm3',
+                'default-dvm', '$dispvm:default-dvm', 'test-invalid-dvm',
+                'test-no-dvm', 'test-template', 'test-standalone'])
+        self.assertCountEqual(policy.collect_targets_for_ask(system_info,
+            'test-vm2'), ['test-vm2', 'test-vm3'])
+        self.assertCountEqual(policy.collect_targets_for_ask(system_info,
+            'test-vm3'), ['test-vm3'])
+        self.assertCountEqual(policy.collect_targets_for_ask(system_info,
+            'test-standalone'), ['test-vm1', 'test-vm2', 'test-vm3',
+            'default-dvm', 'test-no-dvm', 'test-invalid-dvm',
+            '$dispvm:default-dvm'])
+        self.assertCountEqual(policy.collect_targets_for_ask(system_info,
+            'test-no-dvm'), [])
+
+    def test_030_eval_simple(self):
+        with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f:
+            f.write('test-vm1 test-vm2 allow\n')
+
+        policy = qubespolicy.Policy('test.service')
+        action = policy.evaluate(system_info, 'test-vm1', 'test-vm2')
+        self.assertEqual(action.rule, policy.policy_rules[0])
+        self.assertEqual(action.action, qubespolicy.Action.allow)
+        self.assertEqual(action.target, 'test-vm2')
+        self.assertEqual(action.original_target, 'test-vm2')
+        self.assertEqual(action.service, 'test.service')
+        self.assertIsNone(action.targets_for_ask)
+        with self.assertRaises(qubespolicy.AccessDenied):
+            policy.evaluate(system_info, 'test-vm2', '$default')
+
+    def test_031_eval_default(self):
+        with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f:
+            f.write('test-vm1 test-vm2 allow\n')
+            f.write('test-vm1 $default allow,target=test-vm2\n')
+            f.write('$tag:tag1 test-vm2 ask\n')
+            f.write('$tag:tag2 $anyvm allow\n')
+            f.write('test-vm3 $anyvm deny\n')
+
+        policy = qubespolicy.Policy('test.service')
+        action = policy.evaluate(system_info, 'test-vm1', '$default')
+        self.assertEqual(action.rule, policy.policy_rules[1])
+        self.assertEqual(action.action, qubespolicy.Action.allow)
+        self.assertEqual(action.target, 'test-vm2')
+        self.assertEqual(action.original_target, '$default')
+        self.assertEqual(action.service, 'test.service')
+        self.assertIsNone(action.targets_for_ask)
+        with self.assertRaises(qubespolicy.AccessDenied):
+            # action allow should hit, but no target specified (either by
+            # caller or policy)
+            policy.evaluate(system_info, 'test-standalone', '$default')
+
+    def test_032_eval_ask(self):
+        with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f:
+            f.write('test-vm1 test-vm2 allow\n')
+            f.write('test-vm1 $default allow,target=test-vm2\n')
+            f.write('$tag:tag1 test-vm2 ask\n')
+            f.write('$tag:tag1 test-vm3 ask,default_target=test-vm3\n')
+            f.write('$tag:tag2 $anyvm allow\n')
+            f.write('test-vm3 $anyvm deny\n')
+
+        policy = qubespolicy.Policy('test.service')
+        action = policy.evaluate(system_info, 'test-standalone', 'test-vm2')
+        self.assertEqual(action.rule, policy.policy_rules[2])
+        self.assertEqual(action.action, qubespolicy.Action.ask)
+        self.assertIsNone(action.target)
+        self.assertEqual(action.original_target, 'test-vm2')
+        self.assertEqual(action.service, 'test.service')
+        self.assertCountEqual(action.targets_for_ask,
+            ['test-vm1', 'test-vm2', 'test-vm3', '$dispvm:test-vm3',
+                'default-dvm', '$dispvm:default-dvm', 'test-invalid-dvm',
+                'test-no-dvm', 'test-template', 'test-standalone'])
+
+    def test_033_eval_ask(self):
+        with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f:
+            f.write('test-vm1 test-vm2 allow\n')
+            f.write('test-vm1 $default allow,target=test-vm2\n')
+            f.write('$tag:tag1 test-vm2 ask\n')
+            f.write('$tag:tag1 test-vm3 ask,default_target=test-vm3\n')
+            f.write('$tag:tag2 $anyvm allow\n')
+            f.write('test-vm3 $anyvm deny\n')
+
+        policy = qubespolicy.Policy('test.service')
+        action = policy.evaluate(system_info, 'test-standalone', 'test-vm3')
+        self.assertEqual(action.rule, policy.policy_rules[3])
+        self.assertEqual(action.action, qubespolicy.Action.ask)
+        self.assertEqual(action.target, 'test-vm3')
+        self.assertEqual(action.original_target, 'test-vm3')
+        self.assertEqual(action.service, 'test.service')
+        self.assertCountEqual(action.targets_for_ask,
+            ['test-vm1', 'test-vm2', 'test-vm3', '$dispvm:test-vm3',
+                'default-dvm', '$dispvm:default-dvm', 'test-invalid-dvm',
+                'test-no-dvm', 'test-template', 'test-standalone'])
+
+
+class TC_30_Misc(qubes.tests.QubesTestCase):
+    @unittest.mock.patch('socket.socket')
+    def test_000_qubesd_call(self, mock_socket):
+        mock_config = {
+            'return_value.makefile.return_value.read.return_value': b'0\x00data'
+        }
+        mock_socket.configure_mock(**mock_config)
+        result = qubespolicy.qubesd_call('test', 'method')
+        self.assertEqual(result, b'data')
+        self.assertEqual(mock_socket.mock_calls, [
+            unittest.mock.call(socket.AF_UNIX, socket.SOCK_STREAM),
+            unittest.mock.call().connect(qubespolicy.QUBESD_INTERNAL_SOCK),
+            unittest.mock.call().sendall(b'dom0'),
+            unittest.mock.call().sendall(b'\x00'),
+            unittest.mock.call().sendall(b'method'),
+            unittest.mock.call().sendall(b'\x00'),
+            unittest.mock.call().sendall(b'test'),
+            unittest.mock.call().sendall(b'\x00'),
+            unittest.mock.call().sendall(b'\x00'),
+            unittest.mock.call().shutdown(socket.SHUT_WR),
+            unittest.mock.call().makefile('rb'),
+            unittest.mock.call().makefile().read(),
+        ])
+
+    @unittest.mock.patch('socket.socket')
+    def test_001_qubesd_call_arg_payload(self, mock_socket):
+        mock_config = {
+            'return_value.makefile.return_value.read.return_value': b'0\x00data'
+        }
+        mock_socket.configure_mock(**mock_config)
+        result = qubespolicy.qubesd_call('test', 'method', 'arg', b'payload')
+        self.assertEqual(result, b'data')
+        self.assertEqual(mock_socket.mock_calls, [
+            unittest.mock.call(socket.AF_UNIX, socket.SOCK_STREAM),
+            unittest.mock.call().connect(qubespolicy.QUBESD_INTERNAL_SOCK),
+            unittest.mock.call().sendall(b'dom0'),
+            unittest.mock.call().sendall(b'\x00'),
+            unittest.mock.call().sendall(b'method'),
+            unittest.mock.call().sendall(b'\x00'),
+            unittest.mock.call().sendall(b'test'),
+            unittest.mock.call().sendall(b'\x00'),
+            unittest.mock.call().sendall(b'arg'),
+            unittest.mock.call().sendall(b'\x00'),
+            unittest.mock.call().sendall(b'payload'),
+            unittest.mock.call().shutdown(socket.SHUT_WR),
+            unittest.mock.call().makefile('rb'),
+            unittest.mock.call().makefile().read(),
+        ])
+
+    @unittest.mock.patch('socket.socket')
+    def test_002_qubesd_call_exception(self, mock_socket):
+        mock_config = {
+            'return_value.makefile.return_value.read.return_value':
+                b'2\x00SomeError\x00traceback\x00message\x00'
+        }
+        mock_socket.configure_mock(**mock_config)
+        with self.assertRaises(qubespolicy.QubesMgmtException) as e:
+            qubespolicy.qubesd_call('test', 'method')
+        self.assertEqual(e.exception.exc_type, 'SomeError')
+        self.assertEqual(mock_socket.mock_calls, [
+            unittest.mock.call(socket.AF_UNIX, socket.SOCK_STREAM),
+            unittest.mock.call().connect(qubespolicy.QUBESD_INTERNAL_SOCK),
+            unittest.mock.call().sendall(b'dom0'),
+            unittest.mock.call().sendall(b'\x00'),
+            unittest.mock.call().sendall(b'method'),
+            unittest.mock.call().sendall(b'\x00'),
+            unittest.mock.call().sendall(b'test'),
+            unittest.mock.call().sendall(b'\x00'),
+            unittest.mock.call().sendall(b'\x00'),
+            unittest.mock.call().shutdown(socket.SHUT_WR),
+            unittest.mock.call().makefile('rb'),
+            unittest.mock.call().makefile().read(),
+        ])
+