cli.py 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144
  1. # -*- encoding: utf8 -*-
  2. #
  3. # The Qubes OS Project, http://www.qubes-os.org
  4. #
  5. # Copyright (C) 2017 Marek Marczykowski-Górecki
  6. # <marmarek@invisiblethingslab.com>
  7. #
  8. # This library is free software; you can redistribute it and/or
  9. # modify it under the terms of the GNU Lesser General Public
  10. # License as published by the Free Software Foundation; either
  11. # version 2.1 of the License, or (at your option) any later version.
  12. #
  13. # This library is distributed in the hope that it will be useful,
  14. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  15. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  16. # Lesser General Public License for more details.
  17. #
  18. # You should have received a copy of the GNU Lesser General Public
  19. # License along with this library; if not, see <https://www.gnu.org/licenses/>.
  20. import argparse
  21. import logging
  22. import logging.handlers
  23. import os
  24. import sys
  25. import qubespolicy
  26. parser = argparse.ArgumentParser(description="Evaluate qrexec policy")
  27. parser.add_argument("--assume-yes-for-ask", action="store_true",
  28. dest="assume_yes_for_ask", default=False,
  29. help="Allow run of service without confirmation if policy say 'ask'")
  30. parser.add_argument("--just-evaluate", action="store_true",
  31. dest="just_evaluate", default=False,
  32. help="Do not run the service, only evaluate policy; "
  33. "retcode=0 means 'allow'")
  34. parser.add_argument('domain_id', metavar='src-domain-id',
  35. help='Source domain ID (Xen ID or similar, not Qubes ID)')
  36. parser.add_argument('domain', metavar='src-domain-name',
  37. help='Source domain name')
  38. parser.add_argument('target', metavar='dst-domain-name',
  39. help='Target domain name')
  40. parser.add_argument('service_name', metavar='service-name',
  41. help='Service name')
  42. parser.add_argument('process_ident', metavar='process-ident',
  43. help='Qrexec process identifier - for connecting data channel')
  44. def create_default_policy(service_name):
  45. policy_file = os.path.join(qubespolicy.POLICY_DIR, service_name)
  46. with open(policy_file, "w") as policy:
  47. policy.write(
  48. "## Policy file automatically created on first service call.\n")
  49. policy.write(
  50. "## Fill free to edit.\n")
  51. policy.write("## Note that policy parsing stops at the first match\n")
  52. policy.write("\n")
  53. policy.write("## Please use a single # to start your custom comments\n")
  54. policy.write("\n")
  55. policy.write("@anyvm @anyvm ask\n")
  56. def main(args=None):
  57. args = parser.parse_args(args)
  58. # Add source domain information, required by qrexec-client for establishing
  59. # connection
  60. caller_ident = args.process_ident + "," + args.domain + "," + args.domain_id
  61. log = logging.getLogger('qubespolicy')
  62. log.setLevel(logging.INFO)
  63. if not log.handlers:
  64. handler = logging.handlers.SysLogHandler(address='/dev/log')
  65. log.addHandler(handler)
  66. log_prefix = 'qrexec: {}: {} -> {}: '.format(
  67. args.service_name, args.domain, args.target)
  68. try:
  69. system_info = qubespolicy.get_system_info()
  70. except qubespolicy.QubesMgmtException as e:
  71. log.error(log_prefix + 'error getting system info: ' + str(e))
  72. return 1
  73. try:
  74. try:
  75. policy = qubespolicy.Policy(args.service_name)
  76. except qubespolicy.PolicyNotFound:
  77. service_name = args.service_name.split('+')[0]
  78. import pydbus
  79. bus = pydbus.SystemBus()
  80. proxy = bus.get('org.qubesos.PolicyAgent',
  81. '/org/qubesos/PolicyAgent')
  82. create_policy = proxy.ConfirmPolicyCreate(
  83. args.domain, service_name)
  84. if create_policy:
  85. create_default_policy(service_name)
  86. policy = qubespolicy.Policy(args.service_name)
  87. else:
  88. raise
  89. action = policy.evaluate(system_info, args.domain, args.target)
  90. if args.assume_yes_for_ask and action.action == qubespolicy.Action.ask:
  91. action.action = qubespolicy.Action.allow
  92. if args.just_evaluate:
  93. return {
  94. qubespolicy.Action.allow: 0,
  95. qubespolicy.Action.deny: 1,
  96. qubespolicy.Action.ask: 1,
  97. }[action.action]
  98. if action.action == qubespolicy.Action.ask:
  99. # late import to save on time for allow/deny actions
  100. import pydbus
  101. bus = pydbus.SystemBus()
  102. proxy = bus.get('org.qubesos.PolicyAgent',
  103. '/org/qubesos/PolicyAgent')
  104. icons = {name: system_info['domains'][name]['icon']
  105. for name in system_info['domains'].keys()}
  106. for dispvm_base in system_info['domains']:
  107. if not (system_info['domains'][dispvm_base]
  108. ['template_for_dispvms']):
  109. continue
  110. dispvm_api_name = '@dispvm:' + dispvm_base
  111. icons[dispvm_api_name] = \
  112. system_info['domains'][dispvm_base]['icon']
  113. icons[dispvm_api_name] = \
  114. icons[dispvm_api_name].replace('app', 'disp')
  115. response = proxy.Ask(args.domain, args.service_name,
  116. action.targets_for_ask, action.target or '', icons)
  117. if response:
  118. action.handle_user_response(True, response)
  119. else:
  120. action.handle_user_response(False)
  121. log.info(log_prefix + 'allowed to {}'.format(action.target))
  122. action.execute(caller_ident)
  123. except qubespolicy.PolicySyntaxError as e:
  124. log.error(log_prefix + 'error loading policy: ' + str(e))
  125. return 1
  126. except qubespolicy.AccessDenied as e:
  127. log.info(log_prefix + 'denied: ' + str(e))
  128. return 1
  129. return 0
  130. if __name__ == '__main__':
  131. sys.exit(main())