瀏覽代碼

qubes/vm/qubesvm: rework QubesVM.run*() methods

0) All those methods are now awaitable rather than synchronous.

1) The base method is run_service(). The method run() was rewritten
   using run_service('qubes.VMShell', input=...). There is no provision
   for running plain commands.

2) Get rid of passio*= arguments. If you'd like to get another return
   value, use another method. It's as simple as that.
   See:
      - run_service_for_stdio()
      - run_for_stdio()

   Also gone are wait= and localcmd= arguments. They are of no use
   inside qubesd.

3) The qvm-run tool and tests are left behind for now and will be fixed
   later. This is because they also need event loop, which is not
   implemented yet.

fixes QubesOS/qubes-issues#1900
QubesOS/qubes-issues#2622
Wojtek Porczyk 7 年之前
父節點
當前提交
78693c265c
共有 1 個文件被更改,包括 89 次插入112 次删除
  1. 89 112
      qubes/vm/qubesvm.py

+ 89 - 112
qubes/vm/qubesvm.py

@@ -208,7 +208,7 @@ class QubesVM(qubes.vm.mix.net.NetVMMixin, qubes.vm.BaseVM):
 
         .. event:: domain-cmd-pre-run (subject, event, start_guid)
 
-            Fired at the beginning of :py:meth:`run` method.
+            Fired at the beginning of :py:meth:`run_service` method.
 
             :param subject: Event emitter (the qube object)
             :param event: Event name (``'domain-cmd-pre-run'``)
@@ -876,7 +876,7 @@ class QubesVM(qubes.vm.mix.net.NetVMMixin, qubes.vm.BaseVM):
                 await self.kill()
             raise
 
-        asyncio.ensure_future(self.wait_for_session())
+        asyncio.ensure_future(self._wait_for_session())
 
         return self
 
@@ -981,139 +981,116 @@ class QubesVM(qubes.vm.mix.net.NetVMMixin, qubes.vm.BaseVM):
 
         return self
 
-    # TODO async def
-    # TODO def run_for_retcode, factor out passio
-    def run(self, command, user=None, autostart=False, notify_function=None,
-            passio=False, passio_popen=False, passio_stderr=False,
-            ignore_stderr=False, localcmd=None, wait=False, gui=True,
-            filter_esc=False):
-        '''Run specified command inside domain
+    async def run_service(self, service, source=None, user=None,
+            filter_esc=False, autostart=False, gui=False, **kwargs):
+        '''Run service on this VM
 
-        :param str command: the command to be run
-        :param str user: user to run the command as
+        :param str service: service name
+        :param qubes.vm.qubesvm.QubesVM: source domain as presented to this VM
+        :param str user: username to run service as
+        :param bool filter_esc: filter escape sequences to protect terminal \
+            emulator
         :param bool autostart: if :py:obj:`True`, machine will be started if \
             it is not running
-        :param collections.Callable notify_function: FIXME, may go away
-        :param bool passio: FIXME
-        :param bool passio_popen: if :py:obj:`True`, \
-            :py:class:`subprocess.Popen` object has connected ``stdin`` and \
-            ``stdout``
-        :param bool passio_stderr: if :py:obj:`True`, \
-            :py:class:`subprocess.Popen` has additionaly ``stderr`` connected
-        :param bool ignore_stderr: if :py:obj:`True`, ``stderr`` is connected \
-            to :file:`/dev/null`
-        :param str localcmd: local command to communicate with remote command
-        :param bool wait: if :py:obj:`True`, wait for command completion
         :param bool gui: when autostarting, also start gui daemon
-        :param bool filter_esc: filter escape sequences to protect terminal \
-            emulator
+        :rtype: asyncio.subprocess.Process
+
+        .. note::
+            User ``root`` is redefined to ``SYSTEM`` in the Windows agent code
         '''
 
+        # UNSUPPORTED from previous incarnation:
+        #   localcmd, wait, passio*, notify_function, `-e` switch
+        #
+        # - passio* and friends depend on params to command (like in stdlib)
+        # - the filter_esc is orthogonal to passio*
+        # - input: see run_service_for_stdio
+        # - wait has no purpose since this is asynchronous
+        # - notify_function is gone
+
+        source = 'dom0' if source is None else self.app.domains[source].name
+
         if user is None:
             user = self.default_user
-        null = None
-        if not self.is_running() and not self.is_paused():
-            if not autostart:
-                raise qubes.exc.QubesVMNotRunningError(self)
-
-            if notify_function is not None:
-                notify_function('info',
-                    'Starting the {!r} VM...'.format(self.name))
-            self.start(start_guid=gui, notify_function=notify_function)
 
         if self.is_paused():
             # XXX what about autostart?
             raise qubes.exc.QubesVMNotRunningError(
                 self, 'Domain {!r} is paused'.format(self.name))
+        elif not self.is_running():
+            if not autostart:
+                raise qubes.exc.QubesVMNotRunningError(self)
+            await self.start(start_guid=gui)
 
         if not self.is_qrexec_running():
             raise qubes.exc.QubesVMError(
                 self, 'Domain {!r}: qrexec not connected'.format(self.name))
 
-        self.fire_event_pre('domain-cmd-pre-run', start_guid=gui)
-
         if not self.have_session.is_set():
             raise qubes.exc.QubesVMError(self, 'don\'t have session yet')
 
-        args = [qubes.config.system_path['qrexec_client_path'],
+        self.fire_event_pre('domain-cmd-pre-run', start_guid=gui)
+
+        return await asyncio.create_subprocess_exec(
+            qubes.config.system_path['qrexec_client_path'],
             '-d', str(self.name),
-            '{}:{}'.format(user, command)]
-        if localcmd is not None:
-            args += ['-l', localcmd]
-        if filter_esc:
-            args += ['-t']
-        if os.isatty(sys.stderr.fileno()):
-            args += ['-T']
-
-        call_kwargs = {}
-        if ignore_stderr or not passio:
-            null = open("/dev/null", "r+")
-            call_kwargs['stderr'] = null
-        if not passio:
-            call_kwargs['stdin'] = null
-            call_kwargs['stdout'] = null
-
-        if passio_popen:
-            popen_kwargs = {
-                'stdout': subprocess.PIPE,
-                'stdin': subprocess.PIPE
-            }
-            if passio_stderr:
-                popen_kwargs['stderr'] = subprocess.PIPE
-            else:
-                popen_kwargs['stderr'] = call_kwargs.get('stderr', None)
-            p = subprocess.Popen(args, **popen_kwargs)
-            if null:
-                null.close()
-            return p
-        if not wait and not passio:
-            args += ["-e"]
-        retcode = subprocess.call(args, **call_kwargs)
-        if null:
-            null.close()
-        return retcode
-
-    def run_service(self, service, source=None, user=None,
-                    passio_popen=False, input=None, localcmd=None, gui=False,
-                    wait=True, passio_stderr=False):
-        '''Run service on this VM
+            *(('-t', '-T') if filter_esc else ()),
+            '{}:QUBESRPC {} {}'.format(user, service, source),
+            **kwargs)
 
-        **passio_popen** and **input** are mutually exclusive.
+    async def run_service_for_stdio(self, *args, input=None, **kwargs):
+        '''Run a service, pass an optional input and return (stdout, stderr).
 
-        :param str service: service name
-        :param qubes.vm.qubesvm.QubesVM: source domain as presented to this VM
-        :param str user: username to run service as
-        :param bool passio_popen: passed verbatim to :py:meth:`run`
-        :param str input: string passed as input to service
+        Raises an exception if return code != 0.
+
+        *args* and *kwargs* are passed verbatim to :py:meth:`run_service`.
+
+        .. warning::
+            There are some combinations if stdio-related *kwargs*, which are
+            not filtered for problems originating between the keyboard and the
+            chair.
         '''  # pylint: disable=redefined-builtin
+        p = await self.run_service(*args, **kwargs)
 
-        if len([i for i in (input, passio_popen, localcmd) if i]) > 1:
-            raise TypeError(
-                'input, passio_popen and localcmd cannot be used together')
+        # this one is actually a tuple, but there is no need to unpack it
+        stdouterr = await p.communicate(input=input)
 
