# # The Qubes OS Project, https://www.qubes-os.org # # Copyright (C) 2012 Agnieszka Kostrzewa # Copyright (C) 2012 Marek Marczykowski-Górecki # # Copyright (C) 2017 Wojtek Porczyk # Copyright (C) 2020 Marta Marczykowska-Górecka # # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 2 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . # import itertools import os import re import qubesadmin import traceback import asyncio from contextlib import suppress import sys import qasync from qubesadmin import events from qubesadmin import exc from PyQt5 import QtWidgets, QtCore, QtGui # pylint: disable=import-error # important usage note: which initialize_widget should I use? # - if you want a list of VMs, use initialize_widget_with_vms, optionally # adding a property if you want to handle qubesadmin.DEFAULT and the # current (potentially default) value # - if you want a list of labels or kernals, use # initialize_widget_with_kernels/labels # - list of some things, but associated with a definite property (optionally # with qubesadmin.DEFAULT) - initialize_widget_for_property # - list of some things, not associated with a property, but still having a # default - initialize_widget_with_default # - just a list, no properties or defaults, just a nice list with a "current" # value - initialize_widget def is_internal(vm): """checks if the VM is either an AdminVM or has the 'internal' features""" try: return (vm.klass == 'AdminVM' or vm.features.get('internal', False)) except exc.QubesDaemonAccessError: return False def is_running(vm, default_state): """Checks if the VM is running, returns default_state if we have insufficient permissions to deteremine that.""" try: return vm.is_running() except exc.QubesDaemonAccessError: return default_state def translate(string): """helper function for translations""" return QtCore.QCoreApplication.translate( "ManagerUtils", string) class SizeSpinBox(QtWidgets.QSpinBox): """A SpinBox subclass with extended handling for sizes in MB and GB""" # pylint: disable=invalid-name, no-self-use def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.pattern = r'(\d+\.?\d?) ?(GB|MB)' self.regex = re.compile(self.pattern) self.validator = QtGui.QRegExpValidator(QtCore.QRegExp( self.pattern), self) def textFromValue(self, v: int) -> str: if v > 1024: return '{:.1f} GB'.format(v / 1024) return '{} MB'.format(v) def validate(self, text: str, pos: int): return self.validator.validate(text, pos) def valueFromText(self, text: str) -> int: value, unit = self.regex.fullmatch(text.strip()).groups() if unit == 'GB': multiplier = 1024 else: multiplier = 1 return int(float(value) * multiplier) def get_feature(vm, feature_name, default_value): try: return vm.features.get(feature_name, default_value) except exc.QubesDaemonAccessError: return default_value def get_boolean_feature(vm, feature_name): """heper function to get a feature converted to a Bool if it does exist. Necessary because of the true/false in features being coded as 1/empty string.""" result = get_feature(vm, feature_name, None) if result is not None: result = bool(result) return result def did_widget_selection_change(widget): """a simple heuristic to check if the widget text contains appropriately translated 'current'""" if not widget.isEnabled(): return False return not translate(" (current)") in widget.currentText() def initialize_widget(widget, choices, selected_value=None, icon_getter=None, add_current_label=True): """ populates widget (ListBox or ComboBox) with items. Previous widget contents are erased. :param widget: QListBox or QComboBox; must support addItem and findText :param choices: list of tuples (text, value) to use to populate widget. text should be a string, value can be of any type, including None :param selected_value: initial widget value :param icon_getter: function of value that returns desired icon :param add_current_label: if initial value should be labelled as (current) :return: """ widget.clear() selected_item = None for (name, value) in choices: if value == selected_value: selected_item = name if icon_getter is not None: widget.addItem(icon_getter(value), name, userData=value) else: widget.addItem(name, userData=value) if selected_item is not None: widget.setCurrentIndex(widget.findText(selected_item)) else: widget.addItem(str(selected_value), selected_value) widget.setCurrentIndex(widget.findText(str(selected_value))) if add_current_label: widget.setItemText(widget.currentIndex(), widget.currentText() + translate(" (current)")) def initialize_widget_for_property( widget, choices, holder, property_name, allow_default=False, icon_getter=None, add_current_label=True): """ populates widget (ListBox or ComboBox) with items, based on a listed property. Supports discovering the system default for the given property and handling qubesadmin.DEFAULT special value. Value of holder.property will be set as current item. Previous widget contents are erased. :param widget: QListBox or QComboBox; must support addItem and findText :param choices: list of tuples (text, value) to use to populate widget. text should be a string, value can be of any type, including None :param holder: object to use as property_name's holder :param property_name: name of the property :param allow_default: boolean, should a position with qubesadmin.DEFAULT be added; default False :param icon_getter: a function applied to values (from choices) that returns a QIcon to be used as a item icon; default None :param add_current_label: if initial value should be labelled as (current) :return: """ if allow_default: try: default_property = holder.property_get_default(property_name) except exc.QubesDaemonAccessError: default_property = "ERROR: unavailable" if default_property is None: default_property = "none" choices.append( (translate("default ({})").format(default_property), qubesadmin.DEFAULT)) # calculate current (can be default) try: is_default = holder.property_is_default(property_name) except exc.QubesDaemonAccessError: is_default = False if is_default: current_value = qubesadmin.DEFAULT else: current_value = getattr(holder, property_name) initialize_widget(widget, choices, selected_value=current_value, icon_getter=icon_getter, add_current_label=add_current_label) # TODO: improvement: add optional icon support def initialize_widget_with_vms( widget, qubes_app, filter_function=(lambda x: True), allow_none=False, holder=None, property_name=None, allow_default=False, allow_internal=False): """ populates widget (ListBox or ComboBox) with vm items, optionally based on a given property. Supports discovering the system default for the property and handling qubesadmin.DEFAULT special value. Value of holder.property will be set as current item. Previous widget contents are erased. :param widget: QListBox or QComboBox; must support addItem and findText :param qubes_app: Qubes() object :param filter_function: function used to filter vms; optional :param allow_none: should a None option be added; default False :param holder: object to use as property_name's holder :param property_name: name of the property :param allow_default: should a position with qubesadmin.DEFAULT be added; default False :param allow_internal: should AdminVMs and vms with feature 'internal' be used :return: """ choices = [] for vm in qubes_app.domains: if not allow_internal and is_internal(vm): continue if not filter_function(vm): continue choices.append((vm.name, vm)) if allow_none: choices.append((translate("(none)"), None)) if holder is None: initialize_widget(widget, choices, selected_value=choices[0][1], add_current_label=False) else: initialize_widget_for_property( widget=widget, choices=choices, holder=holder, property_name=property_name, allow_default=allow_default) def initialize_widget_with_default( widget, choices, add_none=False, add_qubes_default=False, mark_existing_as_default=False, default_value=None): """ populates widget (ListBox or ComboBox) with items. Used when there is no corresponding property, but support for special qubesadmin.DEFAULT value is still needed. :param widget: QListBox or QComboBox; must support addItem and findText :param choices: list of tuples (text, value) to use to populate widget. text should be a string, value can be of any type, including None :param add_none: should a 'None' position be added :param add_qubes_default: should a qubesadmin.DEFAULT position be added (requires default_value to be set to something meaningful) :param mark_existing_as_default: should an existing value be marked as default. If used with conjuction with add_qubes_default, the default_value listed will be replaced by qubesadmin.DEFAULT :param default_value: what value should be used as the default :return: """ added_existing = False if mark_existing_as_default: existing_default = [item for item in choices if item[1] == default_value] if existing_default: choices = [item for item in choices if item not in existing_default] if add_qubes_default: # if for some reason (e.g. storage pools) we want to mark an # actual value as default and replace it with qubesadmin.DEFAULT default_value = qubesadmin.DEFAULT choices.insert( 0, (translate("default ({})").format(existing_default[0][0]), default_value)) added_existing = True elif add_qubes_default: choices.insert(0, (translate("default ({})").format(default_value), qubesadmin.DEFAULT)) if add_none: if mark_existing_as_default and default_value is None and \ not added_existing: choices.append((translate("default (none)"), None)) else: choices.append((translate("(none)"), None)) if add_qubes_default: selected_value = qubesadmin.DEFAULT elif mark_existing_as_default: selected_value = default_value else: selected_value = choices[0][1] initialize_widget( widget=widget, choices=choices, selected_value=selected_value, add_current_label=False) def initialize_widget_with_kernels( widget, qubes_app, allow_none=False, holder=None, property_name=None, allow_default=False): """ populates widget (ListBox or ComboBox) with kernel items, based on a given property. Supports discovering the system default for the property and handling qubesadmin.DEFAULT special value. Value of holder.property will be set as current item. Previous widget contents are erased. :param widget: QListBox or QComboBox; must support addItem and findText :param qubes_app: Qubes() object :param allow_none: should a None item be added :param holder: object to use as property_name's holder :param property_name: name of the property :param allow_default: should a qubesadmin.DEFAULT item be added :return: """ kernels = [kernel.vid for kernel in qubes_app.pools['linux-kernel'].volumes] kernels = sorted(kernels, key=KernelVersion) choices = [(kernel, kernel) for kernel in kernels] if allow_none: choices.append((translate("(none)"), None)) initialize_widget_for_property( widget=widget, choices=choices, holder=holder, property_name=property_name, allow_default=allow_default) def initialize_widget_with_labels(widget, qubes_app, holder=None, property_name='label'): """ populates widget (ListBox or ComboBox) with label items, optionally based on a given property. Value of holder.property will be set as current item. Previous widget contents are erased. :param widget: QListBox or QComboBox; must support addItem and findText :param qubes_app: Qubes() object :param holder: object to use as property_name's holder; can be None :param property_name: name of the property :return: """ labels = sorted(qubes_app.labels.values(), key=lambda l: l.index) choices = [(label.name, label) for label in labels] icon_getter = (lambda label: QtGui.QIcon.fromTheme(label.icon)) if holder: initialize_widget_for_property(widget=widget, choices=choices, holder=holder, property_name=property_name, icon_getter=icon_getter) else: initialize_widget(widget=widget, choices=choices, selected_value=labels[0], icon_getter=icon_getter, add_current_label=False) class KernelVersion: # pylint: disable=too-few-public-methods # Cannot use distutils.version.LooseVersion, because it fails at handling # versions that have no numbers in them def __init__(self, string): self.string = string self.groups = re.compile(r'(\d+)').split(self.string) def __lt__(self, other): for (self_content, other_content) in itertools.zip_longest( self.groups, other.groups): if self_content == other_content: continue if self_content is None: return True if other_content is None: return False if self_content.isdigit() and other_content.isdigit(): return int(self_content) < int(other_content) return self_content < other_content def is_debug(): return os.getenv('QUBES_MANAGER_DEBUG', '') not in ('', '0') def debug(*args, **kwargs): if not is_debug(): return print(*args, **kwargs) def get_path_from_vm(vm, service_name): """ Displays a file/directory selection window for the given VM. :param vm: vm from which to select path :param service_name: qubes.SelectFile or qubes.SelectDirectory :return: path to file, checked for validity """ path_re = re.compile(r"[a-zA-Z0-9/:.,_+=() -]*") path_max_len = 512 if not vm: return None stdout, _stderr = vm.run_service_for_stdio(service_name) stdout = stdout.strip() untrusted_path = stdout.decode(encoding='ascii')[:path_max_len] if not untrusted_path: return None if path_re.fullmatch(untrusted_path): assert '../' not in untrusted_path assert '\0' not in untrusted_path return untrusted_path.strip() raise ValueError(QtCore.QCoreApplication.translate( "ManagerUtils", 'Unexpected characters in path.')) def format_dependencies_list(dependencies): """Given a list of tuples representing properties, formats them in a readable list.""" list_text = "" for (holder, prop) in dependencies: if holder is None: list_text += QtCore.QCoreApplication.translate( "ManagerUtils", "- Global property {}
").format(prop) else: list_text += QtCore.QCoreApplication.translate( "ManagerUtils", "- {0} for qube {1}
").format( prop, holder.name) return list_text def loop_shutdown(): pending = asyncio.all_tasks() for task in pending: with suppress(asyncio.CancelledError): task.cancel() # Bases on the original code by: # Copyright (c) 2002-2007 Pascal Varet def handle_exception(exc_type, exc_value, exc_traceback): filename, line, _, _ = traceback.extract_tb(exc_traceback).pop() filename = os.path.basename(filename) error = "%s: %s" % (exc_type.__name__, exc_value) strace = "" stacktrace = traceback.extract_tb(exc_traceback) while stacktrace: (filename, line, func, txt) = stacktrace.pop() strace += "----\n" strace += "line: %s\n" % txt strace += "func: %s\n" % func strace += "line no.: %d\n" % line strace += "file: %s\n" % filename msg_box = QtWidgets.QMessageBox() msg_box.setDetailedText(strace) msg_box.setIcon(QtWidgets.QMessageBox.Critical) msg_box.setWindowTitle(QtCore.QCoreApplication.translate( "ManagerUtils", "Houston, we have a problem...")) msg_box.setText(QtCore.QCoreApplication.translate( "ManagerUtils", "Whoops. A critical error has occured. " "This is most likely a bug in Qubes Manager.

