123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379 |
- #
- # The Qubes OS Project, https://www.qubes-os.org
- #
- # Copyright (C) 2012 Agnieszka Kostrzewa <agnieszka.kostrzewa@gmail.com>
- # Copyright (C) 2012 Marek Marczykowski-Górecki
- # <marmarek@invisiblethingslab.com>
- # Copyright (C) 2017 Wojtek Porczyk <woju@invisiblethingslab.com>
- #
- # This program is free software: you can redistribute it and/or modify
- # it under the terms of the GNU General Public License as published by
- # the Free Software Foundation, either version 2 of the License, or
- # (at your option) any later version.
- #
- # This program is distributed in the hope that it will be useful,
- # but WITHOUT ANY WARRANTY; without even the implied warranty of
- # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- # GNU General Public License for more details.
- #
- # You should have received a copy of the GNU General Public License
- # along with this program. If not, see <http://www.gnu.org/licenses/>.
- #
- import itertools
- import os
- import re
- import qubesadmin
- import traceback
- import asyncio
- from contextlib import suppress
- import sys
- import quamash
- from qubesadmin import events
- from PyQt5 import QtWidgets, QtCore, QtGui # pylint: disable=import-error
- def _filter_internal(vm):
- return (not vm.klass == 'AdminVM'
- and not vm.features.get('internal', False))
- class SizeSpinBox(QtWidgets.QSpinBox):
- # pylint: disable=invalid-name, no-self-use
- def __init__(self, *args, **kwargs):
- super(SizeSpinBox, self).__init__(*args, **kwargs)
- self.pattern = r'(\d+\.?\d?) ?(GB|MB)'
- self.regex = re.compile(self.pattern)
- self.validator = QtGui.QRegExpValidator(QtCore.QRegExp(
- self.pattern), self)
- def textFromValue(self, v: int) -> str:
- if v > 1024:
- return '{:.1f} GB'.format(v / 1024)
- return '{} MB'.format(v)
- def validate(self, text: str, pos: int):
- return self.validator.validate(text, pos)
- def valueFromText(self, text: str) -> int:
- value, unit = self.regex.fullmatch(text.strip()).groups()
- if unit == 'GB':
- multiplier = 1024
- else:
- multiplier = 1
- return int(float(value) * multiplier)
- def prepare_choice(widget, holder, propname, choice, default,
- filter_function=None, *,
- icon_getter=None, allow_internal=None, allow_default=False,
- allow_none=False, transform=None):
- # for newly created vms, set propname to None
- # clear the widget, so that prepare_choice functions can be used
- # to refresh widget values
- while widget.count() > 0:
- widget.removeItem(0)
- debug(
- 'prepare_choice(widget={widget!r}, '
- 'holder={holder!r}, '
- 'propname={propname!r}, '
- 'choice={choice!r}, '
- 'default={default!r}, '
- 'filter_function={filter_function!r}, '
- 'icon_getter={icon_getter!r}, '
- 'allow_internal={allow_internal!r}, '
- 'allow_default={allow_default!r}, '
- 'allow_none={allow_none!r})'.format(**locals()))
- if propname is not None and allow_default:
- default = holder.property_get_default(propname)
- if allow_internal is None:
- allow_internal = propname is None or not propname.endswith('vm')
- if propname is not None:
- if holder.property_is_default(propname):
- oldvalue = qubesadmin.DEFAULT
- else:
- oldvalue = getattr(holder, propname)
- if oldvalue == '':
- oldvalue = None
- if transform is not None and oldvalue is not None:
- oldvalue = transform(oldvalue)
- else:
- oldvalue = object() # won't match for identity
- idx = 0
- choice_list = list(choice)[:]
- if not allow_internal:
- choice_list = filter(_filter_internal, choice_list)
- if filter_function is not None:
- choice_list = filter(filter_function, choice_list)
- choice_list = list(choice_list)
- if allow_default:
- choice_list.insert(0, qubesadmin.DEFAULT)
- if allow_none:
- choice_list.append(None)
- for i, item in enumerate(choice_list):
- debug('i={} item={}'.format(i, item))
- # 0: default (unset)
- if item is qubesadmin.DEFAULT:
- default_string = str(default) if default is not None else 'none'
- if transform is not None:
- default_string = transform(default_string)
- text = QtCore.QCoreApplication.translate(
- "ManagerUtils", 'default ({})').format(default_string)
- # N+1: explicit None
- elif item is None:
- text = QtCore.QCoreApplication.translate("ManagerUtils", '(none)')
- # 1..N: choices
- else:
- text = str(item)
- if transform is not None:
- text = transform(text)
- if item == oldvalue:
- text += QtCore.QCoreApplication.translate(
- "ManagerUtils", ' (current)')
- idx = i
- widget.insertItem(i, text)
- if icon_getter is not None:
- icon = icon_getter(item)
- if icon is not None:
- widget.setItemIcon(i, icon)
- widget.setCurrentIndex(idx)
- return choice_list, idx
- class KernelVersion: # pylint: disable=too-few-public-methods
- # Cannot use distutils.version.LooseVersion, because it fails at handling
- # versions that have no numbers in them
- def __init__(self, string):
- self.string = string
- self.groups = re.compile(r'(\d+)').split(self.string)
- def __lt__(self, other):
- for (self_content, other_content) in itertools.zip_longest(
- self.groups, other.groups):
- if self_content == other_content:
- continue
- if self_content is None:
- return True
- if other_content is None:
- return False
- if self_content.isdigit() and other_content.isdigit():
- return int(self_content) < int(other_content)
- return self_content < other_content
- def prepare_kernel_choice(widget, holder, propname, default, *args, **kwargs):
- try:
- app = holder.app
- except AttributeError:
- app = holder
- kernels = [kernel.vid for kernel in app.pools['linux-kernel'].volumes]
- kernels = sorted(kernels, key=KernelVersion)
- return prepare_choice(
- widget, holder, propname, kernels, default, *args, **kwargs)
- def prepare_label_choice(widget, holder, propname, default, *args, **kwargs):
- try:
- app = holder.app
- except AttributeError:
- app = holder
- return prepare_choice(widget, holder, propname,
- sorted(app.labels.values(), key=lambda l: l.index),
- default, *args,
- icon_getter=(lambda label:
- QtGui.QIcon.fromTheme(label.icon)),
- **kwargs)
- def prepare_vm_choice(widget, holder, propname, default, *args, **kwargs):
- try:
- app = holder.app
- except AttributeError:
- app = holder
- return prepare_choice(widget, holder, propname, app.domains, default,
- *args, **kwargs)
- def is_debug():
- return os.getenv('QUBES_MANAGER_DEBUG', '') not in ('', '0')
- def debug(*args, **kwargs):
- if not is_debug():
- return
- print(*args, **kwargs)
- def get_path_from_vm(vm, service_name):
- """
- Displays a file/directory selection window for the given VM.
- :param vm: vm from which to select path
- :param service_name: qubes.SelectFile or qubes.SelectDirectory
- :return: path to file, checked for validity
- """
- path_re = re.compile(r"[a-zA-Z0-9/:.,_+=() -]*")
- path_max_len = 512
- if not vm:
- return None
- stdout, _stderr = vm.run_service_for_stdio(service_name)
- stdout = stdout.strip()
- untrusted_path = stdout.decode(encoding='ascii')[:path_max_len]
- if not untrusted_path:
- return None
- if path_re.fullmatch(untrusted_path):
- assert '../' not in untrusted_path
- assert '\0' not in untrusted_path
- return untrusted_path.strip()
- raise ValueError(QtCore.QCoreApplication.translate(
- "ManagerUtils", 'Unexpected characters in path.'))
- def format_dependencies_list(dependencies):
- """Given a list of tuples representing properties, formats them in
- a readable list."""
- list_text = ""
- for (holder, prop) in dependencies:
- if holder is None:
- list_text += QtCore.QCoreApplication.translate(
- "ManagerUtils", "- Global property <b>{}</b> <br>").format(prop)
- else:
- list_text += QtCore.QCoreApplication.translate(
- "ManagerUtils", "- <b>{0}</b> for qube <b>{1}</b> <br>").format(
- prop, holder.name)
- return list_text
- def loop_shutdown():
- pending = asyncio.Task.all_tasks()
- for task in pending:
- with suppress(asyncio.CancelledError):
- task.cancel()
- # Bases on the original code by:
- # Copyright (c) 2002-2007 Pascal Varet <p.varet@gmail.com>
- def handle_exception(exc_type, exc_value, exc_traceback):
- filename, line, _, _ = traceback.extract_tb(exc_traceback).pop()
- filename = os.path.basename(filename)
- error = "%s: %s" % (exc_type.__name__, exc_value)
- strace = ""
- stacktrace = traceback.extract_tb(exc_traceback)
- while stacktrace:
- (filename, line, func, txt) = stacktrace.pop()
- strace += "----\n"
- strace += "line: %s\n" % txt
- strace += "func: %s\n" % func
- strace += "line no.: %d\n" % line
- strace += "file: %s\n" % filename
- msg_box = QtWidgets.QMessageBox()
- msg_box.setDetailedText(strace)
- msg_box.setIcon(QtWidgets.QMessageBox.Critical)
- msg_box.setWindowTitle(QtCore.QCoreApplication.translate(
- "ManagerUtils", "Houston, we have a problem..."))
- msg_box.setText(QtCore.QCoreApplication.translate(
- "ManagerUtils", "Whoops. A critical error has occured. "
- "This is most likely a bug in Qubes Manager.<br><br>"
- "<b><i>{0}</i></b><br/>at line <b>{1}</b><br/>of file "
- "{2}.<br/><br/>").format(error, line, filename))
- msg_box.exec_()
- def run_asynchronous(window_class):
- qt_app = QtWidgets.QApplication(sys.argv)
- translator = QtCore.QTranslator(qt_app)
- locale = QtCore.QLocale.system().name()
- i18n_dir = os.path.join(
- os.path.dirname(os.path.realpath(__file__)),
- 'i18n')
- translator.load("qubesmanager_{!s}.qm".format(locale), i18n_dir)
- qt_app.installTranslator(translator)
- QtCore.QCoreApplication.installTranslator(translator)
- qt_app.setOrganizationName("The Qubes Project")
- qt_app.setOrganizationDomain("http://qubes-os.org")
- qt_app.lastWindowClosed.connect(loop_shutdown)
- qubes_app = qubesadmin.Qubes()
- loop = quamash.QEventLoop(qt_app)
- asyncio.set_event_loop(loop)
- dispatcher = events.EventsDispatcher(qubes_app)
- window = window_class(qt_app, qubes_app, dispatcher)
- if hasattr(window, "setup_application"):
- window.setup_application()
- window.show()
- try:
- loop.run_until_complete(
- asyncio.ensure_future(dispatcher.listen_for_events()))
- except asyncio.CancelledError:
- pass
- except Exception: # pylint: disable=broad-except
- loop_shutdown()
- exc_type, exc_value, exc_traceback = sys.exc_info()[:3]
- handle_exception(exc_type, exc_value, exc_traceback)
- def run_synchronous(window_class):
- qt_app = QtWidgets.QApplication(sys.argv)
- translator = QtCore.QTranslator(qt_app)
- locale = QtCore.QLocale.system().name()
- i18n_dir = os.path.join(
- os.path.dirname(os.path.realpath(__file__)),
- 'i18n')
- translator.load("qubesmanager_{!s}.qm".format(locale), i18n_dir)
- qt_app.installTranslator(translator)
- QtCore.QCoreApplication.installTranslator(translator)
- qt_app.setOrganizationName("The Qubes Project")
- qt_app.setOrganizationDomain("http://qubes-os.org")
- sys.excepthook = handle_exception
- qubes_app = qubesadmin.Qubes()
- window = window_class(qt_app, qubes_app)
- if hasattr(window, "setup_application"):
- window.setup_application()
- window.show()
- qt_app.exec_()
- qt_app.exit()
|