Do not use assert statement in security logic

This is because assert statement gets optimised out when Python is run
with -O flag. This was pointed out to me by audience at PyWaw 76.
This commit is contained in:
Wojtek Porczyk 2018-06-11 12:32:05 +02:00
parent 39a9e4e422
commit 4e49b951ce
10 changed files with 164 additions and 149 deletions

View File

@ -196,6 +196,11 @@ class AbstractQubesAPI(object):
return apply_filters(iterable,
self.fire_event_for_permission(**kwargs))
def enforce(self, predicate):
'''An assert replacement, but works even with optimisations.'''
if not predicate:
raise PermissionDenied()
class QubesDaemonProtocol(asyncio.Protocol):
buffer_size = 65536

View File

@ -98,8 +98,8 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI):
@asyncio.coroutine
def vmclass_list(self):
'''List all VM classes'''
assert not self.arg
assert self.dest.name == 'dom0'
self.enforce(not self.arg)
self.enforce(self.dest.name == 'dom0')
entrypoints = self.fire_event_for_filter(
pkg_resources.iter_entry_points(qubes.vm.VM_ENTRY_POINT))
@ -112,7 +112,7 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI):
@asyncio.coroutine
def vm_list(self):
'''List all the domains'''
assert not self.arg
self.enforce(not self.arg)
if self.dest.name == 'dom0':
domains = self.fire_event_for_filter(self.app.domains)
@ -137,11 +137,11 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI):
@asyncio.coroutine
def property_list(self):
'''List all global properties'''
assert self.dest.name == 'dom0'
self.enforce(self.dest.name == 'dom0')
return self._property_list(self.app)
def _property_list(self, dest):
assert not self.arg
self.enforce(not self.arg)
properties = self.fire_event_for_filter(dest.property_list())
@ -159,7 +159,7 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI):
@asyncio.coroutine
def property_get(self):
'''Get a value of one global property'''
assert self.dest.name == 'dom0'
self.enforce(self.dest.name == 'dom0')
return self._property_get(self.app)
def _property_get(self, dest):
@ -203,7 +203,7 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI):
@asyncio.coroutine
def property_get_default(self):
'''Get a value of one global property'''
assert self.dest.name == 'dom0'
self.enforce(self.dest.name == 'dom0')
return self._property_get_default(self.app)
def _property_get_default(self, dest):
@ -247,7 +247,7 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI):
@asyncio.coroutine
def property_set(self, untrusted_payload):
'''Set property value'''
assert self.dest.name == 'dom0'
self.enforce(self.dest.name == 'dom0')
return self._property_set(self.app,
untrusted_payload=untrusted_payload)
@ -275,7 +275,7 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI):
@asyncio.coroutine
def property_help(self):
'''Get help for one property'''
assert self.dest.name == 'dom0'
self.enforce(self.dest.name == 'dom0')
return self._property_help(self.app)
def _property_help(self, dest):
@ -303,7 +303,7 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI):
@asyncio.coroutine
def property_reset(self):
'''Reset a property to a default value'''
assert self.dest.name == 'dom0'
self.enforce(self.dest.name == 'dom0')
return self._property_reset(self.app)
def _property_reset(self, dest):
@ -319,7 +319,7 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI):
scope='local', read=True)
@asyncio.coroutine
def vm_volume_list(self):
assert not self.arg
self.enforce(not self.arg)
volume_names = self.fire_event_for_filter(self.dest.volumes.keys())
return ''.join('{}\n'.format(name) for name in volume_names)
@ -328,7 +328,7 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI):
scope='local', read=True)
@asyncio.coroutine
def vm_volume_info(self):
assert self.arg in self.dest.volumes.keys()
self.enforce(self.arg in self.dest.volumes.keys())
self.fire_event_for_permission()
@ -356,7 +356,7 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI):
scope='local', read=True)
@asyncio.coroutine
def vm_volume_listsnapshots(self):
assert self.arg in self.dest.volumes.keys()
self.enforce(self.arg in self.dest.volumes.keys())
volume = self.dest.volumes[self.arg]
id_to_timestamp = volume.revisions
@ -369,13 +369,13 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI):
scope='local', write=True)
@asyncio.coroutine
def vm_volume_revert(self, untrusted_payload):
assert self.arg in self.dest.volumes.keys()
self.enforce(self.arg in self.dest.volumes.keys())
untrusted_revision = untrusted_payload.decode('ascii').strip()
del untrusted_payload
volume = self.dest.volumes[self.arg]
snapshots = volume.revisions
assert untrusted_revision in snapshots
self.enforce(untrusted_revision in snapshots)
revision = untrusted_revision
self.fire_event_for_permission(volume=volume, revision=revision)
@ -391,7 +391,7 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI):
scope='local', write=True)
@asyncio.coroutine
def vm_volume_clone_from(self):
assert self.arg in self.dest.volumes.keys()
self.enforce(self.arg in self.dest.volumes.keys())
volume = self.dest.volumes[self.arg]
@ -403,7 +403,7 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI):
self.app.api_admin_pending_clone = {}
# don't handle collisions any better - if someone is so much out of
# luck, can try again anyway
assert token not in self.app.api_admin_pending_clone
self.enforce(token not in self.app.api_admin_pending_clone)
self.app.api_admin_pending_clone[token] = volume
return token
@ -412,11 +412,11 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI):
scope='local', write=True)
@asyncio.coroutine
def vm_volume_clone_to(self, untrusted_payload):
assert self.arg in self.dest.volumes.keys()
self.enforce(self.arg in self.dest.volumes.keys())
untrusted_token = untrusted_payload.decode('ascii').strip()
del untrusted_payload
assert untrusted_token in getattr(self.app,
'api_admin_pending_clone', {})
self.enforce(
untrusted_token in getattr(self.app, 'api_admin_pending_clone', {}))
token = untrusted_token
del untrusted_token
@ -424,8 +424,8 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI):
del self.app.api_admin_pending_clone[token]
# make sure the volume still exists, but invalidate token anyway
assert str(src_volume.pool) in self.app.pools
assert src_volume in self.app.pools[str(src_volume.pool)].volumes
self.enforce(str(src_volume.pool) in self.app.pools)
self.enforce(src_volume in self.app.pools[str(src_volume.pool)].volumes)
dst_volume = self.dest.volumes[self.arg]
@ -446,11 +446,11 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI):
scope='local', write=True)
@asyncio.coroutine
def vm_volume_resize(self, untrusted_payload):
assert self.arg in self.dest.volumes.keys()
self.enforce(self.arg in self.dest.volumes.keys())
untrusted_size = untrusted_payload.decode('ascii').strip()
del untrusted_payload
assert untrusted_size.isdigit() # only digits, forbid '-' too
assert len(untrusted_size) <= 20 # limit to about 2^64
self.enforce(untrusted_size.isdigit()) # only digits, forbid '-' too
self.enforce(len(untrusted_size) <= 20) # limit to about 2^64
size = int(untrusted_size)
@ -472,7 +472,7 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI):
payload) and response from that call will be actually send to the
caller.
'''
assert self.arg in self.dest.volumes.keys()
self.enforce(self.arg in self.dest.volumes.keys())
self.fire_event_for_permission()
@ -480,7 +480,7 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI):
raise qubes.exc.QubesVMNotHaltedError(self.dest)
path = self.dest.storage.import_data(self.arg)
assert ' ' not in path
self.enforce(' ' not in path)
size = self.dest.volumes[self.arg].size
# when we know the action is allowed, inform extensions that it will
@ -493,13 +493,13 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI):
scope='local', write=True)
@asyncio.coroutine
def vm_volume_set_revisions_to_keep(self, untrusted_payload):
assert self.arg in self.dest.volumes.keys()
self.enforce(self.arg in self.dest.volumes.keys())
try:
untrusted_value = int(untrusted_payload.decode('ascii'))
except (UnicodeDecodeError, ValueError):
raise qubes.api.ProtocolError('Invalid value')
del untrusted_payload
assert untrusted_value >= 0
self.enforce(untrusted_value >= 0)
newvalue = untrusted_value
del untrusted_value
@ -512,7 +512,7 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI):
scope='local', write=True)
@asyncio.coroutine
def vm_volume_set_rw(self, untrusted_payload):
assert self.arg in self.dest.volumes.keys()
self.enforce(self.arg in self.dest.volumes.keys())
try:
newvalue = qubes.property.bool(None, None,
untrusted_payload.decode('ascii'))
@ -532,7 +532,7 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI):
scope='local', read=True)
@asyncio.coroutine
def vm_tag_list(self):
assert not self.arg
self.enforce(not self.arg)
tags = self.dest.tags
@ -579,8 +579,8 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI):
scope='global', read=True)
@asyncio.coroutine
def pool_list(self):
assert not self.arg
assert self.dest.name == 'dom0'
self.enforce(not self.arg)
self.enforce(self.dest.name == 'dom0')
pools = self.fire_event_for_filter(self.app.pools)
@ -590,8 +590,8 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI):
scope='global', read=True)
@asyncio.coroutine
def pool_listdrivers(self):
assert self.dest.name == 'dom0'
assert not self.arg
self.enforce(self.dest.name == 'dom0')
self.enforce(not self.arg)
drivers = self.fire_event_for_filter(qubes.storage.pool_drivers())
@ -604,8 +604,8 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI):
scope='global', read=True)
@asyncio.coroutine
def pool_info(self):
assert self.dest.name == 'dom0'
assert self.arg in self.app.pools.keys()
self.enforce(self.dest.name == 'dom0')
self.enforce(self.arg in self.app.pools.keys())
pool = self.app.pools[self.arg]
@ -635,30 +635,32 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI):
scope='global', write=True)
@asyncio.coroutine
def pool_add(self, untrusted_payload):
assert self.dest.name == 'dom0'
self.enforce(self.dest.name == 'dom0')
drivers = qubes.storage.pool_drivers()
assert self.arg in drivers
self.enforce(self.arg in drivers)
untrusted_pool_config = untrusted_payload.decode('ascii').splitlines()
del untrusted_payload
assert all(('=' in line) for line in untrusted_pool_config)
self.enforce(all(('=' in line) for line in untrusted_pool_config))
# pairs of (option, value)
untrusted_pool_config = [line.split('=', 1)
for line in untrusted_pool_config]
# reject duplicated options
assert len(set(x[0] for x in untrusted_pool_config)) == \
len([x[0] for x in untrusted_pool_config])
self.enforce(
len(set(x[0] for x in untrusted_pool_config)) ==
len([x[0] for x in untrusted_pool_config]))
# and convert to dict
untrusted_pool_config = dict(untrusted_pool_config)
assert 'name' in untrusted_pool_config
self.enforce('name' in untrusted_pool_config)
untrusted_pool_name = untrusted_pool_config.pop('name')
allowed_chars = string.ascii_letters + string.digits + '-_.'
assert all(c in allowed_chars for c in untrusted_pool_name)
self.enforce(all(c in allowed_chars for c in untrusted_pool_name))
pool_name = untrusted_pool_name
assert pool_name not in self.app.pools
self.enforce(pool_name not in self.app.pools)
driver_parameters = qubes.storage.driver_parameters(self.arg)
assert all(key in driver_parameters for key in untrusted_pool_config)
self.enforce(
all(key in driver_parameters for key in untrusted_pool_config))
pool_config = untrusted_pool_config
self.fire_event_for_permission(name=pool_name,
@ -671,8 +673,8 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI):
scope='global', write=True)
@asyncio.coroutine
def pool_remove(self):
assert self.dest.name == 'dom0'
assert self.arg in self.app.pools.keys()
self.enforce(self.dest.name == 'dom0')
self.enforce(self.arg in self.app.pools.keys())
self.fire_event_for_permission()
@ -683,15 +685,15 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI):
scope='global', write=True)
@asyncio.coroutine
def pool_set_revisions_to_keep(self, untrusted_payload):
assert self.dest.name == 'dom0'
assert self.arg in self.app.pools.keys()
self.enforce(self.dest.name == 'dom0')
self.enforce(self.arg in self.app.pools.keys())
pool = self.app.pools[self.arg]
try:
untrusted_value = int(untrusted_payload.decode('ascii'))
except (UnicodeDecodeError, ValueError):
raise qubes.api.ProtocolError('Invalid value')
del untrusted_payload
assert untrusted_value >= 0
self.enforce(untrusted_value >= 0)
newvalue = untrusted_value
del untrusted_value
@ -704,8 +706,8 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI):
scope='global', read=True)
@asyncio.coroutine
def label_list(self):
assert self.dest.name == 'dom0'
assert not self.arg
self.enforce(self.dest.name == 'dom0')
self.enforce(not self.arg)
labels = self.fire_event_for_filter(self.app.labels.values())
@ -715,7 +717,7 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI):
scope='global', read=True)
@asyncio.coroutine
def label_get(self):
assert self.dest.name == 'dom0'
self.enforce(self.dest.name == 'dom0')
try:
label = self.app.get_label(self.arg)
@ -730,7 +732,7 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI):
scope='global', read=True)
@asyncio.coroutine
def label_index(self):
assert self.dest.name == 'dom0'
self.enforce(self.dest.name == 'dom0')
try:
label = self.app.get_label(self.arg)
@ -745,12 +747,12 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI):
scope='global', write=True)
@asyncio.coroutine
def label_create(self, untrusted_payload):
assert self.dest.name == 'dom0'
self.enforce(self.dest.name == 'dom0')
# don't confuse label name with label index
assert not self.arg.isdigit()
self.enforce(not self.arg.isdigit())
allowed_chars = string.ascii_letters + string.digits + '-_.'
assert all(c in allowed_chars for c in self.arg)
self.enforce(all(c in allowed_chars for c in self.arg))
try:
self.app.get_label(self.arg)
except KeyError:
@ -760,10 +762,10 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI):
raise qubes.exc.QubesValueError('label already exists')
untrusted_payload = untrusted_payload.decode('ascii').strip()
assert len(untrusted_payload) == 8
assert untrusted_payload.startswith('0x')
self.enforce(len(untrusted_payload) == 8)
self.enforce(untrusted_payload.startswith('0x'))
# besides prefix, only hex digits are allowed
assert all(x in string.hexdigits for x in untrusted_payload[2:])
self.enforce(all(x in string.hexdigits for x in untrusted_payload[2:]))
# SEE: #2732
color = untrusted_payload
@ -782,14 +784,14 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI):
scope='global', write=True)
@asyncio.coroutine
def label_remove(self):
assert self.dest.name == 'dom0'
self.enforce(self.dest.name == 'dom0')
try:
label = self.app.get_label(self.arg)
except KeyError:
raise qubes.exc.QubesValueError
# don't allow removing default labels
assert label.index > qubes.config.max_default_label
self.enforce(label.index > qubes.config.max_default_label)
# FIXME: this should be in app.add_label()
for vm in self.app.domains:
@ -805,7 +807,7 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI):
scope='local', execute=True)
@asyncio.coroutine
def vm_start(self):
assert not self.arg
self.enforce(not self.arg)
self.fire_event_for_permission()
try:
yield from self.dest.start()
@ -819,7 +821,7 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI):
scope='local', execute=True)
@asyncio.coroutine
def vm_shutdown(self):
assert not self.arg
self.enforce(not self.arg)
self.fire_event_for_permission()
yield from self.dest.shutdown()
@ -827,7 +829,7 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI):
scope='local', execute=True)
@asyncio.coroutine
def vm_pause(self):
assert not self.arg
self.enforce(not self.arg)
self.fire_event_for_permission()
yield from self.dest.pause()
@ -835,7 +837,7 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI):
scope='local', execute=True)
@asyncio.coroutine
def vm_unpause(self):
assert not self.arg
self.enforce(not self.arg)
self.fire_event_for_permission()
yield from self.dest.unpause()
@ -843,7 +845,7 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI):
scope='local', execute=True)
@asyncio.coroutine
def vm_kill(self):
assert not self.arg
self.enforce(not self.arg)
self.fire_event_for_permission()
yield from self.dest.kill()
@ -851,7 +853,7 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI):
scope='global', read=True)
@asyncio.coroutine
def events(self):
assert not self.arg
self.enforce(not self.arg)
# run until client connection is terminated
self.cancellable = True
@ -893,7 +895,7 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI):
scope='local', read=True)
@asyncio.coroutine
def vm_feature_list(self):
assert not self.arg
self.enforce(not self.arg)
features = self.fire_event_for_filter(self.dest.features.keys())
return ''.join('{}\n'.format(feature) for feature in features)
@ -978,7 +980,7 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI):
untrusted_payload=untrusted_payload)
def _vm_create(self, vm_type, allow_pool=False, untrusted_payload=None):
assert self.dest.name == 'dom0'
self.enforce(self.dest.name == 'dom0')
kwargs = {}
pool = None
@ -992,10 +994,10 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI):
# when given VM class do need a template
if hasattr(vm_class, 'template'):
if self.arg:
assert self.arg in self.app.domains
self.enforce(self.arg in self.app.domains)
kwargs['template'] = self.app.domains[self.arg]
else:
assert not self.arg
self.enforce(not self.arg)
for untrusted_param in untrusted_payload.decode('ascii',
errors='strict').split(' '):
@ -1009,9 +1011,9 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI):
elif untrusted_key == 'label':
# don't confuse label name with label index
assert not untrusted_value.isdigit()
self.enforce(not untrusted_value.isdigit())
allowed_chars = string.ascii_letters + string.digits + '-_.'
assert all(c in allowed_chars for c in untrusted_value)
self.enforce(all(c in allowed_chars for c in untrusted_value))
try:
kwargs['label'] = self.app.get_label(untrusted_value)
except KeyError:
@ -1025,8 +1027,8 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI):
untrusted_volume = untrusted_key.split(':', 1)[1]
# kind of ugly, but actual list of volumes is available only
# after creating a VM
assert untrusted_volume in ['root', 'private', 'volatile',
'kernel']
self.enforce(untrusted_volume in [
'root', 'private', 'volatile', 'kernel'])
volume = untrusted_volume
if volume in pools:
raise qubes.api.ProtocolError(
@ -1065,7 +1067,7 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI):
scope='global', write=True)
@asyncio.coroutine
def create_disposable(self):
assert not self.arg
self.enforce(not self.arg)
if self.dest.name == 'dom0':
dispvm_template = self.src.default_dispvm
@ -1084,7 +1086,7 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI):
scope='global', write=True)
@asyncio.coroutine
def vm_remove(self):
assert not self.arg
self.enforce(not self.arg)
self.fire_event_for_permission()
@ -1116,7 +1118,7 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI):
devices = [dev for dev in devices if dev.ident == self.arg]
# no duplicated devices, but device may not exists, in which case
# the list is empty
assert len(devices) <= 1
self.enforce(len(devices) <= 1)
devices = self.fire_event_for_filter(devices, devclass=devclass)
dev_info = {}
@ -1133,7 +1135,7 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI):
# specification
(('description', dev.description),)
))
assert '\n' not in properties_txt
self.enforce('\n' not in properties_txt)
dev_info[dev.ident] = properties_txt
return ''.join('{} {}\n'.format(ident, dev_info[ident])
@ -1154,7 +1156,7 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI):
== (select_backend, select_ident)]
# no duplicated devices, but device may not exists, in which case
# the list is empty
assert len(device_assignments) <= 1
self.enforce(len(device_assignments) <= 1)
device_assignments = self.fire_event_for_filter(device_assignments,
devclass=devclass)
@ -1166,7 +1168,7 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI):
dev.options.items(),
(('persistent', 'yes' if dev.persistent else 'no'),)
))
assert '\n' not in properties_txt
self.enforce('\n' not in properties_txt)
ident = '{!s}+{!s}'.format(dev.backend_domain, dev.ident)
dev_info[ident] = properties_txt
@ -1254,7 +1256,7 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI):
def vm_device_set_persistent(self, endpoint, untrusted_payload):
devclass = endpoint
assert untrusted_payload in (b'True', b'False')
self.enforce(untrusted_payload in (b'True', b'False'))
persistent = untrusted_payload == b'True'
del untrusted_payload
@ -1264,7 +1266,7 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI):
matching_devices = [dev for dev
in self.dest.devices[devclass].attached()
if dev.backend_domain.name == backend_domain and dev.ident == ident]
assert len(matching_devices) == 1
self.enforce(len(matching_devices) == 1)
dev = matching_devices[0]
self.fire_event_for_permission(device=dev,
@ -1277,7 +1279,7 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI):
scope='local', read=True)
@asyncio.coroutine
def vm_firewall_get(self):
assert not self.arg
self.enforce(not self.arg)
self.fire_event_for_permission()
@ -1289,7 +1291,7 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI):
scope='local', write=True)
@asyncio.coroutine
def vm_firewall_set(self, untrusted_payload):
assert not self.arg
self.enforce(not self.arg)
rules = []
for untrusted_line in untrusted_payload.decode('ascii',
errors='strict').splitlines():
@ -1306,7 +1308,7 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI):
scope='local', execute=True)
@asyncio.coroutine
def vm_firewall_reload(self):
assert not self.arg
self.enforce(not self.arg)
self.fire_event_for_permission()
@ -1367,7 +1369,7 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI):
'qubes.BackupPassphrase+' + self.arg)
# make it foolproof against "echo passphrase" implementation
passphrase = passphrase.strip()
assert b'\n' not in passphrase
self.enforce(b'\n' not in passphrase)
except subprocess.CalledProcessError:
raise qubes.exc.QubesException(
'Failed to retrieve passphrase from \'{}\' VM'.format(
@ -1410,9 +1412,9 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI):
scope='global', read=True, execute=True)
@asyncio.coroutine
def backup_execute(self):
assert self.dest.name == 'dom0'
assert self.arg
assert '/' not in self.arg
self.enforce(self.dest.name == 'dom0')
self.enforce(self.arg)
self.enforce('/' not in self.arg)
self.fire_event_for_permission()
@ -1430,7 +1432,7 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI):
self._backup_progress_callback, self.arg)
# forbid running the same backup operation twice at the time
assert self.arg not in self.app.api_admin_running_backups
self.enforce(self.arg not in self.app.api_admin_running_backups)
backup_task = asyncio.ensure_future(backup.backup_do())
self.app.api_admin_running_backups[self.arg] = backup_task
@ -1445,9 +1447,9 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI):
scope='global', execute=True)
@asyncio.coroutine
def backup_cancel(self):
assert self.dest.name == 'dom0'
assert self.arg
assert '/' not in self.arg
self.enforce(self.dest.name == 'dom0')
self.enforce(self.arg)
self.enforce('/' not in self.arg)
self.fire_event_for_permission()
@ -1463,9 +1465,9 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI):
scope='local', read=True)
@asyncio.coroutine
def backup_info(self):
assert self.dest.name == 'dom0'
assert self.arg
assert '/' not in self.arg
self.enforce(self.dest.name == 'dom0')
self.enforce(self.arg)
self.enforce('/' not in self.arg)
self.fire_event_for_permission()
@ -1527,7 +1529,7 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI):
scope='global', read=True)
@asyncio.coroutine
def vm_stats(self):
assert not self.arg
self.enforce(not self.arg)
# run until client connection is terminated
self.cancellable = True

View File

@ -39,8 +39,8 @@ class QubesInternalAPI(qubes.api.AbstractQubesAPI):
@qubes.api.method('internal.GetSystemInfo', no_payload=True)
@asyncio.coroutine
def getsysteminfo(self):
assert self.dest.name == 'dom0'
assert not self.arg
self.enforce(self.dest.name == 'dom0')
self.enforce(not self.arg)
system_info = {'domains': {
domain.name: {
@ -64,7 +64,7 @@ class QubesInternalAPI(qubes.api.AbstractQubesAPI):
when actual import is finished. Response from this method is sent do
the client (as a response for admin.vm.volume.Import call).
'''
assert self.arg in self.dest.volumes.keys()
self.enforce(self.arg in self.dest.volumes.keys())
success = untrusted_payload == b'ok'
try:

View File

@ -46,8 +46,8 @@ class QubesMiscAPI(qubes.api.AbstractQubesAPI):
appropriate extensions. Requests not explicitly handled by some
extension are ignored.
'''
assert self.dest.name == 'dom0'
assert not self.arg
self.enforce(self.dest.name == 'dom0')
self.enforce(not self.arg)
prefix = '/features-request/'
@ -59,7 +59,7 @@ class QubesMiscAPI(qubes.api.AbstractQubesAPI):
safe_set = string.ascii_letters + string.digits
for untrusted_key in untrusted_features:
untrusted_value = untrusted_features[untrusted_key]
assert all((c in safe_set) for c in untrusted_value)
self.enforce(all((c in safe_set) for c in untrusted_value))
yield from self.src.fire_event_async('features-request',
untrusted_features=untrusted_features)
@ -71,8 +71,8 @@ class QubesMiscAPI(qubes.api.AbstractQubesAPI):
'''
Legacy version of qubes.FeaturesRequest, used by Qubes Windows Tools
'''
assert self.dest.name == 'dom0'
assert not self.arg
self.enforce(self.dest.name == 'dom0')
self.enforce(not self.arg)
untrusted_features = {}
safe_set = string.ascii_letters + string.digits
@ -83,7 +83,7 @@ class QubesMiscAPI(qubes.api.AbstractQubesAPI):
if untrusted_value:
untrusted_value = untrusted_value.decode('ascii',
errors='strict')
assert all((c in safe_set) for c in untrusted_value)
self.enforce(all((c in safe_set) for c in untrusted_value))
untrusted_features[feature] = untrusted_value
del untrusted_value
@ -102,7 +102,7 @@ class QubesMiscAPI(qubes.api.AbstractQubesAPI):
'''
untrusted_update_count = untrusted_payload.strip()
assert untrusted_update_count.isdigit()
self.enforce(untrusted_update_count.isdigit())
# now sanitized
update_count = int(untrusted_update_count)
del untrusted_update_count

