123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447 |
import asyncio
import codecs
import collections
from datetime import datetime
import itertools
import json
import os
import typing

import PyQt5 # pylint: disable=import-error
import PyQt5.QtWidgets # pylint: disable=import-error

from . import ui_qvmtemplate # pylint: disable=no-name-in-module
from . import ui_templateinstallconfirmdlg # pylint: disable=no-name-in-module
from . import ui_templateinstallprogressdlg # pylint: disable=no-name-in-module
from . import utils
- #pylint: disable=invalid-name
# Argument prefix shared by every qvm-template invocation in this module;
# callers copy it and append the sub-command and its options.
BASE_CMD = ['qvm-template', '--enablerepo=*', '--yes']

# Singleton for "no date".  It is compared by identity when rendering, and
# must stay a *naive* datetime so it sorts against the naive values produced
# by datetime.strptime().  Written as a literal because
# datetime.utcfromtimestamp() is deprecated since Python 3.12; the value is
# the same (1970-01-01 00:00:00, no tzinfo).
ZERO_DATE = datetime(1970, 1, 1)
- # pylint: disable=too-few-public-methods,inherit-non-class
class Template(typing.NamedTuple):
    """One row of the template listing.

    The first seven fields line up, in order, with ``COL_NAMES`` and are the
    ones shown as columns; ``description`` and ``default_status`` are kept
    for internal use only.
    """
    # Status currently selected for this template (may be a pending
    # operation chosen by the user).
    status: str
    name: str
    # "epoch:version-release" string.
    evr: str
    reponame: str
    # Size in units of 1,000,000 bytes (integer division).
    size: int
    buildtime: datetime
    # ZERO_DATE when the source entry has no install time.
    installtime: typing.Optional[datetime]
    #licence: str
    #url: str
    #summary: str
    # ---- internal ----
    description: str
    # Status the entry was built with; compared against ``status`` to
    # detect a pending change.
    default_status: str
    # ------------------

    # Header labels for the visible columns.
    COL_NAMES = [
        'Status',
        'Name',
        'Version',
        'Repository',
        'Download Size (MB)',
        'Build',
        'Install',
        #'License',
        #'URL',
        #'Summary'
    ]

    @staticmethod
    def build(status, entry):
        """Create a Template from one parsed template-info ``entry``.

        ``entry`` is a mapping with the keys ``name``, ``epoch``,
        ``version``, ``release``, ``reponame``, ``size``, ``buildtime``,
        ``installtime`` and ``description``.  Timestamps are strings in
        ``YYYY-MM-DD HH:MM:SS`` form; a falsy ``installtime`` becomes
        ``ZERO_DATE``.  ``status`` is stored both as the current and the
        default status.
        """
        def parse_stamp(stamp):
            return datetime.strptime(stamp, '%Y-%m-%d %H:%M:%S')

        built_at = parse_stamp(entry['buildtime'])
        raw_install = entry['installtime']
        installed_at = parse_stamp(raw_install) if raw_install else ZERO_DATE

        return Template(
            status=status,
            name=entry['name'],
            evr='%s:%s-%s' % (
                entry['epoch'], entry['version'], entry['release']),
            reponame=entry['reponame'],
            size=int(entry['size']) // 1000000,
            buildtime=built_at,
            installtime=installed_at,
            #entry['license'],
            #entry['url'],
            description=entry['description'],
            default_status=status,
        )
class Action(typing.NamedTuple):
    """A pending operation on one template version."""
    # Operation label, e.g. the status text the user picked.
    op: str
    # Template name the operation applies to.
    name: str
    # "epoch:version-release" of the affected template.
    evr: str

    # Python type of each field, in field order.
    TYPES = [str, str, str]
    # Header labels used when actions are shown in a table, in field order.
    COL_NAMES = ['Operation', 'Name', 'Version']
