b28ddb6621
This allows performing actions on objects (VM, storage, etc.) without listing them. This is useful when the calling VM has minimal permissions and only selected actions are allowed. It means that app.domains['some-name'] will not raise KeyError even when the domain does not exist; performing an actual action (like vm.start()) will still fail in that case.
314 lines
11 KiB
Python
314 lines
11 KiB
Python
# -*- encoding: utf8 -*-
|
|
#
|
|
# The Qubes OS Project, http://www.qubes-os.org
|
|
#
|
|
# Copyright (C) 2017 Marek Marczykowski-Górecki
|
|
# <marmarek@invisiblethingslab.com>
|
|
#
|
|
# This program is free software; you can redistribute it and/or modify
|
|
# it under the terms of the GNU Lesser General Public License as published by
|
|
# the Free Software Foundation; either version 2 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Lesser General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Lesser General Public License along
|
|
# with this program; if not, see <http://www.gnu.org/licenses/>.
|
|
|
|
'''Base classes for managed objects'''
|
|
|
|
import ast
|
|
import qubesadmin.exc
|
|
|
|
# Module-level sentinel object (a unique, otherwise meaningless instance).
# NOTE(review): PropertyHolder.__setattr__ compares against
# ``qubesadmin.DEFAULT`` to request a property reset; presumably that name is
# this same object re-exported by the package -- confirm.
DEFAULT = object()
|
|
|
|
|
|
class PropertyHolder(object):
    '''A base class for object having properties retrievable using mgmt API.

    Warning: each (non-private) local attribute needs to be defined at class
    level, even if initialized in __init__; otherwise will be treated as
    property retrievable using mgmt call.
    '''
    #: a place for appropriate Qubes() object (QubesLocal or QubesRemote),
    # use None for self
    app = None

    def __init__(self, app, method_prefix, method_dest):
        '''
        :param app: Qubes() object used to perform the actual calls \
            (use None for self)
        :param method_prefix: prefix prepended to the 'List', 'Get', \
            'Help', 'Set' and 'Reset' method names
        :param method_dest: default destination of the calls
        '''
        #: appropriate Qubes() object (QubesLocal or QubesRemote), use None
        # for self
        self.app = app
        self._method_prefix = method_prefix
        self._method_dest = method_dest
        #: cache filled by property_list()
        self._properties = None
        self._properties_help = None

    def qubesd_call(self, dest, method, arg=None, payload=None,
            payload_stream=None):
        '''
        Call into qubesd using appropriate mechanism. This method should be
        defined by a subclass.

        Only one of `payload` and `payload_stream` can be specified.

        :param dest: Destination VM name
        :param method: Full API method name ('admin...')
        :param arg: Method argument (if any)
        :param payload: Payload send to the method
        :param payload_stream: file-like object to read payload from
        :return: Data returned by qubesd (string)
        :raises NotImplementedError: when no `app` object is set
        '''
        if dest is None:
            dest = self._method_dest
        # have the actual implementation at Qubes() instance
        if self.app:
            return self.app.qubesd_call(dest, method, arg, payload,
                payload_stream)
        raise NotImplementedError

    @staticmethod
    def _parse_qubesd_response(response_data):
        '''Parse response from qubesd.

        In case of success, return actual data. In case of error,
        raise appropriate exception.

        :param bytes response_data: raw response
        :return: response payload (everything after the two-byte header)
        :raises qubesadmin.exc.QubesDaemonNoResponseError: on empty response
        :raises qubesadmin.exc.QubesDaemonCommunicationError: on response \
            that does not match the expected format
        '''
        # Local import: ``__builtins__`` is a dict or a module depending on
        # the interpreter and on how this module was loaded, so look names up
        # in the real ``builtins`` module instead.
        import builtins

        if response_data == b'':
            raise qubesadmin.exc.QubesDaemonNoResponseError(
                'Got empty response from qubesd. See journalctl in dom0 for '
                'details.')

        header = response_data[0:2]
        if header == b'\x30\x00':
            return response_data[2:]
        if header == b'\x32\x00':
            fields = response_data.split(b'\x00', 4)
            if len(fields) != 5:
                # truncated error response - report it as a protocol error
                # instead of failing on tuple unpacking
                raise qubesadmin.exc.QubesDaemonCommunicationError(
                    'Invalid response format')
            (_, exc_type, _traceback, format_string, args) = fields
            # drop last field because of terminating '\x00'
            args = [arg.decode() for arg in args.split(b'\x00')[:-1]]
            format_string = format_string.decode('utf-8')
            exc_type = exc_type.decode('ascii')
            try:
                exc_class = getattr(qubesadmin.exc, exc_type)
            except AttributeError:
                exc_class = qubesadmin.exc.QubesException
                if exc_type.endswith('Error'):
                    # fall back to a builtin exception of that name, if any
                    candidate = getattr(builtins, exc_type, None)
                    if isinstance(candidate, type) and \
                            issubclass(candidate, Exception):
                        exc_class = candidate
            # TODO: handle traceback if given
            raise exc_class(format_string, *args)
        raise qubesadmin.exc.QubesDaemonCommunicationError(
            'Invalid response format')

    def property_list(self):
        '''
        List available properties (their names).

        The list is retrieved with the '<prefix>List' call on first use and
        cached afterwards.

        :return: list of strings
        '''
        if self._properties is None:
            properties_str = self.qubesd_call(
                self._method_dest,
                self._method_prefix + 'List',
                None,
                None)
            self._properties = properties_str.decode('ascii').splitlines()
        # TODO: make it somehow immutable
        return self._properties

    def property_help(self, name):
        '''
        Get description of a property.

        :param str name: name of property
        :return: property help text
        '''
        help_text = self.qubesd_call(
            self._method_dest,
            self._method_prefix + 'Help',
            name,
            None)
        return help_text.decode('ascii')

    def property_is_default(self, item):
        '''
        Check if given property have default value

        :param str item: name of property
        :return: bool
        :raises AttributeError: for names starting with underscore
        :raises qubesadmin.exc.QubesDaemonCommunicationError: when the \
            response does not start with a valid 'default=True/False' field
        '''
        if item.startswith('_'):
            raise AttributeError(item)
        property_str = self.qubesd_call(
            self._method_dest,
            self._method_prefix + 'Get',
            item,
            None)
        # only the first space-separated field is of interest here
        default = property_str.partition(b' ')[0]
        # explicit checks instead of assert: asserts vanish under ``-O``
        if not default.startswith(b'default='):
            raise qubesadmin.exc.QubesDaemonCommunicationError(
                'Invalid default prefix received: {!r}'.format(default))
        is_default_str = default.split(b'=')[1]
        try:
            is_default = ast.literal_eval(is_default_str.decode('ascii'))
        except (ValueError, SyntaxError):
            is_default = None
        if not isinstance(is_default, bool):
            raise qubesadmin.exc.QubesDaemonCommunicationError(
                'Invalid default flag received: {!r}'.format(is_default_str))
        return is_default

    def clone_properties(self, src, proplist=None):
        '''Clone properties from other object.

        Properties for which reading from `src` or setting on this object
        raises :py:class:`AttributeError` are skipped.

        :param PropertyHolder src: source object
        :param list proplist: list of properties \
            (:py:obj:`None` or omit for all properties)
        '''

        if proplist is None:
            proplist = self.property_list()

        for prop in proplist:
            try:
                setattr(self, prop, getattr(src, prop))
            except AttributeError:
                continue

    def __getattr__(self, item):
        '''Retrieve property value using the '<prefix>Get' call.

        The response is expected as ``default=... type=<type> <value>``; the
        value is converted according to the type: ``str``, ``bool``, ``int``,
        ``vm`` (looked up in ``app.domains``) or ``label`` (looked up in
        ``app.labels``). Empty ``vm``/``label`` value gives :py:obj:`None`.

        :raises AttributeError: for names starting with underscore and for \
            empty ``bool``/``int`` values
        :raises qubesadmin.exc.QubesPropertyAccessError: when qubesd gave \
            no response
        :raises qubesadmin.exc.QubesDaemonCommunicationError: on malformed \
            response or unknown value type
        '''
        # pylint: disable=too-many-return-statements
        if item.startswith('_'):
            raise AttributeError(item)
        try:
            property_str = self.qubesd_call(
                self._method_dest,
                self._method_prefix + 'Get',
                item,
                None)
        except qubesadmin.exc.QubesDaemonNoResponseError:
            raise qubesadmin.exc.QubesPropertyAccessError(item)
        fields = property_str.split(b' ', 2)
        if len(fields) != 3:
            # report protocol problems consistently instead of ValueError
            raise qubesadmin.exc.QubesDaemonCommunicationError(
                'Invalid property response format')
        (_default, prop_type, value) = fields
        prop_type = prop_type.decode('ascii')
        if not prop_type.startswith('type='):
            raise qubesadmin.exc.QubesDaemonCommunicationError(
                'Invalid type prefix received: {}'.format(prop_type))
        (_, prop_type) = prop_type.split('=', 1)
        value = value.decode()
        if prop_type == 'str':
            return str(value)
        elif prop_type in ('bool', 'int'):
            if value == '':
                # no value set; name the attribute in the exception
                raise AttributeError(item)
            return ast.literal_eval(value)
        elif prop_type == 'vm':
            if value == '':
                return None
            return self.app.domains[value]
        elif prop_type == 'label':
            if value == '':
                return None
            # TODO
            return self.app.labels[value]
        else:
            raise qubesadmin.exc.QubesDaemonCommunicationError(
                'Received invalid value type: {}'.format(prop_type))

    def __setattr__(self, key, value):
        '''Set property value.

        Private names and names already known to the object (see class
        docstring) are set locally. Anything else is sent to qubesd:
        ``qubesadmin.DEFAULT`` as a '<prefix>Reset' call, other values as a
        '<prefix>Set' call with ``str(value)`` encoded as UTF-8 (VM objects
        are replaced with their name, :py:obj:`None` with empty string).

        :raises qubesadmin.exc.QubesPropertyAccessError: when qubesd gave \
            no response
        '''
        if key.startswith('_') or key in dir(self):
            return super(PropertyHolder, self).__setattr__(key, value)
        if value is qubesadmin.DEFAULT:
            try:
                self.qubesd_call(
                    self._method_dest,
                    self._method_prefix + 'Reset',
                    key,
                    None)
            except qubesadmin.exc.QubesDaemonNoResponseError:
                raise qubesadmin.exc.QubesPropertyAccessError(key)
        else:
            if isinstance(value, qubesadmin.vm.QubesVM):
                value = value.name
            if value is None:
                value = ''
            try:
                self.qubesd_call(
                    self._method_dest,
                    self._method_prefix + 'Set',
                    key,
                    str(value).encode('utf-8'))
            except qubesadmin.exc.QubesDaemonNoResponseError:
                raise qubesadmin.exc.QubesPropertyAccessError(key)

    def __delattr__(self, name):
        '''Reset property using the '<prefix>Reset' call.

        Private names and names already known to the object are deleted
        locally instead.

        :raises qubesadmin.exc.QubesPropertyAccessError: when qubesd gave \
            no response
        '''
        if name.startswith('_') or name in dir(self):
            return super(PropertyHolder, self).__delattr__(name)
        try:
            self.qubesd_call(
                self._method_dest,
                self._method_prefix + 'Reset',
                name
            )
        except qubesadmin.exc.QubesDaemonNoResponseError:
            raise qubesadmin.exc.QubesPropertyAccessError(name)
