123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229 |
- #!/usr/bin/python
- # -*- coding: utf-8 -*-
- #
- # The Qubes OS Project, http://www.qubes-os.org
- #
- # Copyright (C) 2016 Marek Marczykowski-Górecki
- # <marmarek@invisiblethingslab.com>
- #
- # This program is free software; you can redistribute it and/or
- # modify it under the terms of the GNU General Public License
- # as published by the Free Software Foundation; either version 2
- # of the License, or (at your option) any later version.
- #
- # This program is distributed in the hope that it will be useful,
- # but WITHOUT ANY WARRANTY; without even the implied warranty of
- # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- # GNU General Public License for more details.
- #
- # You should have received a copy of the GNU General Public License
- # along with this program. If not, see <http://www.gnu.org/licenses/>
- #
- import ast
- import xml.parsers.expat
- import lxml.etree
- import qubes
- import qubes.vm.appvm
- import qubes.vm.standalonevm
- import qubes.vm.templatevm
- import qubes.vm.adminvm
- import qubes.ext.r3compatibility
class AppVM(qubes.vm.appvm.AppVM):
    """AppVM flavour used for core2 compatibility; ``dir_path`` is settable."""

    # Storage directory of the VM. Unless set explicitly it is taken from
    # ``self.storage.vmdir``; the value is marked as not to be saved.
    dir_path = qubes.property(
        'dir_path',
        doc="VM storage directory",
        saver=qubes.property.dontsave,
        default=(lambda self: self.storage.vmdir),
    )

    def is_running(self):
        """Report the VM as not running, unconditionally."""
        return False
class StandaloneVM(qubes.vm.standalonevm.StandaloneVM):
    """StandaloneVM flavour used for core2 compatibility; ``dir_path`` is
    settable."""

    # Storage directory of the VM. Unless set explicitly it is taken from
    # ``self.storage.vmdir``; the value is marked as not to be saved.
    dir_path = qubes.property(
        'dir_path',
        doc="VM storage directory",
        saver=qubes.property.dontsave,
        default=(lambda self: self.storage.vmdir),
    )

    def is_running(self):
        """Report the VM as not running, unconditionally."""
        return False
