#
# The Qubes OS Project, https://www.qubes-os.org/
#
# Copyright (C) 2016
#                   Marek Marczykowski-Górecki <marmarek@invisiblethingslab.com>
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, see <https://www.gnu.org/licenses/>.
#
import asyncio
import datetime
import os
import unittest

import lxml.etree

import qubes.firewall
import qubes.tests
  27. class TestOption(qubes.firewall.RuleChoice):
  28. opt1 = 'opt1'
  29. opt2 = 'opt2'
  30. another = 'another'
  31. class TestVMM(object):
  32. def __init__(self):
  33. self.offline_mode = True
  34. class TestApp(object):
  35. def __init__(self):
  36. self.vmm = TestVMM()
  37. class TestVM(object):
  38. def __init__(self):
  39. self.firewall_conf = 'test-firewall.xml'
  40. self.dir_path = '/tmp'
  41. self.app = TestApp()
  42. def fire_event(self, event):
  43. pass
  44. # noinspection PyPep8Naming
  45. class TC_00_RuleChoice(qubes.tests.QubesTestCase):
  46. def test_000_accept_allowed(self):
  47. with self.assertNotRaises(ValueError):
  48. TestOption('opt1')
  49. TestOption('opt2')
  50. TestOption('another')
  51. def test_001_value_list(self):
  52. instance = TestOption('opt1')
  53. self.assertEqual(
  54. set(instance.allowed_values), {'opt1', 'opt2', 'another'})
  55. def test_010_reject_others(self):
  56. self.assertRaises(ValueError, lambda: TestOption('invalid'))
  57. class TC_01_Action(qubes.tests.QubesTestCase):
  58. def test_000_allowed_values(self):
  59. with self.assertNotRaises(ValueError):
  60. instance = qubes.firewall.Action('accept')
  61. self.assertEqual(
  62. set(instance.allowed_values), {'accept', 'drop'})
  63. def test_001_rule(self):
  64. instance = qubes.firewall.Action('accept')
  65. self.assertEqual(instance.rule, 'action=accept')
  66. self.assertEqual(instance.api_rule, 'action=accept')
  67. # noinspection PyPep8Naming
  68. class TC_02_Proto(qubes.tests.QubesTestCase):
  69. def test_000_allowed_values(self):
  70. with self.assertNotRaises(ValueError):
  71. instance = qubes.firewall.Proto('tcp')
  72. self.assertEqual(
  73. set(instance.allowed_values), {'tcp', 'udp', 'icmp'})
  74. def test_001_rule(self):
  75. instance = qubes.firewall.Proto('tcp')
  76. self.assertEqual(instance.rule, 'proto=tcp')
  77. self.assertEqual(instance.api_rule, 'proto=tcp')
  78. # noinspection PyPep8Naming
  79. class TC_02_DstHost(qubes.tests.QubesTestCase):
  80. def test_000_hostname(self):
  81. with self.assertNotRaises(ValueError):
  82. instance = qubes.firewall.DstHost('qubes-os.org')
  83. self.assertEqual(instance.type, 'dsthost')
  84. def test_001_ipv4(self):
  85. with self.assertNotRaises(ValueError):
  86. instance = qubes.firewall.DstHost('127.0.0.1')
  87. self.assertEqual(instance.type, 'dst4')
  88. self.assertEqual(instance.prefixlen, 32)
  89. self.assertEqual(str(instance), '127.0.0.1/32')
  90. self.assertEqual(instance.rule, 'dst4=127.0.0.1/32')
  91. def test_002_ipv4_prefixlen(self):
  92. with self.assertNotRaises(ValueError):
  93. instance = qubes.firewall.DstHost('127.0.0.0', 8)
  94. self.assertEqual(instance.type, 'dst4')
  95. self.assertEqual(instance.prefixlen, 8)
  96. self.assertEqual(str(instance), '127.0.0.0/8')
  97. self.assertEqual(instance.rule, 'dst4=127.0.0.0/8')
  98. def test_003_ipv4_parse_prefixlen(self):
  99. with self.assertNotRaises(ValueError):
  100. instance = qubes.firewall.DstHost('127.0.0.0/8')
  101. self.assertEqual(instance.type, 'dst4')
  102. self.assertEqual(instance.prefixlen, 8)
  103. self.assertEqual(str(instance), '127.0.0.0/8')
  104. self.assertEqual(instance.rule, 'dst4=127.0.0.0/8')
  105. def test_004_ipv4_invalid_prefix(self):
  106. with self.assertRaises(ValueError):
  107. qubes.firewall.DstHost('127.0.0.0/33')
  108. with self.assertRaises(ValueError):
  109. qubes.firewall.DstHost('127.0.0.0', 33)
  110. with self.assertRaises(ValueError):
  111. qubes.firewall.DstHost('127.0.0.0/-1')
  112. def test_005_ipv4_reject_shortened(self):
  113. # not strictly required, but ppl are used to it
  114. with self.assertRaises(ValueError):
  115. qubes.firewall.DstHost('127/8')
  116. def test_006_ipv4_invalid_addr(self):
  117. with self.assertRaises(ValueError):
  118. qubes.firewall.DstHost('137.327.0.0/16')
  119. with self.assertRaises(ValueError):
  120. qubes.firewall.DstHost('1.2.3.4.5/32')
  121. @unittest.expectedFailure
  122. def test_007_ipv4_invalid_network(self):
  123. with self.assertRaises(ValueError):
  124. qubes.firewall.DstHost('127.0.0.1/32')
  125. def test_010_ipv6(self):
  126. with self.assertNotRaises(ValueError):
  127. instance = qubes.firewall.DstHost('2001:abcd:efab::3')
  128. self.assertEqual(instance.type, 'dst6')
  129. self.assertEqual(instance.prefixlen, 128)
  130. self.assertEqual(str(instance), '2001:abcd:efab::3/128')
  131. self.assertEqual(instance.rule, 'dst6=2001:abcd:efab::3/128')
  132. self.assertEqual(instance.api_rule, 'dst6=2001:abcd:efab::3/128')
  133. def test_011_ipv6_prefixlen(self):
  134. with self.assertNotRaises(ValueError):
  135. instance = qubes.firewall.DstHost('2001:abcd:efab::', 64)
  136. self.assertEqual(instance.type, 'dst6')
  137. self.assertEqual(instance.prefixlen, 64)
  138. self.assertEqual(str(instance), '2001:abcd:efab::/64')
  139. self.assertEqual(instance.rule, 'dst6=2001:abcd:efab::/64')
  140. self.assertEqual(instance.api_rule, 'dst6=2001:abcd:efab::/64')
  141. def test_012_ipv6_parse_prefixlen(self):
  142. with self.assertNotRaises(ValueError):
  143. instance = qubes.firewall.DstHost('2001:abcd:efab::/64')
  144. self.assertEqual(instance.type, 'dst6')
  145. self.assertEqual(instance.prefixlen, 64)
  146. self.assertEqual(str(instance), '2001:abcd:efab::/64')
  147. self.assertEqual(instance.rule, 'dst6=2001:abcd:efab::/64')
  148. self.assertEqual(instance.api_rule, 'dst6=2001:abcd:efab::/64')
  149. def test_013_ipv6_invalid_prefix(self):
  150. with self.assertRaises(ValueError):
  151. qubes.firewall.DstHost('2001:abcd:efab::3/129')
  152. with self.assertRaises(ValueError):
  153. qubes.firewall.DstHost('2001:abcd:efab::3', 129)
  154. with self.assertRaises(ValueError):
  155. qubes.firewall.DstHost('2001:abcd:efab::3/-1')
  156. def test_014_ipv6_invalid_addr(self):
  157. with self.assertRaises(ValueError):
  158. qubes.firewall.DstHost('2001:abcd:efab0123::3/128')
  159. with self.assertRaises(ValueError):
  160. qubes.firewall.DstHost('2001:abcd:efab:3/128')
  161. with self.assertRaises(ValueError):
  162. qubes.firewall.DstHost('2001:abcd:efab:a:a:a:a:a:a:3/128')
  163. with self.assertRaises(ValueError):
  164. qubes.firewall.DstHost('2001:abcd:efgh::3/128')
  165. @unittest.expectedFailure
  166. def test_015_ipv6_invalid_network(self):
  167. with self.assertRaises(ValueError):
  168. qubes.firewall.DstHost('2001:abcd:efab::3/64')
  169. def test_020_invalid_hostname(self):
  170. with self.assertRaises(ValueError):
  171. qubes.firewall.DstHost('www qubes-os.org')
  172. with self.assertRaises(ValueError):
  173. qubes.firewall.DstHost('https://qubes-os.org')
  174. class TC_03_DstPorts(qubes.tests.QubesTestCase):
  175. def test_000_single_str(self):
  176. with self.assertNotRaises(ValueError):
  177. instance = qubes.firewall.DstPorts('80')
  178. self.assertEqual(str(instance), '80')
  179. self.assertEqual(instance.range, [80, 80])
  180. self.assertEqual(instance.rule, 'dstports=80-80')
  181. self.assertEqual(instance.api_rule, 'dstports=80-80')
  182. def test_001_single_int(self):
  183. with self.assertNotRaises(ValueError):
  184. instance = qubes.firewall.DstPorts(80)
  185. self.assertEqual(str(instance), '80')
  186. self.assertEqual(instance.range, [80, 80])
  187. self.assertEqual(instance.rule, 'dstports=80-80')
  188. self.assertEqual(instance.api_rule, 'dstports=80-80')
  189. def test_002_range(self):
  190. with self.assertNotRaises(ValueError):
  191. instance = qubes.firewall.DstPorts('80-90')
  192. self.assertEqual(str(instance), '80-90')
  193. self.assertEqual(instance.range, [80, 90])
  194. self.assertEqual(instance.rule, 'dstports=80-90')
  195. def test_003_invalid(self):
  196. with self.assertRaises(ValueError):
  197. qubes.firewall.DstPorts('80-90-100')
  198. with self.assertRaises(ValueError):
  199. qubes.firewall.DstPorts('abcdef')
  200. with self.assertRaises(ValueError):
  201. qubes.firewall.DstPorts('80 90')
  202. with self.assertRaises(ValueError):
  203. qubes.firewall.DstPorts('')
  204. def test_004_reversed_range(self):
  205. with self.assertRaises(ValueError):
  206. qubes.firewall.DstPorts('100-20')
  207. def test_005_out_of_range(self):
  208. with self.assertRaises(ValueError):
  209. qubes.firewall.DstPorts('1000000000000')
  210. with self.assertRaises(ValueError):
  211. qubes.firewall.DstPorts(1000000000000)
  212. with self.assertRaises(ValueError):
  213. qubes.firewall.DstPorts('1-1000000000000')
  214. class TC_04_IcmpType(qubes.tests.QubesTestCase):
  215. def test_000_number(self):
  216. with self.assertNotRaises(ValueError):
  217. instance = qubes.firewall.IcmpType(8)
  218. self.assertEqual(str(instance), '8')
  219. self.assertEqual(instance.rule, 'icmptype=8')
  220. def test_001_str(self):
  221. with self.assertNotRaises(ValueError):
  222. instance = qubes.firewall.IcmpType('8')
  223. self.assertEqual(str(instance), '8')
  224. self.assertEqual(instance.rule, 'icmptype=8')
  225. self.assertEqual(instance.api_rule, 'icmptype=8')
  226. def test_002_invalid(self):
  227. with self.assertRaises(ValueError):
  228. qubes.firewall.IcmpType(600)
  229. with self.assertRaises(ValueError):
  230. qubes.firewall.IcmpType(-1)
  231. with self.assertRaises(ValueError):
  232. qubes.firewall.IcmpType('abcde')
  233. with self.assertRaises(ValueError):
  234. qubes.firewall.IcmpType('')
  235. class TC_05_SpecialTarget(qubes.tests.QubesTestCase):
  236. def test_000_allowed_values(self):
  237. with self.assertNotRaises(ValueError):
  238. instance = qubes.firewall.SpecialTarget('dns')
  239. self.assertEqual(
  240. set(instance.allowed_values), {'dns'})
  241. def test_001_rule(self):
  242. instance = qubes.firewall.SpecialTarget('dns')
  243. self.assertEqual(instance.rule, 'specialtarget=dns')
  244. self.assertEqual(instance.api_rule, 'specialtarget=dns')
  245. class TC_06_Expire(qubes.tests.QubesTestCase):
  246. def test_000_number(self):
  247. with self.assertNotRaises(ValueError):
  248. instance = qubes.firewall.Expire(1463292452)
  249. self.assertEqual(str(instance), '1463292452')
  250. self.assertEqual(instance.api_rule, 'expire=1463292452')
  251. self.assertEqual(instance.datetime,
  252. datetime.datetime.fromtimestamp(1463292452))
  253. self.assertIsNone(instance.rule)
  254. def test_001_str(self):
  255. with self.assertNotRaises(ValueError):
  256. instance = qubes.firewall.Expire('1463292452')
  257. self.assertEqual(str(instance), '1463292452')
  258. self.assertEqual(instance.datetime,
  259. datetime.datetime.fromtimestamp(1463292452))
  260. self.assertIsNone(instance.rule)
  261. def test_002_invalid(self):
  262. with self.assertRaises(ValueError):
  263. qubes.firewall.Expire('abcdef')
  264. with self.assertRaises(ValueError):
  265. qubes.firewall.Expire('')
  266. def test_003_expired(self):
  267. with self.assertNotRaises(ValueError):
  268. instance = qubes.firewall.Expire('1463292452')
  269. self.assertTrue(instance.expired)
  270. with self.assertNotRaises(ValueError):
  271. instance = qubes.firewall.Expire('2583292452')
  272. self.assertFalse(instance.expired)
  273. class TC_07_Comment(qubes.tests.QubesTestCase):
  274. def test_000_str(self):
  275. with self.assertNotRaises(ValueError):
  276. instance = qubes.firewall.Comment('Some comment')
  277. self.assertEqual(str(instance), 'Some comment')
  278. self.assertEqual(instance.api_rule, 'comment=Some comment')
  279. self.assertIsNone(instance.rule)
  280. class TC_08_Rule(qubes.tests.QubesTestCase):
  281. def test_000_simple(self):
  282. with self.assertNotRaises(ValueError):
  283. rule = qubes.firewall.Rule(None, action='accept', proto='icmp')
  284. self.assertEqual(rule.rule, 'action=accept proto=icmp')
  285. self.assertIsNone(rule.dsthost)
  286. self.assertIsNone(rule.dstports)
  287. self.assertIsNone(rule.icmptype)
  288. self.assertIsNone(rule.comment)
  289. self.assertIsNone(rule.expire)
  290. self.assertEqual(str(rule.action), 'accept')
  291. self.assertEqual(str(rule.proto), 'icmp')
  292. def test_001_expire(self):
  293. with self.assertNotRaises(ValueError):
  294. rule = qubes.firewall.Rule(None, action='accept', proto='icmp',
  295. expire='1463292452')
  296. self.assertIsNone(rule.rule)
  297. with self.assertNotRaises(ValueError):
  298. rule = qubes.firewall.Rule(None, action='accept', proto='icmp',
  299. expire='1663292452')
  300. self.assertIsNotNone(rule.rule)
  301. def test_002_dstports(self):
  302. with self.assertNotRaises(ValueError):
  303. rule = qubes.firewall.Rule(None, action='accept', proto='tcp',
  304. dstports=80)
  305. self.assertEqual(str(rule.dstports), '80')
  306. with self.assertNotRaises(ValueError):
  307. rule = qubes.firewall.Rule(None, action='accept', proto='udp',
  308. dstports=80)
  309. self.assertEqual(str(rule.dstports), '80')
  310. def test_003_reject_invalid(self):
  311. with self.assertRaises((ValueError, AssertionError)):
  312. # missing action
  313. qubes.firewall.Rule(None, proto='icmp')
  314. with self.assertRaises(ValueError):
  315. # not proto=tcp or proto=udp for dstports
  316. qubes.firewall.Rule(None, action='accept', proto='icmp',
  317. dstports=80)
  318. with self.assertRaises(ValueError):
  319. # not proto=tcp or proto=udp for dstports
  320. qubes.firewall.Rule(None, action='accept', dstports=80)
  321. with self.assertRaises(ValueError):
  322. # not proto=icmp for icmptype
  323. qubes.firewall.Rule(None, action='accept', proto='tcp',
  324. icmptype=8)
  325. with self.assertRaises(ValueError):
  326. # not proto=icmp for icmptype
  327. qubes.firewall.Rule(None, action='accept', icmptype=8)
  328. def test_004_proto_change(self):
  329. rule = qubes.firewall.Rule(None, action='accept', proto='tcp')
  330. with self.assertNotRaises(ValueError):
  331. rule.proto = 'udp'
  332. self.assertEqual(rule.rule, 'action=accept proto=udp')
  333. rule = qubes.firewall.Rule(None, action='accept', proto='tcp',
  334. dstports=80)
  335. with self.assertNotRaises(ValueError):
  336. rule.proto = 'udp'
  337. self.assertEqual(rule.rule, 'action=accept proto=udp dstports=80-80')
  338. rule = qubes.firewall.Rule(None, action='accept')
  339. with self.assertNotRaises(ValueError):
  340. rule.proto = 'udp'
  341. self.assertEqual(rule.rule, 'action=accept proto=udp')
  342. with self.assertNotRaises(ValueError):
  343. rule.dstports = 80
  344. self.assertEqual(rule.rule, 'action=accept proto=udp dstports=80-80')
  345. with self.assertNotRaises(ValueError):
  346. rule.proto = 'icmp'
  347. self.assertEqual(rule.rule, 'action=accept proto=icmp')
  348. self.assertIsNone(rule.dstports)
  349. rule.icmptype = 8
  350. self.assertEqual(rule.rule, 'action=accept proto=icmp icmptype=8')
  351. with self.assertNotRaises(ValueError):
  352. rule.proto = qubes.property.DEFAULT
  353. self.assertEqual(rule.rule, 'action=accept')
  354. self.assertIsNone(rule.dstports)
  355. def test_005_from_xml_v1(self):
  356. xml_txt = \
  357. '<rule address="192.168.0.0" proto="tcp" netmask="24" port="443"/>'
  358. with self.assertNotRaises(ValueError):
  359. rule = qubes.firewall.Rule.from_xml_v1(
  360. lxml.etree.fromstring(xml_txt), 'accept')
  361. self.assertEqual(rule.dsthost, '192.168.0.0/24')
  362. self.assertEqual(rule.proto, 'tcp')
  363. self.assertEqual(rule.dstports, '443')
  364. self.assertIsNone(rule.expire)
  365. self.assertIsNone(rule.comment)
  366. def test_006_from_xml_v1(self):
  367. xml_txt = \
  368. '<rule address="qubes-os.org" proto="tcp" ' \
  369. 'port="443" toport="1024"/>'
  370. with self.assertNotRaises(ValueError):
  371. rule = qubes.firewall.Rule.from_xml_v1(
  372. lxml.etree.fromstring(xml_txt), 'drop')
  373. self.assertEqual(rule.dsthost, 'qubes-os.org')
  374. self.assertEqual(rule.proto, 'tcp')
  375. self.assertEqual(rule.dstports, '443-1024')
  376. self.assertEqual(rule.action, 'drop')
  377. self.assertIsNone(rule.expire)
  378. self.assertIsNone(rule.comment)
  379. def test_007_from_xml_v1(self):
  380. xml_txt = \
  381. '<rule address="192.168.0.0" netmask="24" expire="1463292452"/>'
  382. with self.assertNotRaises(ValueError):
  383. rule = qubes.firewall.Rule.from_xml_v1(
  384. lxml.etree.fromstring(xml_txt), 'accept')
  385. self.assertEqual(rule.dsthost, '192.168.0.0/24')
  386. self.assertEqual(rule.expire, '1463292452')
  387. self.assertEqual(rule.action, 'accept')
  388. self.assertIsNone(rule.proto)
  389. self.assertIsNone(rule.dstports)
  390. def test_008_from_api_string(self):
  391. rule_txt = 'action=drop proto=tcp dstports=80-80'
  392. with self.assertNotRaises(ValueError):
  393. rule = qubes.firewall.Rule.from_api_string(
  394. rule_txt)
  395. self.assertEqual(rule.dstports.range, [80, 80])
  396. self.assertEqual(rule.proto, 'tcp')
  397. self.assertEqual(rule.action, 'drop')
  398. self.assertIsNone(rule.dsthost)
  399. self.assertIsNone(rule.expire)
  400. self.assertIsNone(rule.comment)
  401. self.assertEqual(rule.api_rule, rule_txt)
  402. def test_009_from_api_string(self):
  403. rule_txt = 'action=accept expire=2063292452 proto=tcp ' \
  404. 'comment=Some comment, with spaces'
  405. with self.assertNotRaises(ValueError):
  406. rule = qubes.firewall.Rule.from_api_string(
  407. rule_txt)
  408. self.assertEqual(rule.comment, 'Some comment, with spaces')
  409. self.assertEqual(rule.proto, 'tcp')
  410. self.assertEqual(rule.action, 'accept')
  411. self.assertEqual(rule.expire, '2063292452')
  412. self.assertIsNone(rule.dstports)
  413. self.assertIsNone(rule.dsthost)
  414. self.assertEqual(rule.api_rule, rule_txt)
  415. class TC_10_Firewall(qubes.tests.QubesTestCase):
  416. def setUp(self):
  417. super(TC_10_Firewall, self).setUp()
  418. self.vm = TestVM()
  419. firewall_path = os.path.join('/tmp', self.vm.firewall_conf)
  420. if os.path.exists(firewall_path):
  421. os.unlink(firewall_path)
  422. def tearDown(self):
  423. firewall_path = os.path.join('/tmp', self.vm.firewall_conf)
  424. if os.path.exists(firewall_path):
  425. os.unlink(firewall_path)
  426. return super(TC_10_Firewall, self).tearDown()
  427. def test_000_defaults(self):
  428. fw = qubes.firewall.Firewall(self.vm, False)
  429. fw.load_defaults()
  430. self.assertEqual(fw.policy, 'drop')
  431. self.assertEqual(fw.rules, [qubes.firewall.Rule(None, action='accept')])
  432. def test_001_save_load_empty(self):
  433. fw = qubes.firewall.Firewall(self.vm, True)
  434. self.assertEqual(fw.policy, 'drop')
  435. self.assertEqual(fw.rules, [qubes.firewall.Rule(None, action='accept')])
  436. fw.save()
  437. fw.load()
  438. self.assertEqual(fw.policy, 'drop')
  439. self.assertEqual(fw.rules, [qubes.firewall.Rule(None, action='accept')])
  440. def test_002_save_load_rules(self):
  441. fw = qubes.firewall.Firewall(self.vm, True)
  442. rules = [
  443. qubes.firewall.Rule(None, action='drop', proto='icmp'),
  444. qubes.firewall.Rule(None, action='drop', proto='tcp', dstports=80),
  445. qubes.firewall.Rule(None, action='accept', proto='udp',
  446. dstports=67),
  447. qubes.firewall.Rule(None, action='accept', specialtarget='dns'),
  448. ]
  449. fw.rules.extend(rules)
  450. fw.save()
  451. self.assertTrue(os.path.exists(os.path.join(
  452. self.vm.dir_path, self.vm.firewall_conf)))
  453. fw = qubes.firewall.Firewall(TestVM(), True)
  454. self.assertEqual(fw.policy, qubes.firewall.Action.drop)
  455. self.assertEqual(fw.rules,
  456. [qubes.firewall.Rule(None, action='accept')] + rules)
  457. def test_003_load_v1(self):
  458. xml_txt = """<QubesFirewallRules dns="allow" icmp="allow"
  459. policy="deny" yumProxy="allow">
  460. <rule address="192.168.0.0" proto="tcp" netmask="24" port="80"/>
  461. <rule address="qubes-os.org" proto="tcp" port="443"/>
  462. </QubesFirewallRules>
  463. """
  464. with open(os.path.join('/tmp', self.vm.firewall_conf), 'w') as f:
  465. f.write(xml_txt)
  466. with self.assertNotRaises(ValueError):
  467. fw = qubes.firewall.Firewall(self.vm)
  468. self.assertEqual(str(fw.policy), 'drop')
  469. rules = [
  470. qubes.firewall.Rule(None, action='accept', specialtarget='dns'),
  471. qubes.firewall.Rule(None, action='accept', proto='icmp'),
  472. qubes.firewall.Rule(None, action='accept', proto='tcp',
  473. dsthost='192.168.0.0/24', dstports='80'),
  474. qubes.firewall.Rule(None, action='accept', proto='tcp',
  475. dsthost='qubes-os.org', dstports='443')
  476. ]
  477. self.assertEqual(fw.rules, rules)
  478. def test_004_save_skip_expired(self):
  479. fw = qubes.firewall.Firewall(self.vm, True)
  480. rules = [
  481. qubes.firewall.Rule(None, action='drop', proto='icmp'),
  482. qubes.firewall.Rule(None, action='drop', proto='tcp', dstports=80),
  483. qubes.firewall.Rule(None, action='accept', proto='udp',
  484. dstports=67, expire=1373300257),
  485. qubes.firewall.Rule(None, action='accept', specialtarget='dns'),
  486. ]
  487. fw.rules = rules
  488. fw.save()
  489. rules.pop(2)
  490. fw = qubes.firewall.Firewall(self.vm, True)
  491. self.assertEqual(fw.rules, rules)
  492. def test_005_qdb_entries(self):
  493. fw = qubes.firewall.Firewall(self.vm, True)
  494. rules = [
  495. qubes.firewall.Rule(None, action='drop', proto='icmp'),
  496. qubes.firewall.Rule(None, action='drop', proto='tcp', dstports=80),
  497. qubes.firewall.Rule(None, action='accept', proto='udp'),
  498. qubes.firewall.Rule(None, action='accept', specialtarget='dns'),
  499. ]
  500. fw.rules = rules
  501. expected_qdb_entries = {
  502. 'policy': 'drop',
  503. '0000': 'action=drop proto=icmp',
  504. '0001': 'action=drop proto=tcp dstports=80-80',
  505. '0002': 'action=accept proto=udp',
  506. '0003': 'action=accept specialtarget=dns',
  507. }
  508. self.assertEqual(fw.qdb_entries(), expected_qdb_entries)
  509. def test_006_auto_expire_rules(self):
  510. fw = qubes.firewall.Firewall(self.vm, True)
  511. rules = [
  512. qubes.firewall.Rule(None, action='drop', proto='icmp'),
  513. qubes.firewall.Rule(None, action='drop', proto='tcp', dstports=80),
  514. qubes.firewall.Rule(None, action='accept', proto='udp',
  515. dstports=67, expire=self.loop.time() + 5),
  516. qubes.firewall.Rule(None, action='accept', specialtarget='dns'),
  517. ]
  518. fw.rules = rules
  519. fw.save()
  520. self.assertEqual(fw.rules, rules)
  521. self.loop.run_until_complete(asyncio.sleep(3))
  522. # still old rules should be there
  523. self.assertEqual(fw.rules, rules)
  524. rules.pop(2)
  525. self.loop.run_until_complete(asyncio.sleep(3))
  526. # expect new rules
  527. self.assertEqual(fw.rules, rules)