rpc-window: adjust for qubespolicy API

- drop qid usage - it isn't really needed, especially for to-be-created
  DispVMs
- use "domains_info" dict as input, instead of loading qubes.xml
  directly
- nicely format "Disposable VM" entries
- simplify whitelist/blacklist handling - since qrexecpolicy always
  provide a list of allowed choices, use just that

Important note: there are two names concepts:
1. Display name - name of VM, or in case of to-be-created DispVMs - a
string "Disposable VM (name-of-base-vm)"
2. API name - as in qrexec policy - $dispvm:name-of-base-vm for new
DispVMs

Externally at API level (allowed targets list, return value), API name
is used, but internally VMListModeler._entries is still indexed with
display names. This is done for more efficient (and readable) GUI
handling - because most of the time it's searched for what user have
entered.

QubesOS/qubes-issues#910
This commit is contained in:
Marek Marczykowski-Górecki 2017-03-24 02:49:02 +01:00
parent b1dbc0647f
commit 6c3410377d
No known key found for this signature in database
GPG Key ID: 063938BA42CFA724
4 changed files with 113 additions and 211 deletions

View File

@ -20,10 +20,10 @@
#
import gi
import os
import itertools
gi.require_version('Gtk', '3.0')
from gi.repository import Gtk, Gdk, GdkPixbuf, GObject, GLib
import qubes
from qubespolicy.utils import sanitize_domain_name
@ -46,63 +46,61 @@ class GtkIconGetter:
class VMListModeler:
def __init__(self):
def __init__(self, domains_info=None):
self._icon_getter = GtkIconGetter(16)
self._entries = {}
self._domains_info = domains_info
self._create_entries()
def _get_icon(self, vm):
return self._icon_getter.get_icon(vm.label.icon)
def _get_list(self):
collection = qubes.QubesVmCollection()
try:
collection.lock_db_for_reading()
collection.load()
return [vm for vm in collection.values()]
finally:
collection.unlock_db()
def _create_entries(self):
for vm in self._get_list():
sanitize_domain_name(vm.name, assert_sanitized=True)
for name, vm in self._domains_info.items():
if name.startswith('$dispvm:'):
vm_name = name[len('$dispvm:'):]
dispvm = True
else:
vm_name = name
dispvm = False
sanitize_domain_name(vm_name, assert_sanitized=True)
icon = self._get_icon(vm)
icon = self._icon_getter.get_icon(vm.get('icon', None))
self._entries[vm.name] = {'qid': vm.qid,
'icon': icon,
'vm': vm}
if dispvm:
display_name = 'Disposable VM ({})'.format(vm_name)
else:
display_name = vm_name
self._entries[display_name] = {
'api_name': name,
'icon': icon,
'vm': vm}
def _get_valid_qube_name(self, combo, entry_box, exclusions):
def _get_valid_qube_name(self, combo, entry_box, whitelist):
name = None
if combo and combo.get_active_id():
selected = combo.get_active_id()
if selected in self._entries and selected not in exclusions:
if selected in self._entries and \
self._entries[selected]['api_name'] in whitelist:
name = selected
if not name and entry_box:
typed = entry_box.get_text()
if typed in self._entries and typed not in exclusions:
if typed in self._entries and \
self._entries[typed]['api_name'] in whitelist:
name = typed
return name
def _combo_change(self, selection_trigger, combo, entry_box, exclusions):
def _combo_change(self, selection_trigger, combo, entry_box, whitelist):
data = None
name = self._get_valid_qube_name(combo, entry_box, exclusions)
name = self._get_valid_qube_name(combo, entry_box, whitelist)
if name:
entry = self._entries[name]
data = (entry['qid'], name)
data = entry['api_name']
if entry_box:
entry_box.set_icon_from_pixbuf(
@ -115,33 +113,22 @@ class VMListModeler:
if selection_trigger:
selection_trigger(data)
def _entry_activate(self, activation_trigger, combo, entry_box, exclusions):
name = self._get_valid_qube_name(combo, entry_box, exclusions)
def _entry_activate(self, activation_trigger, combo, entry_box, whitelist):
name = self._get_valid_qube_name(combo, entry_box, whitelist)
if name:
activation_trigger(entry_box)
def apply_model(self, destination_object, vm_filter_list=None,
def apply_model(self, destination_object, vm_list,
selection_trigger=None, activation_trigger=None):
if isinstance(destination_object, Gtk.ComboBox):
list_store = Gtk.ListStore(int, str, GdkPixbuf.Pixbuf)
exclusions = []
for vm_name in sorted(self._entries.keys()):
entry = self._entries[vm_name]
matches = True
if vm_filter_list:
for vm_filter in vm_filter_list:
if not vm_filter.matches(entry['vm']):
matches = False
break
if matches:
list_store.append([entry['qid'], vm_name, entry['icon']])
else:
exclusions += [vm_name]
for entry_no, display_name in zip(itertools.count(),
sorted(self._entries.keys())):
entry = self._entries[display_name]
if entry['api_name'] in vm_list:
list_store.append([entry_no, display_name, entry['icon']])
destination_object.set_model(list_store)
destination_object.set_id_column(1)
@ -173,7 +160,7 @@ class VMListModeler:
activation_trigger,
destination_object,
entry,
exclusions))
vm_list))
# A Combo with an entry has a text column already
text_column = destination_object.get_cells()[0]
@ -189,7 +176,7 @@ class VMListModeler:
selection_trigger,
combo,
entry_box,
exclusions)
vm_list)
destination_object.connect("changed", changed_function)
changed_function(destination_object)
@ -210,20 +197,6 @@ class VMListModeler:
raise TypeError(
"Only expecting Gtk.Entry objects to want our icon.")
class NameBlacklistFilter:
def __init__(self, avoid_names_list):
self._avoid_names_list = avoid_names_list
def matches(self, vm):
return vm.name not in self._avoid_names_list
class NameWhitelistFilter:
def __init__(self, allowed_names_list):
self._allowed_names_list = allowed_names_list
def matches(self, vm):
return vm.name in self._allowed_names_list
class GtkOneTimerHelper:
def __init__(self, wait_seconds):

