mgmt: move property value sanitization to property definition
This also means we don't check if a VM with given name (in case of VMProperty) exists in the system, at this stage. But this is ok, lets not duplicate work of property setter. QubesOS/qubes-issues#2622
This commit is contained in:
parent
ce3bedbf2c
commit
3388054eae
@ -31,6 +31,7 @@ import builtins
|
|||||||
import collections
|
import collections
|
||||||
import os
|
import os
|
||||||
import os.path
|
import os.path
|
||||||
|
import string
|
||||||
|
|
||||||
import lxml.etree
|
import lxml.etree
|
||||||
import qubes.config
|
import qubes.config
|
||||||
@ -321,6 +322,41 @@ class property(object): # pylint: disable=redefined-builtin,invalid-name
|
|||||||
'property {!r} is write-once and already set'.format(
|
'property {!r} is write-once and already set'.format(
|
||||||
self.__name__))
|
self.__name__))
|
||||||
|
|
||||||
|
def sanitize(self, *, untrusted_newvalue):
|
||||||
|
'''Coarse sanitization of value to be set, before sending it to a
|
||||||
|
setter. Can raise QubesValueError if the value is invalid.
|
||||||
|
|
||||||
|
:param untrusted_newvalue: value to be validated
|
||||||
|
:return sanitized value
|
||||||
|
:raises qubes.exc.QubesValueError
|
||||||
|
'''
|
||||||
|
# do not treat type='str' as sufficient validation
|
||||||
|
if self.type is not None and self.type is not str:
|
||||||
|
# assume specific type will preform enough validation
|
||||||
|
if self.type is bool:
|
||||||
|
try:
|
||||||
|
untrusted_newvalue = untrusted_newvalue.decode('ascii')
|
||||||
|
except UnicodeDecodeError:
|
||||||
|
raise qubes.exc.QubesValueError
|
||||||
|
return self.bool(None, None, untrusted_newvalue)
|
||||||
|
else:
|
||||||
|
try:
|
||||||
|
return self.type(untrusted_newvalue)
|
||||||
|
except ValueError:
|
||||||
|
raise qubes.exc.QubesValueError
|
||||||
|
else:
|
||||||
|
# 'str' or not specified type
|
||||||
|
try:
|
||||||
|
untrusted_newvalue = untrusted_newvalue.decode('ascii',
|
||||||
|
errors='strict')
|
||||||
|
except UnicodeDecodeError:
|
||||||
|
raise qubes.exc.QubesValueError
|
||||||
|
allowed_set = string.printable
|
||||||
|
if not all(x in allowed_set for x in untrusted_newvalue):
|
||||||
|
raise qubes.exc.QubesValueError(
|
||||||
|
'Invalid characters in property value')
|
||||||
|
return untrusted_newvalue
|
||||||
|
|
||||||
|
|
||||||
#
|
#
|
||||||
# exceptions
|
# exceptions
|
||||||
|
@ -176,43 +176,7 @@ class QubesMgmt(object):
|
|||||||
assert self.arg in self.dest.property_list()
|
assert self.arg in self.dest.property_list()
|
||||||
|
|
||||||
property_def = self.dest.property_get_def(self.arg)
|
property_def = self.dest.property_get_def(self.arg)
|
||||||
# do not treat type='str' as sufficient validation
|
newvalue = property_def.sanitize(untrusted_newvalue=untrusted_payload)
|
||||||
if isinstance(property_def, qubes.vm.VMProperty):
|
|
||||||
try:
|
|
||||||
untrusted_vmname = untrusted_payload.decode('ascii')
|
|
||||||
except UnicodeDecodeError:
|
|
||||||
raise qubes.exc.QubesValueError
|
|
||||||
qubes.vm.qubesvm.validate_name(self.dest, property_def,
|
|
||||||
untrusted_vmname)
|
|
||||||
try:
|
|
||||||
newvalue = self.app.domains[untrusted_vmname]
|
|
||||||
except KeyError:
|
|
||||||
raise qubes.exc.QubesPropertyValueError(
|
|
||||||
self.dest, property_def, untrusted_vmname)
|
|
||||||
elif property_def.type is not None and property_def.type is not str:
|
|
||||||
if property_def.type is bool:
|
|
||||||
try:
|
|
||||||
untrusted_value = untrusted_payload.decode('ascii')
|
|
||||||
except UnicodeDecodeError:
|
|
||||||
raise qubes.exc.QubesValueError
|
|
||||||
newvalue = qubes.property.bool(None, None, untrusted_value)
|
|
||||||
else:
|
|
||||||
try:
|
|
||||||
newvalue = property_def.type(untrusted_payload)
|
|
||||||
except ValueError:
|
|
||||||
raise qubes.exc.QubesValueError
|
|
||||||
else:
|
|
||||||
# other types (including explicitly defined 'str') coerce to str
|
|
||||||
# with limited characters allowed
|
|
||||||
try:
|
|
||||||
untrusted_value = untrusted_payload.decode('ascii')
|
|
||||||
except UnicodeDecodeError:
|
|
||||||
raise qubes.exc.QubesValueError
|
|
||||||
allowed_set = string.printable
|
|
||||||
if not all(x in allowed_set for x in untrusted_value):
|
|
||||||
raise qubes.exc.QubesValueError(
|
|
||||||
'Invalid characters in property value')
|
|
||||||
newvalue = untrusted_value
|
|
||||||
|
|
||||||
self.fire_event_for_permission(newvalue=newvalue)
|
self.fire_event_for_permission(newvalue=newvalue)
|
||||||
|
|
||||||
|
@ -130,18 +130,10 @@ class TC_00_VMs(MgmtTestCase):
|
|||||||
value = self.call_mgmt_func(b'mgmt.vm.property.Set', b'test-vm1',
|
value = self.call_mgmt_func(b'mgmt.vm.property.Set', b'test-vm1',
|
||||||
b'netvm', b'test-net')
|
b'netvm', b'test-net')
|
||||||
self.assertIsNone(value)
|
self.assertIsNone(value)
|
||||||
mock.assert_called_once_with(self.vm, netvm)
|
mock.assert_called_once_with(self.vm, 'test-net')
|
||||||
self.app.save.assert_called_once_with()
|
self.app.save.assert_called_once_with()
|
||||||
|
|
||||||
def test_031_vm_property_set_vm_invalid1(self):
|
def test_032_vm_property_set_vm_invalid1(self):
|
||||||
with unittest.mock.patch('qubes.vm.VMProperty.__set__') as mock:
|
|
||||||
with self.assertRaises(qubes.exc.QubesValueError):
|
|
||||||
self.call_mgmt_func(b'mgmt.vm.property.Set', b'test-vm1',
|
|
||||||
b'netvm', b'no-such-vm')
|
|
||||||
self.assertFalse(mock.called)
|
|
||||||
self.assertFalse(self.app.save.called)
|
|
||||||
|
|
||||||
def test_032_vm_property_set_vm_invalid2(self):
|
|
||||||
with unittest.mock.patch('qubes.vm.VMProperty.__set__') as mock:
|
with unittest.mock.patch('qubes.vm.VMProperty.__set__') as mock:
|
||||||
with self.assertRaises(qubes.exc.QubesValueError):
|
with self.assertRaises(qubes.exc.QubesValueError):
|
||||||
self.call_mgmt_func(b'mgmt.vm.property.Set', b'test-vm1',
|
self.call_mgmt_func(b'mgmt.vm.property.Set', b'test-vm1',
|
||||||
@ -149,7 +141,7 @@ class TC_00_VMs(MgmtTestCase):
|
|||||||
self.assertFalse(mock.called)
|
self.assertFalse(mock.called)
|
||||||
self.assertFalse(self.app.save.called)
|
self.assertFalse(self.app.save.called)
|
||||||
|
|
||||||
def test_033_vm_property_set_vm_invalid3(self):
|
def test_033_vm_property_set_vm_invalid2(self):
|
||||||
with unittest.mock.patch('qubes.vm.VMProperty.__set__') as mock:
|
with unittest.mock.patch('qubes.vm.VMProperty.__set__') as mock:
|
||||||
with self.assertRaises(qubes.exc.QubesValueError):
|
with self.assertRaises(qubes.exc.QubesValueError):
|
||||||
self.call_mgmt_func(b'mgmt.vm.property.Set', b'test-vm1',
|
self.call_mgmt_func(b'mgmt.vm.property.Set', b'test-vm1',
|
||||||
|
@ -358,3 +358,11 @@ class VMProperty(qubes.property):
|
|||||||
self.vmclass.__name__))
|
self.vmclass.__name__))
|
||||||
|
|
||||||
super(VMProperty, self).__set__(instance, vm)
|
super(VMProperty, self).__set__(instance, vm)
|
||||||
|
|
||||||
|
def sanitize(self, *, untrusted_newvalue):
|
||||||
|
try:
|
||||||
|
untrusted_vmname = untrusted_newvalue.decode('ascii')
|
||||||
|
except UnicodeDecodeError:
|
||||||
|
raise qubes.exc.QubesValueError
|
||||||
|
validate_name(None, self, untrusted_vmname)
|
||||||
|
return untrusted_vmname
|
||||||
|
Loading…
Reference in New Issue
Block a user