# vim: fileencoding=utf-8
#
# The Qubes OS Project, https://www.qubes-os.org/
#
# Copyright (C) 2016
# Marek Marczykowski-Górecki <marmarek@invisiblethingslab.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
import os
import subprocess

import qubes.qubes
import qubes.qubesutils
import qubes.tests
# The same test class is used for both dom0 and VMs.
  27. class TC_00_List(qubes.tests.SystemTestsMixin, qubes.tests.QubesTestCase):
  28. template = None
  29. def setUp(self):
  30. super(TC_00_List, self).setUp()
  31. self.img_path = '/tmp/test.img'
  32. self.mount_point = '/tmp/test-dir'
  33. if self.template is not None:
  34. self.vm = self.qc.add_new_vm(
  35. "QubesAppVm",
  36. name=self.make_vm_name("vm"),
  37. template=self.qc.get_vm_by_name(self.template))
  38. self.vm.create_on_disk(verbose=False)
  39. self.save_and_reload_db()
  40. self.qc.unlock_db()
  41. self.vm.start()
  42. else:
  43. self.qc.unlock_db()
  44. self.vm = self.qc[0]
  45. def tearDown(self):
  46. super(TC_00_List, self).tearDown()
  47. if self.template is None:
  48. if os.path.exists(self.mount_point):
  49. subprocess.call(['sudo', 'umount', self.mount_point])
  50. subprocess.call(['sudo', 'rmdir', self.mount_point])
  51. subprocess.call(['sudo', 'dmsetup', 'remove', 'test-dm'])
  52. if os.path.exists(self.img_path):
  53. loopdev = subprocess.check_output(['losetup', '-j',
  54. self.img_path])
  55. for dev in loopdev.splitlines():
  56. subprocess.call(
  57. ['sudo', 'losetup', '-d', dev.split(':')[0]])
  58. subprocess.call(['sudo', 'rm', '-f', self.img_path])
  59. def run_script(self, script, user="user"):
  60. if self.template is None:
  61. if user == "user":
  62. subprocess.check_call(script, shell=True)
  63. elif user == "root":
  64. subprocess.check_call(['sudo', 'sh', '-c', script])
  65. else:
  66. retcode = self.vm.run(script, user=user, wait=True)
  67. if retcode != 0:
  68. raise subprocess.CalledProcessError
  69. def test_000_list_loop(self):
  70. if self.template is None:
  71. self.skipTest('loop devices excluded in dom0')
  72. self.run_script(
  73. "set -e;"
  74. "truncate -s 128M {path}; "
  75. "losetup -f {path}; "
  76. "udevadm settle".format(path=self.img_path), user="root")
  77. dev_list = qubes.qubesutils.block_list_vm(self.vm)
  78. found = False
  79. for dev in dev_list.keys():
  80. if dev_list[dev]['desc'] == self.img_path:
  81. self.assertTrue(dev.startswith(self.vm.name + ':loop'))
  82. self.assertEquals(dev_list[dev]['mode'], 'w')
  83. self.assertEquals(dev_list[dev]['size'], 1024 * 1024 * 128)
  84. self.assertEquals(
  85. dev_list[dev]['device'], '/dev/' + dev.split(':')[1])
  86. found = True
  87. if not found:
  88. self.fail("Device {} not found in {!r}".format(self.img_path, dev_list))
  89. def test_001_list_loop_mounted(self):
  90. if self.template is None:
  91. self.skipTest('loop devices excluded in dom0')
  92. self.run_script(
  93. "set -e;"
  94. "truncate -s 128M {path}; "
  95. "mkfs.ext4 -q -F {path}; "
  96. "mkdir -p {mntdir}; "
  97. "mount {path} {mntdir} -o loop; "
  98. "udevadm settle".format(
  99. path=self.img_path,
  100. mntdir=self.mount_point),
  101. user="root")
  102. dev_list = qubes.qubesutils.block_list_vm(self.vm)
  103. for dev in dev_list.keys():
  104. if dev_list[dev]['desc'] == self.img_path:
  105. self.fail(
  106. 'Device {} ({}) should not be listed because is mounted'
  107. .format(dev, self.img_path))
  108. def test_010_list_dm(self):
  109. self.run_script(
  110. "set -e;"
  111. "truncate -s 128M {path}; "
  112. "loopdev=`losetup -f`; "
  113. "losetup $loopdev {path}; "
  114. "dmsetup create test-dm --table \"0 262144 linear $(cat "
  115. "/sys/block/$(basename $loopdev)/dev) 0\";"
  116. "udevadm settle".format(path=self.img_path), user="root")
  117. dev_list = qubes.qubesutils.block_list_vm(self.vm)
  118. found = False
  119. for dev in dev_list.keys():
  120. if dev.startswith(self.vm.name + ':loop'):
  121. self.assertNotEquals(dev_list[dev]['desc'], self.img_path,
  122. "Device {} ({}) should not be listed as it is used in "
  123. "device-mapper".format(dev, self.img_path)
  124. )
  125. elif dev_list[dev]['desc'] == 'test-dm':
  126. self.assertEquals(dev_list[dev]['mode'], 'w')
  127. self.assertEquals(dev_list[dev]['size'], 1024 * 1024 * 128)
  128. self.assertEquals(
  129. dev_list[dev]['device'], '/dev/' + dev.split(':')[1])
  130. found = True
  131. if not found:
  132. self.fail("Device {} not found in {!r}".format('test-dm', dev_list))
  133. def test_011_list_dm_mounted(self):
  134. self.run_script(
  135. "set -e;"
  136. "truncate -s 128M {path}; "
  137. "loopdev=`losetup -f`; "
  138. "losetup $loopdev {path}; "
  139. "dmsetup create test-dm --table \"0 262144 linear $(cat "
  140. "/sys/block/$(basename $loopdev)/dev) 0\";"
  141. "mkfs.ext4 -q -F /dev/mapper/test-dm;"
  142. "mkdir -p {mntdir};"
  143. "mount /dev/mapper/test-dm {mntdir};"
  144. "udevadm settle".format(
  145. path=self.img_path,
  146. mntdir=self.mount_point),
  147. user="root")
  148. dev_list = qubes.qubesutils.block_list_vm(self.vm)
  149. for dev in dev_list.keys():
  150. if dev.startswith(self.vm.name + ':loop'):
  151. self.assertNotEquals(dev_list[dev]['desc'], self.img_path,
  152. "Device {} ({}) should not be listed as it is used in "
  153. "device-mapper".format(dev, self.img_path)
  154. )
  155. else:
  156. self.assertNotEquals(dev_list[dev]['desc'], 'test-dm',
  157. "Device {} ({}) should not be listed as it is "
  158. "mounted".format(dev, 'test-dm')
  159. )
  160. def test_012_list_dm_delayed(self):
  161. self.run_script(
  162. "set -e;"
  163. "truncate -s 128M {path}; "
  164. "loopdev=`losetup -f`; "
  165. "losetup $loopdev {path}; "
  166. "udevadm settle; "
  167. "dmsetup create test-dm --table \"0 262144 linear $(cat "
  168. "/sys/block/$(basename $loopdev)/dev) 0\";"
  169. "udevadm settle".format(path=self.img_path), user="root")
  170. dev_list = qubes.qubesutils.block_list_vm(self.vm)
  171. found = False
  172. for dev in dev_list.keys():
  173. if dev.startswith(self.vm.name + ':loop'):
  174. self.assertNotEquals(dev_list[dev]['desc'], self.img_path,
  175. "Device {} ({}) should not be listed as it is used in "
  176. "device-mapper".format(dev, self.img_path)
  177. )
  178. elif dev_list[dev]['desc'] == 'test-dm':
  179. self.assertEquals(dev_list[dev]['mode'], 'w')
  180. self.assertEquals(dev_list[dev]['size'], 1024 * 1024 * 128)
  181. self.assertEquals(
  182. dev_list[dev]['device'], '/dev/' + dev.split(':')[1])
  183. found = True
  184. if not found:
  185. self.fail("Device {} not found in {!r}".format('test-dm', dev_list))
  186. def test_013_list_dm_removed(self):
  187. if self.template is None:
  188. self.skipTest('test not supported in dom0 - loop devices excluded '
  189. 'in dom0')
  190. self.run_script(
  191. "set -e;"
  192. "truncate -s 128M {path}; "
  193. "loopdev=`losetup -f`; "
  194. "losetup $loopdev {path}; "
  195. "dmsetup create test-dm --table \"0 262144 linear $(cat "
  196. "/sys/block/$(basename $loopdev)/dev) 0\";"
  197. "udevadm settle;"
  198. "dmsetup remove test-dm;"
  199. "udevadm settle".format(path=self.img_path), user="root")
  200. dev_list = qubes.qubesutils.block_list_vm(self.vm)
  201. found = False
  202. for dev in dev_list.keys():
  203. if dev_list[dev]['desc'] == self.img_path:
  204. self.assertTrue(dev.startswith(self.vm.name + ':loop'))
  205. self.assertEquals(dev_list[dev]['mode'], 'w')
  206. self.assertEquals(dev_list[dev]['size'], 1024 * 1024 * 128)
  207. self.assertEquals(
  208. dev_list[dev]['device'], '/dev/' + dev.split(':')[1])
  209. found = True
  210. if not found:
  211. self.fail("Device {} not found in {!r}".format(self.img_path, dev_list))
  212. def test_020_list_loop_partition(self):
  213. if self.template is None:
  214. self.skipTest('loop devices excluded in dom0')
  215. self.run_script(
  216. "set -e;"
  217. "truncate -s 128M {path}; "
  218. "echo ,,L | sfdisk {path};"
  219. "loopdev=`losetup -f`; "
  220. "losetup -P $loopdev {path}; "
  221. "blockdev --rereadpt $loopdev; "
  222. "udevadm settle".format(path=self.img_path), user="root")
  223. dev_list = qubes.qubesutils.block_list_vm(self.vm)
  224. found = False
  225. for dev in dev_list.keys():
  226. if dev_list[dev]['desc'] == self.img_path:
  227. self.assertTrue(dev.startswith(self.vm.name + ':loop'))
  228. self.assertEquals(dev_list[dev]['mode'], 'w')
  229. self.assertEquals(dev_list[dev]['size'], 1024 * 1024 * 128)
  230. self.assertEquals(
  231. dev_list[dev]['device'], '/dev/' + dev.split(':')[1])
  232. self.assertIn(dev + 'p1', dev_list)
  233. found = True
  234. if not found:
  235. self.fail("Device {} not found in {!r}".format(self.img_path, dev_list))
  236. def test_021_list_loop_partition_mounted(self):
  237. if self.template is None:
  238. self.skipTest('loop devices excluded in dom0')
  239. self.run_script(
  240. "set -e;"
  241. "truncate -s 128M {path}; "
  242. "echo ,,L | sfdisk {path};"
  243. "loopdev=`losetup -f`; "
  244. "losetup -P $loopdev {path}; "
  245. "blockdev --rereadpt $loopdev; "
  246. "mkfs.ext4 -q -F ${{loopdev}}p1; "
  247. "mkdir -p {mntdir}; "
  248. "mount ${{loopdev}}p1 {mntdir}; "
  249. "udevadm settle".format(
  250. path=self.img_path, mntdir=self.mount_point),
  251. user="root")
  252. dev_list = qubes.qubesutils.block_list_vm(self.vm)
  253. for dev in dev_list.keys():
  254. if dev_list[dev]['desc'] == self.img_path:
  255. self.fail(
  256. 'Device {} ({}) should not be listed because its '
  257. 'partition is mounted'
  258. .format(dev, self.img_path))
  259. elif dev.startswith(self.vm.name + ':loop') and dev.endswith('p1'):
  260. # FIXME: risky assumption that only tests create partitioned
  261. # loop devices
  262. self.fail(
  263. 'Device {} ({}) should not be listed because is mounted'
  264. .format(dev, self.img_path))
  265. def load_tests(loader, tests, pattern):
  266. try:
  267. qc = qubes.qubes.QubesVmCollection()
  268. qc.lock_db_for_reading()
  269. qc.load()
  270. qc.unlock_db()
  271. templates = [vm.name for vm in qc.values() if
  272. isinstance(vm, qubes.qubes.QubesTemplateVm)]
  273. except OSError:
  274. templates = []
  275. for template in templates:
  276. tests.addTests(loader.loadTestsFromTestCase(
  277. type(
  278. 'TC_00_List_' + template,
  279. (TC_00_List, qubes.tests.QubesTestCase),
  280. {'template': template})))
  281. return tests