Merge remote-tracking branch 'marmarek/core3-mgmt-methods1' into core3-devel

Conflicts:
	qubes/mgmt.py
This commit is contained in:
Wojtek Porczyk 2017-04-01 18:33:54 +02:00
commit 1b9479837a
18 changed files with 1562 additions and 78 deletions

3
ci/coveragerc Normal file
View File

@ -0,0 +1,3 @@
[run]
source = qubes
omit = qubes/tests/*

View File

@ -1,5 +1,6 @@
# WARNING: those requirements are used only for travis-ci.org
# they SHOULD NOT be used under normal conditions; use system package manager
coverage
docutils
jinja2
lxml

View File

@ -31,6 +31,7 @@ import builtins
import collections
import os
import os.path
import string
import lxml.etree
import qubes.config
@ -104,6 +105,10 @@ class Label(object):
self.color,
self.name)
def __eq__(self, other):
if isinstance(other, Label):
return self.name == other.name
return NotImplemented
@builtins.property
def icon_path(self):
@ -193,7 +198,7 @@ class property(object): # pylint: disable=redefined-builtin,invalid-name
self._setter = setter
self._saver = saver if saver is not None else (
lambda self, prop, value: str(value))
self._type = type
self.type = type
self._default = default
self._write_once = write_once
self.order = order
@ -245,8 +250,8 @@ class property(object): # pylint: disable=redefined-builtin,invalid-name
if self._setter is not None:
value = self._setter(instance, self, value)
if self._type not in (None, type(value)):
value = self._type(value)
if self.type not in (None, type(value)):
value = self.type(value)
if has_oldvalue:
instance.fire_event_pre('property-pre-set:' + self.__name__,
@ -317,6 +322,41 @@ class property(object): # pylint: disable=redefined-builtin,invalid-name
'property {!r} is write-once and already set'.format(
self.__name__))
def sanitize(self, *, untrusted_newvalue):
'''Coarse sanitization of value to be set, before sending it to a
setter. Can raise QubesValueError if the value is invalid.
:param untrusted_newvalue: value to be validated
:return sanitized value
:raises qubes.exc.QubesValueError
'''
# do not treat type='str' as sufficient validation
if self.type is not None and self.type is not str:
# assume specific type will preform enough validation
if self.type is bool:
try:
untrusted_newvalue = untrusted_newvalue.decode('ascii')
except UnicodeDecodeError:
raise qubes.exc.QubesValueError
return self.bool(None, None, untrusted_newvalue)
else:
try:
return self.type(untrusted_newvalue)
except ValueError:
raise qubes.exc.QubesValueError
else:
# 'str' or not specified type
try:
untrusted_newvalue = untrusted_newvalue.decode('ascii',
errors='strict')
except UnicodeDecodeError:
raise qubes.exc.QubesValueError
allowed_set = string.printable
if not all(x in allowed_set for x in untrusted_newvalue):
raise qubes.exc.QubesValueError(
'Invalid characters in property value')
return untrusted_newvalue
#
# exceptions
@ -368,7 +408,7 @@ class property(object): # pylint: disable=redefined-builtin,invalid-name
return False
if lcvalue in ('1', 'yes', 'true', 'on'):
return True
raise ValueError(
raise qubes.exc.QubesValueError(
'Invalid literal for boolean property: {!r}'.format(value))
return bool(value)

View File

@ -859,6 +859,7 @@ class Qubes(qubes.PropertyHolder):
7: qubes.Label(7, '0x75507b', 'purple'),
8: qubes.Label(8, '0x000000', 'black'),
}
assert max(self.labels.keys()) == qubes.config.max_default_label
# check if the default LVM Thin pool qubes_dom0/pool00 exists
if os.path.exists('/dev/mapper/qubes_dom0-pool00-tpool'):
@ -872,7 +873,8 @@ class Qubes(qubes.PropertyHolder):
self.pools[name] = self._get_pool(**config)
self.domains.add(
qubes.vm.adminvm.AdminVM(self, None, qid=0, name='dom0'))
qubes.vm.adminvm.AdminVM(self, None, qid=0, name='dom0',
label='black'))
@classmethod
def create_empty_store(cls, *args, **kwargs):
@ -1085,7 +1087,7 @@ class Qubes(qubes.PropertyHolder):
oldvalue=None):
# pylint: disable=unused-argument
for vm in self.domains:
if vm.provides_network and vm.property_is_default('netvm'):
if vm.property_is_default('netvm'):
# fire property-del:netvm as it is responsible for resetting
# netvm to it's default value
vm.fire_event('property-del:netvm',

View File

@ -109,3 +109,6 @@ defaults = {
max_qid = 254
max_netid = 254
max_dispid = 10000
#: built-in standard labels, if creating new one, allocate them above this
# number, at least until label index is removed from API
max_default_label = 8

View File

@ -23,8 +23,12 @@ Qubes OS Management API
'''
import asyncio
import string
import functools
import qubes.vm.qubesvm
import qubes.storage
class ProtocolError(AssertionError):
@ -46,6 +50,16 @@ def not_in_api(func):
func.not_in_api = True
return func
def no_payload(func):
@functools.wraps(func)
def wrapper(self, untrusted_payload):
if untrusted_payload != b'':
raise ProtocolError('unexpected payload')
return func(self)
return wrapper
class QubesMgmt(object):
'''Implementation of Qubes Management API calls
@ -121,14 +135,15 @@ class QubesMgmt(object):
#
@asyncio.coroutine
def vm_list(self, untrusted_payload):
@no_payload
def vm_list(self):
'''List all the domains'''
assert self.dest.name == 'dom0'
assert not self.arg
assert not untrusted_payload
del untrusted_payload
if self.dest.name == 'dom0':
domains = self.fire_event_for_filter(self.app.domains)
else:
domains = self.fire_event_for_filter([self.dest])
return ''.join('{} class={} state={}\n'.format(
vm.name,
@ -137,40 +152,63 @@ class QubesMgmt(object):
for vm in sorted(domains))
@asyncio.coroutine
def vm_property_list(self, untrusted_payload):
@no_payload
def vm_property_list(self):
'''List all properties on a qube'''
assert not self.arg
assert not untrusted_payload
del untrusted_payload
properties = self.fire_event_for_filter(self.dest.property_list())
return ''.join('{}\n'.format(prop.__name__) for prop in properties)
@asyncio.coroutine
def vm_property_get(self, untrusted_payload):
@no_payload
def vm_property_get(self):
'''Get a value of one property'''
assert self.arg in self.dest.property_list()
assert not untrusted_payload
del untrusted_payload
self.fire_event_for_permission()
property_def = self.dest.property_get_def(self.arg)
# explicit list to be sure that it matches protocol spec
if isinstance(property_def, qubes.vm.VMProperty):
property_type = 'vm'
elif property_def.type is int:
property_type = 'int'
elif property_def.type is bool:
property_type = 'bool'
elif self.arg == 'label':
property_type = 'label'
else:
property_type = 'str'
try:
value = getattr(self.dest, self.arg)
except AttributeError:
return 'default=True '
return 'default=True type={} '.format(property_type)
else:
return 'default={} {}'.format(
return 'default={} type={} {}'.format(
str(self.dest.property_is_default(self.arg)),
str(value))
property_type,
str(value) if value is not None else '')
@asyncio.coroutine
def vm_property_help(self, untrusted_payload):
def vm_property_set(self, untrusted_payload):
assert self.arg in self.dest.property_list()
property_def = self.dest.property_get_def(self.arg)
newvalue = property_def.sanitize(untrusted_newvalue=untrusted_payload)
self.fire_event_for_permission(newvalue=newvalue)
setattr(self.dest, self.arg, newvalue)
self.app.save()
@asyncio.coroutine
@no_payload
def vm_property_help(self):
'''Get help for one property'''
assert self.arg in self.dest.property_list()
assert not untrusted_payload
del untrusted_payload
self.fire_event_for_permission()
@ -182,12 +220,239 @@ class QubesMgmt(object):
return qubes.utils.format_doc(doc)
@asyncio.coroutine
def vm_property_reset(self, untrusted_payload):
@no_payload
def vm_property_reset(self):
'''Reset a property to a default value'''
assert self.arg in self.dest.property_list()
assert not untrusted_payload
del untrusted_payload
self.fire_event_for_permission()
delattr(self.dest, self.arg)
self.app.save()
@asyncio.coroutine
@no_payload
def vm_volume_list(self):
assert not self.arg
volume_names = self.fire_event_for_filter(self.dest.volumes.keys())
return ''.join('{}\n'.format(name) for name in volume_names)
@asyncio.coroutine
@no_payload
def vm_volume_info(self):
assert self.arg in self.dest.volumes.keys()
self.fire_event_for_permission()
volume = self.dest.volumes[self.arg]
# properties defined in API
volume_properties = [
'pool', 'vid', 'size', 'usage', 'rw', 'internal', 'source',
'save_on_stop', 'snap_on_start']
return ''.join('{}={}\n'.format(key, getattr(volume, key)) for key in
volume_properties)
@asyncio.coroutine
@no_payload
def vm_volume_listsnapshots(self):
assert self.arg in self.dest.volumes.keys()
volume = self.dest.volumes[self.arg]
revisions = [revision for revision in volume.revisions]
revisions = self.fire_event_for_filter(revisions)
return ''.join('{}\n'.format(revision) for revision in revisions)
@asyncio.coroutine
def vm_volume_revert(self, untrusted_payload):
assert self.arg in self.dest.volumes.keys()
untrusted_revision = untrusted_payload.decode('ascii').strip()
del untrusted_payload
volume = self.dest.volumes[self.arg]
snapshots = volume.revisions
assert untrusted_revision in snapshots
revision = untrusted_revision
self.fire_event_for_permission(revision=revision)
self.dest.storage.get_pool(volume).revert(revision)
self.app.save()
@asyncio.coroutine
def vm_volume_resize(self, untrusted_payload):
assert self.arg in self.dest.volumes.keys()
untrusted_size = untrusted_payload.decode('ascii').strip()
del untrusted_payload
assert untrusted_size.isdigit() # only digits, forbid '-' too
assert len(untrusted_size) <= 20 # limit to about 2^64
size = int(untrusted_size)
self.fire_event_for_permission(size=size)
self.dest.storage.resize(self.arg, size)
self.app.save()
@asyncio.coroutine
@no_payload
def pool_list(self):
assert not self.arg
assert self.dest.name == 'dom0'
pools = self.fire_event_for_filter(self.app.pools)
return ''.join('{}\n'.format(pool) for pool in pools)
@asyncio.coroutine
@no_payload
def pool_listdrivers(self):
assert self.dest.name == 'dom0'
assert not self.arg
drivers = self.fire_event_for_filter(qubes.storage.pool_drivers())
return ''.join('{} {}\n'.format(
driver,
' '.join(qubes.storage.driver_parameters(driver)))
for driver in drivers)
@asyncio.coroutine
@no_payload
def pool_info(self):
assert self.dest.name == 'dom0'
assert self.arg in self.app.pools.keys()
pool = self.app.pools[self.arg]
self.fire_event_for_permission(pool=pool)
return ''.join('{}={}\n'.format(prop, val)
for prop, val in sorted(pool.config.items()))
@asyncio.coroutine
def pool_add(self, untrusted_payload):
assert self.dest.name == 'dom0'
drivers = qubes.storage.pool_drivers()
assert self.arg in drivers
untrusted_pool_config = untrusted_payload.decode('ascii').splitlines()
del untrusted_payload
assert all(('=' in line) for line in untrusted_pool_config)
# pairs of (option, value)
untrusted_pool_config = [line.split('=', 1)
for line in untrusted_pool_config]
# reject duplicated options
assert len(set(x[0] for x in untrusted_pool_config)) == \
len([x[0] for x in untrusted_pool_config])
# and convert to dict
untrusted_pool_config = dict(untrusted_pool_config)
assert 'name' in untrusted_pool_config
untrusted_pool_name = untrusted_pool_config.pop('name')
allowed_chars = string.ascii_letters + string.digits + '-_.'
assert all(c in allowed_chars for c in untrusted_pool_name)
pool_name = untrusted_pool_name
assert pool_name not in self.app.pools
driver_parameters = qubes.storage.driver_parameters(self.arg)
assert all(key in driver_parameters for key in untrusted_pool_config)
pool_config = untrusted_pool_config
self.fire_event_for_permission(name=pool_name,
pool_config=pool_config)
self.app.add_pool(name=pool_name, driver=self.arg, **pool_config)
self.app.save()
@asyncio.coroutine
@no_payload
def pool_remove(self):
assert self.dest.name == 'dom0'
assert self.arg in self.app.pools.keys()
self.fire_event_for_permission()
self.app.remove_pool(self.arg)
self.app.save()
@asyncio.coroutine
@no_payload
def label_list(self):
assert self.dest.name == 'dom0'
assert not self.arg
labels = self.fire_event_for_filter(self.app.labels.values())
return ''.join('{}\n'.format(label.name) for label in labels)
@asyncio.coroutine
@no_payload
def label_get(self):
assert self.dest.name == 'dom0'
try:
label = self.app.get_label(self.arg)
except KeyError:
raise qubes.exc.QubesValueError
self.fire_event_for_permission(label=label)
return label.color
@asyncio.coroutine
def label_create(self, untrusted_payload):
assert self.dest.name == 'dom0'
# don't confuse label name with label index
assert not self.arg.isdigit()
allowed_chars = string.ascii_letters + string.digits + '-_.'
assert all(c in allowed_chars for c in self.arg)
try:
self.app.get_label(self.arg)
except KeyError:
# ok, no such label yet
pass
else:
raise qubes.exc.QubesValueError('label already exists')
untrusted_payload = untrusted_payload.decode('ascii').strip()
assert len(untrusted_payload) == 8
assert untrusted_payload.startswith('0x')
# besides prefix, only hex digits are allowed
assert all(x in string.hexdigits for x in untrusted_payload[2:])
# SEE: #2732
color = untrusted_payload
self.fire_event_for_permission(color=color)
# allocate new index, but make sure it's outside of default labels set
new_index = max(
qubes.config.max_default_label, *self.app.labels.keys()) + 1
label = qubes.Label(new_index, color, self.arg)
self.app.labels[new_index] = label
self.app.save()
@asyncio.coroutine
@no_payload
def label_remove(self):
assert self.dest.name == 'dom0'
try:
label = self.app.get_label(self.arg)
except KeyError:
raise qubes.exc.QubesValueError
# don't allow removing default labels
assert label.index > qubes.config.max_default_label
# FIXME: this should be in app.add_label()
for vm in self.app.domains:
if vm.label == label:
raise qubes.exc.QubesException('label still in use')
self.fire_event_for_permission(label=label)
del self.app.labels[label.index]
self.app.save()

View File

@ -635,6 +635,15 @@ def pool_drivers():
for ep in pkg_resources.iter_entry_points(STORAGE_ENTRY_POINT)]
def driver_parameters(name):
''' Get __init__ parameters from a driver with out `self` & `name`. '''
init_function = qubes.utils.get_entry_point_one(
qubes.storage.STORAGE_ENTRY_POINT, name).__init__
params = init_function.func_code.co_varnames
ignored_params = ['self', 'name']
return [p for p in params if p not in ignored_params]
def isodate(seconds=time.time()):
''' Helper method which returns an iso date '''
return datetime.utcfromtimestamp(seconds).isoformat("T")

View File

@ -45,6 +45,7 @@ import unittest
from distutils import spawn
import lxml.etree
import pkg_resources
import qubes.backup
import qubes.config
@ -143,12 +144,22 @@ class TestEmitter(qubes.events.Emitter):
def fire_event(self, event, **kwargs):
effects = super(TestEmitter, self).fire_event(event, **kwargs)
self.fired_events[(event, tuple(kwargs.items()))] += 1
ev_kwargs = frozenset(
(key,
frozenset(value.items()) if isinstance(value, dict) else value)
for key, value in kwargs.items()
)
self.fired_events[(event, ev_kwargs)] += 1
return effects
def fire_event_pre(self, event, **kwargs):
effects = super(TestEmitter, self).fire_event_pre(event, **kwargs)
self.fired_events[(event, tuple(kwargs.items()))] += 1
ev_kwargs = frozenset(
(key,
frozenset(value.items()) if isinstance(value, dict) else value)
for key, value in kwargs.items()
)
self.fired_events[(event, ev_kwargs)] += 1
return effects
def expectedFailureIfTemplate(templates):
@ -894,6 +905,7 @@ def load_tests(loader, tests, pattern): # pylint: disable=unused-argument
'qubes.tests.vm.adminvm',
'qubes.tests.app',
'qubes.tests.tarwriter',
'qubes.tests.mgmt',
'qubes.tests.tools.qvm_device',
'qubes.tests.tools.qvm_firewall',
'qubes.tests.tools.qvm_ls',

1111
qubes/tests/mgmt.py Normal file

File diff suppressed because it is too large Load Diff

View File

@ -340,20 +340,6 @@ class TC_90_QubesVM(QubesVMTestsMixin,qubes.tests.QubesTestCase):
vm = self.get_vm()
self._test_generic_bool_property(vm, 'include_in_backups', True)
def test_240_firewall_conf(self):
vm = self.get_vm()
self.assertPropertyDefaultValue(vm, 'firewall_conf', 'firewall.xml')
self.assertPropertyValue(vm, 'firewall_conf', 'other.xml',
'other.xml', 'other.xml')
del vm.firewall_conf
self.assertPropertyDefaultValue(vm, 'firewall_conf',
'firewall.xml')
@unittest.expectedFailure
def test_241_firewall_conf_invalid(self):
vm = self.get_vm()
self.assertPropertyInvalidValue(vm, 'firewall_conf', None)
@qubes.tests.skipUnlessDom0
def test_250_kernel(self):
kernels = os.listdir(os.path.join(

View File

@ -64,6 +64,8 @@ class QubesDaemonProtocol(asyncio.Protocol):
asyncio.ensure_future(self.respond(
src, method, dest, arg, untrusted_payload=untrusted_payload))
return True
@asyncio.coroutine
def respond(self, src, method, dest, arg, *, untrusted_payload):
try:
@ -116,6 +118,7 @@ class QubesDaemonProtocol(asyncio.Protocol):
def send_response(self, content):
self.send_header(0x30)
if content is not None:
self.transport.write(content.encode('utf-8'))
def send_event(self, subject, event, **kwargs):

View File

@ -27,6 +27,7 @@
import datetime
import os
import re
import subprocess
import sys
import xml.parsers.expat
@ -40,6 +41,31 @@ import qubes.log
import qubes.tools.qvm_ls
def validate_name(holder, prop, value):
''' Check if value is syntactically correct VM name '''
if not isinstance(value, str):
raise TypeError('VM name must be string, {!r} found'.format(
type(value).__name__))
if len(value) > 31:
if holder is not None and prop is not None:
raise qubes.exc.QubesPropertyValueError(holder, prop, value,
'{} value must be shorter than 32 characters'.format(
prop.__name__))
else:
raise qubes.exc.QubesValueError(
'VM name must be shorter than 32 characters')
# this regexp does not contain '+'; if it had it, we should specifically
# disallow 'lost+found' #1440
if re.match(r"^[a-zA-Z][a-zA-Z0-9_-]*$", value) is None:
if holder is not None and prop is not None:
raise qubes.exc.QubesPropertyValueError(holder, prop, value,
'{} value contains illegal characters'.format(prop.__name__))
else:
raise qubes.exc.QubesValueError(
'VM name contains illegal characters')
class Features(dict):
'''Manager of the features.
@ -332,3 +358,11 @@ class VMProperty(qubes.property):
self.vmclass.__name__))
super(VMProperty, self).__set__(instance, vm)
def sanitize(self, *, untrusted_newvalue):
try:
untrusted_vmname = untrusted_newvalue.decode('ascii')
except UnicodeDecodeError:
raise qubes.exc.QubesValueError
validate_name(None, self, untrusted_vmname)
return untrusted_vmname

View File

@ -143,6 +143,9 @@ class AdminVM(qubes.vm.qubesvm.QubesVM):
'''
raise qubes.exc.QubesVMError(self, 'Cannot suspend Dom0 fake domain!')
@property
def icon_path(self):
return None
# def __init__(self, **kwargs):
# super(QubesAdminVm, self).__init__(qid=0, name="dom0", netid=0,

View File

@ -91,8 +91,10 @@ class NetVMMixin(qubes.events.Emitter):
doc='''If this domain can act as network provider (formerly known as
NetVM or ProxyVM)''')
firewall_conf = qubes.property('firewall_conf', type=str,
default='firewall.xml')
@property
def firewall_conf(self):
return 'firewall.xml'
#
# used in networked appvms or proxyvms (netvm is not None)
@ -330,7 +332,7 @@ class NetVMMixin(qubes.events.Emitter):
@qubes.events.handler('property-del:netvm')
def on_property_del_netvm(self, event, prop, oldvalue=None):
def on_property_del_netvm(self, event, name, oldvalue=None):
''' Sets the the NetVM to default NetVM '''
# pylint: disable=unused-argument
# we are changing to default netvm

View File

@ -29,8 +29,8 @@ import base64
import datetime
import os
import os.path
import re
import shutil
import string
import subprocess
import sys
import time
@ -78,18 +78,8 @@ def _setter_qid(self, prop, value):
def _setter_name(self, prop, value):
''' Helper for setting the domain name '''
if not isinstance(value, str):
raise TypeError('{} value must be string, {!r} found'.format(
prop.__name__, type(value).__name__))
if len(value) > 31:
raise ValueError('{} value must be shorter than 32 characters'.format(
prop.__name__))
qubes.vm.validate_name(self, prop, value)
# this regexp does not contain '+'; if it had it, we should specifically
# disallow 'lost+found' #1440
if re.match(r"^[a-zA-Z][a-zA-Z0-9_-]*$", value) is None:
raise ValueError('{} value contains illegal characters'.format(
prop.__name__))
if self.is_running():
raise qubes.exc.QubesVMNotHaltedError(
self, 'Cannot change name of running VM')
@ -102,7 +92,7 @@ def _setter_name(self, prop, value):
pass
if value in self.app.domains:
raise qubes.exc.QubesValueError(
raise qubes.exc.QubesPropertyValueError(self, prop, value,
'VM named {} alread exists'.format(value))
return value
@ -114,6 +104,9 @@ def _setter_kernel(self, prop, value):
if value is None:
return value
value = str(value)
if '/' in value:
raise qubes.exc.QubesPropertyValueError(self, prop, value,
'Kernel name cannot contain \'/\'')
dirname = os.path.join(
qubes.config.system_path['qubes_base_dir'],
qubes.config.system_path['qubes_kernels_base_dir'],
@ -148,6 +141,16 @@ def _setter_positive_int(self, prop, value):
return value
def _setter_default_user(self, prop, value):
''' Helper for setting default user '''
value = str(value)
# specifically forbid: ':', ' ', ''', '"'
allowed_chars = string.ascii_letters + string.digits + '_-+,.'
if not all(c in allowed_chars for c in value):
raise qubes.exc.QubesPropertyValueError(self, prop, value,
'Username can contain only those characters: ' + allowed_chars)
return value
class QubesVM(qubes.vm.mix.net.NetVMMixin, qubes.vm.BaseVM):
'''Base functionality of Qubes VM shared between all VMs.
@ -418,10 +421,6 @@ class QubesVM(qubes.vm.mix.net.NetVMMixin, qubes.vm.BaseVM):
ls_width=2,
doc='FIXME')
pool_name = qubes.property('pool_name',
default='default',
doc='storage pool for this qube devices')
# CORE2: swallowed uses_default_kernel
kernel = qubes.property('kernel', type=str,
setter=_setter_kernel,
@ -448,6 +447,7 @@ class QubesVM(qubes.vm.mix.net.NetVMMixin, qubes.vm.BaseVM):
default_user = qubes.property('default_user', type=str,
default=(lambda self: self.template.default_user
if hasattr(self, 'template') else 'user'),
setter=_setter_default_user,
ls_width=12,
doc='FIXME')
@ -489,6 +489,13 @@ class QubesVM(qubes.vm.mix.net.NetVMMixin, qubes.vm.BaseVM):
default=(lambda self: self.app.default_dispvm),
doc='Default VM to be used as Disposable VM for service calls.')
updateable = qubes.property('updateable',
default=(lambda self: not hasattr(self, 'template')),
type=bool,
setter=qubes.property.forbidden,
doc='True if this machine may be updated on its own.')
#
# static, class-wide properties
#
@ -576,12 +583,6 @@ class QubesVM(qubes.vm.mix.net.NetVMMixin, qubes.vm.BaseVM):
self._qdb_connection = qubesdb.QubesDB(self.name)
return self._qdb_connection
# XXX shouldn't this go elsewhere?
@property
def updateable(self):
'''True if this machine may be updated on its own.'''
return not hasattr(self, 'template')
@property
def dir_path(self):
'''Root directory for files related to this domain'''
@ -707,7 +708,7 @@ class QubesVM(qubes.vm.mix.net.NetVMMixin, qubes.vm.BaseVM):
self.app.pools[vm_pool.name] = vm_pool
@qubes.events.handler('property-set:label')
def on_property_set_label(self, event, name, new_label, old_label=None):
def on_property_set_label(self, event, name, newvalue, oldvalue=None):
# pylint: disable=unused-argument
if self.icon_path:
try:
@ -715,10 +716,10 @@ class QubesVM(qubes.vm.mix.net.NetVMMixin, qubes.vm.BaseVM):
except OSError:
pass
if hasattr(os, "symlink"):
os.symlink(new_label.icon_path, self.icon_path)
os.symlink(newvalue.icon_path, self.icon_path)
subprocess.call(['sudo', 'xdg-icon-resource', 'forceupdate'])
else:
shutil.copy(new_label.icon_path, self.icon_path)
shutil.copy(newvalue.icon_path, self.icon_path)
@qubes.events.handler('property-pre-set:name')
def on_property_pre_set_name(self, event, name, newvalue, oldvalue=None):
@ -741,11 +742,11 @@ class QubesVM(qubes.vm.mix.net.NetVMMixin, qubes.vm.BaseVM):
'qubes-vm@{}.service'.format(oldvalue)])
@qubes.events.handler('property-set:name')
def on_property_set_name(self, event, name, new_name, old_name=None):
def on_property_set_name(self, event, name, newvalue, oldvalue=None):
# pylint: disable=unused-argument
self.init_log()
self.storage.rename(old_name, new_name)
self.storage.rename(oldvalue, newvalue)
if self._libvirt_domain is not None:
self.libvirt_domain.undefine()
@ -760,11 +761,11 @@ class QubesVM(qubes.vm.mix.net.NetVMMixin, qubes.vm.BaseVM):
self.autostart = self.autostart
@qubes.events.handler('property-pre-set:autostart')
def on_property_pre_set_autostart(self, event, prop, value,
def on_property_pre_set_autostart(self, event, prop, newvalue,
oldvalue=None):
# pylint: disable=unused-argument
# workaround https://bugzilla.redhat.com/show_bug.cgi?id=1181922
if value:
if newvalue:
retcode = subprocess.call(
["sudo", "ln", "-sf",
"/usr/lib/systemd/system/qubes-vm@.service",

View File

@ -317,6 +317,7 @@ fi
%{python3_sitelib}/qubes/tests/events.py
%{python3_sitelib}/qubes/tests/firewall.py
%{python3_sitelib}/qubes/tests/init.py
%{python3_sitelib}/qubes/tests/mgmt.py
%{python3_sitelib}/qubes/tests/storage.py
%{python3_sitelib}/qubes/tests/storage_file.py
%{python3_sitelib}/qubes/tests/storage_lvm.py

View File

@ -1,10 +1,10 @@
#!/bin/sh
: "${PYTHON:=python}"
: "${PYTHON:=python3}"
: "${TESTPYTHONPATH:=test-packages}"
PYTHONPATH="${TESTPYTHONPATH}:${PYTHONPATH}"
export PYTHONPATH
"${PYTHON}" setup.py egg_info --egg-base "${TESTPYTHONPATH}"
"${PYTHON}" -m qubes.tests.run "$@"
"${PYTHON}" -m coverage run --rcfile=ci/coveragerc -m qubes.tests.run "$@"

View File

@ -13,3 +13,11 @@ class libvirtError(Exception):
def openReadOnly(*args, **kwargs):
raise libvirtError('mock module, always raises')
VIR_DOMAIN_BLOCKED = 2
VIR_DOMAIN_RUNNING = 1
VIR_DOMAIN_PAUSED = 3
VIR_DOMAIN_SHUTDOWN = 4
VIR_DOMAIN_SHUTOFF = 5
VIR_DOMAIN_CRASHED = 6
VIR_DOMAIN_PMSUSPENDED = 7