diff --git a/qubes/api/misc.py b/qubes/api/misc.py index 8de735fb..232d6ae0 100644 --- a/qubes/api/misc.py +++ b/qubes/api/misc.py @@ -71,64 +71,22 @@ class QubesMiscAPI(qubes.api.AbstractQubesAPI): assert self.dest.name == 'dom0' assert not self.arg - if getattr(self.src, 'template', None): - self.src.log.warning( - 'Ignoring qubes.NotifyTools for template-based VM') - return + untrusted_features = {} + safe_set = string.ascii_letters + string.digits + expected_features = ('version', 'qrexec', 'gui', 'default-user') + for feature in expected_features: + untrusted_value = self.src.qdb.read('/qubes-tools/' + feature) + if untrusted_value: + untrusted_value = untrusted_value.decode('ascii', + errors='strict') + assert all((c in safe_set) for c in untrusted_value) + untrusted_features[feature] = untrusted_value + del untrusted_value - # for now used only to check for the tools presence - untrusted_version = self.src.qdb.read('/qubes-tools/version') - - # reserved for future use - #untrusted_os = self.src.qdb.read('/qubes-tools/os') - - # qrexec agent presence (0 or 1) - untrusted_qrexec = self.src.qdb.read('/qubes-tools/qrexec') - - # gui agent presence (0 or 1) - untrusted_gui = self.src.qdb.read('/qubes-tools/gui') - - # default user for qvm-run etc - # starting with Qubes 4.x ignored - #untrusted_user = self.src.qdb.read('/qubes-tools/default-user') - - if untrusted_version is None: - # tools didn't advertised its features; it's strange that this - # service is called, but ignore it - return - - # any suspicious string will raise exception here - int(untrusted_version) - del untrusted_version - - # untrusted_os - ignore for now - - if untrusted_qrexec is None: - qrexec = False - else: - qrexec = bool(int(untrusted_qrexec)) - del untrusted_qrexec - - if untrusted_gui is None: - gui = False - else: - gui = bool(int(untrusted_gui)) - del untrusted_gui - - # ignore default_user - - prev_qrexec = self.src.features.get('qrexec', False) - # Let the tools to be able to enable *or disable* - # each particular component - self.src.features['qrexec'] = qrexec - self.src.features['gui'] = gui + self.src.fire_event('features-request', + untrusted_features=untrusted_features) self.app.save() - if not prev_qrexec and qrexec: - # if this is the first time qrexec was advertised, now can finish - # template setup - self.src.fire_event('template-postinstall') - @qubes.api.method('qubes.NotifyUpdates') @asyncio.coroutine def qubes_notify_updates(self, untrusted_payload): diff --git a/qubes/ext/core_features.py b/qubes/ext/core_features.py new file mode 100644 index 00000000..87d096ad --- /dev/null +++ b/qubes/ext/core_features.py @@ -0,0 +1,62 @@ +# -*- encoding: utf8 -*- +# +# The Qubes OS Project, http://www.qubes-os.org +# +# Copyright (C) 2017 Marek Marczykowski-Górecki +# +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, see . + +import qubes.ext + +class CoreFeatures(qubes.ext.Extension): + # pylint: disable=too-few-public-methods + @qubes.ext.handler('features-request') + def qubes_features_request(self, vm, event, untrusted_features): + '''Handle features provided by qubes-core-agent and qubes-gui-agent''' + # pylint: disable=no-self-use,unused-argument + if getattr(vm, 'template', None): + vm.log.warning( + 'Ignoring qubes.NotifyTools for template-based VM') + return + + # for now used only to check for the tools presence + if 'version' in untrusted_features: + # any suspicious string will raise exception here, + # but otherwise ignored + int(untrusted_features['version']) + + requested_features = {} + for feature in ('qrexec', 'gui'): + untrusted_value = untrusted_features.get(feature, None) + if untrusted_value in ('1', '0'): + requested_features[feature] = bool(int(untrusted_value)) + del untrusted_features + + # default user for qvm-run etc + # starting with Qubes 4.x ignored + # qrexec agent presence (0 or 1) + # gui agent presence (0 or 1) + + qrexec_before = vm.features.get('qrexec', False) + for feature in ('qrexec', 'gui'): + # do not allow (Template)VM to override setting if already set + # some other way + if feature in requested_features and feature not in vm.features: + vm.features[feature] = requested_features[feature] + + if not qrexec_before and vm.features.get('qrexec', False): + # if this is the first time qrexec was advertised, now can finish + # template setup + vm.fire_event('template-postinstall') diff --git a/qubes/tests/__init__.py b/qubes/tests/__init__.py index a53ca25b..09c9d980 100644 --- a/qubes/tests/__init__.py +++ b/qubes/tests/__init__.py @@ -963,6 +963,7 @@ def load_tests(loader, tests, pattern): # pylint: disable=unused-argument 'qubes.tests.storage_file', 'qubes.tests.storage_lvm', 'qubes.tests.storage_kernels', + 'qubes.tests.ext', 'qubes.tests.vm.qubesvm', 'qubes.tests.vm.mix.net', 'qubes.tests.vm.adminvm', diff --git a/qubes/tests/api_misc.py b/qubes/tests/api_misc.py index 46c71f03..59c735bf 100644 --- a/qubes/tests/api_misc.py +++ b/qubes/tests/api_misc.py @@ -118,8 +118,6 @@ class TC_00_API_Misc(qubes.tests.QubesTestCase): '/qubes-tools/default-user': b'user', } self.configure_qdb(qdb_entries) - del self.src.template - self.src.configure_mock(**{'features.get.return_value': False}) response = self.call_mgmt_func(b'qubes.NotifyTools') self.assertIsNone(response) self.assertEqual(self.app.mock_calls, [ @@ -129,78 +127,36 @@ class TC_00_API_Misc(qubes.tests.QubesTestCase): mock.call.qdb.read('/qubes-tools/version'), mock.call.qdb.read('/qubes-tools/qrexec'), mock.call.qdb.read('/qubes-tools/gui'), - mock.call.features.get('qrexec', False), - mock.call.features.__setitem__('qrexec', True), - mock.call.features.__setitem__('gui', True), - mock.call.fire_event('template-postinstall') - ]) - - def test_011_notify_tools_uninstall(self): - qdb_entries = { - '/qubes-tools/version': b'1', - '/qubes-tools/qrexec': b'0', - '/qubes-tools/gui': b'0', - '/qubes-tools/os': b'Linux', - '/qubes-tools/default-user': b'user', - } - self.configure_qdb(qdb_entries) - del self.src.template - self.src.configure_mock(**{'features.get.return_value': True}) - response = self.call_mgmt_func(b'qubes.NotifyTools') - self.assertIsNone(response) - self.assertEqual(self.app.mock_calls, [ - mock.call.save() - ]) - self.assertEqual(self.src.mock_calls, [ - mock.call.qdb.read('/qubes-tools/version'), - mock.call.qdb.read('/qubes-tools/qrexec'), - mock.call.qdb.read('/qubes-tools/gui'), - mock.call.features.get('qrexec', False), - mock.call.features.__setitem__('qrexec', False), - mock.call.features.__setitem__('gui', False), - ]) - - def test_012_notify_tools_uninstall2(self): - qdb_entries = { - '/qubes-tools/version': b'1', - '/qubes-tools/os': b'Linux', - '/qubes-tools/default-user': b'user', - } - self.configure_qdb(qdb_entries) - del self.src.template - self.src.configure_mock(**{'features.get.return_value': True}) - response = self.call_mgmt_func(b'qubes.NotifyTools') - self.assertIsNone(response) - self.assertEqual(self.app.mock_calls, [ - mock.call.save() - ]) - self.assertEqual(self.src.mock_calls, [ - mock.call.qdb.read('/qubes-tools/version'), - mock.call.qdb.read('/qubes-tools/qrexec'), - mock.call.qdb.read('/qubes-tools/gui'), - mock.call.features.get('qrexec', False), - mock.call.features.__setitem__('qrexec', False), - mock.call.features.__setitem__('gui', False), + mock.call.qdb.read('/qubes-tools/default-user'), + mock.call.fire_event('features-request', untrusted_features={ + 'gui': '1', + 'version': '1', + 'default-user': 'user', + 'qrexec': '1'}), ]) + self.assertEqual(self.app.mock_calls, [mock.call.save()]) def test_013_notify_tools_no_version(self): qdb_entries = { - '/qubes-tools/qrexec': b'0', - '/qubes-tools/gui': b'0', + '/qubes-tools/qrexec': b'1', + '/qubes-tools/gui': b'1', '/qubes-tools/os': b'Linux', '/qubes-tools/default-user': b'user', } self.configure_qdb(qdb_entries) - del self.src.template - self.src.configure_mock(**{'features.get.return_value': True}) response = self.call_mgmt_func(b'qubes.NotifyTools') self.assertIsNone(response) - self.assertEqual(self.app.mock_calls, []) self.assertEqual(self.src.mock_calls, [ mock.call.qdb.read('/qubes-tools/version'), mock.call.qdb.read('/qubes-tools/qrexec'), mock.call.qdb.read('/qubes-tools/gui'), + mock.call.qdb.read('/qubes-tools/default-user'), + mock.call.fire_event('features-request', untrusted_features={ + 'gui': '1', + 'default-user': 'user', + 'qrexec': '1'}), ]) + self.assertEqual(self.app.mock_calls, [mock.call.save()]) def test_014_notify_tools_invalid_version(self): qdb_entries = { @@ -211,50 +167,41 @@ class TC_00_API_Misc(qubes.tests.QubesTestCase): '/qubes-tools/default-user': b'user', } self.configure_qdb(qdb_entries) - del self.src.template - self.src.configure_mock(**{'features.get.return_value': True}) - with self.assertRaises(ValueError): + with self.assertRaises(AssertionError): self.call_mgmt_func(b'qubes.NotifyTools') - self.assertEqual(self.app.mock_calls, []) + # should be rejected later self.assertEqual(self.src.mock_calls, [ mock.call.qdb.read('/qubes-tools/version'), - mock.call.qdb.read('/qubes-tools/qrexec'), - mock.call.qdb.read('/qubes-tools/gui'), ]) - + self.assertEqual(self.app.mock_calls, []) def test_015_notify_tools_invalid_value_qrexec(self): qdb_entries = { '/qubes-tools/version': b'1', - '/qubes-tools/qrexec': b'invalid', + '/qubes-tools/qrexec': b'invalid value', '/qubes-tools/gui': b'0', '/qubes-tools/os': b'Linux', '/qubes-tools/default-user': b'user', } self.configure_qdb(qdb_entries) - del self.src.template - self.src.configure_mock(**{'features.get.return_value': True}) - with self.assertRaises(ValueError): + with self.assertRaises(AssertionError): self.call_mgmt_func(b'qubes.NotifyTools') self.assertEqual(self.app.mock_calls, []) self.assertEqual(self.src.mock_calls, [ mock.call.qdb.read('/qubes-tools/version'), mock.call.qdb.read('/qubes-tools/qrexec'), - mock.call.qdb.read('/qubes-tools/gui'), ]) def test_016_notify_tools_invalid_value_gui(self): qdb_entries = { '/qubes-tools/version': b'1', '/qubes-tools/qrexec': b'1', - '/qubes-tools/gui': b'invalid', + '/qubes-tools/gui': b'invalid value', '/qubes-tools/os': b'Linux', '/qubes-tools/default-user': b'user', } self.configure_qdb(qdb_entries) - del self.src.template - self.src.configure_mock(**{'features.get.return_value': True}) - with self.assertRaises(ValueError): + with self.assertRaises(AssertionError): self.call_mgmt_func(b'qubes.NotifyTools') self.assertEqual(self.app.mock_calls, []) self.assertEqual(self.src.mock_calls, [ @@ -263,50 +210,6 @@ class TC_00_API_Misc(qubes.tests.QubesTestCase): mock.call.qdb.read('/qubes-tools/gui'), ]) - def test_017_notify_tools_template_based(self): - qdb_entries = { - '/qubes-tools/version': b'1', - '/qubes-tools/qrexec': b'1', - '/qubes-tools/gui': b'invalid', - '/qubes-tools/os': b'Linux', - '/qubes-tools/default-user': b'user', - } - self.configure_qdb(qdb_entries) - self.src.configure_mock(**{'features.get.return_value': True}) - response = self.call_mgmt_func(b'qubes.NotifyTools') - self.assertIsNone(response) - self.assertEqual(self.app.mock_calls, []) - self.assertEqual(self.src.mock_calls, [ - mock.call.template.__bool__(), - mock.call.log.warning( - 'Ignoring qubes.NotifyTools for template-based VM') - ]) - - def test_018_notify_tools_already_installed(self): - qdb_entries = { - '/qubes-tools/version': b'1', - '/qubes-tools/qrexec': b'1', - '/qubes-tools/gui': b'1', - '/qubes-tools/os': b'Linux', - '/qubes-tools/default-user': b'user', - } - self.configure_qdb(qdb_entries) - del self.src.template - self.src.configure_mock(**{'features.get.return_value': True}) - response = self.call_mgmt_func(b'qubes.NotifyTools') - self.assertIsNone(response) - self.assertEqual(self.app.mock_calls, [ - mock.call.save() - ]) - self.assertEqual(self.src.mock_calls, [ - mock.call.qdb.read('/qubes-tools/version'), - mock.call.qdb.read('/qubes-tools/qrexec'), - mock.call.qdb.read('/qubes-tools/gui'), - mock.call.features.get('qrexec', False), - mock.call.features.__setitem__('qrexec', True), - mock.call.features.__setitem__('gui', True), - ]) - def test_020_notify_updates_standalone(self): del self.src.template response = self.call_mgmt_func(b'qubes.NotifyUpdates', payload=b'1\n') @@ -315,9 +218,7 @@ class TC_00_API_Misc(qubes.tests.QubesTestCase): mock.call.updateable.__bool__(), ]) self.assertEqual(self.src.updates_available, True) - self.assertEqual(self.app.mock_calls, [ - mock.call.save() - ]) + self.assertEqual(self.app.mock_calls, [mock.call.save()]) def test_021_notify_updates_standalone2(self): del self.src.template diff --git a/qubes/tests/ext.py b/qubes/tests/ext.py new file mode 100644 index 00000000..3dc229a2 --- /dev/null +++ b/qubes/tests/ext.py @@ -0,0 +1,177 @@ +# -*- encoding: utf8 -*- +# +# The Qubes OS Project, http://www.qubes-os.org +# +# Copyright (C) 2017 Marek Marczykowski-Górecki +# +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, see . + +from unittest import mock + +import qubes.ext.core_features +import qubes.tests + + +class TC_00_CoreFeatures(qubes.tests.QubesTestCase): + def setUp(self): + super().setUp() + self.ext = qubes.ext.core_features.CoreFeatures() + self.vm = mock.MagicMock() + self.features = {} + self.vm.configure_mock(**{ + 'features.get.side_effect': self.features.get, + 'features.__contains__.side_effect': self.features.__contains__, + 'features.__setitem__.side_effect': self.features.__setitem__, + }) + + def test_010_notify_tools(self): + del self.vm.template + self.ext.qubes_features_request(self.vm, 'features-request', + untrusted_features={ + 'gui': '1', + 'version': '1', + 'default-user': 'user', + 'qrexec': '1'}), + self.assertEqual(self.vm.mock_calls, [ + ('features.get', ('qrexec', False), {}), + ('features.__contains__', ('qrexec',), {}), + ('features.__setitem__', ('qrexec', True), {}), + ('features.__contains__', ('gui',), {}), + ('features.__setitem__', ('gui', True), {}), + ('features.get', ('qrexec', False), {}), + ('fire_event', ('template-postinstall',), {}) + ]) + + def test_011_notify_tools_uninstall(self): + del self.vm.template + self.ext.qubes_features_request(self.vm, 'features-request', + untrusted_features={ + 'gui': '0', + 'version': '1', + 'default-user': 'user', + 'qrexec': '0'}), + self.assertEqual(self.vm.mock_calls, [ + ('features.get', ('qrexec', False), {}), + ('features.__contains__', ('qrexec',), {}), + ('features.__setitem__', ('qrexec', False), {}), + ('features.__contains__', ('gui',), {}), + ('features.__setitem__', ('gui', False), {}), + ('features.get', ('qrexec', False), {}), + ]) + + def test_012_notify_tools_uninstall2(self): + del self.vm.template + self.ext.qubes_features_request(self.vm, 'features-request', + untrusted_features={ + 'version': '1', + 'default-user': 'user', + }) + self.assertEqual(self.vm.mock_calls, [ + ('features.get', ('qrexec', False), {}), + ('features.get', ('qrexec', False), {}), + ]) + + def test_013_notify_tools_no_version(self): + del self.vm.template + self.ext.qubes_features_request(self.vm, 'features-request', + untrusted_features={ + 'qrexec': '1', + 'gui': '1', + 'default-user': 'user', + }) + self.assertEqual(self.vm.mock_calls, [ + ('features.get', ('qrexec', False), {}), + ('features.__contains__', ('qrexec',), {}), + ('features.__setitem__', ('qrexec', True), {}), + ('features.__contains__', ('gui',), {}), + ('features.__setitem__', ('gui', True), {}), + ('features.get', ('qrexec', False), {}), + ('fire_event', ('template-postinstall',), {}) + ]) + + def test_014_notify_tools_invalid_version(self): + del self.vm.template + with self.assertRaises(ValueError): + self.ext.qubes_features_request(self.vm, 'features-request', + untrusted_features={ + 'version': 'this is invalid', + 'qrexec': '1', + 'gui': '1', + 'default-user': 'user', + }) + self.assertEqual(self.vm.mock_calls, []) + + def test_015_notify_tools_invalid_value_qrexec(self): + del self.vm.template + self.ext.qubes_features_request(self.vm, 'features-request', + untrusted_features={ + 'version': '1', + 'qrexec': 'invalid', + 'gui': '1', + 'default-user': 'user', + }) + self.assertEqual(self.vm.mock_calls, [ + ('features.get', ('qrexec', False), {}), + ('features.__contains__', ('gui',), {}), + ('features.__setitem__', ('gui', True), {}), + ('features.get', ('qrexec', False), {}), + ]) + + def test_016_notify_tools_invalid_value_gui(self): + del self.vm.template + self.ext.qubes_features_request(self.vm, 'features-request', + untrusted_features={ + 'version': '1', + 'qrexec': '1', + 'gui': 'invalid', + 'default-user': 'user', + }) + self.assertEqual(self.vm.mock_calls, [ + ('features.get', ('qrexec', False), {}), + ('features.__contains__', ('qrexec',), {}), + ('features.__setitem__', ('qrexec', True), {}), + ('features.get', ('qrexec', False), {}), + ('fire_event', ('template-postinstall',), {}) + ]) + + def test_017_notify_tools_template_based(self): + self.ext.qubes_features_request(self.vm, 'features-request', + untrusted_features={ + 'version': '1', + 'qrexec': '1', + 'gui': '1', + 'default-user': 'user', + }) + self.assertEqual(self.vm.mock_calls, [ + ('template.__bool__', (), {}), + ('log.warning', ('Ignoring qubes.NotifyTools for template-based ' + 'VM',), {}) + ]) + + def test_018_notify_tools_already_installed(self): + self.features['qrexec'] = True + self.features['gui'] = True + del self.vm.template + self.ext.qubes_features_request(self.vm, 'features-request', + untrusted_features={ + 'gui': '1', + 'version': '1', + 'default-user': 'user', + 'qrexec': '1'}), + self.assertEqual(self.vm.mock_calls, [ + ('features.get', ('qrexec', False), {}), + ('features.__contains__', ('qrexec',), {}), + ('features.__contains__', ('gui',), {}), + ]) diff --git a/rpm_spec/core-dom0.spec b/rpm_spec/core-dom0.spec index 4573836e..729df0ef 100644 --- a/rpm_spec/core-dom0.spec +++ b/rpm_spec/core-dom0.spec @@ -286,6 +286,7 @@ fi %{python3_sitelib}/qubes/ext/__pycache__/* %{python3_sitelib}/qubes/ext/__init__.py %{python3_sitelib}/qubes/ext/block.py +%{python3_sitelib}/qubes/ext/core_features.py %{python3_sitelib}/qubes/ext/gui.py %{python3_sitelib}/qubes/ext/pci.py %{python3_sitelib}/qubes/ext/qubesmanager.py @@ -304,6 +305,7 @@ fi %{python3_sitelib}/qubes/tests/devices.py %{python3_sitelib}/qubes/tests/devices_block.py %{python3_sitelib}/qubes/tests/events.py +%{python3_sitelib}/qubes/tests/ext.py %{python3_sitelib}/qubes/tests/firewall.py %{python3_sitelib}/qubes/tests/init.py %{python3_sitelib}/qubes/tests/storage.py diff --git a/setup.py b/setup.py index 36360c26..0bcd0714 100644 --- a/setup.py +++ b/setup.py @@ -43,6 +43,7 @@ if __name__ == '__main__': 'DispVM = qubes.vm.dispvm:DispVM', ], 'qubes.ext': [ + 'qubes.ext.core_features = qubes.ext.core_features:CoreFeatures', 'qubes.ext.qubesmanager = qubes.ext.qubesmanager:QubesManager', 'qubes.ext.gui = qubes.ext.gui:GUI', 'qubes.ext.r3compatibility = qubes.ext.r3compatibility:R3Compatibility',