qvm_template_postprocess.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447
  1. # -*- encoding: utf8 -*-
  2. #
  3. # The Qubes OS Project, http://www.qubes-os.org
  4. #
  5. # Copyright (C) 2017 Marek Marczykowski-Górecki
  6. # <marmarek@invisiblethingslab.com>
  7. #
  8. # This program is free software; you can redistribute it and/or modify
  9. # it under the terms of the GNU Lesser General Public License as published by
  10. # the Free Software Foundation; either version 2.1 of the License, or
  11. # (at your option) any later version.
  12. #
  13. # This program is distributed in the hope that it will be useful,
  14. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  15. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  16. # GNU Lesser General Public License for more details.
  17. #
  18. # You should have received a copy of the GNU Lesser General Public License along
  19. # with this program; if not, see <http://www.gnu.org/licenses/>.
  20. import asyncio
  21. import os
  22. import subprocess
  23. import tempfile
  24. from unittest import mock
  25. import qubesadmin.tests
  26. import qubesadmin.tools.qvm_template_postprocess
  class QubesLocalMock(qubesadmin.tests.QubesTest):
      """Test application object that presents itself as ``QubesLocal``.

      The instance is initialised as a regular ``QubesTest`` object and then
      has its ``__class__`` reassigned to ``qubesadmin.app.QubesLocal``.
      """

      # Class-level aliases to the mocked transport methods of QubesTest.
      # NOTE(review): once ``__class__`` is reassigned in ``__init__``,
      # attribute lookup on the instance no longer goes through this class,
      # so these aliases may not be reachable from instances - confirm that
      # this is the intended behaviour.
      qubesd_call = qubesadmin.tests.QubesTest.qubesd_call
      run_service = qubesadmin.tests.QubesTest.run_service

      def __init__(self):
          # Run the QubesTest initialiser first, then swap the instance type.
          super().__init__()
          self.__class__ = qubesadmin.app.QubesLocal
  class TC_00_qvm_template_postprocess(qubesadmin.tests.QubesTestCase):
      """Tests for ``qubesadmin.tools.qvm_template_postprocess``.

      Exercises ``import_root_img``, ``import_appmenus`` and the
      ``post-install`` / ``pre-remove`` actions of ``main``.  Replies to
      Admin API calls are registered in ``self.app.expected_calls`` and each
      test finishes with ``assertAllCalled()``.
      """

      # ------------------------------------------------------------------
      # fixtures
      # ------------------------------------------------------------------

      def setUp(self):
          """Create a temporary directory used as the template source dir."""
          super().setUp()
          self.source_dir = tempfile.TemporaryDirectory()

      def tearDown(self):
          """Remove the temporary source directory, tolerating its absence."""
          try:
              self.source_dir.cleanup()
          except FileNotFoundError:
              # The directory is already gone; nothing left to clean up.
              pass
          super().tearDown()

      # ------------------------------------------------------------------
      # private helpers (shared expectation / fixture setup)
      # ------------------------------------------------------------------

      def _expect_template_listed(self):
          """Register an ``admin.vm.List`` reply with one halted TemplateVM."""
          self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
              b'0\0test-vm class=TemplateVM state=Halted\n'

      def _expect_root_volume(self, pool, vid):
          """Register volume List/Info replies for the ``root`` volume.

          :param bytes pool: value reported in the ``pool=`` field
          :param bytes vid: value reported in the ``vid=`` field
          """
          self.app.expected_calls[
              ('test-vm', 'admin.vm.volume.List', None, None)] = \
              b'0\0root\nprivate\nvolatile\nkernel\n'
          self.app.expected_calls[
              ('test-vm', 'admin.vm.volume.Info', 'root', None)] = b''.join((
                  b'0\x00pool=', pool, b'\n',
                  b'vid=', vid, b'\n',
                  b'size=10737418240\n',
                  b'usage=0\n',
                  b'rw=True\n',
                  b'source=\n',
                  b'save_on_stop=True\n',
                  b'snap_on_start=False\n',
                  b'revisions_to_keep=3\n',
                  b'is_outdated=False\n',
              ))

      def _expect_root_import(self, volume_data):
          """Register Resize (to ``len(volume_data)``) and Import replies.

          :param bytes volume_data: payload expected in the Import call
          """
          self.app.expected_calls[
              ('test-vm', 'admin.vm.volume.Resize', 'root',
               str(len(volume_data)).encode())] = b'0\0'
          self.app.expected_calls[
              ('test-vm', 'admin.vm.volume.Import', 'root',
               volume_data)] = b'0\0'

      def _write_appmenus_lists(self):
          """Write both appmenus whitelist files into the source directory.

          :return: ``(default_whitelist_path, whitelist_path)``
          """
          default_list = os.path.join(
              self.source_dir.name, 'vm-whitelisted-appmenus.list')
          with open(default_list, 'w') as f:
              f.write('org.gnome.Terminal.desktop\n')
              f.write('firefox.desktop\n')
          whitelist = os.path.join(
              self.source_dir.name, 'whitelisted-appmenus.list')
          with open(whitelist, 'w') as f:
              f.write('org.gnome.Terminal.desktop\n')
              f.write('org.gnome.Software.desktop\n')
              f.write('gnome-control-center.desktop\n')
          return default_list, whitelist

      def _run_import_appmenus(self):
          """Call ``import_appmenus`` with ``subprocess.check_call`` mocked.

          :return: the mock that replaced ``subprocess.check_call``
          """
          vm = self.app.domains['test-vm']
          with mock.patch('subprocess.check_call') as mock_proc:
              qubesadmin.tools.qvm_template_postprocess.import_appmenus(
                  vm, self.source_dir.name)
          return mock_proc

      def _expect_installed_by_rpm(self, value):
          """Register the reply for setting ``installed_by_rpm``.

          :param bytes value: expected property payload
          """
          self.app.expected_calls[
              ('test-vm', 'admin.vm.property.Set', 'installed_by_rpm',
               value)] = b'0\0'

      def _expect_start_cycle(self):
          """Register the calls expected when the template is started.

          Covers clearing and resetting ``netvm``, setting the ``qrexec``
          feature, and the Start / Shutdown calls.
          """
          self.app.expected_calls[
              ('test-vm', 'admin.vm.property.Set', 'netvm', b'')] = b'0\0'
          self.app.expected_calls[
              ('test-vm', 'admin.vm.property.Reset', 'netvm', None)] = b'0\0'
          self.app.expected_calls[
              ('test-vm', 'admin.vm.feature.Set', 'qrexec', b'1')] = b'0\0'
          self.app.expected_calls[
              ('test-vm', 'admin.vm.Start', None, None)] = b'0\0'
          self.app.expected_calls[
              ('test-vm', 'admin.vm.Shutdown', None, None)] = b'0\0'

      def _patch_wait_for_shutdown(self):
          """Patch ``wait_for_domain_shutdown`` when events are available.

          :return: the started mock, or ``None`` when the tool reports
              ``have_events`` as false (nothing is patched in that case)
          """
          if not qubesadmin.tools.qvm_template_postprocess.have_events:
              return None
          patch_domain_shutdown = mock.patch(
              'qubesadmin.events.utils.wait_for_domain_shutdown')
          self.addCleanup(patch_domain_shutdown.stop)
          mock_domain_shutdown = patch_domain_shutdown.start()
          mock_domain_shutdown.side_effect = self.wait_for_shutdown
          return mock_domain_shutdown

      def _expect_shutdown_poll(self):
          """Register the reply used when events are not available."""
          # NOTE(review): keyed on 'test-vm' rather than 'dom0'; kept exactly
          # as in the original test - confirm this is the intended target.
          self.app.expected_calls[
              ('test-vm', 'admin.vm.List', None, None)] = \
              b'0\0test-vm class=TemplateVM state=Halted\n'

      def _run_post_install(self, *options):
          """Run ``main`` with the ``post-install`` action on a fresh loop.

          :param options: extra command line options placed after
              ``--really``
          :return: return value of ``main``
          """
          asyncio.set_event_loop(asyncio.new_event_loop())
          argv = ['--really'] + list(options) + [
              'post-install', 'test-vm', self.source_dir.name]
          return qubesadmin.tools.qvm_template_postprocess.main(
              argv, app=self.app)

      def _assert_imports_called(self, mock_import_root_img,
              mock_import_appmenus):
          """Assert both import helpers were called once for ``test-vm``."""
          mock_import_root_img.assert_called_once_with(
              self.app.domains['test-vm'], self.source_dir.name)
          mock_import_appmenus.assert_called_once_with(
              self.app.domains['test-vm'], self.source_dir.name)

      def add_new_vm_side_effect(self, *args, **kwargs):
          """Side effect for a mocked ``add_new_vm``.

          Makes ``test-vm`` appear in the VM list, clears the domains cache
          and returns the new domain object.
          """
          self._expect_template_listed()
          self.app.domains.clear_cache()
          return self.app.domains['test-vm']

      async def wait_for_shutdown(self, vm):
          """No-op coroutine standing in for ``wait_for_domain_shutdown``.

          Written as a native coroutine: the ``asyncio.coroutine`` decorator
          used previously was deprecated in Python 3.8 and removed in 3.11.
          """

      # ------------------------------------------------------------------
      # import_root_img
      # ------------------------------------------------------------------

      def test_000_import_root_img_raw(self):
          """Import a plain ``root.img`` file into the root volume."""
          volume_data = b'volume data'
          with open(os.path.join(self.source_dir.name, 'root.img'),
                  'wb') as f:
              f.write(volume_data)

          self._expect_template_listed()
          self._expect_root_volume(b'lvm', b'qubes_dom0/vm-test-vm-root')
          self._expect_root_import(volume_data)

          vm = self.app.domains['test-vm']
          qubesadmin.tools.qvm_template_postprocess.import_root_img(
              vm, self.source_dir.name)
          self.assertAllCalled()

      def test_001_import_root_img_tar(self):
          """Import a ``root.img`` shipped as a tar split into parts."""
          root_img = os.path.join(self.source_dir.name, 'root.img')
          volume_data = b'volume data' * 1000
          with open(root_img, 'wb') as f:
              f.write(volume_data)
          # Pack the image and split the archive into 1024-byte
          # ``root.img.part.NN`` files, then drop the plain image so only
          # the parts remain.
          subprocess.check_call(['tar', 'cf', 'root.img.tar', 'root.img'],
              cwd=self.source_dir.name)
          subprocess.check_call(['split', '-d', '-b', '1024', 'root.img.tar',
              'root.img.part.'], cwd=self.source_dir.name)
          os.unlink(root_img)

          self._expect_template_listed()
          self._expect_root_volume(b'lvm', b'qubes_dom0/vm-test-vm-root')
          # The Import call is expected to carry the original image content.
          self._expect_root_import(volume_data)

          vm = self.app.domains['test-vm']
          qubesadmin.tools.qvm_template_postprocess.import_root_img(
              vm, self.source_dir.name)
          self.assertAllCalled()

      def test_002_import_root_img_no_overwrite(self):
          """Image already at the volume's location in a ``file`` pool.

          No Resize or Import replies are registered for this test.
          """
          self.app.qubesd_connection_type = 'socket'

          template_dir = os.path.join(
              self.source_dir.name, 'vm-templates', 'test-vm')
          os.makedirs(template_dir)
          volume_data = b'volume data'
          with open(os.path.join(template_dir, 'root.img'), 'wb') as f:
              f.write(volume_data)

          self._expect_template_listed()
          self._expect_root_volume(b'default', b'vm-templates/test-vm/root')
          self.app.expected_calls[
              ('dom0', 'admin.pool.List', None, None)] = \
              b'0\0default\n'
          # The pool directory is the temporary source directory, so the
          # volume's vid resolves to ``template_dir``/root.
          self.app.expected_calls[
              ('dom0', 'admin.pool.Info', 'default', None)] = \
              b'0\0driver=file\ndir_path=' + \
              self.source_dir.name.encode() + b'\n'

          vm = self.app.domains['test-vm']
          qubesadmin.tools.qvm_template_postprocess.import_root_img(
              vm, template_dir)
          self.assertAllCalled()

      # ------------------------------------------------------------------
      # import_appmenus
      # ------------------------------------------------------------------

      def test_010_import_appmenus(self):
          """Both whitelists are passed to ``qvm-appmenus`` directly."""
          default_list, whitelist = self._write_appmenus_lists()
          self._expect_template_listed()

          mock_proc = self._run_import_appmenus()

          self.assertEqual(mock_proc.mock_calls, [
              mock.call(['qvm-appmenus',
                  '--set-default-whitelist=' + default_list, 'test-vm']),
              mock.call(['qvm-appmenus',
                  '--set-whitelist=' + whitelist, 'test-vm']),
          ])
          self.assertAllCalled()

      @mock.patch('grp.getgrnam')
      @mock.patch('os.getuid')
      def test_011_import_appmenus_as_root(self, mock_getuid, mock_getgrnam):
          """With uid 0 the commands are wrapped in ``runuser -u user``."""
          default_list, whitelist = self._write_appmenus_lists()
          self._expect_template_listed()
          mock_getuid.return_value = 0
          # Any index into the looked-up group's member list yields 'user'.
          mock_getgrnam.configure_mock(**{
              'return_value.gr_mem.__getitem__.return_value': 'user'
          })

          mock_proc = self._run_import_appmenus()

          self.assertEqual(mock_proc.mock_calls, [
              mock.call(['runuser', '-u', 'user', '--', 'env', 'DISPLAY=:0',
                  'qvm-appmenus',
                  '--set-default-whitelist=' + default_list, 'test-vm']),
              mock.call(['runuser', '-u', 'user', '--', 'env', 'DISPLAY=:0',
                  'qvm-appmenus',
                  '--set-whitelist=' + whitelist, 'test-vm']),
          ])
          self.assertAllCalled()

      @mock.patch('grp.getgrnam')
      @mock.patch('os.getuid')
      def test_012_import_appmenus_missing_user(self, mock_getuid,
              mock_getgrnam):
          """With uid 0 and a failing group lookup no command is run."""
          self._write_appmenus_lists()
          self._expect_template_listed()
          mock_getuid.return_value = 0
          mock_getgrnam.side_effect = KeyError

          mock_proc = self._run_import_appmenus()

          self.assertEqual(mock_proc.mock_calls, [])
          self.assertAllCalled()

      # ------------------------------------------------------------------
      # post-install
      # ------------------------------------------------------------------

      @mock.patch('qubesadmin.tools.qvm_template_postprocess.import_appmenus')
      @mock.patch('qubesadmin.tools.qvm_template_postprocess.import_root_img')
      def test_020_post_install(self, mock_import_root_img,
              mock_import_appmenus):
          """Fresh install: the template is created, started and shut down."""
          # Initially no VM exists; add_new_vm_side_effect makes it appear.
          self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
              b'0\0'
          self.app.add_new_vm = mock.Mock(
              side_effect=self.add_new_vm_side_effect)
          self._expect_installed_by_rpm(b'True')
          self._expect_start_cycle()

          mock_domain_shutdown = self._patch_wait_for_shutdown()
          if mock_domain_shutdown is None:
              self._expect_shutdown_poll()

          ret = self._run_post_install()

          self.assertEqual(ret, 0)
          self.app.add_new_vm.assert_called_once_with('TemplateVM',
              name='test-vm', label='black')
          self._assert_imports_called(
              mock_import_root_img, mock_import_appmenus)
          if mock_domain_shutdown is not None:
              mock_domain_shutdown.assert_called_once_with(
                  [self.app.domains['test-vm']])
          self.assertEqual(self.app.service_calls, [
              ('test-vm', 'qubes.PostInstall', {}),
              ('test-vm', 'qubes.PostInstall', b''),
          ])
          self.assertAllCalled()

      @mock.patch('qubesadmin.tools.qvm_template_postprocess.import_appmenus')
      @mock.patch('qubesadmin.tools.qvm_template_postprocess.import_root_img')
      def test_021_post_install_reinstall(self, mock_import_root_img,
              mock_import_appmenus):
          """Reinstall: the existing template is reused, not re-created."""
          self._expect_template_listed()
          self.app.add_new_vm = mock.Mock()
          self._expect_installed_by_rpm(b'True')
          self._expect_start_cycle()

          mock_domain_shutdown = self._patch_wait_for_shutdown()
          if mock_domain_shutdown is None:
              self._expect_shutdown_poll()

          ret = self._run_post_install()

          self.assertEqual(ret, 0)
          self.assertFalse(self.app.add_new_vm.called)
          self._assert_imports_called(
              mock_import_root_img, mock_import_appmenus)
          if mock_domain_shutdown is not None:
              mock_domain_shutdown.assert_called_once_with(
                  [self.app.domains['test-vm']])
          self.assertEqual(self.app.service_calls, [
              ('test-vm', 'qubes.PostInstall', {}),
              ('test-vm', 'qubes.PostInstall', b''),
          ])
          self.assertAllCalled()

      @mock.patch('qubesadmin.tools.qvm_template_postprocess.import_appmenus')
      @mock.patch('qubesadmin.tools.qvm_template_postprocess.import_root_img')
      def test_022_post_install_skip_start(self, mock_import_root_img,
              mock_import_appmenus):
          """``--skip-start``: no start, shutdown wait or service call."""
          self._expect_template_listed()
          self._expect_installed_by_rpm(b'True')
          self.app.add_new_vm = mock.Mock()

          mock_domain_shutdown = self._patch_wait_for_shutdown()

          ret = self._run_post_install('--skip-start')

          self.assertEqual(ret, 0)
          self.assertFalse(self.app.add_new_vm.called)
          self._assert_imports_called(
              mock_import_root_img, mock_import_appmenus)
          if mock_domain_shutdown is not None:
              self.assertFalse(mock_domain_shutdown.called)
          self.assertEqual(self.app.service_calls, [])
          self.assertAllCalled()

      # ------------------------------------------------------------------
      # pre-remove
      # ------------------------------------------------------------------

      def test_030_pre_remove(self):
          """``pre-remove`` of a template with no other VM listed."""
          self._expect_template_listed()
          self.app.expected_calls[
              ('test-vm', 'admin.vm.Remove', None, None)] = b'0\0'
          self._expect_installed_by_rpm(b'False')
          # The template itself has no 'template' property.
          self.app.expected_calls[
              ('test-vm', 'admin.vm.property.Get', 'template', None)] = \
              b'2\0QubesNoSuchPropertyError\0\0invalid property ' \
              b'\'template\' of test-vm\0'

          ret = qubesadmin.tools.qvm_template_postprocess.main([
              '--really', 'pre-remove', 'test-vm', self.source_dir.name],
              app=self.app)

          self.assertEqual(ret, 0)
          self.assertEqual(self.app.service_calls, [])
          self.assertAllCalled()

      def test_031_pre_remove_existing_appvm(self):
          """``pre-remove`` exits when an AppVM is based on the template."""
          self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
              b'0\0test-vm class=TemplateVM state=Halted\n' \
              b'test-vm2 class=AppVM state=Halted\n'
          self.app.expected_calls[
              ('test-vm', 'admin.vm.property.Get', 'template', None)] = \
              b'2\0QubesNoSuchPropertyError\0\0invalid property ' \
              b'\'template\' of test-vm\0'
          # test-vm2 reports test-vm as its template.
          self.app.expected_calls[
              ('test-vm2', 'admin.vm.property.Get', 'template', None)] = \
              b'0\0default=no type=vm test-vm'

          with self.assertRaises(SystemExit):
              qubesadmin.tools.qvm_template_postprocess.main([
                  '--really', 'pre-remove', 'test-vm', self.source_dir.name],
                  app=self.app)

          self.assertEqual(self.app.service_calls, [])
          self.assertAllCalled()

      # ------------------------------------------------------------------
      # argument handling
      # ------------------------------------------------------------------

      def test_040_missing_really(self):
          """Without ``--really`` the tool exits via ``SystemExit``."""
          with self.assertRaises(SystemExit):
              qubesadmin.tools.qvm_template_postprocess.main([
                  'post-install', 'test-vm', self.source_dir.name],
                  app=self.app)
          self.assertAllCalled()