Browse Source

Added dynamic X keyboard event monitoring to qvm_start_daemon.py

Update keyboard_layout property whenever guivm's layout changes, instead of
only at the start.

requires QubesOS/qubes-core-admin#350

references QubesOS/qubes-issues#1396
references QubesOS/qubes-issues#4294
Marta Marczykowska-Górecka 3 years ago
parent
commit
1446a6d7ee
3 changed files with 236 additions and 47 deletions
  1. 104 45
      qubesadmin/tools/qvm_start_daemon.py
  2. 128 0
      qubesadmin/tools/xcffibhelpers.py
  3. 4 2
      setup.py

+ 104 - 45
qubesadmin/tools/qvm_start_daemon.py

@@ -35,6 +35,7 @@ import qubesadmin
 import qubesadmin.exc
 import qubesadmin.tools
 import qubesadmin.vm
+from . import xcffibhelpers
 
 have_events = False
 try:
@@ -73,6 +74,106 @@ REGEX_OUTPUT = re.compile(r"""
         """)
 
 
+class KeyboardLayout:
+    """Class to store and parse X Keyboard layout data"""
+    # pylint: disable=too-few-public-methods
+    def __init__(self, binary_string):
+        split_string = binary_string.split(b'\0')
+        self.languages = split_string[2].decode().split(',')
+        self.variants = split_string[3].decode().split(',')
+        self.options = split_string[4].decode()
+
+    def get_property(self, layout_num):
+        """Return the selected keyboard layout as formatted for keyboard_layout
+        property."""
+        return '+'.join([self.languages[layout_num],
+                         self.variants[layout_num],
+                         self.options])
+
+
+class XWatcher:
+    """Watch and react for X events related to the keyboard layout changes."""
+    def __init__(self, conn, app):
+        self.app = app
+        self.current_vm = self.app.domains[self.app.local_name]
+
+        self.conn = conn
+        self.ext = self.initialize_extension()
+
+        # get root window
+        self.setup = self.conn.get_setup()
+        self.root = self.setup.roots[0].root
+
+        # atoms (strings) of events we need to watch
+        # keyboard layout was switched
+        self.atom_xklavier = self.conn.core.InternAtom(
+            False, len("XKLAVIER_ALLOW_SECONDARY"),
+            "XKLAVIER_ALLOW_SECONDARY").reply().atom
+        # keyboard layout was changed
+        self.atom_xkb_rules = self.conn.core.InternAtom(
+            False, len("_XKB_RULES_NAMES"),
+            "_XKB_RULES_NAMES").reply().atom
+
+        self.conn.core.ChangeWindowAttributesChecked(
+            self.root, xcffib.xproto.CW.EventMask,
+            [xcffib.xproto.EventMask.PropertyChange])
+        self.conn.flush()
+
+        # initialize state
+        self.keyboard_layout = KeyboardLayout(self.get_keyboard_layout())
+        self.selected_layout = self.get_selected_layout()
+
+    def initialize_extension(self):
+        """Initialize XKB extension (not supported by xcffib by default"""
+        ext = self.conn(xcffibhelpers.key)
+        ext.UseExtension()
+        return ext
+
+    def get_keyboard_layout(self):
+        """Check what is current keyboard layout definition"""
+        property_cookie = self.conn.core.GetProperty(
+            False,  # delete
+            self.root,  # window
+            self.atom_xkb_rules,
+            xcffib.xproto.Atom.STRING,
+            0, 1000
+        )
+        prop_reply = property_cookie.reply()
+        return prop_reply.value.buf()
+
+    def get_selected_layout(self):
+        """Check which keyboard layout is currently selected"""
+        state_reply = self.ext.GetState().reply()
+        return state_reply.lockedGroup[0]
+
+    def update_keyboard_layout(self):
+        """Update current vm's keyboard_layout property"""
+        new_property = self.keyboard_layout.get_property(
+            self.selected_layout)
+
+        current_property = self.current_vm.keyboard_layout
+
+        if new_property != current_property:
+            self.current_vm.keyboard_layout = new_property
+
+    def event_reader(self, callback):
+        """Poll for X events related to keyboard layout"""
+        try:
+            for event in iter(self.conn.poll_for_event, None):
+                if isinstance(event, xcffib.xproto.PropertyNotifyEvent):
+                    if event.atom == self.atom_xklavier:
+                        self.selected_layout = self.get_selected_layout()
+                    elif event.atom == self.atom_xkb_rules:
+                        self.keyboard_layout = KeyboardLayout(
+                            self.get_keyboard_layout())
+                    else:
+                        continue
+
+                    self.update_keyboard_layout()
+        except xcffib.ConnectionException:
+            callback()
+
+
 def get_monitor_layout():
     """Get list of monitors and their size/position"""
     outputs = []
