瀏覽代碼

api/admin: (ext/admin) limit listing VMs based on qrexec policy

Various Admin API calls, when directed at dom0, retrieve global system
view instead of a specific VM. This applies to admin.vm.List (called at
dom0 retrieve full VM list) and admin.Events (called at dom0 listen for
events of all the VMs). This makes it tricky to configure a management
VM with access to limited set of VMs only, because many tools require
ability to list VMs, and that would return full list.

Fix this issue by adding a filter to admin.vm.List and admin.Events
calls (using event handlers in AdminExtension) that filters the output
using qrexec policy. This version evaluates policy for each VM or event
(but loads only once). If the performance will be an issue, it can be
optimized later.

Fixes QubesOS/qubes-issues#5509
Marek Marczykowski-Górecki 4 年之前
父節點
當前提交
3f96c72ee3
共有 2 個文件被更改,包括 160 次插入0 次删除
  1. 101 0
      qubes/ext/admin.py
  2. 59 0
      qubes/tests/api_admin.py

+ 101 - 0
qubes/ext/admin.py

@@ -18,10 +18,28 @@
 # License along with this library; if not, see <https://www.gnu.org/licenses/>.
 
 import qubes.api
+import qubes.api.internal
 import qubes.ext
 import qubes.vm.adminvm
+from qrexec.policy import utils, parser
+
+
+class JustEvaluateAskResolution(parser.AskResolution):
+    def execute(self, caller_ident):
+        pass
+
+
+class JustEvaluateAllowResolution(parser.AllowResolution):
+    def execute(self, caller_ident):
+        pass
+
 
 class AdminExtension(qubes.ext.Extension):
+    def __init__(self):
+        super(AdminExtension, self).__init__()
+        self.policy_cache = utils.PolicyCache(lazy_load=True)
+        self.policy_cache.initialize_watcher()
+
     # pylint: disable=too-few-public-methods
     @qubes.ext.handler(
         'admin-permission:admin.vm.tag.Set',
@@ -36,3 +54,86 @@ class AdminExtension(qubes.ext.Extension):
                     __name__, type(self).__name__))
 
     # TODO create that tag here (need to figure out how to pass mgmtvm name)
+
+    @qubes.ext.handler('admin-permission:admin.vm.List')
+    def admin_vm_list(self, vm, event, arg, **kwargs):
+        '''When called with target 'dom0' (aka "get full list"), exclude domains
+           that the caller don't have permission to list
+        '''
+        # pylint: disable=unused-argument
+
+        if vm.klass == 'AdminVM':
+            # dom0 can always list everything
+            return None
+
+        policy = self.policy_cache.get_policy()
+        system_info = qubes.api.internal.get_system_info(vm.app)
+
+        def filter_vms(dest_vm):
+            request = parser.Request(
+                'admin.vm.List',
+                '+' + arg,
+                vm.name,
+                dest_vm.name,
+                system_info=system_info,
+                ask_resolution_type=JustEvaluateAskResolution,
+                allow_resolution_type=JustEvaluateAllowResolution)
+            try:
+                resolution = policy.evaluate(request)
+                # do not consider 'ask' as allow here,
+                # this needs to be not interactive
+                return isinstance(resolution, parser.AllowResolution)
+            except parser.AccessDenied:
+                return False
+
+        return (filter_vms,)
+
+    @qubes.ext.handler('admin-permission:admin.Events')
+    def admin_events(self, vm, event, arg, **kwargs):
+        '''When called with target 'dom0' (aka "get all events"),
+           exclude domains that the caller don't have permission to receive
+           events about
+        '''
+        # pylint: disable=unused-argument
+
+        if vm.klass == 'AdminVM':
+            # dom0 can always list everything
+            return None
+
+        def filter_events(event):
+            subject, event, kwargs = event
+            try:
+                dest = subject.name
+            except AttributeError:
+                # domain-add and similar events fired on the Qubes() object
+                if 'vm' in kwargs:
+                    dest = kwargs['vm'].name
+                else:
+                    dest = '@adminvm'
+
+            policy = self.policy_cache.get_policy()
+            # TODO: cache system_info (based on last qubes.xml write time?)
+            system_info = qubes.api.internal.get_system_info(vm.app)
+            request = parser.Request(
+                'admin.Events',
+                '+' + event.replace(':', '_'),
+                vm.name,
+                dest,
+                system_info=system_info,
+                ask_resolution_type=JustEvaluateAskResolution,
+                allow_resolution_type=JustEvaluateAllowResolution)
+            try:
+                resolution = policy.evaluate(request)
+                # do not consider 'ask' as allow here,
+                # this needs to be not interactive
+                return isinstance(resolution, parser.AllowResolution)
+            except parser.AccessDenied:
+                return False
+
+        return (filter_events,)
+
+    @qubes.ext.handler('qubes-close', system=True)
+    def on_qubes_close(self, app, event, **kwargs):
+        """Unregister policy file watches on app.close()."""
+        # pylint: disable=unused-argument
+        self.policy_cache.cleanup()

