From 78693c265c829b7555b156e44af09b8aaa5541b7 Mon Sep 17 00:00:00 2001 From: Wojtek Porczyk Date: Tue, 4 Apr 2017 15:57:53 +0200 Subject: [PATCH] qubes/vm/qubesvm: rework QubesVM.run*() methods 0) All those methods are now awaitable rather than synchronous. 1) The base method is run_service(). The method run() was rewritten using run_service('qubes.VMShell', input=...). There is no provision for running plain commands. 2) Get rid of passio*= arguments. If you'd like to get another return value, use another method. It's as simple as that. See: - run_service_for_stdio() - run_for_stdio() Also gone are wait= and localcmd= arguments. They are of no use inside qubesd. 3) The qvm-run tool and tests are left behind for now and will be fixed later. This is because they also need event loop, which is not implemented yet. fixes QubesOS/qubes-issues#1900 QubesOS/qubes-issues#2622 --- qubes/vm/qubesvm.py | 199 ++++++++++++++++++++------------------------ 1 file changed, 88 insertions(+), 111 deletions(-) diff --git a/qubes/vm/qubesvm.py b/qubes/vm/qubesvm.py index c3eac1b8..e41ef02f 100644 --- a/qubes/vm/qubesvm.py +++ b/qubes/vm/qubesvm.py @@ -208,7 +208,7 @@ class QubesVM(qubes.vm.mix.net.NetVMMixin, qubes.vm.BaseVM): .. event:: domain-cmd-pre-run (subject, event, start_guid) - Fired at the beginning of :py:meth:`run` method. + Fired at the beginning of :py:meth:`run_service` method. :param subject: Event emitter (the qube object) :param event: Event name (``'domain-cmd-pre-run'``) @@ -876,7 +876,7 @@ class QubesVM(qubes.vm.mix.net.NetVMMixin, qubes.vm.BaseVM): await self.kill() raise - asyncio.ensure_future(self.wait_for_session()) + asyncio.ensure_future(self._wait_for_session()) return self @@ -981,139 +981,116 @@ class QubesVM(qubes.vm.mix.net.NetVMMixin, qubes.vm.BaseVM): return self - # TODO async def - # TODO def run_for_retcode, factor out passio - def run(self, command, user=None, autostart=False, notify_function=None, - passio=False, passio_popen=False, passio_stderr=False, - ignore_stderr=False, localcmd=None, wait=False, gui=True, - filter_esc=False): - '''Run specified command inside domain + async def run_service(self, service, source=None, user=None, + filter_esc=False, autostart=False, gui=False, **kwargs): + '''Run service on this VM - :param str command: the command to be run - :param str user: user to run the command as - :param bool autostart: if :py:obj:`True`, machine will be started if \ - it is not running - :param collections.Callable notify_function: FIXME, may go away - :param bool passio: FIXME - :param bool passio_popen: if :py:obj:`True`, \ - :py:class:`subprocess.Popen` object has connected ``stdin`` and \ - ``stdout`` - :param bool passio_stderr: if :py:obj:`True`, \ - :py:class:`subprocess.Popen` has additionaly ``stderr`` connected - :param bool ignore_stderr: if :py:obj:`True`, ``stderr`` is connected \ - to :file:`/dev/null` - :param str localcmd: local command to communicate with remote command - :param bool wait: if :py:obj:`True`, wait for command completion - :param bool gui: when autostarting, also start gui daemon + :param str service: service name + :param qubes.vm.qubesvm.QubesVM: source domain as presented to this VM + :param str user: username to run service as :param bool filter_esc: filter escape sequences to protect terminal \ emulator + :param bool autostart: if :py:obj:`True`, machine will be started if \ + it is not running + :param bool gui: when autostarting, also start gui daemon + :rtype: asyncio.subprocess.Process + + .. note:: + User ``root`` is redefined to ``SYSTEM`` in the Windows agent code ''' + # UNSUPPORTED from previous incarnation: + # localcmd, wait, passio*, notify_function, `-e` switch + # + # - passio* and friends depend on params to command (like in stdlib) + # - the filter_esc is orthogonal to passio* + # - input: see run_service_for_stdio + # - wait has no purpose since this is asynchronous + # - notify_function is gone + + source = 'dom0' if source is None else self.app.domains[source].name + if user is None: user = self.default_user - null = None - if not self.is_running() and not self.is_paused(): - if not autostart: - raise qubes.exc.QubesVMNotRunningError(self) - - if notify_function is not None: - notify_function('info', - 'Starting the {!r} VM...'.format(self.name)) - self.start(start_guid=gui, notify_function=notify_function) if self.is_paused(): # XXX what about autostart? raise qubes.exc.QubesVMNotRunningError( self, 'Domain {!r} is paused'.format(self.name)) + elif not self.is_running(): + if not autostart: + raise qubes.exc.QubesVMNotRunningError(self) + await self.start(start_guid=gui) if not self.is_qrexec_running(): raise qubes.exc.QubesVMError( self, 'Domain {!r}: qrexec not connected'.format(self.name)) - self.fire_event_pre('domain-cmd-pre-run', start_guid=gui) - if not self.have_session.is_set(): raise qubes.exc.QubesVMError(self, 'don\'t have session yet') - args = [qubes.config.system_path['qrexec_client_path'], + self.fire_event_pre('domain-cmd-pre-run', start_guid=gui) + + return await asyncio.create_subprocess_exec( + qubes.config.system_path['qrexec_client_path'], '-d', str(self.name), - '{}:{}'.format(user, command)] - if localcmd is not None: - args += ['-l', localcmd] - if filter_esc: - args += ['-t'] - if os.isatty(sys.stderr.fileno()): - args += ['-T'] + *(('-t', '-T') if filter_esc else ()), + '{}:QUBESRPC {} {}'.format(user, service, source), + **kwargs) - call_kwargs = {} - if ignore_stderr or not passio: - null = open("/dev/null", "r+") - call_kwargs['stderr'] = null - if not passio: - call_kwargs['stdin'] = null - call_kwargs['stdout'] = null + async def run_service_for_stdio(self, *args, input=None, **kwargs): + '''Run a service, pass an optional input and return (stdout, stderr). - if passio_popen: - popen_kwargs = { - 'stdout': subprocess.PIPE, - 'stdin': subprocess.PIPE - } - if passio_stderr: - popen_kwargs['stderr'] = subprocess.PIPE - else: - popen_kwargs['stderr'] = call_kwargs.get('stderr', None) - p = subprocess.Popen(args, **popen_kwargs) - if null: - null.close() - return p - if not wait and not passio: - args += ["-e"] - retcode = subprocess.call(args, **call_kwargs) - if null: - null.close() - return retcode + Raises an exception if return code != 0. - def run_service(self, service, source=None, user=None, - passio_popen=False, input=None, localcmd=None, gui=False, - wait=True, passio_stderr=False): - '''Run service on this VM + *args* and *kwargs* are passed verbatim to :py:meth:`run_service`. - **passio_popen** and **input** are mutually exclusive. - - :param str service: service name - :param qubes.vm.qubesvm.QubesVM: source domain as presented to this VM - :param str user: username to run service as - :param bool passio_popen: passed verbatim to :py:meth:`run` - :param str input: string passed as input to service + .. warning:: + There are some combinations if stdio-related *kwargs*, which are + not filtered for problems originating between the keyboard and the + chair. ''' # pylint: disable=redefined-builtin + p = await self.run_service(*args, **kwargs) - if len([i for i in (input, passio_popen, localcmd) if i]) > 1: - raise TypeError( - 'input, passio_popen and localcmd cannot be used together') + # this one is actually a tuple, but there is no need to unpack it + stdouterr = await p.communicate(input=input) - if not wait and (localcmd or input): - raise ValueError("Cannot use wait=False with input or " - "localcmd specified") + if p.returncode: + raise qubes.exc.QubesVMError(self, + 'service {!r} failed with retcode {!r}; ' + 'stdout={!r} stderr={!r}'.format( + args, p.returncode, *stdouterr)) - if passio_stderr and not passio_popen: - raise TypeError('passio_stderr can be used only with passio_popen') + return stdouterr - if input: - # Internally use passio_popen, but do not return POpen object to - # the user - use internally for p.communicate() - passio_popen = True - passio_stderr = True + @staticmethod + def _prepare_input_for_vmshell(command, input): + '''Prepare shell input for the given command and optional (real) input + ''' # pylint: disable=redefined-builtin + if input is None: + input = b'' + return b''.join((command.rstrip('\n').encode('utf-8'), b'\n', input)) - source = 'dom0' if source is None else self.app.domains[source].name + def run(self, command, input=None, **kwargs): + '''Run a shell command inside the domain using qubes.VMShell qrexec. - p = self.run('QUBESRPC {} {}'.format(service, source), - localcmd=localcmd, passio_popen=passio_popen, user=user, wait=wait, - gui=gui, passio_stderr=passio_stderr) - if input: - p.communicate(input) - return p.returncode - else: - return p + This method is a coroutine. + + *kwargs* are passed verbatim to :py:meth:`run_service`. + ''' # pylint: disable=redefined-builtin + return self.run_service('qubes.VMShell', + input=self._prepare_input_for_vmshell(command, input), **kwargs) + + def run_for_stdio(self, command, input=None, **kwargs): + '''Run a shell command inside the domain using qubes.VMShell qrexec. + + This method is a coroutine. + + *kwargs* are passed verbatim to :py:meth:`run_service_for_stdio`. + See disclaimer there. + ''' # pylint: disable=redefined-builtin + return self.run_service_for_stdio('qubes.VMShell', + input=self._prepare_input_for_vmshell(command, input), **kwargs) def request_memory(self, mem_required=None): # overhead of per-qube/per-vcpu Xen structures, @@ -1153,7 +1130,7 @@ class QubesVM(qubes.vm.mix.net.NetVMMixin, qubes.vm.BaseVM): :py:meth:`subprocess.check_call`) :param kwargs: args for :py:meth:`subprocess.check_call` :return: None - ''' + ''' # pylint: disable=redefined-builtin if os.getuid() == 0: # try to always have VM daemons running as normal user, otherwise @@ -1211,7 +1188,7 @@ class QubesVM(qubes.vm.mix.net.NetVMMixin, qubes.vm.BaseVM): except subprocess.CalledProcessError: raise qubes.exc.QubesException('Cannot execute qubesdb-daemon') - async def wait_for_session(self): + async def _wait_for_session(self): '''Wait until machine finished boot sequence. This is done by executing qubes RPC call that checks if dummy system @@ -1220,14 +1197,14 @@ class QubesVM(qubes.vm.mix.net.NetVMMixin, qubes.vm.BaseVM): self.log.info('Waiting for qubes-session') - # Note : User root is redefined to SYSTEM in the Windows agent code - p = await asyncio.get_event_loop().run_in_executor( - functools.partial(self.run, 'QUBESRPC qubes.WaitForSession none', - user='root', passio_popen=True, gui=False, wait=True)) - await asyncio.get_event_loop().run_in_executor(functools.partial( - p.communicate, input=self.default_user.encode())) + await self.run_service_for_stdio('qubes.WaitForSession', + user='root', gui=False, input=self.default_user.encode()) + + self.log.info('qubes-session acquired') self.have_session.set() + self.fire_event('have-session') + def create_on_disk(self, pool=None, pools=None): '''Create files needed for VM. '''