core-admin-client/qubesadmin/tools/qvm_template.py

942 lines
37 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
import argparse
import collections
import datetime
import enum
import fnmatch
import functools
import itertools
import os
import re
import shutil
import subprocess
import sys
import tempfile
import time
import typing
import qubesadmin
import qubesadmin.tools
import rpm
2020-07-11 15:42:58 +02:00
import tqdm
import xdg.BaseDirectory
PATH_PREFIX = '/var/lib/qubes/vm-templates'
TEMP_DIR = '/var/tmp'
PACKAGE_NAME_PREFIX = 'qubes-template-'
CACHE_DIR = os.path.join(xdg.BaseDirectory.xdg_cache_home, 'qvm-template')
2020-07-11 16:31:38 +02:00
UNVERIFIED_SUFFIX = '.unverified'
LOCK_FILE = '/var/tmp/qvm-template.lck'
def qubes_release() -> str:
"""Return the Qubes release."""
2020-07-08 20:38:36 +02:00
if os.path.exists('/usr/share/qubes/marker-vm'):
2020-07-11 17:12:57 +02:00
with open('/usr/share/qubes/marker-vm', 'r') as fd:
2020-07-08 20:38:36 +02:00
# Get last line (in the format `x.x`)
2020-07-11 17:12:57 +02:00
return fd.readlines()[-1].strip()
2020-07-08 20:38:36 +02:00
return subprocess.check_output(['lsb_release', '-sr'],
encoding='UTF-8').strip()
def parser_gen() -> argparse.ArgumentParser:
"""Generate argument parser for the application."""
formatter = argparse.ArgumentDefaultsHelpFormatter
parser_main = argparse.ArgumentParser(description='Qubes Template Manager',
formatter_class=formatter)
subparsers = parser_main.add_subparsers(dest='operation', required=True,
description='Command to run.')
def parser_add_command(cmd, help_str, add_help=True):
return subparsers.add_parser(cmd, formatter_class=formatter,
help=help_str, description=help_str, add_help=add_help)
# qrexec/DNF related
parser_main.add_argument('--repo-files', action='append',
default=['/usr/share/qubes/repo-templates/qubes-templates.repo'],
help='Specify files containing DNF repository configuration.')
parser_main.add_argument('--updatevm', default='sys-firewall',
help='Specify VM to download updates from.')
parser_main.add_argument('--enablerepo', action='append', default=[],
help='Enable additional repositories.')
parser_main.add_argument('--disablerepo', action='append', default=[],
help='Disable certain repositories.')
parser_main.add_argument('--repoid', action='append', default=[],
help='Enable just specific repositories.')
parser_main.add_argument('--releasever', default=qubes_release(),
help='Override distro release version.')
parser_main.add_argument('--refresh', action='store_true',
help='Set repository metadata as expired before running the command.')
parser_main.add_argument('--cachedir', default=CACHE_DIR,
help='Specify cache directory.')
# qvm-template download
parser_download = parser_add_command('download',
help_str='Download template package.')
parser_download.add_argument('--downloaddir', default='.',
help='Specify download directory.')
parser_download.add_argument('--retries', default=5, type=int,
help='Specify number of retries for downloads.')
parser_download.add_argument('templates', nargs='*', metavar='TEMPLATE')
# qvm-template {install,reinstall,downgrade,upgrade}
parser_install = parser_add_command('install',
help_str='Install template packages.')
parser_install.add_argument('--pool',
help='Specify pool to store created VMs in.')
parser_reinstall = parser_add_command('reinstall',
help_str='Reinstall template packages.')
parser_downgrade = parser_add_command('downgrade',
help_str='Downgrade template packages.')
parser_upgrade = parser_add_command('upgrade',
help_str='Upgrade template packages.')
for parser_x in [parser_install, parser_reinstall,
parser_downgrade, parser_upgrade]:
parser_x.add_argument('--nogpgcheck', action='store_true',
help='Disable signature checks.')
parser_x.add_argument('--allow-pv', action='store_true',
help='Allow setting virt_mode to pv in configuration file.')
parser_x.add_argument('templates', nargs='*', metavar='TEMPLATE')
# qvm-template {list,info}
parser_list = parser_add_command('list',
help_str='List templates.')
parser_info = parser_add_command('info',
help_str='Display details about templates.')
for parser_x in [parser_list, parser_info]:
parser_x.add_argument('--all', action='store_true',
help='Show all templates (default).')
parser_x.add_argument('--installed', action='store_true',
help='Show installed templates.')
parser_x.add_argument('--available', action='store_true',
help='Show available templates.')
parser_x.add_argument('--extras', action='store_true',
help=('Show extras (e.g., ones that exists'
' locally but not in repos) templates.'))
parser_x.add_argument('--upgrades', action='store_true',
help='Show upgradable templates.')
parser_x.add_argument('templates', nargs='*', metavar='TEMPLATE')
# qvm-template search
parser_search = parser_add_command('search',
help_str='Search template details for the given string.')
parser_search.add_argument('--all', action='store_true',
help=('Search also in template description and URL. In addition,'
' the criterion are evaluated with OR instead of AND.'))
parser_search.add_argument('templates', nargs='*', metavar='PATTERN')
# qvm-template remove
parser_remove = parser_add_command('remove',
help_str='Remove installed templates.',
add_help=False) # Forward --help to qvm-remove
_ = parser_remove # unused
# qvm-template clean
parser_clean = parser_add_command('clean',
help_str='Remove cached data.')
_ = parser_clean # unused
return parser_main
parser = parser_gen()
class TemplateState(enum.Enum):
"""Enum representing the state of a template."""
INSTALLED = 'installed'
AVAILABLE = 'available'
EXTRA = 'extra'
UPGRADABLE = 'upgradable'
def title(self) -> str:
"""Return a long description of the state. Can be used as headings."""
#pylint: disable=invalid-name
TEMPLATE_TITLES = {
TemplateState.INSTALLED: 'Installed Templates',
TemplateState.AVAILABLE: 'Available Templates',
TemplateState.EXTRA: 'Extra Templates',
TemplateState.UPGRADABLE: 'Available Upgrades'
}
return TEMPLATE_TITLES[self]
class VersionSelector(enum.Enum):
"""Enum representing how the candidate template version is chosen."""
LATEST = enum.auto()
REINSTALL = enum.auto()
LATEST_LOWER = enum.auto()
LATEST_HIGHER = enum.auto()
# TODO: Docstrings and type hints for Template and DlEntry
Template = collections.namedtuple('Template', [
'name',
'epoch',
'version',
'release',
'reponame',
'dlsize',
'buildtime',
'licence',
'url',
'summary',
'description'
])
DlEntry = collections.namedtuple('DlEntry', [
'evr',
'reponame',
'dlsize'
])
def build_version_str(evr: typing.Tuple[str, str, str]) -> str:
"""Return version string described by ``evr`` in (epoch, version, release)
format."""
2020-07-31 20:40:27 +02:00
return '%s:%s-%s' % evr
def is_match_spec(name: str, epoch: str, version: str, release: str, spec: str
) -> typing.Tuple[bool, float]:
"""Check whether (name, epoch, version, release) matches the spec string.
For the algorithm, refer to section "NEVRA Matching" in the DNF
documentation.
NOTE: Currently ``arch`` is ignored as the templates should be of
``noarch``.
:return: the first element indicates whether there is a match; the second
element represents the priority of the match (lower is better).
"""
2020-07-31 20:40:27 +02:00
if epoch != 0:
targets = [
f'{name}-{epoch}:{version}-{release}',
f'{name}',
f'{name}-{epoch}:{version}'
]
else:
targets = [
f'{name}-{epoch}:{version}-{release}',
f'{name}-{version}-{release}',
f'{name}',
f'{name}-{epoch}:{version}',
f'{name}-{version}'
]
for prio, target in enumerate(targets):
if fnmatch.fnmatch(target, spec):
return True, prio
return False, float('inf')
def query_local(vm: qubesadmin.vm.QubesVM) -> Template:
"""Return Template object associated with ``vm``.
Requires the VM to be managed by qvm-template.
"""
return Template(
vm.features['template-name'],
vm.features['template-epoch'],
vm.features['template-version'],
vm.features['template-release'],
vm.features['template-reponame'],
vm.get_disk_utilization(),
vm.features['template-buildtime'],
vm.features['template-license'],
vm.features['template-url'],
vm.features['template-summary'],
vm.features['template-description'].replace('|', '\n'))
def query_local_evr(vm: qubesadmin.vm.QubesVM) -> typing.Tuple[str, str, str]:
"""Return the (epoch, version, release) of ``vm``.
Requires the VM to be managed by qvm-template.
"""
return (
vm.features['template-epoch'],
vm.features['template-version'],
vm.features['template-release'])
def is_managed_template(vm: qubesadmin.vm.QubesVM) -> bool:
"""Return whether the VM is managed by qvm-template."""
return vm.features.get('template-name', None) == vm.name
def get_managed_template_vm(app: qubesadmin.app.QubesBase, name: str
) -> qubesadmin.vm.QubesVM:
"""Return the QubesVM object associated with the given name if it exists
and is managed by qvm-template, otherwise raise a parser error."""
if name not in app.domains:
parser.error("Template '%s' not already installed." % name)
vm = app.domains[name]
if not is_managed_template(vm):
parser.error("Template '%s' is not managed by qvm-template." % name)
return vm
def qrexec_popen(
args: argparse.Namespace,
app: qubesadmin.app.QubesBase,
service: str,
stdout: int = subprocess.PIPE,
filter_esc: bool = True) -> subprocess.Popen:
"""Return Popen object that communicates with the given qrexec call.
Note that this falls back to invoking /etc/qubes-rpc/* directly if
args.updatevm is None.
:param args: arguments received by the application
:param app: Qubes application object
:param service: the qrexec call to invoke
:param stdout: Where the process stdout points to. This is passed directly
to subprocess.Popen. Defaults to subprocess.PIPE.
Note that stderr is always set to subprocess.PIPE.
:param filter_esc: whether to filter out escape sequences from
stdout/stderr. Defaults to True.
:returns: Popen object that communicates with the given qrexec call
"""
if args.updatevm:
return app.domains[args.updatevm].run_service(
service,
filter_esc=filter_esc,
stdout=stdout)
2020-07-11 17:12:57 +02:00
return subprocess.Popen([
'/etc/qubes-rpc/%s' % service,
],
stdin=subprocess.PIPE,
stdout=stdout,
stderr=subprocess.PIPE)
def qrexec_payload(args, app, spec, refresh):
2020-07-11 17:12:57 +02:00
_ = app # unused
if spec == '---':
parser.error("Malformed template name: argument should not be '---'.")
def check_newline(string, name):
if '\n' in string:
parser.error(f"Malformed {name}:" +
" argument should not contain '\\n'.")
payload = ''
for repo in args.enablerepo:
2020-07-11 17:12:57 +02:00
check_newline(repo, '--enablerepo')
payload += '--enablerepo=%s\n' % repo
for repo in args.disablerepo:
2020-07-11 17:12:57 +02:00
check_newline(repo, '--disablerepo')
payload += '--disablerepo=%s\n' % repo
for repo in args.repoid:
2020-07-11 17:12:57 +02:00
check_newline(repo, '--repoid')
payload += '--repoid=%s\n' % repo
if refresh:
payload += '--refresh\n'
check_newline(args.releasever, '--releasever')
payload += '--releasever=%s\n' % args.releasever
check_newline(spec, 'template name')
payload += spec + '\n'
payload += '---\n'
2020-07-11 17:12:57 +02:00
for path in args.repo_files:
with open(path, 'r') as fd:
payload += fd.read() + '\n'
return payload
def qrexec_repoquery(args, app, spec='*', refresh=False):
payload = qrexec_payload(args, app, spec, refresh)
proc = qrexec_popen(args, app, 'qubes.TemplateSearch')
stdout, stderr = proc.communicate(payload.encode('UTF-8'))
stdout = stdout.decode('ASCII')
if proc.wait() != 0:
for line in stderr.decode('ASCII').rstrip().split('\n'):
print('[Qrexec] %s' % line, file=sys.stderr)
raise ConnectionError("qrexec call 'qubes.TemplateSearch' failed.")
name_re = re.compile(r'^[A-Za-z0-9._+\-]*$')
evr_re = re.compile(r'^[A-Za-z0-9._+~]*$')
date_re = re.compile(r'^\d+-\d+-\d+ \d+:\d+$')
licence_re = re.compile(r'^[A-Za-z0-9._+\-()]*$')
result = []
for line in stdout.split('|\n'):
# Note that there's an empty entry at the end as .strip() is not used.
# This is because if .strip() is used, the .split() will not work.
if line == '':
continue
entry = line.split('|')
try:
# If there is an incorrect number of entries, raise an error
# Unpack manually instead of stuffing into `Template` right away
# so that it's easier to mutate stuff.
name, epoch, version, release, reponame, dlsize, \
buildtime, licence, url, summary, description = entry
# Ignore packages that are not templates
if not name.startswith(PACKAGE_NAME_PREFIX):
continue
name = name[len(PACKAGE_NAME_PREFIX):]
# Check that the values make sense
if not re.fullmatch(name_re, name):
raise ValueError
for val in [epoch, version, release]:
if not re.fullmatch(evr_re, val):
raise ValueError
if not re.fullmatch(name_re, reponame):
raise ValueError
dlsize = int(dlsize)
# First verify that the date does not look weird, then parse it
if not re.fullmatch(date_re, buildtime):
raise ValueError
buildtime = datetime.datetime.strptime(buildtime, '%Y-%m-%d %H:%M')
# XXX: Perhaps whitelist licenses directly?
if not re.fullmatch(licence_re, licence):
raise ValueError
# Check name actually matches spec
if not is_match_spec(name, epoch, version, release, spec):
continue
2020-07-31 20:40:27 +02:00
result.append(Template(name, epoch, version, release, reponame,
dlsize, buildtime, licence, url, summary, description))
except (TypeError, ValueError):
raise ConnectionError(("qrexec call 'qubes.TemplateSearch' failed:"
" unexpected data format."))
return result
def qrexec_download(args, app, spec, path, dlsize=None, refresh=False):
with open(path, 'wb') as fd:
payload = qrexec_payload(args, app, spec, refresh)
2020-07-31 20:40:27 +02:00
# Don't filter ESCs for binary files
proc = qrexec_popen(args, app, 'qubes.TemplateDownload',
stdout=fd, filter_esc=False)
proc.stdin.write(payload.encode('UTF-8'))
proc.stdin.close()
with tqdm.tqdm(desc=spec, total=dlsize, unit_scale=True,
unit_divisor=1000, unit='B') as pbar:
last = 0
while proc.poll() is None:
cur = fd.tell()
pbar.update(cur - last)
last = cur
time.sleep(0.1)
if proc.wait() != 0:
raise ConnectionError(
"qrexec call 'qubes.TemplateDownload' failed.")
return True
def verify_rpm(path, nogpgcheck=False, transaction_set=None):
# NOTE: Verifying RPMs this way is prone to TOCTOU. This is okay for local
# files, but may create problems if multiple instances of `qvm-template`
# are downloading the same file, so a lock is needed in that case.
if transaction_set is None:
transaction_set = rpm.TransactionSet()
with open(path, 'rb') as fd:
try:
hdr = transaction_set.hdrFromFdno(fd)
if hdr[rpm.RPMTAG_SIGSIZE] is None \
and hdr[rpm.RPMTAG_SIGPGP] is None \
and hdr[rpm.RPMTAG_SIGGPG] is None:
return nogpgcheck
except rpm.error as e:
if str(e) == 'public key not trusted' \
or str(e) == 'public key not available':
return nogpgcheck
return False
return True
def get_package_hdr(path, transaction_set=None):
if transaction_set is None:
transaction_set = rpm.TransactionSet()
with open(path, 'rb') as fd:
hdr = transaction_set.hdrFromFdno(fd)
return hdr
def extract_rpm(name, path, target):
rpm2cpio = subprocess.Popen(['rpm2cpio', path], stdout=subprocess.PIPE)
# `-D` is GNUism
cpio = subprocess.Popen([
'cpio',
'-idm',
'-D',
target,
'.%s/%s/*' % (PATH_PREFIX, name)
], stdin=rpm2cpio.stdout, stdout=subprocess.DEVNULL)
return rpm2cpio.wait() == 0 and cpio.wait() == 0
def get_dl_list(args, app, version_selector=VersionSelector.LATEST):
full_candid = {}
for template in args.templates:
# This will be merged into `full_candid` later.
# It is separated so that we can check whether it is empty.
candid = {}
# Skip local RPMs
if template.endswith('.rpm'):
continue
query_res = qrexec_repoquery(args, app, PACKAGE_NAME_PREFIX + template)
# We only select one package for each distinct package name
for entry in query_res:
ver = (entry.epoch, entry.version, entry.release)
insert = False
if version_selector == VersionSelector.LATEST:
if entry.name not in candid \
or rpm.labelCompare(candid[entry.name][0], ver) < 0:
insert = True
elif version_selector == VersionSelector.REINSTALL:
vm = get_managed_template_vm(app, entry.name)
2020-07-31 20:40:27 +02:00
cur_ver = query_local_evr(vm)
if rpm.labelCompare(ver, cur_ver) == 0:
insert = True
elif version_selector in [VersionSelector.LATEST_LOWER,
VersionSelector.LATEST_HIGHER]:
vm = get_managed_template_vm(app, entry.name)
2020-07-31 20:40:27 +02:00
cur_ver = query_local_evr(vm)
cmp_res = -1 \
if version_selector == VersionSelector.LATEST_LOWER \
else 1
if rpm.labelCompare(ver, cur_ver) == cmp_res:
if entry.name not in candid \
or rpm.labelCompare(candid[entry.name][0], ver) < 0:
insert = True
if insert:
candid[entry.name] = DlEntry(ver, entry.reponame, entry.dlsize)
# XXX: As it's possible to include version information in `template`,
# perhaps the messages can be improved
2020-07-31 20:40:27 +02:00
if len(candid) == 0:
if version_selector == VersionSelector.LATEST:
parser.error('Template \'%s\' not found.' % template)
elif version_selector == VersionSelector.REINSTALL:
parser.error('Same version of template \'%s\' not found.' \
% template)
# Copy behavior of DNF and do nothing if version not found
elif version_selector == VersionSelector.LATEST_LOWER:
print(("Template '%s' of lowest version"
" already installed, skipping..." % template),
file=sys.stderr)
elif version_selector == VersionSelector.LATEST_HIGHER:
print(("Template '%s' of highest version"
" already installed, skipping..." % template),
file=sys.stderr)
# Merge & choose the template with the highest version
for name, entry in candid.items():
if name not in full_candid \
or rpm.labelCompare(full_candid[name].evr, entry.evr) < 0:
full_candid[name] = entry
2020-07-31 21:06:04 +02:00
return full_candid
2020-07-31 20:40:27 +02:00
def download(args, app, path_override=None,
dl_list=None, suffix='', version_selector=VersionSelector.LATEST):
if dl_list is None:
dl_list = get_dl_list(args, app, version_selector=version_selector)
path = path_override if path_override is not None else args.downloaddir
for name, entry in dl_list.items():
version_str = build_version_str(entry.evr)
spec = PACKAGE_NAME_PREFIX + name + '-' + version_str
target = os.path.join(path, '%s.rpm' % spec)
target_suffix = target + suffix
if suffix != '' and os.path.exists(target_suffix):
print('\'%s\' already exists, skipping...' % target,
file=sys.stderr)
if os.path.exists(target):
print('\'%s\' already exists, skipping...' % target,
file=sys.stderr)
if suffix != '':
os.rename(target, target_suffix)
else:
print('Downloading \'%s\'...' % spec, file=sys.stderr)
done = False
for attempt in range(args.retries):
try:
qrexec_download(args, app, spec, target_suffix,
entry.dlsize)
done = True
break
except ConnectionError:
os.remove(target_suffix)
if attempt + 1 < args.retries:
print('\'%s\' download failed, retrying...' % spec,
file=sys.stderr)
except:
# Also remove file if interrupted by other means
os.remove(target_suffix)
raise
if not done:
print('\'%s\' download failed.' % spec, file=sys.stderr)
sys.exit(1)
def install(args, app, version_selector=VersionSelector.LATEST,
override_existing=False):
try:
with open(LOCK_FILE, 'x') as _:
pass
except FileExistsError:
parser.error(('%s already exists.'
' Perhaps another instance of qvm-template is running?')
% LOCK_FILE)
try:
transaction_set = rpm.TransactionSet()
2020-07-31 20:56:59 +02:00
rpm_list = [] # rpmfile, reponame
2020-07-31 20:40:27 +02:00
for template in args.templates:
if template.endswith('.rpm'):
if not os.path.exists(template):
parser.error('RPM file \'%s\' not found.' % template)
2020-07-31 20:56:59 +02:00
rpm_list.append((template, '@commandline'))
2020-07-31 20:40:27 +02:00
os.makedirs(args.cachedir, exist_ok=True)
dl_list = get_dl_list(args, app, version_selector=version_selector)
dl_list_copy = dl_list.copy()
# Verify that the templates are not yet installed
for name, entry in dl_list.items():
# Should be ensured by checks in repoquery
assert entry.reponame != '@commandline'
if not override_existing and name in app.domains:
print(('Template \'%s\' already installed, skipping...'
' (You may want to use the'
' {reinstall,upgrade,downgrade}'
2020-07-31 20:40:27 +02:00
' operations.)') % name, file=sys.stderr)
del dl_list_copy[name]
else:
version_str = build_version_str(entry.evr)
target_file = \
'%s%s-%s.rpm' % (PACKAGE_NAME_PREFIX, name, version_str)
2020-07-31 20:56:59 +02:00
rpm_list.append(
(os.path.join(args.cachedir, target_file), entry.reponame))
2020-07-31 20:40:27 +02:00
dl_list = dl_list_copy
download(args, app, path_override=args.cachedir,
dl_list=dl_list, suffix=UNVERIFIED_SUFFIX,
version_selector=version_selector)
# Verify package and remove unverified suffix
2020-07-31 20:56:59 +02:00
for rpmfile, reponame in rpm_list:
2020-07-31 20:40:27 +02:00
if reponame != '@commandline':
path = rpmfile + UNVERIFIED_SUFFIX
else:
path = rpmfile
if not verify_rpm(path, args.nogpgcheck, transaction_set):
parser.error('Package \'%s\' verification failed.' % rpmfile)
if reponame != '@commandline':
os.rename(path, rpmfile)
# Unpack and install
2020-07-31 20:56:59 +02:00
for rpmfile, reponame in rpm_list:
2020-07-31 20:40:27 +02:00
with tempfile.TemporaryDirectory(dir=TEMP_DIR) as target:
package_hdr = get_package_hdr(rpmfile)
package_name = package_hdr[rpm.RPMTAG_NAME]
if not package_name.startswith(PACKAGE_NAME_PREFIX):
parser.error(
'Illegal package name for package \'%s\'.' % rpmfile)
# Remove prefix to get the real template name
name = package_name[len(PACKAGE_NAME_PREFIX):]
2020-07-31 20:40:27 +02:00
# Another check for already-downloaded RPMs
if not override_existing and name in app.domains:
print(('Template \'%s\' already installed, skipping...'
' (You may want to use the'
' {reinstall,upgrade,downgrade}'
' operations.)') % name, file=sys.stderr)
continue
2020-07-31 20:56:59 +02:00
2020-07-31 20:40:27 +02:00
# Check if local versus candidate version is in line with the
# operation
2020-07-31 20:56:59 +02:00
if override_existing:
vm = get_managed_template_vm(app, name)
2020-07-31 20:40:27 +02:00
pkg_evr = (
str(package_hdr[rpm.RPMTAG_EPOCHNUM]),
package_hdr[rpm.RPMTAG_VERSION],
package_hdr[rpm.RPMTAG_RELEASE])
vm_evr = query_local_evr(vm)
cmp_res = rpm.labelCompare(pkg_evr, vm_evr)
if version_selector == VersionSelector.REINSTALL \
and cmp_res != 0:
parser.error(
'Same version of template \'%s\' not found.' \
% name)
elif version_selector == VersionSelector.LATEST_LOWER \
and cmp_res != -1:
print(("Template '%s' of lower version"
" already installed, skipping..." % name),
file=sys.stderr)
continue
elif version_selector == VersionSelector.LATEST_HIGHER \
and cmp_res != 1:
print(("Template '%s' of higher version"
" already installed, skipping..." % name),
file=sys.stderr)
continue
2020-07-31 20:40:27 +02:00
print('Installing template \'%s\'...' % name, file=sys.stderr)
extract_rpm(name, rpmfile, target)
cmdline = [
'qvm-template-postprocess',
'--really',
'--no-installed-by-rpm',
]
if args.allow_pv:
cmdline.append('--allow-pv')
if args.pool:
cmdline += ['--pool', args.pool]
subprocess.check_call(cmdline + [
'post-install',
name,
target + PATH_PREFIX + '/' + name])
2020-07-31 20:40:27 +02:00
app.domains.refresh_cache(force=True)
tpl = app.domains[name]
tpl.features['template-name'] = name
tpl.features['template-epoch'] = \
package_hdr[rpm.RPMTAG_EPOCHNUM]
tpl.features['template-version'] = \
package_hdr[rpm.RPMTAG_VERSION]
tpl.features['template-release'] = \
package_hdr[rpm.RPMTAG_RELEASE]
tpl.features['template-reponame'] = reponame
tpl.features['template-buildtime'] = \
str(datetime.datetime.fromtimestamp(
int(package_hdr[rpm.RPMTAG_BUILDTIME])))
tpl.features['template-install-time'] = \
str(datetime.datetime.today())
tpl.features['template-license'] = \
package_hdr[rpm.RPMTAG_LICENSE]
tpl.features['template-url'] = \
package_hdr[rpm.RPMTAG_URL]
tpl.features['template-summary'] = \
package_hdr[rpm.RPMTAG_SUMMARY]
tpl.features['template-description'] = \
package_hdr[rpm.RPMTAG_DESCRIPTION].replace('\n', '|')
finally:
os.remove(LOCK_FILE)
def list_templates(args, app, operation):
tpl_list = []
def append_list(data, status, install_time=None):
_ = install_time # unused
version_str = build_version_str(
(data.epoch, data.version, data.release))
tpl_list.append((status, data.name, version_str, data.reponame))
def append_info(data, status, install_time=None):
tpl_list.append((status, 'Name', ':', data.name))
tpl_list.append((status, 'Epoch', ':', data.epoch))
tpl_list.append((status, 'Version', ':', data.version))
tpl_list.append((status, 'Release', ':', data.release))
tpl_list.append((status, 'Size', ':',
qubesadmin.utils.size_to_human(data.dlsize)))
tpl_list.append((status, 'Repository', ':', data.reponame))
tpl_list.append((status, 'Buildtime', ':', str(data.buildtime)))
if install_time:
tpl_list.append((status, 'Install time', ':', str(install_time)))
tpl_list.append((status, 'URL', ':', data.url))
tpl_list.append((status, 'License', ':', data.licence))
tpl_list.append((status, 'Summary', ':', data.summary))
# Only show "Description" for the first line
title = 'Description'
for line in data.description.splitlines():
tpl_list.append((status, title, ':', line))
title = ''
tpl_list.append((status, ' ', ' ', ' ')) # empty line
if operation == 'list':
append = append_list
elif operation == 'info':
append = append_info
else:
assert False and 'Unknown operation'
def append_vm(vm, status):
append(query_local(vm), status, vm.features['template-install-time'])
if not (args.installed or args.available or args.extras or args.upgrades):
args.all = True
if args.all or args.available or args.extras or args.upgrades:
if args.templates:
query_res = set()
for spec in args.templates:
query_res |= set(qrexec_repoquery(args, app, spec))
query_res = list(query_res)
else:
query_res = qrexec_repoquery(args, app)
if args.installed or args.all:
for vm in app.domains:
if is_managed_template(vm):
if not args.templates or \
any(is_match_spec(
vm.name,
*query_local_evr(vm),
spec)[0]
for spec in args.templates):
append_vm(vm, TemplateState.INSTALLED)
if args.available or args.all:
for data in query_res:
append(data, TemplateState.AVAILABLE)
if args.extras:
remote = set()
for data in query_res:
remote.add(data.name)
for vm in app.domains:
if is_managed_template(vm) and vm.name not in remote:
append_vm(vm, TemplateState.EXTRA)
if args.upgrades:
local = {}
for vm in app.domains:
if is_managed_template(vm):
local[vm.name] = query_local_evr(vm)
for entry in query_res:
if entry.name in local:
if rpm.labelCompare(local[entry.name],
(entry.epoch, entry.version, entry.release)) < 0:
append(entry, TemplateState.UPGRADABLE)
if len(tpl_list) == 0:
parser.error('No matching templates to list')
for k, grp in itertools.groupby(tpl_list, lambda x: x[0]):
print(k.title())
qubesadmin.tools.print_table(list(map(lambda x: x[1:], grp)))
def search(args, app):
# Search in both installed and available templates
query_res = qrexec_repoquery(args, app)
for vm in app.domains:
if is_managed_template(vm):
query_res.append(query_local(vm))
# Get latest version for each template
query_res_tmp = []
2020-07-31 20:56:59 +02:00
for _, grp in itertools.groupby(sorted(query_res), lambda x: x[0]):
def compare(lhs, rhs):
return lhs if rpm.labelCompare(lhs[1:4], rhs[1:4]) < 0 else rhs
query_res_tmp.append(functools.reduce(compare, grp))
query_res = query_res_tmp
#pylint: disable=invalid-name
WEIGHT_NAME_EXACT = 1 << 4
WEIGHT_NAME = 1 << 3
WEIGHT_SUMMARY = 1 << 2
WEIGHT_DESCRIPTION = 1 << 1
WEIGHT_URL = 1 << 0
WEIGHT_TO_FIELD = [
(WEIGHT_NAME_EXACT, 'Name'),
(WEIGHT_NAME, 'Name'),
(WEIGHT_SUMMARY, 'Summary'),
(WEIGHT_DESCRIPTION, 'Description'),
(WEIGHT_URL, 'URL')]
search_res = collections.defaultdict(list)
for keyword in args.templates:
for idx, entry in enumerate(query_res):
needles = \
[(entry.name, WEIGHT_NAME), (entry.summary, WEIGHT_SUMMARY)]
if args.all:
needles += [(entry.description, WEIGHT_DESCRIPTION),
(entry.url, WEIGHT_URL)]
for key, weight in needles:
if fnmatch.fnmatch(key, '*' + keyword + '*'):
exact = keyword == key
if exact and weight == WEIGHT_NAME:
weight = WEIGHT_NAME_EXACT
search_res[idx].append((weight, keyword, exact))
if not args.all:
keywords = set(args.templates)
idxs = list(search_res.keys())
for idx in idxs:
if keywords != set(x[1] for x in search_res[idx]):
del search_res[idx]
def key_func(x):
# ORDER BY weight DESC, list_of_needles ASC, name ASC
idx, needles = x
weight = sum(t[0] for t in needles)
name = query_res[idx][0]
return (-weight, needles, name)
search_res = sorted(search_res.items(), key=key_func)
2020-07-31 20:56:59 +02:00
def gen_header(needles):
fields = []
weight_types = set(x[0] for x in needles)
for weight, field in WEIGHT_TO_FIELD:
if weight in weight_types:
fields.append(field)
exact = all(x[-1] for x in needles)
match = 'Exactly Matched' if exact else 'Matched'
keywords = sorted(list(set(x[1] for x in needles)))
return ' & '.join(fields) + ' ' + match + ': ' + ', '.join(keywords)
last_header = ''
for idx, needles in search_res:
# Print headers
2020-07-31 20:56:59 +02:00
cur_header = gen_header(needles)
if last_header != cur_header:
last_header = cur_header
# XXX: The style is different from that of DNF
print('===', cur_header, '===')
print(query_res[idx].name, ':', query_res[idx].summary)
def remove(args, app):
_ = args, app # unused
2020-07-11 17:12:57 +02:00
# Remove 'remove' entry from the args...
operation_idx = sys.argv.index('remove')
argv = sys.argv[1:operation_idx] + sys.argv[operation_idx+1:]
# ...then pass the args to qvm-remove
# Use exec so stdio can be shared easily
os.execvp('qvm-remove', ['qvm-remove'] + argv)
def clean(args, app):
# TODO: More fine-grained options
2020-07-11 17:12:57 +02:00
_ = app # unused
shutil.rmtree(args.cachedir)
def main(args=None, app=None):
raw_args = args
args, unk_args = parser.parse_known_args(raw_args)
if args.operation != 'remove' and unk_args:
args = parser.parse_args(raw_args) # this should result in an error
assert False and 'This line should not be executed.'
# FIXME: Currently doing things this way as we have to forward
# arguments to qvm-remove. While argparse.REMAINDER should be able to
# solve this, there's a bug (issue 17050) that prevents it from working
# on inputs where the first argument is an option, like 'qvm-template
# remove --help'. The bug should be fixed in Python 3.9.
if app is None:
app = qubesadmin.Qubes()
if args.refresh:
qrexec_repoquery(args, app, refresh=True)
2020-07-31 20:40:27 +02:00
if args.operation == 'download':
download(args, app)
elif args.operation == 'install':
2020-07-11 17:12:57 +02:00
install(args, app)
elif args.operation == 'reinstall':
install(args, app, version_selector=VersionSelector.REINSTALL,
override_existing=True)
elif args.operation == 'downgrade':
install(args, app, version_selector=VersionSelector.LATEST_LOWER,
override_existing=True)
elif args.operation == 'upgrade':
install(args, app, version_selector=VersionSelector.LATEST_HIGHER,
override_existing=True)
elif args.operation == 'list':
list_templates(args, app, 'list')
elif args.operation == 'info':
list_templates(args, app, 'info')
elif args.operation == 'search':
search(args, app)
elif args.operation == 'remove':
remove(args, app)
elif args.operation == 'clean':
clean(args, app)
else:
2020-07-11 17:12:57 +02:00
parser.error('Operation \'%s\' not supported.' % args.operation)
return 0
if __name__ == '__main__':
sys.exit(main())