network_ipv6.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438
  1. #
  2. # The Qubes OS Project, https://www.qubes-os.org/
  3. #
  4. # Copyright (C) 2015-2020
  5. # Marek Marczykowski-Górecki <marmarek@invisiblethingslab.com>
  6. # Copyright (C) 2015 Wojtek Porczyk <woju@invisiblethingslab.com>
  7. #
  8. # This library is free software; you can redistribute it and/or
  9. # modify it under the terms of the GNU Lesser General Public
  10. # License as published by the Free Software Foundation; either
  11. # version 2.1 of the License, or (at your option) any later version.
  12. #
  13. # This library is distributed in the hope that it will be useful,
  14. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  15. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  16. # Lesser General Public License for more details.
  17. #
  18. # You should have received a copy of the GNU Lesser General Public
  19. # License along with this library; if not, see <https://www.gnu.org/licenses/>.
  20. #
  21. import asyncio
  22. import subprocess
  23. import sys
  24. import time
  25. import unittest
  26. from distutils import spawn
  27. import qubes.firewall
  28. import qubes.tests
  29. import qubes.vm
  30. from qubes.tests.integ.network import VmNetworkingMixin
  31. # noinspection PyAttributeOutsideInit,PyPep8Naming
  32. class VmIPv6NetworkingMixin(VmNetworkingMixin):
  33. test_ip6 = '2000:abcd::1'
  34. ping6_cmd = 'ping6 -W 1 -n -c 1 {target}'
  35. def setUp(self):
  36. super(VmIPv6NetworkingMixin, self).setUp()
  37. self.ping6_ip = self.ping6_cmd.format(target=self.test_ip6)
  38. self.ping6_name = self.ping6_cmd.format(target=self.test_name)
  39. def configure_netvm(self):
  40. '''
  41. :type self: qubes.tests.SystemTestCase | VmIPv6NetworkingMixin
  42. '''
  43. self.testnetvm.features['ipv6'] = True
  44. super(VmIPv6NetworkingMixin, self).configure_netvm()
  45. def run_netvm_cmd(cmd):
  46. try:
  47. self.loop.run_until_complete(
  48. self.testnetvm.run_for_stdio(cmd, user='root'))
  49. except subprocess.CalledProcessError as e:
  50. self.fail("Command '%s' failed: %s%s" %
  51. (cmd, e.stdout.decode(), e.stderr.decode()))
  52. run_netvm_cmd("ip addr add {}/128 dev test0".format(self.test_ip6))
  53. run_netvm_cmd(
  54. "ip6tables -I INPUT -d {} -j ACCEPT".format(self.test_ip6))
  55. # ignore failure
  56. self.run_cmd(self.testnetvm, "pkill dnsmasq")
  57. run_netvm_cmd(
  58. "dnsmasq -a {ip} -A /{name}/{ip} -A /{name}/{ip6} -i test0 -z".
  59. format(ip=self.test_ip, ip6=self.test_ip6, name=self.test_name))
  60. def test_500_ipv6_simple_networking(self):
  61. '''
  62. :type self: qubes.tests.SystemTestCase | VmIPv6NetworkingMixin
  63. '''
  64. self.loop.run_until_complete(self.testvm1.start())
  65. self.assertEqual(self.run_cmd(self.testvm1, self.ping6_ip), 0)
  66. self.assertEqual(self.run_cmd(self.testvm1, self.ping6_name), 0)
  67. def test_510_ipv6_simple_proxyvm(self):
  68. '''
  69. :type self: qubes.tests.SystemTestCase | VmIPv6NetworkingMixin
  70. '''
  71. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  72. name=self.make_vm_name('proxy'),
  73. label='red')
  74. self.proxy.provides_network = True
  75. self.proxy.netvm = self.testnetvm
  76. self.loop.run_until_complete(self.proxy.create_on_disk())
  77. self.testvm1.netvm = self.proxy
  78. self.app.save()
  79. self.loop.run_until_complete(self.testvm1.start())
  80. self.assertTrue(self.proxy.is_running())
  81. self.assertEqual(self.run_cmd(self.proxy, self.ping6_ip), 0,
  82. "Ping by IP from ProxyVM failed")
  83. self.assertEqual(self.run_cmd(self.proxy, self.ping6_name), 0,
  84. "Ping by name from ProxyVM failed")
  85. self.assertEqual(self.run_cmd(self.testvm1, self.ping6_ip), 0,
  86. "Ping by IP from AppVM failed")
  87. self.assertEqual(self.run_cmd(self.testvm1, self.ping6_name), 0,
  88. "Ping by IP from AppVM failed")
  89. @qubes.tests.expectedFailureIfTemplate('debian-7')
  90. @unittest.skipUnless(spawn.find_executable('xdotool'),
  91. "xdotool not installed")
  92. def test_520_ipv6_simple_proxyvm_nm(self):
  93. '''
  94. :type self: qubes.tests.SystemTestCase | VmIPv6NetworkingMixin
  95. '''
  96. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  97. name=self.make_vm_name('proxy'),
  98. label='red')
  99. self.proxy.provides_network = True
  100. self.loop.run_until_complete(self.proxy.create_on_disk())
  101. self.proxy.netvm = self.testnetvm
  102. self.proxy.features['service.network-manager'] = True
  103. self.testvm1.netvm = self.proxy
  104. self.app.save()
  105. self.loop.run_until_complete(self.testvm1.start())
  106. self.assertTrue(self.proxy.is_running())
  107. self.assertEqual(self.run_cmd(self.testvm1, self.ping6_ip), 0,
  108. "Ping by IP failed")
  109. self.assertEqual(self.run_cmd(self.testvm1, self.ping6_name), 0,
  110. "Ping by name failed")
  111. # reconnect to make sure that device was configured by NM
  112. self.assertEqual(
  113. self.run_cmd(self.proxy, "nmcli device disconnect eth0",
  114. user="user"),
  115. 0, "Failed to disconnect eth0 using nmcli")
  116. self.assertNotEqual(self.run_cmd(self.testvm1, self.ping6_ip), 0,
  117. "Network should be disabled, but apparently it isn't")
  118. self.assertEqual(
  119. self.run_cmd(self.proxy,
  120. 'nmcli connection up "VM uplink eth0" ifname eth0',
  121. user="user"),
  122. 0, "Failed to connect eth0 using nmcli")
  123. self.assertEqual(self.run_cmd(self.proxy, "nm-online",
  124. user="user"), 0,
  125. "Failed to wait for NM connection")
  126. # wait for duplicate-address-detection to complete - by default it has
  127. # 1s timeout
  128. time.sleep(2)
  129. # check for nm-applet presence
  130. self.assertEqual(subprocess.call([
  131. 'xdotool', 'search', '--class', '{}:nm-applet'.format(
  132. self.proxy.name)],
  133. stdout=subprocess.DEVNULL), 0, "nm-applet window not found")
  134. self.assertEqual(self.run_cmd(self.testvm1, self.ping6_ip), 0,
  135. "Ping by IP failed (after NM reconnection")
  136. self.assertEqual(self.run_cmd(self.testvm1, self.ping6_name), 0,
  137. "Ping by name failed (after NM reconnection)")
  138. def test_530_ipv6_firewallvm_firewall(self):
  139. '''
  140. :type self: qubes.tests.SystemTestCase | VmIPv6NetworkingMixin
  141. '''
  142. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  143. name=self.make_vm_name('proxy'),
  144. label='red')
  145. self.proxy.provides_network = True
  146. self.loop.run_until_complete(self.proxy.create_on_disk())
  147. self.proxy.netvm = self.testnetvm
  148. self.testvm1.netvm = self.proxy
  149. self.app.save()
  150. # block all for first
  151. self.testvm1.firewall.rules = [qubes.firewall.Rule(action='drop')]
  152. self.testvm1.firewall.save()
  153. self.loop.run_until_complete(self.testvm1.start())
  154. self.assertTrue(self.proxy.is_running())
  155. server = self.loop.run_until_complete(self.testnetvm.run(
  156. 'socat TCP6-LISTEN:1234,fork EXEC:/bin/uname'))
  157. try:
  158. self.assertEqual(self.run_cmd(self.proxy, self.ping6_ip), 0,
  159. "Ping by IP from ProxyVM failed")
  160. self.assertEqual(self.run_cmd(self.proxy, self.ping6_name), 0,
  161. "Ping by name from ProxyVM failed")
  162. self.assertNotEqual(self.run_cmd(self.testvm1, self.ping6_ip), 0,
  163. "Ping by IP should be blocked")
  164. client6_cmd = "socat TCP:[{}]:1234 -".format(self.test_ip6)
  165. client4_cmd = "socat TCP:{}:1234 -".format(self.test_ip)
  166. self.assertNotEqual(self.run_cmd(self.testvm1, client6_cmd), 0,
  167. "TCP connection should be blocked")
  168. # block all except ICMP
  169. self.testvm1.firewall.rules = [(
  170. qubes.firewall.Rule(None, action='accept', proto='icmp')
  171. )]
  172. self.testvm1.firewall.save()
  173. # Ugly hack b/c there is no feedback when the rules are actually
  174. # applied
  175. time.sleep(3)
  176. self.assertEqual(self.run_cmd(self.testvm1, self.ping6_ip), 0,
  177. "Ping by IP failed (should be allowed now)")
  178. self.assertNotEqual(self.run_cmd(self.testvm1, self.ping6_name), 0,
  179. "Ping by name should be blocked")
  180. # all TCP still blocked
  181. self.testvm1.firewall.rules = [
  182. qubes.firewall.Rule(None, action='accept', proto='icmp'),
  183. qubes.firewall.Rule(None, action='accept', specialtarget='dns'),
  184. ]
  185. self.testvm1.firewall.save()
  186. # Ugly hack b/c there is no feedback when the rules are actually
  187. # applied
  188. time.sleep(3)
  189. self.assertEqual(self.run_cmd(self.testvm1, self.ping6_name), 0,
  190. "Ping by name failed (should be allowed now)")
  191. self.assertNotEqual(self.run_cmd(self.testvm1, client6_cmd), 0,
  192. "TCP connection should be blocked")
  193. # block all except target
  194. self.testvm1.firewall.rules = [
  195. qubes.firewall.Rule(None, action='accept',
  196. dsthost=self.test_ip6,
  197. proto='tcp', dstports=1234),
  198. ]
  199. self.testvm1.firewall.save()
  200. # Ugly hack b/c there is no feedback when the rules are actually
  201. # applied
  202. time.sleep(3)
  203. self.assertEqual(self.run_cmd(self.testvm1, client6_cmd), 0,
  204. "TCP connection failed (should be allowed now)")
  205. # block all except target - by name
  206. self.testvm1.firewall.rules = [
  207. qubes.firewall.Rule(None, action='accept',
  208. dsthost=self.test_name,
  209. proto='tcp', dstports=1234),
  210. ]
  211. self.testvm1.firewall.save()
  212. # Ugly hack b/c there is no feedback when the rules are actually
  213. # applied
  214. time.sleep(3)
  215. self.assertEqual(self.run_cmd(self.testvm1, client6_cmd), 0,
  216. "TCP (IPv6) connection failed (should be allowed now)")
  217. self.assertEqual(self.run_cmd(self.testvm1, client4_cmd),
  218. 0,
  219. "TCP (IPv4) connection failed (should be allowed now)")
  220. # allow all except target
  221. self.testvm1.firewall.rules = [
  222. qubes.firewall.Rule(None, action='drop', dsthost=self.test_ip6,
  223. proto='tcp', dstports=1234),
  224. qubes.firewall.Rule(action='accept'),
  225. ]
  226. self.testvm1.firewall.save()
  227. # Ugly hack b/c there is no feedback when the rules are actually
  228. # applied
  229. time.sleep(3)
  230. self.assertNotEqual(self.run_cmd(self.testvm1, client6_cmd), 0,
  231. "TCP connection should be blocked")
  232. finally:
  233. server.terminate()
  234. self.loop.run_until_complete(server.wait())
  235. def test_540_ipv6_inter_vm(self):
  236. '''
  237. :type self: qubes.tests.SystemTestCase | VmIPv6NetworkingMixin
  238. '''
  239. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  240. name=self.make_vm_name('proxy'),
  241. label='red')
  242. self.loop.run_until_complete(self.proxy.create_on_disk())
  243. self.proxy.provides_network = True
  244. self.proxy.netvm = self.testnetvm
  245. self.testvm1.netvm = self.proxy
  246. self.testvm2 = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  247. name=self.make_vm_name('vm2'),
  248. label='red')
  249. self.loop.run_until_complete(self.testvm2.create_on_disk())
  250. self.testvm2.netvm = self.proxy
  251. self.app.save()
  252. self.loop.run_until_complete(asyncio.wait([
  253. self.testvm1.start(),
  254. self.testvm2.start()]))
  255. self.assertNotEqual(self.run_cmd(self.testvm1,
  256. self.ping_cmd.format(target=self.testvm2.ip6)), 0)
  257. self.testvm2.netvm = self.testnetvm
  258. self.assertNotEqual(self.run_cmd(self.testvm1,
  259. self.ping_cmd.format(target=self.testvm2.ip6)), 0)
  260. self.assertNotEqual(self.run_cmd(self.testvm2,
  261. self.ping_cmd.format(target=self.testvm1.ip6)), 0)
  262. self.testvm1.netvm = self.testnetvm
  263. self.assertNotEqual(self.run_cmd(self.testvm1,
  264. self.ping_cmd.format(target=self.testvm2.ip6)), 0)
  265. self.assertNotEqual(self.run_cmd(self.testvm2,
  266. self.ping_cmd.format(target=self.testvm1.ip6)), 0)
  267. def test_550_ipv6_spoof_ip(self):
  268. '''Test if VM IP spoofing is blocked
  269. :type self: qubes.tests.SystemTestCase | VmIPv6NetworkingMixin
  270. '''
  271. self.loop.run_until_complete(self.testvm1.start())
  272. self.assertEqual(self.run_cmd(self.testvm1, self.ping6_ip), 0)
  273. # add a simple rule counting packets
  274. self.assertEqual(self.run_cmd(self.testnetvm,
  275. 'ip6tables -I INPUT -i vif+ ! -s {} -p icmpv6 -j LOG'.format(
  276. self.testvm1.ip6)), 0)
  277. self.loop.run_until_complete(self.testvm1.run_for_stdio(
  278. 'ip -6 addr flush dev eth0 && '
  279. 'ip -6 addr add {}/128 dev eth0 && '
  280. 'ip -6 route add default via {} dev eth0'.format(
  281. str(self.testvm1.visible_ip6) + '1',
  282. str(self.testvm1.visible_gateway6)),
  283. user='root'))
  284. self.assertNotEqual(self.run_cmd(self.testvm1, self.ping6_ip), 0,
  285. "Spoofed ping should be blocked")
  286. try:
  287. (output, _) = self.loop.run_until_complete(
  288. self.testnetvm.run_for_stdio('ip6tables -nxvL INPUT',
  289. user='root'))
  290. except subprocess.CalledProcessError:
  291. self.fail('ip6tables -nxvL INPUT failed')
  292. output = output.decode().splitlines()
  293. packets = output[2].lstrip().split()[0]
  294. self.assertEquals(packets, '0', 'Some packet hit the INPUT rule')
  295. def test_710_ipv6_custom_ip_simple(self):
  296. '''Custom AppVM IP
  297. :type self: qubes.tests.SystemTestCase | VmIPv6NetworkingMixin
  298. '''
  299. self.testvm1.ip6 = '2000:aaaa:bbbb::1'
  300. self.app.save()
  301. self.loop.run_until_complete(self.testvm1.start())
  302. self.assertEqual(self.run_cmd(self.testvm1, self.ping6_ip), 0)
  303. self.assertEqual(self.run_cmd(self.testvm1, self.ping6_name), 0)
  304. def test_711_ipv6_custom_ip_proxy(self):
  305. '''Custom ProxyVM IP
  306. :type self: qubes.tests.SystemTestCase | VmIPv6NetworkingMixin
  307. '''
  308. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  309. name=self.make_vm_name('proxy'),
  310. label='red')
  311. self.loop.run_until_complete(self.proxy.create_on_disk())
  312. self.proxy.provides_network = True
  313. self.proxy.netvm = self.testnetvm
  314. self.testvm1.ip6 = '2000:aaaa:bbbb::1'
  315. self.testvm1.netvm = self.proxy
  316. self.app.save()
  317. self.loop.run_until_complete(self.testvm1.start())
  318. self.assertEqual(self.run_cmd(self.testvm1, self.ping6_ip), 0)
  319. self.assertEqual(self.run_cmd(self.testvm1, self.ping6_name), 0)
  320. def test_712_ipv6_custom_ip_firewall(self):
  321. '''Custom VM IP and firewall
  322. :type self: qubes.tests.SystemTestCase | VmIPv6NetworkingMixin
  323. '''
  324. self.testvm1.ip6 = '2000:aaaa:bbbb::1'
  325. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  326. name=self.make_vm_name('proxy'),
  327. label='red')
  328. self.proxy.provides_network = True
  329. self.loop.run_until_complete(self.proxy.create_on_disk())
  330. self.proxy.netvm = self.testnetvm
  331. self.testvm1.netvm = self.proxy
  332. self.app.save()
  333. # block all but ICMP and DNS
  334. self.testvm1.firewall.rules = [
  335. qubes.firewall.Rule(None, action='accept', proto='icmp'),
  336. qubes.firewall.Rule(None, action='accept', specialtarget='dns'),
  337. ]
  338. self.testvm1.firewall.save()
  339. self.loop.run_until_complete(self.testvm1.start())
  340. self.assertTrue(self.proxy.is_running())
  341. server = self.loop.run_until_complete(self.testnetvm.run(
  342. 'socat TCP6-LISTEN:1234,fork EXEC:/bin/uname'))
  343. try:
  344. self.assertEqual(self.run_cmd(self.proxy, self.ping6_ip), 0,
  345. "Ping by IP from ProxyVM failed")
  346. self.assertEqual(self.run_cmd(self.proxy, self.ping6_name), 0,
  347. "Ping by name from ProxyVM failed")
  348. self.assertEqual(self.run_cmd(self.testvm1, self.ping6_ip), 0,
  349. "Ping by IP should be allowed")
  350. self.assertEqual(self.run_cmd(self.testvm1, self.ping6_name), 0,
  351. "Ping by name should be allowed")
  352. client_cmd = "socat TCP:[{}]:1234 -".format(self.test_ip6)
  353. self.assertNotEqual(self.run_cmd(self.testvm1, client_cmd), 0,
  354. "TCP connection should be blocked")
  355. finally:
  356. server.terminate()
  357. self.loop.run_until_complete(server.wait())
  358. def create_testcases_for_templates():
  359. yield from qubes.tests.create_testcases_for_templates('VmIPv6Networking',
  360. VmIPv6NetworkingMixin, qubes.tests.SystemTestCase,
  361. module=sys.modules[__name__])
  362. def load_tests(loader, tests, pattern):
  363. tests.addTests(loader.loadTestsFromNames(
  364. create_testcases_for_templates()))
  365. return tests
  366. qubes.tests.maybe_create_testcases_on_import(create_testcases_for_templates)