Prevent removing VM if it provides devices in persistent mode

Fixes QubesOS/qubes-issues#5136.
This commit is contained in:
Pawel Marczewski 2020-01-16 10:57:28 +01:00
parent 309dd11b1d
commit b09a137b26
No known key found for this signature in database
GPG Key ID: DE42EE9B14F96465
3 changed files with 38 additions and 1 deletions

View File

@ -1201,6 +1201,14 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI):
if not self.dest.is_halted():
raise qubes.exc.QubesVMNotHaltedError(self.dest)
assignments = self.dest.get_provided_assignments()
if assignments:
desc = ', '.join(
assignment.ident for assignment in assignments)
raise qubes.exc.QubesVMInUseError(self.dest,
'VM has devices attached persistently to other VMs: ' +
desc)
if self.dest.installed_by_rpm:
raise qubes.exc.QubesVMInUseError(self.dest,
"VM installed by package manager: " + self.dest.name)

View File

@ -1,4 +1,4 @@
# -*- encoding: utf8 -*-
# -*- encoding: utf-8 -*-
#
# The Qubes OS Project, http://www.qubes-os.org
#
@ -1747,6 +1747,22 @@ netvm default=True type=vm
self.assertFalse(mock_remove.called)
self.assertFalse(self.app.save.called)
@unittest.mock.patch('qubes.storage.Storage.remove')
@unittest.mock.patch('shutil.rmtree')
def test_502_vm_remove_attached(self, mock_rmtree, mock_remove):
self.setup_for_clone()
assignment = qubes.devices.DeviceAssignment(
self.vm, '1234', persistent=True)
self.loop.run_until_complete(
self.vm2.devices['testclass'].attach(assignment))
mock_remove.side_effect = self.dummy_coro
with self.assertRaises(qubes.exc.QubesVMInUseError):
self.call_mgmt_func(b'admin.vm.Remove', b'test-vm1')
self.assertFalse(mock_rmtree.called)
self.assertFalse(mock_remove.called)
self.assertFalse(self.app.save.called)
def test_510_vm_volume_import(self):
value = self.call_mgmt_func(b'admin.vm.volume.Import', b'test-vm1',
b'private')

View File

@ -289,6 +289,19 @@ class BaseVM(qubes.PropertyHolder):
# SEE:1815 firewall, policy.
def get_provided_assignments(self):
'''List of persistent device assignments from this VM.'''
assignments = []
for domain in self.app.domains:
if domain == self:
continue
for device_collection in domain.devices.values():
for assignment in device_collection.persistent():
if assignment.backend_domain == self:
assignments.append(assignment)
return assignments
def init_log(self):
'''Initialise logger for this domain.'''
self.log = qubes.log.get_vm_logger(self.name)