Преглед на файлове

Add efficient method to handle large payloads for Admin API methods

Add qubesd_call(..., payload_stream=...) argument to allow streaming
payload directly from some file/process stdout. This is mainly (only?)
useful for admin.vm.volume.Import, where disk volume raw data is passed
to the service.
Marek Marczykowski-Górecki преди 7 години
родител
ревизия
0b2f7ac958
променени са 5 файла, в които са добавени 156 реда и са изтрити 7 реда
  1. 55 3
      qubesadmin/app.py
  2. 7 2
      qubesadmin/base.py
  3. 1 0
      qubesadmin/config.py
  4. 4 1
      qubesadmin/tests/__init__.py
  5. 89 1
      qubesadmin/tests/app.py

+ 55 - 3
qubesadmin/app.py

@@ -22,6 +22,7 @@
 '''
 Main Qubes() class and related classes.
 '''
+import os
 import shlex
 import socket
 import subprocess
@@ -335,7 +336,41 @@ class QubesLocal(QubesBase):
 
     qubesd_connection_type = 'socket'
 
-    def qubesd_call(self, dest, method, arg=None, payload=None):
+    def qubesd_call(self, dest, method, arg=None, payload=None,
+            payload_stream=None):
+        '''
+        Execute Admin API method.
+
+        Only one of `payload` and `payload_stream` can be specified.
+
+        :param dest: Destination VM name
+        :param method: Full API method name ('admin...')
+        :param arg: Method argument (if any)
+        :param payload: Payload send to the method
+        :param payload_stream: file-like object to read payload from
+        :return: Data returned by qubesd (string)
+        '''
+        if payload and payload_stream:
+            raise ValueError(
+                'Only one of payload and payload_stream can be used')
+        if payload_stream:
+            # payload_stream can be used for large amount of data,
+            # so optimize for throughput, not latency: spawn actual qrexec
+            # service implementation, which may use some optimization there (
+            # see admin.vm.volume.Import - actual data handling is done with dd)
+            method_path = os.path.join(
+                qubesadmin.config.QREXEC_SERVICES_DIR, method)
+            if not os.path.exists(method_path):
+                raise qubesadmin.exc.QubesDaemonCommunicationError(
+                    '{} not found'.format(method_path))
+            qrexec_call_env = os.environ.copy()
+            qrexec_call_env['QREXEC_REMOTE_DOMAIN'] = 'dom0'
+            qrexec_call_env['QREXEC_REQUESTED_TARGET'] = dest
+            proc = subprocess.Popen([method_path, arg], stdin=payload_stream,
+                stdout=subprocess.PIPE, env=qrexec_call_env)
+            (return_data, _) = proc.communicate()
+            return self._parse_qubesd_response(return_data)
+
         try:
             client_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
             client_socket.connect(qubesadmin.config.QUBESD_SOCKET)
@@ -406,13 +441,30 @@ class QubesRemote(QubesBase):
 
     qubesd_connection_type = 'qrexec'
 
-    def qubesd_call(self, dest, method, arg=None, payload=None):
+    def qubesd_call(self, dest, method, arg=None, payload=None,
+            payload_stream=None):
+        '''
+        Execute Admin API method.
+
+        Only one of `payload` and `payload_stream` can be specified.
+
+        :param dest: Destination VM name
+        :param method: Full API method name ('admin...')
+        :param arg: Method argument (if any)
+        :param payload: Payload send to the method
+        :param payload_stream: file-like object to read payload from
+        :return: Data returned by qubesd (string)
+        '''
+        if payload and payload_stream:
+            raise ValueError(
+                'Only one of payload and payload_stream can be used')
         service_name = method
         if arg is not None:
             service_name += '+' + arg
         p = subprocess.Popen([qubesadmin.config.QREXEC_CLIENT_VM,
             dest, service_name],
-            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
+            stdin=(payload_stream or subprocess.PIPE),
+            stdout=subprocess.PIPE,
             stderr=subprocess.PIPE)
         (stdout, stderr) = p.communicate(payload)
         if p.returncode != 0:

