qvm_template_postprocess.py 20 KB

# -*- encoding: utf8 -*-
#
# The Qubes OS Project, http://www.qubes-os.org
#
# Copyright (C) 2017 Marek Marczykowski-Górecki
#                               <marmarek@invisiblethingslab.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation; either version 2.1 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with this program; if not, see <http://www.gnu.org/licenses/>.
import asyncio
import contextlib
import os
import subprocess
import tempfile
from unittest import mock

import qubesadmin.tests
import qubesadmin.tools.qvm_template_postprocess
  27. class QubesLocalMock(qubesadmin.tests.QubesTest):
  28. def __init__(self):
  29. super(QubesLocalMock, self).__init__()
  30. self.__class__ = qubesadmin.app.QubesLocal
  31. qubesd_call = qubesadmin.tests.QubesTest.qubesd_call
  32. run_service = qubesadmin.tests.QubesTest.run_service
  33. class TC_00_qvm_template_postprocess(qubesadmin.tests.QubesTestCase):
  34. def setUp(self):
  35. super(TC_00_qvm_template_postprocess, self).setUp()
  36. self.source_dir = tempfile.TemporaryDirectory()
  37. def tearDown(self):
  38. try:
  39. self.source_dir.cleanup()
  40. except FileNotFoundError:
  41. pass
  42. super(TC_00_qvm_template_postprocess, self).tearDown()
  43. def test_000_import_root_img_raw(self):
  44. root_img = os.path.join(self.source_dir.name, 'root.img')
  45. volume_data = b'volume data'
  46. with open(root_img, 'wb') as f:
  47. f.write(volume_data)
  48. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  49. b'0\0test-vm class=TemplateVM state=Halted\n'
  50. self.app.expected_calls[('test-vm', 'admin.vm.volume.List', None,
  51. None)] = \
  52. b'0\0root\nprivate\nvolatile\nkernel\n'
  53. self.app.expected_calls[
  54. ('test-vm', 'admin.vm.volume.Info', 'root', None)] = \
  55. b'0\x00pool=lvm\n' \
  56. b'vid=qubes_dom0/vm-test-vm-root\n' \
  57. b'size=10737418240\n' \
  58. b'usage=0\n' \
  59. b'rw=True\n' \
  60. b'source=\n' \
  61. b'save_on_stop=True\n' \
  62. b'snap_on_start=False\n' \
  63. b'revisions_to_keep=3\n' \
  64. b'is_outdated=False\n'
  65. self.app.expected_calls[('test-vm', 'admin.vm.volume.Resize', 'root',
  66. str(len(volume_data)).encode())] = \
  67. b'0\0'
  68. self.app.expected_calls[('test-vm', 'admin.vm.volume.Import', 'root',
  69. volume_data)] = b'0\0'
  70. vm = self.app.domains['test-vm']
  71. qubesadmin.tools.qvm_template_postprocess.import_root_img(
  72. vm, self.source_dir.name)
  73. self.assertAllCalled()
  74. def test_001_import_root_img_tar(self):
  75. root_img = os.path.join(self.source_dir.name, 'root.img')
  76. volume_data = b'volume data' * 1000
  77. with open(root_img, 'wb') as f:
  78. f.write(volume_data)
  79. subprocess.check_call(['tar', 'cf', 'root.img.tar', 'root.img'],
  80. cwd=self.source_dir.name)
  81. subprocess.check_call(['split', '-d', '-b', '1024', 'root.img.tar',
  82. 'root.img.part.'], cwd=self.source_dir.name)
  83. os.unlink(root_img)
  84. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  85. b'0\0test-vm class=TemplateVM state=Halted\n'
  86. self.app.expected_calls[
  87. ('test-vm', 'admin.vm.volume.Info', 'root', None)] = \
  88. b'0\x00pool=lvm\n' \
  89. b'vid=qubes_dom0/vm-test-vm-root\n' \
  90. b'size=10737418240\n' \
  91. b'usage=0\n' \
  92. b'rw=True\n' \
  93. b'source=\n' \
  94. b'save_on_stop=True\n' \
  95. b'snap_on_start=False\n' \
  96. b'revisions_to_keep=3\n' \
  97. b'is_outdated=False\n'
  98. self.app.expected_calls[('test-vm', 'admin.vm.volume.List', None,
  99. None)] = \
  100. b'0\0root\nprivate\nvolatile\nkernel\n'
  101. self.app.expected_calls[('test-vm', 'admin.vm.volume.Resize', 'root',
  102. str(len(volume_data)).encode())] = \
  103. b'0\0'
  104. self.app.expected_calls[('test-vm', 'admin.vm.volume.Import', 'root',
  105. volume_data)] = b'0\0'
  106. vm = self.app.domains['test-vm']
  107. qubesadmin.tools.qvm_template_postprocess.import_root_img(
  108. vm, self.source_dir.name)
  109. self.assertAllCalled()
  110. def test_002_import_root_img_no_overwrite(self):
  111. self.app.qubesd_connection_type = 'socket'
  112. template_dir = os.path.join(self.source_dir.name, 'vm-templates',
  113. 'test-vm')
  114. os.makedirs(template_dir)
  115. root_img = os.path.join(template_dir, 'root.img')
  116. volume_data = b'volume data'
  117. with open(root_img, 'wb') as f:
  118. f.write(volume_data)
  119. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  120. b'0\0test-vm class=TemplateVM state=Halted\n'
  121. self.app.expected_calls[
  122. ('test-vm', 'admin.vm.volume.List', None, None)] = \
  123. b'0\0root\nprivate\nvolatile\nkernel\n'
  124. self.app.expected_calls[
  125. ('test-vm', 'admin.vm.volume.Info', 'root', None)] = \
  126. b'0\x00pool=default\n' \
  127. b'vid=vm-templates/test-vm/root\n' \
  128. b'size=10737418240\n' \
  129. b'usage=0\n' \
  130. b'rw=True\n' \
  131. b'source=\n' \
  132. b'save_on_stop=True\n' \
  133. b'snap_on_start=False\n' \
  134. b'revisions_to_keep=3\n' \
  135. b'is_outdated=False\n'
  136. self.app.expected_calls[
  137. ('dom0', 'admin.pool.List', None, None)] = \
  138. b'0\0default\n'
  139. self.app.expected_calls[
  140. ('dom0', 'admin.pool.Info', 'default', None)] = \
  141. b'0\0driver=file\ndir_path=' + self.source_dir.name.encode() + b'\n'
  142. vm = self.app.domains['test-vm']
  143. qubesadmin.tools.qvm_template_postprocess.import_root_img(
  144. vm, template_dir)
  145. self.assertAllCalled()
  146. def test_005_reset_private_img(self):
  147. self.app.qubesd_connection_type = 'socket'
  148. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  149. b'0\0test-vm class=TemplateVM state=Halted\n'
  150. self.app.expected_calls[
  151. ('test-vm', 'admin.vm.volume.List', None, None)] = \
  152. b'0\0root\nprivate\nvolatile\nkernel\n'
  153. self.app.expected_calls[('test-vm', 'admin.vm.volume.Import', 'private',
  154. b'')] = b'0\0'
  155. vm = self.app.domains['test-vm']
  156. qubesadmin.tools.qvm_template_postprocess.reset_private_img(vm)
  157. self.assertAllCalled()
  158. def test_010_import_appmenus(self):
  159. with open(os.path.join(self.source_dir.name,
  160. 'vm-whitelisted-appmenus.list'), 'w') as f:
  161. f.write('org.gnome.Terminal.desktop\n')
  162. f.write('firefox.desktop\n')
  163. with open(os.path.join(self.source_dir.name,
  164. 'whitelisted-appmenus.list'), 'w') as f:
  165. f.write('org.gnome.Terminal.desktop\n')
  166. f.write('org.gnome.Software.desktop\n')
  167. f.write('gnome-control-center.desktop\n')
  168. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  169. b'0\0test-vm class=TemplateVM state=Halted\n'
  170. vm = self.app.domains['test-vm']
  171. with mock.patch('subprocess.check_call') as mock_proc:
  172. qubesadmin.tools.qvm_template_postprocess.import_appmenus(
  173. vm, self.source_dir.name)
  174. self.assertEqual(mock_proc.mock_calls, [
  175. mock.call(['qvm-appmenus',
  176. '--set-default-whitelist=' + os.path.join(self.source_dir.name,
  177. 'vm-whitelisted-appmenus.list'), 'test-vm']),
  178. mock.call(['qvm-appmenus', '--set-whitelist=' + os.path.join(
  179. self.source_dir.name, 'whitelisted-appmenus.list'), 'test-vm']),
  180. ])
  181. self.assertAllCalled()
  182. @mock.patch('grp.getgrnam')
  183. @mock.patch('os.getuid')
  184. def test_011_import_appmenus_as_root(self, mock_getuid, mock_getgrnam):
  185. with open(os.path.join(self.source_dir.name,
  186. 'vm-whitelisted-appmenus.list'), 'w') as f:
  187. f.write('org.gnome.Terminal.desktop\n')
  188. f.write('firefox.desktop\n')
  189. with open(os.path.join(self.source_dir.name,
  190. 'whitelisted-appmenus.list'), 'w') as f:
  191. f.write('org.gnome.Terminal.desktop\n')
  192. f.write('org.gnome.Software.desktop\n')
  193. f.write('gnome-control-center.desktop\n')
  194. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  195. b'0\0test-vm class=TemplateVM state=Halted\n'
  196. mock_getuid.return_value = 0
  197. mock_getgrnam.configure_mock(**{
  198. 'return_value.gr_mem.__getitem__.return_value': 'user'
  199. })
  200. vm = self.app.domains['test-vm']
  201. with mock.patch('subprocess.check_call') as mock_proc:
  202. qubesadmin.tools.qvm_template_postprocess.import_appmenus(
  203. vm, self.source_dir.name)
  204. self.assertEqual(mock_proc.mock_calls, [
  205. mock.call(['runuser', '-u', 'user', '--', 'env', 'DISPLAY=:0',
  206. 'qvm-appmenus',
  207. '--set-default-whitelist=' + os.path.join(self.source_dir.name,
  208. 'vm-whitelisted-appmenus.list'), 'test-vm']),
  209. mock.call(['runuser', '-u', 'user', '--', 'env', 'DISPLAY=:0',
  210. 'qvm-appmenus', '--set-whitelist=' + os.path.join(
  211. self.source_dir.name, 'whitelisted-appmenus.list'), 'test-vm']),
  212. ])
  213. self.assertAllCalled()
  214. @mock.patch('grp.getgrnam')
  215. @mock.patch('os.getuid')
  216. def test_012_import_appmenus_missing_user(self, mock_getuid, mock_getgrnam):
  217. with open(os.path.join(self.source_dir.name,
  218. 'vm-whitelisted-appmenus.list'), 'w') as f:
  219. f.write('org.gnome.Terminal.desktop\n')
  220. f.write('firefox.desktop\n')
  221. with open(os.path.join(self.source_dir.name,
  222. 'whitelisted-appmenus.list'), 'w') as f:
  223. f.write('org.gnome.Terminal.desktop\n')
  224. f.write('org.gnome.Software.desktop\n')
  225. f.write('gnome-control-center.desktop\n')
  226. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  227. b'0\0test-vm class=TemplateVM state=Halted\n'
  228. mock_getuid.return_value = 0
  229. mock_getgrnam.side_effect = KeyError
  230. vm = self.app.domains['test-vm']
  231. with mock.patch('subprocess.check_call') as mock_proc:
  232. qubesadmin.tools.qvm_template_postprocess.import_appmenus(
  233. vm, self.source_dir.name)
  234. self.assertEqual(mock_proc.mock_calls, [])
  235. self.assertAllCalled()
  236. def add_new_vm_side_effect(self, *args, **kwargs):
  237. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  238. b'0\0test-vm class=TemplateVM state=Halted\n'
  239. self.app.domains.clear_cache()
  240. return self.app.domains['test-vm']
  241. @asyncio.coroutine
  242. def wait_for_shutdown(self, vm):
  243. pass
  244. @mock.patch('qubesadmin.tools.qvm_template_postprocess.import_appmenus')
  245. @mock.patch('qubesadmin.tools.qvm_template_postprocess.import_root_img')
  246. def test_020_post_install(self, mock_import_root_img,
  247. mock_import_appmenus):
  248. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  249. b'0\0'
  250. self.app.add_new_vm = mock.Mock(side_effect=self.add_new_vm_side_effect)
  251. self.app.expected_calls[
  252. ('test-vm', 'admin.vm.property.Set', 'netvm', b'')] = b'0\0'
  253. self.app.expected_calls[
  254. ('test-vm', 'admin.vm.property.Set', 'installed_by_rpm', b'True')] \
  255. = b'0\0'
  256. self.app.expected_calls[
  257. ('test-vm', 'admin.vm.property.Reset', 'netvm', None)] = b'0\0'
  258. self.app.expected_calls[
  259. ('test-vm', 'admin.vm.feature.Set', 'qrexec', b'1')] = b'0\0'
  260. self.app.expected_calls[
  261. ('test-vm', 'admin.vm.Start', None, None)] = b'0\0'
  262. self.app.expected_calls[
  263. ('test-vm', 'admin.vm.Shutdown', None, None)] = b'0\0'
  264. if qubesadmin.tools.qvm_template_postprocess.have_events:
  265. patch_domain_shutdown = mock.patch(
  266. 'qubesadmin.events.utils.wait_for_domain_shutdown')
  267. self.addCleanup(patch_domain_shutdown.stop)
  268. mock_domain_shutdown = patch_domain_shutdown.start()
  269. mock_domain_shutdown.side_effect = self.wait_for_shutdown
  270. else:
  271. self.app.expected_calls[
  272. ('test-vm', 'admin.vm.List', None, None)] = \
  273. b'0\0test-vm class=TemplateVM state=Halted\n'
  274. asyncio.set_event_loop(asyncio.new_event_loop())
  275. ret = qubesadmin.tools.qvm_template_postprocess.main([
  276. '--really', 'post-install', 'test-vm', self.source_dir.name],
  277. app=self.app)
  278. self.assertEqual(ret, 0)
  279. self.app.add_new_vm.assert_called_once_with('TemplateVM',
  280. name='test-vm', label='black')
  281. mock_import_root_img.assert_called_once_with(self.app.domains[
  282. 'test-vm'], self.source_dir.name)
  283. mock_import_appmenus.assert_called_once_with(self.app.domains[
  284. 'test-vm'], self.source_dir.name)
  285. if qubesadmin.tools.qvm_template_postprocess.have_events:
  286. mock_domain_shutdown.assert_called_once_with([self.app.domains[
  287. 'test-vm']])
  288. self.assertEqual(self.app.service_calls, [
  289. ('test-vm', 'qubes.PostInstall', {}),
  290. ('test-vm', 'qubes.PostInstall', b''),
  291. ])
  292. self.assertAllCalled()
  293. @mock.patch('qubesadmin.tools.qvm_template_postprocess.import_appmenus')
  294. @mock.patch('qubesadmin.tools.qvm_template_postprocess.import_root_img')
  295. @mock.patch('qubesadmin.tools.qvm_template_postprocess.reset_private_img')
  296. def test_021_post_install_reinstall(self, mock_reset_private_img,
  297. mock_import_root_img, mock_import_appmenus):
  298. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  299. b'0\0test-vm class=TemplateVM state=Halted\n'
  300. self.app.add_new_vm = mock.Mock()
  301. self.app.expected_calls[
  302. ('test-vm', 'admin.vm.property.Set', 'netvm', b'')] = b'0\0'
  303. self.app.expected_calls[
  304. ('test-vm', 'admin.vm.property.Set', 'installed_by_rpm', b'True')] \
  305. = b'0\0'
  306. self.app.expected_calls[
  307. ('test-vm', 'admin.vm.property.Reset', 'netvm', None)] = b'0\0'
  308. self.app.expected_calls[
  309. ('test-vm', 'admin.vm.feature.Set', 'qrexec', b'1')] = b'0\0'
  310. self.app.expected_calls[
  311. ('test-vm', 'admin.vm.Start', None, None)] = b'0\0'
  312. self.app.expected_calls[
  313. ('test-vm', 'admin.vm.Shutdown', None, None)] = b'0\0'
  314. if qubesadmin.tools.qvm_template_postprocess.have_events:
  315. patch_domain_shutdown = mock.patch(
  316. 'qubesadmin.events.utils.wait_for_domain_shutdown')
  317. self.addCleanup(patch_domain_shutdown.stop)
  318. mock_domain_shutdown = patch_domain_shutdown.start()
  319. mock_domain_shutdown.side_effect = self.wait_for_shutdown
  320. else:
  321. self.app.expected_calls[
  322. ('test-vm', 'admin.vm.List', None, None)] = \
  323. b'0\0test-vm class=TemplateVM state=Halted\n'
  324. asyncio.set_event_loop(asyncio.new_event_loop())
  325. ret = qubesadmin.tools.qvm_template_postprocess.main([
  326. '--really', 'post-install', 'test-vm', self.source_dir.name],
  327. app=self.app)
  328. self.assertEqual(ret, 0)
  329. self.assertFalse(self.app.add_new_vm.called)
  330. mock_import_root_img.assert_called_once_with(self.app.domains[
  331. 'test-vm'], self.source_dir.name)
  332. mock_reset_private_img.assert_called_once_with(self.app.domains[
  333. 'test-vm'])
  334. mock_import_appmenus.assert_called_once_with(self.app.domains[
  335. 'test-vm'], self.source_dir.name)
  336. if qubesadmin.tools.qvm_template_postprocess.have_events:
  337. mock_domain_shutdown.assert_called_once_with([self.app.domains[
  338. 'test-vm']])
  339. self.assertEqual(self.app.service_calls, [
  340. ('test-vm', 'qubes.PostInstall', {}),
  341. ('test-vm', 'qubes.PostInstall', b''),
  342. ])
  343. self.assertAllCalled()
  344. @mock.patch('qubesadmin.tools.qvm_template_postprocess.import_appmenus')
  345. @mock.patch('qubesadmin.tools.qvm_template_postprocess.import_root_img')
  346. @mock.patch('qubesadmin.tools.qvm_template_postprocess.reset_private_img')
  347. def test_022_post_install_skip_start(self, mock_reset_private_img,
  348. mock_import_root_img, mock_import_appmenus):
  349. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  350. b'0\0test-vm class=TemplateVM state=Halted\n'
  351. self.app.expected_calls[
  352. ('test-vm', 'admin.vm.property.Set', 'installed_by_rpm', b'True')] \
  353. = b'0\0'
  354. self.app.add_new_vm = mock.Mock()
  355. if qubesadmin.tools.qvm_template_postprocess.have_events:
  356. patch_domain_shutdown = mock.patch(
  357. 'qubesadmin.events.utils.wait_for_domain_shutdown')
  358. self.addCleanup(patch_domain_shutdown.stop)
  359. mock_domain_shutdown = patch_domain_shutdown.start()
  360. mock_domain_shutdown.side_effect = self.wait_for_shutdown
  361. asyncio.set_event_loop(asyncio.new_event_loop())
  362. ret = qubesadmin.tools.qvm_template_postprocess.main([
  363. '--really', '--skip-start', 'post-install', 'test-vm',
  364. self.source_dir.name],
  365. app=self.app)
  366. self.assertEqual(ret, 0)
  367. self.assertFalse(self.app.add_new_vm.called)
  368. mock_import_root_img.assert_called_once_with(self.app.domains[
  369. 'test-vm'], self.source_dir.name)
  370. mock_reset_private_img.assert_called_once_with(self.app.domains[
  371. 'test-vm'])
  372. mock_import_appmenus.assert_called_once_with(self.app.domains[
  373. 'test-vm'], self.source_dir.name)
  374. if qubesadmin.tools.qvm_template_postprocess.have_events:
  375. self.assertFalse(mock_domain_shutdown.called)
  376. self.assertEqual(self.app.service_calls, [])
  377. self.assertAllCalled()
  378. def test_030_pre_remove(self):
  379. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  380. b'0\0test-vm class=TemplateVM state=Halted\n'
  381. self.app.expected_calls[('test-vm', 'admin.vm.Remove', None, None)] = \
  382. b'0\0'
  383. self.app.expected_calls[
  384. ('test-vm', 'admin.vm.property.Set', 'installed_by_rpm', b'False')]\
  385. = b'0\0'
  386. self.app.expected_calls[
  387. ('test-vm', 'admin.vm.property.Get', 'template', None)] = \
  388. b'2\0QubesNoSuchPropertyError\0\0invalid property \'template\' of ' \
  389. b'test-vm\0'
  390. ret = qubesadmin.tools.qvm_template_postprocess.main([
  391. '--really', 'pre-remove', 'test-vm',
  392. self.source_dir.name],
  393. app=self.app)
  394. self.assertEqual(ret, 0)
  395. self.assertEqual(self.app.service_calls, [])
  396. self.assertAllCalled()
  397. def test_031_pre_remove_existing_appvm(self):
  398. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  399. b'0\0test-vm class=TemplateVM state=Halted\n' \
  400. b'test-vm2 class=AppVM state=Halted\n'
  401. self.app.expected_calls[
  402. ('test-vm', 'admin.vm.property.Get', 'template', None)] = \
  403. b'2\0QubesNoSuchPropertyError\0\0invalid property \'template\' of ' \
  404. b'test-vm\0'
  405. self.app.expected_calls[
  406. ('test-vm2', 'admin.vm.property.Get', 'template', None)] = \
  407. b'0\0default=no type=vm test-vm'
  408. with self.assertRaises(SystemExit):
  409. qubesadmin.tools.qvm_template_postprocess.main([
  410. '--really', 'pre-remove', 'test-vm',
  411. self.source_dir.name],
  412. app=self.app)
  413. self.assertEqual(self.app.service_calls, [])
  414. self.assertAllCalled()
  415. def test_040_missing_really(self):
  416. with self.assertRaises(SystemExit):
  417. qubesadmin.tools.qvm_template_postprocess.main([
  418. 'post-install', 'test-vm', self.source_dir.name],
  419. app=self.app)
  420. self.assertAllCalled()