app.py 37 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862
  1. # -*- encoding: utf8 -*-
  2. #
  3. # The Qubes OS Project, http://www.qubes-os.org
  4. #
  5. # Copyright (C) 2017 Marek Marczykowski-Górecki
  6. # <marmarek@invisiblethingslab.com>
  7. #
  8. # This program is free software; you can redistribute it and/or modify
  9. # it under the terms of the GNU Lesser General Public License as published by
  10. # the Free Software Foundation; either version 2 of the License, or
  11. # (at your option) any later version.
  12. #
  13. # This program is distributed in the hope that it will be useful,
  14. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  15. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  16. # GNU Lesser General Public License for more details.
  17. #
  18. # You should have received a copy of the GNU Lesser General Public License along
  19. # with this program; if not, see <http://www.gnu.org/licenses/>.
  20. import os
  21. import shutil
  22. import socket
  23. import subprocess
  24. import unittest
  25. import multiprocessing
  26. try:
  27. import unittest.mock as mock
  28. except ImportError:
  29. import mock
  30. import tempfile
  31. import qubesadmin.exc
  32. import qubesadmin.tests
  33. class TC_00_VMCollection(qubesadmin.tests.QubesTestCase):
  34. def test_000_list(self):
  35. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  36. b'0\x00test-vm class=AppVM state=Running\n'
  37. self.assertEqual(
  38. list(self.app.domains.keys()),
  39. ['test-vm'])
  40. self.assertAllCalled()
  41. def test_001_getitem(self):
  42. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  43. b'0\x00test-vm class=AppVM state=Running\n'
  44. try:
  45. vm = self.app.domains['test-vm']
  46. self.assertEqual(vm.name, 'test-vm')
  47. except KeyError:
  48. self.fail('VM not found in collection')
  49. self.assertAllCalled()
  50. with self.assertRaises(KeyError):
  51. vm = self.app.domains['test-non-existent']
  52. self.assertAllCalled()
  53. def test_002_in(self):
  54. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  55. b'0\x00test-vm class=AppVM state=Running\n'
  56. self.assertIn('test-vm', self.app.domains)
  57. self.assertAllCalled()
  58. self.assertNotIn('test-non-existent', self.app.domains)
  59. self.assertAllCalled()
  60. def test_003_iter(self):
  61. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  62. b'0\x00test-vm class=AppVM state=Running\n'
  63. self.assertEqual([vm.name for vm in self.app.domains], ['test-vm'])
  64. self.assertAllCalled()
  65. def test_004_delitem(self):
  66. self.app.expected_calls[('test-vm', 'admin.vm.Remove', None, None)] = \
  67. b'0\x00'
  68. del self.app.domains['test-vm']
  69. self.assertAllCalled()
  70. def test_005_keys(self):
  71. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  72. b'0\x00test-vm class=AppVM state=Running\n' \
  73. b'test-vm2 class=AppVM state=Running\n'
  74. self.assertEqual(set(self.app.domains.keys()),
  75. set(['test-vm', 'test-vm2']))
  76. self.assertAllCalled()
  77. def test_006_values(self):
  78. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  79. b'0\x00test-vm class=AppVM state=Running\n' \
  80. b'test-vm2 class=AppVM state=Running\n'
  81. values = self.app.domains.values()
  82. for obj in values:
  83. self.assertIsInstance(obj, qubesadmin.vm.QubesVM)
  84. self.assertEqual(set([vm.name for vm in values]),
  85. set(['test-vm', 'test-vm2']))
  86. self.assertAllCalled()
  87. def test_007_getitem_blind_mode(self):
  88. self.app.blind_mode = True
  89. try:
  90. vm = self.app.domains['test-vm']
  91. self.assertEqual(vm.name, 'test-vm')
  92. except KeyError:
  93. self.fail('VM not found in collection')
  94. self.assertAllCalled()
  95. with self.assertNotRaises(KeyError):
  96. vm = self.app.domains['test-non-existent']
  97. self.assertAllCalled()
  98. def test_008_in_blind_mode(self):
  99. self.app.blind_mode = True
  100. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  101. b'0\x00test-vm class=AppVM state=Running\n'
  102. self.assertIn('test-vm', self.app.domains)
  103. self.assertAllCalled()
  104. self.assertNotIn('test-non-existent', self.app.domains)
  105. self.assertAllCalled()
  106. def test_009_getitem_cache_class(self):
  107. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  108. b'0\x00test-vm class=AppVM state=Running\n'
  109. try:
  110. vm = self.app.domains['test-vm']
  111. self.assertEqual(vm.name, 'test-vm')
  112. self.assertEqual(vm.klass, 'AppVM')
  113. except KeyError:
  114. self.fail('VM not found in collection')
  115. self.assertAllCalled()
  116. class TC_10_QubesBase(qubesadmin.tests.QubesTestCase):
  117. def setUp(self):
  118. super(TC_10_QubesBase, self).setUp()
  119. self.check_output_patch = mock.patch(
  120. 'subprocess.check_output')
  121. self.check_output_mock = self.check_output_patch.start()
  122. def tearDown(self):
  123. self.check_output_patch.stop()
  124. super(TC_10_QubesBase, self).tearDown()
  125. def test_010_new_simple(self):
  126. self.app.expected_calls[('dom0', 'admin.vm.Create.AppVM', None,
  127. b'name=new-vm label=red')] = b'0\x00'
  128. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  129. b'0\x00new-vm class=AppVM state=Running\n'
  130. vm = self.app.add_new_vm('AppVM', 'new-vm', 'red')
  131. self.assertEqual(vm.name, 'new-vm')
  132. self.assertEqual(vm.klass, 'AppVM')
  133. self.assertAllCalled()
  134. def test_011_new_template(self):
  135. self.app.expected_calls[('dom0', 'admin.vm.Create.TemplateVM', None,
  136. b'name=new-template label=red')] = b'0\x00'
  137. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  138. b'0\x00new-template class=TemplateVM state=Running\n'
  139. vm = self.app.add_new_vm('TemplateVM', 'new-template', 'red')
  140. self.assertEqual(vm.name, 'new-template')
  141. self.assertEqual(vm.klass, 'TemplateVM')
  142. self.assertAllCalled()
  143. def test_012_new_template_based(self):
  144. self.app.expected_calls[('dom0', 'admin.vm.Create.AppVM',
  145. 'some-template', b'name=new-vm label=red')] = b'0\x00'
  146. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  147. b'0\x00new-vm class=AppVM state=Running\n'
  148. vm = self.app.add_new_vm('AppVM', 'new-vm', 'red', 'some-template')
  149. self.assertEqual(vm.name, 'new-vm')
  150. self.assertEqual(vm.klass, 'AppVM')
  151. self.assertAllCalled()
  152. def test_013_new_objects_params(self):
  153. self.app.expected_calls[('dom0', 'admin.vm.Create.AppVM',
  154. 'some-template', b'name=new-vm label=red')] = b'0\x00'
  155. self.app.expected_calls[('dom0', 'admin.label.List', None, None)] = \
  156. b'0\x00red\nblue\n'
  157. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  158. b'0\x00new-vm class=AppVM state=Running\n' \
  159. b'some-template class=TemplateVM state=Running\n'
  160. vm = self.app.add_new_vm(self.app.get_vm_class('AppVM'), 'new-vm',
  161. self.app.get_label('red'), self.app.domains['some-template'])
  162. self.assertEqual(vm.name, 'new-vm')
  163. self.assertEqual(vm.klass, 'AppVM')
  164. self.assertAllCalled()
  165. def test_014_new_pool(self):
  166. self.app.expected_calls[('dom0', 'admin.vm.CreateInPool.AppVM', None,
  167. b'name=new-vm label=red pool=some-pool')] = b'0\x00'
  168. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  169. b'0\x00new-vm class=AppVM state=Running\n'
  170. vm = self.app.add_new_vm('AppVM', 'new-vm', 'red', pool='some-pool')
  171. self.assertEqual(vm.name, 'new-vm')
  172. self.assertEqual(vm.klass, 'AppVM')
  173. self.assertAllCalled()
  174. def test_015_new_pools(self):
  175. self.app.expected_calls[('dom0', 'admin.vm.CreateInPool.AppVM', None,
  176. b'name=new-vm label=red pool:private=some-pool '
  177. b'pool:volatile=other-pool')] = b'0\x00'
  178. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  179. b'0\x00new-vm class=AppVM state=Running\n'
  180. vm = self.app.add_new_vm('AppVM', 'new-vm', 'red',
  181. pools={'private': 'some-pool', 'volatile': 'other-pool'})
  182. self.assertEqual(vm.name, 'new-vm')
  183. self.assertEqual(vm.klass, 'AppVM')
  184. self.assertAllCalled()
  185. def test_016_new_template_based_default(self):
  186. self.app.expected_calls[('dom0', 'admin.vm.Create.AppVM',
  187. None, b'name=new-vm label=red')] = b'0\x00'
  188. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  189. b'0\x00new-vm class=AppVM state=Running\n'
  190. vm = self.app.add_new_vm('AppVM', 'new-vm', 'red',
  191. template=qubesadmin.DEFAULT)
  192. self.assertEqual(vm.name, 'new-vm')
  193. self.assertEqual(vm.klass, 'AppVM')
  194. self.assertAllCalled()
  195. def test_020_get_label(self):
  196. self.app.expected_calls[('dom0', 'admin.label.List', None, None)] = \
  197. b'0\x00red\nblue\n'
  198. label = self.app.get_label('red')
  199. self.assertEqual(label.name, 'red')
  200. self.assertAllCalled()
  201. def clone_setup_common_calls(self, src, dst):
  202. # have each property type with default=no, each special-cased,
  203. # and some with default=yes
  204. properties = {
  205. 'label': 'default=False type=label red',
  206. 'template': 'default=False type=vm test-template',
  207. 'memory': 'default=False type=int 400',
  208. 'kernel': 'default=False type=str 4.9.31',
  209. 'netvm': 'default=False type=vm test-net',
  210. 'virt_mode': 'default=False type=str hvm',
  211. 'default_user': 'default=True type=str user',
  212. }
  213. self.app.expected_calls[
  214. (src, 'admin.vm.property.List', None, None)] = \
  215. b'0\0qid\nname\n' + \
  216. b'\n'.join(prop.encode() for prop in properties.keys()) + \
  217. b'\n'
  218. for prop, value in properties.items():
  219. self.app.expected_calls[
  220. (src, 'admin.vm.property.Get', prop, None)] = \
  221. b'0\0' + value.encode()
  222. # special cases handled by admin.vm.Create call
  223. if prop in ('label', 'template'):
  224. continue
  225. # default properties should not be set
  226. if 'default=True' in value:
  227. continue
  228. self.app.expected_calls[
  229. (dst, 'admin.vm.property.Set', prop,
  230. value.split()[-1].encode())] = b'0\0'
  231. # tags
  232. self.app.expected_calls[
  233. (src, 'admin.vm.tag.List', None, None)] = \
  234. b'0\0tag1\ntag2\n'
  235. self.app.expected_calls[
  236. (dst, 'admin.vm.tag.Set', 'tag1', None)] = b'0\0'
  237. self.app.expected_calls[
  238. (dst, 'admin.vm.tag.Set', 'tag2', None)] = b'0\0'
  239. # features
  240. self.app.expected_calls[
  241. (src, 'admin.vm.feature.List', None, None)] = \
  242. b'0\0feat1\nfeat2\n'
  243. self.app.expected_calls[
  244. (src, 'admin.vm.feature.Get', 'feat1', None)] = \
  245. b'0\0feat1-value with spaces'
  246. self.app.expected_calls[
  247. (src, 'admin.vm.feature.Get', 'feat2', None)] = \
  248. b'0\x001'
  249. self.app.expected_calls[
  250. (dst, 'admin.vm.feature.Set', 'feat1',
  251. b'feat1-value with spaces')] = b'0\0'
  252. self.app.expected_calls[
  253. (dst, 'admin.vm.feature.Set', 'feat2', b'1')] = b'0\0'
  254. # firewall
  255. rules = (
  256. b'action=drop dst4=192.168.0.0/24\n'
  257. b'action=accept\n'
  258. )
  259. self.app.expected_calls[
  260. (src, 'admin.vm.firewall.Get', None, None)] = \
  261. b'0\x00' + rules
  262. self.app.expected_calls[
  263. (dst, 'admin.vm.firewall.Set', None, rules)] = \
  264. b'0\x00'
  265. # storage
  266. self.app.expected_calls[
  267. (dst, 'admin.vm.volume.List', None, None)] = \
  268. b'0\x00root\nprivate\nvolatile\nkernel\n'
  269. self.app.expected_calls[
  270. (dst, 'admin.vm.volume.Info', 'root', None)] = \
  271. b'0\x00pool=lvm\n' \
  272. b'vid=vm-test-vm/root\n' \
  273. b'size=10737418240\n' \
  274. b'usage=2147483648\n' \
  275. b'rw=False\n' \
  276. b'internal=True\n' \
  277. b'source=vm-test-template/root\n' \
  278. b'save_on_stop=False\n' \
  279. b'snap_on_start=True\n'
  280. self.app.expected_calls[
  281. (dst, 'admin.vm.volume.Info', 'private', None)] = \
  282. b'0\x00pool=lvm\n' \
  283. b'vid=vm-test-vm/private\n' \
  284. b'size=2147483648\n' \
  285. b'usage=214748364\n' \
  286. b'rw=True\n' \
  287. b'internal=True\n' \
  288. b'save_on_stop=True\n' \
  289. b'snap_on_start=False\n'
  290. self.app.expected_calls[
  291. (dst, 'admin.vm.volume.Info', 'volatile', None)] = \
  292. b'0\x00pool=lvm\n' \
  293. b'vid=vm-test-vm/volatile\n' \
  294. b'size=10737418240\n' \
  295. b'usage=0\n' \
  296. b'rw=True\n' \
  297. b'internal=True\n' \
  298. b'source=None\n' \
  299. b'save_on_stop=False\n' \
  300. b'snap_on_start=False\n'
  301. self.app.expected_calls[
  302. (dst, 'admin.vm.volume.Info', 'kernel', None)] = \
  303. b'0\x00pool=linux-kernel\n' \
  304. b'vid=\n' \
  305. b'size=0\n' \
  306. b'usage=0\n' \
  307. b'rw=False\n' \
  308. b'internal=True\n' \
  309. b'source=None\n' \
  310. b'save_on_stop=False\n' \
  311. b'snap_on_start=False\n'
  312. self.app.expected_calls[
  313. (src, 'admin.vm.volume.List', None, None)] = \
  314. b'0\x00root\nprivate\nvolatile\nkernel\n'
  315. self.app.expected_calls[
  316. (src, 'admin.vm.volume.CloneFrom', 'private', None)] = \
  317. b'0\x00token-private'
  318. self.app.expected_calls[
  319. (dst, 'admin.vm.volume.CloneTo', 'private', b'token-private')] = \
  320. b'0\x00'
  321. def test_030_clone(self):
  322. self.clone_setup_common_calls('test-vm', 'new-name')
  323. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  324. b'0\x00new-name class=AppVM state=Halted\n' \
  325. b'test-vm class=AppVM state=Halted\n' \
  326. b'test-template class=TemplateVM state=Halted\n' \
  327. b'test-net class=AppVM state=Halted\n'
  328. self.app.expected_calls[('dom0', 'admin.vm.Create.AppVM',
  329. 'test-template', b'name=new-name label=red')] = b'0\x00'
  330. new_vm = self.app.clone_vm('test-vm', 'new-name')
  331. self.assertEqual(new_vm.name, 'new-name')
  332. self.check_output_mock.assert_called_once_with(
  333. ['qvm-appmenus', '--init', '--update',
  334. '--source', 'test-vm', 'new-name'],
  335. stderr=subprocess.STDOUT
  336. )
  337. self.assertAllCalled()
  338. def test_031_clone_object(self):
  339. self.clone_setup_common_calls('test-vm', 'new-name')
  340. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  341. b'0\x00new-name class=AppVM state=Halted\n' \
  342. b'test-vm class=AppVM state=Halted\n' \
  343. b'test-template class=TemplateVM state=Halted\n' \
  344. b'test-net class=AppVM state=Halted\n'
  345. self.app.expected_calls[('dom0', 'admin.vm.Create.AppVM',
  346. 'test-template', b'name=new-name label=red')] = b'0\x00'
  347. new_vm = self.app.clone_vm(self.app.domains['test-vm'], 'new-name')
  348. self.assertEqual(new_vm.name, 'new-name')
  349. self.assertAllCalled()
  350. def test_032_clone_pool(self):
  351. self.clone_setup_common_calls('test-vm', 'new-name')
  352. self.app.expected_calls[('dom0', 'admin.vm.CreateInPool.AppVM',
  353. 'test-template',
  354. b'name=new-name label=red pool=some-pool')] = b'0\x00'
  355. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  356. b'0\x00new-name class=AppVM state=Halted\n' \
  357. b'test-vm class=AppVM state=Halted\n' \
  358. b'test-template class=TemplateVM state=Halted\n' \
  359. b'test-net class=AppVM state=Halted\n'
  360. new_vm = self.app.clone_vm('test-vm', 'new-name', pool='some-pool')
  361. self.assertEqual(new_vm.name, 'new-name')
  362. self.assertAllCalled()
  363. def test_033_clone_pools(self):
  364. self.clone_setup_common_calls('test-vm', 'new-name')
  365. self.app.expected_calls[('dom0', 'admin.vm.CreateInPool.AppVM',
  366. 'test-template',
  367. b'name=new-name label=red pool:private=some-pool '
  368. b'pool:volatile=other-pool')] = b'0\x00'
  369. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  370. b'0\x00new-name class=AppVM state=Halted\n' \
  371. b'test-vm class=AppVM state=Halted\n' \
  372. b'test-template class=TemplateVM state=Halted\n' \
  373. b'test-net class=AppVM state=Halted\n'
  374. new_vm = self.app.clone_vm('test-vm', 'new-name',
  375. pools={'private': 'some-pool', 'volatile': 'other-pool'})
  376. self.assertEqual(new_vm.name, 'new-name')
  377. self.assertAllCalled()
  378. def test_034_clone_class_change(self):
  379. self.clone_setup_common_calls('test-vm', 'new-name')
  380. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  381. b'0\x00new-name class=StandaloneVM state=Halted\n' \
  382. b'test-vm class=AppVM state=Halted\n' \
  383. b'test-template class=TemplateVM state=Halted\n' \
  384. b'test-net class=AppVM state=Halted\n'
  385. self.app.expected_calls[('dom0', 'admin.vm.Create.StandaloneVM',
  386. 'test-template', b'name=new-name label=red')] = b'0\x00'
  387. self.app.expected_calls[
  388. ('new-name', 'admin.vm.volume.Info', 'root', None)] = \
  389. b'0\x00pool=lvm\n' \
  390. b'vid=vm-new-name/root\n' \
  391. b'size=10737418240\n' \
  392. b'usage=2147483648\n' \
  393. b'rw=True\n' \
  394. b'internal=True\n' \
  395. b'source=None\n' \
  396. b'save_on_stop=True\n' \
  397. b'snap_on_start=False\n'
  398. self.app.expected_calls[
  399. ('test-vm', 'admin.vm.volume.CloneFrom', 'root', None)] = \
  400. b'0\x00token-root'
  401. self.app.expected_calls[
  402. ('new-name', 'admin.vm.volume.CloneTo', 'root', b'token-root')] = \
  403. b'0\x00'
  404. new_vm = self.app.clone_vm('test-vm', 'new-name',
  405. new_cls='StandaloneVM')
  406. self.assertEqual(new_vm.name, 'new-name')
  407. self.assertEqual(new_vm.klass, 'StandaloneVM')
  408. self.assertAllCalled()
  409. def test_035_clone_fail(self):
  410. self.app.expected_calls[
  411. ('test-vm', 'admin.vm.property.List', None, None)] = \
  412. b'0\0qid\nname\ntemplate\nlabel\nmemory\n'
  413. self.app.expected_calls[
  414. ('test-vm', 'admin.vm.property.Get', 'label', None)] = \
  415. b'0\0default=False type=label red'
  416. self.app.expected_calls[
  417. ('test-vm', 'admin.vm.property.Get', 'template', None)] = \
  418. b'0\0default=False type=vm test-template'
  419. self.app.expected_calls[
  420. ('test-vm', 'admin.vm.property.Get', 'memory', None)] = \
  421. b'0\0default=False type=int 400'
  422. self.app.expected_calls[
  423. ('new-name', 'admin.vm.property.Set', 'memory', b'400')] = \
  424. b'2\0QubesException\0\0something happened\0'
  425. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  426. b'0\x00new-name class=AppVM state=Halted\n' \
  427. b'test-vm class=AppVM state=Halted\n' \
  428. b'test-template class=TemplateVM state=Halted\n' \
  429. b'test-net class=AppVM state=Halted\n'
  430. self.app.expected_calls[('dom0', 'admin.vm.Create.AppVM',
  431. 'test-template', b'name=new-name label=red')] = b'0\x00'
  432. self.app.expected_calls[('new-name', 'admin.vm.Remove', None, None)] = \
  433. b'0\x00'
  434. with self.assertRaises(qubesadmin.exc.QubesException):
  435. self.app.clone_vm('test-vm', 'new-name')
  436. self.assertAllCalled()
  437. def test_036_clone_ignore_errors_prop(self):
  438. self.clone_setup_common_calls('test-vm', 'new-name')
  439. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  440. b'0\x00new-name class=AppVM state=Halted\n' \
  441. b'test-vm class=AppVM state=Halted\n' \
  442. b'test-template class=TemplateVM state=Halted\n' \
  443. b'test-net class=AppVM state=Halted\n'
  444. self.app.expected_calls[('dom0', 'admin.vm.Create.AppVM',
  445. 'test-template', b'name=new-name label=red')] = b'0\x00'
  446. self.app.expected_calls[
  447. ('new-name', 'admin.vm.property.Set', 'memory', b'400')] = \
  448. b'2\0QubesException\0\0something happened\0'
  449. new_vm = self.app.clone_vm('test-vm', 'new-name', ignore_errors=True)
  450. self.assertEqual(new_vm.name, 'new-name')
  451. self.assertAllCalled()
  452. def test_037_clone_ignore_errors_feature(self):
  453. self.clone_setup_common_calls('test-vm', 'new-name')
  454. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  455. b'0\x00new-name class=AppVM state=Halted\n' \
  456. b'test-vm class=AppVM state=Halted\n' \
  457. b'test-template class=TemplateVM state=Halted\n' \
  458. b'test-net class=AppVM state=Halted\n'
  459. self.app.expected_calls[('dom0', 'admin.vm.Create.AppVM',
  460. 'test-template', b'name=new-name label=red')] = b'0\x00'
  461. self.app.expected_calls[
  462. ('new-name', 'admin.vm.feature.Set', 'feat2', b'1')] = \
  463. b'2\0QubesException\0\0something happened\0'
  464. new_vm = self.app.clone_vm('test-vm', 'new-name', ignore_errors=True)
  465. self.assertEqual(new_vm.name, 'new-name')
  466. self.assertAllCalled()
  467. def test_038_clone_ignore_errors_tag(self):
  468. self.clone_setup_common_calls('test-vm', 'new-name')
  469. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  470. b'0\x00new-name class=AppVM state=Halted\n' \
  471. b'test-vm class=AppVM state=Halted\n' \
  472. b'test-template class=TemplateVM state=Halted\n' \
  473. b'test-net class=AppVM state=Halted\n'
  474. self.app.expected_calls[('dom0', 'admin.vm.Create.AppVM',
  475. 'test-template', b'name=new-name label=red')] = b'0\x00'
  476. self.app.expected_calls[
  477. ('new-name', 'admin.vm.tag.Set', 'tag1', None)] = \
  478. b'2\0QubesException\0\0something happened\0'
  479. new_vm = self.app.clone_vm('test-vm', 'new-name', ignore_errors=True)
  480. self.assertEqual(new_vm.name, 'new-name')
  481. self.assertAllCalled()
  482. def test_039_clone_ignore_errors_firewall(self):
  483. self.clone_setup_common_calls('test-vm', 'new-name')
  484. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  485. b'0\x00new-name class=AppVM state=Halted\n' \
  486. b'test-vm class=AppVM state=Halted\n' \
  487. b'test-template class=TemplateVM state=Halted\n' \
  488. b'test-net class=AppVM state=Halted\n'
  489. self.app.expected_calls[('dom0', 'admin.vm.Create.AppVM',
  490. 'test-template', b'name=new-name label=red')] = b'0\x00'
  491. self.app.expected_calls[
  492. ('new-name', 'admin.vm.firewall.Set', None,
  493. b'action=drop dst4=192.168.0.0/24\naction=accept\n')] = \
  494. b'2\0QubesException\0\0something happened\0'
  495. new_vm = self.app.clone_vm('test-vm', 'new-name', ignore_errors=True)
  496. self.assertEqual(new_vm.name, 'new-name')
  497. self.assertAllCalled()
  498. def test_040_clone_ignore_errors_storage(self):
  499. self.clone_setup_common_calls('test-vm', 'new-name')
  500. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  501. b'0\x00new-name class=AppVM state=Halted\n' \
  502. b'test-vm class=AppVM state=Halted\n' \
  503. b'test-template class=TemplateVM state=Halted\n' \
  504. b'test-net class=AppVM state=Halted\n'
  505. self.app.expected_calls[('dom0', 'admin.vm.Create.AppVM',
  506. 'test-template', b'name=new-name label=red')] = b'0\x00'
  507. self.app.expected_calls[
  508. ('new-name', 'admin.vm.volume.CloneTo', 'private',
  509. b'token-private')] = \
  510. b'2\0QubesException\0\0something happened\0'
  511. self.app.expected_calls[('new-name', 'admin.vm.Remove', None, None)] = \
  512. b'0\x00'
  513. del self.app.expected_calls[
  514. ('new-name', 'admin.vm.volume.Info', 'root', None)]
  515. del self.app.expected_calls[
  516. ('new-name', 'admin.vm.volume.Info', 'volatile', None)]
  517. with self.assertRaises(qubesadmin.exc.QubesException):
  518. self.app.clone_vm('test-vm', 'new-name', ignore_errors=True)
  519. self.assertAllCalled()
  520. def test_041_clone_fail_storage(self):
  521. self.clone_setup_common_calls('test-vm', 'new-name')
  522. self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
  523. b'0\x00new-name class=AppVM state=Halted\n' \
  524. b'test-vm class=AppVM state=Halted\n' \
  525. b'test-template class=TemplateVM state=Halted\n' \
  526. b'test-net class=AppVM state=Halted\n'
  527. self.app.expected_calls[('dom0', 'admin.vm.Create.AppVM',
  528. 'test-template', b'name=new-name label=red')] = b'0\x00'
  529. self.app.expected_calls[
  530. ('new-name', 'admin.vm.volume.CloneTo', 'private',
  531. b'token-private')] = \
  532. b'2\0QubesException\0\0something happened\0'
  533. self.app.expected_calls[('new-name', 'admin.vm.Remove', None, None)] = \
  534. b'0\x00'
  535. del self.app.expected_calls[
  536. ('new-name', 'admin.vm.volume.Info', 'root', None)]
  537. del self.app.expected_calls[
  538. ('new-name', 'admin.vm.volume.Info', 'volatile', None)]
  539. with self.assertRaises(qubesadmin.exc.QubesException):
  540. self.app.clone_vm('test-vm', 'new-name')
  541. self.assertAllCalled()
  542. class TC_20_QubesLocal(unittest.TestCase):
  543. def setUp(self):
  544. super(TC_20_QubesLocal, self).setUp()
  545. self.socket_dir = tempfile.mkdtemp()
  546. self.orig_sock = qubesadmin.config.QUBESD_SOCKET
  547. qubesadmin.config.QUBESD_SOCKET = os.path.join(self.socket_dir, 'sock')
  548. self.proc = None
  549. self.app = qubesadmin.app.QubesLocal()
  550. def listen_and_send(self, send_data):
  551. '''Listen on socket and send data in response.
  552. :param bytes send_data: data to send
  553. '''
  554. self.socket_pipe, child_pipe = multiprocessing.Pipe()
  555. self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
  556. self.socket.bind(os.path.join(self.socket_dir, 'sock'))
  557. self.socket.listen(1)
  558. def worker(sock, pipe, send_data_):
  559. conn, addr = sock.accept()
  560. pipe.send(conn.makefile('rb').read())
  561. conn.sendall(send_data_)
  562. conn.close()
  563. self.proc = multiprocessing.Process(target=worker,
  564. args=(self.socket, child_pipe, send_data))
  565. self.proc.start()
  566. self.socket.close()
  567. def get_request(self):
  568. '''Get request sent to "qubesd" mock'''
  569. return self.socket_pipe.recv()
  570. def tearDown(self):
  571. qubesadmin.config.QUBESD_SOCKET = self.orig_sock
  572. if self.proc is not None:
  573. try:
  574. self.proc.terminate()
  575. except OSError:
  576. pass
  577. shutil.rmtree(self.socket_dir)
  578. super(TC_20_QubesLocal, self).tearDown()
  579. def test_000_qubesd_call(self):
  580. self.listen_and_send(b'0\0')
  581. self.app.qubesd_call('test-vm', 'some.method', 'arg1', b'payload')
  582. self.assertEqual(self.get_request(),
  583. b'dom0\0some.method\0test-vm\0arg1\0payload')
  584. def test_001_qubesd_call_none_arg(self):
  585. self.listen_and_send(b'0\0')
  586. self.app.qubesd_call('test-vm', 'some.method', None, b'payload')
  587. self.assertEqual(self.get_request(),
  588. b'dom0\0some.method\0test-vm\0\0payload')
  589. def test_002_qubesd_call_none_payload(self):
  590. self.listen_and_send(b'0\0')
  591. self.app.qubesd_call('test-vm', 'some.method', None, None)
  592. self.assertEqual(self.get_request(),
  593. b'dom0\0some.method\0test-vm\0\0')
  594. def test_003_qubesd_call_payload_stream(self):
  595. # this should really be in setUp()...
  596. tmpdir = tempfile.mkdtemp()
  597. self.addCleanup(shutil.rmtree, tmpdir)
  598. service_path = os.path.join(tmpdir, 'test.service')
  599. payload_input = os.path.join(tmpdir, 'payload-input')
  600. with open(service_path, 'w') as f:
  601. f.write('#!/bin/bash\n'
  602. 'env > {dir}/env\n'
  603. 'echo "$@" > {dir}/args\n'
  604. 'cat > {dir}/payload\n'
  605. 'echo -en \'0\\0return-value\'\n'.format(dir=tmpdir))
  606. os.chmod(service_path, 0o755)
  607. with open(payload_input, 'w+') as payload_file:
  608. payload_file.write('some payload\n')
  609. payload_file.seek(0)
  610. with mock.patch('qubesadmin.config.QREXEC_SERVICES_DIR',
  611. tmpdir):
  612. value = self.app.qubesd_call('test-vm', 'test.service',
  613. 'some-arg', payload_stream=payload_file)
  614. self.assertEqual(value, b'return-value')
  615. self.assertTrue(os.path.exists(tmpdir + '/env'))
  616. with open(tmpdir + '/env') as env:
  617. self.assertIn('QREXEC_REMOTE_DOMAIN=dom0\n', env)
  618. self.assertIn('QREXEC_REQUESTED_TARGET=test-vm\n', env)
  619. self.assertTrue(os.path.exists(tmpdir + '/args'))
  620. with open(tmpdir + '/args') as args:
  621. self.assertEqual(args.read(), 'some-arg\n')
  622. self.assertTrue(os.path.exists(tmpdir + '/payload'))
  623. with open(tmpdir + '/payload') as payload:
  624. self.assertEqual(payload.read(), 'some payload\n')
  def test_004_qubesd_call_payload_stream_proc(self):
      """Call a local service with a subprocess pipe as ``payload_stream``.

      A throw-away service script is written to a temporary directory
      that is patched in as ``qubesadmin.config.QREXEC_SERVICES_DIR``.
      The script dumps its environment, its arguments and its stdin to
      files in that directory; the test then checks those files and the
      value returned by ``qubesd_call``.
      """
      # this should really be in setUp()...
      tmpdir = tempfile.mkdtemp()
      self.addCleanup(shutil.rmtree, tmpdir)
      service_path = os.path.join(tmpdir, 'test.service')
      echo = subprocess.Popen(['echo', 'some payload'],
          stdout=subprocess.PIPE)
      # Cleanups run in reverse registration order: the pipe is closed
      # first, then the child is reaped. This happens even when an
      # assertion below fails, so no zombie process or open pipe is left.
      self.addCleanup(echo.wait)
      self.addCleanup(echo.stdout.close)
      with open(service_path, 'w') as f:
          f.write('#!/bin/bash\n'
                  'env > {dir}/env\n'
                  'echo "$@" > {dir}/args\n'
                  'cat > {dir}/payload\n'
                  'echo -en \'0\\0return-value\'\n'.format(dir=tmpdir))
      os.chmod(service_path, 0o755)
      with mock.patch('qubesadmin.config.QREXEC_SERVICES_DIR',
              tmpdir):
          value = self.app.qubesd_call('test-vm', 'test.service',
              'some-arg', payload_stream=echo.stdout)
      self.assertEqual(value, b'return-value')

      self.assertTrue(os.path.exists(tmpdir + '/env'))
      with open(tmpdir + '/env') as env:
          # Materialize the lines once: a membership test on the file
          # object itself consumes the iterator, which would make the
          # second check depend on the order of the variables.
          env_lines = env.readlines()
      self.assertIn('QREXEC_REMOTE_DOMAIN=dom0\n', env_lines)
      self.assertIn('QREXEC_REQUESTED_TARGET=test-vm\n', env_lines)

      self.assertTrue(os.path.exists(tmpdir + '/args'))
      with open(tmpdir + '/args') as args:
          self.assertEqual(args.read(), 'some-arg\n')

      self.assertTrue(os.path.exists(tmpdir + '/payload'))
      with open(tmpdir + '/payload') as payload:
          self.assertEqual(payload.read(), 'some payload\n')
  @mock.patch('os.isatty', lambda fd: fd == 2)
  def test_010_run_service(self):
      """Check the process spawned by ``run_service`` with default options.

      ``os.isatty`` is patched to report only fd 2 as a terminal. The
      test verifies the exact ``subprocess.Popen`` invocation and the
      request returned by ``self.get_request()``.
      """
      expected_argv = [
          qubesadmin.config.QREXEC_CLIENT,
          '-d', 'some-vm', '-T', 'DEFAULT:QUBESRPC service.name dom0',
      ]
      self.listen_and_send(b'0\0')
      with mock.patch('subprocess.Popen') as popen_mock:
          self.app.run_service('some-vm', 'service.name')
          popen_mock.assert_called_once_with(
              expected_argv,
              stdin=subprocess.PIPE,
              stdout=subprocess.PIPE,
              stderr=subprocess.PIPE)
      request = self.get_request()
      self.assertEqual(request, b'dom0\0admin.vm.Start\0some-vm\0\0')
  def test_011_run_service_filter_esc(self):
      """Check the process spawned by ``run_service(filter_esc=True)``.

      The expected command line carries both ``-t`` and ``-T`` before
      the service specification; the request returned by
      ``self.get_request()`` is checked as well.
      """
      expected_argv = [
          qubesadmin.config.QREXEC_CLIENT,
          '-d', 'some-vm', '-t', '-T',
          'DEFAULT:QUBESRPC service.name dom0',
      ]
      self.listen_and_send(b'0\0')
      with mock.patch('subprocess.Popen') as popen_mock:
          self.app.run_service('some-vm', 'service.name', filter_esc=True)
          popen_mock.assert_called_once_with(
              expected_argv,
              stdin=subprocess.PIPE,
              stdout=subprocess.PIPE,
              stderr=subprocess.PIPE)
      request = self.get_request()
      self.assertEqual(request, b'dom0\0admin.vm.Start\0some-vm\0\0')
  @mock.patch('os.isatty', lambda fd: fd == 2)
  def test_012_run_service_user(self):
      """Check the process spawned by ``run_service(user='user')``.

      ``os.isatty`` is patched to report only fd 2 as a terminal. The
      expected service specification starts with ``user:`` instead of
      ``DEFAULT:``; the request returned by ``self.get_request()`` is
      checked as well.
      """
      expected_argv = [
          qubesadmin.config.QREXEC_CLIENT,
          '-d', 'some-vm', '-T',
          'user:QUBESRPC service.name dom0',
      ]
      self.listen_and_send(b'0\0')
      with mock.patch('subprocess.Popen') as popen_mock:
          self.app.run_service('some-vm', 'service.name', user='user')
          popen_mock.assert_called_once_with(
              expected_argv,
              stdin=subprocess.PIPE,
              stdout=subprocess.PIPE,
              stderr=subprocess.PIPE)
      request = self.get_request()
      self.assertEqual(request, b'dom0\0admin.vm.Start\0some-vm\0\0')
  def test_013_run_service_default_target(self):
      """``run_service`` with an empty target name must raise ValueError."""
      self.assertRaises(
          ValueError, self.app.run_service, '', 'service.name')
  class TC_30_QubesRemote(unittest.TestCase):
      """Tests for ``qubesadmin.app.QubesRemote`` with ``subprocess.Popen`` mocked.

      ``setUp`` installs a mock in place of ``subprocess.Popen``; the tests
      only inspect the calls recorded on that mock and the values the
      application object returns.
      """

      def setUp(self):
          """Patch ``subprocess.Popen`` and create the application object."""
          super(TC_30_QubesRemote, self).setUp()
          self.proc_mock = mock.Mock()
          self.proc_mock.configure_mock(**{
              'return_value.returncode': 0
          })
          self.proc_patch = mock.patch('subprocess.Popen', self.proc_mock)
          self.proc_patch.start()
          # Registered immediately after start(): cleanups also run when a
          # later line of setUp() raises, whereas tearDown() would be
          # skipped and the patch would leak into the following tests.
          self.addCleanup(self.proc_patch.stop)
          self.app = qubesadmin.app.QubesRemote()

      def set_proc_stdout(self, send_data):
          """Make the mocked process return ``send_data`` as its stdout.

          :param send_data: first element of the tuple returned by the
              mocked ``communicate()``; the second element is ``None``
          """
          self.proc_mock.configure_mock(**{
              'return_value.communicate.return_value': (send_data, None)
          })

      def test_000_qubesd_call(self):
          """An argument is appended to the method name with ``+``."""
          self.set_proc_stdout(b'0\0')
          self.app.qubesd_call('test-vm', 'some.method', 'arg1', b'payload')
          self.assertEqual(self.proc_mock.mock_calls, [
              mock.call([qubesadmin.config.QREXEC_CLIENT_VM, 'test-vm',
                  'some.method+arg1'],
                  stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                  stderr=subprocess.PIPE),
              mock.call().communicate(b'payload')
          ])

      def test_001_qubesd_call_none_arg(self):
          """With ``None`` as argument only the bare method name is used."""
          self.set_proc_stdout(b'0\0')
          self.app.qubesd_call('test-vm', 'some.method', None, b'payload')
          self.assertEqual(self.proc_mock.mock_calls, [
              mock.call([qubesadmin.config.QREXEC_CLIENT_VM, 'test-vm',
                  'some.method'],
                  stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                  stderr=subprocess.PIPE),
              mock.call().communicate(None)
                  if False else mock.call().communicate(b'payload')
          ])

      def test_002_qubesd_call_none_payload(self):
          """A ``None`` payload is handed to ``communicate()`` unchanged."""
          self.set_proc_stdout(b'0\0')
          self.app.qubesd_call('test-vm', 'some.method', None, None)
          self.assertEqual(self.proc_mock.mock_calls, [
              mock.call([qubesadmin.config.QREXEC_CLIENT_VM, 'test-vm',
                  'some.method'],
                  stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                  stderr=subprocess.PIPE),
              mock.call().communicate(None)
          ])

      def test_003_qubesd_call_payload_stream(self):
          """A ``payload_stream`` file is passed as the process stdin.

          ``communicate()`` is then expected to be called with ``None``,
          and the call is expected to return ``b'return-value'`` for the
          mocked stdout configured below.
          """
          self.set_proc_stdout(b'0\0return-value')
          tmpdir = tempfile.mkdtemp()
          self.addCleanup(shutil.rmtree, tmpdir)
          payload_input = os.path.join(tmpdir, 'payload-input')
          with open(payload_input, 'w+') as payload_file:
              payload_file.write('some payload\n')
              payload_file.seek(0)
              value = self.app.qubesd_call('test-vm', 'some.method',
                  'some-arg', payload_stream=payload_file)
          self.assertEqual(self.proc_mock.mock_calls, [
              mock.call([qubesadmin.config.QREXEC_CLIENT_VM, 'test-vm',
                  'some.method+some-arg'],
                  stdin=payload_file, stdout=subprocess.PIPE,
                  stderr=subprocess.PIPE),
              mock.call().communicate(None)
          ])
          self.assertEqual(value, b'return-value')

      def test_010_run_service(self):
          """``run_service`` spawns the client with target and service name."""
          self.app.run_service('some-vm', 'service.name')
          self.proc_mock.assert_called_once_with([
              qubesadmin.config.QREXEC_CLIENT_VM,
              'some-vm', 'service.name'],
              stdin=subprocess.PIPE, stdout=subprocess.PIPE,
              stderr=subprocess.PIPE)

      def test_011_run_service_filter_esc(self):
          """``filter_esc=True`` must raise NotImplementedError."""
          with self.assertRaises(NotImplementedError):
              self.app.run_service('some-vm', 'service.name',
                  filter_esc=True)

      def test_012_run_service_user(self):
          """Passing ``user`` must raise ValueError."""
          with self.assertRaises(ValueError):
              self.app.run_service('some-vm', 'service.name', user='user')

      def test_013_run_service_default_target(self):
          """An empty target name is passed to the client as ``''``."""
          self.app.run_service('', 'service.name')
          self.proc_mock.assert_called_once_with([
              qubesadmin.config.QREXEC_CLIENT_VM,
              '', 'service.name'],
              stdin=subprocess.PIPE, stdout=subprocess.PIPE,
              stderr=subprocess.PIPE)