basic.py 34 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884
  1. #!/usr/bin/python
  2. # vim: fileencoding=utf-8
  3. #
  4. # The Qubes OS Project, https://www.qubes-os.org/
  5. #
  6. # Copyright (C) 2014-2015
  7. # Marek Marczykowski-Górecki <marmarek@invisiblethingslab.com>
  8. # Copyright (C) 2015 Wojtek Porczyk <woju@invisiblethingslab.com>
  9. #
  10. # This program is free software; you can redistribute it and/or modify
  11. # it under the terms of the GNU General Public License as published by
  12. # the Free Software Foundation; either version 2 of the License, or
  13. # (at your option) any later version.
  14. #
  15. # This program is distributed in the hope that it will be useful,
  16. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  17. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  18. # GNU General Public License for more details.
  19. #
  20. # You should have received a copy of the GNU General Public License along
  21. # with this program; if not, write to the Free Software Foundation, Inc.,
  22. # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  23. #
  24. import multiprocessing
  25. import os
  26. import shutil
  27. import subprocess
  28. import tempfile
  29. import unittest
  30. import time
  31. from qubes.qubes import QubesVmCollection, QubesException, system_path
  32. import libvirt
  33. import qubes.qubes
  34. import qubes.tests
  35. from qubes.qubes import QubesVmLabels
  36. class TC_00_Basic(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
  37. def test_000_create(self):
  38. vmname = self.make_vm_name('appvm')
  39. vm = self.qc.add_new_vm('QubesAppVm',
  40. name=vmname, template=self.qc.get_default_template())
  41. self.assertIsNotNone(vm)
  42. self.assertEqual(vm.name, vmname)
  43. self.assertEqual(vm.template, self.qc.get_default_template())
  44. vm.create_on_disk(verbose=False)
  45. with self.assertNotRaises(qubes.qubes.QubesException):
  46. vm.verify_files()
  47. class TC_01_Properties(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
  48. def setUp(self):
  49. super(TC_01_Properties, self).setUp()
  50. self.vmname = self.make_vm_name('appvm')
  51. self.vm = self.qc.add_new_vm('QubesAppVm',
  52. name=self.vmname, template=self.qc.get_default_template())
  53. self.vm.create_on_disk(verbose=False)
  54. def save_and_reload_db(self):
  55. super(TC_01_Properties, self).save_and_reload_db()
  56. if hasattr(self, 'vm'):
  57. self.vm = self.qc.get(self.vm.qid, None)
  58. if hasattr(self, 'netvm'):
  59. self.netvm = self.qc.get(self.netvm.qid, None)
  60. def test_000_rename(self):
  61. newname = self.make_vm_name('newname')
  62. self.assertEqual(self.vm.name, self.vmname)
  63. self.vm.write_firewall_conf({'allow': False, 'allowDns': False})
  64. self.vm.autostart = True
  65. self.addCleanup(os.system,
  66. 'sudo systemctl -q disable qubes-vm@{}.service || :'.
  67. format(self.vmname))
  68. pre_rename_firewall = self.vm.get_firewall_conf()
  69. #TODO: change to setting property when implemented
  70. self.vm.set_name(newname)
  71. self.assertEqual(self.vm.name, newname)
  72. self.assertEqual(self.vm.dir_path,
  73. os.path.join(system_path['qubes_appvms_dir'], newname))
  74. self.assertEqual(self.vm.conf_file,
  75. os.path.join(self.vm.dir_path, newname + '.conf'))
  76. self.assertTrue(os.path.exists(
  77. os.path.join(self.vm.dir_path, "apps", newname + "-vm.directory")))
  78. # FIXME: set whitelisted-appmenus.list first
  79. self.assertTrue(os.path.exists(
  80. os.path.join(self.vm.dir_path, "apps", newname + "-firefox.desktop")))
  81. self.assertTrue(os.path.exists(
  82. os.path.join(os.getenv("HOME"), ".local/share/desktop-directories",
  83. newname + "-vm.directory")))
  84. self.assertTrue(os.path.exists(
  85. os.path.join(os.getenv("HOME"), ".local/share/applications",
  86. newname + "-firefox.desktop")))
  87. self.assertFalse(os.path.exists(
  88. os.path.join(os.getenv("HOME"), ".local/share/desktop-directories",
  89. self.vmname + "-vm.directory")))
  90. self.assertFalse(os.path.exists(
  91. os.path.join(os.getenv("HOME"), ".local/share/applications",
  92. self.vmname + "-firefox.desktop")))
  93. self.assertEquals(pre_rename_firewall, self.vm.get_firewall_conf())
  94. with self.assertNotRaises((QubesException, OSError)):
  95. self.vm.write_firewall_conf({'allow': False})
  96. self.assertTrue(self.vm.autostart)
  97. self.assertTrue(os.path.exists(
  98. '/etc/systemd/system/multi-user.target.wants/'
  99. 'qubes-vm@{}.service'.format(newname)))
  100. self.assertFalse(os.path.exists(
  101. '/etc/systemd/system/multi-user.target.wants/'
  102. 'qubes-vm@{}.service'.format(self.vmname)))
  103. def test_001_rename_libvirt_undefined(self):
  104. self.vm.libvirt_domain.undefine()
  105. self.vm._libvirt_domain = None
  106. newname = self.make_vm_name('newname')
  107. with self.assertNotRaises(libvirt.libvirtError):
  108. self.vm.set_name(newname)
  109. def test_010_netvm(self):
  110. if self.qc.get_default_netvm() is None:
  111. self.skip("Set default NetVM before running this test")
  112. self.netvm = self.qc.add_new_vm("QubesNetVm",
  113. name=self.make_vm_name('netvm'),
  114. template=self.qc.get_default_template())
  115. self.netvm.create_on_disk(verbose=False)
  116. # TODO: remove this line after switching to core3
  117. self.save_and_reload_db()
  118. self.assertEquals(self.vm.netvm, self.qc.get_default_netvm())
  119. self.vm.uses_default_netvm = False
  120. self.vm.netvm = None
  121. self.assertIsNone(self.vm.netvm)
  122. self.save_and_reload_db()
  123. self.assertIsNone(self.vm.netvm)
  124. self.vm.netvm = self.qc[self.netvm.qid]
  125. self.assertEquals(self.vm.netvm.qid, self.netvm.qid)
  126. self.save_and_reload_db()
  127. self.assertEquals(self.vm.netvm.qid, self.netvm.qid)
  128. self.vm.uses_default_netvm = True
  129. # TODO: uncomment when properly implemented
  130. # self.assertEquals(self.vm.netvm.qid, self.qc.get_default_netvm().qid)
  131. self.save_and_reload_db()
  132. self.assertEquals(self.vm.netvm.qid, self.qc.get_default_netvm().qid)
  133. with self.assertRaises(ValueError):
  134. self.vm.netvm = self.vm
  135. def test_020_dispvm_netvm(self):
  136. if self.qc.get_default_netvm() is None:
  137. self.skip("Set default NetVM before running this test")
  138. self.netvm = self.qc.add_new_vm("QubesNetVm",
  139. name=self.make_vm_name('netvm'),
  140. template=self.qc.get_default_template())
  141. self.netvm.create_on_disk(verbose=False)
  142. self.assertEquals(self.vm.netvm, self.vm.dispvm_netvm)
  143. self.vm.uses_default_dispvm_netvm = False
  144. self.vm.dispvm_netvm = None
  145. self.assertIsNone(self.vm.dispvm_netvm)
  146. self.save_and_reload_db()
  147. self.assertIsNone(self.vm.dispvm_netvm)
  148. self.vm.dispvm_netvm = self.netvm
  149. self.assertEquals(self.vm.dispvm_netvm, self.netvm)
  150. self.save_and_reload_db()
  151. self.assertEquals(self.vm.dispvm_netvm, self.netvm)
  152. self.vm.uses_default_dispvm_netvm = True
  153. self.assertEquals(self.vm.dispvm_netvm, self.vm.netvm)
  154. self.save_and_reload_db()
  155. self.assertEquals(self.vm.dispvm_netvm, self.vm.netvm)
  156. with self.assertRaises(ValueError):
  157. self.vm.dispvm_netvm = self.vm
  158. def test_030_clone(self):
  159. testvm1 = self.qc.add_new_vm(
  160. "QubesAppVm",
  161. name=self.make_vm_name("vm"),
  162. template=self.qc.get_default_template())
  163. testvm1.create_on_disk(verbose=False)
  164. testvm2 = self.qc.add_new_vm(testvm1.__class__.__name__,
  165. name=self.make_vm_name("clone"),
  166. template=testvm1.template,
  167. )
  168. testvm2.clone_attrs(src_vm=testvm1)
  169. testvm2.clone_disk_files(src_vm=testvm1, verbose=False)
  170. # qubes.xml reload
  171. self.save_and_reload_db()
  172. testvm1 = self.qc[testvm1.qid]
  173. testvm2 = self.qc[testvm2.qid]
  174. self.assertEquals(testvm1.label, testvm2.label)
  175. self.assertEquals(testvm1.netvm, testvm2.netvm)
  176. self.assertEquals(testvm1.uses_default_netvm,
  177. testvm2.uses_default_netvm)
  178. self.assertEquals(testvm1.kernel, testvm2.kernel)
  179. self.assertEquals(testvm1.kernelopts, testvm2.kernelopts)
  180. self.assertEquals(testvm1.uses_default_kernel,
  181. testvm2.uses_default_kernel)
  182. self.assertEquals(testvm1.uses_default_kernelopts,
  183. testvm2.uses_default_kernelopts)
  184. self.assertEquals(testvm1.memory, testvm2.memory)
  185. self.assertEquals(testvm1.maxmem, testvm2.maxmem)
  186. self.assertEquals(testvm1.pcidevs, testvm2.pcidevs)
  187. self.assertEquals(testvm1.include_in_backups,
  188. testvm2.include_in_backups)
  189. self.assertEquals(testvm1.default_user, testvm2.default_user)
  190. self.assertEquals(testvm1.services, testvm2.services)
  191. self.assertEquals(testvm1.get_firewall_conf(),
  192. testvm2.get_firewall_conf())
  193. # now some non-default values
  194. testvm1.netvm = None
  195. testvm1.uses_default_netvm = False
  196. testvm1.label = QubesVmLabels['orange']
  197. testvm1.memory = 512
  198. firewall = testvm1.get_firewall_conf()
  199. firewall['allowDns'] = False
  200. firewall['allowYumProxy'] = False
  201. firewall['rules'] = [{'address': '1.2.3.4',
  202. 'netmask': 24,
  203. 'proto': 'tcp',
  204. 'portBegin': 22,
  205. 'portEnd': 22,
  206. }]
  207. testvm1.write_firewall_conf(firewall)
  208. testvm3 = self.qc.add_new_vm(testvm1.__class__.__name__,
  209. name=self.make_vm_name("clone2"),
  210. template=testvm1.template,
  211. )
  212. testvm3.clone_attrs(src_vm=testvm1)
  213. testvm3.clone_disk_files(src_vm=testvm1, verbose=False)
  214. # qubes.xml reload
  215. self.save_and_reload_db()
  216. testvm1 = self.qc[testvm1.qid]
  217. testvm3 = self.qc[testvm3.qid]
  218. self.assertEquals(testvm1.label, testvm3.label)
  219. self.assertEquals(testvm1.netvm, testvm3.netvm)
  220. self.assertEquals(testvm1.uses_default_netvm,
  221. testvm3.uses_default_netvm)
  222. self.assertEquals(testvm1.kernel, testvm3.kernel)
  223. self.assertEquals(testvm1.kernelopts, testvm3.kernelopts)
  224. self.assertEquals(testvm1.uses_default_kernel,
  225. testvm3.uses_default_kernel)
  226. self.assertEquals(testvm1.uses_default_kernelopts,
  227. testvm3.uses_default_kernelopts)
  228. self.assertEquals(testvm1.memory, testvm3.memory)
  229. self.assertEquals(testvm1.maxmem, testvm3.maxmem)
  230. self.assertEquals(testvm1.pcidevs, testvm3.pcidevs)
  231. self.assertEquals(testvm1.include_in_backups,
  232. testvm3.include_in_backups)
  233. self.assertEquals(testvm1.default_user, testvm3.default_user)
  234. self.assertEquals(testvm1.services, testvm3.services)
  235. self.assertEquals(testvm1.get_firewall_conf(),
  236. testvm3.get_firewall_conf())
  237. def test_020_name_conflict_app(self):
  238. with self.assertRaises(QubesException):
  239. self.vm2 = self.qc.add_new_vm('QubesAppVm',
  240. name=self.vmname, template=self.qc.get_default_template())
  241. self.vm2.create_on_disk(verbose=False)
  242. def test_021_name_conflict_hvm(self):
  243. with self.assertRaises(QubesException):
  244. self.vm2 = self.qc.add_new_vm('QubesHVm',
  245. name=self.vmname, template=self.qc.get_default_template())
  246. self.vm2.create_on_disk(verbose=False)
  247. def test_022_name_conflict_net(self):
  248. with self.assertRaises(QubesException):
  249. self.vm2 = self.qc.add_new_vm('QubesNetVm',
  250. name=self.vmname, template=self.qc.get_default_template())
  251. self.vm2.create_on_disk(verbose=False)
  252. def test_030_rename_conflict_app(self):
  253. vm2name = self.make_vm_name('newname')
  254. self.vm2 = self.qc.add_new_vm('QubesAppVm',
  255. name=vm2name, template=self.qc.get_default_template())
  256. self.vm2.create_on_disk(verbose=False)
  257. with self.assertRaises(QubesException):
  258. self.vm2.set_name(self.vmname)
  259. def test_031_rename_conflict_net(self):
  260. vm3name = self.make_vm_name('newname')
  261. self.vm3 = self.qc.add_new_vm('QubesNetVm',
  262. name=vm3name, template=self.qc.get_default_template())
  263. self.vm3.create_on_disk(verbose=False)
  264. with self.assertRaises(QubesException):
  265. self.vm3.set_name(self.vmname)
  266. class TC_02_QvmPrefs(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
  267. def setup_appvm(self):
  268. self.testvm = self.qc.add_new_vm(
  269. "QubesAppVm",
  270. name=self.make_vm_name("vm"),
  271. template=self.qc.get_default_template())
  272. self.testvm.create_on_disk(verbose=False)
  273. self.save_and_reload_db()
  274. self.qc.unlock_db()
  275. def setup_hvm(self):
  276. self.testvm = self.qc.add_new_vm(
  277. "QubesHVm",
  278. name=self.make_vm_name("hvm"))
  279. self.testvm.create_on_disk(verbose=False)
  280. self.save_and_reload_db()
  281. self.qc.unlock_db()
  282. def pref_set(self, name, value, valid=True):
  283. p = subprocess.Popen(
  284. ['qvm-prefs', '-s', '--', self.testvm.name, name, value],
  285. stdout=subprocess.PIPE,
  286. stderr=subprocess.PIPE,
  287. )
  288. (stdout, stderr) = p.communicate()
  289. if valid:
  290. self.assertEquals(p.returncode, 0,
  291. "qvm-prefs -s .. '{}' '{}' failed: {}{}".format(
  292. name, value, stdout, stderr
  293. ))
  294. else:
  295. self.assertNotEquals(p.returncode, 0,
  296. "qvm-prefs should reject value '{}' for "
  297. "property '{}'".format(value, name))
  298. def pref_get(self, name):
  299. p = subprocess.Popen(['qvm-prefs', '-g', self.testvm.name, name],
  300. stdout=subprocess.PIPE)
  301. (stdout, _) = p.communicate()
  302. self.assertEquals(p.returncode, 0)
  303. return stdout.strip()
  304. bool_test_values = [
  305. ('true', 'True', True),
  306. ('False', 'False', True),
  307. ('0', 'False', True),
  308. ('1', 'True', True),
  309. ('invalid', '', False)
  310. ]
  311. def execute_tests(self, name, values):
  312. """
  313. Helper function, which executes tests for given property.
  314. :param values: list of tuples (value, expected, valid),
  315. where 'value' is what should be set and 'expected' is what should
  316. qvm-prefs returns as a property value and 'valid' marks valid and
  317. invalid values - if it's False, qvm-prefs should reject the value
  318. :return: None
  319. """
  320. for (value, expected, valid) in values:
  321. self.pref_set(name, value, valid)
  322. if valid:
  323. self.assertEquals(self.pref_get(name), expected)
  324. def test_000_kernel(self):
  325. self.setup_appvm()
  326. default_kernel = self.qc.get_default_kernel()
  327. self.execute_tests('kernel', [
  328. ('default', default_kernel, True),
  329. (default_kernel, default_kernel, True),
  330. ('invalid', '', False),
  331. ])
  332. def test_001_include_in_backups(self):
  333. self.setup_appvm()
  334. self.execute_tests('include_in_backups', self.bool_test_values)
  335. def test_002_qrexec_timeout(self):
  336. self.setup_appvm()
  337. self.execute_tests('qrexec_timeout', [
  338. ('60', '60', True),
  339. ('0', '0', True),
  340. ('-10', '', False),
  341. ('invalid', '', False)
  342. ])
  343. def test_003_internal(self):
  344. self.setup_appvm()
  345. self.execute_tests('include_in_backups', self.bool_test_values)
  346. def test_004_label(self):
  347. self.setup_appvm()
  348. self.execute_tests('label', [
  349. ('red', 'red', True),
  350. ('blue', 'blue', True),
  351. ('amber', '', False),
  352. ])
  353. def test_005_kernelopts(self):
  354. self.setup_appvm()
  355. self.execute_tests('kernelopts', [
  356. ('option', 'option', True),
  357. ('default', 'nopat', True),
  358. ('', '', True),
  359. ])
  360. def test_006_template(self):
  361. templates = [tpl for tpl in self.qc.values() if tpl.is_template()]
  362. if not templates:
  363. self.skip("No templates installed")
  364. some_template = templates[0].name
  365. self.setup_appvm()
  366. self.execute_tests('template', [
  367. (some_template, some_template, True),
  368. ('invalid', '', False),
  369. ])
  370. def test_007_memory(self):
  371. self.setup_appvm()
  372. qh = qubes.qubes.QubesHost()
  373. memory_total = qh.memory_total
  374. self.execute_tests('memory', [
  375. ('300', '300', True),
  376. ('1500', '1500', True),
  377. # TODO:
  378. #('500M', '500', True),
  379. #(str(self.testvm.maxmem+500), '', False),
  380. (str(2*memory_total), '', False),
  381. ])
  382. def test_008_maxmem(self):
  383. self.setup_appvm()
  384. qh = qubes.qubes.QubesHost()
  385. memory_total = qh.memory_total
  386. self.execute_tests('memory', [
  387. ('300', '300', True),
  388. ('1500', '1500', True),
  389. # TODO:
  390. #('500M', '500', True),
  391. #(str(self.testvm.memory-50), '', False),
  392. (str(2*memory_total), '', False),
  393. ])
  394. def test_009_autostart(self):
  395. self.setup_appvm()
  396. self.execute_tests('autostart', self.bool_test_values)
  397. def test_010_pci_strictreset(self):
  398. self.setup_appvm()
  399. self.execute_tests('pci_strictreset', self.bool_test_values)
  400. def test_011_dispvm_netvm(self):
  401. self.setup_appvm()
  402. default_netvm = self.qc.get_default_netvm().name
  403. netvms = [tpl for tpl in self.qc.values() if tpl.is_netvm()]
  404. if not netvms:
  405. self.skip("No netvms installed")
  406. some_netvm = netvms[0].name
  407. if some_netvm == default_netvm:
  408. if len(netvms) <= 1:
  409. self.skip("At least two NetVM/ProxyVM required")
  410. some_netvm = netvms[1].name
  411. self.execute_tests('dispvm_netvm', [
  412. (some_netvm, some_netvm, True),
  413. (default_netvm, default_netvm, True),
  414. ('default', default_netvm, True),
  415. ('none', '', True),
  416. (self.testvm.name, '', False),
  417. ('invalid', '', False)
  418. ])
  419. def test_012_mac(self):
  420. self.setup_appvm()
  421. default_mac = self.testvm.mac
  422. self.execute_tests('mac', [
  423. ('00:11:22:33:44:55', '00:11:22:33:44:55', True),
  424. ('auto', default_mac, True),
  425. # TODO:
  426. #('00:11:22:33:44:55:66', '', False),
  427. ('invalid', '', False),
  428. ])
  429. def test_013_default_user(self):
  430. self.setup_appvm()
  431. self.execute_tests('default_user', [
  432. ('someuser', self.testvm.template.default_user, True)
  433. # TODO: tests for standalone VMs
  434. ])
  435. def test_014_pcidevs(self):
  436. self.setup_appvm()
  437. self.execute_tests('pcidevs', [
  438. ('[]', '[]', True),
  439. ('[ "00:00.0" ]', "['00:00.0']", True),
  440. ('invalid', '', False),
  441. ('[invalid]', '', False),
  442. # TODO:
  443. #('["12:12.0"]', '', False)
  444. ])
  445. def test_015_name(self):
  446. self.setup_appvm()
  447. self.execute_tests('name', [
  448. ('invalid!@#name', '', False),
  449. # TODO: duplicate name test - would fail for now...
  450. ])
  451. newname = self.make_vm_name('newname')
  452. self.pref_set('name', newname, True)
  453. self.qc.lock_db_for_reading()
  454. self.qc.load()
  455. self.qc.unlock_db()
  456. self.testvm = self.qc.get_vm_by_name(newname)
  457. self.assertEquals(self.pref_get('name'), newname)
  458. def test_016_vcpus(self):
  459. self.setup_appvm()
  460. self.execute_tests('vcpus', [
  461. ('1', '1', True),
  462. ('100', '', False),
  463. ('-1', '', False),
  464. ('invalid', '', False),
  465. ])
  466. def test_017_debug(self):
  467. self.setup_appvm()
  468. self.execute_tests('debug', [
  469. ('on', 'True', True),
  470. ('off', 'False', True),
  471. ('true', 'True', True),
  472. ('0', 'False', True),
  473. ('invalid', '', False)
  474. ])
  475. def test_018_netvm(self):
  476. self.setup_appvm()
  477. default_netvm = self.qc.get_default_netvm().name
  478. netvms = [tpl for tpl in self.qc.values() if tpl.is_netvm()]
  479. if not netvms:
  480. self.skip("No netvms installed")
  481. some_netvm = netvms[0].name
  482. if some_netvm == default_netvm:
  483. if len(netvms) <= 1:
  484. self.skip("At least two NetVM/ProxyVM required")
  485. some_netvm = netvms[1].name
  486. self.execute_tests('netvm', [
  487. (some_netvm, some_netvm, True),
  488. (default_netvm, default_netvm, True),
  489. ('default', default_netvm, True),
  490. ('none', '', True),
  491. (self.testvm.name, '', False),
  492. ('invalid', '', False)
  493. ])
  494. def test_019_guiagent_installed(self):
  495. self.setup_hvm()
  496. self.execute_tests('guiagent_installed', self.bool_test_values)
  497. def test_020_qrexec_installed(self):
  498. self.setup_hvm()
  499. self.execute_tests('qrexec_installed', self.bool_test_values)
  500. def test_021_seamless_gui_mode(self):
  501. self.setup_hvm()
  502. # should reject seamless mode without gui agent
  503. self.execute_tests('seamless_gui_mode', [
  504. ('True', '', False),
  505. ('False', 'False', True),
  506. ])
  507. self.execute_tests('guiagent_installed', [('True', 'True', True)])
  508. self.execute_tests('seamless_gui_mode', self.bool_test_values)
  509. def test_022_drive(self):
  510. self.setup_hvm()
  511. self.execute_tests('drive', [
  512. ('hd:dom0:/tmp/drive.img', 'hd:dom0:/tmp/drive.img', True),
  513. ('hd:/tmp/drive.img', 'hd:dom0:/tmp/drive.img', True),
  514. ('cdrom:dom0:/tmp/drive.img', 'cdrom:dom0:/tmp/drive.img', True),
  515. ('cdrom:/tmp/drive.img', 'cdrom:dom0:/tmp/drive.img', True),
  516. ('/tmp/drive.img', 'cdrom:dom0:/tmp/drive.img', True),
  517. ('hd:drive.img', '', False),
  518. ('drive.img', '', False),
  519. ])
  520. def test_023_timezone(self):
  521. self.setup_hvm()
  522. self.execute_tests('timezone', [
  523. ('localtime', 'localtime', True),
  524. ('0', '0', True),
  525. ('3600', '3600', True),
  526. ('-7200', '-7200', True),
  527. ('invalid', '', False),
  528. ])
  529. def test_024_pv_reject_hvm_props(self):
  530. self.setup_appvm()
  531. self.execute_tests('guiagent_installed', [('False', '', False)])
  532. self.execute_tests('qrexec_installed', [('False', '', False)])
  533. self.execute_tests('drive', [('/tmp/drive.img', '', False)])
  534. self.execute_tests('timezone', [('localtime', '', False)])
  535. def test_025_hvm_reject_pv_props(self):
  536. self.setup_hvm()
  537. self.execute_tests('kernel', [('default', '', False)])
  538. self.execute_tests('kernelopts', [('default', '', False)])
  539. class TC_03_QvmRevertTemplateChanges(qubes.tests.SystemTestsMixin,
  540. qubes.tests.QubesTestCase):
  541. def setup_pv_template(self):
  542. self.test_template = self.qc.add_new_vm(
  543. "QubesTemplateVm",
  544. name=self.make_vm_name("pv-clone"),
  545. )
  546. self.test_template.clone_attrs(src_vm=self.qc.get_default_template())
  547. self.test_template.clone_disk_files(
  548. src_vm=self.qc.get_default_template(),
  549. verbose=False)
  550. self.save_and_reload_db()
  551. self.qc.unlock_db()
  552. def setup_hvm_template(self):
  553. self.test_template = self.qc.add_new_vm(
  554. "QubesTemplateHVm",
  555. name=self.make_vm_name("hvm"),
  556. )
  557. self.test_template.create_on_disk(verbose=False)
  558. self.save_and_reload_db()
  559. self.qc.unlock_db()
  560. def get_rootimg_checksum(self):
  561. p = subprocess.Popen(['sha1sum', self.test_template.root_img],
  562. stdout=subprocess.PIPE)
  563. return p.communicate()[0]
  564. def _do_test(self):
  565. checksum_before = self.get_rootimg_checksum()
  566. self.test_template.start(verbose=False)
  567. self.shutdown_and_wait(self.test_template)
  568. checksum_changed = self.get_rootimg_checksum()
  569. if checksum_before == checksum_changed:
  570. self.log.warning("template not modified, test result will be "
  571. "unreliable")
  572. with self.assertNotRaises(subprocess.CalledProcessError):
  573. subprocess.check_call(['sudo', 'qvm-revert-template-changes',
  574. '--force', self.test_template.name])
  575. checksum_after = self.get_rootimg_checksum()
  576. self.assertEquals(checksum_before, checksum_after)
  577. def test_000_revert_pv(self):
  578. """
  579. Test qvm-revert-template-changes for PV template
  580. """
  581. self.setup_pv_template()
  582. self._do_test()
  583. def test_000_revert_hvm(self):
  584. """
  585. Test qvm-revert-template-changes for HVM template
  586. """
  587. # TODO: have some system there, so the root.img will get modified
  588. self.setup_hvm_template()
  589. self._do_test()
  590. class TC_04_DispVM(qubes.tests.SystemTestsMixin,
  591. qubes.tests.QubesTestCase):
  592. @staticmethod
  593. def get_dispvm_template_name():
  594. vmdir = os.readlink('/var/lib/qubes/dvmdata/vmdir')
  595. return os.path.basename(vmdir)
  596. def test_000_firewall_propagation(self):
  597. """
  598. Check firewall propagation VM->DispVM, when VM have some firewall rules
  599. """
  600. # FIXME: currently qubes.xml doesn't contain this information...
  601. dispvm_template_name = self.get_dispvm_template_name()
  602. dispvm_template = self.qc.get_vm_by_name(dispvm_template_name)
  603. testvm1 = self.qc.add_new_vm("QubesAppVm",
  604. name=self.make_vm_name('vm1'),
  605. template=self.qc.get_default_template())
  606. testvm1.create_on_disk(verbose=False)
  607. firewall = testvm1.get_firewall_conf()
  608. firewall['allowDns'] = False
  609. firewall['allowYumProxy'] = False
  610. firewall['rules'] = [{'address': '1.2.3.4',
  611. 'netmask': 24,
  612. 'proto': 'tcp',
  613. 'portBegin': 22,
  614. 'portEnd': 22,
  615. }]
  616. testvm1.write_firewall_conf(firewall)
  617. self.qc.save()
  618. self.qc.unlock_db()
  619. testvm1.start()
  620. p = testvm1.run("qvm-run --dispvm 'qubesdb-read /name; echo ERROR;"
  621. " read x'",
  622. passio_popen=True)
  623. dispvm_name = p.stdout.readline().strip()
  624. self.qc.lock_db_for_reading()
  625. self.qc.load()
  626. self.qc.unlock_db()
  627. dispvm = self.qc.get_vm_by_name(dispvm_name)
  628. self.assertIsNotNone(dispvm, "DispVM {} not found in qubes.xml".format(
  629. dispvm_name))
  630. # check if firewall was propagated to the DispVM
  631. self.assertEquals(testvm1.get_firewall_conf(),
  632. dispvm.get_firewall_conf())
  633. # and only there (#1608)
  634. self.assertNotEquals(dispvm_template.get_firewall_conf(),
  635. dispvm.get_firewall_conf())
  636. # then modify some rule
  637. firewall = dispvm.get_firewall_conf()
  638. firewall['rules'] = [{'address': '4.3.2.1',
  639. 'netmask': 24,
  640. 'proto': 'tcp',
  641. 'portBegin': 22,
  642. 'portEnd': 22,
  643. }]
  644. dispvm.write_firewall_conf(firewall)
  645. # and check again if wasn't saved anywhere else (#1608)
  646. self.assertNotEquals(dispvm_template.get_firewall_conf(),
  647. dispvm.get_firewall_conf())
  648. self.assertNotEquals(testvm1.get_firewall_conf(),
  649. dispvm.get_firewall_conf())
  650. p.stdin.write('\n')
  651. p.wait()
  652. def test_001_firewall_propagation(self):
  653. """
  654. Check firewall propagation VM->DispVM, when VM have no firewall rules
  655. """
  656. testvm1 = self.qc.add_new_vm("QubesAppVm",
  657. name=self.make_vm_name('vm1'),
  658. template=self.qc.get_default_template())
  659. testvm1.create_on_disk(verbose=False)
  660. self.qc.save()
  661. self.qc.unlock_db()
  662. # FIXME: currently qubes.xml doesn't contain this information...
  663. dispvm_template_name = self.get_dispvm_template_name()
  664. dispvm_template = self.qc.get_vm_by_name(dispvm_template_name)
  665. original_firewall = None
  666. if os.path.exists(dispvm_template.firewall_conf):
  667. original_firewall = tempfile.TemporaryFile()
  668. with open(dispvm_template.firewall_conf) as f:
  669. original_firewall.write(f.read())
  670. try:
  671. firewall = dispvm_template.get_firewall_conf()
  672. firewall['allowDns'] = False
  673. firewall['allowYumProxy'] = False
  674. firewall['rules'] = [{'address': '1.2.3.4',
  675. 'netmask': 24,
  676. 'proto': 'tcp',
  677. 'portBegin': 22,
  678. 'portEnd': 22,
  679. }]
  680. dispvm_template.write_firewall_conf(firewall)
  681. testvm1.start()
  682. p = testvm1.run("qvm-run --dispvm 'qubesdb-read /name; echo ERROR;"
  683. " read x'",
  684. passio_popen=True)
  685. dispvm_name = p.stdout.readline().strip()
  686. self.qc.lock_db_for_reading()
  687. self.qc.load()
  688. self.qc.unlock_db()
  689. dispvm = self.qc.get_vm_by_name(dispvm_name)
  690. self.assertIsNotNone(dispvm, "DispVM {} not found in qubes.xml".format(
  691. dispvm_name))
  692. # check if firewall was propagated to the DispVM from the right VM
  693. self.assertEquals(testvm1.get_firewall_conf(),
  694. dispvm.get_firewall_conf())
  695. # and only there (#1608)
  696. self.assertNotEquals(dispvm_template.get_firewall_conf(),
  697. dispvm.get_firewall_conf())
  698. # then modify some rule
  699. firewall = dispvm.get_firewall_conf()
  700. firewall['rules'] = [{'address': '4.3.2.1',
  701. 'netmask': 24,
  702. 'proto': 'tcp',
  703. 'portBegin': 22,
  704. 'portEnd': 22,
  705. }]
  706. dispvm.write_firewall_conf(firewall)
  707. # and check again if wasn't saved anywhere else (#1608)
  708. self.assertNotEquals(dispvm_template.get_firewall_conf(),
  709. dispvm.get_firewall_conf())
  710. self.assertNotEquals(testvm1.get_firewall_conf(),
  711. dispvm.get_firewall_conf())
  712. p.stdin.write('\n')
  713. p.wait()
  714. finally:
  715. if original_firewall:
  716. original_firewall.seek(0)
  717. with open(dispvm_template.firewall_conf, 'w') as f:
  718. f.write(original_firewall.read())
  719. original_firewall.close()
  720. else:
  721. os.unlink(dispvm_template.firewall_conf)
  722. def test_002_cleanup(self):
  723. self.qc.unlock_db()
  724. p = subprocess.Popen(['/usr/lib/qubes/qfile-daemon-dvm',
  725. 'qubes.VMShell', 'dom0', 'DEFAULT'],
  726. stdin=subprocess.PIPE,
  727. stdout=subprocess.PIPE,
  728. stderr=open(os.devnull, 'w'))
  729. (stdout, _) = p.communicate(input="echo test; qubesdb-read /name; "
  730. "echo ERROR\n")
  731. self.assertEquals(p.returncode, 0)
  732. lines = stdout.splitlines()
  733. self.assertEqual(lines[0], "test")
  734. dispvm_name = lines[1]
  735. self.qc.lock_db_for_reading()
  736. self.qc.load()
  737. self.qc.unlock_db()
  738. dispvm = self.qc.get_vm_by_name(dispvm_name)
  739. self.assertIsNone(dispvm, "DispVM {} still exists in qubes.xml".format(
  740. dispvm_name))
  741. def test_003_cleanup_destroyed(self):
  742. """
  743. Check if DispVM is properly removed even if it terminated itself (#1660)
  744. :return:
  745. """
  746. self.qc.unlock_db()
  747. p = subprocess.Popen(['/usr/lib/qubes/qfile-daemon-dvm',
  748. 'qubes.VMShell', 'dom0', 'DEFAULT'],
  749. stdin=subprocess.PIPE,
  750. stdout=subprocess.PIPE,
  751. stderr=open(os.devnull, 'w'))
  752. p.stdin.write("qubesdb-read /name\n")
  753. p.stdin.write("echo ERROR\n")
  754. p.stdin.write("poweroff\n")
  755. # do not close p.stdin on purpose - wait to automatic disconnect when
  756. # domain is destroyed
  757. timeout = 30
  758. while timeout > 0:
  759. if p.poll():
  760. break
  761. time.sleep(1)
  762. timeout -= 1
  763. # includes check for None - timeout
  764. self.assertEquals(p.returncode, 0)
  765. lines = p.stdout.read().splitlines()
  766. dispvm_name = lines[0]
  767. self.assertNotEquals(dispvm_name, "ERROR")
  768. self.qc.lock_db_for_reading()
  769. self.qc.load()
  770. self.qc.unlock_db()
  771. dispvm = self.qc.get_vm_by_name(dispvm_name)
  772. self.assertIsNone(dispvm, "DispVM {} still exists in qubes.xml".format(
  773. dispvm_name))
  774. # vim: ts=4 sw=4 et