181 lines
		
	
	
		
			5.9 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			181 lines
		
	
	
		
			5.9 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| 
 | |
#
# The Qubes OS Project, https://www.qubes-os.org/
#
# Copyright (C) 2020 Paweł Marczewski <pawel@invisiblethingslab.com>
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, see <https://www.gnu.org/licenses/>.
#
| 
 | |
| import unittest
 | |
| import tempfile
 | |
| import shutil
 | |
| import os
 | |
| import subprocess
 | |
| 
 | |
| 
 | |
| import qubes.tests
 | |
| 
 | |
| 
 | |
class TestRpcImport(qubes.tests.QubesTestCase):
    '''
    Tests for qubes-rpc/admin.vm.volume.Import script.

    It is a shell script that calls internal API methods via qubesd-query.
    These tests mock all the calls.

    The mock works by placing a fake ``qubesd-query`` executable first on
    ``PATH``. For every call it records its arguments in
    ``command-<method>``, its stdin in ``payload-<method>`` and prints the
    contents of ``response-<method>`` (all relative to the working
    directory, which is the per-test temporary directory).
    '''

    # Fake qubesd-query: the method name is taken from the 4th argument.
    QUBESD_QUERY = '''\
#!/bin/sh -e

method=$4
echo "$@" > command-$method
cat > payload-$method
cat response-$method
'''

    # Location of the script under test, relative to this test module.
    RPC_FILE_PATH = os.path.abspath(os.path.join(
        os.path.dirname(__file__),
        '../../qubes-rpc/admin.vm.volume.Import'))

    def setUp(self):
        '''
        Create a scratch directory holding the fake ``qubesd-query`` and two
        copies of the RPC script (``admin.vm.volume.Import`` and
        ``admin.vm.volume.ImportWithSize``).

        The directory is removed again on test cleanup.
        '''
        self.tmpdir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, self.tmpdir)

        query_path = os.path.join(self.tmpdir, 'qubesd-query')
        with open(query_path, 'w') as qubesd_query_f:
            qubesd_query_f.write(self.QUBESD_QUERY)
        # The script must be executable to be picked up through PATH.
        os.chmod(query_path, 0o700)

        # The same script is installed under both RPC names.
        for rpc_name in ('admin.vm.volume.Import',
                         'admin.vm.volume.ImportWithSize'):
            shutil.copy(
                self.RPC_FILE_PATH,
                os.path.join(self.tmpdir, rpc_name))

    # pylint:disable=invalid-name
    def mockMethod(self, method, response):
        '''
        Register a canned response for an internal API method.

        :param method: method name as passed to ``qubesd-query``
        :param response: raw bytes the fake ``qubesd-query`` will print
        '''
        response_path = os.path.join(self.tmpdir, 'response-' + method)
        with open(response_path, 'wb') as response_f:
            response_f.write(response)

    # pylint:disable=invalid-name
    def assertMethodCalled(self, method, arg, expected_payload=b''):
        '''
        Assert that the fake ``qubesd-query`` was invoked for ``method``.

        Checks both the recorded command line and the recorded stdin.

        :param method: method name expected as the 4th argument
        :param arg: expected trailing argument of the call
        :param expected_payload: bytes expected on the call's stdin
        '''
        command_path = os.path.join(self.tmpdir, 'command-' + method)
        payload_path = os.path.join(self.tmpdir, 'payload-' + method)
        try:
            with open(command_path, 'rb') as command_f:
                command = command_f.read()
            with open(payload_path, 'rb') as payload_f:
                payload = payload_f.read()
        except FileNotFoundError:
            # The fake script creates these files on every call, so a
            # missing file means the method was never invoked.
            self.fail('{} was not called'.format(method))

        self.assertListEqual(command.decode().split(), [
            '-c', '/var/run/qubesd.internal.sock',
            'remote', method, 'target', arg
        ])
        self.assertEqual(payload, expected_payload)

    # pylint:disable=invalid-name
    def assertFileData(self, path, expected_data):
        '''
        Assert that the file at ``path`` contains exactly ``expected_data``.

        :param path: file to read (opened in binary mode)
        :param expected_data: expected content, as bytes
        '''
        with open(path, 'rb') as data_f:
            data = data_f.read()
        # assertEquals is a deprecated alias that was removed in Python 3.12.
        self.assertEqual(data, expected_data)

    def setup_import(self, size):
        '''
        Prepare an empty import target and mock both internal calls.

        ``ImportBegin`` answers with ``<size> <target path>`` and
        ``ImportEnd`` with ``import-end``; both are prefixed with the bytes
        ``0\\0`` (presumably the qubesd "OK" response header - confirm
        against the qubesd protocol).

        :param size: volume size reported by the mocked ``ImportBegin``
        '''
        self.target = os.path.join(self.tmpdir, 'target')
        os.mknod(self.target)

        self.mockMethod(
            'internal.vm.volume.ImportBegin',
            '\x30\x00{} {}'.format(size, self.target).encode())

        self.mockMethod(
            'internal.vm.volume.ImportEnd',
            b'\x30\x00import-end')

    def run_rpc(self, command, arg, data):
        '''
        Run the RPC script with ``data`` on stdin and check its output.

        The script is looked up through ``PATH`` (the temporary directory
        comes first) and executed inside the temporary directory. Its stdout
        must equal the mocked ``ImportEnd`` response.

        :param command: name of the script copy to execute
        :param arg: single argument passed to the script
        :param data: bytes fed to the script's stdin
        :raises subprocess.CalledProcessError: if the script exits non-zero
        '''
        with open(os.path.join(self.tmpdir, 'data'), 'w+b') as data_f:
            data_f.write(data)
            data_f.seek(0)
            env = {
                # Fall back to the default search path when PATH is unset
                # instead of failing with a TypeError on None.
                'PATH': self.tmpdir + ':' + os.environ.get(
                    'PATH', os.defpath),
                'QREXEC_REMOTE_DOMAIN': 'remote',
                'QREXEC_REQUESTED_TARGET': 'target',
            }
            output = subprocess.check_output(
                [command, arg],
                env=env,
                cwd=self.tmpdir,
                stdin=data_f
            )

        self.assertEqual(output, b'\x30\x00import-end')

    def test_00_import(self):
        '''Import with exactly as much data as the reported size.'''
        data = b'abcd' * 1024
        size = len(data)
        self.setup_import(size)

        self.run_rpc('admin.vm.volume.Import', 'volume', data)

        self.assertMethodCalled('internal.vm.volume.ImportBegin', 'volume')
        self.assertMethodCalled('internal.vm.volume.ImportEnd', 'volume',
                                b'ok')
        self.assertFileData(self.target, data)

    def test_01_import_with_size(self):
        '''ImportWithSize: the first input line carries the size.'''
        data = b'abcd' * 1024
        size = len(data)
        self.setup_import(size)

        self.run_rpc('admin.vm.volume.ImportWithSize', 'volume',
                     str(size).encode() + b'\n' + data)

        self.assertMethodCalled('internal.vm.volume.ImportBegin', 'volume',
                                str(size).encode())
        self.assertMethodCalled('internal.vm.volume.ImportEnd', 'volume',
                                b'ok')
        self.assertFileData(self.target, data)

    def test_02_import_not_enough_data(self):
        '''Input one byte shorter than the reported size is a failure.'''
        data = b'abcd' * 1024
        size = len(data) + 1
        self.setup_import(size)

        self.run_rpc('admin.vm.volume.Import', 'volume', data)

        self.assertMethodCalled('internal.vm.volume.ImportBegin', 'volume')
        self.assertMethodCalled(
            'internal.vm.volume.ImportEnd', 'volume',
            b'fail\n' +
            ('not enough data (copied {} bytes, expected {} bytes)'
             .format(len(data), size).encode()))

    def test_03_import_too_much_data(self):
        '''Input one byte longer than the reported size is a failure.'''
        data = b'abcd' * 1024
        size = len(data) - 1
        self.setup_import(size)

        # run_rpc() returns nothing; it asserts on the output itself.
        self.run_rpc('admin.vm.volume.Import', 'volume', data)

        self.assertMethodCalled('internal.vm.volume.ImportBegin', 'volume')
        self.assertMethodCalled(
            'internal.vm.volume.ImportEnd', 'volume',
            b'fail\n' +
            ('too much data (expected {} bytes)'
             .format(size).encode()))
 | 
