diff --git a/.travis.yml b/.travis.yml index 328c8785..cfb99cd8 100644 --- a/.travis.yml +++ b/.travis.yml @@ -12,7 +12,7 @@ install: - pip install --quiet -r ci/requirements.txt - git clone https://github.com/"${TRAVIS_REPO_SLUG%%/*}"/qubes-builder ~/qubes-builder script: - - PYTHONPATH=test-packages pylint --rcfile=ci/pylintrc qubes qubespolicy + - PYTHONPATH=test-packages pylint --rcfile=ci/pylintrc qubes - ./run-tests --no-syslog - ~/qubes-builder/scripts/travis-build env: diff --git a/Makefile b/Makefile index c18b0dde..57a9eafb 100644 --- a/Makefile +++ b/Makefile @@ -190,7 +190,6 @@ endif cp qubes-rpc/qubes.GetRandomizedTime $(DESTDIR)/etc/qubes-rpc/ cp qubes-rpc/qubes.NotifyTools $(DESTDIR)/etc/qubes-rpc/ cp qubes-rpc/qubes.NotifyUpdates $(DESTDIR)/etc/qubes-rpc/ - cp qubes-rpc/policy.RegisterArgument $(DESTDIR)/etc/qubes-rpc/ install qubes-rpc/qubesd-query-fast $(DESTDIR)/usr/libexec/qubes/ install -m 0755 qvm-tools/qubes-bug-report $(DESTDIR)/usr/bin/qubes-bug-report install -m 0755 qvm-tools/qubes-hcl-report $(DESTDIR)/usr/bin/qubes-hcl-report diff --git a/ci/coveragerc b/ci/coveragerc index 8dccfa52..2d47c77e 100644 --- a/ci/coveragerc +++ b/ci/coveragerc @@ -1,3 +1,3 @@ [run] -source = qubes, qubespolicy -omit = qubes/tests/*, qubespolicy/tests/* +source = qubes +omit = qubes/tests/* diff --git a/ci/requirements.txt b/ci/requirements.txt index 001a61d1..311d186f 100644 --- a/ci/requirements.txt +++ b/ci/requirements.txt @@ -7,5 +7,4 @@ jinja2 lxml pylint sphinx -pydbus PyYAML diff --git a/doc/conf.py b/doc/conf.py index 7ae732aa..3c91f210 100644 --- a/doc/conf.py +++ b/doc/conf.py @@ -246,8 +246,6 @@ _man_pages_author = [] man_pages = [ ('manpages/qubesd-query', 'qubesd-query', u'Low-level qubesd interrogation tool', _man_pages_author, 1), - ('manpages/qrexec-policy-graph', 'qrexec-policy-graph', - u'Graph qrexec policy', _man_pages_author, 1), ] if os.path.exists('sandbox.rst'): diff --git a/doc/manpages/qrexec-policy-graph.rst b/doc/manpages/qrexec-policy-graph.rst deleted file mode 100644 index 6e95bea3..00000000 --- a/doc/manpages/qrexec-policy-graph.rst +++ /dev/null @@ -1,73 +0,0 @@ -.. program:: qrexec-policy-graph - -:program:`qrexec-policy-graph` -- Graph qrexec policy -===================================================== - -Synopsis --------- - -:command:`qrexec-policy-graph` [-h] [--include-ask] [--source *SOURCE* [*SOURCE* ...]] [--target *TARGET* [*TARGET* ...]] [--service *SERVICE* [*SERVICE* ...]] [--output *OUTPUT*] [--policy-dir POLICY_DIR] [--system-info SYSTEM_INFO] - - -Options -------- - -.. option:: --help, -h - - show this help message and exit - -.. option:: --include-ask - - Include `ask` action in graph. In most cases produce unreadable graphs - because many services contains `$anyvm $anyvm ask` rules. It's recommended to - limit graph using other options. - -.. option:: --source - - Limit graph to calls from *source*. You can specify multiple names. - -.. option:: --target - - Limit graph to calls to *target*. You can specify multiple names. - -.. option:: --service - - Limit graph to *service*. You can specify multiple names. This can be either - bare service name, or service with argument (joined with `+`). If bare - service name is given, output will contain also policies for specific - arguments. - -.. option:: --output - - Write to *output* instead of stdout. The file will be overwritten without - confirmation. - -.. option:: --policy-dir - - Look for policy in *policy-dir*. This can be useful to process policy - extracted from other system. This option adjust only base directory, if any - policy file contains `$include:path` with absolute path, it will try to load - the file from that location. - See also --system-info option. - -.. option:: --system-info - - Load system information from file instead of querying local qubesd instance. - The file should be in json format, as returned by `internal.GetSystemInfo` - qubesd method. This can be obtained by running in dom0: - - qubesd-query -e -c /var/run/qubesd.internal.sock dom0 \ - internal.GetSystemInfo dom0 | cut -b 3- - -.. option:: --skip-labels - - Do not include service names on the graph. Also, include only a single - connection between qubes if any service call is allowed there. - - -Authors -------- - -| Marek Marczykowski-Górecki - -.. vim: ts=3 sw=3 et tw=80 diff --git a/linux/system-config/Makefile b/linux/system-config/Makefile index ee047f12..865880f4 100644 --- a/linux/system-config/Makefile +++ b/linux/system-config/Makefile @@ -6,12 +6,7 @@ install: cp vif-route-qubes $(DESTDIR)/etc/xen/scripts cp block-snapshot $(DESTDIR)/etc/xen/scripts ln -s block-snapshot $(DESTDIR)/etc/xen/scripts/block-origin - install -d $(DESTDIR)/etc/xdg/autostart - install -m 0644 qrexec-policy-agent.desktop $(DESTDIR)/etc/xdg/autostart/ install -m 0644 -D tmpfiles-qubes.conf $(DESTDIR)/usr/lib/tmpfiles.d/qubes.conf - install -d $(DESTDIR)/etc/dbus-1/system.d - install -m 0644 dbus-org.qubesos.PolicyAgent.conf \ - $(DESTDIR)/etc/dbus-1/system.d/org.qubesos.PolicyAgent.conf install -d $(DESTDIR)/etc/logrotate.d install -m 0644 logrotate-qubes \ $(DESTDIR)/etc/logrotate.d/qubes diff --git a/linux/system-config/dbus-org.qubesos.PolicyAgent.conf b/linux/system-config/dbus-org.qubesos.PolicyAgent.conf deleted file mode 100644 index e1dd2b9d..00000000 --- a/linux/system-config/dbus-org.qubesos.PolicyAgent.conf +++ /dev/null @@ -1,19 +0,0 @@ - - - - - - - - - - - - - - - diff --git a/linux/system-config/qrexec-policy-agent.desktop b/linux/system-config/qrexec-policy-agent.desktop deleted file mode 100644 index 0e1cd8d3..00000000 --- a/linux/system-config/qrexec-policy-agent.desktop +++ /dev/null @@ -1,7 +0,0 @@ -[Desktop Entry] -Name=Qubes Qrexec Policy agent -Comment=Agent for handling policy confirmation prompts -Icon=qubes -Exec=qrexec-policy-agent -Terminal=false -Type=Application diff --git a/qubes-rpc/policy.RegisterArgument b/qubes-rpc/policy.RegisterArgument deleted file mode 100755 index e965ca14..00000000 --- a/qubes-rpc/policy.RegisterArgument +++ /dev/null @@ -1,127 +0,0 @@ -#!/usr/bin/env python3 -# -# The Qubes OS Project, https://www.qubes-os.org/ -# -# Copyright (C) 2017 Wojtek Porczyk -# -# This library is free software; you can redistribute it and/or -# modify it under the terms of the GNU Lesser General Public -# License as published by the Free Software Foundation; either -# version 2.1 of the License, or (at your option) any later version. -# -# This library is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# Lesser General Public License for more details. -# -# You should have received a copy of the GNU Lesser General Public -# License along with this library; if not, see . -# - -'''policy.RegisterArgument - -This qrexec is meant for services, which require some kind of "registering" -before use (say ``example.Register`` and ``example.Perform+ARGUMENT``). After -registering, the backend should invoke this call with frontend as the intended -destination, with the actual service in argument of this call and the argument -as the payload. The policy generated will be a single line with explicit -frontend and backend domain names, and a plain "allow", without further -qualifiers. - -The call allows for registering an argument only once, for one frontend domain. -There is not possibility of deregistering or reregistering for another frontend. -The backend can always register another argument for any frontend, including -one that is already registered for some other argument. - -By default this qrexec is disabled by policy. To actually use it you should -drop a policy for an exact call you want to register which will redirect the -call to dom0. - -.. code-block:: none - :caption: /etc/qubes-rpc/policy/policy.RegisterArgument+example.Perform - - backendvm $anyvm allow,target=dom0 - -It will generate, for argument ``EXAMPLE``: - -.. code-bloc:: none - :caption: /etc/qubes-rpc/policy/example.Perform+EXAMPLE - - frontendvm backendvm allow -''' - -import logging -import os -import string -import sys -import pathlib - -POLICY_PATH = pathlib.Path('/etc/qubes-rpc/policy') -POLICY_RULE = '{frontend} {backend} allow\n' - -# linux-utils/qrexec-lib/qrexec.h -MAX_ARGUMENT_LEN = 64 - -# core-admin-linux/qrexec/qrexec-daemon.c -VALID_CHARS = set(map(ord, string.ascii_letters + string.digits + '-._')) - -def die(*args, **kwargs): - logging.error(*args, **kwargs) - sys.exit(1) - -def main(): - # pylint: disable=missing-docstring - logging.basicConfig( - level=logging.WARNING, - filename='/var/log/qubes/policy-register.log', - format='%(asctime)s %(message)s') - - backend = os.environ['QREXEC_REMOTE_DOMAIN'] - frontend = os.environ['QREXEC_REQUESTED_TARGET'] - rpcname = os.environ['QREXEC_SERVICE_ARGUMENT'] - - logging.debug('%s %s → %s request, reading argument', - rpcname, frontend, backend) - - untrusted_argument = sys.stdin.buffer.read(MAX_ARGUMENT_LEN) - untrusted_overflow = sys.stdin.buffer.read(1) - sys.stdin.buffer.close() - - if untrusted_overflow: - die('%s: %s → %s request refused: argument too long', - rpcname, frontend, backend) - - if not untrusted_argument: - die('%s: %s → %s request refused: empty argument', - rpcname, frontend, backend) - - if any(c not in VALID_CHARS for c in untrusted_argument): - die('%s: %s → %s request refused: invalid argument', - rpcname, frontend, backend) - - # argument may also be too long, so that length of rpcname, separator and - # argument exceed 64 bytes, but that's fine, the call just wont work - - argument = untrusted_argument - del untrusted_argument - argument = argument.decode('ascii', errors='strict') - - filename = '{}+{}'.format(rpcname, argument) - logging.debug('%s %s → %s argument %s filename %s', - rpcname, frontend, backend, argument, filename) - - try: - # the 'x' enforces that argument cannot be registered twice - with open(str(POLICY_PATH / filename), 'x') as file: - rule = POLICY_RULE.format(frontend=frontend, backend=backend) - logging.warning('%s: %s → %s %s argument allowed', - rpcname, frontend, backend, argument) - logging.debug('%s: %s → %s %s adding rule %r', - rpcname, frontend, backend, rule) - file.write(rule) - - except FileExistsError: - die('%s: %s → %s %s argument failed: file exists') - -if __name__ == '__main__': - main() diff --git a/qubes/tests/__init__.py b/qubes/tests/__init__.py index 03a66da0..a7bc0952 100644 --- a/qubes/tests/__init__.py +++ b/qubes/tests/__init__.py @@ -1342,14 +1342,6 @@ def load_tests(loader, tests, pattern): # pylint: disable=unused-argument 'qubes.tests.api_admin', 'qubes.tests.api_misc', 'qubes.tests.api_internal', - 'qubespolicy.tests', - 'qubespolicy.tests.cli', - ): - tests.addTests(loader.loadTestsFromName(modname)) - - for modname in ( - 'qubespolicy.tests.gtkhelpers', - 'qubespolicy.tests.rpcconfirmation', ): tests.addTests(loader.loadTestsFromName(modname)) diff --git a/qubespolicy/__init__.py b/qubespolicy/__init__.py deleted file mode 100755 index 20544ed3..00000000 --- a/qubespolicy/__init__.py +++ /dev/null @@ -1,765 +0,0 @@ -# coding=utf-8 -# The Qubes OS Project, https://www.qubes-os.org/ -# -# Copyright (C) 2013-2015 Joanna Rutkowska -# Copyright (C) 2013-2017 Marek Marczykowski-Górecki -# -# -# This library is free software; you can redistribute it and/or -# modify it under the terms of the GNU Lesser General Public -# License as published by the Free Software Foundation; either -# version 2.1 of the License, or (at your option) any later version. -# -# This library is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# Lesser General Public License for more details. -# -# You should have received a copy of the GNU Lesser General Public -# License along with this library; if not, see . - -# pylint: disable=no-else-return,useless-object-inheritance,try-except-raise - -''' Qrexec policy parser and evaluator ''' -import enum -import itertools -import json -import os -import os.path -import socket -import subprocess - -# don't import 'qubes.config' please, it takes 0.3s -QREXEC_CLIENT = '/usr/lib/qubes/qrexec-client' -POLICY_DIR = '/etc/qubes-rpc/policy' -QUBESD_INTERNAL_SOCK = '/var/run/qubesd.internal.sock' -QUBESD_SOCK = '/var/run/qubesd.sock' - - -class AccessDenied(Exception): - ''' Raised when qrexec policy denied access ''' - - -class PolicySyntaxError(AccessDenied): - ''' Syntax error in qrexec policy, abort parsing ''' - def __init__(self, filename, lineno, msg): - super(PolicySyntaxError, self).__init__( - '{}:{}: {}'.format(filename, lineno, msg)) - -class PolicyNotFound(AccessDenied): - ''' Policy was not found for this service ''' - def __init__(self, service_name): - super(PolicyNotFound, self).__init__( - 'Policy not found for service {}'.format(service_name)) - - -class Action(enum.Enum): - ''' Action as defined by policy ''' - allow = 1 - deny = 2 - ask = 3 - -def is_special_value(value): - '''Check if given source/target specification is special (keyword) value - ''' - return value.startswith('@') - -def verify_target_value(system_info, value): - ''' Check if given value names valid target - - This function check if given value is not only syntactically correct, - but also if names valid service call target (existing domain, - or valid @dispvm like keyword) - - :param system_info: information about the system - :param value: value to be checked - ''' - if value == '@dispvm': - return True - elif value == '@adminvm': - return True - elif value.startswith('@dispvm:'): - dispvm_base = value.split(':', 1)[1] - if dispvm_base not in system_info['domains']: - return False - dispvm_base_info = system_info['domains'][dispvm_base] - return bool(dispvm_base_info['template_for_dispvms']) - else: - return value in system_info['domains'] - - -def verify_special_value(value, for_target=True, specific_target=False): - ''' - Verify if given special VM-specifier ('@...') is valid - - :param value: value to verify - :param for_target: should classify target-only values as valid ( - '@default', '@dispvm') - :param specific_target: allow only values naming specific target - (for use with target=, default= etc) - :return: True or False - ''' - # pylint: disable=too-many-return-statements - - # values used only for matching VMs, not naming specific one (for actual - # call target) - if not specific_target: - if value.startswith('@tag:') and len(value) > len('@tag:'): - return True - if value.startswith('@type:') and len(value) > len('@type:'): - return True - if for_target and value.startswith('@dispvm:@tag:') and \ - len(value) > len('@dispvm:@tag:'): - return True - if value == '@anyvm': - return True - if for_target and value == '@default': - return True - - # those can be used to name one specific call VM - if value == '@adminvm': - return True - # allow only specific dispvm, not based on any @xxx keyword - don't name - # @tag here specifically, to work also with any future keywords - if for_target and value.startswith('@dispvm:') and \ - not value.startswith('@dispvm:@'): - return True - if for_target and value == '@dispvm': - return True - return False - - -class PolicyRule(object): - ''' A single line of policy file ''' - def __init__(self, line, filename=None, lineno=None): - ''' - Load a single line of qrexec policy and check its syntax. - Do not verify existence of named objects. - - :raise PolicySyntaxError: when syntax error is found - - :param line: a single line of actual qrexec policy (not a comment, - empty line or @include) - :param filename: name of the file from which this line is loaded - :param lineno: line number from which this line is loaded - ''' - - self.lineno = lineno - self.filename = filename - - try: - self.source, self.target, self.full_action = line.split(maxsplit=2) - except ValueError: - raise PolicySyntaxError(filename, lineno, 'wrong number of fields') - - (action, *params) = self.full_action.replace(',', ' ').split() - try: - self.action = Action[action] - except KeyError: - raise PolicySyntaxError(filename, lineno, - 'invalid action: {}'.format(action)) - - #: alternative target, used instead of the one specified by the caller - self.override_target = None - - #: alternative user, used instead of vm.default_user - self.override_user = None - - #: default target when asking the user for confirmation - self.default_target = None - - for param in params: - try: - param_name, value = param.split('=') - except ValueError: - raise PolicySyntaxError(filename, lineno, - 'invalid action parameter syntax: {}'.format(param)) - if param_name == 'target': - if self.action == Action.deny: - raise PolicySyntaxError(filename, lineno, - 'target= option not allowed for deny action') - self.override_target = value - elif param_name == 'user': - if self.action == Action.deny: - raise PolicySyntaxError(filename, lineno, - 'user= option not allowed for deny action') - self.override_user = value - elif param_name == 'default_target': - if self.action != Action.ask: - raise PolicySyntaxError(filename, lineno, - 'default_target= option allowed only for ask action') - self.default_target = value - else: - raise PolicySyntaxError(filename, lineno, - 'invalid option {} for {} action'.format(param, action)) - - # verify special values - if is_special_value(self.source): - if not verify_special_value(self.source, False, False): - raise PolicySyntaxError(filename, lineno, - 'invalid source specification: {}'.format(self.source)) - - if is_special_value(self.target): - if not verify_special_value(self.target, True, False): - raise PolicySyntaxError(filename, lineno, - 'invalid target specification: {}'.format(self.target)) - - if self.target == '@default' \ - and self.action == Action.allow \ - and self.override_target is None: - raise PolicySyntaxError(filename, lineno, - 'allow action for @default rule must specify target= option') - - if self.override_target is not None: - if is_special_value(self.override_target) and \ - not verify_special_value(self.override_target, True, True): - raise PolicySyntaxError(filename, lineno, - 'target= option needs to name specific target') - - if self.default_target is not None: - if is_special_value(self.default_target) and \ - not verify_special_value(self.default_target, True, True): - raise PolicySyntaxError(filename, lineno, - 'target= option needs to name specific target') - - @staticmethod - def is_match_single(system_info, policy_value, value): - ''' - Evaluate if a single value (VM name or '@default') matches policy - specification - - :param system_info: information about the system - :param policy_value: value from qrexec policy (either self.source or - self.target) - :param value: value to be compared (source or target) - :return: True or False - ''' - # pylint: disable=too-many-return-statements - - # not specified target matches only with @default and @anyvm policy - # entry - if value == '@default': - return policy_value in ('@default', '@anyvm') - - # if specific target used, check if it's valid - # this function (is_match_single) is also used for checking call source - # values, but this isn't a problem, because it will always be a - # domain name (not @dispvm or such) - this is guaranteed by a nature - # of qrexec call - if not verify_target_value(system_info, value): - return False - - # handle @adminvm keyword - if policy_value == 'dom0': - # TODO: log a warning in Qubes 4.1 - policy_value = '@adminvm' - - if value == 'dom0': - value = '@adminvm' - - # allow any _valid_, non-dom0 target - if policy_value == '@anyvm': - return value != '@adminvm' - - # exact match, including @dispvm* and @adminvm - if value == policy_value: - return True - - # DispVM request, using tags to match - if policy_value.startswith('@dispvm:@tag:') \ - and value.startswith('@dispvm:'): - tag = policy_value.split(':', 2)[2] - dispvm_base = value.split(':', 1)[1] - # already checked for existence by verify_target_value call - dispvm_base_info = system_info['domains'][dispvm_base] - return tag in dispvm_base_info['tags'] - - # if @dispvm* not matched above, reject it; default DispVM (bare - # @dispvm) was resolved by the caller - if value.startswith('@dispvm:'): - return False - - # require @adminvm to be matched explicitly (not through @tag or @type) - # - if not matched already, reject it - if value == '@adminvm': - return False - - # at this point, value name a specific target - domain_info = system_info['domains'][value] - - if policy_value.startswith('@tag:'): - tag = policy_value.split(':', 1)[1] - return tag in domain_info['tags'] - - if policy_value.startswith('@type:'): - type_ = policy_value.split(':', 1)[1] - return type_ == domain_info['type'] - - return False - - def is_match(self, system_info, source, target): - ''' - Check if given (source, target) matches this policy line. - - :param system_info: information about the system - available VMs, - their types, labels, tags etc. as returned by - :py:func:`app_to_system_info` - :param source: name of the source VM - :param target: name of the target VM, or None if not specified - :return: True or False - ''' - - if not self.is_match_single(system_info, self.source, source): - return False - # @dispvm in policy matches _only_ @dispvm (but not @dispvm:some-vm, - # even if that would be the default one) - if self.target == '@dispvm' and target == '@dispvm': - return True - if target == '@dispvm': - # resolve default DispVM, to check all kinds of @dispvm:* - default_dispvm = system_info['domains'][source]['default_dispvm'] - if default_dispvm is None: - # if this VM have no default DispVM, match only with @anyvm - return self.target == '@anyvm' - target = '@dispvm:' + default_dispvm - if not self.is_match_single(system_info, self.target, target): - return False - return True - - def expand_target(self, system_info): - ''' - Return domains matching target of this policy line - - :param system_info: information about the system - :return: matching domains - ''' - - if self.target.startswith('@tag:'): - tag = self.target.split(':', 1)[1] - for name, domain in system_info['domains'].items(): - if tag in domain['tags']: - yield name - elif self.target.startswith('@type:'): - type_ = self.target.split(':', 1)[1] - for name, domain in system_info['domains'].items(): - if type_ == domain['type']: - yield name - elif self.target == '@anyvm': - for name, domain in system_info['domains'].items(): - if name != 'dom0': - yield name - if domain['template_for_dispvms']: - yield '@dispvm:' + name - yield '@dispvm' - elif self.target.startswith('@dispvm:@tag:'): - tag = self.target.split(':', 2)[2] - for name, domain in system_info['domains'].items(): - if tag in domain['tags']: - if domain['template_for_dispvms']: - yield '@dispvm:' + name - elif self.target.startswith('@dispvm:'): - dispvm_base = self.target.split(':', 1)[1] - try: - if system_info['domains'][dispvm_base]['template_for_dispvms']: - yield self.target - except KeyError: - # TODO log a warning? - pass - elif self.target == '@adminvm': - yield self.target - elif self.target == '@dispvm': - yield self.target - else: - if self.target in system_info['domains']: - yield self.target - - def expand_override_target(self, system_info, source): - ''' - Replace '@dispvm' with specific '@dispvm:...' value, based on qrexec - call source. - - :param system_info: System information - :param source: Source domain name - :return: :py:attr:`override_target` with '@dispvm' substituted - ''' - if self.override_target == '@dispvm': - if system_info['domains'][source]['default_dispvm'] is None: - return None - return '@dispvm:' + system_info['domains'][source]['default_dispvm'] - else: - return self.override_target - - -class PolicyAction(object): - ''' Object representing positive policy evaluation result - - either ask or allow action ''' - def __init__(self, service, source, target, rule, original_target, - targets_for_ask=None): - #: service name - self.service = service - #: calling domain - self.source = source - #: target domain the service should be connected to, None if - # not chosen yet - if targets_for_ask is None or target in targets_for_ask: - self.target = target - else: - # TODO: log a warning? - self.target = None - #: original target specified by the caller - self.original_target = original_target - #: targets for the user to choose from - self.targets_for_ask = targets_for_ask - #: policy rule from which this action is derived - self.rule = rule - if rule.action == Action.deny: - # this should be really rejected by Policy.eval() - raise AccessDenied( - 'denied by policy {}:{}'.format(rule.filename, rule.lineno)) - if rule.action == Action.ask: - if targets_for_ask is None: - raise AccessDenied( - 'invalid policy {}:{}'.format(rule.filename, rule.lineno)) - elif rule.action == Action.allow: - if targets_for_ask is not None or target is None: - raise AccessDenied( - 'invalid policy {}:{}'.format(rule.filename, rule.lineno)) - self.action = rule.action - - def handle_user_response(self, response, target=None): - ''' - Handle user response for the 'ask' action - - :param response: whether the call was allowed or denied (bool) - :param target: target chosen by the user (if reponse==True) - :return: None - ''' - assert self.action == Action.ask - if response: - assert target in self.targets_for_ask - self.target = target - self.action = Action.allow - else: - self.action = Action.deny - raise AccessDenied( - 'denied by the user {}:{}'.format(self.rule.filename, - self.rule.lineno)) - - def execute(self, caller_ident): - ''' Execute allowed service call - - :param caller_ident: Service caller ident - (`process_ident,source_name, source_id`) - ''' - assert self.action == Action.allow - assert self.target is not None - - if self.target == '@adminvm': - self.target = 'dom0' - if self.target == 'dom0': - original_target_type = \ - 'keyword' if is_special_value(self.original_target) else 'name' - original_target = self.original_target.lstrip('@') - cmd = \ - 'QUBESRPC {service} {source} {original_target_type} ' \ - '{original_target}'.format( - service=self.service, - source=self.source, - original_target_type=original_target_type, - original_target=original_target) - else: - cmd = '{user}:QUBESRPC {service} {source}'.format( - user=(self.rule.override_user or 'DEFAULT'), - service=self.service, - source=self.source) - if self.target.startswith('@dispvm:'): - target = self.spawn_dispvm() - dispvm = True - else: - target = self.target - dispvm = False - self.ensure_target_running() - qrexec_opts = ['-d', target, '-c', caller_ident] - if dispvm: - qrexec_opts.append('-W') - # XXX remove when #951 gets fixed - if self.source == target: - raise AccessDenied('loopback qrexec connection not supported') - try: - subprocess.call([QREXEC_CLIENT] + qrexec_opts + [cmd]) - finally: - if dispvm: - self.cleanup_dispvm(target) - - - def spawn_dispvm(self): - ''' - Create and start Disposable VM based on AppVM specified in - :py:attr:`target` - :return: name of new Disposable VM - ''' - base_appvm = self.target.split(':', 1)[1] - dispvm_name = qubesd_call(base_appvm, 'admin.vm.CreateDisposable') - dispvm_name = dispvm_name.decode('ascii') - qubesd_call(dispvm_name, 'admin.vm.Start') - return dispvm_name - - def ensure_target_running(self): - ''' - Start domain if not running already - - :return: None - ''' - if self.target == 'dom0': - return - try: - qubesd_call(self.target, 'admin.vm.Start') - except QubesMgmtException as e: - if e.exc_type == 'QubesVMNotHaltedError': - pass - else: - raise - - @staticmethod - def cleanup_dispvm(dispvm): - ''' - Kill and remove Disposable VM - - :param dispvm: name of Disposable VM - :return: None - ''' - qubesd_call(dispvm, 'admin.vm.Kill') - - -class Policy(object): - ''' Full policy for a given service - - Usage: - >>> system_info = get_system_info() - >>> policy = Policy('some-service') - >>> action = policy.evaluate(system_info, 'source-name', 'target-name') - >>> if action.action == Action.ask: - >>> # ... ask the user, see action.targets_for_ask ... - >>> action.handle_user_response(response, target_chosen_by_user) - >>> action.execute('process-ident') - - ''' - - def __init__(self, service, policy_dir=POLICY_DIR): - policy_file = os.path.join(policy_dir, service) - if not os.path.exists(policy_file): - # fallback to policy without specific argument set (if any) - policy_file = os.path.join(policy_dir, service.split('+')[0]) - if not os.path.exists(policy_file): - raise PolicyNotFound(service) - - #: policy storage directory - self.policy_dir = policy_dir - - #: service name - self.service = service - - #: list of PolicyLine objects - self.policy_rules = [] - try: - self.load_policy_file(policy_file) - except OSError as e: - raise AccessDenied( - 'failed to load {} file: {!s}'.format(e.filename, e)) - - def load_policy_file(self, path): - ''' Load policy file and append rules to :py:attr:`policy_rules` - - :param path: file to load - ''' - with open(path) as policy_file: - for lineno, line in zip(itertools.count(start=1), - policy_file.readlines()): - line = line.strip() - # compatibility with old keywords notation - line = line.replace('$', '@') - if not line: - # skip empty lines - continue - if line[0] == '#': - # skip comments - continue - if line.startswith('@include:'): - include_path = line.split(':', 1)[1] - # os.path.join will leave include_path unchanged if it's - # already absolute - include_path = os.path.join(self.policy_dir, include_path) - self.load_policy_file(include_path) - else: - self.policy_rules.append(PolicyRule(line, path, lineno)) - - def find_matching_rule(self, system_info, source, target): - ''' Find the first rule matching given arguments ''' - - for rule in self.policy_rules: - if rule.is_match(system_info, source, target): - return rule - raise AccessDenied('no matching rule found') - - - def collect_targets_for_ask(self, system_info, source): - ''' Collect targets the user can choose from in 'ask' action - - Word 'targets' is used intentionally instead of 'domains', because it - can also contains @dispvm like keywords. - ''' - targets = set() - - # iterate over rules in reversed order to easier handle 'deny' - # actions - simply remove matching domains from allowed set - for rule in reversed(self.policy_rules): - if rule.is_match_single(system_info, rule.source, source): - if rule.action == Action.deny: - targets -= set(rule.expand_target(system_info)) - else: - if rule.override_target is not None: - override_target = rule.expand_override_target( - system_info, source) - if verify_target_value(system_info, override_target): - targets.add(rule.override_target) - else: - targets.update(rule.expand_target(system_info)) - - # expand default DispVM - if '@dispvm' in targets: - targets.remove('@dispvm') - if system_info['domains'][source]['default_dispvm'] is not None: - dispvm = '@dispvm:' + \ - system_info['domains'][source]['default_dispvm'] - if verify_target_value(system_info, dispvm): - targets.add(dispvm) - - # expand other keywords - if '@adminvm' in targets: - targets.remove('@adminvm') - targets.add('dom0') - - # XXX remove when #951 gets fixed - if source in targets: - targets.remove(source) - - return targets - - def evaluate(self, system_info, source, target): - ''' Evaluate policy - - :raise AccessDenied: when action should be denied unconditionally - - :return tuple(rule, considered_targets) - where considered targets is a - list of possible targets for 'ask' action (rule.action == Action.ask) - ''' - if target == '': - target = '@default' - rule = self.find_matching_rule(system_info, source, target) - if rule.action == Action.deny: - raise AccessDenied( - 'denied by policy {}:{}'.format(rule.filename, rule.lineno)) - - if rule.override_target is not None: - override_target = rule.expand_override_target(system_info, source) - if not verify_target_value(system_info, override_target): - raise AccessDenied('invalid target= value in {}:{}'.format( - rule.filename, rule.lineno)) - actual_target = override_target - else: - actual_target = target - - if rule.action == Action.ask: - if rule.override_target is not None: - targets = [actual_target] - else: - targets = list( - self.collect_targets_for_ask(system_info, source)) - if not targets: - raise AccessDenied( - 'policy define \'ask\' action at {}:{} but no target is ' - 'available to choose from'.format( - rule.filename, rule.lineno)) - return PolicyAction(self.service, source, rule.default_target, - rule, target, targets) - elif rule.action == Action.allow: - if actual_target == '@default': - raise AccessDenied( - 'policy define \'allow\' action at {}:{} but no target is ' - 'specified by caller or policy'.format( - rule.filename, rule.lineno)) - if actual_target == '@dispvm': - if system_info['domains'][source]['default_dispvm'] is None: - raise AccessDenied( - 'policy define \'allow\' action to @dispvm at {}:{} ' - 'but no DispVM base is set for this VM'.format( - rule.filename, rule.lineno)) - actual_target = '@dispvm:' + \ - system_info['domains'][source]['default_dispvm'] - - return PolicyAction(self.service, source, - actual_target, rule, target) - else: - # should be unreachable - raise AccessDenied( - 'invalid action?! {}:{}'.format(rule.filename, rule.lineno)) - - -class QubesMgmtException(Exception): - ''' Exception returned by qubesd ''' - def __init__(self, exc_type): - super(QubesMgmtException, self).__init__() - self.exc_type = exc_type - - -def qubesd_call(dest, method, arg=None, payload=None): - if method.startswith('internal.'): - socket_path = QUBESD_INTERNAL_SOCK - else: - socket_path = QUBESD_SOCK - try: - client_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - client_socket.connect(socket_path) - except IOError: - # TODO: - raise - - # src, method, dest, arg - for call_arg in ('dom0', method, dest, arg): - if call_arg is not None: - client_socket.sendall(call_arg.encode('ascii')) - client_socket.sendall(b'\0') - if payload is not None: - client_socket.sendall(payload) - - client_socket.shutdown(socket.SHUT_WR) - - return_data = client_socket.makefile('rb').read() - if return_data.startswith(b'0\x00'): - return return_data[2:] - elif return_data.startswith(b'2\x00'): - (_, exc_type, _traceback, _format_string, _args) = \ - return_data.split(b'\x00', 4) - raise QubesMgmtException(exc_type.decode('ascii')) - else: - raise AssertionError( - 'invalid qubesd response: {!r}'.format(return_data)) - - -def get_system_info(): - ''' Get system information - - This retrieve information necessary to process qrexec policy. Returned - data is nested dict structure with this structure: - - - domains: - - ``: - - tags: list of tags - - type: domain type - - template_for_dispvms: should DispVM based on this VM be allowed - - default_dispvm: name of default AppVM for DispVMs started from here - - ''' - - system_info = qubesd_call('dom0', 'internal.GetSystemInfo') - return json.loads(system_info.decode('utf-8')) diff --git a/qubespolicy/agent.py b/qubespolicy/agent.py deleted file mode 100644 index 661ec8a1..00000000 --- a/qubespolicy/agent.py +++ /dev/null @@ -1,89 +0,0 @@ -# -*- encoding: utf8 -*- -# -# The Qubes OS Project, http://www.qubes-os.org -# -# Copyright (C) 2017 Marek Marczykowski-Górecki -# -# -# This library is free software; you can redistribute it and/or -# modify it under the terms of the GNU Lesser General Public -# License as published by the Free Software Foundation; either -# version 2.1 of the License, or (at your option) any later version. -# -# This library is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# Lesser General Public License for more details. -# -# You should have received a copy of the GNU Lesser General Public -# License along with this library; if not, see . - - -''' Agent running in user session, responsible for asking the user about policy -decisions.''' - -import pydbus -# pylint: disable=import-error,wrong-import-position -import gi -gi.require_version('Gtk', '3.0') -from gi.repository import GLib -# pylint: enable=import-error - -import qubespolicy.rpcconfirmation -import qubespolicy.policycreateconfirmation -# pylint: enable=wrong-import-position - -class PolicyAgent: - # pylint: disable=too-few-public-methods - dbus = """ - - - - - - - - - - - - - - - - - - """ - - @staticmethod - def Ask(source, service_name, targets, default_target, - icons): - # pylint: disable=invalid-name - entries_info = {} - for entry in icons: - entries_info[entry] = {} - entries_info[entry]['icon'] = icons.get(entry, None) - - response = qubespolicy.rpcconfirmation.confirm_rpc( - entries_info, source, service_name, - targets, default_target or None) - return response or '' - - @staticmethod - def ConfirmPolicyCreate(source, service_name): - # pylint: disable=invalid-name - - response = qubespolicy.policycreateconfirmation.confirm( - source, service_name) - return response - -def main(): - loop = GLib.MainLoop() - bus = pydbus.SystemBus() - obj = PolicyAgent() - bus.publish('org.qubesos.PolicyAgent', obj) - loop.run() - - -if __name__ == '__main__': - main() diff --git a/qubespolicy/cli.py b/qubespolicy/cli.py deleted file mode 100644 index 9c08d783..00000000 --- a/qubespolicy/cli.py +++ /dev/null @@ -1,144 +0,0 @@ -# -*- encoding: utf8 -*- -# -# The Qubes OS Project, http://www.qubes-os.org -# -# Copyright (C) 2017 Marek Marczykowski-Górecki -# -# -# This library is free software; you can redistribute it and/or -# modify it under the terms of the GNU Lesser General Public -# License as published by the Free Software Foundation; either -# version 2.1 of the License, or (at your option) any later version. -# -# This library is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# Lesser General Public License for more details. -# -# You should have received a copy of the GNU Lesser General Public -# License along with this library; if not, see . -import argparse -import logging -import logging.handlers -import os - -import sys - -import qubespolicy - -parser = argparse.ArgumentParser(description="Evaluate qrexec policy") - -parser.add_argument("--assume-yes-for-ask", action="store_true", - dest="assume_yes_for_ask", default=False, - help="Allow run of service without confirmation if policy say 'ask'") -parser.add_argument("--just-evaluate", action="store_true", - dest="just_evaluate", default=False, - help="Do not run the service, only evaluate policy; " - "retcode=0 means 'allow'") -parser.add_argument('domain_id', metavar='src-domain-id', - help='Source domain ID (Xen ID or similar, not Qubes ID)') -parser.add_argument('domain', metavar='src-domain-name', - help='Source domain name') -parser.add_argument('target', metavar='dst-domain-name', - help='Target domain name') -parser.add_argument('service_name', metavar='service-name', - help='Service name') -parser.add_argument('process_ident', metavar='process-ident', - help='Qrexec process identifier - for connecting data channel') - - -def create_default_policy(service_name): - policy_file = os.path.join(qubespolicy.POLICY_DIR, service_name) - with open(policy_file, "w") as policy: - policy.write( - "## Policy file automatically created on first service call.\n") - policy.write( - "## Feel free to edit.\n") - policy.write("## Note that policy parsing stops at the first match\n") - policy.write("\n") - policy.write("## Please use a single # to start your custom comments\n") - policy.write("\n") - policy.write("@anyvm @anyvm ask\n") - - -def main(args=None): - args = parser.parse_args(args) - - # Add source domain information, required by qrexec-client for establishing - # connection - caller_ident = args.process_ident + "," + args.domain + "," + args.domain_id - log = logging.getLogger('qubespolicy') - log.setLevel(logging.INFO) - if not log.handlers: - handler = logging.handlers.SysLogHandler(address='/dev/log') - log.addHandler(handler) - log_prefix = 'qrexec: {}: {} -> {}:'.format( - args.service_name, args.domain, args.target) - try: - system_info = qubespolicy.get_system_info() - except qubespolicy.QubesMgmtException as e: - log.error('%s error getting system info: %s', log_prefix, str(e)) - return 1 - try: - try: - policy = qubespolicy.Policy(args.service_name) - except qubespolicy.PolicyNotFound: - service_name = args.service_name.split('+')[0] - import pydbus - bus = pydbus.SystemBus() - proxy = bus.get('org.qubesos.PolicyAgent', - '/org/qubesos/PolicyAgent') - create_policy = proxy.ConfirmPolicyCreate( - args.domain, service_name) - if create_policy: - create_default_policy(service_name) - policy = qubespolicy.Policy(args.service_name) - else: - raise - - action = policy.evaluate(system_info, args.domain, args.target) - if args.assume_yes_for_ask and action.action == qubespolicy.Action.ask: - action.action = qubespolicy.Action.allow - if args.just_evaluate: - return { - qubespolicy.Action.allow: 0, - qubespolicy.Action.deny: 1, - qubespolicy.Action.ask: 1, - }[action.action] - if action.action == qubespolicy.Action.ask: - # late import to save on time for allow/deny actions - import pydbus - bus = pydbus.SystemBus() - proxy = bus.get('org.qubesos.PolicyAgent', - '/org/qubesos/PolicyAgent') - - icons = {name: system_info['domains'][name]['icon'] - for name in system_info['domains'].keys()} - for dispvm_base in system_info['domains']: - if not (system_info['domains'][dispvm_base] - ['template_for_dispvms']): - continue - dispvm_api_name = '@dispvm:' + dispvm_base - icons[dispvm_api_name] = \ - system_info['domains'][dispvm_base]['icon'] - icons[dispvm_api_name] = \ - icons[dispvm_api_name].replace('app', 'disp') - - response = proxy.Ask(args.domain, args.service_name, - action.targets_for_ask, action.target or '', icons) - if response: - action.handle_user_response(True, response) - else: - action.handle_user_response(False) - log.info('%s allowed to %s', log_prefix, str(action.target)) - action.execute(caller_ident) - except qubespolicy.PolicySyntaxError as e: - log.error('%s error loading policy: %s', log_prefix, str(e)) - return 1 - except qubespolicy.AccessDenied as e: - log.info('%s denied: %s', log_prefix, str(e)) - return 1 - return 0 - -if __name__ == '__main__': - sys.exit(main()) diff --git a/qubespolicy/glade/PolicyCreateConfirmationWindow.glade b/qubespolicy/glade/PolicyCreateConfirmationWindow.glade deleted file mode 100644 index 14e1994f..00000000 --- a/qubespolicy/glade/PolicyCreateConfirmationWindow.glade +++ /dev/null @@ -1,141 +0,0 @@ - - - - - - False - Default service policy - dialog-warning - dialog - question - ok-cancel - - - False - vertical - - - False - end - - - False - True - 0 - - - - - True - False - vertical - 2 - - - True - False - Policy for requested service does not exist. -Do you want to create default one (ask for everything)? - - - False - True - 2 - 0 - - - - - True - False - 2 - 2 - - - True - False - 4 - Source: - - - 0 - 0 - - - - - True - False - 4 - Service: - - - 0 - 1 - - - - - True - False - True - False - - - 1 - 0 - - - - - True - False - True - False - - - 1 - 1 - - - - - True - False - Type capital YES to confirm: - - - 0 - 2 - - - - - True - True - True - False - - - 1 - 2 - - - - - False - False - 1 - - - - - False - True - 1 - - - - - - diff --git a/qubespolicy/glade/RPCConfirmationWindow.glade b/qubespolicy/glade/RPCConfirmationWindow.glade deleted file mode 100644 index 0c0ccba5..00000000 --- a/qubespolicy/glade/RPCConfirmationWindow.glade +++ /dev/null @@ -1,359 +0,0 @@ - - - - - - 400 - False - Operation execution - center - dialog-question - dialog - True - center - - - True - False - vertical - - - True - False - True - error - True - - - False - 6 - end - - - - - - False - False - 0 - - - - - False - 16 - - - True - False - gtk-dialog-error - - - False - True - 0 - - - - - True - False - ErrorMessage - - - False - True - 1 - - - - - False - False - 0 - - - - - False - True - 0 - - - - - 100 - 80 - True - False - 12 - 12 - 12 - 12 - True - vertical - 6 - - - True - False - 6 - end - - - gtk-cancel - True - True - True - True - - - True - True - 0 - - - - - gtk-ok - True - False - True - True - True - True - - - True - True - 1 - - - - - False - True - end - 1 - - - - - True - False - vertical - 6 - - - True - False - 6 - 12 - - - True - False - gtk-dialog-question - 6 - - - False - True - 0 - - - - - True - False - start - Do you want to allow the following operation? -<small>Select the target domain and confirm with 'OK'</small> - True - - - True - True - 1 - - - - - False - True - 0 - - - - - True - False - 12 - 6 - 12 - 6 - - - True - False - 1 - Target: - - - 0 - 2 - - - - - True - False - True - True - - - True - 5 - False - Start typing or use the arrow - GTK_INPUT_HINT_WORD_COMPLETION | GTK_INPUT_HINT_NONE - - - - - 1 - 2 - - - - - True - False - 1 - Source: - - - 0 - 0 - - - - - True - False - False - source - False - - - 1 - 0 - - - - - True - False - 1 - Operation: - - - 0 - 1 - - - - - True - False - 0 - qubes.<b>MyOperation</b> - True - - - 1 - 1 - - - - - False - True - 1 - - - - - True - True - - - True - False - 6 - vertical - 6 - - - Display templates in the target list - True - True - False - 0 - True - - - False - True - 0 - - - - - Choose a custom destination in the target - True - True - False - 0 - True - - - False - True - 1 - - - - - - - False - Advanced options - True - - - - - False - True - 2 - - - - - True - True - 2 - - - - - False - True - 1 - - - - - - diff --git a/qubespolicy/graph.py b/qubespolicy/graph.py deleted file mode 100644 index 90136e5b..00000000 --- a/qubespolicy/graph.py +++ /dev/null @@ -1,121 +0,0 @@ -# -*- encoding: utf8 -*- -# -# The Qubes OS Project, http://www.qubes-os.org -# -# Copyright (C) 2017 Marek Marczykowski-Górecki -# -# -# This library is free software; you can redistribute it and/or -# modify it under the terms of the GNU Lesser General Public -# License as published by the Free Software Foundation; either -# version 2.1 of the License, or (at your option) any later version. -# -# This library is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# Lesser General Public License for more details. -# -# You should have received a copy of the GNU Lesser General Public -# License along with this library; if not, see . - -import argparse -import json -import os - -import sys - -import qubespolicy - -parser = argparse.ArgumentParser(description='Graph qrexec policy') -parser.add_argument('--include-ask', action='store_true', - help='Include `ask` action in graph') -parser.add_argument('--source', action='store', nargs='+', - help='Limit graph to calls from *source*') -parser.add_argument('--target', action='store', nargs='+', - help='Limit graph to calls to *target*') -parser.add_argument('--service', action='store', nargs='+', - help='Limit graph to *service*') -parser.add_argument('--output', action='store', - help='Write to *output* instead of stdout') -parser.add_argument('--policy-dir', action='store', - default=qubespolicy.POLICY_DIR, - help='Look for policy in *policy-dir*') -parser.add_argument('--system-info', action='store', - help='Load system information from file instead of querying qubesd') -parser.add_argument('--skip-labels', action='store_true', - help='Do not include service names on the graph, also deduplicate ' - 'connections.') - -def handle_single_action(args, action): - '''Get single policy action and output (or not) a line to add''' - if args.skip_labels: - service = '' - else: - service = action.service - target = action.target or action.original_target - # handle forced target= - if action.rule.override_target: - target = action.rule.override_target - if args.target and target not in args.target: - return '' - if action.action == qubespolicy.Action.ask: - if args.include_ask: - return ' "{}" -> "{}" [label="{}" color=orange];\n'.format( - action.source, target, service) - elif action.action == qubespolicy.Action.allow: - return ' "{}" -> "{}" [label="{}" color=red];\n'.format( - action.source, target, service) - return '' - -def main(args=None): - args = parser.parse_args(args) - - output = sys.stdout - if args.output: - output = open(args.output, 'w') - - if args.system_info: - with open(args.system_info) as f_system_info: - system_info = json.load(f_system_info) - else: - system_info = qubespolicy.get_system_info() - - sources = list(system_info['domains'].keys()) - if args.source: - sources = args.source - - targets = list(system_info['domains'].keys()) - targets.append('@dispvm') - targets.extend('@dispvm:' + dom for dom in system_info['domains'] - if system_info['domains'][dom]['template_for_dispvms']) - - connections = set() - - output.write('digraph g {\n') - for service in os.listdir(args.policy_dir): - if os.path.isdir(os.path.join(args.policy_dir, service)): - continue - if args.service and service not in args.service and \ - not any(service.startswith(srv + '+') for srv in args.service): - continue - - policy = qubespolicy.Policy(service, args.policy_dir) - for source in sources: - for target in targets: - try: - action = policy.evaluate(system_info, source, target) - line = handle_single_action(args, action) - if line in connections: - continue - if line: - output.write(line) - connections.add(line) - except qubespolicy.AccessDenied: - continue - - output.write('}\n') - if args.output: - output.close() - -if __name__ == '__main__': - sys.exit(main()) diff --git a/qubespolicy/gtkhelpers.py b/qubespolicy/gtkhelpers.py deleted file mode 100644 index b53ec002..00000000 --- a/qubespolicy/gtkhelpers.py +++ /dev/null @@ -1,277 +0,0 @@ -#!/usr/bin/python -# -# The Qubes OS Project, https://www.qubes-os.org/ -# -# Copyright (C) 2017 boring-stuff -# -# This library is free software; you can redistribute it and/or -# modify it under the terms of the GNU Lesser General Public -# License as published by the Free Software Foundation; either -# version 2.1 of the License, or (at your option) any later version. -# -# This library is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# Lesser General Public License for more details. -# -# You should have received a copy of the GNU Lesser General Public -# License along with this library; if not, see . -# - -import itertools -# pylint: disable=import-error,wrong-import-position -import gi -gi.require_version('Gtk', '3.0') -from gi.repository import Gtk, Gdk, GdkPixbuf, GObject, GLib -# pylint: enable=import-error - -from qubespolicy.utils import sanitize_domain_name -# pylint: enable=wrong-import-position - -class VMListModeler: - def __init__(self, domains_info=None): - self._entries = {} - self._domains_info = domains_info - self._icons = {} - self._icon_size = 16 - self._theme = Gtk.IconTheme.get_default() - self._create_entries() - - def _get_icon(self, name): - if name not in self._icons: - try: - icon = self._theme.load_icon(name, self._icon_size, 0) - except GLib.Error: # pylint: disable=catching-non-exception - icon = self._theme.load_icon("edit-find", self._icon_size, 0) - - self._icons[name] = icon - - return self._icons[name] - - def _create_entries(self): - for name, vm in self._domains_info.items(): - if name.startswith('@dispvm:'): - vm_name = name[len('@dispvm:'):] - dispvm = True - else: - vm_name = name - dispvm = False - sanitize_domain_name(vm_name, assert_sanitized=True) - - icon = self._get_icon(vm.get('icon', None)) - - if dispvm: - display_name = 'Disposable VM ({})'.format(vm_name) - else: - display_name = vm_name - self._entries[display_name] = { - 'api_name': name, - 'icon': icon, - 'vm': vm} - - def _get_valid_qube_name(self, combo, entry_box, whitelist): - name = None - - if combo and combo.get_active_id(): - selected = combo.get_active_id() - - if selected in self._entries and \ - self._entries[selected]['api_name'] in whitelist: - name = selected - - if not name and entry_box: - typed = entry_box.get_text() - - if typed in self._entries and \ - self._entries[typed]['api_name'] in whitelist: - name = typed - - return name - - def _combo_change(self, selection_trigger, combo, entry_box, whitelist): - data = None - name = self._get_valid_qube_name(combo, entry_box, whitelist) - - if name: - entry = self._entries[name] - - data = entry['api_name'] - - if entry_box: - entry_box.set_icon_from_pixbuf( - Gtk.EntryIconPosition.PRIMARY, entry['icon']) - else: - if entry_box: - entry_box.set_icon_from_stock( - Gtk.EntryIconPosition.PRIMARY, "gtk-find") - - if selection_trigger: - selection_trigger(data) - - def _entry_activate(self, activation_trigger, combo, entry_box, whitelist): - name = self._get_valid_qube_name(combo, entry_box, whitelist) - - if name: - activation_trigger(entry_box) - - def apply_model(self, destination_object, vm_list, - selection_trigger=None, activation_trigger=None): - if isinstance(destination_object, Gtk.ComboBox): - list_store = Gtk.ListStore(int, str, GdkPixbuf.Pixbuf, str) - - for entry_no, display_name in zip(itertools.count(), - sorted(self._entries)): - entry = self._entries[display_name] - if entry['api_name'] in vm_list: - list_store.append([ - entry_no, - display_name, - entry['icon'], - entry['api_name'], - ]) - - destination_object.set_model(list_store) - destination_object.set_id_column(1) - - icon_column = Gtk.CellRendererPixbuf() - destination_object.pack_start(icon_column, False) - destination_object.add_attribute(icon_column, "pixbuf", 2) - destination_object.set_entry_text_column(1) - - if destination_object.get_has_entry(): - entry_box = destination_object.get_child() - - area = Gtk.CellAreaBox() - area.pack_start(icon_column, False, False, False) - area.add_attribute(icon_column, "pixbuf", 2) - - completion = Gtk.EntryCompletion.new_with_area(area) - completion.set_inline_selection(True) - completion.set_inline_completion(True) - completion.set_popup_completion(True) - completion.set_popup_single_match(False) - completion.set_model(list_store) - completion.set_text_column(1) - - entry_box.set_completion(completion) - if activation_trigger: - entry_box.connect("activate", - lambda entry: self._entry_activate( - activation_trigger, - destination_object, - entry, - vm_list)) - - # A Combo with an entry has a text column already - text_column = destination_object.get_cells()[0] - destination_object.reorder(text_column, 1) - else: - entry_box = None - - text_column = Gtk.CellRendererText() - destination_object.pack_start(text_column, False) - destination_object.add_attribute(text_column, "text", 1) - - changed_function = lambda combo: self._combo_change( - selection_trigger, - combo, - entry_box, - vm_list) - - destination_object.connect("changed", changed_function) - changed_function(destination_object) - - else: - raise TypeError( - "Only expecting Gtk.ComboBox objects to want our model.") - - def apply_icon(self, entry, qube_name): - if isinstance(entry, Gtk.Entry): - if qube_name in self._entries: - entry.set_icon_from_pixbuf( - Gtk.EntryIconPosition.PRIMARY, - self._entries[qube_name]['icon']) - else: - raise ValueError("The specified source qube does not exist!") - else: - raise TypeError( - "Only expecting Gtk.Entry objects to want our icon.") - - -class GtkOneTimerHelper: - # pylint: disable=too-few-public-methods - def __init__(self, wait_seconds): - self._wait_seconds = wait_seconds - self._current_timer_id = 0 - self._timer_completed = False - - def _invalidate_timer_completed(self): - self._timer_completed = False - - def _invalidate_current_timer(self): - self._current_timer_id += 1 - - def _timer_check_run(self, timer_id): - if self._current_timer_id == timer_id: - self._timer_run(timer_id) - self._timer_completed = True - else: - pass - - def _timer_run(self, timer_id): - raise NotImplementedError("Not yet implemented") - - def _timer_schedule(self): - self._invalidate_current_timer() - GObject.timeout_add(int(round(self._wait_seconds * 1000)), - self._timer_check_run, - self._current_timer_id) - - def _timer_has_completed(self): - return self._timer_completed - - -class FocusStealingHelper(GtkOneTimerHelper): - def __init__(self, window, target_button, wait_seconds=1): - GtkOneTimerHelper.__init__(self, wait_seconds) - self._window = window - self._target_button = target_button - - self._window.connect("window-state-event", self._window_state_event) - - self._target_sensitivity = False - self._target_button.set_sensitive(self._target_sensitivity) - - def _window_changed_focus(self, window_is_focused): - self._target_button.set_sensitive(False) - self._invalidate_timer_completed() - - if window_is_focused: - self._timer_schedule() - else: - self._invalidate_current_timer() - - def _window_state_event(self, window, event): - assert window == self._window, \ - 'Window state callback called with wrong window' - - changed_focus = event.changed_mask & Gdk.WindowState.FOCUSED - window_focus = event.new_window_state & Gdk.WindowState.FOCUSED - - if changed_focus: - self._window_changed_focus(window_focus != 0) - - # Propagate event further - return False - - def _timer_run(self, timer_id): - self._target_button.set_sensitive(self._target_sensitivity) - - def request_sensitivity(self, sensitivity): - if self._timer_has_completed() or not sensitivity: - self._target_button.set_sensitive(sensitivity) - - self._target_sensitivity = sensitivity - - def can_perform_action(self): - return self._timer_has_completed() diff --git a/qubespolicy/policycreateconfirmation.py b/qubespolicy/policycreateconfirmation.py deleted file mode 100644 index 3aeec822..00000000 --- a/qubespolicy/policycreateconfirmation.py +++ /dev/null @@ -1,82 +0,0 @@ -# -*- encoding: utf-8 -*- -# -# The Qubes OS Project, http://www.qubes-os.org -# -# Copyright (C) 2017 Marek Marczykowski-Górecki -# -# -# This library is free software; you can redistribute it and/or -# modify it under the terms of the GNU Lesser General Public -# License as published by the Free Software Foundation; either -# version 2.1 of the License, or (at your option) any later version. -# -# This library is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# Lesser General Public License for more details. -# -# You should have received a copy of the GNU Lesser General Public -# License along with this library; if not, see . - -import os - -import pkg_resources - -# pylint: disable=import-error,wrong-import-position -import gi -gi.require_version('Gtk', '3.0') -from gi.repository import Gtk -# pylint: enable=import-error - -class PolicyCreateConfirmationWindow: - # pylint: disable=too-few-public-methods - _source_file = pkg_resources.resource_filename('qubespolicy', - os.path.join('glade', "PolicyCreateConfirmationWindow.glade")) - _source_id = {'window': "PolicyCreateConfirmationWindow", - 'ok': "okButton", - 'cancel': "cancelButton", - 'source': "sourceEntry", - 'service': "serviceEntry", - 'confirm': "confirmEntry", - } - - def __init__(self, source, service): - self._gtk_builder = Gtk.Builder() - self._gtk_builder.add_from_file(self._source_file) - self._window = self._gtk_builder.get_object( - self._source_id['window']) - self._rpc_ok_button = self._gtk_builder.get_object( - self._source_id['ok']) - self._rpc_cancel_button = self._gtk_builder.get_object( - self._source_id['cancel']) - self._service_entry = self._gtk_builder.get_object( - self._source_id['service']) - self._source_entry = self._gtk_builder.get_object( - self._source_id['source']) - self._confirm_entry = self._gtk_builder.get_object( - self._source_id['confirm']) - - self._source_entry.set_text(source) - self._service_entry.set_text(service) - - # make OK button the default - ok_button = self._window.get_widget_for_response(Gtk.ResponseType.OK) - ok_button.set_can_default(True) - ok_button.grab_default() - - def run(self): - self._window.set_keep_above(True) - self._window.connect("delete-event", Gtk.main_quit) - self._window.show_all() - - response = self._window.run() - - self._window.hide() - if response == Gtk.ResponseType.OK: - return self._confirm_entry.get_text() == 'YES' - return False - -def confirm(source, service): - window = PolicyCreateConfirmationWindow(source, service) - - return window.run() diff --git a/qubespolicy/rpcconfirmation.py b/qubespolicy/rpcconfirmation.py deleted file mode 100644 index 0a89eb97..00000000 --- a/qubespolicy/rpcconfirmation.py +++ /dev/null @@ -1,212 +0,0 @@ -#!/usr/bin/python -# -# The Qubes OS Project, https://www.qubes-os.org/ -# -# Copyright (C) 2017 boring-stuff -# -# This library is free software; you can redistribute it and/or -# modify it under the terms of the GNU Lesser General Public -# License as published by the Free Software Foundation; either -# version 2.1 of the License, or (at your option) any later version. -# -# This library is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# Lesser General Public License for more details. -# -# You should have received a copy of the GNU Lesser General Public -# License along with this library; if not, see . -# -import os -from gi.repository import Gtk, Gdk, GLib # pylint: disable=import-error -import pkg_resources - -from qubespolicy.gtkhelpers import VMListModeler, FocusStealingHelper -from qubespolicy.utils import sanitize_domain_name, \ - sanitize_service_name - - -class RPCConfirmationWindow: - # pylint: disable=too-few-public-methods - _source_file = pkg_resources.resource_filename('qubespolicy', - os.path.join('glade', "RPCConfirmationWindow.glade")) - _source_id = {'window': "RPCConfirmationWindow", - 'ok': "okButton", - 'cancel': "cancelButton", - 'source': "sourceEntry", - 'rpc_label': "rpcLabel", - 'target': "TargetCombo", - 'error_bar': "ErrorBar", - 'error_message': "ErrorMessage", - } - - def _clicked_ok(self, source): - assert source is not None, \ - 'Called the clicked ok callback from no source object' - - if self._can_perform_action(): - self._confirmed = True - self._close() - - def _clicked_cancel(self, button): - assert button == self._rpc_cancel_button, \ - 'Called the clicked cancel callback through the wrong button' - - if self._can_perform_action(): - self._confirmed = False - self._close() - - def _key_pressed(self, window, key): - assert window == self._rpc_window, \ - 'Key pressed callback called with wrong window' - - if self._can_perform_action(): - if key.keyval == Gdk.KEY_Escape: - self._confirmed = False - self._close() - - def _update_ok_button_sensitivity(self, data): - valid = (data is not None) - - if valid: - self._target_name = data - else: - self._target_name = None - - self._focus_helper.request_sensitivity(valid) - - def _show_error(self, error_message): - self._error_message.set_text(error_message) - self._error_bar.set_visible(True) - - def _close_error(self, error_bar, response): - assert error_bar == self._error_bar, \ - 'Closed the error bar with the wrong error bar as parameter' - assert response is not None, \ - 'Closed the error bar with None as a response' - - self._error_bar.set_visible(False) - - def _set_initial_target(self, source, target): - if target is not None: - if target == source: - self._show_error( - "Source and target domains must not be the same.") - else: - model = self._rpc_combo_box.get_model() - - found = False - for item in model: - if item[3] == target: - found = True - - self._rpc_combo_box.set_active_iter( - model.get_iter(item.path)) - - break - - if not found: - self._show_error("Domain '%s' doesn't exist." % target) - - def _can_perform_action(self): - return self._focus_helper.can_perform_action() - - @staticmethod - def _escape_and_format_rpc_text(rpc_operation): - escaped = GLib.markup_escape_text(rpc_operation) - - partitioned = escaped.partition('.') - formatted = partitioned[0] + partitioned[1] - - if partitioned[2]: - formatted += "" + partitioned[2] + "" - else: - formatted = "" + formatted + "" - - return formatted - - def _connect_events(self): - self._rpc_window.connect("key-press-event", self._key_pressed) - self._rpc_ok_button.connect("clicked", self._clicked_ok) - self._rpc_cancel_button.connect("clicked", self._clicked_cancel) - - self._error_bar.connect("response", self._close_error) - - def __init__(self, entries_info, source, rpc_operation, targets_list, - target=None): - sanitize_domain_name(source, assert_sanitized=True) - sanitize_service_name(source, assert_sanitized=True) - - self._gtk_builder = Gtk.Builder() - self._gtk_builder.add_from_file(self._source_file) - self._rpc_window = self._gtk_builder.get_object( - self._source_id['window']) - self._rpc_ok_button = self._gtk_builder.get_object( - self._source_id['ok']) - self._rpc_cancel_button = self._gtk_builder.get_object( - self._source_id['cancel']) - self._rpc_label = self._gtk_builder.get_object( - self._source_id['rpc_label']) - self._source_entry = self._gtk_builder.get_object( - self._source_id['source']) - self._rpc_combo_box = self._gtk_builder.get_object( - self._source_id['target']) - self._error_bar = self._gtk_builder.get_object( - self._source_id['error_bar']) - self._error_message = self._gtk_builder.get_object( - self._source_id['error_message']) - self._target_name = None - - self._focus_helper = self._new_focus_stealing_helper() - - self._rpc_label.set_markup( - self._escape_and_format_rpc_text(rpc_operation)) - - self._entries_info = entries_info - list_modeler = self._new_vm_list_modeler() - - list_modeler.apply_model(self._rpc_combo_box, targets_list, - selection_trigger=self._update_ok_button_sensitivity, - activation_trigger=self._clicked_ok) - - self._source_entry.set_text(source) - list_modeler.apply_icon(self._source_entry, source) - - self._confirmed = None - - self._set_initial_target(source, target) - - self._connect_events() - - def _close(self): - self._rpc_window.close() - - def _show(self): - self._rpc_window.set_keep_above(True) - self._rpc_window.connect("delete-event", Gtk.main_quit) - self._rpc_window.show_all() - - Gtk.main() - - def _new_vm_list_modeler(self): - return VMListModeler(self._entries_info) - - def _new_focus_stealing_helper(self): - return FocusStealingHelper( - self._rpc_window, - self._rpc_ok_button, - 1) - - def confirm_rpc(self): - self._show() - - if self._confirmed: - return self._target_name - return False - - -def confirm_rpc(entries_info, source, rpc_operation, targets_list, target=None): - window = RPCConfirmationWindow(entries_info, source, rpc_operation, - targets_list, target) - - return window.confirm_rpc() diff --git a/qubespolicy/tests/__init__.py b/qubespolicy/tests/__init__.py deleted file mode 100644 index 495efca5..00000000 --- a/qubespolicy/tests/__init__.py +++ /dev/null @@ -1,919 +0,0 @@ -# -*- encoding: utf8 -*- -# -# The Qubes OS Project, http://www.qubes-os.org -# -# Copyright (C) 2017 Marek Marczykowski-Górecki -# -# -# This library is free software; you can redistribute it and/or -# modify it under the terms of the GNU Lesser General Public -# License as published by the Free Software Foundation; either -# version 2.1 of the License, or (at your option) any later version. -# -# This library is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# Lesser General Public License for more details. -# -# You should have received a copy of the GNU Lesser General Public -# License along with this library; if not, see . -import os -import socket -import unittest.mock - -import shutil - -import qubes.tests -import qubespolicy - -tmp_policy_dir = '/tmp/policy' - -system_info = { - 'domains': { - 'dom0': { - 'tags': ['dom0-tag'], - 'type': 'AdminVM', - 'default_dispvm': 'default-dvm', - 'template_for_dispvms': False, - }, - 'test-vm1': { - 'tags': ['tag1', 'tag2'], - 'type': 'AppVM', - 'default_dispvm': 'default-dvm', - 'template_for_dispvms': False, - }, - 'test-vm2': { - 'tags': ['tag2'], - 'type': 'AppVM', - 'default_dispvm': 'default-dvm', - 'template_for_dispvms': False, - }, - 'test-vm3': { - 'tags': ['tag3'], - 'type': 'AppVM', - 'default_dispvm': 'default-dvm', - 'template_for_dispvms': True, - }, - 'default-dvm': { - 'tags': [], - 'type': 'AppVM', - 'default_dispvm': 'default-dvm', - 'template_for_dispvms': True, - }, - 'test-invalid-dvm': { - 'tags': ['tag1', 'tag2'], - 'type': 'AppVM', - 'default_dispvm': 'test-vm1', - 'template_for_dispvms': False, - }, - 'test-no-dvm': { - 'tags': ['tag1', 'tag2'], - 'type': 'AppVM', - 'default_dispvm': None, - 'template_for_dispvms': False, - }, - 'test-template': { - 'tags': ['tag1', 'tag2'], - 'type': 'TemplateVM', - 'default_dispvm': 'default-dvm', - 'template_for_dispvms': False, - }, - 'test-standalone': { - 'tags': ['tag1', 'tag2'], - 'type': 'StandaloneVM', - 'default_dispvm': 'default-dvm', - 'template_for_dispvms': False, - }, - } -} - - -class TC_00_PolicyRule(qubes.tests.QubesTestCase): - def test_000_verify_target_value(self): - self.assertTrue( - qubespolicy.verify_target_value(system_info, 'test-vm1')) - self.assertTrue( - qubespolicy.verify_target_value(system_info, 'default-dvm')) - self.assertTrue( - qubespolicy.verify_target_value(system_info, '@dispvm')) - self.assertTrue( - qubespolicy.verify_target_value(system_info, '@dispvm:default-dvm')) - self.assertTrue( - qubespolicy.verify_target_value(system_info, 'test-template')) - self.assertTrue( - qubespolicy.verify_target_value(system_info, 'test-standalone')) - self.assertTrue( - qubespolicy.verify_target_value(system_info, '@adminvm')) - self.assertFalse( - qubespolicy.verify_target_value(system_info, 'no-such-vm')) - self.assertFalse( - qubespolicy.verify_target_value(system_info, - '@dispvm:test-invalid-dvm')) - self.assertFalse( - qubespolicy.verify_target_value(system_info, '@dispvm:test-vm1')) - self.assertFalse( - qubespolicy.verify_target_value(system_info, '')) - self.assertFalse( - qubespolicy.verify_target_value(system_info, '@default')) - self.assertFalse( - qubespolicy.verify_target_value(system_info, '@anyvm')) - self.assertFalse( - qubespolicy.verify_target_value(system_info, '@tag:tag1')) - self.assertFalse( - qubespolicy.verify_target_value(system_info, '@dispvm:@tag:tag1')) - self.assertFalse( - qubespolicy.verify_target_value(system_info, '@invalid')) - - def test_010_verify_special_value(self): - self.assertTrue(qubespolicy.verify_special_value('@tag:tag', - for_target=False)) - self.assertTrue(qubespolicy.verify_special_value('@tag:other-tag', - for_target=False)) - self.assertTrue(qubespolicy.verify_special_value('@type:AppVM', - for_target=False)) - self.assertTrue(qubespolicy.verify_special_value('@adminvm', - for_target=False)) - self.assertTrue(qubespolicy.verify_special_value('@dispvm:some-vm', - for_target=True)) - self.assertTrue(qubespolicy.verify_special_value('@dispvm:@tag:tag1', - for_target=True)) - self.assertFalse(qubespolicy.verify_special_value('@default', - for_target=False)) - self.assertFalse(qubespolicy.verify_special_value('@dispvm', - for_target=False)) - self.assertFalse(qubespolicy.verify_special_value('@dispvm:some-vm', - for_target=False)) - self.assertFalse(qubespolicy.verify_special_value('@dispvm:@tag:tag1', - for_target=False)) - self.assertFalse(qubespolicy.verify_special_value('@invalid', - for_target=False)) - self.assertFalse(qubespolicy.verify_special_value('vm-name', - for_target=False)) - self.assertFalse(qubespolicy.verify_special_value('@tag:', - for_target=False)) - self.assertFalse(qubespolicy.verify_special_value('@type:', - for_target=False)) - - def test_020_line_simple(self): - line = qubespolicy.PolicyRule('@anyvm @anyvm ask', 'filename', 12) - self.assertEqual(line.filename, 'filename') - self.assertEqual(line.lineno, 12) - self.assertEqual(line.action, qubespolicy.Action.ask) - self.assertEqual(line.source, '@anyvm') - self.assertEqual(line.target, '@anyvm') - self.assertEqual(line.full_action, 'ask') - self.assertIsNone(line.override_target) - self.assertIsNone(line.override_user) - self.assertIsNone(line.default_target) - - def test_021_line_simple(self): - # also check spaces in action field - line = qubespolicy.PolicyRule( - '@tag:tag1 @type:AppVM ask, target=test-vm2, user=user', - 'filename', 12) - self.assertEqual(line.filename, 'filename') - self.assertEqual(line.lineno, 12) - self.assertEqual(line.action, qubespolicy.Action.ask) - self.assertEqual(line.source, '@tag:tag1') - self.assertEqual(line.target, '@type:AppVM') - self.assertEqual(line.full_action, 'ask, target=test-vm2, user=user') - self.assertEqual(line.override_target, 'test-vm2') - self.assertEqual(line.override_user, 'user') - self.assertIsNone(line.default_target) - - def test_022_line_simple(self): - line = qubespolicy.PolicyRule( - '@anyvm @default allow,target=@dispvm:test-vm2', - 'filename', 12) - self.assertEqual(line.filename, 'filename') - self.assertEqual(line.lineno, 12) - self.assertEqual(line.action, qubespolicy.Action.allow) - self.assertEqual(line.source, '@anyvm') - self.assertEqual(line.target, '@default') - self.assertEqual(line.full_action, 'allow,target=@dispvm:test-vm2') - self.assertEqual(line.override_target, '@dispvm:test-vm2') - self.assertIsNone(line.override_user) - self.assertIsNone(line.default_target) - - def test_023_line_simple(self): - line = qubespolicy.PolicyRule( - '@anyvm @default ask,default_target=test-vm1', - 'filename', 12) - self.assertEqual(line.filename, 'filename') - self.assertEqual(line.lineno, 12) - self.assertEqual(line.action, qubespolicy.Action.ask) - self.assertEqual(line.source, '@anyvm') - self.assertEqual(line.target, '@default') - self.assertEqual(line.full_action, 'ask,default_target=test-vm1') - self.assertIsNone(line.override_target) - self.assertIsNone(line.override_user) - self.assertEqual(line.default_target, 'test-vm1') - - def test_024_line_simple(self): - line = qubespolicy.PolicyRule( - '@anyvm @adminvm ask,default_target=@adminvm', - 'filename', 12) - self.assertEqual(line.filename, 'filename') - self.assertEqual(line.lineno, 12) - self.assertEqual(line.action, qubespolicy.Action.ask) - self.assertEqual(line.source, '@anyvm') - self.assertEqual(line.target, '@adminvm') - self.assertEqual(line.full_action, 'ask,default_target=@adminvm') - self.assertIsNone(line.override_target) - self.assertIsNone(line.override_user) - self.assertEqual(line.default_target, '@adminvm') - - def test_030_line_invalid(self): - invalid_lines = [ - '@dispvm @default allow', # @dispvm can't be a source - '@default @default allow', # @default can't be a source - '@anyvm @default allow,target=@dispvm:@tag:tag1', # @dispvm:@tag - # as override target - '@anyvm @default allow,target=@tag:tag1', # @tag as override target - '@anyvm @default deny,target=test-vm1', # target= used with deny - '@anyvm @anyvm deny,default_target=test-vm1', # default_target= - # with deny - '@anyvm @anyvm deny,user=user', # user= with deny - '@anyvm @anyvm invalid', # invalid action - '@anyvm @anyvm allow,invalid=xx', # invalid option - '@anyvm @anyvm', # missing action - '@anyvm @anyvm allow,default_target=test-vm1', # default_target= - # with allow - '@invalid @anyvm allow', # invalid source - '@anyvm @invalid deny', # invalid target - '', # empty line - '@anyvm @anyvm allow extra', # trailing words - '@anyvm @default allow', # @default allow without target= - ] - for line in invalid_lines: - with self.subTest(line): - with self.assertRaises(qubespolicy.PolicySyntaxError): - qubespolicy.PolicyRule(line, 'filename', 12) - - def test_040_match_single(self): - is_match_single = qubespolicy.PolicyRule.is_match_single - self.assertTrue(is_match_single(system_info, '@anyvm', 'test-vm1')) - self.assertTrue(is_match_single(system_info, '@anyvm', '@default')) - self.assertTrue(is_match_single(system_info, '@default', '@default')) - self.assertTrue(is_match_single(system_info, '@tag:tag1', 'test-vm1')) - self.assertTrue(is_match_single(system_info, '@type:AppVM', 'test-vm1')) - self.assertTrue(is_match_single(system_info, - '@type:TemplateVM', 'test-template')) - self.assertTrue(is_match_single(system_info, '@anyvm', '@dispvm')) - self.assertTrue(is_match_single(system_info, - '@anyvm', '@dispvm:default-dvm')) - self.assertTrue(is_match_single(system_info, '@dispvm', '@dispvm')) - self.assertTrue(is_match_single(system_info, - '@dispvm:@tag:tag3', '@dispvm:test-vm3')) - self.assertTrue(is_match_single(system_info, '@adminvm', '@adminvm')) - self.assertTrue(is_match_single(system_info, '@adminvm', 'dom0')) - self.assertTrue(is_match_single(system_info, 'dom0', '@adminvm')) - self.assertTrue(is_match_single(system_info, 'dom0', 'dom0')) - self.assertTrue(is_match_single(system_info, - '@dispvm:default-dvm', '@dispvm:default-dvm')) - self.assertTrue(is_match_single(system_info, '@anyvm', '@dispvm')) - self.assertTrue(is_match_single(system_info, '@anyvm', 'test-vm1')) - self.assertTrue(is_match_single(system_info, '@anyvm', 'test-vm1')) - self.assertTrue(is_match_single(system_info, '@anyvm', 'test-vm1')) - - self.assertFalse(is_match_single(system_info, '@default', 'test-vm1')) - self.assertFalse(is_match_single(system_info, '@tag:tag1', 'test-vm3')) - self.assertFalse(is_match_single(system_info, '@anyvm', 'no-such-vm')) - # test-vm1.template_for_dispvms=False - self.assertFalse(is_match_single(system_info, - '@anyvm', '@dispvm:test-vm1')) - # test-vm1.template_for_dispvms=False - self.assertFalse(is_match_single(system_info, - '@dispvm:test-vm1', '@dispvm:test-vm1')) - self.assertFalse(is_match_single(system_info, - '@dispvm:@tag:tag1', '@dispvm:test-vm1')) - # test-vm3 has not tag1 - self.assertFalse(is_match_single(system_info, - '@dispvm:@tag:tag1', '@dispvm:test-vm3')) - # default-dvm has no tag3 - self.assertFalse(is_match_single(system_info, - '@dispvm:@tag:tag3', '@dispvm:default-dvm')) - self.assertFalse(is_match_single(system_info, '@anyvm', 'dom0')) - self.assertFalse(is_match_single(system_info, '@anyvm', '@adminvm')) - self.assertFalse(is_match_single(system_info, - '@tag:dom0-tag', '@adminvm')) - self.assertFalse(is_match_single(system_info, - '@type:AdminVM', '@adminvm')) - self.assertFalse(is_match_single(system_info, - '@tag:dom0-tag', 'dom0')) - self.assertFalse(is_match_single(system_info, - '@type:AdminVM', 'dom0')) - self.assertFalse(is_match_single(system_info, '@tag:tag1', 'dom0')) - self.assertFalse(is_match_single(system_info, '@anyvm', '@tag:tag1')) - self.assertFalse(is_match_single(system_info, '@anyvm', '@type:AppVM')) - self.assertFalse(is_match_single(system_info, '@anyvm', '@invalid')) - self.assertFalse(is_match_single(system_info, '@invalid', '@invalid')) - self.assertFalse(is_match_single(system_info, '@anyvm', 'no-such-vm')) - self.assertFalse(is_match_single(system_info, - 'no-such-vm', 'no-such-vm')) - self.assertFalse(is_match_single(system_info, '@dispvm', 'test-vm1')) - self.assertFalse(is_match_single(system_info, '@dispvm', 'default-dvm')) - self.assertFalse(is_match_single(system_info, - '@dispvm:default-dvm', 'default-dvm')) - self.assertFalse(is_match_single(system_info, '@anyvm', 'test-vm1\n')) - self.assertFalse(is_match_single(system_info, '@anyvm', 'test-vm1 ')) - - def test_050_match(self): - line = qubespolicy.PolicyRule('@anyvm @anyvm allow') - self.assertTrue(line.is_match(system_info, 'test-vm1', 'test-vm2')) - line = qubespolicy.PolicyRule('@anyvm @anyvm allow') - self.assertFalse(line.is_match(system_info, 'no-such-vm', 'test-vm2')) - line = qubespolicy.PolicyRule('@anyvm @anyvm allow') - self.assertFalse(line.is_match(system_info, 'test-vm1', 'no-such-vm')) - line = qubespolicy.PolicyRule('@anyvm @dispvm allow') - self.assertTrue(line.is_match(system_info, 'test-vm1', '@dispvm')) - line = qubespolicy.PolicyRule('@anyvm @dispvm allow') - self.assertFalse(line.is_match(system_info, - 'test-vm1', '@dispvm:default-dvm')) - line = qubespolicy.PolicyRule('@anyvm @dispvm:default-dvm allow') - self.assertTrue(line.is_match(system_info, 'test-vm1', '@dispvm')) - line = qubespolicy.PolicyRule('@anyvm @dispvm:default-dvm allow') - self.assertTrue(line.is_match(system_info, - 'test-vm1', '@dispvm:default-dvm')) - line = qubespolicy.PolicyRule('@anyvm @dispvm:@tag:tag3 allow') - self.assertTrue(line.is_match(system_info, - 'test-vm1', '@dispvm:test-vm3')) - - def test_060_expand_target(self): - lines = { - '@anyvm @anyvm allow': ['test-vm1', 'test-vm2', 'test-vm3', - '@dispvm:test-vm3', - 'default-dvm', '@dispvm:default-dvm', 'test-invalid-dvm', - 'test-no-dvm', 'test-template', 'test-standalone', '@dispvm'], - '@anyvm @dispvm allow': ['@dispvm'], - '@anyvm @dispvm:default-dvm allow': ['@dispvm:default-dvm'], - # no DispVM from test-vm1 allowed - '@anyvm @dispvm:test-vm1 allow': [], - '@anyvm @dispvm:test-vm3 allow': ['@dispvm:test-vm3'], - '@anyvm @dispvm:@tag:tag1 allow': [], - '@anyvm @dispvm:@tag:tag3 allow': ['@dispvm:test-vm3'], - '@anyvm test-vm1 allow': ['test-vm1'], - '@anyvm @type:AppVM allow': ['test-vm1', 'test-vm2', 'test-vm3', - 'default-dvm', 'test-invalid-dvm', 'test-no-dvm'], - '@anyvm @type:TemplateVM allow': ['test-template'], - '@anyvm @tag:tag1 allow': ['test-vm1', 'test-invalid-dvm', - 'test-template', 'test-standalone', 'test-no-dvm'], - '@anyvm @tag:tag2 allow': ['test-vm1', 'test-vm2', - 'test-invalid-dvm', 'test-template', 'test-standalone', - 'test-no-dvm'], - '@anyvm @tag:no-such-tag allow': [], - } - for line in lines: - with self.subTest(line): - policy_line = qubespolicy.PolicyRule(line) - self.assertCountEqual(list(policy_line.expand_target(system_info)), - lines[line]) - - def test_070_expand_override_target(self): - line = qubespolicy.PolicyRule( - '@anyvm @anyvm allow,target=test-vm2') - self.assertEqual( - line.expand_override_target(system_info, 'test-vm1'), - 'test-vm2') - - def test_071_expand_override_target_dispvm(self): - line = qubespolicy.PolicyRule( - '@anyvm @anyvm allow,target=@dispvm') - self.assertEqual( - line.expand_override_target(system_info, 'test-vm1'), - '@dispvm:default-dvm') - - def test_072_expand_override_target_dispvm_specific(self): - line = qubespolicy.PolicyRule( - '@anyvm @anyvm allow,target=@dispvm:test-vm3') - self.assertEqual( - line.expand_override_target(system_info, 'test-vm1'), - '@dispvm:test-vm3') - - def test_073_expand_override_target_dispvm_none(self): - line = qubespolicy.PolicyRule( - '@anyvm @anyvm allow,target=@dispvm') - self.assertEqual( - line.expand_override_target(system_info, 'test-no-dvm'), - None) - - def test_074_expand_override_target_dom0(self): - line = qubespolicy.PolicyRule( - '@anyvm @anyvm allow,target=dom0') - self.assertEqual( - line.expand_override_target(system_info, 'test-no-dvm'), - 'dom0') - - def test_075_expand_override_target_dom0(self): - line = qubespolicy.PolicyRule( - '@anyvm @anyvm allow,target=@adminvm') - self.assertEqual( - line.expand_override_target(system_info, 'test-no-dvm'), - '@adminvm') - - -class TC_10_PolicyAction(qubes.tests.QubesTestCase): - def test_000_init(self): - rule = qubespolicy.PolicyRule('@anyvm @anyvm deny') - with self.assertRaises(qubespolicy.AccessDenied): - qubespolicy.PolicyAction('test.service', 'test-vm1', 'test-vm2', - rule, 'test-vm2') - - def test_001_init(self): - rule = qubespolicy.PolicyRule('@anyvm @anyvm ask') - action = qubespolicy.PolicyAction('test.service', 'test-vm1', - None, rule, 'test-vm2', ['test-vm2', 'test-vm3']) - self.assertEqual(action.service, 'test.service') - self.assertEqual(action.source, 'test-vm1') - self.assertIsNone(action.target) - self.assertEqual(action.original_target, 'test-vm2') - self.assertEqual(action.targets_for_ask, ['test-vm2', 'test-vm3']) - self.assertEqual(action.rule, rule) - self.assertEqual(action.action, qubespolicy.Action.ask) - - def test_002_init_invalid(self): - rule_ask = qubespolicy.PolicyRule('@anyvm @anyvm ask') - rule_allow = qubespolicy.PolicyRule('@anyvm @anyvm allow') - with self.assertRaises(qubespolicy.AccessDenied): - qubespolicy.PolicyAction('test.service', 'test-vm1', - None, rule_allow, 'test-vm2', None) - with self.assertRaises(qubespolicy.AccessDenied): - qubespolicy.PolicyAction('test.service', 'test-vm1', - 'test-vm2', rule_allow, 'test-vm2', ['test-vm2', 'test-vm3']) - - with self.assertRaises(qubespolicy.AccessDenied): - qubespolicy.PolicyAction('test.service', 'test-vm1', - None, rule_ask, 'test-vm2', None) - - def test_003_init_default_target(self): - rule_ask = qubespolicy.PolicyRule('@anyvm @anyvm ask') - - action = qubespolicy.PolicyAction('test.service', 'test-vm1', - 'test-vm1', rule_ask, 'test-vm2', ['test-vm2']) - self.assertIsNone(action.target) - - action = qubespolicy.PolicyAction('test.service', 'test-vm1', - 'test-vm2', rule_ask, 'test-vm2', ['test-vm2']) - self.assertEqual(action.target, 'test-vm2') - - def test_010_handle_user_response(self): - rule = qubespolicy.PolicyRule('@anyvm @anyvm ask') - action = qubespolicy.PolicyAction('test.service', 'test-vm1', - None, rule, 'test-vm2', ['test-vm2', 'test-vm3']) - action.handle_user_response(True, 'test-vm2') - self.assertEqual(action.action, qubespolicy.Action.allow) - self.assertEqual(action.target, 'test-vm2') - - def test_011_handle_user_response(self): - rule = qubespolicy.PolicyRule('@anyvm @anyvm ask') - action = qubespolicy.PolicyAction('test.service', 'test-vm1', - None, rule, 'test-vm2', ['test-vm2', 'test-vm3']) - with self.assertRaises(AssertionError): - action.handle_user_response(True, 'test-no-dvm') - - def test_012_handle_user_response(self): - rule = qubespolicy.PolicyRule('@anyvm @anyvm ask') - action = qubespolicy.PolicyAction('test.service', 'test-vm1', - None, rule, 'test-vm2', ['test-vm2', 'test-vm3']) - with self.assertRaises(qubespolicy.AccessDenied): - action.handle_user_response(False, None) - self.assertEqual(action.action, qubespolicy.Action.deny) - - def test_013_handle_user_response_with_default_target(self): - rule = qubespolicy.PolicyRule( - '@anyvm @anyvm ask,default_target=test-vm2') - action = qubespolicy.PolicyAction('test.service', 'test-vm1', - None, rule, 'test-vm2', ['test-vm2', 'test-vm3']) - action.handle_user_response(True, 'test-vm2') - self.assertEqual(action.action, qubespolicy.Action.allow) - self.assertEqual(action.target, 'test-vm2') - - @unittest.mock.patch('qubespolicy.qubesd_call') - @unittest.mock.patch('subprocess.call') - def test_020_execute(self, mock_subprocess, mock_qubesd_call): - rule = qubespolicy.PolicyRule('@anyvm @anyvm allow') - action = qubespolicy.PolicyAction('test.service', 'test-vm1', - 'test-vm2', rule, 'test-vm2') - action.execute('some-ident') - self.assertEqual(mock_qubesd_call.mock_calls, - [unittest.mock.call('test-vm2', 'admin.vm.Start')]) - self.assertEqual(mock_subprocess.mock_calls, - [unittest.mock.call([qubespolicy.QREXEC_CLIENT, '-d', 'test-vm2', - '-c', 'some-ident', 'DEFAULT:QUBESRPC test.service test-vm1'])]) - - @unittest.mock.patch('qubespolicy.qubesd_call') - @unittest.mock.patch('subprocess.call') - def test_021_execute_dom0(self, mock_subprocess, mock_qubesd_call): - rule = qubespolicy.PolicyRule('@anyvm dom0 allow') - action = qubespolicy.PolicyAction('test.service', 'test-vm1', - 'dom0', rule, 'dom0') - action.execute('some-ident') - self.assertEqual(mock_qubesd_call.mock_calls, []) - self.assertEqual(mock_subprocess.mock_calls, - [unittest.mock.call([qubespolicy.QREXEC_CLIENT, '-d', 'dom0', - '-c', 'some-ident', - 'QUBESRPC test.service test-vm1 name dom0'])]) - - @unittest.mock.patch('qubespolicy.qubesd_call') - @unittest.mock.patch('subprocess.call') - def test_021_execute_dom0_keyword(self, mock_subprocess, mock_qubesd_call): - rule = qubespolicy.PolicyRule('@anyvm dom0 allow') - action = qubespolicy.PolicyAction('test.service', 'test-vm1', - 'dom0', rule, '@adminvm') - action.execute('some-ident') - self.assertEqual(mock_qubesd_call.mock_calls, []) - self.assertEqual(mock_subprocess.mock_calls, - [unittest.mock.call([qubespolicy.QREXEC_CLIENT, '-d', 'dom0', - '-c', 'some-ident', - 'QUBESRPC test.service test-vm1 keyword adminvm'])]) - - @unittest.mock.patch('qubespolicy.qubesd_call') - @unittest.mock.patch('subprocess.call') - def test_022_execute_dispvm(self, mock_subprocess, mock_qubesd_call): - rule = qubespolicy.PolicyRule('@anyvm @dispvm:default-dvm allow') - action = qubespolicy.PolicyAction('test.service', 'test-vm1', - '@dispvm:default-dvm', rule, '@dispvm:default-dvm') - mock_qubesd_call.side_effect = (lambda target, call: - b'dispvm-name' if call == 'admin.vm.CreateDisposable' else - unittest.mock.DEFAULT) - action.execute('some-ident') - self.assertEqual(mock_qubesd_call.mock_calls, - [unittest.mock.call('default-dvm', 'admin.vm.CreateDisposable'), - unittest.mock.call('dispvm-name', 'admin.vm.Start'), - unittest.mock.call('dispvm-name', 'admin.vm.Kill')]) - self.assertEqual(mock_subprocess.mock_calls, - [unittest.mock.call([qubespolicy.QREXEC_CLIENT, '-d', 'dispvm-name', - '-c', 'some-ident', '-W', - 'DEFAULT:QUBESRPC test.service test-vm1'])]) - - @unittest.mock.patch('qubespolicy.qubesd_call') - @unittest.mock.patch('subprocess.call') - def test_023_execute_already_running(self, mock_subprocess, - mock_qubesd_call): - rule = qubespolicy.PolicyRule('@anyvm @anyvm allow') - action = qubespolicy.PolicyAction('test.service', 'test-vm1', - 'test-vm2', rule, 'test-vm2') - mock_qubesd_call.side_effect = \ - qubespolicy.QubesMgmtException('QubesVMNotHaltedError') - action.execute('some-ident') - self.assertEqual(mock_qubesd_call.mock_calls, - [unittest.mock.call('test-vm2', 'admin.vm.Start')]) - self.assertEqual(mock_subprocess.mock_calls, - [unittest.mock.call([qubespolicy.QREXEC_CLIENT, '-d', 'test-vm2', - '-c', 'some-ident', 'DEFAULT:QUBESRPC test.service test-vm1'])]) - - @unittest.mock.patch('qubespolicy.qubesd_call') - @unittest.mock.patch('subprocess.call') - def test_024_execute_startup_error(self, mock_subprocess, - mock_qubesd_call): - rule = qubespolicy.PolicyRule('@anyvm @anyvm allow') - action = qubespolicy.PolicyAction('test.service', 'test-vm1', - 'test-vm2', rule, 'test-vm2') - mock_qubesd_call.side_effect = \ - qubespolicy.QubesMgmtException('QubesVMError') - with self.assertRaises(qubespolicy.QubesMgmtException): - action.execute('some-ident') - self.assertEqual(mock_qubesd_call.mock_calls, - [unittest.mock.call('test-vm2', 'admin.vm.Start')]) - self.assertEqual(mock_subprocess.mock_calls, []) - -class TC_20_Policy(qubes.tests.QubesTestCase): - - def setUp(self): - super(TC_20_Policy, self).setUp() - if not os.path.exists(tmp_policy_dir): - os.mkdir(tmp_policy_dir) - - def tearDown(self): - shutil.rmtree(tmp_policy_dir) - super(TC_20_Policy, self).tearDown() - - def test_000_load(self): - with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f: - f.write('test-vm1 test-vm2 allow\n') - f.write('\n') - f.write('# comment\n') - f.write('test-vm2 test-vm3 ask\n') - f.write(' # comment \n') - f.write('@anyvm @anyvm ask\n') - policy = qubespolicy.Policy('test.service', tmp_policy_dir) - self.assertEqual(policy.service, 'test.service') - self.assertEqual(len(policy.policy_rules), 3) - self.assertEqual(policy.policy_rules[0].source, 'test-vm1') - self.assertEqual(policy.policy_rules[0].target, 'test-vm2') - self.assertEqual(policy.policy_rules[0].action, - qubespolicy.Action.allow) - - def test_001_not_existent(self): - with self.assertRaises(qubespolicy.AccessDenied): - qubespolicy.Policy('no-such.service', tmp_policy_dir) - - def test_002_include(self): - with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f: - f.write('test-vm1 test-vm2 allow\n') - f.write('@include:test.service2\n') - f.write('@anyvm @anyvm deny\n') - with open(os.path.join(tmp_policy_dir, 'test.service2'), 'w') as f: - f.write('test-vm3 @default allow,target=test-vm2\n') - policy = qubespolicy.Policy('test.service', tmp_policy_dir) - self.assertEqual(policy.service, 'test.service') - self.assertEqual(len(policy.policy_rules), 3) - self.assertEqual(policy.policy_rules[0].source, 'test-vm1') - self.assertEqual(policy.policy_rules[0].target, 'test-vm2') - self.assertEqual(policy.policy_rules[0].action, - qubespolicy.Action.allow) - self.assertEqual(policy.policy_rules[0].filename, - tmp_policy_dir + '/test.service') - self.assertEqual(policy.policy_rules[0].lineno, 1) - self.assertEqual(policy.policy_rules[1].source, 'test-vm3') - self.assertEqual(policy.policy_rules[1].target, '@default') - self.assertEqual(policy.policy_rules[1].action, - qubespolicy.Action.allow) - self.assertEqual(policy.policy_rules[1].filename, - tmp_policy_dir + '/test.service2') - self.assertEqual(policy.policy_rules[1].lineno, 1) - self.assertEqual(policy.policy_rules[2].source, '@anyvm') - self.assertEqual(policy.policy_rules[2].target, '@anyvm') - self.assertEqual(policy.policy_rules[2].action, - qubespolicy.Action.deny) - self.assertEqual(policy.policy_rules[2].filename, - tmp_policy_dir + '/test.service') - self.assertEqual(policy.policy_rules[2].lineno, 3) - - def test_003_load_convert(self): - with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f: - f.write('test-vm2 test-vm3 ask\n') - f.write(' # comment \n') - f.write('$anyvm $dispvm ask,default_target=$dispvm\n') - policy = qubespolicy.Policy('test.service', tmp_policy_dir) - self.assertEqual(policy.service, 'test.service') - self.assertEqual(len(policy.policy_rules), 2) - self.assertEqual(policy.policy_rules[1].source, '@anyvm') - self.assertEqual(policy.policy_rules[1].target, '@dispvm') - self.assertEqual(policy.policy_rules[1].action, - qubespolicy.Action.ask) - self.assertEqual(policy.policy_rules[1].default_target, - '@dispvm') - - def test_010_find_rule(self): - with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f: - f.write('test-vm1 test-vm2 allow\n') - f.write('test-vm1 @anyvm ask\n') - f.write('test-vm2 @tag:tag1 deny\n') - f.write('test-vm2 @tag:tag2 allow\n') - f.write('test-vm2 @dispvm:@tag:tag3 allow\n') - f.write('test-vm2 @dispvm:@tag:tag2 allow\n') - f.write('test-vm2 @dispvm:default-dvm allow\n') - f.write('@type:AppVM @default allow,target=test-vm3\n') - f.write('@tag:tag1 @type:AppVM allow\n') - policy = qubespolicy.Policy('test.service', tmp_policy_dir) - self.assertEqual(policy.find_matching_rule( - system_info, 'test-vm1', 'test-vm2'), policy.policy_rules[0]) - self.assertEqual(policy.find_matching_rule( - system_info, 'test-vm1', 'test-vm3'), policy.policy_rules[1]) - self.assertEqual(policy.find_matching_rule( - system_info, 'test-vm2', 'test-vm2'), policy.policy_rules[3]) - self.assertEqual(policy.find_matching_rule( - system_info, 'test-vm2', 'test-no-dvm'), policy.policy_rules[2]) - # @anyvm matches @default too - self.assertEqual(policy.find_matching_rule( - system_info, 'test-vm1', '@default'), policy.policy_rules[1]) - self.assertEqual(policy.find_matching_rule( - system_info, 'test-vm2', '@default'), policy.policy_rules[7]) - self.assertEqual(policy.find_matching_rule( - system_info, 'test-no-dvm', 'test-vm3'), policy.policy_rules[8]) - self.assertEqual(policy.find_matching_rule( - system_info, 'test-vm2', '@dispvm:test-vm3'), - policy.policy_rules[4]) - self.assertEqual(policy.find_matching_rule( - system_info, 'test-vm2', '@dispvm'), - policy.policy_rules[6]) - with self.assertRaises(qubespolicy.AccessDenied): - policy.find_matching_rule( - system_info, 'test-no-dvm', 'test-standalone') - with self.assertRaises(qubespolicy.AccessDenied): - policy.find_matching_rule(system_info, 'test-no-dvm', '@dispvm') - with self.assertRaises(qubespolicy.AccessDenied): - policy.find_matching_rule( - system_info, 'test-standalone', '@default') - - def test_020_collect_targets_for_ask(self): - with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f: - f.write('test-vm1 test-vm2 allow\n') - f.write('test-vm1 @anyvm ask\n') - f.write('test-vm2 @tag:tag1 deny\n') - f.write('test-vm2 @tag:tag2 allow\n') - f.write('test-no-dvm @type:AppVM deny\n') - f.write('@type:AppVM @default allow,target=test-vm3\n') - f.write('@tag:tag1 @type:AppVM allow\n') - f.write('test-no-dvm @dispvm allow\n') - f.write('test-standalone @dispvm allow\n') - f.write('test-standalone @adminvm allow\n') - policy = qubespolicy.Policy('test.service', tmp_policy_dir) - self.assertCountEqual(policy.collect_targets_for_ask(system_info, - 'test-vm1'), ['test-vm2', 'test-vm3', - '@dispvm:test-vm3', - 'default-dvm', '@dispvm:default-dvm', 'test-invalid-dvm', - 'test-no-dvm', 'test-template', 'test-standalone']) - self.assertCountEqual(policy.collect_targets_for_ask(system_info, - 'test-vm2'), ['test-vm3']) - self.assertCountEqual(policy.collect_targets_for_ask(system_info, - 'test-vm3'), []) - self.assertCountEqual(policy.collect_targets_for_ask(system_info, - 'test-standalone'), ['test-vm1', 'test-vm2', 'test-vm3', - 'default-dvm', 'test-no-dvm', 'test-invalid-dvm', - '@dispvm:default-dvm', 'dom0']) - self.assertCountEqual(policy.collect_targets_for_ask(system_info, - 'test-no-dvm'), []) - - def test_030_eval_simple(self): - with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f: - f.write('test-vm1 test-vm2 allow\n') - - policy = qubespolicy.Policy('test.service', tmp_policy_dir) - action = policy.evaluate(system_info, 'test-vm1', 'test-vm2') - self.assertEqual(action.rule, policy.policy_rules[0]) - self.assertEqual(action.action, qubespolicy.Action.allow) - self.assertEqual(action.target, 'test-vm2') - self.assertEqual(action.original_target, 'test-vm2') - self.assertEqual(action.service, 'test.service') - self.assertIsNone(action.targets_for_ask) - with self.assertRaises(qubespolicy.AccessDenied): - policy.evaluate(system_info, 'test-vm2', '@default') - - def test_031_eval_default(self): - with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f: - f.write('test-vm1 test-vm2 allow\n') - f.write('test-vm1 @default allow,target=test-vm2\n') - f.write('@tag:tag1 test-vm2 ask\n') - f.write('@tag:tag2 @anyvm allow\n') - f.write('test-vm3 @anyvm deny\n') - - policy = qubespolicy.Policy('test.service', tmp_policy_dir) - action = policy.evaluate(system_info, 'test-vm1', '@default') - self.assertEqual(action.rule, policy.policy_rules[1]) - self.assertEqual(action.action, qubespolicy.Action.allow) - self.assertEqual(action.target, 'test-vm2') - self.assertEqual(action.original_target, '@default') - self.assertEqual(action.service, 'test.service') - self.assertIsNone(action.targets_for_ask) - with self.assertRaises(qubespolicy.AccessDenied): - # action allow should hit, but no target specified (either by - # caller or policy) - policy.evaluate(system_info, 'test-standalone', '@default') - - def test_032_eval_ask(self): - with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f: - f.write('test-vm1 test-vm2 allow\n') - f.write('test-vm1 @default allow,target=test-vm2\n') - f.write('@tag:tag1 test-vm2 ask\n') - f.write('@tag:tag1 test-vm3 ask,default_target=test-vm3\n') - f.write('@tag:tag2 @anyvm allow\n') - f.write('test-vm3 @anyvm deny\n') - - policy = qubespolicy.Policy('test.service', tmp_policy_dir) - action = policy.evaluate(system_info, 'test-standalone', 'test-vm2') - self.assertEqual(action.rule, policy.policy_rules[2]) - self.assertEqual(action.action, qubespolicy.Action.ask) - self.assertIsNone(action.target) - self.assertEqual(action.original_target, 'test-vm2') - self.assertEqual(action.service, 'test.service') - self.assertCountEqual(action.targets_for_ask, - ['test-vm1', 'test-vm2', 'test-vm3', '@dispvm:test-vm3', - 'default-dvm', '@dispvm:default-dvm', 'test-invalid-dvm', - 'test-no-dvm', 'test-template']) - - def test_033_eval_ask(self): - with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f: - f.write('test-vm1 test-vm2 allow\n') - f.write('test-vm1 @default allow,target=test-vm2\n') - f.write('@tag:tag1 test-vm2 ask\n') - f.write('@tag:tag1 test-vm3 ask,default_target=test-vm3\n') - f.write('@tag:tag2 @anyvm allow\n') - f.write('test-vm3 @anyvm deny\n') - - policy = qubespolicy.Policy('test.service', tmp_policy_dir) - action = policy.evaluate(system_info, 'test-standalone', 'test-vm3') - self.assertEqual(action.rule, policy.policy_rules[3]) - self.assertEqual(action.action, qubespolicy.Action.ask) - self.assertEqual(action.target, 'test-vm3') - self.assertEqual(action.original_target, 'test-vm3') - self.assertEqual(action.service, 'test.service') - self.assertCountEqual(action.targets_for_ask, - ['test-vm1', 'test-vm2', 'test-vm3', '@dispvm:test-vm3', - 'default-dvm', '@dispvm:default-dvm', 'test-invalid-dvm', - 'test-no-dvm', 'test-template']) - - def test_034_eval_resolve_dispvm(self): - with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f: - f.write('test-vm3 @dispvm allow\n') - - policy = qubespolicy.Policy('test.service', tmp_policy_dir) - action = policy.evaluate(system_info, 'test-vm3', '@dispvm') - self.assertEqual(action.rule, policy.policy_rules[0]) - self.assertEqual(action.action, qubespolicy.Action.allow) - self.assertEqual(action.target, '@dispvm:default-dvm') - self.assertEqual(action.original_target, '@dispvm') - self.assertEqual(action.service, 'test.service') - self.assertIsNone(action.targets_for_ask) - - def test_035_eval_resolve_dispvm_fail(self): - with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f: - f.write('test-no-dvm @dispvm allow\n') - - policy = qubespolicy.Policy('test.service', tmp_policy_dir) - with self.assertRaises(qubespolicy.AccessDenied): - policy.evaluate(system_info, 'test-no-dvm', '@dispvm') - - def test_036_eval_invalid_override_target(self): - with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f: - f.write('test-vm3 @anyvm allow,target=no-such-vm\n') - - policy = qubespolicy.Policy('test.service', tmp_policy_dir) - with self.assertRaises(qubespolicy.AccessDenied): - policy.evaluate(system_info, 'test-vm3', '@default') - - def test_037_eval_ask_no_targets(self): - with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f: - f.write('test-vm3 @default ask\n') - - policy = qubespolicy.Policy('test.service', tmp_policy_dir) - with self.assertRaises(qubespolicy.AccessDenied): - policy.evaluate(system_info, 'test-vm3', '@default') - - -class TC_30_Misc(qubes.tests.QubesTestCase): - @unittest.mock.patch('socket.socket') - def test_000_qubesd_call(self, mock_socket): - mock_config = { - 'return_value.makefile.return_value.read.return_value': b'0\x00data' - } - mock_socket.configure_mock(**mock_config) - result = qubespolicy.qubesd_call('test', 'internal.method') - self.assertEqual(result, b'data') - self.assertEqual(mock_socket.mock_calls, [ - unittest.mock.call(socket.AF_UNIX, socket.SOCK_STREAM), - unittest.mock.call().connect(qubespolicy.QUBESD_INTERNAL_SOCK), - unittest.mock.call().sendall(b'dom0'), - unittest.mock.call().sendall(b'\x00'), - unittest.mock.call().sendall(b'internal.method'), - unittest.mock.call().sendall(b'\x00'), - unittest.mock.call().sendall(b'test'), - unittest.mock.call().sendall(b'\x00'), - unittest.mock.call().sendall(b'\x00'), - unittest.mock.call().shutdown(socket.SHUT_WR), - unittest.mock.call().makefile('rb'), - unittest.mock.call().makefile().read(), - ]) - - @unittest.mock.patch('socket.socket') - def test_001_qubesd_call_arg_payload(self, mock_socket): - mock_config = { - 'return_value.makefile.return_value.read.return_value': b'0\x00data' - } - mock_socket.configure_mock(**mock_config) - result = qubespolicy.qubesd_call('test', 'internal.method', 'arg', - b'payload') - self.assertEqual(result, b'data') - self.assertEqual(mock_socket.mock_calls, [ - unittest.mock.call(socket.AF_UNIX, socket.SOCK_STREAM), - unittest.mock.call().connect(qubespolicy.QUBESD_INTERNAL_SOCK), - unittest.mock.call().sendall(b'dom0'), - unittest.mock.call().sendall(b'\x00'), - unittest.mock.call().sendall(b'internal.method'), - unittest.mock.call().sendall(b'\x00'), - unittest.mock.call().sendall(b'test'), - unittest.mock.call().sendall(b'\x00'), - unittest.mock.call().sendall(b'arg'), - unittest.mock.call().sendall(b'\x00'), - unittest.mock.call().sendall(b'payload'), - unittest.mock.call().shutdown(socket.SHUT_WR), - unittest.mock.call().makefile('rb'), - unittest.mock.call().makefile().read(), - ]) - - @unittest.mock.patch('socket.socket') - def test_002_qubesd_call_exception(self, mock_socket): - mock_config = { - 'return_value.makefile.return_value.read.return_value': - b'2\x00SomeError\x00traceback\x00message\x00' - } - mock_socket.configure_mock(**mock_config) - with self.assertRaises(qubespolicy.QubesMgmtException) as e: - qubespolicy.qubesd_call('test', 'internal.method') - self.assertEqual(e.exception.exc_type, 'SomeError') - self.assertEqual(mock_socket.mock_calls, [ - unittest.mock.call(socket.AF_UNIX, socket.SOCK_STREAM), - unittest.mock.call().connect(qubespolicy.QUBESD_INTERNAL_SOCK), - unittest.mock.call().sendall(b'dom0'), - unittest.mock.call().sendall(b'\x00'), - unittest.mock.call().sendall(b'internal.method'), - unittest.mock.call().sendall(b'\x00'), - unittest.mock.call().sendall(b'test'), - unittest.mock.call().sendall(b'\x00'), - unittest.mock.call().sendall(b'\x00'), - unittest.mock.call().shutdown(socket.SHUT_WR), - unittest.mock.call().makefile('rb'), - unittest.mock.call().makefile().read(), - ]) - diff --git a/qubespolicy/tests/cli.py b/qubespolicy/tests/cli.py deleted file mode 100644 index 28941ec0..00000000 --- a/qubespolicy/tests/cli.py +++ /dev/null @@ -1,343 +0,0 @@ -# -*- encoding: utf-8 -*- -# -# The Qubes OS Project, http://www.qubes-os.org -# -# Copyright (C) 2017 Marek Marczykowski-Górecki -# -# -# This library is free software; you can redistribute it and/or -# modify it under the terms of the GNU Lesser General Public -# License as published by the Free Software Foundation; either -# version 2.1 of the License, or (at your option) any later version. -# -# This library is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# Lesser General Public License for more details. -# -# You should have received a copy of the GNU Lesser General Public -# License along with this library; if not, see . -import os -import tempfile -import unittest.mock - -import shutil - -import qubes.tests -import qubespolicy -import qubespolicy.cli -import qubespolicy.tests - - - -class TC_00_qrexec_policy(qubes.tests.QubesTestCase): - def setUp(self): - super(TC_00_qrexec_policy, self).setUp() - self.policy_patch = unittest.mock.patch('qubespolicy.Policy') - self.policy_mock = self.policy_patch.start() - - self.system_info_patch = unittest.mock.patch( - 'qubespolicy.get_system_info') - self.system_info_mock = self.system_info_patch.start() - - self.system_info = { - 'domains': {'dom0': {'icon': 'black', 'template_for_dispvms': False}, - 'test-vm1': {'icon': 'red', 'template_for_dispvms': False}, - 'test-vm2': {'icon': 'red', 'template_for_dispvms': False}, - 'test-vm3': {'icon': 'green', 'template_for_dispvms': True}, }} - self.system_info_mock.return_value = self.system_info - - self.dbus_patch = unittest.mock.patch('pydbus.SystemBus') - self.dbus_mock = self.dbus_patch.start() - - self.policy_dir = tempfile.TemporaryDirectory() - self.policydir_patch = unittest.mock.patch('qubespolicy.POLICY_DIR', - self.policy_dir.name) - self.policydir_patch.start() - - def tearDown(self): - self.policydir_patch.stop() - self.policy_dir.cleanup() - self.dbus_patch.start() - self.system_info_patch.stop() - self.policy_patch.stop() - super(TC_00_qrexec_policy, self).tearDown() - - def test_000_allow(self): - self.policy_mock.configure_mock(**{ - 'return_value.evaluate.return_value.action': - qubespolicy.Action.allow, - }) - retval = qubespolicy.cli.main( - ['source-id', 'source', 'target', 'service', 'process_ident']) - self.assertEqual(retval, 0) - self.assertEqual(self.policy_mock.mock_calls, [ - ('', ('service',), {}), - ('().evaluate', (self.system_info, 'source', - 'target'), {}), - ('().evaluate().target.__str__', (), {}), - ('().evaluate().execute', ('process_ident,source,source-id', ), {}), - ]) - self.assertEqual(self.dbus_mock.mock_calls, []) - - def test_010_ask_allow(self): - self.policy_mock.configure_mock(**{ - 'return_value.evaluate.return_value.action': - qubespolicy.Action.ask, - 'return_value.evaluate.return_value.target': - None, - 'return_value.evaluate.return_value.targets_for_ask': - ['test-vm1', 'test-vm2'], - }) - self.dbus_mock.configure_mock(**{ - 'return_value.get.return_value.Ask.return_value': 'test-vm1' - }) - retval = qubespolicy.cli.main( - ['source-id', 'source', 'target', 'service', 'process_ident']) - self.assertEqual(retval, 0) - self.assertEqual(self.policy_mock.mock_calls, [ - ('', ('service',), {}), - ('().evaluate', (self.system_info, 'source', - 'target'), {}), - ('().evaluate().handle_user_response', (True, 'test-vm1'), {}), - ('().evaluate().execute', ('process_ident,source,source-id', ), {}), - ]) - icons = { - 'dom0': 'black', - 'test-vm1': 'red', - 'test-vm2': 'red', - 'test-vm3': 'green', - '@dispvm:test-vm3': 'green', - } - self.assertEqual(self.dbus_mock.mock_calls, [ - ('', (), {}), - ('().get', ('org.qubesos.PolicyAgent', - '/org/qubesos/PolicyAgent'), {}), - ('().get().Ask', ('source', 'service', ['test-vm1', 'test-vm2'], - '', icons), {}), - ]) - - def test_011_ask_deny(self): - self.policy_mock.configure_mock(**{ - 'return_value.evaluate.return_value.action': - qubespolicy.Action.ask, - 'return_value.evaluate.return_value.target': - None, - 'return_value.evaluate.return_value.targets_for_ask': - ['test-vm1', 'test-vm2'], - 'return_value.evaluate.return_value.handle_user_response' - '.side_effect': - qubespolicy.AccessDenied, - }) - self.dbus_mock.configure_mock(**{ - 'return_value.get.return_value.Ask.return_value': '' - }) - retval = qubespolicy.cli.main( - ['source-id', 'source', 'target', 'service', 'process_ident']) - self.assertEqual(retval, 1) - self.assertEqual(self.policy_mock.mock_calls, [ - ('', ('service',), {}), - ('().evaluate', (self.system_info, 'source', - 'target'), {}), - ('().evaluate().handle_user_response', (False,), {}), - ]) - icons = { - 'dom0': 'black', - 'test-vm1': 'red', - 'test-vm2': 'red', - 'test-vm3': 'green', - '@dispvm:test-vm3': 'green', - } - self.assertEqual(self.dbus_mock.mock_calls, [ - ('', (), {}), - ('().get', ('org.qubesos.PolicyAgent', - '/org/qubesos/PolicyAgent'), {}), - ('().get().Ask', ('source', 'service', ['test-vm1', 'test-vm2'], - '', icons), {}), - ]) - - def test_012_ask_default_target(self): - self.policy_mock.configure_mock(**{ - 'return_value.evaluate.return_value.action': - qubespolicy.Action.ask, - 'return_value.evaluate.return_value.target': - 'test-vm1', - 'return_value.evaluate.return_value.targets_for_ask': - ['test-vm1', 'test-vm2'], - }) - self.dbus_mock.configure_mock(**{ - 'return_value.get.return_value.Ask.return_value': 'test-vm1' - }) - retval = qubespolicy.cli.main( - ['source-id', 'source', 'target', 'service', 'process_ident']) - self.assertEqual(retval, 0) - self.assertEqual(self.policy_mock.mock_calls, [ - ('', ('service',), {}), - ('().evaluate', (self.system_info, 'source', - 'target'), {}), - ('().evaluate().handle_user_response', (True, 'test-vm1'), {}), - ('().evaluate().execute', ('process_ident,source,source-id',), {}), - ]) - icons = { - 'dom0': 'black', - 'test-vm1': 'red', - 'test-vm2': 'red', - 'test-vm3': 'green', - '@dispvm:test-vm3': 'green', - } - self.assertEqual(self.dbus_mock.mock_calls, [ - ('', (), {}), - ('().get', ('org.qubesos.PolicyAgent', - '/org/qubesos/PolicyAgent'), {}), - ('().get().Ask', ('source', 'service', ['test-vm1', 'test-vm2'], - 'test-vm1', icons), {}), - ]) - - def test_020_deny(self): - self.policy_mock.configure_mock(**{ - 'return_value.evaluate.return_value.action': - qubespolicy.Action.deny, - 'return_value.evaluate.return_value.execute.side_effect': - qubespolicy.AccessDenied, - }) - retval = qubespolicy.cli.main( - ['source-id', 'source', 'target', 'service', 'process_ident']) - self.assertEqual(retval, 1) - self.assertEqual(self.policy_mock.mock_calls, [ - ('', ('service',), {}), - ('().evaluate', (self.system_info, 'source', - 'target'), {}), - ('().evaluate().target.__str__', (), {}), - ('().evaluate().execute', ('process_ident,source,source-id',), {}), - ]) - self.assertEqual(self.dbus_mock.mock_calls, []) - - def test_030_just_evaluate_allow(self): - self.policy_mock.configure_mock(**{ - 'return_value.evaluate.return_value.action': - qubespolicy.Action.allow, - }) - retval = qubespolicy.cli.main( - ['--just-evaluate', - 'source-id', 'source', 'target', 'service', 'process_ident']) - self.assertEqual(retval, 0) - self.assertEqual(self.policy_mock.mock_calls, [ - ('', ('service',), {}), - ('().evaluate', (self.system_info, 'source', - 'target'), {}), - ]) - self.assertEqual(self.dbus_mock.mock_calls, []) - - def test_031_just_evaluate_deny(self): - self.policy_mock.configure_mock(**{ - 'return_value.evaluate.return_value.action': - qubespolicy.Action.deny, - }) - retval = qubespolicy.cli.main( - ['--just-evaluate', - 'source-id', 'source', 'target', 'service', 'process_ident']) - self.assertEqual(retval, 1) - self.assertEqual(self.policy_mock.mock_calls, [ - ('', ('service',), {}), - ('().evaluate', (self.system_info, 'source', - 'target'), {}), - ]) - self.assertEqual(self.dbus_mock.mock_calls, []) - - def test_032_just_evaluate_ask(self): - self.policy_mock.configure_mock(**{ - 'return_value.evaluate.return_value.action': - qubespolicy.Action.ask, - }) - retval = qubespolicy.cli.main( - ['--just-evaluate', - 'source-id', 'source', 'target', 'service', 'process_ident']) - self.assertEqual(retval, 1) - self.assertEqual(self.policy_mock.mock_calls, [ - ('', ('service',), {}), - ('().evaluate', (self.system_info, 'source', - 'target'), {}), - ]) - self.assertEqual(self.dbus_mock.mock_calls, []) - - def test_033_just_evaluate_ask_assume_yes(self): - self.policy_mock.configure_mock(**{ - 'return_value.evaluate.return_value.action': - qubespolicy.Action.ask, - }) - retval = qubespolicy.cli.main( - ['--just-evaluate', '--assume-yes-for-ask', - 'source-id', 'source', 'target', 'service', 'process_ident']) - self.assertEqual(retval, 0) - self.assertEqual(self.policy_mock.mock_calls, [ - ('', ('service',), {}), - ('().evaluate', (self.system_info, 'source', - 'target'), {}), - ]) - self.assertEqual(self.dbus_mock.mock_calls, []) - - def test_040_create_policy(self): - self.policy_mock.configure_mock(**{ - 'side_effect': - [qubespolicy.PolicyNotFound('service'), unittest.mock.DEFAULT], - 'return_value.evaluate.return_value.action': - qubespolicy.Action.allow, - }) - self.dbus_mock.configure_mock(**{ - 'return_value.get.return_value.ConfirmPolicyCreate.return_value': - True - }) - retval = qubespolicy.cli.main( - ['source-id', 'source', 'target', 'service', 'process_ident']) - self.assertEqual(retval, 0) - self.assertEqual(self.policy_mock.mock_calls, [ - ('', ('service',), {}), - ('', ('service',), {}), - ('().evaluate', (self.system_info, 'source', - 'target'), {}), - ('().evaluate().target.__str__', (), {}), - ('().evaluate().execute', ('process_ident,source,source-id',), {}), - ]) - self.assertEqual(self.dbus_mock.mock_calls, [ - ('', (), {}), - ('().get', ('org.qubesos.PolicyAgent', - '/org/qubesos/PolicyAgent'), {}), - ('().get().ConfirmPolicyCreate', ('source', 'service'), {}), - ]) - policy_path = os.path.join(self.policy_dir.name, 'service') - self.assertTrue(os.path.exists(policy_path)) - with open(policy_path) as policy_file: - self.assertEqual(policy_file.read(), - "## Policy file automatically created on first service call.\n" - "## Feel free to edit.\n" - "## Note that policy parsing stops at the first match\n" - "\n" - "## Please use a single # to start your custom comments\n" - "\n" - "@anyvm @anyvm ask\n") - - def test_041_create_policy_abort(self): - self.policy_mock.configure_mock(**{ - 'side_effect': - [qubespolicy.PolicyNotFound('service'), unittest.mock.DEFAULT], - 'return_value.evaluate.return_value.action': - qubespolicy.Action.deny, - }) - self.dbus_mock.configure_mock(**{ - 'return_value.get.return_value.ConfirmPolicyCreate.return_value': - False - }) - retval = qubespolicy.cli.main( - ['source-id', 'source', 'target', 'service', 'process_ident']) - self.assertEqual(retval, 1) - self.assertEqual(self.policy_mock.mock_calls, [ - ('', ('service',), {}), - ]) - self.assertEqual(self.dbus_mock.mock_calls, [ - ('', (), {}), - ('().get', ('org.qubesos.PolicyAgent', - '/org/qubesos/PolicyAgent'), {}), - ('().get().ConfirmPolicyCreate', ('source', 'service'), {}), - ]) - policy_path = os.path.join(self.policy_dir.name, 'service') - self.assertFalse(os.path.exists(policy_path)) diff --git a/qubespolicy/tests/gtkhelpers.py b/qubespolicy/tests/gtkhelpers.py deleted file mode 100755 index 05b1971b..00000000 --- a/qubespolicy/tests/gtkhelpers.py +++ /dev/null @@ -1,409 +0,0 @@ -#!/usr/bin/python -# -# The Qubes OS Project, https://www.qubes-os.org/ -# -# Copyright (C) 2017 boring-stuff -# -# This library is free software; you can redistribute it and/or -# modify it under the terms of the GNU Lesser General Public -# License as published by the Free Software Foundation; either -# version 2.1 of the License, or (at your option) any later version. -# -# This library is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# Lesser General Public License for more details. -# -# You should have received a copy of the GNU Lesser General Public -# License along with this library; if not, see . -# - -import time -import unittest - -import gi # isort:skip -gi.require_version('Gtk', '3.0') # isort:skip -from gi.repository import Gtk # isort:skip pylint: - -from qubes.tests import skipUnlessEnv - -from qubespolicy.gtkhelpers import VMListModeler, GtkOneTimerHelper, \ - FocusStealingHelper - -mock_domains_info = { - 'dom0': {'icon': 'black', 'type': 'AdminVM'}, - 'test-red1': {'icon': 'red', 'type': 'AppVM'}, - 'test-red2': {'icon': 'red', 'type': 'AppVM'}, - 'test-red3': {'icon': 'red', 'type': 'AppVM'}, - 'test-source': {'icon': 'green', 'type': 'AppVM'}, - 'test-target': {'icon': 'orange', 'type': 'AppVM'}, - '@dispvm:test-disp6': {'icon': 'red', 'type': 'DispVM'}, -} - -mock_whitelist = ["test-red1", "test-red2", "test-red3", - "test-target", "@dispvm:test-disp6"] - -class MockComboEntry: - def __init__(self, text): - self._text = text - - def get_active_id(self): - return self._text - - def get_text(self): - return self._text - - -class GtkTestCase(unittest.TestCase): - def __init__(self, *args, **kwargs): - unittest.TestCase.__init__(self, *args, **kwargs) - self._smallest_wait = 0.01 - - def flush_gtk_events(self, wait_seconds=0): - start = time.time() - iterations = 0 - remaining_wait = wait_seconds - time_length = 0 - - if wait_seconds < 0: - raise ValueError("Only non-negative intervals are allowed.") - - while remaining_wait >= 0: - while Gtk.events_pending(): - Gtk.main_iteration_do(blocking=False) - iterations += 1 - - time_length = time.time() - start - remaining_wait = wait_seconds - time_length - - if remaining_wait > 0: - time.sleep(self._smallest_wait) - - return iterations, time_length - - -@skipUnlessEnv('DISPLAY') -class VMListModelerTest(VMListModeler, unittest.TestCase): - def __init__(self, *args, **kwargs): - unittest.TestCase.__init__(self, *args, **kwargs) - VMListModeler.__init__(self, mock_domains_info) - - def test_entries_gets_loaded(self): - self.assertIsNotNone(self._entries) - - def test_valid_qube_name(self): - self.apply_model(Gtk.ComboBox(), list(mock_domains_info.keys())) - - for name in ["test-red1", "test-red2", "test-red3", - "test-target", "Disposable VM (test-disp6)"]: - - mock = MockComboEntry(name) - self.assertEquals(name, - self._get_valid_qube_name(mock, mock, mock_whitelist)) - self.assertEquals(name, - self._get_valid_qube_name(None, mock, mock_whitelist)) - self.assertEquals(name, - self._get_valid_qube_name(mock, None, mock_whitelist)) - self.assertIsNone( - self._get_valid_qube_name(None, None, mock_whitelist)) - - def test_valid_qube_name_whitelist(self): - list_exc = ["@dispvm:test-disp6", "test-red2"] - - whitelist = [name for name in mock_whitelist if name not in list_exc] - self.apply_model(Gtk.ComboBox(), whitelist) - - for name in list_exc: - mock = MockComboEntry(name) - self.assertIsNone(self._get_valid_qube_name(mock, mock, whitelist)) - self.assertIsNone(self._get_valid_qube_name(None, mock, whitelist)) - self.assertIsNone(self._get_valid_qube_name(mock, None, whitelist)) - - def test_invalid_qube_name(self): - self.apply_model(Gtk.ComboBox(), mock_whitelist) - - for name in ["test-nonexistant", None, "", 1]: - - mock = MockComboEntry(name) - self.assertIsNone( - self._get_valid_qube_name(mock, mock, mock_whitelist)) - self.assertIsNone( - self._get_valid_qube_name(None, mock, mock_whitelist)) - self.assertIsNone( - self._get_valid_qube_name(mock, None, mock_whitelist)) - - def test_apply_model(self): - new_object = Gtk.ComboBox() - self.assertIsNone(new_object.get_model()) - - self.apply_model(new_object, mock_whitelist) - - self.assertIsNotNone(new_object.get_model()) - - def test_apply_model_with_entry(self): - new_object = Gtk.ComboBox.new_with_entry() - - self.assertIsNone(new_object.get_model()) - - self.apply_model(new_object, []) - - self.assertIsNotNone(new_object.get_model()) - - def test_apply_model_only_combobox(self): - invalid_types = [1, "One", u'1', {'1': "one"}, VMListModeler( - mock_domains_info)] - - for invalid_type in invalid_types: - with self.assertRaises(TypeError): - self.apply_model(invalid_type, []) - - def test_apply_model_whitelist(self): - combo = Gtk.ComboBox() - - self.apply_model(combo, list(mock_domains_info.keys())) - self.assertEquals(7, len(combo.get_model())) - - names = [entry['api_name'] for entry in self._entries.values()] - - self.apply_model(combo, [names[0]]) - self.assertEquals(1, len(combo.get_model())) - - self.apply_model(combo, [names[0], names[1]]) - self.assertEquals(2, len(combo.get_model())) - - def test_apply_icon(self): - new_object = Gtk.Entry() - - self.assertIsNone( - new_object.get_icon_pixbuf(Gtk.EntryIconPosition.PRIMARY)) - - self.apply_icon(new_object, "Disposable VM (test-disp6)") - - self.assertIsNotNone( - new_object.get_icon_pixbuf(Gtk.EntryIconPosition.PRIMARY)) - - def test_apply_icon_only_entry(self): - invalid_types = [1, "One", u'1', {'1': "one"}, Gtk.ComboBox()] - - for invalid_type in invalid_types: - with self.assertRaises(TypeError): - self.apply_icon(invalid_type, "test-disp6") - - def test_apply_icon_only_existing(self): - new_object = Gtk.Entry() - - for name in ["test-red1", "test-red2", "test-red3", - "test-target", "Disposable VM (test-disp6)"]: - self.apply_icon(new_object, name) - - for name in ["test-nonexistant", None, "", 1]: - with self.assertRaises(ValueError): - self.apply_icon(new_object, name) - - -class GtkOneTimerHelperTest(GtkOneTimerHelper, GtkTestCase): - def __init__(self, *args, **kwargs): - GtkTestCase.__init__(self, *args, **kwargs) - - self._test_time = 0.1 - - GtkOneTimerHelper.__init__(self, self._test_time) - self._run_timers = [] - - def _timer_run(self, timer_id): - self._run_timers.append(timer_id) - - def test_nothing_runs_automatically(self): - self.flush_gtk_events(self._test_time*2) - self.assertEquals([], self._run_timers) - self.assertEquals(0, self._current_timer_id) - self.assertFalse(self._timer_has_completed()) - - def test_schedule_one_task(self): - self._timer_schedule() - self.flush_gtk_events(self._test_time*2) - self.assertEquals([1], self._run_timers) - self.assertEquals(1, self._current_timer_id) - self.assertTrue(self._timer_has_completed()) - - def test_invalidate_completed(self): - self._timer_schedule() - self.flush_gtk_events(self._test_time*2) - self.assertEquals([1], self._run_timers) - self.assertEquals(1, self._current_timer_id) - - self.assertTrue(self._timer_has_completed()) - self._invalidate_timer_completed() - self.assertFalse(self._timer_has_completed()) - - def test_schedule_and_cancel_one_task(self): - self._timer_schedule() - self._invalidate_current_timer() - self.flush_gtk_events(self._test_time*2) - self.assertEquals([], self._run_timers) - self.assertEquals(2, self._current_timer_id) - self.assertFalse(self._timer_has_completed()) - - def test_two_tasks(self): - self._timer_schedule() - self.flush_gtk_events(self._test_time/4) - self._timer_schedule() - self.flush_gtk_events(self._test_time*2) - self.assertEquals([2], self._run_timers) - self.assertEquals(2, self._current_timer_id) - self.assertTrue(self._timer_has_completed()) - - def test_more_tasks(self): - num = 0 - for num in range(1, 10): - self._timer_schedule() - self.flush_gtk_events(self._test_time/4) - self.flush_gtk_events(self._test_time*1.75) - self.assertEquals([num], self._run_timers) - self.assertEquals(num, self._current_timer_id) - self.assertTrue(self._timer_has_completed()) - - def test_more_tasks_cancel(self): - num = 0 - for num in range(1, 10): - self._timer_schedule() - self.flush_gtk_events(self._test_time/4) - self._invalidate_current_timer() - self.flush_gtk_events(int(self._test_time*1.75)) - self.assertEquals([], self._run_timers) - self.assertEquals(num+1, self._current_timer_id) - self.assertFalse(self._timer_has_completed()) - - def test_subsequent_tasks(self): - self._timer_schedule() # 1 - self.flush_gtk_events(self._test_time*2) - self.assertEquals([1], self._run_timers) - self.assertEquals(1, self._current_timer_id) - self.assertTrue(self._timer_has_completed()) - - self._timer_schedule() # 2 - self.flush_gtk_events(self._test_time*2) - self.assertEquals([1, 2], self._run_timers) - self.assertEquals(2, self._current_timer_id) - self.assertTrue(self._timer_has_completed()) - - self._invalidate_timer_completed() - self._timer_schedule() # 3 - self._invalidate_current_timer() # 4 - self.flush_gtk_events(self._test_time*2) - self.assertEquals([1, 2], self._run_timers) - self.assertEquals(4, self._current_timer_id) - self.assertFalse(self._timer_has_completed()) - - self._timer_schedule() # 5 - self.flush_gtk_events(self._test_time*2) - self.assertEquals([1, 2, 5], self._run_timers) - self.assertEquals(5, self._current_timer_id) - self.assertTrue(self._timer_has_completed()) - - -class FocusStealingHelperMock(FocusStealingHelper): - def simulate_focus(self): - self._window_changed_focus(True) - - -@skipUnlessEnv('DISPLAY') -class FocusStealingHelperTest(FocusStealingHelperMock, GtkTestCase): - def __init__(self, *args, **kwargs): - GtkTestCase.__init__(self, *args, **kwargs) - - self._test_time = 0.1 - self._test_button = Gtk.Button() - self._test_window = Gtk.Window() - - FocusStealingHelperMock.__init__(self, self._test_window, - self._test_button, self._test_time) - - def test_nothing_runs_automatically(self): - self.assertFalse(self.can_perform_action()) - self.flush_gtk_events(self._test_time*2) - self.assertFalse(self.can_perform_action()) - self.assertFalse(self._test_button.get_sensitive()) - - def test_nothing_runs_automatically_with_request(self): - self.request_sensitivity(True) - self.assertFalse(self.can_perform_action()) - self.flush_gtk_events(self._test_time*2) - self.assertFalse(self.can_perform_action()) - self.assertFalse(self._test_button.get_sensitive()) - - def _simulate_focus(self, focused): - self._window_changed_focus(focused) - - def test_focus_with_request(self): - self.request_sensitivity(True) - self._simulate_focus(True) - self.flush_gtk_events(self._test_time*2) - self.assertTrue(self.can_perform_action()) - self.assertTrue(self._test_button.get_sensitive()) - - def test_focus_with_late_request(self): - self._simulate_focus(True) - self.flush_gtk_events(self._test_time*2) - self.assertTrue(self.can_perform_action()) - self.assertFalse(self._test_button.get_sensitive()) - - self.request_sensitivity(True) - self.assertTrue(self._test_button.get_sensitive()) - - def test_immediate_defocus(self): - self.request_sensitivity(True) - self._simulate_focus(True) - self._simulate_focus(False) - self.flush_gtk_events(self._test_time*2) - self.assertFalse(self.can_perform_action()) - self.assertFalse(self._test_button.get_sensitive()) - - def test_focus_then_unfocus(self): - self.request_sensitivity(True) - self._simulate_focus(True) - self.flush_gtk_events(self._test_time*2) - self.assertTrue(self.can_perform_action()) - self.assertTrue(self._test_button.get_sensitive()) - - self._simulate_focus(False) - self.assertFalse(self.can_perform_action()) - self.assertFalse(self._test_button.get_sensitive()) - - def test_focus_cycle(self): - self.request_sensitivity(True) - - self._simulate_focus(True) - self.flush_gtk_events(self._test_time*2) - self.assertTrue(self.can_perform_action()) - self.assertTrue(self._test_button.get_sensitive()) - - self._simulate_focus(False) - self.assertFalse(self.can_perform_action()) - self.assertFalse(self._test_button.get_sensitive()) - - self._simulate_focus(True) - self.assertFalse(self.can_perform_action()) - self.assertFalse(self._test_button.get_sensitive()) - - self.flush_gtk_events(self._test_time*2) - self.assertTrue(self.can_perform_action()) - self.assertTrue(self._test_button.get_sensitive()) - - self.request_sensitivity(False) - self.assertTrue(self.can_perform_action()) - self.assertFalse(self._test_button.get_sensitive()) - - self._simulate_focus(False) - self.assertFalse(self.can_perform_action()) - - self._simulate_focus(True) - self.assertFalse(self.can_perform_action()) - self.assertFalse(self._test_button.get_sensitive()) - - self.flush_gtk_events(self._test_time*2) - self.assertTrue(self.can_perform_action()) - self.assertFalse(self._test_button.get_sensitive()) - -if __name__ == '__main__': - unittest.main() diff --git a/qubespolicy/tests/rpcconfirmation.py b/qubespolicy/tests/rpcconfirmation.py deleted file mode 100755 index f0c426ca..00000000 --- a/qubespolicy/tests/rpcconfirmation.py +++ /dev/null @@ -1,359 +0,0 @@ -#!/usr/bin/python -# -# The Qubes OS Project, https://www.qubes-os.org/ -# -# Copyright (C) 2017 boring-stuff -# -# This library is free software; you can redistribute it and/or -# modify it under the terms of the GNU Lesser General Public -# License as published by the Free Software Foundation; either -# version 2.1 of the License, or (at your option) any later version. -# -# This library is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# Lesser General Public License for more details. -# -# You should have received a copy of the GNU Lesser General Public -# License along with this library; if not, see . -# - -import sys -import unittest - -from qubespolicy.tests.gtkhelpers import GtkTestCase, FocusStealingHelperMock -from qubespolicy.tests.gtkhelpers import mock_domains_info, mock_whitelist - -from qubespolicy.gtkhelpers import VMListModeler -from qubespolicy.rpcconfirmation import RPCConfirmationWindow - - -class MockRPCConfirmationWindow(RPCConfirmationWindow): - def _new_vm_list_modeler(self): - return VMListModeler(mock_domains_info) - - def _new_focus_stealing_helper(self): - return FocusStealingHelperMock( - self._rpc_window, - self._rpc_ok_button, - self._focus_stealing_seconds) - - def __init__(self, source, rpc_operation, whitelist, - target=None, focus_stealing_seconds=1): - self._focus_stealing_seconds = focus_stealing_seconds - - RPCConfirmationWindow.__init__( - self, mock_domains_info, source, rpc_operation, whitelist, - target) - - def is_error_visible(self): - return self._error_bar.get_visible() - - def get_shown_domains(self): - model = self._rpc_combo_box.get_model() - model_iter = model.get_iter_first() - domains = [] - - while model_iter is not None: - domain_name = model.get_value(model_iter, 1) - - domains += [domain_name] - - model_iter = model.iter_next(model_iter) - - return domains - - -class RPCConfirmationWindowTestBase(MockRPCConfirmationWindow, GtkTestCase): - def __init__(self, test_method, source_name="test-source", - rpc_operation="test.Operation", whitelist=mock_whitelist, - target_name=None): - GtkTestCase.__init__(self, test_method) - self.test_source_name = source_name - self.test_rpc_operation = rpc_operation - self.test_target_name = target_name - - self._test_time = 0.1 - - self.test_called_close = False - self.test_called_show = False - - self.test_clicked_ok = False - self.test_clicked_cancel = False - - MockRPCConfirmationWindow.__init__(self, - self.test_source_name, - self.test_rpc_operation, - whitelist, - self.test_target_name, - focus_stealing_seconds=self._test_time) - - def _can_perform_action(self): - return True - - def _close(self): - self.test_called_close = True - - def _show(self): - self.test_called_show = True - - def _clicked_ok(self, button): - MockRPCConfirmationWindow._clicked_ok(self, button) - self.test_clicked_ok = True - - def _clicked_cancel(self, button): - MockRPCConfirmationWindow._clicked_cancel(self, button) - self.test_clicked_cancel = True - - def test_has_linked_the_fields(self): - self.assertIsNotNone(self._rpc_window) - self.assertIsNotNone(self._rpc_ok_button) - self.assertIsNotNone(self._rpc_cancel_button) - self.assertIsNotNone(self._rpc_label) - self.assertIsNotNone(self._source_entry) - self.assertIsNotNone(self._rpc_combo_box) - self.assertIsNotNone(self._error_bar) - self.assertIsNotNone(self._error_message) - - def test_is_showing_source(self): - self.assertTrue(self.test_source_name in self._source_entry.get_text()) - - def test_is_showing_operation(self): - self.assertTrue(self.test_rpc_operation in self._rpc_label.get_text()) - - def test_escape_and_format_rpc_text(self): - self.assertEquals("qubes.Test", - self._escape_and_format_rpc_text("qubes.Test")) - self.assertEquals("custom.Domain", - self._escape_and_format_rpc_text("custom.Domain")) - self.assertEquals("nodomain", - self._escape_and_format_rpc_text("nodomain")) - self.assertEquals("domain.Sub.Operation", - self._escape_and_format_rpc_text("domain.Sub.Operation")) - self.assertEquals("", - self._escape_and_format_rpc_text("")) - self.assertEquals(".", - self._escape_and_format_rpc_text(".")) - self.assertEquals("inject.<script>", - self._escape_and_format_rpc_text("inject.