basic.py 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629
  1. #!/usr/bin/python
  2. # vim: fileencoding=utf-8
  3. #
  4. # The Qubes OS Project, https://www.qubes-os.org/
  5. #
  6. # Copyright (C) 2014-2015
  7. # Marek Marczykowski-Górecki <marmarek@invisiblethingslab.com>
  8. # Copyright (C) 2015 Wojtek Porczyk <woju@invisiblethingslab.com>
  9. #
  10. # This program is free software; you can redistribute it and/or modify
  11. # it under the terms of the GNU General Public License as published by
  12. # the Free Software Foundation; either version 2 of the License, or
  13. # (at your option) any later version.
  14. #
  15. # This program is distributed in the hope that it will be useful,
  16. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  17. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  18. # GNU General Public License for more details.
  19. #
  20. # You should have received a copy of the GNU General Public License along
  21. # with this program; if not, write to the Free Software Foundation, Inc.,
  22. # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  23. #
  24. import multiprocessing
  25. import os
  26. import shutil
  27. import subprocess
  28. import tempfile
  29. import unittest
  30. import time
  31. import libvirt
  32. import qubes
  33. import qubes.vm.qubesvm
  34. import qubes.vm.appvm
  35. import qubes.vm.templatevm
  36. import qubes.tests
  37. class TC_00_Basic(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
  38. def setUp(self):
  39. super(TC_00_Basic, self).setUp()
  40. self.init_default_template()
  41. def test_000_qubes_create(self):
  42. self.assertIsInstance(self.app, qubes.Qubes)
  43. def test_001_qvm_create_default_template(self):
  44. self.app.add_new_vm
  45. def test_100_qvm_create(self):
  46. vmname = self.make_vm_name('appvm')
  47. vm = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  48. name=vmname, template=self.app.default_template,
  49. label='red'
  50. )
  51. self.assertIsNotNone(vm)
  52. self.assertEqual(vm.name, vmname)
  53. self.assertEqual(vm.template, self.app.default_template)
  54. vm.create_on_disk()
  55. with self.assertNotRaises(qubes.exc.QubesException):
  56. vm.verify_files()
  57. class TC_01_Properties(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
  58. def setUp(self):
  59. super(TC_01_Properties, self).setUp()
  60. self.init_default_template()
  61. self.vmname = self.make_vm_name('appvm')
  62. self.vm = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  63. name=self.vmname,
  64. label='red')
  65. self.vm.create_on_disk()
  66. def save_and_reload_db(self):
  67. super(TC_01_Properties, self).save_and_reload_db()
  68. if hasattr(self, 'vm'):
  69. self.vm = self.app.domains[self.vm.qid]
  70. if hasattr(self, 'netvm'):
  71. self.netvm = self.app[self.netvm.qid]
  72. def test_000_rename(self):
  73. newname = self.make_vm_name('newname')
  74. self.assertEqual(self.vm.name, self.vmname)
  75. self.vm.write_firewall_conf({'allow': False, 'allowDns': False})
  76. self.vm.autostart = True
  77. self.addCleanup(os.system,
  78. 'sudo systemctl -q disable qubes-vm@{}.service || :'.
  79. format(self.vmname))
  80. pre_rename_firewall = self.vm.get_firewall_conf()
  81. with self.assertNotRaises(
  82. (OSError, libvirt.libvirtError, qubes.exc.QubesException)):
  83. self.vm.name = newname
  84. self.assertEqual(self.vm.name, newname)
  85. self.assertEqual(self.vm.dir_path,
  86. os.path.join(qubes.config.system_path['qubes_appvms_dir'], newname))
  87. self.assertEqual(self.vm.conf_file,
  88. os.path.join(self.vm.dir_path, newname + '.conf'))
  89. self.assertTrue(os.path.exists(
  90. os.path.join(self.vm.dir_path, "apps", newname + "-vm.directory")))
  91. # FIXME: set whitelisted-appmenus.list first
  92. self.assertTrue(os.path.exists(
  93. os.path.join(self.vm.dir_path, "apps", newname + "-firefox.desktop")))
  94. self.assertTrue(os.path.exists(
  95. os.path.join(os.getenv("HOME"), ".local/share/desktop-directories",
  96. newname + "-vm.directory")))
  97. self.assertTrue(os.path.exists(
  98. os.path.join(os.getenv("HOME"), ".local/share/applications",
  99. newname + "-firefox.desktop")))
  100. self.assertFalse(os.path.exists(
  101. os.path.join(os.getenv("HOME"), ".local/share/desktop-directories",
  102. self.vmname + "-vm.directory")))
  103. self.assertFalse(os.path.exists(
  104. os.path.join(os.getenv("HOME"), ".local/share/applications",
  105. self.vmname + "-firefox.desktop")))
  106. self.assertEquals(pre_rename_firewall, self.vm.get_firewall_conf())
  107. with self.assertNotRaises((qubes.exc.QubesException, OSError)):
  108. self.vm.write_firewall_conf({'allow': False})
  109. self.assertTrue(self.vm.autostart)
  110. self.assertTrue(os.path.exists(
  111. '/etc/systemd/system/multi-user.target.wants/'
  112. 'qubes-vm@{}.service'.format(newname)))
  113. self.assertFalse(os.path.exists(
  114. '/etc/systemd/system/multi-user.target.wants/'
  115. 'qubes-vm@{}.service'.format(self.vmname)))
  116. def test_001_rename_libvirt_undefined(self):
  117. self.vm.libvirt_domain.undefine()
  118. self.vm._libvirt_domain = None
  119. newname = self.make_vm_name('newname')
  120. with self.assertNotRaises(
  121. (OSError, libvirt.libvirtError, qubes.exc.QubesException)):
  122. self.vm.name = newname
  123. def test_030_clone(self):
  124. testvm1 = self.app.add_new_vm(
  125. qubes.vm.appvm.AppVM,
  126. name=self.make_vm_name("vm"),
  127. label='red')
  128. testvm1.create_on_disk()
  129. testvm2 = self.app.add_new_vm(testvm1.__class__,
  130. name=self.make_vm_name("clone"),
  131. template=testvm1.template,
  132. label='red',
  133. )
  134. testvm2.clone_properties(testvm1)
  135. testvm2.clone_disk_files(testvm1)
  136. # qubes.xml reload
  137. self.save_and_reload_db()
  138. testvm1 = self.app.domains[testvm1.qid]
  139. testvm2 = self.app.domains[testvm2.qid]
  140. self.assertEquals(testvm1.label, testvm2.label)
  141. self.assertEquals(testvm1.netvm, testvm2.netvm)
  142. self.assertEquals(testvm1.property_is_default('netvm'),
  143. testvm2.property_is_default('netvm'))
  144. self.assertEquals(testvm1.kernel, testvm2.kernel)
  145. self.assertEquals(testvm1.kernelopts, testvm2.kernelopts)
  146. self.assertEquals(testvm1.property_is_default('kernel'),
  147. testvm2.property_is_default('kernel'))
  148. self.assertEquals(testvm1.property_is_default('kernelopts'),
  149. testvm2.property_is_default('kernelopts'))
  150. self.assertEquals(testvm1.memory, testvm2.memory)
  151. self.assertEquals(testvm1.maxmem, testvm2.maxmem)
  152. # TODO
  153. # self.assertEquals(testvm1.pcidevs, testvm2.pcidevs)
  154. self.assertEquals(testvm1.include_in_backups,
  155. testvm2.include_in_backups)
  156. self.assertEquals(testvm1.default_user, testvm2.default_user)
  157. self.assertEquals(testvm1.features, testvm2.features)
  158. self.assertEquals(testvm1.get_firewall_conf(),
  159. testvm2.get_firewall_conf())
  160. # now some non-default values
  161. testvm1.netvm = None
  162. testvm1.label = 'orange'
  163. testvm1.memory = 512
  164. firewall = testvm1.get_firewall_conf()
  165. firewall['allowDns'] = False
  166. firewall['allowYumProxy'] = False
  167. firewall['rules'] = [{'address': '1.2.3.4',
  168. 'netmask': 24,
  169. 'proto': 'tcp',
  170. 'portBegin': 22,
  171. 'portEnd': 22,
  172. }]
  173. testvm1.write_firewall_conf(firewall)
  174. testvm3 = self.app.add_new_vm(testvm1.__class__,
  175. name=self.make_vm_name("clone2"),
  176. template=testvm1.template,
  177. label='red',
  178. )
  179. testvm3.clone_properties(testvm1)
  180. testvm3.clone_disk_files(testvm1)
  181. # qubes.xml reload
  182. self.save_and_reload_db()
  183. testvm1 = self.app.domains[testvm1.qid]
  184. testvm3 = self.app.domains[testvm3.qid]
  185. self.assertEquals(testvm1.label, testvm3.label)
  186. self.assertEquals(testvm1.netvm, testvm3.netvm)
  187. self.assertEquals(testvm1.property_is_default('netvm'),
  188. testvm3.property_is_default('netvm'))
  189. self.assertEquals(testvm1.kernel, testvm3.kernel)
  190. self.assertEquals(testvm1.kernelopts, testvm3.kernelopts)
  191. self.assertEquals(testvm1.property_is_default('kernel'),
  192. testvm3.property_is_default('kernel'))
  193. self.assertEquals(testvm1.property_is_default('kernelopts'),
  194. testvm3.property_is_default('kernelopts'))
  195. self.assertEquals(testvm1.memory, testvm3.memory)
  196. self.assertEquals(testvm1.maxmem, testvm3.maxmem)
  197. # TODO
  198. # self.assertEquals(testvm1.pcidevs, testvm3.pcidevs)
  199. self.assertEquals(testvm1.include_in_backups,
  200. testvm3.include_in_backups)
  201. self.assertEquals(testvm1.default_user, testvm3.default_user)
  202. self.assertEquals(testvm1.features, testvm3.features)
  203. self.assertEquals(testvm1.get_firewall_conf(),
  204. testvm3.get_firewall_conf())
  205. def test_020_name_conflict_app(self):
  206. # TODO decide what exception should be here
  207. with self.assertRaises((qubes.exc.QubesException, ValueError)):
  208. self.vm2 = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  209. name=self.vmname, template=self.app.default_template)
  210. self.vm2.create_on_disk()
  211. def test_021_name_conflict_template(self):
  212. # TODO decide what exception should be here
  213. with self.assertRaises((qubes.exc.QubesException, ValueError)):
  214. self.vm2 = self.app.add_new_vm(qubes.vm.templatevm.TemplateVM,
  215. name=self.vmname)
  216. self.vm2.create_on_disk()
  217. def test_030_rename_conflict_app(self):
  218. vm2name = self.make_vm_name('newname')
  219. self.vm2 = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  220. name=vm2name, template=self.app.default_template, label='red')
  221. self.vm2.create_on_disk()
  222. with self.assertNotRaises(OSError):
  223. with self.assertRaises(qubes.exc.QubesException):
  224. self.vm2.name = self.vmname
  225. class TC_02_QvmPrefs(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
  226. def setUp(self):
  227. super(TC_02_QvmPrefs, self).setUp()
  228. self.init_default_template()
  229. self.sharedopts = ['--qubesxml', qubes.tests.XMLPATH]
  230. def setup_appvm(self):
  231. self.testvm = self.app.add_new_vm(
  232. qubes.vm.appvm.AppVM,
  233. name=self.make_vm_name("vm"),
  234. label='red')
  235. self.testvm.create_on_disk()
  236. self.save_and_reload_db()
  237. def setup_hvm(self):
  238. self.testvm = self.app.add_new_vm(
  239. qubes.vm.appvm.AppVM,
  240. name=self.make_vm_name("hvm"),
  241. label='red')
  242. self.testvm.hvm = True
  243. self.testvm.create_on_disk()
  244. self.save_and_reload_db()
  245. def pref_set(self, name, value, valid=True):
  246. p = subprocess.Popen(
  247. ['qvm-prefs'] + self.sharedopts +
  248. (['--'] if value != '-D' else []) + [self.testvm.name, name, value],
  249. stdout=subprocess.PIPE,
  250. stderr=subprocess.PIPE,
  251. )
  252. (stdout, stderr) = p.communicate()
  253. if valid:
  254. self.assertEquals(p.returncode, 0,
  255. "qvm-prefs .. '{}' '{}' failed: {}{}".format(
  256. name, value, stdout, stderr
  257. ))
  258. else:
  259. self.assertNotEquals(p.returncode, 0,
  260. "qvm-prefs should reject value '{}' for "
  261. "property '{}'".format(value, name))
  262. def pref_get(self, name):
  263. p = subprocess.Popen(['qvm-prefs'] + self.sharedopts +
  264. ['--', self.testvm.name, name], stdout=subprocess.PIPE)
  265. (stdout, _) = p.communicate()
  266. self.assertEquals(p.returncode, 0)
  267. return stdout.strip()
  268. bool_test_values = [
  269. ('true', 'True', True),
  270. ('False', 'False', True),
  271. ('0', 'False', True),
  272. ('1', 'True', True),
  273. ('invalid', '', False)
  274. ]
  275. def execute_tests(self, name, values):
  276. """
  277. Helper function, which executes tests for given property.
  278. :param values: list of tuples (value, expected, valid),
  279. where 'value' is what should be set and 'expected' is what should
  280. qvm-prefs returns as a property value and 'valid' marks valid and
  281. invalid values - if it's False, qvm-prefs should reject the value
  282. :return: None
  283. """
  284. for (value, expected, valid) in values:
  285. self.pref_set(name, value, valid)
  286. if valid:
  287. self.assertEquals(self.pref_get(name), expected)
  288. @unittest.skip('test not converted to core3 API')
  289. def test_006_template(self):
  290. templates = [tpl for tpl in self.app.domains.values() if
  291. tpl.is_template()]
  292. if not templates:
  293. self.skipTest("No templates installed")
  294. some_template = templates[0].name
  295. self.setup_appvm()
  296. self.execute_tests('template', [
  297. (some_template, some_template, True),
  298. ('invalid', '', False),
  299. ])
  300. @unittest.skip('test not converted to core3 API')
  301. def test_014_pcidevs(self):
  302. self.setup_appvm()
  303. self.execute_tests('pcidevs', [
  304. ('[]', '[]', True),
  305. ('[ "00:00.0" ]', "['00:00.0']", True),
  306. ('invalid', '', False),
  307. ('[invalid]', '', False),
  308. # TODO:
  309. #('["12:12.0"]', '', False)
  310. ])
  311. @unittest.skip('test not converted to core3 API')
  312. def test_024_pv_reject_hvm_props(self):
  313. self.setup_appvm()
  314. self.execute_tests('guiagent_installed', [('False', '', False)])
  315. self.execute_tests('qrexec_installed', [('False', '', False)])
  316. self.execute_tests('drive', [('/tmp/drive.img', '', False)])
  317. self.execute_tests('timezone', [('localtime', '', False)])
  318. @unittest.skip('test not converted to core3 API')
  319. def test_025_hvm_reject_pv_props(self):
  320. self.setup_hvm()
  321. self.execute_tests('kernel', [('default', '', False)])
  322. self.execute_tests('kernelopts', [('default', '', False)])
  323. class TC_03_QvmRevertTemplateChanges(qubes.tests.SystemTestsMixin,
  324. qubes.tests.QubesTestCase):
  325. def setUp(self):
  326. super(TC_03_QvmRevertTemplateChanges, self).setUp()
  327. self.init_default_template()
  328. def setup_pv_template(self):
  329. self.test_template = self.app.add_new_vm(
  330. qubes.vm.templatevm.TemplateVM,
  331. name=self.make_vm_name("pv-clone"),
  332. label='red'
  333. )
  334. self.test_template.clone_properties(self.app.default_template)
  335. self.test_template.clone_disk_files(self.app.default_template)
  336. self.save_and_reload_db()
  337. def setup_hvm_template(self):
  338. self.test_template = self.app.add_new_vm(
  339. qubes.vm.templatevm.TemplateVM,
  340. name=self.make_vm_name("hvm"),
  341. label='red',
  342. hvm=True
  343. )
  344. self.test_template.create_on_disk()
  345. self.save_and_reload_db()
  346. def get_rootimg_checksum(self):
  347. p = subprocess.Popen(['sha1sum', self.test_template.root_img],
  348. stdout=subprocess.PIPE)
  349. return p.communicate()[0]
  350. def _do_test(self):
  351. checksum_before = self.get_rootimg_checksum()
  352. self.test_template.start()
  353. self.shutdown_and_wait(self.test_template)
  354. checksum_changed = self.get_rootimg_checksum()
  355. if checksum_before == checksum_changed:
  356. self.log.warning("template not modified, test result will be "
  357. "unreliable")
  358. with self.assertNotRaises(subprocess.CalledProcessError):
  359. subprocess.check_call(['sudo', 'qvm-revert-template-changes',
  360. '--force', self.test_template.name])
  361. checksum_after = self.get_rootimg_checksum()
  362. self.assertEquals(checksum_before, checksum_after)
  363. def test_000_revert_pv(self):
  364. """
  365. Test qvm-revert-template-changes for PV template
  366. """
  367. self.setup_pv_template()
  368. self._do_test()
  369. @unittest.skip('HVM not yet implemented')
  370. def test_000_revert_hvm(self):
  371. """
  372. Test qvm-revert-template-changes for HVM template
  373. """
  374. # TODO: have some system there, so the root.img will get modified
  375. self.setup_hvm_template()
  376. self._do_test()
  377. class TC_04_DispVM(qubes.tests.SystemTestsMixin,
  378. qubes.tests.QubesTestCase):
  379. def setUp(self):
  380. self.skipTest('DisposableVMs not implemented in core3 yet')
  381. super(TC_04_DispVM, self).setUp()
  382. self.init_default_template()
  383. disp_tpl = self.host_app.domains[self.get_dispvm_template_name()]
  384. self.disp_tpl = self.app.add_new_vm(disp_tpl.__class__,
  385. name=disp_tpl.name,
  386. template=disp_tpl.template,
  387. label='red'
  388. )
  389. self.app.save()
  390. @staticmethod
  391. def get_dispvm_template_name():
  392. # FIXME: currently qubes.xml doesn't contain this information...
  393. vmdir = os.readlink('/var/lib/qubes/dvmdata/vmdir')
  394. return os.path.basename(vmdir)
  395. def test_000_firewall_propagation(self):
  396. """
  397. Check firewall propagation VM->DispVM, when VM have some firewall rules
  398. """
  399. testvm1 = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  400. name=self.make_vm_name('vm1'),
  401. label='red')
  402. testvm1.create_on_disk()
  403. firewall = testvm1.get_firewall_conf()
  404. firewall['allowDns'] = False
  405. firewall['allowYumProxy'] = False
  406. firewall['rules'] = [{'address': '1.2.3.4',
  407. 'netmask': 24,
  408. 'proto': 'tcp',
  409. 'portBegin': 22,
  410. 'portEnd': 22,
  411. }]
  412. testvm1.write_firewall_conf(firewall)
  413. self.app.save()
  414. testvm1.start()
  415. p = testvm1.run("qvm-run --dispvm 'qubesdb-read /name; echo ERROR;"
  416. " read x'",
  417. passio_popen=True)
  418. dispvm_name = p.stdout.readline().strip()
  419. self.reload_db()
  420. dispvm = self.app.domains[dispvm_name]
  421. self.assertIsNotNone(dispvm, "DispVM {} not found in qubes.xml".format(
  422. dispvm_name))
  423. # check if firewall was propagated to the DispVM
  424. self.assertEquals(testvm1.get_firewall_conf(),
  425. dispvm.get_firewall_conf())
  426. # and only there (#1608)
  427. self.assertNotEquals(self.disp_tpl.get_firewall_conf(),
  428. dispvm.get_firewall_conf())
  429. # then modify some rule
  430. firewall = dispvm.get_firewall_conf()
  431. firewall['rules'] = [{'address': '4.3.2.1',
  432. 'netmask': 24,
  433. 'proto': 'tcp',
  434. 'portBegin': 22,
  435. 'portEnd': 22,
  436. }]
  437. dispvm.write_firewall_conf(firewall)
  438. # and check again if wasn't saved anywhere else (#1608)
  439. self.assertNotEquals(self.disp_tpl.get_firewall_conf(),
  440. dispvm.get_firewall_conf())
  441. self.assertNotEquals(testvm1.get_firewall_conf(),
  442. dispvm.get_firewall_conf())
  443. p.stdin.write('\n')
  444. p.wait()
  445. def test_001_firewall_propagation(self):
  446. """
  447. Check firewall propagation VM->DispVM, when VM have no firewall rules
  448. """
  449. testvm1 = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  450. name=self.make_vm_name('vm1'),
  451. label='red')
  452. testvm1.create_on_disk()
  453. self.app.save()
  454. original_firewall = None
  455. if os.path.exists(self.disp_tpl.firewall_conf):
  456. original_firewall = tempfile.TemporaryFile()
  457. with open(self.disp_tpl.firewall_conf) as f:
  458. original_firewall.write(f.read())
  459. try:
  460. firewall = self.disp_tpl.get_firewall_conf()
  461. firewall['allowDns'] = False
  462. firewall['allowYumProxy'] = False
  463. firewall['rules'] = [{'address': '1.2.3.4',
  464. 'netmask': 24,
  465. 'proto': 'tcp',
  466. 'portBegin': 22,
  467. 'portEnd': 22,
  468. }]
  469. self.disp_tpl.write_firewall_conf(firewall)
  470. testvm1.start()
  471. p = testvm1.run("qvm-run --dispvm 'qubesdb-read /name; echo ERROR;"
  472. " read x'",
  473. passio_popen=True)
  474. dispvm_name = p.stdout.readline().strip()
  475. self.reload_db()
  476. dispvm = self.app.domains[dispvm_name]
  477. self.assertIsNotNone(dispvm, "DispVM {} not found in qubes.xml".format(
  478. dispvm_name))
  479. # check if firewall was propagated to the DispVM from the right VM
  480. self.assertEquals(testvm1.get_firewall_conf(),
  481. dispvm.get_firewall_conf())
  482. # and only there (#1608)
  483. self.assertNotEquals(self.disp_tpl.get_firewall_conf(),
  484. dispvm.get_firewall_conf())
  485. # then modify some rule
  486. firewall = dispvm.get_firewall_conf()
  487. firewall['rules'] = [{'address': '4.3.2.1',
  488. 'netmask': 24,
  489. 'proto': 'tcp',
  490. 'portBegin': 22,
  491. 'portEnd': 22,
  492. }]
  493. dispvm.write_firewall_conf(firewall)
  494. # and check again if wasn't saved anywhere else (#1608)
  495. self.assertNotEquals(self.disp_tpl.get_firewall_conf(),
  496. dispvm.get_firewall_conf())
  497. self.assertNotEquals(testvm1.get_firewall_conf(),
  498. dispvm.get_firewall_conf())
  499. p.stdin.write('\n')
  500. p.wait()
  501. finally:
  502. if original_firewall:
  503. original_firewall.seek(0)
  504. with open(self.disp_tpl.firewall_conf, 'w') as f:
  505. f.write(original_firewall.read())
  506. original_firewall.close()
  507. else:
  508. os.unlink(self.disp_tpl.firewall_conf)
  509. def test_002_cleanup(self):
  510. p = subprocess.Popen(['/usr/lib/qubes/qfile-daemon-dvm',
  511. 'qubes.VMShell', 'dom0', 'DEFAULT'],
  512. stdin=subprocess.PIPE,
  513. stdout=subprocess.PIPE,
  514. stderr=open(os.devnull, 'w'))
  515. (stdout, _) = p.communicate(input="echo test; qubesdb-read /name; "
  516. "echo ERROR\n")
  517. self.assertEquals(p.returncode, 0)
  518. lines = stdout.splitlines()
  519. self.assertEqual(lines[0], "test")
  520. dispvm_name = lines[1]
  521. self.reload_db()
  522. dispvm = self.app.domains[dispvm_name]
  523. self.assertIsNone(dispvm, "DispVM {} still exists in qubes.xml".format(
  524. dispvm_name))
  525. def test_003_cleanup_destroyed(self):
  526. """
  527. Check if DispVM is properly removed even if it terminated itself (#1660)
  528. :return:
  529. """
  530. p = subprocess.Popen(['/usr/lib/qubes/qfile-daemon-dvm',
  531. 'qubes.VMShell', 'dom0', 'DEFAULT'],
  532. stdin=subprocess.PIPE,
  533. stdout=subprocess.PIPE,
  534. stderr=open(os.devnull, 'w'))
  535. p.stdin.write("qubesdb-read /name\n")
  536. p.stdin.write("echo ERROR\n")
  537. p.stdin.write("poweroff\n")
  538. # do not close p.stdin on purpose - wait to automatic disconnect when
  539. # domain is destroyed
  540. timeout = 30
  541. while timeout > 0:
  542. if p.poll():
  543. break
  544. time.sleep(1)
  545. timeout -= 1
  546. # includes check for None - timeout
  547. self.assertEquals(p.returncode, 0)
  548. lines = p.stdout.read().splitlines()
  549. dispvm_name = lines[0]
  550. self.assertNotEquals(dispvm_name, "ERROR")
  551. self.reload_db()
  552. dispvm = self.app.domains[dispvm_name]
  553. self.assertIsNone(dispvm, "DispVM {} still exists in qubes.xml".format(
  554. dispvm_name))
  555. # vim: ts=4 sw=4 et