diff --git a/qubesadmin/tools/qvm_template.py b/qubesadmin/tools/qvm_template.py index 61925b3..ca3818b 100644 --- a/qubesadmin/tools/qvm_template.py +++ b/qubesadmin/tools/qvm_template.py @@ -8,6 +8,7 @@ import fnmatch import functools import itertools import os +import re import shutil import subprocess import sys @@ -167,7 +168,10 @@ def install(args, app, version_selector=VersionSelector.LATEST, dl_list = get_dl_list(args, app, version_selector=version_selector) dl_list_copy = dl_list.copy() # Verify that the templates are not yet installed + # TODO: Check that installed versions satisfy + # the {reinstall,{up,down}grade} requirements for name, (ver, dlsize, reponame) in dl_list.items(): + assert reponame != '@commandline' if not ignore_existing and name in app.domains: print(('Template \'%s\' already installed, skipping...' ' (You may want to use the {reinstall,upgrade,downgrade}' @@ -185,7 +189,6 @@ def install(args, app, version_selector=VersionSelector.LATEST, dl_list=dl_list, suffix=UNVERIFIED_SUFFIX, version_selector=version_selector) - # XXX: Verify if package name is what we want? for rpmfile, dlsize, reponame in rpm_list: if reponame != '@commandline': path = rpmfile + UNVERIFIED_SUFFIX @@ -231,18 +234,27 @@ def install(args, app, version_selector=VersionSelector.LATEST, app.domains.refresh_cache(force=True) tpl = app.domains[name] + tpl.features['template-name'] = name tpl.features['template-epoch'] = \ package_hdr[rpm.RPMTAG_EPOCHNUM] tpl.features['template-version'] = \ package_hdr[rpm.RPMTAG_VERSION] tpl.features['template-release'] = \ package_hdr[rpm.RPMTAG_RELEASE] - tpl.features['template-install-date'] = \ - str(datetime.datetime.today()) - tpl.features['template-name'] = name tpl.features['template-reponame'] = reponame + tpl.features['template-buildtime'] = \ + str(datetime.datetime.fromtimestamp( + int(package_hdr[rpm.RPMTAG_BUILDTIME]))) + tpl.features['template-install-time'] = \ + str(datetime.datetime.today()) + tpl.features['template-license'] = \ + package_hdr[rpm.RPMTAG_LICENSE] + tpl.features['template-url'] = \ + package_hdr[rpm.RPMTAG_URL] tpl.features['template-summary'] = \ package_hdr[rpm.RPMTAG_SUMMARY] + tpl.features['template-description'] = \ + package_hdr[rpm.RPMTAG_DESCRIPTION].replace('\n', '|') finally: os.remove(LOCK_FILE) @@ -297,18 +309,52 @@ def qrexec_repoquery(args, app, spec='*'): for line in stderr.decode('ASCII').rstrip().split('\n'): print('[Qrexec] %s' % line, file=sys.stderr) raise ConnectionError("qrexec call 'qubes.TemplateSearch' failed.") + name_re = re.compile(r'^[A-Za-z0-9._+\-]*$') + evr_re = re.compile(r'^[A-Za-z0-9._+~]*$') + date_re = re.compile(r'^\d+-\d+-\d+ \d+:\d+$') + license_re = re.compile(r'^[A-Za-z0-9._+\-()]*$') result = [] - for line in stdout.strip().split('\n'): - # Make sure that there are at most 7 fields - # As there may be colons in the summary - entry = line.split(':', 7 - 1) - # If there are not enough entries, raise an error - if len(entry) < 7: - raise ConnectionError("qrexec call 'qubes.TemplateSearch' failed.") - if not entry[0].startswith(PACKAGE_NAME_PREFIX): + for line in stdout.split('|\n'): + # Note that there's an empty entry at the end as .strip() is not used. + # This is because if .strip() is used, the .split() will not work. + if line == '': continue - entry[0] = entry[0][len(PACKAGE_NAME_PREFIX):] - result.append(tuple(entry)) + entry = line.split('|') + try: + # If there is an incorrect number of entries, raise an error + name, epoch, version, release, reponame, dlsize, \ + buildtime, license, url, summary, description = entry + + # Ignore packages that are not templates + if not name.startswith(PACKAGE_NAME_PREFIX): + continue + name = name[len(PACKAGE_NAME_PREFIX):] + + # Check that the values make sense + if not re.fullmatch(name_re, name): + raise ValueError + for val in [epoch, version, release]: + if not re.fullmatch(evr_re, val): + raise ValueError + if not re.fullmatch(name_re, reponame): + raise ValueError + dlsize = int(dlsize) + # First verify that the date does not look weird, then parse it + if not re.fullmatch(date_re, buildtime): + raise ValueError + buildtime = datetime.datetime.strptime(buildtime, '%Y-%m-%d %H:%M') + # XXX: Perhaps whitelist licenses directly? + if not re.fullmatch(license_re, license): + raise ValueError + # Check name actually matches spec + if not is_match_spec(name, epoch, version, release, spec): + continue + + result.append((name, epoch, version, release, reponame, dlsize, + buildtime, license, url, summary, description)) + except (TypeError, ValueError): + raise ConnectionError(("qrexec call 'qubes.TemplateSearch' failed:" + " unexpected data format.")) return result def qrexec_download(args, app, spec, path, dlsize=None): @@ -360,22 +406,35 @@ def is_match_spec(name, epoch, version, release, spec): def list_templates(args, app, operation): tpl_list = [] - def append_list(data, status): + def append_list(data, status, install_time=None): + _ = install_time # unused #pylint: disable=unused-variable - name, epoch, version, release, reponame, dlsize, summary = data + name, epoch, version, release, reponame, dlsize, \ + buildtime, license, url, summary, description = data version_str = build_version_str((epoch, version, release)) tpl_list.append((status, name, version_str, reponame)) - def append_info(data, status): - name, epoch, version, release, reponame, dlsize, summary = data + def append_info(data, status, install_time=None): + name, epoch, version, release, reponame, dlsize, \ + buildtime, license, url, summary, description = data tpl_list.append((status, 'Name', ':', name)) tpl_list.append((status, 'Epoch', ':', epoch)) tpl_list.append((status, 'Version', ':', version)) tpl_list.append((status, 'Release', ':', release)) tpl_list.append((status, 'Size', ':', - qubesadmin.utils.size_to_human(int(dlsize)))) + qubesadmin.utils.size_to_human(dlsize))) tpl_list.append((status, 'Repository', ':', reponame)) + tpl_list.append((status, 'Buildtime', ':', str(buildtime))) + if install_time: + tpl_list.append((status, 'Install time', ':', str(install_time))) + tpl_list.append((status, 'URL', ':', url)) + tpl_list.append((status, 'License', ':', license)) tpl_list.append((status, 'Summary', ':', summary)) + # Only show "Description" for the first line + title = 'Description' + for line in description.splitlines(): + tpl_list.append((status, title, ':', line)) + title = '' tpl_list.append((status, ' ', ' ', ' ')) # empty line if operation == 'list': @@ -388,13 +447,20 @@ def list_templates(args, app, operation): def append_vm(vm, status): if vm.name == vm.features['template-name']: append(( - vm.features['template-name'], - vm.features['template-epoch'], - vm.features['template-version'], - vm.features['template-release'], - vm.features['template-reponame'], - vm.get_disk_utilization(), - vm.features['template-summary']), status) + vm.features['template-name'], + vm.features['template-epoch'], + vm.features['template-version'], + vm.features['template-release'], + vm.features['template-reponame'], + vm.get_disk_utilization(), + vm.features['template-buildtime'], + vm.features['template-license'], + vm.features['template-url'], + vm.features['template-summary'], + vm.features['template-description'].replace('|', '\n') + ), + status, + vm.features['template-install-time']) if not (args.installed or args.available or args.extras or args.upgrades): args.all = True @@ -428,8 +494,8 @@ def list_templates(args, app, operation): if args.extras: remote = set() #pylint: disable=unused-variable - for name, epoch, version, release, reponame, dlsize, summary \ - in query_res: + for name, epoch, version, release, reponame, dlsize, \ + buildtime, license, url, summary, description in query_res: remote.add(name) for vm in app.domains: if 'template-name' in vm.features and \ @@ -445,7 +511,8 @@ def list_templates(args, app, operation): vm.features['template-version'], vm.features['template-release']) for data in query_res: - name, epoch, version, release, reponame, dlsize, summary = data + name, epoch, version, release, reponame, dlsize, \ + buildtime, license, url, summary, description = data if name in local: if rpm.labelCompare(local[name], (epoch, version, release)) < 0: append(data, TemplateState.UPGRADABLE) @@ -469,7 +536,12 @@ def search(args, app): vm.features['template-release'], vm.features['template-reponame'], vm.get_disk_utilization(), - vm.features['template-summary'])) + datetime.datetime.fromisoformat( + vm.features['template-buildtime']), + vm.features['template-license'], + vm.features['template-url'], + vm.features['template-summary'], + vm.features['template-description'])) # Get latest version for each template query_res_tmp = [] @@ -480,24 +552,36 @@ def search(args, app): query_res = query_res_tmp #pylint: disable=invalid-name - WEIGHT_NAME_EXACT = 1 << 3 - WEIGHT_NAME = 1 << 2 - WEIGHT_SUMMARY = 1 << 1 + WEIGHT_NAME_EXACT = 1 << 4 + WEIGHT_NAME = 1 << 3 + WEIGHT_SUMMARY = 1 << 2 + WEIGHT_DESCRIPTION = 1 << 1 + WEIGHT_URL = 1 << 0 + + WEIGHT_TO_FIELD = [ + (WEIGHT_NAME_EXACT, 'Name'), + (WEIGHT_NAME, 'Name'), + (WEIGHT_SUMMARY, 'Summary'), + (WEIGHT_DESCRIPTION, 'Description'), + (WEIGHT_URL, 'URL')] search_res = collections.defaultdict(list) for keyword in args.templates: #pylint: disable=unused-variable - for idx, (name, epoch, version, release, reponame, dlsize, summary) \ + for idx, (name, epoch, version, release, reponame, dlsize, \ + buildtime, license, url, summary, description) \ in enumerate(query_res): - if fnmatch.fnmatch(name, '*' + keyword + '*'): - exact = keyword == name - weight = WEIGHT_NAME_EXACT if exact else WEIGHT_NAME - search_res[idx].append((weight, keyword, exact)) - if fnmatch.fnmatch(summary, '*' + keyword + '*'): - search_res[idx].append( - (WEIGHT_SUMMARY, keyword, keyword == summary)) + needles = [(name, WEIGHT_NAME), (summary, WEIGHT_SUMMARY)] + if args.all: + needles += \ + [(description, WEIGHT_DESCRIPTION), (url, WEIGHT_URL)] + for key, weight in needles: + if fnmatch.fnmatch(key, '*' + keyword + '*'): + exact = keyword == key + if exact and weight == WEIGHT_NAME: + weight = WEIGHT_NAME_EXACT + search_res[idx].append((weight, keyword, exact)) - # TODO: Search in description and URL for --all? # Requires changes to the qrexec call qubes.TemplateSearch if not args.all: keywords = set(args.templates) @@ -517,14 +601,13 @@ def search(args, app): def gen_header(idx, needles): #pylint: disable=unused-variable - name, epoch, version, release, reponame, dlsize, summary = \ - query_res[idx] + name, epoch, version, release, reponame, dlsize, \ + buildtime, license, url, summary, description = query_res[idx] fields = [] weight_types = set(x[0] for x in needles) - if WEIGHT_NAME in weight_types: - fields.append('Name') - if WEIGHT_SUMMARY in weight_types: - fields.append('Summary') + for weight, field in WEIGHT_TO_FIELD: + if weight in weight_types: + fields.append(field) exact = all(x[-1] for x in needles) match = 'Exactly Matched' if exact else 'Matched' keywords = sorted(list(set(x[1] for x in needles))) @@ -538,8 +621,8 @@ def search(args, app): last_header = cur_header # XXX: The style is different from that of DNF print('===', cur_header, '===') - name, epoch, version, release, reponame, dlsize, summary = \ - query_res[idx] + name, epoch, version, release, reponame, dlsize, \ + buildtime, license, url, summary, description = query_res[idx] print(name, ':', summary) def get_dl_list(args, app, version_selector=VersionSelector.LATEST): @@ -557,14 +640,13 @@ def get_dl_list(args, app, version_selector=VersionSelector.LATEST): # We only select one package for each distinct package name #pylint: disable=unused-variable - for name, epoch, version, release, reponame, dlsize, summary \ - in query_res: - assert reponame != '@commandline' + for name, epoch, version, release, reponame, dlsize, \ + buildtime, license, url, summary, description in query_res: ver = (epoch, version, release) if version_selector == VersionSelector.LATEST: if name not in candid \ or rpm.labelCompare(candid[name][0], ver) < 0: - candid[name] = (ver, int(dlsize), reponame) + candid[name] = (ver, dlsize, reponame) elif version_selector == VersionSelector.REINSTALL: if name not in app.domains: parser.error("Template '%s' not installed." % name) @@ -574,7 +656,7 @@ def get_dl_list(args, app, version_selector=VersionSelector.LATEST): vm.features['template-version'], vm.features['template-release']) if rpm.labelCompare(ver, cur_ver) == 0: - candid[name] = (ver, int(dlsize), reponame) + candid[name] = (ver, dlsize, reponame) elif version_selector in [VersionSelector.LATEST_LOWER, VersionSelector.LATEST_HIGHER]: if name not in app.domains: @@ -590,7 +672,7 @@ def get_dl_list(args, app, version_selector=VersionSelector.LATEST): if rpm.labelCompare(ver, cur_ver) == cmp_res: if name not in candid \ or rpm.labelCompare(candid[name][0], ver) < 0: - candid[name] = (ver, int(dlsize), reponame) + candid[name] = (ver, dlsize, reponame) if len(candid) == 0: if version_selector == VersionSelector.LATEST: