"""Qt front-end for the ``qvm-template`` command line tool."""

import asyncio
import codecs
import collections
from datetime import datetime
import itertools
import json
import os
import typing

import PyQt5  # pylint: disable=import-error
# QtCore and QtGui are referenced below as attributes of the PyQt5 package,
# so import them explicitly instead of relying on QtWidgets pulling them in.
import PyQt5.QtCore  # pylint: disable=import-error
import PyQt5.QtGui  # pylint: disable=import-error
import PyQt5.QtWidgets  # pylint: disable=import-error

from . import ui_qvmtemplate  # pylint: disable=no-name-in-module
from . import ui_templateinstallconfirmdlg  # pylint: disable=no-name-in-module
from . import ui_templateinstallprogressdlg  # pylint: disable=no-name-in-module
from . import utils

#pylint: disable=invalid-name

# Command prefix shared by every qvm-template invocation in this module.
BASE_CMD = ['qvm-template', '--enablerepo=*', '--yes']

# singleton for "no date"; compared by identity in TemplateModel.data().
# Same naive value as the deprecated datetime.utcfromtimestamp(0).
ZERO_DATE = datetime(1970, 1, 1)


# pylint: disable=too-few-public-methods,inherit-non-class
class Template(typing.NamedTuple):
    """One row of the template listing.

    The first ``len(COL_NAMES)`` fields are displayed as columns, in order;
    the fields after them are internal.
    """
    status: str
    name: str
    evr: str
    reponame: str
    size: int
    buildtime: datetime
    installtime: typing.Optional[datetime]
    #licence: str
    #url: str
    #summary: str
    # ---- internal ----
    description: str
    # Status the row was created with; a row whose ``status`` differs from
    # it is treated as a pending action (see TemplateModel.get_actions()).
    default_status: str
    # ------------------

    COL_NAMES = [
        'Status',
        'Name',
        'Version',
        'Repository',
        'Download Size (MB)',
        'Build',
        'Install',
        #'License',
        #'URL',
        #'Summary'
    ]

    @staticmethod
    def build(status, entry):
        """Create a Template from one parsed ``qvm-template info`` entry.

        :param status: status label, also stored as ``default_status``
        :param entry: mapping with the keys ``name``, ``epoch``, ``version``,
            ``release``, ``reponame``, ``size``, ``buildtime``,
            ``description`` and optionally ``installtime``
        :return: a new Template; ``installtime`` is ZERO_DATE when the entry
            has no (or an empty) install time
        """
        cli_format = '%Y-%m-%d %H:%M:%S'
        buildtime = datetime.strptime(entry['buildtime'], cli_format)
        raw_installtime = entry.get('installtime')
        if raw_installtime:
            installtime = datetime.strptime(raw_installtime, cli_format)
        else:
            installtime = ZERO_DATE
        return Template(
            status,
            entry['name'],
            f"{entry['epoch']}:{entry['version']}-{entry['release']}",
            entry['reponame'],
            # Displayed under the 'Download Size (MB)' column.
            int(entry['size']) // 1000000,
            buildtime,
            installtime,
            #entry['license'],
            #entry['url'],
            entry['description'],
            status
        )


class Action(typing.NamedTuple):
    """A pending operation on one template version."""
    op: str
    name: str
    evr: str

    TYPES = [str, str, str]
    COL_NAMES = ['Operation', 'Name', 'Version']