+ 7 - 2
qubesadmin/base.py

@@ -46,22 +46,27 @@ class PropertyHolder(object):
         self._properties = None
         self._properties_help = None
 
-    def qubesd_call(self, dest, method, arg=None, payload=None):
+    def qubesd_call(self, dest, method, arg=None, payload=None,
+            payload_stream=None):
         '''
         Call into qubesd using appropriate mechanism. This method should be
         defined by a subclass.
 
+        Only one of `payload` and `payload_stream` can be specified.
+
         :param dest: Destination VM name
         :param method: Full API method name ('admin...')
         :param arg: Method argument (if any)
         :param payload: Payload send to the method
+        :param payload_stream: file-like object to read payload from
         :return: Data returned by qubesd (string)
         '''
         if dest is None:
             dest = self._method_dest
         # have the actual implementation at Qubes() instance
         if self.app:
-            return self.app.qubesd_call(dest, method, arg, payload)
+            return self.app.qubesd_call(dest, method, arg, payload,
+                payload_stream)
         raise NotImplementedError
 
     @staticmethod

+ 1 - 0
qubesadmin/config.py

@@ -25,3 +25,4 @@ QUBESD_SOCKET = '/var/run/qubesd.sock'
 QREXEC_CLIENT = '/usr/lib/qubes/qrexec-client'
 QREXEC_CLIENT_VM = '/usr/bin/qrexec-client-vm'
 QUBESD_RECONNECT_DELAY = 1.0
+QREXEC_SERVICES_DIR = '/etc/qubes-rpc'

+ 4 - 1
qubesadmin/tests/__init__.py

@@ -125,7 +125,10 @@ class QubesTest(qubesadmin.app.QubesBase):
         #: rpc service calls
         self.service_calls = []
 
-    def qubesd_call(self, dest, method, arg=None, payload=None):
+    def qubesd_call(self, dest, method, arg=None, payload=None,
+            payload_stream=None):
+        if payload_stream:
+            payload = payload_stream.read()
         call_key = (dest, method, arg, payload)
         self.actual_calls.append(call_key)
         if call_key not in self.expected_calls:

+ 89 - 1
qubesadmin/tests/app.py

@@ -21,7 +21,6 @@ import os
 import shutil
 import socket
 import subprocess
-import tempfile
 import unittest
 
 import multiprocessing
@@ -31,6 +30,8 @@ try:
 except ImportError:
     import mock
 
+import tempfile
+
 import qubesadmin.tests
 
 
@@ -257,6 +258,72 @@ class TC_20_QubesLocal(unittest.TestCase):
         self.assertEqual(self.get_request(),
             b'dom0\0some.method\0test-vm\0\0')
 
