qvm-template: Incorporate additional metadata in qubes.TemplateSearch.

This commit is contained in:
WillyPillow 2020-07-29 18:36:02 +08:00
parent 421dd74dd2
commit 88ee572cac

View File

@ -8,6 +8,7 @@ import fnmatch
import functools
import itertools
import os
import re
import shutil
import subprocess
import sys
@ -167,7 +168,10 @@ def install(args, app, version_selector=VersionSelector.LATEST,
dl_list = get_dl_list(args, app, version_selector=version_selector)
dl_list_copy = dl_list.copy()
# Verify that the templates are not yet installed
# TODO: Check that installed versions satisfy
# the {reinstall,{up,down}grade} requirements
for name, (ver, dlsize, reponame) in dl_list.items():
assert reponame != '@commandline'
if not ignore_existing and name in app.domains:
print(('Template \'%s\' already installed, skipping...'
' (You may want to use the {reinstall,upgrade,downgrade}'
@ -185,7 +189,6 @@ def install(args, app, version_selector=VersionSelector.LATEST,
dl_list=dl_list, suffix=UNVERIFIED_SUFFIX,
version_selector=version_selector)
# XXX: Verify if package name is what we want?
for rpmfile, dlsize, reponame in rpm_list:
if reponame != '@commandline':
path = rpmfile + UNVERIFIED_SUFFIX
@ -231,18 +234,27 @@ def install(args, app, version_selector=VersionSelector.LATEST,
app.domains.refresh_cache(force=True)
tpl = app.domains[name]
tpl.features['template-name'] = name
tpl.features['template-epoch'] = \
package_hdr[rpm.RPMTAG_EPOCHNUM]
tpl.features['template-version'] = \
package_hdr[rpm.RPMTAG_VERSION]
tpl.features['template-release'] = \
package_hdr[rpm.RPMTAG_RELEASE]
tpl.features['template-install-date'] = \
str(datetime.datetime.today())
tpl.features['template-name'] = name
tpl.features['template-reponame'] = reponame
tpl.features['template-buildtime'] = \
str(datetime.datetime.fromtimestamp(
int(package_hdr[rpm.RPMTAG_BUILDTIME])))
tpl.features['template-install-time'] = \
str(datetime.datetime.today())
tpl.features['template-license'] = \
package_hdr[rpm.RPMTAG_LICENSE]
tpl.features['template-url'] = \
package_hdr[rpm.RPMTAG_URL]
tpl.features['template-summary'] = \
package_hdr[rpm.RPMTAG_SUMMARY]
tpl.features['template-description'] = \
package_hdr[rpm.RPMTAG_DESCRIPTION].replace('\n', '|')
finally:
os.remove(LOCK_FILE)
@ -297,18 +309,52 @@ def qrexec_repoquery(args, app, spec='*'):
for line in stderr.decode('ASCII').rstrip().split('\n'):
print('[Qrexec] %s' % line, file=sys.stderr)
raise ConnectionError("qrexec call 'qubes.TemplateSearch' failed.")
name_re = re.compile(r'^[A-Za-z0-9._+\-]*$')
evr_re = re.compile(r'^[A-Za-z0-9._+~]*$')
date_re = re.compile(r'^\d+-\d+-\d+ \d+:\d+$')
license_re = re.compile(r'^[A-Za-z0-9._+\-()]*$')
result = []
for line in stdout.strip().split('\n'):
# Make sure that there are at most 7 fields
# As there may be colons in the summary
entry = line.split(':', 7 - 1)
# If there are not enough entries, raise an error
if len(entry) < 7:
raise ConnectionError("qrexec call 'qubes.TemplateSearch' failed.")
if not entry[0].startswith(PACKAGE_NAME_PREFIX):
for line in stdout.split('|\n'):
# Note that there's an empty entry at the end as .strip() is not used.
# This is because if .strip() is used, the .split() will not work.
if line == '':
continue
entry[0] = entry[0][len(PACKAGE_NAME_PREFIX):]
result.append(tuple(entry))
entry = line.split('|')
try:
# If there is an incorrect number of entries, raise an error
name, epoch, version, release, reponame, dlsize, \
buildtime, license, url, summary, description = entry
# Ignore packages that are not templates
if not name.startswith(PACKAGE_NAME_PREFIX):
continue
name = name[len(PACKAGE_NAME_PREFIX):]
# Check that the values make sense
if not re.fullmatch(name_re, name):
raise ValueError
for val in [epoch, version, release]:
if not re.fullmatch(evr_re, val):
raise ValueError
if not re.fullmatch(name_re, reponame):
raise ValueError
dlsize = int(dlsize)
# First verify that the date does not look weird, then parse it
if not re.fullmatch(date_re, buildtime):
raise ValueError
buildtime = datetime.datetime.strptime(buildtime, '%Y-%m-%d %H:%M')
# XXX: Perhaps whitelist licenses directly?
if not re.fullmatch(license_re, license):
raise ValueError
# Check name actually matches spec
if not is_match_spec(name, epoch, version, release, spec):
continue
result.append((name, epoch, version, release, reponame, dlsize,
buildtime, license, url, summary, description))
except (TypeError, ValueError):
raise ConnectionError(("qrexec call 'qubes.TemplateSearch' failed:"
" unexpected data format."))
return result
def qrexec_download(args, app, spec, path, dlsize=None):
@ -360,22 +406,35 @@ def is_match_spec(name, epoch, version, release, spec):
def list_templates(args, app, operation):
tpl_list = []
def append_list(data, status):
def append_list(data, status, install_time=None):
_ = install_time # unused
#pylint: disable=unused-variable
name, epoch, version, release, reponame, dlsize, summary = data
name, epoch, version, release, reponame, dlsize, \
buildtime, license, url, summary, description = data
version_str = build_version_str((epoch, version, release))
tpl_list.append((status, name, version_str, reponame))
def append_info(data, status):
name, epoch, version, release, reponame, dlsize, summary = data
def append_info(data, status, install_time=None):
name, epoch, version, release, reponame, dlsize, \
buildtime, license, url, summary, description = data
tpl_list.append((status, 'Name', ':', name))
tpl_list.append((status, 'Epoch', ':', epoch))
tpl_list.append((status, 'Version', ':', version))
tpl_list.append((status, 'Release', ':', release))
tpl_list.append((status, 'Size', ':',
qubesadmin.utils.size_to_human(int(dlsize))))
qubesadmin.utils.size_to_human(dlsize)))
tpl_list.append((status, 'Repository', ':', reponame))
tpl_list.append((status, 'Buildtime', ':', str(buildtime)))
if install_time:
tpl_list.append((status, 'Install time', ':', str(install_time)))
tpl_list.append((status, 'URL', ':', url))
tpl_list.append((status, 'License', ':', license))
tpl_list.append((status, 'Summary', ':', summary))
# Only show "Description" for the first line
title = 'Description'
for line in description.splitlines():
tpl_list.append((status, title, ':', line))
title = ''
tpl_list.append((status, ' ', ' ', ' ')) # empty line
if operation == 'list':
@ -394,7 +453,14 @@ def list_templates(args, app, operation):
vm.features['template-release'],
vm.features['template-reponame'],
vm.get_disk_utilization(),
vm.features['template-summary']), status)
vm.features['template-buildtime'],
vm.features['template-license'],
vm.features['template-url'],
vm.features['template-summary'],
vm.features['template-description'].replace('|', '\n')
),
status,
vm.features['template-install-time'])
if not (args.installed or args.available or args.extras or args.upgrades):
args.all = True
@ -428,8 +494,8 @@ def list_templates(args, app, operation):
if args.extras:
remote = set()
#pylint: disable=unused-variable
for name, epoch, version, release, reponame, dlsize, summary \
in query_res:
for name, epoch, version, release, reponame, dlsize, \
buildtime, license, url, summary, description in query_res:
remote.add(name)
for vm in app.domains:
if 'template-name' in vm.features and \
@ -445,7 +511,8 @@ def list_templates(args, app, operation):
vm.features['template-version'],
vm.features['template-release'])
for data in query_res:
name, epoch, version, release, reponame, dlsize, summary = data
name, epoch, version, release, reponame, dlsize, \
buildtime, license, url, summary, description = data
if name in local:
if rpm.labelCompare(local[name], (epoch, version, release)) < 0:
append(data, TemplateState.UPGRADABLE)
@ -469,7 +536,12 @@ def search(args, app):
vm.features['template-release'],
vm.features['template-reponame'],
vm.get_disk_utilization(),
vm.features['template-summary']))
datetime.datetime.fromisoformat(
vm.features['template-buildtime']),
vm.features['template-license'],
vm.features['template-url'],
vm.features['template-summary'],
vm.features['template-description']))
# Get latest version for each template
query_res_tmp = []
@ -480,24 +552,36 @@ def search(args, app):
query_res = query_res_tmp
#pylint: disable=invalid-name
WEIGHT_NAME_EXACT = 1 << 3
WEIGHT_NAME = 1 << 2
WEIGHT_SUMMARY = 1 << 1
WEIGHT_NAME_EXACT = 1 << 4
WEIGHT_NAME = 1 << 3
WEIGHT_SUMMARY = 1 << 2
WEIGHT_DESCRIPTION = 1 << 1
WEIGHT_URL = 1 << 0
WEIGHT_TO_FIELD = [
(WEIGHT_NAME_EXACT, 'Name'),
(WEIGHT_NAME, 'Name'),
(WEIGHT_SUMMARY, 'Summary'),
(WEIGHT_DESCRIPTION, 'Description'),
(WEIGHT_URL, 'URL')]
search_res = collections.defaultdict(list)
for keyword in args.templates:
#pylint: disable=unused-variable
for idx, (name, epoch, version, release, reponame, dlsize, summary) \
for idx, (name, epoch, version, release, reponame, dlsize, \
buildtime, license, url, summary, description) \
in enumerate(query_res):
if fnmatch.fnmatch(name, '*' + keyword + '*'):
exact = keyword == name
weight = WEIGHT_NAME_EXACT if exact else WEIGHT_NAME
needles = [(name, WEIGHT_NAME), (summary, WEIGHT_SUMMARY)]
if args.all:
needles += \
[(description, WEIGHT_DESCRIPTION), (url, WEIGHT_URL)]
for key, weight in needles:
if fnmatch.fnmatch(key, '*' + keyword + '*'):
exact = keyword == key
if exact and weight == WEIGHT_NAME:
weight = WEIGHT_NAME_EXACT
search_res[idx].append((weight, keyword, exact))
if fnmatch.fnmatch(summary, '*' + keyword + '*'):
search_res[idx].append(
(WEIGHT_SUMMARY, keyword, keyword == summary))
# TODO: Search in description and URL for --all?
# Requires changes to the qrexec call qubes.TemplateSearch
if not args.all:
keywords = set(args.templates)
@ -517,14 +601,13 @@ def search(args, app):
def gen_header(idx, needles):
#pylint: disable=unused-variable
name, epoch, version, release, reponame, dlsize, summary = \
query_res[idx]
name, epoch, version, release, reponame, dlsize, \
buildtime, license, url, summary, description = query_res[idx]
fields = []
weight_types = set(x[0] for x in needles)
if WEIGHT_NAME in weight_types:
fields.append('Name')
if WEIGHT_SUMMARY in weight_types:
fields.append('Summary')
for weight, field in WEIGHT_TO_FIELD:
if weight in weight_types:
fields.append(field)
exact = all(x[-1] for x in needles)
match = 'Exactly Matched' if exact else 'Matched'
keywords = sorted(list(set(x[1] for x in needles)))
@ -538,8 +621,8 @@ def search(args, app):
last_header = cur_header
# XXX: The style is different from that of DNF
print('===', cur_header, '===')
name, epoch, version, release, reponame, dlsize, summary = \
query_res[idx]
name, epoch, version, release, reponame, dlsize, \
buildtime, license, url, summary, description = query_res[idx]
print(name, ':', summary)
def get_dl_list(args, app, version_selector=VersionSelector.LATEST):
@ -557,14 +640,13 @@ def get_dl_list(args, app, version_selector=VersionSelector.LATEST):
# We only select one package for each distinct package name
#pylint: disable=unused-variable
for name, epoch, version, release, reponame, dlsize, summary \
in query_res:
assert reponame != '@commandline'
for name, epoch, version, release, reponame, dlsize, \
buildtime, license, url, summary, description in query_res:
ver = (epoch, version, release)
if version_selector == VersionSelector.LATEST:
if name not in candid \
or rpm.labelCompare(candid[name][0], ver) < 0:
candid[name] = (ver, int(dlsize), reponame)
candid[name] = (ver, dlsize, reponame)
elif version_selector == VersionSelector.REINSTALL:
if name not in app.domains:
parser.error("Template '%s' not installed." % name)
@ -574,7 +656,7 @@ def get_dl_list(args, app, version_selector=VersionSelector.LATEST):
vm.features['template-version'],
vm.features['template-release'])
if rpm.labelCompare(ver, cur_ver) == 0:
candid[name] = (ver, int(dlsize), reponame)
candid[name] = (ver, dlsize, reponame)
elif version_selector in [VersionSelector.LATEST_LOWER,
VersionSelector.LATEST_HIGHER]:
if name not in app.domains:
@ -590,7 +672,7 @@ def get_dl_list(args, app, version_selector=VersionSelector.LATEST):
if rpm.labelCompare(ver, cur_ver) == cmp_res:
if name not in candid \
or rpm.labelCompare(candid[name][0], ver) < 0:
candid[name] = (ver, int(dlsize), reponame)
candid[name] = (ver, dlsize, reponame)
if len(candid) == 0:
if version_selector == VersionSelector.LATEST: