tools/qvm-run: fix handling copying stdin to the process
Launch stdin copy loop in a separate process (multiprocessing.Process) and terminate it when target process is terminated. Another idea here was threads, but there is no API to kill a thread waiting on read().
This commit is contained in:
		
							parent
							
								
									8f7b0009db
								
							
						
					
					
						commit
						941b553b81
					
				@ -21,14 +21,12 @@
 | 
			
		||||
''' qvm-run tool'''
 | 
			
		||||
 | 
			
		||||
import os
 | 
			
		||||
import signal
 | 
			
		||||
import sys
 | 
			
		||||
 | 
			
		||||
import asyncio
 | 
			
		||||
 | 
			
		||||
import functools
 | 
			
		||||
import subprocess
 | 
			
		||||
 | 
			
		||||
import multiprocessing
 | 
			
		||||
 | 
			
		||||
import qubesadmin.tools
 | 
			
		||||
import qubesadmin.exc
 | 
			
		||||
 | 
			
		||||
@ -93,30 +91,12 @@ parser.add_argument('--service',
 | 
			
		||||
parser.add_argument('cmd', metavar='COMMAND',
 | 
			
		||||
    help='command to run')
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class DataCopyProtocol(asyncio.Protocol):
 | 
			
		||||
    '''Simple protocol to copy received data into another stream'''
 | 
			
		||||
 | 
			
		||||
    def __init__(self, target_stream, eof_callback=None):
 | 
			
		||||
        self.target_stream = target_stream
 | 
			
		||||
        self.eof_callback = eof_callback
 | 
			
		||||
 | 
			
		||||
    def data_received(self, data):
 | 
			
		||||
        '''Handle received data'''
 | 
			
		||||
        self.target_stream.write(data)
 | 
			
		||||
        self.target_stream.flush()
 | 
			
		||||
 | 
			
		||||
    def eof_received(self):
 | 
			
		||||
        '''Handle received EOF'''
 | 
			
		||||
        if self.eof_callback:
 | 
			
		||||
            self.eof_callback()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def stop_loop_if_terminated(proc, loop):
 | 
			
		||||
    '''Stop event loop if given process is terminated'''
 | 
			
		||||
    if proc.poll():
 | 
			
		||||
        loop.stop()
 | 
			
		||||
 | 
			
		||||
def copy_stdin(stream):
 | 
			
		||||
    # multiprocessing.Process have sys.stdin connected to /dev/null
 | 
			
		||||
    stdin = open(0)
 | 
			
		||||
    for data in iter(lambda: stdin.buffer.read(4096), b''):
 | 
			
		||||
        stream.write(data)
 | 
			
		||||
    stream.close()
 | 
			
		||||
 | 
			
		||||
def main(args=None, app=None):
 | 
			
		||||
    '''Main function of qvm-run tool'''
 | 
			
		||||
@ -161,6 +141,7 @@ def main(args=None, app=None):
 | 
			
		||||
    if args.color_stderr:
 | 
			
		||||
        sys.stderr.write('\033[0;{}m'.format(args.color_stderr))
 | 
			
		||||
        sys.stderr.flush()
 | 
			
		||||
    copy_proc = None
 | 
			
		||||
    try:
 | 
			
		||||
        procs = []
 | 
			
		||||
        for vm in args.domains:
 | 
			
		||||
@ -194,16 +175,10 @@ def main(args=None, app=None):
 | 
			
		||||
                    proc.stdin.write(vm.prepare_input_for_vmshell(args.cmd))
 | 
			
		||||
                    proc.stdin.flush()
 | 
			
		||||
                if args.passio and not args.localcmd:
 | 
			
		||||
                    loop = asyncio.new_event_loop()
 | 
			
		||||
                    loop.add_signal_handler(signal.SIGCHLD,
 | 
			
		||||
                        functools.partial(stop_loop_if_terminated, proc, loop))
 | 
			
		||||
                    asyncio.ensure_future(loop.connect_read_pipe(
 | 
			
		||||
                        functools.partial(DataCopyProtocol, proc.stdin,
 | 
			
		||||
                            loop.stop),
 | 
			
		||||
                        sys.stdin), loop=loop)
 | 
			
		||||
                    stop_loop_if_terminated(proc, loop)
 | 
			
		||||
                    loop.run_forever()
 | 
			
		||||
                    loop.close()
 | 
			
		||||
                    copy_proc = multiprocessing.Process(target=copy_stdin,
 | 
			
		||||
                        args=(proc.stdin,))
 | 
			
		||||
                    copy_proc.start()
 | 
			
		||||
                    # keep the copying process running
 | 
			
		||||
                proc.stdin.close()
 | 
			
		||||
                procs.append(proc)
 | 
			
		||||
            except qubesadmin.exc.QubesException as e:
 | 
			
		||||
@ -221,6 +196,8 @@ def main(args=None, app=None):
 | 
			
		||||
        if args.color_stderr:
 | 
			
		||||
            sys.stderr.write('\033[0m')
 | 
			
		||||
            sys.stderr.flush()
 | 
			
		||||
        if copy_proc is not None:
 | 
			
		||||
            copy_proc.terminate()
 | 
			
		||||
 | 
			
		||||
    return retcode
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
		Loading…
	
		Reference in New Issue
	
	Block a user