qvm_run.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406
  1. # -*- encoding: utf8 -*-
  2. #
  3. # The Qubes OS Project, http://www.qubes-os.org
  4. #
  5. # Copyright (C) 2017 Marek Marczykowski-Górecki
  6. # <marmarek@invisiblethingslab.com>
  7. #
  8. # This program is free software; you can redistribute it and/or modify
  9. # it under the terms of the GNU Lesser General Public License as published by
  10. # the Free Software Foundation; either version 2.1 of the License, or
  11. # (at your option) any later version.
  12. #
  13. # This program is distributed in the hope that it will be useful,
  14. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  15. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  16. # GNU Lesser General Public License for more details.
  17. #
  18. # You should have received a copy of the GNU Lesser General Public License along
  19. # with this program; if not, see <http://www.gnu.org/licenses/>.
  20. import io
  21. import os
  22. import unittest.mock
  23. import subprocess
  24. import sys
  25. import qubesadmin.tests
  26. import qubesadmin.tools.qvm_run
  27. class TC_00_qvm_run(qubesadmin.tests.QubesTestCase):
  28. def setUp(self):
  29. if sys.stdout is not sys.__stdout__ or \
  30. sys.stderr is not sys.__stderr__:
  31. self.skipTest('qvm-run change behavior on redirected stdout/stderr')
  32. super(TC_00_qvm_run, self).setUp()
  33. def default_filter_esc(self):
  34. return os.isatty(sys.stdout.fileno())
  35. def test_000_run_single(self):
  36. self.app.expected_calls[
  37. ('dom0', 'admin.vm.List', None, None)] = \
  38. b'0\x00test-vm class=AppVM state=Running\n'
  39. # self.app.expected_calls[
  40. # ('test-vm', 'admin.vm.List', None, None)] = \
  41. # b'0\x00test-vm class=AppVM state=Running\n'
  42. ret = qubesadmin.tools.qvm_run.main(
  43. ['--no-gui', 'test-vm', 'command'],
  44. app=self.app)
  45. self.assertEqual(ret, 0)
  46. self.assertEqual(self.app.service_calls, [
  47. ('test-vm', 'qubes.VMShell', {
  48. 'localcmd': None,
  49. 'stdout': subprocess.DEVNULL,
  50. 'stderr': subprocess.DEVNULL,
  51. 'user': None,
  52. }),
  53. ('test-vm', 'qubes.VMShell', b'command; exit\n')
  54. ])
  55. self.assertAllCalled()
  56. def test_001_run_multiple(self):
  57. self.app.expected_calls[
  58. ('dom0', 'admin.vm.List', None, None)] = \
  59. b'0\x00test-vm class=AppVM state=Running\n' \
  60. b'test-vm2 class=AppVM state=Running\n' \
  61. b'test-vm3 class=AppVM state=Halted\n'
  62. self.app.expected_calls[
  63. ('test-vm', 'admin.vm.List', None, None)] = \
  64. b'0\x00test-vm class=AppVM state=Running\n'
  65. self.app.expected_calls[
  66. ('test-vm2', 'admin.vm.List', None, None)] = \
  67. b'0\x00test-vm2 class=AppVM state=Running\n'
  68. self.app.expected_calls[
  69. ('test-vm3', 'admin.vm.List', None, None)] = \
  70. b'0\x00test-vm3 class=AppVM state=Halted\n'
  71. ret = qubesadmin.tools.qvm_run.main(
  72. ['--no-gui', '--all', 'command'],
  73. app=self.app)
  74. self.assertEqual(ret, 0)
  75. self.assertEqual(self.app.service_calls, [
  76. ('test-vm', 'qubes.VMShell', {
  77. 'localcmd': None,
  78. 'stdout': subprocess.DEVNULL,
  79. 'stderr': subprocess.DEVNULL,
  80. 'user': None,
  81. }),
  82. ('test-vm', 'qubes.VMShell', b'command; exit\n'),
  83. ('test-vm2', 'qubes.VMShell', {
  84. 'localcmd': None,
  85. 'stdout': subprocess.DEVNULL,
  86. 'stderr': subprocess.DEVNULL,
  87. 'user': None,
  88. }),
  89. ('test-vm2', 'qubes.VMShell', b'command; exit\n')
  90. ])
  91. self.assertAllCalled()
  92. @unittest.expectedFailure
  93. def test_002_passio(self):
  94. self.app.expected_calls[
  95. ('dom0', 'admin.vm.List', None, None)] = \
  96. b'0\x00test-vm class=AppVM state=Running\n'
  97. # self.app.expected_calls[
  98. # ('test-vm', 'admin.vm.List', None, None)] = \
  99. # b'0\x00test-vm class=AppVM state=Running\n'
  100. echo = subprocess.Popen(['echo', 'some-data'], stdout=subprocess.PIPE)
  101. with unittest.mock.patch('sys.stdin', echo.stdout):
  102. ret = qubesadmin.tools.qvm_run.main(
  103. ['--no-gui', '--pass-io', 'test-vm', 'command'],
  104. app=self.app)
  105. self.assertEqual(ret, 0)
  106. self.assertEqual(self.app.service_calls, [
  107. ('test-vm', 'qubes.VMShell', {
  108. 'filter_esc': self.default_filter_esc(),
  109. 'localcmd': None,
  110. 'stdout': None,
  111. 'stderr': None,
  112. 'user': None,
  113. }),
  114. ('test-vm', 'qubes.VMShell', b'command; exit\nsome-data\n')
  115. ])
  116. self.assertAllCalled()
  117. @unittest.expectedFailure
  118. def test_002_color_output(self):
  119. self.app.expected_calls[
  120. ('dom0', 'admin.vm.List', None, None)] = \
  121. b'0\x00test-vm class=AppVM state=Running\n'
  122. # self.app.expected_calls[
  123. # ('test-vm', 'admin.vm.List', None, None)] = \
  124. # b'0\x00test-vm class=AppVM state=Running\n'
  125. stdout = io.StringIO()
  126. echo = subprocess.Popen(['echo', 'some-data'], stdout=subprocess.PIPE)
  127. with unittest.mock.patch('sys.stdin', echo.stdout):
  128. with unittest.mock.patch('sys.stdout', stdout):
  129. ret = qubesadmin.tools.qvm_run.main(
  130. ['--no-gui', '--filter-esc', '--pass-io', 'test-vm',
  131. 'command'],
  132. app=self.app)
  133. self.assertEqual(ret, 0)
  134. self.assertEqual(self.app.service_calls, [
  135. ('test-vm', 'qubes.VMShell', {
  136. 'filter_esc': True,
  137. 'localcmd': None,
  138. 'stdout': None,
  139. 'stderr': None,
  140. 'user': None,
  141. }),
  142. ('test-vm', 'qubes.VMShell', b'command; exit\nsome-data\n')
  143. ])
  144. self.assertEqual(stdout.getvalue(), '\033[0;31m\033[0m')
  145. stdout.close()
  146. self.assertAllCalled()
  147. @unittest.expectedFailure
  148. def test_003_no_color_output(self):
  149. self.app.expected_calls[
  150. ('dom0', 'admin.vm.List', None, None)] = \
  151. b'0\x00test-vm class=AppVM state=Running\n'
  152. # self.app.expected_calls[
  153. # ('test-vm', 'admin.vm.List', None, None)] = \
  154. # b'0\x00test-vm class=AppVM state=Running\n'
  155. stdout = io.StringIO()
  156. echo = subprocess.Popen(['echo', 'some-data'], stdout=subprocess.PIPE)
  157. with unittest.mock.patch('sys.stdin', echo.stdout):
  158. with unittest.mock.patch('sys.stdout', stdout):
  159. ret = qubesadmin.tools.qvm_run.main(
  160. ['--no-gui', '--pass-io', '--no-color-output',
  161. 'test-vm', 'command'],
  162. app=self.app)
  163. self.assertEqual(ret, 0)
  164. self.assertEqual(self.app.service_calls, [
  165. ('test-vm', 'qubes.VMShell', {
  166. 'filter_esc': self.default_filter_esc(),
  167. 'localcmd': None,
  168. 'stdout': None,
  169. 'stderr': None,
  170. 'user': None,
  171. }),
  172. ('test-vm', 'qubes.VMShell', b'command; exit\nsome-data\n')
  173. ])
  174. self.assertEqual(stdout.getvalue(), '')
  175. stdout.close()
  176. self.assertAllCalled()
  177. @unittest.expectedFailure
  178. def test_004_no_filter_esc(self):
  179. self.app.expected_calls[
  180. ('dom0', 'admin.vm.List', None, None)] = \
  181. b'0\x00test-vm class=AppVM state=Running\n'
  182. # self.app.expected_calls[
  183. # ('test-vm', 'admin.vm.List', None, None)] = \
  184. # b'0\x00test-vm class=AppVM state=Running\n'
  185. stdout = io.StringIO()
  186. echo = subprocess.Popen(['echo', 'some-data'], stdout=subprocess.PIPE)
  187. with unittest.mock.patch('sys.stdin', echo.stdout):
  188. with unittest.mock.patch('sys.stdout', stdout):
  189. ret = qubesadmin.tools.qvm_run.main(
  190. ['--no-gui', '--pass-io', '--no-filter-esc',
  191. 'test-vm', 'command'],
  192. app=self.app)
  193. self.assertEqual(ret, 0)
  194. self.assertEqual(self.app.service_calls, [
  195. ('test-vm', 'qubes.VMShell', {
  196. 'filter_esc': False,
  197. 'localcmd': None,
  198. 'stdout': None,
  199. 'stderr': None,
  200. 'user': None,
  201. }),
  202. ('test-vm', 'qubes.VMShell', b'command; exit\nsome-data\n')
  203. ])
  204. self.assertEqual(stdout.getvalue(), '')
  205. stdout.close()
  206. self.assertAllCalled()
  207. def test_005_localcmd(self):
  208. self.app.expected_calls[
  209. ('dom0', 'admin.vm.List', None, None)] = \
  210. b'0\x00test-vm class=AppVM state=Running\n'
  211. # self.app.expected_calls[
  212. # ('test-vm', 'admin.vm.List', None, None)] = \
  213. # b'0\x00test-vm class=AppVM state=Running\n'
  214. ret = qubesadmin.tools.qvm_run.main(
  215. ['--no-gui', '--pass-io', '--localcmd', 'local-command',
  216. 'test-vm', 'command'],
  217. app=self.app)
  218. self.assertEqual(ret, 0)
  219. self.assertEqual(self.app.service_calls, [
  220. ('test-vm', 'qubes.VMShell', {
  221. 'localcmd': 'local-command',
  222. 'stdout': None,
  223. 'stderr': None,
  224. 'user': None,
  225. }),
  226. ('test-vm', 'qubes.VMShell', b'command; exit\n')
  227. ])
  228. self.assertAllCalled()
  229. def test_006_run_single_with_gui(self):
  230. self.app.expected_calls[
  231. ('dom0', 'admin.vm.List', None, None)] = \
  232. b'0\x00test-vm class=AppVM state=Running\n'
  233. self.app.expected_calls[
  234. ('test-vm', 'admin.vm.property.Get', 'default_user', None)] = \
  235. b'0\x00default=yes type=str user'
  236. # self.app.expected_calls[
  237. # ('test-vm', 'admin.vm.List', None, None)] = \
  238. # b'0\x00test-vm class=AppVM state=Running\n'
  239. ret = qubesadmin.tools.qvm_run.main(
  240. ['test-vm', 'command'],
  241. app=self.app)
  242. self.assertEqual(ret, 0)
  243. # make sure we have the same instance below
  244. self.assertEqual(self.app.service_calls, [
  245. ('test-vm', 'qubes.WaitForSession', {
  246. 'stdout': subprocess.DEVNULL,
  247. 'stderr': subprocess.DEVNULL,
  248. }),
  249. ('test-vm', 'qubes.WaitForSession', b'user'),
  250. ('test-vm', 'qubes.VMShell', {
  251. 'localcmd': None,
  252. 'stdout': subprocess.DEVNULL,
  253. 'stderr': subprocess.DEVNULL,
  254. 'user': None,
  255. }),
  256. ('test-vm', 'qubes.VMShell', b'command; exit\n')
  257. ])
  258. self.assertAllCalled()
  259. def test_007_run_service_with_gui(self):
  260. self.app.expected_calls[
  261. ('dom0', 'admin.vm.List', None, None)] = \
  262. b'0\x00test-vm class=AppVM state=Running\n'
  263. self.app.expected_calls[
  264. ('test-vm', 'admin.vm.property.Get', 'default_user', None)] = \
  265. b'0\x00default=yes type=str user'
  266. # self.app.expected_calls[
  267. # ('test-vm', 'admin.vm.List', None, None)] = \
  268. # b'0\x00test-vm class=AppVM state=Running\n'
  269. ret = qubesadmin.tools.qvm_run.main(
  270. ['--service', 'test-vm', 'service.name'],
  271. app=self.app)
  272. self.assertEqual(ret, 0)
  273. # make sure we have the same instance below
  274. self.assertEqual(self.app.service_calls, [
  275. ('test-vm', 'qubes.WaitForSession', {
  276. 'stdout': subprocess.DEVNULL,
  277. 'stderr': subprocess.DEVNULL,
  278. }),
  279. ('test-vm', 'qubes.WaitForSession', b'user'),
  280. ('test-vm', 'service.name', {
  281. 'localcmd': None,
  282. 'stdout': subprocess.DEVNULL,
  283. 'stderr': subprocess.DEVNULL,
  284. 'user': None,
  285. }),
  286. ('test-vm', 'service.name', b''),
  287. ])
  288. self.assertAllCalled()
  289. def test_008_dispvm_remote(self):
  290. ret = qubesadmin.tools.qvm_run.main(
  291. ['--dispvm', '--service', 'test.service'], app=self.app)
  292. self.assertEqual(ret, 0)
  293. self.assertEqual(self.app.service_calls, [
  294. ('$dispvm', 'test.service', {
  295. 'localcmd': None,
  296. 'stdout': subprocess.DEVNULL,
  297. 'stderr': subprocess.DEVNULL,
  298. 'user': None,
  299. }),
  300. ('$dispvm', 'test.service', b''),
  301. ])
  302. self.assertAllCalled()
  303. def test_009_dispvm_remote_specific(self):
  304. ret = qubesadmin.tools.qvm_run.main(
  305. ['--dispvm=test-vm', '--service', 'test.service'], app=self.app)
  306. self.assertEqual(ret, 0)
  307. self.assertEqual(self.app.service_calls, [
  308. ('$dispvm:test-vm', 'test.service', {
  309. 'localcmd': None,
  310. 'stdout': subprocess.DEVNULL,
  311. 'stderr': subprocess.DEVNULL,
  312. 'user': None,
  313. }),
  314. ('$dispvm:test-vm', 'test.service', b''),
  315. ])
  316. self.assertAllCalled()
  317. def test_010_dispvm_local(self):
  318. self.app.qubesd_connection_type = 'socket'
  319. self.app.expected_calls[
  320. ('dom0', 'admin.vm.CreateDisposable', None, None)] = \
  321. b'0\0disp123'
  322. self.app.expected_calls[('disp123', 'admin.vm.Kill', None, None)] = \
  323. b'0\0'
  324. ret = qubesadmin.tools.qvm_run.main(
  325. ['--dispvm', '--service', 'test.service'], app=self.app)
  326. self.assertEqual(ret, 0)
  327. self.assertEqual(self.app.service_calls, [
  328. ('disp123', 'test.service', {
  329. 'localcmd': None,
  330. 'stdout': subprocess.DEVNULL,
  331. 'stderr': subprocess.DEVNULL,
  332. 'user': None,
  333. }),
  334. ('disp123', 'test.service', b''),
  335. ])
  336. self.assertAllCalled()
  337. def test_011_dispvm_local_specific(self):
  338. self.app.qubesd_connection_type = 'socket'
  339. self.app.expected_calls[
  340. ('test-vm', 'admin.vm.CreateDisposable', None, None)] = \
  341. b'0\0disp123'
  342. self.app.expected_calls[('disp123', 'admin.vm.Kill', None, None)] = \
  343. b'0\0'
  344. ret = qubesadmin.tools.qvm_run.main(
  345. ['--dispvm=test-vm', '--service', 'test.service'], app=self.app)
  346. self.assertEqual(ret, 0)
  347. self.assertEqual(self.app.service_calls, [
  348. ('disp123', 'test.service', {
  349. 'localcmd': None,
  350. 'stdout': subprocess.DEVNULL,
  351. 'stderr': subprocess.DEVNULL,
  352. 'user': None,
  353. }),
  354. ('disp123', 'test.service', b''),
  355. ])
  356. self.assertAllCalled()
  357. def test_012_exclude(self):
  358. self.app.expected_calls[
  359. ('dom0', 'admin.vm.List', None, None)] = \
  360. b'0\x00test-vm class=AppVM state=Running\n' \
  361. b'test-vm2 class=AppVM state=Running\n' \
  362. b'test-vm3 class=AppVM state=Halted\n'
  363. self.app.expected_calls[
  364. ('test-vm', 'admin.vm.List', None, None)] = \
  365. b'0\x00test-vm class=AppVM state=Running\n'
  366. self.app.expected_calls[
  367. ('test-vm3', 'admin.vm.List', None, None)] = \
  368. b'0\x00test-vm3 class=AppVM state=Halted\n'
  369. ret = qubesadmin.tools.qvm_run.main(
  370. ['--no-gui', '--all', '--exclude', 'test-vm2', 'command'],
  371. app=self.app)
  372. self.assertEqual(ret, 0)
  373. self.assertEqual(self.app.service_calls, [
  374. ('test-vm', 'qubes.VMShell', {
  375. 'localcmd': None,
  376. 'stdout': subprocess.DEVNULL,
  377. 'stderr': subprocess.DEVNULL,
  378. 'user': None,
  379. }),
  380. ('test-vm', 'qubes.VMShell', b'command; exit\n'),
  381. ])
  382. self.assertAllCalled()