diff --git a/.travis.yml b/.travis.yml index 198b4e15..50618add 100644 --- a/.travis.yml +++ b/.travis.yml @@ -7,7 +7,7 @@ install: - pip install --quiet -r ci/requirements.txt - git clone https://github.com/"${TRAVIS_REPO_SLUG%%/*}"/qubes-builder ~/qubes-builder script: - - PYTHONPATH=test-packages pylint --rcfile=ci/pylintrc qubes + - PYTHONPATH=test-packages pylint --rcfile=ci/pylintrc qubes qubespolicy - ./run-tests --no-syslog - ~/qubes-builder/scripts/travis-build env: diff --git a/ci/requirements.txt b/ci/requirements.txt index 81325bdf..a95201dc 100644 --- a/ci/requirements.txt +++ b/ci/requirements.txt @@ -6,3 +6,4 @@ jinja2 lxml pylint sphinx +pydbus diff --git a/doc/index.rst b/doc/index.rst index 39c5284f..3ace08e2 100644 --- a/doc/index.rst +++ b/doc/index.rst @@ -20,6 +20,7 @@ manpages and API documentation. For primary user documentation, see qubes-ext qubes-log qubes-mgmt + qubes-policy qubes-tools/index qubes-tests qubes-dochelpers diff --git a/doc/qubes-policy.rst b/doc/qubes-policy.rst new file mode 100644 index 00000000..0be819ca --- /dev/null +++ b/doc/qubes-policy.rst @@ -0,0 +1,87 @@ +:py:mod:`qubes.policy` -- Qubes RPC policy +========================================== + +Every Qubes domain can trigger various RPC services, but if such call would be +allowed depends on Qubes RPC policy (qrexec policy in short). + +Qrexec policy format +-------------------- + +Policy consists of a file, which is parsed line-by-line. First matching line +is used as an action. + +Each line consist of three values separated by white characters (space(s), tab(s)): +1. Source specification, which is one of: + + - domain name + - `$anyvm` - any domain + - `$tag:some-tag` - VM having tag `some-tag` + - `$type:vm-type` - VM of `vm-type` type, available types: + AppVM, TemplateVM, StandaloneVM, DispVM + +2. Target specification, one of: + + - domain name + - `$anyvm` - any domain, excluding dom0 + - `$tag:some-tag` - domain having tag `some-tag` + - `$type:vm-type` - domain of `vm-type` type, available types: + AppVM, TemplateVM, StandaloneVM, DispVM + - `$default` - used when caller did not specified any VM + - `$dispvm:vm-name` - _new_ Disposable VM created from AppVM `vm-name` + - `$dispvm` - _new_ Disposable VM created from AppVM pointed by caller + property `default_dispvm`, which defaults to global property `default_dispvm` + +3. Action and optional action parameters, one of: + + - `allow` - allow the call, without further questions; optional parameters: + - `target=` - override caller provided call target - + possible values are: domain name, `$dispvm` or `$dispvm:vm-name` + - `user=` - call the service using this user, instead of the user + pointed by target VM's `default_user` property + - `deny` - deny the call, without further questions; no optional + parameters are supported + - `ask` - ask the user for confirmation; optional parameters: + - `target=` - override user provided call target + - `user=` - call the service using this user, instead of the user + pointed by target VM's `default_user` property + - `default_target=` - suggest this target when prompting the user for + confirmation + +Alternatively, a line may consist of a single keyword `$include:` followed by a +path. This will load a given file as its content would be in place of +`$include` line. Relative paths are resolved relative to +`/etc/qubes-rpc/policy` directory. + +Evaluating `ask` action +----------------------- + +When qrexec policy specify `ask` action, the user is asked whether the call +should be allowed or denied. In addition to that, user also need to choose +target domain. User have to choose from a set of targets specified by the +policy. Such set is calculated using the algorithm below: + +1. If `ask` action have `target=` option specified, only that target is +considered. A prompt window will allow to choose only this value and it will +also be pre-filled value. + +2. If no `target=` option is specified, all rules are evaluated to see what +target domains (for a given source domain) would result in `ask` or `allow` +action. If any of them have `target=` option set, that value is used instead of +the one specified in "target" column (for this particular line). Then the user +is presented with a confirmation dialog and an option to choose from those +domains. + +3. If `default_target=` option is set, it is used as +suggested value, otherwise no suggestion is made (regardless of calling domain +specified any target or not). + + + +Module contents +--------------- + +.. automodule:: qubespolicy + :members: + :show-inheritance: + +.. vim: ts=3 sw=3 et diff --git a/linux/system-config/Makefile b/linux/system-config/Makefile index 51536153..d7c54b1f 100644 --- a/linux/system-config/Makefile +++ b/linux/system-config/Makefile @@ -8,4 +8,8 @@ install: ln -s block-snapshot $(DESTDIR)/etc/xen/scripts/block-origin install -d $(DESTDIR)/etc/xdg/autostart install -m 0644 qubes-guid.desktop $(DESTDIR)/etc/xdg/autostart/ + install -m 0644 qrexec-policy-agent.desktop $(DESTDIR)/etc/xdg/autostart/ install -m 0644 -D tmpfiles-qubes.conf $(DESTDIR)/usr/lib/tmpfiles.d/qubes.conf + install -d $(DESTDIR)/etc/dbus-1/system.d + install -m 0644 dbus-org.qubesos.PolicyAgent.conf \ + $(DESTDIR)/etc/dbus-1/system.d/org.qubesos.PolicyAgent.conf diff --git a/linux/system-config/dbus-org.qubesos.PolicyAgent.conf b/linux/system-config/dbus-org.qubesos.PolicyAgent.conf new file mode 100644 index 00000000..e1dd2b9d --- /dev/null +++ b/linux/system-config/dbus-org.qubesos.PolicyAgent.conf @@ -0,0 +1,19 @@ + + + + + + + + + + + + + + + diff --git a/linux/system-config/qrexec-policy-agent.desktop b/linux/system-config/qrexec-policy-agent.desktop new file mode 100644 index 00000000..0e1cd8d3 --- /dev/null +++ b/linux/system-config/qrexec-policy-agent.desktop @@ -0,0 +1,7 @@ +[Desktop Entry] +Name=Qubes Qrexec Policy agent +Comment=Agent for handling policy confirmation prompts +Icon=qubes +Exec=qrexec-policy-agent +Terminal=false +Type=Application diff --git a/qubes/app.py b/qubes/app.py index 00c7c126..2709ccc1 100644 --- a/qubes/app.py +++ b/qubes/app.py @@ -480,7 +480,7 @@ class VMCollection(object): # if not self[netvm_qid].is_netvm(): # return set([]) - while len(new_vms) > 0: + while new_vms: cur_vm = new_vms.pop() for vm in cur_vm.connected_vms: if vm in dependent_vms: diff --git a/qubes/devices.py b/qubes/devices.py index e51af94a..4fc24ebd 100644 --- a/qubes/devices.py +++ b/qubes/devices.py @@ -216,8 +216,7 @@ class DeviceCollection(object): if dev: assert len(dev) == 1 return dev[0] - else: - return UnknownDevice(self._vm, ident) + return UnknownDevice(self._vm, ident) class DeviceManager(dict): diff --git a/qubes/firewall.py b/qubes/firewall.py index 6861bcb3..804f7d58 100644 --- a/qubes/firewall.py +++ b/qubes/firewall.py @@ -406,8 +406,7 @@ class Firewall(object): def _translate_action(key): if xml_root.get(key, policy_v1) == 'allow': return Action.accept - else: - return Action.drop + return Action.drop self.rules.append(Rule(None, action=_translate_action('dns'), diff --git a/qubes/mgmtinternal.py b/qubes/mgmtinternal.py new file mode 100644 index 00000000..519df410 --- /dev/null +++ b/qubes/mgmtinternal.py @@ -0,0 +1,84 @@ +# -*- encoding: utf8 -*- +# +# The Qubes OS Project, http://www.qubes-os.org +# +# Copyright (C) 2017 Marek Marczykowski-Górecki +# +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, see . + +''' Internal interface for dom0 components to communicate with qubesd. ''' + +import asyncio +import json + +import qubes.mgmt +import qubes.vm.dispvm + +api = qubes.mgmt.api + + +class QubesInternalMgmt(qubes.mgmt.AbstractQubesMgmt): + ''' Communication interface for dom0 components, + by design the input here is trusted.''' + # + # PRIVATE METHODS, not to be called via RPC + # + + # + # ACTUAL RPC CALLS + # + + @api('mgmtinternal.GetSystemInfo', no_payload=True) + @asyncio.coroutine + def getsysteminfo(self): + assert self.dest.name == 'dom0' + assert not self.arg + + system_info = {'domains': { + domain.name: { + 'tags': list(domain.tags), + 'type': domain.__class__.__name__, + 'dispvm_allowed': getattr(domain, 'dispvm_allowed', False), + 'default_dispvm': (str(domain.default_dispvm) if + domain.default_dispvm else None), + 'icon': str(domain.label.icon), + } for domain in self.app.domains + }} + + return json.dumps(system_info) + + @api('mgmtinternal.vm.Start', no_payload=True) + @asyncio.coroutine + def start(self): + assert not self.arg + + yield from self.dest.start() + + @api('mgmtinternal.vm.Create.DispVM', no_payload=True) + @asyncio.coroutine + def create_dispvm(self): + assert not self.arg + + # TODO convert to coroutine + dispvm = qubes.vm.dispvm.DispVM.from_appvm(self.dest) + return dispvm.name + + @api('mgmtinternal.vm.CleanupDispVM', no_payload=True) + @asyncio.coroutine + def cleanup_dispvm(self): + assert not self.arg + + # TODO convert to coroutine + self.dest.cleanup() diff --git a/qubes/rngdoc.py b/qubes/rngdoc.py index 205b7ea3..5730f8b3 100755 --- a/qubes/rngdoc.py +++ b/qubes/rngdoc.py @@ -55,8 +55,7 @@ class Element(object): if wrap: return ''.join(self.schema.wrapper.fill(p) + '\n\n' for p in textwrap.dedent(xml.text.strip('\n')).split('\n\n')) - else: - return ' '.join(xml.text.strip().split()) + return ' '.join(xml.text.strip().split()) def get_data_type(self, xml=None): @@ -93,7 +92,7 @@ class Element(object): for xml in self.xml.xpath('''./rng:attribute | ./rng:optional/rng:attribute | ./rng:choice/rng:attribute''', namespaces=self.nsmap): - required = xml.getparent() == self.xml and 'yes' or 'no' + required = 'yes' if xml.getparent() == self.xml else 'no' yield (xml, required) @@ -212,6 +211,6 @@ Quick example, worth thousands lines of specification: if __name__ == '__main__': - main(*sys.argv[1:]) + main(*sys.argv[1:]) # pylint: disable=no-value-for-parameter # vim: ts=4 sw=4 et diff --git a/qubes/storage/__init__.py b/qubes/storage/__init__.py index 1f8aa44a..fa8bc80d 100644 --- a/qubes/storage/__init__.py +++ b/qubes/storage/__init__.py @@ -446,8 +446,7 @@ class Storage(object): "You need to pass a Volume or pool name as str" if isinstance(volume, Volume): return self.pools[volume.name] - else: - return self.vm.app.pools[volume] + return self.vm.app.pools[volume] def commit(self): ''' Makes changes to an 'origin' volume persistent ''' @@ -476,8 +475,7 @@ class Storage(object): "You need to pass a Volume or pool name as str" if isinstance(volume, Volume): return self.pools[volume.name].export(volume) - else: - return self.pools[volume].export(self.vm.volumes[volume]) + return self.pools[volume].export(self.vm.volumes[volume]) class Pool(object): diff --git a/qubes/storage/file.py b/qubes/storage/file.py index 1f939b0f..1a07224c 100644 --- a/qubes/storage/file.py +++ b/qubes/storage/file.py @@ -378,10 +378,9 @@ class FileVolume(qubes.storage.Volume): if not os.path.exists(old_revision): return {} - else: - seconds = os.path.getctime(old_revision) - iso_date = qubes.storage.isodate(seconds).split('.', 1)[0] - return {iso_date: old_revision} + seconds = os.path.getctime(old_revision) + iso_date = qubes.storage.isodate(seconds).split('.', 1)[0] + return {iso_date: old_revision} @property def usage(self): diff --git a/qubes/storage/lvm.py b/qubes/storage/lvm.py index 58137148..051d75dc 100644 --- a/qubes/storage/lvm.py +++ b/qubes/storage/lvm.py @@ -401,8 +401,7 @@ class ThinVolume(qubes.storage.Volume): return qubes.devices.BlockDevice( '/dev/' + self._vid_snap, self.name, self.script, self.rw, self.domain, self.devtype) - else: - return super(ThinVolume, self).block_device() + return super(ThinVolume, self).block_device() @property def usage(self): # lvm thin usage always returns at least the same usage as diff --git a/qubes/tarwriter.py b/qubes/tarwriter.py index 35bef30d..bc9e90dd 100644 --- a/qubes/tarwriter.py +++ b/qubes/tarwriter.py @@ -38,10 +38,9 @@ class TarSparseInfo(tarfile.TarInfo): @property def realsize(self): - if len(self.sparsemap): + if self.sparsemap: return self.sparsemap[-1][0] + self.sparsemap[-1][1] - else: - return self.size + return self.size def sparse_header_chunk(self, index): if index < len(self.sparsemap): @@ -49,8 +48,7 @@ class TarSparseInfo(tarfile.TarInfo): tarfile.itn(self.sparsemap[index][0], 12, tarfile.GNU_FORMAT), tarfile.itn(self.sparsemap[index][1], 12, tarfile.GNU_FORMAT), ]) - else: - return b'\0' * 12 * 2 + return b'\0' * 12 * 2 def get_gnu_header(self): '''Part placed in 'prefix' field of posix header''' @@ -81,8 +79,7 @@ class TarSparseInfo(tarfile.TarInfo): header_buf = super(TarSparseInfo, self).tobuf(format, encoding, errors) if len(self.sparsemap) > 4: return header_buf + b''.join(self.create_ext_sparse_headers()) - else: - return header_buf + return header_buf def create_ext_sparse_headers(self): for ext_hdr in range(4, len(self.sparsemap), 21): diff --git a/qubes/tests/__init__.py b/qubes/tests/__init__.py index 7b0b2684..e35a2f88 100644 --- a/qubes/tests/__init__.py +++ b/qubes/tests/__init__.py @@ -909,9 +909,18 @@ def load_tests(loader, tests, pattern): # pylint: disable=unused-argument 'qubes.tests.tools.qvm_device', 'qubes.tests.tools.qvm_firewall', 'qubes.tests.tools.qvm_ls', + 'qubespolicy.tests', ): tests.addTests(loader.loadTestsFromName(modname)) + # GTK/Glib is way too old there + if 'TRAVIS' not in os.environ: + for modname in ( + 'qubespolicy.tests.gtkhelpers', + 'qubespolicy.tests.rpcconfirmation', + ): + tests.addTests(loader.loadTestsFromName(modname)) + tests.addTests(loader.discover( os.path.join(os.path.dirname(__file__), 'tools'))) diff --git a/qubes/tools/__init__.py b/qubes/tools/__init__.py index f17abde3..3fc5e794 100644 --- a/qubes/tools/__init__.py +++ b/qubes/tools/__init__.py @@ -258,7 +258,7 @@ class VolumeAction(QubesAction): pool = app.pools[pool_name] volume = [v for v in pool.volumes if v.vid == vid] assert volume > 1, 'Duplicate vids in pool %s' % pool_name - if len(volume) == 0: + if not volume: parser.error_runtime( 'no volume with id {!r} pool: {!r}'.format(vid, pool_name)) @@ -353,6 +353,7 @@ class QubesArgumentParser(argparse.ArgumentParser): self.set_defaults(verbose=1, quiet=0) def parse_args(self, *args, **kwargs): + # pylint: disable=arguments-differ namespace = super(QubesArgumentParser, self).parse_args(*args, **kwargs) if self._want_app and not self._want_app_no_instance: @@ -433,7 +434,7 @@ class AliasedSubParsersAction(argparse._SubParsersAction): # source https://gist.github.com/sampsyo/471779 # pylint: disable=protected-access,too-few-public-methods class _AliasedPseudoAction(argparse.Action): - # pylint: disable=redefined-builtin + # pylint: disable=redefined-builtin,arguments-differ def __init__(self, name, aliases, help): dest = name if aliases: @@ -442,8 +443,7 @@ class AliasedSubParsersAction(argparse._SubParsersAction): sup.__init__(option_strings=[], dest=dest, help=help) def __call__(self, **kwargs): - super(AliasedSubParsersAction._AliasedPseudoAction, self).__call__( - **kwargs) + pass def add_parser(self, name, **kwargs): if 'aliases' in kwargs: diff --git a/qubes/tools/qubesd.py b/qubes/tools/qubesd.py index a624fcc9..65e5138b 100644 --- a/qubes/tools/qubesd.py +++ b/qubes/tools/qubesd.py @@ -13,17 +13,20 @@ import libvirtaio import qubes import qubes.mgmt +import qubes.mgmtinternal import qubes.utils import qubes.vm.qubesvm QUBESD_SOCK = '/var/run/qubesd.sock' +QUBESD_INTERNAL_SOCK = '/var/run/qubesd.internal.sock' class QubesDaemonProtocol(asyncio.Protocol): buffer_size = 65536 header = struct.Struct('Bx') - def __init__(self, *args, app, debug=False, **kwargs): + def __init__(self, handler, *args, app, debug=False, **kwargs): super().__init__(*args, **kwargs) + self.handler = handler self.app = app self.untrusted_buffer = io.BytesIO() self.len_untrusted_buffer = 0 @@ -39,6 +42,7 @@ class QubesDaemonProtocol(asyncio.Protocol): self.untrusted_buffer.close() def data_received(self, untrusted_data): + # pylint: disable=arguments-differ print('data_received(untrusted_data={!r})'.format(untrusted_data)) if self.len_untrusted_buffer + len(untrusted_data) > self.buffer_size: self.app.log.warning('request too long') @@ -69,7 +73,7 @@ class QubesDaemonProtocol(asyncio.Protocol): @asyncio.coroutine def respond(self, src, method, dest, arg, *, untrusted_payload): try: - mgmt = qubes.mgmt.QubesMgmt(self.app, src, method, dest, arg) + mgmt = self.handler(self.app, src, method, dest, arg) response = yield from mgmt.execute( untrusted_payload=untrusted_payload) @@ -147,9 +151,10 @@ class QubesDaemonProtocol(asyncio.Protocol): self.transport.write(str(exc).encode('utf-8') + b'\0') -def sighandler(loop, signame, server): +def sighandler(loop, signame, server, server_internal): print('caught {}, exiting'.format(signame)) server.close() + server_internal.close() loop.stop() parser = qubes.tools.QubesArgumentParser(description='Qubes OS daemon') @@ -166,20 +171,35 @@ def main(args=None): pass old_umask = os.umask(0o007) server = loop.run_until_complete(loop.create_unix_server( - functools.partial(QubesDaemonProtocol, app=args.app), QUBESD_SOCK)) + functools.partial(QubesDaemonProtocol, qubes.mgmt.QubesMgmt, + app=args.app), QUBESD_SOCK)) shutil.chown(QUBESD_SOCK, group='qubes') + + try: + os.unlink(QUBESD_INTERNAL_SOCK) + except FileNotFoundError: + pass + server_internal = loop.run_until_complete(loop.create_unix_server( + functools.partial(QubesDaemonProtocol, + qubes.mgmtinternal.QubesInternalMgmt, + app=args.app), QUBESD_INTERNAL_SOCK)) + shutil.chown(QUBESD_INTERNAL_SOCK, group='qubes') + os.umask(old_umask) del old_umask for signame in ('SIGINT', 'SIGTERM'): loop.add_signal_handler(getattr(signal, signame), - sighandler, loop, signame, server) + sighandler, loop, signame, server, server_internal) qubes.utils.systemd_notify() try: loop.run_forever() - loop.run_until_complete(server.wait_closed()) + loop.run_until_complete(asyncio.wait([ + server.wait_closed(), + server_internal.wait_closed(), + ])) finally: loop.close() diff --git a/qubes/tools/qvm_backup_restore.py b/qubes/tools/qvm_backup_restore.py index 88dbf1f4..d8cc87f0 100644 --- a/qubes/tools/qvm_backup_restore.py +++ b/qubes/tools/qvm_backup_restore.py @@ -204,7 +204,6 @@ def main(args=None): "and (if encrypted) decrypt the backup: ") encoding = sys.stdin.encoding or locale.getpreferredencoding() - # pylint: disable=redefined-variable-type passphrase = passphrase.decode(encoding) args.app.log.info("Checking backup content...") diff --git a/qubes/tools/qvm_check.py b/qubes/tools/qvm_check.py index 0a911528..756e4695 100644 --- a/qubes/tools/qvm_check.py +++ b/qubes/tools/qvm_check.py @@ -38,6 +38,7 @@ parser.add_argument("--template", action="store_true", dest="template", def print_msg(domains, what_single, what_plural): + # pylint: disable=len-as-condition if len(domains) == 0: print("None of given VM {!s}".format(what_single)) elif len(domains) == 1: diff --git a/qubes/tools/qvm_ls.py b/qubes/tools/qvm_ls.py index 4748b3f0..c477346f 100644 --- a/qubes/tools/qvm_ls.py +++ b/qubes/tools/qvm_ls.py @@ -309,8 +309,7 @@ class StatusColumn(Column): if ret is not None: if getattr(vm, 'hvm', False): return ret.upper() - else: - return ret + return ret @flag(2) @@ -478,8 +477,7 @@ class Table(object): '''Format single table row (all columns for one domain).''' if self.raw_data: return '|'.join(col.format(vm) for col in self.columns) - else: - return ''.join(col.cell(vm) for col in self.columns) + return ''.join(col.cell(vm) for col in self.columns) def write_table(self, stream=sys.stdout): diff --git a/qubes/tools/qvm_pool.py b/qubes/tools/qvm_pool.py index 2c141988..4c028b8a 100644 --- a/qubes/tools/qvm_pool.py +++ b/qubes/tools/qvm_pool.py @@ -84,7 +84,7 @@ def list_pools(app): ''' Prints out all known pools and their drivers ''' result = [('NAME', 'DRIVER')] for pool in app.pools.values(): - if len(pool.volumes) == 0 and issubclass( + if not pool.volumes and issubclass( pool.__class__, qubes.storage.domain.DomainPool): # skip empty DomainPools continue diff --git a/qubes/utils.py b/qubes/utils.py index 39a78943..d15300dc 100644 --- a/qubes/utils.py +++ b/qubes/utils.py @@ -111,22 +111,19 @@ def parse_size(size): def mbytes_to_kmg(size): if size > 1024: return "%d GiB" % (size / 1024) - else: - return "%d MiB" % size + return "%d MiB" % size def kbytes_to_kmg(size): if size > 1024: return mbytes_to_kmg(size / 1024) - else: - return "%d KiB" % size + return "%d KiB" % size def bytes_to_kmg(size): if size > 1024: return kbytes_to_kmg(size / 1024) - else: - return "%d B" % size + return "%d B" % size def size_to_human(size): @@ -137,8 +134,7 @@ def size_to_human(size): return str(round(size / 1024.0, 1)) + ' KiB' elif size < 1024 * 1024 * 1024: return str(round(size / (1024.0 * 1024), 1)) + ' MiB' - else: - return str(round(size / (1024.0 * 1024 * 1024), 1)) + ' GiB' + return str(round(size / (1024.0 * 1024 * 1024), 1)) + ' GiB' def urandom(size): diff --git a/qubes/vm/adminvm.py b/qubes/vm/adminvm.py index a571a3de..cd852ceb 100644 --- a/qubes/vm/adminvm.py +++ b/qubes/vm/adminvm.py @@ -132,7 +132,7 @@ class AdminVM(qubes.vm.qubesvm.QubesVM): .. seealso: :py:meth:`qubes.vm.qubesvm.QubesVM.start` - ''' # pylint: disable=unused-argument + ''' # pylint: disable=unused-argument,arguments-differ raise qubes.exc.QubesVMError(self, 'Cannot start Dom0 fake domain!') def suspend(self): diff --git a/qubes/vm/appvm.py b/qubes/vm/appvm.py index 3733f6ce..d5269742 100644 --- a/qubes/vm/appvm.py +++ b/qubes/vm/appvm.py @@ -37,6 +37,12 @@ class AppVM(qubes.vm.qubesvm.QubesVM): ls_width=31, doc='Template, on which this AppVM is based.') + dispvm_allowed = qubes.property('dispvm_allowed', + type=bool, + default=False, + doc='Should this VM be allowed to start as Disposable VM' + ) + def __init__(self, app, xml, template=None, **kwargs): self.volume_config = { 'root': { diff --git a/qubes/vm/dispvm.py b/qubes/vm/dispvm.py index 234626a3..bd421831 100644 --- a/qubes/vm/dispvm.py +++ b/qubes/vm/dispvm.py @@ -123,7 +123,7 @@ class DispVM(qubes.vm.qubesvm.QubesVM): '''Create a new instance from given AppVM :param qubes.vm.appvm.AppVM appvm: template from which the VM should \ - be created (could also be name or qid) + be created :returns: new disposable vm *kwargs* are passed to the newly created VM @@ -133,12 +133,14 @@ class DispVM(qubes.vm.qubesvm.QubesVM): >>> dispvm.run_service('qubes.VMShell', input='firefox') >>> dispvm.cleanup() - This method modifies :file:`qubes.xml` file. In fact, the newly created - vm belongs to other :py:class:`qubes.Qubes` instance than the *app*. + This method modifies :file:`qubes.xml` file. The qube returned is not started. ''' - store = appvm.app.store if isinstance(appvm, qubes.vm.BaseVM) else None - app = qubes.Qubes(store) + if not appvm.dispvm_allowed: + raise qubes.exc.QubesException( + 'Refusing to start DispVM out of this AppVM, because ' + 'dispvm_allowed=False') + app = appvm.app dispvm = app.add_new_vm( cls, dispid=app.domains.get_new_unused_dispid(), @@ -156,15 +158,12 @@ class DispVM(qubes.vm.qubesvm.QubesVM): '''Clean up after the DispVM This stops the disposable qube and removes it from the store. - This method modifies :file:`qubes.xml` file. ''' - app = qubes.Qubes(self.app.store) - self = app.domains[self.uuid] try: self.force_shutdown() except qubes.exc.QubesVMNotStartedError: pass self.remove_from_disk() - del app.domains[self] - app.save() + del self.app.domains[self] + self.app.save() diff --git a/qubes/vm/mix/net.py b/qubes/vm/mix/net.py index 8597e30f..d2f82370 100644 --- a/qubes/vm/mix/net.py +++ b/qubes/vm/mix/net.py @@ -49,8 +49,7 @@ def _default_ip(self): return None if self.netvm is not None: return self.netvm.get_ip_for_vm(self) # pylint: disable=no-member - else: - return self.get_ip_for_vm(self) + return self.get_ip_for_vm(self) def _setter_ip(self, prop, value): @@ -173,8 +172,7 @@ class NetVMMixin(qubes.events.Emitter): '10.139.1.1', '10.139.1.2', ) - else: - return None + return None def __init__(self, *args, **kwargs): self._firewall = None diff --git a/qubes/vm/qubesvm.py b/qubes/vm/qubesvm.py index 803e0755..ca5377aa 100644 --- a/qubes/vm/qubesvm.py +++ b/qubes/vm/qubesvm.py @@ -427,6 +427,7 @@ class QubesVM(qubes.vm.mix.net.NetVMMixin, qubes.vm.BaseVM): doc='Kernel used by this domain.') # CORE2: swallowed uses_default_kernelopts + # pylint: disable=no-member kernelopts = qubes.property('kernelopts', type=str, load_stage=4, default=(lambda self: qubes.config.defaults['kernelopts_pcidevs'] if list(self.devices['pci'].attached(persistent=True)) @@ -449,6 +450,8 @@ class QubesVM(qubes.vm.mix.net.NetVMMixin, qubes.vm.BaseVM): ls_width=12, doc='FIXME') + # pylint: enable=no-member + # @property # def default_user(self): # if self.template is not None: @@ -1432,15 +1435,12 @@ class QubesVM(qubes.vm.mix.net.NetVMMixin, qubes.vm.BaseVM): else: if not self.is_fully_usable(): return "Transient" - else: - return "Running" - else: - return 'Halted' + return "Running" + return 'Halted' except libvirt.libvirtError as e: if e.get_error_code() == libvirt.VIR_ERR_NO_DOMAIN: return 'Halted' - else: - raise + raise assert False @@ -1614,8 +1614,7 @@ class QubesVM(qubes.vm.mix.net.NetVMMixin, qubes.vm.BaseVM): '/vm/{}/start_time'.format(self.uuid)) if start_time != '': return datetime.datetime.fromtimestamp(float(start_time)) - else: - return None + return None def is_outdated(self): '''Check whether domain needs restart to update root image from \ diff --git a/qubespolicy/__init__.py b/qubespolicy/__init__.py new file mode 100755 index 00000000..e092f711 --- /dev/null +++ b/qubespolicy/__init__.py @@ -0,0 +1,646 @@ +# coding=utf-8 +# The Qubes OS Project, https://www.qubes-os.org/ +# +# Copyright (C) 2013-2015 Joanna Rutkowska +# Copyright (C) 2013-2017 Marek Marczykowski-Górecki +# +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +''' Qrexec policy parser and evaluator ''' +import json +import os +import os.path +import socket +import subprocess +import enum +import itertools + +# don't import 'qubes.config' please, it takes 0.3s +QREXEC_CLIENT = '/usr/lib/qubes/qrexec-client' +QUBES_RPC_MULTIPLEXER_PATH = '/usr/lib/qubes/qubes-rpc-multiplexer' +POLICY_DIR = '/etc/qubes-rpc/policy' +QUBESD_INTERNAL_SOCK = '/var/run/qubesd.internal.sock' + + +class AccessDenied(Exception): + ''' Raised when qrexec policy denied access ''' + pass + + +class PolicySyntaxError(AccessDenied): + ''' Syntax error in qrexec policy, abort parsing ''' + def __init__(self, filename, lineno, msg): + super(PolicySyntaxError, self).__init__( + '{}:{}: {}'.format(filename, lineno, msg)) + + +class Action(enum.Enum): + ''' Action as defined by policy ''' + allow = 1 + deny = 2 + ask = 3 + + +def verify_target_value(system_info, value): + ''' Check if given value names valid target + + This function check if given value is not only syntactically correct, + but also if names valid service call target (existing domain, + or valid $dispvm like keyword) + + :param system_info: information about the system + :param value: value to be checked + ''' + if value == '$dispvm': + return True + elif value.startswith('$dispvm:'): + dispvm_base = value.split(':', 1)[1] + if dispvm_base not in system_info['domains']: + return False + dispvm_base_info = system_info['domains'][dispvm_base] + return bool(dispvm_base_info['dispvm_allowed']) + else: + return value in system_info['domains'] + + +def verify_special_value(value, for_target=True): + ''' + Verify if given special VM-specifier ('$...') is valid + + :param value: value to verify + :param for_target: should classify target-only values as valid ( + '$default', '$dispvm') + :return: True or False + ''' + # pylint: disable=too-many-return-statements + + if value.startswith('$tag:') and len(value) > len('$tag:'): + return True + elif value.startswith('$type:') and len(value) > len('$type:'): + return True + elif value == '$anyvm': + return True + elif value.startswith('$dispvm:') and for_target: + return True + elif value == '$dispvm' and for_target: + return True + elif value == '$default' and for_target: + return True + return False + + +class PolicyRule(object): + ''' A single line of policy file ''' + def __init__(self, line, filename=None, lineno=None): + ''' + Load a single line of qrexec policy and check its syntax. + Do not verify existence of named objects. + + :raise PolicySyntaxError: when syntax error is found + + :param line: a single line of actual qrexec policy (not a comment, + empty line or $include) + :param filename: name of the file from which this line is loaded + :param lineno: line number from which this line is loaded + ''' + + self.lineno = lineno + self.filename = filename + + try: + self.source, self.target, self.full_action = line.split() + except ValueError: + raise PolicySyntaxError(filename, lineno, 'wrong number of fields') + + (action, *params) = self.full_action.split(',') + try: + self.action = Action[action] + except KeyError: + raise PolicySyntaxError(filename, lineno, + 'invalid action: {}'.format(action)) + + #: alternative target, used instead of the one specified by the caller + self.override_target = None + + #: alternative user, used instead of vm.default_user + self.override_user = None + + #: default target when asking the user for confirmation + self.default_target = None + + for param in params: + try: + param_name, value = param.split('=') + except ValueError: + raise PolicySyntaxError(filename, lineno, + 'invalid action parameter syntax: {}'.format(param)) + if param_name == 'target': + if self.action == Action.deny: + raise PolicySyntaxError(filename, lineno, + 'target= option not allowed for deny action') + self.override_target = value + elif param_name == 'user': + if self.action == Action.deny: + raise PolicySyntaxError(filename, lineno, + 'user= option not allowed for deny action') + self.override_user = value + elif param_name == 'default_target': + if self.action != Action.ask: + raise PolicySyntaxError(filename, lineno, + 'default_target= option allowed only for ask action') + self.default_target = value + else: + raise PolicySyntaxError(filename, lineno, + 'invalid option {} for {} action'.format(param, action)) + + # verify special values + if self.source.startswith('$'): + if not verify_special_value(self.source, False): + raise PolicySyntaxError(filename, lineno, + 'invalid source specification: {}'.format(self.source)) + + if self.target.startswith('$'): + if not verify_special_value(self.target, True): + raise PolicySyntaxError(filename, lineno, + 'invalid target specification: {}'.format(self.target)) + + if self.target == '$default' \ + and self.action == Action.allow \ + and self.override_target is None: + raise PolicySyntaxError(filename, lineno, + 'allow action for $default rule must specify target= option') + + if self.override_target is not None: + if self.override_target.startswith('$') and not \ + self.override_target.startswith('$dispvm'): + raise PolicySyntaxError(filename, lineno, + 'target= option needs to name specific target') + + @staticmethod + def is_match_single(system_info, policy_value, value): + ''' + Evaluate if a single value (VM name or '$default') matches policy + specification + + :param system_info: information about the system + :param policy_value: value from qrexec policy (either self.source or + self.target) + :param value: value to be compared (source or target) + :return: True or False + ''' + # pylint: disable=too-many-return-statements + + # not specified target matches only with $default and $anyvm policy + # entry + if value == '$default' or value == '': + return policy_value in ('$default', '$anyvm') + + # if specific target used, check if it's valid + # this function (is_match_single) is also used for checking call source + # values, but this isn't a problem, because it will always be a + # domain name (not $dispvm or such) - this is guaranteed by a nature + # of qrexec call + if not verify_target_value(system_info, value): + return False + + # allow any _valid_, non-dom0 target + if policy_value == '$anyvm': + return value != 'dom0' + + # exact match, including $dispvm* + if value == policy_value: + return True + + # if $dispvm* not matched above, reject it; missing ':' is + # intentional - handle both '$dispvm' and '$dispvm:xxx' + if value.startswith('$dispvm'): + return False + + # at this point, value name a specific target + domain_info = system_info['domains'][value] + + if policy_value.startswith('$tag:'): + tag = policy_value.split(':', 1)[1] + return tag in domain_info['tags'] + + if policy_value.startswith('$type:'): + type_ = policy_value.split(':', 1)[1] + return type_ == domain_info['type'] + + return False + + def is_match(self, system_info, source, target): + ''' + Check if given (source, target) matches this policy line. + + :param system_info: information about the system - available VMs, + their types, labels, tags etc. as returned by + :py:func:`app_to_system_info` + :param source: name of the source VM + :param target: name of the target VM, or None if not specified + :return: True or False + ''' + + if not self.is_match_single(system_info, self.source, source): + return False + if not self.is_match_single(system_info, self.target, target): + return False + return True + + def expand_target(self, system_info): + ''' + Return domains matching target of this policy line + + :param system_info: information about the system + :return: matching domains + ''' + + if self.target.startswith('$tag:'): + tag = self.target.split(':', 1)[1] + for name, domain in system_info['domains'].items(): + if tag in domain['tags']: + yield name + elif self.target.startswith('$type:'): + type_ = self.target.split(':', 1)[1] + for name, domain in system_info['domains'].items(): + if type_ == domain['type']: + yield name + elif self.target == '$anyvm': + for name, domain in system_info['domains'].items(): + if name != 'dom0': + yield name + if domain['dispvm_allowed']: + yield '$dispvm:' + name + yield '$dispvm' + elif self.target.startswith('$dispvm:'): + dispvm_base = self.target.split(':', 1)[1] + try: + if system_info['domains'][dispvm_base]['dispvm_allowed']: + yield self.target + except KeyError: + # TODO log a warning? + pass + elif self.target == '$dispvm': + yield self.target + else: + if self.target in system_info['domains']: + yield self.target + + def expand_override_target(self, system_info, source): + ''' + Replace '$dispvm' with specific '$dispvm:...' value, based on qrexec + call source. + + :param system_info: System information + :param source: Source domain name + :return: :py:attr:`override_target` with '$dispvm' substituted + ''' + if self.override_target == '$dispvm': + if system_info['domains'][source]['default_dispvm'] is None: + return None + return '$dispvm:' + system_info['domains'][source]['default_dispvm'] + else: + return self.override_target + + +class PolicyAction(object): + ''' Object representing positive policy evaluation result - + either ask or allow action ''' + def __init__(self, service, source, target, rule, original_target, + targets_for_ask=None): + #: service name + self.service = service + #: calling domain + self.source = source + #: target domain the service should be connected to, None if + # not chosen yet + if targets_for_ask is None or target in targets_for_ask: + self.target = target + else: + # TODO: log a warning? + self.target = None + #: original target specified by the caller + self.original_target = original_target + #: targets for the user to choose from + self.targets_for_ask = targets_for_ask + #: policy rule from which this action is derived + self.rule = rule + if rule.action == Action.deny: + # this should be really rejected by Policy.eval() + raise AccessDenied( + 'denied by policy {}:{}'.format(rule.filename, rule.lineno)) + elif rule.action == Action.ask: + assert targets_for_ask is not None + elif rule.action == Action.allow: + assert targets_for_ask is None + assert target is not None + self.action = rule.action + + def handle_user_response(self, response, target=None): + ''' + Handle user response for the 'ask' action + + :param response: whether the call was allowed or denied (bool) + :param target: target chosen by the user (if reponse==True) + :return: None + ''' + assert self.action == Action.ask + assert self.target is None + if response: + assert target in self.targets_for_ask + self.target = target + self.action = Action.allow + else: + self.action = Action.deny + raise AccessDenied( + 'denied by the user {}:{}'.format(self.rule.filename, + self.rule.lineno)) + + def execute(self, caller_ident): + ''' Execute allowed service call + + :param caller_ident: Service caller ident (`process_ident,source_name, + source_id`) + ''' + assert self.action == Action.allow + assert self.target is not None + + if self.target == 'dom0': + cmd = '{multiplexer} {service} {source} {original_target}'.format( + multiplexer=QUBES_RPC_MULTIPLEXER_PATH, + service=self.service, + source=self.source, + original_target=self.original_target) + else: + cmd = '{user}:QUBESRPC {service} {source}'.format( + user=(self.rule.override_user or 'DEFAULT'), + service=self.service, + source=self.source) + if self.target.startswith('$dispvm:'): + target = self.spawn_dispvm() + dispvm = True + else: + target = self.target + dispvm = False + self.ensure_target_running() + qrexec_opts = ['-d', target, '-c', caller_ident] + if dispvm: + qrexec_opts.append('-W') + try: + subprocess.call([QREXEC_CLIENT] + qrexec_opts + [cmd]) + finally: + if dispvm: + self.cleanup_dispvm(target) + + + def spawn_dispvm(self): + ''' + Create and start Disposable VM based on AppVM specified in + :py:attr:`target` + :return: name of new Disposable VM + ''' + base_appvm = self.target.split(':', 1)[1] + dispvm_name = qubesd_call(base_appvm, 'mgmtinternal.vm.Create.DispVM') + dispvm_name = dispvm_name.decode('ascii') + qubesd_call(dispvm_name, 'mgmtinternal.vm.Start') + return dispvm_name + + def ensure_target_running(self): + ''' + Start domain if not running already + + :return: None + ''' + try: + qubesd_call(self.target, 'mgmtinternal.vm.Start') + except QubesMgmtException as e: + if e.exc_type == 'QubesVMNotHaltedError': + pass + else: + raise + + @staticmethod + def cleanup_dispvm(dispvm): + ''' + Kill and remove Disposable VM + + :param dispvm: name of Disposable VM + :return: None + ''' + qubesd_call(dispvm, 'mgmtinternal.vm.CleanupDispVM') + + +class Policy(object): + ''' Full policy for a given service + + Usage: + >>> system_info = get_system_info() + >>> policy = Policy('some-service') + >>> action = policy.evaluate(system_info, 'source-name', 'target-name') + >>> if action.action == Action.ask: + (... ask the user, see action.targets_for_ask ...) + >>> action.handle_user_response(response, target_chosen_by_user) + >>> action.execute('process-ident') + + ''' + + def __init__(self, service): + policy_file = os.path.join(POLICY_DIR, service) + if not os.path.exists(policy_file): + # fallback to policy without specific argument set (if any) + policy_file = os.path.join(POLICY_DIR, service.split('+')[0]) + + #: service name + self.service = service + + #: list of PolicyLine objects + self.policy_rules = [] + try: + self.load_policy_file(policy_file) + except OSError as e: + raise AccessDenied( + 'failed to load {} file: {!s}'.format(e.filename, e)) + + def load_policy_file(self, path): + ''' Load policy file and append rules to :py:attr:`policy_rules` + + :param path: file to load + ''' + with open(path) as policy_file: + for lineno, line in zip(itertools.count(start=1), + policy_file.readlines()): + line = line.strip() + if not line: + # skip empty lines + continue + if line[0] == '#': + # skip comments + continue + if line.startswith('$include:'): + include_path = line.split(':', 1)[1] + # os.path.join will leave include_path unchanged if it's + # already absolute + include_path = os.path.join(POLICY_DIR, include_path) + self.load_policy_file(include_path) + else: + self.policy_rules.append(PolicyRule(line, path, lineno)) + + def find_matching_rule(self, system_info, source, target): + ''' Find the first rule matching given arguments ''' + + for rule in self.policy_rules: + if rule.is_match(system_info, source, target): + return rule + raise AccessDenied('no matching rule found') + + + def collect_targets_for_ask(self, system_info, source): + ''' Collect targets the user can choose from in 'ask' action + + Word 'targets' is used intentionally instead of 'domains', because it + can also contains $dispvm like keywords. + ''' + targets = set() + + # iterate over rules in reversed order to easier handle 'deny' + # actions - simply remove matching domains from allowed set + for rule in reversed(self.policy_rules): + if rule.is_match_single(system_info, rule.source, source): + if rule.action == Action.deny: + targets -= set(rule.expand_target(system_info)) + else: + if rule.override_target is not None: + override_target = rule.expand_override_target( + system_info, source) + if verify_target_value(system_info, override_target): + targets.add(rule.override_target) + else: + targets.update(rule.expand_target(system_info)) + + # expand default DispVM + if '$dispvm' in targets: + targets.remove('$dispvm') + if system_info['domains'][source]['default_dispvm'] is not None: + dispvm = '$dispvm:' + \ + system_info['domains'][source]['default_dispvm'] + if verify_target_value(system_info, dispvm): + targets.add(dispvm) + + return targets + + def evaluate(self, system_info, source, target): + ''' Evaluate policy + + :raise AccessDenied: when action should be denied unconditionally + + :return tuple(rule, considered_targets) - where considered targets is a + list of possible targets for 'ask' action (rule.action == Action.ask) + ''' + rule = self.find_matching_rule(system_info, source, target) + if rule.action == Action.deny: + raise AccessDenied( + 'denied by policy {}:{}'.format(rule.filename, rule.lineno)) + + if rule.override_target is not None: + override_target = rule.expand_override_target(system_info, source) + if not verify_target_value(system_info, override_target): + raise AccessDenied('invalid target= value in {}:{}'.format( + rule.filename, rule.lineno)) + actual_target = override_target + else: + actual_target = target + + if rule.action == Action.ask: + if rule.override_target is not None: + targets = [actual_target] + else: + targets = list( + self.collect_targets_for_ask(system_info, source)) + if not targets: + raise AccessDenied( + 'policy define \'ask\' action at {}:{} but no target is ' + 'available to choose from'.format( + rule.filename, rule.lineno)) + return PolicyAction(self.service, source, rule.default_target, + rule, target, targets) + elif rule.action == Action.allow: + if actual_target == '$default': + raise AccessDenied( + 'policy define \'allow\' action at {}:{} but no target is ' + 'specified by caller or policy'.format( + rule.filename, rule.lineno)) + return PolicyAction(self.service, source, + actual_target, rule, target) + else: + # should be unreachable + raise AccessDenied( + 'invalid action?! {}:{}'.format(rule.filename, rule.lineno)) + + +class QubesMgmtException(Exception): + ''' Exception returned by qubesd ''' + def __init__(self, exc_type): + super(QubesMgmtException, self).__init__() + self.exc_type = exc_type + + +def qubesd_call(dest, method, arg=None, payload=None): + try: + client_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + client_socket.connect(QUBESD_INTERNAL_SOCK) + except IOError: + # TODO: + raise + + # src, method, dest, arg + for call_arg in ('dom0', method, dest, arg): + if call_arg is not None: + client_socket.sendall(call_arg.encode('ascii')) + client_socket.sendall(b'\0') + if payload is not None: + client_socket.sendall(payload) + + client_socket.shutdown(socket.SHUT_WR) + + return_data = client_socket.makefile('rb').read() + if return_data.startswith(b'0\x00'): + return return_data[2:] + elif return_data.startswith(b'2\x00'): + (_, exc_type, _traceback, _format_string, _args) = \ + return_data.split(b'\x00', 4) + raise QubesMgmtException(exc_type.decode('ascii')) + else: + raise AssertionError( + 'invalid qubesd response: {!r}'.format(return_data)) + + +def get_system_info(): + ''' Get system information + + This retrieve information necessary to process qrexec policy. Returned + data is nested dict structure with this structure: + + - domains: + - : + - tags: list of tags + - type: domain type + - dispvm_allowed: should DispVM based on this VM be allowed + - default_dispvm: name of default AppVM for DispVMs started from here + + ''' + + system_info = qubesd_call('dom0', 'mgmtinternal.GetSystemInfo') + return json.loads(system_info.decode('utf-8')) diff --git a/qubespolicy/agent.py b/qubespolicy/agent.py new file mode 100644 index 00000000..e06c4681 --- /dev/null +++ b/qubespolicy/agent.py @@ -0,0 +1,76 @@ +# -*- encoding: utf8 -*- +# +# The Qubes OS Project, http://www.qubes-os.org +# +# Copyright (C) 2017 Marek Marczykowski-Górecki +# +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, see . + + +''' Agent running in user session, responsible for asking the user about policy +decisions.''' + +import pydbus +# pylint: disable=import-error,wrong-import-position +import gi +gi.require_version('Gtk', '3.0') +from gi.repository import GLib +# pylint: enable=import-error + +import qubespolicy.rpcconfirmation +# pylint: enable=wrong-import-position + +class PolicyAgent(object): + # pylint: disable=too-few-public-methods + dbus = """ + + + + + + + + + + + + + """ + + @staticmethod + def Ask(source, service_name, targets, default_target, + icons): + # pylint: disable=invalid-name + entries_info = {} + for target in targets: + entries_info[target] = {} + entries_info[target]['icon'] = icons.get(target, None) + + response = qubespolicy.rpcconfirmation.confirm_rpc( + entries_info, source, service_name, + targets, default_target or None) + return response or '' + + +def main(): + loop = GLib.MainLoop() + bus = pydbus.SystemBus() + obj = PolicyAgent() + bus.publish('org.qubesos.PolicyAgent', obj) + loop.run() + + +if __name__ == '__main__': + main() diff --git a/qubespolicy/cli.py b/qubespolicy/cli.py new file mode 100644 index 00000000..4a3c9ef4 --- /dev/null +++ b/qubespolicy/cli.py @@ -0,0 +1,104 @@ +# -*- encoding: utf8 -*- +# +# The Qubes OS Project, http://www.qubes-os.org +# +# Copyright (C) 2017 Marek Marczykowski-Górecki +# +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, see . +import argparse +import logging +import logging.handlers + +import sys + +import qubespolicy + +parser = argparse.ArgumentParser(description="Evaluate qrexec policy") + +parser.add_argument("--assume-yes-for-ask", action="store_true", + dest="assume_yes_for_ask", default=False, + help="Allow run of service without confirmation if policy say 'ask'") +parser.add_argument("--just-evaluate", action="store_true", + dest="just_evaluate", default=False, + help="Do not run the service, only evaluate policy; " + "retcode=0 means 'allow'") +parser.add_argument('domain_id', metavar='src-domain-id', + help='Source domain ID (Xen ID or similar, not Qubes ID)') +parser.add_argument('domain', metavar='src-domain-name', + help='Source domain name') +parser.add_argument('target', metavar='dst-domain-name', + help='Target domain name') +parser.add_argument('service_name', metavar='service-name', + help='Service name') +parser.add_argument('process_ident', metavar='process-ident', + help='Qrexec process identifier - for connecting data channel') + + +def main(args=None): + args = parser.parse_args(args) + + # Add source domain information, required by qrexec-client for establishing + # connection + caller_ident = args.process_ident + "," + args.domain + "," + args.domain_id + log = logging.getLogger('qubespolicy') + log.setLevel(logging.INFO) + handler = logging.handlers.SysLogHandler(address='/dev/log') + log.addHandler(handler) + log_prefix = 'qrexec: {}: {} -> {}: '.format( + args.service_name, args.domain, args.target) + try: + system_info = qubespolicy.get_system_info() + except qubespolicy.QubesMgmtException as e: + log.error(log_prefix + 'error getting system info: ' + str(e)) + return 1 + try: + policy = qubespolicy.Policy(args.service_name) + action = policy.evaluate(system_info, args.domain, args.target) + if action.action == qubespolicy.Action.ask: + # late import to save on time for allow/deny actions + import pydbus + bus = pydbus.SystemBus() + proxy = bus.get('org.qubesos.PolicyAgent', + '/org/qubesos/PolicyAgent') + + icons = {name: system_info['domains'][name]['icon'] + for name in system_info['domains'].keys()} + for dispvm_base in system_info['domains']: + if not system_info['domains'][dispvm_base]['dispvm_allowed']: + continue + dispvm_api_name = '$dispvm:' + dispvm_base + icons[dispvm_api_name] = \ + system_info['domains'][dispvm_base]['icon'] + icons[dispvm_api_name] = \ + icons[dispvm_api_name].replace('app', 'disp') + + response = proxy.Ask(args.domain, args.service_name, + action.targets_for_ask, action.target or '', icons) + if response: + action.handle_user_response(True, response) + else: + action.handle_user_response(False) + log.info(log_prefix + 'allowed') + action.execute(caller_ident) + except qubespolicy.PolicySyntaxError as e: + log.error(log_prefix + 'error loading policy: ' + str(e)) + return 1 + except qubespolicy.AccessDenied as e: + log.info(log_prefix + 'denied: ' + str(e)) + return 1 + return 0 + +if __name__ == '__main__': + sys.exit(main()) diff --git a/qubespolicy/glade/RPCConfirmationWindow.glade b/qubespolicy/glade/RPCConfirmationWindow.glade new file mode 100644 index 00000000..0c0ccba5 --- /dev/null +++ b/qubespolicy/glade/RPCConfirmationWindow.glade @@ -0,0 +1,359 @@ + + + + + + 400 + False + Operation execution + center + dialog-question + dialog + True + center + + + True + False + vertical + + + True + False + True + error + True + + + False + 6 + end + + + + + + False + False + 0 + + + + + False + 16 + + + True + False + gtk-dialog-error + + + False + True + 0 + + + + + True + False + ErrorMessage + + + False + True + 1 + + + + + False + False + 0 + + + + + False + True + 0 + + + + + 100 + 80 + True + False + 12 + 12 + 12 + 12 + True + vertical + 6 + + + True + False + 6 + end + + + gtk-cancel + True + True + True + True + + + True + True + 0 + + + + + gtk-ok + True + False + True + True + True + True + + + True + True + 1 + + + + + False + True + end + 1 + + + + + True + False + vertical + 6 + + + True + False + 6 + 12 + + + True + False + gtk-dialog-question + 6 + + + False + True + 0 + + + + + True + False + start + Do you want to allow the following operation? +<small>Select the target domain and confirm with 'OK'</small> + True + + + True + True + 1 + + + + + False + True + 0 + + + + + True + False + 12 + 6 + 12 + 6 + + + True + False + 1 + Target: + + + 0 + 2 + + + + + True + False + True + True + + + True + 5 + False + Start typing or use the arrow + GTK_INPUT_HINT_WORD_COMPLETION | GTK_INPUT_HINT_NONE + + + + + 1 + 2 + + + + + True + False + 1 + Source: + + + 0 + 0 + + + + + True + False + False + source + False + + + 1 + 0 + + + + + True + False + 1 + Operation: + + + 0 + 1 + + + + + True + False + 0 + qubes.<b>MyOperation</b> + True + + + 1 + 1 + + + + + False + True + 1 + + + + + True + True + + + True + False + 6 + vertical + 6 + + + Display templates in the target list + True + True + False + 0 + True + + + False + True + 0 + + + + + Choose a custom destination in the target + True + True + False + 0 + True + + + False + True + 1 + + + + + + + False + Advanced options + True + + + + + False + True + 2 + + + + + True + True + 2 + + + + + False + True + 1 + + + + + + diff --git a/qubespolicy/gtkhelpers.py b/qubespolicy/gtkhelpers.py new file mode 100644 index 00000000..1c546ba2 --- /dev/null +++ b/qubespolicy/gtkhelpers.py @@ -0,0 +1,273 @@ +#!/usr/bin/python +# +# The Qubes OS Project, https://www.qubes-os.org/ +# +# Copyright (C) 2017 boring-stuff +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +# + +import itertools +# pylint: disable=import-error,wrong-import-position +import gi +gi.require_version('Gtk', '3.0') +from gi.repository import Gtk, Gdk, GdkPixbuf, GObject, GLib +# pylint: enable=import-error + +from qubespolicy.utils import sanitize_domain_name +# pylint: enable=wrong-import-position + +class VMListModeler: + def __init__(self, domains_info=None): + self._entries = {} + self._domains_info = domains_info + self._icons = {} + self._icon_size = 16 + self._theme = Gtk.IconTheme.get_default() + self._create_entries() + + def _get_icon(self, name): + if name not in self._icons: + try: + icon = self._theme.load_icon(name, self._icon_size, 0) + except GLib.Error: # pylint: disable=catching-non-exception + icon = self._theme.load_icon("edit-find", self._icon_size, 0) + + self._icons[name] = icon + + return self._icons[name] + + def _create_entries(self): + for name, vm in self._domains_info.items(): + if name.startswith('$dispvm:'): + vm_name = name[len('$dispvm:'):] + dispvm = True + else: + vm_name = name + dispvm = False + sanitize_domain_name(vm_name, assert_sanitized=True) + + icon = self._get_icon(vm.get('icon', None)) + + if dispvm: + display_name = 'Disposable VM ({})'.format(vm_name) + else: + display_name = vm_name + self._entries[display_name] = { + 'api_name': name, + 'icon': icon, + 'vm': vm} + + def _get_valid_qube_name(self, combo, entry_box, whitelist): + name = None + + if combo and combo.get_active_id(): + selected = combo.get_active_id() + + if selected in self._entries and \ + self._entries[selected]['api_name'] in whitelist: + name = selected + + if not name and entry_box: + typed = entry_box.get_text() + + if typed in self._entries and \ + self._entries[typed]['api_name'] in whitelist: + name = typed + + return name + + def _combo_change(self, selection_trigger, combo, entry_box, whitelist): + data = None + name = self._get_valid_qube_name(combo, entry_box, whitelist) + + if name: + entry = self._entries[name] + + data = entry['api_name'] + + if entry_box: + entry_box.set_icon_from_pixbuf( + Gtk.EntryIconPosition.PRIMARY, entry['icon']) + else: + if entry_box: + entry_box.set_icon_from_stock( + Gtk.EntryIconPosition.PRIMARY, "gtk-find") + + if selection_trigger: + selection_trigger(data) + + def _entry_activate(self, activation_trigger, combo, entry_box, whitelist): + name = self._get_valid_qube_name(combo, entry_box, whitelist) + + if name: + activation_trigger(entry_box) + + def apply_model(self, destination_object, vm_list, + selection_trigger=None, activation_trigger=None): + if isinstance(destination_object, Gtk.ComboBox): + list_store = Gtk.ListStore(int, str, GdkPixbuf.Pixbuf) + + for entry_no, display_name in zip(itertools.count(), + sorted(self._entries)): + entry = self._entries[display_name] + if entry['api_name'] in vm_list: + list_store.append([entry_no, display_name, entry['icon']]) + + destination_object.set_model(list_store) + destination_object.set_id_column(1) + + icon_column = Gtk.CellRendererPixbuf() + destination_object.pack_start(icon_column, False) + destination_object.add_attribute(icon_column, "pixbuf", 2) + destination_object.set_entry_text_column(1) + + if destination_object.get_has_entry(): + entry_box = destination_object.get_child() + + area = Gtk.CellAreaBox() + area.pack_start(icon_column, False, False, False) + area.add_attribute(icon_column, "pixbuf", 2) + + completion = Gtk.EntryCompletion.new_with_area(area) + completion.set_inline_selection(True) + completion.set_inline_completion(True) + completion.set_popup_completion(True) + completion.set_popup_single_match(False) + completion.set_model(list_store) + completion.set_text_column(1) + + entry_box.set_completion(completion) + if activation_trigger: + entry_box.connect("activate", + lambda entry: self._entry_activate( + activation_trigger, + destination_object, + entry, + vm_list)) + + # A Combo with an entry has a text column already + text_column = destination_object.get_cells()[0] + destination_object.reorder(text_column, 1) + else: + entry_box = None + + text_column = Gtk.CellRendererText() + destination_object.pack_start(text_column, False) + destination_object.add_attribute(text_column, "text", 1) + + changed_function = lambda combo: self._combo_change( + selection_trigger, + combo, + entry_box, + vm_list) + + destination_object.connect("changed", changed_function) + changed_function(destination_object) + + else: + raise TypeError( + "Only expecting Gtk.ComboBox objects to want our model.") + + def apply_icon(self, entry, qube_name): + if isinstance(entry, Gtk.Entry): + if qube_name in self._entries: + entry.set_icon_from_pixbuf( + Gtk.EntryIconPosition.PRIMARY, + self._entries[qube_name]['icon']) + else: + raise ValueError("The specified source qube does not exist!") + else: + raise TypeError( + "Only expecting Gtk.Entry objects to want our icon.") + + +class GtkOneTimerHelper: + # pylint: disable=too-few-public-methods + def __init__(self, wait_seconds): + self._wait_seconds = wait_seconds + self._current_timer_id = 0 + self._timer_completed = False + + def _invalidate_timer_completed(self): + self._timer_completed = False + + def _invalidate_current_timer(self): + self._current_timer_id += 1 + + def _timer_check_run(self, timer_id): + if self._current_timer_id == timer_id: + self._timer_run(timer_id) + self._timer_completed = True + else: + pass + + def _timer_run(self, timer_id): + raise NotImplementedError("Not yet implemented") + + def _timer_schedule(self): + self._invalidate_current_timer() + GObject.timeout_add(int(round(self._wait_seconds * 1000)), + self._timer_check_run, + self._current_timer_id) + + def _timer_has_completed(self): + return self._timer_completed + + +class FocusStealingHelper(GtkOneTimerHelper): + def __init__(self, window, target_button, wait_seconds=1): + GtkOneTimerHelper.__init__(self, wait_seconds) + self._window = window + self._target_button = target_button + + self._window.connect("window-state-event", self._window_state_event) + + self._target_sensitivity = False + self._target_button.set_sensitive(self._target_sensitivity) + + def _window_changed_focus(self, window_is_focused): + self._target_button.set_sensitive(False) + self._invalidate_timer_completed() + + if window_is_focused: + self._timer_schedule() + else: + self._invalidate_current_timer() + + def _window_state_event(self, window, event): + assert window == self._window, \ + 'Window state callback called with wrong window' + + changed_focus = event.changed_mask & Gdk.WindowState.FOCUSED + window_focus = event.new_window_state & Gdk.WindowState.FOCUSED + + if changed_focus: + self._window_changed_focus(window_focus != 0) + + # Propagate event further + return False + + def _timer_run(self, timer_id): + self._target_button.set_sensitive(self._target_sensitivity) + + def request_sensitivity(self, sensitivity): + if self._timer_has_completed() or not sensitivity: + self._target_button.set_sensitive(sensitivity) + + self._target_sensitivity = sensitivity + + def can_perform_action(self): + return self._timer_has_completed() diff --git a/qubespolicy/rpcconfirmation.py b/qubespolicy/rpcconfirmation.py new file mode 100644 index 00000000..ee14204c --- /dev/null +++ b/qubespolicy/rpcconfirmation.py @@ -0,0 +1,213 @@ +#!/usr/bin/python +# +# The Qubes OS Project, https://www.qubes-os.org/ +# +# Copyright (C) 2017 boring-stuff +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +# +import os +from gi.repository import Gtk, Gdk, GLib # pylint: disable=import-error +import pkg_resources + +from qubespolicy.gtkhelpers import VMListModeler, FocusStealingHelper +from qubespolicy.utils import sanitize_domain_name, \ + sanitize_service_name + + +class RPCConfirmationWindow: + # pylint: disable=too-few-public-methods + _source_file = pkg_resources.resource_filename('qubespolicy', + os.path.join('glade', "RPCConfirmationWindow.glade")) + _source_id = {'window': "RPCConfirmationWindow", + 'ok': "okButton", + 'cancel': "cancelButton", + 'source': "sourceEntry", + 'rpc_label': "rpcLabel", + 'target': "TargetCombo", + 'error_bar': "ErrorBar", + 'error_message': "ErrorMessage", + } + + def _clicked_ok(self, source): + assert source is not None, \ + 'Called the clicked ok callback from no source object' + + if self._can_perform_action(): + self._confirmed = True + self._close() + + def _clicked_cancel(self, button): + assert button == self._rpc_cancel_button, \ + 'Called the clicked cancel callback through the wrong button' + + if self._can_perform_action(): + self._confirmed = False + self._close() + + def _key_pressed(self, window, key): + assert window == self._rpc_window, \ + 'Key pressed callback called with wrong window' + + if self._can_perform_action(): + if key.keyval == Gdk.KEY_Escape: + self._confirmed = False + self._close() + + def _update_ok_button_sensitivity(self, data): + valid = (data is not None) + + if valid: + self._target_name = data + else: + self._target_name = None + + self._focus_helper.request_sensitivity(valid) + + def _show_error(self, error_message): + self._error_message.set_text(error_message) + self._error_bar.set_visible(True) + + def _close_error(self, error_bar, response): + assert error_bar == self._error_bar, \ + 'Closed the error bar with the wrong error bar as parameter' + assert response is not None, \ + 'Closed the error bar with None as a response' + + self._error_bar.set_visible(False) + + def _set_initial_target(self, source, target): + if target is not None: + if target == source: + self._show_error( + "Source and target domains must not be the same.") + else: + model = self._rpc_combo_box.get_model() + + found = False + for item in model: + if item[1] == target: + found = True + + self._rpc_combo_box.set_active_iter( + model.get_iter(item.path)) + + break + + if not found: + self._show_error("Domain '%s' doesn't exist." % target) + + def _can_perform_action(self): + return self._focus_helper.can_perform_action() + + @staticmethod + def _escape_and_format_rpc_text(rpc_operation): + escaped = GLib.markup_escape_text(rpc_operation) + + partitioned = escaped.partition('.') + formatted = partitioned[0] + partitioned[1] + + if partitioned[2]: + formatted += "" + partitioned[2] + "" + else: + formatted = "" + formatted + "" + + return formatted + + def _connect_events(self): + self._rpc_window.connect("key-press-event", self._key_pressed) + self._rpc_ok_button.connect("clicked", self._clicked_ok) + self._rpc_cancel_button.connect("clicked", self._clicked_cancel) + + self._error_bar.connect("response", self._close_error) + + def __init__(self, entries_info, source, rpc_operation, targets_list, + target=None): + sanitize_domain_name(source, assert_sanitized=True) + sanitize_service_name(source, assert_sanitized=True) + + self._gtk_builder = Gtk.Builder() + self._gtk_builder.add_from_file(self._source_file) + self._rpc_window = self._gtk_builder.get_object( + self._source_id['window']) + self._rpc_ok_button = self._gtk_builder.get_object( + self._source_id['ok']) + self._rpc_cancel_button = self._gtk_builder.get_object( + self._source_id['cancel']) + self._rpc_label = self._gtk_builder.get_object( + self._source_id['rpc_label']) + self._source_entry = self._gtk_builder.get_object( + self._source_id['source']) + self._rpc_combo_box = self._gtk_builder.get_object( + self._source_id['target']) + self._error_bar = self._gtk_builder.get_object( + self._source_id['error_bar']) + self._error_message = self._gtk_builder.get_object( + self._source_id['error_message']) + self._target_name = None + + self._focus_helper = self._new_focus_stealing_helper() + + self._rpc_label.set_markup( + self._escape_and_format_rpc_text(rpc_operation)) + + self._entries_info = entries_info + list_modeler = self._new_vm_list_modeler() + + list_modeler.apply_model(self._rpc_combo_box, targets_list, + selection_trigger=self._update_ok_button_sensitivity, + activation_trigger=self._clicked_ok) + + self._source_entry.set_text(source) + list_modeler.apply_icon(self._source_entry, source) + + self._confirmed = None + + self._set_initial_target(source, target) + + self._connect_events() + + def _close(self): + self._rpc_window.close() + + def _show(self): + self._rpc_window.set_keep_above(True) + self._rpc_window.connect("delete-event", Gtk.main_quit) + self._rpc_window.show_all() + + Gtk.main() + + def _new_vm_list_modeler(self): + return VMListModeler(self._entries_info) + + def _new_focus_stealing_helper(self): + return FocusStealingHelper( + self._rpc_window, + self._rpc_ok_button, + 1) + + def confirm_rpc(self): + self._show() + + if self._confirmed: + return self._target_name + return False + + +def confirm_rpc(entries_info, source, rpc_operation, targets_list, target=None): + window = RPCConfirmationWindow(entries_info, source, rpc_operation, + targets_list, target) + + return window.confirm_rpc() diff --git a/qubespolicy/tests/__init__.py b/qubespolicy/tests/__init__.py new file mode 100644 index 00000000..9cca2726 --- /dev/null +++ b/qubespolicy/tests/__init__.py @@ -0,0 +1,765 @@ +# -*- encoding: utf8 -*- +# +# The Qubes OS Project, http://www.qubes-os.org +# +# Copyright (C) 2017 Marek Marczykowski-Górecki +# +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, see . +import os +import socket +import unittest.mock + +import shutil + +import qubes.tests +import qubespolicy + +tmp_policy_dir = '/tmp/policy' + +system_info = { + 'domains': { + 'dom0': { + 'tags': [], + 'type': 'AdminVM', + 'default_dispvm': 'default-dvm', + 'dispvm_allowed': False, + }, + 'test-vm1': { + 'tags': ['tag1', 'tag2'], + 'type': 'AppVM', + 'default_dispvm': 'default-dvm', + 'dispvm_allowed': False, + }, + 'test-vm2': { + 'tags': ['tag2'], + 'type': 'AppVM', + 'default_dispvm': 'default-dvm', + 'dispvm_allowed': False, + }, + 'test-vm3': { + 'tags': [], + 'type': 'AppVM', + 'default_dispvm': 'default-dvm', + 'dispvm_allowed': True, + }, + 'default-dvm': { + 'tags': [], + 'type': 'AppVM', + 'default_dispvm': 'default-dvm', + 'dispvm_allowed': True, + }, + 'test-invalid-dvm': { + 'tags': ['tag1', 'tag2'], + 'type': 'AppVM', + 'default_dispvm': 'test-vm1', + 'dispvm_allowed': False, + }, + 'test-no-dvm': { + 'tags': ['tag1', 'tag2'], + 'type': 'AppVM', + 'default_dispvm': None, + 'dispvm_allowed': False, + }, + 'test-template': { + 'tags': ['tag1', 'tag2'], + 'type': 'TemplateVM', + 'default_dispvm': 'default-dvm', + 'dispvm_allowed': False, + }, + 'test-standalone': { + 'tags': ['tag1', 'tag2'], + 'type': 'StandaloneVM', + 'default_dispvm': 'default-dvm', + 'dispvm_allowed': False, + }, + } +} + + +class TC_00_PolicyRule(qubes.tests.QubesTestCase): + def test_000_verify_target_value(self): + self.assertTrue( + qubespolicy.verify_target_value(system_info, 'test-vm1')) + self.assertTrue( + qubespolicy.verify_target_value(system_info, 'default-dvm')) + self.assertTrue( + qubespolicy.verify_target_value(system_info, '$dispvm')) + self.assertTrue( + qubespolicy.verify_target_value(system_info, '$dispvm:default-dvm')) + self.assertTrue( + qubespolicy.verify_target_value(system_info, 'test-template')) + self.assertTrue( + qubespolicy.verify_target_value(system_info, 'test-standalone')) + self.assertFalse( + qubespolicy.verify_target_value(system_info, 'no-such-vm')) + self.assertFalse( + qubespolicy.verify_target_value(system_info, + '$dispvm:test-invalid-dvm')) + self.assertFalse( + qubespolicy.verify_target_value(system_info, '$dispvm:test-vm1')) + self.assertFalse( + qubespolicy.verify_target_value(system_info, '')) + self.assertFalse( + qubespolicy.verify_target_value(system_info, '$default')) + self.assertFalse( + qubespolicy.verify_target_value(system_info, '$anyvm')) + self.assertFalse( + qubespolicy.verify_target_value(system_info, '$tag:tag1')) + self.assertFalse( + qubespolicy.verify_target_value(system_info, '$invalid')) + + def test_010_verify_special_value(self): + self.assertTrue(qubespolicy.verify_special_value('$tag:tag', + for_target=False)) + self.assertTrue(qubespolicy.verify_special_value('$tag:other-tag', + for_target=False)) + self.assertTrue(qubespolicy.verify_special_value('$type:AppVM', + for_target=False)) + self.assertFalse(qubespolicy.verify_special_value('$default', + for_target=False)) + self.assertFalse(qubespolicy.verify_special_value('$dispvm', + for_target=False)) + self.assertFalse(qubespolicy.verify_special_value('$dispvm:some-vm', + for_target=False)) + self.assertFalse(qubespolicy.verify_special_value('$invalid', + for_target=False)) + self.assertFalse(qubespolicy.verify_special_value('vm-name', + for_target=False)) + self.assertFalse(qubespolicy.verify_special_value('$tag:', + for_target=False)) + self.assertFalse(qubespolicy.verify_special_value('$type:', + for_target=False)) + + def test_020_line_simple(self): + line = qubespolicy.PolicyRule('$anyvm $anyvm ask', 'filename', 12) + self.assertEqual(line.filename, 'filename') + self.assertEqual(line.lineno, 12) + self.assertEqual(line.action, qubespolicy.Action.ask) + self.assertEqual(line.source, '$anyvm') + self.assertEqual(line.target, '$anyvm') + self.assertEqual(line.full_action, 'ask') + self.assertIsNone(line.override_target) + self.assertIsNone(line.override_user) + self.assertIsNone(line.default_target) + + def test_021_line_simple(self): + line = qubespolicy.PolicyRule( + '$tag:tag1 $type:AppVM ask,target=test-vm2,user=user', + 'filename', 12) + self.assertEqual(line.filename, 'filename') + self.assertEqual(line.lineno, 12) + self.assertEqual(line.action, qubespolicy.Action.ask) + self.assertEqual(line.source, '$tag:tag1') + self.assertEqual(line.target, '$type:AppVM') + self.assertEqual(line.full_action, 'ask,target=test-vm2,user=user') + self.assertEqual(line.override_target, 'test-vm2') + self.assertEqual(line.override_user, 'user') + self.assertIsNone(line.default_target) + + def test_022_line_simple(self): + line = qubespolicy.PolicyRule( + '$anyvm $default allow,target=$dispvm:test-vm2', + 'filename', 12) + self.assertEqual(line.filename, 'filename') + self.assertEqual(line.lineno, 12) + self.assertEqual(line.action, qubespolicy.Action.allow) + self.assertEqual(line.source, '$anyvm') + self.assertEqual(line.target, '$default') + self.assertEqual(line.full_action, 'allow,target=$dispvm:test-vm2') + self.assertEqual(line.override_target, '$dispvm:test-vm2') + self.assertIsNone(line.override_user) + self.assertIsNone(line.default_target) + + def test_023_line_simple(self): + line = qubespolicy.PolicyRule( + '$anyvm $default ask,default_target=test-vm1', + 'filename', 12) + self.assertEqual(line.filename, 'filename') + self.assertEqual(line.lineno, 12) + self.assertEqual(line.action, qubespolicy.Action.ask) + self.assertEqual(line.source, '$anyvm') + self.assertEqual(line.target, '$default') + self.assertEqual(line.full_action, 'ask,default_target=test-vm1') + self.assertIsNone(line.override_target) + self.assertIsNone(line.override_user) + self.assertEqual(line.default_target, 'test-vm1') + + def test_030_line_invalid(self): + invalid_lines = [ + '$dispvm $default allow', # $dispvm can't be a source + '$default $default allow', # $default can't be a source + '$anyvm $default deny,target=test-vm1', # target= used with deny + '$anyvm $anyvm deny,default_target=test-vm1', # default_target= + # with deny + '$anyvm $anyvm deny,user=user', # user= with deny + '$anyvm $anyvm invalid', # invalid action + '$anyvm $anyvm allow,invalid=xx', # invalid option + '$anyvm $anyvm', # missing action + '$anyvm $anyvm allow,default_target=test-vm1', # default_target= + # with allow + '$invalid $anyvm allow', # invalid source + '$anyvm $invalid deny', # invalid target + '', # empty line + '$anyvm $anyvm allow extra', # trailing words + '$anyvm $default allow', # $default allow without target= + ] + for line in invalid_lines: + with self.subTest(line): + with self.assertRaises(qubespolicy.PolicySyntaxError): + qubespolicy.PolicyRule(line, 'filename', 12) + + def test_040_match_single(self): + is_match_single = qubespolicy.PolicyRule.is_match_single + self.assertTrue(is_match_single(system_info, '$anyvm', 'test-vm1')) + self.assertTrue(is_match_single(system_info, '$anyvm', '$default')) + self.assertTrue(is_match_single(system_info, '$anyvm', '')) + self.assertTrue(is_match_single(system_info, '$default', '')) + self.assertTrue(is_match_single(system_info, '$default', '$default')) + self.assertTrue(is_match_single(system_info, '$tag:tag1', 'test-vm1')) + self.assertTrue(is_match_single(system_info, '$type:AppVM', 'test-vm1')) + self.assertTrue(is_match_single(system_info, + '$type:TemplateVM', 'test-template')) + self.assertTrue(is_match_single(system_info, '$anyvm', '$dispvm')) + self.assertTrue(is_match_single(system_info, + '$anyvm', '$dispvm:default-dvm')) + self.assertTrue(is_match_single(system_info, '$dispvm', '$dispvm')) + self.assertTrue(is_match_single(system_info, 'dom0', 'dom0')) + self.assertTrue(is_match_single(system_info, + '$dispvm:default-dvm', '$dispvm:default-dvm')) + self.assertTrue(is_match_single(system_info, '$anyvm', '$dispvm')) + self.assertTrue(is_match_single(system_info, '$anyvm', 'test-vm1')) + self.assertTrue(is_match_single(system_info, '$anyvm', 'test-vm1')) + self.assertTrue(is_match_single(system_info, '$anyvm', 'test-vm1')) + + self.assertFalse(is_match_single(system_info, '$default', 'test-vm1')) + self.assertFalse(is_match_single(system_info, '$tag:tag1', 'test-vm3')) + self.assertFalse(is_match_single(system_info, '$anyvm', 'no-such-vm')) + # test-vm1.dispvm_allowed=False + self.assertFalse(is_match_single(system_info, + '$anyvm', '$dispvm:test-vm1')) + # test-vm1.dispvm_allowed=False + self.assertFalse(is_match_single(system_info, + '$dispvm:test-vm1', '$dispvm:test-vm1')) + self.assertFalse(is_match_single(system_info, '$anyvm', 'dom0')) + self.assertFalse(is_match_single(system_info, '$tag:tag1', 'dom0')) + self.assertFalse(is_match_single(system_info, '$anyvm', '$tag:tag1')) + self.assertFalse(is_match_single(system_info, '$anyvm', '$type:AppVM')) + self.assertFalse(is_match_single(system_info, '$anyvm', '$invalid')) + self.assertFalse(is_match_single(system_info, '$invalid', '$invalid')) + self.assertFalse(is_match_single(system_info, '$anyvm', 'no-such-vm')) + self.assertFalse(is_match_single(system_info, + 'no-such-vm', 'no-such-vm')) + self.assertFalse(is_match_single(system_info, '$dispvm', 'test-vm1')) + self.assertFalse(is_match_single(system_info, '$dispvm', 'default-dvm')) + self.assertFalse(is_match_single(system_info, + '$dispvm:default-dvm', 'default-dvm')) + self.assertFalse(is_match_single(system_info, '$anyvm', 'test-vm1\n')) + self.assertFalse(is_match_single(system_info, '$anyvm', 'test-vm1 ')) + + def test_050_match(self): + line = qubespolicy.PolicyRule('$anyvm $anyvm allow') + self.assertTrue(line.is_match(system_info, 'test-vm1', 'test-vm2')) + line = qubespolicy.PolicyRule('$anyvm $anyvm allow') + self.assertFalse(line.is_match(system_info, 'no-such-vm', 'test-vm2')) + line = qubespolicy.PolicyRule('$anyvm $anyvm allow') + self.assertFalse(line.is_match(system_info, 'test-vm1', 'no-such-vm')) + + def test_060_expand_target(self): + lines = { + '$anyvm $anyvm allow': ['test-vm1', 'test-vm2', 'test-vm3', + '$dispvm:test-vm3', + 'default-dvm', '$dispvm:default-dvm', 'test-invalid-dvm', + 'test-no-dvm', 'test-template', 'test-standalone', '$dispvm'], + '$anyvm $dispvm allow': ['$dispvm'], + '$anyvm $dispvm:default-dvm allow': ['$dispvm:default-dvm'], + # no DispVM from test-vm1 allowed + '$anyvm $dispvm:test-vm1 allow': [], + '$anyvm test-vm1 allow': ['test-vm1'], + '$anyvm $type:AppVM allow': ['test-vm1', 'test-vm2', 'test-vm3', + 'default-dvm', 'test-invalid-dvm', 'test-no-dvm'], + '$anyvm $type:TemplateVM allow': ['test-template'], + '$anyvm $tag:tag1 allow': ['test-vm1', 'test-invalid-dvm', + 'test-template', 'test-standalone', 'test-no-dvm'], + '$anyvm $tag:tag2 allow': ['test-vm1', 'test-vm2', + 'test-invalid-dvm', 'test-template', 'test-standalone', + 'test-no-dvm'], + '$anyvm $tag:no-such-tag allow': [], + } + for line in lines: + with self.subTest(line): + policy_line = qubespolicy.PolicyRule(line) + self.assertCountEqual(list(policy_line.expand_target(system_info)), + lines[line]) + + def test_070_expand_override_target(self): + line = qubespolicy.PolicyRule( + '$anyvm $anyvm allow,target=test-vm2') + self.assertEqual( + line.expand_override_target(system_info, 'test-vm1'), + 'test-vm2') + + def test_071_expand_override_target_dispvm(self): + line = qubespolicy.PolicyRule( + '$anyvm $anyvm allow,target=$dispvm') + self.assertEqual( + line.expand_override_target(system_info, 'test-vm1'), + '$dispvm:default-dvm') + + def test_072_expand_override_target_dispvm_specific(self): + line = qubespolicy.PolicyRule( + '$anyvm $anyvm allow,target=$dispvm:test-vm3') + self.assertEqual( + line.expand_override_target(system_info, 'test-vm1'), + '$dispvm:test-vm3') + + def test_073_expand_override_target_dispvm_none(self): + line = qubespolicy.PolicyRule( + '$anyvm $anyvm allow,target=$dispvm') + self.assertEqual( + line.expand_override_target(system_info, 'test-no-dvm'), + None) + + def test_074_expand_override_target_dom0(self): + line = qubespolicy.PolicyRule( + '$anyvm $anyvm allow,target=dom0') + self.assertEqual( + line.expand_override_target(system_info, 'test-no-dvm'), + 'dom0') + + +class TC_10_PolicyAction(qubes.tests.QubesTestCase): + def test_000_init(self): + rule = qubespolicy.PolicyRule('$anyvm $anyvm deny') + with self.assertRaises(qubespolicy.AccessDenied): + qubespolicy.PolicyAction('test.service', 'test-vm1', 'test-vm2', + rule, 'test-vm2') + + def test_001_init(self): + rule = qubespolicy.PolicyRule('$anyvm $anyvm ask') + action = qubespolicy.PolicyAction('test.service', 'test-vm1', + None, rule, 'test-vm2', ['test-vm2', 'test-vm3']) + self.assertEqual(action.service, 'test.service') + self.assertEqual(action.source, 'test-vm1') + self.assertIsNone(action.target) + self.assertEqual(action.original_target, 'test-vm2') + self.assertEqual(action.targets_for_ask, ['test-vm2', 'test-vm3']) + self.assertEqual(action.rule, rule) + self.assertEqual(action.action, qubespolicy.Action.ask) + + def test_002_init_invalid(self): + rule_ask = qubespolicy.PolicyRule('$anyvm $anyvm ask') + rule_allow = qubespolicy.PolicyRule('$anyvm $anyvm allow') + with self.assertRaises(AssertionError): + qubespolicy.PolicyAction('test.service', 'test-vm1', + None, rule_allow, 'test-vm2', None) + with self.assertRaises(AssertionError): + qubespolicy.PolicyAction('test.service', 'test-vm1', + 'test-vm2', rule_allow, 'test-vm2', ['test-vm2', 'test-vm3']) + + with self.assertRaises(AssertionError): + qubespolicy.PolicyAction('test.service', 'test-vm1', + None, rule_ask, 'test-vm2', None) + + def test_003_init_default_target(self): + rule_ask = qubespolicy.PolicyRule('$anyvm $anyvm ask') + + action = qubespolicy.PolicyAction('test.service', 'test-vm1', + 'test-vm1', rule_ask, 'test-vm2', ['test-vm2']) + self.assertIsNone(action.target) + + action = qubespolicy.PolicyAction('test.service', 'test-vm1', + 'test-vm2', rule_ask, 'test-vm2', ['test-vm2']) + self.assertEqual(action.target, 'test-vm2') + + def test_010_handle_user_response(self): + rule = qubespolicy.PolicyRule('$anyvm $anyvm ask') + action = qubespolicy.PolicyAction('test.service', 'test-vm1', + None, rule, 'test-vm2', ['test-vm2', 'test-vm3']) + action.handle_user_response(True, 'test-vm2') + self.assertEqual(action.action, qubespolicy.Action.allow) + self.assertEqual(action.target, 'test-vm2') + + def test_011_handle_user_response(self): + rule = qubespolicy.PolicyRule('$anyvm $anyvm ask') + action = qubespolicy.PolicyAction('test.service', 'test-vm1', + None, rule, 'test-vm2', ['test-vm2', 'test-vm3']) + with self.assertRaises(AssertionError): + action.handle_user_response(True, 'test-no-dvm') + + def test_012_handle_user_response(self): + rule = qubespolicy.PolicyRule('$anyvm $anyvm ask') + action = qubespolicy.PolicyAction('test.service', 'test-vm1', + None, rule, 'test-vm2', ['test-vm2', 'test-vm3']) + with self.assertRaises(qubespolicy.AccessDenied): + action.handle_user_response(False, None) + self.assertEqual(action.action, qubespolicy.Action.deny) + + @unittest.mock.patch('qubespolicy.qubesd_call') + @unittest.mock.patch('subprocess.call') + def test_020_execute(self, mock_subprocess, mock_qubesd_call): + rule = qubespolicy.PolicyRule('$anyvm $anyvm allow') + action = qubespolicy.PolicyAction('test.service', 'test-vm1', + 'test-vm2', rule, 'test-vm2') + action.execute('some-ident') + self.assertEqual(mock_qubesd_call.mock_calls, + [unittest.mock.call('test-vm2', 'mgmtinternal.vm.Start')]) + self.assertEqual(mock_subprocess.mock_calls, + [unittest.mock.call([qubespolicy.QREXEC_CLIENT, '-d', 'test-vm2', + '-c', 'some-ident', 'DEFAULT:QUBESRPC test.service test-vm1'])]) + + @unittest.mock.patch('qubespolicy.qubesd_call') + @unittest.mock.patch('subprocess.call') + def test_021_execute_dom0(self, mock_subprocess, mock_qubesd_call): + rule = qubespolicy.PolicyRule('$anyvm dom0 allow') + action = qubespolicy.PolicyAction('test.service', 'test-vm1', + 'dom0', rule, 'dom0') + action.execute('some-ident') + self.assertEqual(mock_qubesd_call.mock_calls, + [unittest.mock.call('dom0', 'mgmtinternal.vm.Start')]) + self.assertEqual(mock_subprocess.mock_calls, + [unittest.mock.call([qubespolicy.QREXEC_CLIENT, '-d', 'dom0', + '-c', 'some-ident', + qubespolicy.QUBES_RPC_MULTIPLEXER_PATH + + ' test.service test-vm1 dom0'])]) + + @unittest.mock.patch('qubespolicy.qubesd_call') + @unittest.mock.patch('subprocess.call') + def test_022_execute_dispvm(self, mock_subprocess, mock_qubesd_call): + rule = qubespolicy.PolicyRule('$anyvm $dispvm:default-dvm allow') + action = qubespolicy.PolicyAction('test.service', 'test-vm1', + '$dispvm:default-dvm', rule, '$dispvm:default-dvm') + mock_qubesd_call.side_effect = (lambda target, call: + b'dispvm-name' if call == 'mgmtinternal.vm.Create.DispVM' else + unittest.mock.DEFAULT) + action.execute('some-ident') + self.assertEqual(mock_qubesd_call.mock_calls, + [unittest.mock.call('default-dvm', 'mgmtinternal.vm.Create.DispVM'), + unittest.mock.call('dispvm-name', 'mgmtinternal.vm.Start'), + unittest.mock.call('dispvm-name', + 'mgmtinternal.vm.CleanupDispVM')]) + self.assertEqual(mock_subprocess.mock_calls, + [unittest.mock.call([qubespolicy.QREXEC_CLIENT, '-d', 'dispvm-name', + '-c', 'some-ident', '-W', + 'DEFAULT:QUBESRPC test.service test-vm1'])]) + + @unittest.mock.patch('qubespolicy.qubesd_call') + @unittest.mock.patch('subprocess.call') + def test_023_execute_already_running(self, mock_subprocess, + mock_qubesd_call): + rule = qubespolicy.PolicyRule('$anyvm $anyvm allow') + action = qubespolicy.PolicyAction('test.service', 'test-vm1', + 'test-vm2', rule, 'test-vm2') + mock_qubesd_call.side_effect = \ + qubespolicy.QubesMgmtException('QubesVMNotHaltedError') + action.execute('some-ident') + self.assertEqual(mock_qubesd_call.mock_calls, + [unittest.mock.call('test-vm2', 'mgmtinternal.vm.Start')]) + self.assertEqual(mock_subprocess.mock_calls, + [unittest.mock.call([qubespolicy.QREXEC_CLIENT, '-d', 'test-vm2', + '-c', 'some-ident', 'DEFAULT:QUBESRPC test.service test-vm1'])]) + + @unittest.mock.patch('qubespolicy.qubesd_call') + @unittest.mock.patch('subprocess.call') + def test_024_execute_startup_error(self, mock_subprocess, + mock_qubesd_call): + rule = qubespolicy.PolicyRule('$anyvm $anyvm allow') + action = qubespolicy.PolicyAction('test.service', 'test-vm1', + 'test-vm2', rule, 'test-vm2') + mock_qubesd_call.side_effect = \ + qubespolicy.QubesMgmtException('QubesVMError') + with self.assertRaises(qubespolicy.QubesMgmtException): + action.execute('some-ident') + self.assertEqual(mock_qubesd_call.mock_calls, + [unittest.mock.call('test-vm2', 'mgmtinternal.vm.Start')]) + self.assertEqual(mock_subprocess.mock_calls, []) + +@unittest.mock.patch('qubespolicy.POLICY_DIR', tmp_policy_dir) +class TC_20_Policy(qubes.tests.QubesTestCase): + + def setUp(self): + super(TC_20_Policy, self).setUp() + if not os.path.exists(tmp_policy_dir): + os.mkdir(tmp_policy_dir) + + def tearDown(self): + shutil.rmtree(tmp_policy_dir) + super(TC_20_Policy, self).tearDown() + + def test_000_load(self): + with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f: + f.write('test-vm1 test-vm2 allow\n') + f.write('\n') + f.write('# comment\n') + f.write('test-vm2 test-vm3 ask\n') + f.write(' # comment \n') + f.write('$anyvm $anyvm ask\n') + policy = qubespolicy.Policy('test.service') + self.assertEqual(policy.service, 'test.service') + self.assertEqual(len(policy.policy_rules), 3) + self.assertEqual(policy.policy_rules[0].source, 'test-vm1') + self.assertEqual(policy.policy_rules[0].target, 'test-vm2') + self.assertEqual(policy.policy_rules[0].action, + qubespolicy.Action.allow) + + def test_001_not_existent(self): + with self.assertRaises(qubespolicy.AccessDenied): + qubespolicy.Policy('no-such.service') + + def test_002_include(self): + with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f: + f.write('test-vm1 test-vm2 allow\n') + f.write('$include:test.service2\n') + f.write('$anyvm $anyvm deny\n') + with open(os.path.join(tmp_policy_dir, 'test.service2'), 'w') as f: + f.write('test-vm3 $default allow,target=test-vm2\n') + policy = qubespolicy.Policy('test.service') + self.assertEqual(policy.service, 'test.service') + self.assertEqual(len(policy.policy_rules), 3) + self.assertEqual(policy.policy_rules[0].source, 'test-vm1') + self.assertEqual(policy.policy_rules[0].target, 'test-vm2') + self.assertEqual(policy.policy_rules[0].action, + qubespolicy.Action.allow) + self.assertEqual(policy.policy_rules[0].filename, + tmp_policy_dir + '/test.service') + self.assertEqual(policy.policy_rules[0].lineno, 1) + self.assertEqual(policy.policy_rules[1].source, 'test-vm3') + self.assertEqual(policy.policy_rules[1].target, '$default') + self.assertEqual(policy.policy_rules[1].action, + qubespolicy.Action.allow) + self.assertEqual(policy.policy_rules[1].filename, + tmp_policy_dir + '/test.service2') + self.assertEqual(policy.policy_rules[1].lineno, 1) + self.assertEqual(policy.policy_rules[2].source, '$anyvm') + self.assertEqual(policy.policy_rules[2].target, '$anyvm') + self.assertEqual(policy.policy_rules[2].action, + qubespolicy.Action.deny) + self.assertEqual(policy.policy_rules[2].filename, + tmp_policy_dir + '/test.service') + self.assertEqual(policy.policy_rules[2].lineno, 3) + + def test_010_find_rule(self): + with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f: + f.write('test-vm1 test-vm2 allow\n') + f.write('test-vm1 $anyvm ask\n') + f.write('test-vm2 $tag:tag1 deny\n') + f.write('test-vm2 $tag:tag2 allow\n') + f.write('$type:AppVM $default allow,target=test-vm3\n') + f.write('$tag:tag1 $type:AppVM allow\n') + policy = qubespolicy.Policy('test.service') + self.assertEqual(policy.find_matching_rule( + system_info, 'test-vm1', 'test-vm2'), policy.policy_rules[0]) + self.assertEqual(policy.find_matching_rule( + system_info, 'test-vm1', 'test-vm3'), policy.policy_rules[1]) + self.assertEqual(policy.find_matching_rule( + system_info, 'test-vm2', 'test-vm2'), policy.policy_rules[3]) + self.assertEqual(policy.find_matching_rule( + system_info, 'test-vm2', 'test-no-dvm'), policy.policy_rules[2]) + # $anyvm matches $default too + self.assertEqual(policy.find_matching_rule( + system_info, 'test-vm1', ''), policy.policy_rules[1]) + self.assertEqual(policy.find_matching_rule( + system_info, 'test-vm2', ''), policy.policy_rules[4]) + self.assertEqual(policy.find_matching_rule( + system_info, 'test-vm2', '$default'), policy.policy_rules[4]) + self.assertEqual(policy.find_matching_rule( + system_info, 'test-no-dvm', 'test-vm3'), policy.policy_rules[5]) + with self.assertRaises(qubespolicy.AccessDenied): + policy.find_matching_rule( + system_info, 'test-no-dvm', 'test-standalone') + with self.assertRaises(qubespolicy.AccessDenied): + policy.find_matching_rule( + system_info, 'test-standalone', '$default') + + def test_020_collect_targets_for_ask(self): + with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f: + f.write('test-vm1 test-vm2 allow\n') + f.write('test-vm1 $anyvm ask\n') + f.write('test-vm2 $tag:tag1 deny\n') + f.write('test-vm2 $tag:tag2 allow\n') + f.write('test-no-dvm $type:AppVM deny\n') + f.write('$type:AppVM $default allow,target=test-vm3\n') + f.write('$tag:tag1 $type:AppVM allow\n') + f.write('test-no-dvm $dispvm allow\n') + f.write('test-standalone $dispvm allow\n') + policy = qubespolicy.Policy('test.service') + self.assertCountEqual(policy.collect_targets_for_ask(system_info, + 'test-vm1'), ['test-vm1', 'test-vm2', 'test-vm3', + '$dispvm:test-vm3', + 'default-dvm', '$dispvm:default-dvm', 'test-invalid-dvm', + 'test-no-dvm', 'test-template', 'test-standalone']) + self.assertCountEqual(policy.collect_targets_for_ask(system_info, + 'test-vm2'), ['test-vm2', 'test-vm3']) + self.assertCountEqual(policy.collect_targets_for_ask(system_info, + 'test-vm3'), ['test-vm3']) + self.assertCountEqual(policy.collect_targets_for_ask(system_info, + 'test-standalone'), ['test-vm1', 'test-vm2', 'test-vm3', + 'default-dvm', 'test-no-dvm', 'test-invalid-dvm', + '$dispvm:default-dvm']) + self.assertCountEqual(policy.collect_targets_for_ask(system_info, + 'test-no-dvm'), []) + + def test_030_eval_simple(self): + with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f: + f.write('test-vm1 test-vm2 allow\n') + + policy = qubespolicy.Policy('test.service') + action = policy.evaluate(system_info, 'test-vm1', 'test-vm2') + self.assertEqual(action.rule, policy.policy_rules[0]) + self.assertEqual(action.action, qubespolicy.Action.allow) + self.assertEqual(action.target, 'test-vm2') + self.assertEqual(action.original_target, 'test-vm2') + self.assertEqual(action.service, 'test.service') + self.assertIsNone(action.targets_for_ask) + with self.assertRaises(qubespolicy.AccessDenied): + policy.evaluate(system_info, 'test-vm2', '$default') + + def test_031_eval_default(self): + with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f: + f.write('test-vm1 test-vm2 allow\n') + f.write('test-vm1 $default allow,target=test-vm2\n') + f.write('$tag:tag1 test-vm2 ask\n') + f.write('$tag:tag2 $anyvm allow\n') + f.write('test-vm3 $anyvm deny\n') + + policy = qubespolicy.Policy('test.service') + action = policy.evaluate(system_info, 'test-vm1', '$default') + self.assertEqual(action.rule, policy.policy_rules[1]) + self.assertEqual(action.action, qubespolicy.Action.allow) + self.assertEqual(action.target, 'test-vm2') + self.assertEqual(action.original_target, '$default') + self.assertEqual(action.service, 'test.service') + self.assertIsNone(action.targets_for_ask) + with self.assertRaises(qubespolicy.AccessDenied): + # action allow should hit, but no target specified (either by + # caller or policy) + policy.evaluate(system_info, 'test-standalone', '$default') + + def test_032_eval_ask(self): + with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f: + f.write('test-vm1 test-vm2 allow\n') + f.write('test-vm1 $default allow,target=test-vm2\n') + f.write('$tag:tag1 test-vm2 ask\n') + f.write('$tag:tag1 test-vm3 ask,default_target=test-vm3\n') + f.write('$tag:tag2 $anyvm allow\n') + f.write('test-vm3 $anyvm deny\n') + + policy = qubespolicy.Policy('test.service') + action = policy.evaluate(system_info, 'test-standalone', 'test-vm2') + self.assertEqual(action.rule, policy.policy_rules[2]) + self.assertEqual(action.action, qubespolicy.Action.ask) + self.assertIsNone(action.target) + self.assertEqual(action.original_target, 'test-vm2') + self.assertEqual(action.service, 'test.service') + self.assertCountEqual(action.targets_for_ask, + ['test-vm1', 'test-vm2', 'test-vm3', '$dispvm:test-vm3', + 'default-dvm', '$dispvm:default-dvm', 'test-invalid-dvm', + 'test-no-dvm', 'test-template', 'test-standalone']) + + def test_033_eval_ask(self): + with open(os.path.join(tmp_policy_dir, 'test.service'), 'w') as f: + f.write('test-vm1 test-vm2 allow\n') + f.write('test-vm1 $default allow,target=test-vm2\n') + f.write('$tag:tag1 test-vm2 ask\n') + f.write('$tag:tag1 test-vm3 ask,default_target=test-vm3\n') + f.write('$tag:tag2 $anyvm allow\n') + f.write('test-vm3 $anyvm deny\n') + + policy = qubespolicy.Policy('test.service') + action = policy.evaluate(system_info, 'test-standalone', 'test-vm3') + self.assertEqual(action.rule, policy.policy_rules[3]) + self.assertEqual(action.action, qubespolicy.Action.ask) + self.assertEqual(action.target, 'test-vm3') + self.assertEqual(action.original_target, 'test-vm3') + self.assertEqual(action.service, 'test.service') + self.assertCountEqual(action.targets_for_ask, + ['test-vm1', 'test-vm2', 'test-vm3', '$dispvm:test-vm3', + 'default-dvm', '$dispvm:default-dvm', 'test-invalid-dvm', + 'test-no-dvm', 'test-template', 'test-standalone']) + + +class TC_30_Misc(qubes.tests.QubesTestCase): + @unittest.mock.patch('socket.socket') + def test_000_qubesd_call(self, mock_socket): + mock_config = { + 'return_value.makefile.return_value.read.return_value': b'0\x00data' + } + mock_socket.configure_mock(**mock_config) + result = qubespolicy.qubesd_call('test', 'method') + self.assertEqual(result, b'data') + self.assertEqual(mock_socket.mock_calls, [ + unittest.mock.call(socket.AF_UNIX, socket.SOCK_STREAM), + unittest.mock.call().connect(qubespolicy.QUBESD_INTERNAL_SOCK), + unittest.mock.call().sendall(b'dom0'), + unittest.mock.call().sendall(b'\x00'), + unittest.mock.call().sendall(b'method'), + unittest.mock.call().sendall(b'\x00'), + unittest.mock.call().sendall(b'test'), + unittest.mock.call().sendall(b'\x00'), + unittest.mock.call().sendall(b'\x00'), + unittest.mock.call().shutdown(socket.SHUT_WR), + unittest.mock.call().makefile('rb'), + unittest.mock.call().makefile().read(), + ]) + + @unittest.mock.patch('socket.socket') + def test_001_qubesd_call_arg_payload(self, mock_socket): + mock_config = { + 'return_value.makefile.return_value.read.return_value': b'0\x00data' + } + mock_socket.configure_mock(**mock_config) + result = qubespolicy.qubesd_call('test', 'method', 'arg', b'payload') + self.assertEqual(result, b'data') + self.assertEqual(mock_socket.mock_calls, [ + unittest.mock.call(socket.AF_UNIX, socket.SOCK_STREAM), + unittest.mock.call().connect(qubespolicy.QUBESD_INTERNAL_SOCK), + unittest.mock.call().sendall(b'dom0'), + unittest.mock.call().sendall(b'\x00'), + unittest.mock.call().sendall(b'method'), + unittest.mock.call().sendall(b'\x00'), + unittest.mock.call().sendall(b'test'), + unittest.mock.call().sendall(b'\x00'), + unittest.mock.call().sendall(b'arg'), + unittest.mock.call().sendall(b'\x00'), + unittest.mock.call().sendall(b'payload'), + unittest.mock.call().shutdown(socket.SHUT_WR), + unittest.mock.call().makefile('rb'), + unittest.mock.call().makefile().read(), + ]) + + @unittest.mock.patch('socket.socket') + def test_002_qubesd_call_exception(self, mock_socket): + mock_config = { + 'return_value.makefile.return_value.read.return_value': + b'2\x00SomeError\x00traceback\x00message\x00' + } + mock_socket.configure_mock(**mock_config) + with self.assertRaises(qubespolicy.QubesMgmtException) as e: + qubespolicy.qubesd_call('test', 'method') + self.assertEqual(e.exception.exc_type, 'SomeError') + self.assertEqual(mock_socket.mock_calls, [ + unittest.mock.call(socket.AF_UNIX, socket.SOCK_STREAM), + unittest.mock.call().connect(qubespolicy.QUBESD_INTERNAL_SOCK), + unittest.mock.call().sendall(b'dom0'), + unittest.mock.call().sendall(b'\x00'), + unittest.mock.call().sendall(b'method'), + unittest.mock.call().sendall(b'\x00'), + unittest.mock.call().sendall(b'test'), + unittest.mock.call().sendall(b'\x00'), + unittest.mock.call().sendall(b'\x00'), + unittest.mock.call().shutdown(socket.SHUT_WR), + unittest.mock.call().makefile('rb'), + unittest.mock.call().makefile().read(), + ]) + diff --git a/qubespolicy/tests/gtkhelpers.py b/qubespolicy/tests/gtkhelpers.py new file mode 100755 index 00000000..d305f14e --- /dev/null +++ b/qubespolicy/tests/gtkhelpers.py @@ -0,0 +1,404 @@ +#!/usr/bin/python +# +# The Qubes OS Project, https://www.qubes-os.org/ +# +# Copyright (C) 2017 boring-stuff +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +# + +import time +import unittest + +from gi.repository import Gtk + +from qubespolicy.gtkhelpers import VMListModeler, GtkOneTimerHelper, \ + FocusStealingHelper + +mock_domains_info = { + 'dom0': {'icon': 'black', 'type': 'AdminVM'}, + 'test-red1': {'icon': 'red', 'type': 'AppVM'}, + 'test-red2': {'icon': 'red', 'type': 'AppVM'}, + 'test-red3': {'icon': 'red', 'type': 'AppVM'}, + 'test-source': {'icon': 'green', 'type': 'AppVM'}, + 'test-target': {'icon': 'orange', 'type': 'AppVM'}, + '$dispvm:test-disp6': {'icon': 'red', 'type': 'DispVM'}, +} + +mock_whitelist = ["test-red1", "test-red2", "test-red3", + "test-target", "$dispvm:test-disp6"] + +class MockComboEntry: + def __init__(self, text): + self._text = text + + def get_active_id(self): + return self._text + + def get_text(self): + return self._text + + +class GtkTestCase(unittest.TestCase): + def __init__(self, *args, **kwargs): + unittest.TestCase.__init__(self, *args, **kwargs) + self._smallest_wait = 0.01 + + def flush_gtk_events(self, wait_seconds=0): + start = time.time() + iterations = 0 + remaining_wait = wait_seconds + time_length = 0 + + if wait_seconds < 0: + raise ValueError("Only non-negative intervals are allowed.") + + while remaining_wait >= 0: + while Gtk.events_pending(): + Gtk.main_iteration_do(blocking=False) + iterations += 1 + + time_length = time.time() - start + remaining_wait = wait_seconds - time_length + + if remaining_wait > 0: + time.sleep(self._smallest_wait) + + return iterations, time_length + + +class VMListModelerTest(VMListModeler, unittest.TestCase): + def __init__(self, *args, **kwargs): + unittest.TestCase.__init__(self, *args, **kwargs) + VMListModeler.__init__(self, mock_domains_info) + + def test_entries_gets_loaded(self): + self.assertIsNotNone(self._entries) + + def test_valid_qube_name(self): + self.apply_model(Gtk.ComboBox(), list(mock_domains_info.keys())) + + for name in ["test-red1", "test-red2", "test-red3", + "test-target", "Disposable VM (test-disp6)"]: + + mock = MockComboEntry(name) + self.assertEquals(name, + self._get_valid_qube_name(mock, mock, mock_whitelist)) + self.assertEquals(name, + self._get_valid_qube_name(None, mock, mock_whitelist)) + self.assertEquals(name, + self._get_valid_qube_name(mock, None, mock_whitelist)) + self.assertIsNone( + self._get_valid_qube_name(None, None, mock_whitelist)) + + def test_valid_qube_name_whitelist(self): + list_exc = ["$dispvm:test-disp6", "test-red2"] + + whitelist = [name for name in mock_whitelist if name not in list_exc] + self.apply_model(Gtk.ComboBox(), whitelist) + + for name in list_exc: + mock = MockComboEntry(name) + self.assertIsNone(self._get_valid_qube_name(mock, mock, whitelist)) + self.assertIsNone(self._get_valid_qube_name(None, mock, whitelist)) + self.assertIsNone(self._get_valid_qube_name(mock, None, whitelist)) + + def test_invalid_qube_name(self): + self.apply_model(Gtk.ComboBox(), mock_whitelist) + + for name in ["test-nonexistant", None, "", 1]: + + mock = MockComboEntry(name) + self.assertIsNone( + self._get_valid_qube_name(mock, mock, mock_whitelist)) + self.assertIsNone( + self._get_valid_qube_name(None, mock, mock_whitelist)) + self.assertIsNone( + self._get_valid_qube_name(mock, None, mock_whitelist)) + + def test_apply_model(self): + new_object = Gtk.ComboBox() + self.assertIsNone(new_object.get_model()) + + self.apply_model(new_object, mock_whitelist) + + self.assertIsNotNone(new_object.get_model()) + + def test_apply_model_with_entry(self): + new_object = Gtk.ComboBox.new_with_entry() + + self.assertIsNone(new_object.get_model()) + + self.apply_model(new_object, []) + + self.assertIsNotNone(new_object.get_model()) + + def test_apply_model_only_combobox(self): + invalid_types = [1, "One", u'1', {'1': "one"}, VMListModeler( + mock_domains_info)] + + for invalid_type in invalid_types: + with self.assertRaises(TypeError): + self.apply_model(invalid_type, []) + + def test_apply_model_whitelist(self): + combo = Gtk.ComboBox() + + self.apply_model(combo, list(mock_domains_info.keys())) + self.assertEquals(7, len(combo.get_model())) + + names = [entry['api_name'] for entry in self._entries.values()] + + self.apply_model(combo, [names[0]]) + self.assertEquals(1, len(combo.get_model())) + + self.apply_model(combo, [names[0], names[1]]) + self.assertEquals(2, len(combo.get_model())) + + def test_apply_icon(self): + new_object = Gtk.Entry() + + self.assertIsNone( + new_object.get_icon_pixbuf(Gtk.EntryIconPosition.PRIMARY)) + + self.apply_icon(new_object, "Disposable VM (test-disp6)") + + self.assertIsNotNone( + new_object.get_icon_pixbuf(Gtk.EntryIconPosition.PRIMARY)) + + def test_apply_icon_only_entry(self): + invalid_types = [1, "One", u'1', {'1': "one"}, Gtk.ComboBox()] + + for invalid_type in invalid_types: + with self.assertRaises(TypeError): + self.apply_icon(invalid_type, "test-disp6") + + def test_apply_icon_only_existing(self): + new_object = Gtk.Entry() + + for name in ["test-red1", "test-red2", "test-red3", + "test-target", "Disposable VM (test-disp6)"]: + self.apply_icon(new_object, name) + + for name in ["test-nonexistant", None, "", 1]: + with self.assertRaises(ValueError): + self.apply_icon(new_object, name) + + +class GtkOneTimerHelperTest(GtkOneTimerHelper, GtkTestCase): + def __init__(self, *args, **kwargs): + GtkTestCase.__init__(self, *args, **kwargs) + + self._test_time = 0.1 + + GtkOneTimerHelper.__init__(self, self._test_time) + self._run_timers = [] + + def _timer_run(self, timer_id): + self._run_timers.append(timer_id) + + def test_nothing_runs_automatically(self): + self.flush_gtk_events(self._test_time*2) + self.assertEquals([], self._run_timers) + self.assertEquals(0, self._current_timer_id) + self.assertFalse(self._timer_has_completed()) + + def test_schedule_one_task(self): + self._timer_schedule() + self.flush_gtk_events(self._test_time*2) + self.assertEquals([1], self._run_timers) + self.assertEquals(1, self._current_timer_id) + self.assertTrue(self._timer_has_completed()) + + def test_invalidate_completed(self): + self._timer_schedule() + self.flush_gtk_events(self._test_time*2) + self.assertEquals([1], self._run_timers) + self.assertEquals(1, self._current_timer_id) + + self.assertTrue(self._timer_has_completed()) + self._invalidate_timer_completed() + self.assertFalse(self._timer_has_completed()) + + def test_schedule_and_cancel_one_task(self): + self._timer_schedule() + self._invalidate_current_timer() + self.flush_gtk_events(self._test_time*2) + self.assertEquals([], self._run_timers) + self.assertEquals(2, self._current_timer_id) + self.assertFalse(self._timer_has_completed()) + + def test_two_tasks(self): + self._timer_schedule() + self.flush_gtk_events(self._test_time/4) + self._timer_schedule() + self.flush_gtk_events(self._test_time*2) + self.assertEquals([2], self._run_timers) + self.assertEquals(2, self._current_timer_id) + self.assertTrue(self._timer_has_completed()) + + def test_more_tasks(self): + num = 0 + for num in range(1, 10): + self._timer_schedule() + self.flush_gtk_events(self._test_time/4) + self.flush_gtk_events(self._test_time*1.75) + self.assertEquals([num], self._run_timers) + self.assertEquals(num, self._current_timer_id) + self.assertTrue(self._timer_has_completed()) + + def test_more_tasks_cancel(self): + num = 0 + for num in range(1, 10): + self._timer_schedule() + self.flush_gtk_events(self._test_time/4) + self._invalidate_current_timer() + self.flush_gtk_events(int(self._test_time*1.75)) + self.assertEquals([], self._run_timers) + self.assertEquals(num+1, self._current_timer_id) + self.assertFalse(self._timer_has_completed()) + + def test_subsequent_tasks(self): + self._timer_schedule() # 1 + self.flush_gtk_events(self._test_time*2) + self.assertEquals([1], self._run_timers) + self.assertEquals(1, self._current_timer_id) + self.assertTrue(self._timer_has_completed()) + + self._timer_schedule() # 2 + self.flush_gtk_events(self._test_time*2) + self.assertEquals([1, 2], self._run_timers) + self.assertEquals(2, self._current_timer_id) + self.assertTrue(self._timer_has_completed()) + + self._invalidate_timer_completed() + self._timer_schedule() # 3 + self._invalidate_current_timer() # 4 + self.flush_gtk_events(self._test_time*2) + self.assertEquals([1, 2], self._run_timers) + self.assertEquals(4, self._current_timer_id) + self.assertFalse(self._timer_has_completed()) + + self._timer_schedule() # 5 + self.flush_gtk_events(self._test_time*2) + self.assertEquals([1, 2, 5], self._run_timers) + self.assertEquals(5, self._current_timer_id) + self.assertTrue(self._timer_has_completed()) + + +class FocusStealingHelperMock(FocusStealingHelper): + def simulate_focus(self): + self._window_changed_focus(True) + + +class FocusStealingHelperTest(FocusStealingHelperMock, GtkTestCase): + def __init__(self, *args, **kwargs): + GtkTestCase.__init__(self, *args, **kwargs) + + self._test_time = 0.1 + self._test_button = Gtk.Button() + self._test_window = Gtk.Window() + + FocusStealingHelperMock.__init__(self, self._test_window, + self._test_button, self._test_time) + + def test_nothing_runs_automatically(self): + self.assertFalse(self.can_perform_action()) + self.flush_gtk_events(self._test_time*2) + self.assertFalse(self.can_perform_action()) + self.assertFalse(self._test_button.get_sensitive()) + + def test_nothing_runs_automatically_with_request(self): + self.request_sensitivity(True) + self.assertFalse(self.can_perform_action()) + self.flush_gtk_events(self._test_time*2) + self.assertFalse(self.can_perform_action()) + self.assertFalse(self._test_button.get_sensitive()) + + def _simulate_focus(self, focused): + self._window_changed_focus(focused) + + def test_focus_with_request(self): + self.request_sensitivity(True) + self._simulate_focus(True) + self.flush_gtk_events(self._test_time*2) + self.assertTrue(self.can_perform_action()) + self.assertTrue(self._test_button.get_sensitive()) + + def test_focus_with_late_request(self): + self._simulate_focus(True) + self.flush_gtk_events(self._test_time*2) + self.assertTrue(self.can_perform_action()) + self.assertFalse(self._test_button.get_sensitive()) + + self.request_sensitivity(True) + self.assertTrue(self._test_button.get_sensitive()) + + def test_immediate_defocus(self): + self.request_sensitivity(True) + self._simulate_focus(True) + self._simulate_focus(False) + self.flush_gtk_events(self._test_time*2) + self.assertFalse(self.can_perform_action()) + self.assertFalse(self._test_button.get_sensitive()) + + def test_focus_then_unfocus(self): + self.request_sensitivity(True) + self._simulate_focus(True) + self.flush_gtk_events(self._test_time*2) + self.assertTrue(self.can_perform_action()) + self.assertTrue(self._test_button.get_sensitive()) + + self._simulate_focus(False) + self.assertFalse(self.can_perform_action()) + self.assertFalse(self._test_button.get_sensitive()) + + def test_focus_cycle(self): + self.request_sensitivity(True) + + self._simulate_focus(True) + self.flush_gtk_events(self._test_time*2) + self.assertTrue(self.can_perform_action()) + self.assertTrue(self._test_button.get_sensitive()) + + self._simulate_focus(False) + self.assertFalse(self.can_perform_action()) + self.assertFalse(self._test_button.get_sensitive()) + + self._simulate_focus(True) + self.assertFalse(self.can_perform_action()) + self.assertFalse(self._test_button.get_sensitive()) + + self.flush_gtk_events(self._test_time*2) + self.assertTrue(self.can_perform_action()) + self.assertTrue(self._test_button.get_sensitive()) + + self.request_sensitivity(False) + self.assertTrue(self.can_perform_action()) + self.assertFalse(self._test_button.get_sensitive()) + + self._simulate_focus(False) + self.assertFalse(self.can_perform_action()) + + self._simulate_focus(True) + self.assertFalse(self.can_perform_action()) + self.assertFalse(self._test_button.get_sensitive()) + + self.flush_gtk_events(self._test_time*2) + self.assertTrue(self.can_perform_action()) + self.assertFalse(self._test_button.get_sensitive()) + +if __name__ == '__main__': + unittest.main() diff --git a/qubespolicy/tests/rpcconfirmation.py b/qubespolicy/tests/rpcconfirmation.py new file mode 100755 index 00000000..fdff54f0 --- /dev/null +++ b/qubespolicy/tests/rpcconfirmation.py @@ -0,0 +1,335 @@ +#!/usr/bin/python +# +# The Qubes OS Project, https://www.qubes-os.org/ +# +# Copyright (C) 2017 boring-stuff +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +# + +import sys +import unittest + +from qubespolicy.tests.gtkhelpers import GtkTestCase, FocusStealingHelperMock +from qubespolicy.tests.gtkhelpers import mock_domains_info, mock_whitelist + +from qubespolicy.gtkhelpers import VMListModeler +from qubespolicy.rpcconfirmation import RPCConfirmationWindow + + +class MockRPCConfirmationWindow(RPCConfirmationWindow): + def _new_vm_list_modeler(self): + return VMListModeler(mock_domains_info) + + def _new_focus_stealing_helper(self): + return FocusStealingHelperMock( + self._rpc_window, + self._rpc_ok_button, + self._focus_stealing_seconds) + + def __init__(self, source, rpc_operation, whitelist, + target=None, focus_stealing_seconds=1): + self._focus_stealing_seconds = focus_stealing_seconds + + RPCConfirmationWindow.__init__( + self, mock_domains_info, source, rpc_operation, whitelist, + target) + + def is_error_visible(self): + return self._error_bar.get_visible() + + def get_shown_domains(self): + model = self._rpc_combo_box.get_model() + model_iter = model.get_iter_first() + domains = [] + + while model_iter is not None: + domain_name = model.get_value(model_iter, 1) + + domains += [domain_name] + + model_iter = model.iter_next(model_iter) + + return domains + + +class RPCConfirmationWindowTestBase(MockRPCConfirmationWindow, GtkTestCase): + def __init__(self, test_method, source_name="test-source", + rpc_operation="test.Operation", whitelist=mock_whitelist, + target_name=None): + GtkTestCase.__init__(self, test_method) + self.test_source_name = source_name + self.test_rpc_operation = rpc_operation + self.test_target_name = target_name + + self._test_time = 0.1 + + self.test_called_close = False + self.test_called_show = False + + self.test_clicked_ok = False + self.test_clicked_cancel = False + + MockRPCConfirmationWindow.__init__(self, + self.test_source_name, + self.test_rpc_operation, + whitelist, + self.test_target_name, + focus_stealing_seconds=self._test_time) + + def _can_perform_action(self): + return True + + def _close(self): + self.test_called_close = True + + def _show(self): + self.test_called_show = True + + def _clicked_ok(self, button): + MockRPCConfirmationWindow._clicked_ok(self, button) + self.test_clicked_ok = True + + def _clicked_cancel(self, button): + MockRPCConfirmationWindow._clicked_cancel(self, button) + self.test_clicked_cancel = True + + def test_has_linked_the_fields(self): + self.assertIsNotNone(self._rpc_window) + self.assertIsNotNone(self._rpc_ok_button) + self.assertIsNotNone(self._rpc_cancel_button) + self.assertIsNotNone(self._rpc_label) + self.assertIsNotNone(self._source_entry) + self.assertIsNotNone(self._rpc_combo_box) + self.assertIsNotNone(self._error_bar) + self.assertIsNotNone(self._error_message) + + def test_is_showing_source(self): + self.assertTrue(self.test_source_name in self._source_entry.get_text()) + + def test_is_showing_operation(self): + self.assertTrue(self.test_rpc_operation in self._rpc_label.get_text()) + + def test_escape_and_format_rpc_text(self): + self.assertEquals("qubes.Test", + self._escape_and_format_rpc_text("qubes.Test")) + self.assertEquals("custom.Domain", + self._escape_and_format_rpc_text("custom.Domain")) + self.assertEquals("nodomain", + self._escape_and_format_rpc_text("nodomain")) + self.assertEquals("domain.Sub.Operation", + self._escape_and_format_rpc_text("domain.Sub.Operation")) + self.assertEquals("", + self._escape_and_format_rpc_text("")) + self.assertEquals(".", + self._escape_and_format_rpc_text(".")) + self.assertEquals("inject.<script>", + self._escape_and_format_rpc_text("inject.