diff --git a/qubesadmin/tools/qvm_template.py b/qubesadmin/tools/qvm_template.py index 330aec4..4112385 100644 --- a/qubesadmin/tools/qvm_template.py +++ b/qubesadmin/tools/qvm_template.py @@ -1,5 +1,7 @@ #!/usr/bin/env python3 +'''Tool for managing VM templates.''' + import argparse import collections import datetime @@ -154,34 +156,37 @@ class TemplateState(enum.Enum): class VersionSelector(enum.Enum): """Enum representing how the candidate template version is chosen.""" LATEST = enum.auto() + """Install latest version.""" REINSTALL = enum.auto() + """Reinstall current version.""" LATEST_LOWER = enum.auto() + """Downgrade to the highest version that is lower than the current one.""" LATEST_HIGHER = enum.auto() + """Upgrade to the highest version that is higher than the current one.""" -# TODO: Docstrings and type hints for Template and DlEntry -Template = collections.namedtuple('Template', [ - 'name', - 'epoch', - 'version', - 'release', - 'reponame', - 'dlsize', - 'buildtime', - 'licence', - 'url', - 'summary', - 'description' -]) +class Template(typing.NamedTuple): + """Details of a template.""" + name: str + epoch: str + version: str + release: str + reponame: str + dlsize: int + buildtime: datetime.datetime + licence: str + url: str + summary: str + description: str -DlEntry = collections.namedtuple('DlEntry', [ - 'evr', - 'reponame', - 'dlsize' -]) +class DlEntry(typing.NamedTuple): + """Information about a template to be downloaded.""" + evr: typing.Tuple[str, str, str] + reponame: str + dlsize: int def build_version_str(evr: typing.Tuple[str, str, str]) -> str: - """Return version string described by ``evr`` in (epoch, version, release) - format.""" + """Return version string described by ``evr``, which is in (epoch, version, + release) format.""" return '%s:%s-%s' % evr def is_match_spec(name: str, epoch: str, version: str, release: str, spec: str @@ -191,11 +196,11 @@ def is_match_spec(name: str, epoch: str, version: str, release: str, spec: str For the algorithm, refer to section "NEVRA Matching" in the DNF documentation. - NOTE: Currently ``arch`` is ignored as the templates should be of + Note that currently ``arch`` is ignored as the templates should be of ``noarch``. - :return: the first element indicates whether there is a match; the second - element represents the priority of the match (lower is better). + :return: A tuple. The first element indicates whether there is a match; the + second element represents the priority of the match (lower is better) """ if epoch != 0: targets = [ @@ -263,24 +268,26 @@ def qrexec_popen( args: argparse.Namespace, app: qubesadmin.app.QubesBase, service: str, - stdout: int = subprocess.PIPE, + stdout: typing.Union[int, typing.IO] = subprocess.PIPE, filter_esc: bool = True) -> subprocess.Popen: - """Return Popen object that communicates with the given qrexec call. + """Return ``Popen`` object that communicates with the given qrexec call in + ``args.updatevm``. - Note that this falls back to invoking /etc/qubes-rpc/* directly if - args.updatevm is None. + Note that this falls back to invoking ``/etc/qubes-rpc/*`` directly if + ``args.updatevm`` is None. - :param args: arguments received by the application + :param args: Arguments received by the application. ``args.updatevm`` is + used :param app: Qubes application object - :param service: the qrexec call to invoke + :param service: The qrexec call to invoke :param stdout: Where the process stdout points to. This is passed directly - to subprocess.Popen. Defaults to subprocess.PIPE. + to ``subprocess.Popen``. Defaults to ``subprocess.PIPE`` - Note that stderr is always set to subprocess.PIPE. - :param filter_esc: whether to filter out escape sequences from - stdout/stderr. Defaults to True. + Note that stderr is always set to ``subprocess.PIPE`` + :param filter_esc: Whether to filter out escape sequences from + stdout/stderr. Defaults to True - :returns: Popen object that communicates with the given qrexec call + :returns: ``Popen`` object that communicates with the given qrexec call """ if args.updatevm: return app.domains[args.updatevm].run_service( @@ -294,7 +301,21 @@ def qrexec_popen( stdout=stdout, stderr=subprocess.PIPE) -def qrexec_payload(args, app, spec, refresh): +def qrexec_payload(args: argparse.Namespace, app: qubesadmin.app.QubesBase, + spec: str, refresh: bool) -> str: + """Return payload string for the ``qubes.Template*`` qrexec calls. + + :param args: Arguments received by the application. Specifically, + ``args.{enablerepo,disablerepo,repoid,releasever,repo_files}`` are used + :param app: Qubes application object + :param spec: Package spec to query (refer to ```` in the + DNF documentation) + :param refresh: Whether to force refresh repo metadata + + :return: Payload string + + :raises: Parser error if spec equals ``---`` or input contains ``\\n`` + """ _ = app # unused if spec == '---': @@ -327,7 +348,25 @@ def qrexec_payload(args, app, spec, refresh): payload += fd.read() + '\n' return payload -def qrexec_repoquery(args, app, spec='*', refresh=False): +def qrexec_repoquery( + args: argparse.Namespace, + app: qubesadmin.app.QubesBase, + spec: str = '*', + refresh: bool = False) -> typing.List[Template]: + """Query template information from repositories. + + :param args: Arguments received by the application. Specifically, + ``args.{enablerepo,disablerepo,repoid,releasever,repo_files,updatevm}`` + are used + :param app: Qubes application object + :param spec: Package spec to query (refer to ```` in the + DNF documentation). Defaults to ``*`` + :param refresh: Whether to force refresh repo metadata. Defaults to False + + :raises ConnectionError: if the qrexec call fails + + :return: List of ``Template`` objects representing the result of the query + """ payload = qrexec_payload(args, app, spec, refresh) proc = qrexec_popen(args, app, 'qubes.TemplateSearch') stdout, stderr = proc.communicate(payload.encode('UTF-8')) @@ -386,7 +425,28 @@ def qrexec_repoquery(args, app, spec='*', refresh=False): " unexpected data format.")) return result -def qrexec_download(args, app, spec, path, dlsize=None, refresh=False): +def qrexec_download( + args: argparse.Namespace, + app: qubesadmin.app.QubesBase, + spec: str, + path: str, + dlsize: typing.Optional[int] = None, + refresh: bool = False) -> None: + """Download a template from repositories. + + :param args: Arguments received by the application. Specifically, + ``args.{enablerepo,disablerepo,repoid,releasever,repo_files,updatevm}`` + are used + :param app: Qubes application object + :param spec: Package spec to query (refer to ```` in the + DNF documentation) + :param path: Path to place the downloaded template + :param dlsize: Size of template to be downloaded. Used for the progress + bar. Optional + :param refresh: Whether to force refresh repo metadata. Defaults to False + + :raises ConnectionError: if the qrexec call fails + """ with open(path, 'wb') as fd: payload = qrexec_payload(args, app, spec, refresh) # Don't filter ESCs for binary files @@ -405,12 +465,25 @@ def qrexec_download(args, app, spec, path, dlsize=None, refresh=False): if proc.wait() != 0: raise ConnectionError( "qrexec call 'qubes.TemplateDownload' failed.") - return True -def verify_rpm(path, nogpgcheck=False, transaction_set=None): - # NOTE: Verifying RPMs this way is prone to TOCTOU. This is okay for local - # files, but may create problems if multiple instances of `qvm-template` - # are downloading the same file, so a lock is needed in that case. +def verify_rpm( + path: str, + nogpgcheck: bool = False, + transaction_set: typing.Optional[rpm.transaction.TransactionSet] = None + ) -> bool: + """Verify the digest and signature of a RPM package. + + Note that verifying RPMs this way is prone to TOCTOU. This is okay for + local files, but may create problems if multiple instances of + **qvm-template** are downloading the same file, so a lock is needed in that + case. + + :param path: Location of the RPM package + :param nogpgcheck: Whether to allow invalid GPG signatures + :param transaction_set: Override RPM ``TransactionSet``. Optional + + :return: Whether the RPM is verified + """ if transaction_set is None: transaction_set = rpm.TransactionSet() with open(path, 'rb') as fd: @@ -427,14 +500,34 @@ def verify_rpm(path, nogpgcheck=False, transaction_set=None): return False return True -def get_package_hdr(path, transaction_set=None): +def get_package_hdr( + path: str, + transaction_set: typing.Optional[rpm.transaction.TransactionSet] = None + ) -> rpm.hdr: + """Return header of a RPM package. + + Note that this function **does not** check the integrity of the package. + + :param path: Location of the RPM package + :param transaction_set: Override RPM ``TransactionSet``. Optional + + :return: RPM headers + """ if transaction_set is None: transaction_set = rpm.TransactionSet() with open(path, 'rb') as fd: hdr = transaction_set.hdrFromFdno(fd) return hdr -def extract_rpm(name, path, target): +def extract_rpm(name: str, path: str, target: str) -> bool: + """Extract a template RPM package. + + :param name: Name of the template + :param path: Location of the RPM package + :param target: Target path to extract to + + :return: Whether the extraction succeeded + """ rpm2cpio = subprocess.Popen(['rpm2cpio', path], stdout=subprocess.PIPE) # `-D` is GNUism cpio = subprocess.Popen([ @@ -446,12 +539,26 @@ def extract_rpm(name, path, target): ], stdin=rpm2cpio.stdout, stdout=subprocess.DEVNULL) return rpm2cpio.wait() == 0 and cpio.wait() == 0 -def get_dl_list(args, app, version_selector=VersionSelector.LATEST): - full_candid = {} +def get_dl_list( + args: argparse.Namespace, + app: qubesadmin.app.QubesBase, + version_selector: VersionSelector = VersionSelector.LATEST + ) -> typing.Dict[str, DlEntry]: + """Return list of templates that needs to be downloaded. + + :param args: Arguments received by the application. + :param app: Qubes application object + :param version_selector: Specify algorithm to select the candidate version + of a package. Defaults to ``VersionSelector.LATEST`` + + :return: Dictionary that maps to ``DlEntry`` the names of templates that + needs to be downloaded + """ + full_candid: typing.Dict[str, DlEntry] = {} for template in args.templates: # This will be merged into `full_candid` later. # It is separated so that we can check whether it is empty. - candid = {} + candid: typing.Dict[str, DlEntry] = {} # Skip local RPMs if template.endswith('.rpm'): @@ -505,15 +612,35 @@ def get_dl_list(args, app, version_selector=VersionSelector.LATEST): file=sys.stderr) # Merge & choose the template with the highest version - for name, entry in candid.items(): + for name, dlentry in candid.items(): if name not in full_candid \ - or rpm.labelCompare(full_candid[name].evr, entry.evr) < 0: - full_candid[name] = entry + or rpm.labelCompare(full_candid[name].evr, dlentry.evr) < 0: + full_candid[name] = dlentry return full_candid -def download(args, app, path_override=None, - dl_list=None, suffix='', version_selector=VersionSelector.LATEST): +def download( + args: argparse.Namespace, + app: qubesadmin.app.QubesBase, + path_override: typing.Optional[str] = None, + dl_list: typing.Optional[typing.Dict[str, DlEntry]] = None, + suffix: str = '', + version_selector: VersionSelector = VersionSelector.LATEST) -> None: + """Command that downloads template packages. + + :param args: Arguments received by the application. + :param app: Qubes application object + :param path_override: Override path to store downloads. If not set or set + to None, ``args.downloaddir`` is used. Optional + :param dl_list: Override list of templates to download. If not set or set + to None, ``get_dl_list`` is called, which generates the list from + ``args``. Optional + :param suffix: Suffix to add to the file name of downloaded packages. This + is useful if you want to distinguish between verified and unverified + packages. Defaults to an empty string + :param version_selector: Specify algorithm to select the candidate version + of a package. Defaults to ``VersionSelector.LATEST`` + """ if dl_list is None: dl_list = get_dl_list(args, app, version_selector=version_selector) @@ -553,8 +680,23 @@ def download(args, app, path_override=None, print('\'%s\' download failed.' % spec, file=sys.stderr) sys.exit(1) -def install(args, app, version_selector=VersionSelector.LATEST, - override_existing=False): +def install( + args: argparse.Namespace, + app: qubesadmin.app.QubesBase, + version_selector: VersionSelector = VersionSelector.LATEST, + override_existing: bool = False) -> None: + """Command that installs template packages. + + This command creates a lock file to ensure that two instances are not + running at the same time. + + :param args: Arguments received by the application. + :param app: Qubes application object + :param version_selector: Specify algorithm to select the candidate version + of a package. Defaults to ``VersionSelector.LATEST`` + :param override_existing: Whether to override existing packages. Used for + reinstall, upgrade, and downgrade operations + """ try: with open(LOCK_FILE, 'x') as _: pass @@ -700,7 +842,16 @@ def install(args, app, version_selector=VersionSelector.LATEST, finally: os.remove(LOCK_FILE) -def list_templates(args, app, operation): +def list_templates(args: argparse.Namespace, + app: qubesadmin.app.QubesBase, operation: str) -> None: + """Command that lists templates. + + :param args: Arguments received by the application. + :param app: Qubes application object + :param operation: If set to ``list``, display a listing similar to ``dnf + list``. If set to ``info``, display detailed template information + similar to ``dnf info``. Otherwise, an ``AssertionError`` is raised. + """ tpl_list = [] def append_list(data, status, install_time=None): @@ -745,10 +896,10 @@ def list_templates(args, app, operation): if args.all or args.available or args.extras or args.upgrades: if args.templates: - query_res = set() + query_res_set: typing.Set[Template] = set() for spec in args.templates: - query_res |= set(qrexec_repoquery(args, app, spec)) - query_res = list(query_res) + query_res_set |= set(qrexec_repoquery(args, app, spec)) + query_res = list(query_res_set) else: query_res = qrexec_repoquery(args, app) @@ -793,7 +944,12 @@ def list_templates(args, app, operation): print(k.title()) qubesadmin.tools.print_table(list(map(lambda x: x[1:], grp))) -def search(args, app): +def search(args: argparse.Namespace, app: qubesadmin.app.QubesBase) -> None: + """Command that searches template details for given patterns. + + :param args: Arguments received by the application. + :param app: Qubes application object + """ # Search in both installed and available templates query_res = qrexec_repoquery(args, app) for vm in app.domains: @@ -822,27 +978,29 @@ def search(args, app): (WEIGHT_DESCRIPTION, 'Description'), (WEIGHT_URL, 'URL')] - search_res = collections.defaultdict(list) + search_res_by_idx: \ + typing.Dict[int, typing.List[typing.Tuple[int, str, bool]]] = \ + collections.defaultdict(list) for keyword in args.templates: for idx, entry in enumerate(query_res): - needles = \ + needle_types = \ [(entry.name, WEIGHT_NAME), (entry.summary, WEIGHT_SUMMARY)] if args.all: - needles += [(entry.description, WEIGHT_DESCRIPTION), + needle_types += [(entry.description, WEIGHT_DESCRIPTION), (entry.url, WEIGHT_URL)] - for key, weight in needles: + for key, weight in needle_types: if fnmatch.fnmatch(key, '*' + keyword + '*'): exact = keyword == key if exact and weight == WEIGHT_NAME: weight = WEIGHT_NAME_EXACT - search_res[idx].append((weight, keyword, exact)) + search_res_by_idx[idx].append((weight, keyword, exact)) if not args.all: keywords = set(args.templates) - idxs = list(search_res.keys()) + idxs = list(search_res_by_idx.keys()) for idx in idxs: - if keywords != set(x[1] for x in search_res[idx]): - del search_res[idx] + if keywords != set(x[1] for x in search_res_by_idx[idx]): + del search_res_by_idx[idx] def key_func(x): # ORDER BY weight DESC, list_of_needles ASC, name ASC @@ -851,7 +1009,7 @@ def search(args, app): name = query_res[idx][0] return (-weight, needles, name) - search_res = sorted(search_res.items(), key=key_func) + search_res = sorted(search_res_by_idx.items(), key=key_func) def gen_header(needles): fields = [] @@ -874,7 +1032,12 @@ def search(args, app): print('===', cur_header, '===') print(query_res[idx].name, ':', query_res[idx].summary) -def remove(args, app): +def remove(args: argparse.Namespace, app: qubesadmin.app.QubesBase) -> None: + """Command that remove templates. + + :param args: Arguments received by the application. + :param app: Qubes application object + """ _ = args, app # unused # Remove 'remove' entry from the args... @@ -885,17 +1048,29 @@ def remove(args, app): # Use exec so stdio can be shared easily os.execvp('qvm-remove', ['qvm-remove'] + argv) -def clean(args, app): +def clean(args: argparse.Namespace, app: qubesadmin.app.QubesBase) -> None: + """Command that cleans the local package cache. + + :param args: Arguments received by the application. + :param app: Qubes application object + """ # TODO: More fine-grained options _ = app # unused shutil.rmtree(args.cachedir) -def main(args=None, app=None): - raw_args = args - args, unk_args = parser.parse_known_args(raw_args) - if args.operation != 'remove' and unk_args: - args = parser.parse_args(raw_args) # this should result in an error +def main(args: typing.Optional[typing.Sequence[str]] = None, + app: typing.Optional[qubesadmin.app.QubesBase] = None) -> int: + """Main routine of **qvm-template**. + + :param args: Override arguments received by the application. Optional + :param app: Override Qubes application object. Optional + + :return: Return code of the application + """ + p_args, unk_args = parser.parse_known_args(args) + if p_args.operation != 'remove' and unk_args: + p_args = parser.parse_args(args) # this should result in an error assert False and 'This line should not be executed.' # FIXME: Currently doing things this way as we have to forward # arguments to qvm-remove. While argparse.REMAINDER should be able to @@ -906,34 +1081,34 @@ def main(args=None, app=None): if app is None: app = qubesadmin.Qubes() - if args.refresh: - qrexec_repoquery(args, app, refresh=True) + if p_args.refresh: + qrexec_repoquery(p_args, app, refresh=True) - if args.operation == 'download': - download(args, app) - elif args.operation == 'install': - install(args, app) - elif args.operation == 'reinstall': - install(args, app, version_selector=VersionSelector.REINSTALL, + if p_args.operation == 'download': + download(p_args, app) + elif p_args.operation == 'install': + install(p_args, app) + elif p_args.operation == 'reinstall': + install(p_args, app, version_selector=VersionSelector.REINSTALL, override_existing=True) - elif args.operation == 'downgrade': - install(args, app, version_selector=VersionSelector.LATEST_LOWER, + elif p_args.operation == 'downgrade': + install(p_args, app, version_selector=VersionSelector.LATEST_LOWER, override_existing=True) - elif args.operation == 'upgrade': - install(args, app, version_selector=VersionSelector.LATEST_HIGHER, + elif p_args.operation == 'upgrade': + install(p_args, app, version_selector=VersionSelector.LATEST_HIGHER, override_existing=True) - elif args.operation == 'list': - list_templates(args, app, 'list') - elif args.operation == 'info': - list_templates(args, app, 'info') - elif args.operation == 'search': - search(args, app) - elif args.operation == 'remove': - remove(args, app) - elif args.operation == 'clean': - clean(args, app) + elif p_args.operation == 'list': + list_templates(p_args, app, 'list') + elif p_args.operation == 'info': + list_templates(p_args, app, 'info') + elif p_args.operation == 'search': + search(p_args, app) + elif p_args.operation == 'remove': + remove(p_args, app) + elif p_args.operation == 'clean': + clean(p_args, app) else: - parser.error('Operation \'%s\' not supported.' % args.operation) + parser.error('Operation \'%s\' not supported.' % p_args.operation) return 0