tests: check rejecting/accepting compression filters

3 new tests:
- custom (common) compression filter
- custom (uncommon) compression filter - should be rejected
- custom (uncommon) compression filter forced - accepted
This commit is contained in:
Marek Marczykowski-Górecki 2019-09-08 02:43:09 +02:00
parent 14f77860bf
commit 48ad8325d0
No known key found for this signature in database
GPG Key ID: 063938BA42CFA724
2 changed files with 189 additions and 12 deletions

View File

@ -169,7 +169,7 @@ class BackupTestCase(qubesadmin.tests.QubesTestCase):
def restore_backup(self, source=None, appvm=None, options=None,
expect_errors=None, manipulate_restore_info=None,
passphrase='qubes'):
passphrase='qubes', force_compression_filter=None):
if source is None:
backupfile = os.path.join(self.backupdir,
sorted(os.listdir(self.backupdir))[-1])
@ -178,7 +178,8 @@ class BackupTestCase(qubesadmin.tests.QubesTestCase):
with self.assertNotRaises(qubesadmin.exc.QubesException):
restore_op = qubesadmin.backup.restore.BackupRestore(
self.app, backupfile, appvm, passphrase)
self.app, backupfile, appvm, passphrase,
force_compression_filter=force_compression_filter)
if options:
for key, value in options.items():
setattr(restore_op.options, key, value)

View File

@ -32,6 +32,8 @@ import logging
import time
import contextlib
try:
import unittest.mock as mock
except ImportError:
@ -44,6 +46,7 @@ import sys
import qubesadmin.backup.core2
import qubesadmin.backup.core3
import qubesadmin.exc
import qubesadmin.firewall
import qubesadmin.storage
import qubesadmin.tests
@ -136,7 +139,7 @@ BACKUP_HEADER_R4 = '''version=4
hmac-algorithm=scrypt
encrypted=True
compressed={compressed}
compression-filter=gzip
compression-filter={compression_filter}
backup-id=20161020T123455-1234
'''
@ -1166,7 +1169,7 @@ class TC_10_BackupCompatibility(qubesadmin.tests.backup.BackupTestCase):
self.append_backup_stream(part_with_dir+".hmac", stream,
basedir=self.fullpath("stage1"))
def handle_v4_file(self, f_name, subdir, stream, compressed=True):
def handle_v4_file(self, f_name, subdir, stream, compressed="gzip"):
# create inner archive
tar_cmdline = ["tar", "-Pc", '--sparse',
'-C', self.fullpath(os.path.dirname(f_name)),
@ -1175,7 +1178,9 @@ class TC_10_BackupCompatibility(qubesadmin.tests.backup.BackupTestCase):
subdir),
os.path.basename(f_name)
]
if compressed:
if compressed and isinstance(compressed, str):
tar_cmdline.insert(-1, "--use-compress-program=%s" % compressed)
elif compressed:
tar_cmdline.insert(-1, "--use-compress-program=%s" % "gzip")
tar = subprocess.Popen(tar_cmdline, stdout=subprocess.PIPE)
data = tar.stdout
@ -1268,7 +1273,7 @@ class TC_10_BackupCompatibility(qubesadmin.tests.backup.BackupTestCase):
output.close()
def create_v4_backup(self, compressed=True):
def create_v4_backup(self, compressed="gzip"):
"""
Create "backup format 4" backup - used in R4.0
@ -1278,7 +1283,8 @@ class TC_10_BackupCompatibility(qubesadmin.tests.backup.BackupTestCase):
output = open(self.fullpath("backup.bin"), "w")
with open(self.fullpath("backup-header"), "w") as f:
f.write(BACKUP_HEADER_R4.format(
compressed=str(compressed)
compressed=str(bool(compressed)),
compression_filter=(compressed if compressed else "gzip")
))
self.scrypt_encrypt("backup-header", output_name='backup-header.hmac')
self.append_backup_stream("backup-header", output)
@ -1662,7 +1668,7 @@ class TC_10_BackupCompatibility(qubesadmin.tests.backup.BackupTestCase):
@unittest.skipUnless(spawn.find_executable('scrypt'),
"scrypt not installed")
def test_230_r4(self):
self.create_v4_backup(False)
self.create_v4_backup("")
self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = (
b'0\0dom0 class=AdminVM state=Running\n'
b'fedora-25 class=TemplateVM state=Halted\n'
@ -1731,7 +1737,7 @@ class TC_10_BackupCompatibility(qubesadmin.tests.backup.BackupTestCase):
@unittest.skipUnless(spawn.find_executable('scrypt'),
"scrypt not installed")
def test_230_r4_compressed(self):
self.create_v4_backup(True)
self.create_v4_backup("gzip")
self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = (
b'0\0dom0 class=AdminVM state=Running\n'
@ -1798,16 +1804,186 @@ class TC_10_BackupCompatibility(qubesadmin.tests.backup.BackupTestCase):
self.assertDom0Restored(dummy_timestamp)
@unittest.skipUnless(spawn.find_executable('scrypt'),
"scrypt not installed")
def test_230_r4_custom_cmpression(self):
self.create_v4_backup("bzip2")
self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = (
b'0\0dom0 class=AdminVM state=Running\n'
b'fedora-25 class=TemplateVM state=Halted\n'
b'testvm class=AppVM state=Running\n'
b'sys-net class=AppVM state=Running\n'
)
self.app.expected_calls[
('dom0', 'admin.property.Get', 'default_template', None)] = \
b'0\0default=no type=vm fedora-25'
self.app.expected_calls[
('sys-net', 'admin.vm.property.Get', 'provides_network', None)] = \
b'0\0default=no type=bool True'
self.setup_expected_calls(parsed_qubes_xml_v4, templates_map={
'debian-8': 'fedora-25'
})
firewall_data = (
'action=accept specialtarget=dns\n'
'action=accept proto=icmp\n'
'action=accept proto=tcp dstports=22-22\n'
'action=accept proto=tcp dsthost=www.qubes-os.org '
'dstports=443-443\n'
'action=accept proto=tcp dst4=192.168.0.0/24\n'
'action=drop\n'
)
self.app.expected_calls[
('test-work', 'admin.vm.firewall.Set', None,
firewall_data.encode())] = b'0\0'
qubesd_calls_queue = multiprocessing.Queue()
dummy_timestamp = time.strftime("test-%Y-%m-%d-%H%M%S")
patches = [
mock.patch('qubesadmin.storage.Volume',
functools.partial(MockVolume, qubesd_calls_queue)),
mock.patch(
'qubesadmin.backup.restore.BackupRestore._handle_appmenus_list',
functools.partial(self.mock_appmenus, qubesd_calls_queue)),
mock.patch(
'qubesadmin.firewall.Firewall',
functools.partial(MockFirewall, qubesd_calls_queue)),
mock.patch(
'time.strftime',
return_value=dummy_timestamp)
]
for patch in patches:
patch.start()
try:
self.restore_backup(self.fullpath("backup.bin"), options={
'use-default-template': True,
'use-default-netvm': True,
})
finally:
for patch in patches:
patch.stop()
# retrieve calls from other multiprocess.Process instances
while not qubesd_calls_queue.empty():
call_args = qubesd_calls_queue.get()
self.app.qubesd_call(*call_args)
qubesd_calls_queue.close()
self.assertAllCalled()
self.assertDom0Restored(dummy_timestamp)
@unittest.skipUnless(spawn.find_executable('scrypt'),
"scrypt not installed")
def test_230_r4_uncommon_compression(self):
self.create_v4_backup("less")
qubesd_calls_queue = multiprocessing.Queue()
dummy_timestamp = time.strftime("test-%Y-%m-%d-%H%M%S")
patches = [
mock.patch('qubesadmin.storage.Volume',
functools.partial(MockVolume, qubesd_calls_queue)),
mock.patch(
'qubesadmin.backup.restore.BackupRestore._handle_appmenus_list',
functools.partial(self.mock_appmenus, qubesd_calls_queue)),
mock.patch(
'qubesadmin.firewall.Firewall',
functools.partial(MockFirewall, qubesd_calls_queue)),
mock.patch(
'time.strftime',
return_value=dummy_timestamp)
]
for patch in patches:
patch.start()
try:
with self.assertRaises(qubesadmin.exc.QubesException):
qubesadmin.backup.restore.BackupRestore(
self.app, self.fullpath("backup.bin"), None, 'qubes')
finally:
for patch in patches:
patch.stop()
@unittest.skipUnless(spawn.find_executable('scrypt'),
"scrypt not installed")
def test_230_r4_uncommon_cmpression_forced(self):
self.create_v4_backup("less")
self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = (
b'0\0dom0 class=AdminVM state=Running\n'
b'fedora-25 class=TemplateVM state=Halted\n'
b'testvm class=AppVM state=Running\n'
b'sys-net class=AppVM state=Running\n'
)
self.app.expected_calls[
('dom0', 'admin.property.Get', 'default_template', None)] = \
b'0\0default=no type=vm fedora-25'
self.app.expected_calls[
('sys-net', 'admin.vm.property.Get', 'provides_network', None)] = \
b'0\0default=no type=bool True'
self.setup_expected_calls(parsed_qubes_xml_v4, templates_map={
'debian-8': 'fedora-25'
})
firewall_data = (
'action=accept specialtarget=dns\n'
'action=accept proto=icmp\n'
'action=accept proto=tcp dstports=22-22\n'
'action=accept proto=tcp dsthost=www.qubes-os.org '
'dstports=443-443\n'
'action=accept proto=tcp dst4=192.168.0.0/24\n'
'action=drop\n'
)
self.app.expected_calls[
('test-work', 'admin.vm.firewall.Set', None,
firewall_data.encode())] = b'0\0'
qubesd_calls_queue = multiprocessing.Queue()
dummy_timestamp = time.strftime("test-%Y-%m-%d-%H%M%S")
patches = [
mock.patch('qubesadmin.storage.Volume',
functools.partial(MockVolume, qubesd_calls_queue)),
mock.patch(
'qubesadmin.backup.restore.BackupRestore._handle_appmenus_list',
functools.partial(self.mock_appmenus, qubesd_calls_queue)),
mock.patch(
'qubesadmin.firewall.Firewall',
functools.partial(MockFirewall, qubesd_calls_queue)),
mock.patch(
'time.strftime',
return_value=dummy_timestamp)
]
for patch in patches:
patch.start()
try:
self.restore_backup(self.fullpath("backup.bin"), options={
'use-default-template': True,
'use-default-netvm': True,
}, force_compression_filter='less')
finally:
for patch in patches:
patch.stop()
# retrieve calls from other multiprocess.Process instances
while not qubesd_calls_queue.empty():
call_args = qubesd_calls_queue.get()
self.app.qubesd_call(*call_args)
qubesd_calls_queue.close()
self.assertAllCalled()
self.assertDom0Restored(dummy_timestamp)
class TC_11_BackupCompatibilityIntoLVM(TC_10_BackupCompatibility):
storage_pool = 'some-pool'
def restore_backup(self, source=None, appvm=None, options=None,
expect_errors=None, manipulate_restore_info=None,
passphrase='qubes'):
**kwargs):
if options is None:
options = {}
options['override_pool'] = self.storage_pool
super(TC_11_BackupCompatibilityIntoLVM, self).restore_backup(source,
appvm, options, expect_errors, manipulate_restore_info)
appvm, options, **kwargs)