class TemplateStatusDelegate(PyQt5.QtWidgets.QStyledItemDelegate):
    """Delegate that edits a template's status through a combo box."""

    # Each entry starts with a default status, followed by the operations
    # that can be selected for a template in that status.
    OPS = [
        ['Installed', 'Reinstall', 'Remove'],
        ['Extra', 'Remove'],
        ['Upgradable', 'Upgrade', 'Remove'],
        ['Downgradable', 'Downgrade', 'Remove'],
        ['Available', 'Install']
    ]

    def createEditor(self, parent, option, index):
        """Return a combo box offering the choices for the row's template.

        The choices are the ``OPS`` entry whose first element equals the
        template's ``default_status``.  The popup is opened immediately.
        Returns ``None`` when no ``OPS`` entry matches.
        """
        _ = option # unused
        combo = PyQt5.QtWidgets.QComboBox(parent)
        # Otherwise the internalPointer can be overwritten with a QComboBox
        fresh_index = index.model().index(index.row(), index.column())
        default_status = fresh_index.internalPointer().default_status

        choices = next(
            (ops for ops in TemplateStatusDelegate.OPS
             if ops[0] == default_status),
            None)
        if choices is None:
            return None

        for label in choices:
            combo.addItem(label)
        # Commit as soon as the user picks an entry.
        combo.currentIndexChanged.connect(self.currentIndexChanged)
        combo.showPopup()
        return combo

    def setEditorData(self, editor, index):
        """Pre-select the combo entry matching the cell's current text."""
        #pylint: disable=no-self-use
        position = editor.findText(index.data())
        if position >= 0:
            editor.setCurrentIndex(position)

    def setModelData(self, editor, model, index):
        """Store the text currently selected in ``editor`` into ``model``."""
        #pylint: disable=no-self-use
        chosen = editor.currentText()
        model.setData(index, chosen)

    def updateEditorGeometry(self, editor, option, index):
        """Make the editor cover the cell rectangle given in ``option``."""
        #pylint: disable=no-self-use
        _ = index # unused
        editor.setGeometry(option.rect)

    @PyQt5.QtCore.pyqtSlot()
    def currentIndexChanged(self):
        """Emit ``commitData`` for the editor that sent the signal."""
        self.commitData.emit(self.sender())
