qvm_run.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417
# -*- encoding: utf8 -*-
#
# The Qubes OS Project, http://www.qubes-os.org
#
# Copyright (C) 2017 Marek Marczykowski-Górecki
# <marmarek@invisiblethingslab.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation; either version 2.1 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with this program; if not, see <http://www.gnu.org/licenses/>.
import contextlib
import io
import os
import subprocess
import sys
import unittest.mock

import qubesadmin.tests
import qubesadmin.tools.qvm_run
  27. class TC_00_qvm_run(qubesadmin.tests.QubesTestCase):
  28. def setUp(self):
  29. if sys.stdout is not sys.__stdout__ or \
  30. sys.stderr is not sys.__stderr__:
  31. self.skipTest('qvm-run change behavior on redirected stdout/stderr')
  32. super(TC_00_qvm_run, self).setUp()
  33. def default_filter_esc(self):
  34. return os.isatty(sys.stdout.fileno())
  35. def test_000_run_single(self):
  36. self.app.expected_calls[
  37. ('dom0', 'admin.vm.List', None, None)] = \
  38. b'0\x00test-vm class=AppVM state=Running\n'
  39. # self.app.expected_calls[
  40. # ('test-vm', 'admin.vm.List', None, None)] = \
  41. # b'0\x00test-vm class=AppVM state=Running\n'
  42. ret = qubesadmin.tools.qvm_run.main(
  43. ['--no-gui', 'test-vm', 'command'],
  44. app=self.app)
  45. self.assertEqual(ret, 0)
  46. self.assertEqual(self.app.service_calls, [
  47. ('test-vm', 'qubes.VMShell', {
  48. 'filter_esc': self.default_filter_esc(),
  49. 'localcmd': None,
  50. 'stdout': subprocess.DEVNULL,
  51. 'stderr': subprocess.DEVNULL,
  52. 'user': None,
  53. }),
  54. ('test-vm', 'qubes.VMShell', b'command; exit\n')
  55. ])
  56. self.assertAllCalled()
  57. def test_001_run_multiple(self):
  58. self.app.expected_calls[
  59. ('dom0', 'admin.vm.List', None, None)] = \
  60. b'0\x00test-vm class=AppVM state=Running\n' \
  61. b'test-vm2 class=AppVM state=Running\n' \
  62. b'test-vm3 class=AppVM state=Halted\n'
  63. self.app.expected_calls[
  64. ('test-vm', 'admin.vm.List', None, None)] = \
  65. b'0\x00test-vm class=AppVM state=Running\n'
  66. self.app.expected_calls[
  67. ('test-vm2', 'admin.vm.List', None, None)] = \
  68. b'0\x00test-vm2 class=AppVM state=Running\n'
  69. self.app.expected_calls[
  70. ('test-vm3', 'admin.vm.List', None, None)] = \
  71. b'0\x00test-vm3 class=AppVM state=Halted\n'
  72. ret = qubesadmin.tools.qvm_run.main(
  73. ['--no-gui', '--all', 'command'],
  74. app=self.app)
  75. self.assertEqual(ret, 0)
  76. self.assertEqual(self.app.service_calls, [
  77. ('test-vm', 'qubes.VMShell', {
  78. 'filter_esc': self.default_filter_esc(),
  79. 'localcmd': None,
  80. 'stdout': subprocess.DEVNULL,
  81. 'stderr': subprocess.DEVNULL,
  82. 'user': None,
  83. }),
  84. ('test-vm', 'qubes.VMShell', b'command; exit\n'),
  85. ('test-vm2', 'qubes.VMShell', {
  86. 'filter_esc': self.default_filter_esc(),
  87. 'localcmd': None,
  88. 'stdout': subprocess.DEVNULL,
  89. 'stderr': subprocess.DEVNULL,
  90. 'user': None,
  91. }),
  92. ('test-vm2', 'qubes.VMShell', b'command; exit\n')
  93. ])
  94. self.assertAllCalled()
  95. @unittest.expectedFailure
  96. def test_002_passio(self):
  97. self.app.expected_calls[
  98. ('dom0', 'admin.vm.List', None, None)] = \
  99. b'0\x00test-vm class=AppVM state=Running\n'
  100. # self.app.expected_calls[
  101. # ('test-vm', 'admin.vm.List', None, None)] = \
  102. # b'0\x00test-vm class=AppVM state=Running\n'
  103. echo = subprocess.Popen(['echo', 'some-data'], stdout=subprocess.PIPE)
  104. with unittest.mock.patch('sys.stdin', echo.stdout):
  105. ret = qubesadmin.tools.qvm_run.main(
  106. ['--no-gui', '--pass-io', 'test-vm', 'command'],
  107. app=self.app)
  108. self.assertEqual(ret, 0)
  109. self.assertEqual(self.app.service_calls, [
  110. ('test-vm', 'qubes.VMShell', {
  111. 'filter_esc': self.default_filter_esc(),
  112. 'localcmd': None,
  113. 'stdout': None,
  114. 'stderr': None,
  115. 'user': None,
  116. }),
  117. ('test-vm', 'qubes.VMShell', b'command; exit\nsome-data\n')
  118. ])
  119. self.assertAllCalled()
  120. @unittest.expectedFailure
  121. def test_002_color_output(self):
  122. self.app.expected_calls[
  123. ('dom0', 'admin.vm.List', None, None)] = \
  124. b'0\x00test-vm class=AppVM state=Running\n'
  125. # self.app.expected_calls[
  126. # ('test-vm', 'admin.vm.List', None, None)] = \
  127. # b'0\x00test-vm class=AppVM state=Running\n'
  128. stdout = io.StringIO()
  129. echo = subprocess.Popen(['echo', 'some-data'], stdout=subprocess.PIPE)
  130. with unittest.mock.patch('sys.stdin', echo.stdout):
  131. with unittest.mock.patch('sys.stdout', stdout):
  132. ret = qubesadmin.tools.qvm_run.main(
  133. ['--no-gui', '--filter-esc', '--pass-io', 'test-vm',
  134. 'command'],
  135. app=self.app)
  136. self.assertEqual(ret, 0)
  137. self.assertEqual(self.app.service_calls, [
  138. ('test-vm', 'qubes.VMShell', {
  139. 'filter_esc': True,
  140. 'localcmd': None,
  141. 'stdout': None,
  142. 'stderr': None,
  143. 'user': None,
  144. }),
  145. ('test-vm', 'qubes.VMShell', b'command; exit\nsome-data\n')
  146. ])
  147. self.assertEqual(stdout.getvalue(), '\033[0;31m\033[0m')
  148. stdout.close()
  149. self.assertAllCalled()
  150. @unittest.expectedFailure
  151. def test_003_no_color_output(self):
  152. self.app.expected_calls[
  153. ('dom0', 'admin.vm.List', None, None)] = \
  154. b'0\x00test-vm class=AppVM state=Running\n'
  155. # self.app.expected_calls[
  156. # ('test-vm', 'admin.vm.List', None, None)] = \
  157. # b'0\x00test-vm class=AppVM state=Running\n'
  158. stdout = io.StringIO()
  159. echo = subprocess.Popen(['echo', 'some-data'], stdout=subprocess.PIPE)
  160. with unittest.mock.patch('sys.stdin', echo.stdout):
  161. with unittest.mock.patch('sys.stdout', stdout):
  162. ret = qubesadmin.tools.qvm_run.main(
  163. ['--no-gui', '--pass-io', '--no-color-output',
  164. 'test-vm', 'command'],
  165. app=self.app)
  166. self.assertEqual(ret, 0)
  167. self.assertEqual(self.app.service_calls, [
  168. ('test-vm', 'qubes.VMShell', {
  169. 'filter_esc': self.default_filter_esc(),
  170. 'localcmd': None,
  171. 'stdout': None,
  172. 'stderr': None,
  173. 'user': None,
  174. }),
  175. ('test-vm', 'qubes.VMShell', b'command; exit\nsome-data\n')
  176. ])
  177. self.assertEqual(stdout.getvalue(), '')
  178. stdout.close()
  179. self.assertAllCalled()
  180. @unittest.expectedFailure
  181. def test_004_no_filter_esc(self):
  182. self.app.expected_calls[
  183. ('dom0', 'admin.vm.List', None, None)] = \
  184. b'0\x00test-vm class=AppVM state=Running\n'
  185. # self.app.expected_calls[
  186. # ('test-vm', 'admin.vm.List', None, None)] = \
  187. # b'0\x00test-vm class=AppVM state=Running\n'
  188. stdout = io.StringIO()
  189. echo = subprocess.Popen(['echo', 'some-data'], stdout=subprocess.PIPE)
  190. with unittest.mock.patch('sys.stdin', echo.stdout):
  191. with unittest.mock.patch('sys.stdout', stdout):
  192. ret = qubesadmin.tools.qvm_run.main(
  193. ['--no-gui', '--pass-io', '--no-filter-esc',
  194. 'test-vm', 'command'],
  195. app=self.app)
  196. self.assertEqual(ret, 0)
  197. self.assertEqual(self.app.service_calls, [
  198. ('test-vm', 'qubes.VMShell', {
  199. 'filter_esc': False,
  200. 'localcmd': None,
  201. 'stdout': None,
  202. 'stderr': None,
  203. 'user': None,
  204. }),
  205. ('test-vm', 'qubes.VMShell', b'command; exit\nsome-data\n')
  206. ])
  207. self.assertEqual(stdout.getvalue(), '')
  208. stdout.close()
  209. self.assertAllCalled()
  210. def test_005_localcmd(self):
  211. self.app.expected_calls[
  212. ('dom0', 'admin.vm.List', None, None)] = \
  213. b'0\x00test-vm class=AppVM state=Running\n'
  214. # self.app.expected_calls[
  215. # ('test-vm', 'admin.vm.List', None, None)] = \
  216. # b'0\x00test-vm class=AppVM state=Running\n'
  217. ret = qubesadmin.tools.qvm_run.main(
  218. ['--no-gui', '--pass-io', '--localcmd', 'local-command',
  219. 'test-vm', 'command'],
  220. app=self.app)
  221. self.assertEqual(ret, 0)
  222. self.assertEqual(self.app.service_calls, [
  223. ('test-vm', 'qubes.VMShell', {
  224. 'filter_esc': self.default_filter_esc(),
  225. 'localcmd': 'local-command',
  226. 'stdout': None,
  227. 'stderr': None,
  228. 'user': None,
  229. }),
  230. ('test-vm', 'qubes.VMShell', b'command; exit\n')
  231. ])
  232. self.assertAllCalled()
  233. def test_006_run_single_with_gui(self):
  234. self.app.expected_calls[
  235. ('dom0', 'admin.vm.List', None, None)] = \
  236. b'0\x00test-vm class=AppVM state=Running\n'
  237. self.app.expected_calls[
  238. ('test-vm', 'admin.vm.property.Get', 'default_user', None)] = \
  239. b'0\x00default=yes type=str user'
  240. # self.app.expected_calls[
  241. # ('test-vm', 'admin.vm.List', None, None)] = \
  242. # b'0\x00test-vm class=AppVM state=Running\n'
  243. ret = qubesadmin.tools.qvm_run.main(
  244. ['test-vm', 'command'],
  245. app=self.app)
  246. self.assertEqual(ret, 0)
  247. # make sure we have the same instance below
  248. self.assertEqual(self.app.service_calls, [
  249. ('test-vm', 'qubes.WaitForSession', {
  250. 'stdout': subprocess.DEVNULL,
  251. 'stderr': subprocess.DEVNULL,
  252. }),
  253. ('test-vm', 'qubes.WaitForSession', b'user'),
  254. ('test-vm', 'qubes.VMShell', {
  255. 'filter_esc': self.default_filter_esc(),
  256. 'localcmd': None,
  257. 'stdout': subprocess.DEVNULL,
  258. 'stderr': subprocess.DEVNULL,
  259. 'user': None,
  260. }),
  261. ('test-vm', 'qubes.VMShell', b'command; exit\n')
  262. ])
  263. self.assertAllCalled()
  264. def test_007_run_service_with_gui(self):
  265. self.app.expected_calls[
  266. ('dom0', 'admin.vm.List', None, None)] = \
  267. b'0\x00test-vm class=AppVM state=Running\n'
  268. self.app.expected_calls[
  269. ('test-vm', 'admin.vm.property.Get', 'default_user', None)] = \
  270. b'0\x00default=yes type=str user'
  271. # self.app.expected_calls[
  272. # ('test-vm', 'admin.vm.List', None, None)] = \
  273. # b'0\x00test-vm class=AppVM state=Running\n'
  274. ret = qubesadmin.tools.qvm_run.main(
  275. ['--service', 'test-vm', 'service.name'],
  276. app=self.app)
  277. self.assertEqual(ret, 0)
  278. # make sure we have the same instance below
  279. self.assertEqual(self.app.service_calls, [
  280. ('test-vm', 'qubes.WaitForSession', {
  281. 'stdout': subprocess.DEVNULL,
  282. 'stderr': subprocess.DEVNULL,
  283. }),
  284. ('test-vm', 'qubes.WaitForSession', b'user'),
  285. ('test-vm', 'service.name', {
  286. 'filter_esc': self.default_filter_esc(),
  287. 'localcmd': None,
  288. 'stdout': subprocess.DEVNULL,
  289. 'stderr': subprocess.DEVNULL,
  290. 'user': None,
  291. }),
  292. ('test-vm', 'service.name', b''),
  293. ])
  294. self.assertAllCalled()
  295. def test_008_dispvm_remote(self):
  296. ret = qubesadmin.tools.qvm_run.main(
  297. ['--dispvm', '--service', 'test.service'], app=self.app)
  298. self.assertEqual(ret, 0)
  299. self.assertEqual(self.app.service_calls, [
  300. ('$dispvm', 'test.service', {
  301. 'filter_esc': self.default_filter_esc(),
  302. 'localcmd': None,
  303. 'stdout': subprocess.DEVNULL,
  304. 'stderr': subprocess.DEVNULL,
  305. 'user': None,
  306. }),
  307. ('$dispvm', 'test.service', b''),
  308. ])
  309. self.assertAllCalled()
  310. def test_009_dispvm_remote_specific(self):
  311. ret = qubesadmin.tools.qvm_run.main(
  312. ['--dispvm=test-vm', '--service', 'test.service'], app=self.app)
  313. self.assertEqual(ret, 0)
  314. self.assertEqual(self.app.service_calls, [
  315. ('$dispvm:test-vm', 'test.service', {
  316. 'filter_esc': self.default_filter_esc(),
  317. 'localcmd': None,
  318. 'stdout': subprocess.DEVNULL,
  319. 'stderr': subprocess.DEVNULL,
  320. 'user': None,
  321. }),
  322. ('$dispvm:test-vm', 'test.service', b''),
  323. ])
  324. self.assertAllCalled()
  325. def test_010_dispvm_local(self):
  326. self.app.qubesd_connection_type = 'socket'
  327. self.app.expected_calls[
  328. ('dom0', 'admin.vm.CreateDisposable', None, None)] = \
  329. b'0\0disp123'
  330. self.app.expected_calls[('disp123', 'admin.vm.Kill', None, None)] = \
  331. b'0\0'
  332. ret = qubesadmin.tools.qvm_run.main(
  333. ['--dispvm', '--service', 'test.service'], app=self.app)
  334. self.assertEqual(ret, 0)
  335. self.assertEqual(self.app.service_calls, [
  336. ('disp123', 'test.service', {
  337. 'filter_esc': self.default_filter_esc(),
  338. 'localcmd': None,
  339. 'stdout': subprocess.DEVNULL,
  340. 'stderr': subprocess.DEVNULL,
  341. 'user': None,
  342. }),
  343. ('disp123', 'test.service', b''),
  344. ])
  345. self.assertAllCalled()
  346. def test_011_dispvm_local_specific(self):
  347. self.app.qubesd_connection_type = 'socket'
  348. self.app.expected_calls[
  349. ('test-vm', 'admin.vm.CreateDisposable', None, None)] = \
  350. b'0\0disp123'
  351. self.app.expected_calls[('disp123', 'admin.vm.Kill', None, None)] = \
  352. b'0\0'
  353. ret = qubesadmin.tools.qvm_run.main(
  354. ['--dispvm=test-vm', '--service', 'test.service'], app=self.app)
  355. self.assertEqual(ret, 0)
  356. self.assertEqual(self.app.service_calls, [
  357. ('disp123', 'test.service', {
  358. 'filter_esc': self.default_filter_esc(),
  359. 'localcmd': None,
  360. 'stdout': subprocess.DEVNULL,
  361. 'stderr': subprocess.DEVNULL,
  362. 'user': None,
  363. }),
  364. ('disp123', 'test.service', b''),
  365. ])
  366. self.assertAllCalled()
  367. def test_012_exclude(self):
  368. self.app.expected_calls[
  369. ('dom0', 'admin.vm.List', None, None)] = \
  370. b'0\x00test-vm class=AppVM state=Running\n' \
  371. b'test-vm2 class=AppVM state=Running\n' \
  372. b'test-vm3 class=AppVM state=Halted\n'
  373. self.app.expected_calls[
  374. ('test-vm', 'admin.vm.List', None, None)] = \
  375. b'0\x00test-vm class=AppVM state=Running\n'
  376. self.app.expected_calls[
  377. ('test-vm3', 'admin.vm.List', None, None)] = \
  378. b'0\x00test-vm3 class=AppVM state=Halted\n'
  379. ret = qubesadmin.tools.qvm_run.main(
  380. ['--no-gui', '--all', '--exclude', 'test-vm2', 'command'],
  381. app=self.app)
  382. self.assertEqual(ret, 0)
  383. self.assertEqual(self.app.service_calls, [
  384. ('test-vm', 'qubes.VMShell', {
  385. 'filter_esc': self.default_filter_esc(),
  386. 'localcmd': None,
  387. 'stdout': subprocess.DEVNULL,
  388. 'stderr': subprocess.DEVNULL,
  389. 'user': None,
  390. }),
  391. ('test-vm', 'qubes.VMShell', b'command; exit\n'),
  392. ])
  393. self.assertAllCalled()