parent
b3ceb2d7fa
commit
ab1bd77b45
@ -19,14 +19,16 @@
|
||||
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
|
||||
#
|
||||
|
||||
import qubes
|
||||
import gi, os
|
||||
import gi
|
||||
import os
|
||||
gi.require_version('Gtk', '3.0')
|
||||
from gi.repository import Gtk, Gdk, GdkPixbuf, GObject, GLib
|
||||
from . qubesutils import sanitize_domain_name
|
||||
import qubes
|
||||
from qubespolicy.utils import sanitize_domain_name
|
||||
|
||||
glade_directory = os.path.join(os.path.dirname(__file__), "glade")
|
||||
|
||||
|
||||
class GtkIconGetter:
|
||||
def __init__(self, size):
|
||||
self._icons = {}
|
||||
@ -44,6 +46,7 @@ class GtkIconGetter:
|
||||
|
||||
return self._icons[name]
|
||||
|
||||
|
||||
class VMListModeler:
|
||||
def __init__(self):
|
||||
self._icon_getter = GtkIconGetter(16)
|
||||
@ -126,7 +129,7 @@ class VMListModeler:
|
||||
list_store = Gtk.ListStore(int, str, GdkPixbuf.Pixbuf)
|
||||
|
||||
exclusions = []
|
||||
for vm_name in sorted(self._entries.iterkeys()):
|
||||
for vm_name in sorted(self._entries.keys()):
|
||||
entry = self._entries[vm_name]
|
||||
|
||||
matches = True
|
||||
@ -223,6 +226,7 @@ class VMListModeler:
|
||||
def matches(self, vm):
|
||||
return vm.name in self._allowed_names_list
|
||||
|
||||
|
||||
class GtkOneTimerHelper:
|
||||
def __init__(self, wait_seconds):
|
||||
self._wait_seconds = wait_seconds
|
||||
@ -254,6 +258,7 @@ class GtkOneTimerHelper:
|
||||
def _timer_has_completed(self):
|
||||
return self._timer_completed
|
||||
|
||||
|
||||
class FocusStealingHelper(GtkOneTimerHelper):
|
||||
def __init__(self, window, target_button, wait_seconds=1):
|
||||
GtkOneTimerHelper.__init__(self, wait_seconds)
|
||||
@ -298,4 +303,3 @@ class FocusStealingHelper(GtkOneTimerHelper):
|
||||
|
||||
def can_perform_action(self):
|
||||
return self._timer_has_completed()
|
||||
|
||||
|
@ -19,12 +19,16 @@
|
||||
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
|
||||
#
|
||||
|
||||
from . gtkhelpers import VMListModeler, FocusStealingHelper, glade_directory
|
||||
from . qubesutils import sanitize_domain_name, sanitize_service_name
|
||||
|
||||
from gi.repository import Gtk, Gdk, GLib
|
||||
import os
|
||||
from qubespolicy.gtkhelpers import VMListModeler, FocusStealingHelper, \
|
||||
glade_directory
|
||||
from qubespolicy.utils import sanitize_domain_name, \
|
||||
sanitize_service_name
|
||||
|
||||
class RPCConfirmationWindow():
|
||||
|
||||
class RPCConfirmationWindow:
|
||||
_source_file = os.path.join(glade_directory, "RPCConfirmationWindow.glade")
|
||||
_source_id = {'window': "RPCConfirmationWindow",
|
||||
'ok': "okButton",
|
||||
@ -37,7 +41,7 @@ class RPCConfirmationWindow():
|
||||
}
|
||||
|
||||
def _clicked_ok(self, source):
|
||||
assert source != None, \
|
||||
assert source is not None, \
|
||||
'Called the clicked ok callback from no source object'
|
||||
|
||||
if self._can_perform_action():
|
||||
@ -62,7 +66,7 @@ class RPCConfirmationWindow():
|
||||
self._close()
|
||||
|
||||
def _update_ok_button_sensitivity(self, data):
|
||||
valid = (data != None)
|
||||
valid = (data is not None)
|
||||
|
||||
if valid:
|
||||
(self._target_qid, self._target_name) = data
|
||||
@ -79,12 +83,13 @@ class RPCConfirmationWindow():
|
||||
def _close_error(self, error_bar, response):
|
||||
assert error_bar == self._error_bar, \
|
||||
'Closed the error bar with the wrong error bar as parameter'
|
||||
assert response != None, 'Closed the error bar with None as a response'
|
||||
assert response is not None, \
|
||||
'Closed the error bar with None as a response'
|
||||
|
||||
self._error_bar.set_visible(False)
|
||||
|
||||
def _set_initial_target(self, source, target):
|
||||
if target != None:
|
||||
if target is not None:
|
||||
if target == source:
|
||||
self._show_error(
|
||||
"Source and target domains must not be the same.")
|
||||
@ -202,6 +207,7 @@ class RPCConfirmationWindow():
|
||||
else:
|
||||
return False
|
||||
|
||||
|
||||
def confirm_rpc(source, rpc_operation, name_whitelist, target=None):
|
||||
window = RPCConfirmationWindow(source, rpc_operation, name_whitelist,
|
||||
target)
|
||||
|
@ -19,27 +19,36 @@
|
||||
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
|
||||
#
|
||||
|
||||
import unittest, time
|
||||
from core.gtkhelpers import VMListModeler, GtkOneTimerHelper, FocusStealingHelper, Gtk
|
||||
import time
|
||||
import unittest
|
||||
|
||||
from gi.repository import Gtk
|
||||
|
||||
from qubespolicy.gtkhelpers import VMListModeler, GtkOneTimerHelper, \
|
||||
FocusStealingHelper
|
||||
|
||||
|
||||
class VMListModelerMock(VMListModeler):
|
||||
def __init__(self):
|
||||
VMListModeler.__init__(self)
|
||||
|
||||
def _get_list(self):
|
||||
return [ MockVm(0, "dom0", "black"),
|
||||
return [
|
||||
MockVm(0, "dom0", "black"),
|
||||
MockVm(2, "test-red1", "red"),
|
||||
MockVm(4, "test-red2", "red"),
|
||||
MockVm(7, "test-red3", "red"),
|
||||
MockVm(8, "test-source", "green"),
|
||||
MockVm(10, "test-target", "orange"),
|
||||
MockVm(15, "test-disp6", "red", True) ]
|
||||
MockVm(15, "test-disp6", "red", True)
|
||||
]
|
||||
|
||||
@staticmethod
|
||||
def get_name_whitelist():
|
||||
return ["test-red1", "test-red2", "test-red3",
|
||||
"test-target", "test-disp6"]
|
||||
|
||||
|
||||
class MockVmLabel:
|
||||
def __init__(self, index, color, name, dispvm=False):
|
||||
self.index = index
|
||||
@ -48,12 +57,14 @@ class MockVmLabel:
|
||||
self.dispvm = dispvm
|
||||
self.icon = "gnome-foot"
|
||||
|
||||
|
||||
class MockVm:
|
||||
def __init__(self, qid, name, color, dispvm=False):
|
||||
self.qid = qid
|
||||
self.name = name
|
||||
self.label = MockVmLabel(qid, 0x000000, color, dispvm)
|
||||
|
||||
|
||||
class MockComboEntry:
|
||||
def __init__(self, text):
|
||||
self._text = text
|
||||
@ -64,6 +75,7 @@ class MockComboEntry:
|
||||
def get_text(self):
|
||||
return self._text
|
||||
|
||||
|
||||
class GtkTestCase(unittest.TestCase):
|
||||
def __init__(self, *args, **kwargs):
|
||||
unittest.TestCase.__init__(self, *args, **kwargs)
|
||||
@ -86,10 +98,11 @@ class GtkTestCase(unittest.TestCase):
|
||||
time_length = time.time() - start
|
||||
remaining_wait = wait_seconds - time_length
|
||||
|
||||
if (remaining_wait > 0):
|
||||
if remaining_wait > 0:
|
||||
time.sleep(self._smallest_wait)
|
||||
|
||||
return (iterations, time_length)
|
||||
return iterations, time_length
|
||||
|
||||
|
||||
class VMListModelerTest(VMListModelerMock, unittest.TestCase):
|
||||
def __init__(self, *args, **kwargs):
|
||||
@ -163,19 +176,19 @@ class VMListModelerTest(VMListModelerMock, unittest.TestCase):
|
||||
self.apply_model(combo)
|
||||
self.assertEquals(7, len(combo.get_model()))
|
||||
|
||||
self.apply_model(combo, [ VMListModeler.NameBlacklistFilter([
|
||||
self._entries.keys()[0] ]) ])
|
||||
self.apply_model(combo, [
|
||||
VMListModeler.NameBlacklistFilter([self._entries.keys()[0]])])
|
||||
self.assertEquals(6, len(combo.get_model()))
|
||||
|
||||
self.apply_model(combo, [ VMListModeler.NameBlacklistFilter([
|
||||
self._entries.keys()[0] ]),
|
||||
VMListModeler.NameBlacklistFilter([
|
||||
self._entries.keys()[1] ]) ])
|
||||
self.apply_model(combo, [
|
||||
VMListModeler.NameBlacklistFilter([self._entries.keys()[0]]),
|
||||
VMListModeler.NameBlacklistFilter([self._entries.keys()[1]])])
|
||||
self.assertEquals(5, len(combo.get_model()))
|
||||
|
||||
self.apply_model(combo, [VMListModeler.NameBlacklistFilter([
|
||||
self._entries.keys()[0],
|
||||
self._entries.keys()[1] ]) ])
|
||||
self._entries.keys()[1]
|
||||
])])
|
||||
self.assertEquals(5, len(combo.get_model()))
|
||||
|
||||
def test_apply_model_whitelist(self):
|
||||
@ -184,8 +197,8 @@ class VMListModelerTest(VMListModelerMock, unittest.TestCase):
|
||||
self.apply_model(combo)
|
||||
self.assertEquals(7, len(combo.get_model()))
|
||||
|
||||
self.apply_model(combo, [ VMListModeler.NameWhitelistFilter([
|
||||
self._entries.keys()[0] ]) ])
|
||||
self.apply_model(combo, [
|
||||
VMListModeler.NameWhitelistFilter([self._entries.keys()[0]])])
|
||||
self.assertEquals(1, len(combo.get_model()))
|
||||
|
||||
self.apply_model(combo, [VMListModeler.NameWhitelistFilter([
|
||||
@ -239,6 +252,7 @@ class VMListModelerTest(VMListModelerMock, unittest.TestCase):
|
||||
with self.assertRaises(ValueError):
|
||||
self.apply_icon(new_object, name)
|
||||
|
||||
|
||||
class GtkOneTimerHelperTest(GtkOneTimerHelper, GtkTestCase):
|
||||
def __init__(self, *args, **kwargs):
|
||||
GtkTestCase.__init__(self, *args, **kwargs)
|
||||
@ -307,7 +321,7 @@ class GtkOneTimerHelperTest(GtkOneTimerHelper, GtkTestCase):
|
||||
self._timer_schedule()
|
||||
self.flush_gtk_events(self._test_time/4)
|
||||
self._invalidate_current_timer()
|
||||
self.flush_gtk_events(self._test_time*1.75)
|
||||
self.flush_gtk_events(int(self._test_time*1.75))
|
||||
self.assertEquals([], self._run_timers)
|
||||
self.assertEquals(num+1, self._current_timer_id)
|
||||
self.assertFalse(self._timer_has_completed())
|
||||
@ -339,10 +353,12 @@ class GtkOneTimerHelperTest(GtkOneTimerHelper, GtkTestCase):
|
||||
self.assertEquals(5, self._current_timer_id)
|
||||
self.assertTrue(self._timer_has_completed())
|
||||
|
||||
|
||||
class FocusStealingHelperMock(FocusStealingHelper):
|
||||
def simulate_focus(self):
|
||||
self._window_changed_focus(True)
|
||||
|
||||
|
||||
class FocusStealingHelperTest(FocusStealingHelperMock, GtkTestCase):
|
||||
def __init__(self, *args, **kwargs):
|
||||
GtkTestCase.__init__(self, *args, **kwargs)
|
||||
|
@ -19,9 +19,14 @@
|
||||
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
|
||||
#
|
||||
|
||||
import unittest, sys
|
||||
from core.rpcconfirmation import RPCConfirmationWindow
|
||||
from gtkhelpers import VMListModelerMock, GtkTestCase, FocusStealingHelperMock
|
||||
import sys
|
||||
import unittest
|
||||
|
||||
from qubespolicy.tests.gtkhelpers import VMListModelerMock, GtkTestCase, \
|
||||
FocusStealingHelperMock
|
||||
|
||||
from qubespolicy.rpcconfirmation import RPCConfirmationWindow
|
||||
|
||||
|
||||
class MockRPCConfirmationWindow(RPCConfirmationWindow):
|
||||
def _new_VM_list_modeler(self):
|
||||
@ -49,15 +54,16 @@ class MockRPCConfirmationWindow(RPCConfirmationWindow):
|
||||
model_iter = model.get_iter_first()
|
||||
domains = []
|
||||
|
||||
while model_iter != None:
|
||||
while model_iter is not None:
|
||||
domain_name = model.get_value(model_iter, 1)
|
||||
|
||||
domains = domains + [domain_name]
|
||||
domains += [domain_name]
|
||||
|
||||
model_iter = model.iter_next(model_iter)
|
||||
|
||||
return domains
|
||||
|
||||
|
||||
class RPCConfirmationWindowTestBase(MockRPCConfirmationWindow, GtkTestCase):
|
||||
def __init__(self, test_method, source_name="test-source",
|
||||
rpc_operation="test.Operation",
|
||||
@ -111,12 +117,10 @@ class RPCConfirmationWindowTestBase(MockRPCConfirmationWindow, GtkTestCase):
|
||||
self.assertIsNotNone(self._error_message)
|
||||
|
||||
def test_is_showing_source(self):
|
||||
self.assertTrue(self.test_source_name in
|
||||
self._source_entry.get_text())
|
||||
self.assertTrue(self.test_source_name in self._source_entry.get_text())
|
||||
|
||||
def test_is_showing_operation(self):
|
||||
self.assertTrue(self.test_rpc_operation in
|
||||
self._rpc_label.get_text())
|
||||
self.assertTrue(self.test_rpc_operation in self._rpc_label.get_text())
|
||||
|
||||
def test_escape_and_format_rpc_text(self):
|
||||
self.assertEquals("qubes.<b>Test</b>",
|
||||
@ -198,7 +202,7 @@ class RPCConfirmationWindowTestBase(MockRPCConfirmationWindow, GtkTestCase):
|
||||
try:
|
||||
# We expect the call to exit immediately, since no window is opened
|
||||
self.confirm_rpc()
|
||||
except BaseException:
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
self.assertFalse(self.test_called_close)
|
||||
@ -234,6 +238,7 @@ class RPCConfirmationWindowTestBase(MockRPCConfirmationWindow, GtkTestCase):
|
||||
else:
|
||||
self.assertFalse(self._focus_helper.can_perform_action())
|
||||
|
||||
|
||||
class RPCConfirmationWindowTestWithTarget(RPCConfirmationWindowTestBase):
|
||||
def __init__(self, test_method):
|
||||
RPCConfirmationWindowTestBase.__init__(self, test_method,
|
||||
@ -262,6 +267,7 @@ class RPCConfirmationWindowTestWithTarget(RPCConfirmationWindowTestBase):
|
||||
self.assertIsNotNone(self._target_qid)
|
||||
self.assertIsNotNone(self._target_name)
|
||||
|
||||
|
||||
class RPCConfirmationWindowTestWithTargetInvalid(unittest.TestCase):
|
||||
def __init__(self, *args, **kwargs):
|
||||
unittest.TestCase.__init__(self, *args, **kwargs)
|
||||
@ -280,6 +286,7 @@ class RPCConfirmationWindowTestWithTargetInvalid(unittest.TestCase):
|
||||
target=target)
|
||||
self.assertEquals(expect, rpcWindow.is_error_visible())
|
||||
|
||||
|
||||
class RPCConfirmationWindowTestWhitelist(unittest.TestCase):
|
||||
def __init__(self, *args, **kwargs):
|
||||
unittest.TestCase.__init__(self, *args, **kwargs)
|
||||
@ -292,20 +299,21 @@ class RPCConfirmationWindowTestWhitelist(unittest.TestCase):
|
||||
["test-red1", "test-red2", "test-red3"])
|
||||
|
||||
def test_all_red_domains_plus_nonexistent(self):
|
||||
self._assert_whitelist(["test-red1", "test-red2", "test-red3",
|
||||
self._assert_whitelist(
|
||||
["test-red1", "test-red2", "test-red3",
|
||||
"test-blue1", "test-blue2", "test-blue3"],
|
||||
["test-red1", "test-red2", "test-red3"])
|
||||
|
||||
def test_all_allowed_domains(self):
|
||||
self._assert_whitelist(["test-red1", "test-red2", "test-red3",
|
||||
"test-target", "test-disp6", "test-source",
|
||||
"dom0"], ["test-red1", "test-red2", "test-red3",
|
||||
"test-target", "test-disp6", "test-source",
|
||||
"dom0"])
|
||||
self._assert_whitelist(
|
||||
["test-red1", "test-red2", "test-red3",
|
||||
"test-target", "test-disp6", "test-source", "dom0"],
|
||||
["test-red1", "test-red2", "test-red3",
|
||||
"test-target", "test-disp6", "test-source", "dom0"])
|
||||
|
||||
def _assert_whitelist(self, whitelist, expected):
|
||||
rpcWindow = MockRPCConfirmationWindow("test-source", "test.Operation",
|
||||
whitelist)
|
||||
rpcWindow = MockRPCConfirmationWindow(
|
||||
"test-source", "test.Operation", whitelist)
|
||||
|
||||
domains = rpcWindow.get_shown_domains()
|
||||
|
||||
@ -323,12 +331,12 @@ if __name__=='__main__':
|
||||
elif sys.argv[1] == '-w':
|
||||
window = True
|
||||
else:
|
||||
print "Usage: " + __file__ + " [-t|-w]"
|
||||
print("Usage: " + __file__ + " [-t|-w]")
|
||||
|
||||
if window:
|
||||
print MockRPCConfirmationWindow("test-source",
|
||||
print(MockRPCConfirmationWindow("test-source",
|
||||
"qubes.Filecopy",
|
||||
VMListModelerMock.get_name_whitelist(),
|
||||
"test-red1").confirm_rpc()
|
||||
"test-red1").confirm_rpc())
|
||||
elif test:
|
||||
unittest.main(argv=[sys.argv[0]])
|
||||
|
@ -21,25 +21,26 @@
|
||||
def _sanitize_char(input_char, extra_allowed_characters):
|
||||
input_char_ord = ord(input_char)
|
||||
|
||||
if (input_char_ord >= ord('a') and input_char_ord <= ord('z')) \
|
||||
or (input_char_ord >= ord('A') and input_char_ord <= ord('Z')) \
|
||||
or (input_char_ord >= ord('0') and input_char_ord <= ord('9')) \
|
||||
if (ord('a') <= input_char_ord <= ord('z')) \
|
||||
or (ord('A') <= input_char_ord <= ord('Z')) \
|
||||
or (ord('0') <= input_char_ord <= ord('9')) \
|
||||
or (input_char in ['$', '_', '-', '.']) \
|
||||
or (extra_allowed_characters != None
|
||||
and input_char in extra_allowed_characters):
|
||||
or (input_char in extra_allowed_characters):
|
||||
result = input_char
|
||||
else:
|
||||
result = '_'
|
||||
|
||||
return result
|
||||
|
||||
|
||||
# This function needs to be synchronized with qrexec-daemon.c's sanitize_name()
|
||||
# from the qubes-core-admin-linux repository.
|
||||
#
|
||||
# See https://github.com/QubesOS/qubes-core-admin-linux/blob/4f0878ccbf8a95f8264b54d2b6f4dc433ca0793a/qrexec/qrexec-daemon.c#L627-L646
|
||||
# See https://github.com/QubesOS/qubes-core-admin-linux/blob/
|
||||
# 4f0878ccbf8a95f8264b54d2b6f4dc433ca0793a/qrexec/qrexec-daemon.c#L627-L646
|
||||
#
|
||||
def _sanitize_name(input_string, extra_allowed_characters, assert_sanitized):
|
||||
result = ''.join(_sanitize_char(character, extra_allowed_characters) \
|
||||
result = ''.join(_sanitize_char(character, extra_allowed_characters)
|
||||
for character in input_string)
|
||||
|
||||
if assert_sanitized:
|
||||
@ -48,8 +49,10 @@ def _sanitize_name(input_string, extra_allowed_characters, assert_sanitized):
|
||||
else:
|
||||
return result
|
||||
|
||||
|
||||
def sanitize_domain_name(input_string, assert_sanitized=False):
|
||||
return _sanitize_name(input_string, None, assert_sanitized)
|
||||
return _sanitize_name(input_string, {}, assert_sanitized)
|
||||
|
||||
|
||||
def sanitize_service_name(input_string, assert_sanitized=False):
|
||||
return _sanitize_name(input_string, {'+'}, assert_sanitized)
|
||||
|
Loading…
Reference in New Issue
Block a user