diff --git a/qubes/storage/domain.py b/qubes/storage/domain.py index a84be882..e7fc33ca 100644 --- a/qubes/storage/domain.py +++ b/qubes/storage/domain.py @@ -21,6 +21,8 @@ # ''' Manages block devices in a domain ''' +import string + from qubes.storage import Pool, Volume @@ -41,25 +43,50 @@ class DomainPool(Pool): ''' Queries qubesdb and returns volumes for `self.vm` ''' qdb = self.vm.qdb + safe_set = set(string.letters + string.digits + string.punctuation) + allowed_attributes = {'desc': string.printable, + 'mode': string.letters, + 'size': string.digits} if not self.vm.is_running(): return [] untrusted_qubes_devices = qdb.list('/qubes-block-devices/') # because we get each path 3 x times as # /qubes-block-devices/foo/{desc,mode,size} we need to merge this - untrusted_devices = {} + devices = {} for untrusted_device_path in untrusted_qubes_devices: - _, _, untrusted_name, untrusted_atr = untrusted_device_path.split( - '/', 4) - if untrusted_name not in untrusted_devices.keys(): - untrusted_devices[untrusted_name] = { - untrusted_atr: qdb.read(untrusted_device_path) - } - else: - untrusted_devices[untrusted_name][untrusted_atr] = qdb.read( - untrusted_device_path) + if not all(c in safe_set for c in untrusted_device_path): + msg = ("%s vm's device path name contains unsafe characters. " + "Skipping it.") + self.vm.log.warning(msg % self.vm.name) + continue - return [DomainVolume(untrusted_n, self.name, **untrusted_atrs) - for untrusted_n, untrusted_atrs in untrusted_devices.items()] + # name can be trusted because it was checked as a part of + # untrusted_device_path check above + _, _, name, untrusted_atr = untrusted_device_path.split('/', 4) + + if untrusted_atr in allowed_attributes.keys(): + atr = untrusted_atr + else: + msg = ('{!s} has an unknown qubes-block-device atr {!s} ' + 'Skipping it') + self.vm.log.error(msg.format(self.vm.name, untrusted_atr)) + continue + + untrusted_value = qdb.read(untrusted_device_path) + allowed_characters = allowed_attributes[atr] + if all(c in allowed_characters for c in untrusted_value): + value = untrusted_value + else: + msg = ("{!s} vm's device path {!s} contains unsafe characters") + self.vm.log.error(msg.format(self.vm.name, atr)) + + if name not in devices.keys(): + devices[name] = {} + + devices[name][atr] = value + + return [DomainVolume(n, self.name, **atrs) + for n, atrs in devices.items()] def clone(self, source, target): raise NotImplementedError