core-admin-client/qubesadmin/base.py

374 lines
13 KiB
Python
Raw Normal View History

# -*- encoding: utf8 -*-
#
# The Qubes OS Project, http://www.qubes-os.org
#
# Copyright (C) 2017 Marek Marczykowski-Górecki
# <marmarek@invisiblethingslab.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with this program; if not, see <http://www.gnu.org/licenses/>.
'''Base classes for managed objects'''
import qubesadmin.exc
DEFAULT = object()
class PropertyHolder(object):
'''A base class for object having properties retrievable using mgmt API.
Warning: each (non-private) local attribute needs to be defined at class
level, even if initialized in __init__; otherwise will be treated as
property retrievable using mgmt call.
'''
#: a place for appropriate Qubes() object (QubesLocal or QubesRemote),
# use None for self
app = None
def __init__(self, app, method_prefix, method_dest):
#: appropriate Qubes() object (QubesLocal or QubesRemote), use None
# for self
self.app = app
self._method_prefix = method_prefix
self._method_dest = method_dest
self._properties = None
self._properties_help = None
def qubesd_call(self, dest, method, arg=None, payload=None,
payload_stream=None):
'''
Call into qubesd using appropriate mechanism. This method should be
defined by a subclass.
Only one of `payload` and `payload_stream` can be specified.
:param dest: Destination VM name
:param method: Full API method name ('admin...')
:param arg: Method argument (if any)
:param payload: Payload send to the method
:param payload_stream: file-like object to read payload from
:return: Data returned by qubesd (string)
'''
if dest is None:
dest = self._method_dest
# have the actual implementation at Qubes() instance
if self.app:
return self.app.qubesd_call(dest, method, arg, payload,
payload_stream)
raise NotImplementedError
@staticmethod
def _parse_qubesd_response(response_data):
'''Parse response from qubesd.
In case of success, return actual data. In case of error,
raise appropriate exception.
'''
2017-04-21 02:47:23 +02:00
if response_data == b'':
raise qubesadmin.exc.QubesDaemonNoResponseError(
'Got empty response from qubesd. See journalctl in dom0 for '
'details.')
if response_data[0:2] == b'\x30\x00':
return response_data[2:]
if response_data[0:2] == b'\x32\x00':
(_, exc_type, _traceback, format_string, args) = \
response_data.split(b'\x00', 4)
# drop last field because of terminating '\x00'
args = [arg.decode() for arg in args.split(b'\x00')[:-1]]
format_string = format_string.decode('utf-8')
exc_type = exc_type.decode('ascii')
try:
exc_class = getattr(qubesadmin.exc, exc_type)
except AttributeError:
if exc_type.endswith('Error'):
exc_class = __builtins__.get(exc_type,
qubesadmin.exc.QubesException)
else:
exc_class = qubesadmin.exc.QubesException
# TODO: handle traceback if given
raise exc_class(format_string, *args)
else:
raise qubesadmin.exc.QubesDaemonCommunicationError(
'Invalid response format')
def property_list(self):
'''
List available properties (their names).
:return: list of strings
'''
if self._properties is None:
properties_str = self.qubesd_call(
self._method_dest,
self._method_prefix + 'List',
None,
None)
self._properties = properties_str.decode('ascii').splitlines()
# TODO: make it somehow immutable
return self._properties
2017-03-11 01:45:57 +01:00
def property_help(self, name):
'''
Get description of a property.
:return: property help text
'''
help_text = self.qubesd_call(
self._method_dest,
self._method_prefix + 'Help',
name,
None)
return help_text.decode('ascii')
def property_is_default(self, item):
'''
Check if given property have default value
:param str item: name of property
:return: bool
'''
if item.startswith('_'):
raise AttributeError(item)
property_str = self.qubesd_call(
self._method_dest,
self._method_prefix + 'Get',
item,
None)
(default, _value) = property_str.split(b' ', 1)
assert default.startswith(b'default=')
is_default_str = default.split(b'=')[1]
is_default = is_default_str.decode('ascii') == "True"
assert isinstance(is_default, bool)
return is_default
def property_get_default(self, item):
'''
Get default property value, regardless of the current value
:param str item: name of property
:return: default value
'''
if item.startswith('_'):
raise AttributeError(item)
property_str = self.qubesd_call(
self._method_dest,
self._method_prefix + 'GetDefault',
item,
None)
if not property_str:
raise AttributeError(item + ' has no default')
(prop_type, value) = property_str.split(b' ', 1)
return self._parse_type_value(prop_type, value)
def clone_properties(self, src, proplist=None):
'''Clone properties from other object.
:param PropertyHolder src: source object
:param list proplist: list of properties \
(:py:obj:`None` or omit for all properties)
'''
if proplist is None:
proplist = self.property_list()
for prop in proplist:
try:
setattr(self, prop, getattr(src, prop))
except AttributeError:
continue
def __getattr__(self, item):
if item.startswith('_'):
raise AttributeError(item)
try:
property_str = self.qubesd_call(
self._method_dest,
self._method_prefix + 'Get',
item,
None)
except qubesadmin.exc.QubesDaemonNoResponseError:
raise qubesadmin.exc.QubesPropertyAccessError(item)
(_default, prop_type, value) = property_str.split(b' ', 2)
return self._parse_type_value(prop_type, value)
def _parse_type_value(self, prop_type, value):
'''
Parse `type=... ...` qubesd response format. Return a value of
appropriate type.
:param bytes prop_type: 'type=...' part of the response (including
`type=` prefix)
:param bytes value: 'value' part of the response
:return: parsed value
'''
# pylint: disable=too-many-return-statements
prop_type = prop_type.decode('ascii')
if not prop_type.startswith('type='):
raise qubesadmin.exc.QubesDaemonCommunicationError(
'Invalid type prefix received: {}'.format(prop_type))
(_, prop_type) = prop_type.split('=', 1)
value = value.decode()
if prop_type == 'str':
return str(value)
if prop_type == 'bool':
if value == '':
raise AttributeError
return value == "True"
if prop_type == 'int':
if value == '':
raise AttributeError
return int(value)
if prop_type == 'vm':
if value == '':
return None
return self.app.domains[value]
if prop_type == 'label':
if value == '':
return None
return self.app.labels.get_blind(value)
raise qubesadmin.exc.QubesDaemonCommunicationError(
'Received invalid value type: {}'.format(prop_type))
@classmethod
def _local_properties(cls):
'''
Get set of property names that are properties on the Python object,
and must not be set on the remote object
'''
if "_local_properties_set" not in cls.__dict__:
props = set()
for class_ in cls.__mro__:
for key in class_.__dict__:
props.add(key)
cls._local_properties_set = props
return cls._local_properties_set
def __setattr__(self, key, value):
if key.startswith('_') or key in self._local_properties():
return super(PropertyHolder, self).__setattr__(key, value)
if value is qubesadmin.DEFAULT:
try:
self.qubesd_call(
self._method_dest,
self._method_prefix + 'Reset',
key,
None)
except qubesadmin.exc.QubesDaemonNoResponseError:
raise qubesadmin.exc.QubesPropertyAccessError(key)
else:
if isinstance(value, qubesadmin.vm.QubesVM):
value = value.name
if value is None:
value = ''
try:
self.qubesd_call(
self._method_dest,
self._method_prefix + 'Set',
key,
str(value).encode('utf-8'))
except qubesadmin.exc.QubesDaemonNoResponseError:
raise qubesadmin.exc.QubesPropertyAccessError(key)
def __delattr__(self, name):
if name.startswith('_') or name in self._local_properties():
return super(PropertyHolder, self).__delattr__(name)
try:
self.qubesd_call(
self._method_dest,
self._method_prefix + 'Reset',
name
)
except qubesadmin.exc.QubesDaemonNoResponseError:
raise qubesadmin.exc.QubesPropertyAccessError(name)
class WrapperObjectsCollection(object):
'''Collection of simple named objects'''
def __init__(self, app, list_method, object_class):
'''
Construct manager of named wrapper objects.
:param app: Qubes() object
:param list_method: name of API method used to list objects,
must return simple "one name per line" list
:param object_class: object class (callable) for wrapper objects,
will be called with just two arguments: app and a name
'''
self.app = app
self._list_method = list_method
self._object_class = object_class
#: names cache
self._names_list = None
#: returned objects cache
self._objects = {}
def clear_cache(self):
'''Clear cached list of names'''
self._names_list = None
def refresh_cache(self, force=False):
'''Refresh cached list of names'''
if not force and self._names_list is not None:
return
list_data = self.app.qubesd_call('dom0', self._list_method)
list_data = list_data.decode('ascii')
assert list_data[-1] == '\n'
self._names_list = [str(name) for name in list_data[:-1].splitlines()]
for name, obj in list(self._objects.items()):
if obj.name not in self._names_list:
# Object no longer exists
del self._objects[name]
def __getitem__(self, item):
if not self.app.blind_mode and item not in self:
raise KeyError(item)
return self.get_blind(item)
def get_blind(self, item):
'''
Get a property without downloading the list
and checking if it's present
'''
if item not in self._objects:
self._objects[item] = self._object_class(self.app, item)
return self._objects[item]
def __contains__(self, item):
self.refresh_cache()
return item in self._names_list
def __iter__(self):
self.refresh_cache()
for key in self._names_list:
yield key
def keys(self):
'''Get list of names.'''
self.refresh_cache()
return [key for key in self._names_list]
def items(self):
'''Get list of (key, value) pairs'''
self.refresh_cache()
return [(key, self.get_blind(key)) for key in self._names_list]
def values(self):
'''Get list of objects'''
self.refresh_cache()
return [self.get_blind(key) for key in self._names_list]