diff --git a/qubesadmin/tests/tools/qvm_template.py b/qubesadmin/tests/tools/qvm_template.py index c131a4b..2746028 100644 --- a/qubesadmin/tests/tools/qvm_template.py +++ b/qubesadmin/tests/tools/qvm_template.py @@ -1,3 +1,4 @@ +import re from unittest import mock import argparse import asyncio @@ -14,6 +15,13 @@ import rpm import qubesadmin.tests import qubesadmin.tools.qvm_template +class re_str(str): + def __eq__(self, other): + return bool(re.match(self, other)) + + def __hash__(self): + return super().__hash__() + class TC_00_qvm_template(qubesadmin.tests.QubesTestCase): def setUp(self): # Print str(list) directly so that the output is consistent no matter @@ -248,7 +256,7 @@ class TC_00_qvm_template(qubesadmin.tests.QubesTestCase): path = template_file.name args = argparse.Namespace( templates=[path], - keyring='/tmp', + keyring='/tmp/keyring.gpg', nogpgcheck=False, cachedir='/var/cache/qvm-template', repo_files=[], @@ -267,8 +275,9 @@ class TC_00_qvm_template(qubesadmin.tests.QubesTestCase): ]) # Nothing downloaded mock_dl.assert_called_with(args, self.app, - path_override='/var/tmp/qvm-template-tmpdir', - dl_list={}, suffix='.unverified', version_selector=selector) + dl_list={}, version_selector=selector) + mock_verify.assert_called_once_with(template_file.name, '/tmp/keyring.gpg', + nogpgcheck=False) # Package is extracted mock_extract.assert_called_with('test-vm', path, '/var/tmp/qvm-template-tmpdir') @@ -375,8 +384,7 @@ class TC_00_qvm_template(qubesadmin.tests.QubesTestCase): ]) # Nothing downloaded mock_dl.assert_called_with(args, self.app, - path_override='/var/tmp/qvm-template-tmpdir', - dl_list={}, suffix='.unverified', version_selector=selector) + dl_list={}, version_selector=selector) # Package is extracted mock_extract.assert_called_with('test-vm', path, '/var/tmp/qvm-template-tmpdir') @@ -520,8 +528,8 @@ class TC_00_qvm_template(qubesadmin.tests.QubesTestCase): ]) # Nothing downloaded self.assertEqual(mock_dl.mock_calls, [ - mock.call(args, self.app, path_override='/var/tmp/qvm-template-tmpdir', - dl_list={}, suffix='.unverified', version_selector=selector) + mock.call(args, self.app, + dl_list={}, version_selector=selector) ]) # Should not be executed: self.assertEqual(mock_extract.mock_calls, []) @@ -644,6 +652,119 @@ class TC_00_qvm_template(qubesadmin.tests.QubesTestCase): self.assertEqual(mock_rename.mock_calls, []) self.assertAllCalled() + @mock.patch('os.rename') + @mock.patch('os.makedirs') + @mock.patch('subprocess.check_call') + @mock.patch('qubesadmin.tools.qvm_template.confirm_action') + @mock.patch('qubesadmin.tools.qvm_template.extract_rpm') + @mock.patch('qubesadmin.tools.qvm_template.download') + @mock.patch('qubesadmin.tools.qvm_template.get_dl_list') + @mock.patch('qubesadmin.tools.qvm_template.verify_rpm') + def test_107_install_download_success( + self, + mock_verify, + mock_dl_list, + mock_dl, + mock_extract, + mock_confirm, + mock_call, + mock_mkdirs, + mock_rename): + self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = b'0\0' + build_time = '2020-09-01 14:30:00' # 1598970600 + install_time = '2020-09-01 15:30:00' + for key, val in [ + ('name', 'test-vm'), + ('epoch', '2'), + ('version', '4.1'), + ('release', '2020'), + ('reponame', 'qubes-templates-itl'), + ('buildtime', build_time), + ('installtime', install_time), + ('license', 'GPL'), + ('url', 'https://qubes-os.org'), + ('summary', 'Summary'), + ('description', 'Desc|desc')]: + self.app.expected_calls[( + 'test-vm', + 'admin.vm.feature.Set', + f'template-{key}', + val.encode())] = b'0\0' + mock_dl.return_value = {'test-vm': { + rpm.RPMTAG_NAME : 'qubes-template-test-vm', + rpm.RPMTAG_BUILDTIME : 1598970600, + rpm.RPMTAG_DESCRIPTION : 'Desc\ndesc', + rpm.RPMTAG_EPOCHNUM : 2, + rpm.RPMTAG_LICENSE : 'GPL', + rpm.RPMTAG_RELEASE : '2020', + rpm.RPMTAG_SUMMARY : 'Summary', + rpm.RPMTAG_URL : 'https://qubes-os.org', + rpm.RPMTAG_VERSION : '4.1' + }} + dl_list = { + 'test-vm': qubesadmin.tools.qvm_template.DlEntry( + ('1', '4.1', '20200101'), 'qubes-templates-itl', 1048576) + } + mock_dl_list.return_value = dl_list + mock_call.side_effect = self.add_new_vm_side_effect + mock_time = mock.Mock(wraps=datetime.datetime) + mock_time.now.return_value = \ + datetime.datetime(2020, 9, 1, 15, 30, tzinfo=datetime.timezone.utc) + with mock.patch('qubesadmin.tools.qvm_template.LOCK_FILE', '/tmp/test.lock'), \ + mock.patch('datetime.datetime', new=mock_time), \ + mock.patch('tempfile.TemporaryDirectory') as mock_tmpdir, \ + mock.patch('sys.stderr', new=io.StringIO()) as mock_err: + args = argparse.Namespace( + templates='test-vm', + keyring='/tmp/keyring.gpg', + nogpgcheck=False, + cachedir='/var/cache/qvm-template', + repo_files=[], + releasever='4.1', + yes=False, + keep_cache=True, + allow_pv=False, + pool=None + ) + mock_tmpdir.return_value.__enter__.return_value = \ + '/var/tmp/qvm-template-tmpdir' + qubesadmin.tools.qvm_template.install(args, self.app) + # Attempt to get download list + selector = qubesadmin.tools.qvm_template.VersionSelector.LATEST + self.assertEqual(mock_dl_list.mock_calls, [ + mock.call(args, self.app, version_selector=selector) + ]) + # Nothing downloaded + mock_dl.assert_called_with(args, self.app, + dl_list=dl_list, version_selector=selector) + # download already verify the package internally + self.assertEqual(mock_verify.mock_calls, []) + # Package is extracted + mock_extract.assert_called_with('test-vm', + '/var/cache/qvm-template/qubes-template-test-vm-1:4.1-20200101.rpm', + '/var/tmp/qvm-template-tmpdir') + # No packages overwritten, so no confirm needed + self.assertEqual(mock_confirm.mock_calls, []) + # qvm-template-postprocess is called + self.assertEqual(mock_call.mock_calls, [ + mock.call([ + 'qvm-template-postprocess', + '--really', + '--no-installed-by-rpm', + 'post-install', + 'test-vm', + '/var/tmp/qvm-template-tmpdir' + '/var/lib/qubes/vm-templates/test-vm' + ]) + ]) + # Cache directory created + self.assertEqual(mock_mkdirs.mock_calls, [ + mock.call(args.cachedir, exist_ok=True) + ]) + # No templates downloaded, thus no renames needed + self.assertEqual(mock_rename.mock_calls, []) + self.assertAllCalled() + def test_110_qrexec_payload_refresh_success(self): with tempfile.NamedTemporaryFile() as repo_conf1, \ tempfile.NamedTemporaryFile() as repo_conf2: @@ -3188,11 +3309,27 @@ test-vm : Qubes template for fedora-31 ]) self.assertAllCalled() + def _mock_qrexec_download(self, args, app, spec, path, + dlsize=None, refresh=False): + self.assertFalse(os.path.exists(path), + '{} should not exist before'.format(path)) + # just create an empty file + with open(path, 'wb') as f: + if f is not None: + f.truncate(dlsize) + + @mock.patch('qubesadmin.tools.qvm_template.verify_rpm') @mock.patch('qubesadmin.tools.qvm_template.get_dl_list') @mock.patch('qubesadmin.tools.qvm_template.qrexec_download') - def test_180_download_success(self, mock_qrexec, mock_dllist): + def test_180_download_success(self, mock_qrexec, mock_dllist, + mock_verify_rpm): + mock_qrexec.side_effect = self._mock_qrexec_download with tempfile.TemporaryDirectory() as dir: args = argparse.Namespace( + repo_files=[], + keyring='/tmp/keyring.gpg', + releasever='4.1', + nogpgcheck=False, retries=1 ) qubesadmin.tools.qvm_template.download(args, self.app, dir, { @@ -3202,25 +3339,36 @@ test-vm : Qubes template for fedora-31 ('0', '1', '2'), 'qubes-templates-itl-testing', 2048576) - }, '.unverified') + }) self.assertEqual(mock_qrexec.mock_calls, [ mock.call(args, self.app, 'qubes-template-fedora-31-1:2-3', - dir + '/qubes-template-fedora-31-1:2-3.rpm.unverified', + re_str(dir + '/.*/qubes-template-fedora-31-1:2-3.rpm.UNTRUSTED'), 1048576), mock.call(args, self.app, 'qubes-template-fedora-32-0:1-2', - dir + '/qubes-template-fedora-32-0:1-2.rpm.unverified', + re_str(dir + '/.*/qubes-template-fedora-32-0:1-2.rpm.UNTRUSTED'), 2048576) ]) self.assertEqual(mock_dllist.mock_calls, []) - self.assertTrue(all( - [x.endswith('.unverified') for x in os.listdir(dir)])) + self.assertEqual(mock_verify_rpm.mock_calls, [ + mock.call(re_str(dir + '/.*/qubes-template-fedora-31-1:2-3.rpm.UNTRUSTED'), + '/tmp/keyring.gpg', template_name='fedora-31'), + mock.call(re_str(dir + '/.*/qubes-template-fedora-32-0:1-2.rpm.UNTRUSTED'), + '/tmp/keyring.gpg', template_name='fedora-32'), + ]) + @mock.patch('qubesadmin.tools.qvm_template.verify_rpm') @mock.patch('qubesadmin.tools.qvm_template.get_dl_list') @mock.patch('qubesadmin.tools.qvm_template.qrexec_download') - def test_181_download_success_nosuffix(self, mock_qrexec, mock_dllist): + def test_181_download_success_nosuffix(self, mock_qrexec, mock_dllist, + mock_verify_rpm): + mock_qrexec.side_effect = self._mock_qrexec_download with tempfile.TemporaryDirectory() as dir: args = argparse.Namespace( retries=1, + repo_files=[], + keyring='/tmp/keyring.gpg', + releasever='4.1', + nogpgcheck=False, downloaddir=dir ) with mock.patch('sys.stderr', new=io.StringIO()) as mock_err: @@ -3230,28 +3378,39 @@ test-vm : Qubes template for fedora-31 }) self.assertEqual(mock_qrexec.mock_calls, [ mock.call(args, self.app, 'qubes-template-fedora-31-1:2-3', - dir + '/qubes-template-fedora-31-1:2-3.rpm', + re_str(dir + '/.*/qubes-template-fedora-31-1:2-3.rpm.UNTRUSTED'), 1048576) ]) self.assertEqual(mock_dllist.mock_calls, []) + self.assertEqual(mock_verify_rpm.mock_calls, [ + mock.call(re_str(dir + '/.*/qubes-template-fedora-31-1:2-3.rpm.UNTRUSTED'), + '/tmp/keyring.gpg', template_name='fedora-31'), + ]) + @mock.patch('qubesadmin.tools.qvm_template.verify_rpm') @mock.patch('qubesadmin.tools.qvm_template.get_dl_list') @mock.patch('qubesadmin.tools.qvm_template.qrexec_download') - def test_182_download_success_getdllist(self, mock_qrexec, mock_dllist): + def test_182_download_success_getdllist(self, mock_qrexec, mock_dllist, + mock_verify_rpm): + mock_qrexec.side_effect = self._mock_qrexec_download mock_dllist.return_value = { 'fedora-31': qubesadmin.tools.qvm_template.DlEntry( ('1', '2', '3'), 'qubes-templates-itl', 1048576) } with tempfile.TemporaryDirectory() as dir: args = argparse.Namespace( - retries=1 + retries=1, + repo_files=[], + keyring='/tmp/keyring.gpg', + releasever='4.1', + nogpgcheck=False, ) qubesadmin.tools.qvm_template.download(args, self.app, - dir, None, '.unverified', + dir, None, qubesadmin.tools.qvm_template.VersionSelector.LATEST_LOWER) self.assertEqual(mock_qrexec.mock_calls, [ mock.call(args, self.app, 'qubes-template-fedora-31-1:2-3', - dir + '/qubes-template-fedora-31-1:2-3.rpm.unverified', + re_str(dir + '/.*/qubes-template-fedora-31-1:2-3.rpm.UNTRUSTED'), 1048576) ]) self.assertEqual(mock_dllist.mock_calls, [ @@ -3260,40 +3419,54 @@ test-vm : Qubes template for fedora-31 qubesadmin.tools.qvm_template.\ VersionSelector.LATEST_LOWER) ]) - self.assertTrue(all( - [x.endswith('.unverified') for x in os.listdir(dir)])) + self.assertEqual(mock_verify_rpm.mock_calls, [ + mock.call(re_str(dir + '/.*/qubes-template-fedora-31-1:2-3.rpm.UNTRUSTED'), + '/tmp/keyring.gpg', template_name='fedora-31'), + ]) + @mock.patch('qubesadmin.tools.qvm_template.verify_rpm') @mock.patch('qubesadmin.tools.qvm_template.get_dl_list') @mock.patch('qubesadmin.tools.qvm_template.qrexec_download') - def test_183_download_success_downloaddir(self, mock_qrexec, mock_dllist): + def test_183_download_success_downloaddir(self, mock_qrexec, mock_dllist, + mock_verify_rpm): + mock_qrexec.side_effect = self._mock_qrexec_download with tempfile.TemporaryDirectory() as dir: args = argparse.Namespace( retries=1, + repo_files=[], + keyring='/tmp/keyring.gpg', + releasever='4.1', + nogpgcheck=False, downloaddir=dir ) qubesadmin.tools.qvm_template.download(args, self.app, None, { 'fedora-31': qubesadmin.tools.qvm_template.DlEntry( ('1', '2', '3'), 'qubes-templates-itl', 1048576) - }, '.unverified') + }) self.assertEqual(mock_qrexec.mock_calls, [ mock.call(args, self.app, 'qubes-template-fedora-31-1:2-3', - dir + '/qubes-template-fedora-31-1:2-3.rpm.unverified', + re_str(dir + '/.*/qubes-template-fedora-31-1:2-3.rpm.UNTRUSTED'), 1048576) ]) self.assertEqual(mock_dllist.mock_calls, []) - self.assertTrue(all( - [x.endswith('.unverified') for x in os.listdir(dir)])) + @mock.patch('qubesadmin.tools.qvm_template.verify_rpm') @mock.patch('qubesadmin.tools.qvm_template.get_dl_list') @mock.patch('qubesadmin.tools.qvm_template.qrexec_download') - def test_184_download_success_exists(self, mock_qrexec, mock_dllist): + def test_184_download_success_exists(self, mock_qrexec, mock_dllist, + mock_verify_rpm): + mock_qrexec.side_effect = self._mock_qrexec_download with tempfile.TemporaryDirectory() as dir: with open(os.path.join( - dir, 'qubes-template-fedora-31-1:2-3.rpm.unverified'), + dir, 'qubes-template-fedora-31-1:2-3.rpm'), 'w') as _: pass args = argparse.Namespace( retries=1, + repo_files=[], + keyring='/tmp/keyring.gpg', + releasever='4.1', + nogpgcheck=False, downloaddir=dir ) with mock.patch('sys.stderr', new=io.StringIO()) as mock_err: @@ -3304,47 +3477,22 @@ test-vm : Qubes template for fedora-31 ('0', '1', '2'), 'qubes-templates-itl-testing', 2048576) - }, '.unverified') + }) self.assertTrue('already exists, skipping' in mock_err.getvalue()) self.assertEqual(mock_qrexec.mock_calls, [ mock.call(args, self.app, 'qubes-template-fedora-32-0:1-2', - dir + '/qubes-template-fedora-32-0:1-2.rpm.unverified', + re_str(dir + '/.*/qubes-template-fedora-32-0:1-2.rpm.UNTRUSTED'), 2048576) ]) self.assertEqual(mock_dllist.mock_calls, []) - self.assertTrue(all( - [x.endswith('.unverified') for x in os.listdir(dir)])) + @mock.patch('qubesadmin.tools.qvm_template.verify_rpm') @mock.patch('qubesadmin.tools.qvm_template.get_dl_list') @mock.patch('qubesadmin.tools.qvm_template.qrexec_download') - def test_185_download_success_existsmove(self, mock_qrexec, mock_dllist): - with tempfile.TemporaryDirectory() as dir: - with open(os.path.join( - dir, 'qubes-template-fedora-31-1:2-3.rpm'), - 'w') as _: - pass - args = argparse.Namespace( - retries=1, - downloaddir=dir - ) - with mock.patch('sys.stderr', new=io.StringIO()) as mock_err: - qubesadmin.tools.qvm_template.download(args, self.app, None, { - 'fedora-31': qubesadmin.tools.qvm_template.DlEntry( - ('1', '2', '3'), 'qubes-templates-itl', 1048576) - }, '.unverified') - self.assertTrue('already exists, skipping' - in mock_err.getvalue()) - self.assertEqual(mock_qrexec.mock_calls, []) - self.assertEqual(mock_dllist.mock_calls, []) - self.assertTrue(os.path.exists( - dir + '/qubes-template-fedora-31-1:2-3.rpm.unverified')) - self.assertTrue(all( - [x.endswith('.unverified') for x in os.listdir(dir)])) - - @mock.patch('qubesadmin.tools.qvm_template.get_dl_list') - @mock.patch('qubesadmin.tools.qvm_template.qrexec_download') - def test_186_download_success_existsnosuffix(self, mock_qrexec, mock_dllist): + def test_185_download_success_existsmove(self, mock_qrexec, mock_dllist, + mock_verify_rpm): + mock_qrexec.side_effect = self._mock_qrexec_download with tempfile.TemporaryDirectory() as dir: with open(os.path.join( dir, 'qubes-template-fedora-31-1:2-3.rpm'), @@ -3352,6 +3500,10 @@ test-vm : Qubes template for fedora-31 pass args = argparse.Namespace( retries=1, + repo_files=[], + keyring='/tmp/keyring.gpg', + releasever='4.1', + nogpgcheck=False, downloaddir=dir ) with mock.patch('sys.stderr', new=io.StringIO()) as mock_err: @@ -3366,19 +3518,57 @@ test-vm : Qubes template for fedora-31 self.assertTrue(os.path.exists( dir + '/qubes-template-fedora-31-1:2-3.rpm')) + @mock.patch('qubesadmin.tools.qvm_template.verify_rpm') @mock.patch('qubesadmin.tools.qvm_template.get_dl_list') @mock.patch('qubesadmin.tools.qvm_template.qrexec_download') - def test_187_download_success_retry(self, mock_qrexec, mock_dllist): + def test_186_download_success_existsnosuffix(self, mock_qrexec, mock_dllist, + mock_verify_rpm): + mock_qrexec.side_effect = self._mock_qrexec_download + with tempfile.TemporaryDirectory() as dir: + with open(os.path.join( + dir, 'qubes-template-fedora-31-1:2-3.rpm'), + 'w') as _: + pass + args = argparse.Namespace( + retries=1, + repo_files=[], + keyring='/tmp/keyring.gpg', + releasever='4.1', + nogpgcheck=False, + downloaddir=dir + ) + with mock.patch('sys.stderr', new=io.StringIO()) as mock_err: + qubesadmin.tools.qvm_template.download(args, self.app, None, { + 'fedora-31': qubesadmin.tools.qvm_template.DlEntry( + ('1', '2', '3'), 'qubes-templates-itl', 1048576) + }) + self.assertTrue('already exists, skipping' + in mock_err.getvalue()) + self.assertEqual(mock_qrexec.mock_calls, []) + self.assertEqual(mock_dllist.mock_calls, []) + self.assertTrue(os.path.exists( + dir + '/qubes-template-fedora-31-1:2-3.rpm')) + + @mock.patch('qubesadmin.tools.qvm_template.verify_rpm') + @mock.patch('qubesadmin.tools.qvm_template.get_dl_list') + @mock.patch('qubesadmin.tools.qvm_template.qrexec_download') + def test_187_download_success_retry(self, mock_qrexec, mock_dllist, + mock_verify_rpm): counter = 0 - def f(*args): + def f(*args, **kwargs): nonlocal counter counter += 1 if counter == 1: raise ConnectionError + self._mock_qrexec_download(*args, **kwargs) mock_qrexec.side_effect = f with tempfile.TemporaryDirectory() as dir: args = argparse.Namespace( retries=2, + repo_files=[], + keyring='/tmp/keyring.gpg', + releasever='4.1', + nogpgcheck=False, downloaddir=dir ) with mock.patch('sys.stderr', new=io.StringIO()) as mock_err, \ @@ -3389,31 +3579,39 @@ test-vm : Qubes template for fedora-31 }) self.assertTrue('retrying...' in mock_err.getvalue()) self.assertEqual(mock_rm.mock_calls, [ - mock.call(dir + '/qubes-template-fedora-31-1:2-3.rpm') + mock.call(re_str(dir + '/.*/qubes-template-fedora-31-1:2-3.rpm.UNTRUSTED')) ]) self.assertEqual(mock_qrexec.mock_calls, [ mock.call(args, self.app, 'qubes-template-fedora-31-1:2-3', - dir + '/qubes-template-fedora-31-1:2-3.rpm', + re_str(dir + '/.*/qubes-template-fedora-31-1:2-3.rpm.UNTRUSTED'), 1048576), mock.call(args, self.app, 'qubes-template-fedora-31-1:2-3', - dir + '/qubes-template-fedora-31-1:2-3.rpm', + re_str(dir + '/.*/qubes-template-fedora-31-1:2-3.rpm.UNTRUSTED'), 1048576) ]) self.assertEqual(mock_dllist.mock_calls, []) + @mock.patch('qubesadmin.tools.qvm_template.verify_rpm') @mock.patch('qubesadmin.tools.qvm_template.get_dl_list') @mock.patch('qubesadmin.tools.qvm_template.qrexec_download') - def test_188_download_fail_retry(self, mock_qrexec, mock_dllist): + def test_188_download_fail_retry(self, mock_qrexec, mock_dllist, + mock_verify_rpm): + mock_qrexec.side_effect = self._mock_qrexec_download counter = 0 - def f(*args): + def f(*args, **kwargs): nonlocal counter counter += 1 if counter <= 3: raise ConnectionError + self._mock_qrexec_download(*args, **kwargs) mock_qrexec.side_effect = f with tempfile.TemporaryDirectory() as dir: args = argparse.Namespace( retries=3, + repo_files=[], + keyring='/tmp/keyring.gpg', + releasever='4.1', + nogpgcheck=False, downloaddir=dir ) with mock.patch('sys.stderr', new=io.StringIO()) as mock_err, \ @@ -3427,32 +3625,38 @@ test-vm : Qubes template for fedora-31 self.assertEqual(mock_err.getvalue().count('retrying...'), 2) self.assertTrue('download failed' in mock_err.getvalue()) self.assertEqual(mock_rm.mock_calls, [ - mock.call(dir + '/qubes-template-fedora-31-1:2-3.rpm'), - mock.call(dir + '/qubes-template-fedora-31-1:2-3.rpm'), - mock.call(dir + '/qubes-template-fedora-31-1:2-3.rpm') + mock.call(re_str(dir + '/.*/qubes-template-fedora-31-1:2-3.rpm.UNTRUSTED')), + mock.call(re_str(dir + '/.*/qubes-template-fedora-31-1:2-3.rpm.UNTRUSTED')), + mock.call(re_str(dir + '/.*/qubes-template-fedora-31-1:2-3.rpm.UNTRUSTED')) ]) self.assertEqual(mock_qrexec.mock_calls, [ mock.call(args, self.app, 'qubes-template-fedora-31-1:2-3', - dir + '/qubes-template-fedora-31-1:2-3.rpm', + re_str(dir + '/.*/qubes-template-fedora-31-1:2-3.rpm.UNTRUSTED'), 1048576), mock.call(args, self.app, 'qubes-template-fedora-31-1:2-3', - dir + '/qubes-template-fedora-31-1:2-3.rpm', + re_str(dir + '/.*/qubes-template-fedora-31-1:2-3.rpm.UNTRUSTED'), 1048576), mock.call(args, self.app, 'qubes-template-fedora-31-1:2-3', - dir + '/qubes-template-fedora-31-1:2-3.rpm', + re_str(dir + '/.*/qubes-template-fedora-31-1:2-3.rpm.UNTRUSTED'), 1048576) ]) self.assertEqual(mock_dllist.mock_calls, []) + @mock.patch('qubesadmin.tools.qvm_template.verify_rpm') @mock.patch('qubesadmin.tools.qvm_template.get_dl_list') @mock.patch('qubesadmin.tools.qvm_template.qrexec_download') - def test_189_download_fail_interrupt(self, mock_qrexec, mock_dllist): + def test_189_download_fail_interrupt(self, mock_qrexec, mock_dllist, + mock_verify_rpm): def f(*args): raise RuntimeError mock_qrexec.side_effect = f with tempfile.TemporaryDirectory() as dir: args = argparse.Namespace( retries=3, + repo_files=[], + keyring='/tmp/keyring.gpg', + releasever='4.1', + nogpgcheck=False, downloaddir=dir ) with mock.patch('sys.stderr', new=io.StringIO()) as mock_err, \ @@ -3463,12 +3667,45 @@ test-vm : Qubes template for fedora-31 'fedora-31': qubesadmin.tools.qvm_template.DlEntry( ('1', '2', '3'), 'qubes-templates-itl', 1048576) }) - self.assertEqual(mock_rm.mock_calls, [ - mock.call(dir + '/qubes-template-fedora-31-1:2-3.rpm') - ]) self.assertEqual(mock_qrexec.mock_calls, [ mock.call(args, self.app, 'qubes-template-fedora-31-1:2-3', - dir + '/qubes-template-fedora-31-1:2-3.rpm', + re_str(dir + '/.*/qubes-template-fedora-31-1:2-3.rpm'), 1048576) ]) self.assertEqual(mock_dllist.mock_calls, []) + + @mock.patch('qubesadmin.tools.qvm_template.verify_rpm') + @mock.patch('qubesadmin.tools.qvm_template.get_dl_list') + @mock.patch('qubesadmin.tools.qvm_template.qrexec_download') + def test_189_download_verify_fail(self, mock_qrexec, mock_dllist, + mock_verify_rpm): + mock_qrexec.side_effect = self._mock_qrexec_download + mock_verify_rpm.side_effect = \ + qubesadmin.tools.qvm_template.SignatureVerificationError + + with tempfile.TemporaryDirectory() as dir: + args = argparse.Namespace( + retries=3, + repo_files=[], + keyring='/tmp/keyring.gpg', + releasever='4.1', + nogpgcheck=False, + downloaddir=dir + ) + with self.assertRaises(qubesadmin.tools.qvm_template.SignatureVerificationError): + qubesadmin.tools.qvm_template.download( + args, self.app, None, { + 'fedora-31': qubesadmin.tools.qvm_template.DlEntry( + ('1', '2', '3'), 'qubes-templates-itl', 1048576) + }) + self.assertEqual(mock_qrexec.mock_calls, [ + mock.call(args, self.app, 'qubes-template-fedora-31-1:2-3', + re_str(dir + '/.*/qubes-template-fedora-31-1:2-3.rpm'), + 1048576) + ]) + self.assertEqual(mock_dllist.mock_calls, []) + self.assertEqual(os.listdir(dir), []) + + # TODO: test unexpected name downloaded + # TODO: test truncated download + # TODO: test no disk space \ No newline at end of file diff --git a/qubesadmin/tools/qvm_template.py b/qubesadmin/tools/qvm_template.py index df9c5ab..406f839 100644 --- a/qubesadmin/tools/qvm_template.py +++ b/qubesadmin/tools/qvm_template.py @@ -146,8 +146,6 @@ def get_parser() -> argparse.ArgumentParser: help_str='Upgrade template packages.') for parser_x in [parser_install, parser_reinstall, parser_downgrade, parser_upgrade]: - parser_x.add_argument('--nogpgcheck', action='store_true', - help='Disable signature checks.') parser_x.add_argument('--allow-pv', action='store_true', help='Allow templates that set virt_mode to pv.') parser_x.add_argument('templates', nargs='*', metavar='TEMPLATESPEC') @@ -160,6 +158,8 @@ def get_parser() -> argparse.ArgumentParser: help='Specify download directory.') parser_x.add_argument('--retries', default=5, type=int, help='Specify maximum number of retries for downloads.') + parser_x.add_argument('--nogpgcheck', action='store_true', + help='Disable signature checks.') parser_download.add_argument('templates', nargs='*', metavar='TEMPLATESPEC') # qvm-template {list,info} @@ -602,7 +602,8 @@ def get_keys_for_repos(repo_files: typing.List[str], def verify_rpm( path: str, key: str, - nogpgcheck: bool = False + nogpgcheck: bool = False, + template_name: typing.Optional[str] = None ) -> rpm.hdr: """Verify the digest and signature of a RPM package and return the package header. @@ -614,6 +615,8 @@ def verify_rpm( :param path: Location of the RPM package :param nogpgcheck: Whether to allow invalid GPG signatures + :param template_name: expected template name - if specified, verifies if + the package name matches expected template name :return: RPM package header. If verification fails, raises an exception. """ @@ -635,6 +638,10 @@ def verify_rpm( tset = rpm.TransactionSet() tset.setVSFlags(rpm.RPMVSF_MASK_NOSIGNATURES) hdr = tset.hdrFromFdno(fd) + if template_name is not None: + if hdr[rpm.RPMTAG_NAME] != PACKAGE_NAME_PREFIX + template_name: + raise SignatureVerificationError( + 'Downloaded package does not match expected template name') return hdr def extract_rpm(name: str, path: str, target: str) -> bool: @@ -759,8 +766,8 @@ def download( app: qubesadmin.app.QubesBase, path_override: typing.Optional[str] = None, dl_list: typing.Optional[typing.Dict[str, DlEntry]] = None, - suffix: str = '', - version_selector: VersionSelector = VersionSelector.LATEST) -> None: + version_selector: VersionSelector = VersionSelector.LATEST) \ + -> typing.Dict[str, rpm.hdr]: """Command that downloads template packages. :param args: Arguments received by the application. @@ -770,50 +777,65 @@ def download( :param dl_list: Override list of templates to download. If not set or set to None, ``get_dl_list`` is called, which generates the list from ``args``. Optional - :param suffix: Suffix to add to the file name of downloaded packages. This - is useful if you want to distinguish between verified and unverified - packages. Defaults to an empty string :param version_selector: Specify algorithm to select the candidate version of a package. Defaults to ``VersionSelector.LATEST`` + :return package headers of downloaded templates """ if dl_list is None: dl_list = get_dl_list(args, app, version_selector=version_selector) + keys = get_keys_for_repos(args.repo_files, args.releasever) + + package_hdrs = {} + path = path_override if path_override is not None else args.downloaddir - for name, entry in dl_list.items(): - version_str = build_version_str(entry.evr) - spec = PACKAGE_NAME_PREFIX + name + '-' + version_str - target = os.path.join(path, '%s.rpm' % spec) - target_suffix = target + suffix - if os.path.exists(target_suffix): - print('\'%s\' already exists, skipping...' % target, - file=sys.stderr) - elif os.path.exists(target): - print('\'%s\' already exists, skipping...' % target, - file=sys.stderr) - os.rename(target, target_suffix) - else: + with tempfile.TemporaryDirectory(dir=path) as dl_dir: + for name, entry in dl_list.items(): + version_str = build_version_str(entry.evr) + spec = PACKAGE_NAME_PREFIX + name + '-' + version_str + target = os.path.join(path, '%s.rpm' % spec) + target_temp = os.path.join(dl_dir, '%s.rpm.UNTRUSTED' % spec) + repo_key = keys.get(entry.reponame) + if repo_key is None: + repo_key = args.keyring + if os.path.exists(target): + print('\'%s\' already exists, skipping...' % target, + file=sys.stderr) + # but still verify the package + verify_rpm(target, repo_key, template_name=name) + continue print('Downloading \'%s\'...' % spec, file=sys.stderr) done = False for attempt in range(args.retries): try: - qrexec_download(args, app, spec, target_suffix, + qrexec_download(args, app, spec, target_temp, entry.dlsize) + size = os.path.getsize(target_temp) + if size != entry.dlsize: + raise ConnectionError( + 'Downloaded file is {} bytes, expected {}'.format( + size, entry.dlsize)) done = True break except ConnectionError: - os.remove(target_suffix) + os.remove(target_temp) if attempt + 1 < args.retries: print('\'%s\' download failed, retrying...' % spec, file=sys.stderr) - except: - # Also remove file if interrupted by other means - os.remove(target_suffix) - raise if not done: print('Error: \'%s\' download failed.' % spec, file=sys.stderr) sys.exit(1) + if args.nogpgcheck: + print( + 'Warning: --nogpgcheck is ignored for downloaded templates', + file=sys.stderr) + package_hdr = verify_rpm(target_temp, repo_key, template_name=name) + # after package is verified, rename to the target location + os.rename(target_temp, target) + package_hdrs[name] = package_hdr + return package_hdrs + def locked(func): """Execute given function under a lock in *LOCK_FILE*""" @functools.wraps(func) @@ -854,21 +876,20 @@ def install( unverified_rpm_list = [] # rpmfile, reponame verified_rpm_list = [] - def verify(rpmfile, reponame, dl_dir=None): + def verify(rpmfile, reponame, package_hdr=None): """Verify package signature and version, remove "unverified" - suffix, and parse package header.""" - if dl_dir: - path = os.path.join( - dl_dir, os.path.basename(rpmfile) + UNVERIFIED_SUFFIX) - else: - path = rpmfile + suffix, and parse package header. - repo_key = keys.get(reponame) - if repo_key is None: - repo_key = args.keyring - package_hdr = verify_rpm(path, repo_key, args.nogpgcheck) - if not package_hdr: - parser.error('Package \'%s\' verification failed.' % rpmfile) + If package_hdr is provided, the signature check is skipped and only + other checks are performed.""" + if package_hdr is None: + repo_key = keys.get(reponame) + if repo_key is None: + repo_key = args.keyring + package_hdr = verify_rpm(rpmfile, repo_key, + nogpgcheck=args.nogpgcheck) + if not package_hdr: + parser.error('Package \'%s\' verification failed.' % rpmfile) package_name = package_hdr[rpm.RPMTAG_NAME] if not package_name.startswith(PACKAGE_NAME_PREFIX): @@ -877,9 +898,6 @@ def install( # Remove prefix to get the real template name name = package_name[len(PACKAGE_NAME_PREFIX):] - if path != rpmfile: - os.rename(path, rpmfile) - # Check if already installed if not override_existing and name in app.domains: print(('Template \'%s\' already installed, skipping...' @@ -927,7 +945,7 @@ def install( # First verify local RPMs and extract header for rpmfile, reponame in unverified_rpm_list: verify(rpmfile, reponame) - unverified_rpm_list = [] + unverified_rpm_list = {} os.makedirs(args.cachedir, exist_ok=True) @@ -951,7 +969,7 @@ def install( version_str = build_version_str(entry.evr) target_file = \ '%s%s-%s.rpm' % (PACKAGE_NAME_PREFIX, name, version_str) - unverified_rpm_list.append( + unverified_rpm_list[name] = ( (os.path.join(args.cachedir, target_file), entry.reponame)) dl_list = dl_list_copy @@ -969,15 +987,14 @@ def install( 'This will override changes made in the following VMs:', override_tpls) - with tempfile.TemporaryDirectory(dir=args.cachedir) as dl_dir: - download(args, app, path_override=dl_dir, - dl_list=dl_list, suffix=UNVERIFIED_SUFFIX, - version_selector=version_selector) + package_hdrs = download(args, app, + dl_list=dl_list, + version_selector=version_selector) - # Verify downloaded templates - for rpmfile, reponame in unverified_rpm_list: - verify(rpmfile, reponame, dl_dir=dl_dir) - unverified_rpm_list = [] + # Verify downloaded templates + for name, (rpmfile, reponame) in unverified_rpm_list.items(): + verify(rpmfile, reponame, package_hdrs[name]) + del unverified_rpm_list # Unpack and install for rpmfile, reponame, name, package_hdr in verified_rpm_list: