qubespolicy: use '@' instead of '$' for policy keywords

Using '$' is easy to misuse in shell scripts, shell commands etc. After
all this years, lets abandon this dangerous character and move to
something safer: '@'. The choice was made after reviewing specifications
of various shells on different operating systems and this is the
character that have no special meaning in none of them.

To preserve compatibility, automatically translate '$' to '@' when
loading policy files.
This commit is contained in:
Marek Marczykowski-Górecki 2018-02-16 05:17:20 +01:00
parent c87fcd7e2e
commit 68b6f1ec76
No known key found for this signature in database
GPG Key ID: 063938BA42CFA724
11 changed files with 345 additions and 324 deletions

View File

@ -1293,7 +1293,11 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI):
dest_vm = profile_data['destination_vm']
dest_path = profile_data['destination_path']
include_vms = profile_data['include']
# convert old keywords to new keywords
include_vms = [vm.replace('$', '@') for vm in include_vms]
exclude_vms = profile_data.get('exclude', [])
# convert old keywords to new keywords
exclude_vms = [vm.replace('$', '@') for vm in exclude_vms]
compression = profile_data.get('compression', True)
except KeyError as err:
raise qubes.exc.QubesException(

View File

@ -177,10 +177,10 @@ def systemd_notify():
sock.close()
def match_vm_name_with_special(vm, name):
'''Check if *vm* matches given name, which may be specified as $tag:...
or $type:...'''
if name.startswith('$tag:'):
return name[len('$tag:'):] in vm.tags
elif name.startswith('$type:'):
return name[len('$type:'):] == vm.__class__.__name__
'''Check if *vm* matches given name, which may be specified as @tag:...
or @type:...'''
if name.startswith('@tag:'):
return name[len('@tag:'):] in vm.tags
elif name.startswith('@type:'):
return name[len('@type:'):] == vm.__class__.__name__
return name == vm.name

View File

@ -61,23 +61,23 @@ class Action(enum.Enum):
def is_special_value(value):
'''Check if given source/target specification is special (keyword) value
'''
return value.startswith('$')
return value.startswith('@')
def verify_target_value(system_info, value):
''' Check if given value names valid target
This function check if given value is not only syntactically correct,
but also if names valid service call target (existing domain,
or valid $dispvm like keyword)
or valid @dispvm like keyword)
:param system_info: information about the system
:param value: value to be checked
'''
if value == '$dispvm':
if value == '@dispvm':
return True
elif value == '$adminvm':
elif value == '@adminvm':
return True
elif value.startswith('$dispvm:'):
elif value.startswith('@dispvm:'):
dispvm_base = value.split(':', 1)[1]
if dispvm_base not in system_info['domains']:
return False
@ -89,11 +89,11 @@ def verify_target_value(system_info, value):
def verify_special_value(value, for_target=True, specific_target=False):
'''
Verify if given special VM-specifier ('$...') is valid
Verify if given special VM-specifier ('@...') is valid
:param value: value to verify
:param for_target: should classify target-only values as valid (
'$default', '$dispvm')
'@default', '@dispvm')
:param specific_target: allow only values naming specific target
(for use with target=, default= etc)
:return: True or False
@ -103,27 +103,27 @@ def verify_special_value(value, for_target=True, specific_target=False):
# values used only for matching VMs, not naming specific one (for actual
# call target)
if not specific_target:
if value.startswith('$tag:') and len(value) > len('$tag:'):
if value.startswith('@tag:') and len(value) > len('@tag:'):
return True
if value.startswith('$type:') and len(value) > len('$type:'):
if value.startswith('@type:') and len(value) > len('@type:'):
return True
if for_target and value.startswith('$dispvm:$tag:') and \
len(value) > len('$dispvm:$tag:'):
if for_target and value.startswith('@dispvm:@tag:') and \
len(value) > len('@dispvm:@tag:'):
return True
if value == '$anyvm':
if value == '@anyvm':
return True
if for_target and value == '$default':
if for_target and value == '@default':
return True
# those can be used to name one specific call VM
if value == '$adminvm':
if value == '@adminvm':
return True
# allow only specific dispvm, not based on any $xxx keyword - don't name
# $tag here specifically, to work also with any future keywords
if for_target and value.startswith('$dispvm:') and \
not value.startswith('$dispvm:$'):
# allow only specific dispvm, not based on any @xxx keyword - don't name
# @tag here specifically, to work also with any future keywords
if for_target and value.startswith('@dispvm:') and \
not value.startswith('@dispvm:@'):
return True
if for_target and value == '$dispvm':
if for_target and value == '@dispvm':
return True
return False
@ -138,7 +138,7 @@ class PolicyRule(object):
:raise PolicySyntaxError: when syntax error is found
:param line: a single line of actual qrexec policy (not a comment,
empty line or $include)
empty line or @include)
:param filename: name of the file from which this line is loaded
:param lineno: line number from which this line is loaded
'''
@ -193,30 +193,30 @@ class PolicyRule(object):
'invalid option {} for {} action'.format(param, action))
# verify special values
if self.source.startswith('$'):
if is_special_value(self.source):
if not verify_special_value(self.source, False, False):
raise PolicySyntaxError(filename, lineno,
'invalid source specification: {}'.format(self.source))
if self.target.startswith('$'):
if is_special_value(self.target):
if not verify_special_value(self.target, True, False):
raise PolicySyntaxError(filename, lineno,
'invalid target specification: {}'.format(self.target))
if self.target == '$default' \
if self.target == '@default' \
and self.action == Action.allow \
and self.override_target is None:
raise PolicySyntaxError(filename, lineno,
'allow action for $default rule must specify target= option')
'allow action for @default rule must specify target= option')
if self.override_target is not None:
if self.override_target.startswith('$') and \
if is_special_value(self.override_target) and \
not verify_special_value(self.override_target, True, True):
raise PolicySyntaxError(filename, lineno,
'target= option needs to name specific target')
if self.default_target is not None:
if self.default_target.startswith('$') and \
if is_special_value(self.default_target) and \
not verify_special_value(self.default_target, True, True):
raise PolicySyntaxError(filename, lineno,
'target= option needs to name specific target')
@ -224,7 +224,7 @@ class PolicyRule(object):
@staticmethod
def is_match_single(system_info, policy_value, value):
'''
Evaluate if a single value (VM name or '$default') matches policy
Evaluate if a single value (VM name or '@default') matches policy
specification
:param system_info: information about the system
@ -235,62 +235,62 @@ class PolicyRule(object):
'''
# pylint: disable=too-many-return-statements
# not specified target matches only with $default and $anyvm policy
# not specified target matches only with @default and @anyvm policy
# entry
if value == '$default' or value == '':
return policy_value in ('$default', '$anyvm')
if value == '@default' or value == '':
return policy_value in ('@default', '@anyvm')
# if specific target used, check if it's valid
# this function (is_match_single) is also used for checking call source
# values, but this isn't a problem, because it will always be a
# domain name (not $dispvm or such) - this is guaranteed by a nature
# domain name (not @dispvm or such) - this is guaranteed by a nature
# of qrexec call
if not verify_target_value(system_info, value):
return False
# handle $adminvm keyword
# handle @adminvm keyword
if policy_value == 'dom0':
# TODO: log a warning in Qubes 4.1
policy_value = '$adminvm'
policy_value = '@adminvm'
if value == 'dom0':
value = '$adminvm'
value = '@adminvm'
# allow any _valid_, non-dom0 target
if policy_value == '$anyvm':
return value != '$adminvm'
if policy_value == '@anyvm':
return value != '@adminvm'
# exact match, including $dispvm* and $adminvm
# exact match, including @dispvm* and @adminvm
if value == policy_value:
return True
# DispVM request, using tags to match
if policy_value.startswith('$dispvm:$tag:') \
and value.startswith('$dispvm:'):
if policy_value.startswith('@dispvm:@tag:') \
and value.startswith('@dispvm:'):
tag = policy_value.split(':', 2)[2]
dispvm_base = value.split(':', 1)[1]
# already checked for existence by verify_target_value call
dispvm_base_info = system_info['domains'][dispvm_base]
return tag in dispvm_base_info['tags']
# if $dispvm* not matched above, reject it; default DispVM (bare
# $dispvm) was resolved by the caller
if value.startswith('$dispvm:'):
# if @dispvm* not matched above, reject it; default DispVM (bare
# @dispvm) was resolved by the caller
if value.startswith('@dispvm:'):
return False
# require $adminvm to be matched explicitly (not through $tag or $type)
# require @adminvm to be matched explicitly (not through @tag or @type)
# - if not matched already, reject it
if value == '$adminvm':
if value == '@adminvm':
return False
# at this point, value name a specific target
domain_info = system_info['domains'][value]
if policy_value.startswith('$tag:'):
if policy_value.startswith('@tag:'):
tag = policy_value.split(':', 1)[1]
return tag in domain_info['tags']
if policy_value.startswith('$type:'):
if policy_value.startswith('@type:'):
type_ = policy_value.split(':', 1)[1]
return type_ == domain_info['type']
@ -310,17 +310,17 @@ class PolicyRule(object):
if not self.is_match_single(system_info, self.source, source):
return False
# $dispvm in policy matches _only_ $dispvm (but not $dispvm:some-vm,
# @dispvm in policy matches _only_ @dispvm (but not @dispvm:some-vm,
# even if that would be the default one)
if self.target == '$dispvm' and target == '$dispvm':
if self.target == '@dispvm' and target == '@dispvm':
return True
if target == '$dispvm':
# resolve default DispVM, to check all kinds of $dispvm:*
if target == '@dispvm':
# resolve default DispVM, to check all kinds of @dispvm:*
default_dispvm = system_info['domains'][source]['default_dispvm']
if default_dispvm is None:
# if this VM have no default DispVM, match only with $anyvm
return self.target == '$anyvm'
target = '$dispvm:' + default_dispvm
# if this VM have no default DispVM, match only with @anyvm
return self.target == '@anyvm'
target = '@dispvm:' + default_dispvm
if not self.is_match_single(system_info, self.target, target):
return False
return True
@ -333,30 +333,30 @@ class PolicyRule(object):
:return: matching domains
'''
if self.target.startswith('$tag:'):
if self.target.startswith('@tag:'):
tag = self.target.split(':', 1)[1]
for name, domain in system_info['domains'].items():
if tag in domain['tags']:
yield name
elif self.target.startswith('$type:'):
elif self.target.startswith('@type:'):
type_ = self.target.split(':', 1)[1]
for name, domain in system_info['domains'].items():
if type_ == domain['type']:
yield name
elif self.target == '$anyvm':
elif self.target == '@anyvm':
for name, domain in system_info['domains'].items():
if name != 'dom0':
yield name
if domain['template_for_dispvms']:
yield '$dispvm:' + name
yield '$dispvm'
elif self.target.startswith('$dispvm:$tag:'):
yield '@dispvm:' + name
yield '@dispvm'
elif self.target.startswith('@dispvm:@tag:'):
tag = self.target.split(':', 2)[2]
for name, domain in system_info['domains'].items():
if tag in domain['tags']:
if domain['template_for_dispvms']:
yield '$dispvm:' + name
elif self.target.startswith('$dispvm:'):
yield '@dispvm:' + name
elif self.target.startswith('@dispvm:'):
dispvm_base = self.target.split(':', 1)[1]
try:
if system_info['domains'][dispvm_base]['template_for_dispvms']:
@ -364,9 +364,9 @@ class PolicyRule(object):
except KeyError:
# TODO log a warning?
pass
elif self.target == '$adminvm':
elif self.target == '@adminvm':
yield self.target
elif self.target == '$dispvm':
elif self.target == '@dispvm':
yield self.target
else:
if self.target in system_info['domains']:
@ -374,17 +374,17 @@ class PolicyRule(object):
def expand_override_target(self, system_info, source):
'''
Replace '$dispvm' with specific '$dispvm:...' value, based on qrexec
Replace '@dispvm' with specific '@dispvm:...' value, based on qrexec
call source.
:param system_info: System information
:param source: Source domain name
:return: :py:attr:`override_target` with '$dispvm' substituted
:return: :py:attr:`override_target` with '@dispvm' substituted
'''
if self.override_target == '$dispvm':
if self.override_target == '@dispvm':
if system_info['domains'][source]['default_dispvm'] is None:
return None
return '$dispvm:' + system_info['domains'][source]['default_dispvm']
return '@dispvm:' + system_info['domains'][source]['default_dispvm']
else:
return self.override_target
@ -450,12 +450,12 @@ class PolicyAction(object):
assert self.action == Action.allow
assert self.target is not None
if self.target == '$adminvm':
if self.target == '@adminvm':
self.target = 'dom0'
if self.target == 'dom0':
original_target_type = \
'keyword' if is_special_value(self.original_target) else 'name'
original_target = self.original_target.lstrip('$')
original_target = self.original_target.lstrip('@')
cmd = \
'QUBESRPC {service} {source} {original_target_type} ' \
'{original_target}'.format(
@ -468,7 +468,7 @@ class PolicyAction(object):
user=(self.rule.override_user or 'DEFAULT'),
service=self.service,
source=self.source)
if self.target.startswith('$dispvm:'):
if self.target.startswith('@dispvm:'):
target = self.spawn_dispvm()
dispvm = True
else:
@ -569,13 +569,15 @@ class Policy(object):
for lineno, line in zip(itertools.count(start=1),
policy_file.readlines()):
line = line.strip()
# compatibility with old keywords notation
line = line.replace('$', '@')
if not line:
# skip empty lines
continue
if line[0] == '#':
# skip comments
continue
if line.startswith('$include:'):
if line.startswith('@include:'):
include_path = line.split(':', 1)[1]
# os.path.join will leave include_path unchanged if it's
# already absolute
@ -597,7 +599,7 @@ class Policy(object):
''' Collect targets the user can choose from in 'ask' action
Word 'targets' is used intentionally instead of 'domains', because it
can also contains $dispvm like keywords.
can also contains @dispvm like keywords.
'''
targets = set()
@ -617,17 +619,17 @@ class Policy(object):
targets.update(rule.expand_target(system_info))
# expand default DispVM
if '$dispvm' in targets:
targets.remove('$dispvm')
if '@dispvm' in targets:
targets.remove('@dispvm')
if system_info['domains'][source]['default_dispvm'] is not None:
dispvm = '$dispvm:' + \
dispvm = '@dispvm:' + \
system_info['domains'][source]['default_dispvm']
if verify_target_value(system_info, dispvm):
targets.add(dispvm)
# expand other keywords
if '$adminvm' in targets:
targets.remove('$adminvm')
if '@adminvm' in targets:
targets.remove('@adminvm')
targets.add('dom0')
return targets
@ -668,18 +670,18 @@ class Policy(object):
return PolicyAction(self.service, source, rule.default_target,
rule, target, targets)
elif rule.action == Action.allow:
if actual_target == '$default':
if actual_target == '@default':
raise AccessDenied(
'policy define \'allow\' action at {}:{} but no target is '
'specified by caller or policy'.format(
rule.filename, rule.lineno))
if actual_target == '$dispvm':
if actual_target == '@dispvm':
if system_info['domains'][source]['default_dispvm'] is None:
raise AccessDenied(
'policy define \'allow\' action to $dispvm at {}:{} '
'policy define \'allow\' action to @dispvm at {}:{} '
'but no DispVM base is set for this VM'.format(
rule.filename, rule.lineno))
actual_target = '$dispvm:' + \
actual_target = '@dispvm:' + \
system_info['domains'][source]['default_dispvm']
return PolicyAction(self.service, source,

View File

@ -58,7 +58,7 @@ def create_default_policy(service_name):
policy.write("\n")
policy.write("## Please use a single # to start your custom comments\n")
policy.write("\n")
policy.write("$anyvm $anyvm ask\n")
policy.write("@anyvm @anyvm ask\n")
def main(args=None):
@ -118,7 +118,7 @@ def main(args=None):
if not (system_info['domains'][dispvm_base]
['template_for_dispvms']):
continue
dispvm_api_name = '$dispvm:' + dispvm_base
dispvm_api_name = '@dispvm:' + dispvm_base
icons[dispvm_api_name] = \
system_info['domains'][dispvm_base]['icon']
icons[dispvm_api_name] = \

View File

@ -85,8 +85,8 @@ def main(args=None):
sources = args.source
targets = list(system_info['domains'].keys())
targets.append('$dispvm')
targets.extend('$dispvm:' + dom for dom in system_info['domains']
targets.append('@dispvm')
targets.extend('@dispvm:' + dom for dom in system_info['domains']
if system_info['domains'][dom]['template_for_dispvms'])
connections = set()

View File

@ -50,8 +50,8 @@ class VMListModeler:
def _create_entries(self):
for name, vm in self._domains_info.items():
if name.startswith('$dispvm:'):
vm_name = name[len('$dispvm:'):]
if name.startswith('@dispvm:'):
vm_name = name[len('@dispvm:'):]
dispvm = True
else:
vm_name = name

View File

@ -95,72 +95,72 @@ class TC_00_PolicyRule(qubes.tests.QubesTestCase):
self.assertTrue(
qubespolicy.verify_target_value(system_info, 'default-dvm'))
self.assertTrue(
qubespolicy.verify_target_value(system_info, '$dispvm'))
qubespolicy.verify_target_value(system_info, '@dispvm'))
self.assertTrue(
qubespolicy.verify_target_value(system_info, '$dispvm:default-dvm'))
qubespolicy.verify_target_value(system_info, '@dispvm:default-dvm'))
self.assertTrue(
qubespolicy.verify_target_value(system_info, 'test-template'))
self.assertTrue(
qubespolicy.verify_target_value(system_info, 'test-standalone'))
self.assertTrue(
qubespolicy.verify_target_value(system_info, '$adminvm'))
qubespolicy.verify_target_value(system_info, '@adminvm'))
self.assertFalse(
qubespolicy.verify_target_value(system_info, 'no-such-vm'))
self.assertFalse(
qubespolicy.verify_target_value(system_info,
'$dispvm:test-invalid-dvm'))
'@dispvm:test-invalid-dvm'))
self.assertFalse(
qubespolicy.verify_target_value(system_info, '$dispvm:test-vm1'))
qubespolicy.verify_target_value(system_info, '@dispvm:test-vm1'))
self.assertFalse(
qubespolicy.verify_target_value(system_info, ''))
self.assertFalse(
qubespolicy.verify_target_value(system_info, '$default'))
qubespolicy.verify_target_value(system_info, '@default'))
self.assertFalse(
qubespolicy.verify_target_value(system_info, '$anyvm'))
qubespolicy.verify_target_value(system_info, '@anyvm'))
self.assertFalse(
qubespolicy.verify_target_value(system_info, '$tag:tag1'))
qubespolicy.verify_target_value(system_info, '@tag:tag1'))
self.assertFalse(
qubespolicy.verify_target_value(system_info, '$dispvm:$tag:tag1'))
qubespolicy.verify_target_value(system_info, '@dispvm:@tag:tag1'))
self.assertFalse(
qubespolicy.verify_target_value(system_info, '$invalid'))
qubespolicy.verify_target_value(system_info, '@invalid'))
def test_010_verify_special_value(self):
self.assertTrue(qubespolicy.verify_special_value('$tag:tag',
self.assertTrue(qubespolicy.verify_special_value('@tag:tag',
for_target=False))
self.assertTrue(qubespolicy.verify_special_value('$tag:other-tag',
self.assertTrue(qubespolicy.verify_special_value('@tag:other-tag',
for_target=False))
self.assertTrue(qubespolicy.verify_special_value('$type:AppVM',
self.assertTrue(qubespolicy.verify_special_value('@type:AppVM',
for_target=False))
self.assertTrue(qubespolicy.verify_special_value('$adminvm',
self.assertTrue(qubespolicy.verify_special_value('@adminvm',
for_target=False))
self.assertTrue(qubespolicy.verify_special_value('$dispvm:some-vm',
self.assertTrue(qubespolicy.verify_special_value('@dispvm:some-vm',
for_target=True))
self.assertTrue(qubespolicy.verify_special_value('$dispvm:$tag:tag1',
self.assertTrue(qubespolicy.verify_special_value('@dispvm:@tag:tag1',
for_target=True))
self.assertFalse(qubespolicy.verify_special_value('$default',
self.assertFalse(qubespolicy.verify_special_value('@default',
for_target=False))
self.assertFalse(qubespolicy.verify_special_value('$dispvm',
self.assertFalse(qubespolicy.verify_special_value('@dispvm',
for_target=False))
self.assertFalse(qubespolicy.verify_special_value('$dispvm:some-vm',
self.assertFalse(qubespolicy.verify_special_value('@dispvm:some-vm',
for_target=False))
self.assertFalse(qubespolicy.verify_special_value('$dispvm:$tag:tag1',
self.assertFalse(qubespolicy.verify_special_value('@dispvm:@tag:tag1',
for_target=False))
self.assertFalse(qubespolicy.verify_special_value('$invalid',
self.assertFalse(qubespolicy.verify_special_value('@invalid',
for_target=False))
self.assertFalse(qubespolicy.verify_special_value('vm-name',
for_target=False))
self.assertFalse(qubespolicy.verify_special_value('$tag:',
self.assertFalse(qubespolicy.verify_special_value('@tag:',
for_target=False))
self.assertFalse(qubespolicy.verify_special_value('$type:',
self.assertFalse(qubespolicy.verify_special_value('@type:',
for_target=False))
def test_020_line_simple(self):
line = qubespolicy.PolicyRule('$anyvm $anyvm ask', 'filename', 12)
line = qubespolicy.PolicyRule('@anyvm @anyvm ask', 'filename', 12)
self.assertEqual(line.filename, 'filename')
self.assertEqual(line.lineno, 12)
self.assertEqual(line.action, qubespolicy.Action.ask)
self.assertEqual(line.source, '$anyvm')
self.assertEqual(line.target, '$anyvm')
self.assertEqual(line.source, '@anyvm')
self.assertEqual(line.target, '@anyvm')
self.assertEqual(line.full_action, 'ask')
self.assertIsNone(line.override_target)
self.assertIsNone(line.override_user)
@ -169,13 +169,13 @@ class TC_00_PolicyRule(qubes.tests.QubesTestCase):
def test_021_line_simple(self):
# also check spaces in action field
line = qubespolicy.PolicyRule(
'$tag:tag1 $type:AppVM ask, target=test-vm2, user=user',
'@tag:tag1 @type:AppVM ask, target=test-vm2, user=user',
'filename', 12)
self.assertEqual(line.filename, 'filename')
self.assertEqual(line.lineno, 12)
self.assertEqual(line.action, qubespolicy.Action.ask)
self.assertEqual(line.source, '$tag:tag1')
self.assertEqual(line.target, '$type:AppVM')
self.assertEqual(line.source, '@tag:tag1')
self.assertEqual(line.target, '@type:AppVM')
self.assertEqual(line.full_action, 'ask, target=test-vm2, user=user')
self.assertEqual(line.override_target, 'test-vm2')
self.assertEqual(line.override_user, 'user')
@ -183,27 +183,27 @@ class TC_00_PolicyRule(qubes.tests.QubesTestCase):
def test_022_line_simple(self):
line = qubespolicy.PolicyRule(
'$anyvm $default allow,target=$dispvm:test-vm2',
'@anyvm @default allow,target=@dispvm:test-vm2',
'filename', 12)
self.assertEqual(line.filename, 'filename')
self.assertEqual(line.lineno, 12)
self.assertEqual(line.action, qubespolicy.Action.allow)
self.assertEqual(line.source, '$anyvm')
self.assertEqual(line.target, '$default')
self.assertEqual(line.full_action, 'allow,target=$dispvm:test-vm2')
self.assertEqual(line.override_target, '$dispvm:test-vm2')
self.assertEqual(line.source, '@anyvm')
self.assertEqual(line.target, '@default')
self.assertEqual(line.full_action, 'allow,target=@dispvm:test-vm2')
self.assertEqual(line.override_target, '@dispvm:test-vm2')
self.assertIsNone(line.override_user)
self.assertIsNone(line.default_target)
def test_023_line_simple(self):
line = qubespolicy.PolicyRule(
'$anyvm $default ask,default_target=test-vm1',
'@anyvm @default ask,default_target=test-vm1',
'filename', 12)
self.assertEqual(line.filename, 'filename')
self.assertEqual(line.lineno, 12)
self.assertEqual(line.action, qubespolicy.Action.ask)
self.assertEqual(line.source, '$anyvm')
self.assertEqual(line.target, '$default')
self.assertEqual(line.source, '@anyvm')
self.assertEqual(line.target, '@default')
self.assertEqual(line.full_action, 'ask,default_target=test-vm1')
self.assertIsNone(line.override_target)
self.assertIsNone(line.override_user)
@ -211,39 +211,39 @@ class TC_00_PolicyRule(qubes.tests.QubesTestCase):
def test_024_line_simple(self):
line = qubespolicy.PolicyRule(
'$anyvm $adminvm ask,default_target=$adminvm',
'@anyvm @adminvm ask,default_target=@adminvm',
'filename', 12)
self.assertEqual(line.filename, 'filename')
self.assertEqual(line.lineno, 12)
self.assertEqual(line.action, qubespolicy.Action.ask)
self.assertEqual(line.source, '$anyvm')
self.assertEqual(line.target, '$adminvm')
self.assertEqual(line.full_action, 'ask,default_target=$adminvm')
self.assertEqual(line.source, '@anyvm')
self.assertEqual(line.target, '@adminvm')
self.assertEqual(line.full_action, 'ask,default_target=@adminvm')
self.assertIsNone(line.override_target)
self.assertIsNone(line.override_user)
self.assertEqual(line.default_target, '$adminvm')
self.assertEqual(line.default_target, '@adminvm')
def test_030_line_invalid(self):
invalid_lines = [
'$dispvm $default allow', # $dispvm can't be a source
'$default $default allow', # $default can't be a source
'$anyvm $default allow,target=$dispvm:$tag:tag1', # $dispvm:$tag
'@dispvm @default allow', # @dispvm can't be a source
'@default @default allow', # @default can't be a source
'@anyvm @default allow,target=@dispvm:@tag:tag1', # @dispvm:@tag
# as override target
'$anyvm $default allow,target=$tag:tag1', # $tag as override target
'$anyvm $default deny,target=test-vm1', # target= used with deny
'$anyvm $anyvm deny,default_target=test-vm1', # default_target=
'@anyvm @default allow,target=@tag:tag1', # @tag as override target
'@anyvm @default deny,target=test-vm1', # target= used with deny
'@anyvm @anyvm deny,default_target=test-vm1', # default_target=
# with deny
'$anyvm $anyvm deny,user=user', # user= with deny
'$anyvm $anyvm invalid', # invalid action
'$anyvm $anyvm allow,invalid=xx', # invalid option
'$anyvm $anyvm', # missing action
'$anyvm $anyvm allow,default_target=test-vm1', # default_target=
'@anyvm @anyvm deny,user=user', # user= with deny
'@anyvm @anyvm invalid', # invalid action
'@anyvm @anyvm allow,invalid=xx', # invalid option
'@anyvm @anyvm', # missing action
'@anyvm @anyvm allow,default_target=test-vm1', # default_target=
# with allow
'$invalid $anyvm allow', # invalid source
'$anyvm $invalid deny', # invalid target
'@invalid @anyvm allow', # invalid source
'@anyvm @invalid deny', # invalid target
'', # empty line
'$anyvm $anyvm allow extra', # trailing words
'$anyvm $default allow', # $default allow without target=
'@anyvm @anyvm allow extra', # trailing words
'@anyvm @default allow', # @default allow without target=
]
for line in invalid_lines:
with self.subTest(line):
@ -252,118 +252,118 @@ class TC_00_PolicyRule(qubes.tests.QubesTestCase):
def test_040_match_single(self):
is_match_single = qubespolicy.PolicyRule.is_match_single
self.assertTrue(is_match_single(system_info, '$anyvm', 'test-vm1'))
self.assertTrue(is_match_single(system_info, '$anyvm', '$default'))
self.assertTrue(is_match_single(system_info, '$anyvm', ''))
self.assertTrue(is_match_single(system_info, '$default', ''))
self.assertTrue(is_match_single(system_info, '$default', '$default'))
self.assertTrue(is_match_single(system_info, '$tag:tag1', 'test-vm1'))
self.assertTrue(is_match_single(system_info, '$type:AppVM', 'test-vm1'))
self.assertTrue(is_match_single(system_info, '@anyvm', 'test-vm1'))
self.assertTrue(is_match_single(system_info, '@anyvm', '@default'))
self.assertTrue(is_match_single(system_info, '@anyvm', ''))
self.assertTrue(is_match_single(system_info, '@default', ''))
self.assertTrue(is_match_single(system_info, '@default', '@default'))
self.assertTrue(is_match_single(system_info, '@tag:tag1', 'test-vm1'))
self.assertTrue(is_match_single(system_info, '@type:AppVM', 'test-vm1'))
self.assertTrue(is_match_single(system_info,
'$type:TemplateVM', 'test-template'))
self.assertTrue(is_match_single(system_info, '$anyvm', '$dispvm'))
'@type:TemplateVM', 'test-template'))
self.assertTrue(is_match_single(system_info, '@anyvm', '@dispvm'))
self.assertTrue(is_match_single(system_info,
'$anyvm', '$dispvm:default-dvm'))
self.assertTrue(is_match_single(system_info, '$dispvm', '$dispvm'))
'@anyvm', '@dispvm:default-dvm'))
self.assertTrue(is_match_single(system_info, '@dispvm', '@dispvm'))
self.assertTrue(is_match_single(system_info,
'$dispvm:$tag:tag3', '$dispvm:test-vm3'))
self.assertTrue(is_match_single(system_info, '$adminvm', '$adminvm'))
self.assertTrue(is_match_single(system_info, '$adminvm', 'dom0'))
self.assertTrue(is_match_single(system_info, 'dom0', '$adminvm'))
'@dispvm:@tag:tag3', '@dispvm:test-vm3'))
self.assertTrue(is_match_single(system_info, '@adminvm', '@adminvm'))
self.assertTrue(is_match_single(system_info, '@adminvm', 'dom0'))
self.assertTrue(is_match_single(system_info, 'dom0', '@adminvm'))
self.assertTrue(is_match_single(system_info, 'dom0', 'dom0'))
self.assertTrue(is_match_single(system_info,
'$dispvm:default-dvm', '$dispvm:default-dvm'))
self.assertTrue(is_match_single(system_info, '$anyvm', '$dispvm'))
self.assertTrue(is_match_single(system_info, '$anyvm', 'test-vm1'))
self.assertTrue(is_match_single(system_info, '$anyvm', 'test-vm1'))
self.assertTrue(is_match_single(system_info, '$anyvm', 'test-vm1'))
'@dispvm:default-dvm', '@dispvm:default-dvm'))
self.assertTrue(is_match_single(system_info, '@anyvm', '@dispvm'))
self.assertTrue(is_match_single(system_info, '@anyvm', 'test-vm1'))
self.assertTrue(is_match_single(system_info, '@anyvm', 'test-vm1'))
self.assertTrue(is_match_single(system_info, '@anyvm', 'test-vm1'))
self.assertFalse(is_match_single(system_info, '$default', 'test-vm1'))
self.assertFalse(is_match_single(system_info, '$tag:tag1', 'test-vm3'))
self.assertFalse(is_match_single(system_info, '$anyvm', 'no-such-vm'))
self.assertFalse(is_match_single(system_info, '@default', 'test-vm1'))
self.assertFalse(is_match_single(system_info, '@tag:tag1', 'test-vm3'))
self.assertFalse(is_match_single(system_info, '@anyvm', 'no-such-vm'))
# test-vm1.template_for_dispvms=False
self.assertFalse(is_match_single(system_info,
'$anyvm', '$dispvm:test-vm1'))
'@anyvm', '@dispvm:test-vm1'))
# test-vm1.template_for_dispvms=False
self.assertFalse(is_match_single(system_info,
'$dispvm:test-vm1', '$dispvm:test-vm1'))
'@dispvm:test-vm1', '@dispvm:test-vm1'))
self.assertFalse(is_match_single(system_info,
'$dispvm:$tag:tag1', '$dispvm:test-vm1'))
'@dispvm:@tag:tag1', '@dispvm:test-vm1'))
# test-vm3 has not tag1
self.assertFalse(is_match_single(system_info,
'$dispvm:$tag:tag1', '$dispvm:test-vm3'))
'@dispvm:@tag:tag1', '@dispvm:test-vm3'))
# default-dvm has no tag3
self.assertFalse(is_match_single(system_info,
'$dispvm:$tag:tag3', '$dispvm:default-dvm'))
self.assertFalse(is_match_single(system_info, '$anyvm', 'dom0'))
self.assertFalse(is_match_single(system_info, '$anyvm', '$adminvm'))
'@dispvm:@tag:tag3', '@dispvm:default-dvm'))
self.assertFalse(is_match_single(system_info, '@anyvm', 'dom0'))
self.assertFalse(is_match_single(system_info, '@anyvm', '@adminvm'))
self.assertFalse(is_match_single(system_info,
'$tag:dom0-tag', '$adminvm'))
'@tag:dom0-tag', '@adminvm'))
self.assertFalse(is_match_single(system_info,
'$type:AdminVM', '$adminvm'))
'@type:AdminVM', '@adminvm'))
self.assertFalse(is_match_single(system_info,
'$tag:dom0-tag', 'dom0'))
'@tag:dom0-tag', 'dom0'))
self.assertFalse(is_match_single(system_info,
'$type:AdminVM', 'dom0'))
self.assertFalse(is_match_single(system_info, '$tag:tag1', 'dom0'))
self.assertFalse(is_match_single(system_info, '$anyvm', '$tag:tag1'))
self.assertFalse(is_match_single(system_info, '$anyvm', '$type:AppVM'))
self.assertFalse(is_match_single(system_info, '$anyvm', '$invalid'))
self.assertFalse(is_match_single(system_info, '$invalid', '$invalid'))
self.assertFalse(is_match_single(system_info, '$anyvm', 'no-such-vm'))
'@type:AdminVM', 'dom0'))
self.assertFalse(is_match_single(system_info, '@tag:tag1', 'dom0'))
self.assertFalse(is_match_single(system_info, '@anyvm', '@tag:tag1'))
self.assertFalse(is_match_single(system_info, '@anyvm', '@type:AppVM'))
self.assertFalse(is_match_single(system_info, '@anyvm', '@invalid'))
self.assertFalse(is_match_single(system_info, '@invalid', '@invalid'))
self.assertFalse(is_match_single(system_info, '@anyvm', 'no-such-vm'))
self.assertFalse(is_match_single(system_info,
'no-such-vm', 'no-such-vm'))
self.assertFalse(is_match_single(system_info, '$dispvm', 'test-vm1'))
self.assertFalse(is_match_single(system_info, '$dispvm', 'default-dvm'))
self.assertFalse(is_match_single(system_info, '@dispvm', 'test-vm1'))
self.assertFalse(is_match_single(system_info, '@dispvm', 'default-dvm'))
self.assertFalse(is_match_single(system_info,
'$dispvm:default-dvm', 'default-dvm'))
self.assertFalse(is_match_single(system_info, '$anyvm', 'test-vm1\n'))
self.assertFalse(is_match_single(system_info, '$anyvm', 'test-vm1 '))
'@dispvm:default-dvm', 'default-dvm'))
self.assertFalse(is_match_single(system_info, '@anyvm', 'test-vm1\n'))
self.assertFalse(is_match_single(system_info, '@anyvm', 'test-vm1 '))
def test_050_match(self):
line = qubespolicy.PolicyRule('$anyvm $anyvm allow')
line = qubespolicy.PolicyRule('@anyvm @anyvm allow')
self.assertTrue(line.is_match(system_info, 'test-vm1', 'test-vm2'))
line = qubespolicy.PolicyRule('$anyvm $anyvm allow')
line = qubespolicy.PolicyRule('@anyvm @anyvm allow')
self.assertFalse(line.is_match(system_info, 'no-such-vm', 'test-vm2'))
line = qubespolicy.PolicyRule('$anyvm $anyvm allow')
line = qubespolicy.PolicyRule('@anyvm @anyvm allow')
self.assertFalse(line.is_match(system_info, 'test-vm1', 'no-such-vm'))
line = qubespolicy.PolicyRule('$anyvm $dispvm allow')
self.assertTrue(line.is_match(system_info, 'test-vm1', '$dispvm'))
line = qubespolicy.PolicyRule('$anyvm $dispvm allow')
line = qubespolicy.PolicyRule('@anyvm @dispvm allow')
self.assertTrue(line.is_match(system_info, 'test-vm1', '@dispvm'))
line = qubespolicy.PolicyRule('@anyvm @dispvm allow')
self.assertFalse(line.is_match(system_info,
'test-vm1', '$dispvm:default-dvm'))
line = qubespolicy.PolicyRule('$anyvm $dispvm:default-dvm allow')
self.assertTrue(line.is_match(system_info, 'test-vm1', '$dispvm'))
line = qubespolicy.PolicyRule('$anyvm $dispvm:default-dvm allow')
'test-vm1', '@dispvm:default-dvm'))
line = qubespolicy.PolicyRule('@anyvm @dispvm:default-dvm allow')
self.assertTrue(line.is_match(system_info, 'test-vm1', '@dispvm'))
line = qubespolicy.PolicyRule('@anyvm @dispvm:default-dvm allow')
self.assertTrue(line.is_match(system_info,
'test-vm1', '$dispvm:default-dvm'))
line = qubespolicy.PolicyRule('$anyvm $dispvm:$tag:tag3 allow')
'test-vm1', '@dispvm:default-dvm'))
line = qubespolicy.PolicyRule('@anyvm @dispvm:@tag:tag3 allow')
self.assertTrue(line.is_match(system_info,
'test-vm1', '$dispvm:test-vm3'))
'test-vm1', '@dispvm:test-vm3'))
def test_060_expand_target(self):
lines = {
'$anyvm $anyvm allow': ['test-vm1', 'test-vm2', 'test-vm3',
'$dispvm:test-vm3',
'default-dvm', '$dispvm:default-dvm', 'test-invalid-dvm',
'test-no-dvm', 'test-template', 'test-standalone', '$dispvm'],
'$anyvm $dispvm allow': ['$dispvm'],
'$anyvm $dispvm:default-dvm allow': ['$dispvm:default-dvm'],
'@anyvm @anyvm allow': ['test-vm1', 'test-vm2', 'test-vm3',
'@dispvm:test-vm3',
'default-dvm', '@dispvm:default-dvm', 'test-invalid-dvm',
'test-no-dvm', 'test-template', 'test-standalone', '@dispvm'],
'@anyvm @dispvm allow': ['@dispvm'],
'@anyvm @dispvm:default-dvm allow': ['@dispvm:default-dvm'],
# no DispVM from test-vm1 allowed
'$anyvm $dispvm:test-vm1 allow': [],
'$anyvm $dispvm:test-vm3 allow': ['$dispvm:test-vm3'],
'$anyvm $dispvm:$tag:tag1 allow': [],
'$anyvm $dispvm:$tag:tag3 allow': ['$dispvm:test-vm3'],
'$anyvm test-vm1 allow': ['test-vm1'],
'$anyvm $type:AppVM allow': ['test-vm1', 'test-vm2', 'test-vm3',
'@anyvm @dispvm:test-vm1 allow': [],
'@anyvm @dispvm:test-vm3 allow': ['@dispvm:test-vm3'],
'@anyvm @dispvm:@tag:tag1 allow': [],
'@anyvm @dispvm:@tag:tag3 allow': ['@dispvm:test-vm3'],
'@anyvm test-vm1 allow': ['test-vm1'],
'@anyvm @type:AppVM allow': ['test-vm1', 'test-vm2', 'test-vm3',
'default-dvm', 'test-invalid-dvm', 'test-no-dvm'],
'$anyvm $type:TemplateVM allow': ['test-template'],
'$anyvm $tag:tag1 allow': ['test-vm1', 'test-invalid-dvm',
'@anyvm @type:TemplateVM allow': ['test-template'],
'@anyvm @tag:tag1 allow': ['test-vm1', 'test-invalid-dvm',
'test-template', 'test-standalone', 'test-no-dvm'],
'$anyvm $tag:tag2 allow': ['test-vm1', 'test-vm2',
'@anyvm @tag:tag2 allow': ['test-vm1', 'test-vm2',
'test-invalid-dvm', 'test-template', 'test-standalone',
'test-no-dvm'],
'$anyvm $tag:no-such-tag allow': [],
'@anyvm @tag:no-such-tag allow': [],
}
for line in lines:
with self.subTest(line):
@ -373,56 +373,56 @@ class TC_00_PolicyRule(qubes.tests.QubesTestCase):
def test_070_expand_override_target(self):
line = qubespolicy.PolicyRule(
'$anyvm $anyvm allow,target=test-vm2')
'@anyvm @anyvm allow,target=test-vm2')
self.assertEqual(
line.expand_override_target(system_info, 'test-vm1'),
'test-vm2')
def test_071_expand_override_target_dispvm(self):
line = qubespolicy.PolicyRule(
'$anyvm $anyvm allow,target=$dispvm')
'@anyvm @anyvm allow,target=@dispvm')
self.assertEqual(
line.expand_override_target(system_info, 'test-vm1'),
'$dispvm:default-dvm')
'@dispvm:default-dvm')
def test_072_expand_override_target_dispvm_specific(self):
line = qubespolicy.PolicyRule(
'$anyvm $anyvm allow,target=$dispvm:test-vm3')
'@anyvm @anyvm allow,target=@dispvm:test-vm3')
self.assertEqual(
line.expand_override_target(system_info, 'test-vm1'),
'$dispvm:test-vm3')
'@dispvm:test-vm3')
def test_073_expand_override_target_dispvm_none(self):
line = qubespolicy.PolicyRule(
'$anyvm $anyvm allow,target=$dispvm')
'@anyvm @anyvm allow,target=@dispvm')
self.assertEqual(
line.expand_override_target(system_info, 'test-no-dvm'),
None)
def test_074_expand_override_target_dom0(self):
line = qubespolicy.PolicyRule(
'$anyvm $anyvm allow,target=dom0')
'@anyvm @anyvm allow,target=dom0')
self.assertEqual(
line.expand_override_target(system_info, 'test-no-dvm'),
'dom0')
def test_075_expand_override_target_dom0(self):
line = qubespolicy.PolicyRule(
'$anyvm $anyvm allow,target=$adminvm')
'@anyvm @anyvm allow,target=@adminvm')
self.assertEqual(
line.expand_override_target(system_info, 'test-no-dvm'),
'$adminvm')
'@adminvm')
class TC_10_PolicyAction(qubes.tests.QubesTestCase):
def test_000_init(self):
rule = qubespolicy.PolicyRule('$anyvm $anyvm deny')
rule = qubespolicy.PolicyRule('@anyvm @anyvm deny')
with self.assertRaises(qubespolicy.AccessDenied):
qubespolicy.PolicyAction('test.service', 'test-vm1', 'test-vm2',
rule, 'test-vm2')
def test_001_init(self):
rule = qubespolicy.PolicyRule('$anyvm $anyvm ask')
rule = qubespolicy.PolicyRule('@anyvm @anyvm ask')
action = qubespolicy.PolicyAction('test.service', 'test-vm1',
None, rule, 'test-vm2', ['test-vm2', 'test-vm3'])
self.assertEqual(action.service, 'test.service')
@ -434,8 +434,8 @@ class TC_10_PolicyAction(qubes.tests.QubesTestCase):
self.assertEqual(action.action, qubespolicy.Action.ask)
def test_002_init_invalid(self):
rule_ask = qubespolicy.PolicyRule('$anyvm $anyvm ask')
rule_allow = qubespolicy.PolicyRule('$anyvm $anyvm allow')
rule_ask = qubespolicy.PolicyRule('@anyvm @anyvm ask')
rule_allow = qubespolicy.PolicyRule('@anyvm @anyvm allow')
with self.assertRaises(AssertionError):
qubespolicy.PolicyAction('test.service', 'test-vm1',
None, rule_allow, 'test-vm2', None)
@ -448,7 +448,7 @@ class TC_10_PolicyAction(qubes.tests.QubesTestCase):
None, rule_ask, 'test-vm2', None)
def test_003_init_default_target(self):
rule_ask = qubespolicy.PolicyRule('$anyvm $anyvm ask')
rule_ask = qubespolicy.PolicyRule('@anyvm @anyvm ask')
action = qubespolicy.PolicyAction('test.service', 'test-vm1',
'test-vm1', rule_ask, 'test-vm2', ['test-vm2'])
@ -459,7 +459,7 @@ class TC_10_PolicyAction(qubes.tests.QubesTestCase):
self.assertEqual(action.target, 'test-vm2')
def test_010_handle_user_response(self):
rule = qubespolicy.PolicyRule('$anyvm $anyvm ask')
rule = qubespolicy.PolicyRule('@anyvm @anyvm ask')
action = qubespolicy.PolicyAction('test.service', 'test-vm1',
None, rule, 'test-vm2', ['test-vm2', 'test-vm3'])
action.handle_user_response(True, 'test-vm2')
@ -467,14 +467,14 @@ class TC_10_PolicyAction(qubes.tests.QubesTestCase):
self.assertEqual(action.target, 'test-vm2')
def test_011_handle_user_response(self):
rule = qubespolicy.PolicyRule('$anyvm $anyvm ask')
rule = qubespolicy.PolicyRule('@anyvm @anyvm ask')
action = qubespolicy.PolicyAction('test.service', 'test-vm1',
None, rule, 'test-vm2', ['test-vm2', 'test-vm3'])
with self.assertRaises(AssertionError):
action.handle_user_response(True, 'test-no-dvm')
def test_012_handle_user_response(self):
rule = qubespolicy.PolicyRule('$anyvm $anyvm ask')
rule = qubespolicy.PolicyRule('@anyvm @anyvm ask')
action = qubespolicy.PolicyAction('test.service', 'test-vm1',
None, rule, 'test-vm2', ['test-vm2', 'test-vm3'])
with self.assertRaises(qubespolicy.AccessDenied):
@ -483,7 +483,7 @@ class TC_10_PolicyAction(qubes.tests.QubesTestCase):
def test_013_handle_user_response_with_default_target(self):
rule = qubespolicy.PolicyRule(
'$anyvm $anyvm ask,default_target=test-vm2')
'@anyvm @anyvm ask,default_target=test-vm2')
action = qubespolicy.PolicyAction('test.service', 'test-vm1',
None, rule, 'test-vm2', ['test-vm2', 'test-vm3'])
action.handle_user_response(True, 'test-vm2')
@ -493,7 +493,7 @@ class TC_10_PolicyAction(qubes.tests.QubesTestCase):
@unittest.mock.patch('qubespolicy.qubesd_call')
@unittest.mock.patch('subprocess.call')
def test_020_execute(self, mock_subprocess, mock_qubesd_call):
rule = qubespolicy.PolicyRule('$anyvm $anyvm allow')
rule = qubespolicy.PolicyRule('@anyvm @anyvm allow')
action = qubespolicy.PolicyAction('test.service', 'test-vm1',
'test-vm2', rule, 'test-vm2')
action.execute('some-ident')
@ -506,7 +506,7 @@ class TC_10_PolicyAction(qubes.tests.QubesTestCase):
@unittest.mock.patch('qubespolicy.qubesd_call')
@unittest.mock.patch('subprocess.call')
def test_021_execute_dom0(self, mock_subprocess, mock_qubesd_call):
rule = qubespolicy.PolicyRule('$anyvm dom0 allow')
rule = qubespolicy.PolicyRule('@anyvm dom0 allow')
action = qubespolicy.PolicyAction('test.service', 'test-vm1',
'dom0', rule, 'dom0')
action.execute('some-ident')
@ -519,9 +519,9 @@ class TC_10_PolicyAction(qubes.tests.QubesTestCase):
@unittest.mock.patch('qubespolicy.qubesd_call')
@unittest.mock.patch('subprocess.call')
def test_021_execute_dom0_keyword(self, mock_subprocess, mock_qubesd_call):
rule = qubespolicy.PolicyRule('$anyvm dom0 allow')
rule = qubespolicy.PolicyRule('@anyvm dom0 allow')
action = qubespolicy.PolicyAction('test.service', 'test-vm1',
'dom0', rule, '$adminvm')
'dom0', rule, '@adminvm')
action.execute('some-ident')
self.assertEqual(mock_qubesd_call.mock_calls, [])
self.assertEqual(mock_subprocess.mock_calls,
@ -532,9 +532,9 @@ class TC_10_PolicyAction(qubes.tests.QubesTestCase):
@unittest.mock.patch('qubespolicy.qubesd_call')
@unittest.mock.patch('subprocess.call')
def test_022_execute_dispvm(self, mock_subprocess, mock_qubesd_call):
rule = qubespolicy.PolicyRule('$anyvm $dispvm:default-dvm allow')
rule = qubespolicy.PolicyRule('@anyvm @dispvm:default-dvm allow')
action = qubespolicy.PolicyAction('test.service', 'test-vm1',
'$dispvm:default-dvm', rule, '$dispvm:default-dvm')
'@dispvm:default-dvm', rule, '@dispvm:default-dvm')
mock_qubesd_call.side_effect = (lambda target, call:
b'dispvm-name' if call == 'admin.vm.CreateDisposable' else
unittest.mock.DEFAULT)
@ -552,7 +552,7 @@ class TC_10_PolicyAction(qubes.tests.QubesTestCase):
@unittest.mock.patch('subprocess.call')
def test_023_execute_already_running(self, mock_subprocess,
mock_qubesd_call):
rule = qubespolicy.PolicyRule('$anyvm $anyvm allow')
rule = qubespolicy.PolicyRule('@anyvm @anyvm allow')
action = qubespolicy.PolicyAction('test.service', 'test-vm1',
'test-vm2', rule, 'test-vm2')
mock_qubesd_call.side_effect = \
@ -568,7 +568,7 @@ class TC_10_PolicyAction(qubes.tests.QubesTestCase):
@unittest.mock.patch('subprocess.call')
def test_024_execute_startup_error(self, mock_subprocess,
mock_qubesd_call):
rule = qubespolicy.PolicyRule('$anyvm $anyvm allow')
rule = qubespolicy.PolicyRule('@anyvm @anyvm allow')
action = qubespolicy.PolicyAction('test.service', 'test-vm1',
'test-vm2', rule, 'test-vm2')
mock_qubesd_call.side_effect = \
@ -597,7 +597,7 @@ class TC_20_Policy(qubes.tests.QubesTestCase):
f.write('# comment\n')
f.write('test-vm2 test-vm3 ask\n')
f.write(' # comment \n')
f.write('$anyvm $anyvm ask\n')
f.write('@anyvm @anyvm ask\n')
policy = qubespolicy.Policy('test.service', tmp_policy_dir)
self.assertEqual(policy.service, 'test.service')
self.assertEqual(len(policy.policy_rules), 3)
@ -613,10 +613,10 @@ class TC_20_Policy(qubes.tests.QubesTestCase):
def test_002_include(self):
with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f:
f.write('test-vm1 test-vm2 allow\n')
f.write('$include:test.service2\n')
f.write('$anyvm $anyvm deny\n')
f.write('@include:test.service2\n')
f.write('@anyvm @anyvm deny\n')
with open(os.path.join(tmp_policy_dir, 'test.service2'), 'w') as f:
f.write('test-vm3 $default allow,target=test-vm2\n')
f.write('test-vm3 @default allow,target=test-vm2\n')
policy = qubespolicy.Policy('test.service', tmp_policy_dir)
self.assertEqual(policy.service, 'test.service')
self.assertEqual(len(policy.policy_rules), 3)
@ -628,31 +628,46 @@ class TC_20_Policy(qubes.tests.QubesTestCase):
tmp_policy_dir + '/test.service')
self.assertEqual(policy.policy_rules[0].lineno, 1)
self.assertEqual(policy.policy_rules[1].source, 'test-vm3')
self.assertEqual(policy.policy_rules[1].target, '$default')
self.assertEqual(policy.policy_rules[1].target, '@default')
self.assertEqual(policy.policy_rules[1].action,
qubespolicy.Action.allow)
self.assertEqual(policy.policy_rules[1].filename,
tmp_policy_dir + '/test.service2')
self.assertEqual(policy.policy_rules[1].lineno, 1)
self.assertEqual(policy.policy_rules[2].source, '$anyvm')
self.assertEqual(policy.policy_rules[2].target, '$anyvm')
self.assertEqual(policy.policy_rules[2].source, '@anyvm')
self.assertEqual(policy.policy_rules[2].target, '@anyvm')
self.assertEqual(policy.policy_rules[2].action,
qubespolicy.Action.deny)
self.assertEqual(policy.policy_rules[2].filename,
tmp_policy_dir + '/test.service')
self.assertEqual(policy.policy_rules[2].lineno, 3)
def test_003_load_convert(self):
with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f:
f.write('test-vm2 test-vm3 ask\n')
f.write(' # comment \n')
f.write('$anyvm $dispvm ask,default_target=$dispvm\n')
policy = qubespolicy.Policy('test.service', tmp_policy_dir)
self.assertEqual(policy.service, 'test.service')
self.assertEqual(len(policy.policy_rules), 2)
self.assertEqual(policy.policy_rules[1].source, '@anyvm')
self.assertEqual(policy.policy_rules[1].target, '@dispvm')
self.assertEqual(policy.policy_rules[1].action,
qubespolicy.Action.ask)
self.assertEqual(policy.policy_rules[1].default_target,
'@dispvm')
def test_010_find_rule(self):
with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f:
f.write('test-vm1 test-vm2 allow\n')
f.write('test-vm1 $anyvm ask\n')
f.write('test-vm2 $tag:tag1 deny\n')
f.write('test-vm2 $tag:tag2 allow\n')
f.write('test-vm2 $dispvm:$tag:tag3 allow\n')
f.write('test-vm2 $dispvm:$tag:tag2 allow\n')
f.write('test-vm2 $dispvm:default-dvm allow\n')
f.write('$type:AppVM $default allow,target=test-vm3\n')
f.write('$tag:tag1 $type:AppVM allow\n')
f.write('test-vm1 @anyvm ask\n')
f.write('test-vm2 @tag:tag1 deny\n')
f.write('test-vm2 @tag:tag2 allow\n')
f.write('test-vm2 @dispvm:@tag:tag3 allow\n')
f.write('test-vm2 @dispvm:@tag:tag2 allow\n')
f.write('test-vm2 @dispvm:default-dvm allow\n')
f.write('@type:AppVM @default allow,target=test-vm3\n')
f.write('@tag:tag1 @type:AppVM allow\n')
policy = qubespolicy.Policy('test.service', tmp_policy_dir)
self.assertEqual(policy.find_matching_rule(
system_info, 'test-vm1', 'test-vm2'), policy.policy_rules[0])
@ -662,47 +677,47 @@ class TC_20_Policy(qubes.tests.QubesTestCase):
system_info, 'test-vm2', 'test-vm2'), policy.policy_rules[3])
self.assertEqual(policy.find_matching_rule(
system_info, 'test-vm2', 'test-no-dvm'), policy.policy_rules[2])
# $anyvm matches $default too
# @anyvm matches @default too
self.assertEqual(policy.find_matching_rule(
system_info, 'test-vm1', ''), policy.policy_rules[1])
self.assertEqual(policy.find_matching_rule(
system_info, 'test-vm2', ''), policy.policy_rules[7])
self.assertEqual(policy.find_matching_rule(
system_info, 'test-vm2', '$default'), policy.policy_rules[7])
system_info, 'test-vm2', '@default'), policy.policy_rules[7])
self.assertEqual(policy.find_matching_rule(
system_info, 'test-no-dvm', 'test-vm3'), policy.policy_rules[8])
self.assertEqual(policy.find_matching_rule(
system_info, 'test-vm2', '$dispvm:test-vm3'),
system_info, 'test-vm2', '@dispvm:test-vm3'),
policy.policy_rules[4])
self.assertEqual(policy.find_matching_rule(
system_info, 'test-vm2', '$dispvm'),
system_info, 'test-vm2', '@dispvm'),
policy.policy_rules[6])
with self.assertRaises(qubespolicy.AccessDenied):
policy.find_matching_rule(
system_info, 'test-no-dvm', 'test-standalone')
with self.assertRaises(qubespolicy.AccessDenied):
policy.find_matching_rule(system_info, 'test-no-dvm', '$dispvm')
policy.find_matching_rule(system_info, 'test-no-dvm', '@dispvm')
with self.assertRaises(qubespolicy.AccessDenied):
policy.find_matching_rule(
system_info, 'test-standalone', '$default')
system_info, 'test-standalone', '@default')
def test_020_collect_targets_for_ask(self):
with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f:
f.write('test-vm1 test-vm2 allow\n')
f.write('test-vm1 $anyvm ask\n')
f.write('test-vm2 $tag:tag1 deny\n')
f.write('test-vm2 $tag:tag2 allow\n')
f.write('test-no-dvm $type:AppVM deny\n')
f.write('$type:AppVM $default allow,target=test-vm3\n')
f.write('$tag:tag1 $type:AppVM allow\n')
f.write('test-no-dvm $dispvm allow\n')
f.write('test-standalone $dispvm allow\n')
f.write('test-standalone $adminvm allow\n')
f.write('test-vm1 @anyvm ask\n')
f.write('test-vm2 @tag:tag1 deny\n')
f.write('test-vm2 @tag:tag2 allow\n')
f.write('test-no-dvm @type:AppVM deny\n')
f.write('@type:AppVM @default allow,target=test-vm3\n')
f.write('@tag:tag1 @type:AppVM allow\n')
f.write('test-no-dvm @dispvm allow\n')
f.write('test-standalone @dispvm allow\n')
f.write('test-standalone @adminvm allow\n')
policy = qubespolicy.Policy('test.service', tmp_policy_dir)
self.assertCountEqual(policy.collect_targets_for_ask(system_info,
'test-vm1'), ['test-vm1', 'test-vm2', 'test-vm3',
'$dispvm:test-vm3',
'default-dvm', '$dispvm:default-dvm', 'test-invalid-dvm',
'@dispvm:test-vm3',
'default-dvm', '@dispvm:default-dvm', 'test-invalid-dvm',
'test-no-dvm', 'test-template', 'test-standalone'])
self.assertCountEqual(policy.collect_targets_for_ask(system_info,
'test-vm2'), ['test-vm2', 'test-vm3'])
@ -711,7 +726,7 @@ class TC_20_Policy(qubes.tests.QubesTestCase):
self.assertCountEqual(policy.collect_targets_for_ask(system_info,
'test-standalone'), ['test-vm1', 'test-vm2', 'test-vm3',
'default-dvm', 'test-no-dvm', 'test-invalid-dvm',
'$dispvm:default-dvm', 'dom0'])
'@dispvm:default-dvm', 'dom0'])
self.assertCountEqual(policy.collect_targets_for_ask(system_info,
'test-no-dvm'), [])
@ -728,37 +743,37 @@ class TC_20_Policy(qubes.tests.QubesTestCase):
self.assertEqual(action.service, 'test.service')
self.assertIsNone(action.targets_for_ask)
with self.assertRaises(qubespolicy.AccessDenied):
policy.evaluate(system_info, 'test-vm2', '$default')
policy.evaluate(system_info, 'test-vm2', '@default')
def test_031_eval_default(self):
with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f:
f.write('test-vm1 test-vm2 allow\n')
f.write('test-vm1 $default allow,target=test-vm2\n')
f.write('$tag:tag1 test-vm2 ask\n')
f.write('$tag:tag2 $anyvm allow\n')
f.write('test-vm3 $anyvm deny\n')
f.write('test-vm1 @default allow,target=test-vm2\n')
f.write('@tag:tag1 test-vm2 ask\n')
f.write('@tag:tag2 @anyvm allow\n')
f.write('test-vm3 @anyvm deny\n')
policy = qubespolicy.Policy('test.service', tmp_policy_dir)
action = policy.evaluate(system_info, 'test-vm1', '$default')
action = policy.evaluate(system_info, 'test-vm1', '@default')
self.assertEqual(action.rule, policy.policy_rules[1])
self.assertEqual(action.action, qubespolicy.Action.allow)
self.assertEqual(action.target, 'test-vm2')
self.assertEqual(action.original_target, '$default')
self.assertEqual(action.original_target, '@default')
self.assertEqual(action.service, 'test.service')
self.assertIsNone(action.targets_for_ask)
with self.assertRaises(qubespolicy.AccessDenied):
# action allow should hit, but no target specified (either by
# caller or policy)
policy.evaluate(system_info, 'test-standalone', '$default')
policy.evaluate(system_info, 'test-standalone', '@default')
def test_032_eval_ask(self):
with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f:
f.write('test-vm1 test-vm2 allow\n')
f.write('test-vm1 $default allow,target=test-vm2\n')
f.write('$tag:tag1 test-vm2 ask\n')
f.write('$tag:tag1 test-vm3 ask,default_target=test-vm3\n')
f.write('$tag:tag2 $anyvm allow\n')
f.write('test-vm3 $anyvm deny\n')
f.write('test-vm1 @default allow,target=test-vm2\n')
f.write('@tag:tag1 test-vm2 ask\n')
f.write('@tag:tag1 test-vm3 ask,default_target=test-vm3\n')
f.write('@tag:tag2 @anyvm allow\n')
f.write('test-vm3 @anyvm deny\n')
policy = qubespolicy.Policy('test.service', tmp_policy_dir)
action = policy.evaluate(system_info, 'test-standalone', 'test-vm2')
@ -768,18 +783,18 @@ class TC_20_Policy(qubes.tests.QubesTestCase):
self.assertEqual(action.original_target, 'test-vm2')
self.assertEqual(action.service, 'test.service')
self.assertCountEqual(action.targets_for_ask,
['test-vm1', 'test-vm2', 'test-vm3', '$dispvm:test-vm3',
'default-dvm', '$dispvm:default-dvm', 'test-invalid-dvm',
['test-vm1', 'test-vm2', 'test-vm3', '@dispvm:test-vm3',
'default-dvm', '@dispvm:default-dvm', 'test-invalid-dvm',
'test-no-dvm', 'test-template', 'test-standalone'])
def test_033_eval_ask(self):
with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f:
f.write('test-vm1 test-vm2 allow\n')
f.write('test-vm1 $default allow,target=test-vm2\n')
f.write('$tag:tag1 test-vm2 ask\n')
f.write('$tag:tag1 test-vm3 ask,default_target=test-vm3\n')
f.write('$tag:tag2 $anyvm allow\n')
f.write('test-vm3 $anyvm deny\n')
f.write('test-vm1 @default allow,target=test-vm2\n')
f.write('@tag:tag1 test-vm2 ask\n')
f.write('@tag:tag1 test-vm3 ask,default_target=test-vm3\n')
f.write('@tag:tag2 @anyvm allow\n')
f.write('test-vm3 @anyvm deny\n')
policy = qubespolicy.Policy('test.service', tmp_policy_dir)
action = policy.evaluate(system_info, 'test-standalone', 'test-vm3')
@ -789,46 +804,46 @@ class TC_20_Policy(qubes.tests.QubesTestCase):
self.assertEqual(action.original_target, 'test-vm3')
self.assertEqual(action.service, 'test.service')
self.assertCountEqual(action.targets_for_ask,
['test-vm1', 'test-vm2', 'test-vm3', '$dispvm:test-vm3',
'default-dvm', '$dispvm:default-dvm', 'test-invalid-dvm',
['test-vm1', 'test-vm2', 'test-vm3', '@dispvm:test-vm3',
'default-dvm', '@dispvm:default-dvm', 'test-invalid-dvm',
'test-no-dvm', 'test-template', 'test-standalone'])
def test_034_eval_resolve_dispvm(self):
with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f:
f.write('test-vm3 $dispvm allow\n')
f.write('test-vm3 @dispvm allow\n')
policy = qubespolicy.Policy('test.service', tmp_policy_dir)
action = policy.evaluate(system_info, 'test-vm3', '$dispvm')
action = policy.evaluate(system_info, 'test-vm3', '@dispvm')
self.assertEqual(action.rule, policy.policy_rules[0])
self.assertEqual(action.action, qubespolicy.Action.allow)
self.assertEqual(action.target, '$dispvm:default-dvm')
self.assertEqual(action.original_target, '$dispvm')
self.assertEqual(action.target, '@dispvm:default-dvm')
self.assertEqual(action.original_target, '@dispvm')
self.assertEqual(action.service, 'test.service')
self.assertIsNone(action.targets_for_ask)
def test_035_eval_resolve_dispvm_fail(self):
with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f:
f.write('test-no-dvm $dispvm allow\n')
f.write('test-no-dvm @dispvm allow\n')
policy = qubespolicy.Policy('test.service', tmp_policy_dir)
with self.assertRaises(qubespolicy.AccessDenied):
policy.evaluate(system_info, 'test-no-dvm', '$dispvm')
policy.evaluate(system_info, 'test-no-dvm', '@dispvm')
def test_036_eval_invalid_override_target(self):
with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f:
f.write('test-vm3 $anyvm allow,target=no-such-vm\n')
f.write('test-vm3 @anyvm allow,target=no-such-vm\n')
policy = qubespolicy.Policy('test.service', tmp_policy_dir)
with self.assertRaises(qubespolicy.AccessDenied):
policy.evaluate(system_info, 'test-vm3', '$default')
policy.evaluate(system_info, 'test-vm3', '@default')
def test_037_eval_ask_no_targets(self):
with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f:
f.write('test-vm3 $default ask\n')
f.write('test-vm3 @default ask\n')
policy = qubespolicy.Policy('test.service', tmp_policy_dir)
with self.assertRaises(qubespolicy.AccessDenied):
policy.evaluate(system_info, 'test-vm3', '$default')
policy.evaluate(system_info, 'test-vm3', '@default')
class TC_30_Misc(qubes.tests.QubesTestCase):

View File

@ -107,7 +107,7 @@ class TC_00_qrexec_policy(qubes.tests.QubesTestCase):
'test-vm1': 'red',
'test-vm2': 'red',
'test-vm3': 'green',
'$dispvm:test-vm3': 'green',
'@dispvm:test-vm3': 'green',
}
self.assertEqual(self.dbus_mock.mock_calls, [
('', (), {}),
@ -146,7 +146,7 @@ class TC_00_qrexec_policy(qubes.tests.QubesTestCase):
'test-vm1': 'red',
'test-vm2': 'red',
'test-vm3': 'green',
'$dispvm:test-vm3': 'green',
'@dispvm:test-vm3': 'green',
}
self.assertEqual(self.dbus_mock.mock_calls, [
('', (), {}),
@ -183,7 +183,7 @@ class TC_00_qrexec_policy(qubes.tests.QubesTestCase):
'test-vm1': 'red',
'test-vm2': 'red',
'test-vm3': 'green',
'$dispvm:test-vm3': 'green',
'@dispvm:test-vm3': 'green',
}
self.assertEqual(self.dbus_mock.mock_calls, [
('', (), {}),
@ -314,7 +314,7 @@ class TC_00_qrexec_policy(qubes.tests.QubesTestCase):
"\n"
"## Please use a single # to start your custom comments\n"
"\n"
"$anyvm $anyvm ask\n")
"@anyvm @anyvm ask\n")
def test_041_create_policy_abort(self):
self.policy_mock.configure_mock(**{

View File

@ -37,11 +37,11 @@ mock_domains_info = {
'test-red3': {'icon': 'red', 'type': 'AppVM'},
'test-source': {'icon': 'green', 'type': 'AppVM'},
'test-target': {'icon': 'orange', 'type': 'AppVM'},
'$dispvm:test-disp6': {'icon': 'red', 'type': 'DispVM'},
'@dispvm:test-disp6': {'icon': 'red', 'type': 'DispVM'},
}
mock_whitelist = ["test-red1", "test-red2", "test-red3",
"test-target", "$dispvm:test-disp6"]
"test-target", "@dispvm:test-disp6"]
class MockComboEntry:
def __init__(self, text):
@ -108,7 +108,7 @@ class VMListModelerTest(VMListModeler, unittest.TestCase):
self._get_valid_qube_name(None, None, mock_whitelist))
def test_valid_qube_name_whitelist(self):
list_exc = ["$dispvm:test-disp6", "test-red2"]
list_exc = ["@dispvm:test-disp6", "test-red2"]
whitelist = [name for name in mock_whitelist if name not in list_exc]
self.apply_model(Gtk.ComboBox(), whitelist)

View File

@ -301,7 +301,7 @@ class RPCConfirmationWindowTestWhitelist(unittest.TestCase):
def test_all_allowed_domains(self):
self._assert_whitelist(
["test-red1", "test-red2", "test-red3",
"test-target", "$dispvm:test-disp6", "test-source", "dom0"],
"test-target", "@dispvm:test-disp6", "test-source", "dom0"],
["test-red1", "test-red2", "test-red3",
"test-target", "Disposable VM (test-disp6)", "test-source",
"dom0"])

View File

@ -24,7 +24,7 @@ def _sanitize_char(input_char, extra_allowed_characters):
if (ord('a') <= input_char_ord <= ord('z')) \
or (ord('A') <= input_char_ord <= ord('Z')) \
or (ord('0') <= input_char_ord <= ord('9')) \
or (input_char in ['$', '_', '-', '.']) \
or (input_char in ['@', '_', '-', '.']) \
or (input_char in extra_allowed_characters):
result = input_char
else: