firewall.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467
  1. # -*- encoding: utf8 -*-
  2. #
  3. # The Qubes OS Project, http://www.qubes-os.org
  4. #
  5. # Copyright (C) 2017 Marek Marczykowski-Górecki
  6. # <marmarek@invisiblethingslab.com>
  7. #
  8. # This program is free software; you can redistribute it and/or modify
  9. # it under the terms of the GNU Lesser General Public License as published by
  10. # the Free Software Foundation; either version 2.1 of the License, or
  11. # (at your option) any later version.
  12. #
  13. # This program is distributed in the hope that it will be useful,
  14. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  15. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  16. # GNU Lesser General Public License for more details.
  17. #
  18. # You should have received a copy of the GNU Lesser General Public License along
  19. # with this program; if not, see <http://www.gnu.org/licenses/>.
  20. '''Tests for firewall API. This is mostly copy from core-admin'''
  21. import datetime
  22. import unittest
  23. import qubesadmin.firewall
  24. import qubesadmin.tests
  25. class TestOption(qubesadmin.firewall.RuleChoice):
  26. opt1 = 'opt1'
  27. opt2 = 'opt2'
  28. another = 'another'
  29. # noinspection PyPep8Naming
  30. class TC_00_RuleChoice(qubesadmin.tests.QubesTestCase):
  31. def test_000_accept_allowed(self):
  32. with self.assertNotRaises(ValueError):
  33. TestOption('opt1')
  34. TestOption('opt2')
  35. TestOption('another')
  36. def test_001_value_list(self):
  37. instance = TestOption('opt1')
  38. self.assertEqual(
  39. set(instance.allowed_values), {'opt1', 'opt2', 'another'})
  40. def test_010_reject_others(self):
  41. self.assertRaises(ValueError, lambda: TestOption('invalid'))
  42. class TC_01_Action(qubesadmin.tests.QubesTestCase):
  43. def test_000_allowed_values(self):
  44. with self.assertNotRaises(ValueError):
  45. instance = qubesadmin.firewall.Action('accept')
  46. self.assertEqual(
  47. set(instance.allowed_values), {'accept', 'drop'})
  48. def test_001_rule(self):
  49. instance = qubesadmin.firewall.Action('accept')
  50. self.assertEqual(instance.rule, 'action=accept')
  51. # noinspection PyPep8Naming
  52. class TC_02_Proto(qubesadmin.tests.QubesTestCase):
  53. def test_000_allowed_values(self):
  54. with self.assertNotRaises(ValueError):
  55. instance = qubesadmin.firewall.Proto('tcp')
  56. self.assertEqual(
  57. set(instance.allowed_values), {'tcp', 'udp', 'icmp'})
  58. def test_001_rule(self):
  59. instance = qubesadmin.firewall.Proto('tcp')
  60. self.assertEqual(instance.rule, 'proto=tcp')
  61. # noinspection PyPep8Naming
  62. class TC_02_DstHost(qubesadmin.tests.QubesTestCase):
  63. def test_000_hostname(self):
  64. with self.assertNotRaises(ValueError):
  65. instance = qubesadmin.firewall.DstHost('qubes-os.org')
  66. self.assertEqual(instance.type, 'dsthost')
  67. def test_001_ipv4(self):
  68. with self.assertNotRaises(ValueError):
  69. instance = qubesadmin.firewall.DstHost('127.0.0.1')
  70. self.assertEqual(instance.type, 'dst4')
  71. self.assertEqual(instance.prefixlen, 32)
  72. self.assertEqual(str(instance), '127.0.0.1/32')
  73. self.assertEqual(instance.rule, 'dst4=127.0.0.1/32')
  74. def test_002_ipv4_prefixlen(self):
  75. with self.assertNotRaises(ValueError):
  76. instance = qubesadmin.firewall.DstHost('127.0.0.0', 8)
  77. self.assertEqual(instance.type, 'dst4')
  78. self.assertEqual(instance.prefixlen, 8)
  79. self.assertEqual(str(instance), '127.0.0.0/8')
  80. self.assertEqual(instance.rule, 'dst4=127.0.0.0/8')
  81. def test_003_ipv4_parse_prefixlen(self):
  82. with self.assertNotRaises(ValueError):
  83. instance = qubesadmin.firewall.DstHost('127.0.0.0/8')
  84. self.assertEqual(instance.type, 'dst4')
  85. self.assertEqual(instance.prefixlen, 8)
  86. self.assertEqual(str(instance), '127.0.0.0/8')
  87. self.assertEqual(instance.rule, 'dst4=127.0.0.0/8')
  88. def test_004_ipv4_invalid_prefix(self):
  89. with self.assertRaises(ValueError):
  90. qubesadmin.firewall.DstHost('127.0.0.0/33')
  91. with self.assertRaises(ValueError):
  92. qubesadmin.firewall.DstHost('127.0.0.0', 33)
  93. with self.assertRaises(ValueError):
  94. qubesadmin.firewall.DstHost('127.0.0.0/-1')
  95. def test_005_ipv4_reject_shortened(self):
  96. # not strictly required, but ppl are used to it
  97. with self.assertRaises(ValueError):
  98. qubesadmin.firewall.DstHost('127/8')
  99. def test_006_ipv4_invalid_addr(self):
  100. with self.assertRaises(ValueError):
  101. qubesadmin.firewall.DstHost('137.327.0.0/16')
  102. with self.assertRaises(ValueError):
  103. qubesadmin.firewall.DstHost('1.2.3.4.5/32')
  104. @unittest.expectedFailure
  105. def test_007_ipv4_invalid_network(self):
  106. with self.assertRaises(ValueError):
  107. qubesadmin.firewall.DstHost('127.0.0.1/32')
  108. def test_010_ipv6(self):
  109. with self.assertNotRaises(ValueError):
  110. instance = qubesadmin.firewall.DstHost('2001:abcd:efab::3')
  111. self.assertEqual(instance.type, 'dst6')
  112. self.assertEqual(instance.prefixlen, 128)
  113. self.assertEqual(str(instance), '2001:abcd:efab::3/128')
  114. self.assertEqual(instance.rule, 'dst6=2001:abcd:efab::3/128')
  115. def test_011_ipv6_prefixlen(self):
  116. with self.assertNotRaises(ValueError):
  117. instance = qubesadmin.firewall.DstHost('2001:abcd:efab::', 64)
  118. self.assertEqual(instance.type, 'dst6')
  119. self.assertEqual(instance.prefixlen, 64)
  120. self.assertEqual(str(instance), '2001:abcd:efab::/64')
  121. self.assertEqual(instance.rule, 'dst6=2001:abcd:efab::/64')
  122. def test_012_ipv6_parse_prefixlen(self):
  123. with self.assertNotRaises(ValueError):
  124. instance = qubesadmin.firewall.DstHost('2001:abcd:efab::/64')
  125. self.assertEqual(instance.type, 'dst6')
  126. self.assertEqual(instance.prefixlen, 64)
  127. self.assertEqual(str(instance), '2001:abcd:efab::/64')
  128. self.assertEqual(instance.rule, 'dst6=2001:abcd:efab::/64')
  129. def test_013_ipv6_invalid_prefix(self):
  130. with self.assertRaises(ValueError):
  131. qubesadmin.firewall.DstHost('2001:abcd:efab::3/129')
  132. with self.assertRaises(ValueError):
  133. qubesadmin.firewall.DstHost('2001:abcd:efab::3', 129)
  134. with self.assertRaises(ValueError):
  135. qubesadmin.firewall.DstHost('2001:abcd:efab::3/-1')
  136. def test_014_ipv6_invalid_addr(self):
  137. with self.assertRaises(ValueError):
  138. qubesadmin.firewall.DstHost('2001:abcd:efab0123::3/128')
  139. with self.assertRaises(ValueError):
  140. qubesadmin.firewall.DstHost('2001:abcd:efab:3/128')
  141. with self.assertRaises(ValueError):
  142. qubesadmin.firewall.DstHost('2001:abcd:efab:a:a:a:a:a:a:3/128')
  143. with self.assertRaises(ValueError):
  144. qubesadmin.firewall.DstHost('2001:abcd:efgh::3/128')
  145. @unittest.expectedFailure
  146. def test_015_ipv6_invalid_network(self):
  147. with self.assertRaises(ValueError):
  148. qubesadmin.firewall.DstHost('2001:abcd:efab::3/64')
  149. @unittest.expectedFailure
  150. def test_020_invalid_hostname(self):
  151. with self.assertRaises(ValueError):
  152. qubesadmin.firewall.DstHost('www qubes-os.org')
  153. with self.assertRaises(ValueError):
  154. qubesadmin.firewall.DstHost('https://qubes-os.org')
  155. class TC_03_DstPorts(qubesadmin.tests.QubesTestCase):
  156. def test_000_single_str(self):
  157. with self.assertNotRaises(ValueError):
  158. instance = qubesadmin.firewall.DstPorts('80')
  159. self.assertEqual(str(instance), '80')
  160. self.assertEqual(instance.range, [80, 80])
  161. self.assertEqual(instance.rule, 'dstports=80-80')
  162. def test_001_single_int(self):
  163. with self.assertNotRaises(ValueError):
  164. instance = qubesadmin.firewall.DstPorts(80)
  165. self.assertEqual(str(instance), '80')
  166. self.assertEqual(instance.range, [80, 80])
  167. self.assertEqual(instance.rule, 'dstports=80-80')
  168. def test_002_range(self):
  169. with self.assertNotRaises(ValueError):
  170. instance = qubesadmin.firewall.DstPorts('80-90')
  171. self.assertEqual(str(instance), '80-90')
  172. self.assertEqual(instance.range, [80, 90])
  173. self.assertEqual(instance.rule, 'dstports=80-90')
  174. def test_003_invalid(self):
  175. with self.assertRaises(ValueError):
  176. qubesadmin.firewall.DstPorts('80-90-100')
  177. with self.assertRaises(ValueError):
  178. qubesadmin.firewall.DstPorts('abcdef')
  179. with self.assertRaises(ValueError):
  180. qubesadmin.firewall.DstPorts('80 90')
  181. with self.assertRaises(ValueError):
  182. qubesadmin.firewall.DstPorts('')
  183. def test_004_reversed_range(self):
  184. with self.assertRaises(ValueError):
  185. qubesadmin.firewall.DstPorts('100-20')
  186. def test_005_out_of_range(self):
  187. with self.assertRaises(ValueError):
  188. qubesadmin.firewall.DstPorts('1000000000000')
  189. with self.assertRaises(ValueError):
  190. qubesadmin.firewall.DstPorts(1000000000000)
  191. with self.assertRaises(ValueError):
  192. qubesadmin.firewall.DstPorts('1-1000000000000')
  193. class TC_04_IcmpType(qubesadmin.tests.QubesTestCase):
  194. def test_000_number(self):
  195. with self.assertNotRaises(ValueError):
  196. instance = qubesadmin.firewall.IcmpType(8)
  197. self.assertEqual(str(instance), '8')
  198. self.assertEqual(instance.rule, 'icmptype=8')
  199. def test_001_str(self):
  200. with self.assertNotRaises(ValueError):
  201. instance = qubesadmin.firewall.IcmpType('8')
  202. self.assertEqual(str(instance), '8')
  203. self.assertEqual(instance.rule, 'icmptype=8')
  204. def test_002_invalid(self):
  205. with self.assertRaises(ValueError):
  206. qubesadmin.firewall.IcmpType(600)
  207. with self.assertRaises(ValueError):
  208. qubesadmin.firewall.IcmpType(-1)
  209. with self.assertRaises(ValueError):
  210. qubesadmin.firewall.IcmpType('abcde')
  211. with self.assertRaises(ValueError):
  212. qubesadmin.firewall.IcmpType('')
  213. class TC_05_SpecialTarget(qubesadmin.tests.QubesTestCase):
  214. def test_000_allowed_values(self):
  215. with self.assertNotRaises(ValueError):
  216. instance = qubesadmin.firewall.SpecialTarget('dns')
  217. self.assertEqual(
  218. set(instance.allowed_values), {'dns'})
  219. def test_001_rule(self):
  220. instance = qubesadmin.firewall.SpecialTarget('dns')
  221. self.assertEqual(instance.rule, 'specialtarget=dns')
  222. class TC_06_Expire(qubesadmin.tests.QubesTestCase):
  223. def test_000_number(self):
  224. with self.assertNotRaises(ValueError):
  225. instance = qubesadmin.firewall.Expire(1463292452)
  226. self.assertEqual(str(instance), '1463292452')
  227. self.assertEqual(instance.datetime,
  228. datetime.datetime(2016, 5, 15, 6, 7, 32))
  229. self.assertEqual(instance.rule, 'expire=1463292452')
  230. def test_001_str(self):
  231. with self.assertNotRaises(ValueError):
  232. instance = qubesadmin.firewall.Expire('1463292452')
  233. self.assertEqual(str(instance), '1463292452')
  234. self.assertEqual(instance.datetime,
  235. datetime.datetime(2016, 5, 15, 6, 7, 32))
  236. self.assertEqual(instance.rule, 'expire=1463292452')
  237. def test_002_invalid(self):
  238. with self.assertRaises(ValueError):
  239. qubesadmin.firewall.Expire('abcdef')
  240. with self.assertRaises(ValueError):
  241. qubesadmin.firewall.Expire('')
  242. def test_003_expired(self):
  243. with self.assertNotRaises(ValueError):
  244. instance = qubesadmin.firewall.Expire('1463292452')
  245. self.assertTrue(instance.expired)
  246. with self.assertNotRaises(ValueError):
  247. instance = qubesadmin.firewall.Expire('1583292452')
  248. self.assertFalse(instance.expired)
  249. class TC_07_Comment(qubesadmin.tests.QubesTestCase):
  250. def test_000_str(self):
  251. with self.assertNotRaises(ValueError):
  252. instance = qubesadmin.firewall.Comment('Some comment')
  253. self.assertEqual(str(instance), 'Some comment')
  254. self.assertEqual(instance.rule, 'comment=Some comment')
  255. class TC_10_Rule(qubesadmin.tests.QubesTestCase):
  256. def test_000_simple(self):
  257. with self.assertNotRaises(ValueError):
  258. rule = qubesadmin.firewall.Rule(None, action='accept', proto='icmp')
  259. self.assertEqual(rule.rule, 'action=accept proto=icmp')
  260. self.assertIsNone(rule.dsthost)
  261. self.assertIsNone(rule.dstports)
  262. self.assertIsNone(rule.icmptype)
  263. self.assertIsNone(rule.comment)
  264. self.assertIsNone(rule.expire)
  265. self.assertEqual(str(rule.action), 'accept')
  266. self.assertEqual(str(rule.proto), 'icmp')
  267. def test_001_expire(self):
  268. with self.assertNotRaises(ValueError):
  269. rule = qubesadmin.firewall.Rule(None, action='accept', proto='icmp',
  270. expire='1463292452')
  271. self.assertEqual(rule.rule,
  272. 'action=accept proto=icmp expire=1463292452')
  273. def test_002_dstports(self):
  274. with self.assertNotRaises(ValueError):
  275. rule = qubesadmin.firewall.Rule(None, action='accept', proto='tcp',
  276. dstports=80)
  277. self.assertEqual(str(rule.dstports), '80')
  278. def test_003_reject_invalid(self):
  279. with self.assertRaises((ValueError, AssertionError)):
  280. # missing action
  281. qubesadmin.firewall.Rule(None, proto='icmp')
  282. with self.assertRaises(ValueError):
  283. # not proto=tcp or proto=udp for dstports
  284. qubesadmin.firewall.Rule(None, action='accept', proto='icmp',
  285. dstports=80)
  286. with self.assertRaises(ValueError):
  287. # not proto=tcp or proto=udp for dstports
  288. qubesadmin.firewall.Rule(None, action='accept', dstports=80)
  289. with self.assertRaises(ValueError):
  290. # not proto=icmp for icmptype
  291. qubesadmin.firewall.Rule(None, action='accept', proto='tcp',
  292. icmptype=8)
  293. with self.assertRaises(ValueError):
  294. # not proto=icmp for icmptype
  295. qubesadmin.firewall.Rule(None, action='accept', icmptype=8)
  296. def test_004_proto_change(self):
  297. rule = qubesadmin.firewall.Rule(None, action='accept', proto='tcp')
  298. with self.assertNotRaises(ValueError):
  299. rule.proto = 'udp'
  300. self.assertEqual(rule.rule, 'action=accept proto=udp')
  301. rule = qubesadmin.firewall.Rule(None, action='accept', proto='tcp',
  302. dstports=80)
  303. with self.assertNotRaises(ValueError):
  304. rule.proto = 'udp'
  305. self.assertEqual(rule.rule, 'action=accept proto=udp dstports=80-80')
  306. rule = qubesadmin.firewall.Rule(None, action='accept')
  307. with self.assertNotRaises(ValueError):
  308. rule.proto = 'udp'
  309. self.assertEqual(rule.rule, 'action=accept proto=udp')
  310. with self.assertNotRaises(ValueError):
  311. rule.dstports = 80
  312. self.assertEqual(rule.rule, 'action=accept proto=udp dstports=80-80')
  313. with self.assertNotRaises(ValueError):
  314. rule.proto = 'icmp'
  315. self.assertEqual(rule.rule, 'action=accept proto=icmp')
  316. self.assertIsNone(rule.dstports)
  317. rule.icmptype = 8
  318. self.assertEqual(rule.rule, 'action=accept proto=icmp icmptype=8')
  319. with self.assertNotRaises(ValueError):
  320. rule.proto = None
  321. self.assertEqual(rule.rule, 'action=accept')
  322. self.assertIsNone(rule.dstports)
  323. def test_005_parse_str(self):
  324. rule_txt = \
  325. 'action=accept dst4=192.168.0.0/24 proto=tcp dstports=443'
  326. with self.assertNotRaises(ValueError):
  327. rule = qubesadmin.firewall.Rule(rule_txt)
  328. self.assertEqual(rule.dsthost, '192.168.0.0/24')
  329. self.assertEqual(rule.proto, 'tcp')
  330. self.assertEqual(rule.dstports, '443')
  331. self.assertIsNone(rule.expire)
  332. self.assertIsNone(rule.comment)
  333. def test_006_parse_str_comment(self):
  334. rule_txt = \
  335. 'action=accept dsthost=qubes-os.org comment=Some comment'
  336. with self.assertNotRaises(ValueError):
  337. rule = qubesadmin.firewall.Rule(rule_txt)
  338. self.assertEqual(rule.dsthost, 'qubes-os.org')
  339. self.assertIsNone(rule.proto)
  340. self.assertIsNone(rule.dstports)
  341. self.assertIsNone(rule.expire)
  342. self.assertEqual(rule.comment, 'Some comment')
  343. class TC_11_Firewall(qubesadmin.tests.QubesTestCase):
  344. def setUp(self):
  345. super(TC_11_Firewall, self).setUp()
  346. self.app.expected_calls[('dom0', 'mgmt.vm.List', None, None)] = \
  347. b'0\0test-vm class=AppVM state=Halted\n'
  348. self.vm = self.app.domains['test-vm']
  349. def test_000_policy_get(self):
  350. self.app.expected_calls[('test-vm', 'mgmt.vm.firewall.GetPolicy',
  351. None, None)] = b'0\0accept'
  352. policy = self.vm.firewall.policy
  353. self.assertEqual(policy, 'accept')
  354. self.assertEqual(policy, qubesadmin.firewall.Action('accept'))
  355. self.assertAllCalled()
  356. def test_001_policy_set(self):
  357. self.app.expected_calls[('test-vm', 'mgmt.vm.firewall.SetPolicy',
  358. None, b'drop')] = b'0\0'
  359. self.vm.firewall.policy = 'drop'
  360. self.assertAllCalled()
  361. def test_002_policy_set2(self):
  362. self.app.expected_calls[('test-vm', 'mgmt.vm.firewall.SetPolicy',
  363. None, b'drop')] = b'0\0'
  364. self.vm.firewall.policy = qubesadmin.firewall.Action('drop')
  365. self.assertAllCalled()
  366. def test_010_load_rules(self):
  367. self.app.expected_calls[('test-vm', 'mgmt.vm.firewall.Get',
  368. None, None)] = \
  369. b'0\0action=accept dsthost=qubes-os.org\n' \
  370. b'action=drop proto=icmp\n'
  371. rules = self.vm.firewall.rules
  372. self.assertListEqual(rules, [
  373. qubesadmin.firewall.Rule('action=accept dsthost=qubes-os.org'),
  374. qubesadmin.firewall.Rule('action=drop proto=icmp'),
  375. ])
  376. # check caching
  377. del self.app.expected_calls[('test-vm', 'mgmt.vm.firewall.Get',
  378. None, None)]
  379. rules2 = self.vm.firewall.rules
  380. self.assertEqual(rules, rules2)
  381. # then force reload
  382. self.app.expected_calls[('test-vm', 'mgmt.vm.firewall.Get',
  383. None, None)] = \
  384. b'0\0action=accept dsthost=qubes-os.org proto=tcp dstports=443\n'
  385. self.vm.firewall.load_rules()
  386. rules3 = self.vm.firewall.rules
  387. self.assertListEqual(rules3, [
  388. qubesadmin.firewall.Rule(
  389. 'action=accept dsthost=qubes-os.org proto=tcp dstports=443')])
  390. self.assertAllCalled()
  391. def test_020_set_rules(self):
  392. rules_txt = (
  393. 'action=accept proto=tcp dsthost=qubes-os.org dstports=443-443',
  394. 'action=accept dsthost=example.com',
  395. )
  396. rules = [qubesadmin.firewall.Rule(rule) for rule in rules_txt]
  397. self.app.expected_calls[('test-vm', 'mgmt.vm.firewall.Set', None,
  398. ''.join(rule + '\n' for rule in rules_txt).encode('ascii'))] = b'0\0'
  399. self.vm.firewall.rules = rules
  400. self.assertAllCalled()