# -*- encoding: utf8 -*- # # The Qubes OS Project, http://www.qubes-os.org # # Copyright (C) 2019 Marek Marczykowski-Górecki # # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation; either version 2.1 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License along # with this program; if not, see . import datetime import tempfile import unittest import unittest.mock from unittest.mock import call import subprocess import qubesadmin.tests from qubesadmin.tools import qvm_backup_restore from qubesadmin.backup.dispvm import RestoreInDisposableVM class TC_00_RestoreInDispVM(qubesadmin.tests.QubesTestCase): def setUp(self): super().setUp() def test_000_prepare_inner_args(self): self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = ( b'0\0dom0 class=AdminVM state=Running\n' b'fedora-25 class=TemplateVM state=Halted\n' b'testvm class=AppVM state=Running\n' ) argv = ['--verbose', '--skip-broken', '--skip-dom0-home', '--dest-vm', 'testvm', '--compression-filter', 'gzip', '/backup/location'] args = qvm_backup_restore.parser.parse_args(argv) obj = RestoreInDisposableVM(self.app, args) obj.storage_access_id = 'abc' reconstructed_argv = obj.prepare_inner_args() expected_argv = argv[:-1] + \ ['--location-is-service', 'qubes.RestoreById+abc'] self.assertCountEqual(expected_argv, reconstructed_argv) def test_001_prepare_inner_args_exclude(self): self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = ( b'0\0dom0 class=AdminVM state=Running\n' b'fedora-25 class=TemplateVM state=Halted\n' b'testvm class=AppVM state=Running\n' ) argv = ['--exclude', 'vm1', '--exclude', 'vm2', '/backup/location'] args = qvm_backup_restore.parser.parse_args(argv) print(repr(args)) obj = RestoreInDisposableVM(self.app, args) obj.storage_access_id = 'abc' reconstructed_argv = obj.prepare_inner_args() expected_argv = argv[:-1] + \ ['--location-is-service', 'qubes.RestoreById+abc'] self.assertCountEqual(expected_argv, reconstructed_argv) def test_002_prepare_inner_args_pass_file(self): self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = ( b'0\0dom0 class=AdminVM state=Running\n' b'fedora-25 class=TemplateVM state=Halted\n' b'testvm class=AppVM state=Running\n' ) argv = ['--passphrase-file=/tmp/some/file', '/backup/location'] args = qvm_backup_restore.parser.parse_args(argv) print(repr(args)) obj = RestoreInDisposableVM(self.app, args) obj.storage_access_id = 'abc' reconstructed_argv = obj.prepare_inner_args() expected_argv = ['--passphrase-file', '/tmp/some/file', '--location-is-service', 'qubes.RestoreById+abc'] self.assertEqual(expected_argv, reconstructed_argv) def test_003_prepare_inner_args_auto_close(self): self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = ( b'0\0dom0 class=AdminVM state=Running\n' b'fedora-25 class=TemplateVM state=Halted\n' b'testvm class=AppVM state=Running\n' ) argv = ['--auto-close', '/backup/location'] args = qvm_backup_restore.parser.parse_args(argv) print(repr(args)) obj = RestoreInDisposableVM(self.app, args) obj.storage_access_id = 'abc' reconstructed_argv = obj.prepare_inner_args() expected_argv = ['--location-is-service', 'qubes.RestoreById+abc'] self.assertEqual(expected_argv, reconstructed_argv) def test_010_clear_old_tags(self): self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = ( b'0\0dom0 class=AdminVM state=Running\n' b'fedora-25 class=TemplateVM state=Halted\n' b'testvm class=AppVM state=Running\n' ) for tag in ('backup-restore-mgmt', 'backup-restore-in-progress', 'backup-restore-storage'): self.app.expected_calls[ ('dom0', 'admin.vm.tag.Remove', tag, None)] = \ b'2\x00QubesTagNotFoundError\x00\x00Tag not found\x00' self.app.expected_calls[ ('fedora-25', 'admin.vm.tag.Remove', tag, None)] = b'0\0' self.app.expected_calls[ ('testvm', 'admin.vm.tag.Remove', tag, None)] = b'0\0' args = unittest.mock.Mock(appvm='testvm') obj = RestoreInDisposableVM(self.app, args) obj.clear_old_tags() self.assertAllCalled() @unittest.mock.patch('subprocess.check_call') def test_020_create_dispvm(self, mock_check_call): self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = ( b'0\0dom0 class=AdminVM state=Running\n' b'fedora-25 class=TemplateVM state=Halted\n' b'testvm class=AppVM state=Running\n' b'mgmt-dvm class=AppVM state=Halted\n' # this should be only after creating... b'disp-backup-restore class=DispVM state=Halted\n' ) self.app.expected_calls[ ('dom0', 'admin.property.Get', 'management_dispvm', None)] = \ b'0\0default=False type=vm mgmt-dvm' self.app.expected_calls[ ('dom0', 'admin.vm.Create.DispVM', 'mgmt-dvm', b'name=disp-backup-restore label=red')] = b'0\0' self.app.expected_calls[ ('disp-backup-restore', 'admin.vm.property.Set', 'auto_cleanup', b'True')] = \ b'0\0' self.app.expected_calls[ ('disp-backup-restore', 'admin.vm.feature.Set', 'tag-created-vm-with', b'backup-restore-in-progress')] = \ b'0\0' args = unittest.mock.Mock(appvm='dom0') obj = RestoreInDisposableVM(self.app, args) obj.create_dispvm() self.assertAllCalled() @unittest.mock.patch('subprocess.check_call') @unittest.mock.patch('os.uname') def test_030_transfer_pass_file(self, mock_uname, mock_check_call): self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = ( b'0\0dom0 class=AdminVM state=Running\n' b'fedora-25 class=TemplateVM state=Halted\n' b'testvm class=AppVM state=Running\n' ) mock_uname.return_value = ('Linux', 'dom0', '5.0.0', '#1', 'x86_64') args = unittest.mock.Mock(appvm='testvm') obj = RestoreInDisposableVM(self.app, args) obj.dispvm = unittest.mock.Mock(default_user='user2') new_path = obj.transfer_pass_file('/some/path') self.assertEqual(new_path, '/home/user2/QubesIncoming/dom0/path') mock_check_call.assert_called_once_with( ['qvm-copy-to-vm', 'disp-backup-restore', '/some/path'], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) self.assertAllCalled() def test_040_register_backup_source(self): self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = ( b'0\0dom0 class=AdminVM state=Running\n' b'fedora-25 class=TemplateVM state=Halted\n' b'backup-storage class=AppVM state=Running\n' ) self.app.expected_service_calls[ ('backup-storage', 'qubes.RegisterBackupLocation')] = \ b'someid\nsomething that should not be read' self.app.expected_calls[ ('backup-storage', 'admin.vm.tag.Set', 'backup-restore-storage', None)] = b'0\0' args = unittest.mock.Mock(backup_location='/backup/path', appvm='backup-storage') obj = RestoreInDisposableVM(self.app, args) obj.dispvm = unittest.mock.Mock(default_user='user2') obj.register_backup_source() self.assertEqual(obj.storage_access_id, 'someid') self.assertEqual(self.app.service_calls, [ ('backup-storage', 'qubes.RegisterBackupLocation', {'stdin':subprocess.PIPE, 'stdout':subprocess.PIPE}), ('backup-storage', 'qubes.RegisterBackupLocation', b'/backup/path\n'), ]) self.assertAllCalled() def test_050_invalidate_backup_access(self): self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = ( b'0\0dom0 class=AdminVM state=Running\n' b'fedora-25 class=TemplateVM state=Halted\n' b'backup-storage class=AppVM state=Running\n' ) self.app.expected_calls[ ('backup-storage', 'admin.vm.tag.Remove', 'backup-restore-storage', None)] = b'0\0' args = unittest.mock.Mock(backup_location='/backup/path', appvm='backup-storage') obj = RestoreInDisposableVM(self.app, args) obj.storage_access_proc = unittest.mock.Mock() obj.invalidate_backup_access() self.assertEqual(obj.storage_access_proc.mock_calls, [ call.stdin.close(), call.wait(), ]) self.assertAllCalled() @unittest.mock.patch('datetime.date') def test_060_finalize_tags(self, mock_date): self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = ( b'0\0dom0 class=AdminVM state=Running\n' b'fedora-25 class=TemplateVM state=Halted\n' b'disp-backup-restore class=DispVM state=Running\n' b'restored1 class=AppVM state=Halted\n' b'restored2 class=AppVM state=Halted\n' ) self.app.expected_calls[ ('dom0', 'admin.vm.tag.Get', 'backup-restore-in-progress', None)] = b'0\x000' self.app.expected_calls[ ('fedora-25', 'admin.vm.tag.Get', 'backup-restore-in-progress', None)] = b'0\x000' self.app.expected_calls[ ('disp-backup-restore', 'admin.vm.tag.Get', 'backup-restore-in-progress', None)] = b'0\x000' self.app.expected_calls[ ('restored1', 'admin.vm.tag.Get', 'backup-restore-in-progress', None)] = b'0\x001' self.app.expected_calls[ ('restored1', 'admin.vm.tag.List', None, None)] = \ b'0\0backup-restore-in-progress\n' \ b'restored-from-backup-12345678\n' \ b'created-by-disp-backup-restore\n' self.app.expected_calls[ ('restored1', 'admin.vm.tag.Remove', 'backup-restore-in-progress', None)] = b'0\0' self.app.expected_calls[ ('restored2', 'admin.vm.tag.Get', 'backup-restore-in-progress', None)] = b'0\x001' self.app.expected_calls[ ('restored2', 'admin.vm.tag.List', None, None)] = \ b'0\0backup-restore-in-progress\n' \ b'created-by-disp-backup-restore\n' self.app.expected_calls[ ('restored2', 'admin.vm.tag.Set', 'restored-from-backup-at-2019-10-01', None)] = b'0\0' self.app.expected_calls[ ('restored2', 'admin.vm.tag.Remove', 'backup-restore-in-progress', None)] = b'0\0' mock_date.today.return_value = datetime.date.fromisoformat('2019-10-01') mock_date.strftime.return_value = '2019-10-01' args = unittest.mock.Mock(backup_location='/backup/path', appvm=None) obj = RestoreInDisposableVM(self.app, args) obj.finalize_tags() self.assertAllCalled() def test_070_sanitize_log(self): sanitized = RestoreInDisposableVM.sanitize_log(b'sample message') self.assertEqual(sanitized, b'sample message') sanitized = RestoreInDisposableVM.sanitize_log( b'sample message\nmultiline\n') self.assertEqual(sanitized, b'sample message\nmultiline\n') sanitized = RestoreInDisposableVM.sanitize_log( b'\033[0;33m\xff\xfe\x80') self.assertEqual(sanitized, b'.[0;33m...') def test_080_extract_log(self): self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = ( b'0\0dom0 class=AdminVM state=Running\n' b'fedora-25 class=TemplateVM state=Halted\n' ) args = unittest.mock.Mock(backup_location='/backup/path', appvm=None) obj = RestoreInDisposableVM(self.app, args) obj.dispvm = unittest.mock.Mock() obj.dispvm.run_with_args.return_value = b'this is a log', None backup_log = obj.extract_log() obj.dispvm.run_with_args.assert_called_once_with( 'cat', '/var/tmp/backup-restore.log', stdout=subprocess.PIPE, stderr=subprocess.DEVNULL) self.assertEqual(backup_log, b'this is a log') def test_100_run(self): self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = ( b'0\0dom0 class=AdminVM state=Running\n' b'fedora-25 class=TemplateVM state=Halted\n' ) args = unittest.mock.Mock(backup_location='/backup/path', pass_file=None, appvm=None) obj = RestoreInDisposableVM(self.app, args) methods = ['create_dispvm', 'clear_old_tags', 'register_backup_source', 'prepare_inner_args', 'extract_log', 'finalize_tags'] for m in methods: setattr(obj, m, unittest.mock.Mock()) obj.extract_log.return_value = b'Some logs\nexit code: 0\n' obj.transfer_pass_file = unittest.mock.Mock() obj.prepare_inner_args.return_value = ['args'] obj.terminal_app = ('terminal',) obj.dispvm = unittest.mock.Mock() with tempfile.NamedTemporaryFile() as tmp: with unittest.mock.patch('qubesadmin.backup.dispvm.LOCKFILE', tmp.name): obj.run() for m in methods: self.assertEqual(len(getattr(obj, m).mock_calls), 1) self.assertEqual(obj.dispvm.mock_calls, [ call.start(), call.run_service_for_stdio('qubes.WaitForSession'), call.tags.add('backup-restore-mgmt'), call.run_with_args('terminal', 'qvm-backup-restore', 'args', stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL), call.tags.discard('backup-restore-mgmt'), call.kill() ]) obj.transfer_pass_file.assert_not_called() def test_101_run_pass_file(self): self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = ( b'0\0dom0 class=AdminVM state=Running\n' b'fedora-25 class=TemplateVM state=Halted\n' ) args = unittest.mock.Mock(backup_location='/backup/path', pass_file='/some/path', appvm=None) obj = RestoreInDisposableVM(self.app, args) methods = ['create_dispvm', 'clear_old_tags', 'register_backup_source', 'prepare_inner_args', 'extract_log', 'finalize_tags', 'transfer_pass_file'] for m in methods: setattr(obj, m, unittest.mock.Mock()) obj.extract_log.return_value = b'Some logs\nexit code: 0\n' obj.prepare_inner_args.return_value = ['args'] obj.terminal_app = ('terminal',) obj.dispvm = unittest.mock.Mock() with tempfile.NamedTemporaryFile() as tmp: with unittest.mock.patch('qubesadmin.backup.dispvm.LOCKFILE', tmp.name): obj.run() for m in methods: self.assertEqual(len(getattr(obj, m).mock_calls), 1) self.assertEqual(obj.dispvm.mock_calls, [ call.start(), call.run_service_for_stdio('qubes.WaitForSession'), call.tags.add('backup-restore-mgmt'), call.run_with_args('terminal', 'qvm-backup-restore', 'args', stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL), call.tags.discard('backup-restore-mgmt'), call.kill() ]) def test_102_run_error(self): self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = ( b'0\0dom0 class=AdminVM state=Running\n' b'fedora-25 class=TemplateVM state=Halted\n' ) args = unittest.mock.Mock(backup_location='/backup/path', pass_file=None, appvm=None) obj = RestoreInDisposableVM(self.app, args) methods = ['create_dispvm', 'clear_old_tags', 'register_backup_source', 'prepare_inner_args', 'extract_log', 'finalize_tags'] for m in methods: setattr(obj, m, unittest.mock.Mock()) obj.extract_log.return_value = b'Some error\nexit code: 1\n' obj.transfer_pass_file = unittest.mock.Mock() obj.prepare_inner_args.return_value = ['args'] obj.terminal_app = ('terminal',) obj.dispvm = unittest.mock.Mock() with tempfile.NamedTemporaryFile() as tmp: with unittest.mock.patch('qubesadmin.backup.dispvm.LOCKFILE', tmp.name): with self.assertRaises(qubesadmin.exc.BackupRestoreError): obj.run() for m in methods: self.assertEqual(len(getattr(obj, m).mock_calls), 1) self.assertEqual(obj.dispvm.mock_calls, [ call.start(), call.run_service_for_stdio('qubes.WaitForSession'), call.tags.add('backup-restore-mgmt'), call.run_with_args('terminal', 'qvm-backup-restore', 'args', stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL), call.tags.discard('backup-restore-mgmt'), call.kill() ]) obj.transfer_pass_file.assert_not_called()