View File

@ -20,26 +20,27 @@
#
#
from __future__ import unicode_literals
import itertools
import logging
import functools
import string
import termios
import asyncio
from qubes.utils import size_to_human
import stat
import os
import datetime
import fcntl
import subprocess
import functools
import grp
import itertools
import logging
import os
import pathlib
import pwd
import re
import shutil
import stat
import string
import subprocess
import tempfile
import termios
import time
import grp
import pwd
import datetime
from .utils import size_to_human
import qubes
import qubes.core2migration
import qubes.storage
@ -257,17 +258,14 @@ class Backup(object):
size = qubes.storage.file.get_disk_usage(file_path)
if subdir is None:
abs_file_path = os.path.abspath(file_path)
abs_base_dir = os.path.abspath(
qubes.config.system_path["qubes_base_dir"]) + '/'
abs_file_dir = os.path.dirname(abs_file_path) + '/'
(nothing, directory, subdir) = \
abs_file_dir.partition(abs_base_dir)
assert nothing == ""
assert directory == abs_base_dir
else:
if subdir and not subdir.endswith('/'):
subdir += '/'
abs_file_dir = pathlib.Path(file_path).resolve().parent
abs_base_dir = pathlib.Path(
qubes.config.system_path["qubes_base_dir"]).resolve()
# this raises ValueError if abs_file_dir is not in abs_base_dir
subdir = str(abs_file_dir.relative_to(abs_base_dir))
if not subdir.endswith(os.path.sep):
subdir += os.path.sep
#: real path to the file
self.path = file_path

