tools: add qvm-template-postprocess
Tool to be called from template's rpm post-installation script.
This commit is contained in:
		
							parent
							
								
									93d7249ef0
								
							
						
					
					
						commit
						2675d63579
					
				| @ -26,3 +26,8 @@ QREXEC_CLIENT = '/usr/lib/qubes/qrexec-client' | |||||||
| QREXEC_CLIENT_VM = '/usr/bin/qrexec-client-vm' | QREXEC_CLIENT_VM = '/usr/bin/qrexec-client-vm' | ||||||
| QUBESD_RECONNECT_DELAY = 1.0 | QUBESD_RECONNECT_DELAY = 1.0 | ||||||
| QREXEC_SERVICES_DIR = '/etc/qubes-rpc' | QREXEC_SERVICES_DIR = '/etc/qubes-rpc' | ||||||
|  | 
 | ||||||
|  | defaults = { | ||||||
|  |     'template_label': 'black', | ||||||
|  |     'shutdown_timeout': 60, | ||||||
|  | } | ||||||
|  | |||||||
							
								
								
									
										392
									
								
								qubesadmin/tests/tools/qvm_template_postprocess.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										392
									
								
								qubesadmin/tests/tools/qvm_template_postprocess.py
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,392 @@ | |||||||
|  | # -*- encoding: utf8 -*- | ||||||
|  | # | ||||||
|  | # The Qubes OS Project, http://www.qubes-os.org | ||||||
|  | # | ||||||
|  | # Copyright (C) 2017 Marek Marczykowski-Górecki | ||||||
|  | #                               <marmarek@invisiblethingslab.com> | ||||||
|  | # | ||||||
|  | # This program is free software; you can redistribute it and/or modify | ||||||
|  | # it under the terms of the GNU Lesser General Public License as published by | ||||||
|  | # the Free Software Foundation; either version 2.1 of the License, or | ||||||
|  | # (at your option) any later version. | ||||||
|  | # | ||||||
|  | # This program is distributed in the hope that it will be useful, | ||||||
|  | # but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||||
|  | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the | ||||||
|  | # GNU Lesser General Public License for more details. | ||||||
|  | # | ||||||
|  | # You should have received a copy of the GNU Lesser General Public License along | ||||||
|  | # with this program; if not, see <http://www.gnu.org/licenses/>. | ||||||
|  | import asyncio | ||||||
|  | import os | ||||||
|  | import subprocess | ||||||
|  | import tempfile | ||||||
|  | from unittest import mock | ||||||
|  | import qubesadmin.tests | ||||||
|  | import qubesadmin.tools.qvm_template_postprocess | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | class QubesLocalMock(qubesadmin.tests.QubesTest): | ||||||
|  |     def __init__(self): | ||||||
|  |         super(QubesLocalMock, self).__init__() | ||||||
|  |         self.__class__ = qubesadmin.app.QubesLocal | ||||||
|  | 
 | ||||||
|  |     qubesd_call = qubesadmin.tests.QubesTest.qubesd_call | ||||||
|  |     run_service = qubesadmin.tests.QubesTest.run_service | ||||||
|  | 
 | ||||||
|  | class TC_00_qvm_template_postprocess(qubesadmin.tests.QubesTestCase): | ||||||
|  |     def setUp(self): | ||||||
|  |         super(TC_00_qvm_template_postprocess, self).setUp() | ||||||
|  |         self.source_dir = tempfile.TemporaryDirectory() | ||||||
|  | 
 | ||||||
|  |     def tearDown(self): | ||||||
|  |         try: | ||||||
|  |             self.source_dir.cleanup() | ||||||
|  |         except FileNotFoundError: | ||||||
|  |             pass | ||||||
|  |         super(TC_00_qvm_template_postprocess, self).tearDown() | ||||||
|  | 
 | ||||||
|  |     def test_000_import_root_img_raw(self): | ||||||
|  |         root_img = os.path.join(self.source_dir.name, 'root.img') | ||||||
|  |         volume_data = b'volume data' | ||||||
|  |         with open(root_img, 'wb') as f: | ||||||
|  |             f.write(volume_data) | ||||||
|  | 
 | ||||||
|  |         self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \ | ||||||
|  |             b'0\0test-vm class=TemplateVM state=Halted\n' | ||||||
|  |         self.app.expected_calls[('test-vm', 'admin.vm.volume.List', None, | ||||||
|  |                 None)] = \ | ||||||
|  |             b'0\0root\nprivate\nvolatile\nkernel\n' | ||||||
|  |         self.app.expected_calls[('test-vm', 'admin.vm.volume.Resize', 'root', | ||||||
|  |                 str(len(volume_data)).encode())] = \ | ||||||
|  |             b'0\0' | ||||||
|  | 
 | ||||||
|  |         self.app.expected_calls[('test-vm', 'admin.vm.volume.Import', 'root', | ||||||
|  |             volume_data)] = b'0\0' | ||||||
|  |         vm = self.app.domains['test-vm'] | ||||||
|  |         qubesadmin.tools.qvm_template_postprocess.import_root_img( | ||||||
|  |             vm, self.source_dir.name) | ||||||
|  |         self.assertAllCalled() | ||||||
|  | 
 | ||||||
|  |     def test_001_import_root_img_tar(self): | ||||||
|  |         root_img = os.path.join(self.source_dir.name, 'root.img') | ||||||
|  |         volume_data = b'volume data' * 1000 | ||||||
|  |         with open(root_img, 'wb') as f: | ||||||
|  |             f.write(volume_data) | ||||||
|  | 
 | ||||||
|  |         subprocess.check_call(['tar', 'cf', 'root.img.tar', 'root.img'], | ||||||
|  |             cwd=self.source_dir.name) | ||||||
|  |         subprocess.check_call(['split', '-d', '-b', '1024', 'root.img.tar', | ||||||
|  |             'root.img.part.'], cwd=self.source_dir.name) | ||||||
|  |         os.unlink(root_img) | ||||||
|  | 
 | ||||||
|  |         self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \ | ||||||
|  |             b'0\0test-vm class=TemplateVM state=Halted\n' | ||||||
|  |         self.app.expected_calls[('test-vm', 'admin.vm.volume.List', None, | ||||||
|  |                 None)] = \ | ||||||
|  |             b'0\0root\nprivate\nvolatile\nkernel\n' | ||||||
|  |         self.app.expected_calls[('test-vm', 'admin.vm.volume.Resize', 'root', | ||||||
|  |                 str(len(volume_data)).encode())] = \ | ||||||
|  |             b'0\0' | ||||||
|  | 
 | ||||||
|  |         self.app.expected_calls[('test-vm', 'admin.vm.volume.Import', 'root', | ||||||
|  |             volume_data)] = b'0\0' | ||||||
|  |         vm = self.app.domains['test-vm'] | ||||||
|  |         qubesadmin.tools.qvm_template_postprocess.import_root_img( | ||||||
|  |             vm, self.source_dir.name) | ||||||
|  |         self.assertAllCalled() | ||||||
|  | 
 | ||||||
|  |     def test_002_import_root_img_no_overwrite(self): | ||||||
|  |         self.app.qubesd_connection_type = 'socket' | ||||||
|  | 
 | ||||||
|  |         template_dir = os.path.join(self.source_dir.name, 'vm-templates', | ||||||
|  |             'test-vm') | ||||||
|  |         os.makedirs(template_dir) | ||||||
|  |         root_img = os.path.join(template_dir, 'root.img') | ||||||
|  |         volume_data = b'volume data' | ||||||
|  |         with open(root_img, 'wb') as f: | ||||||
|  |             f.write(volume_data) | ||||||
|  | 
 | ||||||
|  |         self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \ | ||||||
|  |             b'0\0test-vm class=TemplateVM state=Halted\n' | ||||||
|  |         self.app.expected_calls[ | ||||||
|  |             ('test-vm', 'admin.vm.volume.List', None, None)] = \ | ||||||
|  |             b'0\0root\nprivate\nvolatile\nkernel\n' | ||||||
|  |         self.app.expected_calls[ | ||||||
|  |             ('test-vm', 'admin.vm.volume.Info', 'root', None)] = \ | ||||||
|  |             b'0\0pool=default\nvid=vm-templates/test-vm/root\n' | ||||||
|  |         self.app.expected_calls[ | ||||||
|  |             ('dom0', 'admin.pool.List', None, None)] = \ | ||||||
|  |             b'0\0default\n' | ||||||
|  |         self.app.expected_calls[ | ||||||
|  |             ('dom0', 'admin.pool.Info', 'default', None)] = \ | ||||||
|  |             b'0\0driver=file\ndir_path=' + self.source_dir.name.encode() + b'\n' | ||||||
|  |         self.app.expected_calls[ | ||||||
|  |             ('test-vm', 'admin.vm.volume.Resize', 'root', | ||||||
|  |                 str(len(volume_data)).encode())] = \ | ||||||
|  |             b'0\0' | ||||||
|  | 
 | ||||||
|  |         vm = self.app.domains['test-vm'] | ||||||
|  |         qubesadmin.tools.qvm_template_postprocess.import_root_img( | ||||||
|  |             vm, template_dir) | ||||||
|  |         self.assertAllCalled() | ||||||
|  | 
 | ||||||
|  |     def test_010_import_appmenus(self): | ||||||
|  |         with open(os.path.join(self.source_dir.name, | ||||||
|  |                 'vm-whitelisted-appmenus.list'), 'w') as f: | ||||||
|  |             f.write('org.gnome.Terminal.desktop\n') | ||||||
|  |             f.write('firefox.desktop\n') | ||||||
|  |         with open(os.path.join(self.source_dir.name, | ||||||
|  |                 'whitelisted-appmenus.list'), 'w') as f: | ||||||
|  |             f.write('org.gnome.Terminal.desktop\n') | ||||||
|  |             f.write('org.gnome.Software.desktop\n') | ||||||
|  |             f.write('gnome-control-center.desktop\n') | ||||||
|  | 
 | ||||||
|  |         self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \ | ||||||
|  |             b'0\0test-vm class=TemplateVM state=Halted\n' | ||||||
|  | 
 | ||||||
|  |         vm = self.app.domains['test-vm'] | ||||||
|  |         with mock.patch('subprocess.check_call') as mock_proc: | ||||||
|  |             qubesadmin.tools.qvm_template_postprocess.import_appmenus( | ||||||
|  |                 vm, self.source_dir.name) | ||||||
|  |         self.assertEqual(mock_proc.mock_calls, [ | ||||||
|  |             mock.call(['qvm-appmenus', | ||||||
|  |                 '--set-default-whitelist=' + os.path.join(self.source_dir.name, | ||||||
|  |                     'vm-whitelisted-appmenus.list'), 'test-vm']), | ||||||
|  |             mock.call(['qvm-appmenus', '--set-whitelist=' + os.path.join( | ||||||
|  |                 self.source_dir.name, 'whitelisted-appmenus.list'), 'test-vm']), | ||||||
|  |         ]) | ||||||
|  |         self.assertAllCalled() | ||||||
|  | 
 | ||||||
|  |     @mock.patch('grp.getgrnam') | ||||||
|  |     @mock.patch('os.getuid') | ||||||
|  |     def test_011_import_appmenus_as_root(self, mock_getuid, mock_getgrnam): | ||||||
|  |         with open(os.path.join(self.source_dir.name, | ||||||
|  |                 'vm-whitelisted-appmenus.list'), 'w') as f: | ||||||
|  |             f.write('org.gnome.Terminal.desktop\n') | ||||||
|  |             f.write('firefox.desktop\n') | ||||||
|  |         with open(os.path.join(self.source_dir.name, | ||||||
|  |                 'whitelisted-appmenus.list'), 'w') as f: | ||||||
|  |             f.write('org.gnome.Terminal.desktop\n') | ||||||
|  |             f.write('org.gnome.Software.desktop\n') | ||||||
|  |             f.write('gnome-control-center.desktop\n') | ||||||
|  | 
 | ||||||
|  |         self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \ | ||||||
|  |             b'0\0test-vm class=TemplateVM state=Halted\n' | ||||||
|  | 
 | ||||||
|  |         mock_getuid.return_value = 0 | ||||||
|  |         mock_getgrnam.configure_mock(**{ | ||||||
|  |             'return_value.gr_mem.__getitem__.return_value': 'user' | ||||||
|  |         }) | ||||||
|  | 
 | ||||||
|  |         vm = self.app.domains['test-vm'] | ||||||
|  |         with mock.patch('subprocess.check_call') as mock_proc: | ||||||
|  |             qubesadmin.tools.qvm_template_postprocess.import_appmenus( | ||||||
|  |                 vm, self.source_dir.name) | ||||||
|  |         self.assertEqual(mock_proc.mock_calls, [ | ||||||
|  |             mock.call(['runuser', '-u', 'user', '--', 'env', 'DISPLAY=:0', | ||||||
|  |                 'qvm-appmenus', | ||||||
|  |                 '--set-default-whitelist=' + os.path.join(self.source_dir.name, | ||||||
|  |                     'vm-whitelisted-appmenus.list'), 'test-vm']), | ||||||
|  |             mock.call(['runuser', '-u', 'user', '--', 'env', 'DISPLAY=:0', | ||||||
|  |                 'qvm-appmenus', '--set-whitelist=' + os.path.join( | ||||||
|  |                 self.source_dir.name, 'whitelisted-appmenus.list'), 'test-vm']), | ||||||
|  |         ]) | ||||||
|  |         self.assertAllCalled() | ||||||
|  | 
 | ||||||
|  |     @mock.patch('grp.getgrnam') | ||||||
|  |     @mock.patch('os.getuid') | ||||||
|  |     def test_012_import_appmenus_missing_user(self, mock_getuid, mock_getgrnam): | ||||||
|  |         with open(os.path.join(self.source_dir.name, | ||||||
|  |                 'vm-whitelisted-appmenus.list'), 'w') as f: | ||||||
|  |             f.write('org.gnome.Terminal.desktop\n') | ||||||
|  |             f.write('firefox.desktop\n') | ||||||
|  |         with open(os.path.join(self.source_dir.name, | ||||||
|  |                 'whitelisted-appmenus.list'), 'w') as f: | ||||||
|  |             f.write('org.gnome.Terminal.desktop\n') | ||||||
|  |             f.write('org.gnome.Software.desktop\n') | ||||||
|  |             f.write('gnome-control-center.desktop\n') | ||||||
|  | 
 | ||||||
|  |         self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \ | ||||||
|  |             b'0\0test-vm class=TemplateVM state=Halted\n' | ||||||
|  | 
 | ||||||
|  |         mock_getuid.return_value = 0 | ||||||
|  |         mock_getgrnam.side_effect = KeyError | ||||||
|  | 
 | ||||||
|  |         vm = self.app.domains['test-vm'] | ||||||
|  |         with mock.patch('subprocess.check_call') as mock_proc: | ||||||
|  |             qubesadmin.tools.qvm_template_postprocess.import_appmenus( | ||||||
|  |                 vm, self.source_dir.name) | ||||||
|  |         self.assertEqual(mock_proc.mock_calls, []) | ||||||
|  |         self.assertAllCalled() | ||||||
|  | 
 | ||||||
|  |     def add_new_vm_side_effect(self, *args, **kwargs): | ||||||
|  |         self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \ | ||||||
|  |             b'0\0test-vm class=TemplateVM state=Halted\n' | ||||||
|  |         self.app.domains.clear_cache() | ||||||
|  |         return self.app.domains['test-vm'] | ||||||
|  | 
 | ||||||
|  |     @mock.patch('qubesadmin.tools.qvm_template_postprocess.import_appmenus') | ||||||
|  |     @mock.patch('qubesadmin.tools.qvm_template_postprocess.import_root_img') | ||||||
|  |     def test_020_post_install(self, mock_import_root_img, | ||||||
|  |             mock_import_appmenus): | ||||||
|  |         self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \ | ||||||
|  |             b'0\0' | ||||||
|  |         self.app.add_new_vm = mock.Mock(side_effect=self.add_new_vm_side_effect) | ||||||
|  | 
 | ||||||
|  |         self.app.expected_calls[ | ||||||
|  |             ('test-vm', 'admin.vm.property.Set', 'netvm', b'')] = b'0\0' | ||||||
|  |         self.app.expected_calls[ | ||||||
|  |             ('test-vm', 'admin.vm.property.Reset', 'netvm', None)] = b'0\0' | ||||||
|  |         self.app.expected_calls[ | ||||||
|  |             ('test-vm', 'admin.vm.Start', None, None)] = b'0\0' | ||||||
|  |         self.app.expected_calls[ | ||||||
|  |             ('test-vm', 'admin.vm.Shutdown', None, None)] = b'0\0' | ||||||
|  | 
 | ||||||
|  |         if qubesadmin.tools.qvm_template_postprocess.have_events: | ||||||
|  |             patch_domain_shutdown = mock.patch( | ||||||
|  |                 'qubesadmin.events.utils.wait_for_domain_shutdown') | ||||||
|  |             self.addCleanup(patch_domain_shutdown.stop) | ||||||
|  |             mock_domain_shutdown = patch_domain_shutdown.start() | ||||||
|  |         else: | ||||||
|  |             self.app.expected_calls[ | ||||||
|  |                 ('test-vm', 'admin.vm.List', None, None)] = \ | ||||||
|  |                 b'0\0test-vm class=TemplateVM state=Halted\n' | ||||||
|  | 
 | ||||||
|  |         ret = qubesadmin.tools.qvm_template_postprocess.main([ | ||||||
|  |             '--really', 'post-install', 'test-vm', self.source_dir.name], | ||||||
|  |             app=self.app) | ||||||
|  |         self.assertEqual(ret, 0) | ||||||
|  |         self.app.add_new_vm.assert_called_once_with('TemplateVM', | ||||||
|  |             name='test-vm', label='black') | ||||||
|  |         mock_import_root_img.assert_called_once_with(self.app.domains[ | ||||||
|  |             'test-vm'], self.source_dir.name) | ||||||
|  |         mock_import_appmenus.assert_called_once_with(self.app.domains[ | ||||||
|  |             'test-vm'], self.source_dir.name) | ||||||
|  |         if qubesadmin.tools.qvm_template_postprocess.have_events: | ||||||
|  |             mock_domain_shutdown.assert_called_once_with(self.app.domains[ | ||||||
|  |                 'test-vm'], 60) | ||||||
|  |         self.assertEqual(self.app.service_calls, [ | ||||||
|  |             ('test-vm', 'qubes.PostInstall', {}), | ||||||
|  |             ('test-vm', 'qubes.PostInstall', b''), | ||||||
|  |         ]) | ||||||
|  |         self.assertAllCalled() | ||||||
|  | 
 | ||||||
|  |     @mock.patch('qubesadmin.tools.qvm_template_postprocess.import_appmenus') | ||||||
|  |     @mock.patch('qubesadmin.tools.qvm_template_postprocess.import_root_img') | ||||||
|  |     def test_021_post_install_reinstall(self, mock_import_root_img, | ||||||
|  |             mock_import_appmenus): | ||||||
|  |         self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \ | ||||||
|  |             b'0\0test-vm class=TemplateVM state=Halted\n' | ||||||
|  |         self.app.add_new_vm = mock.Mock() | ||||||
|  | 
 | ||||||
