manager/qubesmanager/qvm_template_gui.py
2021-04-03 19:07:06 +02:00

447 lines
16 KiB
Python

import asyncio
import collections
from datetime import datetime
import itertools
import json
import os
import typing
import PyQt5 # pylint: disable=import-error
import PyQt5.QtWidgets # pylint: disable=import-error
from . import ui_qvmtemplate # pylint: disable=no-name-in-module
from . import ui_templateinstallconfirmdlg # pylint: disable=no-name-in-module
from . import ui_templateinstallprogressdlg # pylint: disable=no-name-in-module
from . import utils
#pylint: disable=invalid-name
BASE_CMD = ['qvm-template', '--enablerepo=*', '--yes']
# singleton for "no date"
ZERO_DATE = datetime.utcfromtimestamp(0)
# pylint: disable=too-few-public-methods,inherit-non-class
class Template(typing.NamedTuple):
status: str
name: str
evr: str
reponame: str
size: int
buildtime: datetime
installtime: typing.Optional[datetime]
#licence: str
#url: str
#summary: str
# ---- internal ----
description: str
default_status: str
# ------------------
COL_NAMES = [
'Status',
'Name',
'Version',
'Repository',
'Download Size (MB)',
'Build',
'Install',
#'License',
#'URL',
#'Summary'
]
@staticmethod
def build(status, entry):
cli_format = '%Y-%m-%d %H:%M:%S'
buildtime = datetime.strptime(entry['buildtime'], cli_format)
if entry['installtime']:
installtime = datetime.strptime(entry['installtime'], cli_format)
else:
installtime = ZERO_DATE
return Template(
status,
entry['name'],
'%s:%s-%s' % (entry['epoch'], entry['version'], entry['release']),
entry['reponame'],
int(entry['size']) // 1000000,
buildtime,
installtime,
#entry['license'],
#entry['url'],
entry['description'],
status
)
class Action(typing.NamedTuple):
op: str
name: str
evr: str
TYPES = [str, str, str]
COL_NAMES = ['Operation', 'Name', 'Version']
class TemplateStatusDelegate(PyQt5.QtWidgets.QStyledItemDelegate):
OPS = [
['Installed', 'Reinstall', 'Remove'],
['Extra', 'Remove'],
['Upgradable', 'Upgrade', 'Remove'],
['Downgradable', 'Downgrade', 'Remove'],
['Available', 'Install']
]
def createEditor(self, parent, option, index):
_ = option # unused
editor = PyQt5.QtWidgets.QComboBox(parent)
# Otherwise the internalPointer can be overwritten with a QComboBox
index = index.model().index(index.row(), index.column())
kind = index.internalPointer().default_status
for op_list in TemplateStatusDelegate.OPS:
if op_list[0] == kind:
for op in op_list:
editor.addItem(op)
editor.currentIndexChanged.connect(self.currentIndexChanged)
editor.showPopup()
return editor
return None
def setEditorData(self, editor, index):
#pylint: disable=no-self-use
cur = index.data()
idx = editor.findText(cur)
if idx >= 0:
editor.setCurrentIndex(idx)
def setModelData(self, editor, model, index):
#pylint: disable=no-self-use
model.setData(index, editor.currentText())
def updateEditorGeometry(self, editor, option, index):
#pylint: disable=no-self-use
_ = index # unused
editor.setGeometry(option.rect)
@PyQt5.QtCore.pyqtSlot()
def currentIndexChanged(self):
self.commitData.emit(self.sender())
class TemplateModel(PyQt5.QtCore.QAbstractItemModel):
def __init__(self):
super().__init__()
self.children = []
def flags(self, index):
if index.isValid() and index.column() == 0:
return super().flags(index) | PyQt5.QtCore.Qt.ItemIsEditable
return super().flags(index)
def sort(self, idx, order):
rev = (order == PyQt5.QtCore.Qt.AscendingOrder)
self.children.sort(key=lambda x: x[idx], reverse=rev)
self.dataChanged.emit(*self.row_index(0, self.rowCount() - 1))
def index(self, row, column, parent=PyQt5.QtCore.QModelIndex()):
if not self.hasIndex(row, column, parent):
return PyQt5.QtCore.QModelIndex()
return self.createIndex(row, column, self.children[row])
def parent(self, child):
#pylint: disable=no-self-use
_ = child # unused
return PyQt5.QtCore.QModelIndex()
def rowCount(self, parent=PyQt5.QtCore.QModelIndex()):
#pylint: disable=no-self-use
_ = parent # unused
return len(self.children)
def columnCount(self, parent=PyQt5.QtCore.QModelIndex()):
#pylint: disable=no-self-use
_ = parent # unused
return len(Template.COL_NAMES)
def hasChildren(self, index=PyQt5.QtCore.QModelIndex()):
#pylint: disable=no-self-use
return index == PyQt5.QtCore.QModelIndex()
def data(self, index, role=PyQt5.QtCore.Qt.DisplayRole):
if index.isValid():
data = self.children[index.row()][index.column()]
if role == PyQt5.QtCore.Qt.DisplayRole:
if data is ZERO_DATE:
return ''
if isinstance(data, datetime):
return data.strftime('%d %b %Y')
return data
if role == PyQt5.QtCore.Qt.FontRole:
font = PyQt5.QtGui.QFont()
tpl = self.children[index.row()]
font.setBold(tpl.status != tpl.default_status)
return font
if role == PyQt5.QtCore.Qt.TextAlignmentRole:
if isinstance(data, int):
return PyQt5.QtCore.Qt.AlignRight
return PyQt5.QtCore.Qt.AlignLeft
return None
def setData(self, index, value, role=PyQt5.QtCore.Qt.EditRole):
if index.isValid() and role == PyQt5.QtCore.Qt.EditRole:
old_list = list(self.children[index.row()])
old_list[index.column()] = value
new_tpl = Template(*old_list)
self.children[index.row()] = new_tpl
self.dataChanged.emit(index, index)
return True
return False
def headerData(self, section, orientation,
role=PyQt5.QtCore.Qt.DisplayRole):
#pylint: disable=no-self-use
if section < len(Template.COL_NAMES) \
and orientation == PyQt5.QtCore.Qt.Horizontal \
and role == PyQt5.QtCore.Qt.DisplayRole:
return Template.COL_NAMES[section]
return None
def removeRows(self, row, count, parent=PyQt5.QtCore.QModelIndex()):
_ = parent # unused
self.beginRemoveRows(PyQt5.QtCore.QModelIndex(), row, row + count)
del self.children[row:row+count]
self.endRemoveRows()
self.dataChanged.emit(*self.row_index(row, row + count))
def row_index(self, low, high):
return self.createIndex(low, 0), \
self.createIndex(high, self.columnCount())
def set_templates(self, templates):
self.removeRows(0, self.rowCount())
cnt = sum(len(g) for _, g in templates.items())
self.beginInsertRows(PyQt5.QtCore.QModelIndex(), 0, cnt - 1)
for status, grp in templates.items():
for tpl in grp:
self.children.append(Template.build(status, tpl))
self.endInsertRows()
self.dataChanged.emit(*self.row_index(0, self.rowCount() - 1))
def get_actions(self):
actions = []
for tpl in self.children:
if tpl.status != tpl.default_status:
actions.append(Action(tpl.status, tpl.name, tpl.evr))
return actions
async def refresh(self, refresh=True):
cmd = BASE_CMD[:]
if refresh:
# Force refresh if triggered by button press
cmd.append('--refresh')
cmd.extend(['info', '--machine-readable-json', '--installed',
'--available', '--upgrades', '--extras'])
proc = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE)
output, stderr = await proc.communicate()
output = output.decode('ASCII')
if proc.returncode != 0:
stderr = stderr.decode('ASCII')
return False, stderr
# Default type is dict as we're going to replace the lists with
# dicts shortly after
tpls = collections.defaultdict(dict, json.loads(output))
# Remove duplicates
# Should this be done in qvm-template?
# TODO: Merge templates with same name?
# If so, we may need to have a separate UI to force versions.
local_names = set(x['name'] for x in tpls['installed'])
# Convert to dict for easier subtraction
for key in tpls:
tpls[key] = {
(x['name'], x['epoch'], x['version'], x['release']): x
for x in tpls[key]}
tpls['installed'] = {
k: v for k, v in tpls['installed'].items()
if k not in tpls['extra'] and k not in tpls['upgradable']}
tpls['available'] = {
k: v for k, v in tpls['available'].items()
if k not in tpls['installed']
and k not in tpls['upgradable']}
# If the package name is installed but the specific version is
# neither installed or an upgrade, then it must be a downgrade
tpls['downgradable'] = {
k: v for k, v in tpls['available'].items()
if k[0] in local_names}
tpls['available'] = {
k: v for k, v in tpls['available'].items()
if k not in tpls['downgradable']}
# Convert back to list
tpls = {k.title(): list(v.values()) for k, v in tpls.items()}
self.set_templates(tpls)
return True, None
class TemplateInstallConfirmDialog(
ui_templateinstallconfirmdlg.Ui_TemplateInstallConfirmDlg,
PyQt5.QtWidgets.QDialog):
def __init__(self, actions):
super().__init__()
self.setupUi(self)
model = PyQt5.QtGui.QStandardItemModel()
model.setHorizontalHeaderLabels(Action.COL_NAMES)
self.treeView.setModel(model)
for act in actions:
model.appendRow([PyQt5.QtGui.QStandardItem(x) for x in act])
class TemplateInstallProgressDialog(
ui_templateinstallprogressdlg.Ui_TemplateInstallProgressDlg,
PyQt5.QtWidgets.QDialog):
def __init__(self, actions):
super().__init__()
self.setupUi(self)
self.actions = actions
self.buttonBox.hide()
@staticmethod
def _process_cr(text):
"""Reduce lines replaced using CR character (\r)"""
while '\r' in text:
prefix, suffix = text.rsplit('\r', 1)
if '\n' in prefix:
prefix = prefix.rsplit('\n', 1)[0]
prefix += '\n'
else:
prefix = ''
text = prefix + suffix
return text
def install(self):
async def coro():
self.actions.sort()
for oper, grp in itertools.groupby(self.actions, lambda x: x[0]):
oper = oper.lower()
# No need to specify versions for local operations
if oper in ('remove', 'purge'):
specs = [x.name for x in grp]
else:
specs = [x.name + '-' + x.evr for x in grp]
# FIXME: (C)Python versions before 3.9 fully-buffers stderr in
# this context, cf. https://bugs.python.org/issue13601
# Forcing it to be unbuffered for the time being so that
# the messages can be displayed in time.
envs = os.environ.copy()
envs['PYTHONUNBUFFERED'] = '1'
proc = await asyncio.create_subprocess_exec(
*(BASE_CMD + [oper, '--'] + specs),
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.STDOUT,
env=envs)
#pylint: disable=cell-var-from-loop
status_text = ''
while True:
line = await proc.stdout.read(100)
if line == b'':
break
line = line.decode('UTF-8')
status_text = self._process_cr(status_text + line)
self.textEdit.setPlainText(status_text)
if await proc.wait() != 0:
self.buttonBox.show()
self.progressBar.setMaximum(100)
self.progressBar.setValue(0)
return False
self.progressBar.setMaximum(100)
self.progressBar.setValue(100)
self.buttonBox.show()
return True
asyncio.create_task(coro())
class QvmTemplateWindow(
ui_qvmtemplate.Ui_QubesTemplateManager,
PyQt5.QtWidgets.QMainWindow):
def __init__(self, qt_app, qubes_app, dispatcher, parent=None):
_ = parent # unused
super().__init__()
self.setupUi(self)
self.listing.header().setSectionResizeMode(
PyQt5.QtWidgets.QHeaderView.ResizeToContents)
self.qubes_app = qubes_app
self.qt_app = qt_app
self.dispatcher = dispatcher
self.listing_model = TemplateModel()
self.listing_delegate = TemplateStatusDelegate(self.listing)
self.listing.setModel(self.listing_model)
self.listing.setItemDelegateForColumn(0, self.listing_delegate)
self.refresh(False)
self.listing.setItemDelegateForColumn(0, self.listing_delegate)
self.listing.selectionModel() \
.selectionChanged.connect(self.update_info)
self.actionRefresh.triggered.connect(lambda: self.refresh(True))
self.actionInstall.triggered.connect(self.do_install)
def update_info(self, selected):
_ = selected # unused
indices = [
x
for x in self.listing.selectionModel().selectedIndexes()
if x.column() == 0]
if len(indices) == 0:
return
self.infobox.clear()
cursor = PyQt5.QtGui.QTextCursor(self.infobox.document())
bold_fmt = PyQt5.QtGui.QTextCharFormat()
bold_fmt.setFontWeight(PyQt5.QtGui.QFont.Bold)
norm_fmt = PyQt5.QtGui.QTextCharFormat()
if len(indices) > 1:
cursor.insertText('Selected templates:\n', bold_fmt)
for idx in indices:
tpl = self.listing_model.children[idx.row()]
cursor.insertText(tpl.name + '-' + tpl.evr + '\n', norm_fmt)
else:
idx = indices[0]
tpl = self.listing_model.children[idx.row()]
cursor.insertText('Name: ', bold_fmt)
cursor.insertText(tpl.name + '\n', norm_fmt)
cursor.insertText('Description:\n', bold_fmt)
cursor.insertText(tpl.description + '\n', norm_fmt)
def refresh(self, refresh=True):
self.progressBar.show()
async def coro():
ok, stderr = await self.listing_model.refresh(refresh)
self.infobox.clear()
if not ok:
cursor = PyQt5.QtGui.QTextCursor(self.infobox.document())
fmt = PyQt5.QtGui.QTextCharFormat()
fmt.setFontWeight(PyQt5.QtGui.QFont.Bold)
cursor.insertText('Failed to fetch template list:\n', fmt)
fmt.setFontWeight(PyQt5.QtGui.QFont.Normal)
cursor.insertText(stderr, fmt)
self.progressBar.hide()
asyncio.create_task(coro())
def do_install(self):
actions = self.listing_model.get_actions()
confirm = TemplateInstallConfirmDialog(actions)
if confirm.exec_():
progress = TemplateInstallProgressDialog(actions)
progress.install()
progress.exec_()
self.refresh()
def main():
utils.run_asynchronous(QvmTemplateWindow)
if __name__ == '__main__':
main()