api/admin: add API for changing revisions_to_keep dynamically

This one pool/volume property makes sense to change dynamically. There
may be more such properties, but lets be on the safe side and take
whitelist approach - allow only selected (just one for now), instead of
blacklisting any harmful ones.

QubesOS/qubes-issues#3256
This commit is contained in:
Marek Marczykowski-Górecki 2017-11-07 02:34:43 +01:00
parent 81f455e15d
commit c3afdde3ef
No known key found for this signature in database
GPG Key ID: 063938BA42CFA724
3 changed files with 106 additions and 0 deletions

View File

@ -23,11 +23,13 @@ ADMIN_API_METHODS_SIMPLE = \
admin.pool.List \ admin.pool.List \
admin.pool.ListDrivers \ admin.pool.ListDrivers \
admin.pool.Remove \ admin.pool.Remove \
admin.pool.Set.revisions_to_keep \
admin.pool.volume.Info \ admin.pool.volume.Info \
admin.pool.volume.List \ admin.pool.volume.List \
admin.pool.volume.ListSnapshots \ admin.pool.volume.ListSnapshots \
admin.pool.volume.Resize \ admin.pool.volume.Resize \
admin.pool.volume.Revert \ admin.pool.volume.Revert \
admin.pool.volume.Set.revisions_to_keep \
admin.pool.volume.Snapshot \ admin.pool.volume.Snapshot \
admin.property.Get \ admin.property.Get \
admin.property.GetDefault \ admin.property.GetDefault \
@ -96,6 +98,7 @@ ADMIN_API_METHODS_SIMPLE = \
admin.vm.volume.ListSnapshots \ admin.vm.volume.ListSnapshots \
admin.vm.volume.Resize \ admin.vm.volume.Resize \
admin.vm.volume.Revert \ admin.vm.volume.Revert \
admin.vm.volume.Set.revisions_to_keep \
admin.vm.Stats \ admin.vm.Stats \
$(null) $(null)

View File

@ -476,6 +476,25 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI):
return '{} {}'.format(size, path) return '{} {}'.format(size, path)
@qubes.api.method('admin.vm.volume.Set.revisions_to_keep',
scope='local', write=True)
@asyncio.coroutine
def vm_volume_set_revisions_to_keep(self, untrusted_payload):
assert self.arg in self.dest.volumes.keys()
try:
untrusted_value = int(untrusted_payload.decode('ascii'))
except (UnicodeDecodeError, ValueError):
raise qubes.api.ProtocolError('Invalid value')
del untrusted_payload
assert untrusted_value >= 0
newvalue = untrusted_value
del untrusted_value
self.fire_event_for_permission(newvalue=newvalue)
self.dest.volumes[self.arg].revisions_to_keep = newvalue
self.app.save()
@qubes.api.method('admin.vm.tag.List', no_payload=True, @qubes.api.method('admin.vm.tag.List', no_payload=True,
scope='local', read=True) scope='local', read=True)
@asyncio.coroutine @asyncio.coroutine
@ -621,6 +640,27 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI):
self.app.remove_pool(self.arg) self.app.remove_pool(self.arg)
self.app.save() self.app.save()
@qubes.api.method('admin.pool.Set.revisions_to_keep',
scope='global', write=True)
@asyncio.coroutine
def pool_set_revisions_to_keep(self, untrusted_payload):
assert self.dest.name == 'dom0'
assert self.arg in self.app.pools.keys()
pool = self.app.pools[self.arg]
try:
untrusted_value = int(untrusted_payload.decode('ascii'))
except (UnicodeDecodeError, ValueError):
raise qubes.api.ProtocolError('Invalid value')
del untrusted_payload
assert untrusted_value >= 0
newvalue = untrusted_value
del untrusted_value
self.fire_event_for_permission(newvalue=newvalue)
pool.revisions_to_keep = newvalue
self.app.save()
@qubes.api.method('admin.label.List', no_payload=True, @qubes.api.method('admin.label.List', no_payload=True,
scope='global', read=True) scope='global', read=True)
@asyncio.coroutine @asyncio.coroutine