|
|
|
|
|
|
class WrapperObjectsCollection(object):
    '''Collection of simple named objects'''
    def __init__(self, app, list_method, object_class):
        '''
        Construct manager of named wrapper objects.

        :param app: Qubes() object
        :param list_method: name of API method used to list objects,
            must return simple "one name per line" list
        :param object_class: object class (callable) for wrapper objects,
            will be called with just two arguments: app and a name
        '''
        self.app = app
        self._list_method = list_method
        self._object_class = object_class
        #: names cache
        self._names_list = None
        #: returned objects cache
        self._objects = {}

    def clear_cache(self):
        '''Clear cached list of names'''
        self._names_list = None

    def refresh_cache(self, force=False):
        '''Refresh cached list of names

        Cached wrapper objects whose name is no longer listed are dropped.

        :param bool force: query qubesd even if the list is already cached
        :raises qubesadmin.exc.QubesDaemonCommunicationError: when non-empty \
            list is not terminated with a newline
        '''
        if not force and self._names_list is not None:
            return
        list_data = self.app.qubesd_call('dom0', self._list_method)
        list_data = list_data.decode('ascii')
        # An empty response is a valid empty list. Otherwise the data must end
        # with a newline; checked explicitly because an assert is stripped
        # under ``-O`` and the slice below would then cut the last name.
        if list_data and not list_data.endswith('\n'):
            raise qubesadmin.exc.QubesDaemonCommunicationError(
                'Invalid list format: missing trailing newline')
        self._names_list = list_data[:-1].splitlines()

        current_names = set(self._names_list)
        for name, obj in list(self._objects.items()):
            if obj.name not in current_names:
                # Object no longer exists
                del self._objects[name]

    def __getitem__(self, item):
        '''Get wrapper object for given name.

        Wrapper objects are created on first access and cached. When
        ``app.blind_mode`` is true, the existence check is skipped and an
        object is returned for any name.

        :raises KeyError: when not in blind mode and name is not listed
        '''
        if not self.app.blind_mode and item not in self:
            raise KeyError(item)
        if item not in self._objects:
            self._objects[item] = self._object_class(self.app, item)
        return self._objects[item]

    def __contains__(self, item):
        '''Check if given name is on the (possibly cached) list of names.'''
        self.refresh_cache()
        return item in self._names_list

    def __iter__(self):
        '''Iterate over wrapper objects for all listed names.'''
        self.refresh_cache()
        for obj in self._names_list:
            yield self[obj]

    def keys(self):
        '''Get list of names.'''
        self.refresh_cache()
        return self._names_list.copy()
|