class TemplateStatusDelegate(PyQt5.QtWidgets.QStyledItemDelegate):
    """Item delegate that edits the status column through a combo box."""

    # Each entry is [default status, operation, operation...]; the first
    # element selects the list and is itself offered as a choice.
    OPS = [
        ['Installed', 'Reinstall', 'Remove'],
        ['Extra', 'Remove'],
        ['Upgradable', 'Upgrade', 'Remove'],
        ['Downgradable', 'Downgrade', 'Remove'],
        ['Available', 'Install']
    ]

    def createEditor(self, parent, option, index):
        """Return a combo box listing the operations valid for the row.

        Returns None when the row's default status matches no OPS entry.
        """
        _ = option  # unused
        editor = PyQt5.QtWidgets.QComboBox(parent)
        # Otherwise the internalPointer can be overwritten with a QComboBox
        index = index.model().index(index.row(), index.column())
        kind = index.internalPointer().default_status
        for op_list in TemplateStatusDelegate.OPS:
            if op_list[0] == kind:
                editor.addItems(op_list)
                # Connected after populating, so filling the box does not
                # trigger a commit.
                editor.currentIndexChanged.connect(self.currentIndexChanged)
                editor.showPopup()
                return editor
        return None

    def setEditorData(self, editor, index):
        """Select the entry matching the cell's current text, if any."""
        #pylint: disable=no-self-use
        cur = index.data()
        idx = editor.findText(cur)
        if idx >= 0:
            editor.setCurrentIndex(idx)

    def setModelData(self, editor, model, index):
        """Write the combo box's current text back to the model."""
        #pylint: disable=no-self-use
        model.setData(index, editor.currentText())

    def updateEditorGeometry(self, editor, option, index):
        """Make the editor fill the cell rectangle."""
        #pylint: disable=no-self-use
        _ = index  # unused
        editor.setGeometry(option.rect)

    @PyQt5.QtCore.pyqtSlot()
    def currentIndexChanged(self):
        """Commit the editor's data as soon as a different entry is picked."""
        self.commitData.emit(self.sender())


