Fixes from marmarek's review

This commit is contained in:
Wojtek Porczyk 2017-06-01 00:33:53 +02:00
parent cb3c9a82b3
commit dc793be81f
3 changed files with 135 additions and 109 deletions

View File

@ -201,78 +201,87 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin):
self.loop.run_until_complete(self.testvm1.start())
self.assertTrue(self.proxy.is_running())
self.loop.run_until_complete(self.testnetvm.run_for_stdio(
nc = self.loop.run_until_complete(self.testnetvm.run(
'nc -l --send-only -e /bin/hostname -k 1234'
if nc_version == NcVersion.Nmap
else 'while nc -l -e /bin/hostname -p 1234; do true; done'))
self.assertEqual(self.run_cmd(self.proxy, self.ping_ip), 0,
"Ping by IP from ProxyVM failed")
self.assertEqual(self.run_cmd(self.proxy, self.ping_name), 0,
"Ping by name from ProxyVM failed")
self.assertNotEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
"Ping by IP should be blocked")
if nc_version == NcVersion.Nmap:
nc_cmd = "nc -w 1 --recv-only {} 1234".format(self.test_ip)
else:
nc_cmd = "nc -w 1 {} 1234".format(self.test_ip)
self.assertNotEqual(self.run_cmd(self.testvm1, nc_cmd), 0,
"TCP connection should be blocked")
try:
self.assertEqual(self.run_cmd(self.proxy, self.ping_ip), 0,
"Ping by IP from ProxyVM failed")
self.assertEqual(self.run_cmd(self.proxy, self.ping_name), 0,
"Ping by name from ProxyVM failed")
self.assertNotEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
"Ping by IP should be blocked")
# block all except ICMP
if nc_version == NcVersion.Nmap:
nc_cmd = "nc -w 1 --recv-only {} 1234".format(self.test_ip)
else:
nc_cmd = "nc -w 1 {} 1234".format(self.test_ip)
self.assertNotEqual(self.run_cmd(self.testvm1, nc_cmd), 0,
"TCP connection should be blocked")
self.testvm1.firewall.rules = [(
qubes.firewall.Rule(None, action='accept', proto='icmp')
)]
self.testvm1.firewall.save()
# Ugly hack b/c there is no feedback when the rules are actually applied
time.sleep(3)
self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
"Ping by IP failed (should be allowed now)")
self.assertNotEqual(self.run_cmd(self.testvm1, self.ping_name), 0,
"Ping by name should be blocked")
# block all except ICMP
# all TCP still blocked
self.testvm1.firewall.rules = [(
qubes.firewall.Rule(None, action='accept', proto='icmp')
)]
self.testvm1.firewall.save()
# Ugly hack b/c there is no feedback when the rules are actually
# applied
time.sleep(3)
self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
"Ping by IP failed (should be allowed now)")
self.assertNotEqual(self.run_cmd(self.testvm1, self.ping_name), 0,
"Ping by name should be blocked")
self.testvm1.firewall.rules = [
qubes.firewall.Rule(None, action='accept', proto='icmp'),
qubes.firewall.Rule(None, action='accept', specialtarget='dns'),
]
self.testvm1.firewall.save()
# Ugly hack b/c there is no feedback when the rules are actually applied
time.sleep(3)
self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0,
"Ping by name failed (should be allowed now)")
self.assertNotEqual(self.run_cmd(self.testvm1, nc_cmd), 0,
"TCP connection should be blocked")
# all TCP still blocked
# block all except target
self.testvm1.firewall.rules = [
qubes.firewall.Rule(None, action='accept', proto='icmp'),
qubes.firewall.Rule(None, action='accept', specialtarget='dns'),
]
self.testvm1.firewall.save()
# Ugly hack b/c there is no feedback when the rules are actually
# applied
time.sleep(3)
self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0,
"Ping by name failed (should be allowed now)")
self.assertNotEqual(self.run_cmd(self.testvm1, nc_cmd), 0,
"TCP connection should be blocked")
self.testvm1.firewall.policy = 'drop'
self.testvm1.firewall.rules = [
qubes.firewall.Rule(None, action='accept', dsthost=self.test_ip,
proto='tcp', dstports=1234),
]
self.testvm1.firewall.save()
# block all except target
# Ugly hack b/c there is no feedback when the rules are actually applied
time.sleep(3)
self.assertEqual(self.run_cmd(self.testvm1, nc_cmd), 0,
"TCP connection failed (should be allowed now)")
self.testvm1.firewall.policy = 'drop'
self.testvm1.firewall.rules = [
qubes.firewall.Rule(None, action='accept', dsthost=self.test_ip,
proto='tcp', dstports=1234),
]
self.testvm1.firewall.save()
# allow all except target
# Ugly hack b/c there is no feedback when the rules are actually
# applied
time.sleep(3)
self.assertEqual(self.run_cmd(self.testvm1, nc_cmd), 0,
"TCP connection failed (should be allowed now)")
self.testvm1.firewall.policy = 'accept'
self.testvm1.firewall.rules = [
qubes.firewall.Rule(None, action='drop', dsthost=self.test_ip,
proto='tcp', dstports=1234),
]
self.testvm1.firewall.save()
# allow all except target
# Ugly hack b/c there is no feedback when the rules are actually applied
time.sleep(3)
self.assertNotEqual(self.run_cmd(self.testvm1, nc_cmd), 0,
"TCP connection should be blocked")
self.testvm1.firewall.policy = 'accept'
self.testvm1.firewall.rules = [
qubes.firewall.Rule(None, action='drop', dsthost=self.test_ip,
proto='tcp', dstports=1234),
]
self.testvm1.firewall.save()
# Ugly hack b/c there is no feedback when the rules are actually
# applied
time.sleep(3)
self.assertNotEqual(self.run_cmd(self.testvm1, nc_cmd), 0,
"TCP connection should be blocked")
finally:
nc.terminate()
self.loop.run_until_complete(nc.wait())
def test_040_inter_vm(self):
@ -318,9 +327,9 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin):
self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
self.loop.run_until_complete(self.testvm1.run_for_stdio('''
ip addr flush dev eth0
ip addr add 10.137.1.128/24 dev eth0
ip route add default dev eth0
ip addr flush dev eth0 &&
ip addr add 10.137.1.128/24 dev eth0 &&
ip route add default dev eth0 &&
''', user='root'))
self.assertNotEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
"Spoofed ping should be blocked")
@ -356,6 +365,7 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin):
except subprocess.CalledProcessError:
self.fail('ip addr show dev eth0 failed')
output = output.decode()
self.assertIn('192.168.1.128', output)
self.assertNotIn(self.testvm1.ip, output)
@ -365,6 +375,7 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin):
except subprocess.CalledProcessError:
self.fail('ip route show failed')
output = output.decode()
self.assertIn('192.168.1.1', output)
self.assertNotIn(self.testvm1.netvm.ip, output)
@ -383,6 +394,7 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin):
except subprocess.CalledProcessError:
self.fail('ip addr show dev eth0 failed')
output = output.decode()
self.assertIn('192.168.1.128', output)
self.assertNotIn(self.testvm1.ip, output)
@ -417,25 +429,29 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin):
self.loop.run_until_complete(self.testvm1.start())
self.assertTrue(self.proxy.is_running())
self.loop.run_until_complete(self.testnetvm.run_for_stdio(
nc = self.loop.run_until_complete(self.testnetvm.run(
'nc -l --send-only -e /bin/hostname -k 1234'
if nc_version == NcVersion.Nmap
else 'while nc -l -e /bin/hostname -p 1234; do true; done'))
self.assertEqual(self.run_cmd(self.proxy, self.ping_ip), 0,
"Ping by IP from ProxyVM failed")
self.assertEqual(self.run_cmd(self.proxy, self.ping_name), 0,
"Ping by name from ProxyVM failed")
self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
"Ping by IP should be allowed")
self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0,
"Ping by name should be allowed")
if nc_version == NcVersion.Nmap:
nc_cmd = "nc -w 1 --recv-only {} 1234".format(self.test_ip)
else:
nc_cmd = "nc -w 1 {} 1234".format(self.test_ip)
self.assertNotEqual(self.run_cmd(self.testvm1, nc_cmd), 0,
"TCP connection should be blocked")
try:
self.assertEqual(self.run_cmd(self.proxy, self.ping_ip), 0,
"Ping by IP from ProxyVM failed")
self.assertEqual(self.run_cmd(self.proxy, self.ping_name), 0,
"Ping by name from ProxyVM failed")
self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
"Ping by IP should be allowed")
self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0,
"Ping by name should be allowed")
if nc_version == NcVersion.Nmap:
nc_cmd = "nc -w 1 --recv-only {} 1234".format(self.test_ip)
else:
nc_cmd = "nc -w 1 {} 1234".format(self.test_ip)
self.assertNotEqual(self.run_cmd(self.testvm1, nc_cmd), 0,
"TCP connection should be blocked")
finally:
nc.terminate()
self.loop.run_until_complete(nc.wait())
def test_203_fake_ip_inter_vm_allow(self):
'''Access VM with "fake IP" from other VM (when firewall allows)'''
@ -478,8 +494,10 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin):
self.ping_cmd.format(target=self.testvm1.ip)), 0)
try:
(stdout, _) = self.loop.run_until_complete(self.testvm1.run_for_stdio(
'iptables -nvxL INPUT | grep {}'.format(self.testvm2.ip), user='root'))
(stdout, _) = self.loop.run_until_complete(
self.testvm1.run_for_stdio(
'iptables -nvxL INPUT | grep {}'.format(self.testvm2.ip),
user='root'))
except subprocess.CalledProcessError as e:
self.fail(
'{} failed with {}'.format(cmd, e.returncode))
@ -513,6 +531,7 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin):
'ip addr show dev eth0', user='root'))
except subprocess.CalledProcessError as e:
self.fail('ip addr show dev eth0 failed')
output = output.decode()
self.assertIn('192.168.1.128', output)
self.assertNotIn(self.testvm1.ip, output)
@ -522,6 +541,7 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin):
'ip route show', user='root'))
except subprocess.CalledProcessError as e:
self.fail('ip route show failed')
output = output.decode()
self.assertIn('192.168.1.1', output)
self.assertNotIn(self.testvm1.netvm.ip, output)
@ -531,6 +551,7 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin):
'ip addr show dev eth0', user='root'))
except subprocess.CalledProcessError as e:
self.fail('ip addr show dev eth0 failed')
output = output.decode()
self.assertNotIn('192.168.1.128', output)
self.assertIn(self.testvm1.ip, output)
@ -540,6 +561,7 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin):
'ip route show', user='root'))
except subprocess.CalledProcessError as e:
self.fail('ip route show failed')
output = output.decode()
self.assertIn('192.168.1.128', output)
self.assertNotIn(self.proxy.ip, output)
@ -597,25 +619,29 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin):
self.loop.run_until_complete(self.testvm1.start())
self.assertTrue(self.proxy.is_running())
self.loop.run_until_complete(self.testnetvm.run_for_stdio(
nc = self.loop.run_until_complete(self.testnetvm.run(
'nc -l --send-only -e /bin/hostname -k 1234'
if nc_version == NcVersion.Nmap
else 'while nc -l -e /bin/hostname -p 1234; do true; done'))
self.assertEqual(self.run_cmd(self.proxy, self.ping_ip), 0,
"Ping by IP from ProxyVM failed")
self.assertEqual(self.run_cmd(self.proxy, self.ping_name), 0,
"Ping by name from ProxyVM failed")
self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
"Ping by IP should be allowed")
self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0,
"Ping by name should be allowed")
if nc_version == NcVersion.Nmap:
nc_cmd = "nc -w 1 --recv-only {} 1234".format(self.test_ip)
else:
nc_cmd = "nc -w 1 {} 1234".format(self.test_ip)
self.assertNotEqual(self.run_cmd(self.testvm1, nc_cmd), 0,
"TCP connection should be blocked")
try:
self.assertEqual(self.run_cmd(self.proxy, self.ping_ip), 0,
"Ping by IP from ProxyVM failed")
self.assertEqual(self.run_cmd(self.proxy, self.ping_name), 0,
"Ping by name from ProxyVM failed")
self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
"Ping by IP should be allowed")
self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0,
"Ping by name should be allowed")
if nc_version == NcVersion.Nmap:
nc_cmd = "nc -w 1 --recv-only {} 1234".format(self.test_ip)
else:
nc_cmd = "nc -w 1 {} 1234".format(self.test_ip)
self.assertNotEqual(self.run_cmd(self.testvm1, nc_cmd), 0,
"TCP connection should be blocked")
finally:
nc.terminate()
self.loop.run_until_complete(nc.wait())
# noinspection PyAttributeOutsideInit

