diff --git a/qubes/api/admin.py b/qubes/api/admin.py index 2951ab4e..8b1f6596 100644 --- a/qubes/api/admin.py +++ b/qubes/api/admin.py @@ -1293,7 +1293,11 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI): dest_vm = profile_data['destination_vm'] dest_path = profile_data['destination_path'] include_vms = profile_data['include'] + # convert old keywords to new keywords + include_vms = [vm.replace('$', '@') for vm in include_vms] exclude_vms = profile_data.get('exclude', []) + # convert old keywords to new keywords + exclude_vms = [vm.replace('$', '@') for vm in exclude_vms] compression = profile_data.get('compression', True) except KeyError as err: raise qubes.exc.QubesException( diff --git a/qubes/utils.py b/qubes/utils.py index c6e3492a..4fefe7df 100644 --- a/qubes/utils.py +++ b/qubes/utils.py @@ -177,10 +177,10 @@ def systemd_notify(): sock.close() def match_vm_name_with_special(vm, name): - '''Check if *vm* matches given name, which may be specified as $tag:... - or $type:...''' - if name.startswith('$tag:'): - return name[len('$tag:'):] in vm.tags - elif name.startswith('$type:'): - return name[len('$type:'):] == vm.__class__.__name__ + '''Check if *vm* matches given name, which may be specified as @tag:... + or @type:...''' + if name.startswith('@tag:'): + return name[len('@tag:'):] in vm.tags + elif name.startswith('@type:'): + return name[len('@type:'):] == vm.__class__.__name__ return name == vm.name diff --git a/qubespolicy/__init__.py b/qubespolicy/__init__.py index 248b65cb..190e3d61 100755 --- a/qubespolicy/__init__.py +++ b/qubespolicy/__init__.py @@ -61,23 +61,23 @@ class Action(enum.Enum): def is_special_value(value): '''Check if given source/target specification is special (keyword) value ''' - return value.startswith('$') + return value.startswith('@') def verify_target_value(system_info, value): ''' Check if given value names valid target This function check if given value is not only syntactically correct, but also if names valid service call target (existing domain, - or valid $dispvm like keyword) + or valid @dispvm like keyword) :param system_info: information about the system :param value: value to be checked ''' - if value == '$dispvm': + if value == '@dispvm': return True - elif value == '$adminvm': + elif value == '@adminvm': return True - elif value.startswith('$dispvm:'): + elif value.startswith('@dispvm:'): dispvm_base = value.split(':', 1)[1] if dispvm_base not in system_info['domains']: return False @@ -89,11 +89,11 @@ def verify_target_value(system_info, value): def verify_special_value(value, for_target=True, specific_target=False): ''' - Verify if given special VM-specifier ('$...') is valid + Verify if given special VM-specifier ('@...') is valid :param value: value to verify :param for_target: should classify target-only values as valid ( - '$default', '$dispvm') + '@default', '@dispvm') :param specific_target: allow only values naming specific target (for use with target=, default= etc) :return: True or False @@ -103,27 +103,27 @@ def verify_special_value(value, for_target=True, specific_target=False): # values used only for matching VMs, not naming specific one (for actual # call target) if not specific_target: - if value.startswith('$tag:') and len(value) > len('$tag:'): + if value.startswith('@tag:') and len(value) > len('@tag:'): return True - if value.startswith('$type:') and len(value) > len('$type:'): + if value.startswith('@type:') and len(value) > len('@type:'): return True - if for_target and value.startswith('$dispvm:$tag:') and \ - len(value) > len('$dispvm:$tag:'): + if for_target and value.startswith('@dispvm:@tag:') and \ + len(value) > len('@dispvm:@tag:'): return True - if value == '$anyvm': + if value == '@anyvm': return True - if for_target and value == '$default': + if for_target and value == '@default': return True # those can be used to name one specific call VM - if value == '$adminvm': + if value == '@adminvm': return True - # allow only specific dispvm, not based on any $xxx keyword - don't name - # $tag here specifically, to work also with any future keywords - if for_target and value.startswith('$dispvm:') and \ - not value.startswith('$dispvm:$'): + # allow only specific dispvm, not based on any @xxx keyword - don't name + # @tag here specifically, to work also with any future keywords + if for_target and value.startswith('@dispvm:') and \ + not value.startswith('@dispvm:@'): return True - if for_target and value == '$dispvm': + if for_target and value == '@dispvm': return True return False @@ -138,7 +138,7 @@ class PolicyRule(object): :raise PolicySyntaxError: when syntax error is found :param line: a single line of actual qrexec policy (not a comment, - empty line or $include) + empty line or @include) :param filename: name of the file from which this line is loaded :param lineno: line number from which this line is loaded ''' @@ -193,30 +193,30 @@ class PolicyRule(object): 'invalid option {} for {} action'.format(param, action)) # verify special values - if self.source.startswith('$'): + if is_special_value(self.source): if not verify_special_value(self.source, False, False): raise PolicySyntaxError(filename, lineno, 'invalid source specification: {}'.format(self.source)) - if self.target.startswith('$'): + if is_special_value(self.target): if not verify_special_value(self.target, True, False): raise PolicySyntaxError(filename, lineno, 'invalid target specification: {}'.format(self.target)) - if self.target == '$default' \ + if self.target == '@default' \ and self.action == Action.allow \ and self.override_target is None: raise PolicySyntaxError(filename, lineno, - 'allow action for $default rule must specify target= option') + 'allow action for @default rule must specify target= option') if self.override_target is not None: - if self.override_target.startswith('$') and \ + if is_special_value(self.override_target) and \ not verify_special_value(self.override_target, True, True): raise PolicySyntaxError(filename, lineno, 'target= option needs to name specific target') if self.default_target is not None: - if self.default_target.startswith('$') and \ + if is_special_value(self.default_target) and \ not verify_special_value(self.default_target, True, True): raise PolicySyntaxError(filename, lineno, 'target= option needs to name specific target') @@ -224,7 +224,7 @@ class PolicyRule(object): @staticmethod def is_match_single(system_info, policy_value, value): ''' - Evaluate if a single value (VM name or '$default') matches policy + Evaluate if a single value (VM name or '@default') matches policy specification :param system_info: information about the system @@ -235,62 +235,62 @@ class PolicyRule(object): ''' # pylint: disable=too-many-return-statements - # not specified target matches only with $default and $anyvm policy + # not specified target matches only with @default and @anyvm policy # entry - if value == '$default' or value == '': - return policy_value in ('$default', '$anyvm') + if value == '@default' or value == '': + return policy_value in ('@default', '@anyvm') # if specific target used, check if it's valid # this function (is_match_single) is also used for checking call source # values, but this isn't a problem, because it will always be a - # domain name (not $dispvm or such) - this is guaranteed by a nature + # domain name (not @dispvm or such) - this is guaranteed by a nature # of qrexec call if not verify_target_value(system_info, value): return False - # handle $adminvm keyword + # handle @adminvm keyword if policy_value == 'dom0': # TODO: log a warning in Qubes 4.1 - policy_value = '$adminvm' + policy_value = '@adminvm' if value == 'dom0': - value = '$adminvm' + value = '@adminvm' # allow any _valid_, non-dom0 target - if policy_value == '$anyvm': - return value != '$adminvm' + if policy_value == '@anyvm': + return value != '@adminvm' - # exact match, including $dispvm* and $adminvm + # exact match, including @dispvm* and @adminvm if value == policy_value: return True # DispVM request, using tags to match - if policy_value.startswith('$dispvm:$tag:') \ - and value.startswith('$dispvm:'): + if policy_value.startswith('@dispvm:@tag:') \ + and value.startswith('@dispvm:'): tag = policy_value.split(':', 2)[2] dispvm_base = value.split(':', 1)[1] # already checked for existence by verify_target_value call dispvm_base_info = system_info['domains'][dispvm_base] return tag in dispvm_base_info['tags'] - # if $dispvm* not matched above, reject it; default DispVM (bare - # $dispvm) was resolved by the caller - if value.startswith('$dispvm:'): + # if @dispvm* not matched above, reject it; default DispVM (bare + # @dispvm) was resolved by the caller + if value.startswith('@dispvm:'): return False - # require $adminvm to be matched explicitly (not through $tag or $type) + # require @adminvm to be matched explicitly (not through @tag or @type) # - if not matched already, reject it - if value == '$adminvm': + if value == '@adminvm': return False # at this point, value name a specific target domain_info = system_info['domains'][value] - if policy_value.startswith('$tag:'): + if policy_value.startswith('@tag:'): tag = policy_value.split(':', 1)[1] return tag in domain_info['tags'] - if policy_value.startswith('$type:'): + if policy_value.startswith('@type:'): type_ = policy_value.split(':', 1)[1] return type_ == domain_info['type'] @@ -310,17 +310,17 @@ class PolicyRule(object): if not self.is_match_single(system_info, self.source, source): return False - # $dispvm in policy matches _only_ $dispvm (but not $dispvm:some-vm, + # @dispvm in policy matches _only_ @dispvm (but not @dispvm:some-vm, # even if that would be the default one) - if self.target == '$dispvm' and target == '$dispvm': + if self.target == '@dispvm' and target == '@dispvm': return True - if target == '$dispvm': - # resolve default DispVM, to check all kinds of $dispvm:* + if target == '@dispvm': + # resolve default DispVM, to check all kinds of @dispvm:* default_dispvm = system_info['domains'][source]['default_dispvm'] if default_dispvm is None: - # if this VM have no default DispVM, match only with $anyvm - return self.target == '$anyvm' - target = '$dispvm:' + default_dispvm + # if this VM have no default DispVM, match only with @anyvm + return self.target == '@anyvm' + target = '@dispvm:' + default_dispvm if not self.is_match_single(system_info, self.target, target): return False return True @@ -333,30 +333,30 @@ class PolicyRule(object): :return: matching domains ''' - if self.target.startswith('$tag:'): + if self.target.startswith('@tag:'): tag = self.target.split(':', 1)[1] for name, domain in system_info['domains'].items(): if tag in domain['tags']: yield name - elif self.target.startswith('$type:'): + elif self.target.startswith('@type:'): type_ = self.target.split(':', 1)[1] for name, domain in system_info['domains'].items(): if type_ == domain['type']: yield name - elif self.target == '$anyvm': + elif self.target == '@anyvm': for name, domain in system_info['domains'].items(): if name != 'dom0': yield name if domain['template_for_dispvms']: - yield '$dispvm:' + name - yield '$dispvm' - elif self.target.startswith('$dispvm:$tag:'): + yield '@dispvm:' + name + yield '@dispvm' + elif self.target.startswith('@dispvm:@tag:'): tag = self.target.split(':', 2)[2] for name, domain in system_info['domains'].items(): if tag in domain['tags']: if domain['template_for_dispvms']: - yield '$dispvm:' + name - elif self.target.startswith('$dispvm:'): + yield '@dispvm:' + name + elif self.target.startswith('@dispvm:'): dispvm_base = self.target.split(':', 1)[1] try: if system_info['domains'][dispvm_base]['template_for_dispvms']: @@ -364,9 +364,9 @@ class PolicyRule(object): except KeyError: # TODO log a warning? pass - elif self.target == '$adminvm': + elif self.target == '@adminvm': yield self.target - elif self.target == '$dispvm': + elif self.target == '@dispvm': yield self.target else: if self.target in system_info['domains']: @@ -374,17 +374,17 @@ class PolicyRule(object): def expand_override_target(self, system_info, source): ''' - Replace '$dispvm' with specific '$dispvm:...' value, based on qrexec + Replace '@dispvm' with specific '@dispvm:...' value, based on qrexec call source. :param system_info: System information :param source: Source domain name - :return: :py:attr:`override_target` with '$dispvm' substituted + :return: :py:attr:`override_target` with '@dispvm' substituted ''' - if self.override_target == '$dispvm': + if self.override_target == '@dispvm': if system_info['domains'][source]['default_dispvm'] is None: return None - return '$dispvm:' + system_info['domains'][source]['default_dispvm'] + return '@dispvm:' + system_info['domains'][source]['default_dispvm'] else: return self.override_target @@ -450,12 +450,12 @@ class PolicyAction(object): assert self.action == Action.allow assert self.target is not None - if self.target == '$adminvm': + if self.target == '@adminvm': self.target = 'dom0' if self.target == 'dom0': original_target_type = \ 'keyword' if is_special_value(self.original_target) else 'name' - original_target = self.original_target.lstrip('$') + original_target = self.original_target.lstrip('@') cmd = \ 'QUBESRPC {service} {source} {original_target_type} ' \ '{original_target}'.format( @@ -468,7 +468,7 @@ class PolicyAction(object): user=(self.rule.override_user or 'DEFAULT'), service=self.service, source=self.source) - if self.target.startswith('$dispvm:'): + if self.target.startswith('@dispvm:'): target = self.spawn_dispvm() dispvm = True else: @@ -569,13 +569,15 @@ class Policy(object): for lineno, line in zip(itertools.count(start=1), policy_file.readlines()): line = line.strip() + # compatibility with old keywords notation + line = line.replace('$', '@') if not line: # skip empty lines continue if line[0] == '#': # skip comments continue - if line.startswith('$include:'): + if line.startswith('@include:'): include_path = line.split(':', 1)[1] # os.path.join will leave include_path unchanged if it's # already absolute @@ -597,7 +599,7 @@ class Policy(object): ''' Collect targets the user can choose from in 'ask' action Word 'targets' is used intentionally instead of 'domains', because it - can also contains $dispvm like keywords. + can also contains @dispvm like keywords. ''' targets = set() @@ -617,17 +619,17 @@ class Policy(object): targets.update(rule.expand_target(system_info)) # expand default DispVM - if '$dispvm' in targets: - targets.remove('$dispvm') + if '@dispvm' in targets: + targets.remove('@dispvm') if system_info['domains'][source]['default_dispvm'] is not None: - dispvm = '$dispvm:' + \ + dispvm = '@dispvm:' + \ system_info['domains'][source]['default_dispvm'] if verify_target_value(system_info, dispvm): targets.add(dispvm) # expand other keywords - if '$adminvm' in targets: - targets.remove('$adminvm') + if '@adminvm' in targets: + targets.remove('@adminvm') targets.add('dom0') return targets @@ -668,18 +670,18 @@ class Policy(object): return PolicyAction(self.service, source, rule.default_target, rule, target, targets) elif rule.action == Action.allow: - if actual_target == '$default': + if actual_target == '@default': raise AccessDenied( 'policy define \'allow\' action at {}:{} but no target is ' 'specified by caller or policy'.format( rule.filename, rule.lineno)) - if actual_target == '$dispvm': + if actual_target == '@dispvm': if system_info['domains'][source]['default_dispvm'] is None: raise AccessDenied( - 'policy define \'allow\' action to $dispvm at {}:{} ' + 'policy define \'allow\' action to @dispvm at {}:{} ' 'but no DispVM base is set for this VM'.format( rule.filename, rule.lineno)) - actual_target = '$dispvm:' + \ + actual_target = '@dispvm:' + \ system_info['domains'][source]['default_dispvm'] return PolicyAction(self.service, source, diff --git a/qubespolicy/cli.py b/qubespolicy/cli.py index 0206c946..da1d02a7 100644 --- a/qubespolicy/cli.py +++ b/qubespolicy/cli.py @@ -58,7 +58,7 @@ def create_default_policy(service_name): policy.write("\n") policy.write("## Please use a single # to start your custom comments\n") policy.write("\n") - policy.write("$anyvm $anyvm ask\n") + policy.write("@anyvm @anyvm ask\n") def main(args=None): @@ -118,7 +118,7 @@ def main(args=None): if not (system_info['domains'][dispvm_base] ['template_for_dispvms']): continue - dispvm_api_name = '$dispvm:' + dispvm_base + dispvm_api_name = '@dispvm:' + dispvm_base icons[dispvm_api_name] = \ system_info['domains'][dispvm_base]['icon'] icons[dispvm_api_name] = \ diff --git a/qubespolicy/graph.py b/qubespolicy/graph.py index d2dd1ddb..90136e5b 100644 --- a/qubespolicy/graph.py +++ b/qubespolicy/graph.py @@ -85,8 +85,8 @@ def main(args=None): sources = args.source targets = list(system_info['domains'].keys()) - targets.append('$dispvm') - targets.extend('$dispvm:' + dom for dom in system_info['domains'] + targets.append('@dispvm') + targets.extend('@dispvm:' + dom for dom in system_info['domains'] if system_info['domains'][dom]['template_for_dispvms']) connections = set() diff --git a/qubespolicy/gtkhelpers.py b/qubespolicy/gtkhelpers.py index 2a5abf98..f68ed938 100644 --- a/qubespolicy/gtkhelpers.py +++ b/qubespolicy/gtkhelpers.py @@ -50,8 +50,8 @@ class VMListModeler: def _create_entries(self): for name, vm in self._domains_info.items(): - if name.startswith('$dispvm:'): - vm_name = name[len('$dispvm:'):] + if name.startswith('@dispvm:'): + vm_name = name[len('@dispvm:'):] dispvm = True else: vm_name = name diff --git a/qubespolicy/tests/__init__.py b/qubespolicy/tests/__init__.py index 45ee32bc..036b4fe5 100644 --- a/qubespolicy/tests/__init__.py +++ b/qubespolicy/tests/__init__.py @@ -95,72 +95,72 @@ class TC_00_PolicyRule(qubes.tests.QubesTestCase): self.assertTrue( qubespolicy.verify_target_value(system_info, 'default-dvm')) self.assertTrue( - qubespolicy.verify_target_value(system_info, '$dispvm')) + qubespolicy.verify_target_value(system_info, '@dispvm')) self.assertTrue( - qubespolicy.verify_target_value(system_info, '$dispvm:default-dvm')) + qubespolicy.verify_target_value(system_info, '@dispvm:default-dvm')) self.assertTrue( qubespolicy.verify_target_value(system_info, 'test-template')) self.assertTrue( qubespolicy.verify_target_value(system_info, 'test-standalone')) self.assertTrue( - qubespolicy.verify_target_value(system_info, '$adminvm')) + qubespolicy.verify_target_value(system_info, '@adminvm')) self.assertFalse( qubespolicy.verify_target_value(system_info, 'no-such-vm')) self.assertFalse( qubespolicy.verify_target_value(system_info, - '$dispvm:test-invalid-dvm')) + '@dispvm:test-invalid-dvm')) self.assertFalse( - qubespolicy.verify_target_value(system_info, '$dispvm:test-vm1')) + qubespolicy.verify_target_value(system_info, '@dispvm:test-vm1')) self.assertFalse( qubespolicy.verify_target_value(system_info, '')) self.assertFalse( - qubespolicy.verify_target_value(system_info, '$default')) + qubespolicy.verify_target_value(system_info, '@default')) self.assertFalse( - qubespolicy.verify_target_value(system_info, '$anyvm')) + qubespolicy.verify_target_value(system_info, '@anyvm')) self.assertFalse( - qubespolicy.verify_target_value(system_info, '$tag:tag1')) + qubespolicy.verify_target_value(system_info, '@tag:tag1')) self.assertFalse( - qubespolicy.verify_target_value(system_info, '$dispvm:$tag:tag1')) + qubespolicy.verify_target_value(system_info, '@dispvm:@tag:tag1')) self.assertFalse( - qubespolicy.verify_target_value(system_info, '$invalid')) + qubespolicy.verify_target_value(system_info, '@invalid')) def test_010_verify_special_value(self): - self.assertTrue(qubespolicy.verify_special_value('$tag:tag', + self.assertTrue(qubespolicy.verify_special_value('@tag:tag', for_target=False)) - self.assertTrue(qubespolicy.verify_special_value('$tag:other-tag', + self.assertTrue(qubespolicy.verify_special_value('@tag:other-tag', for_target=False)) - self.assertTrue(qubespolicy.verify_special_value('$type:AppVM', + self.assertTrue(qubespolicy.verify_special_value('@type:AppVM', for_target=False)) - self.assertTrue(qubespolicy.verify_special_value('$adminvm', + self.assertTrue(qubespolicy.verify_special_value('@adminvm', for_target=False)) - self.assertTrue(qubespolicy.verify_special_value('$dispvm:some-vm', + self.assertTrue(qubespolicy.verify_special_value('@dispvm:some-vm', for_target=True)) - self.assertTrue(qubespolicy.verify_special_value('$dispvm:$tag:tag1', + self.assertTrue(qubespolicy.verify_special_value('@dispvm:@tag:tag1', for_target=True)) - self.assertFalse(qubespolicy.verify_special_value('$default', + self.assertFalse(qubespolicy.verify_special_value('@default', for_target=False)) - self.assertFalse(qubespolicy.verify_special_value('$dispvm', + self.assertFalse(qubespolicy.verify_special_value('@dispvm', for_target=False)) - self.assertFalse(qubespolicy.verify_special_value('$dispvm:some-vm', + self.assertFalse(qubespolicy.verify_special_value('@dispvm:some-vm', for_target=False)) - self.assertFalse(qubespolicy.verify_special_value('$dispvm:$tag:tag1', + self.assertFalse(qubespolicy.verify_special_value('@dispvm:@tag:tag1', for_target=False)) - self.assertFalse(qubespolicy.verify_special_value('$invalid', + self.assertFalse(qubespolicy.verify_special_value('@invalid', for_target=False)) self.assertFalse(qubespolicy.verify_special_value('vm-name', for_target=False)) - self.assertFalse(qubespolicy.verify_special_value('$tag:', + self.assertFalse(qubespolicy.verify_special_value('@tag:', for_target=False)) - self.assertFalse(qubespolicy.verify_special_value('$type:', + self.assertFalse(qubespolicy.verify_special_value('@type:', for_target=False)) def test_020_line_simple(self): - line = qubespolicy.PolicyRule('$anyvm $anyvm ask', 'filename', 12) + line = qubespolicy.PolicyRule('@anyvm @anyvm ask', 'filename', 12) self.assertEqual(line.filename, 'filename') self.assertEqual(line.lineno, 12) self.assertEqual(line.action, qubespolicy.Action.ask) - self.assertEqual(line.source, '$anyvm') - self.assertEqual(line.target, '$anyvm') + self.assertEqual(line.source, '@anyvm') + self.assertEqual(line.target, '@anyvm') self.assertEqual(line.full_action, 'ask') self.assertIsNone(line.override_target) self.assertIsNone(line.override_user) @@ -169,13 +169,13 @@ class TC_00_PolicyRule(qubes.tests.QubesTestCase): def test_021_line_simple(self): # also check spaces in action field line = qubespolicy.PolicyRule( - '$tag:tag1 $type:AppVM ask, target=test-vm2, user=user', + '@tag:tag1 @type:AppVM ask, target=test-vm2, user=user', 'filename', 12) self.assertEqual(line.filename, 'filename') self.assertEqual(line.lineno, 12) self.assertEqual(line.action, qubespolicy.Action.ask) - self.assertEqual(line.source, '$tag:tag1') - self.assertEqual(line.target, '$type:AppVM') + self.assertEqual(line.source, '@tag:tag1') + self.assertEqual(line.target, '@type:AppVM') self.assertEqual(line.full_action, 'ask, target=test-vm2, user=user') self.assertEqual(line.override_target, 'test-vm2') self.assertEqual(line.override_user, 'user') @@ -183,27 +183,27 @@ class TC_00_PolicyRule(qubes.tests.QubesTestCase): def test_022_line_simple(self): line = qubespolicy.PolicyRule( - '$anyvm $default allow,target=$dispvm:test-vm2', + '@anyvm @default allow,target=@dispvm:test-vm2', 'filename', 12) self.assertEqual(line.filename, 'filename') self.assertEqual(line.lineno, 12) self.assertEqual(line.action, qubespolicy.Action.allow) - self.assertEqual(line.source, '$anyvm') - self.assertEqual(line.target, '$default') - self.assertEqual(line.full_action, 'allow,target=$dispvm:test-vm2') - self.assertEqual(line.override_target, '$dispvm:test-vm2') + self.assertEqual(line.source, '@anyvm') + self.assertEqual(line.target, '@default') + self.assertEqual(line.full_action, 'allow,target=@dispvm:test-vm2') + self.assertEqual(line.override_target, '@dispvm:test-vm2') self.assertIsNone(line.override_user) self.assertIsNone(line.default_target) def test_023_line_simple(self): line = qubespolicy.PolicyRule( - '$anyvm $default ask,default_target=test-vm1', + '@anyvm @default ask,default_target=test-vm1', 'filename', 12) self.assertEqual(line.filename, 'filename') self.assertEqual(line.lineno, 12) self.assertEqual(line.action, qubespolicy.Action.ask) - self.assertEqual(line.source, '$anyvm') - self.assertEqual(line.target, '$default') + self.assertEqual(line.source, '@anyvm') + self.assertEqual(line.target, '@default') self.assertEqual(line.full_action, 'ask,default_target=test-vm1') self.assertIsNone(line.override_target) self.assertIsNone(line.override_user) @@ -211,39 +211,39 @@ class TC_00_PolicyRule(qubes.tests.QubesTestCase): def test_024_line_simple(self): line = qubespolicy.PolicyRule( - '$anyvm $adminvm ask,default_target=$adminvm', + '@anyvm @adminvm ask,default_target=@adminvm', 'filename', 12) self.assertEqual(line.filename, 'filename') self.assertEqual(line.lineno, 12) self.assertEqual(line.action, qubespolicy.Action.ask) - self.assertEqual(line.source, '$anyvm') - self.assertEqual(line.target, '$adminvm') - self.assertEqual(line.full_action, 'ask,default_target=$adminvm') + self.assertEqual(line.source, '@anyvm') + self.assertEqual(line.target, '@adminvm') + self.assertEqual(line.full_action, 'ask,default_target=@adminvm') self.assertIsNone(line.override_target) self.assertIsNone(line.override_user) - self.assertEqual(line.default_target, '$adminvm') + self.assertEqual(line.default_target, '@adminvm') def test_030_line_invalid(self): invalid_lines = [ - '$dispvm $default allow', # $dispvm can't be a source - '$default $default allow', # $default can't be a source - '$anyvm $default allow,target=$dispvm:$tag:tag1', # $dispvm:$tag + '@dispvm @default allow', # @dispvm can't be a source + '@default @default allow', # @default can't be a source + '@anyvm @default allow,target=@dispvm:@tag:tag1', # @dispvm:@tag # as override target - '$anyvm $default allow,target=$tag:tag1', # $tag as override target - '$anyvm $default deny,target=test-vm1', # target= used with deny - '$anyvm $anyvm deny,default_target=test-vm1', # default_target= + '@anyvm @default allow,target=@tag:tag1', # @tag as override target + '@anyvm @default deny,target=test-vm1', # target= used with deny + '@anyvm @anyvm deny,default_target=test-vm1', # default_target= # with deny - '$anyvm $anyvm deny,user=user', # user= with deny - '$anyvm $anyvm invalid', # invalid action - '$anyvm $anyvm allow,invalid=xx', # invalid option - '$anyvm $anyvm', # missing action - '$anyvm $anyvm allow,default_target=test-vm1', # default_target= + '@anyvm @anyvm deny,user=user', # user= with deny + '@anyvm @anyvm invalid', # invalid action + '@anyvm @anyvm allow,invalid=xx', # invalid option + '@anyvm @anyvm', # missing action + '@anyvm @anyvm allow,default_target=test-vm1', # default_target= # with allow - '$invalid $anyvm allow', # invalid source - '$anyvm $invalid deny', # invalid target + '@invalid @anyvm allow', # invalid source + '@anyvm @invalid deny', # invalid target '', # empty line - '$anyvm $anyvm allow extra', # trailing words - '$anyvm $default allow', # $default allow without target= + '@anyvm @anyvm allow extra', # trailing words + '@anyvm @default allow', # @default allow without target= ] for line in invalid_lines: with self.subTest(line): @@ -252,118 +252,118 @@ class TC_00_PolicyRule(qubes.tests.QubesTestCase): def test_040_match_single(self): is_match_single = qubespolicy.PolicyRule.is_match_single - self.assertTrue(is_match_single(system_info, '$anyvm', 'test-vm1')) - self.assertTrue(is_match_single(system_info, '$anyvm', '$default')) - self.assertTrue(is_match_single(system_info, '$anyvm', '')) - self.assertTrue(is_match_single(system_info, '$default', '')) - self.assertTrue(is_match_single(system_info, '$default', '$default')) - self.assertTrue(is_match_single(system_info, '$tag:tag1', 'test-vm1')) - self.assertTrue(is_match_single(system_info, '$type:AppVM', 'test-vm1')) + self.assertTrue(is_match_single(system_info, '@anyvm', 'test-vm1')) + self.assertTrue(is_match_single(system_info, '@anyvm', '@default')) + self.assertTrue(is_match_single(system_info, '@anyvm', '')) + self.assertTrue(is_match_single(system_info, '@default', '')) + self.assertTrue(is_match_single(system_info, '@default', '@default')) + self.assertTrue(is_match_single(system_info, '@tag:tag1', 'test-vm1')) + self.assertTrue(is_match_single(system_info, '@type:AppVM', 'test-vm1')) self.assertTrue(is_match_single(system_info, - '$type:TemplateVM', 'test-template')) - self.assertTrue(is_match_single(system_info, '$anyvm', '$dispvm')) + '@type:TemplateVM', 'test-template')) + self.assertTrue(is_match_single(system_info, '@anyvm', '@dispvm')) self.assertTrue(is_match_single(system_info, - '$anyvm', '$dispvm:default-dvm')) - self.assertTrue(is_match_single(system_info, '$dispvm', '$dispvm')) + '@anyvm', '@dispvm:default-dvm')) + self.assertTrue(is_match_single(system_info, '@dispvm', '@dispvm')) self.assertTrue(is_match_single(system_info, - '$dispvm:$tag:tag3', '$dispvm:test-vm3')) - self.assertTrue(is_match_single(system_info, '$adminvm', '$adminvm')) - self.assertTrue(is_match_single(system_info, '$adminvm', 'dom0')) - self.assertTrue(is_match_single(system_info, 'dom0', '$adminvm')) + '@dispvm:@tag:tag3', '@dispvm:test-vm3')) + self.assertTrue(is_match_single(system_info, '@adminvm', '@adminvm')) + self.assertTrue(is_match_single(system_info, '@adminvm', 'dom0')) + self.assertTrue(is_match_single(system_info, 'dom0', '@adminvm')) self.assertTrue(is_match_single(system_info, 'dom0', 'dom0')) self.assertTrue(is_match_single(system_info, - '$dispvm:default-dvm', '$dispvm:default-dvm')) - self.assertTrue(is_match_single(system_info, '$anyvm', '$dispvm')) - self.assertTrue(is_match_single(system_info, '$anyvm', 'test-vm1')) - self.assertTrue(is_match_single(system_info, '$anyvm', 'test-vm1')) - self.assertTrue(is_match_single(system_info, '$anyvm', 'test-vm1')) + '@dispvm:default-dvm', '@dispvm:default-dvm')) + self.assertTrue(is_match_single(system_info, '@anyvm', '@dispvm')) + self.assertTrue(is_match_single(system_info, '@anyvm', 'test-vm1')) + self.assertTrue(is_match_single(system_info, '@anyvm', 'test-vm1')) + self.assertTrue(is_match_single(system_info, '@anyvm', 'test-vm1')) - self.assertFalse(is_match_single(system_info, '$default', 'test-vm1')) - self.assertFalse(is_match_single(system_info, '$tag:tag1', 'test-vm3')) - self.assertFalse(is_match_single(system_info, '$anyvm', 'no-such-vm')) + self.assertFalse(is_match_single(system_info, '@default', 'test-vm1')) + self.assertFalse(is_match_single(system_info, '@tag:tag1', 'test-vm3')) + self.assertFalse(is_match_single(system_info, '@anyvm', 'no-such-vm')) # test-vm1.template_for_dispvms=False self.assertFalse(is_match_single(system_info, - '$anyvm', '$dispvm:test-vm1')) + '@anyvm', '@dispvm:test-vm1')) # test-vm1.template_for_dispvms=False self.assertFalse(is_match_single(system_info, - '$dispvm:test-vm1', '$dispvm:test-vm1')) + '@dispvm:test-vm1', '@dispvm:test-vm1')) self.assertFalse(is_match_single(system_info, - '$dispvm:$tag:tag1', '$dispvm:test-vm1')) + '@dispvm:@tag:tag1', '@dispvm:test-vm1')) # test-vm3 has not tag1 self.assertFalse(is_match_single(system_info, - '$dispvm:$tag:tag1', '$dispvm:test-vm3')) + '@dispvm:@tag:tag1', '@dispvm:test-vm3')) # default-dvm has no tag3 self.assertFalse(is_match_single(system_info, - '$dispvm:$tag:tag3', '$dispvm:default-dvm')) - self.assertFalse(is_match_single(system_info, '$anyvm', 'dom0')) - self.assertFalse(is_match_single(system_info, '$anyvm', '$adminvm')) + '@dispvm:@tag:tag3', '@dispvm:default-dvm')) + self.assertFalse(is_match_single(system_info, '@anyvm', 'dom0')) + self.assertFalse(is_match_single(system_info, '@anyvm', '@adminvm')) self.assertFalse(is_match_single(system_info, - '$tag:dom0-tag', '$adminvm')) + '@tag:dom0-tag', '@adminvm')) self.assertFalse(is_match_single(system_info, - '$type:AdminVM', '$adminvm')) + '@type:AdminVM', '@adminvm')) self.assertFalse(is_match_single(system_info, - '$tag:dom0-tag', 'dom0')) + '@tag:dom0-tag', 'dom0')) self.assertFalse(is_match_single(system_info, - '$type:AdminVM', 'dom0')) - self.assertFalse(is_match_single(system_info, '$tag:tag1', 'dom0')) - self.assertFalse(is_match_single(system_info, '$anyvm', '$tag:tag1')) - self.assertFalse(is_match_single(system_info, '$anyvm', '$type:AppVM')) - self.assertFalse(is_match_single(system_info, '$anyvm', '$invalid')) - self.assertFalse(is_match_single(system_info, '$invalid', '$invalid')) - self.assertFalse(is_match_single(system_info, '$anyvm', 'no-such-vm')) + '@type:AdminVM', 'dom0')) + self.assertFalse(is_match_single(system_info, '@tag:tag1', 'dom0')) + self.assertFalse(is_match_single(system_info, '@anyvm', '@tag:tag1')) + self.assertFalse(is_match_single(system_info, '@anyvm', '@type:AppVM')) + self.assertFalse(is_match_single(system_info, '@anyvm', '@invalid')) + self.assertFalse(is_match_single(system_info, '@invalid', '@invalid')) + self.assertFalse(is_match_single(system_info, '@anyvm', 'no-such-vm')) self.assertFalse(is_match_single(system_info, 'no-such-vm', 'no-such-vm')) - self.assertFalse(is_match_single(system_info, '$dispvm', 'test-vm1')) - self.assertFalse(is_match_single(system_info, '$dispvm', 'default-dvm')) + self.assertFalse(is_match_single(system_info, '@dispvm', 'test-vm1')) + self.assertFalse(is_match_single(system_info, '@dispvm', 'default-dvm')) self.assertFalse(is_match_single(system_info, - '$dispvm:default-dvm', 'default-dvm')) - self.assertFalse(is_match_single(system_info, '$anyvm', 'test-vm1\n')) - self.assertFalse(is_match_single(system_info, '$anyvm', 'test-vm1 ')) + '@dispvm:default-dvm', 'default-dvm')) + self.assertFalse(is_match_single(system_info, '@anyvm', 'test-vm1\n')) + self.assertFalse(is_match_single(system_info, '@anyvm', 'test-vm1 ')) def test_050_match(self): - line = qubespolicy.PolicyRule('$anyvm $anyvm allow') + line = qubespolicy.PolicyRule('@anyvm @anyvm allow') self.assertTrue(line.is_match(system_info, 'test-vm1', 'test-vm2')) - line = qubespolicy.PolicyRule('$anyvm $anyvm allow') + line = qubespolicy.PolicyRule('@anyvm @anyvm allow') self.assertFalse(line.is_match(system_info, 'no-such-vm', 'test-vm2')) - line = qubespolicy.PolicyRule('$anyvm $anyvm allow') + line = qubespolicy.PolicyRule('@anyvm @anyvm allow') self.assertFalse(line.is_match(system_info, 'test-vm1', 'no-such-vm')) - line = qubespolicy.PolicyRule('$anyvm $dispvm allow') - self.assertTrue(line.is_match(system_info, 'test-vm1', '$dispvm')) - line = qubespolicy.PolicyRule('$anyvm $dispvm allow') + line = qubespolicy.PolicyRule('@anyvm @dispvm allow') + self.assertTrue(line.is_match(system_info, 'test-vm1', '@dispvm')) + line = qubespolicy.PolicyRule('@anyvm @dispvm allow') self.assertFalse(line.is_match(system_info, - 'test-vm1', '$dispvm:default-dvm')) - line = qubespolicy.PolicyRule('$anyvm $dispvm:default-dvm allow') - self.assertTrue(line.is_match(system_info, 'test-vm1', '$dispvm')) - line = qubespolicy.PolicyRule('$anyvm $dispvm:default-dvm allow') + 'test-vm1', '@dispvm:default-dvm')) + line = qubespolicy.PolicyRule('@anyvm @dispvm:default-dvm allow') + self.assertTrue(line.is_match(system_info, 'test-vm1', '@dispvm')) + line = qubespolicy.PolicyRule('@anyvm @dispvm:default-dvm allow') self.assertTrue(line.is_match(system_info, - 'test-vm1', '$dispvm:default-dvm')) - line = qubespolicy.PolicyRule('$anyvm $dispvm:$tag:tag3 allow') + 'test-vm1', '@dispvm:default-dvm')) + line = qubespolicy.PolicyRule('@anyvm @dispvm:@tag:tag3 allow') self.assertTrue(line.is_match(system_info, - 'test-vm1', '$dispvm:test-vm3')) + 'test-vm1', '@dispvm:test-vm3')) def test_060_expand_target(self): lines = { - '$anyvm $anyvm allow': ['test-vm1', 'test-vm2', 'test-vm3', - '$dispvm:test-vm3', - 'default-dvm', '$dispvm:default-dvm', 'test-invalid-dvm', - 'test-no-dvm', 'test-template', 'test-standalone', '$dispvm'], - '$anyvm $dispvm allow': ['$dispvm'], - '$anyvm $dispvm:default-dvm allow': ['$dispvm:default-dvm'], + '@anyvm @anyvm allow': ['test-vm1', 'test-vm2', 'test-vm3', + '@dispvm:test-vm3', + 'default-dvm', '@dispvm:default-dvm', 'test-invalid-dvm', + 'test-no-dvm', 'test-template', 'test-standalone', '@dispvm'], + '@anyvm @dispvm allow': ['@dispvm'], + '@anyvm @dispvm:default-dvm allow': ['@dispvm:default-dvm'], # no DispVM from test-vm1 allowed - '$anyvm $dispvm:test-vm1 allow': [], - '$anyvm $dispvm:test-vm3 allow': ['$dispvm:test-vm3'], - '$anyvm $dispvm:$tag:tag1 allow': [], - '$anyvm $dispvm:$tag:tag3 allow': ['$dispvm:test-vm3'], - '$anyvm test-vm1 allow': ['test-vm1'], - '$anyvm $type:AppVM allow': ['test-vm1', 'test-vm2', 'test-vm3', + '@anyvm @dispvm:test-vm1 allow': [], + '@anyvm @dispvm:test-vm3 allow': ['@dispvm:test-vm3'], + '@anyvm @dispvm:@tag:tag1 allow': [], + '@anyvm @dispvm:@tag:tag3 allow': ['@dispvm:test-vm3'], + '@anyvm test-vm1 allow': ['test-vm1'], + '@anyvm @type:AppVM allow': ['test-vm1', 'test-vm2', 'test-vm3', 'default-dvm', 'test-invalid-dvm', 'test-no-dvm'], - '$anyvm $type:TemplateVM allow': ['test-template'], - '$anyvm $tag:tag1 allow': ['test-vm1', 'test-invalid-dvm', + '@anyvm @type:TemplateVM allow': ['test-template'], + '@anyvm @tag:tag1 allow': ['test-vm1', 'test-invalid-dvm', 'test-template', 'test-standalone', 'test-no-dvm'], - '$anyvm $tag:tag2 allow': ['test-vm1', 'test-vm2', + '@anyvm @tag:tag2 allow': ['test-vm1', 'test-vm2', 'test-invalid-dvm', 'test-template', 'test-standalone', 'test-no-dvm'], - '$anyvm $tag:no-such-tag allow': [], + '@anyvm @tag:no-such-tag allow': [], } for line in lines: with self.subTest(line): @@ -373,56 +373,56 @@ class TC_00_PolicyRule(qubes.tests.QubesTestCase): def test_070_expand_override_target(self): line = qubespolicy.PolicyRule( - '$anyvm $anyvm allow,target=test-vm2') + '@anyvm @anyvm allow,target=test-vm2') self.assertEqual( line.expand_override_target(system_info, 'test-vm1'), 'test-vm2') def test_071_expand_override_target_dispvm(self): line = qubespolicy.PolicyRule( - '$anyvm $anyvm allow,target=$dispvm') + '@anyvm @anyvm allow,target=@dispvm') self.assertEqual( line.expand_override_target(system_info, 'test-vm1'), - '$dispvm:default-dvm') + '@dispvm:default-dvm') def test_072_expand_override_target_dispvm_specific(self): line = qubespolicy.PolicyRule( - '$anyvm $anyvm allow,target=$dispvm:test-vm3') + '@anyvm @anyvm allow,target=@dispvm:test-vm3') self.assertEqual( line.expand_override_target(system_info, 'test-vm1'), - '$dispvm:test-vm3') + '@dispvm:test-vm3') def test_073_expand_override_target_dispvm_none(self): line = qubespolicy.PolicyRule( - '$anyvm $anyvm allow,target=$dispvm') + '@anyvm @anyvm allow,target=@dispvm') self.assertEqual( line.expand_override_target(system_info, 'test-no-dvm'), None) def test_074_expand_override_target_dom0(self): line = qubespolicy.PolicyRule( - '$anyvm $anyvm allow,target=dom0') + '@anyvm @anyvm allow,target=dom0') self.assertEqual( line.expand_override_target(system_info, 'test-no-dvm'), 'dom0') def test_075_expand_override_target_dom0(self): line = qubespolicy.PolicyRule( - '$anyvm $anyvm allow,target=$adminvm') + '@anyvm @anyvm allow,target=@adminvm') self.assertEqual( line.expand_override_target(system_info, 'test-no-dvm'), - '$adminvm') + '@adminvm') class TC_10_PolicyAction(qubes.tests.QubesTestCase): def test_000_init(self): - rule = qubespolicy.PolicyRule('$anyvm $anyvm deny') + rule = qubespolicy.PolicyRule('@anyvm @anyvm deny') with self.assertRaises(qubespolicy.AccessDenied): qubespolicy.PolicyAction('test.service', 'test-vm1', 'test-vm2', rule, 'test-vm2') def test_001_init(self): - rule = qubespolicy.PolicyRule('$anyvm $anyvm ask') + rule = qubespolicy.PolicyRule('@anyvm @anyvm ask') action = qubespolicy.PolicyAction('test.service', 'test-vm1', None, rule, 'test-vm2', ['test-vm2', 'test-vm3']) self.assertEqual(action.service, 'test.service') @@ -434,8 +434,8 @@ class TC_10_PolicyAction(qubes.tests.QubesTestCase): self.assertEqual(action.action, qubespolicy.Action.ask) def test_002_init_invalid(self): - rule_ask = qubespolicy.PolicyRule('$anyvm $anyvm ask') - rule_allow = qubespolicy.PolicyRule('$anyvm $anyvm allow') + rule_ask = qubespolicy.PolicyRule('@anyvm @anyvm ask') + rule_allow = qubespolicy.PolicyRule('@anyvm @anyvm allow') with self.assertRaises(AssertionError): qubespolicy.PolicyAction('test.service', 'test-vm1', None, rule_allow, 'test-vm2', None) @@ -448,7 +448,7 @@ class TC_10_PolicyAction(qubes.tests.QubesTestCase): None, rule_ask, 'test-vm2', None) def test_003_init_default_target(self): - rule_ask = qubespolicy.PolicyRule('$anyvm $anyvm ask') + rule_ask = qubespolicy.PolicyRule('@anyvm @anyvm ask') action = qubespolicy.PolicyAction('test.service', 'test-vm1', 'test-vm1', rule_ask, 'test-vm2', ['test-vm2']) @@ -459,7 +459,7 @@ class TC_10_PolicyAction(qubes.tests.QubesTestCase): self.assertEqual(action.target, 'test-vm2') def test_010_handle_user_response(self): - rule = qubespolicy.PolicyRule('$anyvm $anyvm ask') + rule = qubespolicy.PolicyRule('@anyvm @anyvm ask') action = qubespolicy.PolicyAction('test.service', 'test-vm1', None, rule, 'test-vm2', ['test-vm2', 'test-vm3']) action.handle_user_response(True, 'test-vm2') @@ -467,14 +467,14 @@ class TC_10_PolicyAction(qubes.tests.QubesTestCase): self.assertEqual(action.target, 'test-vm2') def test_011_handle_user_response(self): - rule = qubespolicy.PolicyRule('$anyvm $anyvm ask') + rule = qubespolicy.PolicyRule('@anyvm @anyvm ask') action = qubespolicy.PolicyAction('test.service', 'test-vm1', None, rule, 'test-vm2', ['test-vm2', 'test-vm3']) with self.assertRaises(AssertionError): action.handle_user_response(True, 'test-no-dvm') def test_012_handle_user_response(self): - rule = qubespolicy.PolicyRule('$anyvm $anyvm ask') + rule = qubespolicy.PolicyRule('@anyvm @anyvm ask') action = qubespolicy.PolicyAction('test.service', 'test-vm1', None, rule, 'test-vm2', ['test-vm2', 'test-vm3']) with self.assertRaises(qubespolicy.AccessDenied): @@ -483,7 +483,7 @@ class TC_10_PolicyAction(qubes.tests.QubesTestCase): def test_013_handle_user_response_with_default_target(self): rule = qubespolicy.PolicyRule( - '$anyvm $anyvm ask,default_target=test-vm2') + '@anyvm @anyvm ask,default_target=test-vm2') action = qubespolicy.PolicyAction('test.service', 'test-vm1', None, rule, 'test-vm2', ['test-vm2', 'test-vm3']) action.handle_user_response(True, 'test-vm2') @@ -493,7 +493,7 @@ class TC_10_PolicyAction(qubes.tests.QubesTestCase): @unittest.mock.patch('qubespolicy.qubesd_call') @unittest.mock.patch('subprocess.call') def test_020_execute(self, mock_subprocess, mock_qubesd_call): - rule = qubespolicy.PolicyRule('$anyvm $anyvm allow') + rule = qubespolicy.PolicyRule('@anyvm @anyvm allow') action = qubespolicy.PolicyAction('test.service', 'test-vm1', 'test-vm2', rule, 'test-vm2') action.execute('some-ident') @@ -506,7 +506,7 @@ class TC_10_PolicyAction(qubes.tests.QubesTestCase): @unittest.mock.patch('qubespolicy.qubesd_call') @unittest.mock.patch('subprocess.call') def test_021_execute_dom0(self, mock_subprocess, mock_qubesd_call): - rule = qubespolicy.PolicyRule('$anyvm dom0 allow') + rule = qubespolicy.PolicyRule('@anyvm dom0 allow') action = qubespolicy.PolicyAction('test.service', 'test-vm1', 'dom0', rule, 'dom0') action.execute('some-ident') @@ -519,9 +519,9 @@ class TC_10_PolicyAction(qubes.tests.QubesTestCase): @unittest.mock.patch('qubespolicy.qubesd_call') @unittest.mock.patch('subprocess.call') def test_021_execute_dom0_keyword(self, mock_subprocess, mock_qubesd_call): - rule = qubespolicy.PolicyRule('$anyvm dom0 allow') + rule = qubespolicy.PolicyRule('@anyvm dom0 allow') action = qubespolicy.PolicyAction('test.service', 'test-vm1', - 'dom0', rule, '$adminvm') + 'dom0', rule, '@adminvm') action.execute('some-ident') self.assertEqual(mock_qubesd_call.mock_calls, []) self.assertEqual(mock_subprocess.mock_calls, @@ -532,9 +532,9 @@ class TC_10_PolicyAction(qubes.tests.QubesTestCase): @unittest.mock.patch('qubespolicy.qubesd_call') @unittest.mock.patch('subprocess.call') def test_022_execute_dispvm(self, mock_subprocess, mock_qubesd_call): - rule = qubespolicy.PolicyRule('$anyvm $dispvm:default-dvm allow') + rule = qubespolicy.PolicyRule('@anyvm @dispvm:default-dvm allow') action = qubespolicy.PolicyAction('test.service', 'test-vm1', - '$dispvm:default-dvm', rule, '$dispvm:default-dvm') + '@dispvm:default-dvm', rule, '@dispvm:default-dvm') mock_qubesd_call.side_effect = (lambda target, call: b'dispvm-name' if call == 'admin.vm.CreateDisposable' else unittest.mock.DEFAULT) @@ -552,7 +552,7 @@ class TC_10_PolicyAction(qubes.tests.QubesTestCase): @unittest.mock.patch('subprocess.call') def test_023_execute_already_running(self, mock_subprocess, mock_qubesd_call): - rule = qubespolicy.PolicyRule('$anyvm $anyvm allow') + rule = qubespolicy.PolicyRule('@anyvm @anyvm allow') action = qubespolicy.PolicyAction('test.service', 'test-vm1', 'test-vm2', rule, 'test-vm2') mock_qubesd_call.side_effect = \ @@ -568,7 +568,7 @@ class TC_10_PolicyAction(qubes.tests.QubesTestCase): @unittest.mock.patch('subprocess.call') def test_024_execute_startup_error(self, mock_subprocess, mock_qubesd_call): - rule = qubespolicy.PolicyRule('$anyvm $anyvm allow') + rule = qubespolicy.PolicyRule('@anyvm @anyvm allow') action = qubespolicy.PolicyAction('test.service', 'test-vm1', 'test-vm2', rule, 'test-vm2') mock_qubesd_call.side_effect = \ @@ -597,7 +597,7 @@ class TC_20_Policy(qubes.tests.QubesTestCase): f.write('# comment\n') f.write('test-vm2 test-vm3 ask\n') f.write(' # comment \n') - f.write('$anyvm $anyvm ask\n') + f.write('@anyvm @anyvm ask\n') policy = qubespolicy.Policy('test.service', tmp_policy_dir) self.assertEqual(policy.service, 'test.service') self.assertEqual(len(policy.policy_rules), 3) @@ -613,10 +613,10 @@ class TC_20_Policy(qubes.tests.QubesTestCase): def test_002_include(self): with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f: f.write('test-vm1 test-vm2 allow\n') - f.write('$include:test.service2\n') - f.write('$anyvm $anyvm deny\n') + f.write('@include:test.service2\n') + f.write('@anyvm @anyvm deny\n') with open(os.path.join(tmp_policy_dir, 'test.service2'), 'w') as f: - f.write('test-vm3 $default allow,target=test-vm2\n') + f.write('test-vm3 @default allow,target=test-vm2\n') policy = qubespolicy.Policy('test.service', tmp_policy_dir) self.assertEqual(policy.service, 'test.service') self.assertEqual(len(policy.policy_rules), 3) @@ -628,31 +628,46 @@ class TC_20_Policy(qubes.tests.QubesTestCase): tmp_policy_dir + '/test.service') self.assertEqual(policy.policy_rules[0].lineno, 1) self.assertEqual(policy.policy_rules[1].source, 'test-vm3') - self.assertEqual(policy.policy_rules[1].target, '$default') + self.assertEqual(policy.policy_rules[1].target, '@default') self.assertEqual(policy.policy_rules[1].action, qubespolicy.Action.allow) self.assertEqual(policy.policy_rules[1].filename, tmp_policy_dir + '/test.service2') self.assertEqual(policy.policy_rules[1].lineno, 1) - self.assertEqual(policy.policy_rules[2].source, '$anyvm') - self.assertEqual(policy.policy_rules[2].target, '$anyvm') + self.assertEqual(policy.policy_rules[2].source, '@anyvm') + self.assertEqual(policy.policy_rules[2].target, '@anyvm') self.assertEqual(policy.policy_rules[2].action, qubespolicy.Action.deny) self.assertEqual(policy.policy_rules[2].filename, tmp_policy_dir + '/test.service') self.assertEqual(policy.policy_rules[2].lineno, 3) + def test_003_load_convert(self): + with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f: + f.write('test-vm2 test-vm3 ask\n') + f.write(' # comment \n') + f.write('$anyvm $dispvm ask,default_target=$dispvm\n') + policy = qubespolicy.Policy('test.service', tmp_policy_dir) + self.assertEqual(policy.service, 'test.service') + self.assertEqual(len(policy.policy_rules), 2) + self.assertEqual(policy.policy_rules[1].source, '@anyvm') + self.assertEqual(policy.policy_rules[1].target, '@dispvm') + self.assertEqual(policy.policy_rules[1].action, + qubespolicy.Action.ask) + self.assertEqual(policy.policy_rules[1].default_target, + '@dispvm') + def test_010_find_rule(self): with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f: f.write('test-vm1 test-vm2 allow\n') - f.write('test-vm1 $anyvm ask\n') - f.write('test-vm2 $tag:tag1 deny\n') - f.write('test-vm2 $tag:tag2 allow\n') - f.write('test-vm2 $dispvm:$tag:tag3 allow\n') - f.write('test-vm2 $dispvm:$tag:tag2 allow\n') - f.write('test-vm2 $dispvm:default-dvm allow\n') - f.write('$type:AppVM $default allow,target=test-vm3\n') - f.write('$tag:tag1 $type:AppVM allow\n') + f.write('test-vm1 @anyvm ask\n') + f.write('test-vm2 @tag:tag1 deny\n') + f.write('test-vm2 @tag:tag2 allow\n') + f.write('test-vm2 @dispvm:@tag:tag3 allow\n') + f.write('test-vm2 @dispvm:@tag:tag2 allow\n') + f.write('test-vm2 @dispvm:default-dvm allow\n') + f.write('@type:AppVM @default allow,target=test-vm3\n') + f.write('@tag:tag1 @type:AppVM allow\n') policy = qubespolicy.Policy('test.service', tmp_policy_dir) self.assertEqual(policy.find_matching_rule( system_info, 'test-vm1', 'test-vm2'), policy.policy_rules[0]) @@ -662,47 +677,47 @@ class TC_20_Policy(qubes.tests.QubesTestCase): system_info, 'test-vm2', 'test-vm2'), policy.policy_rules[3]) self.assertEqual(policy.find_matching_rule( system_info, 'test-vm2', 'test-no-dvm'), policy.policy_rules[2]) - # $anyvm matches $default too + # @anyvm matches @default too self.assertEqual(policy.find_matching_rule( system_info, 'test-vm1', ''), policy.policy_rules[1]) self.assertEqual(policy.find_matching_rule( system_info, 'test-vm2', ''), policy.policy_rules[7]) self.assertEqual(policy.find_matching_rule( - system_info, 'test-vm2', '$default'), policy.policy_rules[7]) + system_info, 'test-vm2', '@default'), policy.policy_rules[7]) self.assertEqual(policy.find_matching_rule( system_info, 'test-no-dvm', 'test-vm3'), policy.policy_rules[8]) self.assertEqual(policy.find_matching_rule( - system_info, 'test-vm2', '$dispvm:test-vm3'), + system_info, 'test-vm2', '@dispvm:test-vm3'), policy.policy_rules[4]) self.assertEqual(policy.find_matching_rule( - system_info, 'test-vm2', '$dispvm'), + system_info, 'test-vm2', '@dispvm'), policy.policy_rules[6]) with self.assertRaises(qubespolicy.AccessDenied): policy.find_matching_rule( system_info, 'test-no-dvm', 'test-standalone') with self.assertRaises(qubespolicy.AccessDenied): - policy.find_matching_rule(system_info, 'test-no-dvm', '$dispvm') + policy.find_matching_rule(system_info, 'test-no-dvm', '@dispvm') with self.assertRaises(qubespolicy.AccessDenied): policy.find_matching_rule( - system_info, 'test-standalone', '$default') + system_info, 'test-standalone', '@default') def test_020_collect_targets_for_ask(self): with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f: f.write('test-vm1 test-vm2 allow\n') - f.write('test-vm1 $anyvm ask\n') - f.write('test-vm2 $tag:tag1 deny\n') - f.write('test-vm2 $tag:tag2 allow\n') - f.write('test-no-dvm $type:AppVM deny\n') - f.write('$type:AppVM $default allow,target=test-vm3\n') - f.write('$tag:tag1 $type:AppVM allow\n') - f.write('test-no-dvm $dispvm allow\n') - f.write('test-standalone $dispvm allow\n') - f.write('test-standalone $adminvm allow\n') + f.write('test-vm1 @anyvm ask\n') + f.write('test-vm2 @tag:tag1 deny\n') + f.write('test-vm2 @tag:tag2 allow\n') + f.write('test-no-dvm @type:AppVM deny\n') + f.write('@type:AppVM @default allow,target=test-vm3\n') + f.write('@tag:tag1 @type:AppVM allow\n') + f.write('test-no-dvm @dispvm allow\n') + f.write('test-standalone @dispvm allow\n') + f.write('test-standalone @adminvm allow\n') policy = qubespolicy.Policy('test.service', tmp_policy_dir) self.assertCountEqual(policy.collect_targets_for_ask(system_info, 'test-vm1'), ['test-vm1', 'test-vm2', 'test-vm3', - '$dispvm:test-vm3', - 'default-dvm', '$dispvm:default-dvm', 'test-invalid-dvm', + '@dispvm:test-vm3', + 'default-dvm', '@dispvm:default-dvm', 'test-invalid-dvm', 'test-no-dvm', 'test-template', 'test-standalone']) self.assertCountEqual(policy.collect_targets_for_ask(system_info, 'test-vm2'), ['test-vm2', 'test-vm3']) @@ -711,7 +726,7 @@ class TC_20_Policy(qubes.tests.QubesTestCase): self.assertCountEqual(policy.collect_targets_for_ask(system_info, 'test-standalone'), ['test-vm1', 'test-vm2', 'test-vm3', 'default-dvm', 'test-no-dvm', 'test-invalid-dvm', - '$dispvm:default-dvm', 'dom0']) + '@dispvm:default-dvm', 'dom0']) self.assertCountEqual(policy.collect_targets_for_ask(system_info, 'test-no-dvm'), []) @@ -728,37 +743,37 @@ class TC_20_Policy(qubes.tests.QubesTestCase): self.assertEqual(action.service, 'test.service') self.assertIsNone(action.targets_for_ask) with self.assertRaises(qubespolicy.AccessDenied): - policy.evaluate(system_info, 'test-vm2', '$default') + policy.evaluate(system_info, 'test-vm2', '@default') def test_031_eval_default(self): with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f: f.write('test-vm1 test-vm2 allow\n') - f.write('test-vm1 $default allow,target=test-vm2\n') - f.write('$tag:tag1 test-vm2 ask\n') - f.write('$tag:tag2 $anyvm allow\n') - f.write('test-vm3 $anyvm deny\n') + f.write('test-vm1 @default allow,target=test-vm2\n') + f.write('@tag:tag1 test-vm2 ask\n') + f.write('@tag:tag2 @anyvm allow\n') + f.write('test-vm3 @anyvm deny\n') policy = qubespolicy.Policy('test.service', tmp_policy_dir) - action = policy.evaluate(system_info, 'test-vm1', '$default') + action = policy.evaluate(system_info, 'test-vm1', '@default') self.assertEqual(action.rule, policy.policy_rules[1]) self.assertEqual(action.action, qubespolicy.Action.allow) self.assertEqual(action.target, 'test-vm2') - self.assertEqual(action.original_target, '$default') + self.assertEqual(action.original_target, '@default') self.assertEqual(action.service, 'test.service') self.assertIsNone(action.targets_for_ask) with self.assertRaises(qubespolicy.AccessDenied): # action allow should hit, but no target specified (either by # caller or policy) - policy.evaluate(system_info, 'test-standalone', '$default') + policy.evaluate(system_info, 'test-standalone', '@default') def test_032_eval_ask(self): with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f: f.write('test-vm1 test-vm2 allow\n') - f.write('test-vm1 $default allow,target=test-vm2\n') - f.write('$tag:tag1 test-vm2 ask\n') - f.write('$tag:tag1 test-vm3 ask,default_target=test-vm3\n') - f.write('$tag:tag2 $anyvm allow\n') - f.write('test-vm3 $anyvm deny\n') + f.write('test-vm1 @default allow,target=test-vm2\n') + f.write('@tag:tag1 test-vm2 ask\n') + f.write('@tag:tag1 test-vm3 ask,default_target=test-vm3\n') + f.write('@tag:tag2 @anyvm allow\n') + f.write('test-vm3 @anyvm deny\n') policy = qubespolicy.Policy('test.service', tmp_policy_dir) action = policy.evaluate(system_info, 'test-standalone', 'test-vm2') @@ -768,18 +783,18 @@ class TC_20_Policy(qubes.tests.QubesTestCase): self.assertEqual(action.original_target, 'test-vm2') self.assertEqual(action.service, 'test.service') self.assertCountEqual(action.targets_for_ask, - ['test-vm1', 'test-vm2', 'test-vm3', '$dispvm:test-vm3', - 'default-dvm', '$dispvm:default-dvm', 'test-invalid-dvm', + ['test-vm1', 'test-vm2', 'test-vm3', '@dispvm:test-vm3', + 'default-dvm', '@dispvm:default-dvm', 'test-invalid-dvm', 'test-no-dvm', 'test-template', 'test-standalone']) def test_033_eval_ask(self): with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f: f.write('test-vm1 test-vm2 allow\n') - f.write('test-vm1 $default allow,target=test-vm2\n') - f.write('$tag:tag1 test-vm2 ask\n') - f.write('$tag:tag1 test-vm3 ask,default_target=test-vm3\n') - f.write('$tag:tag2 $anyvm allow\n') - f.write('test-vm3 $anyvm deny\n') + f.write('test-vm1 @default allow,target=test-vm2\n') + f.write('@tag:tag1 test-vm2 ask\n') + f.write('@tag:tag1 test-vm3 ask,default_target=test-vm3\n') + f.write('@tag:tag2 @anyvm allow\n') + f.write('test-vm3 @anyvm deny\n') policy = qubespolicy.Policy('test.service', tmp_policy_dir) action = policy.evaluate(system_info, 'test-standalone', 'test-vm3') @@ -789,46 +804,46 @@ class TC_20_Policy(qubes.tests.QubesTestCase): self.assertEqual(action.original_target, 'test-vm3') self.assertEqual(action.service, 'test.service') self.assertCountEqual(action.targets_for_ask, - ['test-vm1', 'test-vm2', 'test-vm3', '$dispvm:test-vm3', - 'default-dvm', '$dispvm:default-dvm', 'test-invalid-dvm', + ['test-vm1', 'test-vm2', 'test-vm3', '@dispvm:test-vm3', + 'default-dvm', '@dispvm:default-dvm', 'test-invalid-dvm', 'test-no-dvm', 'test-template', 'test-standalone']) def test_034_eval_resolve_dispvm(self): with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f: - f.write('test-vm3 $dispvm allow\n') + f.write('test-vm3 @dispvm allow\n') policy = qubespolicy.Policy('test.service', tmp_policy_dir) - action = policy.evaluate(system_info, 'test-vm3', '$dispvm') + action = policy.evaluate(system_info, 'test-vm3', '@dispvm') self.assertEqual(action.rule, policy.policy_rules[0]) self.assertEqual(action.action, qubespolicy.Action.allow) - self.assertEqual(action.target, '$dispvm:default-dvm') - self.assertEqual(action.original_target, '$dispvm') + self.assertEqual(action.target, '@dispvm:default-dvm') + self.assertEqual(action.original_target, '@dispvm') self.assertEqual(action.service, 'test.service') self.assertIsNone(action.targets_for_ask) def test_035_eval_resolve_dispvm_fail(self): with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f: - f.write('test-no-dvm $dispvm allow\n') + f.write('test-no-dvm @dispvm allow\n') policy = qubespolicy.Policy('test.service', tmp_policy_dir) with self.assertRaises(qubespolicy.AccessDenied): - policy.evaluate(system_info, 'test-no-dvm', '$dispvm') + policy.evaluate(system_info, 'test-no-dvm', '@dispvm') def test_036_eval_invalid_override_target(self): with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f: - f.write('test-vm3 $anyvm allow,target=no-such-vm\n') + f.write('test-vm3 @anyvm allow,target=no-such-vm\n') policy = qubespolicy.Policy('test.service', tmp_policy_dir) with self.assertRaises(qubespolicy.AccessDenied): - policy.evaluate(system_info, 'test-vm3', '$default') + policy.evaluate(system_info, 'test-vm3', '@default') def test_037_eval_ask_no_targets(self): with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f: - f.write('test-vm3 $default ask\n') + f.write('test-vm3 @default ask\n') policy = qubespolicy.Policy('test.service', tmp_policy_dir) with self.assertRaises(qubespolicy.AccessDenied): - policy.evaluate(system_info, 'test-vm3', '$default') + policy.evaluate(system_info, 'test-vm3', '@default') class TC_30_Misc(qubes.tests.QubesTestCase): diff --git a/qubespolicy/tests/cli.py b/qubespolicy/tests/cli.py index 2e3075dc..cc65ba26 100644 --- a/qubespolicy/tests/cli.py +++ b/qubespolicy/tests/cli.py @@ -107,7 +107,7 @@ class TC_00_qrexec_policy(qubes.tests.QubesTestCase): 'test-vm1': 'red', 'test-vm2': 'red', 'test-vm3': 'green', - '$dispvm:test-vm3': 'green', + '@dispvm:test-vm3': 'green', } self.assertEqual(self.dbus_mock.mock_calls, [ ('', (), {}), @@ -146,7 +146,7 @@ class TC_00_qrexec_policy(qubes.tests.QubesTestCase): 'test-vm1': 'red', 'test-vm2': 'red', 'test-vm3': 'green', - '$dispvm:test-vm3': 'green', + '@dispvm:test-vm3': 'green', } self.assertEqual(self.dbus_mock.mock_calls, [ ('', (), {}), @@ -183,7 +183,7 @@ class TC_00_qrexec_policy(qubes.tests.QubesTestCase): 'test-vm1': 'red', 'test-vm2': 'red', 'test-vm3': 'green', - '$dispvm:test-vm3': 'green', + '@dispvm:test-vm3': 'green', } self.assertEqual(self.dbus_mock.mock_calls, [ ('', (), {}), @@ -314,7 +314,7 @@ class TC_00_qrexec_policy(qubes.tests.QubesTestCase): "\n" "## Please use a single # to start your custom comments\n" "\n" - "$anyvm $anyvm ask\n") + "@anyvm @anyvm ask\n") def test_041_create_policy_abort(self): self.policy_mock.configure_mock(**{ diff --git a/qubespolicy/tests/gtkhelpers.py b/qubespolicy/tests/gtkhelpers.py index d8f061d6..05b1971b 100755 --- a/qubespolicy/tests/gtkhelpers.py +++ b/qubespolicy/tests/gtkhelpers.py @@ -37,11 +37,11 @@ mock_domains_info = { 'test-red3': {'icon': 'red', 'type': 'AppVM'}, 'test-source': {'icon': 'green', 'type': 'AppVM'}, 'test-target': {'icon': 'orange', 'type': 'AppVM'}, - '$dispvm:test-disp6': {'icon': 'red', 'type': 'DispVM'}, + '@dispvm:test-disp6': {'icon': 'red', 'type': 'DispVM'}, } mock_whitelist = ["test-red1", "test-red2", "test-red3", - "test-target", "$dispvm:test-disp6"] + "test-target", "@dispvm:test-disp6"] class MockComboEntry: def __init__(self, text): @@ -108,7 +108,7 @@ class VMListModelerTest(VMListModeler, unittest.TestCase): self._get_valid_qube_name(None, None, mock_whitelist)) def test_valid_qube_name_whitelist(self): - list_exc = ["$dispvm:test-disp6", "test-red2"] + list_exc = ["@dispvm:test-disp6", "test-red2"] whitelist = [name for name in mock_whitelist if name not in list_exc] self.apply_model(Gtk.ComboBox(), whitelist) diff --git a/qubespolicy/tests/rpcconfirmation.py b/qubespolicy/tests/rpcconfirmation.py index 25e2d49e..e353a495 100755 --- a/qubespolicy/tests/rpcconfirmation.py +++ b/qubespolicy/tests/rpcconfirmation.py @@ -301,7 +301,7 @@ class RPCConfirmationWindowTestWhitelist(unittest.TestCase): def test_all_allowed_domains(self): self._assert_whitelist( ["test-red1", "test-red2", "test-red3", - "test-target", "$dispvm:test-disp6", "test-source", "dom0"], + "test-target", "@dispvm:test-disp6", "test-source", "dom0"], ["test-red1", "test-red2", "test-red3", "test-target", "Disposable VM (test-disp6)", "test-source", "dom0"]) diff --git a/qubespolicy/utils.py b/qubespolicy/utils.py index 4528b6b6..b16e2c38 100644 --- a/qubespolicy/utils.py +++ b/qubespolicy/utils.py @@ -24,7 +24,7 @@ def _sanitize_char(input_char, extra_allowed_characters): if (ord('a') <= input_char_ord <= ord('z')) \ or (ord('A') <= input_char_ord <= ord('Z')) \ or (ord('0') <= input_char_ord <= ord('9')) \ - or (input_char in ['$', '_', '-', '.']) \ + or (input_char in ['@', '_', '-', '.']) \ or (input_char in extra_allowed_characters): result = input_char else: