vm_qrexec_gui.py 69 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660
  1. #!/usr/bin/python
  2. # vim: fileencoding=utf-8
  3. #
  4. # The Qubes OS Project, https://www.qubes-os.org/
  5. #
  6. # Copyright (C) 2014-2015
  7. # Marek Marczykowski-Górecki <marmarek@invisiblethingslab.com>
  8. # Copyright (C) 2015 Wojtek Porczyk <woju@invisiblethingslab.com>
  9. #
  10. # This program is free software; you can redistribute it and/or modify
  11. # it under the terms of the GNU General Public License as published by
  12. # the Free Software Foundation; either version 2 of the License, or
  13. # (at your option) any later version.
  14. #
  15. # This program is distributed in the hope that it will be useful,
  16. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  17. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  18. # GNU General Public License for more details.
  19. #
  20. # You should have received a copy of the GNU General Public License along
  21. # with this program; if not, write to the Free Software Foundation, Inc.,
  22. # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  23. #
  24. from distutils import spawn
  25. import multiprocessing
  26. import os
  27. import subprocess
  28. import unittest
  29. import time
  30. from qubes.qubes import QubesVmCollection, defaults, QubesException
  31. import qubes.tests
  32. import re
  33. TEST_DATA = "0123456789" * 1024
  34. class TC_00_AppVMMixin(qubes.tests.SystemTestsMixin):
  def setUp(self):
      """Create the two AppVMs (``testvm1`` and ``testvm2``) used by the tests.

      Both VMs are added to ``self.qc`` with ``self.template`` as their
      template and created on disk.  The database is then saved, reloaded
      and unlocked, and both attributes are re-fetched from ``self.qc`` by
      qid.
      """
      super(TC_00_AppVMMixin, self).setUp()
      for attr, suffix in (('testvm1', 'vm1'), ('testvm2', 'vm2')):
          setattr(self, attr, self.qc.add_new_vm(
              "QubesAppVm",
              name=self.make_vm_name(suffix),
              template=self.qc.get_vm_by_name(self.template)))
          getattr(self, attr).create_on_disk(verbose=False)

      self.save_and_reload_db()
      self.qc.unlock_db()
      # Look the VMs up again by qid now that the database was reloaded.
      for attr in ('testvm1', 'testvm2'):
          setattr(self, attr, self.qc[getattr(self, attr).qid])
  def test_000_start_shutdown(self):
      """Start the VM, request shutdown, and check it ends up "Halted".

      After ``shutdown()`` the VM is polled once per second; the test fails
      if it is still running after more than
      ``defaults["shutdown_counter_max"]`` polls.
      """
      self.testvm1.start()
      # assertEqual instead of the deprecated assertEquals alias, matching
      # the other tests in this class.
      self.assertEqual(self.testvm1.get_power_state(), "Running")
      self.testvm1.shutdown()

      shutdown_counter = 0
      while self.testvm1.is_running():
          if shutdown_counter > defaults["shutdown_counter_max"]:
              self.fail("VM hanged during shutdown")
          shutdown_counter += 1
          time.sleep(1)
      # one extra second before querying the final power state
      time.sleep(1)
      self.assertEqual(self.testvm1.get_power_state(), "Halted")
  @unittest.skipUnless(spawn.find_executable('xdotool'),
                       "xdotool not installed")
  def test_010_run_xterm(self):
      """Run xterm in the VM, type ``exit`` into it, and wait for it to close.

      The window is located with ``xdotool search --name <title>``, where
      the title is ``user@<vm name>`` (or ``user@host`` when the template
      name contains "whonix").  Each wait polls up to 100 times, 0.1 s
      apart, before failing the test.
      """
      self.testvm1.start()
      self.assertEqual(self.testvm1.get_power_state(), "Running")
      self.testvm1.run("xterm")

      title = 'user@{}'.format(self.testvm1.name)
      if self.template.count("whonix"):
          title = 'user@host'
      search_cmd = ['xdotool', 'search', '--name', title]

      # Open the null device once and close it deterministically, instead of
      # leaking a new file object on every poll iteration.
      with open(os.devnull, 'w') as devnull:
          # wait for the window to show up (xdotool exits non-zero until then)
          wait_count = 0
          while subprocess.call(search_cmd, stdout=devnull,
                                stderr=subprocess.STDOUT) > 0:
              wait_count += 1
              if wait_count > 100:
                  self.fail("Timeout while waiting for xterm window")
              time.sleep(0.1)

          time.sleep(0.5)
          subprocess.check_call(
              search_cmd + ['windowactivate', 'type', 'exit\n'])

          # wait for the window to disappear again
          wait_count = 0
          while subprocess.call(search_cmd, stdout=devnull,
                                stderr=subprocess.STDOUT) == 0:
              wait_count += 1
              if wait_count > 100:
                  self.fail("Timeout while waiting for xterm "
                            "termination")
              time.sleep(0.1)
  @unittest.skipUnless(spawn.find_executable('xdotool'),
                       "xdotool not installed")
  def test_011_run_gnome_terminal(self):
      """Run gnome-terminal in the VM, type ``exit``, and wait for it to close.

      Skipped when the template name contains "minimal".  The window is
      located with ``xdotool search --name <title>``, where the title is
      ``user@<vm name>`` (or ``user@host`` when the template name contains
      "whonix").  Each wait polls up to 100 times, 0.1 s apart.
      """
      if "minimal" in self.template:
          self.skipTest("Minimal template doesn't have 'gnome-terminal'")
      self.testvm1.start()
      self.assertEqual(self.testvm1.get_power_state(), "Running")
      self.testvm1.run("gnome-terminal")

      title = 'user@{}'.format(self.testvm1.name)
      if self.template.count("whonix"):
          title = 'user@host'
      search_cmd = ['xdotool', 'search', '--name', title]

      # Open the null device once and close it deterministically, instead of
      # leaking a new file object on every poll iteration.
      with open(os.devnull, 'w') as devnull:
          # wait for the window to show up (xdotool exits non-zero until then)
          wait_count = 0
          while subprocess.call(search_cmd, stdout=devnull,
                                stderr=subprocess.STDOUT) > 0:
              wait_count += 1
              if wait_count > 100:
                  self.fail("Timeout while waiting for gnome-terminal window")
              time.sleep(0.1)

          time.sleep(0.5)
          subprocess.check_call(
              search_cmd + ['windowactivate', 'type', 'exit\n'])

          # wait for the window to disappear again
          wait_count = 0
          while subprocess.call(search_cmd, stdout=devnull,
                                stderr=subprocess.STDOUT) == 0:
              wait_count += 1
              if wait_count > 100:
                  self.fail("Timeout while waiting for gnome-terminal "
                            "termination")
              time.sleep(0.1)
  @unittest.skipUnless(spawn.find_executable('xdotool'),
                       "xdotool not installed")
  def test_012_qubes_desktop_run(self):
      """Launch xterm through ``qubes-desktop-run`` and close it again.

      The Debian-specific ``debian-xterm.desktop`` entry is used when it is
      readable in the VM, otherwise ``xterm.desktop``.  The window is located
      with ``xdotool search --name <title>``; each wait polls up to 100
      times, 0.1 s apart.
      """
      self.testvm1.start()
      self.assertEqual(self.testvm1.get_power_state(), "Running")

      xterm_desktop_path = "/usr/share/applications/xterm.desktop"
      # Debian has it different...
      xterm_desktop_path_debian = \
          "/usr/share/applications/debian-xterm.desktop"
      if self.testvm1.run("test -r {}".format(xterm_desktop_path_debian),
                          wait=True) == 0:
          xterm_desktop_path = xterm_desktop_path_debian
      self.testvm1.run("qubes-desktop-run {}".format(xterm_desktop_path))

      title = 'user@{}'.format(self.testvm1.name)
      if self.template.count("whonix"):
          title = 'user@host'
      search_cmd = ['xdotool', 'search', '--name', title]

      # Open the null device once and close it deterministically, instead of
      # leaking a new file object on every poll iteration.
      with open(os.devnull, 'w') as devnull:
          # wait for the window to show up (xdotool exits non-zero until then)
          wait_count = 0
          while subprocess.call(search_cmd, stdout=devnull,
                                stderr=subprocess.STDOUT) > 0:
              wait_count += 1
              if wait_count > 100:
                  self.fail("Timeout while waiting for xterm window")
              time.sleep(0.1)

          time.sleep(0.5)
          subprocess.check_call(
              search_cmd + ['windowactivate', 'type', 'exit\n'])

          # wait for the window to disappear again
          wait_count = 0
          while subprocess.call(search_cmd, stdout=devnull,
                                stderr=subprocess.STDOUT) == 0:
              wait_count += 1
              if wait_count > 100:
                  self.fail("Timeout while waiting for xterm "
                            "termination")
              time.sleep(0.1)
  def test_050_qrexec_simple_eof(self):
      """Test for data and EOF transmission dom0->VM"""
      status = multiprocessing.Value('i', 0)

      def worker(test, status):
          # Pipe TEST_DATA through "cat" in the VM; record 1 when the echo
          # differs, 2 (taking precedence) when anything arrived on stderr.
          proc = test.testvm1.run("cat", passio_popen=True,
                                  passio_stderr=True)
          out, err = proc.communicate(TEST_DATA)
          if len(err) > 0:
              status.value = 2
          elif out != TEST_DATA:
              status.value = 1

      self.testvm1.start()

      # Run in a separate process so a hang can be detected and killed.
      child = multiprocessing.Process(target=worker, args=(self, status))
      child.start()
      child.join(timeout=10)
      if child.is_alive():
          child.terminate()
          self.fail("Timeout, probably EOF wasn't transferred to the VM "
                    "process")

      failures = {
          1: "Received data differs from what was sent",
          2: "Some data was printed to stderr",
      }
      if status.value in failures:
          self.fail(failures[status.value])
  def test_051_qrexec_simple_eof_reverse(self):
      """Test for EOF transmission VM->dom0"""
      status = multiprocessing.Value('i', 0)

      def worker(test, status):
          proc = test.testvm1.run("echo test; exec >&-; cat > /dev/null",
                                  passio_popen=True, passio_stderr=True)
          # this will hang on test failure
          received = proc.stdout.read()
          proc.stdin.write(TEST_DATA)
          proc.stdin.close()
          if received.strip() != "test":
              status.value = 1
              return
          # this may hang in some buggy cases
          if len(proc.stderr.read()) > 0:
              status.value = 2
              return
          if proc.poll() is not None:
              return
          # give the process one more second to exit after stdin EOF
          time.sleep(1)
          if proc.poll() is None:
              status.value = 3

      self.testvm1.start()

      # Run in a separate process so a hang can be detected and killed.
      child = multiprocessing.Process(target=worker, args=(self, status))
      child.start()
      child.join(timeout=10)
      if child.is_alive():
          child.terminate()
          self.fail("Timeout, probably EOF wasn't transferred from the VM "
                    "process")

      failures = {
          1: "Received data differs from what was expected",
          2: "Some data was printed to stderr",
          3: "VM proceess didn't terminated on EOF",
      }
      if status.value in failures:
          self.fail(failures[status.value])
  def test_052_qrexec_vm_service_eof(self):
      """Test for EOF transmission VM(src)->VM(dst)

      Installs a ``test.EOF`` service (``/bin/cat``) in testvm2 and a dom0
      policy allowing testvm1 -> testvm2, then calls the service from
      testvm1 in a separate process with a 10 s timeout.
      """
      result = multiprocessing.Value('i', 0)

      def run(self, result):
          p = self.testvm1.run("/usr/lib/qubes/qrexec-client-vm %s test.EOF "
                               "/bin/sh -c 'echo test; exec >&-; cat "
                               ">&$SAVED_FD_1'" % self.testvm2.name,
                               passio_popen=True)
          (stdout, stderr) = p.communicate()
          if stdout != "test\n":
              result.value = 1

      self.testvm1.start()
      self.testvm2.start()
      p = self.testvm2.run("cat > /etc/qubes-rpc/test.EOF", user="root",
                           passio_popen=True)
      p.stdin.write("/bin/cat")
      p.stdin.close()
      p.wait()

      policy_path = "/etc/qubes-rpc/policy/test.EOF"
      with open(policy_path, "w") as policy:
          # Register cleanup as soon as the file exists, so it is removed
          # even if writing the policy fails.
          self.addCleanup(os.unlink, policy_path)
          policy.write("%s %s allow" % (self.testvm1.name, self.testvm2.name))

      t = multiprocessing.Process(target=run, args=(self, result))
      t.start()
      t.join(timeout=10)
      if t.is_alive():
          t.terminate()
          self.fail("Timeout, probably EOF wasn't transferred")
      if result.value == 1:
          self.fail("Received data differs from what was expected")
  @unittest.expectedFailure
  def test_053_qrexec_vm_service_eof_reverse(self):
      """Test for EOF transmission VM(src)<-VM(dst)

      Installs a ``test.EOF`` service in testvm2 that prints "test", closes
      its stdout and keeps reading stdin, plus a dom0 policy allowing
      testvm1 -> testvm2.  The service is then called from testvm1 in a
      separate process with a 10 s timeout.
      """
      result = multiprocessing.Value('i', 0)

      def run(self, result):
          p = self.testvm1.run("/usr/lib/qubes/qrexec-client-vm %s test.EOF "
                               "/bin/sh -c 'cat >&$SAVED_FD_1'"
                               % self.testvm2.name,
                               passio_popen=True)
          (stdout, stderr) = p.communicate()
          if stdout != "test\n":
              result.value = 1

      self.testvm1.start()
      self.testvm2.start()
      p = self.testvm2.run("cat > /etc/qubes-rpc/test.EOF", user="root",
                           passio_popen=True)
      p.stdin.write("echo test; exec >&-; cat >/dev/null")
      p.stdin.close()
      p.wait()

      policy_path = "/etc/qubes-rpc/policy/test.EOF"
      with open(policy_path, "w") as policy:
          # Register cleanup as soon as the file exists, so it is removed
          # even if writing the policy fails.
          self.addCleanup(os.unlink, policy_path)
          policy.write("%s %s allow" % (self.testvm1.name, self.testvm2.name))

      t = multiprocessing.Process(target=run, args=(self, result))
      t.start()
      t.join(timeout=10)
      if t.is_alive():
          t.terminate()
          self.fail("Timeout, probably EOF wasn't transferred")
      if result.value == 1:
          self.fail("Received data differs from what was expected")
  def test_055_qrexec_dom0_service_abort(self):
      """
      Test if service abort (by dom0) is properly handled by source VM.

      If "remote" part of the service terminates, the source part should
      properly be notified. This includes closing its stdin (which is
      already checked by test_053_qrexec_vm_service_eof_reverse), but also
      its stdout - otherwise such service might hang on write(2) call.
      """

      def run(src):
          p = src.run("/usr/lib/qubes/qrexec-client-vm dom0 "
                      "test.Abort /bin/cat /dev/zero",
                      passio_popen=True)
          p.communicate()
          p.wait()

      self.testvm1.start()

      # dom0-side service that exits after one second while the VM side is
      # still writing /dev/zero into the connection
      service_path = "/etc/qubes-rpc/test.Abort"
      with open(service_path, "w") as service:
          # Register cleanup as soon as the file exists, so it is removed
          # even if writing it fails.
          self.addCleanup(os.unlink, service_path)
          service.write("sleep 1")

      policy_path = "/etc/qubes-rpc/policy/test.Abort"
      with open(policy_path, "w") as policy:
          self.addCleanup(os.unlink, policy_path)
          policy.write("%s dom0 allow" % (self.testvm1.name))

      t = multiprocessing.Process(target=run, args=(self.testvm1,))
      t.start()
      t.join(timeout=10)
      if t.is_alive():
          t.terminate()
          self.fail("Timeout, probably stdout wasn't closed")
  def test_060_qrexec_exit_code_dom0(self):
      """Check that a VM command's exit code is visible on the dom0 side."""
      self.testvm1.start()
      # (command run in the VM, return code expected in dom0)
      cases = (("exit 0", 0), ("exit 3", 3))
      for command, expected in cases:
          proc = self.testvm1.run(command, passio_popen=True)
          proc.wait()
          self.assertEqual(expected, proc.returncode)
  @unittest.expectedFailure
  def test_065_qrexec_exit_code_vm(self):
      """Check that a service's exit code reaches the calling VM.

      A ``test.Retcode`` service is installed in testvm2 (first ``exit 0``,
      then ``exit 3``) and called from testvm1; the shell in testvm1 echoes
      ``$?`` after ``qrexec-client-vm`` returns.
      """
      self.testvm1.start()
      self.testvm2.start()

      policy_path = "/etc/qubes-rpc/policy/test.Retcode"
      with open(policy_path, "w") as policy:
          # Register cleanup as soon as the file exists, so it is removed
          # even if writing the policy fails.
          self.addCleanup(os.unlink, policy_path)
          policy.write("%s %s allow" % (self.testvm1.name, self.testvm2.name))

      for service_body, expected_stdout in (("exit 0", "0\n"),
                                            ("exit 3", "3\n")):
          p = self.testvm2.run("cat > /etc/qubes-rpc/test.Retcode",
                               user="root", passio_popen=True)
          p.stdin.write(service_body)
          p.stdin.close()
          p.wait()

          # Call the service in testvm2: that is where it is installed and
          # what the policy above allows (the call used to target testvm1
          # itself).
          p = self.testvm1.run(
              "/usr/lib/qubes/qrexec-client-vm %s test.Retcode "
              "/bin/sh -c 'cat >/dev/null'; echo $?"
              % self.testvm2.name,
              passio_popen=True)
          (stdout, stderr) = p.communicate()
          self.assertEqual(stdout, expected_stdout)
  def test_070_qrexec_vm_simultaneous_write(self):
      """Test for simultaneous write in VM(src)->VM(dst) connection

      This is regression test for #1347

      Check for deadlock when initially both sides writes a lot of data
      (and not read anything). When one side starts reading, it should
      get the data and the remote side should be possible to write then more.
      There was a bug where remote side was waiting on write(2) and not
      handling anything else.
      """
      result = multiprocessing.Value('i', -1)

      def run(self):
          p = self.testvm1.run(
              "/usr/lib/qubes/qrexec-client-vm %s test.write "
              "/bin/sh -c '"
              # first write a lot of data to fill all the buffers
              "dd if=/dev/zero bs=993 count=10000 iflag=fullblock & "
              # then after some time start reading
              "sleep 1; "
              "dd of=/dev/null bs=993 count=10000 iflag=fullblock; "
              "wait"
              "'" % self.testvm2.name, passio_popen=True)
          p.communicate()
          result.value = p.returncode

      self.testvm1.start()
      self.testvm2.start()
      p = self.testvm2.run("cat > /etc/qubes-rpc/test.write", user="root",
                           passio_popen=True)
      # first write a lot of data
      p.stdin.write("dd if=/dev/zero bs=993 count=10000 iflag=fullblock\n")
      # and only then read something
      p.stdin.write("dd of=/dev/null bs=993 count=10000 iflag=fullblock\n")
      p.stdin.close()
      p.wait()

      policy_path = "/etc/qubes-rpc/policy/test.write"
      with open(policy_path, "w") as policy:
          # Register cleanup as soon as the file exists, so it is removed
          # even if writing the policy fails.
          self.addCleanup(os.unlink, policy_path)
          policy.write("%s %s allow" % (self.testvm1.name, self.testvm2.name))

      # Run in a separate process so a deadlock can be detected and killed.
      t = multiprocessing.Process(target=run, args=(self,))
      t.start()
      t.join(timeout=10)
      if t.is_alive():
          t.terminate()
          self.fail("Timeout, probably deadlock")
      self.assertEqual(result.value, 0, "Service call failed")
  def test_071_qrexec_dom0_simultaneous_write(self):
      """Test for simultaneous write in dom0(src)->VM(dst) connection

      Similar to test_070_qrexec_vm_simultaneous_write, but with dom0
      as a source.
      """
      result = multiprocessing.Value('i', -1)

      def run(self):
          result.value = self.testvm2.run_service(
              "test.write", localcmd="/bin/sh -c '"
              # first write a lot of data to fill all the buffers
              "dd if=/dev/zero bs=993 count=10000 iflag=fullblock & "
              # then after some time start reading
              "sleep 1; "
              "dd of=/dev/null bs=993 count=10000 iflag=fullblock; "
              "wait"
              "'")

      self.testvm2.start()
      p = self.testvm2.run("cat > /etc/qubes-rpc/test.write", user="root",
                           passio_popen=True)
      # first write a lot of data
      p.stdin.write("dd if=/dev/zero bs=993 count=10000 iflag=fullblock\n")
      # and only then read something
      p.stdin.write("dd of=/dev/null bs=993 count=10000 iflag=fullblock\n")
      p.stdin.close()
      p.wait()

      policy_path = "/etc/qubes-rpc/policy/test.write"
      with open(policy_path, "w") as policy:
          # Register cleanup as soon as the file exists, so it is removed
          # even if writing the policy fails.
          self.addCleanup(os.unlink, policy_path)
          policy.write("%s %s allow" % (self.testvm1.name, self.testvm2.name))

      # Run in a separate process so a deadlock can be detected and killed.
      t = multiprocessing.Process(target=run, args=(self,))
      t.start()
      t.join(timeout=10)
      if t.is_alive():
          t.terminate()
          self.fail("Timeout, probably deadlock")
      self.assertEqual(result.value, 0, "Service call failed")
  def test_072_qrexec_to_dom0_simultaneous_write(self):
      """Test for simultaneous write in dom0(src)<-VM(dst) connection

      Similar to test_071_qrexec_dom0_simultaneous_write, but with dom0
      as a "hanging" side.
      """
      result = multiprocessing.Value('i', -1)

      def run(self):
          result.value = self.testvm2.run_service(
              "test.write", localcmd="/bin/sh -c '"
              # first write a lot of data to fill all the buffers
              # (the trailing "; " matters: without it the next dd and its
              # arguments are parsed as operands of this one)
              "dd if=/dev/zero bs=993 count=10000 iflag=fullblock; "
              # then, only when all written, read something
              "dd of=/dev/null bs=993 count=10000 iflag=fullblock; "
              "'")

      self.testvm2.start()
      p = self.testvm2.run("cat > /etc/qubes-rpc/test.write", user="root",
                           passio_popen=True)
      # first write a lot of data
      p.stdin.write("dd if=/dev/zero bs=993 count=10000 iflag=fullblock &\n")
      # and only then read something
      p.stdin.write("dd of=/dev/null bs=993 count=10000 iflag=fullblock\n")
      p.stdin.write("sleep 1; \n")
      p.stdin.write("wait\n")
      p.stdin.close()
      p.wait()

      policy_path = "/etc/qubes-rpc/policy/test.write"
      with open(policy_path, "w") as policy:
          # Register cleanup as soon as the file exists, so it is removed
          # even if writing the policy fails.
          self.addCleanup(os.unlink, policy_path)
          policy.write("%s %s allow" % (self.testvm1.name, self.testvm2.name))

      # Run in a separate process so a deadlock can be detected and killed.
      t = multiprocessing.Process(target=run, args=(self,))
      t.start()
      t.join(timeout=10)
      if t.is_alive():
          t.terminate()
          self.fail("Timeout, probably deadlock")
      self.assertEqual(result.value, 0, "Service call failed")
  def test_080_qrexec_service_argument_allow_default(self):
      """Qrexec service call with argument"""
      src, dst = self.testvm1, self.testvm2
      src.start()
      dst.start()

      # service in the destination VM: echo back its first argument
      installer = dst.run("cat > /etc/qubes-rpc/test.Argument", user="root",
                          passio_popen=True)
      installer.communicate("/bin/echo $1")

      # policy for the bare service name allows src -> dst
      policy_file = open("/etc/qubes-rpc/policy/test.Argument", "w")
      with policy_file:
          policy_file.write("%s %s allow" % (src.name, dst.name))
      self.addCleanup(os.unlink, "/etc/qubes-rpc/policy/test.Argument")

      call = src.run("/usr/lib/qubes/qrexec-client-vm {} "
                     "test.Argument+argument".format(dst.name),
                     passio_popen=True)
      output = call.communicate()[0]
      self.assertEqual(output, "argument\n")
  def test_081_qrexec_service_argument_allow_specific(self):
      """Qrexec service call with argument - allow only specific value"""
      src, dst = self.testvm1, self.testvm2
      src.start()
      dst.start()

      # service in the destination VM: echo back its first argument
      installer = dst.run("cat > /etc/qubes-rpc/test.Argument", user="root",
                          passio_popen=True)
      installer.communicate("/bin/echo $1")

      # generic policy denies everything ...
      generic_policy = open("/etc/qubes-rpc/policy/test.Argument", "w")
      with generic_policy:
          generic_policy.write("$anyvm $anyvm deny")
      self.addCleanup(os.unlink, "/etc/qubes-rpc/policy/test.Argument")

      # ... while the argument-specific policy allows src -> dst
      specific_policy = open("/etc/qubes-rpc/policy/test.Argument+argument",
                             "w")
      with specific_policy:
          specific_policy.write("%s %s allow" % (src.name, dst.name))
      self.addCleanup(os.unlink,
                      "/etc/qubes-rpc/policy/test.Argument+argument")

      call = src.run("/usr/lib/qubes/qrexec-client-vm {} "
                     "test.Argument+argument".format(dst.name),
                     passio_popen=True)
      output = call.communicate()[0]
      self.assertEqual(output, "argument\n")
  def test_082_qrexec_service_argument_deny_specific(self):
      """Qrexec service call with argument - deny specific value"""
      src, dst = self.testvm1, self.testvm2
      src.start()
      dst.start()

      # service in the destination VM: echo back its first argument
      installer = dst.run("cat > /etc/qubes-rpc/test.Argument", user="root",
                          passio_popen=True)
      installer.communicate("/bin/echo $1")

      # generic policy allows everything ...
      generic_policy = open("/etc/qubes-rpc/policy/test.Argument", "w")
      with generic_policy:
          generic_policy.write("$anyvm $anyvm allow")
      self.addCleanup(os.unlink, "/etc/qubes-rpc/policy/test.Argument")

      # ... while the argument-specific policy denies src -> dst
      specific_policy = open("/etc/qubes-rpc/policy/test.Argument+argument",
                             "w")
      with specific_policy:
          specific_policy.write("%s %s deny" % (src.name, dst.name))
      self.addCleanup(os.unlink,
                      "/etc/qubes-rpc/policy/test.Argument+argument")

      call = src.run("/usr/lib/qubes/qrexec-client-vm {} "
                     "test.Argument+argument".format(dst.name),
                     passio_popen=True)
      output = call.communicate()[0]
      self.assertEqual(output, "")
      self.assertEqual(call.returncode, 1, "Service request should be denied")
  def test_083_qrexec_service_argument_specific_implementation(self):
      """Qrexec service call with argument - argument specific
      implementatation"""
      src, dst = self.testvm1, self.testvm2
      src.start()
      dst.start()

      # generic implementation of the service
      installer = dst.run("cat > /etc/qubes-rpc/test.Argument", user="root",
                          passio_popen=True)
      installer.communicate("/bin/echo $1")

      # implementation dedicated to the "+argument" variant
      installer = dst.run("cat > /etc/qubes-rpc/test.Argument+argument",
                          user="root", passio_popen=True)
      installer.communicate("/bin/echo specific: $1")

      policy_file = open("/etc/qubes-rpc/policy/test.Argument", "w")
      with policy_file:
          policy_file.write("%s %s allow" % (src.name, dst.name))
      self.addCleanup(os.unlink, "/etc/qubes-rpc/policy/test.Argument")

      call = src.run("/usr/lib/qubes/qrexec-client-vm {} "
                     "test.Argument+argument".format(dst.name),
                     passio_popen=True)
      output = call.communicate()[0]
      self.assertEqual(output, "specific: argument\n")
  def test_084_qrexec_service_argument_extra_env(self):
      """Qrexec service call with argument - extra env variables"""
      src, dst = self.testvm1, self.testvm2
      src.start()
      dst.start()

      # service in the destination VM: print the two qrexec env variables
      installer = dst.run("cat > /etc/qubes-rpc/test.Argument", user="root",
                          passio_popen=True)
      installer.communicate("/bin/echo $QREXEC_SERVICE_FULL_NAME "
                            "$QREXEC_SERVICE_ARGUMENT")

      policy_file = open("/etc/qubes-rpc/policy/test.Argument", "w")
      with policy_file:
          policy_file.write("%s %s allow" % (src.name, dst.name))
      self.addCleanup(os.unlink, "/etc/qubes-rpc/policy/test.Argument")

      call = src.run("/usr/lib/qubes/qrexec-client-vm {} "
                     "test.Argument+argument".format(dst.name),
                     passio_popen=True)
      output = call.communicate()[0]
      self.assertEqual(output, "test.Argument+argument argument\n")
  def test_100_qrexec_filecopy(self):
      """Copy /etc/passwd from testvm1 to testvm2 and compare both copies."""
      src, dst = self.testvm1, self.testvm2
      src.start()
      dst.start()
      self.qrexec_policy('qubes.Filecopy', src.name, dst.name)

      copy = src.run("qvm-copy-to-vm %s /etc/passwd" % dst.name,
                     passio_popen=True, passio_stderr=True)
      copy.wait()
      copy_errors = copy.stderr.read()
      self.assertEqual(copy.returncode, 0,
                       "qvm-copy-to-vm failed: %s" % copy_errors)

      # the file should arrive under QubesIncoming/<source VM name>
      diff_cmd = ("diff /etc/passwd "
                  "/home/user/QubesIncoming/{}/passwd".format(src.name))
      self.assertEqual(dst.run(diff_cmd, wait=True), 0, "file differs")
  def test_105_qrexec_filemove(self):
      """Move a copy of /etc/passwd from testvm1 to testvm2.

      Checks that ``qvm-move-to-vm`` succeeds, that the file received in
      testvm2 matches /etc/passwd there, and that the source file is gone
      from testvm1 afterwards.
      """
      self.testvm1.start()
      self.testvm2.start()
      self.qrexec_policy('qubes.Filecopy', self.testvm1.name,
                         self.testvm2.name)
      retcode = self.testvm1.run("cp /etc/passwd passwd", wait=True)
      # unittest assertion instead of a bare assert, which is stripped when
      # Python runs with -O
      self.assertEqual(retcode, 0, "Failed to prepare source file")
      p = self.testvm1.run("qvm-move-to-vm %s passwd" %
                           self.testvm2.name, passio_popen=True,
                           passio_stderr=True)
      p.wait()
      self.assertEqual(p.returncode, 0, "qvm-move-to-vm failed: %s" %
                       p.stderr.read())
      retcode = self.testvm2.run("diff /etc/passwd "
                                 "/home/user/QubesIncoming/{}/passwd".format(
                                     self.testvm1.name),
                                 wait=True)
      self.assertEqual(retcode, 0, "file differs")
      retcode = self.testvm1.run("test -f passwd", wait=True)
      self.assertEqual(retcode, 1, "source file not removed")
  def test_101_qrexec_filecopy_with_autostart(self):
      """Copy a file to testvm2 without starting it from the test first.

      Only testvm1 is started here; after the copy the test asserts that
      testvm2 is running and that the received file matches.
      """
      src, dst = self.testvm1, self.testvm2
      src.start()
      self.qrexec_policy('qubes.Filecopy', src.name, dst.name)

      copy = src.run("qvm-copy-to-vm %s /etc/passwd" % dst.name,
                     passio_popen=True, passio_stderr=True)
      copy.wait()
      copy_errors = copy.stderr.read()
      self.assertEqual(copy.returncode, 0,
                       "qvm-copy-to-vm failed: %s" % copy_errors)

      # workaround for libvirt bug (domain ID isn't updated when is started
      # from other application) - details in
      # QubesOS/qubes-core-libvirt@63ede4dfb4485c4161dd6a2cc809e8fb45ca664f
      dst._libvirt_domain = None
      self.assertTrue(dst.is_running())

      diff_cmd = ("diff /etc/passwd "
                  "/home/user/QubesIncoming/{}/passwd".format(src.name))
      self.assertEqual(dst.run(diff_cmd, wait=True), 0, "file differs")
  def test_110_qrexec_filecopy_deny(self):
      """Check that a file copy denied by policy fails and leaves no files.

      With a deny policy for ``qubes.Filecopy`` from testvm1 to testvm2,
      ``qvm-copy-to-vm`` must exit non-zero and no incoming directory for
      testvm1 may appear in testvm2.
      """
      self.testvm1.start()
      self.testvm2.start()
      self.qrexec_policy('qubes.Filecopy', self.testvm1.name,
                         self.testvm2.name, allow=False)
      p = self.testvm1.run("qvm-copy-to-vm %s /etc/passwd" %
                           self.testvm2.name, passio_popen=True)
      p.wait()
      self.assertNotEqual(p.returncode, 0, "qvm-copy-to-vm unexpectedly "
                          "succeeded")
      # Look in the destination VM: the other file copy tests find received
      # files under QubesIncoming/<source name> in testvm2, so checking
      # testvm1 (as before) could never detect a leaked copy.
      retcode = self.testvm2.run("ls /home/user/QubesIncoming/%s" %
                                 self.testvm1.name, wait=True,
                                 ignore_stderr=True)
      self.assertNotEqual(retcode, 0, "QubesIncoming exists although file "
                          "copy was denied")
  627. @unittest.skip("Xen gntalloc driver crashes when page is mapped in the "
  628. "same domain")
  629. def test_120_qrexec_filecopy_self(self):
  630. self.testvm1.start()
  631. self.qrexec_policy('qubes.Filecopy', self.testvm1.name,
  632. self.testvm1.name)
  633. p = self.testvm1.run("qvm-copy-to-vm %s /etc/passwd" %
  634. self.testvm1.name, passio_popen=True,
  635. passio_stderr=True)
  636. p.wait()
  637. self.assertEqual(p.returncode, 0, "qvm-copy-to-vm failed: %s" %
  638. p.stderr.read())
  639. retcode = self.testvm1.run(
  640. "diff /etc/passwd /home/user/QubesIncoming/{}/passwd".format(
  641. self.testvm1.name),
  642. wait=True)
  643. self.assertEqual(retcode, 0, "file differs")
  644. @unittest.skipUnless(spawn.find_executable('xdotool'),
  645. "xdotool not installed")
  646. def test_130_qrexec_filemove_disk_full(self):
  647. self.testvm1.start()
  648. self.testvm2.start()
  649. self.qrexec_policy('qubes.Filecopy', self.testvm1.name,
  650. self.testvm2.name)
  651. # Prepare test file
  652. prepare_cmd = ("yes teststring | dd of=testfile bs=1M "
  653. "count=50 iflag=fullblock")
  654. retcode = self.testvm1.run(prepare_cmd, wait=True)
  655. if retcode != 0:
  656. raise RuntimeError("Failed '{}' in {}".format(prepare_cmd,
  657. self.testvm1.name))
  658. # Prepare target directory with limited size
  659. prepare_cmd = (
  660. "mkdir -p /home/user/QubesIncoming && "
  661. "chown user /home/user/QubesIncoming && "
  662. "mount -t tmpfs none /home/user/QubesIncoming -o size=48M"
  663. )
  664. retcode = self.testvm2.run(prepare_cmd, user="root", wait=True)
  665. if retcode != 0:
  666. raise RuntimeError("Failed '{}' in {}".format(prepare_cmd,
  667. self.testvm2.name))
  668. p = self.testvm1.run("qvm-move-to-vm %s testfile" %
  669. self.testvm2.name, passio_popen=True,
  670. passio_stderr=True)
  671. # Close GUI error message
  672. self.enter_keys_in_window('Error', ['Return'])
  673. p.wait()
  674. self.assertNotEqual(p.returncode, 0, "qvm-move-to-vm should fail")
  675. retcode = self.testvm1.run("test -f testfile", wait=True)
  676. self.assertEqual(retcode, 0, "testfile should not be deleted in "
  677. "source VM")
  678. def test_200_timezone(self):
  679. """Test whether timezone setting is properly propagated to the VM"""
  680. if "whonix" in self.template:
  681. self.skipTest("Timezone propagation disabled on Whonix templates")
  682. self.testvm1.start()
  683. (vm_tz, _) = self.testvm1.run("date +%Z",
  684. passio_popen=True).communicate()
  685. (dom0_tz, _) = subprocess.Popen(["date", "+%Z"],
  686. stdout=subprocess.PIPE).communicate()
  687. self.assertEqual(vm_tz.strip(), dom0_tz.strip())
  688. # Check if reverting back to UTC works
  689. (vm_tz, _) = self.testvm1.run("TZ=UTC date +%Z",
  690. passio_popen=True).communicate()
  691. self.assertEqual(vm_tz.strip(), "UTC")
  692. def test_210_time_sync(self):
  693. """Test time synchronization mechanism"""
  694. self.testvm1.start()
  695. self.testvm2.start()
  696. (start_time, _) = subprocess.Popen(["date", "-u", "+%s"],
  697. stdout=subprocess.PIPE).communicate()
  698. original_clockvm = self.qc.get_clockvm_vm()
  699. if original_clockvm:
  700. original_clockvm_name = original_clockvm.name
  701. else:
  702. original_clockvm_name = "none"
  703. try:
  704. # use qubes-prefs to not hassle with qubes.xml locking
  705. subprocess.check_call(["qubes-prefs", "-s", "clockvm",
  706. self.testvm1.name])
  707. # break vm and dom0 time, to check if qvm-sync-clock would fix it
  708. subprocess.check_call(["sudo", "date", "-s",
  709. "2001-01-01T12:34:56"],
  710. stdout=open(os.devnull, 'w'))
  711. retcode = self.testvm1.run("date -s 2001-01-01T12:34:56",
  712. user="root", wait=True)
  713. self.assertEquals(retcode, 0, "Failed to break the VM(1) time")
  714. retcode = self.testvm2.run("date -s 2001-01-01T12:34:56",
  715. user="root", wait=True)
  716. self.assertEquals(retcode, 0, "Failed to break the VM(2) time")
  717. retcode = subprocess.call(["qvm-sync-clock"])
  718. self.assertEquals(retcode, 0,
  719. "qvm-sync-clock failed with code {}".
  720. format(retcode))
  721. # qvm-sync-clock is asynchronous - it spawns qubes.SetDateTime
  722. # service, send it timestamp value and exists without waiting for
  723. # actual time set
  724. time.sleep(1)
  725. (vm_time, _) = self.testvm1.run("date -u +%s",
  726. passio_popen=True).communicate()
  727. self.assertAlmostEquals(int(vm_time), int(start_time), delta=30)
  728. (vm_time, _) = self.testvm2.run("date -u +%s",
  729. passio_popen=True).communicate()
  730. self.assertAlmostEquals(int(vm_time), int(start_time), delta=30)
  731. (dom0_time, _) = subprocess.Popen(["date", "-u", "+%s"],
  732. stdout=subprocess.PIPE
  733. ).communicate()
  734. self.assertAlmostEquals(int(dom0_time), int(start_time), delta=30)
  735. except:
  736. # reset time to some approximation of the real time
  737. subprocess.Popen(["sudo", "date", "-u", "-s", "@" + start_time])
  738. raise
  739. finally:
  740. subprocess.call(["qubes-prefs", "-s", "clockvm",
  741. original_clockvm_name])
  742. def test_250_resize_private_img(self):
  743. """
  744. Test private.img resize, both offline and online
  745. :return:
  746. """
  747. # First offline test
  748. self.testvm1.resize_private_img(4*1024**3)
  749. self.testvm1.start()
  750. df_cmd = '( df --output=size /rw || df /rw | awk \'{print $2}\' )|' \
  751. 'tail -n 1'
  752. p = self.testvm1.run(df_cmd,
  753. passio_popen=True)
  754. # new_size in 1k-blocks
  755. (new_size, _) = p.communicate()
  756. # some safety margin for FS metadata
  757. self.assertGreater(int(new_size.strip()), 3.8*1024**2)
  758. # Then online test
  759. self.testvm1.resize_private_img(6*1024**3)
  760. p = self.testvm1.run(df_cmd,
  761. passio_popen=True)
  762. # new_size in 1k-blocks
  763. (new_size, _) = p.communicate()
  764. # some safety margin for FS metadata
  765. self.assertGreater(int(new_size.strip()), 5.8*1024**2)
  766. class TC_05_StandaloneVM(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
  767. def test_000_create_start(self):
  768. testvm1 = self.qc.add_new_vm("QubesAppVm",
  769. template=None,
  770. name=self.make_vm_name('vm1'))
  771. testvm1.create_on_disk(verbose=False,
  772. source_template=self.qc.get_default_template())
  773. self.qc.save()
  774. self.qc.unlock_db()
  775. testvm1.start()
  776. self.assertEquals(testvm1.get_power_state(), "Running")
  777. def test_100_resize_root_img(self):
  778. testvm1 = self.qc.add_new_vm("QubesAppVm",
  779. template=None,
  780. name=self.make_vm_name('vm1'))
  781. testvm1.create_on_disk(verbose=False,
  782. source_template=self.qc.get_default_template())
  783. self.qc.save()
  784. self.qc.unlock_db()
  785. with self.assertRaises(QubesException):
  786. testvm1.resize_root_img(20*1024**3)
  787. testvm1.resize_root_img(20*1024**3, allow_start=True)
  788. timeout = 60
  789. while testvm1.is_running():
  790. time.sleep(1)
  791. timeout -= 1
  792. if timeout == 0:
  793. self.fail("Timeout while waiting for VM shutdown")
  794. self.assertEquals(testvm1.get_root_img_sz(), 20*1024**3)
  795. testvm1.start()
  796. p = testvm1.run('df --output=size /|tail -n 1',
  797. passio_popen=True)
  798. # new_size in 1k-blocks
  799. (new_size, _) = p.communicate()
  800. # some safety margin for FS metadata
  801. self.assertGreater(int(new_size.strip()), 19*1024**2)
  802. class TC_10_HVM(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
  803. # TODO: test with some OS inside
  804. # TODO: windows tools tests
  805. def test_000_create_start(self):
  806. testvm1 = self.qc.add_new_vm("QubesHVm",
  807. name=self.make_vm_name('vm1'))
  808. testvm1.create_on_disk(verbose=False)
  809. self.qc.save()
  810. self.qc.unlock_db()
  811. testvm1.start()
  812. self.assertEquals(testvm1.get_power_state(), "Running")
  813. def test_010_create_start_template(self):
  814. templatevm = self.qc.add_new_vm("QubesTemplateHVm",
  815. name=self.make_vm_name('template'))
  816. templatevm.create_on_disk(verbose=False)
  817. self.qc.save()
  818. self.qc.unlock_db()
  819. templatevm.start()
  820. self.assertEquals(templatevm.get_power_state(), "Running")
  821. def test_020_create_start_template_vm(self):
  822. templatevm = self.qc.add_new_vm("QubesTemplateHVm",
  823. name=self.make_vm_name('template'))
  824. templatevm.create_on_disk(verbose=False)
  825. testvm2 = self.qc.add_new_vm("QubesHVm",
  826. name=self.make_vm_name('vm2'),
  827. template=templatevm)
  828. testvm2.create_on_disk(verbose=False)
  829. self.qc.save()
  830. self.qc.unlock_db()
  831. testvm2.start()
  832. self.assertEquals(testvm2.get_power_state(), "Running")
  833. def test_030_prevent_simultaneus_start(self):
  834. templatevm = self.qc.add_new_vm("QubesTemplateHVm",
  835. name=self.make_vm_name('template'))
  836. templatevm.create_on_disk(verbose=False)
  837. testvm2 = self.qc.add_new_vm("QubesHVm",
  838. name=self.make_vm_name('vm2'),
  839. template=templatevm)
  840. testvm2.create_on_disk(verbose=False)
  841. self.qc.save()
  842. self.qc.unlock_db()
  843. templatevm.start()
  844. self.assertEquals(templatevm.get_power_state(), "Running")
  845. self.assertRaises(QubesException, testvm2.start)
  846. templatevm.force_shutdown()
  847. testvm2.start()
  848. self.assertEquals(testvm2.get_power_state(), "Running")
  849. self.assertRaises(QubesException, templatevm.start)
  850. def test_100_resize_root_img(self):
  851. testvm1 = self.qc.add_new_vm("QubesHVm",
  852. name=self.make_vm_name('vm1'))
  853. testvm1.create_on_disk(verbose=False)
  854. self.qc.save()
  855. self.qc.unlock_db()
  856. testvm1.resize_root_img(30*1024**3)
  857. self.assertEquals(testvm1.get_root_img_sz(), 30*1024**3)
  858. testvm1.start()
  859. self.assertEquals(testvm1.get_power_state(), "Running")
  860. # TODO: launch some OS there and check the size
  861. class TC_20_DispVMMixin(qubes.tests.SystemTestsMixin):
  862. def test_000_prepare_dvm(self):
  863. self.qc.unlock_db()
  864. retcode = subprocess.call(['/usr/bin/qvm-create-default-dvm',
  865. self.template],
  866. stderr=open(os.devnull, 'w'))
  867. self.assertEqual(retcode, 0)
  868. self.qc.lock_db_for_writing()
  869. self.qc.load()
  870. self.assertIsNotNone(self.qc.get_vm_by_name(
  871. self.template + "-dvm"))
  872. # TODO: check mtime of snapshot file
  873. def test_010_simple_dvm_run(self):
  874. self.qc.unlock_db()
  875. p = subprocess.Popen(['/usr/lib/qubes/qfile-daemon-dvm',
  876. 'qubes.VMShell', 'dom0', 'DEFAULT'],
  877. stdin=subprocess.PIPE,
  878. stdout=subprocess.PIPE,
  879. stderr=open(os.devnull, 'w'))
  880. (stdout, _) = p.communicate(input="echo test")
  881. self.assertEqual(stdout, "test\n")
  882. # TODO: check if DispVM is destroyed
  883. @unittest.skipUnless(spawn.find_executable('xdotool'),
  884. "xdotool not installed")
  885. def test_020_gui_app(self):
  886. self.qc.unlock_db()
  887. p = subprocess.Popen(['/usr/lib/qubes/qfile-daemon-dvm',
  888. 'qubes.VMShell', 'dom0', 'DEFAULT'],
  889. stdin=subprocess.PIPE,
  890. stdout=subprocess.PIPE,
  891. stderr=open(os.devnull, 'w'))
  892. # wait for DispVM startup:
  893. p.stdin.write("echo test\n")
  894. p.stdin.flush()
  895. l = p.stdout.readline()
  896. self.assertEqual(l, "test\n")
  897. # potential race condition, but our tests are supposed to be
  898. # running on dedicated machine, so should not be a problem
  899. self.qc.lock_db_for_reading()
  900. self.qc.load()
  901. self.qc.unlock_db()
  902. max_qid = 0
  903. for vm in self.qc.values():
  904. if not vm.is_disposablevm():
  905. continue
  906. if vm.qid > max_qid:
  907. max_qid = vm.qid
  908. dispvm = self.qc[max_qid]
  909. self.assertNotEqual(dispvm.qid, 0, "DispVM not found in qubes.xml")
  910. self.assertTrue(dispvm.is_running())
  911. try:
  912. window_title = 'user@%s' % (dispvm.template.name + "-dvm")
  913. p.stdin.write("xterm -e "
  914. "\"sh -s -c 'echo \\\"\033]0;{}\007\\\";read x;'\"\n".
  915. format(window_title))
  916. self.wait_for_window(window_title)
  917. time.sleep(0.5)
  918. self.enter_keys_in_window(window_title, ['Return'])
  919. # Wait for window to close
  920. self.wait_for_window(window_title, show=False)
  921. finally:
  922. p.stdin.close()
  923. wait_count = 0
  924. while dispvm.is_running():
  925. wait_count += 1
  926. if wait_count > 100:
  927. self.fail("Timeout while waiting for DispVM destruction")
  928. time.sleep(0.1)
  929. wait_count = 0
  930. while p.poll() is None:
  931. wait_count += 1
  932. if wait_count > 100:
  933. self.fail("Timeout while waiting for qfile-daemon-dvm "
  934. "termination")
  935. time.sleep(0.1)
  936. self.assertEqual(p.returncode, 0)
  937. self.qc.lock_db_for_reading()
  938. self.qc.load()
  939. self.qc.unlock_db()
  940. self.assertIsNone(self.qc.get_vm_by_name(dispvm.name),
  941. "DispVM not removed from qubes.xml")
  942. def _handle_editor(self, winid):
  943. (window_title, _) = subprocess.Popen(
  944. ['xdotool', 'getwindowname', winid], stdout=subprocess.PIPE).\
  945. communicate()
  946. window_title = window_title.strip().\
  947. replace('(', '\(').replace(')', '\)')
  948. time.sleep(1)
  949. if "gedit" in window_title:
  950. subprocess.check_call(['xdotool', 'search', '--name', window_title,
  951. 'windowactivate', 'type', 'test test 2\n'])
  952. time.sleep(0.5)
  953. subprocess.check_call(['xdotool', 'search', '--name', window_title,
  954. 'key', 'ctrl+s', 'ctrl+q'])
  955. elif "emacs" in window_title:
  956. subprocess.check_call(['xdotool', 'search', '--name', window_title,
  957. 'windowactivate', 'type', 'test test 2\n'])
  958. time.sleep(0.5)
  959. subprocess.check_call(['xdotool', 'search', '--name', window_title,
  960. 'key', 'ctrl+x', 'ctrl+s'])
  961. subprocess.check_call(['xdotool', 'search', '--name', window_title,
  962. 'key', 'ctrl+x', 'ctrl+c'])
  963. elif "vim" in window_title:
  964. subprocess.check_call(['xdotool', 'search', '--name', window_title,
  965. 'windowactivate', 'key', 'i',
  966. 'type', 'test test 2\n'])
  967. subprocess.check_call(
  968. ['xdotool', 'search', '--name', window_title,
  969. 'key', 'Escape', 'colon', 'w', 'q', 'Return'])
  970. else:
  971. self.fail("Unknown editor window: {}".format(window_title))
  972. @unittest.skipUnless(spawn.find_executable('xdotool'),
  973. "xdotool not installed")
  974. def test_030_edit_file(self):
  975. testvm1 = self.qc.add_new_vm("QubesAppVm",
  976. name=self.make_vm_name('vm1'),
  977. template=self.qc.get_vm_by_name(
  978. self.template))
  979. testvm1.create_on_disk(verbose=False)
  980. self.qc.save()
  981. testvm1.start()
  982. testvm1.run("echo test1 > /home/user/test.txt", wait=True)
  983. self.qc.unlock_db()
  984. p = testvm1.run("qvm-open-in-dvm /home/user/test.txt",
  985. passio_popen=True)
  986. wait_count = 0
  987. winid = None
  988. while True:
  989. search = subprocess.Popen(['xdotool', 'search',
  990. '--onlyvisible', '--class', 'disp*'],
  991. stdout=subprocess.PIPE,
  992. stderr=open(os.path.devnull, 'w'))
  993. retcode = search.wait()
  994. if retcode == 0:
  995. winid = search.stdout.read().strip()
  996. break
  997. wait_count += 1
  998. if wait_count > 100:
  999. self.fail("Timeout while waiting for editor window")
  1000. time.sleep(0.3)
  1001. self._handle_editor(winid)
  1002. p.wait()
  1003. p = testvm1.run("cat /home/user/test.txt",
  1004. passio_popen=True)
  1005. (test_txt_content, _) = p.communicate()
  1006. self.assertEqual(test_txt_content, "test test 2\ntest1\n")
  1007. class TC_30_Gui_daemon(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
  1008. @unittest.skipUnless(spawn.find_executable('xdotool'),
  1009. "xdotool not installed")
  1010. def test_000_clipboard(self):
  1011. testvm1 = self.qc.add_new_vm("QubesAppVm",
  1012. name=self.make_vm_name('vm1'),
  1013. template=self.qc.get_default_template())
  1014. testvm1.create_on_disk(verbose=False)
  1015. testvm2 = self.qc.add_new_vm("QubesAppVm",
  1016. name=self.make_vm_name('vm2'),
  1017. template=self.qc.get_default_template())
  1018. testvm2.create_on_disk(verbose=False)
  1019. self.qc.save()
  1020. self.qc.unlock_db()
  1021. testvm1.start()
  1022. testvm2.start()
  1023. window_title = 'user@{}'.format(testvm1.name)
  1024. testvm1.run('zenity --text-info --editable --title={}'.format(
  1025. window_title))
  1026. self.wait_for_window(window_title)
  1027. time.sleep(0.5)
  1028. test_string = "test{}".format(testvm1.xid)
  1029. # Type and copy some text
  1030. subprocess.check_call(['xdotool', 'search', '--name', window_title,
  1031. 'windowactivate',
  1032. 'type', '{}'.format(test_string)])
  1033. # second xdotool call because type --terminator do not work (SEGV)
  1034. # additionally do not use search here, so window stack will be empty
  1035. # and xdotool will use XTEST instead of generating events manually -
  1036. # this will be much better - at least because events will have
  1037. # correct timestamp (so gui-daemon would not drop the copy request)
  1038. subprocess.check_call(['xdotool',
  1039. 'key', 'ctrl+a', 'ctrl+c', 'ctrl+shift+c',
  1040. 'Escape'])
  1041. clipboard_content = \
  1042. open('/var/run/qubes/qubes-clipboard.bin', 'r').read().strip()
  1043. self.assertEquals(clipboard_content, test_string,
  1044. "Clipboard copy operation failed - content")
  1045. clipboard_source = \
  1046. open('/var/run/qubes/qubes-clipboard.bin.source',
  1047. 'r').read().strip()
  1048. self.assertEquals(clipboard_source, testvm1.name,
  1049. "Clipboard copy operation failed - owner")
  1050. # Then paste it to the other window
  1051. window_title = 'user@{}'.format(testvm2.name)
  1052. p = testvm2.run('zenity --entry --title={} > test.txt'.format(
  1053. window_title), passio_popen=True)
  1054. self.wait_for_window(window_title)
  1055. subprocess.check_call(['xdotool', 'key', '--delay', '100',
  1056. 'ctrl+shift+v', 'ctrl+v', 'Return'])
  1057. p.wait()
  1058. # And compare the result
  1059. (test_output, _) = testvm2.run('cat test.txt',
  1060. passio_popen=True).communicate()
  1061. self.assertEquals(test_string, test_output.strip())
  1062. clipboard_content = \
  1063. open('/var/run/qubes/qubes-clipboard.bin', 'r').read().strip()
  1064. self.assertEquals(clipboard_content, "",
  1065. "Clipboard not wiped after paste - content")
  1066. clipboard_source = \
  1067. open('/var/run/qubes/qubes-clipboard.bin.source', 'r').read(
  1068. ).strip()
  1069. self.assertEquals(clipboard_source, "",
  1070. "Clipboard not wiped after paste - owner")
  1071. @unittest.skipUnless(os.path.exists('/var/lib/qubes/vm-kernels/pvgrub2'),
  1072. 'grub-xen package not installed')
  1073. class TC_40_PVGrub(qubes.tests.SystemTestsMixin):
  1074. def setUp(self):
  1075. super(TC_40_PVGrub, self).setUp()
  1076. supported = False
  1077. if self.template.startswith('fedora-'):
  1078. supported = True
  1079. elif self.template.startswith('debian-'):
  1080. supported = True
  1081. if not supported:
  1082. self.skipTest("Template {} not supported by this test".format(
  1083. self.template))
  1084. def install_packages(self, vm):
  1085. if self.template.startswith('fedora-'):
  1086. cmd_install1 = 'yum clean expire-cache && ' \
  1087. 'yum install -y qubes-kernel-vm-support grub2-tools'
  1088. cmd_install2 = 'yum install -y kernel kernel-devel'
  1089. cmd_update_grub = 'grub2-mkconfig -o /boot/grub2/grub.cfg'
  1090. elif self.template.startswith('debian-'):
  1091. cmd_install1 = 'apt-get update && apt-get install -y ' \
  1092. 'qubes-kernel-vm-support grub2-common'
  1093. cmd_install2 = 'apt-get install -y linux-image-amd64'
  1094. cmd_update_grub = 'mkdir /boot/grub && update-grub2'
  1095. else:
  1096. assert False, "Unsupported template?!"
  1097. for cmd in [cmd_install1, cmd_install2, cmd_update_grub]:
  1098. p = vm.run(cmd, user="root", passio_popen=True, passio_stderr=True)
  1099. (stdout, stderr) = p.communicate()
  1100. self.assertEquals(p.returncode, 0,
  1101. "Failed command: {}\nSTDOUT: {}\nSTDERR: {}"
  1102. .format(cmd, stdout, stderr))
  1103. def get_kernel_version(self, vm):
  1104. if self.template.startswith('fedora-'):
  1105. cmd_get_kernel_version = 'rpm -q kernel|sort -n|tail -1|' \
  1106. 'cut -d - -f 2-'
  1107. elif self.template.startswith('debian-'):
  1108. cmd_get_kernel_version = \
  1109. 'dpkg-query --showformat=\'${Package}\\n\' --show ' \
  1110. '\'linux-image-*-amd64\'|sort -n|tail -1|cut -d - -f 3-'
  1111. else:
  1112. raise RuntimeError("Unsupported template?!")
  1113. p = vm.run(cmd_get_kernel_version, user="root", passio_popen=True)
  1114. (kver, _) = p.communicate()
  1115. self.assertEquals(p.returncode, 0,
  1116. "Failed command: {}".format(cmd_get_kernel_version))
  1117. return kver.strip()
  1118. def test_000_standalone_vm(self):
  1119. testvm1 = self.qc.add_new_vm("QubesAppVm",
  1120. template=None,
  1121. name=self.make_vm_name('vm1'))
  1122. testvm1.create_on_disk(verbose=False,
  1123. source_template=self.qc.get_vm_by_name(
  1124. self.template))
  1125. self.save_and_reload_db()
  1126. self.qc.unlock_db()
  1127. testvm1 = self.qc[testvm1.qid]
  1128. testvm1.start()
  1129. self.install_packages(testvm1)
  1130. kver = self.get_kernel_version(testvm1)
  1131. self.shutdown_and_wait(testvm1)
  1132. self.qc.lock_db_for_writing()
  1133. self.qc.load()
  1134. testvm1 = self.qc[testvm1.qid]
  1135. testvm1.kernel = 'pvgrub2'
  1136. self.save_and_reload_db()
  1137. self.qc.unlock_db()
  1138. testvm1 = self.qc[testvm1.qid]
  1139. testvm1.start()
  1140. p = testvm1.run('uname -r', passio_popen=True)
  1141. (actual_kver, _) = p.communicate()
  1142. self.assertEquals(actual_kver.strip(), kver)
  1143. def test_010_template_based_vm(self):
  1144. test_template = self.qc.add_new_vm("QubesTemplateVm",
  1145. template=None,
  1146. name=self.make_vm_name('template'))
  1147. test_template.clone_attrs(self.qc.get_vm_by_name(self.template))
  1148. test_template.clone_disk_files(
  1149. src_vm=self.qc.get_vm_by_name(self.template),
  1150. verbose=False)
  1151. testvm1 = self.qc.add_new_vm("QubesAppVm",
  1152. template=test_template,
  1153. name=self.make_vm_name('vm1'))
  1154. testvm1.create_on_disk(verbose=False,
  1155. source_template=test_template)
  1156. self.save_and_reload_db()
  1157. self.qc.unlock_db()
  1158. test_template = self.qc[test_template.qid]
  1159. testvm1 = self.qc[testvm1.qid]
  1160. test_template.start()
  1161. self.install_packages(test_template)
  1162. kver = self.get_kernel_version(test_template)
  1163. self.shutdown_and_wait(test_template)
  1164. self.qc.lock_db_for_writing()
  1165. self.qc.load()
  1166. test_template = self.qc[test_template.qid]
  1167. test_template.kernel = 'pvgrub2'
  1168. testvm1 = self.qc[testvm1.qid]
  1169. testvm1.kernel = 'pvgrub2'
  1170. self.save_and_reload_db()
  1171. self.qc.unlock_db()
  1172. # Check if TemplateBasedVM boots and has the right kernel
  1173. testvm1 = self.qc[testvm1.qid]
  1174. testvm1.start()
  1175. p = testvm1.run('uname -r', passio_popen=True)
  1176. (actual_kver, _) = p.communicate()
  1177. self.assertEquals(actual_kver.strip(), kver)
  1178. # And the same for the TemplateVM itself
  1179. test_template = self.qc[test_template.qid]
  1180. test_template.start()
  1181. p = test_template.run('uname -r', passio_popen=True)
  1182. (actual_kver, _) = p.communicate()
  1183. self.assertEquals(actual_kver.strip(), kver)
  1184. @unittest.skipUnless(
  1185. spawn.find_executable('xprop') and
  1186. spawn.find_executable('xdotool') and
  1187. spawn.find_executable('wmctrl'),
  1188. "xprop or xdotool or wmctrl not installed")
  1189. class TC_50_MimeHandlers(qubes.tests.SystemTestsMixin):
  1190. @classmethod
  1191. def setUpClass(cls):
  1192. if cls.template == 'whonix-gw' or 'minimal' in cls.template:
  1193. raise unittest.SkipTest(
  1194. 'Template {} not supported by this test'.format(cls.template))
  1195. if cls.template == 'whonix-ws':
  1196. # TODO remove when Whonix-based DispVMs will work (Whonix 13?)
  1197. raise unittest.SkipTest(
  1198. 'Template {} not supported by this test'.format(cls.template))
  1199. qc = QubesVmCollection()
  1200. cls._kill_test_vms(qc, prefix=qubes.tests.CLSVMPREFIX)
  1201. qc.lock_db_for_writing()
  1202. qc.load()
  1203. cls._remove_test_vms(qc, qubes.qubes.vmm.libvirt_conn,
  1204. prefix=qubes.tests.CLSVMPREFIX)
  1205. cls.source_vmname = cls.make_vm_name('source', True)
  1206. source_vm = qc.add_new_vm("QubesAppVm",
  1207. template=qc.get_vm_by_name(cls.template),
  1208. name=cls.source_vmname)
  1209. source_vm.create_on_disk(verbose=False)
  1210. cls.target_vmname = cls.make_vm_name('target', True)
  1211. target_vm = qc.add_new_vm("QubesAppVm",
  1212. template=qc.get_vm_by_name(cls.template),
  1213. name=cls.target_vmname)
  1214. target_vm.create_on_disk(verbose=False)
  1215. qc.save()
  1216. qc.unlock_db()
  1217. source_vm.start()
  1218. target_vm.start()
  1219. # make sure that DispVMs will be started of the same template
  1220. retcode = subprocess.call(['/usr/bin/qvm-create-default-dvm',
  1221. cls.template],
  1222. stderr=open(os.devnull, 'w'))
  1223. assert retcode == 0, "Error preparing DispVM"
  1224. def setUp(self):
  1225. super(TC_50_MimeHandlers, self).setUp()
  1226. self.source_vm = self.qc.get_vm_by_name(self.source_vmname)
  1227. self.target_vm = self.qc.get_vm_by_name(self.target_vmname)
  1228. def get_window_class(self, winid, dispvm=False):
  1229. (vm_winid, _) = subprocess.Popen(
  1230. ['xprop', '-id', winid, '_QUBES_VMWINDOWID'],
  1231. stdout=subprocess.PIPE
  1232. ).communicate()
  1233. vm_winid = vm_winid.split("#")[1].strip('\n" ')
  1234. if dispvm:
  1235. (vmname, _) = subprocess.Popen(
  1236. ['xprop', '-id', winid, '_QUBES_VMNAME'],
  1237. stdout=subprocess.PIPE
  1238. ).communicate()
  1239. vmname = vmname.split("=")[1].strip('\n" ')
  1240. window_class = None
  1241. while window_class is None:
  1242. # XXX to use self.qc.get_vm_by_name would require reloading
  1243. # qubes.xml, so use qvm-run instead
  1244. xprop = subprocess.Popen(
  1245. ['qvm-run', '-p', vmname, 'xprop -id {} WM_CLASS'.format(
  1246. vm_winid)], stdout=subprocess.PIPE)
  1247. (window_class, _) = xprop.communicate()
  1248. if xprop.returncode != 0:
  1249. self.skipTest("xprop failed, not installed?")
  1250. if 'not found' in window_class:
  1251. # WM_CLASS not set yet, wait a little
  1252. time.sleep(0.1)
  1253. window_class = None
  1254. else:
  1255. window_class = None
  1256. while window_class is None:
  1257. xprop = self.target_vm.run(
  1258. 'xprop -id {} WM_CLASS'.format(vm_winid),
  1259. passio_popen=True)
  1260. (window_class, _) = xprop.communicate()
  1261. if xprop.returncode != 0:
  1262. self.skipTest("xprop failed, not installed?")
  1263. if 'not found' in window_class:
  1264. # WM_CLASS not set yet, wait a little
  1265. time.sleep(0.1)
  1266. window_class = None
  1267. # output: WM_CLASS(STRING) = "gnome-terminal-server", "Gnome-terminal"
  1268. try:
  1269. window_class = window_class.split("=")[1].split(",")[0].strip('\n" ')
  1270. except IndexError:
  1271. raise Exception(
  1272. "Unexpected output from xprop: '{}'".format(window_class))
  1273. return window_class
  1274. def open_file_and_check_viewer(self, filename, expected_app_titles,
  1275. expected_app_classes, dispvm=False):
  1276. self.qc.unlock_db()
  1277. if dispvm:
  1278. p = self.source_vm.run("qvm-open-in-dvm {}".format(filename),
  1279. passio_popen=True)
  1280. vmpattern = "disp*"
  1281. else:
  1282. self.qrexec_policy('qubes.Filecopy', self.source_vm.name,
  1283. self.target_vmname)
  1284. p = self.source_vm.run("qvm-open-in-vm {} {}".format(
  1285. self.target_vmname, filename), passio_popen=True)
  1286. vmpattern = self.target_vmname
  1287. wait_count = 0
  1288. winid = None
  1289. window_title = None
  1290. while True:
  1291. search = subprocess.Popen(['xdotool', 'search',
  1292. '--onlyvisible', '--class', vmpattern],
  1293. stdout=subprocess.PIPE,
  1294. stderr=open(os.path.devnull, 'w'))
  1295. retcode = search.wait()
  1296. if retcode == 0:
  1297. winid = search.stdout.read().strip()
  1298. # get window title
  1299. (window_title, _) = subprocess.Popen(
  1300. ['xdotool', 'getwindowname', winid], stdout=subprocess.PIPE). \
  1301. communicate()
  1302. window_title = window_title.strip()
  1303. # ignore LibreOffice splash screen and window with no title
  1304. # set yet
  1305. if window_title and not window_title.startswith("LibreOffice")\
  1306. and not window_title == 'VMapp command':
  1307. break
  1308. wait_count += 1
  1309. if wait_count > 100:
  1310. self.fail("Timeout while waiting for editor window")
  1311. time.sleep(0.3)
  1312. # get window class
  1313. window_class = self.get_window_class(winid, dispvm)
  1314. # close the window - we've got the window class, it is no longer needed
  1315. subprocess.check_call(['wmctrl', '-i', '-c', winid])
  1316. p.wait()
  1317. self.wait_for_window(window_title, show=False)
  1318. def check_matches(obj, patterns):
  1319. return any((pat.search(obj) if isinstance(pat, type(re.compile('')))
  1320. else pat in obj) for pat in patterns)
  1321. if not check_matches(window_title, expected_app_titles) and \
  1322. not check_matches(window_class, expected_app_classes):
  1323. self.fail("Opening file {} resulted in window '{} ({})', which is "
  1324. "none of {!r} ({!r})".format(
  1325. filename, window_title, window_class,
  1326. expected_app_titles, expected_app_classes))
  1327. def prepare_txt(self, filename):
  1328. p = self.source_vm.run("cat > {}".format(filename), passio_popen=True)
  1329. p.stdin.write("This is test\n")
  1330. p.stdin.close()
  1331. retcode = p.wait()
  1332. assert retcode == 0, "Failed to write {} file".format(filename)
  1333. def prepare_pdf(self, filename):
  1334. self.prepare_txt("/tmp/source.txt")
  1335. cmd = "convert /tmp/source.txt {}".format(filename)
  1336. retcode = self.source_vm.run(cmd, wait=True)
  1337. assert retcode == 0, "Failed to run '{}'".format(cmd)
  1338. def prepare_doc(self, filename):
  1339. self.prepare_txt("/tmp/source.txt")
  1340. cmd = "unoconv -f doc -o {} /tmp/source.txt".format(filename)
  1341. retcode = self.source_vm.run(cmd, wait=True)
  1342. if retcode != 0:
  1343. self.skipTest("Failed to run '{}', not installed?".format(cmd))
  1344. def prepare_pptx(self, filename):
  1345. self.prepare_txt("/tmp/source.txt")
  1346. cmd = "unoconv -f pptx -o {} /tmp/source.txt".format(filename)
  1347. retcode = self.source_vm.run(cmd, wait=True)
  1348. if retcode != 0:
  1349. self.skipTest("Failed to run '{}', not installed?".format(cmd))
  1350. def prepare_png(self, filename):
  1351. self.prepare_txt("/tmp/source.txt")
  1352. cmd = "convert /tmp/source.txt {}".format(filename)
  1353. retcode = self.source_vm.run(cmd, wait=True)
  1354. if retcode != 0:
  1355. self.skipTest("Failed to run '{}', not installed?".format(cmd))
  1356. def prepare_jpg(self, filename):
  1357. self.prepare_txt("/tmp/source.txt")
  1358. cmd = "convert /tmp/source.txt {}".format(filename)
  1359. retcode = self.source_vm.run(cmd, wait=True)
  1360. if retcode != 0:
  1361. self.skipTest("Failed to run '{}', not installed?".format(cmd))
  1362. def test_000_txt(self):
  1363. filename = "/home/user/test_file.txt"
  1364. self.prepare_txt(filename)
  1365. self.open_file_and_check_viewer(filename, ["vim"],
  1366. ["gedit", "emacs"])
  1367. def test_001_pdf(self):
  1368. filename = "/home/user/test_file.pdf"
  1369. self.prepare_pdf(filename)
  1370. self.open_file_and_check_viewer(filename, [],
  1371. ["evince"])
  1372. def test_002_doc(self):
  1373. filename = "/home/user/test_file.doc"
  1374. self.prepare_doc(filename)
  1375. self.open_file_and_check_viewer(filename, [],
  1376. ["libreoffice", "abiword"])
  1377. def test_003_pptx(self):
  1378. filename = "/home/user/test_file.pptx"
  1379. self.prepare_pptx(filename)
  1380. self.open_file_and_check_viewer(filename, [],
  1381. ["libreoffice"])
  1382. def test_004_png(self):
  1383. filename = "/home/user/test_file.png"
  1384. self.prepare_png(filename)
  1385. self.open_file_and_check_viewer(filename, [],
  1386. ["shotwell", "eog", "display"])
  1387. def test_005_jpg(self):
  1388. filename = "/home/user/test_file.jpg"
  1389. self.prepare_jpg(filename)
  1390. self.open_file_and_check_viewer(filename, [],
  1391. ["shotwell", "eog", "display"])
  1392. def test_006_jpeg(self):
  1393. filename = "/home/user/test_file.jpeg"
  1394. self.prepare_jpg(filename)
  1395. self.open_file_and_check_viewer(filename, [],
  1396. ["shotwell", "eog", "display"])
  1397. def test_100_txt_dispvm(self):
  1398. filename = "/home/user/test_file.txt"
  1399. self.prepare_txt(filename)
  1400. self.open_file_and_check_viewer(filename, ["vim"],
  1401. ["gedit", "emacs"],
  1402. dispvm=True)
  1403. def test_101_pdf_dispvm(self):
  1404. filename = "/home/user/test_file.pdf"
  1405. self.prepare_pdf(filename)
  1406. self.open_file_and_check_viewer(filename, [],
  1407. ["evince"],
  1408. dispvm=True)
  1409. def test_102_doc_dispvm(self):
  1410. filename = "/home/user/test_file.doc"
  1411. self.prepare_doc(filename)
  1412. self.open_file_and_check_viewer(filename, [],
  1413. ["libreoffice", "abiword"],
  1414. dispvm=True)
  1415. def test_103_pptx_dispvm(self):
  1416. filename = "/home/user/test_file.pptx"
  1417. self.prepare_pptx(filename)
  1418. self.open_file_and_check_viewer(filename, [],
  1419. ["libreoffice"],
  1420. dispvm=True)
  1421. def test_104_png_dispvm(self):
  1422. filename = "/home/user/test_file.png"
  1423. self.prepare_png(filename)
  1424. self.open_file_and_check_viewer(filename, [],
  1425. ["shotwell", "eog", "display"],
  1426. dispvm=True)
  1427. def test_105_jpg_dispvm(self):
  1428. filename = "/home/user/test_file.jpg"
  1429. self.prepare_jpg(filename)
  1430. self.open_file_and_check_viewer(filename, [],
  1431. ["shotwell", "eog", "display"],
  1432. dispvm=True)
  1433. def test_106_jpeg_dispvm(self):
  1434. filename = "/home/user/test_file.jpeg"
  1435. self.prepare_jpg(filename)
  1436. self.open_file_and_check_viewer(filename, [],
  1437. ["shotwell", "eog", "display"],
  1438. dispvm=True)
  1439. def load_tests(loader, tests, pattern):
  1440. try:
  1441. qc = qubes.qubes.QubesVmCollection()
  1442. qc.lock_db_for_reading()
  1443. qc.load()
  1444. qc.unlock_db()
  1445. templates = [vm.name for vm in qc.values() if
  1446. isinstance(vm, qubes.qubes.QubesTemplateVm)]
  1447. except OSError:
  1448. templates = []
  1449. for template in templates:
  1450. tests.addTests(loader.loadTestsFromTestCase(
  1451. type(
  1452. 'TC_00_AppVM_' + template,
  1453. (TC_00_AppVMMixin, qubes.tests.QubesTestCase),
  1454. {'template': template})))
  1455. tests.addTests(loader.loadTestsFromTestCase(
  1456. type(
  1457. 'TC_20_DispVM_' + template,
  1458. (TC_20_DispVMMixin, qubes.tests.QubesTestCase),
  1459. {'template': template})))
  1460. tests.addTests(loader.loadTestsFromTestCase(
  1461. type(
  1462. 'TC_40_PVGrub_' + template,
  1463. (TC_40_PVGrub, qubes.tests.QubesTestCase),
  1464. {'template': template})))
  1465. tests.addTests(loader.loadTestsFromTestCase(
  1466. type(
  1467. 'TC_50_MimeHandlers_' + template,
  1468. (TC_50_MimeHandlers, qubes.tests.QubesTestCase),
  1469. {'template': template})))
  1470. return tests