utils.py 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556
  1. #
  2. # The Qubes OS Project, https://www.qubes-os.org
  3. #
  4. # Copyright (C) 2012 Agnieszka Kostrzewa <agnieszka.kostrzewa@gmail.com>
  5. # Copyright (C) 2012 Marek Marczykowski-Górecki
  6. # <marmarek@invisiblethingslab.com>
  7. # Copyright (C) 2017 Wojtek Porczyk <woju@invisiblethingslab.com>
  8. # Copyright (C) 2020 Marta Marczykowska-Górecka
  9. # <marmarta@invisiblethingslab.com>
  10. #
  11. # This program is free software: you can redistribute it and/or modify
  12. # it under the terms of the GNU General Public License as published by
  13. # the Free Software Foundation, either version 2 of the License, or
  14. # (at your option) any later version.
  15. #
  16. # This program is distributed in the hope that it will be useful,
  17. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  18. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  19. # GNU General Public License for more details.
  20. #
  21. # You should have received a copy of the GNU General Public License
  22. # along with this program. If not, see <http://www.gnu.org/licenses/>.
  23. #
  24. import itertools
  25. import os
  26. import re
  27. import qubesadmin
  28. import traceback
  29. import asyncio
  30. from contextlib import suppress
  31. import sys
  32. import qasync
  33. from qubesadmin import events
  34. from qubesadmin import exc
  35. from PyQt5 import QtWidgets, QtCore, QtGui # pylint: disable=import-error
  36. # important usage note: which initialize_widget should I use?
  37. # - if you want a list of VMs, use initialize_widget_with_vms, optionally
  38. # adding a property if you want to handle qubesadmin.DEFAULT and the
  39. # current (potentially default) value
  40. # - if you want a list of labels or kernels, use
  41. # initialize_widget_with_kernels/labels
  42. # - list of some things, but associated with a definite property (optionally
  43. # with qubesadmin.DEFAULT) - initialize_widget_for_property
  44. # - list of some things, not associated with a property, but still having a
  45. # default - initialize_widget_with_default
  46. # - just a list, no properties or defaults, just a nice list with a "current"
  47. # value - initialize_widget
  48. def is_internal(vm):
  49. """checks if the VM is either an AdminVM or has the 'internal' features"""
  50. try:
  51. return (vm.klass == 'AdminVM'
  52. or vm.features.get('internal', False))
  53. except exc.QubesDaemonCommunicationError:
  54. return False
  55. def is_running(vm, default_state):
  56. """Checks if the VM is running, returns default_state if we have
  57. insufficient permissions to deteremine that."""
  58. try:
  59. return vm.is_running()
  60. except exc.QubesPropertyAccessError:
  61. return default_state
  62. def translate(string):
  63. """helper function for translations"""
  64. return QtCore.QCoreApplication.translate(
  65. "ManagerUtils", string)
  66. class SizeSpinBox(QtWidgets.QSpinBox):
  67. """A SpinBox subclass with extended handling for sizes in MB and GB"""
  68. # pylint: disable=invalid-name, no-self-use
  69. def __init__(self, *args, **kwargs):
  70. super(SizeSpinBox, self).__init__(*args, **kwargs)
  71. self.pattern = r'(\d+\.?\d?) ?(GB|MB)'
  72. self.regex = re.compile(self.pattern)
  73. self.validator = QtGui.QRegExpValidator(QtCore.QRegExp(
  74. self.pattern), self)
  75. def textFromValue(self, v: int) -> str:
  76. if v > 1024:
  77. return '{:.1f} GB'.format(v / 1024)
  78. return '{} MB'.format(v)
  79. def validate(self, text: str, pos: int):
  80. return self.validator.validate(text, pos)
  81. def valueFromText(self, text: str) -> int:
  82. value, unit = self.regex.fullmatch(text.strip()).groups()
  83. if unit == 'GB':
  84. multiplier = 1024
  85. else:
  86. multiplier = 1
  87. return int(float(value) * multiplier)
  88. def get_feature(vm, feature_name, default_value):
  89. try:
  90. return vm.features.get(feature_name, default_value)
  91. except exc.QubesDaemonCommunicationError:
  92. return default_value
  93. def get_boolean_feature(vm, feature_name):
  94. """heper function to get a feature converted to a Bool if it does exist.
  95. Necessary because of the true/false in features being coded as 1/empty
  96. string."""
  97. result = get_feature(vm, feature_name, None)
  98. if result is not None:
  99. result = bool(result)
  100. return result
  101. def did_widget_selection_change(widget):
  102. """a simple heuristic to check if the widget text contains appropriately
  103. translated 'current'"""
  104. return not translate(" (current)") in widget.currentText()
  105. def initialize_widget(widget, choices, selected_value=None,
  106. icon_getter=None, add_current_label=True):
  107. """
  108. populates widget (ListBox or ComboBox) with items. Previous widget contents
  109. are erased.
  110. :param widget: QListBox or QComboBox; must support addItem and findText
  111. :param choices: list of tuples (text, value) to use to populate widget.
  112. text should be a string, value can be of any type, including None
  113. :param selected_value: initial widget value
  114. :param icon_getter: function of value that returns desired icon
  115. :param add_current_label: if initial value should be labelled as (current)
  116. :return:
  117. """
  118. widget.clear()
  119. selected_item = None
  120. for (name, value) in choices:
  121. if value == selected_value:
  122. selected_item = name
  123. if icon_getter is not None:
  124. widget.addItem(icon_getter(value), name, userData=value)
  125. else:
  126. widget.addItem(name, userData=value)
  127. if selected_item is not None:
  128. widget.setCurrentIndex(widget.findText(selected_item))
  129. else:
  130. widget.addItem(str(selected_value), selected_value)
  131. widget.setCurrentIndex(widget.findText(str(selected_value)))
  132. if add_current_label:
  133. widget.setItemText(widget.currentIndex(),
  134. widget.currentText() + translate(" (current)"))
  135. def initialize_widget_for_property(
  136. widget, choices, holder, property_name, allow_default=False,
  137. icon_getter=None, add_current_label=True):
  138. """
  139. populates widget (ListBox or ComboBox) with items, based on a listed
  140. property. Supports discovering the system default for the given property
  141. and handling qubesadmin.DEFAULT special value. Value of holder.property
  142. will be set as current item. Previous widget contents are erased.
  143. :param widget: QListBox or QComboBox; must support addItem and findText
  144. :param choices: list of tuples (text, value) to use to populate widget.
  145. text should be a string, value can be of any type, including None
  146. :param holder: object to use as property_name's holder
  147. :param property_name: name of the property
  148. :param allow_default: boolean, should a position with qubesadmin.DEFAULT
  149. be added; default False
  150. :param icon_getter: a function applied to values (from choices) that
  151. returns a QIcon to be used as a item icon; default None
  152. :param add_current_label: if initial value should be labelled as (current)
  153. :return:
  154. """
  155. if allow_default:
  156. default_property = holder.property_get_default(property_name)
  157. if default_property is None:
  158. default_property = "none"
  159. choices.append(
  160. (translate("default ({})").format(default_property),
  161. qubesadmin.DEFAULT))
  162. # calculate current (can be default)
  163. if holder.property_is_default(property_name):
  164. current_value = qubesadmin.DEFAULT
  165. else:
  166. current_value = getattr(holder, property_name)
  167. initialize_widget(widget,
  168. choices,
  169. selected_value=current_value,
  170. icon_getter=icon_getter,
  171. add_current_label=add_current_label)
  172. # TODO: improvement: add optional icon support
  173. def initialize_widget_with_vms(
  174. widget, qubes_app, filter_function=(lambda x: True),
  175. allow_none=False, holder=None, property_name=None,
  176. allow_default=False, allow_internal=False):
  177. """
  178. populates widget (ListBox or ComboBox) with vm items, optionally based on
  179. a given property. Supports discovering the system default for the property
  180. and handling qubesadmin.DEFAULT special value. Value of holder.property
  181. will be set as current item. Previous widget contents are erased.
  182. :param widget: QListBox or QComboBox; must support addItem and findText
  183. :param qubes_app: Qubes() object
  184. :param filter_function: function used to filter vms; optional
  185. :param allow_none: should a None option be added; default False
  186. :param holder: object to use as property_name's holder
  187. :param property_name: name of the property
  188. :param allow_default: should a position with qubesadmin.DEFAULT be added;
  189. default False
  190. :param allow_internal: should AdminVMs and vms with feature 'internal' be
  191. used
  192. :return:
  193. """
  194. choices = []
  195. for vm in qubes_app.domains:
  196. if not allow_internal and is_internal(vm):
  197. continue
  198. if not filter_function(vm):
  199. continue
  200. choices.append((vm.name, vm))
  201. if allow_none:
  202. choices.append((translate("(none)"), None))
  203. if holder is None:
  204. initialize_widget(widget,
  205. choices,
  206. selected_value=choices[0][1],
  207. add_current_label=False)
  208. else:
  209. initialize_widget_for_property(
  210. widget=widget, choices=choices, holder=holder,
  211. property_name=property_name, allow_default=allow_default)
  212. def initialize_widget_with_default(
  213. widget, choices, add_none=False, add_qubes_default=False,
  214. mark_existing_as_default=False, default_value=None):
  215. """
  216. populates widget (ListBox or ComboBox) with items. Used when there is no
  217. corresponding property, but support for special qubesadmin.DEFAULT value
  218. is still needed.
  219. :param widget: QListBox or QComboBox; must support addItem and findText
  220. :param choices: list of tuples (text, value) to use to populate widget.
  221. text should be a string, value can be of any type, including None
  222. :param add_none: should a 'None' position be added
  223. :param add_qubes_default: should a qubesadmin.DEFAULT position be added
  224. (requires default_value to be set to something meaningful)
  225. :param mark_existing_as_default: should an existing value be marked
  226. as default. If used with conjuction with add_qubes_default, the
  227. default_value listed will be replaced by qubesadmin.DEFAULT
  228. :param default_value: what value should be used as the default
  229. :return:
  230. """
  231. added_existing = False
  232. if mark_existing_as_default:
  233. existing_default = [item for item in choices
  234. if item[1] == default_value]
  235. if existing_default:
  236. choices = [item for item in choices if item not in existing_default]
  237. if add_qubes_default:
  238. # if for some reason (e.g. storage pools) we want to mark an
  239. # actual value as default and replace it with qubesadmin.DEFAULT
  240. default_value = qubesadmin.DEFAULT
  241. choices.insert(
  242. 0, (translate("default ({})").format(existing_default[0][0]),
  243. default_value))
  244. added_existing = True
  245. elif add_qubes_default:
  246. choices.insert(0, (translate("default ({})").format(default_value),
  247. qubesadmin.DEFAULT))
  248. if add_none:
  249. if mark_existing_as_default and default_value is None and \
  250. not added_existing:
  251. choices.append((translate("default (none)"), None))
  252. else:
  253. choices.append((translate("(none)"), None))
  254. if add_qubes_default:
  255. selected_value = qubesadmin.DEFAULT
  256. elif mark_existing_as_default:
  257. selected_value = default_value
  258. else:
  259. selected_value = choices[0][1]
  260. initialize_widget(
  261. widget=widget, choices=choices, selected_value=selected_value,
  262. add_current_label=False)
  263. def initialize_widget_with_kernels(
  264. widget, qubes_app, allow_none=False, holder=None,
  265. property_name=None, allow_default=False):
  266. """
  267. populates widget (ListBox or ComboBox) with kernel items, based on a given
  268. property. Supports discovering the system default for the property
  269. and handling qubesadmin.DEFAULT special value. Value of holder.property
  270. will be set as current item. Previous widget contents are erased.
  271. :param widget: QListBox or QComboBox; must support addItem and findText
  272. :param qubes_app: Qubes() object
  273. :param allow_none: should a None item be added
  274. :param holder: object to use as property_name's holder
  275. :param property_name: name of the property
  276. :param allow_default: should a qubesadmin.DEFAULT item be added
  277. :return:
  278. """
  279. kernels = [kernel.vid for kernel in qubes_app.pools['linux-kernel'].volumes]
  280. kernels = sorted(kernels, key=KernelVersion)
  281. choices = [(kernel, kernel) for kernel in kernels]
  282. if allow_none:
  283. choices.append((translate("(none)"), None))
  284. initialize_widget_for_property(
  285. widget=widget, choices=choices, holder=holder,
  286. property_name=property_name, allow_default=allow_default)
  287. def initialize_widget_with_labels(widget, qubes_app,
  288. holder=None, property_name='label'):
  289. """
  290. populates widget (ListBox or ComboBox) with label items, optionally based
  291. on a given property. Value of holder.property will be set as current item.
  292. Previous widget contents are erased.
  293. :param widget: QListBox or QComboBox; must support addItem and findText
  294. :param qubes_app: Qubes() object
  295. :param holder: object to use as property_name's holder; can be None
  296. :param property_name: name of the property
  297. :return:
  298. """
  299. labels = sorted(qubes_app.labels.values(), key=lambda l: l.index)
  300. choices = [(label.name, label) for label in labels]
  301. icon_getter = (lambda label:
  302. QtGui.QIcon.fromTheme(label.icon))
  303. if holder:
  304. initialize_widget_for_property(widget=widget,
  305. choices=choices,
  306. holder=holder,
  307. property_name=property_name,
  308. icon_getter=icon_getter)
  309. else:
  310. initialize_widget(widget=widget,
  311. choices=choices,
  312. selected_value=labels[0],
  313. icon_getter=icon_getter,
  314. add_current_label=False)
  315. class KernelVersion: # pylint: disable=too-few-public-methods
  316. # Cannot use distutils.version.LooseVersion, because it fails at handling
  317. # versions that have no numbers in them
  318. def __init__(self, string):
  319. self.string = string
  320. self.groups = re.compile(r'(\d+)').split(self.string)
  321. def __lt__(self, other):
  322. for (self_content, other_content) in itertools.zip_longest(
  323. self.groups, other.groups):
  324. if self_content == other_content:
  325. continue
  326. if self_content is None:
  327. return True
  328. if other_content is None:
  329. return False
  330. if self_content.isdigit() and other_content.isdigit():
  331. return int(self_content) < int(other_content)
  332. return self_content < other_content
  333. def is_debug():
  334. return os.getenv('QUBES_MANAGER_DEBUG', '') not in ('', '0')
  335. def debug(*args, **kwargs):
  336. if not is_debug():
  337. return
  338. print(*args, **kwargs)
  339. def get_path_from_vm(vm, service_name):
  340. """
  341. Displays a file/directory selection window for the given VM.
  342. :param vm: vm from which to select path
  343. :param service_name: qubes.SelectFile or qubes.SelectDirectory
  344. :return: path to file, checked for validity
  345. """
  346. path_re = re.compile(r"[a-zA-Z0-9/:.,_+=() -]*")
  347. path_max_len = 512
  348. if not vm:
  349. return None
  350. stdout, _stderr = vm.run_service_for_stdio(service_name)
  351. stdout = stdout.strip()
  352. untrusted_path = stdout.decode(encoding='ascii')[:path_max_len]
  353. if not untrusted_path:
  354. return None
  355. if path_re.fullmatch(untrusted_path):
  356. assert '../' not in untrusted_path
  357. assert '\0' not in untrusted_path
  358. return untrusted_path.strip()
  359. raise ValueError(QtCore.QCoreApplication.translate(
  360. "ManagerUtils", 'Unexpected characters in path.'))
  361. def format_dependencies_list(dependencies):
  362. """Given a list of tuples representing properties, formats them in
  363. a readable list."""
  364. list_text = ""
  365. for (holder, prop) in dependencies:
  366. if holder is None:
  367. list_text += QtCore.QCoreApplication.translate(
  368. "ManagerUtils", "- Global property <b>{}</b> <br>").format(prop)
  369. else:
  370. list_text += QtCore.QCoreApplication.translate(
  371. "ManagerUtils", "- <b>{0}</b> for qube <b>{1}</b> <br>").format(
  372. prop, holder.name)
  373. return list_text
  374. def loop_shutdown():
  375. pending = asyncio.Task.all_tasks()
  376. for task in pending:
  377. with suppress(asyncio.CancelledError):
  378. task.cancel()
  379. # Based on the original code by:
  380. # Copyright (c) 2002-2007 Pascal Varet <p.varet@gmail.com>
  381. def handle_exception(exc_type, exc_value, exc_traceback):
  382. filename, line, _, _ = traceback.extract_tb(exc_traceback).pop()
  383. filename = os.path.basename(filename)
  384. error = "%s: %s" % (exc_type.__name__, exc_value)
  385. strace = ""
  386. stacktrace = traceback.extract_tb(exc_traceback)
  387. while stacktrace:
  388. (filename, line, func, txt) = stacktrace.pop()
  389. strace += "----\n"
  390. strace += "line: %s\n" % txt
  391. strace += "func: %s\n" % func
  392. strace += "line no.: %d\n" % line
  393. strace += "file: %s\n" % filename
  394. msg_box = QtWidgets.QMessageBox()
  395. msg_box.setDetailedText(strace)
  396. msg_box.setIcon(QtWidgets.QMessageBox.Critical)
  397. msg_box.setWindowTitle(QtCore.QCoreApplication.translate(
  398. "ManagerUtils", "Houston, we have a problem..."))
  399. msg_box.setText(QtCore.QCoreApplication.translate(
  400. "ManagerUtils", "Whoops. A critical error has occured. "
  401. "This is most likely a bug in Qubes Manager.<br><br>"
  402. "<b><i>{0}</i></b><br/>at line <b>{1}</b><br/>of file "
  403. "{2}.<br/><br/>").format(error, line, filename))
  404. msg_box.exec_()
  405. def run_asynchronous(window_class):
  406. qt_app = QtWidgets.QApplication(sys.argv)
  407. translator = QtCore.QTranslator(qt_app)
  408. locale = QtCore.QLocale.system().name()
  409. i18n_dir = os.path.join(
  410. os.path.dirname(os.path.realpath(__file__)),
  411. 'i18n')
  412. translator.load("qubesmanager_{!s}.qm".format(locale), i18n_dir)
  413. qt_app.installTranslator(translator)
  414. QtCore.QCoreApplication.installTranslator(translator)
  415. qt_app.setOrganizationName("The Qubes Project")
  416. qt_app.setOrganizationDomain("http://qubes-os.org")
  417. qt_app.lastWindowClosed.connect(loop_shutdown)
  418. qubes_app = qubesadmin.Qubes()
  419. loop = qasync.QEventLoop(qt_app)
  420. asyncio.set_event_loop(loop)
  421. dispatcher = events.EventsDispatcher(qubes_app)
  422. window = window_class(qt_app, qubes_app, dispatcher)
  423. if hasattr(window, "setup_application"):
  424. window.setup_application()
  425. window.show()
  426. try:
  427. loop.run_until_complete(
  428. asyncio.ensure_future(dispatcher.listen_for_events()))
  429. except asyncio.CancelledError:
  430. pass
  431. except Exception: # pylint: disable=broad-except
  432. loop_shutdown()
  433. exc_type, exc_value, exc_traceback = sys.exc_info()[:3]
  434. handle_exception(exc_type, exc_value, exc_traceback)
  435. def run_synchronous(window_class):
  436. qt_app = QtWidgets.QApplication(sys.argv)
  437. translator = QtCore.QTranslator(qt_app)
  438. locale = QtCore.QLocale.system().name()
  439. i18n_dir = os.path.join(
  440. os.path.dirname(os.path.realpath(__file__)),
  441. 'i18n')
  442. translator.load("qubesmanager_{!s}.qm".format(locale), i18n_dir)
  443. qt_app.installTranslator(translator)
  444. QtCore.QCoreApplication.installTranslator(translator)
  445. qt_app.setOrganizationName("The Qubes Project")
  446. qt_app.setOrganizationDomain("http://qubes-os.org")
  447. sys.excepthook = handle_exception
  448. qubes_app = qubesadmin.Qubes()
  449. window = window_class(qt_app, qubes_app)
  450. if hasattr(window, "setup_application"):
  451. window.setup_application()
  452. window.show()
  453. qt_app.exec_()
  454. qt_app.exit()