__init__.py 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
  1. # -*- encoding: utf8 -*-
  2. #
  3. # The Qubes OS Project, http://www.qubes-os.org
  4. #
  5. # Copyright (C) 2017 Marek Marczykowski-Górecki
  6. # <marmarek@invisiblethingslab.com>
  7. #
  8. # This program is free software; you can redistribute it and/or modify
  9. # it under the terms of the GNU Lesser General Public License as published by
  10. # the Free Software Foundation; either version 2.1 of the License, or
  11. # (at your option) any later version.
  12. #
  13. # This program is distributed in the hope that it will be useful,
  14. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  15. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  16. # GNU Lesser General Public License for more details.
  17. #
  18. # You should have received a copy of the GNU Lesser General Public License along
  19. # with this program; if not, see <http://www.gnu.org/licenses/>.
  20. import ast
  21. import os
  22. import qubesmgmt.exc
  23. DEFAULT = object()
  24. class PropertyHolder(object):
  25. '''A base class for object having properties retrievable using mgmt API.
  26. Warning: each (non-private) local attribute needs to be defined at class
  27. level, even if initialized in __init__; otherwise will be treated as
  28. property retrievable using mgmt call.
  29. '''
  30. #: a place for appropriate Qubes() object (QubesLocal or QubesRemote),
  31. # use None for self
  32. app = None
  33. def __init__(self, app, method_prefix, method_dest):
  34. #: appropriate Qubes() object (QubesLocal or QubesRemote), use None
  35. # for self
  36. self.app = app
  37. self._method_prefix = method_prefix
  38. self._method_dest = method_dest
  39. self._properties = None
  40. self._properties_help = None
  41. def qubesd_call(self, dest, method, arg=None, payload=None):
  42. '''
  43. Call into qubesd using appropriate mechanism. This method should be
  44. defined by a subclass.
  45. :param dest: Destination VM name
  46. :param method: Full API method name ('mgmt...')
  47. :param arg: Method argument (if any)
  48. :param payload: Payload send to the method
  49. :return: Data returned by qubesd (string)
  50. '''
  51. # have the actual implementation at Qubes() instance
  52. if self.app:
  53. return self.app.qubesd_call(dest, method, arg, payload)
  54. raise NotImplementedError
  55. @staticmethod
  56. def _parse_qubesd_response(response_data):
  57. if response_data[0:2] == b'\x30\x00':
  58. return response_data[2:]
  59. elif response_data[0:2] == b'\x32\x00':
  60. (_, exc_type, _traceback, format_string, args) = \
  61. response_data.split(b'\x00', 4)
  62. # drop last field because of terminating '\x00'
  63. args = [arg.decode() for arg in args.split(b'\x00')[:-1]]
  64. format_string = format_string.decode('utf-8')
  65. exc_class = getattr(qubesmgmt.exc, exc_type, 'QubesException')
  66. # TODO: handle traceback if given
  67. raise exc_class(format_string, *args)
  68. else:
  69. raise qubesmgmt.exc.QubesException('Invalid response format')
  70. def property_list(self):
  71. if self._properties is None:
  72. properties_str = self.qubesd_call(
  73. self._method_dest,
  74. self._method_prefix + 'List',
  75. None,
  76. None)
  77. self._properties = properties_str.splitlines()
  78. # TODO: make it somehow immutable
  79. return self._properties
  80. def property_is_default(self, item):
  81. if item.startswith('_'):
  82. raise AttributeError(item)
  83. property_str = self.qubesd_call(
  84. self._method_dest,
  85. self._method_prefix + 'Get',
  86. item,
  87. None)
  88. (default, _value) = property_str.split(b' ', 1)
  89. assert default.startswith(b'default=')
  90. is_default_str = default.split(b'=')[1]
  91. is_default = ast.literal_eval(is_default_str.decode('ascii'))
  92. assert isinstance(is_default, bool)
  93. return is_default
  94. def __getattr__(self, item):
  95. if item.startswith('_'):
  96. raise AttributeError(item)
  97. property_str = self.qubesd_call(
  98. self._method_dest,
  99. self._method_prefix + 'Get',
  100. item,
  101. None)
  102. (_default, value) = property_str.split(' ', 1)
  103. if value[0] == '\'':
  104. return ast.literal_eval('b' + value)
  105. else:
  106. return ast.literal_eval(value)
  107. def __setattr__(self, key, value):
  108. if key.startswith('_') or key in dir(self):
  109. return super(PropertyHolder, self).__setattr__(key, value)
  110. if value is DEFAULT:
  111. self.qubesd_call(
  112. self._method_dest,
  113. self._method_prefix + 'Reset',
  114. key,
  115. None)
  116. else:
  117. if isinstance(value, qubesmgmt.vm.QubesVM):
  118. # pylint: disable=protected-access
  119. value = value._name
  120. self.qubesd_call(
  121. self._method_dest,
  122. self._method_prefix + 'Set',
  123. key,
  124. bytes(value))
  125. def __delattr__(self, name):
  126. if name.startswith('_') or name in dir(self):
  127. return super(PropertyHolder, self).__delattr__(name)
  128. self.qubesd_call(
  129. self._method_dest,
  130. self._method_prefix + 'Reset',
  131. name
  132. )
  133. # pylint: disable=wrong-import-position
  134. import qubesmgmt.app
  135. if os.path.exists(qubesmgmt.app.QUBESD_SOCK):
  136. Qubes = qubesmgmt.app.QubesLocal
  137. else:
  138. Qubes = qubesmgmt.app.QubesRemote