vm_qrexec_gui.py 28 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705
  1. #!/usr/bin/python
  2. # vim: fileencoding=utf-8
  3. #
  4. # The Qubes OS Project, https://www.qubes-os.org/
  5. #
  6. # Copyright (C) 2014-2015
  7. # Marek Marczykowski-Górecki <marmarek@invisiblethingslab.com>
  8. # Copyright (C) 2015 Wojtek Porczyk <woju@invisiblethingslab.com>
  9. #
  10. # This program is free software; you can redistribute it and/or modify
  11. # it under the terms of the GNU General Public License as published by
  12. # the Free Software Foundation; either version 2 of the License, or
  13. # (at your option) any later version.
  14. #
  15. # This program is distributed in the hope that it will be useful,
  16. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  17. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  18. # GNU General Public License for more details.
  19. #
  20. # You should have received a copy of the GNU General Public License along
  21. # with this program; if not, write to the Free Software Foundation, Inc.,
  22. # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  23. #
  24. from distutils import spawn
  25. import multiprocessing
  26. import os
  27. import subprocess
  28. import unittest
  29. import time
  30. from qubes.qubes import QubesVmCollection, defaults, QubesException
  31. import qubes.tests
  32. TEST_DATA = "0123456789" * 1024
  33. class TC_00_AppVMMixin(qubes.tests.SystemTestsMixin):
  34. def setUp(self):
  35. super(TC_00_AppVMMixin, self).setUp()
  36. self.testvm1 = self.qc.add_new_vm("QubesAppVm",
  37. name=self.make_vm_name('vm1'),
  38. template=self.qc.get_vm_by_name(self.template))
  39. self.testvm1.create_on_disk(verbose=False)
  40. self.testvm2 = self.qc.add_new_vm("QubesAppVm",
  41. name=self.make_vm_name('vm2'),
  42. template=self.qc.get_vm_by_name(self.template))
  43. self.testvm2.create_on_disk(verbose=False)
  44. self.qc.save()
  45. self.qc.unlock_db()
  46. def test_000_start_shutdown(self):
  47. self.testvm1.start()
  48. self.assertEquals(self.testvm1.get_power_state(), "Running")
  49. self.testvm1.shutdown()
  50. shutdown_counter = 0
  51. while self.testvm1.is_running():
  52. if shutdown_counter > defaults["shutdown_counter_max"]:
  53. self.fail("VM hanged during shutdown")
  54. shutdown_counter += 1
  55. time.sleep(1)
  56. time.sleep(1)
  57. self.assertEquals(self.testvm1.get_power_state(), "Halted")
  58. @unittest.skipUnless(spawn.find_executable('xdotool'),
  59. "xdotool not installed")
  60. def test_010_run_gui_app(self):
  61. self.testvm1.start()
  62. self.assertEquals(self.testvm1.get_power_state(), "Running")
  63. self.testvm1.run("gnome-terminal")
  64. wait_count = 0
  65. while subprocess.call(['xdotool', 'search', '--name', 'user@%s' %
  66. self.testvm1.name], stdout=open(os.path.devnull, 'w'),
  67. stderr=subprocess.STDOUT) > 0:
  68. wait_count += 1
  69. if wait_count > 100:
  70. self.fail("Timeout while waiting for gnome-terminal window")
  71. time.sleep(0.1)
  72. time.sleep(0.5)
  73. subprocess.check_call(['xdotool', 'search', '--name', 'user@%s' %
  74. self.testvm1.name, 'windowactivate', 'type', 'exit\n'])
  75. wait_count = 0
  76. while subprocess.call(['xdotool', 'search', '--name', 'user@%s' %
  77. self.testvm1.name], stdout=open(os.path.devnull, 'w'),
  78. stderr=subprocess.STDOUT) == 0:
  79. wait_count += 1
  80. if wait_count > 100:
  81. self.fail("Timeout while waiting for gnome-terminal "
  82. "termination")
  83. time.sleep(0.1)
  84. def test_050_qrexec_simple_eof(self):
  85. """Test for data and EOF transmission dom0->VM"""
  86. result = multiprocessing.Value('i', 0)
  87. def run(self, result):
  88. p = self.testvm1.run("cat", passio_popen=True,
  89. passio_stderr=True)
  90. (stdout, stderr) = p.communicate(TEST_DATA)
  91. if stdout != TEST_DATA:
  92. result.value = 1
  93. if len(stderr) > 0:
  94. result.value = 2
  95. self.testvm1.start()
  96. t = multiprocessing.Process(target=run, args=(self, result))
  97. t.start()
  98. t.join(timeout=10)
  99. if t.is_alive():
  100. t.terminate()
  101. self.fail("Timeout, probably EOF wasn't transferred to the VM "
  102. "process")
  103. if result.value == 1:
  104. self.fail("Received data differs from what was sent")
  105. elif result.value == 2:
  106. self.fail("Some data was printed to stderr")
  107. def test_051_qrexec_simple_eof_reverse(self):
  108. """Test for EOF transmission VM->dom0"""
  109. result = multiprocessing.Value('i', 0)
  110. def run(self, result):
  111. p = self.testvm1.run("echo test; exec >&-; cat > /dev/null",
  112. passio_popen=True, passio_stderr=True)
  113. # this will hang on test failure
  114. stdout = p.stdout.read()
  115. p.stdin.write(TEST_DATA)
  116. p.stdin.close()
  117. if stdout.strip() != "test":
  118. result.value = 1
  119. # this may hang in some buggy cases
  120. elif len(p.stderr.read()) > 0:
  121. result.value = 2
  122. elif p.poll() is None:
  123. time.sleep(1)
  124. if p.poll() is None:
  125. result.value = 3
  126. self.testvm1.start()
  127. t = multiprocessing.Process(target=run, args=(self, result))
  128. t.start()
  129. t.join(timeout=10)
  130. if t.is_alive():
  131. t.terminate()
  132. self.fail("Timeout, probably EOF wasn't transferred from the VM "
  133. "process")
  134. if result.value == 1:
  135. self.fail("Received data differs from what was expected")
  136. elif result.value == 2:
  137. self.fail("Some data was printed to stderr")
  138. elif result.value == 3:
  139. self.fail("VM proceess didn't terminated on EOF")
  140. def test_052_qrexec_vm_service_eof(self):
  141. """Test for EOF transmission VM(src)->VM(dst)"""
  142. result = multiprocessing.Value('i', 0)
  143. def run(self, result):
  144. p = self.testvm1.run("/usr/lib/qubes/qrexec-client-vm %s test.EOF "
  145. "/bin/sh -c 'echo test; exec >&-; cat "
  146. ">&$SAVED_FD_1'" % self.testvm2.name,
  147. passio_popen=True)
  148. (stdout, stderr) = p.communicate()
  149. if stdout != "test\n":
  150. result.value = 1
  151. self.testvm1.start()
  152. self.testvm2.start()
  153. p = self.testvm2.run("cat > /etc/qubes-rpc/test.EOF", user="root",
  154. passio_popen=True)
  155. p.stdin.write("/bin/cat")
  156. p.stdin.close()
  157. p.wait()
  158. policy = open("/etc/qubes-rpc/policy/test.EOF", "w")
  159. policy.write("%s %s allow" % (self.testvm1.name, self.testvm2.name))
  160. policy.close()
  161. self.addCleanup(os.unlink, "/etc/qubes-rpc/policy/test.EOF")
  162. t = multiprocessing.Process(target=run, args=(self, result))
  163. t.start()
  164. t.join(timeout=10)
  165. if t.is_alive():
  166. t.terminate()
  167. self.fail("Timeout, probably EOF wasn't transferred")
  168. if result.value == 1:
  169. self.fail("Received data differs from what was expected")
  170. @unittest.expectedFailure
  171. def test_053_qrexec_vm_service_eof_reverse(self):
  172. """Test for EOF transmission VM(src)<-VM(dst)"""
  173. result = multiprocessing.Value('i', 0)
  174. def run(self, result):
  175. p = self.testvm1.run("/usr/lib/qubes/qrexec-client-vm %s test.EOF "
  176. "/bin/sh -c 'cat >&$SAVED_FD_1'"
  177. % self.testvm1.name,
  178. passio_popen=True)
  179. (stdout, stderr) = p.communicate()
  180. if stdout != "test\n":
  181. result.value = 1
  182. self.testvm1.start()
  183. self.testvm2.start()
  184. p = self.testvm2.run("cat > /etc/qubes-rpc/test.EOF", user="root",
  185. passio_popen=True)
  186. p.stdin.write("echo test; exec >&-; cat >/dev/null")
  187. p.stdin.close()
  188. p.wait()
  189. policy = open("/etc/qubes-rpc/policy/test.EOF", "w")
  190. policy.write("%s %s allow" % (self.testvm1.name, self.testvm2.name))
  191. policy.close()
  192. self.addCleanup(os.unlink, "/etc/qubes-rpc/policy/test.EOF")
  193. t = multiprocessing.Process(target=run, args=(self, result))
  194. t.start()
  195. t.join(timeout=10)
  196. if t.is_alive():
  197. t.terminate()
  198. self.fail("Timeout, probably EOF wasn't transferred")
  199. if result.value == 1:
  200. self.fail("Received data differs from what was expected")
  201. def test_060_qrexec_exit_code_dom0(self):
  202. self.testvm1.start()
  203. p = self.testvm1.run("exit 0", passio_popen=True)
  204. p.wait()
  205. self.assertEqual(0, p.returncode)
  206. p = self.testvm1.run("exit 3", passio_popen=True)
  207. p.wait()
  208. self.assertEqual(3, p.returncode)
  209. @unittest.expectedFailure
  210. def test_065_qrexec_exit_code_vm(self):
  211. self.testvm1.start()
  212. self.testvm2.start()
  213. policy = open("/etc/qubes-rpc/policy/test.Retcode", "w")
  214. policy.write("%s %s allow" % (self.testvm1.name, self.testvm2.name))
  215. policy.close()
  216. self.addCleanup(os.unlink, "/etc/qubes-rpc/policy/test.Retcode")
  217. p = self.testvm2.run("cat > /etc/qubes-rpc/test.Retcode", user="root",
  218. passio_popen=True)
  219. p.stdin.write("exit 0")
  220. p.stdin.close()
  221. p.wait()
  222. p = self.testvm1.run("/usr/lib/qubes/qrexec-client-vm %s test.Retcode "
  223. "/bin/sh -c 'cat >/dev/null'; echo $?"
  224. % self.testvm1.name,
  225. passio_popen=True)
  226. (stdout, stderr) = p.communicate()
  227. self.assertEqual(stdout, "0\n")
  228. p = self.testvm2.run("cat > /etc/qubes-rpc/test.Retcode", user="root",
  229. passio_popen=True)
  230. p.stdin.write("exit 3")
  231. p.stdin.close()
  232. p.wait()
  233. p = self.testvm1.run("/usr/lib/qubes/qrexec-client-vm %s test.Retcode "
  234. "/bin/sh -c 'cat >/dev/null'; echo $?"
  235. % self.testvm1.name,
  236. passio_popen=True)
  237. (stdout, stderr) = p.communicate()
  238. self.assertEqual(stdout, "3\n")
  239. @unittest.skipUnless(spawn.find_executable('xdotool'),
  240. "xdotool not installed")
  241. def test_100_qrexec_filecopy(self):
  242. self.testvm1.start()
  243. self.testvm2.start()
  244. p = self.testvm1.run("qvm-copy-to-vm %s /etc/passwd" %
  245. self.testvm2.name, passio_popen=True,
  246. passio_stderr=True)
  247. # Confirm transfer
  248. subprocess.check_call(['xdotool', 'search', '--sync', '--name', 'Question',
  249. 'key', 'y'])
  250. p.wait()
  251. self.assertEqual(p.returncode, 0, "qvm-copy-to-vm failed: %s" %
  252. p.stderr.read())
  253. retcode = self.testvm2.run("diff /etc/passwd "
  254. "/home/user/QubesIncoming/%s/passwd" % self.testvm1.name, wait=True)
  255. self.assertEqual(retcode, 0, "file differs")
  256. @unittest.skipUnless(spawn.find_executable('xdotool'),
  257. "xdotool not installed")
  258. def test_110_qrexec_filecopy_deny(self):
  259. self.testvm1.start()
  260. self.testvm2.start()
  261. p = self.testvm1.run("qvm-copy-to-vm %s /etc/passwd" %
  262. self.testvm2.name, passio_popen=True)
  263. # Deny transfer
  264. subprocess.check_call(['xdotool', 'search', '--sync', '--name', 'Question',
  265. 'key', 'n'])
  266. p.wait()
  267. self.assertNotEqual(p.returncode, 0, "qvm-copy-to-vm unexpectedly "
  268. "succeeded")
  269. retcode = self.testvm1.run("ls /home/user/QubesIncoming/%s" %
  270. self.testvm1.name, wait=True,
  271. ignore_stderr=True)
  272. self.assertNotEqual(retcode, 0, "QubesIncoming exists although file "
  273. "copy was "
  274. "denied")
  275. @unittest.skip("Xen gntalloc driver crashes when page is mapped in the "
  276. "same domain")
  277. @unittest.skipUnless(spawn.find_executable('xdotool'),
  278. "xdotool not installed")
  279. def test_120_qrexec_filecopy_self(self):
  280. self.testvm1.start()
  281. p = self.testvm1.run("qvm-copy-to-vm %s /etc/passwd" %
  282. self.testvm1.name, passio_popen=True,
  283. passio_stderr=True)
  284. # Confirm transfer
  285. subprocess.check_call(['xdotool', 'search', '--sync', '--name', 'Question',
  286. 'key', 'y'])
  287. p.wait()
  288. self.assertEqual(p.returncode, 0, "qvm-copy-to-vm failed: %s" %
  289. p.stderr.read())
  290. retcode = self.testvm1.run("diff /etc/passwd "
  291. "/home/user/QubesIncoming/%s/passwd" % self.testvm1.name, wait=True)
  292. self.assertEqual(retcode, 0, "file differs")
  293. def test_200_timezone(self):
  294. """Test whether timezone setting is properly propagated to the VM"""
  295. if "whonix" in self.template:
  296. self.skipTest("Timezone propagation disabled on Whonix templates")
  297. self.testvm1.start()
  298. (vm_tz, _) = self.testvm1.run("date +%Z",
  299. passio_popen=True).communicate()
  300. (dom0_tz, _) = subprocess.Popen(["date", "+%Z"],
  301. stdout=subprocess.PIPE).communicate()
  302. self.assertEqual(vm_tz.strip(), dom0_tz.strip())
  303. # Check if reverting back to UTC works
  304. (vm_tz, _) = self.testvm1.run("TZ=UTC date +%Z",
  305. passio_popen=True).communicate()
  306. self.assertEqual(vm_tz.strip(), "UTC")
  307. class TC_10_HVM(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
  308. # TODO: test with some OS inside
  309. # TODO: windows tools tests
  310. def test_000_create_start(self):
  311. self.testvm1 = self.qc.add_new_vm("QubesHVm",
  312. name=self.make_vm_name('vm1'))
  313. self.testvm1.create_on_disk(verbose=False)
  314. self.qc.save()
  315. self.qc.unlock_db()
  316. self.testvm1.start()
  317. self.assertEquals(self.testvm1.get_power_state(), "Running")
  318. def test_010_create_start_template(self):
  319. self.templatevm = self.qc.add_new_vm("QubesTemplateHVm",
  320. name=self.make_vm_name('template'))
  321. self.templatevm.create_on_disk(verbose=False)
  322. self.qc.save()
  323. self.qc.unlock_db()
  324. self.templatevm.start()
  325. self.assertEquals(self.templatevm.get_power_state(), "Running")
  326. def test_020_create_start_template_vm(self):
  327. self.templatevm = self.qc.add_new_vm("QubesTemplateHVm",
  328. name=self.make_vm_name('template'))
  329. self.templatevm.create_on_disk(verbose=False)
  330. self.testvm2 = self.qc.add_new_vm("QubesHVm",
  331. name=self.make_vm_name('vm2'),
  332. template=self.templatevm)
  333. self.testvm2.create_on_disk(verbose=False)
  334. self.qc.save()
  335. self.qc.unlock_db()
  336. self.testvm2.start()
  337. self.assertEquals(self.testvm2.get_power_state(), "Running")
  338. def test_030_prevent_simultaneus_start(self):
  339. self.templatevm = self.qc.add_new_vm("QubesTemplateHVm",
  340. name=self.make_vm_name('template'))
  341. self.templatevm.create_on_disk(verbose=False)
  342. self.testvm2 = self.qc.add_new_vm("QubesHVm",
  343. name=self.make_vm_name('vm2'),
  344. template=self.templatevm)
  345. self.testvm2.create_on_disk(verbose=False)
  346. self.qc.save()
  347. self.qc.unlock_db()
  348. self.templatevm.start()
  349. self.assertEquals(self.templatevm.get_power_state(), "Running")
  350. self.assertRaises(QubesException, self.testvm2.start)
  351. self.templatevm.force_shutdown()
  352. self.testvm2.start()
  353. self.assertEquals(self.testvm2.get_power_state(), "Running")
  354. self.assertRaises(QubesException, self.templatevm.start)
  355. class TC_20_DispVMMixin(qubes.tests.SystemTestsMixin):
  356. def test_000_prepare_dvm(self):
  357. self.qc.unlock_db()
  358. retcode = subprocess.call(['/usr/bin/qvm-create-default-dvm',
  359. self.template],
  360. stderr=open(os.devnull, 'w'))
  361. self.assertEqual(retcode, 0)
  362. self.qc.lock_db_for_writing()
  363. self.qc.load()
  364. self.assertIsNotNone(self.qc.get_vm_by_name(
  365. self.template + "-dvm"))
  366. # TODO: check mtime of snapshot file
  367. def test_010_simple_dvm_run(self):
  368. self.qc.unlock_db()
  369. p = subprocess.Popen(['/usr/lib/qubes/qfile-daemon-dvm',
  370. 'qubes.VMShell', 'dom0', 'DEFAULT'],
  371. stdin=subprocess.PIPE,
  372. stdout=subprocess.PIPE,
  373. stderr=open(os.devnull, 'w'))
  374. (stdout, _) = p.communicate(input="echo test")
  375. self.assertEqual(stdout, "test\n")
  376. # TODO: check if DispVM is destroyed
  377. @unittest.skipUnless(spawn.find_executable('xdotool'),
  378. "xdotool not installed")
  379. def test_020_gui_app(self):
  380. self.qc.unlock_db()
  381. p = subprocess.Popen(['/usr/lib/qubes/qfile-daemon-dvm',
  382. 'qubes.VMShell', 'dom0', 'DEFAULT'],
  383. stdin=subprocess.PIPE,
  384. stdout=subprocess.PIPE,
  385. stderr=open(os.devnull, 'w'))
  386. # wait for DispVM startup:
  387. p.stdin.write("echo test\n")
  388. p.stdin.flush()
  389. l = p.stdout.readline()
  390. self.assertEqual(l, "test\n")
  391. # potential race condition, but our tests are supposed to be
  392. # running on dedicated machine, so should not be a problem
  393. self.qc.lock_db_for_reading()
  394. self.qc.load()
  395. self.qc.unlock_db()
  396. max_qid = 0
  397. for vm in self.qc.values():
  398. if not vm.is_disposablevm():
  399. continue
  400. if vm.qid > max_qid:
  401. max_qid = vm.qid
  402. dispvm = self.qc[max_qid]
  403. self.assertNotEqual(dispvm.qid, 0, "DispVM not found in qubes.xml")
  404. self.assertTrue(dispvm.is_running())
  405. window_title = 'user@%s' % (dispvm.template.name + "-dvm")
  406. p.stdin.write("gnome-terminal -e "
  407. "\"sh -s -c 'echo \\\"\033]0;{}\007\\\"'\"\n".
  408. format(window_title))
  409. wait_count = 0
  410. while subprocess.call(['xdotool', 'search', '--name', window_title],
  411. stdout=open(os.path.devnull, 'w'),
  412. stderr=subprocess.STDOUT) > 0:
  413. wait_count += 1
  414. if wait_count > 100:
  415. self.fail("Timeout while waiting for gnome-terminal window")
  416. time.sleep(0.1)
  417. time.sleep(0.5)
  418. subprocess.check_call(['xdotool', 'search', '--name', window_title,
  419. 'windowactivate', 'type', 'exit\n'])
  420. wait_count = 0
  421. while subprocess.call(['xdotool', 'search', '--name', window_title],
  422. stdout=open(os.path.devnull, 'w'),
  423. stderr=subprocess.STDOUT) == 0:
  424. wait_count += 1
  425. if wait_count > 100:
  426. self.fail("Timeout while waiting for gnome-terminal "
  427. "termination")
  428. time.sleep(0.1)
  429. p.stdin.close()
  430. wait_count = 0
  431. while dispvm.is_running():
  432. wait_count += 1
  433. if wait_count > 100:
  434. self.fail("Timeout while waiting for DispVM destruction")
  435. time.sleep(0.1)
  436. wait_count = 0
  437. while p.poll() is None:
  438. wait_count += 1
  439. if wait_count > 100:
  440. self.fail("Timeout while waiting for qfile-daemon-dvm "
  441. "termination")
  442. time.sleep(0.1)
  443. self.assertEqual(p.returncode, 0)
  444. self.qc.lock_db_for_reading()
  445. self.qc.load()
  446. self.qc.unlock_db()
  447. self.assertIsNone(self.qc.get_vm_by_name(dispvm.name),
  448. "DispVM not removed from qubes.xml")
  449. def _handle_editor(self, winid):
  450. (window_title, _) = subprocess.Popen(
  451. ['xdotool', 'getwindowname', winid], stdout=subprocess.PIPE).\
  452. communicate()
  453. window_title = window_title.strip().\
  454. replace('(', '\(').replace(')', '\)')
  455. time.sleep(1)
  456. if "gedit" in window_title:
  457. subprocess.check_call(['xdotool', 'search', '--name', window_title,
  458. 'windowactivate', 'type', 'test test 2\n'])
  459. time.sleep(0.5)
  460. subprocess.check_call(['xdotool', 'search', '--name', window_title,
  461. 'key', 'ctrl+s', 'ctrl+q'])
  462. elif "emacs" in window_title:
  463. subprocess.check_call(['xdotool', 'search', '--name', window_title,
  464. 'windowactivate', 'type', 'test test 2\n'])
  465. time.sleep(0.5)
  466. subprocess.check_call(['xdotool', 'search', '--name', window_title,
  467. 'key', 'ctrl+x', 'ctrl+s'])
  468. subprocess.check_call(['xdotool', 'search', '--name', window_title,
  469. 'key', 'ctrl+x', 'ctrl+c'])
  470. elif "vim" in window_title:
  471. subprocess.check_call(['xdotool', 'search', '--name', window_title,
  472. 'windowactivate', 'key', 'i',
  473. 'type', 'test test 2\n'])
  474. subprocess.check_call(
  475. ['xdotool', 'search', '--name', window_title,
  476. 'key', 'Escape', 'colon', 'w', 'q', 'Return'])
  477. else:
  478. self.fail("Unknown editor window: {}".format(window_title))
  479. @unittest.skipUnless(spawn.find_executable('xdotool'),
  480. "xdotool not installed")
  481. def test_030_edit_file(self):
  482. self.testvm1 = self.qc.add_new_vm("QubesAppVm",
  483. name=self.make_vm_name('vm1'),
  484. template=self.qc.get_vm_by_name(self.template))
  485. self.testvm1.create_on_disk(verbose=False)
  486. self.qc.save()
  487. self.testvm1.start()
  488. self.testvm1.run("echo test1 > /home/user/test.txt", wait=True)
  489. self.qc.unlock_db()
  490. p = self.testvm1.run("qvm-open-in-dvm /home/user/test.txt",
  491. passio_popen=True)
  492. wait_count = 0
  493. winid = None
  494. while True:
  495. search = subprocess.Popen(['xdotool', 'search',
  496. '--onlyvisible', '--class', 'disp*'],
  497. stdout=subprocess.PIPE,
  498. stderr=open(os.path.devnull, 'w'))
  499. retcode = search.wait()
  500. if retcode == 0:
  501. winid = search.stdout.read().strip()
  502. break
  503. wait_count += 1
  504. if wait_count > 100:
  505. self.fail("Timeout while waiting for editor window")
  506. time.sleep(0.3)
  507. self._handle_editor(winid)
  508. p.wait()
  509. p = self.testvm1.run("cat /home/user/test.txt",
  510. passio_popen=True)
  511. (test_txt_content, _) = p.communicate()
  512. self.assertEqual(test_txt_content, "test test 2\ntest1\n")
  513. class TC_30_Gui_daemon(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
  514. @unittest.skipUnless(spawn.find_executable('xdotool'),
  515. "xdotool not installed")
  516. def test_000_clipboard(self):
  517. self.testvm1 = self.qc.add_new_vm("QubesAppVm",
  518. name=self.make_vm_name('vm1'),
  519. template=self.qc.get_default_template())
  520. self.testvm1.create_on_disk(verbose=False)
  521. self.testvm2 = self.qc.add_new_vm("QubesAppVm",
  522. name=self.make_vm_name('vm2'),
  523. template=self.qc.get_default_template())
  524. self.testvm2.create_on_disk(verbose=False)
  525. self.qc.save()
  526. self.qc.unlock_db()
  527. self.testvm1.start()
  528. self.testvm2.start()
  529. window_title = 'user@{}'.format(self.testvm1.name)
  530. self.testvm1.run('zenity --text-info --editable --title={}'.format(
  531. window_title))
  532. wait_count = 0
  533. while subprocess.call(['xdotool', 'search', '--name', window_title],
  534. stdout=open(os.path.devnull, 'w'),
  535. stderr=subprocess.STDOUT) > 0:
  536. wait_count += 1
  537. if wait_count > 100:
  538. self.fail("Timeout while waiting for text-info window")
  539. time.sleep(0.1)
  540. time.sleep(0.5)
  541. test_string = "test{}".format(self.testvm1.xid)
  542. # Type and copy some text
  543. subprocess.check_call(['xdotool', 'search', '--name', window_title,
  544. 'windowactivate',
  545. 'type', '{}'.format(test_string)])
  546. # second xdotool call because type --terminator do not work (SEGV)
  547. # additionally do not use search here, so window stack will be empty
  548. # and xdotool will use XTEST instead of generating events manually -
  549. # this will be much better - at least because events will have
  550. # correct timestamp (so gui-daemon would not drop the copy request)
  551. subprocess.check_call(['xdotool',
  552. 'key', 'ctrl+a', 'ctrl+c', 'ctrl+shift+c',
  553. 'Escape'])
  554. clipboard_content = \
  555. open('/var/run/qubes/qubes-clipboard.bin', 'r').read().strip()
  556. self.assertEquals(clipboard_content, test_string,
  557. "Clipboard copy operation failed - content")
  558. clipboard_source = \
  559. open('/var/run/qubes/qubes-clipboard.bin.source', 'r').read().strip()
  560. self.assertEquals(clipboard_source, self.testvm1.name,
  561. "Clipboard copy operation failed - owner")
  562. # Then paste it to the other window
  563. window_title = 'user@{}'.format(self.testvm2.name)
  564. self.testvm2.run('zenity --entry --title={} > test.txt'.format(
  565. window_title))
  566. wait_count = 0
  567. while subprocess.call(['xdotool', 'search', '--name', window_title],
  568. stdout=open(os.path.devnull, 'w'),
  569. stderr=subprocess.STDOUT) > 0:
  570. wait_count += 1
  571. if wait_count > 100:
  572. self.fail("Timeout while waiting for input window")
  573. time.sleep(0.1)
  574. subprocess.check_call(['xdotool', 'key', '--delay', '100',
  575. 'ctrl+shift+v', 'ctrl+v', 'Return'])
  576. time.sleep(0.5)
  577. # And compare the result
  578. (test_output, _) = self.testvm2.run('cat test.txt',
  579. passio_popen=True).communicate()
  580. self.assertEquals(test_string, test_output.strip())
  581. clipboard_content = \
  582. open('/var/run/qubes/qubes-clipboard.bin', 'r').read().strip()
  583. self.assertEquals(clipboard_content, "",
  584. "Clipboard not wiped after paste - content")
  585. clipboard_source = \
  586. open('/var/run/qubes/qubes-clipboard.bin.source', 'r').read(
  587. ).strip()
  588. self.assertEquals(clipboard_source, "",
  589. "Clipboard not wiped after paste - owner")
  590. def load_tests(loader, tests, pattern):
  591. try:
  592. qc = qubes.qubes.QubesVmCollection()
  593. qc.lock_db_for_reading()
  594. qc.load()
  595. qc.unlock_db()
  596. templates = [vm.name for vm in qc.values() if
  597. isinstance(vm, qubes.qubes.QubesTemplateVm)]
  598. except OSError:
  599. templates = []
  600. for template in templates:
  601. tests.addTests(loader.loadTestsFromTestCase(
  602. type(
  603. 'TC_00_AppVM_' + template,
  604. (TC_00_AppVMMixin, qubes.tests.QubesTestCase),
  605. {'template': template})))
  606. tests.addTests(loader.loadTestsFromTestCase(
  607. type(
  608. 'TC_20_DispVM_' + template,
  609. (TC_20_DispVMMixin, qubes.tests.QubesTestCase),
  610. {'template': template})))
  611. return tests