__init__.py 36 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765
  1. # -*- encoding: utf8 -*-
  2. #
  3. # The Qubes OS Project, http://www.qubes-os.org
  4. #
  5. # Copyright (C) 2017 Marek Marczykowski-Górecki
  6. # <marmarek@invisiblethingslab.com>
  7. #
  8. # This program is free software; you can redistribute it and/or modify
  9. # it under the terms of the GNU General Public License as published by
  10. # the Free Software Foundation; either version 2 of the License, or
  11. # (at your option) any later version.
  12. #
  13. # This program is distributed in the hope that it will be useful,
  14. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  15. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  16. # GNU General Public License for more details.
  17. #
  18. # You should have received a copy of the GNU General Public License along
  19. # with this program; if not, see <http://www.gnu.org/licenses/>.
  20. import os
  21. import socket
  22. import unittest.mock
  23. import shutil
  24. import qubes.tests
  25. import qubespolicy
  26. tmp_policy_dir = '/tmp/policy'
  27. system_info = {
  28. 'domains': {
  29. 'dom0': {
  30. 'tags': [],
  31. 'type': 'AdminVM',
  32. 'default_dispvm': 'default-dvm',
  33. 'dispvm_allowed': False,
  34. },
  35. 'test-vm1': {
  36. 'tags': ['tag1', 'tag2'],
  37. 'type': 'AppVM',
  38. 'default_dispvm': 'default-dvm',
  39. 'dispvm_allowed': False,
  40. },
  41. 'test-vm2': {
  42. 'tags': ['tag2'],
  43. 'type': 'AppVM',
  44. 'default_dispvm': 'default-dvm',
  45. 'dispvm_allowed': False,
  46. },
  47. 'test-vm3': {
  48. 'tags': [],
  49. 'type': 'AppVM',
  50. 'default_dispvm': 'default-dvm',
  51. 'dispvm_allowed': True,
  52. },
  53. 'default-dvm': {
  54. 'tags': [],
  55. 'type': 'AppVM',
  56. 'default_dispvm': 'default-dvm',
  57. 'dispvm_allowed': True,
  58. },
  59. 'test-invalid-dvm': {
  60. 'tags': ['tag1', 'tag2'],
  61. 'type': 'AppVM',
  62. 'default_dispvm': 'test-vm1',
  63. 'dispvm_allowed': False,
  64. },
  65. 'test-no-dvm': {
  66. 'tags': ['tag1', 'tag2'],
  67. 'type': 'AppVM',
  68. 'default_dispvm': None,
  69. 'dispvm_allowed': False,
  70. },
  71. 'test-template': {
  72. 'tags': ['tag1', 'tag2'],
  73. 'type': 'TemplateVM',
  74. 'default_dispvm': 'default-dvm',
  75. 'dispvm_allowed': False,
  76. },
  77. 'test-standalone': {
  78. 'tags': ['tag1', 'tag2'],
  79. 'type': 'StandaloneVM',
  80. 'default_dispvm': 'default-dvm',
  81. 'dispvm_allowed': False,
  82. },
  83. }
  84. }
  85. class TC_00_PolicyRule(qubes.tests.QubesTestCase):
  86. def test_000_verify_target_value(self):
  87. self.assertTrue(
  88. qubespolicy.verify_target_value(system_info, 'test-vm1'))
  89. self.assertTrue(
  90. qubespolicy.verify_target_value(system_info, 'default-dvm'))
  91. self.assertTrue(
  92. qubespolicy.verify_target_value(system_info, '$dispvm'))
  93. self.assertTrue(
  94. qubespolicy.verify_target_value(system_info, '$dispvm:default-dvm'))
  95. self.assertTrue(
  96. qubespolicy.verify_target_value(system_info, 'test-template'))
  97. self.assertTrue(
  98. qubespolicy.verify_target_value(system_info, 'test-standalone'))
  99. self.assertFalse(
  100. qubespolicy.verify_target_value(system_info, 'no-such-vm'))
  101. self.assertFalse(
  102. qubespolicy.verify_target_value(system_info,
  103. '$dispvm:test-invalid-dvm'))
  104. self.assertFalse(
  105. qubespolicy.verify_target_value(system_info, '$dispvm:test-vm1'))
  106. self.assertFalse(
  107. qubespolicy.verify_target_value(system_info, ''))
  108. self.assertFalse(
  109. qubespolicy.verify_target_value(system_info, '$default'))
  110. self.assertFalse(
  111. qubespolicy.verify_target_value(system_info, '$anyvm'))
  112. self.assertFalse(
  113. qubespolicy.verify_target_value(system_info, '$tag:tag1'))
  114. self.assertFalse(
  115. qubespolicy.verify_target_value(system_info, '$invalid'))
  116. def test_010_verify_special_value(self):
  117. self.assertTrue(qubespolicy.verify_special_value('$tag:tag',
  118. for_target=False))
  119. self.assertTrue(qubespolicy.verify_special_value('$tag:other-tag',
  120. for_target=False))
  121. self.assertTrue(qubespolicy.verify_special_value('$type:AppVM',
  122. for_target=False))
  123. self.assertFalse(qubespolicy.verify_special_value('$default',
  124. for_target=False))
  125. self.assertFalse(qubespolicy.verify_special_value('$dispvm',
  126. for_target=False))
  127. self.assertFalse(qubespolicy.verify_special_value('$dispvm:some-vm',
  128. for_target=False))
  129. self.assertFalse(qubespolicy.verify_special_value('$invalid',
  130. for_target=False))
  131. self.assertFalse(qubespolicy.verify_special_value('vm-name',
  132. for_target=False))
  133. self.assertFalse(qubespolicy.verify_special_value('$tag:',
  134. for_target=False))
  135. self.assertFalse(qubespolicy.verify_special_value('$type:',
  136. for_target=False))
  137. def test_020_line_simple(self):
  138. line = qubespolicy.PolicyRule('$anyvm $anyvm ask', 'filename', 12)
  139. self.assertEqual(line.filename, 'filename')
  140. self.assertEqual(line.lineno, 12)
  141. self.assertEqual(line.action, qubespolicy.Action.ask)
  142. self.assertEqual(line.source, '$anyvm')
  143. self.assertEqual(line.target, '$anyvm')
  144. self.assertEqual(line.full_action, 'ask')
  145. self.assertIsNone(line.override_target)
  146. self.assertIsNone(line.override_user)
  147. self.assertIsNone(line.default_target)
  148. def test_021_line_simple(self):
  149. line = qubespolicy.PolicyRule(
  150. '$tag:tag1 $type:AppVM ask,target=test-vm2,user=user',
  151. 'filename', 12)
  152. self.assertEqual(line.filename, 'filename')
  153. self.assertEqual(line.lineno, 12)
  154. self.assertEqual(line.action, qubespolicy.Action.ask)
  155. self.assertEqual(line.source, '$tag:tag1')
  156. self.assertEqual(line.target, '$type:AppVM')
  157. self.assertEqual(line.full_action, 'ask,target=test-vm2,user=user')
  158. self.assertEqual(line.override_target, 'test-vm2')
  159. self.assertEqual(line.override_user, 'user')
  160. self.assertIsNone(line.default_target)
  161. def test_022_line_simple(self):
  162. line = qubespolicy.PolicyRule(
  163. '$anyvm $default allow,target=$dispvm:test-vm2',
  164. 'filename', 12)
  165. self.assertEqual(line.filename, 'filename')
  166. self.assertEqual(line.lineno, 12)
  167. self.assertEqual(line.action, qubespolicy.Action.allow)
  168. self.assertEqual(line.source, '$anyvm')
  169. self.assertEqual(line.target, '$default')
  170. self.assertEqual(line.full_action, 'allow,target=$dispvm:test-vm2')
  171. self.assertEqual(line.override_target, '$dispvm:test-vm2')
  172. self.assertIsNone(line.override_user)
  173. self.assertIsNone(line.default_target)
  174. def test_023_line_simple(self):
  175. line = qubespolicy.PolicyRule(
  176. '$anyvm $default ask,default_target=test-vm1',
  177. 'filename', 12)
  178. self.assertEqual(line.filename, 'filename')
  179. self.assertEqual(line.lineno, 12)
  180. self.assertEqual(line.action, qubespolicy.Action.ask)
  181. self.assertEqual(line.source, '$anyvm')
  182. self.assertEqual(line.target, '$default')
  183. self.assertEqual(line.full_action, 'ask,default_target=test-vm1')
  184. self.assertIsNone(line.override_target)
  185. self.assertIsNone(line.override_user)
  186. self.assertEqual(line.default_target, 'test-vm1')
  187. def test_030_line_invalid(self):
  188. invalid_lines = [
  189. '$dispvm $default allow', # $dispvm can't be a source
  190. '$default $default allow', # $default can't be a source
  191. '$anyvm $default deny,target=test-vm1', # target= used with deny
  192. '$anyvm $anyvm deny,default_target=test-vm1', # default_target=
  193. # with deny
  194. '$anyvm $anyvm deny,user=user', # user= with deny
  195. '$anyvm $anyvm invalid', # invalid action
  196. '$anyvm $anyvm allow,invalid=xx', # invalid option
  197. '$anyvm $anyvm', # missing action
  198. '$anyvm $anyvm allow,default_target=test-vm1', # default_target=
  199. # with allow
  200. '$invalid $anyvm allow', # invalid source
  201. '$anyvm $invalid deny', # invalid target
  202. '', # empty line
  203. '$anyvm $anyvm allow extra', # trailing words
  204. '$anyvm $default allow', # $default allow without target=
  205. ]
  206. for line in invalid_lines:
  207. with self.subTest(line):
  208. with self.assertRaises(qubespolicy.PolicySyntaxError):
  209. qubespolicy.PolicyRule(line, 'filename', 12)
  210. def test_040_match_single(self):
  211. is_match_single = qubespolicy.PolicyRule.is_match_single
  212. self.assertTrue(is_match_single(system_info, '$anyvm', 'test-vm1'))
  213. self.assertTrue(is_match_single(system_info, '$anyvm', '$default'))
  214. self.assertTrue(is_match_single(system_info, '$anyvm', ''))
  215. self.assertTrue(is_match_single(system_info, '$default', ''))
  216. self.assertTrue(is_match_single(system_info, '$default', '$default'))
  217. self.assertTrue(is_match_single(system_info, '$tag:tag1', 'test-vm1'))
  218. self.assertTrue(is_match_single(system_info, '$type:AppVM', 'test-vm1'))
  219. self.assertTrue(is_match_single(system_info,
  220. '$type:TemplateVM', 'test-template'))
  221. self.assertTrue(is_match_single(system_info, '$anyvm', '$dispvm'))
  222. self.assertTrue(is_match_single(system_info,
  223. '$anyvm', '$dispvm:default-dvm'))
  224. self.assertTrue(is_match_single(system_info, '$dispvm', '$dispvm'))
  225. self.assertTrue(is_match_single(system_info, 'dom0', 'dom0'))
  226. self.assertTrue(is_match_single(system_info,
  227. '$dispvm:default-dvm', '$dispvm:default-dvm'))
  228. self.assertTrue(is_match_single(system_info, '$anyvm', '$dispvm'))
  229. self.assertTrue(is_match_single(system_info, '$anyvm', 'test-vm1'))
  230. self.assertTrue(is_match_single(system_info, '$anyvm', 'test-vm1'))
  231. self.assertTrue(is_match_single(system_info, '$anyvm', 'test-vm1'))
  232. self.assertFalse(is_match_single(system_info, '$default', 'test-vm1'))
  233. self.assertFalse(is_match_single(system_info, '$tag:tag1', 'test-vm3'))
  234. self.assertFalse(is_match_single(system_info, '$anyvm', 'no-such-vm'))
  235. # test-vm1.dispvm_allowed=False
  236. self.assertFalse(is_match_single(system_info,
  237. '$anyvm', '$dispvm:test-vm1'))
  238. # test-vm1.dispvm_allowed=False
  239. self.assertFalse(is_match_single(system_info,
  240. '$dispvm:test-vm1', '$dispvm:test-vm1'))
  241. self.assertFalse(is_match_single(system_info, '$anyvm', 'dom0'))
  242. self.assertFalse(is_match_single(system_info, '$tag:tag1', 'dom0'))
  243. self.assertFalse(is_match_single(system_info, '$anyvm', '$tag:tag1'))
  244. self.assertFalse(is_match_single(system_info, '$anyvm', '$type:AppVM'))
  245. self.assertFalse(is_match_single(system_info, '$anyvm', '$invalid'))
  246. self.assertFalse(is_match_single(system_info, '$invalid', '$invalid'))
  247. self.assertFalse(is_match_single(system_info, '$anyvm', 'no-such-vm'))
  248. self.assertFalse(is_match_single(system_info,
  249. 'no-such-vm', 'no-such-vm'))
  250. self.assertFalse(is_match_single(system_info, '$dispvm', 'test-vm1'))
  251. self.assertFalse(is_match_single(system_info, '$dispvm', 'default-dvm'))
  252. self.assertFalse(is_match_single(system_info,
  253. '$dispvm:default-dvm', 'default-dvm'))
  254. self.assertFalse(is_match_single(system_info, '$anyvm', 'test-vm1\n'))
  255. self.assertFalse(is_match_single(system_info, '$anyvm', 'test-vm1 '))
  256. def test_050_match(self):
  257. line = qubespolicy.PolicyRule('$anyvm $anyvm allow')
  258. self.assertTrue(line.is_match(system_info, 'test-vm1', 'test-vm2'))
  259. line = qubespolicy.PolicyRule('$anyvm $anyvm allow')
  260. self.assertFalse(line.is_match(system_info, 'no-such-vm', 'test-vm2'))
  261. line = qubespolicy.PolicyRule('$anyvm $anyvm allow')
  262. self.assertFalse(line.is_match(system_info, 'test-vm1', 'no-such-vm'))
  263. def test_060_expand_target(self):
  264. lines = {
  265. '$anyvm $anyvm allow': ['test-vm1', 'test-vm2', 'test-vm3',
  266. '$dispvm:test-vm3',
  267. 'default-dvm', '$dispvm:default-dvm', 'test-invalid-dvm',
  268. 'test-no-dvm', 'test-template', 'test-standalone', '$dispvm'],
  269. '$anyvm $dispvm allow': ['$dispvm'],
  270. '$anyvm $dispvm:default-dvm allow': ['$dispvm:default-dvm'],
  271. # no DispVM from test-vm1 allowed
  272. '$anyvm $dispvm:test-vm1 allow': [],
  273. '$anyvm test-vm1 allow': ['test-vm1'],
  274. '$anyvm $type:AppVM allow': ['test-vm1', 'test-vm2', 'test-vm3',
  275. 'default-dvm', 'test-invalid-dvm', 'test-no-dvm'],
  276. '$anyvm $type:TemplateVM allow': ['test-template'],
  277. '$anyvm $tag:tag1 allow': ['test-vm1', 'test-invalid-dvm',
  278. 'test-template', 'test-standalone', 'test-no-dvm'],
  279. '$anyvm $tag:tag2 allow': ['test-vm1', 'test-vm2',
  280. 'test-invalid-dvm', 'test-template', 'test-standalone',
  281. 'test-no-dvm'],
  282. '$anyvm $tag:no-such-tag allow': [],
  283. }
  284. for line in lines:
  285. with self.subTest(line):
  286. policy_line = qubespolicy.PolicyRule(line)
  287. self.assertCountEqual(list(policy_line.expand_target(system_info)),
  288. lines[line])
  289. def test_070_expand_override_target(self):
  290. line = qubespolicy.PolicyRule(
  291. '$anyvm $anyvm allow,target=test-vm2')
  292. self.assertEqual(
  293. line.expand_override_target(system_info, 'test-vm1'),
  294. 'test-vm2')
  295. def test_071_expand_override_target_dispvm(self):
  296. line = qubespolicy.PolicyRule(
  297. '$anyvm $anyvm allow,target=$dispvm')
  298. self.assertEqual(
  299. line.expand_override_target(system_info, 'test-vm1'),
  300. '$dispvm:default-dvm')
  301. def test_072_expand_override_target_dispvm_specific(self):
  302. line = qubespolicy.PolicyRule(
  303. '$anyvm $anyvm allow,target=$dispvm:test-vm3')
  304. self.assertEqual(
  305. line.expand_override_target(system_info, 'test-vm1'),
  306. '$dispvm:test-vm3')
  307. def test_073_expand_override_target_dispvm_none(self):
  308. line = qubespolicy.PolicyRule(
  309. '$anyvm $anyvm allow,target=$dispvm')
  310. self.assertEqual(
  311. line.expand_override_target(system_info, 'test-no-dvm'),
  312. None)
  313. def test_074_expand_override_target_dom0(self):
  314. line = qubespolicy.PolicyRule(
  315. '$anyvm $anyvm allow,target=dom0')
  316. self.assertEqual(
  317. line.expand_override_target(system_info, 'test-no-dvm'),
  318. 'dom0')
  319. class TC_10_PolicyAction(qubes.tests.QubesTestCase):
  320. def test_000_init(self):
  321. rule = qubespolicy.PolicyRule('$anyvm $anyvm deny')
  322. with self.assertRaises(qubespolicy.AccessDenied):
  323. qubespolicy.PolicyAction('test.service', 'test-vm1', 'test-vm2',
  324. rule, 'test-vm2')
  325. def test_001_init(self):
  326. rule = qubespolicy.PolicyRule('$anyvm $anyvm ask')
  327. action = qubespolicy.PolicyAction('test.service', 'test-vm1',
  328. None, rule, 'test-vm2', ['test-vm2', 'test-vm3'])
  329. self.assertEqual(action.service, 'test.service')
  330. self.assertEqual(action.source, 'test-vm1')
  331. self.assertIsNone(action.target)
  332. self.assertEqual(action.original_target, 'test-vm2')
  333. self.assertEqual(action.targets_for_ask, ['test-vm2', 'test-vm3'])
  334. self.assertEqual(action.rule, rule)
  335. self.assertEqual(action.action, qubespolicy.Action.ask)
  336. def test_002_init_invalid(self):
  337. rule_ask = qubespolicy.PolicyRule('$anyvm $anyvm ask')
  338. rule_allow = qubespolicy.PolicyRule('$anyvm $anyvm allow')
  339. with self.assertRaises(AssertionError):
  340. qubespolicy.PolicyAction('test.service', 'test-vm1',
  341. None, rule_allow, 'test-vm2', None)
  342. with self.assertRaises(AssertionError):
  343. qubespolicy.PolicyAction('test.service', 'test-vm1',
  344. 'test-vm2', rule_allow, 'test-vm2', ['test-vm2', 'test-vm3'])
  345. with self.assertRaises(AssertionError):
  346. qubespolicy.PolicyAction('test.service', 'test-vm1',
  347. None, rule_ask, 'test-vm2', None)
  348. def test_003_init_default_target(self):
  349. rule_ask = qubespolicy.PolicyRule('$anyvm $anyvm ask')
  350. action = qubespolicy.PolicyAction('test.service', 'test-vm1',
  351. 'test-vm1', rule_ask, 'test-vm2', ['test-vm2'])
  352. self.assertIsNone(action.target)
  353. action = qubespolicy.PolicyAction('test.service', 'test-vm1',
  354. 'test-vm2', rule_ask, 'test-vm2', ['test-vm2'])
  355. self.assertEqual(action.target, 'test-vm2')
  356. def test_010_handle_user_response(self):
  357. rule = qubespolicy.PolicyRule('$anyvm $anyvm ask')
  358. action = qubespolicy.PolicyAction('test.service', 'test-vm1',
  359. None, rule, 'test-vm2', ['test-vm2', 'test-vm3'])
  360. action.handle_user_response(True, 'test-vm2')
  361. self.assertEqual(action.action, qubespolicy.Action.allow)
  362. self.assertEqual(action.target, 'test-vm2')
  363. def test_011_handle_user_response(self):
  364. rule = qubespolicy.PolicyRule('$anyvm $anyvm ask')
  365. action = qubespolicy.PolicyAction('test.service', 'test-vm1',
  366. None, rule, 'test-vm2', ['test-vm2', 'test-vm3'])
  367. with self.assertRaises(AssertionError):
  368. action.handle_user_response(True, 'test-no-dvm')
  369. def test_012_handle_user_response(self):
  370. rule = qubespolicy.PolicyRule('$anyvm $anyvm ask')
  371. action = qubespolicy.PolicyAction('test.service', 'test-vm1',
  372. None, rule, 'test-vm2', ['test-vm2', 'test-vm3'])
  373. with self.assertRaises(qubespolicy.AccessDenied):
  374. action.handle_user_response(False, None)
  375. self.assertEqual(action.action, qubespolicy.Action.deny)
  376. @unittest.mock.patch('qubespolicy.qubesd_call')
  377. @unittest.mock.patch('subprocess.call')
  378. def test_020_execute(self, mock_subprocess, mock_qubesd_call):
  379. rule = qubespolicy.PolicyRule('$anyvm $anyvm allow')
  380. action = qubespolicy.PolicyAction('test.service', 'test-vm1',
  381. 'test-vm2', rule, 'test-vm2')
  382. action.execute('some-ident')
  383. self.assertEqual(mock_qubesd_call.mock_calls,
  384. [unittest.mock.call('test-vm2', 'mgmtinternal.vm.Start')])
  385. self.assertEqual(mock_subprocess.mock_calls,
  386. [unittest.mock.call([qubespolicy.QREXEC_CLIENT, '-d', 'test-vm2',
  387. '-c', 'some-ident', 'DEFAULT:QUBESRPC test.service test-vm1'])])
  388. @unittest.mock.patch('qubespolicy.qubesd_call')
  389. @unittest.mock.patch('subprocess.call')
  390. def test_021_execute_dom0(self, mock_subprocess, mock_qubesd_call):
  391. rule = qubespolicy.PolicyRule('$anyvm dom0 allow')
  392. action = qubespolicy.PolicyAction('test.service', 'test-vm1',
  393. 'dom0', rule, 'dom0')
  394. action.execute('some-ident')
  395. self.assertEqual(mock_qubesd_call.mock_calls,
  396. [unittest.mock.call('dom0', 'mgmtinternal.vm.Start')])
  397. self.assertEqual(mock_subprocess.mock_calls,
  398. [unittest.mock.call([qubespolicy.QREXEC_CLIENT, '-d', 'dom0',
  399. '-c', 'some-ident',
  400. qubespolicy.QUBES_RPC_MULTIPLEXER_PATH +
  401. ' test.service test-vm1 dom0'])])
  402. @unittest.mock.patch('qubespolicy.qubesd_call')
  403. @unittest.mock.patch('subprocess.call')
  404. def test_022_execute_dispvm(self, mock_subprocess, mock_qubesd_call):
  405. rule = qubespolicy.PolicyRule('$anyvm $dispvm:default-dvm allow')
  406. action = qubespolicy.PolicyAction('test.service', 'test-vm1',
  407. '$dispvm:default-dvm', rule, '$dispvm:default-dvm')
  408. mock_qubesd_call.side_effect = (lambda target, call:
  409. b'dispvm-name' if call == 'mgmtinternal.vm.Create.DispVM' else
  410. unittest.mock.DEFAULT)
  411. action.execute('some-ident')
  412. self.assertEqual(mock_qubesd_call.mock_calls,
  413. [unittest.mock.call('default-dvm', 'mgmtinternal.vm.Create.DispVM'),
  414. unittest.mock.call('dispvm-name', 'mgmtinternal.vm.Start'),
  415. unittest.mock.call('dispvm-name',
  416. 'mgmtinternal.vm.CleanupDispVM')])
  417. self.assertEqual(mock_subprocess.mock_calls,
  418. [unittest.mock.call([qubespolicy.QREXEC_CLIENT, '-d', 'dispvm-name',
  419. '-c', 'some-ident', '-W',
  420. 'DEFAULT:QUBESRPC test.service test-vm1'])])
  421. @unittest.mock.patch('qubespolicy.qubesd_call')
  422. @unittest.mock.patch('subprocess.call')
  423. def test_023_execute_already_running(self, mock_subprocess,
  424. mock_qubesd_call):
  425. rule = qubespolicy.PolicyRule('$anyvm $anyvm allow')
  426. action = qubespolicy.PolicyAction('test.service', 'test-vm1',
  427. 'test-vm2', rule, 'test-vm2')
  428. mock_qubesd_call.side_effect = \
  429. qubespolicy.QubesMgmtException('QubesVMNotHaltedError')
  430. action.execute('some-ident')
  431. self.assertEqual(mock_qubesd_call.mock_calls,
  432. [unittest.mock.call('test-vm2', 'mgmtinternal.vm.Start')])
  433. self.assertEqual(mock_subprocess.mock_calls,
  434. [unittest.mock.call([qubespolicy.QREXEC_CLIENT, '-d', 'test-vm2',
  435. '-c', 'some-ident', 'DEFAULT:QUBESRPC test.service test-vm1'])])
  436. @unittest.mock.patch('qubespolicy.qubesd_call')
  437. @unittest.mock.patch('subprocess.call')
  438. def test_024_execute_startup_error(self, mock_subprocess,
  439. mock_qubesd_call):
  440. rule = qubespolicy.PolicyRule('$anyvm $anyvm allow')
  441. action = qubespolicy.PolicyAction('test.service', 'test-vm1',
  442. 'test-vm2', rule, 'test-vm2')
  443. mock_qubesd_call.side_effect = \
  444. qubespolicy.QubesMgmtException('QubesVMError')
  445. with self.assertRaises(qubespolicy.QubesMgmtException):
  446. action.execute('some-ident')
  447. self.assertEqual(mock_qubesd_call.mock_calls,
  448. [unittest.mock.call('test-vm2', 'mgmtinternal.vm.Start')])
  449. self.assertEqual(mock_subprocess.mock_calls, [])
  450. @unittest.mock.patch('qubespolicy.POLICY_DIR', tmp_policy_dir)
  451. class TC_20_Policy(qubes.tests.QubesTestCase):
  452. def setUp(self):
  453. super(TC_20_Policy, self).setUp()
  454. if not os.path.exists(tmp_policy_dir):
  455. os.mkdir(tmp_policy_dir)
  456. def tearDown(self):
  457. shutil.rmtree(tmp_policy_dir)
  458. super(TC_20_Policy, self).tearDown()
  459. def test_000_load(self):
  460. with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f:
  461. f.write('test-vm1 test-vm2 allow\n')
  462. f.write('\n')
  463. f.write('# comment\n')
  464. f.write('test-vm2 test-vm3 ask\n')
  465. f.write(' # comment \n')
  466. f.write('$anyvm $anyvm ask\n')
  467. policy = qubespolicy.Policy('test.service')
  468. self.assertEqual(policy.service, 'test.service')
  469. self.assertEqual(len(policy.policy_rules), 3)
  470. self.assertEqual(policy.policy_rules[0].source, 'test-vm1')
  471. self.assertEqual(policy.policy_rules[0].target, 'test-vm2')
  472. self.assertEqual(policy.policy_rules[0].action,
  473. qubespolicy.Action.allow)
  474. def test_001_not_existent(self):
  475. with self.assertRaises(qubespolicy.AccessDenied):
  476. qubespolicy.Policy('no-such.service')
  477. def test_002_include(self):
  478. with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f:
  479. f.write('test-vm1 test-vm2 allow\n')
  480. f.write('$include:test.service2\n')
  481. f.write('$anyvm $anyvm deny\n')
  482. with open(os.path.join(tmp_policy_dir, 'test.service2'), 'w') as f:
  483. f.write('test-vm3 $default allow,target=test-vm2\n')
  484. policy = qubespolicy.Policy('test.service')
  485. self.assertEqual(policy.service, 'test.service')
  486. self.assertEqual(len(policy.policy_rules), 3)
  487. self.assertEqual(policy.policy_rules[0].source, 'test-vm1')
  488. self.assertEqual(policy.policy_rules[0].target, 'test-vm2')
  489. self.assertEqual(policy.policy_rules[0].action,
  490. qubespolicy.Action.allow)
  491. self.assertEqual(policy.policy_rules[0].filename,
  492. tmp_policy_dir + '/test.service')
  493. self.assertEqual(policy.policy_rules[0].lineno, 1)
  494. self.assertEqual(policy.policy_rules[1].source, 'test-vm3')
  495. self.assertEqual(policy.policy_rules[1].target, '$default')
  496. self.assertEqual(policy.policy_rules[1].action,
  497. qubespolicy.Action.allow)
  498. self.assertEqual(policy.policy_rules[1].filename,
  499. tmp_policy_dir + '/test.service2')
  500. self.assertEqual(policy.policy_rules[1].lineno, 1)
  501. self.assertEqual(policy.policy_rules[2].source, '$anyvm')
  502. self.assertEqual(policy.policy_rules[2].target, '$anyvm')
  503. self.assertEqual(policy.policy_rules[2].action,
  504. qubespolicy.Action.deny)
  505. self.assertEqual(policy.policy_rules[2].filename,
  506. tmp_policy_dir + '/test.service')
  507. self.assertEqual(policy.policy_rules[2].lineno, 3)
  508. def test_010_find_rule(self):
  509. with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f:
  510. f.write('test-vm1 test-vm2 allow\n')
  511. f.write('test-vm1 $anyvm ask\n')
  512. f.write('test-vm2 $tag:tag1 deny\n')
  513. f.write('test-vm2 $tag:tag2 allow\n')
  514. f.write('$type:AppVM $default allow,target=test-vm3\n')
  515. f.write('$tag:tag1 $type:AppVM allow\n')
  516. policy = qubespolicy.Policy('test.service')
  517. self.assertEqual(policy.find_matching_rule(
  518. system_info, 'test-vm1', 'test-vm2'), policy.policy_rules[0])
  519. self.assertEqual(policy.find_matching_rule(
  520. system_info, 'test-vm1', 'test-vm3'), policy.policy_rules[1])
  521. self.assertEqual(policy.find_matching_rule(
  522. system_info, 'test-vm2', 'test-vm2'), policy.policy_rules[3])
  523. self.assertEqual(policy.find_matching_rule(
  524. system_info, 'test-vm2', 'test-no-dvm'), policy.policy_rules[2])
  525. # $anyvm matches $default too
  526. self.assertEqual(policy.find_matching_rule(
  527. system_info, 'test-vm1', ''), policy.policy_rules[1])
  528. self.assertEqual(policy.find_matching_rule(
  529. system_info, 'test-vm2', ''), policy.policy_rules[4])
  530. self.assertEqual(policy.find_matching_rule(
  531. system_info, 'test-vm2', '$default'), policy.policy_rules[4])
  532. self.assertEqual(policy.find_matching_rule(
  533. system_info, 'test-no-dvm', 'test-vm3'), policy.policy_rules[5])
  534. with self.assertRaises(qubespolicy.AccessDenied):
  535. policy.find_matching_rule(
  536. system_info, 'test-no-dvm', 'test-standalone')
  537. with self.assertRaises(qubespolicy.AccessDenied):
  538. policy.find_matching_rule(
  539. system_info, 'test-standalone', '$default')
  540. def test_020_collect_targets_for_ask(self):
  541. with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f:
  542. f.write('test-vm1 test-vm2 allow\n')
  543. f.write('test-vm1 $anyvm ask\n')
  544. f.write('test-vm2 $tag:tag1 deny\n')
  545. f.write('test-vm2 $tag:tag2 allow\n')
  546. f.write('test-no-dvm $type:AppVM deny\n')
  547. f.write('$type:AppVM $default allow,target=test-vm3\n')
  548. f.write('$tag:tag1 $type:AppVM allow\n')
  549. f.write('test-no-dvm $dispvm allow\n')
  550. f.write('test-standalone $dispvm allow\n')
  551. policy = qubespolicy.Policy('test.service')
  552. self.assertCountEqual(policy.collect_targets_for_ask(system_info,
  553. 'test-vm1'), ['test-vm1', 'test-vm2', 'test-vm3',
  554. '$dispvm:test-vm3',
  555. 'default-dvm', '$dispvm:default-dvm', 'test-invalid-dvm',
  556. 'test-no-dvm', 'test-template', 'test-standalone'])
  557. self.assertCountEqual(policy.collect_targets_for_ask(system_info,
  558. 'test-vm2'), ['test-vm2', 'test-vm3'])
  559. self.assertCountEqual(policy.collect_targets_for_ask(system_info,
  560. 'test-vm3'), ['test-vm3'])
  561. self.assertCountEqual(policy.collect_targets_for_ask(system_info,
  562. 'test-standalone'), ['test-vm1', 'test-vm2', 'test-vm3',
  563. 'default-dvm', 'test-no-dvm', 'test-invalid-dvm',
  564. '$dispvm:default-dvm'])
  565. self.assertCountEqual(policy.collect_targets_for_ask(system_info,
  566. 'test-no-dvm'), [])
  567. def test_030_eval_simple(self):
  568. with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f:
  569. f.write('test-vm1 test-vm2 allow\n')
  570. policy = qubespolicy.Policy('test.service')
  571. action = policy.evaluate(system_info, 'test-vm1', 'test-vm2')
  572. self.assertEqual(action.rule, policy.policy_rules[0])
  573. self.assertEqual(action.action, qubespolicy.Action.allow)
  574. self.assertEqual(action.target, 'test-vm2')
  575. self.assertEqual(action.original_target, 'test-vm2')
  576. self.assertEqual(action.service, 'test.service')
  577. self.assertIsNone(action.targets_for_ask)
  578. with self.assertRaises(qubespolicy.AccessDenied):
  579. policy.evaluate(system_info, 'test-vm2', '$default')
  580. def test_031_eval_default(self):
  581. with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f:
  582. f.write('test-vm1 test-vm2 allow\n')
  583. f.write('test-vm1 $default allow,target=test-vm2\n')
  584. f.write('$tag:tag1 test-vm2 ask\n')
  585. f.write('$tag:tag2 $anyvm allow\n')
  586. f.write('test-vm3 $anyvm deny\n')
  587. policy = qubespolicy.Policy('test.service')
  588. action = policy.evaluate(system_info, 'test-vm1', '$default')
  589. self.assertEqual(action.rule, policy.policy_rules[1])
  590. self.assertEqual(action.action, qubespolicy.Action.allow)
  591. self.assertEqual(action.target, 'test-vm2')
  592. self.assertEqual(action.original_target, '$default')
  593. self.assertEqual(action.service, 'test.service')
  594. self.assertIsNone(action.targets_for_ask)
  595. with self.assertRaises(qubespolicy.AccessDenied):
  596. # action allow should hit, but no target specified (either by
  597. # caller or policy)
  598. policy.evaluate(system_info, 'test-standalone', '$default')
  def test_032_eval_ask(self):
      """Evaluate an ``ask`` rule that names no default target.

      A call from ``test-standalone`` to ``test-vm2`` is expected to match
      ``policy_rules[2]`` with action ``ask``. The resulting action has
      no preselected ``target``, and ``targets_for_ask`` must contain
      exactly the entries listed below, in any order.
      """
      rules = (
          'test-vm1 test-vm2 allow\n',
          'test-vm1 $default allow,target=test-vm2\n',
          '$tag:tag1 test-vm2 ask\n',
          '$tag:tag1 test-vm3 ask,default_target=test-vm3\n',
          '$tag:tag2 $anyvm allow\n',
          'test-vm3 $anyvm deny\n',
      )
      policy_path = os.path.join(tmp_policy_dir, 'test.service')
      with open(policy_path, 'w') as policy_file:
          policy_file.writelines(rules)

      policy = qubespolicy.Policy('test.service')
      decision = policy.evaluate(system_info, 'test-standalone', 'test-vm2')
      self.assertEqual(decision.rule, policy.policy_rules[2])
      self.assertEqual(decision.action, qubespolicy.Action.ask)
      self.assertIsNone(decision.target)
      self.assertEqual(decision.original_target, 'test-vm2')
      self.assertEqual(decision.service, 'test.service')

      expected_targets = [
          'test-vm1', 'test-vm2', 'test-vm3', '$dispvm:test-vm3',
          'default-dvm', '$dispvm:default-dvm', 'test-invalid-dvm',
          'test-no-dvm', 'test-template', 'test-standalone',
      ]
      self.assertCountEqual(decision.targets_for_ask, expected_targets)
  def test_033_eval_ask(self):
      """Evaluate an ``ask`` rule that carries ``default_target``.

      A call from ``test-standalone`` to ``test-vm3`` is expected to match
      ``policy_rules[3]`` with action ``ask``. The action's ``target`` is
      then ``test-vm3``, and ``targets_for_ask`` must contain exactly the
      entries listed below, in any order.
      """
      rules = (
          'test-vm1 test-vm2 allow\n',
          'test-vm1 $default allow,target=test-vm2\n',
          '$tag:tag1 test-vm2 ask\n',
          '$tag:tag1 test-vm3 ask,default_target=test-vm3\n',
          '$tag:tag2 $anyvm allow\n',
          'test-vm3 $anyvm deny\n',
      )
      policy_path = os.path.join(tmp_policy_dir, 'test.service')
      with open(policy_path, 'w') as policy_file:
          policy_file.writelines(rules)

      policy = qubespolicy.Policy('test.service')
      decision = policy.evaluate(system_info, 'test-standalone', 'test-vm3')
      self.assertEqual(decision.rule, policy.policy_rules[3])
      self.assertEqual(decision.action, qubespolicy.Action.ask)
      self.assertEqual(decision.target, 'test-vm3')
      self.assertEqual(decision.original_target, 'test-vm3')
      self.assertEqual(decision.service, 'test.service')

      expected_targets = [
          'test-vm1', 'test-vm2', 'test-vm3', '$dispvm:test-vm3',
          'default-dvm', '$dispvm:default-dvm', 'test-invalid-dvm',
          'test-no-dvm', 'test-template', 'test-standalone',
      ]
      self.assertCountEqual(decision.targets_for_ask, expected_targets)
  637. class TC_30_Misc(qubes.tests.QubesTestCase):
  @unittest.mock.patch('socket.socket')
  def test_000_qubesd_call(self, mock_socket):
      """Call ``qubesd_call`` with only a destination and a method.

      ``socket.socket`` is patched so that reading the reply yields
      ``b'0\\x00data'``; the call must then return ``b'data'``. The
      complete sequence of calls recorded on the socket mock is compared
      against the expected list.
      """
      mock_socket.configure_mock(**{
          'return_value.makefile.return_value.read.return_value':
              b'0\x00data',
      })

      reply = qubespolicy.qubesd_call('test', 'method')
      self.assertEqual(reply, b'data')

      call = unittest.mock.call
      sock = call()  # stands for the socket instance returned by the patch
      expected_calls = [
          call(socket.AF_UNIX, socket.SOCK_STREAM),
          sock.connect(qubespolicy.QUBESD_INTERNAL_SOCK),
          sock.sendall(b'dom0'),
          sock.sendall(b'\x00'),
          sock.sendall(b'method'),
          sock.sendall(b'\x00'),
          sock.sendall(b'test'),
          sock.sendall(b'\x00'),
          sock.sendall(b'\x00'),
          sock.shutdown(socket.SHUT_WR),
          sock.makefile('rb'),
          sock.makefile().read(),
      ]
      self.assertEqual(mock_socket.mock_calls, expected_calls)
  @unittest.mock.patch('socket.socket')
  def test_001_qubesd_call_arg_payload(self, mock_socket):
      """Call ``qubesd_call`` with an explicit argument and payload.

      ``socket.socket`` is patched so that reading the reply yields
      ``b'0\\x00data'``; the call must then return ``b'data'``. The
      expected call list includes ``sendall(b'arg')`` and
      ``sendall(b'payload')`` after the destination has been sent.
      """
      mock_socket.configure_mock(**{
          'return_value.makefile.return_value.read.return_value':
              b'0\x00data',
      })

      reply = qubespolicy.qubesd_call('test', 'method', 'arg', b'payload')
      self.assertEqual(reply, b'data')

      call = unittest.mock.call
      sock = call()  # stands for the socket instance returned by the patch
      expected_calls = [
          call(socket.AF_UNIX, socket.SOCK_STREAM),
          sock.connect(qubespolicy.QUBESD_INTERNAL_SOCK),
          sock.sendall(b'dom0'),
          sock.sendall(b'\x00'),
          sock.sendall(b'method'),
          sock.sendall(b'\x00'),
          sock.sendall(b'test'),
          sock.sendall(b'\x00'),
          sock.sendall(b'arg'),
          sock.sendall(b'\x00'),
          sock.sendall(b'payload'),
          sock.shutdown(socket.SHUT_WR),
          sock.makefile('rb'),
          sock.makefile().read(),
      ]
      self.assertEqual(mock_socket.mock_calls, expected_calls)
  @unittest.mock.patch('socket.socket')
  def test_002_qubesd_call_exception(self, mock_socket):
      """Check that an error reply from qubesd is raised as an exception.

      ``socket.socket`` is patched so that reading the reply yields
      ``b'2\\x00SomeError\\x00traceback\\x00message\\x00'``. The call must
      raise ``QubesMgmtException`` whose ``exc_type`` is ``'SomeError'``.
      The calls recorded on the socket mock are still compared against
      the full expected list.
      """
      mock_socket.configure_mock(**{
          'return_value.makefile.return_value.read.return_value':
              b'2\x00SomeError\x00traceback\x00message\x00',
      })

      with self.assertRaises(qubespolicy.QubesMgmtException) as raised:
          qubespolicy.qubesd_call('test', 'method')
      self.assertEqual(raised.exception.exc_type, 'SomeError')

      call = unittest.mock.call
      sock = call()  # stands for the socket instance returned by the patch
      expected_calls = [
          call(socket.AF_UNIX, socket.SOCK_STREAM),
          sock.connect(qubespolicy.QUBESD_INTERNAL_SOCK),
          sock.sendall(b'dom0'),
          sock.sendall(b'\x00'),
          sock.sendall(b'method'),
          sock.sendall(b'\x00'),
          sock.sendall(b'test'),
          sock.sendall(b'\x00'),
          sock.sendall(b'\x00'),
          sock.shutdown(socket.SHUT_WR),
          sock.makefile('rb'),
          sock.makefile().read(),
      ]
      self.assertEqual(mock_socket.mock_calls, expected_calls)