# dispvm.py
  #
  # The Qubes OS Project, http://www.qubes-os.org
  #
  # Copyright (C) 2016 Marek Marczykowski-Górecki
  #                               <marmarek@invisiblethingslab.com>
  #
  # This library is free software; you can redistribute it and/or
  # modify it under the terms of the GNU Lesser General Public
  # License as published by the Free Software Foundation; either
  # version 2.1 of the License, or (at your option) any later version.
  #
  # This library is distributed in the hope that it will be useful,
  # but WITHOUT ANY WARRANTY; without even the implied warranty of
  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  # Lesser General Public License for more details.
  #
  # You should have received a copy of the GNU Lesser General Public
  # License along with this library; if not, see <https://www.gnu.org/licenses/>.
  #
  import asyncio
  import os
  import shutil
  import subprocess
  import sys
  import tempfile
  import time
  import unittest
  from contextlib import suppress
  # NOTE: distutils is deprecated (PEP 632) and absent from Python 3.12+.
  from distutils import spawn

  import qubes.tests
  class TC_04_DispVM(qubes.tests.SystemTestCase):
      """Check that DispVMs started with ``qvm-run-vm --dispvm`` get removed.

      ``setUp`` creates two AppVMs: ``self.disp_base`` (flagged
      ``template_for_dispvms`` and installed as ``app.default_dispvm``) and
      ``self.testvm``, from inside which the tests launch a DispVM.
      """

      def setUp(self):
          """Create the DispVM template VM and the calling test VM."""
          super().setUp()
          self.init_default_template()
          self.disp_base = self.app.add_new_vm(
              qubes.vm.appvm.AppVM,
              name=self.make_vm_name('dvm'),
              label='red',
          )
          self.loop.run_until_complete(self.disp_base.create_on_disk())
          self.disp_base.template_for_dispvms = True
          self.app.default_dispvm = self.disp_base
          self.testvm = self.app.add_new_vm(
              qubes.vm.appvm.AppVM,
              name=self.make_vm_name('vm'),
              label='red',
          )
          self.loop.run_until_complete(self.testvm.create_on_disk())
          self.app.save()

      def tearDown(self):
          """Unset the default DispVM, then run the base-class teardown."""
          self.app.default_dispvm = None
          super().tearDown()

      def test_002_cleanup(self):
          """DispVM is gone from ``app.domains`` after its command finished.

          The DispVM prints ``test``, then its own name (``qubesdb-read
          /name``), then ``ERROR``; the second output line is therefore the
          DispVM name when ``qubesdb-read`` succeeded.
          """
          self.loop.run_until_complete(self.testvm.start())
          try:
              (stdout, _) = self.loop.run_until_complete(
                  self.testvm.run_for_stdio(
                      "qvm-run-vm --dispvm bash",
                      input=b"echo test; qubesdb-read /name; echo ERROR\n"))
          except subprocess.CalledProcessError as err:
              self.fail('qvm-run-vm failed with {} code, stderr: {}'.format(
                  err.returncode, err.stderr))
          lines = stdout.decode('ascii').splitlines()
          # Fail with a readable message instead of an IndexError when the
          # DispVM produced less output than expected.
          self.assertGreaterEqual(
              len(lines), 2,
              'Unexpected output from DispVM: {!r}'.format(stdout))
          self.assertEqual(lines[0], "test")
          dispvm_name = lines[1]
          # Same sanity check as test_003: "ERROR" in this position means
          # qubesdb-read printed nothing.
          self.assertNotEqual(dispvm_name, "ERROR")
          # wait for actual DispVM destruction
          self.loop.run_until_complete(asyncio.sleep(5))
          self.assertNotIn(dispvm_name, self.app.domains)

      def test_003_cleanup_destroyed(self):
          """
          Check if DispVM is properly removed even if it terminated itself (#1660)
          :return:
          """
          self.loop.run_until_complete(self.testvm.start())
          p = self.loop.run_until_complete(
              self.testvm.run("qvm-run-vm --dispvm bash; true",
                              stdin=subprocess.PIPE, stdout=subprocess.PIPE))
          p.stdin.write(b"qubesdb-read /name\n")
          p.stdin.write(b"echo ERROR\n")
          p.stdin.write(b"sudo poweroff\n")
          # do not close p.stdin on purpose - wait to automatic disconnect when
          # domain is destroyed
          timeout = 70
          # Bind the reader task to the loop that is going to run it rather
          # than relying on the implicit "current" event loop.
          lines_task = asyncio.ensure_future(p.stdout.read(), loop=self.loop)
          self.loop.run_until_complete(asyncio.wait_for(p.wait(), timeout))
          self.loop.run_until_complete(lines_task)
          # Decode like test_002 does, so the membership check below compares
          # text with text.  Presumably domain names are str, in which case a
          # bytes name could never match and the check would always pass.
          lines = lines_task.result().decode('ascii').splitlines()
          self.assertTrue(lines, 'No output received from DispVM')
          dispvm_name = lines[0]
          self.assertNotEqual(dispvm_name, "ERROR")
          self.assertNotIn(dispvm_name, self.app.domains)
  class TC_20_DispVMMixin(object):
      """DispVM tests meant to be instantiated once per template.

      This mixin is not a test case on its own: it calls ``super().setUp()``
      and uses ``self.app``, ``self.loop``, ``self.template`` and several
      ``wait_for_*`` / ``enter_keys_in_window`` helpers that it does not
      define, so it has to be combined with a test-case base class
      (``create_testcases_for_templates`` in this module passes it together
      with ``qubes.tests.SystemTestCase``).
      """

      def setUp(self):
          """Create a DispVM template VM based on ``self.template``."""
          super().setUp()
          self.init_default_template(self.template)
          self.disp_base = self.app.add_new_vm(
              qubes.vm.appvm.AppVM,
              name=self.make_vm_name('dvm'),
              label='red',
              template_for_dispvms=True,
          )
          self.loop.run_until_complete(self.disp_base.create_on_disk())
          self.app.default_dispvm = self.disp_base
          self.app.save()

      def tearDown(self):
          """Unset the default DispVM, then run the base-class teardown."""
          self.app.default_dispvm = None
          super().tearDown()

      def test_010_simple_dvm_run(self):
          """Run ``echo test`` through qubes.VMShell in a fresh DispVM."""
          dispvm = self.loop.run_until_complete(
              qubes.vm.dispvm.DispVM.from_appvm(self.disp_base))
          try:
              self.loop.run_until_complete(dispvm.start())
              (stdout, _) = self.loop.run_until_complete(
                  dispvm.run_service_for_stdio('qubes.VMShell',
                                               input=b"echo test"))
              self.assertEqual(stdout, b"test\n")
          finally:
              self.loop.run_until_complete(dispvm.cleanup())

      @unittest.skipUnless(shutil.which('xdotool'), "xdotool not installed")
      def test_020_gui_app(self):
          """Open xterm in a DispVM, close it with Return, check VM removal."""
          dispvm = self.loop.run_until_complete(
              qubes.vm.dispvm.DispVM.from_appvm(self.disp_base))
          try:
              self.loop.run_until_complete(dispvm.start())
              self.loop.run_until_complete(self.wait_for_session(dispvm))
              p = self.loop.run_until_complete(
                  dispvm.run_service('qubes.VMShell',
                                     stdin=subprocess.PIPE,
                                     stdout=subprocess.PIPE))
              # wait for DispVM startup:
              p.stdin.write(b"echo test\n")
              self.loop.run_until_complete(p.stdin.drain())
              first_line = self.loop.run_until_complete(p.stdout.readline())
              self.assertEqual(first_line, b"test\n")

              self.assertTrue(dispvm.is_running())
              try:
                  window_title = 'user@%s' % (dispvm.name,)
                  # close xterm on Return, but after short delay, to allow
                  # xdotool to send also keyup event
                  xterm_command = (
                      "xterm -e "
                      "\"sh -c 'echo \\\"\033]0;{}\007\\\";read x;"
                      "sleep 0.1;'\"\n"
                  ).format(window_title)
                  p.stdin.write(xterm_command.encode())
                  self.loop.run_until_complete(p.stdin.drain())
                  self.wait_for_window(window_title)
                  time.sleep(0.5)
                  self.enter_keys_in_window(window_title, ['Return'])
                  # Wait for window to close
                  self.wait_for_window(window_title, show=False)
                  p.stdin.close()
                  self.loop.run_until_complete(
                      asyncio.wait_for(p.wait(), 30))
              except BaseException:
                  # On any failure (interrupts included) make sure the shell
                  # process has ended, then re-raise the original exception.
                  with suppress(ProcessLookupError):
                      p.terminate()
                  self.loop.run_until_complete(p.wait())
                  raise
              finally:
                  del p
          finally:
              self.loop.run_until_complete(dispvm.cleanup())
              dispvm_name = dispvm.name
              del dispvm

          # give it a time for shutdown + cleanup
          self.loop.run_until_complete(asyncio.sleep(5))

          self.assertNotIn(dispvm_name, self.app.domains,
                           "DispVM not removed from qubes.xml")

      @staticmethod
      def _xdotool_window_title(winid):
          """Return the decoded, stripped output of ``xdotool getwindowname``."""
          (raw_title, _) = subprocess.Popen(
              ['xdotool', 'getwindowname', winid],
              stdout=subprocess.PIPE).communicate()
          return raw_title.decode().strip()

      @staticmethod
      def _xdotool_type_test_text(winid, leading_keys=()):
          """Activate *winid*, type ``Test test 2`` and send Return.

          :param leading_keys: key names sent (each as ``key <name>``) after
              activating the window and before typing the text.
          """
          command = ['xdotool', 'windowactivate', '--sync', winid]
          for key_name in leading_keys:
              command += ['key', key_name]
          command += ['type', 'Test test 2']
          subprocess.check_call(command)
          subprocess.check_call(
              ['xdotool', 'key', '--window', winid, 'key', 'Return'])

      def _handle_editor(self, winid):
          """Type a line into the editor window *winid*, save and quit.

          The editor is recognised from the window title (gedit/KWrite,
          LibreOffice, emacs, vim or a ``user@`` terminal); each gets its own
          save-and-quit key sequence.  The test fails for any other title.
          """
          window_title = self._xdotool_window_title(winid).\
              replace('(', '\\(').replace(')', '\\)')
          time.sleep(1)
          if "gedit" in window_title or 'KWrite' in window_title:
              self._xdotool_type_test_text(winid)
              time.sleep(0.5)
              subprocess.check_call(['xdotool',
                                     'key', 'ctrl+s', 'ctrl+q'])
          elif "LibreOffice" in window_title:
              # wait for actual editor (we've got splash screen)
              search = subprocess.Popen(
                  ['xdotool', 'search', '--sync',
                   '--onlyvisible', '--all', '--name', '--class',
                   'disp*|Writer'],
                  stdout=subprocess.PIPE,
                  stderr=subprocess.DEVNULL)
              # communicate() drains and closes the pipe, so a full pipe
              # buffer cannot block the child while we wait for it.
              (found, _) = search.communicate()
              if search.returncode == 0:
                  # decoded so winid stays str like in every other branch
                  winid = found.decode().strip()
              time.sleep(0.5)
              self._xdotool_type_test_text(winid)
              time.sleep(0.5)
              subprocess.check_call(['xdotool',
                                     'key', '--delay', '100', 'ctrl+s',
                                     'Return', 'ctrl+q'])
          elif "emacs" in window_title:
              self._xdotool_type_test_text(winid)
              time.sleep(0.5)
              subprocess.check_call(['xdotool',
                                     'key', 'ctrl+x', 'ctrl+s'])
              subprocess.check_call(['xdotool',
                                     'key', 'ctrl+x', 'ctrl+c'])
          elif "vim" in window_title or "user@" in window_title:
              # 'i' switches to insert mode before the text is typed
              self._xdotool_type_test_text(winid, leading_keys=('i',))
              subprocess.check_call(
                  ['xdotool',
                   'key', 'Escape', 'colon', 'w', 'q', 'Return'])
          else:
              self.fail("Unknown editor window: {}".format(window_title))

      @unittest.skipUnless(shutil.which('xdotool'), "xdotool not installed")
      def test_030_edit_file(self):
          """Edit a file with ``qvm-open-in-dvm`` and check it was written back.

          A new AppVM gets ``/home/user/test.txt`` containing ``test1``; the
          file is opened in a DispVM, ``_handle_editor`` prepends a line, and
          the content read back from the AppVM must be
          ``Test test 2\\ntest1\\n`` (a leading UTF-8 BOM is ignored).
          """
          self.testvm1 = self.app.add_new_vm(
              qubes.vm.appvm.AppVM,
              name=self.make_vm_name('vm1'),
              label='red',
              template=self.app.domains[self.template])
          self.loop.run_until_complete(self.testvm1.create_on_disk())
          self.app.save()

          self.loop.run_until_complete(self.testvm1.start())
          self.loop.run_until_complete(
              self.testvm1.run_for_stdio("echo test1 > /home/user/test.txt"))

          p = self.loop.run_until_complete(
              self.testvm1.run("qvm-open-in-dvm /home/user/test.txt",
                               stdout=subprocess.PIPE,
                               stderr=subprocess.STDOUT))

          # if first 5 windows isn't expected editor, there is no hope
          winid = None
          for _ in range(5):
              try:
                  winid = self.wait_for_window('disp[0-9]*',
                                               search_class=True)
              except Exception as e:
                  # No window: report an early exit of qvm-open-in-dvm if
                  # that is what happened, otherwise re-raise the wait error.
                  try:
                      self.loop.run_until_complete(
                          asyncio.wait_for(p.wait(), 1))
                  except asyncio.TimeoutError:
                      raise e
                  else:
                      stdout = self.loop.run_until_complete(p.stdout.read())
                      self.fail(
                          'qvm-open-in-dvm exited prematurely with {}: {}'
                          .format(p.returncode, stdout))
              # get window title
              window_title = self._xdotool_window_title(winid)
              # ignore LibreOffice splash screen and window with no title
              # set yet
              if window_title \
                      and not window_title.startswith("LibreOffice") \
                      and window_title not in ('VMapp command',
                                               'NetworkManager Applet') \
                      and 'whonixcheck' not in window_title:
                  break
              self.loop.run_until_complete(asyncio.sleep(1))
              winid = None
          if winid is None:
              self.fail('Timeout waiting for editor window')

          time.sleep(0.5)
          self._handle_editor(winid)
          self.loop.run_until_complete(p.communicate())
          (test_txt_content, _) = self.loop.run_until_complete(
              self.testvm1.run_for_stdio("cat /home/user/test.txt"))
          # Drop BOM if added by editor
          if test_txt_content.startswith(b'\xef\xbb\xbf'):
              test_txt_content = test_txt_content[3:]
          self.assertEqual(test_txt_content, b"Test test 2\ntest1\n")
  def create_testcases_for_templates():
      """Delegate to ``qubes.tests.create_testcases_for_templates``.

      Passes the class-name prefix ``'TC_20_DispVM'``, the
      ``TC_20_DispVMMixin`` mixin, ``qubes.tests.SystemTestCase`` and this
      module, and returns whatever the helper returns (``load_tests`` feeds
      that value to ``loader.loadTestsFromNames``).
      """
      this_module = sys.modules[__name__]
      return qubes.tests.create_testcases_for_templates(
          'TC_20_DispVM',
          TC_20_DispVMMixin,
          qubes.tests.SystemTestCase,
          module=this_module,
      )
  def load_tests(loader, tests, pattern):
      """unittest ``load_tests`` hook: add the per-template test cases.

      :param loader: loader used to resolve the generated test names
      :param tests: suite to extend; it is also the return value
      :param pattern: unused
      """
      generated_names = create_testcases_for_templates()
      generated_suite = loader.loadTestsFromNames(generated_names)
      tests.addTests(generated_suite)
      return tests
  # Hand the factory to qubes.tests at import time; whether the per-template
  # classes actually get created here is decided inside that helper.
  qubes.tests.maybe_create_testcases_on_import(
      create_testcases_for_templates)