app.py 47 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104
  1. # -*- encoding: utf-8 -*-
  2. #
  3. # The Qubes OS Project, http://www.qubes-os.org
  4. #
  5. # Copyright (C) 2017 Marek Marczykowski-Górecki
  6. # <marmarek@invisiblethingslab.com>
  7. #
  8. # This program is free software; you can redistribute it and/or modify
  9. # it under the terms of the GNU Lesser General Public License as published by
  10. # the Free Software Foundation; either version 2 of the License, or
  11. # (at your option) any later version.
  12. #
  13. # This program is distributed in the hope that it will be useful,
  14. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  15. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  16. # GNU Lesser General Public License for more details.
  17. #
  18. # You should have received a copy of the GNU Lesser General Public License along
  19. # with this program; if not, see <http://www.gnu.org/licenses/>.
  20. import os
  21. import shutil
  22. import socket
  23. import subprocess
  24. import unittest
  25. import multiprocessing
  26. import unittest.mock as mock
  27. from unittest.mock import call
  28. import tempfile
  29. import qubesadmin.exc
  30. import qubesadmin.tests
  class TC_00_VMCollection(qubesadmin.tests.QubesTestCase):
      """Exercise the ``self.app.domains`` collection against canned replies.

      Tests put the replies they rely on into ``self.app.expected_calls``
      (keyed by a 4-tuple) and finish with ``assertAllCalled()``.
      """

      def test_000_list(self):
          # keys() must report exactly the VM named in the admin.vm.List reply
          list_call = ('dom0', 'admin.vm.List', None, None)
          self.app.expected_calls[list_call] = \
              b'0\x00test-vm class=AppVM state=Running\n'
          names = list(self.app.domains.keys())
          self.assertEqual(names, ['test-vm'])
          self.assertAllCalled()

      def test_001_getitem(self):
          # Indexing by a listed name yields a VM object; an unlisted name
          # raises KeyError.
          list_call = ('dom0', 'admin.vm.List', None, None)
          self.app.expected_calls[list_call] = \
              b'0\x00test-vm class=AppVM state=Running\n'
          try:
              found = self.app.domains['test-vm']
              self.assertEqual(found.name, 'test-vm')
          except KeyError:
              self.fail('VM not found in collection')
          self.assertAllCalled()

          with self.assertRaises(KeyError):
              self.app.domains['test-non-existent']
          self.assertAllCalled()

      def test_002_in(self):
          # Membership test: listed name is "in", unlisted name is not
          list_call = ('dom0', 'admin.vm.List', None, None)
          self.app.expected_calls[list_call] = \
              b'0\x00test-vm class=AppVM state=Running\n'
          self.assertIn('test-vm', self.app.domains)
          self.assertAllCalled()

          self.assertNotIn('test-non-existent', self.app.domains)
          self.assertAllCalled()

      def test_003_iter(self):
          # Iterating the collection yields VM objects with a .name
          list_call = ('dom0', 'admin.vm.List', None, None)
          self.app.expected_calls[list_call] = \
              b'0\x00test-vm class=AppVM state=Running\n'
          names = [vm.name for vm in self.app.domains]
          self.assertEqual(names, ['test-vm'])
          self.assertAllCalled()

      def test_004_delitem(self):
          # Deleting an item is expected to issue admin.vm.Remove to that VM
          remove_call = ('test-vm', 'admin.vm.Remove', None, None)
          self.app.expected_calls[remove_call] = b'0\x00'
          del self.app.domains['test-vm']
          self.assertAllCalled()

      def test_005_keys(self):
          # keys() with two VMs listed; order is not checked
          list_call = ('dom0', 'admin.vm.List', None, None)
          self.app.expected_calls[list_call] = (
              b'0\x00test-vm class=AppVM state=Running\n'
              b'test-vm2 class=AppVM state=Running\n'
          )
          self.assertEqual(
              set(self.app.domains.keys()),
              {'test-vm', 'test-vm2'})
          self.assertAllCalled()

      def test_006_values(self):
          # values() must contain QubesVM instances, one per listed VM
          list_call = ('dom0', 'admin.vm.List', None, None)
          self.app.expected_calls[list_call] = (
              b'0\x00test-vm class=AppVM state=Running\n'
              b'test-vm2 class=AppVM state=Running\n'
          )
          values = self.app.domains.values()
          for item in values:
              self.assertIsInstance(item, qubesadmin.vm.QubesVM)
          # order is not checked, only the set of names
          self.assertEqual(
              {vm.name for vm in values},
              {'test-vm', 'test-vm2'})
          self.assertAllCalled()

      def test_007_getitem_blind_mode(self):
          # With blind_mode set and no replies registered at all, indexing
          # must not raise KeyError - not even for a name that was never
          # listed.
          self.app.blind_mode = True
          try:
              found = self.app.domains['test-vm']
              self.assertEqual(found.name, 'test-vm')
          except KeyError:
              self.fail('VM not found in collection')
          self.assertAllCalled()

          with self.assertNotRaises(KeyError):
              self.app.domains['test-non-existent']
          self.assertAllCalled()

      def test_008_in_blind_mode(self):
          # Membership tests in blind mode, with an admin.vm.List reply
          # registered: an unlisted name is still reported as absent.
          self.app.blind_mode = True
          list_call = ('dom0', 'admin.vm.List', None, None)
          self.app.expected_calls[list_call] = \
              b'0\x00test-vm class=AppVM state=Running\n'
          self.assertIn('test-vm', self.app.domains)
          self.assertAllCalled()

          self.assertNotIn('test-non-existent', self.app.domains)
          self.assertAllCalled()

      def test_009_getitem_cache_class(self):
          # vm.klass must be readable with only the admin.vm.List reply
          # registered (the reply carries "class=AppVM").
          list_call = ('dom0', 'admin.vm.List', None, None)
          self.app.expected_calls[list_call] = \
              b'0\x00test-vm class=AppVM state=Running\n'
          try:
              found = self.app.domains['test-vm']
              self.assertEqual(found.name, 'test-vm')
              self.assertEqual(found.klass, 'AppVM')
          except KeyError:
              self.fail('VM not found in collection')
          self.assertAllCalled()

      def test_010_getitem_cache_power_state(self):
          # With cache_enabled set, get_power_state() must return the state
          # from the list reply; no admin.vm.CurrentState reply is registered.
          self.app.cache_enabled = True
          list_call = ('dom0', 'admin.vm.List', None, None)
          self.app.expected_calls[list_call] = \
              b'0\x00test-vm class=AppVM state=Running\n'
          try:
              found = self.app.domains['test-vm']
              self.assertEqual(found.name, 'test-vm')
              self.assertEqual(found.klass, 'AppVM')
              self.assertEqual(found.get_power_state(), 'Running')
          except KeyError:
              self.fail('VM not found in collection')
          self.assertAllCalled()

      def test_011_getitem_non_cache_power_state(self):
          # Without cache_enabled, an admin.vm.CurrentState reply is
          # registered for the power state query.
          list_call = ('dom0', 'admin.vm.List', None, None)
          state_call = ('test-vm', 'admin.vm.CurrentState', None, None)
          self.app.expected_calls[list_call] = \
              b'0\x00test-vm class=AppVM state=Running\n'
          self.app.expected_calls[state_call] = \
              b'0\x00power_state=Running mem=1024'
          try:
              found = self.app.domains['test-vm']
              self.assertEqual(found.name, 'test-vm')
              self.assertEqual(found.klass, 'AppVM')
              self.assertEqual(found.get_power_state(), 'Running')
          except KeyError:
              self.fail('VM not found in collection')
          self.assertAllCalled()

      def test_012_getitem_cached_object(self):
          # Repeated lookups must return the very same object, until the
          # class reported for the VM changes.
          list_call = ('dom0', 'admin.vm.List', None, None)
          self.app.expected_calls[list_call] = \
              b'0\x00test-vm class=AppVM state=Running\n'
          try:
              first = self.app.domains['test-vm']
              second = self.app.domains['test-vm']
              self.assertIs(first, second)
          except KeyError:
              self.fail('VM not found in collection')

          self.app.domains.clear_cache()
          # even after clearing the cache, the same instance should be
          # returned if the class hasn't changed
          third = self.app.domains['test-vm']
          self.assertIs(first, third)

          # change the class and expect a different object
          self.app.expected_calls[list_call] = \
              b'0\x00test-vm class=StandaloneVM state=Running\n'
          self.app.domains.clear_cache()
          fourth = self.app.domains['test-vm']
          self.assertIsNot(first, fourth)
          self.assertAllCalled()
  160. class TC_10_QubesBase(qubesadmin.tests.QubesTestCase):
  def setUp(self):
      # Patch subprocess.check_output and keep both the patcher (stopped in
      # tearDown) and the resulting mock (inspected by individual tests).
      super().setUp()
      patcher = mock.patch('subprocess.check_output')
      self.check_output_patch = patcher
      self.check_output_mock = patcher.start()
  def tearDown(self):
      # Undo the subprocess.check_output patch before the base-class teardown
      self.check_output_patch.stop()
      super().tearDown()
  def test_010_new_simple(self):
      # add_new_vm() with class name, VM name and label given as strings
      create_call = ('dom0', 'admin.vm.Create.AppVM', None,
                     b'name=new-vm label=red')
      list_call = ('dom0', 'admin.vm.List', None, None)
      self.app.expected_calls[create_call] = b'0\x00'
      self.app.expected_calls[list_call] = \
          b'0\x00new-vm class=AppVM state=Running\n'
      created = self.app.add_new_vm('AppVM', 'new-vm', 'red')
      self.assertEqual(created.name, 'new-vm')
      self.assertEqual(created.klass, 'AppVM')
      self.assertAllCalled()
  def test_011_new_template(self):
      # add_new_vm() for class TemplateVM uses admin.vm.Create.TemplateVM
      create_call = ('dom0', 'admin.vm.Create.TemplateVM', None,
                     b'name=new-template label=red')
      list_call = ('dom0', 'admin.vm.List', None, None)
      self.app.expected_calls[create_call] = b'0\x00'
      self.app.expected_calls[list_call] = \
          b'0\x00new-template class=TemplateVM state=Running\n'
      created = self.app.add_new_vm('TemplateVM', 'new-template', 'red')
      self.assertEqual(created.name, 'new-template')
      self.assertEqual(created.klass, 'TemplateVM')
      self.assertAllCalled()
  def test_012_new_template_based(self):
      # A template given by name ends up as the argument of the Create call
      create_call = ('dom0', 'admin.vm.Create.AppVM',
                     'some-template', b'name=new-vm label=red')
      list_call = ('dom0', 'admin.vm.List', None, None)
      self.app.expected_calls[create_call] = b'0\x00'
      self.app.expected_calls[list_call] = \
          b'0\x00new-vm class=AppVM state=Running\n'
      created = self.app.add_new_vm(
          'AppVM', 'new-vm', 'red', 'some-template')
      self.assertEqual(created.name, 'new-vm')
      self.assertEqual(created.klass, 'AppVM')
      self.assertAllCalled()
  def test_013_new_objects_params(self):
      # Same creation as with string arguments, but class, label and
      # template are passed as objects obtained from self.app.
      create_call = ('dom0', 'admin.vm.Create.AppVM',
                     'some-template', b'name=new-vm label=red')
      label_call = ('dom0', 'admin.label.List', None, None)
      list_call = ('dom0', 'admin.vm.List', None, None)
      self.app.expected_calls[create_call] = b'0\x00'
      self.app.expected_calls[label_call] = b'0\x00red\nblue\n'
      self.app.expected_calls[list_call] = (
          b'0\x00new-vm class=AppVM state=Running\n'
          b'some-template class=TemplateVM state=Running\n'
      )
      # looked up in the same order the original argument list evaluated them
      vm_class = self.app.get_vm_class('AppVM')
      label = self.app.get_label('red')
      template = self.app.domains['some-template']
      created = self.app.add_new_vm(vm_class, 'new-vm', label, template)
      self.assertEqual(created.name, 'new-vm')
      self.assertEqual(created.klass, 'AppVM')
      self.assertAllCalled()
  def test_014_new_pool(self):
      # pool= switches to admin.vm.CreateInPool with a "pool=" payload entry
      create_call = ('dom0', 'admin.vm.CreateInPool.AppVM', None,
                     b'name=new-vm label=red pool=some-pool')
      list_call = ('dom0', 'admin.vm.List', None, None)
      self.app.expected_calls[create_call] = b'0\x00'
      self.app.expected_calls[list_call] = \
          b'0\x00new-vm class=AppVM state=Running\n'
      created = self.app.add_new_vm(
          'AppVM', 'new-vm', 'red', pool='some-pool')
      self.assertEqual(created.name, 'new-vm')
      self.assertEqual(created.klass, 'AppVM')
      self.assertAllCalled()
  def test_015_new_pools(self):
      # pools= (per-volume mapping) produces "pool:<volume>=<pool>" entries
      create_call = ('dom0', 'admin.vm.CreateInPool.AppVM', None,
                     b'name=new-vm label=red pool:private=some-pool '
                     b'pool:volatile=other-pool')
      list_call = ('dom0', 'admin.vm.List', None, None)
      self.app.expected_calls[create_call] = b'0\x00'
      self.app.expected_calls[list_call] = \
          b'0\x00new-vm class=AppVM state=Running\n'
      volume_pools = {'private': 'some-pool', 'volatile': 'other-pool'}
      created = self.app.add_new_vm(
          'AppVM', 'new-vm', 'red', pools=volume_pools)
      self.assertEqual(created.name, 'new-vm')
      self.assertEqual(created.klass, 'AppVM')
      self.assertAllCalled()
  def test_016_new_template_based_default(self):
      # template=qubesadmin.DEFAULT must result in a Create call whose
      # argument is None (no explicit template name).
      create_call = ('dom0', 'admin.vm.Create.AppVM',
                     None, b'name=new-vm label=red')
      list_call = ('dom0', 'admin.vm.List', None, None)
      self.app.expected_calls[create_call] = b'0\x00'
      self.app.expected_calls[list_call] = \
          b'0\x00new-vm class=AppVM state=Running\n'
      created = self.app.add_new_vm(
          'AppVM', 'new-vm', 'red', template=qubesadmin.DEFAULT)
      self.assertEqual(created.name, 'new-vm')
      self.assertEqual(created.klass, 'AppVM')
      self.assertAllCalled()
  def test_020_get_label(self):
      # get_label() returns an object named after a label that is listed
      label_call = ('dom0', 'admin.label.List', None, None)
      self.app.expected_calls[label_call] = b'0\x00red\nblue\n'
      red = self.app.get_label('red')
      self.assertEqual(red.name, 'red')
      self.assertAllCalled()
  def test_021_get_nonexistant_label(self):
      # A label missing from the admin.label.List reply must raise
      # QubesLabelNotFoundError
      label_call = ('dom0', 'admin.label.List', None, None)
      self.app.expected_calls[label_call] = b'0\x00red\nblue\n'
      not_found = qubesadmin.exc.QubesLabelNotFoundError
      with self.assertRaises(not_found):
          self.app.get_label('green')
      self.assertAllCalled()
  def clone_setup_common_calls(self, src, dst):
      """Register the canned replies shared by the clone tests.

      Fills ``self.app.expected_calls`` with property, tag, feature,
      firewall and volume replies for reading from *src* and writing to
      *dst*. Individual tests add, override or delete entries afterwards.

      :param src: name of the VM being cloned (str)
      :param dst: name of the clone (str)
      """
      expected = self.app.expected_calls

      # have each property type with default=no, each special-cased,
      # and some with default=yes
      properties = {
          'label': 'default=False type=label red',
          'template': 'default=False type=vm test-template',
          'memory': 'default=False type=int 400',
          'kernel': 'default=False type=str 4.9.31',
          'netvm': 'default=False type=vm test-net',
          'virt_mode': 'default=False type=str hvm',
          'default_user': 'default=True type=str user',
      }
      listed_props = b'\n'.join(name.encode() for name in properties)
      expected[(src, 'admin.vm.property.List', None, None)] = \
          b'0\0qid\nname\n' + listed_props + b'\n'
      for name, reply in properties.items():
          expected[(src, 'admin.vm.property.Get', name, None)] = \
              b'0\0' + reply.encode()
          # No Set is registered for:
          #  - label/template: special cases handled by admin.vm.Create call
          #  - properties at their default: those should not be set
          handled_by_create = name in ('label', 'template')
          if not handled_by_create and 'default=True' not in reply:
              # the value is the last whitespace-separated token of the reply
              new_value = reply.split()[-1].encode()
              expected[(dst, 'admin.vm.property.Set', name, new_value)] = \
                  b'0\0'

      # tags
      expected[(src, 'admin.vm.tag.List', None, None)] = b'0\0tag1\ntag2\n'
      for tag in ('tag1', 'tag2'):
          expected[(dst, 'admin.vm.tag.Set', tag, None)] = b'0\0'

      # features
      expected[(src, 'admin.vm.feature.List', None, None)] = \
          b'0\0feat1\nfeat2\n'
      expected[(src, 'admin.vm.feature.Get', 'feat1', None)] = \
          b'0\0feat1-value with spaces'
      expected[(src, 'admin.vm.feature.Get', 'feat2', None)] = b'0\x001'
      expected[(dst, 'admin.vm.feature.Set', 'feat1',
                b'feat1-value with spaces')] = b'0\0'
      expected[(dst, 'admin.vm.feature.Set', 'feat2', b'1')] = b'0\0'

      # firewall: the rules read from src are expected verbatim in the
      # Set call on dst
      rules = (
          b'action=drop dst4=192.168.0.0/24\n'
          b'action=accept\n'
      )
      expected[(src, 'admin.vm.firewall.Get', None, None)] = \
          b'0\x00' + rules
      expected[(dst, 'admin.vm.firewall.Set', None, rules)] = b'0\x00'

      # storage: same four volumes on both VMs, differing only in the vid
      for vm in (src, dst):
          vm_bytes = vm.encode()
          expected[(vm, 'admin.vm.volume.Info', 'root', None)] = (
              b'0\x00pool=lvm\n'
              b'vid=vm-' + vm_bytes + b'/root\n'
              b'size=10737418240\n'
              b'usage=2147483648\n'
              b'rw=False\n'
              b'internal=True\n'
              b'source=vm-test-template/root\n'
              b'save_on_stop=False\n'
              b'snap_on_start=True\n'
          )
          expected[(vm, 'admin.vm.volume.Info', 'private', None)] = (
              b'0\x00pool=lvm\n'
              b'vid=vm-' + vm_bytes + b'/private\n'
              b'size=2147483648\n'
              b'usage=214748364\n'
              b'rw=True\n'
              b'internal=True\n'
              b'save_on_stop=True\n'
              b'snap_on_start=False\n'
          )
          expected[(vm, 'admin.vm.volume.Info', 'volatile', None)] = (
              b'0\x00pool=lvm\n'
              b'vid=vm-' + vm_bytes + b'/volatile\n'
              b'size=10737418240\n'
              b'usage=0\n'
              b'rw=True\n'
              b'internal=True\n'
              b'source=None\n'
              b'save_on_stop=False\n'
              b'snap_on_start=False\n'
          )
          expected[(vm, 'admin.vm.volume.Info', 'kernel', None)] = (
              b'0\x00pool=linux-kernel\n'
              b'vid=\n'
              b'size=0\n'
              b'usage=0\n'
              b'rw=False\n'
              b'internal=True\n'
              b'source=None\n'
              b'save_on_stop=False\n'
              b'snap_on_start=False\n'
          )
          expected[(vm, 'admin.vm.volume.List', None, None)] = \
              b'0\x00root\nprivate\nvolatile\nkernel\n'

      # only the private volume is cloned here: the token returned by
      # CloneFrom on src is the payload of CloneTo on dst
      expected[(src, 'admin.vm.volume.CloneFrom', 'private', None)] = \
          b'0\x00token-private'
      expected[(dst, 'admin.vm.volume.CloneTo', 'private',
                b'token-private')] = b'0\x00'
      expected[('dom0', 'admin.property.Get',
                'default_pool_private', None)] = \
          b'0\0default=True type=str lvm'
  def test_030_clone(self):
      # Clone with the source given by name. Besides the admin calls, the
      # mocked subprocess.check_output must have been called exactly once,
      # with the qvm-appmenus command line below.
      self.clone_setup_common_calls('test-vm', 'new-name')
      expected = self.app.expected_calls
      expected[('dom0', 'admin.vm.List', None, None)] = (
          b'0\x00new-name class=AppVM state=Halted\n'
          b'test-vm class=AppVM state=Halted\n'
          b'test-template class=TemplateVM state=Halted\n'
          b'test-net class=AppVM state=Halted\n'
      )
      expected[('dom0', 'admin.deviceclass.List', None, None)] = b'0\0'
      expected[('dom0', 'admin.vm.Create.AppVM', 'test-template',
                b'name=new-name label=red')] = b'0\x00'
      clone = self.app.clone_vm('test-vm', 'new-name')
      self.assertEqual(clone.name, 'new-name')
      appmenus_cmd = ['qvm-appmenus', '--init', '--update',
                      '--source', 'test-vm', 'new-name']
      self.check_output_mock.assert_called_once_with(
          appmenus_cmd, stderr=subprocess.STDOUT)
      self.assertAllCalled()
  def test_031_clone_object(self):
      # Same clone as by name, but the source is passed as a VM object
      self.clone_setup_common_calls('test-vm', 'new-name')
      expected = self.app.expected_calls
      expected[('dom0', 'admin.vm.List', None, None)] = (
          b'0\x00new-name class=AppVM state=Halted\n'
          b'test-vm class=AppVM state=Halted\n'
          b'test-template class=TemplateVM state=Halted\n'
          b'test-net class=AppVM state=Halted\n'
      )
      expected[('dom0', 'admin.vm.Create.AppVM', 'test-template',
                b'name=new-name label=red')] = b'0\x00'
      expected[('dom0', 'admin.deviceclass.List', None, None)] = b'0\0'
      source_vm = self.app.domains['test-vm']
      clone = self.app.clone_vm(source_vm, 'new-name')
      self.assertEqual(clone.name, 'new-name')
      self.assertAllCalled()
  def test_032_clone_pool(self):
      # Clone with an explicit pool=. The source volume Info replies and
      # the default_pool_private reply are removed from the expected set,
      # i.e. this path is expected not to request them.
      self.clone_setup_common_calls('test-vm', 'new-name')
      expected = self.app.expected_calls
      for volume in ('root', 'private', 'volatile', 'kernel'):
          del expected[('test-vm', 'admin.vm.volume.Info', volume, None)]
      del expected[
          ('dom0', 'admin.property.Get', 'default_pool_private', None)]
      expected[('dom0', 'admin.vm.CreateInPool.AppVM', 'test-template',
                b'name=new-name label=red pool=some-pool')] = b'0\x00'
      expected[('dom0', 'admin.vm.List', None, None)] = (
          b'0\x00new-name class=AppVM state=Halted\n'
          b'test-vm class=AppVM state=Halted\n'
          b'test-template class=TemplateVM state=Halted\n'
          b'test-net class=AppVM state=Halted\n'
      )
      expected[('dom0', 'admin.deviceclass.List', None, None)] = b'0\0'
      clone = self.app.clone_vm('test-vm', 'new-name', pool='some-pool')
      self.assertEqual(clone.name, 'new-name')
      self.assertAllCalled()
  def test_033_clone_pools(self):
      # Clone with a per-volume pools= mapping; as with pool=, the source
      # volume Info and default_pool_private replies are taken out of the
      # expected set.
      self.clone_setup_common_calls('test-vm', 'new-name')
      expected = self.app.expected_calls
      for volume in ('root', 'private', 'volatile', 'kernel'):
          del expected[('test-vm', 'admin.vm.volume.Info', volume, None)]
      del expected[
          ('dom0', 'admin.property.Get', 'default_pool_private', None)]
      expected[('dom0', 'admin.vm.CreateInPool.AppVM', 'test-template',
                b'name=new-name label=red pool:private=some-pool '
                b'pool:volatile=other-pool')] = b'0\x00'
      expected[('dom0', 'admin.vm.List', None, None)] = (
          b'0\x00new-name class=AppVM state=Halted\n'
          b'test-vm class=AppVM state=Halted\n'
          b'test-template class=TemplateVM state=Halted\n'
          b'test-net class=AppVM state=Halted\n'
      )
      expected[('dom0', 'admin.deviceclass.List', None, None)] = b'0\0'
      volume_pools = {'private': 'some-pool', 'volatile': 'other-pool'}
      clone = self.app.clone_vm('test-vm', 'new-name', pools=volume_pools)
      self.assertEqual(clone.name, 'new-name')
      self.assertAllCalled()
  def test_034_clone_class_change(self):
      # Clone an AppVM into a StandaloneVM (new_cls=). The clone's root
      # volume reply is overridden (rw=True, save_on_stop=True, no source)
      # and a CloneFrom/CloneTo pair for 'root' is registered in addition to
      # the 'private' one from the common setup.
      self.clone_setup_common_calls('test-vm', 'new-name')
      expected = self.app.expected_calls
      expected[('dom0', 'admin.vm.List', None, None)] = (
          b'0\x00new-name class=StandaloneVM state=Halted\n'
          b'test-vm class=AppVM state=Halted\n'
          b'test-template class=TemplateVM state=Halted\n'
          b'test-net class=AppVM state=Halted\n'
      )
      expected[('dom0', 'admin.vm.Create.StandaloneVM', 'test-template',
                b'name=new-name label=red')] = b'0\x00'
      expected[('new-name', 'admin.vm.volume.Info', 'root', None)] = (
          b'0\x00pool=lvm\n'
          b'vid=vm-new-name/root\n'
          b'size=10737418240\n'
          b'usage=2147483648\n'
          b'rw=True\n'
          b'internal=True\n'
          b'source=None\n'
          b'save_on_stop=True\n'
          b'snap_on_start=False\n'
      )
      expected[('test-vm', 'admin.vm.volume.CloneFrom', 'root', None)] = \
          b'0\x00token-root'
      expected[('new-name', 'admin.vm.volume.CloneTo', 'root',
                b'token-root')] = b'0\x00'
      expected[('dom0', 'admin.deviceclass.List', None, None)] = b'0\0'
      clone = self.app.clone_vm(
          'test-vm', 'new-name', new_cls='StandaloneVM')
      self.assertEqual(clone.name, 'new-name')
      self.assertEqual(clone.klass, 'StandaloneVM')
      self.assertAllCalled()
  def test_035_clone_fail(self):
      # Setting 'memory' on the clone is answered with a QubesException
      # reply. clone_vm() must raise, and an admin.vm.Remove call for the
      # new VM is part of the expected set.
      expected = self.app.expected_calls
      expected[('test-vm', 'admin.vm.property.List', None, None)] = \
          b'0\0qid\nname\ntemplate\nlabel\nmemory\n'
      # simplify it a little, shouldn't get this far anyway
      expected[('test-vm', 'admin.vm.volume.List', None, None)] = b'0\x00'
      expected[('test-vm', 'admin.vm.property.Get', 'label', None)] = \
          b'0\0default=False type=label red'
      expected[('test-vm', 'admin.vm.property.Get', 'template', None)] = \
          b'0\0default=False type=vm test-template'
      expected[('test-vm', 'admin.vm.property.Get', 'memory', None)] = \
          b'0\0default=False type=int 400'
      expected[('new-name', 'admin.vm.property.Set', 'memory', b'400')] = \
          b'2\0QubesException\0\0something happened\0'
      expected[('dom0', 'admin.vm.List', None, None)] = (
          b'0\x00new-name class=AppVM state=Halted\n'
          b'test-vm class=AppVM state=Halted\n'
          b'test-template class=TemplateVM state=Halted\n'
          b'test-net class=AppVM state=Halted\n'
      )
      expected[('dom0', 'admin.vm.Create.AppVM', 'test-template',
                b'name=new-name label=red')] = b'0\x00'
      expected[('new-name', 'admin.vm.Remove', None, None)] = b'0\x00'
      with self.assertRaises(qubesadmin.exc.QubesException):
          self.app.clone_vm('test-vm', 'new-name')
      self.assertAllCalled()
  def test_036_clone_ignore_errors_prop(self):
      # With ignore_errors=True a failing property Set (here 'memory') must
      # not abort the clone.
      self.clone_setup_common_calls('test-vm', 'new-name')
      expected = self.app.expected_calls
      expected[('dom0', 'admin.vm.List', None, None)] = (
          b'0\x00new-name class=AppVM state=Halted\n'
          b'test-vm class=AppVM state=Halted\n'
          b'test-template class=TemplateVM state=Halted\n'
          b'test-net class=AppVM state=Halted\n'
      )
      expected[('dom0', 'admin.vm.Create.AppVM', 'test-template',
                b'name=new-name label=red')] = b'0\x00'
      # overrides the success reply registered by the common setup
      expected[('new-name', 'admin.vm.property.Set', 'memory', b'400')] = \
          b'2\0QubesException\0\0something happened\0'
      expected[('dom0', 'admin.deviceclass.List', None, None)] = b'0\0'
      clone = self.app.clone_vm('test-vm', 'new-name', ignore_errors=True)
      self.assertEqual(clone.name, 'new-name')
      self.assertAllCalled()
  def test_037_clone_ignore_errors_feature(self):
      # With ignore_errors=True a failing feature Set (here 'feat2') must
      # not abort the clone.
      self.clone_setup_common_calls('test-vm', 'new-name')
      expected = self.app.expected_calls
      expected[('dom0', 'admin.vm.List', None, None)] = (
          b'0\x00new-name class=AppVM state=Halted\n'
          b'test-vm class=AppVM state=Halted\n'
          b'test-template class=TemplateVM state=Halted\n'
          b'test-net class=AppVM state=Halted\n'
      )
      expected[('dom0', 'admin.vm.Create.AppVM', 'test-template',
                b'name=new-name label=red')] = b'0\x00'
      # overrides the success reply registered by the common setup
      expected[('new-name', 'admin.vm.feature.Set', 'feat2', b'1')] = \
          b'2\0QubesException\0\0something happened\0'
      expected[('dom0', 'admin.deviceclass.List', None, None)] = b'0\0'
      clone = self.app.clone_vm('test-vm', 'new-name', ignore_errors=True)
      self.assertEqual(clone.name, 'new-name')
      self.assertAllCalled()
  def test_038_clone_ignore_errors_tag(self):
      # With ignore_errors=True a failing tag Set (here 'tag1') must not
      # abort the clone.
      self.clone_setup_common_calls('test-vm', 'new-name')
      expected = self.app.expected_calls
      expected[('dom0', 'admin.vm.List', None, None)] = (
          b'0\x00new-name class=AppVM state=Halted\n'
          b'test-vm class=AppVM state=Halted\n'
          b'test-template class=TemplateVM state=Halted\n'
          b'test-net class=AppVM state=Halted\n'
      )
      expected[('dom0', 'admin.vm.Create.AppVM', 'test-template',
                b'name=new-name label=red')] = b'0\x00'
      # overrides the success reply registered by the common setup
      expected[('new-name', 'admin.vm.tag.Set', 'tag1', None)] = \
          b'2\0QubesException\0\0something happened\0'
      expected[('dom0', 'admin.deviceclass.List', None, None)] = b'0\0'
      clone = self.app.clone_vm('test-vm', 'new-name', ignore_errors=True)
      self.assertEqual(clone.name, 'new-name')
      self.assertAllCalled()
  def test_039_clone_ignore_errors_firewall(self):
      # With ignore_errors=True a failing firewall Set must not abort the
      # clone.
      self.clone_setup_common_calls('test-vm', 'new-name')
      expected = self.app.expected_calls
      expected[('dom0', 'admin.vm.List', None, None)] = (
          b'0\x00new-name class=AppVM state=Halted\n'
          b'test-vm class=AppVM state=Halted\n'
          b'test-template class=TemplateVM state=Halted\n'
          b'test-net class=AppVM state=Halted\n'
      )
      expected[('dom0', 'admin.vm.Create.AppVM', 'test-template',
                b'name=new-name label=red')] = b'0\x00'
      # same key as registered by the common setup (identical rules
      # payload), so this replaces the success reply with an error
      expected[('new-name', 'admin.vm.firewall.Set', None,
                b'action=drop dst4=192.168.0.0/24\naction=accept\n')] = \
          b'2\0QubesException\0\0something happened\0'
      expected[('dom0', 'admin.deviceclass.List', None, None)] = b'0\0'
      clone = self.app.clone_vm('test-vm', 'new-name', ignore_errors=True)
      self.assertEqual(clone.name, 'new-name')
      self.assertAllCalled()
  def test_040_clone_ignore_errors_storage(self):
      # A failing volume CloneTo must make clone_vm() raise even with
      # ignore_errors=True; an admin.vm.Remove call for the new VM is part
      # of the expected set.
      self.clone_setup_common_calls('test-vm', 'new-name')
      expected = self.app.expected_calls
      expected[('dom0', 'admin.vm.List', None, None)] = (
          b'0\x00new-name class=AppVM state=Halted\n'
          b'test-vm class=AppVM state=Halted\n'
          b'test-template class=TemplateVM state=Halted\n'
          b'test-net class=AppVM state=Halted\n'
      )
      expected[('dom0', 'admin.vm.Create.AppVM', 'test-template',
                b'name=new-name label=red')] = b'0\x00'
      # overrides the success reply registered by the common setup
      expected[('new-name', 'admin.vm.volume.CloneTo', 'private',
                b'token-private')] = \
          b'2\0QubesException\0\0something happened\0'
      expected[('new-name', 'admin.vm.Remove', None, None)] = b'0\x00'
      # these two replies from the common setup are dropped from the
      # expected set for this scenario
      for volume in ('root', 'volatile'):
          del expected[('new-name', 'admin.vm.volume.Info', volume, None)]
      with self.assertRaises(qubesadmin.exc.QubesException):
          self.app.clone_vm('test-vm', 'new-name', ignore_errors=True)
      self.assertAllCalled()
  def test_041_clone_fail_storage(self):
      # A failing volume CloneTo must make clone_vm() raise (default
      # ignore_errors); an admin.vm.Remove call for the new VM is part of
      # the expected set.
      self.clone_setup_common_calls('test-vm', 'new-name')
      expected = self.app.expected_calls
      expected[('dom0', 'admin.vm.List', None, None)] = (
          b'0\x00new-name class=AppVM state=Halted\n'
          b'test-vm class=AppVM state=Halted\n'
          b'test-template class=TemplateVM state=Halted\n'
          b'test-net class=AppVM state=Halted\n'
      )
      expected[('dom0', 'admin.vm.Create.AppVM', 'test-template',
                b'name=new-name label=red')] = b'0\x00'
      # overrides the success reply registered by the common setup
      expected[('new-name', 'admin.vm.volume.CloneTo', 'private',
                b'token-private')] = \
          b'2\0QubesException\0\0something happened\0'
      expected[('new-name', 'admin.vm.Remove', None, None)] = b'0\x00'
      # these two replies from the common setup are dropped from the
      # expected set for this scenario
      for volume in ('root', 'volatile'):
          del expected[('new-name', 'admin.vm.volume.Info', volume, None)]
      with self.assertRaises(qubesadmin.exc.QubesException):
          self.app.clone_vm('test-vm', 'new-name')
      self.assertAllCalled()
  625. def test_042_clone_nondefault_pool(self):
  626. self.clone_setup_common_calls('test-vm', 'new-name')
  627. self.app.expected_calls[
  628. ('test-vm', 'admin.vm.volume.Info', 'private', None)] = \
  629. b'0\x00pool=another\n' \
  630. b'vid=vm-test-vm/private\n' \
  631. b'size=2147483648\n' \
  632. b'usage=214748364\n' \
  633. b'rw=True\n' \
  634. b'internal=True\n' \
  635. b'save_on_stop=True\n' \
  636. b'snap_on_start=False\n'
  637. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  638. b'0\x00new-name class=AppVM state=Halted\n' \
  639. b'test-vm class=AppVM state=Halted\n' \
  640. b'test-template class=TemplateVM state=Halted\n' \
  641. b'test-net class=AppVM state=Halted\n'
  642. self.app.expected_calls[('dom0', 'admin.vm.CreateInPool.AppVM',
  643. 'test-template', b'name=new-name label=red pool:private=another')]\
  644. = b'0\x00'
  645. self.app.expected_calls[
  646. ('dom0', 'admin.deviceclass.List', None, None)] = b'0\0'
  647. new_vm = self.app.clone_vm('test-vm', 'new-name')
  648. self.assertEqual(new_vm.name, 'new-name')
  649. self.check_output_mock.assert_called_once_with(
  650. ['qvm-appmenus', '--init', '--update',
  651. '--source', 'test-vm', 'new-name'],
  652. stderr=subprocess.STDOUT
  653. )
  654. self.assertAllCalled()
  655. def test_043_clone_devices(self):
  656. self.clone_setup_common_calls('test-vm', 'new-name')
  657. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  658. b'0\x00new-name class=AppVM state=Halted\n' \
  659. b'test-vm class=AppVM state=Halted\n' \
  660. b'test-vm2 class=AppVM state=Halted\n' \
  661. b'test-vm3 class=AppVM state=Halted\n' \
  662. b'test-template class=TemplateVM state=Halted\n' \
  663. b'test-net class=AppVM state=Halted\n'
  664. self.app.expected_calls[('dom0', 'admin.vm.Create.AppVM',
  665. 'test-template', b'name=new-name label=red')] = b'0\x00'
  666. self.app.expected_calls[
  667. ('dom0', 'admin.deviceclass.List', None, None)] = \
  668. b'0\0pci\n'
  669. self.app.expected_calls[
  670. ('test-vm', 'admin.vm.device.pci.List', None, None)] = \
  671. b'0\0test-vm2+dev1 ro=True\n' \
  672. b'test-vm3+dev2 persistent=True\n'
  673. self.app.expected_calls[
  674. ('new-name', 'admin.vm.device.pci.Attach', 'test-vm3+dev2',
  675. b'persistent=True')] = b'0\0'
  676. new_vm = self.app.clone_vm('test-vm', 'new-name')
  677. self.assertEqual(new_vm.name, 'new-name')
  678. self.check_output_mock.assert_called_once_with(
  679. ['qvm-appmenus', '--init', '--update',
  680. '--source', 'test-vm', 'new-name'],
  681. stderr=subprocess.STDOUT
  682. )
  683. self.assertAllCalled()
  684. def test_044_clone_devices_fail(self):
  685. self.clone_setup_common_calls('test-vm', 'new-name')
  686. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  687. b'0\x00new-name class=AppVM state=Halted\n' \
  688. b'test-vm class=AppVM state=Halted\n' \
  689. b'test-vm2 class=AppVM state=Halted\n' \
  690. b'test-vm3 class=AppVM state=Halted\n' \
  691. b'test-template class=TemplateVM state=Halted\n' \
  692. b'test-net class=AppVM state=Halted\n'
  693. self.app.expected_calls[('dom0', 'admin.vm.Create.AppVM',
  694. 'test-template', b'name=new-name label=red')] = b'0\x00'
  695. self.app.expected_calls[
  696. ('dom0', 'admin.deviceclass.List', None, None)] = \
  697. b'0\0pci\n'
  698. self.app.expected_calls[
  699. ('test-vm', 'admin.vm.device.pci.List', None, None)] = \
  700. b'0\0test-vm2+dev1 ro=True\n' \
  701. b'test-vm3+dev2 persistent=True\n'
  702. self.app.expected_calls[
  703. ('new-name', 'admin.vm.device.pci.Attach', 'test-vm3+dev2',
  704. b'persistent=True')] = \
  705. b'2\0QubesException\0\0something happened\0'
  706. self.app.expected_calls[
  707. ('new-name', 'admin.vm.Remove', None, None)] = b'0\0'
  708. with self.assertRaises(qubesadmin.exc.QubesException):
  709. new_vm = self.app.clone_vm('test-vm', 'new-name')
  710. self.assertAllCalled()
  711. class TC_20_QubesLocal(unittest.TestCase):
  712. def setUp(self):
  713. super(TC_20_QubesLocal, self).setUp()
  714. self.socket_dir = tempfile.mkdtemp()
  715. self.orig_sock = qubesadmin.config.QUBESD_SOCKET
  716. qubesadmin.config.QUBESD_SOCKET = os.path.join(self.socket_dir, 'sock')
  717. self.proc = None
  718. self.app = qubesadmin.app.QubesLocal()
  719. self.tmpdir = tempfile.mkdtemp()
  720. self.addCleanup(shutil.rmtree, self.tmpdir)
  721. def listen_and_send(self, send_data):
  722. '''Listen on socket and send data in response.
  723. :param bytes send_data: data to send
  724. '''
  725. self.socket_pipe, child_pipe = multiprocessing.Pipe()
  726. self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
  727. self.socket.bind(os.path.join(self.socket_dir, 'sock'))
  728. self.socket.listen(1)
  729. def worker(sock, pipe, send_data_):
  730. conn, addr = sock.accept()
  731. pipe.send(conn.makefile('rb').read())
  732. conn.sendall(send_data_)
  733. conn.close()
  734. self.proc = multiprocessing.Process(target=worker,
  735. args=(self.socket, child_pipe, send_data))
  736. self.proc.start()
  737. self.socket.close()
  738. def get_request(self):
  739. '''Get request sent to "qubesd" mock'''
  740. return self.socket_pipe.recv()
  741. def tearDown(self):
  742. qubesadmin.config.QUBESD_SOCKET = self.orig_sock
  743. if self.proc is not None:
  744. try:
  745. self.proc.terminate()
  746. except OSError:
  747. pass
  748. shutil.rmtree(self.socket_dir)
  749. super(TC_20_QubesLocal, self).tearDown()
  750. def test_000_qubesd_call(self):
  751. self.listen_and_send(b'0\0')
  752. self.app.qubesd_call('test-vm', 'some.method', 'arg1', b'payload')
  753. self.assertEqual(self.get_request(),
  754. b'some.method+arg1 dom0 name test-vm\0payload')
  755. def test_001_qubesd_call_none_arg(self):
  756. self.listen_and_send(b'0\0')
  757. self.app.qubesd_call('test-vm', 'some.method', None, b'payload')
  758. self.assertEqual(self.get_request(),
  759. b'some.method+ dom0 name test-vm\0payload')
  760. def test_002_qubesd_call_none_payload(self):
  761. self.listen_and_send(b'0\0')
  762. self.app.qubesd_call('test-vm', 'some.method', None, None)
  763. self.assertEqual(self.get_request(),
  764. b'some.method+ dom0 name test-vm\0')
  765. def test_003_qubesd_call_payload_stream(self):
  766. payload_input = os.path.join(self.tmpdir, 'payload-input')
  767. with open(payload_input, 'w+b') as payload_file:
  768. payload_file.write(b'some payload\n')
  769. payload_file.seek(0)
  770. self._call_test_service_with_payload_stream(
  771. payload_file, expected=b'some payload\n')
  772. def test_004_qubesd_call_payload_stream_proc(self):
  773. service_path = os.path.join(self.tmpdir, 'test.service')
  774. echo = subprocess.Popen(['echo', 'some payload'],
  775. stdout=subprocess.PIPE)
  776. self._call_test_service_with_payload_stream(
  777. echo.stdout, expected=b'some payload\n')
  778. def test_005_qubesd_call_payload_stream_with_prefix(self):
  779. payload_input = os.path.join(self.tmpdir, 'payload-input')
  780. with open(payload_input, 'w+b') as payload_file:
  781. payload_file.write(b'some payload\n')
  782. payload_file.seek(0)
  783. self._call_test_service_with_payload_stream(
  784. payload_file, payload=b'first line\n',
  785. expected=b'first line\nsome payload\n')
  786. def _call_test_service_with_payload_stream(
  787. self, payload_stream, payload=None, expected=b''):
  788. service_path = os.path.join(self.tmpdir, 'test.service')
  789. with open(service_path, 'w') as f:
  790. f.write('#!/bin/bash\n'
  791. 'env > {dir}/env\n'
  792. 'echo "$@" > {dir}/args\n'
  793. 'cat > {dir}/payload\n'
  794. 'echo -en \'0\\0return-value\'\n'.format(dir=self.tmpdir))
  795. os.chmod(service_path, 0o755)
  796. with mock.patch('qubesadmin.config.QREXEC_SERVICES_DIR',
  797. self.tmpdir):
  798. value = self.app.qubesd_call(
  799. 'test-vm', 'test.service',
  800. 'some-arg', payload=payload, payload_stream=payload_stream)
  801. self.assertEqual(value, b'return-value')
  802. self.assertTrue(os.path.exists(self.tmpdir + '/env'))
  803. with open(self.tmpdir + '/env') as env:
  804. self.assertIn('QREXEC_REMOTE_DOMAIN=dom0\n', env)
  805. self.assertIn('QREXEC_REQUESTED_TARGET=test-vm\n', env)
  806. self.assertTrue(os.path.exists(self.tmpdir + '/args'))
  807. with open(self.tmpdir + '/args') as args:
  808. self.assertEqual(args.read(), 'some-arg\n')
  809. self.assertTrue(os.path.exists(self.tmpdir + '/payload'))
  810. with open(self.tmpdir + '/payload', 'rb') as payload_f:
  811. self.assertEqual(payload_f.read(), expected)
  812. @mock.patch('os.isatty', lambda fd: fd == 2)
  813. def test_010_run_service(self):
  814. self.listen_and_send(b'0\0')
  815. with mock.patch('subprocess.Popen') as mock_proc:
  816. p = self.app.run_service('some-vm', 'service.name')
  817. mock_proc.assert_called_once_with([
  818. qubesadmin.config.QREXEC_CLIENT,
  819. '-d', 'some-vm', '-T', 'DEFAULT:QUBESRPC service.name dom0'],
  820. stdin=subprocess.PIPE, stdout=subprocess.PIPE,
  821. stderr=subprocess.PIPE)
  822. self.assertEqual(self.get_request(),
  823. b'admin.vm.Start+ dom0 name some-vm\0')
  824. def test_011_run_service_filter_esc(self):
  825. self.listen_and_send(b'0\0')
  826. with mock.patch('subprocess.Popen') as mock_proc:
  827. p = self.app.run_service('some-vm', 'service.name', filter_esc=True)
  828. mock_proc.assert_called_once_with([
  829. qubesadmin.config.QREXEC_CLIENT,
  830. '-d', 'some-vm', '-t', '-T',
  831. 'DEFAULT:QUBESRPC service.name dom0'],
  832. stdin=subprocess.PIPE, stdout=subprocess.PIPE,
  833. stderr=subprocess.PIPE)
  834. self.assertEqual(self.get_request(),
  835. b'admin.vm.Start+ dom0 name some-vm\0')
  836. @mock.patch('os.isatty', lambda fd: fd == 2)
  837. def test_012_run_service_user(self):
  838. self.listen_and_send(b'0\0')
  839. with mock.patch('subprocess.Popen') as mock_proc:
  840. p = self.app.run_service('some-vm', 'service.name', user='user')
  841. mock_proc.assert_called_once_with([
  842. qubesadmin.config.QREXEC_CLIENT,
  843. '-d', 'some-vm', '-T',
  844. 'user:QUBESRPC service.name dom0'],
  845. stdin=subprocess.PIPE, stdout=subprocess.PIPE,
  846. stderr=subprocess.PIPE)
  847. self.assertEqual(self.get_request(),
  848. b'admin.vm.Start+ dom0 name some-vm\0')
  849. def test_013_run_service_default_target(self):
  850. with self.assertRaises(ValueError):
  851. self.app.run_service('', 'service.name')
  852. class TC_30_QubesRemote(unittest.TestCase):
  853. def setUp(self):
  854. super(TC_30_QubesRemote, self).setUp()
  855. self.proc_mock = mock.Mock()
  856. self.proc_mock.configure_mock(**{
  857. 'return_value.returncode': 0
  858. })
  859. self.proc_patch = mock.patch('subprocess.Popen', self.proc_mock)
  860. self.proc_patch.start()
  861. self.app = qubesadmin.app.QubesRemote()
  862. def set_proc_stdout(self, send_data):
  863. self.proc_mock.configure_mock(**{
  864. 'return_value.communicate.return_value': (send_data, None)
  865. })
    def tearDown(self):
        '''Stop the ``subprocess.Popen`` patch kept in ``self.proc_patch``.'''
        self.proc_patch.stop()
        super(TC_30_QubesRemote, self).tearDown()
  869. def test_000_qubesd_call(self):
  870. self.set_proc_stdout(b'0\0')
  871. self.app.qubesd_call('test-vm', 'some.method', 'arg1', b'payload')
  872. self.assertEqual(self.proc_mock.mock_calls, [
  873. mock.call([qubesadmin.config.QREXEC_CLIENT_VM, 'test-vm',
  874. 'some.method+arg1'],
  875. stdin=subprocess.PIPE, stdout=subprocess.PIPE,
  876. stderr=subprocess.PIPE),
  877. mock.call().communicate(b'payload')
  878. ])
  879. def test_001_qubesd_call_none_arg(self):
  880. self.set_proc_stdout(b'0\0')
  881. self.app.qubesd_call('test-vm', 'some.method', None, b'payload')
  882. self.assertEqual(self.proc_mock.mock_calls, [
  883. mock.call([qubesadmin.config.QREXEC_CLIENT_VM, 'test-vm',
  884. 'some.method'],
  885. stdin=subprocess.PIPE, stdout=subprocess.PIPE,
  886. stderr=subprocess.PIPE),
  887. mock.call().communicate(b'payload')
  888. ])
  889. def test_002_qubesd_call_none_payload(self):
  890. self.set_proc_stdout(b'0\0')
  891. self.app.qubesd_call('test-vm', 'some.method', None, None)
  892. self.assertEqual(self.proc_mock.mock_calls, [
  893. mock.call([qubesadmin.config.QREXEC_CLIENT_VM, 'test-vm',
  894. 'some.method'],
  895. stdin=subprocess.PIPE, stdout=subprocess.PIPE,
  896. stderr=subprocess.PIPE),
  897. mock.call().communicate(None)
  898. ])
  899. def test_003_qubesd_call_payload_stream(self):
  900. self.set_proc_stdout(b'0\0return-value')
  901. tmpdir = tempfile.mkdtemp()
  902. self.addCleanup(shutil.rmtree, tmpdir)
  903. payload_input = os.path.join(tmpdir, 'payload-input')
  904. with open(payload_input, 'w+b') as payload_file:
  905. payload_file.write(b'some payload\n')
  906. payload_file.seek(0)
  907. value = self.app.qubesd_call('test-vm', 'some.method',
  908. 'some-arg', payload_stream=payload_file)
  909. self.assertListEqual(self.proc_mock.mock_calls, [
  910. mock.call(
  911. [qubesadmin.config.QREXEC_CLIENT_VM, 'test-vm',
  912. 'some.method+some-arg'],
  913. stdin=payload_file,
  914. stdout=subprocess.PIPE,
  915. stderr=subprocess.PIPE),
  916. mock.call().communicate()
  917. ])
  918. self.assertEqual(value, b'return-value')
  919. def test_004_qubesd_call_payload_stream_with_prefix(self):
  920. self.set_proc_stdout(b'0\0return-value')
  921. tmpdir = tempfile.mkdtemp()
  922. self.addCleanup(shutil.rmtree, tmpdir)
  923. payload_input = os.path.join(tmpdir, 'payload-input')
  924. with open(payload_input, 'w+b') as payload_file:
  925. payload_file.write(b'some payload\n')
  926. payload_file.seek(0)
  927. value = self.app.qubesd_call('test-vm', 'some.method',
  928. 'some-arg', payload=b'first line\n', payload_stream=payload_file)
  929. self.assertListEqual(self.proc_mock.mock_calls, [
  930. mock.call([qubesadmin.config.QREXEC_CLIENT_VM, 'test-vm',
  931. 'some.method+some-arg'],
  932. stdin=subprocess.PIPE, stdout=subprocess.PIPE,
  933. stderr=subprocess.PIPE),
  934. mock.call().stdin.write(b'first line\n'),
  935. mock.call().stdin.write(b'some payload\n'),
  936. mock.call().communicate()
  937. ])
  938. self.assertEqual(value, b'return-value')
  939. @mock.patch('os.isatty', lambda fd: fd == 2)
  940. def test_010_run_service(self):
  941. self.app.run_service('some-vm', 'service.name')
  942. self.proc_mock.assert_called_once_with([
  943. qubesadmin.config.QREXEC_CLIENT_VM,
  944. '-T', 'some-vm', 'service.name'],
  945. stdin=subprocess.PIPE, stdout=subprocess.PIPE,
  946. stderr=subprocess.PIPE)
  947. @mock.patch('os.isatty', lambda fd: fd == 2)
  948. def test_011_run_service_filter_esc(self):
  949. self.app.run_service('some-vm', 'service.name', filter_esc=True)
  950. self.proc_mock.assert_called_once_with([
  951. qubesadmin.config.QREXEC_CLIENT_VM,
  952. '-t', '-T', 'some-vm', 'service.name'],
  953. stdin=subprocess.PIPE, stdout=subprocess.PIPE,
  954. stderr=subprocess.PIPE)
  955. @mock.patch('os.isatty', lambda fd: fd == 2)
  956. def test_012_run_service_user(self):
  957. with self.assertRaises(ValueError):
  958. p = self.app.run_service('some-vm', 'service.name', user='user')
  959. @mock.patch('os.isatty', lambda fd: fd == 2)
  960. def test_013_run_service_default_target(self):
  961. self.app.run_service('', 'service.name')
  962. self.proc_mock.assert_called_once_with([
  963. qubesadmin.config.QREXEC_CLIENT_VM,
  964. '-T', '', 'service.name'],
  965. stdin=subprocess.PIPE, stdout=subprocess.PIPE,
  966. stderr=subprocess.PIPE)
  967. @mock.patch('os.isatty', lambda fd: fd == 2)
  968. def test_014_run_service_no_autostart1(self):
  969. self.set_proc_stdout( b'0\x00power_state=Running')
  970. self.app.run_service('some-vm', 'service.name', autostart=False)
  971. self.proc_mock.assert_has_calls([
  972. call([qubesadmin.config.QREXEC_CLIENT_VM,
  973. 'some-vm', 'admin.vm.CurrentState'],
  974. stdin=subprocess.PIPE, stdout=subprocess.PIPE,
  975. stderr=subprocess.PIPE),
  976. call().communicate(None),
  977. call([qubesadmin.config.QREXEC_CLIENT_VM,
  978. '-T', 'some-vm', 'service.name'],
  979. stdin=subprocess.PIPE, stdout=subprocess.PIPE,
  980. stderr=subprocess.PIPE),
  981. ])
  982. @mock.patch('os.isatty', lambda fd: fd == 2)
  983. def test_015_run_service_no_autostart2(self):
  984. self.set_proc_stdout( b'0\x00power_state=Halted')
  985. with self.assertRaises(qubesadmin.exc.QubesVMNotRunningError):
  986. self.app.run_service('some-vm', 'service.name', autostart=False)
  987. self.proc_mock.assert_called_once_with([
  988. qubesadmin.config.QREXEC_CLIENT_VM,
  989. 'some-vm', 'admin.vm.CurrentState'],
  990. stdin=subprocess.PIPE, stdout=subprocess.PIPE,
  991. stderr=subprocess.PIPE)