Merge remote-tracking branch 'marmarek/core3-fake-ip' into core3-devel

This commit is contained in:
Wojtek Porczyk 2016-11-15 17:40:30 +01:00
commit 37bfd0d2a3
5 changed files with 353 additions and 59 deletions

View File

@ -58,7 +58,7 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin):
def setUp(self): def setUp(self):
super(VmNetworkingMixin, self).setUp() super(VmNetworkingMixin, self).setUp()
if self.template.startswith('whonix-'): if self.template.startswith('whonix-gw'):
self.skipTest("Test not supported here - Whonix uses its own " self.skipTest("Test not supported here - Whonix uses its own "
"firewall settings") "firewall settings")
self.init_default_template(self.template) self.init_default_template(self.template)
@ -339,6 +339,278 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin):
self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0) self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
def test_200_fake_ip_simple(self):
'''Test hiding VM real IP'''
self.testvm1.features['net/fake-ip'] = '192.168.1.128'
self.testvm1.features['net/fake-gateway'] = '192.168.1.1'
self.testvm1.features['net/fake-netmask'] = '255.255.255.0'
self.app.save()
self.testvm1.start()
self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0)
p = self.testvm1.run('ip addr show dev eth0', user='root',
passio_popen=True,
ignore_stderr=True)
p.stdin.close()
output = p.stdout.read()
self.assertEqual(p.wait(), 0, 'ip addr show dev eth0 failed')
self.assertIn('192.168.1.128', output)
self.assertNotIn(self.testvm1.ip, output)
p = self.testvm1.run('ip route show', user='root',
passio_popen=True,
ignore_stderr=True)
p.stdin.close()
output = p.stdout.read()
self.assertEqual(p.wait(), 0, 'ip route show failed')
self.assertIn('192.168.1.1', output)
self.assertNotIn(self.testvm1.netvm.ip, output)
def test_201_fake_ip_without_gw(self):
'''Test hiding VM real IP'''
self.testvm1.features['net/fake-ip'] = '192.168.1.128'
self.app.save()
self.testvm1.start()
self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0)
p = self.testvm1.run('ip addr show dev eth0', user='root',
passio_popen=True,
ignore_stderr=True)
p.stdin.close()
output = p.stdout.read()
self.assertEqual(p.wait(), 0, 'ip addr show dev eth0 failed')
self.assertIn('192.168.1.128', output)
self.assertNotIn(self.testvm1.ip, output)
def test_202_fake_ip_firewall(self):
'''Test hiding VM real IP, firewall'''
self.testvm1.features['net/fake-ip'] = '192.168.1.128'
self.testvm1.features['net/fake-gateway'] = '192.168.1.1'
self.testvm1.features['net/fake-netmask'] = '255.255.255.0'
self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
name=self.make_vm_name('proxy'),
label='red')
self.proxy.provides_network = True
self.proxy.create_on_disk()
self.proxy.netvm = self.testnetvm
self.testvm1.netvm = self.proxy
self.app.save()
if self.run_cmd(self.testnetvm, 'nc -h 2>&1|grep -q nmap.org') == 0:
nc_version = NcVersion.Nmap
else:
nc_version = NcVersion.Trad
# block all but ICMP and DNS
self.testvm1.firewall.policy = 'drop'
self.testvm1.firewall.rules = [
qubes.firewall.Rule(None, action='accept', proto='icmp'),
qubes.firewall.Rule(None, action='accept', specialtarget='dns'),
]
self.testvm1.firewall.save()
self.testvm1.start()
self.assertTrue(self.proxy.is_running())
if nc_version == NcVersion.Nmap:
self.testnetvm.run("nc -l --send-only -e /bin/hostname -k 1234")
else:
self.testnetvm.run("while nc -l -e /bin/hostname -p 1234; do "
"true; done")
self.assertEqual(self.run_cmd(self.proxy, self.ping_ip), 0,
"Ping by IP from ProxyVM failed")
self.assertEqual(self.run_cmd(self.proxy, self.ping_name), 0,
"Ping by name from ProxyVM failed")
self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
"Ping by IP should be allowed")
self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0,
"Ping by name should be allowed")
if nc_version == NcVersion.Nmap:
nc_cmd = "nc -w 1 --recv-only {} 1234".format(self.test_ip)
else:
nc_cmd = "nc -w 1 {} 1234".format(self.test_ip)
self.assertNotEqual(self.run_cmd(self.testvm1, nc_cmd), 0,
"TCP connection should be blocked")
def test_203_fake_ip_inter_vm_allow(self):
'''Access VM with "fake IP" from other VM (when firewall allows)'''
self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
name=self.make_vm_name('proxy'),
label='red')
self.proxy.create_on_disk()
self.proxy.provides_network = True
self.proxy.netvm = self.testnetvm
self.testvm1.netvm = self.proxy
self.testvm1.features['net/fake-ip'] = '192.168.1.128'
self.testvm1.features['net/fake-gateway'] = '192.168.1.1'
self.testvm1.features['net/fake-netmask'] = '255.255.255.0'
self.testvm2 = self.app.add_new_vm(qubes.vm.appvm.AppVM,
name=self.make_vm_name('vm3'),
label='red')
self.testvm2.create_on_disk()
self.testvm2.netvm = self.proxy
self.app.save()
self.testvm1.start()
self.testvm2.start()
cmd = 'iptables -I FORWARD -s {} -d {} -j ACCEPT'.format(
self.testvm2.ip, self.testvm1.ip)
retcode = self.proxy.run(cmd, user='root', wait=True)
self.assertEqual(retcode, 0, '{} failed with: {}'.format(cmd, retcode))
cmd = 'iptables -I INPUT -s {} -j ACCEPT'.format(
self.testvm2.ip)
retcode = self.testvm1.run(cmd, user='root', wait=True)
self.assertEqual(retcode, 0, '{} failed with: {}'.format(cmd, retcode))
self.assertEqual(self.run_cmd(self.testvm2,
self.ping_cmd.format(target=self.testvm1.ip)), 0)
cmd = 'iptables -nvxL INPUT | grep {}'.format(self.testvm2.ip)
p = self.testvm1.run(cmd, user='root', passio_popen=True)
(stdout, _) = p.communicate()
self.assertEqual(p.returncode, 0,
'{} failed with {}'.format(cmd, p.returncode))
self.assertNotEqual(stdout.split()[0], '0',
'Packets didn\'t managed to the VM')
def test_204_fake_ip_proxy(self):
'''Test hiding VM real IP'''
self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
name=self.make_vm_name('proxy'),
label='red')
self.proxy.create_on_disk()
self.proxy.provides_network = True
self.proxy.netvm = self.testnetvm
self.proxy.features['net/fake-ip'] = '192.168.1.128'
self.proxy.features['net/fake-gateway'] = '192.168.1.1'
self.proxy.features['net/fake-netmask'] = '255.255.255.0'
self.testvm1.netvm = self.proxy
self.app.save()
self.testvm1.start()
self.assertEqual(self.run_cmd(self.proxy, self.ping_ip), 0)
self.assertEqual(self.run_cmd(self.proxy, self.ping_name), 0)
self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0)
p = self.proxy.run('ip addr show dev eth0', user='root',
passio_popen=True,
ignore_stderr=True)
p.stdin.close()
output = p.stdout.read()
self.assertEqual(p.wait(), 0, 'ip addr show dev eth0 failed')
self.assertIn('192.168.1.128', output)
self.assertNotIn(self.testvm1.ip, output)
p = self.proxy.run('ip route show', user='root',
passio_popen=True,
ignore_stderr=True)
p.stdin.close()
output = p.stdout.read()
self.assertEqual(p.wait(), 0, 'ip route show failed')
self.assertIn('192.168.1.1', output)
self.assertNotIn(self.testvm1.netvm.ip, output)
p = self.testvm1.run('ip addr show dev eth0', user='root',
passio_popen=True,
ignore_stderr=True)
p.stdin.close()
output = p.stdout.read()
self.assertEqual(p.wait(), 0, 'ip addr show dev eth0 failed')
self.assertNotIn('192.168.1.128', output)
self.assertIn(self.testvm1.ip, output)
p = self.testvm1.run('ip route show', user='root',
passio_popen=True,
ignore_stderr=True)
p.stdin.close()
output = p.stdout.read()
self.assertEqual(p.wait(), 0, 'ip route show failed')
self.assertIn('192.168.1.128', output)
self.assertNotIn(self.proxy.ip, output)
def test_210_custom_ip_simple(self):
'''Custom AppVM IP'''
self.testvm1.ip = '192.168.1.1'
self.app.save()
self.testvm1.start()
self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0)
def test_211_custom_ip_proxy(self):
'''Custom ProxyVM IP'''
self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
name=self.make_vm_name('proxy'),
label='red')
self.proxy.create_on_disk()
self.proxy.provides_network = True
self.proxy.netvm = self.testnetvm
self.proxy.ip = '192.168.1.1'
self.testvm1.netvm = self.proxy
self.app.save()
self.testvm1.start()
self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0)
def test_212_custom_ip_firewall(self):
'''Custom VM IP and firewall'''
self.testvm1.ip = '192.168.1.1'
self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
name=self.make_vm_name('proxy'),
label='red')
self.proxy.provides_network = True
self.proxy.create_on_disk()
self.proxy.netvm = self.testnetvm
self.testvm1.netvm = self.proxy
self.app.save()
if self.run_cmd(self.testnetvm, 'nc -h 2>&1|grep -q nmap.org') == 0:
nc_version = NcVersion.Nmap
else:
nc_version = NcVersion.Trad
# block all but ICMP and DNS
self.testvm1.firewall.policy = 'drop'
self.testvm1.firewall.rules = [
qubes.firewall.Rule(None, action='accept', proto='icmp'),
qubes.firewall.Rule(None, action='accept', specialtarget='dns'),
]
self.testvm1.firewall.save()
self.testvm1.start()
self.assertTrue(self.proxy.is_running())
if nc_version == NcVersion.Nmap:
self.testnetvm.run("nc -l --send-only -e /bin/hostname -k 1234")
else:
self.testnetvm.run("while nc -l -e /bin/hostname -p 1234; do "
"true; done")
self.assertEqual(self.run_cmd(self.proxy, self.ping_ip), 0,
"Ping by IP from ProxyVM failed")
self.assertEqual(self.run_cmd(self.proxy, self.ping_name), 0,
"Ping by name from ProxyVM failed")
self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
"Ping by IP should be allowed")
self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0,
"Ping by name should be allowed")
if nc_version == NcVersion.Nmap:
nc_cmd = "nc -w 1 --recv-only {} 1234".format(self.test_ip)
else:
nc_cmd = "nc -w 1 {} 1234".format(self.test_ip)
self.assertNotEqual(self.run_cmd(self.testvm1, nc_cmd), 0,
"TCP connection should be blocked")
# noinspection PyAttributeOutsideInit # noinspection PyAttributeOutsideInit
class VmUpdatesMixin(qubes.tests.SystemTestsMixin): class VmUpdatesMixin(qubes.tests.SystemTestsMixin):
""" """