class TemplateModel(PyQt5.QtCore.QAbstractItemModel):
    """Flat item model holding a list of Template rows."""

    def __init__(self):
        super().__init__()
        # List of Template tuples, one per row.
        self.children = []

    def flags(self, index):
        """Return the base flags, plus editable for the status column."""
        if index.isValid() and index.column() == 0:
            return super().flags(index) | PyQt5.QtCore.Qt.ItemIsEditable
        return super().flags(index)

    def sort(self, idx, order):
        """Sort the rows in place by field ``idx``."""
        # NOTE(review): AscendingOrder maps to reverse=True here, which looks
        # inverted relative to Qt's definition of ascending. Kept as-is
        # because the intended UI behaviour is not visible here - confirm.
        rev = (order == PyQt5.QtCore.Qt.AscendingOrder)
        self.children.sort(key=lambda x: x[idx], reverse=rev)

        # With no rows there is no valid index range to report.
        if self.children:
            self.dataChanged.emit(*self.row_index(0, self.rowCount() - 1))

    def index(self, row, column, parent=PyQt5.QtCore.QModelIndex()):
        """Return the index for a cell, carrying the row's Template as its
        internal pointer, or an invalid index when out of range."""
        if not self.hasIndex(row, column, parent):
            return PyQt5.QtCore.QModelIndex()

        return self.createIndex(row, column, self.children[row])

    def parent(self, child):
        """Return an invalid index: every row is top-level."""
        #pylint: disable=no-self-use
        _ = child  # unused
        return PyQt5.QtCore.QModelIndex()

    def rowCount(self, parent=PyQt5.QtCore.QModelIndex()):
        """Return the number of templates held."""
        #pylint: disable=no-self-use
        _ = parent  # unused
        return len(self.children)

    def columnCount(self, parent=PyQt5.QtCore.QModelIndex()):
        """Return the number of displayed columns."""
        #pylint: disable=no-self-use
        _ = parent  # unused
        return len(Template.COL_NAMES)

    def hasChildren(self, index=PyQt5.QtCore.QModelIndex()):
        """Return True only for the default (root) index."""
        #pylint: disable=no-self-use
        return index == PyQt5.QtCore.QModelIndex()

    def data(self, index, role=PyQt5.QtCore.Qt.DisplayRole):
        """Return display text, font or alignment for a cell.

        Rows whose status differs from their default status are shown in
        bold. Returns None for invalid indexes and unhandled roles.
        """
        # pylint: disable=too-many-return-statements
        if index.isValid():
            data = self.children[index.row()][index.column()]
            if role == PyQt5.QtCore.Qt.DisplayRole:
                # Identity check: only the ZERO_DATE singleton means
                # "no date".
                if data is ZERO_DATE:
                    return ''
                if isinstance(data, datetime):
                    return data.strftime('%d %b %Y')
                return data
            if role == PyQt5.QtCore.Qt.FontRole:
                font = PyQt5.QtGui.QFont()
                tpl = self.children[index.row()]
                font.setBold(tpl.status != tpl.default_status)
                return font
            if role == PyQt5.QtCore.Qt.TextAlignmentRole:
                if isinstance(data, int):
                    return PyQt5.QtCore.Qt.AlignRight
                return PyQt5.QtCore.Qt.AlignLeft
        return None

    def setData(self, index, value, role=PyQt5.QtCore.Qt.EditRole):
        """Replace one field of a row; return True when the edit is applied.

        Template is immutable, so the row is rebuilt with the new value.
        """
        if index.isValid() and role == PyQt5.QtCore.Qt.EditRole:
            old_list = list(self.children[index.row()])
            old_list[index.column()] = value
            new_tpl = Template(*old_list)
            self.children[index.row()] = new_tpl
            self.dataChanged.emit(index, index)
            return True
        return False

    def headerData(self, section, orientation,
            role=PyQt5.QtCore.Qt.DisplayRole):
        """Return the column title for horizontal display headers."""
        #pylint: disable=no-self-use
        if section < len(Template.COL_NAMES) \
                and orientation == PyQt5.QtCore.Qt.Horizontal \
                and role == PyQt5.QtCore.Qt.DisplayRole:
            return Template.COL_NAMES[section]
        return None

    def removeRows(self, row, count, parent=PyQt5.QtCore.QModelIndex()):
        """Remove ``count`` rows starting at ``row``.

        :return: True if the rows were removed, False when the requested
            range is empty or outside the model
        """
        _ = parent  # unused
        if count <= 0 or row < 0 or row + count > len(self.children):
            return False
        # beginRemoveRows() takes the *inclusive* last row.
        self.beginRemoveRows(
            PyQt5.QtCore.QModelIndex(), row, row + count - 1)
        del self.children[row:row + count]
        self.endRemoveRows()
        # No dataChanged here: the removed rows no longer exist, and the
        # begin/endRemoveRows pair already notifies attached views.
        return True

    def row_index(self, low, high):
        """Return (top-left, bottom-right) indexes spanning rows
        ``low``..``high`` across all columns."""
        return self.createIndex(low, 0), \
            self.createIndex(high, self.columnCount() - 1)

    def set_templates(self, templates):
        """Replace the model contents.

        :param templates: mapping of status label to an iterable of entries
            accepted by Template.build()
        """
        # Build the rows first so that a malformed entry raises before the
        # current contents are discarded.
        new_rows = [
            Template.build(status, tpl)
            for status, grp in templates.items()
            for tpl in grp]

        self.removeRows(0, self.rowCount())
        if not new_rows:
            # beginInsertRows() must not be called with an empty range.
            return
        self.beginInsertRows(
            PyQt5.QtCore.QModelIndex(), 0, len(new_rows) - 1)
        self.children.extend(new_rows)
        self.endInsertRows()

        self.dataChanged.emit(*self.row_index(0, self.rowCount() - 1))

    def get_actions(self):
        """Return an Action for every row whose status was changed."""
        return [
            Action(tpl.status, tpl.name, tpl.evr)
            for tpl in self.children
            if tpl.status != tpl.default_status]

    async def refresh(self, refresh=True):
        """Reload the template list by running ``qvm-template info``.

        :param refresh: pass ``--refresh`` to qvm-template
        :return: ``(True, None)`` on success, ``(False, message)`` when the
            command cannot be started, exits non-zero or prints output that
            cannot be parsed
        """
        cmd = BASE_CMD[:]
        if refresh:
            # Force refresh if triggered by button press
            cmd.append('--refresh')
        cmd.extend(['info', '--machine-readable-json', '--installed',
                    '--available', '--upgrades', '--extras'])
        try:
            proc = await asyncio.create_subprocess_exec(
                *cmd,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE)
        except OSError as err:
            # e.g. the qvm-template executable is missing
            return False, str(err)
        output, stderr = await proc.communicate()
        if proc.returncode != 0:
            # Error text is only displayed, so never fail on odd bytes.
            return False, stderr.decode('UTF-8', errors='replace')
        try:
            parsed = json.loads(output.decode('UTF-8'))
        except ValueError as err:
            # Covers both UnicodeDecodeError and json.JSONDecodeError.
            return False, f'Could not parse qvm-template output: {err}'
        # Default type is dict as we're going to replace the lists with
        # dicts shortly after
        tpls = collections.defaultdict(dict, parsed)
        # Remove duplicates
        # Should this be done in qvm-template?
        # TODO: Merge templates with same name?
        #       If so, we may need to have a separate UI to force versions.
        local_names = {x['name'] for x in tpls['installed']}
        # Convert to dict for easier subtraction
        for key in tpls:
            tpls[key] = {
                (x['name'], x['epoch'], x['version'], x['release']): x
                for x in tpls[key]}
        tpls['installed'] = {
            k: v for k, v in tpls['installed'].items()
            if k not in tpls['extra'] and k not in tpls['upgradable']}
        tpls['available'] = {
            k: v for k, v in tpls['available'].items()
            if k not in tpls['installed']
                and k not in tpls['upgradable']}
        # If the package name is installed but the specific version is
        # neither installed or an upgrade, then it must be a downgrade
        tpls['downgradable'] = {
            k: v for k, v in tpls['available'].items()
            if k[0] in local_names}
        tpls['available'] = {
            k: v for k, v in tpls['available'].items()
            if k not in tpls['downgradable']}
        # Convert back to list; title-cased keys match the status labels
        # in TemplateStatusDelegate.OPS.
        tpls = {k.title(): list(v.values()) for k, v in tpls.items()}
        self.set_templates(tpls)
        return True, None


class TemplateInstallConfirmDialog(
        ui_templateinstallconfirmdlg.Ui_TemplateInstallConfirmDlg,
        PyQt5.QtWidgets.QDialog):
    """Dialog listing the pending actions for confirmation."""

    def __init__(self, actions):
        super().__init__()
        self.setupUi(self)

        model = PyQt5.QtGui.QStandardItemModel()
        model.setHorizontalHeaderLabels(Action.COL_NAMES)
        self.treeView.setModel(model)
        for act in actions:
            model.appendRow([PyQt5.QtGui.QStandardItem(x) for x in act])


class TemplateInstallProgressDialog(
        ui_templateinstallprogressdlg.Ui_TemplateInstallProgressDlg,
        PyQt5.QtWidgets.QDialog):
    """Dialog that runs the actions and shows qvm-template's output."""

    def __init__(self, actions):
        super().__init__()
        self.setupUi(self)
        self.actions = actions
        # The button box stays hidden until the run finishes or fails.
        self.buttonBox.hide()
        # Strong reference to the running task; see install().
        self._install_task = None

    @staticmethod
    def _process_cr(text):
        r"""Reduce lines replaced using CR character (\r)"""
        while '\r' in text:
            prefix, suffix = text.rsplit('\r', 1)
            if '\n' in prefix:
                # Drop the partial line that the CR overwrote.
                prefix = prefix.rsplit('\n', 1)[0]
                prefix += '\n'
            else:
                prefix = ''
            text = prefix + suffix
        return text

    def install(self):
        """Schedule the pending actions on the running event loop.

        Actions are grouped by operation and each group is passed to one
        ``qvm-template`` invocation; the first failing invocation stops the
        run. Output is streamed into the text box.
        """
        async def coro():
            # groupby() needs sorted input; sort a copy so the list passed
            # in by the caller is left untouched.
            ordered = sorted(self.actions)
            for oper, grp in itertools.groupby(ordered, lambda x: x[0]):
                oper = oper.lower()
                # No need to specify versions for local operations
                if oper in ('remove', 'purge'):
                    specs = [x.name for x in grp]
                else:
                    specs = [x.name + '-' + x.evr for x in grp]
                # FIXME: (C)Python versions before 3.9 fully-buffers stderr in
                #        this context, cf. https://bugs.python.org/issue13601
                #        Forcing it to be unbuffered for the time being so
                #        that the messages can be displayed in time.
                envs = os.environ.copy()
                envs['PYTHONUNBUFFERED'] = '1'
                proc = await asyncio.create_subprocess_exec(
                    *(BASE_CMD + [oper, '--'] + specs),
                    stdout=asyncio.subprocess.PIPE,
                    stderr=asyncio.subprocess.STDOUT,
                    env=envs)
                #pylint: disable=cell-var-from-loop
                # A 100-byte read may end in the middle of a multi-byte
                # character; the incremental decoder carries the partial
                # bytes over to the next chunk instead of raising.
                decoder = codecs.getincrementaldecoder('UTF-8')(
                    errors='replace')
                status_text = ''
                while True:
                    chunk = await proc.stdout.read(100)
                    at_eof = chunk == b''
                    text = decoder.decode(chunk, final=at_eof)
                    if text:
                        status_text = self._process_cr(status_text + text)
                        self.textEdit.setPlainText(status_text)
                    if at_eof:
                        break
                if await proc.wait() != 0:
                    self.buttonBox.show()
                    self.progressBar.setMaximum(100)
                    self.progressBar.setValue(0)
                    return False
            self.progressBar.setMaximum(100)
            self.progressBar.setValue(100)
            self.buttonBox.show()
            return True
        # The event loop only keeps weak references to tasks; hold one here
        # so the task cannot be garbage-collected while running.
        self._install_task = asyncio.create_task(coro())


