diff --git a/qubes/tests/integ/network.py b/qubes/tests/integ/network.py index 524dd5b8..1b4c9e04 100644 --- a/qubes/tests/integ/network.py +++ b/qubes/tests/integ/network.py @@ -32,10 +32,6 @@ import qubes.firewall import qubes.vm.qubesvm import qubes.vm.appvm -class NcVersion: - Trad = 1 - Nmap = 2 - # noinspection PyAttributeOutsideInit,PyPep8Naming class VmNetworkingMixin(object): @@ -63,18 +59,6 @@ class VmNetworkingMixin(object): return e.returncode return 0 - def check_nc_version(self, vm): - ''' - :type self: qubes.tests.SystemTestCase | VMNetworkingMixin - :param vm: VM where check ncat version in - ''' - if self.run_cmd(vm, 'nc -h >/dev/null 2>&1') != 0: - self.skipTest('nc not installed') - if self.run_cmd(vm, 'nc -h 2>&1|grep -q nmap.org') == 0: - return NcVersion.Nmap - else: - return NcVersion.Trad - def setUp(self): ''' :type self: qubes.tests.SystemTestCase | VMNetworkingMixin @@ -228,8 +212,6 @@ class VmNetworkingMixin(object): self.testvm1.netvm = self.proxy self.app.save() - nc_version = self.check_nc_version(self.testnetvm) - # block all for first self.testvm1.firewall.rules = [qubes.firewall.Rule(action='drop')] @@ -237,10 +219,8 @@ class VmNetworkingMixin(object): self.loop.run_until_complete(self.testvm1.start()) self.assertTrue(self.proxy.is_running()) - nc = self.loop.run_until_complete(self.testnetvm.run( - 'nc -l --send-only -e /bin/hostname -k 1234' - if nc_version == NcVersion.Nmap - else 'while nc -l -e /bin/hostname -p 1234; do true; done')) + server = self.loop.run_until_complete(self.testnetvm.run( + 'socat TCP-LISTEN:1234,fork EXEC:/bin/hostname')) try: self.assertEqual(self.run_cmd(self.proxy, self.ping_ip), 0, 
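Review bot: Nothing in the `@@ -237,10 +219,8 @@` hunk checks that `socat` is installed or that the listener actually started, and the commit removes the only availability guard (`check_nc_version` with its `skipTest`). If socat is missing in `testnetvm` or `testvm1`, the client command presumably exits non-zero anyway, so every `assertNotEqual(..., 0, "TCP connection should be blocked")` passes without the firewall having been exercised. The tests whose only TCP assertion is the negative one (hunks `-505,15`, `-708,15` and `-1107,15`) would then report a working firewall vacuously. I would add a guard before starting the server, for both VMs, such as `if self.run_cmd(vm, 'command -v socat >/dev/null 2>&1') != 0: self.skipTest('socat not installed')`, or fail outright if socat is a hard requirement of the test templates; the same applies to the server lines in hunks `-491,10`, `-694,10`, `-862,8` and `-1093,10`. The test method starts before this hunk and continues past it.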
@@ -250,11 +230,8 @@ class VmNetworkingMixin(object): self.assertNotEqual(self.run_cmd(self.testvm1, self.ping_ip), 0, "Ping by IP should be blocked") - if nc_version == NcVersion.Nmap: - nc_cmd = "nc -w 1 --recv-only {} 1234".format(self.test_ip) - else: - nc_cmd = "nc -w 1 {} 1234".format(self.test_ip) - self.assertNotEqual(self.run_cmd(self.testvm1, nc_cmd), 0, + client_cmd = "socat TCP:{}:1234 -".format(self.test_ip) + self.assertNotEqual(self.run_cmd(self.testvm1, client_cmd), 0, "TCP connection should be blocked") # block all except ICMP @@ -283,7 +260,7 @@ class VmNetworkingMixin(object): time.sleep(3) self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0, "Ping by name failed (should be allowed now)") - self.assertNotEqual(self.run_cmd(self.testvm1, nc_cmd), 0, + self.assertNotEqual(self.run_cmd(self.testvm1, client_cmd), 0, "TCP connection should be blocked") # block all except target @@ -297,7 +274,7 @@ class VmNetworkingMixin(object): # Ugly hack b/c there is no feedback when the rules are actually # applied time.sleep(3) - self.assertEqual(self.run_cmd(self.testvm1, nc_cmd), 0, + self.assertEqual(self.run_cmd(self.testvm1, client_cmd), 0, "TCP connection failed (should be allowed now)") # allow all except target @@ -312,11 +289,11 @@ class VmNetworkingMixin(object): # Ugly hack b/c there is no feedback when the rules are actually # applied time.sleep(3) - self.assertNotEqual(self.run_cmd(self.testvm1, nc_cmd), 0, + self.assertNotEqual(self.run_cmd(self.testvm1, client_cmd), 0, "TCP connection should be blocked") finally: - nc.terminate() - self.loop.run_until_complete(nc.wait()) + server.terminate() + self.loop.run_until_complete(server.wait()) def test_040_inter_vm(self): @@ -479,8 +456,6 @@ class VmNetworkingMixin(object): self.testvm1.netvm = self.proxy self.app.save() - nc_version = self.check_nc_version(self.testnetvm) - # block all but ICMP and DNS self.testvm1.firewall.rules = [ 
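Review bot: `client_cmd = "socat TCP:{}:1234 -".format(self.test_ip)` in the `@@ -250,11 +230,8 @@` hunk loses the one-second bound that `-w 1` gave the old `nc` client. If the `drop` rule discards the SYN silently rather than rejecting it (not visible from this hunk), each "should be blocked" assertion waits for the kernel's connect timeout instead of about a second, which lengthens the run considerably. socat's `connect-timeout` address option restores the bound: `client_cmd = "socat TCP:{}:1234,connect-timeout=1 -".format(self.test_ip)`. The same applies to the client commands in hunks `-505,15`, `-708,15`, `-873,8` and `-1107,15`. A short comment next to the negative assertions noting that a non-zero exit is being read as "blocked" would also help the next reader.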
@@ -491,10 +466,8 @@ class VmNetworkingMixin(object): self.loop.run_until_complete(self.testvm1.start()) self.assertTrue(self.proxy.is_running()) - nc = self.loop.run_until_complete(self.testnetvm.run( - 'nc -l --send-only -e /bin/hostname -k 1234' - if nc_version == NcVersion.Nmap - else 'while nc -l -e /bin/hostname -p 1234; do true; done')) + server = self.loop.run_until_complete(self.testnetvm.run( + 'socat TCP-LISTEN:1234,fork EXEC:/bin/hostname')) try: self.assertEqual(self.run_cmd(self.proxy, self.ping_ip), 0, @@ -505,15 +478,12 @@ class VmNetworkingMixin(object): "Ping by IP should be allowed") self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0, "Ping by name should be allowed") - if nc_version == NcVersion.Nmap: - nc_cmd = "nc -w 1 --recv-only {} 1234".format(self.test_ip) - else: - nc_cmd = "nc -w 1 {} 1234".format(self.test_ip) - self.assertNotEqual(self.run_cmd(self.testvm1, nc_cmd), 0, + client_cmd = "socat TCP:{}:1234 -".format(self.test_ip) + self.assertNotEqual(self.run_cmd(self.testvm1, client_cmd), 0, "TCP connection should be blocked") finally: - nc.terminate() - self.loop.run_until_complete(nc.wait()) + server.terminate() + self.loop.run_until_complete(server.wait()) def test_203_fake_ip_inter_vm_allow(self): '''Access VM with "fake IP" from other VM (when firewall allows) @@ -682,8 +652,6 @@ class VmNetworkingMixin(object): self.testvm1.netvm = self.proxy self.app.save() - nc_version = self.check_nc_version(self.testnetvm) - # block all but ICMP and DNS self.testvm1.firewall.rules = [ 
@@ -694,10 +662,8 @@ class VmNetworkingMixin(object): self.loop.run_until_complete(self.testvm1.start()) self.assertTrue(self.proxy.is_running()) - nc = self.loop.run_until_complete(self.testnetvm.run( - 'nc -l --send-only -e /bin/hostname -k 1234' - if nc_version == NcVersion.Nmap - else 'while nc -l -e /bin/hostname -p 1234; do true; done')) + server = self.loop.run_until_complete(self.testnetvm.run( + 'socat TCP-LISTEN:1234,fork EXEC:/bin/hostname')) try: self.assertEqual(self.run_cmd(self.proxy, self.ping_ip), 0, @@ -708,15 +674,12 @@ class VmNetworkingMixin(object): "Ping by IP should be allowed") self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0, "Ping by name should be allowed") - if nc_version == NcVersion.Nmap: - nc_cmd = "nc -w 1 --recv-only {} 1234".format(self.test_ip) - else: - nc_cmd = "nc -w 1 {} 1234".format(self.test_ip) - self.assertNotEqual(self.run_cmd(self.testvm1, nc_cmd), 0, + client_cmd = "socat TCP:{}:1234 -".format(self.test_ip) + self.assertNotEqual(self.run_cmd(self.testvm1, client_cmd), 0, "TCP connection should be blocked") finally: - nc.terminate() - self.loop.run_until_complete(nc.wait()) + server.terminate() + self.loop.run_until_complete(server.wait()) # noinspection PyAttributeOutsideInit,PyPep8Naming class VmIPv6NetworkingMixin(VmNetworkingMixin): @@ -852,9 +815,6 @@ class VmIPv6NetworkingMixin(VmNetworkingMixin): self.testvm1.netvm = self.proxy self.app.save() - if self.run_cmd(self.testnetvm, 'ncat -h') != 0: - self.skipTest('nmap ncat not installed') - # block all for first self.testvm1.firewall.rules = [qubes.firewall.Rule(action='drop')] 
@@ -862,8 +822,8 @@ class VmIPv6NetworkingMixin(VmNetworkingMixin): self.loop.run_until_complete(self.testvm1.start()) self.assertTrue(self.proxy.is_running()) - nc = self.loop.run_until_complete(self.testnetvm.run( - 'ncat -l --send-only -e /bin/hostname -k 1234')) + server = self.loop.run_until_complete(self.testnetvm.run( + 'socat TCP6-LISTEN:1234,fork EXEC:/bin/hostname')) try: self.assertEqual(self.run_cmd(self.proxy, self.ping6_ip), 0, @@ -873,8 +833,9 @@ class VmIPv6NetworkingMixin(VmNetworkingMixin): self.assertNotEqual(self.run_cmd(self.testvm1, self.ping6_ip), 0, "Ping by IP should be blocked") - nc_cmd = "ncat -w 1 --recv-only {} 1234".format(self.test_ip6) - self.assertNotEqual(self.run_cmd(self.testvm1, nc_cmd), 0, + client6_cmd = "socat TCP:[{}]:1234 -".format(self.test_ip6) + client4_cmd = "socat TCP:{}:1234 -".format(self.test_ip) + self.assertNotEqual(self.run_cmd(self.testvm1, client6_cmd), 0, "TCP connection should be blocked") # block all except ICMP @@ -904,7 +865,7 @@ class VmIPv6NetworkingMixin(VmNetworkingMixin): time.sleep(3) self.assertEqual(self.run_cmd(self.testvm1, self.ping6_name), 0, "Ping by name failed (should be allowed now)") - self.assertNotEqual(self.run_cmd(self.testvm1, nc_cmd), 0, + self.assertNotEqual(self.run_cmd(self.testvm1, client6_cmd), 0, "TCP connection should be blocked") # block all except target @@ -919,7 +880,7 @@ class VmIPv6NetworkingMixin(VmNetworkingMixin): # Ugly hack b/c there is no feedback when the rules are actually # applied time.sleep(3) - self.assertEqual(self.run_cmd(self.testvm1, nc_cmd), 0, + self.assertEqual(self.run_cmd(self.testvm1, client6_cmd), 0, "TCP connection failed (should be allowed now)") # block all except target - by name 
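Review bot: `client4_cmd` is introduced in the `@@ -873,8 +833,9 @@` hunk, but the only listener this test starts is `socat TCP6-LISTEN:1234,fork EXEC:/bin/hostname`, so the later "TCP (IPv4) connection failed" assertion (hunk `-934,10`) depends on that IPv6 socket also accepting IPv4 connections. That depends on the VM's `net.ipv6.bindv6only` setting rather than on anything the test controls, and a failure there would look like a firewall regression. Making it explicit with socat's `ipv6only` option, `'socat TCP6-LISTEN:1234,ipv6only=0,fork EXEC:/bin/hostname'`, or at least a comment such as `# TCP6-LISTEN must stay dual-stack: client4_cmd connects over IPv4`, would document the dependency. The test body starts before and ends after these hunks.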
@@ -934,10 +895,9 @@ class VmIPv6NetworkingMixin(VmNetworkingMixin): # Ugly hack b/c there is no feedback when the rules are actually # applied time.sleep(3) - self.assertEqual(self.run_cmd(self.testvm1, nc_cmd), 0, + self.assertEqual(self.run_cmd(self.testvm1, client6_cmd), 0, "TCP (IPv6) connection failed (should be allowed now)") - self.assertEqual(self.run_cmd(self.testvm1, - nc_cmd.replace(self.test_ip6, self.test_ip)), + self.assertEqual(self.run_cmd(self.testvm1, client4_cmd), 0, "TCP (IPv4) connection failed (should be allowed now)") @@ -953,11 +913,11 @@ class VmIPv6NetworkingMixin(VmNetworkingMixin): # Ugly hack b/c there is no feedback when the rules are actually # applied time.sleep(3) - self.assertNotEqual(self.run_cmd(self.testvm1, nc_cmd), 0, + self.assertNotEqual(self.run_cmd(self.testvm1, client6_cmd), 0, "TCP connection should be blocked") finally: - nc.terminate() - self.loop.run_until_complete(nc.wait()) + server.terminate() + self.loop.run_until_complete(server.wait()) def test_540_ipv6_inter_vm(self): @@ -1081,8 +1041,6 @@ class VmIPv6NetworkingMixin(VmNetworkingMixin): self.testvm1.netvm = self.proxy self.app.save() - nc_version = self.check_nc_version(self.testnetvm) - # block all but ICMP and DNS self.testvm1.firewall.rules = [ @@ -1093,10 +1051,8 @@ class VmIPv6NetworkingMixin(VmNetworkingMixin): self.loop.run_until_complete(self.testvm1.start()) self.assertTrue(self.proxy.is_running()) - nc = self.loop.run_until_complete(self.testnetvm.run( - 'nc -l --send-only -e /bin/hostname -k 1234' - if nc_version == NcVersion.Nmap - else 'while nc -l -e /bin/hostname -p 1234; do true; done')) + server = self.loop.run_until_complete(self.testnetvm.run( + 'socat TCP6-LISTEN:1234,fork EXEC:/bin/hostname')) try: self.assertEqual(self.run_cmd(self.proxy, self.ping6_ip), 0, 
@@ -1107,15 +1063,12 @@ class VmIPv6NetworkingMixin(VmNetworkingMixin): "Ping by IP should be allowed") self.assertEqual(self.run_cmd(self.testvm1, self.ping6_name), 0, "Ping by name should be allowed") - if nc_version == NcVersion.Nmap: - nc_cmd = "nc -w 1 --recv-only {} 1234".format(self.test_ip6) - else: - nc_cmd = "nc -w 1 {} 1234".format(self.test_ip6) - self.assertNotEqual(self.run_cmd(self.testvm1, nc_cmd), 0, + client_cmd = "socat TCP:[{}]:1234 -".format(self.test_ip6) + self.assertNotEqual(self.run_cmd(self.testvm1, client_cmd), 0, "TCP connection should be blocked") finally: - nc.terminate() - self.loop.run_until_complete(nc.wait()) + server.terminate() + self.loop.run_until_complete(server.wait()) # noinspection PyAttributeOutsideInit,PyPep8Naming class VmUpdatesMixin(object):