admin: implement admin.vm.tag.*

QubesOS/qubes-issues#2622
This commit is contained in:
Marek Marczykowski-Górecki 2017-06-19 19:57:15 +02:00
parent 4a1a5fc24b
commit 9242202db2
No known key found for this signature in database
GPG Key ID: 063938BA42CFA724
4 changed files with 104 additions and 0 deletions

View File

@ -354,6 +354,49 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI):
return '{} {}'.format(size, path)
@qubes.api.method('admin.vm.tag.List', no_payload=True)
@asyncio.coroutine
def vm_tag_list(self):
assert not self.arg
tags = self.dest.tags
tags = self.fire_event_for_filter(tags)
return ''.join('{}\n'.format(tag) for tag in sorted(tags))
@qubes.api.method('admin.vm.tag.Get', no_payload=True)
@asyncio.coroutine
def vm_tag_get(self):
qubes.vm.Tags.validate_tag(self.arg)
self.fire_event_for_permission()
return '1' if self.arg in self.dest.tags else '0'
@qubes.api.method('admin.vm.tag.Set', no_payload=True)
@asyncio.coroutine
def vm_tag_set(self):
qubes.vm.Tags.validate_tag(self.arg)
self.fire_event_for_permission()
self.dest.tags.add(self.arg)
self.app.save()
@qubes.api.method('admin.vm.tag.Remove', no_payload=True)
@asyncio.coroutine
def vm_tag_remove(self):
qubes.vm.Tags.validate_tag(self.arg)
self.fire_event_for_permission()
try:
self.dest.tags.remove(self.arg)
except KeyError:
raise qubes.exc.QubesTagNotFoundError(self.dest, self.arg)
self.app.save()
@qubes.api.method('admin.pool.List', no_payload=True)
@asyncio.coroutine
def pool_list(self):

View File

@ -162,3 +162,12 @@ class QubesFeatureNotFoundError(QubesException, KeyError):
'Feature not set for domain {}: {}'.format(domain, feature))
self.feature = feature
self.vm = domain
class QubesTagNotFoundError(QubesException, KeyError):
'''Tag not set for a given domain'''
def __init__(self, domain, tag):
super().__init__('Tag not set for domain {}: {}'.format(
domain, tag))
self.vm = domain
self.tag = tag

View File

@ -1687,6 +1687,53 @@ class TC_00_VMs(AdminAPITestCase):
self.assertEqual(func_mock.mock_calls, [])
self.assertFalse(self.app.save.called)
def test_530_tag_list(self):
self.vm.tags.add('tag1')
self.vm.tags.add('tag2')
value = self.call_mgmt_func(b'admin.vm.tag.List', b'test-vm1')
self.assertEqual(value, 'tag1\ntag2\n')
self.assertFalse(self.app.save.called)
def test_540_tag_get(self):
self.vm.tags.add('tag1')
value = self.call_mgmt_func(b'admin.vm.tag.Get', b'test-vm1',
b'tag1')
self.assertEqual(value, '1')
self.assertFalse(self.app.save.called)
def test_541_tag_get_absent(self):
value = self.call_mgmt_func(b'admin.vm.tag.Get', b'test-vm1', b'tag1')
self.assertEqual(value, '0')
self.assertFalse(self.app.save.called)
def test_550_tag_remove(self):
self.vm.tags.add('tag1')
value = self.call_mgmt_func(b'admin.vm.tag.Remove', b'test-vm1',
b'tag1')
self.assertIsNone(value, None)
self.assertNotIn('tag1', self.vm.tags)
self.assertTrue(self.app.save.called)
def test_551_tag_remove_absent(self):
with self.assertRaises(qubes.exc.QubesTagNotFoundError):
self.call_mgmt_func(b'admin.vm.tag.Remove',
b'test-vm1', b'tag1')
self.assertFalse(self.app.save.called)
def test_560_tag_set(self):
value = self.call_mgmt_func(b'admin.vm.tag.Set',
b'test-vm1', b'tag1')
self.assertIsNone(value)
self.assertIn('tag1', self.vm.tags)
self.assertTrue(self.app.save.called)
def test_561_tag_set_invalid(self):
with self.assertRaises(AssertionError):
self.call_mgmt_func(b'admin.vm.tag.Set',
b'test-vm1', b'+.some-tag')
self.assertNotIn('+.some-tag', self.vm.tags)
self.assertFalse(self.app.save.called)
def test_990_vm_unexpected_payload(self):
methods_with_no_payload = [
b'admin.vm.List',

View File

@ -237,6 +237,11 @@ class Tags(set):
# end of overriding
#
@staticmethod
def validate_tag(tag):
safe_set = string.ascii_letters + string.digits + '-_'
assert all((x in safe_set) for x in tag)
class BaseVM(qubes.PropertyHolder):
'''Base class for all VMs