basic.py 30 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738
  1. # pylint: disable=invalid-name
  2. #
  3. # The Qubes OS Project, https://www.qubes-os.org/
  4. #
  5. # Copyright (C) 2014-2015
  6. # Marek Marczykowski-Górecki <marmarek@invisiblethingslab.com>
  7. # Copyright (C) 2015 Wojtek Porczyk <woju@invisiblethingslab.com>
  8. #
  9. # This library is free software; you can redistribute it and/or
  10. # modify it under the terms of the GNU Lesser General Public
  11. # License as published by the Free Software Foundation; either
  12. # version 2.1 of the License, or (at your option) any later version.
  13. #
  14. # This library is distributed in the hope that it will be useful,
  15. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  16. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  17. # Lesser General Public License for more details.
  18. #
  19. # You should have received a copy of the GNU Lesser General Public
  20. # License along with this library; if not, see <https://www.gnu.org/licenses/>.
  21. #
  22. from distutils import spawn
  23. import asyncio
  24. import os
  25. import subprocess
  26. import tempfile
  27. import time
  28. import unittest
  29. import collections
  30. import pkg_resources
  31. import qubes
  32. import qubes.firewall
  33. import qubes.tests
  34. import qubes.storage
  35. import qubes.vm.appvm
  36. import qubes.vm.qubesvm
  37. import qubes.vm.standalonevm
  38. import qubes.vm.templatevm
  39. import libvirt # pylint: disable=import-error
  40. class TC_00_Basic(qubes.tests.SystemTestCase):
  41. def setUp(self):
  42. super(TC_00_Basic, self).setUp()
  43. self.init_default_template()
  44. def test_000_qubes_create(self):
  45. self.assertIsInstance(self.app, qubes.Qubes)
  46. def test_100_qvm_create(self):
  47. vmname = self.make_vm_name('appvm')
  48. vm = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  49. name=vmname, template=self.app.default_template,
  50. label='red')
  51. self.assertIsNotNone(vm)
  52. self.assertEqual(vm.name, vmname)
  53. self.assertEqual(vm.template, self.app.default_template)
  54. self.loop.run_until_complete(vm.create_on_disk())
  55. with self.assertNotRaises(qubes.exc.QubesException):
  56. self.loop.run_until_complete(vm.storage.verify())
  57. def test_040_qdb_watch(self):
  58. flag = set()
  59. def handler(vm, event, path):
  60. if path == '/test-watch-path':
  61. flag.add(True)
  62. vm = self.app.domains[0]
  63. vm.watch_qdb_path('/test-watch-path')
  64. vm.add_handler('domain-qdb-change:/test-watch-path', handler)
  65. self.assertFalse(flag)
  66. vm.untrusted_qdb.write('/test-watch-path', 'test-value')
  67. self.loop.run_until_complete(asyncio.sleep(0.1))
  68. self.assertTrue(flag)
  69. @unittest.skipUnless(
  70. spawn.find_executable('xdotool'), "xdotool not installed")
  71. def test_120_start_standalone_with_cdrom_dom0(self):
  72. vmname = self.make_vm_name('appvm')
  73. self.vm = self.app.add_new_vm('StandaloneVM', label='red', name=vmname)
  74. self.loop.run_until_complete(self.vm.create_on_disk())
  75. self.vm.kernel = None
  76. self.vm.virt_mode = 'hvm'
  77. iso_path = self.create_bootable_iso()
  78. # start the VM using qvm-start tool, to test --cdrom option there
  79. p = self.loop.run_until_complete(asyncio.create_subprocess_exec(
  80. 'qvm-start', '--cdrom=dom0:' + iso_path, self.vm.name,
  81. stdout=subprocess.PIPE, stderr=subprocess.STDOUT))
  82. (stdout, _) = self.loop.run_until_complete(p.communicate())
  83. self.assertEqual(p.returncode, 0, stdout)
  84. # check if VM do not crash instantly
  85. self.loop.run_until_complete(asyncio.sleep(5))
  86. self.assertTrue(self.vm.is_running())
  87. # Type 'poweroff'
  88. subprocess.check_call(['xdotool', 'search', '--name', self.vm.name,
  89. 'type', 'poweroff\r'])
  90. self.loop.run_until_complete(asyncio.sleep(1))
  91. self.assertFalse(self.vm.is_running())
  92. def _test_200_on_domain_start(self, vm, event, **_kwargs):
  93. '''Simulate domain crash just after startup'''
  94. vm.libvirt_domain.destroy()
  95. def test_200_shutdown_event_race(self):
  96. '''Regression test for 3164'''
  97. vmname = self.make_vm_name('appvm')
  98. self.vm = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  99. name=vmname, template=self.app.default_template,
  100. label='red')
  101. # help the luck a little - don't wait for qrexec to easier win the race
  102. self.vm.features['qrexec'] = False
  103. self.loop.run_until_complete(self.vm.create_on_disk())
  104. # another way to help the luck a little - make sure the private
  105. # volume is first in (normally unordered) dict - this way if any
  106. # volume action fails, it will be at or after private volume - not
  107. # before (preventing private volume action)
  108. old_volumes = self.vm.volumes
  109. self.vm.volumes = collections.OrderedDict()
  110. self.vm.volumes['private'] = old_volumes.pop('private')
  111. self.vm.volumes.update(old_volumes.items())
  112. del old_volumes
  113. self.loop.run_until_complete(self.vm.start())
  114. # kill it the way it does not give a chance for domain-shutdown it
  115. # execute
  116. self.vm.libvirt_domain.destroy()
  117. # now, lets try to start the VM again, before domain-shutdown event
  118. # got handled (#3164), and immediately trigger second domain-shutdown
  119. self.vm.add_handler('domain-start', self._test_200_on_domain_start)
  120. self.loop.run_until_complete(self.vm.start())
  121. # and give a chance for both domain-shutdown handlers to execute
  122. self.loop.run_until_complete(asyncio.sleep(1))
  123. with self.assertNotRaises(qubes.exc.QubesException):
  124. # if the above caused two domain-shutdown handlers being called
  125. # one after another, private volume is gone
  126. self.loop.run_until_complete(self.vm.storage.verify())
  127. def _test_201_on_domain_pre_start(self, vm, event, **_kwargs):
  128. '''Simulate domain crash just after startup'''
  129. if not self.domain_shutdown_handled and not self.test_failure_reason:
  130. self.test_failure_reason = \
  131. 'domain-shutdown event was not dispatched before subsequent ' \
  132. 'start'
  133. self.domain_shutdown_handled = False
  134. def _test_201_domain_shutdown_handler(self, vm, event, **kwargs):
  135. if self.domain_shutdown_handled and not self.test_failure_reason:
  136. self.test_failure_reason = 'domain-shutdown event received twice'
  137. self.domain_shutdown_handled = True
  138. def test_201_shutdown_event_race(self):
  139. '''Regression test for 3164 - pure events edition'''
  140. vmname = self.make_vm_name('appvm')
  141. self.vm = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  142. name=vmname, template=self.app.default_template,
  143. label='red')
  144. # help the luck a little - don't wait for qrexec to easier win the race
  145. self.vm.features['qrexec'] = False
  146. self.loop.run_until_complete(self.vm.create_on_disk())
  147. # do not throw exception from inside event handler - test framework
  148. # will not recover from it (various objects leaks)
  149. self.test_failure_reason = None
  150. self.domain_shutdown_handled = False
  151. self.vm.add_handler('domain-shutdown',
  152. self._test_201_domain_shutdown_handler)
  153. self.loop.run_until_complete(self.vm.start())
  154. if self.test_failure_reason:
  155. self.fail(self.test_failure_reason)
  156. self.vm.add_handler('domain-pre-start',
  157. self._test_201_on_domain_pre_start)
  158. # kill it the way it does not give a chance for domain-shutdown it
  159. # execute
  160. self.vm.libvirt_domain.destroy()
  161. # now, lets try to start the VM again, before domain-shutdown event
  162. # got handled (#3164), and immediately trigger second domain-shutdown
  163. self.vm.add_handler('domain-start', self._test_200_on_domain_start)
  164. self.loop.run_until_complete(self.vm.start())
  165. if self.test_failure_reason:
  166. self.fail(self.test_failure_reason)
  167. # and give a chance for both domain-shutdown handlers to execute
  168. self.loop.run_until_complete(asyncio.sleep(1))
  169. if self.test_failure_reason:
  170. self.fail(self.test_failure_reason)
  171. self.assertTrue(self.domain_shutdown_handled,
  172. 'second domain-shutdown event was not dispatched after domain '
  173. 'shutdown')
  174. class TC_01_Properties(qubes.tests.SystemTestCase):
  175. # pylint: disable=attribute-defined-outside-init
  176. def setUp(self):
  177. super(TC_01_Properties, self).setUp()
  178. self.init_default_template()
  179. self.vmname = self.make_vm_name('appvm')
  180. self.vm = self.app.add_new_vm(qubes.vm.appvm.AppVM, name=self.vmname,
  181. template=self.app.default_template,
  182. label='red')
  183. self.loop.run_until_complete(self.vm.create_on_disk())
  184. self.addCleanup(self.cleanup_props)
  185. def cleanup_props(self):
  186. del self.vm
  187. def test_030_clone(self):
  188. try:
  189. testvm1 = self.app.add_new_vm(
  190. qubes.vm.appvm.AppVM,
  191. name=self.make_vm_name("vm"),
  192. template=self.app.default_template,
  193. label='red')
  194. self.loop.run_until_complete(testvm1.create_on_disk())
  195. testvm2 = self.app.add_new_vm(testvm1.__class__,
  196. name=self.make_vm_name("clone"),
  197. template=testvm1.template,
  198. label='red')
  199. testvm2.clone_properties(testvm1)
  200. testvm2.firewall.clone(testvm1.firewall)
  201. self.loop.run_until_complete(testvm2.clone_disk_files(testvm1))
  202. self.assertTrue(self.loop.run_until_complete(testvm1.storage.verify()))
  203. self.assertIn('source', testvm1.volumes['root'].config)
  204. self.assertNotEquals(testvm2, None)
  205. self.assertNotEquals(testvm2.volumes, {})
  206. self.assertIn('source', testvm2.volumes['root'].config)
  207. # qubes.xml reload
  208. self.app.save()
  209. testvm1 = self.app.domains[testvm1.qid]
  210. testvm2 = self.app.domains[testvm2.qid]
  211. self.assertEqual(testvm1.label, testvm2.label)
  212. self.assertEqual(testvm1.netvm, testvm2.netvm)
  213. self.assertEqual(testvm1.property_is_default('netvm'),
  214. testvm2.property_is_default('netvm'))
  215. self.assertEqual(testvm1.kernel, testvm2.kernel)
  216. self.assertEqual(testvm1.kernelopts, testvm2.kernelopts)
  217. self.assertEqual(testvm1.property_is_default('kernel'),
  218. testvm2.property_is_default('kernel'))
  219. self.assertEqual(testvm1.property_is_default('kernelopts'),
  220. testvm2.property_is_default('kernelopts'))
  221. self.assertEqual(testvm1.memory, testvm2.memory)
  222. self.assertEqual(testvm1.maxmem, testvm2.maxmem)
  223. self.assertEqual(testvm1.devices, testvm2.devices)
  224. self.assertEqual(testvm1.include_in_backups,
  225. testvm2.include_in_backups)
  226. self.assertEqual(testvm1.default_user, testvm2.default_user)
  227. self.assertEqual(testvm1.features, testvm2.features)
  228. self.assertEqual(testvm1.firewall.rules,
  229. testvm2.firewall.rules)
  230. # now some non-default values
  231. testvm1.netvm = None
  232. testvm1.label = 'orange'
  233. testvm1.memory = 512
  234. firewall = testvm1.firewall
  235. firewall.rules = [
  236. qubes.firewall.Rule(None, action='accept', dsthost='1.2.3.0/24',
  237. proto='tcp', dstports=22)]
  238. firewall.save()
  239. testvm3 = self.app.add_new_vm(testvm1.__class__,
  240. name=self.make_vm_name("clone2"),
  241. template=testvm1.template,
  242. label='red',)
  243. testvm3.clone_properties(testvm1)
  244. testvm3.firewall.clone(testvm1.firewall)
  245. self.loop.run_until_complete(testvm3.clone_disk_files(testvm1))
  246. # qubes.xml reload
  247. self.app.save()
  248. testvm1 = self.app.domains[testvm1.qid]
  249. testvm3 = self.app.domains[testvm3.qid]
  250. self.assertEqual(testvm1.label, testvm3.label)
  251. self.assertEqual(testvm1.netvm, testvm3.netvm)
  252. self.assertEqual(testvm1.property_is_default('netvm'),
  253. testvm3.property_is_default('netvm'))
  254. self.assertEqual(testvm1.kernel, testvm3.kernel)
  255. self.assertEqual(testvm1.kernelopts, testvm3.kernelopts)
  256. self.assertEqual(testvm1.property_is_default('kernel'),
  257. testvm3.property_is_default('kernel'))
  258. self.assertEqual(testvm1.property_is_default('kernelopts'),
  259. testvm3.property_is_default('kernelopts'))
  260. self.assertEqual(testvm1.memory, testvm3.memory)
  261. self.assertEqual(testvm1.maxmem, testvm3.maxmem)
  262. self.assertEqual(testvm1.devices, testvm3.devices)
  263. self.assertEqual(testvm1.include_in_backups,
  264. testvm3.include_in_backups)
  265. self.assertEqual(testvm1.default_user, testvm3.default_user)
  266. self.assertEqual(testvm1.features, testvm3.features)
  267. self.assertEqual(testvm1.firewall.rules,
  268. testvm3.firewall.rules)
  269. finally:
  270. try:
  271. del firewall
  272. except NameError:
  273. pass
  274. try:
  275. del testvm1
  276. except NameError:
  277. pass
  278. try:
  279. del testvm2
  280. except NameError:
  281. pass
  282. try:
  283. del testvm3
  284. except NameError:
  285. pass
  286. def test_020_name_conflict_app(self):
  287. # TODO decide what exception should be here
  288. with self.assertRaises((qubes.exc.QubesException, ValueError)):
  289. self.vm2 = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  290. name=self.vmname, template=self.app.default_template,
  291. label='red')
  292. self.loop.run_until_complete(self.vm2.create_on_disk())
  293. def test_021_name_conflict_template(self):
  294. # TODO decide what exception should be here
  295. with self.assertRaises((qubes.exc.QubesException, ValueError)):
  296. self.vm2 = self.app.add_new_vm(qubes.vm.templatevm.TemplateVM,
  297. name=self.vmname, label='red')
  298. self.loop.run_until_complete(self.vm2.create_on_disk())
  299. class TC_02_QvmPrefs(qubes.tests.SystemTestCase):
  300. # pylint: disable=attribute-defined-outside-init
  301. def setUp(self):
  302. super(TC_02_QvmPrefs, self).setUp()
  303. self.init_default_template()
  304. self.sharedopts = ['--qubesxml', qubes.tests.XMLPATH]
  305. def setup_appvm(self):
  306. self.testvm = self.app.add_new_vm(
  307. qubes.vm.appvm.AppVM,
  308. name=self.make_vm_name("vm"),
  309. label='red')
  310. self.loop.run_until_complete(self.testvm.create_on_disk())
  311. self.app.save()
  312. def setup_hvm(self):
  313. self.testvm = self.app.add_new_vm(
  314. qubes.vm.appvm.AppVM,
  315. name=self.make_vm_name("hvm"),
  316. label='red')
  317. self.testvm.virt_mode = 'hvm'
  318. self.loop.run_until_complete(self.testvm.create_on_disk())
  319. self.app.save()
  320. def pref_set(self, name, value, valid=True):
  321. self.loop.run_until_complete(self._pref_set(name, value, valid))
  322. @asyncio.coroutine
  323. def _pref_set(self, name, value, valid=True):
  324. cmd = ['qvm-prefs']
  325. if value != '-D':
  326. cmd.append('--')
  327. cmd.extend((self.testvm.name, name, value))
  328. p = yield from asyncio.create_subprocess_exec(*cmd,
  329. stdout=subprocess.PIPE,
  330. stderr=subprocess.PIPE)
  331. (stdout, stderr) = yield from p.communicate()
  332. if valid:
  333. self.assertEqual(p.returncode, 0,
  334. "qvm-prefs .. '{}' '{}' failed: {}{}".format(
  335. name, value, stdout, stderr
  336. ))
  337. else:
  338. self.assertNotEquals(p.returncode, 0,
  339. "qvm-prefs should reject value '{}' for "
  340. "property '{}'".format(value, name))
  341. def pref_get(self, name):
  342. self.loop.run_until_complete(self._pref_get(name))
  343. @asyncio.coroutine
  344. def _pref_get(self, name):
  345. p = yield from asyncio.create_subprocess_exec(
  346. 'qvm-prefs', *self.sharedopts, '--', self.testvm.name, name,
  347. stdout=subprocess.PIPE)
  348. (stdout, _) = yield from p.communicate()
  349. self.assertEqual(p.returncode, 0)
  350. return stdout.strip()
  351. bool_test_values = [
  352. ('true', 'True', True),
  353. ('False', 'False', True),
  354. ('0', 'False', True),
  355. ('1', 'True', True),
  356. ('invalid', '', False)
  357. ]
  358. def execute_tests(self, name, values):
  359. """
  360. Helper function, which executes tests for given property.
  361. :param values: list of tuples (value, expected, valid),
  362. where 'value' is what should be set and 'expected' is what should
  363. qvm-prefs returns as a property value and 'valid' marks valid and
  364. invalid values - if it's False, qvm-prefs should reject the value
  365. :return: None
  366. """
  367. for (value, expected, valid) in values:
  368. self.pref_set(name, value, valid)
  369. if valid:
  370. self.assertEqual(self.pref_get(name), expected)
  371. @unittest.skip('test not converted to core3 API')
  372. def test_006_template(self):
  373. templates = [tpl for tpl in self.app.domains.values() if
  374. isinstance(tpl, qubes.vm.templatevm.TemplateVM)]
  375. if not templates:
  376. self.skipTest("No templates installed")
  377. some_template = templates[0].name
  378. self.setup_appvm()
  379. self.execute_tests('template', [
  380. (some_template, some_template, True),
  381. ('invalid', '', False),
  382. ])
  383. @unittest.skip('test not converted to core3 API')
  384. def test_014_pcidevs(self):
  385. self.setup_appvm()
  386. self.execute_tests('pcidevs', [
  387. ('[]', '[]', True),
  388. ('[ "00:00.0" ]', "['00:00.0']", True),
  389. ('invalid', '', False),
  390. ('[invalid]', '', False),
  391. # TODO:
  392. # ('["12:12.0"]', '', False)
  393. ])
  394. @unittest.skip('test not converted to core3 API')
  395. def test_024_pv_reject_hvm_props(self):
  396. self.setup_appvm()
  397. self.execute_tests('guiagent_installed', [('False', '', False)])
  398. self.execute_tests('qrexec_installed', [('False', '', False)])
  399. self.execute_tests('drive', [('/tmp/drive.img', '', False)])
  400. self.execute_tests('timezone', [('localtime', '', False)])
  401. @unittest.skip('test not converted to core3 API')
  402. def test_025_hvm_reject_pv_props(self):
  403. self.setup_hvm()
  404. self.execute_tests('kernel', [('default', '', False)])
  405. self.execute_tests('kernelopts', [('default', '', False)])
  406. class TC_03_QvmRevertTemplateChanges(qubes.tests.SystemTestCase):
  407. # pylint: disable=attribute-defined-outside-init
  408. def setUp(self):
  409. super(TC_03_QvmRevertTemplateChanges, self).setUp()
  410. self.init_default_template()
  411. def cleanup_template(self):
  412. del self.test_template
  413. def setup_template(self):
  414. self.test_template = self.app.add_new_vm(
  415. qubes.vm.templatevm.TemplateVM,
  416. name=self.make_vm_name("pv-clone"),
  417. label='red'
  418. )
  419. self.addCleanup(self.cleanup_template)
  420. self.test_template.clone_properties(self.app.default_template)
  421. self.test_template.features.update(self.app.default_template.features)
  422. self.test_template.tags.update(self.app.default_template.tags)
  423. self.loop.run_until_complete(
  424. self.test_template.clone_disk_files(self.app.default_template))
  425. self.test_template.volumes['root'].revisions_to_keep = 3
  426. self.app.save()
  427. def get_rootimg_checksum(self):
  428. return subprocess.check_output(
  429. ['sha1sum', self.test_template.volumes['root'].path])
  430. def _do_test(self):
  431. checksum_before = self.get_rootimg_checksum()
  432. self.loop.run_until_complete(self.test_template.start())
  433. self.shutdown_and_wait(self.test_template)
  434. checksum_changed = self.get_rootimg_checksum()
  435. if checksum_before == checksum_changed:
  436. self.log.warning("template not modified, test result will be "
  437. "unreliable")
  438. self.assertNotEqual(self.test_template.volumes['root'].revisions, {})
  439. revert_cmd = ['qvm-volume', 'revert', self.test_template.name + ':root']
  440. p = self.loop.run_until_complete(asyncio.create_subprocess_exec(
  441. *revert_cmd))
  442. self.loop.run_until_complete(p.wait())
  443. self.assertEqual(p.returncode, 0)
  444. del p
  445. checksum_after = self.get_rootimg_checksum()
  446. self.assertEqual(checksum_before, checksum_after)
  447. def test_000_revert_linux(self):
  448. """
  449. Test qvm-revert-template-changes for PV template
  450. """
  451. self.setup_template()
  452. self._do_test()
  453. @unittest.skip('TODO: some non-linux system')
  454. def test_001_revert_non_linux(self):
  455. """
  456. Test qvm-revert-template-changes for HVM template
  457. """
  458. # TODO: have some system there, so the root.img will get modified
  459. self.setup_template()
  460. self._do_test()
  461. class TC_30_Gui_daemon(qubes.tests.SystemTestCase):
  462. def setUp(self):
  463. super(TC_30_Gui_daemon, self).setUp()
  464. self.init_default_template()
  465. @unittest.skipUnless(
  466. spawn.find_executable('xdotool'),
  467. "xdotool not installed")
  468. def test_000_clipboard(self):
  469. testvm1 = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  470. name=self.make_vm_name('vm1'), label='red')
  471. self.loop.run_until_complete(testvm1.create_on_disk())
  472. testvm2 = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  473. name=self.make_vm_name('vm2'), label='red')
  474. self.loop.run_until_complete(testvm2.create_on_disk())
  475. self.app.save()
  476. self.loop.run_until_complete(asyncio.wait([
  477. testvm1.start(),
  478. testvm2.start()]))
  479. self.loop.run_until_complete(asyncio.wait([
  480. self.wait_for_session(testvm1),
  481. self.wait_for_session(testvm2)]))
  482. window_title = 'user@{}'.format(testvm1.name)
  483. self.loop.run_until_complete(testvm1.run(
  484. 'zenity --text-info --editable --title={}'.format(window_title)))
  485. self.wait_for_window(window_title)
  486. time.sleep(0.5)
  487. test_string = "test{}".format(testvm1.xid)
  488. # Type and copy some text
  489. subprocess.check_call(['xdotool', 'search', '--name', window_title,
  490. 'windowactivate', '--sync',
  491. 'type', test_string])
  492. # second xdotool call because type --terminator do not work (SEGV)
  493. # additionally do not use search here, so window stack will be empty
  494. # and xdotool will use XTEST instead of generating events manually -
  495. # this will be much better - at least because events will have
  496. # correct timestamp (so gui-daemon would not drop the copy request)
  497. subprocess.check_call(['xdotool',
  498. 'key', 'ctrl+a', 'ctrl+c', 'ctrl+shift+c',
  499. 'Escape'])
  500. clipboard_content = \
  501. open('/var/run/qubes/qubes-clipboard.bin', 'r').read().strip()
  502. self.assertEqual(clipboard_content, test_string,
  503. "Clipboard copy operation failed - content")
  504. clipboard_source = \
  505. open('/var/run/qubes/qubes-clipboard.bin.source',
  506. 'r').read().strip()
  507. self.assertEqual(clipboard_source, testvm1.name,
  508. "Clipboard copy operation failed - owner")
  509. # Then paste it to the other window
  510. window_title = 'user@{}'.format(testvm2.name)
  511. p = self.loop.run_until_complete(testvm2.run(
  512. 'zenity --entry --title={} > /tmp/test.txt'.format(window_title)))
  513. self.wait_for_window(window_title)
  514. subprocess.check_call(['xdotool', 'key', '--delay', '100',
  515. 'ctrl+shift+v', 'ctrl+v', 'Return'])
  516. self.loop.run_until_complete(p.wait())
  517. # And compare the result
  518. (test_output, _) = self.loop.run_until_complete(
  519. testvm2.run_for_stdio('cat /tmp/test.txt'))
  520. self.assertEqual(test_string, test_output.strip().decode('ascii'))
  521. clipboard_content = \
  522. open('/var/run/qubes/qubes-clipboard.bin', 'r').read().strip()
  523. self.assertEqual(clipboard_content, "",
  524. "Clipboard not wiped after paste - content")
  525. clipboard_source = \
  526. open('/var/run/qubes/qubes-clipboard.bin.source', 'r').\
  527. read().strip()
  528. self.assertEqual(clipboard_source, "",
  529. "Clipboard not wiped after paste - owner")
  530. class TC_05_StandaloneVMMixin(object):
  531. def setUp(self):
  532. super(TC_05_StandaloneVMMixin, self).setUp()
  533. self.init_default_template(self.template)
  534. def test_000_create_start(self):
  535. self.testvm1 = self.app.add_new_vm(qubes.vm.standalonevm.StandaloneVM,
  536. name=self.make_vm_name('vm1'), label='red')
  537. self.testvm1.features.update(self.app.default_template.features)
  538. self.loop.run_until_complete(
  539. self.testvm1.clone_disk_files(self.app.default_template))
  540. self.app.save()
  541. self.loop.run_until_complete(self.testvm1.start())
  542. self.assertEqual(self.testvm1.get_power_state(), "Running")
  543. def test_100_resize_root_img(self):
  544. self.testvm1 = self.app.add_new_vm(qubes.vm.standalonevm.StandaloneVM,
  545. name=self.make_vm_name('vm1'), label='red')
  546. self.testvm1.features.update(self.app.default_template.features)
  547. self.loop.run_until_complete(
  548. self.testvm1.clone_disk_files(self.app.default_template))
  549. self.app.save()
  550. try:
  551. self.loop.run_until_complete(
  552. self.testvm1.storage.resize(self.testvm1.volumes['root'],
  553. 20 * 1024 ** 3))
  554. except (subprocess.CalledProcessError,
  555. qubes.storage.StoragePoolException) as e:
  556. # exception object would leak VM reference
  557. self.fail(str(e))
  558. self.assertEqual(self.testvm1.volumes['root'].size, 20 * 1024 ** 3)
  559. self.loop.run_until_complete(self.testvm1.start())
  560. # new_size in 1k-blocks
  561. (new_size, _) = self.loop.run_until_complete(
  562. self.testvm1.run_for_stdio('df --output=size /|tail -n 1'))
  563. # some safety margin for FS metadata
  564. self.assertGreater(int(new_size.strip()), 19 * 1024 ** 2)
  565. def test_101_resize_root_img_online(self):
  566. self.testvm1 = self.app.add_new_vm(qubes.vm.standalonevm.StandaloneVM,
  567. name=self.make_vm_name('vm1'), label='red')
  568. self.testvm1.features['qrexec'] = True
  569. self.loop.run_until_complete(
  570. self.testvm1.clone_disk_files(self.app.default_template))
  571. self.testvm1.features.update(self.app.default_template.features)
  572. self.app.save()
  573. self.loop.run_until_complete(self.testvm1.start())
  574. try:
  575. self.loop.run_until_complete(
  576. self.testvm1.storage.resize(self.testvm1.volumes['root'],
  577. 20 * 1024 ** 3))
  578. except (subprocess.CalledProcessError,
  579. qubes.storage.StoragePoolException) as e:
  580. # exception object would leak VM reference
  581. self.fail(str(e))
  582. self.assertEqual(self.testvm1.volumes['root'].size, 20 * 1024 ** 3)
  583. # new_size in 1k-blocks
  584. (new_size, _) = self.loop.run_until_complete(
  585. self.testvm1.run_for_stdio('df --output=size /|tail -n 1'))
  586. # some safety margin for FS metadata
  587. self.assertGreater(int(new_size.strip()), 19 * 1024 ** 2)
  588. class TC_06_AppVMMixin(object):
  589. template = None
  590. def setUp(self):
  591. super(TC_06_AppVMMixin, self).setUp()
  592. self.init_default_template(self.template)
  593. @unittest.skipUnless(
  594. spawn.find_executable('xdotool'), "xdotool not installed")
  595. def test_121_start_standalone_with_cdrom_vm(self):
  596. cdrom_vmname = self.make_vm_name('cdrom')
  597. self.cdrom_vm = self.app.add_new_vm('AppVM', label='red',
  598. name=cdrom_vmname)
  599. self.loop.run_until_complete(self.cdrom_vm.create_on_disk())
  600. self.loop.run_until_complete(self.cdrom_vm.start())
  601. iso_path = self.create_bootable_iso()
  602. with open(iso_path, 'rb') as iso_f:
  603. self.loop.run_until_complete(
  604. self.cdrom_vm.run_for_stdio('cat > /home/user/boot.iso',
  605. stdin=iso_f))
  606. vmname = self.make_vm_name('appvm')
  607. self.vm = self.app.add_new_vm('StandaloneVM', label='red', name=vmname)
  608. self.loop.run_until_complete(self.vm.create_on_disk())
  609. self.vm.kernel = None
  610. self.vm.virt_mode = 'hvm'
  611. # start the VM using qvm-start tool, to test --cdrom option there
  612. p = self.loop.run_until_complete(asyncio.create_subprocess_exec(
  613. 'qvm-start', '--cdrom=' + cdrom_vmname + ':/home/user/boot.iso',
  614. self.vm.name,
  615. stdout=subprocess.PIPE, stderr=subprocess.STDOUT))
  616. (stdout, _) = self.loop.run_until_complete(p.communicate())
  617. self.assertEqual(p.returncode, 0, stdout)
  618. # check if VM do not crash instantly
  619. self.loop.run_until_complete(asyncio.sleep(5))
  620. self.assertTrue(self.vm.is_running())
  621. # Type 'poweroff'
  622. subprocess.check_call(['xdotool', 'search', '--name', self.vm.name,
  623. 'type', 'poweroff\r'])
  624. self.loop.run_until_complete(asyncio.sleep(1))
  625. self.assertFalse(self.vm.is_running())
  626. def load_tests(loader, tests, pattern):
  627. for template in qubes.tests.list_templates():
  628. tests.addTests(loader.loadTestsFromTestCase(
  629. type(
  630. 'TC_05_StandaloneVM_' + template,
  631. (TC_05_StandaloneVMMixin, qubes.tests.SystemTestCase),
  632. {'template': template})))
  633. tests.addTests(loader.loadTestsFromTestCase(
  634. type(
  635. 'TC_06_AppVM_' + template,
  636. (TC_06_AppVMMixin, qubes.tests.SystemTestCase),
  637. {'template': template})))
  638. return tests
  639. # vim: ts=4 sw=4 et