From 0b2f7ac958590cf5cb0167e68206c6edc1620b12 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marek=20Marczykowski-G=C3=B3recki?= Date: Wed, 24 May 2017 04:24:22 +0200 Subject: [PATCH] Add efficient method to handle large payloads for Admin API methods Add qubesd_call(..., payload_stream=...) argument to allow streaming payload directly from some file/process stdout. This is mainly (only?) useful for admin.vm.volume.Import, where disk volume raw data is passed to the service. --- qubesadmin/app.py | 58 +++++++++++++++++++++-- qubesadmin/base.py | 9 +++- qubesadmin/config.py | 1 + qubesadmin/tests/__init__.py | 5 +- qubesadmin/tests/app.py | 90 +++++++++++++++++++++++++++++++++++- 5 files changed, 156 insertions(+), 7 deletions(-) diff --git a/qubesadmin/app.py b/qubesadmin/app.py index 5aae294..83fc059 100644 --- a/qubesadmin/app.py +++ b/qubesadmin/app.py @@ -22,6 +22,7 @@ ''' Main Qubes() class and related classes. ''' +import os import shlex import socket import subprocess @@ -335,7 +336,41 @@ class QubesLocal(QubesBase): qubesd_connection_type = 'socket' - def qubesd_call(self, dest, method, arg=None, payload=None): + def qubesd_call(self, dest, method, arg=None, payload=None, + payload_stream=None): + ''' + Execute Admin API method. + + Only one of `payload` and `payload_stream` can be specified. + + :param dest: Destination VM name + :param method: Full API method name ('admin...') + :param arg: Method argument (if any) + :param payload: Payload send to the method + :param payload_stream: file-like object to read payload from + :return: Data returned by qubesd (string) + ''' + if payload and payload_stream: + raise ValueError( + 'Only one of payload and payload_stream can be used') + if payload_stream: + # payload_stream can be used for large amount of data, + # so optimize for throughput, not latency: spawn actual qrexec + # service implementation, which may use some optimization there ( + # see admin.vm.volume.Import - actual data handling is done with dd) + method_path = os.path.join( + qubesadmin.config.QREXEC_SERVICES_DIR, method) + if not os.path.exists(method_path): + raise qubesadmin.exc.QubesDaemonCommunicationError( + '{} not found'.format(method_path)) + qrexec_call_env = os.environ.copy() + qrexec_call_env['QREXEC_REMOTE_DOMAIN'] = 'dom0' + qrexec_call_env['QREXEC_REQUESTED_TARGET'] = dest + proc = subprocess.Popen([method_path, arg], stdin=payload_stream, + stdout=subprocess.PIPE, env=qrexec_call_env) + (return_data, _) = proc.communicate() + return self._parse_qubesd_response(return_data) + try: client_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) client_socket.connect(qubesadmin.config.QUBESD_SOCKET) @@ -406,13 +441,30 @@ class QubesRemote(QubesBase): qubesd_connection_type = 'qrexec' - def qubesd_call(self, dest, method, arg=None, payload=None): + def qubesd_call(self, dest, method, arg=None, payload=None, + payload_stream=None): + ''' + Execute Admin API method. + + Only one of `payload` and `payload_stream` can be specified. + + :param dest: Destination VM name + :param method: Full API method name ('admin...') + :param arg: Method argument (if any) + :param payload: Payload send to the method + :param payload_stream: file-like object to read payload from + :return: Data returned by qubesd (string) + ''' + if payload and payload_stream: + raise ValueError( + 'Only one of payload and payload_stream can be used') service_name = method if arg is not None: service_name += '+' + arg p = subprocess.Popen([qubesadmin.config.QREXEC_CLIENT_VM, dest, service_name], - stdin=subprocess.PIPE, stdout=subprocess.PIPE, + stdin=(payload_stream or subprocess.PIPE), + stdout=subprocess.PIPE, stderr=subprocess.PIPE) (stdout, stderr) = p.communicate(payload) if p.returncode != 0: diff --git a/qubesadmin/base.py b/qubesadmin/base.py index f3d8f3d..2f680aa 100644 --- a/qubesadmin/base.py +++ b/qubesadmin/base.py @@ -46,22 +46,27 @@ class PropertyHolder(object): self._properties = None self._properties_help = None - def qubesd_call(self, dest, method, arg=None, payload=None): + def qubesd_call(self, dest, method, arg=None, payload=None, + payload_stream=None): ''' Call into qubesd using appropriate mechanism. This method should be defined by a subclass. + Only one of `payload` and `payload_stream` can be specified. + :param dest: Destination VM name :param method: Full API method name ('admin...') :param arg: Method argument (if any) :param payload: Payload send to the method + :param payload_stream: file-like object to read payload from :return: Data returned by qubesd (string) ''' if dest is None: dest = self._method_dest # have the actual implementation at Qubes() instance if self.app: - return self.app.qubesd_call(dest, method, arg, payload) + return self.app.qubesd_call(dest, method, arg, payload, + payload_stream) raise NotImplementedError @staticmethod diff --git a/qubesadmin/config.py b/qubesadmin/config.py index 4a08944..b0788df 100644 --- a/qubesadmin/config.py +++ b/qubesadmin/config.py @@ -25,3 +25,4 @@ QUBESD_SOCKET = '/var/run/qubesd.sock' QREXEC_CLIENT = '/usr/lib/qubes/qrexec-client' QREXEC_CLIENT_VM = '/usr/bin/qrexec-client-vm' QUBESD_RECONNECT_DELAY = 1.0 +QREXEC_SERVICES_DIR = '/etc/qubes-rpc' diff --git a/qubesadmin/tests/__init__.py b/qubesadmin/tests/__init__.py index 42ad41a..a460a59 100644 --- a/qubesadmin/tests/__init__.py +++ b/qubesadmin/tests/__init__.py @@ -125,7 +125,10 @@ class QubesTest(qubesadmin.app.QubesBase): #: rpc service calls self.service_calls = [] - def qubesd_call(self, dest, method, arg=None, payload=None): + def qubesd_call(self, dest, method, arg=None, payload=None, + payload_stream=None): + if payload_stream: + payload = payload_stream.read() call_key = (dest, method, arg, payload) self.actual_calls.append(call_key) if call_key not in self.expected_calls: diff --git a/qubesadmin/tests/app.py b/qubesadmin/tests/app.py index 02ff4a3..7384eaf 100644 --- a/qubesadmin/tests/app.py +++ b/qubesadmin/tests/app.py @@ -21,7 +21,6 @@ import os import shutil import socket import subprocess -import tempfile import unittest import multiprocessing @@ -31,6 +30,8 @@ try: except ImportError: import mock +import tempfile + import qubesadmin.tests @@ -257,6 +258,72 @@ class TC_20_QubesLocal(unittest.TestCase): self.assertEqual(self.get_request(), b'dom0\0some.method\0test-vm\0\0') + def test_003_qubesd_call_payload_stream(self): + # this should really be in setUp()... + tmpdir = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, tmpdir) + + service_path = os.path.join(tmpdir, 'test.service') + payload_input = os.path.join(tmpdir, 'payload-input') + with open(service_path, 'w') as f: + f.write('#!/bin/bash\n' + 'env > {dir}/env\n' + 'echo "$@" > {dir}/args\n' + 'cat > {dir}/payload\n' + 'echo -en \'0\\0return-value\'\n'.format(dir=tmpdir)) + os.chmod(service_path, 0o755) + with open(payload_input, 'w+') as payload_file: + payload_file.write('some payload\n') + payload_file.seek(0) + with mock.patch('qubesadmin.config.QREXEC_SERVICES_DIR', + tmpdir): + value = self.app.qubesd_call('test-vm', 'test.service', + 'some-arg', payload_stream=payload_file) + self.assertEqual(value, b'return-value') + self.assertTrue(os.path.exists(tmpdir + '/env')) + with open(tmpdir + '/env') as env: + self.assertIn('QREXEC_REMOTE_DOMAIN=dom0\n', env) + self.assertIn('QREXEC_REQUESTED_TARGET=test-vm\n', env) + self.assertTrue(os.path.exists(tmpdir + '/args')) + with open(tmpdir + '/args') as args: + self.assertEqual(args.read(), 'some-arg\n') + self.assertTrue(os.path.exists(tmpdir + '/payload')) + with open(tmpdir + '/payload') as payload: + self.assertEqual(payload.read(), 'some payload\n') + + def test_004_qubesd_call_payload_stream_proc(self): + # this should really be in setUp()... + tmpdir = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, tmpdir) + + service_path = os.path.join(tmpdir, 'test.service') + echo = subprocess.Popen(['echo', 'some payload'], + stdout=subprocess.PIPE) + with open(service_path, 'w') as f: + f.write('#!/bin/bash\n' + 'env > {dir}/env\n' + 'echo "$@" > {dir}/args\n' + 'cat > {dir}/payload\n' + 'echo -en \'0\\0return-value\'\n'.format(dir=tmpdir)) + os.chmod(service_path, 0o755) + with mock.patch('qubesadmin.config.QREXEC_SERVICES_DIR', + tmpdir): + value = self.app.qubesd_call('test-vm', 'test.service', + 'some-arg', payload_stream=echo.stdout) + echo.stdout.close() + self.assertEqual(value, b'return-value') + self.assertTrue(os.path.exists(tmpdir + '/env')) + with open(tmpdir + '/env') as env: + self.assertIn('QREXEC_REMOTE_DOMAIN=dom0\n', env) + self.assertIn('QREXEC_REQUESTED_TARGET=test-vm\n', env) + self.assertTrue(os.path.exists(tmpdir + '/args')) + with open(tmpdir + '/args') as args: + self.assertEqual(args.read(), 'some-arg\n') + self.assertTrue(os.path.exists(tmpdir + '/payload')) + with open(tmpdir + '/payload') as payload: + self.assertEqual(payload.read(), 'some payload\n') + + def test_010_run_service(self): self.listen_and_send(b'0\0') with mock.patch('subprocess.Popen') as mock_proc: @@ -356,6 +423,27 @@ class TC_30_QubesRemote(unittest.TestCase): mock.call().communicate(None) ]) + def test_003_qubesd_call_payload_stream(self): + self.set_proc_stdout(b'0\0return-value') + tmpdir = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, tmpdir) + + payload_input = os.path.join(tmpdir, 'payload-input') + with open(payload_input, 'w+') as payload_file: + payload_file.write('some payload\n') + payload_file.seek(0) + + value = self.app.qubesd_call('test-vm', 'some.method', + 'some-arg', payload_stream=payload_file) + self.assertEqual(self.proc_mock.mock_calls, [ + mock.call([qubesadmin.config.QREXEC_CLIENT_VM, 'test-vm', + 'some.method+some-arg'], + stdin=payload_file, stdout=subprocess.PIPE, + stderr=subprocess.PIPE), + mock.call().communicate(None) + ]) + self.assertEqual(value, b'return-value') + def test_010_run_service(self): self.app.run_service('some-vm', 'service.name') self.proc_mock.assert_called_once_with([