DomainPool check the untrusted data from qubes-db

This commit is contained in:
Bahtiar `kalkin-` Gadimov 2016-05-25 16:28:03 +02:00
parent 3f5a92772a
commit 35974a5dbf
No known key found for this signature in database
GPG Key ID: 96ED3C3BA19C3DEE

View File

@ -21,6 +21,8 @@
# #
''' Manages block devices in a domain ''' ''' Manages block devices in a domain '''
import string
from qubes.storage import Pool, Volume from qubes.storage import Pool, Volume
@ -41,25 +43,50 @@ class DomainPool(Pool):
''' Queries qubesdb and returns volumes for `self.vm` ''' ''' Queries qubesdb and returns volumes for `self.vm` '''
qdb = self.vm.qdb qdb = self.vm.qdb
safe_set = set(string.letters + string.digits + string.punctuation)
allowed_attributes = {'desc': string.printable,
'mode': string.letters,
'size': string.digits}
if not self.vm.is_running(): if not self.vm.is_running():
return [] return []
untrusted_qubes_devices = qdb.list('/qubes-block-devices/') untrusted_qubes_devices = qdb.list('/qubes-block-devices/')
# because we get each path 3 x times as # because we get each path 3 x times as
# /qubes-block-devices/foo/{desc,mode,size} we need to merge this # /qubes-block-devices/foo/{desc,mode,size} we need to merge this
untrusted_devices = {} devices = {}
for untrusted_device_path in untrusted_qubes_devices: for untrusted_device_path in untrusted_qubes_devices:
_, _, untrusted_name, untrusted_atr = untrusted_device_path.split( if not all(c in safe_set for c in untrusted_device_path):
'/', 4) msg = ("%s vm's device path name contains unsafe characters. "
if untrusted_name not in untrusted_devices.keys(): "Skipping it.")
untrusted_devices[untrusted_name] = { self.vm.log.warning(msg % self.vm.name)
untrusted_atr: qdb.read(untrusted_device_path) continue
}
else:
untrusted_devices[untrusted_name][untrusted_atr] = qdb.read(
untrusted_device_path)
return [DomainVolume(untrusted_n, self.name, **untrusted_atrs) # name can be trusted because it was checked as a part of
for untrusted_n, untrusted_atrs in untrusted_devices.items()] # untrusted_device_path check above
_, _, name, untrusted_atr = untrusted_device_path.split('/', 4)
if untrusted_atr in allowed_attributes.keys():
atr = untrusted_atr
else:
msg = ('{!s} has an unknown qubes-block-device atr {!s} '
'Skipping it')
self.vm.log.error(msg.format(self.vm.name, untrusted_atr))
continue
untrusted_value = qdb.read(untrusted_device_path)
allowed_characters = allowed_attributes[atr]
if all(c in allowed_characters for c in untrusted_value):
value = untrusted_value
else:
msg = ("{!s} vm's device path {!s} contains unsafe characters")
self.vm.log.error(msg.format(self.vm.name, atr))
if name not in devices.keys():
devices[name] = {}
devices[name][atr] = value
return [DomainVolume(n, self.name, **atrs)
for n, atrs in devices.items()]
def clone(self, source, target): def clone(self, source, target):
raise NotImplementedError raise NotImplementedError