diff --git a/qubes/api/__init__.py b/qubes/api/__init__.py index 370e05f9..bdb808bb 100644 --- a/qubes/api/__init__.py +++ b/qubes/api/__init__.py @@ -196,6 +196,11 @@ class AbstractQubesAPI(object): return apply_filters(iterable, self.fire_event_for_permission(**kwargs)) + def enforce(self, predicate): + '''An assert replacement, but works even with optimisations.''' + if not predicate: + raise PermissionDenied() + class QubesDaemonProtocol(asyncio.Protocol): buffer_size = 65536 diff --git a/qubes/api/admin.py b/qubes/api/admin.py index a4a803c5..a207d7ee 100644 --- a/qubes/api/admin.py +++ b/qubes/api/admin.py @@ -98,8 +98,8 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI): @asyncio.coroutine def vmclass_list(self): '''List all VM classes''' - assert not self.arg - assert self.dest.name == 'dom0' + self.enforce(not self.arg) + self.enforce(self.dest.name == 'dom0') entrypoints = self.fire_event_for_filter( pkg_resources.iter_entry_points(qubes.vm.VM_ENTRY_POINT)) @@ -112,7 +112,7 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI): @asyncio.coroutine def vm_list(self): '''List all the domains''' - assert not self.arg + self.enforce(not self.arg) if self.dest.name == 'dom0': domains = self.fire_event_for_filter(self.app.domains) @@ -137,11 +137,11 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI): @asyncio.coroutine def property_list(self): '''List all global properties''' - assert self.dest.name == 'dom0' + self.enforce(self.dest.name == 'dom0') return self._property_list(self.app) def _property_list(self, dest): - assert not self.arg + self.enforce(not self.arg) properties = self.fire_event_for_filter(dest.property_list()) @@ -159,7 +159,7 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI): @asyncio.coroutine def property_get(self): '''Get a value of one global property''' - assert self.dest.name == 'dom0' + self.enforce(self.dest.name == 'dom0') return self._property_get(self.app) def _property_get(self, dest): @@ -203,7 +203,7 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI): @asyncio.coroutine def property_get_default(self): '''Get a value of one global property''' - assert self.dest.name == 'dom0' + self.enforce(self.dest.name == 'dom0') return self._property_get_default(self.app) def _property_get_default(self, dest): @@ -247,7 +247,7 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI): @asyncio.coroutine def property_set(self, untrusted_payload): '''Set property value''' - assert self.dest.name == 'dom0' + self.enforce(self.dest.name == 'dom0') return self._property_set(self.app, untrusted_payload=untrusted_payload) @@ -275,7 +275,7 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI): @asyncio.coroutine def property_help(self): '''Get help for one property''' - assert self.dest.name == 'dom0' + self.enforce(self.dest.name == 'dom0') return self._property_help(self.app) def _property_help(self, dest): @@ -303,7 +303,7 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI): @asyncio.coroutine def property_reset(self): '''Reset a property to a default value''' - assert self.dest.name == 'dom0' + self.enforce(self.dest.name == 'dom0') return self._property_reset(self.app) def _property_reset(self, dest): @@ -319,7 +319,7 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI): scope='local', read=True) @asyncio.coroutine def vm_volume_list(self): - assert not self.arg + self.enforce(not self.arg) volume_names = self.fire_event_for_filter(self.dest.volumes.keys()) return ''.join('{}\n'.format(name) for name in volume_names) @@ -328,7 +328,7 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI): scope='local', read=True) @asyncio.coroutine def vm_volume_info(self): - assert self.arg in self.dest.volumes.keys() + self.enforce(self.arg in self.dest.volumes.keys()) self.fire_event_for_permission() @@ -356,7 +356,7 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI): scope='local', read=True) @asyncio.coroutine def vm_volume_listsnapshots(self): - assert self.arg in self.dest.volumes.keys() + self.enforce(self.arg in self.dest.volumes.keys()) volume = self.dest.volumes[self.arg] id_to_timestamp = volume.revisions @@ -369,13 +369,13 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI): scope='local', write=True) @asyncio.coroutine def vm_volume_revert(self, untrusted_payload): - assert self.arg in self.dest.volumes.keys() + self.enforce(self.arg in self.dest.volumes.keys()) untrusted_revision = untrusted_payload.decode('ascii').strip() del untrusted_payload volume = self.dest.volumes[self.arg] snapshots = volume.revisions - assert untrusted_revision in snapshots + self.enforce(untrusted_revision in snapshots) revision = untrusted_revision self.fire_event_for_permission(volume=volume, revision=revision) @@ -391,7 +391,7 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI): scope='local', write=True) @asyncio.coroutine def vm_volume_clone_from(self): - assert self.arg in self.dest.volumes.keys() + self.enforce(self.arg in self.dest.volumes.keys()) volume = self.dest.volumes[self.arg] @@ -403,7 +403,7 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI): self.app.api_admin_pending_clone = {} # don't handle collisions any better - if someone is so much out of # luck, can try again anyway - assert token not in self.app.api_admin_pending_clone + self.enforce(token not in self.app.api_admin_pending_clone) self.app.api_admin_pending_clone[token] = volume return token @@ -412,11 +412,11 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI): scope='local', write=True) @asyncio.coroutine def vm_volume_clone_to(self, untrusted_payload): - assert self.arg in self.dest.volumes.keys() + self.enforce(self.arg in self.dest.volumes.keys()) untrusted_token = untrusted_payload.decode('ascii').strip() del untrusted_payload - assert untrusted_token in getattr(self.app, - 'api_admin_pending_clone', {}) + self.enforce( + untrusted_token in getattr(self.app, 'api_admin_pending_clone', {})) token = untrusted_token del untrusted_token @@ -424,8 +424,8 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI): del self.app.api_admin_pending_clone[token] # make sure the volume still exists, but invalidate token anyway - assert str(src_volume.pool) in self.app.pools - assert src_volume in self.app.pools[str(src_volume.pool)].volumes + self.enforce(str(src_volume.pool) in self.app.pools) + self.enforce(src_volume in self.app.pools[str(src_volume.pool)].volumes) dst_volume = self.dest.volumes[self.arg] @@ -446,11 +446,11 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI): scope='local', write=True) @asyncio.coroutine def vm_volume_resize(self, untrusted_payload): - assert self.arg in self.dest.volumes.keys() + self.enforce(self.arg in self.dest.volumes.keys()) untrusted_size = untrusted_payload.decode('ascii').strip() del untrusted_payload - assert untrusted_size.isdigit() # only digits, forbid '-' too - assert len(untrusted_size) <= 20 # limit to about 2^64 + self.enforce(untrusted_size.isdigit()) # only digits, forbid '-' too + self.enforce(len(untrusted_size) <= 20) # limit to about 2^64 size = int(untrusted_size) @@ -472,7 +472,7 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI): payload) and response from that call will be actually send to the caller. ''' - assert self.arg in self.dest.volumes.keys() + self.enforce(self.arg in self.dest.volumes.keys()) self.fire_event_for_permission() @@ -480,7 +480,7 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI): raise qubes.exc.QubesVMNotHaltedError(self.dest) path = self.dest.storage.import_data(self.arg) - assert ' ' not in path + self.enforce(' ' not in path) size = self.dest.volumes[self.arg].size # when we know the action is allowed, inform extensions that it will @@ -493,13 +493,13 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI): scope='local', write=True) @asyncio.coroutine def vm_volume_set_revisions_to_keep(self, untrusted_payload): - assert self.arg in self.dest.volumes.keys() + self.enforce(self.arg in self.dest.volumes.keys()) try: untrusted_value = int(untrusted_payload.decode('ascii')) except (UnicodeDecodeError, ValueError): raise qubes.api.ProtocolError('Invalid value') del untrusted_payload - assert untrusted_value >= 0 + self.enforce(untrusted_value >= 0) newvalue = untrusted_value del untrusted_value @@ -512,7 +512,7 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI): scope='local', write=True) @asyncio.coroutine def vm_volume_set_rw(self, untrusted_payload): - assert self.arg in self.dest.volumes.keys() + self.enforce(self.arg in self.dest.volumes.keys()) try: newvalue = qubes.property.bool(None, None, untrusted_payload.decode('ascii')) @@ -532,7 +532,7 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI): scope='local', read=True) @asyncio.coroutine def vm_tag_list(self): - assert not self.arg + self.enforce(not self.arg) tags = self.dest.tags @@ -579,8 +579,8 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI): scope='global', read=True) @asyncio.coroutine def pool_list(self): - assert not self.arg - assert self.dest.name == 'dom0' + self.enforce(not self.arg) + self.enforce(self.dest.name == 'dom0') pools = self.fire_event_for_filter(self.app.pools) @@ -590,8 +590,8 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI): scope='global', read=True) @asyncio.coroutine def pool_listdrivers(self): - assert self.dest.name == 'dom0' - assert not self.arg + self.enforce(self.dest.name == 'dom0') + self.enforce(not self.arg) drivers = self.fire_event_for_filter(qubes.storage.pool_drivers()) @@ -604,8 +604,8 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI): scope='global', read=True) @asyncio.coroutine def pool_info(self): - assert self.dest.name == 'dom0' - assert self.arg in self.app.pools.keys() + self.enforce(self.dest.name == 'dom0') + self.enforce(self.arg in self.app.pools.keys()) pool = self.app.pools[self.arg] @@ -635,30 +635,32 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI): scope='global', write=True) @asyncio.coroutine def pool_add(self, untrusted_payload): - assert self.dest.name == 'dom0' + self.enforce(self.dest.name == 'dom0') drivers = qubes.storage.pool_drivers() - assert self.arg in drivers + self.enforce(self.arg in drivers) untrusted_pool_config = untrusted_payload.decode('ascii').splitlines() del untrusted_payload - assert all(('=' in line) for line in untrusted_pool_config) + self.enforce(all(('=' in line) for line in untrusted_pool_config)) # pairs of (option, value) untrusted_pool_config = [line.split('=', 1) for line in untrusted_pool_config] # reject duplicated options - assert len(set(x[0] for x in untrusted_pool_config)) == \ - len([x[0] for x in untrusted_pool_config]) + self.enforce( + len(set(x[0] for x in untrusted_pool_config)) == + len([x[0] for x in untrusted_pool_config])) # and convert to dict untrusted_pool_config = dict(untrusted_pool_config) - assert 'name' in untrusted_pool_config + self.enforce('name' in untrusted_pool_config) untrusted_pool_name = untrusted_pool_config.pop('name') allowed_chars = string.ascii_letters + string.digits + '-_.' - assert all(c in allowed_chars for c in untrusted_pool_name) + self.enforce(all(c in allowed_chars for c in untrusted_pool_name)) pool_name = untrusted_pool_name - assert pool_name not in self.app.pools + self.enforce(pool_name not in self.app.pools) driver_parameters = qubes.storage.driver_parameters(self.arg) - assert all(key in driver_parameters for key in untrusted_pool_config) + self.enforce( + all(key in driver_parameters for key in untrusted_pool_config)) pool_config = untrusted_pool_config self.fire_event_for_permission(name=pool_name, @@ -671,8 +673,8 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI): scope='global', write=True) @asyncio.coroutine def pool_remove(self): - assert self.dest.name == 'dom0' - assert self.arg in self.app.pools.keys() + self.enforce(self.dest.name == 'dom0') + self.enforce(self.arg in self.app.pools.keys()) self.fire_event_for_permission() @@ -683,15 +685,15 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI): scope='global', write=True) @asyncio.coroutine def pool_set_revisions_to_keep(self, untrusted_payload): - assert self.dest.name == 'dom0' - assert self.arg in self.app.pools.keys() + self.enforce(self.dest.name == 'dom0') + self.enforce(self.arg in self.app.pools.keys()) pool = self.app.pools[self.arg] try: untrusted_value = int(untrusted_payload.decode('ascii')) except (UnicodeDecodeError, ValueError): raise qubes.api.ProtocolError('Invalid value') del untrusted_payload - assert untrusted_value >= 0 + self.enforce(untrusted_value >= 0) newvalue = untrusted_value del untrusted_value @@ -704,8 +706,8 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI): scope='global', read=True) @asyncio.coroutine def label_list(self): - assert self.dest.name == 'dom0' - assert not self.arg + self.enforce(self.dest.name == 'dom0') + self.enforce(not self.arg) labels = self.fire_event_for_filter(self.app.labels.values()) @@ -715,7 +717,7 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI): scope='global', read=True) @asyncio.coroutine def label_get(self): - assert self.dest.name == 'dom0' + self.enforce(self.dest.name == 'dom0') try: label = self.app.get_label(self.arg) @@ -730,7 +732,7 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI): scope='global', read=True) @asyncio.coroutine def label_index(self): - assert self.dest.name == 'dom0' + self.enforce(self.dest.name == 'dom0') try: label = self.app.get_label(self.arg) @@ -745,12 +747,12 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI): scope='global', write=True) @asyncio.coroutine def label_create(self, untrusted_payload): - assert self.dest.name == 'dom0' + self.enforce(self.dest.name == 'dom0') # don't confuse label name with label index - assert not self.arg.isdigit() + self.enforce(not self.arg.isdigit()) allowed_chars = string.ascii_letters + string.digits + '-_.' - assert all(c in allowed_chars for c in self.arg) + self.enforce(all(c in allowed_chars for c in self.arg)) try: self.app.get_label(self.arg) except KeyError: @@ -760,10 +762,10 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI): raise qubes.exc.QubesValueError('label already exists') untrusted_payload = untrusted_payload.decode('ascii').strip() - assert len(untrusted_payload) == 8 - assert untrusted_payload.startswith('0x') + self.enforce(len(untrusted_payload) == 8) + self.enforce(untrusted_payload.startswith('0x')) # besides prefix, only hex digits are allowed - assert all(x in string.hexdigits for x in untrusted_payload[2:]) + self.enforce(all(x in string.hexdigits for x in untrusted_payload[2:])) # SEE: #2732 color = untrusted_payload @@ -782,14 +784,14 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI): scope='global', write=True) @asyncio.coroutine def label_remove(self): - assert self.dest.name == 'dom0' + self.enforce(self.dest.name == 'dom0') try: label = self.app.get_label(self.arg) except KeyError: raise qubes.exc.QubesValueError # don't allow removing default labels - assert label.index > qubes.config.max_default_label + self.enforce(label.index > qubes.config.max_default_label) # FIXME: this should be in app.add_label() for vm in self.app.domains: @@ -805,7 +807,7 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI): scope='local', execute=True) @asyncio.coroutine def vm_start(self): - assert not self.arg + self.enforce(not self.arg) self.fire_event_for_permission() try: yield from self.dest.start() @@ -819,7 +821,7 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI): scope='local', execute=True) @asyncio.coroutine def vm_shutdown(self): - assert not self.arg + self.enforce(not self.arg) self.fire_event_for_permission() yield from self.dest.shutdown() @@ -827,7 +829,7 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI): scope='local', execute=True) @asyncio.coroutine def vm_pause(self): - assert not self.arg + self.enforce(not self.arg) self.fire_event_for_permission() yield from self.dest.pause() @@ -835,7 +837,7 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI): scope='local', execute=True) @asyncio.coroutine def vm_unpause(self): - assert not self.arg + self.enforce(not self.arg) self.fire_event_for_permission() yield from self.dest.unpause() @@ -843,7 +845,7 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI): scope='local', execute=True) @asyncio.coroutine def vm_kill(self): - assert not self.arg + self.enforce(not self.arg) self.fire_event_for_permission() yield from self.dest.kill() @@ -851,7 +853,7 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI): scope='global', read=True) @asyncio.coroutine def events(self): - assert not self.arg + self.enforce(not self.arg) # run until client connection is terminated self.cancellable = True @@ -893,7 +895,7 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI): scope='local', read=True) @asyncio.coroutine def vm_feature_list(self): - assert not self.arg + self.enforce(not self.arg) features = self.fire_event_for_filter(self.dest.features.keys()) return ''.join('{}\n'.format(feature) for feature in features) @@ -978,7 +980,7 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI): untrusted_payload=untrusted_payload) def _vm_create(self, vm_type, allow_pool=False, untrusted_payload=None): - assert self.dest.name == 'dom0' + self.enforce(self.dest.name == 'dom0') kwargs = {} pool = None @@ -992,10 +994,10 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI): # when given VM class do need a template if hasattr(vm_class, 'template'): if self.arg: - assert self.arg in self.app.domains + self.enforce(self.arg in self.app.domains) kwargs['template'] = self.app.domains[self.arg] else: - assert not self.arg + self.enforce(not self.arg) for untrusted_param in untrusted_payload.decode('ascii', errors='strict').split(' '): @@ -1009,9 +1011,9 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI): elif untrusted_key == 'label': # don't confuse label name with label index - assert not untrusted_value.isdigit() + self.enforce(not untrusted_value.isdigit()) allowed_chars = string.ascii_letters + string.digits + '-_.' - assert all(c in allowed_chars for c in untrusted_value) + self.enforce(all(c in allowed_chars for c in untrusted_value)) try: kwargs['label'] = self.app.get_label(untrusted_value) except KeyError: @@ -1025,8 +1027,8 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI): untrusted_volume = untrusted_key.split(':', 1)[1] # kind of ugly, but actual list of volumes is available only # after creating a VM - assert untrusted_volume in ['root', 'private', 'volatile', - 'kernel'] + self.enforce(untrusted_volume in [ + 'root', 'private', 'volatile', 'kernel']) volume = untrusted_volume if volume in pools: raise qubes.api.ProtocolError( @@ -1065,7 +1067,7 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI): scope='global', write=True) @asyncio.coroutine def create_disposable(self): - assert not self.arg + self.enforce(not self.arg) if self.dest.name == 'dom0': dispvm_template = self.src.default_dispvm @@ -1084,7 +1086,7 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI): scope='global', write=True) @asyncio.coroutine def vm_remove(self): - assert not self.arg + self.enforce(not self.arg) self.fire_event_for_permission() @@ -1116,7 +1118,7 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI): devices = [dev for dev in devices if dev.ident == self.arg] # no duplicated devices, but device may not exists, in which case # the list is empty - assert len(devices) <= 1 + self.enforce(len(devices) <= 1) devices = self.fire_event_for_filter(devices, devclass=devclass) dev_info = {} @@ -1133,7 +1135,7 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI): # specification (('description', dev.description),) )) - assert '\n' not in properties_txt + self.enforce('\n' not in properties_txt) dev_info[dev.ident] = properties_txt return ''.join('{} {}\n'.format(ident, dev_info[ident]) @@ -1154,7 +1156,7 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI): == (select_backend, select_ident)] # no duplicated devices, but device may not exists, in which case # the list is empty - assert len(device_assignments) <= 1 + self.enforce(len(device_assignments) <= 1) device_assignments = self.fire_event_for_filter(device_assignments, devclass=devclass) @@ -1166,7 +1168,7 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI): dev.options.items(), (('persistent', 'yes' if dev.persistent else 'no'),) )) - assert '\n' not in properties_txt + self.enforce('\n' not in properties_txt) ident = '{!s}+{!s}'.format(dev.backend_domain, dev.ident) dev_info[ident] = properties_txt @@ -1254,7 +1256,7 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI): def vm_device_set_persistent(self, endpoint, untrusted_payload): devclass = endpoint - assert untrusted_payload in (b'True', b'False') + self.enforce(untrusted_payload in (b'True', b'False')) persistent = untrusted_payload == b'True' del untrusted_payload @@ -1264,7 +1266,7 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI): matching_devices = [dev for dev in self.dest.devices[devclass].attached() if dev.backend_domain.name == backend_domain and dev.ident == ident] - assert len(matching_devices) == 1 + self.enforce(len(matching_devices) == 1) dev = matching_devices[0] self.fire_event_for_permission(device=dev, @@ -1277,7 +1279,7 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI): scope='local', read=True) @asyncio.coroutine def vm_firewall_get(self): - assert not self.arg + self.enforce(not self.arg) self.fire_event_for_permission() @@ -1289,7 +1291,7 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI): scope='local', write=True) @asyncio.coroutine def vm_firewall_set(self, untrusted_payload): - assert not self.arg + self.enforce(not self.arg) rules = [] for untrusted_line in untrusted_payload.decode('ascii', errors='strict').splitlines(): @@ -1306,7 +1308,7 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI): scope='local', execute=True) @asyncio.coroutine def vm_firewall_reload(self): - assert not self.arg + self.enforce(not self.arg) self.fire_event_for_permission() @@ -1367,7 +1369,7 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI): 'qubes.BackupPassphrase+' + self.arg) # make it foolproof against "echo passphrase" implementation passphrase = passphrase.strip() - assert b'\n' not in passphrase + self.enforce(b'\n' not in passphrase) except subprocess.CalledProcessError: raise qubes.exc.QubesException( 'Failed to retrieve passphrase from \'{}\' VM'.format( @@ -1410,9 +1412,9 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI): scope='global', read=True, execute=True) @asyncio.coroutine def backup_execute(self): - assert self.dest.name == 'dom0' - assert self.arg - assert '/' not in self.arg + self.enforce(self.dest.name == 'dom0') + self.enforce(self.arg) + self.enforce('/' not in self.arg) self.fire_event_for_permission() @@ -1430,7 +1432,7 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI): self._backup_progress_callback, self.arg) # forbid running the same backup operation twice at the time - assert self.arg not in self.app.api_admin_running_backups + self.enforce(self.arg not in self.app.api_admin_running_backups) backup_task = asyncio.ensure_future(backup.backup_do()) self.app.api_admin_running_backups[self.arg] = backup_task @@ -1445,9 +1447,9 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI): scope='global', execute=True) @asyncio.coroutine def backup_cancel(self): - assert self.dest.name == 'dom0' - assert self.arg - assert '/' not in self.arg + self.enforce(self.dest.name == 'dom0') + self.enforce(self.arg) + self.enforce('/' not in self.arg) self.fire_event_for_permission() @@ -1463,9 +1465,9 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI): scope='local', read=True) @asyncio.coroutine def backup_info(self): - assert self.dest.name == 'dom0' - assert self.arg - assert '/' not in self.arg + self.enforce(self.dest.name == 'dom0') + self.enforce(self.arg) + self.enforce('/' not in self.arg) self.fire_event_for_permission() @@ -1527,7 +1529,7 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI): scope='global', read=True) @asyncio.coroutine def vm_stats(self): - assert not self.arg + self.enforce(not self.arg) # run until client connection is terminated self.cancellable = True diff --git a/qubes/api/internal.py b/qubes/api/internal.py index 0773a98f..2df550fe 100644 --- a/qubes/api/internal.py +++ b/qubes/api/internal.py @@ -39,8 +39,8 @@ class QubesInternalAPI(qubes.api.AbstractQubesAPI): @qubes.api.method('internal.GetSystemInfo', no_payload=True) @asyncio.coroutine def getsysteminfo(self): - assert self.dest.name == 'dom0' - assert not self.arg + self.enforce(self.dest.name == 'dom0') + self.enforce(not self.arg) system_info = {'domains': { domain.name: { @@ -64,7 +64,7 @@ class QubesInternalAPI(qubes.api.AbstractQubesAPI): when actual import is finished. Response from this method is sent do the client (as a response for admin.vm.volume.Import call). ''' - assert self.arg in self.dest.volumes.keys() + self.enforce(self.arg in self.dest.volumes.keys()) success = untrusted_payload == b'ok' try: diff --git a/qubes/api/misc.py b/qubes/api/misc.py index 50652e3c..c6a9e677 100644 --- a/qubes/api/misc.py +++ b/qubes/api/misc.py @@ -46,8 +46,8 @@ class QubesMiscAPI(qubes.api.AbstractQubesAPI): appropriate extensions. Requests not explicitly handled by some extension are ignored. ''' - assert self.dest.name == 'dom0' - assert not self.arg + self.enforce(self.dest.name == 'dom0') + self.enforce(not self.arg) prefix = '/features-request/' @@ -59,7 +59,7 @@ class QubesMiscAPI(qubes.api.AbstractQubesAPI): safe_set = string.ascii_letters + string.digits for untrusted_key in untrusted_features: untrusted_value = untrusted_features[untrusted_key] - assert all((c in safe_set) for c in untrusted_value) + self.enforce(all((c in safe_set) for c in untrusted_value)) yield from self.src.fire_event_async('features-request', untrusted_features=untrusted_features) @@ -71,8 +71,8 @@ class QubesMiscAPI(qubes.api.AbstractQubesAPI): ''' Legacy version of qubes.FeaturesRequest, used by Qubes Windows Tools ''' - assert self.dest.name == 'dom0' - assert not self.arg + self.enforce(self.dest.name == 'dom0') + self.enforce(not self.arg) untrusted_features = {} safe_set = string.ascii_letters + string.digits @@ -83,7 +83,7 @@ class QubesMiscAPI(qubes.api.AbstractQubesAPI): if untrusted_value: untrusted_value = untrusted_value.decode('ascii', errors='strict') - assert all((c in safe_set) for c in untrusted_value) + self.enforce(all((c in safe_set) for c in untrusted_value)) untrusted_features[feature] = untrusted_value del untrusted_value @@ -102,7 +102,7 @@ class QubesMiscAPI(qubes.api.AbstractQubesAPI): ''' untrusted_update_count = untrusted_payload.strip() - assert untrusted_update_count.isdigit() + self.enforce(untrusted_update_count.isdigit()) # now sanitized update_count = int(untrusted_update_count) del untrusted_update_count diff --git a/qubes/backup.py b/qubes/backup.py index e3e6f1ec..a792692f 100644 --- a/qubes/backup.py +++ b/qubes/backup.py @@ -20,26 +20,27 @@ # # from __future__ import unicode_literals -import itertools -import logging -import functools -import string -import termios import asyncio - -from qubes.utils import size_to_human -import stat -import os +import datetime import fcntl -import subprocess +import functools +import grp +import itertools +import logging +import os +import pathlib +import pwd import re import shutil +import stat +import string +import subprocess import tempfile +import termios import time -import grp -import pwd -import datetime + +from .utils import size_to_human import qubes import qubes.core2migration import qubes.storage @@ -257,17 +258,14 @@ class Backup(object): size = qubes.storage.file.get_disk_usage(file_path) if subdir is None: - abs_file_path = os.path.abspath(file_path) - abs_base_dir = os.path.abspath( - qubes.config.system_path["qubes_base_dir"]) + '/' - abs_file_dir = os.path.dirname(abs_file_path) + '/' - (nothing, directory, subdir) = \ - abs_file_dir.partition(abs_base_dir) - assert nothing == "" - assert directory == abs_base_dir - else: - if subdir and not subdir.endswith('/'): - subdir += '/' + abs_file_dir = pathlib.Path(file_path).resolve().parent + abs_base_dir = pathlib.Path( + qubes.config.system_path["qubes_base_dir"]).resolve() + # this raises ValueError if abs_file_dir is not in abs_base_dir + subdir = str(abs_file_dir.relative_to(abs_base_dir)) + + if not subdir.endswith(os.path.sep): + subdir += os.path.sep #: real path to the file self.path = file_path diff --git a/qubes/devices.py b/qubes/devices.py index 565f325c..11849691 100644 --- a/qubes/devices.py +++ b/qubes/devices.py @@ -238,9 +238,9 @@ class DeviceCollection(object): if device_assignment.bus is None: device_assignment.bus = self._bus - else: - assert device_assignment.bus == self._bus, \ - "Trying to attach DeviceAssignment of a different device class" + elif device_assignment.bus != self._bus: + raise ValueError( + 'Trying to attach DeviceAssignment of a different device class') if not device_assignment.persistent and self._vm.is_halted(): raise qubes.exc.QubesVMNotRunningError(self._vm, diff --git a/qubes/firewall.py b/qubes/firewall.py index dbf2a9e3..36e2c3b0 100644 --- a/qubes/firewall.py +++ b/qubes/firewall.py @@ -39,9 +39,10 @@ class RuleOption(object): # subset of string.punctuation safe_set = string.ascii_letters + string.digits + \ ':;,./-_[]' - assert all(x in safe_set for x in str(untrusted_value)) - value = str(untrusted_value) - self._value = value + untrusted_value = str(untrusted_value) + if not all(x in safe_set for x in untrusted_value): + raise ValueError('strange characters in rule') + self._value = untrusted_value @property def rule(self): @@ -226,9 +227,10 @@ class Comment(RuleOption): # subset of string.punctuation safe_set = string.ascii_letters + string.digits + \ ':;,./-_[] ' - assert all(x in safe_set for x in str(untrusted_value)) - value = str(untrusted_value) - self._value = value + untrusted_value = str(untrusted_value) + if not all(x in safe_set for x in untrusted_value): + raise ValueError('strange characters comment') + self._value = untrusted_value @property def rule(self): diff --git a/qubes/vm/__init__.py b/qubes/vm/__init__.py index 33b49a96..517e08ec 100644 --- a/qubes/vm/__init__.py +++ b/qubes/vm/__init__.py @@ -280,7 +280,8 @@ class Tags(set): @staticmethod def validate_tag(tag): safe_set = string.ascii_letters + string.digits + '-_' - assert all((x in safe_set) for x in tag) + if not all((x in safe_set) for x in tag): + raise ValueError('disallowed characters') class BaseVM(qubes.PropertyHolder): diff --git a/qubes/vm/mix/net.py b/qubes/vm/mix/net.py index 4ceeb858..98a2e6a2 100644 --- a/qubes/vm/mix/net.py +++ b/qubes/vm/mix/net.py @@ -302,7 +302,9 @@ class NetVMMixin(qubes.events.Emitter): if not self.is_running(): raise qubes.exc.QubesVMNotRunningError(self) - assert self.netvm is not None + if self.netvm is None: + raise qubes.exc.QubesVMError(self, + 'netvm should not be {}'.format(self.netvm)) if not self.netvm.is_running(): # pylint: disable=no-member # pylint: disable=no-member @@ -319,7 +321,9 @@ class NetVMMixin(qubes.events.Emitter): if not self.is_running(): raise qubes.exc.QubesVMNotRunningError(self) - assert self.netvm is not None + if self.netvm is None: + raise qubes.exc.QubesVMError(self, + 'netvm should not be {}'.format(self.netvm)) self.libvirt_domain.detachDevice( self.app.env.get_template('libvirt/devices/net.xml').render( diff --git a/qubespolicy/__init__.py b/qubespolicy/__init__.py index c8c6c042..7e1b9c61 100755 --- a/qubespolicy/__init__.py +++ b/qubespolicy/__init__.py @@ -416,10 +416,13 @@ class PolicyAction(object): raise AccessDenied( 'denied by policy {}:{}'.format(rule.filename, rule.lineno)) elif rule.action == Action.ask: - assert targets_for_ask is not None + if targets_for_ask is None: + raise AccessDenied( + 'invalid policy {}:{}'.format(rule.filename, rule.lineno)) elif rule.action == Action.allow: - assert targets_for_ask is None - assert target is not None + if targets_for_ask is not None or target is None: + raise AccessDenied( + 'invalid policy {}:{}'.format(rule.filename, rule.lineno)) self.action = rule.action def handle_user_response(self, response, target=None):