__init__.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455
  1. # -*- encoding: utf8 -*-
  2. #
  3. # The Qubes OS Project, http://www.qubes-os.org
  4. #
  5. # Copyright (C) 2017 Wojtek Porczyk <woju@invisiblethingslab.com>
  6. # Copyright (C) 2017 Marek Marczykowski-Górecki
  7. # <marmarek@invisiblethingslab.com>
  8. #
  9. # This library is free software; you can redistribute it and/or
  10. # modify it under the terms of the GNU Lesser General Public
  11. # License as published by the Free Software Foundation; either
  12. # version 2.1 of the License, or (at your option) any later version.
  13. #
  14. # This library is distributed in the hope that it will be useful,
  15. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  16. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  17. # Lesser General Public License for more details.
  18. #
  19. # You should have received a copy of the GNU Lesser General Public
  20. # License along with this library; if not, see <https://www.gnu.org/licenses/>.
  21. import asyncio
  22. import errno
  23. import functools
  24. import io
  25. import os
  26. import shutil
  27. import socket
  28. import struct
  29. import traceback
  30. import qubes.exc
  31. class ProtocolError(AssertionError):
  32. '''Raised when something is wrong with data received'''
  33. class PermissionDenied(Exception):
  34. '''Raised deliberately by handlers when we decide not to cooperate'''
  35. def method(name, *, no_payload=False, endpoints=None, **classifiers):
  36. '''Decorator factory for methods intended to appear in API.
  37. The decorated method can be called from public API using a child of
  38. :py:class:`AbstractQubesMgmt` class. The method becomes "public", and can be
  39. called using remote management interface.
  40. :param str name: qrexec rpc method name
  41. :param bool no_payload: if :py:obj:`True`, will barf on non-empty payload; \
  42. also will not pass payload at all to the method
  43. :param iterable endpoints: if specified, method serve multiple API calls
  44. generated by replacing `{endpoint}` with each value in this iterable
  45. The expected function method should have one argument (other than usual
  46. *self*), ``untrusted_payload``, which will contain the payload.
  47. .. warning::
  48. This argument has to be named such, to remind the programmer that the
  49. content of this variable is indeed untrusted.
  50. If *no_payload* is true, then the method is called with no arguments.
  51. '''
  52. def decorator(func):
  53. if no_payload:
  54. # the following assignment is needed for how closures work in Python
  55. _func = func
  56. @functools.wraps(_func)
  57. def wrapper(self, untrusted_payload, **kwargs):
  58. if untrusted_payload != b'':
  59. raise ProtocolError('unexpected payload')
  60. return _func(self, **kwargs)
  61. func = wrapper
  62. # pylint: disable=protected-access
  63. if endpoints is None:
  64. func.rpcnames = ((name, None),)
  65. else:
  66. func.rpcnames = tuple(
  67. (name.format(endpoint=endpoint), endpoint)
  68. for endpoint in endpoints)
  69. func.classifiers = classifiers
  70. return func
  71. return decorator
  72. def apply_filters(iterable, filters):
  73. '''Apply filters returned by admin-permission:... event'''
  74. for selector in filters:
  75. iterable = filter(selector, iterable)
  76. return iterable
  77. class AbstractQubesAPI:
  78. '''Common code for Qubes Management Protocol handling
  79. Different interfaces can expose different API call sets, however they share
  80. common protocol and common implementation framework. This class is the
  81. latter.
  82. To implement a new interface, inherit from this class and write at least one
  83. method and decorate it with :py:func:`api` decorator. It will have access to
  84. pre-defined attributes: :py:attr:`app`, :py:attr:`src`, :py:attr:`dest`,
  85. :py:attr:`arg` and :py:attr:`method`.
  86. There are also two helper functions for firing events associated with API
  87. calls.
  88. '''
  89. #: the preferred socket location (to be overridden in child's class)
  90. SOCKNAME = None
  91. def __init__(self, app, src, method_name, dest, arg, send_event=None):
  92. #: :py:class:`qubes.Qubes` object
  93. self.app = app
  94. try:
  95. vm = src.decode('ascii')
  96. #: source qube
  97. self.src = self.app.domains[vm]
  98. vm = dest.decode('ascii')
  99. #: destination qube
  100. self.dest = self.app.domains[vm]
  101. except KeyError:
  102. # normally this should filtered out by qrexec policy, but there are
  103. # two cases it might not be:
  104. # 1. The call comes from dom0, which bypasses qrexec policy
  105. # 2. Domain was removed between checking the policy and here
  106. # we inform the client accordingly
  107. raise qubes.exc.QubesVMNotFoundError(vm)
  108. #: argument
  109. self.arg = arg.decode('ascii')
  110. #: name of the method
  111. self.method = method_name.decode('ascii')
  112. #: callback for sending events if applicable
  113. self.send_event = send_event
  114. #: is this operation cancellable?
  115. self.cancellable = False
  116. candidates = list(self.list_methods(self.method))
  117. if not candidates:
  118. raise ProtocolError('no such method: {!r}'.format(self.method))
  119. assert len(candidates) == 1, \
  120. 'multiple candidates for method {!r}'.format(self.method)
  121. #: the method to execute
  122. self._handler = candidates[0]
  123. self._running_handler = None
  124. @classmethod
  125. def list_methods(cls, select_method=None):
  126. for attr in dir(cls):
  127. func = getattr(cls, attr)
  128. if not callable(func):
  129. continue
  130. try:
  131. # pylint: disable=protected-access
  132. rpcnames = func.rpcnames
  133. except AttributeError:
  134. continue
  135. for mname, endpoint in rpcnames:
  136. if select_method is None or mname == select_method:
  137. yield (func, mname, endpoint)
  138. def execute(self, *, untrusted_payload):
  139. '''Execute management operation.
  140. This method is a coroutine.
  141. '''
  142. handler, _, endpoint = self._handler
  143. kwargs = {}
  144. if endpoint is not None:
  145. kwargs['endpoint'] = endpoint
  146. self._running_handler = asyncio.ensure_future(handler(self,
  147. untrusted_payload=untrusted_payload, **kwargs))
  148. return self._running_handler
  149. def cancel(self):
  150. '''If operation is cancellable, interrupt it'''
  151. if self.cancellable and self._running_handler is not None:
  152. self._running_handler.cancel()
  153. def fire_event_for_permission(self, **kwargs):
  154. '''Fire an event on the source qube to check for permission'''
  155. return self.src.fire_event('admin-permission:' + self.method,
  156. pre_event=True, dest=self.dest, arg=self.arg, **kwargs)
  157. def fire_event_for_filter(self, iterable, **kwargs):
  158. '''Fire an event on the source qube to filter for permission'''
  159. return apply_filters(iterable,
  160. self.fire_event_for_permission(**kwargs))
  161. @staticmethod
  162. def enforce(predicate):
  163. '''An assert replacement, but works even with optimisations.'''
  164. if not predicate:
  165. raise PermissionDenied()
  166. class QubesDaemonProtocol(asyncio.Protocol):
  167. buffer_size = 65536
  168. header = struct.Struct('Bx')
  169. # keep track of connections, to gracefully close them at server exit
  170. # (including cleanup of integration test)
  171. connections = set()
  172. def __init__(self, handler, *args, app, debug=False, **kwargs):
  173. super().__init__(*args, **kwargs)
  174. self.handler = handler
  175. self.app = app
  176. self.untrusted_buffer = io.BytesIO()
  177. self.len_untrusted_buffer = 0
  178. self.transport = None
  179. self.debug = debug
  180. self.event_sent = False
  181. self.mgmt = None
  182. def connection_made(self, transport):
  183. self.transport = transport
  184. self.connections.add(self)
  185. def connection_lost(self, exc):
  186. self.untrusted_buffer.close()
  187. # for cancellable operation, interrupt it, otherwise it will do nothing
  188. if self.mgmt is not None:
  189. self.mgmt.cancel()
  190. self.transport = None
  191. self.connections.remove(self)
  192. def data_received(self, untrusted_data): # pylint: disable=arguments-differ
  193. if self.len_untrusted_buffer + len(untrusted_data) > self.buffer_size:
  194. self.app.log.warning('request too long')
  195. self.transport.abort()
  196. self.untrusted_buffer.close()
  197. return
  198. self.len_untrusted_buffer += \
  199. self.untrusted_buffer.write(untrusted_data)
  200. def eof_received(self):
  201. try:
  202. connection_params, untrusted_payload = \
  203. self.untrusted_buffer.getvalue().split(b'\0', 1)
  204. meth_arg, src, dest_type, dest = \
  205. connection_params.split(b' ', 3)
  206. if dest_type == b'keyword' and dest == b'adminvm':
  207. dest_type, dest = b'name', b'dom0'
  208. if dest_type != b'name':
  209. raise ValueError(
  210. 'got {} destination type, '
  211. 'while only explicit name supported'.format(dest_type))
  212. if b'+' in meth_arg:
  213. meth, arg = meth_arg.split(b'+', 1)
  214. else:
  215. meth, arg = meth_arg, b''
  216. except ValueError:
  217. self.app.log.warning('framing error')
  218. self.transport.abort()
  219. return None
  220. finally:
  221. self.untrusted_buffer.close()
  222. asyncio.ensure_future(self.respond(
  223. src, meth, dest, arg, untrusted_payload=untrusted_payload))
  224. return True
  225. @asyncio.coroutine
  226. def respond(self, src, meth, dest, arg, *, untrusted_payload):
  227. try:
  228. self.mgmt = self.handler(self.app, src, meth, dest, arg,
  229. self.send_event)
  230. response = yield from self.mgmt.execute(
  231. untrusted_payload=untrusted_payload)
  232. assert not (self.event_sent and response)
  233. if self.transport is None:
  234. return
  235. # except clauses will fall through to transport.abort() below
  236. except PermissionDenied:
  237. self.app.log.warning(
  238. 'permission denied for call %s+%s (%s → %s) '
  239. 'with payload of %d bytes',
  240. meth, arg, src, dest, len(untrusted_payload))
  241. except ProtocolError:
  242. self.app.log.warning(
  243. 'protocol error for call %s+%s (%s → %s) '
  244. 'with payload of %d bytes',
  245. meth, arg, src, dest, len(untrusted_payload))
  246. except qubes.exc.QubesException as err:
  247. msg = ('%r while calling '
  248. 'src=%r meth=%r dest=%r arg=%r len(untrusted_payload)=%d')
  249. if self.debug:
  250. self.app.log.debug(msg,
  251. err, src, meth, dest, arg, len(untrusted_payload),
  252. exc_info=1)
  253. if self.transport is not None:
  254. self.send_exception(err)
  255. self.transport.write_eof()
  256. self.transport.close()
  257. return
  258. except Exception: # pylint: disable=broad-except
  259. self.app.log.exception(
  260. 'unhandled exception while calling '
  261. 'src=%r meth=%r dest=%r arg=%r len(untrusted_payload)=%d',
  262. src, meth, dest, arg, len(untrusted_payload))
  263. else:
  264. if not self.event_sent:
  265. self.send_response(response)
  266. try:
  267. self.transport.write_eof()
  268. except NotImplementedError:
  269. pass
  270. self.transport.close()
  271. return
  272. # this is reached if from except: blocks; do not put it in finally:,
  273. # because this will prevent the good case from sending the reply
  274. if self.transport:
  275. self.transport.abort()
  276. def send_header(self, *args):
  277. self.transport.write(self.header.pack(*args))
  278. def send_response(self, content):
  279. assert not self.event_sent
  280. self.send_header(0x30)
  281. if content is not None:
  282. self.transport.write(content.encode('utf-8'))
  283. def send_event(self, subject, event, **kwargs):
  284. if self.transport is None:
  285. return
  286. self.event_sent = True
  287. self.send_header(0x31)
  288. if subject is not self.app:
  289. self.transport.write(str(subject).encode('ascii'))
  290. self.transport.write(b'\0')
  291. self.transport.write(event.encode('ascii') + b'\0')
  292. for k, v in kwargs.items():
  293. self.transport.write('{}\0{}\0'.format(k, str(v)).encode('ascii'))
  294. self.transport.write(b'\0')
  295. def send_exception(self, exc):
  296. self.send_header(0x32)
  297. self.transport.write(type(exc).__name__.encode() + b'\0')
  298. if self.debug:
  299. self.transport.write(''.join(traceback.format_exception(
  300. type(exc), exc, exc.__traceback__)).encode('utf-8'))
  301. self.transport.write(b'\0')
  302. self.transport.write(str(exc).encode('utf-8') + b'\0')
  303. def cleanup_socket(sockpath, force):
  304. '''Remove socket if stale, or force=True
  305. :param sockpath: path to a socket
  306. :param force: should remove even if still used
  307. '''
  308. if force:
  309. os.unlink(sockpath)
  310. else:
  311. sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
  312. try:
  313. sock.connect(sockpath)
  314. except ConnectionRefusedError:
  315. # dead socket, remove it anyway
  316. os.unlink(sockpath)
  317. else:
  318. # woops, someone is listening
  319. sock.close()
  320. raise FileExistsError(errno.EEXIST,
  321. 'socket already exists: {!r}'.format(sockpath))
  322. @asyncio.coroutine
  323. def create_servers(*args, force=False, loop=None, **kwargs):
  324. '''Create multiple Qubes API servers
  325. :param qubes.Qubes app: the app that is a backend of the servers
  326. :param bool force: if :py:obj:`True`, unconditionally remove existing \
  327. sockets; if :py:obj:`False`, raise an error if there is some process \
  328. listening to such socket
  329. :param asyncio.Loop loop: loop
  330. *args* are supposed to be classes inheriting from
  331. :py:class:`AbstractQubesAPI`
  332. *kwargs* (like *app* or *debug* for example) are passed to
  333. :py:class:`QubesDaemonProtocol` constructor
  334. '''
  335. loop = loop or asyncio.get_event_loop()
  336. servers = []
  337. old_umask = os.umask(0o007)
  338. try:
  339. # XXX this can be optimised with asyncio.wait() to start servers in
  340. # parallel, but I currently don't see the need
  341. for handler in args:
  342. sockpath = handler.SOCKNAME
  343. assert sockpath is not None, \
  344. 'SOCKNAME needs to be overloaded in {}'.format(
  345. type(handler).__name__)
  346. if os.path.exists(sockpath):
  347. cleanup_socket(sockpath, force)
  348. server = yield from loop.create_unix_server(
  349. functools.partial(QubesDaemonProtocol, handler, **kwargs),
  350. sockpath)
  351. for sock in server.sockets:
  352. shutil.chown(sock.getsockname(), group='qubes')
  353. servers.append(server)
  354. except:
  355. for server in servers:
  356. for sock in server.sockets:
  357. try:
  358. os.unlink(sock.getsockname())
  359. except FileNotFoundError:
  360. pass
  361. server.close()
  362. if servers:
  363. yield from asyncio.wait([
  364. server.wait_closed() for server in servers])
  365. raise
  366. finally:
  367. os.umask(old_umask)
  368. return servers