|  |         self.app.expected_calls[ | ||||||
|  |             ('test-vm', 'admin.vm.property.Set', 'netvm', b'')] = b'0\0' | ||||||
|  |         self.app.expected_calls[ | ||||||
|  |             ('test-vm', 'admin.vm.property.Reset', 'netvm', None)] = b'0\0' | ||||||
|  |         self.app.expected_calls[ | ||||||
|  |             ('test-vm', 'admin.vm.Start', None, None)] = b'0\0' | ||||||
|  |         self.app.expected_calls[ | ||||||
|  |             ('test-vm', 'admin.vm.Shutdown', None, None)] = b'0\0' | ||||||
|  | 
 | ||||||
|  |         if qubesadmin.tools.qvm_template_postprocess.have_events: | ||||||
|  |             patch_domain_shutdown = mock.patch( | ||||||
|  |                 'qubesadmin.events.utils.wait_for_domain_shutdown') | ||||||
|  |             self.addCleanup(patch_domain_shutdown.stop) | ||||||
|  |             mock_domain_shutdown = patch_domain_shutdown.start() | ||||||
|  |         else: | ||||||
|  |             self.app.expected_calls[ | ||||||
|  |                 ('test-vm', 'admin.vm.List', None, None)] = \ | ||||||
|  |                 b'0\0test-vm class=TemplateVM state=Halted\n' | ||||||
|  | 
 | ||||||
|  |         ret = qubesadmin.tools.qvm_template_postprocess.main([ | ||||||
|  |             '--really', 'post-install', 'test-vm', self.source_dir.name], | ||||||
|  |             app=self.app) | ||||||
|  |         self.assertEqual(ret, 0) | ||||||
|  |         self.assertFalse(self.app.add_new_vm.called) | ||||||
|  |         mock_import_root_img.assert_called_once_with(self.app.domains[ | ||||||
|  |             'test-vm'], self.source_dir.name) | ||||||
|  |         mock_import_appmenus.assert_called_once_with(self.app.domains[ | ||||||
|  |             'test-vm'], self.source_dir.name) | ||||||
|  |         if qubesadmin.tools.qvm_template_postprocess.have_events: | ||||||
|  |             mock_domain_shutdown.assert_called_once_with(self.app.domains[ | ||||||
|  |                 'test-vm'], 60) | ||||||
|  |         self.assertEqual(self.app.service_calls, [ | ||||||
|  |             ('test-vm', 'qubes.PostInstall', {}), | ||||||
|  |             ('test-vm', 'qubes.PostInstall', b''), | ||||||
|  |         ]) | ||||||
|  |         self.assertAllCalled() | ||||||
|  | 
 | ||||||
|  |     @mock.patch('qubesadmin.tools.qvm_template_postprocess.import_appmenus') | ||||||
|  |     @mock.patch('qubesadmin.tools.qvm_template_postprocess.import_root_img') | ||||||
|  |     def test_022_post_install_skip_start(self, mock_import_root_img, | ||||||
|  |             mock_import_appmenus): | ||||||
|  |         self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \ | ||||||
|  |             b'0\0test-vm class=TemplateVM state=Halted\n' | ||||||
|  |         self.app.add_new_vm = mock.Mock() | ||||||
|  | 
 | ||||||
|  |         if qubesadmin.tools.qvm_template_postprocess.have_events: | ||||||
|  |             patch_domain_shutdown = mock.patch( | ||||||
|  |                 'qubesadmin.events.utils.wait_for_domain_shutdown') | ||||||
|  |             self.addCleanup(patch_domain_shutdown.stop) | ||||||
|  |             mock_domain_shutdown = patch_domain_shutdown.start() | ||||||
|  | 
 | ||||||
|  |         ret = qubesadmin.tools.qvm_template_postprocess.main([ | ||||||
|  |             '--really', '--skip-start', 'post-install', 'test-vm', | ||||||
|  |             self.source_dir.name], | ||||||
|  |             app=self.app) | ||||||
|  |         self.assertEqual(ret, 0) | ||||||
|  |         self.assertFalse(self.app.add_new_vm.called) | ||||||
|  |         mock_import_root_img.assert_called_once_with(self.app.domains[ | ||||||
|  |             'test-vm'], self.source_dir.name) | ||||||
|  |         mock_import_appmenus.assert_called_once_with(self.app.domains[ | ||||||
|  |             'test-vm'], self.source_dir.name) | ||||||
|  |         if qubesadmin.tools.qvm_template_postprocess.have_events: | ||||||
|  |             self.assertFalse(mock_domain_shutdown.called) | ||||||
|  |         self.assertEqual(self.app.service_calls, []) | ||||||
|  |         self.assertAllCalled() | ||||||
|  | 
 | ||||||
|  |     def test_030_pre_remove(self): | ||||||
|  |         self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \ | ||||||
|  |             b'0\0test-vm class=TemplateVM state=Halted\n' | ||||||
|  |         self.app.expected_calls[('test-vm', 'admin.vm.Remove', None, None)] = \ | ||||||
|  |             b'0\0' | ||||||
|  |         self.app.expected_calls[ | ||||||
|  |             ('test-vm', 'admin.vm.property.Get', 'template', None)] = \ | ||||||
|  |             b'2\0QubesNoSuchPropertyError\0\0invalid property \'template\' of ' \ | ||||||
|  |             b'test-vm\0' | ||||||
|  | 
 | ||||||
