tags support

QubesOS/qubes-issues#2622
This commit is contained in:
Marek Marczykowski-Górecki 2017-06-19 01:55:49 +02:00
parent c07c57bfef
commit 31988a9bd8
No known key found for this signature in database
GPG Key ID: 063938BA42CFA724
4 changed files with 184 additions and 0 deletions

View File

@ -116,6 +116,10 @@ class QubesFeatureNotFoundError(QubesException, KeyError):
'''Feature not set for a given domain'''
class QubesTagNotFoundError(QubesException, KeyError):
'''Tag not set for a given domain'''
class StoragePoolException(QubesException):
''' A general storage exception '''

66
qubesadmin/tags.py Normal file
View File

@ -0,0 +1,66 @@
# -*- encoding: utf8 -*-
#
# The Qubes OS Project, http://www.qubes-os.org
#
# Copyright (C) 2017 Marek Marczykowski-Górecki
# <marmarek@invisiblethingslab.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, see <http://www.gnu.org/licenses/>.
'''VM tags interface'''
class Tags(object):
'''Manager of the tags.
Tags are simple: tag either can be present on qube or not. Tag is a
simple string consisting of ASCII alphanumeric characters, plus `_` and
`-`.
'''
# pylint: disable=too-few-public-methods
def __init__(self, vm):
super(Tags, self).__init__()
self.vm = vm
def remove(self, elem):
'''Remove a tag'''
self.vm.qubesd_call(self.vm.name, 'admin.vm.tag.Remove', elem)
def add(self, elem):
'''Add a tag'''
self.vm.qubesd_call(self.vm.name, 'admin.vm.tag.Set', elem)
def update(self, *others):
'''Add tags from iterable(s)'''
for other in others:
for elem in other:
self.add(elem)
def discard(self, elem):
'''Remove a tag if present'''
try:
self.remove(elem)
except KeyError:
pass
def __iter__(self):
qubesd_response = self.vm.qubesd_call(self.vm.name,
'admin.vm.tag.List')
return iter(qubesd_response.decode('utf-8').splitlines())
def __contains__(self, elem):
'''Does the VM have a tag'''
response = self.vm.qubesd_call(self.vm.name, 'admin.vm.tag.Get', elem)
return response == b'1'

110
qubesadmin/tests/tags.py Normal file
View File

@ -0,0 +1,110 @@
# -*- encoding: utf8 -*-
#
# The Qubes OS Project, http://www.qubes-os.org
#
# Copyright (C) 2017 Marek Marczykowski-Górecki
# <marmarek@invisiblethingslab.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, see <http://www.gnu.org/licenses/>.
import qubesadmin.tests
import qubesadmin.tags
class TC_00_Tags(qubesadmin.tests.QubesTestCase):
def setUp(self):
super(TC_00_Tags, self).setUp()
self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
b'0\0test-vm class=AppVM state=Running\n' \
b'test-vm2 class=AppVM state=Running\n' \
b'test-vm3 class=AppVM state=Running\n'
self.vm = self.app.domains['test-vm']
self.tags = qubesadmin.tags.Tags(self.vm)
def test_000_list(self):
self.app.expected_calls[
('test-vm', 'admin.vm.tag.List', None, None)] = \
b'0\0tag1\ntag2\n'
self.assertEqual(sorted(self.tags),
['tag1', 'tag2'])
self.assertAllCalled()
def test_010_get(self):
self.app.expected_calls[
('test-vm', 'admin.vm.tag.Get', 'tag1', None)] = \
b'0\x001'
self.assertIn('tag1', self.tags)
self.assertAllCalled()
def test_011_get_missing(self):
self.app.expected_calls[
('test-vm', 'admin.vm.tag.Get', 'tag1', None)] = \
b'0\x000'
self.assertNotIn('tag1', self.tags)
self.assertAllCalled()
def test_020_set(self):
self.app.expected_calls[
('test-vm', 'admin.vm.tag.Set', 'tag1', None)] = b'0\0'
self.tags.add('tag1')
self.assertAllCalled()
def test_030_update(self):
self.app.expected_calls[
('test-vm', 'admin.vm.tag.Set', 'tag1', None)] = b'0\0'
self.app.expected_calls[
('test-vm', 'admin.vm.tag.Set', 'tag2', None)] = b'0\0'
self.tags.update(['tag1', 'tag2'])
self.assertAllCalled()
def test_031_update_from_other(self):
self.app.expected_calls[
('test-vm2', 'admin.vm.tag.List', None, None)] = \
b'0\0tag3\ntag4\n'
self.app.expected_calls[
('test-vm', 'admin.vm.tag.Set', 'tag3', None)] = b'0\0'
self.app.expected_calls[
('test-vm', 'admin.vm.tag.Set', 'tag4', None)] = b'0\0'
self.tags.update(self.app.domains['test-vm2'].tags)
self.assertAllCalled()
def test_040_remove(self):
self.app.expected_calls[
('test-vm', 'admin.vm.tag.Remove', 'tag1', None)] = \
b'0\0'
self.tags.remove('tag1')
self.assertAllCalled()
def test_040_remove_missing(self):
self.app.expected_calls[
('test-vm', 'admin.vm.tag.Remove', 'tag1', None)] = \
b'2\0QubesTagNotFoundError\0\0Tag not set for domain test-vm: ' \
b'tag1\0'
with self.assertRaises(KeyError):
self.tags.remove('tag1')
self.assertAllCalled()
def test_050_discard(self):
self.app.expected_calls[
('test-vm', 'admin.vm.tag.Remove', 'tag1', None)] = \
b'0\0'
self.tags.discard('tag1')
self.assertAllCalled()
def test_051_discard_missing(self):
self.app.expected_calls[
('test-vm', 'admin.vm.tag.Remove', 'tag1', None)] = \
b'2\0QubesTagNotFoundError\0\0Tag not set for domain test-vm: ' \
b'tag1\0'
self.tags.discard('tag1')
self.assertAllCalled()

View File

@ -27,6 +27,7 @@ import qubesadmin.storage
import qubesadmin.features
import qubesadmin.devices
import qubesadmin.firewall
import qubesadmin.tags
class QubesVM(qubesadmin.base.PropertyHolder):
@ -34,6 +35,8 @@ class QubesVM(qubesadmin.base.PropertyHolder):
log = None
tags = None
features = None
devices = None
@ -44,6 +47,7 @@ class QubesVM(qubesadmin.base.PropertyHolder):
super(QubesVM, self).__init__(app, 'admin.vm.property.', name)
self._volumes = None
self.log = logging.getLogger(name)
self.tags = qubesadmin.tags.Tags(self)
self.features = qubesadmin.features.Features(self)
self.devices = qubesadmin.devices.DeviceManager(self)
self.firewall = qubesadmin.firewall.Firewall(self)