__init__.py 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
  1. # -*- encoding: utf8 -*-
  2. #
  3. # The Qubes OS Project, http://www.qubes-os.org
  4. #
  5. # Copyright (C) 2017 Marek Marczykowski-Górecki
  6. # <marmarek@invisiblethingslab.com>
  7. #
  8. # This program is free software; you can redistribute it and/or modify
  9. # it under the terms of the GNU Lesser General Public License as published by
  10. # the Free Software Foundation; either version 2.1 of the License, or
  11. # (at your option) any later version.
  12. #
  13. # This program is distributed in the hope that it will be useful,
  14. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  15. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  16. # GNU Lesser General Public License for more details.
  17. #
  18. # You should have received a copy of the GNU Lesser General Public License along
  19. # with this program; if not, see <http://www.gnu.org/licenses/>.
  20. import ast
  21. import os
  22. import qubesmgmt.exc
  23. class property(object):
  24. DEFAULT = object()
  25. class PropertyHolder(object):
  26. '''A base class for object having properties retrievable using mgmt API.
  27. Warning: each (non-private) local attribute needs to be defined at class
  28. level, even if initialized in __init__; otherwise will be treated as
  29. property retrievable using mgmt call.
  30. '''
  31. #: a place for appropriate Qubes() object (QubesLocal or QubesRemote),
  32. # use None for self
  33. app = None
  34. def __init__(self, app, method_prefix, method_dest):
  35. #: appropriate Qubes() object (QubesLocal or QubesRemote), use None
  36. # for self
  37. self.app = app
  38. self._method_prefix = method_prefix
  39. self._method_dest = method_dest
  40. self._properties = None
  41. self._properties_help = None
  42. def _do_qubesd_call(self, dest, method, arg=None, payload=None):
  43. '''
  44. Call into qubesd using appropriate mechanism. This method should be
  45. defined by a subclass.
  46. :param dest: Destination VM name
  47. :param method: Full API method name ('mgmt...')
  48. :param arg: Method argument (if any)
  49. :param payload: Payload send to the method
  50. :return: Data returned by qubesd (string)
  51. '''
  52. # have the actual implementation at Qubes() instance
  53. if self.app:
  54. return self.app._do_qubesd_call(dest, method, arg, payload)
  55. raise NotImplementedError
  56. def _parse_qubesd_response(self, response_data):
  57. if response_data[0:2] == b'\x30\x00':
  58. return response_data[2:]
  59. elif response_data[0:2] == b'\x32\x00':
  60. (_, exc_type, traceback, format_string, args) = \
  61. response_data.split(b'\x00', 4)
  62. # drop last field because of terminating '\x00'
  63. args = [arg.decode() for arg in args.split(b'\x00')[:-1]]
  64. format_string = format_string.decode('utf-8')
  65. exc_class = getattr(qubesmgmt.exc, exc_type, 'QubesException')
  66. # TODO: handle traceback if given
  67. raise exc_class(format_string, *args)
  68. else:
  69. raise qubesmgmt.exc.QubesException('Invalid response format')
  70. def property_list(self):
  71. if self._properties is None:
  72. properties_str = self._do_qubesd_call(
  73. self._method_dest,
  74. self._method_prefix + 'List',
  75. None,
  76. None)
  77. self._properties = properties_str.splitlines()
  78. # TODO: make it somehow immutable
  79. return self._properties
  80. def property_is_default(self, item):
  81. if item.startswith('_'):
  82. raise AttributeError(item)
  83. property_str = self._do_qubesd_call(
  84. self._method_dest,
  85. self._method_prefix + 'Get',
  86. item,
  87. None)
  88. (default, value) = property_str.split(b' ', 1)
  89. assert default.startswith(b'default=')
  90. is_default_str = default.split(b'=')[1]
  91. is_default = ast.literal_eval(is_default_str.decode('ascii'))
  92. assert isinstance(is_default, bool)
  93. return is_default
  94. def __getattr__(self, item):
  95. if item.startswith('_'):
  96. raise AttributeError(item)
  97. property_str = self._do_qubesd_call(
  98. self._method_dest,
  99. self._method_prefix + 'Get',
  100. item,
  101. None)
  102. (default, value) = property_str.split(' ', 1)
  103. if value[0] == '\'':
  104. return ast.literal_eval('b' + value)
  105. else:
  106. return ast.literal_eval(value)
  107. def __setattr__(self, key, value):
  108. if key.startswith('_') or key in dir(self):
  109. return super(PropertyHolder, self).__setattr__(key, value)
  110. if value is property.DEFAULT:
  111. self._do_qubesd_call(
  112. self._method_dest,
  113. self._method_prefix + 'Reset',
  114. key,
  115. None)
  116. else:
  117. if isinstance(value, qubesmgmt.vm.QubesVM):
  118. value = value._name
  119. self._do_qubesd_call(
  120. self._method_dest,
  121. self._method_prefix + 'Set',
  122. key,
  123. bytes(value))
  124. def __delattr__(self, name):
  125. if name.startswith('_') or name in dir(self):
  126. return super(PropertyHolder, self).__delattr__(name)
  127. self._do_qubesd_call(
  128. self._method_dest,
  129. self._method_prefix + 'Reset',
  130. name
  131. )
  132. import qubesmgmt.app
  133. if os.path.exists(qubesmgmt.app.QUBESD_SOCK):
  134. Qubes = qubesmgmt.app.QubesLocal
  135. else:
  136. Qubes = qubesmgmt.app.QubesRemote