core-admin/qubespolicy/cli.py
Marek Marczykowski-Górecki 68b6f1ec76
qubespolicy: use '@' instead of '$' for policy keywords
Using '$' is easy to misuse in shell scripts, shell commands etc. After
all this years, lets abandon this dangerous character and move to
something safer: '@'. The choice was made after reviewing specifications
of various shells on different operating systems and this is the
character that have no special meaning in none of them.

To preserve compatibility, automatically translate '$' to '@' when
loading policy files.
2018-02-19 03:33:40 +01:00

145 lines
5.7 KiB
Python

# -*- encoding: utf8 -*-
#
# The Qubes OS Project, http://www.qubes-os.org
#
# Copyright (C) 2017 Marek Marczykowski-Górecki
# <marmarek@invisiblethingslab.com>
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, see <https://www.gnu.org/licenses/>.
import argparse
import logging
import logging.handlers
import os
import sys
import qubespolicy
parser = argparse.ArgumentParser(description="Evaluate qrexec policy")
parser.add_argument("--assume-yes-for-ask", action="store_true",
dest="assume_yes_for_ask", default=False,
help="Allow run of service without confirmation if policy say 'ask'")
parser.add_argument("--just-evaluate", action="store_true",
dest="just_evaluate", default=False,
help="Do not run the service, only evaluate policy; "
"retcode=0 means 'allow'")
parser.add_argument('domain_id', metavar='src-domain-id',
help='Source domain ID (Xen ID or similar, not Qubes ID)')
parser.add_argument('domain', metavar='src-domain-name',
help='Source domain name')
parser.add_argument('target', metavar='dst-domain-name',
help='Target domain name')
parser.add_argument('service_name', metavar='service-name',
help='Service name')
parser.add_argument('process_ident', metavar='process-ident',
help='Qrexec process identifier - for connecting data channel')
def create_default_policy(service_name):
policy_file = os.path.join(qubespolicy.POLICY_DIR, service_name)
with open(policy_file, "w") as policy:
policy.write(
"## Policy file automatically created on first service call.\n")
policy.write(
"## Fill free to edit.\n")
policy.write("## Note that policy parsing stops at the first match\n")
policy.write("\n")
policy.write("## Please use a single # to start your custom comments\n")
policy.write("\n")
policy.write("@anyvm @anyvm ask\n")
def main(args=None):
args = parser.parse_args(args)
# Add source domain information, required by qrexec-client for establishing
# connection
caller_ident = args.process_ident + "," + args.domain + "," + args.domain_id
log = logging.getLogger('qubespolicy')
log.setLevel(logging.INFO)
if not log.handlers:
handler = logging.handlers.SysLogHandler(address='/dev/log')
log.addHandler(handler)
log_prefix = 'qrexec: {}: {} -> {}: '.format(
args.service_name, args.domain, args.target)
try:
system_info = qubespolicy.get_system_info()
except qubespolicy.QubesMgmtException as e:
log.error(log_prefix + 'error getting system info: ' + str(e))
return 1
try:
try:
policy = qubespolicy.Policy(args.service_name)
except qubespolicy.PolicyNotFound:
service_name = args.service_name.split('+')[0]
import pydbus
bus = pydbus.SystemBus()
proxy = bus.get('org.qubesos.PolicyAgent',
'/org/qubesos/PolicyAgent')
create_policy = proxy.ConfirmPolicyCreate(
args.domain, service_name)
if create_policy:
create_default_policy(service_name)
policy = qubespolicy.Policy(args.service_name)
else:
raise
action = policy.evaluate(system_info, args.domain, args.target)
if args.assume_yes_for_ask and action.action == qubespolicy.Action.ask:
action.action = qubespolicy.Action.allow
if args.just_evaluate:
return {
qubespolicy.Action.allow: 0,
qubespolicy.Action.deny: 1,
qubespolicy.Action.ask: 1,
}[action.action]
if action.action == qubespolicy.Action.ask:
# late import to save on time for allow/deny actions
import pydbus
bus = pydbus.SystemBus()
proxy = bus.get('org.qubesos.PolicyAgent',
'/org/qubesos/PolicyAgent')
icons = {name: system_info['domains'][name]['icon']
for name in system_info['domains'].keys()}
for dispvm_base in system_info['domains']:
if not (system_info['domains'][dispvm_base]
['template_for_dispvms']):
continue
dispvm_api_name = '@dispvm:' + dispvm_base
icons[dispvm_api_name] = \
system_info['domains'][dispvm_base]['icon']
icons[dispvm_api_name] = \
icons[dispvm_api_name].replace('app', 'disp')
response = proxy.Ask(args.domain, args.service_name,
action.targets_for_ask, action.target or '', icons)
if response:
action.handle_user_response(True, response)
else:
action.handle_user_response(False)
log.info(log_prefix + 'allowed to {}'.format(action.target))
action.execute(caller_ident)
except qubespolicy.PolicySyntaxError as e:
log.error(log_prefix + 'error loading policy: ' + str(e))
return 1
except qubespolicy.AccessDenied as e:
log.info(log_prefix + 'denied: ' + str(e))
return 1
return 0
if __name__ == '__main__':
sys.exit(main())