qvm-template: verify template package signature directly at download

Make the download() function save the package into a temporary space and
move to the target location only after checking the signature. This is
safer option than requiring all callers to explicitly verify the
signature. Also, make the download() function verify if the template
name inside the package matches what was requested.
Especially, make `qvm-template download` action verify the signature
too.

On `qvm-template install` avoid checking the signature again for
downloaded packages, by passing extra argument to the verify_rpm()
function. But still verify signature of packages loaded from disk.
This commit is contained in:
Marek Marczykowski-Górecki 2021-02-18 20:03:50 +01:00
parent ce36dc55c5
commit e424c7df9c
No known key found for this signature in database
GPG Key ID: 063938BA42CFA724
2 changed files with 384 additions and 130 deletions

View File

@ -1,3 +1,4 @@
import re
from unittest import mock from unittest import mock
import argparse import argparse
import asyncio import asyncio
@ -14,6 +15,13 @@ import rpm
import qubesadmin.tests import qubesadmin.tests
import qubesadmin.tools.qvm_template import qubesadmin.tools.qvm_template
class re_str(str):
def __eq__(self, other):
return bool(re.match(self, other))
def __hash__(self):
return super().__hash__()
class TC_00_qvm_template(qubesadmin.tests.QubesTestCase): class TC_00_qvm_template(qubesadmin.tests.QubesTestCase):
def setUp(self): def setUp(self):
# Print str(list) directly so that the output is consistent no matter # Print str(list) directly so that the output is consistent no matter
@ -248,7 +256,7 @@ class TC_00_qvm_template(qubesadmin.tests.QubesTestCase):
path = template_file.name path = template_file.name
args = argparse.Namespace( args = argparse.Namespace(
templates=[path], templates=[path],
keyring='/tmp', keyring='/tmp/keyring.gpg',
nogpgcheck=False, nogpgcheck=False,
cachedir='/var/cache/qvm-template', cachedir='/var/cache/qvm-template',
repo_files=[], repo_files=[],
@ -267,8 +275,9 @@ class TC_00_qvm_template(qubesadmin.tests.QubesTestCase):
]) ])
# Nothing downloaded # Nothing downloaded
mock_dl.assert_called_with(args, self.app, mock_dl.assert_called_with(args, self.app,
path_override='/var/tmp/qvm-template-tmpdir', dl_list={}, version_selector=selector)
dl_list={}, suffix='.unverified', version_selector=selector) mock_verify.assert_called_once_with(template_file.name, '/tmp/keyring.gpg',
nogpgcheck=False)
# Package is extracted # Package is extracted
mock_extract.assert_called_with('test-vm', path, mock_extract.assert_called_with('test-vm', path,
'/var/tmp/qvm-template-tmpdir') '/var/tmp/qvm-template-tmpdir')
@ -375,8 +384,7 @@ class TC_00_qvm_template(qubesadmin.tests.QubesTestCase):
]) ])
# Nothing downloaded # Nothing downloaded
mock_dl.assert_called_with(args, self.app, mock_dl.assert_called_with(args, self.app,
path_override='/var/tmp/qvm-template-tmpdir', dl_list={}, version_selector=selector)
dl_list={}, suffix='.unverified', version_selector=selector)
# Package is extracted # Package is extracted
mock_extract.assert_called_with('test-vm', path, mock_extract.assert_called_with('test-vm', path,
'/var/tmp/qvm-template-tmpdir') '/var/tmp/qvm-template-tmpdir')
@ -520,8 +528,8 @@ class TC_00_qvm_template(qubesadmin.tests.QubesTestCase):
]) ])
# Nothing downloaded # Nothing downloaded
self.assertEqual(mock_dl.mock_calls, [ self.assertEqual(mock_dl.mock_calls, [
mock.call(args, self.app, path_override='/var/tmp/qvm-template-tmpdir', mock.call(args, self.app,
dl_list={}, suffix='.unverified', version_selector=selector) dl_list={}, version_selector=selector)
]) ])
# Should not be executed: # Should not be executed:
self.assertEqual(mock_extract.mock_calls, []) self.assertEqual(mock_extract.mock_calls, [])
@ -644,6 +652,119 @@ class TC_00_qvm_template(qubesadmin.tests.QubesTestCase):
self.assertEqual(mock_rename.mock_calls, []) self.assertEqual(mock_rename.mock_calls, [])
self.assertAllCalled() self.assertAllCalled()
@mock.patch('os.rename')
@mock.patch('os.makedirs')
@mock.patch('subprocess.check_call')
@mock.patch('qubesadmin.tools.qvm_template.confirm_action')
@mock.patch('qubesadmin.tools.qvm_template.extract_rpm')
@mock.patch('qubesadmin.tools.qvm_template.download')
@mock.patch('qubesadmin.tools.qvm_template.get_dl_list')
@mock.patch('qubesadmin.tools.qvm_template.verify_rpm')
def test_107_install_download_success(
self,
mock_verify,
mock_dl_list,
mock_dl,
mock_extract,
mock_confirm,
mock_call,
mock_mkdirs,
mock_rename):
self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = b'0\0'
build_time = '2020-09-01 14:30:00' # 1598970600
install_time = '2020-09-01 15:30:00'
for key, val in [
('name', 'test-vm'),
('epoch', '2'),
('version', '4.1'),
('release', '2020'),
('reponame', 'qubes-templates-itl'),
('buildtime', build_time),
('installtime', install_time),
('license', 'GPL'),
('url', 'https://qubes-os.org'),
('summary', 'Summary'),
('description', 'Desc|desc')]:
self.app.expected_calls[(
'test-vm',
'admin.vm.feature.Set',
f'template-{key}',
val.encode())] = b'0\0'
mock_dl.return_value = {'test-vm': {
rpm.RPMTAG_NAME : 'qubes-template-test-vm',
rpm.RPMTAG_BUILDTIME : 1598970600,
rpm.RPMTAG_DESCRIPTION : 'Desc\ndesc',
rpm.RPMTAG_EPOCHNUM : 2,
rpm.RPMTAG_LICENSE : 'GPL',
rpm.RPMTAG_RELEASE : '2020',
rpm.RPMTAG_SUMMARY : 'Summary',
rpm.RPMTAG_URL : 'https://qubes-os.org',
rpm.RPMTAG_VERSION : '4.1'
}}
dl_list = {
'test-vm': qubesadmin.tools.qvm_template.DlEntry(
('1', '4.1', '20200101'), 'qubes-templates-itl', 1048576)
}
mock_dl_list.return_value = dl_list
mock_call.side_effect = self.add_new_vm_side_effect
mock_time = mock.Mock(wraps=datetime.datetime)
mock_time.now.return_value = \
datetime.datetime(2020, 9, 1, 15, 30, tzinfo=datetime.timezone.utc)
with mock.patch('qubesadmin.tools.qvm_template.LOCK_FILE', '/tmp/test.lock'), \
mock.patch('datetime.datetime', new=mock_time), \
mock.patch('tempfile.TemporaryDirectory') as mock_tmpdir, \
mock.patch('sys.stderr', new=io.StringIO()) as mock_err:
args = argparse.Namespace(
templates='test-vm',
keyring='/tmp/keyring.gpg',
nogpgcheck=False,
cachedir='/var/cache/qvm-template',
repo_files=[],
releasever='4.1',
yes=False,
keep_cache=True,
allow_pv=False,
pool=None
)
mock_tmpdir.return_value.__enter__.return_value = \
'/var/tmp/qvm-template-tmpdir'
qubesadmin.tools.qvm_template.install(args, self.app)
# Attempt to get download list
selector = qubesadmin.tools.qvm_template.VersionSelector.LATEST
self.assertEqual(mock_dl_list.mock_calls, [
mock.call(args, self.app, version_selector=selector)
])
# Nothing downloaded
mock_dl.assert_called_with(args, self.app,
dl_list=dl_list, version_selector=selector)
# download already verify the package internally
self.assertEqual(mock_verify.mock_calls, [])
# Package is extracted
mock_extract.assert_called_with('test-vm',
'/var/cache/qvm-template/qubes-template-test-vm-1:4.1-20200101.rpm',
'/var/tmp/qvm-template-tmpdir')
# No packages overwritten, so no confirm needed
self.assertEqual(mock_confirm.mock_calls, [])
# qvm-template-postprocess is called
self.assertEqual(mock_call.mock_calls, [
mock.call([
'qvm-template-postprocess',
'--really',
'--no-installed-by-rpm',
'post-install',
'test-vm',
'/var/tmp/qvm-template-tmpdir'
'/var/lib/qubes/vm-templates/test-vm'
])
])
# Cache directory created
self.assertEqual(mock_mkdirs.mock_calls, [
mock.call(args.cachedir, exist_ok=True)
])
# No templates downloaded, thus no renames needed
self.assertEqual(mock_rename.mock_calls, [])
self.assertAllCalled()
def test_110_qrexec_payload_refresh_success(self): def test_110_qrexec_payload_refresh_success(self):
with tempfile.NamedTemporaryFile() as repo_conf1, \ with tempfile.NamedTemporaryFile() as repo_conf1, \
tempfile.NamedTemporaryFile() as repo_conf2: tempfile.NamedTemporaryFile() as repo_conf2:
@ -3188,11 +3309,27 @@ test-vm : Qubes template for fedora-31
]) ])
self.assertAllCalled() self.assertAllCalled()
def _mock_qrexec_download(self, args, app, spec, path,
dlsize=None, refresh=False):
self.assertFalse(os.path.exists(path),
'{} should not exist before'.format(path))
# just create an empty file
with open(path, 'wb') as f:
if f is not None:
f.truncate(dlsize)
@mock.patch('qubesadmin.tools.qvm_template.verify_rpm')
@mock.patch('qubesadmin.tools.qvm_template.get_dl_list') @mock.patch('qubesadmin.tools.qvm_template.get_dl_list')
@mock.patch('qubesadmin.tools.qvm_template.qrexec_download') @mock.patch('qubesadmin.tools.qvm_template.qrexec_download')
def test_180_download_success(self, mock_qrexec, mock_dllist): def test_180_download_success(self, mock_qrexec, mock_dllist,
mock_verify_rpm):
mock_qrexec.side_effect = self._mock_qrexec_download
with tempfile.TemporaryDirectory() as dir: with tempfile.TemporaryDirectory() as dir:
args = argparse.Namespace( args = argparse.Namespace(
repo_files=[],
keyring='/tmp/keyring.gpg',
releasever='4.1',
nogpgcheck=False,
retries=1 retries=1
) )
qubesadmin.tools.qvm_template.download(args, self.app, dir, { qubesadmin.tools.qvm_template.download(args, self.app, dir, {
@ -3202,25 +3339,36 @@ test-vm : Qubes template for fedora-31
('0', '1', '2'), ('0', '1', '2'),
'qubes-templates-itl-testing', 'qubes-templates-itl-testing',
2048576) 2048576)
}, '.unverified') })
self.assertEqual(mock_qrexec.mock_calls, [ self.assertEqual(mock_qrexec.mock_calls, [
mock.call(args, self.app, 'qubes-template-fedora-31-1:2-3', mock.call(args, self.app, 'qubes-template-fedora-31-1:2-3',
dir + '/qubes-template-fedora-31-1:2-3.rpm.unverified', re_str(dir + '/.*/qubes-template-fedora-31-1:2-3.rpm.UNTRUSTED'),
1048576), 1048576),
mock.call(args, self.app, 'qubes-template-fedora-32-0:1-2', mock.call(args, self.app, 'qubes-template-fedora-32-0:1-2',
dir + '/qubes-template-fedora-32-0:1-2.rpm.unverified', re_str(dir + '/.*/qubes-template-fedora-32-0:1-2.rpm.UNTRUSTED'),
2048576) 2048576)
]) ])
self.assertEqual(mock_dllist.mock_calls, []) self.assertEqual(mock_dllist.mock_calls, [])
self.assertTrue(all( self.assertEqual(mock_verify_rpm.mock_calls, [
[x.endswith('.unverified') for x in os.listdir(dir)])) mock.call(re_str(dir + '/.*/qubes-template-fedora-31-1:2-3.rpm.UNTRUSTED'),
'/tmp/keyring.gpg', template_name='fedora-31'),
mock.call(re_str(dir + '/.*/qubes-template-fedora-32-0:1-2.rpm.UNTRUSTED'),
'/tmp/keyring.gpg', template_name='fedora-32'),
])
@mock.patch('qubesadmin.tools.qvm_template.verify_rpm')
@mock.patch('qubesadmin.tools.qvm_template.get_dl_list') @mock.patch('qubesadmin.tools.qvm_template.get_dl_list')
@mock.patch('qubesadmin.tools.qvm_template.qrexec_download') @mock.patch('qubesadmin.tools.qvm_template.qrexec_download')
def test_181_download_success_nosuffix(self, mock_qrexec, mock_dllist): def test_181_download_success_nosuffix(self, mock_qrexec, mock_dllist,
mock_verify_rpm):
mock_qrexec.side_effect = self._mock_qrexec_download
with tempfile.TemporaryDirectory() as dir: with tempfile.TemporaryDirectory() as dir:
args = argparse.Namespace( args = argparse.Namespace(
retries=1, retries=1,
repo_files=[],
keyring='/tmp/keyring.gpg',
releasever='4.1',
nogpgcheck=False,
downloaddir=dir downloaddir=dir
) )
with mock.patch('sys.stderr', new=io.StringIO()) as mock_err: with mock.patch('sys.stderr', new=io.StringIO()) as mock_err:
@ -3230,28 +3378,39 @@ test-vm : Qubes template for fedora-31
}) })
self.assertEqual(mock_qrexec.mock_calls, [ self.assertEqual(mock_qrexec.mock_calls, [
mock.call(args, self.app, 'qubes-template-fedora-31-1:2-3', mock.call(args, self.app, 'qubes-template-fedora-31-1:2-3',
dir + '/qubes-template-fedora-31-1:2-3.rpm', re_str(dir + '/.*/qubes-template-fedora-31-1:2-3.rpm.UNTRUSTED'),
1048576) 1048576)
]) ])
self.assertEqual(mock_dllist.mock_calls, []) self.assertEqual(mock_dllist.mock_calls, [])
self.assertEqual(mock_verify_rpm.mock_calls, [
mock.call(re_str(dir + '/.*/qubes-template-fedora-31-1:2-3.rpm.UNTRUSTED'),
'/tmp/keyring.gpg', template_name='fedora-31'),
])
@mock.patch('qubesadmin.tools.qvm_template.verify_rpm')
@mock.patch('qubesadmin.tools.qvm_template.get_dl_list') @mock.patch('qubesadmin.tools.qvm_template.get_dl_list')
@mock.patch('qubesadmin.tools.qvm_template.qrexec_download') @mock.patch('qubesadmin.tools.qvm_template.qrexec_download')
def test_182_download_success_getdllist(self, mock_qrexec, mock_dllist): def test_182_download_success_getdllist(self, mock_qrexec, mock_dllist,
mock_verify_rpm):
mock_qrexec.side_effect = self._mock_qrexec_download
mock_dllist.return_value = { mock_dllist.return_value = {
'fedora-31': qubesadmin.tools.qvm_template.DlEntry( 'fedora-31': qubesadmin.tools.qvm_template.DlEntry(
('1', '2', '3'), 'qubes-templates-itl', 1048576) ('1', '2', '3'), 'qubes-templates-itl', 1048576)
} }
with tempfile.TemporaryDirectory() as dir: with tempfile.TemporaryDirectory() as dir:
args = argparse.Namespace( args = argparse.Namespace(
retries=1 retries=1,
repo_files=[],
keyring='/tmp/keyring.gpg',
releasever='4.1',
nogpgcheck=False,
) )
qubesadmin.tools.qvm_template.download(args, self.app, qubesadmin.tools.qvm_template.download(args, self.app,
dir, None, '.unverified', dir, None,
qubesadmin.tools.qvm_template.VersionSelector.LATEST_LOWER) qubesadmin.tools.qvm_template.VersionSelector.LATEST_LOWER)
self.assertEqual(mock_qrexec.mock_calls, [ self.assertEqual(mock_qrexec.mock_calls, [
mock.call(args, self.app, 'qubes-template-fedora-31-1:2-3', mock.call(args, self.app, 'qubes-template-fedora-31-1:2-3',
dir + '/qubes-template-fedora-31-1:2-3.rpm.unverified', re_str(dir + '/.*/qubes-template-fedora-31-1:2-3.rpm.UNTRUSTED'),
1048576) 1048576)
]) ])
self.assertEqual(mock_dllist.mock_calls, [ self.assertEqual(mock_dllist.mock_calls, [
@ -3260,40 +3419,54 @@ test-vm : Qubes template for fedora-31
qubesadmin.tools.qvm_template.\ qubesadmin.tools.qvm_template.\
VersionSelector.LATEST_LOWER) VersionSelector.LATEST_LOWER)
]) ])
self.assertTrue(all( self.assertEqual(mock_verify_rpm.mock_calls, [
[x.endswith('.unverified') for x in os.listdir(dir)])) mock.call(re_str(dir + '/.*/qubes-template-fedora-31-1:2-3.rpm.UNTRUSTED'),
'/tmp/keyring.gpg', template_name='fedora-31'),
])
@mock.patch('qubesadmin.tools.qvm_template.verify_rpm')
@mock.patch('qubesadmin.tools.qvm_template.get_dl_list') @mock.patch('qubesadmin.tools.qvm_template.get_dl_list')
@mock.patch('qubesadmin.tools.qvm_template.qrexec_download') @mock.patch('qubesadmin.tools.qvm_template.qrexec_download')
def test_183_download_success_downloaddir(self, mock_qrexec, mock_dllist): def test_183_download_success_downloaddir(self, mock_qrexec, mock_dllist,
mock_verify_rpm):
mock_qrexec.side_effect = self._mock_qrexec_download
with tempfile.TemporaryDirectory() as dir: with tempfile.TemporaryDirectory() as dir:
args = argparse.Namespace( args = argparse.Namespace(
retries=1, retries=1,
repo_files=[],
keyring='/tmp/keyring.gpg',
releasever='4.1',
nogpgcheck=False,
downloaddir=dir downloaddir=dir
) )
qubesadmin.tools.qvm_template.download(args, self.app, None, { qubesadmin.tools.qvm_template.download(args, self.app, None, {
'fedora-31': qubesadmin.tools.qvm_template.DlEntry( 'fedora-31': qubesadmin.tools.qvm_template.DlEntry(
('1', '2', '3'), 'qubes-templates-itl', 1048576) ('1', '2', '3'), 'qubes-templates-itl', 1048576)
}, '.unverified') })
self.assertEqual(mock_qrexec.mock_calls, [ self.assertEqual(mock_qrexec.mock_calls, [
mock.call(args, self.app, 'qubes-template-fedora-31-1:2-3', mock.call(args, self.app, 'qubes-template-fedora-31-1:2-3',
dir + '/qubes-template-fedora-31-1:2-3.rpm.unverified', re_str(dir + '/.*/qubes-template-fedora-31-1:2-3.rpm.UNTRUSTED'),
1048576) 1048576)
]) ])
self.assertEqual(mock_dllist.mock_calls, []) self.assertEqual(mock_dllist.mock_calls, [])
self.assertTrue(all(
[x.endswith('.unverified') for x in os.listdir(dir)]))
@mock.patch('qubesadmin.tools.qvm_template.verify_rpm')
@mock.patch('qubesadmin.tools.qvm_template.get_dl_list') @mock.patch('qubesadmin.tools.qvm_template.get_dl_list')
@mock.patch('qubesadmin.tools.qvm_template.qrexec_download') @mock.patch('qubesadmin.tools.qvm_template.qrexec_download')
def test_184_download_success_exists(self, mock_qrexec, mock_dllist): def test_184_download_success_exists(self, mock_qrexec, mock_dllist,
mock_verify_rpm):
mock_qrexec.side_effect = self._mock_qrexec_download
with tempfile.TemporaryDirectory() as dir: with tempfile.TemporaryDirectory() as dir:
with open(os.path.join( with open(os.path.join(
dir, 'qubes-template-fedora-31-1:2-3.rpm.unverified'), dir, 'qubes-template-fedora-31-1:2-3.rpm'),
'w') as _: 'w') as _:
pass pass
args = argparse.Namespace( args = argparse.Namespace(
retries=1, retries=1,
repo_files=[],
keyring='/tmp/keyring.gpg',
releasever='4.1',
nogpgcheck=False,
downloaddir=dir downloaddir=dir
) )
with mock.patch('sys.stderr', new=io.StringIO()) as mock_err: with mock.patch('sys.stderr', new=io.StringIO()) as mock_err:
@ -3304,47 +3477,22 @@ test-vm : Qubes template for fedora-31
('0', '1', '2'), ('0', '1', '2'),
'qubes-templates-itl-testing', 'qubes-templates-itl-testing',
2048576) 2048576)
}, '.unverified') })
self.assertTrue('already exists, skipping' self.assertTrue('already exists, skipping'
in mock_err.getvalue()) in mock_err.getvalue())
self.assertEqual(mock_qrexec.mock_calls, [ self.assertEqual(mock_qrexec.mock_calls, [
mock.call(args, self.app, 'qubes-template-fedora-32-0:1-2', mock.call(args, self.app, 'qubes-template-fedora-32-0:1-2',
dir + '/qubes-template-fedora-32-0:1-2.rpm.unverified', re_str(dir + '/.*/qubes-template-fedora-32-0:1-2.rpm.UNTRUSTED'),
2048576) 2048576)
]) ])
self.assertEqual(mock_dllist.mock_calls, []) self.assertEqual(mock_dllist.mock_calls, [])
self.assertTrue(all(
[x.endswith('.unverified') for x in os.listdir(dir)]))
@mock.patch('qubesadmin.tools.qvm_template.verify_rpm')
@mock.patch('qubesadmin.tools.qvm_template.get_dl_list') @mock.patch('qubesadmin.tools.qvm_template.get_dl_list')
@mock.patch('qubesadmin.tools.qvm_template.qrexec_download') @mock.patch('qubesadmin.tools.qvm_template.qrexec_download')
def test_185_download_success_existsmove(self, mock_qrexec, mock_dllist): def test_185_download_success_existsmove(self, mock_qrexec, mock_dllist,
with tempfile.TemporaryDirectory() as dir: mock_verify_rpm):
with open(os.path.join( mock_qrexec.side_effect = self._mock_qrexec_download
dir, 'qubes-template-fedora-31-1:2-3.rpm'),
'w') as _:
pass
args = argparse.Namespace(
retries=1,
downloaddir=dir
)
with mock.patch('sys.stderr', new=io.StringIO()) as mock_err:
qubesadmin.tools.qvm_template.download(args, self.app, None, {
'fedora-31': qubesadmin.tools.qvm_template.DlEntry(
('1', '2', '3'), 'qubes-templates-itl', 1048576)
}, '.unverified')
self.assertTrue('already exists, skipping'
in mock_err.getvalue())
self.assertEqual(mock_qrexec.mock_calls, [])
self.assertEqual(mock_dllist.mock_calls, [])
self.assertTrue(os.path.exists(
dir + '/qubes-template-fedora-31-1:2-3.rpm.unverified'))
self.assertTrue(all(
[x.endswith('.unverified') for x in os.listdir(dir)]))
@mock.patch('qubesadmin.tools.qvm_template.get_dl_list')
@mock.patch('qubesadmin.tools.qvm_template.qrexec_download')
def test_186_download_success_existsnosuffix(self, mock_qrexec, mock_dllist):
with tempfile.TemporaryDirectory() as dir: with tempfile.TemporaryDirectory() as dir:
with open(os.path.join( with open(os.path.join(
dir, 'qubes-template-fedora-31-1:2-3.rpm'), dir, 'qubes-template-fedora-31-1:2-3.rpm'),
@ -3352,6 +3500,10 @@ test-vm : Qubes template for fedora-31
pass pass
args = argparse.Namespace( args = argparse.Namespace(
retries=1, retries=1,
repo_files=[],
keyring='/tmp/keyring.gpg',
releasever='4.1',
nogpgcheck=False,
downloaddir=dir downloaddir=dir
) )
with mock.patch('sys.stderr', new=io.StringIO()) as mock_err: with mock.patch('sys.stderr', new=io.StringIO()) as mock_err:
@ -3366,19 +3518,57 @@ test-vm : Qubes template for fedora-31
self.assertTrue(os.path.exists( self.assertTrue(os.path.exists(
dir + '/qubes-template-fedora-31-1:2-3.rpm')) dir + '/qubes-template-fedora-31-1:2-3.rpm'))
@mock.patch('qubesadmin.tools.qvm_template.verify_rpm')
@mock.patch('qubesadmin.tools.qvm_template.get_dl_list') @mock.patch('qubesadmin.tools.qvm_template.get_dl_list')
@mock.patch('qubesadmin.tools.qvm_template.qrexec_download') @mock.patch('qubesadmin.tools.qvm_template.qrexec_download')
def test_187_download_success_retry(self, mock_qrexec, mock_dllist): def test_186_download_success_existsnosuffix(self, mock_qrexec, mock_dllist,
mock_verify_rpm):
mock_qrexec.side_effect = self._mock_qrexec_download
with tempfile.TemporaryDirectory() as dir:
with open(os.path.join(
dir, 'qubes-template-fedora-31-1:2-3.rpm'),
'w') as _:
pass
args = argparse.Namespace(
retries=1,
repo_files=[],
keyring='/tmp/keyring.gpg',
releasever='4.1',
nogpgcheck=False,
downloaddir=dir
)
with mock.patch('sys.stderr', new=io.StringIO()) as mock_err:
qubesadmin.tools.qvm_template.download(args, self.app, None, {
'fedora-31': qubesadmin.tools.qvm_template.DlEntry(
('1', '2', '3'), 'qubes-templates-itl', 1048576)
})
self.assertTrue('already exists, skipping'
in mock_err.getvalue())
self.assertEqual(mock_qrexec.mock_calls, [])
self.assertEqual(mock_dllist.mock_calls, [])
self.assertTrue(os.path.exists(
dir + '/qubes-template-fedora-31-1:2-3.rpm'))
@mock.patch('qubesadmin.tools.qvm_template.verify_rpm')
@mock.patch('qubesadmin.tools.qvm_template.get_dl_list')
@mock.patch('qubesadmin.tools.qvm_template.qrexec_download')
def test_187_download_success_retry(self, mock_qrexec, mock_dllist,
mock_verify_rpm):
counter = 0 counter = 0
def f(*args): def f(*args, **kwargs):
nonlocal counter nonlocal counter
counter += 1 counter += 1
if counter == 1: if counter == 1:
raise ConnectionError raise ConnectionError
self._mock_qrexec_download(*args, **kwargs)
mock_qrexec.side_effect = f mock_qrexec.side_effect = f
with tempfile.TemporaryDirectory() as dir: with tempfile.TemporaryDirectory() as dir:
args = argparse.Namespace( args = argparse.Namespace(
retries=2, retries=2,
repo_files=[],
keyring='/tmp/keyring.gpg',
releasever='4.1',
nogpgcheck=False,
downloaddir=dir downloaddir=dir
) )
with mock.patch('sys.stderr', new=io.StringIO()) as mock_err, \ with mock.patch('sys.stderr', new=io.StringIO()) as mock_err, \
@ -3389,31 +3579,39 @@ test-vm : Qubes template for fedora-31
}) })
self.assertTrue('retrying...' in mock_err.getvalue()) self.assertTrue('retrying...' in mock_err.getvalue())
self.assertEqual(mock_rm.mock_calls, [ self.assertEqual(mock_rm.mock_calls, [
mock.call(dir + '/qubes-template-fedora-31-1:2-3.rpm') mock.call(re_str(dir + '/.*/qubes-template-fedora-31-1:2-3.rpm.UNTRUSTED'))
]) ])
self.assertEqual(mock_qrexec.mock_calls, [ self.assertEqual(mock_qrexec.mock_calls, [
mock.call(args, self.app, 'qubes-template-fedora-31-1:2-3', mock.call(args, self.app, 'qubes-template-fedora-31-1:2-3',
dir + '/qubes-template-fedora-31-1:2-3.rpm', re_str(dir + '/.*/qubes-template-fedora-31-1:2-3.rpm.UNTRUSTED'),
1048576), 1048576),
mock.call(args, self.app, 'qubes-template-fedora-31-1:2-3', mock.call(args, self.app, 'qubes-template-fedora-31-1:2-3',
dir + '/qubes-template-fedora-31-1:2-3.rpm', re_str(dir + '/.*/qubes-template-fedora-31-1:2-3.rpm.UNTRUSTED'),
1048576) 1048576)
]) ])
self.assertEqual(mock_dllist.mock_calls, []) self.assertEqual(mock_dllist.mock_calls, [])
@mock.patch('qubesadmin.tools.qvm_template.verify_rpm')
@mock.patch('qubesadmin.tools.qvm_template.get_dl_list') @mock.patch('qubesadmin.tools.qvm_template.get_dl_list')
@mock.patch('qubesadmin.tools.qvm_template.qrexec_download') @mock.patch('qubesadmin.tools.qvm_template.qrexec_download')
def test_188_download_fail_retry(self, mock_qrexec, mock_dllist): def test_188_download_fail_retry(self, mock_qrexec, mock_dllist,
mock_verify_rpm):
mock_qrexec.side_effect = self._mock_qrexec_download
counter = 0 counter = 0
def f(*args): def f(*args, **kwargs):
nonlocal counter nonlocal counter
counter += 1 counter += 1
if counter <= 3: if counter <= 3:
raise ConnectionError raise ConnectionError
self._mock_qrexec_download(*args, **kwargs)
mock_qrexec.side_effect = f mock_qrexec.side_effect = f
with tempfile.TemporaryDirectory() as dir: with tempfile.TemporaryDirectory() as dir:
args = argparse.Namespace( args = argparse.Namespace(
retries=3, retries=3,
repo_files=[],
keyring='/tmp/keyring.gpg',
releasever='4.1',
nogpgcheck=False,
downloaddir=dir downloaddir=dir
) )
with mock.patch('sys.stderr', new=io.StringIO()) as mock_err, \ with mock.patch('sys.stderr', new=io.StringIO()) as mock_err, \
@ -3427,32 +3625,38 @@ test-vm : Qubes template for fedora-31
self.assertEqual(mock_err.getvalue().count('retrying...'), 2) self.assertEqual(mock_err.getvalue().count('retrying...'), 2)
self.assertTrue('download failed' in mock_err.getvalue()) self.assertTrue('download failed' in mock_err.getvalue())
self.assertEqual(mock_rm.mock_calls, [ self.assertEqual(mock_rm.mock_calls, [
mock.call(dir + '/qubes-template-fedora-31-1:2-3.rpm'), mock.call(re_str(dir + '/.*/qubes-template-fedora-31-1:2-3.rpm.UNTRUSTED')),
mock.call(dir + '/qubes-template-fedora-31-1:2-3.rpm'), mock.call(re_str(dir + '/.*/qubes-template-fedora-31-1:2-3.rpm.UNTRUSTED')),
mock.call(dir + '/qubes-template-fedora-31-1:2-3.rpm') mock.call(re_str(dir + '/.*/qubes-template-fedora-31-1:2-3.rpm.UNTRUSTED'))
]) ])
self.assertEqual(mock_qrexec.mock_calls, [ self.assertEqual(mock_qrexec.mock_calls, [
mock.call(args, self.app, 'qubes-template-fedora-31-1:2-3', mock.call(args, self.app, 'qubes-template-fedora-31-1:2-3',
dir + '/qubes-template-fedora-31-1:2-3.rpm', re_str(dir + '/.*/qubes-template-fedora-31-1:2-3.rpm.UNTRUSTED'),
1048576), 1048576),
mock.call(args, self.app, 'qubes-template-fedora-31-1:2-3', mock.call(args, self.app, 'qubes-template-fedora-31-1:2-3',
dir + '/qubes-template-fedora-31-1:2-3.rpm', re_str(dir + '/.*/qubes-template-fedora-31-1:2-3.rpm.UNTRUSTED'),
1048576), 1048576),
mock.call(args, self.app, 'qubes-template-fedora-31-1:2-3', mock.call(args, self.app, 'qubes-template-fedora-31-1:2-3',
dir + '/qubes-template-fedora-31-1:2-3.rpm', re_str(dir + '/.*/qubes-template-fedora-31-1:2-3.rpm.UNTRUSTED'),
1048576) 1048576)
]) ])
self.assertEqual(mock_dllist.mock_calls, []) self.assertEqual(mock_dllist.mock_calls, [])
@mock.patch('qubesadmin.tools.qvm_template.verify_rpm')
@mock.patch('qubesadmin.tools.qvm_template.get_dl_list') @mock.patch('qubesadmin.tools.qvm_template.get_dl_list')
@mock.patch('qubesadmin.tools.qvm_template.qrexec_download') @mock.patch('qubesadmin.tools.qvm_template.qrexec_download')
def test_189_download_fail_interrupt(self, mock_qrexec, mock_dllist): def test_189_download_fail_interrupt(self, mock_qrexec, mock_dllist,
mock_verify_rpm):
def f(*args): def f(*args):
raise RuntimeError raise RuntimeError
mock_qrexec.side_effect = f mock_qrexec.side_effect = f
with tempfile.TemporaryDirectory() as dir: with tempfile.TemporaryDirectory() as dir:
args = argparse.Namespace( args = argparse.Namespace(
retries=3, retries=3,
repo_files=[],
keyring='/tmp/keyring.gpg',
releasever='4.1',
nogpgcheck=False,
downloaddir=dir downloaddir=dir
) )
with mock.patch('sys.stderr', new=io.StringIO()) as mock_err, \ with mock.patch('sys.stderr', new=io.StringIO()) as mock_err, \
@ -3463,12 +3667,45 @@ test-vm : Qubes template for fedora-31
'fedora-31': qubesadmin.tools.qvm_template.DlEntry( 'fedora-31': qubesadmin.tools.qvm_template.DlEntry(
('1', '2', '3'), 'qubes-templates-itl', 1048576) ('1', '2', '3'), 'qubes-templates-itl', 1048576)
}) })
self.assertEqual(mock_rm.mock_calls, [
mock.call(dir + '/qubes-template-fedora-31-1:2-3.rpm')
])
self.assertEqual(mock_qrexec.mock_calls, [ self.assertEqual(mock_qrexec.mock_calls, [
mock.call(args, self.app, 'qubes-template-fedora-31-1:2-3', mock.call(args, self.app, 'qubes-template-fedora-31-1:2-3',
dir + '/qubes-template-fedora-31-1:2-3.rpm', re_str(dir + '/.*/qubes-template-fedora-31-1:2-3.rpm'),
1048576) 1048576)
]) ])
self.assertEqual(mock_dllist.mock_calls, []) self.assertEqual(mock_dllist.mock_calls, [])
@mock.patch('qubesadmin.tools.qvm_template.verify_rpm')
@mock.patch('qubesadmin.tools.qvm_template.get_dl_list')
@mock.patch('qubesadmin.tools.qvm_template.qrexec_download')
def test_189_download_verify_fail(self, mock_qrexec, mock_dllist,
mock_verify_rpm):
mock_qrexec.side_effect = self._mock_qrexec_download
mock_verify_rpm.side_effect = \
qubesadmin.tools.qvm_template.SignatureVerificationError
with tempfile.TemporaryDirectory() as dir:
args = argparse.Namespace(
retries=3,
repo_files=[],
keyring='/tmp/keyring.gpg',
releasever='4.1',
nogpgcheck=False,
downloaddir=dir
)
with self.assertRaises(qubesadmin.tools.qvm_template.SignatureVerificationError):
qubesadmin.tools.qvm_template.download(
args, self.app, None, {
'fedora-31': qubesadmin.tools.qvm_template.DlEntry(
('1', '2', '3'), 'qubes-templates-itl', 1048576)
})
self.assertEqual(mock_qrexec.mock_calls, [
mock.call(args, self.app, 'qubes-template-fedora-31-1:2-3',
re_str(dir + '/.*/qubes-template-fedora-31-1:2-3.rpm'),
1048576)
])
self.assertEqual(mock_dllist.mock_calls, [])
self.assertEqual(os.listdir(dir), [])
# TODO: test unexpected name downloaded
# TODO: test truncated download
# TODO: test no disk space

View File

@ -146,8 +146,6 @@ def get_parser() -> argparse.ArgumentParser:
help_str='Upgrade template packages.') help_str='Upgrade template packages.')
for parser_x in [parser_install, parser_reinstall, for parser_x in [parser_install, parser_reinstall,
parser_downgrade, parser_upgrade]: parser_downgrade, parser_upgrade]:
parser_x.add_argument('--nogpgcheck', action='store_true',
help='Disable signature checks.')
parser_x.add_argument('--allow-pv', action='store_true', parser_x.add_argument('--allow-pv', action='store_true',
help='Allow templates that set virt_mode to pv.') help='Allow templates that set virt_mode to pv.')
parser_x.add_argument('templates', nargs='*', metavar='TEMPLATESPEC') parser_x.add_argument('templates', nargs='*', metavar='TEMPLATESPEC')
@ -160,6 +158,8 @@ def get_parser() -> argparse.ArgumentParser:
help='Specify download directory.') help='Specify download directory.')
parser_x.add_argument('--retries', default=5, type=int, parser_x.add_argument('--retries', default=5, type=int,
help='Specify maximum number of retries for downloads.') help='Specify maximum number of retries for downloads.')
parser_x.add_argument('--nogpgcheck', action='store_true',
help='Disable signature checks.')
parser_download.add_argument('templates', nargs='*', parser_download.add_argument('templates', nargs='*',
metavar='TEMPLATESPEC') metavar='TEMPLATESPEC')
# qvm-template {list,info} # qvm-template {list,info}
@ -602,7 +602,8 @@ def get_keys_for_repos(repo_files: typing.List[str],
def verify_rpm( def verify_rpm(
path: str, path: str,
key: str, key: str,
nogpgcheck: bool = False nogpgcheck: bool = False,
template_name: typing.Optional[str] = None
) -> rpm.hdr: ) -> rpm.hdr:
"""Verify the digest and signature of a RPM package and return the package """Verify the digest and signature of a RPM package and return the package
header. header.
@ -614,6 +615,8 @@ def verify_rpm(
:param path: Location of the RPM package :param path: Location of the RPM package
:param nogpgcheck: Whether to allow invalid GPG signatures :param nogpgcheck: Whether to allow invalid GPG signatures
:param template_name: expected template name - if specified, verifies if
the package name matches expected template name
:return: RPM package header. If verification fails, raises an exception. :return: RPM package header. If verification fails, raises an exception.
""" """
@ -635,6 +638,10 @@ def verify_rpm(
tset = rpm.TransactionSet() tset = rpm.TransactionSet()
tset.setVSFlags(rpm.RPMVSF_MASK_NOSIGNATURES) tset.setVSFlags(rpm.RPMVSF_MASK_NOSIGNATURES)
hdr = tset.hdrFromFdno(fd) hdr = tset.hdrFromFdno(fd)
if template_name is not None:
if hdr[rpm.RPMTAG_NAME] != PACKAGE_NAME_PREFIX + template_name:
raise SignatureVerificationError(
'Downloaded package does not match expected template name')
return hdr return hdr
def extract_rpm(name: str, path: str, target: str) -> bool: def extract_rpm(name: str, path: str, target: str) -> bool:
@ -759,8 +766,8 @@ def download(
app: qubesadmin.app.QubesBase, app: qubesadmin.app.QubesBase,
path_override: typing.Optional[str] = None, path_override: typing.Optional[str] = None,
dl_list: typing.Optional[typing.Dict[str, DlEntry]] = None, dl_list: typing.Optional[typing.Dict[str, DlEntry]] = None,
suffix: str = '', version_selector: VersionSelector = VersionSelector.LATEST) \
version_selector: VersionSelector = VersionSelector.LATEST) -> None: -> typing.Dict[str, rpm.hdr]:
"""Command that downloads template packages. """Command that downloads template packages.
:param args: Arguments received by the application. :param args: Arguments received by the application.
@ -770,50 +777,65 @@ def download(
:param dl_list: Override list of templates to download. If not set or set :param dl_list: Override list of templates to download. If not set or set
to None, ``get_dl_list`` is called, which generates the list from to None, ``get_dl_list`` is called, which generates the list from
``args``. Optional ``args``. Optional
:param suffix: Suffix to add to the file name of downloaded packages. This
is useful if you want to distinguish between verified and unverified
packages. Defaults to an empty string
:param version_selector: Specify algorithm to select the candidate version :param version_selector: Specify algorithm to select the candidate version
of a package. Defaults to ``VersionSelector.LATEST`` of a package. Defaults to ``VersionSelector.LATEST``
:return package headers of downloaded templates
""" """
if dl_list is None: if dl_list is None:
dl_list = get_dl_list(args, app, version_selector=version_selector) dl_list = get_dl_list(args, app, version_selector=version_selector)
keys = get_keys_for_repos(args.repo_files, args.releasever)
package_hdrs = {}
path = path_override if path_override is not None else args.downloaddir path = path_override if path_override is not None else args.downloaddir
with tempfile.TemporaryDirectory(dir=path) as dl_dir:
for name, entry in dl_list.items(): for name, entry in dl_list.items():
version_str = build_version_str(entry.evr) version_str = build_version_str(entry.evr)
spec = PACKAGE_NAME_PREFIX + name + '-' + version_str spec = PACKAGE_NAME_PREFIX + name + '-' + version_str
target = os.path.join(path, '%s.rpm' % spec) target = os.path.join(path, '%s.rpm' % spec)
target_suffix = target + suffix target_temp = os.path.join(dl_dir, '%s.rpm.UNTRUSTED' % spec)
if os.path.exists(target_suffix): repo_key = keys.get(entry.reponame)
if repo_key is None:
repo_key = args.keyring
if os.path.exists(target):
print('\'%s\' already exists, skipping...' % target, print('\'%s\' already exists, skipping...' % target,
file=sys.stderr) file=sys.stderr)
elif os.path.exists(target): # but still verify the package
print('\'%s\' already exists, skipping...' % target, verify_rpm(target, repo_key, template_name=name)
file=sys.stderr) continue
os.rename(target, target_suffix)
else:
print('Downloading \'%s\'...' % spec, file=sys.stderr) print('Downloading \'%s\'...' % spec, file=sys.stderr)
done = False done = False
for attempt in range(args.retries): for attempt in range(args.retries):
try: try:
qrexec_download(args, app, spec, target_suffix, qrexec_download(args, app, spec, target_temp,
entry.dlsize) entry.dlsize)
size = os.path.getsize(target_temp)
if size != entry.dlsize:
raise ConnectionError(
'Downloaded file is {} bytes, expected {}'.format(
size, entry.dlsize))
done = True done = True
break break
except ConnectionError: except ConnectionError:
os.remove(target_suffix) os.remove(target_temp)
if attempt + 1 < args.retries: if attempt + 1 < args.retries:
print('\'%s\' download failed, retrying...' % spec, print('\'%s\' download failed, retrying...' % spec,
file=sys.stderr) file=sys.stderr)
except:
# Also remove file if interrupted by other means
os.remove(target_suffix)
raise
if not done: if not done:
print('Error: \'%s\' download failed.' % spec, file=sys.stderr) print('Error: \'%s\' download failed.' % spec, file=sys.stderr)
sys.exit(1) sys.exit(1)
if args.nogpgcheck:
print(
'Warning: --nogpgcheck is ignored for downloaded templates',
file=sys.stderr)
package_hdr = verify_rpm(target_temp, repo_key, template_name=name)
# after package is verified, rename to the target location
os.rename(target_temp, target)
package_hdrs[name] = package_hdr
return package_hdrs
def locked(func): def locked(func):
"""Execute given function under a lock in *LOCK_FILE*""" """Execute given function under a lock in *LOCK_FILE*"""
@functools.wraps(func) @functools.wraps(func)
@ -854,19 +876,18 @@ def install(
unverified_rpm_list = [] # rpmfile, reponame unverified_rpm_list = [] # rpmfile, reponame
verified_rpm_list = [] verified_rpm_list = []
def verify(rpmfile, reponame, dl_dir=None): def verify(rpmfile, reponame, package_hdr=None):
"""Verify package signature and version, remove "unverified" """Verify package signature and version, remove "unverified"
suffix, and parse package header.""" suffix, and parse package header.
if dl_dir:
path = os.path.join(
dl_dir, os.path.basename(rpmfile) + UNVERIFIED_SUFFIX)
else:
path = rpmfile
If package_hdr is provided, the signature check is skipped and only
other checks are performed."""
if package_hdr is None:
repo_key = keys.get(reponame) repo_key = keys.get(reponame)
if repo_key is None: if repo_key is None:
repo_key = args.keyring repo_key = args.keyring
package_hdr = verify_rpm(path, repo_key, args.nogpgcheck) package_hdr = verify_rpm(rpmfile, repo_key,
nogpgcheck=args.nogpgcheck)
if not package_hdr: if not package_hdr:
parser.error('Package \'%s\' verification failed.' % rpmfile) parser.error('Package \'%s\' verification failed.' % rpmfile)
@ -877,9 +898,6 @@ def install(
# Remove prefix to get the real template name # Remove prefix to get the real template name
name = package_name[len(PACKAGE_NAME_PREFIX):] name = package_name[len(PACKAGE_NAME_PREFIX):]
if path != rpmfile:
os.rename(path, rpmfile)
# Check if already installed # Check if already installed
if not override_existing and name in app.domains: if not override_existing and name in app.domains:
print(('Template \'%s\' already installed, skipping...' print(('Template \'%s\' already installed, skipping...'
@ -927,7 +945,7 @@ def install(
# First verify local RPMs and extract header # First verify local RPMs and extract header
for rpmfile, reponame in unverified_rpm_list: for rpmfile, reponame in unverified_rpm_list:
verify(rpmfile, reponame) verify(rpmfile, reponame)
unverified_rpm_list = [] unverified_rpm_list = {}
os.makedirs(args.cachedir, exist_ok=True) os.makedirs(args.cachedir, exist_ok=True)
@ -951,7 +969,7 @@ def install(
version_str = build_version_str(entry.evr) version_str = build_version_str(entry.evr)
target_file = \ target_file = \
'%s%s-%s.rpm' % (PACKAGE_NAME_PREFIX, name, version_str) '%s%s-%s.rpm' % (PACKAGE_NAME_PREFIX, name, version_str)
unverified_rpm_list.append( unverified_rpm_list[name] = (
(os.path.join(args.cachedir, target_file), entry.reponame)) (os.path.join(args.cachedir, target_file), entry.reponame))
dl_list = dl_list_copy dl_list = dl_list_copy
@ -969,15 +987,14 @@ def install(
'This will override changes made in the following VMs:', 'This will override changes made in the following VMs:',
override_tpls) override_tpls)
with tempfile.TemporaryDirectory(dir=args.cachedir) as dl_dir: package_hdrs = download(args, app,
download(args, app, path_override=dl_dir, dl_list=dl_list,
dl_list=dl_list, suffix=UNVERIFIED_SUFFIX,
version_selector=version_selector) version_selector=version_selector)
# Verify downloaded templates # Verify downloaded templates
for rpmfile, reponame in unverified_rpm_list: for name, (rpmfile, reponame) in unverified_rpm_list.items():
verify(rpmfile, reponame, dl_dir=dl_dir) verify(rpmfile, reponame, package_hdrs[name])
unverified_rpm_list = [] del unverified_rpm_list
# Unpack and install # Unpack and install
for rpmfile, reponame, name, package_hdr in verified_rpm_list: for rpmfile, reponame, name, package_hdr in verified_rpm_list: