7f728e2dad
Labels can be referred as either id or name. Support both ways. Fixes QubesOS/qubes-issues#3211
164 lines
5.8 KiB
Python
164 lines
5.8 KiB
Python
# -*- encoding: utf8 -*-
|
|
#
|
|
# The Qubes OS Project, http://www.qubes-os.org
|
|
#
|
|
# Copyright (C) 2017 Marek Marczykowski-Górecki
|
|
# <marmarek@invisiblethingslab.com>
|
|
#
|
|
# This program is free software; you can redistribute it and/or modify
|
|
# it under the terms of the GNU Lesser General Public License as published by
|
|
# the Free Software Foundation; either version 2.1 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Lesser General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Lesser General Public License along
|
|
# with this program; if not, see <http://www.gnu.org/licenses/>.
|
|
|
|
'''Parser for core2 qubes.xml'''
|
|
|
|
import xml.parsers
|
|
import logging
|
|
import lxml.etree
|
|
|
|
import qubesadmin.backup
|
|
import qubesadmin.firewall
|
|
|
|
class Core3VM(qubesadmin.backup.BackupVM):
|
|
'''VM object'''
|
|
# pylint: disable=too-few-public-methods
|
|
@property
|
|
def included_in_backup(self):
|
|
return self.backup_path is not None
|
|
|
|
def handle_firewall_xml(self, vm, stream):
|
|
'''Load new (Qubes >= 4.0) firewall XML format'''
|
|
try:
|
|
tree = lxml.etree.parse(stream) # pylint: disable=no-member
|
|
xml_root = tree.getroot()
|
|
rules = []
|
|
for rule_node in xml_root.findall('./rules/rule'):
|
|
rule_opts = {}
|
|
for rule_opt in rule_node.findall('./properties/property'):
|
|
rule_opts[rule_opt.get('name')] = rule_opt.text
|
|
rules.append(qubesadmin.firewall.Rule(None, **rule_opts))
|
|
|
|
vm.firewall.rules = rules
|
|
except: # pylint: disable=bare-except
|
|
vm.log.exception('Failed to set firewall')
|
|
|
|
class Core3Qubes(qubesadmin.backup.BackupApp):
|
|
'''Parsed qubes.xml'''
|
|
def __init__(self, store=None):
|
|
if store is None:
|
|
raise ValueError("store path required")
|
|
self.log = logging.getLogger('qubesadmin.backup.core3')
|
|
self.labels = {}
|
|
super(Core3Qubes, self).__init__(store)
|
|
|
|
@staticmethod
|
|
def get_property(xml_obj, prop):
|
|
'''Get property of given object (XML node)
|
|
|
|
Object can be any PropertyHolder serialized to XML - in practice
|
|
:py:class:`BaseVM` or :py:class:`Qubes`.
|
|
'''
|
|
xml_prop = xml_obj.findall('./property[@name=\'{}\']'.format(prop))
|
|
if not xml_prop:
|
|
raise KeyError(prop)
|
|
return xml_prop[0].text
|
|
|
|
def load_labels(self, labels_element):
|
|
'''Load labels table'''
|
|
for node in labels_element.findall('label'):
|
|
ident = node.get('id')
|
|
assert ident is not None
|
|
self.labels[ident] = node.text
|
|
self.labels[node.text] = node.text
|
|
|
|
|
|
def load_globals(self, globals_element):
|
|
'''Load global settings
|
|
|
|
:param globals_element: XML element containing global settings
|
|
'''
|
|
for node in globals_element.findall('property'):
|
|
name = node.get('name')
|
|
assert name is not None
|
|
self.globals[name] = node.text
|
|
|
|
def import_core3_vm(self, element):
|
|
'''Parse a single VM from given XML node
|
|
|
|
This method load only VM properties not depending on other VMs
|
|
(other than template). VM connections are set later.
|
|
:param element: XML node
|
|
'''
|
|
vm = Core3VM()
|
|
vm.klass = element.get('class')
|
|
|
|
for node in element.findall('./properties/property'):
|
|
name = node.get('name')
|
|
assert name is not None
|
|
vm.properties[name] = node.text
|
|
|
|
for node in element.findall('./features/feature'):
|
|
name = node.get('name')
|
|
assert name is not None
|
|
vm.features[name] = False if node.text is None else node.text
|
|
|
|
for node in element.findall('./tags/tag'):
|
|
name = node.get('name')
|
|
assert name is not None
|
|
vm.tags.add(name)
|
|
|
|
for bus_node in element.findall('./devices'):
|
|
bus_name = bus_node.get('class')
|
|
assert bus_name is not None
|
|
for node in bus_node.findall('./device'):
|
|
backend_domain = node.get('backend-domain')
|
|
ident = node.get('id')
|
|
options = {}
|
|
for opt_node in node.findall('./option'):
|
|
opt_name = opt_node.get('name')
|
|
options[opt_name] = opt_node.text
|
|
vm.devices[bus_name][(backend_domain, ident)] = options
|
|
|
|
# extract base properties
|
|
if vm.klass == 'AdminVM':
|
|
vm.name = 'dom0'
|
|
else:
|
|
vm.name = vm.properties.pop('name')
|
|
vm.label = self.labels[vm.properties.pop('label')]
|
|
vm.template = vm.properties.pop('template', None)
|
|
# skip UUID and qid, will be generated during restore
|
|
vm.properties.pop('uuid', None)
|
|
vm.properties.pop('qid', None)
|
|
|
|
if vm.features.pop('backup-content', False):
|
|
vm.backup_path = vm.features.pop('backup-path', None)
|
|
vm.size = vm.features.pop('backup-size', 0)
|
|
|
|
self.domains[vm.name] = vm
|
|
|
|
def load(self):
|
|
with open(self.store) as fh:
|
|
try:
|
|
# pylint: disable=no-member
|
|
tree = lxml.etree.parse(fh)
|
|
except (EnvironmentError, # pylint: disable=broad-except
|
|
xml.parsers.expat.ExpatError) as err:
|
|
self.log.error(err)
|
|
return False
|
|
|
|
self.load_labels(tree.find('./labels'))
|
|
|
|
for element in tree.findall('./domains/domain'):
|
|
self.import_core3_vm(element)
|
|
|
|
# and load other defaults (default netvm, updatevm etc)
|
|
self.load_globals(tree.find('./properties'))
|