|  |         ret = qubesadmin.tools.qvm_template_postprocess.main([ | ||||||
|  |             '--really', 'pre-remove', 'test-vm', | ||||||
|  |             self.source_dir.name], | ||||||
|  |             app=self.app) | ||||||
|  |         self.assertEqual(ret, 0) | ||||||
|  |         self.assertEqual(self.app.service_calls, []) | ||||||
|  |         self.assertAllCalled() | ||||||
|  | 
 | ||||||
|  |     def test_031_pre_remove_existing_appvm(self): | ||||||
|  |         self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \ | ||||||
|  |             b'0\0test-vm class=TemplateVM state=Halted\n' \ | ||||||
|  |             b'test-vm2 class=AppVM state=Halted\n' | ||||||
|  |         self.app.expected_calls[ | ||||||
|  |             ('test-vm', 'admin.vm.property.Get', 'template', None)] = \ | ||||||
|  |             b'2\0QubesNoSuchPropertyError\0\0invalid property \'template\' of ' \ | ||||||
|  |             b'test-vm\0' | ||||||
|  |         self.app.expected_calls[ | ||||||
|  |             ('test-vm2', 'admin.vm.property.Get', 'template', None)] = \ | ||||||
|  |             b'0\0default=no type=vm test-vm' | ||||||
|  | 
 | ||||||
|  |         with self.assertRaises(SystemExit): | ||||||
|  |             qubesadmin.tools.qvm_template_postprocess.main([ | ||||||
|  |                 '--really', 'pre-remove', 'test-vm', | ||||||
|  |                 self.source_dir.name], | ||||||
|  |                 app=self.app) | ||||||
|  |         self.assertEqual(self.app.service_calls, []) | ||||||
|  |         self.assertAllCalled() | ||||||
|  | 
 | ||||||
|  |     def test_040_missing_really(self): | ||||||
|  |         with self.assertRaises(SystemExit): | ||||||
|  |             qubesadmin.tools.qvm_template_postprocess.main([ | ||||||
|  |                 'post-install', 'test-vm', self.source_dir.name], | ||||||
|  |                 app=self.app) | ||||||
|  |         self.assertAllCalled() | ||||||
							
								
								
									
										229
									
								
								qubesadmin/tools/qvm_template_postprocess.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										229
									
								
								qubesadmin/tools/qvm_template_postprocess.py
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,229 @@ | |||||||
|  | # | ||||||
|  | # The Qubes OS Project, https://www.qubes-os.org/ | ||||||
|  | # | ||||||
|  | # Copyright (C) 2016  Marek Marczykowski-Górecki | ||||||
|  | #                                       <marmarek@invisiblethingslab.com> | ||||||
|  | # | ||||||
|  | # This program is free software; you can redistribute it and/or modify | ||||||
|  | # it under the terms of the GNU Lesser General Public License as published by | ||||||
|  | # the Free Software Foundation; either version 2.1 of the License, or | ||||||
|  | # (at your option) any later version. | ||||||
|  | # | ||||||
|  | # This program is distributed in the hope that it will be useful, | ||||||
|  | # but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||||
|  | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the | ||||||
|  | # GNU Lesser General Public License for more details. | ||||||
|  | # | ||||||
|  | # You should have received a copy of the GNU Lesser General Public License along | ||||||
|  | # with this program; if not, write to the Free Software Foundation, Inc., | ||||||
|  | # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. | ||||||
|  | 
 | ||||||
|  | ''' Tool for importing rpm-installed template''' | ||||||
|  | 
 | ||||||
|  | import asyncio | ||||||
|  | import glob | ||||||
|  | import os | ||||||
|  | 
 | ||||||
|  | import shutil | ||||||
|  | import subprocess | ||||||
|  | 
 | ||||||
|  | import sys | ||||||
|  | 
 | ||||||
|  | import grp | ||||||
|  | 
 | ||||||
|  | import time | ||||||
|  | 
 | ||||||
|  | import qubesadmin | ||||||
|  | import qubesadmin.exc | ||||||
|  | import qubesadmin.tools | ||||||
|  | try: | ||||||
|  |     # pylint: disable=wrong-import-position | ||||||
|  |     import qubesadmin.events.utils | ||||||
|  |     have_events = True | ||||||
|  | except ImportError: | ||||||
|  |     have_events = False | ||||||
|  | 
 | ||||||