class TemplateModel(PyQt5.QtCore.QAbstractItemModel):
    """Flat item model holding one :class:`Template` per row.

    Columns follow ``Template.COL_NAMES``.  Only column 0 (the status) is
    editable; a row whose status differs from its default status represents
    a pending operation (see :meth:`get_actions`).
    """

    def __init__(self):
        super().__init__()
        # Backing store, one Template per row.  NOTE: on instances this
        # attribute shadows QObject.children(); the name is kept because
        # other classes in this module read ``children`` directly.
        self.children = []

    def flags(self, index):
        """Return the default item flags, plus editable for column 0."""
        base = super().flags(index)
        if index.isValid() and index.column() == 0:
            return base | PyQt5.QtCore.Qt.ItemIsEditable
        return base

    def sort(self, idx, order=PyQt5.QtCore.Qt.AscendingOrder):
        """Sort the rows by the Template field at position ``idx``.

        ``order`` is a ``Qt.SortOrder``; ascending puts the smallest value
        first, descending the largest.
        """
        # Only a descending request reverses the natural sort order.
        descending = (order == PyQt5.QtCore.Qt.DescendingOrder)
        self.children.sort(key=lambda tpl: tpl[idx], reverse=descending)

        if self.children:
            self.dataChanged.emit(*self.row_index(0, self.rowCount() - 1))

    def index(self, row, column, parent=PyQt5.QtCore.QModelIndex()):
        """Return the index for ``(row, column)``, or an invalid index.

        The Template of the row is attached as the internal pointer.
        """
        if not self.hasIndex(row, column, parent):
            return PyQt5.QtCore.QModelIndex()
        return self.createIndex(row, column, self.children[row])

    def parent(self, child):
        """Return an invalid index: the model is flat."""
        #pylint: disable=no-self-use
        _ = child # unused
        return PyQt5.QtCore.QModelIndex()

    def rowCount(self, parent=PyQt5.QtCore.QModelIndex()):
        """Return the number of templates held by the model."""
        #pylint: disable=no-self-use
        _ = parent # unused
        return len(self.children)

    def columnCount(self, parent=PyQt5.QtCore.QModelIndex()):
        """Return the number of visible columns."""
        #pylint: disable=no-self-use
        _ = parent # unused
        return len(Template.COL_NAMES)

    def hasChildren(self, index=PyQt5.QtCore.QModelIndex()):
        """Return True only for the invalid (root) index."""
        #pylint: disable=no-self-use
        return index == PyQt5.QtCore.QModelIndex()

    def data(self, index, role=PyQt5.QtCore.Qt.DisplayRole):
        """Return the cell content, font or alignment for ``index``.

        * Display role: ``''`` for ``ZERO_DATE``, ``DD Mon YYYY`` for other
          datetimes, the raw field value otherwise.
        * Font role: bold when the row's status differs from its default.
        * Alignment role: right for integers, left for everything else.

        Any other role, or an invalid index, yields ``None``.
        """
        # pylint: disable=too-many-return-statements
        if not index.isValid():
            return None

        tpl = self.children[index.row()]
        value = tpl[index.column()]

        if role == PyQt5.QtCore.Qt.DisplayRole:
            # Identity check on purpose: ZERO_DATE is a sentinel object.
            if value is ZERO_DATE:
                return ''
            if isinstance(value, datetime):
                return value.strftime('%d %b %Y')
            return value
        if role == PyQt5.QtCore.Qt.FontRole:
            font = PyQt5.QtGui.QFont()
            font.setBold(tpl.status != tpl.default_status)
            return font
        if role == PyQt5.QtCore.Qt.TextAlignmentRole:
            if isinstance(value, int):
                return PyQt5.QtCore.Qt.AlignRight
            return PyQt5.QtCore.Qt.AlignLeft
        return None

    def setData(self, index, value, role=PyQt5.QtCore.Qt.EditRole):
        """Replace one field of a row; return True when the edit applied.

        Templates are immutable tuples, so the row is rebuilt with the new
        value in place of the edited column.
        """
        if not index.isValid() or role != PyQt5.QtCore.Qt.EditRole:
            return False

        fields = list(self.children[index.row()])
        fields[index.column()] = value
        self.children[index.row()] = Template(*fields)
        self.dataChanged.emit(index, index)
        return True

    def headerData(self, section, orientation,
            role=PyQt5.QtCore.Qt.DisplayRole):
        """Return the horizontal header label of column ``section``."""
        #pylint: disable=no-self-use
        if section < len(Template.COL_NAMES) \
                and orientation == PyQt5.QtCore.Qt.Horizontal \
                and role == PyQt5.QtCore.Qt.DisplayRole:
            return Template.COL_NAMES[section]
        return None

    def removeRows(self, row, count, parent=PyQt5.QtCore.QModelIndex()):
        """Remove ``count`` rows starting at ``row``.

        Returns True when rows were removed, False when the request is
        empty or does not fit the current rows (nothing is changed then).
        """
        _ = parent # unused
        if count <= 0 or row < 0 or row + count > len(self.children):
            return False

        # beginRemoveRows() takes an *inclusive* last row.
        self.beginRemoveRows(
            PyQt5.QtCore.QModelIndex(), row, row + count - 1)
        del self.children[row:row + count]
        self.endRemoveRows()
        # No dataChanged here: the begin/end pair already notifies views,
        # and the removed rows no longer have valid indexes.
        return True

    def row_index(self, low, high):
        """Return ``(top_left, bottom_right)`` indexes spanning whole rows.

        Covers rows ``low`` to ``high`` inclusive, from the first to the
        last column; meant to be unpacked into ``dataChanged.emit``.
        """
        return self.createIndex(low, 0), \
            self.createIndex(high, self.columnCount() - 1)

    def set_templates(self, templates):
        """Replace the model content.

        ``templates`` maps a status string to an iterable of entries
        accepted by :meth:`Template.build`.
        """
        self.removeRows(0, self.rowCount())

        # Build the rows before announcing the insertion, so a malformed
        # entry cannot leave the model between begin and end.
        new_rows = []
        for status, grp in templates.items():
            for tpl in grp:
                new_rows.append(Template.build(status, tpl))

        # beginInsertRows() needs first <= last; nothing to do when empty.
        if not new_rows:
            return

        self.beginInsertRows(
            PyQt5.QtCore.QModelIndex(), 0, len(new_rows) - 1)
        self.children.extend(new_rows)
        self.endInsertRows()

    def get_actions(self):
        """Return an Action for every row whose status was changed."""
        return [
            Action(tpl.status, tpl.name, tpl.evr)
            for tpl in self.children
            if tpl.status != tpl.default_status]

    async def refresh(self, refresh=True):
        """Reload the template list by running ``qvm-template info``.

        When ``refresh`` is true, ``--refresh`` is passed to the command.

        Returns ``(True, None)`` on success, after the model has been
        repopulated, or ``(False, message)`` when the command exits with a
        non-zero status (``message`` is its stderr) or its output is not
        valid JSON.
        """
        cmd = BASE_CMD[:]
        if refresh:
            # Force refresh if triggered by button press
            cmd.append('--refresh')
        cmd.extend(['info', '--machine-readable-json', '--installed',
            '--available', '--upgrades', '--extras'])

        proc = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE)
        output, stderr = await proc.communicate()

        # Decode leniently as UTF-8: a non-ASCII byte in a description or
        # an error message must not abort the refresh.
        if proc.returncode != 0:
            return False, stderr.decode('UTF-8', errors='replace')

        try:
            parsed = json.loads(output.decode('UTF-8', errors='replace'))
        except ValueError as exc:
            return False, 'Failed to parse qvm-template output: %s' % exc

        # Default type is dict as we're going to replace the lists with
        # dicts shortly after
        tpls = collections.defaultdict(dict, parsed)

        # Remove duplicates
        # Should this be done in qvm-template?
        # TODO: Merge templates with same name?
        #       If so, we may need to have a separate UI to force versions.
        local_names = {x['name'] for x in tpls['installed']}

        # Convert to dict for easier subtraction
        for key in tpls:
            tpls[key] = {
                (x['name'], x['epoch'], x['version'], x['release']): x
                for x in tpls[key]}

        tpls['installed'] = {
            k: v for k, v in tpls['installed'].items()
            if k not in tpls['extra'] and k not in tpls['upgradable']}
        tpls['available'] = {
            k: v for k, v in tpls['available'].items()
            if k not in tpls['installed']
                and k not in tpls['upgradable']}

        # If the package name is installed but the specific version is
        # neither installed or an upgrade, then it must be a downgrade
        tpls['downgradable'] = {
            k: v for k, v in tpls['available'].items()
            if k[0] in local_names}
        tpls['available'] = {
            k: v for k, v in tpls['available'].items()
            if k not in tpls['downgradable']}

        # Convert back to list
        tpls = {k.title(): list(v.values()) for k, v in tpls.items()}

        self.set_templates(tpls)
        return True, None
