diff --git a/qubes/tests/integ/network.py b/qubes/tests/integ/network.py index 18f04500..e1fb075b 100644 --- a/qubes/tests/integ/network.py +++ b/qubes/tests/integ/network.py @@ -201,78 +201,87 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin): self.loop.run_until_complete(self.testvm1.start()) self.assertTrue(self.proxy.is_running()) - self.loop.run_until_complete(self.testnetvm.run_for_stdio( + nc = self.loop.run_until_complete(self.testnetvm.run( 'nc -l --send-only -e /bin/hostname -k 1234' if nc_version == NcVersion.Nmap else 'while nc -l -e /bin/hostname -p 1234; do true; done')) - self.assertEqual(self.run_cmd(self.proxy, self.ping_ip), 0, - "Ping by IP from ProxyVM failed") - self.assertEqual(self.run_cmd(self.proxy, self.ping_name), 0, - "Ping by name from ProxyVM failed") - self.assertNotEqual(self.run_cmd(self.testvm1, self.ping_ip), 0, - "Ping by IP should be blocked") - if nc_version == NcVersion.Nmap: - nc_cmd = "nc -w 1 --recv-only {} 1234".format(self.test_ip) - else: - nc_cmd = "nc -w 1 {} 1234".format(self.test_ip) - self.assertNotEqual(self.run_cmd(self.testvm1, nc_cmd), 0, - "TCP connection should be blocked") + try: + self.assertEqual(self.run_cmd(self.proxy, self.ping_ip), 0, + "Ping by IP from ProxyVM failed") + self.assertEqual(self.run_cmd(self.proxy, self.ping_name), 0, + "Ping by name from ProxyVM failed") + self.assertNotEqual(self.run_cmd(self.testvm1, self.ping_ip), 0, + "Ping by IP should be blocked") - # block all except ICMP + if nc_version == NcVersion.Nmap: + nc_cmd = "nc -w 1 --recv-only {} 1234".format(self.test_ip) + else: + nc_cmd = "nc -w 1 {} 1234".format(self.test_ip) + self.assertNotEqual(self.run_cmd(self.testvm1, nc_cmd), 0, + "TCP connection should be blocked") - self.testvm1.firewall.rules = [( - qubes.firewall.Rule(None, action='accept', proto='icmp') - )] - self.testvm1.firewall.save() - # Ugly hack b/c there is no feedback when the rules are actually applied - time.sleep(3) - self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0, - "Ping by IP failed (should be allowed now)") - self.assertNotEqual(self.run_cmd(self.testvm1, self.ping_name), 0, - "Ping by name should be blocked") + # block all except ICMP - # all TCP still blocked + self.testvm1.firewall.rules = [( + qubes.firewall.Rule(None, action='accept', proto='icmp') + )] + self.testvm1.firewall.save() + # Ugly hack b/c there is no feedback when the rules are actually + # applied + time.sleep(3) + self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0, + "Ping by IP failed (should be allowed now)") + self.assertNotEqual(self.run_cmd(self.testvm1, self.ping_name), 0, + "Ping by name should be blocked") - self.testvm1.firewall.rules = [ - qubes.firewall.Rule(None, action='accept', proto='icmp'), - qubes.firewall.Rule(None, action='accept', specialtarget='dns'), - ] - self.testvm1.firewall.save() - # Ugly hack b/c there is no feedback when the rules are actually applied - time.sleep(3) - self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0, - "Ping by name failed (should be allowed now)") - self.assertNotEqual(self.run_cmd(self.testvm1, nc_cmd), 0, - "TCP connection should be blocked") + # all TCP still blocked - # block all except target + self.testvm1.firewall.rules = [ + qubes.firewall.Rule(None, action='accept', proto='icmp'), + qubes.firewall.Rule(None, action='accept', specialtarget='dns'), + ] + self.testvm1.firewall.save() + # Ugly hack b/c there is no feedback when the rules are actually + # applied + time.sleep(3) + self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0, + "Ping by name failed (should be allowed now)") + self.assertNotEqual(self.run_cmd(self.testvm1, nc_cmd), 0, + "TCP connection should be blocked") - self.testvm1.firewall.policy = 'drop' - self.testvm1.firewall.rules = [ - qubes.firewall.Rule(None, action='accept', dsthost=self.test_ip, - proto='tcp', dstports=1234), - ] - self.testvm1.firewall.save() + # block all except target - # Ugly hack b/c there is no feedback when the rules are actually applied - time.sleep(3) - self.assertEqual(self.run_cmd(self.testvm1, nc_cmd), 0, - "TCP connection failed (should be allowed now)") + self.testvm1.firewall.policy = 'drop' + self.testvm1.firewall.rules = [ + qubes.firewall.Rule(None, action='accept', dsthost=self.test_ip, + proto='tcp', dstports=1234), + ] + self.testvm1.firewall.save() - # allow all except target + # Ugly hack b/c there is no feedback when the rules are actually + # applied + time.sleep(3) + self.assertEqual(self.run_cmd(self.testvm1, nc_cmd), 0, + "TCP connection failed (should be allowed now)") - self.testvm1.firewall.policy = 'accept' - self.testvm1.firewall.rules = [ - qubes.firewall.Rule(None, action='drop', dsthost=self.test_ip, - proto='tcp', dstports=1234), - ] - self.testvm1.firewall.save() + # allow all except target - # Ugly hack b/c there is no feedback when the rules are actually applied - time.sleep(3) - self.assertNotEqual(self.run_cmd(self.testvm1, nc_cmd), 0, - "TCP connection should be blocked") + self.testvm1.firewall.policy = 'accept' + self.testvm1.firewall.rules = [ + qubes.firewall.Rule(None, action='drop', dsthost=self.test_ip, + proto='tcp', dstports=1234), + ] + self.testvm1.firewall.save() + + # Ugly hack b/c there is no feedback when the rules are actually + # applied + time.sleep(3) + self.assertNotEqual(self.run_cmd(self.testvm1, nc_cmd), 0, + "TCP connection should be blocked") + finally: + nc.terminate() + self.loop.run_until_complete(nc.wait()) def test_040_inter_vm(self): @@ -318,9 +327,9 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin): self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0) self.loop.run_until_complete(self.testvm1.run_for_stdio(''' - ip addr flush dev eth0 - ip addr add 10.137.1.128/24 dev eth0 - ip route add default dev eth0 + ip addr flush dev eth0 && + ip addr add 10.137.1.128/24 dev eth0 && + ip route add default dev eth0 && ''', user='root')) self.assertNotEqual(self.run_cmd(self.testvm1, self.ping_ip), 0, "Spoofed ping should be blocked") @@ -356,6 +365,7 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin): except subprocess.CalledProcessError: self.fail('ip addr show dev eth0 failed') + output = output.decode() self.assertIn('192.168.1.128', output) self.assertNotIn(self.testvm1.ip, output) @@ -365,6 +375,7 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin): except subprocess.CalledProcessError: self.fail('ip route show failed') + output = output.decode() self.assertIn('192.168.1.1', output) self.assertNotIn(self.testvm1.netvm.ip, output) @@ -383,6 +394,7 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin): except subprocess.CalledProcessError: self.fail('ip addr show dev eth0 failed') + output = output.decode() self.assertIn('192.168.1.128', output) self.assertNotIn(self.testvm1.ip, output) @@ -417,25 +429,29 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin): self.loop.run_until_complete(self.testvm1.start()) self.assertTrue(self.proxy.is_running()) - self.loop.run_until_complete(self.testnetvm.run_for_stdio( + nc = self.loop.run_until_complete(self.testnetvm.run( 'nc -l --send-only -e /bin/hostname -k 1234' if nc_version == NcVersion.Nmap else 'while nc -l -e /bin/hostname -p 1234; do true; done')) - self.assertEqual(self.run_cmd(self.proxy, self.ping_ip), 0, - "Ping by IP from ProxyVM failed") - self.assertEqual(self.run_cmd(self.proxy, self.ping_name), 0, - "Ping by name from ProxyVM failed") - self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0, - "Ping by IP should be allowed") - self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0, - "Ping by name should be allowed") - if nc_version == NcVersion.Nmap: - nc_cmd = "nc -w 1 --recv-only {} 1234".format(self.test_ip) - else: - nc_cmd = "nc -w 1 {} 1234".format(self.test_ip) - self.assertNotEqual(self.run_cmd(self.testvm1, nc_cmd), 0, - "TCP connection should be blocked") + try: + self.assertEqual(self.run_cmd(self.proxy, self.ping_ip), 0, + "Ping by IP from ProxyVM failed") + self.assertEqual(self.run_cmd(self.proxy, self.ping_name), 0, + "Ping by name from ProxyVM failed") + self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0, + "Ping by IP should be allowed") + self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0, + "Ping by name should be allowed") + if nc_version == NcVersion.Nmap: + nc_cmd = "nc -w 1 --recv-only {} 1234".format(self.test_ip) + else: + nc_cmd = "nc -w 1 {} 1234".format(self.test_ip) + self.assertNotEqual(self.run_cmd(self.testvm1, nc_cmd), 0, + "TCP connection should be blocked") + finally: + nc.terminate() + self.loop.run_until_complete(nc.wait()) def test_203_fake_ip_inter_vm_allow(self): '''Access VM with "fake IP" from other VM (when firewall allows)''' @@ -478,8 +494,10 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin): self.ping_cmd.format(target=self.testvm1.ip)), 0) try: - (stdout, _) = self.loop.run_until_complete(self.testvm1.run_for_stdio( - 'iptables -nvxL INPUT | grep {}'.format(self.testvm2.ip), user='root')) + (stdout, _) = self.loop.run_until_complete( + self.testvm1.run_for_stdio( + 'iptables -nvxL INPUT | grep {}'.format(self.testvm2.ip), + user='root')) except subprocess.CalledProcessError as e: self.fail( '{} failed with {}'.format(cmd, e.returncode)) @@ -513,6 +531,7 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin): 'ip addr show dev eth0', user='root')) except subprocess.CalledProcessError as e: self.fail('ip addr show dev eth0 failed') + output = output.decode() self.assertIn('192.168.1.128', output) self.assertNotIn(self.testvm1.ip, output) @@ -522,6 +541,7 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin): 'ip route show', user='root')) except subprocess.CalledProcessError as e: self.fail('ip route show failed') + output = output.decode() self.assertIn('192.168.1.1', output) self.assertNotIn(self.testvm1.netvm.ip, output) @@ -531,6 +551,7 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin): 'ip addr show dev eth0', user='root')) except subprocess.CalledProcessError as e: self.fail('ip addr show dev eth0 failed') + output = output.decode() self.assertNotIn('192.168.1.128', output) self.assertIn(self.testvm1.ip, output) @@ -540,6 +561,7 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin): 'ip route show', user='root')) except subprocess.CalledProcessError as e: self.fail('ip route show failed') + output = output.decode() self.assertIn('192.168.1.128', output) self.assertNotIn(self.proxy.ip, output) @@ -597,25 +619,29 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin): self.loop.run_until_complete(self.testvm1.start()) self.assertTrue(self.proxy.is_running()) - self.loop.run_until_complete(self.testnetvm.run_for_stdio( + nc = self.loop.run_until_complete(self.testnetvm.run( 'nc -l --send-only -e /bin/hostname -k 1234' if nc_version == NcVersion.Nmap else 'while nc -l -e /bin/hostname -p 1234; do true; done')) - self.assertEqual(self.run_cmd(self.proxy, self.ping_ip), 0, - "Ping by IP from ProxyVM failed") - self.assertEqual(self.run_cmd(self.proxy, self.ping_name), 0, - "Ping by name from ProxyVM failed") - self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0, - "Ping by IP should be allowed") - self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0, - "Ping by name should be allowed") - if nc_version == NcVersion.Nmap: - nc_cmd = "nc -w 1 --recv-only {} 1234".format(self.test_ip) - else: - nc_cmd = "nc -w 1 {} 1234".format(self.test_ip) - self.assertNotEqual(self.run_cmd(self.testvm1, nc_cmd), 0, - "TCP connection should be blocked") + try: + self.assertEqual(self.run_cmd(self.proxy, self.ping_ip), 0, + "Ping by IP from ProxyVM failed") + self.assertEqual(self.run_cmd(self.proxy, self.ping_name), 0, + "Ping by name from ProxyVM failed") + self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0, + "Ping by IP should be allowed") + self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0, + "Ping by name should be allowed") + if nc_version == NcVersion.Nmap: + nc_cmd = "nc -w 1 --recv-only {} 1234".format(self.test_ip) + else: + nc_cmd = "nc -w 1 {} 1234".format(self.test_ip) + self.assertNotEqual(self.run_cmd(self.testvm1, nc_cmd), 0, + "TCP connection should be blocked") + finally: + nc.terminate() + self.loop.run_until_complete(nc.wait()) # noinspection PyAttributeOutsideInit diff --git a/qubes/tests/integ/storage.py b/qubes/tests/integ/storage.py index f06f82c6..8f93d9da 100644 --- a/qubes/tests/integ/storage.py +++ b/qubes/tests/integ/storage.py @@ -110,9 +110,11 @@ class StorageTestMixin(qubes.tests.SystemTestsMixin): yield from self.vm1.shutdown(wait=True) yield from self.vm1.start() # non-volatile image volatile - yield from self.vm1.run_for_stdio( - 'head -c {} /dev/zero 2>&1 | diff -q /dev/xvde - 2>&1'.format(size), - user='root') + with self.assertRaises(subprocess.CalledProcessError): + yield from self.vm1.run_for_stdio( + 'head -c {} /dev/zero 2>&1 | diff -q /dev/xvde - 2>&1'.format( + size), + user='root') def test_002_read_only(self): '''Test read-only volume''' diff --git a/qubes/tests/integ/vm_qrexec_gui.py b/qubes/tests/integ/vm_qrexec_gui.py index 7556089a..b0e38539 100644 --- a/qubes/tests/integ/vm_qrexec_gui.py +++ b/qubes/tests/integ/vm_qrexec_gui.py @@ -240,7 +240,7 @@ class TC_00_AppVMMixin(qubes.tests.SystemTestsMixin): stderr=subprocess.PIPE) # this will hang on test failure - stdout = yield from p.stdout.read() + stdout = yield from asyncio.wait_for(p.stdout.read(), timeout=10) p.stdin.write(TEST_DATA) yield from p.stdin.drain() @@ -376,12 +376,6 @@ class TC_00_AppVMMixin(qubes.tests.SystemTestsMixin): handling anything else. """ - def run(self): - # first write a lot of data to fill all the buffers - # then after some time start reading - p.communicate() - result.value = p.returncode - self.loop.run_until_complete(asyncio.wait([ self.testvm1.start(), self.testvm2.start()])) @@ -396,6 +390,8 @@ class TC_00_AppVMMixin(qubes.tests.SystemTestsMixin): with self.qrexec_policy('test.write', self.testvm1, self.testvm2): try: self.loop.run_until_complete(asyncio.wait_for( + # first write a lot of data to fill all the buffers + # then after some time start reading self.testvm1.run_for_stdio('''\ /usr/lib/qubes/qrexec-client-vm {} test.write \ /bin/sh -c ' @@ -531,7 +527,7 @@ class TC_00_AppVMMixin(qubes.tests.SystemTestsMixin): with self.qrexec_policy('test.Argument+argument', self.testvm1, self.testvm2, allow=False): with self.assertRaises(subprocess.CalledProcessError, - 'Service request should be denied'): + msg='Service request should be denied'): self.loop.run_until_complete( self.testvm1.run('/usr/lib/qubes/qrexec-client-vm {} ' 'test.Argument+argument'.format(self.testvm2.name))) @@ -707,7 +703,7 @@ class TC_00_AppVMMixin(qubes.tests.SystemTestsMixin): 'yes teststring | dd of=testfile bs=1M count=50 iflag=fullblock')) # Prepare target directory with limited size - self.loop.run_until_complete(self.testvm2.run( + self.loop.run_until_complete(self.testvm2.run_for_stdio( 'mkdir -p /home/user/QubesIncoming && ' 'chown user /home/user/QubesIncoming && ' 'mount -t tmpfs none /home/user/QubesIncoming -o size=48M', @@ -902,7 +898,7 @@ int main(int argc, char **argv) { "grep ^MemFree: /proc/meminfo|awk '{print $2}'") memory_pages = int(stdout) // 4 # 4k pages - alloc1 = yield from self.testvm1.run( + alloc1 = yield from self.testvm1.run_for_stdio( 'ulimit -l unlimited; exec /home/user/allocator {}'.format( memory_pages), user="root") @@ -915,10 +911,11 @@ int main(int argc, char **argv) { len('Stage1\nStage2\nStage3\n')) if b'Stage3' not in alloc_out: - # read stderr only in case of failed assert, but still have nice + # read stderr only in case of failed assert (), but still have nice # failure message (don't use self.fail() directly) # - # XXX why don't read stderr always? --woju 20170523 + # stderr isn't always read, because on not-failed run, the process + # is still running, so stderr.read() will wait (indefinitely). self.assertIn(b'Stage3', alloc_out, (yield from alloc1.stderr.read())) @@ -998,7 +995,8 @@ class TC_10_Generic(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase): self.loop.run_until_complete(self.vm.start()) with self.qrexec_policy('test.AnyvmDeny', self.vm, '$anyvm'): - with self.assertRaises(subprocess.CalledProcessError): + with self.assertRaises(subprocess.CalledProcessError, + msg='$anyvm matched dom0'): stdout, stderr = self.loop.run_until_complete( self.vm.run_for_stdio( '/usr/lib/qubes/qrexec-client-vm dom0 test.AnyvmDeny'))