core3.py 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
  1. # -*- encoding: utf8 -*-
  2. #
  3. # The Qubes OS Project, http://www.qubes-os.org
  4. #
  5. # Copyright (C) 2017 Marek Marczykowski-Górecki
  6. # <marmarek@invisiblethingslab.com>
  7. #
  8. # This program is free software; you can redistribute it and/or modify
  9. # it under the terms of the GNU Lesser General Public License as published by
  10. # the Free Software Foundation; either version 2.1 of the License, or
  11. # (at your option) any later version.
  12. #
  13. # This program is distributed in the hope that it will be useful,
  14. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  15. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  16. # GNU Lesser General Public License for more details.
  17. #
  18. # You should have received a copy of the GNU Lesser General Public License along
  19. # with this program; if not, see <http://www.gnu.org/licenses/>.
  20. '''Parser for core3 qubes.xml'''
  21. import xml.parsers
  22. import logging
  23. import lxml.etree
  24. import qubesadmin.backup
  25. import qubesadmin.firewall
  26. class Core3VM(qubesadmin.backup.BackupVM):
  27. '''VM object'''
  28. # pylint: disable=too-few-public-methods
  29. @property
  30. def included_in_backup(self):
  31. return self.backup_path is not None
  32. def handle_firewall_xml(self, vm, stream):
  33. '''Load new (Qubes >= 4.0) firewall XML format'''
  34. try:
  35. tree = lxml.etree.parse(stream) # pylint: disable=no-member
  36. xml_root = tree.getroot()
  37. rules = []
  38. for rule_node in xml_root.findall('./rules/rule'):
  39. rule_opts = {}
  40. for rule_opt in rule_node.findall('./properties/property'):
  41. rule_opts[rule_opt.get('name')] = rule_opt.text
  42. rules.append(qubesadmin.firewall.Rule(None, **rule_opts))
  43. vm.firewall.rules = rules
  44. except: # pylint: disable=bare-except
  45. vm.log.exception('Failed to set firewall')
  46. class Core3Qubes(qubesadmin.backup.BackupApp):
  47. '''Parsed qubes.xml'''
  48. def __init__(self, store=None):
  49. if store is None:
  50. raise ValueError("store path required")
  51. self.log = logging.getLogger('qubesadmin.backup.core3')
  52. self.labels = {}
  53. super().__init__(store)
  54. @staticmethod
  55. def get_property(xml_obj, prop):
  56. '''Get property of given object (XML node)
  57. Object can be any PropertyHolder serialized to XML - in practice
  58. :py:class:`BaseVM` or :py:class:`Qubes`.
  59. '''
  60. xml_prop = xml_obj.findall('./property[@name=\'{}\']'.format(prop))
  61. if not xml_prop:
  62. raise KeyError(prop)
  63. return xml_prop[0].text
  64. def load_labels(self, labels_element):
  65. '''Load labels table'''
  66. for node in labels_element.findall('label'):
  67. ident = node.get('id')
  68. assert ident is not None
  69. self.labels[ident] = node.text
  70. self.labels[node.text] = node.text
  71. def load_globals(self, globals_element):
  72. '''Load global settings
  73. :param globals_element: XML element containing global settings
  74. '''
  75. for node in globals_element.findall('property'):
  76. name = node.get('name')
  77. assert name is not None
  78. self.globals[name] = node.text
  79. def import_core3_vm(self, element):
  80. '''Parse a single VM from given XML node
  81. This method load only VM properties not depending on other VMs
  82. (other than template). VM connections are set later.
  83. :param element: XML node
  84. '''
  85. vm = Core3VM()
  86. vm.klass = element.get('class')
  87. for node in element.findall('./properties/property'):
  88. name = node.get('name')
  89. assert name is not None
  90. vm.properties[name] = node.text
  91. for node in element.findall('./features/feature'):
  92. name = node.get('name')
  93. assert name is not None
  94. vm.features[name] = False if node.text is None else node.text
  95. for node in element.findall('./tags/tag'):
  96. name = node.get('name')
  97. assert name is not None
  98. vm.tags.add(name)
  99. for bus_node in element.findall('./devices'):
  100. bus_name = bus_node.get('class')
  101. assert bus_name is not None
  102. for node in bus_node.findall('./device'):
  103. backend_domain = node.get('backend-domain')
  104. ident = node.get('id')
  105. options = {}
  106. for opt_node in node.findall('./option'):
  107. opt_name = opt_node.get('name')
  108. options[opt_name] = opt_node.text
  109. vm.devices[bus_name][(backend_domain, ident)] = options
  110. # extract base properties
  111. if vm.klass == 'AdminVM':
  112. vm.name = 'dom0'
  113. else:
  114. vm.name = vm.properties.pop('name')
  115. vm.label = self.labels[vm.properties.pop('label')]
  116. vm.template = vm.properties.pop('template', None)
  117. # skip UUID and qid, will be generated during restore
  118. vm.properties.pop('uuid', None)
  119. vm.properties.pop('qid', None)
  120. if vm.features.pop('backup-content', False):
  121. vm.backup_path = vm.features.pop('backup-path', None)
  122. vm.size = vm.features.pop('backup-size', 0)
  123. self.domains[vm.name] = vm
  124. def load(self):
  125. with open(self.store) as fh:
  126. try:
  127. # pylint: disable=no-member
  128. tree = lxml.etree.parse(fh)
  129. except (EnvironmentError, # pylint: disable=broad-except
  130. xml.parsers.expat.ExpatError) as err:
  131. self.log.error(err)
  132. return False
  133. self.load_labels(tree.find('./labels'))
  134. for element in tree.findall('./domains/domain'):
  135. self.import_core3_vm(element)
  136. # and load other defaults (default netvm, updatevm etc)
  137. self.load_globals(tree.find('./properties'))