class TemplateInstallConfirmDialog(
        ui_templateinstallconfirmdlg.Ui_TemplateInstallConfirmDlg,
        PyQt5.QtWidgets.QDialog):
    """Dialog listing the pending actions so the user can confirm them."""

    def __init__(self, actions):
        """Fill the dialog's tree view with one row per entry of ``actions``.

        Each action is iterated field by field; every field becomes one
        cell under the ``Action.COL_NAMES`` headers.
        """
        super().__init__()
        self.setupUi(self)

        table = PyQt5.QtGui.QStandardItemModel()
        table.setHorizontalHeaderLabels(Action.COL_NAMES)
        self.treeView.setModel(table)

        for action in actions:
            cells = [PyQt5.QtGui.QStandardItem(field) for field in action]
            table.appendRow(cells)
class TemplateInstallProgressDialog(
        ui_templateinstallprogressdlg.Ui_TemplateInstallProgressDlg,
        PyQt5.QtWidgets.QDialog):
    """Dialog that runs the pending actions and shows their output."""

    def __init__(self, actions):
        """Remember ``actions`` and hide the button box until work is done."""
        super().__init__()
        self.setupUi(self)
        self.actions = actions
        # Strong reference to the running task: the event loop only keeps
        # weak references, so an unreferenced task may be collected before
        # it finishes.
        self._install_task = None
        self.buttonBox.hide()

    @staticmethod
    def _process_cr(text):
        r"""Reduce lines replaced using CR character (\r)

        For every carriage return, whatever precedes it on the same line is
        dropped, so only the latest rewrite of a line remains.  Line feeds
        and complete earlier lines are preserved.
        """
        while '\r' in text:
            prefix, suffix = text.rsplit('\r', 1)
            if '\n' in prefix:
                # Keep everything up to and including the last newline.
                prefix = prefix.rsplit('\n', 1)[0]
                prefix += '\n'
            else:
                # The CR rewinds the very first line.
                prefix = ''
            text = prefix + suffix
        return text

    async def _run_operation(self, oper, specs, envs):
        """Run one qvm-template operation and stream its output.

        Returns True when the process exits with status 0.
        """
        proc = await asyncio.create_subprocess_exec(
            *(BASE_CMD + [oper, '--'] + specs),
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.STDOUT,
            env=envs)

        # The stream is read in fixed-size chunks, so a multi-byte UTF-8
        # sequence can be split across two reads; an incremental decoder
        # carries the partial bytes over instead of raising.
        decoder = codecs.getincrementaldecoder('UTF-8')(errors='replace')
        status_text = ''
        while True:
            chunk = await proc.stdout.read(100)
            at_eof = (chunk == b'')
            decoded = decoder.decode(chunk, final=at_eof)
            if decoded:
                status_text = self._process_cr(status_text + decoded)
                self.textEdit.setPlainText(status_text)
            if at_eof:
                break
        return await proc.wait() == 0

    async def _run_actions(self):
        """Run all actions grouped by operation; stop at the first failure.

        Returns True when every operation succeeded.
        """
        # FIXME: (C)Python versions before 3.9 fully-buffers stderr in
        #        this context, cf. https://bugs.python.org/issue13601
        #        Forcing it to be unbuffered for the time being so that
        #        the messages can be displayed in time.
        envs = os.environ.copy()
        envs['PYTHONUNBUFFERED'] = '1'

        # groupby() only merges adjacent items, hence the sort.
        self.actions.sort()
        for oper, grp in itertools.groupby(self.actions, lambda act: act[0]):
            oper = oper.lower()
            # No need to specify versions for local operations
            if oper in ('remove', 'purge'):
                specs = [act.name for act in grp]
            else:
                specs = [act.name + '-' + act.evr for act in grp]
            if not await self._run_operation(oper, specs, envs):
                return False
        return True

    def install(self):
        """Start applying ``self.actions`` in the background.

        Returns immediately.  When the work ends the progress bar is set to
        100 on success or 0 on failure and the button box is shown.
        """
        async def coro():
            succeeded = False
            try:
                succeeded = await self._run_actions()
            finally:
                # Runs on errors too, so the dialog can always be closed.
                self.progressBar.setMaximum(100)
                self.progressBar.setValue(100 if succeeded else 0)
                self.buttonBox.show()
            return succeeded

        self._install_task = asyncio.create_task(coro())