View File

@ -84,49 +84,17 @@ class TC_00_NetVMMixin(
self.app.domains = {1: vm, vm: vm} self.app.domains = {1: vm, vm: vm}
self.assertPropertyInvalidValue(vm, 'netvm', vm) self.assertPropertyInvalidValue(vm, 'netvm', vm)
@unittest.skip('TODO: probably remove') def test_150_ip(self):
def test_290_dispvm_netvm(self):
vm = self.get_vm() vm = self.get_vm()
self.setup_netvms(vm) self.setup_netvms(vm)
self.assertPropertyDefaultValue(vm, 'dispvm_netvm', self.assertPropertyDefaultValue(vm, 'ip', '10.137.0.' + str(vm.qid))
self.app.default_netvm) vm.ip = '192.168.1.1'
self.assertPropertyValue(vm, 'dispvm_netvm', self.netvm2, self.netvm2, self.assertEqual(vm.ip, '192.168.1.1')
self.netvm2.name)
del vm.dispvm_netvm
self.assertPropertyDefaultValue(vm, 'dispvm_netvm',
self.app.default_netvm)
self.assertPropertyValue(vm, 'dispvm_netvm', self.netvm2.name,
self.netvm2, self.netvm2.name)
# XXX FIXME xml value
self.assertPropertyValue(vm, 'dispvm_netvm', None, None, 'None')
@unittest.skip('TODO: probably remove') def test_151_ip_invalid(self):
def test_291_dispvm_netvm_invalid(self):
vm = self.get_vm() vm = self.get_vm()
self.setup_netvms(vm) self.setup_netvms(vm)
self.assertPropertyInvalidValue(vm, 'dispvm_netvm', 'invalid') self.assertPropertyInvalidValue(vm, 'ip', 'abcd')
self.assertPropertyInvalidValue(vm, 'dispvm_netvm', 123) self.assertPropertyInvalidValue(vm, 'ip', 'a.b.c.d')
self.assertPropertyInvalidValue(vm, 'ip', '1111.2222.3333.4444')
@unittest.skip('TODO: probably remove') # TODO: implement and add here: 0.0.0.0, 333.333.333.333
def test_291_dispvm_netvm_netvm(self):
vm = self.get_vm()
nonetvm = TestVM(qid=2, app=self.app, name='nonetvm')
self.app.domains = {1: vm, 2: nonetvm}
self.assertPropertyInvalidValue(vm, 'dispvm_netvm', nonetvm)
@unittest.skip('TODO: probably remove')
def test_291_dispvm_netvm_default(self):
"""Check if vm.dispvm_netvm default is really vm.netvm"""
vm = self.get_vm()
self.setup_netvms(vm)
vm.netvm = self.netvm2
self.assertPropertyDefaultValue(vm, 'dispvm_netvm', self.netvm2)
del vm.netvm
self.assertPropertyDefaultValue(vm, 'dispvm_netvm', self.netvm1)
@unittest.skip('TODO: probably remove')
def test_292_dispvm_netvm_loopback(self):
vm = self.get_vm()
self.app.domains = {1: vm, vm: vm}
self.assertPropertyInvalidValue(vm, 'dispvm_netvm', vm)

View File

@ -127,11 +127,8 @@ class Features(dict):
return self[feature] return self[feature]
if hasattr(self.vm, 'template') and self.vm.template is not None: if hasattr(self.vm, 'template') and self.vm.template is not None:
try: return self.vm.template.features.check_with_template(feature,
return self.vm.template.features[feature] default)
except KeyError:
# handle default just below
pass
if default is self._NO_DEFAULT: if default is self._NO_DEFAULT:
raise KeyError(feature) raise KeyError(feature)

View File

@ -45,6 +45,25 @@ def _setter_mac(self, prop, value):
return value return value
def _default_ip(self):
if not self.is_networked():
return None
if self.netvm is not None:
return self.netvm.get_ip_for_vm(self) # pylint: disable=no-member
else:
return self.get_ip_for_vm(self)
def _setter_ip(self, prop, value):
# pylint: disable=unused-argument
if not isinstance(value, basestring):
raise ValueError('IP address must be a string')
value = value.lower()
if re.match(r"^([0-9]{1,3}.){3}[0-9]{1,3}$", value) is None:
raise ValueError('Invalid IP address value')
return value
class NetVMMixin(qubes.events.Emitter): class NetVMMixin(qubes.events.Emitter):
''' Mixin containing network functionality ''' ''' Mixin containing network functionality '''
mac = qubes.property('mac', type=str, mac = qubes.property('mac', type=str,
@ -53,6 +72,12 @@ class NetVMMixin(qubes.events.Emitter):
ls_width=17, ls_width=17,
doc='MAC address of the NIC emulated inside VM') doc='MAC address of the NIC emulated inside VM')
ip = qubes.property('ip', type=str,
default=_default_ip,
setter=_setter_ip,
ls_width=15,
doc='IP address of this domain.')
# CORE2: swallowed uses_default_netvm # CORE2: swallowed uses_default_netvm
netvm = qubes.VMProperty('netvm', load_stage=4, allow_none=True, netvm = qubes.VMProperty('netvm', load_stage=4, allow_none=True,
default=(lambda self: self.app.default_fw_netvm if self.provides_network default=(lambda self: self.app.default_fw_netvm if self.provides_network
@ -74,16 +99,27 @@ class NetVMMixin(qubes.events.Emitter):
# used in networked appvms or proxyvms (netvm is not None) # used in networked appvms or proxyvms (netvm is not None)
# #
@qubes.tools.qvm_ls.column(width=15) @qubes.tools.qvm_ls.column(width=15)
@property @property
def ip(self): def visible_ip(self):
'''IP address of this domain.''' '''IP address of this domain as seen by the domain.'''
if not self.is_networked(): return self.features.check_with_template('net/fake-ip', None) or \
return None self.ip
if self.netvm is not None:
return self.netvm.get_ip_for_vm(self) # pylint: disable=no-member @qubes.tools.qvm_ls.column(width=15)
else: @property
return self.get_ip_for_vm(self) def visible_gateway(self):
'''Default gateway of this domain as seen by the domain.'''
return self.features.check_with_template('net/fake-gateway', None) or \
self.netvm.gateway
@qubes.tools.qvm_ls.column(width=15)
@property
def visible_netmask(self):
'''Netmask as seen by the domain.'''
return self.features.check_with_template('net/fake-netmask', None) or \
self.netvm.netmask
# #
# used in netvms (provides_network=True) # used in netvms (provides_network=True)
@ -106,7 +142,7 @@ class NetVMMixin(qubes.events.Emitter):
@property @property
def gateway(self): def gateway(self):
'''Gateway for other domains that use this domain as netvm.''' '''Gateway for other domains that use this domain as netvm.'''
return self.ip if self.provides_network else None return self.visible_ip if self.provides_network else None
@qubes.tools.qvm_ls.column(width=15) @qubes.tools.qvm_ls.column(width=15)
@property @property
@ -211,6 +247,7 @@ class NetVMMixin(qubes.events.Emitter):
self.log.info('Starting NetVM ({0})'.format(self.netvm.name)) self.log.info('Starting NetVM ({0})'.format(self.netvm.name))
self.netvm.start() self.netvm.start()
self.netvm.set_mapped_ip_info_for_vm(self)
self.libvirt_domain.attachDevice( self.libvirt_domain.attachDevice(
self.app.env.get_template('libvirt/devices/net.xml').render( self.app.env.get_template('libvirt/devices/net.xml').render(
vm=self)) vm=self))
@ -274,6 +311,25 @@ class NetVMMixin(qubes.events.Emitter):
# signal its done # signal its done
self.qdb.write(base_dir[:-1], '') self.qdb.write(base_dir[:-1], '')
def set_mapped_ip_info_for_vm(self, vm):
'''
Set configuration to possibly hide real IP from the VM.
This needs to be done before executing 'script'
(`/etc/xen/scripts/vif-route-qubes`) in network providing VM
'''
# add info about remapped IPs (VM IP hidden from the VM itself)
mapped_ip_base = '/mapped-ip/{}'.format(vm.ip)
if vm.visible_ip:
self.qdb.write(mapped_ip_base + '/visible-ip', vm.visible_ip)
else:
self.qdb.rm(mapped_ip_base + '/visible-ip')
if vm.visible_gateway:
self.qdb.write(mapped_ip_base + '/visible-gateway',
vm.visible_gateway)
else:
self.qdb.rm(mapped_ip_base + '/visible-gateway')
@qubes.events.handler('property-del:netvm') @qubes.events.handler('property-del:netvm')
def on_property_del_netvm(self, event, prop, old_netvm=None): def on_property_del_netvm(self, event, prop, old_netvm=None):
''' Sets the the NetVM to default NetVM ''' ''' Sets the the NetVM to default NetVM '''
@ -342,6 +398,7 @@ class NetVMMixin(qubes.events.Emitter):
''' Reloads the firewall if vm is running and has a NetVM assigned ''' ''' Reloads the firewall if vm is running and has a NetVM assigned '''
# pylint: disable=unused-argument # pylint: disable=unused-argument
if self.is_running() and self.netvm: if self.is_running() and self.netvm:
self.netvm.set_mapped_ip_info_for_vm(self)
self.netvm.reload_firewall_for_vm(self) # pylint: disable=no-member self.netvm.reload_firewall_for_vm(self) # pylint: disable=no-member
# CORE2: swallowed get_firewall_conf, write_firewall_conf, # CORE2: swallowed get_firewall_conf, write_firewall_conf,

View File

@ -1670,9 +1670,9 @@ class QubesVM(qubes.vm.mix.net.NetVMMixin, qubes.vm.BaseVM):
self.qdb.write('/qubes-netvm-{}-dns'.format(i), addr) self.qdb.write('/qubes-netvm-{}-dns'.format(i), addr)
if self.netvm is not None: if self.netvm is not None:
self.qdb.write('/qubes-ip', self.ip) self.qdb.write('/qubes-ip', self.visible_ip)
self.qdb.write('/qubes-netmask', self.netvm.netmask) self.qdb.write('/qubes-netmask', self.visible_netmask)
self.qdb.write('/qubes-gateway', self.netvm.gateway) self.qdb.write('/qubes-gateway', self.visible_gateway)
for i, addr in zip(('primary', 'secondary'), self.dns): for i, addr in zip(('primary', 'secondary'), self.dns):
self.qdb.write('/qubes-{}-dns'.format(i), addr) self.qdb.write('/qubes-{}-dns'.format(i), addr)