basic.py 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518
  1. # pylint: disable=invalid-name
  2. #
  3. # The Qubes OS Project, https://www.qubes-os.org/
  4. #
  5. # Copyright (C) 2014-2015
  6. # Marek Marczykowski-Górecki <marmarek@invisiblethingslab.com>
  7. # Copyright (C) 2015 Wojtek Porczyk <woju@invisiblethingslab.com>
  8. #
  9. # This program is free software; you can redistribute it and/or modify
  10. # it under the terms of the GNU General Public License as published by
  11. # the Free Software Foundation; either version 2 of the License, or
  12. # (at your option) any later version.
  13. #
  14. # This program is distributed in the hope that it will be useful,
  15. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  16. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  17. # GNU General Public License for more details.
  18. #
  19. # You should have received a copy of the GNU General Public License along
  20. # with this program; if not, write to the Free Software Foundation, Inc.,
  21. # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  22. #
  23. from distutils import spawn
  24. import asyncio
  25. import os
  26. import subprocess
  27. import tempfile
  28. import time
  29. import unittest
  30. import qubes
  31. import qubes.firewall
  32. import qubes.tests
  33. import qubes.vm.appvm
  34. import qubes.vm.qubesvm
  35. import qubes.vm.standalonevm
  36. import qubes.vm.templatevm
  37. import libvirt # pylint: disable=import-error
  38. class TC_00_Basic(qubes.tests.SystemTestCase):
  39. def setUp(self):
  40. super(TC_00_Basic, self).setUp()
  41. self.init_default_template()
  42. def test_000_qubes_create(self):
  43. self.assertIsInstance(self.app, qubes.Qubes)
  44. def test_100_qvm_create(self):
  45. vmname = self.make_vm_name('appvm')
  46. vm = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  47. name=vmname, template=self.app.default_template,
  48. label='red')
  49. self.assertIsNotNone(vm)
  50. self.assertEqual(vm.name, vmname)
  51. self.assertEqual(vm.template, self.app.default_template)
  52. self.loop.run_until_complete(vm.create_on_disk())
  53. with self.assertNotRaises(qubes.exc.QubesException):
  54. self.loop.run_until_complete(vm.storage.verify())
  55. def test_040_qdb_watch(self):
  56. flag = set()
  57. def handler(vm, event, path):
  58. if path == '/test-watch-path':
  59. flag.add(True)
  60. vm = self.app.domains[0]
  61. vm.watch_qdb_path('/test-watch-path')
  62. vm.add_handler('domain-qdb-change:/test-watch-path', handler)
  63. self.assertFalse(flag)
  64. vm.untrusted_qdb.write('/test-watch-path', 'test-value')
  65. self.loop.run_until_complete(asyncio.sleep(0.1))
  66. self.assertTrue(flag)
  67. class TC_01_Properties(qubes.tests.SystemTestCase):
  68. # pylint: disable=attribute-defined-outside-init
  69. def setUp(self):
  70. super(TC_01_Properties, self).setUp()
  71. self.init_default_template()
  72. self.vmname = self.make_vm_name('appvm')
  73. self.vm = self.app.add_new_vm(qubes.vm.appvm.AppVM, name=self.vmname,
  74. template=self.app.default_template,
  75. label='red')
  76. self.loop.run_until_complete(self.vm.create_on_disk())
  77. self.addCleanup(self.cleanup_props)
  78. def cleanup_props(self):
  79. del self.vm
  80. def test_030_clone(self):
  81. try:
  82. testvm1 = self.app.add_new_vm(
  83. qubes.vm.appvm.AppVM,
  84. name=self.make_vm_name("vm"),
  85. template=self.app.default_template,
  86. label='red')
  87. self.loop.run_until_complete(testvm1.create_on_disk())
  88. testvm2 = self.app.add_new_vm(testvm1.__class__,
  89. name=self.make_vm_name("clone"),
  90. template=testvm1.template,
  91. label='red')
  92. testvm2.clone_properties(testvm1)
  93. testvm2.firewall.clone(testvm1.firewall)
  94. self.loop.run_until_complete(testvm2.clone_disk_files(testvm1))
  95. self.assertTrue(self.loop.run_until_complete(testvm1.storage.verify()))
  96. self.assertIn('source', testvm1.volumes['root'].config)
  97. self.assertNotEquals(testvm2, None)
  98. self.assertNotEquals(testvm2.volumes, {})
  99. self.assertIn('source', testvm2.volumes['root'].config)
  100. # qubes.xml reload
  101. self.app.save()
  102. testvm1 = self.app.domains[testvm1.qid]
  103. testvm2 = self.app.domains[testvm2.qid]
  104. self.assertEqual(testvm1.label, testvm2.label)
  105. self.assertEqual(testvm1.netvm, testvm2.netvm)
  106. self.assertEqual(testvm1.property_is_default('netvm'),
  107. testvm2.property_is_default('netvm'))
  108. self.assertEqual(testvm1.kernel, testvm2.kernel)
  109. self.assertEqual(testvm1.kernelopts, testvm2.kernelopts)
  110. self.assertEqual(testvm1.property_is_default('kernel'),
  111. testvm2.property_is_default('kernel'))
  112. self.assertEqual(testvm1.property_is_default('kernelopts'),
  113. testvm2.property_is_default('kernelopts'))
  114. self.assertEqual(testvm1.memory, testvm2.memory)
  115. self.assertEqual(testvm1.maxmem, testvm2.maxmem)
  116. self.assertEqual(testvm1.devices, testvm2.devices)
  117. self.assertEqual(testvm1.include_in_backups,
  118. testvm2.include_in_backups)
  119. self.assertEqual(testvm1.default_user, testvm2.default_user)
  120. self.assertEqual(testvm1.features, testvm2.features)
  121. self.assertEqual(testvm1.firewall.rules,
  122. testvm2.firewall.rules)
  123. # now some non-default values
  124. testvm1.netvm = None
  125. testvm1.label = 'orange'
  126. testvm1.memory = 512
  127. firewall = testvm1.firewall
  128. firewall.rules = [
  129. qubes.firewall.Rule(None, action='accept', dsthost='1.2.3.0/24',
  130. proto='tcp', dstports=22)]
  131. firewall.save()
  132. testvm3 = self.app.add_new_vm(testvm1.__class__,
  133. name=self.make_vm_name("clone2"),
  134. template=testvm1.template,
  135. label='red',)
  136. testvm3.clone_properties(testvm1)
  137. testvm3.firewall.clone(testvm1.firewall)
  138. self.loop.run_until_complete(testvm3.clone_disk_files(testvm1))
  139. # qubes.xml reload
  140. self.app.save()
  141. testvm1 = self.app.domains[testvm1.qid]
  142. testvm3 = self.app.domains[testvm3.qid]
  143. self.assertEqual(testvm1.label, testvm3.label)
  144. self.assertEqual(testvm1.netvm, testvm3.netvm)
  145. self.assertEqual(testvm1.property_is_default('netvm'),
  146. testvm3.property_is_default('netvm'))
  147. self.assertEqual(testvm1.kernel, testvm3.kernel)
  148. self.assertEqual(testvm1.kernelopts, testvm3.kernelopts)
  149. self.assertEqual(testvm1.property_is_default('kernel'),
  150. testvm3.property_is_default('kernel'))
  151. self.assertEqual(testvm1.property_is_default('kernelopts'),
  152. testvm3.property_is_default('kernelopts'))
  153. self.assertEqual(testvm1.memory, testvm3.memory)
  154. self.assertEqual(testvm1.maxmem, testvm3.maxmem)
  155. self.assertEqual(testvm1.devices, testvm3.devices)
  156. self.assertEqual(testvm1.include_in_backups,
  157. testvm3.include_in_backups)
  158. self.assertEqual(testvm1.default_user, testvm3.default_user)
  159. self.assertEqual(testvm1.features, testvm3.features)
  160. self.assertEqual(testvm1.firewall.rules,
  161. testvm3.firewall.rules)
  162. finally:
  163. try:
  164. del firewall
  165. except NameError:
  166. pass
  167. try:
  168. del testvm1
  169. except NameError:
  170. pass
  171. try:
  172. del testvm2
  173. except NameError:
  174. pass
  175. try:
  176. del testvm3
  177. except NameError:
  178. pass
  179. def test_020_name_conflict_app(self):
  180. # TODO decide what exception should be here
  181. with self.assertRaises((qubes.exc.QubesException, ValueError)):
  182. self.vm2 = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  183. name=self.vmname, template=self.app.default_template,
  184. label='red')
  185. self.loop.run_until_complete(self.vm2.create_on_disk())
  186. def test_021_name_conflict_template(self):
  187. # TODO decide what exception should be here
  188. with self.assertRaises((qubes.exc.QubesException, ValueError)):
  189. self.vm2 = self.app.add_new_vm(qubes.vm.templatevm.TemplateVM,
  190. name=self.vmname, label='red')
  191. self.loop.run_until_complete(self.vm2.create_on_disk())
  192. class TC_02_QvmPrefs(qubes.tests.SystemTestCase):
  193. # pylint: disable=attribute-defined-outside-init
  194. def setUp(self):
  195. super(TC_02_QvmPrefs, self).setUp()
  196. self.init_default_template()
  197. self.sharedopts = ['--qubesxml', qubes.tests.XMLPATH]
  198. def setup_appvm(self):
  199. self.testvm = self.app.add_new_vm(
  200. qubes.vm.appvm.AppVM,
  201. name=self.make_vm_name("vm"),
  202. label='red')
  203. self.loop.run_until_complete(self.testvm.create_on_disk())
  204. self.app.save()
  205. def setup_hvm(self):
  206. self.testvm = self.app.add_new_vm(
  207. qubes.vm.appvm.AppVM,
  208. name=self.make_vm_name("hvm"),
  209. label='red')
  210. self.testvm.virt_mode = 'hvm'
  211. self.loop.run_until_complete(self.testvm.create_on_disk())
  212. self.app.save()
  213. def pref_set(self, name, value, valid=True):
  214. self.loop.run_until_complete(self._pref_set(name, value, valid))
  215. @asyncio.coroutine
  216. def _pref_set(self, name, value, valid=True):
  217. cmd = ['qvm-prefs']
  218. if value != '-D':
  219. cmd.append('--')
  220. cmd.extend((self.testvm.name, name, value))
  221. p = yield from asyncio.create_subprocess_exec(*cmd,
  222. stdout=subprocess.PIPE,
  223. stderr=subprocess.PIPE)
  224. (stdout, stderr) = yield from p.communicate()
  225. if valid:
  226. self.assertEqual(p.returncode, 0,
  227. "qvm-prefs .. '{}' '{}' failed: {}{}".format(
  228. name, value, stdout, stderr
  229. ))
  230. else:
  231. self.assertNotEquals(p.returncode, 0,
  232. "qvm-prefs should reject value '{}' for "
  233. "property '{}'".format(value, name))
  234. def pref_get(self, name):
  235. self.loop.run_until_complete(self._pref_get(name))
  236. @asyncio.coroutine
  237. def _pref_get(self, name):
  238. p = yield from asyncio.create_subprocess_exec(
  239. 'qvm-prefs', *self.sharedopts, '--', self.testvm.name, name,
  240. stdout=subprocess.PIPE)
  241. (stdout, _) = yield from p.communicate()
  242. self.assertEqual(p.returncode, 0)
  243. return stdout.strip()
  244. bool_test_values = [
  245. ('true', 'True', True),
  246. ('False', 'False', True),
  247. ('0', 'False', True),
  248. ('1', 'True', True),
  249. ('invalid', '', False)
  250. ]
  251. def execute_tests(self, name, values):
  252. """
  253. Helper function, which executes tests for given property.
  254. :param values: list of tuples (value, expected, valid),
  255. where 'value' is what should be set and 'expected' is what should
  256. qvm-prefs returns as a property value and 'valid' marks valid and
  257. invalid values - if it's False, qvm-prefs should reject the value
  258. :return: None
  259. """
  260. for (value, expected, valid) in values:
  261. self.pref_set(name, value, valid)
  262. if valid:
  263. self.assertEqual(self.pref_get(name), expected)
  264. @unittest.skip('test not converted to core3 API')
  265. def test_006_template(self):
  266. templates = [tpl for tpl in self.app.domains.values() if
  267. isinstance(tpl, qubes.vm.templatevm.TemplateVM)]
  268. if not templates:
  269. self.skipTest("No templates installed")
  270. some_template = templates[0].name
  271. self.setup_appvm()
  272. self.execute_tests('template', [
  273. (some_template, some_template, True),
  274. ('invalid', '', False),
  275. ])
  276. @unittest.skip('test not converted to core3 API')
  277. def test_014_pcidevs(self):
  278. self.setup_appvm()
  279. self.execute_tests('pcidevs', [
  280. ('[]', '[]', True),
  281. ('[ "00:00.0" ]', "['00:00.0']", True),
  282. ('invalid', '', False),
  283. ('[invalid]', '', False),
  284. # TODO:
  285. # ('["12:12.0"]', '', False)
  286. ])
  287. @unittest.skip('test not converted to core3 API')
  288. def test_024_pv_reject_hvm_props(self):
  289. self.setup_appvm()
  290. self.execute_tests('guiagent_installed', [('False', '', False)])
  291. self.execute_tests('qrexec_installed', [('False', '', False)])
  292. self.execute_tests('drive', [('/tmp/drive.img', '', False)])
  293. self.execute_tests('timezone', [('localtime', '', False)])
  294. @unittest.skip('test not converted to core3 API')
  295. def test_025_hvm_reject_pv_props(self):
  296. self.setup_hvm()
  297. self.execute_tests('kernel', [('default', '', False)])
  298. self.execute_tests('kernelopts', [('default', '', False)])
  299. class TC_03_QvmRevertTemplateChanges(qubes.tests.SystemTestCase):
  300. # pylint: disable=attribute-defined-outside-init
  301. def setUp(self):
  302. super(TC_03_QvmRevertTemplateChanges, self).setUp()
  303. self.init_default_template()
  304. def cleanup_template(self):
  305. del self.test_template
  306. def setup_template(self):
  307. self.test_template = self.app.add_new_vm(
  308. qubes.vm.templatevm.TemplateVM,
  309. name=self.make_vm_name("pv-clone"),
  310. label='red'
  311. )
  312. self.addCleanup(self.cleanup_template)
  313. self.test_template.clone_properties(self.app.default_template)
  314. self.test_template.features.update(self.app.default_template.features)
  315. self.test_template.tags.update(self.app.default_template.tags)
  316. self.loop.run_until_complete(
  317. self.test_template.clone_disk_files(self.app.default_template))
  318. self.test_template.volumes['root'].revisions_to_keep = 3
  319. self.app.save()
  320. def get_rootimg_checksum(self):
  321. return subprocess.check_output(
  322. ['sha1sum', self.test_template.volumes['root'].path])
  323. def _do_test(self):
  324. checksum_before = self.get_rootimg_checksum()
  325. self.loop.run_until_complete(self.test_template.start())
  326. self.shutdown_and_wait(self.test_template)
  327. checksum_changed = self.get_rootimg_checksum()
  328. if checksum_before == checksum_changed:
  329. self.log.warning("template not modified, test result will be "
  330. "unreliable")
  331. self.assertNotEqual(self.test_template.volumes['root'].revisions, {})
  332. revert_cmd = ['qvm-volume', 'revert', self.test_template.name + ':root']
  333. p = self.loop.run_until_complete(asyncio.create_subprocess_exec(
  334. *revert_cmd))
  335. self.loop.run_until_complete(p.wait())
  336. self.assertEqual(p.returncode, 0)
  337. del p
  338. checksum_after = self.get_rootimg_checksum()
  339. self.assertEqual(checksum_before, checksum_after)
  340. def test_000_revert_linux(self):
  341. """
  342. Test qvm-revert-template-changes for PV template
  343. """
  344. self.setup_template()
  345. self._do_test()
  346. @unittest.skip('TODO: some non-linux system')
  347. def test_001_revert_non_linux(self):
  348. """
  349. Test qvm-revert-template-changes for HVM template
  350. """
  351. # TODO: have some system there, so the root.img will get modified
  352. self.setup_template()
  353. self._do_test()
  354. class TC_30_Gui_daemon(qubes.tests.SystemTestCase):
  355. def setUp(self):
  356. super(TC_30_Gui_daemon, self).setUp()
  357. self.init_default_template()
  358. @unittest.skipUnless(
  359. spawn.find_executable('xdotool'),
  360. "xdotool not installed")
  361. def test_000_clipboard(self):
  362. testvm1 = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  363. name=self.make_vm_name('vm1'), label='red')
  364. self.loop.run_until_complete(testvm1.create_on_disk())
  365. testvm2 = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  366. name=self.make_vm_name('vm2'), label='red')
  367. self.loop.run_until_complete(testvm2.create_on_disk())
  368. self.app.save()
  369. self.loop.run_until_complete(asyncio.wait([
  370. testvm1.start(),
  371. testvm2.start()]))
  372. self.loop.run_until_complete(asyncio.wait([
  373. self.wait_for_session(testvm1),
  374. self.wait_for_session(testvm2)]))
  375. window_title = 'user@{}'.format(testvm1.name)
  376. self.loop.run_until_complete(testvm1.run(
  377. 'zenity --text-info --editable --title={}'.format(window_title)))
  378. self.wait_for_window(window_title)
  379. time.sleep(0.5)
  380. test_string = "test{}".format(testvm1.xid)
  381. # Type and copy some text
  382. subprocess.check_call(['xdotool', 'search', '--name', window_title,
  383. 'windowactivate', '--sync',
  384. 'type', test_string])
  385. # second xdotool call because type --terminator do not work (SEGV)
  386. # additionally do not use search here, so window stack will be empty
  387. # and xdotool will use XTEST instead of generating events manually -
  388. # this will be much better - at least because events will have
  389. # correct timestamp (so gui-daemon would not drop the copy request)
  390. subprocess.check_call(['xdotool',
  391. 'key', 'ctrl+a', 'ctrl+c', 'ctrl+shift+c',
  392. 'Escape'])
  393. clipboard_content = \
  394. open('/var/run/qubes/qubes-clipboard.bin', 'r').read().strip()
  395. self.assertEqual(clipboard_content, test_string,
  396. "Clipboard copy operation failed - content")
  397. clipboard_source = \
  398. open('/var/run/qubes/qubes-clipboard.bin.source',
  399. 'r').read().strip()
  400. self.assertEqual(clipboard_source, testvm1.name,
  401. "Clipboard copy operation failed - owner")
  402. # Then paste it to the other window
  403. window_title = 'user@{}'.format(testvm2.name)
  404. p = self.loop.run_until_complete(testvm2.run(
  405. 'zenity --entry --title={} > /tmp/test.txt'.format(window_title)))
  406. self.wait_for_window(window_title)
  407. subprocess.check_call(['xdotool', 'key', '--delay', '100',
  408. 'ctrl+shift+v', 'ctrl+v', 'Return'])
  409. self.loop.run_until_complete(p.wait())
  410. # And compare the result
  411. (test_output, _) = self.loop.run_until_complete(
  412. testvm2.run_for_stdio('cat /tmp/test.txt'))
  413. self.assertEqual(test_string, test_output.strip().decode('ascii'))
  414. clipboard_content = \
  415. open('/var/run/qubes/qubes-clipboard.bin', 'r').read().strip()
  416. self.assertEqual(clipboard_content, "",
  417. "Clipboard not wiped after paste - content")
  418. clipboard_source = \
  419. open('/var/run/qubes/qubes-clipboard.bin.source', 'r').\
  420. read().strip()
  421. self.assertEqual(clipboard_source, "",
  422. "Clipboard not wiped after paste - owner")
  423. class TC_05_StandaloneVM(qubes.tests.SystemTestCase):
  424. def setUp(self):
  425. super(TC_05_StandaloneVM, self).setUp()
  426. self.init_default_template()
  427. def test_000_create_start(self):
  428. self.testvm1 = self.app.add_new_vm(qubes.vm.standalonevm.StandaloneVM,
  429. name=self.make_vm_name('vm1'), label='red')
  430. self.testvm1.features['qrexec'] = True
  431. self.loop.run_until_complete(
  432. self.testvm1.clone_disk_files(self.app.default_template))
  433. self.app.save()
  434. self.loop.run_until_complete(self.testvm1.start())
  435. self.assertEqual(self.testvm1.get_power_state(), "Running")
  436. def test_100_resize_root_img(self):
  437. self.testvm1 = self.app.add_new_vm(qubes.vm.standalonevm.StandaloneVM,
  438. name=self.make_vm_name('vm1'), label='red')
  439. self.testvm1.features['qrexec'] = True
  440. self.loop.run_until_complete(
  441. self.testvm1.clone_disk_files(self.app.default_template))
  442. self.app.save()
  443. self.loop.run_until_complete(
  444. self.testvm1.storage.resize(self.testvm1.volumes['root'], 20 * 1024 ** 3))
  445. self.assertEqual(self.testvm1.volumes['root'].size, 20 * 1024 ** 3)
  446. self.loop.run_until_complete(self.testvm1.start())
  447. # new_size in 1k-blocks
  448. (new_size, _) = self.loop.run_until_complete(
  449. self.testvm1.run_for_stdio('df --output=size /|tail -n 1'))
  450. # some safety margin for FS metadata
  451. self.assertGreater(int(new_size.strip()), 19 * 1024 ** 2)
  452. # vim: ts=4 sw=4 et