utils.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521
  1. #
  2. # The Qubes OS Project, https://www.qubes-os.org
  3. #
  4. # Copyright (C) 2012 Agnieszka Kostrzewa <agnieszka.kostrzewa@gmail.com>
  5. # Copyright (C) 2012 Marek Marczykowski-Górecki
  6. # <marmarek@invisiblethingslab.com>
  7. # Copyright (C) 2017 Wojtek Porczyk <woju@invisiblethingslab.com>
  8. #
  9. # This program is free software: you can redistribute it and/or modify
  10. # it under the terms of the GNU General Public License as published by
  11. # the Free Software Foundation, either version 2 of the License, or
  12. # (at your option) any later version.
  13. #
  14. # This program is distributed in the hope that it will be useful,
  15. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  16. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  17. # GNU General Public License for more details.
  18. #
  19. # You should have received a copy of the GNU General Public License
  20. # along with this program. If not, see <http://www.gnu.org/licenses/>.
  21. #
  22. import itertools
  23. import os
  24. import re
  25. import qubesadmin
  26. import traceback
  27. import asyncio
  28. from contextlib import suppress
  29. import sys
  30. import quamash
  31. from qubesadmin import events
  32. from PyQt5 import QtWidgets, QtCore, QtGui # pylint: disable=import-error
  33. #TODO: remove
  34. def _filter_internal(vm):
  35. return (not vm.klass == 'AdminVM'
  36. and not vm.features.get('internal', False))
  37. def is_internal(vm):
  38. return (vm.klass == 'AdminVM'
  39. or vm.features.get('internal', False))
  40. def translate(string):
  41. return QtCore.QCoreApplication.translate(
  42. "ManagerUtils", string)
  43. class SizeSpinBox(QtWidgets.QSpinBox):
  44. # pylint: disable=invalid-name, no-self-use
  45. def __init__(self, *args, **kwargs):
  46. super(SizeSpinBox, self).__init__(*args, **kwargs)
  47. self.pattern = r'(\d+\.?\d?) ?(GB|MB)'
  48. self.regex = re.compile(self.pattern)
  49. self.validator = QtGui.QRegExpValidator(QtCore.QRegExp(
  50. self.pattern), self)
  51. def textFromValue(self, v: int) -> str:
  52. if v > 1024:
  53. return '{:.1f} GB'.format(v / 1024)
  54. return '{} MB'.format(v)
  55. def validate(self, text: str, pos: int):
  56. return self.validator.validate(text, pos)
  57. def valueFromText(self, text: str) -> int:
  58. value, unit = self.regex.fullmatch(text.strip()).groups()
  59. if unit == 'GB':
  60. multiplier = 1024
  61. else:
  62. multiplier = 1
  63. return int(float(value) * multiplier)
  64. def get_boolean_feature(vm, feature_name):
  65. result = vm.features.get(feature_name, None)
  66. if result is not None:
  67. result = bool(result)
  68. return result
  69. # TODO: doublecheck translation
  70. def did_widget_selection_change(widget):
  71. return not translate(" (current)") in widget.currentText()
  72. def initialize_widget(widget, choices, selected_value=None, icon_getter=None):
  73. """
  74. populates widget (ListBox or ComboBox) with items. Previous widget contents
  75. are erased.
  76. :param widget: widget to populate
  77. :param choices: list of tuples (text, value) to use to populate widget
  78. :param selected_value: value to populate widget with
  79. :param icon_getter: function of value that returns desired icon
  80. :return:
  81. """
  82. widget.clear()
  83. selected_item = None
  84. for (name, value) in choices:
  85. if value == selected_value:
  86. selected_item = name
  87. if icon_getter is not None:
  88. widget.addItem(icon_getter(value), name, userData=value)
  89. else:
  90. widget.addItem(name, userData=value)
  91. if selected_item is not None:
  92. widget.setCurrentIndex(widget.findText(selected_item))
  93. else:
  94. widget.addItem(str(selected_value), selected_value)
  95. widget.setCurrentIndex(widget.findText(str(selected_value)))
  96. widget.setItemText(widget.currentIndex(),
  97. widget.currentText() + translate(" (current)"))
  98. def initialize_widget_for_property(
  99. widget, choices, holder, property_name, allow_default=False,
  100. icon_getter=None):
  101. # potentially add default
  102. if allow_default:
  103. default_property = holder.property_get_default(property_name)
  104. if default_property is None:
  105. default_property = "none"
  106. choices.append(
  107. (translate("default ({})").format(default_property),
  108. qubesadmin.DEFAULT))
  109. # calculate current (can be default)
  110. if holder.property_is_default(property_name):
  111. current_value = qubesadmin.DEFAULT
  112. else:
  113. current_value = getattr(holder, property_name)
  114. initialize_widget(widget,
  115. choices,
  116. selected_value=current_value,
  117. icon_getter=icon_getter)
  118. # TODO: add use icons here
  119. def initialize_widget_with_vms(widget,
  120. qubes_app,
  121. filter_function=(lambda x: True),
  122. allow_none=False,
  123. holder=None,
  124. property_name=None,
  125. allow_default=False,
  126. allow_internal=False):
  127. choices = []
  128. for vm in qubes_app.domains:
  129. if not allow_internal and is_internal(vm):
  130. continue
  131. if not filter_function(vm):
  132. continue
  133. choices.append((vm.name, vm))
  134. if allow_none:
  135. choices.append((translate("(none)"), None))
  136. initialize_widget_for_property(
  137. widget=widget, choices=choices, holder=holder,
  138. property_name=property_name, allow_default=allow_default)
  139. def initialize_widget_with_kernels(widget,
  140. qubes_app,
  141. allow_none=False,
  142. holder=None,
  143. property_name=None,
  144. allow_default=False
  145. ):
  146. kernels = [kernel.vid for kernel in qubes_app.pools['linux-kernel'].volumes]
  147. kernels = sorted(kernels, key=KernelVersion)
  148. choices = [(kernel, kernel) for kernel in kernels]
  149. if allow_none:
  150. choices.append((translate("(none)"), None))
  151. initialize_widget_for_property(
  152. widget=widget, choices=choices, holder=holder,
  153. property_name=property_name, allow_default=allow_default)
  154. def initialize_widget_with_labels(widget,
  155. qubes_app,
  156. holder=None,
  157. property_name='label'):
  158. labels = sorted(qubes_app.labels.values(), key=lambda l: l.index)
  159. choices = [(label.name, label) for label in labels]
  160. initialize_widget_for_property(
  161. widget=widget,
  162. choices=choices,
  163. holder=holder,
  164. property_name=property_name,
  165. icon_getter=(lambda label:
  166. QtGui.QIcon.fromTheme(label.icon)))
  167. def prepare_choice(widget, holder, propname, choice, default,
  168. filter_function=None, *,
  169. icon_getter=None, allow_internal=None, allow_default=False,
  170. allow_none=False, transform=None):
  171. # for newly created vms, set propname to None
  172. # clear the widget, so that prepare_choice functions can be used
  173. # to refresh widget values
  174. while widget.count() > 0:
  175. widget.removeItem(0)
  176. debug(
  177. 'prepare_choice(widget={widget!r}, '
  178. 'holder={holder!r}, '
  179. 'propname={propname!r}, '
  180. 'choice={choice!r}, '
  181. 'default={default!r}, '
  182. 'filter_function={filter_function!r}, '
  183. 'icon_getter={icon_getter!r}, '
  184. 'allow_internal={allow_internal!r}, '
  185. 'allow_default={allow_default!r}, '
  186. 'allow_none={allow_none!r})'.format(**locals()))
  187. if propname is not None and allow_default:
  188. default = holder.property_get_default(propname)
  189. if allow_internal is None:
  190. allow_internal = propname is None or not propname.endswith('vm')
  191. if propname is not None:
  192. if holder.property_is_default(propname):
  193. oldvalue = qubesadmin.DEFAULT
  194. else:
  195. oldvalue = getattr(holder, propname)
  196. if oldvalue == '':
  197. oldvalue = None
  198. if transform is not None and oldvalue is not None:
  199. oldvalue = transform(oldvalue)
  200. else:
  201. oldvalue = object() # won't match for identity
  202. idx = 0
  203. choice_list = list(choice)[:]
  204. if not allow_internal:
  205. choice_list = filter(_filter_internal, choice_list)
  206. if filter_function is not None:
  207. choice_list = filter(filter_function, choice_list)
  208. choice_list = list(choice_list)
  209. if allow_default:
  210. choice_list.insert(0, qubesadmin.DEFAULT)
  211. if allow_none:
  212. choice_list.append(None)
  213. for i, item in enumerate(choice_list):
  214. debug('i={} item={}'.format(i, item))
  215. # 0: default (unset)
  216. if item is qubesadmin.DEFAULT:
  217. default_string = str(default) if default is not None else 'none'
  218. if transform is not None:
  219. default_string = transform(default_string)
  220. text = QtCore.QCoreApplication.translate(
  221. "ManagerUtils", 'default ({})').format(default_string)
  222. # N+1: explicit None
  223. elif item is None:
  224. text = QtCore.QCoreApplication.translate("ManagerUtils", '(none)')
  225. # 1..N: choices
  226. else:
  227. text = str(item)
  228. if transform is not None:
  229. text = transform(text)
  230. if item == oldvalue:
  231. text += QtCore.QCoreApplication.translate(
  232. "ManagerUtils", ' (current)')
  233. idx = i
  234. widget.insertItem(i, text)
  235. if icon_getter is not None:
  236. icon = icon_getter(item)
  237. if icon is not None:
  238. widget.setItemIcon(i, icon)
  239. widget.setCurrentIndex(idx)
  240. return choice_list, idx
  241. class KernelVersion: # pylint: disable=too-few-public-methods
  242. # Cannot use distutils.version.LooseVersion, because it fails at handling
  243. # versions that have no numbers in them
  244. def __init__(self, string):
  245. self.string = string
  246. self.groups = re.compile(r'(\d+)').split(self.string)
  247. def __lt__(self, other):
  248. for (self_content, other_content) in itertools.zip_longest(
  249. self.groups, other.groups):
  250. if self_content == other_content:
  251. continue
  252. if self_content is None:
  253. return True
  254. if other_content is None:
  255. return False
  256. if self_content.isdigit() and other_content.isdigit():
  257. return int(self_content) < int(other_content)
  258. return self_content < other_content
  259. def prepare_kernel_choice(widget, holder, propname, default, *args, **kwargs):
  260. try:
  261. app = holder.app
  262. except AttributeError:
  263. app = holder
  264. kernels = [kernel.vid for kernel in app.pools['linux-kernel'].volumes]
  265. kernels = sorted(kernels, key=KernelVersion)
  266. return prepare_choice(
  267. widget, holder, propname, kernels, default, *args, **kwargs)
  268. def prepare_label_choice(widget, holder, propname, default, *args, **kwargs):
  269. try:
  270. app = holder.app
  271. except AttributeError:
  272. app = holder
  273. return prepare_choice(widget, holder, propname,
  274. sorted(app.labels.values(), key=lambda l: l.index),
  275. default, *args,
  276. icon_getter=(lambda label:
  277. QtGui.QIcon.fromTheme(label.icon)),
  278. **kwargs)
  279. def prepare_vm_choice(widget, holder, propname, default, *args, **kwargs):
  280. try:
  281. app = holder.app
  282. except AttributeError:
  283. app = holder
  284. return prepare_choice(widget, holder, propname, app.domains, default,
  285. *args, **kwargs)
  286. def is_debug():
  287. return os.getenv('QUBES_MANAGER_DEBUG', '') not in ('', '0')
  288. def debug(*args, **kwargs):
  289. if not is_debug():
  290. return
  291. print(*args, **kwargs)
  292. def get_path_from_vm(vm, service_name):
  293. """
  294. Displays a file/directory selection window for the given VM.
  295. :param vm: vm from which to select path
  296. :param service_name: qubes.SelectFile or qubes.SelectDirectory
  297. :return: path to file, checked for validity
  298. """
  299. path_re = re.compile(r"[a-zA-Z0-9/:.,_+=() -]*")
  300. path_max_len = 512
  301. if not vm:
  302. return None
  303. stdout, _stderr = vm.run_service_for_stdio(service_name)
  304. stdout = stdout.strip()
  305. untrusted_path = stdout.decode(encoding='ascii')[:path_max_len]
  306. if not untrusted_path:
  307. return None
  308. if path_re.fullmatch(untrusted_path):
  309. assert '../' not in untrusted_path
  310. assert '\0' not in untrusted_path
  311. return untrusted_path.strip()
  312. raise ValueError(QtCore.QCoreApplication.translate(
  313. "ManagerUtils", 'Unexpected characters in path.'))
  314. def format_dependencies_list(dependencies):
  315. """Given a list of tuples representing properties, formats them in
  316. a readable list."""
  317. list_text = ""
  318. for (holder, prop) in dependencies:
  319. if holder is None:
  320. list_text += QtCore.QCoreApplication.translate(
  321. "ManagerUtils", "- Global property <b>{}</b> <br>").format(prop)
  322. else:
  323. list_text += QtCore.QCoreApplication.translate(
  324. "ManagerUtils", "- <b>{0}</b> for qube <b>{1}</b> <br>").format(
  325. prop, holder.name)
  326. return list_text
  327. def loop_shutdown():
  328. pending = asyncio.Task.all_tasks()
  329. for task in pending:
  330. with suppress(asyncio.CancelledError):
  331. task.cancel()
  332. # Bases on the original code by:
  333. # Copyright (c) 2002-2007 Pascal Varet <p.varet@gmail.com>
  334. def handle_exception(exc_type, exc_value, exc_traceback):
  335. filename, line, _, _ = traceback.extract_tb(exc_traceback).pop()
  336. filename = os.path.basename(filename)
  337. error = "%s: %s" % (exc_type.__name__, exc_value)
  338. strace = ""
  339. stacktrace = traceback.extract_tb(exc_traceback)
  340. while stacktrace:
  341. (filename, line, func, txt) = stacktrace.pop()
  342. strace += "----\n"
  343. strace += "line: %s\n" % txt
  344. strace += "func: %s\n" % func
  345. strace += "line no.: %d\n" % line
  346. strace += "file: %s\n" % filename
  347. msg_box = QtWidgets.QMessageBox()
  348. msg_box.setDetailedText(strace)
  349. msg_box.setIcon(QtWidgets.QMessageBox.Critical)
  350. msg_box.setWindowTitle(QtCore.QCoreApplication.translate(
  351. "ManagerUtils", "Houston, we have a problem..."))
  352. msg_box.setText(QtCore.QCoreApplication.translate(
  353. "ManagerUtils", "Whoops. A critical error has occured. "
  354. "This is most likely a bug in Qubes Manager.<br><br>"
  355. "<b><i>{0}</i></b><br/>at line <b>{1}</b><br/>of file "
  356. "{2}.<br/><br/>").format(error, line, filename))
  357. msg_box.exec_()
  358. def run_asynchronous(window_class):
  359. qt_app = QtWidgets.QApplication(sys.argv)
  360. translator = QtCore.QTranslator(qt_app)
  361. locale = QtCore.QLocale.system().name()
  362. i18n_dir = os.path.join(
  363. os.path.dirname(os.path.realpath(__file__)),
  364. 'i18n')
  365. translator.load("qubesmanager_{!s}.qm".format(locale), i18n_dir)
  366. qt_app.installTranslator(translator)
  367. QtCore.QCoreApplication.installTranslator(translator)
  368. qt_app.setOrganizationName("The Qubes Project")
  369. qt_app.setOrganizationDomain("http://qubes-os.org")
  370. qt_app.lastWindowClosed.connect(loop_shutdown)
  371. qubes_app = qubesadmin.Qubes()
  372. loop = quamash.QEventLoop(qt_app)
  373. asyncio.set_event_loop(loop)
  374. dispatcher = events.EventsDispatcher(qubes_app)
  375. window = window_class(qt_app, qubes_app, dispatcher)
  376. if hasattr(window, "setup_application"):
  377. window.setup_application()
  378. window.show()
  379. try:
  380. loop.run_until_complete(
  381. asyncio.ensure_future(dispatcher.listen_for_events()))
  382. except asyncio.CancelledError:
  383. pass
  384. except Exception: # pylint: disable=broad-except
  385. loop_shutdown()
  386. exc_type, exc_value, exc_traceback = sys.exc_info()[:3]
  387. handle_exception(exc_type, exc_value, exc_traceback)
  388. def run_synchronous(window_class):
  389. qt_app = QtWidgets.QApplication(sys.argv)
  390. translator = QtCore.QTranslator(qt_app)
  391. locale = QtCore.QLocale.system().name()
  392. i18n_dir = os.path.join(
  393. os.path.dirname(os.path.realpath(__file__)),
  394. 'i18n')
  395. translator.load("qubesmanager_{!s}.qm".format(locale), i18n_dir)
  396. qt_app.installTranslator(translator)
  397. QtCore.QCoreApplication.installTranslator(translator)
  398. qt_app.setOrganizationName("The Qubes Project")
  399. qt_app.setOrganizationDomain("http://qubes-os.org")
  400. sys.excepthook = handle_exception
  401. qubes_app = qubesadmin.Qubes()
  402. window = window_class(qt_app, qubes_app)
  403. if hasattr(window, "setup_application"):
  404. window.setup_application()
  405. window.show()
  406. qt_app.exec_()
  407. qt_app.exit()