" "{0}
at line {1}
of file " "{2}.

").format(error, line, filename)) msg_box.exec_() def run_asynchronous(window_class): qt_app = QtWidgets.QApplication(sys.argv) translator = QtCore.QTranslator(qt_app) locale = QtCore.QLocale.system().name() i18n_dir = os.path.join( os.path.dirname(os.path.realpath(__file__)), 'i18n') translator.load("qubesmanager_{!s}.qm".format(locale), i18n_dir) qt_app.installTranslator(translator) QtCore.QCoreApplication.installTranslator(translator) qt_app.setOrganizationName("The Qubes Project") qt_app.setOrganizationDomain("http://qubes-os.org") qt_app.lastWindowClosed.connect(loop_shutdown) qubes_app = qubesadmin.Qubes() loop = qasync.QEventLoop(qt_app) asyncio.set_event_loop(loop) dispatcher = events.EventsDispatcher(qubes_app) window = window_class(qt_app, qubes_app, dispatcher) if hasattr(window, "setup_application"): window.setup_application() window.show() try: loop.run_until_complete( asyncio.ensure_future(dispatcher.listen_for_events())) except asyncio.CancelledError: pass except Exception: # pylint: disable=broad-except loop_shutdown() exc_type, exc_value, exc_traceback = sys.exc_info()[:3] handle_exception(exc_type, exc_value, exc_traceback) def run_synchronous(window_class): qt_app = QtWidgets.QApplication(sys.argv) translator = QtCore.QTranslator(qt_app) locale = QtCore.QLocale.system().name() i18n_dir = os.path.join( os.path.dirname(os.path.realpath(__file__)), 'i18n') translator.load("qubesmanager_{!s}.qm".format(locale), i18n_dir) qt_app.installTranslator(translator) QtCore.QCoreApplication.installTranslator(translator) qt_app.setOrganizationName("The Qubes Project") qt_app.setOrganizationDomain("http://qubes-os.org") sys.excepthook = handle_exception qubes_app = qubesadmin.Qubes() window = window_class(qt_app, qubes_app) if hasattr(window, "setup_application"): window.setup_application() window.show() qt_app.exec_() qt_app.exit() return window