1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495 |
- #!/usr/bin/env python3
- #
- # The Qubes OS Project, https://www.qubes-os.org/
- #
- # Copyright (C) 2017 Wojtek Porczyk <woju@invisiblethingslab.com>
- #
- # This library is free software; you can redistribute it and/or
- # modify it under the terms of the GNU Lesser General Public
- # License as published by the Free Software Foundation; either
- # version 2.1 of the License, or (at your option) any later version.
- #
- # This library is distributed in the hope that it will be useful,
- # but WITHOUT ANY WARRANTY; without even the implied warranty of
- # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- # Lesser General Public License for more details.
- #
- # You should have received a copy of the GNU Lesser General Public
- # License along with this library; if not, see <https://www.gnu.org/licenses/>.
- #
- import logging
- import os
- import string
- import sys
- import pathlib
# Directory in which the per-argument policy files are created.
POLICY_PATH = pathlib.Path('/etc/qubes-rpc/policy')

# Single line written to a freshly created policy file.
POLICY_RULE = '{frontend} {backend} allow\n'

# Upper bound on the argument length, in bytes
# (see linux-utils/qrexec-lib/qrexec.h).
MAX_ARGUMENT_LEN = 64

# Byte values accepted in an argument
# (see core-admin-linux/qrexec/qrexec-daemon.c).
VALID_CHARS = {
    ord(char) for char in string.ascii_letters + string.digits + '-._'
}
def die(*args, **kwargs):
    """Log an error message and terminate the process with exit status 1.

    All positional and keyword arguments are forwarded unchanged to the
    root logger at ``ERROR`` level, so the first positional argument is the
    message format and the remaining ones are its ``%``-style parameters.
    """
    logging.log(logging.ERROR, *args, **kwargs)
    raise SystemExit(1)
def main():
    """Create a policy file allowing one argument-specific qrexec call.

    The calling domain (``backend``), the requested target (``frontend``)
    and the RPC name are taken from the ``QREXEC_REMOTE_DOMAIN``,
    ``QREXEC_REQUESTED_TARGET`` and ``QREXEC_SERVICE_ARGUMENT`` environment
    variables; a missing variable raises :class:`KeyError`.  The argument is
    read from standard input and must be non-empty, at most
    ``MAX_ARGUMENT_LEN`` bytes long and consist only of ``VALID_CHARS``.

    On success a new file named ``<rpcname>+<argument>`` is created under
    ``POLICY_PATH`` containing a single ``POLICY_RULE`` line.  If validation
    fails or the file already exists, the error is logged and the process
    exits with status 1 (see :func:`die`).
    """
    logging.basicConfig(
        level=logging.WARNING,
        filename='/var/log/qubes/policy-register.log',
        format='%(asctime)s %(message)s')

    backend = os.environ['QREXEC_REMOTE_DOMAIN']
    frontend = os.environ['QREXEC_REQUESTED_TARGET']
    rpcname = os.environ['QREXEC_SERVICE_ARGUMENT']

    logging.debug('%s %s → %s request, reading argument',
        rpcname, frontend, backend)

    # Read one byte beyond the limit so that an over-long argument can be
    # detected without reading an unbounded amount of untrusted data.
    untrusted_argument = sys.stdin.buffer.read(MAX_ARGUMENT_LEN)
    untrusted_overflow = sys.stdin.buffer.read(1)
    sys.stdin.buffer.close()

    if untrusted_overflow:
        die('%s: %s → %s request refused: argument too long',
            rpcname, frontend, backend)

    if not untrusted_argument:
        die('%s: %s → %s request refused: empty argument',
            rpcname, frontend, backend)

    # Iterating over bytes yields integers, which is what VALID_CHARS holds.
    if any(c not in VALID_CHARS for c in untrusted_argument):
        die('%s: %s → %s request refused: invalid argument',
            rpcname, frontend, backend)

    # argument may also be too long, so that length of rpcname, separator and
    # argument exceed 64 bytes, but that's fine, the call just won't work
    argument = untrusted_argument
    del untrusted_argument
    argument = argument.decode('ascii')

    filename = '{}+{}'.format(rpcname, argument)
    logging.debug('%s %s → %s argument %s filename %s',
        rpcname, frontend, backend, argument, filename)

    try:
        # the 'x' is critical: exclusive creation refuses to touch a policy
        # file that already exists
        with open(str(POLICY_PATH / filename), 'x') as file:
            rule = POLICY_RULE.format(frontend=frontend, backend=backend)
            logging.warning('%s: %s → %s %s argument allowed',
                rpcname, frontend, backend, argument)
            # five placeholders need five arguments, including the argument
            logging.debug('%s: %s → %s %s adding rule %r',
                rpcname, frontend, backend, argument, rule)
            file.write(rule)

    except FileExistsError:
        # pass the values for the placeholders so the log entry identifies
        # the refused request instead of containing literal '%s'
        die('%s: %s → %s %s argument failed: file exists',
            rpcname, frontend, backend, argument)
# Run the service only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
|