From f3f6750a3ff8b114b2a598dc7b76354b1daa1b76 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marek=20Marczykowski-G=C3=B3recki?= Date: Fri, 29 Jan 2021 22:47:38 +0100 Subject: [PATCH] qvm-template: call rpmkeys --checksig for signature verification RPM API is confusing and it's easy to get it wrong when verifying package signatures. Call 'rpmkeys --checksig' which is more rebust here - RPM authors should know how to use their API. QubesOS/qubes-issues#2534 --- qubesadmin/tests/tools/qvm_template.py | 143 +++++++++---------------- qubesadmin/tools/qvm_template.py | 57 +++++----- test-packages/rpm.py | 14 +-- 3 files changed, 91 insertions(+), 123 deletions(-) diff --git a/qubesadmin/tests/tools/qvm_template.py b/qubesadmin/tests/tools/qvm_template.py index 8638f8e..a150c1b 100644 --- a/qubesadmin/tests/tools/qvm_template.py +++ b/qubesadmin/tests/tools/qvm_template.py @@ -29,53 +29,75 @@ class TC_00_qvm_template(qubesadmin.tests.QubesTestCase): self.mock_table.stop() super().tearDown() - def test_000_verify_rpm_success(self): - ts = mock.MagicMock() + @mock.patch('rpm.TransactionSet') + @mock.patch('subprocess.check_call') + @mock.patch('subprocess.check_output') + def test_000_verify_rpm_success(self, mock_proc, mock_call, mock_ts): # Just return a dict instead of rpm.hdr hdr = { rpm.RPMTAG_SIGPGP: 'xxx', # non-empty rpm.RPMTAG_SIGGPG: 'xxx', # non-empty } - ts.hdrFromFdno.return_value = hdr - ret = qubesadmin.tools.qvm_template.verify_rpm('/dev/null', ts) - ts.hdrFromFdno.assert_called_once() + mock_ts.return_value.hdrFromFdno.return_value = hdr + mock_proc.return_value = b'dummy.rpm: digests signatures OK\n' + ret = qubesadmin.tools.qvm_template.verify_rpm('/dev/null', + ['/path/to/key']) + mock_call.assert_called_once() + mock_proc.assert_called_once() self.assertEqual(hdr, ret) self.assertAllCalled() - def test_001_verify_rpm_nosig_fail(self): - ts = mock.MagicMock() + @mock.patch('rpm.TransactionSet') + @mock.patch('subprocess.check_call') + @mock.patch('subprocess.check_output') + def test_001_verify_rpm_nosig_fail(self, mock_proc, mock_call, mock_ts): # Just return a dict instead of rpm.hdr hdr = { rpm.RPMTAG_SIGPGP: None, # empty rpm.RPMTAG_SIGGPG: None, # empty } - ts.hdrFromFdno.return_value = hdr - ret = qubesadmin.tools.qvm_template.verify_rpm('/dev/null', ts) - ts.hdrFromFdno.assert_called_once() - self.assertEqual(ret, None) + mock_ts.return_value.hdrFromFdno.return_value = hdr + mock_proc.return_value = b'dummy.rpm: digests OK\n' + with self.assertRaises(Exception) as e: + qubesadmin.tools.qvm_template.verify_rpm('/dev/null', + ['/path/to/key']) + mock_call.assert_called_once() + mock_proc.assert_called_once() + self.assertIn('Signature verification failed', e.exception.args[0]) + mock_ts.assert_not_called() self.assertAllCalled() - def test_002_verify_rpm_nosig_success(self): - ts = mock.MagicMock() + @mock.patch('rpm.TransactionSet') + @mock.patch('subprocess.check_call') + @mock.patch('subprocess.check_output') + def test_002_verify_rpm_nosig_success(self, mock_proc, mock_call, mock_ts): # Just return a dict instead of rpm.hdr hdr = { rpm.RPMTAG_SIGPGP: None, # empty rpm.RPMTAG_SIGGPG: None, # empty } - ts.hdrFromFdno.return_value = hdr - ret = qubesadmin.tools.qvm_template.verify_rpm('/dev/null', ts, True) - ts.hdrFromFdno.assert_called_once() + mock_ts.return_value.hdrFromFdno.return_value = hdr + mock_proc.return_value = b'dummy.rpm: digests OK\n' + ret = qubesadmin.tools.qvm_template.verify_rpm('/dev/null', + ['/path/to/key'], True) + mock_proc.assert_not_called() + mock_call.assert_not_called() self.assertEqual(ret, hdr) self.assertAllCalled() - def test_003_verify_rpm_badsig_fail(self): - ts = mock.MagicMock() - def f(*args): - raise rpm.error('public key not trusted') - ts.hdrFromFdno.side_effect = f - ret = qubesadmin.tools.qvm_template.verify_rpm('/dev/null', ts) - ts.hdrFromFdno.assert_called_once() - self.assertEqual(ret, None) + @mock.patch('rpm.TransactionSet') + @mock.patch('subprocess.check_call') + @mock.patch('subprocess.check_output') + def test_003_verify_rpm_badsig_fail(self, mock_proc, mock_call, mock_ts): + mock_proc.side_effect = subprocess.CalledProcessError(1, + ['rpmkeys', '--checksig'], b'/dev/null: digests SIGNATURES NOT OK\n') + with self.assertRaises(Exception) as e: + qubesadmin.tools.qvm_template.verify_rpm('/dev/null', + ['/path/to/key']) + mock_call.assert_called_once() + mock_proc.assert_called_once() + self.assertIn('Signature verification failed', e.exception.args[0]) + mock_ts.assert_not_called() self.assertAllCalled() @mock.patch('subprocess.Popen') @@ -144,10 +166,8 @@ class TC_00_qvm_template(qubesadmin.tests.QubesTestCase): @mock.patch('qubesadmin.tools.qvm_template.download') @mock.patch('qubesadmin.tools.qvm_template.get_dl_list') @mock.patch('qubesadmin.tools.qvm_template.verify_rpm') - @mock.patch('qubesadmin.tools.qvm_template.rpm_transactionset') def test_100_install_local_success( self, - mock_ts, mock_verify, mock_dl_list, mock_dl, @@ -201,7 +221,7 @@ class TC_00_qvm_template(qubesadmin.tests.QubesTestCase): path = template_file.name args = argparse.Namespace( templates=[path], - keyring='/usr/share/qubes/repo-templates/keys', + keyring='/tmp', nogpgcheck=False, cachedir='/var/cache/qvm-template', yes=False, @@ -217,15 +237,6 @@ class TC_00_qvm_template(qubesadmin.tests.QubesTestCase): mock.call().__enter__(), mock.call().__exit__(None, None, None) ]) - # Keyring created - self.assertEqual(mock_ts.mock_calls, [ - mock.call('/usr/share/qubes/repo-templates/keys') - ]) - # Package verified - self.assertEqual(mock_verify.mock_calls, [ - mock.call(path, mock_ts('/usr/share/qubes/repo-templates/keys'), - False) - ]) # Attempt to get download list selector = qubesadmin.tools.qvm_template.VersionSelector.LATEST self.assertEqual(mock_dl_list.mock_calls, [ @@ -275,10 +286,8 @@ class TC_00_qvm_template(qubesadmin.tests.QubesTestCase): @mock.patch('qubesadmin.tools.qvm_template.download') @mock.patch('qubesadmin.tools.qvm_template.get_dl_list') @mock.patch('qubesadmin.tools.qvm_template.verify_rpm') - @mock.patch('qubesadmin.tools.qvm_template.rpm_transactionset') def test_101_install_local_postprocargs_success( self, - mock_ts, mock_verify, mock_dl_list, mock_dl, @@ -332,7 +341,7 @@ class TC_00_qvm_template(qubesadmin.tests.QubesTestCase): path = template_file.name args = argparse.Namespace( templates=[path], - keyring='/usr/share/qubes/repo-templates/keys', + keyring='/tmp', nogpgcheck=False, cachedir='/var/cache/qvm-template', yes=False, @@ -348,15 +357,6 @@ class TC_00_qvm_template(qubesadmin.tests.QubesTestCase): mock.call().__enter__(), mock.call().__exit__(None, None, None) ]) - # Keyring created - self.assertEqual(mock_ts.mock_calls, [ - mock.call('/usr/share/qubes/repo-templates/keys') - ]) - # Package verified - self.assertEqual(mock_verify.mock_calls, [ - mock.call(path, mock_ts('/usr/share/qubes/repo-templates/keys'), - False) - ]) # Attempt to get download list selector = qubesadmin.tools.qvm_template.VersionSelector.LATEST self.assertEqual(mock_dl_list.mock_calls, [ @@ -409,10 +409,8 @@ class TC_00_qvm_template(qubesadmin.tests.QubesTestCase): @mock.patch('qubesadmin.tools.qvm_template.download') @mock.patch('qubesadmin.tools.qvm_template.get_dl_list') @mock.patch('qubesadmin.tools.qvm_template.verify_rpm') - @mock.patch('qubesadmin.tools.qvm_template.rpm_transactionset') def test_102_install_local_badsig_fail( self, - mock_ts, mock_verify, mock_dl_list, mock_dl, @@ -432,7 +430,7 @@ class TC_00_qvm_template(qubesadmin.tests.QubesTestCase): path = template_file.name args = argparse.Namespace( templates=[path], - keyring='/usr/share/qubes/repo-templates/keys', + keyring='/tmp', nogpgcheck=False, cachedir='/var/cache/qvm-template', yes=False, @@ -452,15 +450,6 @@ class TC_00_qvm_template(qubesadmin.tests.QubesTestCase): ]) # Check error message self.assertTrue('verification failed' in mock_err.getvalue()) - # Keyring created - self.assertEqual(mock_ts.mock_calls, [ - mock.call('/usr/share/qubes/repo-templates/keys') - ]) - # Package verified - self.assertEqual(mock_verify.mock_calls, [ - mock.call(path, mock_ts('/usr/share/qubes/repo-templates/keys'), - False) - ]) # Should not be executed: self.assertEqual(mock_dl_list.mock_calls, []) self.assertEqual(mock_dl.mock_calls, []) @@ -483,10 +472,8 @@ class TC_00_qvm_template(qubesadmin.tests.QubesTestCase): @mock.patch('qubesadmin.tools.qvm_template.download') @mock.patch('qubesadmin.tools.qvm_template.get_dl_list') @mock.patch('qubesadmin.tools.qvm_template.verify_rpm') - @mock.patch('qubesadmin.tools.qvm_template.rpm_transactionset') def test_103_install_local_exists_fail( self, - mock_ts, mock_verify, mock_dl_list, mock_dl, @@ -519,7 +506,7 @@ class TC_00_qvm_template(qubesadmin.tests.QubesTestCase): path = template_file.name args = argparse.Namespace( templates=[path], - keyring='/usr/share/qubes/repo-templates/keys', + keyring='/tmp', nogpgcheck=False, cachedir='/var/cache/qvm-template', yes=False, @@ -537,15 +524,6 @@ class TC_00_qvm_template(qubesadmin.tests.QubesTestCase): ]) # Check warning message self.assertTrue('already installed' in mock_err.getvalue()) - # Keyring created - self.assertEqual(mock_ts.mock_calls, [ - mock.call('/usr/share/qubes/repo-templates/keys') - ]) - # Package verified - self.assertEqual(mock_verify.mock_calls, [ - mock.call(path, mock_ts('/usr/share/qubes/repo-templates/keys'), - False) - ]) # Attempt to get download list selector = qubesadmin.tools.qvm_template.VersionSelector.LATEST self.assertEqual(mock_dl_list.mock_calls, [ @@ -576,10 +554,8 @@ class TC_00_qvm_template(qubesadmin.tests.QubesTestCase): @mock.patch('qubesadmin.tools.qvm_template.download') @mock.patch('qubesadmin.tools.qvm_template.get_dl_list') @mock.patch('qubesadmin.tools.qvm_template.verify_rpm') - @mock.patch('qubesadmin.tools.qvm_template.rpm_transactionset') def test_104_install_local_badpkgname_fail( self, - mock_ts, mock_verify, mock_dl_list, mock_dl, @@ -609,7 +585,7 @@ class TC_00_qvm_template(qubesadmin.tests.QubesTestCase): path = template_file.name args = argparse.Namespace( templates=[path], - keyring='/usr/share/qubes/repo-templates/keys', + keyring='/tmp', nogpgcheck=False, cachedir='/var/cache/qvm-template', yes=False, @@ -628,15 +604,6 @@ class TC_00_qvm_template(qubesadmin.tests.QubesTestCase): ]) # Check error message self.assertTrue('Illegal package name' in mock_err.getvalue()) - # Keyring created - self.assertEqual(mock_ts.mock_calls, [ - mock.call('/usr/share/qubes/repo-templates/keys') - ]) - # Package verified - self.assertEqual(mock_verify.mock_calls, [ - mock.call(path, mock_ts('/usr/share/qubes/repo-templates/keys'), - False) - ]) # Should not be executed: self.assertEqual(mock_dl_list.mock_calls, []) self.assertEqual(mock_dl.mock_calls, []) @@ -720,10 +687,8 @@ class TC_00_qvm_template(qubesadmin.tests.QubesTestCase): @mock.patch('qubesadmin.tools.qvm_template.download') @mock.patch('qubesadmin.tools.qvm_template.get_dl_list') @mock.patch('qubesadmin.tools.qvm_template.verify_rpm') - @mock.patch('qubesadmin.tools.qvm_template.rpm_transactionset') def test_106_install_local_badpath_fail( self, - mock_ts, mock_verify, mock_dl_list, mock_dl, @@ -741,7 +706,7 @@ class TC_00_qvm_template(qubesadmin.tests.QubesTestCase): path = '/var/tmp/ShOulD-NoT-ExIsT.rpm' args = argparse.Namespace( templates=[path], - keyring='/usr/share/qubes/repo-templates/keys', + keyring='/tmp', nogpgcheck=False, cachedir='/var/cache/qvm-template', yes=False, @@ -761,10 +726,6 @@ class TC_00_qvm_template(qubesadmin.tests.QubesTestCase): # Check error message self.assertTrue(f"RPM file '{path}' not found" \ in mock_err.getvalue()) - # Keyring created - self.assertEqual(mock_ts.mock_calls, [ - mock.call('/usr/share/qubes/repo-templates/keys') - ]) # Should not be executed: self.assertEqual(mock_verify.mock_calls, []) self.assertEqual(mock_dl_list.mock_calls, []) diff --git a/qubesadmin/tools/qvm_template.py b/qubesadmin/tools/qvm_template.py index 05f74f9..aef780b 100644 --- a/qubesadmin/tools/qvm_template.py +++ b/qubesadmin/tools/qvm_template.py @@ -20,9 +20,9 @@ import tempfile import time import typing -import rpm import tqdm import xdg.BaseDirectory +import rpm import qubesadmin import qubesadmin.tools @@ -39,6 +39,9 @@ DATE_FMT = '%Y-%m-%d %H:%M:%S' UPDATEVM = str('global UpdateVM') +class SignatureVerificationError(Exception): + """Package signature is invalid or missing""" + def qubes_release() -> str: """Return the Qubes release.""" if os.path.exists('/usr/share/qubes/marker-vm'): @@ -540,21 +543,18 @@ def qrexec_download( raise ConnectionError( "qrexec call 'qubes.TemplateDownload' failed.") -def rpm_transactionset(key_dir: str) -> rpm.transaction.TransactionSet: - """Create RPM TransactionSet using the keys in the given directory.""" - tset = rpm.TransactionSet() - kring = rpm.keyring() +def get_keys(key_dir: str) -> typing.List[str]: + """List gpg keys""" + keys = [] for name in os.listdir(key_dir): path = os.path.join(key_dir, name) if os.path.isfile(path): - with open(path, 'rb') as fd: - kring.addKey(rpm.pubkey(fd.read())) - tset.setKeyring(kring) - return tset + keys.append(path) + return keys def verify_rpm( path: str, - transaction_set: rpm.transaction.TransactionSet, + keys: typing.List[str], nogpgcheck: bool = False ) -> rpm.hdr: """Verify the digest and signature of a RPM package and return the package @@ -566,24 +566,29 @@ def verify_rpm( case. :param path: Location of the RPM package - :param transaction_set: RPM ``TransactionSet`` :param nogpgcheck: Whether to allow invalid GPG signatures - :return: RPM package header. If verification fails, ``None`` is returned. + :return: RPM package header. If verification fails, raises an exception. """ + if not nogpgcheck: + with tempfile.TemporaryDirectory() as rpmdb_dir: + for key in keys: + subprocess.check_call( + ['rpmkeys', '--dbpath=' + rpmdb_dir, '--import', key]) + try: + output = subprocess.check_output( + ['rpmkeys', '--dbpath=' + rpmdb_dir, '--checksig', path]) + except subprocess.CalledProcessError as e: + raise SignatureVerificationError( + 'Signature verification failed: {}'.format( + e.output.decode())) + if not output.endswith(b': digests signatures OK\n'): + raise SignatureVerificationError( + 'Signature verification failed: {}'.format(output.decode())) with open(path, 'rb') as fd: - try: - hdr = transaction_set.hdrFromFdno(fd) - if hdr[rpm.RPMTAG_SIGPGP] is None \ - and hdr[rpm.RPMTAG_SIGGPG] is None: - return hdr if nogpgcheck else None - except rpm.error as e: - if str(e) == 'public key not trusted' \ - or str(e) == 'public key not available': - # FIXME: This does not work - # Should just tell TransactionSet not to verify sigs - return hdr if nogpgcheck else None - return None + tset = rpm.TransactionSet() + tset.setVSFlags(rpm.RPMVSF_MASK_NOSIGNATURES) + hdr = tset.hdrFromFdno(fd) return hdr def extract_rpm(name: str, path: str, target: str) -> bool: @@ -772,7 +777,7 @@ def install( % LOCK_FILE) try: - transaction_set = rpm_transactionset(args.keyring) + keys = get_keys(args.keyring) unverified_rpm_list = [] # rpmfile, reponame verified_rpm_list = [] @@ -784,7 +789,7 @@ def install( else: path = rpmfile - package_hdr = verify_rpm(path, transaction_set, args.nogpgcheck) + package_hdr = verify_rpm(path, keys, args.nogpgcheck) if not package_hdr: parser.error('Package \'%s\' verification failed.' % rpmfile) diff --git a/test-packages/rpm.py b/test-packages/rpm.py index c167adb..37d0730 100644 --- a/test-packages/rpm.py +++ b/test-packages/rpm.py @@ -13,6 +13,8 @@ RPMTAG_SUMMARY = 9 RPMTAG_URL = 10 RPMTAG_VERSION = 11 +RPMVSF_MASK_NOSIGNATURES = 0xc0c00 + class error(BaseException): def __init__(self, msg): self.msg = msg @@ -21,7 +23,8 @@ class error(BaseException): return self.msg class hdr(): - pass + def __getitem__(self, key): + pass class keyring(): def addKey(self, *args): @@ -31,13 +34,12 @@ class pubkey(): pass class TransactionSet(): + def setVSFlags(self, flags): + pass def setKeyring(self, *args): pass - -class transaction(): - class TransactionSet(): - def setKeyring(self, *args): - pass + def hdrFromFdno(self, fdno) -> hdr: + return hdr() def labelCompare(a, b): # Pretend that we're comparing the versions lexographically in the stub