#!/usr/bin/python2 -O
# vim: fileencoding=utf-8
#
# The Qubes OS Project, https://www.qubes-os.org/
#
# Copyright (C) 2016
# Marek Marczykowski-Górecki <marmarek@invisiblethingslab.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
import datetime
import os
import time
import unittest

import lxml.etree

import qubes.firewall
import qubes.tests
  29. class TestOption(qubes.firewall.RuleChoice):
  30. opt1 = 'opt1'
  31. opt2 = 'opt2'
  32. another = 'another'
  33. class TestVMM(object):
  34. def __init__(self):
  35. self.offline_mode = True
  36. class TestApp(object):
  37. def __init__(self):
  38. self.vmm = TestVMM()
  39. class TestVM(object):
  40. def __init__(self):
  41. self.firewall_conf = 'test-firewall.xml'
  42. self.dir_path = '/tmp'
  43. self.app = TestApp()
  44. def fire_event(self, event):
  45. pass
  46. # noinspection PyPep8Naming
  47. class TC_00_RuleChoice(qubes.tests.QubesTestCase):
  48. def test_000_accept_allowed(self):
  49. with self.assertNotRaises(ValueError):
  50. TestOption('opt1')
  51. TestOption('opt2')
  52. TestOption('another')
  53. def test_001_value_list(self):
  54. instance = TestOption('opt1')
  55. self.assertEqual(
  56. set(instance.allowed_values), {'opt1', 'opt2', 'another'})
  57. def test_010_reject_others(self):
  58. self.assertRaises(ValueError, lambda: TestOption('invalid'))
  59. class TC_01_Action(qubes.tests.QubesTestCase):
  60. def test_000_allowed_values(self):
  61. with self.assertNotRaises(ValueError):
  62. instance = qubes.firewall.Action('accept')
  63. self.assertEqual(
  64. set(instance.allowed_values), {'accept', 'drop'})
  65. def test_001_rule(self):
  66. instance = qubes.firewall.Action('accept')
  67. self.assertEqual(instance.rule, 'action=accept')
  68. # noinspection PyPep8Naming
  69. class TC_02_Proto(qubes.tests.QubesTestCase):
  70. def test_000_allowed_values(self):
  71. with self.assertNotRaises(ValueError):
  72. instance = qubes.firewall.Proto('tcp')
  73. self.assertEqual(
  74. set(instance.allowed_values), {'tcp', 'udp', 'icmp'})
  75. def test_001_rule(self):
  76. instance = qubes.firewall.Proto('tcp')
  77. self.assertEqual(instance.rule, 'proto=tcp')
  78. # noinspection PyPep8Naming
  79. class TC_02_DstHost(qubes.tests.QubesTestCase):
  80. def test_000_hostname(self):
  81. with self.assertNotRaises(ValueError):
  82. instance = qubes.firewall.DstHost('qubes-os.org')
  83. self.assertEqual(instance.type, 'dsthost')
  84. def test_001_ipv4(self):
  85. with self.assertNotRaises(ValueError):
  86. instance = qubes.firewall.DstHost('127.0.0.1')
  87. self.assertEqual(instance.type, 'dst4')
  88. self.assertEqual(instance.prefixlen, 32)
  89. self.assertEqual(str(instance), '127.0.0.1/32')
  90. self.assertEqual(instance.rule, 'dst4=127.0.0.1/32')
  91. def test_002_ipv4_prefixlen(self):
  92. with self.assertNotRaises(ValueError):
  93. instance = qubes.firewall.DstHost('127.0.0.0', 8)
  94. self.assertEqual(instance.type, 'dst4')
  95. self.assertEqual(instance.prefixlen, 8)
  96. self.assertEqual(str(instance), '127.0.0.0/8')
  97. self.assertEqual(instance.rule, 'dst4=127.0.0.0/8')
  98. def test_003_ipv4_parse_prefixlen(self):
  99. with self.assertNotRaises(ValueError):
  100. instance = qubes.firewall.DstHost('127.0.0.0/8')
  101. self.assertEqual(instance.type, 'dst4')
  102. self.assertEqual(instance.prefixlen, 8)
  103. self.assertEqual(str(instance), '127.0.0.0/8')
  104. self.assertEqual(instance.rule, 'dst4=127.0.0.0/8')
  105. def test_004_ipv4_invalid_prefix(self):
  106. with self.assertRaises(ValueError):
  107. qubes.firewall.DstHost('127.0.0.0/33')
  108. with self.assertRaises(ValueError):
  109. qubes.firewall.DstHost('127.0.0.0', 33)
  110. with self.assertRaises(ValueError):
  111. qubes.firewall.DstHost('127.0.0.0/-1')
  112. def test_005_ipv4_reject_shortened(self):
  113. # not strictly required, but ppl are used to it
  114. with self.assertRaises(ValueError):
  115. qubes.firewall.DstHost('127/8')
  116. def test_006_ipv4_invalid_addr(self):
  117. with self.assertRaises(ValueError):
  118. qubes.firewall.DstHost('137.327.0.0/16')
  119. with self.assertRaises(ValueError):
  120. qubes.firewall.DstHost('1.2.3.4.5/32')
  121. @unittest.expectedFailure
  122. def test_007_ipv4_invalid_network(self):
  123. with self.assertRaises(ValueError):
  124. qubes.firewall.DstHost('127.0.0.1/32')
  125. def test_010_ipv6(self):
  126. with self.assertNotRaises(ValueError):
  127. instance = qubes.firewall.DstHost('2001:abcd:efab::3')
  128. self.assertEqual(instance.type, 'dst6')
  129. self.assertEqual(instance.prefixlen, 128)
  130. self.assertEqual(str(instance), '2001:abcd:efab::3/128')
  131. self.assertEqual(instance.rule, 'dst6=2001:abcd:efab::3/128')
  132. def test_011_ipv6_prefixlen(self):
  133. with self.assertNotRaises(ValueError):
  134. instance = qubes.firewall.DstHost('2001:abcd:efab::', 64)
  135. self.assertEqual(instance.type, 'dst6')
  136. self.assertEqual(instance.prefixlen, 64)
  137. self.assertEqual(str(instance), '2001:abcd:efab::/64')
  138. self.assertEqual(instance.rule, 'dst6=2001:abcd:efab::/64')
  139. def test_012_ipv6_parse_prefixlen(self):
  140. with self.assertNotRaises(ValueError):
  141. instance = qubes.firewall.DstHost('2001:abcd:efab::/64')
  142. self.assertEqual(instance.type, 'dst6')
  143. self.assertEqual(instance.prefixlen, 64)
  144. self.assertEqual(str(instance), '2001:abcd:efab::/64')
  145. self.assertEqual(instance.rule, 'dst6=2001:abcd:efab::/64')
  146. def test_013_ipv6_invalid_prefix(self):
  147. with self.assertRaises(ValueError):
  148. qubes.firewall.DstHost('2001:abcd:efab::3/129')
  149. with self.assertRaises(ValueError):
  150. qubes.firewall.DstHost('2001:abcd:efab::3', 129)
  151. with self.assertRaises(ValueError):
  152. qubes.firewall.DstHost('2001:abcd:efab::3/-1')
  153. def test_014_ipv6_invalid_addr(self):
  154. with self.assertRaises(ValueError):
  155. qubes.firewall.DstHost('2001:abcd:efab0123::3/128')
  156. with self.assertRaises(ValueError):
  157. qubes.firewall.DstHost('2001:abcd:efab:3/128')
  158. with self.assertRaises(ValueError):
  159. qubes.firewall.DstHost('2001:abcd:efab:a:a:a:a:a:a:3/128')
  160. with self.assertRaises(ValueError):
  161. qubes.firewall.DstHost('2001:abcd:efgh::3/128')
  162. @unittest.expectedFailure
  163. def test_015_ipv6_invalid_network(self):
  164. with self.assertRaises(ValueError):
  165. qubes.firewall.DstHost('2001:abcd:efab::3/64')
  166. @unittest.expectedFailure
  167. def test_020_invalid_hostname(self):
  168. with self.assertRaises(ValueError):
  169. qubes.firewall.DstHost('www qubes-os.org')
  170. with self.assertRaises(ValueError):
  171. qubes.firewall.DstHost('https://qubes-os.org')
  172. class TC_03_DstPorts(qubes.tests.QubesTestCase):
  173. def test_000_single_str(self):
  174. with self.assertNotRaises(ValueError):
  175. instance = qubes.firewall.DstPorts('80')
  176. self.assertEqual(str(instance), '80')
  177. self.assertEqual(instance.range, [80, 80])
  178. self.assertEqual(instance.rule, 'dstports=80-80')
  179. def test_001_single_int(self):
  180. with self.assertNotRaises(ValueError):
  181. instance = qubes.firewall.DstPorts(80)
  182. self.assertEqual(str(instance), '80')
  183. self.assertEqual(instance.range, [80, 80])
  184. self.assertEqual(instance.rule, 'dstports=80-80')
  185. def test_002_range(self):
  186. with self.assertNotRaises(ValueError):
  187. instance = qubes.firewall.DstPorts('80-90')
  188. self.assertEqual(str(instance), '80-90')
  189. self.assertEqual(instance.range, [80, 90])
  190. self.assertEqual(instance.rule, 'dstports=80-90')
  191. def test_003_invalid(self):
  192. with self.assertRaises(ValueError):
  193. qubes.firewall.DstPorts('80-90-100')
  194. with self.assertRaises(ValueError):
  195. qubes.firewall.DstPorts('abcdef')
  196. with self.assertRaises(ValueError):
  197. qubes.firewall.DstPorts('80 90')
  198. with self.assertRaises(ValueError):
  199. qubes.firewall.DstPorts('')
  200. def test_004_reversed_range(self):
  201. with self.assertRaises(ValueError):
  202. qubes.firewall.DstPorts('100-20')
  203. def test_005_out_of_range(self):
  204. with self.assertRaises(ValueError):
  205. qubes.firewall.DstPorts('1000000000000')
  206. with self.assertRaises(ValueError):
  207. qubes.firewall.DstPorts(1000000000000)
  208. with self.assertRaises(ValueError):
  209. qubes.firewall.DstPorts('1-1000000000000')
  210. class TC_04_IcmpType(qubes.tests.QubesTestCase):
  211. def test_000_number(self):
  212. with self.assertNotRaises(ValueError):
  213. instance = qubes.firewall.IcmpType(8)
  214. self.assertEqual(str(instance), '8')
  215. self.assertEqual(instance.rule, 'icmptype=8')
  216. def test_001_str(self):
  217. with self.assertNotRaises(ValueError):
  218. instance = qubes.firewall.IcmpType('8')
  219. self.assertEqual(str(instance), '8')
  220. self.assertEqual(instance.rule, 'icmptype=8')
  221. def test_002_invalid(self):
  222. with self.assertRaises(ValueError):
  223. qubes.firewall.IcmpType(600)
  224. with self.assertRaises(ValueError):
  225. qubes.firewall.IcmpType(-1)
  226. with self.assertRaises(ValueError):
  227. qubes.firewall.IcmpType('abcde')
  228. with self.assertRaises(ValueError):
  229. qubes.firewall.IcmpType('')
  230. class TC_05_SpecialTarget(qubes.tests.QubesTestCase):
  231. def test_000_allowed_values(self):
  232. with self.assertNotRaises(ValueError):
  233. instance = qubes.firewall.SpecialTarget('dns')
  234. self.assertEqual(
  235. set(instance.allowed_values), {'dns'})
  236. def test_001_rule(self):
  237. instance = qubes.firewall.SpecialTarget('dns')
  238. self.assertEqual(instance.rule, 'specialtarget=dns')
  239. class TC_06_Expire(qubes.tests.QubesTestCase):
  240. def test_000_number(self):
  241. with self.assertNotRaises(ValueError):
  242. instance = qubes.firewall.Expire(1463292452)
  243. self.assertEqual(str(instance), '1463292452')
  244. self.assertEqual(instance.datetime,
  245. datetime.datetime(2016, 5, 15, 6, 7, 32))
  246. self.assertIsNone(instance.rule)
  247. def test_001_str(self):
  248. with self.assertNotRaises(ValueError):
  249. instance = qubes.firewall.Expire('1463292452')
  250. self.assertEqual(str(instance), '1463292452')
  251. self.assertEqual(instance.datetime,
  252. datetime.datetime(2016, 5, 15, 6, 7, 32))
  253. self.assertIsNone(instance.rule)
  254. def test_002_invalid(self):
  255. with self.assertRaises(ValueError):
  256. qubes.firewall.Expire('abcdef')
  257. with self.assertRaises(ValueError):
  258. qubes.firewall.Expire('')
  259. def test_003_expired(self):
  260. with self.assertNotRaises(ValueError):
  261. instance = qubes.firewall.Expire('1463292452')
  262. self.assertTrue(instance.expired)
  263. with self.assertNotRaises(ValueError):
  264. instance = qubes.firewall.Expire('1583292452')
  265. self.assertFalse(instance.expired)
  266. class TC_07_Comment(qubes.tests.QubesTestCase):
  267. def test_000_str(self):
  268. with self.assertNotRaises(ValueError):
  269. instance = qubes.firewall.Comment('Some comment')
  270. self.assertEqual(str(instance), 'Some comment')
  271. self.assertIsNone(instance.rule)
  272. class TC_08_Rule(qubes.tests.QubesTestCase):
  273. def test_000_simple(self):
  274. with self.assertNotRaises(ValueError):
  275. rule = qubes.firewall.Rule(None, action='accept', proto='icmp')
  276. self.assertEqual(rule.rule, 'action=accept proto=icmp')
  277. self.assertIsNone(rule.dsthost)
  278. self.assertIsNone(rule.dstports)
  279. self.assertIsNone(rule.icmptype)
  280. self.assertIsNone(rule.comment)
  281. self.assertIsNone(rule.expire)
  282. self.assertEqual(str(rule.action), 'accept')
  283. self.assertEqual(str(rule.proto), 'icmp')
  284. def test_001_expire(self):
  285. with self.assertNotRaises(ValueError):
  286. rule = qubes.firewall.Rule(None, action='accept', proto='icmp',
  287. expire='1463292452')
  288. self.assertIsNone(rule.rule)
  289. with self.assertNotRaises(ValueError):
  290. rule = qubes.firewall.Rule(None, action='accept', proto='icmp',
  291. expire='1663292452')
  292. self.assertIsNotNone(rule.rule)
  293. def test_002_dstports(self):
  294. with self.assertNotRaises(ValueError):
  295. rule = qubes.firewall.Rule(None, action='accept', proto='tcp',
  296. dstports=80)
  297. self.assertEqual(str(rule.dstports), '80')
  298. with self.assertNotRaises(ValueError):
  299. rule = qubes.firewall.Rule(None, action='accept', proto='udp',
  300. dstports=80)
  301. self.assertEqual(str(rule.dstports), '80')
  302. def test_003_reject_invalid(self):
  303. with self.assertRaises((ValueError, AssertionError)):
  304. # missing action
  305. qubes.firewall.Rule(None, proto='icmp')
  306. with self.assertRaises(ValueError):
  307. # not proto=tcp or proto=udp for dstports
  308. qubes.firewall.Rule(None, action='accept', proto='icmp',
  309. dstports=80)
  310. with self.assertRaises(ValueError):
  311. # not proto=tcp or proto=udp for dstports
  312. qubes.firewall.Rule(None, action='accept', dstports=80)
  313. with self.assertRaises(ValueError):
  314. # not proto=icmp for icmptype
  315. qubes.firewall.Rule(None, action='accept', proto='tcp',
  316. icmptype=8)
  317. with self.assertRaises(ValueError):
  318. # not proto=icmp for icmptype
  319. qubes.firewall.Rule(None, action='accept', icmptype=8)
  320. def test_004_proto_change(self):
  321. rule = qubes.firewall.Rule(None, action='accept', proto='tcp')
  322. with self.assertNotRaises(ValueError):
  323. rule.proto = 'udp'
  324. self.assertEqual(rule.rule, 'action=accept proto=udp')
  325. rule = qubes.firewall.Rule(None, action='accept', proto='tcp',
  326. dstports=80)
  327. with self.assertNotRaises(ValueError):
  328. rule.proto = 'udp'
  329. self.assertEqual(rule.rule, 'action=accept proto=udp dstports=80-80')
  330. rule = qubes.firewall.Rule(None, action='accept')
  331. with self.assertNotRaises(ValueError):
  332. rule.proto = 'udp'
  333. self.assertEqual(rule.rule, 'action=accept proto=udp')
  334. with self.assertNotRaises(ValueError):
  335. rule.dstports = 80
  336. self.assertEqual(rule.rule, 'action=accept proto=udp dstports=80-80')
  337. with self.assertNotRaises(ValueError):
  338. rule.proto = 'icmp'
  339. self.assertEqual(rule.rule, 'action=accept proto=icmp')
  340. self.assertIsNone(rule.dstports)
  341. rule.icmptype = 8
  342. self.assertEqual(rule.rule, 'action=accept proto=icmp icmptype=8')
  343. with self.assertNotRaises(ValueError):
  344. rule.proto = qubes.property.DEFAULT
  345. self.assertEqual(rule.rule, 'action=accept')
  346. self.assertIsNone(rule.dstports)
  347. def test_005_from_xml_v1(self):
  348. xml_txt = \
  349. '<rule address="192.168.0.0" proto="tcp" netmask="24" port="443"/>'
  350. with self.assertNotRaises(ValueError):
  351. rule = qubes.firewall.Rule.from_xml_v1(
  352. lxml.etree.fromstring(xml_txt), 'accept')
  353. self.assertEqual(rule.dsthost, '192.168.0.0/24')
  354. self.assertEqual(rule.proto, 'tcp')
  355. self.assertEqual(rule.dstports, '443')
  356. self.assertIsNone(rule.expire)
  357. self.assertIsNone(rule.comment)
  358. def test_006_from_xml_v1(self):
  359. xml_txt = \
  360. '<rule address="qubes-os.org" proto="tcp" ' \
  361. 'port="443" toport="1024"/>'
  362. with self.assertNotRaises(ValueError):
  363. rule = qubes.firewall.Rule.from_xml_v1(
  364. lxml.etree.fromstring(xml_txt), 'drop')
  365. self.assertEqual(rule.dsthost, 'qubes-os.org')
  366. self.assertEqual(rule.proto, 'tcp')
  367. self.assertEqual(rule.dstports, '443-1024')
  368. self.assertEqual(rule.action, 'drop')
  369. self.assertIsNone(rule.expire)
  370. self.assertIsNone(rule.comment)
  371. def test_007_from_xml_v1(self):
  372. xml_txt = \
  373. '<rule address="192.168.0.0" netmask="24" expire="1463292452"/>'
  374. with self.assertNotRaises(ValueError):
  375. rule = qubes.firewall.Rule.from_xml_v1(
  376. lxml.etree.fromstring(xml_txt), 'accept')
  377. self.assertEqual(rule.dsthost, '192.168.0.0/24')
  378. self.assertEqual(rule.expire, '1463292452')
  379. self.assertEqual(rule.action, 'accept')
  380. self.assertIsNone(rule.proto)
  381. self.assertIsNone(rule.dstports)
  382. class TC_10_Firewall(qubes.tests.QubesTestCase):
  383. def setUp(self):
  384. super(TC_10_Firewall, self).setUp()
  385. self.vm = TestVM()
  386. firewall_path = os.path.join('/tmp', self.vm.firewall_conf)
  387. if os.path.exists(firewall_path):
  388. os.unlink(firewall_path)
  389. def tearDown(self):
  390. firewall_path = os.path.join('/tmp', self.vm.firewall_conf)
  391. if os.path.exists(firewall_path):
  392. os.unlink(firewall_path)
  393. return super(TC_10_Firewall, self).tearDown()
  394. def test_000_defaults(self):
  395. fw = qubes.firewall.Firewall(self.vm, False)
  396. fw.load_defaults()
  397. self.assertEqual(fw.policy, 'accept')
  398. self.assertEqual(fw.rules, [])
  399. def test_001_save_load_empty(self):
  400. fw = qubes.firewall.Firewall(self.vm, True)
  401. self.assertEqual(fw.policy, 'accept')
  402. self.assertEqual(fw.rules, [])
  403. fw.save()
  404. fw.load()
  405. self.assertEqual(fw.policy, 'accept')
  406. self.assertEqual(fw.rules, [])
  407. def test_002_save_load_rules(self):
  408. fw = qubes.firewall.Firewall(self.vm, True)
  409. rules = [
  410. qubes.firewall.Rule(None, action='drop', proto='icmp'),
  411. qubes.firewall.Rule(None, action='drop', proto='tcp', dstports=80),
  412. qubes.firewall.Rule(None, action='accept', proto='udp',
  413. dstports=67),
  414. qubes.firewall.Rule(None, action='accept', specialtarget='dns'),
  415. ]
  416. fw.rules.extend(rules)
  417. fw.policy = qubes.firewall.Action.drop
  418. fw.save()
  419. self.assertTrue(os.path.exists(os.path.join(
  420. self.vm.dir_path, self.vm.firewall_conf)))
  421. fw = qubes.firewall.Firewall(TestVM(), True)
  422. self.assertEqual(fw.policy, qubes.firewall.Action.drop)
  423. self.assertEqual(fw.rules, rules)
  424. def test_003_load_v1(self):
  425. xml_txt = """<QubesFirewallRules dns="allow" icmp="allow"
  426. policy="deny" yumProxy="allow">
  427. <rule address="192.168.0.0" proto="tcp" netmask="24" port="80"/>
  428. <rule address="qubes-os.org" proto="tcp" port="443"/>
  429. </QubesFirewallRules>
  430. """
  431. with open(os.path.join('/tmp', self.vm.firewall_conf), 'w') as f:
  432. f.write(xml_txt)
  433. with self.assertNotRaises(ValueError):
  434. fw = qubes.firewall.Firewall(self.vm)
  435. self.assertEqual(str(fw.policy), 'drop')
  436. rules = [
  437. qubes.firewall.Rule(None, action='accept', specialtarget='dns'),
  438. qubes.firewall.Rule(None, action='accept', proto='icmp'),
  439. qubes.firewall.Rule(None, action='accept', proto='tcp',
  440. dsthost='192.168.0.0/24', dstports='80'),
  441. qubes.firewall.Rule(None, action='accept', proto='tcp',
  442. dsthost='qubes-os.org', dstports='443')
  443. ]
  444. self.assertEqual(fw.rules, rules)
  445. def test_004_save_skip_expired(self):
  446. fw = qubes.firewall.Firewall(self.vm, True)
  447. rules = [
  448. qubes.firewall.Rule(None, action='drop', proto='icmp'),
  449. qubes.firewall.Rule(None, action='drop', proto='tcp', dstports=80),
  450. qubes.firewall.Rule(None, action='accept', proto='udp',
  451. dstports=67, expire=1373300257),
  452. qubes.firewall.Rule(None, action='accept', specialtarget='dns'),
  453. ]
  454. fw.rules.extend(rules)
  455. fw.policy = qubes.firewall.Action.drop
  456. fw.save()
  457. rules.pop(2)
  458. fw = qubes.firewall.Firewall(self.vm, True)
  459. self.assertEqual(fw.rules, rules)
  460. def test_005_qdb_entries(self):
  461. fw = qubes.firewall.Firewall(self.vm, True)
  462. rules = [
  463. qubes.firewall.Rule(None, action='drop', proto='icmp'),
  464. qubes.firewall.Rule(None, action='drop', proto='tcp', dstports=80),
  465. qubes.firewall.Rule(None, action='accept', proto='udp'),
  466. qubes.firewall.Rule(None, action='accept', specialtarget='dns'),
  467. ]
  468. fw.rules.extend(rules)
  469. fw.policy = qubes.firewall.Action.drop
  470. expected_qdb_entries = {
  471. 'policy': 'drop',
  472. '0000': 'action=drop proto=icmp',
  473. '0001': 'action=drop proto=tcp dstports=80-80',
  474. '0002': 'action=accept proto=udp',
  475. '0003': 'action=accept specialtarget=dns',
  476. }
  477. self.assertEqual(fw.qdb_entries(), expected_qdb_entries)