|
@@ -21,6 +21,7 @@
|
|
|
|
|
|
import argparse
|
|
|
import collections
|
|
|
+import configparser
|
|
|
import datetime
|
|
|
import enum
|
|
|
import fcntl
|
|
@@ -104,8 +105,10 @@ def get_parser() -> argparse.ArgumentParser:
|
|
|
help=('Specify files containing DNF repository configuration.'
|
|
|
' Can be used more than once.'))
|
|
|
parser_main.add_argument('--keyring',
|
|
|
- default='/etc/qubes/repo-templates/keys',
|
|
|
- help='Specify directory containing RPM public keys.')
|
|
|
+ default='/etc/qubes/repo-templates/keys/RPM-GPG-KEY-qubes-4.1-primary',
|
|
|
+ help='Specify a file containing default RPM public key. '
|
|
|
+ 'Individual repositories may point at repo-specific key '
|
|
|
+ 'using \'gpgkey\' option')
|
|
|
parser_main.add_argument('--updatevm', default=UPDATEVM,
|
|
|
help=('Specify VM to download updates from.'
|
|
|
' (Set to empty string to specify the current VM.)'))
|
|
@@ -573,18 +576,32 @@ def qrexec_download(
|
|
|
raise ConnectionError(
|
|
|
"qrexec call 'qubes.TemplateDownload' failed.")
|
|
|
|
|
|
-def get_keys(key_dir: str) -> typing.List[str]:
|
|
|
- """List gpg keys"""
|
|
|
- keys = []
|
|
|
- for name in os.listdir(key_dir):
|
|
|
- path = os.path.join(key_dir, name)
|
|
|
- if os.path.isfile(path):
|
|
|
- keys.append(path)
|
|
|
+
|
|
|
+def get_keys_for_repos(repo_files: typing.List[str],
|
|
|
+ releasever: str) -> typing.Dict[str, str]:
|
|
|
+ """List gpg keys
|
|
|
+
|
|
|
+ Returns a dict reponame -> key path
|
|
|
+ """
|
|
|
+ keys = {}
|
|
|
+ for repo_file in repo_files:
|
|
|
+ repo_config = configparser.ConfigParser()
|
|
|
+ repo_config.read(repo_file)
|
|
|
+ for repo in repo_config.sections():
|
|
|
+ try:
|
|
|
+ gpgkey_url = repo_config.get(repo, 'gpgkey')
|
|
|
+ except configparser.NoOptionError:
|
|
|
+ continue
|
|
|
+ gpgkey_url = gpgkey_url.replace('$releasever', releasever)
|
|
|
+ # support only file:// urls
|
|
|
+ if gpgkey_url.startswith('file://'):
|
|
|
+ keys[repo] = gpgkey_url[len('file://'):]
|
|
|
return keys
|
|
|
|
|
|
+
|
|
|
def verify_rpm(
|
|
|
path: str,
|
|
|
- keys: typing.List[str],
|
|
|
+ key: str,
|
|
|
nogpgcheck: bool = False
|
|
|
) -> rpm.hdr:
|
|
|
"""Verify the digest and signature of a RPM package and return the package
|
|
@@ -602,9 +619,8 @@ def verify_rpm(
|
|
|
"""
|
|
|
if not nogpgcheck:
|
|
|
with tempfile.TemporaryDirectory() as rpmdb_dir:
|
|
|
- for key in keys:
|
|
|
- subprocess.check_call(
|
|
|
- ['rpmkeys', '--dbpath=' + rpmdb_dir, '--import', key])
|
|
|
+ subprocess.check_call(
|
|
|
+ ['rpmkeys', '--dbpath=' + rpmdb_dir, '--import', key])
|
|
|
try:
|
|
|
output = subprocess.check_output(
|
|
|
['rpmkeys', '--dbpath=' + rpmdb_dir, '--checksig', path])
|
|
@@ -834,7 +850,7 @@ def install(
|
|
|
:param override_existing: Whether to override existing packages. Used for
|
|
|
reinstall, upgrade, and downgrade operations
|
|
|
"""
|
|
|
- keys = get_keys(args.keyring)
|
|
|
+ keys = get_keys_for_repos(args.repo_files, args.releasever)
|
|
|
|
|
|
unverified_rpm_list = [] # rpmfile, reponame
|
|
|
verified_rpm_list = []
|
|
@@ -847,7 +863,10 @@ def install(
|
|
|
else:
|
|
|
path = rpmfile
|
|
|
|
|
|
- package_hdr = verify_rpm(path, keys, args.nogpgcheck)
|
|
|
+ repo_key = keys.get(reponame)
|
|
|
+ if repo_key is None:
|
|
|
+ repo_key = args.keyring
|
|
|
+ package_hdr = verify_rpm(path, repo_key, args.nogpgcheck)
|
|
|
if not package_hdr:
|
|
|
parser.error('Package \'%s\' verification failed.' % rpmfile)
|
|
|
|