policy.RegisterArgument 3.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. #!/usr/bin/env python3
  2. #
  3. # The Qubes OS Project, https://www.qubes-os.org/
  4. #
  5. # Copyright (C) 2017 Wojtek Porczyk <woju@invisiblethingslab.com>
  6. #
  7. # This library is free software; you can redistribute it and/or
  8. # modify it under the terms of the GNU Lesser General Public
  9. # License as published by the Free Software Foundation; either
  10. # version 2.1 of the License, or (at your option) any later version.
  11. #
  12. # This library is distributed in the hope that it will be useful,
  13. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  14. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  15. # Lesser General Public License for more details.
  16. #
  17. # You should have received a copy of the GNU Lesser General Public
  18. # License along with this library; if not, see <https://www.gnu.org/licenses/>.
  19. #
  20. import logging
  21. import os
  22. import string
  23. import sys
  24. import pathlib
  25. POLICY_PATH = pathlib.Path('/etc/qubes-rpc/policy')
  26. POLICY_RULE = '{frontend} {backend} allow\n'
  27. # linux-utils/qrexec-lib/qrexec.h
  28. MAX_ARGUMENT_LEN = 64
  29. # core-admin-linux/qrexec/qrexec-daemon.c
  30. VALID_CHARS = set(map(ord, string.ascii_letters + string.digits + '-._'))
  31. def die(*args, **kwargs):
  32. logging.error(*args, **kwargs)
  33. sys.exit(1)
  34. def main():
  35. # pylint: disable=missing-docstring
  36. logging.basicConfig(
  37. level=logging.WARNING,
  38. filename='/var/log/qubes/policy-register.log',
  39. format='%(asctime)s %(message)s')
  40. backend = os.environ['QREXEC_REMOTE_DOMAIN']
  41. frontend = os.environ['QREXEC_REQUESTED_TARGET']
  42. rpcname = os.environ['QREXEC_SERVICE_ARGUMENT']
  43. logging.debug('%s %s → %s request, reading argument',
  44. rpcname, frontend, backend)
  45. untrusted_argument = sys.stdin.buffer.read(MAX_ARGUMENT_LEN)
  46. untrusted_overflow = sys.stdin.buffer.read(1)
  47. sys.stdin.buffer.close()
  48. if untrusted_overflow:
  49. die('%s: %s → %s request refused: argument too long',
  50. rpcname, frontend, backend)
  51. if not untrusted_argument:
  52. die('%s: %s → %s request refused: empty argument',
  53. rpcname, frontend, backend)
  54. if any(c not in VALID_CHARS for c in untrusted_argument):
  55. die('%s: %s → %s request refused: invalid argument',
  56. rpcname, frontend, backend)
  57. # argument may also be too long, so that length of rpcname, separator and
  58. # argument exceed 64 bytes, but that's fine, the call just wont work
  59. argument = untrusted_argument
  60. del untrusted_argument
  61. argument = argument.decode('ascii')
  62. filename = '{}+{}'.format(rpcname, argument)
  63. logging.debug('%s %s → %s argument %s filename %s',
  64. rpcname, frontend, backend, argument, filename)
  65. try:
  66. # the 'x' is critical
  67. with open(str(POLICY_PATH / filename), 'x') as file:
  68. rule = POLICY_RULE.format(frontend=frontend, backend=backend)
  69. logging.warning('%s: %s → %s %s argument allowed',
  70. rpcname, frontend, backend, argument)
  71. logging.debug('%s: %s → %s %s adding rule %r',
  72. rpcname, frontend, backend, rule)
  73. file.write(rule)
  74. except FileExistsError:
  75. die('%s: %s → %s %s argument failed: file exists')
  76. if __name__ == '__main__':
  77. main()