diff --git a/ci/coveragerc b/ci/coveragerc new file mode 100644 index 00000000..2d47c77e --- /dev/null +++ b/ci/coveragerc @@ -0,0 +1,3 @@ +[run] +source = qubes +omit = qubes/tests/* diff --git a/ci/requirements.txt b/ci/requirements.txt index a702822e..81325bdf 100644 --- a/ci/requirements.txt +++ b/ci/requirements.txt @@ -1,5 +1,6 @@ # WARNING: those requirements are used only for travis-ci.org # they SHOULD NOT be used under normal conditions; use system package manager +coverage docutils jinja2 lxml diff --git a/qubes/__init__.py b/qubes/__init__.py index 697605ce..1a5859df 100644 --- a/qubes/__init__.py +++ b/qubes/__init__.py @@ -31,6 +31,7 @@ import builtins import collections import os import os.path +import string import lxml.etree import qubes.config @@ -104,6 +105,10 @@ class Label(object): self.color, self.name) + def __eq__(self, other): + if isinstance(other, Label): + return self.name == other.name + return NotImplemented @builtins.property def icon_path(self): @@ -193,7 +198,7 @@ class property(object): # pylint: disable=redefined-builtin,invalid-name self._setter = setter self._saver = saver if saver is not None else ( lambda self, prop, value: str(value)) - self._type = type + self.type = type self._default = default self._write_once = write_once self.order = order @@ -245,8 +250,8 @@ class property(object): # pylint: disable=redefined-builtin,invalid-name if self._setter is not None: value = self._setter(instance, self, value) - if self._type not in (None, type(value)): - value = self._type(value) + if self.type not in (None, type(value)): + value = self.type(value) if has_oldvalue: instance.fire_event_pre('property-pre-set:' + self.__name__, @@ -317,6 +322,41 @@ class property(object): # pylint: disable=redefined-builtin,invalid-name 'property {!r} is write-once and already set'.format( self.__name__)) + def sanitize(self, *, untrusted_newvalue): + '''Coarse sanitization of value to be set, before sending it to a + setter. Can raise QubesValueError if the value is invalid. + + :param untrusted_newvalue: value to be validated + :return sanitized value + :raises qubes.exc.QubesValueError + ''' + # do not treat type='str' as sufficient validation + if self.type is not None and self.type is not str: + # assume specific type will preform enough validation + if self.type is bool: + try: + untrusted_newvalue = untrusted_newvalue.decode('ascii') + except UnicodeDecodeError: + raise qubes.exc.QubesValueError + return self.bool(None, None, untrusted_newvalue) + else: + try: + return self.type(untrusted_newvalue) + except ValueError: + raise qubes.exc.QubesValueError + else: + # 'str' or not specified type + try: + untrusted_newvalue = untrusted_newvalue.decode('ascii', + errors='strict') + except UnicodeDecodeError: + raise qubes.exc.QubesValueError + allowed_set = string.printable + if not all(x in allowed_set for x in untrusted_newvalue): + raise qubes.exc.QubesValueError( + 'Invalid characters in property value') + return untrusted_newvalue + # # exceptions @@ -368,7 +408,7 @@ class property(object): # pylint: disable=redefined-builtin,invalid-name return False if lcvalue in ('1', 'yes', 'true', 'on'): return True - raise ValueError( + raise qubes.exc.QubesValueError( 'Invalid literal for boolean property: {!r}'.format(value)) return bool(value) diff --git a/qubes/app.py b/qubes/app.py index ae9380f5..8eeeddba 100644 --- a/qubes/app.py +++ b/qubes/app.py @@ -859,6 +859,7 @@ class Qubes(qubes.PropertyHolder): 7: qubes.Label(7, '0x75507b', 'purple'), 8: qubes.Label(8, '0x000000', 'black'), } + assert max(self.labels.keys()) == qubes.config.max_default_label # check if the default LVM Thin pool qubes_dom0/pool00 exists if os.path.exists('/dev/mapper/qubes_dom0-pool00-tpool'): @@ -872,7 +873,8 @@ class Qubes(qubes.PropertyHolder): self.pools[name] = self._get_pool(**config) self.domains.add( - qubes.vm.adminvm.AdminVM(self, None, qid=0, name='dom0')) + qubes.vm.adminvm.AdminVM(self, None, qid=0, name='dom0', + label='black')) @classmethod def create_empty_store(cls, *args, **kwargs): @@ -1085,7 +1087,7 @@ class Qubes(qubes.PropertyHolder): oldvalue=None): # pylint: disable=unused-argument for vm in self.domains: - if vm.provides_network and vm.property_is_default('netvm'): + if vm.property_is_default('netvm'): # fire property-del:netvm as it is responsible for resetting # netvm to it's default value vm.fire_event('property-del:netvm', diff --git a/qubes/config.py b/qubes/config.py index 2596e239..4d65e873 100644 --- a/qubes/config.py +++ b/qubes/config.py @@ -109,3 +109,6 @@ defaults = { max_qid = 254 max_netid = 254 max_dispid = 10000 +#: built-in standard labels, if creating new one, allocate them above this +# number, at least until label index is removed from API +max_default_label = 8 diff --git a/qubes/mgmt.py b/qubes/mgmt.py index b6dec617..7e29f1c7 100644 --- a/qubes/mgmt.py +++ b/qubes/mgmt.py @@ -23,8 +23,12 @@ Qubes OS Management API ''' import asyncio +import string + +import functools import qubes.vm.qubesvm +import qubes.storage class ProtocolError(AssertionError): @@ -46,6 +50,16 @@ def not_in_api(func): func.not_in_api = True return func + +def no_payload(func): + @functools.wraps(func) + def wrapper(self, untrusted_payload): + if untrusted_payload != b'': + raise ProtocolError('unexpected payload') + return func(self) + return wrapper + + class QubesMgmt(object): '''Implementation of Qubes Management API calls @@ -121,14 +135,15 @@ class QubesMgmt(object): # @asyncio.coroutine - def vm_list(self, untrusted_payload): + @no_payload + def vm_list(self): '''List all the domains''' - assert self.dest.name == 'dom0' assert not self.arg - assert not untrusted_payload - del untrusted_payload - domains = self.fire_event_for_filter(self.app.domains) + if self.dest.name == 'dom0': + domains = self.fire_event_for_filter(self.app.domains) + else: + domains = self.fire_event_for_filter([self.dest]) return ''.join('{} class={} state={}\n'.format( vm.name, @@ -137,40 +152,63 @@ class QubesMgmt(object): for vm in sorted(domains)) @asyncio.coroutine - def vm_property_list(self, untrusted_payload): + @no_payload + def vm_property_list(self): '''List all properties on a qube''' assert not self.arg - assert not untrusted_payload - del untrusted_payload properties = self.fire_event_for_filter(self.dest.property_list()) return ''.join('{}\n'.format(prop.__name__) for prop in properties) @asyncio.coroutine - def vm_property_get(self, untrusted_payload): + @no_payload + def vm_property_get(self): '''Get a value of one property''' assert self.arg in self.dest.property_list() - assert not untrusted_payload - del untrusted_payload self.fire_event_for_permission() + property_def = self.dest.property_get_def(self.arg) + # explicit list to be sure that it matches protocol spec + if isinstance(property_def, qubes.vm.VMProperty): + property_type = 'vm' + elif property_def.type is int: + property_type = 'int' + elif property_def.type is bool: + property_type = 'bool' + elif self.arg == 'label': + property_type = 'label' + else: + property_type = 'str' + try: value = getattr(self.dest, self.arg) except AttributeError: - return 'default=True ' + return 'default=True type={} '.format(property_type) else: - return 'default={} {}'.format( + return 'default={} type={} {}'.format( str(self.dest.property_is_default(self.arg)), - str(value)) + property_type, + str(value) if value is not None else '') @asyncio.coroutine - def vm_property_help(self, untrusted_payload): + def vm_property_set(self, untrusted_payload): + assert self.arg in self.dest.property_list() + + property_def = self.dest.property_get_def(self.arg) + newvalue = property_def.sanitize(untrusted_newvalue=untrusted_payload) + + self.fire_event_for_permission(newvalue=newvalue) + + setattr(self.dest, self.arg, newvalue) + self.app.save() + + @asyncio.coroutine + @no_payload + def vm_property_help(self): '''Get help for one property''' assert self.arg in self.dest.property_list() - assert not untrusted_payload - del untrusted_payload self.fire_event_for_permission() @@ -182,12 +220,239 @@ class QubesMgmt(object): return qubes.utils.format_doc(doc) @asyncio.coroutine - def vm_property_reset(self, untrusted_payload): + @no_payload + def vm_property_reset(self): '''Reset a property to a default value''' assert self.arg in self.dest.property_list() - assert not untrusted_payload - del untrusted_payload self.fire_event_for_permission() delattr(self.dest, self.arg) + self.app.save() + + @asyncio.coroutine + @no_payload + def vm_volume_list(self): + assert not self.arg + + volume_names = self.fire_event_for_filter(self.dest.volumes.keys()) + return ''.join('{}\n'.format(name) for name in volume_names) + + @asyncio.coroutine + @no_payload + def vm_volume_info(self): + assert self.arg in self.dest.volumes.keys() + + self.fire_event_for_permission() + + volume = self.dest.volumes[self.arg] + # properties defined in API + volume_properties = [ + 'pool', 'vid', 'size', 'usage', 'rw', 'internal', 'source', + 'save_on_stop', 'snap_on_start'] + return ''.join('{}={}\n'.format(key, getattr(volume, key)) for key in + volume_properties) + + @asyncio.coroutine + @no_payload + def vm_volume_listsnapshots(self): + assert self.arg in self.dest.volumes.keys() + + volume = self.dest.volumes[self.arg] + revisions = [revision for revision in volume.revisions] + revisions = self.fire_event_for_filter(revisions) + + return ''.join('{}\n'.format(revision) for revision in revisions) + + @asyncio.coroutine + def vm_volume_revert(self, untrusted_payload): + assert self.arg in self.dest.volumes.keys() + untrusted_revision = untrusted_payload.decode('ascii').strip() + del untrusted_payload + + volume = self.dest.volumes[self.arg] + snapshots = volume.revisions + assert untrusted_revision in snapshots + revision = untrusted_revision + + self.fire_event_for_permission(revision=revision) + + self.dest.storage.get_pool(volume).revert(revision) + self.app.save() + + @asyncio.coroutine + def vm_volume_resize(self, untrusted_payload): + assert self.arg in self.dest.volumes.keys() + untrusted_size = untrusted_payload.decode('ascii').strip() + del untrusted_payload + assert untrusted_size.isdigit() # only digits, forbid '-' too + assert len(untrusted_size) <= 20 # limit to about 2^64 + + size = int(untrusted_size) + + self.fire_event_for_permission(size=size) + + self.dest.storage.resize(self.arg, size) + self.app.save() + + @asyncio.coroutine + @no_payload + def pool_list(self): + assert not self.arg + assert self.dest.name == 'dom0' + + pools = self.fire_event_for_filter(self.app.pools) + + return ''.join('{}\n'.format(pool) for pool in pools) + + @asyncio.coroutine + @no_payload + def pool_listdrivers(self): + assert self.dest.name == 'dom0' + assert not self.arg + + drivers = self.fire_event_for_filter(qubes.storage.pool_drivers()) + + return ''.join('{} {}\n'.format( + driver, + ' '.join(qubes.storage.driver_parameters(driver))) + for driver in drivers) + + @asyncio.coroutine + @no_payload + def pool_info(self): + assert self.dest.name == 'dom0' + assert self.arg in self.app.pools.keys() + + pool = self.app.pools[self.arg] + + self.fire_event_for_permission(pool=pool) + + return ''.join('{}={}\n'.format(prop, val) + for prop, val in sorted(pool.config.items())) + + @asyncio.coroutine + def pool_add(self, untrusted_payload): + assert self.dest.name == 'dom0' + drivers = qubes.storage.pool_drivers() + assert self.arg in drivers + untrusted_pool_config = untrusted_payload.decode('ascii').splitlines() + del untrusted_payload + assert all(('=' in line) for line in untrusted_pool_config) + # pairs of (option, value) + untrusted_pool_config = [line.split('=', 1) + for line in untrusted_pool_config] + # reject duplicated options + assert len(set(x[0] for x in untrusted_pool_config)) == \ + len([x[0] for x in untrusted_pool_config]) + # and convert to dict + untrusted_pool_config = dict(untrusted_pool_config) + + assert 'name' in untrusted_pool_config + untrusted_pool_name = untrusted_pool_config.pop('name') + allowed_chars = string.ascii_letters + string.digits + '-_.' + assert all(c in allowed_chars for c in untrusted_pool_name) + pool_name = untrusted_pool_name + assert pool_name not in self.app.pools + + driver_parameters = qubes.storage.driver_parameters(self.arg) + assert all(key in driver_parameters for key in untrusted_pool_config) + pool_config = untrusted_pool_config + + self.fire_event_for_permission(name=pool_name, + pool_config=pool_config) + + self.app.add_pool(name=pool_name, driver=self.arg, **pool_config) + self.app.save() + + @asyncio.coroutine + @no_payload + def pool_remove(self): + assert self.dest.name == 'dom0' + assert self.arg in self.app.pools.keys() + + self.fire_event_for_permission() + + self.app.remove_pool(self.arg) + self.app.save() + + @asyncio.coroutine + @no_payload + def label_list(self): + assert self.dest.name == 'dom0' + assert not self.arg + + labels = self.fire_event_for_filter(self.app.labels.values()) + + return ''.join('{}\n'.format(label.name) for label in labels) + + @asyncio.coroutine + @no_payload + def label_get(self): + assert self.dest.name == 'dom0' + + try: + label = self.app.get_label(self.arg) + except KeyError: + raise qubes.exc.QubesValueError + + self.fire_event_for_permission(label=label) + + return label.color + + @asyncio.coroutine + def label_create(self, untrusted_payload): + assert self.dest.name == 'dom0' + + # don't confuse label name with label index + assert not self.arg.isdigit() + allowed_chars = string.ascii_letters + string.digits + '-_.' + assert all(c in allowed_chars for c in self.arg) + try: + self.app.get_label(self.arg) + except KeyError: + # ok, no such label yet + pass + else: + raise qubes.exc.QubesValueError('label already exists') + + untrusted_payload = untrusted_payload.decode('ascii').strip() + assert len(untrusted_payload) == 8 + assert untrusted_payload.startswith('0x') + # besides prefix, only hex digits are allowed + assert all(x in string.hexdigits for x in untrusted_payload[2:]) + + # SEE: #2732 + color = untrusted_payload + + self.fire_event_for_permission(color=color) + + # allocate new index, but make sure it's outside of default labels set + new_index = max( + qubes.config.max_default_label, *self.app.labels.keys()) + 1 + + label = qubes.Label(new_index, color, self.arg) + self.app.labels[new_index] = label + self.app.save() + + @asyncio.coroutine + @no_payload + def label_remove(self): + assert self.dest.name == 'dom0' + + try: + label = self.app.get_label(self.arg) + except KeyError: + raise qubes.exc.QubesValueError + # don't allow removing default labels + assert label.index > qubes.config.max_default_label + + # FIXME: this should be in app.add_label() + for vm in self.app.domains: + if vm.label == label: + raise qubes.exc.QubesException('label still in use') + + self.fire_event_for_permission(label=label) + + del self.app.labels[label.index] + self.app.save() diff --git a/qubes/storage/__init__.py b/qubes/storage/__init__.py index da57401b..1f8aa44a 100644 --- a/qubes/storage/__init__.py +++ b/qubes/storage/__init__.py @@ -635,6 +635,15 @@ def pool_drivers(): for ep in pkg_resources.iter_entry_points(STORAGE_ENTRY_POINT)] +def driver_parameters(name): + ''' Get __init__ parameters from a driver with out `self` & `name`. ''' + init_function = qubes.utils.get_entry_point_one( + qubes.storage.STORAGE_ENTRY_POINT, name).__init__ + params = init_function.func_code.co_varnames + ignored_params = ['self', 'name'] + return [p for p in params if p not in ignored_params] + + def isodate(seconds=time.time()): ''' Helper method which returns an iso date ''' return datetime.utcfromtimestamp(seconds).isoformat("T") diff --git a/qubes/tests/__init__.py b/qubes/tests/__init__.py index 2d63df7f..7b0b2684 100644 --- a/qubes/tests/__init__.py +++ b/qubes/tests/__init__.py @@ -45,6 +45,7 @@ import unittest from distutils import spawn import lxml.etree +import pkg_resources import qubes.backup import qubes.config @@ -143,12 +144,22 @@ class TestEmitter(qubes.events.Emitter): def fire_event(self, event, **kwargs): effects = super(TestEmitter, self).fire_event(event, **kwargs) - self.fired_events[(event, tuple(kwargs.items()))] += 1 + ev_kwargs = frozenset( + (key, + frozenset(value.items()) if isinstance(value, dict) else value) + for key, value in kwargs.items() + ) + self.fired_events[(event, ev_kwargs)] += 1 return effects def fire_event_pre(self, event, **kwargs): effects = super(TestEmitter, self).fire_event_pre(event, **kwargs) - self.fired_events[(event, tuple(kwargs.items()))] += 1 + ev_kwargs = frozenset( + (key, + frozenset(value.items()) if isinstance(value, dict) else value) + for key, value in kwargs.items() + ) + self.fired_events[(event, ev_kwargs)] += 1 return effects def expectedFailureIfTemplate(templates): @@ -894,6 +905,7 @@ def load_tests(loader, tests, pattern): # pylint: disable=unused-argument 'qubes.tests.vm.adminvm', 'qubes.tests.app', 'qubes.tests.tarwriter', + 'qubes.tests.mgmt', 'qubes.tests.tools.qvm_device', 'qubes.tests.tools.qvm_firewall', 'qubes.tests.tools.qvm_ls', diff --git a/qubes/tests/mgmt.py b/qubes/tests/mgmt.py new file mode 100644 index 00000000..d55f907a --- /dev/null +++ b/qubes/tests/mgmt.py @@ -0,0 +1,1111 @@ +# -*- encoding: utf8 -*- +# +# The Qubes OS Project, http://www.qubes-os.org +# +# Copyright (C) 2017 Marek Marczykowski-Górecki +# +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, see . + +''' Tests for management calls endpoints ''' + +import asyncio +import libvirt +import unittest.mock + +import qubes +import qubes.tests +import qubes.mgmt + +# properties defined in API +volume_properties = [ + 'pool', 'vid', 'size', 'usage', 'rw', 'internal', 'source', + 'save_on_stop', 'snap_on_start'] + + +class MgmtTestCase(qubes.tests.QubesTestCase): + def setUp(self): + super().setUp() + app = qubes.Qubes('/tmp/qubes-test.xml', load=False) + app.vmm = unittest.mock.Mock(spec=qubes.app.VMMConnection) + app.load_initial_values() + app.default_kernel = '1.0' + app.default_netvm = None + app.add_new_vm('TemplateVM', label='black', name='test-template') + app.default_template = 'test-template' + app.save = unittest.mock.Mock() + self.vm = app.add_new_vm('AppVM', label='red', name='test-vm1', + template='test-template') + self.app = app + libvirt_attrs = { + 'libvirt_conn.lookupByUUID.return_value.isActive.return_value': + False, + 'libvirt_conn.lookupByUUID.return_value.state.return_value': + [libvirt.VIR_DOMAIN_SHUTOFF], + } + app.vmm.configure_mock(**libvirt_attrs) + + self.emitter = qubes.tests.TestEmitter() + self.app.domains[0].fire_event = self.emitter.fire_event + self.app.domains[0].fire_event_pre = self.emitter.fire_event_pre + + def call_mgmt_func(self, method, dest, arg=b'', payload=b''): + mgmt_obj = qubes.mgmt.QubesMgmt(self.app, b'dom0', method, dest, arg) + + loop = asyncio.get_event_loop() + response = loop.run_until_complete( + mgmt_obj.execute(untrusted_payload=payload)) + self.assertEventFired(self.emitter, + 'mgmt-permission:' + method.decode('ascii')) + return response + + +class TC_00_VMs(MgmtTestCase): + def test_000_vm_list(self): + value = self.call_mgmt_func(b'mgmt.vm.List', b'dom0') + self.assertEqual(value, + 'dom0 class=AdminVM state=Running\n' + 'test-template class=TemplateVM state=Halted\n' + 'test-vm1 class=AppVM state=Halted\n') + + def test_001_vm_list_single(self): + value = self.call_mgmt_func(b'mgmt.vm.List', b'test-vm1') + self.assertEqual(value, + 'test-vm1 class=AppVM state=Halted\n') + + def test_010_vm_property_list(self): + # this test is kind of stupid, but at least check if appropriate + # mgmt-permission event is fired + value = self.call_mgmt_func(b'mgmt.vm.property.List', b'test-vm1') + properties = self.app.domains['test-vm1'].property_list() + self.assertEqual(value, + ''.join('{}\n'.format(prop.__name__) for prop in properties)) + + def test_020_vm_property_get_str(self): + value = self.call_mgmt_func(b'mgmt.vm.property.Get', b'test-vm1', + b'name') + self.assertEqual(value, 'default=False type=str test-vm1') + + def test_021_vm_property_get_int(self): + value = self.call_mgmt_func(b'mgmt.vm.property.Get', b'test-vm1', + b'vcpus') + self.assertEqual(value, 'default=True type=int 42') + + def test_022_vm_property_get_bool(self): + value = self.call_mgmt_func(b'mgmt.vm.property.Get', b'test-vm1', + b'provides_network') + self.assertEqual(value, 'default=True type=bool False') + + def test_023_vm_property_get_label(self): + value = self.call_mgmt_func(b'mgmt.vm.property.Get', b'test-vm1', + b'label') + self.assertEqual(value, 'default=False type=label red') + + def test_024_vm_property_get_vm(self): + value = self.call_mgmt_func(b'mgmt.vm.property.Get', b'test-vm1', + b'template') + self.assertEqual(value, 'default=False type=vm test-template') + + def test_025_vm_property_get_vm_none(self): + value = self.call_mgmt_func(b'mgmt.vm.property.Get', b'test-vm1', + b'netvm') + self.assertEqual(value, 'default=True type=vm ') + + def test_030_vm_property_set_vm(self): + netvm = self.app.add_new_vm('AppVM', label='red', name='test-net', + template='test-template', provides_network=True) + + with unittest.mock.patch('qubes.vm.VMProperty.__set__') as mock: + value = self.call_mgmt_func(b'mgmt.vm.property.Set', b'test-vm1', + b'netvm', b'test-net') + self.assertIsNone(value) + mock.assert_called_once_with(self.vm, 'test-net') + self.app.save.assert_called_once_with() + + def test_032_vm_property_set_vm_invalid1(self): + with unittest.mock.patch('qubes.vm.VMProperty.__set__') as mock: + with self.assertRaises(qubes.exc.QubesValueError): + self.call_mgmt_func(b'mgmt.vm.property.Set', b'test-vm1', + b'netvm', b'forbidden-chars/../!') + self.assertFalse(mock.called) + self.assertFalse(self.app.save.called) + + def test_033_vm_property_set_vm_invalid2(self): + with unittest.mock.patch('qubes.vm.VMProperty.__set__') as mock: + with self.assertRaises(qubes.exc.QubesValueError): + self.call_mgmt_func(b'mgmt.vm.property.Set', b'test-vm1', + b'netvm', b'\x80\x90\xa0') + self.assertFalse(mock.called) + self.assertFalse(self.app.save.called) + + def test_034_vm_propert_set_bool_true(self): + with unittest.mock.patch('qubes.property.__set__') as mock: + value = self.call_mgmt_func(b'mgmt.vm.property.Set', b'test-vm1', + b'autostart', b'True') + self.assertIsNone(value) + mock.assert_called_once_with(self.vm, True) + self.app.save.assert_called_once_with() + + def test_035_vm_propert_set_bool_false(self): + with unittest.mock.patch('qubes.property.__set__') as mock: + value = self.call_mgmt_func(b'mgmt.vm.property.Set', b'test-vm1', + b'autostart', b'False') + self.assertIsNone(value) + mock.assert_called_once_with(self.vm, False) + self.app.save.assert_called_once_with() + + def test_036_vm_propert_set_bool_invalid1(self): + with unittest.mock.patch('qubes.property.__set__') as mock: + with self.assertRaises(qubes.exc.QubesValueError): + self.call_mgmt_func(b'mgmt.vm.property.Set', b'test-vm1', + b'autostart', b'some string') + self.assertFalse(mock.called) + self.assertFalse(self.app.save.called) + + def test_037_vm_propert_set_bool_invalid2(self): + with unittest.mock.patch('qubes.property.__set__') as mock: + with self.assertRaises(qubes.exc.QubesValueError): + self.call_mgmt_func(b'mgmt.vm.property.Set', b'test-vm1', + b'autostart', b'\x80\x90@#$%^&*(') + self.assertFalse(mock.called) + self.assertFalse(self.app.save.called) + + def test_038_vm_propert_set_str(self): + with unittest.mock.patch('qubes.property.__set__') as mock: + value = self.call_mgmt_func(b'mgmt.vm.property.Set', b'test-vm1', + b'kernel', b'1.0') + self.assertIsNone(value) + mock.assert_called_once_with(self.vm, '1.0') + self.app.save.assert_called_once_with() + + def test_039_vm_propert_set_str_invalid1(self): + with unittest.mock.patch('qubes.property.__set__') as mock: + with self.assertRaises(qubes.exc.QubesValueError): + self.call_mgmt_func(b'mgmt.vm.property.Set', b'test-vm1', + b'kernel', b'some, non-ASCII: \x80\xd2') + self.assertFalse(mock.called) + self.assertFalse(self.app.save.called) + + def test_040_vm_propert_set_int(self): + with unittest.mock.patch('qubes.property.__set__') as mock: + value = self.call_mgmt_func(b'mgmt.vm.property.Set', b'test-vm1', + b'maxmem', b'1024000') + self.assertIsNone(value) + mock.assert_called_once_with(self.vm, 1024000) + self.app.save.assert_called_once_with() + + def test_041_vm_propert_set_int_invalid1(self): + with unittest.mock.patch('qubes.property.__set__') as mock: + with self.assertRaises(qubes.exc.QubesValueError): + self.call_mgmt_func(b'mgmt.vm.property.Set', b'test-vm1', + b'maxmem', b'fourty two') + self.assertFalse(mock.called) + self.assertFalse(self.app.save.called) + + def test_042_vm_propert_set_label(self): + with unittest.mock.patch('qubes.property.__set__') as mock: + value = self.call_mgmt_func(b'mgmt.vm.property.Set', b'test-vm1', + b'label', b'green') + self.assertIsNone(value) + mock.assert_called_once_with(self.vm, 'green') + self.app.save.assert_called_once_with() + + def test_043_vm_propert_set_label_invalid1(self): + with unittest.mock.patch('qubes.property.__set__') as mock: + with self.assertRaises(qubes.exc.QubesValueError): + self.call_mgmt_func(b'mgmt.vm.property.Set', b'test-vm1', + b'maxmem', b'some, non-ASCII: \x80\xd2') + self.assertFalse(mock.called) + self.assertFalse(self.app.save.called) + + @unittest.skip('label existence not checked before actual setter yet') + def test_044_vm_propert_set_label_invalid2(self): + with unittest.mock.patch('qubes.property.__set__') as mock: + with self.assertRaises(qubes.exc.QubesValueError): + self.call_mgmt_func(b'mgmt.vm.property.Set', b'test-vm1', + b'maxmem', b'non-existing-color') + self.assertFalse(mock.called) + self.assertFalse(self.app.save.called) + + def test_050_vm_property_help(self): + value = self.call_mgmt_func(b'mgmt.vm.property.Help', b'test-vm1', + b'label') + self.assertEqual(value, + 'Colourful label assigned to VM. This is where the colour of the ' + 'padlock is set.') + self.assertFalse(self.app.save.called) + + def test_052_vm_property_help_invalid_property(self): + with self.assertRaises(AssertionError): + self.call_mgmt_func(b'mgmt.vm.property.Help', b'test-vm1', + b'no-such-property') + + self.assertFalse(self.app.save.called) + + def test_060_vm_property_reset(self): + with unittest.mock.patch('qubes.property.__delete__') as mock: + value = self.call_mgmt_func(b'mgmt.vm.property.Reset', b'test-vm1', + b'default_user') + mock.assert_called_with(self.vm) + self.assertIsNone(value) + self.app.save.assert_called_once_with() + + def test_062_vm_property_reset_invalid_property(self): + with unittest.mock.patch('qubes.property.__delete__') as mock: + with self.assertRaises(AssertionError): + self.call_mgmt_func(b'mgmt.vm.property.Help', b'test-vm1', + b'no-such-property') + self.assertFalse(mock.called) + self.assertFalse(self.app.save.called) + + def test_070_vm_volume_list(self): + self.vm.volumes = unittest.mock.Mock() + volumes_conf = { + 'keys.return_value': ['root', 'private', 'volatile', 'kernel'] + } + self.vm.volumes.configure_mock(**volumes_conf) + value = self.call_mgmt_func(b'mgmt.vm.volume.List', b'test-vm1') + self.assertEqual(value, 'root\nprivate\nvolatile\nkernel\n') + # check if _only_ keys were accessed + self.assertEqual(self.vm.volumes.mock_calls, + [unittest.mock.call.keys()]) + + def test_080_vm_volume_info(self): + self.vm.volumes = unittest.mock.MagicMock() + volumes_conf = { + 'keys.return_value': ['root', 'private', 'volatile', 'kernel'] + } + for prop in volume_properties: + volumes_conf[ + '__getitem__.return_value.{}'.format(prop)] = prop +'-value' + self.vm.volumes.configure_mock(**volumes_conf) + value = self.call_mgmt_func(b'mgmt.vm.volume.Info', b'test-vm1', + b'private') + self.assertEqual(value, + ''.join('{p}={p}-value\n'.format(p=p) for p in volume_properties)) + self.assertEqual(self.vm.volumes.mock_calls, + [unittest.mock.call.keys(), + unittest.mock.call.__getattr__('__getitem__')('private')]) + + def test_080_vm_volume_info_invalid_volume(self): + self.vm.volumes = unittest.mock.MagicMock() + volumes_conf = { + 'keys.return_value': ['root', 'private', 'volatile', 'kernel'] + } + self.vm.volumes.configure_mock(**volumes_conf) + with self.assertRaises(AssertionError): + self.call_mgmt_func(b'mgmt.vm.volume.Info', b'test-vm1', + b'no-such-volume') + self.assertEqual(self.vm.volumes.mock_calls, + [unittest.mock.call.keys()]) + + def test_090_vm_volume_listsnapshots(self): + self.vm.volumes = unittest.mock.MagicMock() + volumes_conf = { + 'keys.return_value': ['root', 'private', 'volatile', 'kernel'], + '__getitem__.return_value.revisions': ['rev1', 'rev2'], + } + self.vm.volumes.configure_mock(**volumes_conf) + value = self.call_mgmt_func(b'mgmt.vm.volume.ListSnapshots', + b'test-vm1', b'private') + self.assertEqual(value, + 'rev1\nrev2\n') + self.assertEqual(self.vm.volumes.mock_calls, + [unittest.mock.call.keys(), + unittest.mock.call.__getattr__('__getitem__')('private')]) + + def test_090_vm_volume_listsnapshots_invalid_volume(self): + self.vm.volumes = unittest.mock.MagicMock() + volumes_conf = { + 'keys.return_value': ['root', 'private', 'volatile', 'kernel'] + } + self.vm.volumes.configure_mock(**volumes_conf) + with self.assertRaises(AssertionError): + self.call_mgmt_func(b'mgmt.vm.volume.ListSnapshots', b'test-vm1', + b'no-such-volume') + self.assertEqual(self.vm.volumes.mock_calls, + [unittest.mock.call.keys()]) + + @unittest.skip('method not implemented yet') + def test_100_vm_volume_snapshot(self): + pass + + @unittest.skip('method not implemented yet') + def test_100_vm_volume_snapshot_invlid_volume(self): + self.vm.volumes = unittest.mock.MagicMock() + volumes_conf = { + 'keys.return_value': ['root', 'private', 'volatile', 'kernel'], + '__getitem__.return_value.revisions': ['rev1', 'rev2'], + } + self.vm.volumes.configure_mock(**volumes_conf) + with self.assertRaises(AssertionError): + self.call_mgmt_func(b'mgmt.vm.volume.Snapshots', + b'test-vm1', b'no-such-volume') + self.assertEqual(self.vm.volumes.mock_calls, + [unittest.mock.call.keys()]) + + @unittest.skip('method not implemented yet') + def test_100_vm_volume_snapshot_invalid_revision(self): + self.vm.volumes = unittest.mock.MagicMock() + volumes_conf = { + 'keys.return_value': ['root', 'private', 'volatile', 'kernel'] + } + self.vm.volumes.configure_mock(**volumes_conf) + with self.assertRaises(AssertionError): + self.call_mgmt_func(b'mgmt.vm.volume.Snapshots', + b'test-vm1', b'private', b'no-such-rev') + self.assertEqual(self.vm.volumes.mock_calls, + [unittest.mock.call.keys(), + unittest.mock.call.__getattr__('__getitem__')('private')]) + + def test_110_vm_volume_revert(self): + self.vm.volumes = unittest.mock.MagicMock() + volumes_conf = { + 'keys.return_value': ['root', 'private', 'volatile', 'kernel'], + '__getitem__.return_value.revisions': ['rev1', 'rev2'], + } + self.vm.volumes.configure_mock(**volumes_conf) + self.vm.storage = unittest.mock.Mock() + value = self.call_mgmt_func(b'mgmt.vm.volume.Revert', + b'test-vm1', b'private', b'rev1') + self.assertIsNone(value) + self.assertEqual(self.vm.volumes.mock_calls, + [unittest.mock.call.keys(), + unittest.mock.call.__getattr__('__getitem__')('private')]) + self.assertEqual(self.vm.storage.mock_calls, + [unittest.mock.call.get_pool(self.vm.volumes['private']), + unittest.mock.call.get_pool().revert('rev1')]) + + def test_110_vm_volume_revert_invalid_rev(self): + self.vm.volumes = unittest.mock.MagicMock() + volumes_conf = { + 'keys.return_value': ['root', 'private', 'volatile', 'kernel'], + '__getitem__.return_value.revisions': ['rev1', 'rev2'], + } + self.vm.volumes.configure_mock(**volumes_conf) + self.vm.storage = unittest.mock.Mock() + with self.assertRaises(AssertionError): + self.call_mgmt_func(b'mgmt.vm.volume.Revert', + b'test-vm1', b'private', b'no-such-rev') + self.assertEqual(self.vm.volumes.mock_calls, + [unittest.mock.call.keys(), + unittest.mock.call.__getattr__('__getitem__')('private')]) + self.assertFalse(self.vm.storage.called) + + def test_120_vm_volume_resize(self): + self.vm.volumes = unittest.mock.MagicMock() + volumes_conf = { + 'keys.return_value': ['root', 'private', 'volatile', 'kernel'], + } + self.vm.volumes.configure_mock(**volumes_conf) + self.vm.storage = unittest.mock.Mock() + value = self.call_mgmt_func(b'mgmt.vm.volume.Resize', + b'test-vm1', b'private', b'1024000000') + self.assertIsNone(value) + self.assertEqual(self.vm.volumes.mock_calls, + [unittest.mock.call.keys()]) + self.assertEqual(self.vm.storage.mock_calls, + [unittest.mock.call.resize('private', 1024000000)]) + + def test_120_vm_volume_resize_invalid_size1(self): + self.vm.volumes = unittest.mock.MagicMock() + volumes_conf = { + 'keys.return_value': ['root', 'private', 'volatile', 'kernel'], + } + self.vm.volumes.configure_mock(**volumes_conf) + self.vm.storage = unittest.mock.Mock() + with self.assertRaises(AssertionError): + self.call_mgmt_func(b'mgmt.vm.volume.Resize', + b'test-vm1', b'private', b'no-int-size') + self.assertEqual(self.vm.volumes.mock_calls, + [unittest.mock.call.keys()]) + self.assertFalse(self.vm.storage.called) + + def test_120_vm_volume_resize_invalid_size2(self): + self.vm.volumes = unittest.mock.MagicMock() + volumes_conf = { + 'keys.return_value': ['root', 'private', 'volatile', 'kernel'], + } + self.vm.volumes.configure_mock(**volumes_conf) + self.vm.storage = unittest.mock.Mock() + with self.assertRaises(AssertionError): + self.call_mgmt_func(b'mgmt.vm.volume.Resize', + b'test-vm1', b'private', b'-1') + self.assertEqual(self.vm.volumes.mock_calls, + [unittest.mock.call.keys()]) + self.assertFalse(self.vm.storage.called) + + def test_130_pool_list(self): + self.app.pools = ['file', 'lvm'] + value = self.call_mgmt_func(b'mgmt.pool.List', b'dom0') + self.assertEqual(value, 'file\nlvm\n') + self.assertFalse(self.app.save.called) + + @unittest.mock.patch('qubes.storage.pool_drivers') + @unittest.mock.patch('qubes.storage.driver_parameters') + def test_140_pool_listdrivers(self, mock_parameters, mock_drivers): + self.app.pools = ['file', 'lvm'] + + mock_drivers.return_value = ['driver1', 'driver2'] + mock_parameters.side_effect = \ + lambda driver: { + 'driver1': ['param1', 'param2'], + 'driver2': ['param3', 'param4'] + }[driver] + + value = self.call_mgmt_func(b'mgmt.pool.ListDrivers', b'dom0') + self.assertEqual(value, + 'driver1 param1 param2\ndriver2 param3 param4\n') + self.assertEqual(mock_drivers.mock_calls, [unittest.mock.call()]) + self.assertEqual(mock_parameters.mock_calls, + [unittest.mock.call('driver1'), unittest.mock.call('driver2')]) + self.assertFalse(self.app.save.called) + + def test_150_pool_info(self): + self.app.pools = { + 'pool1': unittest.mock.Mock(config={ + 'param1': 'value1', 'param2': 'value2'}) + } + value = self.call_mgmt_func(b'mgmt.pool.Info', b'dom0', b'pool1') + + self.assertEqual(value, 'param1=value1\nparam2=value2\n') + self.assertFalse(self.app.save.called) + + @unittest.mock.patch('qubes.storage.pool_drivers') + @unittest.mock.patch('qubes.storage.driver_parameters') + def test_160_pool_add(self, mock_parameters, mock_drivers): + self.app.pools = { + 'file': unittest.mock.Mock(), + 'lvm': unittest.mock.Mock() + } + + mock_drivers.return_value = ['driver1', 'driver2'] + mock_parameters.side_effect = \ + lambda driver: { + 'driver1': ['param1', 'param2'], + 'driver2': ['param3', 'param4'] + }[driver] + + self.app.add_pool = unittest.mock.Mock() + + value = self.call_mgmt_func(b'mgmt.pool.Add', b'dom0', b'driver1', + b'name=test-pool\nparam1=some-value\n') + self.assertIsNone(value) + self.assertEqual(mock_drivers.mock_calls, [unittest.mock.call()]) + self.assertEqual(mock_parameters.mock_calls, + [unittest.mock.call('driver1')]) + self.assertEqual(self.app.add_pool.mock_calls, + [unittest.mock.call(name='test-pool', driver='driver1', + param1='some-value')]) + self.assertTrue(self.app.save.called) + + @unittest.mock.patch('qubes.storage.pool_drivers') + @unittest.mock.patch('qubes.storage.driver_parameters') + def test_160_pool_add_invalid_driver(self, mock_parameters, mock_drivers): + self.app.pools = { + 'file': unittest.mock.Mock(), + 'lvm': unittest.mock.Mock() + } + + mock_drivers.return_value = ['driver1', 'driver2'] + mock_parameters.side_effect = \ + lambda driver: { + 'driver1': ['param1', 'param2'], + 'driver2': ['param3', 'param4'] + }[driver] + + self.app.add_pool = unittest.mock.Mock() + + with self.assertRaises(AssertionError): + self.call_mgmt_func(b'mgmt.pool.Add', b'dom0', + b'no-such-driver', b'name=test-pool\nparam1=some-value\n') + self.assertEqual(mock_drivers.mock_calls, [unittest.mock.call()]) + self.assertEqual(mock_parameters.mock_calls, []) + self.assertEqual(self.app.add_pool.mock_calls, []) + self.assertFalse(self.app.save.called) + + + @unittest.mock.patch('qubes.storage.pool_drivers') + @unittest.mock.patch('qubes.storage.driver_parameters') + def test_160_pool_add_invalid_param(self, mock_parameters, mock_drivers): + self.app.pools = { + 'file': unittest.mock.Mock(), + 'lvm': unittest.mock.Mock() + } + + mock_drivers.return_value = ['driver1', 'driver2'] + mock_parameters.side_effect = \ + lambda driver: { + 'driver1': ['param1', 'param2'], + 'driver2': ['param3', 'param4'] + }[driver] + + self.app.add_pool = unittest.mock.Mock() + + with self.assertRaises(AssertionError): + self.call_mgmt_func(b'mgmt.pool.Add', b'dom0', + b'driver1', b'name=test-pool\nparam3=some-value\n') + self.assertEqual(mock_drivers.mock_calls, [unittest.mock.call()]) + self.assertEqual(mock_parameters.mock_calls, + [unittest.mock.call('driver1')]) + self.assertEqual(self.app.add_pool.mock_calls, []) + self.assertFalse(self.app.save.called) + + @unittest.mock.patch('qubes.storage.pool_drivers') + @unittest.mock.patch('qubes.storage.driver_parameters') + def test_160_pool_add_missing_name(self, mock_parameters, mock_drivers): + self.app.pools = { + 'file': unittest.mock.Mock(), + 'lvm': unittest.mock.Mock() + } + + mock_drivers.return_value = ['driver1', 'driver2'] + mock_parameters.side_effect = \ + lambda driver: { + 'driver1': ['param1', 'param2'], + 'driver2': ['param3', 'param4'] + }[driver] + + self.app.add_pool = unittest.mock.Mock() + + with self.assertRaises(AssertionError): + self.call_mgmt_func(b'mgmt.pool.Add', b'dom0', + b'driver1', b'param1=value\nparam2=some-value\n') + self.assertEqual(mock_drivers.mock_calls, [unittest.mock.call()]) + self.assertEqual(mock_parameters.mock_calls, []) + self.assertEqual(self.app.add_pool.mock_calls, []) + self.assertFalse(self.app.save.called) + + @unittest.mock.patch('qubes.storage.pool_drivers') + @unittest.mock.patch('qubes.storage.driver_parameters') + def test_160_pool_add_existing_pool(self, mock_parameters, mock_drivers): + self.app.pools = { + 'file': unittest.mock.Mock(), + 'lvm': unittest.mock.Mock() + } + + mock_drivers.return_value = ['driver1', 'driver2'] + mock_parameters.side_effect = \ + lambda driver: { + 'driver1': ['param1', 'param2'], + 'driver2': ['param3', 'param4'] + }[driver] + + self.app.add_pool = unittest.mock.Mock() + + with self.assertRaises(AssertionError): + self.call_mgmt_func(b'mgmt.pool.Add', b'dom0', + b'driver1', b'name=file\nparam1=value\nparam2=some-value\n') + self.assertEqual(mock_drivers.mock_calls, [unittest.mock.call()]) + self.assertEqual(mock_parameters.mock_calls, []) + self.assertEqual(self.app.add_pool.mock_calls, []) + self.assertFalse(self.app.save.called) + + @unittest.mock.patch('qubes.storage.pool_drivers') + @unittest.mock.patch('qubes.storage.driver_parameters') + def test_160_pool_add_invalid_config_format(self, mock_parameters, + mock_drivers): + self.app.pools = { + 'file': unittest.mock.Mock(), + 'lvm': unittest.mock.Mock() + } + + mock_drivers.return_value = ['driver1', 'driver2'] + mock_parameters.side_effect = \ + lambda driver: { + 'driver1': ['param1', 'param2'], + 'driver2': ['param3', 'param4'] + }[driver] + + self.app.add_pool = unittest.mock.Mock() + + with self.assertRaises(AssertionError): + self.call_mgmt_func(b'mgmt.pool.Add', b'dom0', + b'driver1', b'name=test-pool\nparam 1=value\n_param2\n') + self.assertEqual(mock_drivers.mock_calls, [unittest.mock.call()]) + self.assertEqual(mock_parameters.mock_calls, []) + self.assertEqual(self.app.add_pool.mock_calls, []) + self.assertFalse(self.app.save.called) + + def test_170_pool_remove(self): + self.app.pools = { + 'file': unittest.mock.Mock(), + 'lvm': unittest.mock.Mock(), + 'test-pool': unittest.mock.Mock(), + } + self.app.remove_pool = unittest.mock.Mock() + value = self.call_mgmt_func(b'mgmt.pool.Remove', b'dom0', b'test-pool') + self.assertIsNone(value) + self.assertEqual(self.app.remove_pool.mock_calls, + [unittest.mock.call('test-pool')]) + self.assertTrue(self.app.save.called) + + def test_170_pool_remove_invalid_pool(self): + self.app.pools = { + 'file': unittest.mock.Mock(), + 'lvm': unittest.mock.Mock(), + 'test-pool': unittest.mock.Mock(), + } + self.app.remove_pool = unittest.mock.Mock() + with self.assertRaises(AssertionError): + self.call_mgmt_func(b'mgmt.pool.Remove', b'dom0', + b'no-such-pool') + self.assertEqual(self.app.remove_pool.mock_calls, []) + self.assertFalse(self.app.save.called) + + def test_180_label_list(self): + value = self.call_mgmt_func(b'mgmt.label.List', b'dom0') + self.assertEqual(value, + ''.join('{}\n'.format(l.name) for l in self.app.labels.values())) + self.assertFalse(self.app.save.called) + + def test_190_label_get(self): + self.app.get_label = unittest.mock.Mock() + self.app.get_label.configure_mock(**{'return_value.color': '0xff0000'}) + value = self.call_mgmt_func(b'mgmt.label.Get', b'dom0', b'red') + self.assertEqual(value, '0xff0000') + self.assertEqual(self.app.get_label.mock_calls, + [unittest.mock.call('red')]) + self.assertFalse(self.app.save.called) + + def test_200_label_create(self): + self.app.get_label = unittest.mock.Mock() + self.app.get_label.side_effect=KeyError + self.app.labels = unittest.mock.MagicMock() + labels_config = { + 'keys.return_value': range(1, 9), + } + self.app.labels.configure_mock(**labels_config) + value = self.call_mgmt_func(b'mgmt.label.Create', b'dom0', b'cyan', + b'0x00ffff') + self.assertIsNone(value) + self.assertEqual(self.app.get_label.mock_calls, + [unittest.mock.call('cyan')]) + self.assertEqual(self.app.labels.mock_calls, + [unittest.mock.call.keys(), + unittest.mock.call.__getattr__('__setitem__')(9, + qubes.Label(9, '0x00ffff', 'cyan'))]) + self.assertTrue(self.app.save.called) + + def test_200_label_create_invalid_color(self): + self.app.get_label = unittest.mock.Mock() + self.app.get_label.side_effect=KeyError + self.app.labels = unittest.mock.MagicMock() + labels_config = { + 'keys.return_value': range(1, 9), + } + self.app.labels.configure_mock(**labels_config) + with self.assertRaises(AssertionError): + self.call_mgmt_func(b'mgmt.label.Create', b'dom0', b'cyan', + b'abcd') + self.assertEqual(self.app.get_label.mock_calls, + [unittest.mock.call('cyan')]) + self.assertEqual(self.app.labels.mock_calls, []) + self.assertFalse(self.app.save.called) + + def test_200_label_create_invalid_name(self): + self.app.get_label = unittest.mock.Mock() + self.app.get_label.side_effect=KeyError + self.app.labels = unittest.mock.MagicMock() + labels_config = { + 'keys.return_value': range(1, 9), + } + self.app.labels.configure_mock(**labels_config) + with self.assertRaises(AssertionError): + self.call_mgmt_func(b'mgmt.label.Create', b'dom0', b'01', + b'0xff0000') + with self.assertRaises(AssertionError): + self.call_mgmt_func(b'mgmt.label.Create', b'dom0', b'../xxx', + b'0xff0000') + with self.assertRaises(AssertionError): + self.call_mgmt_func(b'mgmt.label.Create', b'dom0', + b'strange-name!@#$', + b'0xff0000') + + self.assertEqual(self.app.get_label.mock_calls, []) + self.assertEqual(self.app.labels.mock_calls, []) + self.assertFalse(self.app.save.called) + + def test_200_label_create_already_exists(self): + self.app.get_label = unittest.mock.Mock(wraps=self.app.get_label) + with self.assertRaises(qubes.exc.QubesValueError): + self.call_mgmt_func(b'mgmt.label.Create', b'dom0', b'red', + b'abcd') + self.assertEqual(self.app.get_label.mock_calls, + [unittest.mock.call('red')]) + self.assertFalse(self.app.save.called) + + def test_210_label_remove(self): + label = qubes.Label(9, '0x00ffff', 'cyan') + self.app.labels[9] = label + self.app.get_label = unittest.mock.Mock(wraps=self.app.get_label, + **{'return_value.index': 9}) + self.app.labels = unittest.mock.MagicMock(wraps=self.app.labels) + value = self.call_mgmt_func(b'mgmt.label.Remove', b'dom0', b'cyan') + self.assertIsNone(value) + self.assertEqual(self.app.get_label.mock_calls, + [unittest.mock.call('cyan')]) + self.assertEqual(self.app.labels.mock_calls, + [unittest.mock.call.__delitem__(9)]) + self.assertTrue(self.app.save.called) + + def test_210_label_remove_invalid_label(self): + with self.assertRaises(qubes.exc.QubesValueError): + self.call_mgmt_func(b'mgmt.label.Remove', b'dom0', + b'no-such-label') + self.assertFalse(self.app.save.called) + + def test_210_label_remove_default_label(self): + self.app.labels = unittest.mock.MagicMock(wraps=self.app.labels) + self.app.get_label = unittest.mock.Mock(wraps=self.app.get_label, + **{'return_value.index': 6}) + with self.assertRaises(AssertionError): + self.call_mgmt_func(b'mgmt.label.Remove', b'dom0', + b'blue') + self.assertEqual(self.app.labels.mock_calls, []) + self.assertFalse(self.app.save.called) + + def test_210_label_remove_in_use(self): + self.app.labels = unittest.mock.MagicMock(wraps=self.app.labels) + self.app.get_label = unittest.mock.Mock(wraps=self.app.get_label, + **{'return_value.index': 1}) + with self.assertRaises(AssertionError): + self.call_mgmt_func(b'mgmt.label.Remove', b'dom0', + b'red') + self.assertEqual(self.app.labels.mock_calls, []) + self.assertFalse(self.app.save.called) + + def test_990_vm_unexpected_payload(self): + methods_with_no_payload = [ + b'mgmt.vm.List', + b'mgmt.vm.Remove', + b'mgmt.vm.property.List', + b'mgmt.vm.property.Get', + b'mgmt.vm.property.Help', + b'mgmt.vm.property.HelpRst', + b'mgmt.vm.property.Reset', + b'mgmt.vm.feature.List', + b'mgmt.vm.feature.Get', + b'mgmt.vm.feature.CheckWithTemplate', + b'mgmt.vm.feature.Remove', + b'mgmt.vm.tag.List', + b'mgmt.vm.tag.Get', + b'mgmt.vm.tag.Remove', + b'mgmt.vm.tag.Set', + b'mgmt.vm.firewall.Get', + b'mgmt.vm.firewall.RemoveRule', + b'mgmt.vm.firewall.Flush', + b'mgmt.vm.device.pci.Attach', + b'mgmt.vm.device.pci.Detach', + b'mgmt.vm.device.pci.List', + b'mgmt.vm.device.pci.Available', + b'mgmt.vm.microphone.Attach', + b'mgmt.vm.microphone.Detach', + b'mgmt.vm.microphone.Status', + b'mgmt.vm.volume.ListSnapshots', + b'mgmt.vm.volume.List', + b'mgmt.vm.volume.Info', + b'mgmt.vm.Start', + b'mgmt.vm.Shutdown', + b'mgmt.vm.Pause', + b'mgmt.vm.Unpause', + b'mgmt.vm.Kill', + ] + # make sure also no methods on actual VM gets called + vm_mock = unittest.mock.MagicMock() + vm_mock.name = self.vm.name + vm_mock.qid = self.vm.qid + vm_mock.__lt__ = (lambda x, y: x.qid < y.qid) + self.app.domains._dict[self.vm.qid] = vm_mock + for method in methods_with_no_payload: + # should reject payload regardless of having argument or not + with self.subTest(method.decode('ascii')): + with self.assertRaises(AssertionError): + self.call_mgmt_func(method, b'test-vm1', b'', + b'unexpected-payload') + self.assertFalse(vm_mock.called) + self.assertFalse(self.app.save.called) + + with self.subTest(method.decode('ascii') + '+arg'): + with self.assertRaises(AssertionError): + self.call_mgmt_func(method, b'test-vm1', b'some-arg', + b'unexpected-payload') + self.assertFalse(vm_mock.called) + self.assertFalse(self.app.save.called) + + def test_991_vm_unexpected_argument(self): + methods_with_no_argument = [ + b'mgmt.vm.List', + b'mgmt.vm.Clone', + b'mgmt.vm.Remove', + b'mgmt.vm.property.List', + b'mgmt.vm.feature.List', + b'mgmt.vm.tag.List', + b'mgmt.vm.firewall.List', + b'mgmt.vm.firewall.Flush', + b'mgmt.vm.device.pci.List', + b'mgmt.vm.device.pci.Available', + b'mgmt.vm.microphone.Attach', + b'mgmt.vm.microphone.Detach', + b'mgmt.vm.microphone.Status', + b'mgmt.vm.volume.List', + b'mgmt.vm.Start', + b'mgmt.vm.Shutdown', + b'mgmt.vm.Pause', + b'mgmt.vm.Unpause', + b'mgmt.vm.Kill', + ] + # make sure also no methods on actual VM gets called + vm_mock = unittest.mock.MagicMock() + vm_mock.name = self.vm.name + vm_mock.qid = self.vm.qid + vm_mock.__lt__ = (lambda x, y: x.qid < y.qid) + self.app.domains._dict[self.vm.qid] = vm_mock + for method in methods_with_no_argument: + # should reject argument regardless of having payload or not + with self.subTest(method.decode('ascii')): + with self.assertRaises(AssertionError): + self.call_mgmt_func(method, b'test-vm1', b'some-arg', + b'') + self.assertFalse(vm_mock.called) + self.assertFalse(self.app.save.called) + + with self.subTest(method.decode('ascii') + '+payload'): + with self.assertRaises(AssertionError): + self.call_mgmt_func(method, b'test-vm1', b'unexpected-arg', + b'some-payload') + self.assertFalse(vm_mock.called) + self.assertFalse(self.app.save.called) + + def test_992_dom0_unexpected_payload(self): + methods_with_no_payload = [ + b'mgmt.vmclass.List', + b'mgmt.vm.List', + b'mgmt.label.List', + b'mgmt.label.Get', + b'mgmt.label.Remove', + b'mgmt.property.List', + b'mgmt.property.Get', + b'mgmt.property.Help', + b'mgmt.property.HelpRst', + b'mgmt.property.Reset', + b'mgmt.pool.List', + b'mgmt.pool.ListDrivers', + b'mgmt.pool.Info', + b'mgmt.pool.Remove', + b'mgmt.backup.Execute', + ] + # make sure also no methods on actual VM gets called + vm_mock = unittest.mock.MagicMock() + vm_mock.name = self.vm.name + vm_mock.qid = self.vm.qid + vm_mock.__lt__ = (lambda x, y: x.qid < y.qid) + self.app.domains._dict[self.vm.qid] = vm_mock + for method in methods_with_no_payload: + # should reject payload regardless of having argument or not + with self.subTest(method.decode('ascii')): + with self.assertRaises(AssertionError): + self.call_mgmt_func(method, b'dom0', b'', + b'unexpected-payload') + self.assertFalse(vm_mock.called) + self.assertFalse(self.app.save.called) + + with self.subTest(method.decode('ascii') + '+arg'): + with self.assertRaises(AssertionError): + self.call_mgmt_func(method, b'dom0', b'some-arg', + b'unexpected-payload') + self.assertFalse(vm_mock.called) + self.assertFalse(self.app.save.called) + + def test_993_dom0_unexpected_argument(self): + methods_with_no_argument = [ + b'mgmt.vmclass.List', + b'mgmt.vm.List', + b'mgmt.label.List', + b'mgmt.property.List', + b'mgmt.pool.List', + b'mgmt.pool.ListDrivers', + ] + # make sure also no methods on actual VM gets called + vm_mock = unittest.mock.MagicMock() + vm_mock.name = self.vm.name + vm_mock.qid = self.vm.qid + vm_mock.__lt__ = (lambda x, y: x.qid < y.qid) + self.app.domains._dict[self.vm.qid] = vm_mock + for method in methods_with_no_argument: + # should reject argument regardless of having payload or not + with self.subTest(method.decode('ascii')): + with self.assertRaises(AssertionError): + self.call_mgmt_func(method, b'dom0', b'some-arg', + b'') + self.assertFalse(vm_mock.called) + self.assertFalse(self.app.save.called) + + with self.subTest(method.decode('ascii') + '+payload'): + with self.assertRaises(AssertionError): + self.call_mgmt_func(method, b'dom0', b'unexpected-arg', + b'some-payload') + self.assertFalse(vm_mock.called) + self.assertFalse(self.app.save.called) + + def test_994_dom0_only_calls(self): + # TODO set some better arguments, to make sure the call was rejected + # because of invalid destination, not invalid arguments + methods_for_dom0_only = [ + b'mgmt.vmclass.List', + b'mgmt.vm.Create.AppVM', + b'mgmt.vm.CreateInPool.AppVM', + b'mgmt.vm.CreateTemplate', + b'mgmt.label.List', + b'mgmt.label.Create', + b'mgmt.label.Get', + b'mgmt.label.Remove', + b'mgmt.property.List', + b'mgmt.property.Get', + b'mgmt.property.Set', + b'mgmt.property.Help', + b'mgmt.property.HelpRst', + b'mgmt.property.Reset', + b'mgmt.pool.List', + b'mgmt.pool.ListDrivers', + b'mgmt.pool.Info', + b'mgmt.pool.Add', + b'mgmt.pool.Remove', + b'mgmt.pool.volume.List', + b'mgmt.pool.volume.Info', + b'mgmt.pool.volume.ListSnapshots', + b'mgmt.pool.volume.Snapshot', + b'mgmt.pool.volume.Revert', + b'mgmt.pool.volume.Resize', + b'mgmt.backup.Execute', + b'mgmt.backup.Info', + b'mgmt.backup.Restore', + ] + # make sure also no methods on actual VM gets called + vm_mock = unittest.mock.MagicMock() + vm_mock.name = self.vm.name + vm_mock.qid = self.vm.qid + vm_mock.__lt__ = (lambda x, y: x.qid < y.qid) + self.app.domains._dict[self.vm.qid] = vm_mock + for method in methods_for_dom0_only: + # should reject call regardless of having payload or not + with self.subTest(method.decode('ascii')): + with self.assertRaises(AssertionError): + self.call_mgmt_func(method, b'test-vm1', b'', + b'') + self.assertFalse(vm_mock.called) + self.assertFalse(self.app.save.called) + + with self.subTest(method.decode('ascii') + '+arg'): + with self.assertRaises(AssertionError): + self.call_mgmt_func(method, b'test-vm1', b'some-arg', + b'') + self.assertFalse(vm_mock.called) + self.assertFalse(self.app.save.called) + + with self.subTest(method.decode('ascii') + '+payload'): + with self.assertRaises(AssertionError): + self.call_mgmt_func(method, b'test-vm1', b'', + b'payload') + self.assertFalse(vm_mock.called) + self.assertFalse(self.app.save.called) + + with self.subTest(method.decode('ascii') + '+arg+payload'): + with self.assertRaises(AssertionError): + self.call_mgmt_func(method, b'test-vm1', b'some-arg', + b'some-payload') + self.assertFalse(vm_mock.called) + self.assertFalse(self.app.save.called) + + @unittest.skip('undecided') + def test_995_vm_only_calls(self): + # XXX is it really a good idea to prevent those calls this early? + # TODO set some better arguments, to make sure the call was rejected + # because of invalid destination, not invalid arguments + methods_for_vm_only = [ + b'mgmt.vm.Clone', + b'mgmt.vm.Remove', + b'mgmt.vm.property.List', + b'mgmt.vm.property.Get', + b'mgmt.vm.property.Set', + b'mgmt.vm.property.Help', + b'mgmt.vm.property.HelpRst', + b'mgmt.vm.property.Reset', + b'mgmt.vm.feature.List', + b'mgmt.vm.feature.Get', + b'mgmt.vm.feature.Set', + b'mgmt.vm.feature.CheckWithTemplate', + b'mgmt.vm.feature.Remove', + b'mgmt.vm.tag.List', + b'mgmt.vm.tag.Get', + b'mgmt.vm.tag.Remove', + b'mgmt.vm.tag.Set', + b'mgmt.vm.firewall.Get', + b'mgmt.vm.firewall.RemoveRule', + b'mgmt.vm.firewall.InsertRule', + b'mgmt.vm.firewall.Flush', + b'mgmt.vm.device.pci.Attach', + b'mgmt.vm.device.pci.Detach', + b'mgmt.vm.device.pci.List', + b'mgmt.vm.device.pci.Available', + b'mgmt.vm.microphone.Attach', + b'mgmt.vm.microphone.Detach', + b'mgmt.vm.microphone.Status', + b'mgmt.vm.volume.ListSnapshots', + b'mgmt.vm.volume.List', + b'mgmt.vm.volume.Info', + b'mgmt.vm.volume.Revert', + b'mgmt.vm.volume.Resize', + b'mgmt.vm.Start', + b'mgmt.vm.Shutdown', + b'mgmt.vm.Pause', + b'mgmt.vm.Unpause', + b'mgmt.vm.Kill', + ] + # make sure also no methods on actual VM gets called + vm_mock = unittest.mock.MagicMock() + vm_mock.name = self.vm.name + vm_mock.qid = self.vm.qid + vm_mock.__lt__ = (lambda x, y: x.qid < y.qid) + self.app.domains._dict[self.vm.qid] = vm_mock + for method in methods_for_vm_only: + # should reject payload regardless of having argument or not + # should reject call regardless of having payload or not + with self.subTest(method.decode('ascii')): + with self.assertRaises(AssertionError): + self.call_mgmt_func(method, b'dom0', b'', + b'') + self.assertFalse(vm_mock.called) + self.assertFalse(self.app.save.called) + + with self.subTest(method.decode('ascii') + '+arg'): + with self.assertRaises(AssertionError): + self.call_mgmt_func(method, b'dom0', b'some-arg', + b'') + self.assertFalse(vm_mock.called) + self.assertFalse(self.app.save.called) + + with self.subTest(method.decode('ascii') + '+payload'): + with self.assertRaises(AssertionError): + self.call_mgmt_func(method, b'dom0', b'', + b'payload') + self.assertFalse(vm_mock.called) + self.assertFalse(self.app.save.called) + + with self.subTest(method.decode('ascii') + '+arg+payload'): + with self.assertRaises(AssertionError): + self.call_mgmt_func(method, b'dom0', b'some-arg', + b'some-payload') + self.assertFalse(vm_mock.called) + self.assertFalse(self.app.save.called) diff --git a/qubes/tests/vm/qubesvm.py b/qubes/tests/vm/qubesvm.py index 2fe79ff7..a8d9f626 100644 --- a/qubes/tests/vm/qubesvm.py +++ b/qubes/tests/vm/qubesvm.py @@ -340,20 +340,6 @@ class TC_90_QubesVM(QubesVMTestsMixin,qubes.tests.QubesTestCase): vm = self.get_vm() self._test_generic_bool_property(vm, 'include_in_backups', True) - def test_240_firewall_conf(self): - vm = self.get_vm() - self.assertPropertyDefaultValue(vm, 'firewall_conf', 'firewall.xml') - self.assertPropertyValue(vm, 'firewall_conf', 'other.xml', - 'other.xml', 'other.xml') - del vm.firewall_conf - self.assertPropertyDefaultValue(vm, 'firewall_conf', - 'firewall.xml') - - @unittest.expectedFailure - def test_241_firewall_conf_invalid(self): - vm = self.get_vm() - self.assertPropertyInvalidValue(vm, 'firewall_conf', None) - @qubes.tests.skipUnlessDom0 def test_250_kernel(self): kernels = os.listdir(os.path.join( diff --git a/qubes/tools/qubesd.py b/qubes/tools/qubesd.py index 4c94251b..a624fcc9 100644 --- a/qubes/tools/qubesd.py +++ b/qubes/tools/qubesd.py @@ -64,6 +64,8 @@ class QubesDaemonProtocol(asyncio.Protocol): asyncio.ensure_future(self.respond( src, method, dest, arg, untrusted_payload=untrusted_payload)) + return True + @asyncio.coroutine def respond(self, src, method, dest, arg, *, untrusted_payload): try: @@ -116,7 +118,8 @@ class QubesDaemonProtocol(asyncio.Protocol): def send_response(self, content): self.send_header(0x30) - self.transport.write(content.encode('utf-8')) + if content is not None: + self.transport.write(content.encode('utf-8')) def send_event(self, subject, event, **kwargs): self.send_header(0x31) diff --git a/qubes/vm/__init__.py b/qubes/vm/__init__.py index 50879a06..524d5000 100644 --- a/qubes/vm/__init__.py +++ b/qubes/vm/__init__.py @@ -27,6 +27,7 @@ import datetime import os +import re import subprocess import sys import xml.parsers.expat @@ -40,6 +41,31 @@ import qubes.log import qubes.tools.qvm_ls +def validate_name(holder, prop, value): + ''' Check if value is syntactically correct VM name ''' + if not isinstance(value, str): + raise TypeError('VM name must be string, {!r} found'.format( + type(value).__name__)) + if len(value) > 31: + if holder is not None and prop is not None: + raise qubes.exc.QubesPropertyValueError(holder, prop, value, + '{} value must be shorter than 32 characters'.format( + prop.__name__)) + else: + raise qubes.exc.QubesValueError( + 'VM name must be shorter than 32 characters') + + # this regexp does not contain '+'; if it had it, we should specifically + # disallow 'lost+found' #1440 + if re.match(r"^[a-zA-Z][a-zA-Z0-9_-]*$", value) is None: + if holder is not None and prop is not None: + raise qubes.exc.QubesPropertyValueError(holder, prop, value, + '{} value contains illegal characters'.format(prop.__name__)) + else: + raise qubes.exc.QubesValueError( + 'VM name contains illegal characters') + + class Features(dict): '''Manager of the features. @@ -332,3 +358,11 @@ class VMProperty(qubes.property): self.vmclass.__name__)) super(VMProperty, self).__set__(instance, vm) + + def sanitize(self, *, untrusted_newvalue): + try: + untrusted_vmname = untrusted_newvalue.decode('ascii') + except UnicodeDecodeError: + raise qubes.exc.QubesValueError + validate_name(None, self, untrusted_vmname) + return untrusted_vmname diff --git a/qubes/vm/adminvm.py b/qubes/vm/adminvm.py index 13671ff4..a571a3de 100644 --- a/qubes/vm/adminvm.py +++ b/qubes/vm/adminvm.py @@ -143,6 +143,9 @@ class AdminVM(qubes.vm.qubesvm.QubesVM): ''' raise qubes.exc.QubesVMError(self, 'Cannot suspend Dom0 fake domain!') + @property + def icon_path(self): + return None # def __init__(self, **kwargs): # super(QubesAdminVm, self).__init__(qid=0, name="dom0", netid=0, diff --git a/qubes/vm/mix/net.py b/qubes/vm/mix/net.py index dc7c9e95..8597e30f 100644 --- a/qubes/vm/mix/net.py +++ b/qubes/vm/mix/net.py @@ -91,8 +91,10 @@ class NetVMMixin(qubes.events.Emitter): doc='''If this domain can act as network provider (formerly known as NetVM or ProxyVM)''') - firewall_conf = qubes.property('firewall_conf', type=str, - default='firewall.xml') + + @property + def firewall_conf(self): + return 'firewall.xml' # # used in networked appvms or proxyvms (netvm is not None) @@ -330,7 +332,7 @@ class NetVMMixin(qubes.events.Emitter): @qubes.events.handler('property-del:netvm') - def on_property_del_netvm(self, event, prop, oldvalue=None): + def on_property_del_netvm(self, event, name, oldvalue=None): ''' Sets the the NetVM to default NetVM ''' # pylint: disable=unused-argument # we are changing to default netvm diff --git a/qubes/vm/qubesvm.py b/qubes/vm/qubesvm.py index 2f0a4489..c3eac1b8 100644 --- a/qubes/vm/qubesvm.py +++ b/qubes/vm/qubesvm.py @@ -29,8 +29,8 @@ import base64 import datetime import os import os.path -import re import shutil +import string import subprocess import sys import time @@ -78,18 +78,8 @@ def _setter_qid(self, prop, value): def _setter_name(self, prop, value): ''' Helper for setting the domain name ''' - if not isinstance(value, str): - raise TypeError('{} value must be string, {!r} found'.format( - prop.__name__, type(value).__name__)) - if len(value) > 31: - raise ValueError('{} value must be shorter than 32 characters'.format( - prop.__name__)) + qubes.vm.validate_name(self, prop, value) - # this regexp does not contain '+'; if it had it, we should specifically - # disallow 'lost+found' #1440 - if re.match(r"^[a-zA-Z][a-zA-Z0-9_-]*$", value) is None: - raise ValueError('{} value contains illegal characters'.format( - prop.__name__)) if self.is_running(): raise qubes.exc.QubesVMNotHaltedError( self, 'Cannot change name of running VM') @@ -102,7 +92,7 @@ def _setter_name(self, prop, value): pass if value in self.app.domains: - raise qubes.exc.QubesValueError( + raise qubes.exc.QubesPropertyValueError(self, prop, value, 'VM named {} alread exists'.format(value)) return value @@ -114,6 +104,9 @@ def _setter_kernel(self, prop, value): if value is None: return value value = str(value) + if '/' in value: + raise qubes.exc.QubesPropertyValueError(self, prop, value, + 'Kernel name cannot contain \'/\'') dirname = os.path.join( qubes.config.system_path['qubes_base_dir'], qubes.config.system_path['qubes_kernels_base_dir'], @@ -148,6 +141,16 @@ def _setter_positive_int(self, prop, value): return value +def _setter_default_user(self, prop, value): + ''' Helper for setting default user ''' + value = str(value) + # specifically forbid: ':', ' ', ''', '"' + allowed_chars = string.ascii_letters + string.digits + '_-+,.' + if not all(c in allowed_chars for c in value): + raise qubes.exc.QubesPropertyValueError(self, prop, value, + 'Username can contain only those characters: ' + allowed_chars) + return value + class QubesVM(qubes.vm.mix.net.NetVMMixin, qubes.vm.BaseVM): '''Base functionality of Qubes VM shared between all VMs. @@ -418,10 +421,6 @@ class QubesVM(qubes.vm.mix.net.NetVMMixin, qubes.vm.BaseVM): ls_width=2, doc='FIXME') - pool_name = qubes.property('pool_name', - default='default', - doc='storage pool for this qube devices') - # CORE2: swallowed uses_default_kernel kernel = qubes.property('kernel', type=str, setter=_setter_kernel, @@ -448,6 +447,7 @@ class QubesVM(qubes.vm.mix.net.NetVMMixin, qubes.vm.BaseVM): default_user = qubes.property('default_user', type=str, default=(lambda self: self.template.default_user if hasattr(self, 'template') else 'user'), + setter=_setter_default_user, ls_width=12, doc='FIXME') @@ -489,6 +489,13 @@ class QubesVM(qubes.vm.mix.net.NetVMMixin, qubes.vm.BaseVM): default=(lambda self: self.app.default_dispvm), doc='Default VM to be used as Disposable VM for service calls.') + + updateable = qubes.property('updateable', + default=(lambda self: not hasattr(self, 'template')), + type=bool, + setter=qubes.property.forbidden, + doc='True if this machine may be updated on its own.') + # # static, class-wide properties # @@ -576,12 +583,6 @@ class QubesVM(qubes.vm.mix.net.NetVMMixin, qubes.vm.BaseVM): self._qdb_connection = qubesdb.QubesDB(self.name) return self._qdb_connection - # XXX shouldn't this go elsewhere? - @property - def updateable(self): - '''True if this machine may be updated on its own.''' - return not hasattr(self, 'template') - @property def dir_path(self): '''Root directory for files related to this domain''' @@ -707,7 +708,7 @@ class QubesVM(qubes.vm.mix.net.NetVMMixin, qubes.vm.BaseVM): self.app.pools[vm_pool.name] = vm_pool @qubes.events.handler('property-set:label') - def on_property_set_label(self, event, name, new_label, old_label=None): + def on_property_set_label(self, event, name, newvalue, oldvalue=None): # pylint: disable=unused-argument if self.icon_path: try: @@ -715,10 +716,10 @@ class QubesVM(qubes.vm.mix.net.NetVMMixin, qubes.vm.BaseVM): except OSError: pass if hasattr(os, "symlink"): - os.symlink(new_label.icon_path, self.icon_path) + os.symlink(newvalue.icon_path, self.icon_path) subprocess.call(['sudo', 'xdg-icon-resource', 'forceupdate']) else: - shutil.copy(new_label.icon_path, self.icon_path) + shutil.copy(newvalue.icon_path, self.icon_path) @qubes.events.handler('property-pre-set:name') def on_property_pre_set_name(self, event, name, newvalue, oldvalue=None): @@ -741,11 +742,11 @@ class QubesVM(qubes.vm.mix.net.NetVMMixin, qubes.vm.BaseVM): 'qubes-vm@{}.service'.format(oldvalue)]) @qubes.events.handler('property-set:name') - def on_property_set_name(self, event, name, new_name, old_name=None): + def on_property_set_name(self, event, name, newvalue, oldvalue=None): # pylint: disable=unused-argument self.init_log() - self.storage.rename(old_name, new_name) + self.storage.rename(oldvalue, newvalue) if self._libvirt_domain is not None: self.libvirt_domain.undefine() @@ -760,11 +761,11 @@ class QubesVM(qubes.vm.mix.net.NetVMMixin, qubes.vm.BaseVM): self.autostart = self.autostart @qubes.events.handler('property-pre-set:autostart') - def on_property_pre_set_autostart(self, event, prop, value, + def on_property_pre_set_autostart(self, event, prop, newvalue, oldvalue=None): # pylint: disable=unused-argument # workaround https://bugzilla.redhat.com/show_bug.cgi?id=1181922 - if value: + if newvalue: retcode = subprocess.call( ["sudo", "ln", "-sf", "/usr/lib/systemd/system/qubes-vm@.service", diff --git a/rpm_spec/core-dom0.spec b/rpm_spec/core-dom0.spec index 13309529..5389bdce 100644 --- a/rpm_spec/core-dom0.spec +++ b/rpm_spec/core-dom0.spec @@ -317,6 +317,7 @@ fi %{python3_sitelib}/qubes/tests/events.py %{python3_sitelib}/qubes/tests/firewall.py %{python3_sitelib}/qubes/tests/init.py +%{python3_sitelib}/qubes/tests/mgmt.py %{python3_sitelib}/qubes/tests/storage.py %{python3_sitelib}/qubes/tests/storage_file.py %{python3_sitelib}/qubes/tests/storage_lvm.py diff --git a/run-tests b/run-tests index 85ed63ee..1d0706f6 100755 --- a/run-tests +++ b/run-tests @@ -1,10 +1,10 @@ #!/bin/sh -: "${PYTHON:=python}" +: "${PYTHON:=python3}" : "${TESTPYTHONPATH:=test-packages}" PYTHONPATH="${TESTPYTHONPATH}:${PYTHONPATH}" export PYTHONPATH "${PYTHON}" setup.py egg_info --egg-base "${TESTPYTHONPATH}" -"${PYTHON}" -m qubes.tests.run "$@" +"${PYTHON}" -m coverage run --rcfile=ci/coveragerc -m qubes.tests.run "$@" diff --git a/test-packages/libvirt.py b/test-packages/libvirt.py index 4c850656..c227e986 100644 --- a/test-packages/libvirt.py +++ b/test-packages/libvirt.py @@ -13,3 +13,11 @@ class libvirtError(Exception): def openReadOnly(*args, **kwargs): raise libvirtError('mock module, always raises') + +VIR_DOMAIN_BLOCKED = 2 +VIR_DOMAIN_RUNNING = 1 +VIR_DOMAIN_PAUSED = 3 +VIR_DOMAIN_SHUTDOWN = 4 +VIR_DOMAIN_SHUTOFF = 5 +VIR_DOMAIN_CRASHED = 6 +VIR_DOMAIN_PMSUSPENDED = 7