qubes/admin: Add listing of API methods
This commit is contained in:
parent
26013122a0
commit
3e0d01cfb9
@ -41,7 +41,7 @@ class PermissionDenied(Exception):
|
||||
pass
|
||||
|
||||
|
||||
def method(name, *, no_payload=False, endpoints=None):
|
||||
def method(name, *, no_payload=False, endpoints=None, **classifiers):
|
||||
'''Decorator factory for methods intended to appear in API.
|
||||
|
||||
The decorated method can be called from public API using a child of
|
||||
@ -75,11 +75,14 @@ def method(name, *, no_payload=False, endpoints=None):
|
||||
|
||||
# pylint: disable=protected-access
|
||||
if endpoints is None:
|
||||
func._rpcname = ((name, None),)
|
||||
func.rpcnames = ((name, None),)
|
||||
else:
|
||||
func._rpcname = tuple(
|
||||
func.rpcnames = tuple(
|
||||
(name.format(endpoint=endpoint), endpoint)
|
||||
for endpoint in endpoints)
|
||||
|
||||
func.classifiers = classifiers
|
||||
|
||||
return func
|
||||
|
||||
return decorator
|
||||
@ -133,43 +136,45 @@ class AbstractQubesAPI(object):
|
||||
#: is this operation cancellable?
|
||||
self.cancellable = False
|
||||
|
||||
untrusted_candidates = []
|
||||
for attr in dir(self):
|
||||
func = getattr(self, attr)
|
||||
candidates = list(self.list_methods(self.method))
|
||||
|
||||
if not candidates:
|
||||
raise ProtocolError('no such method: {!r}'.format(self.method))
|
||||
|
||||
assert len(candidates) == 1, \
|
||||
'multiple candidates for method {!r}'.format(self.method)
|
||||
|
||||
#: the method to execute
|
||||
self._handler = candidates[0]
|
||||
self._running_handler = None
|
||||
|
||||
@classmethod
|
||||
def list_methods(cls, select_method=None):
|
||||
for attr in dir(cls):
|
||||
func = getattr(cls, attr)
|
||||
if not callable(func):
|
||||
continue
|
||||
|
||||
try:
|
||||
# pylint: disable=protected-access
|
||||
for mname, endpoint in func._rpcname:
|
||||
if mname != self.method:
|
||||
continue
|
||||
untrusted_candidates.append((func, endpoint))
|
||||
rpcnames = func.rpcnames
|
||||
except AttributeError:
|
||||
continue
|
||||
|
||||
if not untrusted_candidates:
|
||||
raise ProtocolError('no such method: {!r}'.format(self.method))
|
||||
|
||||
assert len(untrusted_candidates) == 1, \
|
||||
'multiple candidates for method {!r}'.format(self.method)
|
||||
|
||||
#: the method to execute
|
||||
self._handler = untrusted_candidates[0]
|
||||
self._running_handler = None
|
||||
del untrusted_candidates
|
||||
for mname, endpoint in rpcnames:
|
||||
if select_method is None or mname == select_method:
|
||||
yield (func, mname, endpoint)
|
||||
|
||||
def execute(self, *, untrusted_payload):
|
||||
'''Execute management operation.
|
||||
|
||||
This method is a coroutine.
|
||||
'''
|
||||
handler, endpoint = self._handler
|
||||
handler, _, endpoint = self._handler
|
||||
kwargs = {}
|
||||
if endpoint is not None:
|
||||
kwargs['endpoint'] = endpoint
|
||||
self._running_handler = asyncio.ensure_future(handler(
|
||||
self._running_handler = asyncio.ensure_future(handler(self,
|
||||
untrusted_payload=untrusted_payload, **kwargs))
|
||||
return self._running_handler
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user