Import data using ImportWithSize call, not manual resizing

See the API call in QubesOS/qubes-core-admin#309.
This commit is contained in:
Pawel Marczewski 2020-01-20 10:38:52 +01:00
parent 204c33afd1
commit 9cf05e5180
No known key found for this signature in database
GPG Key ID: DE42EE9B14F96465
7 changed files with 158 additions and 103 deletions

View File

@ -1,4 +1,4 @@
# -*- encoding: utf8 -*-
# -*- encoding: utf-8 -*-
#
# The Qubes OS Project, http://www.qubes-os.org
#
@ -25,6 +25,7 @@ Main Qubes() class and related classes.
import os
import shlex
import socket
import shutil
import subprocess
import sys
@ -38,8 +39,6 @@ import qubesadmin.utils
import qubesadmin.vm
import qubesadmin.config
BUF_SIZE = 4096
class VMCollection(object):
"""Collection of VMs objects"""
@ -480,7 +479,8 @@ class QubesBase(qubesadmin.base.PropertyHolder):
"""
Execute Admin API method.
Only one of `payload` and `payload_stream` can be specified.
If `payload` and `payload_stream` are both specified, they will be sent
in that order.
:param dest: Destination VM name
:param method: Full API method name ('admin...')
@ -516,6 +516,46 @@ class QubesBase(qubesadmin.base.PropertyHolder):
'run_service not implemented in QubesBase class; use specialized '
'class: qubesadmin.Qubes()')
@staticmethod
def _call_with_stream(command, payload, payload_stream):
"""Helper method to pass data to qubesd. Calls a command with
payload and payload_stream as input.
:param command: command to run
:param payload: Initial payload, or None
:param payload_stream: File-like object with the rest of data
:return: (process, stdout, stderr)
"""
if payload:
# It's not strictly correct to write data to stdin in this way,
# because the process can get blocked on stdout or stderr pipe.
# However, in practice the output should be always smaller than 4K.
proc = subprocess.Popen(
command,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
proc.stdin.write(payload)
try:
shutil.copyfileobj(payload_stream, proc.stdin)
except BrokenPipeError:
# We might receive an error from qubesd before we sent
# everything (for instance, because we are sending too much
# data).
pass
else:
# Connect the stream directly.
proc = subprocess.Popen(
command,
stdin=payload_stream,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
payload_stream.close()
stdout, stderr = proc.communicate()
return proc, stdout, stderr
class QubesLocal(QubesBase):
"""Application object communicating through local socket.
@ -530,7 +570,8 @@ class QubesLocal(QubesBase):
"""
Execute Admin API method.
Only one of `payload` and `payload_stream` can be specified.
If `payload` and `payload_stream` are both specified, they will be sent
in that order.
:param dest: Destination VM name
:param method: Full API method name ('admin...')
@ -541,9 +582,6 @@ class QubesLocal(QubesBase):
.. warning:: *payload_stream* will get closed by this function
"""
if payload and payload_stream:
raise ValueError(
'Only one of payload and payload_stream can be used')
if payload_stream:
# payload_stream can be used for large amount of data,
# so optimize for throughput, not latency: spawn actual qrexec
@ -558,11 +596,9 @@ class QubesLocal(QubesBase):
'QREXEC_REQUESTED_TARGET=' + dest, method_path, arg]
if os.getuid() != 0:
command.insert(0, 'sudo')
proc = subprocess.Popen(command, stdin=payload_stream,
stdout=subprocess.PIPE)
payload_stream.close()
(return_data, _) = proc.communicate()
return self._parse_qubesd_response(return_data)
(_, stdout, _) = self._call_with_stream(
command, payload, payload_stream)
return self._parse_qubesd_response(stdout)
try:
client_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
@ -647,7 +683,8 @@ class QubesRemote(QubesBase):
"""
Execute Admin API method.
Only one of `payload` and `payload_stream` can be specified.
If `payload` and `payload_stream` are both specified, they will be sent
in that order.
:param dest: Destination VM name
:param method: Full API method name ('admin...')
@ -658,20 +695,20 @@ class QubesRemote(QubesBase):
.. warning:: *payload_stream* will get closed by this function
"""
if payload and payload_stream:
raise ValueError(
'Only one of payload and payload_stream can be used')
service_name = method
if arg is not None:
service_name += '+' + arg
p = subprocess.Popen([qubesadmin.config.QREXEC_CLIENT_VM,
dest, service_name],
stdin=(payload_stream or subprocess.PIPE),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
if payload_stream is not None:
payload_stream.close()
(stdout, stderr) = p.communicate(payload)
command = [qubesadmin.config.QREXEC_CLIENT_VM,
dest, service_name]
if payload_stream:
(p, stdout, stderr) = self._call_with_stream(
command, payload, payload_stream)
else:
p = subprocess.Popen(command,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
(stdout, stderr) = p.communicate(payload)
if p.returncode != 0:
raise qubesadmin.exc.QubesDaemonNoResponseError(
'Service call error: %s', stderr.decode())

View File

@ -1,4 +1,4 @@
# -*- encoding: utf8 -*-
# -*- encoding: utf-8 -*-
#
# The Qubes OS Project, http://www.qubes-os.org
#
@ -20,7 +20,6 @@
'''Storage subsystem.'''
class Volume(object):
'''Storage volume.'''
def __init__(self, app, pool=None, vid=None, vm=None, vm_name=None):
@ -217,12 +216,26 @@ class Volume(object):
def import_data(self, stream):
''' Import volume data from a given file-like object.
This function override existing volume content
This function overrides existing volume content.
:param stream: file-like object, must support fileno()
'''
self._qubesd_call('Import', payload_stream=stream)
def import_data_with_size(self, stream, size):
''' Import volume data from a given file-like object, informing qubesd
that data has a specific size.
This function overrides existing volume content.
:param stream: file-like object, must support fileno()
:param size: size of data in bytes
'''
size_line = str(size) + '\n'
self._qubesd_call(
'ImportWithSize', payload=size_line.encode(),
payload_stream=stream)
def clone(self, source):
''' Clone data from sane volume of another VM.

View File

@ -137,7 +137,7 @@ class QubesTest(qubesadmin.app.QubesBase):
def qubesd_call(self, dest, method, arg=None, payload=None,
payload_stream=None):
if payload_stream:
payload = payload_stream.read()
payload = (payload or b'') + payload_stream.read()
call_key = (dest, method, arg, payload)
self.actual_calls.append(call_key)
if call_key not in self.expected_calls:

View File

@ -1,4 +1,4 @@
# -*- encoding: utf8 -*-
# -*- encoding: utf-8 -*-
#
# The Qubes OS Project, http://www.qubes-os.org
#
@ -642,6 +642,8 @@ class TC_20_QubesLocal(unittest.TestCase):
qubesadmin.config.QUBESD_SOCKET = os.path.join(self.socket_dir, 'sock')
self.proc = None
self.app = qubesadmin.app.QubesLocal()
self.tmpdir = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, self.tmpdir)
def listen_and_send(self, send_data):
'''Listen on socket and send data in response.
@ -696,69 +698,58 @@ class TC_20_QubesLocal(unittest.TestCase):
b'dom0\0some.method\0test-vm\0\0')
def test_003_qubesd_call_payload_stream(self):
# this should really be in setUp()...
tmpdir = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, tmpdir)
service_path = os.path.join(tmpdir, 'test.service')
payload_input = os.path.join(tmpdir, 'payload-input')
with open(service_path, 'w') as f:
f.write('#!/bin/bash\n'
'env > {dir}/env\n'
'echo "$@" > {dir}/args\n'
'cat > {dir}/payload\n'
'echo -en \'0\\0return-value\'\n'.format(dir=tmpdir))
os.chmod(service_path, 0o755)
with open(payload_input, 'w+') as payload_file:
payload_file.write('some payload\n')
payload_input = os.path.join(self.tmpdir, 'payload-input')
with open(payload_input, 'w+b') as payload_file:
payload_file.write(b'some payload\n')
payload_file.seek(0)
with mock.patch('qubesadmin.config.QREXEC_SERVICES_DIR',
tmpdir):
value = self.app.qubesd_call('test-vm', 'test.service',
'some-arg', payload_stream=payload_file)
self.assertEqual(value, b'return-value')
self.assertTrue(os.path.exists(tmpdir + '/env'))
with open(tmpdir + '/env') as env:
self.assertIn('QREXEC_REMOTE_DOMAIN=dom0\n', env)
self.assertIn('QREXEC_REQUESTED_TARGET=test-vm\n', env)
self.assertTrue(os.path.exists(tmpdir + '/args'))
with open(tmpdir + '/args') as args:
self.assertEqual(args.read(), 'some-arg\n')
self.assertTrue(os.path.exists(tmpdir + '/payload'))
with open(tmpdir + '/payload') as payload:
self.assertEqual(payload.read(), 'some payload\n')
self._call_test_service_with_payload_stream(
payload_file, expected=b'some payload\n')
def test_004_qubesd_call_payload_stream_proc(self):
# this should really be in setUp()...
tmpdir = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, tmpdir)
service_path = os.path.join(tmpdir, 'test.service')
service_path = os.path.join(self.tmpdir, 'test.service')
echo = subprocess.Popen(['echo', 'some payload'],
stdout=subprocess.PIPE)
self._call_test_service_with_payload_stream(
echo.stdout, expected=b'some payload\n')
def test_005_qubesd_call_payload_stream_with_prefix(self):
payload_input = os.path.join(self.tmpdir, 'payload-input')
with open(payload_input, 'w+b') as payload_file:
payload_file.write(b'some payload\n')
payload_file.seek(0)
self._call_test_service_with_payload_stream(
payload_file, payload=b'first line\n',
expected=b'first line\nsome payload\n')
def _call_test_service_with_payload_stream(
self, payload_stream, payload=None, expected=b''):
service_path = os.path.join(self.tmpdir, 'test.service')
with open(service_path, 'w') as f:
f.write('#!/bin/bash\n'
'env > {dir}/env\n'
'echo "$@" > {dir}/args\n'
'cat > {dir}/payload\n'
'echo -en \'0\\0return-value\'\n'.format(dir=tmpdir))
'echo -en \'0\\0return-value\'\n'.format(dir=self.tmpdir))
os.chmod(service_path, 0o755)
with mock.patch('qubesadmin.config.QREXEC_SERVICES_DIR',
tmpdir):
value = self.app.qubesd_call('test-vm', 'test.service',
'some-arg', payload_stream=echo.stdout)
echo.stdout.close()
self.tmpdir):
value = self.app.qubesd_call(
'test-vm', 'test.service',
'some-arg', payload=payload, payload_stream=payload_stream)
self.assertEqual(value, b'return-value')
self.assertTrue(os.path.exists(tmpdir + '/env'))
with open(tmpdir + '/env') as env:
self.assertTrue(os.path.exists(self.tmpdir + '/env'))
with open(self.tmpdir + '/env') as env:
self.assertIn('QREXEC_REMOTE_DOMAIN=dom0\n', env)
self.assertIn('QREXEC_REQUESTED_TARGET=test-vm\n', env)
self.assertTrue(os.path.exists(tmpdir + '/args'))
with open(tmpdir + '/args') as args:
self.assertTrue(os.path.exists(self.tmpdir + '/args'))
with open(self.tmpdir + '/args') as args:
self.assertEqual(args.read(), 'some-arg\n')
self.assertTrue(os.path.exists(tmpdir + '/payload'))
with open(tmpdir + '/payload') as payload:
self.assertEqual(payload.read(), 'some payload\n')
self.assertTrue(os.path.exists(self.tmpdir + '/payload'))
with open(self.tmpdir + '/payload', 'rb') as payload_f:
self.assertEqual(payload_f.read(), expected)
@mock.patch('os.isatty', lambda fd: fd == 2)
def test_010_run_service(self):
@ -867,18 +858,43 @@ class TC_30_QubesRemote(unittest.TestCase):
self.addCleanup(shutil.rmtree, tmpdir)
payload_input = os.path.join(tmpdir, 'payload-input')
with open(payload_input, 'w+') as payload_file:
payload_file.write('some payload\n')
with open(payload_input, 'w+b') as payload_file:
payload_file.write(b'some payload\n')
payload_file.seek(0)
value = self.app.qubesd_call('test-vm', 'some.method',
'some-arg', payload_stream=payload_file)
self.assertEqual(self.proc_mock.mock_calls, [
self.assertListEqual(self.proc_mock.mock_calls, [
mock.call(
[qubesadmin.config.QREXEC_CLIENT_VM, 'test-vm',
'some.method+some-arg'],
stdin=payload_file,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE),
mock.call().communicate()
])
self.assertEqual(value, b'return-value')
def test_004_qubesd_call_payload_stream_with_prefix(self):
self.set_proc_stdout(b'0\0return-value')
tmpdir = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, tmpdir)
payload_input = os.path.join(tmpdir, 'payload-input')
with open(payload_input, 'w+b') as payload_file:
payload_file.write(b'some payload\n')
payload_file.seek(0)
value = self.app.qubesd_call('test-vm', 'some.method',
'some-arg', payload=b'first line\n', payload_stream=payload_file)
self.assertListEqual(self.proc_mock.mock_calls, [
mock.call([qubesadmin.config.QREXEC_CLIENT_VM, 'test-vm',
'some.method+some-arg'],
stdin=payload_file, stdout=subprocess.PIPE,
stdin=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE),
mock.call().communicate(None)
mock.call().stdin.write(b'first line\n'),
mock.call().stdin.write(b'some payload\n'),
mock.call().communicate()
])
self.assertEqual(value, b'return-value')

View File

@ -1,4 +1,4 @@
# -*- encoding: utf8 -*-
# -*- encoding: utf-8 -*-
#
# The Qubes OS Project, http://www.qubes-os.org
#
@ -508,10 +508,7 @@ class TC_00_qvm_volume(qubesadmin.tests.QubesTestCase):
b'revisions_to_keep=0\n' \
b'is_outdated=False\n'
self.app.expected_calls[
('testvm', 'admin.vm.volume.Resize', 'private', b'9')] = \
b'0\x00'
self.app.expected_calls[
('testvm', 'admin.vm.volume.Import', 'private', b'test-data')] = \
('testvm', 'admin.vm.volume.ImportWithSize', 'private', b'9\ntest-data')] = \
b'0\x00'
with tempfile.NamedTemporaryFile() as input_file:
input_file.write(b'test-data')
@ -541,10 +538,7 @@ class TC_00_qvm_volume(qubesadmin.tests.QubesTestCase):
b'revisions_to_keep=0\n' \
b'is_outdated=False\n'
self.app.expected_calls[
('testvm', 'admin.vm.volume.Resize', 'private', b'9')] = \
b'0\x00'
self.app.expected_calls[
('testvm', 'admin.vm.volume.Import', 'private', b'test-data')] = \
('testvm', 'admin.vm.volume.ImportWithSize', 'private', b'9\ntest-data')] = \
b'0\x00'
with tempfile.NamedTemporaryFile() as input_file:
input_file.write(b'test-data')
@ -575,10 +569,7 @@ class TC_00_qvm_volume(qubesadmin.tests.QubesTestCase):
b'revisions_to_keep=0\n' \
b'is_outdated=False\n'
self.app.expected_calls[
('testvm', 'admin.vm.volume.Resize', 'private', b'512')] = \
b'0\x00'
self.app.expected_calls[
('testvm', 'admin.vm.volume.Import', 'private', b'test-data')] = \
('testvm', 'admin.vm.volume.ImportWithSize', 'private', b'512\ntest-data')] = \
b'0\x00'
with tempfile.NamedTemporaryFile() as input_file:
input_file.write(b'test-data')
@ -638,7 +629,7 @@ class TC_00_qvm_volume(qubesadmin.tests.QubesTestCase):
b'revisions_to_keep=0\n' \
b'is_outdated=False\n'
self.app.expected_calls[
('testvm', 'admin.vm.volume.Import', 'private', b'test-data')] = \
('testvm', 'admin.vm.volume.ImportWithSize', 'private', b'9\ntest-data')] = \
b'0\x00'
with tempfile.NamedTemporaryFile() as input_file:
input_file.write(b'test-data')

View File

@ -1,4 +1,4 @@
# -*- encoding: utf8 -*-
# -*- encoding: utf-8 -*-
#
# The Qubes OS Project, http://www.qubes-os.org
#

View File

@ -144,7 +144,9 @@ def import_volume(args):
else:
input_file = open(input_path, 'rb')
try:
if not args.no_resize:
if args.no_resize:
volume.import_data(stream=input_file)
else:
if args.size:
size = args.size
else:
@ -155,11 +157,7 @@ def import_volume(args):
'Failed to get %s file size, '
'specify it explicitly with --size, '
'or use --no-resize option', str(e))
if size > old_size:
volume.resize(size)
volume.import_data(stream=input_file)
if not args.no_resize and size < old_size:
volume.resize(size)
volume.import_data_with_size(stream=input_file, size=size)
finally:
if input_path != '-':
input_file.close()