View File

@ -68,9 +68,8 @@ class RPCConfirmationWindow:
valid = (data is not None)
if valid:
(self._target_qid, self._target_name) = data
self._target_name = data
else:
self._target_qid = None
self._target_name = None
self._focus_helper.request_sensitivity(valid)
@ -131,7 +130,8 @@ class RPCConfirmationWindow:
self._error_bar.connect("response", self._close_error)
def __init__(self, source, rpc_operation, name_whitelist, target=None):
def __init__(self, entries_info, source, rpc_operation, targets_list,
target=None):
sanitize_domain_name(source, assert_sanitized=True)
sanitize_service_name(source, assert_sanitized=True)
@ -153,7 +153,6 @@ class RPCConfirmationWindow:
self._source_id['error_bar'])
self._error_message = self._gtk_builder.get_object(
self._source_id['error_message'])
self._target_qid = None
self._target_name = None
self._focus_helper = self._new_focus_stealing_helper()
@ -161,11 +160,10 @@ class RPCConfirmationWindow:
self._rpc_label.set_markup(
self._escape_and_format_rpc_text(rpc_operation))
self._entries_info = entries_info
list_modeler = self._new_VM_list_modeler()
domain_filters = [VMListModeler.NameWhitelistFilter(name_whitelist)]
list_modeler.apply_model(self._rpc_combo_box, domain_filters,
list_modeler.apply_model(self._rpc_combo_box, targets_list,
selection_trigger=self._update_ok_button_sensitivity,
activation_trigger=self._clicked_ok)
@ -189,7 +187,7 @@ class RPCConfirmationWindow:
Gtk.main()
def _new_VM_list_modeler(self):
return VMListModeler()
return VMListModeler(self._entries_info)
def _new_focus_stealing_helper(self):
return FocusStealingHelper(
@ -201,15 +199,14 @@ class RPCConfirmationWindow:
self._show()
if self._confirmed:
return {'name': self._target_name, 'qid': self._target_qid,
'parameters': {}}
return self._target_name
else:
return False
def confirm_rpc(source, rpc_operation, name_whitelist, target=None):
window = RPCConfirmationWindow(source, rpc_operation, name_whitelist,
target)
def confirm_rpc(entries_info, source, rpc_operation, targets_list, target=None):
window = RPCConfirmationWindow(entries_info, source, rpc_operation,
targets_list, target)
return window.confirm_rpc()

View File

@ -27,43 +27,18 @@ from gi.repository import Gtk
from qubespolicy.gtkhelpers import VMListModeler, GtkOneTimerHelper, \
FocusStealingHelper
mock_domains_info = {
'dom0': {'icon': 'black', 'type': 'AdminVM'},
'test-red1': {'icon': 'red', 'type': 'AppVM'},
'test-red2': {'icon': 'red', 'type': 'AppVM'},
'test-red3': {'icon': 'red', 'type': 'AppVM'},
'test-source': {'icon': 'green', 'type': 'AppVM'},
'test-target': {'icon': 'orange', 'type': 'AppVM'},
'$dispvm:test-disp6': {'icon': 'red', 'type': 'DispVM'},
}
class VMListModelerMock(VMListModeler):
def __init__(self):
VMListModeler.__init__(self)
def _get_list(self):
return [
MockVm(0, "dom0", "black"),
MockVm(2, "test-red1", "red"),
MockVm(4, "test-red2", "red"),
MockVm(7, "test-red3", "red"),
MockVm(8, "test-source", "green"),
MockVm(10, "test-target", "orange"),
MockVm(15, "test-disp6", "red", True)
]
@staticmethod
def get_name_whitelist():
return ["test-red1", "test-red2", "test-red3",
"test-target", "test-disp6"]
class MockVmLabel:
def __init__(self, index, color, name, dispvm=False):
self.index = index
self.color = color
self.name = name
self.dispvm = dispvm
self.icon = "edit-find"
class MockVm:
def __init__(self, qid, name, color, dispvm=False):
self.qid = qid
self.name = name
self.label = MockVmLabel(qid, 0x000000, color, dispvm)
mock_whitelist = ["test-red1", "test-red2", "test-red3",
"test-target", "$dispvm:test-disp6"]
class MockComboEntry:
def __init__(self, text):
@ -104,53 +79,60 @@ class GtkTestCase(unittest.TestCase):
return iterations, time_length
class VMListModelerTest(VMListModelerMock, unittest.TestCase):
class VMListModelerTest(VMListModeler, unittest.TestCase):
def __init__(self, *args, **kwargs):
unittest.TestCase.__init__(self, *args, **kwargs)
VMListModelerMock.__init__(self)
VMListModeler.__init__(self, mock_domains_info)
def test_entries_gets_loaded(self):
self.assertIsNotNone(self._entries)
def test_valid_qube_name(self):
self.apply_model(Gtk.ComboBox())
self.apply_model(Gtk.ComboBox(), list(mock_domains_info.keys()))
for name in ["test-red1", "test-red2", "test-red3",
"test-target", "test-disp6"]:
"test-target", "Disposable VM (test-disp6)"]:
mock = MockComboEntry(name)
self.assertEquals(name, self._get_valid_qube_name(mock, mock, []))
self.assertEquals(name, self._get_valid_qube_name(None, mock, []))
self.assertEquals(name, self._get_valid_qube_name(mock, None, []))
self.assertIsNone(self._get_valid_qube_name(None, None, []))
self.assertEquals(name,
self._get_valid_qube_name(mock, mock, mock_whitelist))
self.assertEquals(name,
self._get_valid_qube_name(None, mock, mock_whitelist))
self.assertEquals(name,
self._get_valid_qube_name(mock, None, mock_whitelist))
self.assertIsNone(
self._get_valid_qube_name(None, None, mock_whitelist))
def test_valid_qube_name_exceptions(self):
list_exc = ["test-disp6", "test-red2"]
def test_valid_qube_name_whitelist(self):
list_exc = ["$dispvm:test-disp6", "test-red2"]
self.apply_model(Gtk.ComboBox(),
[VMListModeler.NameBlacklistFilter([list_exc[0], list_exc[1]])])
whitelist = [name for name in mock_whitelist if name not in list_exc]
self.apply_model(Gtk.ComboBox(), whitelist)
for name in list_exc:
mock = MockComboEntry(name)
self.assertIsNone(self._get_valid_qube_name(mock, mock, list_exc))
self.assertIsNone(self._get_valid_qube_name(None, mock, list_exc))
self.assertIsNone(self._get_valid_qube_name(mock, None, list_exc))
self.assertIsNone(self._get_valid_qube_name(mock, mock, whitelist))
self.assertIsNone(self._get_valid_qube_name(None, mock, whitelist))
self.assertIsNone(self._get_valid_qube_name(mock, None, whitelist))
def test_invalid_qube_name(self):
self.apply_model(Gtk.ComboBox())
self.apply_model(Gtk.ComboBox(), mock_whitelist)
for name in ["test-nonexistant", None, "", 1]:
mock = MockComboEntry(name)
self.assertIsNone(self._get_valid_qube_name(mock, mock, []))
self.assertIsNone(self._get_valid_qube_name(None, mock, []))
self.assertIsNone(self._get_valid_qube_name(mock, None, []))
self.assertIsNone(
self._get_valid_qube_name(mock, mock, mock_whitelist))
self.assertIsNone(
self._get_valid_qube_name(None, mock, mock_whitelist))
self.assertIsNone(
self._get_valid_qube_name(mock, None, mock_whitelist))
def test_apply_model(self):
new_object = Gtk.ComboBox()
self.assertIsNone(new_object.get_model())
self.apply_model(new_object)
self.apply_model(new_object, mock_whitelist)
self.assertIsNotNone(new_object.get_model())
@ -159,82 +141,39 @@ class VMListModelerTest(VMListModelerMock, unittest.TestCase):
self.assertIsNone(new_object.get_model())
self.apply_model(new_object)
self.apply_model(new_object, [])
self.assertIsNotNone(new_object.get_model())
def test_apply_model_only_combobox(self):
invalid_types = [1, "One", u'1', {'1': "one"}, VMListModelerMock()]
invalid_types = [1, "One", u'1', {'1': "one"}, VMListModeler(
mock_domains_info)]
for invalid_type in invalid_types:
with self.assertRaises(TypeError):
self.apply_model(invalid_type)
def test_apply_model_blacklist(self):
combo = Gtk.ComboBox()
self.apply_model(combo)
self.assertEquals(7, len(combo.get_model()))
names = list(self._entries.keys())
self.apply_model(combo, [
VMListModeler.NameBlacklistFilter([names[0]])])
self.assertEquals(6, len(combo.get_model()))
self.apply_model(combo, [
VMListModeler.NameBlacklistFilter([names[0]]),
VMListModeler.NameBlacklistFilter([names[1]])])
self.assertEquals(5, len(combo.get_model()))
self.apply_model(combo, [VMListModeler.NameBlacklistFilter([
names[0],
names[1]
])])
self.assertEquals(5, len(combo.get_model()))
self.apply_model(invalid_type, [])
def test_apply_model_whitelist(self):
combo = Gtk.ComboBox()
self.apply_model(combo)
self.apply_model(combo, list(mock_domains_info.keys()))
self.assertEquals(7, len(combo.get_model()))
names = list(self._entries.keys())
names = [entry['api_name'] for entry in self._entries.values()]
self.apply_model(combo, [
VMListModeler.NameWhitelistFilter([names[0]])])
self.apply_model(combo, [names[0]])
self.assertEquals(1, len(combo.get_model()))
self.apply_model(combo, [VMListModeler.NameWhitelistFilter([
names[0],
names[1]])])
self.apply_model(combo, [names[0], names[1]])
self.assertEquals(2, len(combo.get_model()))
def test_apply_model_multiple_filters(self):
combo = Gtk.ComboBox()
self.apply_model(combo)
self.assertEquals(7, len(combo.get_model()))
names = list(self._entries.keys())
self.apply_model(combo, [VMListModeler.NameWhitelistFilter([
names[0],
names[1],
names[2],
names[3],
names[4]]),
VMListModeler.NameBlacklistFilter([
names[0],
names[1]])])
self.assertEquals(3, len(combo.get_model()))
def test_apply_icon(self):
new_object = Gtk.Entry()
self.assertIsNone(
new_object.get_icon_pixbuf(Gtk.EntryIconPosition.PRIMARY))
self.apply_icon(new_object, "test-disp6")
self.apply_icon(new_object, "Disposable VM (test-disp6)")
self.assertIsNotNone(
new_object.get_icon_pixbuf(Gtk.EntryIconPosition.PRIMARY))
@ -250,7 +189,7 @@ class VMListModelerTest(VMListModelerMock, unittest.TestCase):
new_object = Gtk.Entry()
for name in ["test-red1", "test-red2", "test-red3",
"test-target", "test-disp6"]:
"test-target", "Disposable VM (test-disp6)"]:
self.apply_icon(new_object, name)
for name in ["test-nonexistant", None, "", 1]:

View File

@ -22,15 +22,16 @@
import sys
import unittest
from qubespolicy.tests.gtkhelpers import VMListModelerMock, GtkTestCase, \
FocusStealingHelperMock
from qubespolicy.tests.gtkhelpers import GtkTestCase, FocusStealingHelperMock
from qubespolicy.tests.gtkhelpers import mock_domains_info, mock_whitelist
from qubespolicy.gtkhelpers import VMListModeler
from qubespolicy.rpcconfirmation import RPCConfirmationWindow
class MockRPCConfirmationWindow(RPCConfirmationWindow):
def _new_VM_list_modeler(self):
return VMListModelerMock()
return VMListModeler(mock_domains_info)
def _new_focus_stealing_helper(self):
return FocusStealingHelperMock(
@ -38,13 +39,13 @@ class MockRPCConfirmationWindow(RPCConfirmationWindow):
self._rpc_ok_button,
self._focus_stealing_seconds)
def __init__(self, source, rpc_operation,
name_whitelist=VMListModelerMock.get_name_whitelist(),
def __init__(self, source, rpc_operation, whitelist,
target=None, focus_stealing_seconds=1):
self._focus_stealing_seconds = focus_stealing_seconds
RPCConfirmationWindow.__init__(self, source, rpc_operation,
name_whitelist, target)
RPCConfirmationWindow.__init__(
self, mock_domains_info, source, rpc_operation, whitelist,
target)
def is_error_visible(self):
return self._error_bar.get_visible()
@ -66,8 +67,7 @@ class MockRPCConfirmationWindow(RPCConfirmationWindow):
class RPCConfirmationWindowTestBase(MockRPCConfirmationWindow, GtkTestCase):
def __init__(self, test_method, source_name="test-source",
rpc_operation="test.Operation",
name_whitelist=VMListModelerMock.get_name_whitelist(),
rpc_operation="test.Operation", whitelist=mock_whitelist,
target_name=None):
GtkTestCase.__init__(self, test_method)
self.test_source_name = source_name
@ -85,7 +85,7 @@ class RPCConfirmationWindowTestBase(MockRPCConfirmationWindow, GtkTestCase):
MockRPCConfirmationWindow.__init__(self,
self.test_source_name,
self.test_rpc_operation,
name_whitelist,
whitelist,
self.test_target_name,
focus_stealing_seconds=self._test_time)
@ -167,7 +167,6 @@ class RPCConfirmationWindowTestBase(MockRPCConfirmationWindow, GtkTestCase):
self.assertTrue(self.test_clicked_ok)
self.assertFalse(self.test_clicked_cancel)
self.assertTrue(self._confirmed)
self.assertIsNotNone(self._target_qid)
self.assertIsNotNone(self._target_name)
elif click_type == "cancel":
self._rpc_cancel_button.clicked()
@ -215,7 +214,6 @@ class RPCConfirmationWindowTestBase(MockRPCConfirmationWindow, GtkTestCase):
self.assertTrue(self._rpc_ok_button.get_sensitive())
self.assertIsNotNone(self._target_qid)
self.assertIsNotNone(self._target_name)
self.assertFalse(self.test_called_close)
@ -225,7 +223,6 @@ class RPCConfirmationWindowTestBase(MockRPCConfirmationWindow, GtkTestCase):
self.assertFalse(self._confirmed)
def assert_initial_state(self, after_focus_timer):
self.assertIsNone(self._target_qid)
self.assertIsNone(self._target_name)
self.assertFalse(self.test_clicked_ok)
self.assertFalse(self.test_clicked_cancel)
@ -250,7 +247,6 @@ class RPCConfirmationWindowTestWithTarget(RPCConfirmationWindowTestBase):
self._lifecycle_click(click_type="ok")
def assert_initial_state(self, after_focus_timer):
self.assertIsNotNone(self._target_qid)
self.assertIsNotNone(self._target_name)
self.assertFalse(self.test_clicked_ok)
self.assertFalse(self.test_clicked_cancel)
@ -264,7 +260,6 @@ class RPCConfirmationWindowTestWithTarget(RPCConfirmationWindowTestBase):
def _lifecycle_click(self, click_type):
RPCConfirmationWindowTestBase._lifecycle_click(self, click_type)
self.assertIsNotNone(self._target_qid)
self.assertIsNotNone(self._target_name)
@ -283,7 +278,7 @@ class RPCConfirmationWindowTestWithTargetInvalid(unittest.TestCase):
def assert_raises_error(self, expect, source, target):
rpcWindow = MockRPCConfirmationWindow(source, "test.Operation",
target=target)
mock_whitelist, target=target)
self.assertEquals(expect, rpcWindow.is_error_visible())
@ -307,9 +302,10 @@ class RPCConfirmationWindowTestWhitelist(unittest.TestCase):
def test_all_allowed_domains(self):
self._assert_whitelist(
["test-red1", "test-red2", "test-red3",
"test-target", "test-disp6", "test-source", "dom0"],
"test-target", "$dispvm:test-disp6", "test-source", "dom0"],
["test-red1", "test-red2", "test-red3",
"test-target", "test-disp6", "test-source", "dom0"])
"test-target", "Disposable VM (test-disp6)", "test-source",
"dom0"])
def _assert_whitelist(self, whitelist, expected):
rpcWindow = MockRPCConfirmationWindow(
@ -317,10 +313,7 @@ class RPCConfirmationWindowTestWhitelist(unittest.TestCase):
domains = rpcWindow.get_shown_domains()
for domain in expected:
self.assertTrue(domain in domains)
self.assertEquals(len(expected), len(domains))
self.assertCountEqual(domains, expected)
if __name__ == '__main__':
test = False
@ -336,7 +329,7 @@ if __name__ == '__main__':
if window:
print(MockRPCConfirmationWindow("test-source",
"qubes.Filecopy",
VMListModelerMock.get_name_whitelist(),
mock_whitelist,
"test-red1").confirm_rpc())
elif test:
unittest.main(argv=[sys.argv[0]])