qvm_template_postprocess.py 28 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641
  1. # -*- encoding: utf8 -*-
  2. #
  3. # The Qubes OS Project, http://www.qubes-os.org
  4. #
  5. # Copyright (C) 2017 Marek Marczykowski-Górecki
  6. # <marmarek@invisiblethingslab.com>
  7. #
  8. # This program is free software; you can redistribute it and/or modify
  9. # it under the terms of the GNU Lesser General Public License as published by
  10. # the Free Software Foundation; either version 2.1 of the License, or
  11. # (at your option) any later version.
  12. #
  13. # This program is distributed in the hope that it will be useful,
  14. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  15. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  16. # GNU Lesser General Public License for more details.
  17. #
  18. # You should have received a copy of the GNU Lesser General Public License along
  19. # with this program; if not, see <http://www.gnu.org/licenses/>.
  20. import argparse
  21. import asyncio
  22. import os
  23. import subprocess
  24. import tempfile
  25. from unittest import mock
  26. import qubesadmin.tests
  27. import qubesadmin.tools.qvm_template_postprocess
  28. class QubesLocalMock(qubesadmin.tests.QubesTest):
  29. def __init__(self):
  30. super(QubesLocalMock, self).__init__()
  31. self.__class__ = qubesadmin.app.QubesLocal
  32. qubesd_call = qubesadmin.tests.QubesTest.qubesd_call
  33. run_service = qubesadmin.tests.QubesTest.run_service
  34. class TC_00_qvm_template_postprocess(qubesadmin.tests.QubesTestCase):
  35. def setUp(self):
  36. super(TC_00_qvm_template_postprocess, self).setUp()
  37. self.source_dir = tempfile.TemporaryDirectory()
  38. def tearDown(self):
  39. try:
  40. self.source_dir.cleanup()
  41. except FileNotFoundError:
  42. pass
  43. super(TC_00_qvm_template_postprocess, self).tearDown()
  44. def test_000_import_root_img_raw(self):
  45. root_img = os.path.join(self.source_dir.name, 'root.img')
  46. volume_data = b'volume data'
  47. with open(root_img, 'wb') as f:
  48. f.write(volume_data)
  49. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  50. b'0\0test-vm class=TemplateVM state=Halted\n'
  51. self.app.expected_calls[('test-vm', 'admin.vm.volume.List', None,
  52. None)] = \
  53. b'0\0root\nprivate\nvolatile\nkernel\n'
  54. self.app.expected_calls[
  55. ('test-vm', 'admin.vm.volume.Info', 'root', None)] = \
  56. b'0\x00pool=lvm\n' \
  57. b'vid=qubes_dom0/vm-test-vm-root\n' \
  58. b'size=10737418240\n' \
  59. b'usage=0\n' \
  60. b'rw=True\n' \
  61. b'source=\n' \
  62. b'save_on_stop=True\n' \
  63. b'snap_on_start=False\n' \
  64. b'revisions_to_keep=3\n' \
  65. b'is_outdated=False\n'
  66. self.app.expected_calls[('test-vm', 'admin.vm.volume.Resize', 'root',
  67. str(len(volume_data)).encode())] = \
  68. b'0\0'
  69. self.app.expected_calls[('test-vm', 'admin.vm.volume.Import', 'root',
  70. volume_data)] = b'0\0'
  71. vm = self.app.domains['test-vm']
  72. qubesadmin.tools.qvm_template_postprocess.import_root_img(
  73. vm, self.source_dir.name)
  74. self.assertAllCalled()
  75. def test_001_import_root_img_tar(self):
  76. root_img = os.path.join(self.source_dir.name, 'root.img')
  77. volume_data = b'volume data' * 1000
  78. with open(root_img, 'wb') as f:
  79. f.write(volume_data)
  80. subprocess.check_call(['tar', 'cf', 'root.img.tar', 'root.img'],
  81. cwd=self.source_dir.name)
  82. subprocess.check_call(['split', '-d', '-b', '1024', 'root.img.tar',
  83. 'root.img.part.'], cwd=self.source_dir.name)
  84. os.unlink(root_img)
  85. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  86. b'0\0test-vm class=TemplateVM state=Halted\n'
  87. self.app.expected_calls[
  88. ('test-vm', 'admin.vm.volume.Info', 'root', None)] = \
  89. b'0\x00pool=lvm\n' \
  90. b'vid=qubes_dom0/vm-test-vm-root\n' \
  91. b'size=10737418240\n' \
  92. b'usage=0\n' \
  93. b'rw=True\n' \
  94. b'source=\n' \
  95. b'save_on_stop=True\n' \
  96. b'snap_on_start=False\n' \
  97. b'revisions_to_keep=3\n' \
  98. b'is_outdated=False\n'
  99. self.app.expected_calls[('test-vm', 'admin.vm.volume.List', None,
  100. None)] = \
  101. b'0\0root\nprivate\nvolatile\nkernel\n'
  102. self.app.expected_calls[('test-vm', 'admin.vm.volume.Resize', 'root',
  103. str(len(volume_data)).encode())] = \
  104. b'0\0'
  105. self.app.expected_calls[('test-vm', 'admin.vm.volume.Import', 'root',
  106. volume_data)] = b'0\0'
  107. vm = self.app.domains['test-vm']
  108. qubesadmin.tools.qvm_template_postprocess.import_root_img(
  109. vm, self.source_dir.name)
  110. self.assertAllCalled()
  111. def test_002_import_root_img_no_overwrite(self):
  112. self.app.qubesd_connection_type = 'socket'
  113. template_dir = os.path.join(self.source_dir.name, 'vm-templates',
  114. 'test-vm')
  115. os.makedirs(template_dir)
  116. root_img = os.path.join(template_dir, 'root.img')
  117. volume_data = b'volume data'
  118. with open(root_img, 'wb') as f:
  119. f.write(volume_data)
  120. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  121. b'0\0test-vm class=TemplateVM state=Halted\n'
  122. self.app.expected_calls[
  123. ('test-vm', 'admin.vm.volume.List', None, None)] = \
  124. b'0\0root\nprivate\nvolatile\nkernel\n'
  125. self.app.expected_calls[
  126. ('test-vm', 'admin.vm.volume.Info', 'root', None)] = \
  127. b'0\x00pool=default\n' \
  128. b'vid=vm-templates/test-vm/root\n' \
  129. b'size=10737418240\n' \
  130. b'usage=0\n' \
  131. b'rw=True\n' \
  132. b'source=\n' \
  133. b'save_on_stop=True\n' \
  134. b'snap_on_start=False\n' \
  135. b'revisions_to_keep=3\n' \
  136. b'is_outdated=False\n'
  137. self.app.expected_calls[
  138. ('dom0', 'admin.pool.List', None, None)] = \
  139. b'0\0default\n'
  140. self.app.expected_calls[
  141. ('dom0', 'admin.pool.Info', 'default', None)] = \
  142. b'0\0driver=file\ndir_path=' + self.source_dir.name.encode() + b'\n'
  143. vm = self.app.domains['test-vm']
  144. qubesadmin.tools.qvm_template_postprocess.import_root_img(
  145. vm, template_dir)
  146. self.assertAllCalled()
  147. def test_005_reset_private_img(self):
  148. self.app.qubesd_connection_type = 'socket'
  149. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  150. b'0\0test-vm class=TemplateVM state=Halted\n'
  151. self.app.expected_calls[
  152. ('test-vm', 'admin.vm.volume.List', None, None)] = \
  153. b'0\0root\nprivate\nvolatile\nkernel\n'
  154. self.app.expected_calls[('test-vm', 'admin.vm.volume.Clear', 'private',
  155. None)] = b'0\0'
  156. vm = self.app.domains['test-vm']
  157. qubesadmin.tools.qvm_template_postprocess.reset_private_img(vm)
  158. self.assertAllCalled()
  159. def test_010_import_appmenus(self):
  160. default_menu_items = [
  161. 'org.gnome.Terminal.desktop',
  162. 'firefox.desktop']
  163. menu_items = [
  164. 'org.gnome.Terminal.desktop',
  165. 'org.gnome.Software.desktop',
  166. 'gnome-control-center.desktop']
  167. netvm_menu_items = [
  168. 'org.gnome.Terminal.desktop',
  169. 'nm-connection-editor.desktop']
  170. with open(os.path.join(self.source_dir.name,
  171. 'vm-whitelisted-appmenus.list'), 'w') as f:
  172. for entry in default_menu_items:
  173. f.write(entry + '\n')
  174. with open(os.path.join(self.source_dir.name,
  175. 'whitelisted-appmenus.list'), 'w') as f:
  176. for entry in menu_items:
  177. f.write(entry + '\n')
  178. with open(os.path.join(self.source_dir.name,
  179. 'netvm-whitelisted-appmenus.list'), 'w') as f:
  180. for entry in netvm_menu_items:
  181. f.write(entry + '\n')
  182. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  183. b'0\0test-vm class=TemplateVM state=Halted\n'
  184. self.app.expected_calls[(
  185. 'test-vm',
  186. 'admin.vm.feature.Set',
  187. 'default-menu-items',
  188. ' '.join(default_menu_items).encode())] = b'0\0'
  189. self.app.expected_calls[(
  190. 'test-vm',
  191. 'admin.vm.feature.Set',
  192. 'menu-items',
  193. ' '.join(menu_items).encode())] = b'0\0'
  194. self.app.expected_calls[(
  195. 'test-vm',
  196. 'admin.vm.feature.Set',
  197. 'netvm-menu-items',
  198. ' '.join(netvm_menu_items).encode())] = b'0\0'
  199. vm = self.app.domains['test-vm']
  200. with mock.patch('subprocess.check_call') as mock_proc:
  201. qubesadmin.tools.qvm_template_postprocess.import_appmenus(
  202. vm, self.source_dir.name, skip_generate=False)
  203. self.assertEqual(mock_proc.mock_calls, [
  204. mock.call(['qvm-appmenus',
  205. '--set-default-whitelist=' + os.path.join(self.source_dir.name,
  206. 'vm-whitelisted-appmenus.list'), 'test-vm']),
  207. mock.call(['qvm-appmenus', '--set-whitelist=' + os.path.join(
  208. self.source_dir.name, 'whitelisted-appmenus.list'), 'test-vm']),
  209. ])
  210. self.assertAllCalled()
  211. @mock.patch('grp.getgrnam')
  212. @mock.patch('os.getuid')
  213. def test_011_import_appmenus_as_root(self, mock_getuid, mock_getgrnam):
  214. default_menu_items = [
  215. 'org.gnome.Terminal.desktop',
  216. 'firefox.desktop']
  217. menu_items = [
  218. 'org.gnome.Terminal.desktop',
  219. 'org.gnome.Software.desktop',
  220. 'gnome-control-center.desktop']
  221. netvm_menu_items = [
  222. 'org.gnome.Terminal.desktop',
  223. 'nm-connection-editor.desktop']
  224. with open(os.path.join(self.source_dir.name,
  225. 'vm-whitelisted-appmenus.list'), 'w') as f:
  226. for entry in default_menu_items:
  227. f.write(entry + '\n')
  228. with open(os.path.join(self.source_dir.name,
  229. 'whitelisted-appmenus.list'), 'w') as f:
  230. for entry in menu_items:
  231. f.write(entry + '\n')
  232. with open(os.path.join(self.source_dir.name,
  233. 'netvm-whitelisted-appmenus.list'), 'w') as f:
  234. for entry in netvm_menu_items:
  235. f.write(entry + '\n')
  236. self.app.expected_calls[(
  237. 'test-vm',
  238. 'admin.vm.feature.Set',
  239. 'default-menu-items',
  240. ' '.join(default_menu_items).encode())] = b'0\0'
  241. self.app.expected_calls[(
  242. 'test-vm',
  243. 'admin.vm.feature.Set',
  244. 'menu-items',
  245. ' '.join(menu_items).encode())] = b'0\0'
  246. self.app.expected_calls[(
  247. 'test-vm',
  248. 'admin.vm.feature.Set',
  249. 'netvm-menu-items',
  250. ' '.join(netvm_menu_items).encode())] = b'0\0'
  251. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  252. b'0\0test-vm class=TemplateVM state=Halted\n'
  253. mock_getuid.return_value = 0
  254. mock_getgrnam.configure_mock(**{
  255. 'return_value.gr_mem.__getitem__.return_value': 'user'
  256. })
  257. vm = self.app.domains['test-vm']
  258. with mock.patch('subprocess.check_call') as mock_proc:
  259. qubesadmin.tools.qvm_template_postprocess.import_appmenus(
  260. vm, self.source_dir.name, skip_generate=False)
  261. self.assertEqual(mock_proc.mock_calls, [
  262. mock.call(['runuser', '-u', 'user', '--', 'env', 'DISPLAY=:0',
  263. 'qvm-appmenus',
  264. '--set-default-whitelist=' + os.path.join(self.source_dir.name,
  265. 'vm-whitelisted-appmenus.list'), 'test-vm']),
  266. mock.call(['runuser', '-u', 'user', '--', 'env', 'DISPLAY=:0',
  267. 'qvm-appmenus', '--set-whitelist=' + os.path.join(
  268. self.source_dir.name, 'whitelisted-appmenus.list'), 'test-vm']),
  269. ])
  270. self.assertAllCalled()
  271. @mock.patch('grp.getgrnam')
  272. @mock.patch('os.getuid')
  273. def test_012_import_appmenus_missing_user(self, mock_getuid, mock_getgrnam):
  274. with open(os.path.join(self.source_dir.name,
  275. 'vm-whitelisted-appmenus.list'), 'w') as f:
  276. f.write('org.gnome.Terminal.desktop\n')
  277. f.write('firefox.desktop\n')
  278. with open(os.path.join(self.source_dir.name,
  279. 'whitelisted-appmenus.list'), 'w') as f:
  280. f.write('org.gnome.Terminal.desktop\n')
  281. f.write('org.gnome.Software.desktop\n')
  282. f.write('gnome-control-center.desktop\n')
  283. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  284. b'0\0test-vm class=TemplateVM state=Halted\n'
  285. mock_getuid.return_value = 0
  286. mock_getgrnam.side_effect = KeyError
  287. vm = self.app.domains['test-vm']
  288. with mock.patch('subprocess.check_call') as mock_proc:
  289. qubesadmin.tools.qvm_template_postprocess.import_appmenus(
  290. vm, self.source_dir.name, skip_generate=False)
  291. self.assertEqual(mock_proc.mock_calls, [])
  292. self.assertAllCalled()
  293. def add_new_vm_side_effect(self, *args, **kwargs):
  294. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  295. b'0\0test-vm class=TemplateVM state=Halted\n'
  296. self.app.domains.clear_cache()
  297. return self.app.domains['test-vm']
  298. @asyncio.coroutine
  299. def wait_for_shutdown(self, vm):
  300. pass
  301. @mock.patch('qubesadmin.tools.qvm_template_postprocess.import_appmenus')
  302. @mock.patch('qubesadmin.tools.qvm_template_postprocess.import_root_img')
  303. def test_020_post_install(self, mock_import_root_img,
  304. mock_import_appmenus):
  305. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  306. b'0\0'
  307. self.app.add_new_vm = mock.Mock(side_effect=self.add_new_vm_side_effect)
  308. self.app.expected_calls[
  309. ('test-vm', 'admin.vm.property.Set', 'netvm', b'')] = b'0\0'
  310. self.app.expected_calls[
  311. ('test-vm', 'admin.vm.property.Set', 'installed_by_rpm', b'True')] \
  312. = b'0\0'
  313. self.app.expected_calls[
  314. ('test-vm', 'admin.vm.property.Reset', 'netvm', None)] = b'0\0'
  315. self.app.expected_calls[
  316. ('test-vm', 'admin.vm.feature.Set', 'qrexec', b'1')] = b'0\0'
  317. self.app.expected_calls[
  318. ('test-vm', 'admin.vm.Start', None, None)] = b'0\0'
  319. self.app.expected_calls[
  320. ('test-vm', 'admin.vm.Shutdown', None, None)] = b'0\0'
  321. if qubesadmin.tools.qvm_template_postprocess.have_events:
  322. patch_domain_shutdown = mock.patch(
  323. 'qubesadmin.events.utils.wait_for_domain_shutdown')
  324. self.addCleanup(patch_domain_shutdown.stop)
  325. mock_domain_shutdown = patch_domain_shutdown.start()
  326. mock_domain_shutdown.side_effect = self.wait_for_shutdown
  327. else:
  328. self.app.expected_calls[
  329. ('test-vm', 'admin.vm.List', None, None)] = \
  330. b'0\0test-vm class=TemplateVM state=Halted\n'
  331. asyncio.set_event_loop(asyncio.new_event_loop())
  332. ret = qubesadmin.tools.qvm_template_postprocess.main([
  333. '--really', 'post-install', 'test-vm', self.source_dir.name],
  334. app=self.app)
  335. self.assertEqual(ret, 0)
  336. self.app.add_new_vm.assert_called_once_with('TemplateVM',
  337. name='test-vm', label='black', pool=None)
  338. mock_import_root_img.assert_called_once_with(self.app.domains[
  339. 'test-vm'], self.source_dir.name)
  340. mock_import_appmenus.assert_called_once_with(self.app.domains[
  341. 'test-vm'], self.source_dir.name, skip_generate=True)
  342. if qubesadmin.tools.qvm_template_postprocess.have_events:
  343. mock_domain_shutdown.assert_called_once_with([self.app.domains[
  344. 'test-vm']])
  345. self.assertEqual(self.app.service_calls, [
  346. ('test-vm', 'qubes.PostInstall', {}),
  347. ('test-vm', 'qubes.PostInstall', b''),
  348. ])
  349. self.assertAllCalled()
  350. @mock.patch('qubesadmin.tools.qvm_template_postprocess.import_appmenus')
  351. @mock.patch('qubesadmin.tools.qvm_template_postprocess.import_root_img')
  352. @mock.patch('qubesadmin.tools.qvm_template_postprocess.reset_private_img')
  353. def test_021_post_install_reinstall(self, mock_reset_private_img,
  354. mock_import_root_img, mock_import_appmenus):
  355. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  356. b'0\0test-vm class=TemplateVM state=Halted\n'
  357. self.app.add_new_vm = mock.Mock()
  358. self.app.expected_calls[
  359. ('test-vm', 'admin.vm.property.Set', 'netvm', b'')] = b'0\0'
  360. self.app.expected_calls[
  361. ('test-vm', 'admin.vm.property.Set', 'installed_by_rpm', b'True')] \
  362. = b'0\0'
  363. self.app.expected_calls[
  364. ('test-vm', 'admin.vm.property.Reset', 'netvm', None)] = b'0\0'
  365. self.app.expected_calls[
  366. ('test-vm', 'admin.vm.feature.Set', 'qrexec', b'1')] = b'0\0'
  367. self.app.expected_calls[
  368. ('test-vm', 'admin.vm.Start', None, None)] = b'0\0'
  369. self.app.expected_calls[
  370. ('test-vm', 'admin.vm.Shutdown', None, None)] = b'0\0'
  371. if qubesadmin.tools.qvm_template_postprocess.have_events:
  372. patch_domain_shutdown = mock.patch(
  373. 'qubesadmin.events.utils.wait_for_domain_shutdown')
  374. self.addCleanup(patch_domain_shutdown.stop)
  375. mock_domain_shutdown = patch_domain_shutdown.start()
  376. mock_domain_shutdown.side_effect = self.wait_for_shutdown
  377. else:
  378. self.app.expected_calls[
  379. ('test-vm', 'admin.vm.List', None, None)] = \
  380. b'0\0test-vm class=TemplateVM state=Halted\n'
  381. asyncio.set_event_loop(asyncio.new_event_loop())
  382. ret = qubesadmin.tools.qvm_template_postprocess.main([
  383. '--really', 'post-install', 'test-vm', self.source_dir.name],
  384. app=self.app)
  385. self.assertEqual(ret, 0)
  386. self.assertFalse(self.app.add_new_vm.called)
  387. mock_import_root_img.assert_called_once_with(self.app.domains[
  388. 'test-vm'], self.source_dir.name)
  389. mock_reset_private_img.assert_called_once_with(self.app.domains[
  390. 'test-vm'])
  391. mock_import_appmenus.assert_called_once_with(self.app.domains[
  392. 'test-vm'], self.source_dir.name, skip_generate=True)
  393. if qubesadmin.tools.qvm_template_postprocess.have_events:
  394. mock_domain_shutdown.assert_called_once_with([self.app.domains[
  395. 'test-vm']])
  396. self.assertEqual(self.app.service_calls, [
  397. ('test-vm', 'qubes.PostInstall', {}),
  398. ('test-vm', 'qubes.PostInstall', b''),
  399. ])
  400. self.assertAllCalled()
  401. @mock.patch('qubesadmin.tools.qvm_template_postprocess.import_appmenus')
  402. @mock.patch('qubesadmin.tools.qvm_template_postprocess.import_root_img')
  403. @mock.patch('qubesadmin.tools.qvm_template_postprocess.reset_private_img')
  404. def test_022_post_install_skip_start(self, mock_reset_private_img,
  405. mock_import_root_img, mock_import_appmenus):
  406. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  407. b'0\0test-vm class=TemplateVM state=Halted\n'
  408. self.app.expected_calls[
  409. ('test-vm', 'admin.vm.property.Set', 'installed_by_rpm', b'True')] \
  410. = b'0\0'
  411. self.app.add_new_vm = mock.Mock()
  412. if qubesadmin.tools.qvm_template_postprocess.have_events:
  413. patch_domain_shutdown = mock.patch(
  414. 'qubesadmin.events.utils.wait_for_domain_shutdown')
  415. self.addCleanup(patch_domain_shutdown.stop)
  416. mock_domain_shutdown = patch_domain_shutdown.start()
  417. mock_domain_shutdown.side_effect = self.wait_for_shutdown
  418. asyncio.set_event_loop(asyncio.new_event_loop())
  419. ret = qubesadmin.tools.qvm_template_postprocess.main([
  420. '--really', '--skip-start', 'post-install', 'test-vm',
  421. self.source_dir.name],
  422. app=self.app)
  423. self.assertEqual(ret, 0)
  424. self.assertFalse(self.app.add_new_vm.called)
  425. mock_import_root_img.assert_called_once_with(self.app.domains[
  426. 'test-vm'], self.source_dir.name)
  427. mock_reset_private_img.assert_called_once_with(self.app.domains[
  428. 'test-vm'])
  429. mock_import_appmenus.assert_called_once_with(self.app.domains[
  430. 'test-vm'], self.source_dir.name, skip_generate=False)
  431. if qubesadmin.tools.qvm_template_postprocess.have_events:
  432. self.assertFalse(mock_domain_shutdown.called)
  433. self.assertEqual(self.app.service_calls, [])
  434. self.assertAllCalled()
  435. def test_030_pre_remove(self):
  436. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  437. b'0\0test-vm class=TemplateVM state=Halted\n'
  438. self.app.expected_calls[('test-vm', 'admin.vm.Remove', None, None)] = \
  439. b'0\0'
  440. self.app.expected_calls[
  441. ('test-vm', 'admin.vm.property.Set', 'installed_by_rpm', b'False')]\
  442. = b'0\0'
  443. self.app.expected_calls[
  444. ('test-vm', 'admin.vm.property.Get', 'template', None)] = \
  445. b'2\0QubesNoSuchPropertyError\0\0invalid property \'template\' of ' \
  446. b'test-vm\0'
  447. ret = qubesadmin.tools.qvm_template_postprocess.main([
  448. '--really', 'pre-remove', 'test-vm',
  449. self.source_dir.name],
  450. app=self.app)
  451. self.assertEqual(ret, 0)
  452. self.assertEqual(self.app.service_calls, [])
  453. self.assertAllCalled()
  454. def test_031_pre_remove_existing_appvm(self):
  455. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  456. b'0\0test-vm class=TemplateVM state=Halted\n' \
  457. b'test-vm2 class=AppVM state=Halted\n'
  458. self.app.expected_calls[
  459. ('test-vm', 'admin.vm.property.Get', 'template', None)] = \
  460. b'2\0QubesNoSuchPropertyError\0\0invalid property \'template\' of ' \
  461. b'test-vm\0'
  462. self.app.expected_calls[
  463. ('test-vm2', 'admin.vm.property.Get', 'template', None)] = \
  464. b'0\0default=no type=vm test-vm'
  465. with self.assertRaises(SystemExit):
  466. qubesadmin.tools.qvm_template_postprocess.main([
  467. '--really', 'pre-remove', 'test-vm',
  468. self.source_dir.name],
  469. app=self.app)
  470. self.assertEqual(self.app.service_calls, [])
  471. self.assertAllCalled()
  472. def test_040_missing_really(self):
  473. with self.assertRaises(SystemExit):
  474. qubesadmin.tools.qvm_template_postprocess.main([
  475. 'post-install', 'test-vm', self.source_dir.name],
  476. app=self.app)
  477. self.assertAllCalled()
  478. def test_050_template_config(self):
  479. template_config = """gui=1
  480. qrexec=1
  481. linux-stubdom=1
  482. net.fake-ip=192.168.1.100
  483. net.fake-netmask=255.255.255.0
  484. net.fake-gateway=192.168.1.1
  485. virt-mode=hvm
  486. kernel=
  487. """
  488. template_conf = os.path.join(self.source_dir.name, 'template.conf')
  489. with open(template_conf, 'w') as f:
  490. f.write(template_config)
  491. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  492. b'0\0test-vm class=TemplateVM state=Halted\n'
  493. self.app.expected_calls[(
  494. 'test-vm', 'admin.vm.feature.Set', 'gui', b'1')] = b'0\0'
  495. self.app.expected_calls[(
  496. 'test-vm', 'admin.vm.feature.Set', 'qrexec', b'1')] = b'0\0'
  497. self.app.expected_calls[(
  498. 'test-vm', 'admin.vm.feature.Set', 'linux-stubdom', b'1')] = b'0\0'
  499. self.app.expected_calls[(
  500. 'test-vm', 'admin.vm.feature.Set', 'net.fake-ip', b'192.168.1.100')] = b'0\0'
  501. self.app.expected_calls[(
  502. 'test-vm', 'admin.vm.feature.Set', 'net.fake-netmask', b'255.255.255.0')] = b'0\0'
  503. self.app.expected_calls[(
  504. 'test-vm', 'admin.vm.feature.Set', 'net.fake-gateway', b'192.168.1.1')] = b'0\0'
  505. self.app.expected_calls[(
  506. 'test-vm', 'admin.vm.property.Set', 'virt_mode', b'hvm')] = b'0\0'
  507. self.app.expected_calls[(
  508. 'test-vm', 'admin.vm.property.Set', 'kernel', b'')] = b'0\0'
  509. vm = self.app.domains['test-vm']
  510. args = argparse.Namespace(
  511. allow_pv=False,
  512. )
  513. qubesadmin.tools.qvm_template_postprocess.import_template_config(
  514. args, template_conf, vm)
  515. self.assertAllCalled()
  516. def test_051_template_config_invalid(self):
  517. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  518. b'0\0test-vm class=TemplateVM state=Halted\n'
  519. vm = self.app.domains['test-vm']
  520. args = argparse.Namespace(
  521. allow_pv=False,
  522. )
  523. with self.subTest('invalid feature value'):
  524. template_config = "gui=false\n"
  525. template_conf = os.path.join(self.source_dir.name, 'template.conf')
  526. with open(template_conf, 'w') as f:
  527. f.write(template_config)
  528. qubesadmin.tools.qvm_template_postprocess.import_template_config(
  529. args, template_conf, vm)
  530. with self.subTest('invalid feature name'):
  531. template_config = "invalid=1\n"
  532. template_conf = os.path.join(self.source_dir.name, 'template.conf')
  533. with open(template_conf, 'w') as f:
  534. f.write(template_config)
  535. qubesadmin.tools.qvm_template_postprocess.import_template_config(
  536. args, template_conf, vm)
  537. with self.subTest('invalid ip'):
  538. template_config = "net.fake-ip=1.2.3.4.5\n"
  539. template_conf = os.path.join(self.source_dir.name, 'template.conf')
  540. with open(template_conf, 'w') as f:
  541. f.write(template_config)
  542. qubesadmin.tools.qvm_template_postprocess.import_template_config(
  543. args, template_conf, vm)
  544. with self.subTest('invalid virt-mode'):
  545. template_config = "virt-mode=invalid\n"
  546. template_conf = os.path.join(self.source_dir.name, 'template.conf')
  547. with open(template_conf, 'w') as f:
  548. f.write(template_config)
  549. qubesadmin.tools.qvm_template_postprocess.import_template_config(
  550. args, template_conf, vm)
  551. with self.subTest('invalid kernel'):
  552. template_config = "kernel=1.2.3.4.5\n"
  553. template_conf = os.path.join(self.source_dir.name, 'template.conf')
  554. with open(template_conf, 'w') as f:
  555. f.write(template_config)
  556. qubesadmin.tools.qvm_template_postprocess.import_template_config(
  557. args, template_conf, vm)
  558. self.assertAllCalled()
  559. def test_052_template_config_virt_mode_pv(self):
  560. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  561. b'0\0test-vm class=TemplateVM state=Halted\n'
  562. vm = self.app.domains['test-vm']
  563. args = argparse.Namespace(
  564. allow_pv=False,
  565. )
  566. with self.subTest('not allowed'):
  567. template_config = "virt-mode=pv\n"
  568. template_conf = os.path.join(self.source_dir.name, 'template.conf')
  569. with open(template_conf, 'w') as f:
  570. f.write(template_config)
  571. qubesadmin.tools.qvm_template_postprocess.import_template_config(
  572. args, template_conf, vm)
  573. with self.subTest('allowed'):
  574. args = argparse.Namespace(
  575. allow_pv=True,
  576. )
  577. self.app.expected_calls[(
  578. 'test-vm', 'admin.vm.property.Set', 'virt_mode', b'pv')] = b'0\0'
  579. template_config = "virt-mode=pv\n"
  580. template_conf = os.path.join(self.source_dir.name, 'template.conf')
  581. with open(template_conf, 'w') as f:
  582. f.write(template_config)
  583. qubesadmin.tools.qvm_template_postprocess.import_template_config(
  584. args, template_conf, vm)
  585. self.assertAllCalled()