core3 move: QubesVmCollection
This got split to qubes.Qubes and qubes.VMCollection. From now on, VMCollection is a stupid bag. Some parts went elsewhere.
This commit is contained in:
parent
2835238a87
commit
b623a71d87
625
core/qubes.py
625
core/qubes.py
@ -21,42 +21,6 @@
|
|||||||
#
|
#
|
||||||
#
|
#
|
||||||
|
|
||||||
from __future__ import absolute_import
|
|
||||||
|
|
||||||
import atexit
|
|
||||||
import grp
|
|
||||||
import logging
|
|
||||||
import os
|
|
||||||
import os.path
|
|
||||||
import sys
|
|
||||||
import tempfile
|
|
||||||
import time
|
|
||||||
import warnings
|
|
||||||
import xml.parsers.expat
|
|
||||||
|
|
||||||
import lxml.etree
|
|
||||||
|
|
||||||
if os.name == 'posix':
|
|
||||||
import fcntl
|
|
||||||
elif os.name == 'nt':
|
|
||||||
import win32con
|
|
||||||
import win32file
|
|
||||||
import pywintypes
|
|
||||||
else:
|
|
||||||
raise RuntimeError, "Qubes works only on POSIX or WinNT systems"
|
|
||||||
|
|
||||||
# Do not use XenAPI or create/read any VM files
|
|
||||||
# This is for testing only!
|
|
||||||
dry_run = False
|
|
||||||
#dry_run = True
|
|
||||||
|
|
||||||
if not dry_run:
|
|
||||||
import libvirt
|
|
||||||
try:
|
|
||||||
import xen.lowlevel.xs
|
|
||||||
except ImportError:
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
qubes_base_dir = "/var/lib/qubes"
|
qubes_base_dir = "/var/lib/qubes"
|
||||||
system_path = {
|
system_path = {
|
||||||
@ -134,595 +98,6 @@ def register_qubes_vm_class(vm_class):
|
|||||||
# other modules
|
# other modules
|
||||||
setattr(sys.modules[__name__], vm_class.__name__, vm_class)
|
setattr(sys.modules[__name__], vm_class.__name__, vm_class)
|
||||||
|
|
||||||
class QubesVmCollection(dict):
|
|
||||||
"""
|
|
||||||
A collection of Qubes VMs indexed by Qubes id (qid)
|
|
||||||
"""
|
|
||||||
|
|
||||||
def __init__(self, store_filename=None):
|
|
||||||
super(QubesVmCollection, self).__init__()
|
|
||||||
self.default_netvm_qid = None
|
|
||||||
self.default_fw_netvm_qid = None
|
|
||||||
self.default_template_qid = None
|
|
||||||
self.default_kernel = None
|
|
||||||
self.updatevm_qid = None
|
|
||||||
self.qubes_store_filename = store_filename
|
|
||||||
if not store_filename:
|
|
||||||
self.qubes_store_filename = system_path["qubes_store_filename"]
|
|
||||||
self.clockvm_qid = None
|
|
||||||
self.qubes_store_file = None
|
|
||||||
|
|
||||||
self.log = logging.getLogger('qubes.qvmc.{:x}'.format(id(self)))
|
|
||||||
self.log.debug('instantiated store_filename={!r}'.format(
|
|
||||||
self.qubes_store_filename))
|
|
||||||
|
|
||||||
def __repr__(self):
|
|
||||||
return '<{} {!r}>'.format(self.__class__.__name__, list(sorted(self.keys())))
|
|
||||||
|
|
||||||
def clear(self):
|
|
||||||
self.log.debug('clear()')
|
|
||||||
super(QubesVmCollection, self).clear()
|
|
||||||
|
|
||||||
def values(self):
|
|
||||||
for qid in self.keys():
|
|
||||||
yield self[qid]
|
|
||||||
|
|
||||||
def items(self):
|
|
||||||
for qid in self.keys():
|
|
||||||
yield (qid, self[qid])
|
|
||||||
|
|
||||||
def __iter__(self):
|
|
||||||
for qid in sorted(super(QubesVmCollection, self).keys()):
|
|
||||||
yield qid
|
|
||||||
|
|
||||||
keys = __iter__
|
|
||||||
|
|
||||||
def __setitem__(self, key, value):
|
|
||||||
self.log.debug('[{!r}] = {!r}'.format(key, value))
|
|
||||||
if key not in self:
|
|
||||||
return super(QubesVmCollection, self).__setitem__(key, value)
|
|
||||||
else:
|
|
||||||
assert False, "Attempt to add VM with qid that already exists in the collection!"
|
|
||||||
|
|
||||||
def add_new_vm(self, vm_type, **kwargs):
|
|
||||||
self.log.debug('add_new_vm(vm_type={}, **kwargs={!r})'.format(
|
|
||||||
vm_type, kwargs))
|
|
||||||
if vm_type not in QubesVmClasses.keys():
|
|
||||||
raise ValueError("Unknown VM type: %s" % vm_type)
|
|
||||||
|
|
||||||
qid = self.get_new_unused_qid()
|
|
||||||
vm_cls = QubesVmClasses[vm_type]
|
|
||||||
if 'template' in kwargs:
|
|
||||||
if not vm_cls.is_template_compatible(kwargs['template']):
|
|
||||||
raise QubesException("Template not compatible with selected "
|
|
||||||
"VM type")
|
|
||||||
|
|
||||||
vm = vm_cls(qid=qid, collection=self, **kwargs)
|
|
||||||
if not self.verify_new_vm(vm):
|
|
||||||
raise QubesException("Wrong VM description!")
|
|
||||||
self[vm.qid] = vm
|
|
||||||
|
|
||||||
# make first created NetVM the default one
|
|
||||||
if self.default_fw_netvm_qid is None and vm.is_netvm():
|
|
||||||
self.set_default_fw_netvm(vm)
|
|
||||||
|
|
||||||
if self.default_netvm_qid is None and vm.is_proxyvm():
|
|
||||||
self.set_default_netvm(vm)
|
|
||||||
|
|
||||||
# make first created TemplateVM the default one
|
|
||||||
if self.default_template_qid is None and vm.is_template():
|
|
||||||
self.set_default_template(vm)
|
|
||||||
|
|
||||||
# make first created ProxyVM the UpdateVM
|
|
||||||
if self.updatevm_qid is None and vm.is_proxyvm():
|
|
||||||
self.set_updatevm_vm(vm)
|
|
||||||
|
|
||||||
# by default ClockVM is the first NetVM
|
|
||||||
if self.clockvm_qid is None and vm.is_netvm():
|
|
||||||
self.set_clockvm_vm(vm)
|
|
||||||
|
|
||||||
return vm
|
|
||||||
|
|
||||||
def add_new_appvm(self, name, template,
|
|
||||||
dir_path = None, conf_file = None,
|
|
||||||
private_img = None,
|
|
||||||
label = None):
|
|
||||||
|
|
||||||
warnings.warn("Call to deprecated function, use add_new_vm instead",
|
|
||||||
DeprecationWarning, stacklevel=2)
|
|
||||||
return self.add_new_vm("QubesAppVm", name=name, template=template,
|
|
||||||
dir_path=dir_path, conf_file=conf_file,
|
|
||||||
private_img=private_img,
|
|
||||||
netvm = self.get_default_netvm(),
|
|
||||||
kernel = self.get_default_kernel(),
|
|
||||||
uses_default_kernel = True,
|
|
||||||
label=label)
|
|
||||||
|
|
||||||
def add_new_hvm(self, name, label = None):
|
|
||||||
|
|
||||||
warnings.warn("Call to deprecated function, use add_new_vm instead",
|
|
||||||
DeprecationWarning, stacklevel=2)
|
|
||||||
return self.add_new_vm("QubesHVm", name=name, label=label)
|
|
||||||
|
|
||||||
def add_new_disposablevm(self, name, template, dispid,
|
|
||||||
label = None, netvm = None):
|
|
||||||
|
|
||||||
warnings.warn("Call to deprecated function, use add_new_vm instead",
|
|
||||||
DeprecationWarning, stacklevel=2)
|
|
||||||
return self.add_new_vm("QubesDisposableVm", name=name, template=template,
|
|
||||||
netvm = netvm,
|
|
||||||
label=label, dispid=dispid)
|
|
||||||
|
|
||||||
def add_new_templatevm(self, name,
|
|
||||||
dir_path = None, conf_file = None,
|
|
||||||
root_img = None, private_img = None,
|
|
||||||
installed_by_rpm = True):
|
|
||||||
|
|
||||||
warnings.warn("Call to deprecated function, use add_new_vm instead",
|
|
||||||
DeprecationWarning, stacklevel=2)
|
|
||||||
return self.add_new_vm("QubesTemplateVm", name=name,
|
|
||||||
dir_path=dir_path, conf_file=conf_file,
|
|
||||||
root_img=root_img, private_img=private_img,
|
|
||||||
installed_by_rpm=installed_by_rpm,
|
|
||||||
netvm = self.get_default_netvm(),
|
|
||||||
kernel = self.get_default_kernel(),
|
|
||||||
uses_default_kernel = True)
|
|
||||||
|
|
||||||
def add_new_netvm(self, name, template,
|
|
||||||
dir_path = None, conf_file = None,
|
|
||||||
private_img = None, installed_by_rpm = False,
|
|
||||||
label = None):
|
|
||||||
|
|
||||||
warnings.warn("Call to deprecated function, use add_new_vm instead",
|
|
||||||
DeprecationWarning, stacklevel=2)
|
|
||||||
return self.add_new_vm("QubesNetVm", name=name, template=template,
|
|
||||||
label=label,
|
|
||||||
private_img=private_img, installed_by_rpm=installed_by_rpm,
|
|
||||||
uses_default_kernel = True,
|
|
||||||
dir_path=dir_path, conf_file=conf_file)
|
|
||||||
|
|
||||||
def add_new_proxyvm(self, name, template,
|
|
||||||
dir_path = None, conf_file = None,
|
|
||||||
private_img = None, installed_by_rpm = False,
|
|
||||||
label = None):
|
|
||||||
|
|
||||||
warnings.warn("Call to deprecated function, use add_new_vm instead",
|
|
||||||
DeprecationWarning, stacklevel=2)
|
|
||||||
return self.add_new_vm("QubesProxyVm", name=name, template=template,
|
|
||||||
label=label,
|
|
||||||
private_img=private_img, installed_by_rpm=installed_by_rpm,
|
|
||||||
dir_path=dir_path, conf_file=conf_file,
|
|
||||||
uses_default_kernel = True,
|
|
||||||
netvm = self.get_default_fw_netvm())
|
|
||||||
|
|
||||||
def set_default_template(self, vm):
|
|
||||||
self.log.debug('set_default_template({!r})'.format(vm))
|
|
||||||
if vm is None:
|
|
||||||
self.default_template_qid = None
|
|
||||||
else:
|
|
||||||
assert vm.is_template(), "VM {0} is not a TemplateVM!".format(vm.name)
|
|
||||||
self.default_template_qid = vm.qid
|
|
||||||
|
|
||||||
def get_default_template(self):
|
|
||||||
if self.default_template_qid is None:
|
|
||||||
return None
|
|
||||||
else:
|
|
||||||
return self[self.default_template_qid]
|
|
||||||
|
|
||||||
def set_default_netvm(self, vm):
|
|
||||||
self.log.debug('set_default_netvm({!r})'.format(vm))
|
|
||||||
if vm is None:
|
|
||||||
self.default_netvm_qid = None
|
|
||||||
else:
|
|
||||||
assert vm.is_netvm(), "VM {0} does not provide network!".format(vm.name)
|
|
||||||
self.default_netvm_qid = vm.qid
|
|
||||||
|
|
||||||
def get_default_netvm(self):
|
|
||||||
if self.default_netvm_qid is None:
|
|
||||||
return None
|
|
||||||
else:
|
|
||||||
return self[self.default_netvm_qid]
|
|
||||||
|
|
||||||
def set_default_kernel(self, kernel):
|
|
||||||
self.log.debug('set_default_kernel({!r})'.format(kernel))
|
|
||||||
assert os.path.exists(
|
|
||||||
os.path.join(system_path["qubes_kernels_base_dir"], kernel)), \
|
|
||||||
"Kerel {0} not installed!".format(kernel)
|
|
||||||
self.default_kernel = kernel
|
|
||||||
|
|
||||||
def get_default_kernel(self):
|
|
||||||
return self.default_kernel
|
|
||||||
|
|
||||||
def set_default_fw_netvm(self, vm):
|
|
||||||
self.log.debug('set_default_fw_netvm({!r})'.format(vm))
|
|
||||||
if vm is None:
|
|
||||||
self.default_fw_netvm_qid = None
|
|
||||||
else:
|
|
||||||
assert vm.is_netvm(), "VM {0} does not provide network!".format(vm.name)
|
|
||||||
self.default_fw_netvm_qid = vm.qid
|
|
||||||
|
|
||||||
def get_default_fw_netvm(self):
|
|
||||||
if self.default_fw_netvm_qid is None:
|
|
||||||
return None
|
|
||||||
else:
|
|
||||||
return self[self.default_fw_netvm_qid]
|
|
||||||
|
|
||||||
def set_updatevm_vm(self, vm):
|
|
||||||
self.log.debug('set_updatevm_vm({!r})'.format(vm))
|
|
||||||
if vm is None:
|
|
||||||
self.updatevm_qid = None
|
|
||||||
else:
|
|
||||||
self.updatevm_qid = vm.qid
|
|
||||||
|
|
||||||
def get_updatevm_vm(self):
|
|
||||||
if self.updatevm_qid is None:
|
|
||||||
return None
|
|
||||||
else:
|
|
||||||
return self[self.updatevm_qid]
|
|
||||||
|
|
||||||
def set_clockvm_vm(self, vm):
|
|
||||||
self.log.debug('set_clockvm({!r})'.format(vm))
|
|
||||||
if vm is None:
|
|
||||||
self.clockvm_qid = None
|
|
||||||
else:
|
|
||||||
self.clockvm_qid = vm.qid
|
|
||||||
|
|
||||||
def get_clockvm_vm(self):
|
|
||||||
if self.clockvm_qid is None:
|
|
||||||
return None
|
|
||||||
else:
|
|
||||||
return self[self.clockvm_qid]
|
|
||||||
|
|
||||||
def get_vm_by_name(self, name):
|
|
||||||
for vm in self.values():
|
|
||||||
if (vm.name == name):
|
|
||||||
return vm
|
|
||||||
return None
|
|
||||||
|
|
||||||
def get_qid_by_name(self, name):
|
|
||||||
vm = self.get_vm_by_name(name)
|
|
||||||
return vm.qid if vm is not None else None
|
|
||||||
|
|
||||||
def get_vms_based_on(self, template_qid):
|
|
||||||
vms = set([vm for vm in self.values()
|
|
||||||
if (vm.template and vm.template.qid == template_qid)])
|
|
||||||
return vms
|
|
||||||
|
|
||||||
def get_vms_connected_to(self, netvm_qid):
|
|
||||||
new_vms = [ netvm_qid ]
|
|
||||||
dependend_vms_qid = []
|
|
||||||
|
|
||||||
# Dependency resolving only makes sense on NetVM (or derivative)
|
|
||||||
if not self[netvm_qid].is_netvm():
|
|
||||||
return set([])
|
|
||||||
|
|
||||||
while len(new_vms) > 0:
|
|
||||||
cur_vm = new_vms.pop()
|
|
||||||
for vm in self[cur_vm].connected_vms.values():
|
|
||||||
if vm.qid not in dependend_vms_qid:
|
|
||||||
dependend_vms_qid.append(vm.qid)
|
|
||||||
if vm.is_netvm():
|
|
||||||
new_vms.append(vm.qid)
|
|
||||||
|
|
||||||
vms = [vm for vm in self.values() if vm.qid in dependend_vms_qid]
|
|
||||||
return vms
|
|
||||||
|
|
||||||
def verify_new_vm(self, new_vm):
|
|
||||||
|
|
||||||
# Verify that qid is unique
|
|
||||||
for vm in self.values():
|
|
||||||
if vm.qid == new_vm.qid:
|
|
||||||
print >> sys.stderr, "ERROR: The qid={0} is already used by VM '{1}'!".\
|
|
||||||
format(vm.qid, vm.name)
|
|
||||||
return False
|
|
||||||
|
|
||||||
# Verify that name is unique
|
|
||||||
for vm in self.values():
|
|
||||||
if vm.name == new_vm.name:
|
|
||||||
print >> sys.stderr, \
|
|
||||||
"ERROR: The name={0} is already used by other VM with qid='{1}'!".\
|
|
||||||
format(vm.name, vm.qid)
|
|
||||||
return False
|
|
||||||
|
|
||||||
return True
|
|
||||||
|
|
||||||
def get_new_unused_qid(self):
|
|
||||||
used_ids = set([vm.qid for vm in self.values()])
|
|
||||||
for id in range (1, qubes_max_qid):
|
|
||||||
if id not in used_ids:
|
|
||||||
return id
|
|
||||||
raise LookupError ("Cannot find unused qid!")
|
|
||||||
|
|
||||||
def get_new_unused_netid(self):
|
|
||||||
used_ids = set([vm.netid for vm in self.values() if vm.is_netvm()])
|
|
||||||
for id in range (1, qubes_max_netid):
|
|
||||||
if id not in used_ids:
|
|
||||||
return id
|
|
||||||
raise LookupError ("Cannot find unused netid!")
|
|
||||||
|
|
||||||
|
|
||||||
def check_if_storage_exists(self):
|
|
||||||
try:
|
|
||||||
f = open (self.qubes_store_filename, 'r')
|
|
||||||
except IOError:
|
|
||||||
return False
|
|
||||||
f.close()
|
|
||||||
return True
|
|
||||||
|
|
||||||
def create_empty_storage(self):
|
|
||||||
self.log.debug('create_empty_storage()')
|
|
||||||
self.qubes_store_file = open (self.qubes_store_filename, 'w')
|
|
||||||
self.clear()
|
|
||||||
self.save()
|
|
||||||
|
|
||||||
def lock_db_for_reading(self):
|
|
||||||
if self.qubes_store_file is not None:
|
|
||||||
raise QubesException("lock already taken")
|
|
||||||
# save() would rename the file over qubes.xml, _then_ release lock,
|
|
||||||
# so we need to ensure that the file for which we've got the lock is
|
|
||||||
# still the right file
|
|
||||||
self.log.debug('lock_db_for_reading()')
|
|
||||||
while True:
|
|
||||||
self.qubes_store_file = open (self.qubes_store_filename, 'r')
|
|
||||||
if os.name == 'posix':
|
|
||||||
fcntl.lockf (self.qubes_store_file, fcntl.LOCK_SH)
|
|
||||||
elif os.name == 'nt':
|
|
||||||
overlapped = pywintypes.OVERLAPPED()
|
|
||||||
win32file.LockFileEx(win32file._get_osfhandle(self.qubes_store_file.fileno()),
|
|
||||||
0, 0, -0x10000, overlapped)
|
|
||||||
if os.fstat(self.qubes_store_file.fileno()) == os.stat(
|
|
||||||
self.qubes_store_filename):
|
|
||||||
break
|
|
||||||
self.qubes_store_file.close()
|
|
||||||
|
|
||||||
def lock_db_for_writing(self):
|
|
||||||
if self.qubes_store_file is not None:
|
|
||||||
raise QubesException("lock already taken")
|
|
||||||
# save() would rename the file over qubes.xml, _then_ release lock,
|
|
||||||
# so we need to ensure that the file for which we've got the lock is
|
|
||||||
# still the right file
|
|
||||||
self.log.debug('lock_db_for_writing()')
|
|
||||||
while True:
|
|
||||||
self.qubes_store_file = open (self.qubes_store_filename, 'r+')
|
|
||||||
if os.name == 'posix':
|
|
||||||
fcntl.lockf (self.qubes_store_file, fcntl.LOCK_EX)
|
|
||||||
elif os.name == 'nt':
|
|
||||||
overlapped = pywintypes.OVERLAPPED()
|
|
||||||
win32file.LockFileEx(win32file._get_osfhandle(self.qubes_store_file.fileno()),
|
|
||||||
win32con.LOCKFILE_EXCLUSIVE_LOCK, 0, -0x10000, overlapped)
|
|
||||||
if os.fstat(self.qubes_store_file.fileno()) == os.stat(
|
|
||||||
self.qubes_store_filename):
|
|
||||||
break
|
|
||||||
self.qubes_store_file.close()
|
|
||||||
|
|
||||||
def unlock_db(self):
|
|
||||||
# intentionally do not call explicit unlock to not unlock the file
|
|
||||||
# before all buffers are flushed
|
|
||||||
self.log.debug('unlock_db()')
|
|
||||||
self.qubes_store_file.close()
|
|
||||||
self.qubes_store_file = None
|
|
||||||
|
|
||||||
def save(self):
|
|
||||||
self.log.debug('save()')
|
|
||||||
root = lxml.etree.Element(
|
|
||||||
"QubesVmCollection",
|
|
||||||
|
|
||||||
default_template=str(self.default_template_qid) \
|
|
||||||
if self.default_template_qid is not None else "None",
|
|
||||||
|
|
||||||
default_netvm=str(self.default_netvm_qid) \
|
|
||||||
if self.default_netvm_qid is not None else "None",
|
|
||||||
|
|
||||||
default_fw_netvm=str(self.default_fw_netvm_qid) \
|
|
||||||
if self.default_fw_netvm_qid is not None else "None",
|
|
||||||
|
|
||||||
updatevm=str(self.updatevm_qid) \
|
|
||||||
if self.updatevm_qid is not None else "None",
|
|
||||||
|
|
||||||
clockvm=str(self.clockvm_qid) \
|
|
||||||
if self.clockvm_qid is not None else "None",
|
|
||||||
|
|
||||||
default_kernel=str(self.default_kernel) \
|
|
||||||
if self.default_kernel is not None else "None",
|
|
||||||
)
|
|
||||||
|
|
||||||
for vm in self.values():
|
|
||||||
element = vm.create_xml_element()
|
|
||||||
if element is not None:
|
|
||||||
root.append(element)
|
|
||||||
tree = lxml.etree.ElementTree(root)
|
|
||||||
|
|
||||||
try:
|
|
||||||
|
|
||||||
new_store_file = tempfile.NamedTemporaryFile(prefix=self.qubes_store_filename, delete=False)
|
|
||||||
if os.name == 'posix':
|
|
||||||
fcntl.lockf (new_store_file, fcntl.LOCK_EX)
|
|
||||||
elif os.name == 'nt':
|
|
||||||
overlapped = pywintypes.OVERLAPPED()
|
|
||||||
win32file.LockFileEx(win32file._get_osfhandle(new_store_file.fileno()),
|
|
||||||
win32con.LOCKFILE_EXCLUSIVE_LOCK, 0, -0x10000, overlapped)
|
|
||||||
tree.write(new_store_file, encoding="UTF-8", pretty_print=True)
|
|
||||||
new_store_file.flush()
|
|
||||||
os.chmod(new_store_file.name, 0660)
|
|
||||||
os.chown(new_store_file.name, -1, grp.getgrnam('qubes').gr_gid)
|
|
||||||
os.rename(new_store_file.name, self.qubes_store_filename)
|
|
||||||
self.qubes_store_file.close()
|
|
||||||
self.qubes_store_file = new_store_file
|
|
||||||
except EnvironmentError as err:
|
|
||||||
print("{0}: export error: {1}".format(
|
|
||||||
os.path.basename(sys.argv[0]), err))
|
|
||||||
return False
|
|
||||||
return True
|
|
||||||
|
|
||||||
def set_netvm_dependency(self, element):
|
|
||||||
kwargs = {}
|
|
||||||
attr_list = ("qid", "uses_default_netvm", "netvm_qid")
|
|
||||||
|
|
||||||
for attribute in attr_list:
|
|
||||||
kwargs[attribute] = element.get(attribute)
|
|
||||||
|
|
||||||
vm = self[int(kwargs["qid"])]
|
|
||||||
|
|
||||||
if "uses_default_netvm" not in kwargs:
|
|
||||||
vm.uses_default_netvm = True
|
|
||||||
else:
|
|
||||||
vm.uses_default_netvm = (
|
|
||||||
True if kwargs["uses_default_netvm"] == "True" else False)
|
|
||||||
if vm.uses_default_netvm is True:
|
|
||||||
if vm.is_proxyvm():
|
|
||||||
netvm = self.get_default_fw_netvm()
|
|
||||||
else:
|
|
||||||
netvm = self.get_default_netvm()
|
|
||||||
kwargs.pop("netvm_qid")
|
|
||||||
else:
|
|
||||||
if kwargs["netvm_qid"] == "none" or kwargs["netvm_qid"] is None:
|
|
||||||
netvm = None
|
|
||||||
kwargs.pop("netvm_qid")
|
|
||||||
else:
|
|
||||||
netvm_qid = int(kwargs.pop("netvm_qid"))
|
|
||||||
if netvm_qid not in self:
|
|
||||||
netvm = None
|
|
||||||
else:
|
|
||||||
netvm = self[netvm_qid]
|
|
||||||
|
|
||||||
# directly set internal attr to not call setters...
|
|
||||||
vm._netvm = netvm
|
|
||||||
if netvm:
|
|
||||||
netvm.connected_vms[vm.qid] = vm
|
|
||||||
|
|
||||||
|
|
||||||
def load_globals(self, element):
|
|
||||||
default_template = element.get("default_template")
|
|
||||||
self.default_template_qid = int(default_template) \
|
|
||||||
if default_template.lower() != "none" else None
|
|
||||||
|
|
||||||
default_netvm = element.get("default_netvm")
|
|
||||||
if default_netvm is not None:
|
|
||||||
self.default_netvm_qid = int(default_netvm) \
|
|
||||||
if default_netvm != "None" else None
|
|
||||||
#assert self.default_netvm_qid is not None
|
|
||||||
|
|
||||||
default_fw_netvm = element.get("default_fw_netvm")
|
|
||||||
if default_fw_netvm is not None:
|
|
||||||
self.default_fw_netvm_qid = int(default_fw_netvm) \
|
|
||||||
if default_fw_netvm != "None" else None
|
|
||||||
#assert self.default_netvm_qid is not None
|
|
||||||
|
|
||||||
updatevm = element.get("updatevm")
|
|
||||||
if updatevm is not None:
|
|
||||||
self.updatevm_qid = int(updatevm) \
|
|
||||||
if updatevm != "None" else None
|
|
||||||
#assert self.default_netvm_qid is not None
|
|
||||||
|
|
||||||
clockvm = element.get("clockvm")
|
|
||||||
if clockvm is not None:
|
|
||||||
self.clockvm_qid = int(clockvm) \
|
|
||||||
if clockvm != "None" else None
|
|
||||||
|
|
||||||
self.default_kernel = element.get("default_kernel")
|
|
||||||
|
|
||||||
|
|
||||||
def _check_global(self, attr, default):
|
|
||||||
qid = getattr(self, attr)
|
|
||||||
if qid is None:
|
|
||||||
return
|
|
||||||
try:
|
|
||||||
self[qid]
|
|
||||||
except KeyError:
|
|
||||||
setattr(self, attr, default)
|
|
||||||
|
|
||||||
|
|
||||||
def check_globals(self):
|
|
||||||
'''Ensure that all referenced qids are present in the collection'''
|
|
||||||
self._check_global('default_template_qid', None)
|
|
||||||
self._check_global('default_fw_netvm_qid', None)
|
|
||||||
self._check_global('default_netvm_qid', self.default_fw_netvm_qid)
|
|
||||||
self._check_global('updatevm_qid', self.default_netvm_qid)
|
|
||||||
self._check_global('clockvm_qid', self.default_netvm_qid)
|
|
||||||
|
|
||||||
|
|
||||||
def load(self):
|
|
||||||
self.log.debug('load()')
|
|
||||||
self.clear()
|
|
||||||
|
|
||||||
try:
|
|
||||||
self.qubes_store_file.seek(0)
|
|
||||||
tree = lxml.etree.parse(self.qubes_store_file)
|
|
||||||
except (EnvironmentError,
|
|
||||||
xml.parsers.expat.ExpatError) as err:
|
|
||||||
print("{0}: import error: {1}".format(
|
|
||||||
os.path.basename(sys.argv[0]), err))
|
|
||||||
return False
|
|
||||||
|
|
||||||
self.load_globals(tree.getroot())
|
|
||||||
|
|
||||||
for (vm_class_name, vm_class) in sorted(QubesVmClasses.items(),
|
|
||||||
key=lambda _x: _x[1].load_order):
|
|
||||||
vms_of_class = tree.findall(vm_class_name)
|
|
||||||
# first non-template based, then template based
|
|
||||||
sorted_vms_of_class = sorted(vms_of_class, key= \
|
|
||||||
lambda x: str(x.get('template_qid')).lower() != "none")
|
|
||||||
for element in sorted_vms_of_class:
|
|
||||||
try:
|
|
||||||
vm = vm_class(xml_element=element, collection=self)
|
|
||||||
self[vm.qid] = vm
|
|
||||||
except (ValueError, LookupError) as err:
|
|
||||||
print("{0}: import error ({1}): {2}".format(
|
|
||||||
os.path.basename(sys.argv[0]), vm_class_name, err))
|
|
||||||
raise
|
|
||||||
return False
|
|
||||||
|
|
||||||
# After importing all VMs, set netvm references, in the same order
|
|
||||||
for (vm_class_name, vm_class) in sorted(QubesVmClasses.items(),
|
|
||||||
key=lambda _x: _x[1].load_order):
|
|
||||||
for element in tree.findall(vm_class_name):
|
|
||||||
try:
|
|
||||||
self.set_netvm_dependency(element)
|
|
||||||
except (ValueError, LookupError) as err:
|
|
||||||
print("{0}: import error2 ({}): {}".format(
|
|
||||||
os.path.basename(sys.argv[0]), vm_class_name, err))
|
|
||||||
return False
|
|
||||||
|
|
||||||
self.check_globals()
|
|
||||||
|
|
||||||
# if there was no clockvm entry in qubes.xml, try to determine default:
|
|
||||||
# root of default NetVM chain
|
|
||||||
if tree.getroot().get("clockvm") is None:
|
|
||||||
if self.default_netvm_qid is not None:
|
|
||||||
clockvm = self[self.default_netvm_qid]
|
|
||||||
# Find root of netvm chain
|
|
||||||
while clockvm.netvm is not None:
|
|
||||||
clockvm = clockvm.netvm
|
|
||||||
|
|
||||||
self.clockvm_qid = clockvm.qid
|
|
||||||
|
|
||||||
# Disable ntpd in ClockVM - to not conflict with ntpdate (both are
|
|
||||||
# using 123/udp port)
|
|
||||||
if self.clockvm_qid is not None:
|
|
||||||
self[self.clockvm_qid].services['ntpd'] = False
|
|
||||||
|
|
||||||
# Add dom0 if wasn't present in qubes.xml
|
|
||||||
if not 0 in self.keys():
|
|
||||||
dom0vm = QubesAdminVm (collection=self)
|
|
||||||
self[dom0vm.qid] = dom0vm
|
|
||||||
|
|
||||||
return True
|
|
||||||
|
|
||||||
def pop(self, qid):
|
|
||||||
self.log.debug('pop({})'.format(qid))
|
|
||||||
|
|
||||||
if self.default_netvm_qid == qid:
|
|
||||||
self.default_netvm_qid = None
|
|
||||||
if self.default_fw_netvm_qid == qid:
|
|
||||||
self.default_fw_netvm_qid = None
|
|
||||||
if self.clockvm_qid == qid:
|
|
||||||
self.clockvm_qid = None
|
|
||||||
if self.updatevm_qid == qid:
|
|
||||||
self.updatevm_qid = None
|
|
||||||
if self.default_template_qid == qid:
|
|
||||||
self.default_template_qid = None
|
|
||||||
|
|
||||||
return super(QubesVmCollection, self).pop(qid)
|
|
||||||
|
|
||||||
class QubesDaemonPidfile(object):
|
class QubesDaemonPidfile(object):
|
||||||
def __init__(self, name):
|
def __init__(self, name):
|
||||||
|
@ -1,7 +1,12 @@
|
|||||||
#!/usr/bin/python2 -O
|
#!/usr/bin/python2 -O
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
|
from __future__ import absolute_import
|
||||||
|
|
||||||
'''
|
'''
|
||||||
Qubes OS
|
Qubes OS
|
||||||
|
|
||||||
|
:copyright: © 2010-2014 Invisible Things Lab
|
||||||
'''
|
'''
|
||||||
|
|
||||||
__author__ = 'Invisible Things Lab'
|
__author__ = 'Invisible Things Lab'
|
||||||
@ -9,8 +14,42 @@ __license__ = 'GPLv2 or later'
|
|||||||
__version__ = 'R3'
|
__version__ = 'R3'
|
||||||
|
|
||||||
import ast
|
import ast
|
||||||
|
import atexit
|
||||||
|
import collections
|
||||||
|
import grp
|
||||||
|
import os
|
||||||
|
import os.path
|
||||||
|
import sys
|
||||||
|
import tempfile
|
||||||
|
import time
|
||||||
|
import warnings
|
||||||
|
|
||||||
|
import __builtin__
|
||||||
|
|
||||||
|
import lxml.etree
|
||||||
|
import xml.parsers.expat
|
||||||
|
|
||||||
|
if os.name == 'posix':
|
||||||
|
import fcntl
|
||||||
|
elif os.name == 'nt':
|
||||||
|
import win32con
|
||||||
|
import win32file
|
||||||
|
import pywintypes
|
||||||
|
else:
|
||||||
|
raise RuntimeError, "Qubes works only on POSIX or WinNT systems"
|
||||||
|
|
||||||
|
import libvirt
|
||||||
|
try:
|
||||||
|
import xen.lowlevel.xs
|
||||||
|
except ImportError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
#: FIXME documentation
|
||||||
|
MAX_QID = 253
|
||||||
|
|
||||||
|
#: FIXME documentation
|
||||||
|
MAX_NETID = 253
|
||||||
|
|
||||||
import qubes._pluginloader
|
|
||||||
|
|
||||||
class QubesException(Exception):
|
class QubesException(Exception):
|
||||||
'''Exception that can be shown to the user'''
|
'''Exception that can be shown to the user'''
|
||||||
@ -24,7 +63,7 @@ class QubesVMMConnection(object):
|
|||||||
self._xc = None
|
self._xc = None
|
||||||
self._offline_mode = False
|
self._offline_mode = False
|
||||||
|
|
||||||
@property
|
@__builtin__.property
|
||||||
def offline_mode(self):
|
def offline_mode(self):
|
||||||
'''Check or enable offline mode (do not actually connect to vmm)'''
|
'''Check or enable offline mode (do not actually connect to vmm)'''
|
||||||
return self._offline_mode
|
return self._offline_mode
|
||||||
@ -58,13 +97,13 @@ class QubesVMMConnection(object):
|
|||||||
libvirt.registerErrorHandler(self._libvirt_error_handler, None)
|
libvirt.registerErrorHandler(self._libvirt_error_handler, None)
|
||||||
atexit.register(self._libvirt_conn.close)
|
atexit.register(self._libvirt_conn.close)
|
||||||
|
|
||||||
@property
|
@__builtin__.property
|
||||||
def libvirt_conn(self):
|
def libvirt_conn(self):
|
||||||
'''Connection to libvirt'''
|
'''Connection to libvirt'''
|
||||||
self.init_vmm_connection()
|
self.init_vmm_connection()
|
||||||
return self._libvirt_conn
|
return self._libvirt_conn
|
||||||
|
|
||||||
@property
|
@__builtin__.property
|
||||||
def xs(self):
|
def xs(self):
|
||||||
'''Connection to Xen Store
|
'''Connection to Xen Store
|
||||||
|
|
||||||
@ -90,12 +129,12 @@ class QubesHost(object):
|
|||||||
# print "QubesHost: free_mem = {0}".format (self.get_free_xen_memory())
|
# print "QubesHost: free_mem = {0}".format (self.get_free_xen_memory())
|
||||||
# print "QubesHost: total_cpus = {0}".format (self.xen_no_cpus)
|
# print "QubesHost: total_cpus = {0}".format (self.xen_no_cpus)
|
||||||
|
|
||||||
@property
|
@__builtin__.property
|
||||||
def memory_total(self):
|
def memory_total(self):
|
||||||
'''Total memory, in bytes'''
|
'''Total memory, in bytes'''
|
||||||
return self._total_mem
|
return self._total_mem
|
||||||
|
|
||||||
@property
|
@__builtin__.property
|
||||||
def no_cpus(self):
|
def no_cpus(self):
|
||||||
'''Noumber of CPUs'''
|
'''Noumber of CPUs'''
|
||||||
return self._no_cpus
|
return self._no_cpus
|
||||||
@ -141,7 +180,7 @@ class QubesHost(object):
|
|||||||
return (current_time, current)
|
return (current_time, current)
|
||||||
|
|
||||||
|
|
||||||
class QubesVmLabel(object):
|
class Label(object):
|
||||||
'''Label definition for virtual machines
|
'''Label definition for virtual machines
|
||||||
|
|
||||||
Label specifies colour of the padlock displayed next to VM's name.
|
Label specifies colour of the padlock displayed next to VM's name.
|
||||||
@ -151,10 +190,9 @@ class QubesVmLabel(object):
|
|||||||
:param int index: numeric identificator of label
|
:param int index: numeric identificator of label
|
||||||
:param str color: colour specification as in HTML (``#abcdef``)
|
:param str color: colour specification as in HTML (``#abcdef``)
|
||||||
:param str name: label's name like "red" or "green"
|
:param str name: label's name like "red" or "green"
|
||||||
:param bool dispvm: :py:obj:`True` if this is :py:class:`qubes.vm.dispvm.DispVM` label
|
|
||||||
|
|
||||||
'''
|
'''
|
||||||
def __init__(self, index, color, name, dispvm=False):
|
|
||||||
|
def __init__(self, index, color, name):
|
||||||
#: numeric identificator of label
|
#: numeric identificator of label
|
||||||
self.index = index
|
self.index = index
|
||||||
|
|
||||||
@ -164,65 +202,683 @@ class QubesVmLabel(object):
|
|||||||
#: label's name like "red" or "green"
|
#: label's name like "red" or "green"
|
||||||
self.name = name
|
self.name = name
|
||||||
|
|
||||||
#: :py:obj:`True` if this is :py:class:`qubes.vm.dispvm.DispVM` label
|
#: freedesktop icon name, suitable for use in :py:meth:`PyQt4.QtGui.QIcon.fromTheme`
|
||||||
self.dispvm = dispvm
|
self.icon = 'appvm-' + name
|
||||||
|
|
||||||
#: freedesktop icon name, suitable for use in :py:meth:`PyQt4.QtGui.QIcon.fromTheme`
|
#: freedesktop icon name, suitable for use in :py:meth:`PyQt4.QtGui.QIcon.fromTheme`
|
||||||
self.icon = '{}-{}'.format(('dispvm' if dispvm else 'appvm'), name)
|
#: on DispVMs
|
||||||
|
self.icon_dispvm = 'dispvm-' + name
|
||||||
|
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def fromxml(cls, xml):
|
def fromxml(cls, xml):
|
||||||
'''Create label definition from XML node
|
'''Create label definition from XML node
|
||||||
|
|
||||||
:param :py:class:`lxml.etree._Element` xml: XML node reference
|
:param lxml.etree._Element xml: XML node reference
|
||||||
:rtype: :py:class:`qubes.QubesVmLabel`
|
:rtype: :py:class:`qubes.Label`
|
||||||
'''
|
'''
|
||||||
|
|
||||||
index = int(xml.get('id').split('-', 1)[1])
|
index = int(xml.get('id').split('-', 1)[1])
|
||||||
color = xml.get('color')
|
color = xml.get('color')
|
||||||
name = xml.text
|
name = xml.text
|
||||||
dispvm = ast.literal_eval(xml.get('dispvm', 'False'))
|
|
||||||
|
|
||||||
return cls(index, color, name, dispvm)
|
return cls(index, color, name)
|
||||||
|
|
||||||
|
|
||||||
|
def __xml__(self):
|
||||||
|
element = lxml.etree.Element('label', id='label-' + self.index, color=self.color)
|
||||||
|
element.text = self.name
|
||||||
|
return element
|
||||||
|
|
||||||
|
|
||||||
def __repr__(self):
|
def __repr__(self):
|
||||||
return '{}({!r}, {!r}, {!r}, dispvm={!r})'.format(
|
return '{}({!r}, {!r}, {!r}, dispvm={!r})'.format(
|
||||||
self.__class__.__name__,
|
self.__class__.__name__,
|
||||||
self.index,
|
self.index,
|
||||||
self.color,
|
self.color,
|
||||||
self.name,
|
self.name)
|
||||||
self.dispvm)
|
|
||||||
|
|
||||||
# self.icon_path is obsolete
|
|
||||||
# use QIcon.fromTheme(label.icon) where applicable
|
@__builtin__.property
|
||||||
@property
|
|
||||||
def icon_path(self):
|
def icon_path(self):
|
||||||
'''Icon path
|
'''Icon path
|
||||||
|
|
||||||
DEPRECATED --- use :py:meth:`PyQt4.QtGui.QIcon.fromTheme` and :py:attr:`QubesVmLabel.icon`'''
|
.. deprecated:: 2.0
|
||||||
|
use :py:meth:`PyQt4.QtGui.QIcon.fromTheme` and :py:attr:`icon`
|
||||||
|
'''
|
||||||
return os.path.join(system_path['qubes_icon_dir'], self.icon) + ".png"
|
return os.path.join(system_path['qubes_icon_dir'], self.icon) + ".png"
|
||||||
|
|
||||||
#: Globally defined labels
|
|
||||||
QubesVmLabels = {
|
|
||||||
"red": QubesVmLabel(1, "0xcc0000", "red" ),
|
|
||||||
"orange": QubesVmLabel(2, "0xf57900", "orange" ),
|
|
||||||
"yellow": QubesVmLabel(3, "0xedd400", "yellow" ),
|
|
||||||
"green": QubesVmLabel(4, "0x73d216", "green" ),
|
|
||||||
"gray": QubesVmLabel(5, "0x555753", "gray" ),
|
|
||||||
"blue": QubesVmLabel(6, "0x3465a4", "blue" ),
|
|
||||||
"purple": QubesVmLabel(7, "0x75507b", "purple" ),
|
|
||||||
"black": QubesVmLabel(8, "0x000000", "black" ),
|
|
||||||
}
|
|
||||||
|
|
||||||
#: Globally defined labels for :py:class:`qubes.vm.dispvm.DispVM` s
|
@__builtin__.property
|
||||||
QubesDispVmLabels = {
|
def icon_path_dispvm(self):
|
||||||
"red": QubesVmLabel(1, "0xcc0000", "red", dispvm=True),
|
'''Icon path
|
||||||
"orange": QubesVmLabel(2, "0xf57900", "orange", dispvm=True),
|
|
||||||
"yellow": QubesVmLabel(3, "0xedd400", "yellow", dispvm=True),
|
|
||||||
"green": QubesVmLabel(4, "0x73d216", "green", dispvm=True),
|
|
||||||
"gray": QubesVmLabel(5, "0x555753", "gray", dispvm=True),
|
|
||||||
"blue": QubesVmLabel(6, "0x3465a4", "blue", dispvm=True),
|
|
||||||
"purple": QubesVmLabel(7, "0x75507b", "purple", dispvm=True),
|
|
||||||
"black": QubesVmLabel(8, "0x000000", "black", dispvm=True),
|
|
||||||
}
|
|
||||||
|
|
||||||
|
.. deprecated:: 2.0
|
||||||
|
use :py:meth:`PyQt4.QtGui.QIcon.fromTheme` and :py:attr:`icon_dispvm`
|
||||||
|
'''
|
||||||
|
return os.path.join(system_path['qubes_icon_dir'], self.icon_dispvm) + ".png"
|
||||||
|
|
||||||
|
|
||||||
|
class VMCollection(object):
|
||||||
|
'''A collection of Qubes VMs
|
||||||
|
|
||||||
|
VMCollection supports ``in`` operator. You may test for ``qid``, ``name``
|
||||||
|
and whole VM object's presence.
|
||||||
|
|
||||||
|
Iterating over VMCollection will yield machine objects.
|
||||||
|
'''
|
||||||
|
|
||||||
|
def __init__(self, app):
|
||||||
|
self.app = app
|
||||||
|
self._dict = dict()
|
||||||
|
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
return '<{} {!r}>'.format(self.__class__.__name__, list(sorted(self.keys())))
|
||||||
|
|
||||||
|
|
||||||
|
def items(self):
|
||||||
|
'''Iterate over ``(qid, vm)`` pairs'''
|
||||||
|
for qid in self.qids():
|
||||||
|
yield (qid, self[qid])
|
||||||
|
|
||||||
|
|
||||||
|
def qids(self):
|
||||||
|
'''Iterate over all qids
|
||||||
|
|
||||||
|
qids are sorted by numerical order.
|
||||||
|
'''
|
||||||
|
|
||||||
|
return iter(sorted(self._dict.keys()))
|
||||||
|
|
||||||
|
keys = qids
|
||||||
|
|
||||||
|
|
||||||
|
def names(self):
|
||||||
|
'''Iterate over all names
|
||||||
|
|
||||||
|
names are sorted by lexical order.
|
||||||
|
'''
|
||||||
|
|
||||||
|
return iter(sorted(vm.name for vm in self._dict.values()))
|
||||||
|
|
||||||
|
|
||||||
|
def vms(self):
|
||||||
|
'''Iterate over all machines
|
||||||
|
|
||||||
|
vms are sorted by qid.
|
||||||
|
'''
|
||||||
|
|
||||||
|
return iter(sorted(self._dict.values()))
|
||||||
|
|
||||||
|
__iter__ = vms
|
||||||
|
values = vms
|
||||||
|
|
||||||
|
|
||||||
|
def add(self, value):
|
||||||
|
'''Add VM to collection
|
||||||
|
|
||||||
|
:param qubes.vm.BaseVM value: VM to add
|
||||||
|
:raises TypeError: when value is of wrong type
|
||||||
|
:raises ValueError: when there is already VM which has equal ``qid``
|
||||||
|
'''
|
||||||
|
|
||||||
|
# XXX this violates duck typing, should we do it?
|
||||||
|
if not isinstance(value, qubes.vm.BaseVM):
|
||||||
|
raise TypeError('{} holds only BaseVM instances'.format(self.__class__.__name__))
|
||||||
|
|
||||||
|
if value.qid in self:
|
||||||
|
raise ValueError('This collection already holds VM that has qid={!r} (!r)'.format(
|
||||||
|
value.qid, self[value.qid]))
|
||||||
|
if value.name in self:
|
||||||
|
raise ValueError('This collection already holds VM that has name={!r} (!r)'.format(
|
||||||
|
value.name, self[value.name]))
|
||||||
|
|
||||||
|
self._dict[value.qid] = value
|
||||||
|
|
||||||
|
|
||||||
|
def __getitem__(self, key):
|
||||||
|
if isinstance(key, int):
|
||||||
|
return self._dict[key]
|
||||||
|
|
||||||
|
if isinstance(key, basestring):
|
||||||
|
for vm in self:
|
||||||
|
if (vm.name == key):
|
||||||
|
return vm
|
||||||
|
raise KeyError(key)
|
||||||
|
|
||||||
|
if isinstance(key, qubes.vm.BaseVM):
|
||||||
|
if key in self:
|
||||||
|
return key
|
||||||
|
raise KeyError(key)
|
||||||
|
|
||||||
|
raise KeyError(key)
|
||||||
|
|
||||||
|
|
||||||
|
def __delitem__(self, key):
|
||||||
|
del self._dict[self[key].qid]
|
||||||
|
|
||||||
|
|
||||||
|
def __contains__(self, key):
|
||||||
|
return any((key == vm or key == vm.qid or key == vm.name) for vm in self)
|
||||||
|
|
||||||
|
|
||||||
|
def __len__(self):
|
||||||
|
return len(self._dict)
|
||||||
|
|
||||||
|
|
||||||
|
def get_vms_based_on(self, template):
|
||||||
|
template = self[template]
|
||||||
|
return set(vm for vm in self if vm.template == template)
|
||||||
|
|
||||||
|
|
||||||
|
def get_vms_connected_to(self, netvm):
|
||||||
|
new_vms = set([netvm])
|
||||||
|
dependend_vms = set()
|
||||||
|
|
||||||
|
# Dependency resolving only makes sense on NetVM (or derivative)
|
||||||
|
# if not self[netvm_qid].is_netvm():
|
||||||
|
# return set([])
|
||||||
|
|
||||||
|
while len(new_vms) > 0:
|
||||||
|
cur_vm = new_vms.pop()
|
||||||
|
for vm in cur_vm.connected_vms.values():
|
||||||
|
if vm in dependend_vms:
|
||||||
|
continue
|
||||||
|
dependend_vms.add(vm.qid)
|
||||||
|
# if vm.is_netvm():
|
||||||
|
new_vms.append(vm.qid)
|
||||||
|
|
||||||
|
return dependent_vms
|
||||||
|
|
||||||
|
|
||||||
|
# XXX with Qubes Admin Api this will probably lead to race condition
|
||||||
|
# whole process of creating and adding should be synchronised
|
||||||
|
def get_new_unused_qid(self):
|
||||||
|
used_ids = set(self.qids())
|
||||||
|
for i in range(1, MAX_QID):
|
||||||
|
if i not in used_ids:
|
||||||
|
return i
|
||||||
|
raise LookupError("Cannot find unused qid!")
|
||||||
|
|
||||||
|
|
||||||
|
def get_new_unused_netid(self):
|
||||||
|
used_ids = set([vm.netid for vm in self]) # if vm.is_netvm()])
|
||||||
|
for i in range(1, MAX_NETID):
|
||||||
|
if i not in used_ids:
|
||||||
|
return i
|
||||||
|
raise LookupError("Cannot find unused netid!")
|
||||||
|
|
||||||
|
|
||||||
|
class property(object):
|
||||||
|
'''Qubes property.
|
||||||
|
|
||||||
|
This class holds one property that can be saved to and loaded from
|
||||||
|
:file:`qubes.xml`. It is used for both global and per-VM properties.
|
||||||
|
|
||||||
|
:param str name: name of the property
|
||||||
|
:param collections.Callable setter: if not :py:obj:`None`, this is used to initialise value; first parameter to the function is holder instance and the second is value; this is called before ``type``
|
||||||
|
:param type type: if not :py:obj:`None`, value is coerced to this type
|
||||||
|
:param object default: default value
|
||||||
|
:param int load_stage: stage when property should be loaded (see :py:class:`Qubes` for description of stages)
|
||||||
|
:param int order: order of evaluation (bigger order values are later)
|
||||||
|
:param str doc: docstring; you may use RST markup
|
||||||
|
|
||||||
|
'''
|
||||||
|
|
||||||
|
def __init__(self, name, setter=None, type=None, default=None,
|
||||||
|
load_stage=2, order=0, save_via_ref=False, doc=None):
|
||||||
|
self.__name__ = name
|
||||||
|
self._setter = setter
|
||||||
|
self._type = type
|
||||||
|
self._default = default
|
||||||
|
self.order = order
|
||||||
|
self.load_stage = load_stage
|
||||||
|
self.save_via_ref = save_via_ref
|
||||||
|
self.__doc__ = doc
|
||||||
|
self._attr_name = '_qubesprop_' + name
|
||||||
|
|
||||||
|
|
||||||
|
def __get__(self, instance, owner):
|
||||||
|
# sys.stderr.write('{!r}.__get__({}, {!r})\n'.format(self.__name__, hex(id(instance)), owner))
|
||||||
|
if instance is None:
|
||||||
|
return self
|
||||||
|
|
||||||
|
# XXX this violates duck typing, shall we keep it?
|
||||||
|
if not isinstance(instance, PropertyHolder):
|
||||||
|
raise AttributeError(
|
||||||
|
'qubes.property should be used on qubes.PropertyHolder instances only')
|
||||||
|
|
||||||
|
# sys.stderr.write(' __get__ try\n')
|
||||||
|
try:
|
||||||
|
return getattr(instance, self._attr_name)
|
||||||
|
|
||||||
|
except AttributeError:
|
||||||
|
# sys.stderr.write(' __get__ except\n')
|
||||||
|
if self._default is None:
|
||||||
|
raise AttributeError('property {!r} not set'.format(self.__name__))
|
||||||
|
elif isinstance(self._default, collections.Callable):
|
||||||
|
return self._default(instance)
|
||||||
|
else:
|
||||||
|
return self._default
|
||||||
|
|
||||||
|
|
||||||
|
def __set__(self, instance, value):
|
||||||
|
if self._setter is not None:
|
||||||
|
value = self._setter(instance, self, value)
|
||||||
|
if self._type is not None:
|
||||||
|
value = self._type(value)
|
||||||
|
instance._init_property(self, value)
|
||||||
|
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
return '<{} object at {:#x} name={!r} default={!r}>'.format(
|
||||||
|
self.__class__.__name__, id(self), self.__name__, self._default)
|
||||||
|
|
||||||
|
|
||||||
|
def __hash__(self):
|
||||||
|
return hash(self.__name__)
|
||||||
|
|
||||||
|
|
||||||
|
def __eq__(self, other):
|
||||||
|
return self.__name__ == other.__name__
|
||||||
|
|
||||||
|
|
||||||
|
#
|
||||||
|
# some setters provided
|
||||||
|
#
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def forbidden(self, prop, value):
|
||||||
|
'''Property setter that forbids loading a property
|
||||||
|
|
||||||
|
This is used to effectively disable property in classes which inherit
|
||||||
|
unwanted property. When someone attempts to load such a property, it
|
||||||
|
|
||||||
|
:throws AttributeError: always
|
||||||
|
'''
|
||||||
|
|
||||||
|
raise AttributeError('setting {} property on {} instance is forbidden'.format(
|
||||||
|
prop.__name__, self.__class__.__name__))
|
||||||
|
|
||||||
|
|
||||||
|
class PropertyHolder(object):
|
||||||
|
'''Abstract class for holding :py:class:`qubes.property`'''
|
||||||
|
|
||||||
|
def __init__(self, xml, *args, **kwargs):
|
||||||
|
super(PropertyHolder, self).__init__(*args, **kwargs)
|
||||||
|
self.xml = xml
|
||||||
|
|
||||||
|
|
||||||
|
def get_props_list(self, load_stage=None):
|
||||||
|
'''List all properties attached to this VM
|
||||||
|
|
||||||
|
:param load_stage: Filter by load stage
|
||||||
|
:type load_stage: :py:func:`int` or :py:obj:`None`
|
||||||
|
'''
|
||||||
|
|
||||||
|
# sys.stderr.write('{!r}.get_props_list(load_stage={})\n'.format('self', load_stage))
|
||||||
|
props = set()
|
||||||
|
for class_ in self.__class__.__mro__:
|
||||||
|
props.update(prop for prop in class_.__dict__.values()
|
||||||
|
if isinstance(prop, property))
|
||||||
|
if load_stage is not None:
|
||||||
|
props = set(prop for prop in props
|
||||||
|
if prop.load_stage == load_stage)
|
||||||
|
# sys.stderr.write(' props={!r}\n'.format(props))
|
||||||
|
return sorted(props, key=lambda prop: (prop.load_stage, prop.order, prop.__name__))
|
||||||
|
|
||||||
|
|
||||||
|
def _init_property(self, prop, value):
|
||||||
|
'''Initialise property to a given value, without side effects.
|
||||||
|
|
||||||
|
:param qubes.property prop: property object of particular interest
|
||||||
|
:param value: value
|
||||||
|
'''
|
||||||
|
|
||||||
|
setattr(self, prop._attr_name, value)
|
||||||
|
|
||||||
|
|
||||||
|
def load_properties(self, load_stage=None):
|
||||||
|
'''Load properties from immediate children of XML node.
|
||||||
|
|
||||||
|
:param lxml.etree._Element xml: XML node reference
|
||||||
|
'''
|
||||||
|
|
||||||
|
# sys.stderr.write('<{}>.load_properties(load_stage={}) xml={!r}\n'.format(hex(id(self)), load_stage, self.xml))
|
||||||
|
|
||||||
|
all_names = set(prop.__name__ for prop in self.get_props_list(load_stage))
|
||||||
|
# sys.stderr.write(' all_names={!r}\n'.format(all_names))
|
||||||
|
for node in self.xml.xpath('./properties/property'):
|
||||||
|
name = node.get('name')
|
||||||
|
value = node.get('ref') or node.text
|
||||||
|
|
||||||
|
# sys.stderr.write(' load_properties name={!r} value={!r}\n'.format(name, value))
|
||||||
|
if not name in all_names:
|
||||||
|
raise AttributeError(
|
||||||
|
'No property {!r} found in {!r}'.format(
|
||||||
|
name, self.__class__))
|
||||||
|
|
||||||
|
setattr(self, name, value)
|
||||||
|
# sys.stderr.write(' load_properties return\n')
|
||||||
|
|
||||||
|
|
||||||
|
def save_properties(self, with_defaults=False):
|
||||||
|
'''Iterator that yields XML nodes representing set properties.
|
||||||
|
|
||||||
|
:param bool with_defaults: If :py:obj:`True`, then it also includes properties which were not set explicite, but have default values filled.
|
||||||
|
'''
|
||||||
|
|
||||||
|
# sys.stderr.write('{!r}.save_properties(with_defaults={})\n'.format(self, with_defaults))
|
||||||
|
|
||||||
|
properties = lxml.etree.Element('properties')
|
||||||
|
|
||||||
|
for prop in self.get_props_list():
|
||||||
|
try:
|
||||||
|
value = str(getattr(self, (prop.__name__ if with_defaults else prop._attr_name)))
|
||||||
|
except AttributeError, e:
|
||||||
|
# sys.stderr.write('AttributeError: {!s}\n'.format(e))
|
||||||
|
continue
|
||||||
|
|
||||||
|
element = lxml.etree.Element('property', name=prop.__name__)
|
||||||
|
if prop.save_via_ref:
|
||||||
|
element.set('ref', value)
|
||||||
|
else:
|
||||||
|
element.text = value
|
||||||
|
properties.append(element)
|
||||||
|
|
||||||
|
return properties
|
||||||
|
|
||||||
|
|
||||||
|
import qubes.vm.qubesvm
|
||||||
|
import qubes.vm.templatevm
|
||||||
|
|
||||||
|
|
||||||
|
class VMProperty(property):
|
||||||
|
'''Property that is referring to a VM
|
||||||
|
|
||||||
|
:param type vmclass: class that returned VM is supposed to be instance of
|
||||||
|
|
||||||
|
and all supported by :py:class:`property` with the exception of ``type`` and ``setter``
|
||||||
|
'''
|
||||||
|
|
||||||
|
def __init__(self, name, vmclass=qubes.vm.BaseVM, **kwargs):
|
||||||
|
if 'type' in kwargs:
|
||||||
|
raise TypeError("'type' keyword parameter is unsupported in {}".format(
|
||||||
|
self.__class__.__name__))
|
||||||
|
if 'setter' in kwargs:
|
||||||
|
raise TypeError("'setter' keyword parameter is unsupported in {}".format(
|
||||||
|
self.__class__.__name__))
|
||||||
|
super(VMProperty, self).__init__(name, **kwargs)
|
||||||
|
self.vmclass = vmclass
|
||||||
|
|
||||||
|
|
||||||
|
def __set__(self, instance, value):
|
||||||
|
vm = instance.app.domains[value]
|
||||||
|
if not isinstance(vm, self.vmclass):
|
||||||
|
raise TypeError('wrong VM class: domains[{!r}] if of type {!s} and not {!s}'.format(
|
||||||
|
value, vm.__class__.__name__, self.vmclass.__name__))
|
||||||
|
|
||||||
|
super(VMProperty, self).__set__(self, instance, vm)
|
||||||
|
|
||||||
|
|
||||||
|
class Qubes(PropertyHolder):
|
||||||
|
'''Main Qubes application
|
||||||
|
|
||||||
|
:param str store: path to ``qubes.xml``
|
||||||
|
|
||||||
|
The store is loaded in stages.
|
||||||
|
|
||||||
|
In the first stage there are loaded some basic features from store
|
||||||
|
(currently labels).
|
||||||
|
|
||||||
|
In the second stage stubs for all VMs are loaded. They are filled with
|
||||||
|
their basic properties, like ``qid`` and ``name``.
|
||||||
|
|
||||||
|
In the third stage all global properties are loaded. They often reference
|
||||||
|
VMs, like default netvm, so they should be filled after loading VMs.
|
||||||
|
|
||||||
|
In the fourth stage all remaining VM properties are loaded. They also need
|
||||||
|
all VMs loaded, because they represent dependencies between VMs like
|
||||||
|
aforementioned netvm.
|
||||||
|
|
||||||
|
In the fifth stage there are some fixups to ensure sane system operation.
|
||||||
|
'''
|
||||||
|
|
||||||
|
default_netvm = VMProperty('default_netvm', load_stage=3,
|
||||||
|
doc='Default NetVM for new AppVMs')
|
||||||
|
default_fw_netvm = VMProperty('default_fw_netvm', load_stage=3,
|
||||||
|
doc='Default NetVM for new ProxyVMs')
|
||||||
|
default_template = VMProperty('default_template', load_stage=3,
|
||||||
|
vmclass=qubes.vm.templatevm.TemplateVM,
|
||||||
|
doc='Default template for new AppVMs')
|
||||||
|
updatevm = VMProperty('updatevm', load_stage=3,
|
||||||
|
doc='Which VM to use as ``yum`` proxy for updating AdminVM and TemplateVMs')
|
||||||
|
clockvm = VMProperty('clockvm', load_stage=3,
|
||||||
|
doc='Which VM to use as NTP proxy for updating AdminVM')
|
||||||
|
default_kernel = property('default_kernel', load_stage=3,
|
||||||
|
doc='Which kernel to use when not overriden in VM')
|
||||||
|
|
||||||
|
|
||||||
|
def __init__(self, store='/var/lib/qubes/qubes.xml'):
|
||||||
|
#: collection of all VMs managed by this Qubes instance
|
||||||
|
self.domains = VMCollection()
|
||||||
|
|
||||||
|
#: collection of all available labels for VMs
|
||||||
|
self.labels = {}
|
||||||
|
|
||||||
|
self._store = store
|
||||||
|
|
||||||
|
try:
|
||||||
|
self.load()
|
||||||
|
except IOError:
|
||||||
|
self._init()
|
||||||
|
|
||||||
|
super(PropertyHolder, self).__init__(xml=lxml.etree.parse(self.qubes_store_file))
|
||||||
|
|
||||||
|
|
||||||
|
def _open_store(self):
|
||||||
|
if hasattr(self, '_storefd'):
|
||||||
|
return
|
||||||
|
|
||||||
|
self._storefd = open(self._store, 'r+')
|
||||||
|
|
||||||
|
if os.name == 'posix':
|
||||||
|
fcntl.lockf (self.qubes_store_file, fcntl.LOCK_EX)
|
||||||
|
elif os.name == 'nt':
|
||||||
|
overlapped = pywintypes.OVERLAPPED()
|
||||||
|
win32file.LockFileEx(win32file._get_osfhandle(self.qubes_store_file.fileno()),
|
||||||
|
win32con.LOCKFILE_EXCLUSIVE_LOCK, 0, -0x10000, overlapped)
|
||||||
|
|
||||||
|
|
||||||
|
def load(self):
|
||||||
|
'''
|
||||||
|
:throws EnvironmentError: failure on parsing store
|
||||||
|
:throws xml.parsers.expat.ExpatError: failure on parsing store
|
||||||
|
'''
|
||||||
|
self._open_store()
|
||||||
|
|
||||||
|
# stage 1: load labels
|
||||||
|
for node in self._xml.xpath('./labels/label'):
|
||||||
|
label = Label.fromxml(node)
|
||||||
|
self.labels[label.id] = label
|
||||||
|
|
||||||
|
# stage 2: load VMs
|
||||||
|
for node in self._xml.xpath('./domains/domain'):
|
||||||
|
cls = qubes.vm.load(node.get("class"))
|
||||||
|
vm = cls.fromxml(self, node)
|
||||||
|
self.domains.add(vm)
|
||||||
|
|
||||||
|
if not 0 in self.domains:
|
||||||
|
self.domains.add(qubes.vm.adminvm.AdminVM(self))
|
||||||
|
|
||||||
|
# stage 3: load global properties
|
||||||
|
self.load_properties(self.xml, load_stage=3)
|
||||||
|
|
||||||
|
# stage 4: fill all remaining VM properties
|
||||||
|
for vm in self.domains:
|
||||||
|
vm.load_properties(None, load_stage=4)
|
||||||
|
|
||||||
|
# stage 5: misc fixups
|
||||||
|
|
||||||
|
# if we have no default netvm, make first one the default
|
||||||
|
if not hasattr(self, 'default_netvm'):
|
||||||
|
for vm in self.domains:
|
||||||
|
if hasattr(vm, 'provides_network') and hasattr(vm, 'netvm'):
|
||||||
|
self.default_netvm = vm
|
||||||
|
break
|
||||||
|
|
||||||
|
if not hasattr(self, 'default_fw_netvm'):
|
||||||
|
for vm in self.domains:
|
||||||
|
if hasattr(vm, 'provides_network') and not hasattr(vm, 'netvm'):
|
||||||
|
self.default_netvm = vm
|
||||||
|
break
|
||||||
|
|
||||||
|
# first found template vm is the default
|
||||||
|
if not hasattr(self, 'default_template'):
|
||||||
|
for vm in self.domains:
|
||||||
|
if isinstance(vm, qubes.vm.templatevm.TemplateVM):
|
||||||
|
self.default_template = vm
|
||||||
|
break
|
||||||
|
|
||||||
|
# if there was no clockvm entry in qubes.xml, try to determine default:
|
||||||
|
# root of default NetVM chain
|
||||||
|
if not hasattr(self, 'clockvm') and hasattr(self, 'default_netvm'):
|
||||||
|
clockvm = self.default_netvm
|
||||||
|
# Find root of netvm chain
|
||||||
|
while clockvm.netvm is not None:
|
||||||
|
clockvm = clockvm.netvm
|
||||||
|
|
||||||
|
self.clockvm = clockvm
|
||||||
|
|
||||||
|
# Disable ntpd in ClockVM - to not conflict with ntpdate (both are
|
||||||
|
# using 123/udp port)
|
||||||
|
if hasattr(self, 'clockvm'):
|
||||||
|
self.clockvm.services['ntpd'] = False
|
||||||
|
|
||||||
|
|
||||||
|
def _init(self):
|
||||||
|
self._open_store()
|
||||||
|
|
||||||
|
self.labels = {
|
||||||
|
1: Label(1, '0xcc0000', 'red'),
|
||||||
|
2: Label(2, '0xf57900', 'orange'),
|
||||||
|
3: Label(3, '0xedd400', 'yellow'),
|
||||||
|
4: Label(4, '0x73d216', 'green'),
|
||||||
|
5: Label(5, '0x555753', 'gray'),
|
||||||
|
6: Label(6, '0x3465a4', 'blue'),
|
||||||
|
7: Label(7, '0x75507b', 'purple'),
|
||||||
|
8: Label(8, '0x000000', 'black'),
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def __del__(self):
|
||||||
|
# intentionally do not call explicit unlock to not unlock the file
|
||||||
|
# before all buffers are flushed
|
||||||
|
self._storefd.close()
|
||||||
|
del self._storefd
|
||||||
|
|
||||||
|
|
||||||
|
def __xml__(self):
|
||||||
|
element = lxml.etree.Element('qubes')
|
||||||
|
|
||||||
|
element.append(self.save_labels())
|
||||||
|
element.append(self.save_properties())
|
||||||
|
|
||||||
|
domains = lxml.etree.Element('domains')
|
||||||
|
for vm in self.domains:
|
||||||
|
domains.append(vm.__xml__())
|
||||||
|
element.append(domains)
|
||||||
|
|
||||||
|
return element
|
||||||
|
|
||||||
|
|
||||||
|
def save(self):
|
||||||
|
'''Save all data to qubes.xml
|
||||||
|
'''
|
||||||
|
self._storefd.seek(0)
|
||||||
|
self._storefd.truncate()
|
||||||
|
lxml.etree.ElementTree(self.__xml__()).write(
|
||||||
|
self._storefd, encoding='utf-8', pretty_print=True)
|
||||||
|
self._storefd.sync()
|
||||||
|
os.chmod(self._store, 0660)
|
||||||
|
os.chown(self._store, -1, grp.getgrnam('qubes').gr_gid)
|
||||||
|
|
||||||
|
|
||||||
|
def save_labels(self):
|
||||||
|
'''Serialise labels
|
||||||
|
|
||||||
|
:rtype: lxml.etree._Element
|
||||||
|
'''
|
||||||
|
|
||||||
|
labels = lxml.etree.Element('labels')
|
||||||
|
for label in self.labels:
|
||||||
|
labels.append(label.__xml__())
|
||||||
|
return labels
|
||||||
|
|
||||||
|
|
||||||
|
def add_new_vm(self, vm):
|
||||||
|
'''Add new Virtual Machine to colletion
|
||||||
|
|
||||||
|
'''
|
||||||
|
|
||||||
|
if not hasattr(vm, 'qid'):
|
||||||
|
vm.qid = self.domains.get_new_unused_qid()
|
||||||
|
|
||||||
|
self.domains.add(vm)
|
||||||
|
|
||||||
|
#
|
||||||
|
# XXX
|
||||||
|
# all this will be moved to an event handler
|
||||||
|
# and deduplicated with self.load()
|
||||||
|
#
|
||||||
|
|
||||||
|
# make first created NetVM the default one
|
||||||
|
if not hasattr(self, 'default_fw_netvm') \
|
||||||
|
and vm.provides_network \
|
||||||
|
and not hasattr(vm, 'netvm'):
|
||||||
|
self.default_fw_netvm = vm
|
||||||
|
|
||||||
|
if not hasattr(self, 'default_netvm') \
|
||||||
|
and vm.provides_network \
|
||||||
|
and hasattr(vm, 'netvm'):
|
||||||
|
self.default_netvm = vm
|
||||||
|
|
||||||
|
# make first created TemplateVM the default one
|
||||||
|
if not hasattr(self, 'default_template') \
|
||||||
|
and not hasattr(vm, 'template'):
|
||||||
|
self.default_template = vm
|
||||||
|
|
||||||
|
# make first created ProxyVM the UpdateVM
|
||||||
|
if not hasattr(self, 'default_netvm') \
|
||||||
|
and vm.provides_network \
|
||||||
|
and hasattr(vm, 'netvm'):
|
||||||
|
self.updatevm = vm
|
||||||
|
|
||||||
|
# by default ClockVM is the first NetVM
|
||||||
|
if not hasattr(self, 'clockvm') \
|
||||||
|
and vm.provides_network \
|
||||||
|
and hasattr(vm, 'netvm'):
|
||||||
|
self.default_clockvm = vm
|
||||||
|
|
||||||
|
# XXX don't know if it should return self
|
||||||
|
return vm
|
||||||
|
|
||||||
|
# XXX This was in QubesVmCollection, will be in an event
|
||||||
|
# def pop(self, qid):
|
||||||
|
# if self.default_netvm_qid == qid:
|
||||||
|
# self.default_netvm_qid = None
|
||||||
|
# if self.default_fw_netvm_qid == qid:
|
||||||
|
# self.default_fw_netvm_qid = None
|
||||||
|
# if self.clockvm_qid == qid:
|
||||||
|
# self.clockvm_qid = None
|
||||||
|
# if self.updatevm_qid == qid:
|
||||||
|
# self.updatevm_qid = None
|
||||||
|
# if self.default_template_qid == qid:
|
||||||
|
# self.default_template_qid = None
|
||||||
|
#
|
||||||
|
# return super(QubesVmCollection, self).pop(qid)
|
||||||
|
|
||||||
|
|
||||||
|
# load plugins
|
||||||
|
import qubes._pluginloader
|
||||||
|
@ -8,9 +8,6 @@ Main public classes
|
|||||||
.. autoclass:: BaseVM
|
.. autoclass:: BaseVM
|
||||||
:members:
|
:members:
|
||||||
:show-inheritance:
|
:show-inheritance:
|
||||||
.. autoclass:: property
|
|
||||||
:members:
|
|
||||||
:show-inheritance:
|
|
||||||
|
|
||||||
Helper classes and functions
|
Helper classes and functions
|
||||||
----------------------------
|
----------------------------
|
||||||
@ -57,57 +54,11 @@ import functools
|
|||||||
import sys
|
import sys
|
||||||
|
|
||||||
import dateutil.parser
|
import dateutil.parser
|
||||||
|
import lxml.etree
|
||||||
|
|
||||||
|
import qubes
|
||||||
import qubes.plugins
|
import qubes.plugins
|
||||||
|
|
||||||
class property(object):
|
|
||||||
'''Qubes VM property.
|
|
||||||
|
|
||||||
This class holds one property that can be saved and loaded from qubes.xml
|
|
||||||
|
|
||||||
:param str name: name of the property
|
|
||||||
:param object default: default value
|
|
||||||
:param type type: if not :py:obj:`None`, this is used to initialise value
|
|
||||||
:param int order: order of evaluation (bigger order values are later)
|
|
||||||
:param str doc: docstring
|
|
||||||
|
|
||||||
'''
|
|
||||||
|
|
||||||
def __init__(self, name, default=None, type=None, order=0, doc=None):
|
|
||||||
self.__name__ = name
|
|
||||||
self._default = default
|
|
||||||
self._type = type
|
|
||||||
self.order = order
|
|
||||||
self.__doc__ = doc
|
|
||||||
|
|
||||||
self._attr_name = '_qubesprop_' + self.__name__
|
|
||||||
|
|
||||||
def __get__(self, instance, owner):
|
|
||||||
if instance is None:
|
|
||||||
return self
|
|
||||||
|
|
||||||
try:
|
|
||||||
return getattr(instance, self._attr_name)
|
|
||||||
|
|
||||||
except AttributeError:
|
|
||||||
if self._default is None:
|
|
||||||
raise AttributeError('property not set')
|
|
||||||
else:
|
|
||||||
return self._default
|
|
||||||
|
|
||||||
def __set__(self, instance, value):
|
|
||||||
setattr(instance, self._attr_name,
|
|
||||||
(self._type(value) if self._type is not None else value))
|
|
||||||
|
|
||||||
def __repr__(self):
|
|
||||||
return '<{} object at {:#x} name={!r} default={!r}>'.format(
|
|
||||||
self.__class__.__name__, id(self), self.__name__, self._default)
|
|
||||||
|
|
||||||
def __hash__(self):
|
|
||||||
return hash(self.__name__)
|
|
||||||
|
|
||||||
def __eq__(self, other):
|
|
||||||
return self.__name__ == other.__name__
|
|
||||||
|
|
||||||
class VMPlugin(qubes.plugins.Plugin):
|
class VMPlugin(qubes.plugins.Plugin):
|
||||||
'''Metaclass for :py:class:`.BaseVM`'''
|
'''Metaclass for :py:class:`.BaseVM`'''
|
||||||
@ -115,7 +66,8 @@ class VMPlugin(qubes.plugins.Plugin):
|
|||||||
super(VMPlugin, cls).__init__(name, bases, dict_)
|
super(VMPlugin, cls).__init__(name, bases, dict_)
|
||||||
cls.__hooks__ = collections.defaultdict(list)
|
cls.__hooks__ = collections.defaultdict(list)
|
||||||
|
|
||||||
class BaseVM(object):
|
|
||||||
|
class BaseVM(qubes.PropertyHolder):
|
||||||
'''Base class for all VMs
|
'''Base class for all VMs
|
||||||
|
|
||||||
:param xml: xml node from which to deserialise
|
:param xml: xml node from which to deserialise
|
||||||
@ -128,53 +80,132 @@ class BaseVM(object):
|
|||||||
|
|
||||||
__metaclass__ = VMPlugin
|
__metaclass__ = VMPlugin
|
||||||
|
|
||||||
def get_props_list(self):
|
def __init__(self, app, xml=None, load_stage=2, services={}, devices=None, tags={}, **kwargs):
|
||||||
'''List all properties attached to this VM'''
|
self.app = app
|
||||||
props = set()
|
self.xml = xml
|
||||||
for class_ in self.__class__.__mro__:
|
self.services = services
|
||||||
props.update(prop for prop in class_.__dict__.values()
|
self.devices = collections.defaultdict(list) if devices is None else devices
|
||||||
if isinstance(prop, property))
|
self.tags = tags
|
||||||
return sorted(props, key=lambda prop: (prop.order, prop.__name__))
|
|
||||||
|
|
||||||
def __init__(self, xml):
|
all_names = set(prop.__name__ for prop in self.get_props_list(load_stage=2))
|
||||||
self._xml = xml
|
for key in kwargs:
|
||||||
|
if not key in all_names:
|
||||||
self.services = {}
|
|
||||||
self.devices = collections.defaultdict(list)
|
|
||||||
self.tags = {}
|
|
||||||
|
|
||||||
if self._xml is None:
|
|
||||||
return
|
|
||||||
|
|
||||||
# properties
|
|
||||||
all_names = set(prop.__name__ for prop in self.get_props_list())
|
|
||||||
for node in self._xml.xpath('.//property'):
|
|
||||||
name = node.get('name')
|
|
||||||
value = node.get('ref') or node.text
|
|
||||||
|
|
||||||
if not name in all_names:
|
|
||||||
raise AttributeError(
|
raise AttributeError(
|
||||||
'No property {!r} found in {!r}'.format(
|
'No property {!r} found in {!r}'.format(
|
||||||
name, self.__class__))
|
key, self.__class__))
|
||||||
|
setattr(self, key, kwargs[key])
|
||||||
|
|
||||||
setattr(self, name, value)
|
|
||||||
|
|
||||||
# tags
|
def add_new_vm(self, vm):
|
||||||
for node in self._xml.xpath('.//tag'):
|
'''Add new Virtual Machine to colletion
|
||||||
self.tags[node.get('name')] = node.text
|
|
||||||
|
'''
|
||||||
|
|
||||||
|
vm_cls = QubesVmClasses[vm_type]
|
||||||
|
if 'template' in kwargs:
|
||||||
|
if not vm_cls.is_template_compatible(kwargs['template']):
|
||||||
|
raise QubesException("Template not compatible with selected "
|
||||||
|
"VM type")
|
||||||
|
|
||||||
|
vm = vm_cls(qid=qid, collection=self, **kwargs)
|
||||||
|
if not self.verify_new_vm(vm):
|
||||||
|
raise QubesException("Wrong VM description!")
|
||||||
|
self[vm.qid] = vm
|
||||||
|
|
||||||
|
# make first created NetVM the default one
|
||||||
|
if self.default_fw_netvm_qid is None and vm.is_netvm():
|
||||||
|
self.set_default_fw_netvm(vm)
|
||||||
|
|
||||||
|
if self.default_netvm_qid is None and vm.is_proxyvm():
|
||||||
|
self.set_default_netvm(vm)
|
||||||
|
|
||||||
|
# make first created TemplateVM the default one
|
||||||
|
if self.default_template_qid is None and vm.is_template():
|
||||||
|
self.set_default_template(vm)
|
||||||
|
|
||||||
|
# make first created ProxyVM the UpdateVM
|
||||||
|
if self.updatevm_qid is None and vm.is_proxyvm():
|
||||||
|
self.set_updatevm_vm(vm)
|
||||||
|
|
||||||
|
# by default ClockVM is the first NetVM
|
||||||
|
if self.clockvm_qid is None and vm.is_netvm():
|
||||||
|
self.set_clockvm_vm(vm)
|
||||||
|
|
||||||
|
return vm
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def fromxml(cls, app, xml, load_stage=2):
|
||||||
|
'''Create VM from XML node
|
||||||
|
|
||||||
|
:param qubes.Qubes app: :py:class:`qubes.Qubes` application instance
|
||||||
|
:param lxml.etree._Element xml: XML node reference
|
||||||
|
:param int load_stage: do not change the default (2) unless you know, what you are doing
|
||||||
|
'''
|
||||||
|
|
||||||
|
# sys.stderr.write('{}.fromxml(app={!r}, xml={!r}, load_stage={})\n'.format(
|
||||||
|
# cls.__name__, app, xml, load_stage))
|
||||||
|
if xml is None:
|
||||||
|
return cls(app)
|
||||||
|
|
||||||
|
services = {}
|
||||||
|
devices = collections.defaultdict(list)
|
||||||
|
tags = {}
|
||||||
|
|
||||||
# services
|
# services
|
||||||
for node in self._xml.xpath('.//service'):
|
for node in xml.xpath('./services/service'):
|
||||||
self.services[node.text] = bool(ast.literal_eval(node.get('enabled', 'True')))
|
services[node.text] = bool(ast.literal_eval(node.get('enabled', 'True')))
|
||||||
|
|
||||||
# devices (pci, usb, ...)
|
# devices (pci, usb, ...)
|
||||||
for parent in self._xml.xpath('.//devices'):
|
for parent in xml.xpath('./devices'):
|
||||||
devclass = parent.get('class')
|
devclass = parent.get('class')
|
||||||
for node in parent.xpath('./device'):
|
for node in parent.xpath('./device'):
|
||||||
self.devices[devclass].append(node.text)
|
devices[devclass].append(node.text)
|
||||||
|
|
||||||
# firewall
|
# tags
|
||||||
#TODO
|
for node in xml.xpath('./tags/tag'):
|
||||||
|
tags[node.get('name')] = node.text
|
||||||
|
|
||||||
|
# properties
|
||||||
|
self = cls(app, xml=xml, services=services, devices=devices, tags=tags)
|
||||||
|
self.load_properties(load_stage=load_stage)
|
||||||
|
|
||||||
|
# TODO: firewall, policy
|
||||||
|
|
||||||
|
# sys.stderr.write('{}.fromxml return\n'.format(cls.__name__))
|
||||||
|
return self
|
||||||
|
|
||||||
|
|
||||||
|
def __xml__(self):
|
||||||
|
element = lxml.etree.Element('domain', id='domain-' + str(self.qid))
|
||||||
|
|
||||||
|
element.append(self.save_properties())
|
||||||
|
|
||||||
|
services = lxml.etree.Element('services')
|
||||||
|
for service in self.services:
|
||||||
|
node = lxml.etree.Element('service')
|
||||||
|
node.text = service
|
||||||
|
if not self.services[service]:
|
||||||
|
node.set('enabled', 'False')
|
||||||
|
services.append(node)
|
||||||
|
element.append(services)
|
||||||
|
|
||||||
|
for devclass in self.devices:
|
||||||
|
devices = lxml.etree.Element('devices')
|
||||||
|
devices.set('class', devclass)
|
||||||
|
for device in self.devices[devclass]:
|
||||||
|
node = lxml.etree.Element('device')
|
||||||
|
node.text = device
|
||||||
|
devices.append(node)
|
||||||
|
element.append(devices)
|
||||||
|
|
||||||
|
tags = lxml.etree.Element('tags')
|
||||||
|
for tag in self.tags:
|
||||||
|
node = lxml.etree.Element('tag', name=tag)
|
||||||
|
node.text = self.tags[tag]
|
||||||
|
tags.append(node)
|
||||||
|
element.append(tags)
|
||||||
|
|
||||||
|
return element
|
||||||
|
|
||||||
def __repr__(self):
|
def __repr__(self):
|
||||||
return '<{} object at {:#x} {}>'.format(
|
return '<{} object at {:#x} {}>'.format(
|
||||||
@ -182,6 +213,7 @@ class BaseVM(object):
|
|||||||
' '.join('{}={}'.format(prop.__name__, getattr(self, prop.__name__))
|
' '.join('{}={}'.format(prop.__name__, getattr(self, prop.__name__))
|
||||||
for prop in self.get_props_list()))
|
for prop in self.get_props_list()))
|
||||||
|
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def add_hook(cls, event, f):
|
def add_hook(cls, event, f):
|
||||||
'''Add hook to entire VM class and all subclasses
|
'''Add hook to entire VM class and all subclasses
|
||||||
@ -195,6 +227,7 @@ class BaseVM(object):
|
|||||||
|
|
||||||
cls.__hooks__[event].append(f)
|
cls.__hooks__[event].append(f)
|
||||||
|
|
||||||
|
|
||||||
def fire_hooks(self, event, *args, **kwargs):
|
def fire_hooks(self, event, *args, **kwargs):
|
||||||
'''Fire hooks associated with an event
|
'''Fire hooks associated with an event
|
||||||
|
|
||||||
|
@ -1,8 +1,22 @@
|
|||||||
#!/usr/bin/python2 -O
|
#!/usr/bin/python2 -O
|
||||||
|
|
||||||
|
import qubes
|
||||||
import qubes.vm
|
import qubes.vm
|
||||||
|
|
||||||
class QubesVM(qubes.vm.BaseVM):
|
class QubesVM(qubes.vm.BaseVM):
|
||||||
'''Base functionality of Qubes VM shared between all VMs.'''
|
'''Base functionality of Qubes VM shared between all VMs.'''
|
||||||
def __init__(self, D):
|
|
||||||
super(QubesVM, self).__init__(D)
|
label = qubes.property('label',
|
||||||
|
setter=(lambda self, prop, value: self.app.labels[int(value.rsplit('-', 1)[1])]),
|
||||||
|
doc='Colourful label assigned to VM. This is where you set the colour of the padlock.')
|
||||||
|
|
||||||
|
netvm = qubes.property('netvm', load_stage=4,
|
||||||
|
default=(lambda self: self.app.default_fw_netvm if self.provides_network
|
||||||
|
else self.app.default_fw_netvm),
|
||||||
|
doc='VM that provides network connection to this domain. '
|
||||||
|
'When :py:obj:`False`, machine is disconnected. '
|
||||||
|
'When :py:obj:`None` (or absent), domain uses default NetVM.')
|
||||||
|
|
||||||
|
provides_network = qubes.property('provides_network',
|
||||||
|
type=bool,
|
||||||
|
doc=':py:obj:`True` if it is NetVM or ProxyVM, false otherwise')
|
||||||
|
@ -1,8 +1,13 @@
|
|||||||
#!/usr/bin/python2 -O
|
#!/usr/bin/python2 -O
|
||||||
|
|
||||||
|
import qubes
|
||||||
import qubes.vm.qubesvm
|
import qubes.vm.qubesvm
|
||||||
|
|
||||||
class TemplateVM(qubes.vm.qubesvm.QubesVM):
|
class TemplateVM(qubes.vm.qubesvm.QubesVM):
|
||||||
'''Template for AppVM'''
|
'''Template for AppVM'''
|
||||||
|
|
||||||
|
template = qubes.property('template',
|
||||||
|
setter=qubes.property.forbidden)
|
||||||
|
|
||||||
def __init__(self, D):
|
def __init__(self, D):
|
||||||
super(TemplateVM, self).__init__(D)
|
super(TemplateVM, self).__init__(D)
|
||||||
|
184
tests/init.py
184
tests/init.py
@ -7,9 +7,19 @@ import lxml.etree
|
|||||||
|
|
||||||
sys.path.insert(0, '../')
|
sys.path.insert(0, '../')
|
||||||
import qubes
|
import qubes
|
||||||
|
import qubes.vm
|
||||||
|
|
||||||
class TC_QubesVmLabel(unittest.TestCase):
|
class TC_10_Label(unittest.TestCase):
|
||||||
def test_000_appvm(self):
|
def test_000_constructor(self):
|
||||||
|
label = qubes.Label(1, '#cc0000', 'red')
|
||||||
|
|
||||||
|
self.assertEqual(label.index, 1)
|
||||||
|
self.assertEqual(label.color, '#cc0000')
|
||||||
|
self.assertEqual(label.name, 'red')
|
||||||
|
self.assertEqual(label.icon, 'appvm-red')
|
||||||
|
self.assertEqual(label.icon_dispvm, 'dispvm-red')
|
||||||
|
|
||||||
|
def test_001_fromxml(self):
|
||||||
xml = lxml.etree.XML('''
|
xml = lxml.etree.XML('''
|
||||||
<qubes version="3">
|
<qubes version="3">
|
||||||
<labels>
|
<labels>
|
||||||
@ -19,25 +29,175 @@ class TC_QubesVmLabel(unittest.TestCase):
|
|||||||
''')
|
''')
|
||||||
|
|
||||||
node = xml.xpath('//label')[0]
|
node = xml.xpath('//label')[0]
|
||||||
label = qubes.QubesVmLabel.fromxml(node)
|
label = qubes.Label.fromxml(node)
|
||||||
|
|
||||||
self.assertEqual(label.index, 1)
|
self.assertEqual(label.index, 1)
|
||||||
self.assertEqual(label.color, '#cc0000')
|
self.assertEqual(label.color, '#cc0000')
|
||||||
self.assertEqual(label.name, 'red')
|
self.assertEqual(label.name, 'red')
|
||||||
self.assertEqual(label.dispvm, False)
|
|
||||||
self.assertEqual(label.icon, 'appvm-red')
|
self.assertEqual(label.icon, 'appvm-red')
|
||||||
|
self.assertEqual(label.icon_dispvm, 'dispvm-red')
|
||||||
|
|
||||||
def test_001_dispvm(self):
|
|
||||||
|
class TestHolder(qubes.PropertyHolder):
|
||||||
|
testprop1 = qubes.property('testprop1', order=0)
|
||||||
|
testprop2 = qubes.property('testprop2', order=1, save_via_ref=True)
|
||||||
|
testprop3 = qubes.property('testprop3', order=2, default='testdefault')
|
||||||
|
testprop4 = qubes.property('testprop4', order=3)
|
||||||
|
|
||||||
|
class TC_00_PropertyHolder(unittest.TestCase):
|
||||||
|
def assertXMLEqual(self, xml1, xml2):
|
||||||
|
self.assertEqual(xml1.tag, xml2.tag)
|
||||||
|
self.assertEqual(xml1.text, xml2.text)
|
||||||
|
self.assertEqual(sorted(xml1.keys()), sorted(xml2.keys()))
|
||||||
|
for key in xml1.keys():
|
||||||
|
self.assertEqual(xml1.get(key), xml2.get(key))
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
xml = lxml.etree.XML('''
|
xml = lxml.etree.XML('''
|
||||||
<qubes version="3">
|
<qubes version="3">
|
||||||
<labels>
|
<properties>
|
||||||
<label id="label-2" color="#cc0000" dispvm="True">red</label>
|
<property name="testprop1">testvalue1</property>
|
||||||
</labels>
|
<property name="testprop2" ref="testref2" />
|
||||||
|
</properties>
|
||||||
</qubes>
|
</qubes>
|
||||||
''')
|
''')
|
||||||
|
|
||||||
node = xml.xpath('//label')[0]
|
self.holder = TestHolder(xml)
|
||||||
label = qubes.QubesVmLabel.fromxml(node)
|
|
||||||
|
|
||||||
self.assertEqual(label.dispvm, True)
|
def test_000_load_properties(self):
|
||||||
self.assertEqual(label.icon, 'dispvm-red')
|
self.holder.load_properties()
|
||||||
|
self.assertEquals(self.holder.testprop1, 'testvalue1')
|
||||||
|
self.assertEquals(self.holder.testprop2, 'testref2')
|
||||||
|
self.assertEquals(self.holder.testprop3, 'testdefault')
|
||||||
|
|
||||||
|
with self.assertRaises(AttributeError):
|
||||||
|
self.holder.testprop4
|
||||||
|
|
||||||
|
def test_001_save_properties(self):
|
||||||
|
self.holder.load_properties()
|
||||||
|
|
||||||
|
elements = self.holder.save_properties()
|
||||||
|
elements_with_defaults = self.holder.save_properties(with_defaults=True)
|
||||||
|
|
||||||
|
self.assertEqual(len(elements), 2)
|
||||||
|
self.assertEqual(len(elements_with_defaults), 3)
|
||||||
|
|
||||||
|
expected_prop1 = lxml.etree.Element('property', name='testprop1')
|
||||||
|
expected_prop1.text = 'testvalue1'
|
||||||
|
self.assertXMLEqual(elements_with_defaults[0], expected_prop1)
|
||||||
|
|
||||||
|
expected_prop2 = lxml.etree.Element('property', name='testprop2', ref='testref2')
|
||||||
|
self.assertXMLEqual(elements_with_defaults[1], expected_prop2)
|
||||||
|
|
||||||
|
expected_prop3 = lxml.etree.Element('property', name='testprop3')
|
||||||
|
expected_prop3.text = 'testdefault'
|
||||||
|
self.assertXMLEqual(elements_with_defaults[2], expected_prop3)
|
||||||
|
|
||||||
|
|
||||||
|
class TestVM(qubes.vm.BaseVM):
|
||||||
|
qid = qubes.property('qid', type=int)
|
||||||
|
name = qubes.property('name')
|
||||||
|
netid = qid
|
||||||
|
|
||||||
|
class TC_11_VMCollection(unittest.TestCase):
|
||||||
|
def setUp(self):
|
||||||
|
# XXX passing None may be wrong in the future
|
||||||
|
self.vms = qubes.VMCollection(None)
|
||||||
|
|
||||||
|
self.testvm1 = TestVM(None, qid=1, name='testvm1')
|
||||||
|
self.testvm2 = TestVM(None, qid=2, name='testvm2')
|
||||||
|
|
||||||
|
def test_000_contains(self):
|
||||||
|
self.vms._dict = {1: self.testvm1}
|
||||||
|
|
||||||
|
self.assertIn(1, self.vms)
|
||||||
|
self.assertIn('testvm1', self.vms)
|
||||||
|
self.assertIn(self.testvm1, self.vms)
|
||||||
|
|
||||||
|
self.assertNotIn(2, self.vms)
|
||||||
|
self.assertNotIn('testvm2', self.vms)
|
||||||
|
self.assertNotIn(self.testvm2, self.vms)
|
||||||
|
|
||||||
|
def test_001_getitem(self):
|
||||||
|
self.vms._dict = {1: self.testvm1}
|
||||||
|
|
||||||
|
self.assertIs(self.vms[1], self.testvm1)
|
||||||
|
self.assertIs(self.vms['testvm1'], self.testvm1)
|
||||||
|
self.assertIs(self.vms[self.testvm1], self.testvm1)
|
||||||
|
|
||||||
|
def test_002_add(self):
|
||||||
|
self.vms.add(self.testvm1)
|
||||||
|
self.assertIn(1, self.vms)
|
||||||
|
|
||||||
|
with self.assertRaises(TypeError):
|
||||||
|
self.vms.add(object())
|
||||||
|
|
||||||
|
testvm_qid_collision = TestVM(None, name='testvm2', qid=1)
|
||||||
|
testvm_name_collision = TestVM(None, name='testvm1', qid=2)
|
||||||
|
|
||||||
|
with self.assertRaises(ValueError):
|
||||||
|
self.vms.add(testvm_qid_collision)
|
||||||
|
with self.assertRaises(ValueError):
|
||||||
|
self.vms.add(testvm_name_collision)
|
||||||
|
|
||||||
|
def test_003_qids(self):
|
||||||
|
self.vms.add(self.testvm1)
|
||||||
|
self.vms.add(self.testvm2)
|
||||||
|
|
||||||
|
self.assertItemsEqual(self.vms.qids(), [1, 2])
|
||||||
|
self.assertItemsEqual(self.vms.keys(), [1, 2])
|
||||||
|
|
||||||
|
def test_004_names(self):
|
||||||
|
self.vms.add(self.testvm1)
|
||||||
|
self.vms.add(self.testvm2)
|
||||||
|
|
||||||
|
self.assertItemsEqual(self.vms.names(), ['testvm1', 'testvm2'])
|
||||||
|
|
||||||
|
def test_005_vms(self):
|
||||||
|
self.vms.add(self.testvm1)
|
||||||
|
self.vms.add(self.testvm2)
|
||||||
|
|
||||||
|
self.assertItemsEqual(self.vms.vms(), [self.testvm1, self.testvm2])
|
||||||
|
self.assertItemsEqual(self.vms.values(), [self.testvm1, self.testvm2])
|
||||||
|
|
||||||
|
def test_006_items(self):
|
||||||
|
self.vms.add(self.testvm1)
|
||||||
|
self.vms.add(self.testvm2)
|
||||||
|
|
||||||
|
self.assertItemsEqual(self.vms.items(), [(1, self.testvm1), (2, self.testvm2)])
|
||||||
|
|
||||||
|
def test_007_len(self):
|
||||||
|
self.vms.add(self.testvm1)
|
||||||
|
self.vms.add(self.testvm2)
|
||||||
|
|
||||||
|
self.assertEqual(len(self.vms), 2)
|
||||||
|
|
||||||
|
def test_008_delitem(self):
|
||||||
|
self.vms.add(self.testvm1)
|
||||||
|
self.vms.add(self.testvm2)
|
||||||
|
|
||||||
|
del self.vms['testvm2']
|
||||||
|
|
||||||
|
self.assertItemsEqual(self.vms.vms(), [self.testvm1])
|
||||||
|
|
||||||
|
def test_100_get_new_unused_qid(self):
|
||||||
|
self.vms.add(self.testvm1)
|
||||||
|
self.vms.add(self.testvm2)
|
||||||
|
|
||||||
|
self.vms.get_new_unused_qid()
|
||||||
|
|
||||||
|
def test_101_get_new_unused_netid(self):
|
||||||
|
self.vms.add(self.testvm1)
|
||||||
|
self.vms.add(self.testvm2)
|
||||||
|
|
||||||
|
self.vms.get_new_unused_netid()
|
||||||
|
|
||||||
|
# def test_200_get_vms_based_on(self):
|
||||||
|
# pass
|
||||||
|
|
||||||
|
# def test_201_get_vms_connected_to(self):
|
||||||
|
# pass
|
||||||
|
|
||||||
|
|
||||||
|
class TC_20_Qubes(unittest.TestCase):
|
||||||
|
pass
|
||||||
|
23
tests/vm.py
23
tests/vm.py
@ -8,10 +8,13 @@ import lxml.etree
|
|||||||
sys.path.insert(0, '../')
|
sys.path.insert(0, '../')
|
||||||
import qubes.vm
|
import qubes.vm
|
||||||
|
|
||||||
|
|
||||||
class TestVM(qubes.vm.BaseVM):
|
class TestVM(qubes.vm.BaseVM):
|
||||||
testprop = qubes.vm.property('testprop')
|
qid = qubes.property('qid', type=int)
|
||||||
testlabel = qubes.vm.property('testlabel')
|
name = qubes.property('name')
|
||||||
defaultprop = qubes.vm.property('defaultprop', default='defaultvalue')
|
testprop = qubes.property('testprop')
|
||||||
|
testlabel = qubes.property('testlabel')
|
||||||
|
defaultprop = qubes.property('defaultprop', default='defaultvalue')
|
||||||
|
|
||||||
class TC_BaseVM(unittest.TestCase):
|
class TC_BaseVM(unittest.TestCase):
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
@ -24,6 +27,8 @@ class TC_BaseVM(unittest.TestCase):
|
|||||||
<domains>
|
<domains>
|
||||||
<domain id="domain-1" class="TestVM">
|
<domain id="domain-1" class="TestVM">
|
||||||
<properties>
|
<properties>
|
||||||
|
<property name="qid">1</property>
|
||||||
|
<property name="name">domain1</property>
|
||||||
<property name="testprop">testvalue</property>
|
<property name="testprop">testvalue</property>
|
||||||
<property name="testlabel" ref="label-1" />
|
<property name="testlabel" ref="label-1" />
|
||||||
</properties>
|
</properties>
|
||||||
@ -54,8 +59,10 @@ class TC_BaseVM(unittest.TestCase):
|
|||||||
|
|
||||||
def test_000_BaseVM_load(self):
|
def test_000_BaseVM_load(self):
|
||||||
node = self.xml.xpath('//domain')[0]
|
node = self.xml.xpath('//domain')[0]
|
||||||
vm = TestVM(node)
|
vm = TestVM.fromxml(None, node)
|
||||||
|
|
||||||
|
self.assertEqual(vm.qid, 1)
|
||||||
|
self.assertEqual(vm.testprop, 'testvalue')
|
||||||
self.assertEqual(vm.testprop, 'testvalue')
|
self.assertEqual(vm.testprop, 'testvalue')
|
||||||
self.assertEqual(vm.testlabel, 'label-1')
|
self.assertEqual(vm.testlabel, 'label-1')
|
||||||
self.assertEqual(vm.defaultprop, 'defaultvalue')
|
self.assertEqual(vm.defaultprop, 'defaultvalue')
|
||||||
@ -67,6 +74,8 @@ class TC_BaseVM(unittest.TestCase):
|
|||||||
'disabledservice': False,
|
'disabledservice': False,
|
||||||
})
|
})
|
||||||
|
|
||||||
|
lxml.etree.ElementTree(vm.__xml__()).write(sys.stderr, encoding='utf-8', pretty_print=True)
|
||||||
|
|
||||||
def test_001_BaseVM_nxproperty(self):
|
def test_001_BaseVM_nxproperty(self):
|
||||||
xml = lxml.etree.XML('''
|
xml = lxml.etree.XML('''
|
||||||
<qubes version="3">
|
<qubes version="3">
|
||||||
@ -82,7 +91,5 @@ class TC_BaseVM(unittest.TestCase):
|
|||||||
|
|
||||||
node = xml.xpath('//domain')[0]
|
node = xml.xpath('//domain')[0]
|
||||||
|
|
||||||
def f():
|
with self.assertRaises(AttributeError):
|
||||||
vm = TestVM(node)
|
TestVM.fromxml(None, node)
|
||||||
|
|
||||||
self.assertRaises(AttributeError, f)
|
|
||||||
|
Loading…
Reference in New Issue
Block a user