Prevent removing domain that is referenced from anywhere

Check VM properties and global properties (all of them).

Fixes QubesOS/qubes-issues#3128
This commit is contained in:
Marek Marczykowski-Górecki 2017-09-26 14:55:45 +02:00
parent a90dea34de
commit 9e8c40867b
No known key found for this signature in database
GPG Key ID: 063938BA42CFA724
3 changed files with 113 additions and 22 deletions

View File

@ -35,6 +35,7 @@ import time
import traceback
import uuid
import itertools
import lxml.etree
import jinja2
@ -857,6 +858,8 @@ class Qubes(qubes.PropertyHolder):
return element
def __str__(self):
return type(self).__name__
def save(self, lock=True):
'''Save all data to qubes.xml
@ -1186,14 +1189,17 @@ class Qubes(qubes.PropertyHolder):
@qubes.events.handler('domain-pre-delete')
def on_domain_pre_deleted(self, event, vm):
# pylint: disable=unused-argument
if isinstance(vm, qubes.vm.templatevm.TemplateVM):
appvms = self.domains.get_vms_based_on(vm)
if appvms:
raise qubes.exc.QubesException(
'Cannot remove template that has dependent AppVMs. '
'Affected are: {}'.format(', '.join(
appvm.name for appvm in sorted(appvms))))
for obj in itertools.chain(self.domains, (self,)):
for prop in obj.property_list():
try:
if isinstance(prop, qubes.vm.VMProperty) and \
getattr(obj, prop.__name__) == vm:
self.log.error(
'Cannot remove %s, used by %s.%s',
vm, obj, prop.__name__)
raise qubes.exc.QubesVMInUseError(vm)
except AttributeError:
pass
@qubes.events.handler('domain-delete')
def on_domain_deleted(self, event, vm):

View File

@ -42,6 +42,11 @@ class QubesVMError(QubesException):
super(QubesVMError, self).__init__(msg)
self.vm = vm
class QubesVMInUseError(QubesVMError):
'''VM is in use, cannot remove.'''
def __init__(self, vm, msg=None):
super(QubesVMInUseError, self).__init__(vm,
msg or 'Domain is in use: {!r}'.format(vm.name))
class QubesVMNotStartedError(QubesVMError):
'''Domain is not started.

View File

@ -271,13 +271,13 @@ class TC_30_VMCollection(qubes.tests.QubesTestCase):
# pass
class TC_90_Qubes(qubes.tests.QubesTestCase):
class TC_89_QubesEmpty(qubes.tests.QubesTestCase):
def tearDown(self):
try:
os.unlink('/tmp/qubestest.xml')
except:
pass
super(TC_90_Qubes, self).tearDown()
super().tearDown()
@qubes.tests.skipUnlessDom0
def test_000_init_empty(self):
@ -288,25 +288,105 @@ class TC_90_Qubes(qubes.tests.QubesTestCase):
pass
qubes.Qubes.create_empty_store('/tmp/qubestest.xml').close()
def test_100_clockvm(self):
app = qubes.Qubes('/tmp/qubestest.xml', load=False, offline_mode=True)
app.load_initial_values()
template = app.add_new_vm('TemplateVM', name='test-template',
class TC_90_Qubes(qubes.tests.QubesTestCase):
def tearDown(self):
try:
os.unlink('/tmp/qubestest.xml')
except:
pass
super().tearDown()
def setUp(self):
super(TC_90_Qubes, self).setUp()
self.app = qubes.Qubes('/tmp/qubestest.xml', load=False,
offline_mode=True)
self.addCleanup(self.cleanup_qubes)
self.app.load_initial_values()
self.template = self.app.add_new_vm('TemplateVM', name='test-template',
label='green')
appvm = app.add_new_vm('AppVM', name='test-vm', template=template,
def cleanup_qubes(self):
self.app.close()
del self.app
try:
del self.template
except AttributeError:
pass
def test_100_clockvm(self):
appvm = self.app.add_new_vm('AppVM', name='test-vm', template=self.template,
label='red')
self.assertIsNone(app.clockvm)
self.assertIsNone(self.app.clockvm)
self.assertNotIn('service.clocksync', appvm.features)
self.assertNotIn('service.clocksync', template.features)
app.clockvm = appvm
self.assertNotIn('service.clocksync', self.template.features)
self.app.clockvm = appvm
self.assertIn('service.clocksync', appvm.features)
self.assertTrue(appvm.features['service.clocksync'])
app.clockvm = template
self.app.clockvm = self.template
self.assertNotIn('service.clocksync', appvm.features)
self.assertIn('service.clocksync', template.features)
self.assertTrue(template.features['service.clocksync'])
app.close()
self.assertIn('service.clocksync', self.template.features)
self.assertTrue(self.template.features['service.clocksync'])
def test_200_remove_template(self):
appvm = self.app.add_new_vm('AppVM', name='test-vm',
template=self.template,
label='red')
with mock.patch.object(self.app, 'vmm'):
with self.assertRaises(qubes.exc.QubesException):
del self.app.domains[self.template]
def test_201_remove_netvm(self):
netvm = self.app.add_new_vm('AppVM', name='test-netvm',
template=self.template, provides_network=True,
label='red')
appvm = self.app.add_new_vm('AppVM', name='test-vm',
template=self.template,
label='red')
appvm.netvm = netvm
with mock.patch.object(self.app, 'vmm'):
with self.assertRaises(qubes.exc.QubesVMInUseError):
del self.app.domains[netvm]
def test_202_remove_default_netvm(self):
netvm = self.app.add_new_vm('AppVM', name='test-netvm',
template=self.template, provides_network=True,
label='red')
self.app.default_netvm = netvm
with mock.patch.object(self.app, 'vmm'):
with self.assertRaises(qubes.exc.QubesVMInUseError):
del self.app.domains[netvm]
def test_203_remove_default_dispvm(self):
appvm = self.app.add_new_vm('AppVM', name='test-appvm',
template=self.template,
label='red')
self.app.default_dispvm = appvm
with mock.patch.object(self.app, 'vmm'):
with self.assertRaises(qubes.exc.QubesVMInUseError):
del self.app.domains[appvm]
def test_204_remove_appvm_dispvm(self):
dispvm = self.app.add_new_vm('AppVM', name='test-appvm',
template=self.template,
label='red')
appvm = self.app.add_new_vm('AppVM', name='test-appvm2',
template=self.template, default_dispvm=dispvm,
label='red')
with mock.patch.object(self.app, 'vmm'):
with self.assertRaises(qubes.exc.QubesVMInUseError):
del self.app.domains[dispvm]
def test_205_remove_appvm_dispvm(self):
appvm = self.app.add_new_vm('AppVM', name='test-appvm',
template=self.template, template_for_dispvms=True,
label='red')
dispvm = self.app.add_new_vm('DispVM', name='test-dispvm',
template=appvm,
label='red')
with mock.patch.object(self.app, 'vmm'):
with self.assertRaises(qubes.exc.QubesVMInUseError):
del self.app.domains[appvm]
@qubes.tests.skipUnlessGit
def test_900_example_xml_in_doc(self):