diff --git a/qubesadmin/app.py b/qubesadmin/app.py index 4be23cd..010603d 100644 --- a/qubesadmin/app.py +++ b/qubesadmin/app.py @@ -1,4 +1,4 @@ -# -*- encoding: utf8 -*- +# -*- encoding: utf-8 -*- # # The Qubes OS Project, http://www.qubes-os.org # @@ -25,6 +25,7 @@ Main Qubes() class and related classes. import os import shlex import socket +import shutil import subprocess import sys @@ -38,8 +39,6 @@ import qubesadmin.utils import qubesadmin.vm import qubesadmin.config -BUF_SIZE = 4096 - class VMCollection(object): """Collection of VMs objects""" @@ -480,7 +479,8 @@ class QubesBase(qubesadmin.base.PropertyHolder): """ Execute Admin API method. - Only one of `payload` and `payload_stream` can be specified. + If `payload` and `payload_stream` are both specified, they will be sent + in that order. :param dest: Destination VM name :param method: Full API method name ('admin...') @@ -516,6 +516,46 @@ class QubesBase(qubesadmin.base.PropertyHolder): 'run_service not implemented in QubesBase class; use specialized ' 'class: qubesadmin.Qubes()') + @staticmethod + def _call_with_stream(command, payload, payload_stream): + """Helper method to pass data to qubesd. Calls a command with + payload and payload_stream as input. + + :param command: command to run + :param payload: Initial payload, or None + :param payload_stream: File-like object with the rest of data + :return: (process, stdout, stderr) + """ + + if payload: + # It's not strictly correct to write data to stdin in this way, + # because the process can get blocked on stdout or stderr pipe. + # However, in practice the output should be always smaller than 4K. + proc = subprocess.Popen( + command, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + proc.stdin.write(payload) + try: + shutil.copyfileobj(payload_stream, proc.stdin) + except BrokenPipeError: + # We might receive an error from qubesd before we sent + # everything (for instance, because we are sending too much + # data). + pass + else: + # Connect the stream directly. + proc = subprocess.Popen( + command, + stdin=payload_stream, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + + payload_stream.close() + stdout, stderr = proc.communicate() + return proc, stdout, stderr + class QubesLocal(QubesBase): """Application object communicating through local socket. @@ -530,7 +570,8 @@ class QubesLocal(QubesBase): """ Execute Admin API method. - Only one of `payload` and `payload_stream` can be specified. + If `payload` and `payload_stream` are both specified, they will be sent + in that order. :param dest: Destination VM name :param method: Full API method name ('admin...') @@ -541,9 +582,6 @@ class QubesLocal(QubesBase): .. warning:: *payload_stream* will get closed by this function """ - if payload and payload_stream: - raise ValueError( - 'Only one of payload and payload_stream can be used') if payload_stream: # payload_stream can be used for large amount of data, # so optimize for throughput, not latency: spawn actual qrexec @@ -558,11 +596,9 @@ class QubesLocal(QubesBase): 'QREXEC_REQUESTED_TARGET=' + dest, method_path, arg] if os.getuid() != 0: command.insert(0, 'sudo') - proc = subprocess.Popen(command, stdin=payload_stream, - stdout=subprocess.PIPE) - payload_stream.close() - (return_data, _) = proc.communicate() - return self._parse_qubesd_response(return_data) + (_, stdout, _) = self._call_with_stream( + command, payload, payload_stream) + return self._parse_qubesd_response(stdout) try: client_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) @@ -647,7 +683,8 @@ class QubesRemote(QubesBase): """ Execute Admin API method. - Only one of `payload` and `payload_stream` can be specified. + If `payload` and `payload_stream` are both specified, they will be sent + in that order. :param dest: Destination VM name :param method: Full API method name ('admin...') @@ -658,20 +695,20 @@ class QubesRemote(QubesBase): .. warning:: *payload_stream* will get closed by this function """ - if payload and payload_stream: - raise ValueError( - 'Only one of payload and payload_stream can be used') service_name = method if arg is not None: service_name += '+' + arg - p = subprocess.Popen([qubesadmin.config.QREXEC_CLIENT_VM, - dest, service_name], - stdin=(payload_stream or subprocess.PIPE), - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - if payload_stream is not None: - payload_stream.close() - (stdout, stderr) = p.communicate(payload) + command = [qubesadmin.config.QREXEC_CLIENT_VM, + dest, service_name] + if payload_stream: + (p, stdout, stderr) = self._call_with_stream( + command, payload, payload_stream) + else: + p = subprocess.Popen(command, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + (stdout, stderr) = p.communicate(payload) if p.returncode != 0: raise qubesadmin.exc.QubesDaemonNoResponseError( 'Service call error: %s', stderr.decode()) diff --git a/qubesadmin/storage.py b/qubesadmin/storage.py index 717e902..e4959b5 100644 --- a/qubesadmin/storage.py +++ b/qubesadmin/storage.py @@ -1,4 +1,4 @@ -# -*- encoding: utf8 -*- +# -*- encoding: utf-8 -*- # # The Qubes OS Project, http://www.qubes-os.org # @@ -20,7 +20,6 @@ '''Storage subsystem.''' - class Volume(object): '''Storage volume.''' def __init__(self, app, pool=None, vid=None, vm=None, vm_name=None): @@ -217,12 +216,26 @@ class Volume(object): def import_data(self, stream): ''' Import volume data from a given file-like object. - This function override existing volume content + This function overrides existing volume content. :param stream: file-like object, must support fileno() ''' self._qubesd_call('Import', payload_stream=stream) + def import_data_with_size(self, stream, size): + ''' Import volume data from a given file-like object, informing qubesd + that data has a specific size. + + This function overrides existing volume content. + + :param stream: file-like object, must support fileno() + :param size: size of data in bytes + ''' + size_line = str(size) + '\n' + self._qubesd_call( + 'ImportWithSize', payload=size_line.encode(), + payload_stream=stream) + def clone(self, source): ''' Clone data from sane volume of another VM. diff --git a/qubesadmin/tests/__init__.py b/qubesadmin/tests/__init__.py index d663f95..a116039 100644 --- a/qubesadmin/tests/__init__.py +++ b/qubesadmin/tests/__init__.py @@ -137,7 +137,7 @@ class QubesTest(qubesadmin.app.QubesBase): def qubesd_call(self, dest, method, arg=None, payload=None, payload_stream=None): if payload_stream: - payload = payload_stream.read() + payload = (payload or b'') + payload_stream.read() call_key = (dest, method, arg, payload) self.actual_calls.append(call_key) if call_key not in self.expected_calls: diff --git a/qubesadmin/tests/app.py b/qubesadmin/tests/app.py index 1d607cd..ed9942a 100644 --- a/qubesadmin/tests/app.py +++ b/qubesadmin/tests/app.py @@ -1,4 +1,4 @@ -# -*- encoding: utf8 -*- +# -*- encoding: utf-8 -*- # # The Qubes OS Project, http://www.qubes-os.org # @@ -642,6 +642,8 @@ class TC_20_QubesLocal(unittest.TestCase): qubesadmin.config.QUBESD_SOCKET = os.path.join(self.socket_dir, 'sock') self.proc = None self.app = qubesadmin.app.QubesLocal() + self.tmpdir = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, self.tmpdir) def listen_and_send(self, send_data): '''Listen on socket and send data in response. @@ -696,69 +698,58 @@ class TC_20_QubesLocal(unittest.TestCase): b'dom0\0some.method\0test-vm\0\0') def test_003_qubesd_call_payload_stream(self): - # this should really be in setUp()... - tmpdir = tempfile.mkdtemp() - self.addCleanup(shutil.rmtree, tmpdir) - - service_path = os.path.join(tmpdir, 'test.service') - payload_input = os.path.join(tmpdir, 'payload-input') - with open(service_path, 'w') as f: - f.write('#!/bin/bash\n' - 'env > {dir}/env\n' - 'echo "$@" > {dir}/args\n' - 'cat > {dir}/payload\n' - 'echo -en \'0\\0return-value\'\n'.format(dir=tmpdir)) - os.chmod(service_path, 0o755) - with open(payload_input, 'w+') as payload_file: - payload_file.write('some payload\n') + payload_input = os.path.join(self.tmpdir, 'payload-input') + with open(payload_input, 'w+b') as payload_file: + payload_file.write(b'some payload\n') payload_file.seek(0) - with mock.patch('qubesadmin.config.QREXEC_SERVICES_DIR', - tmpdir): - value = self.app.qubesd_call('test-vm', 'test.service', - 'some-arg', payload_stream=payload_file) - self.assertEqual(value, b'return-value') - self.assertTrue(os.path.exists(tmpdir + '/env')) - with open(tmpdir + '/env') as env: - self.assertIn('QREXEC_REMOTE_DOMAIN=dom0\n', env) - self.assertIn('QREXEC_REQUESTED_TARGET=test-vm\n', env) - self.assertTrue(os.path.exists(tmpdir + '/args')) - with open(tmpdir + '/args') as args: - self.assertEqual(args.read(), 'some-arg\n') - self.assertTrue(os.path.exists(tmpdir + '/payload')) - with open(tmpdir + '/payload') as payload: - self.assertEqual(payload.read(), 'some payload\n') + + self._call_test_service_with_payload_stream( + payload_file, expected=b'some payload\n') def test_004_qubesd_call_payload_stream_proc(self): - # this should really be in setUp()... - tmpdir = tempfile.mkdtemp() - self.addCleanup(shutil.rmtree, tmpdir) - - service_path = os.path.join(tmpdir, 'test.service') + service_path = os.path.join(self.tmpdir, 'test.service') echo = subprocess.Popen(['echo', 'some payload'], stdout=subprocess.PIPE) + self._call_test_service_with_payload_stream( + echo.stdout, expected=b'some payload\n') + + def test_005_qubesd_call_payload_stream_with_prefix(self): + payload_input = os.path.join(self.tmpdir, 'payload-input') + with open(payload_input, 'w+b') as payload_file: + payload_file.write(b'some payload\n') + payload_file.seek(0) + + self._call_test_service_with_payload_stream( + payload_file, payload=b'first line\n', + expected=b'first line\nsome payload\n') + + def _call_test_service_with_payload_stream( + self, payload_stream, payload=None, expected=b''): + service_path = os.path.join(self.tmpdir, 'test.service') with open(service_path, 'w') as f: f.write('#!/bin/bash\n' 'env > {dir}/env\n' 'echo "$@" > {dir}/args\n' 'cat > {dir}/payload\n' - 'echo -en \'0\\0return-value\'\n'.format(dir=tmpdir)) + 'echo -en \'0\\0return-value\'\n'.format(dir=self.tmpdir)) os.chmod(service_path, 0o755) + with mock.patch('qubesadmin.config.QREXEC_SERVICES_DIR', - tmpdir): - value = self.app.qubesd_call('test-vm', 'test.service', - 'some-arg', payload_stream=echo.stdout) - echo.stdout.close() + self.tmpdir): + value = self.app.qubesd_call( + 'test-vm', 'test.service', + 'some-arg', payload=payload, payload_stream=payload_stream) self.assertEqual(value, b'return-value') - self.assertTrue(os.path.exists(tmpdir + '/env')) - with open(tmpdir + '/env') as env: + self.assertTrue(os.path.exists(self.tmpdir + '/env')) + with open(self.tmpdir + '/env') as env: self.assertIn('QREXEC_REMOTE_DOMAIN=dom0\n', env) self.assertIn('QREXEC_REQUESTED_TARGET=test-vm\n', env) - self.assertTrue(os.path.exists(tmpdir + '/args')) - with open(tmpdir + '/args') as args: + self.assertTrue(os.path.exists(self.tmpdir + '/args')) + with open(self.tmpdir + '/args') as args: self.assertEqual(args.read(), 'some-arg\n') - self.assertTrue(os.path.exists(tmpdir + '/payload')) - with open(tmpdir + '/payload') as payload: - self.assertEqual(payload.read(), 'some payload\n') + self.assertTrue(os.path.exists(self.tmpdir + '/payload')) + with open(self.tmpdir + '/payload', 'rb') as payload_f: + self.assertEqual(payload_f.read(), expected) @mock.patch('os.isatty', lambda fd: fd == 2) def test_010_run_service(self): @@ -867,18 +858,43 @@ class TC_30_QubesRemote(unittest.TestCase): self.addCleanup(shutil.rmtree, tmpdir) payload_input = os.path.join(tmpdir, 'payload-input') - with open(payload_input, 'w+') as payload_file: - payload_file.write('some payload\n') + with open(payload_input, 'w+b') as payload_file: + payload_file.write(b'some payload\n') payload_file.seek(0) value = self.app.qubesd_call('test-vm', 'some.method', 'some-arg', payload_stream=payload_file) - self.assertEqual(self.proc_mock.mock_calls, [ + self.assertListEqual(self.proc_mock.mock_calls, [ + mock.call( + [qubesadmin.config.QREXEC_CLIENT_VM, 'test-vm', + 'some.method+some-arg'], + stdin=payload_file, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE), + mock.call().communicate() + ]) + self.assertEqual(value, b'return-value') + + def test_004_qubesd_call_payload_stream_with_prefix(self): + self.set_proc_stdout(b'0\0return-value') + tmpdir = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, tmpdir) + + payload_input = os.path.join(tmpdir, 'payload-input') + with open(payload_input, 'w+b') as payload_file: + payload_file.write(b'some payload\n') + payload_file.seek(0) + + value = self.app.qubesd_call('test-vm', 'some.method', + 'some-arg', payload=b'first line\n', payload_stream=payload_file) + self.assertListEqual(self.proc_mock.mock_calls, [ mock.call([qubesadmin.config.QREXEC_CLIENT_VM, 'test-vm', 'some.method+some-arg'], - stdin=payload_file, stdout=subprocess.PIPE, + stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE), - mock.call().communicate(None) + mock.call().stdin.write(b'first line\n'), + mock.call().stdin.write(b'some payload\n'), + mock.call().communicate() ]) self.assertEqual(value, b'return-value') diff --git a/qubesadmin/tests/tools/qvm_volume.py b/qubesadmin/tests/tools/qvm_volume.py index 37af3a8..a73f50c 100644 --- a/qubesadmin/tests/tools/qvm_volume.py +++ b/qubesadmin/tests/tools/qvm_volume.py @@ -1,4 +1,4 @@ -# -*- encoding: utf8 -*- +# -*- encoding: utf-8 -*- # # The Qubes OS Project, http://www.qubes-os.org # @@ -508,10 +508,7 @@ class TC_00_qvm_volume(qubesadmin.tests.QubesTestCase): b'revisions_to_keep=0\n' \ b'is_outdated=False\n' self.app.expected_calls[ - ('testvm', 'admin.vm.volume.Resize', 'private', b'9')] = \ - b'0\x00' - self.app.expected_calls[ - ('testvm', 'admin.vm.volume.Import', 'private', b'test-data')] = \ + ('testvm', 'admin.vm.volume.ImportWithSize', 'private', b'9\ntest-data')] = \ b'0\x00' with tempfile.NamedTemporaryFile() as input_file: input_file.write(b'test-data') @@ -541,10 +538,7 @@ class TC_00_qvm_volume(qubesadmin.tests.QubesTestCase): b'revisions_to_keep=0\n' \ b'is_outdated=False\n' self.app.expected_calls[ - ('testvm', 'admin.vm.volume.Resize', 'private', b'9')] = \ - b'0\x00' - self.app.expected_calls[ - ('testvm', 'admin.vm.volume.Import', 'private', b'test-data')] = \ + ('testvm', 'admin.vm.volume.ImportWithSize', 'private', b'9\ntest-data')] = \ b'0\x00' with tempfile.NamedTemporaryFile() as input_file: input_file.write(b'test-data') @@ -575,10 +569,7 @@ class TC_00_qvm_volume(qubesadmin.tests.QubesTestCase): b'revisions_to_keep=0\n' \ b'is_outdated=False\n' self.app.expected_calls[ - ('testvm', 'admin.vm.volume.Resize', 'private', b'512')] = \ - b'0\x00' - self.app.expected_calls[ - ('testvm', 'admin.vm.volume.Import', 'private', b'test-data')] = \ + ('testvm', 'admin.vm.volume.ImportWithSize', 'private', b'512\ntest-data')] = \ b'0\x00' with tempfile.NamedTemporaryFile() as input_file: input_file.write(b'test-data') @@ -638,7 +629,7 @@ class TC_00_qvm_volume(qubesadmin.tests.QubesTestCase): b'revisions_to_keep=0\n' \ b'is_outdated=False\n' self.app.expected_calls[ - ('testvm', 'admin.vm.volume.Import', 'private', b'test-data')] = \ + ('testvm', 'admin.vm.volume.ImportWithSize', 'private', b'9\ntest-data')] = \ b'0\x00' with tempfile.NamedTemporaryFile() as input_file: input_file.write(b'test-data') diff --git a/qubesadmin/tests/utils.py b/qubesadmin/tests/utils.py index f9b5509..4362aca 100644 --- a/qubesadmin/tests/utils.py +++ b/qubesadmin/tests/utils.py @@ -1,4 +1,4 @@ -# -*- encoding: utf8 -*- +# -*- encoding: utf-8 -*- # # The Qubes OS Project, http://www.qubes-os.org # diff --git a/qubesadmin/tools/qvm_volume.py b/qubesadmin/tools/qvm_volume.py index 018e9f8..5ed77d1 100644 --- a/qubesadmin/tools/qvm_volume.py +++ b/qubesadmin/tools/qvm_volume.py @@ -144,7 +144,9 @@ def import_volume(args): else: input_file = open(input_path, 'rb') try: - if not args.no_resize: + if args.no_resize: + volume.import_data(stream=input_file) + else: if args.size: size = args.size else: @@ -155,11 +157,7 @@ def import_volume(args): 'Failed to get %s file size, ' 'specify it explicitly with --size, ' 'or use --no-resize option', str(e)) - if size > old_size: - volume.resize(size) - volume.import_data(stream=input_file) - if not args.no_resize and size < old_size: - volume.resize(size) + volume.import_data_with_size(stream=input_file, size=size) finally: if input_path != '-': input_file.close()