parent
d181c0f354
commit
51adb434d1
@ -25,7 +25,10 @@ import shutil
|
|||||||
import asyncio
|
import asyncio
|
||||||
import unittest.mock
|
import unittest.mock
|
||||||
|
|
||||||
|
import subprocess
|
||||||
|
|
||||||
import qubes.storage
|
import qubes.storage
|
||||||
|
import qubes.utils
|
||||||
import qubes.tests.storage
|
import qubes.tests.storage
|
||||||
from qubes.config import defaults
|
from qubes.config import defaults
|
||||||
|
|
||||||
@ -383,6 +386,73 @@ class TC_01_FileVolumes(qubes.tests.QubesTestCase):
|
|||||||
volume_data = volume_file.read().strip('\0')
|
volume_data = volume_file.read().strip('\0')
|
||||||
self.assertNotEqual(volume_data, 'test data')
|
self.assertNotEqual(volume_data, 'test data')
|
||||||
|
|
||||||
|
def test_023_resize(self):
|
||||||
|
config = {
|
||||||
|
'name': 'root',
|
||||||
|
'pool': self.POOL_NAME,
|
||||||
|
'rw': True,
|
||||||
|
'save_on_stop': True,
|
||||||
|
'size': 32 * 1024**2,
|
||||||
|
}
|
||||||
|
vm = qubes.tests.storage.TestVM(self)
|
||||||
|
volume = self.app.get_pool(self.POOL_NAME).init_volume(vm, config)
|
||||||
|
qubes.utils.void_coros_maybe([volume.create()])
|
||||||
|
new_size = 64 * 1024 ** 2
|
||||||
|
qubes.utils.void_coros_maybe([volume.resize(new_size)])
|
||||||
|
self.assertEqual(os.path.getsize(volume.path), new_size)
|
||||||
|
self.assertEqual(volume.size, new_size)
|
||||||
|
|
||||||
|
def _get_loop_size(self, path):
|
||||||
|
sudo = [] if os.getuid() == 0 else ['sudo']
|
||||||
|
try:
|
||||||
|
loop_name = subprocess.check_output(
|
||||||
|
sudo + ['losetup', '--associated', path]).decode().split(':')[0]
|
||||||
|
if os.getuid() != 0:
|
||||||
|
return int(
|
||||||
|
subprocess.check_output(
|
||||||
|
['sudo', 'blockdev', '--getsize64', loop_name]))
|
||||||
|
fd = os.open(loop_name, os.O_RDONLY)
|
||||||
|
try:
|
||||||
|
return os.lseek(fd, 0, os.SEEK_END)
|
||||||
|
finally:
|
||||||
|
os.close(fd)
|
||||||
|
except subprocess.CalledProcessError:
|
||||||
|
return None
|
||||||
|
|
||||||
|
def _setup_loop(self, path):
|
||||||
|
sudo = [] if os.getuid() == 0 else ['sudo']
|
||||||
|
loop_name = subprocess.check_output(
|
||||||
|
sudo + ['losetup', '--show', '--find', path]).decode().strip()
|
||||||
|
self.addCleanup(subprocess.call, sudo + ['losetup', '-d', loop_name])
|
||||||
|
|
||||||
|
def test_007_resize_running(self):
|
||||||
|
old_size = 32 * 1024**2
|
||||||
|
config = {
|
||||||
|
'name': 'root',
|
||||||
|
'pool': self.POOL_NAME,
|
||||||
|
'rw': True,
|
||||||
|
'save_on_stop': True,
|
||||||
|
'size': old_size,
|
||||||
|
}
|
||||||
|
vm = qubes.tests.storage.TestVM(self)
|
||||||
|
volume = self.app.get_pool(self.POOL_NAME).init_volume(vm, config)
|
||||||
|
qubes.utils.void_coros_maybe([volume.create()])
|
||||||
|
qubes.utils.void_coros_maybe([volume.start()])
|
||||||
|
self._setup_loop(volume.path)
|
||||||
|
new_size = 64 * 1024 ** 2
|
||||||
|
orig_check_call = subprocess.check_call
|
||||||
|
with unittest.mock.patch('subprocess.check_call') as mock_subprocess:
|
||||||
|
sudo = [] if os.getuid() == 0 else ['sudo']
|
||||||
|
mock_subprocess.side_effect = (lambda *args, **kwargs:
|
||||||
|
orig_check_call(sudo + args[0], *args[1:], **kwargs))
|
||||||
|
qubes.utils.void_coros_maybe([volume.resize(new_size)])
|
||||||
|
self.assertEqual(os.path.getsize(volume.path), new_size)
|
||||||
|
self.assertEqual(self._get_loop_size(volume.path), new_size)
|
||||||
|
self.assertEqual(volume.size, new_size)
|
||||||
|
qubes.utils.void_coros_maybe([volume.stop()])
|
||||||
|
self.assertEqual(os.path.getsize(volume.path), new_size)
|
||||||
|
self.assertEqual(volume.size, new_size)
|
||||||
|
|
||||||
def assertVolumePath(self, vm, dev_name, expected, rw=True):
|
def assertVolumePath(self, vm, dev_name, expected, rw=True):
|
||||||
# :pylint: disable=invalid-name
|
# :pylint: disable=invalid-name
|
||||||
volumes = vm.volumes
|
volumes = vm.volumes
|
||||||
|
Loading…
Reference in New Issue
Block a user