dispvm.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409
# -*- encoding: utf8 -*-
#
# The Qubes OS Project, http://www.qubes-os.org
#
# Copyright (C) 2019 Marek Marczykowski-Górecki
# <marmarek@invisiblethingslab.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation; either version 2.1 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with this program; if not, see <http://www.gnu.org/licenses/>.
  20. import datetime
  21. import tempfile
  22. import unittest
  23. import unittest.mock
  24. from unittest.mock import call
  25. import subprocess
  26. import qubesadmin.tests
  27. from qubesadmin.tools import qvm_backup_restore
  28. from qubesadmin.backup.dispvm import RestoreInDisposableVM
  29. class TC_00_RestoreInDispVM(qubesadmin.tests.QubesTestCase):
  30. def setUp(self):
  31. super().setUp()
  32. def test_000_prepare_inner_args(self):
  33. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = (
  34. b'0\0dom0 class=AdminVM state=Running\n'
  35. b'fedora-25 class=TemplateVM state=Halted\n'
  36. b'testvm class=AppVM state=Running\n'
  37. )
  38. argv = ['--verbose', '--skip-broken', '--skip-dom0-home',
  39. '--dest-vm', 'testvm',
  40. '--compression-filter', 'gzip', '/backup/location']
  41. args = qvm_backup_restore.parser.parse_args(argv)
  42. obj = RestoreInDisposableVM(self.app, args)
  43. obj.storage_access_id = 'abc'
  44. reconstructed_argv = obj.prepare_inner_args()
  45. expected_argv = argv[:-1] + \
  46. ['--location-is-service', 'qubes.RestoreById+abc']
  47. self.assertCountEqual(expected_argv, reconstructed_argv)
  48. def test_001_prepare_inner_args_exclude(self):
  49. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = (
  50. b'0\0dom0 class=AdminVM state=Running\n'
  51. b'fedora-25 class=TemplateVM state=Halted\n'
  52. b'testvm class=AppVM state=Running\n'
  53. )
  54. argv = ['--exclude', 'vm1', '--exclude', 'vm2',
  55. '/backup/location']
  56. args = qvm_backup_restore.parser.parse_args(argv)
  57. print(repr(args))
  58. obj = RestoreInDisposableVM(self.app, args)
  59. obj.storage_access_id = 'abc'
  60. reconstructed_argv = obj.prepare_inner_args()
  61. expected_argv = argv[:-1] + \
  62. ['--location-is-service', 'qubes.RestoreById+abc']
  63. self.assertCountEqual(expected_argv, reconstructed_argv)
  64. def test_002_prepare_inner_args_pass_file(self):
  65. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = (
  66. b'0\0dom0 class=AdminVM state=Running\n'
  67. b'fedora-25 class=TemplateVM state=Halted\n'
  68. b'testvm class=AppVM state=Running\n'
  69. )
  70. argv = ['--passphrase-file=/tmp/some/file',
  71. '/backup/location']
  72. args = qvm_backup_restore.parser.parse_args(argv)
  73. print(repr(args))
  74. obj = RestoreInDisposableVM(self.app, args)
  75. obj.storage_access_id = 'abc'
  76. reconstructed_argv = obj.prepare_inner_args()
  77. expected_argv = ['--passphrase-file', '/tmp/some/file',
  78. '--location-is-service', 'qubes.RestoreById+abc']
  79. self.assertEqual(expected_argv, reconstructed_argv)
  80. def test_003_prepare_inner_args_auto_close(self):
  81. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = (
  82. b'0\0dom0 class=AdminVM state=Running\n'
  83. b'fedora-25 class=TemplateVM state=Halted\n'
  84. b'testvm class=AppVM state=Running\n'
  85. )
  86. argv = ['--auto-close', '/backup/location']
  87. args = qvm_backup_restore.parser.parse_args(argv)
  88. print(repr(args))
  89. obj = RestoreInDisposableVM(self.app, args)
  90. obj.storage_access_id = 'abc'
  91. reconstructed_argv = obj.prepare_inner_args()
  92. expected_argv = ['--location-is-service', 'qubes.RestoreById+abc']
  93. self.assertEqual(expected_argv, reconstructed_argv)
  94. def test_010_clear_old_tags(self):
  95. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = (
  96. b'0\0dom0 class=AdminVM state=Running\n'
  97. b'fedora-25 class=TemplateVM state=Halted\n'
  98. b'testvm class=AppVM state=Running\n'
  99. )
  100. for tag in ('backup-restore-mgmt',
  101. 'backup-restore-in-progress',
  102. 'backup-restore-storage'):
  103. self.app.expected_calls[
  104. ('dom0', 'admin.vm.tag.Remove', tag, None)] = \
  105. b'2\x00QubesTagNotFoundError\x00\x00Tag not found\x00'
  106. self.app.expected_calls[
  107. ('fedora-25', 'admin.vm.tag.Remove', tag, None)] = b'0\0'
  108. self.app.expected_calls[
  109. ('testvm', 'admin.vm.tag.Remove', tag, None)] = b'0\0'
  110. args = unittest.mock.Mock(appvm='testvm')
  111. obj = RestoreInDisposableVM(self.app, args)
  112. obj.clear_old_tags()
  113. self.assertAllCalled()
  114. @unittest.mock.patch('subprocess.check_call')
  115. def test_020_create_dispvm(self, mock_check_call):
  116. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = (
  117. b'0\0dom0 class=AdminVM state=Running\n'
  118. b'fedora-25 class=TemplateVM state=Halted\n'
  119. b'testvm class=AppVM state=Running\n'
  120. b'mgmt-dvm class=AppVM state=Halted\n'
  121. # this should be only after creating...
  122. b'disp-backup-restore class=DispVM state=Halted\n'
  123. )
  124. self.app.expected_calls[
  125. ('dom0', 'admin.property.Get', 'management_dispvm', None)] = \
  126. b'0\0default=False type=vm mgmt-dvm'
  127. self.app.expected_calls[
  128. ('dom0', 'admin.vm.Create.DispVM', 'mgmt-dvm',
  129. b'name=disp-backup-restore label=red')] = b'0\0'
  130. self.app.expected_calls[
  131. ('disp-backup-restore', 'admin.vm.property.Set', 'auto_cleanup',
  132. b'True')] = \
  133. b'0\0'
  134. self.app.expected_calls[
  135. ('disp-backup-restore', 'admin.vm.feature.Set', 'tag-created-vm-with',
  136. b'backup-restore-in-progress')] = \
  137. b'0\0'
  138. args = unittest.mock.Mock(appvm='dom0')
  139. obj = RestoreInDisposableVM(self.app, args)
  140. obj.create_dispvm()
  141. self.assertAllCalled()
  142. @unittest.mock.patch('subprocess.check_call')
  143. @unittest.mock.patch('os.uname')
  144. def test_030_transfer_pass_file(self, mock_uname, mock_check_call):
  145. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = (
  146. b'0\0dom0 class=AdminVM state=Running\n'
  147. b'fedora-25 class=TemplateVM state=Halted\n'
  148. b'testvm class=AppVM state=Running\n'
  149. )
  150. mock_uname.return_value = ('Linux', 'dom0', '5.0.0', '#1', 'x86_64')
  151. args = unittest.mock.Mock(appvm='testvm')
  152. obj = RestoreInDisposableVM(self.app, args)
  153. obj.dispvm = unittest.mock.Mock(default_user='user2')
  154. new_path = obj.transfer_pass_file('/some/path')
  155. self.assertEqual(new_path, '/home/user2/QubesIncoming/dom0/path')
  156. mock_check_call.assert_called_once_with(
  157. ['qvm-copy-to-vm', 'disp-backup-restore', '/some/path'],
  158. stdout=subprocess.DEVNULL,
  159. stderr=subprocess.DEVNULL)
  160. self.assertAllCalled()
  161. def test_040_register_backup_source(self):
  162. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = (
  163. b'0\0dom0 class=AdminVM state=Running\n'
  164. b'fedora-25 class=TemplateVM state=Halted\n'
  165. b'backup-storage class=AppVM state=Running\n'
  166. )
  167. self.app.expected_service_calls[
  168. ('backup-storage', 'qubes.RegisterBackupLocation')] = \
  169. b'someid\nsomething that should not be read'
  170. self.app.expected_calls[
  171. ('backup-storage', 'admin.vm.tag.Set', 'backup-restore-storage',
  172. None)] = b'0\0'
  173. args = unittest.mock.Mock(backup_location='/backup/path',
  174. appvm='backup-storage')
  175. obj = RestoreInDisposableVM(self.app, args)
  176. obj.dispvm = unittest.mock.Mock(default_user='user2')
  177. obj.register_backup_source()
  178. self.assertEqual(obj.storage_access_id, 'someid')
  179. self.assertEqual(self.app.service_calls, [
  180. ('backup-storage', 'qubes.RegisterBackupLocation',
  181. {'stdin':subprocess.PIPE, 'stdout':subprocess.PIPE}),
  182. ('backup-storage', 'qubes.RegisterBackupLocation', b'/backup/path\n'),
  183. ])
  184. self.assertAllCalled()
  185. def test_050_invalidate_backup_access(self):
  186. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = (
  187. b'0\0dom0 class=AdminVM state=Running\n'
  188. b'fedora-25 class=TemplateVM state=Halted\n'
  189. b'backup-storage class=AppVM state=Running\n'
  190. )
  191. self.app.expected_calls[
  192. ('backup-storage', 'admin.vm.tag.Remove', 'backup-restore-storage',
  193. None)] = b'0\0'
  194. args = unittest.mock.Mock(backup_location='/backup/path',
  195. appvm='backup-storage')
  196. obj = RestoreInDisposableVM(self.app, args)
  197. obj.storage_access_proc = unittest.mock.Mock()
  198. obj.invalidate_backup_access()
  199. self.assertEqual(obj.storage_access_proc.mock_calls, [
  200. call.stdin.close(),
  201. call.wait(),
  202. ])
  203. self.assertAllCalled()
  204. @unittest.mock.patch('datetime.date')
  205. def test_060_finalize_tags(self, mock_date):
  206. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = (
  207. b'0\0dom0 class=AdminVM state=Running\n'
  208. b'fedora-25 class=TemplateVM state=Halted\n'
  209. b'disp-backup-restore class=DispVM state=Running\n'
  210. b'restored1 class=AppVM state=Halted\n'
  211. b'restored2 class=AppVM state=Halted\n'
  212. )
  213. self.app.expected_calls[
  214. ('dom0', 'admin.vm.tag.Get', 'backup-restore-in-progress',
  215. None)] = b'0\x000'
  216. self.app.expected_calls[
  217. ('fedora-25', 'admin.vm.tag.Get', 'backup-restore-in-progress',
  218. None)] = b'0\x000'
  219. self.app.expected_calls[
  220. ('disp-backup-restore', 'admin.vm.tag.Get', 'backup-restore-in-progress',
  221. None)] = b'0\x000'
  222. self.app.expected_calls[
  223. ('restored1', 'admin.vm.tag.Get', 'backup-restore-in-progress',
  224. None)] = b'0\x001'
  225. self.app.expected_calls[
  226. ('restored1', 'admin.vm.tag.List', None, None)] = \
  227. b'0\0backup-restore-in-progress\n' \
  228. b'restored-from-backup-12345678\n' \
  229. b'created-by-disp-backup-restore\n'
  230. self.app.expected_calls[
  231. ('restored1', 'admin.vm.tag.Remove', 'backup-restore-in-progress',
  232. None)] = b'0\0'
  233. self.app.expected_calls[
  234. ('restored2', 'admin.vm.tag.Get', 'backup-restore-in-progress',
  235. None)] = b'0\x001'
  236. self.app.expected_calls[
  237. ('restored2', 'admin.vm.tag.List', None, None)] = \
  238. b'0\0backup-restore-in-progress\n' \
  239. b'created-by-disp-backup-restore\n'
  240. self.app.expected_calls[
  241. ('restored2', 'admin.vm.tag.Set',
  242. 'restored-from-backup-at-2019-10-01',
  243. None)] = b'0\0'
  244. self.app.expected_calls[
  245. ('restored2', 'admin.vm.tag.Remove', 'backup-restore-in-progress',
  246. None)] = b'0\0'
  247. mock_date.today.return_value = datetime.date.fromisoformat('2019-10-01')
  248. mock_date.strftime.return_value = '2019-10-01'
  249. args = unittest.mock.Mock(backup_location='/backup/path',
  250. appvm=None)
  251. obj = RestoreInDisposableVM(self.app, args)
  252. obj.finalize_tags()
  253. self.assertAllCalled()
  254. def test_070_sanitize_log(self):
  255. sanitized = RestoreInDisposableVM.sanitize_log(b'sample message')
  256. self.assertEqual(sanitized, b'sample message')
  257. sanitized = RestoreInDisposableVM.sanitize_log(
  258. b'sample message\nmultiline\n')
  259. self.assertEqual(sanitized, b'sample message\nmultiline\n')
  260. sanitized = RestoreInDisposableVM.sanitize_log(
  261. b'\033[0;33m\xff\xfe\x80')
  262. self.assertEqual(sanitized, b'.[0;33m...')
  263. def test_080_extract_log(self):
  264. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = (
  265. b'0\0dom0 class=AdminVM state=Running\n'
  266. b'fedora-25 class=TemplateVM state=Halted\n'
  267. )
  268. args = unittest.mock.Mock(backup_location='/backup/path',
  269. appvm=None)
  270. obj = RestoreInDisposableVM(self.app, args)
  271. obj.dispvm = unittest.mock.Mock()
  272. obj.dispvm.run_with_args.return_value = b'this is a log', None
  273. backup_log = obj.extract_log()
  274. obj.dispvm.run_with_args.assert_called_once_with(
  275. 'cat', '/var/tmp/backup-restore.log',
  276. stdout=subprocess.PIPE,
  277. stderr=subprocess.DEVNULL)
  278. self.assertEqual(backup_log, b'this is a log')
  279. def test_100_run(self):
  280. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = (
  281. b'0\0dom0 class=AdminVM state=Running\n'
  282. b'fedora-25 class=TemplateVM state=Halted\n'
  283. )
  284. args = unittest.mock.Mock(backup_location='/backup/path',
  285. pass_file=None,
  286. appvm=None)
  287. obj = RestoreInDisposableVM(self.app, args)
  288. methods = ['create_dispvm', 'clear_old_tags', 'register_backup_source',
  289. 'prepare_inner_args', 'extract_log', 'finalize_tags']
  290. for m in methods:
  291. setattr(obj, m, unittest.mock.Mock())
  292. obj.extract_log.return_value = b'Some logs\nexit code: 0\n'
  293. obj.transfer_pass_file = unittest.mock.Mock()
  294. obj.prepare_inner_args.return_value = ['args']
  295. obj.terminal_app = ('terminal',)
  296. obj.dispvm = unittest.mock.Mock()
  297. with tempfile.NamedTemporaryFile() as tmp:
  298. with unittest.mock.patch('qubesadmin.backup.dispvm.LOCKFILE',
  299. tmp.name):
  300. obj.run()
  301. for m in methods:
  302. self.assertEqual(len(getattr(obj, m).mock_calls), 1)
  303. self.assertEqual(obj.dispvm.mock_calls, [
  304. call.start(),
  305. call.run_service_for_stdio('qubes.WaitForSession'),
  306. call.tags.add('backup-restore-mgmt'),
  307. call.run_with_args('terminal', 'qvm-backup-restore', 'args',
  308. stdout=subprocess.DEVNULL,
  309. stderr=subprocess.DEVNULL),
  310. call.tags.discard('backup-restore-mgmt'),
  311. call.kill()
  312. ])
  313. obj.transfer_pass_file.assert_not_called()
  314. def test_101_run_pass_file(self):
  315. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = (
  316. b'0\0dom0 class=AdminVM state=Running\n'
  317. b'fedora-25 class=TemplateVM state=Halted\n'
  318. )
  319. args = unittest.mock.Mock(backup_location='/backup/path',
  320. pass_file='/some/path',
  321. appvm=None)
  322. obj = RestoreInDisposableVM(self.app, args)
  323. methods = ['create_dispvm', 'clear_old_tags', 'register_backup_source',
  324. 'prepare_inner_args', 'extract_log', 'finalize_tags',
  325. 'transfer_pass_file']
  326. for m in methods:
  327. setattr(obj, m, unittest.mock.Mock())
  328. obj.extract_log.return_value = b'Some logs\nexit code: 0\n'
  329. obj.prepare_inner_args.return_value = ['args']
  330. obj.terminal_app = ('terminal',)
  331. obj.dispvm = unittest.mock.Mock()
  332. with tempfile.NamedTemporaryFile() as tmp:
  333. with unittest.mock.patch('qubesadmin.backup.dispvm.LOCKFILE',
  334. tmp.name):
  335. obj.run()
  336. for m in methods:
  337. self.assertEqual(len(getattr(obj, m).mock_calls), 1)
  338. self.assertEqual(obj.dispvm.mock_calls, [
  339. call.start(),
  340. call.run_service_for_stdio('qubes.WaitForSession'),
  341. call.tags.add('backup-restore-mgmt'),
  342. call.run_with_args('terminal', 'qvm-backup-restore', 'args',
  343. stdout=subprocess.DEVNULL,
  344. stderr=subprocess.DEVNULL),
  345. call.tags.discard('backup-restore-mgmt'),
  346. call.kill()
  347. ])
  348. def test_102_run_error(self):
  349. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = (
  350. b'0\0dom0 class=AdminVM state=Running\n'
  351. b'fedora-25 class=TemplateVM state=Halted\n'
  352. )
  353. args = unittest.mock.Mock(backup_location='/backup/path',
  354. pass_file=None,
  355. appvm=None)
  356. obj = RestoreInDisposableVM(self.app, args)
  357. methods = ['create_dispvm', 'clear_old_tags', 'register_backup_source',
  358. 'prepare_inner_args', 'extract_log', 'finalize_tags']
  359. for m in methods:
  360. setattr(obj, m, unittest.mock.Mock())
  361. obj.extract_log.return_value = b'Some error\nexit code: 1\n'
  362. obj.transfer_pass_file = unittest.mock.Mock()
  363. obj.prepare_inner_args.return_value = ['args']
  364. obj.terminal_app = ('terminal',)
  365. obj.dispvm = unittest.mock.Mock()
  366. with tempfile.NamedTemporaryFile() as tmp:
  367. with unittest.mock.patch('qubesadmin.backup.dispvm.LOCKFILE',
  368. tmp.name):
  369. with self.assertRaises(qubesadmin.exc.BackupRestoreError):
  370. obj.run()
  371. for m in methods:
  372. self.assertEqual(len(getattr(obj, m).mock_calls), 1)
  373. self.assertEqual(obj.dispvm.mock_calls, [
  374. call.start(),
  375. call.run_service_for_stdio('qubes.WaitForSession'),
  376. call.tags.add('backup-restore-mgmt'),
  377. call.run_with_args('terminal', 'qvm-backup-restore', 'args',
  378. stdout=subprocess.DEVNULL,
  379. stderr=subprocess.DEVNULL),
  380. call.tags.discard('backup-restore-mgmt'),
  381. call.kill()
  382. ])
  383. obj.transfer_pass_file.assert_not_called()