Merge remote-tracking branch 'origin/pr/273'

* origin/pr/273:
  tests: check importing empty data into ReflinkVolume
  tests: check importing empty data into ThinVolume
  tests: check importing empty data into FileVolume
  tests: improve cleanup after LVM tests
This commit is contained in:
Marek Marczykowski-Górecki 2019-07-31 15:37:13 +02:00
commit 9e226ab5cf
No known key found for this signature in database
GPG Key ID: 063938BA42CFA724
4 changed files with 142 additions and 4 deletions

View File

@ -52,6 +52,7 @@ class TestVM(object):
def __init__(self, test, template=None):
self.app = test.app
self.name = test.make_vm_name('appvm')
self.dir_path_prefix = 'appvms'
self.dir_path = '/var/lib/qubes/appvms/' + self.name
self.log = qubes.log.get_vm_logger(self.name)

View File

@ -360,6 +360,29 @@ class TC_01_FileVolumes(qubes.tests.QubesTestCase):
volume_data = volume_file.read().strip('\0')
self.assertNotEqual(volume_data, 'test')
def test_022_import_data_empty(self):
config = {
'name': 'root',
'pool': self.POOL_NAME,
'save_on_stop': True,
'rw': True,
'size': 1024 * 1024,
}
vm = qubes.tests.storage.TestVM(self)
volume = self.app.get_pool(self.POOL_NAME).init_volume(vm, config)
volume.create()
with open(volume.path, 'w') as vol_file:
vol_file.write('test data')
import_path = volume.import_data()
self.assertNotEqual(volume.path, import_path)
with open(import_path, 'w+'):
pass
volume.import_data_end(True)
self.assertFalse(os.path.exists(import_path), import_path)
with open(volume.path) as volume_file:
volume_data = volume_file.read().strip('\0')
self.assertNotEqual(volume_data, 'test data')
def assertVolumePath(self, vm, dev_name, expected, rw=True):
# :pylint: disable=invalid-name
volumes = vm.volumes

View File

@ -79,8 +79,24 @@ class ThinPoolBase(qubes.tests.QubesTestCase):
self.app.add_pool(**POOL_CONF))
self.created_pool = True
def cleanup_test_volumes(self):
p = self.loop.run_until_complete(asyncio.create_subprocess_exec(
'sudo', 'lvs', '--noheadings', '-o', 'lv_name', self.pool.volume_group,
stdout=subprocess.PIPE
))
volumes, _ = self.loop.run_until_complete(p.communicate())
for volume in volumes.decode().splitlines():
volume = volume.strip()
if not volume.startswith('vm-' + qubes.tests.VMPREFIX):
continue
p = self.loop.run_until_complete(asyncio.create_subprocess_exec(
'sudo', 'lvremove', '-f', '/'.join([self.pool.volume_group, volume])
))
self.loop.run_until_complete(p.wait())
def tearDown(self):
''' Remove the default lvm pool if it was created only for this test '''
self.cleanup_test_volumes()
if self.created_pool:
self.loop.run_until_complete(self.app.remove_pool(self.pool.name))
super(ThinPoolBase, self).tearDown()
@ -597,11 +613,17 @@ class TC_00_ThinPool(ThinPoolBase):
}
vm = qubes.tests.storage.TestVM(self)
volume = self.app.get_pool(self.pool.name).init_volume(vm, config)
# mock logging, to not interfere with time.time() mock
volume.log = unittest.mock.Mock()
self.loop.run_until_complete(volume.create())
self.loop.run_until_complete(volume.start())
self.loop.run_until_complete(volume.stop())
with unittest.mock.patch('time.time') as mock_time:
mock_time.side_effect = [521065906]
self.loop.run_until_complete(volume.stop())
self.loop.run_until_complete(volume.start())
self.loop.run_until_complete(volume.stop())
with unittest.mock.patch('time.time') as mock_time:
mock_time.side_effect = [521065907]
self.loop.run_until_complete(volume.stop())
self.assertEqual(len(volume.revisions), 2)
revisions = volume.revisions
revision_id = max(revisions.keys())
@ -632,11 +654,17 @@ class TC_00_ThinPool(ThinPoolBase):
}
vm = qubes.tests.storage.TestVM(self)
volume = self.app.get_pool(self.pool.name).init_volume(vm, config)
# mock logging, to not interfere with time.time() mock
volume.log = unittest.mock.Mock()
self.loop.run_until_complete(volume.create())
self.loop.run_until_complete(volume.start())
self.loop.run_until_complete(volume.stop())
with unittest.mock.patch('time.time') as mock_time:
mock_time.side_effect = [521065906]
self.loop.run_until_complete(volume.stop())
self.loop.run_until_complete(volume.start())
self.loop.run_until_complete(volume.stop())
with unittest.mock.patch('time.time') as mock_time:
mock_time.side_effect = [521065907]
self.loop.run_until_complete(volume.stop())
self.assertEqual(len(volume.revisions), 2)
revisions = volume.revisions
revision_id = min(revisions.keys())
@ -815,6 +843,37 @@ class TC_00_ThinPool(ThinPoolBase):
self.loop.run_until_complete(volume.remove())
def test_034_import_data_empty(self):
config = {
'name': 'root',
'pool': self.pool.name,
'save_on_stop': True,
'rw': True,
'size': 1024 * 1024,
}
vm = qubes.tests.storage.TestVM(self)
volume = self.app.get_pool(self.pool.name).init_volume(vm, config)
with unittest.mock.patch('time.time') as mock_time:
mock_time.side_effect = [1521065905]
self.loop.run_until_complete(volume.create())
p = self.loop.run_until_complete(asyncio.create_subprocess_exec(
'sudo', 'dd', 'if=/dev/urandom', 'of=' + volume.path, 'count=1', 'bs=1M'
))
self.loop.run_until_complete(p.wait())
import_path = self.loop.run_until_complete(volume.import_data())
self.assertNotEqual(volume.path, import_path)
p = self.loop.run_until_complete(asyncio.create_subprocess_exec(
'sudo', 'touch', import_path))
self.loop.run_until_complete(p.wait())
self.loop.run_until_complete(volume.import_data_end(True))
self.assertFalse(os.path.exists(import_path), import_path)
p = self.loop.run_until_complete(asyncio.create_subprocess_exec(
'sudo', 'cat', volume.path,
stdout=subprocess.PIPE
))
volume_data, _ = self.loop.run_until_complete(p.communicate())
self.assertEqual(volume_data.strip(b'\0'), b'')
def test_040_volatile(self):
'''Volatile volume test'''
config = {

View File

@ -28,8 +28,16 @@ import subprocess
import sys
import qubes.tests
import qubes.tests.storage
from qubes.storage import reflink
class TestApp(qubes.Qubes):
''' A Mock App object '''
def __init__(self, *args, **kwargs): # pylint: disable=unused-argument
super(TestApp, self).__init__('/tmp/qubes-test.xml', load=False,
offline_mode=True, **kwargs)
self.load_initial_values()
class ReflinkMixin:
def setUp(self, fs_type='btrfs'): # pylint: disable=arguments-differ
@ -84,6 +92,53 @@ class ReflinkMixin:
for dev in (dev_from_real, dev_from_sym):
self.assertEqual(get_blockdev_size(dev), size_resized)
class TC_10_ReflinkPool(qubes.tests.QubesTestCase):
def setUp(self):
super().setUp()
self.test_dir = '/var/tmp/test-reflink-units-on-btrfs'
pool_conf = {
'driver': 'file-reflink',
'dir_path': self.test_dir,
'name': 'test-btrfs'
}
mkdir_fs(self.test_dir, 'btrfs', cleanup_via=self.addCleanup)
self.app = TestApp()
self.pool = self.loop.run_until_complete(self.app.add_pool(**pool_conf))
self.app.default_pool = self.app.get_pool(pool_conf['name'])
def tearDown(self) -> None:
self.app.default_pool = 'varlibqubes'
self.loop.run_until_complete(self.app.remove_pool(self.pool.name))
del self.pool
self.app.close()
del self.app
super(TC_10_ReflinkPool, self).tearDown()
def test_012_import_data_empty(self):
config = {
'name': 'root',
'pool': self.pool.name,
'save_on_stop': True,
'rw': True,
'size': 1024 * 1024,
}
vm = qubes.tests.storage.TestVM(self)
volume = self.pool.init_volume(vm, config)
self.loop.run_until_complete(volume.create())
with open(volume.export(), 'w') as vol_file:
vol_file.write('test data')
import_path = self.loop.run_until_complete(volume.import_data())
self.assertNotEqual(volume.path, import_path)
with open(import_path, 'w+'):
pass
self.loop.run_until_complete(volume.import_data_end(True))
self.assertFalse(os.path.exists(import_path), import_path)
with open(volume.export()) as volume_file:
volume_data = volume_file.read().strip('\0')
self.assertNotEqual(volume_data, 'test data')
class TC_00_ReflinkOnBtrfs(ReflinkMixin, qubes.tests.QubesTestCase):
def setUp(self): # pylint: disable=arguments-differ
super().setUp('btrfs')