Forráskód Böngészése

tests: add integration tests for IPv6

Run also all IPv4 tests with IPv6 enabled to check for regressions
(broken IPv4 because of enabled IPv6).

QubesOS/qubes-issues#718
Marek Marczykowski-Górecki 6 éve
szülő
commit
f3cf58e6f2
1 módosított fájl, 374 hozzáadás és 0 törlés
  1. 374 0
      qubes/tests/integ/network.py

+ 374 - 0
qubes/tests/integ/network.py

@@ -655,6 +655,375 @@ class VmNetworkingMixin(object):
             nc.terminate()
             self.loop.run_until_complete(nc.wait())
 
+class VmIPv6NetworkingMixin(VmNetworkingMixin):
+    test_ip6 = '2000:abcd::1'
+
+    ping6_cmd = 'ping6 -W 1 -n -c 1 {target}'
+
+    def setUp(self):
+        super(VmIPv6NetworkingMixin, self).setUp()
+        self.ping6_ip = self.ping6_cmd.format(target=self.test_ip6)
+        self.ping6_name = self.ping6_cmd.format(target=self.test_name)
+
+    def configure_netvm(self):
+        self.testnetvm.features['ipv6'] = True
+        super(VmIPv6NetworkingMixin, self).configure_netvm()
+
+        def run_netvm_cmd(cmd):
+            if self.run_cmd(self.testnetvm, cmd) != 0:
+                self.fail("Command '%s' failed" % cmd)
+
+        run_netvm_cmd("ip addr add {}/128 dev test0".format(self.test_ip6))
+        run_netvm_cmd(
+            "ip6tables -I INPUT -d {} -j ACCEPT".format(self.test_ip6))
+        # ignore failure
+        self.run_cmd(self.testnetvm, "killall --wait dnsmasq")
+        run_netvm_cmd(
+            "dnsmasq -a {ip} -A /{name}/{ip} -A /{name}/{ip6} -i test0 -z".
+            format(ip=self.test_ip, ip6=self.test_ip6, name=self.test_name))
+
+    def test_500_ipv6_simple_networking(self):
+        self.loop.run_until_complete(self.testvm1.start())
+        self.assertEqual(self.run_cmd(self.testvm1, self.ping6_ip), 0)
+        self.assertEqual(self.run_cmd(self.testvm1, self.ping6_name), 0)
+
+
+    def test_510_ipv6_simple_proxyvm(self):
+        self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
+            name=self.make_vm_name('proxy'),
+            label='red')
+        self.proxy.provides_network = True
+        self.proxy.netvm = self.testnetvm
+        self.loop.run_until_complete(self.proxy.create_on_disk())
+        self.testvm1.netvm = self.proxy
+        self.app.save()
+
+        self.loop.run_until_complete(self.testvm1.start())
+        self.assertTrue(self.proxy.is_running())
+        self.assertEqual(self.run_cmd(self.proxy, self.ping6_ip), 0,
+                         "Ping by IP from ProxyVM failed")
+        self.assertEqual(self.run_cmd(self.proxy, self.ping6_name), 0,
+                         "Ping by name from ProxyVM failed")
+        self.assertEqual(self.run_cmd(self.testvm1, self.ping6_ip), 0,
+                         "Ping by IP from AppVM failed")
+        self.assertEqual(self.run_cmd(self.testvm1, self.ping6_name), 0,
+                         "Ping by IP from AppVM failed")
+
+
+    @qubes.tests.expectedFailureIfTemplate('debian-7')
+    @unittest.skipUnless(spawn.find_executable('xdotool'),
+                         "xdotool not installed")
+    def test_520_ipv6_simple_proxyvm_nm(self):
+        self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
+            name=self.make_vm_name('proxy'),
+            label='red')
+        self.proxy.provides_network = True
+        self.loop.run_until_complete(self.proxy.create_on_disk())
+        self.proxy.netvm = self.testnetvm
+        self.proxy.features['service.network-manager'] = True
+        self.testvm1.netvm = self.proxy
+        self.app.save()
+
+        self.loop.run_until_complete(self.testvm1.start())
+        self.assertTrue(self.proxy.is_running())
+        self.assertEqual(self.run_cmd(self.testvm1, self.ping6_ip), 0,
+                         "Ping by IP failed")
+        self.assertEqual(self.run_cmd(self.testvm1, self.ping6_name), 0,
+                         "Ping by name failed")
+
+        # reconnect to make sure that device was configured by NM
+        self.assertEqual(
+            self.run_cmd(self.proxy, "nmcli device disconnect eth0",
+                user="user"),
+            0, "Failed to disconnect eth0 using nmcli")
+
+        self.assertNotEqual(self.run_cmd(self.testvm1, self.ping6_ip), 0,
+            "Network should be disabled, but apparently it isn't")
+        self.assertEqual(
+            self.run_cmd(self.proxy,
+                'nmcli connection up "VM uplink eth0" ifname eth0',
+                user="user"),
+            0, "Failed to connect eth0 using nmcli")
+        self.assertEqual(self.run_cmd(self.proxy, "nm-online",
+            user="user"), 0,
+                         "Failed to wait for NM connection")
+
+        # wait for duplicate-address-detection to complete - by default it has
+        #  1s timeout
+        time.sleep(2)
+
+        # check for nm-applet presence
+        self.assertEqual(subprocess.call([
+            'xdotool', 'search', '--class', '{}:nm-applet'.format(
+                self.proxy.name)],
+            stdout=subprocess.DEVNULL), 0, "nm-applet window not found")
+        self.assertEqual(self.run_cmd(self.testvm1, self.ping6_ip), 0,
+                         "Ping by IP failed (after NM reconnection")
+        self.assertEqual(self.run_cmd(self.testvm1, self.ping6_name), 0,
+                         "Ping by name failed (after NM reconnection)")
+
+
+    def test_530_ipv6_firewallvm_firewall(self):
+        self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
+            name=self.make_vm_name('proxy'),
+            label='red')
+        self.proxy.provides_network = True
+        self.loop.run_until_complete(self.proxy.create_on_disk())
+        self.proxy.netvm = self.testnetvm
+        self.testvm1.netvm = self.proxy
+        self.app.save()
+
+        if self.run_cmd(self.testnetvm, 'ncat -h') != 0:
+            self.skipTest('nmap ncat not installer')
+
+        # block all for first
+
+        self.testvm1.firewall.rules = [qubes.firewall.Rule(action='drop')]
+        self.testvm1.firewall.save()
+        self.loop.run_until_complete(self.testvm1.start())
+        self.assertTrue(self.proxy.is_running())
+
+        nc = self.loop.run_until_complete(self.testnetvm.run(
+            'ncat -l --send-only -e /bin/hostname -k 1234'))
+
+        try:
+            self.assertEqual(self.run_cmd(self.proxy, self.ping6_ip), 0,
+                            "Ping by IP from ProxyVM failed")
+            self.assertEqual(self.run_cmd(self.proxy, self.ping6_name), 0,
+                            "Ping by name from ProxyVM failed")
+            self.assertNotEqual(self.run_cmd(self.testvm1, self.ping6_ip), 0,
+                            "Ping by IP should be blocked")
+
+            nc_cmd = "ncat -w 1 --recv-only {} 1234".format(self.test_ip6)
+            self.assertNotEqual(self.run_cmd(self.testvm1, nc_cmd), 0,
+                            "TCP connection should be blocked")
+
+            # block all except ICMP
+
+            self.testvm1.firewall.rules = [(
+                qubes.firewall.Rule(None, action='accept', proto='icmp')
+            )]
+            self.testvm1.firewall.save()
+            # Ugly hack b/c there is no feedback when the rules are actually
+            # applied
+            time.sleep(3)
+            self.assertEqual(self.run_cmd(self.testvm1, self.ping6_ip), 0,
+                            "Ping by IP failed (should be allowed now)")
+            self.assertNotEqual(self.run_cmd(self.testvm1, self.ping6_name), 0,
+                            "Ping by name should be blocked")
+
+            # all TCP still blocked
+
+            self.testvm1.firewall.rules = [
+                qubes.firewall.Rule(None, action='accept', proto='icmp'),
+                qubes.firewall.Rule(None, action='accept', specialtarget='dns'),
+            ]
+            self.testvm1.firewall.save()
+
+            # Ugly hack b/c there is no feedback when the rules are actually
+            # applied
+            time.sleep(3)
+            self.assertEqual(self.run_cmd(self.testvm1, self.ping6_name), 0,
+                            "Ping by name failed (should be allowed now)")
+            self.assertNotEqual(self.run_cmd(self.testvm1, nc_cmd), 0,
+                            "TCP connection should be blocked")
+
+            # block all except target
+
+            self.testvm1.firewall.rules = [
+                qubes.firewall.Rule(None, action='accept', dsthost=self.test_ip6,
+                    proto='tcp', dstports=1234),
+            ]
+            self.testvm1.firewall.save()
+
+            # Ugly hack b/c there is no feedback when the rules are actually
+            # applied
+            time.sleep(3)
+            self.assertEqual(self.run_cmd(self.testvm1, nc_cmd), 0,
+                            "TCP connection failed (should be allowed now)")
+
+            # block all except target - by name
+
+            self.testvm1.firewall.rules = [
+                qubes.firewall.Rule(None, action='accept',
+                    dsthost=self.test_name,
+                    proto='tcp', dstports=1234),
+            ]
+            self.testvm1.firewall.save()
+
+            # Ugly hack b/c there is no feedback when the rules are actually
+            # applied
+            time.sleep(3)
+            self.assertEqual(self.run_cmd(self.testvm1, nc_cmd), 0,
+                "TCP (IPv6) connection failed (should be allowed now)")
+            self.assertEqual(self.run_cmd(self.testvm1,
+                nc_cmd.replace(self.test_ip6, self.test_ip)),
+                0,
+                "TCP (IPv4) connection failed (should be allowed now)")
+
+            # allow all except target
+
+            self.testvm1.firewall.rules = [
+                qubes.firewall.Rule(None, action='drop', dsthost=self.test_ip6,
+                    proto='tcp', dstports=1234),
+                qubes.firewall.Rule(action='accept'),
+            ]
+            self.testvm1.firewall.save()
+
+            # Ugly hack b/c there is no feedback when the rules are actually
+            # applied
+            time.sleep(3)
+            self.assertNotEqual(self.run_cmd(self.testvm1, nc_cmd), 0,
+                            "TCP connection should be blocked")
+        finally:
+            nc.terminate()
+            self.loop.run_until_complete(nc.wait())
+
+
+    def test_540_ipv6_inter_vm(self):
+        self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
+            name=self.make_vm_name('proxy'),
+            label='red')
+        self.loop.run_until_complete(self.proxy.create_on_disk())
+        self.proxy.provides_network = True
+        self.proxy.netvm = self.testnetvm
+        self.testvm1.netvm = self.proxy
+
+        self.testvm2 = self.app.add_new_vm(qubes.vm.appvm.AppVM,
+            name=self.make_vm_name('vm2'),
+            label='red')
+        self.loop.run_until_complete(self.testvm2.create_on_disk())
+        self.testvm2.netvm = self.proxy
+        self.app.save()
+
+        self.loop.run_until_complete(asyncio.wait([
+            self.testvm1.start(),
+            self.testvm2.start()]))
+
+        self.assertNotEqual(self.run_cmd(self.testvm1,
+            self.ping_cmd.format(target=self.testvm2.ip6)), 0)
+
+        self.testvm2.netvm = self.testnetvm
+
+        self.assertNotEqual(self.run_cmd(self.testvm1,
+            self.ping_cmd.format(target=self.testvm2.ip6)), 0)
+        self.assertNotEqual(self.run_cmd(self.testvm2,
+            self.ping_cmd.format(target=self.testvm1.ip6)), 0)
+
+        self.testvm1.netvm = self.testnetvm
+
+        self.assertNotEqual(self.run_cmd(self.testvm1,
+            self.ping_cmd.format(target=self.testvm2.ip6)), 0)
+        self.assertNotEqual(self.run_cmd(self.testvm2,
+            self.ping_cmd.format(target=self.testvm1.ip6)), 0)
+
+
+
+    def test_550_ipv6_spoof_ip(self):
+        """Test if VM IP spoofing is blocked"""
+        self.loop.run_until_complete(self.testvm1.start())
+
+        self.assertEqual(self.run_cmd(self.testvm1, self.ping6_ip), 0)
+        # add a simple rule counting packets
+        self.assertEqual(self.run_cmd(self.testnetvm,
+            'ip6tables -I INPUT -i vif+ ! -s {} -p icmpv6 -j LOG'.format(
+                self.testvm1.ip6)), 0)
+        self.loop.run_until_complete(self.testvm1.run_for_stdio(
+            'ip -6 addr flush dev eth0 && '
+            'ip -6 addr add {}/128 dev eth0 && '
+            'ip -6 route add default via {} dev eth0'.format(
+                self.testvm1.visible_ip6 + '1',
+                self.testvm1.visible_gateway6),
+            user='root'))
+        self.assertNotEqual(self.run_cmd(self.testvm1, self.ping6_ip), 0,
+                         "Spoofed ping should be blocked")
+        try:
+            (output, _) = self.loop.run_until_complete(
+                self.testnetvm.run_for_stdio('ip6tables -nxvL INPUT',
+                    user='root'))
+        except subprocess.CalledProcessError:
+            self.fail('ip6tables -nxvL INPUT failed')
+
+        output = output.decode().splitlines()
+        packets = output[2].lstrip().split()[0]
+        self.assertEquals(packets, '0', 'Some packet hit the INPUT rule')
+
+    def test_710_ipv6_custom_ip_simple(self):
+        '''Custom AppVM IP'''
+        self.testvm1.ip6 = '2000:aaaa:bbbb::1'
+        self.app.save()
+        self.loop.run_until_complete(self.testvm1.start())
+        self.assertEqual(self.run_cmd(self.testvm1, self.ping6_ip), 0)
+        self.assertEqual(self.run_cmd(self.testvm1, self.ping6_name), 0)
+
+    def test_711_ipv6_custom_ip_proxy(self):
+        '''Custom ProxyVM IP'''
+        self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
+            name=self.make_vm_name('proxy'),
+            label='red')
+        self.loop.run_until_complete(self.proxy.create_on_disk())
+        self.proxy.provides_network = True
+        self.proxy.netvm = self.testnetvm
+        self.testvm1.ip6 = '2000:aaaa:bbbb::1'
+        self.testvm1.netvm = self.proxy
+        self.app.save()
+
+        self.loop.run_until_complete(self.testvm1.start())
+
+        self.assertEqual(self.run_cmd(self.testvm1, self.ping6_ip), 0)
+        self.assertEqual(self.run_cmd(self.testvm1, self.ping6_name), 0)
+
+    def test_712_ipv6_custom_ip_firewall(self):
+        '''Custom VM IP and firewall'''
+        self.testvm1.ip6 = '2000:aaaa:bbbb::1'
+
+        self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
+            name=self.make_vm_name('proxy'),
+            label='red')
+        self.proxy.provides_network = True
+        self.loop.run_until_complete(self.proxy.create_on_disk())
+        self.proxy.netvm = self.testnetvm
+        self.testvm1.netvm = self.proxy
+        self.app.save()
+
+        if self.run_cmd(self.testnetvm, 'nc -h 2>&1|grep -q nmap.org') == 0:
+            nc_version = NcVersion.Nmap
+        else:
+            nc_version = NcVersion.Trad
+
+        # block all but ICMP and DNS
+
+        self.testvm1.firewall.rules = [
+            qubes.firewall.Rule(None, action='accept', proto='icmp'),
+            qubes.firewall.Rule(None, action='accept', specialtarget='dns'),
+        ]
+        self.testvm1.firewall.save()
+        self.loop.run_until_complete(self.testvm1.start())
+        self.assertTrue(self.proxy.is_running())
+
+        nc = self.loop.run_until_complete(self.testnetvm.run(
+            'nc -l --send-only -e /bin/hostname -k 1234'
+            if nc_version == NcVersion.Nmap
+            else 'while nc -l -e /bin/hostname -p 1234; do true; done'))
+
+        try:
+            self.assertEqual(self.run_cmd(self.proxy, self.ping6_ip), 0,
+                            "Ping by IP from ProxyVM failed")
+            self.assertEqual(self.run_cmd(self.proxy, self.ping6_name), 0,
+                            "Ping by name from ProxyVM failed")
+            self.assertEqual(self.run_cmd(self.testvm1, self.ping6_ip), 0,
+                            "Ping by IP should be allowed")
+            self.assertEqual(self.run_cmd(self.testvm1, self.ping6_name), 0,
+                            "Ping by name should be allowed")
+            if nc_version == NcVersion.Nmap:
+                nc_cmd = "nc -w 1 --recv-only {} 1234".format(self.test_ip6)
+            else:
+                nc_cmd = "nc -w 1 {} 1234".format(self.test_ip6)
+            self.assertNotEqual(self.run_cmd(self.testvm1, nc_cmd), 0,
+                            "TCP connection should be blocked")
+        finally:
+            nc.terminate()
+            self.loop.run_until_complete(nc.wait())
 
 # noinspection PyAttributeOutsideInit
 class VmUpdatesMixin(object):
@@ -960,6 +1329,11 @@ def load_tests(loader, tests, pattern):
                 'VmNetworking_' + template,
                 (VmNetworkingMixin, qubes.tests.SystemTestCase),
                 {'template': template})))
+        tests.addTests(loader.loadTestsFromTestCase(
+            type(
+                'VmIPv6Networking_' + template,
+                (VmIPv6NetworkingMixin, qubes.tests.SystemTestCase),
+                {'template': template})))
         tests.addTests(loader.loadTestsFromTestCase(
             type(
                 'VmUpdates_' + template,