# -*- encoding: utf8 -*-
#
# The Qubes OS Project, http://www.qubes-os.org
#
# Copyright (C) 2017 Marek Marczykowski-Górecki
#
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, see <https://www.gnu.org/licenses/>.

'''Draw the qrexec policy as a directed graph in DOT syntax.'''

import argparse
import json
import os
import sys

import qubespolicy

parser = argparse.ArgumentParser(description='Graph qrexec policy')
parser.add_argument('--include-ask', action='store_true',
    help='Include `ask` action in graph')
parser.add_argument('--source', action='store', nargs='+',
    help='Limit graph to calls from *source*')
parser.add_argument('--target', action='store', nargs='+',
    help='Limit graph to calls to *target*')
parser.add_argument('--service', action='store', nargs='+',
    help='Limit graph to *service*')
parser.add_argument('--output', action='store',
    help='Write to *output* instead of stdout')
parser.add_argument('--policy-dir', action='store',
    default=qubespolicy.POLICY_DIR,
    help='Look for policy in *policy-dir*')
parser.add_argument('--system-info', action='store',
    help='Load system information from file instead of querying qubesd')
parser.add_argument('--skip-labels', action='store_true',
    help='Do not include service names on the graph, also deduplicate '
         'connections.')

# One edge statement of the graph: source, target, label, color.
_EDGE_FORMAT = ' "{}" -> "{}" [label="{}" color={}];\n'


def _edge(source, target, label, color):
    '''Format a single edge statement, terminated by a newline.'''
    return _EDGE_FORMAT.format(source, target, label, color)


def _service_selected(service, wanted):
    '''Tell whether policy file *service* passes the ``--service`` filter.

    With no filter (*wanted* is empty or ``None``) everything passes.
    Otherwise *service* must equal one of the *wanted* names, or be one of
    them followed by ``+`` and anything else.
    '''
    if not wanted:
        return True
    if service in wanted:
        return True
    return any(service.startswith(srv + '+') for srv in wanted)


def handle_single_action(args, action):
    '''Get single policy action and output (or not) a line to add

    :param args: parsed command-line options; only ``skip_labels`` and
        ``include_ask`` are read here
    :param action: evaluated policy action; its ``action``, ``source``,
        ``service``, ``target``, ``original_target`` and
        ``targets_for_ask`` attributes are read as needed
    :return: an edge statement (orange for ``ask``, red for ``allow``), or
        an empty string when nothing should be drawn for this action
    '''
    # With --skip-labels every edge gets an empty label, which is also what
    # lets the caller deduplicate connections across services.
    service = '' if args.skip_labels else action.service

    if action.action == qubespolicy.Action.ask:
        if not args.include_ask:
            return ''
        # handle forced target=: when exactly one target can be offered,
        # draw the edge to that target instead of the requested one
        if len(action.targets_for_ask) == 1:
            target = action.targets_for_ask[0]
        else:
            target = action.original_target
        return _edge(action.source, target, service, 'orange')

    if action.action == qubespolicy.Action.allow:
        return _edge(action.source, action.target, service, 'red')

    return ''


def main(args=None):
    '''Parse options, evaluate the policy and write the resulting graph.

    Every policy file in the policy directory is evaluated for every
    (source, target) pair; each allowed (and, with ``--include-ask``, each
    ``ask``) call becomes one edge of ``digraph g``.

    :param args: list of command-line arguments; ``None`` lets argparse
        take them from ``sys.argv``
    '''
    args = parser.parse_args(args)

    output = open(args.output, 'w') if args.output else sys.stdout
    try:
        if args.system_info:
            with open(args.system_info) as f_system_info:
                system_info = json.load(f_system_info)
        else:
            system_info = qubespolicy.get_system_info()

        domains = system_info['domains']

        sources = args.source if args.source else list(domains)

        if args.target:
            targets = args.target
        else:
            # No explicit targets: consider every domain, plus the generic
            # DispVM and a DispVM based on each domain that allows it.
            targets = list(domains)
            targets.append('$dispvm')
            targets.extend(
                '$dispvm:' + dom for dom in domains
                if domains[dom]['dispvm_allowed'])

        connections = set()

        output.write('digraph g {\n')
        # os.listdir() returns entries in arbitrary order; sort them so the
        # generated graph is reproducible between runs.
        for service in sorted(os.listdir(args.policy_dir)):
            if os.path.isdir(os.path.join(args.policy_dir, service)):
                continue
            if not _service_selected(service, args.service):
                continue

            policy = qubespolicy.Policy(service, args.policy_dir)
            for source in sources:
                for target in targets:
                    try:
                        action = policy.evaluate(system_info, source, target)
                    except qubespolicy.AccessDenied:
                        # denied calls are simply not drawn
                        continue
                    line = handle_single_action(args, action)
                    if line and line not in connections:
                        output.write(line)
                        connections.add(line)
        output.write('}\n')
    finally:
        # Close only a file we opened ourselves, never sys.stdout, and do
        # it even when evaluation fails half-way through.
        if args.output:
            output.close()


if __name__ == '__main__':
    sys.exit(main())