qvm_run.py 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238
  1. # -*- encoding: utf8 -*-
  2. #
  3. # The Qubes OS Project, http://www.qubes-os.org
  4. #
  5. # Copyright (C) 2017 Marek Marczykowski-Górecki
  6. # <marmarek@invisiblethingslab.com>
  7. #
  8. # This program is free software; you can redistribute it and/or modify
  9. # it under the terms of the GNU Lesser General Public License as published by
  10. # the Free Software Foundation; either version 2.1 of the License, or
  11. # (at your option) any later version.
  12. #
  13. # This program is distributed in the hope that it will be useful,
  14. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  15. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  16. # GNU Lesser General Public License for more details.
  17. #
  18. # You should have received a copy of the GNU Lesser General Public License along
  19. # with this program; if not, see <http://www.gnu.org/licenses/>.
  20. import io
  21. import unittest.mock
  22. import subprocess
  23. import sys
  24. import qubesadmin.tests
  25. import qubesadmin.tools.qvm_run
  26. class TC_00_qvm_run(qubesadmin.tests.QubesTestCase):
  27. def setUp(self):
  28. if sys.stdout is not sys.__stdout__ or \
  29. sys.stderr is not sys.__stderr__:
  30. self.skipTest('qvm-run change behavior on redirected stdout/stderr')
  31. super(TC_00_qvm_run, self).setUp()
  32. def test_000_run_single(self):
  33. self.app.expected_calls[
  34. ('dom0', 'mgmt.vm.List', None, None)] = \
  35. b'0\x00test-vm class=AppVM state=Running\n'
  36. # self.app.expected_calls[
  37. # ('test-vm', 'mgmt.vm.List', None, None)] = \
  38. # b'0\x00test-vm class=AppVM state=Running\n'
  39. ret = qubesadmin.tools.qvm_run.main(['test-vm', 'command'], app=self.app)
  40. self.assertEqual(ret, 0)
  41. # make sure we have the same instance below
  42. null = self.app.service_calls[0][2]['stdout']
  43. self.assertIsInstance(null, io.BufferedWriter)
  44. self.assertEqual(self.app.service_calls, [
  45. ('test-vm', 'qubes.VMShell', {
  46. 'filter_esc': True,
  47. 'localcmd': None,
  48. 'stdout': null,
  49. 'stderr': null,
  50. 'user': None,
  51. }),
  52. ('test-vm', 'qubes.VMShell', b'command\n')
  53. ])
  54. self.assertAllCalled()
  55. def test_001_run_multiple(self):
  56. self.app.expected_calls[
  57. ('dom0', 'mgmt.vm.List', None, None)] = \
  58. b'0\x00test-vm class=AppVM state=Running\n' \
  59. b'test-vm2 class=AppVM state=Running\n'
  60. # self.app.expected_calls[
  61. # ('test-vm', 'mgmt.vm.List', None, None)] = \
  62. # b'0\x00test-vm class=AppVM state=Running\n'
  63. ret = qubesadmin.tools.qvm_run.main(['test-vm', 'test-vm2', 'command'],
  64. app=self.app)
  65. self.assertEqual(ret, 0)
  66. for i in range(0, len(self.app.service_calls), 2):
  67. self.assertIsInstance(self.app.service_calls[i][2]['stdout'],
  68. io.BufferedWriter)
  69. self.assertIsInstance(self.app.service_calls[i][2]['stderr'],
  70. io.BufferedWriter)
  71. # make sure we have the same instance below
  72. null = self.app.service_calls[0][2]['stdout']
  73. null2 = self.app.service_calls[2][2]['stdout']
  74. self.assertEqual(self.app.service_calls, [
  75. ('test-vm', 'qubes.VMShell', {
  76. 'filter_esc': True,
  77. 'localcmd': None,
  78. 'stdout': null,
  79. 'stderr': null,
  80. 'user': None,
  81. }),
  82. ('test-vm', 'qubes.VMShell', b'command\n'),
  83. ('test-vm2', 'qubes.VMShell', {
  84. 'filter_esc': True,
  85. 'localcmd': None,
  86. 'stdout': null2,
  87. 'stderr': null2,
  88. 'user': None,
  89. }),
  90. ('test-vm2', 'qubes.VMShell', b'command\n')
  91. ])
  92. self.assertAllCalled()
  93. def test_002_passio(self):
  94. self.app.expected_calls[
  95. ('dom0', 'mgmt.vm.List', None, None)] = \
  96. b'0\x00test-vm class=AppVM state=Running\n'
  97. # self.app.expected_calls[
  98. # ('test-vm', 'mgmt.vm.List', None, None)] = \
  99. # b'0\x00test-vm class=AppVM state=Running\n'
  100. echo = subprocess.Popen(['echo', 'some-data'], stdout=subprocess.PIPE)
  101. with unittest.mock.patch('sys.stdin', echo.stdout):
  102. ret = qubesadmin.tools.qvm_run.main(
  103. ['--pass-io', 'test-vm', 'command'],
  104. app=self.app)
  105. self.assertEqual(ret, 0)
  106. self.assertEqual(self.app.service_calls, [
  107. ('test-vm', 'qubes.VMShell', {
  108. 'filter_esc': True,
  109. 'localcmd': None,
  110. 'stdout': None,
  111. 'stderr': None,
  112. 'user': None,
  113. }),
  114. ('test-vm', 'qubes.VMShell', b'command\nsome-data\n')
  115. ])
  116. self.assertAllCalled()
  117. def test_002_color_output(self):
  118. self.app.expected_calls[
  119. ('dom0', 'mgmt.vm.List', None, None)] = \
  120. b'0\x00test-vm class=AppVM state=Running\n'
  121. # self.app.expected_calls[
  122. # ('test-vm', 'mgmt.vm.List', None, None)] = \
  123. # b'0\x00test-vm class=AppVM state=Running\n'
  124. stdout = io.StringIO()
  125. echo = subprocess.Popen(['echo', 'some-data'], stdout=subprocess.PIPE)
  126. with unittest.mock.patch('sys.stdin', echo.stdout):
  127. with unittest.mock.patch('sys.stdout', stdout):
  128. ret = qubesadmin.tools.qvm_run.main(
  129. ['--pass-io', 'test-vm', 'command'],
  130. app=self.app)
  131. self.assertEqual(ret, 0)
  132. self.assertEqual(self.app.service_calls, [
  133. ('test-vm', 'qubes.VMShell', {
  134. 'filter_esc': True,
  135. 'localcmd': None,
  136. 'stdout': None,
  137. 'stderr': None,
  138. 'user': None,
  139. }),
  140. ('test-vm', 'qubes.VMShell', b'command\nsome-data\n')
  141. ])
  142. self.assertEqual(stdout.getvalue(), '\033[0;31m\033[0m')
  143. stdout.close()
  144. self.assertAllCalled()
  145. def test_003_no_color_output(self):
  146. self.app.expected_calls[
  147. ('dom0', 'mgmt.vm.List', None, None)] = \
  148. b'0\x00test-vm class=AppVM state=Running\n'
  149. # self.app.expected_calls[
  150. # ('test-vm', 'mgmt.vm.List', None, None)] = \
  151. # b'0\x00test-vm class=AppVM state=Running\n'
  152. stdout = io.StringIO()
  153. echo = subprocess.Popen(['echo', 'some-data'], stdout=subprocess.PIPE)
  154. with unittest.mock.patch('sys.stdin', echo.stdout):
  155. with unittest.mock.patch('sys.stdout', stdout):
  156. ret = qubesadmin.tools.qvm_run.main(
  157. ['--pass-io', '--no-color-output', 'test-vm', 'command'],
  158. app=self.app)
  159. self.assertEqual(ret, 0)
  160. self.assertEqual(self.app.service_calls, [
  161. ('test-vm', 'qubes.VMShell', {
  162. 'filter_esc': True,
  163. 'localcmd': None,
  164. 'stdout': None,
  165. 'stderr': None,
  166. 'user': None,
  167. }),
  168. ('test-vm', 'qubes.VMShell', b'command\nsome-data\n')
  169. ])
  170. self.assertEqual(stdout.getvalue(), '')
  171. stdout.close()
  172. self.assertAllCalled()
  173. def test_004_no_filter_esc(self):
  174. self.app.expected_calls[
  175. ('dom0', 'mgmt.vm.List', None, None)] = \
  176. b'0\x00test-vm class=AppVM state=Running\n'
  177. # self.app.expected_calls[
  178. # ('test-vm', 'mgmt.vm.List', None, None)] = \
  179. # b'0\x00test-vm class=AppVM state=Running\n'
  180. stdout = io.StringIO()
  181. echo = subprocess.Popen(['echo', 'some-data'], stdout=subprocess.PIPE)
  182. with unittest.mock.patch('sys.stdin', echo.stdout):
  183. with unittest.mock.patch('sys.stdout', stdout):
  184. ret = qubesadmin.tools.qvm_run.main(
  185. ['--pass-io', '--no-filter-esc', 'test-vm', 'command'],
  186. app=self.app)
  187. self.assertEqual(ret, 0)
  188. self.assertEqual(self.app.service_calls, [
  189. ('test-vm', 'qubes.VMShell', {
  190. 'filter_esc': False,
  191. 'localcmd': None,
  192. 'stdout': None,
  193. 'stderr': None,
  194. 'user': None,
  195. }),
  196. ('test-vm', 'qubes.VMShell', b'command\nsome-data\n')
  197. ])
  198. self.assertEqual(stdout.getvalue(), '')
  199. stdout.close()
  200. self.assertAllCalled()
  201. def test_005_localcmd(self):
  202. self.app.expected_calls[
  203. ('dom0', 'mgmt.vm.List', None, None)] = \
  204. b'0\x00test-vm class=AppVM state=Running\n'
  205. # self.app.expected_calls[
  206. # ('test-vm', 'mgmt.vm.List', None, None)] = \
  207. # b'0\x00test-vm class=AppVM state=Running\n'
  208. ret = qubesadmin.tools.qvm_run.main(
  209. ['--pass-io', '--localcmd', 'local-command',
  210. 'test-vm', 'command'],
  211. app=self.app)
  212. self.assertEqual(ret, 0)
  213. self.assertEqual(self.app.service_calls, [
  214. ('test-vm', 'qubes.VMShell', {
  215. 'filter_esc': True,
  216. 'localcmd': 'local-command',
  217. 'stdout': None,
  218. 'stderr': None,
  219. 'user': None,
  220. }),
  221. ('test-vm', 'qubes.VMShell', b'command\n')
  222. ])
  223. self.assertAllCalled()