|  | parser = qubesadmin.tools.QubesArgumentParser( | ||||||
|  |     description='Postprocess template package') | ||||||
|  | parser.add_argument('--really', action='store_true', default=False, | ||||||
|  |     help='Really perform the action, YOU SHOULD REALLY KNOW WHAT YOU ARE DOING') | ||||||
|  | parser.add_argument('--skip-start', action='store_true', | ||||||
|  |     help='Do not start the VM - do not retrieve menu entries etc.') | ||||||
|  | parser.add_argument('--keep-source', action='store_true', | ||||||
|  |     help='Do not remove imported data') | ||||||
|  | parser.add_argument('action', choices=['post-install', 'pre-remove'], | ||||||
|  |     help='Action to perform') | ||||||
|  | parser.add_argument('name', action='store', | ||||||
|  |     help='Template name') | ||||||
|  | parser.add_argument('dir', action='store', | ||||||
|  |     help='Template directory') | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | def move_if_exists(source, dest_dir): | ||||||
|  |     '''Move file/directory if exists''' | ||||||
|  |     if os.path.exists(source): | ||||||
|  |         shutil.move(source, os.path.join(dest_dir, os.path.basename(source))) | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | def get_root_img_size(source_dir): | ||||||
|  |     '''Extract size of root.img to be imported''' | ||||||
|  |     root_path = os.path.join(source_dir, 'root.img') | ||||||
|  |     if os.path.exists(root_path + '.part.00'): | ||||||
|  |         # get just file root_size from the tar header | ||||||
|  |         p = subprocess.Popen(['tar', 'tvf', root_path + '.part.00'], | ||||||
|  |             stdout=subprocess.PIPE, stderr=subprocess.DEVNULL) | ||||||
|  |         (stdout, _) = p.communicate() | ||||||
|  |         # -rw-r--r-- 0/0      1073741824 1970-01-01 01:00 root.img | ||||||
|  |         root_size = int(stdout.split()[2]) | ||||||
|  |     elif os.path.exists(root_path): | ||||||
|  |         root_size = os.path.getsize(root_path) | ||||||
|  |     else: | ||||||
|  |         raise qubesadmin.exc.QubesException('root.img not found') | ||||||
|  |     return root_size | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | def import_root_img(vm, source_dir): | ||||||
|  |     '''Import root.img into VM object''' | ||||||
|  | 
 | ||||||
|  |     root_size = get_root_img_size(source_dir) | ||||||
|  |     vm.volumes['root'].resize(root_size) | ||||||
|  | 
 | ||||||
|  |     root_path = os.path.join(source_dir, 'root.img') | ||||||
|  |     if os.path.exists(root_path + '.part.00'): | ||||||
|  |         input_files = glob.glob(root_path + '.part.*') | ||||||
|  |         cat = subprocess.Popen(['cat'] + sorted(input_files), | ||||||
|  |             stdout=subprocess.PIPE) | ||||||
|  |         tar = subprocess.Popen(['tar', 'xSOf', '-'], | ||||||
|  |             stdin=cat.stdout, | ||||||
|  |             stdout=subprocess.PIPE) | ||||||
|  |         vm.volumes['root'].import_data(stream=tar.stdout) | ||||||
|  |         if tar.wait() != 0: | ||||||
|  |             raise qubesadmin.exc.QubesException('root.img extraction failed') | ||||||
|  |         if cat.wait() != 0: | ||||||
|  |             raise qubesadmin.exc.QubesException('root.img extraction failed') | ||||||
|  |         cat.stdout.close() | ||||||
|  |         tar.stdout.close() | ||||||
|  |     elif os.path.exists(root_path): | ||||||
|  |         if vm.app.qubesd_connection_type == 'socket': | ||||||
|  |             # check if root.img was already overwritten, i.e. if the source | ||||||
|  |             # and destination paths are the same | ||||||
|  |             vid = vm.volumes['root'].vid | ||||||
|  |             pool = vm.app.pools[vm.volumes['root'].pool] | ||||||
|  |             if pool.driver == 'file' and root_path == os.path.join( | ||||||
|  |                     pool.config['dir_path'], vid + '.img'): | ||||||
|  |                 vm.log.info('root.img already in place, do not re-import') | ||||||
|  |                 return | ||||||
|  |         with open(root_path, 'rb') as root_file: | ||||||
|  |             vm.volumes['root'].import_data(stream=root_file) | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | def import_appmenus(vm, source_dir): | ||||||
|  |     '''Import appmenus settings into VM object (later: GUI VM)''' | ||||||
|  |     if os.getuid() == 0: | ||||||
|  |         try: | ||||||
|  |             qubes_group = grp.getgrnam('qubes') | ||||||
|  |             user = qubes_group.gr_mem[0] | ||||||
|  |             cmd_prefix = ['runuser', '-u', user, '--', 'env', 'DISPLAY=:0'] | ||||||
|  |         except KeyError as e: | ||||||
|  |             vm.log.warning('Default user not found, not importing appmenus: ' + | ||||||
|  |                            str(e)) | ||||||
|  |             return | ||||||
|  |     else: | ||||||
|  |         cmd_prefix = [] | ||||||
|  | 
 | ||||||
