basic.py 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628
  1. #!/usr/bin/python
  2. # vim: fileencoding=utf-8
  3. #
  4. # The Qubes OS Project, https://www.qubes-os.org/
  5. #
  6. # Copyright (C) 2014-2015
  7. # Marek Marczykowski-Górecki <marmarek@invisiblethingslab.com>
  8. # Copyright (C) 2015 Wojtek Porczyk <woju@invisiblethingslab.com>
  9. #
  10. # This program is free software; you can redistribute it and/or modify
  11. # it under the terms of the GNU General Public License as published by
  12. # the Free Software Foundation; either version 2 of the License, or
  13. # (at your option) any later version.
  14. #
  15. # This program is distributed in the hope that it will be useful,
  16. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  17. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  18. # GNU General Public License for more details.
  19. #
  20. # You should have received a copy of the GNU General Public License along
  21. # with this program; if not, write to the Free Software Foundation, Inc.,
  22. # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  23. #
  24. import os
  25. import subprocess
  26. import tempfile
  27. import time
  28. import unittest
  29. import qubes
  30. import qubes.tests
  31. import qubes.vm.appvm
  32. import qubes.vm.qubesvm
  33. import qubes.vm.templatevm
  34. import libvirt
  35. class TC_00_Basic(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
  36. def setUp(self):
  37. super(TC_00_Basic, self).setUp()
  38. self.init_default_template()
  39. def test_000_qubes_create(self):
  40. self.assertIsInstance(self.app, qubes.Qubes)
  41. def test_001_qvm_create_default_template(self):
  42. self.app.add_new_vm
  43. def test_100_qvm_create(self):
  44. vmname = self.make_vm_name('appvm')
  45. vm = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  46. name=vmname, template=self.app.default_template,
  47. label='red'
  48. )
  49. self.assertIsNotNone(vm)
  50. self.assertEqual(vm.name, vmname)
  51. self.assertEqual(vm.template, self.app.default_template)
  52. vm.create_on_disk()
  53. with self.assertNotRaises(qubes.exc.QubesException):
  54. vm.verify_files()
  55. class TC_01_Properties(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
  56. def setUp(self):
  57. super(TC_01_Properties, self).setUp()
  58. self.init_default_template()
  59. self.vmname = self.make_vm_name('appvm')
  60. self.vm = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  61. name=self.vmname,
  62. label='red')
  63. self.vm.create_on_disk()
  64. def save_and_reload_db(self):
  65. super(TC_01_Properties, self).save_and_reload_db()
  66. if hasattr(self, 'vm'):
  67. self.vm = self.app.domains[self.vm.qid]
  68. if hasattr(self, 'netvm'):
  69. self.netvm = self.app[self.netvm.qid]
  70. def test_000_rename(self):
  71. newname = self.make_vm_name('newname')
  72. self.assertEqual(self.vm.name, self.vmname)
  73. self.vm.write_firewall_conf({'allow': False, 'allowDns': False})
  74. self.vm.autostart = True
  75. self.addCleanup(os.system,
  76. 'sudo systemctl -q disable qubes-vm@{}.service || :'.
  77. format(self.vmname))
  78. pre_rename_firewall = self.vm.get_firewall_conf()
  79. with self.assertNotRaises(
  80. (OSError, libvirt.libvirtError, qubes.exc.QubesException)):
  81. self.vm.name = newname
  82. self.assertEqual(self.vm.name, newname)
  83. self.assertEqual(self.vm.dir_path,
  84. os.path.join(qubes.config.system_path['qubes_appvms_dir'], newname))
  85. self.assertEqual(self.vm.conf_file,
  86. os.path.join(self.vm.dir_path, newname + '.conf'))
  87. self.assertTrue(os.path.exists(
  88. os.path.join(self.vm.dir_path, "apps", newname + "-vm.directory")))
  89. # FIXME: set whitelisted-appmenus.list first
  90. self.assertTrue(os.path.exists(
  91. os.path.join(self.vm.dir_path, "apps", newname + "-firefox.desktop")))
  92. self.assertTrue(os.path.exists(
  93. os.path.join(os.getenv("HOME"), ".local/share/desktop-directories",
  94. newname + "-vm.directory")))
  95. self.assertTrue(os.path.exists(
  96. os.path.join(os.getenv("HOME"), ".local/share/applications",
  97. newname + "-firefox.desktop")))
  98. self.assertFalse(os.path.exists(
  99. os.path.join(os.getenv("HOME"), ".local/share/desktop-directories",
  100. self.vmname + "-vm.directory")))
  101. self.assertFalse(os.path.exists(
  102. os.path.join(os.getenv("HOME"), ".local/share/applications",
  103. self.vmname + "-firefox.desktop")))
  104. self.assertEquals(pre_rename_firewall, self.vm.get_firewall_conf())
  105. with self.assertNotRaises((qubes.exc.QubesException, OSError)):
  106. self.vm.write_firewall_conf({'allow': False})
  107. self.assertTrue(self.vm.autostart)
  108. self.assertTrue(os.path.exists(
  109. '/etc/systemd/system/multi-user.target.wants/'
  110. 'qubes-vm@{}.service'.format(newname)))
  111. self.assertFalse(os.path.exists(
  112. '/etc/systemd/system/multi-user.target.wants/'
  113. 'qubes-vm@{}.service'.format(self.vmname)))
  114. def test_001_rename_libvirt_undefined(self):
  115. self.vm.libvirt_domain.undefine()
  116. self.vm._libvirt_domain = None
  117. newname = self.make_vm_name('newname')
  118. with self.assertNotRaises(
  119. (OSError, libvirt.libvirtError, qubes.exc.QubesException)):
  120. self.vm.name = newname
  121. def test_030_clone(self):
  122. testvm1 = self.app.add_new_vm(
  123. qubes.vm.appvm.AppVM,
  124. name=self.make_vm_name("vm"),
  125. label='red')
  126. testvm1.create_on_disk()
  127. testvm2 = self.app.add_new_vm(testvm1.__class__,
  128. name=self.make_vm_name("clone"),
  129. template=testvm1.template,
  130. label='red',
  131. )
  132. testvm2.clone_properties(testvm1)
  133. testvm2.clone_disk_files(testvm1)
  134. # qubes.xml reload
  135. self.save_and_reload_db()
  136. testvm1 = self.app.domains[testvm1.qid]
  137. testvm2 = self.app.domains[testvm2.qid]
  138. self.assertEquals(testvm1.label, testvm2.label)
  139. self.assertEquals(testvm1.netvm, testvm2.netvm)
  140. self.assertEquals(testvm1.property_is_default('netvm'),
  141. testvm2.property_is_default('netvm'))
  142. self.assertEquals(testvm1.kernel, testvm2.kernel)
  143. self.assertEquals(testvm1.kernelopts, testvm2.kernelopts)
  144. self.assertEquals(testvm1.property_is_default('kernel'),
  145. testvm2.property_is_default('kernel'))
  146. self.assertEquals(testvm1.property_is_default('kernelopts'),
  147. testvm2.property_is_default('kernelopts'))
  148. self.assertEquals(testvm1.memory, testvm2.memory)
  149. self.assertEquals(testvm1.maxmem, testvm2.maxmem)
  150. # TODO
  151. # self.assertEquals(testvm1.pcidevs, testvm2.pcidevs)
  152. self.assertEquals(testvm1.include_in_backups,
  153. testvm2.include_in_backups)
  154. self.assertEquals(testvm1.default_user, testvm2.default_user)
  155. self.assertEquals(testvm1.features, testvm2.features)
  156. self.assertEquals(testvm1.get_firewall_conf(),
  157. testvm2.get_firewall_conf())
  158. # now some non-default values
  159. testvm1.netvm = None
  160. testvm1.label = 'orange'
  161. testvm1.memory = 512
  162. firewall = testvm1.get_firewall_conf()
  163. firewall['allowDns'] = False
  164. firewall['allowYumProxy'] = False
  165. firewall['rules'] = [{'address': '1.2.3.4',
  166. 'netmask': 24,
  167. 'proto': 'tcp',
  168. 'portBegin': 22,
  169. 'portEnd': 22,
  170. }]
  171. testvm1.write_firewall_conf(firewall)
  172. testvm3 = self.app.add_new_vm(testvm1.__class__,
  173. name=self.make_vm_name("clone2"),
  174. template=testvm1.template,
  175. label='red',
  176. )
  177. testvm3.clone_properties(testvm1)
  178. testvm3.clone_disk_files(testvm1)
  179. # qubes.xml reload
  180. self.save_and_reload_db()
  181. testvm1 = self.app.domains[testvm1.qid]
  182. testvm3 = self.app.domains[testvm3.qid]
  183. self.assertEquals(testvm1.label, testvm3.label)
  184. self.assertEquals(testvm1.netvm, testvm3.netvm)
  185. self.assertEquals(testvm1.property_is_default('netvm'),
  186. testvm3.property_is_default('netvm'))
  187. self.assertEquals(testvm1.kernel, testvm3.kernel)
  188. self.assertEquals(testvm1.kernelopts, testvm3.kernelopts)
  189. self.assertEquals(testvm1.property_is_default('kernel'),
  190. testvm3.property_is_default('kernel'))
  191. self.assertEquals(testvm1.property_is_default('kernelopts'),
  192. testvm3.property_is_default('kernelopts'))
  193. self.assertEquals(testvm1.memory, testvm3.memory)
  194. self.assertEquals(testvm1.maxmem, testvm3.maxmem)
  195. # TODO
  196. # self.assertEquals(testvm1.pcidevs, testvm3.pcidevs)
  197. self.assertEquals(testvm1.include_in_backups,
  198. testvm3.include_in_backups)
  199. self.assertEquals(testvm1.default_user, testvm3.default_user)
  200. self.assertEquals(testvm1.features, testvm3.features)
  201. self.assertEquals(testvm1.get_firewall_conf(),
  202. testvm3.get_firewall_conf())
  203. def test_020_name_conflict_app(self):
  204. # TODO decide what exception should be here
  205. with self.assertRaises((qubes.exc.QubesException, ValueError)):
  206. self.vm2 = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  207. name=self.vmname, template=self.app.default_template)
  208. self.vm2.create_on_disk()
  209. def test_021_name_conflict_template(self):
  210. # TODO decide what exception should be here
  211. with self.assertRaises((qubes.exc.QubesException, ValueError)):
  212. self.vm2 = self.app.add_new_vm(qubes.vm.templatevm.TemplateVM,
  213. name=self.vmname)
  214. self.vm2.create_on_disk()
  215. def test_030_rename_conflict_app(self):
  216. vm2name = self.make_vm_name('newname')
  217. self.vm2 = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  218. name=vm2name, template=self.app.default_template, label='red')
  219. self.vm2.create_on_disk()
  220. with self.assertNotRaises(OSError):
  221. with self.assertRaises(qubes.exc.QubesException):
  222. self.vm2.name = self.vmname
  223. class TC_02_QvmPrefs(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
  224. def setUp(self):
  225. super(TC_02_QvmPrefs, self).setUp()
  226. self.init_default_template()
  227. self.sharedopts = ['--qubesxml', qubes.tests.XMLPATH]
  228. def setup_appvm(self):
  229. self.testvm = self.app.add_new_vm(
  230. qubes.vm.appvm.AppVM,
  231. name=self.make_vm_name("vm"),
  232. label='red')
  233. self.testvm.create_on_disk()
  234. self.save_and_reload_db()
  235. def setup_hvm(self):
  236. self.testvm = self.app.add_new_vm(
  237. qubes.vm.appvm.AppVM,
  238. name=self.make_vm_name("hvm"),
  239. label='red')
  240. self.testvm.hvm = True
  241. self.testvm.create_on_disk()
  242. self.save_and_reload_db()
  243. def pref_set(self, name, value, valid=True):
  244. p = subprocess.Popen(
  245. ['qvm-prefs'] + self.sharedopts +
  246. (['--'] if value != '-D' else []) + [self.testvm.name, name, value],
  247. stdout=subprocess.PIPE,
  248. stderr=subprocess.PIPE,
  249. )
  250. (stdout, stderr) = p.communicate()
  251. if valid:
  252. self.assertEquals(p.returncode, 0,
  253. "qvm-prefs .. '{}' '{}' failed: {}{}".format(
  254. name, value, stdout, stderr
  255. ))
  256. else:
  257. self.assertNotEquals(p.returncode, 0,
  258. "qvm-prefs should reject value '{}' for "
  259. "property '{}'".format(value, name))
  260. def pref_get(self, name):
  261. p = subprocess.Popen(['qvm-prefs'] + self.sharedopts +
  262. ['--', self.testvm.name, name], stdout=subprocess.PIPE)
  263. (stdout, _) = p.communicate()
  264. self.assertEquals(p.returncode, 0)
  265. return stdout.strip()
  266. bool_test_values = [
  267. ('true', 'True', True),
  268. ('False', 'False', True),
  269. ('0', 'False', True),
  270. ('1', 'True', True),
  271. ('invalid', '', False)
  272. ]
  273. def execute_tests(self, name, values):
  274. """
  275. Helper function, which executes tests for given property.
  276. :param values: list of tuples (value, expected, valid),
  277. where 'value' is what should be set and 'expected' is what should
  278. qvm-prefs returns as a property value and 'valid' marks valid and
  279. invalid values - if it's False, qvm-prefs should reject the value
  280. :return: None
  281. """
  282. for (value, expected, valid) in values:
  283. self.pref_set(name, value, valid)
  284. if valid:
  285. self.assertEquals(self.pref_get(name), expected)
  286. @unittest.skip('test not converted to core3 API')
  287. def test_006_template(self):
  288. templates = [tpl for tpl in self.app.domains.values() if
  289. tpl.is_template()]
  290. if not templates:
  291. self.skipTest("No templates installed")
  292. some_template = templates[0].name
  293. self.setup_appvm()
  294. self.execute_tests('template', [
  295. (some_template, some_template, True),
  296. ('invalid', '', False),
  297. ])
  298. @unittest.skip('test not converted to core3 API')
  299. def test_014_pcidevs(self):
  300. self.setup_appvm()
  301. self.execute_tests('pcidevs', [
  302. ('[]', '[]', True),
  303. ('[ "00:00.0" ]', "['00:00.0']", True),
  304. ('invalid', '', False),
  305. ('[invalid]', '', False),
  306. # TODO:
  307. #('["12:12.0"]', '', False)
  308. ])
  309. @unittest.skip('test not converted to core3 API')
  310. def test_024_pv_reject_hvm_props(self):
  311. self.setup_appvm()
  312. self.execute_tests('guiagent_installed', [('False', '', False)])
  313. self.execute_tests('qrexec_installed', [('False', '', False)])
  314. self.execute_tests('drive', [('/tmp/drive.img', '', False)])
  315. self.execute_tests('timezone', [('localtime', '', False)])
  316. @unittest.skip('test not converted to core3 API')
  317. def test_025_hvm_reject_pv_props(self):
  318. self.setup_hvm()
  319. self.execute_tests('kernel', [('default', '', False)])
  320. self.execute_tests('kernelopts', [('default', '', False)])
  321. class TC_03_QvmRevertTemplateChanges(qubes.tests.SystemTestsMixin,
  322. qubes.tests.QubesTestCase):
  323. def setUp(self):
  324. super(TC_03_QvmRevertTemplateChanges, self).setUp()
  325. self.init_default_template()
  326. def setup_pv_template(self):
  327. self.test_template = self.app.add_new_vm(
  328. qubes.vm.templatevm.TemplateVM,
  329. name=self.make_vm_name("pv-clone"),
  330. label='red'
  331. )
  332. self.test_template.clone_properties(self.app.default_template)
  333. self.test_template.clone_disk_files(self.app.default_template)
  334. self.save_and_reload_db()
  335. def setup_hvm_template(self):
  336. self.test_template = self.app.add_new_vm(
  337. qubes.vm.templatevm.TemplateVM,
  338. name=self.make_vm_name("hvm"),
  339. label='red',
  340. hvm=True
  341. )
  342. self.test_template.create_on_disk()
  343. self.save_and_reload_db()
  344. def get_rootimg_checksum(self):
  345. p = subprocess.Popen(
  346. ['sha1sum', self.test_template.volumes['root'].vid],
  347. stdout=subprocess.PIPE)
  348. return p.communicate()[0]
  349. def _do_test(self):
  350. checksum_before = self.get_rootimg_checksum()
  351. self.test_template.start()
  352. self.shutdown_and_wait(self.test_template)
  353. checksum_changed = self.get_rootimg_checksum()
  354. if checksum_before == checksum_changed:
  355. self.log.warning("template not modified, test result will be "
  356. "unreliable")
  357. with self.assertNotRaises(subprocess.CalledProcessError):
  358. subprocess.check_call(['sudo', 'qvm-revert-template-changes',
  359. '--force', self.test_template.name])
  360. checksum_after = self.get_rootimg_checksum()
  361. self.assertEquals(checksum_before, checksum_after)
  362. def test_000_revert_pv(self):
  363. """
  364. Test qvm-revert-template-changes for PV template
  365. """
  366. self.setup_pv_template()
  367. self._do_test()
  368. @unittest.skip('HVM not yet implemented')
  369. def test_000_revert_hvm(self):
  370. """
  371. Test qvm-revert-template-changes for HVM template
  372. """
  373. # TODO: have some system there, so the root.img will get modified
  374. self.setup_hvm_template()
  375. self._do_test()
  376. class TC_04_DispVM(qubes.tests.SystemTestsMixin,
  377. qubes.tests.QubesTestCase):
  378. def setUp(self):
  379. self.skipTest('DisposableVMs not implemented in core3 yet')
  380. super(TC_04_DispVM, self).setUp()
  381. self.init_default_template()
  382. disp_tpl = self.host_app.domains[self.get_dispvm_template_name()]
  383. self.disp_tpl = self.app.add_new_vm(disp_tpl.__class__,
  384. name=disp_tpl.name,
  385. template=disp_tpl.template,
  386. label='red'
  387. )
  388. self.app.save()
  389. @staticmethod
  390. def get_dispvm_template_name():
  391. # FIXME: currently qubes.xml doesn't contain this information...
  392. vmdir = os.readlink('/var/lib/qubes/dvmdata/vmdir')
  393. return os.path.basename(vmdir)
  394. def test_000_firewall_propagation(self):
  395. """
  396. Check firewall propagation VM->DispVM, when VM have some firewall rules
  397. """
  398. testvm1 = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  399. name=self.make_vm_name('vm1'),
  400. label='red')
  401. testvm1.create_on_disk()
  402. firewall = testvm1.get_firewall_conf()
  403. firewall['allowDns'] = False
  404. firewall['allowYumProxy'] = False
  405. firewall['rules'] = [{'address': '1.2.3.4',
  406. 'netmask': 24,
  407. 'proto': 'tcp',
  408. 'portBegin': 22,
  409. 'portEnd': 22,
  410. }]
  411. testvm1.write_firewall_conf(firewall)
  412. self.app.save()
  413. testvm1.start()
  414. p = testvm1.run("qvm-run --dispvm 'qubesdb-read /name; echo ERROR;"
  415. " read x'",
  416. passio_popen=True)
  417. dispvm_name = p.stdout.readline().strip()
  418. self.reload_db()
  419. dispvm = self.app.domains[dispvm_name]
  420. self.assertIsNotNone(dispvm, "DispVM {} not found in qubes.xml".format(
  421. dispvm_name))
  422. # check if firewall was propagated to the DispVM
  423. self.assertEquals(testvm1.get_firewall_conf(),
  424. dispvm.get_firewall_conf())
  425. # and only there (#1608)
  426. self.assertNotEquals(self.disp_tpl.get_firewall_conf(),
  427. dispvm.get_firewall_conf())
  428. # then modify some rule
  429. firewall = dispvm.get_firewall_conf()
  430. firewall['rules'] = [{'address': '4.3.2.1',
  431. 'netmask': 24,
  432. 'proto': 'tcp',
  433. 'portBegin': 22,
  434. 'portEnd': 22,
  435. }]
  436. dispvm.write_firewall_conf(firewall)
  437. # and check again if wasn't saved anywhere else (#1608)
  438. self.assertNotEquals(self.disp_tpl.get_firewall_conf(),
  439. dispvm.get_firewall_conf())
  440. self.assertNotEquals(testvm1.get_firewall_conf(),
  441. dispvm.get_firewall_conf())
  442. p.stdin.write('\n')
  443. p.wait()
  444. def test_001_firewall_propagation(self):
  445. """
  446. Check firewall propagation VM->DispVM, when VM have no firewall rules
  447. """
  448. testvm1 = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  449. name=self.make_vm_name('vm1'),
  450. label='red')
  451. testvm1.create_on_disk()
  452. self.app.save()
  453. original_firewall = None
  454. if os.path.exists(self.disp_tpl.firewall_conf):
  455. original_firewall = tempfile.TemporaryFile()
  456. with open(self.disp_tpl.firewall_conf) as f:
  457. original_firewall.write(f.read())
  458. try:
  459. firewall = self.disp_tpl.get_firewall_conf()
  460. firewall['allowDns'] = False
  461. firewall['allowYumProxy'] = False
  462. firewall['rules'] = [{'address': '1.2.3.4',
  463. 'netmask': 24,
  464. 'proto': 'tcp',
  465. 'portBegin': 22,
  466. 'portEnd': 22,
  467. }]
  468. self.disp_tpl.write_firewall_conf(firewall)
  469. testvm1.start()
  470. p = testvm1.run("qvm-run --dispvm 'qubesdb-read /name; echo ERROR;"
  471. " read x'",
  472. passio_popen=True)
  473. dispvm_name = p.stdout.readline().strip()
  474. self.reload_db()
  475. dispvm = self.app.domains[dispvm_name]
  476. self.assertIsNotNone(dispvm, "DispVM {} not found in qubes.xml".format(
  477. dispvm_name))
  478. # check if firewall was propagated to the DispVM from the right VM
  479. self.assertEquals(testvm1.get_firewall_conf(),
  480. dispvm.get_firewall_conf())
  481. # and only there (#1608)
  482. self.assertNotEquals(self.disp_tpl.get_firewall_conf(),
  483. dispvm.get_firewall_conf())
  484. # then modify some rule
  485. firewall = dispvm.get_firewall_conf()
  486. firewall['rules'] = [{'address': '4.3.2.1',
  487. 'netmask': 24,
  488. 'proto': 'tcp',
  489. 'portBegin': 22,
  490. 'portEnd': 22,
  491. }]
  492. dispvm.write_firewall_conf(firewall)
  493. # and check again if wasn't saved anywhere else (#1608)
  494. self.assertNotEquals(self.disp_tpl.get_firewall_conf(),
  495. dispvm.get_firewall_conf())
  496. self.assertNotEquals(testvm1.get_firewall_conf(),
  497. dispvm.get_firewall_conf())
  498. p.stdin.write('\n')
  499. p.wait()
  500. finally:
  501. if original_firewall:
  502. original_firewall.seek(0)
  503. with open(self.disp_tpl.firewall_conf, 'w') as f:
  504. f.write(original_firewall.read())
  505. original_firewall.close()
  506. else:
  507. os.unlink(self.disp_tpl.firewall_conf)
  508. def test_002_cleanup(self):
  509. p = subprocess.Popen(['/usr/lib/qubes/qfile-daemon-dvm',
  510. 'qubes.VMShell', 'dom0', 'DEFAULT'],
  511. stdin=subprocess.PIPE,
  512. stdout=subprocess.PIPE,
  513. stderr=open(os.devnull, 'w'))
  514. (stdout, _) = p.communicate(input="echo test; qubesdb-read /name; "
  515. "echo ERROR\n")
  516. self.assertEquals(p.returncode, 0)
  517. lines = stdout.splitlines()
  518. self.assertEqual(lines[0], "test")
  519. dispvm_name = lines[1]
  520. self.reload_db()
  521. dispvm = self.app.domains[dispvm_name]
  522. self.assertIsNone(dispvm, "DispVM {} still exists in qubes.xml".format(
  523. dispvm_name))
  524. def test_003_cleanup_destroyed(self):
  525. """
  526. Check if DispVM is properly removed even if it terminated itself (#1660)
  527. :return:
  528. """
  529. p = subprocess.Popen(['/usr/lib/qubes/qfile-daemon-dvm',
  530. 'qubes.VMShell', 'dom0', 'DEFAULT'],
  531. stdin=subprocess.PIPE,
  532. stdout=subprocess.PIPE,
  533. stderr=open(os.devnull, 'w'))
  534. p.stdin.write("qubesdb-read /name\n")
  535. p.stdin.write("echo ERROR\n")
  536. p.stdin.write("poweroff\n")
  537. # do not close p.stdin on purpose - wait to automatic disconnect when
  538. # domain is destroyed
  539. timeout = 30
  540. while timeout > 0:
  541. if p.poll():
  542. break
  543. time.sleep(1)
  544. timeout -= 1
  545. # includes check for None - timeout
  546. self.assertEquals(p.returncode, 0)
  547. lines = p.stdout.read().splitlines()
  548. dispvm_name = lines[0]
  549. self.assertNotEquals(dispvm_name, "ERROR")
  550. self.reload_db()
  551. dispvm = self.app.domains[dispvm_name]
  552. self.assertIsNone(dispvm, "DispVM {} still exists in qubes.xml".format(
  553. dispvm_name))
  554. # vim: ts=4 sw=4 et