cli.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343
  1. # -*- encoding: utf-8 -*-
  2. #
  3. # The Qubes OS Project, http://www.qubes-os.org
  4. #
  5. # Copyright (C) 2017 Marek Marczykowski-Górecki
  6. # <marmarek@invisiblethingslab.com>
  7. #
  8. # This program is free software; you can redistribute it and/or modify
  9. # it under the terms of the GNU General Public License as published by
  10. # the Free Software Foundation; either version 2 of the License, or
  11. # (at your option) any later version.
  12. #
  13. # This program is distributed in the hope that it will be useful,
  14. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  15. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  16. # GNU General Public License for more details.
  17. #
  18. # You should have received a copy of the GNU General Public License along
  19. # with this program; if not, see <http://www.gnu.org/licenses/>.
  20. import os
  21. import tempfile
  22. import unittest.mock
  23. import shutil
  24. import qubes.tests
  25. import qubespolicy
  26. import qubespolicy.cli
  27. import qubespolicy.tests
  28. class TC_00_qrexec_policy(qubes.tests.QubesTestCase):
  29. def setUp(self):
  30. super(TC_00_qrexec_policy, self).setUp()
  31. self.policy_patch = unittest.mock.patch('qubespolicy.Policy')
  32. self.policy_mock = self.policy_patch.start()
  33. self.system_info_patch = unittest.mock.patch(
  34. 'qubespolicy.get_system_info')
  35. self.system_info_mock = self.system_info_patch.start()
  36. self.system_info = {
  37. 'domains': {'dom0': {'icon': 'black', 'template_for_dispvms': False},
  38. 'test-vm1': {'icon': 'red', 'template_for_dispvms': False},
  39. 'test-vm2': {'icon': 'red', 'template_for_dispvms': False},
  40. 'test-vm3': {'icon': 'green', 'template_for_dispvms': True}, }}
  41. self.system_info_mock.return_value = self.system_info
  42. self.dbus_patch = unittest.mock.patch('pydbus.SystemBus')
  43. self.dbus_mock = self.dbus_patch.start()
  44. self.policy_dir = tempfile.TemporaryDirectory()
  45. self.policydir_patch = unittest.mock.patch('qubespolicy.POLICY_DIR',
  46. self.policy_dir.name)
  47. self.policydir_patch.start()
  48. def tearDown(self):
  49. self.policydir_patch.stop()
  50. self.policy_dir.cleanup()
  51. self.dbus_patch.start()
  52. self.system_info_patch.stop()
  53. self.policy_patch.stop()
  54. super(TC_00_qrexec_policy, self).tearDown()
  55. def test_000_allow(self):
  56. self.policy_mock.configure_mock(**{
  57. 'return_value.evaluate.return_value.action':
  58. qubespolicy.Action.allow,
  59. })
  60. retval = qubespolicy.cli.main(
  61. ['source-id', 'source', 'target', 'service', 'process_ident'])
  62. self.assertEqual(retval, 0)
  63. self.assertEqual(self.policy_mock.mock_calls, [
  64. ('', ('service',), {}),
  65. ('().evaluate', (self.system_info, 'source',
  66. 'target'), {}),
  67. ('().evaluate().target.__str__', (), {}),
  68. ('().evaluate().execute', ('process_ident,source,source-id', ), {}),
  69. ])
  70. self.assertEqual(self.dbus_mock.mock_calls, [])
  71. def test_010_ask_allow(self):
  72. self.policy_mock.configure_mock(**{
  73. 'return_value.evaluate.return_value.action':
  74. qubespolicy.Action.ask,
  75. 'return_value.evaluate.return_value.target':
  76. None,
  77. 'return_value.evaluate.return_value.targets_for_ask':
  78. ['test-vm1', 'test-vm2'],
  79. })
  80. self.dbus_mock.configure_mock(**{
  81. 'return_value.get.return_value.Ask.return_value': 'test-vm1'
  82. })
  83. retval = qubespolicy.cli.main(
  84. ['source-id', 'source', 'target', 'service', 'process_ident'])
  85. self.assertEqual(retval, 0)
  86. self.assertEqual(self.policy_mock.mock_calls, [
  87. ('', ('service',), {}),
  88. ('().evaluate', (self.system_info, 'source',
  89. 'target'), {}),
  90. ('().evaluate().handle_user_response', (True, 'test-vm1'), {}),
  91. ('().evaluate().execute', ('process_ident,source,source-id', ), {}),
  92. ])
  93. icons = {
  94. 'dom0': 'black',
  95. 'test-vm1': 'red',
  96. 'test-vm2': 'red',
  97. 'test-vm3': 'green',
  98. '$dispvm:test-vm3': 'green',
  99. }
  100. self.assertEqual(self.dbus_mock.mock_calls, [
  101. ('', (), {}),
  102. ('().get', ('org.qubesos.PolicyAgent',
  103. '/org/qubesos/PolicyAgent'), {}),
  104. ('().get().Ask', ('source', 'service', ['test-vm1', 'test-vm2'],
  105. '', icons), {}),
  106. ])
  107. def test_011_ask_deny(self):
  108. self.policy_mock.configure_mock(**{
  109. 'return_value.evaluate.return_value.action':
  110. qubespolicy.Action.ask,
  111. 'return_value.evaluate.return_value.target':
  112. None,
  113. 'return_value.evaluate.return_value.targets_for_ask':
  114. ['test-vm1', 'test-vm2'],
  115. 'return_value.evaluate.return_value.handle_user_response'
  116. '.side_effect':
  117. qubespolicy.AccessDenied,
  118. })
  119. self.dbus_mock.configure_mock(**{
  120. 'return_value.get.return_value.Ask.return_value': ''
  121. })
  122. retval = qubespolicy.cli.main(
  123. ['source-id', 'source', 'target', 'service', 'process_ident'])
  124. self.assertEqual(retval, 1)
  125. self.assertEqual(self.policy_mock.mock_calls, [
  126. ('', ('service',), {}),
  127. ('().evaluate', (self.system_info, 'source',
  128. 'target'), {}),
  129. ('().evaluate().handle_user_response', (False,), {}),
  130. ])
  131. icons = {
  132. 'dom0': 'black',
  133. 'test-vm1': 'red',
  134. 'test-vm2': 'red',
  135. 'test-vm3': 'green',
  136. '$dispvm:test-vm3': 'green',
  137. }
  138. self.assertEqual(self.dbus_mock.mock_calls, [
  139. ('', (), {}),
  140. ('().get', ('org.qubesos.PolicyAgent',
  141. '/org/qubesos/PolicyAgent'), {}),
  142. ('().get().Ask', ('source', 'service', ['test-vm1', 'test-vm2'],
  143. '', icons), {}),
  144. ])
  145. def test_012_ask_default_target(self):
  146. self.policy_mock.configure_mock(**{
  147. 'return_value.evaluate.return_value.action':
  148. qubespolicy.Action.ask,
  149. 'return_value.evaluate.return_value.target':
  150. 'test-vm1',
  151. 'return_value.evaluate.return_value.targets_for_ask':
  152. ['test-vm1', 'test-vm2'],
  153. })
  154. self.dbus_mock.configure_mock(**{
  155. 'return_value.get.return_value.Ask.return_value': 'test-vm1'
  156. })
  157. retval = qubespolicy.cli.main(
  158. ['source-id', 'source', 'target', 'service', 'process_ident'])
  159. self.assertEqual(retval, 0)
  160. self.assertEqual(self.policy_mock.mock_calls, [
  161. ('', ('service',), {}),
  162. ('().evaluate', (self.system_info, 'source',
  163. 'target'), {}),
  164. ('().evaluate().handle_user_response', (True, 'test-vm1'), {}),
  165. ('().evaluate().execute', ('process_ident,source,source-id',), {}),
  166. ])
  167. icons = {
  168. 'dom0': 'black',
  169. 'test-vm1': 'red',
  170. 'test-vm2': 'red',
  171. 'test-vm3': 'green',
  172. '$dispvm:test-vm3': 'green',
  173. }
  174. self.assertEqual(self.dbus_mock.mock_calls, [
  175. ('', (), {}),
  176. ('().get', ('org.qubesos.PolicyAgent',
  177. '/org/qubesos/PolicyAgent'), {}),
  178. ('().get().Ask', ('source', 'service', ['test-vm1', 'test-vm2'],
  179. 'test-vm1', icons), {}),
  180. ])
  181. def test_020_deny(self):
  182. self.policy_mock.configure_mock(**{
  183. 'return_value.evaluate.return_value.action':
  184. qubespolicy.Action.deny,
  185. 'return_value.evaluate.return_value.execute.side_effect':
  186. qubespolicy.AccessDenied,
  187. })
  188. retval = qubespolicy.cli.main(
  189. ['source-id', 'source', 'target', 'service', 'process_ident'])
  190. self.assertEqual(retval, 1)
  191. self.assertEqual(self.policy_mock.mock_calls, [
  192. ('', ('service',), {}),
  193. ('().evaluate', (self.system_info, 'source',
  194. 'target'), {}),
  195. ('().evaluate().target.__str__', (), {}),
  196. ('().evaluate().execute', ('process_ident,source,source-id',), {}),
  197. ])
  198. self.assertEqual(self.dbus_mock.mock_calls, [])
  199. def test_030_just_evaluate_allow(self):
  200. self.policy_mock.configure_mock(**{
  201. 'return_value.evaluate.return_value.action':
  202. qubespolicy.Action.allow,
  203. })
  204. retval = qubespolicy.cli.main(
  205. ['--just-evaluate',
  206. 'source-id', 'source', 'target', 'service', 'process_ident'])
  207. self.assertEqual(retval, 0)
  208. self.assertEqual(self.policy_mock.mock_calls, [
  209. ('', ('service',), {}),
  210. ('().evaluate', (self.system_info, 'source',
  211. 'target'), {}),
  212. ])
  213. self.assertEqual(self.dbus_mock.mock_calls, [])
  214. def test_031_just_evaluate_deny(self):
  215. self.policy_mock.configure_mock(**{
  216. 'return_value.evaluate.return_value.action':
  217. qubespolicy.Action.deny,
  218. })
  219. retval = qubespolicy.cli.main(
  220. ['--just-evaluate',
  221. 'source-id', 'source', 'target', 'service', 'process_ident'])
  222. self.assertEqual(retval, 1)
  223. self.assertEqual(self.policy_mock.mock_calls, [
  224. ('', ('service',), {}),
  225. ('().evaluate', (self.system_info, 'source',
  226. 'target'), {}),
  227. ])
  228. self.assertEqual(self.dbus_mock.mock_calls, [])
  229. def test_032_just_evaluate_ask(self):
  230. self.policy_mock.configure_mock(**{
  231. 'return_value.evaluate.return_value.action':
  232. qubespolicy.Action.ask,
  233. })
  234. retval = qubespolicy.cli.main(
  235. ['--just-evaluate',
  236. 'source-id', 'source', 'target', 'service', 'process_ident'])
  237. self.assertEqual(retval, 1)
  238. self.assertEqual(self.policy_mock.mock_calls, [
  239. ('', ('service',), {}),
  240. ('().evaluate', (self.system_info, 'source',
  241. 'target'), {}),
  242. ])
  243. self.assertEqual(self.dbus_mock.mock_calls, [])
  244. def test_033_just_evaluate_ask_assume_yes(self):
  245. self.policy_mock.configure_mock(**{
  246. 'return_value.evaluate.return_value.action':
  247. qubespolicy.Action.ask,
  248. })
  249. retval = qubespolicy.cli.main(
  250. ['--just-evaluate', '--assume-yes-for-ask',
  251. 'source-id', 'source', 'target', 'service', 'process_ident'])
  252. self.assertEqual(retval, 0)
  253. self.assertEqual(self.policy_mock.mock_calls, [
  254. ('', ('service',), {}),
  255. ('().evaluate', (self.system_info, 'source',
  256. 'target'), {}),
  257. ])
  258. self.assertEqual(self.dbus_mock.mock_calls, [])
  259. def test_040_create_policy(self):
  260. self.policy_mock.configure_mock(**{
  261. 'side_effect':
  262. [qubespolicy.PolicyNotFound('service'), unittest.mock.DEFAULT],
  263. 'return_value.evaluate.return_value.action':
  264. qubespolicy.Action.allow,
  265. })
  266. self.dbus_mock.configure_mock(**{
  267. 'return_value.get.return_value.ConfirmPolicyCreate.return_value':
  268. True
  269. })
  270. retval = qubespolicy.cli.main(
  271. ['source-id', 'source', 'target', 'service', 'process_ident'])
  272. self.assertEqual(retval, 0)
  273. self.assertEqual(self.policy_mock.mock_calls, [
  274. ('', ('service',), {}),
  275. ('', ('service',), {}),
  276. ('().evaluate', (self.system_info, 'source',
  277. 'target'), {}),
  278. ('().evaluate().target.__str__', (), {}),
  279. ('().evaluate().execute', ('process_ident,source,source-id',), {}),
  280. ])
  281. self.assertEqual(self.dbus_mock.mock_calls, [
  282. ('', (), {}),
  283. ('().get', ('org.qubesos.PolicyAgent',
  284. '/org/qubesos/PolicyAgent'), {}),
  285. ('().get().ConfirmPolicyCreate', ('source', 'service'), {}),
  286. ])
  287. policy_path = os.path.join(self.policy_dir.name, 'service')
  288. self.assertTrue(os.path.exists(policy_path))
  289. with open(policy_path) as policy_file:
  290. self.assertEqual(policy_file.read(),
  291. "## Policy file automatically created on first service call.\n"
  292. "## Fill free to edit.\n"
  293. "## Note that policy parsing stops at the first match\n"
  294. "\n"
  295. "## Please use a single # to start your custom comments\n"
  296. "\n"
  297. "$anyvm $anyvm ask\n")
  298. def test_041_create_policy_abort(self):
  299. self.policy_mock.configure_mock(**{
  300. 'side_effect':
  301. [qubespolicy.PolicyNotFound('service'), unittest.mock.DEFAULT],
  302. 'return_value.evaluate.return_value.action':
  303. qubespolicy.Action.deny,
  304. })
  305. self.dbus_mock.configure_mock(**{
  306. 'return_value.get.return_value.ConfirmPolicyCreate.return_value':
  307. False
  308. })
  309. retval = qubespolicy.cli.main(
  310. ['source-id', 'source', 'target', 'service', 'process_ident'])
  311. self.assertEqual(retval, 1)
  312. self.assertEqual(self.policy_mock.mock_calls, [
  313. ('', ('service',), {}),
  314. ])
  315. self.assertEqual(self.dbus_mock.mock_calls, [
  316. ('', (), {}),
  317. ('().get', ('org.qubesos.PolicyAgent',
  318. '/org/qubesos/PolicyAgent'), {}),
  319. ('().get().ConfirmPolicyCreate', ('source', 'service'), {}),
  320. ])
  321. policy_path = os.path.join(self.policy_dir.name, 'service')
  322. self.assertFalse(os.path.exists(policy_path))