+ 59 - 0
qubes/tests/api_admin.py

@@ -30,6 +30,8 @@ import unittest.mock
 import libvirt
 import copy
 
+import pathlib
+
 import qubes
 import qubes.devices
 import qubes.firewall
@@ -135,6 +137,24 @@ class TC_00_VMs(AdminAPITestCase):
         self.assertEqual(value,
             'test-vm1 class=AppVM state=Halted\n')
 
+    def test_002_vm_list_filter(self):
+        with tempfile.TemporaryDirectory() as tmpdir:
+            tmpdir = pathlib.Path(tmpdir)
+            with unittest.mock.patch(
+                    'qubes.ext.admin.AdminExtension._instance.policy_cache.path',
+                    pathlib.Path(tmpdir)):
+                with (tmpdir / 'admin.policy').open('w') as f:
+                    f.write('admin.vm.List * @anyvm @adminvm allow\n')
+                    f.write('admin.vm.List * @anyvm test-vm1 allow')
+                mgmt_obj = qubes.api.admin.QubesAdminAPI(self.app, b'test-vm1',
+                    b'admin.vm.List', b'dom0', b'')
+                loop = asyncio.get_event_loop()
+                value = loop.run_until_complete(
+                    mgmt_obj.execute(untrusted_payload=b''))
+                self.assertEqual(value,
+                    'dom0 class=AdminVM state=Running\n'
+                    'test-vm1 class=AppVM state=Halted\n')
+
     def test_010_vm_property_list(self):
         # this test is kind of stupid, but at least check if appropriate
         # admin-permission event is fired
@@ -1112,6 +1132,45 @@ netvm default=True type=vm \n'''
                 unittest.mock.call(vm2, 'test-event2', arg1='abc'),
             ])
 
+    def test_272_events_filter(self):
+        with tempfile.TemporaryDirectory() as tmpdir:
+            tmpdir = pathlib.Path(tmpdir)
+            with unittest.mock.patch(
+                    'qubes.ext.admin.AdminExtension._instance.policy_cache.path',
+                    pathlib.Path(tmpdir)):
+                with (tmpdir / 'admin.policy').open('w') as f:
+                    f.write('admin.Events * @anyvm @adminvm allow\n')
+                    f.write('admin.Events * @anyvm test-vm1 allow')
+
+                send_event = unittest.mock.Mock(spec=[])
+                mgmt_obj = qubes.api.admin.QubesAdminAPI(self.app, b'test-vm1',
+                    b'admin.Events',
+                    b'dom0', b'', send_event=send_event)
+
+                @asyncio.coroutine
+                def fire_event():
+                    # add VM _after_ starting admin.Events call
+                    vm = self.app.add_new_vm('AppVM', label='red',
+                        name='test-vm2',
+                        template='test-template')
+                    vm.fire_event('test-event2', arg1='abc')
+                    self.vm.fire_event('test-event', arg1='abc')
+                    mgmt_obj.cancel()
+                    return vm
+
+                loop = asyncio.get_event_loop()
+                execute_task = asyncio.ensure_future(
+                    mgmt_obj.execute(untrusted_payload=b''))
+                event_task = asyncio.ensure_future(fire_event())
+                loop.run_until_complete(execute_task)
+                vm2 = event_task.result()
+                self.assertIsNone(execute_task.result())
+                self.assertEqual(send_event.mock_calls,
+                    [
+                        unittest.mock.call(self.app, 'connection-established'),
+                        unittest.mock.call(self.vm, 'test-event', arg1='abc'),
+                    ])
+
     def test_280_feature_list(self):
         self.vm.features['test-feature'] = 'some-value'
         value = self.call_mgmt_func(b'admin.vm.feature.List', b'test-vm1')