basic.py 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663
  1. # pylint: disable=invalid-name
  2. #
  3. # The Qubes OS Project, https://www.qubes-os.org/
  4. #
  5. # Copyright (C) 2014-2015
  6. # Marek Marczykowski-Górecki <marmarek@invisiblethingslab.com>
  7. # Copyright (C) 2015 Wojtek Porczyk <woju@invisiblethingslab.com>
  8. #
  9. # This library is free software; you can redistribute it and/or
  10. # modify it under the terms of the GNU Lesser General Public
  11. # License as published by the Free Software Foundation; either
  12. # version 2.1 of the License, or (at your option) any later version.
  13. #
  14. # This library is distributed in the hope that it will be useful,
  15. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  16. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  17. # Lesser General Public License for more details.
  18. #
  19. # You should have received a copy of the GNU Lesser General Public
  20. # License along with this library; if not, see <https://www.gnu.org/licenses/>.
  21. #
  22. from distutils import spawn
  23. import asyncio
  24. import os
  25. import subprocess
  26. import tempfile
  27. import time
  28. import unittest
  29. import collections
  30. import qubes
  31. import qubes.firewall
  32. import qubes.tests
  33. import qubes.storage
  34. import qubes.vm.appvm
  35. import qubes.vm.qubesvm
  36. import qubes.vm.standalonevm
  37. import qubes.vm.templatevm
  38. import libvirt # pylint: disable=import-error
  39. class TC_00_Basic(qubes.tests.SystemTestCase):
  40. def setUp(self):
  41. super(TC_00_Basic, self).setUp()
  42. self.init_default_template()
  43. def test_000_qubes_create(self):
  44. self.assertIsInstance(self.app, qubes.Qubes)
  45. def test_100_qvm_create(self):
  46. vmname = self.make_vm_name('appvm')
  47. vm = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  48. name=vmname, template=self.app.default_template,
  49. label='red')
  50. self.assertIsNotNone(vm)
  51. self.assertEqual(vm.name, vmname)
  52. self.assertEqual(vm.template, self.app.default_template)
  53. self.loop.run_until_complete(vm.create_on_disk())
  54. with self.assertNotRaises(qubes.exc.QubesException):
  55. self.loop.run_until_complete(vm.storage.verify())
  56. def test_040_qdb_watch(self):
  57. flag = set()
  58. def handler(vm, event, path):
  59. if path == '/test-watch-path':
  60. flag.add(True)
  61. vm = self.app.domains[0]
  62. vm.watch_qdb_path('/test-watch-path')
  63. vm.add_handler('domain-qdb-change:/test-watch-path', handler)
  64. self.assertFalse(flag)
  65. vm.untrusted_qdb.write('/test-watch-path', 'test-value')
  66. self.loop.run_until_complete(asyncio.sleep(0.1))
  67. self.assertTrue(flag)
  68. def _test_200_on_domain_start(self, vm, event, **_kwargs):
  69. '''Simulate domain crash just after startup'''
  70. vm.libvirt_domain.destroy()
  71. def test_200_shutdown_event_race(self):
  72. '''Regression test for 3164'''
  73. vmname = self.make_vm_name('appvm')
  74. self.vm = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  75. name=vmname, template=self.app.default_template,
  76. label='red')
  77. # help the luck a little - don't wait for qrexec to easier win the race
  78. self.vm.features['qrexec'] = False
  79. self.loop.run_until_complete(self.vm.create_on_disk())
  80. # another way to help the luck a little - make sure the private
  81. # volume is first in (normally unordered) dict - this way if any
  82. # volume action fails, it will be at or after private volume - not
  83. # before (preventing private volume action)
  84. old_volumes = self.vm.volumes
  85. self.vm.volumes = collections.OrderedDict()
  86. self.vm.volumes['private'] = old_volumes.pop('private')
  87. self.vm.volumes.update(old_volumes.items())
  88. del old_volumes
  89. self.loop.run_until_complete(self.vm.start())
  90. # kill it the way it does not give a chance for domain-shutdown it
  91. # execute
  92. self.vm.libvirt_domain.destroy()
  93. # now, lets try to start the VM again, before domain-shutdown event
  94. # got handled (#3164), and immediately trigger second domain-shutdown
  95. self.vm.add_handler('domain-start', self._test_200_on_domain_start)
  96. self.loop.run_until_complete(self.vm.start())
  97. # and give a chance for both domain-shutdown handlers to execute
  98. self.loop.run_until_complete(asyncio.sleep(1))
  99. with self.assertNotRaises(qubes.exc.QubesException):
  100. # if the above caused two domain-shutdown handlers being called
  101. # one after another, private volume is gone
  102. self.loop.run_until_complete(self.vm.storage.verify())
  103. def _test_201_on_domain_pre_start(self, vm, event, **_kwargs):
  104. '''Simulate domain crash just after startup'''
  105. if not self.domain_shutdown_handled and not self.test_failure_reason:
  106. self.test_failure_reason = \
  107. 'domain-shutdown event was not dispatched before subsequent ' \
  108. 'start'
  109. self.domain_shutdown_handled = False
  110. def _test_201_domain_shutdown_handler(self, vm, event, **kwargs):
  111. if self.domain_shutdown_handled and not self.test_failure_reason:
  112. self.test_failure_reason = 'domain-shutdown event received twice'
  113. self.domain_shutdown_handled = True
  114. def test_201_shutdown_event_race(self):
  115. '''Regression test for 3164 - pure events edition'''
  116. vmname = self.make_vm_name('appvm')
  117. self.vm = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  118. name=vmname, template=self.app.default_template,
  119. label='red')
  120. # help the luck a little - don't wait for qrexec to easier win the race
  121. self.vm.features['qrexec'] = False
  122. self.loop.run_until_complete(self.vm.create_on_disk())
  123. # do not throw exception from inside event handler - test framework
  124. # will not recover from it (various objects leaks)
  125. self.test_failure_reason = None
  126. self.domain_shutdown_handled = False
  127. self.vm.add_handler('domain-shutdown',
  128. self._test_201_domain_shutdown_handler)
  129. self.loop.run_until_complete(self.vm.start())
  130. if self.test_failure_reason:
  131. self.fail(self.test_failure_reason)
  132. self.vm.add_handler('domain-pre-start',
  133. self._test_201_on_domain_pre_start)
  134. # kill it the way it does not give a chance for domain-shutdown it
  135. # execute
  136. self.vm.libvirt_domain.destroy()
  137. # now, lets try to start the VM again, before domain-shutdown event
  138. # got handled (#3164), and immediately trigger second domain-shutdown
  139. self.vm.add_handler('domain-start', self._test_200_on_domain_start)
  140. self.loop.run_until_complete(self.vm.start())
  141. if self.test_failure_reason:
  142. self.fail(self.test_failure_reason)
  143. # and give a chance for both domain-shutdown handlers to execute
  144. self.loop.run_until_complete(asyncio.sleep(1))
  145. if self.test_failure_reason:
  146. self.fail(self.test_failure_reason)
  147. self.assertTrue(self.domain_shutdown_handled,
  148. 'second domain-shutdown event was not dispatched after domain '
  149. 'shutdown')
  150. class TC_01_Properties(qubes.tests.SystemTestCase):
  151. # pylint: disable=attribute-defined-outside-init
  152. def setUp(self):
  153. super(TC_01_Properties, self).setUp()
  154. self.init_default_template()
  155. self.vmname = self.make_vm_name('appvm')
  156. self.vm = self.app.add_new_vm(qubes.vm.appvm.AppVM, name=self.vmname,
  157. template=self.app.default_template,
  158. label='red')
  159. self.loop.run_until_complete(self.vm.create_on_disk())
  160. self.addCleanup(self.cleanup_props)
  161. def cleanup_props(self):
  162. del self.vm
  163. def test_030_clone(self):
  164. try:
  165. testvm1 = self.app.add_new_vm(
  166. qubes.vm.appvm.AppVM,
  167. name=self.make_vm_name("vm"),
  168. template=self.app.default_template,
  169. label='red')
  170. self.loop.run_until_complete(testvm1.create_on_disk())
  171. testvm2 = self.app.add_new_vm(testvm1.__class__,
  172. name=self.make_vm_name("clone"),
  173. template=testvm1.template,
  174. label='red')
  175. testvm2.clone_properties(testvm1)
  176. testvm2.firewall.clone(testvm1.firewall)
  177. self.loop.run_until_complete(testvm2.clone_disk_files(testvm1))
  178. self.assertTrue(self.loop.run_until_complete(testvm1.storage.verify()))
  179. self.assertIn('source', testvm1.volumes['root'].config)
  180. self.assertNotEquals(testvm2, None)
  181. self.assertNotEquals(testvm2.volumes, {})
  182. self.assertIn('source', testvm2.volumes['root'].config)
  183. # qubes.xml reload
  184. self.app.save()
  185. testvm1 = self.app.domains[testvm1.qid]
  186. testvm2 = self.app.domains[testvm2.qid]
  187. self.assertEqual(testvm1.label, testvm2.label)
  188. self.assertEqual(testvm1.netvm, testvm2.netvm)
  189. self.assertEqual(testvm1.property_is_default('netvm'),
  190. testvm2.property_is_default('netvm'))
  191. self.assertEqual(testvm1.kernel, testvm2.kernel)
  192. self.assertEqual(testvm1.kernelopts, testvm2.kernelopts)
  193. self.assertEqual(testvm1.property_is_default('kernel'),
  194. testvm2.property_is_default('kernel'))
  195. self.assertEqual(testvm1.property_is_default('kernelopts'),
  196. testvm2.property_is_default('kernelopts'))
  197. self.assertEqual(testvm1.memory, testvm2.memory)
  198. self.assertEqual(testvm1.maxmem, testvm2.maxmem)
  199. self.assertEqual(testvm1.devices, testvm2.devices)
  200. self.assertEqual(testvm1.include_in_backups,
  201. testvm2.include_in_backups)
  202. self.assertEqual(testvm1.default_user, testvm2.default_user)
  203. self.assertEqual(testvm1.features, testvm2.features)
  204. self.assertEqual(testvm1.firewall.rules,
  205. testvm2.firewall.rules)
  206. # now some non-default values
  207. testvm1.netvm = None
  208. testvm1.label = 'orange'
  209. testvm1.memory = 512
  210. firewall = testvm1.firewall
  211. firewall.rules = [
  212. qubes.firewall.Rule(None, action='accept', dsthost='1.2.3.0/24',
  213. proto='tcp', dstports=22)]
  214. firewall.save()
  215. testvm3 = self.app.add_new_vm(testvm1.__class__,
  216. name=self.make_vm_name("clone2"),
  217. template=testvm1.template,
  218. label='red',)
  219. testvm3.clone_properties(testvm1)
  220. testvm3.firewall.clone(testvm1.firewall)
  221. self.loop.run_until_complete(testvm3.clone_disk_files(testvm1))
  222. # qubes.xml reload
  223. self.app.save()
  224. testvm1 = self.app.domains[testvm1.qid]
  225. testvm3 = self.app.domains[testvm3.qid]
  226. self.assertEqual(testvm1.label, testvm3.label)
  227. self.assertEqual(testvm1.netvm, testvm3.netvm)
  228. self.assertEqual(testvm1.property_is_default('netvm'),
  229. testvm3.property_is_default('netvm'))
  230. self.assertEqual(testvm1.kernel, testvm3.kernel)
  231. self.assertEqual(testvm1.kernelopts, testvm3.kernelopts)
  232. self.assertEqual(testvm1.property_is_default('kernel'),
  233. testvm3.property_is_default('kernel'))
  234. self.assertEqual(testvm1.property_is_default('kernelopts'),
  235. testvm3.property_is_default('kernelopts'))
  236. self.assertEqual(testvm1.memory, testvm3.memory)
  237. self.assertEqual(testvm1.maxmem, testvm3.maxmem)
  238. self.assertEqual(testvm1.devices, testvm3.devices)
  239. self.assertEqual(testvm1.include_in_backups,
  240. testvm3.include_in_backups)
  241. self.assertEqual(testvm1.default_user, testvm3.default_user)
  242. self.assertEqual(testvm1.features, testvm3.features)
  243. self.assertEqual(testvm1.firewall.rules,
  244. testvm3.firewall.rules)
  245. finally:
  246. try:
  247. del firewall
  248. except NameError:
  249. pass
  250. try:
  251. del testvm1
  252. except NameError:
  253. pass
  254. try:
  255. del testvm2
  256. except NameError:
  257. pass
  258. try:
  259. del testvm3
  260. except NameError:
  261. pass
  262. def test_020_name_conflict_app(self):
  263. # TODO decide what exception should be here
  264. with self.assertRaises((qubes.exc.QubesException, ValueError)):
  265. self.vm2 = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  266. name=self.vmname, template=self.app.default_template,
  267. label='red')
  268. self.loop.run_until_complete(self.vm2.create_on_disk())
  269. def test_021_name_conflict_template(self):
  270. # TODO decide what exception should be here
  271. with self.assertRaises((qubes.exc.QubesException, ValueError)):
  272. self.vm2 = self.app.add_new_vm(qubes.vm.templatevm.TemplateVM,
  273. name=self.vmname, label='red')
  274. self.loop.run_until_complete(self.vm2.create_on_disk())
  275. class TC_02_QvmPrefs(qubes.tests.SystemTestCase):
  276. # pylint: disable=attribute-defined-outside-init
  277. def setUp(self):
  278. super(TC_02_QvmPrefs, self).setUp()
  279. self.init_default_template()
  280. self.sharedopts = ['--qubesxml', qubes.tests.XMLPATH]
  281. def setup_appvm(self):
  282. self.testvm = self.app.add_new_vm(
  283. qubes.vm.appvm.AppVM,
  284. name=self.make_vm_name("vm"),
  285. label='red')
  286. self.loop.run_until_complete(self.testvm.create_on_disk())
  287. self.app.save()
  288. def setup_hvm(self):
  289. self.testvm = self.app.add_new_vm(
  290. qubes.vm.appvm.AppVM,
  291. name=self.make_vm_name("hvm"),
  292. label='red')
  293. self.testvm.virt_mode = 'hvm'
  294. self.loop.run_until_complete(self.testvm.create_on_disk())
  295. self.app.save()
  296. def pref_set(self, name, value, valid=True):
  297. self.loop.run_until_complete(self._pref_set(name, value, valid))
  298. @asyncio.coroutine
  299. def _pref_set(self, name, value, valid=True):
  300. cmd = ['qvm-prefs']
  301. if value != '-D':
  302. cmd.append('--')
  303. cmd.extend((self.testvm.name, name, value))
  304. p = yield from asyncio.create_subprocess_exec(*cmd,
  305. stdout=subprocess.PIPE,
  306. stderr=subprocess.PIPE)
  307. (stdout, stderr) = yield from p.communicate()
  308. if valid:
  309. self.assertEqual(p.returncode, 0,
  310. "qvm-prefs .. '{}' '{}' failed: {}{}".format(
  311. name, value, stdout, stderr
  312. ))
  313. else:
  314. self.assertNotEquals(p.returncode, 0,
  315. "qvm-prefs should reject value '{}' for "
  316. "property '{}'".format(value, name))
  317. def pref_get(self, name):
  318. self.loop.run_until_complete(self._pref_get(name))
  319. @asyncio.coroutine
  320. def _pref_get(self, name):
  321. p = yield from asyncio.create_subprocess_exec(
  322. 'qvm-prefs', *self.sharedopts, '--', self.testvm.name, name,
  323. stdout=subprocess.PIPE)
  324. (stdout, _) = yield from p.communicate()
  325. self.assertEqual(p.returncode, 0)
  326. return stdout.strip()
  327. bool_test_values = [
  328. ('true', 'True', True),
  329. ('False', 'False', True),
  330. ('0', 'False', True),
  331. ('1', 'True', True),
  332. ('invalid', '', False)
  333. ]
  334. def execute_tests(self, name, values):
  335. """
  336. Helper function, which executes tests for given property.
  337. :param values: list of tuples (value, expected, valid),
  338. where 'value' is what should be set and 'expected' is what should
  339. qvm-prefs returns as a property value and 'valid' marks valid and
  340. invalid values - if it's False, qvm-prefs should reject the value
  341. :return: None
  342. """
  343. for (value, expected, valid) in values:
  344. self.pref_set(name, value, valid)
  345. if valid:
  346. self.assertEqual(self.pref_get(name), expected)
  347. @unittest.skip('test not converted to core3 API')
  348. def test_006_template(self):
  349. templates = [tpl for tpl in self.app.domains.values() if
  350. isinstance(tpl, qubes.vm.templatevm.TemplateVM)]
  351. if not templates:
  352. self.skipTest("No templates installed")
  353. some_template = templates[0].name
  354. self.setup_appvm()
  355. self.execute_tests('template', [
  356. (some_template, some_template, True),
  357. ('invalid', '', False),
  358. ])
  359. @unittest.skip('test not converted to core3 API')
  360. def test_014_pcidevs(self):
  361. self.setup_appvm()
  362. self.execute_tests('pcidevs', [
  363. ('[]', '[]', True),
  364. ('[ "00:00.0" ]', "['00:00.0']", True),
  365. ('invalid', '', False),
  366. ('[invalid]', '', False),
  367. # TODO:
  368. # ('["12:12.0"]', '', False)
  369. ])
  370. @unittest.skip('test not converted to core3 API')
  371. def test_024_pv_reject_hvm_props(self):
  372. self.setup_appvm()
  373. self.execute_tests('guiagent_installed', [('False', '', False)])
  374. self.execute_tests('qrexec_installed', [('False', '', False)])
  375. self.execute_tests('drive', [('/tmp/drive.img', '', False)])
  376. self.execute_tests('timezone', [('localtime', '', False)])
  377. @unittest.skip('test not converted to core3 API')
  378. def test_025_hvm_reject_pv_props(self):
  379. self.setup_hvm()
  380. self.execute_tests('kernel', [('default', '', False)])
  381. self.execute_tests('kernelopts', [('default', '', False)])
  382. class TC_03_QvmRevertTemplateChanges(qubes.tests.SystemTestCase):
  383. # pylint: disable=attribute-defined-outside-init
  384. def setUp(self):
  385. super(TC_03_QvmRevertTemplateChanges, self).setUp()
  386. self.init_default_template()
  387. def cleanup_template(self):
  388. del self.test_template
  389. def setup_template(self):
  390. self.test_template = self.app.add_new_vm(
  391. qubes.vm.templatevm.TemplateVM,
  392. name=self.make_vm_name("pv-clone"),
  393. label='red'
  394. )
  395. self.addCleanup(self.cleanup_template)
  396. self.test_template.clone_properties(self.app.default_template)
  397. self.test_template.features.update(self.app.default_template.features)
  398. self.test_template.tags.update(self.app.default_template.tags)
  399. self.loop.run_until_complete(
  400. self.test_template.clone_disk_files(self.app.default_template))
  401. self.test_template.volumes['root'].revisions_to_keep = 3
  402. self.app.save()
  403. def get_rootimg_checksum(self):
  404. return subprocess.check_output(
  405. ['sha1sum', self.test_template.volumes['root'].path])
  406. def _do_test(self):
  407. checksum_before = self.get_rootimg_checksum()
  408. self.loop.run_until_complete(self.test_template.start())
  409. self.shutdown_and_wait(self.test_template)
  410. checksum_changed = self.get_rootimg_checksum()
  411. if checksum_before == checksum_changed:
  412. self.log.warning("template not modified, test result will be "
  413. "unreliable")
  414. self.assertNotEqual(self.test_template.volumes['root'].revisions, {})
  415. revert_cmd = ['qvm-volume', 'revert', self.test_template.name + ':root']
  416. p = self.loop.run_until_complete(asyncio.create_subprocess_exec(
  417. *revert_cmd))
  418. self.loop.run_until_complete(p.wait())
  419. self.assertEqual(p.returncode, 0)
  420. del p
  421. checksum_after = self.get_rootimg_checksum()
  422. self.assertEqual(checksum_before, checksum_after)
  423. def test_000_revert_linux(self):
  424. """
  425. Test qvm-revert-template-changes for PV template
  426. """
  427. self.setup_template()
  428. self._do_test()
  429. @unittest.skip('TODO: some non-linux system')
  430. def test_001_revert_non_linux(self):
  431. """
  432. Test qvm-revert-template-changes for HVM template
  433. """
  434. # TODO: have some system there, so the root.img will get modified
  435. self.setup_template()
  436. self._do_test()
  437. class TC_30_Gui_daemon(qubes.tests.SystemTestCase):
  438. def setUp(self):
  439. super(TC_30_Gui_daemon, self).setUp()
  440. self.init_default_template()
  441. @unittest.skipUnless(
  442. spawn.find_executable('xdotool'),
  443. "xdotool not installed")
  444. def test_000_clipboard(self):
  445. testvm1 = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  446. name=self.make_vm_name('vm1'), label='red')
  447. self.loop.run_until_complete(testvm1.create_on_disk())
  448. testvm2 = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  449. name=self.make_vm_name('vm2'), label='red')
  450. self.loop.run_until_complete(testvm2.create_on_disk())
  451. self.app.save()
  452. self.loop.run_until_complete(asyncio.wait([
  453. testvm1.start(),
  454. testvm2.start()]))
  455. self.loop.run_until_complete(asyncio.wait([
  456. self.wait_for_session(testvm1),
  457. self.wait_for_session(testvm2)]))
  458. window_title = 'user@{}'.format(testvm1.name)
  459. self.loop.run_until_complete(testvm1.run(
  460. 'zenity --text-info --editable --title={}'.format(window_title)))
  461. self.wait_for_window(window_title)
  462. time.sleep(0.5)
  463. test_string = "test{}".format(testvm1.xid)
  464. # Type and copy some text
  465. subprocess.check_call(['xdotool', 'search', '--name', window_title,
  466. 'windowactivate', '--sync',
  467. 'type', test_string])
  468. # second xdotool call because type --terminator do not work (SEGV)
  469. # additionally do not use search here, so window stack will be empty
  470. # and xdotool will use XTEST instead of generating events manually -
  471. # this will be much better - at least because events will have
  472. # correct timestamp (so gui-daemon would not drop the copy request)
  473. subprocess.check_call(['xdotool',
  474. 'key', 'ctrl+a', 'ctrl+c', 'ctrl+shift+c',
  475. 'Escape'])
  476. clipboard_content = \
  477. open('/var/run/qubes/qubes-clipboard.bin', 'r').read().strip()
  478. self.assertEqual(clipboard_content, test_string,
  479. "Clipboard copy operation failed - content")
  480. clipboard_source = \
  481. open('/var/run/qubes/qubes-clipboard.bin.source',
  482. 'r').read().strip()
  483. self.assertEqual(clipboard_source, testvm1.name,
  484. "Clipboard copy operation failed - owner")
  485. # Then paste it to the other window
  486. window_title = 'user@{}'.format(testvm2.name)
  487. p = self.loop.run_until_complete(testvm2.run(
  488. 'zenity --entry --title={} > /tmp/test.txt'.format(window_title)))
  489. self.wait_for_window(window_title)
  490. subprocess.check_call(['xdotool', 'key', '--delay', '100',
  491. 'ctrl+shift+v', 'ctrl+v', 'Return'])
  492. self.loop.run_until_complete(p.wait())
  493. # And compare the result
  494. (test_output, _) = self.loop.run_until_complete(
  495. testvm2.run_for_stdio('cat /tmp/test.txt'))
  496. self.assertEqual(test_string, test_output.strip().decode('ascii'))
  497. clipboard_content = \
  498. open('/var/run/qubes/qubes-clipboard.bin', 'r').read().strip()
  499. self.assertEqual(clipboard_content, "",
  500. "Clipboard not wiped after paste - content")
  501. clipboard_source = \
  502. open('/var/run/qubes/qubes-clipboard.bin.source', 'r').\
  503. read().strip()
  504. self.assertEqual(clipboard_source, "",
  505. "Clipboard not wiped after paste - owner")
  506. class TC_05_StandaloneVMMixin(object):
  507. def setUp(self):
  508. super(TC_05_StandaloneVMMixin, self).setUp()
  509. self.init_default_template(self.template)
  510. def test_000_create_start(self):
  511. self.testvm1 = self.app.add_new_vm(qubes.vm.standalonevm.StandaloneVM,
  512. name=self.make_vm_name('vm1'), label='red')
  513. self.testvm1.features.update(self.app.default_template.features)
  514. self.loop.run_until_complete(
  515. self.testvm1.clone_disk_files(self.app.default_template))
  516. self.app.save()
  517. self.loop.run_until_complete(self.testvm1.start())
  518. self.assertEqual(self.testvm1.get_power_state(), "Running")
  519. def test_100_resize_root_img(self):
  520. self.testvm1 = self.app.add_new_vm(qubes.vm.standalonevm.StandaloneVM,
  521. name=self.make_vm_name('vm1'), label='red')
  522. self.testvm1.features.update(self.app.default_template.features)
  523. self.loop.run_until_complete(
  524. self.testvm1.clone_disk_files(self.app.default_template))
  525. self.app.save()
  526. try:
  527. self.loop.run_until_complete(
  528. self.testvm1.storage.resize(self.testvm1.volumes['root'],
  529. 20 * 1024 ** 3))
  530. except (subprocess.CalledProcessError,
  531. qubes.storage.StoragePoolException) as e:
  532. # exception object would leak VM reference
  533. self.fail(str(e))
  534. self.assertEqual(self.testvm1.volumes['root'].size, 20 * 1024 ** 3)
  535. self.loop.run_until_complete(self.testvm1.start())
  536. # new_size in 1k-blocks
  537. (new_size, _) = self.loop.run_until_complete(
  538. self.testvm1.run_for_stdio('df --output=size /|tail -n 1'))
  539. # some safety margin for FS metadata
  540. self.assertGreater(int(new_size.strip()), 19 * 1024 ** 2)
  541. def test_101_resize_root_img_online(self):
  542. self.testvm1 = self.app.add_new_vm(qubes.vm.standalonevm.StandaloneVM,
  543. name=self.make_vm_name('vm1'), label='red')
  544. self.testvm1.features['qrexec'] = True
  545. self.loop.run_until_complete(
  546. self.testvm1.clone_disk_files(self.app.default_template))
  547. self.testvm1.features.update(self.app.default_template.features)
  548. self.app.save()
  549. self.loop.run_until_complete(self.testvm1.start())
  550. try:
  551. self.loop.run_until_complete(
  552. self.testvm1.storage.resize(self.testvm1.volumes['root'],
  553. 20 * 1024 ** 3))
  554. except (subprocess.CalledProcessError,
  555. qubes.storage.StoragePoolException) as e:
  556. # exception object would leak VM reference
  557. self.fail(str(e))
  558. self.assertEqual(self.testvm1.volumes['root'].size, 20 * 1024 ** 3)
  559. # new_size in 1k-blocks
  560. (new_size, _) = self.loop.run_until_complete(
  561. self.testvm1.run_for_stdio('df --output=size /|tail -n 1'))
  562. # some safety margin for FS metadata
  563. self.assertGreater(int(new_size.strip()), 19 * 1024 ** 2)
  564. def load_tests(loader, tests, pattern):
  565. for template in qubes.tests.list_templates():
  566. tests.addTests(loader.loadTestsFromTestCase(
  567. type(
  568. 'TC_05_StandaloneVM_' + template,
  569. (TC_05_StandaloneVMMixin, qubes.tests.SystemTestCase),
  570. {'template': template})))
  571. return tests
  572. # vim: ts=4 sw=4 et