__init__.py 44 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919
  1. # -*- encoding: utf8 -*-
  2. #
  3. # The Qubes OS Project, http://www.qubes-os.org
  4. #
  5. # Copyright (C) 2017 Marek Marczykowski-Górecki
  6. # <marmarek@invisiblethingslab.com>
  7. #
  8. # This library is free software; you can redistribute it and/or
  9. # modify it under the terms of the GNU Lesser General Public
  10. # License as published by the Free Software Foundation; either
  11. # version 2.1 of the License, or (at your option) any later version.
  12. #
  13. # This library is distributed in the hope that it will be useful,
  14. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  15. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  16. # Lesser General Public License for more details.
  17. #
  18. # You should have received a copy of the GNU Lesser General Public
  19. # License along with this library; if not, see <https://www.gnu.org/licenses/>.
  20. import os
  21. import socket
  22. import unittest.mock
  23. import shutil
  24. import qubes.tests
  25. import qubespolicy
  26. tmp_policy_dir = '/tmp/policy'
  27. system_info = {
  28. 'domains': {
  29. 'dom0': {
  30. 'tags': ['dom0-tag'],
  31. 'type': 'AdminVM',
  32. 'default_dispvm': 'default-dvm',
  33. 'template_for_dispvms': False,
  34. },
  35. 'test-vm1': {
  36. 'tags': ['tag1', 'tag2'],
  37. 'type': 'AppVM',
  38. 'default_dispvm': 'default-dvm',
  39. 'template_for_dispvms': False,
  40. },
  41. 'test-vm2': {
  42. 'tags': ['tag2'],
  43. 'type': 'AppVM',
  44. 'default_dispvm': 'default-dvm',
  45. 'template_for_dispvms': False,
  46. },
  47. 'test-vm3': {
  48. 'tags': ['tag3'],
  49. 'type': 'AppVM',
  50. 'default_dispvm': 'default-dvm',
  51. 'template_for_dispvms': True,
  52. },
  53. 'default-dvm': {
  54. 'tags': [],
  55. 'type': 'AppVM',
  56. 'default_dispvm': 'default-dvm',
  57. 'template_for_dispvms': True,
  58. },
  59. 'test-invalid-dvm': {
  60. 'tags': ['tag1', 'tag2'],
  61. 'type': 'AppVM',
  62. 'default_dispvm': 'test-vm1',
  63. 'template_for_dispvms': False,
  64. },
  65. 'test-no-dvm': {
  66. 'tags': ['tag1', 'tag2'],
  67. 'type': 'AppVM',
  68. 'default_dispvm': None,
  69. 'template_for_dispvms': False,
  70. },
  71. 'test-template': {
  72. 'tags': ['tag1', 'tag2'],
  73. 'type': 'TemplateVM',
  74. 'default_dispvm': 'default-dvm',
  75. 'template_for_dispvms': False,
  76. },
  77. 'test-standalone': {
  78. 'tags': ['tag1', 'tag2'],
  79. 'type': 'StandaloneVM',
  80. 'default_dispvm': 'default-dvm',
  81. 'template_for_dispvms': False,
  82. },
  83. }
  84. }
  85. class TC_00_PolicyRule(qubes.tests.QubesTestCase):
  86. def test_000_verify_target_value(self):
  87. self.assertTrue(
  88. qubespolicy.verify_target_value(system_info, 'test-vm1'))
  89. self.assertTrue(
  90. qubespolicy.verify_target_value(system_info, 'default-dvm'))
  91. self.assertTrue(
  92. qubespolicy.verify_target_value(system_info, '@dispvm'))
  93. self.assertTrue(
  94. qubespolicy.verify_target_value(system_info, '@dispvm:default-dvm'))
  95. self.assertTrue(
  96. qubespolicy.verify_target_value(system_info, 'test-template'))
  97. self.assertTrue(
  98. qubespolicy.verify_target_value(system_info, 'test-standalone'))
  99. self.assertTrue(
  100. qubespolicy.verify_target_value(system_info, '@adminvm'))
  101. self.assertFalse(
  102. qubespolicy.verify_target_value(system_info, 'no-such-vm'))
  103. self.assertFalse(
  104. qubespolicy.verify_target_value(system_info,
  105. '@dispvm:test-invalid-dvm'))
  106. self.assertFalse(
  107. qubespolicy.verify_target_value(system_info, '@dispvm:test-vm1'))
  108. self.assertFalse(
  109. qubespolicy.verify_target_value(system_info, ''))
  110. self.assertFalse(
  111. qubespolicy.verify_target_value(system_info, '@default'))
  112. self.assertFalse(
  113. qubespolicy.verify_target_value(system_info, '@anyvm'))
  114. self.assertFalse(
  115. qubespolicy.verify_target_value(system_info, '@tag:tag1'))
  116. self.assertFalse(
  117. qubespolicy.verify_target_value(system_info, '@dispvm:@tag:tag1'))
  118. self.assertFalse(
  119. qubespolicy.verify_target_value(system_info, '@invalid'))
  120. def test_010_verify_special_value(self):
  121. self.assertTrue(qubespolicy.verify_special_value('@tag:tag',
  122. for_target=False))
  123. self.assertTrue(qubespolicy.verify_special_value('@tag:other-tag',
  124. for_target=False))
  125. self.assertTrue(qubespolicy.verify_special_value('@type:AppVM',
  126. for_target=False))
  127. self.assertTrue(qubespolicy.verify_special_value('@adminvm',
  128. for_target=False))
  129. self.assertTrue(qubespolicy.verify_special_value('@dispvm:some-vm',
  130. for_target=True))
  131. self.assertTrue(qubespolicy.verify_special_value('@dispvm:@tag:tag1',
  132. for_target=True))
  133. self.assertFalse(qubespolicy.verify_special_value('@default',
  134. for_target=False))
  135. self.assertFalse(qubespolicy.verify_special_value('@dispvm',
  136. for_target=False))
  137. self.assertFalse(qubespolicy.verify_special_value('@dispvm:some-vm',
  138. for_target=False))
  139. self.assertFalse(qubespolicy.verify_special_value('@dispvm:@tag:tag1',
  140. for_target=False))
  141. self.assertFalse(qubespolicy.verify_special_value('@invalid',
  142. for_target=False))
  143. self.assertFalse(qubespolicy.verify_special_value('vm-name',
  144. for_target=False))
  145. self.assertFalse(qubespolicy.verify_special_value('@tag:',
  146. for_target=False))
  147. self.assertFalse(qubespolicy.verify_special_value('@type:',
  148. for_target=False))
  149. def test_020_line_simple(self):
  150. line = qubespolicy.PolicyRule('@anyvm @anyvm ask', 'filename', 12)
  151. self.assertEqual(line.filename, 'filename')
  152. self.assertEqual(line.lineno, 12)
  153. self.assertEqual(line.action, qubespolicy.Action.ask)
  154. self.assertEqual(line.source, '@anyvm')
  155. self.assertEqual(line.target, '@anyvm')
  156. self.assertEqual(line.full_action, 'ask')
  157. self.assertIsNone(line.override_target)
  158. self.assertIsNone(line.override_user)
  159. self.assertIsNone(line.default_target)
  160. def test_021_line_simple(self):
  161. # also check spaces in action field
  162. line = qubespolicy.PolicyRule(
  163. '@tag:tag1 @type:AppVM ask, target=test-vm2, user=user',
  164. 'filename', 12)
  165. self.assertEqual(line.filename, 'filename')
  166. self.assertEqual(line.lineno, 12)
  167. self.assertEqual(line.action, qubespolicy.Action.ask)
  168. self.assertEqual(line.source, '@tag:tag1')
  169. self.assertEqual(line.target, '@type:AppVM')
  170. self.assertEqual(line.full_action, 'ask, target=test-vm2, user=user')
  171. self.assertEqual(line.override_target, 'test-vm2')
  172. self.assertEqual(line.override_user, 'user')
  173. self.assertIsNone(line.default_target)
  174. def test_022_line_simple(self):
  175. line = qubespolicy.PolicyRule(
  176. '@anyvm @default allow,target=@dispvm:test-vm2',
  177. 'filename', 12)
  178. self.assertEqual(line.filename, 'filename')
  179. self.assertEqual(line.lineno, 12)
  180. self.assertEqual(line.action, qubespolicy.Action.allow)
  181. self.assertEqual(line.source, '@anyvm')
  182. self.assertEqual(line.target, '@default')
  183. self.assertEqual(line.full_action, 'allow,target=@dispvm:test-vm2')
  184. self.assertEqual(line.override_target, '@dispvm:test-vm2')
  185. self.assertIsNone(line.override_user)
  186. self.assertIsNone(line.default_target)
  187. def test_023_line_simple(self):
  188. line = qubespolicy.PolicyRule(
  189. '@anyvm @default ask,default_target=test-vm1',
  190. 'filename', 12)
  191. self.assertEqual(line.filename, 'filename')
  192. self.assertEqual(line.lineno, 12)
  193. self.assertEqual(line.action, qubespolicy.Action.ask)
  194. self.assertEqual(line.source, '@anyvm')
  195. self.assertEqual(line.target, '@default')
  196. self.assertEqual(line.full_action, 'ask,default_target=test-vm1')
  197. self.assertIsNone(line.override_target)
  198. self.assertIsNone(line.override_user)
  199. self.assertEqual(line.default_target, 'test-vm1')
  200. def test_024_line_simple(self):
  201. line = qubespolicy.PolicyRule(
  202. '@anyvm @adminvm ask,default_target=@adminvm',
  203. 'filename', 12)
  204. self.assertEqual(line.filename, 'filename')
  205. self.assertEqual(line.lineno, 12)
  206. self.assertEqual(line.action, qubespolicy.Action.ask)
  207. self.assertEqual(line.source, '@anyvm')
  208. self.assertEqual(line.target, '@adminvm')
  209. self.assertEqual(line.full_action, 'ask,default_target=@adminvm')
  210. self.assertIsNone(line.override_target)
  211. self.assertIsNone(line.override_user)
  212. self.assertEqual(line.default_target, '@adminvm')
  213. def test_030_line_invalid(self):
  214. invalid_lines = [
  215. '@dispvm @default allow', # @dispvm can't be a source
  216. '@default @default allow', # @default can't be a source
  217. '@anyvm @default allow,target=@dispvm:@tag:tag1', # @dispvm:@tag
  218. # as override target
  219. '@anyvm @default allow,target=@tag:tag1', # @tag as override target
  220. '@anyvm @default deny,target=test-vm1', # target= used with deny
  221. '@anyvm @anyvm deny,default_target=test-vm1', # default_target=
  222. # with deny
  223. '@anyvm @anyvm deny,user=user', # user= with deny
  224. '@anyvm @anyvm invalid', # invalid action
  225. '@anyvm @anyvm allow,invalid=xx', # invalid option
  226. '@anyvm @anyvm', # missing action
  227. '@anyvm @anyvm allow,default_target=test-vm1', # default_target=
  228. # with allow
  229. '@invalid @anyvm allow', # invalid source
  230. '@anyvm @invalid deny', # invalid target
  231. '', # empty line
  232. '@anyvm @anyvm allow extra', # trailing words
  233. '@anyvm @default allow', # @default allow without target=
  234. ]
  235. for line in invalid_lines:
  236. with self.subTest(line):
  237. with self.assertRaises(qubespolicy.PolicySyntaxError):
  238. qubespolicy.PolicyRule(line, 'filename', 12)
  239. def test_040_match_single(self):
  240. is_match_single = qubespolicy.PolicyRule.is_match_single
  241. self.assertTrue(is_match_single(system_info, '@anyvm', 'test-vm1'))
  242. self.assertTrue(is_match_single(system_info, '@anyvm', '@default'))
  243. self.assertTrue(is_match_single(system_info, '@default', '@default'))
  244. self.assertTrue(is_match_single(system_info, '@tag:tag1', 'test-vm1'))
  245. self.assertTrue(is_match_single(system_info, '@type:AppVM', 'test-vm1'))
  246. self.assertTrue(is_match_single(system_info,
  247. '@type:TemplateVM', 'test-template'))
  248. self.assertTrue(is_match_single(system_info, '@anyvm', '@dispvm'))
  249. self.assertTrue(is_match_single(system_info,
  250. '@anyvm', '@dispvm:default-dvm'))
  251. self.assertTrue(is_match_single(system_info, '@dispvm', '@dispvm'))
  252. self.assertTrue(is_match_single(system_info,
  253. '@dispvm:@tag:tag3', '@dispvm:test-vm3'))
  254. self.assertTrue(is_match_single(system_info, '@adminvm', '@adminvm'))
  255. self.assertTrue(is_match_single(system_info, '@adminvm', 'dom0'))
  256. self.assertTrue(is_match_single(system_info, 'dom0', '@adminvm'))
  257. self.assertTrue(is_match_single(system_info, 'dom0', 'dom0'))
  258. self.assertTrue(is_match_single(system_info,
  259. '@dispvm:default-dvm', '@dispvm:default-dvm'))
  260. self.assertTrue(is_match_single(system_info, '@anyvm', '@dispvm'))
  261. self.assertTrue(is_match_single(system_info, '@anyvm', 'test-vm1'))
  262. self.assertTrue(is_match_single(system_info, '@anyvm', 'test-vm1'))
  263. self.assertTrue(is_match_single(system_info, '@anyvm', 'test-vm1'))
  264. self.assertFalse(is_match_single(system_info, '@default', 'test-vm1'))
  265. self.assertFalse(is_match_single(system_info, '@tag:tag1', 'test-vm3'))
  266. self.assertFalse(is_match_single(system_info, '@anyvm', 'no-such-vm'))
  267. # test-vm1.template_for_dispvms=False
  268. self.assertFalse(is_match_single(system_info,
  269. '@anyvm', '@dispvm:test-vm1'))
  270. # test-vm1.template_for_dispvms=False
  271. self.assertFalse(is_match_single(system_info,
  272. '@dispvm:test-vm1', '@dispvm:test-vm1'))
  273. self.assertFalse(is_match_single(system_info,
  274. '@dispvm:@tag:tag1', '@dispvm:test-vm1'))
  275. # test-vm3 has not tag1
  276. self.assertFalse(is_match_single(system_info,
  277. '@dispvm:@tag:tag1', '@dispvm:test-vm3'))
  278. # default-dvm has no tag3
  279. self.assertFalse(is_match_single(system_info,
  280. '@dispvm:@tag:tag3', '@dispvm:default-dvm'))
  281. self.assertFalse(is_match_single(system_info, '@anyvm', 'dom0'))
  282. self.assertFalse(is_match_single(system_info, '@anyvm', '@adminvm'))
  283. self.assertFalse(is_match_single(system_info,
  284. '@tag:dom0-tag', '@adminvm'))
  285. self.assertFalse(is_match_single(system_info,
  286. '@type:AdminVM', '@adminvm'))
  287. self.assertFalse(is_match_single(system_info,
  288. '@tag:dom0-tag', 'dom0'))
  289. self.assertFalse(is_match_single(system_info,
  290. '@type:AdminVM', 'dom0'))
  291. self.assertFalse(is_match_single(system_info, '@tag:tag1', 'dom0'))
  292. self.assertFalse(is_match_single(system_info, '@anyvm', '@tag:tag1'))
  293. self.assertFalse(is_match_single(system_info, '@anyvm', '@type:AppVM'))
  294. self.assertFalse(is_match_single(system_info, '@anyvm', '@invalid'))
  295. self.assertFalse(is_match_single(system_info, '@invalid', '@invalid'))
  296. self.assertFalse(is_match_single(system_info, '@anyvm', 'no-such-vm'))
  297. self.assertFalse(is_match_single(system_info,
  298. 'no-such-vm', 'no-such-vm'))
  299. self.assertFalse(is_match_single(system_info, '@dispvm', 'test-vm1'))
  300. self.assertFalse(is_match_single(system_info, '@dispvm', 'default-dvm'))
  301. self.assertFalse(is_match_single(system_info,
  302. '@dispvm:default-dvm', 'default-dvm'))
  303. self.assertFalse(is_match_single(system_info, '@anyvm', 'test-vm1\n'))
  304. self.assertFalse(is_match_single(system_info, '@anyvm', 'test-vm1 '))
  305. def test_050_match(self):
  306. line = qubespolicy.PolicyRule('@anyvm @anyvm allow')
  307. self.assertTrue(line.is_match(system_info, 'test-vm1', 'test-vm2'))
  308. line = qubespolicy.PolicyRule('@anyvm @anyvm allow')
  309. self.assertFalse(line.is_match(system_info, 'no-such-vm', 'test-vm2'))
  310. line = qubespolicy.PolicyRule('@anyvm @anyvm allow')
  311. self.assertFalse(line.is_match(system_info, 'test-vm1', 'no-such-vm'))
  312. line = qubespolicy.PolicyRule('@anyvm @dispvm allow')
  313. self.assertTrue(line.is_match(system_info, 'test-vm1', '@dispvm'))
  314. line = qubespolicy.PolicyRule('@anyvm @dispvm allow')
  315. self.assertFalse(line.is_match(system_info,
  316. 'test-vm1', '@dispvm:default-dvm'))
  317. line = qubespolicy.PolicyRule('@anyvm @dispvm:default-dvm allow')
  318. self.assertTrue(line.is_match(system_info, 'test-vm1', '@dispvm'))
  319. line = qubespolicy.PolicyRule('@anyvm @dispvm:default-dvm allow')
  320. self.assertTrue(line.is_match(system_info,
  321. 'test-vm1', '@dispvm:default-dvm'))
  322. line = qubespolicy.PolicyRule('@anyvm @dispvm:@tag:tag3 allow')
  323. self.assertTrue(line.is_match(system_info,
  324. 'test-vm1', '@dispvm:test-vm3'))
  325. def test_060_expand_target(self):
  326. lines = {
  327. '@anyvm @anyvm allow': ['test-vm1', 'test-vm2', 'test-vm3',
  328. '@dispvm:test-vm3',
  329. 'default-dvm', '@dispvm:default-dvm', 'test-invalid-dvm',
  330. 'test-no-dvm', 'test-template', 'test-standalone', '@dispvm'],
  331. '@anyvm @dispvm allow': ['@dispvm'],
  332. '@anyvm @dispvm:default-dvm allow': ['@dispvm:default-dvm'],
  333. # no DispVM from test-vm1 allowed
  334. '@anyvm @dispvm:test-vm1 allow': [],
  335. '@anyvm @dispvm:test-vm3 allow': ['@dispvm:test-vm3'],
  336. '@anyvm @dispvm:@tag:tag1 allow': [],
  337. '@anyvm @dispvm:@tag:tag3 allow': ['@dispvm:test-vm3'],
  338. '@anyvm test-vm1 allow': ['test-vm1'],
  339. '@anyvm @type:AppVM allow': ['test-vm1', 'test-vm2', 'test-vm3',
  340. 'default-dvm', 'test-invalid-dvm', 'test-no-dvm'],
  341. '@anyvm @type:TemplateVM allow': ['test-template'],
  342. '@anyvm @tag:tag1 allow': ['test-vm1', 'test-invalid-dvm',
  343. 'test-template', 'test-standalone', 'test-no-dvm'],
  344. '@anyvm @tag:tag2 allow': ['test-vm1', 'test-vm2',
  345. 'test-invalid-dvm', 'test-template', 'test-standalone',
  346. 'test-no-dvm'],
  347. '@anyvm @tag:no-such-tag allow': [],
  348. }
  349. for line in lines:
  350. with self.subTest(line):
  351. policy_line = qubespolicy.PolicyRule(line)
  352. self.assertCountEqual(list(policy_line.expand_target(system_info)),
  353. lines[line])
  354. def test_070_expand_override_target(self):
  355. line = qubespolicy.PolicyRule(
  356. '@anyvm @anyvm allow,target=test-vm2')
  357. self.assertEqual(
  358. line.expand_override_target(system_info, 'test-vm1'),
  359. 'test-vm2')
  360. def test_071_expand_override_target_dispvm(self):
  361. line = qubespolicy.PolicyRule(
  362. '@anyvm @anyvm allow,target=@dispvm')
  363. self.assertEqual(
  364. line.expand_override_target(system_info, 'test-vm1'),
  365. '@dispvm:default-dvm')
  366. def test_072_expand_override_target_dispvm_specific(self):
  367. line = qubespolicy.PolicyRule(
  368. '@anyvm @anyvm allow,target=@dispvm:test-vm3')
  369. self.assertEqual(
  370. line.expand_override_target(system_info, 'test-vm1'),
  371. '@dispvm:test-vm3')
  372. def test_073_expand_override_target_dispvm_none(self):
  373. line = qubespolicy.PolicyRule(
  374. '@anyvm @anyvm allow,target=@dispvm')
  375. self.assertEqual(
  376. line.expand_override_target(system_info, 'test-no-dvm'),
  377. None)
  378. def test_074_expand_override_target_dom0(self):
  379. line = qubespolicy.PolicyRule(
  380. '@anyvm @anyvm allow,target=dom0')
  381. self.assertEqual(
  382. line.expand_override_target(system_info, 'test-no-dvm'),
  383. 'dom0')
  384. def test_075_expand_override_target_dom0(self):
  385. line = qubespolicy.PolicyRule(
  386. '@anyvm @anyvm allow,target=@adminvm')
  387. self.assertEqual(
  388. line.expand_override_target(system_info, 'test-no-dvm'),
  389. '@adminvm')
  390. class TC_10_PolicyAction(qubes.tests.QubesTestCase):
  391. def test_000_init(self):
  392. rule = qubespolicy.PolicyRule('@anyvm @anyvm deny')
  393. with self.assertRaises(qubespolicy.AccessDenied):
  394. qubespolicy.PolicyAction('test.service', 'test-vm1', 'test-vm2',
  395. rule, 'test-vm2')
  396. def test_001_init(self):
  397. rule = qubespolicy.PolicyRule('@anyvm @anyvm ask')
  398. action = qubespolicy.PolicyAction('test.service', 'test-vm1',
  399. None, rule, 'test-vm2', ['test-vm2', 'test-vm3'])
  400. self.assertEqual(action.service, 'test.service')
  401. self.assertEqual(action.source, 'test-vm1')
  402. self.assertIsNone(action.target)
  403. self.assertEqual(action.original_target, 'test-vm2')
  404. self.assertEqual(action.targets_for_ask, ['test-vm2', 'test-vm3'])
  405. self.assertEqual(action.rule, rule)
  406. self.assertEqual(action.action, qubespolicy.Action.ask)
  407. def test_002_init_invalid(self):
  408. rule_ask = qubespolicy.PolicyRule('@anyvm @anyvm ask')
  409. rule_allow = qubespolicy.PolicyRule('@anyvm @anyvm allow')
  410. with self.assertRaises(AssertionError):
  411. qubespolicy.PolicyAction('test.service', 'test-vm1',
  412. None, rule_allow, 'test-vm2', None)
  413. with self.assertRaises(AssertionError):
  414. qubespolicy.PolicyAction('test.service', 'test-vm1',
  415. 'test-vm2', rule_allow, 'test-vm2', ['test-vm2', 'test-vm3'])
  416. with self.assertRaises(AssertionError):
  417. qubespolicy.PolicyAction('test.service', 'test-vm1',
  418. None, rule_ask, 'test-vm2', None)
  419. def test_003_init_default_target(self):
  420. rule_ask = qubespolicy.PolicyRule('@anyvm @anyvm ask')
  421. action = qubespolicy.PolicyAction('test.service', 'test-vm1',
  422. 'test-vm1', rule_ask, 'test-vm2', ['test-vm2'])
  423. self.assertIsNone(action.target)
  424. action = qubespolicy.PolicyAction('test.service', 'test-vm1',
  425. 'test-vm2', rule_ask, 'test-vm2', ['test-vm2'])
  426. self.assertEqual(action.target, 'test-vm2')
  427. def test_010_handle_user_response(self):
  428. rule = qubespolicy.PolicyRule('@anyvm @anyvm ask')
  429. action = qubespolicy.PolicyAction('test.service', 'test-vm1',
  430. None, rule, 'test-vm2', ['test-vm2', 'test-vm3'])
  431. action.handle_user_response(True, 'test-vm2')
  432. self.assertEqual(action.action, qubespolicy.Action.allow)
  433. self.assertEqual(action.target, 'test-vm2')
  434. def test_011_handle_user_response(self):
  435. rule = qubespolicy.PolicyRule('@anyvm @anyvm ask')
  436. action = qubespolicy.PolicyAction('test.service', 'test-vm1',
  437. None, rule, 'test-vm2', ['test-vm2', 'test-vm3'])
  438. with self.assertRaises(AssertionError):
  439. action.handle_user_response(True, 'test-no-dvm')
  440. def test_012_handle_user_response(self):
  441. rule = qubespolicy.PolicyRule('@anyvm @anyvm ask')
  442. action = qubespolicy.PolicyAction('test.service', 'test-vm1',
  443. None, rule, 'test-vm2', ['test-vm2', 'test-vm3'])
  444. with self.assertRaises(qubespolicy.AccessDenied):
  445. action.handle_user_response(False, None)
  446. self.assertEqual(action.action, qubespolicy.Action.deny)
  447. def test_013_handle_user_response_with_default_target(self):
  448. rule = qubespolicy.PolicyRule(
  449. '@anyvm @anyvm ask,default_target=test-vm2')
  450. action = qubespolicy.PolicyAction('test.service', 'test-vm1',
  451. None, rule, 'test-vm2', ['test-vm2', 'test-vm3'])
  452. action.handle_user_response(True, 'test-vm2')
  453. self.assertEqual(action.action, qubespolicy.Action.allow)
  454. self.assertEqual(action.target, 'test-vm2')
  455. @unittest.mock.patch('qubespolicy.qubesd_call')
  456. @unittest.mock.patch('subprocess.call')
  457. def test_020_execute(self, mock_subprocess, mock_qubesd_call):
  458. rule = qubespolicy.PolicyRule('@anyvm @anyvm allow')
  459. action = qubespolicy.PolicyAction('test.service', 'test-vm1',
  460. 'test-vm2', rule, 'test-vm2')
  461. action.execute('some-ident')
  462. self.assertEqual(mock_qubesd_call.mock_calls,
  463. [unittest.mock.call('test-vm2', 'admin.vm.Start')])
  464. self.assertEqual(mock_subprocess.mock_calls,
  465. [unittest.mock.call([qubespolicy.QREXEC_CLIENT, '-d', 'test-vm2',
  466. '-c', 'some-ident', 'DEFAULT:QUBESRPC test.service test-vm1'])])
  467. @unittest.mock.patch('qubespolicy.qubesd_call')
  468. @unittest.mock.patch('subprocess.call')
  469. def test_021_execute_dom0(self, mock_subprocess, mock_qubesd_call):
  470. rule = qubespolicy.PolicyRule('@anyvm dom0 allow')
  471. action = qubespolicy.PolicyAction('test.service', 'test-vm1',
  472. 'dom0', rule, 'dom0')
  473. action.execute('some-ident')
  474. self.assertEqual(mock_qubesd_call.mock_calls, [])
  475. self.assertEqual(mock_subprocess.mock_calls,
  476. [unittest.mock.call([qubespolicy.QREXEC_CLIENT, '-d', 'dom0',
  477. '-c', 'some-ident',
  478. 'QUBESRPC test.service test-vm1 name dom0'])])
  479. @unittest.mock.patch('qubespolicy.qubesd_call')
  480. @unittest.mock.patch('subprocess.call')
  481. def test_021_execute_dom0_keyword(self, mock_subprocess, mock_qubesd_call):
  482. rule = qubespolicy.PolicyRule('@anyvm dom0 allow')
  483. action = qubespolicy.PolicyAction('test.service', 'test-vm1',
  484. 'dom0', rule, '@adminvm')
  485. action.execute('some-ident')
  486. self.assertEqual(mock_qubesd_call.mock_calls, [])
  487. self.assertEqual(mock_subprocess.mock_calls,
  488. [unittest.mock.call([qubespolicy.QREXEC_CLIENT, '-d', 'dom0',
  489. '-c', 'some-ident',
  490. 'QUBESRPC test.service test-vm1 keyword adminvm'])])
  491. @unittest.mock.patch('qubespolicy.qubesd_call')
  492. @unittest.mock.patch('subprocess.call')
  493. def test_022_execute_dispvm(self, mock_subprocess, mock_qubesd_call):
  494. rule = qubespolicy.PolicyRule('@anyvm @dispvm:default-dvm allow')
  495. action = qubespolicy.PolicyAction('test.service', 'test-vm1',
  496. '@dispvm:default-dvm', rule, '@dispvm:default-dvm')
  497. mock_qubesd_call.side_effect = (lambda target, call:
  498. b'dispvm-name' if call == 'admin.vm.CreateDisposable' else
  499. unittest.mock.DEFAULT)
  500. action.execute('some-ident')
  501. self.assertEqual(mock_qubesd_call.mock_calls,
  502. [unittest.mock.call('default-dvm', 'admin.vm.CreateDisposable'),
  503. unittest.mock.call('dispvm-name', 'admin.vm.Start'),
  504. unittest.mock.call('dispvm-name', 'admin.vm.Kill')])
  505. self.assertEqual(mock_subprocess.mock_calls,
  506. [unittest.mock.call([qubespolicy.QREXEC_CLIENT, '-d', 'dispvm-name',
  507. '-c', 'some-ident', '-W',
  508. 'DEFAULT:QUBESRPC test.service test-vm1'])])
  509. @unittest.mock.patch('qubespolicy.qubesd_call')
  510. @unittest.mock.patch('subprocess.call')
  511. def test_023_execute_already_running(self, mock_subprocess,
  512. mock_qubesd_call):
  513. rule = qubespolicy.PolicyRule('@anyvm @anyvm allow')
  514. action = qubespolicy.PolicyAction('test.service', 'test-vm1',
  515. 'test-vm2', rule, 'test-vm2')
  516. mock_qubesd_call.side_effect = \
  517. qubespolicy.QubesMgmtException('QubesVMNotHaltedError')
  518. action.execute('some-ident')
  519. self.assertEqual(mock_qubesd_call.mock_calls,
  520. [unittest.mock.call('test-vm2', 'admin.vm.Start')])
  521. self.assertEqual(mock_subprocess.mock_calls,
  522. [unittest.mock.call([qubespolicy.QREXEC_CLIENT, '-d', 'test-vm2',
  523. '-c', 'some-ident', 'DEFAULT:QUBESRPC test.service test-vm1'])])
  524. @unittest.mock.patch('qubespolicy.qubesd_call')
  525. @unittest.mock.patch('subprocess.call')
  526. def test_024_execute_startup_error(self, mock_subprocess,
  527. mock_qubesd_call):
  528. rule = qubespolicy.PolicyRule('@anyvm @anyvm allow')
  529. action = qubespolicy.PolicyAction('test.service', 'test-vm1',
  530. 'test-vm2', rule, 'test-vm2')
  531. mock_qubesd_call.side_effect = \
  532. qubespolicy.QubesMgmtException('QubesVMError')
  533. with self.assertRaises(qubespolicy.QubesMgmtException):
  534. action.execute('some-ident')
  535. self.assertEqual(mock_qubesd_call.mock_calls,
  536. [unittest.mock.call('test-vm2', 'admin.vm.Start')])
  537. self.assertEqual(mock_subprocess.mock_calls, [])
  538. class TC_20_Policy(qubes.tests.QubesTestCase):
  539. def setUp(self):
  540. super(TC_20_Policy, self).setUp()
  541. if not os.path.exists(tmp_policy_dir):
  542. os.mkdir(tmp_policy_dir)
  543. def tearDown(self):
  544. shutil.rmtree(tmp_policy_dir)
  545. super(TC_20_Policy, self).tearDown()
  546. def test_000_load(self):
  547. with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f:
  548. f.write('test-vm1 test-vm2 allow\n')
  549. f.write('\n')
  550. f.write('# comment\n')
  551. f.write('test-vm2 test-vm3 ask\n')
  552. f.write(' # comment \n')
  553. f.write('@anyvm @anyvm ask\n')
  554. policy = qubespolicy.Policy('test.service', tmp_policy_dir)
  555. self.assertEqual(policy.service, 'test.service')
  556. self.assertEqual(len(policy.policy_rules), 3)
  557. self.assertEqual(policy.policy_rules[0].source, 'test-vm1')
  558. self.assertEqual(policy.policy_rules[0].target, 'test-vm2')
  559. self.assertEqual(policy.policy_rules[0].action,
  560. qubespolicy.Action.allow)
  561. def test_001_not_existent(self):
  562. with self.assertRaises(qubespolicy.AccessDenied):
  563. qubespolicy.Policy('no-such.service', tmp_policy_dir)
  564. def test_002_include(self):
  565. with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f:
  566. f.write('test-vm1 test-vm2 allow\n')
  567. f.write('@include:test.service2\n')
  568. f.write('@anyvm @anyvm deny\n')
  569. with open(os.path.join(tmp_policy_dir, 'test.service2'), 'w') as f:
  570. f.write('test-vm3 @default allow,target=test-vm2\n')
  571. policy = qubespolicy.Policy('test.service', tmp_policy_dir)
  572. self.assertEqual(policy.service, 'test.service')
  573. self.assertEqual(len(policy.policy_rules), 3)
  574. self.assertEqual(policy.policy_rules[0].source, 'test-vm1')
  575. self.assertEqual(policy.policy_rules[0].target, 'test-vm2')
  576. self.assertEqual(policy.policy_rules[0].action,
  577. qubespolicy.Action.allow)
  578. self.assertEqual(policy.policy_rules[0].filename,
  579. tmp_policy_dir + '/test.service')
  580. self.assertEqual(policy.policy_rules[0].lineno, 1)
  581. self.assertEqual(policy.policy_rules[1].source, 'test-vm3')
  582. self.assertEqual(policy.policy_rules[1].target, '@default')
  583. self.assertEqual(policy.policy_rules[1].action,
  584. qubespolicy.Action.allow)
  585. self.assertEqual(policy.policy_rules[1].filename,
  586. tmp_policy_dir + '/test.service2')
  587. self.assertEqual(policy.policy_rules[1].lineno, 1)
  588. self.assertEqual(policy.policy_rules[2].source, '@anyvm')
  589. self.assertEqual(policy.policy_rules[2].target, '@anyvm')
  590. self.assertEqual(policy.policy_rules[2].action,
  591. qubespolicy.Action.deny)
  592. self.assertEqual(policy.policy_rules[2].filename,
  593. tmp_policy_dir + '/test.service')
  594. self.assertEqual(policy.policy_rules[2].lineno, 3)
  595. def test_003_load_convert(self):
  596. with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f:
  597. f.write('test-vm2 test-vm3 ask\n')
  598. f.write(' # comment \n')
  599. f.write('$anyvm $dispvm ask,default_target=$dispvm\n')
  600. policy = qubespolicy.Policy('test.service', tmp_policy_dir)
  601. self.assertEqual(policy.service, 'test.service')
  602. self.assertEqual(len(policy.policy_rules), 2)
  603. self.assertEqual(policy.policy_rules[1].source, '@anyvm')
  604. self.assertEqual(policy.policy_rules[1].target, '@dispvm')
  605. self.assertEqual(policy.policy_rules[1].action,
  606. qubespolicy.Action.ask)
  607. self.assertEqual(policy.policy_rules[1].default_target,
  608. '@dispvm')
  609. def test_010_find_rule(self):
  610. with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f:
  611. f.write('test-vm1 test-vm2 allow\n')
  612. f.write('test-vm1 @anyvm ask\n')
  613. f.write('test-vm2 @tag:tag1 deny\n')
  614. f.write('test-vm2 @tag:tag2 allow\n')
  615. f.write('test-vm2 @dispvm:@tag:tag3 allow\n')
  616. f.write('test-vm2 @dispvm:@tag:tag2 allow\n')
  617. f.write('test-vm2 @dispvm:default-dvm allow\n')
  618. f.write('@type:AppVM @default allow,target=test-vm3\n')
  619. f.write('@tag:tag1 @type:AppVM allow\n')
  620. policy = qubespolicy.Policy('test.service', tmp_policy_dir)
  621. self.assertEqual(policy.find_matching_rule(
  622. system_info, 'test-vm1', 'test-vm2'), policy.policy_rules[0])
  623. self.assertEqual(policy.find_matching_rule(
  624. system_info, 'test-vm1', 'test-vm3'), policy.policy_rules[1])
  625. self.assertEqual(policy.find_matching_rule(
  626. system_info, 'test-vm2', 'test-vm2'), policy.policy_rules[3])
  627. self.assertEqual(policy.find_matching_rule(
  628. system_info, 'test-vm2', 'test-no-dvm'), policy.policy_rules[2])
  629. # @anyvm matches @default too
  630. self.assertEqual(policy.find_matching_rule(
  631. system_info, 'test-vm1', '@default'), policy.policy_rules[1])
  632. self.assertEqual(policy.find_matching_rule(
  633. system_info, 'test-vm2', '@default'), policy.policy_rules[7])
  634. self.assertEqual(policy.find_matching_rule(
  635. system_info, 'test-no-dvm', 'test-vm3'), policy.policy_rules[8])
  636. self.assertEqual(policy.find_matching_rule(
  637. system_info, 'test-vm2', '@dispvm:test-vm3'),
  638. policy.policy_rules[4])
  639. self.assertEqual(policy.find_matching_rule(
  640. system_info, 'test-vm2', '@dispvm'),
  641. policy.policy_rules[6])
  642. with self.assertRaises(qubespolicy.AccessDenied):
  643. policy.find_matching_rule(
  644. system_info, 'test-no-dvm', 'test-standalone')
  645. with self.assertRaises(qubespolicy.AccessDenied):
  646. policy.find_matching_rule(system_info, 'test-no-dvm', '@dispvm')
  647. with self.assertRaises(qubespolicy.AccessDenied):
  648. policy.find_matching_rule(
  649. system_info, 'test-standalone', '@default')
  650. def test_020_collect_targets_for_ask(self):
  651. with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f:
  652. f.write('test-vm1 test-vm2 allow\n')
  653. f.write('test-vm1 @anyvm ask\n')
  654. f.write('test-vm2 @tag:tag1 deny\n')
  655. f.write('test-vm2 @tag:tag2 allow\n')
  656. f.write('test-no-dvm @type:AppVM deny\n')
  657. f.write('@type:AppVM @default allow,target=test-vm3\n')
  658. f.write('@tag:tag1 @type:AppVM allow\n')
  659. f.write('test-no-dvm @dispvm allow\n')
  660. f.write('test-standalone @dispvm allow\n')
  661. f.write('test-standalone @adminvm allow\n')
  662. policy = qubespolicy.Policy('test.service', tmp_policy_dir)
  663. self.assertCountEqual(policy.collect_targets_for_ask(system_info,
  664. 'test-vm1'), ['test-vm1', 'test-vm2', 'test-vm3',
  665. '@dispvm:test-vm3',
  666. 'default-dvm', '@dispvm:default-dvm', 'test-invalid-dvm',
  667. 'test-no-dvm', 'test-template', 'test-standalone'])
  668. self.assertCountEqual(policy.collect_targets_for_ask(system_info,
  669. 'test-vm2'), ['test-vm2', 'test-vm3'])
  670. self.assertCountEqual(policy.collect_targets_for_ask(system_info,
  671. 'test-vm3'), ['test-vm3'])
  672. self.assertCountEqual(policy.collect_targets_for_ask(system_info,
  673. 'test-standalone'), ['test-vm1', 'test-vm2', 'test-vm3',
  674. 'default-dvm', 'test-no-dvm', 'test-invalid-dvm',
  675. '@dispvm:default-dvm', 'dom0'])
  676. self.assertCountEqual(policy.collect_targets_for_ask(system_info,
  677. 'test-no-dvm'), [])
  678. def test_030_eval_simple(self):
  679. with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f:
  680. f.write('test-vm1 test-vm2 allow\n')
  681. policy = qubespolicy.Policy('test.service', tmp_policy_dir)
  682. action = policy.evaluate(system_info, 'test-vm1', 'test-vm2')
  683. self.assertEqual(action.rule, policy.policy_rules[0])
  684. self.assertEqual(action.action, qubespolicy.Action.allow)
  685. self.assertEqual(action.target, 'test-vm2')
  686. self.assertEqual(action.original_target, 'test-vm2')
  687. self.assertEqual(action.service, 'test.service')
  688. self.assertIsNone(action.targets_for_ask)
  689. with self.assertRaises(qubespolicy.AccessDenied):
  690. policy.evaluate(system_info, 'test-vm2', '@default')
  691. def test_031_eval_default(self):
  692. with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f:
  693. f.write('test-vm1 test-vm2 allow\n')
  694. f.write('test-vm1 @default allow,target=test-vm2\n')
  695. f.write('@tag:tag1 test-vm2 ask\n')
  696. f.write('@tag:tag2 @anyvm allow\n')
  697. f.write('test-vm3 @anyvm deny\n')
  698. policy = qubespolicy.Policy('test.service', tmp_policy_dir)
  699. action = policy.evaluate(system_info, 'test-vm1', '@default')
  700. self.assertEqual(action.rule, policy.policy_rules[1])
  701. self.assertEqual(action.action, qubespolicy.Action.allow)
  702. self.assertEqual(action.target, 'test-vm2')
  703. self.assertEqual(action.original_target, '@default')
  704. self.assertEqual(action.service, 'test.service')
  705. self.assertIsNone(action.targets_for_ask)
  706. with self.assertRaises(qubespolicy.AccessDenied):
  707. # action allow should hit, but no target specified (either by
  708. # caller or policy)
  709. policy.evaluate(system_info, 'test-standalone', '@default')
  710. def test_032_eval_ask(self):
  711. with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f:
  712. f.write('test-vm1 test-vm2 allow\n')
  713. f.write('test-vm1 @default allow,target=test-vm2\n')
  714. f.write('@tag:tag1 test-vm2 ask\n')
  715. f.write('@tag:tag1 test-vm3 ask,default_target=test-vm3\n')
  716. f.write('@tag:tag2 @anyvm allow\n')
  717. f.write('test-vm3 @anyvm deny\n')
  718. policy = qubespolicy.Policy('test.service', tmp_policy_dir)
  719. action = policy.evaluate(system_info, 'test-standalone', 'test-vm2')
  720. self.assertEqual(action.rule, policy.policy_rules[2])
  721. self.assertEqual(action.action, qubespolicy.Action.ask)
  722. self.assertIsNone(action.target)
  723. self.assertEqual(action.original_target, 'test-vm2')
  724. self.assertEqual(action.service, 'test.service')
  725. self.assertCountEqual(action.targets_for_ask,
  726. ['test-vm1', 'test-vm2', 'test-vm3', '@dispvm:test-vm3',
  727. 'default-dvm', '@dispvm:default-dvm', 'test-invalid-dvm',
  728. 'test-no-dvm', 'test-template', 'test-standalone'])
  729. def test_033_eval_ask(self):
  730. with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f:
  731. f.write('test-vm1 test-vm2 allow\n')
  732. f.write('test-vm1 @default allow,target=test-vm2\n')
  733. f.write('@tag:tag1 test-vm2 ask\n')
  734. f.write('@tag:tag1 test-vm3 ask,default_target=test-vm3\n')
  735. f.write('@tag:tag2 @anyvm allow\n')
  736. f.write('test-vm3 @anyvm deny\n')
  737. policy = qubespolicy.Policy('test.service', tmp_policy_dir)
  738. action = policy.evaluate(system_info, 'test-standalone', 'test-vm3')
  739. self.assertEqual(action.rule, policy.policy_rules[3])
  740. self.assertEqual(action.action, qubespolicy.Action.ask)
  741. self.assertEqual(action.target, 'test-vm3')
  742. self.assertEqual(action.original_target, 'test-vm3')
  743. self.assertEqual(action.service, 'test.service')
  744. self.assertCountEqual(action.targets_for_ask,
  745. ['test-vm1', 'test-vm2', 'test-vm3', '@dispvm:test-vm3',
  746. 'default-dvm', '@dispvm:default-dvm', 'test-invalid-dvm',
  747. 'test-no-dvm', 'test-template', 'test-standalone'])
  748. def test_034_eval_resolve_dispvm(self):
  749. with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f:
  750. f.write('test-vm3 @dispvm allow\n')
  751. policy = qubespolicy.Policy('test.service', tmp_policy_dir)
  752. action = policy.evaluate(system_info, 'test-vm3', '@dispvm')
  753. self.assertEqual(action.rule, policy.policy_rules[0])
  754. self.assertEqual(action.action, qubespolicy.Action.allow)
  755. self.assertEqual(action.target, '@dispvm:default-dvm')
  756. self.assertEqual(action.original_target, '@dispvm')
  757. self.assertEqual(action.service, 'test.service')
  758. self.assertIsNone(action.targets_for_ask)
  759. def test_035_eval_resolve_dispvm_fail(self):
  760. with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f:
  761. f.write('test-no-dvm @dispvm allow\n')
  762. policy = qubespolicy.Policy('test.service', tmp_policy_dir)
  763. with self.assertRaises(qubespolicy.AccessDenied):
  764. policy.evaluate(system_info, 'test-no-dvm', '@dispvm')
  765. def test_036_eval_invalid_override_target(self):
  766. with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f:
  767. f.write('test-vm3 @anyvm allow,target=no-such-vm\n')
  768. policy = qubespolicy.Policy('test.service', tmp_policy_dir)
  769. with self.assertRaises(qubespolicy.AccessDenied):
  770. policy.evaluate(system_info, 'test-vm3', '@default')
  771. def test_037_eval_ask_no_targets(self):
  772. with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f:
  773. f.write('test-vm3 @default ask\n')
  774. policy = qubespolicy.Policy('test.service', tmp_policy_dir)
  775. with self.assertRaises(qubespolicy.AccessDenied):
  776. policy.evaluate(system_info, 'test-vm3', '@default')
  777. class TC_30_Misc(qubes.tests.QubesTestCase):
  778. @unittest.mock.patch('socket.socket')
  779. def test_000_qubesd_call(self, mock_socket):
  780. mock_config = {
  781. 'return_value.makefile.return_value.read.return_value': b'0\x00data'
  782. }
  783. mock_socket.configure_mock(**mock_config)
  784. result = qubespolicy.qubesd_call('test', 'internal.method')
  785. self.assertEqual(result, b'data')
  786. self.assertEqual(mock_socket.mock_calls, [
  787. unittest.mock.call(socket.AF_UNIX, socket.SOCK_STREAM),
  788. unittest.mock.call().connect(qubespolicy.QUBESD_INTERNAL_SOCK),
  789. unittest.mock.call().sendall(b'dom0'),
  790. unittest.mock.call().sendall(b'\x00'),
  791. unittest.mock.call().sendall(b'internal.method'),
  792. unittest.mock.call().sendall(b'\x00'),
  793. unittest.mock.call().sendall(b'test'),
  794. unittest.mock.call().sendall(b'\x00'),
  795. unittest.mock.call().sendall(b'\x00'),
  796. unittest.mock.call().shutdown(socket.SHUT_WR),
  797. unittest.mock.call().makefile('rb'),
  798. unittest.mock.call().makefile().read(),
  799. ])
  800. @unittest.mock.patch('socket.socket')
  801. def test_001_qubesd_call_arg_payload(self, mock_socket):
  802. mock_config = {
  803. 'return_value.makefile.return_value.read.return_value': b'0\x00data'
  804. }
  805. mock_socket.configure_mock(**mock_config)
  806. result = qubespolicy.qubesd_call('test', 'internal.method', 'arg',
  807. b'payload')
  808. self.assertEqual(result, b'data')
  809. self.assertEqual(mock_socket.mock_calls, [
  810. unittest.mock.call(socket.AF_UNIX, socket.SOCK_STREAM),
  811. unittest.mock.call().connect(qubespolicy.QUBESD_INTERNAL_SOCK),
  812. unittest.mock.call().sendall(b'dom0'),
  813. unittest.mock.call().sendall(b'\x00'),
  814. unittest.mock.call().sendall(b'internal.method'),
  815. unittest.mock.call().sendall(b'\x00'),
  816. unittest.mock.call().sendall(b'test'),
  817. unittest.mock.call().sendall(b'\x00'),
  818. unittest.mock.call().sendall(b'arg'),
  819. unittest.mock.call().sendall(b'\x00'),
  820. unittest.mock.call().sendall(b'payload'),
  821. unittest.mock.call().shutdown(socket.SHUT_WR),
  822. unittest.mock.call().makefile('rb'),
  823. unittest.mock.call().makefile().read(),
  824. ])
  825. @unittest.mock.patch('socket.socket')
  826. def test_002_qubesd_call_exception(self, mock_socket):
  827. mock_config = {
  828. 'return_value.makefile.return_value.read.return_value':
  829. b'2\x00SomeError\x00traceback\x00message\x00'
  830. }
  831. mock_socket.configure_mock(**mock_config)
  832. with self.assertRaises(qubespolicy.QubesMgmtException) as e:
  833. qubespolicy.qubesd_call('test', 'internal.method')
  834. self.assertEqual(e.exception.exc_type, 'SomeError')
  835. self.assertEqual(mock_socket.mock_calls, [
  836. unittest.mock.call(socket.AF_UNIX, socket.SOCK_STREAM),
  837. unittest.mock.call().connect(qubespolicy.QUBESD_INTERNAL_SOCK),
  838. unittest.mock.call().sendall(b'dom0'),
  839. unittest.mock.call().sendall(b'\x00'),
  840. unittest.mock.call().sendall(b'internal.method'),
  841. unittest.mock.call().sendall(b'\x00'),
  842. unittest.mock.call().sendall(b'test'),
  843. unittest.mock.call().sendall(b'\x00'),
  844. unittest.mock.call().sendall(b'\x00'),
  845. unittest.mock.call().shutdown(socket.SHUT_WR),
  846. unittest.mock.call().makefile('rb'),
  847. unittest.mock.call().makefile().read(),
  848. ])