qvm-template: call rpmkeys --checksig for signature verification

RPM API is confusing and it's easy to get it wrong when verifying
package signatures.
Call 'rpmkeys --checksig' which is more rebust here - RPM authors should
know how to use their API.

QubesOS/qubes-issues#2534
This commit is contained in:
Marek Marczykowski-Górecki 2021-01-29 22:47:38 +01:00
parent b500462abb
commit f3f6750a3f
No known key found for this signature in database
GPG Key ID: 063938BA42CFA724
3 changed files with 91 additions and 123 deletions

View File

@ -29,53 +29,75 @@ class TC_00_qvm_template(qubesadmin.tests.QubesTestCase):
self.mock_table.stop()
super().tearDown()
def test_000_verify_rpm_success(self):
ts = mock.MagicMock()
@mock.patch('rpm.TransactionSet')
@mock.patch('subprocess.check_call')
@mock.patch('subprocess.check_output')
def test_000_verify_rpm_success(self, mock_proc, mock_call, mock_ts):
# Just return a dict instead of rpm.hdr
hdr = {
rpm.RPMTAG_SIGPGP: 'xxx', # non-empty
rpm.RPMTAG_SIGGPG: 'xxx', # non-empty
}
ts.hdrFromFdno.return_value = hdr
ret = qubesadmin.tools.qvm_template.verify_rpm('/dev/null', ts)
ts.hdrFromFdno.assert_called_once()
mock_ts.return_value.hdrFromFdno.return_value = hdr
mock_proc.return_value = b'dummy.rpm: digests signatures OK\n'
ret = qubesadmin.tools.qvm_template.verify_rpm('/dev/null',
['/path/to/key'])
mock_call.assert_called_once()
mock_proc.assert_called_once()
self.assertEqual(hdr, ret)
self.assertAllCalled()
def test_001_verify_rpm_nosig_fail(self):
ts = mock.MagicMock()
@mock.patch('rpm.TransactionSet')
@mock.patch('subprocess.check_call')
@mock.patch('subprocess.check_output')
def test_001_verify_rpm_nosig_fail(self, mock_proc, mock_call, mock_ts):
# Just return a dict instead of rpm.hdr
hdr = {
rpm.RPMTAG_SIGPGP: None, # empty
rpm.RPMTAG_SIGGPG: None, # empty
}
ts.hdrFromFdno.return_value = hdr
ret = qubesadmin.tools.qvm_template.verify_rpm('/dev/null', ts)
ts.hdrFromFdno.assert_called_once()
self.assertEqual(ret, None)
mock_ts.return_value.hdrFromFdno.return_value = hdr
mock_proc.return_value = b'dummy.rpm: digests OK\n'
with self.assertRaises(Exception) as e:
qubesadmin.tools.qvm_template.verify_rpm('/dev/null',
['/path/to/key'])
mock_call.assert_called_once()
mock_proc.assert_called_once()
self.assertIn('Signature verification failed', e.exception.args[0])
mock_ts.assert_not_called()
self.assertAllCalled()
def test_002_verify_rpm_nosig_success(self):
ts = mock.MagicMock()
@mock.patch('rpm.TransactionSet')
@mock.patch('subprocess.check_call')
@mock.patch('subprocess.check_output')
def test_002_verify_rpm_nosig_success(self, mock_proc, mock_call, mock_ts):
# Just return a dict instead of rpm.hdr
hdr = {
rpm.RPMTAG_SIGPGP: None, # empty
rpm.RPMTAG_SIGGPG: None, # empty
}
ts.hdrFromFdno.return_value = hdr
ret = qubesadmin.tools.qvm_template.verify_rpm('/dev/null', ts, True)
ts.hdrFromFdno.assert_called_once()
mock_ts.return_value.hdrFromFdno.return_value = hdr
mock_proc.return_value = b'dummy.rpm: digests OK\n'
ret = qubesadmin.tools.qvm_template.verify_rpm('/dev/null',
['/path/to/key'], True)
mock_proc.assert_not_called()
mock_call.assert_not_called()
self.assertEqual(ret, hdr)
self.assertAllCalled()
def test_003_verify_rpm_badsig_fail(self):
ts = mock.MagicMock()
def f(*args):
raise rpm.error('public key not trusted')
ts.hdrFromFdno.side_effect = f
ret = qubesadmin.tools.qvm_template.verify_rpm('/dev/null', ts)
ts.hdrFromFdno.assert_called_once()
self.assertEqual(ret, None)
@mock.patch('rpm.TransactionSet')
@mock.patch('subprocess.check_call')
@mock.patch('subprocess.check_output')
def test_003_verify_rpm_badsig_fail(self, mock_proc, mock_call, mock_ts):
mock_proc.side_effect = subprocess.CalledProcessError(1,
['rpmkeys', '--checksig'], b'/dev/null: digests SIGNATURES NOT OK\n')
with self.assertRaises(Exception) as e:
qubesadmin.tools.qvm_template.verify_rpm('/dev/null',
['/path/to/key'])
mock_call.assert_called_once()
mock_proc.assert_called_once()
self.assertIn('Signature verification failed', e.exception.args[0])
mock_ts.assert_not_called()
self.assertAllCalled()
@mock.patch('subprocess.Popen')
@ -144,10 +166,8 @@ class TC_00_qvm_template(qubesadmin.tests.QubesTestCase):
@mock.patch('qubesadmin.tools.qvm_template.download')
@mock.patch('qubesadmin.tools.qvm_template.get_dl_list')
@mock.patch('qubesadmin.tools.qvm_template.verify_rpm')
@mock.patch('qubesadmin.tools.qvm_template.rpm_transactionset')
def test_100_install_local_success(
self,
mock_ts,
mock_verify,
mock_dl_list,
mock_dl,
@ -201,7 +221,7 @@ class TC_00_qvm_template(qubesadmin.tests.QubesTestCase):
path = template_file.name
args = argparse.Namespace(
templates=[path],
keyring='/usr/share/qubes/repo-templates/keys',
keyring='/tmp',
nogpgcheck=False,
cachedir='/var/cache/qvm-template',
yes=False,
@ -217,15 +237,6 @@ class TC_00_qvm_template(qubesadmin.tests.QubesTestCase):
mock.call().__enter__(),
mock.call().__exit__(None, None, None)
])
# Keyring created
self.assertEqual(mock_ts.mock_calls, [
mock.call('/usr/share/qubes/repo-templates/keys')
])
# Package verified
self.assertEqual(mock_verify.mock_calls, [
mock.call(path, mock_ts('/usr/share/qubes/repo-templates/keys'),
False)
])
# Attempt to get download list
selector = qubesadmin.tools.qvm_template.VersionSelector.LATEST
self.assertEqual(mock_dl_list.mock_calls, [
@ -275,10 +286,8 @@ class TC_00_qvm_template(qubesadmin.tests.QubesTestCase):
@mock.patch('qubesadmin.tools.qvm_template.download')
@mock.patch('qubesadmin.tools.qvm_template.get_dl_list')
@mock.patch('qubesadmin.tools.qvm_template.verify_rpm')
@mock.patch('qubesadmin.tools.qvm_template.rpm_transactionset')
def test_101_install_local_postprocargs_success(
self,
mock_ts,
mock_verify,
mock_dl_list,
mock_dl,
@ -332,7 +341,7 @@ class TC_00_qvm_template(qubesadmin.tests.QubesTestCase):
path = template_file.name
args = argparse.Namespace(
templates=[path],
keyring='/usr/share/qubes/repo-templates/keys',
keyring='/tmp',
nogpgcheck=False,
cachedir='/var/cache/qvm-template',
yes=False,
@ -348,15 +357,6 @@ class TC_00_qvm_template(qubesadmin.tests.QubesTestCase):
mock.call().__enter__(),
mock.call().__exit__(None, None, None)
])
# Keyring created
self.assertEqual(mock_ts.mock_calls, [
mock.call('/usr/share/qubes/repo-templates/keys')
])
# Package verified
self.assertEqual(mock_verify.mock_calls, [
mock.call(path, mock_ts('/usr/share/qubes/repo-templates/keys'),
False)
])
# Attempt to get download list
selector = qubesadmin.tools.qvm_template.VersionSelector.LATEST
self.assertEqual(mock_dl_list.mock_calls, [
@ -409,10 +409,8 @@ class TC_00_qvm_template(qubesadmin.tests.QubesTestCase):
@mock.patch('qubesadmin.tools.qvm_template.download')
@mock.patch('qubesadmin.tools.qvm_template.get_dl_list')
@mock.patch('qubesadmin.tools.qvm_template.verify_rpm')
@mock.patch('qubesadmin.tools.qvm_template.rpm_transactionset')
def test_102_install_local_badsig_fail(
self,
mock_ts,
mock_verify,
mock_dl_list,
mock_dl,
@ -432,7 +430,7 @@ class TC_00_qvm_template(qubesadmin.tests.QubesTestCase):
path = template_file.name
args = argparse.Namespace(
templates=[path],
keyring='/usr/share/qubes/repo-templates/keys',
keyring='/tmp',
nogpgcheck=False,
cachedir='/var/cache/qvm-template',
yes=False,
@ -452,15 +450,6 @@ class TC_00_qvm_template(qubesadmin.tests.QubesTestCase):
])
# Check error message
self.assertTrue('verification failed' in mock_err.getvalue())
# Keyring created
self.assertEqual(mock_ts.mock_calls, [
mock.call('/usr/share/qubes/repo-templates/keys')
])
# Package verified
self.assertEqual(mock_verify.mock_calls, [
mock.call(path, mock_ts('/usr/share/qubes/repo-templates/keys'),
False)
])
# Should not be executed:
self.assertEqual(mock_dl_list.mock_calls, [])
self.assertEqual(mock_dl.mock_calls, [])
@ -483,10 +472,8 @@ class TC_00_qvm_template(qubesadmin.tests.QubesTestCase):
@mock.patch('qubesadmin.tools.qvm_template.download')
@mock.patch('qubesadmin.tools.qvm_template.get_dl_list')
@mock.patch('qubesadmin.tools.qvm_template.verify_rpm')
@mock.patch('qubesadmin.tools.qvm_template.rpm_transactionset')
def test_103_install_local_exists_fail(
self,
mock_ts,
mock_verify,
mock_dl_list,
mock_dl,
@ -519,7 +506,7 @@ class TC_00_qvm_template(qubesadmin.tests.QubesTestCase):
path = template_file.name
args = argparse.Namespace(
templates=[path],
keyring='/usr/share/qubes/repo-templates/keys',
keyring='/tmp',
nogpgcheck=False,
cachedir='/var/cache/qvm-template',
yes=False,
@ -537,15 +524,6 @@ class TC_00_qvm_template(qubesadmin.tests.QubesTestCase):
])
# Check warning message
self.assertTrue('already installed' in mock_err.getvalue())
# Keyring created
self.assertEqual(mock_ts.mock_calls, [
mock.call('/usr/share/qubes/repo-templates/keys')
])
# Package verified
self.assertEqual(mock_verify.mock_calls, [
mock.call(path, mock_ts('/usr/share/qubes/repo-templates/keys'),
False)
])
# Attempt to get download list
selector = qubesadmin.tools.qvm_template.VersionSelector.LATEST
self.assertEqual(mock_dl_list.mock_calls, [
@ -576,10 +554,8 @@ class TC_00_qvm_template(qubesadmin.tests.QubesTestCase):
@mock.patch('qubesadmin.tools.qvm_template.download')
@mock.patch('qubesadmin.tools.qvm_template.get_dl_list')
@mock.patch('qubesadmin.tools.qvm_template.verify_rpm')
@mock.patch('qubesadmin.tools.qvm_template.rpm_transactionset')
def test_104_install_local_badpkgname_fail(
self,
mock_ts,
mock_verify,
mock_dl_list,
mock_dl,
@ -609,7 +585,7 @@ class TC_00_qvm_template(qubesadmin.tests.QubesTestCase):
path = template_file.name
args = argparse.Namespace(
templates=[path],
keyring='/usr/share/qubes/repo-templates/keys',
keyring='/tmp',
nogpgcheck=False,
cachedir='/var/cache/qvm-template',
yes=False,
@ -628,15 +604,6 @@ class TC_00_qvm_template(qubesadmin.tests.QubesTestCase):
])
# Check error message
self.assertTrue('Illegal package name' in mock_err.getvalue())
# Keyring created
self.assertEqual(mock_ts.mock_calls, [
mock.call('/usr/share/qubes/repo-templates/keys')
])
# Package verified
self.assertEqual(mock_verify.mock_calls, [
mock.call(path, mock_ts('/usr/share/qubes/repo-templates/keys'),
False)
])
# Should not be executed:
self.assertEqual(mock_dl_list.mock_calls, [])
self.assertEqual(mock_dl.mock_calls, [])
@ -720,10 +687,8 @@ class TC_00_qvm_template(qubesadmin.tests.QubesTestCase):
@mock.patch('qubesadmin.tools.qvm_template.download')
@mock.patch('qubesadmin.tools.qvm_template.get_dl_list')
@mock.patch('qubesadmin.tools.qvm_template.verify_rpm')
@mock.patch('qubesadmin.tools.qvm_template.rpm_transactionset')
def test_106_install_local_badpath_fail(
self,
mock_ts,
mock_verify,
mock_dl_list,
mock_dl,
@ -741,7 +706,7 @@ class TC_00_qvm_template(qubesadmin.tests.QubesTestCase):
path = '/var/tmp/ShOulD-NoT-ExIsT.rpm'
args = argparse.Namespace(
templates=[path],
keyring='/usr/share/qubes/repo-templates/keys',
keyring='/tmp',
nogpgcheck=False,
cachedir='/var/cache/qvm-template',
yes=False,
@ -761,10 +726,6 @@ class TC_00_qvm_template(qubesadmin.tests.QubesTestCase):
# Check error message
self.assertTrue(f"RPM file '{path}' not found" \
in mock_err.getvalue())
# Keyring created
self.assertEqual(mock_ts.mock_calls, [
mock.call('/usr/share/qubes/repo-templates/keys')
])
# Should not be executed:
self.assertEqual(mock_verify.mock_calls, [])
self.assertEqual(mock_dl_list.mock_calls, [])

View File

@ -20,9 +20,9 @@ import tempfile
import time
import typing
import rpm
import tqdm
import xdg.BaseDirectory
import rpm
import qubesadmin
import qubesadmin.tools
@ -39,6 +39,9 @@ DATE_FMT = '%Y-%m-%d %H:%M:%S'
UPDATEVM = str('global UpdateVM')
class SignatureVerificationError(Exception):
"""Package signature is invalid or missing"""
def qubes_release() -> str:
"""Return the Qubes release."""
if os.path.exists('/usr/share/qubes/marker-vm'):
@ -540,21 +543,18 @@ def qrexec_download(
raise ConnectionError(
"qrexec call 'qubes.TemplateDownload' failed.")
def rpm_transactionset(key_dir: str) -> rpm.transaction.TransactionSet:
"""Create RPM TransactionSet using the keys in the given directory."""
tset = rpm.TransactionSet()
kring = rpm.keyring()
def get_keys(key_dir: str) -> typing.List[str]:
"""List gpg keys"""
keys = []
for name in os.listdir(key_dir):
path = os.path.join(key_dir, name)
if os.path.isfile(path):
with open(path, 'rb') as fd:
kring.addKey(rpm.pubkey(fd.read()))
tset.setKeyring(kring)
return tset
keys.append(path)
return keys
def verify_rpm(
path: str,
transaction_set: rpm.transaction.TransactionSet,
keys: typing.List[str],
nogpgcheck: bool = False
) -> rpm.hdr:
"""Verify the digest and signature of a RPM package and return the package
@ -566,24 +566,29 @@ def verify_rpm(
case.
:param path: Location of the RPM package
:param transaction_set: RPM ``TransactionSet``
:param nogpgcheck: Whether to allow invalid GPG signatures
:return: RPM package header. If verification fails, ``None`` is returned.
:return: RPM package header. If verification fails, raises an exception.
"""
if not nogpgcheck:
with tempfile.TemporaryDirectory() as rpmdb_dir:
for key in keys:
subprocess.check_call(
['rpmkeys', '--dbpath=' + rpmdb_dir, '--import', key])
try:
output = subprocess.check_output(
['rpmkeys', '--dbpath=' + rpmdb_dir, '--checksig', path])
except subprocess.CalledProcessError as e:
raise SignatureVerificationError(
'Signature verification failed: {}'.format(
e.output.decode()))
if not output.endswith(b': digests signatures OK\n'):
raise SignatureVerificationError(
'Signature verification failed: {}'.format(output.decode()))
with open(path, 'rb') as fd:
try:
hdr = transaction_set.hdrFromFdno(fd)
if hdr[rpm.RPMTAG_SIGPGP] is None \
and hdr[rpm.RPMTAG_SIGGPG] is None:
return hdr if nogpgcheck else None
except rpm.error as e:
if str(e) == 'public key not trusted' \
or str(e) == 'public key not available':
# FIXME: This does not work
# Should just tell TransactionSet not to verify sigs
return hdr if nogpgcheck else None
return None
tset = rpm.TransactionSet()
tset.setVSFlags(rpm.RPMVSF_MASK_NOSIGNATURES)
hdr = tset.hdrFromFdno(fd)
return hdr
def extract_rpm(name: str, path: str, target: str) -> bool:
@ -772,7 +777,7 @@ def install(
% LOCK_FILE)
try:
transaction_set = rpm_transactionset(args.keyring)
keys = get_keys(args.keyring)
unverified_rpm_list = [] # rpmfile, reponame
verified_rpm_list = []
@ -784,7 +789,7 @@ def install(
else:
path = rpmfile
package_hdr = verify_rpm(path, transaction_set, args.nogpgcheck)
package_hdr = verify_rpm(path, keys, args.nogpgcheck)
if not package_hdr:
parser.error('Package \'%s\' verification failed.' % rpmfile)

View File

@ -13,6 +13,8 @@ RPMTAG_SUMMARY = 9
RPMTAG_URL = 10
RPMTAG_VERSION = 11
RPMVSF_MASK_NOSIGNATURES = 0xc0c00
class error(BaseException):
def __init__(self, msg):
self.msg = msg
@ -21,7 +23,8 @@ class error(BaseException):
return self.msg
class hdr():
pass
def __getitem__(self, key):
pass
class keyring():
def addKey(self, *args):
@ -31,13 +34,12 @@ class pubkey():
pass
class TransactionSet():
def setVSFlags(self, flags):
pass
def setKeyring(self, *args):
pass
class transaction():
class TransactionSet():
def setKeyring(self, *args):
pass
def hdrFromFdno(self, fdno) -> hdr:
return hdr()
def labelCompare(a, b):
# Pretend that we're comparing the versions lexographically in the stub