qubes/mgmt: move mgmt api to separate module

QubesOS/qubes-issues#2622
This commit is contained in:
Wojtek Porczyk 2017-02-08 18:44:08 +01:00
parent 5d455ac3c4
commit c12fc744a2
3 changed files with 162 additions and 132 deletions

157
qubes/mgmt.py Normal file
View File

@ -0,0 +1,157 @@
#
# The Qubes OS Project, https://www.qubes-os.org/
#
# Copyright (C) 2017 Wojtek Porczyk <woju@invisiblethingslab.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
'''
Qubes OS Management API
'''
import reprlib
import types
import qubes.vm.qubesvm
class ProtocolRepr(reprlib.Repr):
def repr1(self, x, level):
if isinstance(x, qubes.vm.qubesvm.QubesVM):
x = x.name
return super().repr1(x, level)
# pylint: disable=invalid-name
def repr_str(self, x, level):
'''Warning: this is incompatible with python 3 wrt to b'' '''
return "'{}'".format(''.join(
chr(c)
if 0x20 < c < 0x7f and c not in (ord("'"), ord('\\'))
else '\\x{:02x}'.format(c)
for c in x.encode()))
def repr_Label(self, x, level):
return self.repr1(x.name, level)
class ProtocolError(AssertionError):
'''Raised when something is wrong with data received'''
pass
class PermissionDenied(Exception):
'''Raised deliberately by handlers when we decide not to cooperate'''
pass
def not_in_api(func):
func.not_in_api = True
return func
class QubesMgmt(object):
def __init__(self, app, src, method, dest, arg):
self.app = app
self.src = self.app.domains[src.decode('ascii')]
self.dest = self.app.domains[dest.decode('ascii')]
self.arg = arg.decode('ascii')
self.prepr = ProtocolRepr()
self.method = method.decode('ascii')
untrusted_func_name = self.method
if untrusted_func_name.startswith('mgmt.'):
untrusted_func_name = untrusted_func_name[5:]
untrusted_func_name = untrusted_func_name.lower().replace('.', '_')
if untrusted_func_name.startswith('_') \
or not '_' in untrusted_func_name:
raise ProtocolError(
'possibly malicious function name: {!r}'.format(
untrusted_func_name))
try:
untrusted_func = getattr(self, untrusted_func_name)
except AttributeError:
raise ProtocolError(
'no such attribute: {!r}'.format(
untrusted_func_name))
if not isinstance(untrusted_func, types.MethodType):
raise ProtocolError(
'no such method: {!r}'.format(
untrusted_func_name))
if getattr(untrusted_func, 'not_in_api', False):
raise ProtocolError(
'attempt to call private method: {!r}'.format(
untrusted_func_name))
self.execute = untrusted_func
del untrusted_func_name
del untrusted_func
#
# PRIVATE METHODS, not to be called via RPC
#
@not_in_api
def fire_event_for_permission(self, *args, **kwargs):
return self.src.fire_event_pre('mgmt-permission:{}'.format(self.method),
self.dest, self.arg, *args, **kwargs)
@not_in_api
def repr(self, *args, **kwargs):
return self.prepr.repr(*args, **kwargs)
#
# ACTUAL RPC CALLS
#
def vm_list(self, untrusted_payload):
assert self.dest.name == 'dom0'
assert not self.arg
assert not untrusted_payload
del untrusted_payload
domains = self.app.domains
for selector in self.fire_event_for_permission():
domains = filter(selector, domains)
return ''.join('{} class={} state={}\n'.format(
self.repr(vm),
vm.__class__.__name__,
vm.get_power_state())
for vm in sorted(domains))
def vm_property_get(self, untrusted_payload):
assert self.arg in self.dest.property_list()
assert not untrusted_payload
del untrusted_payload
self.fire_event_for_permission()
try:
value = getattr(self.dest, self.arg)
except AttributeError:
return 'default=True '
else:
return 'default={} {}'.format(
str(self.dest.property_is_default(self.arg)),
self.repr(value))

View File

