basic.py 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545
  1. #!/usr/bin/python
  2. # vim: fileencoding=utf-8
  3. # pylint: disable=invalid-name
  4. #
  5. # The Qubes OS Project, https://www.qubes-os.org/
  6. #
  7. # Copyright (C) 2014-2015
  8. # Marek Marczykowski-Górecki <marmarek@invisiblethingslab.com>
  9. # Copyright (C) 2015 Wojtek Porczyk <woju@invisiblethingslab.com>
  10. #
  11. # This program is free software; you can redistribute it and/or modify
  12. # it under the terms of the GNU General Public License as published by
  13. # the Free Software Foundation; either version 2 of the License, or
  14. # (at your option) any later version.
  15. #
  16. # This program is distributed in the hope that it will be useful,
  17. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  18. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  19. # GNU General Public License for more details.
  20. #
  21. # You should have received a copy of the GNU General Public License along
  22. # with this program; if not, write to the Free Software Foundation, Inc.,
  23. # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  24. #
  25. from distutils import spawn
  26. import os
  27. import subprocess
  28. import tempfile
  29. import time
  30. import unittest
  31. import qubes
  32. import qubes.firewall
  33. import qubes.tests
  34. import qubes.vm.appvm
  35. import qubes.vm.qubesvm
  36. import qubes.vm.standalonevm
  37. import qubes.vm.templatevm
  38. import libvirt # pylint: disable=import-error
  39. class TC_00_Basic(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
  40. def setUp(self):
  41. super(TC_00_Basic, self).setUp()
  42. self.init_default_template()
  43. def test_000_qubes_create(self):
  44. self.assertIsInstance(self.app, qubes.Qubes)
  45. def test_100_qvm_create(self):
  46. vmname = self.make_vm_name('appvm')
  47. vm = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  48. name=vmname, template=self.app.default_template,
  49. label='red')
  50. self.assertIsNotNone(vm)
  51. self.assertEqual(vm.name, vmname)
  52. self.assertEqual(vm.template, self.app.default_template)
  53. vm.create_on_disk()
  54. with self.assertNotRaises(qubes.exc.QubesException):
  55. vm.storage.verify()
  56. class TC_01_Properties(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
  57. # pylint: disable=attribute-defined-outside-init
  58. def setUp(self):
  59. super(TC_01_Properties, self).setUp()
  60. self.init_default_template()
  61. self.vmname = self.make_vm_name('appvm')
  62. self.vm = self.app.add_new_vm(qubes.vm.appvm.AppVM, name=self.vmname,
  63. template=self.app.default_template,
  64. label='red')
  65. self.vm.create_on_disk()
  66. def save_and_reload_db(self):
  67. super(TC_01_Properties, self).save_and_reload_db()
  68. if hasattr(self, 'vm'):
  69. self.vm = self.app.domains[self.vm.qid]
  70. if hasattr(self, 'netvm'):
  71. self.netvm = self.app.domains[self.netvm.qid]
  72. def test_000_rename(self):
  73. newname = self.make_vm_name('newname')
  74. self.assertEqual(self.vm.name, self.vmname)
  75. self.vm.firewall.policy = 'drop'
  76. self.vm.firewall.rules = [
  77. qubes.firewall.Rule(None, action='accept', specialtarget='dns')
  78. ]
  79. self.vm.firewall.save()
  80. self.vm.autostart = True
  81. self.addCleanup(os.system,
  82. 'sudo systemctl -q disable qubes-vm@{}.service || :'.
  83. format(self.vmname))
  84. pre_rename_firewall = self.vm.firewall.rules
  85. with self.assertNotRaises(
  86. (OSError, libvirt.libvirtError, qubes.exc.QubesException)):
  87. self.vm.name = newname
  88. self.assertEqual(self.vm.name, newname)
  89. self.assertEqual(self.vm.dir_path,
  90. os.path.join(
  91. qubes.config.system_path['qubes_base_dir'],
  92. qubes.config.system_path['qubes_appvms_dir'], newname))
  93. self.assertTrue(os.path.exists(
  94. os.path.join(self.vm.dir_path, "apps", newname + "-vm.directory")))
  95. # FIXME: set whitelisted-appmenus.list first
  96. self.assertTrue(os.path.exists(os.path.join(
  97. self.vm.dir_path, "apps", newname + "-firefox.desktop")))
  98. self.assertTrue(os.path.exists(
  99. os.path.join(os.getenv("HOME"), ".local/share/desktop-directories",
  100. newname + "-vm.directory")))
  101. self.assertTrue(os.path.exists(
  102. os.path.join(os.getenv("HOME"), ".local/share/applications",
  103. newname + "-firefox.desktop")))
  104. self.assertFalse(os.path.exists(
  105. os.path.join(os.getenv("HOME"), ".local/share/desktop-directories",
  106. self.vmname + "-vm.directory")))
  107. self.assertFalse(os.path.exists(
  108. os.path.join(os.getenv("HOME"), ".local/share/applications",
  109. self.vmname + "-firefox.desktop")))
  110. self.vm.firewall.load()
  111. self.assertEquals(pre_rename_firewall, self.vm.firewall.rules)
  112. with self.assertNotRaises((qubes.exc.QubesException, OSError)):
  113. self.vm.firewall.save()
  114. self.assertTrue(self.vm.autostart)
  115. self.assertTrue(os.path.exists(
  116. '/etc/systemd/system/multi-user.target.wants/'
  117. 'qubes-vm@{}.service'.format(newname)))
  118. self.assertFalse(os.path.exists(
  119. '/etc/systemd/system/multi-user.target.wants/'
  120. 'qubes-vm@{}.service'.format(self.vmname)))
  121. def test_001_rename_libvirt_undefined(self):
  122. self.vm.libvirt_domain.undefine()
  123. self.vm._libvirt_domain = None # pylint: disable=protected-access
  124. newname = self.make_vm_name('newname')
  125. with self.assertNotRaises(
  126. (OSError, libvirt.libvirtError, qubes.exc.QubesException)):
  127. self.vm.name = newname
  128. def test_030_clone(self):
  129. testvm1 = self.app.add_new_vm(
  130. qubes.vm.appvm.AppVM,
  131. name=self.make_vm_name("vm"),
  132. template=self.app.default_template,
  133. label='red')
  134. testvm1.create_on_disk()
  135. testvm2 = self.app.add_new_vm(testvm1.__class__,
  136. name=self.make_vm_name("clone"),
  137. template=testvm1.template,
  138. label='red')
  139. testvm2.clone_properties(testvm1)
  140. testvm2.clone_disk_files(testvm1)
  141. self.assertTrue(testvm1.storage.verify())
  142. self.assertIn('source', testvm1.volumes['root'].config)
  143. self.assertNotEquals(testvm2, None)
  144. self.assertNotEquals(testvm2.volumes, {})
  145. self.assertIn('source', testvm2.volumes['root'].config)
  146. # qubes.xml reload
  147. self.save_and_reload_db()
  148. testvm1 = self.app.domains[testvm1.qid]
  149. testvm2 = self.app.domains[testvm2.qid]
  150. self.assertEquals(testvm1.label, testvm2.label)
  151. self.assertEquals(testvm1.netvm, testvm2.netvm)
  152. self.assertEquals(testvm1.property_is_default('netvm'),
  153. testvm2.property_is_default('netvm'))
  154. self.assertEquals(testvm1.kernel, testvm2.kernel)
  155. self.assertEquals(testvm1.kernelopts, testvm2.kernelopts)
  156. self.assertEquals(testvm1.property_is_default('kernel'),
  157. testvm2.property_is_default('kernel'))
  158. self.assertEquals(testvm1.property_is_default('kernelopts'),
  159. testvm2.property_is_default('kernelopts'))
  160. self.assertEquals(testvm1.memory, testvm2.memory)
  161. self.assertEquals(testvm1.maxmem, testvm2.maxmem)
  162. self.assertEquals(testvm1.devices, testvm2.devices)
  163. self.assertEquals(testvm1.include_in_backups,
  164. testvm2.include_in_backups)
  165. self.assertEquals(testvm1.default_user, testvm2.default_user)
  166. self.assertEquals(testvm1.features, testvm2.features)
  167. self.assertEquals(testvm1.firewall.rules,
  168. testvm2.firewall.rules)
  169. # now some non-default values
  170. testvm1.netvm = None
  171. testvm1.label = 'orange'
  172. testvm1.memory = 512
  173. firewall = testvm1.firewall
  174. firewall.policy = 'drop'
  175. firewall.rules = [
  176. qubes.firewall.Rule(None, action='accept', dsthost='1.2.3.0/24',
  177. proto='tcp', dstports=22)]
  178. firewall.save()
  179. testvm3 = self.app.add_new_vm(testvm1.__class__,
  180. name=self.make_vm_name("clone2"),
  181. template=testvm1.template,
  182. label='red',)
  183. testvm3.clone_properties(testvm1)
  184. testvm3.clone_disk_files(testvm1)
  185. # qubes.xml reload
  186. self.save_and_reload_db()
  187. testvm1 = self.app.domains[testvm1.qid]
  188. testvm3 = self.app.domains[testvm3.qid]
  189. self.assertEquals(testvm1.label, testvm3.label)
  190. self.assertEquals(testvm1.netvm, testvm3.netvm)
  191. self.assertEquals(testvm1.property_is_default('netvm'),
  192. testvm3.property_is_default('netvm'))
  193. self.assertEquals(testvm1.kernel, testvm3.kernel)
  194. self.assertEquals(testvm1.kernelopts, testvm3.kernelopts)
  195. self.assertEquals(testvm1.property_is_default('kernel'),
  196. testvm3.property_is_default('kernel'))
  197. self.assertEquals(testvm1.property_is_default('kernelopts'),
  198. testvm3.property_is_default('kernelopts'))
  199. self.assertEquals(testvm1.memory, testvm3.memory)
  200. self.assertEquals(testvm1.maxmem, testvm3.maxmem)
  201. self.assertEquals(testvm1.devices, testvm3.devices)
  202. self.assertEquals(testvm1.include_in_backups,
  203. testvm3.include_in_backups)
  204. self.assertEquals(testvm1.default_user, testvm3.default_user)
  205. self.assertEquals(testvm1.features, testvm3.features)
  206. self.assertEquals(testvm1.firewall.rules,
  207. testvm2.firewall.rules)
  208. def test_020_name_conflict_app(self):
  209. # TODO decide what exception should be here
  210. with self.assertRaises((qubes.exc.QubesException, ValueError)):
  211. self.vm2 = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  212. name=self.vmname, template=self.app.default_template,
  213. label='red')
  214. self.vm2.create_on_disk()
  215. def test_021_name_conflict_template(self):
  216. # TODO decide what exception should be here
  217. with self.assertRaises((qubes.exc.QubesException, ValueError)):
  218. self.vm2 = self.app.add_new_vm(qubes.vm.templatevm.TemplateVM,
  219. name=self.vmname, label='red')
  220. self.vm2.create_on_disk()
  221. def test_030_rename_conflict_app(self):
  222. vm2name = self.make_vm_name('newname')
  223. self.vm2 = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  224. name=vm2name, template=self.app.default_template, label='red')
  225. self.vm2.create_on_disk()
  226. with self.assertNotRaises(OSError):
  227. with self.assertRaises(qubes.exc.QubesException):
  228. self.vm2.name = self.vmname
  229. class TC_02_QvmPrefs(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
  230. # pylint: disable=attribute-defined-outside-init
  231. def setUp(self):
  232. super(TC_02_QvmPrefs, self).setUp()
  233. self.init_default_template()
  234. self.sharedopts = ['--qubesxml', qubes.tests.XMLPATH]
  235. def setup_appvm(self):
  236. self.testvm = self.app.add_new_vm(
  237. qubes.vm.appvm.AppVM,
  238. name=self.make_vm_name("vm"),
  239. label='red')
  240. self.testvm.create_on_disk()
  241. self.save_and_reload_db()
  242. def setup_hvm(self):
  243. self.testvm = self.app.add_new_vm(
  244. qubes.vm.appvm.AppVM,
  245. name=self.make_vm_name("hvm"),
  246. label='red')
  247. self.testvm.hvm = True
  248. self.testvm.create_on_disk()
  249. self.save_and_reload_db()
  250. def pref_set(self, name, value, valid=True):
  251. p = subprocess.Popen(
  252. ['qvm-prefs'] + self.sharedopts +
  253. (['--'] if value != '-D' else []) + [self.testvm.name, name, value],
  254. stdout=subprocess.PIPE,
  255. stderr=subprocess.PIPE,
  256. )
  257. (stdout, stderr) = p.communicate()
  258. if valid:
  259. self.assertEquals(p.returncode, 0,
  260. "qvm-prefs .. '{}' '{}' failed: {}{}".format(
  261. name, value, stdout, stderr
  262. ))
  263. else:
  264. self.assertNotEquals(p.returncode, 0,
  265. "qvm-prefs should reject value '{}' for "
  266. "property '{}'".format(value, name))
  267. def pref_get(self, name):
  268. p = subprocess.Popen(['qvm-prefs'] + self.sharedopts +
  269. ['--', self.testvm.name, name], stdout=subprocess.PIPE)
  270. (stdout, _) = p.communicate()
  271. self.assertEquals(p.returncode, 0)
  272. return stdout.strip()
  273. bool_test_values = [
  274. ('true', 'True', True),
  275. ('False', 'False', True),
  276. ('0', 'False', True),
  277. ('1', 'True', True),
  278. ('invalid', '', False)
  279. ]
  280. def execute_tests(self, name, values):
  281. """
  282. Helper function, which executes tests for given property.
  283. :param values: list of tuples (value, expected, valid),
  284. where 'value' is what should be set and 'expected' is what should
  285. qvm-prefs returns as a property value and 'valid' marks valid and
  286. invalid values - if it's False, qvm-prefs should reject the value
  287. :return: None
  288. """
  289. for (value, expected, valid) in values:
  290. self.pref_set(name, value, valid)
  291. if valid:
  292. self.assertEquals(self.pref_get(name), expected)
  293. @unittest.skip('test not converted to core3 API')
  294. def test_006_template(self):
  295. templates = [tpl for tpl in self.app.domains.values() if
  296. isinstance(tpl, qubes.vm.templatevm.TemplateVM)]
  297. if not templates:
  298. self.skipTest("No templates installed")
  299. some_template = templates[0].name
  300. self.setup_appvm()
  301. self.execute_tests('template', [
  302. (some_template, some_template, True),
  303. ('invalid', '', False),
  304. ])
  305. @unittest.skip('test not converted to core3 API')
  306. def test_014_pcidevs(self):
  307. self.setup_appvm()
  308. self.execute_tests('pcidevs', [
  309. ('[]', '[]', True),
  310. ('[ "00:00.0" ]', "['00:00.0']", True),
  311. ('invalid', '', False),
  312. ('[invalid]', '', False),
  313. # TODO:
  314. # ('["12:12.0"]', '', False)
  315. ])
  316. @unittest.skip('test not converted to core3 API')
  317. def test_024_pv_reject_hvm_props(self):
  318. self.setup_appvm()
  319. self.execute_tests('guiagent_installed', [('False', '', False)])
  320. self.execute_tests('qrexec_installed', [('False', '', False)])
  321. self.execute_tests('drive', [('/tmp/drive.img', '', False)])
  322. self.execute_tests('timezone', [('localtime', '', False)])
  323. @unittest.skip('test not converted to core3 API')
  324. def test_025_hvm_reject_pv_props(self):
  325. self.setup_hvm()
  326. self.execute_tests('kernel', [('default', '', False)])
  327. self.execute_tests('kernelopts', [('default', '', False)])
  328. class TC_03_QvmRevertTemplateChanges(qubes.tests.SystemTestsMixin,
  329. qubes.tests.QubesTestCase):
  330. # pylint: disable=attribute-defined-outside-init
  331. def setUp(self):
  332. super(TC_03_QvmRevertTemplateChanges, self).setUp()
  333. self.init_default_template()
  334. def setup_pv_template(self):
  335. self.test_template = self.app.add_new_vm(
  336. qubes.vm.templatevm.TemplateVM,
  337. name=self.make_vm_name("pv-clone"),
  338. label='red'
  339. )
  340. self.test_template.clone_properties(self.app.default_template)
  341. self.test_template.clone_disk_files(self.app.default_template)
  342. self.save_and_reload_db()
  343. def setup_hvm_template(self):
  344. self.test_template = self.app.add_new_vm(
  345. qubes.vm.templatevm.TemplateVM,
  346. name=self.make_vm_name("hvm"),
  347. label='red',
  348. hvm=True
  349. )
  350. self.test_template.create_on_disk()
  351. self.save_and_reload_db()
  352. def get_rootimg_checksum(self):
  353. p = subprocess.Popen(
  354. ['sha1sum', self.test_template.volumes['root'].path],
  355. stdout=subprocess.PIPE)
  356. return p.communicate()[0]
  357. def _do_test(self):
  358. checksum_before = self.get_rootimg_checksum()
  359. self.test_template.start()
  360. self.shutdown_and_wait(self.test_template)
  361. checksum_changed = self.get_rootimg_checksum()
  362. if checksum_before == checksum_changed:
  363. self.log.warning("template not modified, test result will be "
  364. "unreliable")
  365. self.assertNotEqual(self.test_template.volumes['root'].revisions, {})
  366. with self.assertNotRaises(subprocess.CalledProcessError):
  367. pool_vid = repr(self.test_template.volumes['root']).strip("'")
  368. revert_cmd = ['qvm-block', 'revert', pool_vid]
  369. subprocess.check_call(revert_cmd)
  370. checksum_after = self.get_rootimg_checksum()
  371. self.assertEquals(checksum_before, checksum_after)
  372. @unittest.expectedFailure
  373. def test_000_revert_pv(self):
  374. """
  375. Test qvm-revert-template-changes for PV template
  376. """
  377. self.setup_pv_template()
  378. self._do_test()
  379. @unittest.skip('HVM not yet implemented')
  380. def test_000_revert_hvm(self):
  381. """
  382. Test qvm-revert-template-changes for HVM template
  383. """
  384. # TODO: have some system there, so the root.img will get modified
  385. self.setup_hvm_template()
  386. self._do_test()
  387. class TC_30_Gui_daemon(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
  388. def setUp(self):
  389. super(TC_30_Gui_daemon, self).setUp()
  390. self.init_default_template()
  391. @unittest.skipUnless(
  392. spawn.find_executable('xdotool'),
  393. "xdotool not installed")
  394. def test_000_clipboard(self):
  395. testvm1 = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  396. name=self.make_vm_name('vm1'), label='red')
  397. testvm1.create_on_disk()
  398. testvm2 = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  399. name=self.make_vm_name('vm2'), label='red')
  400. testvm2.create_on_disk()
  401. self.app.save()
  402. testvm1.start()
  403. testvm2.start()
  404. window_title = 'user@{}'.format(testvm1.name)
  405. testvm1.run('zenity --text-info --editable --title={}'.format(
  406. window_title))
  407. self.wait_for_window(window_title)
  408. time.sleep(0.5)
  409. test_string = "test{}".format(testvm1.xid)
  410. # Type and copy some text
  411. subprocess.check_call(['xdotool', 'search', '--name', window_title,
  412. 'windowactivate', '--sync',
  413. 'type', test_string])
  414. # second xdotool call because type --terminator do not work (SEGV)
  415. # additionally do not use search here, so window stack will be empty
  416. # and xdotool will use XTEST instead of generating events manually -
  417. # this will be much better - at least because events will have
  418. # correct timestamp (so gui-daemon would not drop the copy request)
  419. subprocess.check_call(['xdotool',
  420. 'key', 'ctrl+a', 'ctrl+c', 'ctrl+shift+c',
  421. 'Escape'])
  422. clipboard_content = \
  423. open('/var/run/qubes/qubes-clipboard.bin', 'r').read().strip()
  424. self.assertEquals(clipboard_content, test_string,
  425. "Clipboard copy operation failed - content")
  426. clipboard_source = \
  427. open('/var/run/qubes/qubes-clipboard.bin.source',
  428. 'r').read().strip()
  429. self.assertEquals(clipboard_source, testvm1.name,
  430. "Clipboard copy operation failed - owner")
  431. # Then paste it to the other window
  432. window_title = 'user@{}'.format(testvm2.name)
  433. p = testvm2.run('zenity --entry --title={} > test.txt'.format(
  434. window_title), passio_popen=True)
  435. self.wait_for_window(window_title)
  436. subprocess.check_call(['xdotool', 'key', '--delay', '100',
  437. 'ctrl+shift+v', 'ctrl+v', 'Return'])
  438. p.wait()
  439. # And compare the result
  440. (test_output, _) = testvm2.run('cat test.txt',
  441. passio_popen=True).communicate()
  442. self.assertEquals(test_string, test_output.strip())
  443. clipboard_content = \
  444. open('/var/run/qubes/qubes-clipboard.bin', 'r').read().strip()
  445. self.assertEquals(clipboard_content, "",
  446. "Clipboard not wiped after paste - content")
  447. clipboard_source = \
  448. open('/var/run/qubes/qubes-clipboard.bin.source', 'r').\
  449. read().strip()
  450. self.assertEquals(clipboard_source, "",
  451. "Clipboard not wiped after paste - owner")
  452. class TC_05_StandaloneVM(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
  453. def setUp(self):
  454. super(TC_05_StandaloneVM, self).setUp()
  455. self.init_default_template()
  456. def test_000_create_start(self):
  457. testvm1 = self.app.add_new_vm(qubes.vm.standalonevm.StandaloneVM,
  458. name=self.make_vm_name('vm1'), label='red')
  459. testvm1.clone_disk_files(self.app.default_template)
  460. self.app.save()
  461. testvm1.start()
  462. self.assertEquals(testvm1.get_power_state(), "Running")
  463. def test_100_resize_root_img(self):
  464. testvm1 = self.app.add_new_vm(qubes.vm.standalonevm.StandaloneVM,
  465. name=self.make_vm_name('vm1'), label='red')
  466. testvm1.clone_disk_files(self.app.default_template)
  467. self.app.save()
  468. testvm1.storage.resize(testvm1.volumes['root'], 20 * 1024 ** 3)
  469. self.assertEquals(testvm1.volumes['root'].size, 20 * 1024 ** 3)
  470. testvm1.start()
  471. p = testvm1.run('df --output=size /|tail -n 1',
  472. passio_popen=True)
  473. # new_size in 1k-blocks
  474. (new_size, _) = p.communicate()
  475. # some safety margin for FS metadata
  476. self.assertGreater(int(new_size.strip()), 19 * 1024 ** 2)
  477. # vim: ts=4 sw=4 et