qvm-template: Switch to namedtuples and other slight cleanup.

This commit is contained in:
WillyPillow 2020-08-01 02:24:29 +08:00
parent 3ada7af0eb
commit 233e411c2f
No known key found for this signature in database
GPG Key ID: 3839E194B1415A9C

View File

@ -103,6 +103,50 @@ class VersionSelector(enum.Enum):
LATEST_LOWER = enum.auto()
LATEST_HIGHER = enum.auto()
Template = collections.namedtuple('Template', [
'name',
'epoch',
'version',
'release',
'reponame',
'dlsize',
'buildtime',
'licence',
'url',
'summary',
'description'
])
DlEntry = collections.namedtuple('DlEntry', [
'evr',
'reponame',
'dlsize'
])
def query_local(vm):
return Template(
vm.features['template-name'],
vm.features['template-epoch'],
vm.features['template-version'],
vm.features['template-release'],
vm.features['template-reponame'],
vm.get_disk_utilization(),
vm.features['template-buildtime'],
vm.features['template-license'],
vm.features['template-url'],
vm.features['template-summary'],
vm.features['template-description'].replace('|', '\n'))
def query_local_evr(vm):
return (
vm.features['template-epoch'],
vm.features['template-version'],
vm.features['template-release'])
def is_managed_template(vm):
return 'template-name' in vm.features \
and vm.name == vm.features['template-name']
# NOTE: Verifying RPMs this way is prone to TOCTOU. This is okay for local
# files, but may create problems if multiple instances of `qvm-template` are
# downloading the same file, so a lock is needed in that case.
@ -159,7 +203,7 @@ def install(args, app, version_selector=VersionSelector.LATEST,
try:
transaction_set = rpm.TransactionSet()
rpm_list = []
rpm_list = [] # rpmfile, dlsize, reponame
for template in args.templates:
if template.endswith('.rpm'):
if not os.path.exists(template):
@ -172,25 +216,27 @@ def install(args, app, version_selector=VersionSelector.LATEST,
dl_list = get_dl_list(args, app, version_selector=version_selector)
dl_list_copy = dl_list.copy()
# Verify that the templates are not yet installed
for name, (ver, dlsize, reponame) in dl_list.items():
assert reponame != '@commandline'
for name, entry in dl_list.items():
# Should be ensured by checks in repoquery
assert entry.reponame != '@commandline'
if not override_existing and name in app.domains:
print(('Template \'%s\' already installed, skipping...'
' (You may want to use the {reinstall,upgrade,downgrade}'
' operations.)') % name, file=sys.stderr)
del dl_list_copy[name]
else:
version_str = build_version_str(ver)
version_str = build_version_str(entry.evr)
target_file = \
'%s%s-%s.rpm' % (PACKAGE_NAME_PREFIX, name, version_str)
rpm_list.append((os.path.join(args.cachedir, target_file),
dlsize, reponame))
entry.dlsize, entry.reponame))
dl_list = dl_list_copy
download(args, app, path_override=args.cachedir,
dl_list=dl_list, suffix=UNVERIFIED_SUFFIX,
version_selector=version_selector)
# Verify package and remove unverified suffix
for rpmfile, dlsize, reponame in rpm_list:
if reponame != '@commandline':
path = rpmfile + UNVERIFIED_SUFFIX
@ -201,6 +247,7 @@ def install(args, app, version_selector=VersionSelector.LATEST,
if reponame != '@commandline':
os.rename(path, rpmfile)
# Unpack and install
for rpmfile, dlsize, reponame in rpm_list:
with tempfile.TemporaryDirectory(dir=TEMP_DIR) as target:
package_hdr = get_package_hdr(rpmfile)
@ -229,10 +276,7 @@ def install(args, app, version_selector=VersionSelector.LATEST,
str(package_hdr[rpm.RPMTAG_EPOCHNUM]),
package_hdr[rpm.RPMTAG_VERSION],
package_hdr[rpm.RPMTAG_RELEASE])
vm_evr = (
vm.features['template-epoch'],
vm.features['template-version'],
vm.features['template-release'])
vm_evr = query_local_evr(vm)
cmp_res = rpm.labelCompare(pkg_evr, vm_evr)
if version_selector == VersionSelector.REINSTALL \
and cmp_res != 0:
@ -311,6 +355,8 @@ def qrexec_popen(args, app, service, stdout=subprocess.PIPE, filter_esc=True):
def qrexec_payload(args, app, spec, refresh):
_ = app # unused
# TODO: Check that spec != '---'
def check_newline(string, name):
if '\n' in string:
parser.error(f"Malformed {name}:" +
@ -360,6 +406,8 @@ def qrexec_repoquery(args, app, spec='*', refresh=False):
entry = line.split('|')
try:
# If there is an incorrect number of entries, raise an error
# Unpack manually instead of stuffing into `Template` right away
# so that it's easier to mutate stuff.
name, epoch, version, release, reponame, dlsize, \
buildtime, licence, url, summary, description = entry
@ -388,8 +436,8 @@ def qrexec_repoquery(args, app, spec='*', refresh=False):
if not is_match_spec(name, epoch, version, release, spec):
continue
result.append((name, epoch, version, release, reponame, dlsize,
buildtime, licence, url, summary, description))
result.append(Template(name, epoch, version, release, reponame,
dlsize, buildtime, licence, url, summary, description))
except (TypeError, ValueError):
raise ConnectionError(("qrexec call 'qubes.TemplateSearch' failed:"
" unexpected data format."))
@ -446,31 +494,27 @@ def list_templates(args, app, operation):
def append_list(data, status, install_time=None):
_ = install_time # unused
#pylint: disable=unused-variable
name, epoch, version, release, reponame, dlsize, \
buildtime, licence, url, summary, description = data
version_str = build_version_str((epoch, version, release))
tpl_list.append((status, name, version_str, reponame))
version_str = build_version_str(
(data.epoch, data.version, data.release))
tpl_list.append((status, data.name, version_str, data.reponame))
def append_info(data, status, install_time=None):
name, epoch, version, release, reponame, dlsize, \
buildtime, licence, url, summary, description = data
tpl_list.append((status, 'Name', ':', name))
tpl_list.append((status, 'Epoch', ':', epoch))
tpl_list.append((status, 'Version', ':', version))
tpl_list.append((status, 'Release', ':', release))
tpl_list.append((status, 'Name', ':', data.name))
tpl_list.append((status, 'Epoch', ':', data.epoch))
tpl_list.append((status, 'Version', ':', data.version))
tpl_list.append((status, 'Release', ':', data.release))
tpl_list.append((status, 'Size', ':',
qubesadmin.utils.size_to_human(dlsize)))
tpl_list.append((status, 'Repository', ':', reponame))
tpl_list.append((status, 'Buildtime', ':', str(buildtime)))
qubesadmin.utils.size_to_human(data.dlsize)))
tpl_list.append((status, 'Repository', ':', data.reponame))
tpl_list.append((status, 'Buildtime', ':', str(data.buildtime)))
if install_time:
tpl_list.append((status, 'Install time', ':', str(install_time)))
tpl_list.append((status, 'URL', ':', url))
tpl_list.append((status, 'License', ':', licence))
tpl_list.append((status, 'Summary', ':', summary))
tpl_list.append((status, 'URL', ':', data.url))
tpl_list.append((status, 'License', ':', data.licence))
tpl_list.append((status, 'Summary', ':', data.summary))
# Only show "Description" for the first line
title = 'Description'
for line in description.splitlines():
for line in data.description.splitlines():
tpl_list.append((status, title, ':', line))
title = ''
tpl_list.append((status, ' ', ' ', ' ')) # empty line
@ -483,22 +527,7 @@ def list_templates(args, app, operation):
assert False and 'Unknown operation'
def append_vm(vm, status):
if vm.name == vm.features['template-name']:
append((
vm.features['template-name'],
vm.features['template-epoch'],
vm.features['template-version'],
vm.features['template-release'],
vm.features['template-reponame'],
vm.get_disk_utilization(),
vm.features['template-buildtime'],
vm.features['template-license'],
vm.features['template-url'],
vm.features['template-summary'],
vm.features['template-description'].replace('|', '\n')
),
status,
vm.features['template-install-time'])
append(query_local(vm), status, vm.features['template-install-time'])
if not (args.installed or args.available or args.extras or args.upgrades):
args.all = True
@ -514,13 +543,11 @@ def list_templates(args, app, operation):
if args.installed or args.all:
for vm in app.domains:
if 'template-name' in vm.features:
if is_managed_template(vm):
if not args.templates or \
any(is_match_spec(
vm.features['template-name'],
vm.features['template-epoch'],
vm.features['template-version'],
vm.features['template-release'],
*query_local_evr(vm),
spec)[0]
for spec in args.templates):
append_vm(vm, TemplateState.INSTALLED)
@ -531,29 +558,23 @@ def list_templates(args, app, operation):
if args.extras:
remote = set()
#pylint: disable=unused-variable
for name, epoch, version, release, reponame, dlsize, \
buildtime, licence, url, summary, description in query_res:
remote.add(name)
for data in query_res:
remote.add(data.name)
for vm in app.domains:
if 'template-name' in vm.features and \
if is_managed_template(vm) and \
vm.features['template-name'] not in remote:
append_vm(vm, TemplateState.EXTRA)
if args.upgrades:
local = {}
for vm in app.domains:
if 'template-name' in vm.features:
local[vm.features['template-name']] = (
vm.features['template-epoch'],
vm.features['template-version'],
vm.features['template-release'])
for data in query_res:
name, epoch, version, release, reponame, dlsize, \
buildtime, licence, url, summary, description = data
if name in local:
if rpm.labelCompare(local[name], (epoch, version, release)) < 0:
append(data, TemplateState.UPGRADABLE)
if is_managed_template(vm):
local[vm.features['template-name']] = query_local_evr(vm)
for entry in query_res:
if entry.name in local:
if rpm.labelCompare(local[entry.name],
(entry.epoch, entry.version, entry.release)) < 0:
append(entry, TemplateState.UPGRADABLE)
if len(tpl_list) == 0:
parser.error('No matching templates to list')
@ -566,24 +587,12 @@ def search(args, app):
# Search in both installed and available templates
query_res = qrexec_repoquery(args, app)
for vm in app.domains:
if 'template-name' in vm.features:
query_res.append((
vm.features['template-name'],
vm.features['template-epoch'],
vm.features['template-version'],
vm.features['template-release'],
vm.features['template-reponame'],
vm.get_disk_utilization(),
datetime.datetime.fromisoformat(
vm.features['template-buildtime']),
vm.features['template-license'],
vm.features['template-url'],
vm.features['template-summary'],
vm.features['template-description']))
if is_managed_template(vm):
query_res.append(query_local(vm))
# Get latest version for each template
query_res_tmp = []
for name, grp in itertools.groupby(query_res, lambda x: x[0]):
for name, grp in itertools.groupby(sorted(query_res), lambda x: x[0]):
def compare(lhs, rhs):
return lhs if rpm.labelCompare(lhs[1:4], rhs[1:4]) < 0 else rhs
query_res_tmp.append(functools.reduce(compare, grp))
@ -605,14 +614,12 @@ def search(args, app):
search_res = collections.defaultdict(list)
for keyword in args.templates:
#pylint: disable=unused-variable
for idx, (name, epoch, version, release, reponame, dlsize, \
buildtime, licence, url, summary, description) \
in enumerate(query_res):
needles = [(name, WEIGHT_NAME), (summary, WEIGHT_SUMMARY)]
for idx, entry in enumerate(query_res):
needles = \
[(entry.name, WEIGHT_NAME), (entry.summary, WEIGHT_SUMMARY)]
if args.all:
needles += \
[(description, WEIGHT_DESCRIPTION), (url, WEIGHT_URL)]
needles += [(entry.description, WEIGHT_DESCRIPTION),
(entry.url, WEIGHT_URL)]
for key, weight in needles:
if fnmatch.fnmatch(key, '*' + keyword + '*'):
exact = keyword == key
@ -620,7 +627,6 @@ def search(args, app):
weight = WEIGHT_NAME_EXACT
search_res[idx].append((weight, keyword, exact))
# Requires changes to the qrexec call qubes.TemplateSearch
if not args.all:
keywords = set(args.templates)
idxs = list(search_res.keys())
@ -638,9 +644,6 @@ def search(args, app):
search_res = sorted(search_res.items(), key=key_func)
def gen_header(idx, needles):
#pylint: disable=unused-variable
name, epoch, version, release, reponame, dlsize, \
buildtime, licence, url, summary, description = query_res[idx]
fields = []
weight_types = set(x[0] for x in needles)
for weight, field in WEIGHT_TO_FIELD:
@ -659,9 +662,7 @@ def search(args, app):
last_header = cur_header
# XXX: The style is different from that of DNF
print('===', cur_header, '===')
name, epoch, version, release, reponame, dlsize, \
buildtime, licence, url, summary, description = query_res[idx]
print(name, ':', summary)
print(query_res[idx].name, ':', query_res[idx].summary)
def get_dl_list(args, app, version_selector=VersionSelector.LATEST):
full_candid = {}
@ -677,40 +678,38 @@ def get_dl_list(args, app, version_selector=VersionSelector.LATEST):
query_res = qrexec_repoquery(args, app, PACKAGE_NAME_PREFIX + template)
# We only select one package for each distinct package name
#pylint: disable=unused-variable
for name, epoch, version, release, reponame, dlsize, \
buildtime, licence, url, summary, description in query_res:
ver = (epoch, version, release)
# TODO: Check local VM is managed by qvm-template
for entry in query_res:
ver = (entry.epoch, entry.version, entry.release)
insert = False
if version_selector == VersionSelector.LATEST:
if name not in candid \
or rpm.labelCompare(candid[name][0], ver) < 0:
candid[name] = (ver, dlsize, reponame)
if entry.name not in candid \
or rpm.labelCompare(candid[entry.name][0], ver) < 0:
insert = True
elif version_selector == VersionSelector.REINSTALL:
if name not in app.domains:
parser.error("Template '%s' not already installed." % name)
vm = app.domains[name]
cur_ver = (
vm.features['template-epoch'],
vm.features['template-version'],
vm.features['template-release'])
if entry.name not in app.domains:
parser.error(
"Template '%s' not already installed." % entry.name)
vm = app.domains[entry.name]
cur_ver = query_local_evr(vm)
if rpm.labelCompare(ver, cur_ver) == 0:
candid[name] = (ver, dlsize, reponame)
insert = True
elif version_selector in [VersionSelector.LATEST_LOWER,
VersionSelector.LATEST_HIGHER]:
if name not in app.domains:
parser.error("Template '%s' not already installed." % name)
vm = app.domains[name]
cur_ver = (
vm.features['template-epoch'],
vm.features['template-version'],
vm.features['template-release'])
if entry.name not in app.domains:
parser.error(
"Template '%s' not already installed." % entry.name)
vm = app.domains[entry.name]
cur_ver = query_local_evr(vm)
cmp_res = -1 \
if version_selector == VersionSelector.LATEST_LOWER \
else 1
if rpm.labelCompare(ver, cur_ver) == cmp_res:
if name not in candid \
or rpm.labelCompare(candid[name][0], ver) < 0:
candid[name] = (ver, dlsize, reponame)
if entry.name not in candid \
or rpm.labelCompare(candid[entry.name][0], ver) < 0:
insert = True
if insert:
candid[entry.name] = DlEntry(ver, entry.reponame, entry.dlsize)
# XXX: As it's possible to include version information in `template`
# Perhaps the messages can be improved
@ -731,10 +730,10 @@ def get_dl_list(args, app, version_selector=VersionSelector.LATEST):
file=sys.stderr)
# Merge & choose the template with the highest version
for name, (ver, dlsize, reponame) in candid.items():
for name, entry in candid.items():
if name not in full_candid \
or rpm.labelCompare(full_candid[name][0], ver) < 0:
full_candid[name] = (ver, dlsize, reponame)
or rpm.labelCompare(full_candid[name].evr, entry.evr) < 0:
full_candid[name] = entry
return candid
@ -744,9 +743,8 @@ def download(args, app, path_override=None,
dl_list = get_dl_list(args, app, version_selector=version_selector)
path = path_override if path_override is not None else args.downloaddir
for name, (ver, dlsize, reponame) in dl_list.items():
_ = reponame # unused
version_str = build_version_str(ver)
for name, entry in dl_list.items():
version_str = build_version_str(entry.evr)
spec = PACKAGE_NAME_PREFIX + name + '-' + version_str
target = os.path.join(path, '%s.rpm' % spec)
target_suffix = target + suffix
@ -763,7 +761,8 @@ def download(args, app, path_override=None,
done = False
for attempt in range(args.retries):
try:
qrexec_download(args, app, spec, target_suffix, dlsize)
qrexec_download(args, app, spec, target_suffix,
entry.dlsize)
done = True
break
except ConnectionError: