qvm-template: verify template package signature directly at download
Make the download() function save the package into a temporary space and move to the target location only after checking the signature. This is safer option than requiring all callers to explicitly verify the signature. Also, make the download() function verify if the template name inside the package matches what was requested. Especially, make `qvm-template download` action verify the signature too. On `qvm-template install` avoid checking the signature again for downloaded packages, by passing extra argument to the verify_rpm() function. But still verify signature of packages loaded from disk.
This commit is contained in:
parent
ce36dc55c5
commit
e424c7df9c
@ -1,3 +1,4 @@
|
||||
import re
|
||||
from unittest import mock
|
||||
import argparse
|
||||
import asyncio
|
||||
@ -14,6 +15,13 @@ import rpm
|
||||
import qubesadmin.tests
|
||||
import qubesadmin.tools.qvm_template
|
||||
|
||||
class re_str(str):
|
||||
def __eq__(self, other):
|
||||
return bool(re.match(self, other))
|
||||
|
||||
def __hash__(self):
|
||||
return super().__hash__()
|
||||
|
||||
class TC_00_qvm_template(qubesadmin.tests.QubesTestCase):
|
||||
def setUp(self):
|
||||
# Print str(list) directly so that the output is consistent no matter
|
||||
@ -248,7 +256,7 @@ class TC_00_qvm_template(qubesadmin.tests.QubesTestCase):
|
||||
path = template_file.name
|
||||
args = argparse.Namespace(
|
||||
templates=[path],
|
||||
keyring='/tmp',
|
||||
keyring='/tmp/keyring.gpg',
|
||||
nogpgcheck=False,
|
||||
cachedir='/var/cache/qvm-template',
|
||||
repo_files=[],
|
||||
@ -267,8 +275,9 @@ class TC_00_qvm_template(qubesadmin.tests.QubesTestCase):
|
||||
])
|
||||
# Nothing downloaded
|
||||
mock_dl.assert_called_with(args, self.app,
|
||||
path_override='/var/tmp/qvm-template-tmpdir',
|
||||
dl_list={}, suffix='.unverified', version_selector=selector)
|
||||
dl_list={}, version_selector=selector)
|
||||
mock_verify.assert_called_once_with(template_file.name, '/tmp/keyring.gpg',
|
||||
nogpgcheck=False)
|
||||
# Package is extracted
|
||||
mock_extract.assert_called_with('test-vm', path,
|
||||
'/var/tmp/qvm-template-tmpdir')
|
||||
@ -375,8 +384,7 @@ class TC_00_qvm_template(qubesadmin.tests.QubesTestCase):
|
||||
])
|
||||
# Nothing downloaded
|
||||
mock_dl.assert_called_with(args, self.app,
|
||||
path_override='/var/tmp/qvm-template-tmpdir',
|
||||
dl_list={}, suffix='.unverified', version_selector=selector)
|
||||
dl_list={}, version_selector=selector)
|
||||
# Package is extracted
|
||||
mock_extract.assert_called_with('test-vm', path,
|
||||
'/var/tmp/qvm-template-tmpdir')
|
||||
@ -520,8 +528,8 @@ class TC_00_qvm_template(qubesadmin.tests.QubesTestCase):
|
||||
])
|
||||
# Nothing downloaded
|
||||
self.assertEqual(mock_dl.mock_calls, [
|
||||
mock.call(args, self.app, path_override='/var/tmp/qvm-template-tmpdir',
|
||||
dl_list={}, suffix='.unverified', version_selector=selector)
|
||||
mock.call(args, self.app,
|
||||
dl_list={}, version_selector=selector)
|
||||
])
|
||||
# Should not be executed:
|
||||
self.assertEqual(mock_extract.mock_calls, [])
|
||||
@ -644,6 +652,119 @@ class TC_00_qvm_template(qubesadmin.tests.QubesTestCase):
|
||||
self.assertEqual(mock_rename.mock_calls, [])
|
||||
self.assertAllCalled()
|
||||
|
||||
@mock.patch('os.rename')
|
||||
@mock.patch('os.makedirs')
|
||||
@mock.patch('subprocess.check_call')
|
||||
@mock.patch('qubesadmin.tools.qvm_template.confirm_action')
|
||||
@mock.patch('qubesadmin.tools.qvm_template.extract_rpm')
|
||||
@mock.patch('qubesadmin.tools.qvm_template.download')
|
||||
@mock.patch('qubesadmin.tools.qvm_template.get_dl_list')
|
||||
@mock.patch('qubesadmin.tools.qvm_template.verify_rpm')
|
||||
def test_107_install_download_success(
|
||||
self,
|
||||
mock_verify,
|
||||
mock_dl_list,
|
||||
mock_dl,
|
||||
mock_extract,
|
||||
mock_confirm,
|
||||
mock_call,
|
||||
mock_mkdirs,
|
||||
mock_rename):
|
||||
self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = b'0\0'
|
||||
build_time = '2020-09-01 14:30:00' # 1598970600
|
||||
install_time = '2020-09-01 15:30:00'
|
||||
for key, val in [
|
||||
('name', 'test-vm'),
|
||||
('epoch', '2'),
|
||||
('version', '4.1'),
|
||||
('release', '2020'),
|
||||
('reponame', 'qubes-templates-itl'),
|
||||
('buildtime', build_time),
|
||||
('installtime', install_time),
|
||||
('license', 'GPL'),
|
||||
('url', 'https://qubes-os.org'),
|
||||
('summary', 'Summary'),
|
||||
('description', 'Desc|desc')]:
|
||||
self.app.expected_calls[(
|
||||
'test-vm',
|
||||
'admin.vm.feature.Set',
|
||||
f'template-{key}',
|
||||
val.encode())] = b'0\0'
|
||||
mock_dl.return_value = {'test-vm': {
|
||||
rpm.RPMTAG_NAME : 'qubes-template-test-vm',
|
||||
rpm.RPMTAG_BUILDTIME : 1598970600,
|
||||
rpm.RPMTAG_DESCRIPTION : 'Desc\ndesc',
|
||||
rpm.RPMTAG_EPOCHNUM : 2,
|
||||
rpm.RPMTAG_LICENSE : 'GPL',
|
||||
rpm.RPMTAG_RELEASE : '2020',
|
||||
rpm.RPMTAG_SUMMARY : 'Summary',
|
||||
rpm.RPMTAG_URL : 'https://qubes-os.org',
|
||||
rpm.RPMTAG_VERSION : '4.1'
|
||||
}}
|
||||
dl_list = {
|
||||
'test-vm': qubesadmin.tools.qvm_template.DlEntry(
|
||||
('1', '4.1', '20200101'), 'qubes-templates-itl', 1048576)
|
||||
}
|
||||
mock_dl_list.return_value = dl_list
|
||||
mock_call.side_effect = self.add_new_vm_side_effect
|
||||
mock_time = mock.Mock(wraps=datetime.datetime)
|
||||
mock_time.now.return_value = \
|
||||
datetime.datetime(2020, 9, 1, 15, 30, tzinfo=datetime.timezone.utc)
|
||||
with mock.patch('qubesadmin.tools.qvm_template.LOCK_FILE', '/tmp/test.lock'), \
|
||||
mock.patch('datetime.datetime', new=mock_time), \
|
||||
mock.patch('tempfile.TemporaryDirectory') as mock_tmpdir, \
|
||||
mock.patch('sys.stderr', new=io.StringIO()) as mock_err:
|
||||
args = argparse.Namespace(
|
||||
templates='test-vm',
|
||||
keyring='/tmp/keyring.gpg',
|
||||
nogpgcheck=False,
|
||||
cachedir='/var/cache/qvm-template',
|
||||
repo_files=[],
|
||||
releasever='4.1',
|
||||
yes=False,
|
||||
keep_cache=True,
|
||||
allow_pv=False,
|
||||
pool=None
|
||||
)
|
||||
mock_tmpdir.return_value.__enter__.return_value = \
|
||||
'/var/tmp/qvm-template-tmpdir'
|
||||
qubesadmin.tools.qvm_template.install(args, self.app)
|
||||
# Attempt to get download list
|
||||
selector = qubesadmin.tools.qvm_template.VersionSelector.LATEST
|
||||
self.assertEqual(mock_dl_list.mock_calls, [
|
||||
mock.call(args, self.app, version_selector=selector)
|
||||
])
|
||||
# Nothing downloaded
|
||||
mock_dl.assert_called_with(args, self.app,
|
||||
dl_list=dl_list, version_selector=selector)
|
||||
# download already verify the package internally
|
||||
self.assertEqual(mock_verify.mock_calls, [])
|
||||
# Package is extracted
|
||||
mock_extract.assert_called_with('test-vm',
|
||||
'/var/cache/qvm-template/qubes-template-test-vm-1:4.1-20200101.rpm',
|
||||
'/var/tmp/qvm-template-tmpdir')
|
||||
# No packages overwritten, so no confirm needed
|
||||
self.assertEqual(mock_confirm.mock_calls, [])
|
||||
# qvm-template-postprocess is called
|
||||
self.assertEqual(mock_call.mock_calls, [
|
||||
mock.call([
|
||||
'qvm-template-postprocess',
|
||||
'--really',
|
||||
'--no-installed-by-rpm',
|
||||
'post-install',
|
||||
'test-vm',
|
||||
'/var/tmp/qvm-template-tmpdir'
|
||||
'/var/lib/qubes/vm-templates/test-vm'
|
||||
])
|
||||
])
|
||||
# Cache directory created
|
||||
self.assertEqual(mock_mkdirs.mock_calls, [
|
||||
mock.call(args.cachedir, exist_ok=True)
|
||||
])
|
||||
# No templates downloaded, thus no renames needed
|
||||
self.assertEqual(mock_rename.mock_calls, [])
|
||||
self.assertAllCalled()
|
||||
|
||||
def test_110_qrexec_payload_refresh_success(self):
|
||||
with tempfile.NamedTemporaryFile() as repo_conf1, \
|
||||
tempfile.NamedTemporaryFile() as repo_conf2:
|
||||
@ -3188,11 +3309,27 @@ test-vm : Qubes template for fedora-31
|
||||
])
|
||||
self.assertAllCalled()
|
||||
|
||||
def _mock_qrexec_download(self, args, app, spec, path,
|
||||
dlsize=None, refresh=False):
|
||||
self.assertFalse(os.path.exists(path),
|
||||
'{} should not exist before'.format(path))
|
||||
# just create an empty file
|
||||
with open(path, 'wb') as f:
|
||||
if f is not None:
|
||||
f.truncate(dlsize)
|
||||
|
||||
@mock.patch('qubesadmin.tools.qvm_template.verify_rpm')
|
||||
@mock.patch('qubesadmin.tools.qvm_template.get_dl_list')
|
||||
@mock.patch('qubesadmin.tools.qvm_template.qrexec_download')
|
||||
def test_180_download_success(self, mock_qrexec, mock_dllist):
|
||||
def test_180_download_success(self, mock_qrexec, mock_dllist,
|
||||
mock_verify_rpm):
|
||||
mock_qrexec.side_effect = self._mock_qrexec_download
|
||||
with tempfile.TemporaryDirectory() as dir:
|
||||
args = argparse.Namespace(
|
||||
repo_files=[],
|
||||
keyring='/tmp/keyring.gpg',
|
||||
releasever='4.1',
|
||||
nogpgcheck=False,
|
||||
retries=1
|
||||
)
|
||||
qubesadmin.tools.qvm_template.download(args, self.app, dir, {
|
||||
@ -3202,25 +3339,36 @@ test-vm : Qubes template for fedora-31
|
||||
('0', '1', '2'),
|
||||
'qubes-templates-itl-testing',
|
||||
2048576)
|
||||
}, '.unverified')
|
||||
})
|
||||
self.assertEqual(mock_qrexec.mock_calls, [
|
||||
mock.call(args, self.app, 'qubes-template-fedora-31-1:2-3',
|
||||
dir + '/qubes-template-fedora-31-1:2-3.rpm.unverified',
|
||||
re_str(dir + '/.*/qubes-template-fedora-31-1:2-3.rpm.UNTRUSTED'),
|
||||
1048576),
|
||||
mock.call(args, self.app, 'qubes-template-fedora-32-0:1-2',
|
||||
dir + '/qubes-template-fedora-32-0:1-2.rpm.unverified',
|
||||
re_str(dir + '/.*/qubes-template-fedora-32-0:1-2.rpm.UNTRUSTED'),
|
||||
2048576)
|
||||
])
|
||||
self.assertEqual(mock_dllist.mock_calls, [])
|
||||
self.assertTrue(all(
|
||||
[x.endswith('.unverified') for x in os.listdir(dir)]))
|
||||
self.assertEqual(mock_verify_rpm.mock_calls, [
|
||||
mock.call(re_str(dir + '/.*/qubes-template-fedora-31-1:2-3.rpm.UNTRUSTED'),
|
||||
'/tmp/keyring.gpg', template_name='fedora-31'),
|
||||
mock.call(re_str(dir + '/.*/qubes-template-fedora-32-0:1-2.rpm.UNTRUSTED'),
|
||||
'/tmp/keyring.gpg', template_name='fedora-32'),
|
||||
])
|
||||
|
||||
@mock.patch('qubesadmin.tools.qvm_template.verify_rpm')
|
||||
@mock.patch('qubesadmin.tools.qvm_template.get_dl_list')
|
||||
@mock.patch('qubesadmin.tools.qvm_template.qrexec_download')
|
||||
def test_181_download_success_nosuffix(self, mock_qrexec, mock_dllist):
|
||||
def test_181_download_success_nosuffix(self, mock_qrexec, mock_dllist,
|
||||
mock_verify_rpm):
|
||||
mock_qrexec.side_effect = self._mock_qrexec_download
|
||||
with tempfile.TemporaryDirectory() as dir:
|
||||
args = argparse.Namespace(
|
||||
retries=1,
|
||||
repo_files=[],
|
||||
keyring='/tmp/keyring.gpg',
|
||||
releasever='4.1',
|
||||
nogpgcheck=False,
|
||||
downloaddir=dir
|
||||
)
|
||||
with mock.patch('sys.stderr', new=io.StringIO()) as mock_err:
|
||||
@ -3230,28 +3378,39 @@ test-vm : Qubes template for fedora-31
|
||||
})
|
||||
self.assertEqual(mock_qrexec.mock_calls, [
|
||||
mock.call(args, self.app, 'qubes-template-fedora-31-1:2-3',
|
||||
dir + '/qubes-template-fedora-31-1:2-3.rpm',
|
||||
re_str(dir + '/.*/qubes-template-fedora-31-1:2-3.rpm.UNTRUSTED'),
|
||||
1048576)
|
||||
])
|
||||
self.assertEqual(mock_dllist.mock_calls, [])
|
||||
self.assertEqual(mock_verify_rpm.mock_calls, [
|
||||
mock.call(re_str(dir + '/.*/qubes-template-fedora-31-1:2-3.rpm.UNTRUSTED'),
|
||||
'/tmp/keyring.gpg', template_name='fedora-31'),
|
||||
])
|
||||
|
||||
@mock.patch('qubesadmin.tools.qvm_template.verify_rpm')
|
||||
@mock.patch('qubesadmin.tools.qvm_template.get_dl_list')
|
||||
@mock.patch('qubesadmin.tools.qvm_template.qrexec_download')
|
||||
def test_182_download_success_getdllist(self, mock_qrexec, mock_dllist):
|
||||
def test_182_download_success_getdllist(self, mock_qrexec, mock_dllist,
|
||||
mock_verify_rpm):
|
||||
mock_qrexec.side_effect = self._mock_qrexec_download
|
||||
mock_dllist.return_value = {
|
||||
'fedora-31': qubesadmin.tools.qvm_template.DlEntry(
|
||||
('1', '2', '3'), 'qubes-templates-itl', 1048576)
|
||||
}
|
||||
with tempfile.TemporaryDirectory() as dir:
|
||||
args = argparse.Namespace(
|
||||
retries=1
|
||||
retries=1,
|
||||
repo_files=[],
|
||||
keyring='/tmp/keyring.gpg',
|
||||
releasever='4.1',
|
||||
nogpgcheck=False,
|
||||
)
|
||||
qubesadmin.tools.qvm_template.download(args, self.app,
|
||||
dir, None, '.unverified',
|
||||
dir, None,
|
||||
qubesadmin.tools.qvm_template.VersionSelector.LATEST_LOWER)
|
||||
self.assertEqual(mock_qrexec.mock_calls, [
|
||||
mock.call(args, self.app, 'qubes-template-fedora-31-1:2-3',
|
||||
dir + '/qubes-template-fedora-31-1:2-3.rpm.unverified',
|
||||
re_str(dir + '/.*/qubes-template-fedora-31-1:2-3.rpm.UNTRUSTED'),
|
||||
1048576)
|
||||
])
|
||||
self.assertEqual(mock_dllist.mock_calls, [
|
||||
@ -3260,40 +3419,54 @@ test-vm : Qubes template for fedora-31
|
||||
qubesadmin.tools.qvm_template.\
|
||||
VersionSelector.LATEST_LOWER)
|
||||
])
|
||||
self.assertTrue(all(
|
||||
[x.endswith('.unverified') for x in os.listdir(dir)]))
|
||||
self.assertEqual(mock_verify_rpm.mock_calls, [
|
||||
mock.call(re_str(dir + '/.*/qubes-template-fedora-31-1:2-3.rpm.UNTRUSTED'),
|
||||
'/tmp/keyring.gpg', template_name='fedora-31'),
|
||||
])
|
||||
|
||||
@mock.patch('qubesadmin.tools.qvm_template.verify_rpm')
|
||||
@mock.patch('qubesadmin.tools.qvm_template.get_dl_list')
|
||||
@mock.patch('qubesadmin.tools.qvm_template.qrexec_download')
|
||||
def test_183_download_success_downloaddir(self, mock_qrexec, mock_dllist):
|
||||
def test_183_download_success_downloaddir(self, mock_qrexec, mock_dllist,
|
||||
mock_verify_rpm):
|
||||
mock_qrexec.side_effect = self._mock_qrexec_download
|
||||
with tempfile.TemporaryDirectory() as dir:
|
||||
args = argparse.Namespace(
|
||||
retries=1,
|
||||
repo_files=[],
|
||||
keyring='/tmp/keyring.gpg',
|
||||
releasever='4.1',
|
||||
nogpgcheck=False,
|
||||
downloaddir=dir
|
||||
)
|
||||
qubesadmin.tools.qvm_template.download(args, self.app, None, {
|
||||
'fedora-31': qubesadmin.tools.qvm_template.DlEntry(
|
||||
('1', '2', '3'), 'qubes-templates-itl', 1048576)
|
||||
}, '.unverified')
|
||||
})
|
||||
self.assertEqual(mock_qrexec.mock_calls, [
|
||||
mock.call(args, self.app, 'qubes-template-fedora-31-1:2-3',
|
||||
dir + '/qubes-template-fedora-31-1:2-3.rpm.unverified',
|
||||
re_str(dir + '/.*/qubes-template-fedora-31-1:2-3.rpm.UNTRUSTED'),
|
||||
1048576)
|
||||
])
|
||||
self.assertEqual(mock_dllist.mock_calls, [])
|
||||
self.assertTrue(all(
|
||||
[x.endswith('.unverified') for x in os.listdir(dir)]))
|
||||
|
||||
@mock.patch('qubesadmin.tools.qvm_template.verify_rpm')
|
||||
@mock.patch('qubesadmin.tools.qvm_template.get_dl_list')
|
||||
@mock.patch('qubesadmin.tools.qvm_template.qrexec_download')
|
||||
def test_184_download_success_exists(self, mock_qrexec, mock_dllist):
|
||||
def test_184_download_success_exists(self, mock_qrexec, mock_dllist,
|
||||
mock_verify_rpm):
|
||||
mock_qrexec.side_effect = self._mock_qrexec_download
|
||||
with tempfile.TemporaryDirectory() as dir:
|
||||
with open(os.path.join(
|
||||
dir, 'qubes-template-fedora-31-1:2-3.rpm.unverified'),
|
||||
dir, 'qubes-template-fedora-31-1:2-3.rpm'),
|
||||
'w') as _:
|
||||
pass
|
||||
args = argparse.Namespace(
|
||||
retries=1,
|
||||
repo_files=[],
|
||||
keyring='/tmp/keyring.gpg',
|
||||
releasever='4.1',
|
||||
nogpgcheck=False,
|
||||
downloaddir=dir
|
||||
)
|
||||
with mock.patch('sys.stderr', new=io.StringIO()) as mock_err:
|
||||
@ -3304,47 +3477,22 @@ test-vm : Qubes template for fedora-31
|
||||
('0', '1', '2'),
|
||||
'qubes-templates-itl-testing',
|
||||
2048576)
|
||||
}, '.unverified')
|
||||
})
|
||||
self.assertTrue('already exists, skipping'
|
||||
in mock_err.getvalue())
|
||||
self.assertEqual(mock_qrexec.mock_calls, [
|
||||
mock.call(args, self.app, 'qubes-template-fedora-32-0:1-2',
|
||||
dir + '/qubes-template-fedora-32-0:1-2.rpm.unverified',
|
||||
re_str(dir + '/.*/qubes-template-fedora-32-0:1-2.rpm.UNTRUSTED'),
|
||||
2048576)
|
||||
])
|
||||
self.assertEqual(mock_dllist.mock_calls, [])
|
||||
self.assertTrue(all(
|
||||
[x.endswith('.unverified') for x in os.listdir(dir)]))
|
||||
|
||||
@mock.patch('qubesadmin.tools.qvm_template.verify_rpm')
|
||||
@mock.patch('qubesadmin.tools.qvm_template.get_dl_list')
|
||||
@mock.patch('qubesadmin.tools.qvm_template.qrexec_download')
|
||||
def test_185_download_success_existsmove(self, mock_qrexec, mock_dllist):
|
||||
with tempfile.TemporaryDirectory() as dir:
|
||||
with open(os.path.join(
|
||||
dir, 'qubes-template-fedora-31-1:2-3.rpm'),
|
||||
'w') as _:
|
||||
pass
|
||||
args = argparse.Namespace(
|
||||
retries=1,
|
||||
downloaddir=dir
|
||||
)
|
||||
with mock.patch('sys.stderr', new=io.StringIO()) as mock_err:
|
||||
qubesadmin.tools.qvm_template.download(args, self.app, None, {
|
||||
'fedora-31': qubesadmin.tools.qvm_template.DlEntry(
|
||||
('1', '2', '3'), 'qubes-templates-itl', 1048576)
|
||||
}, '.unverified')
|
||||
self.assertTrue('already exists, skipping'
|
||||
in mock_err.getvalue())
|
||||
self.assertEqual(mock_qrexec.mock_calls, [])
|
||||
self.assertEqual(mock_dllist.mock_calls, [])
|
||||
self.assertTrue(os.path.exists(
|
||||
dir + '/qubes-template-fedora-31-1:2-3.rpm.unverified'))
|
||||
self.assertTrue(all(
|
||||
[x.endswith('.unverified') for x in os.listdir(dir)]))
|
||||
|
||||
@mock.patch('qubesadmin.tools.qvm_template.get_dl_list')
|
||||
@mock.patch('qubesadmin.tools.qvm_template.qrexec_download')
|
||||
def test_186_download_success_existsnosuffix(self, mock_qrexec, mock_dllist):
|
||||
def test_185_download_success_existsmove(self, mock_qrexec, mock_dllist,
|
||||
mock_verify_rpm):
|
||||
mock_qrexec.side_effect = self._mock_qrexec_download
|
||||
with tempfile.TemporaryDirectory() as dir:
|
||||
with open(os.path.join(
|
||||
dir, 'qubes-template-fedora-31-1:2-3.rpm'),
|
||||
@ -3352,6 +3500,10 @@ test-vm : Qubes template for fedora-31
|
||||
pass
|
||||
args = argparse.Namespace(
|
||||
retries=1,
|
||||
repo_files=[],
|
||||
keyring='/tmp/keyring.gpg',
|
||||
releasever='4.1',
|
||||
nogpgcheck=False,
|
||||
downloaddir=dir
|
||||
)
|
||||
with mock.patch('sys.stderr', new=io.StringIO()) as mock_err:
|
||||
@ -3366,19 +3518,57 @@ test-vm : Qubes template for fedora-31
|
||||
self.assertTrue(os.path.exists(
|
||||
dir + '/qubes-template-fedora-31-1:2-3.rpm'))
|
||||
|
||||
@mock.patch('qubesadmin.tools.qvm_template.verify_rpm')
|
||||
@mock.patch('qubesadmin.tools.qvm_template.get_dl_list')
|
||||
@mock.patch('qubesadmin.tools.qvm_template.qrexec_download')
|
||||
def test_187_download_success_retry(self, mock_qrexec, mock_dllist):
|
||||
def test_186_download_success_existsnosuffix(self, mock_qrexec, mock_dllist,
|
||||
mock_verify_rpm):
|
||||
mock_qrexec.side_effect = self._mock_qrexec_download
|
||||
with tempfile.TemporaryDirectory() as dir:
|
||||
with open(os.path.join(
|
||||
dir, 'qubes-template-fedora-31-1:2-3.rpm'),
|
||||
'w') as _:
|
||||
pass
|
||||
args = argparse.Namespace(
|
||||
retries=1,
|
||||
repo_files=[],
|
||||
keyring='/tmp/keyring.gpg',
|
||||
releasever='4.1',
|
||||
nogpgcheck=False,
|
||||
downloaddir=dir
|
||||
)
|
||||
with mock.patch('sys.stderr', new=io.StringIO()) as mock_err:
|
||||
qubesadmin.tools.qvm_template.download(args, self.app, None, {
|
||||
'fedora-31': qubesadmin.tools.qvm_template.DlEntry(
|
||||
('1', '2', '3'), 'qubes-templates-itl', 1048576)
|
||||
})
|
||||
self.assertTrue('already exists, skipping'
|
||||
in mock_err.getvalue())
|
||||
self.assertEqual(mock_qrexec.mock_calls, [])
|
||||
self.assertEqual(mock_dllist.mock_calls, [])
|
||||
self.assertTrue(os.path.exists(
|
||||
dir + '/qubes-template-fedora-31-1:2-3.rpm'))
|
||||
|
||||
@mock.patch('qubesadmin.tools.qvm_template.verify_rpm')
|
||||
@mock.patch('qubesadmin.tools.qvm_template.get_dl_list')
|
||||
@mock.patch('qubesadmin.tools.qvm_template.qrexec_download')
|
||||
def test_187_download_success_retry(self, mock_qrexec, mock_dllist,
|
||||
mock_verify_rpm):
|
||||
counter = 0
|
||||
def f(*args):
|
||||
def f(*args, **kwargs):
|
||||
nonlocal counter
|
||||
counter += 1
|
||||
if counter == 1:
|
||||
raise ConnectionError
|
||||
self._mock_qrexec_download(*args, **kwargs)
|
||||
mock_qrexec.side_effect = f
|
||||
with tempfile.TemporaryDirectory() as dir:
|
||||
args = argparse.Namespace(
|
||||
retries=2,
|
||||
repo_files=[],
|
||||
keyring='/tmp/keyring.gpg',
|
||||
releasever='4.1',
|
||||
nogpgcheck=False,
|
||||
downloaddir=dir
|
||||
)
|
||||
with mock.patch('sys.stderr', new=io.StringIO()) as mock_err, \
|
||||
@ -3389,31 +3579,39 @@ test-vm : Qubes template for fedora-31
|
||||
})
|
||||
self.assertTrue('retrying...' in mock_err.getvalue())
|
||||
self.assertEqual(mock_rm.mock_calls, [
|
||||
mock.call(dir + '/qubes-template-fedora-31-1:2-3.rpm')
|
||||
mock.call(re_str(dir + '/.*/qubes-template-fedora-31-1:2-3.rpm.UNTRUSTED'))
|
||||
])
|
||||
self.assertEqual(mock_qrexec.mock_calls, [
|
||||
mock.call(args, self.app, 'qubes-template-fedora-31-1:2-3',
|
||||
dir + '/qubes-template-fedora-31-1:2-3.rpm',
|
||||
re_str(dir + '/.*/qubes-template-fedora-31-1:2-3.rpm.UNTRUSTED'),
|
||||
1048576),
|
||||
mock.call(args, self.app, 'qubes-template-fedora-31-1:2-3',
|
||||
dir + '/qubes-template-fedora-31-1:2-3.rpm',
|
||||
re_str(dir + '/.*/qubes-template-fedora-31-1:2-3.rpm.UNTRUSTED'),
|
||||
1048576)
|
||||
])
|
||||
self.assertEqual(mock_dllist.mock_calls, [])
|
||||
|
||||
@mock.patch('qubesadmin.tools.qvm_template.verify_rpm')
|
||||
@mock.patch('qubesadmin.tools.qvm_template.get_dl_list')
|
||||
@mock.patch('qubesadmin.tools.qvm_template.qrexec_download')
|
||||
def test_188_download_fail_retry(self, mock_qrexec, mock_dllist):
|
||||
def test_188_download_fail_retry(self, mock_qrexec, mock_dllist,
|
||||
mock_verify_rpm):
|
||||
mock_qrexec.side_effect = self._mock_qrexec_download
|
||||
counter = 0
|
||||
def f(*args):
|
||||
def f(*args, **kwargs):
|
||||
nonlocal counter
|
||||
counter += 1
|
||||
if counter <= 3:
|
||||
raise ConnectionError
|
||||
self._mock_qrexec_download(*args, **kwargs)
|
||||
mock_qrexec.side_effect = f
|
||||
with tempfile.TemporaryDirectory() as dir:
|
||||
args = argparse.Namespace(
|
||||
retries=3,
|
||||
repo_files=[],
|
||||
keyring='/tmp/keyring.gpg',
|
||||
releasever='4.1',
|
||||
nogpgcheck=False,
|
||||
downloaddir=dir
|
||||
)
|
||||
with mock.patch('sys.stderr', new=io.StringIO()) as mock_err, \
|
||||
@ -3427,32 +3625,38 @@ test-vm : Qubes template for fedora-31
|
||||
self.assertEqual(mock_err.getvalue().count('retrying...'), 2)
|
||||
self.assertTrue('download failed' in mock_err.getvalue())
|
||||
self.assertEqual(mock_rm.mock_calls, [
|
||||
mock.call(dir + '/qubes-template-fedora-31-1:2-3.rpm'),
|
||||
mock.call(dir + '/qubes-template-fedora-31-1:2-3.rpm'),
|
||||
mock.call(dir + '/qubes-template-fedora-31-1:2-3.rpm')
|
||||
mock.call(re_str(dir + '/.*/qubes-template-fedora-31-1:2-3.rpm.UNTRUSTED')),
|
||||
mock.call(re_str(dir + '/.*/qubes-template-fedora-31-1:2-3.rpm.UNTRUSTED')),
|
||||
mock.call(re_str(dir + '/.*/qubes-template-fedora-31-1:2-3.rpm.UNTRUSTED'))
|
||||
])
|
||||
self.assertEqual(mock_qrexec.mock_calls, [
|
||||
mock.call(args, self.app, 'qubes-template-fedora-31-1:2-3',
|
||||
dir + '/qubes-template-fedora-31-1:2-3.rpm',
|
||||
re_str(dir + '/.*/qubes-template-fedora-31-1:2-3.rpm.UNTRUSTED'),
|
||||
1048576),
|
||||
mock.call(args, self.app, 'qubes-template-fedora-31-1:2-3',
|
||||
dir + '/qubes-template-fedora-31-1:2-3.rpm',
|
||||
re_str(dir + '/.*/qubes-template-fedora-31-1:2-3.rpm.UNTRUSTED'),
|
||||
1048576),
|
||||
mock.call(args, self.app, 'qubes-template-fedora-31-1:2-3',
|
||||
dir + '/qubes-template-fedora-31-1:2-3.rpm',
|
||||
re_str(dir + '/.*/qubes-template-fedora-31-1:2-3.rpm.UNTRUSTED'),
|
||||
1048576)
|
||||
])
|
||||
self.assertEqual(mock_dllist.mock_calls, [])
|
||||
|
||||
@mock.patch('qubesadmin.tools.qvm_template.verify_rpm')
|
||||
@mock.patch('qubesadmin.tools.qvm_template.get_dl_list')
|
||||
@mock.patch('qubesadmin.tools.qvm_template.qrexec_download')
|
||||
def test_189_download_fail_interrupt(self, mock_qrexec, mock_dllist):
|
||||
def test_189_download_fail_interrupt(self, mock_qrexec, mock_dllist,
|
||||
mock_verify_rpm):
|
||||
def f(*args):
|
||||
raise RuntimeError
|
||||
mock_qrexec.side_effect = f
|
||||
with tempfile.TemporaryDirectory() as dir:
|
||||
args = argparse.Namespace(
|
||||
retries=3,
|
||||
repo_files=[],
|
||||
keyring='/tmp/keyring.gpg',
|
||||
releasever='4.1',
|
||||
nogpgcheck=False,
|
||||
downloaddir=dir
|
||||
)
|
||||
with mock.patch('sys.stderr', new=io.StringIO()) as mock_err, \
|
||||
@ -3463,12 +3667,45 @@ test-vm : Qubes template for fedora-31
|
||||
'fedora-31': qubesadmin.tools.qvm_template.DlEntry(
|
||||
('1', '2', '3'), 'qubes-templates-itl', 1048576)
|
||||
})
|
||||
self.assertEqual(mock_rm.mock_calls, [
|
||||
mock.call(dir + '/qubes-template-fedora-31-1:2-3.rpm')
|
||||
])
|
||||
self.assertEqual(mock_qrexec.mock_calls, [
|
||||
mock.call(args, self.app, 'qubes-template-fedora-31-1:2-3',
|
||||
dir + '/qubes-template-fedora-31-1:2-3.rpm',
|
||||
re_str(dir + '/.*/qubes-template-fedora-31-1:2-3.rpm'),
|
||||
1048576)
|
||||
])
|
||||
self.assertEqual(mock_dllist.mock_calls, [])
|
||||
|
||||
@mock.patch('qubesadmin.tools.qvm_template.verify_rpm')
|
||||
@mock.patch('qubesadmin.tools.qvm_template.get_dl_list')
|
||||
@mock.patch('qubesadmin.tools.qvm_template.qrexec_download')
|
||||
def test_189_download_verify_fail(self, mock_qrexec, mock_dllist,
|
||||
mock_verify_rpm):
|
||||
mock_qrexec.side_effect = self._mock_qrexec_download
|
||||
mock_verify_rpm.side_effect = \
|
||||
qubesadmin.tools.qvm_template.SignatureVerificationError
|
||||
|
||||
with tempfile.TemporaryDirectory() as dir:
|
||||
args = argparse.Namespace(
|
||||
retries=3,
|
||||
repo_files=[],
|
||||
keyring='/tmp/keyring.gpg',
|
||||
releasever='4.1',
|
||||
nogpgcheck=False,
|
||||
downloaddir=dir
|
||||
)
|
||||
with self.assertRaises(qubesadmin.tools.qvm_template.SignatureVerificationError):
|
||||
qubesadmin.tools.qvm_template.download(
|
||||
args, self.app, None, {
|
||||
'fedora-31': qubesadmin.tools.qvm_template.DlEntry(
|
||||
('1', '2', '3'), 'qubes-templates-itl', 1048576)
|
||||
})
|
||||
self.assertEqual(mock_qrexec.mock_calls, [
|
||||
mock.call(args, self.app, 'qubes-template-fedora-31-1:2-3',
|
||||
re_str(dir + '/.*/qubes-template-fedora-31-1:2-3.rpm'),
|
||||
1048576)
|
||||
])
|
||||
self.assertEqual(mock_dllist.mock_calls, [])
|
||||
self.assertEqual(os.listdir(dir), [])
|
||||
|
||||
# TODO: test unexpected name downloaded
|
||||
# TODO: test truncated download
|
||||
# TODO: test no disk space
|
@ -146,8 +146,6 @@ def get_parser() -> argparse.ArgumentParser:
|
||||
help_str='Upgrade template packages.')
|
||||
for parser_x in [parser_install, parser_reinstall,
|
||||
parser_downgrade, parser_upgrade]:
|
||||
parser_x.add_argument('--nogpgcheck', action='store_true',
|
||||
help='Disable signature checks.')
|
||||
parser_x.add_argument('--allow-pv', action='store_true',
|
||||
help='Allow templates that set virt_mode to pv.')
|
||||
parser_x.add_argument('templates', nargs='*', metavar='TEMPLATESPEC')
|
||||
@ -160,6 +158,8 @@ def get_parser() -> argparse.ArgumentParser:
|
||||
help='Specify download directory.')
|
||||
parser_x.add_argument('--retries', default=5, type=int,
|
||||
help='Specify maximum number of retries for downloads.')
|
||||
parser_x.add_argument('--nogpgcheck', action='store_true',
|
||||
help='Disable signature checks.')
|
||||
parser_download.add_argument('templates', nargs='*',
|
||||
metavar='TEMPLATESPEC')
|
||||
# qvm-template {list,info}
|
||||
@ -602,7 +602,8 @@ def get_keys_for_repos(repo_files: typing.List[str],
|
||||
def verify_rpm(
|
||||
path: str,
|
||||
key: str,
|
||||
nogpgcheck: bool = False
|
||||
nogpgcheck: bool = False,
|
||||
template_name: typing.Optional[str] = None
|
||||
) -> rpm.hdr:
|
||||
"""Verify the digest and signature of a RPM package and return the package
|
||||
header.
|
||||
@ -614,6 +615,8 @@ def verify_rpm(
|
||||
|
||||
:param path: Location of the RPM package
|
||||
:param nogpgcheck: Whether to allow invalid GPG signatures
|
||||
:param template_name: expected template name - if specified, verifies if
|
||||
the package name matches expected template name
|
||||
|
||||
:return: RPM package header. If verification fails, raises an exception.
|
||||
"""
|
||||
@ -635,6 +638,10 @@ def verify_rpm(
|
||||
tset = rpm.TransactionSet()
|
||||
tset.setVSFlags(rpm.RPMVSF_MASK_NOSIGNATURES)
|
||||
hdr = tset.hdrFromFdno(fd)
|
||||
if template_name is not None:
|
||||
if hdr[rpm.RPMTAG_NAME] != PACKAGE_NAME_PREFIX + template_name:
|
||||
raise SignatureVerificationError(
|
||||
'Downloaded package does not match expected template name')
|
||||
return hdr
|
||||
|
||||
def extract_rpm(name: str, path: str, target: str) -> bool:
|
||||
@ -759,8 +766,8 @@ def download(
|
||||
app: qubesadmin.app.QubesBase,
|
||||
path_override: typing.Optional[str] = None,
|
||||
dl_list: typing.Optional[typing.Dict[str, DlEntry]] = None,
|
||||
suffix: str = '',
|
||||
version_selector: VersionSelector = VersionSelector.LATEST) -> None:
|
||||
version_selector: VersionSelector = VersionSelector.LATEST) \
|
||||
-> typing.Dict[str, rpm.hdr]:
|
||||
"""Command that downloads template packages.
|
||||
|
||||
:param args: Arguments received by the application.
|
||||
@ -770,50 +777,65 @@ def download(
|
||||
:param dl_list: Override list of templates to download. If not set or set
|
||||
to None, ``get_dl_list`` is called, which generates the list from
|
||||
``args``. Optional
|
||||
:param suffix: Suffix to add to the file name of downloaded packages. This
|
||||
is useful if you want to distinguish between verified and unverified
|
||||
packages. Defaults to an empty string
|
||||
:param version_selector: Specify algorithm to select the candidate version
|
||||
of a package. Defaults to ``VersionSelector.LATEST``
|
||||
:return package headers of downloaded templates
|
||||
"""
|
||||
if dl_list is None:
|
||||
dl_list = get_dl_list(args, app, version_selector=version_selector)
|
||||
|
||||
keys = get_keys_for_repos(args.repo_files, args.releasever)
|
||||
|
||||
package_hdrs = {}
|
||||
|
||||
path = path_override if path_override is not None else args.downloaddir
|
||||
with tempfile.TemporaryDirectory(dir=path) as dl_dir:
|
||||
for name, entry in dl_list.items():
|
||||
version_str = build_version_str(entry.evr)
|
||||
spec = PACKAGE_NAME_PREFIX + name + '-' + version_str
|
||||
target = os.path.join(path, '%s.rpm' % spec)
|
||||
target_suffix = target + suffix
|
||||
if os.path.exists(target_suffix):
|
||||
target_temp = os.path.join(dl_dir, '%s.rpm.UNTRUSTED' % spec)
|
||||
repo_key = keys.get(entry.reponame)
|
||||
if repo_key is None:
|
||||
repo_key = args.keyring
|
||||
if os.path.exists(target):
|
||||
print('\'%s\' already exists, skipping...' % target,
|
||||
file=sys.stderr)
|
||||
elif os.path.exists(target):
|
||||
print('\'%s\' already exists, skipping...' % target,
|
||||
file=sys.stderr)
|
||||
os.rename(target, target_suffix)
|
||||
else:
|
||||
# but still verify the package
|
||||
verify_rpm(target, repo_key, template_name=name)
|
||||
continue
|
||||
print('Downloading \'%s\'...' % spec, file=sys.stderr)
|
||||
done = False
|
||||
for attempt in range(args.retries):
|
||||
try:
|
||||
qrexec_download(args, app, spec, target_suffix,
|
||||
qrexec_download(args, app, spec, target_temp,
|
||||
entry.dlsize)
|
||||
size = os.path.getsize(target_temp)
|
||||
if size != entry.dlsize:
|
||||
raise ConnectionError(
|
||||
'Downloaded file is {} bytes, expected {}'.format(
|
||||
size, entry.dlsize))
|
||||
done = True
|
||||
break
|
||||
except ConnectionError:
|
||||
os.remove(target_suffix)
|
||||
os.remove(target_temp)
|
||||
if attempt + 1 < args.retries:
|
||||
print('\'%s\' download failed, retrying...' % spec,
|
||||
file=sys.stderr)
|
||||
except:
|
||||
# Also remove file if interrupted by other means
|
||||
os.remove(target_suffix)
|
||||
raise
|
||||
if not done:
|
||||
print('Error: \'%s\' download failed.' % spec, file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
if args.nogpgcheck:
|
||||
print(
|
||||
'Warning: --nogpgcheck is ignored for downloaded templates',
|
||||
file=sys.stderr)
|
||||
package_hdr = verify_rpm(target_temp, repo_key, template_name=name)
|
||||
# after package is verified, rename to the target location
|
||||
os.rename(target_temp, target)
|
||||
package_hdrs[name] = package_hdr
|
||||
return package_hdrs
|
||||
|
||||
def locked(func):
|
||||
"""Execute given function under a lock in *LOCK_FILE*"""
|
||||
@functools.wraps(func)
|
||||
@ -854,19 +876,18 @@ def install(
|
||||
|
||||
unverified_rpm_list = [] # rpmfile, reponame
|
||||
verified_rpm_list = []
|
||||
def verify(rpmfile, reponame, dl_dir=None):
|
||||
def verify(rpmfile, reponame, package_hdr=None):
|
||||
"""Verify package signature and version, remove "unverified"
|
||||
suffix, and parse package header."""
|
||||
if dl_dir:
|
||||
path = os.path.join(
|
||||
dl_dir, os.path.basename(rpmfile) + UNVERIFIED_SUFFIX)
|
||||
else:
|
||||
path = rpmfile
|
||||
suffix, and parse package header.
|
||||
|
||||
If package_hdr is provided, the signature check is skipped and only
|
||||
other checks are performed."""
|
||||
if package_hdr is None:
|
||||
repo_key = keys.get(reponame)
|
||||
if repo_key is None:
|
||||
repo_key = args.keyring
|
||||
package_hdr = verify_rpm(path, repo_key, args.nogpgcheck)
|
||||
package_hdr = verify_rpm(rpmfile, repo_key,
|
||||
nogpgcheck=args.nogpgcheck)
|
||||
if not package_hdr:
|
||||
parser.error('Package \'%s\' verification failed.' % rpmfile)
|
||||
|
||||
@ -877,9 +898,6 @@ def install(
|
||||
# Remove prefix to get the real template name
|
||||
name = package_name[len(PACKAGE_NAME_PREFIX):]
|
||||
|
||||
if path != rpmfile:
|
||||
os.rename(path, rpmfile)
|
||||
|
||||
# Check if already installed
|
||||
if not override_existing and name in app.domains:
|
||||
print(('Template \'%s\' already installed, skipping...'
|
||||
@ -927,7 +945,7 @@ def install(
|
||||
# First verify local RPMs and extract header
|
||||
for rpmfile, reponame in unverified_rpm_list:
|
||||
verify(rpmfile, reponame)
|
||||
unverified_rpm_list = []
|
||||
unverified_rpm_list = {}
|
||||
|
||||
os.makedirs(args.cachedir, exist_ok=True)
|
||||
|
||||
@ -951,7 +969,7 @@ def install(
|
||||
version_str = build_version_str(entry.evr)
|
||||
target_file = \
|
||||
'%s%s-%s.rpm' % (PACKAGE_NAME_PREFIX, name, version_str)
|
||||
unverified_rpm_list.append(
|
||||
unverified_rpm_list[name] = (
|
||||
(os.path.join(args.cachedir, target_file), entry.reponame))
|
||||
dl_list = dl_list_copy
|
||||
|
||||
@ -969,15 +987,14 @@ def install(
|
||||
'This will override changes made in the following VMs:',
|
||||
override_tpls)
|
||||
|
||||
with tempfile.TemporaryDirectory(dir=args.cachedir) as dl_dir:
|
||||
download(args, app, path_override=dl_dir,
|
||||
dl_list=dl_list, suffix=UNVERIFIED_SUFFIX,
|
||||
package_hdrs = download(args, app,
|
||||
dl_list=dl_list,
|
||||
version_selector=version_selector)
|
||||
|
||||
# Verify downloaded templates
|
||||
for rpmfile, reponame in unverified_rpm_list:
|
||||
verify(rpmfile, reponame, dl_dir=dl_dir)
|
||||
unverified_rpm_list = []
|
||||
for name, (rpmfile, reponame) in unverified_rpm_list.items():
|
||||
verify(rpmfile, reponame, package_hdrs[name])
|
||||
del unverified_rpm_list
|
||||
|
||||
# Unpack and install
|
||||
for rpmfile, reponame, name, package_hdr in verified_rpm_list:
|
||||
|
Loading…
Reference in New Issue
Block a user