View File

@ -110,9 +110,11 @@ class StorageTestMixin(qubes.tests.SystemTestsMixin):
yield from self.vm1.shutdown(wait=True)
yield from self.vm1.start()
# non-volatile image volatile
yield from self.vm1.run_for_stdio(
'head -c {} /dev/zero 2>&1 | diff -q /dev/xvde - 2>&1'.format(size),
user='root')
with self.assertRaises(subprocess.CalledProcessError):
yield from self.vm1.run_for_stdio(
'head -c {} /dev/zero 2>&1 | diff -q /dev/xvde - 2>&1'.format(
size),
user='root')
def test_002_read_only(self):
'''Test read-only volume'''

View File

@ -240,7 +240,7 @@ class TC_00_AppVMMixin(qubes.tests.SystemTestsMixin):
stderr=subprocess.PIPE)
# this will hang on test failure
stdout = yield from p.stdout.read()
stdout = yield from asyncio.wait_for(p.stdout.read(), timeout=10)
p.stdin.write(TEST_DATA)
yield from p.stdin.drain()
@ -376,12 +376,6 @@ class TC_00_AppVMMixin(qubes.tests.SystemTestsMixin):
handling anything else.
"""
def run(self):
# first write a lot of data to fill all the buffers
# then after some time start reading
p.communicate()
result.value = p.returncode
self.loop.run_until_complete(asyncio.wait([
self.testvm1.start(),
self.testvm2.start()]))
@ -396,6 +390,8 @@ class TC_00_AppVMMixin(qubes.tests.SystemTestsMixin):
with self.qrexec_policy('test.write', self.testvm1, self.testvm2):
try:
self.loop.run_until_complete(asyncio.wait_for(
# first write a lot of data to fill all the buffers
# then after some time start reading
self.testvm1.run_for_stdio('''\
/usr/lib/qubes/qrexec-client-vm {} test.write \
/bin/sh -c '
@ -531,7 +527,7 @@ class TC_00_AppVMMixin(qubes.tests.SystemTestsMixin):
with self.qrexec_policy('test.Argument+argument',
self.testvm1, self.testvm2, allow=False):
with self.assertRaises(subprocess.CalledProcessError,
'Service request should be denied'):
msg='Service request should be denied'):
self.loop.run_until_complete(
self.testvm1.run('/usr/lib/qubes/qrexec-client-vm {} '
'test.Argument+argument'.format(self.testvm2.name)))
@ -707,7 +703,7 @@ class TC_00_AppVMMixin(qubes.tests.SystemTestsMixin):
'yes teststring | dd of=testfile bs=1M count=50 iflag=fullblock'))
# Prepare target directory with limited size
self.loop.run_until_complete(self.testvm2.run(
self.loop.run_until_complete(self.testvm2.run_for_stdio(
'mkdir -p /home/user/QubesIncoming && '
'chown user /home/user/QubesIncoming && '
'mount -t tmpfs none /home/user/QubesIncoming -o size=48M',
@ -902,7 +898,7 @@ int main(int argc, char **argv) {
"grep ^MemFree: /proc/meminfo|awk '{print $2}'")
memory_pages = int(stdout) // 4 # 4k pages
alloc1 = yield from self.testvm1.run(
alloc1 = yield from self.testvm1.run_for_stdio(
'ulimit -l unlimited; exec /home/user/allocator {}'.format(
memory_pages),
user="root")
@ -915,10 +911,11 @@ int main(int argc, char **argv) {
len('Stage1\nStage2\nStage3\n'))
if b'Stage3' not in alloc_out:
# read stderr only in case of failed assert, but still have nice
# read stderr only in case of failed assert (), but still have nice
# failure message (don't use self.fail() directly)
#
# XXX why don't read stderr always? --woju 20170523
# stderr isn't always read, because on not-failed run, the process
# is still running, so stderr.read() will wait (indefinitely).
self.assertIn(b'Stage3', alloc_out,
(yield from alloc1.stderr.read()))
@ -998,7 +995,8 @@ class TC_10_Generic(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
self.loop.run_until_complete(self.vm.start())
with self.qrexec_policy('test.AnyvmDeny', self.vm, '$anyvm'):
with self.assertRaises(subprocess.CalledProcessError):
with self.assertRaises(subprocess.CalledProcessError,
msg='$anyvm matched dom0'):
stdout, stderr = self.loop.run_until_complete(
self.vm.run_for_stdio(
'/usr/lib/qubes/qrexec-client-vm dom0 test.AnyvmDeny'))