mgmt: add support for cancellable operations
Allow method handler to decide if operation could be cancelled. If yes, when connection to the qubesd is terminated (and protocol.connection_lost get called) the operation is cancelled using standard asyncio method - in which case asyncio.CancelledError is thrown inside method handler. This needs to be explicitly enabled, because cancellable methods are much harder to write, to maintain consistent system state. Caveat: protocol.connection_lost is called only when trying to send some data to it (and it's already terminated). Which makes this whole mechanism useful only for events. Otherwise, when sending some data (and possibly detecting that connection is broken), the operation is already completed. QubesOS/qubes-issues#2622
This commit is contained in:
parent
ee967c1e72
commit
bd9f1d2e7c
@ -112,6 +112,9 @@ class AbstractQubesMgmt(object):
|
|||||||
#: name of the method
|
#: name of the method
|
||||||
self.method = method.decode('ascii')
|
self.method = method.decode('ascii')
|
||||||
|
|
||||||
|
#: is this operation cancellable?
|
||||||
|
self.cancellable = False
|
||||||
|
|
||||||
untrusted_candidates = []
|
untrusted_candidates = []
|
||||||
for attr in dir(self):
|
for attr in dir(self):
|
||||||
untrusted_func = getattr(self, attr)
|
untrusted_func = getattr(self, attr)
|
||||||
@ -135,9 +138,25 @@ class AbstractQubesMgmt(object):
|
|||||||
'multiple candidates for method {!r}'.format(self.method)
|
'multiple candidates for method {!r}'.format(self.method)
|
||||||
|
|
||||||
#: the method to execute
|
#: the method to execute
|
||||||
self.execute = untrusted_candidates[0]
|
self._handler = untrusted_candidates[0]
|
||||||
|
self._running_handler = None
|
||||||
del untrusted_candidates
|
del untrusted_candidates
|
||||||
|
|
||||||
|
def execute(self, *, untrusted_payload):
|
||||||
|
'''Execute management operation.
|
||||||
|
|
||||||
|
This method is a coroutine.
|
||||||
|
'''
|
||||||
|
self._running_handler = asyncio.ensure_future(self._handler(
|
||||||
|
untrusted_payload=untrusted_payload))
|
||||||
|
return self._running_handler
|
||||||
|
|
||||||
|
def cancel(self):
|
||||||
|
'''If operation is cancellable, interrupt it'''
|
||||||
|
if self.cancellable and self._running_handler is not None:
|
||||||
|
self._running_handler.cancel()
|
||||||
|
|
||||||
|
|
||||||
def fire_event_for_permission(self, **kwargs):
|
def fire_event_for_permission(self, **kwargs):
|
||||||
'''Fire an event on the source qube to check for permission'''
|
'''Fire an event on the source qube to check for permission'''
|
||||||
return self.src.fire_event_pre('mgmt-permission:{}'.format(self.method),
|
return self.src.fire_event_pre('mgmt-permission:{}'.format(self.method),
|
||||||
|
@ -32,6 +32,7 @@ class QubesDaemonProtocol(asyncio.Protocol):
|
|||||||
self.len_untrusted_buffer = 0
|
self.len_untrusted_buffer = 0
|
||||||
self.transport = None
|
self.transport = None
|
||||||
self.debug = debug
|
self.debug = debug
|
||||||
|
self.mgmt = None
|
||||||
|
|
||||||
def connection_made(self, transport):
|
def connection_made(self, transport):
|
||||||
print('connection_made()')
|
print('connection_made()')
|
||||||
@ -40,6 +41,10 @@ class QubesDaemonProtocol(asyncio.Protocol):
|
|||||||
def connection_lost(self, exc):
|
def connection_lost(self, exc):
|
||||||
print('connection_lost(exc={!r})'.format(exc))
|
print('connection_lost(exc={!r})'.format(exc))
|
||||||
self.untrusted_buffer.close()
|
self.untrusted_buffer.close()
|
||||||
|
# for cancellable operation, interrupt it, otherwise it will do nothing
|
||||||
|
if self.mgmt is not None:
|
||||||
|
self.mgmt.cancel()
|
||||||
|
self.transport = None
|
||||||
|
|
||||||
def data_received(self, untrusted_data): # pylint: disable=arguments-differ
|
def data_received(self, untrusted_data): # pylint: disable=arguments-differ
|
||||||
print('data_received(untrusted_data={!r})'.format(untrusted_data))
|
print('data_received(untrusted_data={!r})'.format(untrusted_data))
|
||||||
@ -72,9 +77,12 @@ class QubesDaemonProtocol(asyncio.Protocol):
|
|||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
def respond(self, src, method, dest, arg, *, untrusted_payload):
|
def respond(self, src, method, dest, arg, *, untrusted_payload):
|
||||||
try:
|
try:
|
||||||
mgmt = self.handler(self.app, src, method, dest, arg)
|
self.mgmt = self.handler(self.app, src, method, dest, arg,
|
||||||
response = yield from mgmt.execute(
|
self.send_event)
|
||||||
|
response = yield from self.mgmt.execute(
|
||||||
untrusted_payload=untrusted_payload)
|
untrusted_payload=untrusted_payload)
|
||||||
|
if self.transport is None:
|
||||||
|
return
|
||||||
|
|
||||||
# except clauses will fall through to transport.abort() below
|
# except clauses will fall through to transport.abort() below
|
||||||
|
|
||||||
@ -91,6 +99,11 @@ class QubesDaemonProtocol(asyncio.Protocol):
|
|||||||
method, arg, src, dest, len(untrusted_payload))
|
method, arg, src, dest, len(untrusted_payload))
|
||||||
|
|
||||||
except qubes.exc.QubesException as err:
|
except qubes.exc.QubesException as err:
|
||||||
|
self.app.log.exception(
|
||||||
|
'error while calling '
|
||||||
|
'src=%r method=%r dest=%r arg=%r len(untrusted_payload)=%d',
|
||||||
|
src, method, dest, arg, len(untrusted_payload))
|
||||||
|
if self.transport is not None:
|
||||||
self.send_exception(err)
|
self.send_exception(err)
|
||||||
self.transport.write_eof()
|
self.transport.write_eof()
|
||||||
self.transport.close()
|
self.transport.close()
|
||||||
|
Loading…
Reference in New Issue
Block a user