View File

@ -238,9 +238,9 @@ class DeviceCollection(object):
if device_assignment.bus is None:
device_assignment.bus = self._bus
else:
assert device_assignment.bus == self._bus, \
"Trying to attach DeviceAssignment of a different device class"
elif device_assignment.bus != self._bus:
raise ValueError(
'Trying to attach DeviceAssignment of a different device class')
if not device_assignment.persistent and self._vm.is_halted():
raise qubes.exc.QubesVMNotRunningError(self._vm,

View File

@ -39,9 +39,10 @@ class RuleOption(object):
# subset of string.punctuation
safe_set = string.ascii_letters + string.digits + \
':;,./-_[]'
assert all(x in safe_set for x in str(untrusted_value))
value = str(untrusted_value)
self._value = value
untrusted_value = str(untrusted_value)
if not all(x in safe_set for x in untrusted_value):
raise ValueError('strange characters in rule')
self._value = untrusted_value
@property
def rule(self):
@ -226,9 +227,10 @@ class Comment(RuleOption):
# subset of string.punctuation
safe_set = string.ascii_letters + string.digits + \
':;,./-_[] '
assert all(x in safe_set for x in str(untrusted_value))
value = str(untrusted_value)
self._value = value
untrusted_value = str(untrusted_value)
if not all(x in safe_set for x in untrusted_value):
raise ValueError('strange characters comment')
self._value = untrusted_value
@property
def rule(self):

View File

@ -280,7 +280,8 @@ class Tags(set):
@staticmethod
def validate_tag(tag):
safe_set = string.ascii_letters + string.digits + '-_'
assert all((x in safe_set) for x in tag)
if not all((x in safe_set) for x in tag):
raise ValueError('disallowed characters')
class BaseVM(qubes.PropertyHolder):

View File

@ -302,7 +302,9 @@ class NetVMMixin(qubes.events.Emitter):
if not self.is_running():
raise qubes.exc.QubesVMNotRunningError(self)
assert self.netvm is not None
if self.netvm is None:
raise qubes.exc.QubesVMError(self,
'netvm should not be {}'.format(self.netvm))
if not self.netvm.is_running(): # pylint: disable=no-member
# pylint: disable=no-member
@ -319,7 +321,9 @@ class NetVMMixin(qubes.events.Emitter):
if not self.is_running():
raise qubes.exc.QubesVMNotRunningError(self)
assert self.netvm is not None
if self.netvm is None:
raise qubes.exc.QubesVMError(self,
'netvm should not be {}'.format(self.netvm))
self.libvirt_domain.detachDevice(
self.app.env.get_template('libvirt/devices/net.xml').render(

View File

@ -416,10 +416,13 @@ class PolicyAction(object):
raise AccessDenied(
'denied by policy {}:{}'.format(rule.filename, rule.lineno))
elif rule.action == Action.ask:
assert targets_for_ask is not None
if targets_for_ask is None:
raise AccessDenied(
'invalid policy {}:{}'.format(rule.filename, rule.lineno))
elif rule.action == Action.allow:
assert targets_for_ask is None
assert target is not None
if targets_for_ask is not None or target is None:
raise AccessDenied(
'invalid policy {}:{}'.format(rule.filename, rule.lineno))
self.action = rule.action
def handle_user_response(self, response, target=None):