123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524 |
- # -*- encoding: utf8 -*-
- #
- # The Qubes OS Project, http://www.qubes-os.org
- #
- # Copyright (C) 2017 Marek Marczykowski-Górecki
- # <marmarek@invisiblethingslab.com>
- #
- # This program is free software; you can redistribute it and/or modify
- # it under the terms of the GNU Lesser General Public License as published by
- # the Free Software Foundation; either version 2.1 of the License, or
- # (at your option) any later version.
- #
- # This program is distributed in the hope that it will be useful,
- # but WITHOUT ANY WARRANTY; without even the implied warranty of
- # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- # GNU Lesser General Public License for more details.
- #
- # You should have received a copy of the GNU Lesser General Public License along
- # with this program; if not, see <http://www.gnu.org/licenses/>.
- import asyncio
- import os
- import subprocess
- import tempfile
- from unittest import mock
- import qubesadmin.tests
- import qubesadmin.tools.qvm_template_postprocess
- class QubesLocalMock(qubesadmin.tests.QubesTest):
- def __init__(self):
- super(QubesLocalMock, self).__init__()
- self.__class__ = qubesadmin.app.QubesLocal
- qubesd_call = qubesadmin.tests.QubesTest.qubesd_call
- run_service = qubesadmin.tests.QubesTest.run_service
- class TC_00_qvm_template_postprocess(qubesadmin.tests.QubesTestCase):
- def setUp(self):
- super(TC_00_qvm_template_postprocess, self).setUp()
- self.source_dir = tempfile.TemporaryDirectory()
- def tearDown(self):
- try:
- self.source_dir.cleanup()
- except FileNotFoundError:
- pass
- super(TC_00_qvm_template_postprocess, self).tearDown()
- def test_000_import_root_img_raw(self):
- root_img = os.path.join(self.source_dir.name, 'root.img')
- volume_data = b'volume data'
- with open(root_img, 'wb') as f:
- f.write(volume_data)
- self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
- b'0\0test-vm class=TemplateVM state=Halted\n'
- self.app.expected_calls[('test-vm', 'admin.vm.volume.List', None,
- None)] = \
- b'0\0root\nprivate\nvolatile\nkernel\n'
- self.app.expected_calls[
- ('test-vm', 'admin.vm.volume.Info', 'root', None)] = \
- b'0\x00pool=lvm\n' \
- b'vid=qubes_dom0/vm-test-vm-root\n' \
- b'size=10737418240\n' \
- b'usage=0\n' \
- b'rw=True\n' \
- b'source=\n' \
- b'save_on_stop=True\n' \
- b'snap_on_start=False\n' \
- b'revisions_to_keep=3\n' \
- b'is_outdated=False\n'
- self.app.expected_calls[('test-vm', 'admin.vm.volume.Resize', 'root',
- str(len(volume_data)).encode())] = \
- b'0\0'
- self.app.expected_calls[('test-vm', 'admin.vm.volume.Import', 'root',
- volume_data)] = b'0\0'
- vm = self.app.domains['test-vm']
- qubesadmin.tools.qvm_template_postprocess.import_root_img(
- vm, self.source_dir.name)
- self.assertAllCalled()
- def test_001_import_root_img_tar(self):
- root_img = os.path.join(self.source_dir.name, 'root.img')
- volume_data = b'volume data' * 1000
- with open(root_img, 'wb') as f:
- f.write(volume_data)
- subprocess.check_call(['tar', 'cf', 'root.img.tar', 'root.img'],
- cwd=self.source_dir.name)
- subprocess.check_call(['split', '-d', '-b', '1024', 'root.img.tar',
- 'root.img.part.'], cwd=self.source_dir.name)
- os.unlink(root_img)
- self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
- b'0\0test-vm class=TemplateVM state=Halted\n'
- self.app.expected_calls[
- ('test-vm', 'admin.vm.volume.Info', 'root', None)] = \
- b'0\x00pool=lvm\n' \
- b'vid=qubes_dom0/vm-test-vm-root\n' \
- b'size=10737418240\n' \
- b'usage=0\n' \
- b'rw=True\n' \
- b'source=\n' \
- b'save_on_stop=True\n' \
- b'snap_on_start=False\n' \
- b'revisions_to_keep=3\n' \
- b'is_outdated=False\n'
- self.app.expected_calls[('test-vm', 'admin.vm.volume.List', None,
- None)] = \
- b'0\0root\nprivate\nvolatile\nkernel\n'
- self.app.expected_calls[('test-vm', 'admin.vm.volume.Resize', 'root',
- str(len(volume_data)).encode())] = \
- b'0\0'
- self.app.expected_calls[('test-vm', 'admin.vm.volume.Import', 'root',
- volume_data)] = b'0\0'
- vm = self.app.domains['test-vm']
- qubesadmin.tools.qvm_template_postprocess.import_root_img(
- vm, self.source_dir.name)
- self.assertAllCalled()
- def test_002_import_root_img_no_overwrite(self):
- self.app.qubesd_connection_type = 'socket'
- template_dir = os.path.join(self.source_dir.name, 'vm-templates',
- 'test-vm')
- os.makedirs(template_dir)
- root_img = os.path.join(template_dir, 'root.img')
- volume_data = b'volume data'
- with open(root_img, 'wb') as f:
- f.write(volume_data)
- self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
- b'0\0test-vm class=TemplateVM state=Halted\n'
- self.app.expected_calls[
- ('test-vm', 'admin.vm.volume.List', None, None)] = \
- b'0\0root\nprivate\nvolatile\nkernel\n'
- self.app.expected_calls[
- ('test-vm', 'admin.vm.volume.Info', 'root', None)] = \
- b'0\x00pool=default\n' \
- b'vid=vm-templates/test-vm/root\n' \
- b'size=10737418240\n' \
- b'usage=0\n' \
- b'rw=True\n' \
- b'source=\n' \
- b'save_on_stop=True\n' \
- b'snap_on_start=False\n' \
- b'revisions_to_keep=3\n' \
- b'is_outdated=False\n'
- self.app.expected_calls[
- ('dom0', 'admin.pool.List', None, None)] = \
- b'0\0default\n'
- self.app.expected_calls[
- ('dom0', 'admin.pool.Info', 'default', None)] = \
- b'0\0driver=file\ndir_path=' + self.source_dir.name.encode() + b'\n'
- vm = self.app.domains['test-vm']
- qubesadmin.tools.qvm_template_postprocess.import_root_img(
- vm, template_dir)
- self.assertAllCalled()
- def test_005_reset_private_img(self):
- self.app.qubesd_connection_type = 'socket'
- self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
- b'0\0test-vm class=TemplateVM state=Halted\n'
- self.app.expected_calls[
- ('test-vm', 'admin.vm.volume.List', None, None)] = \
- b'0\0root\nprivate\nvolatile\nkernel\n'
- self.app.expected_calls[('test-vm', 'admin.vm.volume.Clear', 'private',
- None)] = b'0\0'
- vm = self.app.domains['test-vm']
- qubesadmin.tools.qvm_template_postprocess.reset_private_img(vm)
- self.assertAllCalled()
- def test_010_import_appmenus(self):
- default_menu_items = [
- 'org.gnome.Terminal.desktop',
- 'firefox.desktop']
- menu_items = [
- 'org.gnome.Terminal.desktop',
- 'org.gnome.Software.desktop',
- 'gnome-control-center.desktop']
- netvm_menu_items = [
- 'org.gnome.Terminal.desktop',
- 'nm-connection-editor.desktop']
- with open(os.path.join(self.source_dir.name,
- 'vm-whitelisted-appmenus.list'), 'w') as f:
- for entry in default_menu_items:
- f.write(entry + '\n')
- with open(os.path.join(self.source_dir.name,
- 'whitelisted-appmenus.list'), 'w') as f:
- for entry in menu_items:
- f.write(entry + '\n')
- with open(os.path.join(self.source_dir.name,
- 'netvm-whitelisted-appmenus.list'), 'w') as f:
- for entry in netvm_menu_items:
- f.write(entry + '\n')
- self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
- b'0\0test-vm class=TemplateVM state=Halted\n'
- self.app.expected_calls[(
- 'test-vm',
- 'admin.vm.feature.Set',
- 'default-menu-items',
- ' '.join(default_menu_items).encode())] = b'0\0'
- self.app.expected_calls[(
- 'test-vm',
- 'admin.vm.feature.Set',
- 'menu-items',
- ' '.join(menu_items).encode())] = b'0\0'
- self.app.expected_calls[(
- 'test-vm',
- 'admin.vm.feature.Set',
- 'netvm-menu-items',
- ' '.join(netvm_menu_items).encode())] = b'0\0'
- vm = self.app.domains['test-vm']
- with mock.patch('subprocess.check_call') as mock_proc:
- qubesadmin.tools.qvm_template_postprocess.import_appmenus(
- vm, self.source_dir.name)
- self.assertEqual(mock_proc.mock_calls, [
- mock.call(['qvm-appmenus',
- '--set-default-whitelist=' + os.path.join(self.source_dir.name,
- 'vm-whitelisted-appmenus.list'), 'test-vm']),
- mock.call(['qvm-appmenus', '--set-whitelist=' + os.path.join(
- self.source_dir.name, 'whitelisted-appmenus.list'), 'test-vm']),
- ])
- self.assertAllCalled()
- @mock.patch('grp.getgrnam')
- @mock.patch('os.getuid')
- def test_011_import_appmenus_as_root(self, mock_getuid, mock_getgrnam):
- default_menu_items = [
- 'org.gnome.Terminal.desktop',
- 'firefox.desktop']
- menu_items = [
- 'org.gnome.Terminal.desktop',
- 'org.gnome.Software.desktop',
- 'gnome-control-center.desktop']
- netvm_menu_items = [
- 'org.gnome.Terminal.desktop',
- 'nm-connection-editor.desktop']
- with open(os.path.join(self.source_dir.name,
- 'vm-whitelisted-appmenus.list'), 'w') as f:
- for entry in default_menu_items:
- f.write(entry + '\n')
- with open(os.path.join(self.source_dir.name,
- 'whitelisted-appmenus.list'), 'w') as f:
- for entry in menu_items:
- f.write(entry + '\n')
- with open(os.path.join(self.source_dir.name,
- 'netvm-whitelisted-appmenus.list'), 'w') as f:
- for entry in netvm_menu_items:
- f.write(entry + '\n')
- self.app.expected_calls[(
- 'test-vm',
- 'admin.vm.feature.Set',
- 'default-menu-items',
- ' '.join(default_menu_items).encode())] = b'0\0'
- self.app.expected_calls[(
- 'test-vm',
- 'admin.vm.feature.Set',
- 'menu-items',
- ' '.join(menu_items).encode())] = b'0\0'
- self.app.expected_calls[(
- 'test-vm',
- 'admin.vm.feature.Set',
- 'netvm-menu-items',
- ' '.join(netvm_menu_items).encode())] = b'0\0'
- self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
- b'0\0test-vm class=TemplateVM state=Halted\n'
- mock_getuid.return_value = 0
- mock_getgrnam.configure_mock(**{
- 'return_value.gr_mem.__getitem__.return_value': 'user'
- })
- vm = self.app.domains['test-vm']
- with mock.patch('subprocess.check_call') as mock_proc:
- qubesadmin.tools.qvm_template_postprocess.import_appmenus(
- vm, self.source_dir.name)
- self.assertEqual(mock_proc.mock_calls, [
- mock.call(['runuser', '-u', 'user', '--', 'env', 'DISPLAY=:0',
- 'qvm-appmenus',
- '--set-default-whitelist=' + os.path.join(self.source_dir.name,
- 'vm-whitelisted-appmenus.list'), 'test-vm']),
- mock.call(['runuser', '-u', 'user', '--', 'env', 'DISPLAY=:0',
- 'qvm-appmenus', '--set-whitelist=' + os.path.join(
- self.source_dir.name, 'whitelisted-appmenus.list'), 'test-vm']),
- ])
- self.assertAllCalled()
- @mock.patch('grp.getgrnam')
- @mock.patch('os.getuid')
- def test_012_import_appmenus_missing_user(self, mock_getuid, mock_getgrnam):
- with open(os.path.join(self.source_dir.name,
- 'vm-whitelisted-appmenus.list'), 'w') as f:
- f.write('org.gnome.Terminal.desktop\n')
- f.write('firefox.desktop\n')
- with open(os.path.join(self.source_dir.name,
- 'whitelisted-appmenus.list'), 'w') as f:
- f.write('org.gnome.Terminal.desktop\n')
- f.write('org.gnome.Software.desktop\n')
- f.write('gnome-control-center.desktop\n')
- self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
- b'0\0test-vm class=TemplateVM state=Halted\n'
- mock_getuid.return_value = 0
- mock_getgrnam.side_effect = KeyError
- vm = self.app.domains['test-vm']
- with mock.patch('subprocess.check_call') as mock_proc:
- qubesadmin.tools.qvm_template_postprocess.import_appmenus(
- vm, self.source_dir.name)
- self.assertEqual(mock_proc.mock_calls, [])
- self.assertAllCalled()
- def add_new_vm_side_effect(self, *args, **kwargs):
- self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
- b'0\0test-vm class=TemplateVM state=Halted\n'
- self.app.domains.clear_cache()
- return self.app.domains['test-vm']
- @asyncio.coroutine
- def wait_for_shutdown(self, vm):
- pass
- @mock.patch('qubesadmin.tools.qvm_template_postprocess.import_appmenus')
- @mock.patch('qubesadmin.tools.qvm_template_postprocess.import_root_img')
- def test_020_post_install(self, mock_import_root_img,
- mock_import_appmenus):
- self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
- b'0\0'
- self.app.add_new_vm = mock.Mock(side_effect=self.add_new_vm_side_effect)
- self.app.expected_calls[
- ('test-vm', 'admin.vm.property.Set', 'netvm', b'')] = b'0\0'
- self.app.expected_calls[
- ('test-vm', 'admin.vm.property.Set', 'installed_by_rpm', b'True')] \
- = b'0\0'
- self.app.expected_calls[
- ('test-vm', 'admin.vm.property.Reset', 'netvm', None)] = b'0\0'
- self.app.expected_calls[
- ('test-vm', 'admin.vm.feature.Set', 'qrexec', b'1')] = b'0\0'
- self.app.expected_calls[
- ('test-vm', 'admin.vm.Start', None, None)] = b'0\0'
- self.app.expected_calls[
- ('test-vm', 'admin.vm.Shutdown', None, None)] = b'0\0'
- if qubesadmin.tools.qvm_template_postprocess.have_events:
- patch_domain_shutdown = mock.patch(
- 'qubesadmin.events.utils.wait_for_domain_shutdown')
- self.addCleanup(patch_domain_shutdown.stop)
- mock_domain_shutdown = patch_domain_shutdown.start()
- mock_domain_shutdown.side_effect = self.wait_for_shutdown
- else:
- self.app.expected_calls[
- ('test-vm', 'admin.vm.List', None, None)] = \
- b'0\0test-vm class=TemplateVM state=Halted\n'
- asyncio.set_event_loop(asyncio.new_event_loop())
- ret = qubesadmin.tools.qvm_template_postprocess.main([
- '--really', 'post-install', 'test-vm', self.source_dir.name],
- app=self.app)
- self.assertEqual(ret, 0)
- self.app.add_new_vm.assert_called_once_with('TemplateVM',
- name='test-vm', label='black', pool=None)
- mock_import_root_img.assert_called_once_with(self.app.domains[
- 'test-vm'], self.source_dir.name)
- mock_import_appmenus.assert_called_once_with(self.app.domains[
- 'test-vm'], self.source_dir.name)
- if qubesadmin.tools.qvm_template_postprocess.have_events:
- mock_domain_shutdown.assert_called_once_with([self.app.domains[
- 'test-vm']])
- self.assertEqual(self.app.service_calls, [
- ('test-vm', 'qubes.PostInstall', {}),
- ('test-vm', 'qubes.PostInstall', b''),
- ])
- self.assertAllCalled()
- @mock.patch('qubesadmin.tools.qvm_template_postprocess.import_appmenus')
- @mock.patch('qubesadmin.tools.qvm_template_postprocess.import_root_img')
- @mock.patch('qubesadmin.tools.qvm_template_postprocess.reset_private_img')
- def test_021_post_install_reinstall(self, mock_reset_private_img,
- mock_import_root_img, mock_import_appmenus):
- self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
- b'0\0test-vm class=TemplateVM state=Halted\n'
- self.app.add_new_vm = mock.Mock()
- self.app.expected_calls[
- ('test-vm', 'admin.vm.property.Set', 'netvm', b'')] = b'0\0'
- self.app.expected_calls[
- ('test-vm', 'admin.vm.property.Set', 'installed_by_rpm', b'True')] \
- = b'0\0'
- self.app.expected_calls[
- ('test-vm', 'admin.vm.property.Reset', 'netvm', None)] = b'0\0'
- self.app.expected_calls[
- ('test-vm', 'admin.vm.feature.Set', 'qrexec', b'1')] = b'0\0'
- self.app.expected_calls[
- ('test-vm', 'admin.vm.Start', None, None)] = b'0\0'
- self.app.expected_calls[
- ('test-vm', 'admin.vm.Shutdown', None, None)] = b'0\0'
- if qubesadmin.tools.qvm_template_postprocess.have_events:
- patch_domain_shutdown = mock.patch(
- 'qubesadmin.events.utils.wait_for_domain_shutdown')
- self.addCleanup(patch_domain_shutdown.stop)
- mock_domain_shutdown = patch_domain_shutdown.start()
- mock_domain_shutdown.side_effect = self.wait_for_shutdown
- else:
- self.app.expected_calls[
- ('test-vm', 'admin.vm.List', None, None)] = \
- b'0\0test-vm class=TemplateVM state=Halted\n'
- asyncio.set_event_loop(asyncio.new_event_loop())
- ret = qubesadmin.tools.qvm_template_postprocess.main([
- '--really', 'post-install', 'test-vm', self.source_dir.name],
- app=self.app)
- self.assertEqual(ret, 0)
- self.assertFalse(self.app.add_new_vm.called)
- mock_import_root_img.assert_called_once_with(self.app.domains[
- 'test-vm'], self.source_dir.name)
- mock_reset_private_img.assert_called_once_with(self.app.domains[
- 'test-vm'])
- mock_import_appmenus.assert_called_once_with(self.app.domains[
- 'test-vm'], self.source_dir.name)
- if qubesadmin.tools.qvm_template_postprocess.have_events:
- mock_domain_shutdown.assert_called_once_with([self.app.domains[
- 'test-vm']])
- self.assertEqual(self.app.service_calls, [
- ('test-vm', 'qubes.PostInstall', {}),
- ('test-vm', 'qubes.PostInstall', b''),
- ])
- self.assertAllCalled()
- @mock.patch('qubesadmin.tools.qvm_template_postprocess.import_appmenus')
- @mock.patch('qubesadmin.tools.qvm_template_postprocess.import_root_img')
- @mock.patch('qubesadmin.tools.qvm_template_postprocess.reset_private_img')
- def test_022_post_install_skip_start(self, mock_reset_private_img,
- mock_import_root_img, mock_import_appmenus):
- self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
- b'0\0test-vm class=TemplateVM state=Halted\n'
- self.app.expected_calls[
- ('test-vm', 'admin.vm.property.Set', 'installed_by_rpm', b'True')] \
- = b'0\0'
- self.app.add_new_vm = mock.Mock()
- if qubesadmin.tools.qvm_template_postprocess.have_events:
- patch_domain_shutdown = mock.patch(
- 'qubesadmin.events.utils.wait_for_domain_shutdown')
- self.addCleanup(patch_domain_shutdown.stop)
- mock_domain_shutdown = patch_domain_shutdown.start()
- mock_domain_shutdown.side_effect = self.wait_for_shutdown
- asyncio.set_event_loop(asyncio.new_event_loop())
- ret = qubesadmin.tools.qvm_template_postprocess.main([
- '--really', '--skip-start', 'post-install', 'test-vm',
- self.source_dir.name],
- app=self.app)
- self.assertEqual(ret, 0)
- self.assertFalse(self.app.add_new_vm.called)
- mock_import_root_img.assert_called_once_with(self.app.domains[
- 'test-vm'], self.source_dir.name)
- mock_reset_private_img.assert_called_once_with(self.app.domains[
- 'test-vm'])
- mock_import_appmenus.assert_called_once_with(self.app.domains[
- 'test-vm'], self.source_dir.name)
- if qubesadmin.tools.qvm_template_postprocess.have_events:
- self.assertFalse(mock_domain_shutdown.called)
- self.assertEqual(self.app.service_calls, [])
- self.assertAllCalled()
- def test_030_pre_remove(self):
- self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
- b'0\0test-vm class=TemplateVM state=Halted\n'
- self.app.expected_calls[('test-vm', 'admin.vm.Remove', None, None)] = \
- b'0\0'
- self.app.expected_calls[
- ('test-vm', 'admin.vm.property.Set', 'installed_by_rpm', b'False')]\
- = b'0\0'
- self.app.expected_calls[
- ('test-vm', 'admin.vm.property.Get', 'template', None)] = \
- b'2\0QubesNoSuchPropertyError\0\0invalid property \'template\' of ' \
- b'test-vm\0'
- ret = qubesadmin.tools.qvm_template_postprocess.main([
- '--really', 'pre-remove', 'test-vm',
- self.source_dir.name],
- app=self.app)
- self.assertEqual(ret, 0)
- self.assertEqual(self.app.service_calls, [])
- self.assertAllCalled()
- def test_031_pre_remove_existing_appvm(self):
- self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
- b'0\0test-vm class=TemplateVM state=Halted\n' \
- b'test-vm2 class=AppVM state=Halted\n'
- self.app.expected_calls[
- ('test-vm', 'admin.vm.property.Get', 'template', None)] = \
- b'2\0QubesNoSuchPropertyError\0\0invalid property \'template\' of ' \
- b'test-vm\0'
- self.app.expected_calls[
- ('test-vm2', 'admin.vm.property.Get', 'template', None)] = \
- b'0\0default=no type=vm test-vm'
- with self.assertRaises(SystemExit):
- qubesadmin.tools.qvm_template_postprocess.main([
- '--really', 'pre-remove', 'test-vm',
- self.source_dir.name],
- app=self.app)
- self.assertEqual(self.app.service_calls, [])
- self.assertAllCalled()
- def test_040_missing_really(self):
- with self.assertRaises(SystemExit):
- qubesadmin.tools.qvm_template_postprocess.main([
- 'post-install', 'test-vm', self.source_dir.name],
- app=self.app)
- self.assertAllCalled()
|