qvm_backup.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259
  1. # -*- encoding: utf8 -*-
  2. #
  3. # The Qubes OS Project, http://www.qubes-os.org
  4. #
  5. # Copyright (C) 2017 Marek Marczykowski-Górecki
  6. # <marmarek@invisiblethingslab.com>
  7. #
  8. # This program is free software; you can redistribute it and/or modify
  9. # it under the terms of the GNU Lesser General Public License as published by
  10. # the Free Software Foundation; either version 2.1 of the License, or
  11. # (at your option) any later version.
  12. #
  13. # This program is distributed in the hope that it will be useful,
  14. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  15. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  16. # GNU Lesser General Public License for more details.
  17. #
  18. # You should have received a copy of the GNU Lesser General Public License along
  19. # with this program; if not, see <http://www.gnu.org/licenses/>.
  20. import io
  21. import os
  22. import unittest.mock as mock
  23. import asyncio
  24. import asynctest
  25. import qubesadmin.tests
  26. import qubesadmin.tests.tools
  27. import qubesadmin.tools.qvm_backup as qvm_backup
  28. class TC_00_qvm_backup(qubesadmin.tests.QubesTestCase):
  29. def test_000_write_backup_profile(self):
  30. args = qvm_backup.parser.parse_args(['/var/tmp'], app=self.app)
  31. profile = io.StringIO()
  32. qvm_backup.write_backup_profile(profile, args)
  33. expected_profile = (
  34. 'compression: true\n'
  35. 'destination_path: /var/tmp\n'
  36. 'destination_vm: dom0\n'
  37. 'include: null\n'
  38. )
  39. self.assertEqual(profile.getvalue(), expected_profile)
  40. def test_001_write_backup_profile_include(self):
  41. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  42. b'0\0dom0 class=AdminVM state=Running\n' \
  43. b'vm1 class=AppVM state=Halted\n' \
  44. b'vm2 class=AppVM state=Halted\n' \
  45. b'vm3 class=AppVM state=Halted\n'
  46. args = qvm_backup.parser.parse_args(['/var/tmp', 'vm1', 'vm2'],
  47. app=self.app)
  48. profile = io.StringIO()
  49. qvm_backup.write_backup_profile(profile, args)
  50. expected_profile = (
  51. 'compression: true\n'
  52. 'destination_path: /var/tmp\n'
  53. 'destination_vm: dom0\n'
  54. 'include:\n'
  55. '- vm1\n'
  56. '- vm2\n'
  57. )
  58. self.assertEqual(profile.getvalue(), expected_profile)
  59. self.assertAllCalled()
  60. def test_002_write_backup_profile_exclude(self):
  61. args = qvm_backup.parser.parse_args([
  62. '-x', 'vm1', '-x', 'vm2', '/var/tmp'], app=self.app)
  63. profile = io.StringIO()
  64. qvm_backup.write_backup_profile(profile, args)
  65. expected_profile = (
  66. 'compression: true\n'
  67. 'destination_path: /var/tmp\n'
  68. 'destination_vm: dom0\n'
  69. 'exclude:\n'
  70. '- vm1\n'
  71. '- vm2\n'
  72. 'include: null\n'
  73. )
  74. self.assertEqual(profile.getvalue(), expected_profile)
  75. def test_003_write_backup_with_passphrase(self):
  76. args = qvm_backup.parser.parse_args(['/var/tmp'], app=self.app)
  77. profile = io.StringIO()
  78. qvm_backup.write_backup_profile(profile, args, passphrase='test123')
  79. expected_profile = (
  80. 'compression: true\n'
  81. 'destination_path: /var/tmp\n'
  82. 'destination_vm: dom0\n'
  83. 'include: null\n'
  84. 'passphrase_text: test123\n'
  85. )
  86. self.assertEqual(profile.getvalue(), expected_profile)
  87. def test_004_write_backup_profile_no_compress(self):
  88. args = qvm_backup.parser.parse_args(['--no-compress', '/var/tmp'],
  89. app=self.app)
  90. profile = io.StringIO()
  91. qvm_backup.write_backup_profile(profile, args)
  92. expected_profile = (
  93. 'compression: false\n'
  94. 'destination_path: /var/tmp\n'
  95. 'destination_vm: dom0\n'
  96. 'include: null\n'
  97. )
  98. self.assertEqual(profile.getvalue(), expected_profile)
  99. @mock.patch('qubesadmin.tools.qvm_backup.backup_profile_dir', '/tmp')
  100. @mock.patch('qubesadmin.tools.qvm_backup.input', create=True)
  101. @mock.patch('getpass.getpass')
  102. def test_010_main_save_profile_cancel(self, mock_getpass, mock_input):
  103. asyncio.set_event_loop(asyncio.new_event_loop())
  104. mock_input.return_value = 'n'
  105. mock_getpass.return_value = 'some password'
  106. self.app.qubesd_connection_type = 'socket'
  107. self.app.expected_calls[('dom0', 'admin.backup.Info', 'test-profile',
  108. None)] = \
  109. b'0\0backup summary'
  110. profile_path = '/tmp/test-profile.conf'
  111. try:
  112. qvm_backup.main(['--save-profile', 'test-profile', '/var/tmp'],
  113. app=self.app)
  114. expected_profile = (
  115. 'compression: true\n'
  116. 'destination_path: /var/tmp\n'
  117. 'destination_vm: dom0\n'
  118. 'include: null\n'
  119. )
  120. with open(profile_path) as f:
  121. self.assertEqual(expected_profile, f.read())
  122. finally:
  123. os.unlink(profile_path)
  124. @mock.patch('qubesadmin.tools.qvm_backup.backup_profile_dir', '/tmp')
  125. @mock.patch('qubesadmin.tools.qvm_backup.input', create=True)
  126. @mock.patch('getpass.getpass')
  127. def test_011_main_save_profile_confirm(self, mock_getpass, mock_input):
  128. asyncio.set_event_loop(asyncio.new_event_loop())
  129. mock_input.return_value = 'y'
  130. mock_getpass.return_value = 'some password'
  131. self.app.qubesd_connection_type = 'socket'
  132. self.app.expected_calls[('dom0', 'admin.backup.Info', 'test-profile',
  133. None)] = \
  134. b'0\0backup summary'
  135. self.app.expected_calls[('dom0', 'admin.backup.Execute', 'test-profile',
  136. None)] = \
  137. b'0\0'
  138. profile_path = '/tmp/test-profile.conf'
  139. try:
  140. qvm_backup.main(['--save-profile', 'test-profile', '/var/tmp'],
  141. app=self.app)
  142. expected_profile = (
  143. 'compression: true\n'
  144. 'destination_path: /var/tmp\n'
  145. 'destination_vm: dom0\n'
  146. 'include: null\n'
  147. 'passphrase_text: some password\n'
  148. )
  149. with open(profile_path) as f:
  150. self.assertEqual(expected_profile, f.read())
  151. finally:
  152. os.unlink(profile_path)
  153. @mock.patch('qubesadmin.tools.qvm_backup.backup_profile_dir', '/tmp')
  154. @mock.patch('qubesadmin.tools.qvm_backup.input', create=True)
  155. @mock.patch('getpass.getpass')
  156. def test_012_main_existing_profile(self, mock_getpass, mock_input):
  157. asyncio.set_event_loop(asyncio.new_event_loop())
  158. mock_input.return_value = 'y'
  159. mock_getpass.return_value = 'some password'
  160. self.app.qubesd_connection_type = 'socket'
  161. self.app.expected_calls[('dom0', 'admin.backup.Info', 'test-profile',
  162. None)] = \
  163. b'0\0backup summary'
  164. self.app.expected_calls[('dom0', 'admin.backup.Execute', 'test-profile',
  165. None)] = \
  166. b'0\0'
  167. self.app.expected_calls[('dom0', 'admin.Events', None,
  168. None)] = \
  169. b'0\0'
  170. try:
  171. mock_events = asynctest.CoroutineMock()
  172. patch = mock.patch(
  173. 'qubesadmin.events.EventsDispatcher._get_events_reader',
  174. mock_events)
  175. patch.start()
  176. self.addCleanup(patch.stop)
  177. mock_events.side_effect = qubesadmin.tests.tools.MockEventsReader([
  178. b'1\0\0connection-established\0\0',
  179. b'1\0\0backup-progress\0backup_profile\0test-profile\0progress\x000'
  180. b'.25\0\0',
  181. ])
  182. except ImportError:
  183. pass
  184. qvm_backup.main(['--profile', 'test-profile'],
  185. app=self.app)
  186. self.assertFalse(os.path.exists('/tmp/test-profile.conf'))
  187. self.assertFalse(mock_getpass.called)
  188. @mock.patch('qubesadmin.tools.qvm_backup.backup_profile_dir', '/tmp')
  189. @mock.patch('qubesadmin.tools.qvm_backup.input', create=True)
  190. @mock.patch('getpass.getpass')
  191. def test_013_main_new_profile_vm(self, mock_getpass, mock_input):
  192. asyncio.set_event_loop(asyncio.new_event_loop())
  193. mock_input.return_value = 'y'
  194. mock_getpass.return_value = 'some password'
  195. self.app.qubesd_connection_type = 'qrexec'
  196. with qubesadmin.tests.tools.StdoutBuffer() as stdout:
  197. qvm_backup.main(['-x', 'vm1', '/var/tmp'],
  198. app=self.app)
  199. expected_output = (
  200. 'To perform the backup according to selected options, create '
  201. 'backup profile (/tmp/profile_name.conf) in dom0 with following '
  202. 'content:\n'
  203. 'compression: true\n'
  204. 'destination_path: /var/tmp\n'
  205. 'destination_vm: dom0\n'
  206. 'exclude:\n'
  207. '- vm1\n'
  208. 'include: null\n'
  209. '# specify backup passphrase below\n'
  210. 'passphrase_text: ...\n'
  211. )
  212. self.assertEqual(stdout.getvalue(), expected_output)
  213. @mock.patch('qubesadmin.tools.qvm_backup.backup_profile_dir', '/tmp')
  214. @mock.patch('qubesadmin.tools.qvm_backup.input', create=True)
  215. @mock.patch('getpass.getpass')
  216. def test_014_main_passphrase_file(self, mock_getpass, mock_input):
  217. asyncio.set_event_loop(asyncio.new_event_loop())
  218. mock_input.return_value = 'y'
  219. mock_getpass.return_value = 'some password'
  220. self.app.qubesd_connection_type = 'socket'
  221. self.app.expected_calls[('dom0', 'admin.backup.Info', 'test-profile',
  222. None)] = \
  223. b'0\0backup summary'
  224. self.app.expected_calls[('dom0', 'admin.backup.Execute', 'test-profile',
  225. None)] = \
  226. b'0\0'
  227. profile_path = '/tmp/test-profile.conf'
  228. try:
  229. stdin = io.StringIO()
  230. stdin.write('other passphrase\n')
  231. stdin.seek(0)
  232. with mock.patch('sys.stdin', stdin):
  233. qvm_backup.main(['--passphrase-file', '-', '--save-profile',
  234. 'test-profile', '/var/tmp'],
  235. app=self.app)
  236. expected_profile = (
  237. 'compression: true\n'
  238. 'destination_path: /var/tmp\n'
  239. 'destination_vm: dom0\n'
  240. 'include: null\n'
  241. 'passphrase_text: other passphrase\n'
  242. )
  243. with open(profile_path) as f:
  244. self.assertEqual(expected_profile, f.read())
  245. finally:
  246. os.unlink(profile_path)