From b3ceb2d7faf31ecbee82a3d3e86ff936ed7137fc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marek=20Marczykowski-G=C3=B3recki?= Date: Wed, 22 Mar 2017 15:14:28 +0100 Subject: [PATCH] Import new rpc confirmation window code Import unmodified implementation done by @boring-stuff. Full history for reference is available in rpc-confirmation-window branch. QubesOS/qubes-issues#910 --- qubespolicy/glade/RPCConfirmationWindow.glade | 359 ++++++++++++++ qubespolicy/gtkhelpers.py | 301 ++++++++++++ qubespolicy/rpcconfirmation.py | 210 +++++++++ qubespolicy/tests/gtkhelpers.py | 444 ++++++++++++++++++ qubespolicy/tests/rpcconfirmation.py | 334 +++++++++++++ qubespolicy/utils.py | 56 +++ 6 files changed, 1704 insertions(+) create mode 100644 qubespolicy/glade/RPCConfirmationWindow.glade create mode 100644 qubespolicy/gtkhelpers.py create mode 100644 qubespolicy/rpcconfirmation.py create mode 100755 qubespolicy/tests/gtkhelpers.py create mode 100755 qubespolicy/tests/rpcconfirmation.py create mode 100644 qubespolicy/utils.py diff --git a/qubespolicy/glade/RPCConfirmationWindow.glade b/qubespolicy/glade/RPCConfirmationWindow.glade new file mode 100644 index 00000000..0c0ccba5 --- /dev/null +++ b/qubespolicy/glade/RPCConfirmationWindow.glade @@ -0,0 +1,359 @@ + + + + + + 400 + False + Operation execution + center + dialog-question + dialog + True + center + + + True + False + vertical + + + True + False + True + error + True + + + False + 6 + end + + + + + + False + False + 0 + + + + + False + 16 + + + True + False + gtk-dialog-error + + + False + True + 0 + + + + + True + False + ErrorMessage + + + False + True + 1 + + + + + False + False + 0 + + + + + False + True + 0 + + + + + 100 + 80 + True + False + 12 + 12 + 12 + 12 + True + vertical + 6 + + + True + False + 6 + end + + + gtk-cancel + True + True + True + True + + + True + True + 0 + + + + + gtk-ok + True + False + True + True + True + True + + + True + True + 1 + + + + + False + True + end + 1 + + + + + True + False + vertical + 6 + + + True + False + 6 + 12 + + + True + False + gtk-dialog-question + 6 + + + False + True + 0 + + + + + True + False + start + Do you want to allow the following operation? +<small>Select the target domain and confirm with 'OK'</small> + True + + + True + True + 1 + + + + + False + True + 0 + + + + + True + False + 12 + 6 + 12 + 6 + + + True + False + 1 + Target: + + + 0 + 2 + + + + + True + False + True + True + + + True + 5 + False + Start typing or use the arrow + GTK_INPUT_HINT_WORD_COMPLETION | GTK_INPUT_HINT_NONE + + + + + 1 + 2 + + + + + True + False + 1 + Source: + + + 0 + 0 + + + + + True + False + False + source + False + + + 1 + 0 + + + + + True + False + 1 + Operation: + + + 0 + 1 + + + + + True + False + 0 + qubes.<b>MyOperation</b> + True + + + 1 + 1 + + + + + False + True + 1 + + + + + True + True + + + True + False + 6 + vertical + 6 + + + Display templates in the target list + True + True + False + 0 + True + + + False + True + 0 + + + + + Choose a custom destination in the target + True + True + False + 0 + True + + + False + True + 1 + + + + + + + False + Advanced options + True + + + + + False + True + 2 + + + + + True + True + 2 + + + + + False + True + 1 + + + + + + diff --git a/qubespolicy/gtkhelpers.py b/qubespolicy/gtkhelpers.py new file mode 100644 index 00000000..58b30a7f --- /dev/null +++ b/qubespolicy/gtkhelpers.py @@ -0,0 +1,301 @@ +#!/usr/bin/python +# +# The Qubes OS Project, https://www.qubes-os.org/ +# +# Copyright (C) 2017 boring-stuff +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +# + +import qubes +import gi, os +gi.require_version('Gtk', '3.0') +from gi.repository import Gtk, Gdk, GdkPixbuf, GObject, GLib +from . qubesutils import sanitize_domain_name + +glade_directory = os.path.join(os.path.dirname(__file__), "glade") + +class GtkIconGetter: + def __init__(self, size): + self._icons = {} + self._size = size + self._theme = Gtk.IconTheme.get_default() + + def get_icon(self, name): + if name not in self._icons: + try: + icon = self._theme.load_icon(name, self._size, 0) + except GLib.Error: + icon = self._theme.load_icon("gnome-foot", self._size, 0) + + self._icons[name] = icon + + return self._icons[name] + +class VMListModeler: + def __init__(self): + self._icon_getter = GtkIconGetter(16) + + self._entries = {} + self._create_entries() + + + def _get_icon(self, vm): + return self._icon_getter.get_icon(vm.label.icon) + + def _get_list(self): + collection = qubes.QubesVmCollection() + try: + collection.lock_db_for_reading() + + collection.load() + + return [vm for vm in collection.values()] + finally: + collection.unlock_db() + + def _create_entries(self): + for vm in self._get_list(): + sanitize_domain_name(vm.name, assert_sanitized = True) + + icon = self._get_icon(vm) + + self._entries[vm.name] = {'qid': vm.qid, + 'icon': icon, + 'vm': vm} + + + def _get_valid_qube_name(self, combo, entry_box, exclusions): + name = None + + if combo and combo.get_active_id(): + selected = combo.get_active_id() + + if selected in self._entries and selected not in exclusions: + name = selected + + if not name and entry_box: + typed = entry_box.get_text() + + if typed in self._entries and typed not in exclusions: + name = typed + + return name + + def _combo_change(self, selection_trigger, combo, entry_box, exclusions): + data = None + name = self._get_valid_qube_name(combo, entry_box, exclusions) + + if name: + entry = self._entries[name] + + data = (entry['qid'], name) + + if entry_box: + entry_box.set_icon_from_pixbuf( + Gtk.EntryIconPosition.PRIMARY, entry['icon']) + else: + if entry_box: + entry_box.set_icon_from_stock( + Gtk.EntryIconPosition.PRIMARY, "gtk-find") + + if selection_trigger: + selection_trigger(data) + + def _entry_activate(self, activation_trigger, combo, entry_box, exclusions): + name = self._get_valid_qube_name(combo, entry_box, exclusions) + + if name: + activation_trigger(entry_box) + + def apply_model(self, destination_object, vm_filter_list = None, + selection_trigger = None, activation_trigger = None): + if isinstance(destination_object, Gtk.ComboBox): + list_store = Gtk.ListStore(int, str, GdkPixbuf.Pixbuf) + + exclusions = [] + for vm_name in sorted(self._entries.iterkeys()): + entry = self._entries[vm_name] + + matches = True + + if vm_filter_list: + for vm_filter in vm_filter_list: + if not vm_filter.matches(entry['vm']): + matches = False + break + + if matches: + list_store.append([entry['qid'], vm_name, entry['icon']]) + else: + exclusions += [vm_name] + + destination_object.set_model(list_store) + destination_object.set_id_column(1) + + icon_column = Gtk.CellRendererPixbuf() + destination_object.pack_start(icon_column, False) + destination_object.add_attribute(icon_column, "pixbuf", 2) + destination_object.set_entry_text_column(1) + + if destination_object.get_has_entry(): + entry_box = destination_object.get_child() + + area = Gtk.CellAreaBox() + area.pack_start(icon_column, False, False, False) + area.add_attribute(icon_column, "pixbuf", 2) + + completion = Gtk.EntryCompletion.new_with_area(area) + completion.set_inline_selection(True) + completion.set_inline_completion(True) + completion.set_popup_completion(True) + completion.set_popup_single_match(False) + completion.set_model(list_store) + completion.set_text_column(1) + + entry_box.set_completion(completion) + if activation_trigger: + entry_box.connect("activate", + lambda entry: self._entry_activate( + activation_trigger, + destination_object, + entry, + exclusions)) + + # A Combo with an entry has a text column already + text_column = destination_object.get_cells()[0] + destination_object.reorder(text_column, 1) + else: + entry_box = None + + text_column = Gtk.CellRendererText() + destination_object.pack_start(text_column, False) + destination_object.add_attribute(text_column, "text", 1) + + changed_function = lambda combo: self._combo_change( + selection_trigger, + combo, + entry_box, + exclusions) + + destination_object.connect("changed", changed_function) + changed_function(destination_object) + + else: + raise TypeError( + "Only expecting Gtk.ComboBox objects to want our model.") + + def apply_icon(self, entry, qube_name): + if isinstance(entry, Gtk.Entry): + if qube_name in self._entries: + entry.set_icon_from_pixbuf( + Gtk.EntryIconPosition.PRIMARY, + self._entries[qube_name]['icon']) + else: + raise ValueError("The specified source qube does not exist!") + else: + raise TypeError( + "Only expecting Gtk.Entry objects to want our icon.") + + class NameBlacklistFilter: + def __init__(self, avoid_names_list): + self._avoid_names_list = avoid_names_list + + def matches(self, vm): + return vm.name not in self._avoid_names_list + + class NameWhitelistFilter: + def __init__(self, allowed_names_list): + self._allowed_names_list = allowed_names_list + + def matches(self, vm): + return vm.name in self._allowed_names_list + +class GtkOneTimerHelper: + def __init__(self, wait_seconds): + self._wait_seconds = wait_seconds + self._current_timer_id = 0 + self._timer_completed = False + + def _invalidate_timer_completed(self): + self._timer_completed = False + + def _invalidate_current_timer(self): + self._current_timer_id += 1 + + def _timer_check_run(self, timer_id): + if self._current_timer_id == timer_id: + self._timer_run(timer_id) + self._timer_completed = True + else: + pass + + def _timer_run(self, timer_id): + raise NotImplementedError("Not yet implemented") + + def _timer_schedule(self): + self._invalidate_current_timer() + GObject.timeout_add(int(round(self._wait_seconds * 1000)), + self._timer_check_run, + self._current_timer_id) + + def _timer_has_completed(self): + return self._timer_completed + +class FocusStealingHelper(GtkOneTimerHelper): + def __init__(self, window, target_button, wait_seconds = 1): + GtkOneTimerHelper.__init__(self, wait_seconds) + self._window = window + self._target_button = target_button + + self._window.connect("window-state-event", self._window_state_event) + + self._target_sensitivity = False + self._target_button.set_sensitive(self._target_sensitivity) + + def _window_changed_focus(self, window_is_focused): + self._target_button.set_sensitive(False) + self._invalidate_timer_completed() + + if window_is_focused: + self._timer_schedule() + else: + self._invalidate_current_timer() + + def _window_state_event(self, window, event): + assert window == self._window, \ + 'Window state callback called with wrong window' + + changed_focus = event.changed_mask & Gdk.WindowState.FOCUSED + window_focus = event.new_window_state & Gdk.WindowState.FOCUSED + + if changed_focus: + self._window_changed_focus(window_focus != 0) + + # Propagate event further + return False + + def _timer_run(self, timer_id): + self._target_button.set_sensitive(self._target_sensitivity) + + def request_sensitivity(self, sensitivity): + if self._timer_has_completed() or not sensitivity: + self._target_button.set_sensitive(sensitivity) + + self._target_sensitivity = sensitivity + + def can_perform_action(self): + return self._timer_has_completed() + diff --git a/qubespolicy/rpcconfirmation.py b/qubespolicy/rpcconfirmation.py new file mode 100644 index 00000000..9df3f583 --- /dev/null +++ b/qubespolicy/rpcconfirmation.py @@ -0,0 +1,210 @@ +#!/usr/bin/python +# +# The Qubes OS Project, https://www.qubes-os.org/ +# +# Copyright (C) 2017 boring-stuff +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +# + +from . gtkhelpers import VMListModeler, FocusStealingHelper, glade_directory +from . qubesutils import sanitize_domain_name, sanitize_service_name +from gi.repository import Gtk, Gdk, GLib +import os + +class RPCConfirmationWindow(): + _source_file = os.path.join(glade_directory, "RPCConfirmationWindow.glade") + _source_id = { 'window': "RPCConfirmationWindow", + 'ok': "okButton", + 'cancel': "cancelButton", + 'source': "sourceEntry", + 'rpc_label' : "rpcLabel", + 'target': "TargetCombo", + 'error_bar': "ErrorBar", + 'error_message': "ErrorMessage", + } + + def _clicked_ok(self, source): + assert source != None, \ + 'Called the clicked ok callback from no source object' + + if self._can_perform_action(): + self._confirmed = True + self._close() + + def _clicked_cancel(self, button): + assert button == self._rpc_cancel_button, \ + 'Called the clicked cancel callback through the wrong button' + + if self._can_perform_action(): + self._confirmed = False + self._close() + + def _key_pressed(self, window, key): + assert window == self._rpc_window, \ + 'Key pressed callback called with wrong window' + + if self._can_perform_action(): + if key.keyval == Gdk.KEY_Escape: + self._confirmed = False + self._close() + + def _update_ok_button_sensitivity(self, data): + valid = (data != None) + + if valid: + (self._target_qid, self._target_name) = data + else: + self._target_qid = None + self._target_name = None + + self._focus_helper.request_sensitivity(valid) + + def _show_error(self, error_message): + self._error_message.set_text(error_message) + self._error_bar.set_visible(True) + + def _close_error(self, error_bar, response): + assert error_bar == self._error_bar, \ + 'Closed the error bar with the wrong error bar as parameter' + assert response != None, 'Closed the error bar with None as a response' + + self._error_bar.set_visible(False) + + def _set_initial_target(self, source, target): + if target != None: + if target == source: + self._show_error( + "Source and target domains must not be the same.") + else: + model = self._rpc_combo_box.get_model() + + found = False + for item in model: + if item[1] == target: + found = True + + self._rpc_combo_box.set_active_iter( + model.get_iter(item.path)) + + break + + if not found: + self._show_error("Domain '%s' doesn't exist." % target) + + def _can_perform_action(self): + return self._focus_helper.can_perform_action() + + def _escape_and_format_rpc_text(self, rpc_operation): + escaped = GLib.markup_escape_text(rpc_operation) + + partitioned = escaped.partition('.') + formatted = partitioned[0] + partitioned[1] + + if len(partitioned[2]) > 0: + formatted += "" + partitioned[2] + "" + else: + formatted = "" + formatted + "" + + return formatted + + def _connect_events(self): + self._rpc_window.connect("key-press-event",self._key_pressed) + self._rpc_ok_button.connect("clicked", self._clicked_ok) + self._rpc_cancel_button.connect("clicked", self._clicked_cancel) + + self._error_bar.connect("response", self._close_error) + + def __init__(self, source, rpc_operation, name_whitelist, target = None): + sanitize_domain_name(source, assert_sanitized = True) + sanitize_service_name(source, assert_sanitized = True) + + self._gtk_builder = Gtk.Builder() + self._gtk_builder.add_from_file(self._source_file) + self._rpc_window = self._gtk_builder.get_object( + self._source_id['window']) + self._rpc_ok_button = self._gtk_builder.get_object( + self._source_id['ok']) + self._rpc_cancel_button = self._gtk_builder.get_object( + self._source_id['cancel']) + self._rpc_label = self._gtk_builder.get_object( + self._source_id['rpc_label']) + self._source_entry = self._gtk_builder.get_object( + self._source_id['source']) + self._rpc_combo_box = self._gtk_builder.get_object( + self._source_id['target']) + self._error_bar = self._gtk_builder.get_object( + self._source_id['error_bar']) + self._error_message = self._gtk_builder.get_object( + self._source_id['error_message']) + self._target_qid = None + self._target_name = None + + self._focus_helper = self._new_focus_stealing_helper() + + self._rpc_label.set_markup( + self._escape_and_format_rpc_text(rpc_operation)) + + list_modeler = self._new_VM_list_modeler() + + domain_filters = [VMListModeler.NameWhitelistFilter(name_whitelist)] + + list_modeler.apply_model(self._rpc_combo_box, domain_filters, + selection_trigger = self._update_ok_button_sensitivity, + activation_trigger = self._clicked_ok ) + + self._source_entry.set_text(source) + list_modeler.apply_icon(self._source_entry, source) + + self._confirmed = None + + self._set_initial_target(source, target) + + self._connect_events() + + def _close(self): + self._rpc_window.close() + + def _show(self): + self._rpc_window.set_keep_above(True) + self._rpc_window.connect("delete-event", Gtk.main_quit) + self._rpc_window.show_all() + + Gtk.main() + + def _new_VM_list_modeler(self): + return VMListModeler() + + def _new_focus_stealing_helper(self): + return FocusStealingHelper( + self._rpc_window, + self._rpc_ok_button, + 1) + + def confirm_rpc(self): + self._show() + + if self._confirmed: + return { 'name': self._target_name, 'qid': self._target_qid, + 'parameters': {} } + else: + return False + +def confirm_rpc(source, rpc_operation, name_whitelist, target = None): + window = RPCConfirmationWindow(source, rpc_operation, name_whitelist, + target) + + return window.confirm_rpc() + diff --git a/qubespolicy/tests/gtkhelpers.py b/qubespolicy/tests/gtkhelpers.py new file mode 100755 index 00000000..4e757cae --- /dev/null +++ b/qubespolicy/tests/gtkhelpers.py @@ -0,0 +1,444 @@ +#!/usr/bin/python +# +# The Qubes OS Project, https://www.qubes-os.org/ +# +# Copyright (C) 2017 boring-stuff +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +# + +import unittest, time +from core.gtkhelpers import VMListModeler, GtkOneTimerHelper, FocusStealingHelper, Gtk + +class VMListModelerMock(VMListModeler): + def __init__(self): + VMListModeler.__init__(self) + + def _get_list(self): + return [ MockVm(0, "dom0", "black"), + MockVm(2, "test-red1", "red"), + MockVm(4, "test-red2", "red"), + MockVm(7, "test-red3", "red"), + MockVm(8, "test-source", "green"), + MockVm(10, "test-target", "orange"), + MockVm(15, "test-disp6", "red", True) ] + + @staticmethod + def get_name_whitelist(): + return [ "test-red1", "test-red2", "test-red3", + "test-target", "test-disp6" ] + +class MockVmLabel: + def __init__(self, index, color, name, dispvm = False): + self.index = index + self.color = color + self.name = name + self.dispvm = dispvm + self.icon = "gnome-foot" + +class MockVm: + def __init__(self, qid, name, color, dispvm = False): + self.qid = qid + self.name = name + self.label = MockVmLabel(qid, 0x000000, color, dispvm) + +class MockComboEntry: + def __init__(self, text): + self._text = text + + def get_active_id(self): + return self._text + + def get_text(self): + return self._text + +class GtkTestCase(unittest.TestCase): + def __init__(self, *args, **kwargs): + unittest.TestCase.__init__(self, *args, **kwargs) + self._smallest_wait = 0.01 + + def flush_gtk_events(self, wait_seconds = 0): + start = time.time() + iterations = 0 + remaining_wait = wait_seconds + time_length = 0 + + if wait_seconds < 0: + raise ValueError("Only non-negative intervals are allowed.") + + while remaining_wait >= 0: + while Gtk.events_pending(): + Gtk.main_iteration_do(blocking = False) + iterations += 1 + + time_length = time.time() - start + remaining_wait = wait_seconds - time_length + + if (remaining_wait > 0): + time.sleep(self._smallest_wait) + + return (iterations, time_length) + +class VMListModelerTest(VMListModelerMock, unittest.TestCase): + def __init__(self, *args, **kwargs): + unittest.TestCase.__init__(self, *args, **kwargs) + VMListModelerMock.__init__(self) + + def test_entries_gets_loaded(self): + self.assertIsNotNone(self._entries) + + def test_valid_qube_name(self): + self.apply_model(Gtk.ComboBox()) + + for name in [ "test-red1", "test-red2", "test-red3", + "test-target", "test-disp6" ]: + + mock = MockComboEntry(name) + self.assertEquals(name, self._get_valid_qube_name(mock, mock, [])) + self.assertEquals(name, self._get_valid_qube_name(None, mock, [])) + self.assertEquals(name, self._get_valid_qube_name(mock, None, [])) + self.assertIsNone(self._get_valid_qube_name(None, None, [])) + + def test_valid_qube_name_exceptions(self): + list_exc = ["test-disp6", "test-red2"] + + self.apply_model(Gtk.ComboBox(), + [VMListModeler.NameBlacklistFilter([ list_exc[0], list_exc[1] ]) ]) + + for name in list_exc: + mock = MockComboEntry(name) + self.assertIsNone(self._get_valid_qube_name(mock, mock, list_exc)) + self.assertIsNone(self._get_valid_qube_name(None, mock, list_exc)) + self.assertIsNone(self._get_valid_qube_name(mock, None, list_exc)) + + def test_invalid_qube_name(self): + self.apply_model(Gtk.ComboBox()) + + for name in [ "test-nonexistant", None, "", 1 ]: + + mock = MockComboEntry(name) + self.assertIsNone(self._get_valid_qube_name(mock, mock, [])) + self.assertIsNone(self._get_valid_qube_name(None, mock, [])) + self.assertIsNone(self._get_valid_qube_name(mock, None, [])) + + def test_apply_model(self): + new_object = Gtk.ComboBox() + self.assertIsNone(new_object.get_model()) + + self.apply_model(new_object) + + self.assertIsNotNone(new_object.get_model()) + + def test_apply_model_with_entry(self): + new_object = Gtk.ComboBox.new_with_entry() + + self.assertIsNone(new_object.get_model()) + + self.apply_model(new_object) + + self.assertIsNotNone(new_object.get_model()) + + def test_apply_model_only_combobox(self): + invalid_types = [ 1, "One", u'1', {'1': "one"}, VMListModelerMock()] + + for invalid_type in invalid_types: + with self.assertRaises(TypeError): + self.apply_model(invalid_type) + + def test_apply_model_blacklist(self): + combo = Gtk.ComboBox() + + self.apply_model(combo) + self.assertEquals(7, len(combo.get_model())) + + self.apply_model(combo, [ VMListModeler.NameBlacklistFilter([ + self._entries.keys()[0] ]) ]) + self.assertEquals(6, len(combo.get_model())) + + self.apply_model(combo, [ VMListModeler.NameBlacklistFilter([ + self._entries.keys()[0] ]), + VMListModeler.NameBlacklistFilter([ + self._entries.keys()[1] ]) ]) + self.assertEquals(5, len(combo.get_model())) + + self.apply_model(combo, [ VMListModeler.NameBlacklistFilter([ + self._entries.keys()[0], + self._entries.keys()[1] ]) ]) + self.assertEquals(5, len(combo.get_model())) + + def test_apply_model_whitelist(self): + combo = Gtk.ComboBox() + + self.apply_model(combo) + self.assertEquals(7, len(combo.get_model())) + + self.apply_model(combo, [ VMListModeler.NameWhitelistFilter([ + self._entries.keys()[0] ]) ]) + self.assertEquals(1, len(combo.get_model())) + + self.apply_model(combo, [ VMListModeler.NameWhitelistFilter([ + self._entries.keys()[0], + self._entries.keys()[1] ]) ]) + self.assertEquals(2, len(combo.get_model())) + + def test_apply_model_multiple_filters(self): + combo = Gtk.ComboBox() + + self.apply_model(combo) + self.assertEquals(7, len(combo.get_model())) + + self.apply_model(combo, [ VMListModeler.NameWhitelistFilter([ + self._entries.keys()[0], + self._entries.keys()[1], + self._entries.keys()[2], + self._entries.keys()[3], + self._entries.keys()[4] ]), + VMListModeler.NameBlacklistFilter([ + self._entries.keys()[0], + self._entries.keys()[1] ]) ]) + self.assertEquals(3, len(combo.get_model())) + + def test_apply_icon(self): + new_object = Gtk.Entry() + + self.assertIsNone( + new_object.get_icon_pixbuf(Gtk.EntryIconPosition.PRIMARY)) + + self.apply_icon(new_object, "test-disp6") + + self.assertIsNotNone( + new_object.get_icon_pixbuf(Gtk.EntryIconPosition.PRIMARY)) + + def test_apply_icon_only_entry(self): + invalid_types = [ 1, "One", u'1', {'1': "one"}, Gtk.ComboBox()] + + for invalid_type in invalid_types: + with self.assertRaises(TypeError): + self.apply_icon(invalid_type, "test-disp6") + + def test_apply_icon_only_existing(self): + new_object = Gtk.Entry() + + for name in [ "test-red1", "test-red2", "test-red3", + "test-target", "test-disp6" ]: + self.apply_icon(new_object, name) + + for name in [ "test-nonexistant", None, "", 1 ]: + with self.assertRaises(ValueError): + self.apply_icon(new_object, name) + +class GtkOneTimerHelperTest(GtkOneTimerHelper, GtkTestCase): + def __init__(self, *args, **kwargs): + GtkTestCase.__init__(self, *args, **kwargs) + + self._test_time = 0.1 + + GtkOneTimerHelper.__init__(self, self._test_time) + self._run_timers = [] + + def _timer_run(self, timer_id): + self._run_timers.append(timer_id) + + def test_nothing_runs_automatically(self): + self.flush_gtk_events(self._test_time*2) + self.assertEquals([], self._run_timers) + self.assertEquals(0, self._current_timer_id) + self.assertFalse(self._timer_has_completed()) + + def test_schedule_one_task(self): + self._timer_schedule() + self.flush_gtk_events(self._test_time*2) + self.assertEquals([1], self._run_timers) + self.assertEquals(1, self._current_timer_id) + self.assertTrue(self._timer_has_completed()) + + def test_invalidate_completed(self): + self._timer_schedule() + self.flush_gtk_events(self._test_time*2) + self.assertEquals([1], self._run_timers) + self.assertEquals(1, self._current_timer_id) + + self.assertTrue(self._timer_has_completed()) + self._invalidate_timer_completed() + self.assertFalse(self._timer_has_completed()) + + def test_schedule_and_cancel_one_task(self): + self._timer_schedule() + self._invalidate_current_timer() + self.flush_gtk_events(self._test_time*2) + self.assertEquals([], self._run_timers) + self.assertEquals(2, self._current_timer_id) + self.assertFalse(self._timer_has_completed()) + + def test_two_tasks(self): + self._timer_schedule() + self.flush_gtk_events(self._test_time/4) + self._timer_schedule() + self.flush_gtk_events(self._test_time*2) + self.assertEquals([2], self._run_timers) + self.assertEquals(2, self._current_timer_id) + self.assertTrue(self._timer_has_completed()) + + def test_more_tasks(self): + num = 0 + for num in range(1,10): + self._timer_schedule() + self.flush_gtk_events(self._test_time/4) + self.flush_gtk_events(self._test_time*1.75) + self.assertEquals([num], self._run_timers) + self.assertEquals(num, self._current_timer_id) + self.assertTrue(self._timer_has_completed()) + + def test_more_tasks_cancel(self): + num = 0 + for num in range(1,10): + self._timer_schedule() + self.flush_gtk_events(self._test_time/4) + self._invalidate_current_timer() + self.flush_gtk_events(self._test_time*1.75) + self.assertEquals([], self._run_timers) + self.assertEquals(num+1, self._current_timer_id) + self.assertFalse(self._timer_has_completed()) + + def test_subsequent_tasks(self): + self._timer_schedule() #1 + self.flush_gtk_events(self._test_time*2) + self.assertEquals([1], self._run_timers) + self.assertEquals(1, self._current_timer_id) + self.assertTrue(self._timer_has_completed()) + + self._timer_schedule() #2 + self.flush_gtk_events(self._test_time*2) + self.assertEquals([1,2], self._run_timers) + self.assertEquals(2, self._current_timer_id) + self.assertTrue(self._timer_has_completed()) + + self._invalidate_timer_completed() + self._timer_schedule() #3 + self._invalidate_current_timer() #4 + self.flush_gtk_events(self._test_time*2) + self.assertEquals([1,2], self._run_timers) + self.assertEquals(4, self._current_timer_id) + self.assertFalse(self._timer_has_completed()) + + self._timer_schedule() #5 + self.flush_gtk_events(self._test_time*2) + self.assertEquals([1,2,5], self._run_timers) + self.assertEquals(5, self._current_timer_id) + self.assertTrue(self._timer_has_completed()) + +class FocusStealingHelperMock(FocusStealingHelper): + def simulate_focus(self): + self._window_changed_focus(True) + +class FocusStealingHelperTest(FocusStealingHelperMock, GtkTestCase): + def __init__(self, *args, **kwargs): + GtkTestCase.__init__(self, *args, **kwargs) + + self._test_time = 0.1 + self._test_button = Gtk.Button() + self._test_window = Gtk.Window() + + FocusStealingHelperMock.__init__(self, self._test_window, + self._test_button, self._test_time) + + def test_nothing_runs_automatically(self): + self.assertFalse(self.can_perform_action()) + self.flush_gtk_events(self._test_time*2) + self.assertFalse(self.can_perform_action()) + self.assertFalse(self._test_button.get_sensitive()) + + def test_nothing_runs_automatically_with_request(self): + self.request_sensitivity(True) + self.assertFalse(self.can_perform_action()) + self.flush_gtk_events(self._test_time*2) + self.assertFalse(self.can_perform_action()) + self.assertFalse(self._test_button.get_sensitive()) + + def _simulate_focus(self, focused): + self._window_changed_focus(focused) + + def test_focus_with_request(self): + self.request_sensitivity(True) + self._simulate_focus(True) + self.flush_gtk_events(self._test_time*2) + self.assertTrue(self.can_perform_action()) + self.assertTrue(self._test_button.get_sensitive()) + + def test_focus_with_late_request(self): + self._simulate_focus(True) + self.flush_gtk_events(self._test_time*2) + self.assertTrue(self.can_perform_action()) + self.assertFalse(self._test_button.get_sensitive()) + + self.request_sensitivity(True) + self.assertTrue(self._test_button.get_sensitive()) + + def test_immediate_defocus(self): + self.request_sensitivity(True) + self._simulate_focus(True) + self._simulate_focus(False) + self.flush_gtk_events(self._test_time*2) + self.assertFalse(self.can_perform_action()) + self.assertFalse(self._test_button.get_sensitive()) + + def test_focus_then_unfocus(self): + self.request_sensitivity(True) + self._simulate_focus(True) + self.flush_gtk_events(self._test_time*2) + self.assertTrue(self.can_perform_action()) + self.assertTrue(self._test_button.get_sensitive()) + + self._simulate_focus(False) + self.assertFalse(self.can_perform_action()) + self.assertFalse(self._test_button.get_sensitive()) + + def test_focus_cycle(self): + self.request_sensitivity(True) + + self._simulate_focus(True) + self.flush_gtk_events(self._test_time*2) + self.assertTrue(self.can_perform_action()) + self.assertTrue(self._test_button.get_sensitive()) + + self._simulate_focus(False) + self.assertFalse(self.can_perform_action()) + self.assertFalse(self._test_button.get_sensitive()) + + self._simulate_focus(True) + self.assertFalse(self.can_perform_action()) + self.assertFalse(self._test_button.get_sensitive()) + + self.flush_gtk_events(self._test_time*2) + self.assertTrue(self.can_perform_action()) + self.assertTrue(self._test_button.get_sensitive()) + + self.request_sensitivity(False) + self.assertTrue(self.can_perform_action()) + self.assertFalse(self._test_button.get_sensitive()) + + self._simulate_focus(False) + self.assertFalse(self.can_perform_action()) + + self._simulate_focus(True) + self.assertFalse(self.can_perform_action()) + self.assertFalse(self._test_button.get_sensitive()) + + self.flush_gtk_events(self._test_time*2) + self.assertTrue(self.can_perform_action()) + self.assertFalse(self._test_button.get_sensitive()) + +if __name__=='__main__': + unittest.main() diff --git a/qubespolicy/tests/rpcconfirmation.py b/qubespolicy/tests/rpcconfirmation.py new file mode 100755 index 00000000..a3ec3d7b --- /dev/null +++ b/qubespolicy/tests/rpcconfirmation.py @@ -0,0 +1,334 @@ +#!/usr/bin/python +# +# The Qubes OS Project, https://www.qubes-os.org/ +# +# Copyright (C) 2017 boring-stuff +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +# + +import unittest, sys +from core.rpcconfirmation import RPCConfirmationWindow +from gtkhelpers import VMListModelerMock, GtkTestCase, FocusStealingHelperMock + +class MockRPCConfirmationWindow(RPCConfirmationWindow): + def _new_VM_list_modeler(self): + return VMListModelerMock() + + def _new_focus_stealing_helper(self): + return FocusStealingHelperMock( + self._rpc_window, + self._rpc_ok_button, + self._focus_stealing_seconds) + + def __init__(self, source, rpc_operation, + name_whitelist = VMListModelerMock.get_name_whitelist(), + target = None, focus_stealing_seconds = 1): + self._focus_stealing_seconds = focus_stealing_seconds + + RPCConfirmationWindow.__init__(self, source, rpc_operation, + name_whitelist, target) + + def is_error_visible(self): + return self._error_bar.get_visible() + + def get_shown_domains(self): + model = self._rpc_combo_box.get_model() + model_iter = model.get_iter_first() + domains = [] + + while model_iter != None: + domain_name = model.get_value(model_iter, 1) + + domains = domains + [domain_name] + + model_iter = model.iter_next(model_iter) + + return domains + +class RPCConfirmationWindowTestBase(MockRPCConfirmationWindow, GtkTestCase): + def __init__(self, test_method, source_name = "test-source", + rpc_operation = "test.Operation", + name_whitelist = VMListModelerMock.get_name_whitelist(), + target_name = None): + GtkTestCase.__init__(self, test_method) + self.test_source_name = source_name + self.test_rpc_operation = rpc_operation + self.test_target_name = target_name + + self._test_time = 0.1 + + self.test_called_close = False + self.test_called_show = False + + self.test_clicked_ok = False + self.test_clicked_cancel = False + + MockRPCConfirmationWindow.__init__(self, + self.test_source_name, + self.test_rpc_operation, + name_whitelist, + self.test_target_name, + focus_stealing_seconds = self._test_time) + + def _can_perform_action(self): + return True + + def _close(self): + self.test_called_close = True + + def _show(self): + self.test_called_show = True + + def _clicked_ok(self, button): + MockRPCConfirmationWindow._clicked_ok(self, button) + self.test_clicked_ok = True + + def _clicked_cancel(self, button): + MockRPCConfirmationWindow._clicked_cancel(self, button) + self.test_clicked_cancel = True + + def test_has_linked_the_fields(self): + self.assertIsNotNone(self._rpc_window) + self.assertIsNotNone(self._rpc_ok_button) + self.assertIsNotNone(self._rpc_cancel_button) + self.assertIsNotNone(self._rpc_label) + self.assertIsNotNone(self._source_entry) + self.assertIsNotNone(self._rpc_combo_box) + self.assertIsNotNone(self._error_bar) + self.assertIsNotNone(self._error_message) + + def test_is_showing_source(self): + self.assertTrue(self.test_source_name in + self._source_entry.get_text()) + + def test_is_showing_operation(self): + self.assertTrue(self.test_rpc_operation in + self._rpc_label.get_text()) + + def test_escape_and_format_rpc_text(self): + self.assertEquals("qubes.Test", + self._escape_and_format_rpc_text("qubes.Test")) + self.assertEquals("custom.Domain", + self._escape_and_format_rpc_text("custom.Domain")) + self.assertEquals("nodomain", + self._escape_and_format_rpc_text("nodomain")) + self.assertEquals("domain.Sub.Operation", + self._escape_and_format_rpc_text("domain.Sub.Operation")) + self.assertEquals("", + self._escape_and_format_rpc_text("")) + self.assertEquals(".", + self._escape_and_format_rpc_text(".")) + self.assertEquals("inject.<script>", + self._escape_and_format_rpc_text("inject.