class Core2Qubes(qubes.Qubes):
    """Qubes collection populated from an old-format (core2) ``qubes.xml``.

    The store is only ever read: :py:meth:`load` imports the VMs and the
    global settings found in the file, while :py:meth:`save` always raises.
    """

    def __init__(self, store=None, load=True, **kwargs):
        """Initialise the collection.

        :param store: path of the old ``qubes.xml``; mandatory
        :param load: passed through unchanged to the parent constructor
        :param kwargs: passed through unchanged to the parent constructor
        :raises ValueError: when *store* is ``None``
        """
        if store is None:
            raise ValueError("store path required")
        super(Core2Qubes, self).__init__(store, load, **kwargs)

    @staticmethod
    def _parse_qid(value):
        """Convert a qid attribute string to ``int``.

        The literal ``none`` (in any letter case) yields ``None``.

        :raises ValueError: when *value* is neither ``none`` nor an integer
        """
        if value.lower() == "none":
            return None
        return int(value)

    def load_default_template(self, element):
        """Set ``default_template`` from the ``default_template`` attribute.

        A missing attribute leaves the current value untouched (the same
        policy :py:meth:`load_globals` applies to its attributes).

        :param element: XML element carrying the attribute
        """
        default_template = element.get("default_template")
        if default_template is None:
            return
        self.default_template = self._parse_qid(default_template)

    def load_globals(self, element):
        """Set the global VM references stored as attributes of *element*.

        Handles ``default_netvm``, ``default_fw_netvm``, ``updatevm`` and
        ``clockvm``, in that order. Attributes missing from *element* are
        skipped.

        :param element: XML element carrying the attributes
        """
        for name in ("default_netvm", "default_fw_netvm",
                "updatevm", "clockvm"):
            value = element.get(name)
            if value is not None:
                setattr(self, name, self._parse_qid(value))

    def set_netvm_dependency(self, element):
        """Set ``netvm`` of the already imported VM described by *element*.

        Nothing is assigned when the VM uses the default netvm, i.e. when
        ``uses_default_netvm`` is missing or equals ``"True"``.

        :param element: XML element describing the VM
        :raises LookupError: when the VM lookup in ``self.domains`` fails
        :raises ValueError: when a qid attribute is not a valid integer
        """
        vm = self.domains[int(element.get("qid"))]

        uses_default_netvm = element.get("uses_default_netvm")
        if uses_default_netvm is None or uses_default_netvm == "True":
            return

        netvm_qid = element.get("netvm_qid")
        if netvm_qid is None:
            vm.netvm = None
        else:
            vm.netvm = self._parse_qid(netvm_qid)

        # TODO: dispvm_netvm

    def import_core2_vm(self, element):
        """Create a VM in this collection from one core2 VM element.

        The class is chosen from the element tag and ``template_qid``:
        template tags give a ``TemplateVM``, a missing or ``none``
        ``template_qid`` gives a ``StandaloneVM``, anything else an
        ``AppVM``. Plain attributes, services (stored as features), backup
        metadata and PCI devices are then copied over.

        ``ValueError``, ``LookupError`` and ``SyntaxError`` raised on the way
        are logged, and a VM that was already added is removed again.

        :param element: XML element describing the VM
        """
        vm_class_name = element.tag
        # load() looks elements up as "Qubes" + class name, so the tag
        # normally carries that prefix; bare names are accepted as well.
        bare_name = vm_class_name
        if bare_name.startswith("Qubes"):
            bare_name = bare_name[len("Qubes"):]

        # Sentinel telling the error handler whether the VM has been added.
        vm = None
        try:
            kwargs = {}
            if bare_name in ("TemplateVm", "TemplateHVm"):
                vm_class = qubes.vm.templatevm.TemplateVM
            elif str(element.get('template_qid')).lower() == "none":
                # str() maps a missing attribute to "None", matching the
                # sort key used by load()
                kwargs['dir_path'] = element.get('dir_path')
                vm_class = StandaloneVM
            else:
                kwargs['dir_path'] = element.get('dir_path')
                kwargs['template'] = int(element.get('template_qid'))
                vm_class = AppVM

            # simple attributes
            for attr in ['installed_by_rpm', 'include_in_backups',
                    'qrexec_timeout', 'internal', 'label', 'name',
                    'vcpus', 'memory', 'maxmem', 'default_user',
                    'debug', 'pci_strictreset', 'mac', 'autostart']:
                value = element.get(attr)
                if value:
                    kwargs[attr] = value

            # attributes with default value: copied only when the element
            # explicitly says the default is not in use
            for attr in ["kernel", "kernelopts"]:
                value = element.get(attr)
                if value and value.lower() == "none":
                    value = None
                value_is_default = element.get(
                    "uses_default_{}".format(attr))
                if value_is_default and value_is_default.lower() != "true":
                    kwargs[attr] = value

            kwargs['hvm'] = "HVm" in vm_class_name

            vm = self.add_new_vm(vm_class,
                qid=int(element.get('qid')), **kwargs)

            services = element.get('services')
            if services:
                services = ast.literal_eval(services)
            else:
                services = {}

            features_to_services = qubes.ext.r3compatibility.\
                R3Compatibility.features_to_services
            for service, value in services.items():
                # Store the service under its feature name when the
                # compatibility table has one, else under its own name.
                feature = service
                for repl_feature, repl_service in \
                        features_to_services.items():
                    if repl_service == service:
                        feature = repl_feature
                vm.features[feature] = value

            for attr in ['backup_content', 'backup_path', 'backup_size']:
                value = element.get(attr)
                vm.features[attr.replace('_', '-')] = value

            pcidevs = element.get('pcidevs')
            if pcidevs:
                pcidevs = ast.literal_eval(pcidevs)
                for pcidev in pcidevs:
                    try:
                        vm.devices["pci"].attach(pcidev)
                    except qubes.exc.QubesException as e:
                        self.log.error("VM {}: {}".format(vm.name, str(e)))
        except (ValueError, LookupError, SyntaxError) as err:
            # SyntaxError comes from ast.literal_eval on malformed text
            self.log.error("import error ({0}): {1}".format(
                vm_class_name, err))
            if vm is not None:
                del self.domains[vm]

    def load(self):
        """Parse the store file and import everything it describes.

        Order: templates, the default template, the remaining VMs (those
        without a template first), per-VM netvm references, then the
        global references handled by :py:meth:`load_globals`.

        :returns: ``False`` when the file cannot be read or parsed,
            ``None`` otherwise
        """
        qubes_store_file = open(self._store, 'r')
        try:
            tree = lxml.etree.parse(qubes_store_file)
        except (EnvironmentError,
                xml.parsers.expat.ExpatError,
                lxml.etree.XMLSyntaxError) as err:
            # lxml reports malformed XML as XMLSyntaxError
            self.log.error(err)
            return False
        finally:
            qubes_store_file.close()

        self.load_initial_values()

        root = tree.getroot()
        self.default_kernel = root.get("default_kernel")

        template_classes = ["TemplateVm", "TemplateHVm"]
        other_classes = ["AppVm", "HVm", "NetVm", "ProxyVm"]

        # First load templates
        for vm_class_name in template_classes:
            for element in tree.findall("Qubes" + vm_class_name):
                self.import_core2_vm(element)

        # Then set default template ...
        self.load_default_template(root)

        # ... and load other VMs
        for vm_class_name in other_classes:
            vms_of_class = tree.findall("Qubes" + vm_class_name)
            # first non-template based, then template based
            # (False sorts before True and sorted() is stable)
            sorted_vms_of_class = sorted(
                vms_of_class,
                key=lambda x: str(x.get('template_qid')).lower() != "none")
            for element in sorted_vms_of_class:
                self.import_core2_vm(element)

        # After importing all VMs, set netvm references, in the same order
        for vm_class_name in template_classes + other_classes:
            for element in tree.findall("Qubes" + vm_class_name):
                try:
                    self.set_netvm_dependency(element)
                except (ValueError, LookupError) as err:
                    self.log.error(
                        "VM {}: failed to set netvm dependency: {}".format(
                            element.get('name'), err))

        # and load other defaults (default netvm, updatevm etc)
        self.load_globals(root)

    def save(self):
        """Refuse to save: old-format stores are never written back.

        :raises NotImplementedError: always
        """
        raise NotImplementedError("Saving old qubes.xml not supported")
|