|  |     # TODO: change this to qrexec calls to GUI VM, when GUI VM will be | ||||||
|  |     # implemented | ||||||
|  |     subprocess.check_call(cmd_prefix + ['qvm-appmenus', | ||||||
|  |         '--set-default-whitelist={}'.format(os.path.join(source_dir, | ||||||
|  |             'vm-whitelisted-appmenus.list')), vm.name]) | ||||||
|  |     subprocess.check_call(cmd_prefix + ['qvm-appmenus', | ||||||
|  |         '--set-whitelist={}'.format(os.path.join(source_dir, | ||||||
|  |             'whitelisted-appmenus.list')), vm.name]) | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | def post_install(args): | ||||||
|  |     '''Handle post-installation tasks''' | ||||||
|  | 
 | ||||||
|  |     app = args.app | ||||||
|  |     try: | ||||||
|  |         # reinstall | ||||||
|  |         vm = app.domains[args.name] | ||||||
|  |     except KeyError: | ||||||
|  |         if app.qubesd_connection_type == 'socket' and \ | ||||||
|  |                 args.dir == '/var/lib/qubes/vm-templates/' + args.name: | ||||||
|  |             # vm.create_on_disk() need to create the directory on its own, | ||||||
|  |             # move it away for from its way | ||||||
|  |             tmp_sourcedir = os.path.join('/var/lib/qubes/vm-templates', | ||||||
|  |                 'tmp-' + args.name) | ||||||
|  |             shutil.move(args.dir, tmp_sourcedir) | ||||||
|  |             args.dir = tmp_sourcedir | ||||||
|  | 
 | ||||||
|  |         vm = app.add_new_vm('TemplateVM', | ||||||
|  |             name=args.name, | ||||||
|  |             label=qubesadmin.config.defaults['template_label']) | ||||||
|  | 
 | ||||||
|  |     vm.log.info('Importing data') | ||||||
|  |     import_root_img(vm, args.dir) | ||||||
|  |     import_appmenus(vm, args.dir) | ||||||
|  | 
 | ||||||
|  |     if not args.skip_start: | ||||||
|  |         # just created, so no need to save previous value - we know what it was | ||||||
|  |         vm.netvm = None | ||||||
|  |         vm.start() | ||||||
|  |         try: | ||||||
|  |             vm.run_service_for_stdio('qubes.PostInstall') | ||||||
|  |         except qubesadmin.exc.QubesVMError: | ||||||
|  |             vm.log.error('qubes.PostInstall service failed') | ||||||
|  |         vm.shutdown() | ||||||
|  |         if have_events: | ||||||
|  |             try: | ||||||
|  |                 # pylint: disable=no-member | ||||||
|  |                 qubesadmin.events.utils.wait_for_domain_shutdown(vm, | ||||||
|  |                     qubesadmin.config.defaults['shutdown_timeout']) | ||||||
|  |             except qubesadmin.exc.QubesVMShutdownTimeout: | ||||||
|  |                 vm.kill() | ||||||
|  |             asyncio.get_event_loop().close() | ||||||
|  |         else: | ||||||
|  |             timeout = qubesadmin.config.defaults['shutdown_timeout'] | ||||||
|  |             while timeout >= 0: | ||||||
|  |                 if vm.is_halted(): | ||||||
|  |                     break | ||||||
|  |                 time.sleep(1) | ||||||
|  |                 timeout -= 1 | ||||||
|  |             if not vm.is_halted(): | ||||||
|  |                 vm.kill() | ||||||
|  | 
 | ||||||
|  |         vm.netvm = qubesadmin.DEFAULT | ||||||
|  | 
 | ||||||
|  |     return 0 | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | def pre_remove(args): | ||||||
|  |     '''Handle pre-removal tasks''' | ||||||
|  |     app = args.app | ||||||
|  |     try: | ||||||
|  |         tpl = app.domains[args.name] | ||||||
|  |     except KeyError: | ||||||
|  |         parser.error('Qube with this name do not exist') | ||||||
|  |     for appvm in tpl.appvms: | ||||||
|  |         parser.error('Qube {} use this template'.format(appvm.name)) | ||||||
|  | 
 | ||||||
|  |     del app.domains[args.name] | ||||||
|  |     return 0 | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | def main(args=None, app=None): | ||||||
|  |     '''Main function of qvm-template-postprocess''' | ||||||
|  |     args = parser.parse_args(args, app=app) | ||||||
|  |     if not args.really: | ||||||
|  |         parser.error('Do not call this tool directly.') | ||||||
|  |     if args.action == 'post-install': | ||||||
|  |         return post_install(args) | ||||||
|  |     elif args.action == 'pre-remove': | ||||||
|  |         pre_remove(args) | ||||||
|  |     else: | ||||||
|  |         parser.error('Unknown action') | ||||||
|  |     return 0 | ||||||
|  | 
 | ||||||
|  | if __name__ == '__main__': | ||||||
|  |     sys.exit(main()) | ||||||
		Loading…
	
		Reference in New Issue
	
	Block a user
	 Marek Marczykowski-Górecki
						Marek Marczykowski-Górecki