qvm_backup.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239
  1. # -*- encoding: utf8 -*-
  2. #
  3. # The Qubes OS Project, http://www.qubes-os.org
  4. #
  5. # Copyright (C) 2017 Marek Marczykowski-Górecki
  6. # <marmarek@invisiblethingslab.com>
  7. #
  8. # This program is free software; you can redistribute it and/or modify
  9. # it under the terms of the GNU Lesser General Public License as published by
  10. # the Free Software Foundation; either version 2.1 of the License, or
  11. # (at your option) any later version.
  12. #
  13. # This program is distributed in the hope that it will be useful,
  14. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  15. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  16. # GNU Lesser General Public License for more details.
  17. #
  18. # You should have received a copy of the GNU Lesser General Public License along
  19. # with this program; if not, see <http://www.gnu.org/licenses/>.
  20. import io
  21. import os
  22. import unittest.mock as mock
  23. import asyncio
  24. import qubesadmin.tests
  25. import qubesadmin.tests.tools
  26. import qubesadmin.tools.qvm_backup as qvm_backup
  27. class TC_00_qvm_backup(qubesadmin.tests.QubesTestCase):
  def test_000_write_backup_profile(self):
      """Check the profile written for a bare destination path.

      With only '/var/tmp' on the command line, the profile is expected
      to be a one-line flow mapping with compression on, dom0 as the
      destination VM and a null include list.
      """
      parsed = qvm_backup.parser.parse_args(['/var/tmp'], app=self.app)
      buffer = io.StringIO()
      qvm_backup.write_backup_profile(buffer, parsed)
      self.assertEqual(
          buffer.getvalue(),
          '{compression: true, destination_path: /var/tmp, '
          'destination_vm: dom0, include: null}\n')
  def test_001_write_backup_profile_include(self):
      """Check the profile written when VMs are listed after the path.

      The admin.vm.List reply is registered as an expected call; the
      test finally verifies that every expected call was consumed.
      """
      vm_list_reply = (
          b'0\0dom0 class=AdminVM state=Running\n'
          b'vm1 class=AppVM state=Halted\n'
          b'vm2 class=AppVM state=Halted\n'
          b'vm3 class=AppVM state=Halted\n'
      )
      self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
          vm_list_reply
      cmdline = ['/var/tmp', 'vm1', 'vm2']
      parsed = qvm_backup.parser.parse_args(cmdline, app=self.app)
      buffer = io.StringIO()
      qvm_backup.write_backup_profile(buffer, parsed)
      # Only vm1 and vm2 were requested, so only they appear in 'include'.
      wanted = (
          'compression: true\n'
          'destination_path: /var/tmp\n'
          'destination_vm: dom0\n'
          'include: [vm1, vm2]\n'
      )
      self.assertEqual(buffer.getvalue(), wanted)
      self.assertAllCalled()
  def test_002_write_backup_profile_exclude(self):
      """Check the profile written when VMs are excluded with '-x'.

      Two '-x' options are expected to produce an 'exclude' list while
      'include' stays null.
      """
      cmdline = ['-x', 'vm1', '-x', 'vm2', '/var/tmp']
      parsed = qvm_backup.parser.parse_args(cmdline, app=self.app)
      buffer = io.StringIO()
      qvm_backup.write_backup_profile(buffer, parsed)
      wanted = (
          'compression: true\n'
          'destination_path: /var/tmp\n'
          'destination_vm: dom0\n'
          'exclude: [vm1, vm2]\n'
          'include: null\n'
      )
      self.assertEqual(buffer.getvalue(), wanted)
  def test_003_write_backup_with_passphrase(self):
      """Check that a passphrase passed by keyword lands in the profile.

      The expected output carries it as a 'passphrase_text' entry on a
      continuation line of the flow mapping.
      """
      parsed = qvm_backup.parser.parse_args(['/var/tmp'], app=self.app)
      buffer = io.StringIO()
      qvm_backup.write_backup_profile(buffer, parsed, passphrase='test123')
      self.assertEqual(
          buffer.getvalue(),
          '{compression: true, destination_path: /var/tmp, '
          'destination_vm: dom0, include: null,\n'
          ' passphrase_text: test123}\n')
  def test_004_write_backup_profile_no_compress(self):
      """Check that '--no-compress' yields 'compression: false'."""
      cmdline = ['--no-compress', '/var/tmp']
      parsed = qvm_backup.parser.parse_args(cmdline, app=self.app)
      buffer = io.StringIO()
      qvm_backup.write_backup_profile(buffer, parsed)
      self.assertEqual(
          buffer.getvalue(),
          '{compression: false, destination_path: /var/tmp, '
          'destination_vm: dom0, include: null}\n')
  @mock.patch('qubesadmin.tools.qvm_backup.backup_profile_dir', '/tmp')
  @mock.patch('qubesadmin.tools.qvm_backup.input', create=True)
  @mock.patch('getpass.getpass')
  def test_010_main_save_profile_cancel(self, mock_getpass, mock_input):
      """Run main() with '--save-profile' while the mocked input() says 'n'.

      Only the admin.backup.Info call is registered as expected. The
      profile file is still expected to exist afterwards, without any
      'passphrase_text' entry.
      """
      asyncio.set_event_loop(asyncio.new_event_loop())
      mock_input.return_value = 'n'
      mock_getpass.return_value = 'some password'
      self.app.qubesd_connection_type = 'socket'
      self.app.expected_calls[('dom0', 'admin.backup.Info', 'test-profile',
          None)] = \
          b'0\0backup summary'
      # backup_profile_dir is patched to /tmp, hence this location.
      profile_path = '/tmp/test-profile.conf'
      try:
          qvm_backup.main(['--save-profile', 'test-profile', '/var/tmp'],
              app=self.app)
          expected_profile = (
              '{compression: true, destination_path: /var/tmp, '
              'destination_vm: dom0, include: null}\n'
          )
          with open(profile_path) as f:
              self.assertEqual(expected_profile, f.read())
      finally:
          # If main() failed before writing the profile there is nothing
          # to remove; tolerate that so the cleanup error does not replace
          # the real failure in the test report.
          try:
              os.unlink(profile_path)
          except FileNotFoundError:
              pass
  @mock.patch('qubesadmin.tools.qvm_backup.backup_profile_dir', '/tmp')
  @mock.patch('qubesadmin.tools.qvm_backup.input', create=True)
  @mock.patch('getpass.getpass')
  def test_011_main_save_profile_confirm(self, mock_getpass, mock_input):
      """Run main() with '--save-profile' while the mocked input() says 'y'.

      Both admin.backup.Info and admin.backup.Execute are registered as
      expected calls. The saved profile is expected to include the value
      returned by the mocked getpass as 'passphrase_text'.
      """
      asyncio.set_event_loop(asyncio.new_event_loop())
      mock_input.return_value = 'y'
      mock_getpass.return_value = 'some password'
      self.app.qubesd_connection_type = 'socket'
      self.app.expected_calls[('dom0', 'admin.backup.Info', 'test-profile',
          None)] = \
          b'0\0backup summary'
      self.app.expected_calls[('dom0', 'admin.backup.Execute', 'test-profile',
          None)] = \
          b'0\0'
      # backup_profile_dir is patched to /tmp, hence this location.
      profile_path = '/tmp/test-profile.conf'
      try:
          qvm_backup.main(['--save-profile', 'test-profile', '/var/tmp'],
              app=self.app)
          expected_profile = (
              '{compression: true, destination_path: /var/tmp, '
              'destination_vm: dom0, include: null,\n'
              ' passphrase_text: some password}\n'
          )
          with open(profile_path) as f:
              self.assertEqual(expected_profile, f.read())
      finally:
          # If main() failed before writing the profile there is nothing
          # to remove; tolerate that so the cleanup error does not replace
          # the real failure in the test report.
          try:
              os.unlink(profile_path)
          except FileNotFoundError:
              pass
  @mock.patch('qubesadmin.tools.qvm_backup.backup_profile_dir', '/tmp')
  @mock.patch('qubesadmin.tools.qvm_backup.input', create=True)
  @mock.patch('getpass.getpass')
  def test_012_main_existing_profile(self, mock_getpass, mock_input):
      """Run main() with '--profile' naming a profile by name only.

      The test expects that no profile file appears under /tmp and that
      getpass is never called.
      """
      asyncio.set_event_loop(asyncio.new_event_loop())
      mock_input.return_value = 'y'
      mock_getpass.return_value = 'some password'
      self.app.qubesd_connection_type = 'socket'

      profile_name = 'test-profile'
      self.app.expected_calls[
          ('dom0', 'admin.backup.Info', profile_name, None)] = \
          b'0\0backup summary'
      self.app.expected_calls[
          ('dom0', 'admin.backup.Execute', profile_name, None)] = \
          b'0\0'
      self.app.expected_calls[('dom0', 'admin.Events', None, None)] = \
          b'0\0'

      # Canned event stream: connection notice, then one progress report.
      canned_events = [
          b'1\0\0connection-established\0\0',
          b'1\0\0backup-progress\0backup_profile\0test-profile\0progress\x000'
          b'.25\0\0',
      ]
      try:
          reader_patcher = mock.patch(
              'qubesadmin.events.EventsDispatcher._get_events_reader')
          fake_reader = reader_patcher.start()
          self.addCleanup(reader_patcher.stop)
          fake_reader.side_effect = \
              qubesadmin.tests.tools.MockEventsReader(canned_events)
      except ImportError:
          # Best effort: continue without the events mock when the
          # patched target cannot be imported.
          pass

      qvm_backup.main(['--profile', profile_name], app=self.app)
      self.assertFalse(os.path.exists('/tmp/test-profile.conf'))
      self.assertFalse(mock_getpass.called)
  @mock.patch('qubesadmin.tools.qvm_backup.backup_profile_dir', '/tmp')
  @mock.patch('qubesadmin.tools.qvm_backup.input', create=True)
  @mock.patch('getpass.getpass')
  def test_013_main_new_profile_vm(self, mock_getpass, mock_input):
      """Run main() with the connection type set to 'qrexec'.

      The test captures stdout and expects instructions for creating
      the profile in dom0, followed by the profile content and a
      passphrase placeholder.
      """
      asyncio.set_event_loop(asyncio.new_event_loop())
      mock_input.return_value = 'y'
      mock_getpass.return_value = 'some password'
      self.app.qubesd_connection_type = 'qrexec'

      expected_output = (
          'To perform the backup according to selected options, create '
          'backup profile (/tmp/profile_name.conf) in dom0 with following '
          'content:\n'
          'compression: true\n'
          'destination_path: /var/tmp\n'
          'destination_vm: dom0\n'
          'exclude: [vm1]\n'
          'include: null\n'
          '# specify backup passphrase below\n'
          'passphrase_text: ...\n'
      )
      with qubesadmin.tests.tools.StdoutBuffer() as captured:
          qvm_backup.main(['-x', 'vm1', '/var/tmp'], app=self.app)
      self.assertEqual(captured.getvalue(), expected_output)
  @mock.patch('qubesadmin.tools.qvm_backup.backup_profile_dir', '/tmp')
  @mock.patch('qubesadmin.tools.qvm_backup.input', create=True)
  @mock.patch('getpass.getpass')
  def test_014_main_passphrase_file(self, mock_getpass, mock_input):
      """Run main() with '--passphrase-file -' and a replaced sys.stdin.

      The saved profile is expected to carry the line supplied on the
      fake stdin ('other passphrase') as 'passphrase_text', not the value
      returned by the mocked getpass.
      """
      asyncio.set_event_loop(asyncio.new_event_loop())
      mock_input.return_value = 'y'
      mock_getpass.return_value = 'some password'
      self.app.qubesd_connection_type = 'socket'
      self.app.expected_calls[('dom0', 'admin.backup.Info', 'test-profile',
          None)] = \
          b'0\0backup summary'
      self.app.expected_calls[('dom0', 'admin.backup.Execute', 'test-profile',
          None)] = \
          b'0\0'
      # backup_profile_dir is patched to /tmp, hence this location.
      profile_path = '/tmp/test-profile.conf'
      try:
          # A StringIO built from an initial value starts at position 0,
          # so it is ready to be read without an explicit seek.
          stdin = io.StringIO('other passphrase\n')
          with mock.patch('sys.stdin', stdin):
              qvm_backup.main(['--passphrase-file', '-', '--save-profile',
                  'test-profile', '/var/tmp'],
                  app=self.app)
          expected_profile = (
              '{compression: true, destination_path: /var/tmp, '
              'destination_vm: dom0, include: null,\n'
              ' passphrase_text: other passphrase}\n'
          )
          with open(profile_path) as f:
              self.assertEqual(expected_profile, f.read())
      finally:
          # If main() failed before writing the profile there is nothing
          # to remove; tolerate that so the cleanup error does not replace
          # the real failure in the test report.
          try:
              os.unlink(profile_path)
          except FileNotFoundError:
              pass