-        if not wait and (localcmd or input):
-            raise ValueError("Cannot use wait=False with input or "
-                             "localcmd specified")
+        if p.returncode:
+            raise qubes.exc.QubesVMError(self,
+                'service {!r} failed with retcode {!r}; '
+                'stdout={!r} stderr={!r}'.format(
+                    args, p.returncode, *stdouterr))
 
-        if passio_stderr and not passio_popen:
-            raise TypeError('passio_stderr can be used only with passio_popen')
+        return stdouterr
 
-        if input:
-            # Internally use passio_popen, but do not return POpen object to
-            # the user - use internally for p.communicate()
-            passio_popen = True
-            passio_stderr = True
+    @staticmethod
+    def _prepare_input_for_vmshell(command, input):
+        '''Prepare shell input for the given command and optional (real) input
+        '''  # pylint: disable=redefined-builtin
+        if input is None:
+            input = b''
+        return b''.join((command.rstrip('\n').encode('utf-8'), b'\n', input))
 
-        source = 'dom0' if source is None else self.app.domains[source].name
+    def run(self, command, input=None, **kwargs):
+        '''Run a shell command inside the domain using qubes.VMShell qrexec.
 
-        p = self.run('QUBESRPC {} {}'.format(service, source),
-            localcmd=localcmd, passio_popen=passio_popen, user=user, wait=wait,
-            gui=gui, passio_stderr=passio_stderr)
-        if input:
-            p.communicate(input)
-            return p.returncode
-        else:
-            return p
+        This method is a coroutine.
+
+        *kwargs* are passed verbatim to :py:meth:`run_service`.
+        '''  # pylint: disable=redefined-builtin
+        return self.run_service('qubes.VMShell',
+            input=self._prepare_input_for_vmshell(command, input), **kwargs)
+
+    def run_for_stdio(self, command, input=None, **kwargs):
+        '''Run a shell command inside the domain using qubes.VMShell qrexec.
+
+        This method is a coroutine.
+
+        *kwargs* are passed verbatim to :py:meth:`run_service_for_stdio`.
+        See disclaimer there.
+        '''  # pylint: disable=redefined-builtin
+        return self.run_service_for_stdio('qubes.VMShell',
+            input=self._prepare_input_for_vmshell(command, input), **kwargs)
 
     def request_memory(self, mem_required=None):
         # overhead of per-qube/per-vcpu Xen structures,
@@ -1153,7 +1130,7 @@ class QubesVM(qubes.vm.mix.net.NetVMMixin, qubes.vm.BaseVM):
         :py:meth:`subprocess.check_call`)
         :param kwargs: args for :py:meth:`subprocess.check_call`
         :return: None
-        '''
+        '''  # pylint: disable=redefined-builtin
 
         if os.getuid() == 0:
             # try to always have VM daemons running as normal user, otherwise
@@ -1211,7 +1188,7 @@ class QubesVM(qubes.vm.mix.net.NetVMMixin, qubes.vm.BaseVM):
         except subprocess.CalledProcessError:
             raise qubes.exc.QubesException('Cannot execute qubesdb-daemon')
 
-    async def wait_for_session(self):
+    async def _wait_for_session(self):
         '''Wait until machine finished boot sequence.
 
         This is done by executing qubes RPC call that checks if dummy system
@@ -1220,14 +1197,14 @@ class QubesVM(qubes.vm.mix.net.NetVMMixin, qubes.vm.BaseVM):
 
         self.log.info('Waiting for qubes-session')
 
-        # Note : User root is redefined to SYSTEM in the Windows agent code
-        p = await asyncio.get_event_loop().run_in_executor(
-            functools.partial(self.run, 'QUBESRPC qubes.WaitForSession none',
-                user='root', passio_popen=True, gui=False, wait=True))
-        await asyncio.get_event_loop().run_in_executor(functools.partial(
-            p.communicate, input=self.default_user.encode()))
+        await self.run_service_for_stdio('qubes.WaitForSession',
+            user='root', gui=False, input=self.default_user.encode())
+
+        self.log.info('qubes-session acquired')
         self.have_session.set()
 
+        self.fire_event('have-session')
+
     def create_on_disk(self, pool=None, pools=None):
         '''Create files needed for VM.
         '''