base.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288
  1. # -*- encoding: utf8 -*-
  2. #
  3. # The Qubes OS Project, http://www.qubes-os.org
  4. #
  5. # Copyright (C) 2017 Marek Marczykowski-Górecki
  6. # <marmarek@invisiblethingslab.com>
  7. #
  8. # This program is free software; you can redistribute it and/or modify
  9. # it under the terms of the GNU Lesser General Public License as published by
  10. # the Free Software Foundation; either version 2 of the License, or
  11. # (at your option) any later version.
  12. #
  13. # This program is distributed in the hope that it will be useful,
  14. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  15. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  16. # GNU Lesser General Public License for more details.
  17. #
  18. # You should have received a copy of the GNU Lesser General Public License along
  19. # with this program; if not, see <http://www.gnu.org/licenses/>.
  20. '''Base classes for managed objects'''
  21. import ast
  22. import qubesadmin.exc
  23. DEFAULT = object()
  24. class PropertyHolder(object):
  25. '''A base class for object having properties retrievable using mgmt API.
  26. Warning: each (non-private) local attribute needs to be defined at class
  27. level, even if initialized in __init__; otherwise will be treated as
  28. property retrievable using mgmt call.
  29. '''
  30. #: a place for appropriate Qubes() object (QubesLocal or QubesRemote),
  31. # use None for self
  32. app = None
  33. def __init__(self, app, method_prefix, method_dest):
  34. #: appropriate Qubes() object (QubesLocal or QubesRemote), use None
  35. # for self
  36. self.app = app
  37. self._method_prefix = method_prefix
  38. self._method_dest = method_dest
  39. self._properties = None
  40. self._properties_help = None
  41. def qubesd_call(self, dest, method, arg=None, payload=None):
  42. '''
  43. Call into qubesd using appropriate mechanism. This method should be
  44. defined by a subclass.
  45. :param dest: Destination VM name
  46. :param method: Full API method name ('mgmt...')
  47. :param arg: Method argument (if any)
  48. :param payload: Payload send to the method
  49. :return: Data returned by qubesd (string)
  50. '''
  51. if dest is None:
  52. dest = self._method_dest
  53. # have the actual implementation at Qubes() instance
  54. if self.app:
  55. return self.app.qubesd_call(dest, method, arg, payload)
  56. raise NotImplementedError
  57. @staticmethod
  58. def _parse_qubesd_response(response_data):
  59. '''Parse response from qubesd.
  60. In case of success, return actual data. In case of error,
  61. raise appropriate exception.
  62. '''
  63. if response_data == b'':
  64. raise qubesadmin.exc.QubesDaemonNoResponseError(
  65. 'Got empty response from qubesd')
  66. if response_data[0:2] == b'\x30\x00':
  67. return response_data[2:]
  68. elif response_data[0:2] == b'\x32\x00':
  69. (_, exc_type, _traceback, format_string, args) = \
  70. response_data.split(b'\x00', 4)
  71. # drop last field because of terminating '\x00'
  72. args = [arg.decode() for arg in args.split(b'\x00')[:-1]]
  73. format_string = format_string.decode('utf-8')
  74. exc_type = exc_type.decode('ascii')
  75. try:
  76. exc_class = getattr(qubesadmin.exc, exc_type)
  77. except AttributeError:
  78. if exc_type.endswith('Error'):
  79. exc_class = __builtins__.get(exc_type,
  80. qubesadmin.exc.QubesException)
  81. else:
  82. exc_class = qubesadmin.exc.QubesException
  83. # TODO: handle traceback if given
  84. raise exc_class(format_string, *args)
  85. else:
  86. raise qubesadmin.exc.QubesDaemonCommunicationError(
  87. 'Invalid response format')
  88. def property_list(self):
  89. '''
  90. List available properties (their names).
  91. :return: list of strings
  92. '''
  93. if self._properties is None:
  94. properties_str = self.qubesd_call(
  95. self._method_dest,
  96. self._method_prefix + 'List',
  97. None,
  98. None)
  99. self._properties = properties_str.decode('ascii').splitlines()
  100. # TODO: make it somehow immutable
  101. return self._properties
  102. def property_help(self, name):
  103. '''
  104. Get description of a property.
  105. :return: property help text
  106. '''
  107. help_text = self.qubesd_call(
  108. self._method_dest,
  109. self._method_prefix + 'Help',
  110. name,
  111. None)
  112. return help_text.decode('ascii')
  113. def property_is_default(self, item):
  114. '''
  115. Check if given property have default value
  116. :param str item: name of property
  117. :return: bool
  118. '''
  119. if item.startswith('_'):
  120. raise AttributeError(item)
  121. property_str = self.qubesd_call(
  122. self._method_dest,
  123. self._method_prefix + 'Get',
  124. item,
  125. None)
  126. (default, _value) = property_str.split(b' ', 1)
  127. assert default.startswith(b'default=')
  128. is_default_str = default.split(b'=')[1]
  129. is_default = ast.literal_eval(is_default_str.decode('ascii'))
  130. assert isinstance(is_default, bool)
  131. return is_default
  132. def __getattr__(self, item):
  133. # pylint: disable=too-many-return-statements
  134. if item.startswith('_'):
  135. raise AttributeError(item)
  136. try:
  137. property_str = self.qubesd_call(
  138. self._method_dest,
  139. self._method_prefix + 'Get',
  140. item,
  141. None)
  142. except qubesadmin.exc.QubesDaemonNoResponseError:
  143. raise qubesadmin.exc.QubesPropertyAccessError(item)
  144. (_default, prop_type, value) = property_str.split(b' ', 2)
  145. prop_type = prop_type.decode('ascii')
  146. if not prop_type.startswith('type='):
  147. raise qubesadmin.exc.QubesDaemonCommunicationError(
  148. 'Invalid type prefix received: {}'.format(prop_type))
  149. (_, prop_type) = prop_type.split('=', 1)
  150. value = value.decode()
  151. if prop_type == 'str':
  152. return str(value)
  153. elif prop_type == 'bool':
  154. if value == '':
  155. raise AttributeError
  156. return ast.literal_eval(value)
  157. elif prop_type == 'int':
  158. if value == '':
  159. raise AttributeError
  160. return ast.literal_eval(value)
  161. elif prop_type == 'vm':
  162. if value == '':
  163. return None
  164. return self.app.domains[value]
  165. elif prop_type == 'label':
  166. if value == '':
  167. return None
  168. # TODO
  169. return self.app.labels[value]
  170. else:
  171. raise qubesadmin.exc.QubesDaemonCommunicationError(
  172. 'Received invalid value type: {}'.format(prop_type))
  173. def __setattr__(self, key, value):
  174. if key.startswith('_') or key in dir(self):
  175. return super(PropertyHolder, self).__setattr__(key, value)
  176. if value is qubesadmin.DEFAULT:
  177. try:
  178. self.qubesd_call(
  179. self._method_dest,
  180. self._method_prefix + 'Reset',
  181. key,
  182. None)
  183. except qubesadmin.exc.QubesDaemonNoResponseError:
  184. raise qubesadmin.exc.QubesPropertyAccessError(key)
  185. else:
  186. if isinstance(value, qubesadmin.vm.QubesVM):
  187. value = value.name
  188. try:
  189. self.qubesd_call(
  190. self._method_dest,
  191. self._method_prefix + 'Set',
  192. key,
  193. str(value).encode('utf-8'))
  194. except qubesadmin.exc.QubesDaemonNoResponseError:
  195. raise qubesadmin.exc.QubesPropertyAccessError(key)
  196. def __delattr__(self, name):
  197. if name.startswith('_') or name in dir(self):
  198. return super(PropertyHolder, self).__delattr__(name)
  199. try:
  200. self.qubesd_call(
  201. self._method_dest,
  202. self._method_prefix + 'Reset',
  203. name
  204. )
  205. except qubesadmin.exc.QubesDaemonNoResponseError:
  206. raise qubesadmin.exc.QubesPropertyAccessError(name)
  207. class WrapperObjectsCollection(object):
  208. '''Collection of simple named objects'''
  209. def __init__(self, app, list_method, object_class):
  210. '''
  211. Construct manager of named wrapper objects.
  212. :param app: Qubes() object
  213. :param list_method: name of API method used to list objects,
  214. must return simple "one name per line" list
  215. :param object_class: object class (callable) for wrapper objects,
  216. will be called with just two arguments: app and a name
  217. '''
  218. self.app = app
  219. self._list_method = list_method
  220. self._object_class = object_class
  221. #: names cache
  222. self._names_list = None
  223. #: returned objects cache
  224. self._objects = {}
  225. def clear_cache(self):
  226. '''Clear cached list of names'''
  227. self._names_list = None
  228. def refresh_cache(self, force=False):
  229. '''Refresh cached list of names'''
  230. if not force and self._names_list is not None:
  231. return
  232. list_data = self.app.qubesd_call('dom0', self._list_method)
  233. list_data = list_data.decode('ascii')
  234. assert list_data[-1] == '\n'
  235. self._names_list = [str(name) for name in list_data[:-1].splitlines()]
  236. for name, obj in list(self._objects.items()):
  237. if obj.name not in self._names_list:
  238. # Object no longer exists
  239. del self._objects[name]
  240. def __getitem__(self, item):
  241. if item not in self:
  242. raise KeyError(item)
  243. if item not in self._objects:
  244. self._objects[item] = self._object_class(self.app, item)
  245. return self._objects[item]
  246. def __contains__(self, item):
  247. self.refresh_cache()
  248. return item in self._names_list
  249. def __iter__(self):
  250. self.refresh_cache()
  251. for obj in self._names_list:
  252. yield self[obj]
  253. def keys(self):
  254. '''Get list of names.'''
  255. self.refresh_cache()
  256. return self._names_list.copy()