test_qube_manager.py 70 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743
  1. #!/usr/bin/python3
  2. #
  3. # The Qubes OS Project, https://www.qubes-os.org/
  4. #
  5. # Copyright (C) 2016 Marta Marczykowska-Górecka
  6. # <marmarta@invisiblethingslab.com>
  7. #
  8. # This program is free software; you can redistribute it and/or modify
  9. # it under the terms of the GNU General Public License as published by
  10. # the Free Software Foundation; either version 2 of the License, or
  11. # (at your option) any later version.
  12. #
  13. # This program is distributed in the hope that it will be useful,
  14. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  15. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  16. # GNU General Public License for more details.
  17. #
  18. # You should have received a copy of the GNU General Public License along
  19. # with this program; if not, write to the Free Software Foundation, Inc.,
  20. # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  21. #
  22. import asyncio
  23. import contextlib
  24. import functools
  25. import logging.handlers
  26. import unittest
  27. import unittest.mock
  28. import subprocess
  29. import datetime
  30. import time
  31. from PyQt5 import QtTest, QtCore, QtWidgets
  32. from PyQt5.QtCore import (Qt, QSize)
  33. from PyQt5.QtGui import (QIcon)
  34. from qubesadmin import Qubes, events, exc
  35. import qubesmanager.qube_manager as qube_manager
  36. from qubesmanager.tests import init_qtapp
  37. icon_size = qube_manager.icon_size
  38. def listen_for_events(func):
  39. """Wrapper for a test that needs events listener to be registered all the time.
  40. Note the test still needs to yield to the event loop to actually handle events.
  41. """
  42. @functools.wraps(func)
  43. def wrapper(self, *args, **kwargs):
  44. events_listener = \
  45. asyncio.ensure_future(self.dispatcher.listen_for_events())
  46. # let it connect (run until first yield/await)
  47. self.loop.run_until_complete(asyncio.sleep(0))
  48. try:
  49. return func(self, *args, **kwargs)
  50. finally:
  51. events_listener.cancel()
  52. self.loop.call_soon(self.loop.stop)
  53. self.loop.run_forever()
  54. return wrapper
  55. class QubeManagerTest(unittest.TestCase):
  56. def setUp(self):
  57. super(QubeManagerTest, self).setUp()
  58. self.qtapp, self.loop = init_qtapp()
  59. self.mock_qprogress = unittest.mock.patch(
  60. 'PyQt5.QtWidgets.QProgressDialog')
  61. self.mock_qprogress.start()
  62. self.addCleanup(self.mock_qprogress.stop)
  63. self.qapp = Qubes()
  64. self.dispatcher = events.EventsDispatcher(self.qapp)
  65. self.dialog = qube_manager.VmManagerWindow(
  66. self.qtapp, self.qapp, self.dispatcher)
  67. def test_000_window_loads(self):
  68. self.assertTrue(self.dialog.table is not None, "Window did not load")
  69. def test_001_correct_vms_listed(self):
  70. vms_in_table = []
  71. for row in range(self.dialog.table.model().rowCount()):
  72. vm = self._get_table_vm(row)
  73. self.assertIsNotNone(vm)
  74. vms_in_table.append(vm.name)
  75. # check that name is listed correctly
  76. name_item = self._get_table_item(row, "Name")
  77. self.assertEqual(name_item, vm.name,
  78. "Incorrect VM name for {}".format(vm.name))
  79. actual_vms = [vm.name for vm in self.qapp.domains]
  80. self.assertEqual(len(vms_in_table), len(actual_vms),
  81. "Incorrect number of VMs loaded")
  82. self.assertListEqual(sorted(vms_in_table), sorted(actual_vms),
  83. "Incorrect VMs loaded")
  84. def test_002_correct_template_listed(self):
  85. for row in range(self.dialog.table.model().rowCount()):
  86. vm = self._get_table_vm(row)
  87. # check that template is listed correctly
  88. template_item = self._get_table_item(row, "Template")
  89. if getattr(vm, "template", None):
  90. self.assertEqual(vm.template,
  91. template_item,
  92. "Incorrect template for {}".format(vm.name))
  93. else:
  94. self.assertEqual(vm.klass, template_item,
  95. "Incorrect class for {}".format(vm.name))
  96. def test_003_correct_netvm_listed(self):
  97. for row in range(self.dialog.table.model().rowCount()):
  98. vm = self._get_table_vm(row)
  99. # check that netvm is listed correctly
  100. netvm_item = self._get_table_item(row, "NetVM")
  101. netvm_value = getattr(vm, "netvm", None)
  102. if not netvm_value:
  103. netvm_value = "n/a"
  104. if netvm_value and hasattr(vm, "netvm") \
  105. and vm.property_is_default("netvm"):
  106. netvm_value = "default ({})".format(netvm_value)
  107. self.assertEqual(netvm_value,
  108. netvm_item,
  109. "Incorrect netvm for {}".format(vm.name))
  110. def test_004_correct_disk_usage_listed(self):
  111. for row in range(self.dialog.table.model().rowCount()):
  112. vm = self._get_table_vm(row)
  113. size_item = self._get_table_item(row, "Disk Usage")
  114. if vm.klass == 'AdminVM':
  115. size_value = "n/a"
  116. else:
  117. size_value = round(vm.get_disk_utilization() / (1024 * 1024), 2)
  118. size_value = str(size_value) + " MiB"
  119. self.assertEqual(size_value,
  120. size_item,
  121. "Incorrect size for {}".format(vm.name))
  122. def test_005_correct_internal_listed(self):
  123. for row in range(self.dialog.table.model().rowCount()):
  124. vm = self._get_table_vm(row)
  125. internal_item = self._get_table_item(row, "Internal")
  126. internal_value = "Yes" if vm.features.get('internal', False) else ""
  127. self.assertEqual(internal_item, internal_value,
  128. "Incorrect internal value for {}".format(vm.name))
  129. def test_006_correct_ip_listed(self):
  130. for row in range(self.dialog.table.model().rowCount()):
  131. vm = self._get_table_vm(row)
  132. ip_item = self._get_table_item(row, "IP")
  133. if hasattr(vm, 'ip'):
  134. ip_value = getattr(vm, 'ip')
  135. ip_value = "" if ip_value is None else ip_value
  136. else:
  137. ip_value = "n/a"
  138. self.assertEqual(ip_value, ip_item,
  139. "Incorrect ip value for {}".format(vm.name))
  140. def test_007_incl_in_backups_listed(self):
  141. for row in range(self.dialog.table.model().rowCount()):
  142. vm = self._get_table_vm(row)
  143. incl_backups_item = self._get_table_item(row, "Backup", Qt.CheckStateRole) == Qt.Checked
  144. incl_backups_value = getattr(vm, 'include_in_backups', False)
  145. self.assertEqual(
  146. incl_backups_value, incl_backups_item,
  147. "Incorrect include in backups value for {}".format(vm.name))
  148. def test_008_last_backup_listed(self):
  149. for row in range(self.dialog.table.model().rowCount()):
  150. vm = self._get_table_vm(row)
  151. last_backup_item = self._get_table_item(row, "Last backup")
  152. last_backup_value = getattr(vm, 'backup_timestamp', None)
  153. if last_backup_value:
  154. last_backup_value = str(
  155. datetime.datetime.fromtimestamp(last_backup_value))
  156. self.assertEqual(
  157. last_backup_value, last_backup_item,
  158. "Incorrect last backup value for {}".format(vm.name))
  159. def test_009_def_dispvm_listed(self):
  160. for row in range(self.dialog.table.model().rowCount()):
  161. vm = self._get_table_vm(row)
  162. def_dispvm_item = self._get_table_item(row, "Default DispVM")
  163. if vm.property_is_default("default_dispvm"):
  164. def_dispvm_value = "default ({})".format(
  165. vm.property_get_default("default_dispvm"))
  166. else:
  167. def_dispvm_value = getattr(vm, "default_dispvm", None)
  168. self.assertEqual(
  169. def_dispvm_value, def_dispvm_item,
  170. "Incorrect default dispvm value for {}".format(vm.name))
  171. def test_010_is_dvm_template_listed(self):
  172. for row in range(self.dialog.table.model().rowCount()):
  173. vm = self._get_table_vm(row)
  174. is_dvm_template_item = self._get_table_item(row, "Is DVM Template")
  175. is_dvm_template_value = "Yes" if \
  176. getattr(vm, "template_for_dispvms", False) else ""
  177. self.assertEqual(
  178. is_dvm_template_value, is_dvm_template_item,
  179. "Incorrect is DVM template value for {}".format(vm.name))
  180. def test_011_is_label_correct(self):
  181. for row in range(self.dialog.table.model().rowCount()):
  182. vm = self._get_table_vm(row)
  183. icon = QIcon.fromTheme(getattr(vm, 'icon', 'appvm-black'))
  184. icon = icon.pixmap(icon_size)
  185. label_pixmap = self._get_table_item(row, "Label", Qt.DecorationRole)
  186. self.assertEqual(label_pixmap.toImage(), icon.toImage())
  187. def test_012_is_state_correct(self):
  188. for row in range(self.dialog.table.model().rowCount()):
  189. vm = self._get_table_vm(row)
  190. displayed_power_state = self._get_table_item(row, "State")['power']
  191. self.assertEqual(
  192. displayed_power_state, vm.get_power_state(),
  193. "Wrong power state displayed for {}".format(vm.name))
  194. def test_013_incorrect_settings_file(self):
  195. mock_settings = unittest.mock.MagicMock(spec=QtCore.QSettings)
  196. settings_result_dict = {"view/sort_column": "Cthulhu",
  197. "view/sort_order": "Fhtagn",
  198. "view/menubar_visible": "R'lyeh"
  199. }
  200. mock_settings.side_effect = (
  201. lambda x, *args, **kwargs: settings_result_dict.get(x))
  202. with unittest.mock.patch('PyQt5.QtCore.QSettings.value',
  203. mock_settings),\
  204. unittest.mock.patch('PyQt5.QtWidgets.QMessageBox.warning')\
  205. as mock_warning:
  206. self.dialog = qube_manager.VmManagerWindow(
  207. self.qtapp, self.qapp, self.dispatcher)
  208. self.assertEqual(mock_warning.call_count, 1)
  209. def test_100_sorting(self):
  210. col = self.dialog.qubes_model.columns_indices.index("Template")
  211. self.dialog.table.sortByColumn(col, QtCore.Qt.AscendingOrder)
  212. self.__check_sorting("Template")
  213. col = self.dialog.qubes_model.columns_indices.index("Name")
  214. self.dialog.table.sortByColumn(col, QtCore.Qt.AscendingOrder)
  215. self.__check_sorting("Name")
  216. @unittest.mock.patch('qubesmanager.qube_manager.QSettings.setValue')
  217. def test_101_hide_column(self, mock_settings):
  218. model = self.dialog.qubes_model
  219. action_no = model.columns_indices.index('Is DVM Template')
  220. self.dialog.menu_view.actions()[action_no].trigger()
  221. mock_settings.assert_called_with('columns/Is DVM Template', True)
  222. self.dialog.menu_view.actions()[action_no].trigger()
  223. mock_settings.assert_called_with('columns/Is DVM Template', False)
  224. @unittest.mock.patch('qubesmanager.settings.VMSettingsWindow')
  225. def test_200_vm_open_settings(self, mock_window):
  226. selected_vm = self._select_non_admin_vm()
  227. self.assertIsNotNone(selected_vm, "No valid non-admin VM found")
  228. widget = self.dialog.toolbar.widgetForAction(
  229. self.dialog.action_settings)
  230. QtTest.QTest.mouseClick(widget,
  231. QtCore.Qt.LeftButton)
  232. mock_window.assert_called_once_with(
  233. selected_vm, qapp=self.qtapp, init_page="basic", qubesapp=self.qapp)
  234. def test_201_vm_open_settings_admin(self):
  235. self._select_admin_vm()
  236. self.assertFalse(self.dialog.action_settings.isEnabled(),
  237. "Settings not disabled for admin VM")
  238. self.assertFalse(self.dialog.action_editfwrules.isEnabled(),
  239. "Editfw not disabled for admin VM")
  240. self.assertFalse(self.dialog.action_appmenus.isEnabled(),
  241. "Appmenus not disabled for admin VM")
  242. @unittest.mock.patch('qubesmanager.settings.VMSettingsWindow')
  243. def test_202_vm_open_firewall(self, mock_window):
  244. selected_vm = self._select_non_admin_vm()
  245. self.assertIsNotNone(selected_vm, "No valid non-admin VM found")
  246. widget = self.dialog.toolbar.widgetForAction(
  247. self.dialog.action_editfwrules)
  248. QtTest.QTest.mouseClick(widget,
  249. QtCore.Qt.LeftButton)
  250. mock_window.assert_called_once_with(
  251. selected_vm, qapp=self.qtapp, init_page="firewall",
  252. qubesapp=self.qapp)
  253. @unittest.mock.patch('qubesmanager.settings.VMSettingsWindow')
  254. def test_203_vm_open_apps(self, mock_window):
  255. selected_vm = self._select_non_admin_vm()
  256. self.assertIsNotNone(selected_vm, "No valid non-admin VM found")
  257. widget = self.dialog.toolbar.widgetForAction(
  258. self.dialog.action_appmenus)
  259. QtTest.QTest.mouseClick(widget,
  260. QtCore.Qt.LeftButton)
  261. mock_window.assert_called_once_with(
  262. selected_vm, qapp=self.qtapp, init_page="applications",
  263. qubesapp=self.qapp)
  264. @unittest.mock.patch('PyQt5.QtWidgets.QMessageBox.warning')
  265. def test_204_vm_keyboard(self, mock_message):
  266. selected_vm = self._select_non_admin_vm(running=True)
  267. self.assertIsNotNone(selected_vm, "No valid non-admin VM found")
  268. if 'supported-feature.keyboard-layout' not in selected_vm.features:
  269. self.skipTest("VM {!s} does not support new layout change".format(selected_vm))
  270. widget = self.dialog.toolbar.widgetForAction(
  271. self.dialog.action_set_keyboard_layout)
  272. with unittest.mock.patch.object(selected_vm, 'run') as mock_run:
  273. QtTest.QTest.mouseClick(widget,
  274. QtCore.Qt.LeftButton)
  275. mock_run.assert_called_once_with("qubes-change-keyboard-layout")
  276. self.assertEqual(mock_message.call_count, 0,
  277. "VM does not support new layout change")
  278. @unittest.mock.patch('PyQt5.QtWidgets.QMessageBox.warning')
  279. def test_205_vm_keyboard_not_running(self, mock_message):
  280. selected_vm = self._select_non_admin_vm(running=False)
  281. self.assertIsNotNone(selected_vm, "No valid non-admin VM found")
  282. widget = self.dialog.toolbar.widgetForAction(
  283. self.dialog.action_set_keyboard_layout)
  284. with unittest.mock.patch.object(selected_vm, 'run') as mock_run:
  285. QtTest.QTest.mouseClick(widget,
  286. QtCore.Qt.LeftButton)
  287. self.assertEqual(mock_run.call_count, 0,
  288. "Keyboard change called on a halted VM")
  289. def test_206_dom0_keyboard(self):
  290. self._select_admin_vm()
  291. self.assertFalse(self.dialog.action_set_keyboard_layout.isEnabled())
  292. @unittest.mock.patch("PyQt5.QtWidgets.QMessageBox.question",
  293. return_value=QtWidgets.QMessageBox.Yes)
  294. def test_207_update_vm_not_running(self, _):
  295. selected_vm = self._select_templatevm(running=False)
  296. self.assertIsNotNone(selected_vm, "No valid template VM found")
  297. widget = self.dialog.toolbar.widgetForAction(
  298. self.dialog.action_updatevm)
  299. with unittest.mock.patch('qubesmanager.qube_manager.UpdateVMThread') \
  300. as mock_update:
  301. QtTest.QTest.mouseClick(widget,
  302. QtCore.Qt.LeftButton)
  303. mock_update.assert_called_once_with(selected_vm)
  304. mock_update().start.assert_called_once_with()
  305. def test_208_update_vm_admin(self):
  306. selected_vm = self._select_admin_vm()
  307. self.assertIsNotNone(selected_vm, "No valid admin VM found")
  308. widget = self.dialog.toolbar.widgetForAction(
  309. self.dialog.action_updatevm)
  310. with unittest.mock.patch('qubesmanager.qube_manager.UpdateVMThread') \
  311. as mock_update:
  312. QtTest.QTest.mouseClick(widget,
  313. QtCore.Qt.LeftButton)
  314. mock_update.assert_called_once_with(selected_vm)
  315. mock_update().start.assert_called_once_with()
  316. @unittest.mock.patch("PyQt5.QtWidgets.QInputDialog.getText",
  317. return_value=("command to run", True))
  318. def test_209_run_command_in_vm(self, _):
  319. selected_vm = self._select_non_admin_vm()
  320. self.assertIsNotNone(selected_vm, "No valid non-admin VM found")
  321. with unittest.mock.patch('qubesmanager.qube_manager.RunCommandThread') \
  322. as mock_thread:
  323. self.dialog.action_run_command_in_vm.trigger()
  324. mock_thread.assert_called_once_with(selected_vm, "command to run")
  325. mock_thread().finished.connect.assert_called_once_with(
  326. self.dialog.clear_threads)
  327. mock_thread().start.assert_called_once_with()
  328. def test_210_run_command_in_adminvm(self):
  329. self._select_admin_vm()
  330. self.assertFalse(self.dialog.action_run_command_in_vm.isEnabled(),
  331. "Should not be able to run commands for dom0")
  332. @unittest.mock.patch("PyQt5.QtWidgets.QMessageBox.warning")
  333. def test_211_pausevm(self, mock_warn):
  334. selected_vm = self._select_non_admin_vm(running=True)
  335. self.assertTrue(self.dialog.action_pausevm.isEnabled(),
  336. "Pause not enabled for a running VM")
  337. with unittest.mock.patch.object(selected_vm, 'pause') as mock_pause:
  338. self.dialog.action_pausevm.trigger()
  339. mock_pause.assert_called_once_with()
  340. mock_pause.side_effect = exc.QubesException('Error')
  341. self.dialog.action_pausevm.trigger()
  342. self.assertEqual(mock_warn.call_count, 1)
  343. def test_212_resumevm(self):
  344. selected_vm = self._select_non_admin_vm(running=False)
  345. with unittest.mock.patch.object(selected_vm, 'get_power_state')\
  346. as mock_state, \
  347. unittest.mock.patch.object(selected_vm, 'unpause')\
  348. as mock_unpause:
  349. mock_state.return_value = 'Paused'
  350. self.dialog.action_resumevm.trigger()
  351. mock_unpause.assert_called_once_with()
  352. with unittest.mock.patch('qubesmanager.qube_manager.StartVMThread') \
  353. as mock_thread:
  354. self.dialog.action_resumevm.trigger()
  355. mock_thread.assert_called_once_with(selected_vm)
  356. mock_thread().finished.connect.assert_called_once_with(
  357. self.dialog.clear_threads)
  358. mock_thread().start.assert_called_once_with()
  359. def test_213_resume_running_vm(self):
  360. self._select_non_admin_vm(running=True)
  361. self.assertFalse(self.dialog.action_resumevm.isEnabled())
  362. @unittest.mock.patch("PyQt5.QtWidgets.QMessageBox.question",
  363. return_value=QtWidgets.QMessageBox.Yes)
  364. @unittest.mock.patch('PyQt5.QtCore.QTimer.singleShot')
  365. @unittest.mock.patch('qubesmanager.qube_manager.VmShutdownMonitor')
  366. def test_214_shutdownvm(self, mock_monitor, mock_timer, _):
  367. selected_vm = self._select_non_admin_vm(running=True)
  368. with unittest.mock.patch.object(selected_vm, 'shutdown')\
  369. as mock_shutdown:
  370. self.dialog.action_shutdownvm.trigger()
  371. mock_shutdown.assert_called_once_with()
  372. mock_monitor.assert_called_once_with(
  373. selected_vm,
  374. unittest.mock.ANY, unittest.mock.ANY,
  375. unittest.mock.ANY, unittest.mock.ANY)
  376. mock_timer.assert_called_once_with(unittest.mock.ANY,
  377. unittest.mock.ANY)
  378. def test_215_shutdown_halted_vm(self):
  379. self._select_non_admin_vm(running=False)
  380. self.assertFalse(self.dialog.action_shutdownvm.isEnabled())
  381. @unittest.mock.patch('qubesmanager.create_new_vm.NewVmDlg')
  382. def test_216_create_vm(self, mock_new_vm):
  383. action = self.dialog.action_createvm
  384. self.assertTrue(action.isEnabled())
  385. action.trigger()
  386. self.assertEqual(mock_new_vm.call_count, 1,
  387. "Create New VM window did not appear")
  388. def test_217_remove_admin_vm(self):
  389. self._select_admin_vm()
  390. self.assertFalse(self.dialog.action_removevm.isEnabled())
  391. @unittest.mock.patch("qubesmanager.qube_manager.QMessageBox")
  392. @unittest.mock.patch('qubesadmin.utils.vm_dependencies')
  393. def test_218_remove_vm_dependencies(self, mock_dependencies, mock_msgbox):
  394. mock_vm = unittest.mock.Mock(spec=['name'],
  395. **{'name.return_value': 'test-vm'})
  396. mock_dependencies.return_value = [(mock_vm, "test_prop")]
  397. action = self.dialog.action_removevm
  398. self._select_non_admin_vm()
  399. action.trigger()
  400. mock_msgbox().show.assert_called_with()
  401. @unittest.mock.patch('PyQt5.QtWidgets.QMessageBox.warning')
  402. @unittest.mock.patch("PyQt5.QtWidgets.QInputDialog.getText")
  403. @unittest.mock.patch('qubesadmin.utils.vm_dependencies')
  404. def test_219_remove_vm_no_depencies(
  405. self, mock_dependencies, mock_input, mock_warning):
  406. action = self.dialog.action_removevm
  407. selected_vm = self._select_non_admin_vm(running=False)
  408. # test with no dependencies
  409. mock_dependencies.return_value = None
  410. with unittest.mock.patch('qubesmanager.common_threads.RemoveVMThread')\
  411. as mock_thread:
  412. mock_input.return_value = (selected_vm, False)
  413. action.trigger()
  414. self.assertEqual(mock_thread.call_count, 0,
  415. "VM removed despite user clicking 'cancel")
  416. mock_input.return_value = ("wrong_name", True)
  417. action.trigger()
  418. self.assertEqual(mock_warning.call_count, 1)
  419. self.assertEqual(mock_thread.call_count, 0,
  420. "VM removed despite user not confirming the name")
  421. mock_input.return_value = (selected_vm.name, True)
  422. action.trigger()
  423. mock_thread.assert_called_once_with(selected_vm)
  424. mock_thread().finished.connect.assert_called_once_with(
  425. self.dialog.clear_threads)
  426. mock_thread().start.assert_called_once_with()
  427. def test_220_restartvm_halted_vm(self):
  428. self._select_non_admin_vm(running=False)
  429. self.assertFalse(self.dialog.action_restartvm.isEnabled())
  430. @unittest.mock.patch('PyQt5.QtCore.QTimer.singleShot')
  431. @unittest.mock.patch('qubesmanager.qube_manager.VmShutdownMonitor')
  432. @unittest.mock.patch("PyQt5.QtWidgets.QMessageBox.question",
  433. return_value=QtWidgets.QMessageBox.Yes)
  434. def test_221_restartvm_running_vm(self, _msgbox, mock_monitor, _qtimer):
  435. selected_vm = self._select_non_admin_vm(running=True)
  436. action = self.dialog.action_restartvm
  437. # currently the VM is running
  438. with unittest.mock.patch.object(selected_vm, 'shutdown')\
  439. as mock_shutdown:
  440. action.trigger()
  441. mock_shutdown.assert_called_once_with()
  442. mock_monitor.assert_called_once_with(
  443. selected_vm, unittest.mock.ANY,
  444. unittest.mock.ANY, True, unittest.mock.ANY)
  445. @unittest.mock.patch('qubesmanager.qube_manager.StartVMThread')
  446. @unittest.mock.patch("PyQt5.QtWidgets.QMessageBox.question",
  447. return_value=QtWidgets.QMessageBox.Yes)
  448. def test_222_restartvm_shutdown_meantime(self, _, mock_thread):
  449. selected_vm = self._select_non_admin_vm(running=True)
  450. action = self.dialog.action_restartvm
  451. # it was shutdown in the meantime
  452. with unittest.mock.patch.object(
  453. selected_vm, 'is_running', **{'return_value': False}):
  454. action.trigger()
  455. mock_thread.assert_called_once_with(selected_vm)
  456. mock_thread().finished.connect.assert_called_once_with(
  457. self.dialog.clear_threads)
  458. mock_thread().start.assert_called_once_with()
  459. @unittest.mock.patch('qubesmanager.qube_manager.UpdateVMThread')
  460. def test_223_updatevm_running(self, mock_thread):
  461. selected_vm = self._select_non_admin_vm(running=True)
  462. self.dialog.action_updatevm.trigger()
  463. mock_thread.assert_called_once_with(selected_vm)
  464. mock_thread().finished.connect.assert_called_once_with(
  465. self.dialog.clear_threads)
  466. mock_thread().start.assert_called_once_with()
  467. @unittest.mock.patch("PyQt5.QtWidgets.QMessageBox.question",
  468. return_value=QtWidgets.QMessageBox.Yes)
  469. @unittest.mock.patch('qubesmanager.qube_manager.UpdateVMThread')
  470. def test_224_updatevm_halted(self, mock_thread, _):
  471. selected_vm = self._select_non_admin_vm(running=False)
  472. self.dialog.action_updatevm.trigger()
  473. mock_thread.assert_called_once_with(selected_vm)
  474. mock_thread().finished.connect.assert_called_once_with(
  475. self.dialog.clear_threads)
  476. mock_thread().start.assert_called_once_with()
  477. @unittest.mock.patch("PyQt5.QtWidgets.QMessageBox.question",
  478. return_value=QtWidgets.QMessageBox.Yes)
  479. def test_224_killvm(self, _):
  480. selected_vm = self._select_non_admin_vm(running=True)
  481. action = self.dialog.action_killvm
  482. with unittest.mock.patch.object(selected_vm, 'kill') as mock_kill:
  483. action.trigger()
  484. mock_kill.assert_called_once_with()
  485. @unittest.mock.patch("PyQt5.QtWidgets.QMessageBox.question",
  486. return_value=QtWidgets.QMessageBox.Cancel)
  487. def test_225_killvm_cancel(self, _):
  488. selected_vm = self._select_non_admin_vm(running=True)
  489. action = self.dialog.action_killvm
  490. with unittest.mock.patch.object(selected_vm, 'kill') as mock_kill:
  491. action.trigger()
  492. self.assertEqual(mock_kill.call_count, 0,
  493. "Ignored Cancel on kill VM")
  494. @unittest.mock.patch('qubesmanager.global_settings.GlobalSettingsWindow')
  495. def test_226_global_settings(self, mock_settings):
  496. self._select_non_admin_vm()
  497. self.dialog.action_global_settings.trigger()
  498. self.assertEqual(mock_settings.call_count, 1,
  499. "Global Settings not opened")
  500. self._select_admin_vm()
  501. self.dialog.action_global_settings.trigger()
  502. self.assertEqual(mock_settings.call_count, 2,
  503. "Global Settings not opened for the second time")
  504. @unittest.mock.patch('qubesmanager.backup.BackupVMsWindow')
  505. def test_227_backup(self, mock_backup):
  506. self.dialog.action_backup.trigger()
  507. self.assertTrue(self.dialog.action_backup.isEnabled())
  508. self.assertEqual(mock_backup.call_count, 1,
  509. "Backup window does not appear")
  510. @unittest.mock.patch('qubesmanager.restore.RestoreVMsWindow')
  511. def test_228_restore(self, mock_restore):
  512. self.dialog.action_restore.trigger()
  513. self.assertTrue(self.dialog.action_restore.isEnabled())
  514. self.assertEqual(mock_restore.call_count, 1,
  515. "Backup window does not appear")
  516. @unittest.mock.patch('qubesmanager.qube_manager.AboutDialog')
  517. def test_229_about_qubes(self, mock_about):
  518. self.assertTrue(self.dialog.action_about_qubes.isEnabled())
  519. self.dialog.action_about_qubes.trigger()
  520. self.assertEqual(
  521. mock_about.call_count, 1, "About window does not appear")
  522. def test_230_exit_action(self):
  523. self.assertTrue(self.dialog.action_exit.isEnabled())
  524. with unittest.mock.patch.object(self.dialog, 'close') as mock_close:
  525. self.dialog.action_exit.trigger()
  526. mock_close.assert_called_once_with()
  527. @unittest.mock.patch('subprocess.check_call')
  528. def test_231_template_manager(self, mock_subprocess):
  529. self.assertTrue(self.dialog.action_manage_templates.isEnabled())
  530. self.dialog.action_manage_templates.trigger()
  531. mock_subprocess.assert_called_once_with('qubes-template-manager')
  532. @unittest.mock.patch('qubesmanager.clone_vm.CloneVMDlg')
  533. def test_232_clonevm(self, mock_clone):
  534. action = self.dialog.action_clonevm
  535. self._select_admin_vm()
  536. self.assertFalse(action.isEnabled())
  537. selected_vm = self._select_non_admin_vm()
  538. self.assertTrue(action.isEnabled())
  539. action.trigger()
  540. mock_clone.assert_called_once_with(self.qtapp, self.qapp,
  541. src_vm=selected_vm)
  542. def test_233_search_action(self):
  543. self.qtapp.setActiveWindow(self.dialog.searchbox)
  544. self.dialog.action_search.trigger()
  545. self.assertTrue(self.dialog.searchbox.hasFocus())
  546. # input text
  547. self.dialog.searchbox.setText("sys")
  548. # click outside the widget
  549. QtTest.QTest.mouseClick(self.dialog.table, QtCore.Qt.LeftButton)
  550. # click the widget, check if it is correctly activated and the whole
  551. # text was selected
  552. QtTest.QTest.mouseClick(self.dialog.searchbox, QtCore.Qt.LeftButton)
  553. self.assertTrue(self.dialog.searchbox.hasFocus())
  554. self.assertEqual(self.dialog.searchbox.selectedText(), "sys")
  555. def test_234_searchbox(self):
  556. # look for sys
  557. self.dialog.searchbox.setText("sys")
  558. expected_number = \
  559. len([vm for vm in self.qapp.domains if "sys" in vm.name])
  560. actual_number = self._count_visible_table_rows()
  561. self.assertEqual(expected_number, actual_number,
  562. "Incorrect number of vms shown for 'sys'")
  563. # clear search
  564. self.dialog.searchbox.setText("")
  565. expected_number = len([vm for vm in self.qapp.domains])
  566. actual_number = self._count_visible_table_rows()
  567. self.assertEqual(expected_number, actual_number,
  568. "Incorrect number of vms shown for cleared search box")
  569. def test_235_hide_show_toolbars(self):
  570. with unittest.mock.patch('PyQt5.QtCore.QSettings.setValue')\
  571. as mock_setvalue:
  572. self.dialog.action_menubar.trigger()
  573. mock_setvalue.assert_called_with('view/menubar_visible', False)
  574. self.dialog.action_toolbar.trigger()
  575. mock_setvalue.assert_called_with('view/toolbar_visible', False)
  576. self.assertFalse(self.dialog.menubar.isVisible(),
  577. "Menubar not hidden correctly")
  578. self.assertFalse(self.dialog.toolbar.isVisible(),
  579. "Toolbar not hidden correctly")
  580. def test_236_clear_searchbox(self):
  581. self.dialog.searchbox.setText("text")
  582. self.assertEqual(self.dialog.searchbox.text(), "text")
  583. QtTest.QTest.keyPress(self.dialog, QtCore.Qt.Key_Escape)
  584. self.assertEqual(self.dialog.searchbox.text(), "",
  585. "Escape failed to clear searchbox")
  586. expected_number = len([vm for vm in self.qapp.domains])
  587. actual_number = self._count_visible_table_rows()
  588. self.assertEqual(expected_number, actual_number,
  589. "Incorrect number of vms shown for cleared search box")
  590. @unittest.mock.patch('PyQt5.QtWidgets.QMessageBox.question')
  591. @listen_for_events
  592. def test_240_network_menu_single(self, mock_question):
  593. mock_question.return_value = QtWidgets.QMessageBox.Yes
  594. target_vm_name = 'work'
  595. self._run_command_and_process_events(
  596. ['qvm-prefs', '-D', target_vm_name, 'netvm'], timeout=20)
  597. self._select_vms(['work'])
  598. selected_vm = self.qapp.domains[target_vm_name]
  599. # reset to default even in case of failure
  600. self.addCleanup(functools.partial(delattr, selected_vm, 'netvm'))
  601. # this is the method to get '==' operator working on icons...
  602. on_icon = QIcon(":/on.png").pixmap(64).toImage()
  603. off_icon = QIcon().pixmap(64).toImage()
  604. for action in self.dialog.network_menu.actions():
  605. if action.text().startswith('default '):
  606. self.assertEqual(action.icon().pixmap(64).toImage(), on_icon)
  607. break
  608. else:
  609. self.fail('default netvm not found')
  610. # change to specific value
  611. for action in self.dialog.network_menu.actions():
  612. if action.text() == 'sys-net':
  613. self.assertEqual(action.icon().pixmap(64).toImage(), off_icon)
  614. action.trigger()
  615. break
  616. else:
  617. self.fail('sys-net netvm not found')
  618. # process events
  619. self.loop.run_until_complete(asyncio.sleep(0))
  620. mock_question.assert_called()
  621. self.assertEqual(str(selected_vm.netvm), 'sys-net')
  622. mock_question.reset_mock()
  623. # change to none
  624. for action in self.dialog.network_menu.actions():
  625. if action.text() == 'None':
  626. self.assertEqual(action.icon().pixmap(64).toImage(), off_icon)
  627. action.trigger()
  628. break
  629. else:
  630. self.fail('"none" netvm not found')
  631. # process events
  632. self.loop.run_until_complete(asyncio.sleep(0))
  633. mock_question.assert_called()
  634. self.assertIsNone(selected_vm.netvm)
  635. mock_question.reset_mock()
  636. # then go back to the default
  637. for action in self.dialog.network_menu.actions():
  638. if action.text().startswith('default '):
  639. self.assertEqual(action.icon().pixmap(64).toImage(), off_icon)
  640. action.trigger()
  641. break
  642. # process events
  643. self.loop.run_until_complete(asyncio.sleep(0))
  644. mock_question.assert_called()
  645. self.assertTrue(selected_vm.property_is_default('netvm'))
  646. @unittest.mock.patch('PyQt5.QtWidgets.QMessageBox.question')
  647. @listen_for_events
  648. def test_241_network_menu_multiple(self, mock_question):
  649. mock_question.return_value = QtWidgets.QMessageBox.Yes
  650. target_vm_names = ['work', 'personal', 'vault']
  651. work = self.qapp.domains['work']
  652. personal = self.qapp.domains['personal']
  653. vault = self.qapp.domains['vault']
  654. # reset to default even in case of failure
  655. self.addCleanup(functools.partial(delattr, work, 'netvm'))
  656. self.addCleanup(functools.partial(delattr, personal, 'netvm'))
  657. self.addCleanup(functools.partial(setattr, vault, 'netvm', None))
  658. self._run_command_and_process_events(
  659. ['qvm-prefs', '-D', 'work', 'netvm'], timeout=5)
  660. self._run_command_and_process_events(
  661. ['qvm-prefs', '-D', 'personal', 'netvm'], timeout=5)
  662. self._run_command_and_process_events(
  663. ['qvm-prefs', 'vault', ''], timeout=5)
  664. self._select_vms(target_vm_names)
  665. # this is the method to get '==' operator working on icons...
  666. on_icon = QIcon(":/on.png").pixmap(64).toImage()
  667. transient_icon = QIcon(":/transient.png").pixmap(64).toImage()
  668. off_icon = QIcon().pixmap(64).toImage()
  669. for action in self.dialog.network_menu.actions():
  670. if action.text().startswith('default '):
  671. # work, personal
  672. self.assertEqual(action.icon().pixmap(64).toImage(), transient_icon)
  673. elif action.text() == 'None':
  674. # vault
  675. self.assertEqual(action.icon().pixmap(64).toImage(), transient_icon)
  676. else:
  677. self.assertEqual(action.icon().pixmap(64).toImage(), off_icon)
  678. # change to specific value
  679. for action in self.dialog.network_menu.actions():
  680. if action.text() == 'sys-net':
  681. self.assertEqual(action.icon().pixmap(64).toImage(), off_icon)
  682. action.trigger()
  683. break
  684. else:
  685. self.fail('sys-net netvm not found')
  686. # process events
  687. self.loop.run_until_complete(asyncio.sleep(0))
  688. mock_question.assert_called()
  689. self.assertEqual(str(work.netvm), 'sys-net')
  690. self.assertEqual(str(personal.netvm), 'sys-net')
  691. self.assertEqual(str(vault.netvm), 'sys-net')
  692. mock_question.reset_mock()
  693. @unittest.mock.patch('PyQt5.QtWidgets.QMessageBox.question')
  694. @listen_for_events
  695. def test_250_template_menu_single(self, mock_question):
  696. mock_question.return_value = QtWidgets.QMessageBox.Yes
  697. target_vm_name = 'work'
  698. selected_vm = self.qapp.domains[target_vm_name]
  699. if selected_vm.is_running():
  700. self.skipTest(
  701. 'VM {!s} is running, please stop it first'.format(selected_vm))
  702. current_template = selected_vm.template
  703. new_template = self._select_templatevm(
  704. different_than=[str(current_template)])
  705. self._select_vms(['work'])
  706. # reset to previous value even in case of failure
  707. self.addCleanup(functools.partial(
  708. setattr, selected_vm, 'template', str(current_template)))
  709. # this is the method to get '==' operator working on icons...
  710. on_icon = QIcon(":/on.png").pixmap(64).toImage()
  711. off_icon = QIcon().pixmap(64).toImage()
  712. found = False
  713. for action in self.dialog.template_menu.actions():
  714. if action.text() == str(current_template):
  715. self.assertEqual(action.icon().pixmap(64).toImage(), on_icon)
  716. found = True
  717. else:
  718. self.assertEqual(action.icon().pixmap(64).toImage(), off_icon)
  719. if not found:
  720. self.fail(
  721. 'current template value ({!s}) not found in the menu'.format(
  722. current_template))
  723. # change to specific value
  724. for action in self.dialog.template_menu.actions():
  725. if action.text() == str(new_template):
  726. self.assertEqual(action.icon().pixmap(64).toImage(), off_icon)
  727. action.trigger()
  728. break
  729. else:
  730. self.fail('template {!s} not found in the menu'.format(new_template))
  731. # process events
  732. self.loop.run_until_complete(asyncio.sleep(0))
  733. mock_question.assert_called()
  734. # compare str(), to have better error message on mismatch
  735. self.assertEqual(str(selected_vm.template), str(new_template))
  736. mock_question.reset_mock()
  737. @unittest.mock.patch('PyQt5.QtWidgets.QMessageBox.question')
  738. @listen_for_events
  739. def test_251_template_menu_multiple(self, mock_question):
  740. mock_question.return_value = QtWidgets.QMessageBox.Yes
  741. target_vm_names = ['work', 'personal', 'untrusted']
  742. work = self.qapp.domains['work']
  743. personal = self.qapp.domains['personal']
  744. untrusted = self.qapp.domains['untrusted']
  745. if any(vm.is_running() for vm in [work, personal, untrusted]):
  746. self.skipTest('Any of work, personal, untrusted VM is running')
  747. old_template = work.template
  748. new_template = self._select_templatevm(
  749. different_than=[str(work.template),
  750. str(personal.template),
  751. str(untrusted.template)])
  752. # reset to previous value even in case of failure
  753. self.addCleanup(functools.partial(
  754. setattr, work, 'template', str(work.template)))
  755. self.addCleanup(functools.partial(
  756. setattr, personal, 'template', str(personal.template)))
  757. self.addCleanup(functools.partial(
  758. setattr, untrusted, 'template', str(untrusted.template)))
  759. # set all to the same value
  760. self._run_command_and_process_events(
  761. ['qvm-prefs', 'personal', 'template', str(work.template)], timeout=5)
  762. self._run_command_and_process_events(
  763. ['qvm-prefs', 'untrusted', 'template', str(work.template)], timeout=5)
  764. self._select_vms(target_vm_names)
  765. # this is the method to get '==' operator working on icons...
  766. on_icon = QIcon(":/on.png").pixmap(64).toImage()
  767. transient_icon = QIcon(":/transient.png").pixmap(64).toImage()
  768. off_icon = QIcon().pixmap(64).toImage()
  769. for action in self.dialog.template_menu.actions():
  770. if action.text() == str(old_template):
  771. self.assertIn(
  772. action.icon().pixmap(64).toImage(),
  773. (on_icon, transient_icon))
  774. else:
  775. self.assertEqual(action.icon().pixmap(64).toImage(), off_icon)
  776. # make one different
  777. self._run_command_and_process_events(
  778. ['qvm-prefs', 'work', 'template', str(new_template)], timeout=5)
  779. for action in self.dialog.template_menu.actions():
  780. if action.text() == str(old_template):
  781. self.assertEqual(action.icon().pixmap(64).toImage(), transient_icon)
  782. elif action.text() == str(new_template):
  783. self.assertEqual(action.icon().pixmap(64).toImage(), transient_icon)
  784. else:
  785. self.assertEqual(action.icon().pixmap(64).toImage(), off_icon)
  786. # change all to the same value
  787. for action in self.dialog.template_menu.actions():
  788. if action.text() == str(new_template):
  789. action.trigger()
  790. break
  791. else:
  792. self.fail('{!s} template not found'.format(new_template))
  793. # process events
  794. self.loop.run_until_complete(asyncio.sleep(0))
  795. mock_question.assert_called()
  796. self.assertEqual(str(work.template), str(new_template))
  797. self.assertEqual(str(personal.template), str(new_template))
  798. self.assertEqual(str(untrusted.template), str(new_template))
  799. @unittest.mock.patch('PyQt5.QtWidgets.QMessageBox.information')
  800. @unittest.mock.patch('PyQt5.QtWidgets.QMessageBox.warning')
  801. def test_300_clear_threads(self, mock_warning, mock_info):
  802. mock_thread_finished_ok = unittest.mock.Mock(
  803. spec=['isFinished', 'msg', 'msg_is_success'],
  804. msg=None, msg_is_success=False,
  805. **{'isFinished.return_value': True})
  806. mock_thread_not_finished = unittest.mock.Mock(
  807. spec=['isFinished', 'msg', 'msg_is_success'],
  808. msg=None, msg_is_success=False,
  809. **{'isFinished.return_value': False})
  810. mock_thread_finished_error = unittest.mock.Mock(
  811. spec=['isFinished', 'msg', 'msg_is_success'],
  812. msg=("Error", "Error"), msg_is_success=False,
  813. **{'isFinished.return_value': True})
  814. mock_thread_fin_error_success = unittest.mock.Mock(
  815. spec=['isFinished', 'msg', 'msg_is_success'],
  816. msg=("Done", "Done"), msg_is_success=True,
  817. **{'isFinished.return_value': True})
  818. # single finished thread
  819. self.dialog.threads_list = [mock_thread_not_finished,
  820. mock_thread_finished_ok]
  821. self.dialog.clear_threads()
  822. self.assertEqual(mock_warning.call_count, 0)
  823. self.assertEqual(mock_info.call_count, 0)
  824. self.assertEqual(len(self.dialog.threads_list), 1)
  825. # an error thread and some in-progress ones
  826. self.dialog.threads_list = [mock_thread_not_finished,
  827. mock_thread_not_finished,
  828. mock_thread_finished_error]
  829. self.dialog.clear_threads()
  830. self.assertEqual(mock_warning.call_count, 1)
  831. self.assertEqual(mock_info.call_count, 0)
  832. self.assertEqual(len(self.dialog.threads_list), 2)
  833. # an error-success thread and some in-progress ones
  834. self.dialog.threads_list = [mock_thread_not_finished,
  835. mock_thread_not_finished,
  836. mock_thread_fin_error_success,
  837. mock_thread_finished_error]
  838. self.dialog.clear_threads()
  839. self.assertEqual(mock_warning.call_count, 1)
  840. self.assertEqual(mock_info.call_count, 1)
  841. self.assertEqual(len(self.dialog.threads_list), 3)
  842. def test_400_event_domain_added(self):
  843. number_of_vms = self.dialog.table.model().rowCount()
  844. self.addCleanup(subprocess.call, ["qvm-remove", "-f", "test-vm"])
  845. self._run_command_and_process_events(
  846. ["qvm-create", "--label", "red", "test-vm"])
  847. # a single row was added to the table
  848. self.assertEqual(self.dialog.table.model().rowCount(), number_of_vms+1)
  849. # table contains the correct vms
  850. vms_in_table = self._create_set_of_current_vms()
  851. vms_in_system = set([vm.name for vm in self.qapp.domains])
  852. self.assertEqual(vms_in_table, vms_in_system, "Table not updated "
  853. "correctly after add")
  854. # check if sorting works
  855. self.__check_sorting("Name")
  856. # try opening settings for the added vm
  857. for row in range(self.dialog.table.model().rowCount()):
  858. name = self._get_table_item(row, "Name")
  859. if name == "test-vm":
  860. index = self.dialog.table.model().index(row, 0)
  861. self.dialog.table.setCurrentIndex(index)
  862. break
  863. with unittest.mock.patch('qubesmanager.settings.VMSettingsWindow')\
  864. as mock_settings:
  865. self.dialog.action_settings.trigger()
  866. mock_settings.assert_called_once_with(
  867. self.qapp.domains["test-vm"], qapp = self.qtapp,
  868. init_page = "basic", qubesapp = self.qapp)
  869. def test_401_event_domain_removed(self):
  870. initial_vms = self._create_set_of_current_vms()
  871. self._run_command_and_process_events(
  872. ["qvm-create", "--label", "red", "test-vm"])
  873. current_vms = self._create_set_of_current_vms()
  874. self.assertEqual(len(initial_vms) + 1, len(current_vms))
  875. self._run_command_and_process_events(
  876. ["qvm-remove", "--force", "test-vm"])
  877. current_vms = self._create_set_of_current_vms()
  878. self.assertEqual(initial_vms, current_vms)
  879. # check if sorting works
  880. self.__check_sorting("Name")
  881. def test_403_event_dispvm_added(self):
  882. initial_vms = self._create_set_of_current_vms()
  883. dispvm_template = None
  884. for vm in self.qapp.domains:
  885. if getattr(vm, "template_for_dispvms", False):
  886. dispvm_template = vm.name
  887. break
  888. self.assertIsNotNone(dispvm_template,
  889. "Cannot find a template for dispVMs")
  890. # this requires very long timeout, because it takes time for the
  891. # dispvm to vanish
  892. self._run_command_and_process_events(
  893. ["qvm-run", "--dispvm", dispvm_template, "true"], timeout=60,
  894. additional_timeout=60)
  895. final_vms = self._create_set_of_current_vms()
  896. self.assertEqual(initial_vms, final_vms,
  897. "Failed handling of a created-and-removed dispvm")
  898. def test_404_crashing_dispvm(self):
  899. initial_vms = self._create_set_of_current_vms()
  900. dispvm_template = None
  901. for vm in self.qapp.domains:
  902. if getattr(vm, "template_for_dispvms", False):
  903. dispvm_template = vm.name
  904. break
  905. self.assertIsNotNone(dispvm_template,
  906. "Cannot find a template for dispVMs")
  907. current_memory = getattr(self.qapp.domains[dispvm_template], "memory")
  908. self.addCleanup(
  909. subprocess.call,
  910. ["qvm-prefs", dispvm_template, "memory", str(current_memory)])
  911. subprocess.check_call(
  912. ["qvm-prefs", dispvm_template, "memory", "600000"])
  913. self._run_command_and_process_events(
  914. ["qvm-run", "--dispvm", dispvm_template, "true"], timeout=30,
  915. additional_timeout=15)
  916. final_vms = self._create_set_of_current_vms()
  917. self.assertEqual(initial_vms, final_vms,
  918. "Failed handling of dispvm that crashed on start")
  919. def test_405_prop_change_label(self):
  920. target_vm_name = "work"
  921. vm_row = self._find_vm_row(target_vm_name)
  922. current_label = self._get_table_item(vm_row, "Label", Qt.DecorationRole)
  923. self.addCleanup(
  924. subprocess.call, ["qvm-prefs", target_vm_name, "label", "blue"])
  925. self._run_command_and_process_events(
  926. ["qvm-prefs", target_vm_name, "label", "red"], timeout=20)
  927. new_label = self._get_table_item(vm_row, "Label", Qt.DecorationRole)
  928. self.assertNotEqual(current_label.toImage(), new_label.toImage(),
  929. "Label icon did not change")
  930. icon = QIcon.fromTheme(self.qapp.domains[target_vm_name].label.icon)
  931. icon = icon.pixmap(icon_size)
  932. self.assertEqual(new_label.toImage(), icon.toImage(), "Incorrect label")
  933. def test_406_prop_change_template(self):
  934. target_vm_name = "work"
  935. vm_row = self._find_vm_row(target_vm_name)
  936. old_template = self._get_table_item(vm_row, "Template")
  937. new_template = None
  938. for vm in self.qapp.domains:
  939. if vm.klass == 'TemplateVM' and vm.name != old_template:
  940. new_template = vm.name
  941. break
  942. self.addCleanup(
  943. subprocess.call,
  944. ["qvm-prefs", target_vm_name, "template", old_template])
  945. self._run_command_and_process_events(
  946. ["qvm-prefs", target_vm_name, "template", new_template])
  947. self.assertNotEqual(old_template,
  948. self._get_table_item(vm_row, "Template"),
  949. "Template did not change")
  950. self.assertEqual(
  951. self._get_table_item(vm_row, "Template"),
  952. self.qapp.domains[target_vm_name].template.name,
  953. "Incorrect template")
  954. def test_407_prop_change_netvm(self):
  955. target_vm_name = "work"
  956. vm_row = self._find_vm_row(target_vm_name)
  957. old_netvm = self._get_table_item(vm_row, "NetVM")
  958. new_netvm = None
  959. for vm in self.qapp.domains:
  960. if getattr(vm, "provides_network", False) and vm.name != old_netvm:
  961. new_netvm = vm.name
  962. break
  963. self.addCleanup(
  964. subprocess.call, ["qvm-prefs", target_vm_name, "netvm", old_netvm])
  965. self._run_command_and_process_events(
  966. ["qvm-prefs", target_vm_name, "netvm", new_netvm])
  967. self.assertNotEqual(old_netvm,
  968. self._get_table_item(vm_row, "NetVM"),
  969. "NetVM did not change")
  970. self.assertEqual(
  971. self._get_table_item(vm_row, "NetVM"),
  972. self.qapp.domains[target_vm_name].netvm.name,
  973. "Incorrect NetVM")
  974. @unittest.expectedFailure
  975. def test_408_prop_change_internal(self):
  976. target_vm_name = "work"
  977. vm_row = self._find_vm_row(target_vm_name)
  978. self.addCleanup(subprocess.call,
  979. ["qvm-features", "--unset", "work", "interal"])
  980. self._run_command_and_process_events(
  981. ["qvm-features", "work", "interal", "1"])
  982. self.assertEqual(
  983. self._get_table_item(vm_row, "Internal"),
  984. "Yes",
  985. "Incorrect value for internal VM")
  986. self._run_command_and_process_events(
  987. ["qvm-features", "--unset", "work", "interal"])
  988. self.assertEqual(
  989. self._get_table_item(vm_row, "Internal"),
  990. "",
  991. "Incorrect value for non-internal VM")
  992. def test_409_prop_change_ip(self):
  993. target_vm_name = "work"
  994. vm_row = self._find_vm_row(target_vm_name)
  995. old_ip = self._get_table_item(vm_row, "IP")
  996. new_ip = old_ip.replace(".0.", ".5.")
  997. self.addCleanup(
  998. subprocess.call, ["qvm-prefs", target_vm_name, "ip", old_ip])
  999. self._run_command_and_process_events(
  1000. ["qvm-prefs", target_vm_name, "ip", new_ip])
  1001. self.assertNotEqual(old_ip,
  1002. self._get_table_item(vm_row, "IP"),
  1003. "IP did not change")
  1004. self.assertEqual(
  1005. self._get_table_item(vm_row, "IP"),
  1006. self.qapp.domains[target_vm_name].ip,
  1007. "Incorrect IP")
  1008. def test_410_prop_change_in_backups(self):
  1009. target_vm_name = "work"
  1010. vm_row = self._find_vm_row(target_vm_name)
  1011. old_value = self.qapp.domains[target_vm_name].include_in_backups
  1012. new_value = not old_value
  1013. self.addCleanup(
  1014. subprocess.call,
  1015. ["qvm-prefs", target_vm_name, "include_in_backups", str(old_value)])
  1016. self._run_command_and_process_events(
  1017. ["qvm-prefs", target_vm_name, "include_in_backups", str(new_value)])
  1018. self.assertEqual(
  1019. self._get_table_item(vm_row, "Internal"),
  1020. "Yes" if new_value else "",
  1021. "Incorrect value for include_in_backups")
  1022. def test_411_prop_change_last_backup(self):
  1023. target_vm_name = "work"
  1024. target_timestamp = "2015-01-01 17:00:00"
  1025. vm_row = self._find_vm_row(target_vm_name)
  1026. old_value = self._get_table_item(vm_row, "Last backup")
  1027. new_value = datetime.datetime.strptime(
  1028. target_timestamp, "%Y-%m-%d %H:%M:%S")
  1029. self.addCleanup(
  1030. subprocess.call,
  1031. ["qvm-prefs", '-D', target_vm_name, "backup_timestamp"])
  1032. self._run_command_and_process_events(
  1033. ["qvm-prefs", target_vm_name, "backup_timestamp",
  1034. str(int(new_value.timestamp()))])
  1035. self.assertNotEqual(old_value,
  1036. self._get_table_item(vm_row, "Last backup"),
  1037. "Last backup date did not change")
  1038. self.assertEqual(
  1039. self._get_table_item(vm_row, "Last backup"),
  1040. target_timestamp,
  1041. "Incorrect Last backup date")
  1042. def test_412_prop_change_defdispvm(self):
  1043. target_vm_name = "work"
  1044. vm_row = self._find_vm_row(target_vm_name)
  1045. old_default_dispvm =\
  1046. self._get_table_item(vm_row, "Default DispVM")
  1047. new_default_dispvm = None
  1048. for vm in self.qapp.domains:
  1049. if getattr(vm, "template_for_dispvms", False) and vm.name !=\
  1050. old_default_dispvm:
  1051. new_default_dispvm = vm.name
  1052. break
  1053. self.addCleanup(
  1054. subprocess.call,
  1055. ["qvm-prefs", target_vm_name, "default_dispvm", old_default_dispvm])
  1056. self._run_command_and_process_events(
  1057. ["qvm-prefs", target_vm_name, "default_dispvm", new_default_dispvm])
  1058. self.assertNotEqual(
  1059. old_default_dispvm,
  1060. self._get_table_item(vm_row, "Default DispVM"),
  1061. "Default DispVM did not change")
  1062. self.assertEqual(
  1063. self._get_table_item(vm_row, "Default DispVM"),
  1064. self.qapp.domains[target_vm_name].default_dispvm.name,
  1065. "Incorrect Default DispVM")
  1066. def test_413_prop_change_templ_disp(self):
  1067. target_vm_name = "work"
  1068. vm_row = self._find_vm_row(target_vm_name)
  1069. self.addCleanup(
  1070. subprocess.call,
  1071. ["qvm-prefs", "--default", target_vm_name, "template_for_dispvms"])
  1072. self._run_command_and_process_events(
  1073. ["qvm-prefs", target_vm_name, "template_for_dispvms", "True"])
  1074. self.assertEqual(
  1075. self._get_table_item(vm_row, "Is DVM Template"),
  1076. "Yes",
  1077. "Incorrect value for DVM Template")
  1078. self._run_command_and_process_events(
  1079. ["qvm-prefs", "--default", target_vm_name, "template_for_dispvms"])
  1080. self.assertEqual(
  1081. self._get_table_item(vm_row, "Is DVM Template"),
  1082. "",
  1083. "Incorrect value for not DVM Template")
  1084. def test_414_vm_state_change(self):
  1085. target_vm_name = "work"
  1086. vm_row = self._find_vm_row(target_vm_name)
  1087. self.assertFalse(self.qapp.domains[target_vm_name].is_running())
  1088. self.addCleanup(
  1089. subprocess.call,
  1090. ["qvm-shutdown", "--wait", target_vm_name])
  1091. self._run_command_and_process_events(
  1092. ["qvm-start", target_vm_name], timeout=60)
  1093. displayed_state = self._get_table_item(vm_row, "State")
  1094. self.assertEqual(displayed_state['power'], 'Running',
  1095. "Power state failed to update on start")
  1096. self._run_command_and_process_events(
  1097. ["qvm-shutdown", "--wait", target_vm_name], timeout=30)
  1098. displayed_state = self._get_table_item(vm_row, "State")
  1099. self.assertEqual(displayed_state['power'], 'Halted',
  1100. "Power state failed to update on shutdown")
  1101. def test_415_template_vm_started(self):
  1102. # check whether changing state of a template_vm causes all other
  1103. # vms depending on it to check theirs
  1104. target_vm_name = None
  1105. for vm in self.qapp.domains:
  1106. if vm.klass == 'TemplateVM':
  1107. for vm2 in self.qapp.domains:
  1108. if getattr(vm2, 'template', None) == vm.name:
  1109. target_vm_name = vm.name
  1110. break
  1111. if target_vm_name:
  1112. break
  1113. for i in range(self.dialog.table.model().rowCount()):
  1114. self._get_table_vminfo(i).update = unittest.mock.Mock()
  1115. self.addCleanup(
  1116. subprocess.call,
  1117. ["qvm-shutdown", "--wait", target_vm_name])
  1118. self._run_command_and_process_events(
  1119. ["qvm-start", target_vm_name], timeout=60)
  1120. for i in range(self.dialog.table.model().rowCount()):
  1121. call_count = self._get_table_vminfo(
  1122. i).update.call_count
  1123. if self._get_table_item(i, "Template") == target_vm_name:
  1124. self.assertGreater(call_count, 0)
  1125. elif self._get_table_item(i, "Name") == target_vm_name:
  1126. self.assertGreater(call_count, 0)
  1127. else:
  1128. self.assertEqual(call_count, 0)
  1129. @unittest.mock.patch('qubesmanager.log_dialog.LogDialog')
  1130. def test_500_logs(self, mock_log_dialog):
  1131. self._select_admin_vm()
  1132. self.dialog.action_show_logs.trigger()
  1133. mock_log_dialog.assert_called_once()
  1134. dom0_logs = mock_log_dialog.mock_calls[0][1][1]
  1135. for c in dom0_logs:
  1136. self.assertIn("hypervisor", c,
  1137. "Log for dom0 does not contain 'hypervisor'")
  1138. mock_log_dialog.reset_mock()
  1139. selected_vm = self._select_non_admin_vm(running=True).name
  1140. self.dialog.action_show_logs.trigger()
  1141. mock_log_dialog.assert_called_once()
  1142. vm_logs = mock_log_dialog.mock_calls[0][1][1]
  1143. for c in vm_logs:
  1144. self.assertIn(
  1145. selected_vm,
  1146. c,
  1147. "Log for {} does not contain its name".format(selected_vm))
  1148. self.assertNotEqual(dom0_logs, vm_logs,
  1149. "Same logs found for dom0 and non-adminVM")
  1150. def _find_vm_row(self, vm_name):
  1151. for row in range(self.dialog.table.model().rowCount()):
  1152. name = self._get_table_item(row, "Name")
  1153. if name == vm_name:
  1154. return row
  1155. return None
  1156. def _count_visible_table_rows(self):
  1157. result = 0
  1158. for i in range(self.dialog.table.model().rowCount()):
  1159. if not self.dialog.table.isRowHidden(i):
  1160. result += 1
  1161. return result
  1162. def _run_command_and_process_events(self, command, timeout=5,
  1163. additional_timeout=None):
  1164. """
  1165. helper function to run a given command and process eventsDispatcher
  1166. events
  1167. :param command: list of strings, containing the command and all its
  1168. parameters
  1169. :param timeout: default 5 seconds
  1170. :param additional_timeout: default none
  1171. :return:
  1172. """
  1173. asyncio.set_event_loop(self.loop)
  1174. future1 = asyncio.ensure_future(self.dispatcher.listen_for_events())
  1175. self.loop.run_until_complete(asyncio.sleep(0))
  1176. future2 = asyncio.create_subprocess_exec(*command,
  1177. stdout=subprocess.DEVNULL,
  1178. stderr=subprocess.DEVNULL)
  1179. future2 = self.loop.run_until_complete(future2).wait()
  1180. if additional_timeout:
  1181. (done, pending) = self.loop.run_until_complete(
  1182. asyncio.wait({future1, future2}, timeout=timeout,
  1183. return_when=asyncio.FIRST_COMPLETED))
  1184. (done, pending) = self.loop.run_until_complete(
  1185. asyncio.wait(pending, timeout=additional_timeout))
  1186. else:
  1187. (done, pending) = self.loop.run_until_complete(
  1188. asyncio.wait({future1, future2}, timeout=timeout))
  1189. for task in pending:
  1190. with contextlib.suppress(asyncio.CancelledError):
  1191. task.cancel()
  1192. self.loop.call_soon(self.loop.stop)
  1193. self.loop.run_forever()
  1194. def _create_set_of_current_vms(self):
  1195. result = set()
  1196. for i in range(self.dialog.table.model().rowCount()):
  1197. result.add(self._get_table_item(i, "Name"))
  1198. return result
  1199. def _select_admin_vm(self):
  1200. for row in range(self.dialog.table.model().rowCount()):
  1201. template = self._get_table_item(row, "Template")
  1202. if template == 'AdminVM':
  1203. index = self.dialog.table.model().index(row, 0)
  1204. self.dialog.table.setCurrentIndex(index)
  1205. return index.data(Qt.UserRole).vm
  1206. return None
  1207. def _select_non_admin_vm(self, running=None):
  1208. for row in range(self.dialog.table.model().rowCount()):
  1209. template = self._get_table_item(row, "Template")
  1210. vm = self._get_table_vm(row)
  1211. if template != 'AdminVM' and \
  1212. (running is None
  1213. or (running and vm.is_running())
  1214. or (not running and not vm.is_running())):
  1215. index = self.dialog.table.model().index(row, 0)
  1216. self.dialog.table.setCurrentIndex(index)
  1217. return vm
  1218. return None
  1219. def _select_templatevm(self, running=None, different_than=()):
  1220. for row in range(self.dialog.table.model().rowCount()):
  1221. template = self._get_table_item(row, "Template")
  1222. vm = self._get_table_vm(row)
  1223. if template == 'TemplateVM' and \
  1224. (template not in different_than) and \
  1225. (running is None
  1226. or (bool(running) == bool(vm.is_running()))):
  1227. index = self.dialog.table.model().index(row, 0)
  1228. self.dialog.table.setCurrentIndex(index)
  1229. return vm
  1230. return None
  1231. def _select_vms(self, vms: list):
  1232. self.dialog.table.selectionModel().clear()
  1233. mode = QtCore.QItemSelectionModel.Select | QtCore.QItemSelectionModel.Rows
  1234. for row in range(self.dialog.table.model().rowCount()):
  1235. vm = self._get_table_vm(row)
  1236. if str(vm) in vms:
  1237. index = self.dialog.table.model().index(row, 0)
  1238. self.dialog.table.selectionModel().select(index, mode)
  1239. def __check_sorting(self, column_name):
  1240. last_text = None
  1241. last_vm = None
  1242. for row in range(self.dialog.table.model().rowCount()):
  1243. vm = self._get_table_item(row, "Name")
  1244. text = self._get_table_item(row, column_name)
  1245. if row == 0:
  1246. self.assertEqual(vm, "dom0", "dom0 is not sorted first")
  1247. elif last_text is None:
  1248. last_text = text
  1249. last_vm = vm
  1250. else:
  1251. if last_text == text:
  1252. self.assertGreater(
  1253. vm.lower(), last_vm.lower(),
  1254. "Incorrect sorting for {}".format(column_name))
  1255. else:
  1256. self.assertGreater(
  1257. text.lower(), last_text.lower(),
  1258. "Incorrect sorting for {}".format(column_name))
  1259. last_text = text
  1260. last_vm = vm
  1261. def _get_table_vminfo(self, row):
  1262. model = self.dialog.table.model()
  1263. return model.index(row, 0).data(Qt.UserRole)
  1264. def _get_table_vm(self, row):
  1265. model = self.dialog.table.model()
  1266. return model.index(row, 0).data(Qt.UserRole).vm
  1267. def _get_table_item(self, row, column_name, role=Qt.DisplayRole):
  1268. model = self.dialog.table.model()
  1269. column = self.dialog.qubes_model.columns_indices.index(column_name)
  1270. return model.index(row, column).data(role)
  1271. class QubeManagerThreadTest(unittest.TestCase):
  1272. def test_01_startvm_thread(self):
  1273. vm = unittest.mock.Mock(spec=['start'])
  1274. thread = qube_manager.StartVMThread(vm)
  1275. thread.run()
  1276. vm.start.assert_called_once_with()
  1277. def test_02_startvm_thread_error(self):
  1278. vm = unittest.mock.Mock(
  1279. spec=['start'],
  1280. **{'start.side_effect': exc.QubesException('Error')})
  1281. thread = qube_manager.StartVMThread(vm)
  1282. thread.run()
  1283. self.assertIsNotNone(thread.msg)
  1284. def test_10_run_command_thread(self):
  1285. vm = unittest.mock.Mock(spec=['run'])
  1286. thread = qube_manager.RunCommandThread(vm, "test_command")
  1287. thread.run()
  1288. vm.run.assert_called_once_with("test_command")
  1289. def test_11_run_command_thread_error(self):
  1290. vm = unittest.mock.Mock(spec=['run'],
  1291. **{'run.side_effect': ChildProcessError})
  1292. thread = qube_manager.RunCommandThread(vm, "test_command")
  1293. thread.run()
  1294. self.assertIsNotNone(thread.msg)
  1295. @unittest.mock.patch('subprocess.check_call')
  1296. def test_20_update_vm_thread_dom0(self, check_call):
  1297. vm = unittest.mock.Mock(spec=['klass'])
  1298. vm.klass = 'AdminVM'
  1299. thread = qube_manager.UpdateVMThread(vm)
  1300. thread.run()
  1301. check_call.assert_called_once_with(
  1302. ["/usr/bin/qubes-dom0-update", "--clean", "--gui"])
  1303. @unittest.mock.patch('builtins.open')
  1304. @unittest.mock.patch('subprocess.call')
  1305. def test_21_update_vm_thread_running(self, mock_call, mock_open):
  1306. vm = unittest.mock.Mock(
  1307. spec=['klass', 'is_running', 'run_service_for_stdio', 'run_service'],
  1308. **{'is_running.return_value': True})
  1309. vm.klass = 'AppVM'
  1310. vm.run_service_for_stdio.return_value = (b'changed=no\n', None)
  1311. thread = qube_manager.UpdateVMThread(vm)
  1312. thread.run()
  1313. mock_open.assert_called_with(
  1314. '/usr/libexec/qubes-manager/dsa-4371-update', 'rb')
  1315. vm.run_service_for_stdio.assert_called_once_with(
  1316. "qubes.VMShell", user='root', input=unittest.mock.ANY)
  1317. vm.run_service.assert_called_once_with(
  1318. "qubes.InstallUpdatesGUI", user="root", wait=False)
  1319. self.assertEqual(mock_call.call_count, 0)
  1320. @unittest.mock.patch('builtins.open')
  1321. @unittest.mock.patch('subprocess.call')
  1322. def test_22_update_vm_thread_not_running(self, mock_call, mock_open):
  1323. vm = unittest.mock.Mock(
  1324. spec=['klass', 'is_running', 'run_service_for_stdio',
  1325. 'run_service', 'start', 'name'],
  1326. **{'is_running.return_value': False})
  1327. vm.klass = 'AppVM'
  1328. vm.run_service_for_stdio.return_value = (b'changed=yes\n', None)
  1329. thread = qube_manager.UpdateVMThread(vm)
  1330. thread.run()
  1331. mock_open.assert_called_with(
  1332. '/usr/libexec/qubes-manager/dsa-4371-update', 'rb')
  1333. vm.start.assert_called_once_with()
  1334. vm.run_service_for_stdio.assert_called_once_with(
  1335. "qubes.VMShell", user='root', input=unittest.mock.ANY)
  1336. vm.run_service.assert_called_once_with(
  1337. "qubes.InstallUpdatesGUI", user="root", wait=False)
  1338. self.assertEqual(mock_call.call_count, 1)
  1339. @unittest.mock.patch('builtins.open')
  1340. @unittest.mock.patch('subprocess.check_call')
  1341. def test_23_update_vm_thread_error(self, *_args):
  1342. vm = unittest.mock.Mock(
  1343. spec=['klass', 'is_running'],
  1344. **{'is_running.side_effect': ChildProcessError})
  1345. vm.klass = 'AppVM'
  1346. thread = qube_manager.UpdateVMThread(vm)
  1347. thread.run()
  1348. self.assertIsNotNone(thread.msg)
  1349. class VMShutdownMonitorTest(unittest.TestCase):
  1350. @unittest.mock.patch('qubesmanager.qube_manager.QMessageBox')
  1351. @unittest.mock.patch('PyQt5.QtCore.QTimer')
  1352. def test_01_vm_shutdown_correct(self, mock_timer, mock_question):
  1353. mock_vm = unittest.mock.Mock()
  1354. mock_vm.is_running.return_value = False
  1355. monitor = qube_manager.VmShutdownMonitor(mock_vm)
  1356. monitor.restart_vm_if_needed = unittest.mock.Mock()
  1357. monitor.check_if_vm_has_shutdown()
  1358. self.assertEqual(mock_question.call_count, 0)
  1359. self.assertEqual(mock_timer.call_count, 0)
  1360. monitor.restart_vm_if_needed.assert_called_once_with()
  1361. @unittest.mock.patch('qubesmanager.qube_manager.QMessageBox')
  1362. @unittest.mock.patch('PyQt5.QtCore.QTimer.singleShot')
  1363. def test_02_vm_not_shutdown_wait(self, mock_timer, mock_question):
  1364. mock_question().clickedButton.return_value = 1
  1365. mock_question().addButton.return_value = 0
  1366. mock_vm = unittest.mock.Mock()
  1367. mock_vm.is_running.return_value = True
  1368. mock_vm.start_time = datetime.datetime.now().timestamp() - 3000
  1369. monitor = qube_manager.VmShutdownMonitor(mock_vm, shutdown_time=1)
  1370. time.sleep(3)
  1371. monitor.check_if_vm_has_shutdown()
  1372. self.assertEqual(mock_timer.call_count, 1)
  1373. @unittest.mock.patch('qubesmanager.qube_manager.QMessageBox')
  1374. @unittest.mock.patch('PyQt5.QtCore.QTimer.singleShot')
  1375. def test_03_vm_kill(self, mock_timer, mock_question):
  1376. mock_question().clickedButton.return_value = 1
  1377. mock_question().addButton.return_value = 1
  1378. mock_vm = unittest.mock.Mock()
  1379. mock_vm.is_running.return_value = True
  1380. mock_vm.start_time = datetime.datetime.now().timestamp() - 3000
  1381. monitor = qube_manager.VmShutdownMonitor(mock_vm, shutdown_time=1)
  1382. time.sleep(3)
  1383. monitor.restart_vm_if_needed = unittest.mock.Mock()
  1384. monitor.check_if_vm_has_shutdown()
  1385. self.assertEqual(mock_timer.call_count, 0)
  1386. mock_vm.kill.assert_called_once_with()
  1387. monitor.restart_vm_if_needed.assert_called_once_with()
  1388. @unittest.mock.patch('qubesmanager.qube_manager.QMessageBox')
  1389. @unittest.mock.patch('PyQt5.QtCore.QTimer.singleShot')
  1390. def test_04_check_later(self, mock_timer, mock_question):
  1391. mock_vm = unittest.mock.Mock()
  1392. mock_vm.is_running.return_value = True
  1393. mock_vm.start_time = datetime.datetime.now().timestamp() - 3000
  1394. monitor = qube_manager.VmShutdownMonitor(mock_vm, shutdown_time=3000)
  1395. time.sleep(1)
  1396. monitor.check_if_vm_has_shutdown()
  1397. self.assertEqual(mock_question.call_count, 0)
  1398. self.assertEqual(mock_timer.call_count, 1)
  1399. if __name__ == "__main__":
  1400. ha_syslog = logging.handlers.SysLogHandler('/dev/log')
  1401. ha_syslog.setFormatter(
  1402. logging.Formatter('%(name)s[%(process)d]: %(message)s'))
  1403. logging.root.addHandler(ha_syslog)
  1404. unittest.main()