app.py 41 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942
  1. # -*- encoding: utf8 -*-
  2. #
  3. # The Qubes OS Project, http://www.qubes-os.org
  4. #
  5. # Copyright (C) 2017 Marek Marczykowski-Górecki
  6. # <marmarek@invisiblethingslab.com>
  7. #
  8. # This program is free software; you can redistribute it and/or modify
  9. # it under the terms of the GNU Lesser General Public License as published by
  10. # the Free Software Foundation; either version 2 of the License, or
  11. # (at your option) any later version.
  12. #
  13. # This program is distributed in the hope that it will be useful,
  14. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  15. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  16. # GNU Lesser General Public License for more details.
  17. #
  18. # You should have received a copy of the GNU Lesser General Public License along
  19. # with this program; if not, see <http://www.gnu.org/licenses/>.
  20. import os
  21. import shutil
  22. import socket
  23. import subprocess
  24. import unittest
  25. import multiprocessing
  26. try:
  27. import unittest.mock as mock
  28. from unittest.mock import call
  29. except ImportError:
  30. import mock
  31. from mock import call
  32. import tempfile
  33. import qubesadmin.exc
  34. import qubesadmin.tests
  35. class TC_00_VMCollection(qubesadmin.tests.QubesTestCase):
  36. def test_000_list(self):
  37. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  38. b'0\x00test-vm class=AppVM state=Running\n'
  39. self.assertEqual(
  40. list(self.app.domains.keys()),
  41. ['test-vm'])
  42. self.assertAllCalled()
  43. def test_001_getitem(self):
  44. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  45. b'0\x00test-vm class=AppVM state=Running\n'
  46. try:
  47. vm = self.app.domains['test-vm']
  48. self.assertEqual(vm.name, 'test-vm')
  49. except KeyError:
  50. self.fail('VM not found in collection')
  51. self.assertAllCalled()
  52. with self.assertRaises(KeyError):
  53. vm = self.app.domains['test-non-existent']
  54. self.assertAllCalled()
  55. def test_002_in(self):
  56. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  57. b'0\x00test-vm class=AppVM state=Running\n'
  58. self.assertIn('test-vm', self.app.domains)
  59. self.assertAllCalled()
  60. self.assertNotIn('test-non-existent', self.app.domains)
  61. self.assertAllCalled()
  62. def test_003_iter(self):
  63. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  64. b'0\x00test-vm class=AppVM state=Running\n'
  65. self.assertEqual([vm.name for vm in self.app.domains], ['test-vm'])
  66. self.assertAllCalled()
  67. def test_004_delitem(self):
  68. self.app.expected_calls[('test-vm', 'admin.vm.Remove', None, None)] = \
  69. b'0\x00'
  70. del self.app.domains['test-vm']
  71. self.assertAllCalled()
  72. def test_005_keys(self):
  73. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  74. b'0\x00test-vm class=AppVM state=Running\n' \
  75. b'test-vm2 class=AppVM state=Running\n'
  76. self.assertEqual(set(self.app.domains.keys()),
  77. set(['test-vm', 'test-vm2']))
  78. self.assertAllCalled()
  79. def test_006_values(self):
  80. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  81. b'0\x00test-vm class=AppVM state=Running\n' \
  82. b'test-vm2 class=AppVM state=Running\n'
  83. values = self.app.domains.values()
  84. for obj in values:
  85. self.assertIsInstance(obj, qubesadmin.vm.QubesVM)
  86. self.assertEqual(set([vm.name for vm in values]),
  87. set(['test-vm', 'test-vm2']))
  88. self.assertAllCalled()
  89. def test_007_getitem_blind_mode(self):
  90. self.app.blind_mode = True
  91. try:
  92. vm = self.app.domains['test-vm']
  93. self.assertEqual(vm.name, 'test-vm')
  94. except KeyError:
  95. self.fail('VM not found in collection')
  96. self.assertAllCalled()
  97. with self.assertNotRaises(KeyError):
  98. vm = self.app.domains['test-non-existent']
  99. self.assertAllCalled()
  100. def test_008_in_blind_mode(self):
  101. self.app.blind_mode = True
  102. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  103. b'0\x00test-vm class=AppVM state=Running\n'
  104. self.assertIn('test-vm', self.app.domains)
  105. self.assertAllCalled()
  106. self.assertNotIn('test-non-existent', self.app.domains)
  107. self.assertAllCalled()
  108. def test_009_getitem_cache_class(self):
  109. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  110. b'0\x00test-vm class=AppVM state=Running\n'
  111. try:
  112. vm = self.app.domains['test-vm']
  113. self.assertEqual(vm.name, 'test-vm')
  114. self.assertEqual(vm.klass, 'AppVM')
  115. except KeyError:
  116. self.fail('VM not found in collection')
  117. self.assertAllCalled()
  118. class TC_10_QubesBase(qubesadmin.tests.QubesTestCase):
  119. def setUp(self):
  120. super(TC_10_QubesBase, self).setUp()
  121. self.check_output_patch = mock.patch(
  122. 'subprocess.check_output')
  123. self.check_output_mock = self.check_output_patch.start()
  124. def tearDown(self):
  125. self.check_output_patch.stop()
  126. super(TC_10_QubesBase, self).tearDown()
  127. def test_010_new_simple(self):
  128. self.app.expected_calls[('dom0', 'admin.vm.Create.AppVM', None,
  129. b'name=new-vm label=red')] = b'0\x00'
  130. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  131. b'0\x00new-vm class=AppVM state=Running\n'
  132. vm = self.app.add_new_vm('AppVM', 'new-vm', 'red')
  133. self.assertEqual(vm.name, 'new-vm')
  134. self.assertEqual(vm.klass, 'AppVM')
  135. self.assertAllCalled()
  136. def test_011_new_template(self):
  137. self.app.expected_calls[('dom0', 'admin.vm.Create.TemplateVM', None,
  138. b'name=new-template label=red')] = b'0\x00'
  139. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  140. b'0\x00new-template class=TemplateVM state=Running\n'
  141. vm = self.app.add_new_vm('TemplateVM', 'new-template', 'red')
  142. self.assertEqual(vm.name, 'new-template')
  143. self.assertEqual(vm.klass, 'TemplateVM')
  144. self.assertAllCalled()
  145. def test_012_new_template_based(self):
  146. self.app.expected_calls[('dom0', 'admin.vm.Create.AppVM',
  147. 'some-template', b'name=new-vm label=red')] = b'0\x00'
  148. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  149. b'0\x00new-vm class=AppVM state=Running\n'
  150. vm = self.app.add_new_vm('AppVM', 'new-vm', 'red', 'some-template')
  151. self.assertEqual(vm.name, 'new-vm')
  152. self.assertEqual(vm.klass, 'AppVM')
  153. self.assertAllCalled()
  154. def test_013_new_objects_params(self):
  155. self.app.expected_calls[('dom0', 'admin.vm.Create.AppVM',
  156. 'some-template', b'name=new-vm label=red')] = b'0\x00'
  157. self.app.expected_calls[('dom0', 'admin.label.List', None, None)] = \
  158. b'0\x00red\nblue\n'
  159. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  160. b'0\x00new-vm class=AppVM state=Running\n' \
  161. b'some-template class=TemplateVM state=Running\n'
  162. vm = self.app.add_new_vm(self.app.get_vm_class('AppVM'), 'new-vm',
  163. self.app.get_label('red'), self.app.domains['some-template'])
  164. self.assertEqual(vm.name, 'new-vm')
  165. self.assertEqual(vm.klass, 'AppVM')
  166. self.assertAllCalled()
  167. def test_014_new_pool(self):
  168. self.app.expected_calls[('dom0', 'admin.vm.CreateInPool.AppVM', None,
  169. b'name=new-vm label=red pool=some-pool')] = b'0\x00'
  170. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  171. b'0\x00new-vm class=AppVM state=Running\n'
  172. vm = self.app.add_new_vm('AppVM', 'new-vm', 'red', pool='some-pool')
  173. self.assertEqual(vm.name, 'new-vm')
  174. self.assertEqual(vm.klass, 'AppVM')
  175. self.assertAllCalled()
  176. def test_015_new_pools(self):
  177. self.app.expected_calls[('dom0', 'admin.vm.CreateInPool.AppVM', None,
  178. b'name=new-vm label=red pool:private=some-pool '
  179. b'pool:volatile=other-pool')] = b'0\x00'
  180. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  181. b'0\x00new-vm class=AppVM state=Running\n'
  182. vm = self.app.add_new_vm('AppVM', 'new-vm', 'red',
  183. pools={'private': 'some-pool', 'volatile': 'other-pool'})
  184. self.assertEqual(vm.name, 'new-vm')
  185. self.assertEqual(vm.klass, 'AppVM')
  186. self.assertAllCalled()
  187. def test_016_new_template_based_default(self):
  188. self.app.expected_calls[('dom0', 'admin.vm.Create.AppVM',
  189. None, b'name=new-vm label=red')] = b'0\x00'
  190. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  191. b'0\x00new-vm class=AppVM state=Running\n'
  192. vm = self.app.add_new_vm('AppVM', 'new-vm', 'red',
  193. template=qubesadmin.DEFAULT)
  194. self.assertEqual(vm.name, 'new-vm')
  195. self.assertEqual(vm.klass, 'AppVM')
  196. self.assertAllCalled()
  197. def test_020_get_label(self):
  198. self.app.expected_calls[('dom0', 'admin.label.List', None, None)] = \
  199. b'0\x00red\nblue\n'
  200. label = self.app.get_label('red')
  201. self.assertEqual(label.name, 'red')
  202. self.assertAllCalled()
  203. def clone_setup_common_calls(self, src, dst):
  204. # have each property type with default=no, each special-cased,
  205. # and some with default=yes
  206. properties = {
  207. 'label': 'default=False type=label red',
  208. 'template': 'default=False type=vm test-template',
  209. 'memory': 'default=False type=int 400',
  210. 'kernel': 'default=False type=str 4.9.31',
  211. 'netvm': 'default=False type=vm test-net',
  212. 'virt_mode': 'default=False type=str hvm',
  213. 'default_user': 'default=True type=str user',
  214. }
  215. self.app.expected_calls[
  216. (src, 'admin.vm.property.List', None, None)] = \
  217. b'0\0qid\nname\n' + \
  218. b'\n'.join(prop.encode() for prop in properties.keys()) + \
  219. b'\n'
  220. for prop, value in properties.items():
  221. self.app.expected_calls[
  222. (src, 'admin.vm.property.Get', prop, None)] = \
  223. b'0\0' + value.encode()
  224. # special cases handled by admin.vm.Create call
  225. if prop in ('label', 'template'):
  226. continue
  227. # default properties should not be set
  228. if 'default=True' in value:
  229. continue
  230. self.app.expected_calls[
  231. (dst, 'admin.vm.property.Set', prop,
  232. value.split()[-1].encode())] = b'0\0'
  233. # tags
  234. self.app.expected_calls[
  235. (src, 'admin.vm.tag.List', None, None)] = \
  236. b'0\0tag1\ntag2\n'
  237. self.app.expected_calls[
  238. (dst, 'admin.vm.tag.Set', 'tag1', None)] = b'0\0'
  239. self.app.expected_calls[
  240. (dst, 'admin.vm.tag.Set', 'tag2', None)] = b'0\0'
  241. # features
  242. self.app.expected_calls[
  243. (src, 'admin.vm.feature.List', None, None)] = \
  244. b'0\0feat1\nfeat2\n'
  245. self.app.expected_calls[
  246. (src, 'admin.vm.feature.Get', 'feat1', None)] = \
  247. b'0\0feat1-value with spaces'
  248. self.app.expected_calls[
  249. (src, 'admin.vm.feature.Get', 'feat2', None)] = \
  250. b'0\x001'
  251. self.app.expected_calls[
  252. (dst, 'admin.vm.feature.Set', 'feat1',
  253. b'feat1-value with spaces')] = b'0\0'
  254. self.app.expected_calls[
  255. (dst, 'admin.vm.feature.Set', 'feat2', b'1')] = b'0\0'
  256. # firewall
  257. rules = (
  258. b'action=drop dst4=192.168.0.0/24\n'
  259. b'action=accept\n'
  260. )
  261. self.app.expected_calls[
  262. (src, 'admin.vm.firewall.Get', None, None)] = \
  263. b'0\x00' + rules
  264. self.app.expected_calls[
  265. (dst, 'admin.vm.firewall.Set', None, rules)] = \
  266. b'0\x00'
  267. # storage
  268. for vm in (src, dst):
  269. self.app.expected_calls[
  270. (vm, 'admin.vm.volume.Info', 'root', None)] = \
  271. b'0\x00pool=lvm\n' \
  272. b'vid=vm-' + vm.encode() + b'/root\n' \
  273. b'size=10737418240\n' \
  274. b'usage=2147483648\n' \
  275. b'rw=False\n' \
  276. b'internal=True\n' \
  277. b'source=vm-test-template/root\n' \
  278. b'save_on_stop=False\n' \
  279. b'snap_on_start=True\n'
  280. self.app.expected_calls[
  281. (vm, 'admin.vm.volume.Info', 'private', None)] = \
  282. b'0\x00pool=lvm\n' \
  283. b'vid=vm-' + vm.encode() + b'/private\n' \
  284. b'size=2147483648\n' \
  285. b'usage=214748364\n' \
  286. b'rw=True\n' \
  287. b'internal=True\n' \
  288. b'save_on_stop=True\n' \
  289. b'snap_on_start=False\n'
  290. self.app.expected_calls[
  291. (vm, 'admin.vm.volume.Info', 'volatile', None)] = \
  292. b'0\x00pool=lvm\n' \
  293. b'vid=vm-' + vm.encode() + b'/volatile\n' \
  294. b'size=10737418240\n' \
  295. b'usage=0\n' \
  296. b'rw=True\n' \
  297. b'internal=True\n' \
  298. b'source=None\n' \
  299. b'save_on_stop=False\n' \
  300. b'snap_on_start=False\n'
  301. self.app.expected_calls[
  302. (vm, 'admin.vm.volume.Info', 'kernel', None)] = \
  303. b'0\x00pool=linux-kernel\n' \
  304. b'vid=\n' \
  305. b'size=0\n' \
  306. b'usage=0\n' \
  307. b'rw=False\n' \
  308. b'internal=True\n' \
  309. b'source=None\n' \
  310. b'save_on_stop=False\n' \
  311. b'snap_on_start=False\n'
  312. self.app.expected_calls[
  313. (vm, 'admin.vm.volume.List', None, None)] = \
  314. b'0\x00root\nprivate\nvolatile\nkernel\n'
  315. self.app.expected_calls[
  316. (src, 'admin.vm.volume.CloneFrom', 'private', None)] = \
  317. b'0\x00token-private'
  318. self.app.expected_calls[
  319. (dst, 'admin.vm.volume.CloneTo', 'private', b'token-private')] = \
  320. b'0\x00'
  321. self.app.expected_calls[
  322. ('dom0', 'admin.property.Get', 'default_pool_private', None)] = \
  323. b'0\0default=True type=str lvm'
  324. def test_030_clone(self):
  325. self.clone_setup_common_calls('test-vm', 'new-name')
  326. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  327. b'0\x00new-name class=AppVM state=Halted\n' \
  328. b'test-vm class=AppVM state=Halted\n' \
  329. b'test-template class=TemplateVM state=Halted\n' \
  330. b'test-net class=AppVM state=Halted\n'
  331. self.app.expected_calls[('dom0', 'admin.vm.Create.AppVM',
  332. 'test-template', b'name=new-name label=red')] = b'0\x00'
  333. new_vm = self.app.clone_vm('test-vm', 'new-name')
  334. self.assertEqual(new_vm.name, 'new-name')
  335. self.check_output_mock.assert_called_once_with(
  336. ['qvm-appmenus', '--init', '--update',
  337. '--source', 'test-vm', 'new-name'],
  338. stderr=subprocess.STDOUT
  339. )
  340. self.assertAllCalled()
  341. def test_031_clone_object(self):
  342. self.clone_setup_common_calls('test-vm', 'new-name')
  343. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  344. b'0\x00new-name class=AppVM state=Halted\n' \
  345. b'test-vm class=AppVM state=Halted\n' \
  346. b'test-template class=TemplateVM state=Halted\n' \
  347. b'test-net class=AppVM state=Halted\n'
  348. self.app.expected_calls[('dom0', 'admin.vm.Create.AppVM',
  349. 'test-template', b'name=new-name label=red')] = b'0\x00'
  350. new_vm = self.app.clone_vm(self.app.domains['test-vm'], 'new-name')
  351. self.assertEqual(new_vm.name, 'new-name')
  352. self.assertAllCalled()
  353. def test_032_clone_pool(self):
  354. self.clone_setup_common_calls('test-vm', 'new-name')
  355. for volume in ('root', 'private', 'volatile', 'kernel'):
  356. del self.app.expected_calls[
  357. 'test-vm', 'admin.vm.volume.Info', volume, None]
  358. del self.app.expected_calls[
  359. 'dom0', 'admin.property.Get', 'default_pool_private', None]
  360. self.app.expected_calls[('dom0', 'admin.vm.CreateInPool.AppVM',
  361. 'test-template',
  362. b'name=new-name label=red pool=some-pool')] = b'0\x00'
  363. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  364. b'0\x00new-name class=AppVM state=Halted\n' \
  365. b'test-vm class=AppVM state=Halted\n' \
  366. b'test-template class=TemplateVM state=Halted\n' \
  367. b'test-net class=AppVM state=Halted\n'
  368. new_vm = self.app.clone_vm('test-vm', 'new-name', pool='some-pool')
  369. self.assertEqual(new_vm.name, 'new-name')
  370. self.assertAllCalled()
  371. def test_033_clone_pools(self):
  372. self.clone_setup_common_calls('test-vm', 'new-name')
  373. for volume in ('root', 'private', 'volatile', 'kernel'):
  374. del self.app.expected_calls[
  375. 'test-vm', 'admin.vm.volume.Info', volume, None]
  376. del self.app.expected_calls[
  377. 'dom0', 'admin.property.Get', 'default_pool_private', None]
  378. self.app.expected_calls[('dom0', 'admin.vm.CreateInPool.AppVM',
  379. 'test-template',
  380. b'name=new-name label=red pool:private=some-pool '
  381. b'pool:volatile=other-pool')] = b'0\x00'
  382. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  383. b'0\x00new-name class=AppVM state=Halted\n' \
  384. b'test-vm class=AppVM state=Halted\n' \
  385. b'test-template class=TemplateVM state=Halted\n' \
  386. b'test-net class=AppVM state=Halted\n'
  387. new_vm = self.app.clone_vm('test-vm', 'new-name',
  388. pools={'private': 'some-pool', 'volatile': 'other-pool'})
  389. self.assertEqual(new_vm.name, 'new-name')
  390. self.assertAllCalled()
  391. def test_034_clone_class_change(self):
  392. self.clone_setup_common_calls('test-vm', 'new-name')
  393. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  394. b'0\x00new-name class=StandaloneVM state=Halted\n' \
  395. b'test-vm class=AppVM state=Halted\n' \
  396. b'test-template class=TemplateVM state=Halted\n' \
  397. b'test-net class=AppVM state=Halted\n'
  398. self.app.expected_calls[('dom0', 'admin.vm.Create.StandaloneVM',
  399. 'test-template', b'name=new-name label=red')] = b'0\x00'
  400. self.app.expected_calls[
  401. ('new-name', 'admin.vm.volume.Info', 'root', None)] = \
  402. b'0\x00pool=lvm\n' \
  403. b'vid=vm-new-name/root\n' \
  404. b'size=10737418240\n' \
  405. b'usage=2147483648\n' \
  406. b'rw=True\n' \
  407. b'internal=True\n' \
  408. b'source=None\n' \
  409. b'save_on_stop=True\n' \
  410. b'snap_on_start=False\n'
  411. self.app.expected_calls[
  412. ('test-vm', 'admin.vm.volume.CloneFrom', 'root', None)] = \
  413. b'0\x00token-root'
  414. self.app.expected_calls[
  415. ('new-name', 'admin.vm.volume.CloneTo', 'root', b'token-root')] = \
  416. b'0\x00'
  417. new_vm = self.app.clone_vm('test-vm', 'new-name',
  418. new_cls='StandaloneVM')
  419. self.assertEqual(new_vm.name, 'new-name')
  420. self.assertEqual(new_vm.klass, 'StandaloneVM')
  421. self.assertAllCalled()
  422. def test_035_clone_fail(self):
  423. self.app.expected_calls[
  424. ('test-vm', 'admin.vm.property.List', None, None)] = \
  425. b'0\0qid\nname\ntemplate\nlabel\nmemory\n'
  426. # simplify it a little, shouldn't get this far anyway
  427. self.app.expected_calls[
  428. ('test-vm', 'admin.vm.volume.List', None, None)] = \
  429. b'0\x00'
  430. self.app.expected_calls[
  431. ('test-vm', 'admin.vm.property.Get', 'label', None)] = \
  432. b'0\0default=False type=label red'
  433. self.app.expected_calls[
  434. ('test-vm', 'admin.vm.property.Get', 'template', None)] = \
  435. b'0\0default=False type=vm test-template'
  436. self.app.expected_calls[
  437. ('test-vm', 'admin.vm.property.Get', 'memory', None)] = \
  438. b'0\0default=False type=int 400'
  439. self.app.expected_calls[
  440. ('new-name', 'admin.vm.property.Set', 'memory', b'400')] = \
  441. b'2\0QubesException\0\0something happened\0'
  442. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  443. b'0\x00new-name class=AppVM state=Halted\n' \
  444. b'test-vm class=AppVM state=Halted\n' \
  445. b'test-template class=TemplateVM state=Halted\n' \
  446. b'test-net class=AppVM state=Halted\n'
  447. self.app.expected_calls[('dom0', 'admin.vm.Create.AppVM',
  448. 'test-template', b'name=new-name label=red')] = b'0\x00'
  449. self.app.expected_calls[('new-name', 'admin.vm.Remove', None, None)] = \
  450. b'0\x00'
  451. with self.assertRaises(qubesadmin.exc.QubesException):
  452. self.app.clone_vm('test-vm', 'new-name')
  453. self.assertAllCalled()
  454. def test_036_clone_ignore_errors_prop(self):
  455. self.clone_setup_common_calls('test-vm', 'new-name')
  456. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  457. b'0\x00new-name class=AppVM state=Halted\n' \
  458. b'test-vm class=AppVM state=Halted\n' \
  459. b'test-template class=TemplateVM state=Halted\n' \
  460. b'test-net class=AppVM state=Halted\n'
  461. self.app.expected_calls[('dom0', 'admin.vm.Create.AppVM',
  462. 'test-template', b'name=new-name label=red')] = b'0\x00'
  463. self.app.expected_calls[
  464. ('new-name', 'admin.vm.property.Set', 'memory', b'400')] = \
  465. b'2\0QubesException\0\0something happened\0'
  466. new_vm = self.app.clone_vm('test-vm', 'new-name', ignore_errors=True)
  467. self.assertEqual(new_vm.name, 'new-name')
  468. self.assertAllCalled()
  469. def test_037_clone_ignore_errors_feature(self):
  470. self.clone_setup_common_calls('test-vm', 'new-name')
  471. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  472. b'0\x00new-name class=AppVM state=Halted\n' \
  473. b'test-vm class=AppVM state=Halted\n' \
  474. b'test-template class=TemplateVM state=Halted\n' \
  475. b'test-net class=AppVM state=Halted\n'
  476. self.app.expected_calls[('dom0', 'admin.vm.Create.AppVM',
  477. 'test-template', b'name=new-name label=red')] = b'0\x00'
  478. self.app.expected_calls[
  479. ('new-name', 'admin.vm.feature.Set', 'feat2', b'1')] = \
  480. b'2\0QubesException\0\0something happened\0'
  481. new_vm = self.app.clone_vm('test-vm', 'new-name', ignore_errors=True)
  482. self.assertEqual(new_vm.name, 'new-name')
  483. self.assertAllCalled()
  484. def test_038_clone_ignore_errors_tag(self):
  485. self.clone_setup_common_calls('test-vm', 'new-name')
  486. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  487. b'0\x00new-name class=AppVM state=Halted\n' \
  488. b'test-vm class=AppVM state=Halted\n' \
  489. b'test-template class=TemplateVM state=Halted\n' \
  490. b'test-net class=AppVM state=Halted\n'
  491. self.app.expected_calls[('dom0', 'admin.vm.Create.AppVM',
  492. 'test-template', b'name=new-name label=red')] = b'0\x00'
  493. self.app.expected_calls[
  494. ('new-name', 'admin.vm.tag.Set', 'tag1', None)] = \
  495. b'2\0QubesException\0\0something happened\0'
  496. new_vm = self.app.clone_vm('test-vm', 'new-name', ignore_errors=True)
  497. self.assertEqual(new_vm.name, 'new-name')
  498. self.assertAllCalled()
  499. def test_039_clone_ignore_errors_firewall(self):
  500. self.clone_setup_common_calls('test-vm', 'new-name')
  501. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  502. b'0\x00new-name class=AppVM state=Halted\n' \
  503. b'test-vm class=AppVM state=Halted\n' \
  504. b'test-template class=TemplateVM state=Halted\n' \
  505. b'test-net class=AppVM state=Halted\n'
  506. self.app.expected_calls[('dom0', 'admin.vm.Create.AppVM',
  507. 'test-template', b'name=new-name label=red')] = b'0\x00'
  508. self.app.expected_calls[
  509. ('new-name', 'admin.vm.firewall.Set', None,
  510. b'action=drop dst4=192.168.0.0/24\naction=accept\n')] = \
  511. b'2\0QubesException\0\0something happened\0'
  512. new_vm = self.app.clone_vm('test-vm', 'new-name', ignore_errors=True)
  513. self.assertEqual(new_vm.name, 'new-name')
  514. self.assertAllCalled()
  515. def test_040_clone_ignore_errors_storage(self):
  516. self.clone_setup_common_calls('test-vm', 'new-name')
  517. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  518. b'0\x00new-name class=AppVM state=Halted\n' \
  519. b'test-vm class=AppVM state=Halted\n' \
  520. b'test-template class=TemplateVM state=Halted\n' \
  521. b'test-net class=AppVM state=Halted\n'
  522. self.app.expected_calls[('dom0', 'admin.vm.Create.AppVM',
  523. 'test-template', b'name=new-name label=red')] = b'0\x00'
  524. self.app.expected_calls[
  525. ('new-name', 'admin.vm.volume.CloneTo', 'private',
  526. b'token-private')] = \
  527. b'2\0QubesException\0\0something happened\0'
  528. self.app.expected_calls[('new-name', 'admin.vm.Remove', None, None)] = \
  529. b'0\x00'
  530. del self.app.expected_calls[
  531. ('new-name', 'admin.vm.volume.Info', 'root', None)]
  532. del self.app.expected_calls[
  533. ('new-name', 'admin.vm.volume.Info', 'volatile', None)]
  534. with self.assertRaises(qubesadmin.exc.QubesException):
  535. self.app.clone_vm('test-vm', 'new-name', ignore_errors=True)
  536. self.assertAllCalled()
  537. def test_041_clone_fail_storage(self):
  538. self.clone_setup_common_calls('test-vm', 'new-name')
  539. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  540. b'0\x00new-name class=AppVM state=Halted\n' \
  541. b'test-vm class=AppVM state=Halted\n' \
  542. b'test-template class=TemplateVM state=Halted\n' \
  543. b'test-net class=AppVM state=Halted\n'
  544. self.app.expected_calls[('dom0', 'admin.vm.Create.AppVM',
  545. 'test-template', b'name=new-name label=red')] = b'0\x00'
  546. self.app.expected_calls[
  547. ('new-name', 'admin.vm.volume.CloneTo', 'private',
  548. b'token-private')] = \
  549. b'2\0QubesException\0\0something happened\0'
  550. self.app.expected_calls[('new-name', 'admin.vm.Remove', None, None)] = \
  551. b'0\x00'
  552. del self.app.expected_calls[
  553. ('new-name', 'admin.vm.volume.Info', 'root', None)]
  554. del self.app.expected_calls[
  555. ('new-name', 'admin.vm.volume.Info', 'volatile', None)]
  556. with self.assertRaises(qubesadmin.exc.QubesException):
  557. self.app.clone_vm('test-vm', 'new-name')
  558. self.assertAllCalled()
  559. def test_042_clone_nondefault_pool(self):
  560. self.clone_setup_common_calls('test-vm', 'new-name')
  561. self.app.expected_calls[
  562. ('test-vm', 'admin.vm.volume.Info', 'private', None)] = \
  563. b'0\x00pool=another\n' \
  564. b'vid=vm-test-vm/private\n' \
  565. b'size=2147483648\n' \
  566. b'usage=214748364\n' \
  567. b'rw=True\n' \
  568. b'internal=True\n' \
  569. b'save_on_stop=True\n' \
  570. b'snap_on_start=False\n'
  571. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  572. b'0\x00new-name class=AppVM state=Halted\n' \
  573. b'test-vm class=AppVM state=Halted\n' \
  574. b'test-template class=TemplateVM state=Halted\n' \
  575. b'test-net class=AppVM state=Halted\n'
  576. self.app.expected_calls[('dom0', 'admin.vm.CreateInPool.AppVM',
  577. 'test-template', b'name=new-name label=red pool:private=another')]\
  578. = b'0\x00'
  579. new_vm = self.app.clone_vm('test-vm', 'new-name')
  580. self.assertEqual(new_vm.name, 'new-name')
  581. self.check_output_mock.assert_called_once_with(
  582. ['qvm-appmenus', '--init', '--update',
  583. '--source', 'test-vm', 'new-name'],
  584. stderr=subprocess.STDOUT
  585. )
  586. self.assertAllCalled()
  587. class TC_20_QubesLocal(unittest.TestCase):
  588. def setUp(self):
  589. super(TC_20_QubesLocal, self).setUp()
  590. self.socket_dir = tempfile.mkdtemp()
  591. self.orig_sock = qubesadmin.config.QUBESD_SOCKET
  592. qubesadmin.config.QUBESD_SOCKET = os.path.join(self.socket_dir, 'sock')
  593. self.proc = None
  594. self.app = qubesadmin.app.QubesLocal()
  595. def listen_and_send(self, send_data):
  596. '''Listen on socket and send data in response.
  597. :param bytes send_data: data to send
  598. '''
  599. self.socket_pipe, child_pipe = multiprocessing.Pipe()
  600. self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
  601. self.socket.bind(os.path.join(self.socket_dir, 'sock'))
  602. self.socket.listen(1)
  603. def worker(sock, pipe, send_data_):
  604. conn, addr = sock.accept()
  605. pipe.send(conn.makefile('rb').read())
  606. conn.sendall(send_data_)
  607. conn.close()
  608. self.proc = multiprocessing.Process(target=worker,
  609. args=(self.socket, child_pipe, send_data))
  610. self.proc.start()
  611. self.socket.close()
  612. def get_request(self):
  613. '''Get request sent to "qubesd" mock'''
  614. return self.socket_pipe.recv()
  615. def tearDown(self):
  616. qubesadmin.config.QUBESD_SOCKET = self.orig_sock
  617. if self.proc is not None:
  618. try:
  619. self.proc.terminate()
  620. except OSError:
  621. pass
  622. shutil.rmtree(self.socket_dir)
  623. super(TC_20_QubesLocal, self).tearDown()
  624. def test_000_qubesd_call(self):
  625. self.listen_and_send(b'0\0')
  626. self.app.qubesd_call('test-vm', 'some.method', 'arg1', b'payload')
  627. self.assertEqual(self.get_request(),
  628. b'dom0\0some.method\0test-vm\0arg1\0payload')
  629. def test_001_qubesd_call_none_arg(self):
  630. self.listen_and_send(b'0\0')
  631. self.app.qubesd_call('test-vm', 'some.method', None, b'payload')
  632. self.assertEqual(self.get_request(),
  633. b'dom0\0some.method\0test-vm\0\0payload')
  634. def test_002_qubesd_call_none_payload(self):
  635. self.listen_and_send(b'0\0')
  636. self.app.qubesd_call('test-vm', 'some.method', None, None)
  637. self.assertEqual(self.get_request(),
  638. b'dom0\0some.method\0test-vm\0\0')
  639. def test_003_qubesd_call_payload_stream(self):
  640. # this should really be in setUp()...
  641. tmpdir = tempfile.mkdtemp()
  642. self.addCleanup(shutil.rmtree, tmpdir)
  643. service_path = os.path.join(tmpdir, 'test.service')
  644. payload_input = os.path.join(tmpdir, 'payload-input')
  645. with open(service_path, 'w') as f:
  646. f.write('#!/bin/bash\n'
  647. 'env > {dir}/env\n'
  648. 'echo "$@" > {dir}/args\n'
  649. 'cat > {dir}/payload\n'
  650. 'echo -en \'0\\0return-value\'\n'.format(dir=tmpdir))
  651. os.chmod(service_path, 0o755)
  652. with open(payload_input, 'w+') as payload_file:
  653. payload_file.write('some payload\n')
  654. payload_file.seek(0)
  655. with mock.patch('qubesadmin.config.QREXEC_SERVICES_DIR',
  656. tmpdir):
  657. value = self.app.qubesd_call('test-vm', 'test.service',
  658. 'some-arg', payload_stream=payload_file)
  659. self.assertEqual(value, b'return-value')
  660. self.assertTrue(os.path.exists(tmpdir + '/env'))
  661. with open(tmpdir + '/env') as env:
  662. self.assertIn('QREXEC_REMOTE_DOMAIN=dom0\n', env)
  663. self.assertIn('QREXEC_REQUESTED_TARGET=test-vm\n', env)
  664. self.assertTrue(os.path.exists(tmpdir + '/args'))
  665. with open(tmpdir + '/args') as args:
  666. self.assertEqual(args.read(), 'some-arg\n')
  667. self.assertTrue(os.path.exists(tmpdir + '/payload'))
  668. with open(tmpdir + '/payload') as payload:
  669. self.assertEqual(payload.read(), 'some payload\n')
  670. def test_004_qubesd_call_payload_stream_proc(self):
  671. # this should really be in setUp()...
  672. tmpdir = tempfile.mkdtemp()
  673. self.addCleanup(shutil.rmtree, tmpdir)
  674. service_path = os.path.join(tmpdir, 'test.service')
  675. echo = subprocess.Popen(['echo', 'some payload'],
  676. stdout=subprocess.PIPE)
  677. with open(service_path, 'w') as f:
  678. f.write('#!/bin/bash\n'
  679. 'env > {dir}/env\n'
  680. 'echo "$@" > {dir}/args\n'
  681. 'cat > {dir}/payload\n'
  682. 'echo -en \'0\\0return-value\'\n'.format(dir=tmpdir))
  683. os.chmod(service_path, 0o755)
  684. with mock.patch('qubesadmin.config.QREXEC_SERVICES_DIR',
  685. tmpdir):
  686. value = self.app.qubesd_call('test-vm', 'test.service',
  687. 'some-arg', payload_stream=echo.stdout)
  688. echo.stdout.close()
  689. self.assertEqual(value, b'return-value')
  690. self.assertTrue(os.path.exists(tmpdir + '/env'))
  691. with open(tmpdir + '/env') as env:
  692. self.assertIn('QREXEC_REMOTE_DOMAIN=dom0\n', env)
  693. self.assertIn('QREXEC_REQUESTED_TARGET=test-vm\n', env)
  694. self.assertTrue(os.path.exists(tmpdir + '/args'))
  695. with open(tmpdir + '/args') as args:
  696. self.assertEqual(args.read(), 'some-arg\n')
  697. self.assertTrue(os.path.exists(tmpdir + '/payload'))
  698. with open(tmpdir + '/payload') as payload:
  699. self.assertEqual(payload.read(), 'some payload\n')
  700. @mock.patch('os.isatty', lambda fd: fd == 2)
  701. def test_010_run_service(self):
  702. self.listen_and_send(b'0\0')
  703. with mock.patch('subprocess.Popen') as mock_proc:
  704. p = self.app.run_service('some-vm', 'service.name')
  705. mock_proc.assert_called_once_with([
  706. qubesadmin.config.QREXEC_CLIENT,
  707. '-d', 'some-vm', '-T', 'DEFAULT:QUBESRPC service.name dom0'],
  708. stdin=subprocess.PIPE, stdout=subprocess.PIPE,
  709. stderr=subprocess.PIPE)
  710. self.assertEqual(self.get_request(),
  711. b'dom0\0admin.vm.Start\0some-vm\0\0')
  712. def test_011_run_service_filter_esc(self):
  713. self.listen_and_send(b'0\0')
  714. with mock.patch('subprocess.Popen') as mock_proc:
  715. p = self.app.run_service('some-vm', 'service.name', filter_esc=True)
  716. mock_proc.assert_called_once_with([
  717. qubesadmin.config.QREXEC_CLIENT,
  718. '-d', 'some-vm', '-t', '-T',
  719. 'DEFAULT:QUBESRPC service.name dom0'],
  720. stdin=subprocess.PIPE, stdout=subprocess.PIPE,
  721. stderr=subprocess.PIPE)
  722. self.assertEqual(self.get_request(),
  723. b'dom0\0admin.vm.Start\0some-vm\0\0')
  724. @mock.patch('os.isatty', lambda fd: fd == 2)
  725. def test_012_run_service_user(self):
  726. self.listen_and_send(b'0\0')
  727. with mock.patch('subprocess.Popen') as mock_proc:
  728. p = self.app.run_service('some-vm', 'service.name', user='user')
  729. mock_proc.assert_called_once_with([
  730. qubesadmin.config.QREXEC_CLIENT,
  731. '-d', 'some-vm', '-T',
  732. 'user:QUBESRPC service.name dom0'],
  733. stdin=subprocess.PIPE, stdout=subprocess.PIPE,
  734. stderr=subprocess.PIPE)
  735. self.assertEqual(self.get_request(),
  736. b'dom0\0admin.vm.Start\0some-vm\0\0')
  737. def test_013_run_service_default_target(self):
  738. with self.assertRaises(ValueError):
  739. self.app.run_service('', 'service.name')
  740. class TC_30_QubesRemote(unittest.TestCase):
  741. def setUp(self):
  742. super(TC_30_QubesRemote, self).setUp()
  743. self.proc_mock = mock.Mock()
  744. self.proc_mock.configure_mock(**{
  745. 'return_value.returncode': 0
  746. })
  747. self.proc_patch = mock.patch('subprocess.Popen', self.proc_mock)
  748. self.proc_patch.start()
  749. self.app = qubesadmin.app.QubesRemote()
  750. def set_proc_stdout(self, send_data):
  751. self.proc_mock.configure_mock(**{
  752. 'return_value.communicate.return_value': (send_data, None)
  753. })
  754. def tearDown(self):
  755. self.proc_patch.stop()
  756. super(TC_30_QubesRemote, self).tearDown()
  757. def test_000_qubesd_call(self):
  758. self.set_proc_stdout(b'0\0')
  759. self.app.qubesd_call('test-vm', 'some.method', 'arg1', b'payload')
  760. self.assertEqual(self.proc_mock.mock_calls, [
  761. mock.call([qubesadmin.config.QREXEC_CLIENT_VM, 'test-vm',
  762. 'some.method+arg1'],
  763. stdin=subprocess.PIPE, stdout=subprocess.PIPE,
  764. stderr=subprocess.PIPE),
  765. mock.call().communicate(b'payload')
  766. ])
  767. def test_001_qubesd_call_none_arg(self):
  768. self.set_proc_stdout(b'0\0')
  769. self.app.qubesd_call('test-vm', 'some.method', None, b'payload')
  770. self.assertEqual(self.proc_mock.mock_calls, [
  771. mock.call([qubesadmin.config.QREXEC_CLIENT_VM, 'test-vm',
  772. 'some.method'],
  773. stdin=subprocess.PIPE, stdout=subprocess.PIPE,
  774. stderr=subprocess.PIPE),
  775. mock.call().communicate(b'payload')
  776. ])
  777. def test_002_qubesd_call_none_payload(self):
  778. self.set_proc_stdout(b'0\0')
  779. self.app.qubesd_call('test-vm', 'some.method', None, None)
  780. self.assertEqual(self.proc_mock.mock_calls, [
  781. mock.call([qubesadmin.config.QREXEC_CLIENT_VM, 'test-vm',
  782. 'some.method'],
  783. stdin=subprocess.PIPE, stdout=subprocess.PIPE,
  784. stderr=subprocess.PIPE),
  785. mock.call().communicate(None)
  786. ])
  787. def test_003_qubesd_call_payload_stream(self):
  788. self.set_proc_stdout(b'0\0return-value')
  789. tmpdir = tempfile.mkdtemp()
  790. self.addCleanup(shutil.rmtree, tmpdir)
  791. payload_input = os.path.join(tmpdir, 'payload-input')
  792. with open(payload_input, 'w+') as payload_file:
  793. payload_file.write('some payload\n')
  794. payload_file.seek(0)
  795. value = self.app.qubesd_call('test-vm', 'some.method',
  796. 'some-arg', payload_stream=payload_file)
  797. self.assertEqual(self.proc_mock.mock_calls, [
  798. mock.call([qubesadmin.config.QREXEC_CLIENT_VM, 'test-vm',
  799. 'some.method+some-arg'],
  800. stdin=payload_file, stdout=subprocess.PIPE,
  801. stderr=subprocess.PIPE),
  802. mock.call().communicate(None)
  803. ])
  804. self.assertEqual(value, b'return-value')
  805. @mock.patch('os.isatty', lambda fd: fd == 2)
  806. def test_010_run_service(self):
  807. self.app.run_service('some-vm', 'service.name')
  808. self.proc_mock.assert_called_once_with([
  809. qubesadmin.config.QREXEC_CLIENT_VM,
  810. '-T', 'some-vm', 'service.name'],
  811. stdin=subprocess.PIPE, stdout=subprocess.PIPE,
  812. stderr=subprocess.PIPE)
  813. @mock.patch('os.isatty', lambda fd: fd == 2)
  814. def test_011_run_service_filter_esc(self):
  815. self.app.run_service('some-vm', 'service.name', filter_esc=True)
  816. self.proc_mock.assert_called_once_with([
  817. qubesadmin.config.QREXEC_CLIENT_VM,
  818. '-t', '-T', 'some-vm', 'service.name'],
  819. stdin=subprocess.PIPE, stdout=subprocess.PIPE,
  820. stderr=subprocess.PIPE)
  821. @mock.patch('os.isatty', lambda fd: fd == 2)
  822. def test_012_run_service_user(self):
  823. with self.assertRaises(ValueError):
  824. p = self.app.run_service('some-vm', 'service.name', user='user')
  825. @mock.patch('os.isatty', lambda fd: fd == 2)
  826. def test_013_run_service_default_target(self):
  827. self.app.run_service('', 'service.name')
  828. self.proc_mock.assert_called_once_with([
  829. qubesadmin.config.QREXEC_CLIENT_VM,
  830. '-T', '', 'service.name'],
  831. stdin=subprocess.PIPE, stdout=subprocess.PIPE,
  832. stderr=subprocess.PIPE)
  833. @mock.patch('os.isatty', lambda fd: fd == 2)
  834. def test_014_run_service_no_autostart1(self):
  835. self.set_proc_stdout( b'0\x00power_state=Running')
  836. self.app.run_service('some-vm', 'service.name', autostart=False)
  837. self.proc_mock.assert_has_calls([
  838. call([qubesadmin.config.QREXEC_CLIENT_VM,
  839. 'some-vm', 'admin.vm.CurrentState'],
  840. stdin=subprocess.PIPE, stdout=subprocess.PIPE,
  841. stderr=subprocess.PIPE),
  842. call().communicate(None),
  843. call([qubesadmin.config.QREXEC_CLIENT_VM,
  844. '-T', 'some-vm', 'service.name'],
  845. stdin=subprocess.PIPE, stdout=subprocess.PIPE,
  846. stderr=subprocess.PIPE),
  847. ])
  848. @mock.patch('os.isatty', lambda fd: fd == 2)
  849. def test_015_run_service_no_autostart2(self):
  850. self.set_proc_stdout( b'0\x00power_state=Halted')
  851. with self.assertRaises(qubesadmin.exc.QubesVMNotRunningError):
  852. self.app.run_service('some-vm', 'service.name', autostart=False)
  853. self.proc_mock.assert_called_once_with([
  854. qubesadmin.config.QREXEC_CLIENT_VM,
  855. 'some-vm', 'admin.vm.CurrentState'],
  856. stdin=subprocess.PIPE, stdout=subprocess.PIPE,
  857. stderr=subprocess.PIPE)