vm/mix/net: use ipaddress module for ip and ip6 properties
It has built-in validation, which is much more elegant than custom regex or socket call. Suggested by @woju QubesOS/qubes-issues#718
This commit is contained in:
parent
f3cf58e6f2
commit
e12a66f103
@ -19,7 +19,7 @@
|
|||||||
# You should have received a copy of the GNU Lesser General Public
|
# You should have received a copy of the GNU Lesser General Public
|
||||||
# License along with this library; if not, see <https://www.gnu.org/licenses/>.
|
# License along with this library; if not, see <https://www.gnu.org/licenses/>.
|
||||||
#
|
#
|
||||||
|
import ipaddress
|
||||||
import unittest
|
import unittest
|
||||||
|
|
||||||
import qubes
|
import qubes
|
||||||
@ -110,9 +110,10 @@ class TC_00_NetVMMixin(
|
|||||||
def test_150_ip(self):
|
def test_150_ip(self):
|
||||||
vm = self.get_vm()
|
vm = self.get_vm()
|
||||||
self.setup_netvms(vm)
|
self.setup_netvms(vm)
|
||||||
self.assertPropertyDefaultValue(vm, 'ip', '10.137.0.' + str(vm.qid))
|
self.assertPropertyDefaultValue(vm, 'ip',
|
||||||
|
ipaddress.IPv4Address('10.137.0.' + str(vm.qid)))
|
||||||
vm.ip = '192.168.1.1'
|
vm.ip = '192.168.1.1'
|
||||||
self.assertEqual(vm.ip, '192.168.1.1')
|
self.assertEqual(vm.ip, ipaddress.IPv4Address('192.168.1.1'))
|
||||||
|
|
||||||
def test_151_ip_invalid(self):
|
def test_151_ip_invalid(self):
|
||||||
vm = self.get_vm()
|
vm = self.get_vm()
|
||||||
@ -128,9 +129,10 @@ class TC_00_NetVMMixin(
|
|||||||
self.assertPropertyDefaultValue(vm, 'ip6', None)
|
self.assertPropertyDefaultValue(vm, 'ip6', None)
|
||||||
vm.netvm.features['ipv6'] = True
|
vm.netvm.features['ipv6'] = True
|
||||||
self.assertPropertyDefaultValue(vm, 'ip6',
|
self.assertPropertyDefaultValue(vm, 'ip6',
|
||||||
'{}::a89:{:x}'.format(qubes.config.qubes_ipv6_prefix, vm.qid))
|
ipaddress.IPv6Address('{}::a89:{:x}'.format(
|
||||||
|
qubes.config.qubes_ipv6_prefix, vm.qid)))
|
||||||
vm.ip6 = 'abcd:efff::1'
|
vm.ip6 = 'abcd:efff::1'
|
||||||
self.assertEqual(vm.ip6, 'abcd:efff::1')
|
self.assertEqual(vm.ip6, ipaddress.IPv6Address('abcd:efff::1'))
|
||||||
|
|
||||||
def test_161_ip6_invalid(self):
|
def test_161_ip6_invalid(self):
|
||||||
vm = self.get_vm()
|
vm = self.get_vm()
|
||||||
|
@ -783,7 +783,7 @@ class TC_90_QubesVM(QubesVMTestsMixin, qubes.tests.QubesTestCase):
|
|||||||
self.assertXMLEqual(lxml.etree.XML(libvirt_xml),
|
self.assertXMLEqual(lxml.etree.XML(libvirt_xml),
|
||||||
lxml.etree.XML(expected.format(
|
lxml.etree.XML(expected.format(
|
||||||
extra_ip='<ip address="{}::a89:1" family=\'ipv6\'/>'.format(
|
extra_ip='<ip address="{}::a89:1" family=\'ipv6\'/>'.format(
|
||||||
qubes.config.qubes_ipv6_prefix))))
|
qubes.config.qubes_ipv6_prefix.replace(':0000', '')))))
|
||||||
|
|
||||||
@unittest.mock.patch('qubes.utils.get_timezone')
|
@unittest.mock.patch('qubes.utils.get_timezone')
|
||||||
@unittest.mock.patch('qubes.utils.urandom')
|
@unittest.mock.patch('qubes.utils.urandom')
|
||||||
@ -904,7 +904,9 @@ class TC_90_QubesVM(QubesVMTestsMixin, qubes.tests.QubesTestCase):
|
|||||||
test_qubesdb.data.clear()
|
test_qubesdb.data.clear()
|
||||||
with self.subTest('ipv6'):
|
with self.subTest('ipv6'):
|
||||||
netvm.features['ipv6'] = True
|
netvm.features['ipv6'] = True
|
||||||
expected['/qubes-ip6'] = qubes.config.qubes_ipv6_prefix + '::a89:3'
|
expected['/qubes-ip6'] = \
|
||||||
|
qubes.config.qubes_ipv6_prefix.replace(':0000', '') + \
|
||||||
|
'::a89:3'
|
||||||
expected['/qubes-gateway6'] = 'fe80::fcff:ffff:feff:ffff'
|
expected['/qubes-gateway6'] = 'fe80::fcff:ffff:feff:ffff'
|
||||||
vm.create_qdb_entries()
|
vm.create_qdb_entries()
|
||||||
self.assertEqual(test_qubesdb.data, expected)
|
self.assertEqual(test_qubesdb.data, expected)
|
||||||
@ -913,7 +915,9 @@ class TC_90_QubesVM(QubesVMTestsMixin, qubes.tests.QubesTestCase):
|
|||||||
with self.subTest('ipv6_just_appvm'):
|
with self.subTest('ipv6_just_appvm'):
|
||||||
del netvm.features['ipv6']
|
del netvm.features['ipv6']
|
||||||
vm.features['ipv6'] = True
|
vm.features['ipv6'] = True
|
||||||
expected['/qubes-ip6'] = qubes.config.qubes_ipv6_prefix + '::a89:3'
|
expected['/qubes-ip6'] = \
|
||||||
|
qubes.config.qubes_ipv6_prefix.replace(':0000', '') + \
|
||||||
|
'::a89:3'
|
||||||
del expected['/qubes-gateway6']
|
del expected['/qubes-gateway6']
|
||||||
vm.create_qdb_entries()
|
vm.create_qdb_entries()
|
||||||
self.assertEqual(test_qubesdb.data, expected)
|
self.assertEqual(test_qubesdb.data, expected)
|
||||||
|
@ -21,10 +21,9 @@
|
|||||||
#
|
#
|
||||||
|
|
||||||
''' This module contains the NetVMMixin '''
|
''' This module contains the NetVMMixin '''
|
||||||
|
import ipaddress
|
||||||
import os
|
import os
|
||||||
import re
|
import re
|
||||||
import socket
|
|
||||||
|
|
||||||
import libvirt # pylint: disable=import-error
|
import libvirt # pylint: disable=import-error
|
||||||
import qubes
|
import qubes
|
||||||
@ -53,15 +52,6 @@ def _default_ip(self):
|
|||||||
return self.get_ip_for_vm(self)
|
return self.get_ip_for_vm(self)
|
||||||
|
|
||||||
|
|
||||||
def _setter_ip(self, prop, value):
|
|
||||||
# pylint: disable=unused-argument
|
|
||||||
if not isinstance(value, str):
|
|
||||||
raise ValueError('IP address must be a string')
|
|
||||||
value = value.lower()
|
|
||||||
if re.match(r"^([0-9]{1,3}.){3}[0-9]{1,3}$", value) is None:
|
|
||||||
raise ValueError('Invalid IP address value')
|
|
||||||
return value
|
|
||||||
|
|
||||||
def _default_ip6(self):
|
def _default_ip6(self):
|
||||||
if not self.is_networked():
|
if not self.is_networked():
|
||||||
return None
|
return None
|
||||||
@ -73,17 +63,6 @@ def _default_ip6(self):
|
|||||||
return self.get_ip6_for_vm(self)
|
return self.get_ip6_for_vm(self)
|
||||||
|
|
||||||
|
|
||||||
def _setter_ip6(self, prop, value):
|
|
||||||
# pylint: disable=unused-argument
|
|
||||||
if not isinstance(value, str):
|
|
||||||
raise ValueError('IPv6 address must be a string')
|
|
||||||
value = value.lower()
|
|
||||||
try:
|
|
||||||
socket.inet_pton(socket.AF_INET6, value)
|
|
||||||
except socket.error:
|
|
||||||
raise ValueError('Invalid IPv6 address value')
|
|
||||||
return value
|
|
||||||
|
|
||||||
def _setter_netvm(self, prop, value):
|
def _setter_netvm(self, prop, value):
|
||||||
# pylint: disable=unused-argument
|
# pylint: disable=unused-argument
|
||||||
if value is None:
|
if value is None:
|
||||||
@ -109,14 +88,12 @@ class NetVMMixin(qubes.events.Emitter):
|
|||||||
setter=_setter_mac,
|
setter=_setter_mac,
|
||||||
doc='MAC address of the NIC emulated inside VM')
|
doc='MAC address of the NIC emulated inside VM')
|
||||||
|
|
||||||
ip = qubes.property('ip', type=str,
|
ip = qubes.property('ip', type=ipaddress.IPv4Address,
|
||||||
default=_default_ip,
|
default=_default_ip,
|
||||||
setter=_setter_ip,
|
|
||||||
doc='IP address of this domain.')
|
doc='IP address of this domain.')
|
||||||
|
|
||||||
ip6 = qubes.property('ip6', type=str,
|
ip6 = qubes.property('ip6', type=ipaddress.IPv6Address,
|
||||||
default=_default_ip6,
|
default=_default_ip6,
|
||||||
setter=_setter_ip6,
|
|
||||||
doc='IPv6 address of this domain.')
|
doc='IPv6 address of this domain.')
|
||||||
|
|
||||||
# CORE2: swallowed uses_default_netvm
|
# CORE2: swallowed uses_default_netvm
|
||||||
@ -183,12 +160,13 @@ class NetVMMixin(qubes.events.Emitter):
|
|||||||
'''
|
'''
|
||||||
import qubes.vm.dispvm # pylint: disable=redefined-outer-name
|
import qubes.vm.dispvm # pylint: disable=redefined-outer-name
|
||||||
if isinstance(vm, qubes.vm.dispvm.DispVM):
|
if isinstance(vm, qubes.vm.dispvm.DispVM):
|
||||||
return '10.138.{}.{}'.format((vm.dispid >> 8) & 0xff,
|
return ipaddress.IPv4Address('10.138.{}.{}'.format(
|
||||||
vm.dispid & 0xff)
|
(vm.dispid >> 8) & 0xff, vm.dispid & 0xff))
|
||||||
|
|
||||||
# VM technically can get address which ends in '.0'. This currently
|
# VM technically can get address which ends in '.0'. This currently
|
||||||
# does not happen, because qid < 253, but may happen in the future.
|
# does not happen, because qid < 253, but may happen in the future.
|
||||||
return '10.137.{}.{}'.format((vm.qid >> 8) & 0xff, vm.qid & 0xff)
|
return ipaddress.IPv4Address('10.137.{}.{}'.format(
|
||||||
|
(vm.qid >> 8) & 0xff, vm.qid & 0xff))
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def get_ip6_for_vm(vm):
|
def get_ip6_for_vm(vm):
|
||||||
@ -199,10 +177,11 @@ class NetVMMixin(qubes.events.Emitter):
|
|||||||
'''
|
'''
|
||||||
import qubes.vm.dispvm # pylint: disable=redefined-outer-name
|
import qubes.vm.dispvm # pylint: disable=redefined-outer-name
|
||||||
if isinstance(vm, qubes.vm.dispvm.DispVM):
|
if isinstance(vm, qubes.vm.dispvm.DispVM):
|
||||||
return '{}::a8a:{:x}'.format(
|
return ipaddress.IPv6Address('{}::a8a:{:x}'.format(
|
||||||
qubes.config.qubes_ipv6_prefix, vm.dispid)
|
qubes.config.qubes_ipv6_prefix, vm.dispid))
|
||||||
|
|
||||||
return '{}::a89:{:x}'.format(qubes.config.qubes_ipv6_prefix, vm.qid)
|
return ipaddress.IPv6Address('{}::a89:{:x}'.format(
|
||||||
|
qubes.config.qubes_ipv6_prefix, vm.qid))
|
||||||
|
|
||||||
@qubes.stateless_property
|
@qubes.stateless_property
|
||||||
def gateway(self):
|
def gateway(self):
|
||||||
|
@ -1816,26 +1816,28 @@ class QubesVM(qubes.vm.mix.net.NetVMMixin, qubes.vm.BaseVM):
|
|||||||
|
|
||||||
if self.provides_network:
|
if self.provides_network:
|
||||||
# '/qubes-netvm-network' value is only checked for being non empty
|
# '/qubes-netvm-network' value is only checked for being non empty
|
||||||
self.untrusted_qdb.write('/qubes-netvm-network', self.gateway)
|
self.untrusted_qdb.write('/qubes-netvm-network', str(self.gateway))
|
||||||
self.untrusted_qdb.write('/qubes-netvm-gateway', self.gateway)
|
self.untrusted_qdb.write('/qubes-netvm-gateway', str(self.gateway))
|
||||||
self.untrusted_qdb.write('/qubes-netvm-netmask', self.netmask)
|
self.untrusted_qdb.write('/qubes-netvm-netmask', str(self.netmask))
|
||||||
|
|
||||||
for i, addr in zip(('primary', 'secondary'), self.dns):
|
for i, addr in zip(('primary', 'secondary'), self.dns):
|
||||||
self.untrusted_qdb.write('/qubes-netvm-{}-dns'.format(i), addr)
|
self.untrusted_qdb.write('/qubes-netvm-{}-dns'.format(i), addr)
|
||||||
|
|
||||||
if self.netvm is not None:
|
if self.netvm is not None:
|
||||||
self.untrusted_qdb.write('/qubes-ip', self.visible_ip)
|
self.untrusted_qdb.write('/qubes-ip', str(self.visible_ip))
|
||||||
self.untrusted_qdb.write('/qubes-netmask', self.visible_netmask)
|
self.untrusted_qdb.write('/qubes-netmask',
|
||||||
self.untrusted_qdb.write('/qubes-gateway', self.visible_gateway)
|
str(self.visible_netmask))
|
||||||
|
self.untrusted_qdb.write('/qubes-gateway',
|
||||||
|
str(self.visible_gateway))
|
||||||
|
|
||||||
for i, addr in zip(('primary', 'secondary'), self.dns):
|
for i, addr in zip(('primary', 'secondary'), self.dns):
|
||||||
self.untrusted_qdb.write('/qubes-{}-dns'.format(i), addr)
|
self.untrusted_qdb.write('/qubes-{}-dns'.format(i), str(addr))
|
||||||
|
|
||||||
if self.visible_ip6:
|
if self.visible_ip6:
|
||||||
self.untrusted_qdb.write('/qubes-ip6', self.visible_ip6)
|
self.untrusted_qdb.write('/qubes-ip6', str(self.visible_ip6))
|
||||||
if self.visible_gateway6:
|
if self.visible_gateway6:
|
||||||
self.untrusted_qdb.write('/qubes-gateway6',
|
self.untrusted_qdb.write('/qubes-gateway6',
|
||||||
self.visible_gateway6)
|
str(self.visible_gateway6))
|
||||||
|
|
||||||
|
|
||||||
tzname = qubes.utils.get_timezone()
|
tzname = qubes.utils.get_timezone()
|
||||||
|
Loading…
Reference in New Issue
Block a user