123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742 |
- # coding=utf-8
- # The Qubes OS Project, https://www.qubes-os.org/
- #
- # Copyright (C) 2013-2015 Joanna Rutkowska <joanna@invisiblethingslab.com>
- # Copyright (C) 2013-2017 Marek Marczykowski-Górecki
- # <marmarek@invisiblethingslab.com>
- #
- # This library is free software; you can redistribute it and/or
- # modify it under the terms of the GNU Lesser General Public
- # License as published by the Free Software Foundation; either
- # version 2.1 of the License, or (at your option) any later version.
- #
- # This library is distributed in the hope that it will be useful,
- # but WITHOUT ANY WARRANTY; without even the implied warranty of
- # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- # Lesser General Public License for more details.
- #
- # You should have received a copy of the GNU Lesser General Public
- # License along with this library; if not, see <https://www.gnu.org/licenses/>.
- ''' Qrexec policy parser and evaluator '''
- import enum
- import itertools
- import json
- import os
- import os.path
- import socket
- import subprocess
- # don't import 'qubes.config' please, it takes 0.3s
- QREXEC_CLIENT = '/usr/lib/qubes/qrexec-client'
- QUBES_RPC_MULTIPLEXER_PATH = '/usr/lib/qubes/qubes-rpc-multiplexer'
- POLICY_DIR = '/etc/qubes-rpc/policy'
- QUBESD_INTERNAL_SOCK = '/var/run/qubesd.internal.sock'
- QUBESD_SOCK = '/var/run/qubesd.sock'
- class AccessDenied(Exception):
- ''' Raised when qrexec policy denied access '''
- pass
- class PolicySyntaxError(AccessDenied):
- ''' Syntax error in qrexec policy, abort parsing '''
- def __init__(self, filename, lineno, msg):
- super(PolicySyntaxError, self).__init__(
- '{}:{}: {}'.format(filename, lineno, msg))
- class PolicyNotFound(AccessDenied):
- ''' Policy was not found for this service '''
- def __init__(self, service_name):
- super(PolicyNotFound, self).__init__(
- 'Policy not found for service {}'.format(service_name))
- class Action(enum.Enum):
- ''' Action as defined by policy '''
- allow = 1
- deny = 2
- ask = 3
- def verify_target_value(system_info, value):
- ''' Check if given value names valid target
- This function check if given value is not only syntactically correct,
- but also if names valid service call target (existing domain,
- or valid $dispvm like keyword)
- :param system_info: information about the system
- :param value: value to be checked
- '''
- if value == '$dispvm':
- return True
- elif value == '$adminvm':
- return True
- elif value.startswith('$dispvm:'):
- dispvm_base = value.split(':', 1)[1]
- if dispvm_base not in system_info['domains']:
- return False
- dispvm_base_info = system_info['domains'][dispvm_base]
- return bool(dispvm_base_info['template_for_dispvms'])
- else:
- return value in system_info['domains']
- def verify_special_value(value, for_target=True, specific_target=False):
- '''
- Verify if given special VM-specifier ('$...') is valid
- :param value: value to verify
- :param for_target: should classify target-only values as valid (
- '$default', '$dispvm')
- :param specific_target: allow only values naming specific target
- (for use with target=, default= etc)
- :return: True or False
- '''
- # pylint: disable=too-many-return-statements
- # values used only for matching VMs, not naming specific one (for actual
- # call target)
- if not specific_target:
- if value.startswith('$tag:') and len(value) > len('$tag:'):
- return True
- if value.startswith('$type:') and len(value) > len('$type:'):
- return True
- if for_target and value.startswith('$dispvm:$tag:') and \
- len(value) > len('$dispvm:$tag:'):
- return True
- if value == '$anyvm':
- return True
- if for_target and value == '$default':
- return True
- # those can be used to name one specific call VM
- if value == '$adminvm':
- return True
- # allow only specific dispvm, not based on any $xxx keyword - don't name
- # $tag here specifically, to work also with any future keywords
- if for_target and value.startswith('$dispvm:') and \
- not value.startswith('$dispvm:$'):
- return True
- if for_target and value == '$dispvm':
- return True
- return False
- class PolicyRule(object):
- ''' A single line of policy file '''
- def __init__(self, line, filename=None, lineno=None):
- '''
- Load a single line of qrexec policy and check its syntax.
- Do not verify existence of named objects.
- :raise PolicySyntaxError: when syntax error is found
- :param line: a single line of actual qrexec policy (not a comment,
- empty line or $include)
- :param filename: name of the file from which this line is loaded
- :param lineno: line number from which this line is loaded
- '''
- self.lineno = lineno
- self.filename = filename
- try:
- self.source, self.target, self.full_action = line.split(maxsplit=2)
- except ValueError:
- raise PolicySyntaxError(filename, lineno, 'wrong number of fields')
- (action, *params) = self.full_action.replace(',', ' ').split()
- try:
- self.action = Action[action]
- except KeyError:
- raise PolicySyntaxError(filename, lineno,
- 'invalid action: {}'.format(action))
- #: alternative target, used instead of the one specified by the caller
- self.override_target = None
- #: alternative user, used instead of vm.default_user
- self.override_user = None
- #: default target when asking the user for confirmation
- self.default_target = None
- for param in params:
- try:
- param_name, value = param.split('=')
- except ValueError:
- raise PolicySyntaxError(filename, lineno,
- 'invalid action parameter syntax: {}'.format(param))
- if param_name == 'target':
- if self.action == Action.deny:
- raise PolicySyntaxError(filename, lineno,
- 'target= option not allowed for deny action')
- self.override_target = value
- elif param_name == 'user':
- if self.action == Action.deny:
- raise PolicySyntaxError(filename, lineno,
- 'user= option not allowed for deny action')
- self.override_user = value
- elif param_name == 'default_target':
- if self.action != Action.ask:
- raise PolicySyntaxError(filename, lineno,
- 'default_target= option allowed only for ask action')
- self.default_target = value
- else:
- raise PolicySyntaxError(filename, lineno,
- 'invalid option {} for {} action'.format(param, action))
- # verify special values
- if self.source.startswith('$'):
- if not verify_special_value(self.source, False, False):
- raise PolicySyntaxError(filename, lineno,
- 'invalid source specification: {}'.format(self.source))
- if self.target.startswith('$'):
- if not verify_special_value(self.target, True, False):
- raise PolicySyntaxError(filename, lineno,
- 'invalid target specification: {}'.format(self.target))
- if self.target == '$default' \
- and self.action == Action.allow \
- and self.override_target is None:
- raise PolicySyntaxError(filename, lineno,
- 'allow action for $default rule must specify target= option')
- if self.override_target is not None:
- if self.override_target.startswith('$') and \
- not verify_special_value(self.override_target, True, True):
- raise PolicySyntaxError(filename, lineno,
- 'target= option needs to name specific target')
- if self.default_target is not None:
- if self.default_target.startswith('$') and \
- not verify_special_value(self.default_target, True, True):
- raise PolicySyntaxError(filename, lineno,
- 'target= option needs to name specific target')
- @staticmethod
- def is_match_single(system_info, policy_value, value):
- '''
- Evaluate if a single value (VM name or '$default') matches policy
- specification
- :param system_info: information about the system
- :param policy_value: value from qrexec policy (either self.source or
- self.target)
- :param value: value to be compared (source or target)
- :return: True or False
- '''
- # pylint: disable=too-many-return-statements
- # not specified target matches only with $default and $anyvm policy
- # entry
- if value == '$default' or value == '':
- return policy_value in ('$default', '$anyvm')
- # if specific target used, check if it's valid
- # this function (is_match_single) is also used for checking call source
- # values, but this isn't a problem, because it will always be a
- # domain name (not $dispvm or such) - this is guaranteed by a nature
- # of qrexec call
- if not verify_target_value(system_info, value):
- return False
- # handle $adminvm keyword
- if policy_value == 'dom0':
- # TODO: log a warning in Qubes 4.1
- policy_value = '$adminvm'
- if value == 'dom0':
- value = '$adminvm'
- # allow any _valid_, non-dom0 target
- if policy_value == '$anyvm':
- return value != '$adminvm'
- # exact match, including $dispvm* and $adminvm
- if value == policy_value:
- return True
- # DispVM request, using tags to match
- if policy_value.startswith('$dispvm:$tag:') \
- and value.startswith('$dispvm:'):
- tag = policy_value.split(':', 2)[2]
- dispvm_base = value.split(':', 1)[1]
- # already checked for existence by verify_target_value call
- dispvm_base_info = system_info['domains'][dispvm_base]
- return tag in dispvm_base_info['tags']
- # if $dispvm* not matched above, reject it; default DispVM (bare
- # $dispvm) was resolved by the caller
- if value.startswith('$dispvm:'):
- return False
- # require $adminvm to be matched explicitly (not through $tag or $type)
- # - if not matched already, reject it
- if value == '$adminvm':
- return False
- # at this point, value name a specific target
- domain_info = system_info['domains'][value]
- if policy_value.startswith('$tag:'):
- tag = policy_value.split(':', 1)[1]
- return tag in domain_info['tags']
- if policy_value.startswith('$type:'):
- type_ = policy_value.split(':', 1)[1]
- return type_ == domain_info['type']
- return False
- def is_match(self, system_info, source, target):
- '''
- Check if given (source, target) matches this policy line.
- :param system_info: information about the system - available VMs,
- their types, labels, tags etc. as returned by
- :py:func:`app_to_system_info`
- :param source: name of the source VM
- :param target: name of the target VM, or None if not specified
- :return: True or False
- '''
- if not self.is_match_single(system_info, self.source, source):
- return False
- # $dispvm in policy matches _only_ $dispvm (but not $dispvm:some-vm,
- # even if that would be the default one)
- if self.target == '$dispvm' and target == '$dispvm':
- return True
- if target == '$dispvm':
- # resolve default DispVM, to check all kinds of $dispvm:*
- default_dispvm = system_info['domains'][source]['default_dispvm']
- if default_dispvm is None:
- # if this VM have no default DispVM, match only with $anyvm
- return self.target == '$anyvm'
- target = '$dispvm:' + default_dispvm
- if not self.is_match_single(system_info, self.target, target):
- return False
- return True
- def expand_target(self, system_info):
- '''
- Return domains matching target of this policy line
- :param system_info: information about the system
- :return: matching domains
- '''
- if self.target.startswith('$tag:'):
- tag = self.target.split(':', 1)[1]
- for name, domain in system_info['domains'].items():
- if tag in domain['tags']:
- yield name
- elif self.target.startswith('$type:'):
- type_ = self.target.split(':', 1)[1]
- for name, domain in system_info['domains'].items():
- if type_ == domain['type']:
- yield name
- elif self.target == '$anyvm':
- for name, domain in system_info['domains'].items():
- if name != 'dom0':
- yield name
- if domain['template_for_dispvms']:
- yield '$dispvm:' + name
- yield '$dispvm'
- elif self.target.startswith('$dispvm:$tag:'):
- tag = self.target.split(':', 2)[2]
- for name, domain in system_info['domains'].items():
- if tag in domain['tags']:
- if domain['template_for_dispvms']:
- yield '$dispvm:' + name
- elif self.target.startswith('$dispvm:'):
- dispvm_base = self.target.split(':', 1)[1]
- try:
- if system_info['domains'][dispvm_base]['template_for_dispvms']:
- yield self.target
- except KeyError:
- # TODO log a warning?
- pass
- elif self.target == '$adminvm':
- yield self.target
- elif self.target == '$dispvm':
- yield self.target
- else:
- if self.target in system_info['domains']:
- yield self.target
- def expand_override_target(self, system_info, source):
- '''
- Replace '$dispvm' with specific '$dispvm:...' value, based on qrexec
- call source.
- :param system_info: System information
- :param source: Source domain name
- :return: :py:attr:`override_target` with '$dispvm' substituted
- '''
- if self.override_target == '$dispvm':
- if system_info['domains'][source]['default_dispvm'] is None:
- return None
- return '$dispvm:' + system_info['domains'][source]['default_dispvm']
- else:
- return self.override_target
- class PolicyAction(object):
- ''' Object representing positive policy evaluation result -
- either ask or allow action '''
- def __init__(self, service, source, target, rule, original_target,
- targets_for_ask=None):
- #: service name
- self.service = service
- #: calling domain
- self.source = source
- #: target domain the service should be connected to, None if
- # not chosen yet
- if targets_for_ask is None or target in targets_for_ask:
- self.target = target
- else:
- # TODO: log a warning?
- self.target = None
- #: original target specified by the caller
- self.original_target = original_target
- #: targets for the user to choose from
- self.targets_for_ask = targets_for_ask
- #: policy rule from which this action is derived
- self.rule = rule
- if rule.action == Action.deny:
- # this should be really rejected by Policy.eval()
- raise AccessDenied(
- 'denied by policy {}:{}'.format(rule.filename, rule.lineno))
- elif rule.action == Action.ask:
- assert targets_for_ask is not None
- elif rule.action == Action.allow:
- assert targets_for_ask is None
- assert target is not None
- self.action = rule.action
- def handle_user_response(self, response, target=None):
- '''
- Handle user response for the 'ask' action
- :param response: whether the call was allowed or denied (bool)
- :param target: target chosen by the user (if reponse==True)
- :return: None
- '''
- assert self.action == Action.ask
- if response:
- assert target in self.targets_for_ask
- self.target = target
- self.action = Action.allow
- else:
- self.action = Action.deny
- raise AccessDenied(
- 'denied by the user {}:{}'.format(self.rule.filename,
- self.rule.lineno))
- def execute(self, caller_ident):
- ''' Execute allowed service call
- :param caller_ident: Service caller ident
- (`process_ident,source_name, source_id`)
- '''
- assert self.action == Action.allow
- assert self.target is not None
- if self.target == '$adminvm':
- self.target = 'dom0'
- if self.target == 'dom0':
- cmd = '{multiplexer} {service} {source} {original_target}'.format(
- multiplexer=QUBES_RPC_MULTIPLEXER_PATH,
- service=self.service,
- source=self.source,
- original_target=self.original_target)
- else:
- cmd = '{user}:QUBESRPC {service} {source}'.format(
- user=(self.rule.override_user or 'DEFAULT'),
- service=self.service,
- source=self.source)
- if self.target.startswith('$dispvm:'):
- target = self.spawn_dispvm()
- dispvm = True
- else:
- target = self.target
- dispvm = False
- self.ensure_target_running()
- qrexec_opts = ['-d', target, '-c', caller_ident]
- if dispvm:
- qrexec_opts.append('-W')
- try:
- subprocess.call([QREXEC_CLIENT] + qrexec_opts + [cmd])
- finally:
- if dispvm:
- self.cleanup_dispvm(target)
- def spawn_dispvm(self):
- '''
- Create and start Disposable VM based on AppVM specified in
- :py:attr:`target`
- :return: name of new Disposable VM
- '''
- base_appvm = self.target.split(':', 1)[1]
- dispvm_name = qubesd_call(base_appvm, 'admin.vm.CreateDisposable')
- dispvm_name = dispvm_name.decode('ascii')
- qubesd_call(dispvm_name, 'admin.vm.Start')
- return dispvm_name
- def ensure_target_running(self):
- '''
- Start domain if not running already
- :return: None
- '''
- if self.target == 'dom0':
- return
- try:
- qubesd_call(self.target, 'admin.vm.Start')
- except QubesMgmtException as e:
- if e.exc_type == 'QubesVMNotHaltedError':
- pass
- else:
- raise
- @staticmethod
- def cleanup_dispvm(dispvm):
- '''
- Kill and remove Disposable VM
- :param dispvm: name of Disposable VM
- :return: None
- '''
- qubesd_call(dispvm, 'admin.vm.Kill')
- class Policy(object):
- ''' Full policy for a given service
- Usage:
- >>> system_info = get_system_info()
- >>> policy = Policy('some-service')
- >>> action = policy.evaluate(system_info, 'source-name', 'target-name')
- >>> if action.action == Action.ask:
- >>> # ... ask the user, see action.targets_for_ask ...
- >>> action.handle_user_response(response, target_chosen_by_user)
- >>> action.execute('process-ident')
- '''
- def __init__(self, service, policy_dir=POLICY_DIR):
- policy_file = os.path.join(policy_dir, service)
- if not os.path.exists(policy_file):
- # fallback to policy without specific argument set (if any)
- policy_file = os.path.join(policy_dir, service.split('+')[0])
- if not os.path.exists(policy_file):
- raise PolicyNotFound(service)
- #: policy storage directory
- self.policy_dir = policy_dir
- #: service name
- self.service = service
- #: list of PolicyLine objects
- self.policy_rules = []
- try:
- self.load_policy_file(policy_file)
- except OSError as e:
- raise AccessDenied(
- 'failed to load {} file: {!s}'.format(e.filename, e))
- def load_policy_file(self, path):
- ''' Load policy file and append rules to :py:attr:`policy_rules`
- :param path: file to load
- '''
- with open(path) as policy_file:
- for lineno, line in zip(itertools.count(start=1),
- policy_file.readlines()):
- line = line.strip()
- if not line:
- # skip empty lines
- continue
- if line[0] == '#':
- # skip comments
- continue
- if line.startswith('$include:'):
- include_path = line.split(':', 1)[1]
- # os.path.join will leave include_path unchanged if it's
- # already absolute
- include_path = os.path.join(self.policy_dir, include_path)
- self.load_policy_file(include_path)
- else:
- self.policy_rules.append(PolicyRule(line, path, lineno))
- def find_matching_rule(self, system_info, source, target):
- ''' Find the first rule matching given arguments '''
- for rule in self.policy_rules:
- if rule.is_match(system_info, source, target):
- return rule
- raise AccessDenied('no matching rule found')
- def collect_targets_for_ask(self, system_info, source):
- ''' Collect targets the user can choose from in 'ask' action
- Word 'targets' is used intentionally instead of 'domains', because it
- can also contains $dispvm like keywords.
- '''
- targets = set()
- # iterate over rules in reversed order to easier handle 'deny'
- # actions - simply remove matching domains from allowed set
- for rule in reversed(self.policy_rules):
- if rule.is_match_single(system_info, rule.source, source):
- if rule.action == Action.deny:
- targets -= set(rule.expand_target(system_info))
- else:
- if rule.override_target is not None:
- override_target = rule.expand_override_target(
- system_info, source)
- if verify_target_value(system_info, override_target):
- targets.add(rule.override_target)
- else:
- targets.update(rule.expand_target(system_info))
- # expand default DispVM
- if '$dispvm' in targets:
- targets.remove('$dispvm')
- if system_info['domains'][source]['default_dispvm'] is not None:
- dispvm = '$dispvm:' + \
- system_info['domains'][source]['default_dispvm']
- if verify_target_value(system_info, dispvm):
- targets.add(dispvm)
- # expand other keywords
- if '$adminvm' in targets:
- targets.remove('$adminvm')
- targets.add('dom0')
- return targets
- def evaluate(self, system_info, source, target):
- ''' Evaluate policy
- :raise AccessDenied: when action should be denied unconditionally
- :return tuple(rule, considered_targets) - where considered targets is a
- list of possible targets for 'ask' action (rule.action == Action.ask)
- '''
- rule = self.find_matching_rule(system_info, source, target)
- if rule.action == Action.deny:
- raise AccessDenied(
- 'denied by policy {}:{}'.format(rule.filename, rule.lineno))
- if rule.override_target is not None:
- override_target = rule.expand_override_target(system_info, source)
- if not verify_target_value(system_info, override_target):
- raise AccessDenied('invalid target= value in {}:{}'.format(
- rule.filename, rule.lineno))
- actual_target = override_target
- else:
- actual_target = target
- if rule.action == Action.ask:
- if rule.override_target is not None:
- targets = [actual_target]
- else:
- targets = list(
- self.collect_targets_for_ask(system_info, source))
- if not targets:
- raise AccessDenied(
- 'policy define \'ask\' action at {}:{} but no target is '
- 'available to choose from'.format(
- rule.filename, rule.lineno))
- return PolicyAction(self.service, source, rule.default_target,
- rule, target, targets)
- elif rule.action == Action.allow:
- if actual_target == '$default':
- raise AccessDenied(
- 'policy define \'allow\' action at {}:{} but no target is '
- 'specified by caller or policy'.format(
- rule.filename, rule.lineno))
- if actual_target == '$dispvm':
- if system_info['domains'][source]['default_dispvm'] is None:
- raise AccessDenied(
- 'policy define \'allow\' action to $dispvm at {}:{} '
- 'but no DispVM base is set for this VM'.format(
- rule.filename, rule.lineno))
- actual_target = '$dispvm:' + \
- system_info['domains'][source]['default_dispvm']
- return PolicyAction(self.service, source,
- actual_target, rule, target)
- else:
- # should be unreachable
- raise AccessDenied(
- 'invalid action?! {}:{}'.format(rule.filename, rule.lineno))
- class QubesMgmtException(Exception):
- ''' Exception returned by qubesd '''
- def __init__(self, exc_type):
- super(QubesMgmtException, self).__init__()
- self.exc_type = exc_type
- def qubesd_call(dest, method, arg=None, payload=None):
- if method.startswith('internal.'):
- socket_path = QUBESD_INTERNAL_SOCK
- else:
- socket_path = QUBESD_SOCK
- try:
- client_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
- client_socket.connect(socket_path)
- except IOError:
- # TODO:
- raise
- # src, method, dest, arg
- for call_arg in ('dom0', method, dest, arg):
- if call_arg is not None:
- client_socket.sendall(call_arg.encode('ascii'))
- client_socket.sendall(b'\0')
- if payload is not None:
- client_socket.sendall(payload)
- client_socket.shutdown(socket.SHUT_WR)
- return_data = client_socket.makefile('rb').read()
- if return_data.startswith(b'0\x00'):
- return return_data[2:]
- elif return_data.startswith(b'2\x00'):
- (_, exc_type, _traceback, _format_string, _args) = \
- return_data.split(b'\x00', 4)
- raise QubesMgmtException(exc_type.decode('ascii'))
- else:
- raise AssertionError(
- 'invalid qubesd response: {!r}'.format(return_data))
- def get_system_info():
- ''' Get system information
- This retrieve information necessary to process qrexec policy. Returned
- data is nested dict structure with this structure:
- - domains:
- - `<domain name>`:
- - tags: list of tags
- - type: domain type
- - template_for_dispvms: should DispVM based on this VM be allowed
- - default_dispvm: name of default AppVM for DispVMs started from here
- '''
- system_info = qubesd_call('dom0', 'internal.GetSystemInfo')
- return json.loads(system_info.decode('utf-8'))
|