From 48ad8325d086c54198940cfe39a87f3101d90111 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marek=20Marczykowski-G=C3=B3recki?= Date: Sun, 8 Sep 2019 02:43:09 +0200 Subject: [PATCH] tests: check rejecting/accepting compression filters 3 new tests: - custom (common) compression filter - custom (uncommon) compression filter - should be rejected - custom (uncommon) compression filter forced - accepted --- qubesadmin/tests/backup/__init__.py | 5 +- .../tests/backup/backupcompatibility.py | 196 +++++++++++++++++- 2 files changed, 189 insertions(+), 12 deletions(-) diff --git a/qubesadmin/tests/backup/__init__.py b/qubesadmin/tests/backup/__init__.py index 4538fb3..5553850 100644 --- a/qubesadmin/tests/backup/__init__.py +++ b/qubesadmin/tests/backup/__init__.py @@ -169,7 +169,7 @@ class BackupTestCase(qubesadmin.tests.QubesTestCase): def restore_backup(self, source=None, appvm=None, options=None, expect_errors=None, manipulate_restore_info=None, - passphrase='qubes'): + passphrase='qubes', force_compression_filter=None): if source is None: backupfile = os.path.join(self.backupdir, sorted(os.listdir(self.backupdir))[-1]) @@ -178,7 +178,8 @@ class BackupTestCase(qubesadmin.tests.QubesTestCase): with self.assertNotRaises(qubesadmin.exc.QubesException): restore_op = qubesadmin.backup.restore.BackupRestore( - self.app, backupfile, appvm, passphrase) + self.app, backupfile, appvm, passphrase, + force_compression_filter=force_compression_filter) if options: for key, value in options.items(): setattr(restore_op.options, key, value) diff --git a/qubesadmin/tests/backup/backupcompatibility.py b/qubesadmin/tests/backup/backupcompatibility.py index 1b88371..c4d7b11 100644 --- a/qubesadmin/tests/backup/backupcompatibility.py +++ b/qubesadmin/tests/backup/backupcompatibility.py @@ -32,6 +32,8 @@ import logging import time +import contextlib + try: import unittest.mock as mock except ImportError: @@ -44,6 +46,7 @@ import sys import qubesadmin.backup.core2 import qubesadmin.backup.core3 +import qubesadmin.exc import qubesadmin.firewall import qubesadmin.storage import qubesadmin.tests @@ -136,7 +139,7 @@ BACKUP_HEADER_R4 = '''version=4 hmac-algorithm=scrypt encrypted=True compressed={compressed} -compression-filter=gzip +compression-filter={compression_filter} backup-id=20161020T123455-1234 ''' @@ -1166,7 +1169,7 @@ class TC_10_BackupCompatibility(qubesadmin.tests.backup.BackupTestCase): self.append_backup_stream(part_with_dir+".hmac", stream, basedir=self.fullpath("stage1")) - def handle_v4_file(self, f_name, subdir, stream, compressed=True): + def handle_v4_file(self, f_name, subdir, stream, compressed="gzip"): # create inner archive tar_cmdline = ["tar", "-Pc", '--sparse', '-C', self.fullpath(os.path.dirname(f_name)), @@ -1175,7 +1178,9 @@ class TC_10_BackupCompatibility(qubesadmin.tests.backup.BackupTestCase): subdir), os.path.basename(f_name) ] - if compressed: + if compressed and isinstance(compressed, str): + tar_cmdline.insert(-1, "--use-compress-program=%s" % compressed) + elif compressed: tar_cmdline.insert(-1, "--use-compress-program=%s" % "gzip") tar = subprocess.Popen(tar_cmdline, stdout=subprocess.PIPE) data = tar.stdout @@ -1268,7 +1273,7 @@ class TC_10_BackupCompatibility(qubesadmin.tests.backup.BackupTestCase): output.close() - def create_v4_backup(self, compressed=True): + def create_v4_backup(self, compressed="gzip"): """ Create "backup format 4" backup - used in R4.0 @@ -1278,7 +1283,8 @@ class TC_10_BackupCompatibility(qubesadmin.tests.backup.BackupTestCase): output = open(self.fullpath("backup.bin"), "w") with open(self.fullpath("backup-header"), "w") as f: f.write(BACKUP_HEADER_R4.format( - compressed=str(compressed) + compressed=str(bool(compressed)), + compression_filter=(compressed if compressed else "gzip") )) self.scrypt_encrypt("backup-header", output_name='backup-header.hmac') self.append_backup_stream("backup-header", output) @@ -1662,7 +1668,7 @@ class TC_10_BackupCompatibility(qubesadmin.tests.backup.BackupTestCase): @unittest.skipUnless(spawn.find_executable('scrypt'), "scrypt not installed") def test_230_r4(self): - self.create_v4_backup(False) + self.create_v4_backup("") self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = ( b'0\0dom0 class=AdminVM state=Running\n' b'fedora-25 class=TemplateVM state=Halted\n' @@ -1731,7 +1737,7 @@ class TC_10_BackupCompatibility(qubesadmin.tests.backup.BackupTestCase): @unittest.skipUnless(spawn.find_executable('scrypt'), "scrypt not installed") def test_230_r4_compressed(self): - self.create_v4_backup(True) + self.create_v4_backup("gzip") self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = ( b'0\0dom0 class=AdminVM state=Running\n' @@ -1798,16 +1804,186 @@ class TC_10_BackupCompatibility(qubesadmin.tests.backup.BackupTestCase): self.assertDom0Restored(dummy_timestamp) + @unittest.skipUnless(spawn.find_executable('scrypt'), + "scrypt not installed") + def test_230_r4_custom_cmpression(self): + self.create_v4_backup("bzip2") + + self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = ( + b'0\0dom0 class=AdminVM state=Running\n' + b'fedora-25 class=TemplateVM state=Halted\n' + b'testvm class=AppVM state=Running\n' + b'sys-net class=AppVM state=Running\n' + ) + self.app.expected_calls[ + ('dom0', 'admin.property.Get', 'default_template', None)] = \ + b'0\0default=no type=vm fedora-25' + self.app.expected_calls[ + ('sys-net', 'admin.vm.property.Get', 'provides_network', None)] = \ + b'0\0default=no type=bool True' + self.setup_expected_calls(parsed_qubes_xml_v4, templates_map={ + 'debian-8': 'fedora-25' + }) + firewall_data = ( + 'action=accept specialtarget=dns\n' + 'action=accept proto=icmp\n' + 'action=accept proto=tcp dstports=22-22\n' + 'action=accept proto=tcp dsthost=www.qubes-os.org ' + 'dstports=443-443\n' + 'action=accept proto=tcp dst4=192.168.0.0/24\n' + 'action=drop\n' + ) + self.app.expected_calls[ + ('test-work', 'admin.vm.firewall.Set', None, + firewall_data.encode())] = b'0\0' + + qubesd_calls_queue = multiprocessing.Queue() + + dummy_timestamp = time.strftime("test-%Y-%m-%d-%H%M%S") + patches = [ + mock.patch('qubesadmin.storage.Volume', + functools.partial(MockVolume, qubesd_calls_queue)), + mock.patch( + 'qubesadmin.backup.restore.BackupRestore._handle_appmenus_list', + functools.partial(self.mock_appmenus, qubesd_calls_queue)), + mock.patch( + 'qubesadmin.firewall.Firewall', + functools.partial(MockFirewall, qubesd_calls_queue)), + mock.patch( + 'time.strftime', + return_value=dummy_timestamp) + ] + for patch in patches: + patch.start() + try: + self.restore_backup(self.fullpath("backup.bin"), options={ + 'use-default-template': True, + 'use-default-netvm': True, + }) + finally: + for patch in patches: + patch.stop() + + # retrieve calls from other multiprocess.Process instances + while not qubesd_calls_queue.empty(): + call_args = qubesd_calls_queue.get() + self.app.qubesd_call(*call_args) + qubesd_calls_queue.close() + + self.assertAllCalled() + + self.assertDom0Restored(dummy_timestamp) + + @unittest.skipUnless(spawn.find_executable('scrypt'), + "scrypt not installed") + def test_230_r4_uncommon_compression(self): + self.create_v4_backup("less") + + qubesd_calls_queue = multiprocessing.Queue() + + dummy_timestamp = time.strftime("test-%Y-%m-%d-%H%M%S") + patches = [ + mock.patch('qubesadmin.storage.Volume', + functools.partial(MockVolume, qubesd_calls_queue)), + mock.patch( + 'qubesadmin.backup.restore.BackupRestore._handle_appmenus_list', + functools.partial(self.mock_appmenus, qubesd_calls_queue)), + mock.patch( + 'qubesadmin.firewall.Firewall', + functools.partial(MockFirewall, qubesd_calls_queue)), + mock.patch( + 'time.strftime', + return_value=dummy_timestamp) + ] + for patch in patches: + patch.start() + try: + with self.assertRaises(qubesadmin.exc.QubesException): + qubesadmin.backup.restore.BackupRestore( + self.app, self.fullpath("backup.bin"), None, 'qubes') + finally: + for patch in patches: + patch.stop() + + @unittest.skipUnless(spawn.find_executable('scrypt'), + "scrypt not installed") + def test_230_r4_uncommon_cmpression_forced(self): + self.create_v4_backup("less") + + self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = ( + b'0\0dom0 class=AdminVM state=Running\n' + b'fedora-25 class=TemplateVM state=Halted\n' + b'testvm class=AppVM state=Running\n' + b'sys-net class=AppVM state=Running\n' + ) + self.app.expected_calls[ + ('dom0', 'admin.property.Get', 'default_template', None)] = \ + b'0\0default=no type=vm fedora-25' + self.app.expected_calls[ + ('sys-net', 'admin.vm.property.Get', 'provides_network', None)] = \ + b'0\0default=no type=bool True' + self.setup_expected_calls(parsed_qubes_xml_v4, templates_map={ + 'debian-8': 'fedora-25' + }) + firewall_data = ( + 'action=accept specialtarget=dns\n' + 'action=accept proto=icmp\n' + 'action=accept proto=tcp dstports=22-22\n' + 'action=accept proto=tcp dsthost=www.qubes-os.org ' + 'dstports=443-443\n' + 'action=accept proto=tcp dst4=192.168.0.0/24\n' + 'action=drop\n' + ) + self.app.expected_calls[ + ('test-work', 'admin.vm.firewall.Set', None, + firewall_data.encode())] = b'0\0' + + qubesd_calls_queue = multiprocessing.Queue() + + dummy_timestamp = time.strftime("test-%Y-%m-%d-%H%M%S") + patches = [ + mock.patch('qubesadmin.storage.Volume', + functools.partial(MockVolume, qubesd_calls_queue)), + mock.patch( + 'qubesadmin.backup.restore.BackupRestore._handle_appmenus_list', + functools.partial(self.mock_appmenus, qubesd_calls_queue)), + mock.patch( + 'qubesadmin.firewall.Firewall', + functools.partial(MockFirewall, qubesd_calls_queue)), + mock.patch( + 'time.strftime', + return_value=dummy_timestamp) + ] + for patch in patches: + patch.start() + try: + self.restore_backup(self.fullpath("backup.bin"), options={ + 'use-default-template': True, + 'use-default-netvm': True, + }, force_compression_filter='less') + finally: + for patch in patches: + patch.stop() + + # retrieve calls from other multiprocess.Process instances + while not qubesd_calls_queue.empty(): + call_args = qubesd_calls_queue.get() + self.app.qubesd_call(*call_args) + qubesd_calls_queue.close() + + self.assertAllCalled() + + self.assertDom0Restored(dummy_timestamp) + class TC_11_BackupCompatibilityIntoLVM(TC_10_BackupCompatibility): storage_pool = 'some-pool' def restore_backup(self, source=None, appvm=None, options=None, - expect_errors=None, manipulate_restore_info=None, - passphrase='qubes'): + **kwargs): if options is None: options = {} options['override_pool'] = self.storage_pool super(TC_11_BackupCompatibilityIntoLVM, self).restore_backup(source, - appvm, options, expect_errors, manipulate_restore_info) + appvm, options, **kwargs)