tests: use socat instead of nc

socat have only one variant, so one command line syntax to handle. It's
also installed by default in Qubes VMs.
This commit is contained in:
Marek Marczykowski-Górecki 2018-10-21 16:26:39 +02:00
parent 08ddeee9fb
commit a972c61914
No known key found for this signature in database
GPG Key ID: 063938BA42CFA724

View File

@ -32,10 +32,6 @@ import qubes.firewall
import qubes.vm.qubesvm import qubes.vm.qubesvm
import qubes.vm.appvm import qubes.vm.appvm
class NcVersion:
Trad = 1
Nmap = 2
# noinspection PyAttributeOutsideInit,PyPep8Naming # noinspection PyAttributeOutsideInit,PyPep8Naming
class VmNetworkingMixin(object): class VmNetworkingMixin(object):
@ -63,18 +59,6 @@ class VmNetworkingMixin(object):
return e.returncode return e.returncode
return 0 return 0
def check_nc_version(self, vm):
'''
:type self: qubes.tests.SystemTestCase | VMNetworkingMixin
:param vm: VM where check ncat version in
'''
if self.run_cmd(vm, 'nc -h >/dev/null 2>&1') != 0:
self.skipTest('nc not installed')
if self.run_cmd(vm, 'nc -h 2>&1|grep -q nmap.org') == 0:
return NcVersion.Nmap
else:
return NcVersion.Trad
def setUp(self): def setUp(self):
''' '''
:type self: qubes.tests.SystemTestCase | VMNetworkingMixin :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
@ -228,8 +212,6 @@ class VmNetworkingMixin(object):
self.testvm1.netvm = self.proxy self.testvm1.netvm = self.proxy
self.app.save() self.app.save()
nc_version = self.check_nc_version(self.testnetvm)
# block all for first # block all for first
self.testvm1.firewall.rules = [qubes.firewall.Rule(action='drop')] self.testvm1.firewall.rules = [qubes.firewall.Rule(action='drop')]
@ -237,10 +219,8 @@ class VmNetworkingMixin(object):
self.loop.run_until_complete(self.testvm1.start()) self.loop.run_until_complete(self.testvm1.start())
self.assertTrue(self.proxy.is_running()) self.assertTrue(self.proxy.is_running())
nc = self.loop.run_until_complete(self.testnetvm.run( server = self.loop.run_until_complete(self.testnetvm.run(
'nc -l --send-only -e /bin/hostname -k 1234' 'socat TCP-LISTEN:1234,fork EXEC:/bin/hostname'))
if nc_version == NcVersion.Nmap
else 'while nc -l -e /bin/hostname -p 1234; do true; done'))
try: try:
self.assertEqual(self.run_cmd(self.proxy, self.ping_ip), 0, self.assertEqual(self.run_cmd(self.proxy, self.ping_ip), 0,
@ -250,11 +230,8 @@ class VmNetworkingMixin(object):
self.assertNotEqual(self.run_cmd(self.testvm1, self.ping_ip), 0, self.assertNotEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
"Ping by IP should be blocked") "Ping by IP should be blocked")
if nc_version == NcVersion.Nmap: client_cmd = "socat TCP:{}:1234 -".format(self.test_ip)
nc_cmd = "nc -w 1 --recv-only {} 1234".format(self.test_ip) self.assertNotEqual(self.run_cmd(self.testvm1, client_cmd), 0,
else:
nc_cmd = "nc -w 1 {} 1234".format(self.test_ip)
self.assertNotEqual(self.run_cmd(self.testvm1, nc_cmd), 0,
"TCP connection should be blocked") "TCP connection should be blocked")
# block all except ICMP # block all except ICMP
@ -283,7 +260,7 @@ class VmNetworkingMixin(object):
time.sleep(3) time.sleep(3)
self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0, self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0,
"Ping by name failed (should be allowed now)") "Ping by name failed (should be allowed now)")
self.assertNotEqual(self.run_cmd(self.testvm1, nc_cmd), 0, self.assertNotEqual(self.run_cmd(self.testvm1, client_cmd), 0,
"TCP connection should be blocked") "TCP connection should be blocked")
# block all except target # block all except target
@ -297,7 +274,7 @@ class VmNetworkingMixin(object):
# Ugly hack b/c there is no feedback when the rules are actually # Ugly hack b/c there is no feedback when the rules are actually
# applied # applied
time.sleep(3) time.sleep(3)
self.assertEqual(self.run_cmd(self.testvm1, nc_cmd), 0, self.assertEqual(self.run_cmd(self.testvm1, client_cmd), 0,
"TCP connection failed (should be allowed now)") "TCP connection failed (should be allowed now)")
# allow all except target # allow all except target
@ -312,11 +289,11 @@ class VmNetworkingMixin(object):
# Ugly hack b/c there is no feedback when the rules are actually # Ugly hack b/c there is no feedback when the rules are actually
# applied # applied
time.sleep(3) time.sleep(3)
self.assertNotEqual(self.run_cmd(self.testvm1, nc_cmd), 0, self.assertNotEqual(self.run_cmd(self.testvm1, client_cmd), 0,
"TCP connection should be blocked") "TCP connection should be blocked")
finally: finally:
nc.terminate() server.terminate()
self.loop.run_until_complete(nc.wait()) self.loop.run_until_complete(server.wait())
def test_040_inter_vm(self): def test_040_inter_vm(self):
@ -479,8 +456,6 @@ class VmNetworkingMixin(object):
self.testvm1.netvm = self.proxy self.testvm1.netvm = self.proxy
self.app.save() self.app.save()
nc_version = self.check_nc_version(self.testnetvm)
# block all but ICMP and DNS # block all but ICMP and DNS
self.testvm1.firewall.rules = [ self.testvm1.firewall.rules = [
@ -491,10 +466,8 @@ class VmNetworkingMixin(object):
self.loop.run_until_complete(self.testvm1.start()) self.loop.run_until_complete(self.testvm1.start())
self.assertTrue(self.proxy.is_running()) self.assertTrue(self.proxy.is_running())
nc = self.loop.run_until_complete(self.testnetvm.run( server = self.loop.run_until_complete(self.testnetvm.run(
'nc -l --send-only -e /bin/hostname -k 1234' 'socat TCP-LISTEN:1234,fork EXEC:/bin/hostname'))
if nc_version == NcVersion.Nmap
else 'while nc -l -e /bin/hostname -p 1234; do true; done'))
try: try:
self.assertEqual(self.run_cmd(self.proxy, self.ping_ip), 0, self.assertEqual(self.run_cmd(self.proxy, self.ping_ip), 0,
@ -505,15 +478,12 @@ class VmNetworkingMixin(object):
"Ping by IP should be allowed") "Ping by IP should be allowed")
self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0, self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0,
"Ping by name should be allowed") "Ping by name should be allowed")
if nc_version == NcVersion.Nmap: client_cmd = "socat TCP:{}:1234 -".format(self.test_ip)
nc_cmd = "nc -w 1 --recv-only {} 1234".format(self.test_ip) self.assertNotEqual(self.run_cmd(self.testvm1, client_cmd), 0,
else:
nc_cmd = "nc -w 1 {} 1234".format(self.test_ip)
self.assertNotEqual(self.run_cmd(self.testvm1, nc_cmd), 0,
"TCP connection should be blocked") "TCP connection should be blocked")
finally: finally:
nc.terminate() server.terminate()
self.loop.run_until_complete(nc.wait()) self.loop.run_until_complete(server.wait())
def test_203_fake_ip_inter_vm_allow(self): def test_203_fake_ip_inter_vm_allow(self):
'''Access VM with "fake IP" from other VM (when firewall allows) '''Access VM with "fake IP" from other VM (when firewall allows)
@ -682,8 +652,6 @@ class VmNetworkingMixin(object):
self.testvm1.netvm = self.proxy self.testvm1.netvm = self.proxy
self.app.save() self.app.save()
nc_version = self.check_nc_version(self.testnetvm)
# block all but ICMP and DNS # block all but ICMP and DNS
self.testvm1.firewall.rules = [ self.testvm1.firewall.rules = [
@ -694,10 +662,8 @@ class VmNetworkingMixin(object):
self.loop.run_until_complete(self.testvm1.start()) self.loop.run_until_complete(self.testvm1.start())
self.assertTrue(self.proxy.is_running()) self.assertTrue(self.proxy.is_running())
nc = self.loop.run_until_complete(self.testnetvm.run( server = self.loop.run_until_complete(self.testnetvm.run(
'nc -l --send-only -e /bin/hostname -k 1234' 'socat TCP-LISTEN:1234,fork EXEC:/bin/hostname'))
if nc_version == NcVersion.Nmap
else 'while nc -l -e /bin/hostname -p 1234; do true; done'))
try: try:
self.assertEqual(self.run_cmd(self.proxy, self.ping_ip), 0, self.assertEqual(self.run_cmd(self.proxy, self.ping_ip), 0,
@ -708,15 +674,12 @@ class VmNetworkingMixin(object):
"Ping by IP should be allowed") "Ping by IP should be allowed")
self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0, self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0,
"Ping by name should be allowed") "Ping by name should be allowed")
if nc_version == NcVersion.Nmap: client_cmd = "socat TCP:{}:1234 -".format(self.test_ip)
nc_cmd = "nc -w 1 --recv-only {} 1234".format(self.test_ip) self.assertNotEqual(self.run_cmd(self.testvm1, client_cmd), 0,
else:
nc_cmd = "nc -w 1 {} 1234".format(self.test_ip)
self.assertNotEqual(self.run_cmd(self.testvm1, nc_cmd), 0,
"TCP connection should be blocked") "TCP connection should be blocked")
finally: finally:
nc.terminate() server.terminate()
self.loop.run_until_complete(nc.wait()) self.loop.run_until_complete(server.wait())
# noinspection PyAttributeOutsideInit,PyPep8Naming # noinspection PyAttributeOutsideInit,PyPep8Naming
class VmIPv6NetworkingMixin(VmNetworkingMixin): class VmIPv6NetworkingMixin(VmNetworkingMixin):
@ -852,9 +815,6 @@ class VmIPv6NetworkingMixin(VmNetworkingMixin):
self.testvm1.netvm = self.proxy self.testvm1.netvm = self.proxy
self.app.save() self.app.save()
if self.run_cmd(self.testnetvm, 'ncat -h') != 0:
self.skipTest('nmap ncat not installed')
# block all for first # block all for first
self.testvm1.firewall.rules = [qubes.firewall.Rule(action='drop')] self.testvm1.firewall.rules = [qubes.firewall.Rule(action='drop')]
@ -862,8 +822,8 @@ class VmIPv6NetworkingMixin(VmNetworkingMixin):
self.loop.run_until_complete(self.testvm1.start()) self.loop.run_until_complete(self.testvm1.start())
self.assertTrue(self.proxy.is_running()) self.assertTrue(self.proxy.is_running())
nc = self.loop.run_until_complete(self.testnetvm.run( server = self.loop.run_until_complete(self.testnetvm.run(
'ncat -l --send-only -e /bin/hostname -k 1234')) 'socat TCP6-LISTEN:1234,fork EXEC:/bin/hostname'))
try: try:
self.assertEqual(self.run_cmd(self.proxy, self.ping6_ip), 0, self.assertEqual(self.run_cmd(self.proxy, self.ping6_ip), 0,
@ -873,8 +833,9 @@ class VmIPv6NetworkingMixin(VmNetworkingMixin):
self.assertNotEqual(self.run_cmd(self.testvm1, self.ping6_ip), 0, self.assertNotEqual(self.run_cmd(self.testvm1, self.ping6_ip), 0,
"Ping by IP should be blocked") "Ping by IP should be blocked")
nc_cmd = "ncat -w 1 --recv-only {} 1234".format(self.test_ip6) client6_cmd = "socat TCP:[{}]:1234 -".format(self.test_ip6)
self.assertNotEqual(self.run_cmd(self.testvm1, nc_cmd), 0, client4_cmd = "socat TCP:{}:1234 -".format(self.test_ip)
self.assertNotEqual(self.run_cmd(self.testvm1, client6_cmd), 0,
"TCP connection should be blocked") "TCP connection should be blocked")
# block all except ICMP # block all except ICMP
@ -904,7 +865,7 @@ class VmIPv6NetworkingMixin(VmNetworkingMixin):
time.sleep(3) time.sleep(3)
self.assertEqual(self.run_cmd(self.testvm1, self.ping6_name), 0, self.assertEqual(self.run_cmd(self.testvm1, self.ping6_name), 0,
"Ping by name failed (should be allowed now)") "Ping by name failed (should be allowed now)")
self.assertNotEqual(self.run_cmd(self.testvm1, nc_cmd), 0, self.assertNotEqual(self.run_cmd(self.testvm1, client6_cmd), 0,
"TCP connection should be blocked") "TCP connection should be blocked")
# block all except target # block all except target
@ -919,7 +880,7 @@ class VmIPv6NetworkingMixin(VmNetworkingMixin):
# Ugly hack b/c there is no feedback when the rules are actually # Ugly hack b/c there is no feedback when the rules are actually
# applied # applied
time.sleep(3) time.sleep(3)
self.assertEqual(self.run_cmd(self.testvm1, nc_cmd), 0, self.assertEqual(self.run_cmd(self.testvm1, client6_cmd), 0,
"TCP connection failed (should be allowed now)") "TCP connection failed (should be allowed now)")
# block all except target - by name # block all except target - by name
@ -934,10 +895,9 @@ class VmIPv6NetworkingMixin(VmNetworkingMixin):
# Ugly hack b/c there is no feedback when the rules are actually # Ugly hack b/c there is no feedback when the rules are actually
# applied # applied
time.sleep(3) time.sleep(3)
self.assertEqual(self.run_cmd(self.testvm1, nc_cmd), 0, self.assertEqual(self.run_cmd(self.testvm1, client6_cmd), 0,
"TCP (IPv6) connection failed (should be allowed now)") "TCP (IPv6) connection failed (should be allowed now)")
self.assertEqual(self.run_cmd(self.testvm1, self.assertEqual(self.run_cmd(self.testvm1, client4_cmd),
nc_cmd.replace(self.test_ip6, self.test_ip)),
0, 0,
"TCP (IPv4) connection failed (should be allowed now)") "TCP (IPv4) connection failed (should be allowed now)")
@ -953,11 +913,11 @@ class VmIPv6NetworkingMixin(VmNetworkingMixin):
# Ugly hack b/c there is no feedback when the rules are actually # Ugly hack b/c there is no feedback when the rules are actually
# applied # applied
time.sleep(3) time.sleep(3)
self.assertNotEqual(self.run_cmd(self.testvm1, nc_cmd), 0, self.assertNotEqual(self.run_cmd(self.testvm1, client6_cmd), 0,
"TCP connection should be blocked") "TCP connection should be blocked")
finally: finally:
nc.terminate() server.terminate()
self.loop.run_until_complete(nc.wait()) self.loop.run_until_complete(server.wait())
def test_540_ipv6_inter_vm(self): def test_540_ipv6_inter_vm(self):
@ -1081,8 +1041,6 @@ class VmIPv6NetworkingMixin(VmNetworkingMixin):
self.testvm1.netvm = self.proxy self.testvm1.netvm = self.proxy
self.app.save() self.app.save()
nc_version = self.check_nc_version(self.testnetvm)
# block all but ICMP and DNS # block all but ICMP and DNS
self.testvm1.firewall.rules = [ self.testvm1.firewall.rules = [
@ -1093,10 +1051,8 @@ class VmIPv6NetworkingMixin(VmNetworkingMixin):
self.loop.run_until_complete(self.testvm1.start()) self.loop.run_until_complete(self.testvm1.start())
self.assertTrue(self.proxy.is_running()) self.assertTrue(self.proxy.is_running())
nc = self.loop.run_until_complete(self.testnetvm.run( server = self.loop.run_until_complete(self.testnetvm.run(
'nc -l --send-only -e /bin/hostname -k 1234' 'socat TCP6-LISTEN:1234,fork EXEC:/bin/hostname'))
if nc_version == NcVersion.Nmap
else 'while nc -l -e /bin/hostname -p 1234; do true; done'))
try: try:
self.assertEqual(self.run_cmd(self.proxy, self.ping6_ip), 0, self.assertEqual(self.run_cmd(self.proxy, self.ping6_ip), 0,
@ -1107,15 +1063,12 @@ class VmIPv6NetworkingMixin(VmNetworkingMixin):
"Ping by IP should be allowed") "Ping by IP should be allowed")
self.assertEqual(self.run_cmd(self.testvm1, self.ping6_name), 0, self.assertEqual(self.run_cmd(self.testvm1, self.ping6_name), 0,
"Ping by name should be allowed") "Ping by name should be allowed")
if nc_version == NcVersion.Nmap: client_cmd = "socat TCP:[{}]:1234 -".format(self.test_ip6)
nc_cmd = "nc -w 1 --recv-only {} 1234".format(self.test_ip6) self.assertNotEqual(self.run_cmd(self.testvm1, client_cmd), 0,
else:
nc_cmd = "nc -w 1 {} 1234".format(self.test_ip6)
self.assertNotEqual(self.run_cmd(self.testvm1, nc_cmd), 0,
"TCP connection should be blocked") "TCP connection should be blocked")
finally: finally:
nc.terminate() server.terminate()
self.loop.run_until_complete(nc.wait()) self.loop.run_until_complete(server.wait())
# noinspection PyAttributeOutsideInit,PyPep8Naming # noinspection PyAttributeOutsideInit,PyPep8Naming
class VmUpdatesMixin(object): class VmUpdatesMixin(object):