123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405 |
- import collections
- import concurrent
- import concurrent.futures
- import itertools
- import json
- import os
- import subprocess
- import typing
- import gi
- gi.require_version('Gtk', '3.0')
- #pylint: disable=wrong-import-position
- from gi.repository import GLib
- from gi.repository import Gtk
- from gi.repository import Pango
# Common prefix of every qvm-template invocation made by this tool; callers
# copy it and append the sub-command and its arguments.
BASE_CMD = [
    'qvm-template',
    '--enablerepo=*',
    '--yes',
    '--quiet',
]
class Template(typing.NamedTuple):
    """One row of the template listing.

    The field order is the column order of the list store created from
    ``TYPES``.  ``COL_NAMES`` gives the headers for the leading fields; the
    fields after ``summary`` are internal and have no header of their own.
    """
    status: str
    name: str
    evr: str
    reponame: str
    size: int
    buildtime: str
    installtime: str
    licence: str
    url: str
    summary: str
    # --- internal ---
    description: str
    # Status the row was created with; ``build`` sets it equal to ``status``.
    default_status: str
    # Pango font weight for the row; ``build`` starts it at BOOK.
    weight: int
    # Model handed to ``build`` by the caller.
    model: Gtk.TreeModel
    # ----------------

    # XXX: Is there a better way of doing this?
    # Column types, one per field above and in the same order.
    TYPES = [str, str, str, str, int, str, str, str,
             str, str, str, str, int, Gtk.TreeModel]

    # Headers for the first len(COL_NAMES) fields.
    COL_NAMES = [
        'Status',
        'Name',
        'Version',
        'Reponame',
        'Size (kB)',
        'Build Time',
        'Install Time',
        'License',
        'URL',
        'Summary']

    @staticmethod
    def build(status, entry, model):
        """Create a Template from one decoded ``qvm-template info`` entry.

        Args:
            status: Status string for the row; also stored as the row's
                default status.
            entry: Mapping with the keys ``name``, ``epoch``, ``version``,
                ``release``, ``reponame``, ``size``, ``buildtime``,
                ``installtime``, ``license``, ``url``, ``summary`` and
                ``description``.
            model: Tree model stored in the ``model`` field.

        Returns:
            A new Template with normal (BOOK) font weight.

        Raises:
            KeyError: If ``entry`` lacks one of the keys listed above.
            ValueError: If ``entry['size']`` cannot be converted to int.
        """
        evr = f"{entry['epoch']}:{entry['version']}-{entry['release']}"
        return Template(
            status,
            entry['name'],
            evr,
            entry['reponame'],
            # Floor division keeps the value an int, as the field annotation
            # and the int column in TYPES require; true division would
            # produce a float.
            # XXX: This may overflow glib ints, though pretty unlikely in the
            # foreseeable future
            int(entry['size']) // 1000,
            entry['buildtime'],
            entry['installtime'],
            entry['license'],
            entry['url'],
            entry['summary'],
            entry['description'],
            status,
            Pango.Weight.BOOK,
            model
        )
class Action(typing.NamedTuple):
    """A pending operation on one template version."""
    op: str
    name: str
    evr: str

    # Column types, one per field above and in the same order.
    TYPES = [
        str,
        str,
        str,
    ]

    # Column headers, in field order.
    COL_NAMES = [
        'Operation',
        'Name',
        'Version',
    ]
- # TODO: Set default window sizes
class ConfirmDialog(Gtk.Dialog):
    """Modal Cancel/OK dialog listing the operations about to be applied."""

    def __init__(self, parent, actions):
        """Build the dialog and show all of its widgets.

        Args:
            parent: Window the dialog is transient for.
            actions: Iterable of rows, each matching ``Action.TYPES``, to
                display in the list.
        """
        super().__init__(
            title='Confirmation', transient_for=parent, modal=True)
        self.add_buttons(
            Gtk.STOCK_CANCEL, Gtk.ResponseType.CANCEL,
            Gtk.STOCK_OK, Gtk.ResponseType.OK)
        content = self.get_content_area()

        # Warning banner above the list.
        self.msg = Gtk.Label()
        self.msg.set_markup(
            '<b>WARNING: Local changes made to the following templates '
            'will be overwritten! Continue?</b>')
        content.add(self.msg)

        # One sortable text column per Action field.
        self.store = Gtk.ListStore(*Action.TYPES)
        self.listing = Gtk.TreeView(model=self.store)
        for position, header in enumerate(Action.COL_NAMES):
            column = Gtk.TreeViewColumn(
                header, Gtk.CellRendererText(), text=position)
            self.listing.append_column(column)
            column.set_sort_column_id(position)

        for action in actions:
            self.store.append(action)

        self.scrollable_listing = Gtk.ScrolledWindow()
        self.scrollable_listing.add(self.listing)
        content.pack_start(self.scrollable_listing, True, True, 16)

        self.show_all()