class QvmTemplateWindow(
        ui_qvmtemplate.Ui_QubesTemplateManager,
        PyQt5.QtWidgets.QMainWindow):
    """Main window: template listing, info box and install/refresh actions."""

    def __init__(self, qt_app, qubes_app, dispatcher, parent=None):
        """Build the UI, wire up the signals and start a first listing.

        The first listing is loaded without ``--refresh``.  ``parent`` is
        accepted but not used.
        """
        _ = parent # unused

        super().__init__()
        self.setupUi(self)

        self.listing.header().setSectionResizeMode(
            PyQt5.QtWidgets.QHeaderView.ResizeToContents)

        self.qubes_app = qubes_app
        self.qt_app = qt_app
        self.dispatcher = dispatcher

        # Strong references to running background tasks: the event loop
        # only keeps weak ones, so an unreferenced task may be collected
        # before it finishes.
        self._tasks = set()

        self.listing_model = TemplateModel()
        self.listing_delegate = TemplateStatusDelegate(self.listing)

        self.listing.setModel(self.listing_model)
        # Registered once; the delegate belongs to the view.
        self.listing.setItemDelegateForColumn(0, self.listing_delegate)

        self.refresh(False)
        self.listing.selectionModel() \
            .selectionChanged.connect(self.update_info)

        self.actionRefresh.triggered.connect(lambda: self.refresh(True))
        self.actionInstall.triggered.connect(self.do_install)

    def update_info(self, selected):
        """Show details about the current selection in the info box.

        With several rows selected, their ``name-evr`` strings are listed;
        with one row, its name and description are shown.  An empty
        selection leaves the info box untouched.
        """
        _ = selected # unused
        # One index per selected row: only look at column 0.
        indices = [
            x
            for x in self.listing.selectionModel().selectedIndexes()
            if x.column() == 0]
        if not indices:
            return

        self.infobox.clear()
        cursor = PyQt5.QtGui.QTextCursor(self.infobox.document())
        bold_fmt = PyQt5.QtGui.QTextCharFormat()
        bold_fmt.setFontWeight(PyQt5.QtGui.QFont.Bold)
        norm_fmt = PyQt5.QtGui.QTextCharFormat()

        if len(indices) > 1:
            cursor.insertText('Selected templates:\n', bold_fmt)
            for idx in indices:
                tpl = self.listing_model.children[idx.row()]
                cursor.insertText(tpl.name + '-' + tpl.evr + '\n', norm_fmt)
        else:
            idx = indices[0]
            tpl = self.listing_model.children[idx.row()]
            cursor.insertText('Name: ', bold_fmt)
            cursor.insertText(tpl.name + '\n', norm_fmt)
            cursor.insertText('Description:\n', bold_fmt)
            cursor.insertText(tpl.description + '\n', norm_fmt)

    def refresh(self, refresh=True):
        """Reload the template listing in the background.

        Shows the progress bar while the model refreshes.  ``refresh`` is
        passed on to ``TemplateModel.refresh``.  On failure the returned
        error text is shown in the info box.
        """
        self.progressBar.show()

        async def coro():
            try:
                ok, stderr = await self.listing_model.refresh(refresh)
                self.infobox.clear()
                if not ok:
                    cursor = PyQt5.QtGui.QTextCursor(self.infobox.document())
                    fmt = PyQt5.QtGui.QTextCharFormat()
                    fmt.setFontWeight(PyQt5.QtGui.QFont.Bold)
                    cursor.insertText('Failed to fetch template list:\n', fmt)
                    fmt.setFontWeight(PyQt5.QtGui.QFont.Normal)
                    cursor.insertText(stderr, fmt)
            finally:
                # Hide the bar even if the refresh raised, so the window
                # does not look busy forever.
                self.progressBar.hide()

        task = asyncio.create_task(coro())
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)

    def do_install(self):
        """Confirm the pending actions, run them, then reload the listing."""
        actions = self.listing_model.get_actions()
        confirm = TemplateInstallConfirmDialog(actions)
        if confirm.exec_():
            progress = TemplateInstallProgressDialog(actions)
            progress.install()
            progress.exec_()
            self.refresh()
def main():
    """Entry point: hand the main window class to ``utils.run_asynchronous``."""
    window_class = QvmTemplateWindow
    utils.run_asynchronous(window_class)


# Allow running the module directly as a script.
if __name__ == '__main__':
    main()
|