+    def test_003_qubesd_call_payload_stream(self):
+        # this should really be in setUp()...
+        tmpdir = tempfile.mkdtemp()
+        self.addCleanup(shutil.rmtree, tmpdir)
+
+        service_path = os.path.join(tmpdir, 'test.service')
+        payload_input = os.path.join(tmpdir, 'payload-input')
+        with open(service_path, 'w') as f:
+            f.write('#!/bin/bash\n'
+                    'env > {dir}/env\n'
+                    'echo "$@" > {dir}/args\n'
+                    'cat > {dir}/payload\n'
+                    'echo -en \'0\\0return-value\'\n'.format(dir=tmpdir))
+        os.chmod(service_path, 0o755)
+        with open(payload_input, 'w+') as payload_file:
+            payload_file.write('some payload\n')
+            payload_file.seek(0)
+            with mock.patch('qubesadmin.config.QREXEC_SERVICES_DIR',
+                    tmpdir):
+                value = self.app.qubesd_call('test-vm', 'test.service',
+                    'some-arg', payload_stream=payload_file)
+        self.assertEqual(value, b'return-value')
+        self.assertTrue(os.path.exists(tmpdir + '/env'))
+        with open(tmpdir + '/env') as env:
+            self.assertIn('QREXEC_REMOTE_DOMAIN=dom0\n', env)
+            self.assertIn('QREXEC_REQUESTED_TARGET=test-vm\n', env)
+        self.assertTrue(os.path.exists(tmpdir + '/args'))
+        with open(tmpdir + '/args') as args:
+            self.assertEqual(args.read(), 'some-arg\n')
+        self.assertTrue(os.path.exists(tmpdir + '/payload'))
+        with open(tmpdir + '/payload') as payload:
+            self.assertEqual(payload.read(), 'some payload\n')
+
+    def test_004_qubesd_call_payload_stream_proc(self):
+        # this should really be in setUp()...
+        tmpdir = tempfile.mkdtemp()
+        self.addCleanup(shutil.rmtree, tmpdir)
+
+        service_path = os.path.join(tmpdir, 'test.service')
+        echo = subprocess.Popen(['echo', 'some payload'],
+            stdout=subprocess.PIPE)
+        with open(service_path, 'w') as f:
+            f.write('#!/bin/bash\n'
+                    'env > {dir}/env\n'
+                    'echo "$@" > {dir}/args\n'
+                    'cat > {dir}/payload\n'
+                    'echo -en \'0\\0return-value\'\n'.format(dir=tmpdir))
+        os.chmod(service_path, 0o755)
+        with mock.patch('qubesadmin.config.QREXEC_SERVICES_DIR',
+                tmpdir):
+            value = self.app.qubesd_call('test-vm', 'test.service',
+                'some-arg', payload_stream=echo.stdout)
+        echo.stdout.close()
+        self.assertEqual(value, b'return-value')
+        self.assertTrue(os.path.exists(tmpdir + '/env'))
+        with open(tmpdir + '/env') as env:
+            self.assertIn('QREXEC_REMOTE_DOMAIN=dom0\n', env)
+            self.assertIn('QREXEC_REQUESTED_TARGET=test-vm\n', env)
+        self.assertTrue(os.path.exists(tmpdir + '/args'))
+        with open(tmpdir + '/args') as args:
+            self.assertEqual(args.read(), 'some-arg\n')
+        self.assertTrue(os.path.exists(tmpdir + '/payload'))
+        with open(tmpdir + '/payload') as payload:
+            self.assertEqual(payload.read(), 'some payload\n')
+
+
     def test_010_run_service(self):
         self.listen_and_send(b'0\0')
         with mock.patch('subprocess.Popen') as mock_proc:
@@ -356,6 +423,27 @@ class TC_30_QubesRemote(unittest.TestCase):
             mock.call().communicate(None)
         ])
 
+    def test_003_qubesd_call_payload_stream(self):
+        self.set_proc_stdout(b'0\0return-value')
+        tmpdir = tempfile.mkdtemp()
+        self.addCleanup(shutil.rmtree, tmpdir)
+
+        payload_input = os.path.join(tmpdir, 'payload-input')
+        with open(payload_input, 'w+') as payload_file:
+            payload_file.write('some payload\n')
+            payload_file.seek(0)
+
+            value = self.app.qubesd_call('test-vm', 'some.method',
+                'some-arg', payload_stream=payload_file)
+        self.assertEqual(self.proc_mock.mock_calls, [
+            mock.call([qubesadmin.config.QREXEC_CLIENT_VM, 'test-vm',
+                'some.method+some-arg'],
+                stdin=payload_file, stdout=subprocess.PIPE,
+                stderr=subprocess.PIPE),
+            mock.call().communicate(None)
+        ])
+        self.assertEqual(value, b'return-value')
+
     def test_010_run_service(self):
         self.app.run_service('some-vm', 'service.name')
         self.proc_mock.assert_called_once_with([