class ProgressDialog(Gtk.Dialog):
    """Modal dialog with a spinner, a status label and an output text view."""

    def __init__(self, parent):
        """Build the dialog with a running spinner and show it.

        Args:
            parent: Window the dialog is transient for.
        """
        super().__init__(
            title='Processing...', transient_for=parent, modal=True)
        content = self.get_content_area()

        self.spinner = Gtk.Spinner()
        self.spinner.start()
        content.add(self.spinner)

        self.msg = Gtk.Label()
        self.msg.set_text('Processing...')
        content.add(self.msg)

        # Text view inside a scrolled window; callers write to its buffer.
        self.infobox = Gtk.TextView()
        self.scrollable = Gtk.ScrolledWindow()
        self.scrollable.add(self.infobox)
        content.pack_start(self.scrollable, True, True, 16)

        self.show_all()

    def finish(self, success):
        """Stop the spinner, show the outcome and run the dialog.

        Args:
            success: Truthy to show the success message, falsy to show the
                error heading.
        """
        self.spinner.stop()
        if not success:
            self.msg.set_markup('<b>Error:</b>')
        else:
            self.msg.set_text('Operations succeeded.')
        # The OK button is only added now; the dialog has no buttons before.
        self.add_button(Gtk.STOCK_OK, Gtk.ResponseType.OK)
        self.run()
