utils.py 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568
  1. #
  2. # The Qubes OS Project, https://www.qubes-os.org
  3. #
  4. # Copyright (C) 2012 Agnieszka Kostrzewa <agnieszka.kostrzewa@gmail.com>
  5. # Copyright (C) 2012 Marek Marczykowski-Górecki
  6. # <marmarek@invisiblethingslab.com>
  7. # Copyright (C) 2017 Wojtek Porczyk <woju@invisiblethingslab.com>
  8. # Copyright (C) 2020 Marta Marczykowska-Górecka
  9. # <marmarta@invisiblethingslab.com>
  10. #
  11. # This program is free software: you can redistribute it and/or modify
  12. # it under the terms of the GNU General Public License as published by
  13. # the Free Software Foundation, either version 2 of the License, or
  14. # (at your option) any later version.
  15. #
  16. # This program is distributed in the hope that it will be useful,
  17. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  18. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  19. # GNU General Public License for more details.
  20. #
  21. # You should have received a copy of the GNU General Public License
  22. # along with this program. If not, see <http://www.gnu.org/licenses/>.
  23. #
  24. import itertools
  25. import os
  26. import re
  27. import qubesadmin
  28. import traceback
  29. import asyncio
  30. from contextlib import suppress
  31. import sys
  32. import qasync
  33. from qubesadmin import events
  34. from qubesadmin import exc
  35. from PyQt5 import QtWidgets, QtCore, QtGui # pylint: disable=import-error
  36. # important usage note: which initialize_widget should I use?
  37. # - if you want a list of VMs, use initialize_widget_with_vms, optionally
  38. # adding a property if you want to handle qubesadmin.DEFAULT and the
  39. # current (potentially default) value
  40. # - if you want a list of labels or kernels, use
  41. # initialize_widget_with_kernels/labels
  42. # - list of some things, but associated with a definite property (optionally
  43. # with qubesadmin.DEFAULT) - initialize_widget_for_property
  44. # - list of some things, not associated with a property, but still having a
  45. # default - initialize_widget_with_default
  46. # - just a list, no properties or defaults, just a nice list with a "current"
  47. # value - initialize_widget
  48. def is_internal(vm):
  49. """checks if the VM is either an AdminVM or has the 'internal' features"""
  50. try:
  51. return (vm.klass == 'AdminVM'
  52. or vm.features.get('internal', False))
  53. except exc.QubesDaemonAccessError:
  54. return False
  55. def is_running(vm, default_state):
  56. """Checks if the VM is running, returns default_state if we have
  57. insufficient permissions to deteremine that."""
  58. try:
  59. return vm.is_running()
  60. except exc.QubesDaemonAccessError:
  61. return default_state
  62. def translate(string):
  63. """helper function for translations"""
  64. return QtCore.QCoreApplication.translate(
  65. "ManagerUtils", string)
  66. class SizeSpinBox(QtWidgets.QSpinBox):
  67. """A SpinBox subclass with extended handling for sizes in MB and GB"""
  68. # pylint: disable=invalid-name, no-self-use
  69. def __init__(self, *args, **kwargs):
  70. super().__init__(*args, **kwargs)
  71. self.pattern = r'(\d+\.?\d?) ?(GB|MB)'
  72. self.regex = re.compile(self.pattern)
  73. self.validator = QtGui.QRegExpValidator(QtCore.QRegExp(
  74. self.pattern), self)
  75. def textFromValue(self, v: int) -> str:
  76. if v > 1024:
  77. return '{:.1f} GB'.format(v / 1024)
  78. return '{} MB'.format(v)
  79. def validate(self, text: str, pos: int):
  80. return self.validator.validate(text, pos)
  81. def valueFromText(self, text: str) -> int:
  82. value, unit = self.regex.fullmatch(text.strip()).groups()
  83. if unit == 'GB':
  84. multiplier = 1024
  85. else:
  86. multiplier = 1
  87. return int(float(value) * multiplier)
  88. def get_feature(vm, feature_name, default_value):
  89. try:
  90. return vm.features.get(feature_name, default_value)
  91. except exc.QubesDaemonAccessError:
  92. return default_value
  93. def get_boolean_feature(vm, feature_name):
  94. """heper function to get a feature converted to a Bool if it does exist.
  95. Necessary because of the true/false in features being coded as 1/empty
  96. string."""
  97. result = get_feature(vm, feature_name, None)
  98. if result is not None:
  99. result = bool(result)
  100. return result
  101. def did_widget_selection_change(widget):
  102. """a simple heuristic to check if the widget text contains appropriately
  103. translated 'current'"""
  104. if not widget.isEnabled():
  105. return False
  106. return not translate(" (current)") in widget.currentText()
  107. def initialize_widget(widget, choices, selected_value=None,
  108. icon_getter=None, add_current_label=True):
  109. """
  110. populates widget (ListBox or ComboBox) with items. Previous widget contents
  111. are erased.
  112. :param widget: QListBox or QComboBox; must support addItem and findText
  113. :param choices: list of tuples (text, value) to use to populate widget.
  114. text should be a string, value can be of any type, including None
  115. :param selected_value: initial widget value
  116. :param icon_getter: function of value that returns desired icon
  117. :param add_current_label: if initial value should be labelled as (current)
  118. :return:
  119. """
  120. widget.clear()
  121. selected_item = None
  122. for (name, value) in choices:
  123. if value == selected_value:
  124. selected_item = name
  125. if icon_getter is not None:
  126. widget.addItem(icon_getter(value), name, userData=value)
  127. else:
  128. widget.addItem(name, userData=value)
  129. if selected_item is not None:
  130. widget.setCurrentIndex(widget.findText(selected_item))
  131. else:
  132. widget.addItem(str(selected_value), selected_value)
  133. widget.setCurrentIndex(widget.findText(str(selected_value)))
  134. if add_current_label:
  135. widget.setItemText(widget.currentIndex(),
  136. widget.currentText() + translate(" (current)"))
  137. def initialize_widget_for_property(
  138. widget, choices, holder, property_name, allow_default=False,
  139. icon_getter=None, add_current_label=True):
  140. """
  141. populates widget (ListBox or ComboBox) with items, based on a listed
  142. property. Supports discovering the system default for the given property
  143. and handling qubesadmin.DEFAULT special value. Value of holder.property
  144. will be set as current item. Previous widget contents are erased.
  145. :param widget: QListBox or QComboBox; must support addItem and findText
  146. :param choices: list of tuples (text, value) to use to populate widget.
  147. text should be a string, value can be of any type, including None
  148. :param holder: object to use as property_name's holder
  149. :param property_name: name of the property
  150. :param allow_default: boolean, should a position with qubesadmin.DEFAULT
  151. be added; default False
  152. :param icon_getter: a function applied to values (from choices) that
  153. returns a QIcon to be used as a item icon; default None
  154. :param add_current_label: if initial value should be labelled as (current)
  155. :return:
  156. """
  157. if allow_default:
  158. try:
  159. default_property = holder.property_get_default(property_name)
  160. except exc.QubesDaemonAccessError:
  161. default_property = "ERROR: unavailable"
  162. if default_property is None:
  163. default_property = "none"
  164. choices.append(
  165. (translate("default ({})").format(default_property),
  166. qubesadmin.DEFAULT))
  167. # calculate current (can be default)
  168. try:
  169. is_default = holder.property_is_default(property_name)
  170. except exc.QubesDaemonAccessError:
  171. is_default = False
  172. if is_default:
  173. current_value = qubesadmin.DEFAULT
  174. else:
  175. current_value = getattr(holder, property_name)
  176. initialize_widget(widget,
  177. choices,
  178. selected_value=current_value,
  179. icon_getter=icon_getter,
  180. add_current_label=add_current_label)
  181. # TODO: improvement: add optional icon support
  182. def initialize_widget_with_vms(
  183. widget, qubes_app, filter_function=(lambda x: True),
  184. allow_none=False, holder=None, property_name=None,
  185. allow_default=False, allow_internal=False):
  186. """
  187. populates widget (ListBox or ComboBox) with vm items, optionally based on
  188. a given property. Supports discovering the system default for the property
  189. and handling qubesadmin.DEFAULT special value. Value of holder.property
  190. will be set as current item. Previous widget contents are erased.
  191. :param widget: QListBox or QComboBox; must support addItem and findText
  192. :param qubes_app: Qubes() object
  193. :param filter_function: function used to filter vms; optional
  194. :param allow_none: should a None option be added; default False
  195. :param holder: object to use as property_name's holder
  196. :param property_name: name of the property
  197. :param allow_default: should a position with qubesadmin.DEFAULT be added;
  198. default False
  199. :param allow_internal: should AdminVMs and vms with feature 'internal' be
  200. used
  201. :return:
  202. """
  203. choices = []
  204. for vm in qubes_app.domains:
  205. if not allow_internal and is_internal(vm):
  206. continue
  207. if not filter_function(vm):
  208. continue
  209. choices.append((vm.name, vm))
  210. if allow_none:
  211. choices.append((translate("(none)"), None))
  212. if holder is None:
  213. initialize_widget(widget,
  214. choices,
  215. selected_value=choices[0][1],
  216. add_current_label=False)
  217. else:
  218. initialize_widget_for_property(
  219. widget=widget, choices=choices, holder=holder,
  220. property_name=property_name, allow_default=allow_default)
  221. def initialize_widget_with_default(
  222. widget, choices, add_none=False, add_qubes_default=False,
  223. mark_existing_as_default=False, default_value=None):
  224. """
  225. populates widget (ListBox or ComboBox) with items. Used when there is no
  226. corresponding property, but support for special qubesadmin.DEFAULT value
  227. is still needed.
  228. :param widget: QListBox or QComboBox; must support addItem and findText
  229. :param choices: list of tuples (text, value) to use to populate widget.
  230. text should be a string, value can be of any type, including None
  231. :param add_none: should a 'None' position be added
  232. :param add_qubes_default: should a qubesadmin.DEFAULT position be added
  233. (requires default_value to be set to something meaningful)
  234. :param mark_existing_as_default: should an existing value be marked
  235. as default. If used with conjuction with add_qubes_default, the
  236. default_value listed will be replaced by qubesadmin.DEFAULT
  237. :param default_value: what value should be used as the default
  238. :return:
  239. """
  240. added_existing = False
  241. if mark_existing_as_default:
  242. existing_default = [item for item in choices
  243. if item[1] == default_value]
  244. if existing_default:
  245. choices = [item for item in choices if item not in existing_default]
  246. if add_qubes_default:
  247. # if for some reason (e.g. storage pools) we want to mark an
  248. # actual value as default and replace it with qubesadmin.DEFAULT
  249. default_value = qubesadmin.DEFAULT
  250. choices.insert(
  251. 0, (translate("default ({})").format(existing_default[0][0]),
  252. default_value))
  253. added_existing = True
  254. elif add_qubes_default:
  255. choices.insert(0, (translate("default ({})").format(default_value),
  256. qubesadmin.DEFAULT))
  257. if add_none:
  258. if mark_existing_as_default and default_value is None and \
  259. not added_existing:
  260. choices.append((translate("default (none)"), None))
  261. else:
  262. choices.append((translate("(none)"), None))
  263. if add_qubes_default:
  264. selected_value = qubesadmin.DEFAULT
  265. elif mark_existing_as_default:
  266. selected_value = default_value
  267. else:
  268. selected_value = choices[0][1]
  269. initialize_widget(
  270. widget=widget, choices=choices, selected_value=selected_value,
  271. add_current_label=False)
  272. def initialize_widget_with_kernels(
  273. widget, qubes_app, allow_none=False, holder=None,
  274. property_name=None, allow_default=False):
  275. """
  276. populates widget (ListBox or ComboBox) with kernel items, based on a given
  277. property. Supports discovering the system default for the property
  278. and handling qubesadmin.DEFAULT special value. Value of holder.property
  279. will be set as current item. Previous widget contents are erased.
  280. :param widget: QListBox or QComboBox; must support addItem and findText
  281. :param qubes_app: Qubes() object
  282. :param allow_none: should a None item be added
  283. :param holder: object to use as property_name's holder
  284. :param property_name: name of the property
  285. :param allow_default: should a qubesadmin.DEFAULT item be added
  286. :return:
  287. """
  288. kernels = [kernel.vid for kernel in qubes_app.pools['linux-kernel'].volumes]
  289. kernels = sorted(kernels, key=KernelVersion)
  290. choices = [(kernel, kernel) for kernel in kernels]
  291. if allow_none:
  292. choices.append((translate("(none)"), None))
  293. initialize_widget_for_property(
  294. widget=widget, choices=choices, holder=holder,
  295. property_name=property_name, allow_default=allow_default)
  296. def initialize_widget_with_labels(widget, qubes_app,
  297. holder=None, property_name='label'):
  298. """
  299. populates widget (ListBox or ComboBox) with label items, optionally based
  300. on a given property. Value of holder.property will be set as current item.
  301. Previous widget contents are erased.
  302. :param widget: QListBox or QComboBox; must support addItem and findText
  303. :param qubes_app: Qubes() object
  304. :param holder: object to use as property_name's holder; can be None
  305. :param property_name: name of the property
  306. :return:
  307. """
  308. labels = sorted(qubes_app.labels.values(), key=lambda l: l.index)
  309. choices = [(label.name, label) for label in labels]
  310. icon_getter = (lambda label:
  311. QtGui.QIcon.fromTheme(label.icon))
  312. if holder:
  313. initialize_widget_for_property(widget=widget,
  314. choices=choices,
  315. holder=holder,
  316. property_name=property_name,
  317. icon_getter=icon_getter)
  318. else:
  319. initialize_widget(widget=widget,
  320. choices=choices,
  321. selected_value=labels[0],
  322. icon_getter=icon_getter,
  323. add_current_label=False)
  324. class KernelVersion: # pylint: disable=too-few-public-methods
  325. # Cannot use distutils.version.LooseVersion, because it fails at handling
  326. # versions that have no numbers in them
  327. def __init__(self, string):
  328. self.string = string
  329. self.groups = re.compile(r'(\d+)').split(self.string)
  330. def __lt__(self, other):
  331. for (self_content, other_content) in itertools.zip_longest(
  332. self.groups, other.groups):
  333. if self_content == other_content:
  334. continue
  335. if self_content is None:
  336. return True
  337. if other_content is None:
  338. return False
  339. if self_content.isdigit() and other_content.isdigit():
  340. return int(self_content) < int(other_content)
  341. return self_content < other_content
  342. def is_debug():
  343. return os.getenv('QUBES_MANAGER_DEBUG', '') not in ('', '0')
  344. def debug(*args, **kwargs):
  345. if not is_debug():
  346. return
  347. print(*args, **kwargs)
  348. def get_path_from_vm(vm, service_name):
  349. """
  350. Displays a file/directory selection window for the given VM.
  351. :param vm: vm from which to select path
  352. :param service_name: qubes.SelectFile or qubes.SelectDirectory
  353. :return: path to file, checked for validity
  354. """
  355. path_re = re.compile(r"[a-zA-Z0-9/:.,_+=() -]*")
  356. path_max_len = 512
  357. if not vm:
  358. return None
  359. stdout, _stderr = vm.run_service_for_stdio(service_name)
  360. stdout = stdout.strip()
  361. untrusted_path = stdout.decode(encoding='ascii')[:path_max_len]
  362. if not untrusted_path:
  363. return None
  364. if path_re.fullmatch(untrusted_path):
  365. assert '../' not in untrusted_path
  366. assert '\0' not in untrusted_path
  367. return untrusted_path.strip()
  368. raise ValueError(QtCore.QCoreApplication.translate(
  369. "ManagerUtils", 'Unexpected characters in path.'))
  370. def format_dependencies_list(dependencies):
  371. """Given a list of tuples representing properties, formats them in
  372. a readable list."""
  373. list_text = ""
  374. for (holder, prop) in dependencies:
  375. if holder is None:
  376. list_text += QtCore.QCoreApplication.translate(
  377. "ManagerUtils", "- Global property <b>{}</b> <br>").format(prop)
  378. else:
  379. list_text += QtCore.QCoreApplication.translate(
  380. "ManagerUtils", "- <b>{0}</b> for qube <b>{1}</b> <br>").format(
  381. prop, holder.name)
  382. return list_text
  383. def loop_shutdown():
  384. pending = asyncio.all_tasks()
  385. for task in pending:
  386. with suppress(asyncio.CancelledError):
  387. task.cancel()
  388. # Based on the original code by:
  389. # Copyright (c) 2002-2007 Pascal Varet <p.varet@gmail.com>
  390. def handle_exception(exc_type, exc_value, exc_traceback):
  391. filename, line, _, _ = traceback.extract_tb(exc_traceback).pop()
  392. filename = os.path.basename(filename)
  393. error = "%s: %s" % (exc_type.__name__, exc_value)
  394. strace = ""
  395. stacktrace = traceback.extract_tb(exc_traceback)
  396. while stacktrace:
  397. (filename, line, func, txt) = stacktrace.pop()
  398. strace += "----\n"
  399. strace += "line: %s\n" % txt
  400. strace += "func: %s\n" % func
  401. strace += "line no.: %d\n" % line
  402. strace += "file: %s\n" % filename
  403. msg_box = QtWidgets.QMessageBox()
  404. msg_box.setDetailedText(strace)
  405. msg_box.setIcon(QtWidgets.QMessageBox.Critical)
  406. msg_box.setWindowTitle(QtCore.QCoreApplication.translate(
  407. "ManagerUtils", "Houston, we have a problem..."))
  408. msg_box.setText(QtCore.QCoreApplication.translate(
  409. "ManagerUtils", "Whoops. A critical error has occured. "
  410. "This is most likely a bug in Qubes Manager.<br><br>"
  411. "<b><i>{0}</i></b><br/>at line <b>{1}</b><br/>of file "
  412. "{2}.<br/><br/>").format(error, line, filename))
  413. msg_box.exec_()
  414. def run_asynchronous(window_class):
  415. qt_app = QtWidgets.QApplication(sys.argv)
  416. translator = QtCore.QTranslator(qt_app)
  417. locale = QtCore.QLocale.system().name()
  418. i18n_dir = os.path.join(
  419. os.path.dirname(os.path.realpath(__file__)),
  420. 'i18n')
  421. translator.load("qubesmanager_{!s}.qm".format(locale), i18n_dir)
  422. qt_app.installTranslator(translator)
  423. QtCore.QCoreApplication.installTranslator(translator)
  424. qt_app.setOrganizationName("The Qubes Project")
  425. qt_app.setOrganizationDomain("http://qubes-os.org")
  426. qt_app.lastWindowClosed.connect(loop_shutdown)
  427. qubes_app = qubesadmin.Qubes()
  428. loop = qasync.QEventLoop(qt_app)
  429. asyncio.set_event_loop(loop)
  430. dispatcher = events.EventsDispatcher(qubes_app)
  431. window = window_class(qt_app, qubes_app, dispatcher)
  432. if hasattr(window, "setup_application"):
  433. window.setup_application()
  434. window.show()
  435. try:
  436. loop.run_until_complete(
  437. asyncio.ensure_future(dispatcher.listen_for_events()))
  438. except asyncio.CancelledError:
  439. pass
  440. except Exception: # pylint: disable=broad-except
  441. loop_shutdown()
  442. exc_type, exc_value, exc_traceback = sys.exc_info()[:3]
  443. handle_exception(exc_type, exc_value, exc_traceback)
  444. def run_synchronous(window_class):
  445. qt_app = QtWidgets.QApplication(sys.argv)
  446. translator = QtCore.QTranslator(qt_app)
  447. locale = QtCore.QLocale.system().name()
  448. i18n_dir = os.path.join(
  449. os.path.dirname(os.path.realpath(__file__)),
  450. 'i18n')
  451. translator.load("qubesmanager_{!s}.qm".format(locale), i18n_dir)
  452. qt_app.installTranslator(translator)
  453. QtCore.QCoreApplication.installTranslator(translator)
  454. qt_app.setOrganizationName("The Qubes Project")
  455. qt_app.setOrganizationDomain("http://qubes-os.org")
  456. sys.excepthook = handle_exception
  457. qubes_app = qubesadmin.Qubes()
  458. window = window_class(qt_app, qubes_app)
  459. if hasattr(window, "setup_application"):
  460. window.setup_application()
  461. window.show()
  462. qt_app.exec_()
  463. qt_app.exit()
  464. return window