class QvmTemplateWindow(
        ui_qvmtemplate.Ui_QubesTemplateManager,
        PyQt5.QtWidgets.QMainWindow):
    """Main window: template listing, info box and install trigger."""

    def __init__(self, qt_app, qubes_app, dispatcher, parent=None):
        _ = parent  # unused

        super().__init__()
        self.setupUi(self)

        self.listing.header().setSectionResizeMode(
            PyQt5.QtWidgets.QHeaderView.ResizeToContents)

        self.qubes_app = qubes_app
        self.qt_app = qt_app
        self.dispatcher = dispatcher

        # Strong references to background refresh tasks; see refresh().
        self._tasks = set()

        self.listing_model = TemplateModel()
        self.listing_delegate = TemplateStatusDelegate(self.listing)

        self.listing.setModel(self.listing_model)
        self.listing.setItemDelegateForColumn(0, self.listing_delegate)

        self.refresh(False)
        self.listing.selectionModel() \
            .selectionChanged.connect(self.update_info)

        self.actionRefresh.triggered.connect(lambda: self.refresh(True))
        self.actionInstall.triggered.connect(self.do_install)

    def update_info(self, selected):
        """Show details of the current selection in the info box.

        One selected row shows its name and description; several rows show
        a list of ``name-evr`` strings. An empty selection leaves the box
        untouched.
        """
        _ = selected  # unused
        # One index per selected row.
        indices = [
            x
            for x in self.listing.selectionModel().selectedIndexes()
            if x.column() == 0]
        if not indices:
            return
        self.infobox.clear()
        cursor = PyQt5.QtGui.QTextCursor(self.infobox.document())
        bold_fmt = PyQt5.QtGui.QTextCharFormat()
        bold_fmt.setFontWeight(PyQt5.QtGui.QFont.Bold)
        norm_fmt = PyQt5.QtGui.QTextCharFormat()
        if len(indices) > 1:
            cursor.insertText('Selected templates:\n', bold_fmt)
            for idx in indices:
                tpl = self.listing_model.children[idx.row()]
                cursor.insertText(tpl.name + '-' + tpl.evr + '\n', norm_fmt)
        else:
            idx = indices[0]
            tpl = self.listing_model.children[idx.row()]
            cursor.insertText('Name: ', bold_fmt)
            cursor.insertText(tpl.name + '\n', norm_fmt)
            cursor.insertText('Description:\n', bold_fmt)
            cursor.insertText(tpl.description + '\n', norm_fmt)

    def refresh(self, refresh=True):
        """Reload the listing in the background.

        :param refresh: forwarded to TemplateModel.refresh()
        """
        self.progressBar.show()
        async def coro():
            try:
                ok, stderr = await self.listing_model.refresh(refresh)
                self.infobox.clear()
                if not ok:
                    cursor = PyQt5.QtGui.QTextCursor(self.infobox.document())
                    fmt = PyQt5.QtGui.QTextCharFormat()
                    fmt.setFontWeight(PyQt5.QtGui.QFont.Bold)
                    cursor.insertText('Failed to fetch template list:\n', fmt)
                    fmt.setFontWeight(PyQt5.QtGui.QFont.Normal)
                    cursor.insertText(stderr, fmt)
            finally:
                # Hide the progress bar even if the refresh raised.
                self.progressBar.hide()
        # Keep a strong reference until the task is done so it cannot be
        # garbage-collected while running.
        task = asyncio.create_task(coro())
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)

    def do_install(self):
        """Confirm the pending actions, run them, then reload the list."""
        actions = self.listing_model.get_actions()
        confirm = TemplateInstallConfirmDialog(actions)
        if confirm.exec_():
            progress = TemplateInstallProgressDialog(actions)
            progress.install()
            progress.exec_()
            self.refresh()


def main():
    """Start the application through utils.run_asynchronous()."""
    utils.run_asynchronous(QvmTemplateWindow)


if __name__ == '__main__':
    main()