@@ -114,31 +215,6 @@ def get_monitor_layout():
     return outputs
 
 
-def set_keyboard_layout(vm):
-    """Set layout configuration into features for Gui admin extension"""
-    try:
-        # Examples of 'xprop -root _XKB_RULES_NAMES' output values:
-        # "evdev", "pc105", "fr", "oss", ""
-        # "evdev", "pc105", "pl,us", ",", "grp:win_switch,compose:caps"
-
-        # We use the first layout provided
-        xkb_re = r'_XKB_RULES_NAMES\(STRING\) = ' \
-                 r'\"(.*)\", \"(.*)\", \"(.*)\", \"(.*)\", \"(.*)\"\n'
-        xkb_rules_names = subprocess.check_output(
-            ['xprop', '-root', '_XKB_RULES_NAMES']).decode()
-        xkb_parsed = re.match(xkb_re, xkb_rules_names)
-        if xkb_parsed:
-            xkb_layout = [x.split(',')[0] for x in xkb_parsed.groups()[2:4]]
-            # We keep all options
-            xkb_layout.append(xkb_parsed.group(5))
-            keyboard_layout = '+'.join(xkb_layout)
-            vm.features['keyboard-layout'] = keyboard_layout
-        else:
-            vm.log.warning('Failed to parse layout for %s', vm)
-    except subprocess.CalledProcessError as e:
-        vm.log.warning('Failed to set layout for %s: %s', vm, str(e))
-
-
 class DAEMONLauncher:
     """Launch GUI/AUDIO daemon for VMs"""
 
@@ -461,17 +537,6 @@ class DAEMONLauncher:
         events.add_handler('connection-established',
                            self.on_connection_established)
 
-
-def x_reader(conn, callback):
-    """Try reading something from X connection to check if it's still alive.
-    In case it isn't, call *callback*.
-    """
-    try:
-        conn.poll_for_event()
-    except xcffib.ConnectionException:
-        callback()
-
-
 if 'XDG_RUNTIME_DIR' in os.environ:
     pidfile_path = os.path.join(os.environ['XDG_RUNTIME_DIR'],
                                 'qvm-start-daemon.pid')
