cli.py 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
# -*- encoding: utf8 -*-
#
# The Qubes OS Project, http://www.qubes-os.org
#
# Copyright (C) 2017 Marek Marczykowski-Górecki
#                               <marmarek@invisiblethingslab.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, see <http://www.gnu.org/licenses/>.
  20. import argparse
  21. import logging
  22. import logging.handlers
  23. import sys
  24. import qubespolicy
  25. parser = argparse.ArgumentParser(description="Evaluate qrexec policy")
  26. parser.add_argument("--assume-yes-for-ask", action="store_true",
  27. dest="assume_yes_for_ask", default=False,
  28. help="Allow run of service without confirmation if policy say 'ask'")
  29. parser.add_argument("--just-evaluate", action="store_true",
  30. dest="just_evaluate", default=False,
  31. help="Do not run the service, only evaluate policy; "
  32. "retcode=0 means 'allow'")
  33. parser.add_argument('domain_id', metavar='src-domain-id',
  34. help='Source domain ID (Xen ID or similar, not Qubes ID)')
  35. parser.add_argument('domain', metavar='src-domain-name',
  36. help='Source domain name')
  37. parser.add_argument('target', metavar='dst-domain-name',
  38. help='Target domain name')
  39. parser.add_argument('service_name', metavar='service-name',
  40. help='Service name')
  41. parser.add_argument('process_ident', metavar='process-ident',
  42. help='Qrexec process identifier - for connecting data channel')
  43. def main(args=None):
  44. args = parser.parse_args(args)
  45. # Add source domain information, required by qrexec-client for establishing
  46. # connection
  47. caller_ident = args.process_ident + "," + args.domain + "," + args.domain_id
  48. log = logging.getLogger('qubespolicy')
  49. log.setLevel(logging.INFO)
  50. handler = logging.handlers.SysLogHandler(address='/dev/log')
  51. log.addHandler(handler)
  52. log_prefix = 'qrexec: {}: {} -> {}: '.format(
  53. args.service_name, args.domain, args.target)
  54. try:
  55. system_info = qubespolicy.get_system_info()
  56. except qubespolicy.QubesMgmtException as e:
  57. log.error(log_prefix + 'error getting system info: ' + str(e))
  58. return 1
  59. try:
  60. policy = qubespolicy.Policy(args.service_name)
  61. action = policy.evaluate(system_info, args.domain, args.target)
  62. if args.assume_yes_for_ask and action.action == qubespolicy.Action.ask:
  63. action.action = qubespolicy.Action.allow
  64. if args.just_evaluate:
  65. return {
  66. qubespolicy.Action.allow: 0,
  67. qubespolicy.Action.deny: 1,
  68. qubespolicy.Action.ask: 1,
  69. }[action.action]
  70. if action.action == qubespolicy.Action.ask:
  71. # late import to save on time for allow/deny actions
  72. import pydbus
  73. bus = pydbus.SystemBus()
  74. proxy = bus.get('org.qubesos.PolicyAgent',
  75. '/org/qubesos/PolicyAgent')
  76. icons = {name: system_info['domains'][name]['icon']
  77. for name in system_info['domains'].keys()}
  78. for dispvm_base in system_info['domains']:
  79. if not system_info['domains'][dispvm_base]['dispvm_allowed']:
  80. continue
  81. dispvm_api_name = '$dispvm:' + dispvm_base
  82. icons[dispvm_api_name] = \
  83. system_info['domains'][dispvm_base]['icon']
  84. icons[dispvm_api_name] = \
  85. icons[dispvm_api_name].replace('app', 'disp')
  86. response = proxy.Ask(args.domain, args.service_name,
  87. action.targets_for_ask, action.target or '', icons)
  88. if response:
  89. action.handle_user_response(True, response)
  90. else:
  91. action.handle_user_response(False)
  92. log.info(log_prefix + 'allowed to {}'.format(action.target))
  93. action.execute(caller_ident)
  94. except qubespolicy.PolicySyntaxError as e:
  95. log.error(log_prefix + 'error loading policy: ' + str(e))
  96. return 1
  97. except qubespolicy.AccessDenied as e:
  98. log.info(log_prefix + 'denied: ' + str(e))
  99. return 1
  100. return 0
  101. if __name__ == '__main__':
  102. sys.exit(main())