Browse Source

qvm-template: factor filter_version() out of get_dl_list()

This allows reusing version filtering (getting only a single version per
template) in other places.

For equal versions packages, prefer the one from non-testing repository.
Marek Marczykowski-Górecki 3 years ago
parent
commit
86326b53c4
1 changed files with 57 additions and 35 deletions
  1. 57 35
      qubesadmin/tools/qvm_template.py

+ 57 - 35
qubesadmin/tools/qvm_template.py

@@ -263,6 +263,11 @@ class Template(typing.NamedTuple):
     summary: str
     description: str
 
+    @property
+    def evr(self):
+        """Return a tuple of (EPOCH, VERSION, RELEASE)"""
+        return self.epoch, self.version, self.release
+
 class DlEntry(typing.NamedTuple):
     """Information about a template to be downloaded."""
     evr: typing.Tuple[str, str, str]
@@ -634,6 +639,50 @@ def extract_rpm(name: str, path: str, target: str) -> bool:
         ], stdin=rpm2cpio.stdout, stdout=subprocess.DEVNULL)
     return rpm2cpio.wait() == 0 and cpio.wait() == 0
 
+
+def filter_version(
+        query_res,
+        app: qubesadmin.app.QubesBase,
+        version_selector: VersionSelector = VersionSelector.LATEST):
+    """Select only one version for given template name"""
+    # We only select one package for each distinct package name
+    results: typing.Dict[str, Template] = {}
+
+    for entry in query_res:
+        evr = (entry.epoch, entry.version, entry.release)
+        insert = False
+        if version_selector == VersionSelector.LATEST:
+            if entry.name not in results:
+                insert = True
+            if entry.name in results \
+                    and rpm.labelCompare(results[entry.name].evr, evr) < 0:
+                insert = True
+            if entry.name in results \
+                    and rpm.labelCompare(results[entry.name].evr, evr) == 0 \
+                    and 'testing' not in entry.reponame:
+                # for the same-version matches, prefer non-testing one
+                insert = True
+        elif version_selector == VersionSelector.REINSTALL:
+            vm = get_managed_template_vm(app, entry.name)
+            cur_ver = query_local_evr(vm)
+            if rpm.labelCompare(evr, cur_ver) == 0:
+                insert = True
+        elif version_selector in [VersionSelector.LATEST_LOWER,
+                                  VersionSelector.LATEST_HIGHER]:
+            vm = get_managed_template_vm(app, entry.name)
+            cur_ver = query_local_evr(vm)
+            cmp_res = -1 \
+                if version_selector == VersionSelector.LATEST_LOWER \
+                else 1
+            if rpm.labelCompare(evr, cur_ver) == cmp_res:
+                if entry.name not in results \
+                        or rpm.labelCompare(results[entry.name].evr, evr) < 0:
+                    insert = True
+        if insert:
+            results[entry.name] = entry
+
+    return results.values()
+
 def get_dl_list(
         args: argparse.Namespace,
         app: qubesadmin.app.QubesBase,
@@ -651,10 +700,6 @@ def get_dl_list(
     """
     full_candid: typing.Dict[str, DlEntry] = {}
     for template in args.templates:
-        # This will be merged into `full_candid` later.
-        # It is separated so that we can check whether it is empty.
-        candid: typing.Dict[str, DlEntry] = {}
-
         # Skip local RPMs
         if template.endswith('.rpm'):
             continue
@@ -662,35 +707,10 @@ def get_dl_list(
         query_res = qrexec_repoquery(args, app, PACKAGE_NAME_PREFIX + template)
 
         # We only select one package for each distinct package name
-        for entry in query_res:
-            ver = (entry.epoch, entry.version, entry.release)
-            insert = False
-            if version_selector == VersionSelector.LATEST:
-                if entry.name not in candid \
-                        or rpm.labelCompare(candid[entry.name][0], ver) < 0:
-                    insert = True
-            elif version_selector == VersionSelector.REINSTALL:
-                vm = get_managed_template_vm(app, entry.name)
-                cur_ver = query_local_evr(vm)
-                if rpm.labelCompare(ver, cur_ver) == 0:
-                    insert = True
-            elif version_selector in [VersionSelector.LATEST_LOWER,
-                    VersionSelector.LATEST_HIGHER]:
-                vm = get_managed_template_vm(app, entry.name)
-                cur_ver = query_local_evr(vm)
-                cmp_res = -1 \
-                    if version_selector == VersionSelector.LATEST_LOWER \
-                    else 1
-                if rpm.labelCompare(ver, cur_ver) == cmp_res:
-                    if entry.name not in candid \
-                            or rpm.labelCompare(candid[entry.name][0], ver) < 0:
-                        insert = True
-            if insert:
-                candid[entry.name] = DlEntry(ver, entry.reponame, entry.dlsize)
-
+        query_res = filter_version(query_res, app, version_selector)
         # XXX: As it's possible to include version information in `template`,
         #      perhaps the messages can be improved
-        if len(candid) == 0:
+        if len(query_res) == 0:
             if version_selector == VersionSelector.LATEST:
                 parser.error('Template \'%s\' not found.' % template)
             elif version_selector == VersionSelector.REINSTALL:
@@ -707,10 +727,12 @@ def get_dl_list(
                     file=sys.stderr)
 
         # Merge & choose the template with the highest version
-        for name, dlentry in candid.items():
-            if name not in full_candid \
-                    or rpm.labelCompare(full_candid[name].evr, dlentry.evr) < 0:
-                full_candid[name] = dlentry
+        for entry in query_res:
+            if entry.name not in full_candid \
+                    or rpm.labelCompare(full_candid[entry.name].evr,
+                                        entry.evr) < 0:
+                full_candid[entry.name] = \
+                    DlEntry(entry.evr, entry.reponame, entry.dlsize)
 
     return full_candid