from unittest import mock import argparse import asyncio import datetime import io import os import pathlib import subprocess import tempfile import rpm import qubesadmin.tests import qubesadmin.tools.qvm_template class TC_00_qvm_template(qubesadmin.tests.QubesTestCase): def setUp(self): super().setUp() def tearDown(self): super().tearDown() def test_000_verify_rpm_success(self): ts = mock.MagicMock() # Just return a dict instead of rpm.hdr hdr = { rpm.RPMTAG_SIGPGP: 'xxx', # non-empty rpm.RPMTAG_SIGGPG: 'xxx', # non-empty } ts.hdrFromFdno.return_value = hdr ret = qubesadmin.tools.qvm_template.verify_rpm('/dev/null', ts) ts.hdrFromFdno.assert_called_once() self.assertEqual(hdr, ret) self.assertAllCalled() def test_001_verify_rpm_nosig_fail(self): ts = mock.MagicMock() # Just return a dict instead of rpm.hdr hdr = { rpm.RPMTAG_SIGPGP: None, # empty rpm.RPMTAG_SIGGPG: None, # empty } ts.hdrFromFdno.return_value = hdr ret = qubesadmin.tools.qvm_template.verify_rpm('/dev/null', ts) ts.hdrFromFdno.assert_called_once() self.assertEqual(ret, None) self.assertAllCalled() def test_002_verify_rpm_nosig_success(self): ts = mock.MagicMock() # Just return a dict instead of rpm.hdr hdr = { rpm.RPMTAG_SIGPGP: None, # empty rpm.RPMTAG_SIGGPG: None, # empty } ts.hdrFromFdno.return_value = hdr ret = qubesadmin.tools.qvm_template.verify_rpm('/dev/null', ts, True) ts.hdrFromFdno.assert_called_once() self.assertEqual(ret, hdr) self.assertAllCalled() def test_003_verify_rpm_badsig_fail(self): ts = mock.MagicMock() def f(*args): raise rpm.error('public key not trusted') ts.hdrFromFdno.side_effect = f ret = qubesadmin.tools.qvm_template.verify_rpm('/dev/null', ts) ts.hdrFromFdno.assert_called_once() self.assertEqual(ret, None) self.assertAllCalled() @mock.patch('subprocess.Popen') def test_010_extract_rpm_success(self, mock_popen): pipe = mock.Mock() mock_popen.return_value.stdout = pipe mock_popen.return_value.wait.return_value = 0 with tempfile.NamedTemporaryFile() as fd, \ tempfile.TemporaryDirectory() as dir: path = fd.name dirpath = dir ret = qubesadmin.tools.qvm_template.extract_rpm( 'test-vm', path, dirpath) self.assertEqual(ret, True) self.assertEqual(mock_popen.mock_calls, [ mock.call(['rpm2cpio', path], stdout=subprocess.PIPE), mock.call([ 'cpio', '-idm', '-D', dirpath, './var/lib/qubes/vm-templates/test-vm/*' ], stdin=pipe, stdout=subprocess.DEVNULL), mock.call().wait(), mock.call().wait() ]) self.assertAllCalled() @mock.patch('subprocess.Popen') def test_011_extract_rpm_fail(self, mock_popen): pipe = mock.Mock() mock_popen.return_value.stdout = pipe mock_popen.return_value.wait.return_value = 1 with tempfile.NamedTemporaryFile() as fd, \ tempfile.TemporaryDirectory() as dir: path = fd.name dirpath = dir ret = qubesadmin.tools.qvm_template.extract_rpm( 'test-vm', path, dirpath) self.assertEqual(ret, False) self.assertEqual(mock_popen.mock_calls, [ mock.call(['rpm2cpio', path], stdout=subprocess.PIPE), mock.call([ 'cpio', '-idm', '-D', dirpath, './var/lib/qubes/vm-templates/test-vm/*' ], stdin=pipe, stdout=subprocess.DEVNULL), mock.call().wait() ]) self.assertAllCalled() def add_new_vm_side_effect(self, *args, **kwargs): self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \ b'0\0test-vm class=TemplateVM state=Halted\n' self.app.domains.clear_cache() return self.app.domains['test-vm'] @mock.patch('os.remove') @mock.patch('os.rename') @mock.patch('os.makedirs') @mock.patch('subprocess.check_call') @mock.patch('qubesadmin.tools.qvm_template.confirm_action') @mock.patch('qubesadmin.tools.qvm_template.extract_rpm') @mock.patch('qubesadmin.tools.qvm_template.download') @mock.patch('qubesadmin.tools.qvm_template.get_dl_list') @mock.patch('qubesadmin.tools.qvm_template.verify_rpm') @mock.patch('qubesadmin.tools.qvm_template.rpm_transactionset') def test_100_install_local_success( self, mock_ts, mock_verify, mock_dl_list, mock_dl, mock_extract, mock_confirm, mock_call, mock_mkdirs, mock_rename, mock_remove): self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = b'0\0' build_time = '2020-09-01 14:30:00' # 1598970600 install_time = '2020-09-01 15:30:00' for key, val in [ ('name', 'test-vm'), ('epoch', '2'), ('version', '4.1'), ('release', '2020'), ('reponame', '@commandline'), ('buildtime', build_time), ('installtime', install_time), ('license', 'GPL'), ('url', 'https://qubes-os.org'), ('summary', 'Summary'), ('description', 'Desc|desc')]: self.app.expected_calls[( 'test-vm', 'admin.vm.feature.Set', f'template-{key}', val.encode())] = b'0\0' mock_verify.return_value = { rpm.RPMTAG_NAME : 'qubes-template-test-vm', rpm.RPMTAG_BUILDTIME : 1598970600, rpm.RPMTAG_DESCRIPTION : 'Desc\ndesc', rpm.RPMTAG_EPOCHNUM : 2, rpm.RPMTAG_LICENSE : 'GPL', rpm.RPMTAG_RELEASE : '2020', rpm.RPMTAG_SUMMARY : 'Summary', rpm.RPMTAG_URL : 'https://qubes-os.org', rpm.RPMTAG_VERSION : '4.1' } mock_dl_list.return_value = {} mock_call.side_effect = self.add_new_vm_side_effect mock_time = mock.Mock(wraps=datetime.datetime) mock_time.today.return_value = \ datetime.datetime.fromisoformat(install_time) with mock.patch('builtins.open', mock.mock_open()) as mock_open, \ mock.patch('datetime.datetime', new=mock_time), \ mock.patch('tempfile.TemporaryDirectory') as mock_tmpdir, \ mock.patch('sys.stderr', new=io.StringIO()) as mock_err, \ tempfile.NamedTemporaryFile(suffix='.rpm') as template_file: path = template_file.name args = argparse.Namespace( templates=[path], keyring='/usr/share/qubes/repo-templates/keys', nogpgcheck=False, cachedir='/var/cache/qvm-template', yes=False, allow_pv=False, pool=None ) mock_tmpdir.return_value.__enter__.return_value = \ '/var/tmp/qvm-template-tmpdir' qubesadmin.tools.qvm_template.install(args, self.app) # Lock file created self.assertEqual(mock_open.mock_calls, [ mock.call('/var/tmp/qvm-template.lck', 'x'), mock.call().__enter__(), mock.call().__exit__(None, None, None) ]) # Keyring created self.assertEqual(mock_ts.mock_calls, [ mock.call('/usr/share/qubes/repo-templates/keys') ]) # Package verified self.assertEqual(mock_verify.mock_calls, [ mock.call(path, mock_ts('/usr/share/qubes/repo-templates/keys'), False) ]) # Attempt to get download list selector = qubesadmin.tools.qvm_template.VersionSelector.LATEST self.assertEqual(mock_dl_list.mock_calls, [ mock.call(args, self.app, version_selector=selector) ]) # Nothing downloaded self.assertEqual(mock_dl.mock_calls, [ mock.call(args, self.app, path_override='/var/cache/qvm-template', dl_list={}, suffix='.unverified', version_selector=selector) ]) # Package is extracted self.assertEqual(mock_extract.mock_calls, [ mock.call('test-vm', path, '/var/tmp/qvm-template-tmpdir') ]) # No packages overwritten, so no confirm needed self.assertEqual(mock_confirm.mock_calls, []) # qvm-template-postprocess is called self.assertEqual(mock_call.mock_calls, [ mock.call([ 'qvm-template-postprocess', '--really', '--no-installed-by-rpm', 'post-install', 'test-vm', '/var/tmp/qvm-template-tmpdir' '/var/lib/qubes/vm-templates/test-vm' ]) ]) # Cache directory created self.assertEqual(mock_mkdirs.mock_calls, [ mock.call(args.cachedir, exist_ok=True) ]) # No templates downloaded, thus no renames needed self.assertEqual(mock_rename.mock_calls, []) # Lock file removed self.assertEqual(mock_remove.mock_calls, [ mock.call('/var/tmp/qvm-template.lck') ]) self.assertAllCalled() @mock.patch('os.remove') @mock.patch('os.rename') @mock.patch('os.makedirs') @mock.patch('subprocess.check_call') @mock.patch('qubesadmin.tools.qvm_template.confirm_action') @mock.patch('qubesadmin.tools.qvm_template.extract_rpm') @mock.patch('qubesadmin.tools.qvm_template.download') @mock.patch('qubesadmin.tools.qvm_template.get_dl_list') @mock.patch('qubesadmin.tools.qvm_template.verify_rpm') @mock.patch('qubesadmin.tools.qvm_template.rpm_transactionset') def test_101_install_local_postprocargs_success( self, mock_ts, mock_verify, mock_dl_list, mock_dl, mock_extract, mock_confirm, mock_call, mock_mkdirs, mock_rename, mock_remove): self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = b'0\0' build_time = '2020-09-01 14:30:00' # 1598970600 install_time = '2020-09-01 15:30:00' for key, val in [ ('name', 'test-vm'), ('epoch', '2'), ('version', '4.1'), ('release', '2020'), ('reponame', '@commandline'), ('buildtime', build_time), ('installtime', install_time), ('license', 'GPL'), ('url', 'https://qubes-os.org'), ('summary', 'Summary'), ('description', 'Desc|desc')]: self.app.expected_calls[( 'test-vm', 'admin.vm.feature.Set', f'template-{key}', val.encode())] = b'0\0' mock_verify.return_value = { rpm.RPMTAG_NAME : 'qubes-template-test-vm', rpm.RPMTAG_BUILDTIME : 1598970600, rpm.RPMTAG_DESCRIPTION : 'Desc\ndesc', rpm.RPMTAG_EPOCHNUM : 2, rpm.RPMTAG_LICENSE : 'GPL', rpm.RPMTAG_RELEASE : '2020', rpm.RPMTAG_SUMMARY : 'Summary', rpm.RPMTAG_URL : 'https://qubes-os.org', rpm.RPMTAG_VERSION : '4.1' } mock_dl_list.return_value = {} mock_call.side_effect = self.add_new_vm_side_effect mock_time = mock.Mock(wraps=datetime.datetime) mock_time.today.return_value = \ datetime.datetime.fromisoformat(install_time) with mock.patch('builtins.open', mock.mock_open()) as mock_open, \ mock.patch('datetime.datetime', new=mock_time), \ mock.patch('tempfile.TemporaryDirectory') as mock_tmpdir, \ mock.patch('sys.stderr', new=io.StringIO()) as mock_err, \ tempfile.NamedTemporaryFile(suffix='.rpm') as template_file: path = template_file.name args = argparse.Namespace( templates=[path], keyring='/usr/share/qubes/repo-templates/keys', nogpgcheck=False, cachedir='/var/cache/qvm-template', yes=False, allow_pv=True, pool='my-pool' ) mock_tmpdir.return_value.__enter__.return_value = \ '/var/tmp/qvm-template-tmpdir' qubesadmin.tools.qvm_template.install(args, self.app) # Lock file created self.assertEqual(mock_open.mock_calls, [ mock.call('/var/tmp/qvm-template.lck', 'x'), mock.call().__enter__(), mock.call().__exit__(None, None, None) ]) # Keyring created self.assertEqual(mock_ts.mock_calls, [ mock.call('/usr/share/qubes/repo-templates/keys') ]) # Package verified self.assertEqual(mock_verify.mock_calls, [ mock.call(path, mock_ts('/usr/share/qubes/repo-templates/keys'), False) ]) # Attempt to get download list selector = qubesadmin.tools.qvm_template.VersionSelector.LATEST self.assertEqual(mock_dl_list.mock_calls, [ mock.call(args, self.app, version_selector=selector) ]) # Nothing downloaded self.assertEqual(mock_dl.mock_calls, [ mock.call(args, self.app, path_override='/var/cache/qvm-template', dl_list={}, suffix='.unverified', version_selector=selector) ]) # Package is extracted self.assertEqual(mock_extract.mock_calls, [ mock.call('test-vm', path, '/var/tmp/qvm-template-tmpdir') ]) # No packages overwritten, so no confirm needed self.assertEqual(mock_confirm.mock_calls, []) # qvm-template-postprocess is called self.assertEqual(mock_call.mock_calls, [ mock.call([ 'qvm-template-postprocess', '--really', '--no-installed-by-rpm', '--allow-pv', '--pool', 'my-pool', 'post-install', 'test-vm', '/var/tmp/qvm-template-tmpdir' '/var/lib/qubes/vm-templates/test-vm' ]) ]) # Cache directory created self.assertEqual(mock_mkdirs.mock_calls, [ mock.call(args.cachedir, exist_ok=True) ]) # No templates downloaded, thus no renames needed self.assertEqual(mock_rename.mock_calls, []) # Lock file removed self.assertEqual(mock_remove.mock_calls, [ mock.call('/var/tmp/qvm-template.lck') ]) self.assertAllCalled() @mock.patch('os.remove') @mock.patch('os.rename') @mock.patch('os.makedirs') @mock.patch('subprocess.check_call') @mock.patch('qubesadmin.tools.qvm_template.confirm_action') @mock.patch('qubesadmin.tools.qvm_template.extract_rpm') @mock.patch('qubesadmin.tools.qvm_template.download') @mock.patch('qubesadmin.tools.qvm_template.get_dl_list') @mock.patch('qubesadmin.tools.qvm_template.verify_rpm') @mock.patch('qubesadmin.tools.qvm_template.rpm_transactionset') def test_102_install_local_badsig_fail( self, mock_ts, mock_verify, mock_dl_list, mock_dl, mock_extract, mock_confirm, mock_call, mock_mkdirs, mock_rename, mock_remove): mock_verify.return_value = None mock_time = mock.Mock(wraps=datetime.datetime) with mock.patch('builtins.open', mock.mock_open()) as mock_open, \ mock.patch('datetime.datetime', new=mock_time), \ mock.patch('tempfile.TemporaryDirectory') as mock_tmpdir, \ mock.patch('sys.stderr', new=io.StringIO()) as mock_err, \ tempfile.NamedTemporaryFile(suffix='.rpm') as template_file: path = template_file.name args = argparse.Namespace( templates=[path], keyring='/usr/share/qubes/repo-templates/keys', nogpgcheck=False, cachedir='/var/cache/qvm-template', yes=False, allow_pv=False, pool=None ) mock_tmpdir.return_value.__enter__.return_value = \ '/var/tmp/qvm-template-tmpdir' # Should raise parser.error with self.assertRaises(SystemExit): qubesadmin.tools.qvm_template.install(args, self.app) # Lock file created self.assertEqual(mock_open.mock_calls, [ mock.call('/var/tmp/qvm-template.lck', 'x'), mock.call().__enter__(), mock.call().__exit__(None, None, None) ]) # Check error message self.assertTrue('verification failed' in mock_err.getvalue()) # Keyring created self.assertEqual(mock_ts.mock_calls, [ mock.call('/usr/share/qubes/repo-templates/keys') ]) # Package verified self.assertEqual(mock_verify.mock_calls, [ mock.call(path, mock_ts('/usr/share/qubes/repo-templates/keys'), False) ]) # Should not be executed: self.assertEqual(mock_dl_list.mock_calls, []) self.assertEqual(mock_dl.mock_calls, []) self.assertEqual(mock_extract.mock_calls, []) self.assertEqual(mock_confirm.mock_calls, []) self.assertEqual(mock_call.mock_calls, []) self.assertEqual(mock_rename.mock_calls, []) # Lock file removed self.assertEqual(mock_remove.mock_calls, [ mock.call('/var/tmp/qvm-template.lck') ]) self.assertAllCalled() @mock.patch('os.remove') @mock.patch('os.rename') @mock.patch('os.makedirs') @mock.patch('subprocess.check_call') @mock.patch('qubesadmin.tools.qvm_template.confirm_action') @mock.patch('qubesadmin.tools.qvm_template.extract_rpm') @mock.patch('qubesadmin.tools.qvm_template.download') @mock.patch('qubesadmin.tools.qvm_template.get_dl_list') @mock.patch('qubesadmin.tools.qvm_template.verify_rpm') @mock.patch('qubesadmin.tools.qvm_template.rpm_transactionset') def test_103_install_local_exists_fail( self, mock_ts, mock_verify, mock_dl_list, mock_dl, mock_extract, mock_confirm, mock_call, mock_mkdirs, mock_rename, mock_remove): self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \ b'0\0test-vm class=TemplateVM state=Halted\n' mock_verify.return_value = { rpm.RPMTAG_NAME : 'qubes-template-test-vm', rpm.RPMTAG_BUILDTIME : 1598970600, rpm.RPMTAG_DESCRIPTION : 'Desc\ndesc', rpm.RPMTAG_EPOCHNUM : 2, rpm.RPMTAG_LICENSE : 'GPL', rpm.RPMTAG_RELEASE : '2020', rpm.RPMTAG_SUMMARY : 'Summary', rpm.RPMTAG_URL : 'https://qubes-os.org', rpm.RPMTAG_VERSION : '4.1' } mock_dl_list.return_value = {} mock_time = mock.Mock(wraps=datetime.datetime) with mock.patch('builtins.open', mock.mock_open()) as mock_open, \ mock.patch('datetime.datetime', new=mock_time), \ mock.patch('tempfile.TemporaryDirectory') as mock_tmpdir, \ mock.patch('sys.stderr', new=io.StringIO()) as mock_err, \ tempfile.NamedTemporaryFile(suffix='.rpm') as template_file: path = template_file.name args = argparse.Namespace( templates=[path], keyring='/usr/share/qubes/repo-templates/keys', nogpgcheck=False, cachedir='/var/cache/qvm-template', yes=False, allow_pv=False, pool=None ) mock_tmpdir.return_value.__enter__.return_value = \ '/var/tmp/qvm-template-tmpdir' qubesadmin.tools.qvm_template.install(args, self.app) # Lock file created self.assertEqual(mock_open.mock_calls, [ mock.call('/var/tmp/qvm-template.lck', 'x'), mock.call().__enter__(), mock.call().__exit__(None, None, None) ]) # Check warning message self.assertTrue('already installed' in mock_err.getvalue()) # Keyring created self.assertEqual(mock_ts.mock_calls, [ mock.call('/usr/share/qubes/repo-templates/keys') ]) # Package verified self.assertEqual(mock_verify.mock_calls, [ mock.call(path, mock_ts('/usr/share/qubes/repo-templates/keys'), False) ]) # Attempt to get download list selector = qubesadmin.tools.qvm_template.VersionSelector.LATEST self.assertEqual(mock_dl_list.mock_calls, [ mock.call(args, self.app, version_selector=selector) ]) # Nothing downloaded self.assertEqual(mock_dl.mock_calls, [ mock.call(args, self.app, path_override='/var/cache/qvm-template', dl_list={}, suffix='.unverified', version_selector=selector) ]) # Should not be executed: self.assertEqual(mock_extract.mock_calls, []) self.assertEqual(mock_confirm.mock_calls, []) self.assertEqual(mock_call.mock_calls, []) self.assertEqual(mock_rename.mock_calls, []) # Lock file removed self.assertEqual(mock_remove.mock_calls, [ mock.call('/var/tmp/qvm-template.lck') ]) self.assertAllCalled() @mock.patch('os.remove') @mock.patch('os.rename') @mock.patch('os.makedirs') @mock.patch('subprocess.check_call') @mock.patch('qubesadmin.tools.qvm_template.confirm_action') @mock.patch('qubesadmin.tools.qvm_template.extract_rpm') @mock.patch('qubesadmin.tools.qvm_template.download') @mock.patch('qubesadmin.tools.qvm_template.get_dl_list') @mock.patch('qubesadmin.tools.qvm_template.verify_rpm') @mock.patch('qubesadmin.tools.qvm_template.rpm_transactionset') def test_104_install_local_badpkgname_fail( self, mock_ts, mock_verify, mock_dl_list, mock_dl, mock_extract, mock_confirm, mock_call, mock_mkdirs, mock_rename, mock_remove): mock_verify.return_value = { rpm.RPMTAG_NAME : 'Xqubes-template-test-vm', rpm.RPMTAG_BUILDTIME : 1598970600, rpm.RPMTAG_DESCRIPTION : 'Desc\ndesc', rpm.RPMTAG_EPOCHNUM : 2, rpm.RPMTAG_LICENSE : 'GPL', rpm.RPMTAG_RELEASE : '2020', rpm.RPMTAG_SUMMARY : 'Summary', rpm.RPMTAG_URL : 'https://qubes-os.org', rpm.RPMTAG_VERSION : '4.1' } mock_time = mock.Mock(wraps=datetime.datetime) with mock.patch('builtins.open', mock.mock_open()) as mock_open, \ mock.patch('datetime.datetime', new=mock_time), \ mock.patch('tempfile.TemporaryDirectory') as mock_tmpdir, \ mock.patch('sys.stderr', new=io.StringIO()) as mock_err, \ tempfile.NamedTemporaryFile(suffix='.rpm') as template_file: path = template_file.name args = argparse.Namespace( templates=[path], keyring='/usr/share/qubes/repo-templates/keys', nogpgcheck=False, cachedir='/var/cache/qvm-template', yes=False, allow_pv=False, pool=None ) mock_tmpdir.return_value.__enter__.return_value = \ '/var/tmp/qvm-template-tmpdir' with self.assertRaises(SystemExit): qubesadmin.tools.qvm_template.install(args, self.app) # Lock file created self.assertEqual(mock_open.mock_calls, [ mock.call('/var/tmp/qvm-template.lck', 'x'), mock.call().__enter__(), mock.call().__exit__(None, None, None) ]) # Check error message self.assertTrue('Illegal package name' in mock_err.getvalue()) # Keyring created self.assertEqual(mock_ts.mock_calls, [ mock.call('/usr/share/qubes/repo-templates/keys') ]) # Package verified self.assertEqual(mock_verify.mock_calls, [ mock.call(path, mock_ts('/usr/share/qubes/repo-templates/keys'), False) ]) # Should not be executed: self.assertEqual(mock_dl_list.mock_calls, []) self.assertEqual(mock_dl.mock_calls, []) self.assertEqual(mock_extract.mock_calls, []) self.assertEqual(mock_confirm.mock_calls, []) self.assertEqual(mock_call.mock_calls, []) self.assertEqual(mock_rename.mock_calls, []) # Lock file removed self.assertEqual(mock_remove.mock_calls, [ mock.call('/var/tmp/qvm-template.lck') ]) self.assertAllCalled() @mock.patch('os.rename') @mock.patch('os.makedirs') @mock.patch('subprocess.check_call') @mock.patch('qubesadmin.tools.qvm_template.confirm_action') @mock.patch('qubesadmin.tools.qvm_template.extract_rpm') @mock.patch('qubesadmin.tools.qvm_template.download') @mock.patch('qubesadmin.tools.qvm_template.get_dl_list') @mock.patch('qubesadmin.tools.qvm_template.verify_rpm') @mock.patch('qubesadmin.tools.qvm_template.rpm_transactionset') def test_105_install_local_existinginstance_fail( self, mock_ts, mock_verify, mock_dl_list, mock_dl, mock_extract, mock_confirm, mock_call, mock_mkdirs, mock_rename): mock_time = mock.Mock(wraps=datetime.datetime) with mock.patch('datetime.datetime', new=mock_time), \ mock.patch('tempfile.TemporaryDirectory') as mock_tmpdir, \ mock.patch('sys.stderr', new=io.StringIO()) as mock_err, \ tempfile.NamedTemporaryFile(suffix='.rpm') as template_file: path = template_file.name args = argparse.Namespace( templates=[path], keyring='/usr/share/qubes/repo-templates/keys', nogpgcheck=False, cachedir='/var/cache/qvm-template', yes=False, allow_pv=False, pool=None ) mock_tmpdir.return_value.__enter__.return_value = \ '/var/tmp/qvm-template-tmpdir' pathlib.Path('/var/tmp/qvm-template.lck').touch() try: with self.assertRaises(SystemExit), \ mock.patch('os.remove') as mock_remove: qubesadmin.tools.qvm_template.install(args, self.app) self.assertEqual(mock_remove.mock_calls, []) finally: # Lock file not removed self.assertTrue(os.path.exists('/var/tmp/qvm-template.lck')) os.remove('/var/tmp/qvm-template.lck') # Check error message self.assertTrue('another instance of qvm-template is running' \ in mock_err.getvalue()) # Should not be executed: self.assertEqual(mock_ts.mock_calls, []) self.assertEqual(mock_verify.mock_calls, []) self.assertEqual(mock_dl_list.mock_calls, []) self.assertEqual(mock_dl.mock_calls, []) self.assertEqual(mock_extract.mock_calls, []) self.assertEqual(mock_confirm.mock_calls, []) self.assertEqual(mock_call.mock_calls, []) self.assertEqual(mock_rename.mock_calls, []) self.assertAllCalled() @mock.patch('os.remove') @mock.patch('os.rename') @mock.patch('os.makedirs') @mock.patch('subprocess.check_call') @mock.patch('qubesadmin.tools.qvm_template.confirm_action') @mock.patch('qubesadmin.tools.qvm_template.extract_rpm') @mock.patch('qubesadmin.tools.qvm_template.download') @mock.patch('qubesadmin.tools.qvm_template.get_dl_list') @mock.patch('qubesadmin.tools.qvm_template.verify_rpm') @mock.patch('qubesadmin.tools.qvm_template.rpm_transactionset') def test_106_install_local_badpath_fail( self, mock_ts, mock_verify, mock_dl_list, mock_dl, mock_extract, mock_confirm, mock_call, mock_mkdirs, mock_rename, mock_remove): mock_time = mock.Mock(wraps=datetime.datetime) with mock.patch('builtins.open', mock.mock_open()) as mock_open, \ mock.patch('datetime.datetime', new=mock_time), \ mock.patch('tempfile.TemporaryDirectory') as mock_tmpdir, \ mock.patch('sys.stderr', new=io.StringIO()) as mock_err: path = '/var/tmp/ShOulD-NoT-ExIsT.rpm' args = argparse.Namespace( templates=[path], keyring='/usr/share/qubes/repo-templates/keys', nogpgcheck=False, cachedir='/var/cache/qvm-template', yes=False, allow_pv=False, pool=None ) mock_tmpdir.return_value.__enter__.return_value = \ '/var/tmp/qvm-template-tmpdir' with self.assertRaises(SystemExit): qubesadmin.tools.qvm_template.install(args, self.app) # Lock file created self.assertEqual(mock_open.mock_calls, [ mock.call('/var/tmp/qvm-template.lck', 'x'), mock.call().__enter__(), mock.call().__exit__(None, None, None) ]) # Check error message self.assertTrue(f"RPM file '{path}' not found" \ in mock_err.getvalue()) # Keyring created self.assertEqual(mock_ts.mock_calls, [ mock.call('/usr/share/qubes/repo-templates/keys') ]) # Should not be executed: self.assertEqual(mock_verify.mock_calls, []) self.assertEqual(mock_dl_list.mock_calls, []) self.assertEqual(mock_dl.mock_calls, []) self.assertEqual(mock_extract.mock_calls, []) self.assertEqual(mock_confirm.mock_calls, []) self.assertEqual(mock_call.mock_calls, []) self.assertEqual(mock_rename.mock_calls, []) # Lock file removed self.assertEqual(mock_remove.mock_calls, [ mock.call('/var/tmp/qvm-template.lck') ]) self.assertAllCalled() def test_110_qrexec_payload_refresh_success(self): with tempfile.NamedTemporaryFile() as repo_conf1, \ tempfile.NamedTemporaryFile() as repo_conf2: repo_str1 = \ '''[qubes-templates-itl] name = Qubes Templates repository #baseurl = https://yum.qubes-os.org/r$releasever/templates-itl #baseurl = http://yum.qubesosfasa4zl44o4tws22di6kepyzfeqv3tg4e3ztknltfxqrymdad.onion/r$releasever/templates-itl metalink = https://yum.qubes-os.org/r$releasever/templates-itl/repodata/repomd.xml.metalink enabled = 1 fastestmirror = 1 metadata_expire = 7d gpgcheck = 1 gpgkey = file:///usr/share/qubes/repo-templates/keys/RPM-GPG-KEY-qubes-$releasever-primary ''' repo_str2 = \ '''[qubes-templates-itl-testing] name = Qubes Templates repository #baseurl = https://yum.qubes-os.org/r$releasever/templates-itl-testing #baseurl = http://yum.qubesosfasa4zl44o4tws22di6kepyzfeqv3tg4e3ztknltfxqrymdad.onion/r$releasever/templates-itl-testing metalink = https://yum.qubes-os.org/r$releasever/templates-itl-testing/repodata/repomd.xml.metalink enabled = 0 fastestmirror = 1 gpgcheck = 1 gpgkey = file:///usr/share/qubes/repo-templates/keys/RPM-GPG-KEY-qubes-$releasever-primary ''' repo_conf1.write(repo_str1.encode()) repo_conf1.flush() repo_conf2.write(repo_str2.encode()) repo_conf2.flush() args = argparse.Namespace( enablerepo=['repo1', 'repo2'], disablerepo=['repo3', 'repo4', 'repo5'], repoid=[], releasever='4.1', repo_files=[repo_conf1.name, repo_conf2.name] ) res = qubesadmin.tools.qvm_template.qrexec_payload(args, self.app, 'qubes-template-fedora-32', True) self.assertEqual(res, '''--enablerepo=repo1 --enablerepo=repo2 --disablerepo=repo3 --disablerepo=repo4 --disablerepo=repo5 --refresh --releasever=4.1 qubes-template-fedora-32 --- ''' + repo_str1 + '\n' + repo_str2 + '\n') self.assertAllCalled() def test_111_qrexec_payload_norefresh_success(self): with tempfile.NamedTemporaryFile() as repo_conf1: repo_str1 = \ '''[qubes-templates-itl] name = Qubes Templates repository #baseurl = https://yum.qubes-os.org/r$releasever/templates-itl #baseurl = http://yum.qubesosfasa4zl44o4tws22di6kepyzfeqv3tg4e3ztknltfxqrymdad.onion/r$releasever/templates-itl metalink = https://yum.qubes-os.org/r$releasever/templates-itl/repodata/repomd.xml.metalink enabled = 1 fastestmirror = 1 metadata_expire = 7d gpgcheck = 1 gpgkey = file:///usr/share/qubes/repo-templates/keys/RPM-GPG-KEY-qubes-$releasever-primary ''' repo_conf1.write(repo_str1.encode()) repo_conf1.flush() args = argparse.Namespace( enablerepo=[], disablerepo=[], repoid=['repo1', 'repo2'], releasever='4.1', repo_files=[repo_conf1.name] ) res = qubesadmin.tools.qvm_template.qrexec_payload(args, self.app, 'qubes-template-fedora-32', False) self.assertEqual(res, '''--repoid=repo1 --repoid=repo2 --releasever=4.1 qubes-template-fedora-32 --- ''' + repo_str1 + '\n') self.assertAllCalled() def test_112_qrexec_payload_specnewline_fail(self): with tempfile.NamedTemporaryFile() as repo_conf1: repo_str1 = \ '''[qubes-templates-itl] name = Qubes Templates repository #baseurl = https://yum.qubes-os.org/r$releasever/templates-itl #baseurl = http://yum.qubesosfasa4zl44o4tws22di6kepyzfeqv3tg4e3ztknltfxqrymdad.onion/r$releasever/templates-itl metalink = https://yum.qubes-os.org/r$releasever/templates-itl/repodata/repomd.xml.metalink enabled = 1 fastestmirror = 1 metadata_expire = 7d gpgcheck = 1 gpgkey = file:///usr/share/qubes/repo-templates/keys/RPM-GPG-KEY-qubes-$releasever-primary ''' repo_conf1.write(repo_str1.encode()) repo_conf1.flush() args = argparse.Namespace( enablerepo=[], disablerepo=[], repoid=['repo1', 'repo2'], releasever='4.1', repo_files=[repo_conf1.name] ) with self.assertRaises(SystemExit), \ mock.patch('sys.stderr', new=io.StringIO()) as mock_err: qubesadmin.tools.qvm_template.qrexec_payload(args, self.app, 'qubes-template-fedora\n-32', False) # Check error message self.assertTrue('Malformed template name' in mock_err.getvalue()) self.assertTrue("argument should not contain '\\n'" in mock_err.getvalue()) self.assertAllCalled() def test_113_qrexec_payload_enablereponewline_fail(self): with tempfile.NamedTemporaryFile() as repo_conf1: repo_str1 = \ '''[qubes-templates-itl] name = Qubes Templates repository #baseurl = https://yum.qubes-os.org/r$releasever/templates-itl #baseurl = http://yum.qubesosfasa4zl44o4tws22di6kepyzfeqv3tg4e3ztknltfxqrymdad.onion/r$releasever/templates-itl metalink = https://yum.qubes-os.org/r$releasever/templates-itl/repodata/repomd.xml.metalink enabled = 1 fastestmirror = 1 metadata_expire = 7d gpgcheck = 1 gpgkey = file:///usr/share/qubes/repo-templates/keys/RPM-GPG-KEY-qubes-$releasever-primary ''' repo_conf1.write(repo_str1.encode()) repo_conf1.flush() args = argparse.Namespace( enablerepo=['repo\n0'], disablerepo=[], repoid=['repo1', 'repo2'], releasever='4.1', repo_files=[repo_conf1.name] ) with self.assertRaises(SystemExit), \ mock.patch('sys.stderr', new=io.StringIO()) as mock_err: qubesadmin.tools.qvm_template.qrexec_payload(args, self.app, 'qubes-template-fedora-32', False) # Check error message self.assertTrue('Malformed --enablerepo' in mock_err.getvalue()) self.assertTrue("argument should not contain '\\n'" in mock_err.getvalue()) self.assertAllCalled() def test_114_qrexec_payload_disablereponewline_fail(self): with tempfile.NamedTemporaryFile() as repo_conf1: repo_str1 = \ '''[qubes-templates-itl] name = Qubes Templates repository #baseurl = https://yum.qubes-os.org/r$releasever/templates-itl #baseurl = http://yum.qubesosfasa4zl44o4tws22di6kepyzfeqv3tg4e3ztknltfxqrymdad.onion/r$releasever/templates-itl metalink = https://yum.qubes-os.org/r$releasever/templates-itl/repodata/repomd.xml.metalink enabled = 1 fastestmirror = 1 metadata_expire = 7d gpgcheck = 1 gpgkey = file:///usr/share/qubes/repo-templates/keys/RPM-GPG-KEY-qubes-$releasever-primary ''' repo_conf1.write(repo_str1.encode()) repo_conf1.flush() args = argparse.Namespace( enablerepo=[], disablerepo=['repo\n0'], repoid=['repo1', 'repo2'], releasever='4.1', repo_files=[repo_conf1.name] ) with self.assertRaises(SystemExit), \ mock.patch('sys.stderr', new=io.StringIO()) as mock_err: qubesadmin.tools.qvm_template.qrexec_payload(args, self.app, 'qubes-template-fedora-32', False) # Check error message self.assertTrue('Malformed --disablerepo' in mock_err.getvalue()) self.assertTrue("argument should not contain '\\n'" in mock_err.getvalue()) self.assertAllCalled() def test_115_qrexec_payload_repoidnewline_fail(self): with tempfile.NamedTemporaryFile() as repo_conf1: repo_str1 = \ '''[qubes-templates-itl] name = Qubes Templates repository #baseurl = https://yum.qubes-os.org/r$releasever/templates-itl #baseurl = http://yum.qubesosfasa4zl44o4tws22di6kepyzfeqv3tg4e3ztknltfxqrymdad.onion/r$releasever/templates-itl metalink = https://yum.qubes-os.org/r$releasever/templates-itl/repodata/repomd.xml.metalink enabled = 1 fastestmirror = 1 metadata_expire = 7d gpgcheck = 1 gpgkey = file:///usr/share/qubes/repo-templates/keys/RPM-GPG-KEY-qubes-$releasever-primary ''' repo_conf1.write(repo_str1.encode()) repo_conf1.flush() args = argparse.Namespace( enablerepo=[], disablerepo=[], repoid=['repo\n1', 'repo2'], releasever='4.1', repo_files=[repo_conf1.name] ) with self.assertRaises(SystemExit), \ mock.patch('sys.stderr', new=io.StringIO()) as mock_err: qubesadmin.tools.qvm_template.qrexec_payload(args, self.app, 'qubes-template-fedora-32', False) # Check error message self.assertTrue('Malformed --repoid' in mock_err.getvalue()) self.assertTrue("argument should not contain '\\n'" in mock_err.getvalue()) self.assertAllCalled() def test_116_qrexec_payload_releasevernewline_fail(self): with tempfile.NamedTemporaryFile() as repo_conf1: repo_str1 = \ '''[qubes-templates-itl] name = Qubes Templates repository #baseurl = https://yum.qubes-os.org/r$releasever/templates-itl #baseurl = http://yum.qubesosfasa4zl44o4tws22di6kepyzfeqv3tg4e3ztknltfxqrymdad.onion/r$releasever/templates-itl metalink = https://yum.qubes-os.org/r$releasever/templates-itl/repodata/repomd.xml.metalink enabled = 1 fastestmirror = 1 metadata_expire = 7d gpgcheck = 1 gpgkey = file:///usr/share/qubes/repo-templates/keys/RPM-GPG-KEY-qubes-$releasever-primary ''' repo_conf1.write(repo_str1.encode()) repo_conf1.flush() args = argparse.Namespace( enablerepo=[], disablerepo=[], repoid=['repo1', 'repo2'], releasever='4\n.1', repo_files=[repo_conf1.name] ) with self.assertRaises(SystemExit), \ mock.patch('sys.stderr', new=io.StringIO()) as mock_err: qubesadmin.tools.qvm_template.qrexec_payload(args, self.app, 'qubes-template-fedora-32', False) # Check error message self.assertTrue('Malformed --releasever' in mock_err.getvalue()) self.assertTrue("argument should not contain '\\n'" in mock_err.getvalue()) self.assertAllCalled() def test_117_qrexec_payload_specdash_fail(self): with tempfile.NamedTemporaryFile() as repo_conf1: repo_str1 = \ '''[qubes-templates-itl] name = Qubes Templates repository #baseurl = https://yum.qubes-os.org/r$releasever/templates-itl #baseurl = http://yum.qubesosfasa4zl44o4tws22di6kepyzfeqv3tg4e3ztknltfxqrymdad.onion/r$releasever/templates-itl metalink = https://yum.qubes-os.org/r$releasever/templates-itl/repodata/repomd.xml.metalink enabled = 1 fastestmirror = 1 metadata_expire = 7d gpgcheck = 1 gpgkey = file:///usr/share/qubes/repo-templates/keys/RPM-GPG-KEY-qubes-$releasever-primary ''' repo_conf1.write(repo_str1.encode()) repo_conf1.flush() args = argparse.Namespace( enablerepo=[], disablerepo=[], repoid=['repo1', 'repo2'], releasever='4.1', repo_files=[repo_conf1.name] ) with self.assertRaises(SystemExit), \ mock.patch('sys.stderr', new=io.StringIO()) as mock_err: qubesadmin.tools.qvm_template.qrexec_payload(args, self.app, '---', False) # Check error message self.assertTrue('Malformed template name' in mock_err.getvalue()) self.assertTrue("argument should not be '---'" in mock_err.getvalue()) self.assertAllCalled() @mock.patch('qubesadmin.tools.qvm_template.qrexec_payload') def test_120_qrexec_repoquery_success(self, mock_payload): args = argparse.Namespace(updatevm='test-vm') mock_payload.return_value = 'str1\nstr2' self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \ b'0\x00test-vm class=AppVM state=Halted\n' self.app.expected_service_calls[ ('test-vm', 'qubes.TemplateSearch')] = \ b'''qubes-template-fedora-32|0|4.1|20200101|qubes-templates-itl|1048576|2020-01-23 04:56|GPL|https://qubes-os.org|Qubes template for fedora-32|Qubes template\n for fedora-32\n| qubes-template-fedora-32|1|4.2|20200201|qubes-templates-itl-testing|2048576|2020-02-23 04:56|GPLv2|https://qubes-os.org/?|Qubes template for fedora-32 v2|Qubes template\n for fedora-32 v2\n| ''' res = qubesadmin.tools.qvm_template.qrexec_repoquery(args, self.app, 'qubes-template-fedora-32') self.assertEqual(res, [ qubesadmin.tools.qvm_template.Template( 'fedora-32', '0', '4.1', '20200101', 'qubes-templates-itl', 1048576, datetime.datetime(2020, 1, 23, 4, 56), 'GPL', 'https://qubes-os.org', 'Qubes template for fedora-32', 'Qubes template\n for fedora-32\n' ), qubesadmin.tools.qvm_template.Template( 'fedora-32', '1', '4.2', '20200201', 'qubes-templates-itl-testing', 2048576, datetime.datetime(2020, 2, 23, 4, 56), 'GPLv2', 'https://qubes-os.org/?', 'Qubes template for fedora-32 v2', 'Qubes template\n for fedora-32 v2\n' ) ]) self.assertEqual(self.app.service_calls, [ ('test-vm', 'qubes.TemplateSearch', {'filter_esc': True, 'stdout': subprocess.PIPE}), ('test-vm', 'qubes.TemplateSearch', b'str1\nstr2') ]) self.assertEqual(mock_payload.mock_calls, [ mock.call(args, self.app, 'qubes-template-fedora-32', False) ]) self.assertAllCalled() @mock.patch('qubesadmin.tools.qvm_template.qrexec_payload') def test_121_qrexec_repoquery_refresh_success(self, mock_payload): args = argparse.Namespace(updatevm='test-vm') mock_payload.return_value = 'str1\nstr2' self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \ b'0\x00test-vm class=AppVM state=Halted\n' self.app.expected_service_calls[ ('test-vm', 'qubes.TemplateSearch')] = \ b'''qubes-template-fedora-32|0|4.1|20200101|qubes-templates-itl|1048576|2020-01-23 04:56|GPL|https://qubes-os.org|Qubes template for fedora-32|Qubes template\n for fedora-32\n| qubes-template-fedora-32|1|4.2|20200201|qubes-templates-itl-testing|2048576|2020-02-23 04:56|GPLv2|https://qubes-os.org/?|Qubes template for fedora-32 v2|Qubes template\n for fedora-32 v2\n| ''' res = qubesadmin.tools.qvm_template.qrexec_repoquery(args, self.app, 'qubes-template-fedora-32', True) self.assertEqual(res, [ qubesadmin.tools.qvm_template.Template( 'fedora-32', '0', '4.1', '20200101', 'qubes-templates-itl', 1048576, datetime.datetime(2020, 1, 23, 4, 56), 'GPL', 'https://qubes-os.org', 'Qubes template for fedora-32', 'Qubes template\n for fedora-32\n' ), qubesadmin.tools.qvm_template.Template( 'fedora-32', '1', '4.2', '20200201', 'qubes-templates-itl-testing', 2048576, datetime.datetime(2020, 2, 23, 4, 56), 'GPLv2', 'https://qubes-os.org/?', 'Qubes template for fedora-32 v2', 'Qubes template\n for fedora-32 v2\n' ) ]) self.assertEqual(self.app.service_calls, [ ('test-vm', 'qubes.TemplateSearch', {'filter_esc': True, 'stdout': subprocess.PIPE}), ('test-vm', 'qubes.TemplateSearch', b'str1\nstr2') ]) self.assertEqual(mock_payload.mock_calls, [ mock.call(args, self.app, 'qubes-template-fedora-32', True) ]) self.assertAllCalled() @mock.patch('qubesadmin.tools.qvm_template.qrexec_payload') def test_122_qrexec_repoquery_ignorenonspec_success(self, mock_payload): args = argparse.Namespace(updatevm='test-vm') mock_payload.return_value = 'str1\nstr2' self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \ b'0\x00test-vm class=AppVM state=Halted\n' self.app.expected_service_calls[ ('test-vm', 'qubes.TemplateSearch')] = \ b'''qubes-template-debian-10|1|4.2|20200201|qubes-templates-itl-testing|2048576|2020-02-23 04:56|GPLv2|https://qubes-os.org/?|Qubes template for debian-10|Qubes template for debian-10\n| qubes-template-fedora-32|0|4.1|20200101|qubes-templates-itl|1048576|2020-01-23 04:56|GPL|https://qubes-os.org|Qubes template for fedora-32|Qubes template for fedora-32\n| ''' res = qubesadmin.tools.qvm_template.qrexec_repoquery(args, self.app, 'qubes-template-fedora-32') self.assertEqual(res, [ qubesadmin.tools.qvm_template.Template( 'fedora-32', '0', '4.1', '20200101', 'qubes-templates-itl', 1048576, datetime.datetime(2020, 1, 23, 4, 56), 'GPL', 'https://qubes-os.org', 'Qubes template for fedora-32', 'Qubes template for fedora-32\n' ) ]) self.assertEqual(self.app.service_calls, [ ('test-vm', 'qubes.TemplateSearch', {'filter_esc': True, 'stdout': subprocess.PIPE}), ('test-vm', 'qubes.TemplateSearch', b'str1\nstr2') ]) self.assertEqual(mock_payload.mock_calls, [ mock.call(args, self.app, 'qubes-template-fedora-32', False) ]) self.assertAllCalled() @mock.patch('qubesadmin.tools.qvm_template.qrexec_payload') def test_123_qrexec_repoquery_ignorebadname_success(self, mock_payload): args = argparse.Namespace(updatevm='test-vm') mock_payload.return_value = 'str1\nstr2' self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \ b'0\x00test-vm class=AppVM state=Halted\n' self.app.expected_service_calls[ ('test-vm', 'qubes.TemplateSearch')] = \ b'''template-fedora-32|1|4.2|20200201|qubes-templates-itl-testing|2048576|2020-02-23 04:56|GPLv2|https://qubes-os.org/?|Qubes template for fedora-32 v2|Qubes template\n for fedora-32 v2\n| qubes-template-fedora-32|0|4.1|20200101|qubes-templates-itl|1048576|2020-01-23 04:56|GPL|https://qubes-os.org|Qubes template for fedora-32|Qubes template for fedora-32\n| ''' res = qubesadmin.tools.qvm_template.qrexec_repoquery(args, self.app, 'qubes-template-fedora-32') self.assertEqual(res, [ qubesadmin.tools.qvm_template.Template( 'fedora-32', '0', '4.1', '20200101', 'qubes-templates-itl', 1048576, datetime.datetime(2020, 1, 23, 4, 56), 'GPL', 'https://qubes-os.org', 'Qubes template for fedora-32', 'Qubes template for fedora-32\n' ) ]) self.assertEqual(self.app.service_calls, [ ('test-vm', 'qubes.TemplateSearch', {'filter_esc': True, 'stdout': subprocess.PIPE}), ('test-vm', 'qubes.TemplateSearch', b'str1\nstr2') ]) self.assertEqual(mock_payload.mock_calls, [ mock.call(args, self.app, 'qubes-template-fedora-32', False) ]) self.assertAllCalled() @mock.patch('qubesadmin.tools.qvm_template.qrexec_payload') def test_124_qrexec_repoquery_searchfail_fail(self, mock_payload): args = argparse.Namespace(updatevm='test-vm') mock_payload.return_value = 'str1\nstr2' self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \ b'0\x00test-vm class=AppVM state=Halted\n' with mock.patch('qubesadmin.tests.TestProcess.wait') \ as mock_wait: mock_wait.return_value = 1 with self.assertRaises(ConnectionError): qubesadmin.tools.qvm_template.qrexec_repoquery(args, self.app, 'qubes-template-fedora-32') self.assertEqual(self.app.service_calls, [ ('test-vm', 'qubes.TemplateSearch', {'filter_esc': True, 'stdout': subprocess.PIPE}), ('test-vm', 'qubes.TemplateSearch', b'str1\nstr2') ]) self.assertEqual(mock_payload.mock_calls, [ mock.call(args, self.app, 'qubes-template-fedora-32', False) ]) self.assertAllCalled() # TODO: Also test feeding broken data to qrexec_repoquery