qvm-template: cleanup install function

Remove now unused try/finally in install() and reduce indentation.

No functional change.
This commit is contained in:
Marek Marczykowski-Górecki 2021-01-30 05:39:59 +01:00
parent f1424812b0
commit fe369ce523
No known key found for this signature in database
GPG Key ID: 063938BA42CFA724

View File

@ -808,183 +808,180 @@ def install(
:param override_existing: Whether to override existing packages. Used for :param override_existing: Whether to override existing packages. Used for
reinstall, upgrade, and downgrade operations reinstall, upgrade, and downgrade operations
""" """
try: keys = get_keys(args.keyring)
keys = get_keys(args.keyring)
unverified_rpm_list = [] # rpmfile, reponame unverified_rpm_list = [] # rpmfile, reponame
verified_rpm_list = [] verified_rpm_list = []
def verify(rpmfile, reponame, dl_dir=None): def verify(rpmfile, reponame, dl_dir=None):
"""Verify package signature and version, remove "unverified" """Verify package signature and version, remove "unverified"
suffix, and parse package header.""" suffix, and parse package header."""
if dl_dir: if dl_dir:
path = os.path.join( path = os.path.join(
dl_dir, os.path.basename(rpmfile) + UNVERIFIED_SUFFIX) dl_dir, os.path.basename(rpmfile) + UNVERIFIED_SUFFIX)
else: else:
path = rpmfile path = rpmfile
package_hdr = verify_rpm(path, keys, args.nogpgcheck) package_hdr = verify_rpm(path, keys, args.nogpgcheck)
if not package_hdr: if not package_hdr:
parser.error('Package \'%s\' verification failed.' % rpmfile) parser.error('Package \'%s\' verification failed.' % rpmfile)
package_name = package_hdr[rpm.RPMTAG_NAME] package_name = package_hdr[rpm.RPMTAG_NAME]
if not package_name.startswith(PACKAGE_NAME_PREFIX): if not package_name.startswith(PACKAGE_NAME_PREFIX):
parser.error(
'Illegal package name for package \'%s\'.' % rpmfile)
# Remove prefix to get the real template name
name = package_name[len(PACKAGE_NAME_PREFIX):]
if path != rpmfile:
os.rename(path, rpmfile)
# Check if already installed
if not override_existing and name in app.domains:
print(('Template \'%s\' already installed, skipping...'
' (You may want to use the'
' {reinstall,upgrade,downgrade}'
' operations.)') % name, file=sys.stderr)
return
# Check if version is really what we want
if override_existing:
vm = get_managed_template_vm(app, name)
pkg_evr = (
str(package_hdr[rpm.RPMTAG_EPOCHNUM]),
package_hdr[rpm.RPMTAG_VERSION],
package_hdr[rpm.RPMTAG_RELEASE])
vm_evr = query_local_evr(vm)
cmp_res = rpm.labelCompare(pkg_evr, vm_evr)
if version_selector == VersionSelector.REINSTALL \
and cmp_res != 0:
parser.error( parser.error(
'Illegal package name for package \'%s\'.' % rpmfile) 'Same version of template \'%s\' not found.' \
# Remove prefix to get the real template name % name)
name = package_name[len(PACKAGE_NAME_PREFIX):] elif version_selector == VersionSelector.LATEST_LOWER \
and cmp_res != -1:
if path != rpmfile: print(("Template '%s' of lower version"
os.rename(path, rpmfile) " already installed, skipping..." % name),
file=sys.stderr)
# Check if already installed return
if not override_existing and name in app.domains: elif version_selector == VersionSelector.LATEST_HIGHER \
print(('Template \'%s\' already installed, skipping...' and cmp_res != 1:
' (You may want to use the' print(("Template '%s' of higher version"
' {reinstall,upgrade,downgrade}' " already installed, skipping..." % name),
' operations.)') % name, file=sys.stderr) file=sys.stderr)
return return
# Check if version is really what we want verified_rpm_list.append((rpmfile, reponame, name, package_hdr))
if override_existing:
vm = get_managed_template_vm(app, name)
pkg_evr = (
str(package_hdr[rpm.RPMTAG_EPOCHNUM]),
package_hdr[rpm.RPMTAG_VERSION],
package_hdr[rpm.RPMTAG_RELEASE])
vm_evr = query_local_evr(vm)
cmp_res = rpm.labelCompare(pkg_evr, vm_evr)
if version_selector == VersionSelector.REINSTALL \
and cmp_res != 0:
parser.error(
'Same version of template \'%s\' not found.' \
% name)
elif version_selector == VersionSelector.LATEST_LOWER \
and cmp_res != -1:
print(("Template '%s' of lower version"
" already installed, skipping..." % name),
file=sys.stderr)
return
elif version_selector == VersionSelector.LATEST_HIGHER \
and cmp_res != 1:
print(("Template '%s' of higher version"
" already installed, skipping..." % name),
file=sys.stderr)
return
verified_rpm_list.append((rpmfile, reponame, name, package_hdr)) # Process local templates
for template in args.templates:
if template.endswith('.rpm'):
if not os.path.exists(template):
parser.error('RPM file \'%s\' not found.' % template)
unverified_rpm_list.append((template, '@commandline'))
# Process local templates # First verify local RPMs and extract header
for template in args.templates: for rpmfile, reponame in unverified_rpm_list:
if template.endswith('.rpm'): verify(rpmfile, reponame)
if not os.path.exists(template): unverified_rpm_list = []
parser.error('RPM file \'%s\' not found.' % template)
unverified_rpm_list.append((template, '@commandline'))
# First verify local RPMs and extract header os.makedirs(args.cachedir, exist_ok=True)
# Get list of templates to download
dl_list = get_dl_list(args, app, version_selector=version_selector)
dl_list_copy = dl_list.copy()
for name, entry in dl_list.items():
# Should be ensured by checks in repoquery
assert entry.reponame != '@commandline'
# Verify that the templates to be downloaded are not yet installed
# Note that we *still* have to do this again in verify() for
# already-downloaded templates
if not override_existing and name in app.domains:
print(('Template \'%s\' already installed, skipping...'
' (You may want to use the'
' {reinstall,upgrade,downgrade}'
' operations.)') % name, file=sys.stderr)
del dl_list_copy[name]
else:
# XXX: Perhaps this is better returned by download()
version_str = build_version_str(entry.evr)
target_file = \
'%s%s-%s.rpm' % (PACKAGE_NAME_PREFIX, name, version_str)
unverified_rpm_list.append(
(os.path.join(args.cachedir, target_file), entry.reponame))
dl_list = dl_list_copy
# Ask the user for confirmation before we actually download stuff
if override_existing and not args.yes:
override_tpls = []
# Local templates, already verified
for _, _, name, _ in verified_rpm_list:
override_tpls.append(name)
# Templates not yet downloaded
for name in dl_list:
override_tpls.append(name)
confirm_action(
'This will override changes made in the following VMs:',
override_tpls)
with tempfile.TemporaryDirectory(dir=args.cachedir) as dl_dir:
download(args, app, path_override=dl_dir,
dl_list=dl_list, suffix=UNVERIFIED_SUFFIX,
version_selector=version_selector)
# Verify downloaded templates
for rpmfile, reponame in unverified_rpm_list: for rpmfile, reponame in unverified_rpm_list:
verify(rpmfile, reponame) verify(rpmfile, reponame, dl_dir=dl_dir)
unverified_rpm_list = [] unverified_rpm_list = []
os.makedirs(args.cachedir, exist_ok=True) # Unpack and install
for rpmfile, reponame, name, package_hdr in verified_rpm_list:
with tempfile.TemporaryDirectory(dir=TEMP_DIR) as target:
print('Installing template \'%s\'...' % name, file=sys.stderr)
if not extract_rpm(name, rpmfile, target):
raise Exception(
'Failed to extract {} template'.format(name))
cmdline = [
'qvm-template-postprocess',
'--really',
'--no-installed-by-rpm',
]
if args.allow_pv:
cmdline.append('--allow-pv')
if not override_existing and args.pool:
cmdline += ['--pool', args.pool]
subprocess.check_call(cmdline + [
'post-install',
name,
target + PATH_PREFIX + '/' + name])
# Get list of templates to download app.domains.refresh_cache(force=True)
dl_list = get_dl_list(args, app, version_selector=version_selector) tpl = app.domains[name]
dl_list_copy = dl_list.copy()
for name, entry in dl_list.items():
# Should be ensured by checks in repoquery
assert entry.reponame != '@commandline'
# Verify that the templates to be downloaded are not yet installed
# Note that we *still* have to do this again in verify() for
# already-downloaded templates
if not override_existing and name in app.domains:
print(('Template \'%s\' already installed, skipping...'
' (You may want to use the'
' {reinstall,upgrade,downgrade}'
' operations.)') % name, file=sys.stderr)
del dl_list_copy[name]
else:
# XXX: Perhaps this is better returned by download()
version_str = build_version_str(entry.evr)
target_file = \
'%s%s-%s.rpm' % (PACKAGE_NAME_PREFIX, name, version_str)
unverified_rpm_list.append(
(os.path.join(args.cachedir, target_file), entry.reponame))
dl_list = dl_list_copy
# Ask the user for confirmation before we actually download stuff tpl.features['template-name'] = name
if override_existing and not args.yes: tpl.features['template-epoch'] = \
override_tpls = [] package_hdr[rpm.RPMTAG_EPOCHNUM]
# Local templates, already verified tpl.features['template-version'] = \
for _, _, name, _ in verified_rpm_list: package_hdr[rpm.RPMTAG_VERSION]
override_tpls.append(name) tpl.features['template-release'] = \
# Templates not yet downloaded package_hdr[rpm.RPMTAG_RELEASE]
for name in dl_list: tpl.features['template-reponame'] = reponame
override_tpls.append(name) tpl.features['template-buildtime'] = \
datetime.datetime.fromtimestamp(
confirm_action( int(package_hdr[rpm.RPMTAG_BUILDTIME]),
'This will override changes made in the following VMs:', tz=datetime.timezone.utc) \
override_tpls) .strftime(DATE_FMT)
tpl.features['template-installtime'] = \
with tempfile.TemporaryDirectory(dir=args.cachedir) as dl_dir: datetime.datetime.now(
download(args, app, path_override=dl_dir, tz=datetime.timezone.utc).strftime(DATE_FMT)
dl_list=dl_list, suffix=UNVERIFIED_SUFFIX, tpl.features['template-license'] = \
version_selector=version_selector) package_hdr[rpm.RPMTAG_LICENSE]
tpl.features['template-url'] = \
# Verify downloaded templates package_hdr[rpm.RPMTAG_URL]
for rpmfile, reponame in unverified_rpm_list: tpl.features['template-summary'] = \
verify(rpmfile, reponame, dl_dir=dl_dir) package_hdr[rpm.RPMTAG_SUMMARY]
unverified_rpm_list = [] tpl.features['template-description'] = \
package_hdr[rpm.RPMTAG_DESCRIPTION].replace('\n', '|')
# Unpack and install
for rpmfile, reponame, name, package_hdr in verified_rpm_list:
with tempfile.TemporaryDirectory(dir=TEMP_DIR) as target:
print('Installing template \'%s\'...' % name, file=sys.stderr)
if not extract_rpm(name, rpmfile, target):
raise Exception(
'Failed to extract {} template'.format(name))
cmdline = [
'qvm-template-postprocess',
'--really',
'--no-installed-by-rpm',
]
if args.allow_pv:
cmdline.append('--allow-pv')
if not override_existing and args.pool:
cmdline += ['--pool', args.pool]
subprocess.check_call(cmdline + [
'post-install',
name,
target + PATH_PREFIX + '/' + name])
app.domains.refresh_cache(force=True)
tpl = app.domains[name]
tpl.features['template-name'] = name
tpl.features['template-epoch'] = \
package_hdr[rpm.RPMTAG_EPOCHNUM]
tpl.features['template-version'] = \
package_hdr[rpm.RPMTAG_VERSION]
tpl.features['template-release'] = \
package_hdr[rpm.RPMTAG_RELEASE]
tpl.features['template-reponame'] = reponame
tpl.features['template-buildtime'] = \
datetime.datetime.fromtimestamp(
int(package_hdr[rpm.RPMTAG_BUILDTIME]),
tz=datetime.timezone.utc) \
.strftime(DATE_FMT)
tpl.features['template-installtime'] = \
datetime.datetime.now(
tz=datetime.timezone.utc).strftime(DATE_FMT)
tpl.features['template-license'] = \
package_hdr[rpm.RPMTAG_LICENSE]
tpl.features['template-url'] = \
package_hdr[rpm.RPMTAG_URL]
tpl.features['template-summary'] = \
package_hdr[rpm.RPMTAG_SUMMARY]
tpl.features['template-description'] = \
package_hdr[rpm.RPMTAG_DESCRIPTION].replace('\n', '|')
finally:
pass
def list_templates(args: argparse.Namespace, def list_templates(args: argparse.Namespace,
app: qubesadmin.app.QubesBase, operation: str) -> None: app: qubesadmin.app.QubesBase, operation: str) -> None: