tests: multi-part volume in backup tests

Create on of VM's disk volumes big enough to split it into multiple
parts. To do that, add a test of uncompressed backup.

QubesOS/qubes-issues#3167
This commit is contained in:
Marek Marczykowski-Górecki 2017-10-16 04:09:32 +02:00
parent 513163276f
commit a6ff459a1d
No known key found for this signature in database
GPG Key ID: 063938BA42CFA724

View File

@ -324,6 +324,7 @@ parsed_qubes_xml_r2 = {
'template': None, 'template': None,
'backup_path': 'vm-templates/test-template-clone', 'backup_path': 'vm-templates/test-template-clone',
'included_in_backup': True, 'included_in_backup': True,
'root_size': 209715712,
}, },
'test-custom-template-appvm': { 'test-custom-template-appvm': {
'klass': 'AppVM', 'klass': 'AppVM',
@ -928,7 +929,7 @@ class TC_10_BackupCompatibility(qubesadmin.tests.backup.BackupTestCase):
self.create_sparse(self.fullpath( self.create_sparse(self.fullpath(
"vm-templates/test-template-clone/root.img"), 10*2**20) "vm-templates/test-template-clone/root.img"), 10*2**20)
self.fill_image(self.fullpath( self.fill_image(self.fullpath(
"vm-templates/test-template-clone/root.img"), 1*2**20, True, "vm-templates/test-template-clone/root.img"), 100*2**20, True,
signature=b'test-template-clone/root') signature=b'test-template-clone/root')
self.create_volatile_img(self.fullpath( self.create_volatile_img(self.fullpath(
"vm-templates/test-template-clone/volatile.img")) "vm-templates/test-template-clone/volatile.img"))
@ -1190,7 +1191,9 @@ class TC_10_BackupCompatibility(qubesadmin.tests.backup.BackupTestCase):
subdir = vm_dir subdir = vm_dir
self.handle_v3_file( self.handle_v3_file(
os.path.join(vm_dir, f_name), os.path.join(vm_dir, f_name),
subdir+'/', output, encrypted=encrypted) subdir+'/', output,
compressed=compressed,
encrypted=encrypted)
for vm_name in os.listdir(self.fullpath("vm-templates")): for vm_name in os.listdir(self.fullpath("vm-templates")):
vm_dir = os.path.join("vm-templates", vm_name) vm_dir = os.path.join("vm-templates", vm_name)
@ -1200,7 +1203,9 @@ class TC_10_BackupCompatibility(qubesadmin.tests.backup.BackupTestCase):
subdir = vm_dir subdir = vm_dir
self.handle_v3_file( self.handle_v3_file(
os.path.join(vm_dir, "."), os.path.join(vm_dir, "."),
subdir+'/', output, encrypted=encrypted) subdir+'/', output,
compressed=compressed,
encrypted=encrypted)
output.close() output.close()
@ -1508,6 +1513,67 @@ class TC_10_BackupCompatibility(qubesadmin.tests.backup.BackupTestCase):
self.assertAllCalled() self.assertAllCalled()
def test_230_r2_uncompressed(self):
self.create_v3_backup(False, False)
self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = (
b'0\0dom0 class=AdminVM state=Running\n'
b'fedora-25 class=TemplateVM state=Halted\n'
b'testvm class=AppVM state=Running\n'
)
self.app.expected_calls[
('dom0', 'admin.property.Get', 'default_template', None)] = \
b'0\0default=no type=vm fedora-25'
self.setup_expected_calls(parsed_qubes_xml_r2, templates_map={
'fedora-20-x64': 'fedora-25'
})
firewall_data = (
'action=accept specialtarget=dns\n'
'action=accept proto=icmp\n'
'action=accept proto=tcp dstports=22-22\n'
'action=accept proto=tcp dstports=9418-9418\n'
'action=accept proto=tcp dst4=192.168.0.1/32 dstports=1234-1234\n'
'action=accept proto=tcp dsthost=fedorahosted.org dstports=443-443\n'
'action=accept proto=tcp dsthost=xenbits.xen.org dstports=80-80\n'
'action=drop\n'
)
self.app.expected_calls[
('test-work', 'admin.vm.firewall.Set', None,
firewall_data.encode())] = b'0\0'
self.app.expected_calls[
('test-custom-template-appvm', 'admin.vm.firewall.Set', None,
firewall_data.encode())] = b'0\0'
qubesd_calls_queue = multiprocessing.Queue()
patches = [
mock.patch('qubesadmin.storage.Volume',
functools.partial(MockVolume, qubesd_calls_queue)),
mock.patch(
'qubesadmin.backup.restore.BackupRestore._handle_appmenus_list',
functools.partial(self.mock_appmenus, qubesd_calls_queue)),
mock.patch(
'qubesadmin.firewall.Firewall',
functools.partial(MockFirewall, qubesd_calls_queue)),
]
for patch in patches:
patch.start()
try:
self.restore_backup(self.fullpath("backup.bin"), options={
'use-default-template': True,
'use-default-netvm': True,
})
finally:
for patch in patches:
patch.stop()
# retrieve calls from other multiprocess.Process instances
while not qubesd_calls_queue.empty():
call_args = qubesd_calls_queue.get()
self.app.qubesd_call(*call_args)
qubesd_calls_queue.close()
self.assertAllCalled()
@unittest.skipUnless(spawn.find_executable('scrypt'), @unittest.skipUnless(spawn.find_executable('scrypt'),
"scrypt not installed") "scrypt not installed")
def test_230_r4(self): def test_230_r4(self):