|
@@ -0,0 +1,95 @@
|
|
|
+#!/usr/bin/env python3
|
|
|
+#
|
|
|
+# The Qubes OS Project, https://www.qubes-os.org/
|
|
|
+#
|
|
|
+# Copyright (C) 2017 Wojtek Porczyk <woju@invisiblethingslab.com>
|
|
|
+#
|
|
|
+# This library is free software; you can redistribute it and/or
|
|
|
+# modify it under the terms of the GNU Lesser General Public
|
|
|
+# License as published by the Free Software Foundation; either
|
|
|
+# version 2.1 of the License, or (at your option) any later version.
|
|
|
+#
|
|
|
+# This library is distributed in the hope that it will be useful,
|
|
|
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
|
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
|
+# Lesser General Public License for more details.
|
|
|
+#
|
|
|
+# You should have received a copy of the GNU Lesser General Public
|
|
|
+# License along with this library; if not, see <https://www.gnu.org/licenses/>.
|
|
|
+#
|
|
|
+
|
|
|
+import logging
|
|
|
+import os
|
|
|
+import string
|
|
|
+import sys
|
|
|
+import pathlib
|
|
|
+
|
|
|
+POLICY_PATH = pathlib.Path('/etc/qubes-rpc/policy')
|
|
|
+POLICY_RULE = '{frontend} {backend} allow\n'
|
|
|
+
|
|
|
+# linux-utils/qrexec-lib/qrexec.h
|
|
|
+MAX_ARGUMENT_LEN = 64
|
|
|
+
|
|
|
+# core-admin-linux/qrexec/qrexec-daemon.c
|
|
|
+VALID_CHARS = set(map(ord, string.ascii_letters + string.digits + '-._'))
|
|
|
+
|
|
|
+def die(*args, **kwargs):
|
|
|
+ logging.error(*args, **kwargs)
|
|
|
+ sys.exit(1)
|
|
|
+
|
|
|
+def main():
|
|
|
+ # pylint: disable=missing-docstring
|
|
|
+ logging.basicConfig(
|
|
|
+ level=logging.WARNING,
|
|
|
+ filename='/var/log/qubes/policy-register.log',
|
|
|
+ format='%(asctime)s %(message)s')
|
|
|
+
|
|
|
+ backend = os.environ['QREXEC_REMOTE_DOMAIN']
|
|
|
+ frontend = os.environ['QREXEC_REQUESTED_TARGET']
|
|
|
+ rpcname = os.environ['QREXEC_SERVICE_ARGUMENT']
|
|
|
+
|
|
|
+ logging.debug('%s %s → %s request, reading argument',
|
|
|
+ rpcname, frontend, backend)
|
|
|
+
|
|
|
+ untrusted_argument = sys.stdin.buffer.read(MAX_ARGUMENT_LEN)
|
|
|
+ untrusted_overflow = sys.stdin.buffer.read(1)
|
|
|
+ sys.stdin.buffer.close()
|
|
|
+
|
|
|
+ if untrusted_overflow:
|
|
|
+ die('%s: %s → %s request refused: argument too long',
|
|
|
+ rpcname, frontend, backend)
|
|
|
+
|
|
|
+ if not untrusted_argument:
|
|
|
+ die('%s: %s → %s request refused: empty argument',
|
|
|
+ rpcname, frontend, backend)
|
|
|
+
|
|
|
+ if any(c not in VALID_CHARS for c in untrusted_argument):
|
|
|
+ die('%s: %s → %s request refused: invalid argument',
|
|
|
+ rpcname, frontend, backend)
|
|
|
+
|
|
|
+ # argument may also be too long, so that length of rpcname, separator and
|
|
|
+ # argument exceed 64 bytes, but that's fine, the call just wont work
|
|
|
+
|
|
|
+ argument = untrusted_argument
|
|
|
+ del untrusted_argument
|
|
|
+ argument = argument.decode('ascii')
|
|
|
+
|
|
|
+ filename = '{}+{}'.format(rpcname, argument)
|
|
|
+ logging.debug('%s %s → %s argument %s filename %s',
|
|
|
+ rpcname, frontend, backend, argument, filename)
|
|
|
+
|
|
|
+ try:
|
|
|
+ # the 'x' is critical
|
|
|
+ with open(str(POLICY_PATH / filename), 'x') as file:
|
|
|
+ rule = POLICY_RULE.format(frontend=frontend, backend=backend)
|
|
|
+ logging.warning('%s: %s → %s %s argument allowed',
|
|
|
+ rpcname, frontend, backend, argument)
|
|
|
+ logging.debug('%s: %s → %s %s adding rule %r',
|
|
|
+ rpcname, frontend, backend, rule)
|
|
|
+ file.write(rule)
|
|
|
+
|
|
|
+ except FileExistsError:
|
|
|
+ die('%s: %s → %s %s argument failed: file exists')
|
|
|
+
|
|
|
+if __name__ == '__main__':
|
|
|
+ main()
|