qvm-template: More docstrings.

This commit is contained in:
WillyPillow 2020-08-06 02:05:57 +08:00
parent 41cf9f948e
commit 7b6fa39d1c
No known key found for this signature in database
GPG Key ID: 3839E194B1415A9C

View File

@ -1,5 +1,7 @@
#!/usr/bin/env python3
'''Tool for managing VM templates.'''
import argparse
import collections
import datetime
@ -154,34 +156,37 @@ class TemplateState(enum.Enum):
class VersionSelector(enum.Enum):
"""Enum representing how the candidate template version is chosen."""
LATEST = enum.auto()
"""Install latest version."""
REINSTALL = enum.auto()
"""Reinstall current version."""
LATEST_LOWER = enum.auto()
"""Downgrade to the highest version that is lower than the current one."""
LATEST_HIGHER = enum.auto()
"""Upgrade to the highest version that is higher than the current one."""
# TODO: Docstrings and type hints for Template and DlEntry
Template = collections.namedtuple('Template', [
'name',
'epoch',
'version',
'release',
'reponame',
'dlsize',
'buildtime',
'licence',
'url',
'summary',
'description'
])
class Template(typing.NamedTuple):
"""Details of a template."""
name: str
epoch: str
version: str
release: str
reponame: str
dlsize: int
buildtime: datetime.datetime
licence: str
url: str
summary: str
description: str
DlEntry = collections.namedtuple('DlEntry', [
'evr',
'reponame',
'dlsize'
])
class DlEntry(typing.NamedTuple):
"""Information about a template to be downloaded."""
evr: typing.Tuple[str, str, str]
reponame: str
dlsize: int
def build_version_str(evr: typing.Tuple[str, str, str]) -> str:
"""Return version string described by ``evr`` in (epoch, version, release)
format."""
"""Return version string described by ``evr``, which is in (epoch, version,
release) format."""
return '%s:%s-%s' % evr
def is_match_spec(name: str, epoch: str, version: str, release: str, spec: str
@ -191,11 +196,11 @@ def is_match_spec(name: str, epoch: str, version: str, release: str, spec: str
For the algorithm, refer to section "NEVRA Matching" in the DNF
documentation.
NOTE: Currently ``arch`` is ignored as the templates should be of
Note that currently ``arch`` is ignored as the templates should be of
``noarch``.
:return: the first element indicates whether there is a match; the second
element represents the priority of the match (lower is better).
:return: A tuple. The first element indicates whether there is a match; the
second element represents the priority of the match (lower is better)
"""
if epoch != 0:
targets = [
@ -263,24 +268,26 @@ def qrexec_popen(
args: argparse.Namespace,
app: qubesadmin.app.QubesBase,
service: str,
stdout: int = subprocess.PIPE,
stdout: typing.Union[int, typing.IO] = subprocess.PIPE,
filter_esc: bool = True) -> subprocess.Popen:
"""Return Popen object that communicates with the given qrexec call.
"""Return ``Popen`` object that communicates with the given qrexec call in
``args.updatevm``.
Note that this falls back to invoking /etc/qubes-rpc/* directly if
args.updatevm is None.
Note that this falls back to invoking ``/etc/qubes-rpc/*`` directly if
``args.updatevm`` is None.
:param args: arguments received by the application
:param args: Arguments received by the application. ``args.updatevm`` is
used
:param app: Qubes application object
:param service: the qrexec call to invoke
:param service: The qrexec call to invoke
:param stdout: Where the process stdout points to. This is passed directly
to subprocess.Popen. Defaults to subprocess.PIPE.
to ``subprocess.Popen``. Defaults to ``subprocess.PIPE``
Note that stderr is always set to subprocess.PIPE.
:param filter_esc: whether to filter out escape sequences from
stdout/stderr. Defaults to True.
Note that stderr is always set to ``subprocess.PIPE``
:param filter_esc: Whether to filter out escape sequences from
stdout/stderr. Defaults to True
:returns: Popen object that communicates with the given qrexec call
:returns: ``Popen`` object that communicates with the given qrexec call
"""
if args.updatevm:
return app.domains[args.updatevm].run_service(
@ -294,7 +301,21 @@ def qrexec_popen(
stdout=stdout,
stderr=subprocess.PIPE)
def qrexec_payload(args, app, spec, refresh):
def qrexec_payload(args: argparse.Namespace, app: qubesadmin.app.QubesBase,
spec: str, refresh: bool) -> str:
"""Return payload string for the ``qubes.Template*`` qrexec calls.
:param args: Arguments received by the application. Specifically,
``args.{enablerepo,disablerepo,repoid,releasever,repo_files}`` are used
:param app: Qubes application object
:param spec: Package spec to query (refer to ``<package-name-spec>`` in the
DNF documentation)
:param refresh: Whether to force refresh repo metadata
:return: Payload string
:raises: Parser error if spec equals ``---`` or input contains ``\\n``
"""
_ = app # unused
if spec == '---':
@ -327,7 +348,25 @@ def qrexec_payload(args, app, spec, refresh):
payload += fd.read() + '\n'
return payload
def qrexec_repoquery(args, app, spec='*', refresh=False):
def qrexec_repoquery(
args: argparse.Namespace,
app: qubesadmin.app.QubesBase,
spec: str = '*',
refresh: bool = False) -> typing.List[Template]:
"""Query template information from repositories.
:param args: Arguments received by the application. Specifically,
``args.{enablerepo,disablerepo,repoid,releasever,repo_files,updatevm}``
are used
:param app: Qubes application object
:param spec: Package spec to query (refer to ``<package-name-spec>`` in the
DNF documentation). Defaults to ``*``
:param refresh: Whether to force refresh repo metadata. Defaults to False
:raises ConnectionError: if the qrexec call fails
:return: List of ``Template`` objects representing the result of the query
"""
payload = qrexec_payload(args, app, spec, refresh)
proc = qrexec_popen(args, app, 'qubes.TemplateSearch')
stdout, stderr = proc.communicate(payload.encode('UTF-8'))
@ -386,7 +425,28 @@ def qrexec_repoquery(args, app, spec='*', refresh=False):
" unexpected data format."))
return result
def qrexec_download(args, app, spec, path, dlsize=None, refresh=False):
def qrexec_download(
args: argparse.Namespace,
app: qubesadmin.app.QubesBase,
spec: str,
path: str,
dlsize: typing.Optional[int] = None,
refresh: bool = False) -> None:
"""Download a template from repositories.
:param args: Arguments received by the application. Specifically,
``args.{enablerepo,disablerepo,repoid,releasever,repo_files,updatevm}``
are used
:param app: Qubes application object
:param spec: Package spec to query (refer to ``<package-name-spec>`` in the
DNF documentation)
:param path: Path to place the downloaded template
:param dlsize: Size of template to be downloaded. Used for the progress
bar. Optional
:param refresh: Whether to force refresh repo metadata. Defaults to False
:raises ConnectionError: if the qrexec call fails
"""
with open(path, 'wb') as fd:
payload = qrexec_payload(args, app, spec, refresh)
# Don't filter ESCs for binary files
@ -405,12 +465,25 @@ def qrexec_download(args, app, spec, path, dlsize=None, refresh=False):
if proc.wait() != 0:
raise ConnectionError(
"qrexec call 'qubes.TemplateDownload' failed.")
return True
def verify_rpm(path, nogpgcheck=False, transaction_set=None):
# NOTE: Verifying RPMs this way is prone to TOCTOU. This is okay for local
# files, but may create problems if multiple instances of `qvm-template`
# are downloading the same file, so a lock is needed in that case.
def verify_rpm(
path: str,
nogpgcheck: bool = False,
transaction_set: typing.Optional[rpm.transaction.TransactionSet] = None
) -> bool:
"""Verify the digest and signature of a RPM package.
Note that verifying RPMs this way is prone to TOCTOU. This is okay for
local files, but may create problems if multiple instances of
**qvm-template** are downloading the same file, so a lock is needed in that
case.
:param path: Location of the RPM package
:param nogpgcheck: Whether to allow invalid GPG signatures
:param transaction_set: Override RPM ``TransactionSet``. Optional
:return: Whether the RPM is verified
"""
if transaction_set is None:
transaction_set = rpm.TransactionSet()
with open(path, 'rb') as fd:
@ -427,14 +500,34 @@ def verify_rpm(path, nogpgcheck=False, transaction_set=None):
return False
return True
def get_package_hdr(path, transaction_set=None):
def get_package_hdr(
path: str,
transaction_set: typing.Optional[rpm.transaction.TransactionSet] = None
) -> rpm.hdr:
"""Return header of a RPM package.
Note that this function **does not** check the integrity of the package.
:param path: Location of the RPM package
:param transaction_set: Override RPM ``TransactionSet``. Optional
:return: RPM headers
"""
if transaction_set is None:
transaction_set = rpm.TransactionSet()
with open(path, 'rb') as fd:
hdr = transaction_set.hdrFromFdno(fd)
return hdr
def extract_rpm(name, path, target):
def extract_rpm(name: str, path: str, target: str) -> bool:
"""Extract a template RPM package.
:param name: Name of the template
:param path: Location of the RPM package
:param target: Target path to extract to
:return: Whether the extraction succeeded
"""
rpm2cpio = subprocess.Popen(['rpm2cpio', path], stdout=subprocess.PIPE)
# `-D` is GNUism
cpio = subprocess.Popen([
@ -446,12 +539,26 @@ def extract_rpm(name, path, target):
], stdin=rpm2cpio.stdout, stdout=subprocess.DEVNULL)
return rpm2cpio.wait() == 0 and cpio.wait() == 0
def get_dl_list(args, app, version_selector=VersionSelector.LATEST):
full_candid = {}
def get_dl_list(
args: argparse.Namespace,
app: qubesadmin.app.QubesBase,
version_selector: VersionSelector = VersionSelector.LATEST
) -> typing.Dict[str, DlEntry]:
"""Return list of templates that needs to be downloaded.
:param args: Arguments received by the application.
:param app: Qubes application object
:param version_selector: Specify algorithm to select the candidate version
of a package. Defaults to ``VersionSelector.LATEST``
:return: Dictionary that maps to ``DlEntry`` the names of templates that
needs to be downloaded
"""
full_candid: typing.Dict[str, DlEntry] = {}
for template in args.templates:
# This will be merged into `full_candid` later.
# It is separated so that we can check whether it is empty.
candid = {}
candid: typing.Dict[str, DlEntry] = {}
# Skip local RPMs
if template.endswith('.rpm'):
@ -505,15 +612,35 @@ def get_dl_list(args, app, version_selector=VersionSelector.LATEST):
file=sys.stderr)
# Merge & choose the template with the highest version
for name, entry in candid.items():
for name, dlentry in candid.items():
if name not in full_candid \
or rpm.labelCompare(full_candid[name].evr, entry.evr) < 0:
full_candid[name] = entry
or rpm.labelCompare(full_candid[name].evr, dlentry.evr) < 0:
full_candid[name] = dlentry
return full_candid
def download(args, app, path_override=None,
dl_list=None, suffix='', version_selector=VersionSelector.LATEST):
def download(
args: argparse.Namespace,
app: qubesadmin.app.QubesBase,
path_override: typing.Optional[str] = None,
dl_list: typing.Optional[typing.Dict[str, DlEntry]] = None,
suffix: str = '',
version_selector: VersionSelector = VersionSelector.LATEST) -> None:
"""Command that downloads template packages.
:param args: Arguments received by the application.
:param app: Qubes application object
:param path_override: Override path to store downloads. If not set or set
to None, ``args.downloaddir`` is used. Optional
:param dl_list: Override list of templates to download. If not set or set
to None, ``get_dl_list`` is called, which generates the list from
``args``. Optional
:param suffix: Suffix to add to the file name of downloaded packages. This
is useful if you want to distinguish between verified and unverified
packages. Defaults to an empty string
:param version_selector: Specify algorithm to select the candidate version
of a package. Defaults to ``VersionSelector.LATEST``
"""
if dl_list is None:
dl_list = get_dl_list(args, app, version_selector=version_selector)
@ -553,8 +680,23 @@ def download(args, app, path_override=None,
print('\'%s\' download failed.' % spec, file=sys.stderr)
sys.exit(1)
def install(args, app, version_selector=VersionSelector.LATEST,
override_existing=False):
def install(
args: argparse.Namespace,
app: qubesadmin.app.QubesBase,
version_selector: VersionSelector = VersionSelector.LATEST,
override_existing: bool = False) -> None:
"""Command that installs template packages.
This command creates a lock file to ensure that two instances are not
running at the same time.
:param args: Arguments received by the application.
:param app: Qubes application object
:param version_selector: Specify algorithm to select the candidate version
of a package. Defaults to ``VersionSelector.LATEST``
:param override_existing: Whether to override existing packages. Used for
reinstall, upgrade, and downgrade operations
"""
try:
with open(LOCK_FILE, 'x') as _:
pass
@ -700,7 +842,16 @@ def install(args, app, version_selector=VersionSelector.LATEST,
finally:
os.remove(LOCK_FILE)
def list_templates(args, app, operation):
def list_templates(args: argparse.Namespace,
app: qubesadmin.app.QubesBase, operation: str) -> None:
"""Command that lists templates.
:param args: Arguments received by the application.
:param app: Qubes application object
:param operation: If set to ``list``, display a listing similar to ``dnf
list``. If set to ``info``, display detailed template information
similar to ``dnf info``. Otherwise, an ``AssertionError`` is raised.
"""
tpl_list = []
def append_list(data, status, install_time=None):
@ -745,10 +896,10 @@ def list_templates(args, app, operation):
if args.all or args.available or args.extras or args.upgrades:
if args.templates:
query_res = set()
query_res_set: typing.Set[Template] = set()
for spec in args.templates:
query_res |= set(qrexec_repoquery(args, app, spec))
query_res = list(query_res)
query_res_set |= set(qrexec_repoquery(args, app, spec))
query_res = list(query_res_set)
else:
query_res = qrexec_repoquery(args, app)
@ -793,7 +944,12 @@ def list_templates(args, app, operation):
print(k.title())
qubesadmin.tools.print_table(list(map(lambda x: x[1:], grp)))
def search(args, app):
def search(args: argparse.Namespace, app: qubesadmin.app.QubesBase) -> None:
"""Command that searches template details for given patterns.
:param args: Arguments received by the application.
:param app: Qubes application object
"""
# Search in both installed and available templates
query_res = qrexec_repoquery(args, app)
for vm in app.domains:
@ -822,27 +978,29 @@ def search(args, app):
(WEIGHT_DESCRIPTION, 'Description'),
(WEIGHT_URL, 'URL')]
search_res = collections.defaultdict(list)
search_res_by_idx: \
typing.Dict[int, typing.List[typing.Tuple[int, str, bool]]] = \
collections.defaultdict(list)
for keyword in args.templates:
for idx, entry in enumerate(query_res):
needles = \
needle_types = \
[(entry.name, WEIGHT_NAME), (entry.summary, WEIGHT_SUMMARY)]
if args.all:
needles += [(entry.description, WEIGHT_DESCRIPTION),
needle_types += [(entry.description, WEIGHT_DESCRIPTION),
(entry.url, WEIGHT_URL)]
for key, weight in needles:
for key, weight in needle_types:
if fnmatch.fnmatch(key, '*' + keyword + '*'):
exact = keyword == key
if exact and weight == WEIGHT_NAME:
weight = WEIGHT_NAME_EXACT
search_res[idx].append((weight, keyword, exact))
search_res_by_idx[idx].append((weight, keyword, exact))
if not args.all:
keywords = set(args.templates)
idxs = list(search_res.keys())
idxs = list(search_res_by_idx.keys())
for idx in idxs:
if keywords != set(x[1] for x in search_res[idx]):
del search_res[idx]
if keywords != set(x[1] for x in search_res_by_idx[idx]):
del search_res_by_idx[idx]
def key_func(x):
# ORDER BY weight DESC, list_of_needles ASC, name ASC
@ -851,7 +1009,7 @@ def search(args, app):
name = query_res[idx][0]
return (-weight, needles, name)
search_res = sorted(search_res.items(), key=key_func)
search_res = sorted(search_res_by_idx.items(), key=key_func)
def gen_header(needles):
fields = []
@ -874,7 +1032,12 @@ def search(args, app):
print('===', cur_header, '===')
print(query_res[idx].name, ':', query_res[idx].summary)
def remove(args, app):
def remove(args: argparse.Namespace, app: qubesadmin.app.QubesBase) -> None:
"""Command that remove templates.
:param args: Arguments received by the application.
:param app: Qubes application object
"""
_ = args, app # unused
# Remove 'remove' entry from the args...
@ -885,17 +1048,29 @@ def remove(args, app):
# Use exec so stdio can be shared easily
os.execvp('qvm-remove', ['qvm-remove'] + argv)
def clean(args, app):
def clean(args: argparse.Namespace, app: qubesadmin.app.QubesBase) -> None:
"""Command that cleans the local package cache.
:param args: Arguments received by the application.
:param app: Qubes application object
"""
# TODO: More fine-grained options
_ = app # unused
shutil.rmtree(args.cachedir)
def main(args=None, app=None):
raw_args = args
args, unk_args = parser.parse_known_args(raw_args)
if args.operation != 'remove' and unk_args:
args = parser.parse_args(raw_args) # this should result in an error
def main(args: typing.Optional[typing.Sequence[str]] = None,
app: typing.Optional[qubesadmin.app.QubesBase] = None) -> int:
"""Main routine of **qvm-template**.
:param args: Override arguments received by the application. Optional
:param app: Override Qubes application object. Optional
:return: Return code of the application
"""
p_args, unk_args = parser.parse_known_args(args)
if p_args.operation != 'remove' and unk_args:
p_args = parser.parse_args(args) # this should result in an error
assert False and 'This line should not be executed.'
# FIXME: Currently doing things this way as we have to forward
# arguments to qvm-remove. While argparse.REMAINDER should be able to
@ -906,34 +1081,34 @@ def main(args=None, app=None):
if app is None:
app = qubesadmin.Qubes()
if args.refresh:
qrexec_repoquery(args, app, refresh=True)
if p_args.refresh:
qrexec_repoquery(p_args, app, refresh=True)
if args.operation == 'download':
download(args, app)
elif args.operation == 'install':
install(args, app)
elif args.operation == 'reinstall':
install(args, app, version_selector=VersionSelector.REINSTALL,
if p_args.operation == 'download':
download(p_args, app)
elif p_args.operation == 'install':
install(p_args, app)
elif p_args.operation == 'reinstall':
install(p_args, app, version_selector=VersionSelector.REINSTALL,
override_existing=True)
elif args.operation == 'downgrade':
install(args, app, version_selector=VersionSelector.LATEST_LOWER,
elif p_args.operation == 'downgrade':
install(p_args, app, version_selector=VersionSelector.LATEST_LOWER,
override_existing=True)
elif args.operation == 'upgrade':
install(args, app, version_selector=VersionSelector.LATEST_HIGHER,
elif p_args.operation == 'upgrade':
install(p_args, app, version_selector=VersionSelector.LATEST_HIGHER,
override_existing=True)
elif args.operation == 'list':
list_templates(args, app, 'list')
elif args.operation == 'info':
list_templates(args, app, 'info')
elif args.operation == 'search':
search(args, app)
elif args.operation == 'remove':
remove(args, app)
elif args.operation == 'clean':
clean(args, app)
elif p_args.operation == 'list':
list_templates(p_args, app, 'list')
elif p_args.operation == 'info':
list_templates(p_args, app, 'info')
elif p_args.operation == 'search':
search(p_args, app)
elif p_args.operation == 'remove':
remove(p_args, app)
elif p_args.operation == 'clean':
clean(p_args, app)
else:
parser.error('Operation \'%s\' not supported.' % args.operation)
parser.error('Operation \'%s\' not supported.' % p_args.operation)
return 0