From f3cf58e6f226274e4032602b349b13839c94115a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marek=20Marczykowski-G=C3=B3recki?= Date: Sun, 3 Dec 2017 03:23:09 +0100 Subject: [PATCH] tests: add integration tests for IPv6 Run also all IPv4 tests with IPv6 enabled to check for regressions (broken IPv4 because of enabled IPv6). QubesOS/qubes-issues#718 --- qubes/tests/integ/network.py | 374 +++++++++++++++++++++++++++++++++++ 1 file changed, 374 insertions(+) diff --git a/qubes/tests/integ/network.py b/qubes/tests/integ/network.py index 0c078379..14c92f59 100644 --- a/qubes/tests/integ/network.py +++ b/qubes/tests/integ/network.py @@ -655,6 +655,375 @@ class VmNetworkingMixin(object): nc.terminate() self.loop.run_until_complete(nc.wait()) +class VmIPv6NetworkingMixin(VmNetworkingMixin): + test_ip6 = '2000:abcd::1' + + ping6_cmd = 'ping6 -W 1 -n -c 1 {target}' + + def setUp(self): + super(VmIPv6NetworkingMixin, self).setUp() + self.ping6_ip = self.ping6_cmd.format(target=self.test_ip6) + self.ping6_name = self.ping6_cmd.format(target=self.test_name) + + def configure_netvm(self): + self.testnetvm.features['ipv6'] = True + super(VmIPv6NetworkingMixin, self).configure_netvm() + + def run_netvm_cmd(cmd): + if self.run_cmd(self.testnetvm, cmd) != 0: + self.fail("Command '%s' failed" % cmd) + + run_netvm_cmd("ip addr add {}/128 dev test0".format(self.test_ip6)) + run_netvm_cmd( + "ip6tables -I INPUT -d {} -j ACCEPT".format(self.test_ip6)) + # ignore failure + self.run_cmd(self.testnetvm, "killall --wait dnsmasq") + run_netvm_cmd( + "dnsmasq -a {ip} -A /{name}/{ip} -A /{name}/{ip6} -i test0 -z". + format(ip=self.test_ip, ip6=self.test_ip6, name=self.test_name)) + + def test_500_ipv6_simple_networking(self): + self.loop.run_until_complete(self.testvm1.start()) + self.assertEqual(self.run_cmd(self.testvm1, self.ping6_ip), 0) + self.assertEqual(self.run_cmd(self.testvm1, self.ping6_name), 0) + + + def test_510_ipv6_simple_proxyvm(self): + self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM, + name=self.make_vm_name('proxy'), + label='red') + self.proxy.provides_network = True + self.proxy.netvm = self.testnetvm + self.loop.run_until_complete(self.proxy.create_on_disk()) + self.testvm1.netvm = self.proxy + self.app.save() + + self.loop.run_until_complete(self.testvm1.start()) + self.assertTrue(self.proxy.is_running()) + self.assertEqual(self.run_cmd(self.proxy, self.ping6_ip), 0, + "Ping by IP from ProxyVM failed") + self.assertEqual(self.run_cmd(self.proxy, self.ping6_name), 0, + "Ping by name from ProxyVM failed") + self.assertEqual(self.run_cmd(self.testvm1, self.ping6_ip), 0, + "Ping by IP from AppVM failed") + self.assertEqual(self.run_cmd(self.testvm1, self.ping6_name), 0, + "Ping by IP from AppVM failed") + + + @qubes.tests.expectedFailureIfTemplate('debian-7') + @unittest.skipUnless(spawn.find_executable('xdotool'), + "xdotool not installed") + def test_520_ipv6_simple_proxyvm_nm(self): + self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM, + name=self.make_vm_name('proxy'), + label='red') + self.proxy.provides_network = True + self.loop.run_until_complete(self.proxy.create_on_disk()) + self.proxy.netvm = self.testnetvm + self.proxy.features['service.network-manager'] = True + self.testvm1.netvm = self.proxy + self.app.save() + + self.loop.run_until_complete(self.testvm1.start()) + self.assertTrue(self.proxy.is_running()) + self.assertEqual(self.run_cmd(self.testvm1, self.ping6_ip), 0, + "Ping by IP failed") + self.assertEqual(self.run_cmd(self.testvm1, self.ping6_name), 0, + "Ping by name failed") + + # reconnect to make sure that device was configured by NM + self.assertEqual( + self.run_cmd(self.proxy, "nmcli device disconnect eth0", + user="user"), + 0, "Failed to disconnect eth0 using nmcli") + + self.assertNotEqual(self.run_cmd(self.testvm1, self.ping6_ip), 0, + "Network should be disabled, but apparently it isn't") + self.assertEqual( + self.run_cmd(self.proxy, + 'nmcli connection up "VM uplink eth0" ifname eth0', + user="user"), + 0, "Failed to connect eth0 using nmcli") + self.assertEqual(self.run_cmd(self.proxy, "nm-online", + user="user"), 0, + "Failed to wait for NM connection") + + # wait for duplicate-address-detection to complete - by default it has + # 1s timeout + time.sleep(2) + + # check for nm-applet presence + self.assertEqual(subprocess.call([ + 'xdotool', 'search', '--class', '{}:nm-applet'.format( + self.proxy.name)], + stdout=subprocess.DEVNULL), 0, "nm-applet window not found") + self.assertEqual(self.run_cmd(self.testvm1, self.ping6_ip), 0, + "Ping by IP failed (after NM reconnection") + self.assertEqual(self.run_cmd(self.testvm1, self.ping6_name), 0, + "Ping by name failed (after NM reconnection)") + + + def test_530_ipv6_firewallvm_firewall(self): + self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM, + name=self.make_vm_name('proxy'), + label='red') + self.proxy.provides_network = True + self.loop.run_until_complete(self.proxy.create_on_disk()) + self.proxy.netvm = self.testnetvm + self.testvm1.netvm = self.proxy + self.app.save() + + if self.run_cmd(self.testnetvm, 'ncat -h') != 0: + self.skipTest('nmap ncat not installer') + + # block all for first + + self.testvm1.firewall.rules = [qubes.firewall.Rule(action='drop')] + self.testvm1.firewall.save() + self.loop.run_until_complete(self.testvm1.start()) + self.assertTrue(self.proxy.is_running()) + + nc = self.loop.run_until_complete(self.testnetvm.run( + 'ncat -l --send-only -e /bin/hostname -k 1234')) + + try: + self.assertEqual(self.run_cmd(self.proxy, self.ping6_ip), 0, + "Ping by IP from ProxyVM failed") + self.assertEqual(self.run_cmd(self.proxy, self.ping6_name), 0, + "Ping by name from ProxyVM failed") + self.assertNotEqual(self.run_cmd(self.testvm1, self.ping6_ip), 0, + "Ping by IP should be blocked") + + nc_cmd = "ncat -w 1 --recv-only {} 1234".format(self.test_ip6) + self.assertNotEqual(self.run_cmd(self.testvm1, nc_cmd), 0, + "TCP connection should be blocked") + + # block all except ICMP + + self.testvm1.firewall.rules = [( + qubes.firewall.Rule(None, action='accept', proto='icmp') + )] + self.testvm1.firewall.save() + # Ugly hack b/c there is no feedback when the rules are actually + # applied + time.sleep(3) + self.assertEqual(self.run_cmd(self.testvm1, self.ping6_ip), 0, + "Ping by IP failed (should be allowed now)") + self.assertNotEqual(self.run_cmd(self.testvm1, self.ping6_name), 0, + "Ping by name should be blocked") + + # all TCP still blocked + + self.testvm1.firewall.rules = [ + qubes.firewall.Rule(None, action='accept', proto='icmp'), + qubes.firewall.Rule(None, action='accept', specialtarget='dns'), + ] + self.testvm1.firewall.save() + + # Ugly hack b/c there is no feedback when the rules are actually + # applied + time.sleep(3) + self.assertEqual(self.run_cmd(self.testvm1, self.ping6_name), 0, + "Ping by name failed (should be allowed now)") + self.assertNotEqual(self.run_cmd(self.testvm1, nc_cmd), 0, + "TCP connection should be blocked") + + # block all except target + + self.testvm1.firewall.rules = [ + qubes.firewall.Rule(None, action='accept', dsthost=self.test_ip6, + proto='tcp', dstports=1234), + ] + self.testvm1.firewall.save() + + # Ugly hack b/c there is no feedback when the rules are actually + # applied + time.sleep(3) + self.assertEqual(self.run_cmd(self.testvm1, nc_cmd), 0, + "TCP connection failed (should be allowed now)") + + # block all except target - by name + + self.testvm1.firewall.rules = [ + qubes.firewall.Rule(None, action='accept', + dsthost=self.test_name, + proto='tcp', dstports=1234), + ] + self.testvm1.firewall.save() + + # Ugly hack b/c there is no feedback when the rules are actually + # applied + time.sleep(3) + self.assertEqual(self.run_cmd(self.testvm1, nc_cmd), 0, + "TCP (IPv6) connection failed (should be allowed now)") + self.assertEqual(self.run_cmd(self.testvm1, + nc_cmd.replace(self.test_ip6, self.test_ip)), + 0, + "TCP (IPv4) connection failed (should be allowed now)") + + # allow all except target + + self.testvm1.firewall.rules = [ + qubes.firewall.Rule(None, action='drop', dsthost=self.test_ip6, + proto='tcp', dstports=1234), + qubes.firewall.Rule(action='accept'), + ] + self.testvm1.firewall.save() + + # Ugly hack b/c there is no feedback when the rules are actually + # applied + time.sleep(3) + self.assertNotEqual(self.run_cmd(self.testvm1, nc_cmd), 0, + "TCP connection should be blocked") + finally: + nc.terminate() + self.loop.run_until_complete(nc.wait()) + + + def test_540_ipv6_inter_vm(self): + self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM, + name=self.make_vm_name('proxy'), + label='red') + self.loop.run_until_complete(self.proxy.create_on_disk()) + self.proxy.provides_network = True + self.proxy.netvm = self.testnetvm + self.testvm1.netvm = self.proxy + + self.testvm2 = self.app.add_new_vm(qubes.vm.appvm.AppVM, + name=self.make_vm_name('vm2'), + label='red') + self.loop.run_until_complete(self.testvm2.create_on_disk()) + self.testvm2.netvm = self.proxy + self.app.save() + + self.loop.run_until_complete(asyncio.wait([ + self.testvm1.start(), + self.testvm2.start()])) + + self.assertNotEqual(self.run_cmd(self.testvm1, + self.ping_cmd.format(target=self.testvm2.ip6)), 0) + + self.testvm2.netvm = self.testnetvm + + self.assertNotEqual(self.run_cmd(self.testvm1, + self.ping_cmd.format(target=self.testvm2.ip6)), 0) + self.assertNotEqual(self.run_cmd(self.testvm2, + self.ping_cmd.format(target=self.testvm1.ip6)), 0) + + self.testvm1.netvm = self.testnetvm + + self.assertNotEqual(self.run_cmd(self.testvm1, + self.ping_cmd.format(target=self.testvm2.ip6)), 0) + self.assertNotEqual(self.run_cmd(self.testvm2, + self.ping_cmd.format(target=self.testvm1.ip6)), 0) + + + + def test_550_ipv6_spoof_ip(self): + """Test if VM IP spoofing is blocked""" + self.loop.run_until_complete(self.testvm1.start()) + + self.assertEqual(self.run_cmd(self.testvm1, self.ping6_ip), 0) + # add a simple rule counting packets + self.assertEqual(self.run_cmd(self.testnetvm, + 'ip6tables -I INPUT -i vif+ ! -s {} -p icmpv6 -j LOG'.format( + self.testvm1.ip6)), 0) + self.loop.run_until_complete(self.testvm1.run_for_stdio( + 'ip -6 addr flush dev eth0 && ' + 'ip -6 addr add {}/128 dev eth0 && ' + 'ip -6 route add default via {} dev eth0'.format( + self.testvm1.visible_ip6 + '1', + self.testvm1.visible_gateway6), + user='root')) + self.assertNotEqual(self.run_cmd(self.testvm1, self.ping6_ip), 0, + "Spoofed ping should be blocked") + try: + (output, _) = self.loop.run_until_complete( + self.testnetvm.run_for_stdio('ip6tables -nxvL INPUT', + user='root')) + except subprocess.CalledProcessError: + self.fail('ip6tables -nxvL INPUT failed') + + output = output.decode().splitlines() + packets = output[2].lstrip().split()[0] + self.assertEquals(packets, '0', 'Some packet hit the INPUT rule') + + def test_710_ipv6_custom_ip_simple(self): + '''Custom AppVM IP''' + self.testvm1.ip6 = '2000:aaaa:bbbb::1' + self.app.save() + self.loop.run_until_complete(self.testvm1.start()) + self.assertEqual(self.run_cmd(self.testvm1, self.ping6_ip), 0) + self.assertEqual(self.run_cmd(self.testvm1, self.ping6_name), 0) + + def test_711_ipv6_custom_ip_proxy(self): + '''Custom ProxyVM IP''' + self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM, + name=self.make_vm_name('proxy'), + label='red') + self.loop.run_until_complete(self.proxy.create_on_disk()) + self.proxy.provides_network = True + self.proxy.netvm = self.testnetvm + self.testvm1.ip6 = '2000:aaaa:bbbb::1' + self.testvm1.netvm = self.proxy + self.app.save() + + self.loop.run_until_complete(self.testvm1.start()) + + self.assertEqual(self.run_cmd(self.testvm1, self.ping6_ip), 0) + self.assertEqual(self.run_cmd(self.testvm1, self.ping6_name), 0) + + def test_712_ipv6_custom_ip_firewall(self): + '''Custom VM IP and firewall''' + self.testvm1.ip6 = '2000:aaaa:bbbb::1' + + self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM, + name=self.make_vm_name('proxy'), + label='red') + self.proxy.provides_network = True + self.loop.run_until_complete(self.proxy.create_on_disk()) + self.proxy.netvm = self.testnetvm + self.testvm1.netvm = self.proxy + self.app.save() + + if self.run_cmd(self.testnetvm, 'nc -h 2>&1|grep -q nmap.org') == 0: + nc_version = NcVersion.Nmap + else: + nc_version = NcVersion.Trad + + # block all but ICMP and DNS + + self.testvm1.firewall.rules = [ + qubes.firewall.Rule(None, action='accept', proto='icmp'), + qubes.firewall.Rule(None, action='accept', specialtarget='dns'), + ] + self.testvm1.firewall.save() + self.loop.run_until_complete(self.testvm1.start()) + self.assertTrue(self.proxy.is_running()) + + nc = self.loop.run_until_complete(self.testnetvm.run( + 'nc -l --send-only -e /bin/hostname -k 1234' + if nc_version == NcVersion.Nmap + else 'while nc -l -e /bin/hostname -p 1234; do true; done')) + + try: + self.assertEqual(self.run_cmd(self.proxy, self.ping6_ip), 0, + "Ping by IP from ProxyVM failed") + self.assertEqual(self.run_cmd(self.proxy, self.ping6_name), 0, + "Ping by name from ProxyVM failed") + self.assertEqual(self.run_cmd(self.testvm1, self.ping6_ip), 0, + "Ping by IP should be allowed") + self.assertEqual(self.run_cmd(self.testvm1, self.ping6_name), 0, + "Ping by name should be allowed") + if nc_version == NcVersion.Nmap: + nc_cmd = "nc -w 1 --recv-only {} 1234".format(self.test_ip6) + else: + nc_cmd = "nc -w 1 {} 1234".format(self.test_ip6) + self.assertNotEqual(self.run_cmd(self.testvm1, nc_cmd), 0, + "TCP connection should be blocked") + finally: + nc.terminate() + self.loop.run_until_complete(nc.wait()) # noinspection PyAttributeOutsideInit class VmUpdatesMixin(object): @@ -960,6 +1329,11 @@ def load_tests(loader, tests, pattern): 'VmNetworking_' + template, (VmNetworkingMixin, qubes.tests.SystemTestCase), {'template': template}))) + tests.addTests(loader.loadTestsFromTestCase( + type( + 'VmIPv6Networking_' + template, + (VmIPv6NetworkingMixin, qubes.tests.SystemTestCase), + {'template': template}))) tests.addTests(loader.loadTestsFromTestCase( type( 'VmUpdates_' + template,