app.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468
  1. # -*- encoding: utf8 -*-
  2. #
  3. # The Qubes OS Project, http://www.qubes-os.org
  4. #
  5. # Copyright (C) 2017 Marek Marczykowski-Górecki
  6. # <marmarek@invisiblethingslab.com>
  7. #
  8. # This program is free software; you can redistribute it and/or modify
  9. # it under the terms of the GNU Lesser General Public License as published by
  10. # the Free Software Foundation; either version 2 of the License, or
  11. # (at your option) any later version.
  12. #
  13. # This program is distributed in the hope that it will be useful,
  14. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  15. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  16. # GNU Lesser General Public License for more details.
  17. #
  18. # You should have received a copy of the GNU Lesser General Public License along
  19. # with this program; if not, see <http://www.gnu.org/licenses/>.
  20. import os
  21. import shutil
  22. import socket
  23. import subprocess
  24. import unittest
  25. import multiprocessing
  26. try:
  27. import unittest.mock as mock
  28. except ImportError:
  29. import mock
  30. import tempfile
  31. import qubesadmin.tests
  32. class TC_00_VMCollection(qubesadmin.tests.QubesTestCase):
  33. def test_000_list(self):
  34. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  35. b'0\x00test-vm class=AppVM state=Running\n'
  36. self.assertEqual(
  37. list(self.app.domains.keys()),
  38. ['test-vm'])
  39. self.assertAllCalled()
  40. def test_001_getitem(self):
  41. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  42. b'0\x00test-vm class=AppVM state=Running\n'
  43. try:
  44. vm = self.app.domains['test-vm']
  45. self.assertEqual(vm.name, 'test-vm')
  46. except KeyError:
  47. self.fail('VM not found in collection')
  48. self.assertAllCalled()
  49. with self.assertRaises(KeyError):
  50. vm = self.app.domains['test-non-existent']
  51. self.assertAllCalled()
  52. def test_002_in(self):
  53. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  54. b'0\x00test-vm class=AppVM state=Running\n'
  55. self.assertIn('test-vm', self.app.domains)
  56. self.assertAllCalled()
  57. self.assertNotIn('test-non-existent', self.app.domains)
  58. self.assertAllCalled()
  59. def test_003_iter(self):
  60. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  61. b'0\x00test-vm class=AppVM state=Running\n'
  62. self.assertEqual([vm.name for vm in self.app.domains], ['test-vm'])
  63. self.assertAllCalled()
  64. def test_004_delitem(self):
  65. self.app.expected_calls[('test-vm', 'admin.vm.Remove', None, None)] = \
  66. b'0\x00'
  67. del self.app.domains['test-vm']
  68. self.assertAllCalled()
  69. class TC_10_QubesBase(qubesadmin.tests.QubesTestCase):
  70. def test_010_new_simple(self):
  71. self.app.expected_calls[('dom0', 'admin.vm.Create.AppVM', None,
  72. b'name=new-vm label=red')] = b'0\x00'
  73. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  74. b'0\x00new-vm class=AppVM state=Running\n'
  75. vm = self.app.add_new_vm('AppVM', 'new-vm', 'red')
  76. self.assertEqual(vm.name, 'new-vm')
  77. self.assertEqual(vm.__class__.__name__, 'AppVM')
  78. self.assertAllCalled()
  79. def test_011_new_template(self):
  80. self.app.expected_calls[('dom0', 'admin.vm.Create.TemplateVM', None,
  81. b'name=new-template label=red')] = b'0\x00'
  82. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  83. b'0\x00new-template class=TemplateVM state=Running\n'
  84. vm = self.app.add_new_vm('TemplateVM', 'new-template', 'red')
  85. self.assertEqual(vm.name, 'new-template')
  86. self.assertEqual(vm.__class__.__name__, 'TemplateVM')
  87. self.assertAllCalled()
  88. def test_012_new_template_based(self):
  89. self.app.expected_calls[('dom0', 'admin.vm.Create.AppVM',
  90. 'some-template', b'name=new-vm label=red')] = b'0\x00'
  91. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  92. b'0\x00new-vm class=AppVM state=Running\n'
  93. vm = self.app.add_new_vm('AppVM', 'new-vm', 'red', 'some-template')
  94. self.assertEqual(vm.name, 'new-vm')
  95. self.assertEqual(vm.__class__.__name__, 'AppVM')
  96. self.assertAllCalled()
  97. def test_013_new_objects_params(self):
  98. self.app.expected_calls[('dom0', 'admin.vm.Create.AppVM',
  99. 'some-template', b'name=new-vm label=red')] = b'0\x00'
  100. self.app.expected_calls[('dom0', 'admin.label.List', None, None)] = \
  101. b'0\x00red\nblue\n'
  102. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  103. b'0\x00new-vm class=AppVM state=Running\n' \
  104. b'some-template class=TemplateVM state=Running\n'
  105. vm = self.app.add_new_vm(self.app.get_vm_class('AppVM'), 'new-vm',
  106. self.app.get_label('red'), self.app.domains['some-template'])
  107. self.assertEqual(vm.name, 'new-vm')
  108. self.assertEqual(vm.__class__.__name__, 'AppVM')
  109. self.assertAllCalled()
  110. def test_014_new_pool(self):
  111. self.app.expected_calls[('dom0', 'admin.vm.CreateInPool.AppVM', None,
  112. b'name=new-vm label=red pool=some-pool')] = b'0\x00'
  113. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  114. b'0\x00new-vm class=AppVM state=Running\n'
  115. vm = self.app.add_new_vm('AppVM', 'new-vm', 'red', pool='some-pool')
  116. self.assertEqual(vm.name, 'new-vm')
  117. self.assertEqual(vm.__class__.__name__, 'AppVM')
  118. self.assertAllCalled()
  119. def test_015_new_pools(self):
  120. self.app.expected_calls[('dom0', 'admin.vm.CreateInPool.AppVM', None,
  121. b'name=new-vm label=red pool:private=some-pool '
  122. b'pool:volatile=other-pool')] = b'0\x00'
  123. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  124. b'0\x00new-vm class=AppVM state=Running\n'
  125. vm = self.app.add_new_vm('AppVM', 'new-vm', 'red',
  126. pools={'private': 'some-pool', 'volatile': 'other-pool'})
  127. self.assertEqual(vm.name, 'new-vm')
  128. self.assertEqual(vm.__class__.__name__, 'AppVM')
  129. self.assertAllCalled()
  130. def test_020_get_label(self):
  131. self.app.expected_calls[('dom0', 'admin.label.List', None, None)] = \
  132. b'0\x00red\nblue\n'
  133. label = self.app.get_label('red')
  134. self.assertEqual(label.name, 'red')
  135. self.assertAllCalled()
  136. def test_030_clone(self):
  137. self.app.expected_calls[('test-vm', 'admin.vm.Clone', None,
  138. b'name=new-name')] = b'0\x00'
  139. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  140. b'0\x00new-name class=AppVM state=Halted\n' \
  141. b'test-vm class=AppVM state=Halted\n'
  142. new_vm = self.app.clone_vm('test-vm', 'new-name')
  143. self.assertEqual(new_vm.name, 'new-name')
  144. self.assertAllCalled()
  145. def test_031_clone_object(self):
  146. self.app.expected_calls[('test-vm', 'admin.vm.Clone', None,
  147. b'name=new-name')] = b'0\x00'
  148. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  149. b'0\x00new-name class=AppVM state=Halted\n' \
  150. b'test-vm class=AppVM state=Halted\n'
  151. new_vm = self.app.clone_vm(self.app.domains['test-vm'], 'new-name')
  152. self.assertEqual(new_vm.name, 'new-name')
  153. self.assertAllCalled()
  154. def test_032_clone_pool(self):
  155. self.app.expected_calls[('test-vm', 'admin.vm.CloneInPool', None,
  156. b'name=new-name pool=some-pool')] = b'0\x00'
  157. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  158. b'0\x00new-name class=AppVM state=Halted\n' \
  159. b'test-vm class=AppVM state=Halted\n'
  160. new_vm = self.app.clone_vm('test-vm', 'new-name', pool='some-pool')
  161. self.assertEqual(new_vm.name, 'new-name')
  162. self.assertAllCalled()
  163. def test_033_clone_pools(self):
  164. self.app.expected_calls[('test-vm', 'admin.vm.CloneInPool', None,
  165. b'name=new-name pool:private=some-pool '
  166. b'pool:volatile=other-pool')] = b'0\x00'
  167. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  168. b'0\x00new-name class=AppVM state=Halted\n' \
  169. b'test-vm class=AppVM state=Halted\n'
  170. new_vm = self.app.clone_vm('test-vm', 'new-name',
  171. pools={'private': 'some-pool', 'volatile': 'other-pool'})
  172. self.assertEqual(new_vm.name, 'new-name')
  173. self.assertAllCalled()
  174. class TC_20_QubesLocal(unittest.TestCase):
  175. def setUp(self):
  176. super(TC_20_QubesLocal, self).setUp()
  177. self.socket_dir = tempfile.mkdtemp()
  178. self.orig_sock = qubesadmin.config.QUBESD_SOCKET
  179. qubesadmin.config.QUBESD_SOCKET = os.path.join(self.socket_dir, 'sock')
  180. self.proc = None
  181. self.app = qubesadmin.app.QubesLocal()
  182. def listen_and_send(self, send_data):
  183. '''Listen on socket and send data in response.
  184. :param bytes send_data: data to send
  185. '''
  186. self.socket_pipe, child_pipe = multiprocessing.Pipe()
  187. self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
  188. self.socket.bind(os.path.join(self.socket_dir, 'sock'))
  189. self.socket.listen(1)
  190. def worker(sock, pipe, send_data_):
  191. conn, addr = sock.accept()
  192. pipe.send(conn.makefile('rb').read())
  193. conn.sendall(send_data_)
  194. conn.close()
  195. self.proc = multiprocessing.Process(target=worker,
  196. args=(self.socket, child_pipe, send_data))
  197. self.proc.start()
  198. self.socket.close()
  199. def get_request(self):
  200. '''Get request sent to "qubesd" mock'''
  201. return self.socket_pipe.recv()
  202. def tearDown(self):
  203. qubesadmin.config.QUBESD_SOCKET = self.orig_sock
  204. if self.proc is not None:
  205. try:
  206. self.proc.terminate()
  207. except OSError:
  208. pass
  209. shutil.rmtree(self.socket_dir)
  210. super(TC_20_QubesLocal, self).tearDown()
  211. def test_000_qubesd_call(self):
  212. self.listen_and_send(b'0\0')
  213. self.app.qubesd_call('test-vm', 'some.method', 'arg1', b'payload')
  214. self.assertEqual(self.get_request(),
  215. b'dom0\0some.method\0test-vm\0arg1\0payload')
  216. def test_001_qubesd_call_none_arg(self):
  217. self.listen_and_send(b'0\0')
  218. self.app.qubesd_call('test-vm', 'some.method', None, b'payload')
  219. self.assertEqual(self.get_request(),
  220. b'dom0\0some.method\0test-vm\0\0payload')
  221. def test_002_qubesd_call_none_payload(self):
  222. self.listen_and_send(b'0\0')
  223. self.app.qubesd_call('test-vm', 'some.method', None, None)
  224. self.assertEqual(self.get_request(),
  225. b'dom0\0some.method\0test-vm\0\0')
  226. def test_003_qubesd_call_payload_stream(self):
  227. # this should really be in setUp()...
  228. tmpdir = tempfile.mkdtemp()
  229. self.addCleanup(shutil.rmtree, tmpdir)
  230. service_path = os.path.join(tmpdir, 'test.service')
  231. payload_input = os.path.join(tmpdir, 'payload-input')
  232. with open(service_path, 'w') as f:
  233. f.write('#!/bin/bash\n'
  234. 'env > {dir}/env\n'
  235. 'echo "$@" > {dir}/args\n'
  236. 'cat > {dir}/payload\n'
  237. 'echo -en \'0\\0return-value\'\n'.format(dir=tmpdir))
  238. os.chmod(service_path, 0o755)
  239. with open(payload_input, 'w+') as payload_file:
  240. payload_file.write('some payload\n')
  241. payload_file.seek(0)
  242. with mock.patch('qubesadmin.config.QREXEC_SERVICES_DIR',
  243. tmpdir):
  244. value = self.app.qubesd_call('test-vm', 'test.service',
  245. 'some-arg', payload_stream=payload_file)
  246. self.assertEqual(value, b'return-value')
  247. self.assertTrue(os.path.exists(tmpdir + '/env'))
  248. with open(tmpdir + '/env') as env:
  249. self.assertIn('QREXEC_REMOTE_DOMAIN=dom0\n', env)
  250. self.assertIn('QREXEC_REQUESTED_TARGET=test-vm\n', env)
  251. self.assertTrue(os.path.exists(tmpdir + '/args'))
  252. with open(tmpdir + '/args') as args:
  253. self.assertEqual(args.read(), 'some-arg\n')
  254. self.assertTrue(os.path.exists(tmpdir + '/payload'))
  255. with open(tmpdir + '/payload') as payload:
  256. self.assertEqual(payload.read(), 'some payload\n')
  257. def test_004_qubesd_call_payload_stream_proc(self):
  258. # this should really be in setUp()...
  259. tmpdir = tempfile.mkdtemp()
  260. self.addCleanup(shutil.rmtree, tmpdir)
  261. service_path = os.path.join(tmpdir, 'test.service')
  262. echo = subprocess.Popen(['echo', 'some payload'],
  263. stdout=subprocess.PIPE)
  264. with open(service_path, 'w') as f:
  265. f.write('#!/bin/bash\n'
  266. 'env > {dir}/env\n'
  267. 'echo "$@" > {dir}/args\n'
  268. 'cat > {dir}/payload\n'
  269. 'echo -en \'0\\0return-value\'\n'.format(dir=tmpdir))
  270. os.chmod(service_path, 0o755)
  271. with mock.patch('qubesadmin.config.QREXEC_SERVICES_DIR',
  272. tmpdir):
  273. value = self.app.qubesd_call('test-vm', 'test.service',
  274. 'some-arg', payload_stream=echo.stdout)
  275. echo.stdout.close()
  276. self.assertEqual(value, b'return-value')
  277. self.assertTrue(os.path.exists(tmpdir + '/env'))
  278. with open(tmpdir + '/env') as env:
  279. self.assertIn('QREXEC_REMOTE_DOMAIN=dom0\n', env)
  280. self.assertIn('QREXEC_REQUESTED_TARGET=test-vm\n', env)
  281. self.assertTrue(os.path.exists(tmpdir + '/args'))
  282. with open(tmpdir + '/args') as args:
  283. self.assertEqual(args.read(), 'some-arg\n')
  284. self.assertTrue(os.path.exists(tmpdir + '/payload'))
  285. with open(tmpdir + '/payload') as payload:
  286. self.assertEqual(payload.read(), 'some payload\n')
  287. def test_010_run_service(self):
  288. self.listen_and_send(b'0\0')
  289. with mock.patch('subprocess.Popen') as mock_proc:
  290. p = self.app.run_service('some-vm', 'service.name')
  291. mock_proc.assert_called_once_with([
  292. qubesadmin.config.QREXEC_CLIENT,
  293. '-d', 'some-vm', 'DEFAULT:QUBESRPC service.name dom0'],
  294. stdin=subprocess.PIPE, stdout=subprocess.PIPE,
  295. stderr=subprocess.PIPE)
  296. self.assertEqual(self.get_request(),
  297. b'dom0\0admin.vm.Start\0some-vm\0\0')
  298. def test_011_run_service_filter_esc(self):
  299. self.listen_and_send(b'0\0')
  300. with mock.patch('subprocess.Popen') as mock_proc:
  301. p = self.app.run_service('some-vm', 'service.name', filter_esc=True)
  302. mock_proc.assert_called_once_with([
  303. qubesadmin.config.QREXEC_CLIENT,
  304. '-d', 'some-vm', '-t', '-T',
  305. 'DEFAULT:QUBESRPC service.name dom0'],
  306. stdin=subprocess.PIPE, stdout=subprocess.PIPE,
  307. stderr=subprocess.PIPE)
  308. self.assertEqual(self.get_request(),
  309. b'dom0\0admin.vm.Start\0some-vm\0\0')
  310. def test_012_run_service_user(self):
  311. self.listen_and_send(b'0\0')
  312. with mock.patch('subprocess.Popen') as mock_proc:
  313. p = self.app.run_service('some-vm', 'service.name', user='user')
  314. mock_proc.assert_called_once_with([
  315. qubesadmin.config.QREXEC_CLIENT,
  316. '-d', 'some-vm',
  317. 'user:QUBESRPC service.name dom0'],
  318. stdin=subprocess.PIPE, stdout=subprocess.PIPE,
  319. stderr=subprocess.PIPE)
  320. self.assertEqual(self.get_request(),
  321. b'dom0\0admin.vm.Start\0some-vm\0\0')
  322. def test_013_run_service_default_target(self):
  323. with self.assertRaises(ValueError):
  324. self.app.run_service('', 'service.name')
  325. class TC_30_QubesRemote(unittest.TestCase):
  326. def setUp(self):
  327. super(TC_30_QubesRemote, self).setUp()
  328. self.proc_mock = mock.Mock()
  329. self.proc_mock.configure_mock(**{
  330. 'return_value.returncode': 0
  331. })
  332. self.proc_patch = mock.patch('subprocess.Popen', self.proc_mock)
  333. self.proc_patch.start()
  334. self.app = qubesadmin.app.QubesRemote()
  335. def set_proc_stdout(self, send_data):
  336. self.proc_mock.configure_mock(**{
  337. 'return_value.communicate.return_value': (send_data, None)
  338. })
  339. def tearDown(self):
  340. self.proc_patch.stop()
  341. super(TC_30_QubesRemote, self).tearDown()
  342. def test_000_qubesd_call(self):
  343. self.set_proc_stdout(b'0\0')
  344. self.app.qubesd_call('test-vm', 'some.method', 'arg1', b'payload')
  345. self.assertEqual(self.proc_mock.mock_calls, [
  346. mock.call([qubesadmin.config.QREXEC_CLIENT_VM, 'test-vm',
  347. 'some.method+arg1'],
  348. stdin=subprocess.PIPE, stdout=subprocess.PIPE,
  349. stderr=subprocess.PIPE),
  350. mock.call().communicate(b'payload')
  351. ])
  352. def test_001_qubesd_call_none_arg(self):
  353. self.set_proc_stdout(b'0\0')
  354. self.app.qubesd_call('test-vm', 'some.method', None, b'payload')
  355. self.assertEqual(self.proc_mock.mock_calls, [
  356. mock.call([qubesadmin.config.QREXEC_CLIENT_VM, 'test-vm',
  357. 'some.method'],
  358. stdin=subprocess.PIPE, stdout=subprocess.PIPE,
  359. stderr=subprocess.PIPE),
  360. mock.call().communicate(b'payload')
  361. ])
  362. def test_002_qubesd_call_none_payload(self):
  363. self.set_proc_stdout(b'0\0')
  364. self.app.qubesd_call('test-vm', 'some.method', None, None)
  365. self.assertEqual(self.proc_mock.mock_calls, [
  366. mock.call([qubesadmin.config.QREXEC_CLIENT_VM, 'test-vm',
  367. 'some.method'],
  368. stdin=subprocess.PIPE, stdout=subprocess.PIPE,
  369. stderr=subprocess.PIPE),
  370. mock.call().communicate(None)
  371. ])
  372. def test_003_qubesd_call_payload_stream(self):
  373. self.set_proc_stdout(b'0\0return-value')
  374. tmpdir = tempfile.mkdtemp()
  375. self.addCleanup(shutil.rmtree, tmpdir)
  376. payload_input = os.path.join(tmpdir, 'payload-input')
  377. with open(payload_input, 'w+') as payload_file:
  378. payload_file.write('some payload\n')
  379. payload_file.seek(0)
  380. value = self.app.qubesd_call('test-vm', 'some.method',
  381. 'some-arg', payload_stream=payload_file)
  382. self.assertEqual(self.proc_mock.mock_calls, [
  383. mock.call([qubesadmin.config.QREXEC_CLIENT_VM, 'test-vm',
  384. 'some.method+some-arg'],
  385. stdin=payload_file, stdout=subprocess.PIPE,
  386. stderr=subprocess.PIPE),
  387. mock.call().communicate(None)
  388. ])
  389. self.assertEqual(value, b'return-value')
  390. def test_010_run_service(self):
  391. self.app.run_service('some-vm', 'service.name')
  392. self.proc_mock.assert_called_once_with([
  393. qubesadmin.config.QREXEC_CLIENT_VM,
  394. 'some-vm', 'service.name'],
  395. stdin=subprocess.PIPE, stdout=subprocess.PIPE,
  396. stderr=subprocess.PIPE)
  397. def test_011_run_service_filter_esc(self):
  398. with self.assertRaises(NotImplementedError):
  399. p = self.app.run_service('some-vm', 'service.name', filter_esc=True)
  400. def test_012_run_service_user(self):
  401. with self.assertRaises(ValueError):
  402. p = self.app.run_service('some-vm', 'service.name', user='user')
  403. def test_013_run_service_default_target(self):
  404. self.app.run_service('', 'service.name')
  405. self.proc_mock.assert_called_once_with([
  406. qubesadmin.config.QREXEC_CLIENT_VM,
  407. '', 'service.name'],
  408. stdin=subprocess.PIPE, stdout=subprocess.PIPE,
  409. stderr=subprocess.PIPE)