qvm_run.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296
  1. # -*- encoding: utf8 -*-
  2. #
  3. # The Qubes OS Project, http://www.qubes-os.org
  4. #
  5. # Copyright (C) 2017 Marek Marczykowski-Górecki
  6. # <marmarek@invisiblethingslab.com>
  7. #
  8. # This program is free software; you can redistribute it and/or modify
  9. # it under the terms of the GNU Lesser General Public License as published by
  10. # the Free Software Foundation; either version 2.1 of the License, or
  11. # (at your option) any later version.
  12. #
  13. # This program is distributed in the hope that it will be useful,
  14. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  15. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  16. # GNU Lesser General Public License for more details.
  17. #
  18. # You should have received a copy of the GNU Lesser General Public License along
  19. # with this program; if not, see <http://www.gnu.org/licenses/>.
  20. import io
  21. import unittest.mock
  22. import subprocess
  23. import sys
  24. import qubesadmin.tests
  25. import qubesadmin.tools.qvm_run
  26. class TC_00_qvm_run(qubesadmin.tests.QubesTestCase):
  27. def setUp(self):
  28. if sys.stdout is not sys.__stdout__ or \
  29. sys.stderr is not sys.__stderr__:
  30. self.skipTest('qvm-run change behavior on redirected stdout/stderr')
  31. super(TC_00_qvm_run, self).setUp()
  32. def test_000_run_single(self):
  33. self.app.expected_calls[
  34. ('dom0', 'admin.vm.List', None, None)] = \
  35. b'0\x00test-vm class=AppVM state=Running\n'
  36. # self.app.expected_calls[
  37. # ('test-vm', 'admin.vm.List', None, None)] = \
  38. # b'0\x00test-vm class=AppVM state=Running\n'
  39. ret = qubesadmin.tools.qvm_run.main(
  40. ['--no-gui', 'test-vm', 'command'],
  41. app=self.app)
  42. self.assertEqual(ret, 0)
  43. self.assertEqual(self.app.service_calls, [
  44. ('test-vm', 'qubes.VMShell', {
  45. 'filter_esc': True,
  46. 'localcmd': None,
  47. 'stdout': subprocess.DEVNULL,
  48. 'stderr': subprocess.DEVNULL,
  49. 'user': None,
  50. }),
  51. ('test-vm', 'qubes.VMShell', b'command; exit\n')
  52. ])
  53. self.assertAllCalled()
  54. def test_001_run_multiple(self):
  55. self.app.expected_calls[
  56. ('dom0', 'admin.vm.List', None, None)] = \
  57. b'0\x00test-vm class=AppVM state=Running\n' \
  58. b'test-vm2 class=AppVM state=Running\n'
  59. # self.app.expected_calls[
  60. # ('test-vm', 'admin.vm.List', None, None)] = \
  61. # b'0\x00test-vm class=AppVM state=Running\n'
  62. ret = qubesadmin.tools.qvm_run.main(
  63. ['--no-gui', 'test-vm', 'test-vm2', 'command'],
  64. app=self.app)
  65. self.assertEqual(ret, 0)
  66. self.assertEqual(self.app.service_calls, [
  67. ('test-vm', 'qubes.VMShell', {
  68. 'filter_esc': True,
  69. 'localcmd': None,
  70. 'stdout': subprocess.DEVNULL,
  71. 'stderr': subprocess.DEVNULL,
  72. 'user': None,
  73. }),
  74. ('test-vm', 'qubes.VMShell', b'command; exit\n'),
  75. ('test-vm2', 'qubes.VMShell', {
  76. 'filter_esc': True,
  77. 'localcmd': None,
  78. 'stdout': subprocess.DEVNULL,
  79. 'stderr': subprocess.DEVNULL,
  80. 'user': None,
  81. }),
  82. ('test-vm2', 'qubes.VMShell', b'command; exit\n')
  83. ])
  84. self.assertAllCalled()
  85. def test_002_passio(self):
  86. self.app.expected_calls[
  87. ('dom0', 'admin.vm.List', None, None)] = \
  88. b'0\x00test-vm class=AppVM state=Running\n'
  89. # self.app.expected_calls[
  90. # ('test-vm', 'admin.vm.List', None, None)] = \
  91. # b'0\x00test-vm class=AppVM state=Running\n'
  92. echo = subprocess.Popen(['echo', 'some-data'], stdout=subprocess.PIPE)
  93. with unittest.mock.patch('sys.stdin', echo.stdout):
  94. ret = qubesadmin.tools.qvm_run.main(
  95. ['--no-gui', '--pass-io', 'test-vm', 'command'],
  96. app=self.app)
  97. self.assertEqual(ret, 0)
  98. self.assertEqual(self.app.service_calls, [
  99. ('test-vm', 'qubes.VMShell', {
  100. 'filter_esc': True,
  101. 'localcmd': None,
  102. 'stdout': None,
  103. 'stderr': None,
  104. 'user': None,
  105. }),
  106. ('test-vm', 'qubes.VMShell', b'command; exit\nsome-data\n')
  107. ])
  108. self.assertAllCalled()
  109. def test_002_color_output(self):
  110. self.app.expected_calls[
  111. ('dom0', 'admin.vm.List', None, None)] = \
  112. b'0\x00test-vm class=AppVM state=Running\n'
  113. # self.app.expected_calls[
  114. # ('test-vm', 'admin.vm.List', None, None)] = \
  115. # b'0\x00test-vm class=AppVM state=Running\n'
  116. stdout = io.StringIO()
  117. echo = subprocess.Popen(['echo', 'some-data'], stdout=subprocess.PIPE)
  118. with unittest.mock.patch('sys.stdin', echo.stdout):
  119. with unittest.mock.patch('sys.stdout', stdout):
  120. ret = qubesadmin.tools.qvm_run.main(
  121. ['--no-gui', '--pass-io', 'test-vm', 'command'],
  122. app=self.app)
  123. self.assertEqual(ret, 0)
  124. self.assertEqual(self.app.service_calls, [
  125. ('test-vm', 'qubes.VMShell', {
  126. 'filter_esc': True,
  127. 'localcmd': None,
  128. 'stdout': None,
  129. 'stderr': None,
  130. 'user': None,
  131. }),
  132. ('test-vm', 'qubes.VMShell', b'command; exit\nsome-data\n')
  133. ])
  134. self.assertEqual(stdout.getvalue(), '\033[0;31m\033[0m')
  135. stdout.close()
  136. self.assertAllCalled()
  137. def test_003_no_color_output(self):
  138. self.app.expected_calls[
  139. ('dom0', 'admin.vm.List', None, None)] = \
  140. b'0\x00test-vm class=AppVM state=Running\n'
  141. # self.app.expected_calls[
  142. # ('test-vm', 'admin.vm.List', None, None)] = \
  143. # b'0\x00test-vm class=AppVM state=Running\n'
  144. stdout = io.StringIO()
  145. echo = subprocess.Popen(['echo', 'some-data'], stdout=subprocess.PIPE)
  146. with unittest.mock.patch('sys.stdin', echo.stdout):
  147. with unittest.mock.patch('sys.stdout', stdout):
  148. ret = qubesadmin.tools.qvm_run.main(
  149. ['--no-gui', '--pass-io', '--no-color-output',
  150. 'test-vm', 'command'],
  151. app=self.app)
  152. self.assertEqual(ret, 0)
  153. self.assertEqual(self.app.service_calls, [
  154. ('test-vm', 'qubes.VMShell', {
  155. 'filter_esc': True,
  156. 'localcmd': None,
  157. 'stdout': None,
  158. 'stderr': None,
  159. 'user': None,
  160. }),
  161. ('test-vm', 'qubes.VMShell', b'command; exit\nsome-data\n')
  162. ])
  163. self.assertEqual(stdout.getvalue(), '')
  164. stdout.close()
  165. self.assertAllCalled()
  166. def test_004_no_filter_esc(self):
  167. self.app.expected_calls[
  168. ('dom0', 'admin.vm.List', None, None)] = \
  169. b'0\x00test-vm class=AppVM state=Running\n'
  170. # self.app.expected_calls[
  171. # ('test-vm', 'admin.vm.List', None, None)] = \
  172. # b'0\x00test-vm class=AppVM state=Running\n'
  173. stdout = io.StringIO()
  174. echo = subprocess.Popen(['echo', 'some-data'], stdout=subprocess.PIPE)
  175. with unittest.mock.patch('sys.stdin', echo.stdout):
  176. with unittest.mock.patch('sys.stdout', stdout):
  177. ret = qubesadmin.tools.qvm_run.main(
  178. ['--no-gui', '--pass-io', '--no-filter-esc',
  179. 'test-vm', 'command'],
  180. app=self.app)
  181. self.assertEqual(ret, 0)
  182. self.assertEqual(self.app.service_calls, [
  183. ('test-vm', 'qubes.VMShell', {
  184. 'filter_esc': False,
  185. 'localcmd': None,
  186. 'stdout': None,
  187. 'stderr': None,
  188. 'user': None,
  189. }),
  190. ('test-vm', 'qubes.VMShell', b'command; exit\nsome-data\n')
  191. ])
  192. self.assertEqual(stdout.getvalue(), '')
  193. stdout.close()
  194. self.assertAllCalled()
  195. def test_005_localcmd(self):
  196. self.app.expected_calls[
  197. ('dom0', 'admin.vm.List', None, None)] = \
  198. b'0\x00test-vm class=AppVM state=Running\n'
  199. # self.app.expected_calls[
  200. # ('test-vm', 'admin.vm.List', None, None)] = \
  201. # b'0\x00test-vm class=AppVM state=Running\n'
  202. ret = qubesadmin.tools.qvm_run.main(
  203. ['--no-gui', '--pass-io', '--localcmd', 'local-command',
  204. 'test-vm', 'command'],
  205. app=self.app)
  206. self.assertEqual(ret, 0)
  207. self.assertEqual(self.app.service_calls, [
  208. ('test-vm', 'qubes.VMShell', {
  209. 'filter_esc': True,
  210. 'localcmd': 'local-command',
  211. 'stdout': None,
  212. 'stderr': None,
  213. 'user': None,
  214. }),
  215. ('test-vm', 'qubes.VMShell', b'command; exit\n')
  216. ])
  217. self.assertAllCalled()
  218. def test_006_run_single_with_gui(self):
  219. self.app.expected_calls[
  220. ('dom0', 'admin.vm.List', None, None)] = \
  221. b'0\x00test-vm class=AppVM state=Running\n'
  222. self.app.expected_calls[
  223. ('test-vm', 'admin.vm.property.Get', 'default_user', None)] = \
  224. b'0\x00default=yes type=str user'
  225. # self.app.expected_calls[
  226. # ('test-vm', 'admin.vm.List', None, None)] = \
  227. # b'0\x00test-vm class=AppVM state=Running\n'
  228. ret = qubesadmin.tools.qvm_run.main(
  229. ['test-vm', 'command'],
  230. app=self.app)
  231. self.assertEqual(ret, 0)
  232. # make sure we have the same instance below
  233. self.assertEqual(self.app.service_calls, [
  234. ('test-vm', 'qubes.WaitForSession', {
  235. 'stdout': subprocess.DEVNULL,
  236. 'stderr': subprocess.DEVNULL,
  237. }),
  238. ('test-vm', 'qubes.WaitForSession', b'user'),
  239. ('test-vm', 'qubes.VMShell', {
  240. 'filter_esc': True,
  241. 'localcmd': None,
  242. 'stdout': subprocess.DEVNULL,
  243. 'stderr': subprocess.DEVNULL,
  244. 'user': None,
  245. }),
  246. ('test-vm', 'qubes.VMShell', b'command; exit\n')
  247. ])
  248. self.assertAllCalled()
  249. def test_007_run_service_with_gui(self):
  250. self.app.expected_calls[
  251. ('dom0', 'admin.vm.List', None, None)] = \
  252. b'0\x00test-vm class=AppVM state=Running\n'
  253. self.app.expected_calls[
  254. ('test-vm', 'admin.vm.property.Get', 'default_user', None)] = \
  255. b'0\x00default=yes type=str user'
  256. # self.app.expected_calls[
  257. # ('test-vm', 'admin.vm.List', None, None)] = \
  258. # b'0\x00test-vm class=AppVM state=Running\n'
  259. ret = qubesadmin.tools.qvm_run.main(
  260. ['--service', 'test-vm', 'service.name'],
  261. app=self.app)
  262. self.assertEqual(ret, 0)
  263. # make sure we have the same instance below
  264. self.assertEqual(self.app.service_calls, [
  265. ('test-vm', 'qubes.WaitForSession', {
  266. 'stdout': subprocess.DEVNULL,
  267. 'stderr': subprocess.DEVNULL,
  268. }),
  269. ('test-vm', 'qubes.WaitForSession', b'user'),
  270. ('test-vm', 'service.name', {
  271. 'filter_esc': True,
  272. 'localcmd': None,
  273. 'stdout': subprocess.DEVNULL,
  274. 'stderr': subprocess.DEVNULL,
  275. 'user': None,
  276. }),
  277. ('test-vm', 'service.name', b''),
  278. ])
  279. self.assertAllCalled()