class QubesTemplateApp(Gtk.Window):
    """Main window of the template manager.

    Lists the templates reported by ``qvm-template`` in a sortable tree view,
    lets the user choose an operation per row through the "Status" combo
    column, and runs the chosen operations in a background thread.
    """

    def __init__(self):
        super().__init__(title='Qubes Template Manager')

        self.iconsize = Gtk.IconSize.SMALL_TOOLBAR
        # Runs the qvm-template subprocesses off the GTK main thread.
        self.executor = concurrent.futures.ThreadPoolExecutor()
        self.outerbox = Gtk.Box(orientation=Gtk.Orientation.VERTICAL)

        self.__build_action_models()
        self.__build_toolbar()
        self.__build_listing()
        self.__build_infobox()

        self.add(self.outerbox)

    def __build_action_models(self):
        """Create one combo-box model per default template status."""
        #pylint: disable=invalid-name
        OPS = [
            ['Installed', 'Reinstall', 'Remove'],
            ['Extra', 'Remove'],
            ['Upgradable', 'Upgrade', 'Remove'],
            ['Downgradable', 'Downgrade', 'Remove'],
            ['Available', 'Install']
        ]
        self.action_models = {}
        for ops in OPS:
            # First element is the default status for the certain class of
            # templates; it is both the dictionary key and the first choice
            # in the model.
            choices = Gtk.ListStore(str)
            for oper in ops:
                choices.append([oper])
            self.action_models[ops[0]] = choices

    def __build_toolbar(self):
        """Create the toolbar with the "Refresh" and "Apply" buttons."""
        self.toolbar = Gtk.Toolbar()

        self.btn_refresh = Gtk.ToolButton(
            icon_widget=Gtk.Image.new_from_icon_name(
                'view-refresh', self.iconsize),
            label='Refresh')
        self.btn_refresh.connect('clicked', self.refresh)
        self.toolbar.insert(self.btn_refresh, 0)

        self.btn_install = Gtk.ToolButton(
            icon_widget=Gtk.Image.new_from_icon_name('go-down', self.iconsize),
            label='Apply')
        self.btn_install.connect('clicked', self.show_confirm)
        self.toolbar.insert(self.btn_install, 1)

        self.outerbox.pack_start(self.toolbar, False, True, 0)

    def __build_listing(self):
        """Create the template list view and the spinner shown while loading."""
        self.store = Gtk.ListStore(*Template.TYPES)
        self.listing = Gtk.TreeView(model=self.store)

        # Look the internal columns up by field name instead of counting
        # back from the end of TYPES.
        weight_col = Template._fields.index('weight')
        model_col = Template._fields.index('model')

        self.cols = []
        for idx, colname in enumerate(Template.COL_NAMES):
            attrs = {'text': idx, 'weight': weight_col}
            if colname == 'Status':
                # Combo box whose choices come from the row's own model.
                renderer = Gtk.CellRendererCombo()
                renderer.set_property('editable', True)
                renderer.set_property('has-entry', False)
                renderer.set_property('text-column', 0)
                renderer.connect('edited', self.entry_edit)
                attrs['model'] = model_col
            else:
                renderer = Gtk.CellRendererText()
                # Right-align for integers
                if Template.TYPES[idx] is int:
                    renderer.set_property('xalign', 1.0)
            col = Gtk.TreeViewColumn(colname, renderer, **attrs)
            self.cols.append(col)
            self.listing.append_column(col)
            col.set_sort_column_id(idx)

        sel = self.listing.get_selection()
        sel.set_mode(Gtk.SelectionMode.MULTIPLE)
        sel.connect('changed', self.update_info)

        self.scrollable_listing = Gtk.ScrolledWindow()
        self.scrollable_listing.add(self.listing)
        self.scrollable_listing.set_visible(False)
        self.spinner = Gtk.Spinner()

        self.outerbox.pack_start(self.scrollable_listing, True, True, 0)
        self.outerbox.pack_start(self.spinner, True, True, 0)

    def __build_infobox(self):
        """Create the text view used for details and error messages."""
        self.infobox = Gtk.TextView()
        self.outerbox.pack_start(self.infobox, True, True, 16)

    def refresh(self, button=None):
        """Reload the template list in the background.

        Does nothing if a refresh is already running.  The listing is hidden
        behind a spinner until the background job finishes; on failure the
        error text is written to the info box.

        Args:
            button: The widget that triggered the refresh, or None for a
                programmatic call.  When not None, ``--refresh`` is passed
                to ``qvm-template``.
        """
        # Ignore if we're already doing a refresh
        #pylint: disable=no-member
        if self.spinner.props.active:
            return
        self.scrollable_listing.set_visible(False)
        self.spinner.start()
        self.spinner.set_visible(True)
        self.store.clear()

        # Force refresh if triggered by button press
        force_refresh = button is not None

        def worker():
            """Query qvm-template and return the rows to display.

            Runs in the executor thread, so it only builds plain Python
            data; the GTK store is filled later on the main loop.
            """
            cmd = BASE_CMD[:]
            if force_refresh:
                cmd.append('--refresh')
            cmd.extend(['info', '--machine-readable-json', '--installed',
                        '--available', '--upgrades', '--extras'])
            output = subprocess.check_output(cmd)
            # Default type is dict as we're going to replace the lists with
            # dicts shortly after
            tpls = collections.defaultdict(dict, json.loads(output))
            # Remove duplicates
            # Should this be done in qvm-template?
            # TODO: Merge templates with same name?
            #       If so, we may need to have a separate UI to force versions.
            local_names = {x['name'] for x in tpls['installed']}
            # Convert to dict for easier subtraction
            for key in tpls:
                tpls[key] = {
                    (x['name'], x['epoch'], x['version'], x['release']): x
                    for x in tpls[key]}
            tpls['installed'] = {
                k: v for k, v in tpls['installed'].items()
                if k not in tpls['extra'] and k not in tpls['upgradable']}
            tpls['available'] = {
                k: v for k, v in tpls['available'].items()
                if k not in tpls['installed']
                and k not in tpls['upgradable']}
            # If the package name is installed but the specific version is
            # neither installed or an upgrade, then it must be a downgrade
            tpls['downgradable'] = {
                k: v for k, v in tpls['available'].items()
                if k[0] in local_names}
            tpls['available'] = {
                k: v for k, v in tpls['available'].items()
                if k not in tpls['downgradable']}
            # Convert back to list
            for key in tpls:
                tpls[key] = list(tpls[key].values())
            rows = []
            for status, seq in tpls.items():
                status_str = status.title()
                for entry in seq:
                    rows.append(Template.build(
                        status_str, entry, self.action_models[status_str]))
            return rows

        def finish_cb(future):
            """Hand the result over to the GTK main loop."""
            def callback():
                error = future.exception()
                if error is not None:
                    buf = self.infobox.get_buffer()
                    buf.set_text('Error:\n' + str(error))
                else:
                    # GTK objects must only be touched from the main thread,
                    # so the store is filled here and not in the worker.
                    for row in future.result():
                        self.store.append(row)
                self.spinner.set_visible(False)
                self.spinner.stop()
                self.scrollable_listing.set_visible(True)
                # Run once only.
                return False
            GLib.idle_add(callback)

        future = self.executor.submit(worker)
        future.add_done_callback(finish_cb)

    def show_confirm(self, button=None):
        """Ask for confirmation of the pending changes, then apply them.

        A row counts as changed when its status differs from its default
        status.

        Args:
            button: Unused; present so the method can be a signal handler.
        """
        _ = button # unused
        actions = []
        for row in self.store:
            tpl = Template(*row)
            if tpl.status != tpl.default_status:
                actions.append(Action(tpl.status, tpl.name, tpl.evr))
        dialog = ConfirmDialog(self, actions)
        resp = dialog.run()
        dialog.destroy()
        if resp == Gtk.ResponseType.OK:
            self.do_install(actions)

    def do_install(self, actions):
        """Run the given actions through qvm-template, showing progress.

        Actions are grouped by operation and each group is run as one
        ``qvm-template`` call whose output is streamed into a progress
        dialog.  Processing stops at the first call that exits non-zero.
        When the job ends the dialog reports the outcome and the template
        list is refreshed.

        Args:
            actions: Iterable of Action tuples.
        """
        dialog = ProgressDialog(self)

        def append_output(text):
            """Append text to the dialog's output view (main thread only)."""
            buf = dialog.infobox.get_buffer()
            buf.insert(buf.get_end_iter(), text)
            # Run once only.
            return False

        def worker():
            """Run the operations; return True if all of them succeeded."""
            # groupby() needs sorted input.  Sort a copy so that the caller's
            # list is not modified from this thread.
            ordered = sorted(actions)
            for oper, grp in itertools.groupby(ordered, lambda x: x[0]):
                oper = oper.lower()
                # No need to specify versions for local operations
                if oper in ('remove', 'purge'):
                    specs = [x.name for x in grp]
                else:
                    specs = [x.name + '-' + x.evr for x in grp]
                # FIXME: (C)Python versions before 3.9 fully-buffers stderr in
                #        this context, cf. https://bugs.python.org/issue13601
                #        Forcing it to be unbuffered for the time being so that
                #        the messages can be displayed in time.
                envs = os.environ.copy()
                envs['PYTHONUNBUFFERED'] = '1'
                # The context manager closes the pipe and waits for the
                # process to exit.
                with subprocess.Popen(
                        BASE_CMD + [oper, '--'] + specs,
                        stdout=subprocess.PIPE,
                        stderr=subprocess.STDOUT,
                        text=True,
                        bufsize=1,
                        env=envs) as proc:
                    for line in proc.stdout:
                        # Need to modify the buffers in the main thread.
                        # The line is passed as an argument so each callback
                        # gets its own text; a closure over the loop variable
                        # would see whatever line is current when it runs.
                        GLib.idle_add(append_output, line)
                if proc.returncode != 0:
                    return False
            return True

        def finish_cb(future):
            """Report the outcome on the GTK main loop."""
            def callback():
                error = future.exception()
                if error is not None:
                    # Without this, a failure to run the command at all would
                    # leave the dialog spinning with no way to close it.
                    append_output(str(error) + '\n')
                    success = False
                else:
                    success = future.result()
                dialog.finish(success)
                dialog.destroy()
                self.refresh()
                # Run once only.
                return False
            GLib.idle_add(callback)

        future = self.executor.submit(worker)
        future.add_done_callback(finish_cb)

    def update_info(self, sel):
        """Show details about the current selection in the info box.

        One selected row shows its name and description; several selected
        rows show a list of ``name-evr`` specs.  An empty selection leaves
        the info box unchanged.

        Args:
            sel: The tree selection that changed.
        """
        model, treeiters = sel.get_selected_rows()
        if not treeiters:
            return
        buf = self.infobox.get_buffer()
        if len(treeiters) > 1:
            def row_to_spec(row):
                tpl = Template(*row)
                return tpl.name + '-' + tpl.evr
            text = '\n'.join(row_to_spec(model[it]) for it in treeiters)
            buf.set_text('Selected templates:\n' + text)
        else:
            itr = treeiters[0]
            tpl = Template(*model[itr])
            text = 'Name: %s\n\nDescription:\n%s' % (tpl.name, tpl.description)
            buf.set_text(text)

    def entry_edit(self, widget, path, text):
        """Store a new status chosen in the "Status" combo column.

        The row is drawn bold while its status differs from its default
        status and in normal weight otherwise.

        Args:
            widget: Unused; the renderer that emitted the signal.
            path: Path of the edited row in the store.
            text: The newly chosen status.
        """
        _ = widget # unused
        #pylint: disable=unsubscriptable-object
        tpl = Template(*self.store[path])
        if text == tpl.default_status:
            weight = Pango.Weight.BOOK
        else:
            weight = Pango.Weight.BOLD
        #pylint: disable=unsupported-assignment-operation
        self.store[path] = tpl._replace(status=text, weight=weight)
if __name__ == '__main__':
    # Create the window, end the GTK main loop when it is destroyed, start
    # the first template listing and then enter the main loop.
    app_window = QubesTemplateApp()
    app_window.connect('destroy', Gtk.main_quit)
    app_window.show_all()
    app_window.refresh()
    Gtk.main()
|