storage_lvm.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350
  1. #
  2. # The Qubes OS Project, http://www.qubes-os.org
  3. #
  4. # Copyright (C) 2016 Bahtiar `kalkin-` Gadimov <bahtiar@gadimov.de>
  5. #
  6. # This library is free software; you can redistribute it and/or
  7. # modify it under the terms of the GNU Lesser General Public
  8. # License as published by the Free Software Foundation; either
  9. # version 2.1 of the License, or (at your option) any later version.
  10. #
  11. # This library is distributed in the hope that it will be useful,
  12. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  14. # Lesser General Public License for more details.
  15. #
  16. # You should have received a copy of the GNU Lesser General Public
  17. # License along with this library; if not, see <https://www.gnu.org/licenses/>.
  18. #
  19. ''' Tests for lvm storage driver. By default tests are going to use the
  20. 'qubes_dom0/pool00'. An alternative LVM thin pool may be provided via
  21. :envvar:`DEFAULT_LVM_POOL` shell variable.
  22. Any pool variables prefixed with 'LVM_' or 'lvm_' represent a LVM
  23. 'volume_group/thin_pool' combination. Pool variables without a prefix
  24. represent a :py:class:`qubes.storage.lvm.ThinPool`.
  25. '''
  26. import os
  27. import subprocess
  28. import tempfile
  29. import unittest
  30. import qubes.tests
  31. import qubes.storage
  32. from qubes.storage.lvm import ThinPool, ThinVolume
  33. if 'DEFAULT_LVM_POOL' in os.environ.keys():
  34. DEFAULT_LVM_POOL = os.environ['DEFAULT_LVM_POOL']
  35. else:
  36. DEFAULT_LVM_POOL = 'qubes_dom0/pool00'
  37. def lvm_pool_exists(volume_group, thin_pool):
  38. ''' Returns ``True`` if thin pool exists in the volume group. '''
  39. path = "/dev/mapper/{!s}-{!s}".format(volume_group, thin_pool)
  40. return os.path.exists(path)
  41. def skipUnlessLvmPoolExists(test_item): # pylint: disable=invalid-name
  42. ''' Decorator that skips LVM tests if the default pool is missing. '''
  43. volume_group, thin_pool = DEFAULT_LVM_POOL.split('/', 1)
  44. result = lvm_pool_exists(volume_group, thin_pool)
  45. msg = 'LVM thin pool {!r} does not exist'.format(DEFAULT_LVM_POOL)
  46. return unittest.skipUnless(result, msg)(test_item)
  47. POOL_CONF = {'name': 'test-lvm',
  48. 'driver': 'lvm_thin',
  49. 'volume_group': DEFAULT_LVM_POOL.split('/')[0],
  50. 'thin_pool': DEFAULT_LVM_POOL.split('/')[1]}
  51. class ThinPoolBase(qubes.tests.QubesTestCase):
  52. ''' Sanity tests for :py:class:`qubes.storage.lvm.ThinPool` '''
  53. created_pool = False
  54. def setUp(self):
  55. super(ThinPoolBase, self).setUp()
  56. volume_group, thin_pool = DEFAULT_LVM_POOL.split('/', 1)
  57. self.pool = self._find_pool(volume_group, thin_pool)
  58. if not self.pool:
  59. self.pool = self.app.add_pool(**POOL_CONF)
  60. self.created_pool = True
  61. def tearDown(self):
  62. ''' Remove the default lvm pool if it was created only for this test '''
  63. if self.created_pool:
  64. self.app.remove_pool(self.pool.name)
  65. super(ThinPoolBase, self).tearDown()
  66. def _find_pool(self, volume_group, thin_pool):
  67. ''' Returns the pool matching the specified ``volume_group`` &
  68. ``thin_pool``, or None.
  69. '''
  70. pools = [p for p in self.app.pools
  71. if issubclass(p.__class__, ThinPool)]
  72. for pool in pools:
  73. if pool.volume_group == volume_group \
  74. and pool.thin_pool == thin_pool:
  75. return pool
  76. return None
  77. @skipUnlessLvmPoolExists
  78. class TC_00_ThinPool(ThinPoolBase):
  79. ''' Sanity tests for :py:class:`qubes.storage.lvm.ThinPool` '''
  80. def setUp(self):
  81. xml_path = '/tmp/qubes-test.xml'
  82. self.app = qubes.Qubes.create_empty_store(store=xml_path,
  83. clockvm=None,
  84. updatevm=None,
  85. offline_mode=True,
  86. )
  87. os.environ['QUBES_XML_PATH'] = xml_path
  88. super(TC_00_ThinPool, self).setUp()
  89. def tearDown(self):
  90. super(TC_00_ThinPool, self).tearDown()
  91. os.unlink(self.app.store)
  92. del self.app
  93. for attr in dir(self):
  94. if isinstance(getattr(self, attr), qubes.vm.BaseVM):
  95. delattr(self, attr)
  96. def test_000_default_thin_pool(self):
  97. ''' Check whether :py:data`DEFAULT_LVM_POOL` exists. This pool is
  98. created by default, if at installation time LVM + Thin was chosen.
  99. '''
  100. msg = 'Thin pool {!r} does not exist'.format(DEFAULT_LVM_POOL)
  101. self.assertTrue(self.pool, msg)
  102. def test_001_origin_volume(self):
  103. ''' Test origin volume creation '''
  104. config = {
  105. 'name': 'root',
  106. 'pool': self.pool.name,
  107. 'save_on_stop': True,
  108. 'rw': True,
  109. 'size': qubes.config.defaults['root_img_size'],
  110. }
  111. vm = qubes.tests.storage.TestVM(self)
  112. volume = self.app.get_pool(self.pool.name).init_volume(vm, config)
  113. self.assertIsInstance(volume, ThinVolume)
  114. self.assertEqual(volume.name, 'root')
  115. self.assertEqual(volume.pool, self.pool.name)
  116. self.assertEqual(volume.size, qubes.config.defaults['root_img_size'])
  117. volume.create()
  118. path = "/dev/%s" % volume.vid
  119. self.assertTrue(os.path.exists(path))
  120. volume.remove()
  121. def test_003_read_write_volume(self):
  122. ''' Test read-write volume creation '''
  123. config = {
  124. 'name': 'root',
  125. 'pool': self.pool.name,
  126. 'rw': True,
  127. 'save_on_stop': True,
  128. 'size': qubes.config.defaults['root_img_size'],
  129. }
  130. vm = qubes.tests.storage.TestVM(self)
  131. volume = self.app.get_pool(self.pool.name).init_volume(vm, config)
  132. self.assertIsInstance(volume, ThinVolume)
  133. self.assertEqual(volume.name, 'root')
  134. self.assertEqual(volume.pool, self.pool.name)
  135. self.assertEqual(volume.size, qubes.config.defaults['root_img_size'])
  136. volume.create()
  137. path = "/dev/%s" % volume.vid
  138. self.assertTrue(os.path.exists(path))
  139. volume.remove()
  140. def test_004_size(self):
  141. with self.assertNotRaises(NotImplementedError):
  142. size = self.pool.size
  143. environ = os.environ.copy()
  144. environ['LC_ALL'] = 'C.utf8'
  145. pool_size = subprocess.check_output(['sudo', 'lvs', '--noheadings',
  146. '-o', 'lv_size',
  147. '--units', 'b', self.pool.volume_group + '/' + self.pool.thin_pool],
  148. env=environ)
  149. self.assertEqual(size, int(pool_size.strip()[:-1]))
  150. def test_005_usage(self):
  151. with self.assertNotRaises(NotImplementedError):
  152. usage = self.pool.usage
  153. environ = os.environ.copy()
  154. environ['LC_ALL'] = 'C.utf8'
  155. pool_info = subprocess.check_output(['sudo', 'lvs', '--noheadings',
  156. '-o', 'lv_size,data_percent',
  157. '--units', 'b', self.pool.volume_group + '/' + self.pool.thin_pool],
  158. env=environ)
  159. pool_size, pool_usage = pool_info.strip().split()
  160. pool_size = int(pool_size[:-1])
  161. pool_usage = float(pool_usage)
  162. self.assertEqual(usage, int(pool_size * pool_usage / 100))
  163. def _get_size(self, path):
  164. if os.getuid() != 0:
  165. return int(
  166. subprocess.check_output(
  167. ['sudo', 'blockdev', '--getsize64', path]))
  168. fd = os.open(path, os.O_RDONLY)
  169. try:
  170. return os.lseek(fd, 0, os.SEEK_END)
  171. finally:
  172. os.close(fd)
  173. def test_006_resize(self):
  174. config = {
  175. 'name': 'root',
  176. 'pool': self.pool.name,
  177. 'rw': True,
  178. 'save_on_stop': True,
  179. 'size': 32 * 1024**2,
  180. }
  181. vm = qubes.tests.storage.TestVM(self)
  182. volume = self.app.get_pool(self.pool.name).init_volume(vm, config)
  183. volume.create()
  184. self.addCleanup(volume.remove)
  185. path = "/dev/%s" % volume.vid
  186. new_size = 64 * 1024 ** 2
  187. volume.resize(new_size)
  188. self.assertEqual(self._get_size(path), new_size)
  189. self.assertEqual(volume.size, new_size)
  190. def test_007_resize_running(self):
  191. old_size = 32 * 1024**2
  192. config = {
  193. 'name': 'root',
  194. 'pool': self.pool.name,
  195. 'rw': True,
  196. 'save_on_stop': True,
  197. 'size': old_size,
  198. }
  199. vm = qubes.tests.storage.TestVM(self)
  200. volume = self.app.get_pool(self.pool.name).init_volume(vm, config)
  201. volume.create()
  202. self.addCleanup(volume.remove)
  203. volume.start()
  204. path = "/dev/%s" % volume.vid
  205. path2 = "/dev/%s" % volume._vid_snap
  206. new_size = 64 * 1024 ** 2
  207. volume.resize(new_size)
  208. self.assertEqual(self._get_size(path), old_size)
  209. self.assertEqual(self._get_size(path2), new_size)
  210. self.assertEqual(volume.size, new_size)
  211. volume.stop()
  212. self.assertEqual(self._get_size(path), new_size)
  213. self.assertEqual(volume.size, new_size)
  214. @skipUnlessLvmPoolExists
  215. class TC_01_ThinPool(ThinPoolBase, qubes.tests.SystemTestCase):
  216. ''' Sanity tests for :py:class:`qubes.storage.lvm.ThinPool` '''
  217. def setUp(self):
  218. super(TC_01_ThinPool, self).setUp()
  219. self.init_default_template()
  220. def test_004_import(self):
  221. template_vm = self.app.default_template
  222. name = self.make_vm_name('import')
  223. vm = self.app.add_new_vm(qubes.vm.templatevm.TemplateVM, name=name,
  224. label='red')
  225. vm.clone_properties(template_vm)
  226. vm.clone_disk_files(template_vm, pool='test-lvm')
  227. for v_name, volume in vm.volumes.items():
  228. if volume.save_on_stop:
  229. expected = "/dev/{!s}/vm-{!s}-{!s}".format(
  230. DEFAULT_LVM_POOL.split('/')[0], vm.name, v_name)
  231. self.assertEqual(volume.path, expected)
  232. with self.assertNotRaises(qubes.exc.QubesException):
  233. vm.start()
  234. def test_005_create_appvm(self):
  235. vm = self.app.add_new_vm(cls=qubes.vm.appvm.AppVM,
  236. name=self.make_vm_name('appvm'), label='red')
  237. vm.create_on_disk(pool='test-lvm')
  238. for v_name, volume in vm.volumes.items():
  239. if volume.save_on_stop:
  240. expected = "/dev/{!s}/vm-{!s}-{!s}".format(
  241. DEFAULT_LVM_POOL.split('/')[0], vm.name, v_name)
  242. self.assertEqual(volume.path, expected)
  243. with self.assertNotRaises(qubes.exc.QubesException):
  244. vm.start()
  245. @skipUnlessLvmPoolExists
  246. class TC_02_StorageHelpers(ThinPoolBase):
  247. def setUp(self):
  248. xml_path = '/tmp/qubes-test.xml'
  249. self.app = qubes.Qubes.create_empty_store(store=xml_path,
  250. clockvm=None,
  251. updatevm=None,
  252. offline_mode=True,
  253. )
  254. os.environ['QUBES_XML_PATH'] = xml_path
  255. super(TC_02_StorageHelpers, self).setUp()
  256. # reset cache
  257. qubes.storage.DirectoryThinPool._thin_pool = {}
  258. self.thin_dir = tempfile.TemporaryDirectory()
  259. subprocess.check_call(
  260. ['sudo', 'lvcreate', '-q', '-V', '32M',
  261. '-T', DEFAULT_LVM_POOL, '-n',
  262. 'test-file-pool'], stdout=subprocess.DEVNULL)
  263. self.thin_dev = '/dev/{}/test-file-pool'.format(
  264. DEFAULT_LVM_POOL.split('/')[0])
  265. subprocess.check_call(
  266. ['sudo', 'mkfs.ext4', '-q', self.thin_dev])
  267. subprocess.check_call(['sudo', 'mount', self.thin_dev,
  268. self.thin_dir.name])
  269. subprocess.check_call(['sudo', 'chmod', '777',
  270. self.thin_dir.name])
  271. def tearDown(self):
  272. subprocess.check_call(['sudo', 'umount', self.thin_dir.name])
  273. subprocess.check_call(
  274. ['sudo', 'lvremove', '-q', '-f', self.thin_dev],
  275. stdout = subprocess.DEVNULL)
  276. self.thin_dir.cleanup()
  277. super(TC_02_StorageHelpers, self).tearDown()
  278. os.unlink(self.app.store)
  279. del self.app
  280. for attr in dir(self):
  281. if isinstance(getattr(self, attr), qubes.vm.BaseVM):
  282. delattr(self, attr)
  283. def test_000_search_thin_pool(self):
  284. pool = qubes.storage.search_pool_containing_dir(
  285. self.app.pools.values(), self.thin_dir.name)
  286. self.assertEqual(pool, self.pool)
  287. def test_001_search_none(self):
  288. pool = qubes.storage.search_pool_containing_dir(
  289. self.app.pools.values(), '/tmp')
  290. self.assertIsNone(pool)
  291. def test_002_search_subdir(self):
  292. subdir = os.path.join(self.thin_dir.name, 'some-dir')
  293. os.mkdir(subdir)
  294. pool = qubes.storage.search_pool_containing_dir(
  295. self.app.pools.values(), subdir)
  296. self.assertEqual(pool, self.pool)
  297. def test_003_search_file_pool(self):
  298. subdir = os.path.join(self.thin_dir.name, 'some-dir')
  299. file_pool_config = {
  300. 'name': 'test-file-pool',
  301. 'driver': 'file',
  302. 'dir_path': subdir
  303. }
  304. pool2 = self.app.add_pool(**file_pool_config)
  305. pool = qubes.storage.search_pool_containing_dir(
  306. self.app.pools.values(), subdir)
  307. self.assertEqual(pool, pool2)
  308. pool = qubes.storage.search_pool_containing_dir(
  309. self.app.pools.values(), self.thin_dir.name)
  310. self.assertEqual(pool, self.pool)