core-admin/qubes-rpc/policy.RegisterArgument
Wojtek Porczyk bdaf92f9dc qubes-rpc/policy.RegisterArgument: readability fixes
Thanks, @rootkovska and @marmarek.
2017-11-30 16:05:06 +01:00

128 lines
4.5 KiB
Python
Executable File

#!/usr/bin/env python3
#
# The Qubes OS Project, https://www.qubes-os.org/
#
# Copyright (C) 2017 Wojtek Porczyk <woju@invisiblethingslab.com>
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, see <https://www.gnu.org/licenses/>.
#
'''policy.RegisterArgument
This qrexec is meant for services, which require some kind of "registering"
before use (say ``example.Register`` and ``example.Perform+ARGUMENT``). After
registering, the backend should invoke this call with frontend as the intended
destination, with the actual service in argument of this call and the argument
as the payload. The policy generated will be a single line with explicit
frontend and backend domain names, and a plain "allow", without further
qualifiers.
The call allows for registering an argument only once, for one frontend domain.
There is not possibility of deregistering or reregistering for another frontend.
The backend can always register another argument for any frontend, including
one that is already registered for some other argument.
By default this qrexec is disabled by policy. To actually use it you should
drop a policy for an exact call you want to register which will redirect the
call to dom0.
.. code-block:: none
:caption: /etc/qubes-rpc/policy/policy.RegisterArgument+example.Perform
backendvm $anyvm allow,target=dom0
It will generate, for argument ``EXAMPLE``:
.. code-bloc:: none
:caption: /etc/qubes-rpc/policy/example.Perform+EXAMPLE
frontendvm backendvm allow
'''
import logging
import os
import string
import sys
import pathlib
POLICY_PATH = pathlib.Path('/etc/qubes-rpc/policy')
POLICY_RULE = '{frontend} {backend} allow\n'
# linux-utils/qrexec-lib/qrexec.h
MAX_ARGUMENT_LEN = 64
# core-admin-linux/qrexec/qrexec-daemon.c
VALID_CHARS = set(map(ord, string.ascii_letters + string.digits + '-._'))
def die(*args, **kwargs):
logging.error(*args, **kwargs)
sys.exit(1)
def main():
# pylint: disable=missing-docstring
logging.basicConfig(
level=logging.WARNING,
filename='/var/log/qubes/policy-register.log',
format='%(asctime)s %(message)s')
backend = os.environ['QREXEC_REMOTE_DOMAIN']
frontend = os.environ['QREXEC_REQUESTED_TARGET']
rpcname = os.environ['QREXEC_SERVICE_ARGUMENT']
logging.debug('%s %s%s request, reading argument',
rpcname, frontend, backend)
untrusted_argument = sys.stdin.buffer.read(MAX_ARGUMENT_LEN)
untrusted_overflow = sys.stdin.buffer.read(1)
sys.stdin.buffer.close()
if untrusted_overflow:
die('%s: %s%s request refused: argument too long',
rpcname, frontend, backend)
if not untrusted_argument:
die('%s: %s%s request refused: empty argument',
rpcname, frontend, backend)
if any(c not in VALID_CHARS for c in untrusted_argument):
die('%s: %s%s request refused: invalid argument',
rpcname, frontend, backend)
# argument may also be too long, so that length of rpcname, separator and
# argument exceed 64 bytes, but that's fine, the call just wont work
argument = untrusted_argument
del untrusted_argument
argument = argument.decode('ascii', errors='strict')
filename = '{}+{}'.format(rpcname, argument)
logging.debug('%s %s%s argument %s filename %s',
rpcname, frontend, backend, argument, filename)
try:
# the 'x' enforces that argument cannot be registered twice
with open(str(POLICY_PATH / filename), 'x') as file:
rule = POLICY_RULE.format(frontend=frontend, backend=backend)
logging.warning('%s: %s%s %s argument allowed',
rpcname, frontend, backend, argument)
logging.debug('%s: %s%s %s adding rule %r',
rpcname, frontend, backend, rule)
file.write(rule)
except FileExistsError:
die('%s: %s%s %s argument failed: file exists')
if __name__ == '__main__':
main()