|
@@ -21,6 +21,7 @@
|
|
|
#
|
|
|
import asyncio
|
|
|
import contextlib
|
|
|
+import functools
|
|
|
import logging.handlers
|
|
|
import unittest
|
|
|
import unittest.mock
|
|
@@ -41,6 +42,24 @@ from qubesmanager.tests import init_qtapp
|
|
|
icon_size = qube_manager.icon_size
|
|
|
|
|
|
|
|
|
+def listen_for_events(func):
|
|
|
+ """Wrapper for a test that needs events listener to be registered all the time.
|
|
|
+ Note the test still needs to yield to the event loop to actually handle events.
|
|
|
+ """
|
|
|
+ @functools.wraps(func)
|
|
|
+ def wrapper(self, *args, **kwargs):
|
|
|
+ events_listener = \
|
|
|
+ asyncio.ensure_future(self.dispatcher.listen_for_events())
|
|
|
+ # let it connect (run until first yield/await)
|
|
|
+ self.loop.run_until_complete(asyncio.sleep(0))
|
|
|
+ try:
|
|
|
+ return func(self, *args, **kwargs)
|
|
|
+ finally:
|
|
|
+ events_listener.cancel()
|
|
|
+ self.loop.call_soon(self.loop.stop)
|
|
|
+ self.loop.run_forever()
|
|
|
+ return wrapper
|
|
|
+
|
|
|
class QubeManagerTest(unittest.TestCase):
|
|
|
def setUp(self):
|
|
|
super(QubeManagerTest, self).setUp()
|
|
@@ -156,10 +175,8 @@ class QubeManagerTest(unittest.TestCase):
|
|
|
for row in range(self.dialog.table.model().rowCount()):
|
|
|
vm = self._get_table_vm(row)
|
|
|
|
|
|
- incl_backups_item = self._get_table_item(row, "Backup",
|
|
|
- Qt.CheckStateRole)
|
|
|
+ incl_backups_item = self._get_table_item(row, "Backup", Qt.CheckStateRole) == Qt.Checked
|
|
|
incl_backups_value = getattr(vm, 'include_in_backups', False)
|
|
|
- incl_backups_value = Qt.Checked if incl_backups_value else Qt.Unchecked
|
|
|
|
|
|
self.assertEqual(
|
|
|
incl_backups_value, incl_backups_item,
|
|
@@ -187,7 +204,7 @@ class QubeManagerTest(unittest.TestCase):
|
|
|
def_dispvm_item = self._get_table_item(row, "Default DispVM")
|
|
|
if vm.property_is_default("default_dispvm"):
|
|
|
def_dispvm_value = "default ({})".format(
|
|
|
- self.qapp.default_dispvm)
|
|
|
+ vm.property_get_default("default_dispvm"))
|
|
|
else:
|
|
|
def_dispvm_value = getattr(vm, "default_dispvm", None)
|
|
|
|
|
@@ -314,6 +331,8 @@ class QubeManagerTest(unittest.TestCase):
|
|
|
def test_204_vm_keyboard(self, mock_message):
|
|
|
selected_vm = self._select_non_admin_vm(running=True)
|
|
|
self.assertIsNotNone(selected_vm, "No valid non-admin VM found")
|
|
|
+ if 'supported-feature.keyboard-layout' not in selected_vm.features:
|
|
|
+ self.skipTest("VM {!s} does not support new layout change".format(selected_vm))
|
|
|
widget = self.dialog.toolbar.widgetForAction(
|
|
|
self.dialog.action_set_keyboard_layout)
|
|
|
with unittest.mock.patch.object(selected_vm, 'run') as mock_run:
|
|
@@ -334,9 +353,6 @@ class QubeManagerTest(unittest.TestCase):
|
|
|
QtCore.Qt.LeftButton)
|
|
|
self.assertEqual(mock_run.call_count, 0,
|
|
|
"Keyboard change called on a halted VM")
|
|
|
- self.assertEqual(mock_message.call_count, 0,
|
|
|
- "Keyboard change called on a halted VM with"
|
|
|
- " obsolete keyboard-layout handling")
|
|
|
|
|
|
def test_206_dom0_keyboard(self):
|
|
|
self._select_admin_vm()
|
|
@@ -715,6 +731,241 @@ class QubeManagerTest(unittest.TestCase):
|
|
|
self.assertEqual(expected_number, actual_number,
|
|
|
"Incorrect number of vms shown for cleared search box")
|
|
|
|
|
|
+ @unittest.mock.patch('PyQt5.QtWidgets.QMessageBox.question')
|
|
|
+ @listen_for_events
|
|
|
+ def test_240_network_menu_single(self, mock_question):
|
|
|
+ mock_question.return_value = QtWidgets.QMessageBox.Yes
|
|
|
+ target_vm_name = 'work'
|
|
|
+
|
|
|
+ self._run_command_and_process_events(
|
|
|
+ ['qvm-prefs', '-D', target_vm_name, 'netvm'], timeout=20)
|
|
|
+ self._select_vms(['work'])
|
|
|
+ selected_vm = self.qapp.domains[target_vm_name]
|
|
|
+ # reset to default even in case of failure
|
|
|
+ self.addCleanup(functools.partial(delattr, selected_vm, 'netvm'))
|
|
|
+
|
|
|
+ # this is the method to get '==' operator working on icons...
|
|
|
+ on_icon = QIcon(":/on.png").pixmap(64).toImage()
|
|
|
+ off_icon = QIcon().pixmap(64).toImage()
|
|
|
+ for action in self.dialog.network_menu.actions():
|
|
|
+ if action.text().startswith('default '):
|
|
|
+ self.assertEqual(action.icon().pixmap(64).toImage(), on_icon)
|
|
|
+ break
|
|
|
+ else:
|
|
|
+ self.fail('default netvm not found')
|
|
|
+
|
|
|
+ # change to specific value
|
|
|
+ for action in self.dialog.network_menu.actions():
|
|
|
+ if action.text() == 'sys-net':
|
|
|
+ self.assertEqual(action.icon().pixmap(64).toImage(), off_icon)
|
|
|
+ action.trigger()
|
|
|
+ break
|
|
|
+ else:
|
|
|
+ self.fail('sys-net netvm not found')
|
|
|
+ # process events
|
|
|
+ self.loop.run_until_complete(asyncio.sleep(0))
|
|
|
+ mock_question.assert_called()
|
|
|
+ self.assertEqual(str(selected_vm.netvm), 'sys-net')
|
|
|
+ mock_question.reset_mock()
|
|
|
+
|
|
|
+ # change to none
|
|
|
+ for action in self.dialog.network_menu.actions():
|
|
|
+ if action.text() == 'None':
|
|
|
+ self.assertEqual(action.icon().pixmap(64).toImage(), off_icon)
|
|
|
+ action.trigger()
|
|
|
+ break
|
|
|
+ else:
|
|
|
+ self.fail('"none" netvm not found')
|
|
|
+ # process events
|
|
|
+ self.loop.run_until_complete(asyncio.sleep(0))
|
|
|
+ mock_question.assert_called()
|
|
|
+ self.assertIsNone(selected_vm.netvm)
|
|
|
+ mock_question.reset_mock()
|
|
|
+
|
|
|
+ # then go back to the default
|
|
|
+ for action in self.dialog.network_menu.actions():
|
|
|
+ if action.text().startswith('default '):
|
|
|
+ self.assertEqual(action.icon().pixmap(64).toImage(), off_icon)
|
|
|
+ action.trigger()
|
|
|
+ break
|
|
|
+ # process events
|
|
|
+ self.loop.run_until_complete(asyncio.sleep(0))
|
|
|
+
|
|
|
+ mock_question.assert_called()
|
|
|
+ self.assertTrue(selected_vm.property_is_default('netvm'))
|
|
|
+
|
|
|
+ @unittest.mock.patch('PyQt5.QtWidgets.QMessageBox.question')
|
|
|
+ @listen_for_events
|
|
|
+ def test_241_network_menu_multiple(self, mock_question):
|
|
|
+ mock_question.return_value = QtWidgets.QMessageBox.Yes
|
|
|
+ target_vm_names = ['work', 'personal', 'vault']
|
|
|
+ work = self.qapp.domains['work']
|
|
|
+ personal = self.qapp.domains['personal']
|
|
|
+ vault = self.qapp.domains['vault']
|
|
|
+ # reset to default even in case of failure
|
|
|
+ self.addCleanup(functools.partial(delattr, work, 'netvm'))
|
|
|
+ self.addCleanup(functools.partial(delattr, personal, 'netvm'))
|
|
|
+ self.addCleanup(functools.partial(setattr, vault, 'netvm', None))
|
|
|
+
|
|
|
+ self._run_command_and_process_events(
|
|
|
+ ['qvm-prefs', '-D', 'work', 'netvm'], timeout=5)
|
|
|
+ self._run_command_and_process_events(
|
|
|
+ ['qvm-prefs', '-D', 'personal', 'netvm'], timeout=5)
|
|
|
+ self._run_command_and_process_events(
|
|
|
+ ['qvm-prefs', 'vault', ''], timeout=5)
|
|
|
+ self._select_vms(target_vm_names)
|
|
|
+
|
|
|
+ # this is the method to get '==' operator working on icons...
|
|
|
+ on_icon = QIcon(":/on.png").pixmap(64).toImage()
|
|
|
+ transient_icon = QIcon(":/transient.png").pixmap(64).toImage()
|
|
|
+ off_icon = QIcon().pixmap(64).toImage()
|
|
|
+ for action in self.dialog.network_menu.actions():
|
|
|
+ if action.text().startswith('default '):
|
|
|
+ # work, personal
|
|
|
+ self.assertEqual(action.icon().pixmap(64).toImage(), transient_icon)
|
|
|
+ elif action.text() == 'None':
|
|
|
+ # vault
|
|
|
+ self.assertEqual(action.icon().pixmap(64).toImage(), transient_icon)
|
|
|
+ else:
|
|
|
+ self.assertEqual(action.icon().pixmap(64).toImage(), off_icon)
|
|
|
+
|
|
|
+ # change to specific value
|
|
|
+ for action in self.dialog.network_menu.actions():
|
|
|
+ if action.text() == 'sys-net':
|
|
|
+ self.assertEqual(action.icon().pixmap(64).toImage(), off_icon)
|
|
|
+ action.trigger()
|
|
|
+ break
|
|
|
+ else:
|
|
|
+ self.fail('sys-net netvm not found')
|
|
|
+ # process events
|
|
|
+ self.loop.run_until_complete(asyncio.sleep(0))
|
|
|
+ mock_question.assert_called()
|
|
|
+ self.assertEqual(str(work.netvm), 'sys-net')
|
|
|
+ self.assertEqual(str(personal.netvm), 'sys-net')
|
|
|
+ self.assertEqual(str(vault.netvm), 'sys-net')
|
|
|
+ mock_question.reset_mock()
|
|
|
+
|
|
|
+ @unittest.mock.patch('PyQt5.QtWidgets.QMessageBox.question')
|
|
|
+ @listen_for_events
|
|
|
+ def test_250_template_menu_single(self, mock_question):
|
|
|
+ mock_question.return_value = QtWidgets.QMessageBox.Yes
|
|
|
+ target_vm_name = 'work'
|
|
|
+ selected_vm = self.qapp.domains[target_vm_name]
|
|
|
+ if selected_vm.is_running():
|
|
|
+ self.skipTest(
|
|
|
+ 'VM {!s} is running, please stop it first'.format(selected_vm))
|
|
|
+ current_template = selected_vm.template
|
|
|
+ new_template = self._select_templatevm(
|
|
|
+ different_than=[str(current_template)])
|
|
|
+
|
|
|
+ self._select_vms(['work'])
|
|
|
+
|
|
|
+ # reset to previous value even in case of failure
|
|
|
+ self.addCleanup(functools.partial(
|
|
|
+ setattr, selected_vm, 'template', str(current_template)))
|
|
|
+
|
|
|
+ # this is the method to get '==' operator working on icons...
|
|
|
+ on_icon = QIcon(":/on.png").pixmap(64).toImage()
|
|
|
+ off_icon = QIcon().pixmap(64).toImage()
|
|
|
+ found = False
|
|
|
+ for action in self.dialog.template_menu.actions():
|
|
|
+ if action.text() == str(current_template):
|
|
|
+ self.assertEqual(action.icon().pixmap(64).toImage(), on_icon)
|
|
|
+ found = True
|
|
|
+ else:
|
|
|
+ self.assertEqual(action.icon().pixmap(64).toImage(), off_icon)
|
|
|
+
|
|
|
+ if not found:
|
|
|
+ self.fail(
|
|
|
+ 'current template value ({!s}) not found in the menu'.format(
|
|
|
+ current_template))
|
|
|
+
|
|
|
+ # change to specific value
|
|
|
+ for action in self.dialog.template_menu.actions():
|
|
|
+ if action.text() == str(new_template):
|
|
|
+ self.assertEqual(action.icon().pixmap(64).toImage(), off_icon)
|
|
|
+ action.trigger()
|
|
|
+ break
|
|
|
+ else:
|
|
|
+ self.fail('template {!s} not found in the menu'.format(new_template))
|
|
|
+ # process events
|
|
|
+ self.loop.run_until_complete(asyncio.sleep(0))
|
|
|
+ mock_question.assert_called()
|
|
|
+ # compare str(), to have better error message on mismatch
|
|
|
+ self.assertEqual(str(selected_vm.template), str(new_template))
|
|
|
+ mock_question.reset_mock()
|
|
|
+
|
|
|
+ @unittest.mock.patch('PyQt5.QtWidgets.QMessageBox.question')
|
|
|
+ @listen_for_events
|
|
|
+ def test_251_template_menu_multiple(self, mock_question):
|
|
|
+ mock_question.return_value = QtWidgets.QMessageBox.Yes
|
|
|
+ target_vm_names = ['work', 'personal', 'untrusted']
|
|
|
+ work = self.qapp.domains['work']
|
|
|
+ personal = self.qapp.domains['personal']
|
|
|
+ untrusted = self.qapp.domains['untrusted']
|
|
|
+ if any(vm.is_running() for vm in [work, personal, untrusted]):
|
|
|
+ self.skipTest('Any of work, personal, untrusted VM is running')
|
|
|
+
|
|
|
+ old_template = work.template
|
|
|
+ new_template = self._select_templatevm(
|
|
|
+ different_than=[str(work.template),
|
|
|
+ str(personal.template),
|
|
|
+ str(untrusted.template)])
|
|
|
+ # reset to previous value even in case of failure
|
|
|
+ self.addCleanup(functools.partial(
|
|
|
+ setattr, work, 'template', str(work.template)))
|
|
|
+ self.addCleanup(functools.partial(
|
|
|
+ setattr, personal, 'template', str(personal.template)))
|
|
|
+ self.addCleanup(functools.partial(
|
|
|
+ setattr, untrusted, 'template', str(untrusted.template)))
|
|
|
+
|
|
|
+ # set all to the same value
|
|
|
+ self._run_command_and_process_events(
|
|
|
+ ['qvm-prefs', 'personal', 'template', str(work.template)], timeout=5)
|
|
|
+ self._run_command_and_process_events(
|
|
|
+ ['qvm-prefs', 'untrusted', 'template', str(work.template)], timeout=5)
|
|
|
+
|
|
|
+ self._select_vms(target_vm_names)
|
|
|
+
|
|
|
+ # this is the method to get '==' operator working on icons...
|
|
|
+ on_icon = QIcon(":/on.png").pixmap(64).toImage()
|
|
|
+ transient_icon = QIcon(":/transient.png").pixmap(64).toImage()
|
|
|
+ off_icon = QIcon().pixmap(64).toImage()
|
|
|
+ for action in self.dialog.template_menu.actions():
|
|
|
+ if action.text() == str(old_template):
|
|
|
+ self.assertIn(
|
|
|
+ action.icon().pixmap(64).toImage(),
|
|
|
+ (on_icon, transient_icon))
|
|
|
+ else:
|
|
|
+ self.assertEqual(action.icon().pixmap(64).toImage(), off_icon)
|
|
|
+
|
|
|
+ # make one different
|
|
|
+ self._run_command_and_process_events(
|
|
|
+ ['qvm-prefs', 'work', 'template', str(new_template)], timeout=5)
|
|
|
+
|
|
|
+ for action in self.dialog.template_menu.actions():
|
|
|
+ if action.text() == str(old_template):
|
|
|
+ self.assertEqual(action.icon().pixmap(64).toImage(), transient_icon)
|
|
|
+ elif action.text() == str(new_template):
|
|
|
+ self.assertEqual(action.icon().pixmap(64).toImage(), transient_icon)
|
|
|
+ else:
|
|
|
+ self.assertEqual(action.icon().pixmap(64).toImage(), off_icon)
|
|
|
+
|
|
|
+ # change all to the same value
|
|
|
+ for action in self.dialog.template_menu.actions():
|
|
|
+ if action.text() == str(new_template):
|
|
|
+ action.trigger()
|
|
|
+ break
|
|
|
+ else:
|
|
|
+ self.fail('{!s} template not found'.format(new_template))
|
|
|
+ # process events
|
|
|
+ self.loop.run_until_complete(asyncio.sleep(0))
|
|
|
+ mock_question.assert_called()
|
|
|
+ self.assertEqual(str(work.template), str(new_template))
|
|
|
+ self.assertEqual(str(personal.template), str(new_template))
|
|
|
+ self.assertEqual(str(untrusted.template), str(new_template))
|
|
|
+
|
|
|
+
|
|
|
@unittest.mock.patch('PyQt5.QtWidgets.QMessageBox.information')
|
|
|
@unittest.mock.patch('PyQt5.QtWidgets.QMessageBox.warning')
|
|
|
def test_300_clear_threads(self, mock_warning, mock_info):
|
|
@@ -1133,27 +1384,31 @@ class QubeManagerTest(unittest.TestCase):
|
|
|
self.assertEqual(call_count, 0)
|
|
|
|
|
|
@unittest.mock.patch('qubesmanager.log_dialog.LogDialog')
|
|
|
- def test_500_logs(self, mock_logDialog):
|
|
|
+ def test_500_logs(self, mock_log_dialog):
|
|
|
self._select_admin_vm()
|
|
|
|
|
|
- self.assertTrue(self.dialog.action_show_logs.isEnabled())
|
|
|
self.dialog.action_show_logs.trigger()
|
|
|
- dom0_logs = mock_logDialog.call_args.args[1]
|
|
|
- self.assertIn('/var/log/xen/console/hypervisor.log', dom0_logs,
|
|
|
- "Log for dom0 does not contain 'hypervisor'")
|
|
|
+ mock_log_dialog.assert_called_once()
|
|
|
+ dom0_logs = mock_log_dialog.mock_calls[0][1][1]
|
|
|
+ for c in dom0_logs:
|
|
|
+ self.assertIn("hypervisor", c,
|
|
|
+ "Log for dom0 does not contain 'hypervisor'")
|
|
|
+
|
|
|
+ mock_log_dialog.reset_mock()
|
|
|
|
|
|
selected_vm = self._select_non_admin_vm(running=True).name
|
|
|
|
|
|
- self.assertTrue(self.dialog.action_show_logs.isEnabled())
|
|
|
self.dialog.action_show_logs.trigger()
|
|
|
- vm_logs = mock_logDialog.call_args.args[1]
|
|
|
- self.assertIn(
|
|
|
- selected_vm,
|
|
|
- ",".join(vm_logs),
|
|
|
- "Log for {} does not contain its name".format(selected_vm))
|
|
|
+ mock_log_dialog.assert_called_once()
|
|
|
+ vm_logs = mock_log_dialog.mock_calls[0][1][1]
|
|
|
+ for c in vm_logs:
|
|
|
+ self.assertIn(
|
|
|
+ selected_vm,
|
|
|
+ c,
|
|
|
+ "Log for {} does not contain its name".format(selected_vm))
|
|
|
|
|
|
self.assertNotEqual(dom0_logs, vm_logs,
|
|
|
- "Same logs found for dom0 and non-adminVM")
|
|
|
+ "Same logs found for dom0 and non-adminVM")
|
|
|
|
|
|
def _find_vm_row(self, vm_name):
|
|
|
for row in range(self.dialog.table.model().rowCount()):
|
|
@@ -1227,7 +1482,7 @@ class QubeManagerTest(unittest.TestCase):
|
|
|
for row in range(self.dialog.table.model().rowCount()):
|
|
|
template = self._get_table_item(row, "Template")
|
|
|
vm = self._get_table_vm(row)
|
|
|
- if template != 'AdminVM' and \
|
|
|
+ if template != 'AdminVM' and not vm.provides_network and \
|
|
|
(running is None
|
|
|
or (running and vm.is_running())
|
|
|
or (not running and not vm.is_running())):
|
|
@@ -1236,19 +1491,28 @@ class QubeManagerTest(unittest.TestCase):
|
|
|
return vm
|
|
|
return None
|
|
|
|
|
|
- def _select_templatevm(self, running=None):
|
|
|
+ def _select_templatevm(self, running=None, different_than=()):
|
|
|
for row in range(self.dialog.table.model().rowCount()):
|
|
|
template = self._get_table_item(row, "Template")
|
|
|
vm = self._get_table_vm(row)
|
|
|
if template == 'TemplateVM' and \
|
|
|
+ (template not in different_than) and \
|
|
|
(running is None
|
|
|
- or (running and vm.is_running())
|
|
|
- or (not running and not vm.is_running())):
|
|
|
+ or (bool(running) == bool(vm.is_running()))):
|
|
|
index = self.dialog.table.model().index(row, 0)
|
|
|
self.dialog.table.setCurrentIndex(index)
|
|
|
return vm
|
|
|
return None
|
|
|
|
|
|
+ def _select_vms(self, vms: list):
|
|
|
+ self.dialog.table.selectionModel().clear()
|
|
|
+ mode = QtCore.QItemSelectionModel.Select | QtCore.QItemSelectionModel.Rows
|
|
|
+ for row in range(self.dialog.table.model().rowCount()):
|
|
|
+ vm = self._get_table_vm(row)
|
|
|
+ if str(vm) in vms:
|
|
|
+ index = self.dialog.table.model().index(row, 0)
|
|
|
+ self.dialog.table.selectionModel().select(index, mode)
|
|
|
+
|
|
|
def __check_sorting(self, column_name):
|
|
|
last_text = None
|
|
|
last_vm = None
|