36bd834c01
Core3 keep information whether property have default value for all the properties (not only few like netvm or kernel). Try to use this feature as much as possible.
282 lines
10 KiB
Python
282 lines
10 KiB
Python
#!/usr/bin/python
|
|
# -*- coding: utf-8 -*-
|
|
#
|
|
# The Qubes OS Project, http://www.qubes-os.org
|
|
#
|
|
# Copyright (C) 2016 Marek Marczykowski-Górecki
|
|
# <marmarek@invisiblethingslab.com>
|
|
#
|
|
# This program is free software; you can redistribute it and/or
|
|
# modify it under the terms of the GNU General Public License
|
|
# as published by the Free Software Foundation; either version 2
|
|
# of the License, or (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>
|
|
#
|
|
|
|
import ast
|
|
import xml.parsers.expat
|
|
|
|
import lxml.etree
|
|
|
|
import qubes
|
|
import qubes.vm.appvm
|
|
import qubes.vm.standalonevm
|
|
import qubes.vm.templatevm
|
|
import qubes.vm.adminvm
|
|
import qubes.ext.r3compatibility
|
|
|
|
|
|
class AppVM(qubes.vm.appvm.AppVM):
|
|
"""core2 compatibility AppVM class, with variable dir_path"""
|
|
dir_path = qubes.property('dir_path',
|
|
# pylint: disable=undefined-variable
|
|
default=(lambda self: super(AppVM, self).dir_path),
|
|
saver=qubes.property.dontsave,
|
|
doc="VM storage directory",
|
|
)
|
|
|
|
def is_running(self):
|
|
return False
|
|
|
|
class StandaloneVM(qubes.vm.standalonevm.StandaloneVM):
|
|
"""core2 compatibility StandaloneVM class, with variable dir_path"""
|
|
dir_path = qubes.property('dir_path',
|
|
# pylint: disable=undefined-variable
|
|
default=(lambda self: super(StandaloneVM, self).dir_path),
|
|
saver=qubes.property.dontsave,
|
|
doc="VM storage directory")
|
|
|
|
def is_running(self):
|
|
return False
|
|
|
|
|
|
class Core2Qubes(qubes.Qubes):
|
|
|
|
def __init__(self, store=None, load=True, **kwargs):
|
|
if store is None:
|
|
raise ValueError("store path required")
|
|
super(Core2Qubes, self).__init__(store, load, **kwargs)
|
|
|
|
def load_default_template(self, element):
|
|
default_template = element.get("default_template")
|
|
self.default_template = int(default_template) \
|
|
if default_template.lower() != "none" else None
|
|
|
|
def load_globals(self, element):
|
|
default_netvm = element.get("default_netvm")
|
|
if default_netvm is not None:
|
|
self.default_netvm = int(default_netvm) \
|
|
if default_netvm != "None" else None
|
|
|
|
default_fw_netvm = element.get("default_fw_netvm")
|
|
if default_fw_netvm is not None:
|
|
self.default_fw_netvm = int(default_fw_netvm) \
|
|
if default_fw_netvm != "None" else None
|
|
|
|
updatevm = element.get("updatevm")
|
|
if updatevm is not None:
|
|
self.updatevm = int(updatevm) \
|
|
if updatevm != "None" else None
|
|
|
|
clockvm = element.get("clockvm")
|
|
if clockvm is not None:
|
|
self.clockvm = int(clockvm) \
|
|
if clockvm != "None" else None
|
|
|
|
|
|
def set_netvm_dependency(self, element):
|
|
kwargs = {}
|
|
attr_list = ("qid", "uses_default_netvm", "netvm_qid")
|
|
|
|
for attribute in attr_list:
|
|
kwargs[attribute] = element.get(attribute)
|
|
|
|
vm = self.domains[int(kwargs["qid"])]
|
|
|
|
if element.get("uses_default_netvm") is None:
|
|
uses_default_netvm = True
|
|
else:
|
|
uses_default_netvm = (
|
|
True if element.get("uses_default_netvm") == "True" else False)
|
|
if not uses_default_netvm:
|
|
netvm_qid = element.get("netvm_qid")
|
|
if netvm_qid is None or netvm_qid == "none":
|
|
vm.netvm = None
|
|
else:
|
|
vm.netvm = int(netvm_qid)
|
|
|
|
def set_dispvm_netvm_dependency(self, element):
|
|
kwargs = {}
|
|
attr_list = ("qid", "uses_default_netvm", "netvm_qid")
|
|
|
|
for attribute in attr_list:
|
|
kwargs[attribute] = element.get(attribute)
|
|
|
|
vm = self.domains[int(kwargs["qid"])]
|
|
|
|
if element.get("uses_default_dispvm_netvm") is None:
|
|
uses_default_dispvm_netvm = True
|
|
else:
|
|
uses_default_dispvm_netvm = (
|
|
True if element.get("uses_default_dispvm_netvm") == "True"
|
|
else False)
|
|
if not uses_default_dispvm_netvm:
|
|
dispvm_netvm_qid = element.get("dispvm_netvm_qid")
|
|
if dispvm_netvm_qid is None or dispvm_netvm_qid == "none":
|
|
dispvm_netvm = None
|
|
else:
|
|
dispvm_netvm = self.domains[int(dispvm_netvm_qid)]
|
|
else:
|
|
dispvm_netvm = vm.netvm
|
|
|
|
if dispvm_netvm:
|
|
dispvm_tpl_name = 'disp-{}'.format(dispvm_netvm.name)
|
|
else:
|
|
dispvm_tpl_name = 'disp-no-netvm'
|
|
|
|
if dispvm_tpl_name not in self.domains:
|
|
vm = self.add_new_vm(qubes.vm.appvm.AppVM,
|
|
name=dispvm_tpl_name)
|
|
# TODO: add support for #2075
|
|
# TODO: set qrexec policy based on dispvm_netvm value
|
|
|
|
def import_core2_vm(self, element):
|
|
vm_class_name = element.tag
|
|
try:
|
|
kwargs = {}
|
|
if vm_class_name in ["QubesTemplateVm", "QubesTemplateHVm"]:
|
|
vm_class = qubes.vm.templatevm.TemplateVM
|
|
elif element.get('template_qid').lower() == "none":
|
|
kwargs['dir_path'] = element.get('dir_path')
|
|
vm_class = StandaloneVM
|
|
else:
|
|
kwargs['dir_path'] = element.get('dir_path')
|
|
kwargs['template'] = self.domains[int(element.get(
|
|
'template_qid'))]
|
|
vm_class = AppVM
|
|
# simple attributes
|
|
for attr, default in {
|
|
'installed_by_rpm': 'False',
|
|
'include_in_backups': 'True',
|
|
'qrexec_timeout': '60',
|
|
'internal': 'False',
|
|
'label': None,
|
|
'name': None,
|
|
'vcpus': '2',
|
|
'memory': '400',
|
|
'maxmem': '4000',
|
|
'default_user': 'user',
|
|
'debug': 'False',
|
|
'pci_strictreset': 'True',
|
|
'mac': None,
|
|
'autostart': 'False'}.items():
|
|
value = element.get(attr)
|
|
if value and value != default:
|
|
kwargs[attr] = value
|
|
# attributes with default value
|
|
for attr in ["kernel", "kernelopts"]:
|
|
value = element.get(attr)
|
|
if value and value.lower() == "none":
|
|
value = None
|
|
value_is_default = element.get(
|
|
"uses_default_{}".format(attr))
|
|
if value_is_default and value_is_default.lower() != \
|
|
"true":
|
|
kwargs[attr] = value
|
|
kwargs['hvm'] = "HVm" in vm_class_name
|
|
kwargs['provides_network'] = \
|
|
vm_class_name in ('QubesNetVm', 'QubesProxyVm')
|
|
if vm_class_name == 'QubesNetVm':
|
|
kwargs['netvm'] = None
|
|
vm = self.add_new_vm(vm_class,
|
|
qid=int(element.get('qid')), **kwargs)
|
|
services = element.get('services')
|
|
if services:
|
|
services = ast.literal_eval(services)
|
|
else:
|
|
services = {}
|
|
for service, value in services.items():
|
|
feature = service
|
|
for repl_feature, repl_service in \
|
|
qubes.ext.r3compatibility.\
|
|
R3Compatibility.features_to_services.\
|
|
items():
|
|
if repl_service == service:
|
|
feature = repl_feature
|
|
vm.features[feature] = value
|
|
for attr in ['backup_content', 'backup_path',
|
|
'backup_size']:
|
|
value = element.get(attr)
|
|
vm.features[attr.replace('_', '-')] = value
|
|
pcidevs = element.get('pcidevs')
|
|
if pcidevs:
|
|
pcidevs = ast.literal_eval(pcidevs)
|
|
for pcidev in pcidevs:
|
|
try:
|
|
vm.devices["pci"].attach(
|
|
self.domains[0].devices['pci'][pcidev])
|
|
except qubes.exc.QubesException as e:
|
|
self.log.error("VM {}: {}".format(vm.name, str(e)))
|
|
except (ValueError, LookupError) as err:
|
|
self.log.error("import error ({1}): {2}".format(
|
|
vm_class_name, err))
|
|
if 'vm' in locals():
|
|
del self.domains[vm]
|
|
|
|
def load(self):
|
|
qubes_store_file = open(self._store, 'r')
|
|
|
|
try:
|
|
qubes_store_file.seek(0)
|
|
tree = lxml.etree.parse(qubes_store_file)
|
|
except (EnvironmentError, # pylint: disable=broad-except
|
|
xml.parsers.expat.ExpatError) as err:
|
|
self.log.error(err)
|
|
return False
|
|
|
|
self.load_initial_values()
|
|
|
|
self.default_kernel = tree.getroot().get("default_kernel")
|
|
|
|
vm_classes = ["TemplateVm", "TemplateHVm",
|
|
"AppVm", "HVm", "NetVm", "ProxyVm"]
|
|
# First load templates
|
|
for vm_class_name in ["TemplateVm", "TemplateHVm"]:
|
|
vms_of_class = tree.findall("Qubes" + vm_class_name)
|
|
for element in vms_of_class:
|
|
self.import_core2_vm(element)
|
|
|
|
# Then set default template ...
|
|
self.load_default_template(tree.getroot())
|
|
|
|
# ... and load other VMs
|
|
for vm_class_name in ["AppVm", "HVm", "NetVm", "ProxyVm"]:
|
|
vms_of_class = tree.findall("Qubes" + vm_class_name)
|
|
# first non-template based, then template based
|
|
sorted_vms_of_class = sorted(vms_of_class,
|
|
key=lambda x: str(x.get('template_qid')).lower() != "none")
|
|
for element in sorted_vms_of_class:
|
|
self.import_core2_vm(element)
|
|
|
|
# After importing all VMs, set netvm references, in the same order
|
|
for vm_class_name in vm_classes:
|
|
for element in tree.findall("Qubes" + vm_class_name):
|
|
try:
|
|
self.set_netvm_dependency(element)
|
|
except (ValueError, LookupError) as err:
|
|
self.log.error("VM {}: failed to set netvm dependency: {}".
|
|
format(element.get('name'), err))
|
|
|
|
# and load other defaults (default netvm, updatevm etc)
|
|
self.load_globals(tree.getroot())
|
|
|
|
def save(self):
|
|
raise NotImplementedError("Saving old qubes.xml not supported")
|