@@ -492,9 +557,6 @@ parser.add_argument('--pidfile', action='store', default=pidfile_path,
 parser.add_argument('--notify-monitor-layout', action='store_true',
                     help='Notify running instance in --watch mode'
                          ' about changed monitor layout')
-parser.add_argument('--set-keyboard-layout', action='store_true',
-                    help='Set keyboard layout values into GuiVM features.'
-                         'This option is implied by --watch')
 # Add it for the help only
 parser.add_argument('--force', action='store_true', default=False,
                     help='Force running daemon without enabled services'
@@ -515,11 +577,6 @@ def main(args=None):
         parser.error('--watch option must be used with --all')
     if args.watch and args.notify_monitor_layout:
         parser.error('--watch cannot be used with --notify-monitor-layout')
-    if args.watch and 'guivm-gui-agent' in enabled_services:
-        args.set_keyboard_layout = True
-    if args.set_keyboard_layout or os.path.exists('/etc/qubes-release'):
-        guivm = args.app.domains.get_blind(args.app.local_name)
-        set_keyboard_layout(guivm)
     launcher = DAEMONLauncher(args.app)
     if args.watch:
         if not have_events:
@@ -541,8 +598,10 @@ def main(args=None):
                                     launcher.send_monitor_layout_all)
 
             conn = xcffib.connect()
+            x_watcher = XWatcher(conn, args.app)
             x_fd = conn.get_file_descriptor()
-            loop.add_reader(x_fd, x_reader, conn, events_listener.cancel)
+            loop.add_reader(x_fd, x_watcher.event_reader,
+                            events_listener.cancel)
 
             try:
                 loop.run_until_complete(events_listener)

+ 128 - 0
qubesadmin/tools/xcffibhelpers.py

@@ -0,0 +1,128 @@
+# -*- encoding: utf8 -*-
+#
+# The Qubes OS Project, http://www.qubes-os.org
+#
+# Copyright (C) 2020 Marek Marczykowski-Górecki
+#                               <marmarek@invisiblethingslab.com>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU Lesser General Public License as published by
+# the Free Software Foundation; either version 2.1 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public License along
+# with this program; if not, see <http://www.gnu.org/licenses/>.
+"""
+This is a set of helper classes, designed to facilitate importing an X extension
+that's not supported by default by xcffib.
+"""
+import io
+import struct
+import xcffib
+
+
+
+class XkbUseExtensionReply(xcffib.Reply):
+    """Helper class to parse XkbUseExtensionReply
+    Contains hardcoded values based on X11/XKBproto.h"""
+    # pylint: disable=too-few-public-methods
+    def __init__(self, unpacker):
+        if isinstance(unpacker, xcffib.Protobj):
+            unpacker = xcffib.MemoryUnpacker(unpacker.pack())
+        xcffib.Reply.__init__(self, unpacker)
+        base = unpacker.offset
+        self.major_version, self.minor_version = unpacker.unpack(
+            "xx2x4xHH4x4x4x4x")
+        self.bufsize = unpacker.offset - base
+
+
+class XkbUseExtensionCookie(xcffib.Cookie):
+    """Helper class for use in loading Xkb extension"""
+    reply_type = XkbUseExtensionReply
+
+
+class XkbGetStateReply(xcffib.Reply):
+    """Helper class to parse XkbGetState; copy&paste from X11/XKBproto.h"""
+    # pylint: disable=too-few-public-methods
+    _typedef = """
+        BYTE    type;
+        BYTE    deviceID;
+        CARD16  sequenceNumber B16;
+        CARD32  length B32;
+        CARD8   mods;
+        CARD8   baseMods;
+        CARD8   latchedMods;
+        CARD8   lockedMods;
+        CARD8   group;
+        CARD8   lockedGroup;
+        INT16   baseGroup B16;
+        INT16   latchedGroup B16;
+        CARD8   compatState;
+        CARD8   grabMods;
+        CARD8   compatGrabMods;
+        CARD8   lookupMods;
+        CARD8   compatLookupMods;
+        CARD8   pad1;
+        CARD16  ptrBtnState B16;
+        CARD16  pad2 B16;
+        CARD32  pad3 B32;"""
+    _type_mapping = {
+        "BYTE": "B",
+        "CARD16": "H",
+        "CARD8": "B",
+        "CARD32": "I",
+        "INT16": "h",
+    }
+
+    def __init__(self, unpacker):
+        if isinstance(unpacker, xcffib.Protobj):
+            unpacker = xcffib.MemoryUnpacker(unpacker.pack())
+        xcffib.Reply.__init__(self, unpacker)
+        base = unpacker.offset
+
+        # dynamic parse of copy&pasted struct content, for easy re-usability
+        for line in self._typedef.splitlines():
+            line = line.strip()
+            line = line.rstrip(';')
+            if not line:
+                continue
+            typename, name = line.split()[:2]  # ignore optional third part
+            setattr(self, name, unpacker.unpack(self._type_mapping[typename]))
+
+        self.bufsize = unpacker.offset - base
+
+
+class XkbGetStateCookie(xcffib.Cookie):
+    """Helper class for use in parsing Xkb GetState"""
+    reply_type = XkbGetStateReply
+
+
+class XkbExtension(xcffib.Extension):
+    """Helper class to load and use Xkb xcffib extension; needed
+    because there is not XKB support in xcffib."""
+    # pylint: disable=invalid-name,missing-function-docstring
+    def UseExtension(self, is_checked=True):
+        buf = io.BytesIO()
+        buf.write(struct.pack("=xx2xHH", 1, 0))
+        return self.send_request(0, buf, XkbGetStateCookie,
+                                 is_checked=is_checked)
+
+    def GetState(self, deviceSpec=0x100, is_checked=True):
+        buf = io.BytesIO()
+        buf.write(struct.pack("=xx2xHxx", deviceSpec))
+        return self.send_request(4, buf, XkbGetStateCookie,
+                                 is_checked=is_checked)
+
+
+key = xcffib.ExtensionKey("XKEYBOARD")
+# this is a lie: there are events and errors types
+_events = {}
+_errors = {}
+
+# pylint: disable=protected-access
+xcffib._add_ext(key, XkbExtension, _events, _errors)

+ 4 - 2
setup.py

@@ -17,9 +17,11 @@ def get_console_scripts():
     if sys.version_info[0:2] >= (3, 4):
         for filename in os.listdir('./qubesadmin/tools'):
             basename, ext = os.path.splitext(os.path.basename(filename))
-            if basename in ['__init__', 'dochelpers'] or ext != '.py':
+            if basename in ['__init__', 'dochelpers', 'xcffibhelpers']\
+                    or ext != '.py':
                 continue
-            yield basename.replace('_', '-'), 'qubesadmin.tools.{}'.format(basename)
+            yield basename.replace('_', '-'), 'qubesadmin.tools.{}'.format(
+                basename)
 
 # create simple scripts that run much faster than "console entry points"
 class CustomInstall(setuptools.command.install.install):