diff --git a/qubesadmin/tools/qvm_template.py b/qubesadmin/tools/qvm_template.py index 6877559..cb65b4d 100644 --- a/qubesadmin/tools/qvm_template.py +++ b/qubesadmin/tools/qvm_template.py @@ -808,183 +808,180 @@ def install( :param override_existing: Whether to override existing packages. Used for reinstall, upgrade, and downgrade operations """ - try: - keys = get_keys(args.keyring) + keys = get_keys(args.keyring) - unverified_rpm_list = [] # rpmfile, reponame - verified_rpm_list = [] - def verify(rpmfile, reponame, dl_dir=None): - """Verify package signature and version, remove "unverified" - suffix, and parse package header.""" - if dl_dir: - path = os.path.join( - dl_dir, os.path.basename(rpmfile) + UNVERIFIED_SUFFIX) - else: - path = rpmfile + unverified_rpm_list = [] # rpmfile, reponame + verified_rpm_list = [] + def verify(rpmfile, reponame, dl_dir=None): + """Verify package signature and version, remove "unverified" + suffix, and parse package header.""" + if dl_dir: + path = os.path.join( + dl_dir, os.path.basename(rpmfile) + UNVERIFIED_SUFFIX) + else: + path = rpmfile - package_hdr = verify_rpm(path, keys, args.nogpgcheck) - if not package_hdr: - parser.error('Package \'%s\' verification failed.' % rpmfile) + package_hdr = verify_rpm(path, keys, args.nogpgcheck) + if not package_hdr: + parser.error('Package \'%s\' verification failed.' % rpmfile) - package_name = package_hdr[rpm.RPMTAG_NAME] - if not package_name.startswith(PACKAGE_NAME_PREFIX): + package_name = package_hdr[rpm.RPMTAG_NAME] + if not package_name.startswith(PACKAGE_NAME_PREFIX): + parser.error( + 'Illegal package name for package \'%s\'.' % rpmfile) + # Remove prefix to get the real template name + name = package_name[len(PACKAGE_NAME_PREFIX):] + + if path != rpmfile: + os.rename(path, rpmfile) + + # Check if already installed + if not override_existing and name in app.domains: + print(('Template \'%s\' already installed, skipping...' + ' (You may want to use the' + ' {reinstall,upgrade,downgrade}' + ' operations.)') % name, file=sys.stderr) + return + + # Check if version is really what we want + if override_existing: + vm = get_managed_template_vm(app, name) + pkg_evr = ( + str(package_hdr[rpm.RPMTAG_EPOCHNUM]), + package_hdr[rpm.RPMTAG_VERSION], + package_hdr[rpm.RPMTAG_RELEASE]) + vm_evr = query_local_evr(vm) + cmp_res = rpm.labelCompare(pkg_evr, vm_evr) + if version_selector == VersionSelector.REINSTALL \ + and cmp_res != 0: parser.error( - 'Illegal package name for package \'%s\'.' % rpmfile) - # Remove prefix to get the real template name - name = package_name[len(PACKAGE_NAME_PREFIX):] - - if path != rpmfile: - os.rename(path, rpmfile) - - # Check if already installed - if not override_existing and name in app.domains: - print(('Template \'%s\' already installed, skipping...' - ' (You may want to use the' - ' {reinstall,upgrade,downgrade}' - ' operations.)') % name, file=sys.stderr) + 'Same version of template \'%s\' not found.' \ + % name) + elif version_selector == VersionSelector.LATEST_LOWER \ + and cmp_res != -1: + print(("Template '%s' of lower version" + " already installed, skipping..." % name), + file=sys.stderr) + return + elif version_selector == VersionSelector.LATEST_HIGHER \ + and cmp_res != 1: + print(("Template '%s' of higher version" + " already installed, skipping..." % name), + file=sys.stderr) return - # Check if version is really what we want - if override_existing: - vm = get_managed_template_vm(app, name) - pkg_evr = ( - str(package_hdr[rpm.RPMTAG_EPOCHNUM]), - package_hdr[rpm.RPMTAG_VERSION], - package_hdr[rpm.RPMTAG_RELEASE]) - vm_evr = query_local_evr(vm) - cmp_res = rpm.labelCompare(pkg_evr, vm_evr) - if version_selector == VersionSelector.REINSTALL \ - and cmp_res != 0: - parser.error( - 'Same version of template \'%s\' not found.' \ - % name) - elif version_selector == VersionSelector.LATEST_LOWER \ - and cmp_res != -1: - print(("Template '%s' of lower version" - " already installed, skipping..." % name), - file=sys.stderr) - return - elif version_selector == VersionSelector.LATEST_HIGHER \ - and cmp_res != 1: - print(("Template '%s' of higher version" - " already installed, skipping..." % name), - file=sys.stderr) - return + verified_rpm_list.append((rpmfile, reponame, name, package_hdr)) - verified_rpm_list.append((rpmfile, reponame, name, package_hdr)) + # Process local templates + for template in args.templates: + if template.endswith('.rpm'): + if not os.path.exists(template): + parser.error('RPM file \'%s\' not found.' % template) + unverified_rpm_list.append((template, '@commandline')) - # Process local templates - for template in args.templates: - if template.endswith('.rpm'): - if not os.path.exists(template): - parser.error('RPM file \'%s\' not found.' % template) - unverified_rpm_list.append((template, '@commandline')) + # First verify local RPMs and extract header + for rpmfile, reponame in unverified_rpm_list: + verify(rpmfile, reponame) + unverified_rpm_list = [] - # First verify local RPMs and extract header + os.makedirs(args.cachedir, exist_ok=True) + + # Get list of templates to download + dl_list = get_dl_list(args, app, version_selector=version_selector) + dl_list_copy = dl_list.copy() + for name, entry in dl_list.items(): + # Should be ensured by checks in repoquery + assert entry.reponame != '@commandline' + # Verify that the templates to be downloaded are not yet installed + # Note that we *still* have to do this again in verify() for + # already-downloaded templates + if not override_existing and name in app.domains: + print(('Template \'%s\' already installed, skipping...' + ' (You may want to use the' + ' {reinstall,upgrade,downgrade}' + ' operations.)') % name, file=sys.stderr) + del dl_list_copy[name] + else: + # XXX: Perhaps this is better returned by download() + version_str = build_version_str(entry.evr) + target_file = \ + '%s%s-%s.rpm' % (PACKAGE_NAME_PREFIX, name, version_str) + unverified_rpm_list.append( + (os.path.join(args.cachedir, target_file), entry.reponame)) + dl_list = dl_list_copy + + # Ask the user for confirmation before we actually download stuff + if override_existing and not args.yes: + override_tpls = [] + # Local templates, already verified + for _, _, name, _ in verified_rpm_list: + override_tpls.append(name) + # Templates not yet downloaded + for name in dl_list: + override_tpls.append(name) + + confirm_action( + 'This will override changes made in the following VMs:', + override_tpls) + + with tempfile.TemporaryDirectory(dir=args.cachedir) as dl_dir: + download(args, app, path_override=dl_dir, + dl_list=dl_list, suffix=UNVERIFIED_SUFFIX, + version_selector=version_selector) + + # Verify downloaded templates for rpmfile, reponame in unverified_rpm_list: - verify(rpmfile, reponame) - unverified_rpm_list = [] + verify(rpmfile, reponame, dl_dir=dl_dir) + unverified_rpm_list = [] - os.makedirs(args.cachedir, exist_ok=True) + # Unpack and install + for rpmfile, reponame, name, package_hdr in verified_rpm_list: + with tempfile.TemporaryDirectory(dir=TEMP_DIR) as target: + print('Installing template \'%s\'...' % name, file=sys.stderr) + if not extract_rpm(name, rpmfile, target): + raise Exception( + 'Failed to extract {} template'.format(name)) + cmdline = [ + 'qvm-template-postprocess', + '--really', + '--no-installed-by-rpm', + ] + if args.allow_pv: + cmdline.append('--allow-pv') + if not override_existing and args.pool: + cmdline += ['--pool', args.pool] + subprocess.check_call(cmdline + [ + 'post-install', + name, + target + PATH_PREFIX + '/' + name]) - # Get list of templates to download - dl_list = get_dl_list(args, app, version_selector=version_selector) - dl_list_copy = dl_list.copy() - for name, entry in dl_list.items(): - # Should be ensured by checks in repoquery - assert entry.reponame != '@commandline' - # Verify that the templates to be downloaded are not yet installed - # Note that we *still* have to do this again in verify() for - # already-downloaded templates - if not override_existing and name in app.domains: - print(('Template \'%s\' already installed, skipping...' - ' (You may want to use the' - ' {reinstall,upgrade,downgrade}' - ' operations.)') % name, file=sys.stderr) - del dl_list_copy[name] - else: - # XXX: Perhaps this is better returned by download() - version_str = build_version_str(entry.evr) - target_file = \ - '%s%s-%s.rpm' % (PACKAGE_NAME_PREFIX, name, version_str) - unverified_rpm_list.append( - (os.path.join(args.cachedir, target_file), entry.reponame)) - dl_list = dl_list_copy + app.domains.refresh_cache(force=True) + tpl = app.domains[name] - # Ask the user for confirmation before we actually download stuff - if override_existing and not args.yes: - override_tpls = [] - # Local templates, already verified - for _, _, name, _ in verified_rpm_list: - override_tpls.append(name) - # Templates not yet downloaded - for name in dl_list: - override_tpls.append(name) - - confirm_action( - 'This will override changes made in the following VMs:', - override_tpls) - - with tempfile.TemporaryDirectory(dir=args.cachedir) as dl_dir: - download(args, app, path_override=dl_dir, - dl_list=dl_list, suffix=UNVERIFIED_SUFFIX, - version_selector=version_selector) - - # Verify downloaded templates - for rpmfile, reponame in unverified_rpm_list: - verify(rpmfile, reponame, dl_dir=dl_dir) - unverified_rpm_list = [] - - # Unpack and install - for rpmfile, reponame, name, package_hdr in verified_rpm_list: - with tempfile.TemporaryDirectory(dir=TEMP_DIR) as target: - print('Installing template \'%s\'...' % name, file=sys.stderr) - if not extract_rpm(name, rpmfile, target): - raise Exception( - 'Failed to extract {} template'.format(name)) - cmdline = [ - 'qvm-template-postprocess', - '--really', - '--no-installed-by-rpm', - ] - if args.allow_pv: - cmdline.append('--allow-pv') - if not override_existing and args.pool: - cmdline += ['--pool', args.pool] - subprocess.check_call(cmdline + [ - 'post-install', - name, - target + PATH_PREFIX + '/' + name]) - - app.domains.refresh_cache(force=True) - tpl = app.domains[name] - - tpl.features['template-name'] = name - tpl.features['template-epoch'] = \ - package_hdr[rpm.RPMTAG_EPOCHNUM] - tpl.features['template-version'] = \ - package_hdr[rpm.RPMTAG_VERSION] - tpl.features['template-release'] = \ - package_hdr[rpm.RPMTAG_RELEASE] - tpl.features['template-reponame'] = reponame - tpl.features['template-buildtime'] = \ - datetime.datetime.fromtimestamp( - int(package_hdr[rpm.RPMTAG_BUILDTIME]), - tz=datetime.timezone.utc) \ - .strftime(DATE_FMT) - tpl.features['template-installtime'] = \ - datetime.datetime.now( - tz=datetime.timezone.utc).strftime(DATE_FMT) - tpl.features['template-license'] = \ - package_hdr[rpm.RPMTAG_LICENSE] - tpl.features['template-url'] = \ - package_hdr[rpm.RPMTAG_URL] - tpl.features['template-summary'] = \ - package_hdr[rpm.RPMTAG_SUMMARY] - tpl.features['template-description'] = \ - package_hdr[rpm.RPMTAG_DESCRIPTION].replace('\n', '|') - finally: - pass + tpl.features['template-name'] = name + tpl.features['template-epoch'] = \ + package_hdr[rpm.RPMTAG_EPOCHNUM] + tpl.features['template-version'] = \ + package_hdr[rpm.RPMTAG_VERSION] + tpl.features['template-release'] = \ + package_hdr[rpm.RPMTAG_RELEASE] + tpl.features['template-reponame'] = reponame + tpl.features['template-buildtime'] = \ + datetime.datetime.fromtimestamp( + int(package_hdr[rpm.RPMTAG_BUILDTIME]), + tz=datetime.timezone.utc) \ + .strftime(DATE_FMT) + tpl.features['template-installtime'] = \ + datetime.datetime.now( + tz=datetime.timezone.utc).strftime(DATE_FMT) + tpl.features['template-license'] = \ + package_hdr[rpm.RPMTAG_LICENSE] + tpl.features['template-url'] = \ + package_hdr[rpm.RPMTAG_URL] + tpl.features['template-summary'] = \ + package_hdr[rpm.RPMTAG_SUMMARY] + tpl.features['template-description'] = \ + package_hdr[rpm.RPMTAG_DESCRIPTION].replace('\n', '|') def list_templates(args: argparse.Namespace, app: qubesadmin.app.QubesBase, operation: str) -> None: