qubespolicy: implement $dispvm:$tag: syntax
This allow to specify allowed/forbidden DispVM base using tags, not only static name. Fixes QubesOS/qubes-issues#3048
This commit is contained in:
parent
99bd193688
commit
5e870e4b6a
@ -29,6 +29,7 @@ Each line consist of three values separated by white characters (space(s), tab(s
|
|||||||
AppVM, TemplateVM, StandaloneVM, DispVM
|
AppVM, TemplateVM, StandaloneVM, DispVM
|
||||||
- `$default` - used when caller did not specified any VM
|
- `$default` - used when caller did not specified any VM
|
||||||
- `$dispvm:vm-name` - _new_ Disposable VM created from AppVM `vm-name`
|
- `$dispvm:vm-name` - _new_ Disposable VM created from AppVM `vm-name`
|
||||||
|
- `$dispvm:$tag:some-tag` - _new_ Disposable VM created from AppVM tagged with `some-tag`
|
||||||
- `$dispvm` - _new_ Disposable VM created from AppVM pointed by caller
|
- `$dispvm` - _new_ Disposable VM created from AppVM pointed by caller
|
||||||
property `default_dispvm`, which defaults to global property `default_dispvm`
|
property `default_dispvm`, which defaults to global property `default_dispvm`
|
||||||
- `$adminvm` - Admin VM aka dom0
|
- `$adminvm` - Admin VM aka dom0
|
||||||
|
@ -196,7 +196,9 @@ class PolicyRule(object):
|
|||||||
|
|
||||||
if self.override_target is not None:
|
if self.override_target is not None:
|
||||||
if self.override_target.startswith('$') and \
|
if self.override_target.startswith('$') and \
|
||||||
not self.override_target.startswith('$dispvm') and \
|
(not self.override_target.startswith('$dispvm') or
|
||||||
|
self.override_target.startswith('$dispvm:$tag:') or
|
||||||
|
self.override_target.startswith('$tag:')) and \
|
||||||
self.override_target != '$adminvm':
|
self.override_target != '$adminvm':
|
||||||
raise PolicySyntaxError(filename, lineno,
|
raise PolicySyntaxError(filename, lineno,
|
||||||
'target= option needs to name specific target')
|
'target= option needs to name specific target')
|
||||||
@ -244,9 +246,18 @@ class PolicyRule(object):
|
|||||||
if value == policy_value:
|
if value == policy_value:
|
||||||
return True
|
return True
|
||||||
|
|
||||||
# if $dispvm* not matched above, reject it; missing ':' is
|
# DispVM request, using tags to match
|
||||||
# intentional - handle both '$dispvm' and '$dispvm:xxx'
|
if policy_value.startswith('$dispvm:$tag:') \
|
||||||
if value.startswith('$dispvm'):
|
and value.startswith('$dispvm:'):
|
||||||
|
tag = policy_value.split(':', 2)[2]
|
||||||
|
dispvm_base = value.split(':', 1)[1]
|
||||||
|
# already checked for existence by verify_target_value call
|
||||||
|
dispvm_base_info = system_info['domains'][dispvm_base]
|
||||||
|
return tag in dispvm_base_info['tags']
|
||||||
|
|
||||||
|
# if $dispvm* not matched above, reject it; default DispVM (bare
|
||||||
|
# $dispvm) was resolved by the caller
|
||||||
|
if value.startswith('$dispvm:'):
|
||||||
return False
|
return False
|
||||||
|
|
||||||
# require $adminvm to be matched explicitly (not through $tag or $type)
|
# require $adminvm to be matched explicitly (not through $tag or $type)
|
||||||
@ -281,6 +292,17 @@ class PolicyRule(object):
|
|||||||
|
|
||||||
if not self.is_match_single(system_info, self.source, source):
|
if not self.is_match_single(system_info, self.source, source):
|
||||||
return False
|
return False
|
||||||
|
# $dispvm in policy matches _only_ $dispvm (but not $dispvm:some-vm,
|
||||||
|
# even if that would be the default one)
|
||||||
|
if self.target == '$dispvm' and target == '$dispvm':
|
||||||
|
return True
|
||||||
|
if target == '$dispvm':
|
||||||
|
# resolve default DispVM, to check all kinds of $dispvm:*
|
||||||
|
default_dispvm = system_info['domains'][source]['default_dispvm']
|
||||||
|
if default_dispvm is None:
|
||||||
|
# if this VM have no default DispVM, match only with $anyvm
|
||||||
|
return self.target == '$anyvm'
|
||||||
|
target = '$dispvm:' + default_dispvm
|
||||||
if not self.is_match_single(system_info, self.target, target):
|
if not self.is_match_single(system_info, self.target, target):
|
||||||
return False
|
return False
|
||||||
return True
|
return True
|
||||||
@ -310,6 +332,12 @@ class PolicyRule(object):
|
|||||||
if domain['template_for_dispvms']:
|
if domain['template_for_dispvms']:
|
||||||
yield '$dispvm:' + name
|
yield '$dispvm:' + name
|
||||||
yield '$dispvm'
|
yield '$dispvm'
|
||||||
|
elif self.target.startswith('$dispvm:$tag:'):
|
||||||
|
tag = self.target.split(':', 2)[2]
|
||||||
|
for name, domain in system_info['domains'].items():
|
||||||
|
if tag in domain['tags']:
|
||||||
|
if domain['template_for_dispvms']:
|
||||||
|
yield '$dispvm:' + name
|
||||||
elif self.target.startswith('$dispvm:'):
|
elif self.target.startswith('$dispvm:'):
|
||||||
dispvm_base = self.target.split(':', 1)[1]
|
dispvm_base = self.target.split(':', 1)[1]
|
||||||
try:
|
try:
|
||||||
|
@ -49,7 +49,7 @@ system_info = {
|
|||||||
'template_for_dispvms': False,
|
'template_for_dispvms': False,
|
||||||
},
|
},
|
||||||
'test-vm3': {
|
'test-vm3': {
|
||||||
'tags': [],
|
'tags': ['tag3'],
|
||||||
'type': 'AppVM',
|
'type': 'AppVM',
|
||||||
'default_dispvm': 'default-dvm',
|
'default_dispvm': 'default-dvm',
|
||||||
'template_for_dispvms': True,
|
'template_for_dispvms': True,
|
||||||
@ -119,6 +119,8 @@ class TC_00_PolicyRule(qubes.tests.QubesTestCase):
|
|||||||
qubespolicy.verify_target_value(system_info, '$anyvm'))
|
qubespolicy.verify_target_value(system_info, '$anyvm'))
|
||||||
self.assertFalse(
|
self.assertFalse(
|
||||||
qubespolicy.verify_target_value(system_info, '$tag:tag1'))
|
qubespolicy.verify_target_value(system_info, '$tag:tag1'))
|
||||||
|
self.assertFalse(
|
||||||
|
qubespolicy.verify_target_value(system_info, '$dispvm:$tag:tag1'))
|
||||||
self.assertFalse(
|
self.assertFalse(
|
||||||
qubespolicy.verify_target_value(system_info, '$invalid'))
|
qubespolicy.verify_target_value(system_info, '$invalid'))
|
||||||
|
|
||||||
@ -131,12 +133,18 @@ class TC_00_PolicyRule(qubes.tests.QubesTestCase):
|
|||||||
for_target=False))
|
for_target=False))
|
||||||
self.assertTrue(qubespolicy.verify_special_value('$adminvm',
|
self.assertTrue(qubespolicy.verify_special_value('$adminvm',
|
||||||
for_target=False))
|
for_target=False))
|
||||||
|
self.assertTrue(qubespolicy.verify_special_value('$dispvm:some-vm',
|
||||||
|
for_target=True))
|
||||||
|
self.assertTrue(qubespolicy.verify_special_value('$dispvm:$tag:tag1',
|
||||||
|
for_target=True))
|
||||||
self.assertFalse(qubespolicy.verify_special_value('$default',
|
self.assertFalse(qubespolicy.verify_special_value('$default',
|
||||||
for_target=False))
|
for_target=False))
|
||||||
self.assertFalse(qubespolicy.verify_special_value('$dispvm',
|
self.assertFalse(qubespolicy.verify_special_value('$dispvm',
|
||||||
for_target=False))
|
for_target=False))
|
||||||
self.assertFalse(qubespolicy.verify_special_value('$dispvm:some-vm',
|
self.assertFalse(qubespolicy.verify_special_value('$dispvm:some-vm',
|
||||||
for_target=False))
|
for_target=False))
|
||||||
|
self.assertFalse(qubespolicy.verify_special_value('$dispvm:$tag:tag1',
|
||||||
|
for_target=False))
|
||||||
self.assertFalse(qubespolicy.verify_special_value('$invalid',
|
self.assertFalse(qubespolicy.verify_special_value('$invalid',
|
||||||
for_target=False))
|
for_target=False))
|
||||||
self.assertFalse(qubespolicy.verify_special_value('vm-name',
|
self.assertFalse(qubespolicy.verify_special_value('vm-name',
|
||||||
@ -219,6 +227,9 @@ class TC_00_PolicyRule(qubes.tests.QubesTestCase):
|
|||||||
invalid_lines = [
|
invalid_lines = [
|
||||||
'$dispvm $default allow', # $dispvm can't be a source
|
'$dispvm $default allow', # $dispvm can't be a source
|
||||||
'$default $default allow', # $default can't be a source
|
'$default $default allow', # $default can't be a source
|
||||||
|
'$anyvm $default allow,target=$dispvm:$tag:tag1', # $dispvm:$tag
|
||||||
|
# as override target
|
||||||
|
'$anyvm $default allow,target=$tag:tag1', # $tag as override target
|
||||||
'$anyvm $default deny,target=test-vm1', # target= used with deny
|
'$anyvm $default deny,target=test-vm1', # target= used with deny
|
||||||
'$anyvm $anyvm deny,default_target=test-vm1', # default_target=
|
'$anyvm $anyvm deny,default_target=test-vm1', # default_target=
|
||||||
# with deny
|
# with deny
|
||||||
@ -254,6 +265,8 @@ class TC_00_PolicyRule(qubes.tests.QubesTestCase):
|
|||||||
self.assertTrue(is_match_single(system_info,
|
self.assertTrue(is_match_single(system_info,
|
||||||
'$anyvm', '$dispvm:default-dvm'))
|
'$anyvm', '$dispvm:default-dvm'))
|
||||||
self.assertTrue(is_match_single(system_info, '$dispvm', '$dispvm'))
|
self.assertTrue(is_match_single(system_info, '$dispvm', '$dispvm'))
|
||||||
|
self.assertTrue(is_match_single(system_info,
|
||||||
|
'$dispvm:$tag:tag3', '$dispvm:test-vm3'))
|
||||||
self.assertTrue(is_match_single(system_info, '$adminvm', '$adminvm'))
|
self.assertTrue(is_match_single(system_info, '$adminvm', '$adminvm'))
|
||||||
self.assertTrue(is_match_single(system_info, '$adminvm', 'dom0'))
|
self.assertTrue(is_match_single(system_info, '$adminvm', 'dom0'))
|
||||||
self.assertTrue(is_match_single(system_info, 'dom0', '$adminvm'))
|
self.assertTrue(is_match_single(system_info, 'dom0', '$adminvm'))
|
||||||
@ -274,6 +287,14 @@ class TC_00_PolicyRule(qubes.tests.QubesTestCase):
|
|||||||
# test-vm1.template_for_dispvms=False
|
# test-vm1.template_for_dispvms=False
|
||||||
self.assertFalse(is_match_single(system_info,
|
self.assertFalse(is_match_single(system_info,
|
||||||
'$dispvm:test-vm1', '$dispvm:test-vm1'))
|
'$dispvm:test-vm1', '$dispvm:test-vm1'))
|
||||||
|
self.assertFalse(is_match_single(system_info,
|
||||||
|
'$dispvm:$tag:tag1', '$dispvm:test-vm1'))
|
||||||
|
# test-vm3 has not tag1
|
||||||
|
self.assertFalse(is_match_single(system_info,
|
||||||
|
'$dispvm:$tag:tag1', '$dispvm:test-vm3'))
|
||||||
|
# default-dvm has no tag3
|
||||||
|
self.assertFalse(is_match_single(system_info,
|
||||||
|
'$dispvm:$tag:tag3', '$dispvm:default-dvm'))
|
||||||
self.assertFalse(is_match_single(system_info, '$anyvm', 'dom0'))
|
self.assertFalse(is_match_single(system_info, '$anyvm', 'dom0'))
|
||||||
self.assertFalse(is_match_single(system_info, '$anyvm', '$adminvm'))
|
self.assertFalse(is_match_single(system_info, '$anyvm', '$adminvm'))
|
||||||
self.assertFalse(is_match_single(system_info,
|
self.assertFalse(is_match_single(system_info,
|
||||||
@ -306,6 +327,19 @@ class TC_00_PolicyRule(qubes.tests.QubesTestCase):
|
|||||||
self.assertFalse(line.is_match(system_info, 'no-such-vm', 'test-vm2'))
|
self.assertFalse(line.is_match(system_info, 'no-such-vm', 'test-vm2'))
|
||||||
line = qubespolicy.PolicyRule('$anyvm $anyvm allow')
|
line = qubespolicy.PolicyRule('$anyvm $anyvm allow')
|
||||||
self.assertFalse(line.is_match(system_info, 'test-vm1', 'no-such-vm'))
|
self.assertFalse(line.is_match(system_info, 'test-vm1', 'no-such-vm'))
|
||||||
|
line = qubespolicy.PolicyRule('$anyvm $dispvm allow')
|
||||||
|
self.assertTrue(line.is_match(system_info, 'test-vm1', '$dispvm'))
|
||||||
|
line = qubespolicy.PolicyRule('$anyvm $dispvm allow')
|
||||||
|
self.assertFalse(line.is_match(system_info,
|
||||||
|
'test-vm1', '$dispvm:default-dvm'))
|
||||||
|
line = qubespolicy.PolicyRule('$anyvm $dispvm:default-dvm allow')
|
||||||
|
self.assertTrue(line.is_match(system_info, 'test-vm1', '$dispvm'))
|
||||||
|
line = qubespolicy.PolicyRule('$anyvm $dispvm:default-dvm allow')
|
||||||
|
self.assertTrue(line.is_match(system_info,
|
||||||
|
'test-vm1', '$dispvm:default-dvm'))
|
||||||
|
line = qubespolicy.PolicyRule('$anyvm $dispvm:$tag:tag3 allow')
|
||||||
|
self.assertTrue(line.is_match(system_info,
|
||||||
|
'test-vm1', '$dispvm:test-vm3'))
|
||||||
|
|
||||||
def test_060_expand_target(self):
|
def test_060_expand_target(self):
|
||||||
lines = {
|
lines = {
|
||||||
@ -317,6 +351,9 @@ class TC_00_PolicyRule(qubes.tests.QubesTestCase):
|
|||||||
'$anyvm $dispvm:default-dvm allow': ['$dispvm:default-dvm'],
|
'$anyvm $dispvm:default-dvm allow': ['$dispvm:default-dvm'],
|
||||||
# no DispVM from test-vm1 allowed
|
# no DispVM from test-vm1 allowed
|
||||||
'$anyvm $dispvm:test-vm1 allow': [],
|
'$anyvm $dispvm:test-vm1 allow': [],
|
||||||
|
'$anyvm $dispvm:test-vm3 allow': ['$dispvm:test-vm3'],
|
||||||
|
'$anyvm $dispvm:$tag:tag1 allow': [],
|
||||||
|
'$anyvm $dispvm:$tag:tag3 allow': ['$dispvm:test-vm3'],
|
||||||
'$anyvm test-vm1 allow': ['test-vm1'],
|
'$anyvm test-vm1 allow': ['test-vm1'],
|
||||||
'$anyvm $type:AppVM allow': ['test-vm1', 'test-vm2', 'test-vm3',
|
'$anyvm $type:AppVM allow': ['test-vm1', 'test-vm2', 'test-vm3',
|
||||||
'default-dvm', 'test-invalid-dvm', 'test-no-dvm'],
|
'default-dvm', 'test-invalid-dvm', 'test-no-dvm'],
|
||||||
@ -599,6 +636,9 @@ class TC_20_Policy(qubes.tests.QubesTestCase):
|
|||||||
f.write('test-vm1 $anyvm ask\n')
|
f.write('test-vm1 $anyvm ask\n')
|
||||||
f.write('test-vm2 $tag:tag1 deny\n')
|
f.write('test-vm2 $tag:tag1 deny\n')
|
||||||
f.write('test-vm2 $tag:tag2 allow\n')
|
f.write('test-vm2 $tag:tag2 allow\n')
|
||||||
|
f.write('test-vm2 $dispvm:$tag:tag3 allow\n')
|
||||||
|
f.write('test-vm2 $dispvm:$tag:tag2 allow\n')
|
||||||
|
f.write('test-vm2 $dispvm:default-dvm allow\n')
|
||||||
f.write('$type:AppVM $default allow,target=test-vm3\n')
|
f.write('$type:AppVM $default allow,target=test-vm3\n')
|
||||||
f.write('$tag:tag1 $type:AppVM allow\n')
|
f.write('$tag:tag1 $type:AppVM allow\n')
|
||||||
policy = qubespolicy.Policy('test.service', tmp_policy_dir)
|
policy = qubespolicy.Policy('test.service', tmp_policy_dir)
|
||||||
@ -614,14 +654,22 @@ class TC_20_Policy(qubes.tests.QubesTestCase):
|
|||||||
self.assertEqual(policy.find_matching_rule(
|
self.assertEqual(policy.find_matching_rule(
|
||||||
system_info, 'test-vm1', ''), policy.policy_rules[1])
|
system_info, 'test-vm1', ''), policy.policy_rules[1])
|
||||||
self.assertEqual(policy.find_matching_rule(
|
self.assertEqual(policy.find_matching_rule(
|
||||||
system_info, 'test-vm2', ''), policy.policy_rules[4])
|
system_info, 'test-vm2', ''), policy.policy_rules[7])
|
||||||
self.assertEqual(policy.find_matching_rule(
|
self.assertEqual(policy.find_matching_rule(
|
||||||
system_info, 'test-vm2', '$default'), policy.policy_rules[4])
|
system_info, 'test-vm2', '$default'), policy.policy_rules[7])
|
||||||
self.assertEqual(policy.find_matching_rule(
|
self.assertEqual(policy.find_matching_rule(
|
||||||
system_info, 'test-no-dvm', 'test-vm3'), policy.policy_rules[5])
|
system_info, 'test-no-dvm', 'test-vm3'), policy.policy_rules[8])
|
||||||
|
self.assertEqual(policy.find_matching_rule(
|
||||||
|
system_info, 'test-vm2', '$dispvm:test-vm3'),
|
||||||
|
policy.policy_rules[4])
|
||||||
|
self.assertEqual(policy.find_matching_rule(
|
||||||
|
system_info, 'test-vm2', '$dispvm'),
|
||||||
|
policy.policy_rules[6])
|
||||||
with self.assertRaises(qubespolicy.AccessDenied):
|
with self.assertRaises(qubespolicy.AccessDenied):
|
||||||
policy.find_matching_rule(
|
policy.find_matching_rule(
|
||||||
system_info, 'test-no-dvm', 'test-standalone')
|
system_info, 'test-no-dvm', 'test-standalone')
|
||||||
|
with self.assertRaises(qubespolicy.AccessDenied):
|
||||||
|
policy.find_matching_rule(system_info, 'test-no-dvm', '$dispvm')
|
||||||
with self.assertRaises(qubespolicy.AccessDenied):
|
with self.assertRaises(qubespolicy.AccessDenied):
|
||||||
policy.find_matching_rule(
|
policy.find_matching_rule(
|
||||||
system_info, 'test-standalone', '$default')
|
system_info, 'test-standalone', '$default')
|
||||||
@ -745,6 +793,30 @@ class TC_20_Policy(qubes.tests.QubesTestCase):
|
|||||||
self.assertEqual(action.service, 'test.service')
|
self.assertEqual(action.service, 'test.service')
|
||||||
self.assertIsNone(action.targets_for_ask)
|
self.assertIsNone(action.targets_for_ask)
|
||||||
|
|
||||||
|
def test_035_eval_resolve_dispvm_fail(self):
|
||||||
|
with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f:
|
||||||
|
f.write('test-no-dvm $dispvm allow\n')
|
||||||
|
|
||||||
|
policy = qubespolicy.Policy('test.service', tmp_policy_dir)
|
||||||
|
with self.assertRaises(qubespolicy.AccessDenied):
|
||||||
|
policy.evaluate(system_info, 'test-no-dvm', '$dispvm')
|
||||||
|
|
||||||
|
def test_036_eval_invalid_override_target(self):
|
||||||
|
with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f:
|
||||||
|
f.write('test-vm3 $anyvm allow,target=no-such-vm\n')
|
||||||
|
|
||||||
|
policy = qubespolicy.Policy('test.service', tmp_policy_dir)
|
||||||
|
with self.assertRaises(qubespolicy.AccessDenied):
|
||||||
|
policy.evaluate(system_info, 'test-vm3', '$default')
|
||||||
|
|
||||||
|
def test_037_eval_ask_no_targets(self):
|
||||||
|
with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f:
|
||||||
|
f.write('test-vm3 $default ask\n')
|
||||||
|
|
||||||
|
policy = qubespolicy.Policy('test.service', tmp_policy_dir)
|
||||||
|
with self.assertRaises(qubespolicy.AccessDenied):
|
||||||
|
policy.evaluate(system_info, 'test-vm3', '$default')
|
||||||
|
|
||||||
|
|
||||||
class TC_30_Misc(qubes.tests.QubesTestCase):
|
class TC_30_Misc(qubes.tests.QubesTestCase):
|
||||||
@unittest.mock.patch('socket.socket')
|
@unittest.mock.patch('socket.socket')
|
||||||
|
Loading…
Reference in New Issue
Block a user