@ -4,145 +4,17 @@ import asyncio
import functools
import io
import os
import reprlib
import signal
import types
import qubes
import qubes.libvirtaio
import qubes.mgmt
import qubes.utils
import qubes.vm.qubesvm
QUBESD_SOCK = '/var/run/qubesd.sock'
class ProtocolRepr(reprlib.Repr):
def repr1(self, x, level):
if isinstance(x, qubes.vm.qubesvm.QubesVM):
x = x.name
return super().repr1(x, level)
# pylint: disable=invalid-name
def repr_str(self, x, level):
'''Warning: this is incompatible with python 3 wrt to b'' '''
return "'{}'".format(''.join(
chr(c)
if 0x20 < c < 0x7f and c not in (ord("'"), ord('\\'))
else '\\x{:02x}'.format(c)
for c in x.encode()))
def repr_Label(self, x, level):
return self.repr1(x.name, level)
class ProtocolError(AssertionError):
'''Raised when something is wrong with data received'''
pass
class PermissionDenied(Exception):
'''Raised deliberately by handlers when we decide not to cooperate'''
pass
def not_in_api(func):
func.not_in_api = True
return func
class QubesMgmt(object):
def __init__(self, app, src, method, dest, arg):
self.app = app
self.src = self.app.domains[src.decode('ascii')]
self.dest = self.app.domains[dest.decode('ascii')]
self.arg = arg.decode('ascii')
self.prepr = ProtocolRepr()
self.method = method.decode('ascii')
untrusted_func_name = self.method
if untrusted_func_name.startswith('mgmt.'):
untrusted_func_name = untrusted_func_name[5:]
untrusted_func_name = untrusted_func_name.lower().replace('.', '_')
if untrusted_func_name.startswith('_') \
or not '_' in untrusted_func_name:
raise ProtocolError(
'possibly malicious function name: {!r}'.format(
untrusted_func_name))
try:
untrusted_func = getattr(self, untrusted_func_name)
except AttributeError:
raise ProtocolError(
'no such attribute: {!r}'.format(
untrusted_func_name))
if not isinstance(untrusted_func, types.MethodType):
raise ProtocolError(
'no such method: {!r}'.format(
untrusted_func_name))
if getattr(untrusted_func, 'not_in_api', False):
raise ProtocolError(
'attempt to call private method: {!r}'.format(
untrusted_func_name))
self.execute = untrusted_func
del untrusted_func_name
del untrusted_func
#
# PRIVATE METHODS, not to be called via RPC
#
@not_in_api
def fire_event_for_permission(self, *args, **kwargs):
return self.src.fire_event_pre('mgmt-permission:{}'.format(self.method),
self.dest, self.arg, *args, **kwargs)
@not_in_api
def repr(self, *args, **kwargs):
return self.prepr.repr(*args, **kwargs)
#
# ACTUAL RPC CALLS
#
def vm_list(self, untrusted_payload):
assert self.dest.name == 'dom0'
assert not self.arg
assert not untrusted_payload
del untrusted_payload
domains = self.app.domains
for selector in self.fire_event_for_permission():
domains = filter(selector, domains)
return ''.join('{} class={} state={}\n'.format(
self.repr(vm),
vm.__class__.__name__,
vm.get_power_state())
for vm in sorted(domains))
def vm_property_get(self, untrusted_payload):
assert self.arg in self.dest.property_list()
assert not untrusted_payload
del untrusted_payload
self.fire_event_for_permission()
try:
value = getattr(self.dest, self.arg)
except AttributeError:
return 'default=True '
else:
return 'default={} {}'.format(
str(self.dest.property_is_default(self.arg)),
self.repr(value))
class QubesDaemonProtocol(asyncio.Protocol):
buffer_size = 65536
@ -181,12 +53,12 @@ class QubesDaemonProtocol(asyncio.Protocol):
return
try:
mgmt = QubesMgmt(self.app, src, method, dest, arg)
mgmt = qubes.mgmt.QubesMgmt(self.app, src, method, dest, arg)
response = mgmt.execute(untrusted_payload=untrusted_payload)
except PermissionDenied as err:
except qubes.mgmt.PermissionDenied as err:
# TODO logging
return
except ProtocolError as err:
except qubes.mgmt.ProtocolError as err:
# TODO logging
print(repr(err))
return

View File

@ -228,6 +228,7 @@ fi
%{python3_sitelib}/qubes/firewall.py
%{python3_sitelib}/qubes/libvirtaio.py
%{python3_sitelib}/qubes/log.py
%{python3_sitelib}/qubes/mgmt.py
%{python3_sitelib}/qubes/rngdoc.py
%{python3_sitelib}/qubes/tarwriter.py
%{python3_sitelib}/qubes/utils.py