diff --git a/qubes/tests/int/network.py b/qubes/tests/int/network.py index 661a7f5a..977d74d7 100644 --- a/qubes/tests/int/network.py +++ b/qubes/tests/int/network.py @@ -58,7 +58,7 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin): def setUp(self): super(VmNetworkingMixin, self).setUp() - if self.template.startswith('whonix-'): + if self.template.startswith('whonix-gw'): self.skipTest("Test not supported here - Whonix uses its own " "firewall settings") self.init_default_template(self.template) @@ -339,6 +339,278 @@ class VmNetworkingMixin(qubes.tests.SystemTestsMixin): self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0) + def test_200_fake_ip_simple(self): + '''Test hiding VM real IP''' + self.testvm1.features['net/fake-ip'] = '192.168.1.128' + self.testvm1.features['net/fake-gateway'] = '192.168.1.1' + self.testvm1.features['net/fake-netmask'] = '255.255.255.0' + self.app.save() + self.testvm1.start() + self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0) + self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0) + p = self.testvm1.run('ip addr show dev eth0', user='root', + passio_popen=True, + ignore_stderr=True) + p.stdin.close() + output = p.stdout.read() + self.assertEqual(p.wait(), 0, 'ip addr show dev eth0 failed') + self.assertIn('192.168.1.128', output) + self.assertNotIn(self.testvm1.ip, output) + + p = self.testvm1.run('ip route show', user='root', + passio_popen=True, + ignore_stderr=True) + p.stdin.close() + output = p.stdout.read() + self.assertEqual(p.wait(), 0, 'ip route show failed') + self.assertIn('192.168.1.1', output) + self.assertNotIn(self.testvm1.netvm.ip, output) + + def test_201_fake_ip_without_gw(self): + '''Test hiding VM real IP''' + self.testvm1.features['net/fake-ip'] = '192.168.1.128' + self.app.save() + self.testvm1.start() + self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0) + self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0) + p = self.testvm1.run('ip addr show dev eth0', user='root', + passio_popen=True, + ignore_stderr=True) + p.stdin.close() + output = p.stdout.read() + self.assertEqual(p.wait(), 0, 'ip addr show dev eth0 failed') + self.assertIn('192.168.1.128', output) + self.assertNotIn(self.testvm1.ip, output) + + def test_202_fake_ip_firewall(self): + '''Test hiding VM real IP, firewall''' + self.testvm1.features['net/fake-ip'] = '192.168.1.128' + self.testvm1.features['net/fake-gateway'] = '192.168.1.1' + self.testvm1.features['net/fake-netmask'] = '255.255.255.0' + + self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM, + name=self.make_vm_name('proxy'), + label='red') + self.proxy.provides_network = True + self.proxy.create_on_disk() + self.proxy.netvm = self.testnetvm + self.testvm1.netvm = self.proxy + self.app.save() + + if self.run_cmd(self.testnetvm, 'nc -h 2>&1|grep -q nmap.org') == 0: + nc_version = NcVersion.Nmap + else: + nc_version = NcVersion.Trad + + # block all but ICMP and DNS + + self.testvm1.firewall.policy = 'drop' + self.testvm1.firewall.rules = [ + qubes.firewall.Rule(None, action='accept', proto='icmp'), + qubes.firewall.Rule(None, action='accept', specialtarget='dns'), + ] + self.testvm1.firewall.save() + self.testvm1.start() + self.assertTrue(self.proxy.is_running()) + + if nc_version == NcVersion.Nmap: + self.testnetvm.run("nc -l --send-only -e /bin/hostname -k 1234") + else: + self.testnetvm.run("while nc -l -e /bin/hostname -p 1234; do " + "true; done") + + self.assertEqual(self.run_cmd(self.proxy, self.ping_ip), 0, + "Ping by IP from ProxyVM failed") + self.assertEqual(self.run_cmd(self.proxy, self.ping_name), 0, + "Ping by name from ProxyVM failed") + self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0, + "Ping by IP should be allowed") + self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0, + "Ping by name should be allowed") + if nc_version == NcVersion.Nmap: + nc_cmd = "nc -w 1 --recv-only {} 1234".format(self.test_ip) + else: + nc_cmd = "nc -w 1 {} 1234".format(self.test_ip) + self.assertNotEqual(self.run_cmd(self.testvm1, nc_cmd), 0, + "TCP connection should be blocked") + + def test_203_fake_ip_inter_vm_allow(self): + '''Access VM with "fake IP" from other VM (when firewall allows)''' + self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM, + name=self.make_vm_name('proxy'), + label='red') + self.proxy.create_on_disk() + self.proxy.provides_network = True + self.proxy.netvm = self.testnetvm + self.testvm1.netvm = self.proxy + self.testvm1.features['net/fake-ip'] = '192.168.1.128' + self.testvm1.features['net/fake-gateway'] = '192.168.1.1' + self.testvm1.features['net/fake-netmask'] = '255.255.255.0' + + self.testvm2 = self.app.add_new_vm(qubes.vm.appvm.AppVM, + name=self.make_vm_name('vm3'), + label='red') + self.testvm2.create_on_disk() + self.testvm2.netvm = self.proxy + self.app.save() + + self.testvm1.start() + self.testvm2.start() + + cmd = 'iptables -I FORWARD -s {} -d {} -j ACCEPT'.format( + self.testvm2.ip, self.testvm1.ip) + retcode = self.proxy.run(cmd, user='root', wait=True) + self.assertEqual(retcode, 0, '{} failed with: {}'.format(cmd, retcode)) + + cmd = 'iptables -I INPUT -s {} -j ACCEPT'.format( + self.testvm2.ip) + retcode = self.testvm1.run(cmd, user='root', wait=True) + self.assertEqual(retcode, 0, '{} failed with: {}'.format(cmd, retcode)) + + self.assertEqual(self.run_cmd(self.testvm2, + self.ping_cmd.format(target=self.testvm1.ip)), 0) + + cmd = 'iptables -nvxL INPUT | grep {}'.format(self.testvm2.ip) + p = self.testvm1.run(cmd, user='root', passio_popen=True) + (stdout, _) = p.communicate() + self.assertEqual(p.returncode, 0, + '{} failed with {}'.format(cmd, p.returncode)) + self.assertNotEqual(stdout.split()[0], '0', + 'Packets didn\'t managed to the VM') + + def test_204_fake_ip_proxy(self): + '''Test hiding VM real IP''' + self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM, + name=self.make_vm_name('proxy'), + label='red') + self.proxy.create_on_disk() + self.proxy.provides_network = True + self.proxy.netvm = self.testnetvm + self.proxy.features['net/fake-ip'] = '192.168.1.128' + self.proxy.features['net/fake-gateway'] = '192.168.1.1' + self.proxy.features['net/fake-netmask'] = '255.255.255.0' + self.testvm1.netvm = self.proxy + self.app.save() + self.testvm1.start() + + self.assertEqual(self.run_cmd(self.proxy, self.ping_ip), 0) + self.assertEqual(self.run_cmd(self.proxy, self.ping_name), 0) + + self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0) + self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0) + + p = self.proxy.run('ip addr show dev eth0', user='root', + passio_popen=True, + ignore_stderr=True) + p.stdin.close() + output = p.stdout.read() + self.assertEqual(p.wait(), 0, 'ip addr show dev eth0 failed') + self.assertIn('192.168.1.128', output) + self.assertNotIn(self.testvm1.ip, output) + + p = self.proxy.run('ip route show', user='root', + passio_popen=True, + ignore_stderr=True) + p.stdin.close() + output = p.stdout.read() + self.assertEqual(p.wait(), 0, 'ip route show failed') + self.assertIn('192.168.1.1', output) + self.assertNotIn(self.testvm1.netvm.ip, output) + + p = self.testvm1.run('ip addr show dev eth0', user='root', + passio_popen=True, + ignore_stderr=True) + p.stdin.close() + output = p.stdout.read() + self.assertEqual(p.wait(), 0, 'ip addr show dev eth0 failed') + self.assertNotIn('192.168.1.128', output) + self.assertIn(self.testvm1.ip, output) + + p = self.testvm1.run('ip route show', user='root', + passio_popen=True, + ignore_stderr=True) + p.stdin.close() + output = p.stdout.read() + self.assertEqual(p.wait(), 0, 'ip route show failed') + self.assertIn('192.168.1.128', output) + self.assertNotIn(self.proxy.ip, output) + + def test_210_custom_ip_simple(self): + '''Custom AppVM IP''' + self.testvm1.ip = '192.168.1.1' + self.app.save() + self.testvm1.start() + self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0) + self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0) + + def test_211_custom_ip_proxy(self): + '''Custom ProxyVM IP''' + self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM, + name=self.make_vm_name('proxy'), + label='red') + self.proxy.create_on_disk() + self.proxy.provides_network = True + self.proxy.netvm = self.testnetvm + self.proxy.ip = '192.168.1.1' + self.testvm1.netvm = self.proxy + self.app.save() + + self.testvm1.start() + + self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0) + self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0) + + def test_212_custom_ip_firewall(self): + '''Custom VM IP and firewall''' + self.testvm1.ip = '192.168.1.1' + + self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM, + name=self.make_vm_name('proxy'), + label='red') + self.proxy.provides_network = True + self.proxy.create_on_disk() + self.proxy.netvm = self.testnetvm + self.testvm1.netvm = self.proxy + self.app.save() + + if self.run_cmd(self.testnetvm, 'nc -h 2>&1|grep -q nmap.org') == 0: + nc_version = NcVersion.Nmap + else: + nc_version = NcVersion.Trad + + # block all but ICMP and DNS + + self.testvm1.firewall.policy = 'drop' + self.testvm1.firewall.rules = [ + qubes.firewall.Rule(None, action='accept', proto='icmp'), + qubes.firewall.Rule(None, action='accept', specialtarget='dns'), + ] + self.testvm1.firewall.save() + self.testvm1.start() + self.assertTrue(self.proxy.is_running()) + + if nc_version == NcVersion.Nmap: + self.testnetvm.run("nc -l --send-only -e /bin/hostname -k 1234") + else: + self.testnetvm.run("while nc -l -e /bin/hostname -p 1234; do " + "true; done") + + self.assertEqual(self.run_cmd(self.proxy, self.ping_ip), 0, + "Ping by IP from ProxyVM failed") + self.assertEqual(self.run_cmd(self.proxy, self.ping_name), 0, + "Ping by name from ProxyVM failed") + self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0, + "Ping by IP should be allowed") + self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0, + "Ping by name should be allowed") + if nc_version == NcVersion.Nmap: + nc_cmd = "nc -w 1 --recv-only {} 1234".format(self.test_ip) + else: + nc_cmd = "nc -w 1 {} 1234".format(self.test_ip) + self.assertNotEqual(self.run_cmd(self.testvm1, nc_cmd), 0, + "TCP connection should be blocked") + + # noinspection PyAttributeOutsideInit class VmUpdatesMixin(qubes.tests.SystemTestsMixin): """ diff --git a/qubes/tests/vm/mix/net.py b/qubes/tests/vm/mix/net.py index dfd844d1..2f50ec5c 100644 --- a/qubes/tests/vm/mix/net.py +++ b/qubes/tests/vm/mix/net.py @@ -84,49 +84,17 @@ class TC_00_NetVMMixin( self.app.domains = {1: vm, vm: vm} self.assertPropertyInvalidValue(vm, 'netvm', vm) - @unittest.skip('TODO: probably remove') - def test_290_dispvm_netvm(self): + def test_150_ip(self): vm = self.get_vm() self.setup_netvms(vm) - self.assertPropertyDefaultValue(vm, 'dispvm_netvm', - self.app.default_netvm) - self.assertPropertyValue(vm, 'dispvm_netvm', self.netvm2, self.netvm2, - self.netvm2.name) - del vm.dispvm_netvm - self.assertPropertyDefaultValue(vm, 'dispvm_netvm', - self.app.default_netvm) - self.assertPropertyValue(vm, 'dispvm_netvm', self.netvm2.name, - self.netvm2, self.netvm2.name) - # XXX FIXME xml value - self.assertPropertyValue(vm, 'dispvm_netvm', None, None, 'None') + self.assertPropertyDefaultValue(vm, 'ip', '10.137.0.' + str(vm.qid)) + vm.ip = '192.168.1.1' + self.assertEqual(vm.ip, '192.168.1.1') - @unittest.skip('TODO: probably remove') - def test_291_dispvm_netvm_invalid(self): + def test_151_ip_invalid(self): vm = self.get_vm() self.setup_netvms(vm) - self.assertPropertyInvalidValue(vm, 'dispvm_netvm', 'invalid') - self.assertPropertyInvalidValue(vm, 'dispvm_netvm', 123) - - @unittest.skip('TODO: probably remove') - def test_291_dispvm_netvm_netvm(self): - vm = self.get_vm() - nonetvm = TestVM(qid=2, app=self.app, name='nonetvm') - self.app.domains = {1: vm, 2: nonetvm} - self.assertPropertyInvalidValue(vm, 'dispvm_netvm', nonetvm) - - @unittest.skip('TODO: probably remove') - def test_291_dispvm_netvm_default(self): - """Check if vm.dispvm_netvm default is really vm.netvm""" - vm = self.get_vm() - self.setup_netvms(vm) - vm.netvm = self.netvm2 - self.assertPropertyDefaultValue(vm, 'dispvm_netvm', self.netvm2) - del vm.netvm - self.assertPropertyDefaultValue(vm, 'dispvm_netvm', self.netvm1) - - @unittest.skip('TODO: probably remove') - def test_292_dispvm_netvm_loopback(self): - vm = self.get_vm() - self.app.domains = {1: vm, vm: vm} - self.assertPropertyInvalidValue(vm, 'dispvm_netvm', vm) - + self.assertPropertyInvalidValue(vm, 'ip', 'abcd') + self.assertPropertyInvalidValue(vm, 'ip', 'a.b.c.d') + self.assertPropertyInvalidValue(vm, 'ip', '1111.2222.3333.4444') + # TODO: implement and add here: 0.0.0.0, 333.333.333.333 diff --git a/qubes/vm/__init__.py b/qubes/vm/__init__.py index 54984866..45312695 100644 --- a/qubes/vm/__init__.py +++ b/qubes/vm/__init__.py @@ -127,11 +127,8 @@ class Features(dict): return self[feature] if hasattr(self.vm, 'template') and self.vm.template is not None: - try: - return self.vm.template.features[feature] - except KeyError: - # handle default just below - pass + return self.vm.template.features.check_with_template(feature, + default) if default is self._NO_DEFAULT: raise KeyError(feature) diff --git a/qubes/vm/mix/net.py b/qubes/vm/mix/net.py index b17bd356..b8f851d1 100644 --- a/qubes/vm/mix/net.py +++ b/qubes/vm/mix/net.py @@ -45,6 +45,25 @@ def _setter_mac(self, prop, value): return value +def _default_ip(self): + if not self.is_networked(): + return None + if self.netvm is not None: + return self.netvm.get_ip_for_vm(self) # pylint: disable=no-member + else: + return self.get_ip_for_vm(self) + + +def _setter_ip(self, prop, value): + # pylint: disable=unused-argument + if not isinstance(value, basestring): + raise ValueError('IP address must be a string') + value = value.lower() + if re.match(r"^([0-9]{1,3}.){3}[0-9]{1,3}$", value) is None: + raise ValueError('Invalid IP address value') + return value + + class NetVMMixin(qubes.events.Emitter): ''' Mixin containing network functionality ''' mac = qubes.property('mac', type=str, @@ -53,6 +72,12 @@ class NetVMMixin(qubes.events.Emitter): ls_width=17, doc='MAC address of the NIC emulated inside VM') + ip = qubes.property('ip', type=str, + default=_default_ip, + setter=_setter_ip, + ls_width=15, + doc='IP address of this domain.') + # CORE2: swallowed uses_default_netvm netvm = qubes.VMProperty('netvm', load_stage=4, allow_none=True, default=(lambda self: self.app.default_fw_netvm if self.provides_network @@ -74,16 +99,27 @@ class NetVMMixin(qubes.events.Emitter): # used in networked appvms or proxyvms (netvm is not None) # + @qubes.tools.qvm_ls.column(width=15) @property - def ip(self): - '''IP address of this domain.''' - if not self.is_networked(): - return None - if self.netvm is not None: - return self.netvm.get_ip_for_vm(self) # pylint: disable=no-member - else: - return self.get_ip_for_vm(self) + def visible_ip(self): + '''IP address of this domain as seen by the domain.''' + return self.features.check_with_template('net/fake-ip', None) or \ + self.ip + + @qubes.tools.qvm_ls.column(width=15) + @property + def visible_gateway(self): + '''Default gateway of this domain as seen by the domain.''' + return self.features.check_with_template('net/fake-gateway', None) or \ + self.netvm.gateway + + @qubes.tools.qvm_ls.column(width=15) + @property + def visible_netmask(self): + '''Netmask as seen by the domain.''' + return self.features.check_with_template('net/fake-netmask', None) or \ + self.netvm.netmask # # used in netvms (provides_network=True) @@ -106,7 +142,7 @@ class NetVMMixin(qubes.events.Emitter): @property def gateway(self): '''Gateway for other domains that use this domain as netvm.''' - return self.ip if self.provides_network else None + return self.visible_ip if self.provides_network else None @qubes.tools.qvm_ls.column(width=15) @property @@ -211,6 +247,7 @@ class NetVMMixin(qubes.events.Emitter): self.log.info('Starting NetVM ({0})'.format(self.netvm.name)) self.netvm.start() + self.netvm.set_mapped_ip_info_for_vm(self) self.libvirt_domain.attachDevice( self.app.env.get_template('libvirt/devices/net.xml').render( vm=self)) @@ -274,6 +311,25 @@ class NetVMMixin(qubes.events.Emitter): # signal its done self.qdb.write(base_dir[:-1], '') + def set_mapped_ip_info_for_vm(self, vm): + ''' + Set configuration to possibly hide real IP from the VM. + This needs to be done before executing 'script' + (`/etc/xen/scripts/vif-route-qubes`) in network providing VM + ''' + # add info about remapped IPs (VM IP hidden from the VM itself) + mapped_ip_base = '/mapped-ip/{}'.format(vm.ip) + if vm.visible_ip: + self.qdb.write(mapped_ip_base + '/visible-ip', vm.visible_ip) + else: + self.qdb.rm(mapped_ip_base + '/visible-ip') + if vm.visible_gateway: + self.qdb.write(mapped_ip_base + '/visible-gateway', + vm.visible_gateway) + else: + self.qdb.rm(mapped_ip_base + '/visible-gateway') + + @qubes.events.handler('property-del:netvm') def on_property_del_netvm(self, event, prop, old_netvm=None): ''' Sets the the NetVM to default NetVM ''' @@ -342,6 +398,7 @@ class NetVMMixin(qubes.events.Emitter): ''' Reloads the firewall if vm is running and has a NetVM assigned ''' # pylint: disable=unused-argument if self.is_running() and self.netvm: + self.netvm.set_mapped_ip_info_for_vm(self) self.netvm.reload_firewall_for_vm(self) # pylint: disable=no-member # CORE2: swallowed get_firewall_conf, write_firewall_conf, diff --git a/qubes/vm/qubesvm.py b/qubes/vm/qubesvm.py index 67d855f0..3686997d 100644 --- a/qubes/vm/qubesvm.py +++ b/qubes/vm/qubesvm.py @@ -1670,9 +1670,9 @@ class QubesVM(qubes.vm.mix.net.NetVMMixin, qubes.vm.BaseVM): self.qdb.write('/qubes-netvm-{}-dns'.format(i), addr) if self.netvm is not None: - self.qdb.write('/qubes-ip', self.ip) - self.qdb.write('/qubes-netmask', self.netvm.netmask) - self.qdb.write('/qubes-gateway', self.netvm.gateway) + self.qdb.write('/qubes-ip', self.visible_ip) + self.qdb.write('/qubes-netmask', self.visible_netmask) + self.qdb.write('/qubes-gateway', self.visible_gateway) for i, addr in zip(('primary', 'secondary'), self.dns): self.qdb.write('/qubes-{}-dns'.format(i), addr)