basic.py 32 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769
  1. # pylint: disable=invalid-name
  2. #
  3. # The Qubes OS Project, https://www.qubes-os.org/
  4. #
  5. # Copyright (C) 2014-2015
  6. # Marek Marczykowski-Górecki <marmarek@invisiblethingslab.com>
  7. # Copyright (C) 2015 Wojtek Porczyk <woju@invisiblethingslab.com>
  8. #
  9. # This library is free software; you can redistribute it and/or
  10. # modify it under the terms of the GNU Lesser General Public
  11. # License as published by the Free Software Foundation; either
  12. # version 2.1 of the License, or (at your option) any later version.
  13. #
  14. # This library is distributed in the hope that it will be useful,
  15. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  16. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  17. # Lesser General Public License for more details.
  18. #
  19. # You should have received a copy of the GNU Lesser General Public
  20. # License along with this library; if not, see <https://www.gnu.org/licenses/>.
  21. #
  22. from distutils import spawn
  23. import asyncio
  24. import os
  25. import subprocess
  26. import tempfile
  27. import time
  28. import unittest
  29. import collections
  30. import pkg_resources
  31. import shutil
  32. import sys
  33. import qubes
  34. import qubes.firewall
  35. import qubes.tests
  36. import qubes.storage
  37. import qubes.vm.appvm
  38. import qubes.vm.qubesvm
  39. import qubes.vm.standalonevm
  40. import qubes.vm.templatevm
  41. import libvirt # pylint: disable=import-error
  42. class TC_00_Basic(qubes.tests.SystemTestCase):
  43. def setUp(self):
  44. super(TC_00_Basic, self).setUp()
  45. self.init_default_template()
  46. def test_000_qubes_create(self):
  47. self.assertIsInstance(self.app, qubes.Qubes)
  48. def test_100_qvm_create(self):
  49. vmname = self.make_vm_name('appvm')
  50. vm = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  51. name=vmname, template=self.app.default_template,
  52. label='red')
  53. self.assertIsNotNone(vm)
  54. self.assertEqual(vm.name, vmname)
  55. self.assertEqual(vm.template, self.app.default_template)
  56. self.loop.run_until_complete(vm.create_on_disk())
  57. with self.assertNotRaises(qubes.exc.QubesException):
  58. self.loop.run_until_complete(vm.storage.verify())
  59. def test_040_qdb_watch(self):
  60. flag = set()
  61. def handler(vm, event, path):
  62. if path == '/test-watch-path':
  63. flag.add(True)
  64. vm = self.app.domains[0]
  65. vm.watch_qdb_path('/test-watch-path')
  66. vm.add_handler('domain-qdb-change:/test-watch-path', handler)
  67. self.assertFalse(flag)
  68. vm.untrusted_qdb.write('/test-watch-path', 'test-value')
  69. self.loop.run_until_complete(asyncio.sleep(0.1))
  70. self.assertTrue(flag)
  71. @unittest.skipUnless(
  72. spawn.find_executable('xdotool'), "xdotool not installed")
  73. def test_120_start_standalone_with_cdrom_dom0(self):
  74. vmname = self.make_vm_name('appvm')
  75. self.vm = self.app.add_new_vm('StandaloneVM', label='red', name=vmname)
  76. self.loop.run_until_complete(self.vm.create_on_disk())
  77. self.vm.kernel = None
  78. self.vm.virt_mode = 'hvm'
  79. iso_path = self.create_bootable_iso()
  80. # start the VM using qvm-start tool, to test --cdrom option there
  81. p = self.loop.run_until_complete(asyncio.create_subprocess_exec(
  82. 'qvm-start', '--cdrom=dom0:' + iso_path, self.vm.name,
  83. stdout=subprocess.PIPE, stderr=subprocess.STDOUT))
  84. (stdout, _) = self.loop.run_until_complete(p.communicate())
  85. self.assertEqual(p.returncode, 0, stdout)
  86. # check if VM do not crash instantly
  87. self.loop.run_until_complete(asyncio.sleep(5))
  88. self.assertTrue(self.vm.is_running())
  89. # Type 'poweroff'
  90. subprocess.check_call(['xdotool', 'search', '--name', self.vm.name,
  91. 'type', '--window', '%1', 'poweroff\r'])
  92. for _ in range(10):
  93. if not self.vm.is_running():
  94. break
  95. self.loop.run_until_complete(asyncio.sleep(1))
  96. self.assertFalse(self.vm.is_running())
  97. def test_130_autostart_disable_on_remove(self):
  98. vm = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  99. name=self.make_vm_name('vm'),
  100. template=self.app.default_template,
  101. label='red')
  102. self.assertIsNotNone(vm)
  103. self.loop.run_until_complete(vm.create_on_disk())
  104. vm.autostart = True
  105. self.assertTrue(os.path.exists(
  106. '/etc/systemd/system/multi-user.target.wants/'
  107. 'qubes-vm@{}.service'.format(vm.name)),
  108. "systemd service not enabled by autostart=True")
  109. del self.app.domains[vm]
  110. self.loop.run_until_complete(vm.remove_from_disk())
  111. self.assertFalse(os.path.exists(
  112. '/etc/systemd/system/multi-user.target.wants/'
  113. 'qubes-vm@{}.service'.format(vm.name)),
  114. "systemd service not disabled on domain remove")
  115. def _test_200_on_domain_start(self, vm, event, **_kwargs):
  116. '''Simulate domain crash just after startup'''
  117. vm.libvirt_domain.destroy()
  118. def test_200_shutdown_event_race(self):
  119. '''Regression test for 3164'''
  120. vmname = self.make_vm_name('appvm')
  121. self.vm = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  122. name=vmname, template=self.app.default_template,
  123. label='red')
  124. # help the luck a little - don't wait for qrexec to easier win the race
  125. self.vm.features['qrexec'] = False
  126. self.loop.run_until_complete(self.vm.create_on_disk())
  127. # another way to help the luck a little - make sure the private
  128. # volume is first in (normally unordered) dict - this way if any
  129. # volume action fails, it will be at or after private volume - not
  130. # before (preventing private volume action)
  131. old_volumes = self.vm.volumes
  132. self.vm.volumes = collections.OrderedDict()
  133. self.vm.volumes['private'] = old_volumes.pop('private')
  134. self.vm.volumes.update(old_volumes.items())
  135. del old_volumes
  136. self.loop.run_until_complete(self.vm.start())
  137. # kill it the way it does not give a chance for domain-shutdown it
  138. # execute
  139. self.vm.libvirt_domain.destroy()
  140. # now, lets try to start the VM again, before domain-shutdown event
  141. # got handled (#3164), and immediately trigger second domain-shutdown
  142. self.vm.add_handler('domain-start', self._test_200_on_domain_start)
  143. self.loop.run_until_complete(self.vm.start())
  144. # and give a chance for both domain-shutdown handlers to execute
  145. self.loop.run_until_complete(asyncio.sleep(1))
  146. with self.assertNotRaises(qubes.exc.QubesException):
  147. # if the above caused two domain-shutdown handlers being called
  148. # one after another, private volume is gone
  149. self.loop.run_until_complete(self.vm.storage.verify())
  150. def _test_201_on_domain_pre_start(self, vm, event, **_kwargs):
  151. '''Simulate domain crash just after startup'''
  152. if not self.domain_shutdown_handled and not self.test_failure_reason:
  153. self.test_failure_reason = \
  154. 'domain-shutdown event was not dispatched before subsequent ' \
  155. 'start'
  156. self.domain_shutdown_handled = False
  157. def _test_201_domain_shutdown_handler(self, vm, event, **kwargs):
  158. if self.domain_shutdown_handled and not self.test_failure_reason:
  159. self.test_failure_reason = 'domain-shutdown event received twice'
  160. self.domain_shutdown_handled = True
  161. def test_201_shutdown_event_race(self):
  162. '''Regression test for 3164 - pure events edition'''
  163. vmname = self.make_vm_name('appvm')
  164. self.vm = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  165. name=vmname, template=self.app.default_template,
  166. label='red')
  167. # help the luck a little - don't wait for qrexec to easier win the race
  168. self.vm.features['qrexec'] = False
  169. self.loop.run_until_complete(self.vm.create_on_disk())
  170. # do not throw exception from inside event handler - test framework
  171. # will not recover from it (various objects leaks)
  172. self.test_failure_reason = None
  173. self.domain_shutdown_handled = False
  174. self.vm.add_handler('domain-shutdown',
  175. self._test_201_domain_shutdown_handler)
  176. self.loop.run_until_complete(self.vm.start())
  177. if self.test_failure_reason:
  178. self.fail(self.test_failure_reason)
  179. self.vm.add_handler('domain-pre-start',
  180. self._test_201_on_domain_pre_start)
  181. # kill it the way it does not give a chance for domain-shutdown it
  182. # execute
  183. self.vm.libvirt_domain.destroy()
  184. # now, lets try to start the VM again, before domain-shutdown event
  185. # got handled (#3164), and immediately trigger second domain-shutdown
  186. self.vm.add_handler('domain-start', self._test_200_on_domain_start)
  187. self.loop.run_until_complete(self.vm.start())
  188. if self.test_failure_reason:
  189. self.fail(self.test_failure_reason)
  190. while self.vm.get_power_state() != 'Halted':
  191. self.loop.run_until_complete(asyncio.sleep(1))
  192. # and give a chance for both domain-shutdown handlers to execute
  193. self.loop.run_until_complete(asyncio.sleep(3))
  194. # wait for running shutdown handler to complete
  195. self.loop.run_until_complete(self.vm._domain_stopped_lock.acquire())
  196. self.vm._domain_stopped_lock.release()
  197. if self.test_failure_reason:
  198. self.fail(self.test_failure_reason)
  199. self.assertTrue(self.domain_shutdown_handled,
  200. 'second domain-shutdown event was not dispatched after domain '
  201. 'shutdown')
  202. def _check_udev_for_uuid(self, uuid_value):
  203. udev_data_path = '/run/udev/data'
  204. for udev_item in os.listdir(udev_data_path):
  205. # check only block devices
  206. if not udev_item.startswith('b'):
  207. continue
  208. with open(os.path.join(udev_data_path, udev_item)) as udev_file:
  209. self.assertNotIn(uuid_value, udev_file.read(),
  210. 'udev parsed filesystem UUID! ' + udev_item)
  211. def assertVolumesExcludedFromUdev(self, vm):
  212. try:
  213. # first boot, mkfs private volume
  214. self.loop.run_until_complete(vm.start())
  215. self.loop.run_until_complete(self.wait_for_session(vm))
  216. # get private volume UUID
  217. private_uuid, _ = self.loop.run_until_complete(
  218. vm.run_for_stdio('blkid -o value /dev/xvdb', user='root'))
  219. private_uuid = private_uuid.decode().splitlines()[0]
  220. # now check if dom0 udev know about it - it shouldn't
  221. self._check_udev_for_uuid(private_uuid)
  222. # now restart the VM and check again
  223. self.loop.run_until_complete(vm.shutdown(wait=True))
  224. self.loop.run_until_complete(vm.start())
  225. self._check_udev_for_uuid(private_uuid)
  226. finally:
  227. del vm
  228. def _test_140_on_domain_paused(self, vm, event, **kwargs):
  229. self.domain_paused_received = True
  230. def test_140_libvirt_events_reconnect(self):
  231. vmname = self.make_vm_name('vm')
  232. self.vm = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  233. name=vmname, template=self.app.default_template,
  234. label='red')
  235. self.loop.run_until_complete(self.vm.create_on_disk())
  236. self.loop.run_until_complete(self.vm.start())
  237. p = self.loop.run_until_complete(asyncio.create_subprocess_exec(
  238. 'systemctl', 'restart', 'libvirtd'))
  239. self.loop.run_until_complete(p.communicate())
  240. # check if events still works
  241. self.domain_paused_received = False
  242. self.vm.add_handler('domain-paused', self._test_140_on_domain_paused)
  243. self.loop.run_until_complete(self.vm.pause())
  244. self.loop.run_until_complete(self.vm.kill())
  245. self.loop.run_until_complete(asyncio.sleep(1))
  246. self.assertTrue(self.domain_paused_received,
  247. 'event not received after libvirt restart')
  248. def test_141_libvirt_objects_reconnect(self):
  249. vmname = self.make_vm_name('vm')
  250. # make sure libvirt object is cached
  251. self.app.domains[0].libvirt_domain.isActive()
  252. p = self.loop.run_until_complete(asyncio.create_subprocess_exec(
  253. 'systemctl', 'restart', 'libvirtd'))
  254. self.loop.run_until_complete(p.communicate())
  255. # trigger reconnect
  256. with self.assertNotRaises(libvirt.libvirtError):
  257. self.app.vmm.libvirt_conn.getHostname()
  258. # check if vm object still works
  259. with self.assertNotRaises(libvirt.libvirtError):
  260. self.app.domains[0].libvirt_domain.isActive()
  261. def test_202_udev_block_exclude_default(self):
  262. '''Check if VM images are excluded from udev parsing -
  263. default volume pool'''
  264. vmname = self.make_vm_name('appvm')
  265. self.vm = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  266. name=vmname, template=self.app.default_template,
  267. label='red')
  268. self.loop.run_until_complete(self.vm.create_on_disk())
  269. self.assertVolumesExcludedFromUdev(self.vm)
  270. def test_203_udev_block_exclude_varlibqubes(self):
  271. '''Check if VM images are excluded from udev parsing -
  272. varlibqubes pool'''
  273. vmname = self.make_vm_name('appvm')
  274. self.vm = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  275. name=vmname, template=self.app.default_template,
  276. label='red')
  277. self.loop.run_until_complete(self.vm.create_on_disk(
  278. pool=self.app.pools['varlibqubes']))
  279. self.assertVolumesExcludedFromUdev(self.vm)
  280. def test_204_udev_block_exclude_custom_file(self):
  281. '''Check if VM images are excluded from udev parsing -
  282. custom file pool'''
  283. vmname = self.make_vm_name('appvm')
  284. pool_path = tempfile.mkdtemp(
  285. prefix='qubes-pool-', dir='/var/tmp')
  286. self.addCleanup(shutil.rmtree, pool_path)
  287. pool = self.loop.run_until_complete(
  288. self.app.add_pool('test-filep', dir_path=pool_path, driver='file'))
  289. self.vm = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  290. name=vmname, template=self.app.default_template,
  291. label='red')
  292. self.loop.run_until_complete(self.vm.create_on_disk(
  293. pool=pool))
  294. self.assertVolumesExcludedFromUdev(self.vm)
  295. class TC_01_Properties(qubes.tests.SystemTestCase):
  296. # pylint: disable=attribute-defined-outside-init
  297. def setUp(self):
  298. super(TC_01_Properties, self).setUp()
  299. self.init_default_template()
  300. self.vmname = self.make_vm_name('appvm')
  301. self.vm = self.app.add_new_vm(qubes.vm.appvm.AppVM, name=self.vmname,
  302. template=self.app.default_template,
  303. label='red')
  304. self.loop.run_until_complete(self.vm.create_on_disk())
  305. self.addCleanup(self.cleanup_props)
  306. def cleanup_props(self):
  307. del self.vm
  308. def test_030_clone(self):
  309. try:
  310. testvm1 = self.app.add_new_vm(
  311. qubes.vm.appvm.AppVM,
  312. name=self.make_vm_name("vm"),
  313. template=self.app.default_template,
  314. label='red')
  315. self.loop.run_until_complete(testvm1.create_on_disk())
  316. testvm2 = self.app.add_new_vm(testvm1.__class__,
  317. name=self.make_vm_name("clone"),
  318. template=testvm1.template,
  319. label='red')
  320. testvm2.clone_properties(testvm1)
  321. testvm2.firewall.clone(testvm1.firewall)
  322. self.loop.run_until_complete(testvm2.clone_disk_files(testvm1))
  323. self.assertTrue(self.loop.run_until_complete(testvm1.storage.verify()))
  324. self.assertIn('source', testvm1.volumes['root'].config)
  325. self.assertNotEquals(testvm2, None)
  326. self.assertNotEquals(testvm2.volumes, {})
  327. self.assertIn('source', testvm2.volumes['root'].config)
  328. # qubes.xml reload
  329. self.app.save()
  330. testvm1 = self.app.domains[testvm1.qid]
  331. testvm2 = self.app.domains[testvm2.qid]
  332. self.assertEqual(testvm1.label, testvm2.label)
  333. self.assertEqual(testvm1.netvm, testvm2.netvm)
  334. self.assertEqual(testvm1.property_is_default('netvm'),
  335. testvm2.property_is_default('netvm'))
  336. self.assertEqual(testvm1.kernel, testvm2.kernel)
  337. self.assertEqual(testvm1.kernelopts, testvm2.kernelopts)
  338. self.assertEqual(testvm1.property_is_default('kernel'),
  339. testvm2.property_is_default('kernel'))
  340. self.assertEqual(testvm1.property_is_default('kernelopts'),
  341. testvm2.property_is_default('kernelopts'))
  342. self.assertEqual(testvm1.memory, testvm2.memory)
  343. self.assertEqual(testvm1.maxmem, testvm2.maxmem)
  344. self.assertEqual(testvm1.devices, testvm2.devices)
  345. self.assertEqual(testvm1.include_in_backups,
  346. testvm2.include_in_backups)
  347. self.assertEqual(testvm1.default_user, testvm2.default_user)
  348. self.assertEqual(testvm1.features, testvm2.features)
  349. self.assertEqual(testvm1.firewall.rules,
  350. testvm2.firewall.rules)
  351. # now some non-default values
  352. testvm1.netvm = None
  353. testvm1.label = 'orange'
  354. testvm1.memory = 512
  355. firewall = testvm1.firewall
  356. firewall.rules = [
  357. qubes.firewall.Rule(None, action='accept', dsthost='1.2.3.0/24',
  358. proto='tcp', dstports=22)]
  359. firewall.save()
  360. testvm3 = self.app.add_new_vm(testvm1.__class__,
  361. name=self.make_vm_name("clone2"),
  362. template=testvm1.template,
  363. label='red',)
  364. testvm3.clone_properties(testvm1)
  365. testvm3.firewall.clone(testvm1.firewall)
  366. self.loop.run_until_complete(testvm3.clone_disk_files(testvm1))
  367. # qubes.xml reload
  368. self.app.save()
  369. testvm1 = self.app.domains[testvm1.qid]
  370. testvm3 = self.app.domains[testvm3.qid]
  371. self.assertEqual(testvm1.label, testvm3.label)
  372. self.assertEqual(testvm1.netvm, testvm3.netvm)
  373. self.assertEqual(testvm1.property_is_default('netvm'),
  374. testvm3.property_is_default('netvm'))
  375. self.assertEqual(testvm1.kernel, testvm3.kernel)
  376. self.assertEqual(testvm1.kernelopts, testvm3.kernelopts)
  377. self.assertEqual(testvm1.property_is_default('kernel'),
  378. testvm3.property_is_default('kernel'))
  379. self.assertEqual(testvm1.property_is_default('kernelopts'),
  380. testvm3.property_is_default('kernelopts'))
  381. self.assertEqual(testvm1.memory, testvm3.memory)
  382. self.assertEqual(testvm1.maxmem, testvm3.maxmem)
  383. self.assertEqual(testvm1.devices, testvm3.devices)
  384. self.assertEqual(testvm1.include_in_backups,
  385. testvm3.include_in_backups)
  386. self.assertEqual(testvm1.default_user, testvm3.default_user)
  387. self.assertEqual(testvm1.features, testvm3.features)
  388. self.assertEqual(testvm1.firewall.rules,
  389. testvm3.firewall.rules)
  390. finally:
  391. try:
  392. del firewall
  393. except NameError:
  394. pass
  395. try:
  396. del testvm1
  397. except NameError:
  398. pass
  399. try:
  400. del testvm2
  401. except NameError:
  402. pass
  403. try:
  404. del testvm3
  405. except NameError:
  406. pass
  407. def test_020_name_conflict_app(self):
  408. # TODO decide what exception should be here
  409. with self.assertRaises((qubes.exc.QubesException, ValueError)):
  410. self.vm2 = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  411. name=self.vmname, template=self.app.default_template,
  412. label='red')
  413. self.loop.run_until_complete(self.vm2.create_on_disk())
  414. def test_021_name_conflict_template(self):
  415. # TODO decide what exception should be here
  416. with self.assertRaises((qubes.exc.QubesException, ValueError)):
  417. self.vm2 = self.app.add_new_vm(qubes.vm.templatevm.TemplateVM,
  418. name=self.vmname, label='red')
  419. self.loop.run_until_complete(self.vm2.create_on_disk())
  420. class TC_03_QvmRevertTemplateChanges(qubes.tests.SystemTestCase):
  421. # pylint: disable=attribute-defined-outside-init
  422. def setUp(self):
  423. super(TC_03_QvmRevertTemplateChanges, self).setUp()
  424. self.init_default_template()
  425. def cleanup_template(self):
  426. del self.test_template
  427. def setup_template(self):
  428. self.test_template = self.app.add_new_vm(
  429. qubes.vm.templatevm.TemplateVM,
  430. name=self.make_vm_name("pv-clone"),
  431. label='red'
  432. )
  433. self.addCleanup(self.cleanup_template)
  434. self.test_template.clone_properties(self.app.default_template)
  435. self.test_template.features.update(self.app.default_template.features)
  436. self.test_template.tags.update(self.app.default_template.tags)
  437. self.loop.run_until_complete(
  438. self.test_template.clone_disk_files(self.app.default_template))
  439. self.test_template.volumes['root'].revisions_to_keep = 3
  440. self.app.save()
  441. def get_rootimg_checksum(self):
  442. path = self.loop.run_until_complete(
  443. self.test_template.storage.export('root'))
  444. try:
  445. return subprocess.check_output(['sha1sum', path]).\
  446. decode().split(' ')[0]
  447. finally:
  448. self.loop.run_until_complete(
  449. self.test_template.storage.export_end('root', path))
  450. def _do_test(self):
  451. checksum_before = self.get_rootimg_checksum()
  452. self.loop.run_until_complete(self.test_template.start())
  453. self.loop.run_until_complete(self.wait_for_session(self.test_template))
  454. self.shutdown_and_wait(self.test_template)
  455. checksum_changed = self.get_rootimg_checksum()
  456. if checksum_before == checksum_changed:
  457. self.log.warning("template not modified, test result will be "
  458. "unreliable")
  459. self.assertNotEqual(self.test_template.volumes['root'].revisions, {})
  460. revert_cmd = ['qvm-volume', 'revert', self.test_template.name + ':root']
  461. p = self.loop.run_until_complete(asyncio.create_subprocess_exec(
  462. *revert_cmd))
  463. self.loop.run_until_complete(p.wait())
  464. self.assertEqual(p.returncode, 0)
  465. del p
  466. checksum_after = self.get_rootimg_checksum()
  467. self.assertEqual(checksum_before, checksum_after)
  468. def test_000_revert_linux(self):
  469. """
  470. Test qvm-revert-template-changes for PV template
  471. """
  472. self.setup_template()
  473. self._do_test()
  474. @unittest.skip('TODO: some non-linux system')
  475. def test_001_revert_non_linux(self):
  476. """
  477. Test qvm-revert-template-changes for HVM template
  478. """
  479. # TODO: have some system there, so the root.img will get modified
  480. self.setup_template()
  481. self._do_test()
  482. class TC_30_Gui_daemon(qubes.tests.SystemTestCase):
  483. def setUp(self):
  484. super(TC_30_Gui_daemon, self).setUp()
  485. self.init_default_template()
  486. @unittest.skipUnless(
  487. spawn.find_executable('xdotool'),
  488. "xdotool not installed")
  489. def test_000_clipboard(self):
  490. testvm1 = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  491. name=self.make_vm_name('vm1'), label='red')
  492. self.loop.run_until_complete(testvm1.create_on_disk())
  493. testvm2 = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  494. name=self.make_vm_name('vm2'), label='red')
  495. self.loop.run_until_complete(testvm2.create_on_disk())
  496. self.app.save()
  497. self.loop.run_until_complete(asyncio.wait([
  498. testvm1.start(),
  499. testvm2.start()]))
  500. self.loop.run_until_complete(asyncio.wait([
  501. self.wait_for_session(testvm1),
  502. self.wait_for_session(testvm2)]))
  503. window_title = 'user@{}'.format(testvm1.name)
  504. self.loop.run_until_complete(testvm1.run(
  505. 'zenity --text-info --editable --title={}'.format(window_title)))
  506. self.wait_for_window(window_title)
  507. time.sleep(0.5)
  508. test_string = "test{}".format(testvm1.xid)
  509. # Type and copy some text
  510. subprocess.check_call(['xdotool', 'search', '--name', window_title,
  511. 'windowactivate', '--sync',
  512. 'type', test_string])
  513. # second xdotool call because type --terminator do not work (SEGV)
  514. # additionally do not use search here, so window stack will be empty
  515. # and xdotool will use XTEST instead of generating events manually -
  516. # this will be much better - at least because events will have
  517. # correct timestamp (so gui-daemon would not drop the copy request)
  518. subprocess.check_call(['xdotool',
  519. 'key', 'ctrl+a', 'ctrl+c', 'ctrl+shift+c',
  520. 'Escape'])
  521. clipboard_content = \
  522. open('/var/run/qubes/qubes-clipboard.bin', 'r').read().strip()
  523. self.assertEqual(clipboard_content, test_string,
  524. "Clipboard copy operation failed - content")
  525. clipboard_source = \
  526. open('/var/run/qubes/qubes-clipboard.bin.source',
  527. 'r').read().strip()
  528. self.assertEqual(clipboard_source, testvm1.name,
  529. "Clipboard copy operation failed - owner")
  530. # Then paste it to the other window
  531. window_title = 'user@{}'.format(testvm2.name)
  532. p = self.loop.run_until_complete(testvm2.run(
  533. 'zenity --entry --title={} > /tmp/test.txt'.format(window_title)))
  534. self.wait_for_window(window_title)
  535. subprocess.check_call(['xdotool', 'key', '--delay', '100',
  536. 'ctrl+shift+v', 'ctrl+v', 'Return'])
  537. self.loop.run_until_complete(p.wait())
  538. # And compare the result
  539. (test_output, _) = self.loop.run_until_complete(
  540. testvm2.run_for_stdio('cat /tmp/test.txt'))
  541. self.assertEqual(test_string, test_output.strip().decode('ascii'))
  542. clipboard_content = \
  543. open('/var/run/qubes/qubes-clipboard.bin', 'r').read().strip()
  544. self.assertEqual(clipboard_content, "",
  545. "Clipboard not wiped after paste - content")
  546. clipboard_source = \
  547. open('/var/run/qubes/qubes-clipboard.bin.source', 'r').\
  548. read().strip()
  549. self.assertEqual(clipboard_source, "",
  550. "Clipboard not wiped after paste - owner")
  551. class TC_05_StandaloneVMMixin(object):
  552. def setUp(self):
  553. super(TC_05_StandaloneVMMixin, self).setUp()
  554. self.init_default_template(self.template)
  555. def test_000_create_start(self):
  556. self.testvm1 = self.app.add_new_vm(qubes.vm.standalonevm.StandaloneVM,
  557. name=self.make_vm_name('vm1'), label='red')
  558. self.testvm1.features.update(self.app.default_template.features)
  559. self.loop.run_until_complete(
  560. self.testvm1.clone_disk_files(self.app.default_template))
  561. self.app.save()
  562. self.loop.run_until_complete(self.testvm1.start())
  563. self.assertEqual(self.testvm1.get_power_state(), "Running")
  564. def test_100_resize_root_img(self):
  565. self.testvm1 = self.app.add_new_vm(qubes.vm.standalonevm.StandaloneVM,
  566. name=self.make_vm_name('vm1'), label='red')
  567. self.testvm1.features.update(self.app.default_template.features)
  568. self.loop.run_until_complete(
  569. self.testvm1.clone_disk_files(self.app.default_template))
  570. self.app.save()
  571. try:
  572. self.loop.run_until_complete(
  573. self.testvm1.storage.resize(self.testvm1.volumes['root'],
  574. 20 * 1024 ** 3))
  575. except (subprocess.CalledProcessError,
  576. qubes.storage.StoragePoolException) as e:
  577. # exception object would leak VM reference
  578. self.fail(str(e))
  579. self.assertEqual(self.testvm1.volumes['root'].size, 20 * 1024 ** 3)
  580. self.loop.run_until_complete(self.testvm1.start())
  581. # new_size in 1k-blocks
  582. (new_size, _) = self.loop.run_until_complete(
  583. self.testvm1.run_for_stdio('df --output=size /|tail -n 1'))
  584. # some safety margin for FS metadata
  585. self.assertGreater(int(new_size.strip()), 19 * 1024 ** 2)
  586. def test_101_resize_root_img_online(self):
  587. self.testvm1 = self.app.add_new_vm(qubes.vm.standalonevm.StandaloneVM,
  588. name=self.make_vm_name('vm1'), label='red')
  589. self.testvm1.features['qrexec'] = True
  590. self.loop.run_until_complete(
  591. self.testvm1.clone_disk_files(self.app.default_template))
  592. self.testvm1.features.update(self.app.default_template.features)
  593. self.app.save()
  594. self.loop.run_until_complete(self.testvm1.start())
  595. try:
  596. self.loop.run_until_complete(
  597. self.testvm1.storage.resize(self.testvm1.volumes['root'],
  598. 20 * 1024 ** 3))
  599. except (subprocess.CalledProcessError,
  600. qubes.storage.StoragePoolException) as e:
  601. # exception object would leak VM reference
  602. self.fail(str(e))
  603. self.assertEqual(self.testvm1.volumes['root'].size, 20 * 1024 ** 3)
  604. # new_size in 1k-blocks
  605. (new_size, _) = self.loop.run_until_complete(
  606. self.testvm1.run_for_stdio('df --output=size /|tail -n 1'))
  607. # some safety margin for FS metadata
  608. self.assertGreater(int(new_size.strip()), 19 * 1024 ** 2)
  609. class TC_06_AppVMMixin(object):
  610. template = None
  611. def setUp(self):
  612. super(TC_06_AppVMMixin, self).setUp()
  613. self.init_default_template(self.template)
  614. @unittest.skipUnless(
  615. spawn.find_executable('xdotool'), "xdotool not installed")
  616. def test_121_start_standalone_with_cdrom_vm(self):
  617. cdrom_vmname = self.make_vm_name('cdrom')
  618. self.cdrom_vm = self.app.add_new_vm('AppVM', label='red',
  619. name=cdrom_vmname)
  620. self.loop.run_until_complete(self.cdrom_vm.create_on_disk())
  621. self.loop.run_until_complete(self.cdrom_vm.start())
  622. iso_path = self.create_bootable_iso()
  623. with open(iso_path, 'rb') as iso_f:
  624. self.loop.run_until_complete(
  625. self.cdrom_vm.run_for_stdio('cat > /home/user/boot.iso',
  626. stdin=iso_f))
  627. vmname = self.make_vm_name('appvm')
  628. self.vm = self.app.add_new_vm('StandaloneVM', label='red', name=vmname)
  629. self.loop.run_until_complete(self.vm.create_on_disk())
  630. self.vm.kernel = None
  631. self.vm.virt_mode = 'hvm'
  632. # start the VM using qvm-start tool, to test --cdrom option there
  633. p = self.loop.run_until_complete(asyncio.create_subprocess_exec(
  634. 'qvm-start', '--cdrom=' + cdrom_vmname + ':/home/user/boot.iso',
  635. self.vm.name,
  636. stdout=subprocess.PIPE, stderr=subprocess.STDOUT))
  637. (stdout, _) = self.loop.run_until_complete(p.communicate())
  638. self.assertEqual(p.returncode, 0, stdout)
  639. # check if VM do not crash instantly
  640. self.loop.run_until_complete(asyncio.sleep(5))
  641. self.assertTrue(self.vm.is_running())
  642. # Type 'poweroff'
  643. subprocess.check_call(['xdotool', 'search', '--name', self.vm.name,
  644. 'type', '--window', '%1', 'poweroff\r'])
  645. for _ in range(10):
  646. if not self.vm.is_running():
  647. break
  648. self.loop.run_until_complete(asyncio.sleep(1))
  649. self.assertFalse(self.vm.is_running())
  650. def create_testcases_for_templates():
  651. yield from qubes.tests.create_testcases_for_templates('TC_05_StandaloneVM',
  652. TC_05_StandaloneVMMixin, qubes.tests.SystemTestCase,
  653. module=sys.modules[__name__])
  654. yield from qubes.tests.create_testcases_for_templates('TC_06_AppVM',
  655. TC_06_AppVMMixin, qubes.tests.SystemTestCase,
  656. module=sys.modules[__name__])
  657. def load_tests(loader, tests, pattern):
  658. tests.addTests(loader.loadTestsFromNames(
  659. create_testcases_for_templates()))
  660. return tests
  661. qubes.tests.maybe_create_testcases_on_import(create_testcases_for_templates)
  662. # vim: ts=4 sw=4 et