rpc_import.py 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
  1. #
  2. # The Qubes OS Project, https://www.qubes-os.org/
  3. #
  4. # Copyright (C) 2020 Paweł Marczewski <pawel@invisiblethingslab.com>
  5. #
  6. # This library is free software; you can redistribute it and/or
  7. # modify it under the terms of the GNU Lesser General Public
  8. # License as published by the Free Software Foundation; either
  9. # version 2.1 of the License, or (at your option) any later version.
  10. #
  11. # This library is distributed in the hope that it will be useful,
  12. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  14. # Lesser General Public License for more details.
  15. #
  16. # You should have received a copy of the GNU Lesser General Public
  17. # License along with this library; if not, see <https://www.gnu.org/licenses/>.
  18. #
  19. import unittest
  20. import tempfile
  21. import shutil
  22. import os
  23. import subprocess
  24. import qubes.tests
  25. class TestRpcImport(qubes.tests.QubesTestCase):
  26. '''
  27. Tests for qubes-rpc/admin.vm.volume.Import script.
  28. It is a shell script that calls internal API methods via qubesd-query.
  29. These tests mock all the calls.
  30. '''
  31. QUBESD_QUERY = '''\
  32. #!/bin/sh -e
  33. method=$4
  34. echo "$@" > command-$method
  35. cat > payload-$method
  36. cat response-$method
  37. '''
  38. RPC_FILE_PATH = os.path.abspath(os.path.join(
  39. os.path.dirname(__file__),
  40. '../../qubes-rpc/admin.vm.volume.Import'))
  41. def setUp(self):
  42. self.tmpdir = tempfile.mkdtemp()
  43. self.addCleanup(shutil.rmtree, self.tmpdir)
  44. with open(os.path.join(self.tmpdir, 'qubesd-query'), 'w') \
  45. as qubesd_query_f:
  46. qubesd_query_f.write(self.QUBESD_QUERY)
  47. os.chmod(os.path.join(self.tmpdir, 'qubesd-query'), 0o700)
  48. shutil.copy(
  49. self.RPC_FILE_PATH,
  50. os.path.join(self.tmpdir, 'admin.vm.volume.Import'))
  51. shutil.copy(
  52. self.RPC_FILE_PATH,
  53. os.path.join(self.tmpdir, 'admin.vm.volume.ImportWithSize'))
  54. # pylint:disable=invalid-name
  55. def mockMethod(self, method, response):
  56. with open(os.path.join(self.tmpdir, 'response-' + method), 'wb') \
  57. as response_f:
  58. response_f.write(response)
  59. # pylint:disable=invalid-name
  60. def assertMethodCalled(self, method, arg, expected_payload=b''):
  61. try:
  62. with open(os.path.join(self.tmpdir, 'command-' + method), 'rb') \
  63. as command_f:
  64. command = command_f.read()
  65. with open(os.path.join(self.tmpdir, 'payload-' + method), 'rb') \
  66. as payload_f:
  67. payload = payload_f.read()
  68. except FileNotFoundError:
  69. self.fail('{} was not called'.format(method))
  70. self.assertListEqual(command.decode().split(), [
  71. '-c', '/var/run/qubesd.internal.sock',
  72. 'remote', method, 'target', arg
  73. ])
  74. self.assertEqual(payload, expected_payload)
  75. # pylint:disable=invalid-name
  76. def assertFileData(self, path, expected_data):
  77. with open(path, 'rb') as data_f:
  78. data = data_f.read()
  79. self.assertEquals(data, expected_data)
  80. def setup_import(self, size):
  81. self.target = os.path.join(self.tmpdir, 'target')
  82. os.mknod(self.target)
  83. self.mockMethod(
  84. 'internal.vm.volume.ImportBegin',
  85. '\x30\x00{} {}'.format(size, self.target).encode())
  86. self.mockMethod(
  87. 'internal.vm.volume.ImportEnd',
  88. b'\x30\x00import-end')
  89. def run_rpc(self, command, arg, data):
  90. with open(os.path.join(self.tmpdir, 'data'), 'w+b') as data_f:
  91. data_f.write(data)
  92. data_f.seek(0)
  93. env = {
  94. 'PATH': self.tmpdir + ':' + os.getenv('PATH'),
  95. 'QREXEC_REMOTE_DOMAIN': 'remote',
  96. 'QREXEC_REQUESTED_TARGET': 'target',
  97. }
  98. output = subprocess.check_output(
  99. [command, arg],
  100. env=env,
  101. cwd=self.tmpdir,
  102. stdin=data_f
  103. )
  104. self.assertEqual(output, b'\x30\x00import-end')
  105. def test_00_import(self):
  106. data = b'abcd' * 1024
  107. size = len(data)
  108. self.setup_import(size)
  109. self.run_rpc('admin.vm.volume.Import', 'volume', data)
  110. self.assertMethodCalled('internal.vm.volume.ImportBegin', 'volume')
  111. self.assertMethodCalled('internal.vm.volume.ImportEnd', 'volume',
  112. b'ok')
  113. self.assertFileData(self.target, data)
  114. def test_01_import_with_size(self):
  115. data = b'abcd' * 1024
  116. size = len(data)
  117. self.setup_import(size)
  118. self.run_rpc('admin.vm.volume.ImportWithSize', 'volume',
  119. str(size).encode() + b'\n' + data)
  120. self.assertMethodCalled('internal.vm.volume.ImportBegin', 'volume',
  121. str(size).encode())
  122. self.assertMethodCalled('internal.vm.volume.ImportEnd', 'volume',
  123. b'ok')
  124. self.assertFileData(self.target, data)
  125. def test_02_import_not_enough_data(self):
  126. data = b'abcd' * 1024
  127. size = len(data) + 1
  128. self.setup_import(size)
  129. self.run_rpc('admin.vm.volume.Import', 'volume', data)
  130. self.assertMethodCalled('internal.vm.volume.ImportBegin', 'volume')
  131. self.assertMethodCalled(
  132. 'internal.vm.volume.ImportEnd', 'volume',
  133. b'fail\n' +
  134. ('not enough data (copied {} bytes, expected {} bytes)'
  135. .format(len(data), size).encode()))
  136. def test_03_import_too_much_data(self):
  137. data = b'abcd' * 1024
  138. size = len(data) - 1
  139. self.setup_import(size)
  140. output = self.run_rpc('admin.vm.volume.Import', 'volume', data)
  141. self.assertMethodCalled('internal.vm.volume.ImportBegin', 'volume')
  142. self.assertMethodCalled(
  143. 'internal.vm.volume.ImportEnd', 'volume',
  144. b'fail\n' +
  145. ('too much data (expected {} bytes)'
  146. .format(size).encode()))