qvm-template: verify template package signature directly at download

Make the download() function save the package into a temporary space and
move to the target location only after checking the signature. This is
safer option than requiring all callers to explicitly verify the
signature. Also, make the download() function verify if the template
name inside the package matches what was requested.
Especially, make `qvm-template download` action verify the signature
too.

On `qvm-template install` avoid checking the signature again for
downloaded packages, by passing extra argument to the verify_rpm()
function. But still verify signature of packages loaded from disk.
This commit is contained in:
Marek Marczykowski-Górecki 2021-02-18 20:03:50 +01:00
parent ce36dc55c5
commit e424c7df9c
No known key found for this signature in database
GPG Key ID: 063938BA42CFA724
2 changed files with 384 additions and 130 deletions

View File

@ -1,3 +1,4 @@
import re
from unittest import mock
import argparse
import asyncio
@ -14,6 +15,13 @@ import rpm
import qubesadmin.tests
import qubesadmin.tools.qvm_template
class re_str(str):
def __eq__(self, other):
return bool(re.match(self, other))
def __hash__(self):
return super().__hash__()
class TC_00_qvm_template(qubesadmin.tests.QubesTestCase):
def setUp(self):
# Print str(list) directly so that the output is consistent no matter
@ -248,7 +256,7 @@ class TC_00_qvm_template(qubesadmin.tests.QubesTestCase):
path = template_file.name
args = argparse.Namespace(
templates=[path],
keyring='/tmp',
keyring='/tmp/keyring.gpg',
nogpgcheck=False,
cachedir='/var/cache/qvm-template',
repo_files=[],
@ -267,8 +275,9 @@ class TC_00_qvm_template(qubesadmin.tests.QubesTestCase):
])
# Nothing downloaded
mock_dl.assert_called_with(args, self.app,
path_override='/var/tmp/qvm-template-tmpdir',
dl_list={}, suffix='.unverified', version_selector=selector)
dl_list={}, version_selector=selector)
mock_verify.assert_called_once_with(template_file.name, '/tmp/keyring.gpg',
nogpgcheck=False)
# Package is extracted
mock_extract.assert_called_with('test-vm', path,
'/var/tmp/qvm-template-tmpdir')
@ -375,8 +384,7 @@ class TC_00_qvm_template(qubesadmin.tests.QubesTestCase):
])
# Nothing downloaded
mock_dl.assert_called_with(args, self.app,
path_override='/var/tmp/qvm-template-tmpdir',
dl_list={}, suffix='.unverified', version_selector=selector)
dl_list={}, version_selector=selector)
# Package is extracted
mock_extract.assert_called_with('test-vm', path,
'/var/tmp/qvm-template-tmpdir')
@ -520,8 +528,8 @@ class TC_00_qvm_template(qubesadmin.tests.QubesTestCase):
])
# Nothing downloaded
self.assertEqual(mock_dl.mock_calls, [
mock.call(args, self.app, path_override='/var/tmp/qvm-template-tmpdir',
dl_list={}, suffix='.unverified', version_selector=selector)
mock.call(args, self.app,
dl_list={}, version_selector=selector)
])
# Should not be executed:
self.assertEqual(mock_extract.mock_calls, [])
@ -644,6 +652,119 @@ class TC_00_qvm_template(qubesadmin.tests.QubesTestCase):
self.assertEqual(mock_rename.mock_calls, [])
self.assertAllCalled()
@mock.patch('os.rename')
@mock.patch('os.makedirs')
@mock.patch('subprocess.check_call')
@mock.patch('qubesadmin.tools.qvm_template.confirm_action')
@mock.patch('qubesadmin.tools.qvm_template.extract_rpm')
@mock.patch('qubesadmin.tools.qvm_template.download')
@mock.patch('qubesadmin.tools.qvm_template.get_dl_list')
@mock.patch('qubesadmin.tools.qvm_template.verify_rpm')
def test_107_install_download_success(
self,
mock_verify,
mock_dl_list,
mock_dl,
mock_extract,
mock_confirm,
mock_call,
mock_mkdirs,
mock_rename):
self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = b'0\0'
build_time = '2020-09-01 14:30:00' # 1598970600
install_time = '2020-09-01 15:30:00'
for key, val in [
('name', 'test-vm'),
('epoch', '2'),
('version', '4.1'),
('release', '2020'),
('reponame', 'qubes-templates-itl'),
('buildtime', build_time),
('installtime', install_time),
('license', 'GPL'),
('url', 'https://qubes-os.org'),
('summary', 'Summary'),
('description', 'Desc|desc')]:
self.app.expected_calls[(
'test-vm',
'admin.vm.feature.Set',
f'template-{key}',
val.encode())] = b'0\0'
mock_dl.return_value = {'test-vm': {
rpm.RPMTAG_NAME : 'qubes-template-test-vm',
rpm.RPMTAG_BUILDTIME : 1598970600,
rpm.RPMTAG_DESCRIPTION : 'Desc\ndesc',
rpm.RPMTAG_EPOCHNUM : 2,
rpm.RPMTAG_LICENSE : 'GPL',
rpm.RPMTAG_RELEASE : '2020',
rpm.RPMTAG_SUMMARY : 'Summary',
rpm.RPMTAG_URL : 'https://qubes-os.org',
rpm.RPMTAG_VERSION : '4.1'
}}
dl_list = {
'test-vm': qubesadmin.tools.qvm_template.DlEntry(
('1', '4.1', '20200101'), 'qubes-templates-itl', 1048576)
}
mock_dl_list.return_value = dl_list
mock_call.side_effect = self.add_new_vm_side_effect
mock_time = mock.Mock(wraps=datetime.datetime)
mock_time.now.return_value = \
datetime.datetime(2020, 9, 1, 15, 30, tzinfo=datetime.timezone.utc)
with mock.patch('qubesadmin.tools.qvm_template.LOCK_FILE', '/tmp/test.lock'), \
mock.patch('datetime.datetime', new=mock_time), \
mock.patch('tempfile.TemporaryDirectory') as mock_tmpdir, \
mock.patch('sys.stderr', new=io.StringIO()) as mock_err:
args = argparse.Namespace(
templates='test-vm',
keyring='/tmp/keyring.gpg',
nogpgcheck=False,
cachedir='/var/cache/qvm-template',
repo_files=[],
releasever='4.1',
yes=False,
keep_cache=True,
allow_pv=False,
pool=None
)
mock_tmpdir.return_value.__enter__.return_value = \
'/var/tmp/qvm-template-tmpdir'
qubesadmin.tools.qvm_template.install(args, self.app)
# Attempt to get download list
selector = qubesadmin.tools.qvm_template.VersionSelector.LATEST
self.assertEqual(mock_dl_list.mock_calls, [
mock.call(args, self.app, version_selector=selector)
])
# Nothing downloaded
mock_dl.assert_called_with(args, self.app,
dl_list=dl_list, version_selector=selector)
# download already verify the package internally
self.assertEqual(mock_verify.mock_calls, [])
# Package is extracted
mock_extract.assert_called_with('test-vm',
'/var/cache/qvm-template/qubes-template-test-vm-1:4.1-20200101.rpm',
'/var/tmp/qvm-template-tmpdir')
# No packages overwritten, so no confirm needed
self.assertEqual(mock_confirm.mock_calls, [])
# qvm-template-postprocess is called
self.assertEqual(mock_call.mock_calls, [
mock.call([
'qvm-template-postprocess',
'--really',
'--no-installed-by-rpm',
'post-install',
'test-vm',
'/var/tmp/qvm-template-tmpdir'
'/var/lib/qubes/vm-templates/test-vm'
])
])
# Cache directory created
self.assertEqual(mock_mkdirs.mock_calls, [
mock.call(args.cachedir, exist_ok=True)
])
# No templates downloaded, thus no renames needed
self.assertEqual(mock_rename.mock_calls, [])
self.assertAllCalled()
def test_110_qrexec_payload_refresh_success(self):
with tempfile.NamedTemporaryFile() as repo_conf1, \
tempfile.NamedTemporaryFile() as repo_conf2:
@ -3188,11 +3309,27 @@ test-vm : Qubes template for fedora-31
])
self.assertAllCalled()
def _mock_qrexec_download(self, args, app, spec, path,
dlsize=None, refresh=False):
self.assertFalse(os.path.exists(path),
'{} should not exist before'.format(path))
# just create an empty file
with open(path, 'wb') as f:
if f is not None:
f.truncate(dlsize)
@mock.patch('qubesadmin.tools.qvm_template.verify_rpm')
@mock.patch('qubesadmin.tools.qvm_template.get_dl_list')
@mock.patch('qubesadmin.tools.qvm_template.qrexec_download')
def test_180_download_success(self, mock_qrexec, mock_dllist):
def test_180_download_success(self, mock_qrexec, mock_dllist,
mock_verify_rpm):
mock_qrexec.side_effect = self._mock_qrexec_download
with tempfile.TemporaryDirectory() as dir:
args = argparse.Namespace(
repo_files=[],
keyring='/tmp/keyring.gpg',
releasever='4.1',
nogpgcheck=False,
retries=1
)
qubesadmin.tools.qvm_template.download(args, self.app, dir, {
@ -3202,25 +3339,36 @@ test-vm : Qubes template for fedora-31
('0', '1', '2'),
'qubes-templates-itl-testing',
2048576)
}, '.unverified')
})
self.assertEqual(mock_qrexec.mock_calls, [
mock.call(args, self.app, 'qubes-template-fedora-31-1:2-3',
dir + '/qubes-template-fedora-31-1:2-3.rpm.unverified',
re_str(dir + '/.*/qubes-template-fedora-31-1:2-3.rpm.UNTRUSTED'),
1048576),
mock.call(args, self.app, 'qubes-template-fedora-32-0:1-2',
dir + '/qubes-template-fedora-32-0:1-2.rpm.unverified',
re_str(dir + '/.*/qubes-template-fedora-32-0:1-2.rpm.UNTRUSTED'),
2048576)
])
self.assertEqual(mock_dllist.mock_calls, [])
self.assertTrue(all(
[x.endswith('.unverified') for x in os.listdir(dir)]))
self.assertEqual(mock_verify_rpm.mock_calls, [
mock.call(re_str(dir + '/.*/qubes-template-fedora-31-1:2-3.rpm.UNTRUSTED'),
'/tmp/keyring.gpg', template_name='fedora-31'),
mock.call(re_str(dir + '/.*/qubes-template-fedora-32-0:1-2.rpm.UNTRUSTED'),
'/tmp/keyring.gpg', template_name='fedora-32'),
])
@mock.patch('qubesadmin.tools.qvm_template.verify_rpm')
@mock.patch('qubesadmin.tools.qvm_template.get_dl_list')
@mock.patch('qubesadmin.tools.qvm_template.qrexec_download')
def test_181_download_success_nosuffix(self, mock_qrexec, mock_dllist):
def test_181_download_success_nosuffix(self, mock_qrexec, mock_dllist,
mock_verify_rpm):
mock_qrexec.side_effect = self._mock_qrexec_download
with tempfile.TemporaryDirectory() as dir:
args = argparse.Namespace(
retries=1,
repo_files=[],
keyring='/tmp/keyring.gpg',
releasever='4.1',
nogpgcheck=False,
downloaddir=dir
)
with mock.patch('sys.stderr', new=io.StringIO()) as mock_err:
@ -3230,28 +3378,39 @@ test-vm : Qubes template for fedora-31
})
self.assertEqual(mock_qrexec.mock_calls, [
mock.call(args, self.app, 'qubes-template-fedora-31-1:2-3',
dir + '/qubes-template-fedora-31-1:2-3.rpm',
re_str(dir + '/.*/qubes-template-fedora-31-1:2-3.rpm.UNTRUSTED'),
1048576)
])
self.assertEqual(mock_dllist.mock_calls, [])
self.assertEqual(mock_verify_rpm.mock_calls, [
mock.call(re_str(dir + '/.*/qubes-template-fedora-31-1:2-3.rpm.UNTRUSTED'),
'/tmp/keyring.gpg', template_name='fedora-31'),
])
@mock.patch('qubesadmin.tools.qvm_template.verify_rpm')
@mock.patch('qubesadmin.tools.qvm_template.get_dl_list')
@mock.patch('qubesadmin.tools.qvm_template.qrexec_download')
def test_182_download_success_getdllist(self, mock_qrexec, mock_dllist):
def test_182_download_success_getdllist(self, mock_qrexec, mock_dllist,
mock_verify_rpm):
mock_qrexec.side_effect = self._mock_qrexec_download
mock_dllist.return_value = {
'fedora-31': qubesadmin.tools.qvm_template.DlEntry(
('1', '2', '3'), 'qubes-templates-itl', 1048576)
}
with tempfile.TemporaryDirectory() as dir:
args = argparse.Namespace(
retries=1
retries=1,
repo_files=[],
keyring='/tmp/keyring.gpg',
releasever='4.1',
nogpgcheck=False,
)
qubesadmin.tools.qvm_template.download(args, self.app,
dir, None, '.unverified',
dir, None,
qubesadmin.tools.qvm_template.VersionSelector.LATEST_LOWER)
self.assertEqual(mock_qrexec.mock_calls, [
mock.call(args, self.app, 'qubes-template-fedora-31-1:2-3',
dir + '/qubes-template-fedora-31-1:2-3.rpm.unverified',
re_str(dir + '/.*/qubes-template-fedora-31-1:2-3.rpm.UNTRUSTED'),
1048576)
])
self.assertEqual(mock_dllist.mock_calls, [
@ -3260,40 +3419,54 @@ test-vm : Qubes template for fedora-31
qubesadmin.tools.qvm_template.\
VersionSelector.LATEST_LOWER)
])
self.assertTrue(all(
[x.endswith('.unverified') for x in os.listdir(dir)]))
self.assertEqual(mock_verify_rpm.mock_calls, [
mock.call(re_str(dir + '/.*/qubes-template-fedora-31-1:2-3.rpm.UNTRUSTED'),
'/tmp/keyring.gpg', template_name='fedora-31'),
])
@mock.patch('qubesadmin.tools.qvm_template.verify_rpm')
@mock.patch('qubesadmin.tools.qvm_template.get_dl_list')
@mock.patch('qubesadmin.tools.qvm_template.qrexec_download')
def test_183_download_success_downloaddir(self, mock_qrexec, mock_dllist):
def test_183_download_success_downloaddir(self, mock_qrexec, mock_dllist,
mock_verify_rpm):
mock_qrexec.side_effect = self._mock_qrexec_download
with tempfile.TemporaryDirectory() as dir:
args = argparse.Namespace(
retries=1,
repo_files=[],
keyring='/tmp/keyring.gpg',
releasever='4.1',
nogpgcheck=False,
downloaddir=dir
)
qubesadmin.tools.qvm_template.download(args, self.app, None, {
'fedora-31': qubesadmin.tools.qvm_template.DlEntry(
('1', '2', '3'), 'qubes-templates-itl', 1048576)
}, '.unverified')
})
self.assertEqual(mock_qrexec.mock_calls, [
mock.call(args, self.app, 'qubes-template-fedora-31-1:2-3',
dir + '/qubes-template-fedora-31-1:2-3.rpm.unverified',
re_str(dir + '/.*/qubes-template-fedora-31-1:2-3.rpm.UNTRUSTED'),
1048576)
])
self.assertEqual(mock_dllist.mock_calls, [])
self.assertTrue(all(
[x.endswith('.unverified') for x in os.listdir(dir)]))
@mock.patch('qubesadmin.tools.qvm_template.verify_rpm')
@mock.patch('qubesadmin.tools.qvm_template.get_dl_list')
@mock.patch('qubesadmin.tools.qvm_template.qrexec_download')
def test_184_download_success_exists(self, mock_qrexec, mock_dllist):
def test_184_download_success_exists(self, mock_qrexec, mock_dllist,
mock_verify_rpm):
mock_qrexec.side_effect = self._mock_qrexec_download
with tempfile.TemporaryDirectory() as dir:
with open(os.path.join(
dir, 'qubes-template-fedora-31-1:2-3.rpm.unverified'),
dir, 'qubes-template-fedora-31-1:2-3.rpm'),
'w') as _:
pass
args = argparse.Namespace(
retries=1,
repo_files=[],
keyring='/tmp/keyring.gpg',
releasever='4.1',
nogpgcheck=False,
downloaddir=dir
)
with mock.patch('sys.stderr', new=io.StringIO()) as mock_err:
@ -3304,47 +3477,22 @@ test-vm : Qubes template for fedora-31
('0', '1', '2'),
'qubes-templates-itl-testing',
2048576)
}, '.unverified')
})
self.assertTrue('already exists, skipping'
in mock_err.getvalue())
self.assertEqual(mock_qrexec.mock_calls, [
mock.call(args, self.app, 'qubes-template-fedora-32-0:1-2',
dir + '/qubes-template-fedora-32-0:1-2.rpm.unverified',
re_str(dir + '/.*/qubes-template-fedora-32-0:1-2.rpm.UNTRUSTED'),
2048576)
])
self.assertEqual(mock_dllist.mock_calls, [])
self.assertTrue(all(
[x.endswith('.unverified') for x in os.listdir(dir)]))
@mock.patch('qubesadmin.tools.qvm_template.verify_rpm')
@mock.patch('qubesadmin.tools.qvm_template.get_dl_list')
@mock.patch('qubesadmin.tools.qvm_template.qrexec_download')
def test_185_download_success_existsmove(self, mock_qrexec, mock_dllist):
with tempfile.TemporaryDirectory() as dir:
with open(os.path.join(
dir, 'qubes-template-fedora-31-1:2-3.rpm'),
'w') as _:
pass
args = argparse.Namespace(
retries=1,
downloaddir=dir
)
with mock.patch('sys.stderr', new=io.StringIO()) as mock_err:
qubesadmin.tools.qvm_template.download(args, self.app, None, {
'fedora-31': qubesadmin.tools.qvm_template.DlEntry(
('1', '2', '3'), 'qubes-templates-itl', 1048576)
}, '.unverified')
self.assertTrue('already exists, skipping'
in mock_err.getvalue())
self.assertEqual(mock_qrexec.mock_calls, [])
self.assertEqual(mock_dllist.mock_calls, [])
self.assertTrue(os.path.exists(
dir + '/qubes-template-fedora-31-1:2-3.rpm.unverified'))
self.assertTrue(all(
[x.endswith('.unverified') for x in os.listdir(dir)]))
@mock.patch('qubesadmin.tools.qvm_template.get_dl_list')
@mock.patch('qubesadmin.tools.qvm_template.qrexec_download')
def test_186_download_success_existsnosuffix(self, mock_qrexec, mock_dllist):
def test_185_download_success_existsmove(self, mock_qrexec, mock_dllist,
mock_verify_rpm):
mock_qrexec.side_effect = self._mock_qrexec_download
with tempfile.TemporaryDirectory() as dir:
with open(os.path.join(
dir, 'qubes-template-fedora-31-1:2-3.rpm'),
@ -3352,6 +3500,10 @@ test-vm : Qubes template for fedora-31
pass
args = argparse.Namespace(
retries=1,
repo_files=[],
keyring='/tmp/keyring.gpg',
releasever='4.1',
nogpgcheck=False,
downloaddir=dir
)
with mock.patch('sys.stderr', new=io.StringIO()) as mock_err:
@ -3366,19 +3518,57 @@ test-vm : Qubes template for fedora-31
self.assertTrue(os.path.exists(
dir + '/qubes-template-fedora-31-1:2-3.rpm'))
@mock.patch('qubesadmin.tools.qvm_template.verify_rpm')
@mock.patch('qubesadmin.tools.qvm_template.get_dl_list')
@mock.patch('qubesadmin.tools.qvm_template.qrexec_download')
def test_187_download_success_retry(self, mock_qrexec, mock_dllist):
def test_186_download_success_existsnosuffix(self, mock_qrexec, mock_dllist,
mock_verify_rpm):
mock_qrexec.side_effect = self._mock_qrexec_download
with tempfile.TemporaryDirectory() as dir:
with open(os.path.join(
dir, 'qubes-template-fedora-31-1:2-3.rpm'),
'w') as _:
pass
args = argparse.Namespace(
retries=1,
repo_files=[],
keyring='/tmp/keyring.gpg',
releasever='4.1',
nogpgcheck=False,
downloaddir=dir
)
with mock.patch('sys.stderr', new=io.StringIO()) as mock_err:
qubesadmin.tools.qvm_template.download(args, self.app, None, {
'fedora-31': qubesadmin.tools.qvm_template.DlEntry(
('1', '2', '3'), 'qubes-templates-itl', 1048576)
})
self.assertTrue('already exists, skipping'
in mock_err.getvalue())
self.assertEqual(mock_qrexec.mock_calls, [])
self.assertEqual(mock_dllist.mock_calls, [])
self.assertTrue(os.path.exists(
dir + '/qubes-template-fedora-31-1:2-3.rpm'))
@mock.patch('qubesadmin.tools.qvm_template.verify_rpm')
@mock.patch('qubesadmin.tools.qvm_template.get_dl_list')
@mock.patch('qubesadmin.tools.qvm_template.qrexec_download')
def test_187_download_success_retry(self, mock_qrexec, mock_dllist,
mock_verify_rpm):
counter = 0
def f(*args):
def f(*args, **kwargs):
nonlocal counter
counter += 1
if counter == 1:
raise ConnectionError
self._mock_qrexec_download(*args, **kwargs)
mock_qrexec.side_effect = f
with tempfile.TemporaryDirectory() as dir:
args = argparse.Namespace(
retries=2,
repo_files=[],
keyring='/tmp/keyring.gpg',
releasever='4.1',
nogpgcheck=False,
downloaddir=dir
)
with mock.patch('sys.stderr', new=io.StringIO()) as mock_err, \
@ -3389,31 +3579,39 @@ test-vm : Qubes template for fedora-31
})
self.assertTrue('retrying...' in mock_err.getvalue())
self.assertEqual(mock_rm.mock_calls, [
mock.call(dir + '/qubes-template-fedora-31-1:2-3.rpm')
mock.call(re_str(dir + '/.*/qubes-template-fedora-31-1:2-3.rpm.UNTRUSTED'))
])
self.assertEqual(mock_qrexec.mock_calls, [
mock.call(args, self.app, 'qubes-template-fedora-31-1:2-3',
dir + '/qubes-template-fedora-31-1:2-3.rpm',
re_str(dir + '/.*/qubes-template-fedora-31-1:2-3.rpm.UNTRUSTED'),
1048576),
mock.call(args, self.app, 'qubes-template-fedora-31-1:2-3',
dir + '/qubes-template-fedora-31-1:2-3.rpm',
re_str(dir + '/.*/qubes-template-fedora-31-1:2-3.rpm.UNTRUSTED'),
1048576)
])
self.assertEqual(mock_dllist.mock_calls, [])
@mock.patch('qubesadmin.tools.qvm_template.verify_rpm')
@mock.patch('qubesadmin.tools.qvm_template.get_dl_list')
@mock.patch('qubesadmin.tools.qvm_template.qrexec_download')
def test_188_download_fail_retry(self, mock_qrexec, mock_dllist):
def test_188_download_fail_retry(self, mock_qrexec, mock_dllist,
mock_verify_rpm):
mock_qrexec.side_effect = self._mock_qrexec_download
counter = 0
def f(*args):
def f(*args, **kwargs):
nonlocal counter
counter += 1
if counter <= 3:
raise ConnectionError
self._mock_qrexec_download(*args, **kwargs)
mock_qrexec.side_effect = f
with tempfile.TemporaryDirectory() as dir:
args = argparse.Namespace(
retries=3,
repo_files=[],
keyring='/tmp/keyring.gpg',
releasever='4.1',
nogpgcheck=False,
downloaddir=dir
)
with mock.patch('sys.stderr', new=io.StringIO()) as mock_err, \
@ -3427,32 +3625,38 @@ test-vm : Qubes template for fedora-31
self.assertEqual(mock_err.getvalue().count('retrying...'), 2)
self.assertTrue('download failed' in mock_err.getvalue())
self.assertEqual(mock_rm.mock_calls, [
mock.call(dir + '/qubes-template-fedora-31-1:2-3.rpm'),
mock.call(dir + '/qubes-template-fedora-31-1:2-3.rpm'),
mock.call(dir + '/qubes-template-fedora-31-1:2-3.rpm')
mock.call(re_str(dir + '/.*/qubes-template-fedora-31-1:2-3.rpm.UNTRUSTED')),
mock.call(re_str(dir + '/.*/qubes-template-fedora-31-1:2-3.rpm.UNTRUSTED')),
mock.call(re_str(dir + '/.*/qubes-template-fedora-31-1:2-3.rpm.UNTRUSTED'))
])
self.assertEqual(mock_qrexec.mock_calls, [
mock.call(args, self.app, 'qubes-template-fedora-31-1:2-3',
dir + '/qubes-template-fedora-31-1:2-3.rpm',
re_str(dir + '/.*/qubes-template-fedora-31-1:2-3.rpm.UNTRUSTED'),
1048576),
mock.call(args, self.app, 'qubes-template-fedora-31-1:2-3',
dir + '/qubes-template-fedora-31-1:2-3.rpm',
re_str(dir + '/.*/qubes-template-fedora-31-1:2-3.rpm.UNTRUSTED'),
1048576),
mock.call(args, self.app, 'qubes-template-fedora-31-1:2-3',
dir + '/qubes-template-fedora-31-1:2-3.rpm',
re_str(dir + '/.*/qubes-template-fedora-31-1:2-3.rpm.UNTRUSTED'),
1048576)
])
self.assertEqual(mock_dllist.mock_calls, [])
@mock.patch('qubesadmin.tools.qvm_template.verify_rpm')
@mock.patch('qubesadmin.tools.qvm_template.get_dl_list')
@mock.patch('qubesadmin.tools.qvm_template.qrexec_download')
def test_189_download_fail_interrupt(self, mock_qrexec, mock_dllist):
def test_189_download_fail_interrupt(self, mock_qrexec, mock_dllist,
mock_verify_rpm):
def f(*args):
raise RuntimeError
mock_qrexec.side_effect = f
with tempfile.TemporaryDirectory() as dir:
args = argparse.Namespace(
retries=3,
repo_files=[],
keyring='/tmp/keyring.gpg',
releasever='4.1',
nogpgcheck=False,
downloaddir=dir
)
with mock.patch('sys.stderr', new=io.StringIO()) as mock_err, \
@ -3463,12 +3667,45 @@ test-vm : Qubes template for fedora-31
'fedora-31': qubesadmin.tools.qvm_template.DlEntry(
('1', '2', '3'), 'qubes-templates-itl', 1048576)
})
self.assertEqual(mock_rm.mock_calls, [
mock.call(dir + '/qubes-template-fedora-31-1:2-3.rpm')
])
self.assertEqual(mock_qrexec.mock_calls, [
mock.call(args, self.app, 'qubes-template-fedora-31-1:2-3',
dir + '/qubes-template-fedora-31-1:2-3.rpm',
re_str(dir + '/.*/qubes-template-fedora-31-1:2-3.rpm'),
1048576)
])
self.assertEqual(mock_dllist.mock_calls, [])
@mock.patch('qubesadmin.tools.qvm_template.verify_rpm')
@mock.patch('qubesadmin.tools.qvm_template.get_dl_list')
@mock.patch('qubesadmin.tools.qvm_template.qrexec_download')
def test_189_download_verify_fail(self, mock_qrexec, mock_dllist,
mock_verify_rpm):
mock_qrexec.side_effect = self._mock_qrexec_download
mock_verify_rpm.side_effect = \
qubesadmin.tools.qvm_template.SignatureVerificationError
with tempfile.TemporaryDirectory() as dir:
args = argparse.Namespace(
retries=3,
repo_files=[],
keyring='/tmp/keyring.gpg',
releasever='4.1',
nogpgcheck=False,
downloaddir=dir
)
with self.assertRaises(qubesadmin.tools.qvm_template.SignatureVerificationError):
qubesadmin.tools.qvm_template.download(
args, self.app, None, {
'fedora-31': qubesadmin.tools.qvm_template.DlEntry(
('1', '2', '3'), 'qubes-templates-itl', 1048576)
})
self.assertEqual(mock_qrexec.mock_calls, [
mock.call(args, self.app, 'qubes-template-fedora-31-1:2-3',
re_str(dir + '/.*/qubes-template-fedora-31-1:2-3.rpm'),
1048576)
])
self.assertEqual(mock_dllist.mock_calls, [])
self.assertEqual(os.listdir(dir), [])
# TODO: test unexpected name downloaded
# TODO: test truncated download
# TODO: test no disk space

View File

@ -146,8 +146,6 @@ def get_parser() -> argparse.ArgumentParser:
help_str='Upgrade template packages.')
for parser_x in [parser_install, parser_reinstall,
parser_downgrade, parser_upgrade]:
parser_x.add_argument('--nogpgcheck', action='store_true',
help='Disable signature checks.')
parser_x.add_argument('--allow-pv', action='store_true',
help='Allow templates that set virt_mode to pv.')
parser_x.add_argument('templates', nargs='*', metavar='TEMPLATESPEC')
@ -160,6 +158,8 @@ def get_parser() -> argparse.ArgumentParser:
help='Specify download directory.')
parser_x.add_argument('--retries', default=5, type=int,
help='Specify maximum number of retries for downloads.')
parser_x.add_argument('--nogpgcheck', action='store_true',
help='Disable signature checks.')
parser_download.add_argument('templates', nargs='*',
metavar='TEMPLATESPEC')
# qvm-template {list,info}
@ -602,7 +602,8 @@ def get_keys_for_repos(repo_files: typing.List[str],
def verify_rpm(
path: str,
key: str,
nogpgcheck: bool = False
nogpgcheck: bool = False,
template_name: typing.Optional[str] = None
) -> rpm.hdr:
"""Verify the digest and signature of a RPM package and return the package
header.
@ -614,6 +615,8 @@ def verify_rpm(
:param path: Location of the RPM package
:param nogpgcheck: Whether to allow invalid GPG signatures
:param template_name: expected template name - if specified, verifies if
the package name matches expected template name
:return: RPM package header. If verification fails, raises an exception.
"""
@ -635,6 +638,10 @@ def verify_rpm(
tset = rpm.TransactionSet()
tset.setVSFlags(rpm.RPMVSF_MASK_NOSIGNATURES)
hdr = tset.hdrFromFdno(fd)
if template_name is not None:
if hdr[rpm.RPMTAG_NAME] != PACKAGE_NAME_PREFIX + template_name:
raise SignatureVerificationError(
'Downloaded package does not match expected template name')
return hdr
def extract_rpm(name: str, path: str, target: str) -> bool:
@ -759,8 +766,8 @@ def download(
app: qubesadmin.app.QubesBase,
path_override: typing.Optional[str] = None,
dl_list: typing.Optional[typing.Dict[str, DlEntry]] = None,
suffix: str = '',
version_selector: VersionSelector = VersionSelector.LATEST) -> None:
version_selector: VersionSelector = VersionSelector.LATEST) \
-> typing.Dict[str, rpm.hdr]:
"""Command that downloads template packages.
:param args: Arguments received by the application.
@ -770,50 +777,65 @@ def download(
:param dl_list: Override list of templates to download. If not set or set
to None, ``get_dl_list`` is called, which generates the list from
``args``. Optional
:param suffix: Suffix to add to the file name of downloaded packages. This
is useful if you want to distinguish between verified and unverified
packages. Defaults to an empty string
:param version_selector: Specify algorithm to select the candidate version
of a package. Defaults to ``VersionSelector.LATEST``
:return package headers of downloaded templates
"""
if dl_list is None:
dl_list = get_dl_list(args, app, version_selector=version_selector)
keys = get_keys_for_repos(args.repo_files, args.releasever)
package_hdrs = {}
path = path_override if path_override is not None else args.downloaddir
for name, entry in dl_list.items():
version_str = build_version_str(entry.evr)
spec = PACKAGE_NAME_PREFIX + name + '-' + version_str
target = os.path.join(path, '%s.rpm' % spec)
target_suffix = target + suffix
if os.path.exists(target_suffix):
print('\'%s\' already exists, skipping...' % target,
file=sys.stderr)
elif os.path.exists(target):
print('\'%s\' already exists, skipping...' % target,
file=sys.stderr)
os.rename(target, target_suffix)
else:
with tempfile.TemporaryDirectory(dir=path) as dl_dir:
for name, entry in dl_list.items():
version_str = build_version_str(entry.evr)
spec = PACKAGE_NAME_PREFIX + name + '-' + version_str
target = os.path.join(path, '%s.rpm' % spec)
target_temp = os.path.join(dl_dir, '%s.rpm.UNTRUSTED' % spec)
repo_key = keys.get(entry.reponame)
if repo_key is None:
repo_key = args.keyring
if os.path.exists(target):
print('\'%s\' already exists, skipping...' % target,
file=sys.stderr)
# but still verify the package
verify_rpm(target, repo_key, template_name=name)
continue
print('Downloading \'%s\'...' % spec, file=sys.stderr)
done = False
for attempt in range(args.retries):
try:
qrexec_download(args, app, spec, target_suffix,
qrexec_download(args, app, spec, target_temp,
entry.dlsize)
size = os.path.getsize(target_temp)
if size != entry.dlsize:
raise ConnectionError(
'Downloaded file is {} bytes, expected {}'.format(
size, entry.dlsize))
done = True
break
except ConnectionError:
os.remove(target_suffix)
os.remove(target_temp)
if attempt + 1 < args.retries:
print('\'%s\' download failed, retrying...' % spec,
file=sys.stderr)
except:
# Also remove file if interrupted by other means
os.remove(target_suffix)
raise
if not done:
print('Error: \'%s\' download failed.' % spec, file=sys.stderr)
sys.exit(1)
if args.nogpgcheck:
print(
'Warning: --nogpgcheck is ignored for downloaded templates',
file=sys.stderr)
package_hdr = verify_rpm(target_temp, repo_key, template_name=name)
# after package is verified, rename to the target location
os.rename(target_temp, target)
package_hdrs[name] = package_hdr
return package_hdrs
def locked(func):
"""Execute given function under a lock in *LOCK_FILE*"""
@functools.wraps(func)
@ -854,21 +876,20 @@ def install(
unverified_rpm_list = [] # rpmfile, reponame
verified_rpm_list = []
def verify(rpmfile, reponame, dl_dir=None):
def verify(rpmfile, reponame, package_hdr=None):
"""Verify package signature and version, remove "unverified"
suffix, and parse package header."""
if dl_dir:
path = os.path.join(
dl_dir, os.path.basename(rpmfile) + UNVERIFIED_SUFFIX)
else:
path = rpmfile
suffix, and parse package header.
repo_key = keys.get(reponame)
if repo_key is None:
repo_key = args.keyring
package_hdr = verify_rpm(path, repo_key, args.nogpgcheck)
if not package_hdr:
parser.error('Package \'%s\' verification failed.' % rpmfile)
If package_hdr is provided, the signature check is skipped and only
other checks are performed."""
if package_hdr is None:
repo_key = keys.get(reponame)
if repo_key is None:
repo_key = args.keyring
package_hdr = verify_rpm(rpmfile, repo_key,
nogpgcheck=args.nogpgcheck)
if not package_hdr:
parser.error('Package \'%s\' verification failed.' % rpmfile)
package_name = package_hdr[rpm.RPMTAG_NAME]
if not package_name.startswith(PACKAGE_NAME_PREFIX):
@ -877,9 +898,6 @@ def install(
# Remove prefix to get the real template name
name = package_name[len(PACKAGE_NAME_PREFIX):]
if path != rpmfile:
os.rename(path, rpmfile)
# Check if already installed
if not override_existing and name in app.domains:
print(('Template \'%s\' already installed, skipping...'
@ -927,7 +945,7 @@ def install(
# First verify local RPMs and extract header
for rpmfile, reponame in unverified_rpm_list:
verify(rpmfile, reponame)
unverified_rpm_list = []
unverified_rpm_list = {}
os.makedirs(args.cachedir, exist_ok=True)
@ -951,7 +969,7 @@ def install(
version_str = build_version_str(entry.evr)
target_file = \
'%s%s-%s.rpm' % (PACKAGE_NAME_PREFIX, name, version_str)
unverified_rpm_list.append(
unverified_rpm_list[name] = (
(os.path.join(args.cachedir, target_file), entry.reponame))
dl_list = dl_list_copy
@ -969,15 +987,14 @@ def install(
'This will override changes made in the following VMs:',
override_tpls)
with tempfile.TemporaryDirectory(dir=args.cachedir) as dl_dir:
download(args, app, path_override=dl_dir,
dl_list=dl_list, suffix=UNVERIFIED_SUFFIX,
version_selector=version_selector)
package_hdrs = download(args, app,
dl_list=dl_list,
version_selector=version_selector)
# Verify downloaded templates
for rpmfile, reponame in unverified_rpm_list:
verify(rpmfile, reponame, dl_dir=dl_dir)
unverified_rpm_list = []
# Verify downloaded templates
for name, (rpmfile, reponame) in unverified_rpm_list.items():
verify(rpmfile, reponame, package_hdrs[name])
del unverified_rpm_list
# Unpack and install
for rpmfile, reponame, name, package_hdr in verified_rpm_list: