diff --git a/qubesmanager/tests/test_backup.py b/qubesmanager/tests/test_backup.py new file mode 100644 index 0000000..45f8b3c --- /dev/null +++ b/qubesmanager/tests/test_backup.py @@ -0,0 +1,697 @@ +#!/usr/bin/python3 +# +# The Qubes OS Project, https://www.qubes-os.org/ +# +# Copyright (C) 2016 Marta Marczykowska-Górecka +# +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +# +import logging.handlers +import sys +import unittest +import unittest.mock + +from PyQt4 import QtGui, QtTest, QtCore +from qubesadmin import Qubes, events, utils, exc +from qubesmanager import backup +import quamash +import asyncio +import gc + + +class BackupTest(unittest.TestCase): + def setUp(self): + super(BackupTest, self).setUp() + + # mock up nonexistence of saved backup settings + self.patcher_open = unittest.mock.patch('builtins.open') + self.mock_open = self.patcher_open.start() + self.mock_open.side_effect = FileNotFoundError() + self.addCleanup(self.patcher_open.stop) + + # mock up the Backup Thread to avoid accidentally changing system state + self.patcher_thread = unittest.mock.patch( + 'qubesmanager.backup.BackupThread') + self.mock_thread = self.patcher_thread.start() + self.addCleanup(self.patcher_thread.stop) + + self.qapp = Qubes() + self.qtapp = QtGui.QApplication(["test", "-style", "cleanlooks"]) + + self.dispatcher = events.EventsDispatcher(self.qapp) + + self.loop = quamash.QEventLoop(self.qtapp) + + self.dialog = backup.BackupVMsWindow( + self.qtapp, self.qapp, self.dispatcher) + + self.dialog.show() + + def tearDown(self): + self.dialog.hide() + # process any pending events before destroying the object + self.qtapp.processEvents() + + # queue destroying the QApplication object, do that for any other QT + # related objects here too + self.qtapp.deleteLater() + self.dialog.deleteLater() + + # process any pending events (other than just queued destroy), + # just in case + self.qtapp.processEvents() + + # execute main loop, which will process all events, _ + # including just queued destroy_ + self.loop.run_until_complete(asyncio.sleep(0)) + + # at this point it QT objects are destroyed, cleanup all remaining + # references; + # del other QT object here too + self.loop.close() + del self.dialog + del self.qtapp + del self.loop + gc.collect() + super(BackupTest, self).tearDown() + + def test_00_window_loads(self): + self.assertTrue(self.dialog.select_vms_widget is not None) + + def test_01_vms_load_correctly(self): + all_vms = len([vm for vm in self.qapp.domains + if not vm.features.get('internal', False)]) + + selected_vms = self.dialog.select_vms_widget.selected_list.count() + available_vms = self.dialog.select_vms_widget.available_list.count() + + self.assertEqual(all_vms, available_vms + selected_vms) + + def test_02_correct_defaults(self): + # backup is compressed + self.assertTrue(self.dialog.compress_checkbox.isChecked(), + "Compress backup should be checked by default") + + # correct VMs are selected + include_in_backups_no = len([vm for vm in self.qapp.domains + if not vm.features.get('internal', False) + and getattr(vm, 'include_in_backups', True)]) + selected_no = self.dialog.select_vms_widget.selected_list.count() + self.assertEqual(include_in_backups_no, selected_no, + "Incorrect VMs selected by default") + + # passphrase is empty + self.assertEqual(self.dialog.passphrase_line_edit.text(), "", + "Passphrase should be empty") + + # save defaults + self.assertTrue(self.dialog.save_profile_checkbox.isChecked(), + "By default, profile should be saved") + + def test_03_select_vms_widget(self): + number_of_all_vms = len([vm for vm in self.qapp.domains + if not vm.features.get('internal', False)]) + + # select all + self.dialog.select_vms_widget.add_all_button.click() + self.assertEqual(number_of_all_vms, + self.dialog.select_vms_widget.selected_list.count(), + "Add All VMs does not work") + + # remove all + self.dialog.select_vms_widget.remove_all_button.click() + self.assertEqual(number_of_all_vms, + self.dialog.select_vms_widget.available_list.count(), + "Remove All VMs does not work") + + self._select_vm("work") + + self.assertEqual(self.dialog.select_vms_widget.selected_list.count(), + 1, "Select a single VM does not work") + + def test_04_open_directory(self): + self.dialog.next() + self.assertTrue(self.dialog.currentPage() + is self.dialog.select_dir_page) + + with unittest.mock.patch('qubesmanager.backup_utils.' + 'select_path_button_clicked') as mock_func: + self.dialog.select_path_button.click() + mock_func.assert_called_once_with(unittest.mock.ANY) + + def test_05_running_vms_listed(self): + self.dialog.next() + self.assertTrue(self.dialog.currentPage() + is self.dialog.select_dir_page) + + running_vms = [vm.name for vm in self.qapp.domains if vm.is_running()] + + listed_vms = [] + + for i in range(self.dialog.appvm_combobox.count()): + listed_vms.append(self.dialog.appvm_combobox.itemText(i)) + + self.assertListEqual(sorted(running_vms), sorted(listed_vms), + "Incorrect list of running vms") + + def test_06_passphrase_verification(self): + self.dialog.next() + self.assertTrue(self.dialog.currentPage() + is self.dialog.select_dir_page) + # required to check if next button is correctly enabled + self.dialog.dir_line_edit.setText("/home") + + next_button = self.dialog.button(self.dialog.NextButton) + + # check if next remains inactive for various incorrect + # passphrase/incorrect combinations + self.dialog.passphrase_line_edit.setText("pass") + self.dialog.passphrase_line_edit_verify.setText("fail") + self.assertFalse(next_button.isEnabled(), + "Mismatched passphrase/verification accepted") + + self.dialog.passphrase_line_edit.setText("pass") + self.dialog.passphrase_line_edit_verify.setText("") + self.assertFalse(next_button.isEnabled(), "Empty verification accepted") + + self.dialog.passphrase_line_edit.setText("") + self.dialog.passphrase_line_edit_verify.setText("fail") + self.assertFalse(next_button.isEnabled(), "Empty passphrase accepted") + + self.dialog.passphrase_line_edit.setText("") + self.dialog.passphrase_line_edit_verify.setText("") + self.assertFalse(next_button.isEnabled(), + "Empty passphrase and verification accepted") + + # check if next is active for a correct passphrase/verify + # combination + self.dialog.passphrase_line_edit.setText("pass") + self.dialog.passphrase_line_edit_verify.setText("pass") + self.assertTrue(next_button.isEnabled(), + "Matching passphrase/verification not accepted") + + def test_07_disk_space_correct(self): + for i in range(self.dialog.select_vms_widget.available_list.count()): + item = self.dialog.select_vms_widget.available_list.item(i) + if item.vm.name == "dom0" or item.vm.get_disk_utilization() > 0: + self.assertGreater( + item.size, 0, + "{} size incorrectly reported as 0".format(item.vm.name)) + + def test_08_total_size_correct(self): + # select nothing + self.dialog.select_vms_widget.remove_all_button.click() + self.assertEqual(self.dialog.total_size_label.text(), "0", + "Total size of 0 vms incorrectly reported as 0") + + current_size = 0 + # select a single VM + self._select_vm("sys-net") + + current_size += self.qapp.domains["sys-net"].get_disk_utilization() + self.assertEqual(self.dialog.total_size_label.text(), + utils.size_to_human(current_size), + "Size incorrectly listed for a single VM") + + # add two more + self._select_vm("sys-firewall") + self._select_vm("work") + + current_size += self.qapp.domains["sys-firewall"].get_disk_utilization() + current_size += self.qapp.domains["work"].get_disk_utilization() + + self.assertEqual(self.dialog.total_size_label.text(), + utils.size_to_human(current_size), + "Size incorrectly listed for several VMs") + + # remove one + self._deselect_vm("sys-net") + current_size -= self.qapp.domains["sys-net"].get_disk_utilization() + self.assertEqual(self.dialog.total_size_label.text(), + utils.size_to_human(current_size), + "Size incorrectly listed for several VMs") + + @unittest.mock.patch('qubesmanager.backup_utils.write_backup_profile') + @unittest.mock.patch('qubesadmin.Qubes.qubesd_call', + return_value=b'backup output') + def test_10_first_backup(self, mock_qubesd, mock_write_profile): + self.assertTrue(self.dialog.currentPage() + is self.dialog.select_vms_page) + + self.dialog.select_vms_widget.remove_all_button.click() + self._select_vm("work") + + self._click_next() + + self.assertTrue(self.dialog.currentPage() + is self.dialog.select_dir_page) + + # setup backup + self._select_location("dom0") + self.dialog.dir_line_edit.setText("/home") + self.dialog.passphrase_line_edit.setText("pass") + self.dialog.passphrase_line_edit_verify.setText("pass") + self.dialog.save_profile_checkbox.setChecked(True) + self.dialog.turn_off_checkbox.setChecked(False) + self.dialog.compress_checkbox.setChecked(False) + expected_settings = {'destination_vm': "dom0", + 'destination_path': "/home", + 'include': ["work"], + 'passphrase_text': "pass", + 'compression': False} + with unittest.mock.patch.object(self.dialog.textEdit, 'setText')\ + as mock_set_text: + self._click_next() + + # make sure the confirmation is not empty + self.assertTrue(self.dialog.currentPage() + is self.dialog.confirm_page) + + mock_write_profile.assert_called_with(expected_settings, True) + mock_qubesd.assert_called_with('dom0', 'admin.backup.Info', + unittest.mock.ANY) + mock_set_text.assert_called_once_with("backup output") + + # make sure the backup is executed + self._click_next() + self.mock_thread.assert_called_once_with(self.qapp.domains["dom0"]) + self.mock_thread().start.assert_called_once_with() + + @unittest.mock.patch('qubesmanager.backup_utils.write_backup_profile') + @unittest.mock.patch('qubesadmin.Qubes.qubesd_call', + return_value=b'backup output') + def test_11_second_backup(self, mock_qubesd, mock_write_profile): + self.assertTrue(self.dialog.currentPage() + is self.dialog.select_vms_page) + + self.dialog.select_vms_widget.remove_all_button.click() + self._select_vm("work") + self._select_vm("sys-net") + self._select_vm("dom0") + + self._click_next() + + self.assertTrue(self.dialog.currentPage() + is self.dialog.select_dir_page) + + # setup backup + self._select_location("sys-net") + self.dialog.dir_line_edit.setText("/home") + self.dialog.passphrase_line_edit.setText("longerPassPhrase") + self.dialog.passphrase_line_edit_verify.setText("longerPassPhrase") + self.dialog.save_profile_checkbox.setChecked(False) + self.dialog.turn_off_checkbox.setChecked(False) + self.dialog.compress_checkbox.setChecked(True) + expected_settings = {'destination_vm': "sys-net", + 'destination_path': "/home", + 'include': ["dom0", "sys-net", "work"], + 'passphrase_text': "longerPassPhrase", + 'compression': True} + with unittest.mock.patch.object(self.dialog.textEdit, 'setText')\ + as mock_set_text: + self._click_next() + + # make sure the confirmation is not empty + self.assertTrue(self.dialog.currentPage() + is self.dialog.confirm_page) + + mock_write_profile.assert_called_with(expected_settings, True) + mock_qubesd.assert_called_with('dom0', 'admin.backup.Info', + unittest.mock.ANY) + mock_set_text.assert_called_once_with("backup output") + + # make sure the backup is executed + self._click_next() + self.mock_thread.assert_called_once_with(self.qapp.domains["sys-net"]) + self.mock_thread().start.assert_called_once_with() + + @unittest.mock.patch('qubesmanager.backup_utils.load_backup_profile') + def test_20_loading_settings(self, mock_load): + + mock_load.return_value = { + 'destination_vm': "sys-net", + 'destination_path': "/home", + 'include': ["dom0", "sys-net", "work"], + 'passphrase_text': "longerPassPhrase", + 'compression': True + } + + self.dialog.hide() + self.dialog.deleteLater() + self.qtapp.processEvents() + + self.dialog = backup.BackupVMsWindow( + self.qtapp, self.qapp, self.dispatcher) + self.dialog.show() + + # check if settings were loaded + self.assertEqual(self.dialog.appvm_combobox.currentText(), "sys-net", + "Destination VM not loaded") + self.assertEqual(self.dialog.dir_line_edit.text(), "/home", + "Destination path not loaded") + self.assertEqual(self.dialog.passphrase_line_edit.text(), + "longerPassPhrase", "Passphrase not loaded") + self.assertEqual(self.dialog.passphrase_line_edit_verify.text(), + "longerPassPhrase", "Passphrase verify not loaded") + self.assertTrue(self.dialog.compress_checkbox.isChecked()) + + # check that 'include' vms were not pre-selected + include_in_backups_no = len( + [vm for vm in self.qapp.domains + if not vm.features.get('internal', False) + and getattr(vm, 'include_in_backups', True)]) + selected_no = self.dialog.select_vms_widget.selected_list.count() + self.assertEqual(include_in_backups_no, selected_no, + "Incorrect VM list selected") + + # check no errors were detected + self.assertFalse(self.dialog.unrecognized_config_label.isVisible()) + + @unittest.mock.patch('qubesmanager.backup_utils.load_backup_profile') + def test_21_loading_settings_error(self, mock_load): + + mock_load.return_value = { + 'destination_vm': "incorrect_vm", + } + + self.dialog.hide() + self.dialog.deleteLater() + self.qtapp.processEvents() + + self.dialog = backup.BackupVMsWindow( + self.qtapp, self.qapp, self.dispatcher) + self.dialog.show() + + # check errors were detected + self.assertTrue(self.dialog.unrecognized_config_label.isVisible()) + + @unittest.mock.patch('qubesmanager.backup_utils.load_backup_profile') + @unittest.mock.patch('PyQt4.QtGui.QMessageBox.information') + def test_22_loading_settings_exc(self, mock_info, mock_load): + + mock_load.side_effect = exc.QubesException('Error') + + self.dialog.hide() + self.dialog.deleteLater() + self.qtapp.processEvents() + + self.dialog = backup.BackupVMsWindow( + self.qtapp, self.qapp, self.dispatcher) + self.dialog.show() + + # check error was reported + self.assertEqual(mock_info.call_count, 1, "Warning not shown") + + @unittest.mock.patch('qubesmanager.backup_utils.write_backup_profile') + @unittest.mock.patch('qubesadmin.Qubes.qubesd_call', + return_value=b'backup output') + def test_23_cancel_confirm(self, *_args): + self._click_next() + self.assertTrue(self.dialog.currentPage() + is self.dialog.select_dir_page) + + self._select_location("dom0") + self.dialog.dir_line_edit.setText("/home") + self.dialog.passphrase_line_edit.setText("pass") + self.dialog.passphrase_line_edit_verify.setText("pass") + + self._click_next() + + # attempt to cancel + with unittest.mock.patch('os.remove') as mock_remove: + self._click_cancel() + mock_remove.assert_called_once_with( + '/etc/qubes/backup/qubes-manager-backup-tmp.conf') + + @unittest.mock.patch('PyQt4.QtGui.QMessageBox.warning') + @unittest.mock.patch('qubesmanager.backup_utils.write_backup_profile') + @unittest.mock.patch('qubesadmin.Qubes.qubesd_call', + return_value=b'backup output') + def test_24_cancel_in_progress(self, mock_call, *_args): + self._click_next() + self.assertTrue(self.dialog.currentPage() + is self.dialog.select_dir_page) + + self._select_location("dom0") + self.dialog.dir_line_edit.setText("/home") + self.dialog.passphrase_line_edit.setText("pass") + self.dialog.passphrase_line_edit_verify.setText("pass") + + self._click_next() + self._click_next() + + # attempt to cancel + with unittest.mock.patch('os.remove') as mock_remove: + self._click_cancel() + mock_call.assert_called_with('dom0', 'admin.backup.Cancel', + 'qubes-manager-backup-tmp') + mock_remove.assert_called_once_with( + '/etc/qubes/backup/qubes-manager-backup-tmp.conf') + + @unittest.mock.patch('PyQt4.QtGui.QMessageBox.warning') + @unittest.mock.patch('os.system') + @unittest.mock.patch('os.remove') + @unittest.mock.patch('qubesmanager.backup_utils.write_backup_profile') + @unittest.mock.patch('qubesadmin.Qubes.qubesd_call', + return_value=b'backup output') + def test_25_successful_backup(self, _a, _b, mock_remove, + mock_system, mock_warning): + self._click_next() + self.assertTrue(self.dialog.currentPage() + is self.dialog.select_dir_page) + + self._select_location("dom0") + self.dialog.dir_line_edit.setText("/home") + self.dialog.passphrase_line_edit.setText("pass") + self.dialog.passphrase_line_edit_verify.setText("pass") + self.dialog.turn_off_checkbox.setChecked(False) + + self._click_next() + self._click_next() + + # assume backup went correctly + self.mock_thread().msg = None + + self.mock_thread().finished.connect.assert_called_once_with( + self.dialog.backup_finished) + + self.dialog.backup_finished() + + self.assertFalse(self.dialog.button( + self.dialog.CancelButton).isEnabled()) + self.assertTrue(self.dialog.button( + self.dialog.FinishButton).isEnabled()) + mock_remove.assert_called_once_with( + '/etc/qubes/backup/qubes-manager-backup-tmp.conf') + self.assertEqual(mock_system.call_count, 0, + "System turned off unnecessarily") + self.assertEqual(mock_warning.call_count, 0, + "Backup succeeded but received warning") + + @unittest.mock.patch('PyQt4.QtGui.QMessageBox.warning') + @unittest.mock.patch('os.system') + @unittest.mock.patch('os.remove') + @unittest.mock.patch('qubesmanager.backup_utils.write_backup_profile') + @unittest.mock.patch('qubesadmin.Qubes.qubesd_call', + return_value=b'backup output') + def test_26_success_backup_poweroff( + self, _a, _b, mock_remove, mock_system, mock_warning): + self._click_next() + self.assertTrue(self.dialog.currentPage() + is self.dialog.select_dir_page) + + self._select_location("dom0") + self.dialog.dir_line_edit.setText("/home") + self.dialog.passphrase_line_edit.setText("pass") + self.dialog.passphrase_line_edit_verify.setText("pass") + self.dialog.turn_off_checkbox.setChecked(True) + + self._click_next() + + self._click_next() + + # assume backup went correctly + self.mock_thread().msg = None + self.mock_thread().finished.connect.assert_called_once_with( + self.dialog.backup_finished) + + self.dialog.backup_finished() + + self.assertFalse(self.dialog.button( + self.dialog.CancelButton).isEnabled()) + self.assertTrue(self.dialog.button( + self.dialog.FinishButton).isEnabled()) + mock_remove.assert_called_once_with( + '/etc/qubes/backup/qubes-manager-backup-tmp.conf') + mock_system.assert_called_once_with('systemctl poweroff') + self.assertEqual(mock_warning.call_count, 0, + "Backup succeeded but received warning") + + @unittest.mock.patch('PyQt4.QtGui.QMessageBox.warning') + @unittest.mock.patch('os.system') + @unittest.mock.patch('os.remove') + @unittest.mock.patch('qubesmanager.backup_utils.write_backup_profile') + @unittest.mock.patch('qubesadmin.Qubes.qubesd_call', + return_value=b'backup output') + def test_27_failed_backup( + self, _a, _b, mock_remove, mock_system, mock_warn): + self._click_next() + self.assertTrue(self.dialog.currentPage() + is self.dialog.select_dir_page) + + self._select_location("dom0") + self.dialog.dir_line_edit.setText("/home") + self.dialog.passphrase_line_edit.setText("pass") + self.dialog.passphrase_line_edit_verify.setText("pass") + self.dialog.turn_off_checkbox.setChecked(True) + + self._click_next() + self._click_next() + + # assume backup went wrong + self.mock_thread().msg = "Error" + self.mock_thread().finished.connect.assert_called_once_with( + self.dialog.backup_finished) + + self.dialog.backup_finished() + + self.assertFalse(self.dialog.button( + self.dialog.CancelButton).isEnabled()) + self.assertTrue(self.dialog.button( + self.dialog.FinishButton).isEnabled()) + mock_remove.assert_called_once_with( + '/etc/qubes/backup/qubes-manager-backup-tmp.conf') + self.assertEqual(mock_system.call_count, 0, + "Attempted shutdown at failed backup") + self.assertEqual(mock_warn.call_count, 1) + + @unittest.mock.patch('PyQt4.QtGui.QMessageBox.warning') + @unittest.mock.patch('os.system') + @unittest.mock.patch('os.remove') + @unittest.mock.patch('qubesmanager.backup_utils.write_backup_profile') + @unittest.mock.patch('qubesadmin.Qubes.qubesd_call', + return_value=b'backup output') + def test_28_progress( + self, _a, _b, mock_remove, mock_system, mock_warn): + self._click_next() + self.assertTrue(self.dialog.currentPage() + is self.dialog.select_dir_page) + + self._select_location("dom0") + self.dialog.dir_line_edit.setText("/home") + self.dialog.passphrase_line_edit.setText("pass") + self.dialog.passphrase_line_edit_verify.setText("pass") + self.dialog.turn_off_checkbox.setChecked(True) + + self._click_next() + self._click_next() + + # see if backup is correctly in progress + self.assertTrue(self.dialog.button( + self.dialog.CancelButton).isEnabled()) + self.assertFalse(self.dialog.button( + self.dialog.FinishButton).isEnabled()) + self.assertEqual(self.dialog.progress_bar.value(), 0, + "Progress bar does not start at 0") + + # this is not a perfect method, but it is something + + self.dialog.on_backup_progress(None, None, progress='23.3123') + self.assertEqual(self.dialog.progress_bar.value(), 23, + "Progress bar does not update correctly") + + self.dialog.on_backup_progress(None, None, progress='87.89') + self.assertEqual(self.dialog.progress_bar.value(), 87, + "Progress bar does not update correctly") + + def _select_location(self, vm_name): + widget = self.dialog.appvm_combobox + widget.setCurrentIndex(0) + while not widget.currentText() == vm_name: + if widget.currentIndex() == widget.count(): + self.skipTest("target VM not found") + widget.setCurrentIndex(widget.currentIndex() + 1) + + def _click_next(self): + next_widget = self.dialog.button(QtGui.QWizard.NextButton) + QtTest.QTest.mouseClick(next_widget, QtCore.Qt.LeftButton) + + def _click_cancel(self): + cancel_widget = self.dialog.button(QtGui.QWizard.CancelButton) + QtTest.QTest.mouseClick(cancel_widget, QtCore.Qt.LeftButton) + + def _select_vm(self, name_starts_with): + for i in range(self.dialog.select_vms_widget.available_list.count()): + item = self.dialog.select_vms_widget.available_list.item(i) + if item.text().startswith(name_starts_with): + item.setSelected(True) + self.dialog.select_vms_widget.add_selected_button.click() + return + + def _deselect_vm(self, name_starts_with): + for i in range(self.dialog.select_vms_widget.selected_list.count()): + item = self.dialog.select_vms_widget.selected_list.item(i) + if item.text().startswith(name_starts_with): + item.setSelected(True) + self.dialog.select_vms_widget.remove_selected_button.click() + return + + +class BackupThreadTest(unittest.TestCase): + def test_01_backup_thread_vm_on(self): + vm = unittest.mock.Mock(spec=['is_running', 'app'], + **{'is_running.return_value': True}) + + vm.app = unittest.mock.Mock() + + thread = backup.BackupThread(vm) + thread.run() + + vm.app.qubesd_call.assert_called_with( + 'dom0', 'admin.backup.Execute', 'qubes-manager-backup-tmp') + + def test_02_backup_thread_vm_off(self): + vm = unittest.mock.Mock(spec=['is_running', 'app', 'start'], + **{'is_running.return_value': False}) + + vm.app = unittest.mock.Mock() + + thread = backup.BackupThread(vm) + thread.run() + + vm.app.qubesd_call.assert_called_with( + 'dom0', 'admin.backup.Execute', 'qubes-manager-backup-tmp') + vm.start.assert_called_once_with() + + def test_03_backup_thread_error(self): + vm = unittest.mock.Mock(spec=['is_running', 'app'], + **{'is_running.return_value': True}) + + vm.app = unittest.mock.Mock() + vm.app.qubesd_call.side_effect = exc.QubesException('Error') + + thread = backup.BackupThread(vm) + thread.run() + + self.assertIsNotNone(thread.msg) + + +if __name__ == "__main__": + ha_syslog = logging.handlers.SysLogHandler('/dev/log') + ha_syslog.setFormatter( + logging.Formatter('%(name)s[%(process)d]: %(message)s')) + logging.root.addHandler(ha_syslog) + unittest.main() diff --git a/qubesmanager/tests/test_backup_01.py b/qubesmanager/tests/test_backup_01.py deleted file mode 100644 index 3470061..0000000 --- a/qubesmanager/tests/test_backup_01.py +++ /dev/null @@ -1,95 +0,0 @@ -#!/usr/bin/python3 -# -# The Qubes OS Project, https://www.qubes-os.org/ -# -# Copyright (C) 2016 Marta Marczykowska-Górecka -# -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. -# -import logging.handlers -import sys -import unittest -import unittest.mock - -from PyQt4 import QtGui, QtTest, QtCore -from qubesadmin import Qubes -import qubesmanager.backup as backup_gui - - -class BackupTest(unittest.TestCase): - def setUp(self): - super(BackupTest, self).setUp() - - # mock up nonexistence of saved backup settings - self.patcher = unittest.mock.patch('builtins.open') - self.mock_open = self.patcher.start() - self.mock_open.side_effect = FileNotFoundError() - self.addCleanup(self.patcher.stop) - - self.qapp = Qubes() - self.qtapp = QtGui.QApplication(sys.argv) - self.dialog = backup_gui.BackupVMsWindow(self.qtapp, self.qapp) - - def tearDown(self): - del self.dialog - del self.qtapp - del self.qapp - super(BackupTest, self).tearDown() - - def test_00_window_loads(self): - self.assertTrue(self.dialog.select_vms_widget is not None) - - def test_01_vms_load_correctly(self): - all_vms = len([vm for vm in self.qapp.domains - if not vm.features.get('internal', False)]) - - selected_vms = self.dialog.select_vms_widget.selected_list.count() - available_vms = self.dialog.select_vms_widget.available_list.count() - - self.assertEqual(all_vms, available_vms + selected_vms) - - def test_02_correct_defaults(self): - # backup is compressed - self.assertTrue(self.dialog.compress_checkbox.isChecked(), - "Compress backup should be checked by default") - - # correct VMs are selected - include_in_backups_no = len([vm for vm in self.qapp.domains - if not vm.features.get('internal', False) - and getattr(vm, 'include_in_backups', True)]) - selected_no = self.dialog.select_vms_widget.selected_list.count() - self.assertEqual(include_in_backups_no, selected_no, - "Incorrect VMs selected by default") - - # passphrase is empty - self.assertEqual(self.dialog.passphrase_line_edit.text(), "", - "Passphrase should be empty") - - # save defaults - self.assertTrue(self.dialog.save_profile_checkbox.isChecked(), - "By default, profile should be saved") - - # Check if target vms are selected - # Check if no default file loads correctly - another file?? - # TODO: make a separate backup testing file to test various backup defaults - - -if __name__ == "__main__": - ha_syslog = logging.handlers.SysLogHandler('/dev/log') - ha_syslog.setFormatter( - logging.Formatter('%(name)s[%(process)d]: %(message)s')) - logging.root.addHandler(ha_syslog) - unittest.main() diff --git a/qubesmanager/tests/test_backup_utils.py b/qubesmanager/tests/test_backup_utils.py new file mode 100644 index 0000000..13465e0 --- /dev/null +++ b/qubesmanager/tests/test_backup_utils.py @@ -0,0 +1,97 @@ +#!/usr/bin/python3 +# +# The Qubes OS Project, https://www.qubes-os.org/ +# +# Copyright (C) 2016 Marta Marczykowska-Górecka +# +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +# +import logging.handlers +import unittest.mock +import quamash +import asyncio +import gc +from PyQt4 import QtGui +from qubesadmin import Qubes + +from qubesmanager import backup_utils + + +class BackupUtilsTest(unittest.TestCase): + def setUp(self): + super(BackupUtilsTest, self).setUp() + self.qapp = Qubes() + self.qtapp = QtGui.QApplication(["test", "-style", "cleanlooks"]) + self.loop = quamash.QEventLoop(self.qtapp) + + def tearDown(self): + # process any pending events before destroying the object + self.qtapp.processEvents() + + # queue destroying the QApplication object, do that for any other QT + # related objects here too + self.qtapp.deleteLater() + + # process any pending events (other than just queued destroy), + # just in case + self.qtapp.processEvents() + + # execute main loop, which will process all events, _ + # including just queued destroy_ + self.loop.run_until_complete(asyncio.sleep(0)) + + # at this point it QT objects are destroyed, cleanup all remaining + # references; + # del other QT object here too + self.loop.close() + del self.qtapp + del self.loop + gc.collect() + super(BackupUtilsTest, self).tearDown() + + def test_01_fill_apvms(self): + dialog = QtGui.QDialog() + combobox = QtGui.QComboBox() + dialog.appvm_combobox = combobox + dialog.qubes_app = self.qapp + + backup_utils.fill_appvms_list(dialog) + + # see if the dialog has nothing selected + self.assertEqual(combobox.currentIndex(), 0, + "Incorrect item selected") + + # the combobox should contain running VMs that are not internal and + # not template + expected_vm_list = [vm.name for vm in self.qapp.domains + if vm.is_running() and vm.klass != 'TemplateVM' + and not getattr(vm, 'internal', False)] + received_vm_list = [] + for i in range(combobox.count()): + received_vm_list.append(combobox.itemText(i)) + + self.assertListEqual(sorted(expected_vm_list), sorted(received_vm_list), + "VM list not filled correctly") + + + + +if __name__ == "__main__": + ha_syslog = logging.handlers.SysLogHandler('/dev/log') + ha_syslog.setFormatter( + logging.Formatter('%(name)s[%(process)d]: %(message)s')) + logging.root.addHandler(ha_syslog) + unittest.main() diff --git a/qubesmanager/tests/test_create_new_vm.py b/qubesmanager/tests/test_create_new_vm.py new file mode 100644 index 0000000..878ee61 --- /dev/null +++ b/qubesmanager/tests/test_create_new_vm.py @@ -0,0 +1,318 @@ +#!/usr/bin/python3 +# +# The Qubes OS Project, https://www.qubes-os.org/ +# +# Copyright (C) 2016 Marta Marczykowska-Górecka +# +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +# +import logging.handlers +import quamash +import asyncio +import unittest +import unittest.mock +import qubesadmin +import gc + +from PyQt4 import QtGui, QtTest, QtCore +from qubesadmin import Qubes +from qubesmanager import create_new_vm + + +class NewVmTest(unittest.TestCase): + def setUp(self): + super(NewVmTest, self).setUp() + + self.qapp = Qubes() + self.qtapp = QtGui.QApplication(["test", "-style", "cleanlooks"]) + self.loop = quamash.QEventLoop(self.qtapp) + + # mock up the Create VM Thread to avoid changing system state + self.patcher_thread = unittest.mock.patch( + 'qubesmanager.create_new_vm.CreateVMThread') + self.mock_thread = self.patcher_thread.start() + self.addCleanup(self.patcher_thread.stop) + + # mock the progress dialog to speed testing up + self.patcher_progress = unittest.mock.patch( + 'PyQt4.QtGui.QProgressDialog') + self.mock_progress = self.patcher_progress.start() + self.addCleanup(self.patcher_progress.stop) + + self.dialog = create_new_vm.NewVmDlg( + self.qtapp, self.qapp) + + def tearDown(self): + # process any pending events before destroying the object + self.qtapp.processEvents() + + # queue destroying the QApplication object, do that for any other QT + # related objects here too + self.qtapp.deleteLater() + self.dialog.deleteLater() + + # process any pending events (other than just queued destroy), + # just in case + self.qtapp.processEvents() + + # execute main loop, which will process all events, _ + # including just queued destroy_ + self.loop.run_until_complete(asyncio.sleep(0)) + + # at this point it QT objects are destroyed, cleanup all remaining + # references; + # del other QT object here too + self.loop.close() + del self.dialog + del self.qtapp + del self.loop + gc.collect() + + super(NewVmTest, self).tearDown() + + def test_00_window_loads(self): + self.assertGreater(self.dialog.template_vm.count(), 0, + "No templates shown") + self.assertGreater(self.dialog.netvm.count(), 0, "No netvm listed") + + def test_01_cancel_works(self): + self.__click_cancel() + self.assertEqual(self.mock_thread.call_count, 0, + "Attempted to create VM on cancel") + + def test_02_create_simple_vm(self): + self.dialog.name.setText("testvm") + self.__click_ok() + + self.mock_thread.assert_called_once_with( + self.qapp, "AppVM", "testvm", + unittest.mock.ANY, qubesadmin.DEFAULT, + {'provides_network': False}) + self.mock_thread().start.assert_called_once_with() + + def test_03_label(self): + for i in range(self.dialog.label.count()): + if self.dialog.label.itemText(i) == 'blue': + self.dialog.label.setCurrentIndex(i) + break + + self.dialog.name.setText("testvm") + self.__click_ok() + + self.mock_thread.assert_called_once_with( + self.qapp, "AppVM", "testvm", + self.qapp.labels['blue'], qubesadmin.DEFAULT, + unittest.mock.ANY) + self.mock_thread().start.assert_called_once_with() + + def test_04_template(self): + template = None + for i in range(self.dialog.template_vm.count()): + if not self.dialog.template_vm.itemText(i).startswith('default'): + self.dialog.template_vm.setCurrentIndex(i) + template = self.dialog.template_vm.currentText() + break + + self.dialog.name.setText("testvm") + self.__click_ok() + + self.mock_thread.assert_called_once_with( + self.qapp, "AppVM", "testvm", + unittest.mock.ANY, template, + unittest.mock.ANY) + + def test_05_netvm(self): + netvm = None + for i in range(self.dialog.netvm.count()): + if not self.dialog.netvm.itemText(i).startswith('default'): + self.dialog.netvm.setCurrentIndex(i) + netvm = self.dialog.netvm.currentText() + break + + self.dialog.name.setText("testvm") + self.__click_ok() + + self.mock_thread.assert_called_once_with( + self.qapp, "AppVM", "testvm", + unittest.mock.ANY, unittest.mock.ANY, + {'netvm': netvm, 'provides_network': False}) + + def test_06_provides_network(self): + self.dialog.provides_network.setChecked(True) + + self.dialog.name.setText("testvm") + self.__click_ok() + + self.mock_thread.assert_called_once_with( + self.qapp, "AppVM", "testvm", + unittest.mock.ANY, unittest.mock.ANY, + {'provides_network': True}) + + @unittest.mock.patch('subprocess.check_call') + def test_07_launch_settings(self, mock_call): + self.dialog.launch_settings.setChecked(True) + + self.dialog.name.setText("testvm") + + self.__click_ok() + + # make sure the thread is not reporting an error + self.mock_thread.assert_called_once_with( + self.qapp, "AppVM", "testvm", + unittest.mock.ANY, unittest.mock.ANY, + unittest.mock.ANY) + + self.mock_thread().msg = None + self.dialog.create_finished() + + mock_call.assert_called_once_with(['qubes-vm-settings', "testvm"]) + + def test_08_progress_hides(self): + self.dialog.name.setText("testvm") + + self.__click_ok() + + self.mock_thread.assert_called_once_with( + self.qapp, "AppVM", "testvm", + unittest.mock.ANY, unittest.mock.ANY, + unittest.mock.ANY) + + # make sure the thread is not reporting an error + self.mock_thread().start.assert_called_once_with() + self.mock_thread().msg = None + + self.mock_progress().show.assert_called_once_with() + + self.dialog.create_finished() + + self.mock_progress().hide.assert_called_once_with() + + def test_09_standalone_clone(self): + self.dialog.name.setText("testvm") + for i in range(self.dialog.vm_type.count()): + opt_text = self.dialog.vm_type.itemText(i).lower() + if "standalone" in opt_text and "template" in opt_text and\ + "not based" not in opt_text and "empty" not in opt_text: + self.dialog.vm_type.setCurrentIndex(i) + break + + self.__click_ok() + self.mock_thread.assert_called_once_with( + self.qapp, "StandaloneVM", "testvm", + unittest.mock.ANY, unittest.mock.ANY, + unittest.mock.ANY) + + @unittest.mock.patch('subprocess.check_call') + def test_10_standalone_empty(self, mock_call): + self.dialog.name.setText("testvm") + for i in range(self.dialog.vm_type.count()): + opt_text = self.dialog.vm_type.itemText(i).lower() + if "standalone" in opt_text and\ + ("not based" in opt_text or "empty" in opt_text): + self.dialog.vm_type.setCurrentIndex(i) + break + + self.__click_ok() + self.mock_thread.assert_called_once_with( + self.qapp, "StandaloneVM", "testvm", + unittest.mock.ANY, None, + unittest.mock.ANY) + + self.mock_thread().msg = None + self.dialog.create_finished() + + mock_call.assert_called_once_with(['qubes-vm-boot-from-device', + 'testvm']) + + @unittest.mock.patch('subprocess.check_call') + def test_11_standalone_empty_not_install(self, mock_call): + self.dialog.name.setText("testvm") + + for i in range(self.dialog.vm_type.count()): + opt_text = self.dialog.vm_type.itemText(i).lower() + if "standalone" in opt_text and\ + ("not based" in opt_text or "empty" in opt_text): + self.dialog.vm_type.setCurrentIndex(i) + break + + self.dialog.install_system.setChecked(False) + + self.__click_ok() + self.mock_thread.assert_called_once_with( + self.qapp, "StandaloneVM", "testvm", + unittest.mock.ANY, None, + unittest.mock.ANY) + + self.mock_thread().msg = None + self.dialog.create_finished() + + self.assertEqual(mock_call.call_count, 0) + + def test_12_setting_change(self): + # cannot install system on a template-based appvm + for i in range(self.dialog.vm_type.count()): + opt_text = self.dialog.vm_type.itemText(i).lower() + if "appvm" in opt_text and "standalone" not in opt_text: + self.dialog.vm_type.setCurrentIndex(i) + break + self.assertFalse(self.dialog.install_system.isEnabled()) + self.assertTrue(self.dialog.launch_settings.isEnabled()) + self.assertTrue(self.dialog.template_vm.isEnabled()) + + # or on a standalone vm cloned from a template + for i in range(self.dialog.vm_type.count()): + opt_text = self.dialog.vm_type.itemText(i).lower() + if "standalone" in opt_text and "template" in opt_text and\ + "not based" not in opt_text and "empty" not in opt_text: + self.dialog.vm_type.setCurrentIndex(i) + break + self.assertFalse(self.dialog.install_system.isEnabled()) + self.assertTrue(self.dialog.launch_settings.isEnabled()) + self.assertTrue(self.dialog.template_vm.isEnabled()) + + # cannot set a template but can install system on a truly empty AppVM + for i in range(self.dialog.vm_type.count()): + opt_text = self.dialog.vm_type.itemText(i).lower() + if "standalone" in opt_text and\ + ("not based" in opt_text or "empty" in opt_text): + self.dialog.vm_type.setCurrentIndex(i) + break + self.assertTrue(self.dialog.install_system.isEnabled()) + self.assertTrue(self.dialog.launch_settings.isEnabled()) + self.assertFalse(self.dialog.template_vm.isEnabled()) + + def __click_ok(self): + okwidget = self.dialog.buttonBox.button( + self.dialog.buttonBox.Ok) + + QtTest.QTest.mouseClick(okwidget, QtCore.Qt.LeftButton) + + def __click_cancel(self): + cancelwidget = self.dialog.buttonBox.button( + self.dialog.buttonBox.Cancel) + + QtTest.QTest.mouseClick(cancelwidget, QtCore.Qt.LeftButton) + + +# class CreatteVMThreadTest(unittest.TestCase): + + +if __name__ == "__main__": + ha_syslog = logging.handlers.SysLogHandler('/dev/log') + ha_syslog.setFormatter( + logging.Formatter('%(name)s[%(process)d]: %(message)s')) + logging.root.addHandler(ha_syslog) + unittest.main() diff --git a/qubesmanager/tests/test_global_settings.py b/qubesmanager/tests/test_global_settings.py index dbccd5d..b9bc167 100644 --- a/qubesmanager/tests/test_global_settings.py +++ b/qubesmanager/tests/test_global_settings.py @@ -20,113 +20,144 @@ # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. # import logging.handlers -import sys +import quamash +import asyncio import unittest import unittest.mock +import gc from PyQt4 import QtGui, QtTest, QtCore from qubesadmin import Qubes -from qubes.tests import SystemTestCase import qubesmanager.global_settings as global_settings -import concurrent.futures - -# sudo systemctl stop qubesd; sudo -E python3 test_backup.py -v ; sudo systemctl start qubesd - -def wrap_in_loop(func): - def wrapped(self): - self.loop.run_until_complete( - self.loop.run_in_executor(self.executor, - func, self)) - return wrapped -class GlobalSettingsTest(SystemTestCase): +class GlobalSettingsTest(unittest.TestCase): def setUp(self): super(GlobalSettingsTest, self).setUp() - self.qtapp = QtGui.QApplication(sys.argv) - self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=1) - self.setUpInExecutor() - - @wrap_in_loop - def setUpInExecutor(self): self.qapp = Qubes() - self.dialog = global_settings.GlobalSettingsWindow( - self.qtapp, self.qapp) + self.qtapp = QtGui.QApplication(["test", "-style", "cleanlooks"]) + self.loop = quamash.QEventLoop(self.qtapp) + self.dialog = global_settings.GlobalSettingsWindow(self.qtapp, + self.qapp) + + self.setattr_patcher = unittest.mock.patch.object( + type(self.dialog.qvm_collection), "__setattr__") + self.setattr_mock = self.setattr_patcher.start() + self.addCleanup(self.setattr_patcher.stop) def tearDown(self): - self.tearDownInExecutor() - super(GlobalSettingsTest, self).tearDown() + # process any pending events before destroying the object + self.qtapp.processEvents() - @wrap_in_loop - def tearDownInExecutor(self): + # queue destroying the QApplication object, do that for any other QT + # related objects here too + self.qtapp.deleteLater() + self.dialog.deleteLater() + + # process any pending events (other than just queued destroy), + # just in case + self.qtapp.processEvents() + + # execute main loop, which will process all events, _ + # including just queued destroy_ + self.loop.run_until_complete(asyncio.sleep(0)) + + # at this point it QT objects are destroyed, cleanup all remaining + # references; + # del other QT object here too + self.loop.close() del self.dialog del self.qtapp + del self.loop + gc.collect() + super(GlobalSettingsTest, self).tearDown() - @wrap_in_loop def test_00_settings_started(self): # non-empty drop-downs self.assertNotEqual( - self.dialog.default_kernel_combo.currentText(), "") + self.dialog.default_kernel_combo.currentText(), "", + "Default kernel not listed") self.assertNotEqual( - self.dialog.default_netvm_combo.currentText(), "") + self.dialog.default_netvm_combo.currentText(), "", + "Default netVM not listed") self.assertNotEqual( self.dialog.default_template_combo.currentText(), - "") + "", "Default template not listed") self.assertNotEqual( - self.dialog.clock_vm_combo.currentText(), "") + self.dialog.clock_vm_combo.currentText(), "", + "ClockVM not listed") self.assertNotEqual( - self.dialog.update_vm_combo.currentText(), "") + self.dialog.update_vm_combo.currentText(), "", + "UpdateVM for dom0 not listed") + self.assertNotEqual( + self.dialog.default_dispvm_combo.currentText(), "", + "Default DispVM not listed") + + # not empty memory settings + self.assertTrue(len(self.dialog.min_vm_mem.text()) > 4, + "Too short min mem value") + self.assertTrue(len(self.dialog.dom0_mem_boost.text()) > 4, + "Too short dom0 mem boost value") - @wrap_in_loop def test_01_load_correct_defs(self): # correctly selected default template selected_default_template = \ self.dialog.default_template_combo.currentText() self.assertTrue( selected_default_template.startswith( - self.app.default_template.name)) + str(getattr(self.qapp, 'default_template', '(none)'))), + "Incorrect default template loaded") # correctly selected default NetVM - selected_default_netvm = \ - self.dialog.default_netvm_combo.currentText() + selected_default_netvm = self.dialog.default_netvm_combo.currentText() self.assertTrue(selected_default_netvm.startswith( - self.app.default_netvm.name)) + str(getattr(self.qapp, 'default_netvm', '(none)'))), + "Incorrect default netVM loaded") # correctly selected default kernel - selected_default_kernel = \ - self.dialog.default_kernel_combo.currentText() + selected_default_kernel = self.dialog.default_kernel_combo.currentText() self.assertTrue(selected_default_kernel.startswith( - self.app.default_kernel)) + str(getattr(self.qapp, 'default_kernel', '(none)'))), + "Incorrect default kernel loaded") # correct ClockVM - selected_clockvm = \ - self.dialog.clock_vm_combo.currentText() - correct_clockvm = self.app.clockvm.name if self.app.clockvm \ - else "(none)" - self.assertTrue(selected_clockvm.startswith(correct_clockvm)) + selected_clockvm = self.dialog.clock_vm_combo.currentText() + correct_clockvm = str(getattr(self.qapp, 'clockvm', "(none)")) + self.assertTrue(selected_clockvm.startswith(correct_clockvm), + "Incorrect clockVM loaded") # correct updateVM - selected_updatevm = \ - self.dialog.update_vm_combo.currentText() - correct_updatevm = \ - self.app.updatevm.name if self.app.updatevm else "(none)" - self.assertTrue(selected_updatevm.startswith(correct_updatevm)) + selected_updatevm = self.dialog.update_vm_combo.currentText() + correct_updatevm = str(getattr(self.qapp, 'updatevm', "(none)")) + self.assertTrue(selected_updatevm.startswith(correct_updatevm), + "Incorrect updateVm loaded") + + # correct defaultDispVM + selected_default_dispvm = self.dialog.default_dispvm_combo.currentText() + correct_default_dispvm = \ + str(getattr(self.qapp, 'default_dispvm', "(none)")) + self.assertTrue( + selected_default_dispvm.startswith(correct_default_dispvm), + "Incorrect defaultDispVM loaded") # update vm status - self.assertEqual(self.app.check_updates_vm, - self.dialog.updates_vm.isChecked()) + self.assertEqual(self.qapp.check_updates_vm, + self.dialog.updates_vm.isChecked(), + "Incorrect check qube updates value loaded") - @wrap_in_loop def test_02_dom0_updates_load(self): # check dom0 updates try: - dom0_updates = self.app.check_updates_dom0 - except AttributeError: + dom0_updates = self.qapp.domains[ + 'dom0'].features['service.qubes-update-check'] + except KeyError: self.skipTest("check_updates_dom0 property not implemented") return - self.assertEqual(dom0_updates, self.dialog.updates_dom0.isChecked()) + self.assertEqual(bool(dom0_updates), + self.dialog.updates_dom0.isChecked(), + "Incorrect dom0 updates value") def __set_noncurrent(self, widget): if widget.count() < 2: @@ -150,178 +181,193 @@ class GlobalSettingsTest(SystemTestCase): okwidget = self.dialog.buttonBox.button( self.dialog.buttonBox.Ok) - QtTest.QTest.mouseClick(okwidget, - QtCore.Qt.LeftButton) + QtTest.QTest.mouseClick(okwidget, QtCore.Qt.LeftButton) + + def __click_cancel(self): + cancelwidget = self.dialog.buttonBox.button( + self.dialog.buttonBox.Cancel) + + QtTest.QTest.mouseClick(cancelwidget, QtCore.Qt.LeftButton) + + def test_03_nothing_changed_ok(self): + self.__click_ok() + + self.assertEqual(self.setattr_mock.call_count, 0, + "Changes occurred despite no changes being made") + + def test_04_nothing_changed_cancel(self): + self.__click_cancel() + + self.assertEqual(self.setattr_mock.call_count, 0, + "Changes occurred despite no changes being made") - @wrap_in_loop def test_10_set_update_vm(self): new_updatevm_name = self.__set_noncurrent(self.dialog.update_vm_combo) + self.__click_ok() - self.assertEqual(self.app.updatevm.name, new_updatevm_name) + self.setattr_mock.assert_called_once_with('updatevm', new_updatevm_name) - @wrap_in_loop def test_11_set_update_vm_to_none(self): self.__set_none(self.dialog.update_vm_combo) + self.__click_ok() - self.assertIsNone(self.app.updatevm) + self.setattr_mock.assert_called_once_with('updatevm', None) - @wrap_in_loop - def test_12_set_update_vm_to_none2(self): - self.app.updatevm = None + def test_20_set_clock_vm(self): + new_clockvm_name = self.__set_noncurrent(self.dialog.clock_vm_combo) + + self.__click_ok() + + self.setattr_mock.assert_called_once_with('clockvm', new_clockvm_name) + + def test_21_set_clock_vm_to_none(self): + self.__set_none(self.dialog.clock_vm_combo) + + self.__click_ok() + + self.setattr_mock.assert_called_once_with('clockvm', None) + + def test_30_set_default_netvm(self): + new_netvm_name = self.__set_noncurrent(self.dialog.default_netvm_combo) + + self.__click_ok() + + self.setattr_mock.assert_called_once_with('default_netvm', + new_netvm_name) + + def test_31_set_default_netvm_to_none(self): + self.__set_none(self.dialog.default_netvm_combo) + + self.__click_ok() + + self.setattr_mock.assert_called_once_with('default_netvm', None) + + def test_40_set_default_template(self): + new_def_template_name = self.__set_noncurrent( + self.dialog.default_template_combo) + + self.__click_ok() + + self.setattr_mock.assert_called_once_with('default_template', + new_def_template_name) + + def test_50_set_default_kernel(self): + new_def_kernel_name = self.__set_noncurrent( + self.dialog.default_kernel_combo) + + self.__click_ok() + + self.setattr_mock.assert_called_once_with('default_kernel', + new_def_kernel_name) + + def test_51_set_default_kernel_to_none(self): + self.__set_none(self.dialog.default_kernel_combo) + + self.__click_ok() + + self.setattr_mock.assert_called_once_with('default_kernel', + None) + + def test_60_set_dom0_updates_true(self): + current_state = self.dialog.updates_dom0.isChecked() + self.dialog.updates_dom0.setChecked(not current_state) + + with unittest.mock.patch.object( + type(self.dialog.qvm_collection.domains['dom0'].features), + '__setitem__') as mock_features: + self.__click_ok() + mock_features.assert_called_once_with('service.qubes-update-check', + not current_state) + + def test_70_change_vm_updates(self): + current_state = self.dialog.updates_vm.isChecked() + self.dialog.updates_vm.setChecked(not current_state) + + self.__click_ok() + + self.setattr_mock.assert_called_once_with('check_updates_vm', + not current_state) + + @unittest.mock.patch("PyQt4.QtGui.QMessageBox.question", + return_value=QtGui.QMessageBox.Yes) + @unittest.mock.patch('qubesadmin.features.Features.__setitem__') + def test_72_set_all_vms_true(self, mock_features, msgbox): + + QtTest.QTest.mouseClick(self.dialog.enable_updates_all, + QtCore.Qt.LeftButton) + + self.assertEqual(msgbox.call_count, 1, + "Wrong number of confirmation window calls") + + call_list_expected = \ + [unittest.mock.call('service.qubes-update-check', True) for vm + in self.qapp.domains if vm.klass != 'AdminVM'] + + self.assertListEqual(call_list_expected, + mock_features.call_args_list) + + @unittest.mock.patch("PyQt4.QtGui.QMessageBox.question", + return_value=QtGui.QMessageBox.Yes) + @unittest.mock.patch('qubesadmin.features.Features.__setitem__') + def test_73_set_all_vms_false(self, mock_features, msgbox): + + QtTest.QTest.mouseClick(self.dialog.disable_updates_all, + QtCore.Qt.LeftButton) + + self.assertEqual(msgbox.call_count, 1, + "Wrong number of confirmation window calls") + + call_list_expected = \ + [unittest.mock.call('service.qubes-update-check', False) for vm + in self.qapp.domains if vm.klass != 'AdminVM'] + + self.assertListEqual(call_list_expected, + mock_features.call_args_list) + + def test_80_set_default_dispvm(self): + new_dispvm_name = self.__set_noncurrent( + self.dialog.default_dispvm_combo) + + self.__click_ok() + + self.setattr_mock.assert_called_once_with('default_dispvm', + new_dispvm_name) + + def test_81_set_default_dispvm_to_none(self): + self.__set_none(self.dialog.default_dispvm_combo) + + self.__click_ok() + + self.setattr_mock.assert_called_once_with('default_dispvm', None) + + @unittest.mock.patch.object( + type(Qubes()), '__getattr__', + side_effect=(lambda x: False if x == 'check_updates_vm' else None)) + def test_90_test_all_set_none(self, mock_qubes): + mock_qubes.configure_mock() self.dialog = global_settings.GlobalSettingsWindow( self.qtapp, self.qapp) self.assertEqual(self.dialog.update_vm_combo.currentText(), - "(none) (current)") - - @wrap_in_loop - def test_20_set_clock_vm(self): - new_clockvm_name = self.__set_noncurrent(self.dialog.clock_vm_combo) - self.__click_ok() - - self.assertEqual(self.app.clockvm.name, new_clockvm_name) - - @wrap_in_loop - def test_21_set_clock_vm_to_none(self): - self.__set_none(self.dialog.clock_vm_combo) - self.__click_ok() - - self.assertIsNone(self.app.clockvm) - - @wrap_in_loop - def test_22_set_clock_vm_to_none2(self): - self.app.clockvm = None - self.dialog = global_settings.GlobalSettingsWindow( - self.qtapp, self.qapp) - + "(none) (current)", + "UpdateVM displays as none incorrectly") self.assertEqual(self.dialog.clock_vm_combo.currentText(), - "(none) (current)") - - @wrap_in_loop - def test_30_set_default_netvm(self): - new_netvm_name = self.__set_noncurrent(self.dialog.default_netvm_combo) - self.__click_ok() - - self.assertEqual(self.app.default_netvm.name, new_netvm_name) - - @wrap_in_loop - def test_31_set_default_netvm_to_none(self): - self.__set_none(self.dialog.default_netvm_combo) - self.__click_ok() - - self.assertIsNone(self.app.default_netvm) - - @wrap_in_loop - def test_32_set_default_netvm_to_none2(self): - self.app.default_netvm = None - self.dialog = global_settings.GlobalSettingsWindow( - self.qtapp, self.qapp) - + "(none) (current)", + "ClockVM displays as none incorrectly") self.assertEqual(self.dialog.default_netvm_combo.currentText(), - "(none) (current)") - - @wrap_in_loop - def test_40_set_default_template(self): - new_def_template_name = self.__set_noncurrent( - self.dialog.default_template_combo) - self.__click_ok() - - self.assertEqual(self.app.default_template.name, new_def_template_name) - - @wrap_in_loop - def test_50_set_default_kernel(self): - new_def_kernel_name = self.__set_noncurrent( - self.dialog.default_kernel_combo) - self.__click_ok() - - self.assertEqual(self.app.default_kernel, new_def_kernel_name) - - @wrap_in_loop - def test_51_set_default_kernel_to_none(self): - self.__set_none(self.dialog.default_kernel_combo) - self.__click_ok() - - self.assertEqual(self.app.default_kernel, '') - - @wrap_in_loop - def test_52_set_default_kernel_to_none2(self): - self.app.default_kernel = None - self.dialog = global_settings.GlobalSettingsWindow( - self.qtapp, self.qapp) - + "(none) (current)", + "Default NetVM displays as none incorrectly") + self.assertEqual(self.dialog.default_template_combo.currentText(), + "(none) (current)", + "Default template displays as none incorrectly") self.assertEqual(self.dialog.default_kernel_combo.currentText(), - "(none) (current)") - - @wrap_in_loop - def test_60_set_dom0_updates_true(self): - self.dialog.updates_dom0.setChecked(True) - self.__click_ok() - - if not hasattr(self.app, 'check_updates_dom0'): - self.skipTest("check_updates_dom0 property not implemented") - - self.assertTrue(self.app.check_updates_dom0) - - @wrap_in_loop - def test_61_set_dom0_updates_false(self): - self.dialog.updates_dom0.setChecked(False) - self.__click_ok() - - if not hasattr(self.app, 'check_updates_dom0'): - self.skipTest("check_updates_dom0 property not implemented") - - self.assertFalse(self.app.check_updates_dom0) - - @wrap_in_loop - def test_70_set_vm_updates_true(self): - self.dialog.updates_vm.setChecked(True) - self.__click_ok() - - self.assertTrue(self.app.check_updates_vm) - - @wrap_in_loop - def test_71_set_vm_updates_false(self): - self.dialog.updates_vm.setChecked(False) - self.__click_ok() - - self.assertFalse(self.app.check_updates_vm) - - @wrap_in_loop - def test_72_set_all_vms_true(self): - - with unittest.mock.patch("PyQt4.QtGui.QMessageBox.question", - return_value=QtGui.QMessageBox.Yes) as msgbox: - - QtTest.QTest.mouseClick(self.dialog.enable_updates_all, - QtCore.Qt.LeftButton) - - msgbox.assert_called_once_with( - self.dialog, - "Change state of all qubes", - "Are you sure you want to set all qubes to check for updates?", - unittest.mock.ANY) - - for vm in self.app.domains: - self.assertTrue(vm.features['check-updates']) - - @wrap_in_loop - def test_73_set_all_vms_false(self): - with unittest.mock.patch("PyQt4.QtGui.QMessageBox.question", - return_value=QtGui.QMessageBox.Yes) as msgbox: - QtTest.QTest.mouseClick(self.dialog.disable_updates_all, - QtCore.Qt.LeftButton) - - msgbox.assert_called_once_with( - self.dialog, - "Change state of all qubes", - "Are you sure you want to set all qubes to not check " - "for updates?", - unittest.mock.ANY) - - for vm in self.app.domains: - self.assertFalse(vm.features['check-updates']) + "(none) (current)", + "Defautl kernel displays as none incorrectly") + self.assertEqual(self.dialog.default_dispvm_combo.currentText(), + "(none) (current)", + "Default DispVM displays as none incorrectly") if __name__ == "__main__": @@ -330,3 +376,5 @@ if __name__ == "__main__": logging.Formatter('%(name)s[%(process)d]: %(message)s')) logging.root.addHandler(ha_syslog) unittest.main() + +# TODO: add tests for memory settings once memory is handled better diff --git a/qubesmanager/tests/test_qube_manager.py b/qubesmanager/tests/test_qube_manager.py index 46cd16c..03bca2b 100644 --- a/qubesmanager/tests/test_qube_manager.py +++ b/qubesmanager/tests/test_qube_manager.py @@ -19,14 +19,21 @@ # with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. # +import asyncio +import contextlib import logging.handlers import sys import unittest import unittest.mock -import gc +import gc +import subprocess +import datetime +import time + +import quamash from PyQt4 import QtGui, QtTest, QtCore -from qubesadmin import Qubes +from qubesadmin import Qubes, events, exc import qubesmanager.qube_manager as qube_manager @@ -34,36 +41,62 @@ class QubeManagerTest(unittest.TestCase): def setUp(self): super(QubeManagerTest, self).setUp() - # # todo: mockup no settings file - # self.patcher = unittest.mock.patch('builtins.open') - # self.mock_open = self.patcher.start() - # self.mock_open.side_effect = FileNotFoundError() - # self.addCleanup(self.patcher.stop) + self.mock_qprogress = unittest.mock.patch('PyQt4.QtGui.QProgressDialog') + self.mock_qprogress.start() + + self.addCleanup(self.mock_qprogress.stop) self.qapp = Qubes() - self.qtapp = QtGui.QApplication(sys.argv) - self.dialog = qube_manager.VmManagerWindow(self.qtapp, self.qapp) + self.qtapp = QtGui.QApplication(["test", "-style", "cleanlooks"]) + self.dispatcher = events.EventsDispatcher(self.qapp) + + self.loop = quamash.QEventLoop(self.qtapp) + + self.dialog = qube_manager.VmManagerWindow( + self.qtapp, self.qapp, self.dispatcher) def tearDown(self): + # process any pending events before destroying the object + self.qtapp.processEvents() + + # queue destroying the QApplication object, do that for any other QT + # related objects here too + self.qtapp.deleteLater() + self.dialog.deleteLater() + + # process any pending events (other than just queued destroy), + # just in case + self.qtapp.processEvents() + + # execute main loop, which will process all events, _ + # including just queued destroy_ + self.loop.run_until_complete(asyncio.sleep(0)) + + # at this point it QT objects are destroyed, cleanup all remaining + # references; + # del other QT object here too + self.loop.close() del self.dialog del self.qtapp - del self.qapp - super(QubeManagerTest, self).tearDown() + del self.loop gc.collect() + super(QubeManagerTest, self).tearDown() - # 0 - Check if the window was displayed and populated correctly + def test_000_window_loads(self): + self.assertTrue(self.dialog.table is not None, "Window did not load") - def test_00_window_loads(self): - self.assertTrue(self.dialog.table is not None) - - @unittest.expectedFailure - def test_01_table_populates_correctly(self): + def test_001_correct_vms_listed(self): vms_in_table = [] + for row in range(self.dialog.table.rowCount()): - item = self.dialog.table.item(row, - self.dialog.columns_indices["Name"]) - self.assertIsNotNone(item) - vms_in_table.append(item.text()) + vm = self._get_table_item(row, "Name").vm + self.assertIsNotNone(vm) + vms_in_table.append(vm.name) + + # check that name is listed correctly + name_item = self._get_table_item(row, "Name") + self.assertEqual(name_item.text(), vm.name, + "Incorrect VM name for {}".format(vm.name)) actual_vms = [vm.name for vm in self.qapp.domains] @@ -71,44 +104,1391 @@ class QubeManagerTest(unittest.TestCase): "Incorrect number of VMs loaded") self.assertListEqual(sorted(vms_in_table), sorted(actual_vms), "Incorrect VMs loaded") -# todos: - # did settings load correctly - # did settings save corectly + + def test_002_correct_template_listed(self): + for row in range(self.dialog.table.rowCount()): + vm = self._get_table_item(row, "Name").vm + # check that template is listed correctly + template_item = self._get_table_item(row, "Template") + if getattr(vm, "template", None): + self.assertEqual(vm.template, + template_item.text(), + "Incorrect template for {}".format(vm.name)) + else: + self.assertEqual(vm.klass, template_item.text(), + "Incorrect class for {}".format(vm.name)) + + def test_003_correct_netvm_listed(self): + for row in range(self.dialog.table.rowCount()): + vm = self._get_table_item(row, "Name").vm + + # check that netvm is listed correctly + netvm_item = self._get_table_item(row, "NetVM") + netvm_value = getattr(vm, "netvm", None) + netvm_value = "n/a" if not netvm_value else netvm_value + if netvm_value and hasattr(vm, "netvm") \ + and vm.property_is_default("netvm"): + netvm_value = "default ({})".format(netvm_value) + + self.assertEqual(netvm_value, + netvm_item.text(), + "Incorrect netvm for {}".format(vm.name)) + + def test_004_correct_disk_usage_listed(self): + for row in range(self.dialog.table.rowCount()): + vm = self._get_table_item(row, "Name").vm + + size_item = self._get_table_item(row, "Size") + if vm.klass == 'AdminVM': + size_value = "n/a" + else: + size_value = round(vm.get_disk_utilization() / (1024 * 1024), 2) + size_value = str(size_value) + " MiB" + + self.assertEqual(size_value, + size_item.text(), + "Incorrect size for {}".format(vm.name)) + + def test_005_correct_internal_listed(self): + for row in range(self.dialog.table.rowCount()): + vm = self._get_table_item(row, "Name").vm + + internal_item = self._get_table_item(row, "Internal") + internal_value = "Yes" if vm.features.get('internal', False) else "" + + self.assertEqual(internal_item.text(), internal_value, + "Incorrect internal value for {}".format(vm.name)) + + def test_006_correct_ip_listed(self): + for row in range(self.dialog.table.rowCount()): + vm = self._get_table_item(row, "Name").vm + + ip_item = self._get_table_item(row, "IP") + if hasattr(vm, 'ip'): + ip_value = getattr(vm, 'ip') + ip_value = "" if ip_value is None else ip_value + else: + ip_value = "n/a" + + self.assertEqual(ip_value, ip_item.text(), + "Incorrect ip value for {}".format(vm.name)) + + def test_007_incl_in_backups_listed(self): + for row in range(self.dialog.table.rowCount()): + vm = self._get_table_item(row, "Name").vm + + incl_backups_item = self._get_table_item(row, "Include in backups") + incl_backups_value = getattr(vm, 'include_in_backups', False) + incl_backups_value = "Yes" if incl_backups_value else "" + + self.assertEqual( + incl_backups_value, incl_backups_item.text(), + "Incorrect include in backups value for {}".format(vm.name)) + + def test_008_last_backup_listed(self): + for row in range(self.dialog.table.rowCount()): + vm = self._get_table_item(row, "Name").vm + + last_backup_item = self._get_table_item(row, "Last backup") + last_backup_value = getattr(vm, 'backup_timestamp', None) + + if last_backup_value: + last_backup_value = str( + datetime.datetime.fromtimestamp(last_backup_value)) + else: + last_backup_value = "" + + self.assertEqual( + last_backup_value, last_backup_item.text(), + "Incorrect last backup value for {}".format(vm.name)) + + def test_009_def_dispvm_listed(self): + for row in range(self.dialog.table.rowCount()): + vm = self._get_table_item(row, "Name").vm + + def_dispvm_item = self._get_table_item(row, "Default DispVM") + def_dispvm_value = getattr(vm, "default_dispvm", None) + + self.assertEqual( + str(def_dispvm_value), def_dispvm_item.text(), + "Incorrect default dispvm value for {}".format(vm.name)) + + def test_010_is_dvm_template_listed(self): + for row in range(self.dialog.table.rowCount()): + vm = self._get_table_item(row, "Name").vm + + is_dvm_template_item = self._get_table_item(row, "Is DVM Template") + is_dvm_template_value = "Yes" if \ + getattr(vm, "template_for_dispvms", False) else "" + + self.assertEqual( + is_dvm_template_value, is_dvm_template_item.text(), + "Incorrect is DVM template value for {}".format(vm.name)) + + def test_011_is_label_correct(self): + for row in range(self.dialog.table.rowCount()): + vm = self._get_table_item(row, "Name").vm + + label_item = self._get_table_item(row, "Label") + + self.assertEqual(label_item.icon_path, vm.label.icon) + + def test_012_is_state_correct(self): + for row in range(self.dialog.table.rowCount()): + vm = self._get_table_item(row, "Name").vm + + state_item = self._get_table_item(row, "State") + + # this should not be done like that in table_widgets + displayed_power_state = state_item.on_icon.status + + if vm.is_running(): + correct_power_state = 3 + else: + correct_power_state = 0 + + self.assertEqual( + displayed_power_state, correct_power_state, + "Wrong power state displayed for {}".format(vm.name)) + + def test_013_incorrect_settings_file(self): + mock_settings = unittest.mock.MagicMock(spec=QtCore.QSettings) + + settings_result_dict = {"view/sort_column": "Cthulhu", + "view/sort_order": "Fhtagn", + "view/menubar_visible": "R'lyeh" + } + + mock_settings.side_effect = ( + lambda x, *args, **kwargs: settings_result_dict.get(x)) + + with unittest.mock.patch('PyQt4.QtCore.QSettings.value', mock_settings),\ + unittest.mock.patch('PyQt4.QtGui.QMessageBox.warning')\ + as mock_warning: + self.dialog = qube_manager.VmManagerWindow( + self.qtapp, self.qapp, self.dispatcher) + self.assertEqual(mock_warning.call_count, 1) + + def test_100_sorting(self): + + self.dialog.table.sortByColumn(self.dialog.columns_indices["Template"]) + self.__check_sorting("Template") + + self.dialog.table.sortByColumn(self.dialog.columns_indices["Name"]) + self.__check_sorting("Name") + + @unittest.mock.patch('qubesmanager.qube_manager.QtCore.QSettings.setValue') + @unittest.mock.patch('qubesmanager.qube_manager.QtCore.QSettings.sync') + def test_101_hide_column(self, mock_sync, mock_settings): + self.dialog.action_is_dvm_template.trigger() + mock_settings.assert_called_with('columns/Is DVM Template', False) + self.assertEqual(mock_sync.call_count, 1, "Hidden column not synced") + + self.dialog.action_is_dvm_template.trigger() + mock_settings.assert_called_with('columns/Is DVM Template', True) + self.assertEqual(mock_sync.call_count, 2, "Hidden column not synced") @unittest.mock.patch('qubesmanager.settings.VMSettingsWindow') - def test_20_vm_open_settings(self, mock_window): + def test_200_vm_open_settings(self, mock_window): selected_vm = self._select_non_admin_vm() self.assertIsNotNone(selected_vm, "No valid non-admin VM found") widget = self.dialog.toolbar.widgetForAction( self.dialog.action_settings) QtTest.QTest.mouseClick(widget, QtCore.Qt.LeftButton) - mock_window.assert_called_once_with(selected_vm, self.qtapp, "basic") + mock_window.assert_called_once_with( + selected_vm, self.qtapp, "basic") + + def test_201_vm_open_settings_admin(self): + self._select_admin_vm() + + self.assertFalse(self.dialog.action_settings.isEnabled(), + "Settings not disabled for admin VM") + self.assertFalse(self.dialog.action_editfwrules.isEnabled(), + "Settings not disabled for admin VM") + self.assertFalse(self.dialog.action_appmenus.isEnabled(), + "Settings not disabled for admin VM") @unittest.mock.patch('qubesmanager.settings.VMSettingsWindow') - def test_21_vm_firewall_settings(self, mock_window): + def test_202_vm_open_firewall(self, mock_window): selected_vm = self._select_non_admin_vm() self.assertIsNotNone(selected_vm, "No valid non-admin VM found") widget = self.dialog.toolbar.widgetForAction( self.dialog.action_editfwrules) QtTest.QTest.mouseClick(widget, QtCore.Qt.LeftButton) - mock_window.assert_called_once_with(selected_vm, self.qtapp, "firewall") + mock_window.assert_called_once_with( + selected_vm, self.qtapp, "firewall") + @unittest.mock.patch('qubesmanager.settings.VMSettingsWindow') + def test_203_vm_open_apps(self, mock_window): + selected_vm = self._select_non_admin_vm() + self.assertIsNotNone(selected_vm, "No valid non-admin VM found") + widget = self.dialog.toolbar.widgetForAction( + self.dialog.action_appmenus) + QtTest.QTest.mouseClick(widget, + QtCore.Qt.LeftButton) + mock_window.assert_called_once_with( + selected_vm, self.qtapp, "applications") -# test whether pause/start/resume works - @unittest.mock.patch('qubesmanager.qubesadmin.vm.QubesVM.pause') - @unittest.mock.patch('qubesmanager.qubesadmin.vm.QubesVM.is_running') - @unittest.mock.patch('qubesmanager.qubesadmin.vm.QubesVM.get_power_state') - def _select_non_admin_vm(self): + def test_204_vm_keyboard(self): + selected_vm = self._select_non_admin_vm(running=True) + self.assertIsNotNone(selected_vm, "No valid non-admin VM found") + widget = self.dialog.toolbar.widgetForAction( + self.dialog.action_set_keyboard_layout) + with unittest.mock.patch.object(selected_vm, 'run') as mock_run: + QtTest.QTest.mouseClick(widget, + QtCore.Qt.LeftButton) + mock_run.assert_called_once_with("qubes-change-keyboard-layout") + + def test_205_vm_keyboard_not_running(self): + selected_vm = self._select_non_admin_vm(running=False) + self.assertIsNotNone(selected_vm, "No valid non-admin VM found") + widget = self.dialog.toolbar.widgetForAction( + self.dialog.action_set_keyboard_layout) + with unittest.mock.patch.object(selected_vm, 'run') as mock_run: + QtTest.QTest.mouseClick(widget, + QtCore.Qt.LeftButton) + self.assertEqual(mock_run.call_count, 0, + "Keyboard change called on a halted VM") + + def test_206_dom0_keyboard(self): + self._select_admin_vm() + + self.assertFalse(self.dialog.action_set_keyboard_layout.isEnabled()) + + @unittest.mock.patch("PyQt4.QtGui.QMessageBox.question", + return_value=QtGui.QMessageBox.Yes) + def test_207_update_vm_not_running(self, _): + selected_vm = self._select_templatevm(running=False) + self.assertIsNotNone(selected_vm, "No valid template VM found") + + widget = self.dialog.toolbar.widgetForAction( + self.dialog.action_updatevm) + + with unittest.mock.patch('qubesmanager.qube_manager.UpdateVMThread') \ + as mock_update: + QtTest.QTest.mouseClick(widget, + QtCore.Qt.LeftButton) + mock_update.assert_called_once_with(selected_vm) + mock_update().start.assert_called_once_with() + + def test_208_update_vm_admin(self): + selected_vm = self._select_admin_vm() + self.assertIsNotNone(selected_vm, "No valid admin VM found") + + widget = self.dialog.toolbar.widgetForAction( + self.dialog.action_updatevm) + + with unittest.mock.patch('qubesmanager.qube_manager.UpdateVMThread') \ + as mock_update: + QtTest.QTest.mouseClick(widget, + QtCore.Qt.LeftButton) + mock_update.assert_called_once_with(selected_vm) + mock_update().start.assert_called_once_with() + + @unittest.mock.patch("PyQt4.QtGui.QInputDialog.getText", + return_value=("command to run", True)) + def test_209_run_command_in_vm(self, _): + selected_vm = self._select_non_admin_vm() + + self.assertIsNotNone(selected_vm, "No valid non-admin VM found") + + with unittest.mock.patch('qubesmanager.qube_manager.RunCommandThread') \ + as mock_thread: + self.dialog.action_run_command_in_vm.trigger() + mock_thread.assert_called_once_with(selected_vm, "command to run") + mock_thread().finished.connect.assert_called_once_with( + self.dialog.clear_threads) + mock_thread().start.assert_called_once_with() + + def test_210_run_command_in_adminvm(self): + self._select_admin_vm() + + self.assertFalse(self.dialog.action_run_command_in_vm.isEnabled(), + "Should not be able to run commands for dom0") + + @unittest.mock.patch("PyQt4.QtGui.QMessageBox.warning") + def test_211_pausevm(self, mock_warn): + selected_vm = self._select_non_admin_vm(running=True) + + self.assertTrue(self.dialog.action_pausevm.isEnabled(), + "Pause not enabled for a running VM") + + with unittest.mock.patch.object(selected_vm, 'pause') as mock_pause: + self.dialog.action_pausevm.trigger() + mock_pause.assert_called_once_with() + + mock_pause.side_effect = exc.QubesException('Error') + self.dialog.action_pausevm.trigger() + self.assertEqual(mock_warn.call_count, 1) + + def test_212_resumevm(self): + selected_vm = self._select_non_admin_vm(running=False) + + with unittest.mock.patch.object(selected_vm, 'get_power_state')\ + as mock_state, \ + unittest.mock.patch.object(selected_vm, 'unpause')\ + as mock_unpause: + mock_state.return_value = 'Paused' + self.dialog.action_resumevm.trigger() + mock_unpause.assert_called_once_with() + + with unittest.mock.patch('qubesmanager.qube_manager.StartVMThread') \ + as mock_thread: + self.dialog.action_resumevm.trigger() + mock_thread.assert_called_once_with(selected_vm) + mock_thread().finished.connect.assert_called_once_with( + self.dialog.clear_threads) + mock_thread().start.assert_called_once_with() + + def test_213_resume_running_vm(self): + self._select_non_admin_vm(running=True) + self.assertFalse(self.dialog.action_resumevm.isEnabled()) + + @unittest.mock.patch("PyQt4.QtGui.QMessageBox.question", + return_value=QtGui.QMessageBox.Yes) + @unittest.mock.patch('PyQt4.QtCore.QTimer.singleShot') + @unittest.mock.patch('qubesmanager.qube_manager.VmShutdownMonitor') + def test_214_shutdownvm(self, mock_monitor, mock_timer, _): + selected_vm = self._select_non_admin_vm(running=True) + + with unittest.mock.patch.object(selected_vm, 'shutdown')\ + as mock_shutdown: + self.dialog.action_shutdownvm.trigger() + mock_shutdown.assert_called_once_with() + mock_monitor.assert_called_once_with( + selected_vm, + unittest.mock.ANY, unittest.mock.ANY, + unittest.mock.ANY, unittest.mock.ANY) + mock_timer.assert_called_once_with(unittest.mock.ANY, + unittest.mock.ANY) + + def test_215_shutdown_halted_vm(self): + self._select_non_admin_vm(running=False) + + self.assertFalse(self.dialog.action_shutdownvm.isEnabled()) + + @unittest.mock.patch('qubesmanager.create_new_vm.NewVmDlg') + def test_216_create_vm(self, mock_new_vm): + action = self.dialog.action_createvm + self.assertTrue(action.isEnabled()) + + action.trigger() + + self.assertEqual(mock_new_vm.call_count, 1, + "Create New VM window did not appear") + + def test_217_remove_admin_vm(self): + self._select_admin_vm() + + self.assertFalse(self.dialog.action_removevm.isEnabled()) + + @unittest.mock.patch("PyQt4.QtGui.QMessageBox") + @unittest.mock.patch('qubesadmin.utils.vm_dependencies') + def test_218_remove_vm_dependencies(self, mock_dependencies, mock_msgbox): + action = self.dialog.action_removevm + + mock_vm = unittest.mock.Mock(spec=['name'], + **{'name.return_value': 'testvm'}) + mock_dependencies.return_value = [(mock_vm, "test_prop")] + + action.trigger() + mock_msgbox().show.assert_called_with() + + @unittest.mock.patch('PyQt4.QtGui.QMessageBox.warning') + @unittest.mock.patch("PyQt4.QtGui.QInputDialog.getText") + @unittest.mock.patch('qubesadmin.utils.vm_dependencies') + def test_219_remove_vm_no_depencies( + self, mock_dependencies, mock_input, mock_warning): + action = self.dialog.action_removevm + selected_vm = self._select_non_admin_vm(running=False) + + # test with no dependencies + mock_dependencies.return_value = None + + with unittest.mock.patch('qubesmanager.common_threads.RemoveVMThread')\ + as mock_thread: + mock_input.return_value = (selected_vm.name, False) + action.trigger() + self.assertEqual(mock_thread.call_count, 0, + "VM removed despite user clicking 'cancel") + + mock_input.return_value = ("wrong_name", True) + action.trigger() + self.assertEqual(mock_warning.call_count, 1) + self.assertEqual(mock_thread.call_count, 0, + "VM removed despite user not confirming the name") + + mock_input.return_value = (selected_vm.name, True) + action.trigger() + mock_thread.assert_called_once_with(selected_vm) + mock_thread().finished.connect.assert_called_once_with( + self.dialog.clear_threads) + mock_thread().start.assert_called_once_with() + + def test_220_restartvm_halted_vm(self): + self._select_non_admin_vm(running=False) + self.assertFalse(self.dialog.action_restartvm.isEnabled()) + + @unittest.mock.patch('PyQt4.QtCore.QTimer.singleShot') + @unittest.mock.patch('qubesmanager.qube_manager.VmShutdownMonitor') + @unittest.mock.patch("PyQt4.QtGui.QMessageBox.question", + return_value=QtGui.QMessageBox.Yes) + def test_221_restartvm_running_vm(self, _msgbox, mock_monitor, _qtimer): + selected_vm = self._select_non_admin_vm(running=True) + + action = self.dialog.action_restartvm + + # currently the VM is running + with unittest.mock.patch.object(selected_vm, 'shutdown')\ + as mock_shutdown: + action.trigger() + mock_shutdown.assert_called_once_with() + mock_monitor.assert_called_once_with( + selected_vm, unittest.mock.ANY, + unittest.mock.ANY, True, unittest.mock.ANY) + + @unittest.mock.patch('qubesmanager.qube_manager.StartVMThread') + @unittest.mock.patch("PyQt4.QtGui.QMessageBox.question", + return_value=QtGui.QMessageBox.Yes) + def test_222_restartvm_shutdown_meantime(self, _, mock_thread): + selected_vm = self._select_non_admin_vm(running=True) + + action = self.dialog.action_restartvm + + # it was shutdown in the meantime + with unittest.mock.patch.object( + selected_vm, 'is_running', **{'return_value': False}): + action.trigger() + mock_thread.assert_called_once_with(selected_vm) + mock_thread().finished.connect.assert_called_once_with( + self.dialog.clear_threads) + mock_thread().start.assert_called_once_with() + + @unittest.mock.patch('qubesmanager.qube_manager.UpdateVMThread') + def test_223_updatevm_running(self, mock_thread): + selected_vm = self._select_non_admin_vm(running=True) + + self.dialog.action_updatevm.trigger() + + mock_thread.assert_called_once_with(selected_vm) + mock_thread().finished.connect.assert_called_once_with( + self.dialog.clear_threads) + mock_thread().start.assert_called_once_with() + + @unittest.mock.patch("PyQt4.QtGui.QMessageBox.question", + return_value=QtGui.QMessageBox.Yes) + @unittest.mock.patch('qubesmanager.qube_manager.UpdateVMThread') + def test_224_updatevm_halted(self, mock_thread, _): + selected_vm = self._select_non_admin_vm(running=False) + + self.dialog.action_updatevm.trigger() + + mock_thread.assert_called_once_with(selected_vm) + mock_thread().finished.connect.assert_called_once_with( + self.dialog.clear_threads) + mock_thread().start.assert_called_once_with() + + @unittest.mock.patch("PyQt4.QtGui.QMessageBox.question", + return_value=QtGui.QMessageBox.Yes) + def test_224_killvm(self, _): + selected_vm = self._select_non_admin_vm(running=True) + action = self.dialog.action_killvm + + with unittest.mock.patch.object(selected_vm, 'kill') as mock_kill: + action.trigger() + mock_kill.assert_called_once_with() + + @unittest.mock.patch("PyQt4.QtGui.QMessageBox.question", + return_value=QtGui.QMessageBox.Cancel) + def test_225_killvm_cancel(self, _): + selected_vm = self._select_non_admin_vm(running=True) + action = self.dialog.action_killvm + + with unittest.mock.patch.object(selected_vm, 'kill') as mock_kill: + action.trigger() + self.assertEqual(mock_kill.call_count, 0, + "Ignored Cancel on kill VM") + + @unittest.mock.patch('qubesmanager.global_settings.GlobalSettingsWindow') + def test_226_global_settings(self, mock_settings): + self._select_non_admin_vm() + self.dialog.action_global_settings.trigger() + self.assertEqual(mock_settings.call_count, 1, + "Global Settings not opened") + + self._select_admin_vm() + self.dialog.action_global_settings.trigger() + self.assertEqual(mock_settings.call_count, 2, + "Global Settings not opened for the second time") + + @unittest.mock.patch('qubesmanager.backup.BackupVMsWindow') + def test_227_backup(self, mock_backup): + self.dialog.action_backup.trigger() + self.assertTrue(self.dialog.action_backup.isEnabled()) + self.assertEqual(mock_backup.call_count, 1, + "Backup window does not appear") + + @unittest.mock.patch('qubesmanager.restore.RestoreVMsWindow') + def test_228_restore(self, mock_restore): + self.dialog.action_restore.trigger() + self.assertTrue(self.dialog.action_restore.isEnabled()) + self.assertEqual(mock_restore.call_count, 1, + "Backup window does not appear") + + @unittest.mock.patch('qubesmanager.qube_manager.AboutDialog') + def test_229_about_qubes(self, mock_about): + self.assertTrue(self.dialog.action_about_qubes.isEnabled()) + self.dialog.action_about_qubes.trigger() + + self.assertEqual( + mock_about.call_count, 1, "About window does not appear") + + def test_230_exit_action(self): + self.assertTrue(self.dialog.action_exit.isEnabled()) + with unittest.mock.patch.object(self.dialog, 'close') as mock_close: + self.dialog.action_exit.trigger() + mock_close.assert_called_once_with() + + @unittest.mock.patch('subprocess.check_call') + def test_231_template_manager(self, mock_subprocess): + self.assertTrue(self.dialog.action_manage_templates.isEnabled()) + + self.dialog.action_manage_templates.trigger() + mock_subprocess.assert_called_once_with('qubes-template-manager') + + @unittest.mock.patch('qubesmanager.common_threads.CloneVMThread') + @unittest.mock.patch('PyQt4.QtGui.QInputDialog.getText') + def test_232_clonevm(self, mock_input, mock_thread): + action = self.dialog.action_clonevm + + self._select_admin_vm() + self.assertFalse(action.isEnabled()) + + selected_vm = self._select_non_admin_vm() + self.assertTrue(action.isEnabled()) + + mock_input.return_value = (selected_vm.name + "clone1", False) + action.trigger() + self.assertEqual(mock_thread.call_count, 0, + "Ignores cancelling clone VM") + + mock_input.return_value = (selected_vm.name + "clone1", True) + action.trigger() + mock_thread.assert_called_once_with(selected_vm, + selected_vm.name + "clone1") + mock_thread().finished.connect.assert_called_once_with( + self.dialog.clear_threads) + mock_thread().start.assert_called_once_with() + + def test_233_search_action(self): + self.qtapp.setActiveWindow(self.dialog.searchbox) + self.dialog.action_search.trigger() + self.assertTrue(self.dialog.searchbox.hasFocus()) + + # input text + self.dialog.searchbox.setText("sys") + # click outside the widget + QtTest.QTest.mouseClick(self.dialog.table, QtCore.Qt.LeftButton) + # click the widget, check if it is correctly activated and the whole + # text was selected + QtTest.QTest.mouseClick(self.dialog.searchbox, QtCore.Qt.LeftButton) + self.assertTrue(self.dialog.searchbox.hasFocus()) + self.assertEqual(self.dialog.searchbox.selectedText(), "sys") + + def test_234_searchbox(self): + # look for sys + self.dialog.searchbox.setText("sys") + expected_number = \ + len([vm for vm in self.qapp.domains if "sys" in vm.name]) + actual_number = self._count_visible_table_rows() + self.assertEqual(expected_number, actual_number, + "Incorrect number of vms shown for 'sys'") + + # clear search + self.dialog.searchbox.setText("") + expected_number = len([vm for vm in self.qapp.domains]) + actual_number = self._count_visible_table_rows() + self.assertEqual(expected_number, actual_number, + "Incorrect number of vms shown for cleared search box") + + def test_235_hide_show_toolbars(self): + with unittest.mock.patch('PyQt4.QtCore.QSettings.setValue')\ + as mock_setvalue: + self.dialog.action_menubar.trigger() + mock_setvalue.assert_called_with('view/menubar_visible', False) + self.dialog.action_toolbar.trigger() + mock_setvalue.assert_called_with('view/toolbar_visible', False) + + self.assertFalse(self.dialog.menubar.isVisible(), + "Menubar not hidden correctly") + self.assertFalse(self.dialog.toolbar.isVisible(), + "Toolbar not hidden correctly") + + def test_236_clear_searchbox(self): + self.dialog.searchbox.setText("text") + + self.assertEqual(self.dialog.searchbox.text(), "text") + + QtTest.QTest.keyPress(self.dialog, QtCore.Qt.Key_Escape) + + self.assertEqual(self.dialog.searchbox.text(), "", + "Escape failed to clear searchbox") + + expected_number = len([vm for vm in self.qapp.domains]) + actual_number = self._count_visible_table_rows() + self.assertEqual(expected_number, actual_number, + "Incorrect number of vms shown for cleared search box") + + @unittest.mock.patch('PyQt4.QtGui.QMessageBox.information') + @unittest.mock.patch('PyQt4.QtGui.QMessageBox.warning') + def test_300_clear_threads(self, mock_warning, mock_info): + mock_thread_finished_ok = unittest.mock.Mock( + spec=['isFinished', 'msg', 'msg_is_success'], + msg=None, msg_is_success=False, + **{'isFinished.return_value': True}) + mock_thread_not_finished = unittest.mock.Mock( + spec=['isFinished', 'msg', 'msg_is_success'], + msg=None, msg_is_success=False, + **{'isFinished.return_value': False}) + mock_thread_finished_error = unittest.mock.Mock( + spec=['isFinished', 'msg', 'msg_is_success'], + msg=("Error", "Error"), msg_is_success=False, + **{'isFinished.return_value': True}) + mock_thread_fin_error_success = unittest.mock.Mock( + spec=['isFinished', 'msg', 'msg_is_success'], + msg=("Done", "Done"), msg_is_success=True, + **{'isFinished.return_value': True}) + + # single finished thread + self.dialog.threads_list = [mock_thread_not_finished, + mock_thread_finished_ok] + self.dialog.clear_threads() + self.assertEqual(mock_warning.call_count, 0) + self.assertEqual(mock_info.call_count, 0) + self.assertEqual(len(self.dialog.threads_list), 1) + + # an error thread and some in-progress ones + self.dialog.threads_list = [mock_thread_not_finished, + mock_thread_not_finished, + mock_thread_finished_error] + self.dialog.clear_threads() + self.assertEqual(mock_warning.call_count, 1) + self.assertEqual(mock_info.call_count, 0) + self.assertEqual(len(self.dialog.threads_list), 2) + + # an error-success thread and some in-progress ones + self.dialog.threads_list = [mock_thread_not_finished, + mock_thread_not_finished, + mock_thread_fin_error_success, + mock_thread_finished_error] + self.dialog.clear_threads() + self.assertEqual(mock_warning.call_count, 1) + self.assertEqual(mock_info.call_count, 1) + self.assertEqual(len(self.dialog.threads_list), 3) + + def test_400_event_domain_added(self): + number_of_vms = self.dialog.table.rowCount() + + self.addCleanup(subprocess.call, ["qvm-remove", "-f", "testvm"]) + + self._run_command_and_process_events( + ["qvm-create", "--label", "red", "testvm"]) + + # a single row was added to the table + self.assertEqual(self.dialog.table.rowCount(), number_of_vms + 1) + + # table contains the correct vms + vms_in_table = self._create_set_of_current_vms() + + vms_in_system = set([vm.name for vm in self.qapp.domains]) + + self.assertEqual(vms_in_table, vms_in_system, "Table not updated " + "correctly after add") + + # check if sorting works + self.dialog.table.sortItems(self.dialog.columns_indices["Name"], + QtCore.Qt.AscendingOrder) + self.__check_sorting("Name") + + # try opening settings for the added vm + for row in range(self.dialog.table.rowCount()): + name = self._get_table_item(row, "Name") + if name.text() == "testvm": + self.dialog.table.setCurrentItem(name) + break + with unittest.mock.patch('qubesmanager.settings.VMSettingsWindow')\ + as mock_settings: + self.dialog.action_settings.trigger() + mock_settings.assert_called_once_with( + self.qapp.domains["testvm"], self.qtapp, "basic") + + def test_401_event_domain_removed(self): + initial_vms = self._create_set_of_current_vms() + + self._run_command_and_process_events( + ["qvm-create", "--label", "red", "testvm"]) + + current_vms = self._create_set_of_current_vms() + self.assertEqual(len(initial_vms) + 1, len(current_vms)) + + self._run_command_and_process_events( + ["qvm-remove", "--force", "testvm"]) + current_vms = self._create_set_of_current_vms() + self.assertEqual(initial_vms, current_vms) + + # check if sorting works + self.dialog.table.sortItems(self.dialog.columns_indices["Name"], + QtCore.Qt.AscendingOrder) + self.__check_sorting("Name") + + def test_403_event_dispvm_added(self): + initial_vms = self._create_set_of_current_vms() + + dispvm_template = None + + for vm in self.qapp.domains: + if getattr(vm, "template_for_dispvms", False): + dispvm_template = vm.name + break + self.assertIsNotNone(dispvm_template, + "Cannot find a template for dispVMs") + + # this requires very long timeout, because it takes time for the + # dispvm to vanish + self._run_command_and_process_events( + ["qvm-run", "--dispvm", dispvm_template, "true"], timeout=60) + + final_vms = self._create_set_of_current_vms() + + self.assertEqual(initial_vms, final_vms, + "Failed handling of a created-and-removed dispvm") + + def test_404_crashing_dispvm(self): + initial_vms = self._create_set_of_current_vms() + + dispvm_template = None + + for vm in self.qapp.domains: + if getattr(vm, "template_for_dispvms", False): + dispvm_template = vm.name + break + + self.assertIsNotNone(dispvm_template, + "Cannot find a template for dispVMs") + + current_memory = getattr(self.qapp.domains[dispvm_template], "memory") + self.addCleanup( + subprocess.call, + ["qvm-prefs", dispvm_template, "memory", str(current_memory)]) + subprocess.check_call( + ["qvm-prefs", dispvm_template, "memory", "600000"]) + + self._run_command_and_process_events( + ["qvm-run", "--dispvm", dispvm_template, "true"], timeout=30) + + final_vms = self._create_set_of_current_vms() + + self.assertEqual(initial_vms, final_vms, + "Failed handling of dispvm that crashed on start") + + def test_405_prop_change_label(self): + target_vm_name = "work" + vm_row = self._find_vm_row(target_vm_name) + + current_label_path = self._get_table_item(vm_row, "Label").icon_path + + self.addCleanup( + subprocess.call, ["qvm-prefs", target_vm_name, "label", "blue"]) + self._run_command_and_process_events( + ["qvm-prefs", target_vm_name, "label", "red"]) + + new_label_path = self._get_table_item(vm_row, "Label").icon_path + + self.assertNotEqual(current_label_path, new_label_path, + "Label path did not change") + self.assertEqual( + new_label_path, + self.qapp.domains[target_vm_name].label.icon, + "Incorrect label") + + def test_406_prop_change_template(self): + target_vm_name = "work" + vm_row = self._find_vm_row(target_vm_name) + + old_template = self._get_table_item(vm_row, "Template").text() + new_template = None + for vm in self.qapp.domains: + if vm.klass == 'TemplateVM' and vm.name != old_template: + new_template = vm.name + break + + self.addCleanup( + subprocess.call, + ["qvm-prefs", target_vm_name, "template", old_template]) + self._run_command_and_process_events( + ["qvm-prefs", target_vm_name, "template", new_template]) + + self.assertNotEqual(old_template, + self._get_table_item(vm_row, "Template").text(), + "Template did not change") + self.assertEqual( + self._get_table_item(vm_row, "Template").text(), + self.qapp.domains[target_vm_name].template.name, + "Incorrect template") + + def test_407_prop_change_netvm(self): + target_vm_name = "work" + vm_row = self._find_vm_row(target_vm_name) + + old_netvm = self._get_table_item(vm_row, "NetVM").text() + new_netvm = None + for vm in self.qapp.domains: + if getattr(vm, "provides_network", False) and vm.name != old_netvm: + new_netvm = vm.name + break + + self.addCleanup( + subprocess.call, ["qvm-prefs", target_vm_name, "netvm", old_netvm]) + self._run_command_and_process_events( + ["qvm-prefs", target_vm_name, "netvm", new_netvm]) + + self.assertNotEqual(old_netvm, + self._get_table_item(vm_row, "NetVM").text(), + "NetVM did not change") + self.assertEqual( + self._get_table_item(vm_row, "NetVM").text(), + self.qapp.domains[target_vm_name].netvm.name, + "Incorrect NetVM") + + @unittest.expectedFailure + def test_408_prop_change_internal(self): + target_vm_name = "work" + vm_row = self._find_vm_row(target_vm_name) + + self.addCleanup(subprocess.call, + ["qvm-features", "--unset", "work", "interal"]) + self._run_command_and_process_events( + ["qvm-features", "work", "interal", "1"]) + + self.assertEqual( + self._get_table_item(vm_row, "Internal").text(), + "Yes", + "Incorrect value for internal VM") + + self._run_command_and_process_events( + ["qvm-features", "--unset", "work", "interal"]) + + self.assertEqual( + self._get_table_item(vm_row, "Internal").text(), + "", + "Incorrect value for non-internal VM") + + def test_409_prop_change_ip(self): + target_vm_name = "work" + vm_row = self._find_vm_row(target_vm_name) + + old_ip = self._get_table_item(vm_row, "IP").text() + new_ip = old_ip.replace(".0.", ".5.") + + self.addCleanup( + subprocess.call, ["qvm-prefs", target_vm_name, "ip", old_ip]) + self._run_command_and_process_events( + ["qvm-prefs", target_vm_name, "ip", new_ip]) + + self.assertNotEqual(old_ip, + self._get_table_item(vm_row, "IP").text(), + "IP did not change") + self.assertEqual( + self._get_table_item(vm_row, "IP").text(), + self.qapp.domains[target_vm_name].ip, + "Incorrect IP") + + def test_410_prop_change_in_backups(self): + target_vm_name = "work" + vm_row = self._find_vm_row(target_vm_name) + + old_value = self.qapp.domains[target_vm_name].include_in_backups + new_value = not old_value + + self.addCleanup( + subprocess.call, + ["qvm-prefs", target_vm_name, "include_in_backups", str(old_value)]) + self._run_command_and_process_events( + ["qvm-prefs", target_vm_name, "include_in_backups", str(new_value)]) + + self.assertEqual( + self._get_table_item(vm_row, "Internal").text(), + "Yes" if new_value else "", + "Incorrect value for include_in_backups") + + def test_411_prop_change_last_backup(self): + target_vm_name = "work" + target_timestamp = "2015-01-01 17:00:00" + vm_row = self._find_vm_row(target_vm_name) + + old_value = self._get_table_item(vm_row, "Last backup").text() + new_value = datetime.datetime.strptime( + target_timestamp, "%Y-%m-%d %H:%M:%S") + + self.addCleanup( + subprocess.call, + ["qvm-prefs", '-D', target_vm_name, "backup_timestamp"]) + self._run_command_and_process_events( + ["qvm-prefs", target_vm_name, "backup_timestamp", + str(int(new_value.timestamp()))]) + + self.assertNotEqual(old_value, + self._get_table_item(vm_row, "Last backup").text(), + "Last backup date did not change") + self.assertEqual( + self._get_table_item(vm_row, "Last backup").text(), + target_timestamp, + "Incorrect Last backup date") + + def test_412_prop_change_defdispvm(self): + target_vm_name = "work" + vm_row = self._find_vm_row(target_vm_name) + + old_default_dispvm =\ + self._get_table_item(vm_row, "Default DispVM").text() + new_default_dispvm = None + for vm in self.qapp.domains: + if getattr(vm, "template_for_dispvms", False) and vm.name !=\ + old_default_dispvm: + new_default_dispvm = vm.name + break + + self.addCleanup( + subprocess.call, + ["qvm-prefs", target_vm_name, "default_dispvm", old_default_dispvm]) + self._run_command_and_process_events( + ["qvm-prefs", target_vm_name, "default_dispvm", new_default_dispvm]) + + self.assertNotEqual( + old_default_dispvm, + self._get_table_item(vm_row, "Default DispVM").text(), + "Default DispVM did not change") + + self.assertEqual( + self._get_table_item(vm_row, "Default DispVM").text(), + self.qapp.domains[target_vm_name].default_dispvm.name, + "Incorrect Default DispVM") + + def test_413_prop_change_templ_disp(self): + target_vm_name = "work" + vm_row = self._find_vm_row(target_vm_name) + + self.addCleanup( + subprocess.call, + ["qvm-prefs", "--default", target_vm_name, "template_for_dispvms"]) + self._run_command_and_process_events( + ["qvm-prefs", target_vm_name, "template_for_dispvms", "True"]) + + self.assertEqual( + self._get_table_item(vm_row, "Is DVM Template").text(), + "Yes", + "Incorrect value for DVM Template") + + self._run_command_and_process_events( + ["qvm-prefs", "--default", target_vm_name, "template_for_dispvms"]) + + self.assertEqual( + self._get_table_item(vm_row, "Is DVM Template").text(), + "", + "Incorrect value for not DVM Template") + + def test_414_vm_state_change(self): + target_vm_name = "work" + vm_row = self._find_vm_row(target_vm_name) + + self.assertFalse(self.qapp.domains[target_vm_name].is_running()) + + self.addCleanup( + subprocess.call, + ["qvm-shutdown", target_vm_name]) + self._run_command_and_process_events( + ["qvm-start", target_vm_name], timeout=20) + + status_item = self._get_table_item(vm_row, "State") + + displayed_power_state = status_item.on_icon.status + + self.assertEqual(displayed_power_state, 3, + "Power state failed to update on start") + + self._run_command_and_process_events( + ["qvm-shutdown", target_vm_name], timeout=20) + + displayed_power_state = status_item.on_icon.status + + self.assertEqual(displayed_power_state, 0, + "Power state failed to update on shutdown") + + def test_415_template_vm_started(self): + # check whether changing state of a template_vm causes all other + # vms depending on it to check theirs + target_vm_name = None + for vm in self.qapp.domains: + if vm.klass == 'TemplateVM': + for vm2 in self.qapp.domains: + if getattr(vm2, 'template', None) == vm.name: + target_vm_name = vm.name + break + if target_vm_name: + break + + for i in range(self.dialog.table.rowCount()): + self._get_table_item(i, "State").update_vm_state =\ + unittest.mock.Mock() + + self.addCleanup( + subprocess.call, + ["qvm-shutdown", target_vm_name]) + self._run_command_and_process_events( + ["qvm-start", target_vm_name], timeout=20) + + for i in range(self.dialog.table.rowCount()): + call_count = self._get_table_item( + i, "State").update_vm_state.call_count + if self._get_table_item(i, "Template").text() == target_vm_name: + self.assertGreater(call_count, 0) + elif self._get_table_item(i, "Name").text() == target_vm_name: + self.assertGreater(call_count, 0) + else: + self.assertEqual(call_count, 0) + + def test_500_logs(self): + self._select_admin_vm() + + self.assertTrue(self.dialog.logs_menu.isEnabled()) + + dom0_logs = set() + for c in self.dialog.logs_menu.actions(): + dom0_logs.add(c.text()) + self.assertIsNotNone( + c.data(), "Empty log file found: {}".format(c.text())) + self.assertIn("hypervisor", c.text(), + "Log for dom0 does not contain 'hypervisor'") + + selected_vm = self._select_non_admin_vm().name + + self.assertTrue(self.dialog.logs_menu.isEnabled()) + + vm_logs = set() + for c in self.dialog.logs_menu.actions(): + vm_logs.add(c.text()) + self.assertIsNotNone( + c.data(), + "Empty log file found: {}".format(c.text())) + self.assertIn( + selected_vm, + c.text(), + "Log for {} does not contain its name".format(selected_vm)) + + self.assertNotEqual(dom0_logs, vm_logs, + "Same logs found for dom0 and non-adminVM") + + def _find_vm_row(self, vm_name): + for row in range(self.dialog.table.rowCount()): + name = self._get_table_item(row, "Name") + if name.text() == vm_name: + return row + return None + + def _count_visible_table_rows(self): + result = 0 + for i in range(self.dialog.table.rowCount()): + if not self.dialog.table.isRowHidden(i): + result += 1 + return result + + def _run_command_and_process_events(self, command, timeout=5): + """ + helper function to run a given command and process eventsDispatcher + events + :param command: list of strings, containing the command and all its + parameters + :param timeout: default 20 seconds + :return: + """ + asyncio.set_event_loop(self.loop) + + future1 = asyncio.ensure_future(self.dispatcher.listen_for_events()) + future2 = asyncio.create_subprocess_exec(*command, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL) + + (done, pending) = self.loop.run_until_complete( + asyncio.wait({future1, future2}, timeout=timeout)) + + for task in pending: + with contextlib.suppress(asyncio.CancelledError): + task.cancel() + + self.loop.call_soon(self.loop.stop) + self.loop.run_forever() + + def _create_set_of_current_vms(self): + result = set() + for i in range(self.dialog.table.rowCount()): + result.add(self._get_table_item(i, "Name").vm.name) + return result + + def _select_admin_vm(self): for row in range(self.dialog.table.rowCount()): template = self.dialog.table.item( row, self.dialog.columns_indices["Template"]) - if template.text() != 'AdminVM': + if template.text() == 'AdminVM': self.dialog.table.setCurrentItem(template) return template.vm return None + def _select_non_admin_vm(self, running=None): + for row in range(self.dialog.table.rowCount()): + template = self.dialog.table.item( + row, self.dialog.columns_indices["Template"]) + status = self.dialog.table.item( + row, self.dialog.columns_indices["State"]) + if template.text() != 'AdminVM' and \ + (running is None + or (running and status.on_icon.status == 3) + or (not running and status.on_icon.status != 3)): + self.dialog.table.setCurrentItem(template) + return template.vm + return None + + def _select_templatevm(self, running=None): + for row in range(self.dialog.table.rowCount()): + template = self.dialog.table.item( + row, self.dialog.columns_indices["Template"]) + status = self.dialog.table.item( + row, self.dialog.columns_indices["State"]) + if template.text() == 'TemplateVM' and \ + (running is None + or (running and status.on_icon.status == 3) + or (not running and status.on_icon.status != 3)): + self.dialog.table.setCurrentItem(template) + return template.vm + return None + + def __check_sorting(self, column_name): + last_text = None + last_vm = None + for row in range(self.dialog.table.rowCount()): + + vm = self._get_table_item(row, "Name").vm.name + text = self._get_table_item(row, column_name).text().lower() + + if row == 0: + self.assertEqual(vm, "dom0", "dom0 is not sorted first") + elif last_text is None: + last_text = text + last_vm = vm + else: + if last_text == text: + self.assertGreater( + vm, last_vm, + "Incorrect sorting for {}".format(column_name)) + else: + self.assertGreater( + text, last_text, + "Incorrect sorting for {}".format(column_name)) + last_text = text + last_vm = vm + + def _get_table_item(self, row, column_name): + value = self.dialog.table.cellWidget( + row, self.dialog.columns_indices[column_name]) + if not value: + value = self.dialog.table.item( + row, self.dialog.columns_indices[column_name]) + + return value + + +class QubeManagerThreadTest(unittest.TestCase): + def test_01_startvm_thread(self): + vm = unittest.mock.Mock(spec=['start']) + + thread = qube_manager.StartVMThread(vm) + thread.run() + + vm.start.assert_called_once_with() + + def test_02_startvm_thread_error(self): + vm = unittest.mock.Mock( + spec=['start'], + **{'start.side_effect': exc.QubesException('Error')}) + + thread = qube_manager.StartVMThread(vm) + thread.run() + + self.assertIsNotNone(thread.msg) + + def test_10_run_command_thread(self): + vm = unittest.mock.Mock(spec=['run']) + + thread = qube_manager.RunCommandThread(vm, "test_command") + thread.run() + + vm.run.assert_called_once_with("test_command") + + def test_11_run_command_thread_error(self): + vm = unittest.mock.Mock(spec=['run'], + **{'run.side_effect': ChildProcessError}) + + thread = qube_manager.RunCommandThread(vm, "test_command") + thread.run() + + self.assertIsNotNone(thread.msg) + + @unittest.mock.patch('subprocess.check_call') + def test_20_update_vm_thread_dom0(self, check_call): + vm = unittest.mock.Mock(spec=['qid']) + vm.qid = 0 + thread = qube_manager.UpdateVMThread(vm) + thread.run() + + check_call.assert_called_once_with( + ["/usr/bin/qubes-dom0-update", "--clean", "--gui"]) + + @unittest.mock.patch('builtins.open') + @unittest.mock.patch('subprocess.call') + def test_21_update_vm_thread_running(self, mock_call, mock_open): + vm = unittest.mock.Mock( + spec=['qid', 'is_running', 'run_service_for_stdio', 'run_service'], + **{'is_running.return_value': True}) + + vm.qid = 1 + vm.run_service_for_stdio.return_value = (b'changed=no\n', None) + + thread = qube_manager.UpdateVMThread(vm) + + thread.run() + + mock_open.assert_called_with( + '/usr/libexec/qubes-manager/dsa-4371-update', 'rb') + + vm.run_service_for_stdio.assert_called_once_with( + "qubes.VMShell", user='root', input=unittest.mock.ANY) + + vm.run_service.assert_called_once_with( + "qubes.InstallUpdatesGUI", user="root", wait=False) + + self.assertEqual(mock_call.call_count, 0) + + @unittest.mock.patch('builtins.open') + @unittest.mock.patch('subprocess.call') + def test_22_update_vm_thread_not_running(self, mock_call, mock_open): + vm = unittest.mock.Mock( + spec=['qid', 'is_running', 'run_service_for_stdio', + 'run_service', 'start', 'name'], + **{'is_running.return_value': False}) + + vm.qid = 1 + vm.run_service_for_stdio.return_value = (b'changed=yes\n', None) + + thread = qube_manager.UpdateVMThread(vm) + thread.run() + + mock_open.assert_called_with( + '/usr/libexec/qubes-manager/dsa-4371-update', 'rb') + + vm.start.assert_called_once_with() + + vm.run_service_for_stdio.assert_called_once_with( + "qubes.VMShell", user='root', input=unittest.mock.ANY) + + vm.run_service.assert_called_once_with( + "qubes.InstallUpdatesGUI", user="root", wait=False) + + self.assertEqual(mock_call.call_count, 1) + + @unittest.mock.patch('builtins.open') + @unittest.mock.patch('subprocess.check_call') + def test_23_update_vm_thread_error(self, *_args): + vm = unittest.mock.Mock( + spec=['qid', 'is_running'], + **{'is_running.side_effect': ChildProcessError}) + + vm.qid = 1 + + thread = qube_manager.UpdateVMThread(vm) + thread.run() + + self.assertIsNotNone(thread.msg) + + +class VMShutdownMonitorTest(unittest.TestCase): + @unittest.mock.patch('PyQt4.QtGui.QMessageBox.question') + @unittest.mock.patch('PyQt4.QtCore.QTimer') + def test_01_vm_shutdown_correct(self, mock_timer, mock_question): + mock_vm = unittest.mock.Mock() + mock_vm.is_running.return_value = False + + monitor = qube_manager.VmShutdownMonitor(mock_vm) + monitor.restart_vm_if_needed = unittest.mock.Mock() + + monitor.check_if_vm_has_shutdown() + + self.assertEqual(mock_question.call_count, 0) + self.assertEqual(mock_timer.call_count, 0) + monitor.restart_vm_if_needed.assert_called_once_with() + + @unittest.mock.patch('PyQt4.QtGui.QMessageBox.question', + return_value=1) + @unittest.mock.patch('PyQt4.QtCore.QTimer.singleShot') + def test_02_vm_not_shutdown_wait(self, mock_timer, mock_question): + mock_vm = unittest.mock.Mock() + mock_vm.is_running.return_value = True + mock_vm.start_time = datetime.datetime.now().timestamp() - 3000 + + monitor = qube_manager.VmShutdownMonitor(mock_vm, shutdown_time=1) + time.sleep(3) + + monitor.check_if_vm_has_shutdown() + + self.assertEqual(mock_question.call_count, 1) + self.assertEqual(mock_timer.call_count, 1) + + @unittest.mock.patch('PyQt4.QtGui.QMessageBox.question', + return_value=0) + @unittest.mock.patch('PyQt4.QtCore.QTimer.singleShot') + def test_03_vm_kill(self, mock_timer, mock_question): + mock_vm = unittest.mock.Mock() + mock_vm.is_running.return_value = True + mock_vm.start_time = datetime.datetime.now().timestamp() - 3000 + + monitor = qube_manager.VmShutdownMonitor(mock_vm, shutdown_time=1) + time.sleep(3) + monitor.restart_vm_if_needed = unittest.mock.Mock() + + monitor.check_if_vm_has_shutdown() + + self.assertEqual(mock_question.call_count, 1) + self.assertEqual(mock_timer.call_count, 0) + mock_vm.kill.assert_called_once_with() + monitor.restart_vm_if_needed.assert_called_once_with() + + @unittest.mock.patch('PyQt4.QtGui.QMessageBox.question', + return_value=0) + @unittest.mock.patch('PyQt4.QtCore.QTimer.singleShot') + def test_04_check_later(self, mock_timer, mock_question): + mock_vm = unittest.mock.Mock() + mock_vm.is_running.return_value = True + mock_vm.start_time = datetime.datetime.now().timestamp() - 3000 + + monitor = qube_manager.VmShutdownMonitor(mock_vm, shutdown_time=3000) + time.sleep(1) + + monitor.check_if_vm_has_shutdown() + + self.assertEqual(mock_question.call_count, 0) + self.assertEqual(mock_timer.call_count, 1) + + if __name__ == "__main__": ha_syslog = logging.handlers.SysLogHandler('/dev/log') ha_syslog.setFormatter( diff --git a/qubesmanager/tests/test_vm_settings.py b/qubesmanager/tests/test_vm_settings.py new file mode 100644 index 0000000..3a15c5b --- /dev/null +++ b/qubesmanager/tests/test_vm_settings.py @@ -0,0 +1,605 @@ +#!/usr/bin/python3 +# +# The Qubes OS Project, https://www.qubes-os.org/ +# +# Copyright (C) 2016 Marta Marczykowska-Górecka +# +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +# +import logging.handlers +import unittest +import unittest.mock + +import gc +import quamash +import asyncio + +from PyQt4 import QtGui, QtTest, QtCore +from qubesadmin import Qubes +import qubesmanager.settings as vm_settings + + +class VMSettingsTest(unittest.TestCase): + def setUp(self): + super(VMSettingsTest, self).setUp() + + self.mock_qprogress = unittest.mock.patch('PyQt4.QtGui.QProgressDialog') + self.mock_qprogress.start() + + self.addCleanup(self.mock_qprogress.stop) + + self.qapp = Qubes() + self.qtapp = QtGui.QApplication(["test", "-style", "cleanlooks"]) + self.loop = quamash.QEventLoop(self.qtapp) + + def tearDown(self): + del self.qapp.domains["testvm"] + + # process any pending events before destroying the object + self.qtapp.processEvents() + + # queue destroying the QApplication object, do that for any other QT + # related objects here too + self.dialog.deleteLater() + self.qtapp.deleteLater() + + # process any pending events (other than just queued destroy), + # just in case + self.qtapp.processEvents() + self.qtapp.processEvents() + self.qtapp.processEvents() + + # execute main loop, which will process all events, _ + # including just queued destroy_ + self.loop.run_until_complete(asyncio.sleep(0)) + + # at this point it QT objects are destroyed, cleanup all remaining + # references; + # del other QT object here too + self.loop.close() + del self.dialog + del self.qtapp + del self.loop + gc.collect() + super(VMSettingsTest, self).tearDown() + + def test_00_load_correct_tab(self): + self.vm = self.qapp.add_new_vm("AppVM", "testvm", "red") + + self.dialog = vm_settings.VMSettingsWindow( + self.vm, self.qtapp, "basic") + self.assertTrue( + self.dialog.tabWidget.currentWidget() is self.dialog.basic_tab) + self.dialog.deleteLater() + + self.dialog = vm_settings.VMSettingsWindow( + self.vm, self.qtapp, "advanced") + self.assertTrue( + self.dialog.tabWidget.currentWidget() is self.dialog.advanced_tab) + self.dialog.deleteLater() + + self.dialog = vm_settings.VMSettingsWindow( + self.vm, self.qtapp, "firewall") + self.assertTrue( + self.dialog.tabWidget.currentWidget() is self.dialog.firewall_tab) + self.dialog.deleteLater() + + self.dialog = vm_settings.VMSettingsWindow( + self.vm, self.qtapp, "devices") + self.assertTrue( + self.dialog.tabWidget.currentWidget() is self.dialog.devices_tab) + self.dialog.deleteLater() + + self.dialog = vm_settings.VMSettingsWindow( + self.vm, self.qtapp, "applications") + self.assertTrue( + self.dialog.tabWidget.currentWidget() is self.dialog.apps_tab) + self.dialog.deleteLater() + + self.dialog = vm_settings.VMSettingsWindow( + self.vm, self.qtapp, "services") + self.assertTrue( + self.dialog.tabWidget.currentWidget() is self.dialog.services_tab) + self.dialog.deleteLater() + + def test_01_basic_tab_default(self): + self.vm = self.qapp.add_new_vm("AppVM", "testvm", "blue") + # set the vm to have a default template and netvm + + self.dialog = vm_settings.VMSettingsWindow( + self.vm, self.qtapp, "basic") + + self.assertEqual(self.dialog.vmname.text(), "testvm", + "Name displayed incorrectly") + + self.assertTrue("blue" in self.dialog.vmlabel.currentText(), + "Incorrect label displayed") + + displayed_template = self.dialog.template_name.currentText() + correct_template = self.vm.template.name + + self.assertTrue("current" in displayed_template, + "Template incorrectly not shown as current") + self.assertTrue(correct_template in displayed_template, + "Template not displayed correctly") + + displayed_netvm = self.dialog.netVM.currentText() + correct_netvm = self.vm.netvm.name + self.assertTrue("current" in displayed_netvm, + "NetVM incorrectly not shown as current") + self.assertTrue(correct_netvm in displayed_netvm, + "NetVM not displayed correctly") + + self.assertEqual(self.dialog.include_in_backups.isChecked(), + self.vm.include_in_backups, + "Incorrect 'include in backups' state") + + self.assertEqual(self.dialog.run_in_debug_mode.isChecked(), + self.vm.debug, + "Incorrect 'run in debug mode' state") + + self.assertEqual(self.dialog.autostart_vm.isChecked(), + self.vm.autostart, + "Incorrect 'autostart' state") + + self.assertEqual(self.dialog.type_label.text(), + self.vm.klass, + "Incorrect class displayed") + + self.assertEqual(self.dialog.ip_label.text(), + self.vm.ip, + "Incorrect IP displayed") + self.assertEqual(self.dialog.netmask_label.text(), + self.vm.visible_netmask, + "Incorrect netmask displayed") + self.assertEqual(self.dialog.gateway_label.text(), + self.vm.visible_gateway, + "Incorrect gateway displayed") + + self.assertEqual(self.dialog.max_priv_storage.value(), + self.vm.volumes['private'].size // 1024 ** 2, + "Incorrect max private storage size") + self.assertEqual(self.dialog.root_resize.value(), + self.vm.volumes['root'].size // 1024 ** 2, + "Incorrect max private root size") + + def test_02_basic_tab_nones(self): + self.vm = self.qapp.add_new_vm("StandaloneVM", "testvm", "blue") + # set the vm to have a default template and netvm + self.vm.netvm = None + + self.dialog = vm_settings.VMSettingsWindow( + self.vm, self.qtapp, "basic") + + self.assertEqual("", self.dialog.template_name.currentText(), + "No template incorrectly displayed") + + displayed_netvm = self.dialog.netVM.currentText() + self.assertTrue("current" in displayed_netvm, + "None NetVM incorrectly not shown as current") + self.assertTrue("none" in displayed_netvm, + "None NetVM not displayed correctly") + + self.assertEqual(self.dialog.type_label.text(), "StandaloneVM", + "Type displayed incorrectly for standaloneVM") + + self.assertEqual(self.dialog.ip_label.text(), + "---", + "Incorrect IP displayed") + self.assertEqual(self.dialog.netmask_label.text(), + "---", + "Incorrect netmask displayed") + self.assertEqual(self.dialog.gateway_label.text(), + "---", + "Incorrect gateway displayed") + + @unittest.expectedFailure + def test_03_change_label(self): + # this test fails due to error where we check whether label is visible + self.vm = self.qapp.add_new_vm("AppVM", "testvm", "blue") + self.dialog = vm_settings.VMSettingsWindow( + self.vm, self.qtapp, "basic") + + new_label = self._set_noncurrent(self.dialog.vmlabel) + self._click_ok() + + self.assertEqual(str(self.vm.label), new_label, + "Label is not set correctly") + + def test_04_change_template(self): + self.vm = self.qapp.add_new_vm("AppVM", "testvm", "blue") + self.dialog = vm_settings.VMSettingsWindow( + self.vm, self.qtapp, "basic") + + new_template = self._set_noncurrent(self.dialog.template_name) + self._click_ok() + + self.assertEqual(self.vm.template.name, new_template, + "Template is not set correctly") + + def test_05_change_networking(self): + self.vm = self.qapp.add_new_vm("AppVM", "testvm", "blue") + self.dialog = vm_settings.VMSettingsWindow( + self.vm, self.qtapp, "basic") + + new_netvm = self._set_noncurrent(self.dialog.netVM) + self._click_ok() + + self.assertEqual(self.vm.netvm.name, new_netvm, + "NetVM is not set correctly") + + def test_06_change_networking_none(self): + self.vm = self.qapp.add_new_vm("AppVM", "testvm", "blue") + self.dialog = vm_settings.VMSettingsWindow( + self.vm, self.qtapp, "basic") + + self._set_none(self.dialog.netVM) + self._click_ok() + + self.assertIsNone(self.vm.netvm, + "None netVM is not set correctly") + + def test_07_change_networking_to_default(self): + self.vm = self.qapp.add_new_vm("AppVM", "testvm", "blue") + + for vm in self.qapp.domains: + if getattr(vm, 'provides_network', False)\ + and vm != self.qapp.default_netvm: + self.vm.netvm = vm + break + + self.dialog = vm_settings.VMSettingsWindow( + self.vm, self.qtapp, "basic") + + new_netvm = self._set_default(self.dialog.netVM) + self._click_ok() + + self.assertTrue(self.vm.netvm.name in new_netvm, + "NetVM is not set correctly") + self.assertTrue(self.vm.property_is_default('netvm')) + + @unittest.expectedFailure + def test_08_basic_checkboxes_true(self): + self.vm = self.qapp.add_new_vm("AppVM", "testvm", "blue") + self.dialog = vm_settings.VMSettingsWindow( + self.vm, self.qtapp, "basic") + + self.dialog.include_in_backups.setChecked(True) + self.dialog.autostart_vm.setChecked(True) + self.dialog.run_in_debug_mode.setChecked(True) + + self._click_ok() + + self.assertTrue(self.vm.include_in_backups, + "Include in backups not set to true") + self.assertTrue(self.vm.autostart, + "Autostart not set to true") + self.assertTrue(self.vm.debug, + "Debug mode not set to true") + + @unittest.expectedFailure + def test_09_basic_checkboxes_false(self): + self.vm = self.qapp.add_new_vm("AppVM", "testvm", "blue") + self.dialog = vm_settings.VMSettingsWindow( + self.vm, self.qtapp, "basic") + + self.dialog.include_in_backups.setChecked(False) + self.dialog.autostart_vm.setChecked(False) + self.dialog.run_in_debug_mode.setChecked(False) + + self._click_ok() + + self.assertFalse(self.vm.include_in_backups, + "Include in backups not set to false") + self.assertFalse(self.vm.autostart, + "Autostart not set to false") + self.assertFalse(self.vm.debug, + "Debug mode not set to false") + + def test_10_increase_private_storage(self): + self.vm = self.qapp.add_new_vm("AppVM", "testvm", "blue") + self.dialog = vm_settings.VMSettingsWindow( + self.vm, self.qtapp, "basic") + + current_storage = self.vm.volumes['private'].size // 1024**2 + new_storage = current_storage + 512 + + self.dialog.max_priv_storage.setValue(new_storage) + self._click_ok() + + self.assertEqual(self.vm.volumes['private'].size // 1024**2, + new_storage) + + # TODO are dependencies correctly processed + + @unittest.mock.patch('PyQt4.QtGui.QProgressDialog') + @unittest.mock.patch('PyQt4.QtGui.QInputDialog.getText') + @unittest.mock.patch('qubesmanager.settings.RenameVMThread') + def test_11_rename_vm(self, mock_thread, mock_input, _): + self.vm = self.qapp.add_new_vm("AppVM", "testvm", "blue") + self.dialog = vm_settings.VMSettingsWindow( + self.vm, self.qtapp, "basic") + + self.assertTrue(self.dialog.rename_vm_button.isEnabled()) + + mock_input.return_value = ("testvm2", True) + self.dialog.rename_vm_button.click() + + mock_thread.assert_called_with(self.vm, "testvm2", unittest.mock.ANY) + mock_thread().start.assert_called_with() + +# TODO: thread tests for rename + + @unittest.mock.patch('PyQt4.QtGui.QProgressDialog') + @unittest.mock.patch('PyQt4.QtGui.QInputDialog.getText') + @unittest.mock.patch('qubesmanager.common_threads.CloneVMThread') + def test_12_clone_vm(self, mock_thread, mock_input, _): + self.vm = self.qapp.add_new_vm("AppVM", "testvm", "blue") + self.dialog = vm_settings.VMSettingsWindow( + self.vm, self.qtapp, "basic") + + self.assertTrue(self.dialog.clone_vm_button.isEnabled()) + + mock_input.return_value = ("testvm2", True) + self.dialog.clone_vm_button.click() + + mock_thread.assert_called_with(self.vm, "testvm2") + mock_thread().start.assert_called_with() + + @unittest.mock.patch('PyQt4.QtGui.QMessageBox.warning') + @unittest.mock.patch('PyQt4.QtGui.QProgressDialog') + @unittest.mock.patch('PyQt4.QtGui.QInputDialog.getText') + @unittest.mock.patch('qubesmanager.common_threads.RemoveVMThread') + def test_13_remove_vm(self, mock_thread, mock_input, _, mock_warning): + self.vm = self.qapp.add_new_vm("AppVM", "testvm", "blue") + self.dialog = vm_settings.VMSettingsWindow( + self.vm, self.qtapp, "basic") + + self.assertTrue(self.dialog.delete_vm_button.isEnabled()) + + # try with a wrong name + mock_input.return_value = ("testvm2", True) + self.dialog.delete_vm_button.click() + self.assertEqual(mock_warning.call_count, 1) + + # and now correct one + mock_input.return_value = ("testvm", True) + self.dialog.delete_vm_button.click() + + mock_thread.assert_called_with(self.vm) + mock_thread().start.assert_called_with() + +# Advanced Tab + def test_20_advanced_loads(self): + self.vm = self.qapp.add_new_vm("AppVM", "testvm", "blue") + self.dialog = vm_settings.VMSettingsWindow( + self.vm, self.qtapp, "advanced") + + self.assertEqual(self.dialog.init_mem.value(), self.vm.memory, + "Incorrect initial memory") + # default maxmem + self.assertEqual(self.dialog.max_mem_size.value(), + self.vm.property_get_default('maxmem'), + "Maxmem incorrectly displayed for default value") + self.assertEqual(self.dialog.vcpus.value(), self.vm.vcpus, + "Incorrect number of VCPUs") + self.assertTrue(self.dialog.include_in_balancing.isChecked(), + "Include in memory balancing incorrectly not checked") + + # kernel + self.assertTrue(self.vm.kernel in self.dialog.kernel.currentText(), + "Kernel displayed incorrectly") + + # default dispvm + self.assertTrue( + str(self.vm.default_dispvm) in + self.dialog.default_dispvm.currentText(), + "Default dispVM incorrectly displayed") + self.assertEqual(self.vm.template_for_dispvms, + self.dialog.dvm_template_checkbox.isChecked(), + "Incorrectly shown to be template for dispvms") + + # virtmode + self.assertTrue("default" in self.dialog.virt_mode.currentText()) + self.assertTrue("PVH" in self.dialog.virt_mode.currentText()) + + def test_21_nondefaultmaxmem(self): + self.vm = self.qapp.add_new_vm("AppVM", "testvm", "blue") + self.vm.maxmem = 5000 + + self.dialog = vm_settings.VMSettingsWindow( + self.vm, self.qtapp, "advanced") + + self.assertEqual(self.dialog.max_mem_size.value(), 5000) + + self.dialog.include_in_balancing.setChecked(False) + self._click_ok() + + self.assertEqual(self.vm.maxmem, 0) + + self.dialog.deleteLater() + + self.dialog = vm_settings.VMSettingsWindow( + self.vm, self.qtapp, "advanced") + self.assertFalse(self.dialog.include_in_balancing.isChecked()) + + self.dialog.include_in_balancing.setChecked(True) + self.assertEqual(self.dialog.max_mem_size.value(), 5000) + self._click_ok() + + self.assertEqual(self.vm.maxmem, 5000) + + def test_22_initmem(self): + self.vm = self.qapp.add_new_vm("AppVM", "testvm", "blue") + self.vm.memory = 500 + + self.dialog = vm_settings.VMSettingsWindow( + self.vm, self.qtapp, "advanced") + + self.assertEqual(self.dialog.init_mem.value(), 500, + "Incorrect initial memory") + self.dialog.init_mem.setValue(600) + self._click_ok() + + self.assertEqual(self.vm.memory, 600, "Setting initial memory failed") + + def test_23_vcpus(self): + self.vm = self.qapp.add_new_vm("AppVM", "testvm", "blue") + self.vm.vcpus = 1 + + self.dialog = vm_settings.VMSettingsWindow( + self.vm, self.qtapp, "advanced") + self.assertEqual(self.dialog.vcpus.value(), 1, + "Incorrect number of VCPUs") + + self.dialog.vcpus.setValue(2) + self._click_ok() + + self.assertEqual(self.vm.vcpus, 2, + "Incorrect number of VCPUs") + + def test_24_kernel(self): + self.vm = self.qapp.add_new_vm("AppVM", "testvm", "blue") + self.dialog = vm_settings.VMSettingsWindow( + self.vm, self.qtapp, "advanced") + + new_kernel = self._set_noncurrent(self.dialog.kernel) + self._click_ok() + + self.assertEqual(self.vm.kernel, new_kernel) + + self.dialog.deleteLater() + + self.dialog = vm_settings.VMSettingsWindow( + self.vm, self.qtapp, "advanced") + self._set_default(self.dialog.kernel) + + self._click_ok() + self.assertTrue(self.vm.property_is_default('kernel')) + + def test_25_virtmode_change(self): + self.vm = self.qapp.add_new_vm("AppVM", "testvm", "blue") + + modes = ["HVM", "PVH", "PV"] + + for mode in modes: + self.dialog = vm_settings.VMSettingsWindow( + self.vm, self.qtapp, "advanced") + + self._set_value(self.dialog.virt_mode, mode) + self._click_ok() + + self.assertEqual(self.vm.virt_mode.upper(), mode) + + self.dialog.deleteLater() + + self.dialog = vm_settings.VMSettingsWindow( + self.vm, self.qtapp, "advanced") + self._set_default(self.dialog.virt_mode) + self._click_ok() + + self.assertTrue(self.vm.property_is_default('virt_mode')) + + def test_26_default_dispvm(self): + self.vm = self.qapp.add_new_vm("AppVM", "testvm", "blue") + + self.dialog = vm_settings.VMSettingsWindow( + self.vm, self.qtapp, "advanced") + + new_dvm = self._set_noncurrent(self.dialog.default_dispvm) + self._click_ok() + + self.assertEqual(self.vm.default_dispvm.name, new_dvm) + + self.dialog.deleteLater() + + self.dialog = vm_settings.VMSettingsWindow( + self.vm, self.qtapp, "advanced") + self._set_default(self.dialog.default_dispvm) + self._click_ok() + + self.assertTrue(self.vm.property_is_default('default_dispvm')) + + @unittest.mock.patch('subprocess.check_call') + def test_27_boot_cdrom(self, mock_call): + self.vm = self.qapp.add_new_vm("AppVM", "testvm", "blue") + + self.dialog = vm_settings.VMSettingsWindow( + self.vm, self.qtapp, "advanced") + + self.dialog.boot_from_device_button.click() + mock_call.assert_called_with(['qubes-vm-boot-from-device', "testvm"]) + + def _click_ok(self): + okwidget = self.dialog.buttonBox.button( + self.dialog.buttonBox.Ok) + + QtTest.QTest.mouseClick(okwidget, QtCore.Qt.LeftButton) + + def _click_cancel(self): + cancelwidget = self.dialog.buttonBox.button( + self.dialog.buttonBox.Cancel) + + QtTest.QTest.mouseClick(cancelwidget, QtCore.Qt.LeftButton) + + def _set_noncurrent(self, widget): + if widget.count() < 2: + self.skipTest("not enough choices for " + widget.objectName()) + + widget.setCurrentIndex(0) + while widget.currentText().endswith("(current)") \ + or widget.currentText().startswith("(none)"): + widget.setCurrentIndex(widget.currentIndex() + 1) + + return widget.currentText() + + def _set_default(self, widget): + if widget.count() < 2: + self.skipTest("not enough choices for " + widget.objectName()) + + widget.setCurrentIndex(0) + while "default" not in widget.currentText(): + widget.setCurrentIndex(widget.currentIndex() + 1) + + return widget.currentText() + + def _set_none(self, widget): + if widget.count() < 2: + self.skipTest("not enough choices for " + widget.objectName()) + + widget.setCurrentIndex(0) + while "none" not in widget.currentText(): + widget.setCurrentIndex(widget.currentIndex() + 1) + + return widget.currentText() + + def _set_value(self, widget, value): + if widget.count() < 2: + self.skipTest("not enough choices for " + widget.objectName()) + + widget.setCurrentIndex(0) + while value != widget.currentText(): + widget.setCurrentIndex(widget.currentIndex() + 1) + + return widget.currentText() + + +if __name__ == "__main__": + ha_syslog = logging.handlers.SysLogHandler('/dev/log') + ha_syslog.setFormatter( + logging.Formatter('%(name)s[%(process)d]: %(message)s')) + logging.root.addHandler(ha_syslog) + unittest.main() diff --git a/rpm_spec/qmgr.spec.in b/rpm_spec/qmgr.spec.in index 65adced..49a3d59 100644 --- a/rpm_spec/qmgr.spec.in +++ b/rpm_spec/qmgr.spec.in @@ -123,9 +123,12 @@ rm -rf $RPM_BUILD_ROOT %{python3_sitelib}/qubesmanager/tests/__pycache__ %{python3_sitelib}/qubesmanager/tests/__init__.py -%{python3_sitelib}/qubesmanager/tests/test_backup_01.py +%{python3_sitelib}/qubesmanager/tests/test_backup.py +%{python3_sitelib}/qubesmanager/tests/test_backup_utils.py %{python3_sitelib}/qubesmanager/tests/test_global_settings.py %{python3_sitelib}/qubesmanager/tests/test_qube_manager.py +%{python3_sitelib}/qubesmanager/tests/test_create_new_vm.py +%{python3_sitelib}/qubesmanager/tests/test_vm_settings.py %dir %{python3_sitelib}/qubesmanager-*.egg-info %{python3_sitelib}/qubesmanager-*.egg-info/*