View File

@ -2275,6 +2275,69 @@ class TC_00_VMs(AdminAPITestCase):
self.assertNotIn(dev, self.vm.devices['testclass'].persistent()) self.assertNotIn(dev, self.vm.devices['testclass'].persistent())
self.assertFalse(self.app.save.called) self.assertFalse(self.app.save.called)
def test_660_pool_set_revisions_to_keep(self):
self.app.pools['test-pool'] = unittest.mock.Mock()
value = self.call_mgmt_func(b'admin.pool.Set.revisions_to_keep',
b'dom0', b'test-pool', b'2')
self.assertIsNone(value)
self.assertEqual(self.app.pools['test-pool'].mock_calls, [])
self.assertEqual(self.app.pools['test-pool'].revisions_to_keep, 2)
self.app.save.assert_called_once_with()
def test_661_pool_set_revisions_to_keep_negative(self):
self.app.pools['test-pool'] = unittest.mock.Mock()
with self.assertRaises(AssertionError):
self.call_mgmt_func(b'admin.pool.Set.revisions_to_keep',
b'dom0', b'test-pool', b'-2')
self.assertEqual(self.app.pools['test-pool'].mock_calls, [])
self.assertFalse(self.app.save.called)
def test_662_pool_set_revisions_to_keep_not_a_number(self):
self.app.pools['test-pool'] = unittest.mock.Mock()
with self.assertRaises(AssertionError):
self.call_mgmt_func(b'admin.pool.Set.revisions_to_keep',
b'dom0', b'test-pool', b'abc')
self.assertEqual(self.app.pools['test-pool'].mock_calls, [])
self.assertFalse(self.app.save.called)
def test_670_vm_volume_set_revisions_to_keep(self):
self.vm.volumes = unittest.mock.MagicMock()
volumes_conf = {
'keys.return_value': ['root', 'private', 'volatile', 'kernel'],
}
self.vm.volumes.configure_mock(**volumes_conf)
self.vm.storage = unittest.mock.Mock()
value = self.call_mgmt_func(b'admin.vm.volume.Set.revisions_to_keep',
b'test-vm1', b'private', b'2')
self.assertIsNone(value)
self.assertEqual(self.vm.volumes.mock_calls,
[unittest.mock.call.keys(),
('__getitem__', ('private',), {})])
self.assertEqual(self.vm.volumes['private'].revisions_to_keep, 2)
self.app.save.assert_called_once_with()
def test_671_vm_volume_set_revisions_to_keep_negative(self):
self.vm.volumes = unittest.mock.MagicMock()
volumes_conf = {
'keys.return_value': ['root', 'private', 'volatile', 'kernel'],
}
self.vm.volumes.configure_mock(**volumes_conf)
self.vm.storage = unittest.mock.Mock()
with self.assertRaises(AssertionError):
self.call_mgmt_func(b'admin.vm.volume.Set.revisions_to_keep',
b'test-vm1', b'private', b'-2')
def test_672_vm_volume_set_revisions_to_keep_not_a_number(self):
self.vm.volumes = unittest.mock.MagicMock()
volumes_conf = {
'keys.return_value': ['root', 'private', 'volatile', 'kernel'],
}
self.vm.volumes.configure_mock(**volumes_conf)
self.vm.storage = unittest.mock.Mock()
with self.assertRaises(AssertionError):
self.call_mgmt_func(b'admin.vm.volume.Set.revisions_to_keep',
b'test-vm1', b'private', b'abc')
def test_990_vm_unexpected_payload(self): def test_990_vm_unexpected_payload(self):
methods_with_no_payload = [ methods_with_no_payload = [
b'admin.vm.List', b'admin.vm.List',